Skip to content

model_reference_metadata

Backend metadata tracking system for model reference operations.

This module provides centralized metadata tracking for both legacy (v1) and v2 format operations. All metadata updates go through MetadataManager methods to enable observability hooks.

OperationType

Bases: StrEnum

Type of CRUD operation performed on model references.

Source code in src/horde_model_reference/model_reference_metadata.py
class OperationType(StrEnum):
    """Type of CRUD operation performed on model references.

    Members are ``StrEnum`` values, so each one compares equal to its
    lowercase string form (for example ``OperationType.CREATE == "create"``).
    """

    # A model reference was created.
    CREATE = "create"
    # An existing model reference was updated.
    UPDATE = "update"
    # A model reference was deleted.
    DELETE = "delete"

CREATE class-attribute instance-attribute

CREATE = 'create'

UPDATE class-attribute instance-attribute

UPDATE = 'update'

DELETE class-attribute instance-attribute

DELETE = 'delete'

CategoryMetadata

Bases: BaseModel

Metadata tracking for a single model reference category.

Tracks operation counts, timestamps, and health metrics for either legacy or v2 format operations.

Source code in src/horde_model_reference/model_reference_metadata.py
class CategoryMetadata(BaseModel):
    """Metadata tracking for a single model reference category.

    Tracks operation counts, timestamps, and health metrics for either legacy or v2 format operations.
    """

    # use_enum_values: enum-typed fields are stored as their plain values.
    # use_attribute_docstrings: the string literal directly below each field is
    # used as that field's description, so those strings are schema data and
    # must be kept in sync with the fields they follow.
    model_config = ConfigDict(
        use_enum_values=True,
        use_attribute_docstrings=True,
    )

    category: MODEL_REFERENCE_CATEGORY
    """The category of the model reference."""
    last_updated: int
    """Unix timestamp when metadata was last updated."""
    last_operation_type: OperationType | None = None
    """Type of the last operation performed."""
    last_operation_model: str | None = None
    """Name of the model affected by the last operation."""

    # All counters are validated as non-negative (ge=0).
    total_creates: int = Field(default=0, ge=0)
    """Total number of create operations."""
    total_updates: int = Field(default=0, ge=0)
    """Total number of update operations."""
    total_deletes: int = Field(default=0, ge=0)
    """Total number of delete operations."""
    total_models: int = Field(default=0, ge=0)
    """Current total number of models in category."""

    initialization_time: int
    """Unix timestamp when metadata was first created."""
    last_successful_operation: int
    """Unix timestamp of the last successful operation."""
    # Constrained to be non-negative, consistent with the other counters above;
    # a negative error count is never meaningful.
    error_count: int = Field(default=0, ge=0)
    """Total number of errors encountered."""

    metadata_schema_version: str = "1.0.0"
    """Version of the metadata schema."""
    backend_type: str
    """Type of backend performing operations."""

model_config class-attribute instance-attribute

model_config = ConfigDict(
    use_enum_values=True, use_attribute_docstrings=True
)

category instance-attribute

category: MODEL_REFERENCE_CATEGORY

The category of the model reference.

last_updated instance-attribute

last_updated: int

Unix timestamp when metadata was last updated.

last_operation_type class-attribute instance-attribute

last_operation_type: OperationType | None = None

Type of the last operation performed.

last_operation_model class-attribute instance-attribute

last_operation_model: str | None = None

Name of the model affected by the last operation.

total_creates class-attribute instance-attribute

total_creates: int = Field(default=0, ge=0)

Total number of create operations.

total_updates class-attribute instance-attribute

total_updates: int = Field(default=0, ge=0)

Total number of update operations.

total_deletes class-attribute instance-attribute

total_deletes: int = Field(default=0, ge=0)

Total number of delete operations.

total_models class-attribute instance-attribute

total_models: int = Field(default=0, ge=0)

Current total number of models in category.

initialization_time instance-attribute

initialization_time: int

Unix timestamp when metadata was first created.

last_successful_operation instance-attribute

last_successful_operation: int

Unix timestamp of the last successful operation.

error_count class-attribute instance-attribute

error_count: int = 0

Total number of errors encountered.

metadata_schema_version class-attribute instance-attribute

metadata_schema_version: str = '1.0.0'

Version of the metadata schema.

backend_type instance-attribute

backend_type: str

Type of backend performing operations.

GenericModelRecordMetadata

Bases: BaseModel

Metadata for a generic model record.

This class is defined in this module, rather than imported from model_reference_records, to avoid circular imports.

Source code in src/horde_model_reference/model_reference_metadata.py
class GenericModelRecordMetadata(BaseModel):
    """Metadata for a generic model record.

    Defined in this module, rather than imported from ``model_reference_records``,
    to avoid circular imports.
    NOTE(review): presumably this mirrors the metadata model used by
    ``model_reference_records`` -- confirm the two definitions stay in sync.

    Keys that are not declared below are accepted and kept on the instance
    (``extra="allow"``).
    """

    model_config = ConfigDict(
        extra="allow",
    )

    # Version of the per-record metadata schema.
    schema_version: str = Field(default="1.0.0")
    # Creation time; ModelMetadataManager fills this with int(time.time()).
    created_at: int | None = None
    # Last-update time; ModelMetadataManager fills this with int(time.time()).
    updated_at: int | None = None
    # Who created the record -- presumably a user identifier; TODO confirm.
    created_by: str | None = None
    # Who last updated the record -- presumably a user identifier; TODO confirm.
    updated_by: str | None = None

model_config class-attribute instance-attribute

model_config = ConfigDict(extra='allow')

schema_version class-attribute instance-attribute

schema_version: str = Field(default='1.0.0')

created_at class-attribute instance-attribute

created_at: int | None = None

updated_at class-attribute instance-attribute

updated_at: int | None = None

created_by class-attribute instance-attribute

created_by: str | None = None

updated_by class-attribute instance-attribute

updated_by: str | None = None

ModelMetadataHandlerProtocol

Bases: Protocol

Protocol for custom per-model metadata transformation handlers.

Implementations can be injected into ModelMetadataManager to provide custom metadata transformation logic for specific use cases.

Source code in src/horde_model_reference/model_reference_metadata.py
class ModelMetadataHandlerProtocol(Protocol):
    """Protocol for custom per-model metadata transformation handlers.

    Implementations can be injected into ModelMetadataManager to provide
    custom metadata transformation logic for specific use cases.

    Any object exposing a compatible ``transform_metadata`` method satisfies
    this protocol; subclassing is not required.
    """

    def transform_metadata(
        self,
        metadata: GenericModelRecordMetadata,
        context: dict[str, Any],
    ) -> GenericModelRecordMetadata:
        """Transform metadata with custom logic.

        ``ModelMetadataManager.apply_custom_handler`` calls this with the
        record's current metadata and writes the returned object back to the
        record.

        Args:
            metadata: The metadata to transform.
            context: Additional context (e.g., model_name, category, operation_type).

        Returns:
            GenericModelRecordMetadata: Transformed metadata.

        """
        ...

transform_metadata

transform_metadata(
    metadata: GenericModelRecordMetadata,
    context: dict[str, Any],
) -> GenericModelRecordMetadata

Transform metadata with custom logic.

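Parameters:

  • metadata (GenericModelRecordMetadata) –

    The metadata to transform.

  • context (dict[str, Any]) –

    Additional context (e.g., model_name, category, operation_type).

Returns:

  • GenericModelRecordMetadata ( GenericModelRecordMetadata ) –

    Transformed metadata.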

Source code in src/horde_model_reference/model_reference_metadata.py
def transform_metadata(
    self,
    metadata: GenericModelRecordMetadata,
    context: dict[str, Any],
) -> GenericModelRecordMetadata:
    """Transform metadata with custom logic.

    This is a protocol stub with no implementation of its own; concrete
    handlers supply the behavior.

    Args:
        metadata: The metadata to transform.
        context: Additional context (e.g., model_name, category, operation_type).

    Returns:
        GenericModelRecordMetadata: Transformed metadata.

    """
    ...

ModelMetadataManager

Manages per-model metadata operations with strong typing.

This class provides a typed interface for working with model metadata, eliminating the need for raw dictionary manipulation and magic strings. Supports dependency injection of custom handlers for specialized behavior.

Source code in src/horde_model_reference/model_reference_metadata.py
class ModelMetadataManager:
    """Manages per-model metadata operations with strong typing.

    This class provides a typed interface for working with model metadata,
    eliminating the need for raw dictionary manipulation and magic strings.
    Supports dependency injection of custom handlers for specialized behavior.

    Every method operates on a model record dictionary and reads or writes
    its ``"metadata"`` key; records are modified in place.
    """

    def __init__(self, custom_handler: ModelMetadataHandlerProtocol | None = None) -> None:
        """Initialize the model metadata manager.

        Args:
            custom_handler: Optional custom handler for metadata transformation.

        """
        self._custom_handler = custom_handler

    def get_metadata(self, record_dict: dict[str, Any]) -> GenericModelRecordMetadata:
        """Get metadata from a model record dictionary.

        Args:
            record_dict: Model record as dictionary.

        Returns:
            GenericModelRecordMetadata: The metadata object, or a new empty one if the
                record has no ``"metadata"`` entry or the entry is ``None``/empty.

        """
        # ``.get("metadata", {})`` would only cover a missing key; a record that
        # carries an explicit ``"metadata": None`` would then fail on ``**None``.
        metadata_dict = record_dict.get("metadata") or {}
        return GenericModelRecordMetadata(**metadata_dict)

    def set_metadata(self, record_dict: dict[str, Any], metadata: GenericModelRecordMetadata) -> None:
        """Set metadata on a model record dictionary.

        Only fields that were explicitly set on ``metadata`` are written
        (``exclude_unset=True``), serialized in JSON mode.

        Args:
            record_dict: Model record as dictionary.
            metadata: The metadata to set.

        """
        record_dict["metadata"] = metadata.model_dump(exclude_unset=True, mode="json")

    def update_metadata(
        self,
        record_dict: dict[str, Any],
        **updates: Any,  # noqa: ANN401
    ) -> GenericModelRecordMetadata:
        """Update specific metadata fields on a model record.

        Only names that are declared fields of the metadata model, or extra
        keys already present on the record's metadata, are applied. Any other
        name is ignored.

        Args:
            record_dict: Model record as dictionary.
            **updates: Field names and values to update.

        Returns:
            GenericModelRecordMetadata: The updated metadata object.

        """
        metadata = self.get_metadata(record_dict)
        # Gate on real data fields rather than ``hasattr``: ``hasattr`` is also
        # true for methods and other attributes (e.g. ``model_dump``), and
        # assigning to those could shadow them and break serialization below.
        declared_fields = type(metadata).model_fields
        extra_fields = metadata.model_extra or {}
        for field, value in updates.items():
            if field in declared_fields or field in extra_fields:
                setattr(metadata, field, value)
        self.set_metadata(record_dict, metadata)
        return metadata

    def preserve_creation_fields(
        self,
        existing_record: dict[str, Any],
        new_record: dict[str, Any],
    ) -> None:
        """Preserve creation metadata fields from existing record when updating.

        Copies ``created_at`` and ``created_by`` from the existing record onto
        the new record whenever the existing value is not ``None``, then writes
        the resulting metadata back to ``new_record``.

        Args:
            existing_record: The existing model record with metadata to preserve.
            new_record: The new model record to update with preserved metadata.

        """
        existing_metadata = self.get_metadata(existing_record)
        new_metadata = self.get_metadata(new_record)

        if existing_metadata.created_at is not None:
            new_metadata.created_at = existing_metadata.created_at
        if existing_metadata.created_by is not None:
            new_metadata.created_by = existing_metadata.created_by

        self.set_metadata(new_record, new_metadata)

    def set_creation_timestamp(
        self,
        record_dict: dict[str, Any],
        timestamp: int | None = None,
    ) -> None:
        """Set creation timestamp on a model record.

        Args:
            record_dict: Model record as dictionary.
            timestamp: Unix timestamp to use, or None to use current time.

        """
        if timestamp is None:
            timestamp = int(time.time())
        self.update_metadata(record_dict, created_at=timestamp)

    def set_update_timestamp(
        self,
        record_dict: dict[str, Any],
        timestamp: int | None = None,
    ) -> None:
        """Set update timestamp on a model record.

        Args:
            record_dict: Model record as dictionary.
            timestamp: Unix timestamp to use, or None to use current time.

        """
        if timestamp is None:
            timestamp = int(time.time())
        self.update_metadata(record_dict, updated_at=timestamp)

    def ensure_metadata_populated(
        self,
        record_dict: dict[str, Any],
        timestamp: int | None = None,
    ) -> bool:
        """Ensure model record has all required metadata fields populated.

        Populates missing created_at and updated_at timestamps if they are None or missing.
        The record is only rewritten when at least one field was filled in.

        Args:
            record_dict: Model record as dictionary.
            timestamp: Unix timestamp to use for missing fields, or None to use current time.

        Returns:
            bool: True if any metadata fields were populated, False if all were already present.

        """
        if timestamp is None:
            timestamp = int(time.time())

        metadata = self.get_metadata(record_dict)
        updated = False

        if metadata.created_at is None:
            metadata.created_at = timestamp
            updated = True

        if metadata.updated_at is None:
            metadata.updated_at = timestamp
            updated = True

        if updated:
            self.set_metadata(record_dict, metadata)

        return updated

    def apply_custom_handler(
        self,
        record_dict: dict[str, Any],
        context: dict[str, Any],
    ) -> None:
        """Apply custom handler transformation if one is configured.

        Does nothing when no custom handler was supplied at construction.

        Args:
            record_dict: Model record as dictionary.
            context: Additional context for the transformation.

        """
        if self._custom_handler is not None:
            metadata = self.get_metadata(record_dict)
            transformed = self._custom_handler.transform_metadata(metadata, context)
            self.set_metadata(record_dict, transformed)

_custom_handler instance-attribute

_custom_handler = custom_handler

__init__

__init__(
    custom_handler: ModelMetadataHandlerProtocol
    | None = None,
) -> None

Initialize the model metadata manager.

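Parameters:

  • custom_handler (ModelMetadataHandlerProtocol | None, default: None ) –

    Optional custom handler for metadata transformation.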

Source code in src/horde_model_reference/model_reference_metadata.py
def __init__(self, custom_handler: ModelMetadataHandlerProtocol | None = None) -> None:
    """Initialize the model metadata manager.

    Args:
        custom_handler: Optional custom handler for metadata transformation.
            When None, ``apply_custom_handler`` does nothing.

    """
    # Stored as-is; it is only consulted by apply_custom_handler.
    self._custom_handler = custom_handler

get_metadata

get_metadata(
    record_dict: dict[str, Any],
) -> GenericModelRecordMetadata

Get metadata from a model record dictionary.

Parameters:

  • record_dict (dict[str, Any]) –

    Model record as dictionary.

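Returns:

  • GenericModelRecordMetadata ( GenericModelRecordMetadata ) –

    The metadata object, or a new empty one if not present.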

Source code in src/horde_model_reference/model_reference_metadata.py
def get_metadata(self, record_dict: dict[str, Any]) -> GenericModelRecordMetadata:
    """Get metadata from a model record dictionary.

    Args:
        record_dict: Model record as dictionary.

    Returns:
        GenericModelRecordMetadata: The metadata object, or a new empty one if the
            record has no ``"metadata"`` entry or the entry is ``None``/empty.

    """
    # ``.get("metadata", {})`` would only cover a missing key; a record that
    # carries an explicit ``"metadata": None`` would then fail on ``**None``.
    metadata_dict = record_dict.get("metadata") or {}
    return GenericModelRecordMetadata(**metadata_dict)

set_metadata

set_metadata(
    record_dict: dict[str, Any],
    metadata: GenericModelRecordMetadata,
) -> None

Set metadata on a model record dictionary.

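Parameters:

  • record_dict (dict[str, Any]) –

    Model record as dictionary.

  • metadata (GenericModelRecordMetadata) –

    The metadata to set.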

Source code in src/horde_model_reference/model_reference_metadata.py
def set_metadata(self, record_dict: dict[str, Any], metadata: GenericModelRecordMetadata) -> None:
    """Store ``metadata`` on a model record dictionary.

    The metadata is serialized in JSON mode, keeping only the fields that
    were explicitly set, and placed under the record's ``"metadata"`` key.

    Args:
        record_dict: Model record as dictionary; modified in place.
        metadata: The metadata to set.

    """
    serialized = metadata.model_dump(mode="json", exclude_unset=True)
    record_dict["metadata"] = serialized

update_metadata

update_metadata(
    record_dict: dict[str, Any], **updates: Any
) -> GenericModelRecordMetadata

Update specific metadata fields on a model record.

Parameters:

  • record_dict (dict[str, Any]) –

    Model record as dictionary.

  • **updates (Any, default: {} ) –

    Field names and values to update.

Returns:

  • GenericModelRecordMetadata ( GenericModelRecordMetadata ) –

    The updated metadata object.

Source code in src/horde_model_reference/model_reference_metadata.py
def update_metadata(
    self,
    record_dict: dict[str, Any],
    **updates: Any,  # noqa: ANN401
) -> GenericModelRecordMetadata:
    """Update specific metadata fields on a model record.

    Only names that are declared fields of the metadata model, or extra
    keys already present on the record's metadata, are applied. Any other
    name is ignored.

    Args:
        record_dict: Model record as dictionary.
        **updates: Field names and values to update.

    Returns:
        GenericModelRecordMetadata: The updated metadata object.

    """
    metadata = self.get_metadata(record_dict)
    # Gate on real data fields rather than ``hasattr``: ``hasattr`` is also
    # true for methods and other attributes (e.g. ``model_dump``), and
    # assigning to those could shadow them and break serialization below.
    declared_fields = type(metadata).model_fields
    extra_fields = metadata.model_extra or {}
    for field, value in updates.items():
        if field in declared_fields or field in extra_fields:
            setattr(metadata, field, value)
    self.set_metadata(record_dict, metadata)
    return metadata

preserve_creation_fields

preserve_creation_fields(
    existing_record: dict[str, Any],
    new_record: dict[str, Any],
) -> None

Preserve creation metadata fields from existing record when updating.

Parameters:

  • existing_record (dict[str, Any]) –

    The existing model record with metadata to preserve.

  • new_record (dict[str, Any]) –

    The new model record to update with preserved metadata.

Source code in src/horde_model_reference/model_reference_metadata.py
def preserve_creation_fields(
    self,
    existing_record: dict[str, Any],
    new_record: dict[str, Any],
) -> None:
    """Carry creation metadata over from an existing record to its replacement.

    ``created_at`` and ``created_by`` are copied whenever the existing record
    has a non-None value for them; the new record's metadata is then written
    back in every case.

    Args:
        existing_record: The existing model record with metadata to preserve.
        new_record: The new model record to update with preserved metadata.

    """
    source = self.get_metadata(existing_record)
    target = self.get_metadata(new_record)

    for attr_name in ("created_at", "created_by"):
        preserved = getattr(source, attr_name)
        if preserved is not None:
            setattr(target, attr_name, preserved)

    self.set_metadata(new_record, target)

set_creation_timestamp

set_creation_timestamp(
    record_dict: dict[str, Any],
    timestamp: int | None = None,
) -> None

Set creation timestamp on a model record.

Parameters:

  • record_dict (dict[str, Any]) –

    Model record as dictionary.

  • timestamp (int | None, default: None ) –

    Unix timestamp to use, or None to use current time.

Source code in src/horde_model_reference/model_reference_metadata.py
def set_creation_timestamp(
    self,
    record_dict: dict[str, Any],
    timestamp: int | None = None,
) -> None:
    """Set creation timestamp on a model record.

    Args:
        record_dict: Model record as dictionary.
        timestamp: Unix timestamp to use, or None to use current time.

    """
    if timestamp is None:
        timestamp = int(time.time())
    self.update_metadata(record_dict, created_at=timestamp)

set_update_timestamp

set_update_timestamp(
    record_dict: dict[str, Any],
    timestamp: int | None = None,
) -> None

Set update timestamp on a model record.

Parameters:

  • record_dict (dict[str, Any]) –

    Model record as dictionary.

  • timestamp (int | None, default: None ) –

    Unix timestamp to use, or None to use current time.

Source code in src/horde_model_reference/model_reference_metadata.py
def set_update_timestamp(
    self,
    record_dict: dict[str, Any],
    timestamp: int | None = None,
) -> None:
    """Set update timestamp on a model record.

    Args:
        record_dict: Model record as dictionary.
        timestamp: Unix timestamp to use, or None to use current time.

    """
    if timestamp is None:
        timestamp = int(time.time())
    self.update_metadata(record_dict, updated_at=timestamp)

ensure_metadata_populated

ensure_metadata_populated(
    record_dict: dict[str, Any],
    timestamp: int | None = None,
) -> bool

Ensure model record has all required metadata fields populated.

Populates missing created_at and updated_at timestamps if they are None or missing.

Parameters:

  • record_dict (dict[str, Any]) –

    Model record as dictionary.

  • timestamp (int | None, default: None ) –

    Unix timestamp to use for missing fields, or None to use current time.

Returns:

  • bool ( bool ) –

    True if any metadata fields were populated, False if all were already present.

Source code in src/horde_model_reference/model_reference_metadata.py
def ensure_metadata_populated(
    self,
    record_dict: dict[str, Any],
    timestamp: int | None = None,
) -> bool:
    """Ensure model record has all required metadata fields populated.

    Populates missing created_at and updated_at timestamps if they are None or missing.

    Args:
        record_dict: Model record as dictionary.
        timestamp: Unix timestamp to use for missing fields, or None to use current time.

    Returns:
        bool: True if any metadata fields were populated, False if all were already present.

    """
    if timestamp is None:
        timestamp = int(time.time())

    metadata = self.get_metadata(record_dict)
    updated = False

    if metadata.created_at is None:
        metadata.created_at = timestamp
        updated = True

    if metadata.updated_at is None:
        metadata.updated_at = timestamp
        updated = True

    if updated:
        self.set_metadata(record_dict, metadata)

    return updated

apply_custom_handler

apply_custom_handler(
    record_dict: dict[str, Any], context: dict[str, Any]
) -> None

Apply custom handler transformation if one is configured.

Parameters:

  • record_dict (dict[str, Any]) –

    Model record as dictionary.

  • context (dict[str, Any]) –

    Additional context for the transformation.

Source code in src/horde_model_reference/model_reference_metadata.py
def apply_custom_handler(
    self,
    record_dict: dict[str, Any],
    context: dict[str, Any],
) -> None:
    """Run the configured custom handler over a record's metadata.

    When no handler is configured the record is left untouched. Otherwise the
    record's metadata is passed to the handler and the result is written back.

    Args:
        record_dict: Model record as dictionary.
        context: Additional context for the transformation.

    """
    if self._custom_handler is None:
        return

    current = self.get_metadata(record_dict)
    self.set_metadata(
        record_dict,
        self._custom_handler.transform_metadata(current, context),
    )

MetadataManager

Centralized manager for backend metadata tracking.

Handles metadata for both legacy (v1) and v2 format operations separately. Also provides access to per-model metadata operations via ModelMetadataManager. All metadata updates should go through this class to enable observability hooks.

Thread-safe with RLock protection.

Source code in src/horde_model_reference/model_reference_metadata.py
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
class MetadataManager:
    """Centralized manager for backend metadata tracking.

    Handles metadata for both legacy (v1) and v2 format operations separately.
    Also provides access to per-model metadata operations via ModelMetadataManager.
    All metadata updates should go through this class to enable observability hooks.

    Thread-safe with RLock protection.
    """

    def __init__(
        self,
        base_path: Path,
        model_metadata_manager: ModelMetadataManager | None = None,
    ) -> None:
        """Initialize metadata manager.

        Args:
            base_path: Base path for horde model reference data.
            model_metadata_manager: Optional custom ModelMetadataManager for dependency injection.
                If None, a default instance will be created.

        """
        self._base_path = base_path
        self._lock = RLock()

        # Per-model metadata manager (supports dependency injection)
        self._model_metadata_manager = model_metadata_manager or ModelMetadataManager()

        # Separate caching for legacy and v2 metadata
        self._legacy_cache: dict[MODEL_REFERENCE_CATEGORY, CategoryMetadata] = {}
        self._v2_cache: dict[MODEL_REFERENCE_CATEGORY, CategoryMetadata] = {}

        # Cache timestamps for TTL
        self._legacy_cache_timestamps: dict[MODEL_REFERENCE_CATEGORY, float] = {}
        self._v2_cache_timestamps: dict[MODEL_REFERENCE_CATEGORY, float] = {}

        # File modification times for cache invalidation
        self._legacy_mtimes: dict[MODEL_REFERENCE_CATEGORY, float] = {}
        self._v2_mtimes: dict[MODEL_REFERENCE_CATEGORY, float] = {}

        # Staleness tracking
        self._stale_legacy: set[MODEL_REFERENCE_CATEGORY] = set()
        self._stale_v2: set[MODEL_REFERENCE_CATEGORY] = set()

        # Cache TTL (default 60 seconds)
        self._cache_ttl_seconds: int = 60

        # Ensure meta directories exist
        self._ensure_meta_directories()

    @property
    def model_metadata(self) -> ModelMetadataManager:
        """Access point for per-model metadata operations.

        Returns:
            ModelMetadataManager: The model metadata manager instance.

        """
        return self._model_metadata_manager

    def _ensure_meta_directories(self) -> None:
        """Ensure meta/legacy and meta/v2 directories exist."""
        legacy_meta_path = self._base_path / "meta" / "legacy"
        v2_meta_path = self._base_path / "meta" / "v2"

        legacy_meta_path.mkdir(parents=True, exist_ok=True)
        v2_meta_path.mkdir(parents=True, exist_ok=True)

    def _get_legacy_metadata_path(self, category: MODEL_REFERENCE_CATEGORY) -> Path:
        """Get file path for legacy metadata.

        Args:
            category: Model reference category

        Returns:
            Path to legacy metadata JSON file

        """
        return self._base_path / "meta" / "legacy" / f"{category.value}_metadata.json"

    def _get_v2_metadata_path(self, category: MODEL_REFERENCE_CATEGORY) -> Path:
        """Get file path for v2 metadata.

        Args:
            category: Model reference category

        Returns:
            Path to v2 metadata JSON file

        """
        return self._base_path / "meta" / "v2" / f"{category.value}_metadata.json"

    def _read_metadata_file(self, file_path: Path) -> CategoryMetadata | None:
        """Read metadata from file.

        Args:
            file_path: Path to metadata file

        Returns:
            CategoryMetadata if file exists and is valid, None otherwise

        """
        if not file_path.exists():
            return None

        try:
            with open(file_path, encoding="utf-8") as f:
                data = json.load(f)
            return CategoryMetadata(**data)
        except Exception as e:
            logger.error(f"Failed to read metadata from {file_path}: {e}")
            return None

    def _write_metadata_file(self, file_path: Path, metadata: CategoryMetadata) -> None:
        """Write metadata to file atomically.

        Uses temp file + rename pattern for atomic writes with backup/rollback on error.

        Args:
            file_path: Path to metadata file
            metadata: Metadata to write

        """
        temp_path = file_path.with_suffix(f".tmp.{time.time()}")
        backup_path = file_path.with_suffix(".bak")

        try:
            # Write to temp file
            with open(temp_path, "w", encoding="utf-8") as f:
                json.dump(metadata.model_dump(mode="json"), f, indent=2)
                os.fsync(f.fileno())

            # Atomic rename with backup
            if file_path.exists():
                if backup_path.exists():
                    backup_path.unlink()
                file_path.replace(backup_path)

            temp_path.replace(file_path)

            # Clean up backup
            if backup_path.exists():
                backup_path.unlink()

        except Exception as e:
            logger.error(f"Failed to write metadata to {file_path}: {e}")
            # Rollback if needed
            if backup_path.exists() and not file_path.exists():
                backup_path.replace(file_path)
            raise
        finally:
            # Clean up temp file
            if temp_path.exists():
                temp_path.unlink()

    def _is_cache_valid(
        self,
        category: MODEL_REFERENCE_CATEGORY,
        cache_timestamps: dict[MODEL_REFERENCE_CATEGORY, float],
        mtimes: dict[MODEL_REFERENCE_CATEGORY, float],
        stale_set: set[MODEL_REFERENCE_CATEGORY],
        metadata_path_fn: Callable[[MODEL_REFERENCE_CATEGORY], Path],
    ) -> bool:
        """Check if cached metadata is valid.

        Args:
            category: Category to check
            cache_timestamps: Dict of cache timestamps
            mtimes: Dict of file modification times
            stale_set: Set of stale categories
            metadata_path_fn: Function to get metadata file path

        Returns:
            True if cache is valid, False otherwise

        """
        # Check explicit staleness
        if category in stale_set:
            return False

        # Check if cached
        if category not in cache_timestamps:
            return False

        # Check TTL
        cache_age = time.time() - cache_timestamps[category]
        if cache_age > self._cache_ttl_seconds:
            return False

        # Check file modification time
        metadata_path = metadata_path_fn(category)
        if metadata_path.exists():
            current_mtime = metadata_path.stat().st_mtime
            if category not in mtimes or mtimes[category] != current_mtime:
                return False

        return True

    def initialize_legacy_metadata(
        self,
        category: MODEL_REFERENCE_CATEGORY,
        backend_type: str,
    ) -> CategoryMetadata:
        """Build a fresh legacy CategoryMetadata record for a category.

        The record is only constructed here; nothing is written to disk or cached.

        Args:
            category: Model reference category
            backend_type: Type of backend (FileSystemBackend/RedisBackend)

        Returns:
            New CategoryMetadata whose update, initialization and last-success timestamps
            are all set to the current Unix time in whole seconds

        """
        now = int(time.time())
        initial_fields = {
            "category": category,
            "last_updated": now,
            "initialization_time": now,
            "last_successful_operation": now,
            "backend_type": backend_type,
        }
        return CategoryMetadata(**initial_fields)

    def initialize_v2_metadata(
        self,
        category: MODEL_REFERENCE_CATEGORY,
        backend_type: str,
    ) -> CategoryMetadata:
        """Build a fresh v2 CategoryMetadata record for a category.

        The record is only constructed here; nothing is written to disk or cached.

        Args:
            category: Model reference category
            backend_type: Type of backend (FileSystemBackend/RedisBackend)

        Returns:
            New CategoryMetadata whose update, initialization and last-success timestamps
            are all set to the current Unix time in whole seconds

        """
        now = int(time.time())
        initial_fields = {
            "category": category,
            "last_updated": now,
            "initialization_time": now,
            "last_successful_operation": now,
            "backend_type": backend_type,
        }
        return CategoryMetadata(**initial_fields)

    def get_or_initialize_v2_metadata(
        self,
        category: MODEL_REFERENCE_CATEGORY,
        backend_type: str = "FileSystemBackend",
    ) -> CategoryMetadata:
        """Return the v2 metadata for a category, creating and persisting it when absent.

        The metadata file is always read from disk. When nothing can be read, a new record is
        initialized, written out and cached. When a record is read, the cache is refreshed
        only if the current cache entry is no longer valid.

        Args:
            category: Model reference category
            backend_type: Type of backend (FileSystemBackend/RedisBackend)

        Returns:
            CategoryMetadata: The existing or newly created v2 metadata

        """
        with self._lock:
            metadata_path = self._get_v2_metadata_path(category)
            metadata = self._read_metadata_file(metadata_path)

            refresh_cache = False
            if metadata is None:
                # Nothing usable on disk: create and persist a fresh record
                metadata = self.initialize_v2_metadata(category, backend_type)
                self._write_metadata_file(metadata_path, metadata)
                logger.debug(f"Initialized v2 metadata for {category.value}")
                refresh_cache = True
            elif not self._is_cache_valid(
                category,
                self._v2_cache_timestamps,
                self._v2_mtimes,
                self._stale_v2,
                self._get_v2_metadata_path,
            ):
                refresh_cache = True

            if refresh_cache:
                self._v2_cache[category] = metadata
                self._v2_cache_timestamps[category] = time.time()
                self._v2_mtimes[category] = metadata_path.stat().st_mtime
                self._stale_v2.discard(category)

            return metadata

    def get_or_initialize_legacy_metadata(
        self,
        category: MODEL_REFERENCE_CATEGORY,
        backend_type: str = "FileSystemBackend",
    ) -> CategoryMetadata:
        """Return the legacy metadata for a category, creating and persisting it when absent.

        The metadata file is always read from disk. When nothing can be read, a new record is
        initialized, written out and cached. When a record is read, the cache is refreshed
        only if the current cache entry is no longer valid.

        Args:
            category: Model reference category
            backend_type: Type of backend (FileSystemBackend/RedisBackend)

        Returns:
            CategoryMetadata: The existing or newly created legacy metadata

        """
        with self._lock:
            metadata_path = self._get_legacy_metadata_path(category)
            metadata = self._read_metadata_file(metadata_path)

            refresh_cache = False
            if metadata is None:
                # Nothing usable on disk: create and persist a fresh record
                metadata = self.initialize_legacy_metadata(category, backend_type)
                self._write_metadata_file(metadata_path, metadata)
                logger.debug(f"Initialized legacy metadata for {category.value}")
                refresh_cache = True
            elif not self._is_cache_valid(
                category,
                self._legacy_cache_timestamps,
                self._legacy_mtimes,
                self._stale_legacy,
                self._get_legacy_metadata_path,
            ):
                refresh_cache = True

            if refresh_cache:
                self._legacy_cache[category] = metadata
                self._legacy_cache_timestamps[category] = time.time()
                self._legacy_mtimes[category] = metadata_path.stat().st_mtime
                self._stale_legacy.discard(category)

            return metadata

    def record_legacy_operation(
        self,
        category: MODEL_REFERENCE_CATEGORY,
        operation: OperationType,
        model_name: str,
        success: bool = True,
        backend_type: str = "FileSystemBackend",
    ) -> CategoryMetadata:
        """Track one legacy (v1) operation for a category (observability hook point).

        Reads the category's legacy metadata from disk (building a fresh record when none can
        be read), stamps it with the operation details, writes it back and refreshes the
        in-memory cache. The last-success timestamp and the per-operation counter only change
        when ``success`` is true.

        Args:
            category: Model reference category
            operation: Type of operation (create/update/delete)
            model_name: Name of model affected
            success: Whether operation was successful
            backend_type: Type of backend performing operation

        Returns:
            Updated CategoryMetadata

        """
        counter_fields = {
            OperationType.CREATE: "total_creates",
            OperationType.UPDATE: "total_updates",
            OperationType.DELETE: "total_deletes",
        }

        with self._lock:
            metadata_path = self._get_legacy_metadata_path(category)
            metadata = self._read_metadata_file(metadata_path)
            if metadata is None:
                metadata = self.initialize_legacy_metadata(category, backend_type)

            now = int(time.time())
            metadata.last_updated = now
            metadata.last_operation_type = operation
            metadata.last_operation_model = model_name

            if success:
                metadata.last_successful_operation = now

                # Bump the counter matching the operation type, if there is one
                counter_name = counter_fields.get(operation)
                if counter_name is not None:
                    setattr(metadata, counter_name, getattr(metadata, counter_name) + 1)

            # Persist, then mirror the on-disk state in the cache
            self._write_metadata_file(metadata_path, metadata)

            self._legacy_cache[category] = metadata
            self._legacy_cache_timestamps[category] = time.time()
            self._legacy_mtimes[category] = metadata_path.stat().st_mtime
            self._stale_legacy.discard(category)

            logger.debug(
                f"Recorded legacy {operation.value} operation for {category.value}/{model_name} "
                f"(creates={metadata.total_creates}, updates={metadata.total_updates}, "
                f"deletes={metadata.total_deletes})"
            )

            return metadata

    def record_v2_operation(
        self,
        category: MODEL_REFERENCE_CATEGORY,
        operation: OperationType,
        model_name: str,
        success: bool = True,
        backend_type: str = "FileSystemBackend",
    ) -> CategoryMetadata:
        """Track one v2 operation for a category (observability hook point).

        Reads the category's v2 metadata from disk (building a fresh record when none can be
        read), stamps it with the operation details, writes it back and refreshes the
        in-memory cache. The last-success timestamp and the per-operation counter only change
        when ``success`` is true.

        Args:
            category: Model reference category
            operation: Type of operation (create/update/delete)
            model_name: Name of model affected
            success: Whether operation was successful
            backend_type: Type of backend performing operation

        Returns:
            Updated CategoryMetadata

        """
        counter_fields = {
            OperationType.CREATE: "total_creates",
            OperationType.UPDATE: "total_updates",
            OperationType.DELETE: "total_deletes",
        }

        with self._lock:
            metadata_path = self._get_v2_metadata_path(category)
            metadata = self._read_metadata_file(metadata_path)
            if metadata is None:
                metadata = self.initialize_v2_metadata(category, backend_type)

            now = int(time.time())
            metadata.last_updated = now
            metadata.last_operation_type = operation
            metadata.last_operation_model = model_name

            if success:
                metadata.last_successful_operation = now

                # Bump the counter matching the operation type, if there is one
                counter_name = counter_fields.get(operation)
                if counter_name is not None:
                    setattr(metadata, counter_name, getattr(metadata, counter_name) + 1)

            # Persist, then mirror the on-disk state in the cache
            self._write_metadata_file(metadata_path, metadata)

            self._v2_cache[category] = metadata
            self._v2_cache_timestamps[category] = time.time()
            self._v2_mtimes[category] = metadata_path.stat().st_mtime
            self._stale_v2.discard(category)

            logger.debug(
                f"Recorded v2 {operation.value} operation for {category.value}/{model_name} "
                f"(creates={metadata.total_creates}, updates={metadata.total_updates}, "
                f"deletes={metadata.total_deletes})"
            )

            return metadata

    def record_legacy_error(
        self,
        category: MODEL_REFERENCE_CATEGORY,
        error_info: str,
        backend_type: str = "FileSystemBackend",
    ) -> CategoryMetadata:
        """Count one legacy-format error against a category (observability hook point).

        Increments the category's ``error_count``, refreshes ``last_updated``, writes the
        record back to disk and updates the cache. ``error_info`` is only logged.

        Args:
            category: Model reference category
            error_info: Error information
            backend_type: Type of backend

        Returns:
            Updated CategoryMetadata

        """
        with self._lock:
            metadata_path = self._get_legacy_metadata_path(category)

            metadata = self._read_metadata_file(metadata_path)
            if metadata is None:
                # No readable record yet: start from a fresh one
                metadata = self.initialize_legacy_metadata(category, backend_type)

            metadata.error_count = metadata.error_count + 1
            metadata.last_updated = int(time.time())

            self._write_metadata_file(metadata_path, metadata)

            # Mirror the freshly written state in the cache
            refreshed_at = time.time()
            self._legacy_cache[category] = metadata
            self._legacy_cache_timestamps[category] = refreshed_at
            self._legacy_mtimes[category] = metadata_path.stat().st_mtime
            self._stale_legacy.discard(category)

            logger.warning(f"Recorded legacy error for {category.value}: {error_info}")

            return metadata

    def record_v2_error(
        self,
        category: MODEL_REFERENCE_CATEGORY,
        error_info: str,
        backend_type: str = "FileSystemBackend",
    ) -> CategoryMetadata:
        """Count one v2-format error against a category (observability hook point).

        Increments the category's ``error_count``, refreshes ``last_updated``, writes the
        record back to disk and updates the cache. ``error_info`` is only logged.

        Args:
            category: Model reference category
            error_info: Error information
            backend_type: Type of backend

        Returns:
            Updated CategoryMetadata

        """
        with self._lock:
            metadata_path = self._get_v2_metadata_path(category)

            metadata = self._read_metadata_file(metadata_path)
            if metadata is None:
                # No readable record yet: start from a fresh one
                metadata = self.initialize_v2_metadata(category, backend_type)

            metadata.error_count = metadata.error_count + 1
            metadata.last_updated = int(time.time())

            self._write_metadata_file(metadata_path, metadata)

            # Mirror the freshly written state in the cache
            refreshed_at = time.time()
            self._v2_cache[category] = metadata
            self._v2_cache_timestamps[category] = refreshed_at
            self._v2_mtimes[category] = metadata_path.stat().st_mtime
            self._stale_v2.discard(category)

            logger.warning(f"Recorded v2 error for {category.value}: {error_info}")

            return metadata

    def get_legacy_metadata(self, category: MODEL_REFERENCE_CATEGORY) -> CategoryMetadata:
        """Get legacy metadata for a category.

        Serves the cached record while the cache entry is valid (TTL, mtime and staleness
        checks); otherwise reloads from disk. When no record can be read from disk, a warning
        is logged and a new record is initialized with backend type "FileSystemBackend" and
        written out.

        Args:
            category: Model reference category

        Returns:
            CategoryMetadata: The legacy metadata

        Raises:
            RuntimeError: If the cache is reported valid but holds no record for the category

        """
        with self._lock:
            cache_is_fresh = self._is_cache_valid(
                category,
                self._legacy_cache_timestamps,
                self._legacy_mtimes,
                self._stale_legacy,
                self._get_legacy_metadata_path,
            )
            if cache_is_fresh:
                cached_metadata = self._legacy_cache.get(category)
                if cached_metadata is None:
                    raise RuntimeError("Inconsistent cache state: timestamp exists but no cached metadata")
                return cached_metadata

            # Cache miss or invalid entry: go back to disk
            metadata_path = self._get_legacy_metadata_path(category)
            metadata = self._read_metadata_file(metadata_path)

            if metadata is None:
                logger.warning(f"Legacy metadata for category {category.value} does not exist on disk")
                metadata = self.initialize_legacy_metadata(category, "FileSystemBackend")
                self._write_metadata_file(metadata_path, metadata)
                logger.debug(f"Initialized legacy metadata for {category.value}")

            # Record what was just loaded or created
            self._legacy_cache[category] = metadata
            self._legacy_cache_timestamps[category] = time.time()
            self._legacy_mtimes[category] = metadata_path.stat().st_mtime
            self._stale_legacy.discard(category)

            return metadata

    def get_v2_metadata(self, category: MODEL_REFERENCE_CATEGORY) -> CategoryMetadata:
        """Get v2 metadata for a category.

        Serves the cached record while the cache entry is valid (TTL, mtime and staleness
        checks); otherwise reloads from disk. When no record can be read from disk, a warning
        is logged and a new record is initialized with backend type "FileSystemBackend" and
        written out.

        Args:
            category: Model reference category

        Returns:
            CategoryMetadata: The v2 metadata

        Raises:
            RuntimeError: If the cache is reported valid but holds no record for the category

        """
        with self._lock:
            cache_is_fresh = self._is_cache_valid(
                category,
                self._v2_cache_timestamps,
                self._v2_mtimes,
                self._stale_v2,
                self._get_v2_metadata_path,
            )
            if cache_is_fresh:
                cached_metadata = self._v2_cache.get(category)
                if cached_metadata is None:
                    raise RuntimeError("Inconsistent cache state: timestamp exists but no cached metadata")
                return cached_metadata

            # Cache miss or invalid entry: go back to disk
            metadata_path = self._get_v2_metadata_path(category)
            metadata = self._read_metadata_file(metadata_path)

            if metadata is None:
                logger.warning(f"V2 metadata for category {category.value} does not exist on disk")
                metadata = self.initialize_v2_metadata(category, "FileSystemBackend")
                self._write_metadata_file(metadata_path, metadata)
                logger.debug(f"Initialized v2 metadata for {category.value}")

            # Record what was just loaded or created
            self._v2_cache[category] = metadata
            self._v2_cache_timestamps[category] = time.time()
            self._v2_mtimes[category] = metadata_path.stat().st_mtime
            self._stale_v2.discard(category)

            return metadata

    def get_all_legacy_metadata(self) -> dict[MODEL_REFERENCE_CATEGORY, CategoryMetadata]:
        """Collect the legacy metadata of every category in MODEL_REFERENCE_CATEGORY.

        Each category is fetched through ``get_legacy_metadata``.

        Returns:
            Dict mapping categories to their legacy metadata

        """
        return {
            category: metadata
            for category in MODEL_REFERENCE_CATEGORY
            if (metadata := self.get_legacy_metadata(category)) is not None
        }

    def get_all_v2_metadata(self) -> dict[MODEL_REFERENCE_CATEGORY, CategoryMetadata]:
        """Collect the v2 metadata of every category in MODEL_REFERENCE_CATEGORY.

        Each category is fetched through ``get_v2_metadata``.

        Returns:
            Dict mapping categories to their v2 metadata

        """
        return {
            category: metadata
            for category in MODEL_REFERENCE_CATEGORY
            if (metadata := self.get_v2_metadata(category)) is not None
        }

    def mark_legacy_stale(self, category: MODEL_REFERENCE_CATEGORY) -> None:
        """Mark legacy metadata cache as stale.

        Args:
            category: Category to mark stale

        """
        with self._lock:
            self._stale_legacy.add(category)

    def mark_v2_stale(self, category: MODEL_REFERENCE_CATEGORY) -> None:
        """Mark v2 metadata cache as stale.

        Args:
            category: Category to mark stale

        """
        with self._lock:
            self._stale_v2.add(category)

_base_path instance-attribute

_base_path = base_path

_lock instance-attribute

_lock = RLock()

_model_metadata_manager instance-attribute

_model_metadata_manager = (
    model_metadata_manager or ModelMetadataManager()
)

_legacy_cache instance-attribute

_legacy_cache: dict[
    MODEL_REFERENCE_CATEGORY, CategoryMetadata
] = {}

_v2_cache instance-attribute

_v2_cache: dict[
    MODEL_REFERENCE_CATEGORY, CategoryMetadata
] = {}

_legacy_cache_timestamps instance-attribute

_legacy_cache_timestamps: dict[
    MODEL_REFERENCE_CATEGORY, float
] = {}

_v2_cache_timestamps instance-attribute

_v2_cache_timestamps: dict[
    MODEL_REFERENCE_CATEGORY, float
] = {}

_legacy_mtimes instance-attribute

_legacy_mtimes: dict[MODEL_REFERENCE_CATEGORY, float] = {}

_v2_mtimes instance-attribute

_v2_mtimes: dict[MODEL_REFERENCE_CATEGORY, float] = {}

_stale_legacy instance-attribute

_stale_legacy: set[MODEL_REFERENCE_CATEGORY] = set()

_stale_v2 instance-attribute

_stale_v2: set[MODEL_REFERENCE_CATEGORY] = set()

_cache_ttl_seconds instance-attribute

_cache_ttl_seconds: int = 60

model_metadata property

model_metadata: ModelMetadataManager

Access point for per-model metadata operations.

Returns:

  • ModelMetadataManager

    The model metadata manager instance.

__init__

__init__(
    base_path: Path,
    model_metadata_manager: ModelMetadataManager
    | None = None,
) -> None

Initialize metadata manager.

Parameters:

  • base_path (Path) –

    Base path for horde model reference data.

  • model_metadata_manager (ModelMetadataManager | None, default: None ) –

    Optional custom ModelMetadataManager for dependency injection. If None, a default instance will be created.

Source code in src/horde_model_reference/model_reference_metadata.py
def __init__(
    self,
    base_path: Path,
    model_metadata_manager: ModelMetadataManager | None = None,
) -> None:
    """Initialize metadata manager.

    Sets up the lock, the per-format caches and their bookkeeping, and makes sure the
    ``meta/legacy`` and ``meta/v2`` directories exist under ``base_path``.

    Args:
        base_path: Base path for horde model reference data.
        model_metadata_manager: Optional custom ModelMetadataManager for dependency injection.
            If None, a default instance will be created.

    """
    self._base_path = base_path
    self._lock = RLock()

    # Per-model metadata manager (supports dependency injection).
    # Compare against None explicitly: an injected manager that happens to be falsy
    # (for example one defining __bool__ or __len__) must not be swapped for a default.
    if model_metadata_manager is None:
        model_metadata_manager = ModelMetadataManager()
    self._model_metadata_manager = model_metadata_manager

    # Separate caching for legacy and v2 metadata
    self._legacy_cache: dict[MODEL_REFERENCE_CATEGORY, CategoryMetadata] = {}
    self._v2_cache: dict[MODEL_REFERENCE_CATEGORY, CategoryMetadata] = {}

    # Cache timestamps for TTL
    self._legacy_cache_timestamps: dict[MODEL_REFERENCE_CATEGORY, float] = {}
    self._v2_cache_timestamps: dict[MODEL_REFERENCE_CATEGORY, float] = {}

    # File modification times for cache invalidation
    self._legacy_mtimes: dict[MODEL_REFERENCE_CATEGORY, float] = {}
    self._v2_mtimes: dict[MODEL_REFERENCE_CATEGORY, float] = {}

    # Staleness tracking
    self._stale_legacy: set[MODEL_REFERENCE_CATEGORY] = set()
    self._stale_v2: set[MODEL_REFERENCE_CATEGORY] = set()

    # Cache TTL (default 60 seconds)
    self._cache_ttl_seconds: int = 60

    # Ensure meta directories exist
    self._ensure_meta_directories()

_ensure_meta_directories

_ensure_meta_directories() -> None

Ensure meta/legacy and meta/v2 directories exist.

Source code in src/horde_model_reference/model_reference_metadata.py
def _ensure_meta_directories(self) -> None:
    """Create the meta/legacy and meta/v2 directories under the base path if they are absent."""
    meta_root = self._base_path / "meta"
    for format_dir in ("legacy", "v2"):
        # parents=True also creates the base path and "meta" themselves when missing.
        (meta_root / format_dir).mkdir(parents=True, exist_ok=True)

_get_legacy_metadata_path

_get_legacy_metadata_path(
    category: MODEL_REFERENCE_CATEGORY,
) -> Path

Get file path for legacy metadata.

Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    Model reference category

Returns:

  • Path

    Path to legacy metadata JSON file

Source code in src/horde_model_reference/model_reference_metadata.py
def _get_legacy_metadata_path(self, category: MODEL_REFERENCE_CATEGORY) -> Path:
    """Build the location of a category's legacy metadata file.

    Args:
        category: Model reference category; its ``value`` forms the file name prefix

    Returns:
        ``<base_path>/meta/legacy/<category value>_metadata.json``

    """
    file_name = f"{category.value}_metadata.json"
    return self._base_path.joinpath("meta", "legacy", file_name)

_get_v2_metadata_path

_get_v2_metadata_path(
    category: MODEL_REFERENCE_CATEGORY,
) -> Path

Get file path for v2 metadata.

Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    Model reference category

Returns:

  • Path

    Path to v2 metadata JSON file

Source code in src/horde_model_reference/model_reference_metadata.py
def _get_v2_metadata_path(self, category: MODEL_REFERENCE_CATEGORY) -> Path:
    """Build the location of a category's v2 metadata file.

    Args:
        category: Model reference category; its ``value`` forms the file name prefix

    Returns:
        ``<base_path>/meta/v2/<category value>_metadata.json``

    """
    file_name = f"{category.value}_metadata.json"
    return self._base_path.joinpath("meta", "v2", file_name)

_read_metadata_file

_read_metadata_file(
    file_path: Path,
) -> CategoryMetadata | None

Read metadata from file.

Parameters:

  • file_path (Path) –

    Path to metadata file

Returns:

  • CategoryMetadata | None

    CategoryMetadata if file exists and is valid, None otherwise

Source code in src/horde_model_reference/model_reference_metadata.py
def _read_metadata_file(self, file_path: Path) -> CategoryMetadata | None:
    """Load a CategoryMetadata record from a JSON file.

    Args:
        file_path: Path to metadata file

    Returns:
        The parsed CategoryMetadata, or None when the file is missing or when reading,
        decoding, or model construction fails (the failure is logged, not raised)

    """
    if file_path.exists():
        try:
            with open(file_path, encoding="utf-8") as handle:
                payload = json.load(handle)
            return CategoryMetadata(**payload)
        except Exception as e:
            # Deliberately broad: any unreadable file is reported as "no metadata".
            logger.error(f"Failed to read metadata from {file_path}: {e}")

    return None

_write_metadata_file

_write_metadata_file(
    file_path: Path, metadata: CategoryMetadata
) -> None

Write metadata to file atomically.

Uses temp file + rename pattern for atomic writes with backup/rollback on error.

Parameters:

  • file_path (Path) –

    Path to metadata file

  • metadata (CategoryMetadata) –

    Metadata to write

Source code in src/horde_model_reference/model_reference_metadata.py
def _write_metadata_file(self, file_path: Path, metadata: CategoryMetadata) -> None:
    """Write metadata to file atomically.

    The JSON payload is written to a sibling temp file, flushed and fsynced, and then
    moved over ``file_path`` with a single replace, so the destination never goes
    missing while the swap is in progress.

    Args:
        file_path: Path to metadata file
        metadata: Metadata to write

    Raises:
        Exception: Any error raised while serializing or writing is logged and re-raised.
            The replace is the last step, so an existing ``file_path`` is left untouched
            when an earlier step fails.

    """
    temp_path = file_path.with_suffix(f".tmp.{time.time()}")

    try:
        # Write to temp file
        with open(temp_path, "w", encoding="utf-8") as f:
            json.dump(metadata.model_dump(mode="json"), f, indent=2)
            # Push Python's own buffer to the OS first; fsync only covers data the
            # OS has already received.
            f.flush()
            os.fsync(f.fileno())

        # Path.replace overwrites an existing destination in one step. Moving the live
        # file aside first would leave a moment where file_path does not exist, and a
        # reader that sees no file treats the category as having no metadata.
        temp_path.replace(file_path)

    except Exception as e:
        logger.error(f"Failed to write metadata to {file_path}: {e}")
        raise
    finally:
        # Clean up temp file (only still present when the replace did not happen)
        if temp_path.exists():
            temp_path.unlink()

_is_cache_valid

_is_cache_valid(
    category: MODEL_REFERENCE_CATEGORY,
    cache_timestamps: dict[MODEL_REFERENCE_CATEGORY, float],
    mtimes: dict[MODEL_REFERENCE_CATEGORY, float],
    stale_set: set[MODEL_REFERENCE_CATEGORY],
    metadata_path_fn: Callable[
        [MODEL_REFERENCE_CATEGORY], Path
    ],
) -> bool

Check if cached metadata is valid.

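Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    Category to check

  • cache_timestamps (dict[MODEL_REFERENCE_CATEGORY, float]) –

    Dict of cache timestamps

  • mtimes (dict[MODEL_REFERENCE_CATEGORY, float]) –

    Dict of file modification times

  • stale_set (set[MODEL_REFERENCE_CATEGORY]) –

    Set of stale categories

  • metadata_path_fn (Callable[[MODEL_REFERENCE_CATEGORY], Path]) –

    Function to get metadata file path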

Returns:

  • bool

    True if cache is valid, False otherwise

Source code in src/horde_model_reference/model_reference_metadata.py
def _is_cache_valid(
    self,
    category: MODEL_REFERENCE_CATEGORY,
    cache_timestamps: dict[MODEL_REFERENCE_CATEGORY, float],
    mtimes: dict[MODEL_REFERENCE_CATEGORY, float],
    stale_set: set[MODEL_REFERENCE_CATEGORY],
    metadata_path_fn: Callable[[MODEL_REFERENCE_CATEGORY], Path],
) -> bool:
    """Check if cached metadata is valid.

    The cache entry is rejected when the category is marked stale, has no cache
    timestamp, is older than ``self._cache_ttl_seconds``, or when the metadata
    file exists and its modification time differs from the recorded one (or
    none was recorded). When the metadata file does not exist, the mtime check
    is skipped and the entry is accepted.

    Args:
        category: Category to check
        cache_timestamps: Dict of cache timestamps
        mtimes: Dict of file modification times
        stale_set: Set of stale categories
        metadata_path_fn: Function to get metadata file path

    Returns:
        True if cache is valid, False otherwise

    """
    # Explicitly invalidated, or never cached at all
    if category in stale_set or category not in cache_timestamps:
        return False

    # TTL expired
    if time.time() - cache_timestamps[category] > self._cache_ttl_seconds:
        return False

    metadata_path = metadata_path_fn(category)
    try:
        # Ask for the mtime directly instead of exists()-then-stat(): the file
        # may be removed by another process between the two calls.
        current_mtime = metadata_path.stat().st_mtime
    except (FileNotFoundError, NotADirectoryError):
        # No file on disk: there is nothing to compare against.
        return True

    return category in mtimes and mtimes[category] == current_mtime

initialize_legacy_metadata

initialize_legacy_metadata(
    category: MODEL_REFERENCE_CATEGORY, backend_type: str
) -> CategoryMetadata

Initialize new legacy metadata for a category.

Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    Model reference category

  • backend_type (str) –

    Type of backend (FileSystemBackend/RedisBackend)

Returns:

  • CategoryMetadata –

    Newly created CategoryMetadata

Source code in src/horde_model_reference/model_reference_metadata.py
def initialize_legacy_metadata(
    self,
    category: MODEL_REFERENCE_CATEGORY,
    backend_type: str,
) -> CategoryMetadata:
    """Build a fresh legacy metadata record for a category.

    The record is only constructed here; this method does not write it to disk
    or place it in any cache.

    Args:
        category: Model reference category
        backend_type: Type of backend (FileSystemBackend/RedisBackend)

    Returns:
        Newly created CategoryMetadata

    """
    now = int(time.time())
    # A brand-new record starts with all three timestamps at the same instant.
    timestamps = dict.fromkeys(
        ("last_updated", "initialization_time", "last_successful_operation"),
        now,
    )
    return CategoryMetadata(category=category, backend_type=backend_type, **timestamps)

initialize_v2_metadata

initialize_v2_metadata(
    category: MODEL_REFERENCE_CATEGORY, backend_type: str
) -> CategoryMetadata

Initialize new v2 metadata for a category.

Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    Model reference category

  • backend_type (str) –

    Type of backend (FileSystemBackend/RedisBackend)

Returns:

  • CategoryMetadata –

    Newly created CategoryMetadata

Source code in src/horde_model_reference/model_reference_metadata.py
def initialize_v2_metadata(
    self,
    category: MODEL_REFERENCE_CATEGORY,
    backend_type: str,
) -> CategoryMetadata:
    """Build a fresh v2 metadata record for a category.

    The record is only constructed here; this method does not write it to disk
    or place it in any cache.

    Args:
        category: Model reference category
        backend_type: Type of backend (FileSystemBackend/RedisBackend)

    Returns:
        Newly created CategoryMetadata

    """
    now = int(time.time())
    # A brand-new record starts with all three timestamps at the same instant.
    timestamps = dict.fromkeys(
        ("last_updated", "initialization_time", "last_successful_operation"),
        now,
    )
    return CategoryMetadata(category=category, backend_type=backend_type, **timestamps)

get_or_initialize_v2_metadata

get_or_initialize_v2_metadata(
    category: MODEL_REFERENCE_CATEGORY,
    backend_type: str = "FileSystemBackend",
) -> CategoryMetadata

Get v2 metadata for a category, initializing if it doesn't exist.

This method is safe to call during initialization/seeding when metadata may not exist yet. It will create and persist new metadata if the file doesn't exist.

Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    Model reference category

  • backend_type (str, default: 'FileSystemBackend' ) –

    Type of backend (FileSystemBackend/RedisBackend)

Returns:

  • CategoryMetadata ( CategoryMetadata ) –

    The existing or newly created v2 metadata

Source code in src/horde_model_reference/model_reference_metadata.py
def get_or_initialize_v2_metadata(
    self,
    category: MODEL_REFERENCE_CATEGORY,
    backend_type: str = "FileSystemBackend",
) -> CategoryMetadata:
    """Return the v2 metadata of a category, creating and persisting it when absent.

    The metadata file is read on every call. When the read yields ``None`` a
    new record is built, written to disk and cached. When a record is read, the
    cache entry is replaced only if the cache is currently not valid. This
    makes the method usable during initialization/seeding, before any metadata
    file exists.

    Args:
        category: Model reference category
        backend_type: Type of backend (FileSystemBackend/RedisBackend)

    Returns:
        CategoryMetadata: The existing or newly created v2 metadata

    """
    with self._lock:
        path = self._get_v2_metadata_path(category)
        metadata = self._read_metadata_file(path)

        if metadata is not None:
            # Existing record: only touch the cache when it is not already valid.
            refresh_cache = not self._is_cache_valid(
                category,
                self._v2_cache_timestamps,
                self._v2_mtimes,
                self._stale_v2,
                self._get_v2_metadata_path,
            )
        else:
            # Nothing readable yet: create the record and persist it right away.
            metadata = self.initialize_v2_metadata(category, backend_type)
            self._write_metadata_file(path, metadata)
            logger.debug(f"Initialized v2 metadata for {category.value}")
            refresh_cache = True

        if refresh_cache:
            self._v2_cache[category] = metadata
            self._v2_cache_timestamps[category] = time.time()
            self._v2_mtimes[category] = path.stat().st_mtime
            self._stale_v2.discard(category)

        return metadata

get_or_initialize_legacy_metadata

get_or_initialize_legacy_metadata(
    category: MODEL_REFERENCE_CATEGORY,
    backend_type: str = "FileSystemBackend",
) -> CategoryMetadata

Get legacy metadata for a category, initializing if it doesn't exist.

This method is safe to call during initialization/seeding when metadata may not exist yet. It will create and persist new metadata if the file doesn't exist.

Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    Model reference category

  • backend_type (str, default: 'FileSystemBackend' ) –

    Type of backend (FileSystemBackend/RedisBackend)

Returns:

  • CategoryMetadata ( CategoryMetadata ) –

    The existing or newly created legacy metadata

Source code in src/horde_model_reference/model_reference_metadata.py
def get_or_initialize_legacy_metadata(
    self,
    category: MODEL_REFERENCE_CATEGORY,
    backend_type: str = "FileSystemBackend",
) -> CategoryMetadata:
    """Return the legacy metadata of a category, creating and persisting it when absent.

    The metadata file is read on every call. When the read yields ``None`` a
    new record is built, written to disk and cached. When a record is read, the
    cache entry is replaced only if the cache is currently not valid. This
    makes the method usable during initialization/seeding, before any metadata
    file exists.

    Args:
        category: Model reference category
        backend_type: Type of backend (FileSystemBackend/RedisBackend)

    Returns:
        CategoryMetadata: The existing or newly created legacy metadata

    """
    with self._lock:
        path = self._get_legacy_metadata_path(category)
        metadata = self._read_metadata_file(path)

        if metadata is not None:
            # Existing record: only touch the cache when it is not already valid.
            refresh_cache = not self._is_cache_valid(
                category,
                self._legacy_cache_timestamps,
                self._legacy_mtimes,
                self._stale_legacy,
                self._get_legacy_metadata_path,
            )
        else:
            # Nothing readable yet: create the record and persist it right away.
            metadata = self.initialize_legacy_metadata(category, backend_type)
            self._write_metadata_file(path, metadata)
            logger.debug(f"Initialized legacy metadata for {category.value}")
            refresh_cache = True

        if refresh_cache:
            self._legacy_cache[category] = metadata
            self._legacy_cache_timestamps[category] = time.time()
            self._legacy_mtimes[category] = path.stat().st_mtime
            self._stale_legacy.discard(category)

        return metadata

record_legacy_operation

record_legacy_operation(
    category: MODEL_REFERENCE_CATEGORY,
    operation: OperationType,
    model_name: str,
    success: bool = True,
    backend_type: str = "FileSystemBackend",
) -> CategoryMetadata

Record a legacy format operation (observability hook point).

This is the centralized method for tracking all v1/legacy operations.

Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    Model reference category

  • operation (OperationType) –

    Type of operation (create/update/delete)

  • model_name (str) –

    Name of model affected

  • success (bool, default: True ) –

    Whether operation was successful

  • backend_type (str, default: 'FileSystemBackend' ) –

    Type of backend performing operation

Returns:

  • CategoryMetadata –

    Updated CategoryMetadata

Source code in src/horde_model_reference/model_reference_metadata.py
def record_legacy_operation(
    self,
    category: MODEL_REFERENCE_CATEGORY,
    operation: OperationType,
    model_name: str,
    success: bool = True,
    backend_type: str = "FileSystemBackend",
) -> CategoryMetadata:
    """Record a legacy format operation (observability hook point).

    This is the centralized method for tracking all v1/legacy operations.

    Args:
        category: Model reference category
        operation: Type of operation (create/update/delete)
        model_name: Name of model affected
        success: Whether operation was successful
        backend_type: Type of backend performing operation

    Returns:
        Updated CategoryMetadata

    """
    with self._lock:
        # Load existing metadata or initialize
        metadata_path = self._get_legacy_metadata_path(category)
        metadata = self._read_metadata_file(metadata_path)

        if metadata is None:
            metadata = self.initialize_legacy_metadata(category, backend_type)

        # Update metadata
        current_time = int(time.time())
        metadata.last_updated = current_time
        metadata.last_operation_type = operation
        metadata.last_operation_model = model_name

        if success:
            metadata.last_successful_operation = current_time

            # Increment operation counters
            if operation == OperationType.CREATE:
                metadata.total_creates += 1
            elif operation == OperationType.UPDATE:
                metadata.total_updates += 1
            elif operation == OperationType.DELETE:
                metadata.total_deletes += 1

        # Write to disk
        self._write_metadata_file(metadata_path, metadata)

        # Update cache
        self._legacy_cache[category] = metadata
        self._legacy_cache_timestamps[category] = time.time()
        self._legacy_mtimes[category] = metadata_path.stat().st_mtime
        self._stale_legacy.discard(category)

        logger.debug(
            f"Recorded legacy {operation.value} operation for {category.value}/{model_name} "
            f"(creates={metadata.total_creates}, updates={metadata.total_updates}, "
            f"deletes={metadata.total_deletes})"
        )

        return metadata

record_v2_operation

record_v2_operation(
    category: MODEL_REFERENCE_CATEGORY,
    operation: OperationType,
    model_name: str,
    success: bool = True,
    backend_type: str = "FileSystemBackend",
) -> CategoryMetadata

Record a v2 format operation (observability hook point).

This is the centralized method for tracking all v2 operations.

Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    Model reference category

  • operation (OperationType) –

    Type of operation (create/update/delete)

  • model_name (str) –

    Name of model affected

  • success (bool, default: True ) –

    Whether operation was successful

  • backend_type (str, default: 'FileSystemBackend' ) –

    Type of backend performing operation

Returns:

  • CategoryMetadata –

    Updated CategoryMetadata

Source code in src/horde_model_reference/model_reference_metadata.py
def record_v2_operation(
    self,
    category: MODEL_REFERENCE_CATEGORY,
    operation: OperationType,
    model_name: str,
    success: bool = True,
    backend_type: str = "FileSystemBackend",
) -> CategoryMetadata:
    """Record a v2 format operation (observability hook point).

    This is the centralized method for tracking all v2 operations.

    Args:
        category: Model reference category
        operation: Type of operation (create/update/delete)
        model_name: Name of model affected
        success: Whether operation was successful
        backend_type: Type of backend performing operation

    Returns:
        Updated CategoryMetadata

    """
    with self._lock:
        # Load existing metadata or initialize
        metadata_path = self._get_v2_metadata_path(category)
        metadata = self._read_metadata_file(metadata_path)

        if metadata is None:
            metadata = self.initialize_v2_metadata(category, backend_type)

        # Update metadata
        current_time = int(time.time())
        metadata.last_updated = current_time
        metadata.last_operation_type = operation
        metadata.last_operation_model = model_name

        if success:
            metadata.last_successful_operation = current_time

            # Increment operation counters
            if operation == OperationType.CREATE:
                metadata.total_creates += 1
            elif operation == OperationType.UPDATE:
                metadata.total_updates += 1
            elif operation == OperationType.DELETE:
                metadata.total_deletes += 1

        # Write to disk
        self._write_metadata_file(metadata_path, metadata)

        # Update cache
        self._v2_cache[category] = metadata
        self._v2_cache_timestamps[category] = time.time()
        self._v2_mtimes[category] = metadata_path.stat().st_mtime
        self._stale_v2.discard(category)

        logger.debug(
            f"Recorded v2 {operation.value} operation for {category.value}/{model_name} "
            f"(creates={metadata.total_creates}, updates={metadata.total_updates}, "
            f"deletes={metadata.total_deletes})"
        )

        return metadata

record_legacy_error

record_legacy_error(
    category: MODEL_REFERENCE_CATEGORY,
    error_info: str,
    backend_type: str = "FileSystemBackend",
) -> CategoryMetadata

Record a legacy format operation error (observability hook point).

Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    Model reference category

  • error_info (str) –

    Error information

  • backend_type (str, default: 'FileSystemBackend' ) –

    Type of backend

Returns:

  • CategoryMetadata –

    Updated CategoryMetadata

Source code in src/horde_model_reference/model_reference_metadata.py
def record_legacy_error(
    self,
    category: MODEL_REFERENCE_CATEGORY,
    error_info: str,
    backend_type: str = "FileSystemBackend",
) -> CategoryMetadata:
    """Record a legacy format operation error (observability hook point).

    Bumps the category's ``error_count``, refreshes ``last_updated``, writes
    the metadata back to disk and updates the cache. ``error_info`` is only
    emitted in the warning log line; it is not stored in the metadata.

    Args:
        category: Model reference category
        error_info: Error information
        backend_type: Type of backend; only used when the metadata has to be
            initialized

    Returns:
        Updated CategoryMetadata

    """
    with self._lock:
        path = self._get_legacy_metadata_path(category)

        record = self._read_metadata_file(path)
        if record is None:
            record = self.initialize_legacy_metadata(category, backend_type)

        record.error_count += 1
        record.last_updated = int(time.time())

        # Persist first, then refresh the cache so its mtime matches the new file.
        self._write_metadata_file(path, record)

        self._legacy_cache[category] = record
        self._legacy_cache_timestamps[category] = time.time()
        self._legacy_mtimes[category] = path.stat().st_mtime
        self._stale_legacy.discard(category)

        logger.warning(f"Recorded legacy error for {category.value}: {error_info}")

        return record

record_v2_error

record_v2_error(
    category: MODEL_REFERENCE_CATEGORY,
    error_info: str,
    backend_type: str = "FileSystemBackend",
) -> CategoryMetadata

Record a v2 format operation error (observability hook point).

Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    Model reference category

  • error_info (str) –

    Error information

  • backend_type (str, default: 'FileSystemBackend' ) –

    Type of backend

Returns:

  • CategoryMetadata –

    Updated CategoryMetadata

Source code in src/horde_model_reference/model_reference_metadata.py
def record_v2_error(
    self,
    category: MODEL_REFERENCE_CATEGORY,
    error_info: str,
    backend_type: str = "FileSystemBackend",
) -> CategoryMetadata:
    """Record a v2 format operation error (observability hook point).

    Bumps the category's ``error_count``, refreshes ``last_updated``, writes
    the metadata back to disk and updates the cache. ``error_info`` is only
    emitted in the warning log line; it is not stored in the metadata.

    Args:
        category: Model reference category
        error_info: Error information
        backend_type: Type of backend; only used when the metadata has to be
            initialized

    Returns:
        Updated CategoryMetadata

    """
    with self._lock:
        path = self._get_v2_metadata_path(category)

        record = self._read_metadata_file(path)
        if record is None:
            record = self.initialize_v2_metadata(category, backend_type)

        record.error_count += 1
        record.last_updated = int(time.time())

        # Persist first, then refresh the cache so its mtime matches the new file.
        self._write_metadata_file(path, record)

        self._v2_cache[category] = record
        self._v2_cache_timestamps[category] = time.time()
        self._v2_mtimes[category] = path.stat().st_mtime
        self._stale_v2.discard(category)

        logger.warning(f"Recorded v2 error for {category.value}: {error_info}")

        return record

get_legacy_metadata

get_legacy_metadata(
    category: MODEL_REFERENCE_CATEGORY,
) -> CategoryMetadata

Get legacy metadata for a category.

Uses caching with TTL and mtime tracking.

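Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    Model reference category

Returns:

  • CategoryMetadata ( CategoryMetadata ) –

    The legacy metadata

Raises:

  • RuntimeError

    If the cache state is inconsistent (a cache timestamp exists but no cached metadata). Missing on-disk metadata is not an error: it is initialized and written instead.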

Source code in src/horde_model_reference/model_reference_metadata.py
def get_legacy_metadata(self, category: MODEL_REFERENCE_CATEGORY) -> CategoryMetadata:
    """Get legacy metadata for a category.

    Serves the cached record while the cache is valid (TTL and mtime tracking);
    otherwise reloads from disk. When nothing can be read from disk, a new
    record is initialized with backend type ``"FileSystemBackend"``, written
    out, and returned. The cache is refreshed after every disk load.

    Args:
        category: Model reference category

    Returns:
        CategoryMetadata: The legacy metadata

    Raises:
        RuntimeError: If the cache is reported valid but holds no metadata for the category

    """
    with self._lock:
        cache_is_valid = self._is_cache_valid(
            category,
            self._legacy_cache_timestamps,
            self._legacy_mtimes,
            self._stale_legacy,
            self._get_legacy_metadata_path,
        )
        if cache_is_valid:
            cached = self._legacy_cache.get(category)
            if cached is None:
                raise RuntimeError("Inconsistent cache state: timestamp exists but no cached metadata")
            return cached

        # Cache miss: fall back to the file on disk.
        path = self._get_legacy_metadata_path(category)
        metadata = self._read_metadata_file(path)

        if metadata is None:
            # Missing metadata is tolerated: warn, then create and persist it.
            logger.warning(f"Legacy metadata for category {category.value} does not exist on disk")
            metadata = self.initialize_legacy_metadata(category, "FileSystemBackend")
            self._write_metadata_file(path, metadata)
            logger.debug(f"Initialized legacy metadata for {category.value}")

        self._legacy_cache[category] = metadata
        self._legacy_cache_timestamps[category] = time.time()
        self._legacy_mtimes[category] = path.stat().st_mtime
        self._stale_legacy.discard(category)

        return metadata

get_v2_metadata

get_v2_metadata(
    category: MODEL_REFERENCE_CATEGORY,
) -> CategoryMetadata

Get v2 metadata for a category.

Uses caching with TTL and mtime tracking.

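Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    Model reference category

Returns:

  • CategoryMetadata ( CategoryMetadata ) –

    The v2 metadata

Raises:

  • RuntimeError

    If the cache state is inconsistent (a cache timestamp exists but no cached metadata). Missing on-disk metadata is not an error: it is initialized and written instead.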

Source code in src/horde_model_reference/model_reference_metadata.py
def get_v2_metadata(self, category: MODEL_REFERENCE_CATEGORY) -> CategoryMetadata:
    """Get v2 metadata for a category.

    Serves the cached record while the cache is valid (TTL and mtime tracking);
    otherwise reloads from disk. When nothing can be read from disk, a new
    record is initialized with backend type ``"FileSystemBackend"``, written
    out, and returned. The cache is refreshed after every disk load.

    Args:
        category: Model reference category

    Returns:
        CategoryMetadata: The v2 metadata

    Raises:
        RuntimeError: If the cache is reported valid but holds no metadata for the category

    """
    with self._lock:
        cache_is_valid = self._is_cache_valid(
            category,
            self._v2_cache_timestamps,
            self._v2_mtimes,
            self._stale_v2,
            self._get_v2_metadata_path,
        )
        if cache_is_valid:
            cached = self._v2_cache.get(category)
            if cached is None:
                raise RuntimeError("Inconsistent cache state: timestamp exists but no cached metadata")
            return cached

        # Cache miss: fall back to the file on disk.
        path = self._get_v2_metadata_path(category)
        metadata = self._read_metadata_file(path)

        if metadata is None:
            # Missing metadata is tolerated: warn, then create and persist it.
            logger.warning(f"V2 metadata for category {category.value} does not exist on disk")
            metadata = self.initialize_v2_metadata(category, "FileSystemBackend")
            self._write_metadata_file(path, metadata)
            logger.debug(f"Initialized v2 metadata for {category.value}")

        self._v2_cache[category] = metadata
        self._v2_cache_timestamps[category] = time.time()
        self._v2_mtimes[category] = path.stat().st_mtime
        self._stale_v2.discard(category)

        return metadata

get_all_legacy_metadata

get_all_legacy_metadata() -> dict[
    MODEL_REFERENCE_CATEGORY, CategoryMetadata
]

Get all legacy metadata.

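Returns:

  • dict[MODEL_REFERENCE_CATEGORY, CategoryMetadata] –

    Dict mapping categories to their legacy metadata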

Source code in src/horde_model_reference/model_reference_metadata.py
def get_all_legacy_metadata(self) -> dict[MODEL_REFERENCE_CATEGORY, CategoryMetadata]:
    """Collect the legacy metadata of every model reference category.

    Each category is looked up through ``get_legacy_metadata``; a category only
    appears in the result when that lookup yields something other than ``None``.

    Returns:
        Dict mapping categories to their legacy metadata

    """
    return {
        category: metadata
        for category in MODEL_REFERENCE_CATEGORY
        if (metadata := self.get_legacy_metadata(category)) is not None
    }

get_all_v2_metadata

get_all_v2_metadata() -> dict[
    MODEL_REFERENCE_CATEGORY, CategoryMetadata
]

Get all v2 metadata.

Returns:

  • dict[MODEL_REFERENCE_CATEGORY, CategoryMetadata] –

    Dict mapping categories to their v2 metadata

Source code in src/horde_model_reference/model_reference_metadata.py
def get_all_v2_metadata(self) -> dict[MODEL_REFERENCE_CATEGORY, CategoryMetadata]:
    """Collect the v2 metadata of every model reference category.

    Each category is looked up through ``get_v2_metadata``; a category only
    appears in the result when that lookup yields something other than ``None``.

    Returns:
        Dict mapping categories to their v2 metadata

    """
    return {
        category: metadata
        for category in MODEL_REFERENCE_CATEGORY
        if (metadata := self.get_v2_metadata(category)) is not None
    }

mark_legacy_stale

mark_legacy_stale(
    category: MODEL_REFERENCE_CATEGORY,
) -> None

Mark legacy metadata cache as stale.

Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    Category to mark stale

Source code in src/horde_model_reference/model_reference_metadata.py
def mark_legacy_stale(self, category: MODEL_REFERENCE_CATEGORY) -> None:
    """Flag the cached legacy metadata of a category as stale.

    The category is added to the legacy stale set while holding the manager lock.

    Args:
        category: Category to mark stale

    """
    with self._lock:
        stale_categories = self._stale_legacy
        stale_categories.add(category)

mark_v2_stale

mark_v2_stale(category: MODEL_REFERENCE_CATEGORY) -> None

Mark v2 metadata cache as stale.

Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    Category to mark stale

Source code in src/horde_model_reference/model_reference_metadata.py
def mark_v2_stale(self, category: MODEL_REFERENCE_CATEGORY) -> None:
    """Flag the cached v2 metadata of a category as stale.

    The category is added to the v2 stale set while holding the manager lock.

    Args:
        category: Category to mark stale

    """
    with self._lock:
        stale_categories = self._stale_v2
        stale_categories.add(category)