Skip to content

deletion_risk_cache

Caching layer for category deletion risk results with Redis support.

Provides a singleton cache for CategoryDeletionRiskResponse that integrates with the backend invalidation system. Automatically invalidates when model reference data changes.

DeletionRiskCache

Bases: RedisCache[CategoryDeletionRiskResponse]

Singleton cache for category deletion risk results.

Integrates with existing backend invalidation system to automatically clear deletion risk results when model data changes. Uses Redis for distributed caching when available, with in-memory fallback.

Inherits from RedisCache[CategoryDeletionRiskResponse] for common caching infrastructure.

Source code in src/horde_model_reference/analytics/deletion_risk_cache.py
class DeletionRiskCache(RedisCache[CategoryDeletionRiskResponse]):
    """Singleton cache holding per-category deletion risk results.

    Specializes ``RedisCache`` for ``CategoryDeletionRiskResponse`` by supplying
    the Redis key prefix, the entry TTL and the model class used for
    deserialization. It also registers a callback with the model reference
    backend so that a category's cached results are dropped when that category
    is invalidated.
    """

    _instance: DeletionRiskCache | None = None

    def _get_cache_key_prefix(self) -> str:
        """Build the Redis key prefix used for deletion risk entries.

        Returns:
            The configured Redis key prefix followed by ``:deletion_risk``.

        """
        prefix = f"{horde_model_reference_settings.redis.key_prefix}:deletion_risk"
        return prefix

    def _get_ttl(self) -> int:
        """Look up how long deletion risk entries stay fresh.

        Returns:
            The ``deletion_risk_cache_ttl`` setting, in seconds.

        """
        ttl_seconds = horde_model_reference_settings.deletion_risk_cache_ttl
        return ttl_seconds

    def _get_model_class(self) -> type[CategoryDeletionRiskResponse]:
        """Provide the Pydantic model class used to rebuild cached entries.

        Returns:
            The ``CategoryDeletionRiskResponse`` class.

        """
        # Imported at call time rather than module import time.
        from horde_model_reference.analytics.deletion_risk_analysis import (
            CategoryDeletionRiskResponse as response_model,
        )

        return response_model

    def _register_invalidation_callback(self) -> None:
        """Hook this cache into the ModelReferenceManager backend's invalidation callbacks.

        Registration is best-effort: if the backend lacks the hook, or anything
        raises, the problem is logged and nothing propagates to the caller.
        """
        try:
            from horde_model_reference import ModelReferenceManager

            manager = ModelReferenceManager()
            if not hasattr(manager.backend, "register_invalidation_callback"):
                logger.warning(f"Backend {type(manager.backend).__name__} does not support invalidation callbacks")
                return

            manager.backend.register_invalidation_callback(self._on_category_invalidated)
            logger.info("DeletionRiskCache registered invalidation callback with backend")
        except Exception as e:
            # Without the callback, entries only go away through TTL expiry.
            logger.warning(f"Failed to register invalidation callback: {e}")
            logger.info("Deletion risk cache will rely on TTL-based expiration only")

    def _on_category_invalidated(self, category: MODEL_REFERENCE_CATEGORY) -> None:
        """Drop cached deletion risk results for a category that was invalidated.

        Args:
            category: The category that was invalidated.

        """
        logger.debug(f"Invalidating deletion risk cache for category: {category}")
        # grouped=None asks invalidate() to cover both grouped and ungrouped entries.
        self.invalidate(category, grouped=None)

_instance class-attribute instance-attribute

_instance: DeletionRiskCache | None = None

_lock class-attribute instance-attribute

_lock: RLock = RLock()

_cache instance-attribute

_cache: dict[str, T]

_timestamps instance-attribute

_timestamps: dict[str, float]

_redis_client instance-attribute

_redis_client: Redis[bytes] | None

_redis_key_prefix instance-attribute

_redis_key_prefix: str

_get_cache_key_prefix

_get_cache_key_prefix() -> str

Get the Redis key prefix for deletion risk cache.

Returns:

  • str

    Redis key prefix string.

Source code in src/horde_model_reference/analytics/deletion_risk_cache.py
def _get_cache_key_prefix(self) -> str:
    """Build the Redis key prefix used for deletion risk entries.

    Returns:
        The configured Redis key prefix followed by ``:deletion_risk``.

    """
    prefix = f"{horde_model_reference_settings.redis.key_prefix}:deletion_risk"
    return prefix

_get_ttl

_get_ttl() -> int

Get the TTL in seconds for deletion risk cache entries.

Returns:

  • int

    TTL in seconds from settings.

Source code in src/horde_model_reference/analytics/deletion_risk_cache.py
def _get_ttl(self) -> int:
    """Look up how long deletion risk entries stay fresh.

    Returns:
        The ``deletion_risk_cache_ttl`` setting, in seconds.

    """
    ttl_seconds = horde_model_reference_settings.deletion_risk_cache_ttl
    return ttl_seconds

_get_model_class

_get_model_class() -> type[CategoryDeletionRiskResponse]

Get the Pydantic model class for deserialization.

Returns:

  • type[CategoryDeletionRiskResponse]

    CategoryDeletionRiskResponse class.

Source code in src/horde_model_reference/analytics/deletion_risk_cache.py
def _get_model_class(self) -> type[CategoryDeletionRiskResponse]:
    """Provide the Pydantic model class used to rebuild cached entries.

    Returns:
        The ``CategoryDeletionRiskResponse`` class.

    """
    # Imported at call time rather than module import time.
    from horde_model_reference.analytics.deletion_risk_analysis import (
        CategoryDeletionRiskResponse as response_model,
    )

    return response_model

_register_invalidation_callback

_register_invalidation_callback() -> None

Register callback with ModelReferenceManager backend for automatic invalidation.

Source code in src/horde_model_reference/analytics/deletion_risk_cache.py
def _register_invalidation_callback(self) -> None:
    """Hook this cache into the ModelReferenceManager backend's invalidation callbacks.

    Registration is best-effort: if the backend lacks the hook, or anything
    raises, the problem is logged and nothing propagates to the caller.
    """
    try:
        from horde_model_reference import ModelReferenceManager

        manager = ModelReferenceManager()
        if not hasattr(manager.backend, "register_invalidation_callback"):
            logger.warning(f"Backend {type(manager.backend).__name__} does not support invalidation callbacks")
            return

        manager.backend.register_invalidation_callback(self._on_category_invalidated)
        logger.info("DeletionRiskCache registered invalidation callback with backend")
    except Exception as e:
        # Without the callback, entries only go away through TTL expiry.
        logger.warning(f"Failed to register invalidation callback: {e}")
        logger.info("Deletion risk cache will rely on TTL-based expiration only")

_on_category_invalidated

_on_category_invalidated(
    category: MODEL_REFERENCE_CATEGORY,
) -> None

Invalidate deletion risk cache when model reference data changes.

Invalidates both grouped and ungrouped variants for the category.

Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    The category that was invalidated.

Source code in src/horde_model_reference/analytics/deletion_risk_cache.py
def _on_category_invalidated(self, category: MODEL_REFERENCE_CATEGORY) -> None:
    """Drop cached deletion risk results for a category that was invalidated.

    Args:
        category: The category that was invalidated.

    """
    logger.debug(f"Invalidating deletion risk cache for category: {category}")
    # grouped=None asks invalidate() to cover both grouped and ungrouped entries.
    self.invalidate(category, grouped=None)

__new__

__new__() -> RedisCache[T]

Singleton pattern matching ModelReferenceManager.

Source code in src/horde_model_reference/analytics/base_cache.py
def __new__(cls) -> RedisCache[T]:
    """Return the singleton instance for ``cls``, creating it on first use.

    Creation and initialization run under the class-level lock. If
    ``_initialize`` raises, the partially built instance is discarded so that a
    later call retries instead of handing out an object with missing attributes.

    Returns:
        The singleton instance for ``cls``.

    Raises:
        Exception: Whatever ``_initialize`` raises on first construction.

    """
    with cls._lock:
        # Identity check: only "never created" should trigger construction.
        if cls._instance is None:
            instance = super().__new__(cls)
            # Publish before initializing, as the original did, so a re-entrant
            # construction on the same thread (the lock is an RLock) receives
            # this instance rather than starting a second initialization.
            cls._instance = instance
            try:
                instance._initialize()
            except BaseException:
                # Do not keep a half-initialized singleton around.
                cls._instance = None
                raise
        return cls._instance

_initialize

_initialize() -> None

Initialize caching infrastructure.

Source code in src/horde_model_reference/analytics/base_cache.py
def _initialize(self) -> None:
    """Prepare in-memory storage, optionally connect to Redis, and register invalidation.

    A Redis connection is attempted only when the ``redis.use_redis`` setting is
    on. Any failure while importing, connecting or pinging is logged and leaves
    the cache running on the in-memory store alone.
    """
    self._cache, self._timestamps = {}, {}
    self._redis_client = None
    self._redis_key_prefix = self._get_cache_key_prefix()

    logger.debug(f"Initializing {self.__class__.__name__} with TTL={self._get_ttl()}s")

    if not horde_model_reference_settings.redis.use_redis:
        logger.info(f"{self.__class__.__name__} using in-memory cache (Redis disabled)")
    else:
        try:
            import redis

            redis_settings = horde_model_reference_settings.redis
            self._redis_client = redis.from_url(
                redis_settings.url,
                socket_timeout=redis_settings.socket_timeout,
                decode_responses=False,
            )
            # Ping so an unreachable server is detected here, not on first use.
            self._redis_client.ping()
            logger.info(f"{self.__class__.__name__} using Redis for distributed caching")
        except Exception as e:
            logger.warning(f"Failed to connect to Redis for {self.__class__.__name__}: {e}")
            logger.info(f"{self.__class__.__name__} falling back to in-memory cache")
            self._redis_client = None

    # Subclass hook that wires up automatic invalidation.
    self._register_invalidation_callback()

_build_cache_key

_build_cache_key(
    category: MODEL_REFERENCE_CATEGORY,
    grouped: bool = False,
    include_backend_variations: bool = False,
) -> str

Build cache key from category and options.

Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    The model reference category.

  • grouped (bool, default: False ) –

    Whether this is for grouped text models.

  • include_backend_variations (bool, default: False ) –

    Whether backend variations are included.

Returns:

  • str

    Cache key string.

Source code in src/horde_model_reference/analytics/base_cache.py
def _build_cache_key(
    self,
    category: MODEL_REFERENCE_CATEGORY,
    grouped: bool = False,
    include_backend_variations: bool = False,
) -> str:
    """Compose the cache key for a category and its option flags.

    The key is the category value, then a ``:grouped=...`` segment that is
    always present, then ``:variations=true`` only when backend variations are
    included.

    Args:
        category: The model reference category.
        grouped: Whether this is for grouped text models.
        include_backend_variations: Whether backend variations are included.

    Returns:
        Cache key string.

    """
    if grouped:
        group_suffix = ":grouped=true"
    else:
        group_suffix = ":grouped=false"

    key = f"{category.value}{group_suffix}"
    if include_backend_variations:
        key += ":variations=true"
    return key

_get_redis_key

_get_redis_key(cache_key: str) -> str

Generate Redis key from cache key.

Parameters:

  • cache_key (str) –

    The cache key (category + grouping state).

Returns:

  • str

    Full Redis key string with prefix.

Source code in src/horde_model_reference/analytics/base_cache.py
def _get_redis_key(self, cache_key: str) -> str:
    """Prefix a cache key with this cache's Redis key prefix.

    Args:
        cache_key: The cache key as produced by ``_build_cache_key``.

    Returns:
        The prefix and the cache key joined by a colon.

    """
    redis_key = f"{self._redis_key_prefix}:{cache_key}"
    return redis_key

get

get(
    category: MODEL_REFERENCE_CATEGORY,
    grouped: bool = False,
    include_backend_variations: bool = False,
    allow_stale: bool | None = None,
) -> T | None

Get cached result for a category.

Checks Redis first (if available), then in-memory cache.

When cache hydration is enabled (settings.cache_hydration_enabled=True) and allow_stale is True (or None with hydration enabled), implements stale-while-revalidate:

  • Returns cached data even if past normal TTL

  • Only returns None if data exceeds stale_ttl (default 1 hour)

  • Background hydration is expected to refresh data before stale_ttl

Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    The model reference category.

  • grouped (bool, default: False ) –

    Whether to get grouped text models variant.

  • include_backend_variations (bool, default: False ) –

    Whether backend variations are included.

  • allow_stale (bool | None, default: None ) –

    Whether to return stale data beyond normal TTL. If None, defaults to True when hydration is enabled, False otherwise.

Returns:

  • T | None

    Cached result or None if not cached or expired beyond stale TTL.

Source code in src/horde_model_reference/analytics/base_cache.py
def get(
    self,
    category: MODEL_REFERENCE_CATEGORY,
    grouped: bool = False,
    include_backend_variations: bool = False,
    allow_stale: bool | None = None,
) -> T | None:
    """Look up the cached result for a category.

    Redis is consulted first when a client is configured. A Redis hit is
    returned as-is. A Redis miss or error falls through to the in-memory store.

    In-memory entries younger than the TTL are always returned. Older entries
    are returned only when stale reads are allowed and the entry is younger than
    the ``cache_hydration_stale_ttl_seconds`` setting. Otherwise the entry is
    evicted and the lookup counts as a miss.

    Args:
        category: The model reference category.
        grouped: Whether to get grouped text models variant.
        include_backend_variations: Whether backend variations are included.
        allow_stale: Whether to return stale data beyond normal TTL.
            If None, follows the ``cache_hydration_enabled`` setting.

    Returns:
        Cached result, or None if nothing usable is cached.

    """
    cache_key = self._build_cache_key(category, grouped, include_backend_variations)

    # An explicit allow_stale wins; None defers to the hydration setting.
    hydration_enabled = horde_model_reference_settings.cache_hydration_enabled
    if allow_stale is None:
        effective_allow_stale = hydration_enabled
    else:
        effective_allow_stale = allow_stale
    stale_ttl = horde_model_reference_settings.cache_hydration_stale_ttl_seconds

    # Redis lookup; any failure is logged and treated as a miss.
    if self._redis_client:
        try:
            raw_payload = self._redis_client.get(self._get_redis_key(cache_key))
            if raw_payload:
                model_class = self._get_model_class()
                payload_fields = json.loads(raw_payload.decode("utf-8"))
                result = model_class(**payload_fields)
                logger.debug(f"{self.__class__.__name__} cache hit (Redis): {cache_key}")
                return result
        except Exception as e:
            logger.warning(f"Failed to get from Redis for {cache_key}: {e}")

    # In-memory lookup.
    with self._lock:
        if cache_key in self._cache:
            age = time.time() - self._timestamps.get(cache_key, 0)
            ttl = self._get_ttl()
            within_ttl = age < ttl

            if within_ttl or (effective_allow_stale and age < stale_ttl):
                if within_ttl:
                    logger.debug(f"{self.__class__.__name__} cache hit (memory): {cache_key}")
                else:
                    logger.debug(
                        f"{self.__class__.__name__} returning stale data (memory): {cache_key}, "
                        f"age={age:.1f}s (TTL={ttl}s, stale_ttl={stale_ttl}s)"
                    )
                return self._cache[cache_key]

            # Too old to serve: evict the entry and report a miss below.
            logger.debug(
                f"{self.__class__.__name__} cache expired (memory): {cache_key}, "
                f"age={age:.1f}s (stale_allowed={effective_allow_stale})"
            )
            self._cache.pop(cache_key, None)
            self._timestamps.pop(cache_key, None)

    logger.debug(f"{self.__class__.__name__} cache miss: {cache_key}")
    return None

is_fresh

is_fresh(
    category: MODEL_REFERENCE_CATEGORY,
    grouped: bool = False,
    include_backend_variations: bool = False,
) -> bool

Check if cached data is fresh (within normal TTL).

Useful for determining if background hydration should run.

Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    The model reference category.

  • grouped (bool, default: False ) –

    Whether to check grouped text models variant.

  • include_backend_variations (bool, default: False ) –

    Whether backend variations are included.

Returns:

  • bool

    True if fresh data exists within TTL, False otherwise.

Source code in src/horde_model_reference/analytics/base_cache.py
def is_fresh(
    self,
    category: MODEL_REFERENCE_CATEGORY,
    grouped: bool = False,
    include_backend_variations: bool = False,
) -> bool:
    """Report whether a cached entry exists that is still within its TTL.

    With a Redis client configured, a positive remaining TTL on the Redis key
    counts as fresh. Otherwise, or if the Redis check fails, the age of the
    in-memory entry is compared against the TTL.

    Args:
        category: The model reference category.
        grouped: Whether to check grouped text models variant.
        include_backend_variations: Whether backend variations are included.

    Returns:
        True if fresh data exists within TTL, False otherwise.

    """
    cache_key = self._build_cache_key(category, grouped, include_backend_variations)

    # Redis check; errors are logged and the in-memory check decides.
    if self._redis_client:
        try:
            remaining = self._redis_client.ttl(self._get_redis_key(cache_key))
            if remaining > 0:
                return True
        except Exception as e:
            logger.warning(f"Failed to check Redis TTL for {cache_key}: {e}")

    # In-memory check.
    with self._lock:
        if cache_key not in self._cache:
            return False
        stored_at = self._timestamps.get(cache_key, 0)
        age = time.time() - stored_at
        return age < self._get_ttl()

set

set(
    category: MODEL_REFERENCE_CATEGORY,
    result: T,
    grouped: bool = False,
    include_backend_variations: bool = False,
) -> None

Store result in cache.

Stores in both Redis (if available) and in-memory cache.

Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    The model reference category.

  • result (T) –

    The computed result to cache.

  • grouped (bool, default: False ) –

    Whether this is the grouped text models variant.

  • include_backend_variations (bool, default: False ) –

    Whether backend variations are included.

Source code in src/horde_model_reference/analytics/base_cache.py
def set(
    self,
    category: MODEL_REFERENCE_CATEGORY,
    result: T,
    grouped: bool = False,
    include_backend_variations: bool = False,
) -> None:
    """Cache a result for a category.

    The result is written to Redis with the configured TTL when a client is
    available. A Redis failure is logged and does not propagate. The result is
    always written to the in-memory store with the current time as its
    timestamp.

    Args:
        category: The model reference category.
        result: The computed result to cache.
        grouped: Whether this is the grouped text models variant.
        include_backend_variations: Whether backend variations are included.

    """
    cache_key = self._build_cache_key(category, grouped, include_backend_variations)

    if self._redis_client:
        try:
            redis_key = self._get_redis_key(cache_key)
            payload = result.model_dump_json()
            self._redis_client.setex(redis_key, self._get_ttl(), payload)
            logger.debug(f"Stored in Redis: {cache_key}")
        except Exception as e:
            logger.warning(f"Failed to store in Redis for {cache_key}: {e}")

    # The in-memory copy is kept even when Redis succeeded, as a fallback.
    with self._lock:
        self._timestamps[cache_key] = time.time()
        self._cache[cache_key] = result
        logger.debug(f"Stored in memory: {cache_key}")

invalidate

invalidate(
    category: MODEL_REFERENCE_CATEGORY,
    grouped: bool | None = None,
    include_backend_variations: bool | None = None,
) -> None

Invalidate cached results for a category.

Removes from both Redis and in-memory cache. If grouped is None, invalidates all grouped/ungrouped variants. If include_backend_variations is None, invalidates all variation states.

Parameters:

  • category (MODEL_REFERENCE_CATEGORY) –

    The model reference category to invalidate.

  • grouped (bool | None, default: None ) –

    Whether to invalidate grouped variant (None = both).

  • include_backend_variations (bool | None, default: None ) –

    Whether to invalidate variation states (None = both).

Source code in src/horde_model_reference/analytics/base_cache.py
def invalidate(
    self,
    category: MODEL_REFERENCE_CATEGORY,
    grouped: bool | None = None,
    include_backend_variations: bool | None = None,
) -> None:
    """Remove cached results for a category from Redis and from memory.

    Passing None for a flag means both of its states. Every selected
    combination of the two flags is invalidated in turn. Redis failures are
    logged per key and do not stop the in-memory removal.

    Args:
        category: The model reference category to invalidate.
        grouped: Whether to invalidate grouped variant (None = both).
        include_backend_variations: Whether to invalidate variation states (None = both).

    """
    grouped_variants = [grouped] if grouped is not None else [False, True]
    variations_variants = (
        [include_backend_variations] if include_backend_variations is not None else [False, True]
    )

    # Lazily yield one cache key per (grouped, variations) combination.
    cache_keys = (
        self._build_cache_key(category, gv, vv) for gv in grouped_variants for vv in variations_variants
    )

    for cache_key in cache_keys:
        logger.debug(f"Invalidating cache: {cache_key}")

        # Redis copy.
        if self._redis_client:
            try:
                redis_key = self._get_redis_key(cache_key)
                deleted_count = self._redis_client.delete(redis_key)
                if deleted_count > 0:
                    logger.debug(f"Deleted Redis key: {redis_key}")
            except Exception as e:
                logger.warning(f"Failed to delete from Redis for {cache_key}: {e}")

        # In-memory copy.
        with self._lock:
            evicted = self._cache.pop(cache_key, None)
            self._timestamps.pop(cache_key, None)
            if evicted is not None:
                logger.debug(f"Removed from memory cache: {cache_key}")

clear_all

clear_all() -> None

Clear all cached results.

Useful for testing or when a global cache reset is needed.

Source code in src/horde_model_reference/analytics/base_cache.py
def clear_all(self) -> None:
    """Remove every cached result from Redis and from memory.

    For Redis, the key for each category and flag combination is deleted one at
    a time. The first failure is logged and ends the Redis pass. The in-memory
    store is always emptied.
    """
    logger.info(f"Clearing all {self.__class__.__name__} entries")

    if self._redis_client:
        try:
            # Lazily enumerate the Redis key for every category/flag combination.
            redis_keys = (
                self._get_redis_key(self._build_cache_key(category, grouped, variations))
                for category in MODEL_REFERENCE_CATEGORY
                for grouped in [False, True]
                for variations in [False, True]
            )
            for redis_key in redis_keys:
                self._redis_client.delete(redis_key)
            logger.debug("Cleared all Redis keys")
        except Exception as e:
            logger.warning(f"Failed to clear Redis cache: {e}")

    with self._lock:
        self._timestamps.clear()
        self._cache.clear()
        logger.debug("Cleared in-memory cache")

get_cache_info

get_cache_info() -> dict[
    str, int | float | bool | list[str]
]

Get information about the current cache state.

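Returns:

  • dict[str, int | float | bool | list[str]]

    Dictionary with cache statistics including size, Redis status, TTL.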

Source code in src/horde_model_reference/analytics/base_cache.py
def get_cache_info(self) -> dict[str, int | float | bool | list[str]]:
    """Summarize the current state of the in-memory cache.

    Returns:
        Dictionary with ``cache_size`` (number of in-memory entries),
        ``redis_enabled`` (whether a Redis client is set), ``ttl_seconds``
        and ``keys_cached`` (the in-memory cache keys).

    """
    info = {}
    # Take the snapshot under the lock so size and key list are consistent.
    with self._lock:
        info["cache_size"] = len(self._cache)
        info["redis_enabled"] = self._redis_client is not None
        info["ttl_seconds"] = self._get_ttl()
        info["keys_cached"] = list(self._cache.keys())
    return info