Metadata-Version: 2.4
Name: snowflake-data-migration-orchestrator
Version: 1.3.3
Summary: Data migration orchestrator for Snowflake
Project-URL: Bug Tracker, https://github.com/snowflake-eng/migrations-data-validation/issues
Project-URL: Source code, https://github.com/snowflake-eng/migrations-data-validation/
Project-URL: homepage, https://www.snowflake.com/
Author-email: "Snowflake, Inc." <snowflake-python-libraries-dl@snowflake.com>
License: Snowflake Conversion Software Terms
Keywords: Snowflake,cloud,data,database,migration,orchestrator
Classifier: Development Status :: 4 - Beta
Classifier: Programming Language :: Python
Classifier: Programming Language :: Python :: 3.8
Classifier: Programming Language :: Python :: 3.9
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: Implementation :: CPython
Classifier: Programming Language :: Python :: Implementation :: PyPy
Requires-Python: >=3.11
Requires-Dist: pandas>=2.3.3
Requires-Dist: pydantic>=2.0
Requires-Dist: pyyaml>=6.0
Requires-Dist: snowflake-connector-python>=4.0.0
Requires-Dist: urllib3>=2.6.0
Provides-Extra: development
Requires-Dist: parameterized>=0.9.0; extra == 'development'
Requires-Dist: pytest-cov>=4.0.0; extra == 'development'
Requires-Dist: pytest-mock>=3.10.0; extra == 'development'
Requires-Dist: pytest>=7.0.0; extra == 'development'
Requires-Dist: ruff>=0.1.0; extra == 'development'
Description-Content-Type: text/markdown

# Data Migration Orchestrator
The Python module that contains the logic for the Snowflake Data Migration Orchestrator.

## Table of Contents

- [Overview](#overview)
- [Disclaimer](#disclaimer)
- [Commands](#commands)
- [Type Checking](#type-checking)
- [Docker](#docker)
- [End-to-End Flow](#end-to-end-flow)
- [Considerations](#considerations)

## Overview
The **Data Migration Orchestrator** is a component that manages the many tasks related to data migration workflows/jobs.

### Architecture

The orchestrator uses a **task-based architecture** with:

- **Task Types**: Organized by executor (Orchestrator, Data Exchange Agent, Warehouse)
- **Factory Pattern**: Dynamic task handler creation
- **Template Method Pattern**: Consistent task handling interface
- **Incremental Updates**: Checksum-based change detection for efficient migrations
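
As an illustration of how the factory and template method patterns listed above can fit together, here is a minimal sketch. All names in it (`TaskHandler`, `ExtractHandler`, `register`, `create_handler`, the `"extract"` task type) are hypothetical and chosen for the example; they are not taken from the orchestrator's code.

```python
from abc import ABC, abstractmethod
from typing import Callable, Dict, Type


class TaskHandler(ABC):
    """Template method: handle() fixes the step order, subclasses supply execute()."""

    def handle(self, payload: dict) -> dict:
        self.validate(payload)
        result = self.execute(payload)
        return {"status": "done", "result": result}

    def validate(self, payload: dict) -> None:
        # Shared default check; subclasses may override it.
        if "task_id" not in payload:
            raise ValueError("payload is missing 'task_id'")

    @abstractmethod
    def execute(self, payload: dict) -> str:
        """Task-specific work, implemented by each concrete handler."""


# Hypothetical registry mapping a task type name to its handler class.
_HANDLERS: Dict[str, Type[TaskHandler]] = {}


def register(task_type: str) -> Callable[[Type[TaskHandler]], Type[TaskHandler]]:
    """Class decorator that adds a handler class to the registry."""

    def decorator(cls: Type[TaskHandler]) -> Type[TaskHandler]:
        _HANDLERS[task_type] = cls
        return cls

    return decorator


@register("extract")
class ExtractHandler(TaskHandler):
    def execute(self, payload: dict) -> str:
        return f"extracted {payload['task_id']}"


def create_handler(task_type: str) -> TaskHandler:
    """Factory: look up the handler class by task type and instantiate it."""
    try:
        return _HANDLERS[task_type]()
    except KeyError:
        raise ValueError(f"unknown task type: {task_type}") from None
```

With this shape, adding a new task type only requires a new decorated subclass; the calling code keeps using `create_handler(...).handle(...)`.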

## Disclaimer
This project is still a prototype, so the code structure is intentionally flexible for now. That will need to change soon.

## Commands

The orchestrator CLI supports multiple subcommands. When no subcommand is given, `start` is assumed for backward compatibility.
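
The default-subcommand behavior can be sketched as follows. This assumes an `argparse`-based CLI, which this README does not confirm; the `parse_args` helper and the `orchestrator` program name are hypothetical.

```python
import argparse
from typing import List


def parse_args(argv: List[str]) -> argparse.Namespace:
    parser = argparse.ArgumentParser(prog="orchestrator")
    # Subparsers are optional by default, so an empty argv parses without error.
    subparsers = parser.add_subparsers(dest="command")
    subparsers.add_parser("start")
    create = subparsers.add_parser("create-workflow")
    create.add_argument("config_file")

    args = parser.parse_args(argv)
    # Backward compatibility: no subcommand means `start`.
    if args.command is None:
        args.command = "start"
    return args
```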

### Starting the Orchestrator
```bash
hatch run start
```

### Creating a Workflow
Validate a workflow configuration file and insert it into the `WORKFLOW` table:
```bash
hatch run create-workflow <config-file> [--connection-name <name>] [--name <workflow-name>] [--source-platform <platform>]
```

| Option | Default | Description |
|--------|---------|-------------|
| `config_file` | *(required)* | Path to the JSON workflow configuration file |
| `--connection-name` | env vars | Snowflake connection name (see [Connection Management](#connection-management)) |
| `--name` | `MY_WORKFLOW` | Name for the workflow record |
| `--source-platform` | `sqlserver` | Source platform (`sqlserver`, `redshift`) |

The command reads `affinity` from the configuration file itself (the top-level `affinity` field).

Example:
```bash
hatch run create-workflow workflow-config.json --connection-name MY_CONN --name prod_migration
```
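
Reading the top-level `affinity` field from the configuration could look roughly like the sketch below. The `read_affinity` helper is hypothetical, and treating `affinity` as a required, non-empty string is an assumption made for the example rather than the command's documented behavior.

```python
import json


def read_affinity(config_text: str) -> str:
    """Return the top-level `affinity` value from a JSON workflow configuration."""
    config = json.loads(config_text)
    if not isinstance(config, dict):
        raise ValueError("workflow configuration must be a JSON object")
    affinity = config.get("affinity")
    # Assumption: affinity is required and must be a non-empty string.
    if not isinstance(affinity, str) or not affinity:
        raise ValueError("top-level 'affinity' must be a non-empty string")
    return affinity
```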

### Linting
Run linting with [ruff](https://docs.astral.sh/ruff/):
```bash
# Run lint checks (with auto-fix)
hatch run linter:check
```

### Type Checking
Run static type checking with [ty](https://docs.astral.sh/ty/) (extremely fast Python type checker from Astral):
```bash
# Check all source code
hatch run types:check-ty

# Check specific path
hatch run types:check-ty tests

# Watch mode - automatically re-checks on file changes
hatch run types:watch-ty
```

You can also run type checking with mypy:
```bash
hatch run types:check
```

#### ty Diagnostics Baseline

The project uses a **diagnostics baseline** to prevent type error regressions. The CI will fail if the number of `ty` diagnostics increases beyond the baseline.

| Project | Baseline |
|---------|----------|
| data-migration-orchestrator | 35 |

To check diagnostics locally:
```bash
python .github/scripts/ty_check_diagnostics.py data-migration-orchestrator
```

If you **fix** type errors and reduce the count, please update the baseline in:
- `.github/workflows/data-migration-orchestrator-all-ci.yml`
- `.github/scripts/ty_check_diagnostics.py` (DEFAULT_BASELINES)
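
The baseline rule itself is simple to state in code. The sketch below is not the content of `ty_check_diagnostics.py`; the `check_baseline` function and its messages are hypothetical, and only the `DEFAULT_BASELINES` name and the value 35 come from this README.

```python
from typing import Mapping

DEFAULT_BASELINES = {"data-migration-orchestrator": 35}


def check_baseline(
    project: str,
    diagnostics_count: int,
    baselines: Mapping[str, int] = DEFAULT_BASELINES,
) -> int:
    """Return a process exit code: 1 if the count regressed, 0 otherwise."""
    baseline = baselines[project]
    if diagnostics_count > baseline:
        print(f"{project}: {diagnostics_count} diagnostics exceed baseline {baseline}")
        return 1
    if diagnostics_count < baseline:
        # Fewer diagnostics than the baseline: a reminder to lower it.
        print(f"{project}: {diagnostics_count} < {baseline}, consider updating the baseline")
    return 0
```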

### Testing
Run the test suite:
```bash
# Run all tests with coverage
hatch run test_all.py3.11:check

# Run tests only
hatch run test_all.py3.11:unit
```

### CI Workflows

The following checks run automatically on PRs and pushes to `main`/`develop`:

1. **Linting** - Static analysis with ruff
2. **Type Check** - ty diagnostics baseline check
3. **Unit Tests** - pytest on Python 3.11 and 3.12 with coverage

### Docker Commands
Build image:
```bash
hatch run docker-build
```

Alternatively, build directly with Docker from the **parent directory** (required due to shared module dependencies):
```bash
# From the migrations-data-validation directory (parent)
docker build -f data-migration-orchestrator/Dockerfile -t data-migration-orchestrator:latest .
```

Run container:
```bash
hatch run docker-run
```

Interactive mode (debugging):
```bash
hatch run docker-run-interactive
```

## Docker

### Setup
1. Create environment file from template:
   ```bash
   cp .env.example .env
   ```

2. Edit `.env` with your Snowflake credentials:
   ```env
   SNOWFLAKE_ACCOUNT=your_account
   SNOWFLAKE_USER=your_user
   SNOWFLAKE_PASSWORD=your_password
   SNOWFLAKE_WAREHOUSE=your_warehouse
   SNOWFLAKE_DATABASE=your_database
   SNOWFLAKE_SCHEMA=your_schema
   ```

3. Build and run:
   ```bash
   hatch run docker-build
   hatch run docker-run
   ```

### Connection Management
The application automatically detects the environment:

- **Local**: Uses credentials from `.env` file (via `SNOWFLAKE_ACCOUNT`, `SNOWFLAKE_USER`, `SNOWFLAKE_PASSWORD`, etc.)
- **SPCS**: Automatically uses Snowflake-injected credentials (OAuth)

No code changes are needed when deploying to Snowpark Container Services (SPCS).
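
One common way to implement this kind of detection is to check for the OAuth token file that SPCS mounts into containers at `/snowflake/session/token`. The sketch below follows that approach; whether the orchestrator detects the environment this way is an assumption, and `build_connection_params` is a hypothetical helper.

```python
from pathlib import Path
from typing import Mapping

# SPCS mounts the service's OAuth token at this path inside the container.
SPCS_TOKEN_PATH = Path("/snowflake/session/token")


def build_connection_params(
    env: Mapping[str, str], token_path: Path = SPCS_TOKEN_PATH
) -> dict:
    """Build connection keyword arguments for the current environment."""
    if token_path.exists():
        # SPCS: authenticate with the injected OAuth token.
        return {
            "host": env.get("SNOWFLAKE_HOST"),
            "account": env.get("SNOWFLAKE_ACCOUNT"),
            "authenticator": "oauth",
            "token": token_path.read_text().strip(),
            "warehouse": env.get("SNOWFLAKE_WAREHOUSE"),
        }
    # Local: use the credentials loaded from the .env file.
    return {
        "account": env["SNOWFLAKE_ACCOUNT"],
        "user": env["SNOWFLAKE_USER"],
        "password": env["SNOWFLAKE_PASSWORD"],
        "warehouse": env.get("SNOWFLAKE_WAREHOUSE"),
        "database": env.get("SNOWFLAKE_DATABASE"),
        "schema": env.get("SNOWFLAKE_SCHEMA"),
    }
```

The returned dictionary is shaped so that it could be passed as keyword arguments to `snowflake.connector.connect(**params)`.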

## End-to-End Flow
For testing the end-to-end flow, with one **Data Migration Orchestrator** running and one (or more) **Data Exchange Agents**, we have the following options:
1. Running all components locally.
2. Running all components in SPCS.
3. Mixed approach: running some components locally and others in SPCS.

### Running all components locally
#### Starting the Data Migration Orchestrator
These steps start the **Data Migration Orchestrator** locally. When the orchestrator starts, it also sets up the corresponding resources in the Snowflake account, which allows you to start **Data Exchange Agents** against the same account.
`<snowflake-connection-name>` must be the name of the connection you want to use, as it appears in your `config.toml`/`connections.toml`.
1. Go to the `data-migration-orchestrator` directory (`cd data-migration-orchestrator`).
2. `export SNOWFLAKE_CONNECTION_NAME=<snowflake-connection-name>`.
3. `export DATA_MIGRATION_ORCHESTRATOR_AFFINITY=<affinity>` (this is optional, but is recommended for local development, to avoid confusion with multiple orchestrator instances from different devs).
4. `hatch run start` to start the program (it can also be started from the **VS Code** debugger by clicking **Run and Debug** with this project's `main.py` file open).

#### Starting the Data Exchange Agent
These steps will start the **Data Exchange Agent** locally. They must be performed after the corresponding resources have been set up in the Snowflake account (which is done when the **Data Migration Orchestrator** starts).
1. Go to the `data-exchange-agent` directory (`cd data-exchange-agent`)
2. Create a `configuration.toml` file in the `data-exchange-agent` directory (it will be ignored by **Git**). There is a `configuration_example.toml` file in that same directory, which can be used as an example. A minimal configuration is also shown below.
3. `hatch run data-exchange-agent` to start the program (it can also be started from the **VS Code** debugger by clicking **Run and Debug** with this project's `main.py` file open).

Example minimal config:
```toml
selected_task_source = "snowflake_stored_procedure"

[task_source.snowflake_stored_procedure]
connection_name = "<snowflake-connection-name>"

[application]
workers = 1
task_fetch_interval = "5s"
debug_mode = false
affinity = "<affinity>"

[connections.target.snowflake_connection_name]
connection_name = "<snowflake-connection-name>"

[connections.source.sqlserver]
username = "<sql-server-username>"
password = "<sql-server-password>"
database = "<sql-server-database-name>"
host = "<sql-server-host>"
port = 1433
```

#### Creating a Workflow
Prepare a `workflow-config.json` (see [User Guide](docs/user-guide.md) for the configuration format). The `affinity` field inside the config must match the affinity you are using for the **Data Exchange Agent**.

```bash
hatch run create-workflow workflow-config.json --connection-name <snowflake-connection-name>
```

This validates the configuration and creates a new record in the `SNOWCONVERT_AI.DATA_MIGRATION.WORKFLOW` table. If `--connection-name` is omitted, the connection from `SNOWFLAKE_CONNECTION_NAME` (or other env vars) is used.
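
The fallback order described above can be sketched as a small helper. `resolve_connection_name` is hypothetical, and returning `None` to mean "use the other environment variables" is an assumption made for the example.

```python
from typing import Mapping, Optional


def resolve_connection_name(
    cli_value: Optional[str], env: Mapping[str, str]
) -> Optional[str]:
    """Prefer --connection-name, then SNOWFLAKE_CONNECTION_NAME, else None."""
    if cli_value:
        return cli_value
    # None signals that the caller should fall back to the other env vars.
    return env.get("SNOWFLAKE_CONNECTION_NAME") or None
```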

### Running all components in SPCS
#### Starting the Data Migration Orchestrator
Run the `./data-migration-orchestrator/scripts/upload-orchestrator.sh` script from the root of the repository. This updates the image for the **Data Migration Orchestrator** and pushes it to the image registry in the Snowflake account. The script takes an optional `--connection` parameter with the name of the Snowflake connection; if it is not provided, the default connection is used.

Then, execute this in the Snowflake account:
```sql
CREATE SERVICE IF NOT EXISTS SNOWCONVERT_AI.DATA_MIGRATION.DATA_MIGRATION_SERVICE
IN COMPUTE POOL <compute-pool-name>
FROM SPECIFICATION 
$$
spec:
  containers:
  - name: orchestrator
    image: /SNOWCONVERT_AI/PUBLIC/IMAGES/data-migration-orchestrator:latest
    env:
      SNOWFLAKE_WAREHOUSE: <warehouse>
    resources:
      requests:
        memory: 2Gi
        cpu: 0.5
$$
QUERY_WAREHOUSE = <warehouse>;
```

#### Starting the Data Exchange Agent
Run the `./data-exchange-agent/scripts/upload-agent.sh` script from the root of the repository. This updates the image for the **Data Exchange Agent** and pushes it to the image registry in the Snowflake account. The script takes an optional `--connection` parameter with the name of the Snowflake connection; if it is not provided, the default connection is used.

Then, execute this in the Snowflake account:
```sql
-- 1. Create secret and grant usage over it (Source System)
CREATE OR REPLACE SECRET MY_DB.SECRETS.SQL_SERVER_PASSWORD
  TYPE = PASSWORD
  USERNAME = <sql-server-username>
  PASSWORD = <sql-server-password>;

GRANT USAGE ON SECRET MY_DB.SECRETS.SQL_SERVER_PASSWORD TO ROLE <data-migration-role-name>;

-- 2. Create network rule
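-- The rule name must match the one referenced by the integration in step 3.
CREATE OR REPLACE NETWORK RULE DM_DEMO_AZURE_SQL_EGRESS_RULE
  MODE = EGRESS
  TYPE = HOST_PORT
  -- HOST_PORT values default to port 443 when no port is given
  VALUE_LIST = ('<sql-server-host>:1433');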

-- 3. Create External Access Integration
CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION SQL_SERVER_INTEGRATION
  ALLOWED_NETWORK_RULES = (DM_DEMO_AZURE_SQL_EGRESS_RULE)
  ENABLED = TRUE;

GRANT USAGE ON INTEGRATION SQL_SERVER_INTEGRATION TO ROLE <data-migration-role-name>;

-- 4. Create Service
CREATE SERVICE SNOWCONVERT_AI.DATA_MIGRATION.DATA_EXCHANGE_SERVICE
  IN COMPUTE POOL <compute-pool-name>
  FROM SPECIFICATION $$
    spec:
      containers:
        - name: agent
          image: /snowconvert_ai/public/images/data-exchange-agent:latest
          env:
            # Source System Config
            DATA_SOURCE_HOST: <sql-server-host>
            DATA_SOURCE_PORT: 1433
            DATA_SOURCE_DATABASE: <sql-server-database>

            # Snowflake Config
            SNOWFLAKE_WAREHOUSE: <snowflake-warehouse>
            
            # Application
            WORKER_COUNT: 1
            AGENT_AFFINITY: <affinity>
            
          secrets:
            # Secrets (Source System)
          - snowflakeSecret: MY_DB.SECRETS.SQL_SERVER_PASSWORD
            secretKeyRef: USERNAME
            envVarName: DATA_SOURCE_USERNAME
          - snowflakeSecret: MY_DB.SECRETS.SQL_SERVER_PASSWORD
            secretKeyRef: PASSWORD
            envVarName: DATA_SOURCE_PASSWORD
  $$
  -- This line links the network hole to the service
  EXTERNAL_ACCESS_INTEGRATIONS = (SQL_SERVER_INTEGRATION)
  MIN_INSTANCES = 6
  MAX_INSTANCES = 6;
```

#### Creating a Workflow
Creating a workflow works the same way regardless of where the components are deployed (see [Creating a Workflow](#creating-a-workflow-1) above).

### Mixed approach
You can combine the two approaches and run only the **Data Exchange Agent** locally and the **Data Migration Orchestrator** in SPCS (or the other way around). There is no significant difference when doing this.

## Considerations
1. If the **Workflow Config** schema changes, update the `example-workflow-config.json` file. Also, update the mechanism for schema validation in **SnowConvert Desktop**.
2. If the **Data Exchange Agent Config** schema changes, update the `data-exchange-agent/src/data_exchange_agent/configuration_example.toml` (and possibly the `data-exchange-agent/docker-artifacts/configuration.template.toml`).
3. For development, it is useful to set the orchestrator affinity (by setting the `DATA_MIGRATION_ORCHESTRATOR_AFFINITY` environment variable to your affinity value). This prevents other orchestrator instances (from other devs) from picking up the workflows you have created, and vice versa.
