Skip to content

http_backend

HTTP backend for REPLICA mode.

This backend fetches model references from the PRIMARY server's API, with fallback to GitHub if the PRIMARY is unavailable.

HTTPBackend

Bases: ReplicaBackendBase

Backend that fetches model references from PRIMARY API with GitHub fallback.

This backend is designed for REPLICA mode. It:

1. Attempts to fetch from the PRIMARY server's HTTP API first
2. Falls back to GitHub if PRIMARY is unavailable
3. Caches responses locally with a TTL
4. Only works in REPLICA mode

This provides the best of both worlds:

- Fast, up-to-date data from PRIMARY when available
- Resilience via GitHub fallback when PRIMARY is down

Source code in src/horde_model_reference/backends/http_backend.py
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
class HTTPBackend(ReplicaBackendBase):
    """Backend that fetches model references from PRIMARY API with GitHub fallback.

    This backend is designed for REPLICA mode. It:
    1. Attempts to fetch from PRIMARY server's HTTP API first
    2. Falls back to GitHub if PRIMARY is unavailable
    3. Caches responses locally with TTL
    4. Only works in REPLICA mode

    This provides the best of both worlds:
    - Fast, up-to-date data from PRIMARY when available
    - Resilience via GitHub fallback when PRIMARY is down
    """

    def __init__(
        self,
        *,
        primary_api_url: str,
        github_backend: GitHubBackend,
        cache_ttl_seconds: int = 60,
        timeout_seconds: int = horde_model_reference_settings.primary_api_timeout,
        retry_max_attempts: int = 3,
        retry_backoff_seconds: float = 1.0,
        enable_github_fallback: bool = horde_model_reference_settings.enable_github_fallback,
    ) -> None:
        """Set up a REPLICA-mode backend that queries PRIMARY and can fall back to GitHub.

        Args:
            primary_api_url: Base URL of the PRIMARY server API
                (e.g., "https://models.aihorde.net/"); trailing slashes are stripped.
            github_backend: REPLICA-mode GitHub backend consulted when PRIMARY yields no data.
            cache_ttl_seconds: Lifetime of locally cached entries, in seconds.
            timeout_seconds: Per-request HTTP timeout, in seconds.
            retry_max_attempts: Upper bound on attempts against the PRIMARY API.
            retry_backoff_seconds: Minimum wait passed to the retry helper between attempts.
            enable_github_fallback: Whether a PRIMARY miss should be retried against GitHub.

        Raises:
            ValueError: If ``github_backend`` is not in REPLICA mode.

        """
        fallback_mode = github_backend.replicate_mode
        if fallback_mode != ReplicateMode.REPLICA:
            raise ValueError("HTTPBackend requires a GitHubBackend in REPLICA mode as fallback")

        super().__init__(mode=ReplicateMode.REPLICA, cache_ttl_seconds=cache_ttl_seconds)

        # Request and retry configuration for calls against PRIMARY.
        self._primary_api_url = primary_api_url.rstrip("/")
        self._timeout_seconds = timeout_seconds
        self._retry_max_attempts = retry_max_attempts
        self._retry_backoff_seconds = retry_backoff_seconds

        # Fallback wiring.
        self._github_backend = github_backend
        self._enable_github_fallback = enable_github_fallback

        # Counters reported by get_statistics().
        self._github_fallbacks = 0
        self._primary_hits = 0

        logger.debug(f"HTTPBackend initialized with PRIMARY at {self._primary_api_url}")

    def _category_api_url(self, category: MODEL_REFERENCE_CATEGORY) -> str:
        """Build the v2 PRIMARY endpoint URL that serves ``category``."""
        base_url = self._primary_api_url
        slug = category.value
        return f"{base_url}/model_references/v2/{slug}"

    def _legacy_category_api_url(self, category: MODEL_REFERENCE_CATEGORY) -> str:
        """Build the legacy (v1) PRIMARY endpoint URL that serves ``category``."""
        base_url = self._primary_api_url
        slug = category.value
        return f"{base_url}/model_references/v1/{slug}"

    def _fetch_from_primary(self, category: MODEL_REFERENCE_CATEGORY) -> dict[str, Any] | None:
        """Fetch a category from the PRIMARY API with retries (synchronous).

        Args:
            category: The category whose v2 reference is requested.

        Returns:
            The decoded JSON payload on HTTP 200. None when PRIMARY answers 404 or
            another non-retryable status, sends a body that is not valid JSON, or
            still fails after the configured attempts. Callers use None as the
            signal to try the GitHub fallback.

        """
        url = self._category_api_url(category)

        try:
            for attempt in http_retry_sync(
                max_attempts=self._retry_max_attempts, min_wait=self._retry_backoff_seconds
            ):
                with attempt:
                    response = httpx.get(url, timeout=self._timeout_seconds)

                    if response.status_code == 404:
                        logger.debug(f"PRIMARY API returned 404 for {category}")
                        return None
                    if is_retryable_status_code(response.status_code):
                        # Raised inside the attempt so the retry helper can schedule another try.
                        raise RetryableHTTPStatusError(response)
                    if response.status_code != 200:
                        logger.warning(f"PRIMARY API returned {response.status_code} for {category}")
                        return None

                    try:
                        data: dict[str, Any] = response.json()
                    except ValueError:
                        # json.JSONDecodeError subclasses ValueError. A malformed body must not
                        # escape, otherwise the caller never reaches the GitHub fallback.
                        logger.warning(f"PRIMARY API returned invalid JSON for {category}")
                        return None

                    logger.info(f"Fetched {category} from PRIMARY API")
                    self._primary_hits += 1
                    return data
        except (RetryError, RetryableHTTPStatusError, httpx.RequestError):
            # NOTE(review): httpx.RequestError is included in case the retry helper re-raises the
            # underlying transport error instead of wrapping it in RetryError — confirm.
            logger.warning(f"Failed to fetch {category} from PRIMARY after {self._retry_max_attempts} attempts")
        return None

    def _fetch_legacy_from_primary(
        self,
        category: MODEL_REFERENCE_CATEGORY,
    ) -> tuple[dict[str, Any] | None, str | None]:
        """Fetch legacy JSON from the PRIMARY API with retries (synchronous).

        Args:
            category: The category whose legacy (v1) reference is requested.

        Returns:
            ``(legacy_dict, legacy_string)`` on HTTP 200, where the string is the raw
            response text and the dict is its decoded form. ``(None, None)`` when
            PRIMARY answers 404 or another non-retryable status, sends a body that is
            not valid JSON, or still fails after the configured attempts.

        """
        url = self._legacy_category_api_url(category)

        try:
            for attempt in http_retry_sync(
                max_attempts=self._retry_max_attempts, min_wait=self._retry_backoff_seconds
            ):
                with attempt:
                    response = httpx.get(url, timeout=self._timeout_seconds)

                    if response.status_code == 404:
                        logger.debug(f"PRIMARY API returned 404 for legacy {category}")
                        return None, None
                    if is_retryable_status_code(response.status_code):
                        # Raised inside the attempt so the retry helper can schedule another try.
                        raise RetryableHTTPStatusError(response)
                    if response.status_code != 200:
                        logger.warning(f"PRIMARY API returned {response.status_code} for legacy {category}")
                        return None, None

                    legacy_string = response.text
                    try:
                        legacy_dict: dict[str, Any] = response.json()
                    except ValueError:
                        # json.JSONDecodeError subclasses ValueError. Report a miss for both
                        # values so dict and string never disagree and fallback can proceed.
                        logger.warning(f"PRIMARY API returned invalid JSON for legacy {category}")
                        return None, None

                    logger.info(f"Fetched legacy {category} from PRIMARY API")
                    self._primary_hits += 1
                    return legacy_dict, legacy_string
        except (RetryError, RetryableHTTPStatusError, httpx.RequestError):
            # NOTE(review): httpx.RequestError is included in case the retry helper re-raises the
            # underlying transport error instead of wrapping it in RetryError — confirm.
            logger.warning(f"Failed to fetch legacy {category} from PRIMARY after {self._retry_max_attempts} attempts")
        return None, None

    async def _fetch_from_primary_async(
        self,
        category: MODEL_REFERENCE_CATEGORY,
        client: httpx.AsyncClient,
    ) -> dict[str, Any] | None:
        """Fetch a category from the PRIMARY API with retries (asynchronous).

        Args:
            category: The category whose v2 reference is requested.
            client: The async HTTP client used to issue the request.

        Returns:
            The decoded JSON payload on HTTP 200. None when PRIMARY answers 404 or
            another non-retryable status, sends a body that is not valid JSON, or
            still fails after the configured attempts.

        """
        url = self._category_api_url(category)

        try:
            async for attempt in http_retry_async(
                max_attempts=self._retry_max_attempts, min_wait=self._retry_backoff_seconds
            ):
                with attempt:
                    response = await client.get(url, timeout=self._timeout_seconds)

                    if response.status_code == 404:
                        logger.debug(f"PRIMARY API returned 404 for {category}")
                        return None
                    if is_retryable_status_code(response.status_code):
                        # Raised inside the attempt so the retry helper can schedule another try.
                        raise RetryableHTTPStatusError(response)
                    if response.status_code != 200:
                        logger.warning(f"PRIMARY API returned {response.status_code} for {category}")
                        return None

                    try:
                        data: dict[str, Any] = response.json()
                    except ValueError:
                        # json.JSONDecodeError subclasses ValueError. A malformed body must not
                        # escape, otherwise the caller never reaches the GitHub fallback.
                        logger.warning(f"PRIMARY API returned invalid JSON for {category}")
                        return None

                    logger.info(f"Fetched {category} from PRIMARY API (async)")
                    self._primary_hits += 1
                    return data
        except (RetryError, RetryableHTTPStatusError, httpx.RequestError):
            # NOTE(review): httpx.RequestError is included in case the retry helper re-raises the
            # underlying transport error instead of wrapping it in RetryError — confirm.
            logger.warning(f"Failed to fetch {category} from PRIMARY async after {self._retry_max_attempts} attempts")
        return None

    async def _fetch_legacy_from_primary_async(
        self,
        category: MODEL_REFERENCE_CATEGORY,
        client: httpx.AsyncClient,
    ) -> tuple[dict[str, Any] | None, str | None]:
        """Fetch legacy JSON from the PRIMARY API with retries (asynchronous).

        Args:
            category: The category whose legacy (v1) reference is requested.
            client: The async HTTP client used to issue the request.

        Returns:
            ``(legacy_dict, legacy_string)`` on HTTP 200, where the string is the raw
            response text and the dict is its decoded form. ``(None, None)`` when
            PRIMARY answers 404 or another non-retryable status, sends a body that is
            not valid JSON, or still fails after the configured attempts.

        """
        url = self._legacy_category_api_url(category)

        try:
            async for attempt in http_retry_async(
                max_attempts=self._retry_max_attempts, min_wait=self._retry_backoff_seconds
            ):
                with attempt:
                    response = await client.get(url, timeout=self._timeout_seconds)

                    if response.status_code == 404:
                        logger.debug(f"PRIMARY API returned 404 for legacy {category}")
                        return None, None
                    if is_retryable_status_code(response.status_code):
                        # Raised inside the attempt so the retry helper can schedule another try.
                        raise RetryableHTTPStatusError(response)
                    if response.status_code != 200:
                        logger.warning(f"PRIMARY API returned {response.status_code} for legacy {category}")
                        return None, None

                    legacy_string = response.text
                    try:
                        legacy_dict: dict[str, Any] = response.json()
                    except ValueError:
                        # json.JSONDecodeError subclasses ValueError. Report a miss for both
                        # values so dict and string never disagree and fallback can proceed.
                        logger.warning(f"PRIMARY API returned invalid JSON for legacy {category}")
                        return None, None

                    logger.info(f"Fetched legacy {category} from PRIMARY API (async)")
                    self._primary_hits += 1
                    return legacy_dict, legacy_string
        except (RetryError, RetryableHTTPStatusError, httpx.RequestError):
            # NOTE(review): httpx.RequestError is included in case the retry helper re-raises the
            # underlying transport error instead of wrapping it in RetryError — confirm.
            logger.warning(
                f"Failed to fetch legacy {category} from PRIMARY async after {self._retry_max_attempts} attempts"
            )
        return None, None

    @override
    def fetch_category(
        self,
        category: MODEL_REFERENCE_CATEGORY,
        *,
        force_refresh: bool = False,
    ) -> dict[str, Any] | None:
        """Return data for one category, trying PRIMARY first and GitHub second.

        Args:
            category: The category to fetch.
            force_refresh: If True, skip the local cache and refetch.

        Returns:
            The model reference data, or None if no source produced any.

        """
        # Serve the cached copy unless a refresh is forced or the cache helper asks for a fetch.
        if not force_refresh and not self.should_fetch_data(category):
            return self._get_from_cache(category)

        payload = self._fetch_from_primary(category)

        primary_missed = payload is None
        if primary_missed and self._enable_github_fallback:
            logger.info(f"Falling back to GitHub for {category}")
            self._github_fallbacks += 1
            payload = self._github_backend.fetch_category(category, force_refresh=force_refresh)

        # Only successful results are written to the cache.
        if payload is None:
            return None

        self._store_in_cache(category, payload)
        return payload

    @override
    def fetch_all_categories(
        self,
        *,
        force_refresh: bool = False,
    ) -> dict[MODEL_REFERENCE_CATEGORY, dict[str, Any] | None]:
        """Fetch every category in turn via ``fetch_category``.

        Args:
            force_refresh: Forwarded unchanged to each ``fetch_category`` call.

        Returns:
            A mapping from each category to its data, or to None where nothing was found.

        """
        return {
            category: self.fetch_category(category, force_refresh=force_refresh)
            for category in MODEL_REFERENCE_CATEGORY
        }

    @override
    async def fetch_category_async(
        self,
        category: MODEL_REFERENCE_CATEGORY,
        *,
        httpx_client: httpx.AsyncClient | None = None,
        force_refresh: bool = False,
    ) -> dict[str, Any] | None:
        """Asynchronously return data for one category, trying PRIMARY first and GitHub second.

        Args:
            category: The category to fetch.
            httpx_client: Optional client to reuse; a temporary one is created for
                the PRIMARY request when omitted.
            force_refresh: If True, skip the local cache and refetch.

        Returns:
            The model reference data, or None if no source produced any.

        """
        # Serve the cached copy unless a refresh is forced or the cache helper asks for a fetch.
        if not force_refresh and not self.should_fetch_data(category):
            return self._get_from_cache(category)

        if httpx_client is None:
            logger.debug("Creating temporary httpx.AsyncClient for fetch_category_async")
            async with httpx.AsyncClient() as temp_client:
                payload = await self._fetch_from_primary_async(category, temp_client)
        else:
            payload = await self._fetch_from_primary_async(category, httpx_client)

        if payload is None and self._enable_github_fallback:
            logger.info(f"Falling back to GitHub for {category} (async)")
            self._github_fallbacks += 1
            # The caller's client (possibly None) is forwarded, not the temporary one.
            payload = await self._github_backend.fetch_category_async(
                category,
                httpx_client=httpx_client,
                force_refresh=force_refresh,
            )

        # Only successful results are written to the cache.
        if payload is None:
            return None

        self._store_in_cache(category, payload)
        return payload

    @override
    async def fetch_all_categories_async(
        self,
        *,
        httpx_client: httpx.AsyncClient | None = None,
        force_refresh: bool = False,
    ) -> dict[MODEL_REFERENCE_CATEGORY, dict[str, Any] | None]:
        """Fetch every category concurrently via ``fetch_category_async``.

        Args:
            httpx_client: Optional client forwarded to each per-category fetch.
            force_refresh: Forwarded unchanged to each per-category fetch.

        Returns:
            A mapping from each category to its data, or to None where nothing was found.

        """
        import asyncio

        categories = list(MODEL_REFERENCE_CATEGORY)
        pending = [
            self.fetch_category_async(
                category,
                httpx_client=httpx_client,
                force_refresh=force_refresh,
            )
            for category in categories
        ]

        # gather() returns results in the order the awaitables were passed in.
        fetched = await asyncio.gather(*pending)

        return dict(zip(categories, fetched, strict=False))

    @override
    def get_category_file_path(self, category: MODEL_REFERENCE_CATEGORY) -> Path | None:
        """Return the GitHub backend's file path for ``category``."""
        fallback_backend = self._github_backend
        return fallback_backend.get_category_file_path(category)

    @override
    def get_all_category_file_paths(self) -> dict[MODEL_REFERENCE_CATEGORY, Path | None]:
        """Return the GitHub backend's file paths for all categories."""
        fallback_backend = self._github_backend
        return fallback_backend.get_all_category_file_paths()

    @override
    def get_legacy_json(
        self,
        category: MODEL_REFERENCE_CATEGORY,
        redownload: bool = False,
    ) -> dict[str, Any] | None:
        """Return the legacy JSON dict for a category, trying PRIMARY first and GitHub second.

        Args:
            category: The category to fetch.
            redownload: If True, skip the cache lookup and refetch.

        Returns:
            The legacy JSON as a dict, or None if no source produced it.

        """
        if not redownload:
            cached_dict, _ = self._get_legacy_from_cache(category)
            if cached_dict is not None:
                return cached_dict

        legacy_dict, legacy_string = self._fetch_legacy_from_primary(category)

        if legacy_dict is None and self._enable_github_fallback:
            logger.info(f"Falling back to GitHub for legacy {category}")
            self._github_fallbacks += 1
            legacy_dict = self._github_backend.get_legacy_json(category, redownload=redownload)
            if legacy_dict is not None:
                # Ask GitHub for the matching string form too, without forcing another download.
                legacy_string = self._github_backend.get_legacy_json_string(category, redownload=False)

        nothing_fetched = legacy_dict is None and legacy_string is None
        if not nothing_fetched:
            self._store_legacy_in_cache(category, legacy_dict, legacy_string)

        return legacy_dict

    @override
    def get_legacy_json_string(
        self,
        category: MODEL_REFERENCE_CATEGORY,
        redownload: bool = False,
    ) -> str | None:
        """Return the legacy JSON text for a category, trying PRIMARY first and GitHub second.

        Args:
            category: The category to fetch.
            redownload: If True, skip the cache lookup and refetch.

        Returns:
            The legacy JSON as a string, or None if no source produced it.

        """
        if not redownload:
            _, cached_string = self._get_legacy_from_cache(category)
            if cached_string is not None:
                return cached_string

        legacy_dict, legacy_string = self._fetch_legacy_from_primary(category)

        if legacy_string is None and self._enable_github_fallback:
            logger.info(f"Falling back to GitHub for legacy {category} string")
            self._github_fallbacks += 1
            legacy_string = self._github_backend.get_legacy_json_string(category, redownload=redownload)
            need_dict = legacy_string is not None and legacy_dict is None
            if need_dict:
                # Ask GitHub for the matching dict form too, without forcing another download.
                legacy_dict = self._github_backend.get_legacy_json(category, redownload=False)

        nothing_fetched = legacy_dict is None and legacy_string is None
        if not nothing_fetched:
            self._store_legacy_in_cache(category, legacy_dict, legacy_string)

        return legacy_string

    @override
    def get_statistics(self) -> dict[str, Any]:
        """Report usage counters for this backend.

        Returns:
            dict with the keys:
                - primary_hits: Count of successful PRIMARY API fetches
                - github_fallbacks: Count of GitHub fallback attempts
                - cache_size: Number of entries in the local category cache

        """
        stats: dict[str, Any] = {}
        stats["primary_hits"] = self._primary_hits
        stats["github_fallbacks"] = self._github_fallbacks
        stats["cache_size"] = len(self._cache)
        return stats

_primary_api_url instance-attribute

_primary_api_url = primary_api_url.rstrip("/")

_github_backend instance-attribute

_github_backend = github_backend

_timeout_seconds instance-attribute

_timeout_seconds = timeout_seconds

_retry_max_attempts instance-attribute

_retry_max_attempts = retry_max_attempts

_retry_backoff_seconds instance-attribute

_retry_backoff_seconds = retry_backoff_seconds

_enable_github_fallback instance-attribute

_enable_github_fallback = enable_github_fallback

_primary_hits instance-attribute

_primary_hits = 0

_github_fallbacks instance-attribute

_github_fallbacks = 0

_replicate_mode class-attribute instance-attribute

_replicate_mode = mode

_invalidation_callbacks instance-attribute

_invalidation_callbacks: list[
    Callable[[MODEL_REFERENCE_CATEGORY], None]
] = []

replicate_mode property

replicate_mode: ReplicateMode

Get the replicate mode of this backend.

_cache_ttl_seconds instance-attribute

_cache_ttl_seconds = cache_ttl_seconds

_cache instance-attribute

_cache: dict[
    MODEL_REFERENCE_CATEGORY, dict[str, Any] | None
] = {}

_category_timestamps instance-attribute

_category_timestamps: dict[
    MODEL_REFERENCE_CATEGORY, float
] = {}

_last_known_mtimes instance-attribute

_last_known_mtimes: dict[
    MODEL_REFERENCE_CATEGORY, float
] = {}

_stale_categories instance-attribute

_stale_categories: set[MODEL_REFERENCE_CATEGORY] = set()

_legacy_json_cache instance-attribute

_legacy_json_cache: dict[
    MODEL_REFERENCE_CATEGORY, dict[str, Any] | None
] = {}

_legacy_json_string_cache instance-attribute

_legacy_json_string_cache: dict[
    MODEL_REFERENCE_CATEGORY, str | None
] = {}

_legacy_cache_timestamps instance-attribute

_legacy_cache_timestamps: dict[
    MODEL_REFERENCE_CATEGORY, float
] = {}

_legacy_last_known_mtimes instance-attribute

_legacy_last_known_mtimes: dict[
    MODEL_REFERENCE_CATEGORY, float
] = {}

_stale_legacy_categories instance-attribute

_stale_legacy_categories: set[MODEL_REFERENCE_CATEGORY] = (
    set()
)

_lock instance-attribute

_lock = RLock()

_async_lock instance-attribute

_async_lock: Lock = Lock()

cache_ttl_seconds property

cache_ttl_seconds: int | None

The cache TTL currently enforced for category payloads.

lock property

lock: RLock

Thread-safe lock shared by subclasses for critical sections.

async_lock property

async_lock: Lock | None

Asyncio lock usable by subclasses when coordinating coroutines.

__init__

__init__(
    *,
    primary_api_url: str,
    github_backend: GitHubBackend,
    cache_ttl_seconds: int = 60,
    timeout_seconds: int = horde_model_reference_settings.primary_api_timeout,
    retry_max_attempts: int = 3,
    retry_backoff_seconds: float = 1.0,
    enable_github_fallback: bool = horde_model_reference_settings.enable_github_fallback,
) -> None

Initialize HTTP backend with GitHub fallback.

Parameters:

  • primary_api_url (str) –

    Base URL of PRIMARY server API (e.g., "https://models.aihorde.net/")

  • github_backend (GitHubBackend) –

    GitHub backend to use as fallback

  • cache_ttl_seconds (int, default: 60 ) –

    TTL for local cache in seconds

  • timeout_seconds (int, default: primary_api_timeout ) –

    HTTP request timeout in seconds

  • retry_max_attempts (int, default: 3 ) –

    Max retry attempts for PRIMARY API

  • retry_backoff_seconds (float, default: 1.0 ) –

    Backoff time between retries

  • enable_github_fallback (bool, default: enable_github_fallback ) –

    Whether to fallback to GitHub if PRIMARY fails

Raises:

  • ValueError

    If github_backend is not REPLICA mode

Source code in src/horde_model_reference/backends/http_backend.py
def __init__(
    self,
    *,
    primary_api_url: str,
    github_backend: GitHubBackend,
    cache_ttl_seconds: int = 60,
    timeout_seconds: int = horde_model_reference_settings.primary_api_timeout,
    retry_max_attempts: int = 3,
    retry_backoff_seconds: float = 1.0,
    enable_github_fallback: bool = horde_model_reference_settings.enable_github_fallback,
) -> None:
    """Set up a REPLICA-mode backend that queries PRIMARY and can fall back to GitHub.

    Args:
        primary_api_url: Base URL of the PRIMARY server API
            (e.g., "https://models.aihorde.net/"); trailing slashes are stripped.
        github_backend: REPLICA-mode GitHub backend used as the fallback source.
        cache_ttl_seconds: Lifetime of locally cached entries, in seconds.
        timeout_seconds: Per-request HTTP timeout, in seconds.
        retry_max_attempts: Upper bound on attempts against the PRIMARY API.
        retry_backoff_seconds: Backoff time between retries.
        enable_github_fallback: Whether to fall back to GitHub if PRIMARY fails.

    Raises:
        ValueError: If ``github_backend`` is not in REPLICA mode.

    """
    fallback_mode = github_backend.replicate_mode
    if fallback_mode != ReplicateMode.REPLICA:
        raise ValueError("HTTPBackend requires a GitHubBackend in REPLICA mode as fallback")

    super().__init__(mode=ReplicateMode.REPLICA, cache_ttl_seconds=cache_ttl_seconds)

    # Request and retry configuration for calls against PRIMARY.
    self._primary_api_url = primary_api_url.rstrip("/")
    self._timeout_seconds = timeout_seconds
    self._retry_max_attempts = retry_max_attempts
    self._retry_backoff_seconds = retry_backoff_seconds

    # Fallback wiring.
    self._github_backend = github_backend
    self._enable_github_fallback = enable_github_fallback

    # Usage counters, both starting at zero.
    self._github_fallbacks = 0
    self._primary_hits = 0

    logger.debug(f"HTTPBackend initialized with PRIMARY at {self._primary_api_url}")

_category_api_url

_category_api_url(
    category: MODEL_REFERENCE_CATEGORY,
) -> str

Get the PRIMARY API URL for a category.

Source code in src/horde_model_reference/backends/http_backend.py
def _category_api_url(self, category: MODEL_REFERENCE_CATEGORY) -> str:
    """Build the v2 PRIMARY endpoint URL that serves ``category``."""
    base_url = self._primary_api_url
    slug = category.value
    return f"{base_url}/model_references/v2/{slug}"

_legacy_category_api_url

_legacy_category_api_url(
    category: MODEL_REFERENCE_CATEGORY,
) -> str

Get the legacy PRIMARY API URL for a category.

Source code in src/horde_model_reference/backends/http_backend.py
def _legacy_category_api_url(self, category: MODEL_REFERENCE_CATEGORY) -> str:
    """Build the legacy (v1) PRIMARY endpoint URL that serves ``category``."""
    base_url = self._primary_api_url
    slug = category.value
    return f"{base_url}/model_references/v1/{slug}"

_fetch_from_primary

_fetch_from_primary(
    category: MODEL_REFERENCE_CATEGORY,
) -> dict[str, Any] | None

Fetch from PRIMARY API with retries (synchronous).

Source code in src/horde_model_reference/backends/http_backend.py
def _fetch_from_primary(self, category: MODEL_REFERENCE_CATEGORY) -> dict[str, Any] | None:
    """Fetch a category from the PRIMARY API with retries (synchronous).

    Args:
        category: The category whose v2 reference is requested.

    Returns:
        The decoded JSON payload on HTTP 200. None when PRIMARY answers 404 or
        another non-retryable status, sends a body that is not valid JSON, or
        still fails after the configured attempts.

    """
    url = self._category_api_url(category)

    try:
        for attempt in http_retry_sync(
            max_attempts=self._retry_max_attempts, min_wait=self._retry_backoff_seconds
        ):
            with attempt:
                response = httpx.get(url, timeout=self._timeout_seconds)

                if response.status_code == 404:
                    logger.debug(f"PRIMARY API returned 404 for {category}")
                    return None
                if is_retryable_status_code(response.status_code):
                    # Raised inside the attempt so the retry helper can schedule another try.
                    raise RetryableHTTPStatusError(response)
                if response.status_code != 200:
                    logger.warning(f"PRIMARY API returned {response.status_code} for {category}")
                    return None

                try:
                    data: dict[str, Any] = response.json()
                except ValueError:
                    # json.JSONDecodeError subclasses ValueError. A malformed body is
                    # reported as a miss instead of escaping as an exception.
                    logger.warning(f"PRIMARY API returned invalid JSON for {category}")
                    return None

                logger.info(f"Fetched {category} from PRIMARY API")
                self._primary_hits += 1
                return data
    except (RetryError, RetryableHTTPStatusError, httpx.RequestError):
        # NOTE(review): httpx.RequestError is included in case the retry helper re-raises the
        # underlying transport error instead of wrapping it in RetryError — confirm.
        logger.warning(f"Failed to fetch {category} from PRIMARY after {self._retry_max_attempts} attempts")
    return None

_fetch_legacy_from_primary

_fetch_legacy_from_primary(
    category: MODEL_REFERENCE_CATEGORY,
) -> tuple[dict[str, Any] | None, str | None]

Fetch legacy JSON from PRIMARY API with retries (synchronous).

Returns:

  • tuple[dict[str, Any] | None, str | None]

    tuple[dict | None, str | None]: (legacy_dict, legacy_string) or (None, None) on failure

Source code in src/horde_model_reference/backends/http_backend.py
def _fetch_legacy_from_primary(
    self,
    category: MODEL_REFERENCE_CATEGORY,
) -> tuple[dict[str, Any] | None, str | None]:
    """Fetch legacy JSON from the PRIMARY API with retries (synchronous).

    Args:
        category: The category whose legacy (v1) reference is requested.

    Returns:
        ``(legacy_dict, legacy_string)`` on HTTP 200, where the string is the raw
        response text and the dict is its decoded form. ``(None, None)`` when
        PRIMARY answers 404 or another non-retryable status, sends a body that is
        not valid JSON, or still fails after the configured attempts.

    """
    url = self._legacy_category_api_url(category)

    try:
        for attempt in http_retry_sync(
            max_attempts=self._retry_max_attempts, min_wait=self._retry_backoff_seconds
        ):
            with attempt:
                response = httpx.get(url, timeout=self._timeout_seconds)

                if response.status_code == 404:
                    logger.debug(f"PRIMARY API returned 404 for legacy {category}")
                    return None, None
                if is_retryable_status_code(response.status_code):
                    # Raised inside the attempt so the retry helper can schedule another try.
                    raise RetryableHTTPStatusError(response)
                if response.status_code != 200:
                    logger.warning(f"PRIMARY API returned {response.status_code} for legacy {category}")
                    return None, None

                legacy_string = response.text
                try:
                    legacy_dict: dict[str, Any] = response.json()
                except ValueError:
                    # json.JSONDecodeError subclasses ValueError. Report a miss for both
                    # values so dict and string never disagree.
                    logger.warning(f"PRIMARY API returned invalid JSON for legacy {category}")
                    return None, None

                logger.info(f"Fetched legacy {category} from PRIMARY API")
                self._primary_hits += 1
                return legacy_dict, legacy_string
    except (RetryError, RetryableHTTPStatusError, httpx.RequestError):
        # NOTE(review): httpx.RequestError is included in case the retry helper re-raises the
        # underlying transport error instead of wrapping it in RetryError — confirm.
        logger.warning(f"Failed to fetch legacy {category} from PRIMARY after {self._retry_max_attempts} attempts")
    return None, None

_fetch_from_primary_async async

_fetch_from_primary_async(
    category: MODEL_REFERENCE_CATEGORY, client: AsyncClient
) -> dict[str, Any] | None

Fetch from PRIMARY API with retries (asynchronous).

Source code in src/horde_model_reference/backends/http_backend.py
async def _fetch_from_primary_async(
    self,
    category: MODEL_REFERENCE_CATEGORY,
    client: httpx.AsyncClient,
) -> dict[str, Any] | None:
    """Fetch a category from the PRIMARY API with retries (asynchronous).

    Args:
        category: The category whose v2 reference is requested.
        client: The async HTTP client used to issue the request.

    Returns:
        The decoded JSON payload on HTTP 200. None when PRIMARY answers 404 or
        another non-retryable status, sends a body that is not valid JSON, or
        still fails after the configured attempts.

    """
    url = self._category_api_url(category)

    try:
        async for attempt in http_retry_async(
            max_attempts=self._retry_max_attempts, min_wait=self._retry_backoff_seconds
        ):
            with attempt:
                response = await client.get(url, timeout=self._timeout_seconds)

                if response.status_code == 404:
                    logger.debug(f"PRIMARY API returned 404 for {category}")
                    return None
                if is_retryable_status_code(response.status_code):
                    # Raised inside the attempt so the retry helper can schedule another try.
                    raise RetryableHTTPStatusError(response)
                if response.status_code != 200:
                    logger.warning(f"PRIMARY API returned {response.status_code} for {category}")
                    return None

                try:
                    data: dict[str, Any] = response.json()
                except ValueError:
                    # json.JSONDecodeError subclasses ValueError. A malformed body is
                    # reported as a miss instead of escaping as an exception.
                    logger.warning(f"PRIMARY API returned invalid JSON for {category}")
                    return None

                logger.info(f"Fetched {category} from PRIMARY API (async)")
                self._primary_hits += 1
                return data
    except (RetryError, RetryableHTTPStatusError, httpx.RequestError):
        # NOTE(review): httpx.RequestError is included in case the retry helper re-raises the
        # underlying transport error instead of wrapping it in RetryError — confirm.
        logger.warning(f"Failed to fetch {category} from PRIMARY async after {self._retry_max_attempts} attempts")
    return None

_fetch_legacy_from_primary_async async

_fetch_legacy_from_primary_async(
    category: MODEL_REFERENCE_CATEGORY, client: AsyncClient
) -> tuple[dict[str, Any] | None, str | None]

Fetch legacy JSON from PRIMARY API with retries (asynchronous).

Returns:

  • tuple[dict[str, Any] | None, str | None]

    tuple[dict | None, str | None]: (legacy_dict, legacy_string) or (None, None) on failure

Source code in src/horde_model_reference/backends/http_backend.py
async def _fetch_legacy_from_primary_async(
    self,
    category: MODEL_REFERENCE_CATEGORY,
    client: httpx.AsyncClient,
) -> tuple[dict[str, Any] | None, str | None]:
    """Fetch legacy JSON from PRIMARY API with retries (asynchronous).

    Args:
        category: The category whose legacy JSON should be fetched.
        client: The httpx AsyncClient used to issue the request.

    Returns:
        tuple[dict | None, str | None]: (legacy_dict, legacy_string) or (None, None) on failure.
        Failure covers a 404, any other non-200 status, a body that is not valid JSON,
        and exhausting the retry attempts.

    """
    url = self._legacy_category_api_url(category)

    try:
        async for attempt in http_retry_async(
            max_attempts=self._retry_max_attempts, min_wait=self._retry_backoff_seconds
        ):
            with attempt:
                response = await client.get(url, timeout=self._timeout_seconds)

                if response.status_code == 404:
                    logger.debug(f"PRIMARY API returned 404 for legacy {category}")
                    return None, None
                if is_retryable_status_code(response.status_code):
                    # Raising inside the attempt hands the decision to retry back to the retry helper.
                    raise RetryableHTTPStatusError(response)
                if response.status_code != 200:
                    logger.warning(f"PRIMARY API returned {response.status_code} for legacy {category}")
                    return None, None

                legacy_string = response.text
                try:
                    legacy_dict: dict[str, Any] = response.json()
                except ValueError:
                    # A 200 with a non-JSON body is treated as a failed fetch so the caller
                    # can fall back instead of crashing.
                    logger.warning(f"PRIMARY API returned invalid JSON for legacy {category}")
                    return None, None

                logger.info(f"Fetched legacy {category} from PRIMARY API (async)")
                self._primary_hits += 1
                return legacy_dict, legacy_string
    except (RetryError, RetryableHTTPStatusError):
        # Both exception types mean the same thing here: every attempt failed.
        logger.warning(
            f"Failed to fetch legacy {category} from PRIMARY async after {self._retry_max_attempts} attempts"
        )
    return None, None

fetch_category

fetch_category(
    category: MODEL_REFERENCE_CATEGORY,
    *,
    force_refresh: bool = False,
) -> dict[str, Any] | None

Fetch from PRIMARY API, fallback to GitHub if needed.

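Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    The category to fetch

  • force_refresh (bool, default: False ) –

    If True, bypass local cache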

Returns:

  • dict[str, Any] | None

    Model reference data or None

Source code in src/horde_model_reference/backends/http_backend.py
@override
def fetch_category(
    self,
    category: MODEL_REFERENCE_CATEGORY,
    *,
    force_refresh: bool = False,
) -> dict[str, Any] | None:
    """Return a category's data, preferring PRIMARY and falling back to GitHub.

    Args:
        category: The category to fetch
        force_refresh: If True, bypass local cache

    Returns:
        Model reference data or None

    """
    must_fetch = force_refresh or self.should_fetch_data(category)
    if not must_fetch:
        # Nothing to fetch: serve whatever the local cache holds.
        return self._get_from_cache(category)

    fetched = self._fetch_from_primary(category)

    if fetched is None and self._enable_github_fallback:
        logger.info(f"Falling back to GitHub for {category}")
        self._github_fallbacks += 1
        fetched = self._github_backend.fetch_category(category, force_refresh=force_refresh)

    # Only successful fetches are written to the cache.
    if fetched is not None:
        self._store_in_cache(category, fetched)

    return fetched

fetch_all_categories

fetch_all_categories(
    *, force_refresh: bool = False
) -> dict[MODEL_REFERENCE_CATEGORY, dict[str, Any] | None]

Fetch all categories from PRIMARY API with GitHub fallback.

Source code in src/horde_model_reference/backends/http_backend.py
@override
def fetch_all_categories(
    self,
    *,
    force_refresh: bool = False,
) -> dict[MODEL_REFERENCE_CATEGORY, dict[str, Any] | None]:
    """Fetch every category, one at a time, via fetch_category().

    Args:
        force_refresh: Forwarded unchanged to each fetch_category() call.

    Returns:
        A mapping from each MODEL_REFERENCE_CATEGORY member to its data, or None
        where fetch_category() returned None.

    """
    return {
        category: self.fetch_category(category, force_refresh=force_refresh)
        for category in MODEL_REFERENCE_CATEGORY
    }

fetch_category_async async

fetch_category_async(
    category: MODEL_REFERENCE_CATEGORY,
    *,
    httpx_client: AsyncClient | None = None,
    force_refresh: bool = False,
) -> dict[str, Any] | None

Asynchronously fetch from PRIMARY API with GitHub fallback.

Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    The category to fetch

  • httpx_client (AsyncClient | None, default: None ) –

    Optional httpx AsyncClient to use for requests

  • force_refresh (bool, default: False ) –

    If True, bypass local cache

Returns:

  • dict[str, Any] | None

    Model reference data or None

Source code in src/horde_model_reference/backends/http_backend.py
@override
async def fetch_category_async(
    self,
    category: MODEL_REFERENCE_CATEGORY,
    *,
    httpx_client: httpx.AsyncClient | None = None,
    force_refresh: bool = False,
) -> dict[str, Any] | None:
    """Return a category's data asynchronously, preferring PRIMARY over GitHub.

    Args:
        category: The category to fetch
        httpx_client: Optional httpx AsyncClient to use for requests
        force_refresh: If True, bypass local cache

    Returns:
        Model reference data or None

    """
    if not (force_refresh or self.should_fetch_data(category)):
        # Nothing to fetch: serve whatever the local cache holds.
        return self._get_from_cache(category)

    if httpx_client is None:
        # No client supplied: use a short-lived one for the PRIMARY request only.
        logger.debug("Creating temporary httpx.AsyncClient for fetch_category_async")
        async with httpx.AsyncClient() as client:
            fetched = await self._fetch_from_primary_async(category, client)
    else:
        fetched = await self._fetch_from_primary_async(category, httpx_client)

    if fetched is None and self._enable_github_fallback:
        logger.info(f"Falling back to GitHub for {category} (async)")
        self._github_fallbacks += 1
        # The caller's client (possibly None) is forwarded, not the temporary one.
        fetched = await self._github_backend.fetch_category_async(
            category,
            httpx_client=httpx_client,
            force_refresh=force_refresh,
        )

    # Only successful fetches are written to the cache.
    if fetched is not None:
        self._store_in_cache(category, fetched)

    return fetched

fetch_all_categories_async async

fetch_all_categories_async(
    *,
    httpx_client: AsyncClient | None = None,
    force_refresh: bool = False,
) -> dict[MODEL_REFERENCE_CATEGORY, dict[str, Any] | None]

Asynchronously fetch all categories.

Source code in src/horde_model_reference/backends/http_backend.py
@override
async def fetch_all_categories_async(
    self,
    *,
    httpx_client: httpx.AsyncClient | None = None,
    force_refresh: bool = False,
) -> dict[MODEL_REFERENCE_CATEGORY, dict[str, Any] | None]:
    """Fetch every category concurrently via fetch_category_async().

    Args:
        httpx_client: Optional httpx AsyncClient forwarded to each per-category fetch.
        force_refresh: Forwarded unchanged to each per-category fetch.

    Returns:
        A mapping from each MODEL_REFERENCE_CATEGORY member to its data, or None
        where the per-category fetch returned None.

    """
    import asyncio

    categories = list(MODEL_REFERENCE_CATEGORY)

    # gather() returns results in the order the awaitables were passed in,
    # so they line up with `categories` when zipped.
    fetched = await asyncio.gather(
        *(
            self.fetch_category_async(
                category,
                httpx_client=httpx_client,
                force_refresh=force_refresh,
            )
            for category in categories
        )
    )

    return dict(zip(categories, fetched, strict=False))

get_category_file_path

get_category_file_path(
    category: MODEL_REFERENCE_CATEGORY,
) -> Path | None

Get file path (delegates to GitHub backend).

Source code in src/horde_model_reference/backends/http_backend.py
@override
def get_category_file_path(self, category: MODEL_REFERENCE_CATEGORY) -> Path | None:
    """Return the file path the GitHub backend reports for a category.

    Args:
        category: The category to look up.

    Returns:
        Whatever the wrapped GitHub backend returns for this category (a Path or None).

    """
    github_backend = self._github_backend
    return github_backend.get_category_file_path(category)

get_all_category_file_paths

get_all_category_file_paths() -> dict[
    MODEL_REFERENCE_CATEGORY, Path | None
]

Get all file paths (delegates to GitHub backend).

Source code in src/horde_model_reference/backends/http_backend.py
@override
def get_all_category_file_paths(self) -> dict[MODEL_REFERENCE_CATEGORY, Path | None]:
    """Return the per-category file paths reported by the GitHub backend.

    Returns:
        The mapping produced by the wrapped GitHub backend, unchanged.

    """
    github_backend = self._github_backend
    return github_backend.get_all_category_file_paths()

get_legacy_json

get_legacy_json(
    category: MODEL_REFERENCE_CATEGORY,
    redownload: bool = False,
) -> dict[str, Any] | None

Get legacy JSON from PRIMARY API with GitHub fallback.

Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    The category to fetch

  • redownload (bool, default: False ) –

    If True, bypass cache and force refetch

Returns:

  • dict[str, Any] | None

    Legacy JSON dict or None

Source code in src/horde_model_reference/backends/http_backend.py
@override
def get_legacy_json(
    self,
    category: MODEL_REFERENCE_CATEGORY,
    redownload: bool = False,
) -> dict[str, Any] | None:
    """Return the legacy JSON dict for a category (cache, then PRIMARY, then GitHub).

    Args:
        category: The category to fetch
        redownload: If True, bypass cache and force refetch

    Returns:
        Legacy JSON dict or None

    """
    if not redownload:
        cached_dict, _ = self._get_legacy_from_cache(category)
        if cached_dict is not None:
            return cached_dict

    legacy_dict, legacy_string = self._fetch_legacy_from_primary(category)

    if legacy_dict is None and self._enable_github_fallback:
        logger.info(f"Falling back to GitHub for legacy {category}")
        self._github_fallbacks += 1
        legacy_dict = self._github_backend.get_legacy_json(category, redownload=redownload)
        if legacy_dict is not None:
            # The dict was just obtained, so the matching string is requested without redownload.
            legacy_string = self._github_backend.get_legacy_json_string(category, redownload=False)

    # Cache whenever at least one of the two representations is available.
    if not (legacy_dict is None and legacy_string is None):
        self._store_legacy_in_cache(category, legacy_dict, legacy_string)

    return legacy_dict

get_legacy_json_string

get_legacy_json_string(
    category: MODEL_REFERENCE_CATEGORY,
    redownload: bool = False,
) -> str | None

Get legacy JSON string from PRIMARY API with GitHub fallback.

Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    The category to fetch

  • redownload (bool, default: False ) –

    If True, bypass cache and force refetch

Returns:

  • str | None

    Legacy JSON string or None

Source code in src/horde_model_reference/backends/http_backend.py
@override
def get_legacy_json_string(
    self,
    category: MODEL_REFERENCE_CATEGORY,
    redownload: bool = False,
) -> str | None:
    """Return the legacy JSON string for a category (cache, then PRIMARY, then GitHub).

    Args:
        category: The category to fetch
        redownload: If True, bypass cache and force refetch

    Returns:
        Legacy JSON string or None

    """
    if not redownload:
        _, cached_string = self._get_legacy_from_cache(category)
        if cached_string is not None:
            return cached_string

    legacy_dict, legacy_string = self._fetch_legacy_from_primary(category)

    if legacy_string is None and self._enable_github_fallback:
        logger.info(f"Falling back to GitHub for legacy {category} string")
        self._github_fallbacks += 1
        legacy_string = self._github_backend.get_legacy_json_string(category, redownload=redownload)
        if legacy_string is not None and legacy_dict is None:
            # The string was just obtained, so the matching dict is requested without redownload.
            legacy_dict = self._github_backend.get_legacy_json(category, redownload=False)

    # Cache whenever at least one of the two representations is available.
    if not (legacy_dict is None and legacy_string is None):
        self._store_legacy_in_cache(category, legacy_dict, legacy_string)

    return legacy_string

get_statistics

get_statistics() -> dict[str, Any]

Get backend statistics.

Returns:

  • dict[str, Any]

    dict containing: - primary_hits: Number of successful PRIMARY API fetches - github_fallbacks: Number of times GitHub fallback was used - cache_size: Number of categories in local cache

Source code in src/horde_model_reference/backends/http_backend.py
@override
def get_statistics(self) -> dict[str, Any]:
    """Get backend statistics.

    Returns:
        dict containing:
            - primary_hits: Number of successful PRIMARY API fetches
            - github_fallbacks: Number of times GitHub fallback was used
            - cache_size: Number of categories in local cache

    """
    return {
        "primary_hits": self._primary_hits,
        "github_fallbacks": self._github_fallbacks,
        "cache_size": len(self._cache),
    }

needs_refresh

needs_refresh(category: MODEL_REFERENCE_CATEGORY) -> bool
Source code in src/horde_model_reference/backends/replica_backend_base.py
@override
def needs_refresh(self, category: MODEL_REFERENCE_CATEGORY) -> bool:
    """Decide whether already-fetched data for a category should be refreshed.

    Checks, in order: an explicit stale mark, the absence of a fetch timestamp,
    the cache TTL (when one is configured), and the mtime of the category's
    validation file (when one exists).

    Args:
        category: The category to check.

    Returns:
        True if the category is marked stale, its TTL has expired, its file mtime
        differs from the last known value, or the file check raised. False otherwise,
        including when the category has never been fetched.

    """
    # Only the shared stale set and timestamp lookup happen under the lock.
    with self._lock:
        if category in self._stale_categories:
            logger.debug(f"Category {category} marked stale, needs refresh")
            return True

        last_updated = self._category_timestamps.get(category)

    if last_updated is None:
        # No timestamp means nothing has been fetched yet: that is an initial
        # fetch, which callers handle separately, not a refresh.
        logger.debug(f"Category {category} has no timestamp, no refresh needed (not yet fetched)")
        return False

    if self._cache_ttl_seconds is not None and (time.time() - last_updated) > self._cache_ttl_seconds:
        logger.debug(f"Category {category} cache is stale, needs refresh")
        self.mark_stale(category)
        return True

    file_path = self._get_file_path_for_validation(category)
    if not file_path or not file_path.exists():
        return False

    try:
        current_mtime = file_path.stat().st_mtime
        if current_mtime != self._last_known_mtimes.get(category, 0.0):
            logger.debug(f"File {file_path.name} mtime changed, needs refresh")
            self.mark_stale(category)
            return True
    except Exception:
        # Any failure while inspecting the file is treated as "refresh needed".
        return True

    return False

register_invalidation_callback

register_invalidation_callback(
    callback: Callable[[MODEL_REFERENCE_CATEGORY], None],
) -> None

Register a callback to be called when a category is invalidated.

This allows external components (like ModelReferenceManager) to be notified when cached data becomes stale and needs to be refreshed.

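Parameters:

  • callback (Callable[[MODEL_REFERENCE_CATEGORY], None]) –

    Function to call with the invalidated category.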

Source code in src/horde_model_reference/backends/base.py
def register_invalidation_callback(
    self,
    callback: Callable[[MODEL_REFERENCE_CATEGORY], None],
) -> None:
    """Add a callback to the list invoked when a category is invalidated.

    External components (like ModelReferenceManager) can use this to learn
    that cached data has become stale and needs to be refreshed.

    Args:
        callback: Function to call with the invalidated category.

    """
    self._invalidation_callbacks.append(callback)

    # Fall back to repr() for callables without a __name__ (e.g. partials).
    cb_name = getattr(callback, "__name__", repr(callback))
    logger.debug(f"Registered invalidation callback: {cb_name}")

_notify_invalidation

_notify_invalidation(
    category: MODEL_REFERENCE_CATEGORY,
) -> None

Notify all registered callbacks that a category has been invalidated.

Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    The category that was invalidated.

Source code in src/horde_model_reference/backends/base.py
def _notify_invalidation(self, category: MODEL_REFERENCE_CATEGORY) -> None:
    """Invoke every registered invalidation callback for a category.

    A callback that raises is logged and skipped, so the remaining
    callbacks still run.

    Args:
        category: The category that was invalidated.

    """
    registered = self._invalidation_callbacks
    for callback in registered:
        try:
            callback(category)
        except Exception as e:
            # Fall back to repr() for callables without a __name__.
            cb_name = getattr(callback, "__name__", repr(callback))
            logger.error(f"Invalidation callback {cb_name} failed for {category}: {e}")

_mark_stale_impl

_mark_stale_impl(
    category: MODEL_REFERENCE_CATEGORY,
) -> None
Source code in src/horde_model_reference/backends/replica_backend_base.py
@override
def _mark_stale_impl(self, category: MODEL_REFERENCE_CATEGORY) -> None:
    """Record a category in the stale set while holding the backend lock.

    Args:
        category: The category to mark as stale.

    """
    with self._lock:
        stale = self._stale_categories
        stale.add(category)

mark_stale

mark_stale(category: MODEL_REFERENCE_CATEGORY) -> None

Mark a category's data as stale, requiring refresh on next access.

This method calls the backend-specific implementation and then notifies all registered callbacks.

Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    The category to mark as stale.

Implementation Note

The base class provides this public implementation. Subclasses should override _mark_stale_impl() instead of this method.

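See Also

  • _mark_stale_impl(): Backend-specific staleness tracking
  • register_invalidation_callback(): Register callbacks for invalidation events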
Source code in src/horde_model_reference/backends/base.py
def mark_stale(self, category: MODEL_REFERENCE_CATEGORY) -> None:
    """Flag a category as stale and tell registered listeners about it.

    The backend-specific bookkeeping runs first, then every registered
    invalidation callback is notified.

    Args:
        category: The category to mark as stale.

    Implementation Note:
        Subclasses customise staleness tracking by overriding
        [_mark_stale_impl()][(c)._mark_stale_impl], not this public method.

    See Also:
        - [_mark_stale_impl()][(c)._mark_stale_impl]: Backend-specific staleness tracking
        - [register_invalidation_callback()][(c).register_invalidation_callback]:
          Register callbacks for invalidation events

    """
    # Order matters: record staleness before any listener reacts to it.
    self._mark_stale_impl(category)
    self._notify_invalidation(category)

support_any_writes

support_any_writes() -> bool

Check if this backend supports any write operations (v2 or legacy).

Returns:

  • bool ( bool ) –

    True if any write operations are supported, False otherwise.

Source code in src/horde_model_reference/backends/base.py
def support_any_writes(self) -> bool:
    """Report whether either write flavour (v2 or legacy) is supported.

    Returns:
        bool: True if any write operations are supported, False otherwise.

    """
    v2_writes = self.supports_writes()
    # The legacy check is only consulted when v2 writes are unsupported.
    return v2_writes if v2_writes else self.supports_legacy_writes()

supports_writes

supports_writes() -> bool

Check if this backend supports write operations (v2 format).

Write operations include update_model() and delete_model(). Typically only PRIMARY mode backends support writes.

Returns:

  • bool ( bool ) –

    True if write operations are supported, False otherwise.

Source code in src/horde_model_reference/backends/base.py
def supports_writes(self) -> bool:
    """Report whether v2-format write operations are available.

    Write operations are update_model() and delete_model(). This default
    answers False; typically only PRIMARY mode backends support writes.

    Returns:
        bool: True if write operations are supported, False otherwise.

    """
    return False

supports_legacy_writes

supports_legacy_writes() -> bool

Check if this backend supports write operations in legacy format.

Legacy write operations include update_model_legacy() and delete_model_legacy(). Only available when canonical_format='LEGACY' in PRIMARY mode.

Returns:

  • bool ( bool ) –

    True if legacy write operations are supported, False otherwise.

Source code in src/horde_model_reference/backends/base.py
def supports_legacy_writes(self) -> bool:
    """Report whether legacy-format write operations are available.

    Legacy write operations are update_model_legacy() and delete_model_legacy().
    This default answers False; they are only available when
    canonical_format='LEGACY' in PRIMARY mode.

    Returns:
        bool: True if legacy write operations are supported, False otherwise.

    """
    return False

supports_cache_warming

supports_cache_warming() -> bool

Check if this backend supports cache warming operations.

Cache warming pre-populates the cache with data to improve initial request performance. Typically only backends with distributed caching (like Redis) support this.

Returns:

  • bool ( bool ) –

    True if cache warming is supported, False otherwise.

Source code in src/horde_model_reference/backends/base.py
def supports_cache_warming(self) -> bool:
    """Report whether cache warming is available on this backend.

    Cache warming pre-populates the cache with data to improve initial request
    performance. This default answers False; typically only backends with
    distributed caching (like Redis) support it.

    Returns:
        bool: True if cache warming is supported, False otherwise.

    """
    return False

supports_health_checks

supports_health_checks() -> bool

Check if this backend supports health check operations.

Health checks verify that the backend's external dependencies (Redis, databases, etc.) are accessible and functioning correctly.

Returns:

  • bool ( bool ) –

    True if health checks are supported, False otherwise.

Source code in src/horde_model_reference/backends/base.py
def supports_health_checks(self) -> bool:
    """Report whether health checks are available on this backend.

    Health checks verify that the backend's external dependencies (Redis,
    databases, etc.) are accessible and functioning correctly. This default
    answers False.

    Returns:
        bool: True if health checks are supported, False otherwise.

    """
    return False

supports_statistics

supports_statistics() -> bool

Check if this backend supports statistics retrieval.

Statistics provide insights into backend performance, cache hits/misses, etc.

Returns:

  • bool ( bool ) –

    True if statistics are supported, False otherwise.

Source code in src/horde_model_reference/backends/base.py
def supports_statistics(self) -> bool:
    """Report whether statistics retrieval is available on this backend.

    Statistics provide insights into backend performance, cache hits/misses, etc.
    This default answers False.

    Returns:
        bool: True if statistics are supported, False otherwise.

    """
    return False

update_model

update_model(
    category: MODEL_REFERENCE_CATEGORY,
    model_name: str,
    record_dict: dict[str, Any],
    *,
    logical_user_id: str | None = None,
    request_id: str | None = None,
) -> None

Update or create a model reference.

This is an optional method that write-capable backends can implement. Read-only backends should leave the default implementation which raises NotImplementedError.

Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    The category to update.

  • model_name (str) –

    The name of the model to update or create.

  • record_dict (dict[str, Any]) –

    The model record data as a dictionary.

Parameters:

  • logical_user_id (str | None, default: None ) –

    Immutable Horde user id for auditing contexts (optional).

  • request_id (str | None, default: None ) –

    Optional tracing/idempotency identifier for audit correlation.

Raises:

  • NotImplementedError –

    If the backend does not support write operations.

Implementation Requirements
  • Create model if it doesn't exist
  • Update model if it exists
  • Ensure atomic writes (use temp files with rename for file-based backends)
  • Call mark_stale() after successful write to invalidate cache
  • Override supports_writes() to return True
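See Also

  • update_model_from_base_model(): Update from pydantic model (automatically provided)
  • delete_model(): Delete a model
  • supports_writes(): Feature detection method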
Source code in src/horde_model_reference/backends/base.py
def update_model(
    self,
    category: MODEL_REFERENCE_CATEGORY,
    model_name: str,
    record_dict: dict[str, Any],
    *,
    logical_user_id: str | None = None,
    request_id: str | None = None,
) -> None:
    """Update or create a model reference.

    This is an optional method that write-capable backends can implement.
    Read-only backends should leave the default implementation which raises NotImplementedError.

    Args:
        category: The category to update.
        model_name: The name of the model to update or create.
        record_dict: The model record data as a dictionary.
        logical_user_id: Immutable Horde user id for auditing contexts (optional).
        request_id: Optional tracing/idempotency identifier for audit correlation.

    Raises:
        NotImplementedError: If the backend does not support write operations.

    Implementation Requirements:
        - Create model if it doesn't exist
        - Update model if it exists
        - Ensure atomic writes (use temp files with rename for file-based backends)
        - Call [mark_stale()][(c).mark_stale] after successful write to invalidate cache
        - Override [supports_writes()][(c).supports_writes] to return `True`

    See Also:
        - [update_model_from_base_model()][(c).update_model_from_base_model]:
          Update from pydantic model (automatically provided)
        - [delete_model()][(c).delete_model]: Delete a model
        - [supports_writes()][(c).supports_writes]: Feature detection method

    """
    # Default for read-only backends: always refuse.
    raise NotImplementedError(f"{self.__class__.__name__} does not support write operations")

update_model_from_base_model

update_model_from_base_model(
    category: MODEL_REFERENCE_CATEGORY,
    model_name: str,
    record_model: BaseModel,
    *,
    logical_user_id: str | None = None,
    request_id: str | None = None,
) -> None

Update or create a model reference from a pydantic BaseModel.

This is an optional method that write-capable backends can implement. Read-only backends should leave the default implementation which raises NotImplementedError.

Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    The category to update.

  • model_name (str) –

    The name of the model to update or create.

  • record_model (BaseModel) –

    The model record data as a pydantic BaseModel.

  • logical_user_id (str | None, default: None ) –

    Immutable Horde user id for auditing contexts (optional).

  • request_id (str | None, default: None ) –

    Optional tracing/idempotency identifier for audit correlation.

Raises:

  • NotImplementedError –

    If the backend does not support write operations.

Implementation Note

The base class provides this implementation automatically. It: 1. Checks supports_writes() returns True 2. Converts the pydantic model to dict using model_dump(exclude_unset=True) 3. Calls update_model() with the dictionary

Backends that support writes typically don't need to override this method.

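See Also

  • update_model(): Update from dictionary (implement this)
  • supports_writes(): Feature detection method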
Source code in src/horde_model_reference/backends/base.py
def update_model_from_base_model(
    self,
    category: MODEL_REFERENCE_CATEGORY,
    model_name: str,
    record_model: BaseModel,
    *,
    logical_user_id: str | None = None,
    request_id: str | None = None,
) -> None:
    """Update or create a model reference from a pydantic BaseModel.

    Converts the pydantic model to a dictionary and hands it to
    [update_model()][(c).update_model], after confirming the backend
    reports write support.

    Args:
        category: The category to update.
        model_name: The name of the model to update or create.
        record_model: The model record data as a pydantic BaseModel.
        logical_user_id: Immutable Horde user id for auditing contexts (optional).
        request_id: Optional tracing/idempotency identifier for audit correlation.

    Raises:
        NotImplementedError: If the backend does not support write operations.

    Implementation Note:
        The base class provides this implementation automatically. It:
        1. Checks [supports_writes()][(c).supports_writes] returns `True`
        2. Converts the pydantic model to dict using `model_dump(exclude_unset=True)`
        3. Calls [update_model()][(c).update_model] with the dictionary

        Backends that support writes typically don't need to override this method.

    See Also:
        - [update_model()][(c).update_model]: Update from dictionary (implement this)
        - [supports_writes()][(c).supports_writes]: Feature detection method

    """
    if not self.supports_writes():
        raise NotImplementedError(f"{self.__class__.__name__} does not support write operations")

    # exclude_unset=True: only fields that were explicitly set are forwarded.
    self.update_model(
        category,
        model_name,
        record_model.model_dump(exclude_unset=True),
        logical_user_id=logical_user_id,
        request_id=request_id,
    )

delete_model

delete_model(
    category: MODEL_REFERENCE_CATEGORY,
    model_name: str,
    *,
    logical_user_id: str | None = None,
    request_id: str | None = None,
) -> None

Delete a model reference.

This is an optional method that write-capable backends can implement. Read-only backends should leave the default implementation which raises NotImplementedError.

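Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    The category containing the model.

  • model_name (str) –

    The name of the model to delete.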

  • logical_user_id (str | None, default: None ) –

    Immutable Horde user id for auditing contexts (optional).

  • request_id (str | None, default: None ) –

    Optional tracing/idempotency identifier for audit correlation.

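Raises:

  • NotImplementedError –

    If the backend does not support write operations.

  • KeyError –

    If the model doesn't exist.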

Implementation Requirements
  • Raise KeyError if model doesn't exist
  • Ensure atomic writes
  • Call mark_stale() after successful delete to invalidate cache
  • Override supports_writes() to return True
See Also

  • update_model(): Update or create a model
  • supports_writes(): Feature detection method
Source code in src/horde_model_reference/backends/base.py
def delete_model(
    self,
    category: MODEL_REFERENCE_CATEGORY,
    model_name: str,
    *,
    logical_user_id: str | None = None,
    request_id: str | None = None,
) -> None:
    """Delete a model reference.

    This is an optional method that write-capable backends can implement.
    Read-only backends should leave the default implementation which raises NotImplementedError.

    Args:
        category: The category containing the model.
        model_name: The name of the model to delete.
        logical_user_id: Immutable Horde user id for auditing contexts (optional).
        request_id: Optional tracing/idempotency identifier for audit correlation.

    Raises:
        NotImplementedError: If the backend does not support write operations.
        KeyError: If the model doesn't exist.

    Implementation Requirements:
        - Raise `KeyError` if model doesn't exist
        - Ensure atomic writes
        - Call [mark_stale()][(c).mark_stale] after successful delete to invalidate cache
        - Override [supports_writes()][(c).supports_writes] to return `True`

    See Also:
        - [update_model()][(c).update_model]: Update or create a model
        - [supports_writes()][(c).supports_writes]: Feature detection method

    """
    # Default for read-only backends: always refuse.
    raise NotImplementedError(f"{self.__class__.__name__} does not support write operations")

update_model_legacy

update_model_legacy(
    category: MODEL_REFERENCE_CATEGORY,
    model_name: str,
    record_dict: dict[str, Any],
    *,
    logical_user_id: str | None = None,
    request_id: str | None = None,
) -> None

Update or create a model reference in legacy format.

This is an optional method that legacy-write-capable backends can implement. Only available when canonical_format='LEGACY' in PRIMARY mode.

Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    The category to update.

  • model_name (str) –

    The name of the model to update or create.

  • record_dict (dict[str, Any]) –

    The model record data in legacy format as a dictionary.

  • logical_user_id (str | None, default: None ) –

    Immutable Horde user id for auditing contexts (optional).

  • request_id (str | None, default: None ) –

    Optional tracing/idempotency identifier for audit correlation.

Raises:

  • NotImplementedError –

    If the backend does not support legacy write operations.

Source code in src/horde_model_reference/backends/base.py
def update_model_legacy(
    self,
    category: MODEL_REFERENCE_CATEGORY,
    model_name: str,
    record_dict: dict[str, Any],
    *,
    logical_user_id: str | None = None,
    request_id: str | None = None,
) -> None:
    """Create or overwrite a legacy-format model record (optional capability).

    The default implementation writes nothing and always raises. Backends that
    can write legacy files implement this method; it is only available when
    canonical_format='LEGACY' in PRIMARY mode.

    Args:
        category: The category to update.
        model_name: The name of the model to update or create.
        record_dict: The model record data in legacy format as a dictionary.
        logical_user_id: Immutable Horde user id for auditing contexts (optional).
        request_id: Optional tracing/idempotency identifier for audit correlation.

    Raises:
        NotImplementedError: If the backend does not support legacy write operations
            (always, for this default implementation).

    """
    # Name the concrete backend class so the error points at the right implementation.
    unsupported_msg = f"{self.__class__.__name__} does not support legacy write operations"
    raise NotImplementedError(unsupported_msg)

update_model_legacy_from_base_model

update_model_legacy_from_base_model(
    category: MODEL_REFERENCE_CATEGORY,
    model_name: str,
    record_model: BaseModel,
    *,
    logical_user_id: str | None = None,
    request_id: str | None = None,
) -> None

Update or create a model reference in legacy format from a pydantic BaseModel.

This is an optional method that legacy-write-capable backends can implement. Only available when canonical_format='LEGACY' in PRIMARY mode.

Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    The category to update.

  • model_name (str) –

    The name of the model to update or create.

  • record_model (BaseModel) –

    The model record data as a pydantic BaseModel.

  • logical_user_id (str | None, default: None ) –

    Immutable Horde user id for auditing contexts (optional).

  • request_id (str | None, default: None ) –

    Optional tracing/idempotency identifier for audit correlation.

Raises:

  • NotImplementedError –

    If the backend does not support legacy write operations.

Source code in src/horde_model_reference/backends/base.py
def update_model_legacy_from_base_model(
    self,
    category: MODEL_REFERENCE_CATEGORY,
    model_name: str,
    record_model: BaseModel,
    *,
    logical_user_id: str | None = None,
    request_id: str | None = None,
) -> None:
    """Write a legacy-format model record supplied as a pydantic BaseModel.

    Convenience wrapper around `update_model_legacy()`: the model is dumped to a
    dictionary (fields that were never set are omitted) and forwarded unchanged
    together with the audit keyword arguments. Only available when
    canonical_format='LEGACY' in PRIMARY mode.

    Args:
        category: The category to update.
        model_name: The name of the model to update or create.
        record_model: The model record data as a pydantic BaseModel.
        logical_user_id: Immutable Horde user id for auditing contexts (optional).
        request_id: Optional tracing/idempotency identifier for audit correlation.

    Raises:
        NotImplementedError: If the backend does not support legacy write operations.

    """
    if self.supports_legacy_writes():
        # exclude_unset keeps defaults the caller never provided out of the legacy record.
        legacy_payload = record_model.model_dump(exclude_unset=True)
        self.update_model_legacy(
            category,
            model_name,
            legacy_payload,
            logical_user_id=logical_user_id,
            request_id=request_id,
        )
        return

    # Fail before serializing anything when the backend cannot write legacy data.
    raise NotImplementedError(f"{self.__class__.__name__} does not support legacy write operations")

delete_model_legacy

delete_model_legacy(
    category: MODEL_REFERENCE_CATEGORY,
    model_name: str,
    *,
    logical_user_id: str | None = None,
    request_id: str | None = None,
) -> None

Delete a model reference from legacy format files.

This is an optional method that legacy-write-capable backends can implement. Only available when canonical_format='LEGACY' in PRIMARY mode.

Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    The category containing the model.

  • model_name (str) –

    The name of the model to delete.

  • logical_user_id (str | None, default: None ) –

    Immutable Horde user id for auditing contexts (optional).

  • request_id (str | None, default: None ) –

    Optional tracing/idempotency identifier for audit correlation.

Raises:

  • NotImplementedError –

    If the backend does not support legacy write operations.

Source code in src/horde_model_reference/backends/base.py
def delete_model_legacy(
    self,
    category: MODEL_REFERENCE_CATEGORY,
    model_name: str,
    *,
    logical_user_id: str | None = None,
    request_id: str | None = None,
) -> None:
    """Remove a model record from the legacy-format files (optional capability).

    The default implementation deletes nothing and always raises. Backends that
    can write legacy files implement this method; it is only available when
    canonical_format='LEGACY' in PRIMARY mode.

    Args:
        category: The category containing the model.
        model_name: The name of the model to delete.
        logical_user_id: Immutable Horde user id for auditing contexts (optional).
        request_id: Optional tracing/idempotency identifier for audit correlation.

    Raises:
        NotImplementedError: If the backend does not support legacy write operations
            (always, for this default implementation).

    """
    # Name the concrete backend class so the error points at the right implementation.
    unsupported_msg = f"{self.__class__.__name__} does not support legacy write operations"
    raise NotImplementedError(unsupported_msg)

warm_cache

warm_cache() -> None

Pre-populate cache with all categories for faster initial requests.

This is an optional method that backends with cache warming support can implement. Backends without cache warming should leave the default implementation.

Raises:

  • NotImplementedError –

    If the backend does not support cache warming.

Source code in src/horde_model_reference/backends/base.py
def warm_cache(self) -> None:
    """Pre-load every category into the cache so first requests are fast.

    Cache warming is an optional capability. This default implementation warms
    nothing and always raises; backends that support warming implement it.

    Raises:
        NotImplementedError: If the backend does not support cache warming
            (always, for this default implementation).

    """
    unsupported_msg = f"{self.__class__.__name__} does not support cache warming"
    raise NotImplementedError(unsupported_msg)

warm_cache_async async

warm_cache_async() -> None

Asynchronously pre-populate cache with all categories for faster initial requests.

This is an optional method that backends with cache warming support can implement. Backends without cache warming should leave the default implementation.

Raises:

  • NotImplementedError –

    If the backend does not support async cache warming.

Source code in src/horde_model_reference/backends/base.py
async def warm_cache_async(self) -> None:
    """Pre-load every category into the cache from async code.

    Async counterpart of `warm_cache()`. This default implementation warms
    nothing and always raises; backends that support warming implement it.

    Raises:
        NotImplementedError: If the backend does not support async cache warming
            (always, for this default implementation).

    """
    unsupported_msg = f"{self.__class__.__name__} does not support async cache warming"
    raise NotImplementedError(unsupported_msg)

health_check

health_check() -> bool

Check the health of the backend's external dependencies.

This is an optional method that backends with health check support can implement. Backends without external dependencies should leave the default implementation.

Returns:

  • bool ( bool ) –

    True if healthy, False otherwise.

Raises:

  • NotImplementedError –

    If the backend does not support health checks.

Source code in src/horde_model_reference/backends/base.py
def health_check(self) -> bool:
    """Report whether the backend's external dependencies are healthy.

    Health checking is an optional capability. This default implementation
    checks nothing and always raises; backends with external dependencies
    implement it.

    Returns:
        bool: True if healthy, False otherwise.

    Raises:
        NotImplementedError: If the backend does not support health checks
            (always, for this default implementation).

    """
    unsupported_msg = f"{self.__class__.__name__} does not support health checks"
    raise NotImplementedError(unsupported_msg)

get_replicate_mode

get_replicate_mode() -> ReplicateMode

Get the replication mode of this backend.

Returns:

  • ReplicateMode ( ReplicateMode ) –

    The replicate mode (PRIMARY or REPLICA).

Source code in src/horde_model_reference/backends/base.py
def get_replicate_mode(self) -> ReplicateMode:
    """Get the replication mode of this backend.

    Returns:
        ReplicateMode: The replicate mode (PRIMARY or REPLICA), read from the
            instance attribute ``_replicate_mode``.

    """
    # Plain accessor: the stored value is returned as-is, nothing is recomputed.
    return self._replicate_mode

supports_metadata

supports_metadata() -> bool

Check if this backend supports metadata tracking.

Metadata tracking records operation counts, timestamps, and health metrics for both legacy (v1) and v2 format operations. Typically only PRIMARY mode backends support metadata tracking.

Returns:

  • bool ( bool ) –

    True if metadata tracking is supported, False otherwise.

Source code in src/horde_model_reference/backends/base.py
def supports_metadata(self) -> bool:
    """Check if this backend supports metadata tracking.

    Metadata tracking records operation counts, timestamps, and health metrics
    for both legacy (v1) and v2 format operations. Typically only PRIMARY mode
    backends support metadata tracking.

    Returns:
        bool: True if metadata tracking is supported, False otherwise.
            This default implementation always returns False.

    """
    # Default answer: no metadata tracking.
    return False

get_legacy_metadata

get_legacy_metadata(
    category: MODEL_REFERENCE_CATEGORY,
) -> CategoryMetadata

Get legacy format metadata for a specific category.

This is an optional method that metadata-capable backends can implement. Backends without metadata support should leave the default implementation.

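Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    The category to get metadata for.

Returns:

  • CategoryMetadata ( CategoryMetadata ) –

    The legacy metadata.

Raises:

  • NotImplementedError –

    If the backend does not support metadata.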

Source code in src/horde_model_reference/backends/base.py
def get_legacy_metadata(self, category: MODEL_REFERENCE_CATEGORY) -> CategoryMetadata:
    """Return the legacy-format metadata recorded for one category.

    Metadata is an optional capability. This default implementation returns
    nothing and always raises; metadata-capable backends implement it.

    Args:
        category: The category to get metadata for.

    Returns:
        CategoryMetadata: The legacy metadata.

    Raises:
        NotImplementedError: If the backend does not support metadata
            (always, for this default implementation).

    """
    unsupported_msg = f"{self.__class__.__name__} does not support metadata tracking"
    raise NotImplementedError(unsupported_msg)

get_legacy_metadata_async async

get_legacy_metadata_async(
    category: MODEL_REFERENCE_CATEGORY,
) -> CategoryMetadata

Asynchronously get legacy format metadata for a specific category.

This is an optional method that metadata-capable backends can implement. Backends without metadata support should leave the default implementation.

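Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    The category to get metadata for.

Returns:

  • CategoryMetadata ( CategoryMetadata ) –

    The legacy metadata.

Raises:

  • NotImplementedError –

    If the backend does not support metadata.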

Source code in src/horde_model_reference/backends/base.py
async def get_legacy_metadata_async(self, category: MODEL_REFERENCE_CATEGORY) -> CategoryMetadata:
    """Return the legacy-format metadata for one category from async code.

    Async counterpart of `get_legacy_metadata()`. This default implementation
    returns nothing and always raises; metadata-capable backends implement it.

    Args:
        category: The category to get metadata for.

    Returns:
        CategoryMetadata: The legacy metadata.

    Raises:
        NotImplementedError: If the backend does not support metadata
            (always, for this default implementation).

    """
    unsupported_msg = f"{self.__class__.__name__} does not support async metadata tracking"
    raise NotImplementedError(unsupported_msg)

get_metadata

get_metadata(
    category: MODEL_REFERENCE_CATEGORY,
) -> CategoryMetadata

Get v2 format metadata for a specific category.

This is an optional method that metadata-capable backends can implement. Backends without metadata support should leave the default implementation.

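Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    The category to get metadata for.

Returns:

  • CategoryMetadata ( CategoryMetadata ) –

    The v2 metadata.

Raises:

  • NotImplementedError –

    If the backend does not support metadata.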

Source code in src/horde_model_reference/backends/base.py
def get_metadata(self, category: MODEL_REFERENCE_CATEGORY) -> CategoryMetadata:
    """Return the v2-format metadata recorded for one category.

    Metadata is an optional capability. This default implementation returns
    nothing and always raises; metadata-capable backends implement it.

    Args:
        category: The category to get metadata for.

    Returns:
        CategoryMetadata: The v2 metadata.

    Raises:
        NotImplementedError: If the backend does not support metadata
            (always, for this default implementation).

    """
    unsupported_msg = f"{self.__class__.__name__} does not support metadata tracking"
    raise NotImplementedError(unsupported_msg)

get_metadata_async async

get_metadata_async(
    category: MODEL_REFERENCE_CATEGORY,
) -> CategoryMetadata

Asynchronously get v2 format metadata for a specific category.

This is an optional method that metadata-capable backends can implement. Backends without metadata support should leave the default implementation.

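Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    The category to get metadata for.

Returns:

  • CategoryMetadata ( CategoryMetadata ) –

    The v2 metadata.

Raises:

  • NotImplementedError –

    If the backend does not support metadata.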

Source code in src/horde_model_reference/backends/base.py
async def get_metadata_async(self, category: MODEL_REFERENCE_CATEGORY) -> CategoryMetadata:
    """Return the v2-format metadata for one category from async code.

    Async counterpart of `get_metadata()`. This default implementation returns
    nothing and always raises; metadata-capable backends implement it.

    Args:
        category: The category to get metadata for.

    Returns:
        CategoryMetadata: The v2 metadata.

    Raises:
        NotImplementedError: If the backend does not support metadata
            (always, for this default implementation).

    """
    unsupported_msg = f"{self.__class__.__name__} does not support async metadata tracking"
    raise NotImplementedError(unsupported_msg)

get_all_legacy_metadata

get_all_legacy_metadata() -> dict[
    MODEL_REFERENCE_CATEGORY, CategoryMetadata
]

Get legacy format metadata for all categories.

This is an optional method that metadata-capable backends can implement. Backends without metadata support should leave the default implementation.

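Returns:

  • dict[MODEL_REFERENCE_CATEGORY, CategoryMetadata] –

    Mapping of categories to their legacy metadata.

Raises:

  • NotImplementedError –

    If the backend does not support metadata.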

Source code in src/horde_model_reference/backends/base.py
def get_all_legacy_metadata(self) -> dict[MODEL_REFERENCE_CATEGORY, CategoryMetadata]:
    """Return the legacy-format metadata of every category.

    Metadata is an optional capability. This default implementation returns
    nothing and always raises; metadata-capable backends implement it.

    Returns:
        dict[MODEL_REFERENCE_CATEGORY, CategoryMetadata]: Mapping of categories to their legacy metadata.

    Raises:
        NotImplementedError: If the backend does not support metadata
            (always, for this default implementation).

    """
    unsupported_msg = f"{self.__class__.__name__} does not support metadata tracking"
    raise NotImplementedError(unsupported_msg)

get_all_legacy_metadata_async async

get_all_legacy_metadata_async() -> dict[
    MODEL_REFERENCE_CATEGORY, CategoryMetadata
]

Asynchronously get legacy format metadata for all categories.

This is an optional method that metadata-capable backends can implement. Backends without metadata support should leave the default implementation.

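Returns:

  • dict[MODEL_REFERENCE_CATEGORY, CategoryMetadata] –

    Mapping of categories to their legacy metadata.

Raises:

  • NotImplementedError –

    If the backend does not support metadata.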

Source code in src/horde_model_reference/backends/base.py
async def get_all_legacy_metadata_async(self) -> dict[MODEL_REFERENCE_CATEGORY, CategoryMetadata]:
    """Return the legacy-format metadata of every category from async code.

    Async counterpart of `get_all_legacy_metadata()`. This default implementation
    returns nothing and always raises; metadata-capable backends implement it.

    Returns:
        dict[MODEL_REFERENCE_CATEGORY, CategoryMetadata]: Mapping of categories to their legacy metadata.

    Raises:
        NotImplementedError: If the backend does not support metadata
            (always, for this default implementation).

    """
    unsupported_msg = f"{self.__class__.__name__} does not support async metadata tracking"
    raise NotImplementedError(unsupported_msg)

get_all_metadata

get_all_metadata() -> dict[
    MODEL_REFERENCE_CATEGORY, CategoryMetadata
]

Get v2 format metadata for all categories.

This is an optional method that metadata-capable backends can implement. Backends without metadata support should leave the default implementation.

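Returns:

  • dict[MODEL_REFERENCE_CATEGORY, CategoryMetadata] –

    Mapping of categories to their v2 metadata.

Raises:

  • NotImplementedError –

    If the backend does not support metadata.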

Source code in src/horde_model_reference/backends/base.py
def get_all_metadata(self) -> dict[MODEL_REFERENCE_CATEGORY, CategoryMetadata]:
    """Return the v2-format metadata of every category.

    Metadata is an optional capability. This default implementation returns
    nothing and always raises; metadata-capable backends implement it.

    Returns:
        dict[MODEL_REFERENCE_CATEGORY, CategoryMetadata]: Mapping of categories to their v2 metadata.

    Raises:
        NotImplementedError: If the backend does not support metadata
            (always, for this default implementation).

    """
    unsupported_msg = f"{self.__class__.__name__} does not support metadata tracking"
    raise NotImplementedError(unsupported_msg)

get_all_metadata_async async

get_all_metadata_async() -> dict[
    MODEL_REFERENCE_CATEGORY, CategoryMetadata
]

Asynchronously get v2 format metadata for all categories.

This is an optional method that metadata-capable backends can implement. Backends without metadata support should leave the default implementation.

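Returns:

  • dict[MODEL_REFERENCE_CATEGORY, CategoryMetadata] –

    Mapping of categories to their v2 metadata.

Raises:

  • NotImplementedError –

    If the backend does not support metadata.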

Source code in src/horde_model_reference/backends/base.py
async def get_all_metadata_async(self) -> dict[MODEL_REFERENCE_CATEGORY, CategoryMetadata]:
    """Return the v2-format metadata of every category from async code.

    Async counterpart of `get_all_metadata()`. This default implementation
    returns nothing and always raises; metadata-capable backends implement it.

    Returns:
        dict[MODEL_REFERENCE_CATEGORY, CategoryMetadata]: Mapping of categories to their v2 metadata.

    Raises:
        NotImplementedError: If the backend does not support metadata
            (always, for this default implementation).

    """
    unsupported_msg = f"{self.__class__.__name__} does not support async metadata tracking"
    raise NotImplementedError(unsupported_msg)

_mark_category_fresh

_mark_category_fresh(
    category: MODEL_REFERENCE_CATEGORY,
) -> None

Record that we hold a fresh cache entry for category.

Also updates mtime if a file path is provided by the subclass.

Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    The category to mark as fresh.

Source code in src/horde_model_reference/backends/replica_backend_base.py
def _mark_category_fresh(self, category: MODEL_REFERENCE_CATEGORY) -> None:
    """Record that the cache entry for *category* was just refreshed.

    Stamps the category with the current time and removes it from the stale set.
    If the subclass supplies a validation file that exists, its mtime is
    remembered as well so later file changes can be detected.

    Args:
        category: The category to mark as fresh.

    """
    refreshed_at = time.time()
    self._category_timestamps[category] = refreshed_at
    self._stale_categories.discard(category)

    validation_path = self._get_file_path_for_validation(category)
    if validation_path and validation_path.exists():
        try:
            observed_mtime = validation_path.stat().st_mtime
        except Exception:
            # Best effort: record 0.0 when the file cannot be stat'ed.
            observed_mtime = 0.0
        self._last_known_mtimes[category] = observed_mtime

    logger.debug(f"Marked category {category} as fresh")

_invalidate_category_timestamp

_invalidate_category_timestamp(
    category: MODEL_REFERENCE_CATEGORY,
) -> None

Drop timestamp knowledge for category without adjusting payloads.

Source code in src/horde_model_reference/backends/replica_backend_base.py
def _invalidate_category_timestamp(self, category: MODEL_REFERENCE_CATEGORY) -> None:
    """Drop timestamp knowledge for *category* without adjusting payloads.

    Only the entry in ``_category_timestamps`` is removed here. `is_cache_valid()`
    treats a category that has no recorded timestamp as invalid.

    Args:
        category: The category whose timestamp should be forgotten.

    """
    # pop() with a default makes this a no-op when no timestamp is recorded.
    self._category_timestamps.pop(category, None)

has_cached_data

has_cached_data(category: MODEL_REFERENCE_CATEGORY) -> bool

Check if any data has been cached for this category.

This is a simple existence check that doesn't validate freshness. Use this for initial fetch detection: "Have we loaded this at least once?"

Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    The category to check.

Returns:

  • bool ( bool ) –

    True if data exists in cache (may be stale), False if never loaded.

Source code in src/horde_model_reference/backends/replica_backend_base.py
def has_cached_data(self, category: MODEL_REFERENCE_CATEGORY) -> bool:
    """Tell whether the cache holds an entry for this category at all.

    Pure existence check: freshness is not considered, so a stale entry still
    counts. Use it to answer "have we loaded this at least once?".

    Args:
        category: The category to check.

    Returns:
        bool: True if data exists in cache (may be stale), False if never loaded.

    """
    with self._lock:
        entry_present = category in self._cache
    return entry_present

is_cache_valid

is_cache_valid(category: MODEL_REFERENCE_CATEGORY) -> bool

Check if cached data exists and is still valid for the given category.

This method performs comprehensive validation to determine if cached data can be used without refetching. It's primarily used internally by cache retrieval methods but can also be called directly for validation checks.

Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    The category to validate.

Returns:

  • bool

    True if cache exists and all validation checks pass, False otherwise.

Validation Steps

The method performs checks in the following order:

  1. Explicit Staleness: Returns False if category is in _stale_categories
  2. Cache Existence: Returns False if category has never been cached
  3. Timestamp Existence: Returns False if no timestamp recorded
  4. TTL Expiration: Checks if cache_ttl_seconds exceeded (calls mark_stale() if expired)
  5. File Modification: Compares current file mtime with cached mtime (calls mark_stale() if changed)
  6. Custom Validation: Calls _additional_cache_validation() for subclass-specific checks

Side Effects

When staleness is detected (TTL expiration or mtime change), this method calls mark_stale() to trigger invalidation callbacks and notify the manager.

Return Value Semantics

  • Returns False for both "no data" and "stale data" cases
  • Use has_cached_data() to distinguish between these cases
  • Use needs_refresh() to check staleness without considering initial fetch

Note

This method is thread-safe and uses the internal _lock for synchronization.

Source code in src/horde_model_reference/backends/replica_backend_base.py
def is_cache_valid(self, category: MODEL_REFERENCE_CATEGORY) -> bool:
    """Check if cached data exists and is still valid for the given category.

    This method performs comprehensive validation to determine if cached data can be
    used without refetching. It's primarily used internally by cache retrieval methods
    but can also be called directly for validation checks.

    Args:
        category: The category to validate.

    Returns:
        True if cache exists and all validation checks pass, False otherwise.

    Validation Steps:
        The method performs checks in the following order:

        1. **Explicit Staleness**: Returns False if category is in `_stale_categories`
        2. **Cache Existence**: Returns False if category has never been cached
        3. **Timestamp Existence**: Returns False if no timestamp recorded
        4. **TTL Expiration**: Checks if `cache_ttl_seconds` exceeded (calls `mark_stale()` if expired)
        5. **File Modification**: Compares current file mtime with cached mtime (calls `mark_stale()` if changed)
        6. **Custom Validation**: Calls `_additional_cache_validation()` for subclass-specific checks

    Side Effects:
        When staleness is detected (TTL expiration or mtime change), this method calls
        `mark_stale()` to trigger invalidation callbacks and notify the manager.
        If the mtime check itself raises, the cache is reported invalid and the
        failure is logged at debug level; `mark_stale()` is not called in that case.

    Related Methods:
        - `has_cached_data()`: Simple existence check, ignores validity
        - `should_fetch_data()`: Combined check for "should I fetch?" (initial OR refresh)
        - `needs_refresh()`: Checks if cached data should be refetched (staleness only)

    Return Value Semantics:
        - Returns `False` for both "no data" and "stale data" cases
        - Use `has_cached_data()` to distinguish between these cases
        - Use `needs_refresh()` to check staleness without considering initial fetch

    Note:
        The staleness, existence, and timestamp lookups (steps 1-3) run while holding
        the internal `_lock`. The TTL, mtime, and custom checks (steps 4-6) run after
        the lock has been released.

    """
    with self._lock:
        if category in self._stale_categories:
            logger.debug(f"Category {category} marked stale, cache invalid")
            return False

        if category not in self._cache:
            return False

        last_updated = self._category_timestamps.get(category)

    if last_updated is None:
        logger.debug(f"Category {category} has no timestamp, considering cache invalid")
        return False

    # A TTL of None disables expiry entirely.
    if self._cache_ttl_seconds is not None:
        elapsed = time.time() - last_updated
        if elapsed > self._cache_ttl_seconds:
            logger.debug(f"Category {category} TTL expired ({elapsed}s > {self._cache_ttl_seconds}s)")
            self.mark_stale(category)
            return False

    file_path = self._get_file_path_for_validation(category)
    if file_path and file_path.exists():
        try:
            current_mtime = file_path.stat().st_mtime
            # A category with no recorded mtime compares against 0.0, so it is stale.
            last_known = self._last_known_mtimes.get(category, 0.0)
            if current_mtime != last_known:
                logger.debug(
                    f"File {file_path.name} mtime changed "
                    f"(current={current_mtime}, cached={last_known}), cache invalid"
                )
                self.mark_stale(category)
                return False
        except Exception as exc:
            # Best effort: a check that cannot complete cannot confirm the cache,
            # so report it invalid, but leave a trace instead of failing silently.
            logger.debug(f"mtime validation failed for category {category} ({file_path}): {exc}, cache invalid")
            return False

    if not self._additional_cache_validation(category):
        logger.debug(f"Category {category} failed additional validation")
        return False

    return True

should_fetch_data

should_fetch_data(
    category: MODEL_REFERENCE_CATEGORY,
) -> bool

Determine if data should be fetched (initial load OR refresh).

This is a convenience method that combines both initial fetch detection and refresh detection into a single check. Use this when you want to know "should I fetch data now?" regardless of whether it's an initial load or a refresh.

This is equivalent to `not is_cache_valid(category) or needs_refresh(category)`, but handles the logic more efficiently.

Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    The category to check.

Returns:

  • bool ( bool ) –

    True if data should be fetched (either initial or refresh), False if cached data is valid and fresh.

Source code in src/horde_model_reference/backends/replica_backend_base.py
def should_fetch_data(self, category: MODEL_REFERENCE_CATEGORY) -> bool:
    """Answer "should I fetch now?" for an initial load or a refresh alike.

    Implemented as the negation of `is_cache_valid()`, which already returns
    False both when nothing has been cached yet and when the cached entry is
    no longer valid. A single call therefore covers both situations.

    Args:
        category: The category to check.

    Returns:
        bool: True if data should be fetched (either initial or refresh),
              False if cached data is valid and fresh.

    """
    cache_usable = self.is_cache_valid(category)
    return not cache_usable

_set_cache_ttl_seconds

_set_cache_ttl_seconds(ttl_seconds: int | None) -> None

Allow subclasses to tweak TTL after initialization if desired.

Source code in src/horde_model_reference/backends/replica_backend_base.py
def _set_cache_ttl_seconds(self, ttl_seconds: int | None) -> None:
    """Allow subclasses to tweak TTL after initialization if desired.

    Args:
        ttl_seconds: New cache TTL in seconds, or None. `is_cache_valid()` skips
            the TTL expiration check entirely when the stored value is None.

    """
    # The value is stored as-is; no validation is performed here.
    self._cache_ttl_seconds = ttl_seconds

_get_file_path_for_validation

_get_file_path_for_validation(
    category: MODEL_REFERENCE_CATEGORY,
) -> Path | None

Return file path for mtime validation.

Subclasses should override this if they want automatic mtime validation. If a path is returned, the cache will be invalidated if the file's mtime changes.

Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    The category to get the file path for.

Returns:

  • Path | None

    Path | None: File path to check for mtime, or None to skip mtime validation.

Source code in src/horde_model_reference/backends/replica_backend_base.py
def _get_file_path_for_validation(self, category: MODEL_REFERENCE_CATEGORY) -> Path | None:
    """Return file path for mtime validation.

    Subclasses should override this if they want automatic mtime validation.
    If a path is returned, the cache will be invalidated if the file's mtime changes.

    Args:
        category: The category to get the file path for.

    Returns:
        Path | None: File path to check for mtime, or None to skip mtime validation.
            This default implementation always returns None.

    """
    # Default: no file to watch, so callers skip the mtime comparison.
    return None

_additional_cache_validation

_additional_cache_validation(
    category: MODEL_REFERENCE_CATEGORY,
) -> bool

Perform additional cache validation.

Subclasses can override this to add custom validation logic beyond TTL and mtime checks. This is called during is_cache_valid().

Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    The category to validate.

Returns:

  • bool ( bool ) –

    True if cache is valid, False to invalidate.

Source code in src/horde_model_reference/backends/replica_backend_base.py
def _additional_cache_validation(self, category: MODEL_REFERENCE_CATEGORY) -> bool:
    """Perform additional cache validation.

    Subclasses can override this to add custom validation logic beyond
    TTL and mtime checks. This is called during `is_cache_valid()`, as the
    last check, after the TTL and mtime checks have passed.

    Args:
        category: The category to validate.

    Returns:
        bool: True if cache is valid, False to invalidate.
            This default implementation always returns True.

    """
    # Default: no extra checks, so the cache entry is accepted.
    return True

_fetch_with_cache

_fetch_with_cache(
    category: MODEL_REFERENCE_CATEGORY,
    fetch_fn: Callable[[], dict[str, Any] | None],
    *,
    force_refresh: bool = False,
) -> dict[str, Any] | None

Implement standard fetch pattern with automatic caching.

This helper method implements the recommended fetch pattern:

  1. Check cache if not forcing refresh
  2. Return cached data if valid
  3. Fetch data using provided function
  4. Store in cache and return

Use this in your fetch_category() implementations to avoid boilerplate.

Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    The category to fetch.

  • fetch_fn (Callable[[], dict[str, Any] | None]) –

    Callable that fetches the data (no args, returns dict or None).

  • force_refresh (bool, default: False ) –

    If True, skip cache check and force fetch.

Returns:

  • dict[str, Any] | None

    dict[str, Any] | None: The fetched/cached data.

Example

    def fetch_category(self, category, *, force_refresh=False):
        return self._fetch_with_cache(
            category,
            lambda: self._fetch_from_source(category),
            force_refresh=force_refresh
        )

Source code in src/horde_model_reference/backends/replica_backend_base.py
def _fetch_with_cache(
    self,
    category: MODEL_REFERENCE_CATEGORY,
    fetch_fn: Callable[[], dict[str, Any] | None],
    *,
    force_refresh: bool = False,
) -> dict[str, Any] | None:
    """Serve a category from cache, fetching and caching it on a miss.

    Standard read-through pattern for `fetch_category()` implementations:

    1. Unless a refresh is forced, ask the cache for valid data
    2. Return that data immediately when present
    3. Otherwise call the supplied fetch function
    4. Store whatever it returned (including None) and return it

    Args:
        category: The category to fetch.
        fetch_fn: Callable that fetches the data (no args, returns dict or None).
        force_refresh: If True, skip cache check and force fetch.

    Returns:
        dict[str, Any] | None: The fetched/cached data.

    Example:
        def fetch_category(self, category, *, force_refresh=False):
            return self._fetch_with_cache(
                category,
                lambda: self._fetch_from_source(category),
                force_refresh=force_refresh
            )

    """
    # A forced refresh never consults the cache.
    cached_payload = None if force_refresh else self._get_from_cache(category)
    if cached_payload is not None:
        return cached_payload

    fetched_payload = fetch_fn()

    # A None result is stored too, to indicate "checked but not found".
    self._store_in_cache(category, fetched_payload)
    return fetched_payload

_get_from_cache

_get_from_cache(
    category: MODEL_REFERENCE_CATEGORY,
) -> dict[str, Any] | None

Get data from cache if valid.

This is the primary method subclasses should use to retrieve cached data. It handles all validation logic internally, including initial fetch detection (returns None if data has never been loaded).

This method determines if an INITIAL fetch is needed by checking cache existence. Use needs_refresh() to check if existing cached data should be RE-fetched.

Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    The category to retrieve from cache.

Returns:

  • dict[str, Any] | None

    dict[str, Any] | None: Cached data if valid, None if cache miss (initial fetch needed) or cache invalid (refresh needed).

Source code in src/horde_model_reference/backends/replica_backend_base.py
def _get_from_cache(self, category: MODEL_REFERENCE_CATEGORY) -> dict[str, Any] | None:
    """Look up *category* in the in-memory cache, honouring validity checks.

    Subclasses are expected to read cached data through this method. The
    decision whether the entry may be served is delegated to `is_cache_valid()`;
    both that check and the lookup happen while holding `self._lock`.

    A None result tells the caller that a fetch is required, either because
    nothing usable is cached or because the cached entry was rejected.
    Use `needs_refresh()` to ask whether existing cached data should be RE-fetched.

    Args:
        category: The category whose cached payload is wanted.

    Returns:
        dict[str, Any] | None: The cached payload when `is_cache_valid()` accepts the entry,
                               otherwise None.

    """
    with self._lock:
        # Guard clause: bail out early when the entry cannot be served.
        if not self.is_cache_valid(category):
            logger.debug(f"Cache miss for {category}")
            return None

        logger.debug(f"Cache hit for {category}")
        return self._cache.get(category)

_store_in_cache

_store_in_cache(
    category: MODEL_REFERENCE_CATEGORY,
    data: dict[str, Any] | None,
) -> None

Store data in cache and mark category as fresh.

This is the primary method subclasses should use to store fetched data. It handles timestamp updates and mtime tracking internally.

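Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    The category to store.

  • data (dict[str, Any] | None) –

    The data to cache, or None if category has no data.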

Source code in src/horde_model_reference/backends/replica_backend_base.py
def _store_in_cache(self, category: MODEL_REFERENCE_CATEGORY, data: dict[str, Any] | None) -> None:
    """Record freshly fetched *data* for *category* in the in-memory cache.

    Subclasses are expected to write fetched data through this method. The
    payload is always written to `self._cache`; the freshness bookkeeping
    (`_mark_category_fresh()`) only runs for non-None payloads. Everything
    happens while holding `self._lock`.

    Args:
        category: The category being stored.
        data: The payload to cache, or None when nothing was loaded.

    """
    with self._lock:
        self._cache[category] = data

        if data is None:
            # A None payload stands for a failed load; leaving the category
            # un-refreshed means later calls are not prevented from retrying.
            logger.debug(f"Stored None for {category}, not marking as fresh")
            return

        self._mark_category_fresh(category)
        logger.debug(f"Stored {category} in cache")

_invalidate_cache

_invalidate_cache(
    category: MODEL_REFERENCE_CATEGORY,
) -> None

Invalidate cache for a category without deleting the data.

This marks the category as stale, forcing a refetch on next access.

Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    The category to invalidate.

Source code in src/horde_model_reference/backends/replica_backend_base.py
def _invalidate_cache(self, category: MODEL_REFERENCE_CATEGORY) -> None:
    """Flag *category* as stale while keeping its cached payload in place.

    Only the freshness bookkeeping changes: the category is added to the
    stale set and its timestamp (if any) is dropped. The intent is that the
    next access refetches the data.

    Args:
        category: The category to invalidate.

    """
    with self._lock:
        # pop() with a default so an absent timestamp is not an error.
        self._category_timestamps.pop(category, None)
        self._stale_categories.add(category)
        logger.debug(f"Invalidated cache for {category}")

_get_legacy_file_path_for_validation

_get_legacy_file_path_for_validation(
    category: MODEL_REFERENCE_CATEGORY,
) -> Path | None

Return legacy file path for mtime validation.

Subclasses should override this if they want automatic mtime validation for legacy format files. This is separate from the v2 file path.

Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    The category to get the legacy file path for.

Returns:

  • Path | None

    Path | None: Legacy file path to check for mtime, or None to skip mtime validation.

Source code in src/horde_model_reference/backends/replica_backend_base.py
def _get_legacy_file_path_for_validation(self, category: MODEL_REFERENCE_CATEGORY) -> Path | None:
    """Hook supplying the legacy-format file whose mtime should be tracked.

    This base implementation opts out by always returning None. A subclass
    that wants mtime-based validation of its legacy files overrides this and
    returns the file's path. The path is independent of the v2 file path.

    Args:
        category: The category whose legacy file path is requested.

    Returns:
        Path | None: The legacy file to check for mtime changes, or None when
                     mtime validation should be skipped.

    """
    return None

_mark_legacy_category_fresh

_mark_legacy_category_fresh(
    category: MODEL_REFERENCE_CATEGORY,
) -> None

Record that we hold a fresh legacy cache entry for category.

Also updates legacy file mtime if a path is provided by the subclass.

Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    The category to mark as fresh.

Source code in src/horde_model_reference/backends/replica_backend_base.py
def _mark_legacy_category_fresh(self, category: MODEL_REFERENCE_CATEGORY) -> None:
    """Update the freshness bookkeeping for the legacy cache of *category*.

    The category is removed from the stale set and stamped with the current
    time. If the subclass supplies a legacy file path and that file exists,
    the file's mtime is recorded as well; when the mtime cannot be read,
    0.0 is recorded instead.

    Args:
        category: The category to mark as fresh.

    """
    self._stale_legacy_categories.discard(category)
    self._legacy_cache_timestamps[category] = time.time()

    tracked_file = self._get_legacy_file_path_for_validation(category)
    if tracked_file and tracked_file.exists():
        try:
            observed_mtime = tracked_file.stat().st_mtime
        except Exception:
            # Best effort: fall back to 0.0 when the file cannot be inspected.
            observed_mtime = 0.0
        self._legacy_last_known_mtimes[category] = observed_mtime

    logger.debug(f"Marked legacy category {category} as fresh")

is_legacy_cache_valid

is_legacy_cache_valid(
    category: MODEL_REFERENCE_CATEGORY,
) -> bool

Return True if the legacy cache for category is considered fresh.

Performs validation checks for legacy format cache:

  1. Staleness check (explicit invalidation)
  2. Cache existence check (dict or string)
  3. TTL expiration check
  4. File mtime check (if legacy file path provided by subclass)

Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    The category to validate.

Returns:

  • bool –

    True if legacy cache is valid and can be used.

Source code in src/horde_model_reference/backends/replica_backend_base.py
def is_legacy_cache_valid(self, category: MODEL_REFERENCE_CATEGORY) -> bool:
    """Decide whether the cached legacy data for *category* may still be served.

    The entry is rejected as soon as one of these checks fails:
    1. The category has been explicitly marked stale.
    2. Neither the dict cache nor the string cache holds the category.
    3. No freshness timestamp is recorded for the category.
    4. A TTL is configured and the entry is older than that TTL.
    5. The subclass supplies a legacy file path, the file exists, and its
       mtime differs from the recorded one (or the file cannot be inspected).

    Checks 1 and 2 and the timestamp lookup run while holding `self._lock`;
    the remaining checks run after the lock has been released.

    Args:
        category: The category to validate.

    Returns:
        bool: True when every check passes, False otherwise.

    """
    with self._lock:
        if category in self._stale_legacy_categories:
            logger.debug(f"Legacy category {category} marked stale, cache invalid")
            return False

        if not (category in self._legacy_json_cache or category in self._legacy_json_string_cache):
            return False

        stamped_at = self._legacy_cache_timestamps.get(category)

    if stamped_at is None:
        logger.debug(f"Legacy category {category} has no timestamp, considering cache invalid")
        return False

    # A TTL of None disables expiry altogether.
    if self._cache_ttl_seconds is not None:
        elapsed = time.time() - stamped_at
        if elapsed > self._cache_ttl_seconds:
            logger.debug(f"Legacy category {category} TTL expired ({elapsed}s > {self._cache_ttl_seconds}s)")
            return False

    legacy_file_path = self._get_legacy_file_path_for_validation(category)
    if not legacy_file_path or not legacy_file_path.exists():
        # No file on disk to compare against, so the earlier checks decide.
        return True

    try:
        current_mtime = legacy_file_path.stat().st_mtime
        # A category with no recorded mtime is compared against 0.0.
        last_known = self._legacy_last_known_mtimes.get(category, 0.0)
        if current_mtime == last_known:
            return True

        logger.debug(
            f"Legacy file {legacy_file_path.name} mtime changed "
            f"(current={current_mtime}, cached={last_known}), cache invalid"
        )
        return False
    except Exception:
        # Any failure while inspecting the file counts as an invalid cache.
        return False

_get_legacy_from_cache

_get_legacy_from_cache(
    category: MODEL_REFERENCE_CATEGORY,
) -> tuple[dict[str, Any] | None, str | None]

Get legacy data from cache if valid.

Returns both dict and string representations of legacy JSON.

Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    The category to retrieve from cache.

Returns:

  • tuple[dict[str, Any] | None, str | None]

    tuple[dict | None, str | None]: (legacy_dict, legacy_string) or (None, None) if cache miss.

Source code in src/horde_model_reference/backends/replica_backend_base.py
def _get_legacy_from_cache(
    self,
    category: MODEL_REFERENCE_CATEGORY,
) -> tuple[dict[str, Any] | None, str | None]:
    """Fetch the cached legacy JSON for *category* in both of its forms.

    The entry is only served when `is_legacy_cache_valid()` accepts it; the
    check and both lookups happen while holding `self._lock`.

    Args:
        category: The category whose legacy data is wanted.

    Returns:
        tuple[dict | None, str | None]: (legacy_dict, legacy_string) on a cache hit,
                                        (None, None) on a cache miss.

    """
    with self._lock:
        # Guard clause: nothing usable is cached.
        if not self.is_legacy_cache_valid(category):
            logger.debug(f"Legacy cache miss for {category}")
            return None, None

        logger.debug(f"Legacy cache hit for {category}")
        as_dict = self._legacy_json_cache.get(category)
        as_string = self._legacy_json_string_cache.get(category)
        return as_dict, as_string

_store_legacy_in_cache

_store_legacy_in_cache(
    category: MODEL_REFERENCE_CATEGORY,
    legacy_dict: dict[str, Any] | None,
    legacy_string: str | None,
) -> None

Store legacy data in cache and mark category as fresh.

Stores both dict and string representations of legacy JSON.

Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    The category to store.

  • legacy_dict (dict[str, Any] | None) –

    The legacy JSON as a dict, or None.

  • legacy_string (str | None) –

    The legacy JSON as a string, or None.

Source code in src/horde_model_reference/backends/replica_backend_base.py
def _store_legacy_in_cache(
    self,
    category: MODEL_REFERENCE_CATEGORY,
    legacy_dict: dict[str, Any] | None,
    legacy_string: str | None,
) -> None:
    """Record the legacy JSON for *category* in both legacy caches.

    Both representations are always written. The category is marked fresh
    (via `_mark_legacy_category_fresh()`) only when at least one of them is
    not None. Everything happens while holding `self._lock`.

    Args:
        category: The category being stored.
        legacy_dict: The legacy JSON as a dict, or None.
        legacy_string: The legacy JSON as a string, or None.

    """
    with self._lock:
        self._legacy_json_cache[category] = legacy_dict
        self._legacy_json_string_cache[category] = legacy_string

        nothing_loaded = legacy_dict is None and legacy_string is None
        if nothing_loaded:
            # Two None values stand for a failed load; leaving the category
            # un-refreshed means later calls are not prevented from retrying.
            logger.debug(f"Stored None for legacy {category}, not marking as fresh")
        else:
            self._mark_legacy_category_fresh(category)
            logger.debug(f"Stored legacy {category} in cache")

_invalidate_legacy_cache

_invalidate_legacy_cache(
    category: MODEL_REFERENCE_CATEGORY,
) -> None

Invalidate legacy cache for a category without deleting the data.

This marks the category as stale, forcing a refetch on next access.

Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    The category to invalidate.

Source code in src/horde_model_reference/backends/replica_backend_base.py
def _invalidate_legacy_cache(self, category: MODEL_REFERENCE_CATEGORY) -> None:
    """Flag the legacy cache of *category* as stale while keeping its data.

    Only the freshness bookkeeping changes: the category is added to the
    legacy stale set and its legacy timestamp (if any) is dropped. The intent
    is that the next access refetches the data.

    Args:
        category: The category to invalidate.

    """
    with self._lock:
        # pop() with a default so an absent timestamp is not an error.
        self._legacy_cache_timestamps.pop(category, None)
        self._stale_legacy_categories.add(category)
        logger.debug(f"Invalidated legacy cache for {category}")