Source code for qmlhc.metrics.anomalies

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Anomaly Detection Metrics
-------------------------
Early-warning and evaluation utilities for anomaly detection on time series.

Provided metrics
----------------
- ``early_roc_auc``: ROC-AUC restricted to an early-detection horizon.
- ``recall_at_lag``: Recall counting predictions made within a lag window.

"""

from __future__ import annotations

import numpy as np

from ..core.types import TensorLike


def early_roc_auc(y_true: TensorLike, scores: TensorLike, horizon: int = 1) -> float:
    """
    Compute ROC-AUC restricted to an early-detection horizon.

    A time index ``i`` is labeled positive when at least one event occurs
    within the next ``horizon`` steps, i.e. anywhere in
    ``y_true[i + 1 : i + horizon + 1]`` (the window is truncated at the end of
    the series). With ``horizon=0`` the labels are the events themselves,
    which yields the ordinary ROC-AUC. The AUC is the empirical probability
    that a positive index scores higher than a negative one, with tied pairs
    counted as one half.

    Parameters
    ----------
    y_true : TensorLike
        Ground-truth anomaly indicator (values ``> 0`` are anomalies); it is
        flattened to shape ``(T,)``.
    scores : TensorLike
        Continuous anomaly scores aligned with ``y_true``; flattened to shape
        ``(T,)``. NaN scores never win or tie a comparison, but the pairs they
        belong to still count in the denominator.
    horizon : int, optional
        Early-detection lookahead window, by default ``1``.

    Returns
    -------
    float
        Early-window ROC-AUC in ``[0, 1]``. Returns ``0.5`` if positives or
        negatives are absent (uninformative baseline).

    Raises
    ------
    ValueError
        If ``horizon`` is negative.
    """
    if horizon < 0:
        raise ValueError(f"horizon must be non-negative, got {horizon}")

    y = np.asarray(y_true, dtype=float).reshape(-1)
    s = np.asarray(scores, dtype=float).reshape(-1)
    n = y.size

    # Prefix sums of the event indicator give the number of events in any
    # half-open window [lo, hi) as csum[hi] - csum[lo], without a Python loop.
    csum = np.concatenate(([0], np.cumsum(y > 0)))
    idx = np.arange(n)
    # The lookahead window starts one step ahead, except for horizon == 0,
    # where the label is simply the event at the current index.
    lo = np.minimum(idx + (1 if horizon > 0 else 0), n)
    hi = np.minimum(idx + horizon + 1, n)
    is_pos = (csum[hi] - csum[lo]) > 0

    pos = s[is_pos]
    neg = s[~is_pos]
    if pos.size == 0 or neg.size == 0:
        return 0.5

    # Mann-Whitney style count: for every positive score, the number of
    # negatives strictly below it (wins) and equal to it (ties) follows from
    # two binary searches into the sorted negatives.
    pos_valid = pos[~np.isnan(pos)]
    neg_sorted = np.sort(neg[~np.isnan(neg)])
    below = np.searchsorted(neg_sorted, pos_valid, side="left")
    below_or_equal = np.searchsorted(neg_sorted, pos_valid, side="right")
    wins = int(below.sum())
    ties = int((below_or_equal - below).sum())

    # Ties earn half credit so that an uninformative (constant) scorer lands
    # on 0.5 instead of 0.0.
    auc = (wins + 0.5 * ties) / (pos.size * neg.size)
    return float(auc)
def recall_at_lag(y_true: TensorLike, y_pred: TensorLike, lag: int = 1) -> float:
    """
    Fraction of anomalies recalled within a backward lag window.

    For each true anomaly at index ``i``, counts a hit if any predicted
    positive occurs in ``[i - lag, i]`` (clipped at the start of the series).
    Suitable for evaluating early alarms that may precede the labeled event by
    up to ``lag`` steps.

    Parameters
    ----------
    y_true : TensorLike
        Ground-truth anomaly indicator (values ``> 0`` are anomalies); it is
        flattened to shape ``(T,)``.
    y_pred : TensorLike
        Binary predictions aligned with ``y_true`` (values ``> 0`` are
        alarms); flattened to one dimension. Indices beyond the end of
        ``y_pred`` are treated as "no alarm".
    lag : int, optional
        Backward window size for crediting early detections, by default ``1``.

    Returns
    -------
    float
        Recall in ``[0, 1]`` computed with lag tolerance. Returns ``0.0`` when
        ``y_true`` contains no anomalies.

    Raises
    ------
    ValueError
        If ``lag`` is negative.
    """
    if lag < 0:
        raise ValueError(f"lag must be non-negative, got {lag}")

    y = np.asarray(y_true, dtype=float).reshape(-1)
    p = np.asarray(y_pred, dtype=float).reshape(-1)

    anomaly_idx = np.flatnonzero(y > 0)
    total = int(anomaly_idx.size)
    if total == 0:
        return 0.0

    # Prefix sums of the alarm indicator: the number of alarms in the
    # half-open window [lo, hi) is csum[hi] - csum[lo].
    n_pred = p.size
    csum = np.concatenate(([0], np.cumsum(p > 0)))
    # Clip every window to the valid prediction range so a shorter ``y_pred``
    # simply contributes no alarms past its end.
    lo = np.minimum(np.maximum(anomaly_idx - lag, 0), n_pred)
    hi = np.minimum(anomaly_idx + 1, n_pred)
    hits = int(np.count_nonzero((csum[hi] - csum[lo]) > 0))

    # Exact ratio: no epsilon in the denominator, so perfect recall is 1.0.
    return float(hits / total)