Skip to content

replica_backend_base

Shared functionality for REPLICA-oriented model reference backends.

This base class provides comprehensive caching infrastructure for all backend types, not just REPLICA mode backends. It includes TTL-based caching, mtime validation, and extensible hooks for custom cache validation logic.

ReplicaBackendBase

Bases: ModelReferenceBackend

Base class providing comprehensive caching infrastructure for model reference backends.

Despite its name, this class serves as a universal caching layer for both REPLICA and PRIMARY backend modes. It implements a dual-cache architecture supporting both modern (v2/converted) and legacy JSON formats, with sophisticated validation mechanisms including TTL expiration, file modification time tracking, and extensible custom validation hooks.

Cache Architecture

The class maintains two independent cache systems:

V2/Converted Format Cache: - _cache: Primary storage for converted model reference data - _category_timestamps: Tracks when each category was last cached - _last_known_mtimes: File modification times for invalidation - _stale_categories: Set of categories marked for refresh

Legacy Format Cache: - _legacy_json_cache: Dict representation of legacy JSON - _legacy_json_string_cache: String representation of legacy JSON - _legacy_cache_timestamps: Tracks when each legacy category was cached - _legacy_last_known_mtimes: Legacy file modification times - _stale_legacy_categories: Set of legacy categories marked for refresh

Cache Validation

The validation system performs multiple checks to ensure cache freshness:

  1. Explicit Staleness: Categories marked via mark_stale() or _invalidate_cache()
  2. TTL Expiration: Time-based expiration if cache_ttl_seconds is set
  3. File Modification: Automatic invalidation when source file mtime changes
  4. Custom Validation: Extensible via _additional_cache_validation() override
Thread Safety

All cache operations are protected by: - _lock: RLock for synchronous operations - _async_lock: AsyncLock for async operations (if needed)

Subclass Integration

For V2/Converted Format: - Use _fetch_with_cache() for standard fetch-and-cache pattern - Call _get_from_cache() to retrieve cached v2 data - Call _store_in_cache() to store fetched v2 data - Override _get_file_path_for_validation() to enable mtime validation - Override _additional_cache_validation() for custom validation logic

For Legacy Format: - Call _get_legacy_from_cache() to retrieve cached legacy data (dict + string) - Call _store_legacy_in_cache() to store fetched legacy data - Override _get_legacy_file_path_for_validation() for legacy mtime validation

Cache Query Methods
  • has_cached_data(): Check if data exists (ignores validity)
  • is_cache_valid(): Check if cached data exists AND is valid
  • needs_refresh(): Check if existing cached data should be refetched
  • should_fetch_data(): Combined check for initial fetch OR refresh

Examples:

Basic fetch implementation using the cache helper::

def fetch_category(
    self,
    category: MODEL_REFERENCE_CATEGORY,
    *,
    force_refresh: bool = False
) -> dict[str, Any] | None:
    return self._fetch_with_cache(
        category,
        lambda: self._fetch_from_source(category),
        force_refresh=force_refresh
    )

Custom validation with file path override::

def _get_file_path_for_validation(
    self,
    category: MODEL_REFERENCE_CATEGORY
) -> Path | None:
    return self.data_dir / f"{category.value}.json"

def _additional_cache_validation(
    self,
    category: MODEL_REFERENCE_CATEGORY
) -> bool:
    # Custom validation logic
    return self._check_data_integrity(category)
Source code in src/horde_model_reference/backends/replica_backend_base.py
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
class ReplicaBackendBase(ModelReferenceBackend):
    """Base class providing comprehensive caching infrastructure for model reference backends.

    Despite its name, this class serves as a universal caching layer for both REPLICA and PRIMARY
    backend modes. It implements a dual-cache architecture supporting both modern (v2/converted)
    and legacy JSON formats, with sophisticated validation mechanisms including TTL expiration,
    file modification time tracking, and extensible custom validation hooks.

    Cache Architecture:
        The class maintains two independent cache systems:

        **V2/Converted Format Cache:**
            - `_cache`: Primary storage for converted model reference data
            - `_category_timestamps`: Tracks when each category was last cached
            - `_last_known_mtimes`: File modification times for invalidation
            - `_stale_categories`: Set of categories marked for refresh

        **Legacy Format Cache:**
            - `_legacy_json_cache`: Dict representation of legacy JSON
            - `_legacy_json_string_cache`: String representation of legacy JSON
            - `_legacy_cache_timestamps`: Tracks when each legacy category was cached
            - `_legacy_last_known_mtimes`: Legacy file modification times
            - `_stale_legacy_categories`: Set of legacy categories marked for refresh

    Cache Validation:
        The validation system performs multiple checks to ensure cache freshness:

        1. **Explicit Staleness**: Categories marked via `mark_stale()` or `_invalidate_cache()`
        2. **TTL Expiration**: Time-based expiration if `cache_ttl_seconds` is set
        3. **File Modification**: Automatic invalidation when source file mtime changes
        4. **Custom Validation**: Extensible via `_additional_cache_validation()` override

    Thread Safety:
        All cache operations are protected by:
            - `_lock`: `RLock` for synchronous operations
            - `_async_lock`: `AsyncLock` for async operations (if needed)

    Subclass Integration:
        **For V2/Converted Format:**
            - Use `_fetch_with_cache()` for standard fetch-and-cache pattern
            - Call `_get_from_cache()` to retrieve cached v2 data
            - Call `_store_in_cache()` to store fetched v2 data
            - Override `_get_file_path_for_validation()` to enable mtime validation
            - Override `_additional_cache_validation()` for custom validation logic

        **For Legacy Format:**
            - Call `_get_legacy_from_cache()` to retrieve cached legacy data (dict + string)
            - Call `_store_legacy_in_cache()` to store fetched legacy data
            - Override `_get_legacy_file_path_for_validation()` for legacy mtime validation

    Cache Query Methods:
        - `has_cached_data()`: Check if data exists (ignores validity)
        - `is_cache_valid()`: Check if cached data exists AND is valid
        - `needs_refresh()`: Check if existing cached data should be refetched
        - `should_fetch_data()`: Combined check for initial fetch OR refresh


    Examples:
        Basic fetch implementation using the cache helper::

            def fetch_category(
                self,
                category: MODEL_REFERENCE_CATEGORY,
                *,
                force_refresh: bool = False
            ) -> dict[str, Any] | None:
                return self._fetch_with_cache(
                    category,
                    lambda: self._fetch_from_source(category),
                    force_refresh=force_refresh
                )

        Custom validation with file path override::

            def _get_file_path_for_validation(
                self,
                category: MODEL_REFERENCE_CATEGORY
            ) -> Path | None:
                return self.data_dir / f"{category.value}.json"

            def _additional_cache_validation(
                self,
                category: MODEL_REFERENCE_CATEGORY
            ) -> bool:
                # Custom validation logic
                return self._check_data_integrity(category)

    """

    def __init__(
        self,
        *,
        mode: ReplicateMode = ReplicateMode.REPLICA,
        cache_ttl_seconds: int | None = None,
    ) -> None:
        """Configure shared cache tracking for all backends.

        Args:
            mode: The replication mode (REPLICA or PRIMARY).
            cache_ttl_seconds: TTL for cache entries in seconds. None means no expiration.

        """
        super().__init__(mode=mode)

        self._cache_ttl_seconds = cache_ttl_seconds

        # V2/Converted format cache infrastructure
        self._cache: dict[MODEL_REFERENCE_CATEGORY, dict[str, Any] | None] = {}
        self._category_timestamps: dict[MODEL_REFERENCE_CATEGORY, float] = {}
        self._last_known_mtimes: dict[MODEL_REFERENCE_CATEGORY, float] = {}
        self._stale_categories: set[MODEL_REFERENCE_CATEGORY] = set()

        # Legacy format cache infrastructure (dict and string)
        self._legacy_json_cache: dict[MODEL_REFERENCE_CATEGORY, dict[str, Any] | None] = {}
        self._legacy_json_string_cache: dict[MODEL_REFERENCE_CATEGORY, str | None] = {}
        self._legacy_cache_timestamps: dict[MODEL_REFERENCE_CATEGORY, float] = {}
        self._legacy_last_known_mtimes: dict[MODEL_REFERENCE_CATEGORY, float] = {}
        self._stale_legacy_categories: set[MODEL_REFERENCE_CATEGORY] = set()

        self._lock = RLock()
        self._async_lock: AsyncLock = AsyncLock()

    def _mark_category_fresh(self, category: MODEL_REFERENCE_CATEGORY) -> None:
        """Record that we hold a fresh cache entry for *category*.

        Also updates mtime if a file path is provided by the subclass.

        Args:
            category: The category to mark as fresh.

        """
        self._category_timestamps[category] = time.time()
        self._stale_categories.discard(category)

        file_path = self._get_file_path_for_validation(category)
        if file_path and file_path.exists():
            try:
                self._last_known_mtimes[category] = file_path.stat().st_mtime
            except Exception:
                self._last_known_mtimes[category] = 0.0

        logger.debug(f"Marked category {category} as fresh")

    def _invalidate_category_timestamp(self, category: MODEL_REFERENCE_CATEGORY) -> None:
        """Drop timestamp knowledge for *category* without adjusting payloads."""
        self._category_timestamps.pop(category, None)

    def has_cached_data(self, category: MODEL_REFERENCE_CATEGORY) -> bool:
        """Check if any data has been cached for this category.

        This is a simple existence check that doesn't validate freshness.
        Use this for initial fetch detection: "Have we loaded this at least once?"

        Args:
            category: The category to check.

        Returns:
            bool: True if data exists in cache (may be stale), False if never loaded.

        """
        with self._lock:
            return category in self._cache

    def is_cache_valid(self, category: MODEL_REFERENCE_CATEGORY) -> bool:
        """Check if cached data exists and is still valid for the given category.

        This method performs comprehensive validation to determine if cached data can be
        used without refetching. It's primarily used internally by cache retrieval methods
        but can also be called directly for validation checks.

        Args:
            category: The category to validate.

        Returns:
            True if cache exists and all validation checks pass, False otherwise.

        Validation Steps:
            The method performs checks in the following order:

            1. **Explicit Staleness**: Returns False if category is in `_stale_categories`
            2. **Cache Existence**: Returns False if category has never been cached
            3. **Timestamp Existence**: Returns False if no timestamp recorded
            4. **TTL Expiration**: Checks if `cache_ttl_seconds` exceeded (calls `mark_stale()` if expired)
            5. **File Modification**: Compares current file mtime with cached mtime (calls `mark_stale()` if changed)
            6. **Custom Validation**: Calls `_additional_cache_validation()` for subclass-specific checks

        Side Effects:
            When staleness is detected (TTL expiration or mtime change), this method calls
            `mark_stale()` to trigger invalidation callbacks and notify the manager.

        Related Methods:
            - `has_cached_data()`: Simple existence check, ignores validity
            - `should_fetch_data()`: Combined check for "should I fetch?" (initial OR refresh)
            - `needs_refresh()`: Checks if cached data should be refetched (staleness only)

        Return Value Semantics:
            - Returns `False` for both "no data" and "stale data" cases
            - Use `has_cached_data()` to distinguish between these cases
            - Use `needs_refresh()` to check staleness without considering initial fetch


        Note:
            This method is thread-safe and uses the internal `_lock` for synchronization.

        """
        with self._lock:
            if category in self._stale_categories:
                logger.debug(f"Category {category} marked stale, cache invalid")
                return False

            if category not in self._cache:
                return False

            last_updated = self._category_timestamps.get(category)

        if last_updated is None:
            logger.debug(f"Category {category} has no timestamp, considering cache invalid")
            return False

        if self._cache_ttl_seconds is not None:
            elapsed = time.time() - last_updated
            if elapsed > self._cache_ttl_seconds:
                logger.debug(f"Category {category} TTL expired ({elapsed}s > {self._cache_ttl_seconds}s)")
                self.mark_stale(category)
                return False

        file_path = self._get_file_path_for_validation(category)
        if file_path and file_path.exists():
            try:
                current_mtime = file_path.stat().st_mtime
                last_known = self._last_known_mtimes.get(category, 0.0)
                if current_mtime != last_known:
                    logger.debug(
                        f"File {file_path.name} mtime changed "
                        f"(current={current_mtime}, cached={last_known}), cache invalid"
                    )
                    self.mark_stale(category)
                    return False
            except Exception:
                return False

        if not self._additional_cache_validation(category):
            logger.debug(f"Category {category} failed additional validation")
            return False

        return True

    @override
    def _mark_stale_impl(self, category: MODEL_REFERENCE_CATEGORY) -> None:
        with self._lock:
            self._stale_categories.add(category)

    def should_fetch_data(self, category: MODEL_REFERENCE_CATEGORY) -> bool:
        """Determine if data should be fetched (initial load OR refresh).

        This is a convenience method that combines both initial fetch detection
        and refresh detection into a single check. Use this when you want to
        know "should I fetch data now?" regardless of whether it's an initial
        load or a refresh.

        This is equivalent to: `not is_cache_valid(category) or needs_refresh(category)`
        but handles the logic more efficiently.

        Args:
            category: The category to check.

        Returns:
            bool: True if data should be fetched (either initial or refresh),
                  False if cached data is valid and fresh.

        """
        return not self.is_cache_valid(category)

    @override
    def needs_refresh(self, category: MODEL_REFERENCE_CATEGORY) -> bool:
        with self._lock:
            if category in self._stale_categories:
                logger.debug(f"Category {category} marked stale, needs refresh")
                return True

            last_updated = self._category_timestamps.get(category)

        if last_updated is None:
            # No timestamp means no data has been fetched/cached yet.
            # This is not a "refresh" scenario - it's an initial fetch scenario.
            # Callers should handle initial fetch separately from refresh logic.
            logger.debug(f"Category {category} has no timestamp, no refresh needed (not yet fetched)")
            return False

        if self._cache_ttl_seconds is not None:
            cache_stale = (time.time() - last_updated) > self._cache_ttl_seconds
            if cache_stale:
                logger.debug(f"Category {category} cache is stale, needs refresh")
                self.mark_stale(category)
                return True

        file_path = self._get_file_path_for_validation(category)
        if file_path and file_path.exists():
            try:
                current_mtime = file_path.stat().st_mtime
                last_known = self._last_known_mtimes.get(category, 0.0)
                if current_mtime != last_known:
                    logger.debug(f"File {file_path.name} mtime changed, needs refresh")
                    self.mark_stale(category)
                    return True
            except Exception:
                return True

        return False

    @property
    def cache_ttl_seconds(self) -> int | None:
        """The cache TTL currently enforced for category payloads."""
        return self._cache_ttl_seconds

    @property
    def lock(self) -> RLock:
        """Thread-safe lock shared by subclasses for critical sections."""
        return self._lock

    @property
    def async_lock(self) -> AsyncLock | None:
        """Asyncio lock usable by subclasses when coordinating coroutines."""
        return self._async_lock

    def _set_cache_ttl_seconds(self, ttl_seconds: int | None) -> None:
        """Allow subclasses to tweak TTL after initialization if desired."""
        self._cache_ttl_seconds = ttl_seconds

    def _get_file_path_for_validation(self, category: MODEL_REFERENCE_CATEGORY) -> Path | None:
        """Return file path for mtime validation.

        Subclasses should override this if they want automatic mtime validation.
        If a path is returned, the cache will be invalidated if the file's mtime changes.

        Args:
            category: The category to get the file path for.

        Returns:
            Path | None: File path to check for mtime, or None to skip mtime validation.

        """
        return None

    def _additional_cache_validation(self, category: MODEL_REFERENCE_CATEGORY) -> bool:
        """Perform additional cache validation.

        Subclasses can override this to add custom validation logic beyond
        TTL and mtime checks. This is called during `is_cache_valid()`.

        Args:
            category: The category to validate.

        Returns:
            bool: True if cache is valid, False to invalidate.

        """
        return True

    def _fetch_with_cache(
        self,
        category: MODEL_REFERENCE_CATEGORY,
        fetch_fn: Callable[[], dict[str, Any] | None],
        *,
        force_refresh: bool = False,
    ) -> dict[str, Any] | None:
        """Implement standard fetch pattern with automatic caching.

        This helper method implements the recommended fetch pattern:
        1. Check cache if not forcing refresh
        2. Return cached data if valid
        3. Fetch data using provided function
        4. Store in cache and return

        Use this in your fetch_category() implementations to avoid boilerplate.

        Args:
            category: The category to fetch.
            fetch_fn: Callable that fetches the data (no args, returns dict or None).
            force_refresh: If True, skip cache check and force fetch.

        Returns:
            dict[str, Any] | None: The fetched/cached data.

        Example:
            def fetch_category(self, category, *, force_refresh=False):
                return self._fetch_with_cache(
                    category,
                    lambda: self._fetch_from_source(category),
                    force_refresh=force_refresh
                )

        """
        # Check cache first unless force refresh
        if not force_refresh:
            cached_data = self._get_from_cache(category)
            if cached_data is not None:
                return cached_data

        # Fetch data
        data = fetch_fn()

        # Store in cache
        if data is not None:
            self._store_in_cache(category, data)
        else:
            # Store None to indicate "checked but not found"
            self._store_in_cache(category, None)

        return data

    def _get_from_cache(self, category: MODEL_REFERENCE_CATEGORY) -> dict[str, Any] | None:
        """Get data from cache if valid.

        This is the primary method subclasses should use to retrieve cached data.
        It handles all validation logic internally, including initial fetch detection
        (returns None if data has never been loaded).

        This method determines if an INITIAL fetch is needed by checking cache existence.
        Use `needs_refresh()` to check if existing cached data should be RE-fetched.

        Args:
            category: The category to retrieve from cache.

        Returns:
            dict[str, Any] | None: Cached data if valid, None if cache miss (initial fetch needed)
                                   or cache invalid (refresh needed).

        """
        with self._lock:
            if self.is_cache_valid(category):
                logger.debug(f"Cache hit for {category}")
                return self._cache.get(category)

            logger.debug(f"Cache miss for {category}")
            return None

    def _store_in_cache(self, category: MODEL_REFERENCE_CATEGORY, data: dict[str, Any] | None) -> None:
        """Store data in cache and mark category as fresh.

        This is the primary method subclasses should use to store fetched data.
        It handles timestamp updates and mtime tracking internally.

        Args:
            category: The category to store.
            data: The data to cache, or None if category has no data.

        """
        with self._lock:
            self._cache[category] = data
            # Only mark as fresh if we actually have data
            # None values indicate failed loads and should not prevent retries
            if data is not None:
                self._mark_category_fresh(category)
                logger.debug(f"Stored {category} in cache")
            else:
                logger.debug(f"Stored None for {category}, not marking as fresh")

    def _invalidate_cache(self, category: MODEL_REFERENCE_CATEGORY) -> None:
        """Invalidate cache for a category without deleting the data.

        This marks the category as stale, forcing a refetch on next access.

        Args:
            category: The category to invalidate.

        """
        with self._lock:
            self._stale_categories.add(category)
            self._category_timestamps.pop(category, None)
            logger.debug(f"Invalidated cache for {category}")

    def _get_legacy_file_path_for_validation(self, category: MODEL_REFERENCE_CATEGORY) -> Path | None:
        """Return legacy file path for mtime validation.

        Subclasses should override this if they want automatic mtime validation
        for legacy format files. This is separate from the v2 file path.

        Args:
            category: The category to get the legacy file path for.

        Returns:
            Path | None: Legacy file path to check for mtime, or None to skip mtime validation.

        """
        return None

    def _mark_legacy_category_fresh(self, category: MODEL_REFERENCE_CATEGORY) -> None:
        """Record that we hold a fresh legacy cache entry for *category*.

        Also updates legacy file mtime if a path is provided by the subclass.

        Args:
            category: The category to mark as fresh.

        """
        self._legacy_cache_timestamps[category] = time.time()
        self._stale_legacy_categories.discard(category)

        legacy_file_path = self._get_legacy_file_path_for_validation(category)
        if legacy_file_path and legacy_file_path.exists():
            try:
                self._legacy_last_known_mtimes[category] = legacy_file_path.stat().st_mtime
            except Exception:
                self._legacy_last_known_mtimes[category] = 0.0

        logger.debug(f"Marked legacy category {category} as fresh")

    def is_legacy_cache_valid(self, category: MODEL_REFERENCE_CATEGORY) -> bool:
        """Return True if the legacy cache for *category* is considered fresh.

        Performs validation checks for legacy format cache:
        1. Staleness check (explicit invalidation)
        2. Cache existence check (dict or string)
        3. TTL expiration check
        4. File mtime check (if legacy file path provided by subclass)

        Args:
            category: The category to validate.

        Returns:
            bool: True if legacy cache is valid and can be used.

        """
        with self._lock:
            if category in self._stale_legacy_categories:
                logger.debug(f"Legacy category {category} marked stale, cache invalid")
                return False

            if category not in self._legacy_json_cache and category not in self._legacy_json_string_cache:
                return False

            last_updated = self._legacy_cache_timestamps.get(category)

        if last_updated is None:
            logger.debug(f"Legacy category {category} has no timestamp, considering cache invalid")
            return False

        if self._cache_ttl_seconds is not None:
            elapsed = time.time() - last_updated
            if elapsed > self._cache_ttl_seconds:
                logger.debug(f"Legacy category {category} TTL expired ({elapsed}s > {self._cache_ttl_seconds}s)")
                return False

        legacy_file_path = self._get_legacy_file_path_for_validation(category)
        if legacy_file_path and legacy_file_path.exists():
            try:
                current_mtime = legacy_file_path.stat().st_mtime
                last_known = self._legacy_last_known_mtimes.get(category, 0.0)
                if current_mtime != last_known:
                    logger.debug(
                        f"Legacy file {legacy_file_path.name} mtime changed "
                        f"(current={current_mtime}, cached={last_known}), cache invalid"
                    )
                    return False
            except Exception:
                return False

        return True

    def _get_legacy_from_cache(
        self,
        category: MODEL_REFERENCE_CATEGORY,
    ) -> tuple[dict[str, Any] | None, str | None]:
        """Get legacy data from cache if valid.

        Returns both dict and string representations of legacy JSON.

        Args:
            category: The category to retrieve from cache.

        Returns:
            tuple[dict | None, str | None]: (legacy_dict, legacy_string) or (None, None) if cache miss.

        """
        with self._lock:
            if self.is_legacy_cache_valid(category):
                logger.debug(f"Legacy cache hit for {category}")
                return (
                    self._legacy_json_cache.get(category),
                    self._legacy_json_string_cache.get(category),
                )

            logger.debug(f"Legacy cache miss for {category}")
            return None, None

    def _store_legacy_in_cache(
        self,
        category: MODEL_REFERENCE_CATEGORY,
        legacy_dict: dict[str, Any] | None,
        legacy_string: str | None,
    ) -> None:
        """Store legacy data in cache and mark category as fresh.

        Stores both dict and string representations of legacy JSON.

        Args:
            category: The category to store.
            legacy_dict: The legacy JSON as a dict, or None.
            legacy_string: The legacy JSON as a string, or None.

        """
        with self._lock:
            self._legacy_json_cache[category] = legacy_dict
            self._legacy_json_string_cache[category] = legacy_string
            # Only mark as fresh if we actually have data
            # None values indicate failed loads and should not prevent retries
            if legacy_dict is not None or legacy_string is not None:
                self._mark_legacy_category_fresh(category)
                logger.debug(f"Stored legacy {category} in cache")
            else:
                logger.debug(f"Stored None for legacy {category}, not marking as fresh")

    def _invalidate_legacy_cache(self, category: MODEL_REFERENCE_CATEGORY) -> None:
        """Invalidate legacy cache for a category without deleting the data.

        This marks the category as stale, forcing a refetch on next access.

        Args:
            category: The category to invalidate.

        """
        with self._lock:
            self._stale_legacy_categories.add(category)
            self._legacy_cache_timestamps.pop(category, None)
            logger.debug(f"Invalidated legacy cache for {category}")

_cache_ttl_seconds instance-attribute

_cache_ttl_seconds = cache_ttl_seconds

_cache instance-attribute

_cache: dict[
    MODEL_REFERENCE_CATEGORY, dict[str, Any] | None
] = {}

_category_timestamps instance-attribute

_category_timestamps: dict[
    MODEL_REFERENCE_CATEGORY, float
] = {}

_last_known_mtimes instance-attribute

_last_known_mtimes: dict[
    MODEL_REFERENCE_CATEGORY, float
] = {}

_stale_categories instance-attribute

_stale_categories: set[MODEL_REFERENCE_CATEGORY] = set()

_legacy_json_cache instance-attribute

_legacy_json_cache: dict[
    MODEL_REFERENCE_CATEGORY, dict[str, Any] | None
] = {}

_legacy_json_string_cache instance-attribute

_legacy_json_string_cache: dict[
    MODEL_REFERENCE_CATEGORY, str | None
] = {}

_legacy_cache_timestamps instance-attribute

_legacy_cache_timestamps: dict[
    MODEL_REFERENCE_CATEGORY, float
] = {}

_legacy_last_known_mtimes instance-attribute

_legacy_last_known_mtimes: dict[
    MODEL_REFERENCE_CATEGORY, float
] = {}

_stale_legacy_categories instance-attribute

_stale_legacy_categories: set[MODEL_REFERENCE_CATEGORY] = (
    set()
)

_lock instance-attribute

_lock = RLock()

_async_lock instance-attribute

_async_lock: Lock = Lock()

cache_ttl_seconds property

cache_ttl_seconds: int | None

The cache TTL currently enforced for category payloads.

lock property

lock: RLock

Thread-safe lock shared by subclasses for critical sections.

async_lock property

async_lock: Lock | None

Asyncio lock usable by subclasses when coordinating coroutines.

_replicate_mode class-attribute instance-attribute

_replicate_mode = mode

_invalidation_callbacks instance-attribute

_invalidation_callbacks: list[
    Callable[[MODEL_REFERENCE_CATEGORY], None]
] = []

replicate_mode property

replicate_mode: ReplicateMode

Get the replicate mode of this backend.

__init__

__init__(
    *,
    mode: ReplicateMode = ReplicateMode.REPLICA,
    cache_ttl_seconds: int | None = None,
) -> None

Configure shared cache tracking for all backends.

Parameters:

  • mode (ReplicateMode, default: REPLICA ) –

    The replication mode (REPLICA or PRIMARY).

  • cache_ttl_seconds (int | None, default: None ) –

    TTL for cache entries in seconds. None means no expiration.

Source code in src/horde_model_reference/backends/replica_backend_base.py
def __init__(
    self,
    *,
    mode: ReplicateMode = ReplicateMode.REPLICA,
    cache_ttl_seconds: int | None = None,
) -> None:
    """Configure shared cache tracking for all backends.

    Args:
        mode: The replication mode (REPLICA or PRIMARY).
        cache_ttl_seconds: TTL for cache entries in seconds. None means no expiration.

    """
    super().__init__(mode=mode)

    self._cache_ttl_seconds = cache_ttl_seconds

    # V2/Converted format cache infrastructure
    self._cache: dict[MODEL_REFERENCE_CATEGORY, dict[str, Any] | None] = {}
    self._category_timestamps: dict[MODEL_REFERENCE_CATEGORY, float] = {}
    self._last_known_mtimes: dict[MODEL_REFERENCE_CATEGORY, float] = {}
    self._stale_categories: set[MODEL_REFERENCE_CATEGORY] = set()

    # Legacy format cache infrastructure (dict and string)
    self._legacy_json_cache: dict[MODEL_REFERENCE_CATEGORY, dict[str, Any] | None] = {}
    self._legacy_json_string_cache: dict[MODEL_REFERENCE_CATEGORY, str | None] = {}
    self._legacy_cache_timestamps: dict[MODEL_REFERENCE_CATEGORY, float] = {}
    self._legacy_last_known_mtimes: dict[MODEL_REFERENCE_CATEGORY, float] = {}
    self._stale_legacy_categories: set[MODEL_REFERENCE_CATEGORY] = set()

    self._lock = RLock()
    self._async_lock: AsyncLock = AsyncLock()

_mark_category_fresh

_mark_category_fresh(
    category: MODEL_REFERENCE_CATEGORY,
) -> None

Record that we hold a fresh cache entry for category.

Also updates mtime if a file path is provided by the subclass.

Parameters:

Source code in src/horde_model_reference/backends/replica_backend_base.py
def _mark_category_fresh(self, category: MODEL_REFERENCE_CATEGORY) -> None:
    """Record that we hold a fresh cache entry for *category*.

    Also updates mtime if a file path is provided by the subclass.

    Args:
        category: The category to mark as fresh.

    """
    self._category_timestamps[category] = time.time()
    self._stale_categories.discard(category)

    file_path = self._get_file_path_for_validation(category)
    if file_path and file_path.exists():
        try:
            self._last_known_mtimes[category] = file_path.stat().st_mtime
        except Exception:
            self._last_known_mtimes[category] = 0.0

    logger.debug(f"Marked category {category} as fresh")

_invalidate_category_timestamp

_invalidate_category_timestamp(
    category: MODEL_REFERENCE_CATEGORY,
) -> None

Drop timestamp knowledge for category without adjusting payloads.

Source code in src/horde_model_reference/backends/replica_backend_base.py
def _invalidate_category_timestamp(self, category: MODEL_REFERENCE_CATEGORY) -> None:
    """Drop timestamp knowledge for *category* without adjusting payloads."""
    self._category_timestamps.pop(category, None)

has_cached_data

has_cached_data(category: MODEL_REFERENCE_CATEGORY) -> bool

Check if any data has been cached for this category.

This is a simple existence check that doesn't validate freshness. Use this for initial fetch detection: "Have we loaded this at least once?"

Parameters:

Returns:

  • bool ( bool ) –

    True if data exists in cache (may be stale), False if never loaded.

Source code in src/horde_model_reference/backends/replica_backend_base.py
def has_cached_data(self, category: MODEL_REFERENCE_CATEGORY) -> bool:
    """Check if any data has been cached for this category.

    This is a simple existence check that doesn't validate freshness.
    Use this for initial fetch detection: "Have we loaded this at least once?"

    Args:
        category: The category to check.

    Returns:
        bool: True if data exists in cache (may be stale), False if never loaded.

    """
    with self._lock:
        return category in self._cache

is_cache_valid

is_cache_valid(category: MODEL_REFERENCE_CATEGORY) -> bool

Check if cached data exists and is still valid for the given category.

This method performs comprehensive validation to determine if cached data can be used without refetching. It's primarily used internally by cache retrieval methods but can also be called directly for validation checks.

Parameters:

Returns:

  • bool

    True if cache exists and all validation checks pass, False otherwise.

Validation Steps

The method performs checks in the following order:

  1. Explicit Staleness: Returns False if category is in _stale_categories
  2. Cache Existence: Returns False if category has never been cached
  3. Timestamp Existence: Returns False if no timestamp recorded
  4. TTL Expiration: Checks if cache_ttl_seconds exceeded (calls mark_stale() if expired)
  5. File Modification: Compares current file mtime with cached mtime (calls mark_stale() if changed)
  6. Custom Validation: Calls _additional_cache_validation() for subclass-specific checks
Side Effects

When staleness is detected (TTL expiration or mtime change), this method calls mark_stale() to trigger invalidation callbacks and notify the manager.

Return Value Semantics
  • Returns False for both "no data" and "stale data" cases
  • Use has_cached_data() to distinguish between these cases
  • Use needs_refresh() to check staleness without considering initial fetch
Note

This method is thread-safe and uses the internal _lock for synchronization.

Source code in src/horde_model_reference/backends/replica_backend_base.py
def is_cache_valid(self, category: MODEL_REFERENCE_CATEGORY) -> bool:
    """Check if cached data exists and is still valid for the given category.

    This method performs comprehensive validation to determine if cached data can be
    used without refetching. It's primarily used internally by cache retrieval methods
    but can also be called directly for validation checks.

    Args:
        category: The category to validate.

    Returns:
        True if cache exists and all validation checks pass, False otherwise.

    Validation Steps:
        The method performs checks in the following order:

        1. **Explicit Staleness**: Returns False if category is in `_stale_categories`
        2. **Cache Existence**: Returns False if category has never been cached
        3. **Timestamp Existence**: Returns False if no timestamp recorded
        4. **TTL Expiration**: Checks if `cache_ttl_seconds` exceeded (calls `mark_stale()` if expired)
        5. **File Modification**: Compares current file mtime with cached mtime (calls `mark_stale()` if changed)
        6. **Custom Validation**: Calls `_additional_cache_validation()` for subclass-specific checks

    Side Effects:
        When staleness is detected (TTL expiration or mtime change), this method calls
        `mark_stale()` to trigger invalidation callbacks and notify the manager.

    Related Methods:
        - `has_cached_data()`: Simple existence check, ignores validity
        - `should_fetch_data()`: Combined check for "should I fetch?" (initial OR refresh)
        - `needs_refresh()`: Checks if cached data should be refetched (staleness only)

    Return Value Semantics:
        - Returns `False` for both "no data" and "stale data" cases
        - Use `has_cached_data()` to distinguish between these cases
        - Use `needs_refresh()` to check staleness without considering initial fetch


    Note:
        This method is thread-safe and uses the internal `_lock` for synchronization.

    """
    with self._lock:
        if category in self._stale_categories:
            logger.debug(f"Category {category} marked stale, cache invalid")
            return False

        if category not in self._cache:
            return False

        last_updated = self._category_timestamps.get(category)

    if last_updated is None:
        logger.debug(f"Category {category} has no timestamp, considering cache invalid")
        return False

    if self._cache_ttl_seconds is not None:
        elapsed = time.time() - last_updated
        if elapsed > self._cache_ttl_seconds:
            logger.debug(f"Category {category} TTL expired ({elapsed}s > {self._cache_ttl_seconds}s)")
            self.mark_stale(category)
            return False

    file_path = self._get_file_path_for_validation(category)
    if file_path and file_path.exists():
        try:
            current_mtime = file_path.stat().st_mtime
            last_known = self._last_known_mtimes.get(category, 0.0)
            if current_mtime != last_known:
                logger.debug(
                    f"File {file_path.name} mtime changed "
                    f"(current={current_mtime}, cached={last_known}), cache invalid"
                )
                self.mark_stale(category)
                return False
        except Exception:
            return False

    if not self._additional_cache_validation(category):
        logger.debug(f"Category {category} failed additional validation")
        return False

    return True

_mark_stale_impl

_mark_stale_impl(
    category: MODEL_REFERENCE_CATEGORY,
) -> None
Source code in src/horde_model_reference/backends/replica_backend_base.py
@override
def _mark_stale_impl(self, category: MODEL_REFERENCE_CATEGORY) -> None:
    with self._lock:
        self._stale_categories.add(category)

should_fetch_data

should_fetch_data(
    category: MODEL_REFERENCE_CATEGORY,
) -> bool

Determine if data should be fetched (initial load OR refresh).

This is a convenience method that combines both initial fetch detection and refresh detection into a single check. Use this when you want to know "should I fetch data now?" regardless of whether it's an initial load or a refresh.

This is equivalent to: not is_cache_valid(category) or needs_refresh(category) but handles the logic more efficiently.

Parameters:

Returns:

  • bool ( bool ) –

    True if data should be fetched (either initial or refresh), False if cached data is valid and fresh.

Source code in src/horde_model_reference/backends/replica_backend_base.py
def should_fetch_data(self, category: MODEL_REFERENCE_CATEGORY) -> bool:
    """Determine if data should be fetched (initial load OR refresh).

    This is a convenience method that combines both initial fetch detection
    and refresh detection into a single check. Use this when you want to
    know "should I fetch data now?" regardless of whether it's an initial
    load or a refresh.

    This is equivalent to: `not is_cache_valid(category) or needs_refresh(category)`
    but handles the logic more efficiently.

    Args:
        category: The category to check.

    Returns:
        bool: True if data should be fetched (either initial or refresh),
              False if cached data is valid and fresh.

    """
    return not self.is_cache_valid(category)

needs_refresh

needs_refresh(category: MODEL_REFERENCE_CATEGORY) -> bool
Source code in src/horde_model_reference/backends/replica_backend_base.py
@override
def needs_refresh(self, category: MODEL_REFERENCE_CATEGORY) -> bool:
    with self._lock:
        if category in self._stale_categories:
            logger.debug(f"Category {category} marked stale, needs refresh")
            return True

        last_updated = self._category_timestamps.get(category)

    if last_updated is None:
        # No timestamp means no data has been fetched/cached yet.
        # This is not a "refresh" scenario - it's an initial fetch scenario.
        # Callers should handle initial fetch separately from refresh logic.
        logger.debug(f"Category {category} has no timestamp, no refresh needed (not yet fetched)")
        return False

    if self._cache_ttl_seconds is not None:
        cache_stale = (time.time() - last_updated) > self._cache_ttl_seconds
        if cache_stale:
            logger.debug(f"Category {category} cache is stale, needs refresh")
            self.mark_stale(category)
            return True

    file_path = self._get_file_path_for_validation(category)
    if file_path and file_path.exists():
        try:
            current_mtime = file_path.stat().st_mtime
            last_known = self._last_known_mtimes.get(category, 0.0)
            if current_mtime != last_known:
                logger.debug(f"File {file_path.name} mtime changed, needs refresh")
                self.mark_stale(category)
                return True
        except Exception:
            return True

    return False

_set_cache_ttl_seconds

_set_cache_ttl_seconds(ttl_seconds: int | None) -> None

Allow subclasses to tweak TTL after initialization if desired.

Source code in src/horde_model_reference/backends/replica_backend_base.py
def _set_cache_ttl_seconds(self, ttl_seconds: int | None) -> None:
    """Allow subclasses to tweak TTL after initialization if desired."""
    self._cache_ttl_seconds = ttl_seconds

_get_file_path_for_validation

_get_file_path_for_validation(
    category: MODEL_REFERENCE_CATEGORY,
) -> Path | None

Return file path for mtime validation.

Subclasses should override this if they want automatic mtime validation. If a path is returned, the cache will be invalidated if the file's mtime changes.

Parameters:

Returns:

  • Path | None

    Path | None: File path to check for mtime, or None to skip mtime validation.

Source code in src/horde_model_reference/backends/replica_backend_base.py
def _get_file_path_for_validation(self, category: MODEL_REFERENCE_CATEGORY) -> Path | None:
    """Return file path for mtime validation.

    Subclasses should override this if they want automatic mtime validation.
    If a path is returned, the cache will be invalidated if the file's mtime changes.

    Args:
        category: The category to get the file path for.

    Returns:
        Path | None: File path to check for mtime, or None to skip mtime validation.

    """
    return None

_additional_cache_validation

_additional_cache_validation(
    category: MODEL_REFERENCE_CATEGORY,
) -> bool

Perform additional cache validation.

Subclasses can override this to add custom validation logic beyond TTL and mtime checks. This is called during is_cache_valid().

Parameters:

Returns:

  • bool ( bool ) –

    True if cache is valid, False to invalidate.

Source code in src/horde_model_reference/backends/replica_backend_base.py
def _additional_cache_validation(self, category: MODEL_REFERENCE_CATEGORY) -> bool:
    """Perform additional cache validation.

    Subclasses can override this to add custom validation logic beyond
    TTL and mtime checks. This is called during `is_cache_valid()`.

    Args:
        category: The category to validate.

    Returns:
        bool: True if cache is valid, False to invalidate.

    """
    return True

_fetch_with_cache

_fetch_with_cache(
    category: MODEL_REFERENCE_CATEGORY,
    fetch_fn: Callable[[], dict[str, Any] | None],
    *,
    force_refresh: bool = False,
) -> dict[str, Any] | None

Implement standard fetch pattern with automatic caching.

This helper method implements the recommended fetch pattern: 1. Check cache if not forcing refresh 2. Return cached data if valid 3. Fetch data using provided function 4. Store in cache and return

Use this in your fetch_category() implementations to avoid boilerplate.

Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    The category to fetch.

  • fetch_fn (Callable[[], dict[str, Any] | None]) –

    Callable that fetches the data (no args, returns dict or None).

  • force_refresh (bool, default: False ) –

    If True, skip cache check and force fetch.

Returns:

  • dict[str, Any] | None

    dict[str, Any] | None: The fetched/cached data.

Example

def fetch_category(self, category, *, force_refresh=False): return self._fetch_with_cache( category, lambda: self._fetch_from_source(category), force_refresh=force_refresh )

Source code in src/horde_model_reference/backends/replica_backend_base.py
def _fetch_with_cache(
    self,
    category: MODEL_REFERENCE_CATEGORY,
    fetch_fn: Callable[[], dict[str, Any] | None],
    *,
    force_refresh: bool = False,
) -> dict[str, Any] | None:
    """Implement standard fetch pattern with automatic caching.

    This helper method implements the recommended fetch pattern:
    1. Check cache if not forcing refresh
    2. Return cached data if valid
    3. Fetch data using provided function
    4. Store in cache and return

    Use this in your fetch_category() implementations to avoid boilerplate.

    Args:
        category: The category to fetch.
        fetch_fn: Callable that fetches the data (no args, returns dict or None).
        force_refresh: If True, skip cache check and force fetch.

    Returns:
        dict[str, Any] | None: The fetched/cached data.

    Example:
        def fetch_category(self, category, *, force_refresh=False):
            return self._fetch_with_cache(
                category,
                lambda: self._fetch_from_source(category),
                force_refresh=force_refresh
            )

    """
    # Check cache first unless force refresh
    if not force_refresh:
        cached_data = self._get_from_cache(category)
        if cached_data is not None:
            return cached_data

    # Fetch data
    data = fetch_fn()

    # Store in cache
    if data is not None:
        self._store_in_cache(category, data)
    else:
        # Store None to indicate "checked but not found"
        self._store_in_cache(category, None)

    return data

_get_from_cache

_get_from_cache(
    category: MODEL_REFERENCE_CATEGORY,
) -> dict[str, Any] | None

Get data from cache if valid.

This is the primary method subclasses should use to retrieve cached data. It handles all validation logic internally, including initial fetch detection (returns None if data has never been loaded).

This method determines if an INITIAL fetch is needed by checking cache existence. Use needs_refresh() to check if existing cached data should be RE-fetched.

Parameters:

Returns:

  • dict[str, Any] | None

    dict[str, Any] | None: Cached data if valid, None if cache miss (initial fetch needed) or cache invalid (refresh needed).

Source code in src/horde_model_reference/backends/replica_backend_base.py
def _get_from_cache(self, category: MODEL_REFERENCE_CATEGORY) -> dict[str, Any] | None:
    """Get data from cache if valid.

    This is the primary method subclasses should use to retrieve cached data.
    It handles all validation logic internally, including initial fetch detection
    (returns None if data has never been loaded).

    This method determines if an INITIAL fetch is needed by checking cache existence.
    Use `needs_refresh()` to check if existing cached data should be RE-fetched.

    Args:
        category: The category to retrieve from cache.

    Returns:
        dict[str, Any] | None: Cached data if valid, None if cache miss (initial fetch needed)
                               or cache invalid (refresh needed).

    """
    with self._lock:
        if self.is_cache_valid(category):
            logger.debug(f"Cache hit for {category}")
            return self._cache.get(category)

        logger.debug(f"Cache miss for {category}")
        return None

_store_in_cache

_store_in_cache(
    category: MODEL_REFERENCE_CATEGORY,
    data: dict[str, Any] | None,
) -> None

Store data in cache and mark category as fresh.

This is the primary method subclasses should use to store fetched data. It handles timestamp updates and mtime tracking internally.

Parameters:

Source code in src/horde_model_reference/backends/replica_backend_base.py
def _store_in_cache(self, category: MODEL_REFERENCE_CATEGORY, data: dict[str, Any] | None) -> None:
    """Store data in cache and mark category as fresh.

    This is the primary method subclasses should use to store fetched data.
    It handles timestamp updates and mtime tracking internally.

    Args:
        category: The category to store.
        data: The data to cache, or None if category has no data.

    """
    with self._lock:
        self._cache[category] = data
        # Only mark as fresh if we actually have data
        # None values indicate failed loads and should not prevent retries
        if data is not None:
            self._mark_category_fresh(category)
            logger.debug(f"Stored {category} in cache")
        else:
            logger.debug(f"Stored None for {category}, not marking as fresh")

_invalidate_cache

_invalidate_cache(
    category: MODEL_REFERENCE_CATEGORY,
) -> None

Invalidate cache for a category without deleting the data.

This marks the category as stale, forcing a refetch on next access.

Parameters:

Source code in src/horde_model_reference/backends/replica_backend_base.py
def _invalidate_cache(self, category: MODEL_REFERENCE_CATEGORY) -> None:
    """Invalidate cache for a category without deleting the data.

    This marks the category as stale, forcing a refetch on next access.

    Args:
        category: The category to invalidate.

    """
    with self._lock:
        self._stale_categories.add(category)
        self._category_timestamps.pop(category, None)
        logger.debug(f"Invalidated cache for {category}")

_get_legacy_file_path_for_validation

_get_legacy_file_path_for_validation(
    category: MODEL_REFERENCE_CATEGORY,
) -> Path | None

Return legacy file path for mtime validation.

Subclasses should override this if they want automatic mtime validation for legacy format files. This is separate from the v2 file path.

Parameters:

Returns:

  • Path | None

    Path | None: Legacy file path to check for mtime, or None to skip mtime validation.

Source code in src/horde_model_reference/backends/replica_backend_base.py
def _get_legacy_file_path_for_validation(self, category: MODEL_REFERENCE_CATEGORY) -> Path | None:
    """Return legacy file path for mtime validation.

    Subclasses should override this if they want automatic mtime validation
    for legacy format files. This is separate from the v2 file path.

    Args:
        category: The category to get the legacy file path for.

    Returns:
        Path | None: Legacy file path to check for mtime, or None to skip mtime validation.

    """
    return None

_mark_legacy_category_fresh

_mark_legacy_category_fresh(
    category: MODEL_REFERENCE_CATEGORY,
) -> None

Record that we hold a fresh legacy cache entry for category.

Also updates legacy file mtime if a path is provided by the subclass.

Parameters:

Source code in src/horde_model_reference/backends/replica_backend_base.py
def _mark_legacy_category_fresh(self, category: MODEL_REFERENCE_CATEGORY) -> None:
    """Record that we hold a fresh legacy cache entry for *category*.

    Also updates legacy file mtime if a path is provided by the subclass.

    Args:
        category: The category to mark as fresh.

    """
    self._legacy_cache_timestamps[category] = time.time()
    self._stale_legacy_categories.discard(category)

    legacy_file_path = self._get_legacy_file_path_for_validation(category)
    if legacy_file_path and legacy_file_path.exists():
        try:
            self._legacy_last_known_mtimes[category] = legacy_file_path.stat().st_mtime
        except Exception:
            self._legacy_last_known_mtimes[category] = 0.0

    logger.debug(f"Marked legacy category {category} as fresh")

is_legacy_cache_valid

is_legacy_cache_valid(
    category: MODEL_REFERENCE_CATEGORY,
) -> bool

Return True if the legacy cache for category is considered fresh.

Performs validation checks for legacy format cache: 1. Staleness check (explicit invalidation) 2. Cache existence check (dict or string) 3. TTL expiration check 4. File mtime check (if legacy file path provided by subclass)

Parameters:

Returns:

  • bool ( bool ) –

    True if legacy cache is valid and can be used.

Source code in src/horde_model_reference/backends/replica_backend_base.py
def is_legacy_cache_valid(self, category: MODEL_REFERENCE_CATEGORY) -> bool:
    """Return True if the legacy cache for *category* is considered fresh.

    Performs validation checks for legacy format cache:
    1. Staleness check (explicit invalidation)
    2. Cache existence check (dict or string)
    3. TTL expiration check
    4. File mtime check (if legacy file path provided by subclass)

    Args:
        category: The category to validate.

    Returns:
        bool: True if legacy cache is valid and can be used.

    """
    with self._lock:
        if category in self._stale_legacy_categories:
            logger.debug(f"Legacy category {category} marked stale, cache invalid")
            return False

        if category not in self._legacy_json_cache and category not in self._legacy_json_string_cache:
            return False

        last_updated = self._legacy_cache_timestamps.get(category)

    if last_updated is None:
        logger.debug(f"Legacy category {category} has no timestamp, considering cache invalid")
        return False

    if self._cache_ttl_seconds is not None:
        elapsed = time.time() - last_updated
        if elapsed > self._cache_ttl_seconds:
            logger.debug(f"Legacy category {category} TTL expired ({elapsed}s > {self._cache_ttl_seconds}s)")
            return False

    legacy_file_path = self._get_legacy_file_path_for_validation(category)
    if legacy_file_path and legacy_file_path.exists():
        try:
            current_mtime = legacy_file_path.stat().st_mtime
            last_known = self._legacy_last_known_mtimes.get(category, 0.0)
            if current_mtime != last_known:
                logger.debug(
                    f"Legacy file {legacy_file_path.name} mtime changed "
                    f"(current={current_mtime}, cached={last_known}), cache invalid"
                )
                return False
        except Exception:
            return False

    return True

_get_legacy_from_cache

_get_legacy_from_cache(
    category: MODEL_REFERENCE_CATEGORY,
) -> tuple[dict[str, Any] | None, str | None]

Get legacy data from cache if valid.

Returns both dict and string representations of legacy JSON.

Parameters:

Returns:

  • tuple[dict[str, Any] | None, str | None]

    tuple[dict | None, str | None]: (legacy_dict, legacy_string) or (None, None) if cache miss.

Source code in src/horde_model_reference/backends/replica_backend_base.py
def _get_legacy_from_cache(
    self,
    category: MODEL_REFERENCE_CATEGORY,
) -> tuple[dict[str, Any] | None, str | None]:
    """Get legacy data from cache if valid.

    Returns both dict and string representations of legacy JSON.

    Args:
        category: The category to retrieve from cache.

    Returns:
        tuple[dict | None, str | None]: (legacy_dict, legacy_string) or (None, None) if cache miss.

    """
    with self._lock:
        if self.is_legacy_cache_valid(category):
            logger.debug(f"Legacy cache hit for {category}")
            return (
                self._legacy_json_cache.get(category),
                self._legacy_json_string_cache.get(category),
            )

        logger.debug(f"Legacy cache miss for {category}")
        return None, None

_store_legacy_in_cache

_store_legacy_in_cache(
    category: MODEL_REFERENCE_CATEGORY,
    legacy_dict: dict[str, Any] | None,
    legacy_string: str | None,
) -> None

Store legacy data in cache and mark category as fresh.

Stores both dict and string representations of legacy JSON.

Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    The category to store.

  • legacy_dict (dict[str, Any] | None) –

    The legacy JSON as a dict, or None.

  • legacy_string (str | None) –

    The legacy JSON as a string, or None.

Source code in src/horde_model_reference/backends/replica_backend_base.py
def _store_legacy_in_cache(
    self,
    category: MODEL_REFERENCE_CATEGORY,
    legacy_dict: dict[str, Any] | None,
    legacy_string: str | None,
) -> None:
    """Store legacy data in cache and mark category as fresh.

    Stores both dict and string representations of legacy JSON.

    Args:
        category: The category to store.
        legacy_dict: The legacy JSON as a dict, or None.
        legacy_string: The legacy JSON as a string, or None.

    """
    with self._lock:
        self._legacy_json_cache[category] = legacy_dict
        self._legacy_json_string_cache[category] = legacy_string
        # Only mark as fresh if we actually have data
        # None values indicate failed loads and should not prevent retries
        if legacy_dict is not None or legacy_string is not None:
            self._mark_legacy_category_fresh(category)
            logger.debug(f"Stored legacy {category} in cache")
        else:
            logger.debug(f"Stored None for legacy {category}, not marking as fresh")

_invalidate_legacy_cache

_invalidate_legacy_cache(
    category: MODEL_REFERENCE_CATEGORY,
) -> None

Invalidate legacy cache for a category without deleting the data.

This marks the category as stale, forcing a refetch on next access.

Parameters:

Source code in src/horde_model_reference/backends/replica_backend_base.py
def _invalidate_legacy_cache(self, category: MODEL_REFERENCE_CATEGORY) -> None:
    """Invalidate legacy cache for a category without deleting the data.

    This marks the category as stale, forcing a refetch on next access.

    Args:
        category: The category to invalidate.

    """
    with self._lock:
        self._stale_legacy_categories.add(category)
        self._legacy_cache_timestamps.pop(category, None)
        logger.debug(f"Invalidated legacy cache for {category}")

fetch_category abstractmethod

fetch_category(
    category: MODEL_REFERENCE_CATEGORY,
    *,
    force_refresh: bool = False,
) -> dict[str, Any] | None

Fetch model reference data for a specific category.

Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    The category to fetch.

  • force_refresh (bool, default: False ) –

    If True, bypass any backend-level caching and fetch fresh data. Defaults to False.

Returns:

  • dict[str, Any] | None

    dict[str, Any] | None: The model reference data as a dictionary mapping model names to their attributes, or None if the category cannot be fetched.

Implementation Requirements
  • Return data as a dictionary: {model_name: {attribute: value, ...}, ...}
  • Return None if category cannot be fetched
  • Honor force_refresh to bypass internal caches
  • Handle errors gracefully (log and return None)
Example Implementation
def fetch_category(self, category, *, force_refresh=False):
    def fetch():
        # Your fetch logic here
        response = httpx.get(f"{self.base_url}/{category}")
        return response.json() if response.status_code == 200 else None

    return self._fetch_with_cache(category, fetch, force_refresh=force_refresh)
See Also
  • fetch_all_categories(): Batch fetching of all categories
  • fetch_category_async(): Async variant
  • [ReplicaBackendBase._fetch_with_cache()] [^^^.replica_backend_base.ReplicaBackendBase._fetch_with_cache]: Helper for cache management
Source code in src/horde_model_reference/backends/base.py
@abstractmethod
def fetch_category(
    self,
    category: MODEL_REFERENCE_CATEGORY,
    *,
    force_refresh: bool = False,
) -> dict[str, Any] | None:
    """Fetch model reference data for a specific category.

    Args:
        category: The category to fetch.
        force_refresh: If True, bypass any backend-level caching
            and fetch fresh data. Defaults to False.

    Returns:
        dict[str, Any] | None: The model reference data as a dictionary mapping
            model names to their attributes, or None if the category cannot be fetched.

    Implementation Requirements:
        - Return data as a dictionary: `{model_name: {attribute: value, ...}, ...}`
        - Return `None` if category cannot be fetched
        - Honor `force_refresh` to bypass internal caches
        - Handle errors gracefully (log and return `None`)

    Example Implementation:
        ```python
        def fetch_category(self, category, *, force_refresh=False):
            def fetch():
                # Your fetch logic here
                response = httpx.get(f"{self.base_url}/{category}")
                return response.json() if response.status_code == 200 else None

            return self._fetch_with_cache(category, fetch, force_refresh=force_refresh)
        ```

    See Also:
        - [fetch_all_categories()][(c).fetch_all_categories]: Batch fetching of all categories
        - [fetch_category_async()][(c).fetch_category_async]: Async variant
        - [ReplicaBackendBase._fetch_with_cache()]
          [^^^.replica_backend_base.ReplicaBackendBase._fetch_with_cache]:
          Helper for cache management

    """

fetch_all_categories abstractmethod

fetch_all_categories(
    *, force_refresh: bool = False
) -> dict[MODEL_REFERENCE_CATEGORY, dict[str, Any] | None]

Fetch model reference data for all categories.

Parameters:

  • force_refresh (bool, default: False ) –

    If True, bypass any backend-level caching and fetch fresh data. Defaults to False.

Returns:

  • dict[MODEL_REFERENCE_CATEGORY, dict[str, Any] | None]

    dict[MODEL_REFERENCE_CATEGORY, dict[str, Any] | None]: A mapping of categories to their model reference data. Categories that cannot be fetched have None values.

Implementation Requirements
  • Return a dictionary mapping each category to its data
  • Use None values for categories that cannot be fetched
  • Typically implemented as a loop over fetch_category()
Example Implementation
def fetch_all_categories(self, *, force_refresh=False):
    return {
        category: self.fetch_category(category, force_refresh=force_refresh)
        for category in MODEL_REFERENCE_CATEGORY
    }
See Also
Source code in src/horde_model_reference/backends/base.py
@abstractmethod
def fetch_all_categories(
    self,
    *,
    force_refresh: bool = False,
) -> dict[MODEL_REFERENCE_CATEGORY, dict[str, Any] | None]:
    """Fetch model reference data for all categories.

    Args:
        force_refresh: If True, bypass any backend-level caching
            and fetch fresh data. Defaults to False.

    Returns:
        dict[MODEL_REFERENCE_CATEGORY, dict[str, Any] | None]: A mapping of categories
            to their model reference data. Categories that cannot be fetched have None values.

    Implementation Requirements:
        - Return a dictionary mapping each category to its data
        - Use `None` values for categories that cannot be fetched
        - Typically implemented as a loop over [fetch_category()][(c).fetch_category]

    Example Implementation:
        ```python
        def fetch_all_categories(self, *, force_refresh=False):
            return {
                category: self.fetch_category(category, force_refresh=force_refresh)
                for category in MODEL_REFERENCE_CATEGORY
            }
        ```

    See Also:
        - [fetch_category()][(c).fetch_category]: Single category fetching
        - [fetch_all_categories_async()][(c).fetch_all_categories_async]: Async variant

    """

fetch_category_async abstractmethod async

fetch_category_async(
    category: MODEL_REFERENCE_CATEGORY,
    *,
    httpx_client: AsyncClient | None = None,
    force_refresh: bool = False,
) -> dict[str, Any] | None

Asynchronously fetch model reference data for a specific category.

Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    The category to fetch.

  • httpx_client (AsyncClient | None, default: None ) –

    An optional httpx async client for connection pooling.

  • force_refresh (bool, default: False ) –

    If True, bypass any backend-level caching. Defaults to False.

Returns:

  • dict[str, Any] | None

    dict[str, Any] | None: The model reference data, or None if fetch failed.

Implementation Requirements
  • Use async I/O where possible (network requests, file operations with aiofiles)
  • Accept optional httpx_client for connection pooling
  • Create temporary client if not provided
  • Same return format as synchronous version
  • Share cache with synchronous methods when using ReplicaBackendBase
Example Implementation
async def fetch_category_async(self, category, *, httpx_client=None, force_refresh=False):
    close_client = httpx_client is None
    if httpx_client is None:
        httpx_client = httpx.AsyncClient()

    try:
        response = await httpx_client.get(f"{self.base_url}/{category}")
        return response.json() if response.status_code == 200 else None
    finally:
        if close_client:
            await httpx_client.aclose()
See Also
Source code in src/horde_model_reference/backends/base.py
@abstractmethod
async def fetch_category_async(
    self,
    category: MODEL_REFERENCE_CATEGORY,
    *,
    httpx_client: httpx.AsyncClient | None = None,
    force_refresh: bool = False,
) -> dict[str, Any] | None:
    """Asynchronously fetch model reference data for a specific category.

    Args:
        category: The category to fetch.
        httpx_client: An optional httpx async client for connection pooling.
        force_refresh: If True, bypass any backend-level caching. Defaults to False.

    Returns:
        dict[str, Any] | None: The model reference data, or None if fetch failed.

    Implementation Requirements:
        - Use async I/O where possible (network requests, file operations with aiofiles)
        - Accept optional `httpx_client` for connection pooling
        - Create temporary client if not provided
        - Same return format as synchronous version
        - Share cache with synchronous methods when using
          [ReplicaBackendBase][^^^.replica_backend_base.ReplicaBackendBase]

    Example Implementation:
        ```python
        async def fetch_category_async(self, category, *, httpx_client=None, force_refresh=False):
            close_client = httpx_client is None
            if httpx_client is None:
                httpx_client = httpx.AsyncClient()

            try:
                response = await httpx_client.get(f"{self.base_url}/{category}")
                return response.json() if response.status_code == 200 else None
            finally:
                if close_client:
                    await httpx_client.aclose()
        ```

    See Also:
        - [fetch_category()][(c).fetch_category]: Synchronous variant
        - [fetch_all_categories_async()][(c).fetch_all_categories_async]: Async batch fetching

    """

fetch_all_categories_async abstractmethod async

fetch_all_categories_async(
    *,
    httpx_client: AsyncClient | None = None,
    force_refresh: bool = False,
) -> dict[MODEL_REFERENCE_CATEGORY, dict[str, Any] | None]

Asynchronously fetch model reference data for all categories.

Parameters:

  • httpx_client (AsyncClient | None, default: None ) –

    An optional httpx async client for connection pooling.

  • force_refresh (bool, default: False ) –

    If True, bypass any backend-level caching. Defaults to False.

Returns:

Implementation Requirements
  • Use asyncio.gather() for concurrent fetching when possible
  • Share httpx_client across fetches for connection pooling
  • Same return format as synchronous version
Example Implementation
async def fetch_all_categories_async(self, *, httpx_client=None, force_refresh=False):
    close_client = httpx_client is None
    if httpx_client is None:
        httpx_client = httpx.AsyncClient()

    try:
        tasks = [
            self.fetch_category_async(cat, httpx_client=httpx_client, force_refresh=force_refresh)
            for cat in MODEL_REFERENCE_CATEGORY
        ]
        results = await asyncio.gather(*tasks)
        return dict(zip(MODEL_REFERENCE_CATEGORY, results, strict=False))
    finally:
        if close_client:
            await httpx_client.aclose()
See Also
Source code in src/horde_model_reference/backends/base.py
@abstractmethod
async def fetch_all_categories_async(
    self,
    *,
    httpx_client: httpx.AsyncClient | None = None,
    force_refresh: bool = False,
) -> dict[MODEL_REFERENCE_CATEGORY, dict[str, Any] | None]:
    """Asynchronously fetch model reference data for all categories.

    Args:
        httpx_client: An optional httpx async client for connection pooling.
        force_refresh: If True, bypass any backend-level caching. Defaults to False.

    Returns:
        dict[MODEL_REFERENCE_CATEGORY, dict[str, Any] | None]: A mapping of categories to their data.

    Implementation Requirements:
        - Use `asyncio.gather()` for concurrent fetching when possible
        - Share `httpx_client` across fetches for connection pooling
        - Same return format as synchronous version

    Example Implementation:
        ```python
        async def fetch_all_categories_async(self, *, httpx_client=None, force_refresh=False):
            close_client = httpx_client is None
            if httpx_client is None:
                httpx_client = httpx.AsyncClient()

            try:
                tasks = [
                    self.fetch_category_async(cat, httpx_client=httpx_client, force_refresh=force_refresh)
                    for cat in MODEL_REFERENCE_CATEGORY
                ]
                results = await asyncio.gather(*tasks)
                return dict(zip(MODEL_REFERENCE_CATEGORY, results, strict=False))
            finally:
                if close_client:
                    await httpx_client.aclose()
        ```

    See Also:
        - [fetch_all_categories()][(c).fetch_all_categories]: Synchronous variant
        - [fetch_category_async()][(c).fetch_category_async]: Async single category fetch

    """

register_invalidation_callback

register_invalidation_callback(
    callback: Callable[[MODEL_REFERENCE_CATEGORY], None],
) -> None

Register a callback to be called when a category is invalidated.

This allows external components (like ModelReferenceManager) to be notified when cached data becomes stale and needs to be refreshed.

Parameters:

Source code in src/horde_model_reference/backends/base.py
def register_invalidation_callback(
    self,
    callback: Callable[[MODEL_REFERENCE_CATEGORY], None],
) -> None:
    """Register a callback to be called when a category is invalidated.

    This allows external components (like ModelReferenceManager) to be notified
    when cached data becomes stale and needs to be refreshed.

    Args:
        callback: Function to call with the invalidated category.

    """
    self._invalidation_callbacks.append(callback)
    logger.debug(f"Registered invalidation callback: {getattr(callback, '__name__', repr(callback))}")

_notify_invalidation

_notify_invalidation(
    category: MODEL_REFERENCE_CATEGORY,
) -> None

Notify all registered callbacks that a category has been invalidated.

Parameters:

Source code in src/horde_model_reference/backends/base.py
def _notify_invalidation(self, category: MODEL_REFERENCE_CATEGORY) -> None:
    """Notify all registered callbacks that a category has been invalidated.

    Args:
        category: The category that was invalidated.

    """
    for callback in self._invalidation_callbacks:
        try:
            callback(category)
        except Exception as e:
            cb_name = getattr(callback, "__name__", repr(callback))
            logger.error(f"Invalidation callback {cb_name} failed for {category}: {e}")

mark_stale

mark_stale(category: MODEL_REFERENCE_CATEGORY) -> None

Mark a category's data as stale, requiring refresh on next access.

This method calls the backend-specific implementation and then notifies all registered callbacks.

Parameters:

Implementation Note

The base class provides this public implementation. Subclasses should override _mark_stale_impl() instead of this method.

See Also
Source code in src/horde_model_reference/backends/base.py
def mark_stale(self, category: MODEL_REFERENCE_CATEGORY) -> None:
    """Mark a category's data as stale, requiring refresh on next access.

    This method calls the backend-specific implementation and then notifies
    all registered callbacks.

    Args:
        category: The category to mark as stale.

    Implementation Note:
        The base class provides this public implementation. Subclasses should override
        [_mark_stale_impl()][(c)._mark_stale_impl]
        instead of this method.

    See Also:
        - [_mark_stale_impl()][(c)._mark_stale_impl]: Backend-specific staleness tracking
        - [register_invalidation_callback()][(c).register_invalidation_callback]:
          Register callbacks for invalidation events

    """
    self._mark_stale_impl(category)
    self._notify_invalidation(category)

get_category_file_path abstractmethod

get_category_file_path(
    category: MODEL_REFERENCE_CATEGORY,
) -> Path | None

Get the file path for a category's data, if applicable.

Some backends (like file-based ones) have a physical file path associated with each category. Others (like database backends) may return None.

Parameters:

Returns:

  • Path | None

    Path | None: The file path, or None if not applicable for this backend.

Implementation Requirements
  • Return Path object for file-based backends
  • Return None for backends without file storage (HTTP-only, database, etc.)
Example Implementations
# File-based backend
def get_category_file_path(self, category):
    return self.base_path / f"{category.value}.json"

# HTTP-only backend
def get_category_file_path(self, category):
    return None
See Also
Source code in src/horde_model_reference/backends/base.py
@abstractmethod
def get_category_file_path(self, category: MODEL_REFERENCE_CATEGORY) -> Path | None:
    """Get the file path for a category's data, if applicable.

    Some backends (like file-based ones) have a physical file path associated
    with each category. Others (like database backends) may return None.

    Args:
        category: The category to get the path for.

    Returns:
        Path | None: The file path, or None if not applicable for this backend.

    Implementation Requirements:
        - Return `Path` object for file-based backends
        - Return `None` for backends without file storage (HTTP-only, database, etc.)

    Example Implementations:
        ```python
        # File-based backend
        def get_category_file_path(self, category):
            return self.base_path / f"{category.value}.json"

        # HTTP-only backend
        def get_category_file_path(self, category):
            return None
        ```

    See Also:
        - [get_all_category_file_paths()][(c).get_all_category_file_paths]: Get all file paths

    """

get_all_category_file_paths abstractmethod

get_all_category_file_paths() -> dict[
    MODEL_REFERENCE_CATEGORY, Path | None
]

Get file paths for all categories, if applicable.

Returns:

  • dict[MODEL_REFERENCE_CATEGORY, Path | None]

    dict[MODEL_REFERENCE_CATEGORY, Path | None]: Mapping of categories to their file paths. Returns None values for categories without file paths.

Implementation Requirements
  • Return dictionary with all categories
  • Use None values for categories without file paths
  • Typically implemented by iterating over categories and calling get_category_file_path()
Example Implementation
def get_all_category_file_paths(self):
    return {
        category: self.get_category_file_path(category)
        for category in MODEL_REFERENCE_CATEGORY
    }
See Also
Source code in src/horde_model_reference/backends/base.py
@abstractmethod
def get_all_category_file_paths(self) -> dict[MODEL_REFERENCE_CATEGORY, Path | None]:
    """Get file paths for all categories, if applicable.

    Returns:
        dict[MODEL_REFERENCE_CATEGORY, Path | None]: Mapping of categories to their file paths.
            Returns None values for categories without file paths.

    Implementation Requirements:
        - Return dictionary with all categories
        - Use `None` values for categories without file paths
        - Typically implemented by iterating over categories and calling `get_category_file_path()`

    Example Implementation:
        ```python
        def get_all_category_file_paths(self):
            return {
                category: self.get_category_file_path(category)
                for category in MODEL_REFERENCE_CATEGORY
            }
        ```

    See Also:
        - [get_category_file_path()][(c).get_category_file_path]: Get single category file path

    """

get_legacy_json abstractmethod

get_legacy_json(
    category: MODEL_REFERENCE_CATEGORY,
    redownload: bool = False,
) -> dict[str, Any] | None

Get raw legacy JSON for a specific category without pydantic validation.

This method returns cached legacy format JSON data, downloading if needed. The cache is populated during initialization, downloads, and on-demand loads.

Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    Category to retrieve.

  • redownload (bool, default: False ) –

    If True, redownload before returning and refresh cache.

Returns:

  • dict[str, Any] | None

    dict[str, Any] | None: The raw legacy JSON dict, or None if not found.

Implementation Requirements
  • Return legacy format data as dictionary
  • Support caching with optional redownload
  • Return None if not available
  • The redownload parameter is analogous to force_refresh in fetch methods
See Also
Source code in src/horde_model_reference/backends/base.py
@abstractmethod
def get_legacy_json(
    self,
    category: MODEL_REFERENCE_CATEGORY,
    redownload: bool = False,
) -> dict[str, Any] | None:
    """Get raw legacy JSON for a specific category without pydantic validation.

    This method returns cached legacy format JSON data, downloading if needed.
    The cache is populated during initialization, downloads, and on-demand loads.

    Args:
        category: Category to retrieve.
        redownload: If True, redownload before returning and refresh cache.

    Returns:
        dict[str, Any] | None: The raw legacy JSON dict, or None if not found.

    Implementation Requirements:
        - Return legacy format data as dictionary
        - Support caching with optional redownload
        - Return `None` if not available
        - The `redownload` parameter is analogous to `force_refresh` in fetch methods

    See Also:
        - [get_legacy_json_string()][(c).get_legacy_json_string]: Get as string instead of dict

    """

get_legacy_json_string abstractmethod

get_legacy_json_string(
    category: MODEL_REFERENCE_CATEGORY,
    redownload: bool = False,
) -> str | None

Get raw legacy JSON string for a specific category without pydantic validation.

This method returns cached legacy format JSON data as a string, downloading if needed. The cache is populated during initialization, downloads, and on-demand loads.

Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    Category to retrieve.

  • redownload (bool, default: False ) –

    If True, redownload before returning and refresh cache.

Returns:

  • str | None

    str | None: The raw legacy JSON string, or None if not found.

Implementation Requirements
  • Return legacy format data as JSON string
  • Same caching semantics as get_legacy_json()
  • Return None if not available
See Also
Source code in src/horde_model_reference/backends/base.py
@abstractmethod
def get_legacy_json_string(
    self,
    category: MODEL_REFERENCE_CATEGORY,
    redownload: bool = False,
) -> str | None:
    """Get raw legacy JSON string for a specific category without pydantic validation.

    This method returns cached legacy format JSON data as a string, downloading if needed.
    The cache is populated during initialization, downloads, and on-demand loads.

    Args:
        category: Category to retrieve.
        redownload: If True, redownload before returning and refresh cache.

    Returns:
        str | None: The raw legacy JSON string, or None if not found.

    Implementation Requirements:
        - Return legacy format data as JSON string
        - Same caching semantics as [get_legacy_json()][(c).get_legacy_json]
        - Return `None` if not available

    See Also:
        - [get_legacy_json()][(c).get_legacy_json]: Get as dict instead of string

    """

support_any_writes

support_any_writes() -> bool

Check if this backend supports any write operations (v2 or legacy).

Returns:

  • bool ( bool ) –

    True if any write operations are supported, False otherwise.

Source code in src/horde_model_reference/backends/base.py
def support_any_writes(self) -> bool:
    """Check if this backend supports any write operations (v2 or legacy).

    Returns:
        bool: True if any write operations are supported, False otherwise.

    """
    return self.supports_writes() or self.supports_legacy_writes()

supports_writes

supports_writes() -> bool

Check if this backend supports write operations (v2 format).

Write operations include update_model() and delete_model(). Typically only PRIMARY mode backends support writes.

Returns:

  • bool ( bool ) –

    True if write operations are supported, False otherwise.

Source code in src/horde_model_reference/backends/base.py
def supports_writes(self) -> bool:
    """Check if this backend supports write operations (v2 format).

    Write operations include update_model() and delete_model().
    Typically only PRIMARY mode backends support writes.

    Returns:
        bool: True if write operations are supported, False otherwise.

    """
    return False

supports_legacy_writes

supports_legacy_writes() -> bool

Check if this backend supports write operations in legacy format.

Legacy write operations include update_model_legacy() and delete_model_legacy(). Only available when canonical_format='LEGACY' in PRIMARY mode.

Returns:

  • bool ( bool ) –

    True if legacy write operations are supported, False otherwise.

Source code in src/horde_model_reference/backends/base.py
def supports_legacy_writes(self) -> bool:
    """Check if this backend supports write operations in legacy format.

    Legacy write operations include update_model_legacy() and delete_model_legacy().
    Only available when canonical_format='LEGACY' in PRIMARY mode.

    Returns:
        bool: True if legacy write operations are supported, False otherwise.

    """
    return False

supports_cache_warming

supports_cache_warming() -> bool

Check if this backend supports cache warming operations.

Cache warming pre-populates the cache with data to improve initial request performance. Typically only backends with distributed caching (like Redis) support this.

Returns:

  • bool ( bool ) –

    True if cache warming is supported, False otherwise.

Source code in src/horde_model_reference/backends/base.py
def supports_cache_warming(self) -> bool:
    """Check if this backend supports cache warming operations.

    Cache warming pre-populates the cache with data to improve initial request performance.
    Typically only backends with distributed caching (like Redis) support this.

    Returns:
        bool: True if cache warming is supported, False otherwise.

    """
    return False

supports_health_checks

supports_health_checks() -> bool

Check if this backend supports health check operations.

Health checks verify that the backend's external dependencies (Redis, databases, etc.) are accessible and functioning correctly.

Returns:

  • bool ( bool ) –

    True if health checks are supported, False otherwise.

Source code in src/horde_model_reference/backends/base.py
def supports_health_checks(self) -> bool:
    """Check if this backend supports health check operations.

    Health checks verify that the backend's external dependencies (Redis, databases, etc.)
    are accessible and functioning correctly.

    Returns:
        bool: True if health checks are supported, False otherwise.

    """
    return False

supports_statistics

supports_statistics() -> bool

Check if this backend supports statistics retrieval.

Statistics provide insights into backend performance, cache hits/misses, etc.

Returns:

  • bool ( bool ) –

    True if statistics are supported, False otherwise.

Source code in src/horde_model_reference/backends/base.py
def supports_statistics(self) -> bool:
    """Check if this backend supports statistics retrieval.

    Statistics provide insights into backend performance, cache hits/misses, etc.

    Returns:
        bool: True if statistics are supported, False otherwise.

    """
    return False

update_model

update_model(
    category: MODEL_REFERENCE_CATEGORY,
    model_name: str,
    record_dict: dict[str, Any],
    *,
    logical_user_id: str | None = None,
    request_id: str | None = None,
) -> None

Update or create a model reference.

This is an optional method that write-capable backends can implement. Read-only backends should leave the default implementation which raises NotImplementedError.

Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    The category to update.

  • model_name (str) –

    The name of the model to update or create.

  • record_dict (dict[str, Any]) –

    The model record data as a dictionary.

Parameters:

  • logical_user_id (str | None, default: None ) –

    Immutable Horde user id for auditing contexts (optional).

  • request_id (str | None, default: None ) –

    Optional tracing/idempotency identifier for audit correlation.

Raises:

Implementation Requirements
  • Create model if it doesn't exist
  • Update model if it exists
  • Ensure atomic writes (use temp files with rename for file-based backends)
  • Call mark_stale() after successful write to invalidate cache
  • Override supports_writes() to return True
See Also
Source code in src/horde_model_reference/backends/base.py
def update_model(
    self,
    category: MODEL_REFERENCE_CATEGORY,
    model_name: str,
    record_dict: dict[str, Any],
    *,
    logical_user_id: str | None = None,
    request_id: str | None = None,
) -> None:
    """Update or create a model reference.

    This is an optional method that write-capable backends can implement.
    Read-only backends should leave the default implementation which raises NotImplementedError.

    Args:
        category: The category to update.
        model_name: The name of the model to update or create.
        record_dict: The model record data as a dictionary.

    Args:
        logical_user_id: Immutable Horde user id for auditing contexts (optional).
        request_id: Optional tracing/idempotency identifier for audit correlation.

    Raises:
        NotImplementedError: If the backend does not support write operations.

    Implementation Requirements:
        - Create model if it doesn't exist
        - Update model if it exists
        - Ensure atomic writes (use temp files with rename for file-based backends)
        - Call [mark_stale()][(c).mark_stale] after successful write to invalidate cache
        - Override [supports_writes()][(c).supports_writes] to return `True`

    See Also:
        - [update_model_from_base_model()][(c).update_model_from_base_model]:
          Update from pydantic model (automatically provided)
        - [delete_model()][(c).delete_model]: Delete a model
        - [supports_writes()][(c).supports_writes]: Feature detection method

    """
    raise NotImplementedError(f"{self.__class__.__name__} does not support write operations")

update_model_from_base_model

update_model_from_base_model(
    category: MODEL_REFERENCE_CATEGORY,
    model_name: str,
    record_model: BaseModel,
    *,
    logical_user_id: str | None = None,
    request_id: str | None = None,
) -> None

Update or create a model reference from a pydantic BaseModel.

This is an optional method that write-capable backends can implement. Read-only backends should leave the default implementation which raises NotImplementedError.

Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    The category to update.

  • model_name (str) –

    The name of the model to update or create.

  • record_model (BaseModel) –

    The model record data as a pydantic BaseModel.

  • logical_user_id (str | None, default: None ) –

    Immutable Horde user id for auditing contexts (optional).

  • request_id (str | None, default: None ) –

    Optional tracing/idempotency identifier for audit correlation.

Raises:

Implementation Note

The base class provides this implementation automatically. It: 1. Checks supports_writes() returns True 2. Converts the pydantic model to dict using model_dump(exclude_unset=True) 3. Calls update_model() with the dictionary

Backends that support writes typically don't need to override this method.

See Also
Source code in src/horde_model_reference/backends/base.py
def update_model_from_base_model(
    self,
    category: MODEL_REFERENCE_CATEGORY,
    model_name: str,
    record_model: BaseModel,
    *,
    logical_user_id: str | None = None,
    request_id: str | None = None,
) -> None:
    """Update or create a model reference from a pydantic BaseModel.

    This is an optional method that write-capable backends can implement.
    Read-only backends should leave the default implementation which raises NotImplementedError.

    Args:
        category: The category to update.
        model_name: The name of the model to update or create.
        record_model: The model record data as a pydantic BaseModel.
        logical_user_id: Immutable Horde user id for auditing contexts (optional).
        request_id: Optional tracing/idempotency identifier for audit correlation.

    Raises:
        NotImplementedError: If the backend does not support write operations.

    Implementation Note:
        The base class provides this implementation automatically. It:
        1. Checks [supports_writes()][(c).supports_writes] returns `True`
        2. Converts the pydantic model to dict using `model_dump(exclude_unset=True)`
        3. Calls [update_model()][(c).update_model] with the dictionary

        Backends that support writes typically don't need to override this method.

    See Also:
        - [update_model()][(c).update_model]: Update from dictionary (implement this)
        - [supports_writes()][(c).supports_writes]: Feature detection method

    """
    if not self.supports_writes():
        raise NotImplementedError(f"{self.__class__.__name__} does not support write operations")

    record_dict = record_model.model_dump(exclude_unset=True)
    self.update_model(
        category,
        model_name,
        record_dict,
        logical_user_id=logical_user_id,
        request_id=request_id,
    )

delete_model

delete_model(
    category: MODEL_REFERENCE_CATEGORY,
    model_name: str,
    *,
    logical_user_id: str | None = None,
    request_id: str | None = None,
) -> None

Delete a model reference.

This is an optional method that write-capable backends can implement. Read-only backends should leave the default implementation which raises NotImplementedError.

Parameters:

Parameters:

  • logical_user_id (str | None, default: None ) –

    Immutable Horde user id for auditing contexts (optional).

  • request_id (str | None, default: None ) –

    Optional tracing/idempotency identifier for audit correlation.

Raises:

Implementation Requirements
  • Raise KeyError if model doesn't exist
  • Ensure atomic writes
  • Call mark_stale() after successful delete to invalidate cache
  • Override supports_writes() to return True
See Also
Source code in src/horde_model_reference/backends/base.py
def delete_model(
    self,
    category: MODEL_REFERENCE_CATEGORY,
    model_name: str,
    *,
    logical_user_id: str | None = None,
    request_id: str | None = None,
) -> None:
    """Delete a model reference.

    This is an optional method that write-capable backends can implement.
    Read-only backends should leave the default implementation which raises NotImplementedError.

    Args:
        category: The category containing the model.
        model_name: The name of the model to delete.

    Args:
        logical_user_id: Immutable Horde user id for auditing contexts (optional).
        request_id: Optional tracing/idempotency identifier for audit correlation.

    Raises:
        NotImplementedError: If the backend does not support write operations.
        KeyError: If the model doesn't exist.

    Implementation Requirements:
        - Raise `KeyError` if model doesn't exist
        - Ensure atomic writes
        - Call [mark_stale()][(c).mark_stale] after successful delete to invalidate cache
        - Override [supports_writes()][(c).supports_writes] to return `True`

    See Also:
        - [update_model()][(c).update_model]: Update or create a model
        - [supports_writes()][(c).supports_writes]: Feature detection method

    """
    raise NotImplementedError(f"{self.__class__.__name__} does not support write operations")

update_model_legacy

update_model_legacy(
    category: MODEL_REFERENCE_CATEGORY,
    model_name: str,
    record_dict: dict[str, Any],
    *,
    logical_user_id: str | None = None,
    request_id: str | None = None,
) -> None

Update or create a model reference in legacy format.

This is an optional method that legacy-write-capable backends can implement. Only available when canonical_format='LEGACY' in PRIMARY mode.

Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    The category to update.

  • model_name (str) –

    The name of the model to update or create.

  • record_dict (dict[str, Any]) –

    The model record data in legacy format as a dictionary.

  • logical_user_id (str | None, default: None ) –

    Immutable Horde user id for auditing contexts (optional).

  • request_id (str | None, default: None ) –

    Optional tracing/idempotency identifier for audit correlation.

Raises:

Source code in src/horde_model_reference/backends/base.py
def update_model_legacy(
    self,
    category: MODEL_REFERENCE_CATEGORY,
    model_name: str,
    record_dict: dict[str, Any],
    *,
    logical_user_id: str | None = None,
    request_id: str | None = None,
) -> None:
    """Update or create a model reference in legacy format.

    This is an optional method that legacy-write-capable backends can implement.
    Only available when canonical_format='LEGACY' in PRIMARY mode.

    Args:
        category: The category to update.
        model_name: The name of the model to update or create.
        record_dict: The model record data in legacy format as a dictionary.
        logical_user_id: Immutable Horde user id for auditing contexts (optional).
        request_id: Optional tracing/idempotency identifier for audit correlation.

    Raises:
        NotImplementedError: If the backend does not support legacy write operations.

    """
    raise NotImplementedError(f"{self.__class__.__name__} does not support legacy write operations")

update_model_legacy_from_base_model

update_model_legacy_from_base_model(
    category: MODEL_REFERENCE_CATEGORY,
    model_name: str,
    record_model: BaseModel,
    *,
    logical_user_id: str | None = None,
    request_id: str | None = None,
) -> None

Update or create a model reference in legacy format from a pydantic BaseModel.

This is an optional method that legacy-write-capable backends can implement. Only available when canonical_format='LEGACY' in PRIMARY mode.

Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    The category to update.

  • model_name (str) –

    The name of the model to update or create.

  • record_model (BaseModel) –

    The model record data as a pydantic BaseModel.

  • logical_user_id (str | None, default: None ) –

    Immutable Horde user id for auditing contexts (optional).

  • request_id (str | None, default: None ) –

    Optional tracing/idempotency identifier for audit correlation.

Raises:

Source code in src/horde_model_reference/backends/base.py
def update_model_legacy_from_base_model(
    self,
    category: MODEL_REFERENCE_CATEGORY,
    model_name: str,
    record_model: BaseModel,
    *,
    logical_user_id: str | None = None,
    request_id: str | None = None,
) -> None:
    """Update or create a model reference in legacy format from a pydantic BaseModel.

    This is an optional method that legacy-write-capable backends can implement.
    Only available when canonical_format='LEGACY' in PRIMARY mode.

    Args:
        category: The category to update.
        model_name: The name of the model to update or create.
        record_model: The model record data as a pydantic BaseModel.
        logical_user_id: Immutable Horde user id for auditing contexts (optional).
        request_id: Optional tracing/idempotency identifier for audit correlation.

    Raises:
        NotImplementedError: If the backend does not support legacy write operations.

    """
    if not self.supports_legacy_writes():
        raise NotImplementedError(f"{self.__class__.__name__} does not support legacy write operations")

    record_dict = record_model.model_dump(exclude_unset=True)
    self.update_model_legacy(
        category,
        model_name,
        record_dict,
        logical_user_id=logical_user_id,
        request_id=request_id,
    )

delete_model_legacy

delete_model_legacy(
    category: MODEL_REFERENCE_CATEGORY,
    model_name: str,
    *,
    logical_user_id: str | None = None,
    request_id: str | None = None,
) -> None

Delete a model reference from legacy format files.

This is an optional method that legacy-write-capable backends can implement. Only available when canonical_format='LEGACY' in PRIMARY mode.

Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    The category containing the model.

  • model_name (str) –

    The name of the model to delete.

  • logical_user_id (str | None, default: None ) –

    Immutable Horde user id for auditing contexts (optional).

  • request_id (str | None, default: None ) –

    Optional tracing/idempotency identifier for audit correlation.

Raises:

Source code in src/horde_model_reference/backends/base.py
def delete_model_legacy(
    self,
    category: MODEL_REFERENCE_CATEGORY,
    model_name: str,
    *,
    logical_user_id: str | None = None,
    request_id: str | None = None,
) -> None:
    """Delete a model reference from legacy format files.

    This is an optional method that legacy-write-capable backends can implement.
    Only available when canonical_format='LEGACY' in PRIMARY mode.

    Args:
        category: The category containing the model.
        model_name: The name of the model to delete.
        logical_user_id: Immutable Horde user id for auditing contexts (optional).
        request_id: Optional tracing/idempotency identifier for audit correlation.

    Raises:
        NotImplementedError: If the backend does not support legacy write operations.

    """
    raise NotImplementedError(f"{self.__class__.__name__} does not support legacy write operations")

warm_cache

warm_cache() -> None

Pre-populate cache with all categories for faster initial requests.

This is an optional method that backends with cache warming support can implement. Backends without cache warming should leave the default implementation.

Raises:

Source code in src/horde_model_reference/backends/base.py
def warm_cache(self) -> None:
    """Pre-populate cache with all categories for faster initial requests.

    This is an optional method that backends with cache warming support can implement.
    Backends without cache warming should leave the default implementation.

    Raises:
        NotImplementedError: If the backend does not support cache warming.

    """
    raise NotImplementedError(f"{self.__class__.__name__} does not support cache warming")

warm_cache_async async

warm_cache_async() -> None

Asynchronously pre-populate cache with all categories for faster initial requests.

This is an optional method that backends with cache warming support can implement. Backends without cache warming should leave the default implementation.

Raises:

Source code in src/horde_model_reference/backends/base.py
async def warm_cache_async(self) -> None:
    """Asynchronously pre-populate cache with all categories for faster initial requests.

    This is an optional method that backends with cache warming support can implement.
    Backends without cache warming should leave the default implementation.

    Raises:
        NotImplementedError: If the backend does not support async cache warming.

    """
    raise NotImplementedError(f"{self.__class__.__name__} does not support async cache warming")

health_check

health_check() -> bool

Check the health of the backend's external dependencies.

This is an optional method that backends with health check support can implement. Backends without external dependencies should leave the default implementation.

Returns:

  • bool ( bool ) –

    True if healthy, False otherwise.

Raises:

Source code in src/horde_model_reference/backends/base.py
def health_check(self) -> bool:
    """Check the health of the backend's external dependencies.

    This is an optional method that backends with health check support can implement.
    Backends without external dependencies should leave the default implementation.

    Returns:
        bool: True if healthy, False otherwise.

    Raises:
        NotImplementedError: If the backend does not support health checks.

    """
    raise NotImplementedError(f"{self.__class__.__name__} does not support health checks")

get_statistics

get_statistics() -> dict[str, Any]

Get backend performance and usage statistics.

This is an optional method that backends with statistics support can implement. The structure of returned statistics is backend-specific.

Returns:

  • dict[str, Any]

    dict[str, Any]: Backend-specific statistics.

Raises:

Source code in src/horde_model_reference/backends/base.py
def get_statistics(self) -> dict[str, Any]:
    """Get backend performance and usage statistics.

    This is an optional method that backends with statistics support can implement.
    The structure of returned statistics is backend-specific.

    Returns:
        dict[str, Any]: Backend-specific statistics.

    Raises:
        NotImplementedError: If the backend does not support statistics.

    """
    raise NotImplementedError(f"{self.__class__.__name__} does not support statistics")

get_replicate_mode

get_replicate_mode() -> ReplicateMode

Get the replication mode of this backend.

Returns:

  • ReplicateMode ( ReplicateMode ) –

    The replicate mode (PRIMARY or REPLICA).

Source code in src/horde_model_reference/backends/base.py
def get_replicate_mode(self) -> ReplicateMode:
    """Get the replication mode of this backend.

    Returns:
        ReplicateMode: The replicate mode (PRIMARY or REPLICA).

    """
    return self._replicate_mode

supports_metadata

supports_metadata() -> bool

Check if this backend supports metadata tracking.

Metadata tracking records operation counts, timestamps, and health metrics for both legacy (v1) and v2 format operations. Typically only PRIMARY mode backends support metadata tracking.

Returns:

  • bool ( bool ) –

    True if metadata tracking is supported, False otherwise.

Source code in src/horde_model_reference/backends/base.py
def supports_metadata(self) -> bool:
    """Check if this backend supports metadata tracking.

    Metadata tracking records operation counts, timestamps, and health metrics
    for both legacy (v1) and v2 format operations. Typically only PRIMARY mode
    backends support metadata tracking.

    Returns:
        bool: True if metadata tracking is supported, False otherwise.

    """
    return False

get_legacy_metadata

get_legacy_metadata(
    category: MODEL_REFERENCE_CATEGORY,
) -> CategoryMetadata

Get legacy format metadata for a specific category.

This is an optional method that metadata-capable backends can implement. Backends without metadata support should leave the default implementation.

Parameters:

Returns:

Raises:

Source code in src/horde_model_reference/backends/base.py
def get_legacy_metadata(self, category: MODEL_REFERENCE_CATEGORY) -> CategoryMetadata:
    """Get legacy format metadata for a specific category.

    This is an optional method that metadata-capable backends can implement.
    Backends without metadata support should leave the default implementation.

    Args:
        category: The category to get metadata for.

    Returns:
        CategoryMetadata: The legacy metadata.

    Raises:
        NotImplementedError: If the backend does not support metadata.

    """
    raise NotImplementedError(f"{self.__class__.__name__} does not support metadata tracking")

get_legacy_metadata_async async

get_legacy_metadata_async(
    category: MODEL_REFERENCE_CATEGORY,
) -> CategoryMetadata

Asynchronously get legacy format metadata for a specific category.

This is an optional method that metadata-capable backends can implement. Backends without metadata support should leave the default implementation.

Parameters:

Returns:

Raises:

Source code in src/horde_model_reference/backends/base.py
async def get_legacy_metadata_async(self, category: MODEL_REFERENCE_CATEGORY) -> CategoryMetadata:
    """Asynchronously get legacy format metadata for a specific category.

    This is an optional method that metadata-capable backends can implement.
    Backends without metadata support should leave the default implementation.

    Args:
        category: The category to get metadata for.

    Returns:
        CategoryMetadata: The legacy metadata.

    Raises:
        NotImplementedError: If the backend does not support metadata.

    """
    raise NotImplementedError(f"{self.__class__.__name__} does not support async metadata tracking")

get_metadata

get_metadata(
    category: MODEL_REFERENCE_CATEGORY,
) -> CategoryMetadata

Get v2 format metadata for a specific category.

This is an optional method that metadata-capable backends can implement. Backends without metadata support should leave the default implementation.

Parameters:

Returns:

Raises:

Source code in src/horde_model_reference/backends/base.py
def get_metadata(self, category: MODEL_REFERENCE_CATEGORY) -> CategoryMetadata:
    """Get v2 format metadata for a specific category.

    This is an optional method that metadata-capable backends can implement.
    Backends without metadata support should leave the default implementation.

    Args:
        category: The category to get metadata for.

    Returns:
        CategoryMetadata: The v2 metadata.

    Raises:
        NotImplementedError: If the backend does not support metadata.

    """
    raise NotImplementedError(f"{self.__class__.__name__} does not support metadata tracking")

get_metadata_async async

get_metadata_async(
    category: MODEL_REFERENCE_CATEGORY,
) -> CategoryMetadata

Asynchronously get v2 format metadata for a specific category.

This is an optional method that metadata-capable backends can implement. Backends without metadata support should leave the default implementation.

Parameters:

Returns:

Raises:

Source code in src/horde_model_reference/backends/base.py
async def get_metadata_async(self, category: MODEL_REFERENCE_CATEGORY) -> CategoryMetadata:
    """Asynchronously get v2 format metadata for a specific category.

    This is an optional method that metadata-capable backends can implement.
    Backends without metadata support should leave the default implementation.

    Args:
        category: The category to get metadata for.

    Returns:
        CategoryMetadata: The v2 metadata.

    Raises:
        NotImplementedError: If the backend does not support metadata.

    """
    raise NotImplementedError(f"{self.__class__.__name__} does not support async metadata tracking")

get_all_legacy_metadata

get_all_legacy_metadata() -> dict[
    MODEL_REFERENCE_CATEGORY, CategoryMetadata
]

Get legacy format metadata for all categories.

This is an optional method that metadata-capable backends can implement. Backends without metadata support should leave the default implementation.

Returns:

Raises:

Source code in src/horde_model_reference/backends/base.py
def get_all_legacy_metadata(self) -> dict[MODEL_REFERENCE_CATEGORY, CategoryMetadata]:
    """Get legacy format metadata for all categories.

    This is an optional method that metadata-capable backends can implement.
    Backends without metadata support should leave the default implementation.

    Returns:
        dict[MODEL_REFERENCE_CATEGORY, CategoryMetadata]: Mapping of categories to their legacy metadata.

    Raises:
        NotImplementedError: If the backend does not support metadata.

    """
    raise NotImplementedError(f"{self.__class__.__name__} does not support metadata tracking")

get_all_legacy_metadata_async async

get_all_legacy_metadata_async() -> dict[
    MODEL_REFERENCE_CATEGORY, CategoryMetadata
]

Asynchronously get legacy format metadata for all categories.

This is an optional method that metadata-capable backends can implement. Backends without metadata support should leave the default implementation.

Returns:

Raises:

Source code in src/horde_model_reference/backends/base.py
async def get_all_legacy_metadata_async(self) -> dict[MODEL_REFERENCE_CATEGORY, CategoryMetadata]:
    """Asynchronously get legacy format metadata for all categories.

    This is an optional method that metadata-capable backends can implement.
    Backends without metadata support should leave the default implementation.

    Returns:
        dict[MODEL_REFERENCE_CATEGORY, CategoryMetadata]: Mapping of categories to their legacy metadata.

    Raises:
        NotImplementedError: If the backend does not support metadata.

    """
    raise NotImplementedError(f"{self.__class__.__name__} does not support async metadata tracking")

get_all_metadata

get_all_metadata() -> dict[
    MODEL_REFERENCE_CATEGORY, CategoryMetadata
]

Get v2 format metadata for all categories.

This is an optional method that metadata-capable backends can implement. Backends without metadata support should leave the default implementation.

Returns:

Raises:

Source code in src/horde_model_reference/backends/base.py
def get_all_metadata(self) -> dict[MODEL_REFERENCE_CATEGORY, CategoryMetadata]:
    """Get v2 format metadata for all categories.

    This is an optional method that metadata-capable backends can implement.
    Backends without metadata support should leave the default implementation.

    Returns:
        dict[MODEL_REFERENCE_CATEGORY, CategoryMetadata]: Mapping of categories to their v2 metadata.

    Raises:
        NotImplementedError: If the backend does not support metadata.

    """
    raise NotImplementedError(f"{self.__class__.__name__} does not support metadata tracking")

get_all_metadata_async async

get_all_metadata_async() -> dict[
    MODEL_REFERENCE_CATEGORY, CategoryMetadata
]

Asynchronously get v2 format metadata for all categories.

This is an optional method that metadata-capable backends can implement. Backends without metadata support should leave the default implementation.

Returns:

Raises:

Source code in src/horde_model_reference/backends/base.py
async def get_all_metadata_async(self) -> dict[MODEL_REFERENCE_CATEGORY, CategoryMetadata]:
    """Asynchronously get v2 format metadata for all categories.

    This is an optional method that metadata-capable backends can implement.
    Backends without metadata support should leave the default implementation.

    Returns:
        dict[MODEL_REFERENCE_CATEGORY, CategoryMetadata]: Mapping of categories to their v2 metadata.

    Raises:
        NotImplementedError: If the backend does not support metadata.

    """
    raise NotImplementedError(f"{self.__class__.__name__} does not support async metadata tracking")