Metadata-Version: 2.4
Name: dbx-elt-utils
Version: 2.2.1
Summary: Common utility library for Databricks-oriented ELT pipelines (DLT).
Author-email: DBX Analyst <developer@example.com>
Requires-Python: >=3.9
Provides-Extra: local
Requires-Dist: databricks-connect>=14.0.0; extra == 'local'
Description-Content-Type: text/markdown

# DBX ELT Utilities v2.0

`dbx-elt-utils` is the standard library for building **Lakeflow Spark Declarative Pipelines (SDP)** on Databricks. It provides helpers for hybrid ingestion, automatic environment setup (local vs. cloud), data cleaning, and data quality validation.

> **v2.0**: Full support for the new `pyspark.pipelines` (`dp`) API, Liquid Clustering, `ExpectationsFactory`, 5 new cleaning functions, and checkpoints in UC Volumes.

## Installation

```bash
# Production (Databricks Runtime)
pip install dbx-elt-utils

# Local development (VSCode + Databricks Connect)
# Quoted so that shells such as zsh do not expand the brackets
pip install "dbx-elt-utils[local]"
```

> The library does not declare `pyspark` as a dependency, which avoids conflicts with the Databricks Runtime.

---

## Quick Start

```python
from dbx_elt_utils.notebook_utils import init_notebook

notebook = init_notebook()

env   = notebook.env     # '_dev' or '_prod'
spark = notebook.spark   # PySpark session
dp    = notebook.dp      # SDP API (pyspark.pipelines or local mock)
```

---

## Modules

### 1. `notebook_utils`: Setup and Local Mock

`init_notebook()` automatically detects whether it is running on Databricks or in VSCode and configures everything accordingly:

| Attribute | Description |
|---|---|
| `notebook.env` | Environment suffix: `_dev` or `_prod` |
| `notebook.spark` | PySpark session (native or Connect) |
| `notebook.dp` | SDP module (real or `_SdpMock`) |
| `notebook.dlt` | Legacy alias (= `notebook.dp`) |
| `notebook.is_local` | `True` when running in VSCode |

**Full SDP mock**: when running locally, `notebook.dp` is an `_SdpMock` that accepts every decorator without raising:

```python
@dp.materialized_view(name="mi_tabla", comment="...")
@dp.expect("id_not_null", "id IS NOT NULL")
def mi_tabla():
    return spark.read.table("source")
```
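To give a sense of how a local mock can accept arbitrary decorators, here is a minimal sketch. `NoOpPipelines` is a hypothetical stand-in written for illustration; it is not the library's `_SdpMock`, whose internals are not shown in this document.

```python
class NoOpPipelines:
    """Hypothetical stand-in for a local pipelines mock (not the real _SdpMock)."""

    def __getattr__(self, name):
        # Any attribute (materialized_view, table, expect, ...) resolves to a
        # factory, so an unknown decorator name never raises AttributeError.
        def factory(*args, **kwargs):
            # Bare usage: @dp.table applied directly to a function.
            if len(args) == 1 and callable(args[0]) and not kwargs:
                return args[0]

            # Parameterised usage: @dp.table(name="...") returns a decorator
            # that hands the function back unchanged.
            def decorator(func):
                return func

            return decorator

        return factory


dp = NoOpPipelines()


@dp.materialized_view(name="mi_tabla", comment="demo")
@dp.expect("id_not_null", "id IS NOT NULL")
def mi_tabla():
    return [{"id": 1}]
```

Because every decorator returns the original function, `mi_tabla()` can be called directly in a local test, which is the property the mock is meant to provide.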

**Testing functions:**

| Function | Description |
|---|---|
| `get_test_spark()` | Spark session (Workspace or Databricks Connect) |
| `get_local_source_table(spark, fqn)` | Resolves the official table vs. the temporary test table |
| `clean_local_test_table(spark, fqn)` | Drops the `_tmp_sql` table after the test |
| `display_test_results(spark, data)` | Displays a DataFrame in a way suited to the environment |
| `stop_local_spark()` | Stops active streams without killing the session |
| `get_checkpoint_base(env)` | Checkpoint path in UC Volumes |
| `get_schema_location_base(env)` | Schema location path (hash-based) |
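As an illustration of what "hash-based" can mean for a schema location, the sketch below derives one directory per source from a digest of the source name. The checkpoint prefix is the one listed in the changelog; the `_schemas` subfolder, the use of SHA-256, the 16-character truncation, and both function names are assumptions made for this example, not the library's actual layout.

```python
import hashlib


def checkpoint_base(env: str) -> str:
    # Prefix taken from the changelog entry for v2.0.0.
    return f"/Volumes/bronze{env}/temporary/checkpoints"


def schema_location(env: str, source: str) -> str:
    # Hypothetical layout: a short, stable digest of the source identifier
    # gives every source its own folder without leaking special characters
    # from paths or URIs into the directory name.
    digest = hashlib.sha256(source.encode("utf-8")).hexdigest()[:16]
    return f"{checkpoint_base(env)}/_schemas/{digest}"


print(schema_location("_dev", "landing_dev.schema.mi_tabla"))
```

The same source always maps to the same folder, so Auto Loader can find its previously inferred schema across runs.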

### 2. `ingest_utils`: Hybrid Ingestion (Bronze)

```python
from dbx_elt_utils.ingest_utils import ingesta_hibrida

df = ingesta_hibrida(spark, SOURCE_LANDING, tipo="auto_detect")
```

It automatically detects the source type and returns the appropriate DataFrame:

| Detected type | Source | Method |
|---|---|---|
| `external_table` | Unity Catalog table | `spark.readStream.table()` |
| `delta_path` | Delta path (dbfs/abfss) | `spark.readStream.format("delta")` |
| `auto_loader` | Files (CSV/JSON/Parquet) | `cloudFiles` with schema evolution |
| `batch_files` | Path without streaming | `spark.read.format()` |

**Smart detection:**
- Paths containing `/` are treated as file locations (Auto Loader or Delta path, depending on whether the location contains `_delta_log`)
- The `catalog.schema.table` format is treated as a Unity Catalog table
- `cloudFiles.schemaEvolutionMode=addNewColumns` is enabled automatically for Auto Loader
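The detection rules above can be sketched as a small classifier. This is an illustrative approximation only: `detect_source_type`, its `streaming` flag, and its `has_delta_log` override are hypothetical names, and the real `ingesta_hibrida` may decide differently (for example in how it chooses between `auto_loader` and `batch_files`).

```python
import os
import re

# Three dot-separated identifiers, e.g. catalog.schema.table (assumed pattern).
_UC_TABLE = re.compile(r"^[A-Za-z_]\w*\.[A-Za-z_]\w*\.[A-Za-z_]\w*$")


def detect_source_type(source, streaming=True, has_delta_log=None):
    """Hypothetical classifier mirroring the documented detection rules."""
    if "/" in source:
        if has_delta_log is None:
            # Local filesystem check only; a cloud URI would need a storage
            # client to look for the _delta_log directory.
            has_delta_log = os.path.isdir(os.path.join(source, "_delta_log"))
        if has_delta_log:
            return "delta_path"
        # Assumption: plain files stream through Auto Loader unless the
        # caller explicitly asks for a batch read.
        return "auto_loader" if streaming else "batch_files"
    if _UC_TABLE.match(source):
        return "external_table"
    raise ValueError(f"Cannot classify source: {source!r}")


print(detect_source_type("landing_dev.schema.mi_tabla"))
print(detect_source_type("/landing/raw/orders", has_delta_log=False))
```

Checking for `/` before the three-part name matters, because a storage URI can contain dots that would otherwise be mistaken for a table name.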

### 3. `clean_utils`: Cleaning and Transformation (Silver)

```python
from dbx_elt_utils.clean_utils import (
    extraer_valor_array_string, parse_json_array_column,
    normalize_string, safe_cast, dedup_by_key, add_surrogate_key
)
```

| Function | What it does | Example / use case |
|---|---|---|
| `extraer_valor_array_string(col)` | `["12345"]` to `12345`, `[]` to `NULL` | Cleans JSON arrays returned by APIs |
| `parse_json_array_column(df, col, alias)` | `["a","b"]` to a real Spark array | Explode-ready |
| `normalize_string(col)` | Trim + lowercase + accents removed | Text standardization |
| `safe_cast(col, tipo)` | Safe cast that never raises (NULL on failure) | `safe_cast(col("edad"), "int")` |
| `dedup_by_key(df, keys, order_col)` | Deduplicates by key using `row_number` | CDC / merge patterns |
| `add_surrogate_key(df, cols, alias)` | SHA-256 surrogate key | `add_surrogate_key(df, ["id","src"])` |
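To make the row-level semantics in the table concrete, here are plain-Python analogues of three of the helpers. They operate on single values rather than Spark columns, and `first_array_value`, `normalize_text`, and `safe_int` are hypothetical names; treat them as a sketch of the documented behaviour, not as the library's implementation. Returning `None` for malformed JSON is an assumption.

```python
import json
import unicodedata


def first_array_value(raw):
    """'["12345"]' -> '12345'; '[]', None or malformed JSON -> None."""
    try:
        items = json.loads(raw)
    except (TypeError, ValueError):
        return None
    if isinstance(items, list) and items:
        return str(items[0])
    return None


def normalize_text(value):
    """Trim, lowercase and strip accents; None stays None."""
    if value is None:
        return None
    # NFKD splits accented letters into a base letter plus combining marks,
    # which are then filtered out.
    decomposed = unicodedata.normalize("NFKD", value.strip().lower())
    return "".join(ch for ch in decomposed if not unicodedata.combining(ch))


def safe_int(value):
    """Cast to int, returning None instead of raising on bad input."""
    try:
        return int(value)
    except (TypeError, ValueError):
        return None


print(first_array_value('["12345"]'), normalize_text("  Ñandú "), safe_int("abc"))
```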

### 4. `expectations_utils`: Quality Validation (New in v2.0)

```python
from dbx_elt_utils.expectations_utils import ExpectationsFactory as EF

rules = EF.combine(
    EF.not_null("id", "nombre"),
    EF.in_range("edad", 0, 150),
    EF.freshness("updated_at", max_hours=48),
    EF.in_set("estado", ["ACTIVO", "INACTIVO"]),
)

@dp.table(name="mi_tabla")
@dp.expect_all_or_drop(rules)
def mi_tabla():
    ...
```

| Method | Generates |
|---|---|
| `EF.not_null("col")` | `col IS NOT NULL` |
| `EF.not_empty("col")` | `col IS NOT NULL AND trim(col) != ''` |
| `EF.positive("col")` | `col > 0` |
| `EF.in_range("col", 0, 100)` | `col >= 0 AND col <= 100` |
| `EF.in_set("col", [...])` | `col IN ('a', 'b', 'c')` |
| `EF.freshness("col", 24)` | `col >= current_timestamp() - INTERVAL 24 HOURS` |
| `EF.regex_match("col", pattern)` | `col RLIKE 'pattern'` |
| `EF.combine(...)` | Merges multiple dicts into one |
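The factory pattern behind this table can be sketched as a set of functions that each return a `{rule_name: sql_expression}` dict, which is the shape `expect_all_or_drop` consumes. `RuleSketch` is a hypothetical re-creation, and the generated rule names (such as `id_not_null`) are assumptions; the real `ExpectationsFactory` may name its rules differently.

```python
class RuleSketch:
    """Hypothetical re-creation of the factory pattern; rule names are assumed."""

    @staticmethod
    def not_null(*cols):
        # One rule per column, so each failure is reported separately.
        return {f"{c}_not_null": f"{c} IS NOT NULL" for c in cols}

    @staticmethod
    def in_range(col, low, high):
        return {f"{col}_in_range": f"{col} >= {low} AND {col} <= {high}"}

    @staticmethod
    def in_set(col, values):
        # Single quotes inside values are doubled to keep the SQL valid.
        quoted = ", ".join("'" + str(v).replace("'", "''") + "'" for v in values)
        return {f"{col}_in_set": f"{col} IN ({quoted})"}

    @staticmethod
    def combine(*rule_dicts):
        merged = {}
        for rules in rule_dicts:
            merged.update(rules)
        return merged


rules = RuleSketch.combine(
    RuleSketch.not_null("id", "nombre"),
    RuleSketch.in_range("edad", 0, 150),
    RuleSketch.in_set("estado", ["ACTIVO", "INACTIVO"]),
)
for name, expression in rules.items():
    print(f"{name}: {expression}")
```

Keeping every rule as a plain dict entry means rule sets can be composed, filtered, or logged before they are attached to a table.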

---

## Full Example: Bronze + Silver

```python
# -- Bronze --
from dbx_elt_utils.notebook_utils import init_notebook
from dbx_elt_utils.ingest_utils import ingesta_hibrida

notebook = init_notebook()
env, spark, dp = notebook.env, notebook.spark, notebook.dp

SOURCE = f"landing{env}.schema.mi_tabla"

@dp.materialized_view(
    name="mi_tabla",
    comment=f"Bronze from {SOURCE}",
    cluster_by=["id"]
)
def bronze_mi_tabla():
    df = ingesta_hibrida(spark, SOURCE)
    df.createOrReplaceTempView("v_raw")
    return spark.sql("SELECT *, current_timestamp() as _ingested_at FROM v_raw")
```

```python
# -- Silver (CDC SCD1) --
from dbx_elt_utils.expectations_utils import ExpectationsFactory as EF
from pyspark.sql.functions import col

BRONZE_SOURCE = f"bronze{env}.schema.mi_tabla"

dp.create_streaming_table(
    name="mi_tabla",
    cluster_by=["id"],
    expect_all_or_drop=EF.combine(
        EF.not_null("id"),
        EF.not_empty("nombre")
    )
)

dp.apply_changes(
    target="mi_tabla",
    source=BRONZE_SOURCE,
    keys=["id"],
    sequence_by=col("_ingested_at"),
    stored_as_scd_type=1
)
```

---

## Changelog

### v2.0.0
- `notebook.dp`: Alias for `pyspark.pipelines` (new SDP API)
- Full `_SdpMock`: `materialized_view`, `table`, `expect*`, `apply_changes`, `create_streaming_table`, `append_flow`
- Checkpoint path in UC Volumes (`/Volumes/bronze{env}/temporary/checkpoints`)
- Hash-based schema location for Auto Loader
- `ExpectationsFactory`: New data quality validation module
- `clean_utils`: 5 new functions (`normalize_string`, `safe_cast`, `dedup_by_key`, `add_surrogate_key`, `parse_json_array_column`)
- `ingest_utils`: `delta_path` detection and the `batch_files` type
- Dual import: `pyspark.pipelines` with fallback to `dlt`

### v1.3.1
- Previous stable version (`@dlt.table` API)
