Skip to content

redis_backend

Redis-based distributed cache backend for PRIMARY mode.

This backend wraps a file-based backend and adds distributed caching via Redis. It's designed for PRIMARY mode multi-worker deployments where multiple FastAPI workers need to share cached model reference data.

RedisBackend

Bases: ModelReferenceBackend

Redis-backed distributed cache with file backend fallback.

Architecture:

- File backend is the source of truth
- Redis provides distributed caching across multiple PRIMARY workers
- On cache miss, reads from file backend and populates Redis
- Pub/sub notifies other workers of cache invalidations
- Only usable in PRIMARY mode

Source code in src/horde_model_reference/backends/redis_backend.py
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
class RedisBackend(ModelReferenceBackend):
    """Redis-backed distributed cache with file backend fallback.

    Architecture:
    - File backend is the source of truth
    - Redis provides distributed caching across multiple PRIMARY workers
    - On cache miss, reads from file backend and populates Redis
    - Pub/sub notifies other workers of cache invalidations
    - Only usable in PRIMARY mode

    Three Redis keys are kept per category (category data, legacy metadata,
    v2 metadata); all of them are dropped together when a category is
    invalidated.
    """

    _file_backend: FileSystemBackend
    _redis_settings: RedisSettings
    _ttl: int

    _lock: RLock
    _sync_redis: redis.Redis[bytes]

    _pubsub: redis.client.PubSub
    _pubsub_thread: threading.Thread | None
    _pubsub_running: bool

    def __init__(
        self,
        *,
        file_backend: FileSystemBackend,
        redis_settings: RedisSettings,
        cache_ttl_seconds: int | None = None,
    ) -> None:
        """Initialize Redis backend with filesystem backend.

        Args:
            file_backend: Filesystem backend to wrap.
            redis_settings: Redis connection settings.
            cache_ttl_seconds: TTL for cache entries.
                If None (or 0), uses redis_settings.ttl_seconds, and finally
                a default of 60 seconds.

        Raises:
            ValueError: If file_backend is not in PRIMARY mode.

        """
        if file_backend.replicate_mode != ReplicateMode.PRIMARY:
            raise ValueError(
                "RedisBackend can only be used with a FileSystemBackend in PRIMARY mode. "
                "For REPLICA mode, use GitHubBackend or HTTPBackend."
            )

        super().__init__(mode=ReplicateMode.PRIMARY)

        self._file_backend = file_backend
        self._redis_settings = redis_settings
        # The explicit argument wins over the settings value; falsy values
        # (None / 0) fall through so the TTL passed to SETEX is always positive.
        self._ttl = cache_ttl_seconds or redis_settings.ttl_seconds or 60

        self._lock = RLock()

        try:
            self._sync_redis = self._create_sync_pool()
            # NOTE(review): redis.from_url appears to build the client lazily, so
            # this message may not prove a live connection; the URL may also embed
            # credentials - confirm before shipping these logs anywhere sensitive.
            logger.info(f"Redis connection established: {redis_settings.url}")
        except Exception as e:
            logger.error(f"Failed to connect to Redis: {e}")
            raise

        self._pubsub_running = False
        self._pubsub_thread = None
        if redis_settings.use_pubsub:
            self._setup_pubsub()

    def _create_sync_pool(self) -> redis.Redis[bytes]:
        """Create synchronous Redis connection pool.

        The client is created with ``decode_responses=False``, so values read
        back from Redis are raw bytes.
        """
        return redis.from_url(
            self._redis_settings.url,
            max_connections=self._redis_settings.pool_size,
            socket_timeout=self._redis_settings.socket_timeout,
            socket_connect_timeout=self._redis_settings.socket_connect_timeout,
            decode_responses=False,
        )

    def _category_key(self, category: MODEL_REFERENCE_CATEGORY) -> str:
        """Generate Redis key for a category."""
        return f"{self._redis_settings.key_prefix}:category:{category.value}"

    def _legacy_metadata_key(self, category: MODEL_REFERENCE_CATEGORY) -> str:
        """Generate Redis key for legacy metadata."""
        return f"{self._redis_settings.key_prefix}:meta:legacy:{category.value}"

    def _v2_metadata_key(self, category: MODEL_REFERENCE_CATEGORY) -> str:
        """Generate Redis key for v2 metadata."""
        return f"{self._redis_settings.key_prefix}:meta:v2:{category.value}"

    def _all_cache_keys(self, category: MODEL_REFERENCE_CATEGORY) -> tuple[str, str, str]:
        """Return every Redis key this backend may hold for a category.

        Used by the invalidation paths so category data and both metadata
        entries are dropped together.
        """
        return (
            self._category_key(category),
            self._legacy_metadata_key(category),
            self._v2_metadata_key(category),
        )

    def _invalidation_channel(self) -> str:
        """Get the Redis pub/sub channel for invalidations."""
        return f"{self._redis_settings.key_prefix}:invalidate"

    @staticmethod
    def _parse_cached_json(cached: str | bool | int | bytes | None) -> Any:
        """Decode a JSON payload read from Redis.

        The sync client does not decode responses, so payloads normally arrive
        as bytes; str is accepted as well. ``json.loads`` handles both.

        Args:
            cached: The raw value returned by a Redis GET.

        Returns:
            The decoded JSON value.

        Raises:
            ValueError: If the value is neither bytes nor str, or is not valid JSON.

        """
        if not isinstance(cached, (bytes, str)):
            raise ValueError(f"Expected bytes or str from Redis, got {type(cached).__name__}")
        return json.loads(cached)

    def _setup_pubsub(self) -> None:
        """Set up pub/sub for cache invalidation events.

        Subscribes to the invalidation channel and starts a daemon thread that
        runs ``_listen_for_invalidations``. Failures are logged and leave
        pub/sub disabled rather than raising.
        """
        try:
            self._pubsub = self._sync_redis.pubsub(ignore_subscribe_messages=True)
            channel = self._invalidation_channel()
            self._pubsub.subscribe(channel)

            self._pubsub_running = True
            self._pubsub_thread = threading.Thread(
                target=self._listen_for_invalidations,
                daemon=True,
                name="RedisBackend-PubSub",
            )
            self._pubsub_thread.start()
            logger.info(f"Redis pub/sub listening on {channel}")
        except Exception as e:
            logger.warning(f"Failed to setup Redis pub/sub: {e}")
            self._pubsub_running = False

    def _listen_for_invalidations(self) -> None:
        """Listen for cache invalidation events from other workers.

        Each message carries a category value. For every message the Redis
        entries for that category are deleted, the file backend is marked
        stale and invalidation callbacks are notified. Errors while handling
        one message are logged and do not stop the listener.
        """
        logger.debug("Redis pub/sub listener started")
        try:
            messages = self._pubsub.listen()  # type: ignore[no-untyped-call]
            if not isinstance(messages, Iterable):
                raise ValueError("Expected iterable from pubsub.listen()")
            for message in messages:
                if not self._pubsub_running:
                    break

                if message["type"] == "message":
                    try:
                        data = message["data"]
                        category_str = data.decode("utf-8") if isinstance(data, bytes) else str(data)
                        category = MODEL_REFERENCE_CATEGORY(category_str)
                        logger.debug(f"Received invalidation for {category} from another worker")

                        try:
                            # Drop data and metadata entries together so a late
                            # repopulation by this worker cannot keep stale metadata.
                            self._sync_redis.delete(*self._all_cache_keys(category))
                            logger.debug(f"Invalidated local Redis cache for {category}")
                        except Exception as delete_error:
                            logger.warning(f"Failed to invalidate Redis cache for {category}: {delete_error}")

                        self._file_backend.mark_stale(category)
                        self._notify_invalidation(category)

                    except Exception as e:
                        logger.warning(f"Failed to process invalidation message: {e}")
        except Exception as e:
            logger.error(f"Redis pub/sub listener error: {e}")
        finally:
            logger.debug("Redis pub/sub listener stopped")

    def _retry_redis_operation(
        self,
        operation: Callable[..., str | bool | bytes | int | None],
        *args: str | int | float | None,
        **kwargs: str | int | float | None,
    ) -> str | bool | int | bytes | None:
        """Retry a Redis operation with full-jitter exponential backoff.

        Only ``redis.ConnectionError`` triggers a retry; the attempt limit and
        minimum backoff come from the Redis settings. The last error is
        re-raised once attempts are exhausted.

        Args:
            operation: The Redis client callable to invoke.
            *args: Positional arguments forwarded to ``operation``.
            **kwargs: Keyword arguments forwarded to ``operation``.

        Returns:
            Whatever ``operation`` returns.

        """

        @retry(
            stop=stop_after_attempt(self._redis_settings.retry_max_attempts),
            wait=wait_random_exponential(min=self._redis_settings.retry_backoff_seconds, max=30),
            retry=retry_if_exception_type(redis.ConnectionError),
            reraise=True,
        )
        def _execute() -> str | bool | bytes | int | None:
            return operation(*args, **kwargs)

        return _execute()

    @override
    def fetch_category(
        self,
        category: MODEL_REFERENCE_CATEGORY,
        *,
        force_refresh: bool = False,
    ) -> dict[str, Any] | None:
        """Fetch from Redis cache, fallback to file backend on miss.

        Any Redis failure is logged and treated as a cache miss; it never
        prevents the file backend from being consulted.

        Args:
            category: The category to fetch.
            force_refresh: If True, bypass Redis cache and fetch from files.

        Returns:
            dict[str, Any] | None: The model reference data.

        """
        key = self._category_key(category)
        data: dict[str, Any] | None = None

        if not force_refresh:
            try:
                cached = self._retry_redis_operation(self._sync_redis.get, key)
                if cached:
                    # The client returns bytes (decode_responses=False); the
                    # helper accepts bytes or str.
                    data = self._parse_cached_json(cached)
                    logger.debug(f"Redis cache hit for {category}")
                    return data
                logger.debug(f"Redis cache miss for {category}")
            except Exception as e:
                logger.warning(f"Redis fetch failed for {category}, falling back to file: {e}")

        data = self._file_backend.fetch_category(category, force_refresh=force_refresh)

        if data is not None:
            try:
                json_data = json.dumps(data)
                self._retry_redis_operation(self._sync_redis.setex, key, self._ttl, json_data)
                logger.debug(f"Populated Redis cache for {category}")
            except Exception as e:
                logger.warning(f"Failed to cache {category} in Redis: {e}")

        return data

    @override
    def fetch_all_categories(
        self,
        *,
        force_refresh: bool = False,
    ) -> dict[MODEL_REFERENCE_CATEGORY, dict[str, Any] | None]:
        """Fetch all categories, using Redis cache where available.

        Args:
            force_refresh: If True, bypass Redis cache for every category.

        Returns:
            Mapping of every category to its data (None when unavailable).

        """
        return {
            category: self.fetch_category(category, force_refresh=force_refresh)
            for category in MODEL_REFERENCE_CATEGORY
        }

    async def fetch_category_async(
        self,
        category: MODEL_REFERENCE_CATEGORY,
        *,
        httpx_client: httpx.AsyncClient | None = None,
        force_refresh: bool = False,
    ) -> dict[str, Any] | None:
        """Asynchronously fetch from Redis, fallback to file backend.

        A short-lived async Redis client is opened for the read and, on a
        miss, another one for the write-back. Redis failures are logged and
        treated as a cache miss.

        Args:
            category: The category to fetch.
            httpx_client: Optional shared HTTPX client for file backend.
            force_refresh: If True, bypass Redis cache.

        Returns:
            dict[str, Any] | None: The model reference data.

        """
        key = self._category_key(category)
        data: dict[str, Any] | None = None

        if not force_refresh:
            cached: bytes | None = None
            try:
                async with redis.asyncio.from_url(
                    self._redis_settings.url,
                    max_connections=self._redis_settings.pool_size,
                    socket_timeout=self._redis_settings.socket_timeout,
                    socket_connect_timeout=self._redis_settings.socket_connect_timeout,
                    decode_responses=False,
                ) as async_redis:
                    cached = await async_redis.get(key)

                if cached:
                    data = json.loads(cached)
                    logger.debug(f"Redis cache hit for {category} (async)")
                    return data
                logger.debug(f"Redis cache miss for {category} (async)")
            except Exception as e:
                logger.warning(f"Async Redis fetch failed for {category}, falling back to file: {e}")

        data = await self._file_backend.fetch_category_async(
            category,
            httpx_client=httpx_client,
            force_refresh=force_refresh,
        )

        if data is not None:
            try:
                async with redis.asyncio.from_url(
                    self._redis_settings.url,
                    max_connections=self._redis_settings.pool_size,
                    socket_timeout=self._redis_settings.socket_timeout,
                    socket_connect_timeout=self._redis_settings.socket_connect_timeout,
                    decode_responses=False,
                ) as async_redis:
                    json_data = json.dumps(data)
                    await async_redis.setex(key, self._ttl, json_data)
                logger.debug(f"Populated Redis cache for {category} (async)")
            except Exception as e:
                logger.warning(f"Failed to cache {category} in Redis (async): {e}")

        return data

    async def fetch_all_categories_async(
        self,
        *,
        httpx_client: httpx.AsyncClient | None = None,
        force_refresh: bool = False,
    ) -> dict[MODEL_REFERENCE_CATEGORY, dict[str, Any] | None]:
        """Asynchronously fetch all categories from Redis with file backend fallback.

        All categories are fetched concurrently via ``fetch_category_async``.

        Args:
            httpx_client: Optional shared HTTPX client passed to each fetch.
            force_refresh: If True, bypass Redis cache for every category.

        Returns:
            Mapping of every category to its data (None when unavailable).

        """
        tasks = [
            self.fetch_category_async(
                category,
                httpx_client=httpx_client,
                force_refresh=force_refresh,
            )
            for category in MODEL_REFERENCE_CATEGORY
        ]

        results = await asyncio.gather(*tasks)

        return dict(zip(MODEL_REFERENCE_CATEGORY, results, strict=False))

    @override
    def needs_refresh(self, category: MODEL_REFERENCE_CATEGORY) -> bool:
        """Check if category needs refresh (delegates to file backend)."""
        return self._file_backend.needs_refresh(category)

    @override
    def _mark_stale_impl(self, category: MODEL_REFERENCE_CATEGORY) -> None:
        """Mark category as stale and invalidate Redis cache.

        Publishes an invalidation event to notify other workers (when pub/sub
        is enabled), deletes the category's data and metadata entries from
        Redis, then marks the file backend stale. Redis failures are logged
        and do not prevent the file backend from being marked stale.
        """
        if self._redis_settings.use_pubsub:
            try:
                channel = self._invalidation_channel()
                self._retry_redis_operation(
                    self._sync_redis.publish,
                    channel,
                    category.value,
                )
                logger.debug(f"Published invalidation for {category}")
            except Exception as e:
                logger.warning(f"Failed to publish invalidation for {category}: {e}")

        try:
            # Metadata entries are cached under separate keys; remove them too so
            # they cannot outlive the data they describe until the TTL expires.
            self._retry_redis_operation(self._sync_redis.delete, *self._all_cache_keys(category))
            logger.debug(f"Invalidated Redis cache for {category}")
        except Exception as e:
            logger.warning(f"Failed to invalidate Redis cache for {category}: {e}")

        self._file_backend.mark_stale(category)

    @override
    def get_category_file_path(self, category: MODEL_REFERENCE_CATEGORY) -> Path | None:
        """Get file path (delegates to file backend)."""
        return self._file_backend.get_category_file_path(category)

    @override
    def get_all_category_file_paths(self) -> dict[MODEL_REFERENCE_CATEGORY, Path | None]:
        """Get all file paths (delegates to file backend)."""
        return self._file_backend.get_all_category_file_paths()

    @override
    def get_legacy_json(
        self,
        category: MODEL_REFERENCE_CATEGORY,
        redownload: bool = False,
    ) -> dict[str, Any] | None:
        """Get legacy JSON (delegates to file backend)."""
        return self._file_backend.get_legacy_json(category, redownload=redownload)

    @override
    def get_legacy_json_string(
        self,
        category: MODEL_REFERENCE_CATEGORY,
        redownload: bool = False,
    ) -> str | None:
        """Get legacy JSON string (delegates to file backend)."""
        return self._file_backend.get_legacy_json_string(category, redownload=redownload)

    @override
    def get_replicate_mode(self) -> ReplicateMode:
        """Get replication mode (always PRIMARY for Redis backend)."""
        return self._replicate_mode

    @override
    def supports_writes(self) -> bool:
        """Check if backend supports writes (delegates to file backend)."""
        return self._file_backend.supports_writes()

    @override
    def supports_metadata(self) -> bool:
        """Check if backend supports metadata tracking (delegates to file backend).

        Returns:
            bool: True if file backend supports metadata.

        """
        return self._file_backend.supports_metadata()

    @override
    def supports_cache_warming(self) -> bool:
        """Check if backend supports cache warming (True for Redis).

        Returns:
            bool: Always True.

        """
        return True

    @override
    def supports_health_checks(self) -> bool:
        """Check if backend supports health checks (True for Redis).

        Returns:
            bool: Always True.

        """
        return True

    @override
    def supports_statistics(self) -> bool:
        """Check if backend supports statistics (True for Redis).

        Returns:
            bool: Always True.

        """
        return True

    @override
    def update_model(
        self,
        category: MODEL_REFERENCE_CATEGORY,
        model_name: str,
        record_dict: dict[str, Any],
        *,
        logical_user_id: str | None = None,
        request_id: str | None = None,
    ) -> None:
        """Update model via file backend, then invalidate Redis cache.

        Args:
            category: Category the model belongs to.
            model_name: Name of the model to update.
            record_dict: The model record to store.
            logical_user_id: Optional identifier forwarded to the file backend.
            request_id: Optional identifier forwarded to the file backend.

        """
        self._file_backend.update_model(
            category,
            model_name,
            record_dict,
            logical_user_id=logical_user_id,
            request_id=request_id,
        )

        self.mark_stale(category)

    @override
    def delete_model(
        self,
        category: MODEL_REFERENCE_CATEGORY,
        model_name: str,
        *,
        logical_user_id: str | None = None,
        request_id: str | None = None,
    ) -> None:
        """Delete model via file backend, then invalidate Redis cache.

        Args:
            category: Category the model belongs to.
            model_name: Name of the model to delete.
            logical_user_id: Optional identifier forwarded to the file backend.
            request_id: Optional identifier forwarded to the file backend.

        """
        self._file_backend.delete_model(
            category,
            model_name,
            logical_user_id=logical_user_id,
            request_id=request_id,
        )

        self.mark_stale(category)

    @override
    def warm_cache(self) -> None:
        """Pre-populate Redis cache from files on startup.

        Each category is fetched with ``force_refresh=True``; a failure for
        one category is logged and does not stop the others.
        """
        logger.info("Warming Redis cache...")
        for category in MODEL_REFERENCE_CATEGORY:
            try:
                self.fetch_category(category, force_refresh=True)
            except Exception as e:
                logger.warning(f"Failed to warm cache for {category}: {e}")
        logger.info("Redis cache warming complete")

    @override
    async def warm_cache_async(self) -> None:
        """Asynchronously pre-populate Redis cache.

        All categories are fetched concurrently with a shared HTTPX client;
        per-category failures are logged rather than raised.
        """
        logger.info("Warming Redis cache (async)...")
        async with httpx.AsyncClient() as client:
            tasks = [
                self.fetch_category_async(category, httpx_client=client, force_refresh=True)
                for category in MODEL_REFERENCE_CATEGORY
            ]
            results = await asyncio.gather(*tasks, return_exceptions=True)

            for category, result in zip(MODEL_REFERENCE_CATEGORY, results, strict=False):
                if isinstance(result, Exception):
                    logger.warning(f"Failed to warm cache for {category}: {result}")

        logger.info("Redis cache warming complete (async)")

    @override
    def health_check(self) -> bool:
        """Check Redis connectivity.

        Returns:
            bool: True if a PING succeeds, False (after logging) otherwise.

        """
        try:
            self._sync_redis.ping()
            return True
        except Exception as e:
            logger.warning(f"Redis health check failed: {e}")
            return False

    @override
    def get_statistics(self) -> dict[str, Any]:
        """Get Redis cache statistics.

        Returns:
            dict containing:
                - connected: Whether Redis is connected
                - keys_count: Number of keys in Redis database
                - total_connections: Total connections received
                - total_commands: Total commands processed
                - memory_used_bytes: Memory used in bytes
                - memory_used_human: Human-readable memory usage

            On failure, only ``connected`` (False) and ``error`` are returned.

        """
        try:
            info = self._sync_redis.info("stats")
            memory = self._sync_redis.info("memory")

            return {
                "connected": True,
                "keys_count": self._sync_redis.dbsize(),
                "total_connections": info.get("total_connections_received", 0),
                "total_commands": info.get("total_commands_processed", 0),
                "memory_used_bytes": memory.get("used_memory", 0),
                "memory_used_human": memory.get("used_memory_human", "unknown"),
            }
        except Exception as e:
            logger.warning(f"Failed to get Redis stats: {e}")
            return {"connected": False, "error": str(e)}

    @override
    def get_legacy_metadata(self, category: MODEL_REFERENCE_CATEGORY) -> CategoryMetadata:
        """Get legacy format metadata, using Redis cache.

        Redis failures are logged and treated as a cache miss.

        Args:
            category: The category to get metadata for.

        Returns:
            CategoryMetadata | None: The legacy metadata, or None if not available.

        """
        key = self._legacy_metadata_key(category)

        try:
            cached = self._retry_redis_operation(self._sync_redis.get, key)
            if cached:
                # Payload arrives as bytes (decode_responses=False).
                data = self._parse_cached_json(cached)
                return CategoryMetadata(**data)
        except Exception as e:
            logger.warning(f"Redis fetch failed for legacy metadata {category}, falling back to file: {e}")

        # Fall back to file backend
        metadata = self._file_backend.get_legacy_metadata(category)

        # Cache the result in Redis
        if metadata is not None:
            try:
                json_data = json.dumps(metadata.model_dump(mode="json"))
                self._retry_redis_operation(self._sync_redis.setex, key, self._ttl, json_data)
            except Exception as e:
                logger.warning(f"Failed to cache legacy metadata for {category} in Redis: {e}")

        return metadata

    @override
    async def get_legacy_metadata_async(self, category: MODEL_REFERENCE_CATEGORY) -> CategoryMetadata:
        """Asynchronously get legacy format metadata, using Redis cache.

        Args:
            category: The category to get metadata for.

        Returns:
            CategoryMetadata | None: The legacy metadata, or None if not available.

        """
        # For now, delegate to sync version (runs the synchronous Redis call
        # inline in the calling coroutine).
        return self.get_legacy_metadata(category)

    @override
    def get_metadata(self, category: MODEL_REFERENCE_CATEGORY) -> CategoryMetadata:
        """Get v2 format metadata, using Redis cache.

        Redis failures are logged and treated as a cache miss.

        Args:
            category: The category to get metadata for.

        Returns:
            CategoryMetadata | None: The v2 metadata, or None if not available.

        """
        key = self._v2_metadata_key(category)

        try:
            cached = self._retry_redis_operation(self._sync_redis.get, key)
            if cached:
                # Payload arrives as bytes (decode_responses=False).
                data = self._parse_cached_json(cached)
                return CategoryMetadata(**data)
        except Exception as e:
            logger.warning(f"Redis fetch failed for v2 metadata {category}, falling back to file: {e}")

        # Fall back to file backend
        metadata = self._file_backend.get_metadata(category)

        # Cache the result in Redis
        if metadata is not None:
            try:
                json_data = json.dumps(metadata.model_dump(mode="json"))
                self._retry_redis_operation(self._sync_redis.setex, key, self._ttl, json_data)
            except Exception as e:
                logger.warning(f"Failed to cache v2 metadata for {category} in Redis: {e}")

        return metadata

    @override
    async def get_metadata_async(self, category: MODEL_REFERENCE_CATEGORY) -> CategoryMetadata:
        """Asynchronously get v2 format metadata, using Redis cache.

        Args:
            category: The category to get metadata for.

        Returns:
            CategoryMetadata | None: The v2 metadata, or None if not available.

        """
        # For now, delegate to sync version (runs the synchronous Redis call
        # inline in the calling coroutine).
        return self.get_metadata(category)

    @override
    def get_all_legacy_metadata(self) -> dict[MODEL_REFERENCE_CATEGORY, CategoryMetadata]:
        """Get legacy format metadata for all categories.

        Categories whose metadata is None are omitted.

        Returns:
            dict[MODEL_REFERENCE_CATEGORY, CategoryMetadata]: Mapping of categories to their legacy metadata.

        """
        result: dict[MODEL_REFERENCE_CATEGORY, CategoryMetadata] = {}
        for category in MODEL_REFERENCE_CATEGORY:
            metadata = self.get_legacy_metadata(category)
            if metadata is not None:
                result[category] = metadata
        return result

    @override
    async def get_all_legacy_metadata_async(self) -> dict[MODEL_REFERENCE_CATEGORY, CategoryMetadata]:
        """Asynchronously get legacy format metadata for all categories.

        Delegates to the synchronous implementation.

        Returns:
            dict[MODEL_REFERENCE_CATEGORY, CategoryMetadata]: Mapping of categories to their legacy metadata.

        """
        return self.get_all_legacy_metadata()

    @override
    def get_all_metadata(self) -> dict[MODEL_REFERENCE_CATEGORY, CategoryMetadata]:
        """Get v2 format metadata for all categories.

        Categories whose metadata is None are omitted.

        Returns:
            dict[MODEL_REFERENCE_CATEGORY, CategoryMetadata]: Mapping of categories to their v2 metadata.

        """
        result: dict[MODEL_REFERENCE_CATEGORY, CategoryMetadata] = {}
        for category in MODEL_REFERENCE_CATEGORY:
            metadata = self.get_metadata(category)
            if metadata is not None:
                result[category] = metadata
        return result

    @override
    async def get_all_metadata_async(self) -> dict[MODEL_REFERENCE_CATEGORY, CategoryMetadata]:
        """Asynchronously get v2 format metadata for all categories.

        Delegates to the synchronous implementation.

        Returns:
            dict[MODEL_REFERENCE_CATEGORY, CategoryMetadata]: Mapping of categories to their v2 metadata.

        """
        return self.get_all_metadata()

    def __del__(self) -> None:
        """Clean up Redis connections on deletion.

        Attributes are checked with ``hasattr`` because ``__init__`` may have
        raised before they were assigned; close errors are suppressed.
        """
        if hasattr(self, "_pubsub_running"):
            self._pubsub_running = False

        if hasattr(self, "_pubsub"):
            with contextlib.suppress(Exception):
                self._pubsub.close()

        if hasattr(self, "_sync_redis"):
            with contextlib.suppress(Exception):
                self._sync_redis.close()

_pubsub instance-attribute

_pubsub: PubSub

_file_backend instance-attribute

_file_backend: FileSystemBackend = file_backend

_redis_settings instance-attribute

_redis_settings: RedisSettings = redis_settings

_ttl instance-attribute

_ttl: int = ttl_seconds or cache_ttl_seconds or 60

_lock instance-attribute

_lock: RLock = RLock()

_sync_redis instance-attribute

_sync_redis: Redis[bytes] = _create_sync_pool()

_pubsub_running instance-attribute

_pubsub_running: bool = False

_pubsub_thread instance-attribute

_pubsub_thread: Thread | None = None

_replicate_mode class-attribute instance-attribute

_replicate_mode = mode

_invalidation_callbacks instance-attribute

_invalidation_callbacks: list[
    Callable[[MODEL_REFERENCE_CATEGORY], None]
] = []

replicate_mode property

replicate_mode: ReplicateMode

Get the replicate mode of this backend.

__init__

__init__(
    *,
    file_backend: FileSystemBackend,
    redis_settings: RedisSettings,
    cache_ttl_seconds: int | None = None,
) -> None

Initialize Redis backend with filesystem backend.

Parameters:

  • file_backend (FileSystemBackend) –

    Filesystem backend to wrap.

  • redis_settings (RedisSettings) –

    Redis connection settings.

  • cache_ttl_seconds (int | None, default: None ) –

    TTL for cache entries. If None, uses redis_settings.ttl_seconds.

Raises:

  • ValueError

    If file_backend is not in PRIMARY mode.

Source code in src/horde_model_reference/backends/redis_backend.py
def __init__(
    self,
    *,
    file_backend: FileSystemBackend,
    redis_settings: RedisSettings,
    cache_ttl_seconds: int | None = None,
) -> None:
    """Initialize Redis backend with filesystem backend.

    Args:
        file_backend: Filesystem backend to wrap.
        redis_settings: Redis connection settings.
        cache_ttl_seconds: TTL for cache entries.
            If None (or 0), uses redis_settings.ttl_seconds, and finally
            a default of 60 seconds.

    Raises:
        ValueError: If file_backend is not in PRIMARY mode.

    """
    if file_backend.replicate_mode != ReplicateMode.PRIMARY:
        raise ValueError(
            "RedisBackend can only be used with a FileSystemBackend in PRIMARY mode. "
            "For REPLICA mode, use GitHubBackend or HTTPBackend."
        )

    super().__init__(mode=ReplicateMode.PRIMARY)

    self._file_backend = file_backend
    self._redis_settings = redis_settings
    # The explicit argument wins over the settings value; falsy values
    # (None / 0) fall through so the stored TTL is always positive.
    self._ttl = cache_ttl_seconds or redis_settings.ttl_seconds or 60

    self._lock = RLock()

    try:
        self._sync_redis = self._create_sync_pool()
        logger.info(f"Redis connection established: {redis_settings.url}")
    except Exception as e:
        logger.error(f"Failed to connect to Redis: {e}")
        raise

    # Pub/sub state starts disabled; _setup_pubsub flips it on when enabled.
    self._pubsub_running = False
    self._pubsub_thread = None
    if redis_settings.use_pubsub:
        self._setup_pubsub()

_create_sync_pool

_create_sync_pool() -> redis.Redis[bytes]

Create synchronous Redis connection pool.

Source code in src/horde_model_reference/backends/redis_backend.py
def _create_sync_pool(self) -> redis.Redis[bytes]:
    """Build the synchronous Redis client from ``self._redis_settings``.

    Responses are left undecoded (``decode_responses=False``).
    """
    settings = self._redis_settings
    client = redis.from_url(
        settings.url,
        max_connections=settings.pool_size,
        socket_timeout=settings.socket_timeout,
        socket_connect_timeout=settings.socket_connect_timeout,
        decode_responses=False,
    )
    return client

_category_key

_category_key(category: MODEL_REFERENCE_CATEGORY) -> str

Generate Redis key for a category.

Source code in src/horde_model_reference/backends/redis_backend.py
def _category_key(self, category: MODEL_REFERENCE_CATEGORY) -> str:
    """Return the Redis key under which a category's data is cached."""
    prefix = self._redis_settings.key_prefix
    return f"{prefix}:category:{category.value}"

_legacy_metadata_key

_legacy_metadata_key(
    category: MODEL_REFERENCE_CATEGORY,
) -> str

Generate Redis key for legacy metadata.

Source code in src/horde_model_reference/backends/redis_backend.py
def _legacy_metadata_key(self, category: MODEL_REFERENCE_CATEGORY) -> str:
    """Return the Redis key under which a category's legacy metadata is cached."""
    prefix = self._redis_settings.key_prefix
    return f"{prefix}:meta:legacy:{category.value}"

_v2_metadata_key

_v2_metadata_key(category: MODEL_REFERENCE_CATEGORY) -> str

Generate Redis key for v2 metadata.

Source code in src/horde_model_reference/backends/redis_backend.py
def _v2_metadata_key(self, category: MODEL_REFERENCE_CATEGORY) -> str:
    """Return the Redis key under which a category's v2 metadata is cached."""
    prefix = self._redis_settings.key_prefix
    return f"{prefix}:meta:v2:{category.value}"

_invalidation_channel

_invalidation_channel() -> str

Get the Redis pub/sub channel for invalidations.

Source code in src/horde_model_reference/backends/redis_backend.py
def _invalidation_channel(self) -> str:
    """Return the name of the pub/sub channel used for invalidation events."""
    prefix = self._redis_settings.key_prefix
    return f"{prefix}:invalidate"

_setup_pubsub

_setup_pubsub() -> None

Set up pub/sub for cache invalidation events.

Source code in src/horde_model_reference/backends/redis_backend.py
def _setup_pubsub(self) -> None:
    """Subscribe to the invalidation channel and start the listener thread.

    Any failure is logged as a warning and leaves ``_pubsub_running`` set to
    False; it is not propagated to the caller.
    """
    try:
        subscriber = self._sync_redis.pubsub(ignore_subscribe_messages=True)
        self._pubsub = subscriber
        channel = self._invalidation_channel()
        subscriber.subscribe(channel)

        self._pubsub_running = True
        # Daemon thread, so it does not keep the interpreter alive at exit.
        listener = threading.Thread(
            name="RedisBackend-PubSub",
            target=self._listen_for_invalidations,
            daemon=True,
        )
        self._pubsub_thread = listener
        listener.start()
        logger.info(f"Redis pub/sub listening on {channel}")
    except Exception as e:
        logger.warning(f"Failed to setup Redis pub/sub: {e}")
        self._pubsub_running = False

_listen_for_invalidations

_listen_for_invalidations() -> None

Listen for cache invalidation events from other workers.

Source code in src/horde_model_reference/backends/redis_backend.py
def _listen_for_invalidations(self) -> None:
    """Consume invalidation events from the pub/sub subscription.

    For every ``message`` event the payload is parsed as a category, the
    category's Redis key is deleted, the file backend is marked stale and the
    registered invalidation callbacks are notified. A failure while handling
    one message is logged and does not stop the loop. The loop exits once
    ``_pubsub_running`` is found to be False when a message arrives.
    """
    logger.debug("Redis pub/sub listener started")
    try:
        stream = self._pubsub.listen()  # type: ignore[no-untyped-call]
        if not isinstance(stream, Iterable):
            raise ValueError("Expected iterable from pubsub.listen()")
        for event in stream:
            # The flag is only examined when an event arrives.
            if not self._pubsub_running:
                break
            if event["type"] != "message":
                continue

            try:
                payload = event["data"]
                if isinstance(payload, bytes):
                    category_name = payload.decode("utf-8")
                else:
                    category_name = str(payload)
                category = MODEL_REFERENCE_CATEGORY(category_name)
                logger.debug(f"Received invalidation for {category} from another worker")

                cache_key = self._category_key(category)
                try:
                    self._sync_redis.delete(cache_key)
                    logger.debug(f"Invalidated local Redis cache for {category}")
                except Exception as delete_error:
                    # A failed delete must not prevent the stale-marking below.
                    logger.warning(f"Failed to invalidate Redis cache for {category}: {delete_error}")

                self._file_backend.mark_stale(category)
                self._notify_invalidation(category)
            except Exception as e:
                logger.warning(f"Failed to process invalidation message: {e}")
    except Exception as e:
        logger.error(f"Redis pub/sub listener error: {e}")
    finally:
        logger.debug("Redis pub/sub listener stopped")

_retry_redis_operation

_retry_redis_operation(
    operation: Callable[
        ..., str | bool | bytes | int | None
    ],
    *args: str | int | float | None,
    **kwargs: str | int | float | None,
) -> str | bool | int | bytes | None

Retry a Redis operation with full-jitter exponential backoff.

Source code in src/horde_model_reference/backends/redis_backend.py
def _retry_redis_operation(
    self,
    operation: Callable[..., str | bool | bytes | int | None],
    *args: str | int | float | None,
    **kwargs: str | int | float | None,
) -> str | bool | int | bytes | None:
    """Run ``operation(*args, **kwargs)``, retrying on ``redis.ConnectionError``.

    Retries use random exponential backoff (capped at 30 seconds) and stop
    after ``retry_max_attempts`` attempts; the last error is re-raised.
    Exceptions of any other type are not retried.
    """
    settings = self._redis_settings
    with_retries = retry(
        stop=stop_after_attempt(settings.retry_max_attempts),
        wait=wait_random_exponential(min=settings.retry_backoff_seconds, max=30),
        retry=retry_if_exception_type(redis.ConnectionError),
        reraise=True,
    )

    def _invoke() -> str | bool | bytes | int | None:
        return operation(*args, **kwargs)

    return with_retries(_invoke)()

fetch_category

fetch_category(
    category: MODEL_REFERENCE_CATEGORY,
    *,
    force_refresh: bool = False,
) -> dict[str, Any] | None

Fetch from Redis cache, fallback to file backend on miss.

Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    The category to fetch.

  • force_refresh (bool, default: False ) –

    If True, bypass Redis cache and fetch from files.

Returns:

  • dict[str, Any] | None

    dict[str, Any] | None: The model reference data.

Source code in src/horde_model_reference/backends/redis_backend.py
@override
def fetch_category(
    self,
    category: MODEL_REFERENCE_CATEGORY,
    *,
    force_refresh: bool = False,
) -> dict[str, Any] | None:
    """Fetch from Redis cache, fallback to file backend on miss.

    Args:
        category: The category to fetch.
        force_refresh: If True, bypass Redis cache and fetch from files.

    Returns:
        dict[str, Any] | None: The model reference data.

    """
    key = self._category_key(category)
    data: dict[str, Any] | None = None

    if not force_refresh:
        try:
            cached = self._retry_redis_operation(self._sync_redis.get, key)
            if cached:
                if not isinstance(cached, str):
                    raise ValueError("Expected str from Redis")

                data = json.loads(cached)
                logger.debug(f"Redis cache hit for {category}")
                return data
            logger.debug(f"Redis cache miss for {category}")
        except Exception as e:
            logger.warning(f"Redis fetch failed for {category}, falling back to file: {e}")

    data = self._file_backend.fetch_category(category, force_refresh=force_refresh)

    if data is not None:
        try:
            json_data = json.dumps(data)
            self._retry_redis_operation(self._sync_redis.setex, key, self._ttl, json_data)
            logger.debug(f"Populated Redis cache for {category}")
        except Exception as e:
            logger.warning(f"Failed to cache {category} in Redis: {e}")

    return data

fetch_all_categories

fetch_all_categories(
    *, force_refresh: bool = False
) -> dict[MODEL_REFERENCE_CATEGORY, dict[str, Any] | None]

Fetch all categories, using Redis cache where available.

Source code in src/horde_model_reference/backends/redis_backend.py
@override
def fetch_all_categories(
    self,
    *,
    force_refresh: bool = False,
) -> dict[MODEL_REFERENCE_CATEGORY, dict[str, Any] | None]:
    """Fetch every category via ``fetch_category`` and return them keyed by category."""
    return {
        category: self.fetch_category(category, force_refresh=force_refresh)
        for category in MODEL_REFERENCE_CATEGORY
    }

fetch_category_async async

fetch_category_async(
    category: MODEL_REFERENCE_CATEGORY,
    *,
    httpx_client: AsyncClient | None = None,
    force_refresh: bool = False,
) -> dict[str, Any] | None

Asynchronously fetch from Redis, fallback to file backend.

Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    The category to fetch.

  • httpx_client (AsyncClient | None, default: None ) –

    Optional shared HTTPX client for file backend.

  • force_refresh (bool, default: False ) –

    If True, bypass Redis cache.

Returns:

  • dict[str, Any] | None

    dict[str, Any] | None: The model reference data.

Source code in src/horde_model_reference/backends/redis_backend.py
async def fetch_category_async(
    self,
    category: MODEL_REFERENCE_CATEGORY,
    *,
    httpx_client: httpx.AsyncClient | None = None,
    force_refresh: bool = False,
) -> dict[str, Any] | None:
    """Asynchronously fetch a category from Redis, falling back to the file backend.

    A short-lived async Redis client is opened for the read and again for the
    write-back. Redis errors are logged and never propagated.

    Args:
        category: The category to fetch.
        httpx_client: Optional shared HTTPX client, forwarded to the file backend.
        force_refresh: If True, skip the Redis read and go straight to the file backend.

    Returns:
        dict[str, Any] | None: The model reference data.

    """

    def _open_async_client():
        # Same connection options for the read and the write-back.
        settings = self._redis_settings
        return redis.asyncio.from_url(
            settings.url,
            max_connections=settings.pool_size,
            socket_timeout=settings.socket_timeout,
            socket_connect_timeout=settings.socket_connect_timeout,
            decode_responses=False,
        )

    cache_key = self._category_key(category)

    if not force_refresh:
        try:
            async with _open_async_client() as async_redis:
                cached: bytes | None = await async_redis.get(cache_key)

            if cached:
                hit = json.loads(cached)
                logger.debug(f"Redis cache hit for {category} (async)")
                return hit
            logger.debug(f"Redis cache miss for {category} (async)")
        except Exception as e:
            logger.warning(f"Async Redis fetch failed for {category}, falling back to file: {e}")

    data = await self._file_backend.fetch_category_async(
        category,
        httpx_client=httpx_client,
        force_refresh=force_refresh,
    )
    if data is None:
        return None

    # Best-effort write-back of the freshly loaded data.
    try:
        async with _open_async_client() as async_redis:
            json_data = json.dumps(data)
            await async_redis.setex(cache_key, self._ttl, json_data)
        logger.debug(f"Populated Redis cache for {category} (async)")
    except Exception as e:
        logger.warning(f"Failed to cache {category} in Redis (async): {e}")

    return data

fetch_all_categories_async async

fetch_all_categories_async(
    *,
    httpx_client: AsyncClient | None = None,
    force_refresh: bool = False,
) -> dict[MODEL_REFERENCE_CATEGORY, dict[str, Any] | None]

Asynchronously fetch all categories, using the Redis cache where available and falling back to the file backend on a miss.

Source code in src/horde_model_reference/backends/redis_backend.py
async def fetch_all_categories_async(
    self,
    *,
    httpx_client: httpx.AsyncClient | None = None,
    force_refresh: bool = False,
) -> dict[MODEL_REFERENCE_CATEGORY, dict[str, Any] | None]:
    """Fetch every category concurrently via ``fetch_category_async``.

    Args:
        httpx_client: Optional shared HTTPX client, forwarded to each fetch.
        force_refresh: Forwarded to each fetch.

    Returns:
        Mapping of each category to its fetched data (or None).

    """
    categories = list(MODEL_REFERENCE_CATEGORY)
    pending = (
        self.fetch_category_async(
            category,
            httpx_client=httpx_client,
            force_refresh=force_refresh,
        )
        for category in categories
    )
    fetched = await asyncio.gather(*pending)

    return dict(zip(categories, fetched, strict=False))

needs_refresh

needs_refresh(category: MODEL_REFERENCE_CATEGORY) -> bool

Check if category needs refresh (delegates to file backend).

Source code in src/horde_model_reference/backends/redis_backend.py
@override
def needs_refresh(self, category: MODEL_REFERENCE_CATEGORY) -> bool:
    """Check if category needs refresh (delegates to file backend)."""
    return self._file_backend.needs_refresh(category)

_mark_stale_impl

_mark_stale_impl(
    category: MODEL_REFERENCE_CATEGORY,
) -> None

Mark category as stale and invalidate Redis cache.

Also publishes invalidation event to notify other workers.

Source code in src/horde_model_reference/backends/redis_backend.py
@override
def _mark_stale_impl(self, category: MODEL_REFERENCE_CATEGORY) -> None:
    """Mark category as stale and invalidate Redis cache.

    Also publishes invalidation event to notify other workers.
    """
    key = self._category_key(category)

    if self._redis_settings.use_pubsub:
        try:
            channel = self._invalidation_channel()
            self._retry_redis_operation(
                self._sync_redis.publish,
                channel,
                category.value,
            )
            logger.debug(f"Published invalidation for {category}")
        except Exception as e:
            logger.warning(f"Failed to publish invalidation for {category}: {e}")

    try:
        self._retry_redis_operation(self._sync_redis.delete, key)
        logger.debug(f"Invalidated Redis cache for {category}")
    except Exception as e:
        logger.warning(f"Failed to invalidate Redis cache for {category}: {e}")

    self._file_backend.mark_stale(category)

get_category_file_path

get_category_file_path(
    category: MODEL_REFERENCE_CATEGORY,
) -> Path | None

Get file path (delegates to file backend).

Source code in src/horde_model_reference/backends/redis_backend.py
@override
def get_category_file_path(self, category: MODEL_REFERENCE_CATEGORY) -> Path | None:
    """Get file path (delegates to file backend)."""
    return self._file_backend.get_category_file_path(category)

get_all_category_file_paths

get_all_category_file_paths() -> dict[
    MODEL_REFERENCE_CATEGORY, Path | None
]

Get all file paths (delegates to file backend).

Source code in src/horde_model_reference/backends/redis_backend.py
@override
def get_all_category_file_paths(self) -> dict[MODEL_REFERENCE_CATEGORY, Path | None]:
    """Get all file paths (delegates to file backend)."""
    return self._file_backend.get_all_category_file_paths()

get_legacy_json

get_legacy_json(
    category: MODEL_REFERENCE_CATEGORY,
    redownload: bool = False,
) -> dict[str, Any] | None

Get legacy JSON (delegates to file backend).

Source code in src/horde_model_reference/backends/redis_backend.py
@override
def get_legacy_json(
    self,
    category: MODEL_REFERENCE_CATEGORY,
    redownload: bool = False,
) -> dict[str, Any] | None:
    """Get legacy JSON (delegates to file backend)."""
    return self._file_backend.get_legacy_json(category, redownload=redownload)

get_legacy_json_string

get_legacy_json_string(
    category: MODEL_REFERENCE_CATEGORY,
    redownload: bool = False,
) -> str | None

Get legacy JSON string (delegates to file backend).

Source code in src/horde_model_reference/backends/redis_backend.py
@override
def get_legacy_json_string(
    self,
    category: MODEL_REFERENCE_CATEGORY,
    redownload: bool = False,
) -> str | None:
    """Get legacy JSON string (delegates to file backend)."""
    return self._file_backend.get_legacy_json_string(category, redownload=redownload)

get_replicate_mode

get_replicate_mode() -> ReplicateMode

Get replication mode (always PRIMARY for Redis backend).

Source code in src/horde_model_reference/backends/redis_backend.py
@override
def get_replicate_mode(self) -> ReplicateMode:
    """Get replication mode (always PRIMARY for Redis backend)."""
    return self._replicate_mode

supports_writes

supports_writes() -> bool

Check if backend supports writes (delegates to file backend).

Source code in src/horde_model_reference/backends/redis_backend.py
@override
def supports_writes(self) -> bool:
    """Check if backend supports writes (delegates to file backend)."""
    return self._file_backend.supports_writes()

supports_metadata

supports_metadata() -> bool

Check if backend supports metadata tracking (delegates to file backend).

Returns:

  • bool ( bool ) –

    True if file backend supports metadata.

Source code in src/horde_model_reference/backends/redis_backend.py
@override
def supports_metadata(self) -> bool:
    """Check if backend supports metadata tracking (delegates to file backend).

    Returns:
        bool: True if file backend supports metadata.

    """
    return self._file_backend.supports_metadata()

supports_cache_warming

supports_cache_warming() -> bool

Check if backend supports cache warming (True for Redis).

Returns:

  • bool ( bool ) –

    Always True.

Source code in src/horde_model_reference/backends/redis_backend.py
@override
def supports_cache_warming(self) -> bool:
    """Check if backend supports cache warming (True for Redis).

    Returns:
        bool: Always True.

    """
    return True

supports_health_checks

supports_health_checks() -> bool

Check if backend supports health checks (True for Redis).

Returns:

  • bool ( bool ) –

    Always True.

Source code in src/horde_model_reference/backends/redis_backend.py
@override
def supports_health_checks(self) -> bool:
    """Check if backend supports health checks (True for Redis).

    Returns:
        bool: Always True.

    """
    return True

supports_statistics

supports_statistics() -> bool

Check if backend supports statistics (True for Redis).

Returns:

  • bool ( bool ) –

    Always True.

Source code in src/horde_model_reference/backends/redis_backend.py
@override
def supports_statistics(self) -> bool:
    """Check if backend supports statistics (True for Redis).

    Returns:
        bool: Always True.

    """
    return True

update_model

update_model(
    category: MODEL_REFERENCE_CATEGORY,
    model_name: str,
    record_dict: dict[str, Any],
    *,
    logical_user_id: str | None = None,
    request_id: str | None = None,
) -> None

Update model via file backend, then invalidate Redis cache.

Source code in src/horde_model_reference/backends/redis_backend.py
@override
def update_model(
    self,
    category: MODEL_REFERENCE_CATEGORY,
    model_name: str,
    record_dict: dict[str, Any],
    *,
    logical_user_id: str | None = None,
    request_id: str | None = None,
) -> None:
    """Update model via file backend, then invalidate Redis cache."""
    self._file_backend.update_model(
        category,
        model_name,
        record_dict,
        logical_user_id=logical_user_id,
        request_id=request_id,
    )

    self.mark_stale(category)

delete_model

delete_model(
    category: MODEL_REFERENCE_CATEGORY,
    model_name: str,
    *,
    logical_user_id: str | None = None,
    request_id: str | None = None,
) -> None

Delete model via file backend, then invalidate Redis cache.

Source code in src/horde_model_reference/backends/redis_backend.py
@override
def delete_model(
    self,
    category: MODEL_REFERENCE_CATEGORY,
    model_name: str,
    *,
    logical_user_id: str | None = None,
    request_id: str | None = None,
) -> None:
    """Delete model via file backend, then invalidate Redis cache."""
    self._file_backend.delete_model(
        category,
        model_name,
        logical_user_id=logical_user_id,
        request_id=request_id,
    )

    self.mark_stale(category)

warm_cache

warm_cache() -> None

Pre-populate Redis cache from files on startup.

Source code in src/horde_model_reference/backends/redis_backend.py
@override
def warm_cache(self) -> None:
    """Populate Redis for every category by force-refreshing from the file backend.

    A failure for one category is logged and does not stop the others.
    """
    logger.info("Warming Redis cache...")
    for category in MODEL_REFERENCE_CATEGORY:
        try:
            self.fetch_category(category, force_refresh=True)
        except Exception as e:
            logger.warning(f"Failed to warm cache for {category}: {e}")
            continue
    logger.info("Redis cache warming complete")

warm_cache_async async

warm_cache_async() -> None

Asynchronously pre-populate Redis cache.

Source code in src/horde_model_reference/backends/redis_backend.py
@override
async def warm_cache_async(self) -> None:
    """Populate Redis for every category concurrently.

    All categories are force-refreshed through ``fetch_category_async`` using
    one shared HTTPX client. Exceptions from individual fetches are collected
    and logged as warnings rather than raised.
    """
    logger.info("Warming Redis cache (async)...")
    async with httpx.AsyncClient() as client:
        categories = list(MODEL_REFERENCE_CATEGORY)
        pending = (
            self.fetch_category_async(category, httpx_client=client, force_refresh=True)
            for category in categories
        )
        outcomes = await asyncio.gather(*pending, return_exceptions=True)

        for category, result in zip(categories, outcomes, strict=False):
            if not isinstance(result, Exception):
                continue
            logger.warning(f"Failed to warm cache for {category}: {result}")

    logger.info("Redis cache warming complete (async)")

health_check

health_check() -> bool

Check Redis connectivity.

Source code in src/horde_model_reference/backends/redis_backend.py
@override
def health_check(self) -> bool:
    """Check Redis connectivity."""
    try:
        self._sync_redis.ping()
        return True
    except Exception as e:
        logger.warning(f"Redis health check failed: {e}")
        return False

get_statistics

get_statistics() -> dict[str, Any]

Get Redis cache statistics.

Returns:

  • dict[str, Any]

    A dict with the following keys: connected (whether Redis is connected); keys_count (number of keys in the Redis database); total_connections (total connections received); total_commands (total commands processed); memory_used_bytes (memory used, in bytes); memory_used_human (human-readable memory usage). If Redis cannot be queried, the dict instead contains connected set to False and an error message.

Source code in src/horde_model_reference/backends/redis_backend.py
@override
def get_statistics(self) -> dict[str, Any]:
    """Get Redis cache statistics.

    Returns:
        dict containing:
            - connected: Whether Redis is connected
            - keys_count: Number of keys in Redis database
            - total_connections: Total connections received
            - total_commands: Total commands processed
            - memory_used_bytes: Memory used in bytes
            - memory_used_human: Human-readable memory usage

    """
    try:
        info = self._sync_redis.info("stats")
        memory = self._sync_redis.info("memory")

        return {
            "connected": True,
            "keys_count": self._sync_redis.dbsize(),
            "total_connections": info.get("total_connections_received", 0),
            "total_commands": info.get("total_commands_processed", 0),
            "memory_used_bytes": memory.get("used_memory", 0),
            "memory_used_human": memory.get("used_memory_human", "unknown"),
        }
    except Exception as e:
        logger.warning(f"Failed to get Redis stats: {e}")
        return {"connected": False, "error": str(e)}

get_legacy_metadata

get_legacy_metadata(
    category: MODEL_REFERENCE_CATEGORY,
) -> CategoryMetadata

Get legacy format metadata, using Redis cache.

Parameters:

Returns:

  • CategoryMetadata

    CategoryMetadata | None: The legacy metadata, or None if not available.

Source code in src/horde_model_reference/backends/redis_backend.py
@override
def get_legacy_metadata(self, category: MODEL_REFERENCE_CATEGORY) -> CategoryMetadata:
    """Get legacy format metadata, using Redis cache.

    Args:
        category: The category to get metadata for.

    Returns:
        CategoryMetadata | None: The legacy metadata, or None if not available.

    """
    key = self._legacy_metadata_key(category)

    try:
        cached = self._retry_redis_operation(self._sync_redis.get, key)
        if cached:
            if not isinstance(cached, str):
                raise ValueError("Expected str from Redis")
            data = json.loads(cached)
            return CategoryMetadata(**data)
    except Exception as e:
        logger.warning(f"Redis fetch failed for legacy metadata {category}, falling back to file: {e}")

    # Fall back to file backend
    metadata = self._file_backend.get_legacy_metadata(category)

    # Cache the result in Redis
    if metadata is not None:
        try:
            json_data = json.dumps(metadata.model_dump(mode="json"))
            self._retry_redis_operation(self._sync_redis.setex, key, self._ttl, json_data)
        except Exception as e:
            logger.warning(f"Failed to cache legacy metadata for {category} in Redis: {e}")

    return metadata

get_legacy_metadata_async async

get_legacy_metadata_async(
    category: MODEL_REFERENCE_CATEGORY,
) -> CategoryMetadata

Asynchronously get legacy format metadata, using Redis cache.

Parameters:

Returns:

  • CategoryMetadata

    CategoryMetadata | None: The legacy metadata, or None if not available.

Source code in src/horde_model_reference/backends/redis_backend.py
@override
async def get_legacy_metadata_async(self, category: MODEL_REFERENCE_CATEGORY) -> CategoryMetadata:
    """Asynchronously get legacy format metadata, using Redis cache.

    Args:
        category: The category to get metadata for.

    Returns:
        CategoryMetadata | None: The legacy metadata, or None if not available.

    """
    # For now, delegate to sync version
    return self.get_legacy_metadata(category)

get_metadata

get_metadata(
    category: MODEL_REFERENCE_CATEGORY,
) -> CategoryMetadata

Get v2 format metadata, using Redis cache.

Parameters:

Returns:

  • CategoryMetadata

    CategoryMetadata | None: The v2 metadata, or None if not available.

Source code in src/horde_model_reference/backends/redis_backend.py
@override
def get_metadata(self, category: MODEL_REFERENCE_CATEGORY) -> CategoryMetadata:
    """Get v2 format metadata, using Redis cache.

    Args:
        category: The category to get metadata for.

    Returns:
        CategoryMetadata | None: The v2 metadata, or None if not available.

    """
    key = self._v2_metadata_key(category)

    try:
        cached = self._retry_redis_operation(self._sync_redis.get, key)
        if cached:
            if not isinstance(cached, str):
                raise ValueError("Expected str from Redis")
            data = json.loads(cached)
            return CategoryMetadata(**data)
    except Exception as e:
        logger.warning(f"Redis fetch failed for v2 metadata {category}, falling back to file: {e}")

    # Fall back to file backend
    metadata = self._file_backend.get_metadata(category)

    # Cache the result in Redis
    if metadata is not None:
        try:
            json_data = json.dumps(metadata.model_dump(mode="json"))
            self._retry_redis_operation(self._sync_redis.setex, key, self._ttl, json_data)
        except Exception as e:
            logger.warning(f"Failed to cache v2 metadata for {category} in Redis: {e}")

    return metadata

get_metadata_async async

get_metadata_async(
    category: MODEL_REFERENCE_CATEGORY,
) -> CategoryMetadata

Asynchronously get v2 format metadata, using Redis cache.

Parameters:

Returns:

  • CategoryMetadata

    CategoryMetadata | None: The v2 metadata, or None if not available.

Source code in src/horde_model_reference/backends/redis_backend.py
@override
async def get_metadata_async(self, category: MODEL_REFERENCE_CATEGORY) -> CategoryMetadata:
    """Asynchronously get v2 format metadata, using Redis cache.

    Args:
        category: The category to get metadata for.

    Returns:
        CategoryMetadata | None: The v2 metadata, or None if not available.

    """
    # For now, delegate to sync version
    return self.get_metadata(category)

get_all_legacy_metadata

get_all_legacy_metadata() -> dict[
    MODEL_REFERENCE_CATEGORY, CategoryMetadata
]

Get legacy format metadata for all categories.

Returns:

Source code in src/horde_model_reference/backends/redis_backend.py
@override
def get_all_legacy_metadata(self) -> dict[MODEL_REFERENCE_CATEGORY, CategoryMetadata]:
    """Collect legacy format metadata for every category.

    Returns:
        dict[MODEL_REFERENCE_CATEGORY, CategoryMetadata]: Categories mapped to
        their legacy metadata; categories whose lookup yields None are omitted.

    """
    return {
        category: metadata
        for category in MODEL_REFERENCE_CATEGORY
        if (metadata := self.get_legacy_metadata(category)) is not None
    }

get_all_legacy_metadata_async async

get_all_legacy_metadata_async() -> dict[
    MODEL_REFERENCE_CATEGORY, CategoryMetadata
]

Asynchronously get legacy format metadata for all categories.

Returns:

Source code in src/horde_model_reference/backends/redis_backend.py
@override
async def get_all_legacy_metadata_async(self) -> dict[MODEL_REFERENCE_CATEGORY, CategoryMetadata]:
    """Asynchronously get legacy format metadata for all categories.

    Returns:
        dict[MODEL_REFERENCE_CATEGORY, CategoryMetadata]: Mapping of categories to their legacy metadata.

    """
    return self.get_all_legacy_metadata()

get_all_metadata

get_all_metadata() -> dict[
    MODEL_REFERENCE_CATEGORY, CategoryMetadata
]

Get v2 format metadata for all categories.

Returns:

Source code in src/horde_model_reference/backends/redis_backend.py
@override
def get_all_metadata(self) -> dict[MODEL_REFERENCE_CATEGORY, CategoryMetadata]:
    """Collect v2 format metadata for every category.

    Returns:
        dict[MODEL_REFERENCE_CATEGORY, CategoryMetadata]: Categories mapped to
        their v2 metadata; categories whose lookup yields None are omitted.

    """
    return {
        category: metadata
        for category in MODEL_REFERENCE_CATEGORY
        if (metadata := self.get_metadata(category)) is not None
    }

get_all_metadata_async async

get_all_metadata_async() -> dict[
    MODEL_REFERENCE_CATEGORY, CategoryMetadata
]

Asynchronously get v2 format metadata for all categories.

Returns:

Source code in src/horde_model_reference/backends/redis_backend.py
@override
async def get_all_metadata_async(self) -> dict[MODEL_REFERENCE_CATEGORY, CategoryMetadata]:
    """Asynchronously get v2 format metadata for all categories.

    Returns:
        dict[MODEL_REFERENCE_CATEGORY, CategoryMetadata]: Mapping of categories to their v2 metadata.

    """
    return self.get_all_metadata()

__del__

__del__() -> None

Clean up Redis connections on deletion.

Source code in src/horde_model_reference/backends/redis_backend.py
def __del__(self) -> None:
    """Stop the listener flag and close the pub/sub and Redis clients.

    Every step is guarded with ``hasattr`` because ``__init__`` may have
    failed before the attribute was assigned; errors from ``close()`` are
    suppressed.
    """
    if hasattr(self, "_pubsub_running"):
        self._pubsub_running = False

    # Close the subscription before the client it was created from.
    for attr_name in ("_pubsub", "_sync_redis"):
        if not hasattr(self, attr_name):
            continue
        with contextlib.suppress(Exception):
            getattr(self, attr_name).close()

register_invalidation_callback

register_invalidation_callback(
    callback: Callable[[MODEL_REFERENCE_CATEGORY], None],
) -> None

Register a callback to be called when a category is invalidated.

This allows external components (like ModelReferenceManager) to be notified when cached data becomes stale and needs to be refreshed.

Parameters:

Source code in src/horde_model_reference/backends/base.py
def register_invalidation_callback(
    self,
    callback: Callable[[MODEL_REFERENCE_CATEGORY], None],
) -> None:
    """Subscribe a callback to category invalidation events.

    External components (for example ModelReferenceManager) use this to learn
    that cached data for a category has gone stale and must be refreshed.
    The callback is appended to this backend's callback list and the
    registration is logged at debug level.

    Args:
        callback: Function to call with the invalidated category.

    """
    self._invalidation_callbacks.append(callback)
    # Not every callable has a `__name__`, so the log line falls back to `repr()`.
    logger.debug(f"Registered invalidation callback: {getattr(callback, '__name__', repr(callback))}")

_notify_invalidation

_notify_invalidation(
    category: MODEL_REFERENCE_CATEGORY,
) -> None

Notify all registered callbacks that a category has been invalidated.

Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    The category that was invalidated.

Source code in src/horde_model_reference/backends/base.py
def _notify_invalidation(self, category: MODEL_REFERENCE_CATEGORY) -> None:
    """Invoke every registered invalidation callback with `category`.

    Callbacks run in the order they are stored. An exception raised by one
    callback is logged as an error and does not prevent the remaining
    callbacks from running.

    Args:
        category: The category that was invalidated.

    """
    for listener in self._invalidation_callbacks:
        try:
            listener(category)
        except Exception as e:
            # Not every callable has a `__name__`; fall back to `repr()` for the log line.
            cb_name = getattr(listener, "__name__", repr(listener))
            logger.error(f"Invalidation callback {cb_name} failed for {category}: {e}")

mark_stale

mark_stale(category: MODEL_REFERENCE_CATEGORY) -> None

Mark a category's data as stale, requiring refresh on next access.

This method calls the backend-specific implementation and then notifies all registered callbacks.

Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    The category to mark as stale.

Implementation Note

The base class provides this public implementation. Subclasses should override _mark_stale_impl() instead of this method.

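See Also

  • _mark_stale_impl(): Backend-specific staleness tracking
  • register_invalidation_callback(): Register callbacks for invalidation events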
Source code in src/horde_model_reference/backends/base.py
def mark_stale(self, category: MODEL_REFERENCE_CATEGORY) -> None:
    """Flag a category as stale so that it is refreshed on next access.

    Two steps run in order: the backend-specific staleness bookkeeping, then
    notification of every registered invalidation callback.

    Args:
        category: The category to mark as stale.

    Implementation Note:
        This public method is supplied by the base class. Subclasses customise
        the behaviour by overriding
        [_mark_stale_impl()][(c)._mark_stale_impl]
        rather than this method.

    See Also:
        - [_mark_stale_impl()][(c)._mark_stale_impl]: Backend-specific staleness tracking
        - [register_invalidation_callback()][(c).register_invalidation_callback]:
          Register callbacks for invalidation events

    """
    # Record the staleness first so callbacks fire after the backend state is updated.
    self._mark_stale_impl(category)
    self._notify_invalidation(category)

support_any_writes

support_any_writes() -> bool

Check if this backend supports any write operations (v2 or legacy).

Returns:

  • bool ( bool ) –

    True if any write operations are supported, False otherwise.

Source code in src/horde_model_reference/backends/base.py
def support_any_writes(self) -> bool:
    """Report whether at least one write path (v2 or legacy) is available.

    `supports_legacy_writes()` is consulted only when `supports_writes()`
    reports no support.

    Returns:
        bool: True if any write operations are supported, False otherwise.

    """
    v2_capable = self.supports_writes()
    if v2_capable:
        return v2_capable
    return self.supports_legacy_writes()

supports_legacy_writes

supports_legacy_writes() -> bool

Check if this backend supports write operations in legacy format.

Legacy write operations include update_model_legacy() and delete_model_legacy(). Only available when canonical_format='LEGACY' in PRIMARY mode.

Returns:

  • bool ( bool ) –

    True if legacy write operations are supported, False otherwise.

Source code in src/horde_model_reference/backends/base.py
def supports_legacy_writes(self) -> bool:
    """Report whether legacy-format write operations are available.

    The legacy write operations are update_model_legacy() and delete_model_legacy(),
    which are only available when canonical_format='LEGACY' in PRIMARY mode.
    This implementation always reports no support.

    Returns:
        bool: True if legacy write operations are supported, False otherwise.

    """
    return False

update_model_from_base_model

update_model_from_base_model(
    category: MODEL_REFERENCE_CATEGORY,
    model_name: str,
    record_model: BaseModel,
    *,
    logical_user_id: str | None = None,
    request_id: str | None = None,
) -> None

Update or create a model reference from a pydantic BaseModel.

This is an optional method that write-capable backends can implement. Read-only backends should leave the default implementation which raises NotImplementedError.

Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    The category to update.

  • model_name (str) –

    The name of the model to update or create.

  • record_model (BaseModel) –

    The model record data as a pydantic BaseModel.

  • logical_user_id (str | None, default: None ) –

    Immutable Horde user id for auditing contexts (optional).

  • request_id (str | None, default: None ) –

    Optional tracing/idempotency identifier for audit correlation.

Raises:

  • NotImplementedError –

    If the backend does not support write operations.

Implementation Note

The base class provides this implementation automatically. It:

  1. Checks supports_writes() returns True
  2. Converts the pydantic model to dict using model_dump(exclude_unset=True)
  3. Calls update_model() with the dictionary

Backends that support writes typically don't need to override this method.

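See Also

  • update_model(): Update from dictionary (implement this)
  • supports_writes(): Feature detection method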
Source code in src/horde_model_reference/backends/base.py
def update_model_from_base_model(
    self,
    category: MODEL_REFERENCE_CATEGORY,
    model_name: str,
    record_model: BaseModel,
    *,
    logical_user_id: str | None = None,
    request_id: str | None = None,
) -> None:
    """Create or update a model reference from a pydantic BaseModel.

    Convenience wrapper around the dictionary-based write path: the record is
    serialised and handed to [update_model()][(c).update_model]. Backends that
    report no write support get a NotImplementedError instead.

    Args:
        category: The category to update.
        model_name: The name of the model to update or create.
        record_model: The model record data as a pydantic BaseModel.
        logical_user_id: Immutable Horde user id for auditing contexts (optional).
        request_id: Optional tracing/idempotency identifier for audit correlation.

    Raises:
        NotImplementedError: If the backend does not support write operations.

    Implementation Note:
        The base class provides this implementation automatically. It:
        1. Checks [supports_writes()][(c).supports_writes] returns `True`
        2. Converts the pydantic model to dict using `model_dump(exclude_unset=True)`
        3. Calls [update_model()][(c).update_model] with the dictionary

        Backends that support writes typically don't need to override this method.

    See Also:
        - [update_model()][(c).update_model]: Update from dictionary (implement this)
        - [supports_writes()][(c).supports_writes]: Feature detection method

    """
    if self.supports_writes():
        # Only fields that were explicitly set on the model are forwarded.
        payload = record_model.model_dump(exclude_unset=True)
        self.update_model(
            category,
            model_name,
            payload,
            logical_user_id=logical_user_id,
            request_id=request_id,
        )
        return

    raise NotImplementedError(f"{self.__class__.__name__} does not support write operations")

update_model_legacy

update_model_legacy(
    category: MODEL_REFERENCE_CATEGORY,
    model_name: str,
    record_dict: dict[str, Any],
    *,
    logical_user_id: str | None = None,
    request_id: str | None = None,
) -> None

Update or create a model reference in legacy format.

This is an optional method that legacy-write-capable backends can implement. Only available when canonical_format='LEGACY' in PRIMARY mode.

Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    The category to update.

  • model_name (str) –

    The name of the model to update or create.

  • record_dict (dict[str, Any]) –

    The model record data in legacy format as a dictionary.

  • logical_user_id (str | None, default: None ) –

    Immutable Horde user id for auditing contexts (optional).

  • request_id (str | None, default: None ) –

    Optional tracing/idempotency identifier for audit correlation.

Raises:

  • NotImplementedError –

    If the backend does not support legacy write operations.

Source code in src/horde_model_reference/backends/base.py
def update_model_legacy(
    self,
    category: MODEL_REFERENCE_CATEGORY,
    model_name: str,
    record_dict: dict[str, Any],
    *,
    logical_user_id: str | None = None,
    request_id: str | None = None,
) -> None:
    """Create or update a model reference stored in legacy format.

    Optional hook for backends capable of legacy writes, which are only
    available when canonical_format='LEGACY' in PRIMARY mode. This default
    implementation performs no write and always raises.

    Args:
        category: The category to update.
        model_name: The name of the model to update or create.
        record_dict: The model record data in legacy format as a dictionary.
        logical_user_id: Immutable Horde user id for auditing contexts (optional).
        request_id: Optional tracing/idempotency identifier for audit correlation.

    Raises:
        NotImplementedError: If the backend does not support legacy write operations.

    """
    message = f"{self.__class__.__name__} does not support legacy write operations"
    raise NotImplementedError(message)

update_model_legacy_from_base_model

update_model_legacy_from_base_model(
    category: MODEL_REFERENCE_CATEGORY,
    model_name: str,
    record_model: BaseModel,
    *,
    logical_user_id: str | None = None,
    request_id: str | None = None,
) -> None

Update or create a model reference in legacy format from a pydantic BaseModel.

This is an optional method that legacy-write-capable backends can implement. Only available when canonical_format='LEGACY' in PRIMARY mode.

Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    The category to update.

  • model_name (str) –

    The name of the model to update or create.

  • record_model (BaseModel) –

    The model record data as a pydantic BaseModel.

  • logical_user_id (str | None, default: None ) –

    Immutable Horde user id for auditing contexts (optional).

  • request_id (str | None, default: None ) –

    Optional tracing/idempotency identifier for audit correlation.

Raises:

  • NotImplementedError –

    If the backend does not support legacy write operations.

Source code in src/horde_model_reference/backends/base.py
def update_model_legacy_from_base_model(
    self,
    category: MODEL_REFERENCE_CATEGORY,
    model_name: str,
    record_model: BaseModel,
    *,
    logical_user_id: str | None = None,
    request_id: str | None = None,
) -> None:
    """Create or update a legacy-format model reference from a pydantic BaseModel.

    When supports_legacy_writes() reports support, the record is serialised with
    `model_dump(exclude_unset=True)` and passed to update_model_legacy().
    Otherwise a NotImplementedError is raised. Legacy writes are only available
    when canonical_format='LEGACY' in PRIMARY mode.

    Args:
        category: The category to update.
        model_name: The name of the model to update or create.
        record_model: The model record data as a pydantic BaseModel.
        logical_user_id: Immutable Horde user id for auditing contexts (optional).
        request_id: Optional tracing/idempotency identifier for audit correlation.

    Raises:
        NotImplementedError: If the backend does not support legacy write operations.

    """
    if self.supports_legacy_writes():
        # Only fields that were explicitly set on the model are forwarded.
        payload = record_model.model_dump(exclude_unset=True)
        self.update_model_legacy(
            category,
            model_name,
            payload,
            logical_user_id=logical_user_id,
            request_id=request_id,
        )
        return

    raise NotImplementedError(f"{self.__class__.__name__} does not support legacy write operations")

delete_model_legacy

delete_model_legacy(
    category: MODEL_REFERENCE_CATEGORY,
    model_name: str,
    *,
    logical_user_id: str | None = None,
    request_id: str | None = None,
) -> None

Delete a model reference from legacy format files.

This is an optional method that legacy-write-capable backends can implement. Only available when canonical_format='LEGACY' in PRIMARY mode.

Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    The category containing the model.

  • model_name (str) –

    The name of the model to delete.

  • logical_user_id (str | None, default: None ) –

    Immutable Horde user id for auditing contexts (optional).

  • request_id (str | None, default: None ) –

    Optional tracing/idempotency identifier for audit correlation.

Raises:

  • NotImplementedError –

    If the backend does not support legacy write operations.

Source code in src/horde_model_reference/backends/base.py
def delete_model_legacy(
    self,
    category: MODEL_REFERENCE_CATEGORY,
    model_name: str,
    *,
    logical_user_id: str | None = None,
    request_id: str | None = None,
) -> None:
    """Remove a model reference from the legacy format files.

    Optional hook for backends capable of legacy writes, which are only
    available when canonical_format='LEGACY' in PRIMARY mode. This default
    implementation deletes nothing and always raises.

    Args:
        category: The category containing the model.
        model_name: The name of the model to delete.
        logical_user_id: Immutable Horde user id for auditing contexts (optional).
        request_id: Optional tracing/idempotency identifier for audit correlation.

    Raises:
        NotImplementedError: If the backend does not support legacy write operations.

    """
    message = f"{self.__class__.__name__} does not support legacy write operations"
    raise NotImplementedError(message)