Skip to content

writer

Audit trail writer that appends structured events to segment files on disk.

DEFAULT_MAX_FILE_SIZE_BYTES module-attribute

# Default rotation threshold for a segment file: 5 MiB (5 * 1024 * 1024 bytes).
DEFAULT_MAX_FILE_SIZE_BYTES = 5 * 1024 * 1024

_AUDIT_FILENAME_PATTERN module-attribute

# Segment file name pattern: "audit-", a six-digit index (capture group 1), then ".jsonl".
_AUDIT_FILENAME_PATTERN = compile('audit-(\\d{6})\\.jsonl')

AuditTrailWriter

Append-only audit writer with size-based log rotation.

Source code in src/horde_model_reference/audit/writer.py
class AuditTrailWriter:
    """Append-only audit writer with size-based log rotation.

    Each event is written as one compact JSON line to a segment file named
    ``audit-NNNNNN.jsonl`` under ``<root_path>/<domain.value>/<category>/``.
    Once the newest segment has reached ``max_file_size_bytes``, the next
    event goes to a new segment with the following index. The most recently
    allocated event id is stored in ``<root_path>/index.json`` and read back
    when a writer is constructed.
    """

    _root_path: Path
    _max_file_size_bytes: int
    _lock: RLock
    _state_path: Path
    _last_event_id: int

    def __init__(self, *, root_path: Path, max_file_size_bytes: int = DEFAULT_MAX_FILE_SIZE_BYTES) -> None:
        """Initialize the writer with a root directory and rotation threshold.

        Args:
            root_path: Directory holding the index file and the segment
                directories; created (including parents) if it is missing.
            max_file_size_bytes: Size at or above which the newest segment is
                treated as full and a new segment is started.
        """
        self._root_path = root_path
        self._root_path.mkdir(parents=True, exist_ok=True)
        self._max_file_size_bytes = max_file_size_bytes
        self._lock = RLock()
        self._state_path = self._root_path / "index.json"
        self._last_event_id = self._load_last_event_id()

    def append_event(
        self,
        *,
        domain: CanonicalFormat,
        category: str,
        model_name: str,
        operation: AuditOperation,
        logical_user_id: str,
        payload: AuditPayload | None = None,
        request_id: str | None = None,
        timestamp: int | None = None,
    ) -> AuditEvent:
        """Append a new audit event, returning the persisted object.

        Id allocation, segment selection and the file write all happen while
        holding this writer's lock. The id is allocated (and persisted to the
        index file) before the event is built and written.

        Returns:
            The ``AuditEvent`` that was serialized to the segment file.
        """
        with self._lock:
            event_id = self._allocate_event_id()
            event = AuditEvent.new(
                event_id=event_id,
                domain=domain,
                category=category,
                model_name=model_name,
                operation=operation,
                logical_user_id=logical_user_id,
                payload=payload,
                request_id=request_id,
                timestamp=timestamp,
            )
            segment_path = self._resolve_segment_path(domain=domain, category=category)
            self._write_line(segment_path, event)
            return event

    def _allocate_event_id(self) -> int:
        """Persist and return the next event id (previous id + 1)."""
        next_event_id = self._last_event_id + 1
        # Write the index first and only then advance the in-memory counter, so
        # a failed index write does not leave a gap in the id sequence.
        atomic_write_json(self._state_path, {"last_event_id": next_event_id})
        self._last_event_id = next_event_id
        return next_event_id

    def _load_last_event_id(self) -> int:
        """Return the id stored in the index file, or 0 if it is missing or unusable.

        An absent file, unparseable JSON, a JSON value that is not an object,
        and a ``last_event_id`` that cannot be converted to ``int`` all yield
        0; every case except the absent file also logs a warning.
        """
        try:
            raw = self._state_path.read_text()
        except FileNotFoundError:
            return 0
        try:
            data = json.loads(raw or "{}")
        except json.JSONDecodeError as exc:  # pragma: no cover - defensive
            logger.warning(f"Unable to parse audit index file {self._state_path}: {exc}")
            return 0
        if not isinstance(data, dict):
            # Valid JSON of the wrong shape (list, number, ...) has no ``.get``.
            logger.warning(f"Audit index file {self._state_path} does not contain a JSON object")
            return 0
        try:
            return int(data.get("last_event_id", 0))
        except (TypeError, ValueError, OverflowError) as exc:
            logger.warning(f"Invalid last_event_id in audit index file {self._state_path}: {exc}")
            return 0

    def _resolve_segment_path(self, *, domain: CanonicalFormat, category: str) -> Path:
        """Return the segment file the next event should be appended to.

        Creates ``<root>/<domain.value>/<category>/`` if needed. The first
        segment is ``audit-000001.jsonl``; when the highest-indexed segment is
        at or above the size threshold, the path of the next index is returned
        (the file itself is created later by the append).
        """
        category_dir: Path = self._root_path / domain.value / category
        category_dir.mkdir(parents=True, exist_ok=True)
        # Only names that fully match the segment pattern count. A stray file
        # such as "audit-backup.jsonl" must never be chosen as the active
        # segment or fed to the index parser.
        segments = [
            candidate
            for candidate in category_dir.glob("audit-*.jsonl")
            if _AUDIT_FILENAME_PATTERN.fullmatch(candidate.name)
        ]
        if not segments:
            return category_dir / "audit-000001.jsonl"
        # Choose by numeric index rather than by path sort order.
        latest = max(segments, key=_extract_segment_index)
        if latest.stat().st_size >= self._max_file_size_bytes:
            next_index = _extract_segment_index(latest) + 1
            return category_dir / f"audit-{next_index:06d}.jsonl"
        return latest

    def _write_line(self, path: Path, event: AuditEvent) -> None:
        """Append ``event`` to ``path`` as one compact, ASCII-escaped JSON line."""
        serialized = json.dumps(
            event.model_dump(mode="json", exclude_none=True),
            separators=(",", ":"),
            ensure_ascii=True,
        )
        with path.open("a", encoding="utf-8") as handle:
            handle.write(serialized)
            handle.write("\n")

_root_path instance-attribute

_root_path: Path = root_path

_max_file_size_bytes instance-attribute

_max_file_size_bytes: int = max_file_size_bytes

_lock instance-attribute

_lock: RLock = RLock()

_state_path instance-attribute

_state_path: Path = _root_path / 'index.json'

_last_event_id instance-attribute

_last_event_id: int = _load_last_event_id()

__init__

__init__(
    *,
    root_path: Path,
    max_file_size_bytes: int = DEFAULT_MAX_FILE_SIZE_BYTES,
) -> None

Initialize the writer with a root directory and rotation threshold.

Source code in src/horde_model_reference/audit/writer.py
def __init__(self, *, root_path: Path, max_file_size_bytes: int = DEFAULT_MAX_FILE_SIZE_BYTES) -> None:
    """Prepare the audit root directory, rotation limit, lock and event counter.

    Args:
        root_path: Directory for the index file and segment directories;
            created (including parents) when it does not exist.
        max_file_size_bytes: Rotation threshold stored for later segment
            size checks.
    """
    self._root_path = root_path
    root_path.mkdir(parents=True, exist_ok=True)
    self._max_file_size_bytes = max_file_size_bytes
    self._lock = RLock()
    self._state_path = root_path / "index.json"
    # The state path must be set before the stored id is read back.
    self._last_event_id = self._load_last_event_id()

append_event

append_event(
    *,
    domain: CanonicalFormat,
    category: str,
    model_name: str,
    operation: AuditOperation,
    logical_user_id: str,
    payload: AuditPayload | None = None,
    request_id: str | None = None,
    timestamp: int | None = None,
) -> AuditEvent

Append a new audit event, returning the persisted object.

Source code in src/horde_model_reference/audit/writer.py
def append_event(
    self,
    *,
    domain: CanonicalFormat,
    category: str,
    model_name: str,
    operation: AuditOperation,
    logical_user_id: str,
    payload: AuditPayload | None = None,
    request_id: str | None = None,
    timestamp: int | None = None,
) -> AuditEvent:
    """Create an audit event with a newly allocated id and append it to a segment file.

    The id allocation, segment lookup and write are performed while holding
    this writer's lock.

    Returns:
        The ``AuditEvent`` that was written.
    """
    with self._lock:
        # The id is allocated first, before the event is constructed.
        record = AuditEvent.new(
            event_id=self._allocate_event_id(),
            domain=domain,
            category=category,
            model_name=model_name,
            operation=operation,
            logical_user_id=logical_user_id,
            payload=payload,
            request_id=request_id,
            timestamp=timestamp,
        )
        target = self._resolve_segment_path(domain=domain, category=category)
        self._write_line(target, record)
    return record

_allocate_event_id

_allocate_event_id() -> int
Source code in src/horde_model_reference/audit/writer.py
def _allocate_event_id(self) -> int:
    """Persist and return the next event id (previous id + 1).

    The index file is written before the in-memory counter is advanced, so a
    failed write leaves the counter untouched and no id is skipped.
    """
    next_event_id = self._last_event_id + 1
    atomic_write_json(self._state_path, {"last_event_id": next_event_id})
    self._last_event_id = next_event_id
    return next_event_id

_load_last_event_id

_load_last_event_id() -> int
Source code in src/horde_model_reference/audit/writer.py
def _load_last_event_id(self) -> int:
    """Return the id stored in the index file, or 0 if it is missing or unusable.

    An absent file, unparseable JSON, a JSON value that is not an object, and
    a ``last_event_id`` that cannot be converted to ``int`` all yield 0; every
    case except the absent file also logs a warning.
    """
    try:
        raw = self._state_path.read_text()
    except FileNotFoundError:
        return 0
    try:
        data = json.loads(raw or "{}")
    except json.JSONDecodeError as exc:  # pragma: no cover - defensive
        logger.warning(f"Unable to parse audit index file {self._state_path}: {exc}")
        return 0
    if not isinstance(data, dict):
        # Valid JSON of the wrong shape (list, number, ...) has no ``.get``.
        logger.warning(f"Audit index file {self._state_path} does not contain a JSON object")
        return 0
    try:
        return int(data.get("last_event_id", 0))
    except (TypeError, ValueError, OverflowError) as exc:
        logger.warning(f"Invalid last_event_id in audit index file {self._state_path}: {exc}")
        return 0

_resolve_segment_path

_resolve_segment_path(
    *, domain: CanonicalFormat, category: str
) -> Path
Source code in src/horde_model_reference/audit/writer.py
def _resolve_segment_path(self, *, domain: CanonicalFormat, category: str) -> Path:
    """Return the segment file the next event should be appended to.

    Creates ``<root>/<domain.value>/<category>/`` if needed. The first segment
    is ``audit-000001.jsonl``; when the highest-indexed segment is at or above
    the size threshold, the path of the next index is returned (the file
    itself is not created here).
    """
    category_dir: Path = self._root_path / domain.value / category
    category_dir.mkdir(parents=True, exist_ok=True)
    # Only names that fully match the segment pattern count. A stray file such
    # as "audit-backup.jsonl" must never be chosen as the active segment or
    # fed to the index parser.
    segments = [
        candidate
        for candidate in category_dir.glob("audit-*.jsonl")
        if _AUDIT_FILENAME_PATTERN.fullmatch(candidate.name)
    ]
    if not segments:
        return category_dir / "audit-000001.jsonl"
    # Choose by numeric index rather than by path sort order.
    latest = max(segments, key=_extract_segment_index)
    if latest.stat().st_size >= self._max_file_size_bytes:
        next_index = _extract_segment_index(latest) + 1
        return category_dir / f"audit-{next_index:06d}.jsonl"
    return latest

_write_line

_write_line(path: Path, event: AuditEvent) -> None
Source code in src/horde_model_reference/audit/writer.py
def _write_line(self, path: Path, event: AuditEvent) -> None:
    """Append ``event`` to ``path`` as one compact, ASCII-escaped JSON line."""
    record = event.model_dump(mode="json", exclude_none=True)
    line = json.dumps(record, separators=(",", ":"), ensure_ascii=True)
    # Append mode creates the file on first use.
    with path.open("a", encoding="utf-8") as sink:
        sink.write(f"{line}\n")

_extract_segment_index

_extract_segment_index(path: Path) -> int
Source code in src/horde_model_reference/audit/writer.py
def _extract_segment_index(path: Path) -> int:
    """Return the numeric index from a segment file name, or 1 if the name does not match."""
    found = _AUDIT_FILENAME_PATTERN.match(path.name)
    return int(found.group(1)) if found else 1