Skip to content

model_reference_manager

Singleton manager for model reference lifecycle: backend selection, caching, and the public API.

TModelRecord module-attribute

TModelRecord = TypeVar(
    "TModelRecord", bound=GenericModelRecord
)

__all__ module-attribute

__all__ = [
    "DeferredPrefetchHandle",
    "ModelReferenceManager",
    "PrefetchStrategy",
]

PrefetchStrategy

Bases: StrEnum

Controls when and how the manager fetches model references.

Source code in src/horde_model_reference/model_reference_manager.py
class PrefetchStrategy(StrEnum):
    """Controls when and how the manager fetches model references."""

    LAZY = "lazy"
    """Defer backend fetches until first access (legacy lazy_mode=True behavior)."""

    SYNC = "sync"
    """Immediately fetch all categories on the calling thread during initialization."""

    DEFERRED = "deferred"
    """Expose a handle the caller can trigger later (sync or async) without blocking init."""

    ASYNC = "async"
    """Automatically schedule a background async warm-up when an event loop is available."""

    NONE = "none"
    """Skip all automatic warm-up; callers must invoke caching helpers manually."""

LAZY class-attribute instance-attribute

LAZY = 'lazy'

Defer backend fetches until first access (legacy lazy_mode=True behavior).

SYNC class-attribute instance-attribute

SYNC = 'sync'

Immediately fetch all categories on the calling thread during initialization.

DEFERRED class-attribute instance-attribute

DEFERRED = 'deferred'

Expose a handle the caller can trigger later (sync or async) without blocking init.

ASYNC class-attribute instance-attribute

ASYNC = 'async'

Automatically schedule a background async warm-up when an event loop is available.

NONE class-attribute instance-attribute

NONE = 'none'

Skip all automatic warm-up; callers must invoke caching helpers manually.

ModelReferenceManager

Singleton class for downloading and reading model reference files.

This class is responsible for managing the lifecycle of model reference files, including downloading, caching, and providing access to the model references.

Uses a pluggable backend architecture to support different data sources (GitHub, database, etc.).

Settings on initialization (base_path, backend, prefetch_strategy, etc) are only set on the first instantiation

(e.g. ModelReferenceManager(base_path=...)). Subsequent instantiations will return the same instance.

Retrieve all model references with get_all_model_references_or_none().

Source code in src/horde_model_reference/model_reference_manager.py
  84
  85
  86
  87
  88
  89
  90
  91
  92
  93
  94
  95
  96
  97
  98
  99
 100
 101
 102
 103
 104
 105
 106
 107
 108
 109
 110
 111
 112
 113
 114
 115
 116
 117
 118
 119
 120
 121
 122
 123
 124
 125
 126
 127
 128
 129
 130
 131
 132
 133
 134
 135
 136
 137
 138
 139
 140
 141
 142
 143
 144
 145
 146
 147
 148
 149
 150
 151
 152
 153
 154
 155
 156
 157
 158
 159
 160
 161
 162
 163
 164
 165
 166
 167
 168
 169
 170
 171
 172
 173
 174
 175
 176
 177
 178
 179
 180
 181
 182
 183
 184
 185
 186
 187
 188
 189
 190
 191
 192
 193
 194
 195
 196
 197
 198
 199
 200
 201
 202
 203
 204
 205
 206
 207
 208
 209
 210
 211
 212
 213
 214
 215
 216
 217
 218
 219
 220
 221
 222
 223
 224
 225
 226
 227
 228
 229
 230
 231
 232
 233
 234
 235
 236
 237
 238
 239
 240
 241
 242
 243
 244
 245
 246
 247
 248
 249
 250
 251
 252
 253
 254
 255
 256
 257
 258
 259
 260
 261
 262
 263
 264
 265
 266
 267
 268
 269
 270
 271
 272
 273
 274
 275
 276
 277
 278
 279
 280
 281
 282
 283
 284
 285
 286
 287
 288
 289
 290
 291
 292
 293
 294
 295
 296
 297
 298
 299
 300
 301
 302
 303
 304
 305
 306
 307
 308
 309
 310
 311
 312
 313
 314
 315
 316
 317
 318
 319
 320
 321
 322
 323
 324
 325
 326
 327
 328
 329
 330
 331
 332
 333
 334
 335
 336
 337
 338
 339
 340
 341
 342
 343
 344
 345
 346
 347
 348
 349
 350
 351
 352
 353
 354
 355
 356
 357
 358
 359
 360
 361
 362
 363
 364
 365
 366
 367
 368
 369
 370
 371
 372
 373
 374
 375
 376
 377
 378
 379
 380
 381
 382
 383
 384
 385
 386
 387
 388
 389
 390
 391
 392
 393
 394
 395
 396
 397
 398
 399
 400
 401
 402
 403
 404
 405
 406
 407
 408
 409
 410
 411
 412
 413
 414
 415
 416
 417
 418
 419
 420
 421
 422
 423
 424
 425
 426
 427
 428
 429
 430
 431
 432
 433
 434
 435
 436
 437
 438
 439
 440
 441
 442
 443
 444
 445
 446
 447
 448
 449
 450
 451
 452
 453
 454
 455
 456
 457
 458
 459
 460
 461
 462
 463
 464
 465
 466
 467
 468
 469
 470
 471
 472
 473
 474
 475
 476
 477
 478
 479
 480
 481
 482
 483
 484
 485
 486
 487
 488
 489
 490
 491
 492
 493
 494
 495
 496
 497
 498
 499
 500
 501
 502
 503
 504
 505
 506
 507
 508
 509
 510
 511
 512
 513
 514
 515
 516
 517
 518
 519
 520
 521
 522
 523
 524
 525
 526
 527
 528
 529
 530
 531
 532
 533
 534
 535
 536
 537
 538
 539
 540
 541
 542
 543
 544
 545
 546
 547
 548
 549
 550
 551
 552
 553
 554
 555
 556
 557
 558
 559
 560
 561
 562
 563
 564
 565
 566
 567
 568
 569
 570
 571
 572
 573
 574
 575
 576
 577
 578
 579
 580
 581
 582
 583
 584
 585
 586
 587
 588
 589
 590
 591
 592
 593
 594
 595
 596
 597
 598
 599
 600
 601
 602
 603
 604
 605
 606
 607
 608
 609
 610
 611
 612
 613
 614
 615
 616
 617
 618
 619
 620
 621
 622
 623
 624
 625
 626
 627
 628
 629
 630
 631
 632
 633
 634
 635
 636
 637
 638
 639
 640
 641
 642
 643
 644
 645
 646
 647
 648
 649
 650
 651
 652
 653
 654
 655
 656
 657
 658
 659
 660
 661
 662
 663
 664
 665
 666
 667
 668
 669
 670
 671
 672
 673
 674
 675
 676
 677
 678
 679
 680
 681
 682
 683
 684
 685
 686
 687
 688
 689
 690
 691
 692
 693
 694
 695
 696
 697
 698
 699
 700
 701
 702
 703
 704
 705
 706
 707
 708
 709
 710
 711
 712
 713
 714
 715
 716
 717
 718
 719
 720
 721
 722
 723
 724
 725
 726
 727
 728
 729
 730
 731
 732
 733
 734
 735
 736
 737
 738
 739
 740
 741
 742
 743
 744
 745
 746
 747
 748
 749
 750
 751
 752
 753
 754
 755
 756
 757
 758
 759
 760
 761
 762
 763
 764
 765
 766
 767
 768
 769
 770
 771
 772
 773
 774
 775
 776
 777
 778
 779
 780
 781
 782
 783
 784
 785
 786
 787
 788
 789
 790
 791
 792
 793
 794
 795
 796
 797
 798
 799
 800
 801
 802
 803
 804
 805
 806
 807
 808
 809
 810
 811
 812
 813
 814
 815
 816
 817
 818
 819
 820
 821
 822
 823
 824
 825
 826
 827
 828
 829
 830
 831
 832
 833
 834
 835
 836
 837
 838
 839
 840
 841
 842
 843
 844
 845
 846
 847
 848
 849
 850
 851
 852
 853
 854
 855
 856
 857
 858
 859
 860
 861
 862
 863
 864
 865
 866
 867
 868
 869
 870
 871
 872
 873
 874
 875
 876
 877
 878
 879
 880
 881
 882
 883
 884
 885
 886
 887
 888
 889
 890
 891
 892
 893
 894
 895
 896
 897
 898
 899
 900
 901
 902
 903
 904
 905
 906
 907
 908
 909
 910
 911
 912
 913
 914
 915
 916
 917
 918
 919
 920
 921
 922
 923
 924
 925
 926
 927
 928
 929
 930
 931
 932
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476
1477
1478
1479
1480
1481
1482
1483
1484
1485
1486
1487
1488
1489
1490
1491
1492
1493
1494
1495
1496
1497
1498
1499
1500
1501
1502
1503
1504
1505
1506
1507
1508
1509
1510
1511
1512
1513
1514
1515
1516
1517
1518
1519
1520
1521
1522
1523
1524
1525
1526
1527
1528
1529
1530
1531
1532
1533
1534
1535
1536
1537
1538
1539
1540
class ModelReferenceManager:
    """Singleton class for downloading and reading model reference files.

    This class is responsible for managing the lifecycle of model reference files,
    including downloading, caching, and providing access to the model references.

    Uses a pluggable backend architecture to support different data sources (GitHub, database, etc.).

        Settings on initialization (base_path, backend, prefetch_strategy, etc) are only set on the first instantiation
    (e.g. `ModelReferenceManager(base_path=...)`). Subsequent instantiations will return the same instance.

    Retrieve all model references with `get_all_model_references_or_none()`.
    """

    backend: ModelReferenceBackend
    """The backend provider for model reference data."""
    _cached_records: dict[MODEL_REFERENCE_CATEGORY, dict[str, GenericModelRecord] | None]
    """Cache of pydantic model records by category."""

    _instance: ModelReferenceManager | None = None
    _replicate_mode: ReplicateMode = ReplicateMode.REPLICA
    _prefetch_strategy: PrefetchStrategy = PrefetchStrategy.SYNC
    _deferred_prefetch_handle: DeferredPrefetchHandle | None = None
    _async_prefetch_task: asyncio.Task[None] | None = None
    _audit_writer: AuditTrailWriter | None = None
    _pending_queue_service: PendingQueueService | None = None

    _lock: RLock = RLock()

    @classmethod
    def get_instance(cls) -> ModelReferenceManager:
        """Get the singleton instance of ModelReferenceManager.

        Returns:
            ModelReferenceManager: The singleton instance.

        Raises:
            RuntimeError: If the instance has not been created yet.

        """
        with cls._lock:
            if cls._instance is None:
                raise RuntimeError("ModelReferenceManager instance has not been created yet.")
            return cls._instance

    @classmethod
    def has_instance(cls) -> bool:
        """Check if the singleton instance has been created.

        Returns:
            bool: True if the instance exists, False otherwise.

        """
        with cls._lock:
            return cls._instance is not None

    @classmethod
    def reset(cls) -> None:
        """Destroy the singleton instance so a fresh one can be created.

        Intended for testing and development only. Production code should not
        call this — the singleton is designed to live for the process lifetime.
        """
        with cls._lock:
            instance = cls._instance
            if instance is None:
                return

            if instance._deferred_prefetch_handle is not None:
                instance._deferred_prefetch_handle = None

            if instance._async_prefetch_task is not None and not instance._async_prefetch_task.done():
                instance._async_prefetch_task.cancel()
                instance._async_prefetch_task = None

            cls._instance = None

    @staticmethod
    def _create_backend(
        base_path: str | Path,
        replicate_mode: ReplicateMode,
        audit_writer: AuditTrailWriter | None,
    ) -> ModelReferenceBackend:
        """Create the appropriate backend based on mode and settings.

        Args:
            base_path: Base path for model reference files.
            replicate_mode: The replication mode.
            audit_writer: Optional audit writer used by write-capable backends.

        Returns:
            ModelReferenceBackend: The configured backend instance.

        """
        logger.debug(f"Creating backend with replicate_mode={replicate_mode}, base_path={base_path}")
        if replicate_mode == ReplicateMode.PRIMARY:
            logger.debug("Creating backend for PRIMARY mode")

            # Check if GitHub seeding will be needed
            github_seeding_will_occur = False
            if horde_model_reference_settings.github_seed_enabled:
                # Quick check to see if any categories are missing
                # (we'll do proper check after backend creation)
                github_seeding_will_occur = True

            filesystem_backend = FileSystemBackend(
                base_path=base_path,
                cache_ttl_seconds=horde_model_reference_settings.cache_ttl_seconds,
                replicate_mode=ReplicateMode.PRIMARY,
                skip_startup_metadata_population=github_seeding_will_occur,
                audit_writer=audit_writer,
            )

            if horde_model_reference_settings.github_seed_enabled:
                logger.info("GitHub seeding enabled for PRIMARY mode")

                all_paths = filesystem_backend.get_all_category_file_paths()
                missing_categories = [cat for cat, path in all_paths.items() if path is None or not path.exists()]

                if missing_categories:
                    logger.info(f"Missing categories detected: {missing_categories}. Seeding from GitHub...")

                    github_backend = GitHubBackend(
                        base_path=base_path,
                        replicate_mode=ReplicateMode.PRIMARY,
                    )

                    github_backend.fetch_all_categories(force_refresh=True)
                    logger.info("GitHub seeding completed")

                    # Populate metadata after seeding
                    logger.info("Populating metadata after GitHub seeding")
                    filesystem_backend.ensure_all_metadata_populated()
                else:
                    logger.debug("All files exist, skipping GitHub seeding")
                    # Files exist but seeding was skipped, so run metadata population
                    logger.info("Running metadata population check (seeding was skipped)")
                    filesystem_backend.ensure_all_metadata_populated()

            if horde_model_reference_settings.redis.use_redis:
                from horde_model_reference.backends.redis_backend import RedisBackend

                logger.info("Wrapping FileSystemBackend with RedisBackend for distributed caching")
                return RedisBackend(
                    file_backend=filesystem_backend,
                    redis_settings=horde_model_reference_settings.redis,
                    cache_ttl_seconds=horde_model_reference_settings.cache_ttl_seconds,
                )

            logger.info("Using FileSystemBackend for single-worker PRIMARY deployment")
            return filesystem_backend

        logger.debug("Creating backend for REPLICA mode")

        github_backend = GitHubBackend(
            base_path=base_path,
            replicate_mode=ReplicateMode.REPLICA,
        )

        if horde_model_reference_settings.primary_api_url:
            logger.info(f"Using HTTPBackend with PRIMARY API: {horde_model_reference_settings.primary_api_url}")
            return HTTPBackend(
                primary_api_url=horde_model_reference_settings.primary_api_url,
                github_backend=github_backend,
                cache_ttl_seconds=horde_model_reference_settings.cache_ttl_seconds,
                timeout_seconds=horde_model_reference_settings.primary_api_timeout,
                enable_github_fallback=horde_model_reference_settings.enable_github_fallback,
            )

        logger.info("Using GitHubBackend only (no PRIMARY API configured)")
        return github_backend

    def __new__(
        cls,
        *,
        backend: ModelReferenceBackend | None = None,
        base_path: str | Path = horde_model_reference_paths.base_path,
        replicate_mode: ReplicateMode = horde_model_reference_settings.replicate_mode,
        prefetch_strategy: PrefetchStrategy = PrefetchStrategy.LAZY,
    ) -> ModelReferenceManager:
        """Create a new instance of ModelReferenceManager.

        Uses the singleton pattern to ensure only one instance exists to avoid multiple downloads and conversions.
        Subsequent instantiations will return the same instance, and an attempt to re-instantiate with different
        settings will raise an exception.

        Args:
            backend: The backend to use for fetching model references.
                If None, automatically selects the appropriate backend based on replicate_mode and settings:
                - PRIMARY mode: FileSystemBackend (optionally wrapped with RedisBackend if configured)
                - REPLICA mode: HTTPBackend (if PRIMARY API URL configured) or GitHubBackend (fallback)
                Defaults to None.
            base_path: The base path to use for storing model reference files.
                Only used if backend is None. Defaults to horde_model_reference_paths.base_path.
            replicate_mode: The replicate mode to use.
                - PRIMARY: Local filesystem is source of truth
                - REPLICA: Fetch from PRIMARY API or GitHub
                Only used if backend is None. Defaults to horde_model_reference_settings.replicate_mode.
            prefetch_strategy: Controls whether initial cache warm-up is skipped (LAZY/NONE),
                performed synchronously, deferred, or executed via background async task.
                Defaults to PrefetchStrategy.LAZY.

        Returns:
            ModelReferenceManager: The singleton instance of ModelReferenceManager.

        Raises:
            RuntimeError: If an attempt is made to re-instantiate with different settings.

        """
        if not isinstance(prefetch_strategy, PrefetchStrategy):
            try:
                prefetch_strategy = PrefetchStrategy(prefetch_strategy)
            except ValueError as exc:  # pragma: no cover - defensive branch
                raise ValueError(
                    f"prefetch_strategy must be one of: {', '.join(strategy.value for strategy in PrefetchStrategy)}"
                ) from exc

        with cls._lock:
            if not cls._instance:
                cls._instance = super().__new__(cls)

                audit_writer: AuditTrailWriter | None = None
                if horde_model_reference_settings.audit.enabled:
                    audit_writer = AuditTrailWriter(
                        root_path=horde_model_reference_paths.audit_path,
                        max_file_size_bytes=horde_model_reference_settings.audit.max_segment_bytes,
                    )

                if backend is None:
                    backend = cls._create_backend(
                        base_path=base_path,
                        replicate_mode=replicate_mode,
                        audit_writer=audit_writer,
                    )

                backend_mode = backend.replicate_mode
                if backend_mode != replicate_mode:
                    raise RuntimeError(
                        "Backend replicate_mode does not match requested ModelReferenceManager configuration. "
                        f"Backend mode: {backend_mode}, requested mode: {replicate_mode}."
                    )

                cls._instance.backend = backend
                cls._instance._replicate_mode = replicate_mode
                if backend.supports_writes():
                    cls._instance._audit_writer = audit_writer
                    cls._instance._pending_queue_service = cls._build_pending_queue_service(
                        audit_writer=audit_writer,
                    )
                else:
                    cls._instance._audit_writer = None
                    cls._instance._pending_queue_service = None
                cls._instance._cached_records = {}
                cls._instance._deferred_prefetch_handle = None
                cls._instance._async_prefetch_task = None

                # Register invalidation callback so backend can notify us when cache is stale
                cls._instance.backend.register_invalidation_callback(cls._instance._on_backend_invalidated)

                cls._instance._apply_prefetch_strategy(strategy=prefetch_strategy)
            else:
                if backend is not None and backend is not cls._instance.backend:
                    raise RuntimeError(
                        "ModelReferenceManager is a singleton and has already been instantiated "
                        "with a different backend."
                    )
                if replicate_mode != cls._instance._replicate_mode:
                    raise RuntimeError(
                        "ModelReferenceManager is a singleton and has already been instantiated with different "
                        "settings.\nExisting settings: "
                        f"replicate_mode={cls._instance._replicate_mode}.\n"
                        "New settings: "
                        f"replicate_mode={replicate_mode}."
                    )
                if prefetch_strategy != cls._instance._prefetch_strategy:
                    raise RuntimeError(
                        "ModelReferenceManager is a singleton and has already been instantiated with different "
                        "settings."
                        f"\nExisting prefetch_strategy={cls._instance._prefetch_strategy.value};"
                        f" new prefetch_strategy={prefetch_strategy.value}."
                    )

        return cls._instance

    def _apply_prefetch_strategy(self, *, strategy: PrefetchStrategy) -> None:
        """Apply the configured prefetch strategy once the backend is available."""
        self._prefetch_strategy = strategy
        self._deferred_prefetch_handle = None
        self._async_prefetch_task = None

        if strategy in (PrefetchStrategy.LAZY, PrefetchStrategy.NONE):
            logger.debug(f"prefetch skipped because strategy={strategy.value}")
            return

        if strategy is PrefetchStrategy.SYNC:
            self._fetch_from_backend_if_needed(force_refresh=False)
            return

        if strategy is PrefetchStrategy.DEFERRED:
            self._deferred_prefetch_handle = DeferredPrefetchHandle(manager=self, force_refresh=False)
            logger.info(
                "Deferred prefetch handle created; call run_sync/run_async to warm caches without blocking",
            )
            return

        if strategy is PrefetchStrategy.ASYNC:
            self._schedule_async_prefetch(force_refresh=False)
            return

        raise ValueError(f"Unsupported prefetch strategy: {strategy}")

    def _on_backend_invalidated(self, category: MODEL_REFERENCE_CATEGORY) -> None:
        """On callback invoked by backend when a category's cache is invalidated.

        This ensures the pydantic model cache stays in sync with backend invalidations.

        Args:
            category: The category that was invalidated.

        """
        logger.debug(f"Backend invalidated category {category}, clearing pydantic cache")
        self._invalidate_cache(category)

    def _invalidate_cache(self, category: MODEL_REFERENCE_CATEGORY | None = None) -> None:
        """Invalidate the cached pydantic model references.

        Args:
            category: If provided, only invalidate the specific category.
                If None, invalidate the entire cache.

        """
        with self._lock:
            if category is None:
                logger.debug("Invalidating entire cached pydantic records.")
                self._cached_records = {}
            else:
                logger.debug(f"Invalidating cached pydantic records for category: {category}.")
                self._cached_records.pop(category, None)

    def invalidate_category_cache(self, category: MODEL_REFERENCE_CATEGORY) -> None:
        """Explicitly invalidate cached data for a category.

        Intended for use by the apply workflow after a successful backend write,
        so stale data is never served regardless of backend callback timing.

        Args:
            category: The category whose cache should be dropped.

        """
        self._invalidate_cache(category)

    def _fetch_from_backend_if_needed(
        self,
        force_refresh: bool,
    ) -> dict[MODEL_REFERENCE_CATEGORY, dict[str, Any] | None]:
        """Fetch references from backend if needed.

        Args:
            force_refresh: Whether to force refresh all categories.

        """
        return self.backend.fetch_all_categories(force_refresh=force_refresh)

    async def _fetch_from_backend_if_needed_async(
        self,
        force_refresh: bool,
        httpx_client: httpx.AsyncClient | None,
    ) -> dict[MODEL_REFERENCE_CATEGORY, dict[str, Any] | None]:
        """Asynchronously fetch references from backend if needed.

        Args:
            force_refresh: Whether to force refresh all categories.
            httpx_client: An optional httpx async client to use.

        """
        return await self.backend.fetch_all_categories_async(
            force_refresh=force_refresh,
            httpx_client=httpx_client,
        )

    @staticmethod
    def _build_pending_queue_service(
        *,
        audit_writer: AuditTrailWriter | None,
    ) -> PendingQueueService | None:
        """Create the pending queue service when enabled."""
        if not horde_model_reference_settings.pending_queue.enabled:
            return None

        from horde_model_reference.pending_queue.service import PendingQueueService
        from horde_model_reference.pending_queue.store import PendingQueueStore

        store = PendingQueueStore(root_path=horde_model_reference_paths.pending_queue_path)
        return PendingQueueService(store=store, audit_writer=audit_writer)

    @property
    def prefetch_strategy(self) -> PrefetchStrategy:
        """Return the prefetch strategy originally configured for this manager."""
        return self._prefetch_strategy

    @property
    def pending_queue_service(self) -> PendingQueueService | None:
        """Return the pending queue service when queueing is enabled."""
        return self._pending_queue_service

    @property
    def deferred_prefetch_handle(self) -> DeferredPrefetchHandle | None:
        """Handle that callers can use to trigger a deferred eager fetch."""
        return self._deferred_prefetch_handle

    def create_deferred_prefetch_handle(
        self,
        *,
        force_refresh: bool = False,
    ) -> DeferredPrefetchHandle:
        """Create a deferred prefetch handle tied to this manager.

        Args:
            force_refresh: Whether the handle should bypass backend caches.

        Returns:
            DeferredPrefetchHandle: Handle that can execute the warm-up later.

        """
        handle = DeferredPrefetchHandle(manager=self, force_refresh=force_refresh)
        self._deferred_prefetch_handle = handle
        return handle

    def _schedule_async_prefetch(self, *, force_refresh: bool) -> None:
        """Schedule an async cache warm-up when an event loop is available."""
        handle = self.create_deferred_prefetch_handle(force_refresh=force_refresh)

        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            logger.warning(
                "PrefetchStrategy.ASYNC requested but no running event loop detected; "
                "exposing deferred handle for manual execution instead.",
            )
            self._async_prefetch_task = None
            return

        logger.info("Scheduling asynchronous prefetch warm-up task")
        task = loop.create_task(handle.run_async())
        self._async_prefetch_task = task

        def _log_completion(completed: asyncio.Task[None]) -> None:
            try:
                completed.result()
            except Exception as exc:  # pragma: no cover - best-effort logging
                logger.error("Deferred async prefetch failed: %s", exc)

        task.add_done_callback(_log_completion)

    async def warm_cache_async(
        self,
        *,
        force_refresh: bool = False,
        httpx_client: httpx.AsyncClient | None = None,
    ) -> None:
        """Warm cached pydantic records using backend async APIs.

        Args:
            force_refresh: Whether to bypass backend caches while warming.
            httpx_client: Optional shared async client for HTTP backends.

        """
        await self.get_all_model_references_or_none_async(
            overwrite_existing=force_refresh,
            httpx_client=httpx_client,
        )

    async def ensure_ready_async(
        self,
        *,
        overwrite_existing: bool = False,
        httpx_client: httpx.AsyncClient | None = None,
    ) -> None:
        """Ensure cached references exist by delegating to ``warm_cache_async``.

        Args:
            overwrite_existing: Whether to bypass backend caches while warming.
            httpx_client: Optional shared async client for HTTP backends.

        """
        await self.warm_cache_async(force_refresh=overwrite_existing, httpx_client=httpx_client)

    @staticmethod
    def _file_json_dict_to_model_reference(
        category: MODEL_REFERENCE_CATEGORY,
        file_json_dict: dict[str, Any] | None,
        safe_mode: bool = False,
    ) -> dict[str, GenericModelRecord] | None:
        """Return a model reference object from a JSON dictionary, or None if conversion failed.

        Args:
            category: The target model reference category to convert.
            file_json_dict: The dict object representing the model reference.
            safe_mode: Whether to raise exceptions on failure. If False, exceptions are caught
                and None is returned. Defaults to False.

        Returns:
            dict[str, GenericModelRecord] | None: The dict representing the model reference,
                or None if conversion failed.

        """
        if file_json_dict is None:
            logger.warning(f"File dict json is None for {category}.")
            return None

        if category in categories_managed_elsewhere:
            logger.info(f"Skipping conversion for category: {category} (managed elsewhere)")
            return None

        try:
            record_type = MODEL_RECORD_TYPE_LOOKUP.get(category, GenericModelRecord)
            model_reference: dict[str, GenericModelRecord] = {}
            for model_value in file_json_dict.values():
                model_instance = record_type.model_validate(model_value)
                model_reference[model_instance.name] = model_instance

            return model_reference

        except Exception as e:
            if not safe_mode:
                logger.exception(f"Failed to convert file dict JSON to model reference for {category}: {e}")
                return None
            raise e

    @staticmethod
    def model_reference_to_json_dict(
        model_reference: dict[str, GenericModelRecord],
        safe_mode: bool = False,
    ) -> dict[str, Any] | None:
        """Return a JSON dictionary from a model reference object, or None if conversion failed.

        Args:
            model_reference: The model reference object.
            safe_mode: Whether to raise exceptions on failure. If False, exceptions are caught
                and None is returned. Use `model_reference_to_json_dict_safe()` for the better type hinting if you
                intend to use this. Defaults to False.

        Returns:
            dict | None: The dict representing the model reference, or None if conversion failed.

        """
        if model_reference is None:
            raise ValueError("model_reference cannot be None")

        try:
            return {
                name: record.model_dump(
                    exclude_unset=True,
                )
                for name, record in model_reference.items()
            }
        except Exception as e:
            if not safe_mode:
                logger.exception(f"Failed to convert model reference to JSON: {e}")
                return None

            raise e

    @staticmethod
    def model_reference_to_json_dict_safe(
        model_reference: dict[str, GenericModelRecord],
    ) -> dict[str, Any]:
        """Return a JSON dictionary from a model reference object.

        Raises an exception if conversion fails.

        Args:
            model_reference: The model reference object.

        Returns:
            dict: The dict representing the model reference.

        """
        json_dict_safe = ModelReferenceManager.model_reference_to_json_dict(model_reference, safe_mode=True)

        if json_dict_safe is None:
            raise RuntimeError("Conversion to JSON dict failed in safe mode, but no exception was raised.")

        return json_dict_safe

    def _get_all_cached_model_references(
        self,
        safe_mode: bool = False,
    ) -> dict[MODEL_REFERENCE_CATEGORY, dict[str, GenericModelRecord] | None]:
        """Get all cached pydantic model references.

        Returns:
            dict[MODEL_REFERENCE_CATEGORY, dict[str, GenericModelRecord] | None]: A mapping of model reference
                categories to their corresponding pydantic model objects.

        """
        with self._lock:
            logger.debug(f"Returning {len(self._cached_records)} cached pydantic model references.")
            return dict(self._cached_records)

    def _evaluate_cache_state(
        self,
        *,
        overwrite_existing: bool,
        safe_mode: bool,
    ) -> tuple[
        bool,
        dict[MODEL_REFERENCE_CATEGORY, dict[str, GenericModelRecord] | None],
        list[MODEL_REFERENCE_CATEGORY],
    ]:
        """Return whether cached data can be reused plus categories needing refresh."""
        with self._lock:
            refresh_map = {category: self.backend.needs_refresh(category) for category in MODEL_REFERENCE_CATEGORY}
            all_categories_cached = all(cat in self._cached_records for cat in MODEL_REFERENCE_CATEGORY)
            needs_backend_refresh = overwrite_existing or any(refresh_map.values())

            if not overwrite_existing and all_categories_cached and not needs_backend_refresh:
                logger.debug("Using fully cached pydantic model references.")
                return True, self._get_all_cached_model_references(safe_mode=safe_mode), []

            categories_to_load: list[MODEL_REFERENCE_CATEGORY] = []
            for category in MODEL_REFERENCE_CATEGORY:
                cached_value = self._cached_records.get(category)
                if (
                    overwrite_existing
                    or category not in self._cached_records
                    or cached_value is None
                    or refresh_map[category]
                ):
                    categories_to_load.append(category)

            return False, {}, categories_to_load

    def _load_categories_from_payload(
        self,
        *,
        categories_to_load: Iterable[MODEL_REFERENCE_CATEGORY],
        payload: dict[MODEL_REFERENCE_CATEGORY, dict[str, Any] | None] | None,
        overwrite_existing: bool,
        safe_mode: bool,
    ) -> None:
        """Convert backend payload into cached pydantic models for selected categories."""
        normalized_payload = payload or {}
        prepared_payload: dict[MODEL_REFERENCE_CATEGORY, dict[str, Any] | None] = {}
        missing_payload: list[MODEL_REFERENCE_CATEGORY] = []

        for category in categories_to_load:
            if category in normalized_payload:
                prepared_payload[category] = normalized_payload[category]
            else:
                missing_payload.append(category)

        if missing_payload:
            logger.debug(
                "Backend payload missing %d categories; falling back to per-category fetch: %s",
                len(missing_payload),
                missing_payload,
            )
            for category in missing_payload:
                prepared_payload[category] = self.backend.fetch_category(
                    category,
                    force_refresh=overwrite_existing,
                )

        with self._lock:
            for category, file_json in prepared_payload.items():
                model_reference = self._file_json_dict_to_model_reference(
                    category,
                    file_json,
                    safe_mode=safe_mode,
                )
                self._cached_records[category] = model_reference

    def get_all_model_references_or_none(
        self,
        overwrite_existing: bool = False,
        *,
        safe_mode: bool = False,
    ) -> dict[MODEL_REFERENCE_CATEGORY, dict[str, GenericModelRecord] | None]:
        """Return a mapping of all model reference categories to their corresponding model reference objects.

        Note that values may be None if the model reference file could not be found or parsed.

        Args:
            overwrite_existing: Whether to force a redownload of all model reference files.
                Defaults to False.
            safe_mode: Whether to raise exceptions on failure. If False, exceptions are caught
                and None is returned for that category. Defaults to False. Use `get_all_model_references()`
                for the better type hinting if you intend to use this.

        Returns:
            dict[MODEL_REFERENCE_CATEGORY, dict[str, GenericModelRecord] | None]: A mapping of model reference
                categories to their corresponding model reference objects.

        """
        use_cache, cached_result, categories_to_load = self._evaluate_cache_state(
            overwrite_existing=overwrite_existing,
            safe_mode=safe_mode,
        )

        if use_cache:
            return cached_result

        logger.debug("Fetching model references from backend as needed.")
        backend_payload = self._fetch_from_backend_if_needed(force_refresh=overwrite_existing)

        if categories_to_load:
            logger.debug("Loading %d model reference categories: %s", len(categories_to_load), categories_to_load)
            self._load_categories_from_payload(
                categories_to_load=categories_to_load,
                payload=backend_payload,
                overwrite_existing=overwrite_existing,
                safe_mode=safe_mode,
            )

        return self._get_all_cached_model_references(safe_mode=safe_mode)

    def _build_safe_reference_view(
        self,
        all_references: dict[MODEL_REFERENCE_CATEGORY, dict[str, GenericModelRecord] | None],
    ) -> dict[MODEL_REFERENCE_CATEGORY, dict[str, GenericModelRecord]]:
        """Convert a possibly sparse reference view into a safe mapping with logging.

        Args:
            all_references: Mapping of categories to model reference dicts or None.

        Returns:
            dict[MODEL_REFERENCE_CATEGORY, dict[str, GenericModelRecord]]: Mapping where
            missing categories map to empty dicts.

        """
        safe_references: dict[MODEL_REFERENCE_CATEGORY, dict[str, GenericModelRecord]] = {}
        missing_references: list[MODEL_REFERENCE_CATEGORY] = []
        for category, reference in all_references.items():
            if reference is None:
                missing_references.append(category)
                safe_references[category] = {}
            else:
                safe_references[category] = reference

        if missing_references:
            logger.error(f"Missing model references for categories: {missing_references}")

        return safe_references

    def get_all_model_references(
        self,
        overwrite_existing: bool = False,
    ) -> dict[MODEL_REFERENCE_CATEGORY, dict[str, GenericModelRecord]]:
        """Return a mapping of all model reference categories to their corresponding model reference objects.

        If a model reference file could not be found or parsed, an exception is raised. If you want to allow
        missing model references, use `get_all_model_references_or_none()` instead.

        Args:
            overwrite_existing: Whether to force a redownload of all model reference files.
                Defaults to False.

        Returns:
            dict[MODEL_REFERENCE_CATEGORY, dict[str, GenericModelRecord]]: A mapping of model reference
                categories to their corresponding model reference objects.

        """
        all_references = self.get_all_model_references_or_none(overwrite_existing=overwrite_existing)
        return self._build_safe_reference_view(all_references)

    async def get_all_model_references_or_none_async(
        self,
        overwrite_existing: bool = False,
        *,
        safe_mode: bool = False,
        httpx_client: httpx.AsyncClient | None = None,
    ) -> dict[MODEL_REFERENCE_CATEGORY, dict[str, GenericModelRecord] | None]:
        """Return model references asynchronously without enforcing presence.

        Args:
            overwrite_existing: Whether to force backend refresh.
            safe_mode: Whether to propagate conversion errors.
            httpx_client: Optional shared async client for HTTP backends.

        Returns:
            dict[MODEL_REFERENCE_CATEGORY, dict[str, GenericModelRecord] | None]: Possibly
            sparse mapping keyed by category.

        """
        use_cache, cached_result, categories_to_load = self._evaluate_cache_state(
            overwrite_existing=overwrite_existing,
            safe_mode=safe_mode,
        )

        if use_cache:
            return cached_result

        logger.debug("Asynchronously fetching model references from backend as needed.")
        backend_payload = await self._fetch_from_backend_if_needed_async(
            force_refresh=overwrite_existing,
            httpx_client=httpx_client,
        )

        if categories_to_load:
            logger.debug("Loading %d model reference categories via async payload", len(categories_to_load))
            self._load_categories_from_payload(
                categories_to_load=categories_to_load,
                payload=backend_payload,
                overwrite_existing=overwrite_existing,
                safe_mode=safe_mode,
            )

        return self._get_all_cached_model_references(safe_mode=safe_mode)

    async def get_all_model_references_async(
        self,
        overwrite_existing: bool = False,
        *,
        httpx_client: httpx.AsyncClient | None = None,
    ) -> dict[MODEL_REFERENCE_CATEGORY, dict[str, GenericModelRecord]]:
        """Return all model references asynchronously, raising on missing categories.

        Args:
            overwrite_existing: Whether to force backend refresh.
            httpx_client: Optional shared async client for HTTP backends.

        Returns:
            dict[MODEL_REFERENCE_CATEGORY, dict[str, GenericModelRecord]]: Mapping with
            empty dicts substituted for missing categories.

        """
        all_references = await self.get_all_model_references_or_none_async(
            overwrite_existing=overwrite_existing,
            httpx_client=httpx_client,
        )
        return self._build_safe_reference_view(all_references)

    def get_model_reference_or_none(
        self,
        category: MODEL_REFERENCE_CATEGORY,
        overwrite_existing: bool = False,
    ) -> dict[str, GenericModelRecord] | None:
        """Return the model reference object for a specific category.

        Args:
            category: The category to retrieve.
            overwrite_existing: Whether to force a redownload. Defaults to False.

        Returns:
            dict[str, GenericModelRecord] | None: The model reference object for the category,
                or None if not found.

        """
        all_references = self.get_all_model_references_or_none(overwrite_existing=overwrite_existing)
        return all_references.get(category)

    async def get_model_reference_or_none_async(
        self,
        category: MODEL_REFERENCE_CATEGORY,
        overwrite_existing: bool = False,
        *,
        httpx_client: httpx.AsyncClient | None = None,
    ) -> dict[str, GenericModelRecord] | None:
        """Return a single category's references asynchronously without strict enforcement.

        Args:
            category: Target category to load.
            overwrite_existing: Whether to force backend refresh.
            httpx_client: Optional shared async client for HTTP backends.

        Returns:
            dict[str, GenericModelRecord] | None: Mapping of model names or None.

        """
        all_references = await self.get_all_model_references_or_none_async(
            overwrite_existing=overwrite_existing,
            httpx_client=httpx_client,
        )
        return all_references.get(category)

    def get_model_reference(
        self,
        category: MODEL_REFERENCE_CATEGORY,
        overwrite_existing: bool = False,
    ) -> dict[str, GenericModelRecord]:
        """Return the model reference object for a specific category.

        Raises an exception if the model reference could not be found or parsed.
        If you want to allow missing model references, use `get_model_reference_or_none()` instead.

        Args:
            category: The category to retrieve.
            overwrite_existing: Whether to force a redownload. Defaults to False.

        Returns:
            dict[str, GenericModelRecord]: The model reference object for the category.

        """
        model_reference = self.get_model_reference_or_none(
            category,
            overwrite_existing=overwrite_existing,
        )
        if model_reference is None:
            raise RuntimeError(f"Model reference for category {category} not found or could not be parsed.")

        return model_reference

    async def get_model_reference_async(
        self,
        category: MODEL_REFERENCE_CATEGORY,
        overwrite_existing: bool = False,
        *,
        httpx_client: httpx.AsyncClient | None = None,
    ) -> dict[str, GenericModelRecord]:
        """Return a single category's references asynchronously, raising if missing.

        Args:
            category: Target category to load.
            overwrite_existing: Whether to force backend refresh.
            httpx_client: Optional shared async client for HTTP backends.

        Returns:
            dict[str, GenericModelRecord]: Mapping of model names for the category.

        Raises:
            RuntimeError: If the category is missing or could not be parsed.

        """
        model_reference = await self.get_model_reference_or_none_async(
            category,
            overwrite_existing=overwrite_existing,
            httpx_client=httpx_client,
        )
        if model_reference is None:
            raise RuntimeError(f"Model reference for category {category} not found or could not be parsed.")

        return model_reference

    def get_model_names_or_none(
        self,
        category: MODEL_REFERENCE_CATEGORY,
        overwrite_existing: bool = False,
    ) -> list[str] | None:
        """Return a list of model names for a specific category.

        Args:
            category: The category to retrieve.
            overwrite_existing: Whether to force a redownload. Defaults to False.

        Returns:
            list[str] | None: The list of model names for the category, or None if not found.

        """
        model_reference = self.get_model_reference_or_none(
            category,
            overwrite_existing=overwrite_existing,
        )
        if model_reference is None:
            return None

        return list(model_reference.keys())

    def get_model_names(
        self,
        category: MODEL_REFERENCE_CATEGORY,
        overwrite_existing: bool = False,
    ) -> list[str]:
        """Return a list of model names for a specific category.

        Raises an exception if the model reference could not be found or parsed.
        If you want to allow missing model references, use `get_model_names_or_none()` instead.

        Args:
            category: The category to retrieve.
            overwrite_existing: Whether to force a redownload. Defaults to False.

        Returns:
            list[str]: The list of model names for the category.

        """
        model_reference = self.get_model_reference(
            category,
            overwrite_existing=overwrite_existing,
        )
        if model_reference is None:
            raise RuntimeError(f"Model reference for category {category} not found or could not be parsed.")

        return list(model_reference.keys())

    def get_model_or_none(
        self,
        category: MODEL_REFERENCE_CATEGORY,
        model_name: str,
        overwrite_existing: bool = False,
    ) -> GenericModelRecord | None:
        """Return a specific model from a category.

        Args:
            category: The category to retrieve.
            model_name: The name of the model within the category.
            overwrite_existing: Whether to force a redownload. Defaults to False.

        Returns:
            GenericModelRecord | None: The model record, or None if not found.

        """
        model_reference = self.get_model_reference_or_none(
            category,
            overwrite_existing=overwrite_existing,
        )
        if model_reference is None:
            return None

        return model_reference.get(model_name)

    def get_model(
        self,
        category: MODEL_REFERENCE_CATEGORY,
        model_name: str,
        overwrite_existing: bool = False,
    ) -> GenericModelRecord:
        """Return a specific model from a category.

        Raises an exception if the model could not be found or parsed.
        If you want to allow missing models, use `get_model_or_none()` instead.

        Args:
            category: The category to retrieve.
            model_name: The name of the model within the category.
            overwrite_existing: Whether to force a redownload. Defaults to False.

        Returns:
            GenericModelRecord: The model record.

        """
        model_reference = self.get_model_reference(
            category,
            overwrite_existing=overwrite_existing,
        )

        model_record = model_reference.get(model_name)
        if model_record is None:
            raise RuntimeError(f"Model {model_name} not found in category {category}.")

        return model_record

    def get_raw_model_reference_json(
        self,
        category: MODEL_REFERENCE_CATEGORY,
        overwrite_existing: bool = False,
    ) -> dict[str, Any] | None:
        """Return the raw JSON dict for a specific category without pydantic validation.

        This method delegates to the backend to fetch the raw JSON data directly,
        avoiding the overhead of creating pydantic models. Ideal for API endpoints
        that need fast JSON responses.

        Args:
            category: The category to retrieve.
            overwrite_existing: Whether to force a redownload. Defaults to False.

        Returns:
            dict[str, Any] | None: The raw JSON dict for the category, or None if not found.

        """
        return self.backend.fetch_category(category, force_refresh=overwrite_existing)

    def get_raw_model_json(
        self,
        category: MODEL_REFERENCE_CATEGORY,
        model_name: str,
        overwrite_existing: bool = False,
    ) -> dict[str, Any] | None:
        """Return the raw JSON dict for a specific model in a category without pydantic validation.

        This method delegates to the backend to fetch the raw JSON data directly,
        avoiding the overhead of creating pydantic models. Ideal for API endpoints
        that need fast JSON responses.

        Args:
            category: The category to retrieve.
            model_name: The name of the model within the category.
            overwrite_existing: Whether to force a redownload. Defaults to False.

        Returns:
            dict[str, Any] | None: The raw JSON dict for the model, or None if not found.

        """
        category_json = self.backend.fetch_category(category, force_refresh=overwrite_existing)

        if category_json is None:
            return None

        return category_json.get(model_name)

    def _get_typed_models(
        self,
        category: MODEL_REFERENCE_CATEGORY,
        *,
        record_type: type[TModelRecord],
    ) -> dict[str, TModelRecord]:
        """Return a typed mapping for the requested category."""
        model_reference = self.get_model_reference(category)

        if len(model_reference) == 0:
            return {}

        typed_reference: dict[str, TModelRecord] = {}
        for name, record in model_reference.items():
            if not isinstance(record, record_type):
                raise RuntimeError(
                    f"Some records in {category.value} category are not {record_type.__name__} instances."
                )
            typed_reference[name] = record

        return typed_reference

    @property
    def blip_models(self) -> dict[str, BlipModelRecord]:
        """Return a mapping of BLIP model names to their records."""
        return self._get_typed_models(
            MODEL_REFERENCE_CATEGORY.blip,
            record_type=BlipModelRecord,
        )

    @property
    def clip_models(self) -> dict[str, ClipModelRecord]:
        """Return a mapping of CLIP model names to their records."""
        return self._get_typed_models(
            MODEL_REFERENCE_CATEGORY.clip,
            record_type=ClipModelRecord,
        )

    @property
    def codeformer_models(self) -> dict[str, CodeformerModelRecord]:
        """Return a mapping of CodeFormer model names to their records."""
        return self._get_typed_models(
            MODEL_REFERENCE_CATEGORY.codeformer,
            record_type=CodeformerModelRecord,
        )

    @property
    def controlnet_models(self) -> dict[str, ControlNetModelRecord]:
        """Return a mapping of ControlNet model names to their records."""
        return self._get_typed_models(
            MODEL_REFERENCE_CATEGORY.controlnet,
            record_type=ControlNetModelRecord,
        )

    @property
    def esrgan_models(self) -> dict[str, EsrganModelRecord]:
        """Return a mapping of ESRGAN model names to their records."""
        return self._get_typed_models(
            MODEL_REFERENCE_CATEGORY.esrgan,
            record_type=EsrganModelRecord,
        )

    @property
    def gfpgan_models(self) -> dict[str, GfpganModelRecord]:
        """Return a mapping of GFPGAN model names to their records."""
        return self._get_typed_models(
            MODEL_REFERENCE_CATEGORY.gfpgan,
            record_type=GfpganModelRecord,
        )

    @property
    def safety_checker_models(self) -> dict[str, SafetyCheckerModelRecord]:
        """Return a mapping of safety checker model names to their records."""
        return self._get_typed_models(
            MODEL_REFERENCE_CATEGORY.safety_checker,
            record_type=SafetyCheckerModelRecord,
        )

    @property
    def image_generation_models(self) -> dict[str, ImageGenerationModelRecord]:
        """Return a mapping of image generation model names to their records."""
        return self._get_typed_models(
            MODEL_REFERENCE_CATEGORY.image_generation,
            record_type=ImageGenerationModelRecord,
        )

    @property
    def text_generation_models(self) -> dict[str, TextGenerationModelRecord]:
        """Return a mapping of text generation model names to their records."""
        return self._get_typed_models(
            MODEL_REFERENCE_CATEGORY.text_generation,
            record_type=TextGenerationModelRecord,
        )

    @property
    def audio_generation_models(self) -> dict[str, AudioGenerationModelRecord]:
        """Return a mapping of audio generation model names to their records."""
        return self._get_typed_models(
            MODEL_REFERENCE_CATEGORY.audio_generation,
            record_type=AudioGenerationModelRecord,
        )

    @property
    def video_generation_models(self) -> dict[str, VideoGenerationModelRecord]:
        """Return a mapping of video generation model names to their records."""
        return self._get_typed_models(
            MODEL_REFERENCE_CATEGORY.video_generation,
            record_type=VideoGenerationModelRecord,
        )

    @property
    def miscellaneous_models(self) -> dict[str, MiscellaneousModelRecord]:
        """Return a mapping of miscellaneous model names to their records."""
        return self._get_typed_models(
            MODEL_REFERENCE_CATEGORY.miscellaneous,
            record_type=MiscellaneousModelRecord,
        )

    @overload
    def query(self, category: Literal["image_generation"]) -> ImageGenerationQuery: ...  # type: ignore[overload-overlap]

    @overload
    def query(self, category: Literal["text_generation"]) -> TextModelQuery: ...  # type: ignore[overload-overlap]

    @overload
    def query(self, category: Literal["controlnet"]) -> ControlNetQuery: ...  # type: ignore[overload-overlap]

    @overload
    def query(
        self,
        category: str,
    ) -> ModelQuery[
        GenericModelRecord, GenericFieldName | ImageGenFieldName | TextGenFieldName | ControlNetFieldName
    ]: ...

    def query(
        self,
        category: MODEL_REFERENCE_CATEGORY | str,
    ) -> (
        ImageGenerationQuery
        | TextModelQuery
        | ControlNetQuery
        | ModelQuery[GenericModelRecord, GenericFieldName | ImageGenFieldName | TextGenFieldName | ControlNetFieldName]
    ):
        """Return a query builder for a single category.

        When called with a literal category string, the return type is
        narrowed to the corresponding typed query builder (e.g.
        ``ImageGenerationQuery`` for ``"image_generation"``).

        Args:
            category: The model reference category to query.

        Returns:
            A ``ModelQuery`` (or typed subclass) ready for chaining filters.

        """
        if isinstance(category, str):
            category = MODEL_REFERENCE_CATEGORY(category)

        if category == MODEL_REFERENCE_CATEGORY.image_generation:
            return self.query_image_generation()
        if category == MODEL_REFERENCE_CATEGORY.text_generation:
            return self.query_text_generation()
        if category == MODEL_REFERENCE_CATEGORY.controlnet:
            return self.query_controlnet()

        records = self.get_model_reference(category)
        record_type = MODEL_RECORD_TYPE_LOOKUP.get(category, GenericModelRecord)
        return build_query(records, record_type)

    def query_all(
        self,
    ) -> ModelQuery[GenericModelRecord, GenericFieldName | ImageGenFieldName | TextGenFieldName | ControlNetFieldName]:
        """Return a query builder spanning all categories.

        Returns:
            A ``ModelQuery[GenericModelRecord]`` over every cached record.

        """
        all_refs = self.get_all_model_references()
        return build_cross_category_query(all_refs)

    def query_image_generation(self) -> ImageGenerationQuery:
        """Return a typed query builder for image generation models."""
        records = self._get_typed_models(
            MODEL_REFERENCE_CATEGORY.image_generation,
            record_type=ImageGenerationModelRecord,
        )
        return build_image_query(records)

    def query_text_generation(self) -> TextModelQuery:
        """Return a typed query builder for text generation models."""
        records = self._get_typed_models(
            MODEL_REFERENCE_CATEGORY.text_generation,
            record_type=TextGenerationModelRecord,
        )
        return build_text_query(records)

    def query_controlnet(self) -> ControlNetQuery:
        """Return a typed query builder for ControlNet models."""
        records = self._get_typed_models(
            MODEL_REFERENCE_CATEGORY.controlnet,
            record_type=ControlNetModelRecord,
        )
        return build_controlnet_query(records)

    def query_blip(self) -> ModelQuery[BlipModelRecord, GenericFieldName]:
        """Return a typed query builder for BLIP models."""
        records = self._get_typed_models(
            MODEL_REFERENCE_CATEGORY.blip,
            record_type=BlipModelRecord,
        )
        return build_query(records, BlipModelRecord)

    def query_clip(self) -> ModelQuery[ClipModelRecord, GenericFieldName]:
        """Return a typed query builder for CLIP models."""
        records = self._get_typed_models(
            MODEL_REFERENCE_CATEGORY.clip,
            record_type=ClipModelRecord,
        )
        return build_query(records, ClipModelRecord)

    def query_codeformer(self) -> ModelQuery[CodeformerModelRecord, GenericFieldName]:
        """Return a typed query builder for CodeFormer models."""
        records = self._get_typed_models(
            MODEL_REFERENCE_CATEGORY.codeformer,
            record_type=CodeformerModelRecord,
        )
        return build_query(records, CodeformerModelRecord)

    def query_esrgan(self) -> ModelQuery[EsrganModelRecord, GenericFieldName]:
        """Return a typed query builder for ESRGAN models."""
        records = self._get_typed_models(
            MODEL_REFERENCE_CATEGORY.esrgan,
            record_type=EsrganModelRecord,
        )
        return build_query(records, EsrganModelRecord)

    def query_gfpgan(self) -> ModelQuery[GfpganModelRecord, GenericFieldName]:
        """Return a typed query builder for GFPGAN models."""
        records = self._get_typed_models(
            MODEL_REFERENCE_CATEGORY.gfpgan,
            record_type=GfpganModelRecord,
        )
        return build_query(records, GfpganModelRecord)

    def query_safety_checker(self) -> ModelQuery[SafetyCheckerModelRecord, GenericFieldName]:
        """Return a typed query builder for safety checker models."""
        records = self._get_typed_models(
            MODEL_REFERENCE_CATEGORY.safety_checker,
            record_type=SafetyCheckerModelRecord,
        )
        return build_query(records, SafetyCheckerModelRecord)

    def query_audio_generation(self) -> ModelQuery[AudioGenerationModelRecord, GenericFieldName]:
        """Return a typed query builder for audio generation models."""
        records = self._get_typed_models(
            MODEL_REFERENCE_CATEGORY.audio_generation,
            record_type=AudioGenerationModelRecord,
        )
        return build_query(records, AudioGenerationModelRecord)

    def query_video_generation(self) -> ModelQuery[VideoGenerationModelRecord, GenericFieldName]:
        """Return a typed query builder for video generation models."""
        records = self._get_typed_models(
            MODEL_REFERENCE_CATEGORY.video_generation,
            record_type=VideoGenerationModelRecord,
        )
        return build_query(records, VideoGenerationModelRecord)

    def query_miscellaneous(self) -> ModelQuery[MiscellaneousModelRecord, GenericFieldName]:
        """Return a typed query builder for miscellaneous models."""
        records = self._get_typed_models(
            MODEL_REFERENCE_CATEGORY.miscellaneous,
            record_type=MiscellaneousModelRecord,
        )
        return build_query(records, MiscellaneousModelRecord)

    _CATEGORY_TO_HORDE_TYPE: ClassVar[dict[MODEL_REFERENCE_CATEGORY, HordeModelType]] = {
        MODEL_REFERENCE_CATEGORY.image_generation: "image",
        MODEL_REFERENCE_CATEGORY.text_generation: "text",
    }

    async def get_popular_models(
        self,
        category: MODEL_REFERENCE_CATEGORY,
        *,
        limit: int = 10,
        sort_by: Literal["worker_count", "usage_day", "usage_month", "usage_total"] = "worker_count",
        include_workers: bool = False,
    ) -> list[PopularModelResult]:
        """Return models ranked by live Horde popularity metrics.

        Requires the Horde public API to be reachable. Only ``image_generation``
        and ``text_generation`` categories have Horde API data; other categories
        return an empty list.

        Args:
            category: Model category to rank.
            limit: Maximum number of results.
            sort_by: Metric to rank by.
            include_workers: Whether to fetch per-worker details (slower).

        Returns:
            A list of ``PopularModelResult`` sorted by the chosen metric.

        """
        from horde_model_reference.integrations.data_merger import (
            CombinedModelStatistics,
            PopularModelResult,
            merge_category_with_horde_data,
        )
        from horde_model_reference.integrations.horde_api_integration import HordeAPIIntegration

        horde_type: HordeModelType | None = self._CATEGORY_TO_HORDE_TYPE.get(category)
        if horde_type is None:
            return []

        model_reference = self.get_model_reference_or_none(category)
        if model_reference is None:
            return []

        horde_api = HordeAPIIntegration()
        indexed_status, indexed_stats, indexed_workers = await horde_api.get_combined_data_indexed(
            model_type=horde_type,
            include_workers=include_workers,
        )

        merged = merge_category_with_horde_data(
            model_names=model_reference.keys(),
            horde_status=indexed_status,
            horde_stats=indexed_stats,
            workers=indexed_workers,
        )

        def _sort_key(item: tuple[str, object]) -> float:
            _name, stats = item
            if not isinstance(stats, CombinedModelStatistics):
                return 0.0
            if sort_by == "worker_count":
                return float(stats.worker_count)
            if stats.usage_stats is None:
                return 0.0
            if sort_by == "usage_day":
                return float(stats.usage_stats.day)
            if sort_by == "usage_month":
                return float(stats.usage_stats.month)
            return float(stats.usage_stats.total)

        ranked = sorted(merged.items(), key=_sort_key, reverse=True)[:limit]

        results: list[PopularModelResult] = []
        for name, stats in ranked:
            record = model_reference.get(name)
            if record is None:
                continue
            results.append(
                PopularModelResult(
                    name=name,
                    record=record.model_dump(mode="json", exclude_none=True),
                    stats=stats,
                )
            )

        return results

backend instance-attribute

backend: ModelReferenceBackend

The backend provider for model reference data.

_cached_records instance-attribute

_cached_records: dict[
    MODEL_REFERENCE_CATEGORY,
    dict[str, GenericModelRecord] | None,
]

Cache of pydantic model records by category.

_instance class-attribute instance-attribute

_instance: ModelReferenceManager | None = None

_replicate_mode class-attribute instance-attribute

_replicate_mode: ReplicateMode = REPLICA

_prefetch_strategy class-attribute instance-attribute

_prefetch_strategy: PrefetchStrategy = SYNC

_deferred_prefetch_handle class-attribute instance-attribute

_deferred_prefetch_handle: DeferredPrefetchHandle | None = (
    None
)

_async_prefetch_task class-attribute instance-attribute

_async_prefetch_task: Task[None] | None = None

_audit_writer class-attribute instance-attribute

_audit_writer: AuditTrailWriter | None = None

_pending_queue_service class-attribute instance-attribute

_pending_queue_service: PendingQueueService | None = None

_lock class-attribute instance-attribute

_lock: RLock = RLock()

prefetch_strategy property

prefetch_strategy: PrefetchStrategy

Return the prefetch strategy originally configured for this manager.

pending_queue_service property

pending_queue_service: PendingQueueService | None

Return the pending queue service when queueing is enabled.

deferred_prefetch_handle property

deferred_prefetch_handle: DeferredPrefetchHandle | None

Handle that callers can use to trigger a deferred eager fetch.

blip_models property

blip_models: dict[str, BlipModelRecord]

Return a mapping of BLIP model names to their records.

clip_models property

clip_models: dict[str, ClipModelRecord]

Return a mapping of CLIP model names to their records.

codeformer_models property

codeformer_models: dict[str, CodeformerModelRecord]

Return a mapping of CodeFormer model names to their records.

controlnet_models property

controlnet_models: dict[str, ControlNetModelRecord]

Return a mapping of ControlNet model names to their records.

esrgan_models property

esrgan_models: dict[str, EsrganModelRecord]

Return a mapping of ESRGAN model names to their records.

gfpgan_models property

gfpgan_models: dict[str, GfpganModelRecord]

Return a mapping of GFPGAN model names to their records.

safety_checker_models property

safety_checker_models: dict[str, SafetyCheckerModelRecord]

Return a mapping of safety checker model names to their records.

image_generation_models property

image_generation_models: dict[
    str, ImageGenerationModelRecord
]

Return a mapping of image generation model names to their records.

text_generation_models property

text_generation_models: dict[str, TextGenerationModelRecord]

Return a mapping of text generation model names to their records.

audio_generation_models property

audio_generation_models: dict[
    str, AudioGenerationModelRecord
]

Return a mapping of audio generation model names to their records.

video_generation_models property

video_generation_models: dict[
    str, VideoGenerationModelRecord
]

Return a mapping of video generation model names to their records.

miscellaneous_models property

miscellaneous_models: dict[str, MiscellaneousModelRecord]

Return a mapping of miscellaneous model names to their records.

_CATEGORY_TO_HORDE_TYPE class-attribute

_CATEGORY_TO_HORDE_TYPE: dict[
    MODEL_REFERENCE_CATEGORY, HordeModelType
] = {image_generation: "image", text_generation: "text"}

get_instance classmethod

get_instance() -> ModelReferenceManager

Get the singleton instance of ModelReferenceManager.

Returns:

Raises:

  • RuntimeError

    If the instance has not been created yet.

Source code in src/horde_model_reference/model_reference_manager.py
@classmethod
def get_instance(cls) -> ModelReferenceManager:
    """Get the singleton instance of ModelReferenceManager.

    Returns:
        ModelReferenceManager: The singleton instance.

    Raises:
        RuntimeError: If the instance has not been created yet.

    """
    with cls._lock:
        if cls._instance is None:
            raise RuntimeError("ModelReferenceManager instance has not been created yet.")
        return cls._instance

has_instance classmethod

has_instance() -> bool

Check if the singleton instance has been created.

Returns:

  • bool ( bool ) –

    True if the instance exists, False otherwise.

Source code in src/horde_model_reference/model_reference_manager.py
@classmethod
def has_instance(cls) -> bool:
    """Check if the singleton instance has been created.

    Returns:
        bool: True if the instance exists, False otherwise.

    """
    with cls._lock:
        return cls._instance is not None

reset classmethod

reset() -> None

Destroy the singleton instance so a fresh one can be created.

Intended for testing and development only. Production code should not call this — the singleton is designed to live for the process lifetime.

Source code in src/horde_model_reference/model_reference_manager.py
@classmethod
def reset(cls) -> None:
    """Destroy the singleton instance so a fresh one can be created.

    Intended for testing and development only. Production code should not
    call this — the singleton is designed to live for the process lifetime.
    """
    with cls._lock:
        instance = cls._instance
        if instance is None:
            return

        if instance._deferred_prefetch_handle is not None:
            instance._deferred_prefetch_handle = None

        if instance._async_prefetch_task is not None and not instance._async_prefetch_task.done():
            instance._async_prefetch_task.cancel()
            instance._async_prefetch_task = None

        cls._instance = None

_create_backend staticmethod

_create_backend(
    base_path: str | Path,
    replicate_mode: ReplicateMode,
    audit_writer: AuditTrailWriter | None,
) -> ModelReferenceBackend

Create the appropriate backend based on mode and settings.

Parameters:

  • base_path (str | Path) –

    Base path for model reference files.

  • replicate_mode (ReplicateMode) –

    The replication mode.

  • audit_writer (AuditTrailWriter | None) –

    Optional audit writer used by write-capable backends.

Returns:

Source code in src/horde_model_reference/model_reference_manager.py
@staticmethod
def _create_backend(
    base_path: str | Path,
    replicate_mode: ReplicateMode,
    audit_writer: AuditTrailWriter | None,
) -> ModelReferenceBackend:
    """Create the appropriate backend based on mode and settings.

    Args:
        base_path: Base path for model reference files.
        replicate_mode: The replication mode.
        audit_writer: Optional audit writer used by write-capable backends.

    Returns:
        ModelReferenceBackend: The configured backend instance.

    """
    logger.debug(f"Creating backend with replicate_mode={replicate_mode}, base_path={base_path}")
    if replicate_mode == ReplicateMode.PRIMARY:
        logger.debug("Creating backend for PRIMARY mode")

        # Check if GitHub seeding will be needed
        github_seeding_will_occur = False
        if horde_model_reference_settings.github_seed_enabled:
            # Quick check to see if any categories are missing
            # (we'll do proper check after backend creation)
            github_seeding_will_occur = True

        filesystem_backend = FileSystemBackend(
            base_path=base_path,
            cache_ttl_seconds=horde_model_reference_settings.cache_ttl_seconds,
            replicate_mode=ReplicateMode.PRIMARY,
            skip_startup_metadata_population=github_seeding_will_occur,
            audit_writer=audit_writer,
        )

        if horde_model_reference_settings.github_seed_enabled:
            logger.info("GitHub seeding enabled for PRIMARY mode")

            all_paths = filesystem_backend.get_all_category_file_paths()
            missing_categories = [cat for cat, path in all_paths.items() if path is None or not path.exists()]

            if missing_categories:
                logger.info(f"Missing categories detected: {missing_categories}. Seeding from GitHub...")

                github_backend = GitHubBackend(
                    base_path=base_path,
                    replicate_mode=ReplicateMode.PRIMARY,
                )

                github_backend.fetch_all_categories(force_refresh=True)
                logger.info("GitHub seeding completed")

                # Populate metadata after seeding
                logger.info("Populating metadata after GitHub seeding")
                filesystem_backend.ensure_all_metadata_populated()
            else:
                logger.debug("All files exist, skipping GitHub seeding")
                # Files exist but seeding was skipped, so run metadata population
                logger.info("Running metadata population check (seeding was skipped)")
                filesystem_backend.ensure_all_metadata_populated()

        if horde_model_reference_settings.redis.use_redis:
            from horde_model_reference.backends.redis_backend import RedisBackend

            logger.info("Wrapping FileSystemBackend with RedisBackend for distributed caching")
            return RedisBackend(
                file_backend=filesystem_backend,
                redis_settings=horde_model_reference_settings.redis,
                cache_ttl_seconds=horde_model_reference_settings.cache_ttl_seconds,
            )

        logger.info("Using FileSystemBackend for single-worker PRIMARY deployment")
        return filesystem_backend

    logger.debug("Creating backend for REPLICA mode")

    github_backend = GitHubBackend(
        base_path=base_path,
        replicate_mode=ReplicateMode.REPLICA,
    )

    if horde_model_reference_settings.primary_api_url:
        logger.info(f"Using HTTPBackend with PRIMARY API: {horde_model_reference_settings.primary_api_url}")
        return HTTPBackend(
            primary_api_url=horde_model_reference_settings.primary_api_url,
            github_backend=github_backend,
            cache_ttl_seconds=horde_model_reference_settings.cache_ttl_seconds,
            timeout_seconds=horde_model_reference_settings.primary_api_timeout,
            enable_github_fallback=horde_model_reference_settings.enable_github_fallback,
        )

    logger.info("Using GitHubBackend only (no PRIMARY API configured)")
    return github_backend

__new__

__new__(
    *,
    backend: ModelReferenceBackend | None = None,
    base_path: str
    | Path = horde_model_reference_paths.base_path,
    replicate_mode: ReplicateMode = horde_model_reference_settings.replicate_mode,
    prefetch_strategy: PrefetchStrategy = PrefetchStrategy.LAZY,
) -> ModelReferenceManager

Create a new instance of ModelReferenceManager.

Uses the singleton pattern to ensure only one instance exists to avoid multiple downloads and conversions. Subsequent instantiations will return the same instance, and an attempt to re-instantiate with different settings will raise an exception.

Parameters:

  • backend (ModelReferenceBackend | None, default: None ) –

    The backend to use for fetching model references. If None, automatically selects the appropriate backend based on replicate_mode and settings: - PRIMARY mode: FileSystemBackend (optionally wrapped with RedisBackend if configured) - REPLICA mode: HTTPBackend (if PRIMARY API URL configured) or GitHubBackend (fallback) Defaults to None.

  • base_path (str | Path, default: base_path ) –

    The base path to use for storing model reference files. Only used if backend is None. Defaults to horde_model_reference_paths.base_path.

  • replicate_mode (ReplicateMode, default: replicate_mode ) –

    The replicate mode to use. - PRIMARY: Local filesystem is source of truth - REPLICA: Fetch from PRIMARY API or GitHub Only used if backend is None. Defaults to horde_model_reference_settings.replicate_mode.

  • prefetch_strategy (PrefetchStrategy, default: LAZY ) –

    Controls whether initial cache warm-up is skipped (LAZY/NONE), performed synchronously, deferred, or executed via background async task. Defaults to PrefetchStrategy.LAZY.

Returns:

Raises:

  • RuntimeError

    If an attempt is made to re-instantiate with different settings.

Source code in src/horde_model_reference/model_reference_manager.py
def __new__(
    cls,
    *,
    backend: ModelReferenceBackend | None = None,
    base_path: str | Path = horde_model_reference_paths.base_path,
    replicate_mode: ReplicateMode = horde_model_reference_settings.replicate_mode,
    prefetch_strategy: PrefetchStrategy = PrefetchStrategy.LAZY,
) -> ModelReferenceManager:
    """Create a new instance of ModelReferenceManager.

    Uses the singleton pattern to ensure only one instance exists to avoid multiple downloads and conversions.
    Subsequent instantiations will return the same instance, and an attempt to re-instantiate with different
    settings will raise an exception.

    Args:
        backend: The backend to use for fetching model references.
            If None, automatically selects the appropriate backend based on replicate_mode and settings:
            - PRIMARY mode: FileSystemBackend (optionally wrapped with RedisBackend if configured)
            - REPLICA mode: HTTPBackend (if PRIMARY API URL configured) or GitHubBackend (fallback)
            Defaults to None.
        base_path: The base path to use for storing model reference files.
            Only used if backend is None. Defaults to horde_model_reference_paths.base_path.
        replicate_mode: The replicate mode to use.
            - PRIMARY: Local filesystem is source of truth
            - REPLICA: Fetch from PRIMARY API or GitHub
            Only used if backend is None. Defaults to horde_model_reference_settings.replicate_mode.
        prefetch_strategy: Controls whether initial cache warm-up is skipped (LAZY/NONE),
            performed synchronously, deferred, or executed via background async task.
            Defaults to PrefetchStrategy.LAZY.

    Returns:
        ModelReferenceManager: The singleton instance of ModelReferenceManager.

    Raises:
        RuntimeError: If an attempt is made to re-instantiate with different settings.

    """
    if not isinstance(prefetch_strategy, PrefetchStrategy):
        try:
            prefetch_strategy = PrefetchStrategy(prefetch_strategy)
        except ValueError as exc:  # pragma: no cover - defensive branch
            raise ValueError(
                f"prefetch_strategy must be one of: {', '.join(strategy.value for strategy in PrefetchStrategy)}"
            ) from exc

    with cls._lock:
        if not cls._instance:
            cls._instance = super().__new__(cls)

            audit_writer: AuditTrailWriter | None = None
            if horde_model_reference_settings.audit.enabled:
                audit_writer = AuditTrailWriter(
                    root_path=horde_model_reference_paths.audit_path,
                    max_file_size_bytes=horde_model_reference_settings.audit.max_segment_bytes,
                )

            if backend is None:
                backend = cls._create_backend(
                    base_path=base_path,
                    replicate_mode=replicate_mode,
                    audit_writer=audit_writer,
                )

            backend_mode = backend.replicate_mode
            if backend_mode != replicate_mode:
                raise RuntimeError(
                    "Backend replicate_mode does not match requested ModelReferenceManager configuration. "
                    f"Backend mode: {backend_mode}, requested mode: {replicate_mode}."
                )

            cls._instance.backend = backend
            cls._instance._replicate_mode = replicate_mode
            if backend.supports_writes():
                cls._instance._audit_writer = audit_writer
                cls._instance._pending_queue_service = cls._build_pending_queue_service(
                    audit_writer=audit_writer,
                )
            else:
                cls._instance._audit_writer = None
                cls._instance._pending_queue_service = None
            cls._instance._cached_records = {}
            cls._instance._deferred_prefetch_handle = None
            cls._instance._async_prefetch_task = None

            # Register invalidation callback so backend can notify us when cache is stale
            cls._instance.backend.register_invalidation_callback(cls._instance._on_backend_invalidated)

            cls._instance._apply_prefetch_strategy(strategy=prefetch_strategy)
        else:
            if backend is not None and backend is not cls._instance.backend:
                raise RuntimeError(
                    "ModelReferenceManager is a singleton and has already been instantiated "
                    "with a different backend."
                )
            if replicate_mode != cls._instance._replicate_mode:
                raise RuntimeError(
                    "ModelReferenceManager is a singleton and has already been instantiated with different "
                    "settings.\nExisting settings: "
                    f"replicate_mode={cls._instance._replicate_mode}.\n"
                    "New settings: "
                    f"replicate_mode={replicate_mode}."
                )
            if prefetch_strategy != cls._instance._prefetch_strategy:
                raise RuntimeError(
                    "ModelReferenceManager is a singleton and has already been instantiated with different "
                    "settings."
                    f"\nExisting prefetch_strategy={cls._instance._prefetch_strategy.value};"
                    f" new prefetch_strategy={prefetch_strategy.value}."
                )

    return cls._instance

_apply_prefetch_strategy

_apply_prefetch_strategy(
    *, strategy: PrefetchStrategy
) -> None

Apply the configured prefetch strategy once the backend is available.

Source code in src/horde_model_reference/model_reference_manager.py
def _apply_prefetch_strategy(self, *, strategy: PrefetchStrategy) -> None:
    """Apply the configured prefetch strategy once the backend is available."""
    self._prefetch_strategy = strategy
    self._deferred_prefetch_handle = None
    self._async_prefetch_task = None

    if strategy in (PrefetchStrategy.LAZY, PrefetchStrategy.NONE):
        logger.debug(f"prefetch skipped because strategy={strategy.value}")
        return

    if strategy is PrefetchStrategy.SYNC:
        self._fetch_from_backend_if_needed(force_refresh=False)
        return

    if strategy is PrefetchStrategy.DEFERRED:
        self._deferred_prefetch_handle = DeferredPrefetchHandle(manager=self, force_refresh=False)
        logger.info(
            "Deferred prefetch handle created; call run_sync/run_async to warm caches without blocking",
        )
        return

    if strategy is PrefetchStrategy.ASYNC:
        self._schedule_async_prefetch(force_refresh=False)
        return

    raise ValueError(f"Unsupported prefetch strategy: {strategy}")

_on_backend_invalidated

_on_backend_invalidated(
    category: MODEL_REFERENCE_CATEGORY,
) -> None

On callback invoked by backend when a category's cache is invalidated.

This ensures the pydantic model cache stays in sync with backend invalidations.

Parameters:

Source code in src/horde_model_reference/model_reference_manager.py
def _on_backend_invalidated(self, category: MODEL_REFERENCE_CATEGORY) -> None:
    """On callback invoked by backend when a category's cache is invalidated.

    This ensures the pydantic model cache stays in sync with backend invalidations.

    Args:
        category: The category that was invalidated.

    """
    logger.debug(f"Backend invalidated category {category}, clearing pydantic cache")
    self._invalidate_cache(category)

_invalidate_cache

_invalidate_cache(
    category: MODEL_REFERENCE_CATEGORY | None = None,
) -> None

Invalidate the cached pydantic model references.

Parameters:

  • category (MODEL_REFERENCE_CATEGORY | None, default: None ) –

    If provided, only invalidate the specific category. If None, invalidate the entire cache.

Source code in src/horde_model_reference/model_reference_manager.py
def _invalidate_cache(self, category: MODEL_REFERENCE_CATEGORY | None = None) -> None:
    """Invalidate the cached pydantic model references.

    Args:
        category: If provided, only invalidate the specific category.
            If None, invalidate the entire cache.

    """
    with self._lock:
        if category is None:
            logger.debug("Invalidating entire cached pydantic records.")
            self._cached_records = {}
        else:
            logger.debug(f"Invalidating cached pydantic records for category: {category}.")
            self._cached_records.pop(category, None)

invalidate_category_cache

invalidate_category_cache(
    category: MODEL_REFERENCE_CATEGORY,
) -> None

Explicitly invalidate cached data for a category.

Intended for use by the apply workflow after a successful backend write, so stale data is never served regardless of backend callback timing.

Parameters:

Source code in src/horde_model_reference/model_reference_manager.py
def invalidate_category_cache(self, category: MODEL_REFERENCE_CATEGORY) -> None:
    """Explicitly invalidate cached data for a category.

    Intended for use by the apply workflow after a successful backend write,
    so stale data is never served regardless of backend callback timing.

    Args:
        category: The category whose cache should be dropped.

    """
    self._invalidate_cache(category)

_fetch_from_backend_if_needed

_fetch_from_backend_if_needed(
    force_refresh: bool,
) -> dict[MODEL_REFERENCE_CATEGORY, dict[str, Any] | None]

Fetch references from backend if needed.

Parameters:

  • force_refresh (bool) –

    Whether to force refresh all categories.

Source code in src/horde_model_reference/model_reference_manager.py
def _fetch_from_backend_if_needed(
    self,
    force_refresh: bool,
) -> dict[MODEL_REFERENCE_CATEGORY, dict[str, Any] | None]:
    """Fetch references from backend if needed.

    Args:
        force_refresh: Whether to force refresh all categories.

    """
    return self.backend.fetch_all_categories(force_refresh=force_refresh)

_fetch_from_backend_if_needed_async async

_fetch_from_backend_if_needed_async(
    force_refresh: bool, httpx_client: AsyncClient | None
) -> dict[MODEL_REFERENCE_CATEGORY, dict[str, Any] | None]

Asynchronously fetch references from backend if needed.

Parameters:

  • force_refresh (bool) –

    Whether to force refresh all categories.

  • httpx_client (AsyncClient | None) –

    An optional httpx async client to use.

Source code in src/horde_model_reference/model_reference_manager.py
async def _fetch_from_backend_if_needed_async(
    self,
    force_refresh: bool,
    httpx_client: httpx.AsyncClient | None,
) -> dict[MODEL_REFERENCE_CATEGORY, dict[str, Any] | None]:
    """Asynchronously fetch references from backend if needed.

    Args:
        force_refresh: Whether to force refresh all categories.
        httpx_client: An optional httpx async client to use.

    """
    return await self.backend.fetch_all_categories_async(
        force_refresh=force_refresh,
        httpx_client=httpx_client,
    )

_build_pending_queue_service staticmethod

_build_pending_queue_service(
    *, audit_writer: AuditTrailWriter | None
) -> PendingQueueService | None

Create the pending queue service when enabled.

Source code in src/horde_model_reference/model_reference_manager.py
@staticmethod
def _build_pending_queue_service(
    *,
    audit_writer: AuditTrailWriter | None,
) -> PendingQueueService | None:
    """Create the pending queue service when enabled."""
    if not horde_model_reference_settings.pending_queue.enabled:
        return None

    from horde_model_reference.pending_queue.service import PendingQueueService
    from horde_model_reference.pending_queue.store import PendingQueueStore

    store = PendingQueueStore(root_path=horde_model_reference_paths.pending_queue_path)
    return PendingQueueService(store=store, audit_writer=audit_writer)

create_deferred_prefetch_handle

create_deferred_prefetch_handle(
    *, force_refresh: bool = False
) -> DeferredPrefetchHandle

Create a deferred prefetch handle tied to this manager.

Parameters:

  • force_refresh (bool, default: False ) –

    Whether the handle should bypass backend caches.

Returns:

Source code in src/horde_model_reference/model_reference_manager.py
def create_deferred_prefetch_handle(
    self,
    *,
    force_refresh: bool = False,
) -> DeferredPrefetchHandle:
    """Create a deferred prefetch handle tied to this manager.

    Args:
        force_refresh: Whether the handle should bypass backend caches.

    Returns:
        DeferredPrefetchHandle: Handle that can execute the warm-up later.

    """
    handle = DeferredPrefetchHandle(manager=self, force_refresh=force_refresh)
    self._deferred_prefetch_handle = handle
    return handle

_schedule_async_prefetch

_schedule_async_prefetch(*, force_refresh: bool) -> None

Schedule an async cache warm-up when an event loop is available.

Source code in src/horde_model_reference/model_reference_manager.py
def _schedule_async_prefetch(self, *, force_refresh: bool) -> None:
    """Schedule an async cache warm-up when an event loop is available."""
    handle = self.create_deferred_prefetch_handle(force_refresh=force_refresh)

    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        logger.warning(
            "PrefetchStrategy.ASYNC requested but no running event loop detected; "
            "exposing deferred handle for manual execution instead.",
        )
        self._async_prefetch_task = None
        return

    logger.info("Scheduling asynchronous prefetch warm-up task")
    task = loop.create_task(handle.run_async())
    self._async_prefetch_task = task

    def _log_completion(completed: asyncio.Task[None]) -> None:
        try:
            completed.result()
        except Exception as exc:  # pragma: no cover - best-effort logging
            logger.error("Deferred async prefetch failed: %s", exc)

    task.add_done_callback(_log_completion)

warm_cache_async async

warm_cache_async(
    *,
    force_refresh: bool = False,
    httpx_client: AsyncClient | None = None,
) -> None

Warm cached pydantic records using backend async APIs.

Parameters:

  • force_refresh (bool, default: False ) –

    Whether to bypass backend caches while warming.

  • httpx_client (AsyncClient | None, default: None ) –

    Optional shared async client for HTTP backends.

Source code in src/horde_model_reference/model_reference_manager.py
async def warm_cache_async(
    self,
    *,
    force_refresh: bool = False,
    httpx_client: httpx.AsyncClient | None = None,
) -> None:
    """Warm cached pydantic records using backend async APIs.

    Args:
        force_refresh: Whether to bypass backend caches while warming.
        httpx_client: Optional shared async client for HTTP backends.

    """
    await self.get_all_model_references_or_none_async(
        overwrite_existing=force_refresh,
        httpx_client=httpx_client,
    )

ensure_ready_async async

ensure_ready_async(
    *,
    overwrite_existing: bool = False,
    httpx_client: AsyncClient | None = None,
) -> None

Ensure cached references exist by delegating to warm_cache_async.

Parameters:

  • overwrite_existing (bool, default: False ) –

    Whether to bypass backend caches while warming.

  • httpx_client (AsyncClient | None, default: None ) –

    Optional shared async client for HTTP backends.

Source code in src/horde_model_reference/model_reference_manager.py
async def ensure_ready_async(
    self,
    *,
    overwrite_existing: bool = False,
    httpx_client: httpx.AsyncClient | None = None,
) -> None:
    """Ensure cached references exist by delegating to ``warm_cache_async``.

    Args:
        overwrite_existing: Whether to bypass backend caches while warming.
        httpx_client: Optional shared async client for HTTP backends.

    """
    await self.warm_cache_async(force_refresh=overwrite_existing, httpx_client=httpx_client)

_file_json_dict_to_model_reference staticmethod

_file_json_dict_to_model_reference(
    category: MODEL_REFERENCE_CATEGORY,
    file_json_dict: dict[str, Any] | None,
    safe_mode: bool = False,
) -> dict[str, GenericModelRecord] | None

Return a model reference object from a JSON dictionary, or None if conversion failed.

Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    The target model reference category to convert.

  • file_json_dict (dict[str, Any] | None) –

    The dict object representing the model reference.

  • safe_mode (bool, default: False ) –

    Whether to raise exceptions on failure. If False, exceptions are caught and None is returned. Defaults to False.

Returns:

  • dict[str, GenericModelRecord] | None

    dict[str, GenericModelRecord] | None: The dict representing the model reference, or None if conversion failed.

Source code in src/horde_model_reference/model_reference_manager.py
@staticmethod
def _file_json_dict_to_model_reference(
    category: MODEL_REFERENCE_CATEGORY,
    file_json_dict: dict[str, Any] | None,
    safe_mode: bool = False,
) -> dict[str, GenericModelRecord] | None:
    """Return a model reference object from a JSON dictionary, or None if conversion failed.

    Args:
        category: The target model reference category to convert.
        file_json_dict: The dict object representing the model reference.
        safe_mode: Whether to raise exceptions on failure. If False, exceptions are caught
            and None is returned. Defaults to False.

    Returns:
        dict[str, GenericModelRecord] | None: The dict representing the model reference,
            or None if conversion failed.

    """
    if file_json_dict is None:
        logger.warning(f"File dict json is None for {category}.")
        return None

    if category in categories_managed_elsewhere:
        logger.info(f"Skipping conversion for category: {category} (managed elsewhere)")
        return None

    try:
        record_type = MODEL_RECORD_TYPE_LOOKUP.get(category, GenericModelRecord)
        model_reference: dict[str, GenericModelRecord] = {}
        for model_value in file_json_dict.values():
            model_instance = record_type.model_validate(model_value)
            model_reference[model_instance.name] = model_instance

        return model_reference

    except Exception as e:
        if not safe_mode:
            logger.exception(f"Failed to convert file dict JSON to model reference for {category}: {e}")
            return None
        raise e

model_reference_to_json_dict staticmethod

model_reference_to_json_dict(
    model_reference: dict[str, GenericModelRecord],
    safe_mode: bool = False,
) -> dict[str, Any] | None

Return a JSON dictionary from a model reference object, or None if conversion failed.

Parameters:

  • model_reference (dict[str, GenericModelRecord]) –

    The model reference object.

  • safe_mode (bool, default: False ) –

    Whether to raise exceptions on failure. If False, exceptions are caught and None is returned. Use model_reference_to_json_dict_safe() for the better type hinting if you intend to use this. Defaults to False.

Returns:

  • dict[str, Any] | None

    dict | None: The dict representing the model reference, or None if conversion failed.

Source code in src/horde_model_reference/model_reference_manager.py
@staticmethod
def model_reference_to_json_dict(
    model_reference: dict[str, GenericModelRecord],
    safe_mode: bool = False,
) -> dict[str, Any] | None:
    """Return a JSON dictionary from a model reference object, or None if conversion failed.

    Args:
        model_reference: The model reference object.
        safe_mode: Whether to raise exceptions on failure. If False, exceptions are caught
            and None is returned. Use `model_reference_to_json_dict_safe()` for the better type hinting if you
            intend to use this. Defaults to False.

    Returns:
        dict | None: The dict representing the model reference, or None if conversion failed.

    """
    if model_reference is None:
        raise ValueError("model_reference cannot be None")

    try:
        return {
            name: record.model_dump(
                exclude_unset=True,
            )
            for name, record in model_reference.items()
        }
    except Exception as e:
        if not safe_mode:
            logger.exception(f"Failed to convert model reference to JSON: {e}")
            return None

        raise e

model_reference_to_json_dict_safe staticmethod

model_reference_to_json_dict_safe(
    model_reference: dict[str, GenericModelRecord],
) -> dict[str, Any]

Return a JSON dictionary from a model reference object.

Raises an exception if conversion fails.

Parameters:

Returns:

  • dict ( dict[str, Any] ) –

    The dict representing the model reference.

Source code in src/horde_model_reference/model_reference_manager.py
@staticmethod
def model_reference_to_json_dict_safe(
    model_reference: dict[str, GenericModelRecord],
) -> dict[str, Any]:
    """Return a JSON dictionary from a model reference object.

    Raises an exception if conversion fails.

    Args:
        model_reference: The model reference object.

    Returns:
        dict: The dict representing the model reference.

    """
    json_dict_safe = ModelReferenceManager.model_reference_to_json_dict(model_reference, safe_mode=True)

    if json_dict_safe is None:
        raise RuntimeError("Conversion to JSON dict failed in safe mode, but no exception was raised.")

    return json_dict_safe

_get_all_cached_model_references

_get_all_cached_model_references(
    safe_mode: bool = False,
) -> dict[
    MODEL_REFERENCE_CATEGORY,
    dict[str, GenericModelRecord] | None,
]

Get all cached pydantic model references.

Returns:

Source code in src/horde_model_reference/model_reference_manager.py
def _get_all_cached_model_references(
    self,
    safe_mode: bool = False,
) -> dict[MODEL_REFERENCE_CATEGORY, dict[str, GenericModelRecord] | None]:
    """Get all cached pydantic model references.

    Returns:
        dict[MODEL_REFERENCE_CATEGORY, dict[str, GenericModelRecord] | None]: A mapping of model reference
            categories to their corresponding pydantic model objects.

    """
    with self._lock:
        logger.debug(f"Returning {len(self._cached_records)} cached pydantic model references.")
        return dict(self._cached_records)

_evaluate_cache_state

_evaluate_cache_state(
    *, overwrite_existing: bool, safe_mode: bool
) -> tuple[
    bool,
    dict[
        MODEL_REFERENCE_CATEGORY,
        dict[str, GenericModelRecord] | None,
    ],
    list[MODEL_REFERENCE_CATEGORY],
]

Return whether cached data can be reused plus categories needing refresh.

Source code in src/horde_model_reference/model_reference_manager.py
def _evaluate_cache_state(
    self,
    *,
    overwrite_existing: bool,
    safe_mode: bool,
) -> tuple[
    bool,
    dict[MODEL_REFERENCE_CATEGORY, dict[str, GenericModelRecord] | None],
    list[MODEL_REFERENCE_CATEGORY],
]:
    """Return whether cached data can be reused plus categories needing refresh."""
    with self._lock:
        refresh_map = {category: self.backend.needs_refresh(category) for category in MODEL_REFERENCE_CATEGORY}
        all_categories_cached = all(cat in self._cached_records for cat in MODEL_REFERENCE_CATEGORY)
        needs_backend_refresh = overwrite_existing or any(refresh_map.values())

        if not overwrite_existing and all_categories_cached and not needs_backend_refresh:
            logger.debug("Using fully cached pydantic model references.")
            return True, self._get_all_cached_model_references(safe_mode=safe_mode), []

        categories_to_load: list[MODEL_REFERENCE_CATEGORY] = []
        for category in MODEL_REFERENCE_CATEGORY:
            cached_value = self._cached_records.get(category)
            if (
                overwrite_existing
                or category not in self._cached_records
                or cached_value is None
                or refresh_map[category]
            ):
                categories_to_load.append(category)

        return False, {}, categories_to_load

_load_categories_from_payload

_load_categories_from_payload(
    *,
    categories_to_load: Iterable[MODEL_REFERENCE_CATEGORY],
    payload: dict[
        MODEL_REFERENCE_CATEGORY, dict[str, Any] | None
    ]
    | None,
    overwrite_existing: bool,
    safe_mode: bool,
) -> None

Convert backend payload into cached pydantic models for selected categories.

Source code in src/horde_model_reference/model_reference_manager.py
def _load_categories_from_payload(
    self,
    *,
    categories_to_load: Iterable[MODEL_REFERENCE_CATEGORY],
    payload: dict[MODEL_REFERENCE_CATEGORY, dict[str, Any] | None] | None,
    overwrite_existing: bool,
    safe_mode: bool,
) -> None:
    """Convert backend payload into cached pydantic models for selected categories."""
    normalized_payload = payload or {}
    prepared_payload: dict[MODEL_REFERENCE_CATEGORY, dict[str, Any] | None] = {}
    missing_payload: list[MODEL_REFERENCE_CATEGORY] = []

    for category in categories_to_load:
        if category in normalized_payload:
            prepared_payload[category] = normalized_payload[category]
        else:
            missing_payload.append(category)

    if missing_payload:
        logger.debug(
            "Backend payload missing %d categories; falling back to per-category fetch: %s",
            len(missing_payload),
            missing_payload,
        )
        for category in missing_payload:
            prepared_payload[category] = self.backend.fetch_category(
                category,
                force_refresh=overwrite_existing,
            )

    with self._lock:
        for category, file_json in prepared_payload.items():
            model_reference = self._file_json_dict_to_model_reference(
                category,
                file_json,
                safe_mode=safe_mode,
            )
            self._cached_records[category] = model_reference

get_all_model_references_or_none

get_all_model_references_or_none(
    overwrite_existing: bool = False,
    *,
    safe_mode: bool = False,
) -> dict[
    MODEL_REFERENCE_CATEGORY,
    dict[str, GenericModelRecord] | None,
]

Return a mapping of all model reference categories to their corresponding model reference objects.

Note that values may be None if the model reference file could not be found or parsed.

Parameters:

  • overwrite_existing (bool, default: False ) –

    Whether to force a redownload of all model reference files. Defaults to False.

  • safe_mode (bool, default: False ) –

    Whether to raise exceptions on failure. If False, exceptions are caught and None is returned for that category. Defaults to False. Use get_all_model_references() for the better type hinting if you intend to use this.

Returns:

Source code in src/horde_model_reference/model_reference_manager.py
def get_all_model_references_or_none(
    self,
    overwrite_existing: bool = False,
    *,
    safe_mode: bool = False,
) -> dict[MODEL_REFERENCE_CATEGORY, dict[str, GenericModelRecord] | None]:
    """Return a mapping of all model reference categories to their corresponding model reference objects.

    Note that values may be None if the model reference file could not be found or parsed.

    Args:
        overwrite_existing: Whether to force a redownload of all model reference files.
            Defaults to False.
        safe_mode: Whether to raise exceptions on failure. If False, exceptions are caught
            and None is returned for that category. Defaults to False. Use `get_all_model_references()`
            for the better type hinting if you intend to use this.

    Returns:
        dict[MODEL_REFERENCE_CATEGORY, dict[str, GenericModelRecord] | None]: A mapping of model reference
            categories to their corresponding model reference objects.

    """
    use_cache, cached_result, categories_to_load = self._evaluate_cache_state(
        overwrite_existing=overwrite_existing,
        safe_mode=safe_mode,
    )

    if use_cache:
        return cached_result

    logger.debug("Fetching model references from backend as needed.")
    backend_payload = self._fetch_from_backend_if_needed(force_refresh=overwrite_existing)

    if categories_to_load:
        logger.debug("Loading %d model reference categories: %s", len(categories_to_load), categories_to_load)
        self._load_categories_from_payload(
            categories_to_load=categories_to_load,
            payload=backend_payload,
            overwrite_existing=overwrite_existing,
            safe_mode=safe_mode,
        )

    return self._get_all_cached_model_references(safe_mode=safe_mode)

_build_safe_reference_view

_build_safe_reference_view(
    all_references: dict[
        MODEL_REFERENCE_CATEGORY,
        dict[str, GenericModelRecord] | None,
    ],
) -> dict[
    MODEL_REFERENCE_CATEGORY, dict[str, GenericModelRecord]
]

Convert a possibly sparse reference view into a safe mapping with logging.

Parameters:

Returns:

Source code in src/horde_model_reference/model_reference_manager.py
def _build_safe_reference_view(
    self,
    all_references: dict[MODEL_REFERENCE_CATEGORY, dict[str, GenericModelRecord] | None],
) -> dict[MODEL_REFERENCE_CATEGORY, dict[str, GenericModelRecord]]:
    """Convert a possibly sparse reference view into a safe mapping with logging.

    Args:
        all_references: Mapping of categories to model reference dicts or None.

    Returns:
        dict[MODEL_REFERENCE_CATEGORY, dict[str, GenericModelRecord]]: Mapping where
        missing categories map to empty dicts.

    """
    safe_references: dict[MODEL_REFERENCE_CATEGORY, dict[str, GenericModelRecord]] = {}
    missing_references: list[MODEL_REFERENCE_CATEGORY] = []
    for category, reference in all_references.items():
        if reference is None:
            missing_references.append(category)
            safe_references[category] = {}
        else:
            safe_references[category] = reference

    if missing_references:
        logger.error(f"Missing model references for categories: {missing_references}")

    return safe_references

get_all_model_references

get_all_model_references(
    overwrite_existing: bool = False,
) -> dict[
    MODEL_REFERENCE_CATEGORY, dict[str, GenericModelRecord]
]

Return a mapping of all model reference categories to their corresponding model reference objects.

If a model reference file could not be found or parsed, an exception is raised. If you want to allow missing model references, use get_all_model_references_or_none() instead.

Parameters:

  • overwrite_existing (bool, default: False ) –

    Whether to force a redownload of all model reference files. Defaults to False.

Returns:

Source code in src/horde_model_reference/model_reference_manager.py
def get_all_model_references(
    self,
    overwrite_existing: bool = False,
) -> dict[MODEL_REFERENCE_CATEGORY, dict[str, GenericModelRecord]]:
    """Return a mapping of all model reference categories to their corresponding model reference objects.

    If a model reference file could not be found or parsed, an exception is raised. If you want to allow
    missing model references, use `get_all_model_references_or_none()` instead.

    Args:
        overwrite_existing: Whether to force a redownload of all model reference files.
            Defaults to False.

    Returns:
        dict[MODEL_REFERENCE_CATEGORY, dict[str, GenericModelRecord]]: A mapping of model reference
            categories to their corresponding model reference objects.

    """
    all_references = self.get_all_model_references_or_none(overwrite_existing=overwrite_existing)
    return self._build_safe_reference_view(all_references)

get_all_model_references_or_none_async async

get_all_model_references_or_none_async(
    overwrite_existing: bool = False,
    *,
    safe_mode: bool = False,
    httpx_client: AsyncClient | None = None,
) -> dict[
    MODEL_REFERENCE_CATEGORY,
    dict[str, GenericModelRecord] | None,
]

Return model references asynchronously without enforcing presence.

Parameters:

  • overwrite_existing (bool, default: False ) –

    Whether to force backend refresh.

  • safe_mode (bool, default: False ) –

    Whether to propagate conversion errors.

  • httpx_client (AsyncClient | None, default: None ) –

    Optional shared async client for HTTP backends.

Returns:

Source code in src/horde_model_reference/model_reference_manager.py
async def get_all_model_references_or_none_async(
    self,
    overwrite_existing: bool = False,
    *,
    safe_mode: bool = False,
    httpx_client: httpx.AsyncClient | None = None,
) -> dict[MODEL_REFERENCE_CATEGORY, dict[str, GenericModelRecord] | None]:
    """Return model references asynchronously without enforcing presence.

    Args:
        overwrite_existing: Whether to force backend refresh.
        safe_mode: Whether to propagate conversion errors.
        httpx_client: Optional shared async client for HTTP backends.

    Returns:
        dict[MODEL_REFERENCE_CATEGORY, dict[str, GenericModelRecord] | None]: Possibly
        sparse mapping keyed by category.

    """
    use_cache, cached_result, categories_to_load = self._evaluate_cache_state(
        overwrite_existing=overwrite_existing,
        safe_mode=safe_mode,
    )

    if use_cache:
        return cached_result

    logger.debug("Asynchronously fetching model references from backend as needed.")
    backend_payload = await self._fetch_from_backend_if_needed_async(
        force_refresh=overwrite_existing,
        httpx_client=httpx_client,
    )

    if categories_to_load:
        logger.debug("Loading %d model reference categories via async payload", len(categories_to_load))
        self._load_categories_from_payload(
            categories_to_load=categories_to_load,
            payload=backend_payload,
            overwrite_existing=overwrite_existing,
            safe_mode=safe_mode,
        )

    return self._get_all_cached_model_references(safe_mode=safe_mode)

get_all_model_references_async async

get_all_model_references_async(
    overwrite_existing: bool = False,
    *,
    httpx_client: AsyncClient | None = None,
) -> dict[
    MODEL_REFERENCE_CATEGORY, dict[str, GenericModelRecord]
]

Return all model references asynchronously, raising on missing categories.

Parameters:

  • overwrite_existing (bool, default: False ) –

    Whether to force backend refresh.

  • httpx_client (AsyncClient | None, default: None ) –

    Optional shared async client for HTTP backends.

Returns:

Source code in src/horde_model_reference/model_reference_manager.py
async def get_all_model_references_async(
    self,
    overwrite_existing: bool = False,
    *,
    httpx_client: httpx.AsyncClient | None = None,
) -> dict[MODEL_REFERENCE_CATEGORY, dict[str, GenericModelRecord]]:
    """Return all model references asynchronously, raising on missing categories.

    Args:
        overwrite_existing: Whether to force backend refresh.
        httpx_client: Optional shared async client for HTTP backends.

    Returns:
        dict[MODEL_REFERENCE_CATEGORY, dict[str, GenericModelRecord]]: Mapping with
        empty dicts substituted for missing categories.

    """
    all_references = await self.get_all_model_references_or_none_async(
        overwrite_existing=overwrite_existing,
        httpx_client=httpx_client,
    )
    return self._build_safe_reference_view(all_references)

get_model_reference_or_none

get_model_reference_or_none(
    category: MODEL_REFERENCE_CATEGORY,
    overwrite_existing: bool = False,
) -> dict[str, GenericModelRecord] | None

Return the model reference object for a specific category.

Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    The category to retrieve.

  • overwrite_existing (bool, default: False ) –

    Whether to force a redownload. Defaults to False.

Returns:

  • dict[str, GenericModelRecord] | None

    dict[str, GenericModelRecord] | None: The model reference object for the category, or None if not found.

Source code in src/horde_model_reference/model_reference_manager.py
def get_model_reference_or_none(
    self,
    category: MODEL_REFERENCE_CATEGORY,
    overwrite_existing: bool = False,
) -> dict[str, GenericModelRecord] | None:
    """Return the model reference object for a specific category.

    Args:
        category: The category to retrieve.
        overwrite_existing: Whether to force a redownload. Defaults to False.

    Returns:
        dict[str, GenericModelRecord] | None: The model reference object for the category,
            or None if not found.

    """
    all_references = self.get_all_model_references_or_none(overwrite_existing=overwrite_existing)
    return all_references.get(category)

get_model_reference_or_none_async async

get_model_reference_or_none_async(
    category: MODEL_REFERENCE_CATEGORY,
    overwrite_existing: bool = False,
    *,
    httpx_client: AsyncClient | None = None,
) -> dict[str, GenericModelRecord] | None

Return a single category's references asynchronously without strict enforcement.

Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    Target category to load.

  • overwrite_existing (bool, default: False ) –

    Whether to force backend refresh.

  • httpx_client (AsyncClient | None, default: None ) –

    Optional shared async client for HTTP backends.

Returns:

Source code in src/horde_model_reference/model_reference_manager.py
async def get_model_reference_or_none_async(
    self,
    category: MODEL_REFERENCE_CATEGORY,
    overwrite_existing: bool = False,
    *,
    httpx_client: httpx.AsyncClient | None = None,
) -> dict[str, GenericModelRecord] | None:
    """Return a single category's references asynchronously without strict enforcement.

    Args:
        category: Target category to load.
        overwrite_existing: Whether to force backend refresh.
        httpx_client: Optional shared async client for HTTP backends.

    Returns:
        dict[str, GenericModelRecord] | None: Mapping of model names or None.

    """
    all_references = await self.get_all_model_references_or_none_async(
        overwrite_existing=overwrite_existing,
        httpx_client=httpx_client,
    )
    return all_references.get(category)

get_model_reference

get_model_reference(
    category: MODEL_REFERENCE_CATEGORY,
    overwrite_existing: bool = False,
) -> dict[str, GenericModelRecord]

Return the model reference object for a specific category.

Raises an exception if the model reference could not be found or parsed. If you want to allow missing model references, use get_model_reference_or_none() instead.

Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    The category to retrieve.

  • overwrite_existing (bool, default: False ) –

    Whether to force a redownload. Defaults to False.

Returns:

Source code in src/horde_model_reference/model_reference_manager.py
def get_model_reference(
    self,
    category: MODEL_REFERENCE_CATEGORY,
    overwrite_existing: bool = False,
) -> dict[str, GenericModelRecord]:
    """Return the model reference object for a specific category.

    Raises an exception if the model reference could not be found or parsed.
    If you want to allow missing model references, use `get_model_reference_or_none()` instead.

    Args:
        category: The category to retrieve.
        overwrite_existing: Whether to force a redownload. Defaults to False.

    Returns:
        dict[str, GenericModelRecord]: The model reference object for the category.

    """
    model_reference = self.get_model_reference_or_none(
        category,
        overwrite_existing=overwrite_existing,
    )
    if model_reference is None:
        raise RuntimeError(f"Model reference for category {category} not found or could not be parsed.")

    return model_reference

get_model_reference_async async

get_model_reference_async(
    category: MODEL_REFERENCE_CATEGORY,
    overwrite_existing: bool = False,
    *,
    httpx_client: AsyncClient | None = None,
) -> dict[str, GenericModelRecord]

Return a single category's references asynchronously, raising if missing.

Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    Target category to load.

  • overwrite_existing (bool, default: False ) –

    Whether to force backend refresh.

  • httpx_client (AsyncClient | None, default: None ) –

    Optional shared async client for HTTP backends.

Returns:

Raises:

  • RuntimeError

    If the category is missing or could not be parsed.

Source code in src/horde_model_reference/model_reference_manager.py
async def get_model_reference_async(
    self,
    category: MODEL_REFERENCE_CATEGORY,
    overwrite_existing: bool = False,
    *,
    httpx_client: httpx.AsyncClient | None = None,
) -> dict[str, GenericModelRecord]:
    """Return a single category's references asynchronously, raising if missing.

    Args:
        category: Target category to load.
        overwrite_existing: Whether to force backend refresh.
        httpx_client: Optional shared async client for HTTP backends.

    Returns:
        dict[str, GenericModelRecord]: Mapping of model names for the category.

    Raises:
        RuntimeError: If the category is missing or could not be parsed.

    """
    model_reference = await self.get_model_reference_or_none_async(
        category,
        overwrite_existing=overwrite_existing,
        httpx_client=httpx_client,
    )
    if model_reference is None:
        raise RuntimeError(f"Model reference for category {category} not found or could not be parsed.")

    return model_reference

get_model_names_or_none

get_model_names_or_none(
    category: MODEL_REFERENCE_CATEGORY,
    overwrite_existing: bool = False,
) -> list[str] | None

Return a list of model names for a specific category.

Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    The category to retrieve.

  • overwrite_existing (bool, default: False ) –

    Whether to force a redownload. Defaults to False.

Returns:

  • list[str] | None

    list[str] | None: The list of model names for the category, or None if not found.

Source code in src/horde_model_reference/model_reference_manager.py
def get_model_names_or_none(
    self,
    category: MODEL_REFERENCE_CATEGORY,
    overwrite_existing: bool = False,
) -> list[str] | None:
    """Return a list of model names for a specific category.

    Args:
        category: The category to retrieve.
        overwrite_existing: Whether to force a redownload. Defaults to False.

    Returns:
        list[str] | None: The list of model names for the category, or None if not found.

    """
    model_reference = self.get_model_reference_or_none(
        category,
        overwrite_existing=overwrite_existing,
    )
    if model_reference is None:
        return None

    return list(model_reference.keys())

get_model_names

get_model_names(
    category: MODEL_REFERENCE_CATEGORY,
    overwrite_existing: bool = False,
) -> list[str]

Return a list of model names for a specific category.

Raises an exception if the model reference could not be found or parsed. If you want to allow missing model references, use get_model_names_or_none() instead.

Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    The category to retrieve.

  • overwrite_existing (bool, default: False ) –

    Whether to force a redownload. Defaults to False.

Returns:

  • list[str]

    list[str]: The list of model names for the category.

Source code in src/horde_model_reference/model_reference_manager.py
def get_model_names(
    self,
    category: MODEL_REFERENCE_CATEGORY,
    overwrite_existing: bool = False,
) -> list[str]:
    """Return a list of model names for a specific category.

    Raises an exception if the model reference could not be found or parsed.
    If you want to allow missing model references, use `get_model_names_or_none()` instead.

    Args:
        category: The category to retrieve.
        overwrite_existing: Whether to force a redownload. Defaults to False.

    Returns:
        list[str]: The list of model names for the category.

    """
    model_reference = self.get_model_reference(
        category,
        overwrite_existing=overwrite_existing,
    )
    if model_reference is None:
        raise RuntimeError(f"Model reference for category {category} not found or could not be parsed.")

    return list(model_reference.keys())

get_model_or_none

get_model_or_none(
    category: MODEL_REFERENCE_CATEGORY,
    model_name: str,
    overwrite_existing: bool = False,
) -> GenericModelRecord | None

Return a specific model from a category.

Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    The category to retrieve.

  • model_name (str) –

    The name of the model within the category.

  • overwrite_existing (bool, default: False ) –

    Whether to force a redownload. Defaults to False.

Returns:

  • GenericModelRecord | None

    GenericModelRecord | None: The model record, or None if not found.

Source code in src/horde_model_reference/model_reference_manager.py
def get_model_or_none(
    self,
    category: MODEL_REFERENCE_CATEGORY,
    model_name: str,
    overwrite_existing: bool = False,
) -> GenericModelRecord | None:
    """Return a specific model from a category.

    Args:
        category: The category to retrieve.
        model_name: The name of the model within the category.
        overwrite_existing: Whether to force a redownload. Defaults to False.

    Returns:
        GenericModelRecord | None: The model record, or None if not found.

    """
    model_reference = self.get_model_reference_or_none(
        category,
        overwrite_existing=overwrite_existing,
    )
    if model_reference is None:
        return None

    return model_reference.get(model_name)

get_model

get_model(
    category: MODEL_REFERENCE_CATEGORY,
    model_name: str,
    overwrite_existing: bool = False,
) -> GenericModelRecord

Return a specific model from a category.

Raises an exception if the model could not be found or parsed. If you want to allow missing models, use get_model_or_none() instead.

Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    The category to retrieve.

  • model_name (str) –

    The name of the model within the category.

  • overwrite_existing (bool, default: False ) –

    Whether to force a redownload. Defaults to False.

Returns:

Source code in src/horde_model_reference/model_reference_manager.py
def get_model(
    self,
    category: MODEL_REFERENCE_CATEGORY,
    model_name: str,
    overwrite_existing: bool = False,
) -> GenericModelRecord:
    """Return a specific model from a category.

    Raises an exception if the model could not be found or parsed.
    If you want to allow missing models, use `get_model_or_none()` instead.

    Args:
        category: The category to retrieve.
        model_name: The name of the model within the category.
        overwrite_existing: Whether to force a redownload. Defaults to False.

    Returns:
        GenericModelRecord: The model record.

    """
    model_reference = self.get_model_reference(
        category,
        overwrite_existing=overwrite_existing,
    )

    model_record = model_reference.get(model_name)
    if model_record is None:
        raise RuntimeError(f"Model {model_name} not found in category {category}.")

    return model_record

get_raw_model_reference_json

get_raw_model_reference_json(
    category: MODEL_REFERENCE_CATEGORY,
    overwrite_existing: bool = False,
) -> dict[str, Any] | None

Return the raw JSON dict for a specific category without pydantic validation.

This method delegates to the backend to fetch the raw JSON data directly, avoiding the overhead of creating pydantic models. Ideal for API endpoints that need fast JSON responses.

Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    The category to retrieve.

  • overwrite_existing (bool, default: False ) –

    Whether to force a redownload. Defaults to False.

Returns:

  • dict[str, Any] | None

    dict[str, Any] | None: The raw JSON dict for the category, or None if not found.

Source code in src/horde_model_reference/model_reference_manager.py
def get_raw_model_reference_json(
    self,
    category: MODEL_REFERENCE_CATEGORY,
    overwrite_existing: bool = False,
) -> dict[str, Any] | None:
    """Return the raw JSON dict for a specific category without pydantic validation.

    This method delegates to the backend to fetch the raw JSON data directly,
    avoiding the overhead of creating pydantic models. Ideal for API endpoints
    that need fast JSON responses.

    Args:
        category: The category to retrieve.
        overwrite_existing: Whether to force a redownload. Defaults to False.

    Returns:
        dict[str, Any] | None: The raw JSON dict for the category, or None if not found.

    """
    return self.backend.fetch_category(category, force_refresh=overwrite_existing)

get_raw_model_json

get_raw_model_json(
    category: MODEL_REFERENCE_CATEGORY,
    model_name: str,
    overwrite_existing: bool = False,
) -> dict[str, Any] | None

Return the raw JSON dict for a specific model in a category without pydantic validation.

This method delegates to the backend to fetch the raw JSON data directly, avoiding the overhead of creating pydantic models. Ideal for API endpoints that need fast JSON responses.

Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    The category to retrieve.

  • model_name (str) –

    The name of the model within the category.

  • overwrite_existing (bool, default: False ) –

    Whether to force a redownload. Defaults to False.

Returns:

  • dict[str, Any] | None

    dict[str, Any] | None: The raw JSON dict for the model, or None if not found.

Source code in src/horde_model_reference/model_reference_manager.py
def get_raw_model_json(
    self,
    category: MODEL_REFERENCE_CATEGORY,
    model_name: str,
    overwrite_existing: bool = False,
) -> dict[str, Any] | None:
    """Return the raw JSON dict for a specific model in a category without pydantic validation.

    This method delegates to the backend to fetch the raw JSON data directly,
    avoiding the overhead of creating pydantic models. Ideal for API endpoints
    that need fast JSON responses.

    Args:
        category: The category to retrieve.
        model_name: The name of the model within the category.
        overwrite_existing: Whether to force a redownload. Defaults to False.

    Returns:
        dict[str, Any] | None: The raw JSON dict for the model, or None if not found.

    """
    category_json = self.backend.fetch_category(category, force_refresh=overwrite_existing)

    if category_json is None:
        return None

    return category_json.get(model_name)

_get_typed_models

_get_typed_models(
    category: MODEL_REFERENCE_CATEGORY,
    *,
    record_type: type[TModelRecord],
) -> dict[str, TModelRecord]

Return a typed mapping for the requested category.

Source code in src/horde_model_reference/model_reference_manager.py
def _get_typed_models(
    self,
    category: MODEL_REFERENCE_CATEGORY,
    *,
    record_type: type[TModelRecord],
) -> dict[str, TModelRecord]:
    """Return a typed mapping for the requested category."""
    model_reference = self.get_model_reference(category)

    if len(model_reference) == 0:
        return {}

    typed_reference: dict[str, TModelRecord] = {}
    for name, record in model_reference.items():
        if not isinstance(record, record_type):
            raise RuntimeError(
                f"Some records in {category.value} category are not {record_type.__name__} instances."
            )
        typed_reference[name] = record

    return typed_reference

query

query(
    category: Literal["image_generation"],
) -> ImageGenerationQuery
query(
    category: Literal["text_generation"],
) -> TextModelQuery
query(category: Literal['controlnet']) -> ControlNetQuery
query(
    category: str,
) -> ModelQuery[
    GenericModelRecord,
    GenericFieldName
    | ImageGenFieldName
    | TextGenFieldName
    | ControlNetFieldName,
]
query(
    category: MODEL_REFERENCE_CATEGORY | str,
) -> (
    ImageGenerationQuery
    | TextModelQuery
    | ControlNetQuery
    | ModelQuery[
        GenericModelRecord,
        GenericFieldName
        | ImageGenFieldName
        | TextGenFieldName
        | ControlNetFieldName,
    ]
)

Return a query builder for a single category.

When called with a literal category string, the return type is narrowed to the corresponding typed query builder (e.g. ImageGenerationQuery for "image_generation").

Parameters:

Returns:

Source code in src/horde_model_reference/model_reference_manager.py
def query(
    self,
    category: MODEL_REFERENCE_CATEGORY | str,
) -> (
    ImageGenerationQuery
    | TextModelQuery
    | ControlNetQuery
    | ModelQuery[GenericModelRecord, GenericFieldName | ImageGenFieldName | TextGenFieldName | ControlNetFieldName]
):
    """Return a query builder for a single category.

    When called with a literal category string, the return type is
    narrowed to the corresponding typed query builder (e.g.
    ``ImageGenerationQuery`` for ``"image_generation"``).

    Args:
        category: The model reference category to query.

    Returns:
        A ``ModelQuery`` (or typed subclass) ready for chaining filters.

    """
    if isinstance(category, str):
        category = MODEL_REFERENCE_CATEGORY(category)

    if category == MODEL_REFERENCE_CATEGORY.image_generation:
        return self.query_image_generation()
    if category == MODEL_REFERENCE_CATEGORY.text_generation:
        return self.query_text_generation()
    if category == MODEL_REFERENCE_CATEGORY.controlnet:
        return self.query_controlnet()

    records = self.get_model_reference(category)
    record_type = MODEL_RECORD_TYPE_LOOKUP.get(category, GenericModelRecord)
    return build_query(records, record_type)

query_all

query_all() -> ModelQuery[
    GenericModelRecord,
    GenericFieldName
    | ImageGenFieldName
    | TextGenFieldName
    | ControlNetFieldName,
]

Return a query builder spanning all categories.

Returns:

Source code in src/horde_model_reference/model_reference_manager.py
def query_all(
    self,
) -> ModelQuery[GenericModelRecord, GenericFieldName | ImageGenFieldName | TextGenFieldName | ControlNetFieldName]:
    """Return a query builder spanning all categories.

    Returns:
        A ``ModelQuery[GenericModelRecord]`` over every cached record.

    """
    all_refs = self.get_all_model_references()
    return build_cross_category_query(all_refs)

query_image_generation

query_image_generation() -> ImageGenerationQuery

Return a typed query builder for image generation models.

Source code in src/horde_model_reference/model_reference_manager.py
def query_image_generation(self) -> ImageGenerationQuery:
    """Return a typed query builder for image generation models."""
    records = self._get_typed_models(
        MODEL_REFERENCE_CATEGORY.image_generation,
        record_type=ImageGenerationModelRecord,
    )
    return build_image_query(records)

query_text_generation

query_text_generation() -> TextModelQuery

Return a typed query builder for text generation models.

Source code in src/horde_model_reference/model_reference_manager.py
def query_text_generation(self) -> TextModelQuery:
    """Return a typed query builder for text generation models."""
    records = self._get_typed_models(
        MODEL_REFERENCE_CATEGORY.text_generation,
        record_type=TextGenerationModelRecord,
    )
    return build_text_query(records)

query_controlnet

query_controlnet() -> ControlNetQuery

Return a typed query builder for ControlNet models.

Source code in src/horde_model_reference/model_reference_manager.py
def query_controlnet(self) -> ControlNetQuery:
    """Return a typed query builder for ControlNet models."""
    records = self._get_typed_models(
        MODEL_REFERENCE_CATEGORY.controlnet,
        record_type=ControlNetModelRecord,
    )
    return build_controlnet_query(records)

query_blip

query_blip() -> ModelQuery[
    BlipModelRecord, GenericFieldName
]

Return a typed query builder for BLIP models.

Source code in src/horde_model_reference/model_reference_manager.py
def query_blip(self) -> ModelQuery[BlipModelRecord, GenericFieldName]:
    """Return a typed query builder for BLIP models."""
    records = self._get_typed_models(
        MODEL_REFERENCE_CATEGORY.blip,
        record_type=BlipModelRecord,
    )
    return build_query(records, BlipModelRecord)

query_clip

query_clip() -> ModelQuery[
    ClipModelRecord, GenericFieldName
]

Return a typed query builder for CLIP models.

Source code in src/horde_model_reference/model_reference_manager.py
def query_clip(self) -> ModelQuery[ClipModelRecord, GenericFieldName]:
    """Return a typed query builder for CLIP models."""
    records = self._get_typed_models(
        MODEL_REFERENCE_CATEGORY.clip,
        record_type=ClipModelRecord,
    )
    return build_query(records, ClipModelRecord)

query_codeformer

query_codeformer() -> ModelQuery[
    CodeformerModelRecord, GenericFieldName
]

Return a typed query builder for CodeFormer models.

Source code in src/horde_model_reference/model_reference_manager.py
def query_codeformer(self) -> ModelQuery[CodeformerModelRecord, GenericFieldName]:
    """Return a typed query builder for CodeFormer models."""
    records = self._get_typed_models(
        MODEL_REFERENCE_CATEGORY.codeformer,
        record_type=CodeformerModelRecord,
    )
    return build_query(records, CodeformerModelRecord)

query_esrgan

query_esrgan() -> ModelQuery[
    EsrganModelRecord, GenericFieldName
]

Return a typed query builder for ESRGAN models.

Source code in src/horde_model_reference/model_reference_manager.py
def query_esrgan(self) -> ModelQuery[EsrganModelRecord, GenericFieldName]:
    """Return a typed query builder for ESRGAN models."""
    records = self._get_typed_models(
        MODEL_REFERENCE_CATEGORY.esrgan,
        record_type=EsrganModelRecord,
    )
    return build_query(records, EsrganModelRecord)

query_gfpgan

query_gfpgan() -> ModelQuery[
    GfpganModelRecord, GenericFieldName
]

Return a typed query builder for GFPGAN models.

Source code in src/horde_model_reference/model_reference_manager.py
def query_gfpgan(self) -> ModelQuery[GfpganModelRecord, GenericFieldName]:
    """Return a typed query builder for GFPGAN models."""
    records = self._get_typed_models(
        MODEL_REFERENCE_CATEGORY.gfpgan,
        record_type=GfpganModelRecord,
    )
    return build_query(records, GfpganModelRecord)

query_safety_checker

query_safety_checker() -> ModelQuery[
    SafetyCheckerModelRecord, GenericFieldName
]

Return a typed query builder for safety checker models.

Source code in src/horde_model_reference/model_reference_manager.py
def query_safety_checker(self) -> ModelQuery[SafetyCheckerModelRecord, GenericFieldName]:
    """Return a typed query builder for safety checker models."""
    records = self._get_typed_models(
        MODEL_REFERENCE_CATEGORY.safety_checker,
        record_type=SafetyCheckerModelRecord,
    )
    return build_query(records, SafetyCheckerModelRecord)

query_audio_generation

query_audio_generation() -> ModelQuery[
    AudioGenerationModelRecord, GenericFieldName
]

Return a typed query builder for audio generation models.

Source code in src/horde_model_reference/model_reference_manager.py
def query_audio_generation(self) -> ModelQuery[AudioGenerationModelRecord, GenericFieldName]:
    """Return a typed query builder for audio generation models."""
    records = self._get_typed_models(
        MODEL_REFERENCE_CATEGORY.audio_generation,
        record_type=AudioGenerationModelRecord,
    )
    return build_query(records, AudioGenerationModelRecord)

query_video_generation

query_video_generation() -> ModelQuery[
    VideoGenerationModelRecord, GenericFieldName
]

Return a typed query builder for video generation models.

Source code in src/horde_model_reference/model_reference_manager.py
def query_video_generation(self) -> ModelQuery[VideoGenerationModelRecord, GenericFieldName]:
    """Return a typed query builder for video generation models."""
    records = self._get_typed_models(
        MODEL_REFERENCE_CATEGORY.video_generation,
        record_type=VideoGenerationModelRecord,
    )
    return build_query(records, VideoGenerationModelRecord)

query_miscellaneous

query_miscellaneous() -> ModelQuery[
    MiscellaneousModelRecord, GenericFieldName
]

Return a typed query builder for miscellaneous models.

Source code in src/horde_model_reference/model_reference_manager.py
def query_miscellaneous(self) -> ModelQuery[MiscellaneousModelRecord, GenericFieldName]:
    """Return a typed query builder for miscellaneous models."""
    records = self._get_typed_models(
        MODEL_REFERENCE_CATEGORY.miscellaneous,
        record_type=MiscellaneousModelRecord,
    )
    return build_query(records, MiscellaneousModelRecord)
get_popular_models(
    category: MODEL_REFERENCE_CATEGORY,
    *,
    limit: int = 10,
    sort_by: Literal[
        "worker_count",
        "usage_day",
        "usage_month",
        "usage_total",
    ] = "worker_count",
    include_workers: bool = False,
) -> list[PopularModelResult]

Return models ranked by live Horde popularity metrics.

Requires the Horde public API to be reachable. Only image_generation and text_generation categories have Horde API data; other categories return an empty list.

Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    Model category to rank.

  • limit (int, default: 10 ) –

    Maximum number of results.

  • sort_by (Literal['worker_count', 'usage_day', 'usage_month', 'usage_total'], default: 'worker_count' ) –

    Metric to rank by.

  • include_workers (bool, default: False ) –

    Whether to fetch per-worker details (slower).

Returns:

Source code in src/horde_model_reference/model_reference_manager.py
async def get_popular_models(
    self,
    category: MODEL_REFERENCE_CATEGORY,
    *,
    limit: int = 10,
    sort_by: Literal["worker_count", "usage_day", "usage_month", "usage_total"] = "worker_count",
    include_workers: bool = False,
) -> list[PopularModelResult]:
    """Return models ranked by live Horde popularity metrics.

    Requires the Horde public API to be reachable. Only ``image_generation``
    and ``text_generation`` categories have Horde API data; other categories
    return an empty list.

    Args:
        category: Model category to rank.
        limit: Maximum number of results.
        sort_by: Metric to rank by.
        include_workers: Whether to fetch per-worker details (slower).

    Returns:
        A list of ``PopularModelResult`` sorted by the chosen metric.

    """
    from horde_model_reference.integrations.data_merger import (
        CombinedModelStatistics,
        PopularModelResult,
        merge_category_with_horde_data,
    )
    from horde_model_reference.integrations.horde_api_integration import HordeAPIIntegration

    horde_type: HordeModelType | None = self._CATEGORY_TO_HORDE_TYPE.get(category)
    if horde_type is None:
        return []

    model_reference = self.get_model_reference_or_none(category)
    if model_reference is None:
        return []

    horde_api = HordeAPIIntegration()
    indexed_status, indexed_stats, indexed_workers = await horde_api.get_combined_data_indexed(
        model_type=horde_type,
        include_workers=include_workers,
    )

    merged = merge_category_with_horde_data(
        model_names=model_reference.keys(),
        horde_status=indexed_status,
        horde_stats=indexed_stats,
        workers=indexed_workers,
    )

    def _sort_key(item: tuple[str, object]) -> float:
        _name, stats = item
        if not isinstance(stats, CombinedModelStatistics):
            return 0.0
        if sort_by == "worker_count":
            return float(stats.worker_count)
        if stats.usage_stats is None:
            return 0.0
        if sort_by == "usage_day":
            return float(stats.usage_stats.day)
        if sort_by == "usage_month":
            return float(stats.usage_stats.month)
        return float(stats.usage_stats.total)

    ranked = sorted(merged.items(), key=_sort_key, reverse=True)[:limit]

    results: list[PopularModelResult] = []
    for name, stats in ranked:
        record = model_reference.get(name)
        if record is None:
            continue
        results.append(
            PopularModelResult(
                name=name,
                record=record.model_dump(mode="json", exclude_none=True),
                stats=stats,
            )
        )

    return results

DeferredPrefetchHandle

Bases: Awaitable[None]

Encapsulates a deferred eager fetch for a ModelReferenceManager.

Source code in src/horde_model_reference/model_reference_manager.py
class DeferredPrefetchHandle(Awaitable[None]):
    """Encapsulates a deferred eager fetch for a `ModelReferenceManager`."""

    def __init__(
        self,
        *,
        manager: ModelReferenceManager,
        force_refresh: bool,
    ) -> None:
        """Store the manager reference and desired refresh semantics."""
        self._manager = manager
        self._force_refresh = force_refresh

    @property
    def force_refresh(self) -> bool:
        """Whether this handle forces a backend refresh when executed."""
        return self._force_refresh

    def run_sync(self) -> None:
        """Execute the deferred fetch synchronously on the current thread."""
        self._manager._fetch_from_backend_if_needed(force_refresh=self._force_refresh)

    async def run_async(
        self,
        *,
        httpx_client: httpx.AsyncClient | None = None,
    ) -> None:
        """Execute the deferred fetch asynchronously using the backend's async API."""
        await self._manager._fetch_from_backend_if_needed_async(
            force_refresh=self._force_refresh,
            httpx_client=httpx_client,
        )

    def __await__(self) -> Generator[Any]:
        """Allow awaiting the handle directly as sugar for run_async()."""
        return self.run_async().__await__()

_manager instance-attribute

_manager = manager

_force_refresh instance-attribute

_force_refresh = force_refresh

force_refresh property

force_refresh: bool

Whether this handle forces a backend refresh when executed.

__init__

__init__(
    *, manager: ModelReferenceManager, force_refresh: bool
) -> None

Store the manager reference and desired refresh semantics.

Source code in src/horde_model_reference/model_reference_manager.py
def __init__(
    self,
    *,
    manager: ModelReferenceManager,
    force_refresh: bool,
) -> None:
    """Store the manager reference and desired refresh semantics."""
    self._manager = manager
    self._force_refresh = force_refresh

run_sync

run_sync() -> None

Execute the deferred fetch synchronously on the current thread.

Source code in src/horde_model_reference/model_reference_manager.py
def run_sync(self) -> None:
    """Execute the deferred fetch synchronously on the current thread."""
    self._manager._fetch_from_backend_if_needed(force_refresh=self._force_refresh)

run_async async

run_async(
    *, httpx_client: AsyncClient | None = None
) -> None

Execute the deferred fetch asynchronously using the backend's async API.

Source code in src/horde_model_reference/model_reference_manager.py
async def run_async(
    self,
    *,
    httpx_client: httpx.AsyncClient | None = None,
) -> None:
    """Execute the deferred fetch asynchronously using the backend's async API."""
    await self._manager._fetch_from_backend_if_needed_async(
        force_refresh=self._force_refresh,
        httpx_client=httpx_client,
    )

__await__

__await__() -> Generator[Any]

Allow awaiting the handle directly as sugar for run_async().

Source code in src/horde_model_reference/model_reference_manager.py
def __await__(self) -> Generator[Any]:
    """Allow awaiting the handle directly as sugar for run_async()."""
    return self.run_async().__await__()