Metadata-Version: 2.4
Name: databricks-dq-framework
Version: 1.0.12
Summary: Metadata-driven Data Quality Assessment Framework for Databricks / Delta Lake
Author: Nitin Mathew George
License-Expression: MIT
Keywords: databricks,delta-lake,data-quality,pyspark,dq-framework
Classifier: Programming Language :: Python :: 3
Classifier: Operating System :: OS Independent
Requires-Python: >=3.9
Description-Content-Type: text/markdown
Requires-Dist: python-dateutil>=2.8
Requires-Dist: pandas>=2.0
Provides-Extra: standalone
Requires-Dist: pyspark>=3.3; extra == "standalone"
Requires-Dist: delta-spark>=2.0; extra == "standalone"
Provides-Extra: dev
Requires-Dist: pytest>=7; extra == "dev"
Requires-Dist: pytest-cov; extra == "dev"

# Data Quality Assessment Framework — Databricks / Delta Lake

**Open-source Python library** reimplementing the SQL Server DQ Assessment
Framework natively on Databricks with Delta tables, PySpark DataFrames, and
Python-based validation functions.

---

## What this Library Provides

A **metadata-driven, PySpark port of a SQL Server Data Quality Assessment Framework**
for Databricks/Delta Lake. It validates data in curated Delta tables using configurable
rules — no code changes needed to add new fields or update logic.

| Feature | SQL Server Implementation | Databricks Implementation |
|---|---|---|
| Storage | SQL Server tables | Delta Lake tables (Unity Catalog) |
| Pattern evaluation | T-SQL TVFs (generated dynamically) | Python closures (generated dynamically) |
| Row-level evaluation | `CROSS APPLY TVF` | PySpark UDF + `DeltaTable.merge()` |
| Custom rules | SQL expressions in `configCustomQuery` | Plain regex, Spark SQL expressions, or named validators |
| Email validation | Complex `LIKE` / `CHARINDEX` SQL | Native Python regex (enhanced) |
| Output | `SELECT` result sets | Spark DataFrames |
| Same field configuration | Script_02 SQL `INSERT` | `ConfigManager` Python API or raw SQL |
| Same pattern library | 118 patterns | 118 patterns (identical) |
| Same precedence logic | L0A `RANK()` CTE | Python ranking equivalent |
| Same validation levels | L01→L02→L03→L04→L99 | L01→L02→L03→L04→L99 (identical) |
| Same DQ columns | `DQEligible BIT`, `DQViolations`, `DQFields` | `DQEligible BOOLEAN`, `DQViolations STRING`, `DQFields STRING` |
| Same dual logging | `auditDQChecks` + `statDQChecks` | Same Delta tables |
| Same ExecutionID | `NEWID()` GUID | `uuid.uuid4()` |
| Same `1=1` scope filtering | `WHERE 1=1 AND (...)` | Python conditional filter |
| Same sticky DQ logic | `CASE WHEN DQEligible=0 THEN 0 ELSE Result END` | Spark `when()` equivalent |

---

## Architecture at a Glance

```
masterField (config) ──► configFieldAllowedPattern ──► mapDQChecks
        │                          │
  ConfigManager API      resolve_pattern_rules.py (L0A precedence)
        │                          │
        └──────────────────────────┘
                        │
          generate_rule_functions.py
                        │
              FunctionRegistry (UDFs)
                        │
          data_assessment_rules.py (DQRunner)
                        │
   Curated Table: DQEligible | DQViolations | DQFields
                        │
        ┌───────────────┴───────────────┐
   auditDQChecks               statDQChecks
        └───────────────┬───────────────┘
               Reporting Views
          (v_auditDQChecks, v_statDQChecks)
```

**Data flow:** Config tables → precedence resolution → Python closure generation →
UDF-based row assessment → DQ column updates + dual audit logging → reporting.

---

## Quick Start

```python
from dq_framework import DQFramework

# 1. Initialise (spark is the SparkSession that Databricks notebooks provide)
dq = DQFramework(spark, catalog="main", schema="dq")
dq.setup()

# 2. Configure a field (ConfigManager API — no SQL required)
cfg = dq.config
cfg.register_field(1, "Source.SRC_ContactPoint.EMAIL_ADDRESS", data_category_type_id=4)
cfg.set_field_values(1, "Source.SRC_ContactPoint.EMAIL_ADDRESS", min_data_length=6, max_data_length=255)
cfg.block_category(1, "Source.SRC_ContactPoint.EMAIL_ADDRESS", "DataEmptiness")
cfg.allow_pattern(2, "Source.SRC_ContactPoint.EMAIL_ADDRESS", "Has At Sign")
cfg.add_custom_query_regex(1, "Source.SRC_ContactPoint.EMAIL_ADDRESS",
                           r"^[^@\s]+@[^@\s]+\.[^@\s]+$", must_match=True,
                           description="Basic email format")  # always pass a description (Known Limitations #1)
cfg.add_mapping(1, "Source.SRC_ContactPoint.EMAIL_ADDRESS",
                target_schema_name="Curated", target_table_name="Entity_Email_Denorm",
                target_field_name="EMAIL_ADDRESS", target_catalog_name="main")

# 3. Validate config FK integrity before generating functions
cfg.verify_config()

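# 4. Generate field-level checker functions from config, then add the DQ
#    columns to every curated table referenced in mapDQChecks (idempotent)
dq.generate_rule_functions()
dq.prepare_curated_tables()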

# 5. Run the DQ assessment
exec_id = dq.run_assessment(schema_name="Curated")

# 6. Query results
dq.violations(exec_id).display()
dq.quality_scores(exec_id).display()
dq.fields_below_threshold(threshold=80).display()
dq.summary_by_table(exec_id).display()
```

See [notebooks/00_quickstart.py](notebooks/00_quickstart.py) for a fully commented step-by-step walkthrough.

---

## Built-in Usage Guide

Run `dq.guide()` at any time in a notebook to print the full workflow, FullFieldName patterns, all `ConfigManager` methods, and validation levels in one readable block:

```python
dq.guide()
```

---

## Prerequisite: DQ Columns on Curated Tables

Each curated Delta table being assessed must have three extra columns.
Use the built-in helper — it reads `mapDQChecks` and adds them to every mapped table at once:

```python
dq.prepare_curated_tables()                  # all tables referenced in mapDQChecks
dq.prepare_curated_table("Schema", "Table")  # or a single table
```

Both methods are **idempotent** (safe to re-run) and require **ALTER privilege** on each target table.
The assessment runner will raise a clear error if columns are missing, pointing to these methods.
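Idempotency here means that only missing columns get added, so a second run changes nothing. The helper below is a hypothetical sketch of that check (`build_add_dq_columns_sql` is not part of `dq_framework`). It assumes Spark's default case-insensitive column resolution and only builds the `ALTER TABLE ... ADD COLUMNS` statement rather than executing it.

```python
# Hypothetical helper, not part of dq_framework: builds the ALTER statement
# needed to add only the DQ columns a table does not already have.
DQ_COLUMNS = {
    "DQEligible": "BOOLEAN",
    "DQViolations": "STRING",
    "DQFields": "STRING",
}


def build_add_dq_columns_sql(full_table_name, existing_columns):
    """Return an ALTER TABLE statement, or None if nothing is missing."""
    # Spark resolves column names case-insensitively by default, so compare lowercase.
    existing = {c.lower() for c in existing_columns}
    missing = [f"{name} {dtype}" for name, dtype in DQ_COLUMNS.items()
               if name.lower() not in existing]
    if not missing:
        return None  # already prepared: re-running is a no-op
    return f"ALTER TABLE {full_table_name} ADD COLUMNS ({', '.join(missing)})"
```

On a cluster, `existing_columns` could come from `spark.table(name).columns`, and a non-`None` result could be passed to `spark.sql()`.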

The three columns added are:

```sql
DQEligible   BOOLEAN  -- TRUE = all checks passed, FALSE = at least one failed, NULL = not assessed
DQViolations STRING   -- [field: ViolationType], ... (accumulated violations)
DQFields     STRING   -- [field1], [field2] (all fields assessed on this row)
```

| Column | Value | Meaning |
|---|---|---|
| `DQEligible` | `None` / `NULL` | Row not yet assessed |
| `DQEligible` | `True` | All configured checks passed |
| `DQEligible` | `False` | At least one check failed |
| `DQViolations` | e.g. `[email_address: Data Type]` | Violated rules (sticky — accumulates across fields) |
| `DQFields` | e.g. `[email_address], [phone]` | All fields assessed on this row |
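The sticky behaviour can be read as a fold over the fields checked on a row. This plain-Python sketch (the function name and the dict-based row are illustrative only, not the framework's code) applies one field result at a time: a row that has already failed stays `False`, violations are appended, and every assessed field is recorded.

```python
def apply_field_result(row, field, violation):
    """Fold one field's result into a row's DQ columns (hypothetical sketch).

    row: dict with DQEligible (None/True/False), DQViolations, DQFields (str or None).
    violation: None if the field passed, else a violation type such as "Data Type".
    """
    passed = violation is None
    # Sticky: once a row has failed it stays failed; NULL (not assessed) takes the result.
    eligible = False if row["DQEligible"] is False else passed

    def append(current, item):
        return item if not current else f"{current}, {item}"

    violations = row["DQViolations"]
    if not passed:
        violations = append(violations, f"[{field}: {violation}]")
    return {
        "DQEligible": eligible,
        "DQViolations": violations,
        "DQFields": append(row["DQFields"], f"[{field}]"),
    }
```

In PySpark the `DQEligible` rule corresponds to `F.when(F.col("DQEligible") == F.lit(False), F.lit(False)).otherwise(result_col)`: for a `NULL` row the condition evaluates to `NULL`, so it falls through to `result_col`.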

---

## Package Structure

```
dq_framework/
├── __init__.py                        # Exports: DQFramework, ConfigManager
├── framework.py                       # DQFramework facade — top-level orchestration
├── ddl_framework_tables.py            # ≡ Script_00_DDL_Framework_Tables — Delta table DDL (9 tables)
├── seed_master_data.py                # ≡ Script_01_Master_Reference_Data — 27 categories + 118 patterns
├── config.py                          # ConfigManager — Python API for all 5 config tables
├── engine/
│   ├── pattern_checks.py              # All 118 pattern check implementations
│   ├── resolve_pattern_rules.py       # Pattern precedence (L0A CTE equivalent)
│   ├── generate_rule_functions.py     # ≡ p_DQ_GenerateRuleFunctions — FunctionRegistry + closures
│   └── data_assessment_rules.py       # ≡ p_DQ_DataAssessmentRules — DQRunner
├── reporting/
│   └── views.py                       # v_auditDQChecks + v_statDQChecks equivalents
└── agents/
    ├── __init__.py                    # Exports SYSTEM_PROMPT, DQ_TOOLS, suggest_config
    ├── context.py                     # LLM system prompt + short context
    ├── tools.py                       # OpenAI-compatible tool definitions + execute_tool_call()
    └── suggest.py                     # Schema-based config auto-suggester (no LLM required)
notebooks/
├── 00_quickstart.py                   # Step-by-step walkthrough: configure → assess → results
├── 01_agent_assistant.py              # AI agent integration: auto-suggest, tool-calling, system prompt
├── 02_integration_test.py             # Integration test suite (run on cluster before publishing)
└── 03_seed_config.py                  # Paste Excel-generated INSERT SQL here
tests/
├── conftest.py                        # Path setup + test split explanation
└── unit/
    ├── test_pattern_checks.py         # All 118 pattern functions (no cluster needed)
    ├── test_resolve_pattern_rules.py  # Pattern precedence logic
    ├── test_generate_rule_functions.py # Closure building + FunctionRegistry
    ├── test_seed_master_data.py       # Seed data integrity (counts, IDs, FK)
    └── test_suggest.py                # Schema-based config suggester
```

---

## 9 Framework Tables

All tables live in the `dq` schema (configurable). Three categories:

| Category | Tables |
|---|---|
| **Framework-Seeded** (auto-populated by `setup()`) | `masterDataCategory`, `masterPattern` |
| **User-Managed** (populated via `ConfigManager` or SQL) | `masterField`, `configFieldValues`, `configFieldAllowedPattern`, `configCustomQuery`, `mapDQChecks` |
| **Results** (auto-populated by `run_assessment()`) | `auditDQChecks`, `statDQChecks` |

| Table | Purpose |
|---|---|
| `masterDataCategory` | 27 field type classifications |
| `masterPattern` | 118 built-in validation patterns. Custom patterns use `_ID >= 1000` (IDs 1–999 are reserved for built-ins) |
| `masterField` | Source fields registered for assessment |
| `configFieldValues` | Data length (L01) and value range (L04) boundaries |
| `configFieldAllowedPattern` | Pattern allow/block rules per field |
| `configCustomQuery` | Custom validation expressions — plain regex, Spark SQL, or named validators (L02) |
| `mapDQChecks` | Maps source field → curated table column |
| `auditDQChecks` | Row-level violation audit log (failures only, append-only) |
| `statDQChecks` | Aggregated pass/fail statistics per execution (append-only) |

> **`generate_rule_functions()` output**: Prints a `•` line for each registered checker with its SQL expression count. Any DQ function referenced in `mapDQChecks` that has no rules configured (in `configFieldValues`, `configFieldAllowedPattern`, or `configCustomQuery`) is flagged with ⚠ and skipped by the assessment runner — avoiding misleading 100% quality scores for unchecked fields.

---

## Validation Levels (in order, early-exit on first failure)

Each generated field checker implements the same 5-level hierarchy as the SQL TVF:

| Level | Check Type | Priority |
|---|---|---|
| **L01** | Data Length range (min/max character count) | Checked first |
| **L02** | Custom expression rules (regex, Spark SQL, or named validator) | Checked second |
| **L03** | 118-pattern checks (in PatternPriority order) | Checked third |
| **L04** | Data Value range (min/max value comparison) | Checked fourth |
| **L99** | Default PASS (if all prior checks pass) | Final |

**Early exit**: The checker returns immediately on first failure.
Re-run after fixing the first violation to surface the next.
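The table fixes the order of checks but not the code. As a hypothetical sketch (the function names, the string level codes returned, and the representation of rules as Python callables are all assumptions, not the library's API), a generated checker could be a closure that walks the levels and returns at the first failure:

```python
import re


def make_checker(min_len=None, max_len=None, custom_rules=(), patterns=(),
                 min_value=None, max_value=None):
    """Build a checker that returns the first failing level, or 'L99' (pass)."""

    def check(value):
        text = "" if value is None else str(value)
        # L01: data length range
        if (min_len is not None and len(text) < min_len) or (
                max_len is not None and len(text) > max_len):
            return "L01"
        # L02: custom rules, each a callable returning True when the value passes
        if any(not rule(text) for rule in custom_rules):
            return "L02"
        # L03: blocked patterns in priority order; a match means failure
        if any(blocked(text) for blocked in patterns):
            return "L03"
        # L04: data value range (numeric comparison in this sketch)
        if min_value is not None or max_value is not None:
            try:
                number = float(text)
            except ValueError:
                return "L04"
            if (min_value is not None and number < min_value) or (
                    max_value is not None and number > max_value):
                return "L04"
        return "L99"  # default PASS

    return check


EMAIL_RE = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")

email_check = make_checker(
    min_len=6, max_len=255,
    custom_rules=[lambda v: EMAIL_RE.search(v) is not None],  # L02 must-match regex
    patterns=[lambda v: "-" in v],  # L03: "Has Hyphen" blocked for this field
)
amount_check = make_checker(min_value=0, max_value=100)
```

Because `any()` stops at the first failing rule, the sketch also exits early within a level, not only between levels.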

---

## 118 Patterns across 10 Categories

| Category | Count | Examples |
|---|---|---|
| `DataType1` | 3 | Is Fully Numeric, Is Fully Decimal, Is Fully Text |
| `DataType2` | 1 | Is AlphaNumeric |
| `DataType3` | 4 | Is Date, Is Time, Is Timestamp, Is Boolean |
| `SpecialCharacter` | 32 | Has Comma, Has At Sign, Has Hyphen, Has Pipe Symbol, … |
| `SpaceFound` | 1 | Has Space |
| `DataEmptiness` | 34 | Is Empty or NULL, Is Virtually Empty with Spaces, … |
| `InvalidKeyword` | 40 | Has Keyword-n/a, Has Keyword-null, Has Keyword-missing, … |
| `FullyDuplicatedCharacter` | 1 | Has Fully Duplicated Character (e.g. "aaaa") |
| `UnicodeCharacters` | 1 | Has Unicode Characters (outside printable ASCII 0x20–0x7E) |
| `CasingCheck` | 2 | Has Lowercase Character, Has Uppercase Character |

**Total: 118 built-in patterns.** Pattern IDs 1–999 are reserved for framework use.
To add custom patterns (e.g. domain-specific invalid keywords), use IDs `>= 1000`.
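To make the pattern definitions concrete, here are three of them written from the descriptions in the table. These are illustrative sketches, not the implementations in `engine/pattern_checks.py`; in particular, treating a single character as not "fully duplicated" and limiting "virtually empty" to the space character are assumptions.

```python
# Illustrative implementations of three patterns from the table above,
# written from their descriptions; dq_framework's own code may differ.


def has_unicode_characters(value):
    """True if any character is outside printable ASCII 0x20-0x7E."""
    return any(not (0x20 <= ord(ch) <= 0x7E) for ch in value)


def has_fully_duplicated_character(value):
    """True if the value is one character repeated, e.g. 'aaaa' (length > 1 assumed)."""
    return len(value) > 1 and len(set(value)) == 1


def is_virtually_empty_with_spaces(value):
    """True if the value is non-empty but consists only of space characters."""
    return value != "" and value.strip(" ") == ""
```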

---

## Pattern Resolution (L0A — three levels of granularity)

| Level | Populate in `configFieldAllowedPattern` | Effect |
|---|---|---|
| Category-level | Set `PatternCategory` only | Applies to ALL patterns in that category |
| SubCategory-level | Set `PatternSubCategory` only | Applies to ALL patterns in that sub-category |
| Pattern-level | Set `PatternName` only | Applies to that specific named pattern |

**Precedence**: A more specific rule overrides a broader one (PatternName over
PatternSubCategory over PatternCategory), so a pattern-level "Allowed" always wins
over a category-level "Not Allowed". **Within the same specificity level**, Allowed
overrides Not Allowed.

Example: *"Block ALL special characters (category rule), EXCEPT allow Hyphen,
Full Stop, and At Sign (pattern-level overrides)"*

```python
cfg.block_category(1, "Source.T.FIELD", "SpecialCharacter")          # block all
cfg.allow_pattern(2, "Source.T.FIELD", "Has Hyphen")                  # except hyphen
cfg.allow_pattern(3, "Source.T.FIELD", "Has Full Stop")               # except full stop
cfg.allow_pattern(4, "Source.T.FIELD", "Has At Sign")                 # except @ sign
```
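The precedence rules can be expressed as a ranking: pick the most specific matching rule, and on a tie prefer Allowed. The sketch below is a plain-Python stand-in for the L0A `RANK()` logic, not the code in `resolve_pattern_rules.py`. The `Pattern` and `Rule` classes are hypothetical, and the subcategory names are invented for the example. It reproduces the hyphen exception from the example above.

```python
from dataclasses import dataclass

# Specificity rank: a higher number is more specific and wins.
SPECIFICITY = {"category": 1, "subcategory": 2, "pattern": 3}


@dataclass(frozen=True)
class Pattern:
    name: str
    category: str
    subcategory: str


@dataclass(frozen=True)
class Rule:
    level: str      # "category", "subcategory" or "pattern"
    target: str     # the PatternCategory, PatternSubCategory or PatternName value
    allowed: bool


def resolve(patterns, rules):
    """Return {pattern name: allowed} for every pattern some rule applies to."""
    decisions = {}
    for p in patterns:
        keys = {"category": p.category, "subcategory": p.subcategory, "pattern": p.name}
        matching = [r for r in rules if keys[r.level] == r.target]
        if not matching:
            continue  # no rule: this pattern is not checked for the field
        # Most specific rule wins; on a tie, Allowed (True > False) wins.
        best = max(matching, key=lambda r: (SPECIFICITY[r.level], r.allowed))
        decisions[p.name] = best.allowed
    return decisions


patterns = [
    Pattern("Has Hyphen", "SpecialCharacter", "Punctuation"),
    Pattern("Has Comma", "SpecialCharacter", "Punctuation"),
    Pattern("Has Space", "SpaceFound", "Whitespace"),
]
rules = [
    Rule("category", "SpecialCharacter", allowed=False),  # block all
    Rule("pattern", "Has Hyphen", allowed=True),          # except hyphen
]
```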

---

## Custom Validation Rules (L02)

The `configCustomQuery` table stores `CustomQueryExpression` rules per field; a field
can have several, as with the two email rules below. Three expression types are available.

### Option 1: Plain regex (recommended for non-technical users)

```python
# Must match: value must satisfy the regex to PASS
cfg.add_custom_query_regex(1, "Source.T.EMAIL_ADDRESS",
                           r"^[^@\s]+@[^@\s]+\.[^@\s]+$", must_match=True,
                           description="Basic email format")

# Must NOT match: if the regex matches, the row FAILS
cfg.add_custom_query_regex(2, "Source.T.EMAIL_ADDRESS",
                           r"noemaildress", must_match=False,
                           description="Reject placeholder values")
```

The engine auto-detects regex patterns by the presence of metacharacters
(`^$+*?[({|\`) and applies `re.search()` automatically.
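A minimal sketch of that detection and evaluation, assuming the metacharacter set listed above is the whole test (the function names are hypothetical). Under this rule a literal such as `noemaildress` has no metacharacters and would not be classified as a regex on its own, though `re.search()` still matches it as a plain substring. Named validators such as `au_mobile` are likewise free of metacharacters.

```python
import re

# Metacharacters listed above; their presence marks an expression as a regex.
REGEX_METACHARACTERS = set("^$+*?[({|\\")


def looks_like_regex(expression):
    return any(ch in REGEX_METACHARACTERS for ch in expression)


def evaluate_regex_rule(value, expression, must_match=True):
    """PASS (True) or FAIL (False) for a plain-regex rule; re.search matches anywhere."""
    matched = re.search(expression, value or "") is not None
    return matched if must_match else not matched
```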

### Option 2: Spark SQL expression

```python
# @InputValue is replaced with the actual column reference at assessment time
cfg.add_custom_query_sql(1, "Source.T.AMOUNT",
                         "CAST(@InputValue AS DOUBLE) > 0",
                         is_condition_allowed=True,
                         description="Amount must be positive")
```

SQL expressions are applied at the DataFrame level via `F.expr()` — not inside a
Python UDF — so they support any Spark SQL built-in function.
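The substitution step can be sketched as a string rewrite. In this hypothetical helper (not the library's code), `@InputValue` becomes a backtick-quoted column name, doubling any embedded backticks as Spark SQL requires, and `is_condition_allowed=False` is assumed to mean that a TRUE condition marks the row as failing.

```python
# Hypothetical helper: substitutes a backtick-quoted column reference for
# @InputValue so the expression can be handed to pyspark.sql.functions.expr().
def render_condition(expression, column_name, is_condition_allowed=True):
    column_ref = "`" + column_name.replace("`", "``") + "`"
    condition = expression.replace("@InputValue", column_ref)
    # Assumption: when is_condition_allowed is False, a TRUE condition means FAIL,
    # so the rendered expression is negated to read as "row passes".
    return condition if is_condition_allowed else f"NOT ({condition})"
```

The result would then be evaluated with `F.expr()`, for example `df.withColumn("rule_ok", F.expr(render_condition(...)))`. A NULL input makes most such expressions NULL, so the caller still has to decide how to treat it, for example with `F.coalesce(..., F.lit(False))`.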

### Option 3: Named validator (for complex logic)

Register a Python function once, then reference it by name in any field config:

```python
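import re

# Register (once, at cluster startup or in a setup cell)
def au_mobile(value):
    """Australian mobile number: 04 followed by eight digits."""
    return re.match(r"^04[0-9]{8}$", value or "") is not None

dq.register_validator("au_mobile", au_mobile)

# Reference by name in config (always pass a description; see Known Limitations #1)
cfg.add_custom_query(1, "Source.T.MOBILE_NUMBER", "au_mobile", is_condition_allowed=True,
                     description="Australian mobile number format")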
```

Built-in email validators are pre-registered:

| Validator Name | Description |
|---|---|
| `email_basic_format` | Regex: basic `x@y.z` format |
| `email_at_validation` | @ position, count, dot after @ |
| `email_domain_format` | 1–3 dots, no trailing dot, 2+ char labels |
| `email_local_part_dots` | Max 2 dots in local part |
| `email_no_placeholder` | Must not contain 'noemaildress' |
| `email_no_trailing_special` | Must not end with `.`, `-`, or `_` |
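The one-line descriptions above leave some details open. The following sketch implements each validator literally from its description so the rules can be tried outside Spark. The exact boundaries (for example, whether the 2-character minimum applies to every domain label) are assumptions, and the framework's own validators may differ; `failed_validators` is a hypothetical convenience function.

```python
import re

# Illustrative versions of the built-in email validators, written from the
# one-line descriptions in the table; the library's exact rules may differ.


def email_basic_format(v):
    return re.search(r"^[^@\s]+@[^@\s]+\.[^@\s]+$", v) is not None


def email_at_validation(v):
    # Exactly one @, not the first or last character, and a dot after it.
    return v.count("@") == 1 and 0 < v.index("@") < len(v) - 1 and "." in v.split("@")[1]


def email_domain_format(v):
    domain = v.rpartition("@")[2]
    return (1 <= domain.count(".") <= 3 and not domain.endswith(".")
            and all(len(label) >= 2 for label in domain.split(".")))


def email_local_part_dots(v):
    return v.partition("@")[0].count(".") <= 2


def email_no_placeholder(v):
    return "noemaildress" not in v


def email_no_trailing_special(v):
    return not v.endswith((".", "-", "_"))


EMAIL_VALIDATORS = {f.__name__: f for f in (
    email_basic_format, email_at_validation, email_domain_format,
    email_local_part_dots, email_no_placeholder, email_no_trailing_special)}


def failed_validators(value):
    """Names of the validators that reject the value (None is treated as '')."""
    return [name for name, fn in EMAIL_VALIDATORS.items() if not fn(value or "")]
```

Any of these functions could also be registered under a new name with `dq.register_validator()`.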

---

## Scope Filtering (`1=1` Pattern Preserved)

```python
# All schemas, all tables, all fields
dq.run_assessment()

# Specific schema
dq.run_assessment(schema_name="Curated")

# Specific table
dq.run_assessment(schema_name="Curated", table_name="Individual_Denorm")

# Single field
dq.run_assessment(schema_name="Curated",
                  table_name="Individual_Denorm",
                  field_name="FIRST_NAME")

# Reset previously-failed rows before re-assessment
dq.run_assessment(schema_name="Curated", reset_eligible_flag=True)
```
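The `1=1` idiom means each scope argument adds one more condition and an omitted argument adds none. A hypothetical Python equivalent over `mapDQChecks` rows, represented here as dicts with assumed key names, looks like this:

```python
# Hypothetical equivalent of "WHERE 1=1 AND (...)": None means "no filter"
# for that argument; key names mirror the add_mapping() keyword arguments.
def in_scope(mapping, schema_name=None, table_name=None, field_name=None):
    return ((schema_name is None or mapping["target_schema_name"] == schema_name)
            and (table_name is None or mapping["target_table_name"] == table_name)
            and (field_name is None or mapping["target_field_name"] == field_name))


mappings = [
    {"target_schema_name": "Curated", "target_table_name": "Individual_Denorm",
     "target_field_name": "FIRST_NAME"},
    {"target_schema_name": "Curated", "target_table_name": "Entity_Email_Denorm",
     "target_field_name": "EMAIL_ADDRESS"},
    {"target_schema_name": "Staging", "target_table_name": "Individual_Denorm",
     "target_field_name": "FIRST_NAME"},
]
curated_only = [m for m in mappings if in_scope(m, schema_name="Curated")]
```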

---

## Reporting

Four helper methods return Spark DataFrames ready for `.display()` or further transformation:

```python
exec_id = dq.run_assessment(schema_name="Curated")

dq.violations(exec_id).display()              # Row-level violations (field, violation type, value)
dq.quality_scores(exec_id).display()          # Pass/fail counts + % per field
dq.fields_below_threshold(threshold=80).display()  # Fields with quality < 80%
dq.summary_by_table(exec_id).display()        # Aggregated quality score per table
```

Two persistent reporting views are also created in the `dq` schema by `setup()`:

| View | Purpose |
|---|---|
| `v_auditDQChecks` | Row-level violations joined with field metadata |
| `v_statDQChecks` | Aggregated pass/fail statistics with `PercentageQualified` |
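The percentage behind `PercentageQualified` and `fields_below_threshold()` is presumably passed rows divided by assessed rows. A hypothetical sketch of that arithmetic follows. Rounding to two decimals and returning `None` when no rows were assessed are assumptions, the latter chosen so that an unchecked field does not show as 100%.

```python
# Hypothetical sketch of the arithmetic behind PercentageQualified and the
# threshold filter; the real views compute this in SQL over statDQChecks.
def percentage_qualified(passed, total):
    return round(100.0 * passed / total, 2) if total else None


def fields_below(stats, threshold=80):
    """stats: iterable of (field, passed, total) tuples."""
    result = []
    for field, passed, total in stats:
        pct = percentage_qualified(passed, total)
        if pct is not None and pct < threshold:
            result.append((field, pct))
    return result
```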

---

## AI Agent Support

The `dq_framework.agents` module provides three ways to use AI to configure the framework.
**No agent framework (Semantic Kernel, LangChain, AutoGen) is required.**

### Approach A — Auto-suggest from schema (no LLM needed)

```python
from dq_framework.agents import suggest_config

schema = [
    ("EMAIL_ADDRESS", "string"),
    ("FIRST_NAME",    "string"),
    ("POSTAL_CODE",   "string"),
    ("CUSTOMER_ID",   "int"),
]

# Returns ready-to-run ConfigManager code
code = suggest_config(schema, source_schema="Source", source_table="SRC_Customer",
                      curated_schema="Curated", curated_table="Customer_Denorm",
                      catalog="main")
print(code)
```

Or read schema directly from a live Delta table:

```python
schema = spark.table("main.Source.SRC_Customer").dtypes
code = suggest_config(schema, source_schema="Source", source_table="SRC_Customer", ...)
```

### Approach B — Chat assistant with tool calling

The 14 framework operations are exposed as OpenAI-compatible JSON Schema tool definitions.
They can be passed directly to OpenAI and Azure OpenAI, and adapted to the tool formats of
Anthropic Claude, Semantic Kernel, AutoGen, and LangChain.

```python
from dq_framework.agents import SYSTEM_PROMPT, DQ_TOOLS
from dq_framework.agents.tools import execute_tool_call
from openai import AzureOpenAI

client = AzureOpenAI(azure_endpoint="...", api_key="...", api_version="2024-05-01-preview")

response = client.chat.completions.create(
    model="gpt-4o",  # Azure OpenAI: pass your deployment name here
    messages=[
        {"role": "system", "content": SYSTEM_PROMPT},
        {"role": "user",   "content": "Configure DQ rules for Source.SRC_Party.EMAIL_ADDRESS"},
    ],
    tools=DQ_TOOLS,
)

for call in response.choices[0].message.tool_calls or []:
    result = execute_tool_call(call.function.name, call.function.arguments, dq, cfg)
    print(f"[Tool] {call.function.name} → {result}")
```
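The source does not show the contents of `DQ_TOOLS` or the internals of `execute_tool_call()`. As an illustration of the general pattern only, here is a hypothetical tool definition in the OpenAI function-calling format (`type`, `function.name`, and `function.parameters` as a JSON Schema object) and a small dispatcher that decodes the model's JSON arguments and routes them to a handler. The `allow_pattern` parameter names are assumptions.

```python
import json

# A hypothetical tool definition in the OpenAI function-calling format; the
# real DQ_TOOLS names and parameters may differ.
ALLOW_PATTERN_TOOL = {
    "type": "function",
    "function": {
        "name": "allow_pattern",
        "description": "Allow a named pattern for a source field.",
        "parameters": {
            "type": "object",
            "properties": {
                "id": {"type": "integer"},
                "full_field_name": {"type": "string"},
                "pattern_name": {"type": "string"},
            },
            "required": ["id", "full_field_name", "pattern_name"],
        },
    },
}


def dispatch(name, arguments_json, handlers):
    """Decode the model's JSON arguments and call the matching handler."""
    handler = handlers.get(name)
    if handler is None:
        return {"error": f"unknown tool: {name}"}
    try:
        kwargs = json.loads(arguments_json or "{}")
    except json.JSONDecodeError as exc:
        return {"error": f"invalid arguments: {exc}"}
    return handler(**kwargs)
```

In a multi-turn loop, each result is sent back to the model as a `{"role": "tool", "tool_call_id": call.id, "content": str(result)}` message so the conversation can continue.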

See [notebooks/01_agent_assistant.py](notebooks/01_agent_assistant.py) for Azure OpenAI,
Anthropic Claude, and Semantic Kernel examples.

### Approach C — Paste system prompt into ChatGPT / Copilot

```python
from dq_framework.agents import SYSTEM_PROMPT
print(SYSTEM_PROMPT)  # Copy and paste into any chat interface
```

---

## Testing

### Unit tests — no cluster needed

Run locally from a checkout of the repository with Python and `pytest`. No Databricks, no Spark, no Delta Lake required.

```bash
pip install databricks-dq-framework pytest
pytest tests/unit/ -v
```

| Test file | What it covers |
|---|---|
| `test_pattern_checks.py` | All major pattern check functions (DataEmptiness, SpecialChar, InvalidKeyword, …) |
| `test_resolve_pattern_rules.py` | Pattern precedence — specific overrides broad, inactive excluded |
| `test_generate_rule_functions.py` | L01/L02/L04 closure building, `FunctionRegistry`, stale function removal |
| `test_seed_master_data.py` | Seed data integrity: 27 categories, 118 patterns, no duplicate IDs |
| `test_suggest.py` | Schema-based config suggester: column classification, code generation |

### Integration tests — requires a Databricks cluster

Open [notebooks/02_integration_test.py](notebooks/02_integration_test.py) on a cluster.
It creates throwaway test schemas, runs the full pipeline end-to-end, and drops everything on completion.

| Test | What it verifies |
|---|---|
| Test 1 | Framework setup — 9 tables created, 27 categories, 118 patterns seeded |
| Test 2 | `setup()` is idempotent — no duplicate data on second call |
| Test 3 | Synthetic curated table created with known good/bad email values |
| Test 4 | `ConfigManager` — all 5 user-managed tables populated |
| Test 5 | `generate_rule_functions()` — checker built and directly invocable |
| Test 6 | `run_assessment()` — DQ columns written correctly, specific rows verified |
| Test 7 | Reporting views — violations count, quality scores, threshold filter, summary |
| Test 8 | `reset_eligible_flag` — failed rows cleared, re-assessment gives same result |
| Test 9 | Custom keyword extensibility via `add_invalid_keyword()` |
| Test 10 | `cfg.verify_config()` passes with no FK issues |

---

## Platform Requirements

| Requirement | Detail |
|---|---|
| Databricks Runtime | 12.0+ (includes PySpark 3.3+) |
| Delta Lake | 2.0+ |
| Python | 3.9+ |
| Unity Catalog | Recommended (pass `catalog=""` for legacy Hive metastore) |
| python-dateutil | Included in Databricks Runtime; required for full date/time validation |

---

## Adding a New Source Field

1. Register the field: `cfg.register_field(id, "Schema.Table.Column", data_category_type_id=C)`. The steps below refer to this FullFieldName as `ffn`.
2. Set length bounds: `cfg.set_field_values(id, ffn, min_data_length=M, max_data_length=N)`
3. Add pattern rules: `cfg.block_category(...)`, `cfg.allow_pattern(...)`, `cfg.block_pattern(...)`
4. Add custom expressions: `cfg.add_custom_query_regex(id, ffn, r"your_regex", must_match=True, description="...")`
5. (Optional) Add custom invalid keywords: `dq.add_invalid_keyword(id, "your_keyword")`
6. Map to curated column: `cfg.add_mapping(id, ffn, target_schema_name=..., target_table_name=..., ...)`
7. Validate FK integrity: `cfg.verify_config()`
8. Regenerate the checkers with `dq.generate_rule_functions()`, then run `dq.run_assessment(...)`. If the curated table is new, run `dq.prepare_curated_table(...)` first.
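Every step takes the same FullFieldName. A hypothetical guard like the one below (not part of `ConfigManager`), which assumes the three-part `Schema.Table.Column` form used throughout this README, catches a malformed name before it reaches the config tables.

```python
# Hypothetical helper: checks the three-part "Schema.Table.Column" shape
# before the name is passed to ConfigManager methods.
def split_full_field_name(ffn):
    parts = ffn.split(".")
    if len(parts) != 3 or not all(p.strip() for p in parts):
        raise ValueError(f"expected 'Schema.Table.Column', got {ffn!r}")
    return tuple(parts)
```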

---

## Known Limitations

| # | Area | Description |
|---|---|---|
| 1 | **Bug — `add_custom_query()`** | `CustomQueryDescription` column is `NOT NULL` in the DDL but the method passes `null` when no `description` argument is provided. Always supply a `description` when calling `add_custom_query()`, `add_custom_query_regex()`, or `add_custom_query_sql()` to avoid an INSERT constraint violation. |
| 2 | **Silent pattern skip** | If a `PatternName` in `configFieldAllowedPattern` does not match any row in `masterPattern`, the rule is silently ignored (equivalent to SQL's `ELSE NULL`). A typo in a pattern name will go undetected unless `verify_config()` is run. |
| 3 | **No transaction guarantees** | The assessment writes DQ columns, audit rows, and stat rows in three separate Spark actions. A cluster failure between steps can leave partial results. Re-run the assessment with `reset_eligible_flag=True` to recover. |
| 4 | **Date validation outside Databricks** | If `python-dateutil` is not installed, date/time pattern checks fall back to three hard-coded regex formats. This is rarely an issue on Databricks Runtime but may affect local unit test runs that exercise the date validators. |
| 5 | **No parameterised SQL** | Config is written via f-string SQL with single-quote escaping. Field names and expressions provided by trusted developers are safe; do not pass untrusted user input directly into ConfigManager methods. |
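Limitation 4 describes an optional-dependency fallback. A sketch of that pattern follows; the three regex formats are hypothetical stand-ins, since the framework's actual fallback formats are not listed, and `is_date` is an illustrative name. With `python-dateutil` installed the parser decides; otherwise only the fallback shapes pass.

```python
import re

try:
    from dateutil import parser as _date_parser  # optional dependency
except ImportError:
    _date_parser = None

# Hypothetical fallback formats; the framework's real three formats may differ.
_FALLBACK_DATE_FORMATS = (
    re.compile(r"^\d{4}-\d{2}-\d{2}$"),  # 2024-01-31
    re.compile(r"^\d{2}/\d{2}/\d{4}$"),  # 31/01/2024
    re.compile(r"^\d{8}$"),              # 20240131
)


def is_date(value):
    if value is None or not value.strip():
        return False
    if _date_parser is not None:
        try:
            _date_parser.parse(value)
            return True
        except (ValueError, OverflowError):  # ParserError subclasses ValueError
            return False
    return any(p.match(value.strip()) for p in _FALLBACK_DATE_FORMATS)
```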

---

*Data Quality Non-Functional Assessment Framework — Databricks Edition*
*Generalised for open-source publication — no client-specific data included*
