Skip to content

github_backend

GitHub-based backend for REPLICA mode.

This backend downloads legacy model reference files from GitHub repositories, converts them to the new format, and provides them to REPLICA clients as a fallback when the PRIMARY API is unavailable.

GitHubBackend

Bases: ReplicaBackendBase

Backend that fetches legacy model references from GitHub and converts them.

This backend is designed for REPLICA mode only. It: 1. Downloads legacy JSON files from AI-Horde GitHub repositories 2. Stores them in a local legacy/ folder 3. Converts them to the new format using legacy converters 4. Returns the converted (new format) data 5. Provides a fallback for REPLICA clients when PRIMARY API is down 6. PRIMARYs can initialize for the first time if needed from GitHub

This backend is read-only and enforces REPLICA mode.

Source code in src/horde_model_reference/backends/github_backend.py
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
class GitHubBackend(ReplicaBackendBase):
    """Backend that fetches legacy model references from GitHub and converts them.

    This backend is designed for REPLICA mode only. It:
    1. Downloads legacy JSON files from AI-Horde GitHub repositories
    2. Stores them in a local legacy/ folder
    3. Converts them to the new format using legacy converters
    4. Returns the converted (new format) data
    5. Provides a fallback for REPLICA clients when PRIMARY API is down
    6. PRIMARYs can initialize for the first time if needed from GitHub

    This backend is read-only and enforces REPLICA mode.
    """

    def __init__(
        self,
        *,
        base_path: str | Path = horde_model_reference_paths.base_path,
        cache_ttl_seconds: int = horde_model_reference_settings.cache_ttl_seconds,
        retry_max_attempts: int = horde_model_reference_settings.legacy_download_retry_max_attempts,
        retry_backoff_seconds: float = horde_model_reference_settings.legacy_download_retry_backoff_seconds,
        replicate_mode: ReplicateMode = ReplicateMode.REPLICA,
    ) -> None:
        """Initialize the GitHub backend for REPLICA mode.

        Args:
            base_path: Base path for storing model reference files.
            cache_ttl_seconds: TTL for internal cache in seconds.
            retry_max_attempts: Max download retry attempts.
            retry_backoff_seconds: Backoff time between retries.
            replicate_mode: Must be REPLICA. Defaults to REPLICA.

        Raises:
            ValueError: If replicate_mode is not REPLICA.

        """
        super().__init__(mode=replicate_mode, cache_ttl_seconds=cache_ttl_seconds)

        if self._replicate_mode != ReplicateMode.REPLICA:
            logger.warning(
                "GitHubBackend is designed for REPLICA mode only. For PRIMARY mode, use FileSystemBackend or "
                "RedisBackend. You can ignore this warning if you intend to use GitHubBackend for one-time "
                "initialization in PRIMARY mode."
            )

        self.base_path = Path(base_path)
        self.legacy_path = self.base_path.joinpath(LEGACY_REFERENCE_FOLDER_NAME)

        self.retry_max_attempts = retry_max_attempts
        self.retry_backoff_seconds = retry_backoff_seconds

        self._references_paths_cache: dict[MODEL_REFERENCE_CATEGORY, Path | None] = {}
        self._times_downloaded: dict[MODEL_REFERENCE_CATEGORY, int] = {}

        for category in MODEL_REFERENCE_CATEGORY:
            file_path = horde_model_reference_paths.get_legacy_model_reference_file_path(
                category,
                base_path=self.base_path,
            )

            if file_path.exists():
                self._references_paths_cache[category] = file_path
                self._load_legacy_json_from_disk(category, file_path)
            else:
                if self._replicate_mode == ReplicateMode.REPLICA or (
                    self._replicate_mode == ReplicateMode.PRIMARY
                    and horde_model_reference_settings.github_seed_enabled
                ):
                    self._references_paths_cache[category] = None
                else:
                    raise FileNotFoundError(f"Model reference file not found for {category}.")

            self._times_downloaded[category] = 0

    @override
    def _get_file_path_for_validation(self, category: MODEL_REFERENCE_CATEGORY) -> Path | None:
        """Return the converted (v2) file path for mtime validation.

        Args:
            category: The category to get the file path for.

        Returns:
            Path | None: Path to converted file for mtime validation.

        """
        return horde_model_reference_paths.get_model_reference_file_path(
            category,
            base_path=self.base_path,
        )

    @override
    def _get_legacy_file_path_for_validation(self, category: MODEL_REFERENCE_CATEGORY) -> Path | None:
        """Return the legacy file path for mtime validation.

        Args:
            category: The category to get the legacy file path for.

        Returns:
            Path | None: Path to legacy file for mtime validation.

        """
        return self._references_paths_cache.get(category)

    @override
    def fetch_category(
        self,
        category: MODEL_REFERENCE_CATEGORY,
        *,
        force_refresh: bool = False,
    ) -> dict[str, Any] | None:
        """Fetch model reference data for a specific category.

        Downloads legacy format from GitHub, converts to new format, and returns it.

        Args:
            category: The category to fetch.
            force_refresh: If True, force download even if file exists.

        Returns:
            dict[str, Any] | None: The converted model reference data (new format).

        """
        with self._lock:
            # Use helper to determine if we need to fetch
            if force_refresh or self.should_fetch_data(category):
                self._download_and_convert_single(category, overwrite_existing=force_refresh)
                return self._load_converted_from_disk(category)

            # Return cached data
            return self._get_from_cache(category)

    @override
    def fetch_all_categories(
        self,
        *,
        force_refresh: bool = False,
    ) -> dict[MODEL_REFERENCE_CATEGORY, dict[str, Any] | None]:
        """Fetch model reference data for all categories.

        Downloads all legacy files from GitHub, converts them, and returns new format data.

        Args:
            force_refresh: If True, force download all files.

        Returns:
            dict mapping categories to their converted model reference data.

        """
        with self._lock:
            if force_refresh:
                self._download_and_convert_all(overwrite_existing=True)

            result: dict[MODEL_REFERENCE_CATEGORY, dict[str, Any] | None] = {}
            for category in MODEL_REFERENCE_CATEGORY:
                # Use helper to determine if we need to fetch
                if force_refresh or self.should_fetch_data(category):
                    self._download_legacy(category, overwrite_existing=force_refresh)
                    convert_legacy_database_by_category(category, self.base_path, self.base_path)
                    result[category] = self._load_converted_from_disk(category)
                else:
                    # Return cached data
                    result[category] = self._get_from_cache(category)

            return result

    @override
    async def fetch_category_async(
        self,
        category: MODEL_REFERENCE_CATEGORY,
        *,
        httpx_client: httpx.AsyncClient | None = None,
        force_refresh: bool = False,
    ) -> dict[str, Any] | None:
        """Asynchronously fetch model reference data for a category.

        Args:
            category: The category to fetch.
            httpx_client: Optional httpx async client for downloads.
            force_refresh: If True, force download.

        Returns:
            dict[str, Any] | None: The converted model reference data.

        """
        lock = self.async_lock
        if lock is None:
            raise RuntimeError("Async lock is unavailable for GitHubBackend")

        async with lock:
            # Use helper to determine if we need to fetch
            if force_refresh or self.should_fetch_data(category):
                await self._download_legacy_async(
                    category,
                    httpx_client,
                    overwrite_existing=force_refresh,
                )
                convert_legacy_database_by_category(category, self.base_path, self.base_path)
                return self._load_converted_from_disk(category)

            # Return cached data
            return self._get_from_cache(category)

    @override
    async def fetch_all_categories_async(
        self,
        *,
        httpx_client: httpx.AsyncClient | None = None,
        force_refresh: bool = False,
    ) -> dict[MODEL_REFERENCE_CATEGORY, dict[str, Any] | None]:
        """Asynchronously fetch all categories.

        Args:
            httpx_client: Optional httpx async client.
            force_refresh: If True, force download all.

        Returns:
            dict mapping categories to their data.

        """
        lock = self.async_lock
        if lock is None:
            raise RuntimeError("Async lock is unavailable for GitHubBackend")

        async with lock:
            # Download all that need refresh
            tasks = []
            categories_to_download = []
            for category in MODEL_REFERENCE_CATEGORY:
                # Use helper to determine if we need to fetch
                if force_refresh or self.should_fetch_data(category):
                    categories_to_download.append(category)
                    tasks.append(
                        self._download_legacy_async(
                            category,
                            httpx_client,
                            overwrite_existing=force_refresh,
                        )
                    )

            if tasks:
                await asyncio.gather(*tasks)
                convert_all_legacy_model_references()

            # Collect results from cache or disk
            result: dict[MODEL_REFERENCE_CATEGORY, dict[str, Any] | None] = {}
            for category in MODEL_REFERENCE_CATEGORY:
                if category in categories_to_download:
                    result[category] = self._load_converted_from_disk(category)
                else:
                    result[category] = self._get_from_cache(category)

            return result

    @override
    def needs_refresh(self, category: MODEL_REFERENCE_CATEGORY) -> bool:
        """Check if a category needs refresh.

        Base class handles all validation including mtime checks via hooks.

        Args:
            category: The category to check.

        Returns:
            bool: True if needs refresh (stale or mtime changed).

        """
        return super().needs_refresh(category)

    @override
    def mark_stale(self, category: MODEL_REFERENCE_CATEGORY) -> None:
        """Mark a category as stale, requiring refresh.

        Args:
            category: The category to mark stale.

        """
        logger.debug(f"Marking category {category} as stale")
        super().mark_stale(category)

    @override
    def get_category_file_path(self, category: MODEL_REFERENCE_CATEGORY) -> Path | None:
        """Get the file path for a category's converted data.

        Args:
            category: The category to get path for.

        Returns:
            Path | None: Path to the converted (new format) file, or None if not available.

        """
        return horde_model_reference_paths.get_model_reference_file_path(
            category,
            base_path=self.base_path,
        )

    @override
    def get_all_category_file_paths(self) -> dict[MODEL_REFERENCE_CATEGORY, Path | None]:
        """Get file paths for all categories' converted data.

        Returns:
            dict[MODEL_REFERENCE_CATEGORY, Path | None]: Mapping of categories to their converted file paths.

        """
        return horde_model_reference_paths.get_all_model_reference_file_paths(base_path=self.base_path)

    def _load_legacy_json_from_disk(
        self,
        category: MODEL_REFERENCE_CATEGORY,
        file_path: Path,
    ) -> dict[str, Any] | None:
        """Load legacy data from disk and populate cache via base class.

        For text_generation category, loads CSV format (models.csv).
        For other categories, loads JSON format.

        Args:
            category: The category to load.
            file_path: Path to the legacy file.

        Returns:
            dict[str, Any] | None: The loaded data, or None on error.

        """
        if not file_path.exists():
            logger.debug(f"Legacy file {file_path} does not exist")
            self._store_legacy_in_cache(category, None, None)
            return None

        try:
            # Handle CSV format for text_generation
            if category == MODEL_REFERENCE_CATEGORY.text_generation:
                data: dict[str, Any] = self._read_legacy_csv_to_dict(file_path)
                # For CSV, we store the dict and generate a JSON string for cache
                content_str = ujson.dumps(data, escape_forward_slashes=False, indent=4)
                self._store_legacy_in_cache(category, data, content_str)
                logger.debug(f"Loaded legacy CSV for category {category!r} from {file_path!r}")
                return data

            # Handle JSON format for other categories
            with open(file_path, "rb") as f:
                content = f.read()

            # Handle empty files (e.g., for categories with no GitHub URL)
            if not content or content.strip() == b"":
                logger.debug(f"Legacy file {file_path} is empty, treating as empty dict")
                self._store_legacy_in_cache(category, {}, "{}")
                return {}

            data = ujson.loads(content)
            content_str = content.decode("utf-8")

            self._store_legacy_in_cache(category, data, content_str)
            logger.debug(f"Loaded legacy JSON for category {category!r} from {file_path!r}")
            return data
        except Exception:
            logger.exception(f"Failed to load legacy data for category {category!r} from {file_path!r}")
            self._store_legacy_in_cache(category, None, None)
            return None

    def _read_legacy_csv_to_dict(self, file_path: Path) -> dict[str, Any]:
        """Read legacy CSV file (models.csv format) and convert to dict format.

        Uses the shared ``csv_rows_to_legacy_dict`` to replicate convert.py exactly,
        including defaults.json merging, instruct_format, correct field ordering,
        and backend prefix generation (3 entries per model).

        Args:
            file_path: Path to the legacy CSV file.

        Returns:
            dict[str, Any]: Model data with 3 entries per CSV row (matching db.json format).

        """
        parsed_rows, parse_issues = parse_legacy_text_csv_file(file_path)
        for issue in parse_issues:
            logger.warning(f"Legacy CSV issue for {issue.row_identifier}: {issue.message}")

        data = csv_rows_to_legacy_dict(parsed_rows, with_backend_prefixes=True)
        logger.debug(f"Read {len(data)} models from legacy CSV (includes backend prefix duplicates)")
        return data

    def _read_csv_to_dict(self, file_path: Path) -> dict[str, Any]:
        """Read CSV file and convert to dict format (one entry per base model).

        Uses the shared ``csv_rows_to_legacy_dict`` without backend prefixes.

        Args:
            file_path: Path to the CSV file.

        Returns:
            dict[str, Any]: Model data with one entry per base model.

        """
        parsed_rows, parse_issues = parse_legacy_text_csv_file(file_path)
        for issue in parse_issues:
            logger.warning(f"CSV issue for {issue.row_identifier}: {issue.message}")

        data = csv_rows_to_legacy_dict(parsed_rows, with_backend_prefixes=False)
        logger.debug(f"Read {len(data)} models from CSV (grouped, no backend prefixes)")
        return data

    def _load_converted_from_disk(
        self,
        category: MODEL_REFERENCE_CATEGORY,
    ) -> dict[str, Any] | None:
        """Load converted (v2 format) data from disk and cache via base class.

        All v2 format files (including text_generation.json) are in JSON format.
        CSV format is only used for legacy files (legacy/models.csv).

        Args:
            category: The category to load.

        Returns:
            dict[str, Any] | None: The loaded data, or None on error.

        """
        file_path = horde_model_reference_paths.get_model_reference_file_path(
            category,
            base_path=self.base_path,
        )

        if not file_path.exists():
            logger.debug(f"Converted file {file_path} does not exist")
            self._store_in_cache(category, None)
            return None

        try:
            # All v2 files are JSON format (including text_generation.json)
            with open(file_path, encoding="utf-8") as f:
                data = cast(dict[str, Any], ujson.load(f))

            self._store_in_cache(category, data)
            logger.debug(f"Loaded converted JSON for category {category!r} from {file_path!r}")
            return data
        except Exception:
            logger.exception(f"Failed to load converted data for category {category!r} from {file_path!r}")
            self._store_in_cache(category, None)
            return None

    @override
    def get_legacy_json(
        self,
        category: MODEL_REFERENCE_CATEGORY,
        redownload: bool = False,
    ) -> dict[str, Any] | None:
        """Get raw legacy JSON for a specific category without pydantic validation.

        This method returns cached legacy format JSON data, downloading if needed.
        The cache is populated during initialization, downloads, and on-demand loads.

        Args:
            category: Category to retrieve.
            redownload: If True, redownload before returning and refresh cache.

        Returns:
            dict[str, Any] | None: The raw legacy JSON dict, or None if not found.

        """
        with self._lock:
            # Use helper to determine if we need to fetch
            if redownload or self.should_fetch_data(category):
                self._download_legacy(category, overwrite_existing=redownload)

            # Try cache first
            legacy_dict, _ = self._get_legacy_from_cache(category)
            if legacy_dict is not None:
                return legacy_dict

            # Load from disk and cache if available
            file_path = self._references_paths_cache.get(category)
            if file_path:
                return self._load_legacy_json_from_disk(category, file_path)

            return None

    @override
    def get_legacy_json_string(
        self,
        category: MODEL_REFERENCE_CATEGORY,
        redownload: bool = False,
    ) -> str | None:
        """Get raw legacy JSON string for a specific category without pydantic validation.

        This method returns cached legacy format JSON data as a string, downloading if needed.
        The cache is populated during initialization, downloads, and on-demand loads.

        Args:
            category: Category to retrieve.
            redownload: If True, redownload before returning and refresh cache.

        Returns:
            str | None: The raw legacy JSON string, or None if not found.

        """
        with self._lock:
            # Use helper to determine if we need to fetch
            if redownload or self.should_fetch_data(category):
                self._download_legacy(category, overwrite_existing=redownload)

            # Try cache first
            _, legacy_string = self._get_legacy_from_cache(category)
            if legacy_string is not None:
                return legacy_string

            # Load from disk and cache if available
            file_path = self._references_paths_cache.get(category)
            if file_path:
                self._load_legacy_json_from_disk(category, file_path)
                _, legacy_string = self._get_legacy_from_cache(category)
                return legacy_string

            return None

    def _download_and_convert_single(
        self,
        category: MODEL_REFERENCE_CATEGORY,
        overwrite_existing: bool = False,
    ) -> None:
        """Download a single legacy file and convert it.

        Args:
            category: The category to download and convert.
            overwrite_existing: If True, overwrite existing files.

        """
        self._download_legacy(category, overwrite_existing=overwrite_existing)

        convert_legacy_database_by_category(category, self.base_path, self.base_path)

    def _download_and_convert_all(self, overwrite_existing: bool = False) -> None:
        """Download all legacy files and convert them."""
        for category in MODEL_REFERENCE_CATEGORY:
            self._download_legacy(category, overwrite_existing=overwrite_existing)

        convert_all_legacy_model_references(self.base_path, self.base_path)

    def _download_allowed(self) -> bool:
        """Return `True` if downloading is allowed based on replicate mode and settings."""
        if self._replicate_mode == ReplicateMode.PRIMARY and horde_model_reference_settings.github_seed_enabled:
            return True
        return self._replicate_mode == ReplicateMode.REPLICA

    def _download_legacy(
        self,
        category: MODEL_REFERENCE_CATEGORY,
        overwrite_existing: bool = False,
    ) -> Path | None:
        """Download a single legacy file from GitHub (synchronous).

        Args:
            category: The category to download.
            overwrite_existing: If True, overwrite existing file.

        Returns:
            Path | None: Path to the downloaded file, or None on failure.

        """
        if not self._download_allowed():
            return self._references_paths_cache.get(category)

        target_file_path = horde_model_reference_paths.get_legacy_model_reference_file_path(
            category,
            base_path=self.base_path,
        )

        needs_refresh = self.needs_refresh(category)
        if needs_refresh:
            logger.debug(f"Category {category} needs refresh, proceeding to download")
            overwrite_existing = True

        with self._lock:
            if target_file_path.exists() and not overwrite_existing:
                logger.debug(f"Legacy file {target_file_path} already exists, skipping download")
                return target_file_path

            target_url: str | None = None
            if category in get_github_image_categories():
                target_url = horde_model_reference_paths.legacy_image_model_github_urls[category]
            elif category in get_github_text_categories():
                target_url = horde_model_reference_paths.legacy_text_model_github_urls[category]
            else:
                logger.debug(f"No known GitHub URL for {category}, creating empty file")
                target_file_path.parent.mkdir(parents=True, exist_ok=True)
                target_file_path.touch(exist_ok=True)
                return target_file_path

            try:
                for attempt in http_retry_sync(
                    max_attempts=self.retry_max_attempts,
                    min_wait=self.retry_backoff_seconds,
                    extra_exceptions=(ujson.JSONDecodeError, OSError, ValueError),
                ):
                    with attempt:
                        response = requests.get(target_url, timeout=30)

                        if response.status_code != 200:
                            raise OSError(f"Failed to download {category}: HTTP {response.status_code}")

                        # Handle CSV format for text_generation category
                        if category == MODEL_REFERENCE_CATEGORY.text_generation:
                            target_file_path.parent.mkdir(parents=True, exist_ok=True)
                            with open(target_file_path, "wb") as f:
                                f.write(response.content)

                            try:
                                data = self._read_legacy_csv_to_dict(target_file_path)
                                raw_json_str = ujson.dumps(data, escape_forward_slashes=False, indent=4)
                            except Exception as e:
                                raise ValueError(f"Failed to parse {category} CSV: {e}") from e
                        else:
                            data = ujson.loads(response.content)
                            target_file_path.parent.mkdir(parents=True, exist_ok=True)
                            raw_json_str = response.content.decode("utf-8")
                            with open(target_file_path, "wb") as f:
                                f.write(response.content)

                self._times_downloaded[category] += 1
                if self._times_downloaded[category] > 1:
                    logger.debug(f"Downloaded {category} {self._times_downloaded[category]} times")

                logger.info(f"Downloaded {category} to {target_file_path}")
                self._references_paths_cache[category] = target_file_path

                self._store_legacy_in_cache(category, data, raw_json_str)
                logger.debug(f"Populated legacy cache for {category} after download")

                return target_file_path

            except (RetryError, OSError, ujson.JSONDecodeError, ValueError):
                logger.warning(f"Failed to download {category} after {self.retry_max_attempts} attempts")
                return None

        return None

    async def _download_legacy_async(
        self,
        category: MODEL_REFERENCE_CATEGORY,
        httpx_client: httpx.AsyncClient | None = None,
        overwrite_existing: bool = False,
    ) -> Path | None:
        """Download a single legacy file from GitHub (asynchronous).

        Args:
            category: The category to download.
            httpx_client: Optional httpx async client for downloads.
            overwrite_existing: If True, overwrite existing file.

        Returns:
            Path | None: Path to the downloaded file, or None on failure.

        """
        if not self._download_allowed():
            logger.debug(f"Replicate mode is not REPLICA, skipping download for {category}")
            return self._references_paths_cache.get(category)

        if httpx_client is None:
            logger.debug("No httpx_client provided, will create a new one for this download")

        target_file_path = horde_model_reference_paths.get_legacy_model_reference_file_path(
            category,
            base_path=self.base_path,
        )

        needs_refresh = self.needs_refresh(category)
        if needs_refresh:
            logger.debug(f"Category {category} needs refresh, proceeding to download")
            overwrite_existing = True

        if target_file_path.exists() and not overwrite_existing:
            logger.debug(f"Legacy file {target_file_path} already exists, skipping download")
            return target_file_path

        target_url: str | None = None
        if category in get_github_image_categories():
            target_url = horde_model_reference_paths.legacy_image_model_github_urls[category]
        elif category in get_github_text_categories():
            target_url = horde_model_reference_paths.legacy_text_model_github_urls[category]
        else:
            logger.debug(f"No known GitHub URL for {category}")
            return None

        try:
            async for attempt in http_retry_async(
                max_attempts=self.retry_max_attempts,
                min_wait=self.retry_backoff_seconds,
                extra_exceptions=(ujson.JSONDecodeError, OSError, ValueError),
            ):
                with attempt:
                    if httpx_client is not None:
                        response = await httpx_client.get(target_url)
                    else:
                        async with httpx.AsyncClient() as client:
                            response = await client.get(target_url)

                    if response.status_code != 200:
                        raise OSError(f"Failed to download {category}: HTTP {response.status_code}")

                    content = response.content
                    target_file_path.parent.mkdir(parents=True, exist_ok=True)

                    if category == MODEL_REFERENCE_CATEGORY.text_generation:
                        async with aiofiles.open(target_file_path, "wb") as f:
                            await f.write(content)

                        try:
                            data = self._read_legacy_csv_to_dict(target_file_path)
                            content_str = ujson.dumps(data, escape_forward_slashes=False, indent=4)
                        except Exception as e:
                            raise ValueError(f"Failed to parse {category} CSV: {e}") from e
                    else:
                        data = ujson.loads(content)
                        content_str = content.decode("utf-8")

                        async with aiofiles.open(target_file_path, "wb") as f:
                            await f.write(content)

            self._times_downloaded[category] += 1
            if self._times_downloaded[category] > 1:
                logger.debug(f"Downloaded {category} {self._times_downloaded[category]} times")

            logger.info(f"Downloaded {category} to {target_file_path}")
            self._references_paths_cache[category] = target_file_path

            self._store_legacy_in_cache(category, data, content_str)
            logger.debug(f"Populated legacy cache for {category} after async download")

            return target_file_path

        except (RetryError, OSError, ujson.JSONDecodeError, ValueError):
            logger.warning(f"Failed to download {category} after {self.retry_max_attempts} attempts")
            return None

base_path instance-attribute

base_path = Path(base_path)

legacy_path instance-attribute

legacy_path = joinpath(LEGACY_REFERENCE_FOLDER_NAME)

retry_max_attempts instance-attribute

retry_max_attempts = retry_max_attempts

retry_backoff_seconds instance-attribute

retry_backoff_seconds = retry_backoff_seconds

_references_paths_cache instance-attribute

_references_paths_cache: dict[
    MODEL_REFERENCE_CATEGORY, Path | None
] = {}

_times_downloaded instance-attribute

_times_downloaded: dict[MODEL_REFERENCE_CATEGORY, int] = {}

_replicate_mode class-attribute instance-attribute

_replicate_mode = mode

_invalidation_callbacks instance-attribute

_invalidation_callbacks: list[
    Callable[[MODEL_REFERENCE_CATEGORY], None]
] = []

replicate_mode property

replicate_mode: ReplicateMode

Get the replicate mode of this backend.

_cache_ttl_seconds instance-attribute

_cache_ttl_seconds = cache_ttl_seconds

_cache instance-attribute

_cache: dict[
    MODEL_REFERENCE_CATEGORY, dict[str, Any] | None
] = {}

_category_timestamps instance-attribute

_category_timestamps: dict[
    MODEL_REFERENCE_CATEGORY, float
] = {}

_last_known_mtimes instance-attribute

_last_known_mtimes: dict[
    MODEL_REFERENCE_CATEGORY, float
] = {}

_stale_categories instance-attribute

_stale_categories: set[MODEL_REFERENCE_CATEGORY] = set()

_legacy_json_cache instance-attribute

_legacy_json_cache: dict[
    MODEL_REFERENCE_CATEGORY, dict[str, Any] | None
] = {}

_legacy_json_string_cache instance-attribute

_legacy_json_string_cache: dict[
    MODEL_REFERENCE_CATEGORY, str | None
] = {}

_legacy_cache_timestamps instance-attribute

_legacy_cache_timestamps: dict[
    MODEL_REFERENCE_CATEGORY, float
] = {}

_legacy_last_known_mtimes instance-attribute

_legacy_last_known_mtimes: dict[
    MODEL_REFERENCE_CATEGORY, float
] = {}

_stale_legacy_categories instance-attribute

_stale_legacy_categories: set[MODEL_REFERENCE_CATEGORY] = (
    set()
)

_lock instance-attribute

_lock = RLock()

_async_lock instance-attribute

_async_lock: Lock = Lock()

cache_ttl_seconds property

cache_ttl_seconds: int | None

The cache TTL currently enforced for category payloads.

lock property

lock: RLock

Thread-safe lock shared by subclasses for critical sections.

async_lock property

async_lock: Lock | None

Asyncio lock usable by subclasses when coordinating coroutines.

__init__

__init__(
    *,
    base_path: str
    | Path = horde_model_reference_paths.base_path,
    cache_ttl_seconds: int = horde_model_reference_settings.cache_ttl_seconds,
    retry_max_attempts: int = horde_model_reference_settings.legacy_download_retry_max_attempts,
    retry_backoff_seconds: float = horde_model_reference_settings.legacy_download_retry_backoff_seconds,
    replicate_mode: ReplicateMode = ReplicateMode.REPLICA,
) -> None

Initialize the GitHub backend for REPLICA mode.

Parameters:

  • base_path (str | Path, default: base_path ) –

    Base path for storing model reference files.

  • cache_ttl_seconds (int, default: cache_ttl_seconds ) –

    TTL for internal cache in seconds.

  • retry_max_attempts (int, default: legacy_download_retry_max_attempts ) –

    Max download retry attempts.

  • retry_backoff_seconds (float, default: legacy_download_retry_backoff_seconds ) –

    Backoff time between retries.

  • replicate_mode (ReplicateMode, default: REPLICA ) –

    Must be REPLICA. Defaults to REPLICA.

Raises:

  • ValueError

    If replicate_mode is not REPLICA.

Source code in src/horde_model_reference/backends/github_backend.py
def __init__(
    self,
    *,
    base_path: str | Path = horde_model_reference_paths.base_path,
    cache_ttl_seconds: int = horde_model_reference_settings.cache_ttl_seconds,
    retry_max_attempts: int = horde_model_reference_settings.legacy_download_retry_max_attempts,
    retry_backoff_seconds: float = horde_model_reference_settings.legacy_download_retry_backoff_seconds,
    replicate_mode: ReplicateMode = ReplicateMode.REPLICA,
) -> None:
    """Initialize the GitHub backend for REPLICA mode.

    Args:
        base_path: Base path for storing model reference files.
        cache_ttl_seconds: TTL for internal cache in seconds.
        retry_max_attempts: Max download retry attempts.
        retry_backoff_seconds: Backoff time between retries.
        replicate_mode: Must be REPLICA. Defaults to REPLICA.

    Raises:
        ValueError: If replicate_mode is not REPLICA.

    """
    super().__init__(mode=replicate_mode, cache_ttl_seconds=cache_ttl_seconds)

    if self._replicate_mode != ReplicateMode.REPLICA:
        logger.warning(
            "GitHubBackend is designed for REPLICA mode only. For PRIMARY mode, use FileSystemBackend or "
            "RedisBackend. You can ignore this warning if you intend to use GitHubBackend for one-time "
            "initialization in PRIMARY mode."
        )

    self.base_path = Path(base_path)
    self.legacy_path = self.base_path.joinpath(LEGACY_REFERENCE_FOLDER_NAME)

    self.retry_max_attempts = retry_max_attempts
    self.retry_backoff_seconds = retry_backoff_seconds

    self._references_paths_cache: dict[MODEL_REFERENCE_CATEGORY, Path | None] = {}
    self._times_downloaded: dict[MODEL_REFERENCE_CATEGORY, int] = {}

    for category in MODEL_REFERENCE_CATEGORY:
        file_path = horde_model_reference_paths.get_legacy_model_reference_file_path(
            category,
            base_path=self.base_path,
        )

        if file_path.exists():
            self._references_paths_cache[category] = file_path
            self._load_legacy_json_from_disk(category, file_path)
        else:
            if self._replicate_mode == ReplicateMode.REPLICA or (
                self._replicate_mode == ReplicateMode.PRIMARY
                and horde_model_reference_settings.github_seed_enabled
            ):
                self._references_paths_cache[category] = None
            else:
                raise FileNotFoundError(f"Model reference file not found for {category}.")

        self._times_downloaded[category] = 0

_get_file_path_for_validation

_get_file_path_for_validation(
    category: MODEL_REFERENCE_CATEGORY,
) -> Path | None

Return the converted (v2) file path for mtime validation.

Parameters:

Returns:

  • Path | None

    Path | None: Path to converted file for mtime validation.

Source code in src/horde_model_reference/backends/github_backend.py
@override
def _get_file_path_for_validation(self, category: MODEL_REFERENCE_CATEGORY) -> Path | None:
    """Return the converted (v2) file path for mtime validation.

    Args:
        category: The category to get the file path for.

    Returns:
        Path | None: Path to converted file for mtime validation.

    """
    return horde_model_reference_paths.get_model_reference_file_path(
        category,
        base_path=self.base_path,
    )

_get_legacy_file_path_for_validation

_get_legacy_file_path_for_validation(
    category: MODEL_REFERENCE_CATEGORY,
) -> Path | None

Return the legacy file path for mtime validation.

Parameters:

Returns:

  • Path | None

    Path | None: Path to legacy file for mtime validation.

Source code in src/horde_model_reference/backends/github_backend.py
@override
def _get_legacy_file_path_for_validation(self, category: MODEL_REFERENCE_CATEGORY) -> Path | None:
    """Return the legacy file path for mtime validation.

    Args:
        category: The category to get the legacy file path for.

    Returns:
        Path | None: Path to legacy file for mtime validation.

    """
    return self._references_paths_cache.get(category)

fetch_category

fetch_category(
    category: MODEL_REFERENCE_CATEGORY,
    *,
    force_refresh: bool = False,
) -> dict[str, Any] | None

Fetch model reference data for a specific category.

Downloads legacy format from GitHub, converts to new format, and returns it.

Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    The category to fetch.

  • force_refresh (bool, default: False ) –

    If True, force download even if file exists.

Returns:

  • dict[str, Any] | None

    dict[str, Any] | None: The converted model reference data (new format).

Source code in src/horde_model_reference/backends/github_backend.py
@override
def fetch_category(
    self,
    category: MODEL_REFERENCE_CATEGORY,
    *,
    force_refresh: bool = False,
) -> dict[str, Any] | None:
    """Fetch model reference data for a specific category.

    Downloads legacy format from GitHub, converts to new format, and returns it.

    Args:
        category: The category to fetch.
        force_refresh: If True, force download even if file exists.

    Returns:
        dict[str, Any] | None: The converted model reference data (new format).

    """
    with self._lock:
        # Use helper to determine if we need to fetch
        if force_refresh or self.should_fetch_data(category):
            self._download_and_convert_single(category, overwrite_existing=force_refresh)
            return self._load_converted_from_disk(category)

        # Return cached data
        return self._get_from_cache(category)

fetch_all_categories

fetch_all_categories(
    *, force_refresh: bool = False
) -> dict[MODEL_REFERENCE_CATEGORY, dict[str, Any] | None]

Fetch model reference data for all categories.

Downloads all legacy files from GitHub, converts them, and returns new format data.

Parameters:

  • force_refresh (bool, default: False ) –

    If True, force download all files.

Returns:

Source code in src/horde_model_reference/backends/github_backend.py
@override
def fetch_all_categories(
    self,
    *,
    force_refresh: bool = False,
) -> dict[MODEL_REFERENCE_CATEGORY, dict[str, Any] | None]:
    """Fetch model reference data for all categories.

    Downloads all legacy files from GitHub, converts them, and returns new format data.

    Args:
        force_refresh: If True, force download all files.

    Returns:
        dict mapping categories to their converted model reference data.

    """
    with self._lock:
        if force_refresh:
            self._download_and_convert_all(overwrite_existing=True)

        result: dict[MODEL_REFERENCE_CATEGORY, dict[str, Any] | None] = {}
        for category in MODEL_REFERENCE_CATEGORY:
            # Use helper to determine if we need to fetch
            if force_refresh or self.should_fetch_data(category):
                self._download_legacy(category, overwrite_existing=force_refresh)
                convert_legacy_database_by_category(category, self.base_path, self.base_path)
                result[category] = self._load_converted_from_disk(category)
            else:
                # Return cached data
                result[category] = self._get_from_cache(category)

        return result

fetch_category_async async

fetch_category_async(
    category: MODEL_REFERENCE_CATEGORY,
    *,
    httpx_client: AsyncClient | None = None,
    force_refresh: bool = False,
) -> dict[str, Any] | None

Asynchronously fetch model reference data for a category.

Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    The category to fetch.

  • httpx_client (AsyncClient | None, default: None ) –

    Optional httpx async client for downloads.

  • force_refresh (bool, default: False ) –

    If True, force download.

Returns:

  • dict[str, Any] | None

    dict[str, Any] | None: The converted model reference data.

Source code in src/horde_model_reference/backends/github_backend.py
@override
async def fetch_category_async(
    self,
    category: MODEL_REFERENCE_CATEGORY,
    *,
    httpx_client: httpx.AsyncClient | None = None,
    force_refresh: bool = False,
) -> dict[str, Any] | None:
    """Asynchronously fetch model reference data for a category.

    Args:
        category: The category to fetch.
        httpx_client: Optional httpx async client for downloads.
        force_refresh: If True, force download.

    Returns:
        dict[str, Any] | None: The converted model reference data.

    """
    lock = self.async_lock
    if lock is None:
        raise RuntimeError("Async lock is unavailable for GitHubBackend")

    async with lock:
        # Use helper to determine if we need to fetch
        if force_refresh or self.should_fetch_data(category):
            await self._download_legacy_async(
                category,
                httpx_client,
                overwrite_existing=force_refresh,
            )
            convert_legacy_database_by_category(category, self.base_path, self.base_path)
            return self._load_converted_from_disk(category)

        # Return cached data
        return self._get_from_cache(category)

fetch_all_categories_async async

fetch_all_categories_async(
    *,
    httpx_client: AsyncClient | None = None,
    force_refresh: bool = False,
) -> dict[MODEL_REFERENCE_CATEGORY, dict[str, Any] | None]

Asynchronously fetch all categories.

Parameters:

  • httpx_client (AsyncClient | None, default: None ) –

    Optional httpx async client.

  • force_refresh (bool, default: False ) –

    If True, force download all.

Returns:

Source code in src/horde_model_reference/backends/github_backend.py
@override
async def fetch_all_categories_async(
    self,
    *,
    httpx_client: httpx.AsyncClient | None = None,
    force_refresh: bool = False,
) -> dict[MODEL_REFERENCE_CATEGORY, dict[str, Any] | None]:
    """Asynchronously fetch all categories.

    Args:
        httpx_client: Optional httpx async client.
        force_refresh: If True, force download all.

    Returns:
        dict mapping categories to their data.

    """
    lock = self.async_lock
    if lock is None:
        raise RuntimeError("Async lock is unavailable for GitHubBackend")

    async with lock:
        # Download all that need refresh
        tasks = []
        categories_to_download = []
        for category in MODEL_REFERENCE_CATEGORY:
            # Use helper to determine if we need to fetch
            if force_refresh or self.should_fetch_data(category):
                categories_to_download.append(category)
                tasks.append(
                    self._download_legacy_async(
                        category,
                        httpx_client,
                        overwrite_existing=force_refresh,
                    )
                )

        if tasks:
            await asyncio.gather(*tasks)
            convert_all_legacy_model_references()

        # Collect results from cache or disk
        result: dict[MODEL_REFERENCE_CATEGORY, dict[str, Any] | None] = {}
        for category in MODEL_REFERENCE_CATEGORY:
            if category in categories_to_download:
                result[category] = self._load_converted_from_disk(category)
            else:
                result[category] = self._get_from_cache(category)

        return result

needs_refresh

needs_refresh(category: MODEL_REFERENCE_CATEGORY) -> bool

Check if a category needs refresh.

Base class handles all validation including mtime checks via hooks.

Parameters:

Returns:

  • bool ( bool ) –

    True if needs refresh (stale or mtime changed).

Source code in src/horde_model_reference/backends/github_backend.py
@override
def needs_refresh(self, category: MODEL_REFERENCE_CATEGORY) -> bool:
    """Check if a category needs refresh.

    Base class handles all validation including mtime checks via hooks.

    Args:
        category: The category to check.

    Returns:
        bool: True if needs refresh (stale or mtime changed).

    """
    return super().needs_refresh(category)

mark_stale

mark_stale(category: MODEL_REFERENCE_CATEGORY) -> None

Mark a category as stale, requiring refresh.

Parameters:

Source code in src/horde_model_reference/backends/github_backend.py
@override
def mark_stale(self, category: MODEL_REFERENCE_CATEGORY) -> None:
    """Mark a category as stale, requiring refresh.

    Args:
        category: The category to mark stale.

    """
    logger.debug(f"Marking category {category} as stale")
    super().mark_stale(category)

get_category_file_path

get_category_file_path(
    category: MODEL_REFERENCE_CATEGORY,
) -> Path | None

Get the file path for a category's converted data.

Parameters:

Returns:

  • Path | None

    Path | None: Path to the converted (new format) file, or None if not available.

Source code in src/horde_model_reference/backends/github_backend.py
@override
def get_category_file_path(self, category: MODEL_REFERENCE_CATEGORY) -> Path | None:
    """Get the file path for a category's converted data.

    Args:
        category: The category to get path for.

    Returns:
        Path | None: Path to the converted (new format) file, or None if not available.

    """
    return horde_model_reference_paths.get_model_reference_file_path(
        category,
        base_path=self.base_path,
    )

get_all_category_file_paths

get_all_category_file_paths() -> dict[
    MODEL_REFERENCE_CATEGORY, Path | None
]

Get file paths for all categories' converted data.

Returns:

Source code in src/horde_model_reference/backends/github_backend.py
@override
def get_all_category_file_paths(self) -> dict[MODEL_REFERENCE_CATEGORY, Path | None]:
    """Get file paths for all categories' converted data.

    Returns:
        dict[MODEL_REFERENCE_CATEGORY, Path | None]: Mapping of categories to their converted file paths.

    """
    return horde_model_reference_paths.get_all_model_reference_file_paths(base_path=self.base_path)

_load_legacy_json_from_disk

_load_legacy_json_from_disk(
    category: MODEL_REFERENCE_CATEGORY, file_path: Path
) -> dict[str, Any] | None

Load legacy data from disk and populate cache via base class.

For text_generation category, loads CSV format (models.csv). For other categories, loads JSON format.

Parameters:

Returns:

  • dict[str, Any] | None

    dict[str, Any] | None: The loaded data, or None on error.

Source code in src/horde_model_reference/backends/github_backend.py
def _load_legacy_json_from_disk(
    self,
    category: MODEL_REFERENCE_CATEGORY,
    file_path: Path,
) -> dict[str, Any] | None:
    """Load legacy data from disk and populate cache via base class.

    For text_generation category, loads CSV format (models.csv).
    For other categories, loads JSON format.

    Args:
        category: The category to load.
        file_path: Path to the legacy file.

    Returns:
        dict[str, Any] | None: The loaded data, or None on error.

    """
    if not file_path.exists():
        logger.debug(f"Legacy file {file_path} does not exist")
        self._store_legacy_in_cache(category, None, None)
        return None

    try:
        # Handle CSV format for text_generation
        if category == MODEL_REFERENCE_CATEGORY.text_generation:
            data: dict[str, Any] = self._read_legacy_csv_to_dict(file_path)
            # For CSV, we store the dict and generate a JSON string for cache
            content_str = ujson.dumps(data, escape_forward_slashes=False, indent=4)
            self._store_legacy_in_cache(category, data, content_str)
            logger.debug(f"Loaded legacy CSV for category {category!r} from {file_path!r}")
            return data

        # Handle JSON format for other categories
        with open(file_path, "rb") as f:
            content = f.read()

        # Handle empty files (e.g., for categories with no GitHub URL)
        if not content or content.strip() == b"":
            logger.debug(f"Legacy file {file_path} is empty, treating as empty dict")
            self._store_legacy_in_cache(category, {}, "{}")
            return {}

        data = ujson.loads(content)
        content_str = content.decode("utf-8")

        self._store_legacy_in_cache(category, data, content_str)
        logger.debug(f"Loaded legacy JSON for category {category!r} from {file_path!r}")
        return data
    except Exception:
        logger.exception(f"Failed to load legacy data for category {category!r} from {file_path!r}")
        self._store_legacy_in_cache(category, None, None)
        return None

_read_legacy_csv_to_dict

_read_legacy_csv_to_dict(file_path: Path) -> dict[str, Any]

Read legacy CSV file (models.csv format) and convert to dict format.

Uses the shared csv_rows_to_legacy_dict to replicate convert.py exactly, including defaults.json merging, instruct_format, correct field ordering, and backend prefix generation (3 entries per model).

Parameters:

  • file_path (Path) –

    Path to the legacy CSV file.

Returns:

  • dict[str, Any]

    dict[str, Any]: Model data with 3 entries per CSV row (matching db.json format).

Source code in src/horde_model_reference/backends/github_backend.py
def _read_legacy_csv_to_dict(self, file_path: Path) -> dict[str, Any]:
    """Read legacy CSV file (models.csv format) and convert to dict format.

    Uses the shared ``csv_rows_to_legacy_dict`` to replicate convert.py exactly,
    including defaults.json merging, instruct_format, correct field ordering,
    and backend prefix generation (3 entries per model).

    Args:
        file_path: Path to the legacy CSV file.

    Returns:
        dict[str, Any]: Model data with 3 entries per CSV row (matching db.json format).

    """
    parsed_rows, parse_issues = parse_legacy_text_csv_file(file_path)
    for issue in parse_issues:
        logger.warning(f"Legacy CSV issue for {issue.row_identifier}: {issue.message}")

    data = csv_rows_to_legacy_dict(parsed_rows, with_backend_prefixes=True)
    logger.debug(f"Read {len(data)} models from legacy CSV (includes backend prefix duplicates)")
    return data

_read_csv_to_dict

_read_csv_to_dict(file_path: Path) -> dict[str, Any]

Read CSV file and convert to dict format (one entry per base model).

Uses the shared csv_rows_to_legacy_dict without backend prefixes.

Parameters:

  • file_path (Path) –

    Path to the CSV file.

Returns:

  • dict[str, Any]

    dict[str, Any]: Model data with one entry per base model.

Source code in src/horde_model_reference/backends/github_backend.py
def _read_csv_to_dict(self, file_path: Path) -> dict[str, Any]:
    """Read CSV file and convert to dict format (one entry per base model).

    Uses the shared ``csv_rows_to_legacy_dict`` without backend prefixes.

    Args:
        file_path: Path to the CSV file.

    Returns:
        dict[str, Any]: Model data with one entry per base model.

    """
    parsed_rows, parse_issues = parse_legacy_text_csv_file(file_path)
    for issue in parse_issues:
        logger.warning(f"CSV issue for {issue.row_identifier}: {issue.message}")

    data = csv_rows_to_legacy_dict(parsed_rows, with_backend_prefixes=False)
    logger.debug(f"Read {len(data)} models from CSV (grouped, no backend prefixes)")
    return data

_load_converted_from_disk

_load_converted_from_disk(
    category: MODEL_REFERENCE_CATEGORY,
) -> dict[str, Any] | None

Load converted (v2 format) data from disk and cache via base class.

All v2 format files (including text_generation.json) are in JSON format. CSV format is only used for legacy files (legacy/models.csv).

Parameters:

Returns:

  • dict[str, Any] | None

    dict[str, Any] | None: The loaded data, or None on error.

Source code in src/horde_model_reference/backends/github_backend.py
def _load_converted_from_disk(
    self,
    category: MODEL_REFERENCE_CATEGORY,
) -> dict[str, Any] | None:
    """Load converted (v2 format) data from disk and cache via base class.

    All v2 format files (including text_generation.json) are in JSON format.
    CSV format is only used for legacy files (legacy/models.csv).

    Args:
        category: The category to load.

    Returns:
        dict[str, Any] | None: The loaded data, or None on error.

    """
    file_path = horde_model_reference_paths.get_model_reference_file_path(
        category,
        base_path=self.base_path,
    )

    if not file_path.exists():
        logger.debug(f"Converted file {file_path} does not exist")
        self._store_in_cache(category, None)
        return None

    try:
        # All v2 files are JSON format (including text_generation.json)
        with open(file_path, encoding="utf-8") as f:
            data = cast(dict[str, Any], ujson.load(f))

        self._store_in_cache(category, data)
        logger.debug(f"Loaded converted JSON for category {category!r} from {file_path!r}")
        return data
    except Exception:
        logger.exception(f"Failed to load converted data for category {category!r} from {file_path!r}")
        self._store_in_cache(category, None)
        return None

get_legacy_json

get_legacy_json(
    category: MODEL_REFERENCE_CATEGORY,
    redownload: bool = False,
) -> dict[str, Any] | None

Get raw legacy JSON for a specific category without pydantic validation.

This method returns cached legacy format JSON data, downloading if needed. The cache is populated during initialization, downloads, and on-demand loads.

Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    Category to retrieve.

  • redownload (bool, default: False ) –

    If True, redownload before returning and refresh cache.

Returns:

  • dict[str, Any] | None

    dict[str, Any] | None: The raw legacy JSON dict, or None if not found.

Source code in src/horde_model_reference/backends/github_backend.py
@override
def get_legacy_json(
    self,
    category: MODEL_REFERENCE_CATEGORY,
    redownload: bool = False,
) -> dict[str, Any] | None:
    """Get raw legacy JSON for a specific category without pydantic validation.

    This method returns cached legacy format JSON data, downloading if needed.
    The cache is populated during initialization, downloads, and on-demand loads.

    Args:
        category: Category to retrieve.
        redownload: If True, redownload before returning and refresh cache.

    Returns:
        dict[str, Any] | None: The raw legacy JSON dict, or None if not found.

    """
    with self._lock:
        # Use helper to determine if we need to fetch
        if redownload or self.should_fetch_data(category):
            self._download_legacy(category, overwrite_existing=redownload)

        # Try cache first
        legacy_dict, _ = self._get_legacy_from_cache(category)
        if legacy_dict is not None:
            return legacy_dict

        # Load from disk and cache if available
        file_path = self._references_paths_cache.get(category)
        if file_path:
            return self._load_legacy_json_from_disk(category, file_path)

        return None

get_legacy_json_string

get_legacy_json_string(
    category: MODEL_REFERENCE_CATEGORY,
    redownload: bool = False,
) -> str | None

Get raw legacy JSON string for a specific category without pydantic validation.

This method returns cached legacy format JSON data as a string, downloading if needed. The cache is populated during initialization, downloads, and on-demand loads.

Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    Category to retrieve.

  • redownload (bool, default: False ) –

    If True, redownload before returning and refresh cache.

Returns:

  • str | None

    str | None: The raw legacy JSON string, or None if not found.

Source code in src/horde_model_reference/backends/github_backend.py
@override
def get_legacy_json_string(
    self,
    category: MODEL_REFERENCE_CATEGORY,
    redownload: bool = False,
) -> str | None:
    """Get raw legacy JSON string for a specific category without pydantic validation.

    This method returns cached legacy format JSON data as a string, downloading if needed.
    The cache is populated during initialization, downloads, and on-demand loads.

    Args:
        category: Category to retrieve.
        redownload: If True, redownload before returning and refresh cache.

    Returns:
        str | None: The raw legacy JSON string, or None if not found.

    """
    with self._lock:
        # Use helper to determine if we need to fetch
        if redownload or self.should_fetch_data(category):
            self._download_legacy(category, overwrite_existing=redownload)

        # Try cache first
        _, legacy_string = self._get_legacy_from_cache(category)
        if legacy_string is not None:
            return legacy_string

        # Load from disk and cache if available
        file_path = self._references_paths_cache.get(category)
        if file_path:
            self._load_legacy_json_from_disk(category, file_path)
            _, legacy_string = self._get_legacy_from_cache(category)
            return legacy_string

        return None

_download_and_convert_single

_download_and_convert_single(
    category: MODEL_REFERENCE_CATEGORY,
    overwrite_existing: bool = False,
) -> None

Download a single legacy file and convert it.

Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    The category to download and convert.

  • overwrite_existing (bool, default: False ) –

    If True, overwrite existing files.

Source code in src/horde_model_reference/backends/github_backend.py
def _download_and_convert_single(
    self,
    category: MODEL_REFERENCE_CATEGORY,
    overwrite_existing: bool = False,
) -> None:
    """Download a single legacy file and convert it.

    Args:
        category: The category to download and convert.
        overwrite_existing: If True, overwrite existing files.

    """
    self._download_legacy(category, overwrite_existing=overwrite_existing)

    convert_legacy_database_by_category(category, self.base_path, self.base_path)

_download_and_convert_all

_download_and_convert_all(
    overwrite_existing: bool = False,
) -> None

Download all legacy files and convert them.

Source code in src/horde_model_reference/backends/github_backend.py
def _download_and_convert_all(self, overwrite_existing: bool = False) -> None:
    """Download all legacy files and convert them."""
    for category in MODEL_REFERENCE_CATEGORY:
        self._download_legacy(category, overwrite_existing=overwrite_existing)

    convert_all_legacy_model_references(self.base_path, self.base_path)

_download_allowed

_download_allowed() -> bool

Return True if downloading is allowed based on replicate mode and settings.

Source code in src/horde_model_reference/backends/github_backend.py
def _download_allowed(self) -> bool:
    """Return `True` if downloading is allowed based on replicate mode and settings."""
    if self._replicate_mode == ReplicateMode.PRIMARY and horde_model_reference_settings.github_seed_enabled:
        return True
    return self._replicate_mode == ReplicateMode.REPLICA

_download_legacy

_download_legacy(
    category: MODEL_REFERENCE_CATEGORY,
    overwrite_existing: bool = False,
) -> Path | None

Download a single legacy file from GitHub (synchronous).

Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    The category to download.

  • overwrite_existing (bool, default: False ) –

    If True, overwrite existing file.

Returns:

  • Path | None

    Path | None: Path to the downloaded file, or None on failure.

Source code in src/horde_model_reference/backends/github_backend.py
def _download_legacy(
    self,
    category: MODEL_REFERENCE_CATEGORY,
    overwrite_existing: bool = False,
) -> Path | None:
    """Download a single legacy file from GitHub (synchronous).

    Args:
        category: The category to download.
        overwrite_existing: If True, overwrite existing file.

    Returns:
        Path | None: Path to the downloaded file, or None on failure.

    """
    if not self._download_allowed():
        return self._references_paths_cache.get(category)

    target_file_path = horde_model_reference_paths.get_legacy_model_reference_file_path(
        category,
        base_path=self.base_path,
    )

    needs_refresh = self.needs_refresh(category)
    if needs_refresh:
        logger.debug(f"Category {category} needs refresh, proceeding to download")
        overwrite_existing = True

    with self._lock:
        if target_file_path.exists() and not overwrite_existing:
            logger.debug(f"Legacy file {target_file_path} already exists, skipping download")
            return target_file_path

        target_url: str | None = None
        if category in get_github_image_categories():
            target_url = horde_model_reference_paths.legacy_image_model_github_urls[category]
        elif category in get_github_text_categories():
            target_url = horde_model_reference_paths.legacy_text_model_github_urls[category]
        else:
            logger.debug(f"No known GitHub URL for {category}, creating empty file")
            target_file_path.parent.mkdir(parents=True, exist_ok=True)
            target_file_path.touch(exist_ok=True)
            return target_file_path

        try:
            for attempt in http_retry_sync(
                max_attempts=self.retry_max_attempts,
                min_wait=self.retry_backoff_seconds,
                extra_exceptions=(ujson.JSONDecodeError, OSError, ValueError),
            ):
                with attempt:
                    response = requests.get(target_url, timeout=30)

                    if response.status_code != 200:
                        raise OSError(f"Failed to download {category}: HTTP {response.status_code}")

                    # Handle CSV format for text_generation category
                    if category == MODEL_REFERENCE_CATEGORY.text_generation:
                        target_file_path.parent.mkdir(parents=True, exist_ok=True)
                        with open(target_file_path, "wb") as f:
                            f.write(response.content)

                        try:
                            data = self._read_legacy_csv_to_dict(target_file_path)
                            raw_json_str = ujson.dumps(data, escape_forward_slashes=False, indent=4)
                        except Exception as e:
                            raise ValueError(f"Failed to parse {category} CSV: {e}") from e
                    else:
                        data = ujson.loads(response.content)
                        target_file_path.parent.mkdir(parents=True, exist_ok=True)
                        raw_json_str = response.content.decode("utf-8")
                        with open(target_file_path, "wb") as f:
                            f.write(response.content)

            self._times_downloaded[category] += 1
            if self._times_downloaded[category] > 1:
                logger.debug(f"Downloaded {category} {self._times_downloaded[category]} times")

            logger.info(f"Downloaded {category} to {target_file_path}")
            self._references_paths_cache[category] = target_file_path

            self._store_legacy_in_cache(category, data, raw_json_str)
            logger.debug(f"Populated legacy cache for {category} after download")

            return target_file_path

        except (RetryError, OSError, ujson.JSONDecodeError, ValueError):
            logger.warning(f"Failed to download {category} after {self.retry_max_attempts} attempts")
            return None

    return None

_download_legacy_async async

_download_legacy_async(
    category: MODEL_REFERENCE_CATEGORY,
    httpx_client: AsyncClient | None = None,
    overwrite_existing: bool = False,
) -> Path | None

Download a single legacy file from GitHub (asynchronous).

Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    The category to download.

  • httpx_client (AsyncClient | None, default: None ) –

    Optional httpx async client for downloads.

  • overwrite_existing (bool, default: False ) –

    If True, overwrite existing file.

Returns:

  • Path | None

    Path | None: Path to the downloaded file, or None on failure.

Source code in src/horde_model_reference/backends/github_backend.py
async def _download_legacy_async(
    self,
    category: MODEL_REFERENCE_CATEGORY,
    httpx_client: httpx.AsyncClient | None = None,
    overwrite_existing: bool = False,
) -> Path | None:
    """Download a single legacy file from GitHub (asynchronous).

    Args:
        category: The category to download.
        httpx_client: Optional httpx async client for downloads.
        overwrite_existing: If True, overwrite existing file.

    Returns:
        Path | None: Path to the downloaded file, or None on failure.

    """
    if not self._download_allowed():
        logger.debug(f"Replicate mode is not REPLICA, skipping download for {category}")
        return self._references_paths_cache.get(category)

    if httpx_client is None:
        logger.debug("No httpx_client provided, will create a new one for this download")

    target_file_path = horde_model_reference_paths.get_legacy_model_reference_file_path(
        category,
        base_path=self.base_path,
    )

    needs_refresh = self.needs_refresh(category)
    if needs_refresh:
        logger.debug(f"Category {category} needs refresh, proceeding to download")
        overwrite_existing = True

    if target_file_path.exists() and not overwrite_existing:
        logger.debug(f"Legacy file {target_file_path} already exists, skipping download")
        return target_file_path

    target_url: str | None = None
    if category in get_github_image_categories():
        target_url = horde_model_reference_paths.legacy_image_model_github_urls[category]
    elif category in get_github_text_categories():
        target_url = horde_model_reference_paths.legacy_text_model_github_urls[category]
    else:
        logger.debug(f"No known GitHub URL for {category}")
        return None

    try:
        async for attempt in http_retry_async(
            max_attempts=self.retry_max_attempts,
            min_wait=self.retry_backoff_seconds,
            extra_exceptions=(ujson.JSONDecodeError, OSError, ValueError),
        ):
            with attempt:
                if httpx_client is not None:
                    response = await httpx_client.get(target_url)
                else:
                    async with httpx.AsyncClient() as client:
                        response = await client.get(target_url)

                if response.status_code != 200:
                    raise OSError(f"Failed to download {category}: HTTP {response.status_code}")

                content = response.content
                target_file_path.parent.mkdir(parents=True, exist_ok=True)

                if category == MODEL_REFERENCE_CATEGORY.text_generation:
                    async with aiofiles.open(target_file_path, "wb") as f:
                        await f.write(content)

                    try:
                        data = self._read_legacy_csv_to_dict(target_file_path)
                        content_str = ujson.dumps(data, escape_forward_slashes=False, indent=4)
                    except Exception as e:
                        raise ValueError(f"Failed to parse {category} CSV: {e}") from e
                else:
                    data = ujson.loads(content)
                    content_str = content.decode("utf-8")

                    async with aiofiles.open(target_file_path, "wb") as f:
                        await f.write(content)

        self._times_downloaded[category] += 1
        if self._times_downloaded[category] > 1:
            logger.debug(f"Downloaded {category} {self._times_downloaded[category]} times")

        logger.info(f"Downloaded {category} to {target_file_path}")
        self._references_paths_cache[category] = target_file_path

        self._store_legacy_in_cache(category, data, content_str)
        logger.debug(f"Populated legacy cache for {category} after async download")

        return target_file_path

    except (RetryError, OSError, ujson.JSONDecodeError, ValueError):
        logger.warning(f"Failed to download {category} after {self.retry_max_attempts} attempts")
        return None

register_invalidation_callback

register_invalidation_callback(
    callback: Callable[[MODEL_REFERENCE_CATEGORY], None],
) -> None

Register a callback to be called when a category is invalidated.

This allows external components (like ModelReferenceManager) to be notified when cached data becomes stale and needs to be refreshed.

Parameters:

Source code in src/horde_model_reference/backends/base.py
def register_invalidation_callback(
    self,
    callback: Callable[[MODEL_REFERENCE_CATEGORY], None],
) -> None:
    """Register a callback to be called when a category is invalidated.

    This allows external components (like ModelReferenceManager) to be notified
    when cached data becomes stale and needs to be refreshed.

    Args:
        callback: Function to call with the invalidated category.

    """
    self._invalidation_callbacks.append(callback)
    logger.debug(f"Registered invalidation callback: {getattr(callback, '__name__', repr(callback))}")

_notify_invalidation

_notify_invalidation(
    category: MODEL_REFERENCE_CATEGORY,
) -> None

Notify all registered callbacks that a category has been invalidated.

Parameters:

Source code in src/horde_model_reference/backends/base.py
def _notify_invalidation(self, category: MODEL_REFERENCE_CATEGORY) -> None:
    """Notify all registered callbacks that a category has been invalidated.

    Args:
        category: The category that was invalidated.

    """
    for callback in self._invalidation_callbacks:
        try:
            callback(category)
        except Exception as e:
            cb_name = getattr(callback, "__name__", repr(callback))
            logger.error(f"Invalidation callback {cb_name} failed for {category}: {e}")

_mark_stale_impl

_mark_stale_impl(
    category: MODEL_REFERENCE_CATEGORY,
) -> None
Source code in src/horde_model_reference/backends/replica_backend_base.py
@override
def _mark_stale_impl(self, category: MODEL_REFERENCE_CATEGORY) -> None:
    with self._lock:
        self._stale_categories.add(category)

support_any_writes

support_any_writes() -> bool

Check if this backend supports any write operations (v2 or legacy).

Returns:

  • bool ( bool ) –

    True if any write operations are supported, False otherwise.

Source code in src/horde_model_reference/backends/base.py
def support_any_writes(self) -> bool:
    """Check if this backend supports any write operations (v2 or legacy).

    Returns:
        bool: True if any write operations are supported, False otherwise.

    """
    return self.supports_writes() or self.supports_legacy_writes()

supports_writes

supports_writes() -> bool

Check if this backend supports write operations (v2 format).

Write operations include update_model() and delete_model(). Typically only PRIMARY mode backends support writes.

Returns:

  • bool ( bool ) –

    True if write operations are supported, False otherwise.

Source code in src/horde_model_reference/backends/base.py
def supports_writes(self) -> bool:
    """Check if this backend supports write operations (v2 format).

    Write operations include update_model() and delete_model().
    Typically only PRIMARY mode backends support writes.

    Returns:
        bool: True if write operations are supported, False otherwise.

    """
    return False

supports_legacy_writes

supports_legacy_writes() -> bool

Check if this backend supports write operations in legacy format.

Legacy write operations include update_model_legacy() and delete_model_legacy(). Only available when canonical_format='LEGACY' in PRIMARY mode.

Returns:

  • bool ( bool ) –

    True if legacy write operations are supported, False otherwise.

Source code in src/horde_model_reference/backends/base.py
def supports_legacy_writes(self) -> bool:
    """Check if this backend supports write operations in legacy format.

    Legacy write operations include update_model_legacy() and delete_model_legacy().
    Only available when canonical_format='LEGACY' in PRIMARY mode.

    Returns:
        bool: True if legacy write operations are supported, False otherwise.

    """
    return False

supports_cache_warming

supports_cache_warming() -> bool

Check if this backend supports cache warming operations.

Cache warming pre-populates the cache with data to improve initial request performance. Typically only backends with distributed caching (like Redis) support this.

Returns:

  • bool ( bool ) –

    True if cache warming is supported, False otherwise.

Source code in src/horde_model_reference/backends/base.py
def supports_cache_warming(self) -> bool:
    """Check if this backend supports cache warming operations.

    Cache warming pre-populates the cache with data to improve initial request performance.
    Typically only backends with distributed caching (like Redis) support this.

    Returns:
        bool: True if cache warming is supported, False otherwise.

    """
    return False

supports_health_checks

supports_health_checks() -> bool

Check if this backend supports health check operations.

Health checks verify that the backend's external dependencies (Redis, databases, etc.) are accessible and functioning correctly.

Returns:

  • bool ( bool ) –

    True if health checks are supported, False otherwise.

Source code in src/horde_model_reference/backends/base.py
def supports_health_checks(self) -> bool:
    """Check if this backend supports health check operations.

    Health checks verify that the backend's external dependencies (Redis, databases, etc.)
    are accessible and functioning correctly.

    Returns:
        bool: True if health checks are supported, False otherwise.

    """
    return False

supports_statistics

supports_statistics() -> bool

Check if this backend supports statistics retrieval.

Statistics provide insights into backend performance, cache hits/misses, etc.

Returns:

  • bool ( bool ) –

    True if statistics are supported, False otherwise.

Source code in src/horde_model_reference/backends/base.py
def supports_statistics(self) -> bool:
    """Check if this backend supports statistics retrieval.

    Statistics provide insights into backend performance, cache hits/misses, etc.

    Returns:
        bool: True if statistics are supported, False otherwise.

    """
    return False

update_model

update_model(
    category: MODEL_REFERENCE_CATEGORY,
    model_name: str,
    record_dict: dict[str, Any],
    *,
    logical_user_id: str | None = None,
    request_id: str | None = None,
) -> None

Update or create a model reference.

This is an optional method that write-capable backends can implement. Read-only backends should leave the default implementation which raises NotImplementedError.

Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    The category to update.

  • model_name (str) –

    The name of the model to update or create.

  • record_dict (dict[str, Any]) –

    The model record data as a dictionary.

Parameters:

  • logical_user_id (str | None, default: None ) –

    Immutable Horde user id for auditing contexts (optional).

  • request_id (str | None, default: None ) –

    Optional tracing/idempotency identifier for audit correlation.

Raises:

Implementation Requirements
  • Create model if it doesn't exist
  • Update model if it exists
  • Ensure atomic writes (use temp files with rename for file-based backends)
  • Call mark_stale() after successful write to invalidate cache
  • Override supports_writes() to return True
See Also
Source code in src/horde_model_reference/backends/base.py
def update_model(
    self,
    category: MODEL_REFERENCE_CATEGORY,
    model_name: str,
    record_dict: dict[str, Any],
    *,
    logical_user_id: str | None = None,
    request_id: str | None = None,
) -> None:
    """Update or create a model reference.

    This is an optional method that write-capable backends can implement.
    Read-only backends should leave the default implementation which raises NotImplementedError.

    Args:
        category: The category to update.
        model_name: The name of the model to update or create.
        record_dict: The model record data as a dictionary.

    Args:
        logical_user_id: Immutable Horde user id for auditing contexts (optional).
        request_id: Optional tracing/idempotency identifier for audit correlation.

    Raises:
        NotImplementedError: If the backend does not support write operations.

    Implementation Requirements:
        - Create model if it doesn't exist
        - Update model if it exists
        - Ensure atomic writes (use temp files with rename for file-based backends)
        - Call [mark_stale()][(c).mark_stale] after successful write to invalidate cache
        - Override [supports_writes()][(c).supports_writes] to return `True`

    See Also:
        - [update_model_from_base_model()][(c).update_model_from_base_model]:
          Update from pydantic model (automatically provided)
        - [delete_model()][(c).delete_model]: Delete a model
        - [supports_writes()][(c).supports_writes]: Feature detection method

    """
    raise NotImplementedError(f"{self.__class__.__name__} does not support write operations")

update_model_from_base_model

update_model_from_base_model(
    category: MODEL_REFERENCE_CATEGORY,
    model_name: str,
    record_model: BaseModel,
    *,
    logical_user_id: str | None = None,
    request_id: str | None = None,
) -> None

Update or create a model reference from a pydantic BaseModel.

This is an optional method that write-capable backends can implement. Read-only backends should leave the default implementation which raises NotImplementedError.

Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    The category to update.

  • model_name (str) –

    The name of the model to update or create.

  • record_model (BaseModel) –

    The model record data as a pydantic BaseModel.

  • logical_user_id (str | None, default: None ) –

    Immutable Horde user id for auditing contexts (optional).

  • request_id (str | None, default: None ) –

    Optional tracing/idempotency identifier for audit correlation.

Raises:

Implementation Note

The base class provides this implementation automatically. It: 1. Checks supports_writes() returns True 2. Converts the pydantic model to dict using model_dump(exclude_unset=True) 3. Calls update_model() with the dictionary

Backends that support writes typically don't need to override this method.

See Also
Source code in src/horde_model_reference/backends/base.py
def update_model_from_base_model(
    self,
    category: MODEL_REFERENCE_CATEGORY,
    model_name: str,
    record_model: BaseModel,
    *,
    logical_user_id: str | None = None,
    request_id: str | None = None,
) -> None:
    """Update or create a model reference from a pydantic BaseModel.

    This is an optional method that write-capable backends can implement.
    Read-only backends should leave the default implementation which raises NotImplementedError.

    Args:
        category: The category to update.
        model_name: The name of the model to update or create.
        record_model: The model record data as a pydantic BaseModel.
        logical_user_id: Immutable Horde user id for auditing contexts (optional).
        request_id: Optional tracing/idempotency identifier for audit correlation.

    Raises:
        NotImplementedError: If the backend does not support write operations.

    Implementation Note:
        The base class provides this implementation automatically. It:
        1. Checks [supports_writes()][(c).supports_writes] returns `True`
        2. Converts the pydantic model to dict using `model_dump(exclude_unset=True)`
        3. Calls [update_model()][(c).update_model] with the dictionary

        Backends that support writes typically don't need to override this method.

    See Also:
        - [update_model()][(c).update_model]: Update from dictionary (implement this)
        - [supports_writes()][(c).supports_writes]: Feature detection method

    """
    if not self.supports_writes():
        raise NotImplementedError(f"{self.__class__.__name__} does not support write operations")

    record_dict = record_model.model_dump(exclude_unset=True)
    self.update_model(
        category,
        model_name,
        record_dict,
        logical_user_id=logical_user_id,
        request_id=request_id,
    )

delete_model

delete_model(
    category: MODEL_REFERENCE_CATEGORY,
    model_name: str,
    *,
    logical_user_id: str | None = None,
    request_id: str | None = None,
) -> None

Delete a model reference.

This is an optional method that write-capable backends can implement. Read-only backends should leave the default implementation which raises NotImplementedError.

Parameters:

Parameters:

  • logical_user_id (str | None, default: None ) –

    Immutable Horde user id for auditing contexts (optional).

  • request_id (str | None, default: None ) –

    Optional tracing/idempotency identifier for audit correlation.

Raises:

Implementation Requirements
  • Raise KeyError if model doesn't exist
  • Ensure atomic writes
  • Call mark_stale() after successful delete to invalidate cache
  • Override supports_writes() to return True
See Also
Source code in src/horde_model_reference/backends/base.py
def delete_model(
    self,
    category: MODEL_REFERENCE_CATEGORY,
    model_name: str,
    *,
    logical_user_id: str | None = None,
    request_id: str | None = None,
) -> None:
    """Delete a model reference.

    This is an optional method that write-capable backends can implement.
    Read-only backends should leave the default implementation which raises NotImplementedError.

    Args:
        category: The category containing the model.
        model_name: The name of the model to delete.

    Args:
        logical_user_id: Immutable Horde user id for auditing contexts (optional).
        request_id: Optional tracing/idempotency identifier for audit correlation.

    Raises:
        NotImplementedError: If the backend does not support write operations.
        KeyError: If the model doesn't exist.

    Implementation Requirements:
        - Raise `KeyError` if model doesn't exist
        - Ensure atomic writes
        - Call [mark_stale()][(c).mark_stale] after successful delete to invalidate cache
        - Override [supports_writes()][(c).supports_writes] to return `True`

    See Also:
        - [update_model()][(c).update_model]: Update or create a model
        - [supports_writes()][(c).supports_writes]: Feature detection method

    """
    raise NotImplementedError(f"{self.__class__.__name__} does not support write operations")

update_model_legacy

update_model_legacy(
    category: MODEL_REFERENCE_CATEGORY,
    model_name: str,
    record_dict: dict[str, Any],
    *,
    logical_user_id: str | None = None,
    request_id: str | None = None,
) -> None

Update or create a model reference in legacy format.

This is an optional method that legacy-write-capable backends can implement. Only available when canonical_format='LEGACY' in PRIMARY mode.

Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    The category to update.

  • model_name (str) –

    The name of the model to update or create.

  • record_dict (dict[str, Any]) –

    The model record data in legacy format as a dictionary.

  • logical_user_id (str | None, default: None ) –

    Immutable Horde user id for auditing contexts (optional).

  • request_id (str | None, default: None ) –

    Optional tracing/idempotency identifier for audit correlation.

Raises:

Source code in src/horde_model_reference/backends/base.py
def update_model_legacy(
    self,
    category: MODEL_REFERENCE_CATEGORY,
    model_name: str,
    record_dict: dict[str, Any],
    *,
    logical_user_id: str | None = None,
    request_id: str | None = None,
) -> None:
    """Update or create a model reference in legacy format.

    This is an optional method that legacy-write-capable backends can implement.
    Only available when canonical_format='LEGACY' in PRIMARY mode.

    Args:
        category: The category to update.
        model_name: The name of the model to update or create.
        record_dict: The model record data in legacy format as a dictionary.
        logical_user_id: Immutable Horde user id for auditing contexts (optional).
        request_id: Optional tracing/idempotency identifier for audit correlation.

    Raises:
        NotImplementedError: If the backend does not support legacy write operations.

    """
    raise NotImplementedError(f"{self.__class__.__name__} does not support legacy write operations")

update_model_legacy_from_base_model

update_model_legacy_from_base_model(
    category: MODEL_REFERENCE_CATEGORY,
    model_name: str,
    record_model: BaseModel,
    *,
    logical_user_id: str | None = None,
    request_id: str | None = None,
) -> None

Update or create a model reference in legacy format from a pydantic BaseModel.

This is an optional method that legacy-write-capable backends can implement. Only available when canonical_format='LEGACY' in PRIMARY mode.

Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    The category to update.

  • model_name (str) –

    The name of the model to update or create.

  • record_model (BaseModel) –

    The model record data as a pydantic BaseModel.

  • logical_user_id (str | None, default: None ) –

    Immutable Horde user id for auditing contexts (optional).

  • request_id (str | None, default: None ) –

    Optional tracing/idempotency identifier for audit correlation.

Raises:

Source code in src/horde_model_reference/backends/base.py
def update_model_legacy_from_base_model(
    self,
    category: MODEL_REFERENCE_CATEGORY,
    model_name: str,
    record_model: BaseModel,
    *,
    logical_user_id: str | None = None,
    request_id: str | None = None,
) -> None:
    """Update or create a model reference in legacy format from a pydantic BaseModel.

    This is an optional method that legacy-write-capable backends can implement.
    Only available when canonical_format='LEGACY' in PRIMARY mode.

    Args:
        category: The category to update.
        model_name: The name of the model to update or create.
        record_model: The model record data as a pydantic BaseModel.
        logical_user_id: Immutable Horde user id for auditing contexts (optional).
        request_id: Optional tracing/idempotency identifier for audit correlation.

    Raises:
        NotImplementedError: If the backend does not support legacy write operations.

    """
    if not self.supports_legacy_writes():
        raise NotImplementedError(f"{self.__class__.__name__} does not support legacy write operations")

    record_dict = record_model.model_dump(exclude_unset=True)
    self.update_model_legacy(
        category,
        model_name,
        record_dict,
        logical_user_id=logical_user_id,
        request_id=request_id,
    )

delete_model_legacy

delete_model_legacy(
    category: MODEL_REFERENCE_CATEGORY,
    model_name: str,
    *,
    logical_user_id: str | None = None,
    request_id: str | None = None,
) -> None

Delete a model reference from legacy format files.

This is an optional method that legacy-write-capable backends can implement. Only available when canonical_format='LEGACY' in PRIMARY mode.

Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    The category containing the model.

  • model_name (str) –

    The name of the model to delete.

  • logical_user_id (str | None, default: None ) –

    Immutable Horde user id for auditing contexts (optional).

  • request_id (str | None, default: None ) –

    Optional tracing/idempotency identifier for audit correlation.

Raises:

Source code in src/horde_model_reference/backends/base.py
def delete_model_legacy(
    self,
    category: MODEL_REFERENCE_CATEGORY,
    model_name: str,
    *,
    logical_user_id: str | None = None,
    request_id: str | None = None,
) -> None:
    """Delete a model reference from legacy format files.

    This is an optional method that legacy-write-capable backends can implement.
    Only available when canonical_format='LEGACY' in PRIMARY mode.

    Args:
        category: The category containing the model.
        model_name: The name of the model to delete.
        logical_user_id: Immutable Horde user id for auditing contexts (optional).
        request_id: Optional tracing/idempotency identifier for audit correlation.

    Raises:
        NotImplementedError: If the backend does not support legacy write operations.

    """
    raise NotImplementedError(f"{self.__class__.__name__} does not support legacy write operations")

warm_cache

warm_cache() -> None

Pre-populate cache with all categories for faster initial requests.

This is an optional method that backends with cache warming support can implement. Backends without cache warming should leave the default implementation.

Raises:

Source code in src/horde_model_reference/backends/base.py
def warm_cache(self) -> None:
    """Pre-populate cache with all categories for faster initial requests.

    This is an optional method that backends with cache warming support can implement.
    Backends without cache warming should leave the default implementation.

    Raises:
        NotImplementedError: If the backend does not support cache warming.

    """
    raise NotImplementedError(f"{self.__class__.__name__} does not support cache warming")

warm_cache_async async

warm_cache_async() -> None

Asynchronously pre-populate cache with all categories for faster initial requests.

This is an optional method that backends with cache warming support can implement. Backends without cache warming should leave the default implementation.

Raises:

Source code in src/horde_model_reference/backends/base.py
async def warm_cache_async(self) -> None:
    """Asynchronously pre-populate cache with all categories for faster initial requests.

    This is an optional method that backends with cache warming support can implement.
    Backends without cache warming should leave the default implementation.

    Raises:
        NotImplementedError: If the backend does not support async cache warming.

    """
    raise NotImplementedError(f"{self.__class__.__name__} does not support async cache warming")

health_check

health_check() -> bool

Check the health of the backend's external dependencies.

This is an optional method that backends with health check support can implement. Backends without external dependencies should leave the default implementation.

Returns:

  • bool ( bool ) –

    True if healthy, False otherwise.

Raises:

Source code in src/horde_model_reference/backends/base.py
def health_check(self) -> bool:
    """Check the health of the backend's external dependencies.

    This is an optional method that backends with health check support can implement.
    Backends without external dependencies should leave the default implementation.

    Returns:
        bool: True if healthy, False otherwise.

    Raises:
        NotImplementedError: If the backend does not support health checks.

    """
    raise NotImplementedError(f"{self.__class__.__name__} does not support health checks")

get_statistics

get_statistics() -> dict[str, Any]

Get backend performance and usage statistics.

This is an optional method that backends with statistics support can implement. The structure of returned statistics is backend-specific.

Returns:

  • dict[str, Any]

    dict[str, Any]: Backend-specific statistics.

Raises:

Source code in src/horde_model_reference/backends/base.py
def get_statistics(self) -> dict[str, Any]:
    """Get backend performance and usage statistics.

    This is an optional method that backends with statistics support can implement.
    The structure of returned statistics is backend-specific.

    Returns:
        dict[str, Any]: Backend-specific statistics.

    Raises:
        NotImplementedError: If the backend does not support statistics.

    """
    raise NotImplementedError(f"{self.__class__.__name__} does not support statistics")

get_replicate_mode

get_replicate_mode() -> ReplicateMode

Get the replication mode of this backend.

Returns:

  • ReplicateMode ( ReplicateMode ) –

    The replicate mode (PRIMARY or REPLICA).

Source code in src/horde_model_reference/backends/base.py
def get_replicate_mode(self) -> ReplicateMode:
    """Get the replication mode of this backend.

    Returns:
        ReplicateMode: The replicate mode (PRIMARY or REPLICA).

    """
    return self._replicate_mode

supports_metadata

supports_metadata() -> bool

Check if this backend supports metadata tracking.

Metadata tracking records operation counts, timestamps, and health metrics for both legacy (v1) and v2 format operations. Typically only PRIMARY mode backends support metadata tracking.

Returns:

  • bool ( bool ) –

    True if metadata tracking is supported, False otherwise.

Source code in src/horde_model_reference/backends/base.py
def supports_metadata(self) -> bool:
    """Check if this backend supports metadata tracking.

    Metadata tracking records operation counts, timestamps, and health metrics
    for both legacy (v1) and v2 format operations. Typically only PRIMARY mode
    backends support metadata tracking.

    Returns:
        bool: True if metadata tracking is supported, False otherwise.

    """
    return False

get_legacy_metadata

get_legacy_metadata(
    category: MODEL_REFERENCE_CATEGORY,
) -> CategoryMetadata

Get legacy format metadata for a specific category.

This is an optional method that metadata-capable backends can implement. Backends without metadata support should leave the default implementation.

Parameters:

Returns:

Raises:

Source code in src/horde_model_reference/backends/base.py
def get_legacy_metadata(self, category: MODEL_REFERENCE_CATEGORY) -> CategoryMetadata:
    """Get legacy format metadata for a specific category.

    This is an optional method that metadata-capable backends can implement.
    Backends without metadata support should leave the default implementation.

    Args:
        category: The category to get metadata for.

    Returns:
        CategoryMetadata: The legacy metadata.

    Raises:
        NotImplementedError: If the backend does not support metadata.

    """
    raise NotImplementedError(f"{self.__class__.__name__} does not support metadata tracking")

get_legacy_metadata_async async

get_legacy_metadata_async(
    category: MODEL_REFERENCE_CATEGORY,
) -> CategoryMetadata

Asynchronously get legacy format metadata for a specific category.

This is an optional method that metadata-capable backends can implement. Backends without metadata support should leave the default implementation.

Parameters:

Returns:

Raises:

Source code in src/horde_model_reference/backends/base.py
async def get_legacy_metadata_async(self, category: MODEL_REFERENCE_CATEGORY) -> CategoryMetadata:
    """Asynchronously get legacy format metadata for a specific category.

    This is an optional method that metadata-capable backends can implement.
    Backends without metadata support should leave the default implementation.

    Args:
        category: The category to get metadata for.

    Returns:
        CategoryMetadata: The legacy metadata.

    Raises:
        NotImplementedError: If the backend does not support metadata.

    """
    raise NotImplementedError(f"{self.__class__.__name__} does not support async metadata tracking")

get_metadata

get_metadata(
    category: MODEL_REFERENCE_CATEGORY,
) -> CategoryMetadata

Get v2 format metadata for a specific category.

This is an optional method that metadata-capable backends can implement. Backends without metadata support should leave the default implementation.

Parameters:

Returns:

Raises:

Source code in src/horde_model_reference/backends/base.py
def get_metadata(self, category: MODEL_REFERENCE_CATEGORY) -> CategoryMetadata:
    """Get v2 format metadata for a specific category.

    This is an optional method that metadata-capable backends can implement.
    Backends without metadata support should leave the default implementation.

    Args:
        category: The category to get metadata for.

    Returns:
        CategoryMetadata: The v2 metadata.

    Raises:
        NotImplementedError: If the backend does not support metadata.

    """
    raise NotImplementedError(f"{self.__class__.__name__} does not support metadata tracking")

get_metadata_async async

get_metadata_async(
    category: MODEL_REFERENCE_CATEGORY,
) -> CategoryMetadata

Asynchronously get v2 format metadata for a specific category.

This is an optional method that metadata-capable backends can implement. Backends without metadata support should leave the default implementation.

Parameters:

Returns:

Raises:

Source code in src/horde_model_reference/backends/base.py
async def get_metadata_async(self, category: MODEL_REFERENCE_CATEGORY) -> CategoryMetadata:
    """Asynchronously get v2 format metadata for a specific category.

    This is an optional method that metadata-capable backends can implement.
    Backends without metadata support should leave the default implementation.

    Args:
        category: The category to get metadata for.

    Returns:
        CategoryMetadata: The v2 metadata.

    Raises:
        NotImplementedError: If the backend does not support metadata.

    """
    raise NotImplementedError(f"{self.__class__.__name__} does not support async metadata tracking")

get_all_legacy_metadata

get_all_legacy_metadata() -> dict[
    MODEL_REFERENCE_CATEGORY, CategoryMetadata
]

Get legacy format metadata for all categories.

This is an optional method that metadata-capable backends can implement. Backends without metadata support should leave the default implementation.

Returns:

Raises:

Source code in src/horde_model_reference/backends/base.py
def get_all_legacy_metadata(self) -> dict[MODEL_REFERENCE_CATEGORY, CategoryMetadata]:
    """Get legacy format metadata for all categories.

    This is an optional method that metadata-capable backends can implement.
    Backends without metadata support should leave the default implementation.

    Returns:
        dict[MODEL_REFERENCE_CATEGORY, CategoryMetadata]: Mapping of categories to their legacy metadata.

    Raises:
        NotImplementedError: If the backend does not support metadata.

    """
    raise NotImplementedError(f"{self.__class__.__name__} does not support metadata tracking")

get_all_legacy_metadata_async async

get_all_legacy_metadata_async() -> dict[
    MODEL_REFERENCE_CATEGORY, CategoryMetadata
]

Asynchronously get legacy format metadata for all categories.

This is an optional method that metadata-capable backends can implement. Backends without metadata support should leave the default implementation.

Returns:

Raises:

Source code in src/horde_model_reference/backends/base.py
async def get_all_legacy_metadata_async(self) -> dict[MODEL_REFERENCE_CATEGORY, CategoryMetadata]:
    """Asynchronously get legacy format metadata for all categories.

    This is an optional method that metadata-capable backends can implement.
    Backends without metadata support should leave the default implementation.

    Returns:
        dict[MODEL_REFERENCE_CATEGORY, CategoryMetadata]: Mapping of categories to their legacy metadata.

    Raises:
        NotImplementedError: If the backend does not support metadata.

    """
    raise NotImplementedError(f"{self.__class__.__name__} does not support async metadata tracking")

get_all_metadata

get_all_metadata() -> dict[
    MODEL_REFERENCE_CATEGORY, CategoryMetadata
]

Get v2 format metadata for all categories.

This is an optional method that metadata-capable backends can implement. Backends without metadata support should leave the default implementation.

Returns:

Raises:

Source code in src/horde_model_reference/backends/base.py
def get_all_metadata(self) -> dict[MODEL_REFERENCE_CATEGORY, CategoryMetadata]:
    """Get v2 format metadata for all categories.

    This is an optional method that metadata-capable backends can implement.
    Backends without metadata support should leave the default implementation.

    Returns:
        dict[MODEL_REFERENCE_CATEGORY, CategoryMetadata]: Mapping of categories to their v2 metadata.

    Raises:
        NotImplementedError: If the backend does not support metadata.

    """
    raise NotImplementedError(f"{self.__class__.__name__} does not support metadata tracking")

get_all_metadata_async async

get_all_metadata_async() -> dict[
    MODEL_REFERENCE_CATEGORY, CategoryMetadata
]

Asynchronously get v2 format metadata for all categories.

This is an optional method that metadata-capable backends can implement. Backends without metadata support should leave the default implementation.

Returns:

Raises:

Source code in src/horde_model_reference/backends/base.py
async def get_all_metadata_async(self) -> dict[MODEL_REFERENCE_CATEGORY, CategoryMetadata]:
    """Asynchronously get v2 format metadata for all categories.

    This is an optional method that metadata-capable backends can implement.
    Backends without metadata support should leave the default implementation.

    Returns:
        dict[MODEL_REFERENCE_CATEGORY, CategoryMetadata]: Mapping of categories to their v2 metadata.

    Raises:
        NotImplementedError: If the backend does not support metadata.

    """
    raise NotImplementedError(f"{self.__class__.__name__} does not support async metadata tracking")

_mark_category_fresh

_mark_category_fresh(
    category: MODEL_REFERENCE_CATEGORY,
) -> None

Record that we hold a fresh cache entry for category.

Also updates mtime if a file path is provided by the subclass.

Parameters:

Source code in src/horde_model_reference/backends/replica_backend_base.py
def _mark_category_fresh(self, category: MODEL_REFERENCE_CATEGORY) -> None:
    """Record that we hold a fresh cache entry for *category*.

    Also updates mtime if a file path is provided by the subclass.

    Args:
        category: The category to mark as fresh.

    """
    self._category_timestamps[category] = time.time()
    self._stale_categories.discard(category)

    file_path = self._get_file_path_for_validation(category)
    if file_path and file_path.exists():
        try:
            self._last_known_mtimes[category] = file_path.stat().st_mtime
        except Exception:
            self._last_known_mtimes[category] = 0.0

    logger.debug(f"Marked category {category} as fresh")

_invalidate_category_timestamp

_invalidate_category_timestamp(
    category: MODEL_REFERENCE_CATEGORY,
) -> None

Drop timestamp knowledge for category without adjusting payloads.

Source code in src/horde_model_reference/backends/replica_backend_base.py
def _invalidate_category_timestamp(self, category: MODEL_REFERENCE_CATEGORY) -> None:
    """Drop timestamp knowledge for *category* without adjusting payloads."""
    self._category_timestamps.pop(category, None)

has_cached_data

has_cached_data(category: MODEL_REFERENCE_CATEGORY) -> bool

Check if any data has been cached for this category.

This is a simple existence check that doesn't validate freshness. Use this for initial fetch detection: "Have we loaded this at least once?"

Parameters:

Returns:

  • bool ( bool ) –

    True if data exists in cache (may be stale), False if never loaded.

Source code in src/horde_model_reference/backends/replica_backend_base.py
def has_cached_data(self, category: MODEL_REFERENCE_CATEGORY) -> bool:
    """Check if any data has been cached for this category.

    This is a simple existence check that doesn't validate freshness.
    Use this for initial fetch detection: "Have we loaded this at least once?"

    Args:
        category: The category to check.

    Returns:
        bool: True if data exists in cache (may be stale), False if never loaded.

    """
    with self._lock:
        return category in self._cache

is_cache_valid

is_cache_valid(category: MODEL_REFERENCE_CATEGORY) -> bool

Check if cached data exists and is still valid for the given category.

This method performs comprehensive validation to determine if cached data can be used without refetching. It's primarily used internally by cache retrieval methods but can also be called directly for validation checks.

Parameters:

Returns:

  • bool

    True if cache exists and all validation checks pass, False otherwise.

Validation Steps

The method performs checks in the following order:

  1. Explicit Staleness: Returns False if category is in _stale_categories
  2. Cache Existence: Returns False if category has never been cached
  3. Timestamp Existence: Returns False if no timestamp recorded
  4. TTL Expiration: Checks if cache_ttl_seconds exceeded (calls mark_stale() if expired)
  5. File Modification: Compares current file mtime with cached mtime (calls mark_stale() if changed)
  6. Custom Validation: Calls _additional_cache_validation() for subclass-specific checks
Side Effects

When staleness is detected (TTL expiration or mtime change), this method calls mark_stale() to trigger invalidation callbacks and notify the manager.

Return Value Semantics
  • Returns False for both "no data" and "stale data" cases
  • Use has_cached_data() to distinguish between these cases
  • Use needs_refresh() to check staleness without considering initial fetch
Note

This method is thread-safe and uses the internal _lock for synchronization.

Source code in src/horde_model_reference/backends/replica_backend_base.py
def is_cache_valid(self, category: MODEL_REFERENCE_CATEGORY) -> bool:
    """Check if cached data exists and is still valid for the given category.

    This method performs comprehensive validation to determine if cached data can be
    used without refetching. It's primarily used internally by cache retrieval methods
    but can also be called directly for validation checks.

    Args:
        category: The category to validate.

    Returns:
        True if cache exists and all validation checks pass, False otherwise.

    Validation Steps:
        The method performs checks in the following order:

        1. **Explicit Staleness**: Returns False if category is in `_stale_categories`
        2. **Cache Existence**: Returns False if category has never been cached
        3. **Timestamp Existence**: Returns False if no timestamp recorded
        4. **TTL Expiration**: Checks if `cache_ttl_seconds` exceeded (calls `mark_stale()` if expired)
        5. **File Modification**: Compares current file mtime with cached mtime (calls `mark_stale()` if changed)
        6. **Custom Validation**: Calls `_additional_cache_validation()` for subclass-specific checks

    Side Effects:
        When staleness is detected (TTL expiration or mtime change), this method calls
        `mark_stale()` to trigger invalidation callbacks and notify the manager.

    Related Methods:
        - `has_cached_data()`: Simple existence check, ignores validity
        - `should_fetch_data()`: Combined check for "should I fetch?" (initial OR refresh)
        - `needs_refresh()`: Checks if cached data should be refetched (staleness only)

    Return Value Semantics:
        - Returns `False` for both "no data" and "stale data" cases
        - Use `has_cached_data()` to distinguish between these cases
        - Use `needs_refresh()` to check staleness without considering initial fetch


    Note:
        This method is thread-safe and uses the internal `_lock` for synchronization.

    """
    with self._lock:
        if category in self._stale_categories:
            logger.debug(f"Category {category} marked stale, cache invalid")
            return False

        if category not in self._cache:
            return False

        last_updated = self._category_timestamps.get(category)

    if last_updated is None:
        logger.debug(f"Category {category} has no timestamp, considering cache invalid")
        return False

    if self._cache_ttl_seconds is not None:
        elapsed = time.time() - last_updated
        if elapsed > self._cache_ttl_seconds:
            logger.debug(f"Category {category} TTL expired ({elapsed}s > {self._cache_ttl_seconds}s)")
            self.mark_stale(category)
            return False

    file_path = self._get_file_path_for_validation(category)
    if file_path and file_path.exists():
        try:
            current_mtime = file_path.stat().st_mtime
            last_known = self._last_known_mtimes.get(category, 0.0)
            if current_mtime != last_known:
                logger.debug(
                    f"File {file_path.name} mtime changed "
                    f"(current={current_mtime}, cached={last_known}), cache invalid"
                )
                self.mark_stale(category)
                return False
        except Exception:
            return False

    if not self._additional_cache_validation(category):
        logger.debug(f"Category {category} failed additional validation")
        return False

    return True

should_fetch_data

should_fetch_data(
    category: MODEL_REFERENCE_CATEGORY,
) -> bool

Determine if data should be fetched (initial load OR refresh).

This is a convenience method that combines both initial fetch detection and refresh detection into a single check. Use this when you want to know "should I fetch data now?" regardless of whether it's an initial load or a refresh.

This is equivalent to: not is_cache_valid(category) or needs_refresh(category) but handles the logic more efficiently.

Parameters:

Returns:

  • bool ( bool ) –

    True if data should be fetched (either initial or refresh), False if cached data is valid and fresh.

Source code in src/horde_model_reference/backends/replica_backend_base.py
def should_fetch_data(self, category: MODEL_REFERENCE_CATEGORY) -> bool:
    """Determine if data should be fetched (initial load OR refresh).

    This is a convenience method that combines both initial fetch detection
    and refresh detection into a single check. Use this when you want to
    know "should I fetch data now?" regardless of whether it's an initial
    load or a refresh.

    This is equivalent to: `not is_cache_valid(category) or needs_refresh(category)`
    but handles the logic more efficiently.

    Args:
        category: The category to check.

    Returns:
        bool: True if data should be fetched (either initial or refresh),
              False if cached data is valid and fresh.

    """
    return not self.is_cache_valid(category)

_set_cache_ttl_seconds

_set_cache_ttl_seconds(ttl_seconds: int | None) -> None

Allow subclasses to tweak TTL after initialization if desired.

Source code in src/horde_model_reference/backends/replica_backend_base.py
def _set_cache_ttl_seconds(self, ttl_seconds: int | None) -> None:
    """Allow subclasses to tweak TTL after initialization if desired."""
    self._cache_ttl_seconds = ttl_seconds

_additional_cache_validation

_additional_cache_validation(
    category: MODEL_REFERENCE_CATEGORY,
) -> bool

Perform additional cache validation.

Subclasses can override this to add custom validation logic beyond TTL and mtime checks. This is called during is_cache_valid().

Parameters:

Returns:

  • bool ( bool ) –

    True if cache is valid, False to invalidate.

Source code in src/horde_model_reference/backends/replica_backend_base.py
def _additional_cache_validation(self, category: MODEL_REFERENCE_CATEGORY) -> bool:
    """Perform additional cache validation.

    Subclasses can override this to add custom validation logic beyond
    TTL and mtime checks. This is called during `is_cache_valid()`.

    Args:
        category: The category to validate.

    Returns:
        bool: True if cache is valid, False to invalidate.

    """
    return True

_fetch_with_cache

_fetch_with_cache(
    category: MODEL_REFERENCE_CATEGORY,
    fetch_fn: Callable[[], dict[str, Any] | None],
    *,
    force_refresh: bool = False,
) -> dict[str, Any] | None

Implement standard fetch pattern with automatic caching.

This helper method implements the recommended fetch pattern: 1. Check cache if not forcing refresh 2. Return cached data if valid 3. Fetch data using provided function 4. Store in cache and return

Use this in your fetch_category() implementations to avoid boilerplate.

Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    The category to fetch.

  • fetch_fn (Callable[[], dict[str, Any] | None]) –

    Callable that fetches the data (no args, returns dict or None).

  • force_refresh (bool, default: False ) –

    If True, skip cache check and force fetch.

Returns:

  • dict[str, Any] | None

    dict[str, Any] | None: The fetched/cached data.

Example

def fetch_category(self, category, *, force_refresh=False): return self._fetch_with_cache( category, lambda: self._fetch_from_source(category), force_refresh=force_refresh )

Source code in src/horde_model_reference/backends/replica_backend_base.py
def _fetch_with_cache(
    self,
    category: MODEL_REFERENCE_CATEGORY,
    fetch_fn: Callable[[], dict[str, Any] | None],
    *,
    force_refresh: bool = False,
) -> dict[str, Any] | None:
    """Implement standard fetch pattern with automatic caching.

    This helper method implements the recommended fetch pattern:
    1. Check cache if not forcing refresh
    2. Return cached data if valid
    3. Fetch data using provided function
    4. Store in cache and return

    Use this in your fetch_category() implementations to avoid boilerplate.

    Args:
        category: The category to fetch.
        fetch_fn: Callable that fetches the data (no args, returns dict or None).
        force_refresh: If True, skip cache check and force fetch.

    Returns:
        dict[str, Any] | None: The fetched/cached data.

    Example:
        def fetch_category(self, category, *, force_refresh=False):
            return self._fetch_with_cache(
                category,
                lambda: self._fetch_from_source(category),
                force_refresh=force_refresh
            )

    """
    # Check cache first unless force refresh
    if not force_refresh:
        cached_data = self._get_from_cache(category)
        if cached_data is not None:
            return cached_data

    # Fetch data
    data = fetch_fn()

    # Store in cache
    if data is not None:
        self._store_in_cache(category, data)
    else:
        # Store None to indicate "checked but not found"
        self._store_in_cache(category, None)

    return data

_get_from_cache

_get_from_cache(
    category: MODEL_REFERENCE_CATEGORY,
) -> dict[str, Any] | None

Get data from cache if valid.

This is the primary method subclasses should use to retrieve cached data. It handles all validation logic internally, including initial fetch detection (returns None if data has never been loaded).

This method determines if an INITIAL fetch is needed by checking cache existence. Use needs_refresh() to check if existing cached data should be RE-fetched.

Parameters:

Returns:

  • dict[str, Any] | None

    dict[str, Any] | None: Cached data if valid, None if cache miss (initial fetch needed) or cache invalid (refresh needed).

Source code in src/horde_model_reference/backends/replica_backend_base.py
def _get_from_cache(self, category: MODEL_REFERENCE_CATEGORY) -> dict[str, Any] | None:
    """Get data from cache if valid.

    This is the primary method subclasses should use to retrieve cached data.
    It handles all validation logic internally, including initial fetch detection
    (returns None if data has never been loaded).

    This method determines if an INITIAL fetch is needed by checking cache existence.
    Use `needs_refresh()` to check if existing cached data should be RE-fetched.

    Args:
        category: The category to retrieve from cache.

    Returns:
        dict[str, Any] | None: Cached data if valid, None if cache miss (initial fetch needed)
                               or cache invalid (refresh needed).

    """
    with self._lock:
        if self.is_cache_valid(category):
            logger.debug(f"Cache hit for {category}")
            return self._cache.get(category)

        logger.debug(f"Cache miss for {category}")
        return None

_store_in_cache

_store_in_cache(
    category: MODEL_REFERENCE_CATEGORY,
    data: dict[str, Any] | None,
) -> None

Store data in cache and mark category as fresh.

This is the primary method subclasses should use to store fetched data. It handles timestamp updates and mtime tracking internally.

Parameters:

Source code in src/horde_model_reference/backends/replica_backend_base.py
def _store_in_cache(self, category: MODEL_REFERENCE_CATEGORY, data: dict[str, Any] | None) -> None:
    """Store data in cache and mark category as fresh.

    This is the primary method subclasses should use to store fetched data.
    It handles timestamp updates and mtime tracking internally.

    Args:
        category: The category to store.
        data: The data to cache, or None if category has no data.

    """
    with self._lock:
        self._cache[category] = data
        # Only mark as fresh if we actually have data
        # None values indicate failed loads and should not prevent retries
        if data is not None:
            self._mark_category_fresh(category)
            logger.debug(f"Stored {category} in cache")
        else:
            logger.debug(f"Stored None for {category}, not marking as fresh")

_invalidate_cache

_invalidate_cache(
    category: MODEL_REFERENCE_CATEGORY,
) -> None

Invalidate cache for a category without deleting the data.

This marks the category as stale, forcing a refetch on next access.

Parameters:

Source code in src/horde_model_reference/backends/replica_backend_base.py
def _invalidate_cache(self, category: MODEL_REFERENCE_CATEGORY) -> None:
    """Invalidate cache for a category without deleting the data.

    This marks the category as stale, forcing a refetch on next access.

    Args:
        category: The category to invalidate.

    """
    with self._lock:
        self._stale_categories.add(category)
        self._category_timestamps.pop(category, None)
        logger.debug(f"Invalidated cache for {category}")

_mark_legacy_category_fresh

_mark_legacy_category_fresh(
    category: MODEL_REFERENCE_CATEGORY,
) -> None

Record that we hold a fresh legacy cache entry for category.

Also updates legacy file mtime if a path is provided by the subclass.

Parameters:

Source code in src/horde_model_reference/backends/replica_backend_base.py
def _mark_legacy_category_fresh(self, category: MODEL_REFERENCE_CATEGORY) -> None:
    """Record that we hold a fresh legacy cache entry for *category*.

    Also updates legacy file mtime if a path is provided by the subclass.

    Args:
        category: The category to mark as fresh.

    """
    self._legacy_cache_timestamps[category] = time.time()
    self._stale_legacy_categories.discard(category)

    legacy_file_path = self._get_legacy_file_path_for_validation(category)
    if legacy_file_path and legacy_file_path.exists():
        try:
            self._legacy_last_known_mtimes[category] = legacy_file_path.stat().st_mtime
        except Exception:
            self._legacy_last_known_mtimes[category] = 0.0

    logger.debug(f"Marked legacy category {category} as fresh")

is_legacy_cache_valid

is_legacy_cache_valid(
    category: MODEL_REFERENCE_CATEGORY,
) -> bool

Return True if the legacy cache for category is considered fresh.

Performs validation checks for legacy format cache: 1. Staleness check (explicit invalidation) 2. Cache existence check (dict or string) 3. TTL expiration check 4. File mtime check (if legacy file path provided by subclass)

Parameters:

Returns:

  • bool ( bool ) –

    True if legacy cache is valid and can be used.

Source code in src/horde_model_reference/backends/replica_backend_base.py
def is_legacy_cache_valid(self, category: MODEL_REFERENCE_CATEGORY) -> bool:
    """Return True if the legacy cache for *category* is considered fresh.

    Performs validation checks for legacy format cache:
    1. Staleness check (explicit invalidation)
    2. Cache existence check (dict or string)
    3. TTL expiration check
    4. File mtime check (if legacy file path provided by subclass)

    Args:
        category: The category to validate.

    Returns:
        bool: True if legacy cache is valid and can be used.

    """
    with self._lock:
        if category in self._stale_legacy_categories:
            logger.debug(f"Legacy category {category} marked stale, cache invalid")
            return False

        if category not in self._legacy_json_cache and category not in self._legacy_json_string_cache:
            return False

        last_updated = self._legacy_cache_timestamps.get(category)

    if last_updated is None:
        logger.debug(f"Legacy category {category} has no timestamp, considering cache invalid")
        return False

    if self._cache_ttl_seconds is not None:
        elapsed = time.time() - last_updated
        if elapsed > self._cache_ttl_seconds:
            logger.debug(f"Legacy category {category} TTL expired ({elapsed}s > {self._cache_ttl_seconds}s)")
            return False

    legacy_file_path = self._get_legacy_file_path_for_validation(category)
    if legacy_file_path and legacy_file_path.exists():
        try:
            current_mtime = legacy_file_path.stat().st_mtime
            last_known = self._legacy_last_known_mtimes.get(category, 0.0)
            if current_mtime != last_known:
                logger.debug(
                    f"Legacy file {legacy_file_path.name} mtime changed "
                    f"(current={current_mtime}, cached={last_known}), cache invalid"
                )
                return False
        except Exception:
            return False

    return True

_get_legacy_from_cache

_get_legacy_from_cache(
    category: MODEL_REFERENCE_CATEGORY,
) -> tuple[dict[str, Any] | None, str | None]

Get legacy data from cache if valid.

Returns both dict and string representations of legacy JSON.

Parameters:

Returns:

  • tuple[dict[str, Any] | None, str | None]

    tuple[dict | None, str | None]: (legacy_dict, legacy_string) or (None, None) if cache miss.

Source code in src/horde_model_reference/backends/replica_backend_base.py
def _get_legacy_from_cache(
    self,
    category: MODEL_REFERENCE_CATEGORY,
) -> tuple[dict[str, Any] | None, str | None]:
    """Get legacy data from cache if valid.

    Returns both dict and string representations of legacy JSON.

    Args:
        category: The category to retrieve from cache.

    Returns:
        tuple[dict | None, str | None]: (legacy_dict, legacy_string) or (None, None) if cache miss.

    """
    with self._lock:
        if self.is_legacy_cache_valid(category):
            logger.debug(f"Legacy cache hit for {category}")
            return (
                self._legacy_json_cache.get(category),
                self._legacy_json_string_cache.get(category),
            )

        logger.debug(f"Legacy cache miss for {category}")
        return None, None

_store_legacy_in_cache

_store_legacy_in_cache(
    category: MODEL_REFERENCE_CATEGORY,
    legacy_dict: dict[str, Any] | None,
    legacy_string: str | None,
) -> None

Store legacy data in cache and mark category as fresh.

Stores both dict and string representations of legacy JSON.

Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    The category to store.

  • legacy_dict (dict[str, Any] | None) –

    The legacy JSON as a dict, or None.

  • legacy_string (str | None) –

    The legacy JSON as a string, or None.

Source code in src/horde_model_reference/backends/replica_backend_base.py
def _store_legacy_in_cache(
    self,
    category: MODEL_REFERENCE_CATEGORY,
    legacy_dict: dict[str, Any] | None,
    legacy_string: str | None,
) -> None:
    """Store legacy data in cache and mark category as fresh.

    Stores both dict and string representations of legacy JSON.

    Args:
        category: The category to store.
        legacy_dict: The legacy JSON as a dict, or None.
        legacy_string: The legacy JSON as a string, or None.

    """
    with self._lock:
        self._legacy_json_cache[category] = legacy_dict
        self._legacy_json_string_cache[category] = legacy_string
        # Only mark as fresh if we actually have data
        # None values indicate failed loads and should not prevent retries
        if legacy_dict is not None or legacy_string is not None:
            self._mark_legacy_category_fresh(category)
            logger.debug(f"Stored legacy {category} in cache")
        else:
            logger.debug(f"Stored None for legacy {category}, not marking as fresh")

_invalidate_legacy_cache

_invalidate_legacy_cache(
    category: MODEL_REFERENCE_CATEGORY,
) -> None

Invalidate legacy cache for a category without deleting the data.

This marks the category as stale, forcing a refetch on next access.

Parameters:

Source code in src/horde_model_reference/backends/replica_backend_base.py
def _invalidate_legacy_cache(self, category: MODEL_REFERENCE_CATEGORY) -> None:
    """Invalidate legacy cache for a category without deleting the data.

    This marks the category as stale, forcing a refetch on next access.

    Args:
        category: The category to invalidate.

    """
    with self._lock:
        self._stale_legacy_categories.add(category)
        self._legacy_cache_timestamps.pop(category, None)
        logger.debug(f"Invalidated legacy cache for {category}")