Bases: RedisCache[CategoryDeletionRiskResponse]
Singleton cache for category deletion risk results.
Integrates with existing backend invalidation system to automatically
clear deletion risk results when model data changes. Uses Redis for distributed
caching when available, with in-memory fallback.
Inherits from RedisCache[CategoryDeletionRiskResponse] for common caching infrastructure.
Source code in src/horde_model_reference/analytics/deletion_risk_cache.py
| class DeletionRiskCache(RedisCache[CategoryDeletionRiskResponse]):
"""Singleton cache for category deletion risk results.
Integrates with existing backend invalidation system to automatically
clear deletion risk results when model data changes. Uses Redis for distributed
caching when available, with in-memory fallback.
Inherits from RedisCache[CategoryDeletionRiskResponse] for common caching infrastructure.
"""
_instance: DeletionRiskCache | None = None
def _get_cache_key_prefix(self) -> str:
"""Get the Redis key prefix for deletion risk cache.
Returns:
Redis key prefix string.
"""
return f"{horde_model_reference_settings.redis.key_prefix}:deletion_risk"
def _get_ttl(self) -> int:
"""Get the TTL in seconds for deletion risk cache entries.
Returns:
TTL in seconds from settings.
"""
return horde_model_reference_settings.deletion_risk_cache_ttl
def _get_model_class(self) -> type[CategoryDeletionRiskResponse]:
"""Get the Pydantic model class for deserialization.
Returns:
CategoryDeletionRiskResponse class.
"""
from horde_model_reference.analytics.deletion_risk_analysis import CategoryDeletionRiskResponse
return CategoryDeletionRiskResponse
def _register_invalidation_callback(self) -> None:
"""Register callback with ModelReferenceManager backend for automatic invalidation."""
try:
from horde_model_reference import ModelReferenceManager
manager = ModelReferenceManager()
if hasattr(manager.backend, "register_invalidation_callback"):
manager.backend.register_invalidation_callback(self._on_category_invalidated)
logger.info("DeletionRiskCache registered invalidation callback with backend")
else:
logger.warning(f"Backend {type(manager.backend).__name__} does not support invalidation callbacks")
except Exception as e:
logger.warning(f"Failed to register invalidation callback: {e}")
logger.info("Deletion risk cache will rely on TTL-based expiration only")
def _on_category_invalidated(self, category: MODEL_REFERENCE_CATEGORY) -> None:
"""Invalidate deletion risk cache when model reference data changes.
Invalidates both grouped and ungrouped variants for the category.
Args:
category: The category that was invalidated.
"""
logger.debug(f"Invalidating deletion risk cache for category: {category}")
self.invalidate(category, grouped=None) # Invalidate both variants
|
_instance
class-attribute
instance-attribute
_instance: DeletionRiskCache | None = None
_lock
class-attribute
instance-attribute
_cache
instance-attribute
_timestamps
instance-attribute
_timestamps: dict[str, float]
_redis_client
instance-attribute
_redis_client: Redis[bytes] | None
_redis_key_prefix
instance-attribute
_get_cache_key_prefix
_get_cache_key_prefix() -> str
Get the Redis key prefix for deletion risk cache.
Returns:
Source code in src/horde_model_reference/analytics/deletion_risk_cache.py
| def _get_cache_key_prefix(self) -> str:
"""Get the Redis key prefix for deletion risk cache.
Returns:
Redis key prefix string.
"""
return f"{horde_model_reference_settings.redis.key_prefix}:deletion_risk"
|
_get_ttl
Get the TTL in seconds for deletion risk cache entries.
Returns:
-
int
–
TTL in seconds from settings.
Source code in src/horde_model_reference/analytics/deletion_risk_cache.py
| def _get_ttl(self) -> int:
"""Get the TTL in seconds for deletion risk cache entries.
Returns:
TTL in seconds from settings.
"""
return horde_model_reference_settings.deletion_risk_cache_ttl
|
_get_model_class
_get_model_class() -> type[CategoryDeletionRiskResponse]
Get the Pydantic model class for deserialization.
Returns:
Source code in src/horde_model_reference/analytics/deletion_risk_cache.py
| def _get_model_class(self) -> type[CategoryDeletionRiskResponse]:
"""Get the Pydantic model class for deserialization.
Returns:
CategoryDeletionRiskResponse class.
"""
from horde_model_reference.analytics.deletion_risk_analysis import CategoryDeletionRiskResponse
return CategoryDeletionRiskResponse
|
_register_invalidation_callback
_register_invalidation_callback() -> None
Register callback with ModelReferenceManager backend for automatic invalidation.
Source code in src/horde_model_reference/analytics/deletion_risk_cache.py
| def _register_invalidation_callback(self) -> None:
"""Register callback with ModelReferenceManager backend for automatic invalidation."""
try:
from horde_model_reference import ModelReferenceManager
manager = ModelReferenceManager()
if hasattr(manager.backend, "register_invalidation_callback"):
manager.backend.register_invalidation_callback(self._on_category_invalidated)
logger.info("DeletionRiskCache registered invalidation callback with backend")
else:
logger.warning(f"Backend {type(manager.backend).__name__} does not support invalidation callbacks")
except Exception as e:
logger.warning(f"Failed to register invalidation callback: {e}")
logger.info("Deletion risk cache will rely on TTL-based expiration only")
|
_on_category_invalidated
_on_category_invalidated(
category: MODEL_REFERENCE_CATEGORY,
) -> None
Invalidate deletion risk cache when model reference data changes.
Invalidates both grouped and ungrouped variants for the category.
Parameters:
Source code in src/horde_model_reference/analytics/deletion_risk_cache.py
| def _on_category_invalidated(self, category: MODEL_REFERENCE_CATEGORY) -> None:
"""Invalidate deletion risk cache when model reference data changes.
Invalidates both grouped and ungrouped variants for the category.
Args:
category: The category that was invalidated.
"""
logger.debug(f"Invalidating deletion risk cache for category: {category}")
self.invalidate(category, grouped=None) # Invalidate both variants
|
__new__
__new__() -> RedisCache[T]
Singleton pattern matching ModelReferenceManager.
Source code in src/horde_model_reference/analytics/base_cache.py
| def __new__(cls) -> RedisCache[T]:
"""Singleton pattern matching ModelReferenceManager."""
with cls._lock:
if not cls._instance:
cls._instance = super().__new__(cls)
cls._instance._initialize()
return cls._instance
|
_initialize
Initialize caching infrastructure.
Source code in src/horde_model_reference/analytics/base_cache.py
| def _initialize(self) -> None:
"""Initialize caching infrastructure."""
self._cache = {}
self._timestamps = {}
self._redis_client = None
self._redis_key_prefix = self._get_cache_key_prefix()
logger.debug(f"Initializing {self.__class__.__name__} with TTL={self._get_ttl()}s")
# Try to connect to Redis if configured
if horde_model_reference_settings.redis.use_redis:
try:
import redis
self._redis_client = redis.from_url(
horde_model_reference_settings.redis.url,
socket_timeout=horde_model_reference_settings.redis.socket_timeout,
decode_responses=False,
)
self._redis_client.ping()
logger.info(f"{self.__class__.__name__} using Redis for distributed caching")
except Exception as e:
logger.warning(f"Failed to connect to Redis for {self.__class__.__name__}: {e}")
logger.info(f"{self.__class__.__name__} falling back to in-memory cache")
self._redis_client = None
else:
logger.info(f"{self.__class__.__name__} using in-memory cache (Redis disabled)")
# Register invalidation callback
self._register_invalidation_callback()
|
_build_cache_key
_build_cache_key(
category: MODEL_REFERENCE_CATEGORY,
grouped: bool = False,
include_backend_variations: bool = False,
) -> str
Build cache key from category and options.
Parameters:
-
category
(MODEL_REFERENCE_CATEGORY)
–
The model reference category.
-
grouped
(bool, default:
False
)
–
Whether this is for grouped text models.
-
include_backend_variations
(bool, default:
False
)
–
Whether backend variations are included.
Returns:
Source code in src/horde_model_reference/analytics/base_cache.py
| def _build_cache_key(
self,
category: MODEL_REFERENCE_CATEGORY,
grouped: bool = False,
include_backend_variations: bool = False,
) -> str:
"""Build cache key from category and options.
Args:
category: The model reference category.
grouped: Whether this is for grouped text models.
include_backend_variations: Whether backend variations are included.
Returns:
Cache key string.
"""
group_suffix = ":grouped=true" if grouped else ":grouped=false"
variations_suffix = ":variations=true" if include_backend_variations else ""
return f"{category.value}{group_suffix}{variations_suffix}"
|
_get_redis_key
_get_redis_key(cache_key: str) -> str
Generate Redis key from cache key.
Parameters:
-
cache_key
(str)
–
The cache key (category + grouping state).
Returns:
-
str
–
Full Redis key string with prefix.
Source code in src/horde_model_reference/analytics/base_cache.py
| def _get_redis_key(self, cache_key: str) -> str:
"""Generate Redis key from cache key.
Args:
cache_key: The cache key (category + grouping state).
Returns:
Full Redis key string with prefix.
"""
return f"{self._redis_key_prefix}:{cache_key}"
|
get
get(
category: MODEL_REFERENCE_CATEGORY,
grouped: bool = False,
include_backend_variations: bool = False,
allow_stale: bool | None = None,
) -> T | None
Get cached result for a category.
Checks Redis first (if available), then in-memory cache.
When cache hydration is enabled (settings.cache_hydration_enabled=True) and
allow_stale is True (or None with hydration enabled), implements stale-while-revalidate:
- Returns cached data even if past normal TTL
- Only returns None if data exceeds stale_ttl (default 1 hour)
- Background hydration is expected to refresh data before stale_ttl
Parameters:
-
category
(MODEL_REFERENCE_CATEGORY)
–
The model reference category.
-
grouped
(bool, default:
False
)
–
Whether to get grouped text models variant.
-
include_backend_variations
(bool, default:
False
)
–
Whether backend variations are included.
-
allow_stale
(bool | None, default:
None
)
–
Whether to return stale data beyond normal TTL.
If None, defaults to True when hydration is enabled, False otherwise.
Returns:
-
T | None
–
Cached result or None if not cached or expired beyond stale TTL.
Source code in src/horde_model_reference/analytics/base_cache.py
| def get(
self,
category: MODEL_REFERENCE_CATEGORY,
grouped: bool = False,
include_backend_variations: bool = False,
allow_stale: bool | None = None,
) -> T | None:
"""Get cached result for a category.
Checks Redis first (if available), then in-memory cache.
When cache hydration is enabled (settings.cache_hydration_enabled=True) and
allow_stale is True (or None with hydration enabled), implements stale-while-revalidate:
- Returns cached data even if past normal TTL
- Only returns None if data exceeds stale_ttl (default 1 hour)
- Background hydration is expected to refresh data before stale_ttl
Args:
category: The model reference category.
grouped: Whether to get grouped text models variant.
include_backend_variations: Whether backend variations are included.
allow_stale: Whether to return stale data beyond normal TTL.
If None, defaults to True when hydration is enabled, False otherwise.
Returns:
Cached result or None if not cached or expired beyond stale TTL.
"""
cache_key = self._build_cache_key(category, grouped, include_backend_variations)
# Determine stale behavior
hydration_enabled = horde_model_reference_settings.cache_hydration_enabled
effective_allow_stale = allow_stale if allow_stale is not None else hydration_enabled
stale_ttl = horde_model_reference_settings.cache_hydration_stale_ttl_seconds
# Try Redis first
if self._redis_client:
try:
redis_key = self._get_redis_key(cache_key)
cached_bytes = self._redis_client.get(redis_key)
if cached_bytes:
model_class = self._get_model_class()
cached_dict = json.loads(cached_bytes.decode("utf-8"))
result = model_class(**cached_dict)
logger.debug(f"{self.__class__.__name__} cache hit (Redis): {cache_key}")
return result
except Exception as e:
logger.warning(f"Failed to get from Redis for {cache_key}: {e}")
# Try in-memory cache
with self._lock:
if cache_key in self._cache:
age = time.time() - self._timestamps.get(cache_key, 0)
ttl = self._get_ttl()
# Fresh data - always return
if age < ttl:
logger.debug(f"{self.__class__.__name__} cache hit (memory): {cache_key}")
return self._cache[cache_key]
# Stale data - return if stale allowed and within stale TTL
if effective_allow_stale and age < stale_ttl:
logger.debug(
f"{self.__class__.__name__} returning stale data (memory): {cache_key}, "
f"age={age:.1f}s (TTL={ttl}s, stale_ttl={stale_ttl}s)"
)
return self._cache[cache_key]
# Data too old - remove and return None
logger.debug(
f"{self.__class__.__name__} cache expired (memory): {cache_key}, "
f"age={age:.1f}s (stale_allowed={effective_allow_stale})"
)
self._cache.pop(cache_key, None)
self._timestamps.pop(cache_key, None)
logger.debug(f"{self.__class__.__name__} cache miss: {cache_key}")
return None
|
is_fresh
is_fresh(
category: MODEL_REFERENCE_CATEGORY,
grouped: bool = False,
include_backend_variations: bool = False,
) -> bool
Check if cached data is fresh (within normal TTL).
Useful for determining if background hydration should run.
Parameters:
-
category
(MODEL_REFERENCE_CATEGORY)
–
The model reference category.
-
grouped
(bool, default:
False
)
–
Whether to check grouped text models variant.
-
include_backend_variations
(bool, default:
False
)
–
Whether backend variations are included.
Returns:
-
bool
–
True if fresh data exists within TTL, False otherwise.
Source code in src/horde_model_reference/analytics/base_cache.py
| def is_fresh(
self,
category: MODEL_REFERENCE_CATEGORY,
grouped: bool = False,
include_backend_variations: bool = False,
) -> bool:
"""Check if cached data is fresh (within normal TTL).
Useful for determining if background hydration should run.
Args:
category: The model reference category.
grouped: Whether to check grouped text models variant.
include_backend_variations: Whether backend variations are included.
Returns:
True if fresh data exists within TTL, False otherwise.
"""
cache_key = self._build_cache_key(category, grouped, include_backend_variations)
# Check Redis TTL
if self._redis_client:
try:
redis_key = self._get_redis_key(cache_key)
ttl = self._redis_client.ttl(redis_key)
if ttl > 0:
return True
except Exception as e:
logger.warning(f"Failed to check Redis TTL for {cache_key}: {e}")
# Check in-memory
with self._lock:
if cache_key in self._cache:
age = time.time() - self._timestamps.get(cache_key, 0)
return age < self._get_ttl()
return False
|
set
set(
category: MODEL_REFERENCE_CATEGORY,
result: T,
grouped: bool = False,
include_backend_variations: bool = False,
) -> None
Store result in cache.
Stores in both Redis (if available) and in-memory cache.
Parameters:
-
category
(MODEL_REFERENCE_CATEGORY)
–
The model reference category.
-
result
(T)
–
The computed result to cache.
-
grouped
(bool, default:
False
)
–
Whether this is the grouped text models variant.
-
include_backend_variations
(bool, default:
False
)
–
Whether backend variations are included.
Source code in src/horde_model_reference/analytics/base_cache.py
| def set(
self,
category: MODEL_REFERENCE_CATEGORY,
result: T,
grouped: bool = False,
include_backend_variations: bool = False,
) -> None:
"""Store result in cache.
Stores in both Redis (if available) and in-memory cache.
Args:
category: The model reference category.
result: The computed result to cache.
grouped: Whether this is the grouped text models variant.
include_backend_variations: Whether backend variations are included.
"""
cache_key = self._build_cache_key(category, grouped, include_backend_variations)
# Store in Redis
if self._redis_client:
try:
redis_key = self._get_redis_key(cache_key)
serialized = result.model_dump_json()
self._redis_client.setex(redis_key, self._get_ttl(), serialized)
logger.debug(f"Stored in Redis: {cache_key}")
except Exception as e:
logger.warning(f"Failed to store in Redis for {cache_key}: {e}")
# Store in-memory (always, as fallback)
with self._lock:
self._cache[cache_key] = result
self._timestamps[cache_key] = time.time()
logger.debug(f"Stored in memory: {cache_key}")
|
invalidate
invalidate(
category: MODEL_REFERENCE_CATEGORY,
grouped: bool | None = None,
include_backend_variations: bool | None = None,
) -> None
Invalidate cached results for a category.
Removes from both Redis and in-memory cache. If grouped is None,
invalidates all grouped/ungrouped variants. If include_backend_variations
is None, invalidates all variation states.
Parameters:
-
category
(MODEL_REFERENCE_CATEGORY)
–
The model reference category to invalidate.
-
grouped
(bool | None, default:
None
)
–
Whether to invalidate grouped variant (None = both).
-
include_backend_variations
(bool | None, default:
None
)
–
Whether to invalidate variation states (None = both).
Source code in src/horde_model_reference/analytics/base_cache.py
| def invalidate(
self,
category: MODEL_REFERENCE_CATEGORY,
grouped: bool | None = None,
include_backend_variations: bool | None = None,
) -> None:
"""Invalidate cached results for a category.
Removes from both Redis and in-memory cache. If grouped is None,
invalidates all grouped/ungrouped variants. If include_backend_variations
is None, invalidates all variation states.
Args:
category: The model reference category to invalidate.
grouped: Whether to invalidate grouped variant (None = both).
include_backend_variations: Whether to invalidate variation states (None = both).
"""
# Determine which variants to invalidate
grouped_variants = [False, True] if grouped is None else [grouped]
variations_variants = [False, True] if include_backend_variations is None else [include_backend_variations]
for gv in grouped_variants:
for vv in variations_variants:
cache_key = self._build_cache_key(category, gv, vv)
logger.debug(f"Invalidating cache: {cache_key}")
# Invalidate Redis
if self._redis_client:
try:
redis_key = self._get_redis_key(cache_key)
deleted_count = self._redis_client.delete(redis_key)
if deleted_count > 0:
logger.debug(f"Deleted Redis key: {redis_key}")
except Exception as e:
logger.warning(f"Failed to delete from Redis for {cache_key}: {e}")
# Invalidate in-memory
with self._lock:
removed = self._cache.pop(cache_key, None) is not None
self._timestamps.pop(cache_key, None)
if removed:
logger.debug(f"Removed from memory cache: {cache_key}")
|
clear_all
Clear all cached results.
Useful for testing or when a global cache reset is needed.
Source code in src/horde_model_reference/analytics/base_cache.py
| def clear_all(self) -> None:
"""Clear all cached results.
Useful for testing or when a global cache reset is needed.
"""
logger.info(f"Clearing all {self.__class__.__name__} entries")
# Clear Redis
if self._redis_client:
try:
for category in MODEL_REFERENCE_CATEGORY:
for grouped in [False, True]:
for variations in [False, True]:
cache_key = self._build_cache_key(category, grouped, variations)
redis_key = self._get_redis_key(cache_key)
self._redis_client.delete(redis_key)
logger.debug("Cleared all Redis keys")
except Exception as e:
logger.warning(f"Failed to clear Redis cache: {e}")
# Clear in-memory
with self._lock:
self._cache.clear()
self._timestamps.clear()
logger.debug("Cleared in-memory cache")
|
get_cache_info
get_cache_info() -> dict[
str, int | float | bool | list[str]
]
Get information about the current cache state.
Returns:
Source code in src/horde_model_reference/analytics/base_cache.py
| def get_cache_info(self) -> dict[str, int | float | bool | list[str]]:
"""Get information about the current cache state.
Returns:
Dictionary with cache statistics including size, Redis status, TTL.
"""
with self._lock:
return {
"cache_size": len(self._cache),
"redis_enabled": self._redis_client is not None,
"ttl_seconds": self._get_ttl(),
"keys_cached": list(self._cache.keys()),
}
|