Skip to content

ADQA Core API

The ADQA class is the main entry point for the library.

adqa.core.api.ADQA

Public entry point for ADQA. Orchestrates the Phase 3 pipeline.

Source code in src/adqa/core/api.py
class ADQA:
    """
    Public entry point for ADQA.
    Orchestrates the Phase 3 pipeline.

    All collaborating engines (reader, lineage, profiling, detection,
    scoring, execution) are constructed eagerly in ``__init__``;
    ``analyze()`` then drives one
    Ingress -> Profiling -> Detection -> Scoring -> Execution run.
    """

    def __init__(
        self,
        *,
        data_source: DataSource,
        config: ADQAConfig | None = None,
        tags: list[str] | None = None,
        metadata: dict[str, Any] | None = None,
        historical_profiles: Any | None = None,
    ):
        # All arguments are keyword-only. Missing config falls back to
        # library defaults; tags/metadata default to empty containers.
        self._config: ADQAConfig = config or ADQAConfig()
        self._data_source: DataSource = data_source
        self._tags = tags or []
        self._metadata = metadata or {}
        # Default history for drift-style detection; analyze() may
        # override this per call.
        self._historical_profiles = historical_profiles

        # Build reader early → fail fast on an unsupported/misconfigured source
        self._reader: DataReader = DataReaderFactory.create(self._data_source)

        # Lineage setup (no-op recorder when lineage is disabled in config)
        self._lineage: NoOpLineageRecorder | LineageRecorder = self._init_lineage()

        # Profiling setup
        self._cache = ProfileCache()
        self._profiler = ProfilingEngine(
            config=self._config,
            cache=self._cache,
            lineage=self._lineage,
        )

        # Detection setup
        registry = build_registry()
        thresholds = self._config.detection.thresholds

        # ML detectors are doubly opt-in: both the global ml_enabled flag
        # and the detection-level enable_ml flag must be true.
        ml_detectors = []
        if self._config.ml_enabled and self._config.detection.enable_ml:
            ml_detectors = registry.create_ml_detectors(thresholds=thresholds)

        self._detection_engine = DetectionEngine(
            rule_detectors=registry.create_rule_detectors(thresholds=thresholds),
            ml_detectors=ml_detectors,
            lineage=self._lineage,
        )

        # Scoring setup
        self._scoring_engine = ScoringEngine(
            lineage=self._lineage,
            weight_map=self._config.scoring.weight_map,
        )

        # Execution setup
        self._execution_engine = ExecutionEngine(lineage=self._lineage)

    @staticmethod
    def from_path(
        path: str,
        config: ADQAConfig | None = None,
        tags: list[str] | None = None,
        metadata: dict[str, Any] | None = None,
        historical_profiles: Any | None = None,
    ) -> ADQA:
        """
        Quick-start from a local or remote path.

        Delegates source resolution to ``DataSource.load``; all other
        arguments are forwarded unchanged to the constructor.
        """
        source = DataSource.load(path)
        return ADQA(
            data_source=source,
            config=config,
            tags=tags,
            metadata=metadata,
            historical_profiles=historical_profiles,
        )

    @staticmethod
    def from_df(
        df: pd.DataFrame,
        config: ADQAConfig | None = None,
        tags: list[str] | None = None,
        metadata: dict[str, Any] | None = None,
        historical_profiles: Any | None = None,
    ) -> ADQA:
        """
        Quick-start from an existing pandas DataFrame.

        Wraps the frame in a ``DataSource`` and forwards the remaining
        arguments unchanged to the constructor.
        """
        source = DataSource.from_df(df)
        return ADQA(
            data_source=source,
            config=config,
            tags=tags,
            metadata=metadata,
            historical_profiles=historical_profiles,
        )

    def on_action(self, action_type: str, handler: Any) -> ADQA:
        """
        Register a custom side-effect handler for an action (e.g. BLOCK, WARN).
        Returns self for fluent chaining.
        """
        self._execution_engine.executor.register_handler(action_type, handler)
        return self

    # --------------------------------------------------
    # Public API
    # --------------------------------------------------

    def analyze(
        self, profile_override: Any = None, historical_profiles: Any | None = None
    ) -> ADQAResult:
        """
        Run ADQA analysis.
        Orchestrates full Phase 3 pipeline:
        Ingress -> Profiling -> Detection -> Scoring -> Execution.

        Args:
            profile_override: Pre-computed profiling result; when not None,
                profiling is skipped and this value is used directly.
            historical_profiles: Per-call history for detection; falls back
                to the value supplied at construction.

        Returns:
            ADQAResult with all phase outputs, or a minimal error result
            when the data source cannot be read.
        """
        trace_id, trace_emitter, snapshot = self._initialize_trace_session()

        # NOTE(review): uses `or`, not `is None` — an explicitly passed
        # empty container would also fall back to the constructor default.
        hist = historical_profiles or self._historical_profiles

        with trace_emitter.span("ADQA_ANALYZE", component=TraceComponent.TRACE):
            # ---- Data ingress ----
            trace_emitter.emit(
                TraceEvent(
                    trace_id=trace_id,
                    event_type=TraceEventType.DATA_INGRESS,
                    name="READ_DATA_START",
                )
            )

            try:
                df = self._reader.read()
            except Exception as e:
                # Read failures are reported, not raised: emit an ERROR
                # trace event and return a minimal error result so callers
                # always receive an ADQAResult.
                trace_emitter.emit(
                    TraceEvent(
                        trace_id=trace_id,
                        event_type=TraceEventType.ERROR,
                        name="READ_DATA_ERROR",
                        metadata={"error": str(e)},
                    )
                )
                return ADQAResult(
                    dataframe=None,
                    execution_mode=self._config.execution_mode.value,
                    trace_id=str(trace_id),
                    config_hash=snapshot.hash(),
                    error=str(e),
                )

            trace_emitter.emit(
                TraceEvent(
                    trace_id=trace_id,
                    event_type=TraceEventType.DATA_INGRESS,
                    name="READ_DATA_END",
                    metadata={
                        "rows": len(df),
                        "columns": list(df.columns),
                    },
                )
            )

            # ---- Lineage ----
            self._lineage.record(
                trace_id=trace_id,
                operation="read",
                inputs={
                    # Prefer a structured dump for pydantic-style configs;
                    # fall back to str() for anything else.
                    "source": (
                        self._data_source.config.model_dump(mode="json")
                        if hasattr(self._data_source.config, "model_dump")
                        else str(self._data_source.config)
                    )
                },
                outputs={"dataframe_columns": list(df.columns)},
            )

            # ---- Profiling ----
            # NOTE(review): assigns the private `_tracer` attribute directly —
            # TODO confirm ProfilingEngine exposes a supported setter for this.
            self._profiler._tracer = trace_emitter
            if profile_override is not None:
                profiling_result = profile_override
                trace_emitter.trace("PROFILE_OVERRIDE_APPLIED")
            else:
                profiling_result = self._profiler.run(df)

            # ---- Detection ----
            # NOTE(review): attribute is `tracing` here but `tracer` on the
            # scoring/execution engines — confirm the asymmetry is intentional.
            self._detection_engine.tracing = trace_emitter
            column_profiles_map = {
                p.name: p for p in profiling_result.dataset_profile.columns
            }
            detections = self._detection_engine.run(
                dataset_profile=profiling_result.dataset_profile,
                column_profiles=column_profiles_map,
                ml_profiles=profiling_result.ml_profiles,
                raw_data_sample=df,
                historical_profiles=hist,
            )

            # ---- Scoring ----
            self._scoring_engine.tracer = trace_emitter
            scoring_result = self._scoring_engine.run(detections)

            # ---- Execution ----
            self._execution_engine.tracer = trace_emitter
            execution_result, remediated_df = self._execution_engine.run(
                scoring_result.decision, self._config, df=df
            )

            return ADQAResult(
                # Prefer the remediated frame when execution produced one.
                dataframe=remediated_df if remediated_df is not None else df,
                profiles=profiling_result,
                detections=detections,
                scores=scoring_result.aggregated,
                decision=scoring_result.decision,
                execution_mode=self._config.execution_mode.value,
                actions=execution_result.executed_actions,
                blocked=execution_result.blocked,
                plan=execution_result.plan,
                approval_payload=execution_result.approval_payload,
                trace_id=str(trace_id),
                config_hash=snapshot.hash(),
            )

    def execute_plan(
        self, plan: ActionPlan, df: pd.DataFrame | None = None
    ) -> ADQAResult:
        """
        Execute an ActionPlan that has been approved
        (e.g. from a previous analyze() run).

        Returns an ADQAResult carrying only execution outputs; the
        profiling/detection/scoring fields are left at their defaults.
        """
        # We need a trace session for this standalone execution run
        trace_id, trace_emitter, snapshot = self._initialize_trace_session()

        with trace_emitter.span("ADQA_EXECUTE_PLAN", component=TraceComponent.TRACE):
            self._execution_engine.tracer = trace_emitter
            result, remediated_df = self._execution_engine.execute_plan(plan, df=df)

            return ADQAResult(
                dataframe=remediated_df,
                execution_mode=self._config.execution_mode.value,
                actions=result.executed_actions,
                blocked=result.blocked,
                plan=plan,
                trace_id=str(trace_id),
                config_hash=snapshot.hash(),
            )

    # --------------------------------------------------
    # Internal helpers
    # --------------------------------------------------

    def _initialize_trace_session(
        self,
    ) -> tuple[UUID, NoOpTraceEmitter | TraceEmitter, ConfigSnapshot]:
        """
        Initialize trace context and emit start/snapshot events.

        Returns:
            (trace_id, emitter, config_snapshot) for the new session.
        """
        # Create trace context for this run
        trace_context = TraceContext()
        trace_id = trace_context.trace_id

        # Initialize emitter with this context
        trace_emitter = self._create_emitter(trace_context)

        # Emit trace context initialization. User metadata is merged last,
        # so it can overwrite context-derived keys of the same name.
        trace_metadata: dict[str, Any] = {
            k: to_trace_value(v) for k, v in trace_context.to_dict().items()
        }
        trace_metadata["tags"] = [to_trace_value(t) for t in self._tags]
        trace_metadata.update(self._metadata)

        trace_emitter.emit(
            TraceEvent(
                trace_id=trace_id,
                event_type=TraceEventType.START,
                name="TRACE_CONTEXT_INITIALIZED",
                metadata=trace_metadata,
            )
        )

        # Emit config snapshot (both the full JSON and its hash, so runs
        # can be compared by hash alone)
        snapshot = snapshot_from_config(self._config)

        trace_emitter.emit(
            TraceEvent(
                trace_id=trace_id,
                event_type=TraceEventType.SNAPSHOT,
                name="CONFIG_SNAPSHOT",
                metadata={
                    "config": snapshot.to_json(),
                    "hash": snapshot.hash(),
                },
            )
        )

        return trace_id, trace_emitter, snapshot

    def _create_emitter(self, context: TraceContext) -> NoOpTraceEmitter | TraceEmitter:
        """Build the trace emitter configured for this run (no-op if tracing is off)."""
        if not self._config.tracing_enabled:
            return NoOpTraceEmitter()

        # Any other store type falls through with store=None.
        store: InMemoryTraceStore | JSONTraceStore | None = None
        if self._config.trace_store == TraceStoreType.IN_MEMORY:
            store = InMemoryTraceStore()
        elif self._config.trace_store == TraceStoreType.JSONL:
            # Default to adqa_traces.jsonl since path isn't in config
            store = JSONTraceStore(file_path="adqa_traces.jsonl")

        return TraceEmitter(context=context, store=store, store_traces=True)

    def _init_lineage(self) -> NoOpLineageRecorder | LineageRecorder:
        """Build the lineage recorder (no-op when lineage is disabled in config)."""
        if not self._config.lineage_enabled:
            return NoOpLineageRecorder()

        adapter = InMemoryLineageAdapter()
        recorder = LineageRecorder(adapter=adapter, enabled=True)
        return recorder

analyze(profile_override=None, historical_profiles=None)

Run ADQA analysis. Orchestrates full Phase 3 pipeline: Ingress -> Profiling -> Detection -> Scoring -> Execution.

Source code in src/adqa/core/api.py
def analyze(
    self, profile_override: Any = None, historical_profiles: Any | None = None
) -> ADQAResult:
    """
    Run ADQA analysis.
    Orchestrates full Phase 3 pipeline:
    Ingress -> Profiling -> Detection -> Scoring -> Execution.

    Args:
        profile_override: Pre-computed profiling result; when not None,
            profiling is skipped and this value is used directly.
        historical_profiles: Per-call history for detection; falls back
            to the value supplied at construction.

    Returns:
        ADQAResult with all phase outputs, or a minimal error result
        when the data source cannot be read.
    """
    trace_id, trace_emitter, snapshot = self._initialize_trace_session()

    # NOTE(review): uses `or`, not `is None` — an explicitly passed empty
    # container would also fall back to the constructor default.
    hist = historical_profiles or self._historical_profiles

    with trace_emitter.span("ADQA_ANALYZE", component=TraceComponent.TRACE):
        # ---- Data ingress ----
        trace_emitter.emit(
            TraceEvent(
                trace_id=trace_id,
                event_type=TraceEventType.DATA_INGRESS,
                name="READ_DATA_START",
            )
        )

        try:
            df = self._reader.read()
        except Exception as e:
            # Read failures are reported, not raised: emit an ERROR trace
            # event and return a minimal error result so callers always
            # receive an ADQAResult.
            trace_emitter.emit(
                TraceEvent(
                    trace_id=trace_id,
                    event_type=TraceEventType.ERROR,
                    name="READ_DATA_ERROR",
                    metadata={"error": str(e)},
                )
            )
            return ADQAResult(
                dataframe=None,
                execution_mode=self._config.execution_mode.value,
                trace_id=str(trace_id),
                config_hash=snapshot.hash(),
                error=str(e),
            )

        trace_emitter.emit(
            TraceEvent(
                trace_id=trace_id,
                event_type=TraceEventType.DATA_INGRESS,
                name="READ_DATA_END",
                metadata={
                    "rows": len(df),
                    "columns": list(df.columns),
                },
            )
        )

        # ---- Lineage ----
        self._lineage.record(
            trace_id=trace_id,
            operation="read",
            inputs={
                # Prefer a structured dump for pydantic-style configs;
                # fall back to str() for anything else.
                "source": (
                    self._data_source.config.model_dump(mode="json")
                    if hasattr(self._data_source.config, "model_dump")
                    else str(self._data_source.config)
                )
            },
            outputs={"dataframe_columns": list(df.columns)},
        )

        # ---- Profiling ----
        # NOTE(review): assigns the private `_tracer` attribute directly —
        # TODO confirm ProfilingEngine exposes a supported setter for this.
        self._profiler._tracer = trace_emitter
        if profile_override is not None:
            profiling_result = profile_override
            trace_emitter.trace("PROFILE_OVERRIDE_APPLIED")
        else:
            profiling_result = self._profiler.run(df)

        # ---- Detection ----
        # NOTE(review): attribute is `tracing` here but `tracer` on the
        # scoring/execution engines — confirm the asymmetry is intentional.
        self._detection_engine.tracing = trace_emitter
        column_profiles_map = {
            p.name: p for p in profiling_result.dataset_profile.columns
        }
        detections = self._detection_engine.run(
            dataset_profile=profiling_result.dataset_profile,
            column_profiles=column_profiles_map,
            ml_profiles=profiling_result.ml_profiles,
            raw_data_sample=df,
            historical_profiles=hist,
        )

        # ---- Scoring ----
        self._scoring_engine.tracer = trace_emitter
        scoring_result = self._scoring_engine.run(detections)

        # ---- Execution ----
        self._execution_engine.tracer = trace_emitter
        execution_result, remediated_df = self._execution_engine.run(
            scoring_result.decision, self._config, df=df
        )

        return ADQAResult(
            # Prefer the remediated frame when execution produced one.
            dataframe=remediated_df if remediated_df is not None else df,
            profiles=profiling_result,
            detections=detections,
            scores=scoring_result.aggregated,
            decision=scoring_result.decision,
            execution_mode=self._config.execution_mode.value,
            actions=execution_result.executed_actions,
            blocked=execution_result.blocked,
            plan=execution_result.plan,
            approval_payload=execution_result.approval_payload,
            trace_id=str(trace_id),
            config_hash=snapshot.hash(),
        )

execute_plan(plan, df=None)

Execute an ActionPlan that has been approved (e.g. from a previous analyze() run).

Source code in src/adqa/core/api.py
def execute_plan(
    self, plan: ActionPlan, df: pd.DataFrame | None = None
) -> ADQAResult:
    """
    Run an already-approved ActionPlan (e.g. one produced by a previous
    analyze() call) and return a result carrying the execution outputs.
    """
    # Even a standalone execution run gets its own trace session.
    session_id, emitter, config_snapshot = self._initialize_trace_session()

    with emitter.span("ADQA_EXECUTE_PLAN", component=TraceComponent.TRACE):
        self._execution_engine.tracer = emitter
        exec_result, cleaned_df = self._execution_engine.execute_plan(plan, df=df)

        return ADQAResult(
            dataframe=cleaned_df,
            execution_mode=self._config.execution_mode.value,
            actions=exec_result.executed_actions,
            blocked=exec_result.blocked,
            plan=plan,
            trace_id=str(session_id),
            config_hash=config_snapshot.hash(),
        )

from_df(df, config=None, tags=None, metadata=None, historical_profiles=None) staticmethod

Quick-start from an existing pandas DataFrame.

Source code in src/adqa/core/api.py
@staticmethod
def from_df(
    df: pd.DataFrame,
    config: ADQAConfig | None = None,
    tags: list[str] | None = None,
    metadata: dict[str, Any] | None = None,
    historical_profiles: Any | None = None,
) -> ADQA:
    """
    Build an ADQA instance straight from an in-memory pandas DataFrame.
    """
    return ADQA(
        data_source=DataSource.from_df(df),
        config=config,
        tags=tags,
        metadata=metadata,
        historical_profiles=historical_profiles,
    )

from_path(path, config=None, tags=None, metadata=None, historical_profiles=None) staticmethod

Quick-start from a local or remote path.

Source code in src/adqa/core/api.py
@staticmethod
def from_path(
    path: str,
    config: ADQAConfig | None = None,
    tags: list[str] | None = None,
    metadata: dict[str, Any] | None = None,
    historical_profiles: Any | None = None,
) -> ADQA:
    """
    Build an ADQA instance for a local or remote file path.
    """
    return ADQA(
        data_source=DataSource.load(path),
        config=config,
        tags=tags,
        metadata=metadata,
        historical_profiles=historical_profiles,
    )

on_action(action_type, handler)

Register a custom side-effect handler for an action (e.g. BLOCK, WARN). Returns self for fluent chaining.

Source code in src/adqa/core/api.py
def on_action(self, action_type: str, handler: Any) -> ADQA:
    """
    Attach a custom side-effect handler for an action type (e.g. BLOCK,
    WARN). Returns self so registrations can be chained fluently.
    """
    executor = self._execution_engine.executor
    executor.register_handler(action_type, handler)
    return self

adqa.core.result.ADQAResult dataclass

Public result object returned by ADQA.analyze().

Source code in src/adqa/core/result.py
@dataclass(frozen=True)
class ADQAResult:
    """
    Public result object returned by ADQA.analyze().

    An immutable snapshot of one pipeline run: the (possibly remediated)
    dataframe, the intermediate artifacts of each phase, and trace
    references for auditability. All optional fields default to None so a
    minimal error result can be built with just a few fields populated.
    """

    # Core outputs
    # The analyzed frame (remediated when execution changed it); None on read error.
    dataframe: pd.DataFrame | None

    # Phase 3
    profiles: ProfilingResult | None = None
    detections: DetectionResultBundle | None = None
    scores: AggregatedScore | None = None
    decision: QualityDecision | None = None

    # Execution semantics
    execution_mode: str | None = None
    actions: list[Action] | None = None
    blocked: bool = False

    # Human-in-the-loop
    plan: ActionPlan | None = None
    approval_payload: dict[str, Any] | None = None

    # Trace references (never raw trace data)
    trace_id: str | None = None
    config_hash: str | None = None

    # Error handling
    error: str | None = None

    def summary(self) -> str:
        """
        Generate a human-readable summary of the results.
        """
        if self.error:
            return f"❌ ADQA Error: {self.error}"

        if not self.decision:
            return "⏳ No analysis results available."

        # Any decision other than PASS/WARN renders as a failure mark.
        emoji_map = {"PASS": "✅", "WARN": "⚠️"}
        status_emoji = emoji_map.get(self.decision.decision, "❌")

        lines = [
            f"{status_emoji} ADQA Quality Decision: {self.decision.decision}",
            f"   Score: {self.decision.score:.2f} (Global)",
        ]

        if self.decision.dimension_breakdown:
            # Lowest-scoring dimension is the most actionable detail.
            worst_dim = min(
                self.decision.dimension_breakdown.items(), key=lambda x: x[1]
            )
            lines.append(f"   Worst Dimension: {worst_dim[0]} ({worst_dim[1]:.2f})")

        if self.decision.affected_columns:
            # Show at most three columns, with an ellipsis for the rest.
            cols = ", ".join(self.decision.affected_columns[:3])
            suffix = "..." if len(self.decision.affected_columns) > 3 else ""
            lines.append(f"   Affected Columns: {cols}{suffix}")

        if self.blocked:
            lines.append("   🚫 PIPELINE BLOCKED: critical violations found.")
        elif self.approval_payload:
            lines.append("   ⏳ PENDING APPROVAL: human review required.")

        return "\n".join(lines)

    def to_dict(self) -> dict[str, Any]:
        """
        Convert the result to a JSON-serializable dictionary.

        The dataframe, profiles, scores and approval payload are deliberately
        excluded; nested objects are serialized via their own to_dict().
        """
        return {
            "trace_id": self.trace_id,
            "decision": self.decision.to_dict() if self.decision else None,
            "score": self.decision.score if self.decision else None,
            "blocked": self.blocked,
            "execution_mode": self.execution_mode,
            "error": self.error,
            "config_hash": self.config_hash,
            "detections": self.detections.to_dict() if self.detections else None,
            "plan": self.plan.to_dict() if self.plan else None,
            "actions": [a.to_dict() for a in self.actions] if self.actions else None,
        }

    def save_json(self, file_path: str) -> None:
        """
        Persist the result (via to_dict()) to a JSON file.

        Args:
            file_path: Destination path; the file is created or overwritten.
        """
        import json

        # Explicit UTF-8 plus ensure_ascii=False so non-ASCII content
        # (e.g. in error messages or column names) is written verbatim
        # regardless of the platform's default encoding.
        with open(file_path, "w", encoding="utf-8") as f:
            json.dump(self.to_dict(), f, indent=2, ensure_ascii=False)

save_json(file_path)

Persist the result to a JSON file.

Source code in src/adqa/core/result.py
def save_json(self, file_path: str) -> None:
    """
    Persist the result (via to_dict()) to a JSON file.

    Args:
        file_path: Destination path; the file is created or overwritten.
    """
    import json

    # Explicit UTF-8 plus ensure_ascii=False so non-ASCII content
    # (e.g. in error messages or column names) is written verbatim
    # regardless of the platform's default encoding.
    with open(file_path, "w", encoding="utf-8") as f:
        json.dump(self.to_dict(), f, indent=2, ensure_ascii=False)

summary()

Generate a human-readable summary of the results.

Source code in src/adqa/core/result.py
def summary(self) -> str:
    """
    Build a short human-readable report of this result.
    """
    # Errors and missing decisions short-circuit to one-line messages.
    if self.error:
        return f"❌ ADQA Error: {self.error}"
    if not self.decision:
        return "⏳ No analysis results available."

    verdict = self.decision.decision
    # Anything other than PASS/WARN renders as a failure mark.
    icon = {"PASS": "✅", "WARN": "⚠️"}.get(verdict, "❌")

    parts = [
        f"{icon} ADQA Quality Decision: {verdict}",
        f"   Score: {self.decision.score:.2f} (Global)",
    ]

    breakdown = self.decision.dimension_breakdown
    if breakdown:
        # The lowest-scoring dimension is the most actionable detail.
        dim_name, dim_score = min(breakdown.items(), key=lambda item: item[1])
        parts.append(f"   Worst Dimension: {dim_name} ({dim_score:.2f})")

    affected = self.decision.affected_columns
    if affected:
        # Show at most three columns, with an ellipsis for the rest.
        shown = ", ".join(affected[:3])
        more = "..." if len(affected) > 3 else ""
        parts.append(f"   Affected Columns: {shown}{more}")

    if self.blocked:
        parts.append("   🚫 PIPELINE BLOCKED: critical violations found.")
    elif self.approval_payload:
        parts.append("   ⏳ PENDING APPROVAL: human review required.")

    return "\n".join(parts)

to_dict()

Convert the result to a JSON-serializable dictionary.

Source code in src/adqa/core/result.py
def to_dict(self) -> dict[str, Any]:
    """
    Serialize this result into a plain, JSON-friendly dictionary.

    Nested objects are serialized via their own to_dict(); fields that
    are unset stay None.
    """
    decision = self.decision
    payload: dict[str, Any] = {
        "trace_id": self.trace_id,
        "decision": decision.to_dict() if decision else None,
        "score": decision.score if decision else None,
        "blocked": self.blocked,
        "execution_mode": self.execution_mode,
        "error": self.error,
        "config_hash": self.config_hash,
    }
    payload["detections"] = self.detections.to_dict() if self.detections else None
    payload["plan"] = self.plan.to_dict() if self.plan else None
    payload["actions"] = (
        [action.to_dict() for action in self.actions] if self.actions else None
    )
    return payload