Skip to content

router

Shared pending queue router builder.

WriteGuard module-attribute

WriteGuard = Callable[[ModelReferenceManager], None]

StatusesQuery module-attribute

StatusesQuery = Annotated[
    list[PendingChangeStatus] | None, Query()
]

CategoriesQuery module-attribute

CategoriesQuery = Annotated[
    list[MODEL_REFERENCE_CATEGORY] | None, Query()
]

BatchIdQuery module-attribute

BatchIdQuery = Annotated[int | None, Query(ge=1)]

ModelNameQuery module-attribute

ModelNameQuery = Annotated[
    str | None, Query(min_length=1, max_length=200)
]

OffsetQuery module-attribute

OffsetQuery = Annotated[int, Query(ge=0)]

LimitQuery module-attribute

LimitQuery = Annotated[int, Query(ge=1, le=500)]

RequestedByQuery module-attribute

RequestedByQuery = Annotated[list[str] | None, Query()]

PendingBatchRequest

Bases: BaseModel

Request payload used to approve/reject queued changes.

Source code in src/horde_model_reference/service/pending_queue/router.py
class PendingBatchRequest(BaseModel):
    """Request payload used to approve/reject queued changes."""

    # Title for the batch; must be 1-120 characters and non-blank once stripped.
    batch_title: str = Field(min_length=1, max_length=120)
    # Change ids to approve / reject. After validation both are sorted,
    # de-duplicated lists containing only positive ids.
    approved_ids: list[int] | None = None
    rejected_ids: list[int] | None = None
    # Required (non-blank) whenever at least one change is rejected.
    reject_reason: str | None = Field(default=None, max_length=500)

    @model_validator(mode="after")
    def _validate_payload(self) -> PendingBatchRequest:
        """Normalize the payload and reject requests that cannot be processed.

        Non-positive ids are dropped and duplicates collapsed; the surviving
        ids are stored sorted. ``batch_title`` and ``reject_reason`` are
        stripped, and a blank ``reject_reason`` becomes ``None`` (matching how
        blank ``job_id`` values are normalized by the apply request models).

        Raises:
            ValueError: if no positive id remains, if an id is both approved
                and rejected, if changes are rejected without a non-blank
                ``reject_reason``, or if ``batch_title`` is blank after
                stripping.
        """
        # Compute every normalized value first so that no field is mutated
        # unless the whole payload is valid.
        approved = {change_id for change_id in (self.approved_ids or []) if change_id > 0}
        rejected = {change_id for change_id in (self.rejected_ids or []) if change_id > 0}
        batch_title = self.batch_title.strip()
        reject_reason = (self.reject_reason or "").strip() or None

        if not approved and not rejected:
            raise ValueError("Provide at least one approved or rejected change id.")

        overlap = approved & rejected
        if overlap:
            raise ValueError(f"Change ids cannot be both approved and rejected: {sorted(overlap)}")

        if rejected and reject_reason is None:
            raise ValueError("reject_reason is required when rejecting changes.")

        if not batch_title:
            raise ValueError("batch_title must not be blank.")

        self.approved_ids = sorted(approved)
        self.rejected_ids = sorted(rejected)
        self.batch_title = batch_title
        # Only touch the field when the client supplied a value, so an omitted
        # reject_reason is not recorded as explicitly set.
        if self.reject_reason is not None:
            self.reject_reason = reject_reason
        return self

batch_title class-attribute instance-attribute

batch_title: str = Field(min_length=1, max_length=120)

approved_ids class-attribute instance-attribute

approved_ids: list[int] | None = None

rejected_ids class-attribute instance-attribute

rejected_ids: list[int] | None = None

reject_reason class-attribute instance-attribute

reject_reason: str | None = Field(
    default=None, max_length=500
)

_validate_payload

_validate_payload() -> PendingBatchRequest
Source code in src/horde_model_reference/service/pending_queue/router.py
@model_validator(mode="after")
def _validate_payload(self) -> PendingBatchRequest:
    """Normalize the payload and reject requests that cannot be processed.

    Non-positive ids are dropped and duplicates collapsed; the surviving ids
    are stored sorted. ``batch_title`` and ``reject_reason`` are stripped, and
    a blank ``reject_reason`` becomes ``None`` instead of an empty string.

    Raises:
        ValueError: if no positive id remains, if an id is both approved and
            rejected, if changes are rejected without a non-blank
            ``reject_reason``, or if ``batch_title`` is blank after stripping.
    """
    # Compute every normalized value first so that no field is mutated unless
    # the whole payload is valid.
    approved = {change_id for change_id in (self.approved_ids or []) if change_id > 0}
    rejected = {change_id for change_id in (self.rejected_ids or []) if change_id > 0}
    batch_title = self.batch_title.strip()
    reject_reason = (self.reject_reason or "").strip() or None

    if not approved and not rejected:
        raise ValueError("Provide at least one approved or rejected change id.")

    overlap = approved & rejected
    if overlap:
        raise ValueError(f"Change ids cannot be both approved and rejected: {sorted(overlap)}")

    if rejected and reject_reason is None:
        raise ValueError("reject_reason is required when rejecting changes.")

    if not batch_title:
        raise ValueError("batch_title must not be blank.")

    self.approved_ids = sorted(approved)
    self.rejected_ids = sorted(rejected)
    self.batch_title = batch_title
    # Only touch the field when the client supplied a value, so an omitted
    # reject_reason is not recorded as explicitly set.
    if self.reject_reason is not None:
        self.reject_reason = reject_reason
    return self

ApplyPendingChangeRequest

Bases: BaseModel

Request payload for applying an approved change.

Source code in src/horde_model_reference/service/pending_queue/router.py
class ApplyPendingChangeRequest(BaseModel):
    """Request payload for applying an approved change."""

    # Optional caller-supplied job identifier (at most 120 characters).
    job_id: str | None = Field(default=None, max_length=120)

    @field_validator("job_id")
    @classmethod
    def _normalize_job_id(cls, job_id: str | None) -> str | None:
        """Strip surrounding whitespace; a missing or blank id becomes ``None``."""
        trimmed = "" if job_id is None else job_id.strip()
        return trimmed if trimmed else None

job_id class-attribute instance-attribute

job_id: str | None = Field(default=None, max_length=120)

_normalize_job_id classmethod

_normalize_job_id(job_id: str | None) -> str | None
Source code in src/horde_model_reference/service/pending_queue/router.py
@field_validator("job_id")
@classmethod
def _normalize_job_id(cls, job_id: str | None) -> str | None:
    """Strip surrounding whitespace; a missing or blank id becomes ``None``."""
    trimmed = "" if job_id is None else job_id.strip()
    return trimmed if trimmed else None

ApplyPendingChangesRequest

Bases: BaseModel

Request payload for applying multiple approved changes.

Source code in src/horde_model_reference/service/pending_queue/router.py
class ApplyPendingChangesRequest(BaseModel):
    """Request payload for applying multiple approved changes."""

    # Ids of the changes to apply; at least one is required.
    change_ids: list[int] = Field(min_length=1)
    # Optional caller-supplied job identifier (at most 120 characters).
    job_id: str | None = Field(default=None, max_length=120)
    allow_mixed_batch: bool = Field(
        default=False,
        description="If False, all changes must belong to the same batch",
    )

    @field_validator("job_id")
    @classmethod
    def _normalize_job_id(cls, job_id: str | None) -> str | None:
        """Strip surrounding whitespace; a missing or blank id becomes ``None``."""
        trimmed = "" if job_id is None else job_id.strip()
        return trimmed if trimmed else None

    @model_validator(mode="after")
    def _validate_change_ids(self) -> ApplyPendingChangesRequest:
        """Require positive ids and drop duplicates, keeping first-seen order.

        Raises:
            ValueError: if ``change_ids`` is empty or holds a non-positive id.
        """
        requested = self.change_ids
        if not requested:
            raise ValueError("change_ids must include at least one id")

        if any(change_id <= 0 for change_id in requested):
            raise ValueError("change_ids must be positive integers")

        # dict keys preserve insertion order, so this keeps each id's first
        # occurrence in its original position.
        self.change_ids = list(dict.fromkeys(requested))
        return self

change_ids class-attribute instance-attribute

change_ids: list[int] = Field(min_length=1)

job_id class-attribute instance-attribute

job_id: str | None = Field(default=None, max_length=120)

allow_mixed_batch class-attribute instance-attribute

allow_mixed_batch: bool = Field(
    default=False,
    description="If False, all changes must belong to the same batch",
)

_normalize_job_id classmethod

_normalize_job_id(job_id: str | None) -> str | None
Source code in src/horde_model_reference/service/pending_queue/router.py
@field_validator("job_id")
@classmethod
def _normalize_job_id(cls, job_id: str | None) -> str | None:
    """Strip surrounding whitespace; a missing or blank id becomes ``None``."""
    trimmed = "" if job_id is None else job_id.strip()
    return trimmed if trimmed else None

_validate_change_ids

_validate_change_ids() -> ApplyPendingChangesRequest
Source code in src/horde_model_reference/service/pending_queue/router.py
@model_validator(mode="after")
def _validate_change_ids(self) -> ApplyPendingChangesRequest:
    """Require positive ids and drop duplicates, keeping first-seen order.

    Raises:
        ValueError: if ``change_ids`` is empty or holds a non-positive id.
    """
    requested = self.change_ids
    if not requested:
        raise ValueError("change_ids must include at least one id")

    if any(change_id <= 0 for change_id in requested):
        raise ValueError("change_ids must be positive integers")

    # dict keys preserve insertion order, so this keeps each id's first
    # occurrence in its original position.
    self.change_ids = list(dict.fromkeys(requested))
    return self

ApplyPendingChangesResponse

Bases: BaseModel

Response payload summarizing a bulk apply attempt.

Batch Split Semantics: When a partial apply occurs (some changes in a batch are applied while others remain), the remaining APPROVED changes are automatically reassigned to a new batch ID. This information is provided in the batch_split_* fields to help clients update their UI.

Source code in src/horde_model_reference/service/pending_queue/router.py
class ApplyPendingChangesResponse(BaseModel):
    """Response payload summarizing a bulk apply attempt.

    Batch Split Semantics:
    When a partial apply occurs (some changes in a batch are applied while others remain),
    the remaining APPROVED changes are automatically reassigned to a new batch ID. This
    information is provided in the batch_split_* fields to help clients update their UI.
    """

    # Records returned by the apply operation as applied; defaults to an empty list.
    applied: list[PendingChangeRecord] = Field(default_factory=list)
    # Failure details. The bulk-apply endpoint fills failed_error with the
    # stringified error and failed_error_type with the error's class name;
    # all three stay None when nothing failed.
    failed_change_id: int | None = None
    failed_error: str | None = None
    failed_error_type: str | None = None

    # Batch split information (populated when partial apply triggers reassignment)
    batch_split_occurred: bool = Field(
        default=False,
        description="True if applying changes triggered a batch split (remaining changes reassigned)",
    )
    batch_split_original_batch_id: int | None = Field(
        default=None,
        description="The original batch ID that was partially applied",
    )
    batch_split_new_batch_id: int | None = Field(
        default=None,
        description="The new batch ID assigned to remaining unapplied changes",
    )
    batch_split_reassigned_count: int | None = Field(
        default=None,
        description="Number of changes that were reassigned to the new batch",
    )

applied class-attribute instance-attribute

applied: list[PendingChangeRecord] = Field(
    default_factory=list
)

failed_change_id class-attribute instance-attribute

failed_change_id: int | None = None

failed_error class-attribute instance-attribute

failed_error: str | None = None

failed_error_type class-attribute instance-attribute

failed_error_type: str | None = None

batch_split_occurred class-attribute instance-attribute

batch_split_occurred: bool = Field(
    default=False,
    description="True if applying changes triggered a batch split (remaining changes reassigned)",
)

batch_split_original_batch_id class-attribute instance-attribute

batch_split_original_batch_id: int | None = Field(
    default=None,
    description="The original batch ID that was partially applied",
)

batch_split_new_batch_id class-attribute instance-attribute

batch_split_new_batch_id: int | None = Field(
    default=None,
    description="The new batch ID assigned to remaining unapplied changes",
)

batch_split_reassigned_count class-attribute instance-attribute

batch_split_reassigned_count: int | None = Field(
    default=None,
    description="Number of changes that were reassigned to the new batch",
)

ApplySingleChangeResponse

Bases: BaseModel

Response payload for applying a single pending change.

Includes the updated record and batch split information when applicable.

Source code in src/horde_model_reference/service/pending_queue/router.py
class ApplySingleChangeResponse(BaseModel):
    """Response payload for applying a single pending change.

    Includes the updated record and batch split information when applicable.
    """

    # The only required field; every batch_split_* field below has a default.
    record: PendingChangeRecord = Field(
        description="The applied pending change record with updated status",
    )
    batch_split_occurred: bool = Field(
        default=False,
        description="True if applying this change triggered a batch split",
    )
    # The single-apply endpoint only emits the three fields below when a
    # batch split was reported; otherwise they are omitted from the response.
    batch_split_original_batch_id: int | None = Field(
        default=None,
        description="The original batch ID that was partially applied",
    )
    batch_split_new_batch_id: int | None = Field(
        default=None,
        description="The new batch ID assigned to remaining unapplied changes",
    )
    batch_split_reassigned_count: int | None = Field(
        default=None,
        description="Number of changes that were reassigned to the new batch",
    )

record class-attribute instance-attribute

record: PendingChangeRecord = Field(
    description="The applied pending change record with updated status"
)

batch_split_occurred class-attribute instance-attribute

batch_split_occurred: bool = Field(
    default=False,
    description="True if applying this change triggered a batch split",
)

batch_split_original_batch_id class-attribute instance-attribute

batch_split_original_batch_id: int | None = Field(
    default=None,
    description="The original batch ID that was partially applied",
)

batch_split_new_batch_id class-attribute instance-attribute

batch_split_new_batch_id: int | None = Field(
    default=None,
    description="The new batch ID assigned to remaining unapplied changes",
)

batch_split_reassigned_count class-attribute instance-attribute

batch_split_reassigned_count: int | None = Field(
    default=None,
    description="Number of changes that were reassigned to the new batch",
)

PurgePendingChangesRequest

Bases: BaseModel

Request payload to purge pending changes matching a filter.

Source code in src/horde_model_reference/service/pending_queue/router.py
class PurgePendingChangesRequest(BaseModel):
    """Request payload to purge pending changes matching a filter."""

    # Optional filters; at least one must be effective unless purge_all is set.
    statuses: list[PendingChangeStatus] | None = None
    categories: list[MODEL_REFERENCE_CATEGORY] | None = None
    model_name: str | None = Field(default=None, max_length=200)
    requested_by: list[str] | None = None
    # Explicit opt-in required to submit a request with no filters at all.
    purge_all: bool = False
    # When true the purge endpoint only reports what would be removed.
    dry_run: bool = False

    @model_validator(mode="after")
    def _validate_payload(self) -> PurgePendingChangesRequest:
        """Normalize the filters and require either a filter or ``purge_all``.

        ``statuses`` and ``categories`` are de-duplicated and stored sorted
        (an empty list when none were given). ``requested_by`` entries are
        stripped, blanks dropped, de-duplicated and sorted, or ``None`` when
        nothing remains. ``model_name`` is stripped, and a blank value becomes
        ``None`` so that it is never forwarded as an empty-string filter.

        Raises:
            ValueError: if no effective filter remains and ``purge_all`` is
                false.
        """
        unique_statuses = set(self.statuses or [])
        unique_categories = set(self.categories or [])
        stripped_requestors = (user_id.strip() for user_id in self.requested_by or [] if user_id)
        unique_requestors = {user_id for user_id in stripped_requestors if user_id}
        # A whitespace-only name used to collapse to "" here; normalize it to
        # None so downstream code sees "no name filter" unambiguously.
        normalized_name = (self.model_name or "").strip() or None

        has_filters = bool(unique_statuses or unique_categories or unique_requestors or normalized_name)
        if not self.purge_all and not has_filters:
            raise ValueError("Provide at least one filter or set purge_all=true to clear the entire queue.")

        self.statuses = sorted(unique_statuses)
        self.categories = sorted(unique_categories)
        self.requested_by = sorted(unique_requestors) if unique_requestors else None
        self.model_name = normalized_name
        return self

statuses class-attribute instance-attribute

statuses: list[PendingChangeStatus] | None = None

categories class-attribute instance-attribute

categories: list[MODEL_REFERENCE_CATEGORY] | None = None

model_name class-attribute instance-attribute

model_name: str | None = Field(default=None, max_length=200)

requested_by class-attribute instance-attribute

requested_by: list[str] | None = None

purge_all class-attribute instance-attribute

purge_all: bool = False

dry_run class-attribute instance-attribute

dry_run: bool = False

_validate_payload

_validate_payload() -> PurgePendingChangesRequest
Source code in src/horde_model_reference/service/pending_queue/router.py
@model_validator(mode="after")
def _validate_payload(self) -> PurgePendingChangesRequest:
    """Normalize the filters and require either a filter or ``purge_all``.

    ``statuses`` and ``categories`` are de-duplicated and stored sorted (an
    empty list when none were given). ``requested_by`` entries are stripped,
    blanks dropped, de-duplicated and sorted, or ``None`` when nothing
    remains. ``model_name`` is stripped, and a blank value becomes ``None`` so
    that it is never forwarded as an empty-string filter.

    Raises:
        ValueError: if no effective filter remains and ``purge_all`` is false.
    """
    unique_statuses = set(self.statuses or [])
    unique_categories = set(self.categories or [])
    stripped_requestors = (user_id.strip() for user_id in self.requested_by or [] if user_id)
    unique_requestors = {user_id for user_id in stripped_requestors if user_id}
    # A whitespace-only name used to collapse to "" here; normalize it to None
    # so downstream code sees "no name filter" unambiguously.
    normalized_name = (self.model_name or "").strip() or None

    has_filters = bool(unique_statuses or unique_categories or unique_requestors or normalized_name)
    if not self.purge_all and not has_filters:
        raise ValueError("Provide at least one filter or set purge_all=true to clear the entire queue.")

    self.statuses = sorted(unique_statuses)
    self.categories = sorted(unique_categories)
    self.requested_by = sorted(unique_requestors) if unique_requestors else None
    self.model_name = normalized_name
    return self

PurgePendingChangesResponse

Bases: BaseModel

Response payload for a purge operation.

Source code in src/horde_model_reference/service/pending_queue/router.py
class PurgePendingChangesResponse(BaseModel):
    """Response payload for a purge operation."""

    # Number of changes removed (for a dry run the endpoint reports the
    # matching total instead); validated as non-negative.
    removed_count: int = Field(ge=0)
    # Ids of the removed (or, for a dry run, matching) changes.
    removed_change_ids: list[int] = Field(default_factory=list)
    # True when the response describes a dry run.
    dry_run: bool = False

removed_count class-attribute instance-attribute

removed_count: int = Field(ge=0)

removed_change_ids class-attribute instance-attribute

removed_change_ids: list[int] = Field(default_factory=list)

dry_run class-attribute instance-attribute

dry_run: bool = False

_status_for_apply_error

_status_for_apply_error(
    error: PendingChangeApplyError,
) -> int
Source code in src/horde_model_reference/service/pending_queue/router.py
def _status_for_apply_error(error: PendingChangeApplyError) -> int:
    """Return the HTTP status code used to report an apply failure.

    Not-found errors map to 404, state and payload errors to 400, backend
    errors to 503, and any other apply error to 400.
    """
    # Checked top to bottom; the first matching entry wins.
    status_by_error_type = (
        (PendingChangeNotFoundError, status.HTTP_404_NOT_FOUND),
        ((PendingChangeStateError, PendingChangePayloadError), status.HTTP_400_BAD_REQUEST),
        (PendingChangeBackendError, status.HTTP_503_SERVICE_UNAVAILABLE),
    )
    for error_types, http_status in status_by_error_type:
        if isinstance(error, error_types):
            return http_status
    return status.HTTP_400_BAD_REQUEST

_assert_approver async

_assert_approver(apikey: str) -> tuple[str, str]
Source code in src/horde_model_reference/service/pending_queue/router.py
async def _assert_approver(apikey: str) -> tuple[str, str]:
    """Authenticate ``apikey`` as a queue approver and return ``(user_id, username)``.

    Any exception raised by ``authenticate_queue_approver`` propagates unchanged.
    """
    identity = await authenticate_queue_approver(apikey)
    user_id, username = identity.user_id, identity.username
    return user_id, username

build_pending_queue_router

build_pending_queue_router(
    *, tags: Sequence[str], assert_write_enabled: WriteGuard
) -> APIRouter

Create a pending queue router whose guards can differ per API version.

Source code in src/horde_model_reference/service/pending_queue/router.py
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
def build_pending_queue_router(*, tags: Sequence[str], assert_write_enabled: WriteGuard) -> APIRouter:
    """Create a pending queue router whose guards can differ per API version."""
    router = APIRouter(prefix="/pending_queue", tags=list(tags))

    @router.get(
        "/changes",
        response_model=PendingQueuePage,
        summary="List pending queue entries",
        responses={
            200: {"description": "Filtered queue entries"},
            401: {"description": "Invalid API key", "model": ErrorResponse},
            503: {"description": "Pending queue disabled", "model": ErrorResponse},
        },
    )
    async def list_pending_changes(
        manager: Annotated[ModelReferenceManager, Depends(get_model_reference_manager)],
        apikey: Annotated[str, Depends(header_auth_scheme)],
        statuses: StatusesQuery = None,
        categories: CategoriesQuery = None,
        batch_id: BatchIdQuery = None,
        model_name: ModelNameQuery = None,
        requested_by: RequestedByQuery = None,
        offset: OffsetQuery = 0,
        limit: LimitQuery = 50,
    ) -> PendingQueuePage:
        """Return a filtered, paginated list of pending queue entries."""
        await _assert_approver(apikey)
        assert_write_enabled(manager)
        queue_service = require_pending_queue_service(manager)

        normalized_name = model_name.strip() if model_name else None
        normalized_requestors = {value.strip() for value in requested_by or [] if value and value.strip()}
        queue_filter = PendingQueueFilter(
            statuses=set(statuses) if statuses else None,
            categories=set(categories) if categories else None,
            batch_id=batch_id,
            model_name=normalized_name,
            requested_by=normalized_requestors or None,
        )

        return queue_service.list_changes(queue_filter=queue_filter, offset=offset, limit=limit)

    @router.post(
        "/purge",
        response_model=PurgePendingChangesResponse,
        summary="Purge pending changes matching filters",
        responses={
            200: {"description": "Filtered changes removed"},
            400: {"description": "Invalid purge request", "model": ErrorResponse},
            401: {"description": "Invalid API key", "model": ErrorResponse},
        },
    )
    async def purge_pending_changes(
        request: PurgePendingChangesRequest,
        manager: Annotated[ModelReferenceManager, Depends(get_model_reference_manager)],
        apikey: Annotated[str, Depends(header_auth_scheme)],
    ) -> PurgePendingChangesResponse:
        """Delete queued changes in bulk, optionally as a dry run."""
        approver_id, approver_username = await _assert_approver(apikey)
        assert_write_enabled(manager)
        queue_service = require_pending_queue_service(manager)

        queue_filter = PendingQueueFilter(
            statuses=set(request.statuses) if request.statuses else None,
            categories=set(request.categories) if request.categories else None,
            model_name=request.model_name,
            requested_by=set(request.requested_by) if request.requested_by else None,
        )
        has_filter = bool(
            queue_filter.statuses or queue_filter.categories or queue_filter.model_name or queue_filter.requested_by
        )
        active_filter = queue_filter if has_filter else None

        if request.dry_run:
            page = queue_service.list_changes(queue_filter=active_filter, offset=0, limit=None)
            return PurgePendingChangesResponse(
                removed_count=page.total,
                removed_change_ids=[record.change_id for record in page.items],
                dry_run=True,
            )

        removed = queue_service.purge_changes(
            queue_filter=None if request.purge_all and not has_filter else active_filter,
            purged_by=approver_id,
            purged_username=approver_username,
        )

        return PurgePendingChangesResponse(
            removed_count=len(removed),
            removed_change_ids=[record.change_id for record in removed],
            dry_run=False,
        )

    @router.get(
        "/changes/{change_id}",
        response_model=PendingChangeRecord,
        summary="Get a single pending change",
        responses={
            200: {"description": "Pending change details"},
            401: {"description": "Invalid API key", "model": ErrorResponse},
            404: {"description": "Change not found", "model": ErrorResponse},
        },
    )
    async def read_pending_change(
        change_id: int,
        manager: Annotated[ModelReferenceManager, Depends(get_model_reference_manager)],
        apikey: Annotated[str, Depends(header_auth_scheme)],
    ) -> PendingChangeRecord:
        """Return details for a single pending change."""
        await _assert_approver(apikey)
        queue_service = require_pending_queue_service(manager)
        record = queue_service.get_change(change_id)
        if record is None:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Change not found")
        return record

    @router.get(
        "/changes/{change_id}/diff",
        response_model=PendingChangeDiff,
        summary="Get diff for a pending change",
        responses={
            200: {"description": "Diff between current and proposed state"},
            401: {"description": "Invalid API key", "model": ErrorResponse},
            404: {"description": "Change not found", "model": ErrorResponse},
        },
    )
    async def get_pending_change_diff(
        change_id: int,
        manager: Annotated[ModelReferenceManager, Depends(get_model_reference_manager)],
        apikey: Annotated[str, Depends(header_auth_scheme)],
    ) -> PendingChangeDiff:
        """Return a detailed diff for a pending change.

        Compares the pending change payload against the current model state
        in the backend to show exactly what would change if applied.

        For UPDATE operations, returns field-level diffs showing added,
        removed, and modified fields. For CREATE/DELETE operations, shows
        the full proposed/current state respectively.
        """
        await _assert_approver(apikey)
        queue_service = require_pending_queue_service(manager)
        diff_service = PendingChangeDiffService(manager=manager, queue_service=queue_service)

        diff = diff_service.compute_change_diff(change_id)
        if diff is None:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Change not found")
        return diff

    @router.get(
        "/changes/diff",
        response_model=PendingChangeDiffPage,
        summary="Get diffs for multiple pending changes",
        responses={
            200: {"description": "Diffs for requested changes"},
            400: {"description": "Invalid request", "model": ErrorResponse},
            401: {"description": "Invalid API key", "model": ErrorResponse},
        },
    )
    async def get_pending_changes_diffs(
        manager: Annotated[ModelReferenceManager, Depends(get_model_reference_manager)],
        apikey: Annotated[str, Depends(header_auth_scheme)],
        change_ids: Annotated[list[int], Query(min_length=1, max_length=100)],
    ) -> PendingChangeDiffPage:
        """Return diffs for multiple pending changes in bulk.

        Accepts a list of change IDs and returns diffs for each. Changes
        that cannot be found or diffed are reported in the errors array.
        """
        await _assert_approver(apikey)
        queue_service = require_pending_queue_service(manager)
        diff_service = PendingChangeDiffService(manager=manager, queue_service=queue_service)

        return diff_service.compute_bulk_diffs(change_ids)

    @router.post(
        "/batches",
        response_model=PendingBatchResult,
        summary="Approve or reject queued changes",
        status_code=status.HTTP_200_OK,
        responses={
            200: {"description": "Batch processed"},
            400: {"description": "Invalid batch request", "model": ErrorResponse},
            401: {"description": "Invalid API key", "model": ErrorResponse},
        },
    )
    async def process_pending_batch(
        request: PendingBatchRequest,
        manager: Annotated[ModelReferenceManager, Depends(get_model_reference_manager)],
        apikey: Annotated[str, Depends(header_auth_scheme)],
    ) -> PendingBatchResult:
        """Approve and/or reject a set of pending changes in one batch."""
        approver_id, approver_username = await _assert_approver(apikey)
        assert_write_enabled(manager)
        queue_service = require_pending_queue_service(manager)

        try:
            return queue_service.process_batch(
                approver_id=approver_id,
                approver_username=approver_username,
                batch_title=request.batch_title,
                approved_ids=request.approved_ids,
                rejected_ids=request.rejected_ids,
                reject_reason=request.reject_reason,
            )
        except ValueError as exc:
            raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(exc)) from exc

    @router.post(
        "/changes/{change_id}/apply",
        response_model=ApplySingleChangeResponse,
        summary="Apply an approved change to the backend",
        status_code=status.HTTP_200_OK,
        responses={
            200: {"description": "Change applied"},
            400: {"description": "Change not ready for apply", "model": ErrorResponse},
            401: {"description": "Invalid API key", "model": ErrorResponse},
            404: {"description": "Change not found", "model": ErrorResponse},
            503: {"description": "Writes not supported", "model": ErrorResponse},
        },
    )
    async def apply_pending_change_endpoint(
        change_id: int,
        request: ApplyPendingChangeRequest,
        manager: Annotated[ModelReferenceManager, Depends(get_model_reference_manager)],
        apikey: Annotated[str, Depends(header_auth_scheme)],
    ) -> JSONResponse:
        """Apply an approved pending change and mark it as applied."""
        approver_id, approver_username = await _assert_approver(apikey)
        assert_write_enabled(manager)
        queue_service = require_pending_queue_service(manager)

        try:
            result = apply_pending_change(
                manager=manager,
                queue_service=queue_service,
                change_id=change_id,
                applied_by=approver_id,
                applied_username=approver_username,
                job_id=request.job_id,
            )
        except PendingChangeNotFoundError as exc:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(exc)) from exc
        except PendingChangeStateError as exc:
            raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(exc)) from exc
        except PendingChangePayloadError as exc:
            raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(exc)) from exc
        except PendingChangeBackendError as exc:
            raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail=str(exc)) from exc
        except PendingChangeApplyError as exc:
            raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(exc)) from exc

        # Build response with batch split information
        response_data: dict[str, object] = {
            "record": result.record.model_dump(mode="json", exclude_none=True),
            "batch_split_occurred": result.batch_split is not None,
        }
        if result.batch_split is not None:
            response_data["batch_split_original_batch_id"] = result.batch_split.original_batch_id
            response_data["batch_split_new_batch_id"] = result.batch_split.new_batch_id
            response_data["batch_split_reassigned_count"] = len(result.batch_split.reassigned_change_ids)

        return JSONResponse(
            status_code=status.HTTP_200_OK,
            content=response_data,
        )

    @router.post(
        "/apply",
        response_model=ApplyPendingChangesResponse,
        summary="Apply multiple approved changes",
        status_code=status.HTTP_200_OK,
        responses={
            200: {"description": "All requested changes applied"},
            400: {"description": "Invalid request or change state"},
            401: {"description": "Invalid API key", "model": ErrorResponse},
            404: {"description": "One of the change ids was not found"},
            503: {"description": "Writes not supported or backend failure"},
        },
    )
    async def apply_pending_changes_endpoint(
        request: ApplyPendingChangesRequest,
        manager: Annotated[ModelReferenceManager, Depends(get_model_reference_manager)],
        apikey: Annotated[str, Depends(header_auth_scheme)],
    ) -> JSONResponse:
        """Apply the pending changes whose ids are listed in the request body.

        Batch cohesion is enforced unless the request sets ``allow_mixed_batch``.
        When the apply result carries a failure, it is reported in the response
        body (``failed_change_id``, ``failed_error``, ``failed_error_type``) and
        the HTTP status is derived from that error instead of being 200.
        """
        # Resolve the caller and the collaborators before touching the queue.
        approver_id, approver_username = await _assert_approver(apikey)
        assert_write_enabled(manager)
        queue_service = require_pending_queue_service(manager)

        try:
            outcome = apply_pending_changes(
                manager=manager,
                queue_service=queue_service,
                change_ids=request.change_ids,
                applied_by=approver_id,
                applied_username=approver_username,
                job_id=request.job_id,
                enforce_batch_cohesion=not request.allow_mixed_batch,
            )
        except PendingChangeNotFoundError as exc:
            # Unknown change id -> 404.
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(exc)) from exc
        except ValueError as exc:
            # Any other ValueError raised by the apply call -> 400.
            raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(exc)) from exc

        # Describe the failure (if any) once, then reuse it for body and status.
        failure = outcome.failed_error
        if failure:
            failure_message: str | None = str(failure)
            failure_kind: str | None = type(failure).__name__
        else:
            failure_message = None
            failure_kind = None

        body = ApplyPendingChangesResponse(
            applied=outcome.applied_records,
            failed_change_id=outcome.failed_change_id,
            failed_error=failure_message,
            failed_error_type=failure_kind,
            batch_split_occurred=outcome.batch_split_occurred,
            batch_split_original_batch_id=outcome.batch_split_original_batch_id,
            batch_split_new_batch_id=outcome.batch_split_new_batch_id,
            batch_split_reassigned_count=outcome.batch_split_reassigned_count,
        )

        # A recorded failure overrides the default 200 with an error-specific status.
        if failure is None:
            http_status = status.HTTP_200_OK
        else:
            http_status = _status_for_apply_error(failure)

        return JSONResponse(
            status_code=http_status,
            content=body.model_dump(mode="json", exclude_none=True),
        )

    @router.post(
        "/apply_batch/{batch_id}",
        response_model=ApplyPendingChangesResponse,
        summary="Apply all approved changes in a specific batch",
        status_code=status.HTTP_200_OK,
        responses={
            200: {"description": "All approved changes in batch applied"},
            400: {"description": "Invalid batch or change state"},
            401: {"description": "Invalid API key", "model": ErrorResponse},
            404: {"description": "Batch not found or has no approved changes"},
            503: {"description": "Writes not supported or backend failure"},
        },
    )
    async def apply_batch_endpoint(
        batch_id: int,
        manager: Annotated[ModelReferenceManager, Depends(get_model_reference_manager)],
        apikey: Annotated[str, Depends(header_auth_scheme)],
        job_id: Annotated[str | None, Query(max_length=120)] = None,
    ) -> JSONResponse:
        """Apply every APPROVED change in the given batch.

        Changes in any other status are left untouched. A batch with no changes
        yields 404. A batch whose changes are all already APPLIED yields 200 with
        an empty ``applied`` list. A batch with no APPROVED changes otherwise
        yields 404. When the apply result carries a failure, the HTTP status is
        derived from that error instead of being 200.
        """
        approver_id, approver_username = await _assert_approver(apikey)
        assert_write_enabled(manager)
        queue_service = require_pending_queue_service(manager)

        # Load the whole batch in one unpaginated query.
        batch_contents = queue_service.list_changes(
            queue_filter=PendingQueueFilter(batch_id=batch_id),
            offset=0,
            limit=None,
        )

        if batch_contents.total == 0:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"No changes found for batch {batch_id}",
            )

        # Only APPROVED entries are eligible; APPLIED/REJECTED/PENDING are skipped.
        change_ids = [
            entry.change_id for entry in batch_contents.items if entry.status == PendingChangeStatus.APPROVED
        ]

        if not change_ids:
            already_applied = len(
                [entry for entry in batch_contents.items if entry.status == PendingChangeStatus.APPLIED]
            )
            if already_applied != batch_contents.total:
                # The batch exists but nothing in it is ready to apply.
                raise HTTPException(
                    status_code=status.HTTP_404_NOT_FOUND,
                    detail=f"No approved changes found in batch {batch_id}",
                )
            # Everything was applied earlier: report success with nothing new applied.
            return JSONResponse(
                status_code=status.HTTP_200_OK,
                content=ApplyPendingChangesResponse(applied=[]).model_dump(mode="json", exclude_none=True),
            )

        try:
            outcome = apply_pending_changes(
                manager=manager,
                queue_service=queue_service,
                change_ids=change_ids,
                applied_by=approver_id,
                applied_username=approver_username,
                job_id=job_id,
                enforce_batch_cohesion=True,  # The batch endpoint never allows mixed batches.
            )
        except PendingChangeNotFoundError as exc:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(exc)) from exc
        except ValueError as exc:
            raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(exc)) from exc

        # Describe the failure (if any) once, then reuse it for body and status.
        failure = outcome.failed_error
        if failure:
            failure_message: str | None = str(failure)
            failure_kind: str | None = type(failure).__name__
        else:
            failure_message = None
            failure_kind = None

        body = ApplyPendingChangesResponse(
            applied=outcome.applied_records,
            failed_change_id=outcome.failed_change_id,
            failed_error=failure_message,
            failed_error_type=failure_kind,
            batch_split_occurred=outcome.batch_split_occurred,
            batch_split_original_batch_id=outcome.batch_split_original_batch_id,
            batch_split_new_batch_id=outcome.batch_split_new_batch_id,
            batch_split_reassigned_count=outcome.batch_split_reassigned_count,
        )

        # A recorded failure overrides the default 200 with an error-specific status.
        if failure is None:
            http_status = status.HTTP_200_OK
        else:
            http_status = _status_for_apply_error(failure)

        return JSONResponse(
            status_code=http_status,
            content=body.model_dump(mode="json", exclude_none=True),
        )

    return router