Shared HTTP retry utilities and circuit breaker state for the AI Horde integration.
Uses tenacity for retry logic with full-jitter exponential backoff. Provides both
sync and async retry factories (returning tenacity Retrying / AsyncRetrying objects),
structured retry logging, and a circuit breaker that tracks degraded connectivity
to the external AI Horde API.
TRANSIENT_HTTP_EXCEPTIONS
module-attribute
TRANSIENT_HTTP_EXCEPTIONS: tuple[type[Exception], ...] = (
TimeoutException,
ConnectError,
RemoteProtocolError,
)
Network-level exceptions that are always worth retrying.
horde_api_circuit_breaker
module-attribute
horde_api_circuit_breaker = HordeAPICircuitBreaker()
Global circuit breaker for the external AI Horde API.
Import this from any module that calls the Horde API to check/update state.
RetryableHTTPStatusError
Bases: Exception
Raised when an HTTP response has a retryable status code (5xx, 429).
Wraps the original httpx response so callers can inspect it after retries are exhausted.
Source code in src/horde_model_reference/http_retry.py
class RetryableHTTPStatusError(Exception):
    """Signal that an HTTP response carried a retryable status code (5xx, 429).

    The originating httpx response is kept on the exception so callers can
    inspect it once retries are exhausted.
    """

    def __init__(self, response: httpx.Response) -> None:
        """Keep ``response`` and build the message from its status code and URL."""
        self.response = response
        message = f"HTTP {response.status_code} from {response.url}"
        super().__init__(message)
|
response
instance-attribute
__init__
__init__(response: Response) -> None
Wrap an httpx response with a retryable status code.
Source code in src/horde_model_reference/http_retry.py
| def __init__(self, response: httpx.Response) -> None:
"""Wrap an httpx response with a retryable status code."""
self.response = response
super().__init__(f"HTTP {response.status_code} from {response.url}")
|
_CircuitState
dataclass
Internal mutable state for the circuit breaker.
Source code in src/horde_model_reference/http_retry.py
@dataclass
class _CircuitState:
    """Mutable bookkeeping record used internally by the circuit breaker."""

    # Number of failures recorded since the last success.
    consecutive_failures: int = 0
    # Timestamp of the most recent failure; 0.0 until one is recorded.
    last_failure_time: float = 0.0
    # Timestamp of the most recent success; 0.0 until one is recorded.
    last_success_time: float = 0.0
    # True while the circuit is open.
    is_open: bool = False
    # Each instance gets its own re-entrant lock via the default factory.
    lock: RLock = field(default_factory=RLock)
|
consecutive_failures
class-attribute
instance-attribute
consecutive_failures: int = 0
last_failure_time
class-attribute
instance-attribute
last_failure_time: float = 0.0
last_success_time
class-attribute
instance-attribute
last_success_time: float = 0.0
is_open
class-attribute
instance-attribute
lock
class-attribute
instance-attribute
lock: RLock = field(default_factory=RLock)
__init__
__init__(
consecutive_failures: int = 0,
last_failure_time: float = 0.0,
last_success_time: float = 0.0,
is_open: bool = False,
lock: RLock = RLock(),
) -> None
HordeAPICircuitBreaker
Lightweight circuit breaker for the external AI Horde API.
States
CLOSED - normal operation, requests go through.
OPEN - too many consecutive failures; requests are short-circuited
for cooldown_seconds. After cooldown, probe requests are allowed
again (half-open). On success the circuit closes; on failure it
stays open and the cooldown restarts.
The breaker exposes is_degraded and seconds_until_retry for the
/heartbeat endpoint and log messages on hot paths.
Source code in src/horde_model_reference/http_retry.py
| class HordeAPICircuitBreaker:
"""Lightweight circuit breaker for the external AI Horde API.
States:
CLOSED - normal operation, requests go through.
OPEN - too many consecutive failures; requests are short-circuited
for ``cooldown_seconds``. After cooldown, a single probe
request is allowed (half-open). On success the circuit
closes; on failure it stays open.
The breaker exposes ``is_degraded`` and ``seconds_until_retry`` for the
``/heartbeat`` endpoint and log messages on hot paths.
"""
def __init__(
self,
*,
failure_threshold: int = 5,
cooldown_seconds: float = 120.0,
) -> None:
"""Initialize circuit breaker with failure threshold and cooldown."""
self._failure_threshold = failure_threshold
self._cooldown_seconds = cooldown_seconds
self._state = _CircuitState()
@property
def is_degraded(self) -> bool:
"""True when the circuit is open (AI Horde unreachable)."""
with self._state.lock:
if not self._state.is_open:
return False
# Auto-transition to half-open after cooldown
if self._cooldown_elapsed():
return True # still degraded, but will allow a probe
return True
@property
def seconds_until_retry(self) -> float | None:
"""Seconds remaining before the next probe attempt, or None if not degraded."""
with self._state.lock:
if not self._state.is_open:
return None
remaining = self._cooldown_seconds - (time.monotonic() - self._state.last_failure_time)
return max(0.0, remaining)
@property
def consecutive_failures(self) -> int:
"""Number of consecutive failures recorded."""
with self._state.lock:
return self._state.consecutive_failures
def should_allow_request(self) -> bool:
"""Return True if a request should proceed (circuit closed or half-open probe)."""
with self._state.lock:
if not self._state.is_open:
return True
if self._cooldown_elapsed():
logger.info(
"AI Horde circuit breaker: cooldown elapsed, allowing probe request "
f"(failures={self._state.consecutive_failures})"
)
return True
return False
def record_success(self) -> None:
"""Record a successful request, closing the circuit if it was open."""
with self._state.lock:
was_open = self._state.is_open
self._state.consecutive_failures = 0
self._state.is_open = False
self._state.last_success_time = time.monotonic()
if was_open:
logger.info("AI Horde circuit breaker: CLOSED (service recovered)")
def record_failure(self) -> None:
"""Record a failed request, potentially opening the circuit."""
with self._state.lock:
self._state.consecutive_failures += 1
self._state.last_failure_time = time.monotonic()
if not self._state.is_open and self._state.consecutive_failures >= self._failure_threshold:
self._state.is_open = True
logger.error(
f"AI Horde circuit breaker: OPEN after {self._state.consecutive_failures} consecutive failures. "
f"Requests will be short-circuited for {self._cooldown_seconds:.0f}s."
)
def get_status_dict(self) -> dict[str, Any]:
"""Return a dict suitable for inclusion in the /heartbeat response."""
with self._state.lock:
return {
"degraded": self._state.is_open,
"consecutive_failures": self._state.consecutive_failures,
"seconds_until_retry": round(self.seconds_until_retry, 1) if self.seconds_until_retry else None,
}
def _cooldown_elapsed(self) -> bool:
return (time.monotonic() - self._state.last_failure_time) >= self._cooldown_seconds
|
_failure_threshold
instance-attribute
_failure_threshold = failure_threshold
_cooldown_seconds
instance-attribute
_cooldown_seconds = cooldown_seconds
_state
instance-attribute
is_degraded
property
True when the circuit is open (AI Horde unreachable).
seconds_until_retry
property
seconds_until_retry: float | None
Seconds remaining before the next probe attempt, or None if not degraded.
consecutive_failures
property
consecutive_failures: int
Number of consecutive failures recorded.
__init__
__init__(
*,
failure_threshold: int = 5,
cooldown_seconds: float = 120.0,
) -> None
Initialize circuit breaker with failure threshold and cooldown.
Source code in src/horde_model_reference/http_retry.py
def __init__(
    self,
    *,
    failure_threshold: int = 5,
    cooldown_seconds: float = 120.0,
) -> None:
    """Create a breaker with fresh internal state.

    Args:
        failure_threshold: Stored as the breaker's failure threshold.
        cooldown_seconds: Stored as the breaker's cooldown duration.
    """
    self._state = _CircuitState()
    self._cooldown_seconds = cooldown_seconds
    self._failure_threshold = failure_threshold
|
should_allow_request
should_allow_request() -> bool
Return True if a request should proceed (circuit closed or half-open probe).
Source code in src/horde_model_reference/http_retry.py
def should_allow_request(self) -> bool:
    """Tell the caller whether a request may be attempted right now.

    Returns:
        True when the circuit is closed, or when it is open but the cooldown
        has elapsed (the probe is logged); False while the cooldown is running.
    """
    state = self._state
    with state.lock:
        # Guard clause: open circuit still inside its cooldown window.
        if state.is_open and not self._cooldown_elapsed():
            return False
        if state.is_open:
            logger.info(
                "AI Horde circuit breaker: cooldown elapsed, allowing probe request "
                f"(failures={state.consecutive_failures})"
            )
        return True
|
record_success
Record a successful request, closing the circuit if it was open.
Source code in src/horde_model_reference/http_retry.py
def record_success(self) -> None:
    """Note a successful request and put the breaker in the closed state.

    Clears the failure streak, stamps the success time, and logs a recovery
    message when the circuit had been open.
    """
    state = self._state
    with state.lock:
        recovered = state.is_open
        state.is_open = False
        state.consecutive_failures = 0
        state.last_success_time = time.monotonic()
        if recovered:
            logger.info("AI Horde circuit breaker: CLOSED (service recovered)")
|
record_failure
Record a failed request, potentially opening the circuit.
Source code in src/horde_model_reference/http_retry.py
def record_failure(self) -> None:
    """Note a failed request and open the circuit once the threshold is reached.

    Every call increments the failure streak and refreshes the failure
    timestamp; the circuit opens (with an error log) only on the call that
    first reaches the threshold while it is still closed.
    """
    state = self._state
    with state.lock:
        state.consecutive_failures += 1
        state.last_failure_time = time.monotonic()
        # Nothing more to do if already open or still below the threshold.
        if state.is_open or state.consecutive_failures < self._failure_threshold:
            return
        state.is_open = True
        logger.error(
            f"AI Horde circuit breaker: OPEN after {state.consecutive_failures} consecutive failures. "
            f"Requests will be short-circuited for {self._cooldown_seconds:.0f}s."
        )
|
get_status_dict
get_status_dict() -> dict[str, Any]
Return a dict suitable for inclusion in the /heartbeat response.
Source code in src/horde_model_reference/http_retry.py
def get_status_dict(self) -> dict[str, Any]:
    """Return a dict suitable for inclusion in the /heartbeat response.

    Returns:
        A mapping with ``degraded``, ``consecutive_failures`` and
        ``seconds_until_retry`` (rounded to one decimal place, or ``None``
        when ``seconds_until_retry`` itself is ``None``).
    """
    with self._state.lock:
        # Read the property once, and compare against None explicitly: a
        # value of 0.0 must be reported as 0.0 rather than collapsing to None.
        remaining = self.seconds_until_retry
        return {
            "degraded": self._state.is_open,
            "consecutive_failures": self._state.consecutive_failures,
            "seconds_until_retry": None if remaining is None else round(remaining, 1),
        }
|
_cooldown_elapsed
_cooldown_elapsed() -> bool
Source code in src/horde_model_reference/http_retry.py
def _cooldown_elapsed(self) -> bool:
    """Return True once the cooldown duration has passed since the last failure."""
    since_failure = time.monotonic() - self._state.last_failure_time
    return since_failure >= self._cooldown_seconds
|
is_retryable_status_code
is_retryable_status_code(status_code: int) -> bool
Return True if the HTTP status code suggests a transient server-side issue.
Source code in src/horde_model_reference/http_retry.py
def is_retryable_status_code(status_code: int) -> bool:
    """Tell whether ``status_code`` marks a response worth retrying.

    Returns:
        True for 429 and for any code of 500 or above; False otherwise.
    """
    if status_code == 429:
        return True
    return status_code >= 500
|
_log_retry
_log_retry(retry_state: RetryCallState) -> None
Emit a structured log line before each retry attempt.
Source code in src/horde_model_reference/http_retry.py
def _log_retry(retry_state: RetryCallState) -> None:
    """Log a structured warning for the attempt that is about to be retried.

    Args:
        retry_state: Retry call state. The message includes its attempt number,
            the exception from its outcome (``"unknown"`` when there is none),
            and the sleep of its next action (0 when there is none).
    """
    outcome = retry_state.outcome
    failure = outcome.exception() if outcome else None
    next_action = retry_state.next_action
    delay = next_action.sleep if next_action else 0
    logger.warning(
        "HTTP retry | attempt={attempt} | wait={wait:.2f}s | error={error}",
        attempt=retry_state.attempt_number,
        wait=delay,
        error=str(failure) if failure else "unknown",
    )
|
http_retry_sync
http_retry_sync(
*,
max_attempts: int = 3,
min_wait: float = 0.5,
max_wait: float = 10.0,
extra_exceptions: tuple[type[Exception], ...] = (),
) -> Retrying
Create a synchronous tenacity Retrying object; iterating it yields one context manager per attempt.
Usage::
for attempt in http_retry_sync():
with attempt:
response = httpx.get(url)
if is_retryable_status_code(response.status_code):
raise RetryableHTTPStatusError(response)
Parameters:
-
max_attempts
(int, default:
3
)
–
Maximum number of attempts before giving up.
-
min_wait
(float, default:
0.5
)
–
Minimum wait time for jittered exponential backoff.
-
max_wait
(float, default:
10.0
)
–
Maximum wait time for jittered exponential backoff.
-
extra_exceptions
(tuple[type[Exception], ...], default:
()
)
–
Additional exception types to retry on beyond the defaults.
Source code in src/horde_model_reference/http_retry.py
def http_retry_sync(
    *,
    max_attempts: int = 3,
    min_wait: float = 0.5,
    max_wait: float = 10.0,
    extra_exceptions: tuple[type[Exception], ...] = (),
) -> Retrying:
    """Build a synchronous tenacity ``Retrying`` object for HTTP calls.

    Iterating the result yields one context manager per attempt.

    Usage::

        for attempt in http_retry_sync():
            with attempt:
                response = httpx.get(url)
                if is_retryable_status_code(response.status_code):
                    raise RetryableHTTPStatusError(response)

    Args:
        max_attempts: Maximum number of attempts before giving up.
        min_wait: Minimum wait time for jittered exponential backoff.
        max_wait: Maximum wait time for jittered exponential backoff.
        extra_exceptions: Additional exception types to retry on beyond the defaults.

    Returns:
        A ``Retrying`` instance configured with ``reraise=True`` and
        ``_log_retry`` as its ``before_sleep`` hook.
    """
    stop_policy = stop_after_attempt(max_attempts)
    backoff = wait_random_exponential(multiplier=0.5, min=min_wait, max=max_wait)
    # Defaults first, then caller-supplied types, then the status-code wrapper.
    retryable = TRANSIENT_HTTP_EXCEPTIONS + extra_exceptions + (RetryableHTTPStatusError,)
    return Retrying(
        stop=stop_policy,
        wait=backoff,
        retry=retry_if_exception_type(retryable),
        before_sleep=_log_retry,
        reraise=True,
    )
|
http_retry_async
http_retry_async(
*,
max_attempts: int = 3,
min_wait: float = 0.5,
max_wait: float = 10.0,
extra_exceptions: tuple[type[Exception], ...] = (),
) -> AsyncRetrying
Create an asynchronous tenacity AsyncRetrying object; iterating it with async for yields one context manager per attempt.
Usage::
async for attempt in http_retry_async():
with attempt:
response = await client.get(url)
if is_retryable_status_code(response.status_code):
raise RetryableHTTPStatusError(response)
Parameters:
-
max_attempts
(int, default:
3
)
–
Maximum number of attempts before giving up.
-
min_wait
(float, default:
0.5
)
–
Minimum wait time for jittered exponential backoff.
-
max_wait
(float, default:
10.0
)
–
Maximum wait time for jittered exponential backoff.
-
extra_exceptions
(tuple[type[Exception], ...], default:
()
)
–
Additional exception types to retry on beyond the defaults.
Source code in src/horde_model_reference/http_retry.py
def http_retry_async(
    *,
    max_attempts: int = 3,
    min_wait: float = 0.5,
    max_wait: float = 10.0,
    extra_exceptions: tuple[type[Exception], ...] = (),
) -> AsyncRetrying:
    """Build an asynchronous tenacity ``AsyncRetrying`` object for HTTP calls.

    Iterating the result with ``async for`` yields one context manager per attempt.

    Usage::

        async for attempt in http_retry_async():
            with attempt:
                response = await client.get(url)
                if is_retryable_status_code(response.status_code):
                    raise RetryableHTTPStatusError(response)

    Args:
        max_attempts: Maximum number of attempts before giving up.
        min_wait: Minimum wait time for jittered exponential backoff.
        max_wait: Maximum wait time for jittered exponential backoff.
        extra_exceptions: Additional exception types to retry on beyond the defaults.

    Returns:
        An ``AsyncRetrying`` instance configured with ``reraise=True`` and
        ``_log_retry`` as its ``before_sleep`` hook.
    """
    stop_policy = stop_after_attempt(max_attempts)
    backoff = wait_random_exponential(multiplier=0.5, min=min_wait, max=max_wait)
    # Defaults first, then caller-supplied types, then the status-code wrapper.
    retryable = TRANSIENT_HTTP_EXCEPTIONS + extra_exceptions + (RetryableHTTPStatusError,)
    return AsyncRetrying(
        stop=stop_policy,
        wait=backoff,
        retry=retry_if_exception_type(retryable),
        before_sleep=_log_retry,
        reraise=True,
    )
|