Skip to content

text_model_grouping

Text model grouping utilities.

Provides functionality to group text generation models by base name (stripping quantization information) and aggregate metrics across grouped variants.

merge_deletion_flags

merge_deletion_flags(
    flags_list: list[DeletionRiskFlags],
) -> DeletionRiskFlags

Merge multiple deletion risk flags using logical OR.

If any variant has a flag set, the merged result will have that flag set.

Parameters:

Returns:

Source code in src/horde_model_reference/analytics/text_model_grouping.py
def merge_deletion_flags(flags_list: list[DeletionRiskFlags]) -> DeletionRiskFlags:
    """Merge multiple deletion risk flags using logical OR.

    If any variant has a flag set, the merged result will have that flag set.

    Args:
        flags_list: List of DeletionRiskFlags from all variants.

    Returns:
        Merged DeletionRiskFlags.

    """
    from horde_model_reference.analytics.deletion_risk_analysis import DeletionRiskFlags

    if not flags_list:
        return DeletionRiskFlags()

    merged = DeletionRiskFlags()
    merged.zero_usage_day = any(f.zero_usage_day for f in flags_list)
    merged.zero_usage_month = any(f.zero_usage_month for f in flags_list)
    merged.zero_usage_total = any(f.zero_usage_total for f in flags_list)
    merged.no_active_workers = any(f.no_active_workers for f in flags_list)
    merged.has_multiple_hosts = any(f.has_multiple_hosts for f in flags_list)
    merged.has_non_preferred_host = any(f.has_non_preferred_host for f in flags_list)
    merged.has_unknown_host = any(f.has_unknown_host for f in flags_list)
    merged.no_download_urls = any(f.no_download_urls for f in flags_list)
    merged.missing_description = any(f.missing_description for f in flags_list)
    merged.missing_baseline = any(f.missing_baseline for f in flags_list)
    merged.low_usage = any(f.low_usage for f in flags_list)

    return merged
merge_usage_trends(
    trends: list[UsageTrend], weights: list[int]
) -> UsageTrend

Merge usage trends using weighted average.

Parameters:

  • trends (list[UsageTrend]) –

    List of UsageTrend objects from variants.

  • weights (list[int]) –

    Weights for each trend (e.g., monthly usage counts).

Returns:

  • UsageTrend

    Merged UsageTrend with weighted average ratios.

Source code in src/horde_model_reference/analytics/text_model_grouping.py
def merge_usage_trends(trends: list[UsageTrend], weights: list[int]) -> UsageTrend:
    """Merge usage trends using weighted average.

    Args:
        trends: List of UsageTrend objects from variants.
        weights: Weights for each trend (e.g., monthly usage counts).

    Returns:
        Merged UsageTrend with weighted average ratios.

    """
    from horde_model_reference.analytics.deletion_risk_analysis import UsageTrend

    if not trends or not weights:
        return UsageTrend()

    total_weight = sum(weights)
    if total_weight == 0:
        return UsageTrend()

    day_to_month_ratios = [
        trend.day_to_month_ratio * weight
        for trend, weight in zip(trends, weights, strict=True)
        if trend.day_to_month_ratio
    ]
    month_to_total_ratios = [
        trend.month_to_total_ratio * weight
        for trend, weight in zip(trends, weights, strict=True)
        if trend.month_to_total_ratio
    ]

    return UsageTrend(
        day_to_month_ratio=sum(day_to_month_ratios) / total_weight if day_to_month_ratios else None,
        month_to_total_ratio=sum(month_to_total_ratios) / total_weight if month_to_total_ratios else None,
    )

group_risk_models

group_risk_models(
    models: list[ModelDeletionRiskInfo],
) -> list[ModelDeletionRiskInfo]

Group text model variants by base name and aggregate metrics.

Combines multiple quantization variants (Q4_K_M, Q5_0, etc.) into a single model entry with aggregated metrics.

Parameters:

Returns:

Source code in src/horde_model_reference/analytics/text_model_grouping.py
def group_risk_models(models: list[ModelDeletionRiskInfo]) -> list[ModelDeletionRiskInfo]:
    """Group text model variants by base name and aggregate metrics.

    Combines multiple quantization variants (Q4_K_M, Q5_0, etc.) into a single
    model entry with aggregated metrics.

    Args:
        models: List of ModelDeletionRiskInfo objects to group.

    Returns:
        List of grouped ModelDeletionRiskInfo objects with aggregated metrics.

    """
    from horde_model_reference.analytics.deletion_risk_analysis import ModelDeletionRiskInfo

    if not models:
        return []

    grouped: dict[str, list[ModelDeletionRiskInfo]] = {}
    for model in models:
        base_name = get_base_model_name(model.name)
        if base_name not in grouped:
            grouped[base_name] = []
        grouped[base_name].append(model)

    result: list[ModelDeletionRiskInfo] = []
    for base_name, variants in grouped.items():
        if len(variants) == 1:
            # Single variant - normalize the name to base_name (strip backend/author prefixes)
            single_model = variants[0]
            if single_model.name != base_name:
                # Create a copy with the normalized base name
                single_model = ModelDeletionRiskInfo(
                    name=base_name,
                    category=single_model.category,
                    deletion_risk_flags=single_model.deletion_risk_flags,
                    at_risk=single_model.at_risk,
                    risk_score=single_model.risk_score,
                    worker_count=single_model.worker_count,
                    usage_day=single_model.usage_day,
                    usage_month=single_model.usage_month,
                    usage_total=single_model.usage_total,
                    usage_hour=single_model.usage_hour,
                    usage_minute=single_model.usage_minute,
                    usage_percentage_of_category=single_model.usage_percentage_of_category,
                    usage_trend=single_model.usage_trend,
                    cost_benefit_score=single_model.cost_benefit_score,
                    size_gb=single_model.size_gb,
                    baseline=single_model.baseline,
                    nsfw=single_model.nsfw,
                    has_description=single_model.has_description,
                    download_count=single_model.download_count,
                    download_hosts=single_model.download_hosts,
                )
            result.append(single_model)
            continue

        logger.debug(f"Grouping {len(variants)} variants of '{base_name}'")

        total_usage_day = sum(v.usage_day for v in variants)
        total_usage_month = sum(v.usage_month for v in variants)
        total_usage_total = sum(v.usage_total for v in variants)
        total_usage_hour = sum(v.usage_hour for v in variants if v.usage_hour is not None)
        total_usage_minute = sum(v.usage_minute for v in variants if v.usage_minute is not None)

        max_worker_count = max(v.worker_count for v in variants)

        total_size_gb = sum(v.size_gb for v in variants if v.size_gb is not None)
        size_count = len([v for v in variants if v.size_gb is not None])
        avg_size_gb = total_size_gb / size_count if total_size_gb > 0 else None

        merged_flags = merge_deletion_flags([v.deletion_risk_flags for v in variants])

        weights = [v.usage_month for v in variants]
        merged_trend = merge_usage_trends([v.usage_trend for v in variants], weights)

        all_hosts = list({host for v in variants for host in v.download_hosts})

        first_variant = variants[0]

        usage_percentage = sum(v.usage_percentage_of_category for v in variants)

        cost_benefit = None
        if avg_size_gb and avg_size_gb > 0:
            cost_benefit = total_usage_month / avg_size_gb

        grouped_model = ModelDeletionRiskInfo(
            name=base_name,
            category=first_variant.category,
            deletion_risk_flags=merged_flags,
            at_risk=merged_flags.any_flags(),
            risk_score=merged_flags.flag_count(),
            worker_count=max_worker_count,
            usage_day=total_usage_day,
            usage_month=total_usage_month,
            usage_total=total_usage_total,
            usage_hour=total_usage_hour if any(v.usage_hour is not None for v in variants) else None,
            usage_minute=total_usage_minute if any(v.usage_minute is not None for v in variants) else None,
            usage_percentage_of_category=usage_percentage,
            usage_trend=merged_trend,
            cost_benefit_score=cost_benefit,
            size_gb=avg_size_gb,
            baseline=first_variant.baseline,
            nsfw=first_variant.nsfw,
            has_description=all(v.has_description for v in variants),
            download_count=sum(v.download_count for v in variants),
            download_hosts=all_hosts,
        )

        result.append(grouped_model)

    logger.info(f"Grouped {len(models)} models into {len(result)} entries")
    return result

recalculate_risk_summary

recalculate_risk_summary(
    models: list[ModelDeletionRiskInfo],
    category_total_usage: int,
) -> CategoryDeletionRiskSummary

Recalculate risk summary after grouping models.

Parameters:

  • models (list[ModelDeletionRiskInfo]) –

    List of (potentially grouped) ModelDeletionRiskInfo objects.

  • category_total_usage (int) –

    Total monthly usage for the category.

Returns:

Source code in src/horde_model_reference/analytics/text_model_grouping.py
def recalculate_risk_summary(
    models: list[ModelDeletionRiskInfo],
    category_total_usage: int,
) -> CategoryDeletionRiskSummary:
    """Recalculate risk summary after grouping models.

    Args:
        models: List of (potentially grouped) ModelDeletionRiskInfo objects.
        category_total_usage: Total monthly usage for the category.

    Returns:
        New CategoryDeletionRiskSummary with updated counts.

    """
    from horde_model_reference.analytics.deletion_risk_analysis import CategoryDeletionRiskSummary

    return CategoryDeletionRiskSummary(
        total_models=len(models),
        models_at_risk=sum(1 for m in models if m.at_risk),
        models_critical=sum(1 for m in models if m.is_critical),
        models_with_warnings=sum(1 for m in models if m.has_warning),
        models_with_zero_day_usage=sum(1 for m in models if m.deletion_risk_flags.zero_usage_day),
        models_with_zero_month_usage=sum(1 for m in models if m.deletion_risk_flags.zero_usage_month),
        models_with_zero_total_usage=sum(1 for m in models if m.deletion_risk_flags.zero_usage_total),
        models_with_no_active_workers=sum(1 for m in models if m.deletion_risk_flags.no_active_workers),
        models_with_no_downloads=sum(1 for m in models if m.deletion_risk_flags.no_download_urls),
        models_with_non_preferred_hosts=sum(1 for m in models if m.deletion_risk_flags.has_non_preferred_host),
        models_with_multiple_hosts=sum(1 for m in models if m.deletion_risk_flags.has_multiple_hosts),
        models_with_low_usage=sum(1 for m in models if m.deletion_risk_flags.low_usage),
        average_risk_score=sum(m.risk_score for m in models) / len(models) if models else 0.0,
        category_total_month_usage=category_total_usage,
    )

apply_text_model_grouping_to_risk_response

apply_text_model_grouping_to_risk_response(
    risk_response: CategoryDeletionRiskResponse,
) -> CategoryDeletionRiskResponse

Apply text model grouping to deletion risk response.

Groups text generation models by base name and recalculates summary.

Parameters:

Returns:

Source code in src/horde_model_reference/analytics/text_model_grouping.py
def apply_text_model_grouping_to_risk_response(
    risk_response: CategoryDeletionRiskResponse,
) -> CategoryDeletionRiskResponse:
    """Apply text model grouping to deletion risk response.

    Groups text generation models by base name and recalculates summary.

    Args:
        risk_response: Original CategoryDeletionRiskResponse.

    Returns:
        New CategoryDeletionRiskResponse with grouped models and updated summary.

    """
    from horde_model_reference.analytics.deletion_risk_analysis import CategoryDeletionRiskResponse

    if risk_response.category != MODEL_REFERENCE_CATEGORY.text_generation:
        logger.debug(f"Skipping grouping for non-text category: {risk_response.category}")
        return risk_response

    grouped_models = group_risk_models(risk_response.models)
    new_summary = recalculate_risk_summary(grouped_models, risk_response.category_total_month_usage)

    return CategoryDeletionRiskResponse(
        category=risk_response.category,
        category_total_month_usage=risk_response.category_total_month_usage,
        total_count=risk_response.total_count,  # Preserve original total
        returned_count=len(grouped_models),
        offset=risk_response.offset,
        limit=risk_response.limit,
        models=grouped_models,
        summary=new_summary,
    )