Skip to content

statistics_cache

Caching layer for category statistics with Redis support.

Provides a singleton cache for CategoryStatistics that integrates with the backend invalidation system. Automatically invalidates when model reference data changes.

StatisticsCache

Bases: RedisCache[CategoryStatistics]

Singleton cache for category statistics.

Integrates with existing backend invalidation system to automatically clear statistics when model data changes. Uses Redis for distributed caching when available, with in-memory fallback.

Inherits from RedisCache[CategoryStatistics] for common caching infrastructure.

Source code in src/horde_model_reference/analytics/statistics_cache.py
class StatisticsCache(RedisCache[CategoryStatistics]):
    """Process-wide singleton that caches ``CategoryStatistics`` results.

    The cache hooks into the backend invalidation system so that statistics
    are dropped whenever the underlying model reference data changes. Redis is
    used for distributed caching when available, with an in-memory fallback.

    Storage, lookup and expiry come from RedisCache[CategoryStatistics]; this
    subclass supplies the key prefix, the TTL, the model class and the
    invalidation wiring.
    """

    _instance: StatisticsCache | None = None

    def _get_cache_key_prefix(self) -> str:
        """Build the Redis key prefix used for statistics entries.

        Returns:
            The configured Redis key prefix followed by ``:stats``.

        """
        base_prefix = horde_model_reference_settings.redis.key_prefix
        return f"{base_prefix}:stats"

    def _get_ttl(self) -> int:
        """Look up how long statistics entries stay fresh.

        Returns:
            The ``statistics_cache_ttl`` setting, in seconds.

        """
        ttl_seconds = horde_model_reference_settings.statistics_cache_ttl
        return ttl_seconds

    def _get_model_class(self) -> type[CategoryStatistics]:
        """Provide the Pydantic model class used to rebuild cached entries.

        Returns:
            The CategoryStatistics class.

        """
        # Imported at call time rather than module load time
        # (presumably to avoid a circular import - TODO confirm).
        from horde_model_reference.analytics.statistics import CategoryStatistics as statistics_model

        return statistics_model

    def _register_invalidation_callback(self) -> None:
        """Hook this cache into the ModelReferenceManager backend's invalidation events.

        Registration is best-effort: any failure is logged and the cache then
        relies on TTL-based expiration only.
        """
        try:
            from horde_model_reference import ModelReferenceManager

            backend = ModelReferenceManager().backend

            # Not every backend exposes the registration hook.
            if not hasattr(backend, "register_invalidation_callback"):
                logger.warning(f"Backend {type(backend).__name__} does not support invalidation callbacks")
                return

            backend.register_invalidation_callback(self._on_category_invalidated)
            logger.info("StatisticsCache registered invalidation callback with backend")
        except Exception as e:
            logger.warning(f"Failed to register invalidation callback: {e}")
            logger.info("Statistics cache will rely on TTL-based expiration only")

    def _on_category_invalidated(self, category: MODEL_REFERENCE_CATEGORY) -> None:
        """Drop cached statistics for a category whose reference data changed.

        Both the grouped and the ungrouped variants of the category are removed.

        Args:
            category: The category that was invalidated.

        """
        logger.debug(f"Invalidating statistics cache for category: {category}")
        # grouped=None requests removal of the grouped and the ungrouped entries alike.
        self.invalidate(category, grouped=None)

_instance class-attribute instance-attribute

_instance: StatisticsCache | None = None

_lock class-attribute instance-attribute

_lock: RLock = RLock()

_cache instance-attribute

_cache: dict[str, T]

_timestamps instance-attribute

_timestamps: dict[str, float]

_redis_client instance-attribute

_redis_client: Redis[bytes] | None

_redis_key_prefix instance-attribute

_redis_key_prefix: str

_get_cache_key_prefix

_get_cache_key_prefix() -> str

Get the Redis key prefix for statistics cache.

Returns:

  • str

    Redis key prefix string.

Source code in src/horde_model_reference/analytics/statistics_cache.py
def _get_cache_key_prefix(self) -> str:
    """Build the Redis key prefix used for statistics entries.

    Returns:
        The configured Redis key prefix followed by ``:stats``.

    """
    base_prefix = horde_model_reference_settings.redis.key_prefix
    return f"{base_prefix}:stats"

_get_ttl

_get_ttl() -> int

Get the TTL in seconds for statistics cache entries.

Returns:

  • int

    TTL in seconds from settings.

Source code in src/horde_model_reference/analytics/statistics_cache.py
def _get_ttl(self) -> int:
    """Look up how long statistics entries stay fresh.

    Returns:
        The ``statistics_cache_ttl`` setting, in seconds.

    """
    ttl_seconds = horde_model_reference_settings.statistics_cache_ttl
    return ttl_seconds

_get_model_class

_get_model_class() -> type[CategoryStatistics]

Get the Pydantic model class for deserialization.

Returns:

  • type[CategoryStatistics]

    CategoryStatistics class.

Source code in src/horde_model_reference/analytics/statistics_cache.py
def _get_model_class(self) -> type[CategoryStatistics]:
    """Provide the Pydantic model class used to rebuild cached entries.

    Returns:
        The CategoryStatistics class.

    """
    # Imported at call time rather than module load time
    # (presumably to avoid a circular import - TODO confirm).
    from horde_model_reference.analytics.statistics import CategoryStatistics as statistics_model

    return statistics_model

_register_invalidation_callback

_register_invalidation_callback() -> None

Register callback with ModelReferenceManager backend for automatic invalidation.

Source code in src/horde_model_reference/analytics/statistics_cache.py
def _register_invalidation_callback(self) -> None:
    """Hook this cache into the ModelReferenceManager backend's invalidation events.

    Registration is best-effort: any failure is logged and the cache then
    relies on TTL-based expiration only.
    """
    try:
        from horde_model_reference import ModelReferenceManager

        backend = ModelReferenceManager().backend

        # Not every backend exposes the registration hook.
        if not hasattr(backend, "register_invalidation_callback"):
            logger.warning(f"Backend {type(backend).__name__} does not support invalidation callbacks")
            return

        backend.register_invalidation_callback(self._on_category_invalidated)
        logger.info("StatisticsCache registered invalidation callback with backend")
    except Exception as e:
        logger.warning(f"Failed to register invalidation callback: {e}")
        logger.info("Statistics cache will rely on TTL-based expiration only")

_on_category_invalidated

_on_category_invalidated(
    category: MODEL_REFERENCE_CATEGORY,
) -> None

Invalidate statistics cache when model reference data changes.

Invalidates both grouped and ungrouped variants for the category.

Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    The category that was invalidated.

Source code in src/horde_model_reference/analytics/statistics_cache.py
def _on_category_invalidated(self, category: MODEL_REFERENCE_CATEGORY) -> None:
    """Drop cached statistics for a category whose reference data changed.

    Both the grouped and the ungrouped variants of the category are removed.

    Args:
        category: The category that was invalidated.

    """
    logger.debug(f"Invalidating statistics cache for category: {category}")
    # grouped=None requests removal of the grouped and the ungrouped entries alike.
    self.invalidate(category, grouped=None)

__new__

__new__() -> RedisCache[T]

Singleton pattern matching ModelReferenceManager.

Source code in src/horde_model_reference/analytics/base_cache.py
def __new__(cls) -> RedisCache[T]:
    """Return the singleton instance, creating and initializing it on first use.

    Singleton pattern matching ModelReferenceManager. Creation happens while
    holding ``cls._lock``. If ``_initialize()`` raises, the partially
    initialized instance is discarded so that a later call retries
    construction instead of handing out a broken object.

    Returns:
        The single shared instance for ``cls``.

    """
    with cls._lock:
        # Compare against None explicitly so the check does not depend on the
        # instance's truthiness.
        if cls._instance is None:
            instance = super().__new__(cls)
            # Publish before initializing, as before, so a construction
            # triggered from within _initialize() gets this same instance.
            cls._instance = instance
            try:
                instance._initialize()
            except BaseException:
                # Roll back: never keep a half-initialized singleton around.
                cls._instance = None
                raise
        return cls._instance

_initialize

_initialize() -> None

Initialize caching infrastructure.

Source code in src/horde_model_reference/analytics/base_cache.py
def _initialize(self) -> None:
    """Prepare in-memory storage, optionally connect to Redis, and register invalidation.

    The Redis connection is attempted only when the ``redis.use_redis`` setting
    is enabled. Any failure while importing the client, connecting, or pinging
    is logged and leaves the cache running in memory only.
    """
    cache_name = self.__class__.__name__

    self._cache = {}
    self._timestamps = {}
    self._redis_client = None
    self._redis_key_prefix = self._get_cache_key_prefix()

    logger.debug(f"Initializing {cache_name} with TTL={self._get_ttl()}s")

    redis_settings = horde_model_reference_settings.redis
    if not redis_settings.use_redis:
        logger.info(f"{cache_name} using in-memory cache (Redis disabled)")
    else:
        try:
            import redis

            self._redis_client = redis.from_url(
                redis_settings.url,
                socket_timeout=redis_settings.socket_timeout,
                decode_responses=False,
            )
            # Ping so an unreachable server is detected now rather than on first use.
            self._redis_client.ping()
            logger.info(f"{cache_name} using Redis for distributed caching")
        except Exception as e:
            logger.warning(f"Failed to connect to Redis for {cache_name}: {e}")
            logger.info(f"{cache_name} falling back to in-memory cache")
            self._redis_client = None

    # Subclasses decide how (and whether) to hook into invalidation events.
    self._register_invalidation_callback()

_build_cache_key

_build_cache_key(
    category: MODEL_REFERENCE_CATEGORY,
    grouped: bool = False,
    include_backend_variations: bool = False,
) -> str

Build cache key from category and options.

Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    The model reference category.

  • grouped (bool, default: False ) –

    Whether this is for grouped text models.

  • include_backend_variations (bool, default: False ) –

    Whether backend variations are included.

Returns:

  • str

    Cache key string.

Source code in src/horde_model_reference/analytics/base_cache.py
def _build_cache_key(
    self,
    category: MODEL_REFERENCE_CATEGORY,
    grouped: bool = False,
    include_backend_variations: bool = False,
) -> str:
    """Compose the cache key for a category and its option flags.

    The key is the category's value followed by ``:grouped=true`` or
    ``:grouped=false``; ``:variations=true`` is appended only when backend
    variations are included.

    Args:
        category: The model reference category.
        grouped: Whether this is for grouped text models.
        include_backend_variations: Whether backend variations are included.

    Returns:
        Cache key string.

    """
    grouped_flag = "true" if grouped else "false"
    key = f"{category.value}:grouped={grouped_flag}"
    if include_backend_variations:
        key += ":variations=true"
    return key

_get_redis_key

_get_redis_key(cache_key: str) -> str

Generate Redis key from cache key.

Parameters:

  • cache_key (str) –

    The cache key (category + grouping state).

Returns:

  • str

    Full Redis key string with prefix.

Source code in src/horde_model_reference/analytics/base_cache.py
def _get_redis_key(self, cache_key: str) -> str:
    """Turn a cache key into the full Redis key.

    Args:
        cache_key: The cache key (category + grouping state).

    Returns:
        The instance's Redis key prefix and the cache key, joined by a colon.

    """
    return "{}:{}".format(self._redis_key_prefix, cache_key)

get

get(
    category: MODEL_REFERENCE_CATEGORY,
    grouped: bool = False,
    include_backend_variations: bool = False,
    allow_stale: bool | None = None,
) -> T | None

Get cached result for a category.

Checks Redis first (if available), then in-memory cache.

When cache hydration is enabled (settings.cache_hydration_enabled=True) and allow_stale is True (or None with hydration enabled), implements stale-while-revalidate:

  • Returns cached data even if past normal TTL
  • Only returns None if data exceeds stale_ttl (default 1 hour)
  • Background hydration is expected to refresh data before stale_ttl

Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    The model reference category.

  • grouped (bool, default: False ) –

    Whether to get grouped text models variant.

  • include_backend_variations (bool, default: False ) –

    Whether backend variations are included.

  • allow_stale (bool | None, default: None ) –

    Whether to return stale data beyond normal TTL. If None, defaults to True when hydration is enabled, False otherwise.

Returns:

  • T | None

    Cached result or None if not cached or expired beyond stale TTL.

Source code in src/horde_model_reference/analytics/base_cache.py
def get(
    self,
    category: MODEL_REFERENCE_CATEGORY,
    grouped: bool = False,
    include_backend_variations: bool = False,
    allow_stale: bool | None = None,
) -> T | None:
    """Look up the cached result for a category.

    Redis is consulted first (when a client is configured); a Redis hit is
    returned as-is. Otherwise the in-memory cache is checked:

    - An entry younger than the normal TTL is always returned.
    - An older entry is still returned when stale reads are allowed and the
      entry is younger than the ``cache_hydration_stale_ttl_seconds`` setting
      (stale-while-revalidate; background hydration is expected to refresh it).
    - Any other entry is evicted from memory and treated as a miss.

    Args:
        category: The model reference category.
        grouped: Whether to get grouped text models variant.
        include_backend_variations: Whether backend variations are included.
        allow_stale: Whether to return stale data beyond normal TTL.
            If None, follows the ``cache_hydration_enabled`` setting.

    Returns:
        Cached result or None if not cached or expired beyond stale TTL.

    """
    key = self._build_cache_key(category, grouped, include_backend_variations)
    cache_name = self.__class__.__name__

    # Resolve whether stale entries may be served and for how long.
    hydration_enabled = horde_model_reference_settings.cache_hydration_enabled
    stale_ok = hydration_enabled if allow_stale is None else allow_stale
    max_stale_age = horde_model_reference_settings.cache_hydration_stale_ttl_seconds

    # Redis takes precedence; any Redis error degrades to the in-memory lookup.
    if self._redis_client:
        try:
            payload = self._redis_client.get(self._get_redis_key(key))

            if payload:
                model_class = self._get_model_class()
                fields = json.loads(payload.decode("utf-8"))
                entry = model_class(**fields)
                logger.debug(f"{cache_name} cache hit (Redis): {key}")
                return entry
        except Exception as e:
            logger.warning(f"Failed to get from Redis for {key}: {e}")

    with self._lock:
        if key in self._cache:
            # A missing timestamp counts as age "since epoch", i.e. expired.
            age = time.time() - self._timestamps.get(key, 0)

            ttl = self._get_ttl()

            within_ttl = age < ttl
            if within_ttl or (stale_ok and age < max_stale_age):
                if within_ttl:
                    logger.debug(f"{cache_name} cache hit (memory): {key}")
                else:
                    logger.debug(
                        f"{cache_name} returning stale data (memory): {key}, "
                        f"age={age:.1f}s (TTL={ttl}s, stale_ttl={max_stale_age}s)"
                    )
                return self._cache[key]

            # Too old to serve: evict so the entry does not linger.
            logger.debug(
                f"{cache_name} cache expired (memory): {key}, "
                f"age={age:.1f}s (stale_allowed={stale_ok})"
            )
            self._cache.pop(key, None)
            self._timestamps.pop(key, None)

    logger.debug(f"{cache_name} cache miss: {key}")
    return None

is_fresh

is_fresh(
    category: MODEL_REFERENCE_CATEGORY,
    grouped: bool = False,
    include_backend_variations: bool = False,
) -> bool

Check if cached data is fresh (within normal TTL).

Useful for determining if background hydration should run.

Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    The model reference category.

  • grouped (bool, default: False ) –

    Whether to check grouped text models variant.

  • include_backend_variations (bool, default: False ) –

    Whether backend variations are included.

Returns:

  • bool

    True if fresh data exists within TTL, False otherwise.

Source code in src/horde_model_reference/analytics/base_cache.py
def is_fresh(
    self,
    category: MODEL_REFERENCE_CATEGORY,
    grouped: bool = False,
    include_backend_variations: bool = False,
) -> bool:
    """Report whether a cached entry is still within its normal TTL.

    Useful for determining if background hydration should run. A positive
    remaining TTL in Redis counts as fresh; otherwise the age of the in-memory
    entry is compared against the TTL.

    Args:
        category: The model reference category.
        grouped: Whether to check grouped text models variant.
        include_backend_variations: Whether backend variations are included.

    Returns:
        True if fresh data exists within TTL, False otherwise.

    """
    key = self._build_cache_key(category, grouped, include_backend_variations)

    client = self._redis_client
    if client:
        try:
            remaining = client.ttl(self._get_redis_key(key))
            if remaining > 0:
                return True
        except Exception as e:
            logger.warning(f"Failed to check Redis TTL for {key}: {e}")

    # Fall back to the in-memory copy.
    with self._lock:
        if key not in self._cache:
            return False
        age = time.time() - self._timestamps.get(key, 0)
        return age < self._get_ttl()

set

set(
    category: MODEL_REFERENCE_CATEGORY,
    result: T,
    grouped: bool = False,
    include_backend_variations: bool = False,
) -> None

Store result in cache.

Stores in both Redis (if available) and in-memory cache.

Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    The model reference category.

  • result (T) –

    The computed result to cache.

  • grouped (bool, default: False ) –

    Whether this is the grouped text models variant.

  • include_backend_variations (bool, default: False ) –

    Whether backend variations are included.

Source code in src/horde_model_reference/analytics/base_cache.py
def set(
    self,
    category: MODEL_REFERENCE_CATEGORY,
    result: T,
    grouped: bool = False,
    include_backend_variations: bool = False,
) -> None:
    """Cache a computed result.

    The result is written to Redis (when a client is configured) with the
    cache TTL, and always to the in-memory cache as well. A Redis failure is
    logged and does not prevent the in-memory write.

    Args:
        category: The model reference category.
        result: The computed result to cache.
        grouped: Whether this is the grouped text models variant.
        include_backend_variations: Whether backend variations are included.

    """
    key = self._build_cache_key(category, grouped, include_backend_variations)

    client = self._redis_client
    if client:
        try:
            redis_key = self._get_redis_key(key)
            payload = result.model_dump_json()
            # setex stores the value together with its expiry.
            client.setex(redis_key, self._get_ttl(), payload)
            logger.debug(f"Stored in Redis: {key}")
        except Exception as e:
            logger.warning(f"Failed to store in Redis for {key}: {e}")

    # The in-memory copy is always kept as a fallback.
    with self._lock:
        self._cache[key] = result
        self._timestamps[key] = time.time()
        logger.debug(f"Stored in memory: {key}")

invalidate

invalidate(
    category: MODEL_REFERENCE_CATEGORY,
    grouped: bool | None = None,
    include_backend_variations: bool | None = None,
) -> None

Invalidate cached results for a category.

Removes from both Redis and in-memory cache. If grouped is None, invalidates all grouped/ungrouped variants. If include_backend_variations is None, invalidates all variation states.

Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    The model reference category to invalidate.

  • grouped (bool | None, default: None ) –

    Whether to invalidate grouped variant (None = both).

  • include_backend_variations (bool | None, default: None ) –

    Whether to invalidate variation states (None = both).

Source code in src/horde_model_reference/analytics/base_cache.py
def invalidate(
    self,
    category: MODEL_REFERENCE_CATEGORY,
    grouped: bool | None = None,
    include_backend_variations: bool | None = None,
) -> None:
    """Remove cached results for a category from Redis and from memory.

    Each flag left as None expands to both of its states, so calling with the
    defaults removes every grouped/variations combination for the category.

    Args:
        category: The model reference category to invalidate.
        grouped: Whether to invalidate grouped variant (None = both).
        include_backend_variations: Whether to invalidate variation states (None = both).

    """
    grouped_states = [grouped] if grouped is not None else [False, True]
    variation_states = (
        [include_backend_variations] if include_backend_variations is not None else [False, True]
    )

    cache_keys = [
        self._build_cache_key(category, grouped_state, variation_state)
        for grouped_state in grouped_states
        for variation_state in variation_states
    ]

    for key in cache_keys:
        logger.debug(f"Invalidating cache: {key}")

        # Redis errors are logged; the in-memory entry is removed regardless.
        if self._redis_client:
            try:
                redis_key = self._get_redis_key(key)
                if self._redis_client.delete(redis_key) > 0:
                    logger.debug(f"Deleted Redis key: {redis_key}")
            except Exception as e:
                logger.warning(f"Failed to delete from Redis for {key}: {e}")

        with self._lock:
            removed_entry = self._cache.pop(key, None)
            self._timestamps.pop(key, None)
            if removed_entry is not None:
                logger.debug(f"Removed from memory cache: {key}")

clear_all

clear_all() -> None

Clear all cached results.

Useful for testing or when a global cache reset is needed.

Source code in src/horde_model_reference/analytics/base_cache.py
def clear_all(self) -> None:
    """Clear all cached results.

    Useful for testing or when a global cache reset is needed. For Redis, the
    keys for every category and every grouped/variations combination are
    removed with a single DELETE call; a Redis failure is logged and does not
    prevent the in-memory cache from being cleared.
    """
    logger.info(f"Clearing all {self.__class__.__name__} entries")

    # Clear Redis
    if self._redis_client:
        try:
            redis_keys = [
                self._get_redis_key(self._build_cache_key(category, grouped, variations))
                for category in MODEL_REFERENCE_CATEGORY
                for grouped in (False, True)
                for variations in (False, True)
            ]
            # One round trip for all keys instead of one per key. DELETE
            # rejects an empty argument list, so skip it when nothing is listed.
            if redis_keys:
                self._redis_client.delete(*redis_keys)
            logger.debug("Cleared all Redis keys")
        except Exception as e:
            logger.warning(f"Failed to clear Redis cache: {e}")

    # Clear in-memory
    with self._lock:
        self._cache.clear()
        self._timestamps.clear()
        logger.debug("Cleared in-memory cache")

get_cache_info

get_cache_info() -> dict[
    str, int | float | bool | list[str]
]

Get information about the current cache state.

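Returns:

  • dict[str, int | float | bool | list[str]]

    Dictionary with cache statistics including size, Redis status, TTL.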

Source code in src/horde_model_reference/analytics/base_cache.py
def get_cache_info(self) -> dict[str, int | float | bool | list[str]]:
    """Summarize the current state of the cache.

    Returns:
        Dictionary with the number of in-memory entries (``cache_size``),
        whether a Redis client is attached (``redis_enabled``), the TTL in
        seconds (``ttl_seconds``) and the in-memory keys (``keys_cached``).

    """
    # Take the snapshot under the lock so size and key list agree.
    with self._lock:
        info: dict[str, int | float | bool | list[str]] = {}
        info["cache_size"] = len(self._cache)
        info["redis_enabled"] = self._redis_client is not None
        info["ttl_seconds"] = self._get_ttl()
        info["keys_cached"] = list(self._cache.keys())
    return info