Bases: RedisCache[CategoryStatistics]
Singleton cache for category statistics.
Integrates with existing backend invalidation system to automatically
clear statistics when model data changes. Uses Redis for distributed
caching when available, with in-memory fallback.
Inherits from RedisCache[CategoryStatistics] for common caching infrastructure.
Source code in src/horde_model_reference/analytics/statistics_cache.py
| class StatisticsCache(RedisCache[CategoryStatistics]):
"""Singleton cache for category statistics.
Integrates with existing backend invalidation system to automatically
clear statistics when model data changes. Uses Redis for distributed
caching when available, with in-memory fallback.
Inherits from RedisCache[CategoryStatistics] for common caching infrastructure.
"""
_instance: StatisticsCache | None = None
def _get_cache_key_prefix(self) -> str:
"""Get the Redis key prefix for statistics cache.
Returns:
Redis key prefix string.
"""
return f"{horde_model_reference_settings.redis.key_prefix}:stats"
def _get_ttl(self) -> int:
"""Get the TTL in seconds for statistics cache entries.
Returns:
TTL in seconds from settings.
"""
return horde_model_reference_settings.statistics_cache_ttl
def _get_model_class(self) -> type[CategoryStatistics]:
"""Get the Pydantic model class for deserialization.
Returns:
CategoryStatistics class.
"""
from horde_model_reference.analytics.statistics import CategoryStatistics
return CategoryStatistics
def _register_invalidation_callback(self) -> None:
"""Register callback with ModelReferenceManager backend for automatic invalidation."""
try:
from horde_model_reference import ModelReferenceManager
manager = ModelReferenceManager()
if hasattr(manager.backend, "register_invalidation_callback"):
manager.backend.register_invalidation_callback(self._on_category_invalidated)
logger.info("StatisticsCache registered invalidation callback with backend")
else:
logger.warning(f"Backend {type(manager.backend).__name__} does not support invalidation callbacks")
except Exception as e:
logger.warning(f"Failed to register invalidation callback: {e}")
logger.info("Statistics cache will rely on TTL-based expiration only")
def _on_category_invalidated(self, category: MODEL_REFERENCE_CATEGORY) -> None:
"""Invalidate statistics cache when model reference data changes.
Invalidates both grouped and ungrouped variants for the category.
Args:
category: The category that was invalidated.
"""
logger.debug(f"Invalidating statistics cache for category: {category}")
self.invalidate(category, grouped=None) # Invalidate both variants
|
_instance
class-attribute
instance-attribute
_instance: StatisticsCache | None = None
_lock
class-attribute
instance-attribute
_cache
instance-attribute
_timestamps
instance-attribute
_timestamps: dict[str, float]
_redis_client
instance-attribute
_redis_client: Redis[bytes] | None
_redis_key_prefix
instance-attribute
_get_cache_key_prefix
_get_cache_key_prefix() -> str
Get the Redis key prefix for statistics cache.
Returns:
Source code in src/horde_model_reference/analytics/statistics_cache.py
| def _get_cache_key_prefix(self) -> str:
"""Get the Redis key prefix for statistics cache.
Returns:
Redis key prefix string.
"""
return f"{horde_model_reference_settings.redis.key_prefix}:stats"
|
_get_ttl
Get the TTL in seconds for statistics cache entries.
Returns:
-
int
–
TTL in seconds from settings.
Source code in src/horde_model_reference/analytics/statistics_cache.py
| def _get_ttl(self) -> int:
"""Get the TTL in seconds for statistics cache entries.
Returns:
TTL in seconds from settings.
"""
return horde_model_reference_settings.statistics_cache_ttl
|
_get_model_class
_get_model_class() -> type[CategoryStatistics]
Get the Pydantic model class for deserialization.
Returns:
Source code in src/horde_model_reference/analytics/statistics_cache.py
| def _get_model_class(self) -> type[CategoryStatistics]:
"""Get the Pydantic model class for deserialization.
Returns:
CategoryStatistics class.
"""
from horde_model_reference.analytics.statistics import CategoryStatistics
return CategoryStatistics
|
_register_invalidation_callback
_register_invalidation_callback() -> None
Register callback with ModelReferenceManager backend for automatic invalidation.
Source code in src/horde_model_reference/analytics/statistics_cache.py
| def _register_invalidation_callback(self) -> None:
"""Register callback with ModelReferenceManager backend for automatic invalidation."""
try:
from horde_model_reference import ModelReferenceManager
manager = ModelReferenceManager()
if hasattr(manager.backend, "register_invalidation_callback"):
manager.backend.register_invalidation_callback(self._on_category_invalidated)
logger.info("StatisticsCache registered invalidation callback with backend")
else:
logger.warning(f"Backend {type(manager.backend).__name__} does not support invalidation callbacks")
except Exception as e:
logger.warning(f"Failed to register invalidation callback: {e}")
logger.info("Statistics cache will rely on TTL-based expiration only")
|
_on_category_invalidated
_on_category_invalidated(
category: MODEL_REFERENCE_CATEGORY,
) -> None
Invalidate statistics cache when model reference data changes.
Invalidates both grouped and ungrouped variants for the category.
Parameters:
Source code in src/horde_model_reference/analytics/statistics_cache.py
| def _on_category_invalidated(self, category: MODEL_REFERENCE_CATEGORY) -> None:
"""Invalidate statistics cache when model reference data changes.
Invalidates both grouped and ungrouped variants for the category.
Args:
category: The category that was invalidated.
"""
logger.debug(f"Invalidating statistics cache for category: {category}")
self.invalidate(category, grouped=None) # Invalidate both variants
|
__new__
__new__() -> RedisCache[T]
Singleton pattern matching ModelReferenceManager.
Source code in src/horde_model_reference/analytics/base_cache.py
| def __new__(cls) -> RedisCache[T]:
"""Singleton pattern matching ModelReferenceManager."""
with cls._lock:
if not cls._instance:
cls._instance = super().__new__(cls)
cls._instance._initialize()
return cls._instance
|
_initialize
Initialize caching infrastructure.
Source code in src/horde_model_reference/analytics/base_cache.py
| def _initialize(self) -> None:
"""Initialize caching infrastructure."""
self._cache = {}
self._timestamps = {}
self._redis_client = None
self._redis_key_prefix = self._get_cache_key_prefix()
logger.debug(f"Initializing {self.__class__.__name__} with TTL={self._get_ttl()}s")
# Try to connect to Redis if configured
if horde_model_reference_settings.redis.use_redis:
try:
import redis
self._redis_client = redis.from_url(
horde_model_reference_settings.redis.url,
socket_timeout=horde_model_reference_settings.redis.socket_timeout,
decode_responses=False,
)
self._redis_client.ping()
logger.info(f"{self.__class__.__name__} using Redis for distributed caching")
except Exception as e:
logger.warning(f"Failed to connect to Redis for {self.__class__.__name__}: {e}")
logger.info(f"{self.__class__.__name__} falling back to in-memory cache")
self._redis_client = None
else:
logger.info(f"{self.__class__.__name__} using in-memory cache (Redis disabled)")
# Register invalidation callback
self._register_invalidation_callback()
|
_build_cache_key
_build_cache_key(
category: MODEL_REFERENCE_CATEGORY,
grouped: bool = False,
include_backend_variations: bool = False,
) -> str
Build cache key from category and options.
Parameters:
-
category
(MODEL_REFERENCE_CATEGORY)
–
The model reference category.
-
grouped
(bool, default:
False
)
–
Whether this is for grouped text models.
-
include_backend_variations
(bool, default:
False
)
–
Whether backend variations are included.
Returns:
Source code in src/horde_model_reference/analytics/base_cache.py
| def _build_cache_key(
self,
category: MODEL_REFERENCE_CATEGORY,
grouped: bool = False,
include_backend_variations: bool = False,
) -> str:
"""Build cache key from category and options.
Args:
category: The model reference category.
grouped: Whether this is for grouped text models.
include_backend_variations: Whether backend variations are included.
Returns:
Cache key string.
"""
group_suffix = ":grouped=true" if grouped else ":grouped=false"
variations_suffix = ":variations=true" if include_backend_variations else ""
return f"{category.value}{group_suffix}{variations_suffix}"
|
_get_redis_key
_get_redis_key(cache_key: str) -> str
Generate Redis key from cache key.
Parameters:
-
cache_key
(str)
–
The cache key (category + grouping state).
Returns:
-
str
–
Full Redis key string with prefix.
Source code in src/horde_model_reference/analytics/base_cache.py
| def _get_redis_key(self, cache_key: str) -> str:
"""Generate Redis key from cache key.
Args:
cache_key: The cache key (category + grouping state).
Returns:
Full Redis key string with prefix.
"""
return f"{self._redis_key_prefix}:{cache_key}"
|
get
get(
category: MODEL_REFERENCE_CATEGORY,
grouped: bool = False,
include_backend_variations: bool = False,
allow_stale: bool | None = None,
) -> T | None
Get cached result for a category.
Checks Redis first (if available), then in-memory cache.
When cache hydration is enabled (settings.cache_hydration_enabled=True) and
allow_stale is True (or None with hydration enabled), implements stale-while-revalidate:
- Returns cached data even if past normal TTL
- Only returns None if data exceeds stale_ttl (default 1 hour)
- Background hydration is expected to refresh data before stale_ttl
Parameters:
-
category
(MODEL_REFERENCE_CATEGORY)
–
The model reference category.
-
grouped
(bool, default:
False
)
–
Whether to get grouped text models variant.
-
include_backend_variations
(bool, default:
False
)
–
Whether backend variations are included.
-
allow_stale
(bool | None, default:
None
)
–
Whether to return stale data beyond normal TTL.
If None, defaults to True when hydration is enabled, False otherwise.
Returns:
-
T | None
–
Cached result or None if not cached or expired beyond stale TTL.
Source code in src/horde_model_reference/analytics/base_cache.py
| def get(
self,
category: MODEL_REFERENCE_CATEGORY,
grouped: bool = False,
include_backend_variations: bool = False,
allow_stale: bool | None = None,
) -> T | None:
"""Get cached result for a category.
Checks Redis first (if available), then in-memory cache.
When cache hydration is enabled (settings.cache_hydration_enabled=True) and
allow_stale is True (or None with hydration enabled), implements stale-while-revalidate:
- Returns cached data even if past normal TTL
- Only returns None if data exceeds stale_ttl (default 1 hour)
- Background hydration is expected to refresh data before stale_ttl
Args:
category: The model reference category.
grouped: Whether to get grouped text models variant.
include_backend_variations: Whether backend variations are included.
allow_stale: Whether to return stale data beyond normal TTL.
If None, defaults to True when hydration is enabled, False otherwise.
Returns:
Cached result or None if not cached or expired beyond stale TTL.
"""
cache_key = self._build_cache_key(category, grouped, include_backend_variations)
# Determine stale behavior
hydration_enabled = horde_model_reference_settings.cache_hydration_enabled
effective_allow_stale = allow_stale if allow_stale is not None else hydration_enabled
stale_ttl = horde_model_reference_settings.cache_hydration_stale_ttl_seconds
# Try Redis first
if self._redis_client:
try:
redis_key = self._get_redis_key(cache_key)
cached_bytes = self._redis_client.get(redis_key)
if cached_bytes:
model_class = self._get_model_class()
cached_dict = json.loads(cached_bytes.decode("utf-8"))
result = model_class(**cached_dict)
logger.debug(f"{self.__class__.__name__} cache hit (Redis): {cache_key}")
return result
except Exception as e:
logger.warning(f"Failed to get from Redis for {cache_key}: {e}")
# Try in-memory cache
with self._lock:
if cache_key in self._cache:
age = time.time() - self._timestamps.get(cache_key, 0)
ttl = self._get_ttl()
# Fresh data - always return
if age < ttl:
logger.debug(f"{self.__class__.__name__} cache hit (memory): {cache_key}")
return self._cache[cache_key]
# Stale data - return if stale allowed and within stale TTL
if effective_allow_stale and age < stale_ttl:
logger.debug(
f"{self.__class__.__name__} returning stale data (memory): {cache_key}, "
f"age={age:.1f}s (TTL={ttl}s, stale_ttl={stale_ttl}s)"
)
return self._cache[cache_key]
# Data too old - remove and return None
logger.debug(
f"{self.__class__.__name__} cache expired (memory): {cache_key}, "
f"age={age:.1f}s (stale_allowed={effective_allow_stale})"
)
self._cache.pop(cache_key, None)
self._timestamps.pop(cache_key, None)
logger.debug(f"{self.__class__.__name__} cache miss: {cache_key}")
return None
|
is_fresh
is_fresh(
category: MODEL_REFERENCE_CATEGORY,
grouped: bool = False,
include_backend_variations: bool = False,
) -> bool
Check if cached data is fresh (within normal TTL).
Useful for determining if background hydration should run.
Parameters:
-
category
(MODEL_REFERENCE_CATEGORY)
–
The model reference category.
-
grouped
(bool, default:
False
)
–
Whether to check grouped text models variant.
-
include_backend_variations
(bool, default:
False
)
–
Whether backend variations are included.
Returns:
-
bool
–
True if fresh data exists within TTL, False otherwise.
Source code in src/horde_model_reference/analytics/base_cache.py
| def is_fresh(
self,
category: MODEL_REFERENCE_CATEGORY,
grouped: bool = False,
include_backend_variations: bool = False,
) -> bool:
"""Check if cached data is fresh (within normal TTL).
Useful for determining if background hydration should run.
Args:
category: The model reference category.
grouped: Whether to check grouped text models variant.
include_backend_variations: Whether backend variations are included.
Returns:
True if fresh data exists within TTL, False otherwise.
"""
cache_key = self._build_cache_key(category, grouped, include_backend_variations)
# Check Redis TTL
if self._redis_client:
try:
redis_key = self._get_redis_key(cache_key)
ttl = self._redis_client.ttl(redis_key)
if ttl > 0:
return True
except Exception as e:
logger.warning(f"Failed to check Redis TTL for {cache_key}: {e}")
# Check in-memory
with self._lock:
if cache_key in self._cache:
age = time.time() - self._timestamps.get(cache_key, 0)
return age < self._get_ttl()
return False
|
set
set(
category: MODEL_REFERENCE_CATEGORY,
result: T,
grouped: bool = False,
include_backend_variations: bool = False,
) -> None
Store result in cache.
Stores in both Redis (if available) and in-memory cache.
Parameters:
-
category
(MODEL_REFERENCE_CATEGORY)
–
The model reference category.
-
result
(T)
–
The computed result to cache.
-
grouped
(bool, default:
False
)
–
Whether this is the grouped text models variant.
-
include_backend_variations
(bool, default:
False
)
–
Whether backend variations are included.
Source code in src/horde_model_reference/analytics/base_cache.py
| def set(
self,
category: MODEL_REFERENCE_CATEGORY,
result: T,
grouped: bool = False,
include_backend_variations: bool = False,
) -> None:
"""Store result in cache.
Stores in both Redis (if available) and in-memory cache.
Args:
category: The model reference category.
result: The computed result to cache.
grouped: Whether this is the grouped text models variant.
include_backend_variations: Whether backend variations are included.
"""
cache_key = self._build_cache_key(category, grouped, include_backend_variations)
# Store in Redis
if self._redis_client:
try:
redis_key = self._get_redis_key(cache_key)
serialized = result.model_dump_json()
self._redis_client.setex(redis_key, self._get_ttl(), serialized)
logger.debug(f"Stored in Redis: {cache_key}")
except Exception as e:
logger.warning(f"Failed to store in Redis for {cache_key}: {e}")
# Store in-memory (always, as fallback)
with self._lock:
self._cache[cache_key] = result
self._timestamps[cache_key] = time.time()
logger.debug(f"Stored in memory: {cache_key}")
|
invalidate
invalidate(
category: MODEL_REFERENCE_CATEGORY,
grouped: bool | None = None,
include_backend_variations: bool | None = None,
) -> None
Invalidate cached results for a category.
Removes from both Redis and in-memory cache. If grouped is None,
invalidates all grouped/ungrouped variants. If include_backend_variations
is None, invalidates all variation states.
Parameters:
-
category
(MODEL_REFERENCE_CATEGORY)
–
The model reference category to invalidate.
-
grouped
(bool | None, default:
None
)
–
Whether to invalidate grouped variant (None = both).
-
include_backend_variations
(bool | None, default:
None
)
–
Whether to invalidate variation states (None = both).
Source code in src/horde_model_reference/analytics/base_cache.py
| def invalidate(
self,
category: MODEL_REFERENCE_CATEGORY,
grouped: bool | None = None,
include_backend_variations: bool | None = None,
) -> None:
"""Invalidate cached results for a category.
Removes from both Redis and in-memory cache. If grouped is None,
invalidates all grouped/ungrouped variants. If include_backend_variations
is None, invalidates all variation states.
Args:
category: The model reference category to invalidate.
grouped: Whether to invalidate grouped variant (None = both).
include_backend_variations: Whether to invalidate variation states (None = both).
"""
# Determine which variants to invalidate
grouped_variants = [False, True] if grouped is None else [grouped]
variations_variants = [False, True] if include_backend_variations is None else [include_backend_variations]
for gv in grouped_variants:
for vv in variations_variants:
cache_key = self._build_cache_key(category, gv, vv)
logger.debug(f"Invalidating cache: {cache_key}")
# Invalidate Redis
if self._redis_client:
try:
redis_key = self._get_redis_key(cache_key)
deleted_count = self._redis_client.delete(redis_key)
if deleted_count > 0:
logger.debug(f"Deleted Redis key: {redis_key}")
except Exception as e:
logger.warning(f"Failed to delete from Redis for {cache_key}: {e}")
# Invalidate in-memory
with self._lock:
removed = self._cache.pop(cache_key, None) is not None
self._timestamps.pop(cache_key, None)
if removed:
logger.debug(f"Removed from memory cache: {cache_key}")
|
clear_all
Clear all cached results.
Useful for testing or when a global cache reset is needed.
Source code in src/horde_model_reference/analytics/base_cache.py
| def clear_all(self) -> None:
"""Clear all cached results.
Useful for testing or when a global cache reset is needed.
"""
logger.info(f"Clearing all {self.__class__.__name__} entries")
# Clear Redis
if self._redis_client:
try:
for category in MODEL_REFERENCE_CATEGORY:
for grouped in [False, True]:
for variations in [False, True]:
cache_key = self._build_cache_key(category, grouped, variations)
redis_key = self._get_redis_key(cache_key)
self._redis_client.delete(redis_key)
logger.debug("Cleared all Redis keys")
except Exception as e:
logger.warning(f"Failed to clear Redis cache: {e}")
# Clear in-memory
with self._lock:
self._cache.clear()
self._timestamps.clear()
logger.debug("Cleared in-memory cache")
|
get_cache_info
get_cache_info() -> dict[
str, int | float | bool | list[str]
]
Get information about the current cache state.
Returns:
Source code in src/horde_model_reference/analytics/base_cache.py
| def get_cache_info(self) -> dict[str, int | float | bool | list[str]]:
"""Get information about the current cache state.
Returns:
Dictionary with cache statistics including size, Redis status, TTL.
"""
with self._lock:
return {
"cache_size": len(self._cache),
"redis_enabled": self._redis_client is not None,
"ttl_seconds": self._get_ttl(),
"keys_cached": list(self._cache.keys()),
}
|