
base

Abstract base class for model reference backend providers.

ModelReferenceBackend

Bases: ABC

Abstract interface for model reference data providers.

This interface defines the contract that all backend implementations must fulfill. Backends are responsible for fetching raw model reference data from their source (GitHub, database, API, etc.) and providing it in a standardized dictionary format.

The ModelReferenceManager uses backends as pluggable data sources and handles caching, TTL management, and conversion to pydantic models.
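The division of labour described above can be illustrated with a minimal sketch. Everything in it is a simplified stand-in written for this page, not the library's actual classes: `Category` stands in for `MODEL_REFERENCE_CATEGORY`, `MiniBackend` reproduces only three members of the interface, and `MiniManager` is a hypothetical consumer that caches results and discards them when the backend reports an invalidation.

```python
from abc import ABC, abstractmethod
from enum import Enum
from typing import Any, Callable, Dict, List, Optional, Set


class Category(str, Enum):
    """Hypothetical stand-in for MODEL_REFERENCE_CATEGORY."""

    image_generation = "image_generation"
    text_generation = "text_generation"


class MiniBackend(ABC):
    """Reduced stand-in for the backend interface (three members only)."""

    def __init__(self) -> None:
        self._invalidation_callbacks: List[Callable[[Category], None]] = []

    @abstractmethod
    def fetch_category(
        self, category: Category, *, force_refresh: bool = False
    ) -> Optional[Dict[str, Any]]:
        """Return {model_name: {attribute: value}} or None."""

    @abstractmethod
    def _mark_stale_impl(self, category: Category) -> None:
        """Backend-specific staleness tracking."""

    def register_invalidation_callback(self, callback: Callable[[Category], None]) -> None:
        self._invalidation_callbacks.append(callback)

    def mark_stale(self, category: Category) -> None:
        # Backend-specific tracking first, then notify listeners.
        self._mark_stale_impl(category)
        for callback in self._invalidation_callbacks:
            callback(category)


class InMemoryBackend(MiniBackend):
    """Toy backend that serves data from a dict instead of a remote source."""

    def __init__(self, data: Dict[Category, Dict[str, Any]]) -> None:
        super().__init__()
        self._data = data
        self._stale: Set[Category] = set()

    def fetch_category(self, category, *, force_refresh=False):
        # A fetch delivers current data, so the category is no longer stale.
        self._stale.discard(category)
        return self._data.get(category)

    def _mark_stale_impl(self, category):
        self._stale.add(category)


class MiniManager:
    """Toy consumer: caches backend results and drops them on invalidation."""

    def __init__(self, backend: MiniBackend) -> None:
        self._backend = backend
        self._cache: Dict[Category, Optional[Dict[str, Any]]] = {}
        backend.register_invalidation_callback(self._on_invalidated)

    def _on_invalidated(self, category: Category) -> None:
        self._cache.pop(category, None)

    def get(self, category: Category) -> Optional[Dict[str, Any]]:
        if category not in self._cache:
            self._cache[category] = self._backend.fetch_category(category)
        return self._cache[category]
```

In this sketch the manager keeps serving its cached copy until `mark_stale()` is called on the backend, at which point the registered callback evicts the entry and the next `get()` fetches again. TTL handling and conversion to pydantic models, which the real manager performs, are left out.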

Source code in src/horde_model_reference/backends/base.py
class ModelReferenceBackend(ABC):
    """Abstract interface for model reference data providers.

    This interface defines the contract that all backend implementations must fulfill.
    Backends are responsible for fetching raw model reference data from their source
    (GitHub, database, API, etc.) and providing it in a standardized dictionary format.

    The ModelReferenceManager uses backends as pluggable data sources and handles
    caching, TTL management, and conversion to pydantic models.
    """

    _replicate_mode = ReplicateMode.REPLICA
    _invalidation_callbacks: list[Callable[[MODEL_REFERENCE_CATEGORY], None]]

    def __init__(
        self,
        mode: ReplicateMode = ReplicateMode.REPLICA,
    ) -> None:
        """Initialize the backend.

        Args:
            mode: The replicate mode this backend operates in. Defaults to ReplicateMode.REPLICA.

        """
        super().__init__()

        self._replicate_mode = mode
        self._invalidation_callbacks = []

    @property
    def replicate_mode(self) -> ReplicateMode:
        """Get the replicate mode of this backend."""
        return self._replicate_mode

    @abstractmethod
    def fetch_category(
        self,
        category: MODEL_REFERENCE_CATEGORY,
        *,
        force_refresh: bool = False,
    ) -> dict[str, Any] | None:
        """Fetch model reference data for a specific category.

        Args:
            category: The category to fetch.
            force_refresh: If True, bypass any backend-level caching
                and fetch fresh data. Defaults to False.

        Returns:
            dict[str, Any] | None: The model reference data as a dictionary mapping
                model names to their attributes, or None if the category cannot be fetched.

        Implementation Requirements:
            - Return data as a dictionary: `{model_name: {attribute: value, ...}, ...}`
            - Return `None` if category cannot be fetched
            - Honor `force_refresh` to bypass internal caches
            - Handle errors gracefully (log and return `None`)

        Example Implementation:
            ```python
            def fetch_category(self, category, *, force_refresh=False):
                def fetch():
                    # Your fetch logic here
                    response = httpx.get(f"{self.base_url}/{category}")
                    return response.json() if response.status_code == 200 else None

                return self._fetch_with_cache(category, fetch, force_refresh=force_refresh)
            ```

        See Also:
            - [fetch_all_categories()][(c).fetch_all_categories]: Batch fetching of all categories
            - [fetch_category_async()][(c).fetch_category_async]: Async variant
            - [ReplicaBackendBase._fetch_with_cache()]
              [^^^.replica_backend_base.ReplicaBackendBase._fetch_with_cache]:
              Helper for cache management

        """

    @abstractmethod
    def fetch_all_categories(
        self,
        *,
        force_refresh: bool = False,
    ) -> dict[MODEL_REFERENCE_CATEGORY, dict[str, Any] | None]:
        """Fetch model reference data for all categories.

        Args:
            force_refresh: If True, bypass any backend-level caching
                and fetch fresh data. Defaults to False.

        Returns:
            dict[MODEL_REFERENCE_CATEGORY, dict[str, Any] | None]: A mapping of categories
                to their model reference data. Categories that cannot be fetched have None values.

        Implementation Requirements:
            - Return a dictionary mapping each category to its data
            - Use `None` values for categories that cannot be fetched
            - Typically implemented as a loop over [fetch_category()][(c).fetch_category]

        Example Implementation:
            ```python
            def fetch_all_categories(self, *, force_refresh=False):
                return {
                    category: self.fetch_category(category, force_refresh=force_refresh)
                    for category in MODEL_REFERENCE_CATEGORY
                }
            ```

        See Also:
            - [fetch_category()][(c).fetch_category]: Single category fetching
            - [fetch_all_categories_async()][(c).fetch_all_categories_async]: Async variant

        """

    @abstractmethod
    async def fetch_category_async(
        self,
        category: MODEL_REFERENCE_CATEGORY,
        *,
        httpx_client: httpx.AsyncClient | None = None,
        force_refresh: bool = False,
    ) -> dict[str, Any] | None:
        """Asynchronously fetch model reference data for a specific category.

        Args:
            category: The category to fetch.
            httpx_client: An optional httpx async client for connection pooling.
            force_refresh: If True, bypass any backend-level caching. Defaults to False.

        Returns:
            dict[str, Any] | None: The model reference data, or None if fetch failed.

        Implementation Requirements:
            - Use async I/O where possible (network requests, file operations with aiofiles)
            - Accept optional `httpx_client` for connection pooling
            - Create temporary client if not provided
            - Same return format as synchronous version
            - Share cache with synchronous methods when using
              [ReplicaBackendBase][^^^.replica_backend_base.ReplicaBackendBase]

        Example Implementation:
            ```python
            async def fetch_category_async(self, category, *, httpx_client=None, force_refresh=False):
                close_client = httpx_client is None
                if httpx_client is None:
                    httpx_client = httpx.AsyncClient()

                try:
                    response = await httpx_client.get(f"{self.base_url}/{category}")
                    return response.json() if response.status_code == 200 else None
                finally:
                    if close_client:
                        await httpx_client.aclose()
            ```

        See Also:
            - [fetch_category()][(c).fetch_category]: Synchronous variant
            - [fetch_all_categories_async()][(c).fetch_all_categories_async]: Async batch fetching

        """

    @abstractmethod
    async def fetch_all_categories_async(
        self,
        *,
        httpx_client: httpx.AsyncClient | None = None,
        force_refresh: bool = False,
    ) -> dict[MODEL_REFERENCE_CATEGORY, dict[str, Any] | None]:
        """Asynchronously fetch model reference data for all categories.

        Args:
            httpx_client: An optional httpx async client for connection pooling.
            force_refresh: If True, bypass any backend-level caching. Defaults to False.

        Returns:
            dict[MODEL_REFERENCE_CATEGORY, dict[str, Any] | None]: A mapping of categories to their data.

        Implementation Requirements:
            - Use `asyncio.gather()` for concurrent fetching when possible
            - Share `httpx_client` across fetches for connection pooling
            - Same return format as synchronous version

        Example Implementation:
            ```python
            async def fetch_all_categories_async(self, *, httpx_client=None, force_refresh=False):
                close_client = httpx_client is None
                if httpx_client is None:
                    httpx_client = httpx.AsyncClient()

                try:
                    tasks = [
                        self.fetch_category_async(cat, httpx_client=httpx_client, force_refresh=force_refresh)
                        for cat in MODEL_REFERENCE_CATEGORY
                    ]
                    results = await asyncio.gather(*tasks)
                    return dict(zip(MODEL_REFERENCE_CATEGORY, results, strict=False))
                finally:
                    if close_client:
                        await httpx_client.aclose()
            ```

        See Also:
            - [fetch_all_categories()][(c).fetch_all_categories]: Synchronous variant
            - [fetch_category_async()][(c).fetch_category_async]: Async single category fetch

        """

    @abstractmethod
    def needs_refresh(self, category: MODEL_REFERENCE_CATEGORY) -> bool:
        """Check if existing cached data for a category needs to be refreshed.

        This method indicates whether cached data has become stale and should be
        re-fetched from the source. It does NOT indicate whether an initial fetch
        is needed for data that has never been loaded.

        Semantic distinction:
        - Returns False when no cached data exists (this method never signals an initial fetch)
        - Returns True when cached data exists but has become stale

        Backends can implement staleness detection based on file modification times,
        database timestamps, ETags, TTL expiration, explicit invalidation, etc.

        Args:
            category: The category to check.

        Returns:
            bool: True if cached data needs refresh due to staleness. False if no
                  cached data exists or if cached data is still fresh.

        Implementation Requirements:
            - Return `False` if no data has been cached yet (an initial fetch is a separate concern)
            - Return `True` if cached data exists but is stale
            - Staleness can be based on: TTL expiration, file mtime changes, ETags, explicit invalidation

        Note:
            [ReplicaBackendBase][^^^.replica_backend_base.ReplicaBackendBase]
            provides a concrete implementation that checks explicit staleness marking,
            TTL expiration, file mtime changes, and custom validation hooks.

        See Also:
            - [mark_stale()][(c).mark_stale]: Explicitly mark a category as stale
            - [ReplicaBackendBase.should_fetch_data()][^^^.replica_backend_base.ReplicaBackendBase.should_fetch_data]:
              Combined initial fetch + refresh check

        """

    def register_invalidation_callback(
        self,
        callback: Callable[[MODEL_REFERENCE_CATEGORY], None],
    ) -> None:
        """Register a callback to be called when a category is invalidated.

        This allows external components (like ModelReferenceManager) to be notified
        when cached data becomes stale and needs to be refreshed.

        Args:
            callback: Function to call with the invalidated category.

        """
        self._invalidation_callbacks.append(callback)
        logger.debug(f"Registered invalidation callback: {getattr(callback, '__name__', repr(callback))}")

    def _notify_invalidation(self, category: MODEL_REFERENCE_CATEGORY) -> None:
        """Notify all registered callbacks that a category has been invalidated.

        Args:
            category: The category that was invalidated.

        """
        for callback in self._invalidation_callbacks:
            try:
                callback(category)
            except Exception as e:
                cb_name = getattr(callback, "__name__", repr(callback))
                logger.error(f"Invalidation callback {cb_name} failed for {category}: {e}")

    @abstractmethod
    def _mark_stale_impl(self, category: MODEL_REFERENCE_CATEGORY) -> None:
        """Backend-specific implementation of marking a category as stale.

        Subclasses must implement this to handle their specific staleness tracking.

        Args:
            category: The category to mark as stale.

        Implementation Requirements:
            - Update backend-specific staleness tracking (e.g., add to `_stale_categories` set)
            - Called by public [mark_stale()][(c).mark_stale] method before notifying callbacks
            - Implementations should override this method, not `mark_stale()`

        Note:
            The public `mark_stale()` method calls this implementation and then automatically
            notifies all registered invalidation callbacks.

        """

    def mark_stale(self, category: MODEL_REFERENCE_CATEGORY) -> None:
        """Mark a category's data as stale, requiring refresh on next access.

        This method calls the backend-specific implementation and then notifies
        all registered callbacks.

        Args:
            category: The category to mark as stale.

        Implementation Note:
            The base class provides this public implementation. Subclasses should override
            [_mark_stale_impl()][(c)._mark_stale_impl]
            instead of this method.

        See Also:
            - [_mark_stale_impl()][(c)._mark_stale_impl]: Backend-specific staleness tracking
            - [register_invalidation_callback()][(c).register_invalidation_callback]:
              Register callbacks for invalidation events

        """
        self._mark_stale_impl(category)
        self._notify_invalidation(category)

    @abstractmethod
    def get_category_file_path(self, category: MODEL_REFERENCE_CATEGORY) -> Path | None:
        """Get the file path for a category's data, if applicable.

        Some backends (like file-based ones) have a physical file path associated
        with each category. Others (like database backends) may return None.

        Args:
            category: The category to get the path for.

        Returns:
            Path | None: The file path, or None if not applicable for this backend.

        Implementation Requirements:
            - Return `Path` object for file-based backends
            - Return `None` for backends without file storage (HTTP-only, database, etc.)

        Example Implementations:
            ```python
            # File-based backend
            def get_category_file_path(self, category):
                return self.base_path / f"{category.value}.json"

            # HTTP-only backend
            def get_category_file_path(self, category):
                return None
            ```

        See Also:
            - [get_all_category_file_paths()][(c).get_all_category_file_paths]: Get all file paths

        """

    @abstractmethod
    def get_all_category_file_paths(self) -> dict[MODEL_REFERENCE_CATEGORY, Path | None]:
        """Get file paths for all categories, if applicable.

        Returns:
            dict[MODEL_REFERENCE_CATEGORY, Path | None]: Mapping of categories to their file paths.
                Categories without file paths map to None.

        Implementation Requirements:
            - Return dictionary with all categories
            - Use `None` values for categories without file paths
            - Typically implemented by iterating over categories and calling `get_category_file_path()`

        Example Implementation:
            ```python
            def get_all_category_file_paths(self):
                return {
                    category: self.get_category_file_path(category)
                    for category in MODEL_REFERENCE_CATEGORY
                }
            ```

        See Also:
            - [get_category_file_path()][(c).get_category_file_path]: Get single category file path

        """

    @abstractmethod
    def get_legacy_json(
        self,
        category: MODEL_REFERENCE_CATEGORY,
        redownload: bool = False,
    ) -> dict[str, Any] | None:
        """Get raw legacy JSON for a specific category without pydantic validation.

        This method returns cached legacy format JSON data, downloading if needed.
        The cache is populated during initialization, downloads, and on-demand loads.

        Args:
            category: Category to retrieve.
            redownload: If True, redownload before returning and refresh cache.

        Returns:
            dict[str, Any] | None: The raw legacy JSON dict, or None if not found.

        Implementation Requirements:
            - Return legacy format data as dictionary
            - Support caching with optional redownload
            - Return `None` if not available
            - The `redownload` parameter is analogous to `force_refresh` in fetch methods

        See Also:
            - [get_legacy_json_string()][(c).get_legacy_json_string]: Get as string instead of dict

        """

    @abstractmethod
    def get_legacy_json_string(
        self,
        category: MODEL_REFERENCE_CATEGORY,
        redownload: bool = False,
    ) -> str | None:
        """Get raw legacy JSON string for a specific category without pydantic validation.

        This method returns cached legacy format JSON data as a string, downloading if needed.
        The cache is populated during initialization, downloads, and on-demand loads.

        Args:
            category: Category to retrieve.
            redownload: If True, redownload before returning and refresh cache.

        Returns:
            str | None: The raw legacy JSON string, or None if not found.

        Implementation Requirements:
            - Return legacy format data as JSON string
            - Same caching semantics as [get_legacy_json()][(c).get_legacy_json]
            - Return `None` if not available

        See Also:
            - [get_legacy_json()][(c).get_legacy_json]: Get as dict instead of string

        """

    def support_any_writes(self) -> bool:
        """Check if this backend supports any write operations (v2 or legacy).

        Returns:
            bool: True if any write operations are supported, False otherwise.

        """
        return self.supports_writes() or self.supports_legacy_writes()

    def supports_writes(self) -> bool:
        """Check if this backend supports write operations (v2 format).

        Write operations include update_model() and delete_model().
        Typically only PRIMARY mode backends support writes.

        Returns:
            bool: True if write operations are supported, False otherwise.

        """
        return False

    def supports_legacy_writes(self) -> bool:
        """Check if this backend supports write operations in legacy format.

        Legacy write operations include update_model_legacy() and delete_model_legacy().
        Only available when canonical_format='LEGACY' in PRIMARY mode.

        Returns:
            bool: True if legacy write operations are supported, False otherwise.

        """
        return False

    def supports_cache_warming(self) -> bool:
        """Check if this backend supports cache warming operations.

        Cache warming pre-populates the cache with data to improve initial request performance.
        Typically only backends with distributed caching (like Redis) support this.

        Returns:
            bool: True if cache warming is supported, False otherwise.

        """
        return False

    def supports_health_checks(self) -> bool:
        """Check if this backend supports health check operations.

        Health checks verify that the backend's external dependencies (Redis, databases, etc.)
        are accessible and functioning correctly.

        Returns:
            bool: True if health checks are supported, False otherwise.

        """
        return False

    def supports_statistics(self) -> bool:
        """Check if this backend supports statistics retrieval.

        Statistics provide insights into backend performance, cache hits/misses, etc.

        Returns:
            bool: True if statistics are supported, False otherwise.

        """
        return False

    def update_model(
        self,
        category: MODEL_REFERENCE_CATEGORY,
        model_name: str,
        record_dict: dict[str, Any],
        *,
        logical_user_id: str | None = None,
        request_id: str | None = None,
    ) -> None:
        """Update or create a model reference.

        This is an optional method that write-capable backends can implement.
        Read-only backends should leave the default implementation which raises NotImplementedError.

        Args:
            category: The category to update.
            model_name: The name of the model to update or create.
            record_dict: The model record data as a dictionary.
            logical_user_id: Immutable Horde user id for auditing contexts (optional).
            request_id: Optional tracing/idempotency identifier for audit correlation.

        Raises:
            NotImplementedError: If the backend does not support write operations.

        Implementation Requirements:
            - Create model if it doesn't exist
            - Update model if it exists
            - Ensure atomic writes (use temp files with rename for file-based backends)
            - Call [mark_stale()][(c).mark_stale] after successful write to invalidate cache
            - Override [supports_writes()][(c).supports_writes] to return `True`

        See Also:
            - [update_model_from_base_model()][(c).update_model_from_base_model]:
              Update from pydantic model (automatically provided)
            - [delete_model()][(c).delete_model]: Delete a model
            - [supports_writes()][(c).supports_writes]: Feature detection method

        """
        raise NotImplementedError(f"{self.__class__.__name__} does not support write operations")

    def update_model_from_base_model(
        self,
        category: MODEL_REFERENCE_CATEGORY,
        model_name: str,
        record_model: BaseModel,
        *,
        logical_user_id: str | None = None,
        request_id: str | None = None,
    ) -> None:
        """Update or create a model reference from a pydantic BaseModel.

        This is a convenience wrapper around update_model() that the base class
        implements. It raises NotImplementedError when supports_writes() returns False.

        Args:
            category: The category to update.
            model_name: The name of the model to update or create.
            record_model: The model record data as a pydantic BaseModel.
            logical_user_id: Immutable Horde user id for auditing contexts (optional).
            request_id: Optional tracing/idempotency identifier for audit correlation.

        Raises:
            NotImplementedError: If the backend does not support write operations.

        Implementation Note:
            The base class provides this implementation automatically. It:
            1. Checks [supports_writes()][(c).supports_writes] returns `True`
            2. Converts the pydantic model to dict using `model_dump(exclude_unset=True)`
            3. Calls [update_model()][(c).update_model] with the dictionary

            Backends that support writes typically don't need to override this method.

        See Also:
            - [update_model()][(c).update_model]: Update from dictionary (implement this)
            - [supports_writes()][(c).supports_writes]: Feature detection method

        """
        if not self.supports_writes():
            raise NotImplementedError(f"{self.__class__.__name__} does not support write operations")

        record_dict = record_model.model_dump(exclude_unset=True)
        self.update_model(
            category,
            model_name,
            record_dict,
            logical_user_id=logical_user_id,
            request_id=request_id,
        )

    def delete_model(
        self,
        category: MODEL_REFERENCE_CATEGORY,
        model_name: str,
        *,
        logical_user_id: str | None = None,
        request_id: str | None = None,
    ) -> None:
        """Delete a model reference.

        This is an optional method that write-capable backends can implement.
        Read-only backends should leave the default implementation which raises NotImplementedError.

        Args:
            category: The category containing the model.
            model_name: The name of the model to delete.
            logical_user_id: Immutable Horde user id for auditing contexts (optional).
            request_id: Optional tracing/idempotency identifier for audit correlation.

        Raises:
            NotImplementedError: If the backend does not support write operations.
            KeyError: If the model doesn't exist.

        Implementation Requirements:
            - Raise `KeyError` if model doesn't exist
            - Ensure atomic writes
            - Call [mark_stale()][(c).mark_stale] after successful delete to invalidate cache
            - Override [supports_writes()][(c).supports_writes] to return `True`

        See Also:
            - [update_model()][(c).update_model]: Update or create a model
            - [supports_writes()][(c).supports_writes]: Feature detection method

        """
        raise NotImplementedError(f"{self.__class__.__name__} does not support write operations")

    def update_model_legacy(
        self,
        category: MODEL_REFERENCE_CATEGORY,
        model_name: str,
        record_dict: dict[str, Any],
        *,
        logical_user_id: str | None = None,
        request_id: str | None = None,
    ) -> None:
        """Update or create a model reference in legacy format.

        This is an optional method that legacy-write-capable backends can implement.
        Only available when canonical_format='LEGACY' in PRIMARY mode.

        Args:
            category: The category to update.
            model_name: The name of the model to update or create.
            record_dict: The model record data in legacy format as a dictionary.
            logical_user_id: Immutable Horde user id for auditing contexts (optional).
            request_id: Optional tracing/idempotency identifier for audit correlation.

        Raises:
            NotImplementedError: If the backend does not support legacy write operations.

        """
        raise NotImplementedError(f"{self.__class__.__name__} does not support legacy write operations")

    def update_model_legacy_from_base_model(
        self,
        category: MODEL_REFERENCE_CATEGORY,
        model_name: str,
        record_model: BaseModel,
        *,
        logical_user_id: str | None = None,
        request_id: str | None = None,
    ) -> None:
        """Update or create a model reference in legacy format from a pydantic BaseModel.

        This is a convenience wrapper around update_model_legacy() that the base class
        implements. Only available when canonical_format='LEGACY' in PRIMARY mode.

        Args:
            category: The category to update.
            model_name: The name of the model to update or create.
            record_model: The model record data as a pydantic BaseModel.
            logical_user_id: Immutable Horde user id for auditing contexts (optional).
            request_id: Optional tracing/idempotency identifier for audit correlation.

        Raises:
            NotImplementedError: If the backend does not support legacy write operations.

        """
        if not self.supports_legacy_writes():
            raise NotImplementedError(f"{self.__class__.__name__} does not support legacy write operations")

        record_dict = record_model.model_dump(exclude_unset=True)
        self.update_model_legacy(
            category,
            model_name,
            record_dict,
            logical_user_id=logical_user_id,
            request_id=request_id,
        )

    def delete_model_legacy(
        self,
        category: MODEL_REFERENCE_CATEGORY,
        model_name: str,
        *,
        logical_user_id: str | None = None,
        request_id: str | None = None,
    ) -> None:
        """Delete a model reference from legacy format files.

        This is an optional method that legacy-write-capable backends can implement.
        Only available when canonical_format='LEGACY' in PRIMARY mode.

        Args:
            category: The category containing the model.
            model_name: The name of the model to delete.
            logical_user_id: Immutable Horde user id for auditing contexts (optional).
            request_id: Optional tracing/idempotency identifier for audit correlation.

        Raises:
            NotImplementedError: If the backend does not support legacy write operations.

        """
        raise NotImplementedError(f"{self.__class__.__name__} does not support legacy write operations")

    def warm_cache(self) -> None:
        """Pre-populate cache with all categories for faster initial requests.

        This is an optional method that backends with cache warming support can implement.
        Backends without cache warming should leave the default implementation.

        Raises:
            NotImplementedError: If the backend does not support cache warming.

        """
        raise NotImplementedError(f"{self.__class__.__name__} does not support cache warming")

    async def warm_cache_async(self) -> None:
        """Asynchronously pre-populate cache with all categories for faster initial requests.

        This is an optional method that backends with cache warming support can implement.
        Backends without cache warming should leave the default implementation.

        Raises:
            NotImplementedError: If the backend does not support async cache warming.

        """
        raise NotImplementedError(f"{self.__class__.__name__} does not support async cache warming")

    def health_check(self) -> bool:
        """Check the health of the backend's external dependencies.

        This is an optional method that backends with health check support can implement.
        Backends without external dependencies should leave the default implementation.

        Returns:
            bool: True if healthy, False otherwise.

        Raises:
            NotImplementedError: If the backend does not support health checks.

        """
        raise NotImplementedError(f"{self.__class__.__name__} does not support health checks")

    def get_statistics(self) -> dict[str, Any]:
        """Get backend performance and usage statistics.

        This is an optional method that backends with statistics support can implement.
        The structure of returned statistics is backend-specific.

        Returns:
            dict[str, Any]: Backend-specific statistics.

        Raises:
            NotImplementedError: If the backend does not support statistics.

        """
        raise NotImplementedError(f"{self.__class__.__name__} does not support statistics")

    def get_replicate_mode(self) -> ReplicateMode:
        """Get the replication mode of this backend.

        Returns:
            ReplicateMode: The replicate mode (PRIMARY or REPLICA).

        """
        return self._replicate_mode

    def supports_metadata(self) -> bool:
        """Check if this backend supports metadata tracking.

        Metadata tracking records operation counts, timestamps, and health metrics
        for both legacy (v1) and v2 format operations. Typically only PRIMARY mode
        backends support metadata tracking.

        Returns:
            bool: True if metadata tracking is supported, False otherwise.

        """
        return False

    def get_legacy_metadata(self, category: MODEL_REFERENCE_CATEGORY) -> CategoryMetadata:
        """Get legacy format metadata for a specific category.

        This is an optional method that metadata-capable backends can implement.
        Backends without metadata support should leave the default implementation.

        Args:
            category: The category to get metadata for.

        Returns:
            CategoryMetadata: The legacy metadata.

        Raises:
            NotImplementedError: If the backend does not support metadata.

        """
        raise NotImplementedError(f"{self.__class__.__name__} does not support metadata tracking")

    async def get_legacy_metadata_async(self, category: MODEL_REFERENCE_CATEGORY) -> CategoryMetadata:
        """Asynchronously get legacy format metadata for a specific category.

        This is an optional method that metadata-capable backends can implement.
        Backends without metadata support should leave the default implementation.

        Args:
            category: The category to get metadata for.

        Returns:
            CategoryMetadata: The legacy metadata.

        Raises:
            NotImplementedError: If the backend does not support metadata.

        """
        raise NotImplementedError(f"{self.__class__.__name__} does not support async metadata tracking")

    def get_metadata(self, category: MODEL_REFERENCE_CATEGORY) -> CategoryMetadata:
        """Get v2 format metadata for a specific category.

        This is an optional method that metadata-capable backends can implement.
        Backends without metadata support should leave the default implementation.

        Args:
            category: The category to get metadata for.

        Returns:
            CategoryMetadata: The v2 metadata.

        Raises:
            NotImplementedError: If the backend does not support metadata.

        """
        raise NotImplementedError(f"{self.__class__.__name__} does not support metadata tracking")

    async def get_metadata_async(self, category: MODEL_REFERENCE_CATEGORY) -> CategoryMetadata:
        """Asynchronously get v2 format metadata for a specific category.

        This is an optional method that metadata-capable backends can implement.
        Backends without metadata support should leave the default implementation.

        Args:
            category: The category to get metadata for.

        Returns:
            CategoryMetadata: The v2 metadata.

        Raises:
            NotImplementedError: If the backend does not support metadata.

        """
        raise NotImplementedError(f"{self.__class__.__name__} does not support async metadata tracking")

    def get_all_legacy_metadata(self) -> dict[MODEL_REFERENCE_CATEGORY, CategoryMetadata]:
        """Get legacy format metadata for all categories.

        This is an optional method that metadata-capable backends can implement.
        Backends without metadata support should leave the default implementation.

        Returns:
            dict[MODEL_REFERENCE_CATEGORY, CategoryMetadata]: Mapping of categories to their legacy metadata.

        Raises:
            NotImplementedError: If the backend does not support metadata.

        """
        raise NotImplementedError(f"{self.__class__.__name__} does not support metadata tracking")

    async def get_all_legacy_metadata_async(self) -> dict[MODEL_REFERENCE_CATEGORY, CategoryMetadata]:
        """Asynchronously get legacy format metadata for all categories.

        This is an optional method that metadata-capable backends can implement.
        Backends without metadata support should leave the default implementation.

        Returns:
            dict[MODEL_REFERENCE_CATEGORY, CategoryMetadata]: Mapping of categories to their legacy metadata.

        Raises:
            NotImplementedError: If the backend does not support metadata.

        """
        raise NotImplementedError(f"{self.__class__.__name__} does not support async metadata tracking")

    def get_all_metadata(self) -> dict[MODEL_REFERENCE_CATEGORY, CategoryMetadata]:
        """Get v2 format metadata for all categories.

        This is an optional method that metadata-capable backends can implement.
        Backends without metadata support should leave the default implementation.

        Returns:
            dict[MODEL_REFERENCE_CATEGORY, CategoryMetadata]: Mapping of categories to their v2 metadata.

        Raises:
            NotImplementedError: If the backend does not support metadata.

        """
        raise NotImplementedError(f"{self.__class__.__name__} does not support metadata tracking")

    async def get_all_metadata_async(self) -> dict[MODEL_REFERENCE_CATEGORY, CategoryMetadata]:
        """Asynchronously get v2 format metadata for all categories.

        This is an optional method that metadata-capable backends can implement.
        Backends without metadata support should leave the default implementation.

        Returns:
            dict[MODEL_REFERENCE_CATEGORY, CategoryMetadata]: Mapping of categories to their v2 metadata.

        Raises:
            NotImplementedError: If the backend does not support metadata.

        """
        raise NotImplementedError(f"{self.__class__.__name__} does not support async metadata tracking")
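
The optional methods in the listing above default to raising NotImplementedError (or, for supports_metadata(), returning False), so a caller can probe for a capability before relying on it. The following is a minimal sketch of that probing pattern. MinimalBackend and probe_backend are hypothetical names invented for illustration and are not part of horde_model_reference; the stand-in class only copies the default behaviour shown above.

```python
from __future__ import annotations

from typing import Any


class MinimalBackend:
    """Hypothetical stand-in that copies the optional-method defaults shown above."""

    def supports_metadata(self) -> bool:
        return False

    def health_check(self) -> bool:
        raise NotImplementedError(f"{self.__class__.__name__} does not support health checks")

    def get_statistics(self) -> dict[str, Any]:
        raise NotImplementedError(f"{self.__class__.__name__} does not support statistics")


def probe_backend(backend: Any) -> dict[str, Any]:
    """Collect whatever optional information the backend can provide (hypothetical helper)."""
    report: dict[str, Any] = {"metadata": backend.supports_metadata()}
    try:
        report["healthy"] = backend.health_check()
    except NotImplementedError:
        # The capability is absent; this is not treated as a failed health check.
        report["healthy"] = None
    try:
        report["statistics"] = backend.get_statistics()
    except NotImplementedError:
        report["statistics"] = None
    return report


report = probe_backend(MinimalBackend())
```

Using None for an absent capability keeps "unsupported" distinct from "unhealthy"; that distinction is a choice made in this sketch, not something the base class prescribes.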

_replicate_mode class-attribute instance-attribute

_replicate_mode = mode

_invalidation_callbacks instance-attribute

_invalidation_callbacks: list[
    Callable[[MODEL_REFERENCE_CATEGORY], None]
] = []

replicate_mode property

replicate_mode: ReplicateMode

Get the replicate mode of this backend.

__init__

__init__(
    mode: ReplicateMode = ReplicateMode.REPLICA,
) -> None

Initialize the backend.

Source code in src/horde_model_reference/backends/base.py
def __init__(
    self,
    mode: ReplicateMode = ReplicateMode.REPLICA,
) -> None:
    """Initialize the backend."""
    super().__init__()

    self._replicate_mode = mode
    self._invalidation_callbacks = []

fetch_category abstractmethod

fetch_category(
    category: MODEL_REFERENCE_CATEGORY,
    *,
    force_refresh: bool = False,
) -> dict[str, Any] | None

Fetch model reference data for a specific category.

Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    The category to fetch.

  • force_refresh (bool, default: False ) –

    If True, bypass any backend-level caching and fetch fresh data. Defaults to False.

Returns:

  • dict[str, Any] | None

    dict[str, Any] | None: The model reference data as a dictionary mapping model names to their attributes, or None if the category cannot be fetched.

Implementation Requirements
  • Return data as a dictionary: {model_name: {attribute: value, ...}, ...}
  • Return None if category cannot be fetched
  • Honor force_refresh to bypass internal caches
  • Handle errors gracefully (log and return None)
Example Implementation
def fetch_category(self, category, *, force_refresh=False):
    def fetch():
        # Your fetch logic here
        response = httpx.get(f"{self.base_url}/{category}")
        return response.json() if response.status_code == 200 else None

    return self._fetch_with_cache(category, fetch, force_refresh=force_refresh)
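
The example above only checks the status code, while the requirements also ask implementations to log errors and return None. As a rough sketch of that requirement, the loader below reads a category from a local JSON file instead of HTTP so that it stays self-contained. The function name load_category_file and the file-based source are assumptions made for illustration, not part of the library.

```python
from __future__ import annotations

import json
import logging
from pathlib import Path
from typing import Any

logger = logging.getLogger(__name__)


def load_category_file(path: Path) -> dict[str, Any] | None:
    """Read one category's JSON file, returning None on any expected failure."""
    try:
        with path.open(encoding="utf-8") as handle:
            data = json.load(handle)
    except (OSError, ValueError) as exc:
        # OSError covers missing or unreadable files; ValueError covers
        # json.JSONDecodeError and decoding problems.
        logger.warning("Could not load %s: %s", path, exc)
        return None
    if not isinstance(data, dict):
        logger.warning("Unexpected top-level type in %s: %s", path, type(data).__name__)
        return None
    return data
```

A callable like this could be passed as the inner fetch function in the example above, so that failures surface as None rather than as exceptions.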
See Also
  • fetch_all_categories(): Batch fetching of all categories
  • fetch_category_async(): Async variant
  • ReplicaBackendBase._fetch_with_cache(): Helper for cache management
Source code in src/horde_model_reference/backends/base.py
@abstractmethod
def fetch_category(
    self,
    category: MODEL_REFERENCE_CATEGORY,
    *,
    force_refresh: bool = False,
) -> dict[str, Any] | None:
    """Fetch model reference data for a specific category.

    Args:
        category: The category to fetch.
        force_refresh: If True, bypass any backend-level caching
            and fetch fresh data. Defaults to False.

    Returns:
        dict[str, Any] | None: The model reference data as a dictionary mapping
            model names to their attributes, or None if the category cannot be fetched.

    Implementation Requirements:
        - Return data as a dictionary: `{model_name: {attribute: value, ...}, ...}`
        - Return `None` if category cannot be fetched
        - Honor `force_refresh` to bypass internal caches
        - Handle errors gracefully (log and return `None`)

    Example Implementation:
        ```python
        def fetch_category(self, category, *, force_refresh=False):
            def fetch():
                # Your fetch logic here
                response = httpx.get(f"{self.base_url}/{category}")
                return response.json() if response.status_code == 200 else None

            return self._fetch_with_cache(category, fetch, force_refresh=force_refresh)
        ```

    See Also:
        - [fetch_all_categories()][(c).fetch_all_categories]: Batch fetching of all categories
        - [fetch_category_async()][(c).fetch_category_async]: Async variant
        - [ReplicaBackendBase._fetch_with_cache()]
          [^^^.replica_backend_base.ReplicaBackendBase._fetch_with_cache]:
          Helper for cache management

    """

fetch_all_categories abstractmethod

fetch_all_categories(
    *, force_refresh: bool = False
) -> dict[MODEL_REFERENCE_CATEGORY, dict[str, Any] | None]

Fetch model reference data for all categories.

Parameters:

  • force_refresh (bool, default: False ) –

    If True, bypass any backend-level caching and fetch fresh data. Defaults to False.

Returns:

  • dict[MODEL_REFERENCE_CATEGORY, dict[str, Any] | None]

    dict[MODEL_REFERENCE_CATEGORY, dict[str, Any] | None]: A mapping of categories to their model reference data. Categories that cannot be fetched have None values.

Implementation Requirements
  • Return a dictionary mapping each category to its data
  • Use None values for categories that cannot be fetched
  • Typically implemented as a loop over fetch_category()
Example Implementation
def fetch_all_categories(self, *, force_refresh=False):
    return {
        category: self.fetch_category(category, force_refresh=force_refresh)
        for category in MODEL_REFERENCE_CATEGORY
    }
See Also
  • fetch_category(): Single category fetching
  • fetch_all_categories_async(): Async variant
Source code in src/horde_model_reference/backends/base.py
@abstractmethod
def fetch_all_categories(
    self,
    *,
    force_refresh: bool = False,
) -> dict[MODEL_REFERENCE_CATEGORY, dict[str, Any] | None]:
    """Fetch model reference data for all categories.

    Args:
        force_refresh: If True, bypass any backend-level caching
            and fetch fresh data. Defaults to False.

    Returns:
        dict[MODEL_REFERENCE_CATEGORY, dict[str, Any] | None]: A mapping of categories
            to their model reference data. Categories that cannot be fetched have None values.

    Implementation Requirements:
        - Return a dictionary mapping each category to its data
        - Use `None` values for categories that cannot be fetched
        - Typically implemented as a loop over [fetch_category()][(c).fetch_category]

    Example Implementation:
        ```python
        def fetch_all_categories(self, *, force_refresh=False):
            return {
                category: self.fetch_category(category, force_refresh=force_refresh)
                for category in MODEL_REFERENCE_CATEGORY
            }
        ```

    See Also:
        - [fetch_category()][(c).fetch_category]: Single category fetching
        - [fetch_all_categories_async()][(c).fetch_all_categories_async]: Async variant

    """

fetch_category_async abstractmethod async

fetch_category_async(
    category: MODEL_REFERENCE_CATEGORY,
    *,
    httpx_client: AsyncClient | None = None,
    force_refresh: bool = False,
) -> dict[str, Any] | None

Asynchronously fetch model reference data for a specific category.

Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    The category to fetch.

  • httpx_client (AsyncClient | None, default: None ) –

    An optional httpx async client for connection pooling.

  • force_refresh (bool, default: False ) –

    If True, bypass any backend-level caching. Defaults to False.

Returns:

  • dict[str, Any] | None

    dict[str, Any] | None: The model reference data, or None if fetch failed.

Implementation Requirements
  • Use async I/O where possible (network requests, file operations with aiofiles)
  • Accept optional httpx_client for connection pooling
  • Create temporary client if not provided
  • Same return format as synchronous version
  • Share cache with synchronous methods when using ReplicaBackendBase
Example Implementation
async def fetch_category_async(self, category, *, httpx_client=None, force_refresh=False):
    close_client = httpx_client is None
    if httpx_client is None:
        httpx_client = httpx.AsyncClient()

    try:
        response = await httpx_client.get(f"{self.base_url}/{category}")
        return response.json() if response.status_code == 200 else None
    finally:
        if close_client:
            await httpx_client.aclose()
See Also
  • fetch_category(): Synchronous variant
  • fetch_all_categories_async(): Async batch fetching
Source code in src/horde_model_reference/backends/base.py
@abstractmethod
async def fetch_category_async(
    self,
    category: MODEL_REFERENCE_CATEGORY,
    *,
    httpx_client: httpx.AsyncClient | None = None,
    force_refresh: bool = False,
) -> dict[str, Any] | None:
    """Asynchronously fetch model reference data for a specific category.

    Args:
        category: The category to fetch.
        httpx_client: An optional httpx async client for connection pooling.
        force_refresh: If True, bypass any backend-level caching. Defaults to False.

    Returns:
        dict[str, Any] | None: The model reference data, or None if fetch failed.

    Implementation Requirements:
        - Use async I/O where possible (network requests, file operations with aiofiles)
        - Accept optional `httpx_client` for connection pooling
        - Create temporary client if not provided
        - Same return format as synchronous version
        - Share cache with synchronous methods when using
          [ReplicaBackendBase][^^^.replica_backend_base.ReplicaBackendBase]

    Example Implementation:
        ```python
        async def fetch_category_async(self, category, *, httpx_client=None, force_refresh=False):
            close_client = httpx_client is None
            if httpx_client is None:
                httpx_client = httpx.AsyncClient()

            try:
                response = await httpx_client.get(f"{self.base_url}/{category}")
                return response.json() if response.status_code == 200 else None
            finally:
                if close_client:
                    await httpx_client.aclose()
        ```

    See Also:
        - [fetch_category()][(c).fetch_category]: Synchronous variant
        - [fetch_all_categories_async()][(c).fetch_all_categories_async]: Async batch fetching

    """

fetch_all_categories_async abstractmethod async

fetch_all_categories_async(
    *,
    httpx_client: AsyncClient | None = None,
    force_refresh: bool = False,
) -> dict[MODEL_REFERENCE_CATEGORY, dict[str, Any] | None]

Asynchronously fetch model reference data for all categories.

Parameters:

  • httpx_client (AsyncClient | None, default: None ) –

    An optional httpx async client for connection pooling.

  • force_refresh (bool, default: False ) –

    If True, bypass any backend-level caching. Defaults to False.

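Returns:

  • dict[MODEL_REFERENCE_CATEGORY, dict[str, Any] | None]

    dict[MODEL_REFERENCE_CATEGORY, dict[str, Any] | None]: A mapping of categories to their data.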

Implementation Requirements
  • Use asyncio.gather() for concurrent fetching when possible
  • Share httpx_client across fetches for connection pooling
  • Same return format as synchronous version
Example Implementation
async def fetch_all_categories_async(self, *, httpx_client=None, force_refresh=False):
    close_client = httpx_client is None
    if httpx_client is None:
        httpx_client = httpx.AsyncClient()

    try:
        tasks = [
            self.fetch_category_async(cat, httpx_client=httpx_client, force_refresh=force_refresh)
            for cat in MODEL_REFERENCE_CATEGORY
        ]
        results = await asyncio.gather(*tasks)
        return dict(zip(MODEL_REFERENCE_CATEGORY, results, strict=False))
    finally:
        if close_client:
            await httpx_client.aclose()
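See Also
  • fetch_all_categories(): Synchronous variant
  • fetch_category_async(): Async single category fetch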
Source code in src/horde_model_reference/backends/base.py
@abstractmethod
async def fetch_all_categories_async(
    self,
    *,
    httpx_client: httpx.AsyncClient | None = None,
    force_refresh: bool = False,
) -> dict[MODEL_REFERENCE_CATEGORY, dict[str, Any] | None]:
    """Asynchronously fetch model reference data for all categories.

    Args:
        httpx_client: An optional httpx async client for connection pooling.
        force_refresh: If True, bypass any backend-level caching. Defaults to False.

    Returns:
        dict[MODEL_REFERENCE_CATEGORY, dict[str, Any] | None]: A mapping of categories to their data.

    Implementation Requirements:
        - Use `asyncio.gather()` for concurrent fetching when possible
        - Share `httpx_client` across fetches for connection pooling
        - Same return format as synchronous version

    Example Implementation:
        ```python
        async def fetch_all_categories_async(self, *, httpx_client=None, force_refresh=False):
            close_client = httpx_client is None
            if httpx_client is None:
                httpx_client = httpx.AsyncClient()

            try:
                tasks = [
                    self.fetch_category_async(cat, httpx_client=httpx_client, force_refresh=force_refresh)
                    for cat in MODEL_REFERENCE_CATEGORY
                ]
                results = await asyncio.gather(*tasks)
                return dict(zip(MODEL_REFERENCE_CATEGORY, results, strict=False))
            finally:
                if close_client:
                    await httpx_client.aclose()
        ```

    See Also:
        - [fetch_all_categories()][(c).fetch_all_categories]: Synchronous variant
        - [fetch_category_async()][(c).fetch_category_async]: Async single category fetch

    """

needs_refresh abstractmethod

needs_refresh(category: MODEL_REFERENCE_CATEGORY) -> bool

Check if existing cached data for a category needs to be refreshed.

This method indicates whether cached data has become stale and should be re-fetched from the source. It does NOT indicate whether an initial fetch is needed for data that has never been loaded.

Semantic distinction:

  • Returns False when no cached data exists (no initial fetch needed via this method)
  • Returns True when cached data exists but has become stale

Backends can implement staleness detection based on file modification times, database timestamps, ETags, TTL expiration, explicit invalidation, etc.

Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    The category to check.

Returns:

  • bool ( bool ) –

    True if cached data needs refresh due to staleness. False if no cached data exists or if cached data is still fresh.

Implementation Requirements
  • Return False if no data has been cached yet (an initial fetch is a separate concern)
  • Return True if cached data exists but is stale
  • Staleness can be based on: TTL expiration, file mtime changes, ETags, explicit invalidation
Note

ReplicaBackendBase provides a concrete implementation that checks explicit staleness marking, TTL expiration, file mtime changes, and custom validation hooks.
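
To make the False-when-uncached rule concrete, here is a minimal sketch of TTL-based staleness tracking. TTLStalenessTracker is a hypothetical helper written for illustration; it is not the ReplicaBackendBase implementation, it uses plain strings in place of MODEL_REFERENCE_CATEGORY, and the optional now argument exists only to make the behaviour easy to check.

```python
from __future__ import annotations

import time


class TTLStalenessTracker:
    """Hypothetical helper illustrating the needs_refresh() contract."""

    def __init__(self, ttl_seconds: float) -> None:
        self._ttl_seconds = ttl_seconds
        self._fetched_at: dict[str, float] = {}
        self._stale: set[str] = set()

    def record_fetch(self, category: str, now: float | None = None) -> None:
        """Remember when a category was cached and clear any stale mark."""
        self._fetched_at[category] = time.monotonic() if now is None else now
        self._stale.discard(category)

    def mark_stale(self, category: str) -> None:
        """Explicitly invalidate a category."""
        self._stale.add(category)

    def needs_refresh(self, category: str, now: float | None = None) -> bool:
        fetched_at = self._fetched_at.get(category)
        if fetched_at is None:
            # Nothing cached yet: the initial fetch is decided elsewhere.
            return False
        if category in self._stale:
            return True
        current = time.monotonic() if now is None else now
        return current - fetched_at > self._ttl_seconds


tracker = TTLStalenessTracker(ttl_seconds=60)
never_cached = tracker.needs_refresh("image_generation", now=0.0)
tracker.record_fetch("image_generation", now=0.0)
still_fresh = tracker.needs_refresh("image_generation", now=30.0)
expired = tracker.needs_refresh("image_generation", now=61.0)
```

In this sketch never_cached and still_fresh are False, while expired is True because more than 60 seconds have passed since the recorded fetch.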

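See Also
  • mark_stale(): Explicitly mark a category as stale
  • ReplicaBackendBase.should_fetch_data(): Combined initial fetch + refresh check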
Source code in src/horde_model_reference/backends/base.py
@abstractmethod
def needs_refresh(self, category: MODEL_REFERENCE_CATEGORY) -> bool:
    """Check if existing cached data for a category needs to be refreshed.

    This method indicates whether cached data has become stale and should be
    re-fetched from the source. It does NOT indicate whether an initial fetch
    is needed for data that has never been loaded.

    Semantic distinction:
    - Returns False when no cached data exists (no initial fetch needed via this method)
    - Returns True when cached data exists but has become stale

    Backends can implement staleness detection based on file modification times,
    database timestamps, ETags, TTL expiration, explicit invalidation, etc.

    Args:
        category: The category to check.

    Returns:
        bool: True if cached data needs refresh due to staleness. False if no
              cached data exists or if cached data is still fresh.

    Implementation Requirements:
        - Return `False` if no data has been cached yet (an initial fetch is a separate concern)
        - Return `True` if cached data exists but is stale
        - Staleness can be based on: TTL expiration, file mtime changes, ETags, explicit invalidation

    Note:
        [ReplicaBackendBase][^^^.replica_backend_base.ReplicaBackendBase]
        provides a concrete implementation that checks explicit staleness marking,
        TTL expiration, file mtime changes, and custom validation hooks.

    See Also:
        - [mark_stale()][(c).mark_stale]: Explicitly mark a category as stale
        - [ReplicaBackendBase.should_fetch_data()][^^^.replica_backend_base.ReplicaBackendBase.should_fetch_data]:
          Combined initial fetch + refresh check

    """

register_invalidation_callback

register_invalidation_callback(
    callback: Callable[[MODEL_REFERENCE_CATEGORY], None],
) -> None

Register a callback to be called when a category is invalidated.

This allows external components (like ModelReferenceManager) to be notified when cached data becomes stale and needs to be refreshed.

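Parameters:

  • callback (Callable[[MODEL_REFERENCE_CATEGORY], None]) –

    Function to call with the invalidated category.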

Source code in src/horde_model_reference/backends/base.py
def register_invalidation_callback(
    self,
    callback: Callable[[MODEL_REFERENCE_CATEGORY], None],
) -> None:
    """Register a callback to be called when a category is invalidated.

    This allows external components (like ModelReferenceManager) to be notified
    when cached data becomes stale and needs to be refreshed.

    Args:
        callback: Function to call with the invalidated category.

    """
    self._invalidation_callbacks.append(callback)
    logger.debug(f"Registered invalidation callback: {getattr(callback, '__name__', repr(callback))}")

_notify_invalidation

_notify_invalidation(
    category: MODEL_REFERENCE_CATEGORY,
) -> None

Notify all registered callbacks that a category has been invalidated.

Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    The category that was invalidated.

Source code in src/horde_model_reference/backends/base.py
def _notify_invalidation(self, category: MODEL_REFERENCE_CATEGORY) -> None:
    """Notify all registered callbacks that a category has been invalidated.

    Args:
        category: The category that was invalidated.

    """
    for callback in self._invalidation_callbacks:
        try:
            callback(category)
        except Exception as e:
            cb_name = getattr(callback, "__name__", repr(callback))
            logger.error(f"Invalidation callback {cb_name} failed for {category}: {e}")
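
The try/except around each callback means one failing listener does not stop the remaining listeners from being notified. The sketch below reproduces that behaviour in isolation. CallbackRegistry is a hypothetical stand-in for the relevant part of the base class, and plain strings stand in for MODEL_REFERENCE_CATEGORY values.

```python
from __future__ import annotations

import logging
from collections.abc import Callable

logger = logging.getLogger(__name__)


class CallbackRegistry:
    """Hypothetical stand-in for the callback handling shown above."""

    def __init__(self) -> None:
        self._callbacks: list[Callable[[str], None]] = []

    def register(self, callback: Callable[[str], None]) -> None:
        self._callbacks.append(callback)

    def notify(self, category: str) -> None:
        for callback in self._callbacks:
            try:
                callback(category)
            except Exception as exc:
                # Log and continue so later callbacks still run.
                name = getattr(callback, "__name__", repr(callback))
                logger.error("Invalidation callback %s failed for %s: %s", name, category, exc)


received: list[str] = []


def broken_listener(category: str) -> None:
    raise RuntimeError("listener is misconfigured")


registry = CallbackRegistry()
registry.register(broken_listener)
registry.register(received.append)
registry.notify("text_generation")
```

After notify() returns, received contains "text_generation" even though the first listener raised.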

_mark_stale_impl abstractmethod

_mark_stale_impl(
    category: MODEL_REFERENCE_CATEGORY,
) -> None

Backend-specific implementation of marking a category as stale.

Subclasses must implement this to handle their specific staleness tracking.

Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    The category to mark as stale.

Implementation Requirements
  • Update backend-specific staleness tracking (e.g., add to _stale_categories set)
  • Called by public mark_stale() method before notifying callbacks
  • Implementations should override this method, not mark_stale()
Note

The public mark_stale() method calls this implementation and then automatically notifies all registered invalidation callbacks.
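
As a sketch of how a subclass might satisfy this hook, the reduced classes below keep only the staleness-related parts of the interface. StaleTrackingBase and SetBackedBackend are hypothetical names, categories are plain strings rather than MODEL_REFERENCE_CATEGORY values, and the _stale_categories set follows the suggestion in the requirements rather than any confirmed implementation.

```python
from __future__ import annotations

from abc import ABC, abstractmethod
from collections.abc import Callable


class StaleTrackingBase(ABC):
    """Hypothetical, reduced base class containing only the staleness hooks."""

    def __init__(self) -> None:
        self._invalidation_callbacks: list[Callable[[str], None]] = []

    def register_invalidation_callback(self, callback: Callable[[str], None]) -> None:
        self._invalidation_callbacks.append(callback)

    @abstractmethod
    def _mark_stale_impl(self, category: str) -> None:
        """Backend-specific staleness bookkeeping."""

    def mark_stale(self, category: str) -> None:
        # Bookkeeping first, then notification, mirroring the documented order.
        self._mark_stale_impl(category)
        for callback in self._invalidation_callbacks:
            callback(category)


class SetBackedBackend(StaleTrackingBase):
    """Tracks stale categories in a set; overrides only the hook."""

    def __init__(self) -> None:
        super().__init__()
        self._stale_categories: set[str] = set()

    def _mark_stale_impl(self, category: str) -> None:
        self._stale_categories.add(category)


backend = SetBackedBackend()
events: list[tuple[str, bool]] = []
backend.register_invalidation_callback(
    lambda category: events.append((category, category in backend._stale_categories))
)
backend.mark_stale("image_generation")
```

The callback records whether the category was already in the stale set when it ran, which shows that the bookkeeping happens before listeners are notified.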

Source code in src/horde_model_reference/backends/base.py
@abstractmethod
def _mark_stale_impl(self, category: MODEL_REFERENCE_CATEGORY) -> None:
    """Backend-specific implementation of marking a category as stale.

    Subclasses must implement this to handle their specific staleness tracking.

    Args:
        category: The category to mark as stale.

    Implementation Requirements:
        - Update backend-specific staleness tracking (e.g., add to `_stale_categories` set)
        - Called by public [mark_stale()][(c).mark_stale] method before notifying callbacks
        - Implementations should override this method, not `mark_stale()`

    Note:
        The public `mark_stale()` method calls this implementation and then automatically
        notifies all registered invalidation callbacks.

    """

mark_stale

mark_stale(category: MODEL_REFERENCE_CATEGORY) -> None

Mark a category's data as stale, requiring refresh on next access.

This method calls the backend-specific implementation and then notifies all registered callbacks.

Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    The category to mark as stale.

Implementation Note

The base class provides this public implementation. Subclasses should override _mark_stale_impl() instead of this method.

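See Also
  • _mark_stale_impl(): Backend-specific staleness tracking
  • register_invalidation_callback(): Register callbacks for invalidation events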
Source code in src/horde_model_reference/backends/base.py
def mark_stale(self, category: MODEL_REFERENCE_CATEGORY) -> None:
    """Mark a category's data as stale, requiring refresh on next access.

    This method calls the backend-specific implementation and then notifies
    all registered callbacks.

    Args:
        category: The category to mark as stale.

    Implementation Note:
        The base class provides this public implementation. Subclasses should override
        [_mark_stale_impl()][(c)._mark_stale_impl]
        instead of this method.

    See Also:
        - [_mark_stale_impl()][(c)._mark_stale_impl]: Backend-specific staleness tracking
        - [register_invalidation_callback()][(c).register_invalidation_callback]:
          Register callbacks for invalidation events

    """
    self._mark_stale_impl(category)
    self._notify_invalidation(category)

get_category_file_path abstractmethod

get_category_file_path(
    category: MODEL_REFERENCE_CATEGORY,
) -> Path | None

Get the file path for a category's data, if applicable.

Some backends (like file-based ones) have a physical file path associated with each category. Others (like database backends) may return None.

Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    The category to get the path for.

Returns:

  • Path | None

    Path | None: The file path, or None if not applicable for this backend.

Implementation Requirements
  • Return Path object for file-based backends
  • Return None for backends without file storage (HTTP-only, database, etc.)
Example Implementations
# File-based backend
def get_category_file_path(self, category):
    return self.base_path / f"{category.value}.json"

# HTTP-only backend
def get_category_file_path(self, category):
    return None
See Also
  • get_all_category_file_paths(): Get all file paths
Source code in src/horde_model_reference/backends/base.py
@abstractmethod
def get_category_file_path(self, category: MODEL_REFERENCE_CATEGORY) -> Path | None:
    """Get the file path for a category's data, if applicable.

    Some backends (like file-based ones) have a physical file path associated
    with each category. Others (like database backends) may return None.

    Args:
        category: The category to get the path for.

    Returns:
        Path | None: The file path, or None if not applicable for this backend.

    Implementation Requirements:
        - Return `Path` object for file-based backends
        - Return `None` for backends without file storage (HTTP-only, database, etc.)

    Example Implementations:
        ```python
        # File-based backend
        def get_category_file_path(self, category):
            return self.base_path / f"{category.value}.json"

        # HTTP-only backend
        def get_category_file_path(self, category):
            return None
        ```

    See Also:
        - [get_all_category_file_paths()][(c).get_all_category_file_paths]: Get all file paths

    """

get_all_category_file_paths abstractmethod

get_all_category_file_paths() -> dict[
    MODEL_REFERENCE_CATEGORY, Path | None
]

Get file paths for all categories, if applicable.

Returns:

  • dict[MODEL_REFERENCE_CATEGORY, Path | None]

    dict[MODEL_REFERENCE_CATEGORY, Path | None]: Mapping of categories to their file paths. Returns None values for categories without file paths.

Implementation Requirements
  • Return dictionary with all categories
  • Use None values for categories without file paths
  • Typically implemented by iterating over categories and calling get_category_file_path()
Example Implementation
def get_all_category_file_paths(self):
    return {
        category: self.get_category_file_path(category)
        for category in MODEL_REFERENCE_CATEGORY
    }
See Also
  • get_category_file_path(): Get single category file path
Source code in src/horde_model_reference/backends/base.py
@abstractmethod
def get_all_category_file_paths(self) -> dict[MODEL_REFERENCE_CATEGORY, Path | None]:
    """Get file paths for all categories, if applicable.

    Returns:
        dict[MODEL_REFERENCE_CATEGORY, Path | None]: Mapping of categories to their file paths.
            Returns None values for categories without file paths.

    Implementation Requirements:
        - Return dictionary with all categories
        - Use `None` values for categories without file paths
        - Typically implemented by iterating over categories and calling `get_category_file_path()`

    Example Implementation:
        ```python
        def get_all_category_file_paths(self):
            return {
                category: self.get_category_file_path(category)
                for category in MODEL_REFERENCE_CATEGORY
            }
        ```

    See Also:
        - [get_category_file_path()][(c).get_category_file_path]: Get single category file path

    """

get_legacy_json abstractmethod

get_legacy_json(
    category: MODEL_REFERENCE_CATEGORY,
    redownload: bool = False,
) -> dict[str, Any] | None

Get raw legacy JSON for a specific category without pydantic validation.

This method returns cached legacy format JSON data, downloading if needed. The cache is populated during initialization, downloads, and on-demand loads.

Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    Category to retrieve.

  • redownload (bool, default: False ) –

    If True, redownload before returning and refresh cache.

Returns:

  • dict[str, Any] | None

    dict[str, Any] | None: The raw legacy JSON dict, or None if not found.

Implementation Requirements
  • Return legacy format data as dictionary
  • Support caching with optional redownload
  • Return None if not available
  • The redownload parameter is analogous to force_refresh in fetch methods
See Also
  • get_legacy_json_string(): Get as string instead of dict
Source code in src/horde_model_reference/backends/base.py
@abstractmethod
def get_legacy_json(
    self,
    category: MODEL_REFERENCE_CATEGORY,
    redownload: bool = False,
) -> dict[str, Any] | None:
    """Get raw legacy JSON for a specific category without pydantic validation.

    This method returns cached legacy format JSON data, downloading if needed.
    The cache is populated during initialization, downloads, and on-demand loads.

    Args:
        category: Category to retrieve.
        redownload: If True, redownload before returning and refresh cache.

    Returns:
        dict[str, Any] | None: The raw legacy JSON dict, or None if not found.

    Implementation Requirements:
        - Return legacy format data as dictionary
        - Support caching with optional redownload
        - Return `None` if not available
        - The `redownload` parameter is analogous to `force_refresh` in fetch methods

    See Also:
        - [get_legacy_json_string()][(c).get_legacy_json_string]: Get as string instead of dict

    """
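
The caching requirements above can be sketched with a small in-memory stand-in. This is a minimal illustration, not the library's implementation: `Category`, `InMemoryLegacyBackend`, `_download`, and `download_count` are hypothetical names, and the "download" just copies from a dictionary.

```python
from enum import Enum
from typing import Any, Optional


class Category(Enum):
    """Hypothetical stand-in for MODEL_REFERENCE_CATEGORY."""

    image_generation = "image_generation"
    text_generation = "text_generation"


class InMemoryLegacyBackend:
    """Illustrative backend fragment; not part of the library."""

    def __init__(self, source: dict[Category, dict[str, Any]]) -> None:
        self._source = source  # stands in for the remote origin (GitHub, API, ...)
        self._cache: dict[Category, dict[str, Any]] = {}
        self.download_count = 0

    def _download(self, category: Category) -> Optional[dict[str, Any]]:
        """Pretend download: copy the category out of the fake origin."""
        self.download_count += 1
        data = self._source.get(category)
        return dict(data) if data is not None else None

    def get_legacy_json(
        self,
        category: Category,
        redownload: bool = False,
    ) -> Optional[dict[str, Any]]:
        # Hit the origin only on a cache miss or an explicit redownload.
        if redownload or category not in self._cache:
            data = self._download(category)
            if data is None:
                return None
            self._cache[category] = data
        return self._cache[category]
```

In this sketch, two consecutive calls for the same category trigger one download, and passing `redownload=True` triggers another.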

get_legacy_json_string abstractmethod

get_legacy_json_string(
    category: MODEL_REFERENCE_CATEGORY,
    redownload: bool = False,
) -> str | None

Get raw legacy JSON string for a specific category without pydantic validation.

This method returns cached legacy format JSON data as a string, downloading if needed. The cache is populated during initialization, downloads, and on-demand loads.

Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    Category to retrieve.

  • redownload (bool, default: False ) –

    If True, redownload before returning and refresh cache.

Returns:

  • str | None

    str | None: The raw legacy JSON string, or None if not found.

Implementation Requirements
  • Return legacy format data as JSON string
  • Same caching semantics as get_legacy_json()
  • Return None if not available
See Also
  • get_legacy_json(): Get as dict instead of string
Source code in src/horde_model_reference/backends/base.py
@abstractmethod
def get_legacy_json_string(
    self,
    category: MODEL_REFERENCE_CATEGORY,
    redownload: bool = False,
) -> str | None:
    """Get raw legacy JSON string for a specific category without pydantic validation.

    This method returns cached legacy format JSON data as a string, downloading if needed.
    The cache is populated during initialization, downloads, and on-demand loads.

    Args:
        category: Category to retrieve.
        redownload: If True, redownload before returning and refresh cache.

    Returns:
        str | None: The raw legacy JSON string, or None if not found.

    Implementation Requirements:
        - Return legacy format data as JSON string
        - Same caching semantics as [get_legacy_json()][(c).get_legacy_json]
        - Return `None` if not available

    See Also:
        - [get_legacy_json()][(c).get_legacy_json]: Get as dict instead of string

    """
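
One way a backend might satisfy the "same caching semantics" requirement is to delegate to its own `get_legacy_json()` and serialize the result. The sketch below assumes that approach; `LegacyJsonStringSketch` is a hypothetical class and categories are plain strings for brevity. A backend that caches the raw string separately would look different.

```python
import json
from typing import Any, Optional


class LegacyJsonStringSketch:
    """Hypothetical backend fragment; category is a plain string here."""

    def __init__(self, data: dict[str, dict[str, Any]]) -> None:
        self._data = data

    def get_legacy_json(
        self,
        category: str,
        redownload: bool = False,
    ) -> Optional[dict[str, Any]]:
        # A real backend would consult its cache and honor `redownload` here.
        return self._data.get(category)

    def get_legacy_json_string(
        self,
        category: str,
        redownload: bool = False,
    ) -> Optional[str]:
        # Delegate so both methods share one cache and one redownload path.
        data = self.get_legacy_json(category, redownload=redownload)
        return None if data is None else json.dumps(data)
```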

support_any_writes

support_any_writes() -> bool

Check if this backend supports any write operations (v2 or legacy).

Returns:

  • bool ( bool ) –

    True if any write operations are supported, False otherwise.

Source code in src/horde_model_reference/backends/base.py
def support_any_writes(self) -> bool:
    """Check if this backend supports any write operations (v2 or legacy).

    Returns:
        bool: True if any write operations are supported, False otherwise.

    """
    return self.supports_writes() or self.supports_legacy_writes()

supports_writes

supports_writes() -> bool

Check if this backend supports write operations (v2 format).

Write operations include update_model() and delete_model(). Typically only PRIMARY mode backends support writes.

Returns:

  • bool ( bool ) –

    True if write operations are supported, False otherwise.

Source code in src/horde_model_reference/backends/base.py
def supports_writes(self) -> bool:
    """Check if this backend supports write operations (v2 format).

    Write operations include update_model() and delete_model().
    Typically only PRIMARY mode backends support writes.

    Returns:
        bool: True if write operations are supported, False otherwise.

    """
    return False

supports_legacy_writes

supports_legacy_writes() -> bool

Check if this backend supports write operations in legacy format.

Legacy write operations include update_model_legacy() and delete_model_legacy(). Only available when canonical_format='LEGACY' in PRIMARY mode.

Returns:

  • bool ( bool ) –

    True if legacy write operations are supported, False otherwise.

Source code in src/horde_model_reference/backends/base.py
def supports_legacy_writes(self) -> bool:
    """Check if this backend supports write operations in legacy format.

    Legacy write operations include update_model_legacy() and delete_model_legacy().
    Only available when canonical_format='LEGACY' in PRIMARY mode.

    Returns:
        bool: True if legacy write operations are supported, False otherwise.

    """
    return False
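
The three write-capability flags can be combined by a caller to decide which write path, if any, is available. The sketch below mirrors the defaults shown above using hypothetical stand-in classes (`ReadOnlyBackend`, `LegacyWritableBackend`) rather than the real base class; `describe_write_support` is an illustrative helper, not a library function.

```python
class ReadOnlyBackend:
    """Hypothetical stand-in mirroring the base-class defaults."""

    def supports_writes(self) -> bool:
        return False

    def supports_legacy_writes(self) -> bool:
        return False

    def support_any_writes(self) -> bool:
        return self.supports_writes() or self.supports_legacy_writes()


class LegacyWritableBackend(ReadOnlyBackend):
    """Hypothetical backend that only enables legacy-format writes."""

    def supports_legacy_writes(self) -> bool:
        return True


def describe_write_support(backend: ReadOnlyBackend) -> str:
    """Classify a backend by the write operations it advertises."""
    if not backend.support_any_writes():
        return "read-only"
    if backend.supports_writes():
        return "v2 writes"
    return "legacy writes only"
```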

supports_cache_warming

supports_cache_warming() -> bool

Check if this backend supports cache warming operations.

Cache warming pre-populates the cache with data to improve initial request performance. Typically only backends with distributed caching (like Redis) support this.

Returns:

  • bool ( bool ) –

    True if cache warming is supported, False otherwise.

Source code in src/horde_model_reference/backends/base.py
def supports_cache_warming(self) -> bool:
    """Check if this backend supports cache warming operations.

    Cache warming pre-populates the cache with data to improve initial request performance.
    Typically only backends with distributed caching (like Redis) support this.

    Returns:
        bool: True if cache warming is supported, False otherwise.

    """
    return False

supports_health_checks

supports_health_checks() -> bool

Check if this backend supports health check operations.

Health checks verify that the backend's external dependencies (Redis, databases, etc.) are accessible and functioning correctly.

Returns:

  • bool ( bool ) –

    True if health checks are supported, False otherwise.

Source code in src/horde_model_reference/backends/base.py
def supports_health_checks(self) -> bool:
    """Check if this backend supports health check operations.

    Health checks verify that the backend's external dependencies (Redis, databases, etc.)
    are accessible and functioning correctly.

    Returns:
        bool: True if health checks are supported, False otherwise.

    """
    return False

supports_statistics

supports_statistics() -> bool

Check if this backend supports statistics retrieval.

Statistics provide insights into backend performance, cache hits/misses, etc.

Returns:

  • bool ( bool ) –

    True if statistics are supported, False otherwise.

Source code in src/horde_model_reference/backends/base.py
def supports_statistics(self) -> bool:
    """Check if this backend supports statistics retrieval.

    Statistics provide insights into backend performance, cache hits/misses, etc.

    Returns:
        bool: True if statistics are supported, False otherwise.

    """
    return False

update_model

update_model(
    category: MODEL_REFERENCE_CATEGORY,
    model_name: str,
    record_dict: dict[str, Any],
    *,
    logical_user_id: str | None = None,
    request_id: str | None = None,
) -> None

Update or create a model reference.

This is an optional method that write-capable backends can implement. Read-only backends should leave the default implementation which raises NotImplementedError.

Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    The category to update.

  • model_name (str) –

    The name of the model to update or create.

  • record_dict (dict[str, Any]) –

    The model record data as a dictionary.

  • logical_user_id (str | None, default: None ) –

    Immutable Horde user id for auditing contexts (optional).

  • request_id (str | None, default: None ) –

    Optional tracing/idempotency identifier for audit correlation.

Raises:

  • NotImplementedError –

    If the backend does not support write operations.

Implementation Requirements
  • Create model if it doesn't exist
  • Update model if it exists
  • Ensure atomic writes (use temp files with rename for file-based backends)
  • Call mark_stale() after successful write to invalidate cache
  • Override supports_writes() to return True
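See Also
  • update_model_from_base_model(): Update from pydantic model (automatically provided)
  • delete_model(): Delete a model
  • supports_writes(): Feature detection method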
Source code in src/horde_model_reference/backends/base.py
def update_model(
    self,
    category: MODEL_REFERENCE_CATEGORY,
    model_name: str,
    record_dict: dict[str, Any],
    *,
    logical_user_id: str | None = None,
    request_id: str | None = None,
) -> None:
    """Update or create a model reference.

    This is an optional method that write-capable backends can implement.
    Read-only backends should leave the default implementation which raises NotImplementedError.

    Args:
        category: The category to update.
        model_name: The name of the model to update or create.
        record_dict: The model record data as a dictionary.
        logical_user_id: Immutable Horde user id for auditing contexts (optional).
        request_id: Optional tracing/idempotency identifier for audit correlation.

    Raises:
        NotImplementedError: If the backend does not support write operations.

    Implementation Requirements:
        - Create model if it doesn't exist
        - Update model if it exists
        - Ensure atomic writes (use temp files with rename for file-based backends)
        - Call [mark_stale()][(c).mark_stale] after successful write to invalidate cache
        - Override [supports_writes()][(c).supports_writes] to return `True`

    See Also:
        - [update_model_from_base_model()][(c).update_model_from_base_model]:
          Update from pydantic model (automatically provided)
        - [delete_model()][(c).delete_model]: Delete a model
        - [supports_writes()][(c).supports_writes]: Feature detection method

    """
    raise NotImplementedError(f"{self.__class__.__name__} does not support write operations")
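
The "temp files with rename" requirement for file-based backends can be sketched as follows. This is a minimal illustration under stated assumptions: `atomic_update_model` is a hypothetical helper, the file is assumed to hold one JSON object keyed by model name, and locking against concurrent writers is out of scope.

```python
import json
import os
import tempfile
from pathlib import Path
from typing import Any


def atomic_update_model(path: Path, model_name: str, record_dict: dict[str, Any]) -> None:
    """Create or update one record without exposing a partially written file."""
    existing: dict[str, Any] = {}
    if path.exists():
        existing = json.loads(path.read_text(encoding="utf-8"))
    existing[model_name] = record_dict  # creates the entry or overwrites it

    # Write next to the target so the final rename stays on one filesystem.
    fd, tmp_name = tempfile.mkstemp(dir=path.parent, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as tmp_file:
            json.dump(existing, tmp_file, indent=2)
        # os.replace overwrites the destination; the rename is atomic on POSIX.
        os.replace(tmp_name, path)
    except BaseException:
        # Clean up the temp file if anything failed before the rename.
        if os.path.exists(tmp_name):
            os.unlink(tmp_name)
        raise
```

A backend's `update_model()` would call something like this and then invoke `mark_stale()` so cached data is refreshed on the next read.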

update_model_from_base_model

update_model_from_base_model(
    category: MODEL_REFERENCE_CATEGORY,
    model_name: str,
    record_model: BaseModel,
    *,
    logical_user_id: str | None = None,
    request_id: str | None = None,
) -> None

Update or create a model reference from a pydantic BaseModel.

The base class implements this method by delegating to update_model(), so write-capable backends usually only need to implement update_model(). If supports_writes() returns False, it raises NotImplementedError.

Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    The category to update.

  • model_name (str) –

    The name of the model to update or create.

  • record_model (BaseModel) –

    The model record data as a pydantic BaseModel.

  • logical_user_id (str | None, default: None ) –

    Immutable Horde user id for auditing contexts (optional).

  • request_id (str | None, default: None ) –

    Optional tracing/idempotency identifier for audit correlation.

Raises:

  • NotImplementedError –

    If the backend does not support write operations.

Implementation Note

The base class provides this implementation automatically. It:

  1. Checks supports_writes() returns True
  2. Converts the pydantic model to dict using model_dump(exclude_unset=True)
  3. Calls update_model() with the dictionary

Backends that support writes typically don't need to override this method.

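See Also
  • update_model(): Update from dictionary (implement this)
  • supports_writes(): Feature detection method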
Source code in src/horde_model_reference/backends/base.py
def update_model_from_base_model(
    self,
    category: MODEL_REFERENCE_CATEGORY,
    model_name: str,
    record_model: BaseModel,
    *,
    logical_user_id: str | None = None,
    request_id: str | None = None,
) -> None:
    """Update or create a model reference from a pydantic BaseModel.

    The base class implements this method by delegating to update_model(), so write-capable
    backends usually only need to implement update_model(). If supports_writes() returns False,
    it raises NotImplementedError.

    Args:
        category: The category to update.
        model_name: The name of the model to update or create.
        record_model: The model record data as a pydantic BaseModel.
        logical_user_id: Immutable Horde user id for auditing contexts (optional).
        request_id: Optional tracing/idempotency identifier for audit correlation.

    Raises:
        NotImplementedError: If the backend does not support write operations.

    Implementation Note:
        The base class provides this implementation automatically. It:
        1. Checks [supports_writes()][(c).supports_writes] returns `True`
        2. Converts the pydantic model to dict using `model_dump(exclude_unset=True)`
        3. Calls [update_model()][(c).update_model] with the dictionary

        Backends that support writes typically don't need to override this method.

    See Also:
        - [update_model()][(c).update_model]: Update from dictionary (implement this)
        - [supports_writes()][(c).supports_writes]: Feature detection method

    """
    if not self.supports_writes():
        raise NotImplementedError(f"{self.__class__.__name__} does not support write operations")

    record_dict = record_model.model_dump(exclude_unset=True)
    self.update_model(
        category,
        model_name,
        record_dict,
        logical_user_id=logical_user_id,
        request_id=request_id,
    )

delete_model

delete_model(
    category: MODEL_REFERENCE_CATEGORY,
    model_name: str,
    *,
    logical_user_id: str | None = None,
    request_id: str | None = None,
) -> None

Delete a model reference.

This is an optional method that write-capable backends can implement. Read-only backends should leave the default implementation which raises NotImplementedError.

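Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    The category containing the model.

  • model_name (str) –

    The name of the model to delete.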

  • logical_user_id (str | None, default: None ) –

    Immutable Horde user id for auditing contexts (optional).

  • request_id (str | None, default: None ) –

    Optional tracing/idempotency identifier for audit correlation.

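Raises:

  • NotImplementedError –

    If the backend does not support write operations.

  • KeyError –

    If the model doesn't exist.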

Implementation Requirements
  • Raise KeyError if model doesn't exist
  • Ensure atomic writes
  • Call mark_stale() after successful delete to invalidate cache
  • Override supports_writes() to return True
See Also
  • update_model(): Update or create a model
  • supports_writes(): Feature detection method
Source code in src/horde_model_reference/backends/base.py
def delete_model(
    self,
    category: MODEL_REFERENCE_CATEGORY,
    model_name: str,
    *,
    logical_user_id: str | None = None,
    request_id: str | None = None,
) -> None:
    """Delete a model reference.

    This is an optional method that write-capable backends can implement.
    Read-only backends should leave the default implementation which raises NotImplementedError.

    Args:
        category: The category containing the model.
        model_name: The name of the model to delete.
        logical_user_id: Immutable Horde user id for auditing contexts (optional).
        request_id: Optional tracing/idempotency identifier for audit correlation.

    Raises:
        NotImplementedError: If the backend does not support write operations.
        KeyError: If the model doesn't exist.

    Implementation Requirements:
        - Raise `KeyError` if model doesn't exist
        - Ensure atomic writes
        - Call [mark_stale()][(c).mark_stale] after successful delete to invalidate cache
        - Override [supports_writes()][(c).supports_writes] to return `True`

    See Also:
        - [update_model()][(c).update_model]: Update or create a model
        - [supports_writes()][(c).supports_writes]: Feature detection method

    """
    raise NotImplementedError(f"{self.__class__.__name__} does not support write operations")
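
The delete requirements (raise `KeyError` for a missing model, then mark the category stale) can be illustrated with an in-memory stand-in. Everything here is hypothetical: `InMemoryWritableBackend` is not a library class, categories are plain strings, and `mark_stale()` is assumed to take the category as its argument.

```python
from typing import Any, Optional


class InMemoryWritableBackend:
    """Hypothetical write-capable backend kept entirely in memory."""

    def __init__(self) -> None:
        self._records: dict[str, dict[str, dict[str, Any]]] = {}
        self.stale_categories: set[str] = set()

    def supports_writes(self) -> bool:
        return True

    def mark_stale(self, category: str) -> None:
        # Assumed signature: record that cached data for this category is outdated.
        self.stale_categories.add(category)

    def update_model(
        self,
        category: str,
        model_name: str,
        record_dict: dict[str, Any],
        *,
        logical_user_id: Optional[str] = None,
        request_id: Optional[str] = None,
    ) -> None:
        self._records.setdefault(category, {})[model_name] = record_dict
        self.mark_stale(category)

    def delete_model(
        self,
        category: str,
        model_name: str,
        *,
        logical_user_id: Optional[str] = None,
        request_id: Optional[str] = None,
    ) -> None:
        models = self._records.get(category, {})
        if model_name not in models:
            raise KeyError(f"Model {model_name!r} not found in category {category!r}")
        del models[model_name]
        self.mark_stale(category)  # only reached after a successful delete
```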

update_model_legacy

update_model_legacy(
    category: MODEL_REFERENCE_CATEGORY,
    model_name: str,
    record_dict: dict[str, Any],
    *,
    logical_user_id: str | None = None,
    request_id: str | None = None,
) -> None

Update or create a model reference in legacy format.

This is an optional method that legacy-write-capable backends can implement. Only available when canonical_format='LEGACY' in PRIMARY mode.

Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    The category to update.

  • model_name (str) –

    The name of the model to update or create.

  • record_dict (dict[str, Any]) –

    The model record data in legacy format as a dictionary.

  • logical_user_id (str | None, default: None ) –

    Immutable Horde user id for auditing contexts (optional).

  • request_id (str | None, default: None ) –

    Optional tracing/idempotency identifier for audit correlation.

Raises:

  • NotImplementedError –

    If the backend does not support legacy write operations.

Source code in src/horde_model_reference/backends/base.py
def update_model_legacy(
    self,
    category: MODEL_REFERENCE_CATEGORY,
    model_name: str,
    record_dict: dict[str, Any],
    *,
    logical_user_id: str | None = None,
    request_id: str | None = None,
) -> None:
    """Update or create a model reference in legacy format.

    This is an optional method that legacy-write-capable backends can implement.
    Only available when canonical_format='LEGACY' in PRIMARY mode.

    Args:
        category: The category to update.
        model_name: The name of the model to update or create.
        record_dict: The model record data in legacy format as a dictionary.
        logical_user_id: Immutable Horde user id for auditing contexts (optional).
        request_id: Optional tracing/idempotency identifier for audit correlation.

    Raises:
        NotImplementedError: If the backend does not support legacy write operations.

    """
    raise NotImplementedError(f"{self.__class__.__name__} does not support legacy write operations")

update_model_legacy_from_base_model

update_model_legacy_from_base_model(
    category: MODEL_REFERENCE_CATEGORY,
    model_name: str,
    record_model: BaseModel,
    *,
    logical_user_id: str | None = None,
    request_id: str | None = None,
) -> None

Update or create a model reference in legacy format from a pydantic BaseModel.

This is an optional method that legacy-write-capable backends can implement. Only available when canonical_format='LEGACY' in PRIMARY mode.

Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    The category to update.

  • model_name (str) –

    The name of the model to update or create.

  • record_model (BaseModel) –

    The model record data as a pydantic BaseModel.

  • logical_user_id (str | None, default: None ) –

    Immutable Horde user id for auditing contexts (optional).

  • request_id (str | None, default: None ) –

    Optional tracing/idempotency identifier for audit correlation.

Raises:

  • NotImplementedError –

    If the backend does not support legacy write operations.

Source code in src/horde_model_reference/backends/base.py
def update_model_legacy_from_base_model(
    self,
    category: MODEL_REFERENCE_CATEGORY,
    model_name: str,
    record_model: BaseModel,
    *,
    logical_user_id: str | None = None,
    request_id: str | None = None,
) -> None:
    """Update or create a model reference in legacy format from a pydantic BaseModel.

    This is an optional method that legacy-write-capable backends can implement.
    Only available when canonical_format='LEGACY' in PRIMARY mode.

    Args:
        category: The category to update.
        model_name: The name of the model to update or create.
        record_model: The model record data as a pydantic BaseModel.
        logical_user_id: Immutable Horde user id for auditing contexts (optional).
        request_id: Optional tracing/idempotency identifier for audit correlation.

    Raises:
        NotImplementedError: If the backend does not support legacy write operations.

    """
    if not self.supports_legacy_writes():
        raise NotImplementedError(f"{self.__class__.__name__} does not support legacy write operations")

    record_dict = record_model.model_dump(exclude_unset=True)
    self.update_model_legacy(
        category,
        model_name,
        record_dict,
        logical_user_id=logical_user_id,
        request_id=request_id,
    )

delete_model_legacy

delete_model_legacy(
    category: MODEL_REFERENCE_CATEGORY,
    model_name: str,
    *,
    logical_user_id: str | None = None,
    request_id: str | None = None,
) -> None

Delete a model reference from legacy format files.

This is an optional method that legacy-write-capable backends can implement. Only available when canonical_format='LEGACY' in PRIMARY mode.

Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    The category containing the model.

  • model_name (str) –

    The name of the model to delete.

  • logical_user_id (str | None, default: None ) –

    Immutable Horde user id for auditing contexts (optional).

  • request_id (str | None, default: None ) –

    Optional tracing/idempotency identifier for audit correlation.

Raises:

  • NotImplementedError –

    If the backend does not support legacy write operations.

Source code in src/horde_model_reference/backends/base.py
def delete_model_legacy(
    self,
    category: MODEL_REFERENCE_CATEGORY,
    model_name: str,
    *,
    logical_user_id: str | None = None,
    request_id: str | None = None,
) -> None:
    """Delete a model reference from legacy format files.

    This is an optional method that legacy-write-capable backends can implement.
    Only available when canonical_format='LEGACY' in PRIMARY mode.

    Args:
        category: The category containing the model.
        model_name: The name of the model to delete.
        logical_user_id: Immutable Horde user id for auditing contexts (optional).
        request_id: Optional tracing/idempotency identifier for audit correlation.

    Raises:
        NotImplementedError: If the backend does not support legacy write operations.

    """
    raise NotImplementedError(f"{self.__class__.__name__} does not support legacy write operations")

warm_cache

warm_cache() -> None

Pre-populate cache with all categories for faster initial requests.

This is an optional method that backends with cache warming support can implement. Backends without cache warming should leave the default implementation.

Raises:

  • NotImplementedError –

    If the backend does not support cache warming.

Source code in src/horde_model_reference/backends/base.py
def warm_cache(self) -> None:
    """Pre-populate cache with all categories for faster initial requests.

    This is an optional method that backends with cache warming support can implement.
    Backends without cache warming should leave the default implementation.

    Raises:
        NotImplementedError: If the backend does not support cache warming.

    """
    raise NotImplementedError(f"{self.__class__.__name__} does not support cache warming")

warm_cache_async async

warm_cache_async() -> None

Asynchronously pre-populate cache with all categories for faster initial requests.

This is an optional method that backends with cache warming support can implement. Backends without cache warming should leave the default implementation.

Raises:

  • NotImplementedError –

    If the backend does not support async cache warming.

Source code in src/horde_model_reference/backends/base.py
async def warm_cache_async(self) -> None:
    """Asynchronously pre-populate cache with all categories for faster initial requests.

    This is an optional method that backends with cache warming support can implement.
    Backends without cache warming should leave the default implementation.

    Raises:
        NotImplementedError: If the backend does not support async cache warming.

    """
    raise NotImplementedError(f"{self.__class__.__name__} does not support async cache warming")
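
A backend that opts into cache warming might fetch every category concurrently and store the results. The sketch below assumes that design; `WarmableBackend`, `_fetch_category_async`, `cached_categories`, and the category names in `CATEGORIES` are all hypothetical, and `asyncio.sleep(0)` stands in for real network or Redis I/O.

```python
import asyncio
from typing import Any

# Hypothetical category names used only for this illustration.
CATEGORIES = ("image_generation", "text_generation", "clip")


class WarmableBackend:
    """Hypothetical backend that supports async cache warming."""

    def __init__(self) -> None:
        self._cache: dict[str, dict[str, Any]] = {}

    def supports_cache_warming(self) -> bool:
        return True

    async def _fetch_category_async(self, category: str) -> dict[str, Any]:
        await asyncio.sleep(0)  # placeholder for an actual remote fetch
        return {"category": category}

    async def warm_cache_async(self) -> None:
        # Fetch all categories concurrently; gather preserves input order.
        results = await asyncio.gather(
            *(self._fetch_category_async(category) for category in CATEGORIES)
        )
        self._cache.update(zip(CATEGORIES, results))

    def cached_categories(self) -> set[str]:
        return set(self._cache)
```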

health_check

health_check() -> bool

Check the health of the backend's external dependencies.

This is an optional method that backends with health check support can implement. Backends without external dependencies should leave the default implementation.

Returns:

  • bool ( bool ) –

    True if healthy, False otherwise.

Raises:

  • NotImplementedError –

    If the backend does not support health checks.

Source code in src/horde_model_reference/backends/base.py
def health_check(self) -> bool:
    """Check the health of the backend's external dependencies.

    This is an optional method that backends with health check support can implement.
    Backends without external dependencies should leave the default implementation.

    Returns:
        bool: True if healthy, False otherwise.

    Raises:
        NotImplementedError: If the backend does not support health checks.

    """
    raise NotImplementedError(f"{self.__class__.__name__} does not support health checks")
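
As a rough sketch of an overriding implementation, a backend could wrap a dependency probe and report failure as `False`. `PingBackedBackend` and its `ping` callable are hypothetical, and treating probe exceptions as "unhealthy" is a design choice of this example, not something the interface mandates.

```python
from typing import Callable


class PingBackedBackend:
    """Hypothetical backend whose health depends on one external probe."""

    def __init__(self, ping: Callable[[], bool]) -> None:
        self._ping = ping  # e.g. a function that pings Redis or a database

    def supports_health_checks(self) -> bool:
        return True

    def health_check(self) -> bool:
        try:
            return bool(self._ping())
        except Exception:
            # An unreachable dependency is reported as unhealthy, not raised.
            return False


def failing_ping() -> bool:
    raise ConnectionError("dependency unreachable")
```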

get_statistics

get_statistics() -> dict[str, Any]

Get backend performance and usage statistics.

This is an optional method that backends with statistics support can implement. The structure of returned statistics is backend-specific.

Returns:

  • dict[str, Any]

    dict[str, Any]: Backend-specific statistics.

Raises:

  • NotImplementedError –

    If the backend does not support statistics.

Source code in src/horde_model_reference/backends/base.py
def get_statistics(self) -> dict[str, Any]:
    """Get backend performance and usage statistics.

    This is an optional method that backends with statistics support can implement.
    The structure of returned statistics is backend-specific.

    Returns:
        dict[str, Any]: Backend-specific statistics.

    Raises:
        NotImplementedError: If the backend does not support statistics.

    """
    raise NotImplementedError(f"{self.__class__.__name__} does not support statistics")
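
Because the statistics structure is backend-specific, the following is only one possible shape. `CountingBackend`, its `fetch` method, and the keys `cache_hits`, `cache_misses`, and `hit_rate` are assumptions made for illustration.

```python
from typing import Any


class CountingBackend:
    """Hypothetical backend that counts cache hits and misses."""

    def __init__(self) -> None:
        self._cache: dict[str, dict[str, Any]] = {}
        self._hits = 0
        self._misses = 0

    def supports_statistics(self) -> bool:
        return True

    def fetch(self, category: str) -> dict[str, Any]:
        if category in self._cache:
            self._hits += 1
        else:
            self._misses += 1
            self._cache[category] = {"category": category}
        return self._cache[category]

    def get_statistics(self) -> dict[str, Any]:
        total = self._hits + self._misses
        return {
            "cache_hits": self._hits,
            "cache_misses": self._misses,
            # Avoid dividing by zero before any request has been served.
            "hit_rate": self._hits / total if total else 0.0,
        }
```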

get_replicate_mode

get_replicate_mode() -> ReplicateMode

Get the replication mode of this backend.

Returns:

  • ReplicateMode ( ReplicateMode ) –

    The replicate mode (PRIMARY or REPLICA).

Source code in src/horde_model_reference/backends/base.py
def get_replicate_mode(self) -> ReplicateMode:
    """Get the replication mode of this backend.

    Returns:
        ReplicateMode: The replicate mode (PRIMARY or REPLICA).

    """
    return self._replicate_mode

supports_metadata

supports_metadata() -> bool

Check if this backend supports metadata tracking.

Metadata tracking records operation counts, timestamps, and health metrics for both legacy (v1) and v2 format operations. Typically only PRIMARY mode backends support metadata tracking.

Returns:

  • bool ( bool ) –

    True if metadata tracking is supported, False otherwise.

Source code in src/horde_model_reference/backends/base.py
def supports_metadata(self) -> bool:
    """Check if this backend supports metadata tracking.

    Metadata tracking records operation counts, timestamps, and health metrics
    for both legacy (v1) and v2 format operations. Typically only PRIMARY mode
    backends support metadata tracking.

    Returns:
        bool: True if metadata tracking is supported, False otherwise.

    """
    return False
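
Callers can consult `supports_metadata()` before requesting metadata instead of catching `NotImplementedError`. The sketch below shows that guard with hypothetical stand-ins: `FakeCategoryMetadata` replaces the real `CategoryMetadata` (whose fields are not shown here), and `metadata_or_none` is an illustrative helper.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class FakeCategoryMetadata:
    """Hypothetical stand-in; field names are invented for this example."""

    category: str
    total_updates: int = 0


class NoMetadataBackend:
    """Mirrors the base-class defaults for metadata."""

    def supports_metadata(self) -> bool:
        return False

    def get_metadata(self, category: str) -> FakeCategoryMetadata:
        raise NotImplementedError(f"{self.__class__.__name__} does not support metadata tracking")


class TrackingBackend(NoMetadataBackend):
    """Hypothetical backend that opts into metadata tracking."""

    def supports_metadata(self) -> bool:
        return True

    def get_metadata(self, category: str) -> FakeCategoryMetadata:
        return FakeCategoryMetadata(category=category, total_updates=3)


def metadata_or_none(backend: NoMetadataBackend, category: str) -> Optional[FakeCategoryMetadata]:
    """Check the capability flag first, then fetch."""
    if not backend.supports_metadata():
        return None
    return backend.get_metadata(category)
```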

get_legacy_metadata

get_legacy_metadata(
    category: MODEL_REFERENCE_CATEGORY,
) -> CategoryMetadata

Get legacy format metadata for a specific category.

This is an optional method that metadata-capable backends can implement. Backends without metadata support should leave the default implementation.

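Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    The category to get metadata for.

Returns:

  • CategoryMetadata –

    The legacy metadata.

Raises:

  • NotImplementedError –

    If the backend does not support metadata.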

Source code in src/horde_model_reference/backends/base.py
def get_legacy_metadata(self, category: MODEL_REFERENCE_CATEGORY) -> CategoryMetadata:
    """Get legacy format metadata for a specific category.

    This is an optional method that metadata-capable backends can implement.
    Backends without metadata support should leave the default implementation.

    Args:
        category: The category to get metadata for.

    Returns:
        CategoryMetadata: The legacy metadata.

    Raises:
        NotImplementedError: If the backend does not support metadata.

    """
    raise NotImplementedError(f"{self.__class__.__name__} does not support metadata tracking")

get_legacy_metadata_async async

get_legacy_metadata_async(
    category: MODEL_REFERENCE_CATEGORY,
) -> CategoryMetadata

Asynchronously get legacy format metadata for a specific category.

This is an optional method that metadata-capable backends can implement. Backends without metadata support should leave the default implementation.

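Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    The category to get metadata for.

Returns:

  • CategoryMetadata –

    The legacy metadata.

Raises:

  • NotImplementedError –

    If the backend does not support metadata.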

Source code in src/horde_model_reference/backends/base.py
async def get_legacy_metadata_async(self, category: MODEL_REFERENCE_CATEGORY) -> CategoryMetadata:
    """Asynchronously get legacy format metadata for a specific category.

    This is an optional method that metadata-capable backends can implement.
    Backends without metadata support should leave the default implementation.

    Args:
        category: The category to get metadata for.

    Returns:
        CategoryMetadata: The legacy metadata.

    Raises:
        NotImplementedError: If the backend does not support metadata.

    """
    raise NotImplementedError(f"{self.__class__.__name__} does not support async metadata tracking")

get_metadata

get_metadata(
    category: MODEL_REFERENCE_CATEGORY,
) -> CategoryMetadata

Get v2 format metadata for a specific category.

This is an optional method that metadata-capable backends can implement. Backends without metadata support should leave the default implementation.

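Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    The category to get metadata for.

Returns:

  • CategoryMetadata –

    The v2 metadata.

Raises:

  • NotImplementedError –

    If the backend does not support metadata.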

Source code in src/horde_model_reference/backends/base.py
def get_metadata(self, category: MODEL_REFERENCE_CATEGORY) -> CategoryMetadata:
    """Get v2 format metadata for a specific category.

    This is an optional method that metadata-capable backends can implement.
    Backends without metadata support should leave the default implementation.

    Args:
        category: The category to get metadata for.

    Returns:
        CategoryMetadata: The v2 metadata.

    Raises:
        NotImplementedError: If the backend does not support metadata.

    """
    raise NotImplementedError(f"{self.__class__.__name__} does not support metadata tracking")

get_metadata_async async

get_metadata_async(
    category: MODEL_REFERENCE_CATEGORY,
) -> CategoryMetadata

Asynchronously get v2 format metadata for a specific category.

This is an optional method that metadata-capable backends can implement. Backends without metadata support should leave the default implementation.

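Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    The category to get metadata for.

Returns:

  • CategoryMetadata –

    The v2 metadata.

Raises:

  • NotImplementedError –

    If the backend does not support metadata.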

Source code in src/horde_model_reference/backends/base.py
async def get_metadata_async(self, category: MODEL_REFERENCE_CATEGORY) -> CategoryMetadata:
    """Asynchronously get v2 format metadata for a specific category.

    This is an optional method that metadata-capable backends can implement.
    Backends without metadata support should leave the default implementation.

    Args:
        category: The category to get metadata for.

    Returns:
        CategoryMetadata: The v2 metadata.

    Raises:
        NotImplementedError: If the backend does not support metadata.

    """
    raise NotImplementedError(f"{self.__class__.__name__} does not support async metadata tracking")

get_all_legacy_metadata

get_all_legacy_metadata() -> dict[
    MODEL_REFERENCE_CATEGORY, CategoryMetadata
]

Get legacy format metadata for all categories.

This is an optional method that metadata-capable backends can implement. Backends without metadata support should leave the default implementation.

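Returns:

  • dict[MODEL_REFERENCE_CATEGORY, CategoryMetadata]

    Mapping of categories to their legacy metadata.

Raises:

  • NotImplementedError –

    If the backend does not support metadata.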

Source code in src/horde_model_reference/backends/base.py
def get_all_legacy_metadata(self) -> dict[MODEL_REFERENCE_CATEGORY, CategoryMetadata]:
    """Get legacy format metadata for all categories.

    This is an optional method that metadata-capable backends can implement.
    Backends without metadata support should leave the default implementation.

    Returns:
        dict[MODEL_REFERENCE_CATEGORY, CategoryMetadata]: Mapping of categories to their legacy metadata.

    Raises:
        NotImplementedError: If the backend does not support metadata.

    """
    raise NotImplementedError(f"{self.__class__.__name__} does not support metadata tracking")
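Since support is signalled purely by overriding the method, a caller could also probe for it without invoking anything, by checking whether the backend's class still carries the base implementation. The sketch below assumes hypothetical names (`BaseBackend`, `LegacyAwareBackend`, `supports_legacy_metadata`); the library itself may expose a different mechanism or none at all.

```python
class BaseBackend:
    """Stand-in mirroring the default get_all_legacy_metadata shown above."""

    def get_all_legacy_metadata(self) -> dict:
        raise NotImplementedError(f"{self.__class__.__name__} does not support metadata tracking")


class LegacyAwareBackend(BaseBackend):
    """Hypothetical backend that overrides the optional method."""

    def get_all_legacy_metadata(self) -> dict:
        return {}


def supports_legacy_metadata(backend: BaseBackend) -> bool:
    """True when the backend's class overrides the base method."""
    # Looking the attribute up on the class returns the plain function,
    # so an identity comparison detects an override.
    return type(backend).get_all_legacy_metadata is not BaseBackend.get_all_legacy_metadata
```

This check only detects an override; a subclass that overrides the method and still raises would be reported as supported, so catching `NotImplementedError` remains the safer contract.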

get_all_legacy_metadata_async async

get_all_legacy_metadata_async() -> dict[
    MODEL_REFERENCE_CATEGORY, CategoryMetadata
]

Asynchronously get legacy format metadata for all categories.

This is an optional method that metadata-capable backends can implement. Backends without metadata support should leave the default implementation in place, which raises NotImplementedError.

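Returns:

dict[MODEL_REFERENCE_CATEGORY, CategoryMetadata]: Mapping of categories to their legacy metadata.

Raises:

NotImplementedError: If the backend does not support metadata.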

Source code in src/horde_model_reference/backends/base.py
async def get_all_legacy_metadata_async(self) -> dict[MODEL_REFERENCE_CATEGORY, CategoryMetadata]:
    """Asynchronously get legacy format metadata for all categories.

    This is an optional method that metadata-capable backends can implement.
    Backends without metadata support should leave the default implementation.

    Returns:
        dict[MODEL_REFERENCE_CATEGORY, CategoryMetadata]: Mapping of categories to their legacy metadata.

    Raises:
        NotImplementedError: If the backend does not support metadata.

    """
    raise NotImplementedError(f"{self.__class__.__name__} does not support async metadata tracking")

get_all_metadata

get_all_metadata() -> dict[
    MODEL_REFERENCE_CATEGORY, CategoryMetadata
]

Get v2 format metadata for all categories.

This is an optional method that metadata-capable backends can implement. Backends without metadata support should leave the default implementation in place, which raises NotImplementedError.

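Returns:

dict[MODEL_REFERENCE_CATEGORY, CategoryMetadata]: Mapping of categories to their v2 metadata.

Raises:

NotImplementedError: If the backend does not support metadata.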

Source code in src/horde_model_reference/backends/base.py
def get_all_metadata(self) -> dict[MODEL_REFERENCE_CATEGORY, CategoryMetadata]:
    """Get v2 format metadata for all categories.

    This is an optional method that metadata-capable backends can implement.
    Backends without metadata support should leave the default implementation.

    Returns:
        dict[MODEL_REFERENCE_CATEGORY, CategoryMetadata]: Mapping of categories to their v2 metadata.

    Raises:
        NotImplementedError: If the backend does not support metadata.

    """
    raise NotImplementedError(f"{self.__class__.__name__} does not support metadata tracking")
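A backend that already implements the per-category `get_metadata` could build the all-categories mapping by iterating over the category enum. The sketch below is illustrative only: `Category` (with made-up members), `PerCategoryBackend` and the dict payload are hypothetical stand-ins for `MODEL_REFERENCE_CATEGORY`, a real backend and `CategoryMetadata`.

```python
from enum import Enum


class Category(str, Enum):
    """Hypothetical stand-in for MODEL_REFERENCE_CATEGORY."""

    image_generation = "image_generation"
    text_generation = "text_generation"


class PerCategoryBackend:
    """Hypothetical backend that derives get_all_metadata from get_metadata."""

    def get_metadata(self, category: Category) -> dict:
        # Placeholder payload; a real backend would return CategoryMetadata.
        return {"category": category.value}

    def get_all_metadata(self) -> dict:
        # One entry per enum member, keyed by the category itself.
        return {category: self.get_metadata(category) for category in Category}
```

Keying the result by the enum member matches the `dict[MODEL_REFERENCE_CATEGORY, CategoryMetadata]` return shape documented above.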

get_all_metadata_async async

get_all_metadata_async() -> dict[
    MODEL_REFERENCE_CATEGORY, CategoryMetadata
]

Asynchronously get v2 format metadata for all categories.

This is an optional method that metadata-capable backends can implement. Backends without metadata support should leave the default implementation in place, which raises NotImplementedError.

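Returns:

dict[MODEL_REFERENCE_CATEGORY, CategoryMetadata]: Mapping of categories to their v2 metadata.

Raises:

NotImplementedError: If the backend does not support metadata.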

Source code in src/horde_model_reference/backends/base.py
async def get_all_metadata_async(self) -> dict[MODEL_REFERENCE_CATEGORY, CategoryMetadata]:
    """Asynchronously get v2 format metadata for all categories.

    This is an optional method that metadata-capable backends can implement.
    Backends without metadata support should leave the default implementation.

    Returns:
        dict[MODEL_REFERENCE_CATEGORY, CategoryMetadata]: Mapping of categories to their v2 metadata.

    Raises:
        NotImplementedError: If the backend does not support metadata.

    """
    raise NotImplementedError(f"{self.__class__.__name__} does not support async metadata tracking")
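For the async variant, per-category lookups can run concurrently with `asyncio.gather`, which returns results in the order the awaitables were passed. This is a sketch under assumptions rather than the package's implementation: `Category`, `GatheringBackend` and the dict payload are hypothetical stand-ins.

```python
import asyncio
from enum import Enum


class Category(str, Enum):
    """Hypothetical stand-in for MODEL_REFERENCE_CATEGORY."""

    image_generation = "image_generation"
    text_generation = "text_generation"


class GatheringBackend:
    """Hypothetical backend that fans out per-category async lookups."""

    async def get_metadata_async(self, category: Category) -> dict:
        # Yield to the event loop once, standing in for real async I/O.
        await asyncio.sleep(0)
        return {"category": category.value}

    async def get_all_metadata_async(self) -> dict:
        categories = list(Category)
        # gather preserves input order, so zip pairs each category
        # with its own result.
        results = await asyncio.gather(*(self.get_metadata_async(c) for c in categories))
        return dict(zip(categories, results))
```

If any single lookup raises, `gather` propagates that exception by default, so a backend that wants partial results would need to pass `return_exceptions=True` and filter the output.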