Skip to content

legacy_converters

Refactored converter classes that use the new Pydantic models from legacy_models.py.

_SLOW_DOWNLOAD_HOST_SUBSTRINGS module-attribute

_SLOW_DOWNLOAD_HOST_SUBSTRINGS = ('civitai',)

BaseLegacyConverter

Base converter for legacy model references using new Pydantic validation.

Source code in src/horde_model_reference/legacy/classes/legacy_converters.py
class BaseLegacyConverter:
    """Base converter for legacy model references using new Pydantic validation.

    ``convert_to_new_format`` drives the pipeline: the legacy JSON database is
    loaded and every entry validated with the category's legacy Pydantic model,
    each validated entry is converted to the new record format, and (unless
    ``dry_run`` is set) the converted database and, in debug mode, a validation
    log are written to disk.
    """

    legacy_folder_path: Path
    legacy_database_path: Path
    converted_folder_path: Path
    converted_database_file_path: Path
    model_reference_category: MODEL_REFERENCE_CATEGORY
    model_reference_type: type[LegacyGenericRecord]

    _all_legacy_records: dict[str, LegacyGenericRecord]
    """All validated legacy model records."""
    _all_converted_records: dict[str, GenericModelRecord]
    """All converted model records in the new format."""
    all_validation_errors_log: dict[str, list[str]]
    """All validation errors that occurred during conversion."""
    _host_counter: dict[str, int]
    """Counter for tracking download hosts across all records."""

    debug_mode: bool = False
    log_folder: Path
    dry_run: bool = False
    converted_successfully: bool = False

    def __init__(
        self,
        *,
        legacy_folder_path: str | Path = horde_model_reference_paths.legacy_path,
        target_file_folder: str | Path = horde_model_reference_paths.base_path,
        log_folder: str | Path = horde_model_reference_paths.log_folder,
        model_reference_category: MODEL_REFERENCE_CATEGORY,
        debug_mode: bool = False,
        dry_run: bool = False,
    ) -> None:
        """Initialize the legacy converter.

        Args:
            legacy_folder_path: The legacy database folder.
            target_file_folder: The folder to write the converted database to.
            log_folder: The folder to write the log files to.
            model_reference_category: The category of model reference to convert.
            debug_mode: If true, include extra information in the error log.
            dry_run: If true, don't write out the converted database or any logs.

        """
        self._initialize()

        self.model_reference_category = model_reference_category
        # Categories without a dedicated legacy model are validated as generic records.
        self.model_reference_type = LegacyGenericRecord
        if model_reference_category == MODEL_REFERENCE_CATEGORY.image_generation:
            self.model_reference_type = LegacyStableDiffusionRecord
        elif model_reference_category == MODEL_REFERENCE_CATEGORY.clip:
            self.model_reference_type = LegacyClipRecord
        elif model_reference_category == MODEL_REFERENCE_CATEGORY.text_generation:
            self.model_reference_type = LegacyTextGenerationRecord

        normalized_legacy_base = path_consts.normalize_legacy_base_path(legacy_folder_path)
        normalized_target_base = path_consts.normalize_legacy_base_path(target_file_folder)

        self.legacy_folder_path = normalized_legacy_base / path_consts.LEGACY_REFERENCE_FOLDER_NAME
        self.legacy_database_path = horde_model_reference_paths.get_legacy_model_reference_file_path(
            model_reference_category=model_reference_category,
            base_path=normalized_legacy_base,
        )
        self.converted_folder_path = Path(normalized_target_base)
        self.converted_database_file_path = horde_model_reference_paths.get_model_reference_file_path(
            model_reference_category=model_reference_category,
            base_path=normalized_target_base,
        )
        self.debug_mode = debug_mode
        self.log_folder = Path(log_folder)
        self.dry_run = dry_run

    def _initialize(self) -> None:
        """Reset all per-conversion state, allowing re-conversion if applicable."""
        self._all_legacy_records = {}
        self._all_converted_records = {}
        self.all_validation_errors_log = {}
        self._host_counter = {}
        self.converted_successfully = False

    def convert_to_new_format(self) -> dict[str, GenericModelRecord]:
        """Convert the legacy model reference to the new format.

        State left over from a previous *successful* run is cleared first.

        Returns:
            The converted model records in the new format.

        """
        if self.converted_successfully:
            self._initialize()

        self.pre_parse_records()
        self._load_and_validate_legacy_records()
        self._convert_legacy_to_new_format()
        self.post_parse_records()
        self.write_out_validation_errors()
        self.write_out_records()

        self.converted_successfully = True

        return self._all_converted_records

    def _load_and_validate_legacy_records(self) -> None:
        """Load and validate all legacy records using Pydantic models.

        A missing or zero-byte legacy database is not an error: the method logs
        a debug message and returns without loading anything.

        Raises:
            ValidationError: If any record fails validation. The failure is added
                to the validation error log before being re-raised.

        """
        # Check if file exists and is not empty
        if not self.legacy_database_path.exists():
            logger.debug(f"Legacy database file {self.legacy_database_path} does not exist, skipping conversion")
            return

        file_size = self.legacy_database_path.stat().st_size
        if file_size == 0:
            logger.debug(f"Legacy database file {self.legacy_database_path} is empty, skipping conversion")
            return

        # JSON is UTF-8 by specification; do not depend on the platform's locale encoding.
        with open(self.legacy_database_path, encoding="utf-8") as legacy_model_reference_file:
            raw_legacy_json_data: dict[str, dict[str, Any]] = json.load(legacy_model_reference_file)

        for model_record_key, model_record_contents in raw_legacy_json_data.items():
            # Handed to the model through the validation context and read back afterwards.
            issues: list[str] = []
            validation_context = {
                "issues": issues,
                "model_key": model_record_key,
                "debug_mode": self.debug_mode,
                "category": self.model_reference_category,
                "host_counter": self._host_counter,
            }

            # Add existing showcase files to context for stable diffusion
            if hasattr(self, "existing_showcase_files"):
                validation_context["existing_showcase_files"] = self.existing_showcase_files

            try:
                legacy_record = self.model_reference_type.model_validate(
                    model_record_contents,
                    context=validation_context,
                )
                self._all_legacy_records[model_record_key] = legacy_record

                for issue in issues:
                    self.add_validation_error_to_log(model_record_key=model_record_key, error=issue)
            except ValidationError as e:
                error = f"CRITICAL: Error parsing {model_record_key}:\n{e}"
                self.add_validation_error_to_log(model_record_key=model_record_key, error=error)
                raise

    def _convert_legacy_to_new_format(self) -> None:
        """Convert validated legacy records to the new format.

        A record whose conversion returns ``None`` is logged and skipped; any
        exception is logged against the record and then re-raised.
        """
        for model_key, legacy_record in self._all_legacy_records.items():
            try:
                converted_record = self._convert_single_record(legacy_record)
                if converted_record is None:
                    self.add_validation_error_to_log(
                        model_record_key=model_key, error="Failed to convert legacy record to new format"
                    )
                else:
                    self._all_converted_records[model_key] = converted_record
            except Exception as e:
                error = f"Failed to convert {model_key}: {e}"
                self.add_validation_error_to_log(model_record_key=model_key, error=error)
                raise

    def _convert_model_record_config(self, legacy_record: LegacyGenericRecord) -> GenericModelRecordConfig:
        """Convert the config section of a legacy record to the new format.

        One ``DownloadRecord`` is created per entry in ``config.files`` (entries
        with an empty path or a path containing "yaml" are skipped); the URLs are
        then filled in from ``config.download`` by matching file names.

        Args:
            legacy_record: The validated legacy record whose config is converted.

        Returns:
            The new-format config holding the download records.

        Raises:
            ValueError: If a download entry names a file with no download record.

        """
        download_records: dict[str, DownloadRecord] = {}

        for file_entry in legacy_record.config.files:
            if file_entry.path and "yaml" not in file_entry.path.lower():
                lowered_path = file_entry.path.lower()
                download_records[file_entry.path] = DownloadRecord(
                    file_name=file_entry.path,
                    file_url="",
                    sha256sum=file_entry.sha256sum if file_entry.sha256sum else "FIXME",
                    file_purpose=file_entry.file_type,
                    known_slow_download=any(
                        slow_host in lowered_path for slow_host in _SLOW_DOWNLOAD_HOST_SUBSTRINGS
                    ),
                )

        for download_entry in legacy_record.config.download:
            if download_entry.file_name not in download_records:
                raise ValueError(f"Unknown download entry: {download_entry.file_name}")

            download_record = download_records[download_entry.file_name]
            file_url = download_entry.file_url or ""
            download_record.file_url = file_url

            # The slow-download markers are host substrings, so they must be matched
            # against the download URL (only known here), not just the file name.
            lowered_url = str(file_url).lower()
            if any(slow_host in lowered_url for slow_host in _SLOW_DOWNLOAD_HOST_SUBSTRINGS):
                download_record.known_slow_download = True

        return GenericModelRecordConfig(download=list(download_records.values()))

    def _convert_single_record(
        self,
        legacy_record: LegacyGenericRecord,
    ) -> GenericModelRecord | None:
        """Convert a single legacy record to the new format.

        Override this in subclasses for category-specific conversion.

        Args:
            legacy_record: The validated legacy record to convert.

        Returns:
            The converted record. This base implementation always returns a
            record; the ``None`` in the signature is for overrides.

        """
        model_record_config = self._convert_model_record_config(legacy_record)

        # Get the appropriate record type from the lookup
        record_class = MODEL_RECORD_TYPE_LOOKUP[self.model_reference_category]

        return record_class(
            record_type=self.model_reference_category,
            name=legacy_record.name,
            description=legacy_record.description,
            version=legacy_record.version,
            config=model_record_config,
            model_classification=MODEL_CLASSIFICATION_LOOKUP[self.model_reference_category],
        )

    def _convert_single_record_to_legacy(
        self,
        v2_record: GenericModelRecord,
    ) -> LegacyGenericRecord:
        """Convert a single v2 record back to legacy format.

        This is a stub for future implementation. Override this in subclasses for category-specific conversion.

        Args:
            v2_record: The v2 format record to convert back to legacy format.

        Returns:
            The legacy format record.

        Raises:
            NotImplementedError: This conversion is not yet implemented.

        """
        raise NotImplementedError(
            "v2 → legacy conversion is not yet implemented. "
            "This feature is planned for a future release. "
            "For now, only legacy → v2 conversion is supported."
        )

    def convert_from_v2_to_legacy(
        self,
        v2_records: dict[str, GenericModelRecord],
    ) -> dict[str, LegacyGenericRecord]:
        """Convert all v2 records back to legacy format.

        This is a stub for future implementation.

        Args:
            v2_records: Dictionary of v2 format records to convert.

        Returns:
            Dictionary of legacy format records.

        Raises:
            NotImplementedError: This conversion is not yet implemented.

        """
        raise NotImplementedError(
            "v2 → legacy conversion is not yet implemented. "
            "This feature is planned for a future release. "
            "For now, only legacy → v2 conversion is supported. "
            f"Attempted to convert {len(v2_records)} records from category: {self.model_reference_category}"
        )

    def pre_parse_records(self) -> None:
        """Override to perform category-specific pre-parsing."""

    def post_parse_records(self) -> None:
        """Override to perform category-specific post-parsing."""

    def write_out_records(self) -> None:
        """Write out the converted records.

        Nothing is written in dry-run mode, or when the target file already holds
        exactly the serialized content (so its mtime is preserved).

        Raises:
            OSError: If the converted database cannot be written.

        """
        if self.dry_run:
            return
        # Serialize the converted records to a canonical JSON string first so we can
        # compare with the on-disk file and avoid rewriting (which changes mtime).
        final_serialized = json.dumps(
            self._all_converted_records,
            indent=4,
            default=lambda o: o.model_dump(
                exclude_none=True,
                exclude_unset=False,
                by_alias=True,
            ),
        )
        # keep trailing newline for consistency with other writers
        final_serialized = final_serialized + "\n"

        target_path = Path(self.converted_database_file_path)
        target_path.parent.mkdir(parents=True, exist_ok=True)

        # If the file already exists and the content is identical, skip writing to
        # preserve the existing mtime and avoid invalidating caches that rely on it.
        try:
            if target_path.exists() and target_path.read_text(encoding="utf-8") == final_serialized:
                logger.debug(f"No change to converted file {target_path}, skipping write.")
                return
        except (OSError, ValueError) as read_error:
            # Best effort only: an unreadable or undecodable existing file is simply
            # overwritten (UnicodeDecodeError is a ValueError).
            logger.debug(f"Could not read existing converted file {target_path} ({read_error}), overwriting.")

        # Write atomically: write to a temporary file in the same directory and replace.
        tmp_path = target_path.with_suffix(target_path.suffix + ".tmp")
        try:
            tmp_path.write_text(final_serialized, encoding="utf-8")
            tmp_path.replace(target_path)
        except OSError:
            # Do not leave a half-written temporary file next to the database.
            tmp_path.unlink(missing_ok=True)
            raise

    def get_records(self) -> dict[str, GenericModelRecord]:
        """Return the converted records."""
        return self._all_converted_records

    def add_validation_error_to_log(
        self,
        *,
        model_record_key: str,
        error: str,
    ) -> None:
        """Add a validation error to the log.

        Args:
            model_record_key: Key of the record the error belongs to.
            error: The error message to record.

        """
        if model_record_key not in self.all_validation_errors_log:
            self.all_validation_errors_log[model_record_key] = []
        self.all_validation_errors_log[model_record_key].append(error)

        if self.debug_mode:
            logger.debug(f"{model_record_key} has error: {error}")

    def write_out_validation_errors(self) -> None:
        """Write out the validation errors.

        The log is only written in debug mode and never in dry-run mode. It is a
        JSON file named after the category inside ``log_folder``.
        """
        if self.dry_run or not self.debug_mode:
            return

        log_file = self.log_folder.joinpath(self.model_reference_category + ".log")
        log_file.parent.mkdir(parents=True, exist_ok=True)
        # Explicit encoding so the log does not depend on the platform's locale.
        with open(log_file, "w", encoding="utf-8") as validation_errors_log_file:
            validation_errors_log_file.write(
                json.dumps(
                    self.all_validation_errors_log,
                    indent=4,
                ),
            )

_all_legacy_records instance-attribute

_all_legacy_records: dict[str, LegacyGenericRecord]

All validated legacy model records.

_all_converted_records instance-attribute

_all_converted_records: dict[str, GenericModelRecord]

All converted model records in the new format.

all_validation_errors_log instance-attribute

all_validation_errors_log: dict[str, list[str]]

All validation errors that occurred during conversion.

_host_counter instance-attribute

_host_counter: dict[str, int]

Counter for tracking download hosts across all records.

converted_successfully class-attribute instance-attribute

converted_successfully: bool = False

model_reference_category instance-attribute

model_reference_category: MODEL_REFERENCE_CATEGORY = (
    model_reference_category
)

model_reference_type instance-attribute

model_reference_type: type[LegacyGenericRecord] = (
    LegacyGenericRecord
)

legacy_folder_path instance-attribute

legacy_folder_path: Path = (
    normalized_legacy_base / LEGACY_REFERENCE_FOLDER_NAME
)

legacy_database_path instance-attribute

legacy_database_path: Path = (
    get_legacy_model_reference_file_path(
        model_reference_category=model_reference_category,
        base_path=normalized_legacy_base,
    )
)

converted_folder_path instance-attribute

converted_folder_path: Path = Path(normalized_target_base)

converted_database_file_path instance-attribute

converted_database_file_path: Path = (
    get_model_reference_file_path(
        model_reference_category=model_reference_category,
        base_path=normalized_target_base,
    )
)

debug_mode class-attribute instance-attribute

debug_mode: bool = debug_mode

log_folder instance-attribute

log_folder: Path = Path(log_folder)

dry_run class-attribute instance-attribute

dry_run: bool = dry_run

__init__

__init__(
    *,
    legacy_folder_path: str
    | Path = horde_model_reference_paths.legacy_path,
    target_file_folder: str
    | Path = horde_model_reference_paths.base_path,
    log_folder: str
    | Path = horde_model_reference_paths.log_folder,
    model_reference_category: MODEL_REFERENCE_CATEGORY,
    debug_mode: bool = False,
    dry_run: bool = False,
) -> None

Initialize the legacy converter.

Parameters:

  • legacy_folder_path (str | Path, default: legacy_path ) –

    The legacy database folder.

  • target_file_folder (str | Path, default: base_path ) –

    The folder to write the converted database to.

  • log_folder (str | Path, default: log_folder ) –

    The folder to write the log files to.

  • model_reference_category (MODEL_REFERENCE_CATEGORY) –

    The category of model reference to convert.

  • debug_mode (bool, default: False ) –

    If true, include extra information in the error log.

  • dry_run (bool, default: False ) –

    If true, don't write out the converted database or any logs.

Source code in src/horde_model_reference/legacy/classes/legacy_converters.py
def __init__(
    self,
    *,
    legacy_folder_path: str | Path = horde_model_reference_paths.legacy_path,
    target_file_folder: str | Path = horde_model_reference_paths.base_path,
    log_folder: str | Path = horde_model_reference_paths.log_folder,
    model_reference_category: MODEL_REFERENCE_CATEGORY,
    debug_mode: bool = False,
    dry_run: bool = False,
) -> None:
    """Set up a converter for one model reference category.

    Args:
        legacy_folder_path: The legacy database folder.
        target_file_folder: The folder to write the converted database to.
        log_folder: The folder to write the log files to.
        model_reference_category: The category of model reference to convert.
        debug_mode: If true, include extra information in the error log.
        dry_run: If true, don't write out the converted database or any logs.

    """
    self._initialize()

    # Categories with a dedicated legacy model; everything else is validated
    # with the generic legacy record.
    dedicated_record_types = {
        MODEL_REFERENCE_CATEGORY.image_generation: LegacyStableDiffusionRecord,
        MODEL_REFERENCE_CATEGORY.clip: LegacyClipRecord,
        MODEL_REFERENCE_CATEGORY.text_generation: LegacyTextGenerationRecord,
    }
    self.model_reference_category = model_reference_category
    self.model_reference_type = dedicated_record_types.get(model_reference_category, LegacyGenericRecord)

    legacy_base = path_consts.normalize_legacy_base_path(legacy_folder_path)
    target_base = path_consts.normalize_legacy_base_path(target_file_folder)

    # Where the legacy database is read from.
    self.legacy_folder_path = legacy_base / path_consts.LEGACY_REFERENCE_FOLDER_NAME
    self.legacy_database_path = horde_model_reference_paths.get_legacy_model_reference_file_path(
        model_reference_category=model_reference_category,
        base_path=legacy_base,
    )

    # Where the converted database is written to.
    self.converted_folder_path = Path(target_base)
    self.converted_database_file_path = horde_model_reference_paths.get_model_reference_file_path(
        model_reference_category=model_reference_category,
        base_path=target_base,
    )

    self.debug_mode = debug_mode
    self.log_folder = Path(log_folder)
    self.dry_run = dry_run

_initialize

_initialize() -> None

Initialize the converter, allowing re-conversion if applicable.

Source code in src/horde_model_reference/legacy/classes/legacy_converters.py
def _initialize(self) -> None:
    """Reset every piece of per-conversion state so a conversion can be (re)run."""
    # Mark the converter as not (yet) converted before clearing the collections.
    self.converted_successfully = False

    # Each collection gets its own fresh dict; they must never share an object.
    self._host_counter = {}
    self.all_validation_errors_log = {}
    self._all_converted_records = {}
    self._all_legacy_records = {}

convert_to_new_format

convert_to_new_format() -> dict[str, GenericModelRecord]

Convert the legacy model reference to the new format.

Returns:

  • dict[str, GenericModelRecord] –

    The converted model records in the new format.

Source code in src/horde_model_reference/legacy/classes/legacy_converters.py
def convert_to_new_format(self) -> dict[str, GenericModelRecord]:
    """Run the whole legacy-to-new conversion pipeline.

    If the previous run completed successfully, its state is cleared first.

    Returns:
        The converted model records in the new format.

    """
    if self.converted_successfully:
        self._initialize()

    # The stages run strictly in this order; each is looked up on the instance
    # only when its turn comes.
    stage_names = (
        "pre_parse_records",
        "_load_and_validate_legacy_records",
        "_convert_legacy_to_new_format",
        "post_parse_records",
        "write_out_validation_errors",
        "write_out_records",
    )
    for stage_name in stage_names:
        getattr(self, stage_name)()

    # Only reached when no stage raised.
    self.converted_successfully = True

    return self._all_converted_records

_load_and_validate_legacy_records

_load_and_validate_legacy_records() -> None

Load and validate all legacy records using Pydantic models.

Source code in src/horde_model_reference/legacy/classes/legacy_converters.py
def _load_and_validate_legacy_records(self) -> None:
    """Load and validate all legacy records using Pydantic models.

    A missing or zero-byte legacy database is not an error: the method logs a
    debug message and returns without loading anything. Otherwise every
    top-level entry of the JSON file is validated with ``model_reference_type``
    and stored in ``_all_legacy_records``. Issues the model appends to the
    validation context are copied into the validation error log.

    Raises:
        ValidationError: If any record fails validation. The failure is added to
            the validation error log before being re-raised.

    """
    # Check if file exists and is not empty
    if not self.legacy_database_path.exists():
        logger.debug(f"Legacy database file {self.legacy_database_path} does not exist, skipping conversion")
        return

    file_size = self.legacy_database_path.stat().st_size
    if file_size == 0:
        logger.debug(f"Legacy database file {self.legacy_database_path} is empty, skipping conversion")
        return

    # JSON is UTF-8 by specification; without an explicit encoding the platform's
    # locale encoding would be used and non-ASCII content could fail to load.
    with open(self.legacy_database_path, encoding="utf-8") as legacy_model_reference_file:
        raw_legacy_json_data: dict[str, dict[str, Any]] = json.load(legacy_model_reference_file)

    for model_record_key, model_record_contents in raw_legacy_json_data.items():
        # Handed to the model through the validation context and read back afterwards.
        issues: list[str] = []
        validation_context = {
            "issues": issues,
            "model_key": model_record_key,
            "debug_mode": self.debug_mode,
            "category": self.model_reference_category,
            "host_counter": self._host_counter,
        }

        # Add existing showcase files to context for stable diffusion
        if hasattr(self, "existing_showcase_files"):
            validation_context["existing_showcase_files"] = self.existing_showcase_files

        try:
            legacy_record = self.model_reference_type.model_validate(
                model_record_contents,
                context=validation_context,
            )
            self._all_legacy_records[model_record_key] = legacy_record

            for issue in issues:
                self.add_validation_error_to_log(model_record_key=model_record_key, error=issue)
        except ValidationError as e:
            error = f"CRITICAL: Error parsing {model_record_key}:\n{e}"
            self.add_validation_error_to_log(model_record_key=model_record_key, error=error)
            raise

_convert_legacy_to_new_format

_convert_legacy_to_new_format() -> None

Convert validated legacy records to the new format.

Source code in src/horde_model_reference/legacy/classes/legacy_converters.py
def _convert_legacy_to_new_format(self) -> None:
    """Turn every validated legacy record into a new-format record.

    Records that convert to ``None`` are logged and left out of the result.
    Any exception is logged against the record and then re-raised, which stops
    the conversion.
    """
    for record_key, record in self._all_legacy_records.items():
        try:
            new_record = self._convert_single_record(record)
            if new_record is not None:
                self._all_converted_records[record_key] = new_record
                continue

            # The converter declined to produce a record for this entry.
            self.add_validation_error_to_log(
                model_record_key=record_key,
                error="Failed to convert legacy record to new format",
            )
        except Exception as exc:
            self.add_validation_error_to_log(
                model_record_key=record_key,
                error=f"Failed to convert {record_key}: {exc}",
            )
            raise

_convert_model_record_config

_convert_model_record_config(
    legacy_record: LegacyGenericRecord,
) -> GenericModelRecordConfig

Convert the config section of a legacy record to the new format.

Source code in src/horde_model_reference/legacy/classes/legacy_converters.py
def _convert_model_record_config(self, legacy_record: LegacyGenericRecord) -> GenericModelRecordConfig:
    """Convert the config section of a legacy record to the new format.

    One ``DownloadRecord`` is created per entry in ``config.files`` (entries with
    an empty path or a path containing "yaml" are skipped); the URLs are then
    filled in from ``config.download`` by matching file names.

    A download is flagged as known-slow when one of the slow host substrings
    occurs in the file path or in the download URL.

    Args:
        legacy_record: The validated legacy record whose config is converted.

    Returns:
        The new-format config holding the download records.

    Raises:
        ValueError: If a download entry names a file with no download record.

    """
    download_records: dict[str, DownloadRecord] = {}

    for file_entry in legacy_record.config.files:
        if not file_entry.path:
            continue
        lowered_path = file_entry.path.lower()
        if "yaml" in lowered_path:
            continue

        download_records[file_entry.path] = DownloadRecord(
            file_name=file_entry.path,
            file_url="",
            sha256sum=file_entry.sha256sum if file_entry.sha256sum else "FIXME",
            file_purpose=file_entry.file_type,
            known_slow_download=any(slow_host in lowered_path for slow_host in _SLOW_DOWNLOAD_HOST_SUBSTRINGS),
        )

    for download_entry in legacy_record.config.download:
        if download_entry.file_name not in download_records:
            raise ValueError(f"Unknown download entry: {download_entry.file_name}")

        download_record = download_records[download_entry.file_name]
        file_url = download_entry.file_url or ""
        download_record.file_url = file_url

        # The slow-download markers are host substrings, so they must be matched
        # against the download URL (only known here), not just the file name.
        lowered_url = str(file_url).lower()
        if any(slow_host in lowered_url for slow_host in _SLOW_DOWNLOAD_HOST_SUBSTRINGS):
            download_record.known_slow_download = True

    return GenericModelRecordConfig(download=list(download_records.values()))

_convert_single_record

_convert_single_record(
    legacy_record: LegacyGenericRecord,
) -> GenericModelRecord | None

Convert a single legacy record to the new format.

Override this in subclasses for category-specific conversion.

Source code in src/horde_model_reference/legacy/classes/legacy_converters.py
def _convert_single_record(
    self,
    legacy_record: LegacyGenericRecord,
) -> GenericModelRecord | None:
    """Build the new-format record for one validated legacy record.

    Subclasses override this for category-specific conversion.

    Args:
        legacy_record: The validated legacy record to convert.

    Returns:
        The converted record. This base implementation always returns a record;
        the ``None`` in the signature is for overrides.

    """
    category = self.model_reference_category
    converted_config = self._convert_model_record_config(legacy_record)

    # The concrete record class is chosen per category.
    new_record_type = MODEL_RECORD_TYPE_LOOKUP[category]

    new_record = new_record_type(
        record_type=category,
        name=legacy_record.name,
        description=legacy_record.description,
        version=legacy_record.version,
        config=converted_config,
        model_classification=MODEL_CLASSIFICATION_LOOKUP[category],
    )
    return new_record

_convert_single_record_to_legacy

_convert_single_record_to_legacy(
    v2_record: GenericModelRecord,
) -> LegacyGenericRecord

Convert a single v2 record back to legacy format.

This is a stub for future implementation. Override this in subclasses for category-specific conversion.

Parameters:

  • v2_record (GenericModelRecord) –

    The v2 format record to convert back to legacy format.

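Returns:

  • LegacyGenericRecord –

    The legacy format record.

Raises:

  • NotImplementedError –

    This conversion is not yet implemented.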

Source code in src/horde_model_reference/legacy/classes/legacy_converters.py
def _convert_single_record_to_legacy(
    self,
    v2_record: GenericModelRecord,
) -> LegacyGenericRecord:
    """Placeholder for converting one v2 record back to the legacy format.

    Not implemented yet; subclasses are expected to override this with the
    category-specific conversion.

    Args:
        v2_record: The v2 format record to convert back to legacy format.

    Returns:
        The legacy format record.

    Raises:
        NotImplementedError: Always, because the conversion is not yet implemented.

    """
    message = (
        "v2 → legacy conversion is not yet implemented. "
        "This feature is planned for a future release. "
        "For now, only legacy → v2 conversion is supported."
    )
    raise NotImplementedError(message)

convert_from_v2_to_legacy

convert_from_v2_to_legacy(
    v2_records: dict[str, GenericModelRecord],
) -> dict[str, LegacyGenericRecord]

Convert all v2 records back to legacy format.

This is a stub for future implementation.

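Parameters:

  • v2_records (dict[str, GenericModelRecord]) –

    Dictionary of v2 format records to convert.

Returns:

  • dict[str, LegacyGenericRecord] –

    Dictionary of legacy format records.

Raises:

  • NotImplementedError –

    This conversion is not yet implemented.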

Source code in src/horde_model_reference/legacy/classes/legacy_converters.py
def convert_from_v2_to_legacy(
    self,
    v2_records: dict[str, GenericModelRecord],
) -> dict[str, LegacyGenericRecord]:
    """Placeholder for converting a set of v2 records back to the legacy format.

    Not implemented yet. The raised error reports how many records were passed
    in and for which category.

    Args:
        v2_records: Dictionary of v2 format records to convert.

    Returns:
        Dictionary of legacy format records.

    Raises:
        NotImplementedError: Always, because the conversion is not yet implemented.

    """
    record_count = len(v2_records)
    message = (
        "v2 → legacy conversion is not yet implemented. "
        "This feature is planned for a future release. "
        "For now, only legacy → v2 conversion is supported. "
        f"Attempted to convert {record_count} records from category: {self.model_reference_category}"
    )
    raise NotImplementedError(message)

pre_parse_records

pre_parse_records() -> None

Override to perform category-specific pre-parsing.

Source code in src/horde_model_reference/legacy/classes/legacy_converters.py
def pre_parse_records(self) -> None:
    """Hook for category-specific work before parsing; does nothing by default."""
    return None

post_parse_records

post_parse_records() -> None

Override to perform category-specific post-parsing.

Source code in src/horde_model_reference/legacy/classes/legacy_converters.py
def post_parse_records(self) -> None:
    """Hook for category-specific work after parsing; does nothing by default."""
    return None

write_out_records

write_out_records() -> None

Write out the converted records.

Source code in src/horde_model_reference/legacy/classes/legacy_converters.py
def write_out_records(self) -> None:
    """Write out the converted records.

    Nothing is written in dry-run mode, or when the target file already holds
    exactly the serialized content (so its mtime is preserved). Otherwise the
    content is written to a temporary sibling file which then replaces the target.

    Raises:
        OSError: If the converted database cannot be written.

    """
    if self.dry_run:
        return
    # Serialize the converted records to a canonical JSON string first so we can
    # compare with the on-disk file and avoid rewriting (which changes mtime).
    final_serialized = json.dumps(
        self._all_converted_records,
        indent=4,
        default=lambda o: o.model_dump(
            exclude_none=True,
            exclude_unset=False,
            by_alias=True,
        ),
    )
    # keep trailing newline for consistency with other writers
    final_serialized = final_serialized + "\n"

    target_path = Path(self.converted_database_file_path)
    target_path.parent.mkdir(parents=True, exist_ok=True)

    # If the file already exists and the content is identical, skip writing to
    # preserve the existing mtime and avoid invalidating caches that rely on it.
    try:
        unchanged = target_path.exists() and target_path.read_text(encoding="utf-8") == final_serialized
    except (OSError, ValueError):
        # Best effort only: an unreadable or undecodable existing file is simply
        # overwritten (UnicodeDecodeError is a ValueError).
        unchanged = False

    if unchanged:
        logger.debug(f"No change to converted file {target_path}, skipping write.")
        return

    # Write atomically: write to a temporary file in the same directory and replace.
    tmp_path = target_path.with_suffix(target_path.suffix + ".tmp")
    try:
        tmp_path.write_text(final_serialized, encoding="utf-8")
        tmp_path.replace(target_path)
    except OSError:
        # Do not leave a half-written temporary file next to the database.
        tmp_path.unlink(missing_ok=True)
        raise

get_records

get_records() -> dict[str, GenericModelRecord]

Return the converted records.

Source code in src/horde_model_reference/legacy/classes/legacy_converters.py
def get_records(self) -> dict[str, GenericModelRecord]:
    """Give access to the records converted so far (the live dict, not a copy)."""
    converted = self._all_converted_records
    return converted

add_validation_error_to_log

add_validation_error_to_log(
    *, model_record_key: str, error: str
) -> None

Add a validation error to the log.

Source code in src/horde_model_reference/legacy/classes/legacy_converters.py
def add_validation_error_to_log(
    self,
    *,
    model_record_key: str,
    error: str,
) -> None:
    """Record a validation error against a model record.

    Args:
        model_record_key: Key of the record the error belongs to.
        error: The error message to record.

    """
    # Errors are grouped per record key, in the order they were reported.
    errors_for_record = self.all_validation_errors_log.setdefault(model_record_key, [])
    errors_for_record.append(error)

    if not self.debug_mode:
        return
    logger.debug(f"{model_record_key} has error: {error}")

write_out_validation_errors

write_out_validation_errors() -> None

Write out the validation errors.

Source code in src/horde_model_reference/legacy/classes/legacy_converters.py
def write_out_validation_errors(self) -> None:
    """Dump the collected validation errors as indented JSON into the log folder.

    Nothing is written during a dry run or when debug mode is off. The file is
    named after the model reference category with a ``.log`` suffix, and the
    log folder is created if it does not exist yet.
    """
    should_write = self.debug_mode and not self.dry_run
    if not should_write:
        return

    destination = self.log_folder.joinpath(self.model_reference_category + ".log")
    destination.parent.mkdir(parents=True, exist_ok=True)

    with open(destination, "w") as error_log_handle:
        error_log_handle.write(json.dumps(self.all_validation_errors_log, indent=4))

LegacyStableDiffusionConverter

Bases: BaseLegacyConverter

Converter for legacy Stable Diffusion model reference records.

Source code in src/horde_model_reference/legacy/classes/legacy_converters.py
class LegacyStableDiffusionConverter(BaseLegacyConverter):
    """Converter for legacy Stable Diffusion model reference records.

    Besides the base conversion, this converter counts the baselines, styles and
    tags it encounters and cross-checks the on-disk showcase folders against the
    converted model records.
    """

    # Glob pattern used to discover showcase folders (a relative pattern).
    showcase_glob_pattern: str = "horde_model_reference/showcase/*"
    # Occurrence counts of each baseline / style / tag seen while converting.
    all_baseline_categories: dict[str, int]
    all_styles: dict[str, int]
    all_tags: dict[str, int]
    # Showcase files found on disk, keyed by showcase folder name.
    existing_showcase_files: dict[str, list[str]]

    def __init__(
        self,
        *,
        legacy_folder_path: str | Path = horde_model_reference_paths.legacy_path,
        target_file_folder: str | Path = horde_model_reference_paths.base_path,
        debug_mode: bool = False,
    ) -> None:
        """Initialize the Stable Diffusion converter.

        Args:
            legacy_folder_path: The legacy database folder.
            target_file_folder: The folder to write the converted database to.
            debug_mode: Forwarded to the base converter's ``debug_mode`` option.

        """
        super().__init__(
            legacy_folder_path=legacy_folder_path,
            target_file_folder=target_file_folder,
            model_reference_category=MODEL_REFERENCE_CATEGORY.image_generation,
            debug_mode=debug_mode,
        )
        self._sd_initialize()

    def _sd_initialize(self) -> None:
        """Initialize SD-specific tracking dictionaries."""
        self.all_baseline_categories = {}
        self.all_styles = {}
        self.all_tags = {}
        self.existing_showcase_files = {}

    @override
    def pre_parse_records(self) -> None:
        """Scan the showcase glob pattern and cache the showcase files found on disk."""
        existing_showcase_folders = glob.glob(self.showcase_glob_pattern, recursive=True)
        self.existing_showcase_files = self.get_existing_showcases(existing_showcase_folders)

    @override
    def _convert_single_record(
        self,
        legacy_record: LegacyGenericRecord,
    ) -> ImageGenerationModelRecord:
        """Convert a single legacy Stable Diffusion record to the new format.

        The record's baseline, style and tags are also added to the converter's
        occurrence counters.

        Args:
            legacy_record: The validated legacy record to convert.

        Returns:
            The equivalent image generation record in the new format.

        Raises:
            TypeError: If ``legacy_record`` is not a ``LegacyStableDiffusionRecord``.

        """
        if not isinstance(legacy_record, LegacyStableDiffusionRecord):
            raise TypeError(f"Expected {legacy_record.name} to be a LegacyStableDiffusionRecord.")

        if legacy_record.baseline:
            self.all_baseline_categories[legacy_record.baseline] = (
                self.all_baseline_categories.get(legacy_record.baseline, 0) + 1
            )
        if legacy_record.style:
            self.all_styles[legacy_record.style] = self.all_styles.get(legacy_record.style, 0) + 1
        if legacy_record.tags:
            for tag in legacy_record.tags:
                self.all_tags[tag] = self.all_tags.get(tag, 0) + 1

        model_record_config = self._convert_model_record_config(legacy_record)

        return ImageGenerationModelRecord(
            name=legacy_record.name,
            description=legacy_record.description,
            version=legacy_record.version,
            config=model_record_config,
            inpainting=legacy_record.inpainting,
            baseline=legacy_record.baseline,
            optimization=legacy_record.optimization,
            tags=legacy_record.tags or [],
            showcases=legacy_record.showcases or [],
            min_bridge_version=legacy_record.min_bridge_version,
            trigger=legacy_record.trigger or [],
            homepage=legacy_record.homepage,
            nsfw=legacy_record.nsfw if legacy_record.nsfw is not None else False,
            style=legacy_record.style,
            requirements=legacy_record.requirements,
            size_on_disk_bytes=legacy_record.size_on_disk_bytes,
            model_classification=MODEL_CLASSIFICATION_LOOKUP[self.model_reference_category],
        )

    @override
    def post_parse_records(self) -> None:
        """Run the base post-processing, then audit the on-disk showcase folders.

        Showcase folders that are empty, or whose name does not correspond to
        any converted model record, are added to the validation error log. In
        debug mode the collected statistics are logged as well.
        """
        super().post_parse_records()

        # Create showcase folder
        # for model_key in self._all_converted_records:
        #     expected_showcase_foldername = model_name_to_showcase_folder_name(model_key)
        #     self.create_showcase_folder(expected_showcase_foldername)

        # Keep directories only. Testing is_dir() (instead of merely skipping regular
        # files) also leaves out entries such as broken symlinks, on which iterdir()
        # would raise.
        on_disk_showcase_folders = [
            candidate
            for candidate in map(Path, glob.glob(self.showcase_glob_pattern, recursive=True))
            if candidate.is_dir()
        ]

        for showcase_folder in on_disk_showcase_folders:
            if not any(showcase_folder.iterdir()):
                error = f"showcase folder '{showcase_folder.name}' is empty."
                self.add_validation_error_to_log(model_record_key=showcase_folder.name, error=error)

        final_on_disk_showcase_folders_names = [
            showcase_folder.name for showcase_folder in on_disk_showcase_folders
        ]
        # A set gives constant-time membership tests in the loop below.
        final_expected_showcase_folders = {
            model_name_to_showcase_folder_name(model_name) for model_name in self._all_converted_records
        }

        for folder in final_on_disk_showcase_folders_names:
            if folder not in final_expected_showcase_folders:
                error = f"folder '{folder}' is not in the model records."
                self.add_validation_error_to_log(model_record_key=folder, error=error)

        if self.debug_mode:
            logger.debug(f"{self.all_styles=}")
            logger.debug(f"{self.all_baseline_categories=}")
            logger.debug(f"{self.all_tags=}")
            logger.debug(f"{self._host_counter=}")
            logger.info(f"Total number of models: {len(self._all_converted_records)}")
            logger.info(f"Total number of showcase folders: {len(final_on_disk_showcase_folders_names)}")
            logger.info(f"Total number of models with validation issues: {len(self.all_validation_errors_log)}")

    @override
    def write_out_records(self) -> None:
        """Serialize the converted records to the converted database file.

        The write is skipped on a dry run, and also when the file already holds
        exactly the serialized content. Otherwise the content is written to a
        sibling ``.tmp`` file which then replaces the target.

        Raises:
            ValueError: If any converted record is not an ``ImageGenerationModelRecord``.

        """
        sanity_check: dict[str, ImageGenerationModelRecord] = {
            key: value
            for key, value in self._all_converted_records.items()
            if isinstance(value, ImageGenerationModelRecord)
        }
        if len(sanity_check) != len(self._all_converted_records):
            raise ValueError("CRITICAL: Not all records are of the correct type.")

        if self.dry_run:
            return

        final_converted_model_reference = json.dumps(
            self._all_converted_records,
            indent=4,
            default=lambda o: o.model_dump(
                exclude_none=True,
                exclude_unset=True,
                exclude_defaults=True,
                by_alias=True,
            ),
        )
        final_converted_model_reference = final_converted_model_reference + "\n"

        target_path = Path(self.converted_database_file_path)
        target_path.parent.mkdir(parents=True, exist_ok=True)

        # Skip the write when nothing changed so the existing file is left untouched.
        try:
            if target_path.exists():
                existing = target_path.read_text()
                if existing == final_converted_model_reference:
                    logger.debug(f"No change to converted file {target_path}, skipping write.")
                    return
        except (OSError, UnicodeError) as read_error:
            # An unreadable or undecodable existing file is simply overwritten below.
            logger.debug(f"Could not read existing converted file {target_path}, overwriting: {read_error}")

        # Write to a temporary file in the same directory, then swap it into place.
        tmp_path = target_path.with_suffix(target_path.suffix + ".tmp")
        tmp_path.write_text(final_converted_model_reference)
        tmp_path.replace(target_path)

        logger.debug(f"Converted database written to: {self.converted_database_file_path}")

    def get_existing_showcases(
        self,
        existing_showcase_folders: list[str],
    ) -> dict[str, list[str]]:
        """Return a dictionary of existing showcase files, keyed by showcase folder name.

        Args:
            existing_showcase_folders: Paths of the showcase folders to inspect.

        Returns:
            For each folder, the glob matches of ``<folder>/*`` keyed by the folder
            name passed through ``model_name_to_showcase_folder_name``.

        """
        existing_showcase_files: dict[str, list[str]] = {}
        for showcase_folder in existing_showcase_folders:
            model_showcase_files = glob.glob(str(Path(showcase_folder).joinpath("*")), recursive=True)
            model_showcase_folder_name = model_name_to_showcase_folder_name(Path(showcase_folder).name)

            existing_showcase_files[model_showcase_folder_name] = model_showcase_files

        return existing_showcase_files

    def create_showcase_folder(self, showcase_foldername: str) -> None:
        """Create a showcase folder with the given name.

        The folder is also registered (with no files) in ``existing_showcase_files``
        when it is not tracked yet.
        """
        if showcase_foldername not in self.existing_showcase_files:
            self.existing_showcase_files[showcase_foldername] = []

        new_folder = self.converted_folder_path.joinpath(path_consts.DEFAULT_SHOWCASE_FOLDER_NAME)
        new_folder = new_folder.joinpath(showcase_foldername)
        new_folder.mkdir(parents=True, exist_ok=True)

showcase_glob_pattern class-attribute instance-attribute

showcase_glob_pattern: str = (
    "horde_model_reference/showcase/*"
)

all_baseline_categories instance-attribute

all_baseline_categories: dict[str, int]

all_styles instance-attribute

all_styles: dict[str, int]

all_tags instance-attribute

all_tags: dict[str, int]

existing_showcase_files instance-attribute

existing_showcase_files: dict[str, list[str]]

legacy_folder_path instance-attribute

legacy_folder_path: Path = (
    normalized_legacy_base / LEGACY_REFERENCE_FOLDER_NAME
)

legacy_database_path instance-attribute

legacy_database_path: Path = (
    get_legacy_model_reference_file_path(
        model_reference_category=model_reference_category,
        base_path=normalized_legacy_base,
    )
)

converted_folder_path instance-attribute

converted_folder_path: Path = Path(normalized_target_base)

converted_database_file_path instance-attribute

converted_database_file_path: Path = (
    get_model_reference_file_path(
        model_reference_category=model_reference_category,
        base_path=normalized_target_base,
    )
)

model_reference_category instance-attribute

model_reference_category: MODEL_REFERENCE_CATEGORY = (
    model_reference_category
)

model_reference_type instance-attribute

model_reference_type: type[LegacyGenericRecord] = (
    LegacyGenericRecord
)

_all_legacy_records instance-attribute

_all_legacy_records: dict[str, LegacyGenericRecord]

All validated legacy model records.

_all_converted_records instance-attribute

_all_converted_records: dict[str, GenericModelRecord]

All converted model records in the new format.

all_validation_errors_log instance-attribute

all_validation_errors_log: dict[str, list[str]]

All validation errors that occurred during conversion.

_host_counter instance-attribute

_host_counter: dict[str, int]

Counter for tracking download hosts across all records.

debug_mode class-attribute instance-attribute

debug_mode: bool = debug_mode

log_folder instance-attribute

log_folder: Path = Path(log_folder)

dry_run class-attribute instance-attribute

dry_run: bool = dry_run

converted_successfully class-attribute instance-attribute

converted_successfully: bool = False

__init__

__init__(
    *,
    legacy_folder_path: str
    | Path = horde_model_reference_paths.legacy_path,
    target_file_folder: str
    | Path = horde_model_reference_paths.base_path,
    debug_mode: bool = False,
) -> None

Initialize the Stable Diffusion converter.

Source code in src/horde_model_reference/legacy/classes/legacy_converters.py
def __init__(
    self,
    *,
    legacy_folder_path: str | Path = horde_model_reference_paths.legacy_path,
    target_file_folder: str | Path = horde_model_reference_paths.base_path,
    debug_mode: bool = False,
) -> None:
    """Set up a converter bound to the image generation category.

    Args:
        legacy_folder_path: The legacy database folder.
        target_file_folder: The folder to write the converted database to.
        debug_mode: Forwarded to the base converter's ``debug_mode`` option.

    """
    # The category is fixed for this converter; everything else is passed through.
    super().__init__(
        model_reference_category=MODEL_REFERENCE_CATEGORY.image_generation,
        legacy_folder_path=legacy_folder_path,
        target_file_folder=target_file_folder,
        debug_mode=debug_mode,
    )
    # Prepare the SD-specific tracking dictionaries.
    self._sd_initialize()

_sd_initialize

_sd_initialize() -> None

Initialize SD-specific tracking dictionaries.

Source code in src/horde_model_reference/legacy/classes/legacy_converters.py
def _sd_initialize(self) -> None:
    """Reset the Stable Diffusion specific bookkeeping to empty dictionaries."""
    # Showcase files discovered on disk, keyed by showcase folder name.
    self.existing_showcase_files = {}
    # Occurrence counters for tags, styles and baselines.
    self.all_tags = {}
    self.all_styles = {}
    self.all_baseline_categories = {}

pre_parse_records

pre_parse_records() -> None
Source code in src/horde_model_reference/legacy/classes/legacy_converters.py
@override
def pre_parse_records(self) -> None:
    existing_showcase_folders = glob.glob(self.showcase_glob_pattern, recursive=True)
    self.existing_showcase_files = self.get_existing_showcases(existing_showcase_folders)

_convert_single_record

_convert_single_record(
    legacy_record: LegacyGenericRecord,
) -> ImageGenerationModelRecord

Convert a single legacy Stable Diffusion record to the new format.

Source code in src/horde_model_reference/legacy/classes/legacy_converters.py
@override
def _convert_single_record(
    self,
    legacy_record: LegacyGenericRecord,
) -> ImageGenerationModelRecord:
    """Build the new-format image generation record for one legacy SD record.

    The record's baseline, style and tags are also added to the converter's
    occurrence counters.

    Args:
        legacy_record: The validated legacy record to convert.

    Returns:
        The equivalent image generation record in the new format.

    Raises:
        TypeError: If ``legacy_record`` is not a ``LegacyStableDiffusionRecord``.

    """
    if not isinstance(legacy_record, LegacyStableDiffusionRecord):
        raise TypeError(f"Expected {legacy_record.name} to be a LegacyStableDiffusionRecord.")

    # Tally what this record contributes to the per-conversion statistics.
    baseline = legacy_record.baseline
    if baseline:
        self.all_baseline_categories[baseline] = self.all_baseline_categories.get(baseline, 0) + 1

    style = legacy_record.style
    if style:
        self.all_styles[style] = self.all_styles.get(style, 0) + 1

    for tag in legacy_record.tags or ():
        self.all_tags[tag] = self.all_tags.get(tag, 0) + 1

    # A missing nsfw flag is treated as "not nsfw".
    is_nsfw = False if legacy_record.nsfw is None else legacy_record.nsfw
    classification = MODEL_CLASSIFICATION_LOOKUP[self.model_reference_category]
    converted_config = self._convert_model_record_config(legacy_record)

    return ImageGenerationModelRecord(
        name=legacy_record.name,
        description=legacy_record.description,
        version=legacy_record.version,
        config=converted_config,
        inpainting=legacy_record.inpainting,
        baseline=baseline,
        optimization=legacy_record.optimization,
        tags=legacy_record.tags or [],
        showcases=legacy_record.showcases or [],
        min_bridge_version=legacy_record.min_bridge_version,
        trigger=legacy_record.trigger or [],
        homepage=legacy_record.homepage,
        nsfw=is_nsfw,
        style=style,
        requirements=legacy_record.requirements,
        size_on_disk_bytes=legacy_record.size_on_disk_bytes,
        model_classification=classification,
    )

post_parse_records

post_parse_records() -> None
Source code in src/horde_model_reference/legacy/classes/legacy_converters.py
@override
def post_parse_records(self) -> None:
    """Run the base post-processing, then audit the on-disk showcase folders.

    Showcase folders that are empty, or whose name does not correspond to any
    converted model record, are added to the validation error log. In debug
    mode the collected statistics are logged as well.
    """
    super().post_parse_records()

    # Create showcase folder
    # for model_key in self._all_converted_records:
    #     expected_showcase_foldername = model_name_to_showcase_folder_name(model_key)
    #     self.create_showcase_folder(expected_showcase_foldername)

    # Keep directories only. Testing is_dir() (instead of merely skipping regular
    # files) also leaves out entries such as broken symlinks, on which iterdir()
    # would raise.
    on_disk_showcase_folders = [
        candidate
        for candidate in map(Path, glob.glob(self.showcase_glob_pattern, recursive=True))
        if candidate.is_dir()
    ]

    for showcase_folder in on_disk_showcase_folders:
        if not any(showcase_folder.iterdir()):
            error = f"showcase folder '{showcase_folder.name}' is empty."
            self.add_validation_error_to_log(model_record_key=showcase_folder.name, error=error)

    final_on_disk_showcase_folders_names = [
        showcase_folder.name for showcase_folder in on_disk_showcase_folders
    ]
    # A set gives constant-time membership tests in the loop below.
    final_expected_showcase_folders = {
        model_name_to_showcase_folder_name(model_name) for model_name in self._all_converted_records
    }

    for folder in final_on_disk_showcase_folders_names:
        if folder not in final_expected_showcase_folders:
            error = f"folder '{folder}' is not in the model records."
            self.add_validation_error_to_log(model_record_key=folder, error=error)

    if self.debug_mode:
        logger.debug(f"{self.all_styles=}")
        logger.debug(f"{self.all_baseline_categories=}")
        logger.debug(f"{self.all_tags=}")
        logger.debug(f"{self._host_counter=}")
        logger.info(f"Total number of models: {len(self._all_converted_records)}")
        logger.info(f"Total number of showcase folders: {len(final_on_disk_showcase_folders_names)}")
        logger.info(f"Total number of models with validation issues: {len(self.all_validation_errors_log)}")

write_out_records

write_out_records() -> None
Source code in src/horde_model_reference/legacy/classes/legacy_converters.py
@override
def write_out_records(self) -> None:
    """Serialize the converted records to the converted database file.

    The write is skipped on a dry run, and also when the file already holds
    exactly the serialized content. Otherwise the content is written to a
    sibling ``.tmp`` file which then replaces the target.

    Raises:
        ValueError: If any converted record is not an ``ImageGenerationModelRecord``.

    """
    sanity_check: dict[str, ImageGenerationModelRecord] = {
        key: value
        for key, value in self._all_converted_records.items()
        if isinstance(value, ImageGenerationModelRecord)
    }
    if len(sanity_check) != len(self._all_converted_records):
        raise ValueError("CRITICAL: Not all records are of the correct type.")

    if self.dry_run:
        return

    final_converted_model_reference = json.dumps(
        self._all_converted_records,
        indent=4,
        default=lambda o: o.model_dump(
            exclude_none=True,
            exclude_unset=True,
            exclude_defaults=True,
            by_alias=True,
        ),
    )
    final_converted_model_reference = final_converted_model_reference + "\n"

    target_path = Path(self.converted_database_file_path)
    target_path.parent.mkdir(parents=True, exist_ok=True)

    # Skip the write when nothing changed so the existing file is left untouched.
    try:
        if target_path.exists():
            existing = target_path.read_text()
            if existing == final_converted_model_reference:
                logger.debug(f"No change to converted file {target_path}, skipping write.")
                return
    except (OSError, UnicodeError) as read_error:
        # An unreadable or undecodable existing file is simply overwritten below.
        logger.debug(f"Could not read existing converted file {target_path}, overwriting: {read_error}")

    # Write to a temporary file in the same directory, then swap it into place.
    tmp_path = target_path.with_suffix(target_path.suffix + ".tmp")
    tmp_path.write_text(final_converted_model_reference)
    tmp_path.replace(target_path)

    logger.debug(f"Converted database written to: {self.converted_database_file_path}")

get_existing_showcases

get_existing_showcases(
    existing_showcase_folders: list[str],
) -> dict[str, list[str]]

Return a dictionary of existing showcase files, keyed by showcase folder name.

Source code in src/horde_model_reference/legacy/classes/legacy_converters.py
def get_existing_showcases(
    self,
    existing_showcase_folders: list[str],
) -> dict[str, list[str]]:
    """Map each showcase folder to the paths found directly inside it.

    Args:
        existing_showcase_folders: Paths of the showcase folders to inspect.

    Returns:
        For each folder, the glob matches of ``<folder>/*`` keyed by the folder
        name passed through ``model_name_to_showcase_folder_name``.

    """
    return {
        model_name_to_showcase_folder_name(Path(folder).name): glob.glob(
            str(Path(folder).joinpath("*")),
            recursive=True,
        )
        for folder in existing_showcase_folders
    }

create_showcase_folder

create_showcase_folder(showcase_foldername: str) -> None

Create a showcase folder with the given name.

Source code in src/horde_model_reference/legacy/classes/legacy_converters.py
def create_showcase_folder(self, showcase_foldername: str) -> None:
    """Ensure a showcase folder with the given name exists on disk and is tracked.

    An untracked folder is registered with an empty file list; an already
    tracked one keeps its existing entry.
    """
    self.existing_showcase_files.setdefault(showcase_foldername, [])

    showcase_root = self.converted_folder_path.joinpath(path_consts.DEFAULT_SHOWCASE_FOLDER_NAME)
    showcase_root.joinpath(showcase_foldername).mkdir(parents=True, exist_ok=True)

_initialize

_initialize() -> None

Initialize the converter, allowing re-conversion if applicable.

Source code in src/horde_model_reference/legacy/classes/legacy_converters.py
def _initialize(self) -> None:
    """Reset all per-conversion state so the converter can be run again."""
    # Mark the converter as not (yet) converted before clearing its results.
    self.converted_successfully = False
    self._host_counter = {}
    self.all_validation_errors_log = {}
    self._all_converted_records = {}
    self._all_legacy_records = {}

convert_to_new_format

convert_to_new_format() -> dict[str, GenericModelRecord]

Convert the legacy model reference to the new format.

Returns:

  • dict[str, GenericModelRecord] –

    The converted model records in the new format.

Source code in src/horde_model_reference/legacy/classes/legacy_converters.py
def convert_to_new_format(self) -> dict[str, GenericModelRecord]:
    """Run the full legacy-to-new-format conversion pipeline.

    If a previous run completed successfully, its state is cleared first so
    the conversion starts from scratch.

    Returns:
        The converted model records in the new format.

    """
    if self.converted_successfully:
        self._initialize()

    # The stages run strictly in this order; any exception aborts the pipeline
    # before the success flag is set.
    pipeline = (
        self.pre_parse_records,
        self._load_and_validate_legacy_records,
        self._convert_legacy_to_new_format,
        self.post_parse_records,
        self.write_out_validation_errors,
        self.write_out_records,
    )
    for stage in pipeline:
        stage()

    self.converted_successfully = True
    return self._all_converted_records

_load_and_validate_legacy_records

_load_and_validate_legacy_records() -> None

Load and validate all legacy records using Pydantic models.

Source code in src/horde_model_reference/legacy/classes/legacy_converters.py
def _load_and_validate_legacy_records(self) -> None:
    """Load and validate all legacy records using Pydantic models.

    A missing or empty legacy database file is not an error: a debug message is
    logged and no records are loaded. Every record is validated with
    ``self.model_reference_type``; non-fatal issues that validation appends to
    the context's ``issues`` list are copied into the validation error log.

    Raises:
        ValidationError: If a record fails validation. The failure is added to
            the validation error log before being re-raised.

    """
    # Check if file exists and is not empty
    if not self.legacy_database_path.exists():
        logger.debug(f"Legacy database file {self.legacy_database_path} does not exist, skipping conversion")
        return

    file_size = self.legacy_database_path.stat().st_size
    if file_size == 0:
        logger.debug(f"Legacy database file {self.legacy_database_path} is empty, skipping conversion")
        return

    # JSON text is UTF-8; decode it explicitly instead of relying on the
    # platform's locale encoding, which would garble non-ASCII characters.
    with open(self.legacy_database_path, encoding="utf-8") as legacy_model_reference_file:
        raw_legacy_json_data: dict[str, dict[str, Any]] = json.load(legacy_model_reference_file)

    for model_record_key, model_record_contents in raw_legacy_json_data.items():
        # Passed into validation through the context so issues can be reported back here.
        issues: list[str] = []
        validation_context = {
            "issues": issues,
            "model_key": model_record_key,
            "debug_mode": self.debug_mode,
            "category": self.model_reference_category,
            "host_counter": self._host_counter,
        }

        # Add existing showcase files to context for stable diffusion
        if hasattr(self, "existing_showcase_files"):
            validation_context["existing_showcase_files"] = self.existing_showcase_files

        try:
            legacy_record = self.model_reference_type.model_validate(
                model_record_contents,
                context=validation_context,
            )
            self._all_legacy_records[model_record_key] = legacy_record

            for issue in issues:
                self.add_validation_error_to_log(model_record_key=model_record_key, error=issue)
        except ValidationError as e:
            error = f"CRITICAL: Error parsing {model_record_key}:\n{e}"
            self.add_validation_error_to_log(model_record_key=model_record_key, error=error)
            raise

_convert_legacy_to_new_format

_convert_legacy_to_new_format() -> None

Convert validated legacy records to the new format.

Source code in src/horde_model_reference/legacy/classes/legacy_converters.py
def _convert_legacy_to_new_format(self) -> None:
    """Convert every validated legacy record and store the results by model key.

    A record whose conversion yields ``None`` is noted in the validation error
    log and skipped. Any exception raised while converting is logged under the
    record's key and then re-raised.
    """
    for model_key, legacy_record in self._all_legacy_records.items():
        try:
            converted_record = self._convert_single_record(legacy_record)
            if converted_record is not None:
                self._all_converted_records[model_key] = converted_record
                continue

            self.add_validation_error_to_log(
                model_record_key=model_key,
                error="Failed to convert legacy record to new format",
            )
        except Exception as conversion_error:
            failure_message = f"Failed to convert {model_key}: {conversion_error}"
            self.add_validation_error_to_log(model_record_key=model_key, error=failure_message)
            raise

_convert_model_record_config

_convert_model_record_config(
    legacy_record: LegacyGenericRecord,
) -> GenericModelRecordConfig

Convert the config section of a legacy record to the new format.

Source code in src/horde_model_reference/legacy/classes/legacy_converters.py
def _convert_model_record_config(self, legacy_record: LegacyGenericRecord) -> GenericModelRecordConfig:
    """Build the new-format config (download records) from a legacy record's config.

    One download record is created per legacy file entry that has a path not
    containing ``"yaml"``; the legacy download entries then supply the URLs.

    Args:
        legacy_record: The legacy record whose ``config`` section is converted.

    Returns:
        A config holding the download records in the order the file entries
        were first seen.

    Raises:
        ValueError: If a download entry names a file with no matching file entry.

    """
    records_by_file_name: dict[str, DownloadRecord] = {}

    for file_entry in legacy_record.config.files:
        file_path = file_entry.path
        # Entries without a path, and anything with "yaml" in its path, are not downloads.
        if not file_path or "yaml" in file_path.lower():
            continue

        # NOTE(review): the slow-host substrings are matched against the file path, not
        # the download URL (which is only filled in below) — confirm this is intended.
        lowered_path = file_path.lower()
        is_known_slow = any(slow_url in lowered_path for slow_url in _SLOW_DOWNLOAD_HOST_SUBSTRINGS)

        records_by_file_name[file_path] = DownloadRecord(
            file_name=file_path,
            file_url="",
            sha256sum=file_entry.sha256sum or "FIXME",
            file_purpose=file_entry.file_type,
            known_slow_download=is_known_slow,
        )

    for download_entry in legacy_record.config.download:
        matching_record = records_by_file_name.get(download_entry.file_name)
        if matching_record is None:
            raise ValueError(f"Unknown download entry: {download_entry.file_name}")
        matching_record.file_url = download_entry.file_url or ""

    return GenericModelRecordConfig(download=list(records_by_file_name.values()))

_convert_single_record_to_legacy

_convert_single_record_to_legacy(
    v2_record: GenericModelRecord,
) -> LegacyGenericRecord

Convert a single v2 record back to legacy format.

This is a stub for future implementation. Override this in subclasses for category-specific conversion.

Parameters:

  • v2_record (GenericModelRecord) –

    The v2 format record to convert back to legacy format.

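Returns:

  • LegacyGenericRecord –

    The legacy format record.

Raises:

  • NotImplementedError –

    This conversion is not yet implemented.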

Source code in src/horde_model_reference/legacy/classes/legacy_converters.py
def _convert_single_record_to_legacy(
    self,
    v2_record: GenericModelRecord,
) -> LegacyGenericRecord:
    """Convert one v2 record back into the legacy format (not implemented).

    Placeholder for a future feature; subclasses are expected to override it
    with category-specific conversion.

    Args:
        v2_record: The v2 format record to convert back to legacy format.

    Returns:
        The legacy format record.

    Raises:
        NotImplementedError: Always, since this conversion is not yet implemented.

    """
    not_supported_message = (
        "v2 → legacy conversion is not yet implemented. "
        "This feature is planned for a future release. "
        "For now, only legacy → v2 conversion is supported."
    )
    raise NotImplementedError(not_supported_message)

convert_from_v2_to_legacy

convert_from_v2_to_legacy(
    v2_records: dict[str, GenericModelRecord],
) -> dict[str, LegacyGenericRecord]

Convert all v2 records back to legacy format.

This is a stub for future implementation.

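Parameters:

  • v2_records (dict[str, GenericModelRecord]) –

    Dictionary of v2 format records to convert.

Returns:

  • dict[str, LegacyGenericRecord] –

    Dictionary of legacy format records.

Raises:

  • NotImplementedError –

    This conversion is not yet implemented.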

Source code in src/horde_model_reference/legacy/classes/legacy_converters.py
def convert_from_v2_to_legacy(
    self,
    v2_records: dict[str, GenericModelRecord],
) -> dict[str, LegacyGenericRecord]:
    """Convert a whole set of v2 records back into the legacy format (not implemented).

    Placeholder for a future feature. The raised error reports how many
    records were passed in and for which category.

    Args:
        v2_records: Dictionary of v2 format records to convert.

    Returns:
        Dictionary of legacy format records.

    Raises:
        NotImplementedError: Always, since this conversion is not yet implemented.

    """
    record_count = len(v2_records)
    category = self.model_reference_category
    not_supported_message = (
        "v2 → legacy conversion is not yet implemented. "
        "This feature is planned for a future release. "
        "For now, only legacy → v2 conversion is supported. "
        f"Attempted to convert {record_count} records from category: {category}"
    )
    raise NotImplementedError(not_supported_message)

get_records

get_records() -> dict[str, GenericModelRecord]

Return the converted records.

Source code in src/horde_model_reference/legacy/classes/legacy_converters.py
def get_records(self) -> dict[str, GenericModelRecord]:
    """Return the mapping of model keys to their converted (new-format) records.

    The converter's own dictionary is returned, not a copy.
    """
    converted_records = self._all_converted_records
    return converted_records

add_validation_error_to_log

add_validation_error_to_log(
    *, model_record_key: str, error: str
) -> None

Add a validation error to the log.

Source code in src/horde_model_reference/legacy/classes/legacy_converters.py
def add_validation_error_to_log(
    self,
    *,
    model_record_key: str,
    error: str,
) -> None:
    """Record a validation error message under the given key.

    Args:
        model_record_key: Key the error is filed under in the error log.
        error: Human-readable description of the problem.

    """
    # setdefault creates the per-key list on first use; later errors are appended to it.
    self.all_validation_errors_log.setdefault(model_record_key, []).append(error)

    if not self.debug_mode:
        return

    logger.debug(f"{model_record_key} has error: {error}")

write_out_validation_errors

write_out_validation_errors() -> None

Write out the validation errors.

Source code in src/horde_model_reference/legacy/classes/legacy_converters.py
def write_out_validation_errors(self) -> None:
    """Dump the collected validation errors as indented JSON into the log folder.

    Nothing is written during a dry run or when debug mode is off. The file is
    named after the model reference category with a ``.log`` suffix, and the
    log folder is created if it does not exist yet.
    """
    should_write = self.debug_mode and not self.dry_run
    if not should_write:
        return

    destination = self.log_folder.joinpath(self.model_reference_category + ".log")
    destination.parent.mkdir(parents=True, exist_ok=True)

    with open(destination, "w") as error_log_handle:
        error_log_handle.write(json.dumps(self.all_validation_errors_log, indent=4))

LegacyClipConverter

Bases: BaseLegacyConverter

Converter for legacy CLIP model reference records.

Source code in src/horde_model_reference/legacy/classes/legacy_converters.py
class LegacyClipConverter(BaseLegacyConverter):
    """Converter that turns legacy CLIP model reference records into ``ClipModelRecord``s."""

    def __init__(
        self,
        *,
        legacy_folder_path: str | Path = horde_model_reference_paths.legacy_path,
        target_file_folder: str | Path = horde_model_reference_paths.base_path,
        debug_mode: bool = False,
    ) -> None:
        """Set up a converter bound to the CLIP category.

        Args:
            legacy_folder_path: The legacy database folder.
            target_file_folder: The folder to write the converted database to.
            debug_mode: Forwarded to the base converter's ``debug_mode`` option.

        """
        # The category is fixed for this converter; everything else is passed through.
        super().__init__(
            model_reference_category=MODEL_REFERENCE_CATEGORY.clip,
            legacy_folder_path=legacy_folder_path,
            target_file_folder=target_file_folder,
            debug_mode=debug_mode,
        )

    @override
    def _convert_single_record(
        self,
        legacy_record: LegacyGenericRecord,
    ) -> ClipModelRecord:
        """Build the new-format CLIP record for one legacy CLIP record.

        Args:
            legacy_record: The validated legacy record to convert.

        Returns:
            The equivalent CLIP record in the new format.

        Raises:
            TypeError: If ``legacy_record`` is not a ``LegacyClipRecord``.

        """
        if not isinstance(legacy_record, LegacyClipRecord):
            raise TypeError(f"Expected {legacy_record.name} to be a LegacyClipRecord.")

        classification = MODEL_CLASSIFICATION_LOOKUP[self.model_reference_category]
        converted_config = self._convert_model_record_config(legacy_record)

        return ClipModelRecord(
            name=legacy_record.name,
            description=legacy_record.description,
            version=legacy_record.version,
            config=converted_config,
            pretrained_name=legacy_record.pretrained_name,
            model_classification=classification,
        )

legacy_folder_path instance-attribute

legacy_folder_path: Path = (
    normalized_legacy_base / LEGACY_REFERENCE_FOLDER_NAME
)

legacy_database_path instance-attribute

legacy_database_path: Path = (
    get_legacy_model_reference_file_path(
        model_reference_category=model_reference_category,
        base_path=normalized_legacy_base,
    )
)

converted_folder_path instance-attribute

converted_folder_path: Path = Path(normalized_target_base)

converted_database_file_path instance-attribute

converted_database_file_path: Path = (
    get_model_reference_file_path(
        model_reference_category=model_reference_category,
        base_path=normalized_target_base,
    )
)

model_reference_category instance-attribute

model_reference_category: MODEL_REFERENCE_CATEGORY = (
    model_reference_category
)

model_reference_type instance-attribute

model_reference_type: type[LegacyGenericRecord] = (
    LegacyGenericRecord
)

_all_legacy_records instance-attribute

_all_legacy_records: dict[str, LegacyGenericRecord]

All validated legacy model records.

_all_converted_records instance-attribute

_all_converted_records: dict[str, GenericModelRecord]

All converted model records in the new format.

all_validation_errors_log instance-attribute

all_validation_errors_log: dict[str, list[str]]

All validation errors that occurred during conversion.

_host_counter instance-attribute

_host_counter: dict[str, int]

Counter for tracking download hosts across all records.

debug_mode class-attribute instance-attribute

debug_mode: bool = debug_mode

log_folder instance-attribute

log_folder: Path = Path(log_folder)

dry_run class-attribute instance-attribute

dry_run: bool = dry_run

converted_successfully class-attribute instance-attribute

converted_successfully: bool = False

__init__

__init__(
    *,
    legacy_folder_path: str
    | Path = horde_model_reference_paths.legacy_path,
    target_file_folder: str
    | Path = horde_model_reference_paths.base_path,
    debug_mode: bool = False,
) -> None

Initialize the legacy CLIP converter.

Source code in src/horde_model_reference/legacy/classes/legacy_converters.py
def __init__(
    self,
    *,
    legacy_folder_path: str | Path = horde_model_reference_paths.legacy_path,
    target_file_folder: str | Path = horde_model_reference_paths.base_path,
    debug_mode: bool = False,
    log_folder: str | Path = horde_model_reference_paths.log_folder,
    dry_run: bool = False,
) -> None:
    """Initialize the legacy CLIP converter.

    The category is fixed to ``MODEL_REFERENCE_CATEGORY.clip``; every other
    argument is forwarded unchanged to the base converter.

    Args:
        legacy_folder_path: The legacy database folder.
        target_file_folder: The folder to write the converted database to.
        debug_mode: Forwarded as the base converter's ``debug_mode`` option.
        log_folder: The folder to write the log files to.
        dry_run: Forwarded as the base converter's ``dry_run`` option.

    """
    super().__init__(
        legacy_folder_path=legacy_folder_path,
        target_file_folder=target_file_folder,
        log_folder=log_folder,
        model_reference_category=MODEL_REFERENCE_CATEGORY.clip,
        debug_mode=debug_mode,
        dry_run=dry_run,
    )

_convert_single_record

_convert_single_record(
    legacy_record: LegacyGenericRecord,
) -> ClipModelRecord

Convert a single legacy CLIP record to the new format.

Source code in src/horde_model_reference/legacy/classes/legacy_converters.py
@override
def _convert_single_record(
    self,
    legacy_record: LegacyGenericRecord,
) -> ClipModelRecord:
    """Build the new-format CLIP record for one validated legacy record.

    Raises:
        TypeError: If ``legacy_record`` is not a ``LegacyClipRecord``.

    """
    if isinstance(legacy_record, LegacyClipRecord):
        # Convert the download config first, then resolve the classification
        # for this converter's category.
        converted_config = self._convert_model_record_config(legacy_record)
        classification = MODEL_CLASSIFICATION_LOOKUP[self.model_reference_category]

        return ClipModelRecord(
            name=legacy_record.name,
            description=legacy_record.description,
            version=legacy_record.version,
            config=converted_config,
            pretrained_name=legacy_record.pretrained_name,
            model_classification=classification,
        )

    raise TypeError(f"Expected {legacy_record.name} to be a LegacyClipRecord.")

_initialize

_initialize() -> None

Initialize the converter, allowing re-conversion if applicable.

Source code in src/horde_model_reference/legacy/classes/legacy_converters.py
def _initialize(self) -> None:
    """Reset all per-run state so the conversion can be run again from scratch."""
    # Every container gets its own fresh dict; none of them are shared.
    self._all_legacy_records, self._all_converted_records = {}, {}
    self.all_validation_errors_log, self._host_counter = {}, {}
    self.converted_successfully = False

convert_to_new_format

convert_to_new_format() -> dict[str, GenericModelRecord]

Convert the legacy model reference to the new format.

Returns:

  • dict[str, GenericModelRecord] –

    The converted model records in the new format.

Source code in src/horde_model_reference/legacy/classes/legacy_converters.py
def convert_to_new_format(self) -> dict[str, GenericModelRecord]:
    """Run the full legacy-to-new-format conversion pipeline.

    If a previous run completed successfully, the converter state is reset
    first so the conversion starts from a clean slate.

    Returns:
        The converted model records in the new format.

    """
    already_converted = self.converted_successfully
    if already_converted:
        self._initialize()

    # The stages run strictly in this order.
    pipeline = (
        self.pre_parse_records,
        self._load_and_validate_legacy_records,
        self._convert_legacy_to_new_format,
        self.post_parse_records,
        self.write_out_validation_errors,
        self.write_out_records,
    )
    for stage in pipeline:
        stage()

    self.converted_successfully = True
    return self._all_converted_records

_load_and_validate_legacy_records

_load_and_validate_legacy_records() -> None

Load and validate all legacy records using Pydantic models.

Source code in src/horde_model_reference/legacy/classes/legacy_converters.py
def _load_and_validate_legacy_records(self) -> None:
    """Load the legacy JSON database and validate every record with Pydantic.

    Each top-level key of the JSON file is validated with
    ``self.model_reference_type`` and stored in ``self._all_legacy_records``.
    Any entries present in the ``issues`` list of the validation context after
    validation are added to the validation error log.

    A missing or zero-byte legacy file is skipped with a debug message.

    Raises:
        ValidationError: If a record fails validation. The failure is added to
            the validation error log before being re-raised.

    """
    if not self.legacy_database_path.exists():
        logger.debug(f"Legacy database file {self.legacy_database_path} does not exist, skipping conversion")
        return

    if self.legacy_database_path.stat().st_size == 0:
        logger.debug(f"Legacy database file {self.legacy_database_path} is empty, skipping conversion")
        return

    # JSON text is UTF-8; decode it explicitly rather than relying on the
    # platform's locale encoding (which is not UTF-8 everywhere).
    with open(self.legacy_database_path, encoding="utf-8") as legacy_model_reference_file:
        raw_legacy_json_data: dict[str, dict[str, Any]] = json.load(legacy_model_reference_file)

    for model_record_key, model_record_contents in raw_legacy_json_data.items():
        issues: list[str] = []
        validation_context = {
            "issues": issues,
            "model_key": model_record_key,
            "debug_mode": self.debug_mode,
            "category": self.model_reference_category,
            "host_counter": self._host_counter,
        }

        # Only converters that define ``existing_showcase_files`` pass it along.
        if hasattr(self, "existing_showcase_files"):
            validation_context["existing_showcase_files"] = self.existing_showcase_files

        try:
            legacy_record = self.model_reference_type.model_validate(
                model_record_contents,
                context=validation_context,
            )
        except ValidationError as e:
            error = f"CRITICAL: Error parsing {model_record_key}:\n{e}"
            self.add_validation_error_to_log(model_record_key=model_record_key, error=error)
            raise

        self._all_legacy_records[model_record_key] = legacy_record
        for issue in issues:
            self.add_validation_error_to_log(model_record_key=model_record_key, error=issue)

_convert_legacy_to_new_format

_convert_legacy_to_new_format() -> None

Convert validated legacy records to the new format.

Source code in src/horde_model_reference/legacy/classes/legacy_converters.py
def _convert_legacy_to_new_format(self) -> None:
    """Convert every validated legacy record and collect the results.

    A record for which ``_convert_single_record`` returns ``None`` is noted in
    the validation error log and left out of the converted records. Any
    exception raised while converting is logged against the record and then
    re-raised.
    """
    for record_key, source_record in self._all_legacy_records.items():
        try:
            result = self._convert_single_record(source_record)
            if result is not None:
                self._all_converted_records[record_key] = result
                continue

            self.add_validation_error_to_log(
                model_record_key=record_key,
                error="Failed to convert legacy record to new format",
            )
        except Exception as exc:
            self.add_validation_error_to_log(
                model_record_key=record_key,
                error=f"Failed to convert {record_key}: {exc}",
            )
            raise

_convert_model_record_config

_convert_model_record_config(
    legacy_record: LegacyGenericRecord,
) -> GenericModelRecordConfig

Convert the config section of a legacy record to the new format.

Source code in src/horde_model_reference/legacy/classes/legacy_converters.py
def _convert_model_record_config(self, legacy_record: LegacyGenericRecord) -> GenericModelRecordConfig:
    """Convert the config section of a legacy record to the new format.

    Every entry in ``config.files`` whose path is non-empty and does not
    contain "yaml" becomes a ``DownloadRecord`` keyed by that path. Entries in
    ``config.download`` then supply the URL for the record with the matching
    file name.

    Args:
        legacy_record: The validated legacy record whose config is converted.

    Returns:
        A config holding one download record per retained file entry.

    Raises:
        ValueError: If a download entry names a file with no matching record.

    """

    def _mentions_slow_host(text: str) -> bool:
        """Return True if ``text`` contains any known slow-download host substring."""
        lowered = text.lower()
        return any(slow_host in lowered for slow_host in _SLOW_DOWNLOAD_HOST_SUBSTRINGS)

    download_records: dict[str, DownloadRecord] = {}

    for file_entry in legacy_record.config.files:
        if not file_entry.path or "yaml" in file_entry.path.lower():
            continue
        download_records[file_entry.path] = DownloadRecord(
            file_name=file_entry.path,
            file_url="",
            sha256sum=file_entry.sha256sum if file_entry.sha256sum else "FIXME",
            file_purpose=file_entry.file_type,
            known_slow_download=_mentions_slow_host(file_entry.path),
        )

    for download_entry in legacy_record.config.download:
        if download_entry.file_name not in download_records:
            raise ValueError(f"Unknown download entry: {download_entry.file_name}")

        record = download_records[download_entry.file_name]
        record.file_url = download_entry.file_url or ""
        # The substrings identify download hosts, which appear in the URL rather
        # than in the file name, so the URL has to be checked as well. A match
        # on the path (above) is never downgraded here.
        if _mentions_slow_host(record.file_url):
            record.known_slow_download = True

    return GenericModelRecordConfig(download=list(download_records.values()))

_convert_single_record_to_legacy

_convert_single_record_to_legacy(
    v2_record: GenericModelRecord,
) -> LegacyGenericRecord

Convert a single v2 record back to legacy format.

This is a stub for future implementation. Override this in subclasses for category-specific conversion.

Parameters:

  • v2_record (GenericModelRecord) –

    The v2 format record to convert back to legacy format.

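Returns:

  • LegacyGenericRecord –

    The legacy format record.

Raises:

  • NotImplementedError –

    This conversion is not yet implemented.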

Source code in src/horde_model_reference/legacy/classes/legacy_converters.py
def _convert_single_record_to_legacy(
    self,
    v2_record: GenericModelRecord,
) -> LegacyGenericRecord:
    """Convert one v2 record back to the legacy format (not implemented yet).

    Placeholder for a future feature; subclasses are expected to override this
    with category-specific conversion.

    Args:
        v2_record: The v2 format record to convert back to legacy format.

    Returns:
        The legacy format record.

    Raises:
        NotImplementedError: Always, as this conversion is not yet implemented.

    """
    not_supported_message = (
        "v2 → legacy conversion is not yet implemented. "
        "This feature is planned for a future release. "
        "For now, only legacy → v2 conversion is supported."
    )
    raise NotImplementedError(not_supported_message)

convert_from_v2_to_legacy

convert_from_v2_to_legacy(
    v2_records: dict[str, GenericModelRecord],
) -> dict[str, LegacyGenericRecord]

Convert all v2 records back to legacy format.

This is a stub for future implementation.

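Parameters:

  • v2_records (dict[str, GenericModelRecord]) –

    Dictionary of v2 format records to convert.

Returns:

  • dict[str, LegacyGenericRecord] –

    Dictionary of legacy format records.

Raises:

  • NotImplementedError –

    This conversion is not yet implemented.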

Source code in src/horde_model_reference/legacy/classes/legacy_converters.py
def convert_from_v2_to_legacy(
    self,
    v2_records: dict[str, GenericModelRecord],
) -> dict[str, LegacyGenericRecord]:
    """Convert all v2 records back to the legacy format (not implemented yet).

    Placeholder for a future feature.

    Args:
        v2_records: Dictionary of v2 format records to convert.

    Returns:
        Dictionary of legacy format records.

    Raises:
        NotImplementedError: Always, as this conversion is not yet implemented.
            The message reports the number of records and the category.

    """
    attempted = f"Attempted to convert {len(v2_records)} records from category: {self.model_reference_category}"
    not_supported_message = (
        "v2 → legacy conversion is not yet implemented. "
        "This feature is planned for a future release. "
        "For now, only legacy → v2 conversion is supported. "
    ) + attempted
    raise NotImplementedError(not_supported_message)

pre_parse_records

pre_parse_records() -> None

Override to perform category-specific pre-parsing.

Source code in src/horde_model_reference/legacy/classes/legacy_converters.py
def pre_parse_records(self) -> None:
    """Hook called before the legacy records are loaded.

    Does nothing by default; override to perform category-specific pre-parsing.
    """
    return None

post_parse_records

post_parse_records() -> None

Override to perform category-specific post-parsing.

Source code in src/horde_model_reference/legacy/classes/legacy_converters.py
def post_parse_records(self) -> None:
    """Hook called after conversion, before any output is written.

    Does nothing by default; override to perform category-specific post-parsing.
    """
    return None

write_out_records

write_out_records() -> None

Write out the converted records.

Source code in src/horde_model_reference/legacy/classes/legacy_converters.py
def write_out_records(self) -> None:
    """Write the converted records to ``converted_database_file_path`` as JSON.

    Nothing is written in dry-run mode, or when the file on disk already holds
    exactly the serialized content (so its mtime, and any cache keyed on it,
    is left untouched). Otherwise the content is written to a sibling ``.tmp``
    file which then replaces the target.

    Raises:
        OSError: If the temporary file cannot be written or moved into place.
            The temporary file is removed before the error propagates.

    """
    if self.dry_run:
        return

    # Serialize first so the result can be compared with what is on disk.
    # The trailing newline is kept for consistency with other writers.
    final_serialized = (
        json.dumps(
            self._all_converted_records,
            indent=4,
            default=lambda o: o.model_dump(
                exclude_none=True,
                exclude_unset=False,
                by_alias=True,
            ),
        )
        + "\n"
    )

    target_path = Path(self.converted_database_file_path)
    target_path.parent.mkdir(parents=True, exist_ok=True)

    try:
        unchanged = target_path.exists() and target_path.read_text(encoding="utf-8") == final_serialized
    except (OSError, UnicodeError) as read_error:
        # Best effort only: an unreadable existing file is simply overwritten.
        logger.debug(f"Could not read existing converted file {target_path} ({read_error}), overwriting.")
        unchanged = False

    if unchanged:
        logger.debug(f"No change to converted file {target_path}, skipping write.")
        return

    # Write to a temporary file in the same directory, then replace the target.
    tmp_path = target_path.with_suffix(target_path.suffix + ".tmp")
    try:
        tmp_path.write_text(final_serialized, encoding="utf-8")
        tmp_path.replace(target_path)
    except OSError:
        # Don't leave a stale temporary file behind next to the real one.
        tmp_path.unlink(missing_ok=True)
        raise

get_records

get_records() -> dict[str, GenericModelRecord]

Return the converted records.

Source code in src/horde_model_reference/legacy/classes/legacy_converters.py
def get_records(self) -> dict[str, GenericModelRecord]:
    """Return the mapping of converted records held by this converter."""
    converted = self._all_converted_records
    return converted

add_validation_error_to_log

add_validation_error_to_log(
    *, model_record_key: str, error: str
) -> None

Add a validation error to the log.

Source code in src/horde_model_reference/legacy/classes/legacy_converters.py
def add_validation_error_to_log(
    self,
    *,
    model_record_key: str,
    error: str,
) -> None:
    """Record ``error`` against ``model_record_key`` in the validation error log.

    In debug mode the error is additionally emitted as a debug log message.
    """
    # Create the per-record list on first use, then append to it.
    self.all_validation_errors_log.setdefault(model_record_key, []).append(error)

    if not self.debug_mode:
        return
    logger.debug(f"{model_record_key} has error: {error}")

write_out_validation_errors

write_out_validation_errors() -> None

Write out the validation errors.

Source code in src/horde_model_reference/legacy/classes/legacy_converters.py
def write_out_validation_errors(self) -> None:
    """Dump the validation error log to ``<log_folder>/<category>.log`` as JSON.

    Only happens in debug mode and never during a dry run.
    """
    should_write = self.debug_mode and not self.dry_run
    if not should_write:
        return

    log_path = self.log_folder.joinpath(self.model_reference_category + ".log")
    log_path.parent.mkdir(parents=True, exist_ok=True)

    with open(log_path, "w") as validation_errors_log_file:
        serialized_errors = json.dumps(self.all_validation_errors_log, indent=4)
        validation_errors_log_file.write(serialized_errors)

LegacyTextGenerationConverter

Bases: BaseLegacyConverter

Converter for legacy text generation model reference records.

Source code in src/horde_model_reference/legacy/classes/legacy_converters.py
class LegacyTextGenerationConverter(BaseLegacyConverter):
    """Converter for legacy text generation model reference records."""

    def __init__(
        self,
        *,
        legacy_folder_path: str | Path = horde_model_reference_paths.legacy_path,
        target_file_folder: str | Path = horde_model_reference_paths.base_path,
        debug_mode: bool = False,
        log_folder: str | Path = horde_model_reference_paths.log_folder,
        dry_run: bool = False,
    ) -> None:
        """Initialize the legacy text generation converter.

        The category is fixed to ``MODEL_REFERENCE_CATEGORY.text_generation``;
        every other argument is forwarded unchanged to the base converter.

        Args:
            legacy_folder_path: The legacy database folder.
            target_file_folder: The folder to write the converted database to.
            debug_mode: Forwarded as the base converter's ``debug_mode`` option.
            log_folder: The folder to write the log files to.
            dry_run: Forwarded as the base converter's ``dry_run`` option.

        """
        super().__init__(
            legacy_folder_path=legacy_folder_path,
            target_file_folder=target_file_folder,
            log_folder=log_folder,
            model_reference_category=MODEL_REFERENCE_CATEGORY.text_generation,
            debug_mode=debug_mode,
            dry_run=dry_run,
        )

    @override
    def _load_and_validate_legacy_records(self) -> None:
        """Load and validate legacy text generation records from CSV format.

        Overrides base class to read CSV instead of JSON for text_generation category.

        IMPORTANT: This is the ONLY converter that reads CSV format. All other categories
        use JSON for legacy files. The CSV has these columns:
        - name, parameters_bn (billions), description, version, style, nsfw, baseline,
          url, tags (comma-separated), instruct_format, settings (JSON string), display_name

        The converter transforms CSV → internal dict → Pydantic validation → v2 JSON output.
        Output is ALWAYS JSON format (text_generation.json), never CSV.

        Note: parameters_bn is converted to integer parameters (billions * 1,000,000,000)

        Raises:
            ValidationError: If a row fails validation. The failure is added to
                the validation error log before being re-raised.

        """
        # Nothing to convert when the legacy file is missing or zero bytes long.
        if not self.legacy_database_path.exists():
            logger.debug(f"Legacy database file {self.legacy_database_path} does not exist, skipping conversion")
            return

        file_size = self.legacy_database_path.stat().st_size
        if file_size == 0:
            logger.debug(f"Legacy database file {self.legacy_database_path} is empty, skipping conversion")
            return

        parsed_rows, parse_issues = parse_legacy_text_csv_file(self.legacy_database_path)
        for issue in parse_issues:
            self.add_validation_error_to_log(model_record_key=issue.row_identifier, error=issue.message)

        logger.debug(f"Loaded {len(parsed_rows)} records from CSV")

        # Now validate with Pydantic models (same as base class)
        for csv_row in parsed_rows:
            model_payload = {
                "name": csv_row.name,
                "description": csv_row.description,
                "version": csv_row.version,
                "style": csv_row.style,
                "nsfw": csv_row.nsfw,
                "baseline": csv_row.baseline,
                "url": csv_row.url,
                "tags": csv_row.tags,
                # A falsy instruct_format is passed on as None.
                "instruct_format": csv_row.instruct_format or None,
                "settings": csv_row.settings,
                "display_name": csv_row.display_name,
                "parameters": csv_row.parameters,
            }

            validation_issues: list[str] = []
            validation_context: dict[str, object] = {
                "issues": validation_issues,
                "model_key": csv_row.name,
                "debug_mode": self.debug_mode,
                "category": self.model_reference_category,
                "host_counter": self._host_counter,
            }

            if hasattr(self, "existing_showcase_files"):
                validation_context["existing_showcase_files"] = self.existing_showcase_files

            try:
                legacy_record = self.model_reference_type.model_validate(model_payload, context=validation_context)
                self._all_legacy_records[csv_row.name] = legacy_record
                for validation_issue in validation_issues:
                    self.add_validation_error_to_log(model_record_key=csv_row.name, error=validation_issue)
            except ValidationError as e:
                error = f"CRITICAL: Error parsing {csv_row.name}:\n{e}"
                self.add_validation_error_to_log(model_record_key=csv_row.name, error=error)
                raise

    @override
    def _convert_single_record(
        self,
        legacy_record: LegacyGenericRecord,
    ) -> TextGenerationModelRecord | None:
        """Convert a single legacy text generation record to the new format.

        Returns:
            The converted record, or ``None`` when the record's name carries a
            backend prefix and the record is therefore dropped.

        Raises:
            TypeError: If ``legacy_record`` is not a ``LegacyTextGenerationRecord``.

        """
        if not isinstance(legacy_record, LegacyTextGenerationRecord):
            raise TypeError(f"Expected {legacy_record.name} to be a LegacyTextGenerationRecord.")

        model_record_config = self._convert_model_record_config(legacy_record)

        # Drop backend-prefixed entries (they are duplicates of base models)
        # Backend prefixes are only generated during GitHub sync, not stored internally
        from horde_model_reference.text_backend_names import has_legacy_text_backend_prefix

        if has_legacy_text_backend_prefix(legacy_record.name):
            self.add_validation_error_to_log(
                model_record_key=legacy_record.name,
                error=(
                    f"Model name '{legacy_record.name}' has a backend prefix. "
                    "Dropping this record as it is a duplicate (backend prefixes are not stored internally)."
                ),
            )
            return None

        return TextGenerationModelRecord(
            name=legacy_record.name,
            description=legacy_record.description,
            version=legacy_record.version,
            config=model_record_config,
            baseline=legacy_record.baseline,
            parameters=legacy_record.parameters or 0,
            nsfw=legacy_record.nsfw or False,
            style=legacy_record.style,
            display_name=legacy_record.display_name,
            url=legacy_record.url,
            tags=legacy_record.tags or [],
            instruct_format=legacy_record.instruct_format,
            settings=legacy_record.settings,
            model_classification=MODEL_CLASSIFICATION_LOOKUP[self.model_reference_category],
        )

    @override
    def post_parse_records(self) -> None:
        """Populate text_model_group field for all text generation records."""
        from horde_model_reference.analytics.text_model_parser import group_text_models_by_base

        # Group the converted model names by their base name.
        model_names = list(self._all_converted_records.keys())
        grouped_models = group_text_models_by_base(model_names)

        # Stamp every variant's record with the base name of its group.
        for base_name, group in grouped_models.items():
            for model_name in group.variants:
                if model_name in self._all_converted_records:
                    record = self._all_converted_records[model_name]
                    if isinstance(record, TextGenerationModelRecord):
                        record.text_model_group = base_name

        logger.debug(f"Populated text_model_group for {len(self._all_converted_records)} text generation records")

legacy_folder_path instance-attribute

legacy_folder_path: Path = (
    normalized_legacy_base / LEGACY_REFERENCE_FOLDER_NAME
)

legacy_database_path instance-attribute

legacy_database_path: Path = (
    get_legacy_model_reference_file_path(
        model_reference_category=model_reference_category,
        base_path=normalized_legacy_base,
    )
)

converted_folder_path instance-attribute

converted_folder_path: Path = Path(normalized_target_base)

converted_database_file_path instance-attribute

converted_database_file_path: Path = (
    get_model_reference_file_path(
        model_reference_category=model_reference_category,
        base_path=normalized_target_base,
    )
)

model_reference_category instance-attribute

model_reference_category: MODEL_REFERENCE_CATEGORY = (
    model_reference_category
)

model_reference_type instance-attribute

model_reference_type: type[LegacyGenericRecord] = (
    LegacyGenericRecord
)

_all_legacy_records instance-attribute

_all_legacy_records: dict[str, LegacyGenericRecord]

All validated legacy model records.

_all_converted_records instance-attribute

_all_converted_records: dict[str, GenericModelRecord]

All converted model records in the new format.

all_validation_errors_log instance-attribute

all_validation_errors_log: dict[str, list[str]]

All validation errors that occurred during conversion.

_host_counter instance-attribute

_host_counter: dict[str, int]

Counter for tracking download hosts across all records.

debug_mode class-attribute instance-attribute

debug_mode: bool = debug_mode

log_folder instance-attribute

log_folder: Path = Path(log_folder)

dry_run class-attribute instance-attribute

dry_run: bool = dry_run

converted_successfully class-attribute instance-attribute

converted_successfully: bool = False

__init__

__init__(
    *,
    legacy_folder_path: str
    | Path = horde_model_reference_paths.legacy_path,
    target_file_folder: str
    | Path = horde_model_reference_paths.base_path,
    debug_mode: bool = False,
) -> None

Initialize the legacy text generation converter.

Source code in src/horde_model_reference/legacy/classes/legacy_converters.py
def __init__(
    self,
    *,
    legacy_folder_path: str | Path = horde_model_reference_paths.legacy_path,
    target_file_folder: str | Path = horde_model_reference_paths.base_path,
    debug_mode: bool = False,
    log_folder: str | Path = horde_model_reference_paths.log_folder,
    dry_run: bool = False,
) -> None:
    """Initialize the legacy text generation converter.

    The category is fixed to ``MODEL_REFERENCE_CATEGORY.text_generation``;
    every other argument is forwarded unchanged to the base converter.

    Args:
        legacy_folder_path: The legacy database folder.
        target_file_folder: The folder to write the converted database to.
        debug_mode: Forwarded as the base converter's ``debug_mode`` option.
        log_folder: The folder to write the log files to.
        dry_run: Forwarded as the base converter's ``dry_run`` option.

    """
    super().__init__(
        legacy_folder_path=legacy_folder_path,
        target_file_folder=target_file_folder,
        log_folder=log_folder,
        model_reference_category=MODEL_REFERENCE_CATEGORY.text_generation,
        debug_mode=debug_mode,
        dry_run=dry_run,
    )

_load_and_validate_legacy_records

_load_and_validate_legacy_records() -> None

Load and validate legacy text generation records from CSV format.

Overrides base class to read CSV instead of JSON for text_generation category.

IMPORTANT: This is the ONLY converter that reads CSV format. All other categories use JSON for legacy files. The CSV has these columns: - name, parameters_bn (billions), description, version, style, nsfw, baseline, url, tags (comma-separated), instruct_format, settings (JSON string), display_name

The converter transforms CSV → internal dict → Pydantic validation → v2 JSON output. Output is ALWAYS JSON format (text_generation.json), never CSV.

Note: parameters_bn is converted to integer parameters (billions * 1,000,000,000)

Source code in src/horde_model_reference/legacy/classes/legacy_converters.py
@override
def _load_and_validate_legacy_records(self) -> None:
    """Read the legacy text generation CSV and validate each row with Pydantic.

    Replaces the base implementation, which reads JSON: text_generation is the
    ONLY category whose legacy file is a CSV. The CSV has these columns:
    - name, parameters_bn (billions), description, version, style, nsfw, baseline,
      url, tags (comma-separated), instruct_format, settings (JSON string), display_name

    Flow: CSV → internal dict → Pydantic validation → v2 JSON output. The output
    is ALWAYS JSON (text_generation.json), never CSV.

    Note: parameters_bn is converted to integer parameters (billions * 1,000,000,000)
    """
    csv_path = self.legacy_database_path

    # Nothing to convert when the legacy file is missing or zero bytes long.
    if not csv_path.exists():
        logger.debug(f"Legacy database file {csv_path} does not exist, skipping conversion")
        return
    if csv_path.stat().st_size == 0:
        logger.debug(f"Legacy database file {csv_path} is empty, skipping conversion")
        return

    parsed_rows, parse_issues = parse_legacy_text_csv_file(csv_path)
    for parse_issue in parse_issues:
        self.add_validation_error_to_log(
            model_record_key=parse_issue.row_identifier,
            error=parse_issue.message,
        )

    logger.debug(f"Loaded {len(parsed_rows)} records from CSV")

    # Validation mirrors the base class, one parsed row at a time.
    for row in parsed_rows:
        record_key = row.name
        payload = {
            "name": record_key,
            "description": row.description,
            "version": row.version,
            "style": row.style,
            "nsfw": row.nsfw,
            "baseline": row.baseline,
            "url": row.url,
            "tags": row.tags,
            "instruct_format": row.instruct_format or None,
            "settings": row.settings,
            "display_name": row.display_name,
            "parameters": row.parameters,
        }

        collected_issues: list[str] = []
        context: dict[str, object] = {
            "issues": collected_issues,
            "model_key": record_key,
            "debug_mode": self.debug_mode,
            "category": self.model_reference_category,
            "host_counter": self._host_counter,
        }
        if hasattr(self, "existing_showcase_files"):
            context["existing_showcase_files"] = self.existing_showcase_files

        try:
            validated = self.model_reference_type.model_validate(payload, context=context)
            self._all_legacy_records[record_key] = validated
            for collected_issue in collected_issues:
                self.add_validation_error_to_log(model_record_key=record_key, error=collected_issue)
        except ValidationError as validation_error:
            self.add_validation_error_to_log(
                model_record_key=record_key,
                error=f"CRITICAL: Error parsing {record_key}:\n{validation_error}",
            )
            raise

_convert_single_record

_convert_single_record(
    legacy_record: LegacyGenericRecord,
) -> TextGenerationModelRecord | None

Convert a single legacy text generation record to the new format.

Source code in src/horde_model_reference/legacy/classes/legacy_converters.py
@override
def _convert_single_record(
    self,
    legacy_record: LegacyGenericRecord,
) -> TextGenerationModelRecord | None:
    """Build the new-format text generation record for one legacy record.

    Returns ``None`` (after noting it in the validation error log) when the
    record's name carries a backend prefix, and raises ``TypeError`` when the
    record is not a ``LegacyTextGenerationRecord``.
    """
    if not isinstance(legacy_record, LegacyTextGenerationRecord):
        raise TypeError(f"Expected {legacy_record.name} to be a LegacyTextGenerationRecord.")

    converted_config = self._convert_model_record_config(legacy_record)

    # Backend-prefixed entries duplicate their base models; the prefixes are
    # only generated during GitHub sync and are not stored internally.
    from horde_model_reference.text_backend_names import has_legacy_text_backend_prefix

    record_name = legacy_record.name
    if has_legacy_text_backend_prefix(record_name):
        drop_reason = (
            f"Model name '{record_name}' has a backend prefix. "
            "Dropping this record as it is a duplicate (backend prefixes are not stored internally)."
        )
        self.add_validation_error_to_log(model_record_key=record_name, error=drop_reason)
        return None

    return TextGenerationModelRecord(
        name=record_name,
        description=legacy_record.description,
        version=legacy_record.version,
        config=converted_config,
        baseline=legacy_record.baseline,
        parameters=legacy_record.parameters or 0,
        nsfw=legacy_record.nsfw or False,
        style=legacy_record.style,
        display_name=legacy_record.display_name,
        url=legacy_record.url,
        tags=legacy_record.tags or [],
        instruct_format=legacy_record.instruct_format,
        settings=legacy_record.settings,
        model_classification=MODEL_CLASSIFICATION_LOOKUP[self.model_reference_category],
    )

post_parse_records

post_parse_records() -> None

Populate text_model_group field for all text generation records.

Source code in src/horde_model_reference/legacy/classes/legacy_converters.py
@override
def post_parse_records(self) -> None:
    """Set ``text_model_group`` on the converted text generation records.

    The converted model names are grouped by base name, and each variant's
    record is tagged with the base name of its group.
    """
    from horde_model_reference.analytics.text_model_parser import group_text_models_by_base

    converted = self._all_converted_records
    groups_by_base = group_text_models_by_base(list(converted.keys()))

    for base_name, group in groups_by_base.items():
        for variant_name in group.variants:
            # Names without a converted record come back as None and are skipped.
            candidate = converted.get(variant_name)
            if isinstance(candidate, TextGenerationModelRecord):
                candidate.text_model_group = base_name

    logger.debug(f"Populated text_model_group for {len(self._all_converted_records)} text generation records")

_initialize

_initialize() -> None

Initialize the converter, allowing re-conversion if applicable.

Source code in src/horde_model_reference/legacy/classes/legacy_converters.py
def _initialize(self) -> None:
    """Reset all per-run state so the conversion can be run again from scratch."""
    # Every container gets its own fresh dict; none of them are shared.
    self._all_legacy_records, self._all_converted_records = {}, {}
    self.all_validation_errors_log, self._host_counter = {}, {}
    self.converted_successfully = False

convert_to_new_format

convert_to_new_format() -> dict[str, GenericModelRecord]

Convert the legacy model reference to the new format.

Returns:

  • dict[str, GenericModelRecord] –

    The converted model records in the new format.

Source code in src/horde_model_reference/legacy/classes/legacy_converters.py
def convert_to_new_format(self) -> dict[str, GenericModelRecord]:
    """Run the full legacy-to-new-format conversion pipeline.

    If a previous run completed successfully, the converter state is reset
    first so the conversion starts from a clean slate.

    Returns:
        The converted model records in the new format.

    """
    already_converted = self.converted_successfully
    if already_converted:
        self._initialize()

    # The stages run strictly in this order.
    pipeline = (
        self.pre_parse_records,
        self._load_and_validate_legacy_records,
        self._convert_legacy_to_new_format,
        self.post_parse_records,
        self.write_out_validation_errors,
        self.write_out_records,
    )
    for stage in pipeline:
        stage()

    self.converted_successfully = True
    return self._all_converted_records

_convert_legacy_to_new_format

_convert_legacy_to_new_format() -> None

Convert validated legacy records to the new format.

Source code in src/horde_model_reference/legacy/classes/legacy_converters.py
def _convert_legacy_to_new_format(self) -> None:
    """Convert every validated legacy record and store the successful results.

    A converter result of ``None`` is recorded in the validation error log and
    the record is skipped. Any exception is recorded in the log as well and
    then re-raised, which stops the conversion.
    """
    for record_key, source_record in self._all_legacy_records.items():
        try:
            result = self._convert_single_record(source_record)
            if result is not None:
                self._all_converted_records[record_key] = result
                continue
            self.add_validation_error_to_log(
                model_record_key=record_key,
                error="Failed to convert legacy record to new format",
            )
        except Exception as exc:
            # Keep a trace in the validation log before the failure propagates.
            self.add_validation_error_to_log(
                model_record_key=record_key,
                error=f"Failed to convert {record_key}: {exc}",
            )
            raise

_convert_model_record_config

_convert_model_record_config(
    legacy_record: LegacyGenericRecord,
) -> GenericModelRecordConfig

Convert the config section of a legacy record to the new format.

Source code in src/horde_model_reference/legacy/classes/legacy_converters.py
def _convert_model_record_config(self, legacy_record: LegacyGenericRecord) -> GenericModelRecordConfig:
    """Build the new-format config (download list) from a legacy record's config.

    One ``DownloadRecord`` is created per legacy file entry that has a path and
    whose path does not contain "yaml"; each legacy download entry then supplies
    the URL for the record with the same file name.

    Raises:
        ValueError: If a download entry names a file that has no download record.

    """
    records_by_name: dict[str, DownloadRecord] = {}

    for entry in legacy_record.config.files:
        entry_path = entry.path
        if not entry_path:
            continue
        lowered_path = entry_path.lower()
        if "yaml" in lowered_path:
            continue

        # NOTE(review): the slow-host substrings are matched against the file path here,
        # not the download URL (which is only filled in below) - confirm this is intended.
        is_slow = any(marker in lowered_path for marker in _SLOW_DOWNLOAD_HOST_SUBSTRINGS)

        records_by_name[entry_path] = DownloadRecord(
            file_name=entry_path,
            file_url="",
            sha256sum=entry.sha256sum or "FIXME",
            file_purpose=entry.file_type,
            known_slow_download=is_slow,
        )

    for download in legacy_record.config.download:
        matching_record = records_by_name.get(download.file_name)
        if matching_record is None:
            raise ValueError(f"Unknown download entry: {download.file_name}")
        matching_record.file_url = download.file_url or ""

    return GenericModelRecordConfig(download=list(records_by_name.values()))

_convert_single_record_to_legacy

_convert_single_record_to_legacy(
    v2_record: GenericModelRecord,
) -> LegacyGenericRecord

Convert a single v2 record back to legacy format.

This is a stub for future implementation. Override this in subclasses for category-specific conversion.

Parameters:

  • v2_record (GenericModelRecord) –

    The v2 format record to convert back to legacy format.

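Returns:

  • LegacyGenericRecord –

    The legacy format record.

Raises:

  • NotImplementedError –

    This conversion is not yet implemented.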

Source code in src/horde_model_reference/legacy/classes/legacy_converters.py
def _convert_single_record_to_legacy(
    self,
    v2_record: GenericModelRecord,
) -> LegacyGenericRecord:
    """Convert one v2 record back to the legacy format (not implemented yet).

    This is a placeholder for future work. Subclasses are expected to override
    it with category-specific conversion logic.

    Args:
        v2_record: The v2 format record to convert back to legacy format.

    Returns:
        The legacy format record.

    Raises:
        NotImplementedError: Always, because this conversion is not yet implemented.

    """
    not_implemented_message = (
        "v2 → legacy conversion is not yet implemented. "
        "This feature is planned for a future release. "
        "For now, only legacy → v2 conversion is supported."
    )
    raise NotImplementedError(not_implemented_message)

convert_from_v2_to_legacy

convert_from_v2_to_legacy(
    v2_records: dict[str, GenericModelRecord],
) -> dict[str, LegacyGenericRecord]

Convert all v2 records back to legacy format.

This is a stub for future implementation.

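Parameters:

  • v2_records (dict[str, GenericModelRecord]) –

    Dictionary of v2 format records to convert.

Returns:

  • dict[str, LegacyGenericRecord] –

    Dictionary of legacy format records.

Raises:

  • NotImplementedError –

    This conversion is not yet implemented.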

Source code in src/horde_model_reference/legacy/classes/legacy_converters.py
def convert_from_v2_to_legacy(
    self,
    v2_records: dict[str, GenericModelRecord],
) -> dict[str, LegacyGenericRecord]:
    """Convert a collection of v2 records back to the legacy format (not implemented yet).

    This is a placeholder for future work.

    Args:
        v2_records: Dictionary of v2 format records to convert.

    Returns:
        Dictionary of legacy format records.

    Raises:
        NotImplementedError: Always, because this conversion is not yet implemented.

    """
    # Include the request size and category so the failure is easy to trace.
    record_count = len(v2_records)
    category = self.model_reference_category
    not_implemented_message = (
        "v2 → legacy conversion is not yet implemented. "
        "This feature is planned for a future release. "
        "For now, only legacy → v2 conversion is supported. "
        f"Attempted to convert {record_count} records from category: {category}"
    )
    raise NotImplementedError(not_implemented_message)

pre_parse_records

pre_parse_records() -> None

Override to perform category-specific pre-parsing.

Source code in src/horde_model_reference/legacy/classes/legacy_converters.py
def pre_parse_records(self) -> None:
    """Hook for category-specific pre-parsing.

    The default implementation does nothing; override it to add behaviour.
    """
    return None

write_out_records

write_out_records() -> None

Write out the converted records.

Source code in src/horde_model_reference/legacy/classes/legacy_converters.py
def write_out_records(self) -> None:
    """Write the converted records to ``converted_database_file_path`` as indented JSON.

    Does nothing when ``dry_run`` is set. Objects that ``json`` cannot encode
    natively are serialized through their ``model_dump`` method. When the file
    on disk already holds exactly the same text, it is left untouched so its
    mtime is preserved; otherwise the new content is written to a sibling
    ``.tmp`` file which then replaces the target.

    Raises:
        OSError: If the temporary file cannot be written or cannot replace the
            target. The temporary file is removed before the error propagates.

    """
    if self.dry_run:
        return

    # Serialize first so the result can be compared with the on-disk file and an
    # unnecessary rewrite (which would change the mtime) can be avoided.
    # A trailing newline is kept for consistency with other writers.
    final_serialized = (
        json.dumps(
            self._all_converted_records,
            indent=4,
            default=lambda o: o.model_dump(
                exclude_none=True,
                exclude_unset=False,
                by_alias=True,
            ),
        )
        + "\n"
    )

    target_path = Path(self.converted_database_file_path)
    target_path.parent.mkdir(parents=True, exist_ok=True)

    # Only the read itself is best-effort: an unreadable or undecodable existing
    # file simply means "overwrite it". Other errors are no longer swallowed.
    try:
        unchanged = target_path.exists() and target_path.read_text(encoding="utf-8") == final_serialized
    except (OSError, UnicodeError):
        unchanged = False

    if unchanged:
        logger.debug(f"No change to converted file {target_path}, skipping write.")
        return

    # Write to a temporary file in the same directory, then replace the target with it.
    tmp_path = target_path.with_suffix(target_path.suffix + ".tmp")
    try:
        tmp_path.write_text(final_serialized, encoding="utf-8")
        tmp_path.replace(target_path)
    except OSError:
        # Do not leave a stale temporary file behind when the write or the swap fails.
        tmp_path.unlink(missing_ok=True)
        raise

get_records

get_records() -> dict[str, GenericModelRecord]

Return the converted records.

Source code in src/horde_model_reference/legacy/classes/legacy_converters.py
def get_records(self) -> dict[str, GenericModelRecord]:
    """Return the dictionary of converted records held by this converter.

    The internal dictionary itself is returned, not a copy.
    """
    converted_records = self._all_converted_records
    return converted_records

add_validation_error_to_log

add_validation_error_to_log(
    *, model_record_key: str, error: str
) -> None

Add a validation error to the log.

Source code in src/horde_model_reference/legacy/classes/legacy_converters.py
def add_validation_error_to_log(
    self,
    *,
    model_record_key: str,
    error: str,
) -> None:
    """Record a validation error for the given model record.

    Errors are accumulated per record key in ``all_validation_errors_log``.
    In debug mode the error is also emitted to the debug log.

    Args:
        model_record_key: Key of the record the error belongs to.
        error: The error message to record.

    """
    errors_for_record = self.all_validation_errors_log.setdefault(model_record_key, [])
    errors_for_record.append(error)

    if not self.debug_mode:
        return
    logger.debug(f"{model_record_key} has error: {error}")

write_out_validation_errors

write_out_validation_errors() -> None

Write out the validation errors.

Source code in src/horde_model_reference/legacy/classes/legacy_converters.py
def write_out_validation_errors(self) -> None:
    """Dump the validation error log to ``<log_folder>/<category>.log`` as JSON.

    The file is only written in debug mode and never during a dry run.
    """
    should_write = self.debug_mode and not self.dry_run
    if not should_write:
        return

    destination = self.log_folder / (self.model_reference_category + ".log")
    destination.parent.mkdir(parents=True, exist_ok=True)
    with destination.open("w") as log_handle:
        log_handle.write(json.dumps(self.all_validation_errors_log, indent=4))

LegacyControlnetConverter

Bases: BaseLegacyConverter

Converter for legacy ControlNet model reference records.

Source code in src/horde_model_reference/legacy/classes/legacy_converters.py
class LegacyControlnetConverter(BaseLegacyConverter):
    """Converter for legacy ControlNet model reference records."""

    def __init__(
        self,
        *,
        legacy_folder_path: str | Path = horde_model_reference_paths.legacy_path,
        target_file_folder: str | Path = horde_model_reference_paths.base_path,
        debug_mode: bool = False,
    ) -> None:
        """Initialize the legacy ControlNet converter.

        Args:
            legacy_folder_path: The legacy database folder.
            target_file_folder: The folder to write the converted database to.
            debug_mode: Forwarded unchanged to the base converter.

        """
        super().__init__(
            legacy_folder_path=legacy_folder_path,
            target_file_folder=target_file_folder,
            model_reference_category=MODEL_REFERENCE_CATEGORY.controlnet,
            debug_mode=debug_mode,
        )

    @override
    def _convert_single_record(
        self,
        legacy_record: LegacyGenericRecord,
    ) -> GenericModelRecord:
        """Convert a single legacy ControlNet record to the new format.

        Args:
            legacy_record: The validated legacy record to convert.

        Returns:
            A ``ControlNetModelRecord`` built from the legacy record.

        Raises:
            TypeError: If ``legacy_record`` is not a ``LegacyGenericRecord``.

        """
        if not isinstance(legacy_record, LegacyGenericRecord):
            # An object of the wrong type may have no ``name`` attribute at all; fall back
            # to its repr so the intended TypeError is raised instead of an AttributeError.
            record_label = legacy_record.name if hasattr(legacy_record, "name") else repr(legacy_record)
            raise TypeError(f"Expected {record_label} to be a LegacyGenericRecord.")

        model_record_config = self._convert_model_record_config(legacy_record)

        # Legacy controlnet records use 'type' field for the controlnet style
        # (e.g., control_canny, control_depth, etc.), but some older records
        # may use the 'style' field instead. Try 'type' first, then fall back to 'style'.
        controlnet_style = legacy_record.type or legacy_record.style

        return ControlNetModelRecord(
            name=legacy_record.name,
            description=legacy_record.description,
            version=legacy_record.version,
            config=model_record_config,
            controlnet_style=controlnet_style,
            model_classification=MODEL_CLASSIFICATION_LOOKUP[self.model_reference_category],
        )

legacy_folder_path instance-attribute

legacy_folder_path: Path = (
    normalized_legacy_base / LEGACY_REFERENCE_FOLDER_NAME
)

legacy_database_path instance-attribute

legacy_database_path: Path = (
    get_legacy_model_reference_file_path(
        model_reference_category=model_reference_category,
        base_path=normalized_legacy_base,
    )
)

converted_folder_path instance-attribute

converted_folder_path: Path = Path(normalized_target_base)

converted_database_file_path instance-attribute

converted_database_file_path: Path = (
    get_model_reference_file_path(
        model_reference_category=model_reference_category,
        base_path=normalized_target_base,
    )
)

model_reference_category instance-attribute

model_reference_category: MODEL_REFERENCE_CATEGORY = (
    model_reference_category
)

model_reference_type instance-attribute

model_reference_type: type[LegacyGenericRecord] = (
    LegacyGenericRecord
)

_all_legacy_records instance-attribute

_all_legacy_records: dict[str, LegacyGenericRecord]

All validated legacy model records.

_all_converted_records instance-attribute

_all_converted_records: dict[str, GenericModelRecord]

All converted model records in the new format.

all_validation_errors_log instance-attribute

all_validation_errors_log: dict[str, list[str]]

All validation errors that occurred during conversion.

_host_counter instance-attribute

_host_counter: dict[str, int]

Counter for tracking download hosts across all records.

debug_mode class-attribute instance-attribute

debug_mode: bool = debug_mode

log_folder instance-attribute

log_folder: Path = Path(log_folder)

dry_run class-attribute instance-attribute

dry_run: bool = dry_run

converted_successfully class-attribute instance-attribute

converted_successfully: bool = False

__init__

__init__(
    *,
    legacy_folder_path: str
    | Path = horde_model_reference_paths.legacy_path,
    target_file_folder: str
    | Path = horde_model_reference_paths.base_path,
    debug_mode: bool = False,
) -> None

Initialize the legacy ControlNet converter.

Source code in src/horde_model_reference/legacy/classes/legacy_converters.py
def __init__(
    self,
    *,
    legacy_folder_path: str | Path = horde_model_reference_paths.legacy_path,
    target_file_folder: str | Path = horde_model_reference_paths.base_path,
    debug_mode: bool = False,
) -> None:
    """Initialize the legacy ControlNet converter.

    Args:
        legacy_folder_path: The legacy database folder.
        target_file_folder: The folder to write the converted database to.
        debug_mode: Forwarded unchanged to the base converter.

    """
    # The category is fixed for this converter; everything else is forwarded as given.
    super().__init__(
        model_reference_category=MODEL_REFERENCE_CATEGORY.controlnet,
        debug_mode=debug_mode,
        target_file_folder=target_file_folder,
        legacy_folder_path=legacy_folder_path,
    )

_convert_single_record

_convert_single_record(
    legacy_record: LegacyGenericRecord,
) -> GenericModelRecord

Convert a single legacy ControlNet record to the new format.

Source code in src/horde_model_reference/legacy/classes/legacy_converters.py
@override
def _convert_single_record(
    self,
    legacy_record: LegacyGenericRecord,
) -> GenericModelRecord:
    """Convert a single legacy ControlNet record to the new format.

    Args:
        legacy_record: The validated legacy record to convert.

    Returns:
        A ``ControlNetModelRecord`` built from the legacy record.

    Raises:
        TypeError: If ``legacy_record`` is not a ``LegacyGenericRecord``.

    """
    if not isinstance(legacy_record, LegacyGenericRecord):
        # An object of the wrong type may have no ``name`` attribute at all; fall back
        # to its repr so the intended TypeError is raised instead of an AttributeError.
        record_label = legacy_record.name if hasattr(legacy_record, "name") else repr(legacy_record)
        raise TypeError(f"Expected {record_label} to be a LegacyGenericRecord.")

    model_record_config = self._convert_model_record_config(legacy_record)

    # Legacy controlnet records use 'type' field for the controlnet style
    # (e.g., control_canny, control_depth, etc.), but some older records
    # may use the 'style' field instead. Try 'type' first, then fall back to 'style'.
    controlnet_style = legacy_record.type or legacy_record.style

    return ControlNetModelRecord(
        name=legacy_record.name,
        description=legacy_record.description,
        version=legacy_record.version,
        config=model_record_config,
        controlnet_style=controlnet_style,
        model_classification=MODEL_CLASSIFICATION_LOOKUP[self.model_reference_category],
    )

_initialize

_initialize() -> None

Initialize the converter, allowing re-conversion if applicable.

Source code in src/horde_model_reference/legacy/classes/legacy_converters.py
def _initialize(self) -> None:
    """Reset all per-run state so the converter can perform a fresh conversion.

    Each record, error and host container is rebound to a new empty dict and
    the ``converted_successfully`` flag is cleared.
    """
    self._all_legacy_records, self._all_converted_records = {}, {}
    self.all_validation_errors_log, self._host_counter = {}, {}
    self.converted_successfully = False

convert_to_new_format

convert_to_new_format() -> dict[str, GenericModelRecord]

Convert the legacy model reference to the new format.

Returns:

  • dict[str, GenericModelRecord] –

    The converted model records in the new format.

Source code in src/horde_model_reference/legacy/classes/legacy_converters.py
def convert_to_new_format(self) -> dict[str, GenericModelRecord]:
    """Run the complete legacy-to-new-format conversion pipeline.

    If a previous run already finished successfully, the converter state is
    reset first so the conversion starts from scratch.

    Returns:
        The converted model records in the new format.

    """
    if self.converted_successfully:
        self._initialize()

    # The stages are order-dependent: the hooks bracket load/convert, and output is written last.
    pipeline = (
        self.pre_parse_records,
        self._load_and_validate_legacy_records,
        self._convert_legacy_to_new_format,
        self.post_parse_records,
        self.write_out_validation_errors,
        self.write_out_records,
    )
    for stage in pipeline:
        stage()

    self.converted_successfully = True
    return self._all_converted_records

_load_and_validate_legacy_records

_load_and_validate_legacy_records() -> None

Load and validate all legacy records using Pydantic models.

Source code in src/horde_model_reference/legacy/classes/legacy_converters.py
def _load_and_validate_legacy_records(self) -> None:
    """Load the legacy JSON database and validate every record with Pydantic.

    A missing or zero-byte database file is not an error: a debug message is
    logged and nothing is loaded. Otherwise each top-level entry is validated
    with ``self.model_reference_type``. Validated records are stored in
    ``self._all_legacy_records`` and any issues collected in the validation
    context are added to the validation error log.

    Raises:
        ValidationError: If a record fails validation. The error is added to
            the validation error log before being re-raised.

    """
    # Check if file exists and is not empty
    if not self.legacy_database_path.exists():
        logger.debug(f"Legacy database file {self.legacy_database_path} does not exist, skipping conversion")
        return

    file_size = self.legacy_database_path.stat().st_size
    if file_size == 0:
        logger.debug(f"Legacy database file {self.legacy_database_path} is empty, skipping conversion")
        return

    # JSON is UTF-8 by specification. Decode explicitly instead of relying on the
    # platform's locale encoding so non-ASCII text loads the same way everywhere.
    with open(self.legacy_database_path, encoding="utf-8") as legacy_model_reference_file:
        raw_legacy_json_data: dict[str, dict[str, Any]] = json.load(legacy_model_reference_file)

    for model_record_key, model_record_contents in raw_legacy_json_data.items():
        # Presumably filled in by the model's validators through the context; it is
        # read back after validation below.
        issues: list[str] = []
        validation_context = {
            "issues": issues,
            "model_key": model_record_key,
            "debug_mode": self.debug_mode,
            "category": self.model_reference_category,
            "host_counter": self._host_counter,
        }

        # Add existing showcase files to context for stable diffusion
        if hasattr(self, "existing_showcase_files"):
            validation_context["existing_showcase_files"] = self.existing_showcase_files

        try:
            legacy_record = self.model_reference_type.model_validate(
                model_record_contents,
                context=validation_context,
            )
            self._all_legacy_records[model_record_key] = legacy_record

            for issue in issues:
                self.add_validation_error_to_log(model_record_key=model_record_key, error=issue)
        except ValidationError as e:
            error = f"CRITICAL: Error parsing {model_record_key}:\n{e}"
            self.add_validation_error_to_log(model_record_key=model_record_key, error=error)
            raise

_convert_legacy_to_new_format

_convert_legacy_to_new_format() -> None

Convert validated legacy records to the new format.

Source code in src/horde_model_reference/legacy/classes/legacy_converters.py
def _convert_legacy_to_new_format(self) -> None:
    """Convert every validated legacy record and store the successful results.

    A converter result of ``None`` is recorded in the validation error log and
    the record is skipped. Any exception is recorded in the log as well and
    then re-raised, which stops the conversion.
    """
    for record_key, source_record in self._all_legacy_records.items():
        try:
            result = self._convert_single_record(source_record)
            if result is not None:
                self._all_converted_records[record_key] = result
                continue
            self.add_validation_error_to_log(
                model_record_key=record_key,
                error="Failed to convert legacy record to new format",
            )
        except Exception as exc:
            # Keep a trace in the validation log before the failure propagates.
            self.add_validation_error_to_log(
                model_record_key=record_key,
                error=f"Failed to convert {record_key}: {exc}",
            )
            raise

_convert_model_record_config

_convert_model_record_config(
    legacy_record: LegacyGenericRecord,
) -> GenericModelRecordConfig

Convert the config section of a legacy record to the new format.

Source code in src/horde_model_reference/legacy/classes/legacy_converters.py
def _convert_model_record_config(self, legacy_record: LegacyGenericRecord) -> GenericModelRecordConfig:
    """Build the new-format config (download list) from a legacy record's config.

    One ``DownloadRecord`` is created per legacy file entry that has a path and
    whose path does not contain "yaml"; each legacy download entry then supplies
    the URL for the record with the same file name.

    Raises:
        ValueError: If a download entry names a file that has no download record.

    """
    records_by_name: dict[str, DownloadRecord] = {}

    for entry in legacy_record.config.files:
        entry_path = entry.path
        if not entry_path:
            continue
        lowered_path = entry_path.lower()
        if "yaml" in lowered_path:
            continue

        # NOTE(review): the slow-host substrings are matched against the file path here,
        # not the download URL (which is only filled in below) - confirm this is intended.
        is_slow = any(marker in lowered_path for marker in _SLOW_DOWNLOAD_HOST_SUBSTRINGS)

        records_by_name[entry_path] = DownloadRecord(
            file_name=entry_path,
            file_url="",
            sha256sum=entry.sha256sum or "FIXME",
            file_purpose=entry.file_type,
            known_slow_download=is_slow,
        )

    for download in legacy_record.config.download:
        matching_record = records_by_name.get(download.file_name)
        if matching_record is None:
            raise ValueError(f"Unknown download entry: {download.file_name}")
        matching_record.file_url = download.file_url or ""

    return GenericModelRecordConfig(download=list(records_by_name.values()))

_convert_single_record_to_legacy

_convert_single_record_to_legacy(
    v2_record: GenericModelRecord,
) -> LegacyGenericRecord

Convert a single v2 record back to legacy format.

This is a stub for future implementation. Override this in subclasses for category-specific conversion.

Parameters:

  • v2_record (GenericModelRecord) –

    The v2 format record to convert back to legacy format.

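Returns:

  • LegacyGenericRecord –

    The legacy format record.

Raises:

  • NotImplementedError –

    This conversion is not yet implemented.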

Source code in src/horde_model_reference/legacy/classes/legacy_converters.py
def _convert_single_record_to_legacy(
    self,
    v2_record: GenericModelRecord,
) -> LegacyGenericRecord:
    """Convert one v2 record back to the legacy format (not implemented yet).

    This is a placeholder for future work. Subclasses are expected to override
    it with category-specific conversion logic.

    Args:
        v2_record: The v2 format record to convert back to legacy format.

    Returns:
        The legacy format record.

    Raises:
        NotImplementedError: Always, because this conversion is not yet implemented.

    """
    not_implemented_message = (
        "v2 → legacy conversion is not yet implemented. "
        "This feature is planned for a future release. "
        "For now, only legacy → v2 conversion is supported."
    )
    raise NotImplementedError(not_implemented_message)

convert_from_v2_to_legacy

convert_from_v2_to_legacy(
    v2_records: dict[str, GenericModelRecord],
) -> dict[str, LegacyGenericRecord]

Convert all v2 records back to legacy format.

This is a stub for future implementation.

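Parameters:

  • v2_records (dict[str, GenericModelRecord]) –

    Dictionary of v2 format records to convert.

Returns:

  • dict[str, LegacyGenericRecord] –

    Dictionary of legacy format records.

Raises:

  • NotImplementedError –

    This conversion is not yet implemented.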

Source code in src/horde_model_reference/legacy/classes/legacy_converters.py
def convert_from_v2_to_legacy(
    self,
    v2_records: dict[str, GenericModelRecord],
) -> dict[str, LegacyGenericRecord]:
    """Convert a collection of v2 records back to the legacy format (not implemented yet).

    This is a placeholder for future work.

    Args:
        v2_records: Dictionary of v2 format records to convert.

    Returns:
        Dictionary of legacy format records.

    Raises:
        NotImplementedError: Always, because this conversion is not yet implemented.

    """
    # Include the request size and category so the failure is easy to trace.
    record_count = len(v2_records)
    category = self.model_reference_category
    not_implemented_message = (
        "v2 → legacy conversion is not yet implemented. "
        "This feature is planned for a future release. "
        "For now, only legacy → v2 conversion is supported. "
        f"Attempted to convert {record_count} records from category: {category}"
    )
    raise NotImplementedError(not_implemented_message)

pre_parse_records

pre_parse_records() -> None

Override to perform category-specific pre-parsing.

Source code in src/horde_model_reference/legacy/classes/legacy_converters.py
def pre_parse_records(self) -> None:
    """Hook for category-specific pre-parsing.

    The default implementation does nothing; override it to add behaviour.
    """
    return None

post_parse_records

post_parse_records() -> None

Override to perform category-specific post-parsing.

Source code in src/horde_model_reference/legacy/classes/legacy_converters.py
def post_parse_records(self) -> None:
    """Hook for category-specific post-parsing.

    The default implementation does nothing; override it to add behaviour.
    """
    return None

write_out_records

write_out_records() -> None

Write out the converted records.

Source code in src/horde_model_reference/legacy/classes/legacy_converters.py
def write_out_records(self) -> None:
    """Write the converted records to ``converted_database_file_path`` as indented JSON.

    Does nothing when ``dry_run`` is set. Objects that ``json`` cannot encode
    natively are serialized through their ``model_dump`` method. When the file
    on disk already holds exactly the same text, it is left untouched so its
    mtime is preserved; otherwise the new content is written to a sibling
    ``.tmp`` file which then replaces the target.

    Raises:
        OSError: If the temporary file cannot be written or cannot replace the
            target. The temporary file is removed before the error propagates.

    """
    if self.dry_run:
        return

    # Serialize first so the result can be compared with the on-disk file and an
    # unnecessary rewrite (which would change the mtime) can be avoided.
    # A trailing newline is kept for consistency with other writers.
    final_serialized = (
        json.dumps(
            self._all_converted_records,
            indent=4,
            default=lambda o: o.model_dump(
                exclude_none=True,
                exclude_unset=False,
                by_alias=True,
            ),
        )
        + "\n"
    )

    target_path = Path(self.converted_database_file_path)
    target_path.parent.mkdir(parents=True, exist_ok=True)

    # Only the read itself is best-effort: an unreadable or undecodable existing
    # file simply means "overwrite it". Other errors are no longer swallowed.
    try:
        unchanged = target_path.exists() and target_path.read_text(encoding="utf-8") == final_serialized
    except (OSError, UnicodeError):
        unchanged = False

    if unchanged:
        logger.debug(f"No change to converted file {target_path}, skipping write.")
        return

    # Write to a temporary file in the same directory, then replace the target with it.
    tmp_path = target_path.with_suffix(target_path.suffix + ".tmp")
    try:
        tmp_path.write_text(final_serialized, encoding="utf-8")
        tmp_path.replace(target_path)
    except OSError:
        # Do not leave a stale temporary file behind when the write or the swap fails.
        tmp_path.unlink(missing_ok=True)
        raise

get_records

get_records() -> dict[str, GenericModelRecord]

Return the converted records.

Source code in src/horde_model_reference/legacy/classes/legacy_converters.py
def get_records(self) -> dict[str, GenericModelRecord]:
    """Return the dictionary of converted records held by this converter.

    The internal dictionary itself is returned, not a copy.
    """
    converted_records = self._all_converted_records
    return converted_records

add_validation_error_to_log

add_validation_error_to_log(
    *, model_record_key: str, error: str
) -> None

Add a validation error to the log.

Source code in src/horde_model_reference/legacy/classes/legacy_converters.py
def add_validation_error_to_log(
    self,
    *,
    model_record_key: str,
    error: str,
) -> None:
    """Record a validation error for the given model record.

    Errors are accumulated per record key in ``all_validation_errors_log``.
    In debug mode the error is also emitted to the debug log.

    Args:
        model_record_key: Key of the record the error belongs to.
        error: The error message to record.

    """
    errors_for_record = self.all_validation_errors_log.setdefault(model_record_key, [])
    errors_for_record.append(error)

    if not self.debug_mode:
        return
    logger.debug(f"{model_record_key} has error: {error}")

write_out_validation_errors

write_out_validation_errors() -> None

Write out the validation errors.

Source code in src/horde_model_reference/legacy/classes/legacy_converters.py
def write_out_validation_errors(self) -> None:
    """Dump the validation error log to ``<log_folder>/<category>.log`` as JSON.

    The file is only written in debug mode and never during a dry run.
    """
    should_write = self.debug_mode and not self.dry_run
    if not should_write:
        return

    destination = self.log_folder / (self.model_reference_category + ".log")
    destination.parent.mkdir(parents=True, exist_ok=True)
    with destination.open("w") as log_handle:
        log_handle.write(json.dumps(self.all_validation_errors_log, indent=4))