AI-Driven Data Quality: From Rules to Reasoning

AI-Augmented Data Engineering | Article 2 of 7 | tech4nirvana.com

Karthik Darbha is a Data Engineering & AI Leader with over 23 years of experience in Healthcare, Pharma, Retail, Insurance, and Financial Services. He writes at tech4nirvana.com, exploring the intersection of data architecture and timeless wisdom.

Every data engineering team has a version of the same story.

A critical dashboard starts showing numbers that don't feel right. An analyst flags it Friday afternoon. The on-call engineer traces it back — a source table's null rate tripled two days ago. The rule-based data quality tests passed. The pipeline ran green. Nothing fired.

The rules said the data was fine. The data was not fine.

This is the fundamental failure mode of rule-based data quality: it can only catch what someone thought to check for, in advance, expressed as a threshold someone was willing to defend. At the scale and velocity of modern data platforms, that contract breaks constantly — and silently.


1. The Problem — What Rules Cannot See

Rule-based quality systems are built on a foundational assumption: you know what "bad" looks like before it happens. You express that knowledge as constraints: not null, within range, referential integrity, row count within ±10% of yesterday's.

This works well for known failure modes. It is structurally incapable of catching unknown ones.
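
For contrast, here is a minimal sketch of such constraints written by hand in pandas. The column names, the amount band, and the function name are illustrative assumptions rather than any particular framework's API. Each rule encodes a failure someone anticipated. None of them looks at how a column behaves over time.

```python
import pandas as pd

def run_rules(df: pd.DataFrame, prev_row_count: int) -> dict:
    """Evaluate hand-written constraints; each entry is True when the rule passes."""
    return {
        # not null: every order must have an ID
        "order_id_not_null": bool(df["order_id"].notna().all()),
        # within range: each individual amount must sit inside an agreed band
        "amount_in_range": bool(df["amount"].between(0, 5_000_000).all()),
        # row count within ±10% of the previous load
        "row_count_within_10pct": bool(abs(len(df) - prev_row_count) <= 0.10 * prev_row_count),
    }

# Every value is in range, so all rules pass, even though the mean amount
# (1.95M here) may sit far above what this column usually averages
batch = pd.DataFrame({"order_id": [1, 2, 3, 4],
                      "amount": [1.9e6, 2.0e6, 1.8e6, 2.1e6]})
print(run_rules(batch, prev_row_count=4))
```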

Consider what rules miss:

Distribution drift — A revenue column's values quietly shift from a mean of $1.2M to $1.9M over three weeks. Every individual row is within the defined range. The aggregate distribution has fundamentally changed. No threshold fires.

Seasonal baseline collapse — A weekly batch table that normally lands with 2.1M rows on Monday arrives with 1.9M. That is a drop of about 9.5%, so it falls inside the ±10% rule. Was last Monday a holiday in the source system? Is 1.9M the new normal, or the first sign of lost records? The rule cannot tell the difference, and once the baseline has shifted, the threshold no longer means anything.

Correlated column anomalies — A transaction table where amount and discount_pct move in unexpected correlation — not individually out of range, but together, statistically improbable. Rule sets do not model joint distributions.

Silent schema migrations — A source system begins zero-padding a product code column. String lengths change. Values are still non-null and string-typed. Every rule passes. Downstream joins silently fail.
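
The zero-padding case is easy to reproduce. The following is a minimal pandas sketch with made-up tables and codes. The rule-style checks pass and the inner join quietly returns nothing. Only a profile statistic, the mean string length, reveals what changed.

```python
import pandas as pd

# Dimension table keyed on the original, unpadded product codes
products = pd.DataFrame({"product_code": ["123", "456"], "product_name": ["Widget", "Gadget"]})

# The source system starts zero-padding codes to six characters
sales = pd.DataFrame({"product_code": ["000123", "000456"], "qty": [5, 7]})

# Rule-style checks still pass: no nulls, still strings
assert sales["product_code"].notna().all()
assert sales["product_code"].map(type).eq(str).all()

# ...but the inner join now matches nothing, and the pipeline stays green
joined = sales.merge(products, on="product_code", how="inner")
print(len(joined))  # 0

# A simple profile of string lengths does expose the change
print(sales["product_code"].str.len().mean(), products["product_code"].str.len().mean())  # 6.0 3.0
```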

In Advaita, tamas describes the quality of inertia, opacity, and entropy — the state in which things lose their clarity and coherence without the obvious appearance of breaking. Rule-based data quality operates in a tamasic mode: it responds only when something crosses an explicit threshold, while the underlying signal degrades unremarked.

The math of rule maintenance compounds this. A platform with 500 Delta tables, each averaging 40 monitored columns, needs 20,000 rules even at just one rule per column (500 × 40). Each of those rules must be authored, calibrated, and updated when distributions legitimately shift. Most teams maintain a fraction of that, concentrated on tables someone already knows are important.

The rest of the estate runs on trust.


2. The AI Opportunity — What's Now Tractable

The shift AI enables is from explicit constraint to learned expectation. Instead of asking "does this value violate a rule I wrote?", the system asks "does this value behave the way this column historically has?"

That reframe opens up several capabilities that were not practically achievable before:

Statistical anomaly detection per column — Models that learn the distribution of each column over time (mean, variance, skew, null rate, value frequency) and flag statistically significant deviations. No per-column threshold authoring is required. You tune the detector's sensitivity instead of hand-writing limits for every column.

Multivariate anomaly detection — Models such as Isolation Forest or autoencoders that score all columns together rather than one at a time. They flag rows that are individually plausible but collectively improbable. This catches correlated anomalies that column-level rules miss entirely.

Adaptive baselines — Instead of a static ±10% row count rule, a time-series model learns day-of-week, week-of-month, and seasonal patterns. It judges a Monday arrival against the distribution of past Monday arrivals. Given a holiday calendar and enough history, it can also account for holidays, promotions, and growth trends.

Automated profiling at catalog scale — Profile generation (column statistics, value distributions, null rates, cardinality) can run as a background Databricks job across your entire Delta Lake, feeding both anomaly models and Unity Catalog enrichment.

Confidence-aware alerting — Instead of binary pass/fail, ML-based systems can emit anomaly scores. Downstream logic can tier the response: high-confidence anomaly triggers quarantine; medium-confidence triggers review; low-confidence logs for trend analysis.
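
The adaptive-baseline idea can be illustrated with something much simpler than a full time-series model: compare today's value with earlier loads that landed on the same weekday. The sketch below is that simplification, a same-weekday z-score in plain Python. The function name, the four-point minimum, and the numbers are illustrative assumptions. It does not model holidays or growth trends.

```python
from __future__ import annotations

import math
import statistics
from datetime import date, timedelta

def weekday_zscore(history: dict[date, int], today: date, todays_count: int,
                   min_points: int = 4) -> float | None:
    """
    z-score of today's row count against earlier loads on the same weekday.
    Returns None when there is not yet enough same-weekday history to judge.
    """
    same_day = [n for d, n in history.items()
                if d.weekday() == today.weekday() and d < today]
    if len(same_day) < min_points:
        return None
    mu = statistics.mean(same_day)
    sigma = statistics.stdev(same_day)
    if sigma == 0:
        # perfectly stable history: any change at all is maximally surprising
        return 0.0 if todays_count == mu else math.copysign(math.inf, todays_count - mu)
    return (todays_count - mu) / sigma

# Six Mondays of history (2024-01-01 was a Monday) plus one Sunday that is ignored
mondays = [date(2024, 1, 1) + timedelta(weeks=i) for i in range(6)]
history = dict(zip(mondays, [2_100_000, 2_080_000, 2_120_000, 2_090_000, 2_110_000, 2_100_000]))
history[date(2024, 2, 11)] = 400_000  # a Sunday load, excluded from the Monday baseline

z = weekday_zscore(history, today=date(2024, 2, 12), todays_count=1_900_000)
print(round(z, 1))  # -14.1: far outside normal Monday variation
```

A 1.9M Monday would pass a ±10% rule. Against the Monday baseline, though, it sits about 14 standard deviations low.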

The practical entry point for most Azure/Databricks teams is a two-layer architecture: keep your existing rule-based tests (they catch hard constraints cheaply), and add an ML anomaly detection layer that operates on distributions and scores deviations. The rules handle known failure modes. The models handle the unknown.
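
The two-layer idea, including the tiered response described above, reduces to a small decision function. This is a sketch in plain Python. It assumes rule results arrive as a dict of pass/fail flags and that the anomaly score follows scikit-learn's Isolation Forest convention, where decision_function is lower for more anomalous inputs and below zero for outliers. The cut-offs reuse the severity thresholds from the Step 3 scorer, and the action names are hypothetical.

```python
from __future__ import annotations

def quality_decision(rules_passed: dict[str, bool], anomaly_score: float | None,
                     high: float = -0.15, medium: float = -0.05) -> str:
    """Layer 1 applies hard rules; layer 2 tiers the ML anomaly score."""
    if not all(rules_passed.values()):
        return "QUARANTINE"   # a hard constraint failed: no model needed
    if anomaly_score is None:
        return "PASS"         # no model yet (e.g. cold start): rules alone decide
    if anomaly_score < high:
        return "QUARANTINE"   # high-confidence anomaly
    if anomaly_score < medium:
        return "REVIEW"       # medium confidence: a human looks at it
    if anomaly_score < 0:
        return "LOG"          # low confidence: keep for trend analysis
    return "PASS"

print(quality_decision({"order_id_not_null": True}, -0.08))  # REVIEW
```

Hard-rule failures short-circuit before the model is consulted, which keeps the cheap layer cheap. In line with the gated-automation advice later in this article, you might map QUARANTINE to REVIEW until the model has a track record.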


3. Implementation Sketch — Databricks + MLflow Anomaly Scorer on Delta

The following is a working implementation pattern, not production-ready code. It demonstrates the architecture decisions; your team will adapt thresholds, scheduling, and alerting to your context.

Architecture

ADF Pipeline Trigger
      │
      ▼
Databricks Job: Profile & Score
      │
      ├── Step 1: Column Profiling (PySpark → Delta stats table)
      ├── Step 2: Feature Engineering (rolling stats, drift metrics)
      ├── Step 3: MLflow Model Inference (Isolation Forest per table)
      └── Step 4: Write anomaly scores → Delta quality log table
                        │
                        ├── High-score rows → Quarantine path
                        └── Alerts → Teams/PagerDuty via ADF webhook

Step 1 — Profile a Delta Table

```python
from pyspark.sql import DataFrame
from pyspark.sql import functions as F

NUMERIC_TYPES = ("double", "float", "long", "integer", "short", "decimal")

def profile_delta_table(df: DataFrame, table_name: str, run_ts: str) -> DataFrame:
    """
    Compute per-column statistics for anomaly feature generation.
    Returns a single-row DataFrame of profile metrics.
    """
    numeric_cols = [f.name for f in df.schema.fields
                    if f.dataType.typeName() in NUMERIC_TYPES]

    # Build every aggregate first so Spark profiles the table in a single pass,
    # instead of launching one job per column
    aggs = [F.count(F.lit(1)).alias("row_count")]
    for col in numeric_cols:
        c = F.col(f"`{col}`")  # backticks tolerate spaces or dots in column names
        aggs += [
            F.mean(c).cast("double").alias(f"{col}_mean"),
            F.stddev(c).cast("double").alias(f"{col}_stddev"),
            F.expr(f"percentile(`{col}`, 0.25)").cast("double").alias(f"{col}_p25"),
            F.expr(f"percentile(`{col}`, 0.75)").cast("double").alias(f"{col}_p75"),
            # mean of a 0/1 null indicator is the null rate (null, not an error, on empty input)
            F.avg(c.isNull().cast("double")).alias(f"{col}_null_rate"),
        ]

    # Aggregating directly (rather than spark.createDataFrame on a dict) keeps
    # explicit column types even when a statistic comes back null
    return (df.agg(*aggs)
              .withColumn("table_name", F.lit(table_name))
              .withColumn("run_ts", F.lit(run_ts)))
```

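Production note: On multi-terabyte Delta tables, exact percentile() is expensive. It has to track every distinct value of the column during aggregation, which drives up memory use and shuffle volume. For large-scale deployments, use the per-file statistics that Delta Lake already records in its transaction log (numRecords, plus per-column minValues, maxValues and nullCount) for row counts, min/max and null counts. For distribution estimates, use percentile_approx(), which estimates quantiles from a compact sketch instead of the full value set. This typically cuts compute cost substantially, and for anomaly features an approximate quantile is usually accurate enough.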
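
The transaction-log route needs no table scan. Each commit under a table's _delta_log directory is a newline-delimited JSON file. Every add action can carry a stats field: a JSON string with numRecords, minValues, maxValues and nullCount, by default for the first 32 columns. The plain-Python sketch below replays those actions to total row and null counts over the live files. It is a simplification under stated assumptions. It ignores checkpoint Parquet files, which a real reader must start from once the log has been checkpointed. It also ignores deletion vectors, so numRecords counts physical rows.

```python
from __future__ import annotations

import json
from collections import defaultdict

def stats_from_delta_log(commit_lines: list[str]) -> dict:
    """
    Replay Delta commit actions (oldest first) and total numRecords and per-column
    nullCount across data files that are still live. Assumes no checkpoints and
    no deletion vectors; only top-level columns are counted.
    """
    live: dict[str, dict | None] = {}  # data file path -> parsed stats (None if absent)
    for line in commit_lines:
        if not line.strip():
            continue
        action = json.loads(line)
        if "add" in action:
            raw = action["add"].get("stats")
            live[action["add"]["path"]] = json.loads(raw) if raw else None
        elif "remove" in action:
            live.pop(action["remove"]["path"], None)
        # commitInfo, metaData, protocol and other actions carry no row stats

    totals = {"numRecords": 0, "nullCount": defaultdict(int), "files_without_stats": 0}
    for stats in live.values():
        if stats is None:
            totals["files_without_stats"] += 1
            continue
        totals["numRecords"] += stats.get("numRecords", 0)
        for col, n in stats.get("nullCount", {}).items():
            if isinstance(n, int):  # nested struct columns hold a dict instead
                totals["nullCount"][col] += n
    return totals
```

To feed it, read the commit files in version order (they are named by zero-padded version number) and pass all their lines in sequence. Dividing a column's nullCount by numRecords then gives its null rate.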

Step 2 — Train and Register an Isolation Forest Model

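```python
import mlflow
import mlflow.sklearn
import pandas as pd
from mlflow.models import infer_signature
from sklearn.ensemble import IsolationForest
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline

# Identifiers, not signals. row_count stays in: volume changes are a key anomaly signal
NON_FEATURE_COLS = ["table_name", "run_ts"]

def train_anomaly_model(profile_history_df: pd.DataFrame, table_name: str,
                        contamination: float = 0.05, n_estimators: int = 100):
    """
    Train an Isolation Forest on historical profile snapshots.
    Log model to MLflow with table-scoped experiment.
    """
    feature_df = profile_history_df.drop(columns=NON_FEATURE_COLS)

    # Bundle imputation with the model so scoring fills gaps with the *training*
    # medians rather than an ad-hoc value (keep_empty_features needs scikit-learn >= 1.2)
    model = Pipeline([
        ("impute", SimpleImputer(strategy="median", keep_empty_features=True)),
        ("iforest", IsolationForest(
            n_estimators=n_estimators,
            contamination=contamination,  # assume ~5% of historical snapshots were anomalous
            random_state=42,
        )),
    ])
    model.fit(feature_df)

    mlflow.set_experiment(f"/data_quality/{table_name}")
    with mlflow.start_run(run_name="isolation_forest_v1"):
        mlflow.log_param("contamination", contamination)
        mlflow.log_param("n_estimators", n_estimators)
        mlflow.sklearn.log_model(
            model, artifact_path="model",
            # a signature is required by the Unity Catalog model registry, which
            # also expects a three-level catalog.schema.model name
            signature=infer_signature(feature_df, model.predict(feature_df)),
            registered_model_name=f"dq_anomaly_{table_name}",
        )
    return model
```

Step 3 — Score a New Batch Profile

```python
def score_profile(current_profile: pd.DataFrame, model) -> dict:
    """
    Score the current run's profile against the trained model.
    Returns anomaly flag and raw score.
    """
    # Use exactly the training features, in training order. A column that has
    # disappeared becomes NaN and is imputed; a new column is ignored until retraining
    feature_df = current_profile.reindex(columns=list(model.feature_names_in_))

    score = model.decision_function(feature_df)[0]   # higher = more normal, < 0 = anomalous
    is_anomaly = model.predict(feature_df)[0] == -1  # -1 = anomaly

    return {
        "anomaly_score": float(score),
        "is_anomaly": bool(is_anomaly),
        "severity": "HIGH" if score < -0.15 else "MEDIUM" if score < -0.05 else "LOW",
    }
```

When is_anomaly is True, surface the contributing columns using SHAP before routing the alert:

```python
import shap

def explain_anomaly(feature_df: pd.DataFrame, model, top_n: int = 3) -> list:
    """
    Use SHAP to identify the top contributing columns to an anomaly score.
    Returns a ranked list of (column, shap_value) tuples for alert enrichment.
    """
    # TreeExplainer needs the bare Isolation Forest, so apply the imputation step first
    X = model[:-1].transform(feature_df)  # same column order as feature_df
    explainer = shap.TreeExplainer(model[-1])
    shap_values = explainer.shap_values(X)

    # shap_values shape: (n_samples, n_features); rank by absolute contribution
    feature_importance = sorted(
        zip(feature_df.columns, abs(shap_values[0])),
        key=lambda x: x[1],
        reverse=True
    )
    return feature_importance[:top_n]

# Usage: enrich the alert payload (current_profile and model as in Step 3)
feature_df = current_profile.reindex(columns=list(model.feature_names_in_))
score_result = score_profile(current_profile, model)
if score_result["is_anomaly"]:
    top_contributors = explain_anomaly(feature_df, model)
    score_result["top_contributors"] = [
        {"column": col, "shap_value": round(float(val), 4)}
        for col, val in top_contributors
    ]
```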

This transforms the alert payload from "transaction_fact looks 12% anomalous" into "transaction_fact anomaly — top drivers: amount_mean (0.34), discount_pct_null_rate (0.21), row_count (0.18)" — an actionable engineering ticket rather than an abstract flag.

Step 4 — Write Results to Delta Quality Log

```python
# table_name, run_ts and score_result come from Steps 1-3; model_version is the
# registry version of the model used for scoring. `spark` is the SparkSession
# that Databricks notebooks and jobs provide.
result = {
    "table_name":     table_name,
    "run_ts":         run_ts,
    "anomaly_score":  score_result["anomaly_score"],
    "is_anomaly":     score_result["is_anomaly"],
    "severity":       score_result["severity"],
    "model_version":  str(model_version)
}

spark.createDataFrame([result]).write \
    .format("delta") \
    .mode("append") \
    .saveAsTable("catalog.data_quality.anomaly_log")
```

Scheduling and Alerting

Trigger this job from ADF immediately after each pipeline run completes. Add a downstream ADF activity that reads the anomaly log, filters is_anomaly = true AND severity = 'HIGH', and calls a webhook to route alerts to Teams or PagerDuty. Medium-severity alerts can land in a Slack channel for async review.
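
If you prefer to do this routing inside the Databricks job rather than in an ADF activity, the filtering logic is small. The following is a minimal sketch in plain Python. It assumes log rows shaped like the Step 4 result dictionary. The destination names are hypothetical, and the actual webhook POST is left out.

```python
from collections import defaultdict

# Hypothetical routing table: severity -> destination. LOW is logged, not sent.
ROUTES = {"HIGH": "pagerduty", "MEDIUM": "slack-dq-review"}

def route_alerts(log_rows: list) -> dict:
    """Group anomalous log rows by destination, formatted as short alert lines."""
    routed = defaultdict(list)
    for row in log_rows:
        if not row["is_anomaly"]:
            continue
        dest = ROUTES.get(row["severity"])
        if dest:
            routed[dest].append(f"{row['table_name']} score={row['anomaly_score']:.3f}")
    return dict(routed)
```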

Unity Catalog integration: surface the latest anomaly score as a custom metadata property on each table, so catalog users see quality status inline alongside lineage and descriptions.
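
One way to do this, sketched under assumptions, is to write the latest score into Delta table properties with ALTER TABLE ... SET TBLPROPERTIES. Those properties are readable with SHOW TBLPROPERTIES and appear with the table's details. The property keys below are hypothetical. Each such ALTER TABLE is a metadata commit that adds a table version, so for tables scored many times a day, Unity Catalog tags may be a lighter alternative.

```python
def anomaly_property_sql(table: str, score: float, severity: str, run_ts: str) -> str:
    """Build an ALTER TABLE statement recording the latest quality status as table properties.

    Table names are assumed to come from your own catalog, not user input, and the
    values are assumed to contain no single quotes.
    """
    return (
        f"ALTER TABLE {table} SET TBLPROPERTIES ("
        f"'dq.latest_anomaly_score' = '{score:.4f}', "
        f"'dq.latest_severity' = '{severity}', "
        f"'dq.latest_run_ts' = '{run_ts}')"
    )

# On Databricks, for example:
# spark.sql(anomaly_property_sql("catalog.sales.transaction_fact", -0.12, "MEDIUM", "2024-06-01T02:00:00Z"))
```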


4. Limitations & Risks

Cold start problem. As a rough guide, Isolation Forest needs 30–60 historical profile snapshots before its baseline means much. On new tables, the model will either surface noise or miss genuine anomalies. Rule-based tests are the better tool here until enough history accumulates.
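
The cold-start rule can be made explicit so the pipeline never relies on a model built from too little history. This is a minimal sketch. MIN_SNAPSHOTS takes the lower end of the range above as an assumed default, and the layer names are illustrative.

```python
MIN_SNAPSHOTS = 30  # assumed default: lower end of the 30-60 range; tune per table

def choose_checks(n_snapshots: int, has_model: bool) -> list:
    """Return the quality layers that can run for a table given its profile history."""
    checks = ["rules"]                      # hard constraints work from day one
    if has_model and n_snapshots >= MIN_SNAPSHOTS:
        checks.append("anomaly_model")      # enough history for a meaningful baseline
    return checks
```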

Model drift. Data distributions legitimately evolve — business launches, seasonal cycles, source system migrations. An anomaly model trained on six-month-old history will generate false positives against a platform that has legitimately changed. Models need periodic retraining cadences and triggered resets on known change events.

Explainability gap. On its own, an Isolation Forest flags "this profile looks anomalous" without saying which column drove the score. The SHAP enrichment in Step 3 closes part of that gap by naming the top contributing features. SHAP attributions explain the model, not the data, however. Engineers still need to compare the flagged column against its historical baseline and capture the findings in alert payloads and runbooks for full coverage.

Wide table dimensionality. Isolation Forest degrades on tables with hundreds of columns. The Step 1 profiler produces five metrics per numeric column, so a table with 200 numeric columns yields about 1,000 features, often against only a few dozen historical snapshots. That is a textbook case of the curse of dimensionality. For such tables, apply PCA as a preprocessing step to reduce the feature space before training, or consider an autoencoder-based anomaly detector, which can cope better with high-dimensional feature spaces.
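
The following is a minimal scikit-learn sketch of the PCA option. It assumes profile metrics have already been imputed, because StandardScaler and PCA do not accept NaN. The 95% explained-variance cut-off and the synthetic data are illustrative choices, not tuned values.

```python
import numpy as np
from sklearn.decomposition import PCA
from sklearn.ensemble import IsolationForest
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler

def build_wide_table_detector(contamination: float = 0.05):
    """Scale, compress with PCA, then run Isolation Forest on the reduced features."""
    return make_pipeline(
        StandardScaler(),                          # PCA is scale-sensitive
        PCA(n_components=0.95, svd_solver="full"), # keep components explaining 95% of variance
        IsolationForest(n_estimators=100, contamination=contamination, random_state=42),
    )

rng = np.random.default_rng(0)
history = rng.normal(size=(90, 400))   # 90 daily snapshots x 400 profile metrics (synthetic)
detector = build_wide_table_detector().fit(history)
print(detector.named_steps["pca"].n_components_, "components kept out of", history.shape[1])
```

With fewer snapshots than features, PCA can keep at most as many components as there are snapshots. This is one more reason to accumulate history before training on wide tables.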

False positives at scale. A contamination of 0.05 sets the decision threshold so that about 5% of training snapshots score as anomalous. Applied across 500 tables on a daily batch, that means roughly 25 anomaly alerts per run (500 × 0.05) under normal operation, before any real issue occurs. Without severity tiering and alert routing discipline, this recreates the alert fatigue problem you were trying to solve.

PII and metadata exposure. Sending column-level statistics (even aggregated ones) to external LLM APIs for enrichment or RCA narration can expose sensitive schema topology. Keep profile data and model training within your Azure tenant. Databricks-managed MLflow keeps model artifacts within your workspace.

Sattva — the quality of clarity, purity, and coherent signal — is not the default state of a data platform. It is achieved through continuous attention. An ML anomaly system that drifts, hallucinates confidence, or floods engineers with noise does not produce sattva. It produces a more sophisticated form of tamas: entropy wrapped in the appearance of intelligence.


5. How to Overcome

Hybrid layer: rules + ML. Do not decommission rule-based tests. Let them handle hard constraints (not null, referential integrity, known format rules) cheaply. The ML layer handles distribution-level anomalies. Two layers, two purposes, different cost profiles.

Gated automation. Start with anomaly scoring and human review. Do not auto-quarantine rows until your model has a track record. Move to automated quarantine only after validating precision on your specific tables over 4–6 weeks. Trust must be earned incrementally.

Column importance ranking. Not all columns are equal. Run SHAP on a sample of historical anomaly events to identify which columns contribute most to anomaly scores. Prioritize those for explainability tooling and manual review.
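
Aggregating per-event SHAP values into a column ranking takes only a few lines. This minimal sketch assumes you have already stacked the SHAP rows for past anomaly events into one matrix, for example from explainer.shap_values on each flagged profile. The function name and inputs are illustrative.

```python
from __future__ import annotations

import numpy as np

def rank_columns(shap_matrix: np.ndarray, columns: list[str],
                 top_n: int = 5) -> list[tuple[str, float]]:
    """
    Rank feature columns by mean absolute SHAP value across historical anomaly events.
    shap_matrix has shape (n_events, n_features), one row per flagged profile.
    """
    mean_abs = np.abs(shap_matrix).mean(axis=0)
    order = np.argsort(mean_abs)[::-1][:top_n]   # largest contribution first
    return [(columns[i], float(mean_abs[i])) for i in order]
```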

Retraining triggers. Define retraining events: a known source system migration, a business rule change, a data model restructuring. Add these as pipeline parameters that trigger automatic model retraining and version registration in MLflow. Don't wait for scheduled retraining cycles when you know the baseline has changed.
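
Here is a sketch of the decision the retraining trigger encodes, in plain Python. The change-event list stands in for whatever pipeline parameters you define, and the 30-day schedule is an assumed default, not a recommendation from this article. When the function returns True, you would rerun a training step such as train_anomaly_model from Step 2, which registers a new model version in MLflow.

```python
from __future__ import annotations

from datetime import datetime, timedelta

def should_retrain(last_trained: datetime, now: datetime, change_events: list[str],
                   max_age: timedelta = timedelta(days=30)) -> tuple[bool, str]:
    """Retrain immediately on any declared change event, otherwise on a schedule."""
    if change_events:
        return True, "change event: " + ", ".join(change_events)
    if now - last_trained >= max_age:
        return True, "scheduled: model older than max_age"
    return False, "baseline still current"
```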

Model performance tracking. Log confirmed anomalies (true positives) and false positives into a feedback table. Track precision and recall over time using MLflow experiment comparison. Treat model degradation as an incident, not a background observation.
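
Precision and recall fall straight out of a feedback table. This minimal pandas sketch assumes one row per scored run with two hypothetical boolean columns: alerted (the model flagged the run) and confirmed (an engineer verified a real data issue). Recall is only as trustworthy as your record of issues the model missed, such as incidents reported by analysts.

```python
import pandas as pd

def alert_precision_recall(feedback: pd.DataFrame) -> dict:
    """Compute precision and recall of anomaly alerts from labelled feedback rows."""
    alerted = feedback["alerted"].astype(bool)
    confirmed = feedback["confirmed"].astype(bool)
    tp = int((alerted & confirmed).sum())    # real issues the model caught
    fp = int((alerted & ~confirmed).sum())   # alerts that turned out to be noise
    fn = int((~alerted & confirmed).sum())   # real issues the model missed
    precision = tp / (tp + fp) if tp + fp else float("nan")
    recall = tp / (tp + fn) if tp + fn else float("nan")
    return {"precision": precision, "recall": recall, "tp": tp, "fp": fp, "fn": fn}
```

The resulting numbers can be logged with mlflow.log_metrics on each evaluation run, so MLflow's run comparison shows whether the model is degrading.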

Phased rollout. Week 1–2: profile top 20 critical tables, establish baselines. Week 3–4: run models in shadow mode (score but don't alert). Week 5–6: enable alerting on HIGH severity only. Month 2: expand to broader estate.


6. The Takeaway

For engineers: This week, pick one high-value Delta table your team cares about and run a basic profiling job on the last 30 days of data. Get the column distributions on paper — mean, stddev, null rate, row counts by day. You cannot detect anomalies without a baseline. The baseline is the work that makes everything else possible.

For leads: Review your current data quality SLAs and ask: are they defined in terms of rule coverage (we have X assertions), or outcome quality (our analysts trust the numbers)? The shift from rules to reasoning is not primarily a tooling decision. It is a quality definition decision. Redefine quality as business confidence, not test passage rate — and the right tooling follows.


AI doesn't eliminate engineering judgment — it demands better judgment, faster.


Next: Article 3 — Observability That Thinks: AI for Pipeline Monitoring. AI-assisted classification of volume drift, freshness failures, and schema anomalies — with Databricks and Azure Monitor.


AI-AUGMENTED DATA ENGINEERING


Modern data platforms have outpaced our ability to see them clearly. This series explores how AI restores that clarity — across data quality, observability, root cause analysis, metadata intelligence, and cost optimisation. Written for engineers who build and leads who decide, each article pairs implementation depth with an honest reckoning of risk. The lens is Advaita: viveka — discriminative discernment — applied to the hardest problems in data engineering.
