Manages watch mode for continuously monitoring metadata changes.
Polls the PRIMARY v1 metadata /last_updated endpoint at regular intervals
and triggers sync operations when changes are detected.
Source code in src/horde_model_reference/sync/watch_mode.py
| class WatchModeManager:
"""Manages watch mode for continuously monitoring metadata changes.
Polls the PRIMARY v1 metadata /last_updated endpoint at regular intervals
and triggers sync operations when changes are detected.
"""
def __init__(
self,
*,
api_url: str,
sync_callback: Callable[[], int],
interval_seconds: int | None = None,
initial_delay_seconds: int | None = None,
enable_startup_sync: bool | None = None,
) -> None:
"""Initialize the watch mode manager.
Args:
api_url: Base URL of PRIMARY API (e.g., https://models.aihorde.net/api).
sync_callback: Function to call when changes are detected. Should return exit code (0 for success).
interval_seconds: Polling interval in seconds (default: from settings).
initial_delay_seconds: Initial delay before starting watch loop (default: from settings).
enable_startup_sync: Whether to run sync immediately on startup (default: from settings).
"""
self.api_url = api_url.rstrip("/")
self.sync_callback = sync_callback
self.interval_seconds = interval_seconds or github_sync_settings.watch_interval_seconds
self.initial_delay_seconds = initial_delay_seconds or github_sync_settings.watch_initial_delay_seconds
self.enable_startup_sync = (
enable_startup_sync if enable_startup_sync is not None else github_sync_settings.watch_enable_startup_sync
)
self.last_known_timestamp: int | None = None
self.running = False
self.consecutive_errors = 0
self.max_consecutive_errors = 10
def fetch_last_updated_timestamp(self) -> int | None:
"""Fetch the last_updated timestamp from PRIMARY metadata endpoint.
Returns:
Unix timestamp of last update, or None if fetch fails or no metadata exists.
Raises:
httpx.HTTPError: If the request fails.
"""
endpoint = f"{self.api_url}/model_references/v1/metadata/last_updated"
try:
for attempt in http_retry_sync(max_attempts=3, min_wait=1.0, max_wait=15.0):
with attempt, httpx.Client(timeout=30.0) as client:
response = client.get(endpoint)
if is_retryable_status_code(response.status_code):
raise RetryableHTTPStatusError(response)
response.raise_for_status()
data = response.json()
timestamp: int | None = data.get("last_updated")
if timestamp is None:
logger.debug("Metadata endpoint returned null timestamp (no metadata exists yet)")
else:
logger.debug(f"Fetched last_updated timestamp: {timestamp}")
return timestamp
except (RetryError, RetryableHTTPStatusError) as e:
logger.error(f"Failed to fetch metadata after retries: {e}")
raise
except httpx.HTTPStatusError as e:
logger.error(f"HTTP error fetching metadata: {e.response.status_code} - {e}")
raise
except httpx.HTTPError as e:
logger.error(f"Network error fetching metadata: {e}")
raise
def check_for_changes(self) -> bool:
"""Check if metadata has changed since last check.
Returns:
True if changes were detected (or first run), False otherwise.
"""
try:
current_timestamp = self.fetch_last_updated_timestamp()
# First run - initialize but don't trigger sync unless enable_startup_sync is True
if self.last_known_timestamp is None:
self.last_known_timestamp = current_timestamp
logger.info(f"Initialized last known timestamp: {current_timestamp}")
# Return True only if startup sync is enabled
return self.enable_startup_sync
# No metadata exists yet
if current_timestamp is None:
logger.debug("No metadata available yet")
return False
# Check for changes
if current_timestamp > self.last_known_timestamp:
logger.info(
f"Changes detected! Timestamp changed from {self.last_known_timestamp} to {current_timestamp}"
)
self.last_known_timestamp = current_timestamp
return True
logger.debug(f"No changes detected (timestamp: {current_timestamp})")
return False
except Exception as e:
self.consecutive_errors += 1
logger.error(
f"Error checking for changes (error {self.consecutive_errors}/{self.max_consecutive_errors}): {e}"
)
if self.consecutive_errors >= self.max_consecutive_errors:
logger.critical(
f"Exceeded maximum consecutive errors ({self.max_consecutive_errors}). "
"Check network connectivity and PRIMARY API availability."
)
return False
return False
def run(self) -> int:
"""Run the watch mode loop.
Continuously monitors metadata changes and triggers sync operations.
Returns:
Exit code (0 for success, 1 for failure).
"""
self.running = True
logger.info("=" * 80)
logger.info("GitHub Model Reference Sync Service - WATCH MODE")
logger.info("=" * 80)
logger.info(f"Monitoring PRIMARY API: {self.api_url}")
logger.info(f"Polling interval: {self.interval_seconds} seconds")
logger.info(f"Initial delay: {self.initial_delay_seconds} seconds")
logger.info(f"Startup sync: {'enabled' if self.enable_startup_sync else 'disabled'}")
logger.info("=" * 80)
# Initial delay if configured
if self.initial_delay_seconds > 0:
logger.info(f"Waiting {self.initial_delay_seconds} seconds before starting watch loop...")
time.sleep(self.initial_delay_seconds)
logger.info("Starting watch loop. Press Ctrl+C to stop.")
logger.info("-" * 80)
watch_iteration = 0
last_status_time = time.time()
status_update_interval = 300 # Show status update every 5 minutes
while self.running:
watch_iteration += 1
logger.debug(f"Watch iteration {watch_iteration}")
try:
# Check for changes
has_changes = self.check_for_changes()
# Reset error counter on successful check
if self.consecutive_errors > 0:
logger.info("Connection restored after errors")
self.consecutive_errors = 0
# Trigger sync if changes detected
if has_changes:
logger.info("Triggering sync operation due to detected changes...")
logger.info("-" * 80)
try:
exit_code = self.sync_callback()
if exit_code == 0:
logger.success("Sync completed successfully")
else:
logger.error(f"Sync failed with exit code {exit_code}")
except Exception as e:
logger.error(f"Sync operation raised an exception: {e}")
logger.info("-" * 80)
logger.info(f"Waiting for changes every {self.interval_seconds} seconds...")
except Exception as e:
logger.error(f"Unexpected error in watch loop: {e}")
self.consecutive_errors += 1
if self.consecutive_errors >= self.max_consecutive_errors:
logger.critical("Too many consecutive errors. Exiting watch mode.")
return 1
# Sleep until next check (if still running)
if self.running:
current_time = time.time()
time_since_last_status = current_time - last_status_time
# Show periodic status update every 5 minutes (or on first iteration after sync)
if time_since_last_status >= status_update_interval or watch_iteration == 1:
logger.info(
f"Still watching for changes every {self.interval_seconds} "
f"seconds (iteration {watch_iteration})..."
)
last_status_time = current_time
else:
logger.debug(f"Sleeping for {self.interval_seconds} seconds until next check...")
try:
time.sleep(self.interval_seconds)
except KeyboardInterrupt:
logger.info("\nKeyboard interrupt detected during sleep. Shutting down...")
self.running = False
logger.info("=" * 80)
logger.info("Watch mode stopped")
logger.info("=" * 80)
return 0
|
api_url
instance-attribute
sync_callback
instance-attribute
sync_callback = sync_callback
interval_seconds
instance-attribute
interval_seconds = (
interval_seconds or watch_interval_seconds
)
initial_delay_seconds
instance-attribute
initial_delay_seconds = (
initial_delay_seconds or watch_initial_delay_seconds
)
enable_startup_sync
instance-attribute
enable_startup_sync = (
enable_startup_sync
if enable_startup_sync is not None
else watch_enable_startup_sync
)
last_known_timestamp
instance-attribute
last_known_timestamp: int | None = None
running
instance-attribute
consecutive_errors
instance-attribute
max_consecutive_errors
instance-attribute
max_consecutive_errors = 10
__init__
__init__(
*,
api_url: str,
sync_callback: Callable[[], int],
interval_seconds: int | None = None,
initial_delay_seconds: int | None = None,
enable_startup_sync: bool | None = None,
) -> None
Initialize the watch mode manager.
Parameters:
-
api_url
(str)
–
Base URL of PRIMARY API (e.g., https://models.aihorde.net/api).
-
sync_callback
(Callable[[], int])
–
Function to call when changes are detected. Should return exit code (0 for success).
-
interval_seconds
(int | None, default:
None
)
–
Polling interval in seconds (default: from settings).
-
initial_delay_seconds
(int | None, default:
None
)
–
Initial delay before starting watch loop (default: from settings).
-
enable_startup_sync
(bool | None, default:
None
)
–
Whether to run sync immediately on startup (default: from settings).
Source code in src/horde_model_reference/sync/watch_mode.py
def __init__(
    self,
    *,
    api_url: str,
    sync_callback: Callable[[], int],
    interval_seconds: int | None = None,
    initial_delay_seconds: int | None = None,
    enable_startup_sync: bool | None = None,
) -> None:
    """Initialize the watch mode manager.

    Args:
        api_url: Base URL of PRIMARY API (e.g., https://models.aihorde.net/api).
            Trailing slashes are stripped.
        sync_callback: Function to call when changes are detected. Should return exit code (0 for success).
        interval_seconds: Polling interval in seconds (default: from settings).
            ``None`` or ``0`` falls back to the settings value.
        initial_delay_seconds: Initial delay before starting watch loop (default: from settings).
            An explicit ``0`` disables the delay.
        enable_startup_sync: Whether to run sync immediately on startup (default: from settings).
    """
    self.api_url = api_url.rstrip("/")
    self.sync_callback = sync_callback
    # `or` is kept on purpose here: a zero interval would make the loop poll without pausing.
    self.interval_seconds = interval_seconds or github_sync_settings.watch_interval_seconds
    # `is not None` (not `or`) so that an explicit 0 ("no delay") is honored.
    self.initial_delay_seconds = (
        initial_delay_seconds
        if initial_delay_seconds is not None
        else github_sync_settings.watch_initial_delay_seconds
    )
    self.enable_startup_sync = (
        enable_startup_sync if enable_startup_sync is not None else github_sync_settings.watch_enable_startup_sync
    )
    self.last_known_timestamp: int | None = None
    self.running = False
    self.consecutive_errors = 0
    self.max_consecutive_errors = 10
|
fetch_last_updated_timestamp
fetch_last_updated_timestamp() -> int | None
Fetch the last_updated timestamp from PRIMARY metadata endpoint.
Returns:
-
int | None
–
Unix timestamp of last update, or None if no metadata exists yet (fetch failures are raised, not returned as None).
Raises:
-
httpx.HTTPError
–
If the request fails.
Source code in src/horde_model_reference/sync/watch_mode.py
def fetch_last_updated_timestamp(self) -> int | None:
    """Query PRIMARY's metadata endpoint for its ``last_updated`` value.

    The GET request is issued inside the attempts yielded by
    ``http_retry_sync`` (max_attempts=3); a response with a retryable status
    code is raised as ``RetryableHTTPStatusError``.

    Returns:
        The ``last_updated`` value from the JSON response, or None when the
        endpoint reports no timestamp (no metadata exists yet).

    Raises:
        RetryError: Logged and re-raised.
        RetryableHTTPStatusError: Logged and re-raised.
        httpx.HTTPError: If the request fails (includes httpx.HTTPStatusError
            for non-retryable error statuses). Logged and re-raised.
    """
    url = f"{self.api_url}/model_references/v1/metadata/last_updated"
    try:
        for attempt in http_retry_sync(max_attempts=3, min_wait=1.0, max_wait=15.0):
            with attempt:
                with httpx.Client(timeout=30.0) as client:
                    response = client.get(url)
                    if is_retryable_status_code(response.status_code):
                        raise RetryableHTTPStatusError(response)
                    response.raise_for_status()
                    payload = response.json()
                    last_updated: int | None = payload.get("last_updated")
                    if last_updated is not None:
                        logger.debug(f"Fetched last_updated timestamp: {last_updated}")
                    else:
                        logger.debug("Metadata endpoint returned null timestamp (no metadata exists yet)")
                    return last_updated
    except (RetryError, RetryableHTTPStatusError) as retry_exc:
        logger.error(f"Failed to fetch metadata after retries: {retry_exc}")
        raise
    except httpx.HTTPStatusError as status_exc:
        logger.error(f"HTTP error fetching metadata: {status_exc.response.status_code} - {status_exc}")
        raise
    except httpx.HTTPError as http_exc:
        logger.error(f"Network error fetching metadata: {http_exc}")
        raise
|
check_for_changes
check_for_changes() -> bool
Check if metadata has changed since last check.
Returns:
-
bool
–
True if changes were detected (or on the first run when startup sync is enabled), False otherwise.
Source code in src/horde_model_reference/sync/watch_mode.py
def check_for_changes(self) -> bool:
    """Check if metadata has changed since last check.

    Any exception is caught here: ``consecutive_errors`` is incremented, the
    error is logged, and False is returned.

    Returns:
        True if changes were detected, or on the first successful check when
        startup sync is enabled; False otherwise (including on error).
    """
    try:
        current_timestamp = self.fetch_last_updated_timestamp()

        # First successful check - initialize but don't trigger sync unless
        # enable_startup_sync is True. Tracked with a dedicated flag because
        # last_known_timestamp legitimately stays None while PRIMARY has no
        # metadata; keying on it alone re-ran the "first run" branch (and the
        # startup sync) on every poll. getattr() keeps this working on
        # instances whose __init__ never defined the flag.
        if not getattr(self, "_initial_check_done", False):
            self._initial_check_done = True
            if self.last_known_timestamp is None:
                self.last_known_timestamp = current_timestamp
                logger.info(f"Initialized last known timestamp: {current_timestamp}")
                # Return True only if startup sync is enabled
                return self.enable_startup_sync

        # No metadata exists yet
        if current_timestamp is None:
            logger.debug("No metadata available yet")
            return False

        # Metadata appearing for the first time (None -> value) counts as a change.
        if self.last_known_timestamp is None or current_timestamp > self.last_known_timestamp:
            logger.info(
                f"Changes detected! Timestamp changed from {self.last_known_timestamp} to {current_timestamp}"
            )
            self.last_known_timestamp = current_timestamp
            return True

        logger.debug(f"No changes detected (timestamp: {current_timestamp})")
        return False
    except Exception as e:
        self.consecutive_errors += 1
        logger.error(
            f"Error checking for changes (error {self.consecutive_errors}/{self.max_consecutive_errors}): {e}"
        )
        if self.consecutive_errors >= self.max_consecutive_errors:
            logger.critical(
                f"Exceeded maximum consecutive errors ({self.max_consecutive_errors}). "
                "Check network connectivity and PRIMARY API availability."
            )
        return False
|
run
run() -> int
Run the watch mode loop.
Continuously monitors metadata changes and triggers sync operations.
Returns:
-
int
–
Exit code (0 for success, 1 for failure).
Source code in src/horde_model_reference/sync/watch_mode.py
def run(self) -> int:
    """Run the watch mode loop.

    Continuously monitors metadata changes and triggers sync operations.

    Returns:
        Exit code: 0 when the loop stops because ``running`` became False
        (e.g. Ctrl+C during the sleep), 1 when ``consecutive_errors``
        reaches ``max_consecutive_errors``.
    """
    self.running = True
    logger.info("=" * 80)
    logger.info("GitHub Model Reference Sync Service - WATCH MODE")
    logger.info("=" * 80)
    logger.info(f"Monitoring PRIMARY API: {self.api_url}")
    logger.info(f"Polling interval: {self.interval_seconds} seconds")
    logger.info(f"Initial delay: {self.initial_delay_seconds} seconds")
    logger.info(f"Startup sync: {'enabled' if self.enable_startup_sync else 'disabled'}")
    logger.info("=" * 80)

    # Initial delay if configured
    if self.initial_delay_seconds > 0:
        logger.info(f"Waiting {self.initial_delay_seconds} seconds before starting watch loop...")
        time.sleep(self.initial_delay_seconds)

    logger.info("Starting watch loop. Press Ctrl+C to stop.")
    logger.info("-" * 80)

    watch_iteration = 0
    last_status_time = time.time()
    status_update_interval = 300  # Show status update every 5 minutes

    while self.running:
        watch_iteration += 1
        logger.debug(f"Watch iteration {watch_iteration}")
        try:
            # check_for_changes() swallows its own errors and only bumps the
            # counter, so compare the counter before/after to see if it failed.
            # Resetting unconditionally (as before) meant the limit was never hit.
            errors_before_check = self.consecutive_errors
            has_changes = self.check_for_changes()

            if self.consecutive_errors > errors_before_check:
                # The check failed: keep the count so the limit can actually be reached.
                if self.consecutive_errors >= self.max_consecutive_errors:
                    logger.critical("Too many consecutive errors. Exiting watch mode.")
                    self.running = False
                    return 1
            elif self.consecutive_errors > 0:
                # Reset error counter on successful check
                logger.info("Connection restored after errors")
                self.consecutive_errors = 0

            # Trigger sync if changes detected
            if has_changes:
                logger.info("Triggering sync operation due to detected changes...")
                logger.info("-" * 80)
                try:
                    exit_code = self.sync_callback()
                    if exit_code == 0:
                        logger.success("Sync completed successfully")
                    else:
                        logger.error(f"Sync failed with exit code {exit_code}")
                except Exception as e:
                    logger.error(f"Sync operation raised an exception: {e}")
                logger.info("-" * 80)
                logger.info(f"Waiting for changes every {self.interval_seconds} seconds...")
        except Exception as e:
            logger.error(f"Unexpected error in watch loop: {e}")
            self.consecutive_errors += 1
            if self.consecutive_errors >= self.max_consecutive_errors:
                logger.critical("Too many consecutive errors. Exiting watch mode.")
                return 1

        # Sleep until next check (if still running)
        if self.running:
            current_time = time.time()
            time_since_last_status = current_time - last_status_time
            # Show periodic status update every 5 minutes (or on the first iteration)
            if time_since_last_status >= status_update_interval or watch_iteration == 1:
                logger.info(
                    f"Still watching for changes every {self.interval_seconds} "
                    f"seconds (iteration {watch_iteration})..."
                )
                last_status_time = current_time
            else:
                logger.debug(f"Sleeping for {self.interval_seconds} seconds until next check...")
            try:
                time.sleep(self.interval_seconds)
            except KeyboardInterrupt:
                logger.info("\nKeyboard interrupt detected during sleep. Shutting down...")
                self.running = False

    logger.info("=" * 80)
    logger.info("Watch mode stopped")
    logger.info("=" * 80)
    return 0
|