Metadata-Version: 2.4
Name: sodas-pipeline-sdk
Version: 0.1.0
Summary: SODAS Pipeline SDK for building Argo Workflows with quality analysis pipelines
Author: ETRI SeongHwan Kim
License: ETRI
Keywords: argo,data-quality,pipeline,workflows
Classifier: Development Status :: 3 - Alpha
Classifier: Intended Audience :: Developers
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.12
Requires-Python: >=3.12
Requires-Dist: boto3>=1.38.0
Requires-Dist: click>=8.1.0
Requires-Dist: httpx>=0.28.1
Requires-Dist: jinja2>=3.1.6
Requires-Dist: pandas>=2.0.0
Requires-Dist: pydantic-settings<3.0,>=2.0
Requires-Dist: pydantic<3.0,>=2.10
Requires-Dist: python-dotenv>=1.0.0
Requires-Dist: pyyaml>=6.0
Requires-Dist: requests>=2.31.0
Description-Content-Type: text/markdown

# SODAS Pipeline SDK

Python SDK for building Argo Workflows that run data quality analysis pipelines on the SODAS Platform.

## Overview

SODAS Pipeline SDK provides a type-safe, configuration-driven approach to building distributed data quality pipelines on Argo Workflows.

### Key Features

**Configuration Management**

- 3-layer hierarchical config (Infrastructure → Registry → Workflow)
- Centralized DataHub registry with IRI-based reference resolution

**Developer Experience**

- Type-safe workflow construction with Pydantic validation
- Helper API with method chaining and auto-wiring
- Smart defaults: `TaskHelper()` automatically loads settings from `config/config.yaml`
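
The smart-defaults behaviour can be pictured as a layered lookup: per-task arguments override values passed to `set_defaults()`, which in turn override values loaded from the config file. The sketch below models that precedence with `collections.ChainMap`. The precedence order, the flattened key names, and the `resolve_task_settings` helper are assumptions for illustration, not the SDK's actual implementation.

```python
from collections import ChainMap

# Hypothetical, flattened view of values loaded from config/config.yaml (lowest priority).
config_file_defaults = {
    "bucket": "your-bucket-name",
    "workspace_prefix": "your-workspace-prefix",
    "datahub": "datahub_example_1",
    "cpu": "2",
    "memory": "2048Mi",
}

# Values that would be passed to task_helper.set_defaults(...) (middle priority).
helper_defaults = {"bucket": "jobs", "workspace_prefix": "dqv", "datahub": "ddhf"}


def resolve_task_settings(task_kwargs: dict, helper: dict, file_defaults: dict) -> dict:
    """Return the effective settings; ChainMap returns the first mapping that has a key."""
    return dict(ChainMap(task_kwargs, helper, file_defaults))


# A task that only overrides memory inherits everything else.
effective = resolve_task_settings({"memory": "4096Mi"}, helper_defaults, config_file_defaults)
print(effective["bucket"], effective["memory"], effective["cpu"])  # jobs 4096Mi 2
```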

**Workflow Capabilities**

- Load, edit, and modify existing workflows programmatically
- Insert, delete, replace tasks with automatic dependency rewiring
- Navigation API for upstream/downstream task analysis
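
Upstream/downstream analysis boils down to a transitive walk over task dependencies. The following is a minimal sketch, assuming the workflow is reduced to a plain mapping from each task name to its direct dependencies; `upstream` and `downstream` are hypothetical function names, not the SDK's navigation API.

```python
from collections import deque


def upstream(deps: dict[str, list[str]], task: str) -> set[str]:
    """Return every task that `task` transitively depends on (breadth-first)."""
    seen: set[str] = set()
    queue = deque(deps.get(task, []))
    while queue:
        current = queue.popleft()
        if current not in seen:
            seen.add(current)
            queue.extend(deps.get(current, []))
    return seen


def downstream(deps: dict[str, list[str]], task: str) -> set[str]:
    """Return every task that transitively depends on `task`."""
    # Invert the edges (parent -> children), then reuse the upstream walk.
    reverse: dict[str, list[str]] = {name: [] for name in deps}
    for name, parents in deps.items():
        for parent in parents:
            reverse.setdefault(parent, []).append(name)
    return upstream(reverse, task)


deps = {
    "fetch-data": [],
    "validation": ["fetch-data"],
    "analyze-quality": ["validation"],
}
print(sorted(upstream(deps, "analyze-quality")))  # ['fetch-data', 'validation']
print(sorted(downstream(deps, "fetch-data")))     # ['analyze-quality', 'validation']
```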

**Export & Integration**

- Direct string export: `workflow.to_json()`, `workflow.to_yaml()`
- File export that can also return the serialized string: `workflow.export("file.json", return_string=True)`
- S3 storage integration and Argo Workflows-compatible output
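
"Argo Workflows-compatible output" means a `Workflow` resource whose DAG template lists each task and its `dependencies`. The sketch below builds such a manifest as a plain dict using Argo's documented fields (`apiVersion: argoproj.io/v1alpha1`, `kind: Workflow`, `spec.entrypoint`, `templates[].dag.tasks`). The `build_manifest` helper and its task-dict shape are hypothetical; the manifest the SDK actually emits (S3 artifacts, resource requests, registry-qualified images) is likely richer.

```python
import json


def build_manifest(name: str, tasks: list[dict]) -> dict:
    """Build a minimal Argo Workflow manifest with one DAG entrypoint.

    Each task dict is assumed to carry 'name', 'image', 'command' and,
    optionally, 'dependencies' (a list of upstream task names).
    """
    # One container template per task.
    container_templates = [
        {"name": t["name"], "container": {"image": t["image"], "command": t["command"]}}
        for t in tasks
    ]
    # DAG entries reference those templates and declare their dependencies.
    dag_tasks = []
    for t in tasks:
        entry = {"name": t["name"], "template": t["name"]}
        if t.get("dependencies"):
            entry["dependencies"] = list(t["dependencies"])
        dag_tasks.append(entry)
    return {
        "apiVersion": "argoproj.io/v1alpha1",
        "kind": "Workflow",
        "metadata": {"generateName": f"{name}-"},
        "spec": {
            "entrypoint": "main",
            "templates": [{"name": "main", "dag": {"tasks": dag_tasks}}, *container_templates],
        },
    }


manifest = build_manifest("my-workflow", [
    {"name": "fetch-data", "image": "sodas-dataset", "command": ["python", "-m", "task"]},
    {"name": "analyze-quality", "image": "qualityreport",
     "command": ["python", "-m", "task"], "dependencies": ["fetch-data"]},
])
print(json.dumps(manifest, indent=2))
```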

---

## Quick Start

### Installation

```bash
# Install the uv package manager and the project dependencies
./setup.sh
```

### Simple Example

```python
from sodas_pipeline_sdk.core.helpers import TaskHelper, WorkflowHelper

# Create helpers
task_helper = TaskHelper()
workflow = WorkflowHelper("my-workflow")

# Create tasks with smart defaults
task1 = task_helper.create_task(
    name="process-data",
    image="data-processor:latest",
    command=["python", "process.py"],
    settings={"input_path": "/data/input.csv"},
    output_path="processed.csv"
)

# Add to workflow
workflow.add_task(task1)

# Export as JSON string
json_str = workflow.to_json(auto_run=True)

# Or save to file
workflow.export("workflow.json", format='json')
```

---

## Core Concepts

### 1. Helper API (Recommended)

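The Helper API is the recommended way to build workflows: `TaskHelper` creates tasks that share a common set of defaults, and `WorkflowHelper` collects those tasks (with method chaining) and exports the result: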

```python
from sodas_pipeline_sdk.core.helpers import TaskHelper, WorkflowHelper

# Initialize helpers
task_helper = TaskHelper()

# Set defaults once - applies to all tasks
task_helper.set_defaults(
    bucket="jobs",
    workspace_prefix="dqv",
    datahub="ddhf"
)

workflow = WorkflowHelper("data-quality-workflow")

# Create tasks with auto-wiring
task1 = task_helper.create_task(
    name="fetch-data",
    image="sodas-dataset",
    settings={"dataset_iri": "http://example.org/dataset/123"},
    output_path="dataset.csv"
)

task2 = task_helper.create_task(
    name="analyze-quality",
    image="qualityreport",
    auto_input_from="fetch-data",  # Auto dependency + input wiring
    settings={"report_type": "completeness"}
)

workflow.add_task(task1).add_task(task2)

# Export
json_output = workflow.to_json(auto_run=True)
yaml_output = workflow.to_yaml(auto_run=True)
```

### 2. Workflow Editing

Existing workflows can be loaded from a file, edited task by task, and exported again:

```python
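from sodas_pipeline_sdk.core.helpers import TaskHelper, WorkflowHelper

task_helper = TaskHelper()

# Load existing workflow
workflow = WorkflowHelper.from_file("existing_workflow.json")

# Insert a new task after "fetch-data"
new_task = task_helper.create_task(
    name="validation",
    image="validator:latest"
)
workflow.insert_task_after("fetch-data", new_task, rewire_outputs=True)

# Delete a task with automatic dependency rewiring
workflow.delete_task("old-task", rewire=True)

# Replace a task while preserving its connections
updated_task = task_helper.create_task(
    name="analyze-quality",
    image="qualityreport",
    settings={"report_type": "completeness"}
)
workflow.replace_task("analyze-quality", updated_task, preserve_connections=True)

# Export the modified workflow and keep the serialized string
json_str = workflow.export("modified_workflow.json", return_string=True)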
```
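
Rewiring on insert and delete can be expressed as edits to a dependency map. The sketch below assumes that `rewire_outputs=True` means tasks that depended on the anchor now depend on the inserted task, and that `rewire=True` means a deleted task's dependents inherit its own dependencies. Those semantics are inferred from the parameter names rather than confirmed by the SDK, and both functions are hypothetical.

```python
def insert_after_and_rewire(deps: dict[str, list[str]], anchor: str, new_task: str) -> None:
    """Insert `new_task` after `anchor`; former children of `anchor` now follow `new_task`."""
    if anchor not in deps:
        raise KeyError(anchor)
    if new_task in deps:
        raise ValueError(f"task {new_task!r} already exists")
    for name, parents in deps.items():
        # Reassigning values of existing keys is safe while iterating.
        deps[name] = [new_task if p == anchor else p for p in parents]
    deps[new_task] = [anchor]


def delete_and_rewire(deps: dict[str, list[str]], task: str) -> None:
    """Remove `task`; its children inherit its parents so the chain stays connected."""
    parents = deps.pop(task)
    for name, current in deps.items():
        if task in current:
            merged = [p for p in current if p != task]
            merged += [p for p in parents if p not in merged]
            deps[name] = merged


deps = {"fetch-data": [], "old-task": ["fetch-data"], "analyze-quality": ["old-task"]}
insert_after_and_rewire(deps, "fetch-data", "validation")
delete_and_rewire(deps, "old-task")
print(deps)
# {'fetch-data': [], 'analyze-quality': ['validation'], 'validation': ['fetch-data']}
```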

---

## API Examples

### Create Data Quality Pipeline

```python
from sodas_pipeline_sdk.core.helpers import TaskHelper, WorkflowHelper

def create_quality_pipeline(dataset_iri: str) -> WorkflowHelper:
    """Create SODAS dataset quality analysis pipeline"""

    task_helper = TaskHelper()
    task_helper.set_defaults(
        bucket="jobs",
        workspace_prefix="dqv",
        datahub="ddhf"
    )

    workflow = WorkflowHelper("quality-pipeline")

    # Fetch dataset
    fetch = task_helper.create_task(
        name="sodas-dataset",
        image="sodas-dataset",
        command=["python", "-m", "task"],
        settings={
            "dataset_iri": dataset_iri,
            "dqv_api_endpoint": "http://dqv-server:30301",
            "datahub_api_url": "http://datahub:30001"
        },
        output_path="dataset.csv"
    )
    workflow.add_task(fetch)

    # Quality reports (run in parallel)
    reports = ["erroneous-data", "relation-data", "missing-data", "bias-data"]
    for report in reports:
        report_task = task_helper.create_task(
            name=f"{report}-report",
            image="qualityreport",
            command=["python", "-m", "src.report.report_generator2"],
            settings={
                "dataset_iri": dataset_iri,
                "notebook_name": f"{report.replace('-', '_')}.ipynb"
            },
            output_path=f"{report}_report.json",
            auto_input_from="sodas-dataset"
        )
        workflow.add_task(report_task)

    # Aggregate report
    total = task_helper.create_task(
        name="dqv-total",
        image="qualityreport",
        settings={"notebook_name": "dqv_total.ipynb"},
        auto_input_from="sodas-dataset",
        dependencies=[f"{r}-report" for r in reports]  # wait for every report task
    )
    workflow.add_task(total)

    return workflow

# Usage
workflow = create_quality_pipeline("http://example.org/dataset/123")
workflow.export("quality_pipeline.json", format='json')
```
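
With `dqv-total` depending on all four `*-report` tasks, the pipeline has three stages: `sodas-dataset` first, the four reports concurrently, then the aggregate. The sketch below checks that shape with the standard-library `graphlib.TopologicalSorter`, using a dependency map that mirrors the pipeline. It only groups task names; Argo's real scheduling also depends on parallelism limits and cluster resources.

```python
from graphlib import TopologicalSorter

reports = ["erroneous-data", "relation-data", "missing-data", "bias-data"]
report_tasks = [f"{r}-report" for r in reports]

# Each task maps to the set of tasks it depends on.
graph: dict[str, set[str]] = {"sodas-dataset": set()}
graph.update({name: {"sodas-dataset"} for name in report_tasks})
graph["dqv-total"] = {"sodas-dataset", *report_tasks}


def stages(graph: dict[str, set[str]]) -> list[list[str]]:
    """Group tasks into stages whose members have no dependencies on each other."""
    sorter = TopologicalSorter(graph)
    sorter.prepare()  # also raises CycleError if the graph has a cycle
    result = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        result.append(ready)
        sorter.done(*ready)
    return result


for i, stage in enumerate(stages(graph)):
    print(i, stage)
```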

### Export Formats

```python
from sodas_pipeline_sdk.core.helpers import WorkflowHelper

workflow = WorkflowHelper("my-workflow")

# Method 1: Direct string export (simplest)
json_str = workflow.to_json(auto_run=True)
yaml_str = workflow.to_yaml(auto_run=True)

# Method 2: File export with optional string return
json_str = workflow.export(
    output_path="workflow.json",
    return_string=True,
    format='json'
)

# Method 3: File only (no return)
workflow.export("workflow.yaml", format='yaml')
```

---

## Configuration

### Hierarchical Configuration Structure

The SDK uses a **3-layer configuration** (`config/config.yaml`):

```yaml
# Layer 1: Infrastructure - Low-level infrastructure settings
infrastructure:
  s3:
    endpoint: "http://your-minio-endpoint.example.com"
    access_key: "YOUR_ACCESS_KEY"
    secret_key: "YOUR_SECRET_KEY"

  argo:
    harbor_registry: "your-harbor-registry.example.com"
    harbor_project: "your-project-name"
    default_cpu: "2"
    default_memory: "2048Mi"

# Layer 2: Registry - Reference data (datahub mappings)
registry:
  datahubs:
    datahub_example_1:
      id: "00000000-0000-0000-0000-000000000001"
      name: "Example Datahub 1"
      iri: "http://api.example1.com:30000/"

    datahub_example_2:
      id: "00000000-0000-0000-0000-000000000002"
      name: "Example Datahub 2"
      iri: "http://api.example2.com:30000/"

# Layer 3: Workflow - Workflow-level defaults
workflow:
  defaults:
    datahub: "datahub_example_1" # Must exist in registry.datahubs
    s3:
      bucket: "your-bucket-name"
      workspace_prefix: "your-workspace-prefix"
```

## License

Copyright (c) 2025 ETRI

Licensed under the ETRI license. See the [LICENSE](LICENSE) file for details.

---
