Skip to content

Detection API

Technical reference for the detection engine and its result structures.

adqa.detection.engine.DetectionEngine

Source code in src/adqa/detection/engine.py
class DetectionEngine:
    """Run rule-based and ML-based detectors and collect their outputs.

    The engine builds one ``DetectionContext`` per :meth:`run` call, hands
    it to every configured detector, and gathers the outputs into a
    ``DetectionResultBundle``. Tracing and lineage recording are optional
    collaborators; when they are not supplied (or are falsy) no-op
    implementations are substituted.
    """

    def __init__(
        self,
        rule_detectors: list[BaseDetector],
        ml_detectors: list[BaseMLDetector],
        tracing: Any = None,
        lineage: Any = None,
    ) -> None:
        """Store the detectors and the optional tracing/lineage collaborators.

        Args:
            rule_detectors: Detectors invoked through ``detect(context)``.
            ml_detectors: Detectors invoked through ``run_model(context)``.
            tracing: Trace emitter; replaced by ``NoOpTraceEmitter()`` when
                falsy.
            lineage: Lineage recorder; replaced by ``NoOpLineageRecorder()``
                when falsy.
        """
        self.rule_detectors = rule_detectors
        self.ml_detectors = ml_detectors

        self.tracing = tracing or NoOpTraceEmitter()
        self.lineage = lineage or NoOpLineageRecorder()

    # =========================
    # Public API
    # =========================

    def run(
        self,
        dataset_profile: Any,
        column_profiles: dict[str, Any],
        ml_profiles: Any | None = None,
        raw_data_sample: Any | None = None,
        historical_profiles: Any | None = None,
    ) -> DetectionResultBundle:
        """Execute every configured detector and return the combined bundle.

        Args:
            dataset_profile: Dataset-level profile. If it exposes a truthy
                ``correlations`` attribute, that object's ``matrix``
                attribute (or ``None``) is forwarded as the correlation
                matrix.
            column_profiles: Profiles keyed by string, passed through to
                the context unchanged.
            ml_profiles: Optional value passed through to the context.
            raw_data_sample: Optional value passed through to the context.
            historical_profiles: Optional value passed through to the
                context.

        Returns:
            A bundle whose ``detections`` holds the rule detector results
            and whose ``ml_evidence`` holds the ML detector results, each in
            detector order.
        """
        # The correlation matrix is optional: both the `correlations`
        # attribute and its `matrix` attribute may be absent.
        correlations = getattr(dataset_profile, "correlations", None)
        correlation_matrix = (
            getattr(correlations, "matrix", None) if correlations else None
        )

        context = DetectionContext(
            dataset_profile=dataset_profile,
            column_profiles=column_profiles,
            ml_profiles=ml_profiles,
            correlation_matrix=correlation_matrix,
            raw_data_sample=raw_data_sample,
            historical_profiles=historical_profiles,
        )

        bundle = DetectionResultBundle()

        # -------------------------
        # Rule-based detection
        # -------------------------
        with self.tracing.span("RULE_BASED_DETECTION", count=len(self.rule_detectors)):
            for rule_det in self.rule_detectors:
                bundle.detections.extend(self._run_rule_detector(rule_det, context))

        # -------------------------
        # ML-based detection
        # -------------------------
        with self.tracing.span("ML_BASED_DETECTION", count=len(self.ml_detectors)):
            for ml_det in self.ml_detectors:
                bundle.ml_evidence.extend(self._run_ml_detector(ml_det, context))

        if self.tracing:
            self.tracing.trace(
                "DETECTION_RESULTS",
                {
                    "rule_count": len(bundle.detections),
                    "ml_count": len(bundle.ml_evidence),
                },
            )

        return bundle

    # =========================
    # Internal Execution
    # =========================

    def _run_rule_detector(
        self, detector: BaseDetector, context: DetectionContext
    ) -> list[DetectionResult]:
        """Run one rule detector, backfill dimensions, and record lineage.

        Results whose ``dimension`` is still ``"unknown"`` inherit the
        detector's ``dimension`` attribute when the detector has one.
        """
        if self.tracing:
            with self.tracing.span(
                "RUN_DETECTOR", detector=detector.__class__.__name__
            ):
                results = detector.detect(context)
        else:
            results = detector.detect(context)

        # Whether the detector declares a dimension does not change per result.
        has_dimension = hasattr(detector, "dimension")
        for r in results:
            if has_dimension and r.dimension == "unknown":
                r.dimension = detector.dimension

        self._register_lineage(context, results)

        return results

    def _run_ml_detector(
        self, detector: BaseMLDetector, context: DetectionContext
    ) -> list[MLEvidence]:
        """Run one ML detector, backfill dimensions, and record lineage.

        Evidence whose ``dimension`` equals ``"accuracy"`` is overwritten
        with the detector's ``dimension`` attribute when the detector has
        one.
        """
        if self.tracing:
            with self.tracing.span("ML_MODEL_RUN", model=detector.__class__.__name__):
                evidence = detector.run_model(context)
        else:
            evidence = detector.run_model(context)

        # NOTE(review): "accuracy" is treated as the placeholder value here,
        # presumably because it is the MLEvidence default - confirm.
        has_dimension = hasattr(detector, "dimension")
        for e in evidence:
            if has_dimension and e.dimension == "accuracy":
                e.dimension = detector.dimension

        self._register_lineage(context, evidence)

        return evidence

    # =========================
    # Lineage Handling
    # =========================

    def _register_lineage(self, context: DetectionContext, outputs: Any) -> None:
        """Record one lineage entry per output, on a best-effort basis.

        Nothing is recorded when the lineage recorder is falsy or a
        ``NoOpLineageRecorder``, or when the tracing object does not expose
        a truthy ``context.trace_id``. A failure while recording an output
        is logged at DEBUG level and otherwise ignored, so lineage problems
        never interrupt detection.
        """
        if not self.lineage or isinstance(self.lineage, NoOpLineageRecorder):
            return

        # Attempt to get trace_id from tracing context if available
        tracing_context = getattr(self.tracing, "context", None)
        trace_id = getattr(tracing_context, "trace_id", None)

        if not trace_id:
            return

        # Imported locally so this block does not depend on the module's
        # import section.
        import logging

        logger = logging.getLogger(__name__)

        for output in outputs:
            # Bound before the try so the except handler can always name it.
            op_name = "unknown_detection"
            try:
                if hasattr(output, "detector_name"):
                    op_name = f"detection_{output.detector_name}"
                elif hasattr(output, "model_name"):
                    op_name = f"ml_{output.model_name}"

                self.lineage.record(
                    trace_id=trace_id,
                    operation=op_name,
                    # Use summary metadata instead of full profile string
                    inputs={
                        "dataset_summary": {
                            "rows": context.dataset_profile.metadata.row_count,
                            "cols": context.dataset_profile.metadata.column_count,
                        }
                    },
                    outputs={"detection_id": getattr(output, "id", None)},
                    metadata={
                        "issue_type": getattr(output, "issue_type", None),
                        "score": getattr(output, "score", None),
                    },
                )
            except Exception:
                # Never break pipeline due to lineage, but leave a trace so
                # a misbehaving recorder can be diagnosed.
                logger.debug(
                    "Lineage recording failed for operation %s",
                    op_name,
                    exc_info=True,
                )

adqa.detection.results.DetectionResult dataclass

Source code in src/adqa/detection/results.py
@dataclass
class DetectionResult:
    """A single finding reported by a detector.

    Only ``detector_name`` and ``issue_type`` are required; every other
    field has a default. ``id`` is produced by ``_generate_id`` for each
    new instance unless one is passed explicitly.
    """

    # Who reported the finding and what kind of finding it is.
    detector_name: str
    issue_type: str
    dimension: str = "unknown"

    # Affected column(s); both default to None.
    column: str | None = None
    columns: list[str] | None = None

    scope: str = "column"  # one of: column, dataset, cross_column

    severity_hint: float = 0.0  # in [0, 1]; a hint only, not the final severity
    confidence: float = 1.0  # in [0, 1]

    # Free-form supporting data; a fresh dict per instance.
    metrics: dict[str, Any] = field(default_factory=dict)
    description: str = ""

    id: str = field(default_factory=_generate_id)

    def to_dict(self) -> dict[str, Any]:
        """Return the fields as a plain dict, with ``id`` as the first key.

        Values are referenced, not copied, so ``metrics`` and ``columns``
        in the returned dict are the same objects held by the instance.
        """
        exported_fields = (
            "id",
            "detector_name",
            "issue_type",
            "dimension",
            "column",
            "columns",
            "scope",
            "severity_hint",
            "confidence",
            "metrics",
            "description",
        )
        return {name: getattr(self, name) for name in exported_fields}

adqa.detection.results.DetectionResultBundle dataclass

Source code in src/adqa/detection/results.py
@dataclass
class DetectionResultBundle:
    """Container pairing rule-based detections with ML evidence.

    Both lists start empty and are independent per instance.
    """

    detections: list[DetectionResult] = field(default_factory=list)
    ml_evidence: list[MLEvidence] = field(default_factory=list)

    def extend(self, other: "DetectionResultBundle") -> None:
        """Append the contents of ``other`` to this bundle in place.

        ``detections`` is extended first, then ``ml_evidence``; ``other``
        is not modified.
        """
        for list_name in ("detections", "ml_evidence"):
            getattr(self, list_name).extend(getattr(other, list_name))

    def to_dict(self) -> dict[str, Any]:
        """Return a dict holding the ``to_dict()`` form of every item."""
        serialized_detections = [item.to_dict() for item in self.detections]
        serialized_evidence = [item.to_dict() for item in self.ml_evidence]
        return {
            "detections": serialized_detections,
            "ml_evidence": serialized_evidence,
        }