Metadata-Version: 2.4
Name: antaris-pipeline
Version: 5.0.1
Summary: Unified orchestration pipeline for Antaris Analytics Suite
Author-email: Antaris Analytics <dev@antarisanalytics.com>
License: Apache-2.0
Project-URL: Homepage, https://antarisanalytics.ai
Project-URL: Documentation, https://docs.antarisanalytics.ai
Project-URL: Repository, https://github.com/antaris-analytics/antaris-pipeline
Project-URL: Issues, https://github.com/antaris-analytics/antaris-pipeline/issues
Keywords: ai,agents,pipeline,orchestration,telemetrics
Classifier: Development Status :: 4 - Beta
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: Apache Software License
Classifier: Operating System :: OS Independent
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.9
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3.13
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Classifier: Topic :: Scientific/Engineering :: Artificial Intelligence
Requires-Python: >=3.9
Description-Content-Type: text/markdown
License-File: LICENSE
Requires-Dist: antaris-memory>=2.0.0
Requires-Dist: antaris-router>=3.0.0
Requires-Dist: antaris-guard>=2.0.0
Requires-Dist: antaris-context>=2.0.0
Requires-Dist: pydantic>=2.0.0
Requires-Dist: click>=8.0.0
Requires-Dist: rich>=13.0.0
Requires-Dist: asyncio-dgram>=2.1.0
Provides-Extra: dev
Requires-Dist: pytest>=7.0.0; extra == "dev"
Requires-Dist: pytest-asyncio>=0.21.0; extra == "dev"
Requires-Dist: pytest-cov>=4.0.0; extra == "dev"
Requires-Dist: black>=23.0.0; extra == "dev"
Requires-Dist: isort>=5.12.0; extra == "dev"
Requires-Dist: mypy>=1.0.0; extra == "dev"
Requires-Dist: pre-commit>=3.0.0; extra == "dev"
Provides-Extra: telemetrics
Requires-Dist: clickhouse-driver>=0.2.6; extra == "telemetrics"
Requires-Dist: uvicorn>=0.20.0; extra == "telemetrics"
Requires-Dist: fastapi>=0.100.0; extra == "telemetrics"
Requires-Dist: websockets>=11.0.0; extra == "telemetrics"
Dynamic: license-file

# antaris-pipeline

**The unified orchestration engine for the Antaris AI suite.**

`antaris-pipeline` ties together memory, routing, guard, and context into a single, production-ready processing loop. It handles the full lifecycle of every LLM interaction — from input safety scanning through intelligent model selection, context optimization, response generation, and memory persistence — while emitting structured telemetry at every phase.

---

## 📦 Installation

```bash
pip install antaris-pipeline
```

**Version:** 5.0.1

**Core Antaris dependencies (auto-installed along with `pydantic`, `click`, `rich`, and `asyncio-dgram`):**

| Package | Role |
|---|---|
| `antaris-memory` | Long-term memory storage, BM25 retrieval, context packets |
| `antaris-router` | Smart model selection, provider routing, fallback chains |
| `antaris-guard` | Input/output safety scanning, threat classification |
| `antaris-context` | Token budgeting, context compression, adaptive optimization |

---

## 🗂️ Table of Contents

1. [Quick Start](#-quick-start)
2. [Core Exports](#-core-exports)
3. [create_pipeline() Factory](#-create_pipeline-factory)
4. [Pipeline Profile Shortcuts](#-pipeline-profile-shortcuts)
5. [ProfileType Presets](#-profiletype-presets)
6. [PipelineConfig & Sub-Configs](#-pipelineconfig--sub-configs)
7. [AntarisPipeline — Main Orchestrator](#-antarispipeline--main-orchestrator)
8. [The 8-Phase Processing Pipeline](#-the-8-phase-processing-pipeline)
9. [PipelineResult](#-pipelineresult)
10. [Dry Run Mode](#-dry-run-mode)
11. [Memory Retrieval](#-memory-retrieval)
12. [Session Lifecycle Hooks](#-session-lifecycle-hooks)
13. [Custom Token Estimator](#-custom-token-estimator)
14. [Performance & Intelligence](#-performance--intelligence)
15. [AgentPipeline — Agent Lifecycle Wrapper](#-agentpipeline--agent-lifecycle-wrapper)
16. [Hook System](#-hook-system)
17. [Telemetrics & Events](#-telemetrics--events)
18. [Cross-Package Intelligence Flows](#-cross-package-intelligence-flows)
19. [Complete Integration Example](#-complete-integration-example)

---

## ⚡ Quick Start

```python
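from antaris_pipeline import create_pipeline


def call_your_llm(prompt: str) -> str:
    # Placeholder: swap in a real call to your LLM provider's SDK
    return "Paris"
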

# Create a balanced pipeline (memory + routing + guard + context)
pipeline = create_pipeline(
    storage_path="./memory_store",
    session_id="my-session",
    agent_name="MyBot",
)

# Process a user message end-to-end
result = pipeline.process(
    input_text="What's the capital of France?",
    model_caller=lambda text: call_your_llm(text),
)

if result.success:
    print(result.output)
else:
    print(f"Error: {result.error}")
```

---

## 📋 Core Exports

All public symbols are importable directly from `antaris_pipeline`:

```python
from antaris_pipeline import (
    # Core orchestrators
    Pipeline,
    AntarisPipeline,
    PipelineResult,
    create_pipeline,

    # Agent lifecycle wrapper
    AgentPipeline,
    PreTurnResult,
    PostTurnResult,

    # Configuration
    PipelineConfig,
    MemoryConfig,
    RouterConfig,
    GuardConfig,
    ContextConfig,
    TelemetricsConfig,

    # Profile system
    ProfileType,
    create_config,
    PROFILE_PRESETS,

    # Events & telemetrics
    AntarisEvent,
    EventType,
    ConfidenceBasis,
    PerformanceMetrics,
    EventEmitter,
    TelemetricsCollector,
    TelemetricsServer,
    PipelineTelemetry,

    # Hook system
    HookPhase,
    HookContext,
    HookResult,
    HookRegistry,
    PipelineHooks,
    HookCallback,

    # Profile shortcut factories
    balanced_pipeline,
    strict_pipeline,
    cost_optimized_pipeline,
    performance_pipeline,
    debug_pipeline,

    # Event helpers
    memory_event,
    router_event,
    guard_event,
    context_event,
)
```

---

## 🏭 `create_pipeline()` Factory

The recommended entry point for creating a fully-wired pipeline instance.

### Signature

```python
def create_pipeline(
    storage_path: str,
    memory_config: dict | None = None,
    router_config: dict | None = None,
    guard_config: dict | None = None,
    context_config: dict | None = None,
    pipeline_config: PipelineConfig | None = None,
    session_id: str | None = None,
    agent_name: str | None = None,
) -> AntarisPipeline:
```

### Parameters

| Parameter | Type | Default | Description |
|---|---|---|---|
| `storage_path` | `str` | — | Path to the memory store directory. Created if it doesn't exist. |
| `memory_config` | `dict \| None` | `None` | Memory subsystem overrides (e.g. `{"half_life": 7.0}`). |
| `router_config` | `dict \| None` | `None` | Router subsystem overrides. |
| `guard_config` | `dict \| None` | `None` | Guard subsystem overrides (e.g. `{"sensitivity": "balanced"}`). |
| `context_config` | `dict \| None` | `None` | Context subsystem overrides (e.g. `{"total_budget": 8000}`). |
| `pipeline_config` | `PipelineConfig \| None` | `None` | Full `PipelineConfig` object. When provided, individual dicts are ignored. |
| `session_id` | `str \| None` | `None` | Session identifier. Auto-generated UUID if not provided. |
| `agent_name` | `str \| None` | `None` | Name of the agent. Used for memory filtering and context restoration. |

### Example

```python
from antaris_pipeline import create_pipeline

pipeline = create_pipeline(
    storage_path="./memory_store",
    memory_config={"half_life": 7.0},
    router_config={},
    guard_config={"sensitivity": "balanced"},
    context_config={"total_budget": 8000, "template": "agent_with_tools"},
    pipeline_config=None,
    session_id="my-session",
    agent_name="MyBot",
)
```

---

## 🚀 Pipeline Profile Shortcuts

Five pre-configured factory functions for common deployment scenarios. Each wraps `create_pipeline()` with tuned defaults.

```python
from antaris_pipeline import (
    balanced_pipeline,
    strict_pipeline,
    cost_optimized_pipeline,
    performance_pipeline,
    debug_pipeline,
)
```

### `balanced_pipeline(storage_path, **kwargs) -> AntarisPipeline`

General-purpose profile. Moderate safety, standard routing, compression enabled.

```python
pipeline = balanced_pipeline("./store", agent_name="MyBot")
```

### `strict_pipeline(storage_path, **kwargs) -> AntarisPipeline`

High-safety profile. Strict guard sensitivity, conservative routing, output scanning enforced. Ideal for customer-facing or regulated environments.

```python
pipeline = strict_pipeline("./store")
```

### `cost_optimized_pipeline(storage_path, **kwargs) -> AntarisPipeline`

Minimizes cost. Prefers cheaper models, aggressive compression, reduced memory retrieval limits.

```python
pipeline = cost_optimized_pipeline("./store")
```

### `performance_pipeline(storage_path, **kwargs) -> AntarisPipeline`

Optimized for low latency. Faster models preferred, reduced guard overhead, adaptive budgeting enabled.

```python
pipeline = performance_pipeline("./store")
```

### `debug_pipeline(storage_path, **kwargs) -> AntarisPipeline`

Full telemetrics, verbose logging, all hooks active. Use during development to inspect every phase.

```python
pipeline = debug_pipeline("./store")
# Emits complete JSONL trace to ./telemetrics/
```

### Comparison Table

| Profile | Safety | Cost | Speed | Telemetrics |
|---|---|---|---|---|
| `balanced` | Medium | Medium | Medium | Standard |
| `strict` | High | Higher | Slower | Standard |
| `cost_optimized` | Medium | Low | Medium | Minimal |
| `performance` | Medium | Medium | Fast | Standard |
| `debug` | Medium | Medium | Medium | Full |

---

## 🎛️ ProfileType Presets

`ProfileType` is an enum. Use `create_config()` to generate a `PipelineConfig` from a named preset. This is useful when you want to start from a preset and customize further.

```python
from antaris_pipeline import ProfileType, create_config

config = create_config(ProfileType.BALANCED)
config = create_config(ProfileType.STRICT_SAFETY)
config = create_config(ProfileType.COST_OPTIMIZED)
config = create_config(ProfileType.PERFORMANCE)
config = create_config(ProfileType.DEBUG)
```

### Available ProfileType Values

| Enum Value | Equivalent Shortcut |
|---|---|
| `ProfileType.BALANCED` | `balanced_pipeline()` |
| `ProfileType.STRICT_SAFETY` | `strict_pipeline()` |
| `ProfileType.COST_OPTIMIZED` | `cost_optimized_pipeline()` |
| `ProfileType.PERFORMANCE` | `performance_pipeline()` |
| `ProfileType.DEBUG` | `debug_pipeline()` |

### PROFILE_PRESETS

`PROFILE_PRESETS` is a dict mapping each `ProfileType` to its default `PipelineConfig`:

```python
from antaris_pipeline import PROFILE_PRESETS, ProfileType

config = PROFILE_PRESETS[ProfileType.BALANCED]
```

---

## ⚙️ PipelineConfig & Sub-Configs

`PipelineConfig` is the central configuration object. Pass it to `create_pipeline()` or use it directly when constructing `AntarisPipeline`.

### Full Config Example

```python
from antaris_pipeline import (
    PipelineConfig,
    MemoryConfig,
    RouterConfig,
    GuardConfig,
    ContextConfig,
    TelemetricsConfig,
)

config = PipelineConfig(
    memory=MemoryConfig(
        decay_half_life_hours=168,
        search_timeout_ms=5000,
        context_packet_max_tokens=2000,
    ),
    router=RouterConfig(
        track_model_performance=True,
    ),
    guard=GuardConfig(
        enable_output_scanning=True,
        default_policy_strictness=0.5,   # 0.0–1.0
    ),
    context=ContextConfig(
        default_max_tokens=8000,
        enable_compression=True,
        enable_adaptive_budgeting=False,
        model_context_limits={
            "claude-opus-4-6": 200000,
            "gpt-4o": 128000,
        },
    ),
    telemetrics=TelemetricsConfig(
        output_directory="./telemetrics",
        buffer_size=100,
        enable_telemetrics=True,
    ),
)
```

---

### `MemoryConfig`

Controls memory storage, retrieval, and decay behavior.

| Field | Type | Default | Description |
|---|---|---|---|
| `decay_half_life_hours` | `float` | `168` | Half-life of memory decay: a memory's relevance weight halves every `decay_half_life_hours` hours. 168 h = 1 week. |
| `search_timeout_ms` | `int` | `5000` | BM25 search timeout in milliseconds. |
| `context_packet_max_tokens` | `int` | `2000` | Max tokens for the memory context packet injected into prompts. |

```python
memory_cfg = MemoryConfig(
    decay_half_life_hours=72,        # 3-day decay
    search_timeout_ms=3000,
    context_packet_max_tokens=1500,
)
```
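
The half-life wording implies exponential decay. Assuming the standard form, in which the weight halves every `decay_half_life_hours` (an assumption; `antaris-memory` may apply the weight differently), the effect of a setting can be checked by hand with this hypothetical helper:

```python
def decay_weight(age_hours: float, half_life_hours: float = 168.0) -> float:
    """Relevance multiplier for a memory of a given age under exponential half-life decay.

    Hypothetical helper for illustration; not part of antaris_pipeline.
    """
    if half_life_hours <= 0:
        raise ValueError("half_life_hours must be positive")
    return 0.5 ** (age_hours / half_life_hours)


# Compare the default 1-week half-life with the 3-day setting above
for age in (24, 72, 168, 336):
    print(f"{age:>4}h  1-week: {decay_weight(age):.3f}  3-day: {decay_weight(age, 72):.3f}")
```

With a 3-day half-life, a week-old memory keeps well under a quarter of its original weight, so recent context dominates retrieval much sooner.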

---

### `RouterConfig`

Controls model selection, provider preferences, and performance tracking.

| Field | Type | Default | Description |
|---|---|---|---|
| `track_model_performance` | `bool` | `True` | Record per-model latency and success rates. Feeds into cross-package intelligence. |

```python
router_cfg = RouterConfig(
    track_model_performance=True,
)
```

---

### `GuardConfig`

Controls safety scanning for both input and output.

| Field | Type | Default | Description |
|---|---|---|---|
| `enable_output_scanning` | `bool` | `True` | Whether to scan the model's response (Phase 7). |
| `default_policy_strictness` | `float` | `0.5` | Safety threshold. `0.0` = PERMISSIVE, `0.5` = BALANCED, `1.0` = STRICT. |

```python
guard_cfg = GuardConfig(
    enable_output_scanning=True,
    default_policy_strictness=0.8,   # Lean toward strict
)
```

**Strictness mapping:**

| Range | Mode |
|---|---|
| `0.0 – 0.33` | PERMISSIVE |
| `0.34 – 0.66` | BALANCED |
| `0.67 – 1.0` | STRICT |
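
The table leaves the exact boundaries between ranges (for example `0.335`) unspecified. The sketch below is a hypothetical helper that reproduces the table for values with at most two decimals, assuming the value is rounded to two places first; it is not a function exported by `antaris_pipeline`, and the library's own cut-offs may differ.

```python
def strictness_mode(strictness: float) -> str:
    """Map a default_policy_strictness value to the mode names in the table above.

    Hypothetical helper: rounds to two decimals so every value lands in a documented range.
    """
    if not 0.0 <= strictness <= 1.0:
        raise ValueError("strictness must be between 0.0 and 1.0")
    value = round(strictness, 2)
    if value <= 0.33:
        return "PERMISSIVE"
    if value <= 0.66:
        return "BALANCED"
    return "STRICT"


print(strictness_mode(0.8))  # STRICT, matching the GuardConfig example above
```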

---

### `ContextConfig`

Controls token budget management, compression, and model-specific context limits.

| Field | Type | Default | Description |
|---|---|---|---|
| `default_max_tokens` | `int` | `8000` | Default token budget for context assembly. |
| `enable_compression` | `bool` | `True` | Enable context compression when approaching budget limits. |
| `enable_adaptive_budgeting` | `bool` | `False` | Dynamically adjust token budgets based on request complexity. |
| `model_context_limits` | `dict[str, int]` | `{}` | Per-model context window overrides. Used by Router → Context intelligence flow. |

```python
context_cfg = ContextConfig(
    default_max_tokens=12000,
    enable_compression=True,
    enable_adaptive_budgeting=True,
    model_context_limits={
        "claude-opus-4-6": 200000,
        "gpt-4o": 128000,
        "gpt-4o-mini": 128000,
    },
)
```

---

### `TelemetricsConfig`

Controls telemetry output location, buffering, and enable/disable.

| Field | Type | Default | Description |
|---|---|---|---|
| `output_directory` | `str` | `"./telemetrics"` | Directory for JSONL telemetry files. |
| `buffer_size` | `int` | `100` | Number of events to buffer before flushing to disk. |
| `enable_telemetrics` | `bool` | `True` | Master switch for telemetry collection. |

```python
telemetrics_cfg = TelemetricsConfig(
    output_directory="./logs/telemetrics",
    buffer_size=50,
    enable_telemetrics=True,
)
```
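
To illustrate what `buffer_size` controls, here is a hypothetical, minimal buffered JSONL writer: events accumulate in memory and are appended to disk once the buffer holds `buffer_size` events. It is a sketch of the general technique, not `TelemetricsCollector`; the file name `events.jsonl` is an assumption, and the real collector's naming and flush triggers may differ.

```python
import json
from pathlib import Path
from typing import Any, Dict, List


class BufferedJsonlWriter:
    """Buffer events in memory and append them to a JSONL file in batches (hypothetical sketch)."""

    def __init__(self, output_directory: str, buffer_size: int = 100,
                 filename: str = "events.jsonl") -> None:
        self.path = Path(output_directory) / filename
        self.path.parent.mkdir(parents=True, exist_ok=True)
        self.buffer_size = buffer_size
        self._buffer: List[Dict[str, Any]] = []

    def emit(self, event: Dict[str, Any]) -> None:
        """Queue one event; flush automatically when the buffer is full."""
        self._buffer.append(event)
        if len(self._buffer) >= self.buffer_size:
            self.flush()

    def flush(self) -> int:
        """Append buffered events, one JSON object per line; return how many were written."""
        if not self._buffer:
            return 0
        with self.path.open("a", encoding="utf-8") as fh:
            for event in self._buffer:
                fh.write(json.dumps(event) + "\n")
        written = len(self._buffer)
        self._buffer.clear()
        return written
```

A smaller buffer writes to disk more often (less data lost on a crash, more I/O); a larger one batches more writes. Call `flush()` on shutdown so a partially filled buffer is not lost.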

---

## 🧠 AntarisPipeline — Main Orchestrator

`AntarisPipeline` (also exported as `Pipeline`) is the central class that wires all sub-systems together and runs the 8-phase processing loop.

### Construction

Typically created via `create_pipeline()` or a profile shortcut. Direct construction:

```python
from antaris_pipeline import AntarisPipeline, PipelineConfig

pipeline = AntarisPipeline(
    storage_path="./memory_store",
    config=PipelineConfig(...),
    session_id="my-session",
    agent_name="MyBot",
)
```

### Public Methods

| Method | Description |
|---|---|
| `process(...)` | Run the full 8-phase pipeline. Returns `PipelineResult`. |
| `dry_run(input_text, context_data=None)` | Simulate all phases without calling the model or writing to memory. |
| `memory_retrieval(input_text, context_data=None, limit=10)` | Query memory directly, bypassing the full pipeline. |
| `flush_to_memory(reason="")` | Persist in-session state to memory (for compaction). |
| `get_compaction_summary()` | Get a markdown summary string of the current session state. |
| `on_session_start(summary="", agent_name=None)` | Restore context from memory at the start of a new session. |
| `set_token_estimator(fn)` | Replace the default token counting heuristic. |
| `get_performance_stats()` | Return session-level performance metrics. |
| `get_intelligence_summary()` | Return cross-package intelligence state. |

---

## 🔄 The 8-Phase Processing Pipeline

`pipeline.process()` runs the 8 phases in order for every request. If the Phase 1 input scan blocks the request, the remaining phases are skipped.

```python
result = pipeline.process(
    input_text="user message",
    model_caller=lambda text: call_llm(text),
    context_data={"extra": "context"},   # Optional dict merged into context
    dry_run=False,
)
```

### Parameters

| Parameter | Type | Required | Description |
|---|---|---|---|
| `input_text` | `str` | ✅ | The raw user input or prompt. |
| `model_caller` | `Callable[[str], str] \| None` | ✅ (unless `dry_run=True`) | A callable that accepts the assembled prompt and returns the model's response string. |
| `context_data` | `dict \| None` | ❌ | Additional key-value context merged into the context assembly step. |
| `dry_run` | `bool` | ❌ | If `True`, skips model execution and memory writes. Similar in effect to calling `dry_run()`, but the return value is still a `PipelineResult` rather than the simulation dict. |
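
Because `model_caller` is any `Callable[[str], str]`, a deterministic stand-in is convenient for unit tests and offline development. The class below is a hypothetical helper, not part of the package: it returns a canned reply and records every prompt it receives, so a test can inspect what the pipeline assembled.

```python
from typing import List


class RecordingModelCaller:
    """Fake model_caller: returns a canned reply and records each prompt it was given."""

    def __init__(self, reply: str = "stub response") -> None:
        self.reply = reply
        self.prompts: List[str] = []

    def __call__(self, prompt: str) -> str:
        self.prompts.append(prompt)
        return self.reply


# Usage with a pipeline created as in Quick Start:
# caller = RecordingModelCaller("Paris")
# result = pipeline.process(input_text="Capital of France?", model_caller=caller)
# print(caller.prompts[-1])  # the assembled prompt the model would have received
```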

---

### Phase 1 — Guard Input Scan

**What happens:** The raw `input_text` is scanned by `antaris-guard` for threats, policy violations, and risk patterns.

**If blocked:** `result.success = False`, `result.error` contains the block reason, and all subsequent phases are skipped.

**Key outputs stored in result:**
- `result.guard_decisions[0]` — the input guard decision dict
- Fields: `is_blocked`, `risk_score` (0.0–1.0), `threat_level`, `patterns_matched`

```python
# Guard decisions are always in result.guard_decisions
# Index 0 = input scan, Index 1 = output scan (Phase 7)
input_guard = result.guard_decisions[0]
print(input_guard["risk_score"])      # e.g. 0.12
print(input_guard["is_blocked"])      # False
print(input_guard["threat_level"])    # "LOW"
```

---

### Phase 2 — Memory Retrieval

**What happens:** BM25 search runs against the memory store using `input_text` as the query. Top results are assembled into a context packet (up to `context_packet_max_tokens`).

**Key outputs:**
- `result.memory_retrievals` — list of retrieved memory dicts
- Each memory: `{content, score, category, source, timestamp, ...}`

```python
print(f"Retrieved {len(result.memory_retrievals)} memories")
for mem in result.memory_retrievals:
    print(f"  [{mem['category']}] {mem['content'][:80]}")
```
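
For intuition about the relevance scores in Phase 2, here is a textbook Okapi BM25 scorer (k1 = 1.5, b = 0.75, lowercase whitespace tokenization). It is a generic sketch of the ranking function, not `antaris-memory`'s implementation, which may tokenize, weight, or normalize differently.

```python
import math
from collections import Counter
from typing import List


def bm25_scores(query: str, docs: List[str], k1: float = 1.5, b: float = 0.75) -> List[float]:
    """Score each document against the query with Okapi BM25."""
    tokenized = [doc.lower().split() for doc in docs]
    if not tokenized:
        return []
    n_docs = len(tokenized)
    avg_len = sum(len(toks) for toks in tokenized) / n_docs or 1.0
    # Document frequency: how many documents contain each term
    df = Counter(term for toks in tokenized for term in set(toks))

    scores = []
    for toks in tokenized:
        tf = Counter(toks)
        # Longer-than-average documents are penalized via the length normalization
        length_norm = 1 - b + b * len(toks) / avg_len
        score = 0.0
        for term in set(query.lower().split()):
            freq = tf.get(term, 0)
            if freq == 0:
                continue
            idf = math.log(1 + (n_docs - df[term] + 0.5) / (df[term] + 0.5))
            score += idf * freq * (k1 + 1) / (freq + k1 * length_norm)
        scores.append(score)
    return scores


memories = [
    "user prefers dark mode",
    "pricing strategy uses tiered plans",
    "security review scheduled for friday",
]
scores = bm25_scores("pricing strategy", memories)
best = max(range(len(memories)), key=scores.__getitem__)
print(memories[best])  # pricing strategy uses tiered plans
```

Documents that share no terms with the query score exactly zero, which is why exact wording in stored memories matters for BM25-style retrieval.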

---

### Phase 3 — Context Building

**What happens:** A lookahead to the Phase 4 router supplies a model selection hint, which sets the token budget ceiling. The memory context packet from Phase 2 is injected, any `context_data` is merged in, and the assembled context is optimized or compressed when it nears the budget limit.

**Key outputs:**
- `result.context_optimizations` — dict with compression ratio, token counts, optimization applied

```python
opts = result.context_optimizations
print(f"Compression ratio: {opts.get('compression_ratio')}")
print(f"Input tokens: {opts.get('input_tokens')}")
print(f"Budget used: {opts.get('budget_utilization')}")
```
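
The budget step can be pictured with a simplified sketch. It assumes that a model listed in `model_context_limits` uses that limit as its budget and any other model falls back to `default_max_tokens`; both that rule and the function itself are hypothetical, not the library's code. The returned keys mirror the context fields reported by `dry_run()`.

```python
from typing import Callable, Dict, Optional


def plan_context_budget(
    prompt: str,
    selected_model: str,
    model_context_limits: Dict[str, int],
    default_max_tokens: int = 8000,
    estimate_tokens: Optional[Callable[[str], int]] = None,
) -> Dict[str, object]:
    """Hypothetical sketch of the Phase 3 budget check."""
    # Default heuristic mirrors the documented estimator: about 4 characters per token
    estimate = estimate_tokens or (lambda text: len(text) // 4)
    # Assumption: a listed model limit replaces the default budget
    budget = model_context_limits.get(selected_model, default_max_tokens)
    tokens = estimate(prompt)
    return {
        "estimated_input_tokens": tokens,
        "total_budget": budget,
        "utilization": tokens / budget if budget else 1.0,
        "would_optimize": tokens > budget,
    }


limits = {"gpt-4o": 128000}
print(plan_context_budget("x" * 40000, "gpt-4o", limits)["would_optimize"])         # False
print(plan_context_budget("x" * 40000, "unlisted-model", limits)["would_optimize"])  # True
```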

---

### Phase 4 — Smart Routing

**What happens:** `antaris-router` selects the best model/provider based on: task characteristics, cost constraints, model context limits, and historical performance data fed back from memory.

**Key outputs:**
- `result.routing_decision` — full routing dict

```python
rd = result.routing_decision
print(f"Model: {rd['selected_model']}")
print(f"Provider: {rd['provider']}")
print(f"Tier: {rd['tier']}")
print(f"Confidence: {rd['confidence']}")
print(f"Est. cost: ${rd['estimated_cost']:.4f}")
print(f"Fallback chain: {rd['fallback_chain']}")
print(f"Reasoning: {rd['reasoning']}")
```
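
If your own provider code talks to several models, a fallback chain can drive retries. The helper below is a hypothetical sketch, not a package API: it tries each model in order, returns the first successful response, and re-raises the last error if every model fails. Whether `fallback_chain` already contains the selected model is not documented here, so the sketch skips duplicates.

```python
from typing import Callable, Iterable, List


def call_with_fallback(
    prompt: str,
    models: Iterable[str],
    call_model: Callable[[str, str], str],
) -> str:
    """Try each model in order; return the first successful response."""
    tried: List[str] = []
    last_error: Exception = RuntimeError("no models to try")
    for model in models:
        if model in tried:  # skip duplicates, e.g. the selected model repeated in the chain
            continue
        tried.append(model)
        try:
            return call_model(model, prompt)
        except Exception as exc:  # in real code, catch your SDK's specific error types
            last_error = exc
    raise last_error


# Example using a dry run's router prediction; my_provider_call is hypothetical:
# sim = pipeline.dry_run(user_text)
# models = [sim["router"]["would_select"], *sim["router"]["fallback_chain"]]
# reply = call_with_fallback(prompt, models, my_provider_call)
```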

---

### Phase 5 — Model Execution

**What happens:** The assembled context is passed to `model_caller`. The callable receives the optimized prompt string and must return the response string.

**In dry_run mode:** This phase is skipped. A simulated response is generated instead.

```python
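import anthropic

# The client reads ANTHROPIC_API_KEY from the environment
anthropic_client = anthropic.Anthropic()

# model_caller receives the assembled, optimized prompt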
result = pipeline.process(
    input_text="Summarize this document",
    model_caller=lambda prompt: anthropic_client.messages.create(
        model="claude-opus-4-6",
        max_tokens=1024,
        messages=[{"role": "user", "content": prompt}],
    ).content[0].text,
)
```

---

### Phase 6 — Memory Storage

**What happens:** The conversation turn (input + output) is stored to memory.

**Risk-based branching:**
- If the input had `risk_score > 0.7` (from Phase 1), the turn is stored as a **security fact** rather than a conversation entry. This prevents contamination of the conversation history with threat-related content.
- Normal turns are stored as conversation entries with full metadata.
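
The branching rule can be expressed as a small function. This is a hypothetical sketch that uses the documented `risk_score` field and the 0.7 threshold; the returned labels are illustrative and are not necessarily the category names `antaris-memory` stores.

```python
from typing import Any, Dict

SECURITY_RISK_THRESHOLD = 0.7  # documented Phase 6 threshold


def storage_kind(input_guard: Dict[str, Any]) -> str:
    """Decide how a turn would be stored, given the Phase 1 guard decision dict."""
    risk = float(input_guard.get("risk_score", 0.0))
    # Strictly greater than the threshold, matching "risk_score > 0.7"
    return "security_fact" if risk > SECURITY_RISK_THRESHOLD else "conversation"


# Usage: storage_kind(result.guard_decisions[0])
```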

---

### Phase 7 — Guard Output Scan

**What happens:** If `enable_output_scanning=True`, the model's response is scanned for policy violations or unsafe content.

**If blocked:** `result.success = False` even if the model call succeeded. The output scan result is appended to `result.guard_decisions`.

```python
# Index 1 in guard_decisions is the output scan
if len(result.guard_decisions) > 1:
    output_guard = result.guard_decisions[1]
    print(output_guard["is_blocked"])
```
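
The output scan may be absent, for example when `enable_output_scanning=False` or when the input was blocked and later phases were skipped, so code that indexes `guard_decisions[1]` directly should check the length first. A hypothetical helper that works on the documented dict shape:

```python
from typing import Any, Dict, List, Optional, Tuple


def guard_verdicts(decisions: List[Dict[str, Any]]) -> Tuple[Optional[bool], Optional[bool]]:
    """Return (input_blocked, output_blocked); None means that scan did not run."""
    input_blocked = decisions[0].get("is_blocked") if len(decisions) > 0 else None
    output_blocked = decisions[1].get("is_blocked") if len(decisions) > 1 else None
    return input_blocked, output_blocked


# Usage: input_blocked, output_blocked = guard_verdicts(result.guard_decisions)
```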

---

### Phase 8 — Cross-Package Intelligence Updates

**What happens:** All five cross-package intelligence flows are updated with data from this request (see [Cross-Package Intelligence Flows](#-cross-package-intelligence-flows)).

This includes:
- Routing feedback (latency + success) written to memory
- Context pressure counters updated
- Security pattern tracking updated in memory

---

## 📊 PipelineResult

`process()` always returns a `PipelineResult`. It never raises on pipeline-level failures — errors are captured in `result.error`.

### Fields

| Field | Type | Description |
|---|---|---|
| `success` | `bool` | `True` if the pipeline completed without blocking or error. |
| `output` | `str` | The model's response. Empty string if blocked or errored. |
| `error` | `str \| None` | Error or block reason. `None` on success. |
| `events` | `List[AntarisEvent]` | All telemetry events emitted during this request. |
| `performance` | `dict` | `{"total_latency_ms": float, "dry_run": bool}` |
| `memory_retrievals` | `List[dict]` | Memories retrieved in Phase 2. |
| `routing_decision` | `dict` | Full routing decision from Phase 4. |
| `guard_decisions` | `List[dict]` | Input guard (index 0) and optionally output guard (index 1). |
| `context_optimizations` | `dict` | Context assembly stats from Phase 3. |

### Methods

```python
# Serialize to dict (e.g. for logging or API responses)
result_dict = result.to_dict()
```

### Usage Example

```python
result = pipeline.process(
    input_text="Hello!",
    model_caller=my_llm_caller,
)

if not result.success:
    print(f"Blocked or error: {result.error}")
else:
    print(f"Response: {result.output}")
    print(f"Latency: {result.performance['total_latency_ms']:.1f}ms")
    print(f"Model used: {result.routing_decision['selected_model']}")
    print(f"Memories retrieved: {len(result.memory_retrievals)}")
```

---

## 🔍 Dry Run Mode

Dry run simulates all 8 phases without calling the model or writing anything to memory. Use it to preview routing decisions, guard behavior, and context utilization before committing to a live request.

### Method 1: `pipeline.dry_run()`

```python
sim = pipeline.dry_run(
    input_text="What would happen?",
    context_data={"topic": "security"},   # Optional
)
```

### Method 2: `pipeline.process()` with `dry_run=True`

```python
result = pipeline.process(
    input_text="What would happen?",
    model_caller=None,   # Not called in dry_run
    dry_run=True,
)
```

### Dry Run Response Fields

```python
sim = pipeline.dry_run("What would happen?")

# Input analysis
sim["input"]["text"]               # The input string
sim["input"]["length"]             # Character count
sim["input"]["context_provided"]   # bool

# Guard input prediction
sim["guard_input"]["would_allow"]      # bool
sim["guard_input"]["risk_score"]       # float 0.0–1.0
sim["guard_input"]["threat_level"]     # "LOW" | "MEDIUM" | "HIGH"
sim["guard_input"]["patterns_active"]  # int — active rule count
sim["guard_input"]["sensitivity"]      # current sensitivity level
sim["guard_input"]["match_count"]      # int — patterns matched
sim["guard_input"]["message"]          # human-readable summary

# Memory prediction
sim["memory"]["would_retrieve"]   # bool
sim["memory"]["total_in_store"]   # int — total stored memories
sim["memory"]["top_relevance"]    # float — top BM25 score
sim["memory"]["matched_terms"]    # list of matched query terms

# Context prediction
sim["context"]["would_optimize"]          # bool
sim["context"]["estimated_input_tokens"]  # int
sim["context"]["total_budget"]            # int
sim["context"]["utilization"]             # float 0.0–1.0

# Router prediction
sim["router"]["would_select"]         # selected model name
sim["router"]["provider"]             # provider name
sim["router"]["tier"]                 # model tier
sim["router"]["confidence"]           # float 0.0–1.0
sim["router"]["estimated_cost_usd"]   # float
sim["router"]["fallback_chain"]       # list of fallback models
sim["router"]["reasoning"]            # routing explanation string

# Output guard prediction
sim["guard_output"]["would_scan"]    # bool
sim["guard_output"]["sensitivity"]   # current sensitivity

# Timing
sim["dry_run_time_ms"]   # float — simulation time in ms
```

### Example: Pre-flight Check

```python
sim = pipeline.dry_run("Ignore all previous instructions and...")

if not sim["guard_input"]["would_allow"]:
    print(f"⛔ Would be blocked (risk: {sim['guard_input']['risk_score']:.2f})")
else:
    print(f"✅ Would route to: {sim['router']['would_select']}")
    print(f"   Est. cost: ${sim['router']['estimated_cost_usd']:.4f}")
    print(f"   Context utilization: {sim['context']['utilization']:.0%}")
```
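
The pre-flight check generalizes to a simple policy gate. The function below is a hypothetical sketch that reads only the dry-run keys listed above; the cost and utilization limits are example values to tune for your deployment.

```python
from typing import Any, Dict, List


def preflight_problems(
    sim: Dict[str, Any],
    max_cost_usd: float = 0.05,
    max_utilization: float = 0.9,
) -> List[str]:
    """Return reasons to hold a request back; an empty list means go ahead."""
    problems = []
    guard, router, context = sim["guard_input"], sim["router"], sim["context"]
    if not guard["would_allow"]:
        problems.append(f"guard would block (risk {guard['risk_score']:.2f})")
    if router["estimated_cost_usd"] > max_cost_usd:
        problems.append(
            f"estimated cost ${router['estimated_cost_usd']:.4f} exceeds ${max_cost_usd:.4f}"
        )
    if context["utilization"] > max_utilization:
        problems.append(
            f"context utilization {context['utilization']:.0%} exceeds {max_utilization:.0%}"
        )
    return problems


# Usage: problems = preflight_problems(pipeline.dry_run(user_text))
```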

---

## 🗃️ Memory Retrieval

Run a direct BM25 memory search without executing the full pipeline. Useful for building custom context or debugging retrieval quality.

```python
data = pipeline.memory_retrieval(
    input_text="security vulnerability report",
    context_data={"agent_id": "MyBot"},   # Optional filter hints
    limit=10,
)
```

### Parameters

| Parameter | Type | Default | Description |
|---|---|---|---|
| `input_text` | `str` | — | Query text for BM25 search. |
| `context_data` | `dict \| None` | `None` | Additional context passed to the memory engine (e.g. for filtering). |
| `limit` | `int` | `10` | Maximum number of results to return. |

### Response

```python
data["retrievals"]        # List[dict] — full memory objects
data["relevance_scores"]  # List[float] — BM25 scores per memory
data["context_packet"]    # str — assembled context string (ready for injection)
data["memory_count"]      # int — total memories in store
data["retrieved_count"]   # int — number actually returned
```

### Example

```python
data = pipeline.memory_retrieval("pricing strategy", limit=5)

print(f"Found {data['retrieved_count']} of {data['memory_count']} memories")
for mem, score in zip(data["retrievals"], data["relevance_scores"]):
    print(f"  [{score:.3f}] {mem['content'][:80]}")

# Use the context packet directly
my_prompt = f"{data['context_packet']}\n\nUser: What's our pricing strategy?"
```
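
If you assemble your own context from `data["retrievals"]` instead of using `context_packet`, a budget-aware packer keeps the prompt bounded. This is a hypothetical sketch: it takes memories in descending score order, estimates tokens with the same `len(text) // 4` heuristic the pipeline uses by default, and stops before exceeding the budget. The bullet format is an assumption, not the format of the built-in `context_packet`.

```python
from typing import Any, Callable, Dict, List


def pack_memories(
    memories: List[Dict[str, Any]],
    scores: List[float],
    max_tokens: int = 2000,
    estimate_tokens: Callable[[str], int] = lambda text: len(text) // 4,
) -> str:
    """Join the highest-scoring memories into one string without exceeding max_tokens."""
    order = sorted(range(len(memories)), key=lambda i: scores[i], reverse=True)
    lines: List[str] = []
    used = 0
    for i in order:
        mem = memories[i]
        line = f"- [{mem.get('category', 'general')}] {mem['content']}"
        cost = estimate_tokens(line)
        if used + cost > max_tokens:
            break  # stop at the first memory that would overflow, preserving rank order
        lines.append(line)
        used += cost
    return "\n".join(lines)


# Usage: packet = pack_memories(data["retrievals"], data["relevance_scores"], max_tokens=1500)
```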

---

## 🔄 Session Lifecycle Hooks

### `flush_to_memory(reason="") -> int`

Persists current in-session state to the memory store. Call this before compaction to ensure session intelligence is preserved across context resets.

```python
n = pipeline.flush_to_memory(reason="compaction")
print(f"Stored {n} memory entries")
```

**What gets stored:**
- Performance summary: per-model latency stats, success rates, request count
- Security pattern summary: any threat patterns observed this session

**Returns:** Number of memory entries written.

---

### `get_compaction_summary() -> str`

Returns a markdown-formatted summary of the current session state. Designed to be passed as `prependContext` during compaction events.

```python
summary = pipeline.get_compaction_summary()
# Returns a markdown string, e.g.:
# ## Session Summary
# - Requests: 42
# - Models used: claude-opus-4-6 (38), gpt-4o (4)
# - Avg latency: 1234ms
# - Security events: 2
```

**Typical compaction workflow:**

```python
# In your compaction hook:
def before_compaction_hook(ctx):
    n = pipeline.flush_to_memory(reason="compaction")
    print(f"Flushed {n} memory entries before compaction")
    summary = pipeline.get_compaction_summary()
    # Pass summary to your compaction system as prependContext
    return summary

# Or with the hook system:
from antaris_pipeline import HookPhase, HookContext, HookResult

def compaction_hook(ctx: HookContext) -> HookResult:
    pipeline.flush_to_memory(reason="compaction")
    return HookResult(continue_pipeline=True)

# `registry` is assumed to be an existing HookRegistry instance.
# Note: a BEFORE_MODEL_CALL hook likely runs on every request, so this flushes each turn.
registry.register(HookPhase.BEFORE_MODEL_CALL, compaction_hook)
```

---

### `on_session_start(summary="", agent_name=None) -> dict`

Restores relevant context from memory at the beginning of a new session. Call this once, immediately after creating the pipeline, before the first `process()` call.

```python
ctx = pipeline.on_session_start(
    summary="",           # Optional compaction summary from previous session
    agent_name="MyBot",   # Optional agent name for memory filtering
)
```

### Parameters

| Parameter | Type | Default | Description |
|---|---|---|---|
| `summary` | `str` | `""` | Compaction summary from the previous session (from `get_compaction_summary()`). |
| `agent_name` | `str \| None` | `None` | Agent name used to filter memory retrieval by `agent_id`. |

### Response

```python
ctx["prependContext"]   # str — markdown context string, ready to prepend to system prompt
```

### Restoration Strategy

1. **Primary:** `memory.recent()` — recency-first retrieval of the most recent memories
2. **Supplemental:** BM25 search filtered by `agent_id` to find agent-specific context
3. Top 10 memories, deduplicated, formatted with category and source metadata
4. Output format: `"## Restored Session Context\n..."` — ready to prepend to your system prompt
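
The restoration steps can be sketched with plain data. This hypothetical function merges a recency-ordered list with search results, drops duplicates by content, keeps the first 10, and renders them under the documented `## Restored Session Context` heading. The dict keys (`content`, `category`, `source`) follow the memory fields shown in Phase 2; the per-line rendering is an assumption, not the exact output of `on_session_start()`.

```python
from typing import Any, Dict, List


def restore_context(
    recent: List[Dict[str, Any]],
    searched: List[Dict[str, Any]],
    limit: int = 10,
) -> str:
    """Merge recent-first and search results, dedupe by content, and format as markdown."""
    seen = set()
    picked: List[Dict[str, Any]] = []
    for mem in [*recent, *searched]:  # recency first, then agent-specific search hits
        key = mem["content"].strip()
        if key in seen:
            continue
        seen.add(key)
        picked.append(mem)
        if len(picked) == limit:
            break
    if not picked:
        return ""
    lines = ["## Restored Session Context"]
    for mem in picked:
        category = mem.get("category", "general")
        source = mem.get("source", "unknown")
        lines.append(f"- [{category}] {mem['content']} (source: {source})")
    return "\n".join(lines)
```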

### Full Session Start Example

```python
pipeline = create_pipeline("./memory_store", agent_name="MyBot")

# Restore previous context
ctx = pipeline.on_session_start(agent_name="MyBot")

system_prompt = f"""
{ctx['prependContext']}

You are MyBot, a helpful assistant.
"""

# Now start processing requests
# (user_message and llm_call are placeholders for your own input and LLM client)
result = pipeline.process(
    input_text=user_message,
    model_caller=lambda prompt: llm_call(system_prompt, prompt),
)
```

---

## 🔢 Custom Token Estimator

Replace the built-in token estimation heuristic with your own. This affects context budget calculations in Phase 3.

### Default Behavior

The default estimator is `len(text) // 4` (about four characters per token), a rough approximation suitable for most use cases.

### Setting a Custom Estimator

```python
pipeline.set_token_estimator(fn)  # fn: Callable[[str], int], maps text to a token count
```

### Example: tiktoken (OpenAI-compatible)

```python
import tiktoken

enc = tiktoken.get_encoding("cl100k_base")
pipeline.set_token_estimator(lambda text: len(enc.encode(text)))
```

### Example: Anthropic token counting

```python
import anthropic

client = anthropic.Anthropic()

def anthropic_token_count(text: str) -> int:
    response = client.messages.count_tokens(
        model="claude-opus-4-6",
        messages=[{"role": "user", "content": text}],
    )
    return response.input_tokens

pipeline.set_token_estimator(anthropic_token_count)
```
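
`count_tokens` makes a network request per call, and identical strings (such as a fixed system prompt) may be estimated repeatedly. Wrapping any estimator in `functools.lru_cache` avoids repeat calls for the same text. This is a general Python technique, not something the package is documented to do for you; whether it pays off depends on how often your prompts repeat.

```python
import functools
from typing import Callable


def cached_estimator(fn: Callable[[str], int], maxsize: int = 4096) -> Callable[[str], int]:
    """Return a memoized version of a token estimator, keyed on the exact text."""
    return functools.lru_cache(maxsize=maxsize)(fn)


# Usage with the Anthropic example above:
# pipeline.set_token_estimator(cached_estimator(anthropic_token_count))
```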

### Example: Simple word-based estimate

```python
# Roughly 1.33 tokens per word (about 0.75 words per token)
pipeline.set_token_estimator(lambda text: int(len(text.split()) * 1.33))
```

---

## 📈 Performance & Intelligence

### `get_performance_stats() -> dict`

Returns session-level performance metrics.

```python
stats = pipeline.get_performance_stats()
```

**Response fields:**

| Field | Description |
|---|---|
| `session_id` | Current session identifier |
| `total_requests` | Number of `process()` calls this session |
| `routing_feedback` | Per-model latency and success rate history |
| `security_patterns` | Observed threat pattern counters |
| `telemetrics_summary` | Summary from the telemetrics collector |

```python
stats = pipeline.get_performance_stats()
print(f"Session: {stats['session_id']}")
print(f"Total requests: {stats['total_requests']}")
for model, fb in stats["routing_feedback"].items():
    print(f"  {model}: {fb['success_rate']:.0%} success, {fb['avg_latency_ms']:.0f}ms avg")
```
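
To turn `routing_feedback` into a quick ranking, you can sort models by success rate and break ties on latency. This sketch assumes each entry carries the `success_rate` (0.0 to 1.0) and `avg_latency_ms` keys used in the example above; the `rank_models` name and the sample data are hypothetical.

```python
def rank_models(routing_feedback: dict) -> list:
    """Order model names best-first: highest success rate, then lowest latency."""
    return sorted(
        routing_feedback,
        key=lambda model: (
            -routing_feedback[model]["success_rate"],
            routing_feedback[model]["avg_latency_ms"],
        ),
    )


# Hypothetical data shaped like stats["routing_feedback"]
feedback = {
    "model-a": {"success_rate": 0.98, "avg_latency_ms": 820.0},
    "model-b": {"success_rate": 0.98, "avg_latency_ms": 410.0},
    "model-c": {"success_rate": 0.91, "avg_latency_ms": 150.0},
}
print(rank_models(feedback))  # ['model-b', 'model-a', 'model-c']
```

Against a live pipeline this would be `rank_models(pipeline.get_performance_stats()["routing_feedback"])`.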

---

### `get_intelligence_summary() -> dict`

Returns the full cross-package intelligence state — a deep view into what the pipeline has learned this session.

```python
intel = pipeline.get_intelligence_summary()
```

**Response fields:**

| Field | Description |
|---|---|
| `routing_feedback` | Model performance history (latency + success rates) |
| `security_patterns` | Active threat pattern counters from guard |
| `context_pressure_events` | Count of heavy-compression events (DoS signals) |
| `provider_health` | Per-provider availability and error rates |
| `memory_stats` | Memory store size and retrieval performance |
| `guard_posture` | Current guard sensitivity and recent decisions |
| `last_routing_result` | Full routing decision from the most recent request |
| `current_trace_id` | Active trace ID for telemetry correlation |

```python
intel = pipeline.get_intelligence_summary()
print(f"Context pressure events: {intel['context_pressure_events']}")
print(f"Guard posture: {intel['guard_posture']}")
print(f"Last model: {intel['last_routing_result']['selected_model']}")
```

---

## 🤖 AgentPipeline — Agent Lifecycle Wrapper

`AgentPipeline` wraps `AntarisPipeline` with a simplified two-step API for conversational agent frameworks. Instead of one monolithic `process()` call, it splits each turn into `pre_turn()` and `post_turn()`, so you can run your own logic (tool calls, multi-step reasoning) between guard/retrieval and memory storage.

### Construction

```python
from antaris_pipeline import AgentPipeline

agent = AgentPipeline(
    pipeline=pipeline,       # An AntarisPipeline instance
    session_id="session-1",  # Session identifier
)
```

### Parameters

| Parameter | Type | Description |
|---|---|---|
| `pipeline` | `AntarisPipeline` | The underlying pipeline to wrap. |
| `session_id` | `str` | Session identifier. |

---

### `pre_turn(user_message, search_limit=10) -> PreTurnResult`

Runs Phase 1 (guard input scan) and Phase 2 (memory retrieval) without executing the model or storing anything. Call this at the start of each user turn to get safety clearance and relevant context.

```python
pre = agent.pre_turn(
    user_message="Tell me about the deployment last week",
    search_limit=10,
)
```

**Parameters:**

| Parameter | Type | Default | Description |
|---|---|---|---|
| `user_message` | `str` | — | The user's input text. |
| `search_limit` | `int` | `10` | Max memories to retrieve. |

**`PreTurnResult` fields:**

| Field | Type | Description |
|---|---|---|
| `context_packet` | `str` | Assembled memory context, ready to inject into the system prompt. |
| `memories_found` | `int` | Number of relevant memories retrieved. |
| `guard_result` | `dict` | Full guard decision dict (same as `result.guard_decisions[0]`). |
| `should_continue` | `bool` | `False` if the input was blocked. Check this before calling the model. |

```python
def handle_turn(user_message: str):
    pre = agent.pre_turn(user_message)

    if not pre.should_continue:
        print(f"Blocked: {pre.guard_result['threat_level']}")
        return None

    # Inject context into your agent's system prompt
    system_prompt = f"""
{pre.context_packet}
You are a helpful assistant.
"""

    # Run your agent logic (tool calls, multi-step, etc.);
    # my_agent_logic is a placeholder for your own code
    return my_agent_logic(system_prompt, user_message)
```

---

### `post_turn(user_message, agent_response, agent_id=None, session_id=None, channel_id=None) -> PostTurnResult`

Runs Phase 6 (memory storage) after the agent has generated a response. Call this at the end of each turn to persist the conversation.

```python
post = agent.post_turn(
    user_message="Tell me about the deployment",
    agent_response="Last week we deployed v4.9.0 with...",
    agent_id="MyBot",
    session_id="session-1",
    channel_id="discord-channel-123",
)
```

**Parameters:**

| Parameter | Type | Default | Description |
|---|---|---|---|
| `user_message` | `str` | — | The original user input. |
| `agent_response` | `str` | — | The agent's generated response. |
| `agent_id` | `str \| None` | `None` | Agent identifier for memory metadata. |
| `session_id` | `str \| None` | `None` | Session ID override. Uses `AgentPipeline.session_id` if not set. |
| `channel_id` | `str \| None` | `None` | Channel identifier for memory metadata. |

**`PostTurnResult` fields:**

| Field | Type | Description |
|---|---|---|
| `ingested` | `int` | Number of memory entries created. |
| `saved` | `bool` | `True` if memory storage succeeded. |

---

### Full AgentPipeline Example

```python
from antaris_pipeline import create_pipeline, AgentPipeline

pipeline = create_pipeline("./store", agent_name="MyBot")
agent = AgentPipeline(pipeline=pipeline, session_id="session-abc")

async def handle_message(user_message: str) -> str:
    # Step 1: Pre-turn (guard + memory)
    pre = agent.pre_turn(user_message, search_limit=10)

    if not pre.should_continue:
        return "I can't help with that."

    # Step 2: Build system prompt with retrieved context
    system = f"""
{pre.context_packet}
You are MyBot. Be concise and helpful.
    """.strip()

    # Step 3: Call your LLM (with tool calls, chain-of-thought, etc.)
    response = await my_llm(system, user_message)

    # Step 4: Post-turn (store to memory)
    post = agent.post_turn(
        user_message=user_message,
        agent_response=response,
        agent_id="MyBot",
    )

    return response
```
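
Since `pre_turn()` and `post_turn()` are the only pipeline calls in this flow, the handler can be unit-tested without a memory store by injecting test doubles. In this sketch `FakePreTurn`, `FakePostTurn` and `FakeAgent` are hypothetical stand-ins that mirror only the documented `PreTurnResult` and `PostTurnResult` fields, and the handler is a variant of the one above that takes the agent and LLM as arguments.

```python
import asyncio
from dataclasses import dataclass, field


@dataclass
class FakePreTurn:
    # Mirrors the documented PreTurnResult fields
    context_packet: str = ""
    memories_found: int = 0
    guard_result: dict = field(default_factory=dict)
    should_continue: bool = True


@dataclass
class FakePostTurn:
    # Mirrors the documented PostTurnResult fields
    ingested: int = 1
    saved: bool = True


class FakeAgent:
    """Test double for AgentPipeline that records stored turns."""

    def __init__(self, blocked: bool = False):
        self.blocked = blocked
        self.stored = []

    def pre_turn(self, user_message, search_limit=10):
        return FakePreTurn(should_continue=not self.blocked)

    def post_turn(self, user_message, agent_response, agent_id=None,
                  session_id=None, channel_id=None):
        self.stored.append((user_message, agent_response))
        return FakePostTurn()


async def handle_message(agent, llm, user_message: str) -> str:
    """Same pre-turn / LLM / post-turn flow, with the agent and LLM injected."""
    pre = agent.pre_turn(user_message, search_limit=10)
    if not pre.should_continue:
        return "I can't help with that."
    system = f"{pre.context_packet}\nYou are MyBot. Be concise and helpful.".strip()
    response = await llm(system, user_message)
    agent.post_turn(user_message=user_message, agent_response=response, agent_id="MyBot")
    return response


async def echo_llm(system: str, message: str) -> str:
    return f"echo: {message}"


ok_agent = FakeAgent()
print(asyncio.run(handle_message(ok_agent, echo_llm, "hi")))  # echo: hi
print(ok_agent.stored)  # [('hi', 'echo: hi')]
```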

---

## 🪝 Hook System

The hook system lets you inject custom logic at any phase of the pipeline. Hooks can observe, modify, or short-circuit processing.

### Concepts

| Class | Description |
|---|---|
| `HookPhase` | Enum of all available injection points |
| `HookContext` | Input to your hook function — contains current pipeline state |
| `HookResult` | Output from your hook — controls pipeline continuation |
| `HookRegistry` | Registry that maps phases to hook functions |
| `HookCallback` | Type alias: `Callable[[HookContext], HookResult]` |
| `PipelineHooks` | Higher-level hook management (attached to `AntarisPipeline`) |

---

### Available Hook Phases

```python
from antaris_pipeline import HookPhase

# Input safety
HookPhase.BEFORE_GUARD_INPUT
HookPhase.AFTER_GUARD_INPUT

# Memory
HookPhase.BEFORE_MEMORY_RETRIEVE
HookPhase.AFTER_MEMORY_RETRIEVE

# Context assembly
HookPhase.BEFORE_CONTEXT_BUILD
HookPhase.AFTER_CONTEXT_BUILD

# Routing
HookPhase.BEFORE_ROUTE
HookPhase.AFTER_ROUTE

# Model execution
HookPhase.BEFORE_MODEL_CALL
HookPhase.AFTER_MODEL_CALL

# Memory storage
HookPhase.BEFORE_MEMORY_STORE
HookPhase.AFTER_MEMORY_STORE

# Output safety
HookPhase.BEFORE_GUARD_OUTPUT
HookPhase.AFTER_GUARD_OUTPUT
```

---

### Writing a Hook

```python
from antaris_pipeline import HookContext, HookResult

def my_hook(ctx: HookContext) -> HookResult:
    # Available on ctx:
    # ctx.phase          — HookPhase enum value
    # ctx.input_text     — original user input
    # ctx.result_so_far  — partial PipelineResult accumulated so far
    # ctx.metadata       — dict of phase-specific data

    text = ctx.input_text

    # Short-circuit the pipeline (here: refuse empty input)
    if not text.strip():
        return HookResult(continue_pipeline=False, modified_input=None)

    # Modify the input for the next phase (here: trim surrounding whitespace)
    if text != text.strip():
        return HookResult(continue_pipeline=True, modified_input=text.strip())

    # Observe and continue (most common)
    return HookResult(continue_pipeline=True, modified_input=None)
```

**`HookResult` fields:**

| Field | Type | Description |
|---|---|---|
| `continue_pipeline` | `bool` | If `False`, stops processing and returns current state. |
| `modified_input` | `str \| None` | If set, replaces `input_text` for subsequent phases. |
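
To make these two fields concrete, here is a self-contained model of how a chain of hooks for one phase could be applied: each hook sees the latest input, a `modified_input` replaces it for later hooks, and `continue_pipeline=False` stops the chain. `Ctx`, `Result` and `run_hooks` are hypothetical stand-ins, not the package's classes, and running hooks in registration order is an assumption.

```python
from dataclasses import dataclass
from typing import Callable, List, Optional, Tuple


@dataclass
class Result:
    """Stand-in with the same two fields as HookResult."""
    continue_pipeline: bool = True
    modified_input: Optional[str] = None


@dataclass
class Ctx:
    """Stand-in carrying only the input text."""
    input_text: str


def run_hooks(hooks: List[Callable[[Ctx], Result]], text: str) -> Tuple[bool, str]:
    """Apply hooks in order; return (should_continue, final_text)."""
    for hook in hooks:
        result = hook(Ctx(input_text=text))
        if result.modified_input is not None:
            text = result.modified_input  # later hooks and phases see the new text
        if not result.continue_pipeline:
            return False, text  # short-circuit: remaining hooks never run
    return True, text


def strip_whitespace(ctx: Ctx) -> Result:
    return Result(modified_input=ctx.input_text.strip())


def block_empty(ctx: Ctx) -> Result:
    return Result(continue_pipeline=bool(ctx.input_text))


print(run_hooks([strip_whitespace, block_empty], "  hello  "))  # (True, 'hello')
print(run_hooks([strip_whitespace, block_empty], "   "))        # (False, '')
```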

---

### Registering Hooks

```python
from antaris_pipeline import HookRegistry, HookPhase

registry = HookRegistry()
registry.register(HookPhase.BEFORE_MODEL_CALL, my_hook)
registry.register(HookPhase.AFTER_MEMORY_RETRIEVE, another_hook)
```

---

### Hook Examples

**Logging hook — observe every phase:**

```python
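from antaris_pipeline import HookContext, HookPhase, HookResult

def logging_hook(ctx: HookContext) -> HookResult:
    print(f"[{ctx.phase.name}] input_len={len(ctx.input_text)}")
    return HookResult(continue_pipeline=True)

# HookPhase is an Enum, so iterating it registers the hook on every phase
for phase in HookPhase:
    registry.register(phase, logging_hook)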
```
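
The hook above only logs. For actual per-phase timings, each `BEFORE_*` phase can be paired with its matching `AFTER_*` phase by name. `PhaseTimer` is a hypothetical helper kept free of package imports so it runs on its own; it derives keys such as `"MODEL_CALL"` from `"BEFORE_MODEL_CALL"`, and it assumes one request is in flight at a time.

```python
import time
from typing import Dict, Optional


class PhaseTimer:
    """Pair BEFORE_X / AFTER_X phase names and record elapsed milliseconds for X."""

    def __init__(self) -> None:
        self._started: Dict[str, float] = {}
        self.durations_ms: Dict[str, float] = {}

    def mark(self, phase_name: str) -> Optional[float]:
        # "BEFORE_MODEL_CALL" -> kind "BEFORE", stage "MODEL_CALL"
        kind, _, stage = phase_name.partition("_")
        if kind == "BEFORE":
            self._started[stage] = time.perf_counter()
        elif kind == "AFTER" and stage in self._started:
            elapsed = (time.perf_counter() - self._started.pop(stage)) * 1000
            self.durations_ms[stage] = elapsed
            return elapsed
        return None


timer = PhaseTimer()
timer.mark("BEFORE_MODEL_CALL")
time.sleep(0.01)  # stand-in for the model call
timer.mark("AFTER_MODEL_CALL")
print(f"MODEL_CALL took {timer.durations_ms['MODEL_CALL']:.1f}ms")
```

In a real hook you would call `timer.mark(ctx.phase.name)` and return `HookResult(continue_pipeline=True)`, registering it on every phase as in the loop above.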

**Input sanitization hook:**

```python
import re

def sanitize_hook(ctx: HookContext) -> HookResult:
    cleaned = re.sub(r"<[^>]+>", "", ctx.input_text)  # Strip HTML
    return HookResult(continue_pipeline=True, modified_input=cleaned)

registry.register(HookPhase.BEFORE_GUARD_INPUT, sanitize_hook)
```

**Compaction hook — flush before model call:**

```python
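def compaction_hook(ctx: HookContext) -> HookResult:
    count = ctx.metadata.get("request_count", 0)
    # Skip count 0 (also the default when the key is missing), since 0 % 50 == 0
    if count and count % 50 == 0:
        pipeline.flush_to_memory(reason="periodic_compaction")
    return HookResult(continue_pipeline=True)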

registry.register(HookPhase.BEFORE_MODEL_CALL, compaction_hook)
```

**Circuit-breaker hook — block on high context pressure:**

```python
def pressure_guard_hook(ctx: HookContext) -> HookResult:
    intel = pipeline.get_intelligence_summary()
    if intel["context_pressure_events"] > 10:
        # Too many compression events — possible DoS
        return HookResult(continue_pipeline=False)
    return HookResult(continue_pipeline=True)

registry.register(HookPhase.BEFORE_CONTEXT_BUILD, pressure_guard_hook)
```
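
If `context_pressure_events` only ever grows during a session (an assumption; the document describes it as a count), a fixed `> 10` threshold keeps blocking every request once it is crossed. One alternative is to trip only when the counter rises quickly. `PressureWindow` is a hypothetical helper that compares the newest reading with the oldest of the last `window` checks.

```python
from collections import deque


class PressureWindow:
    """Trip when a growing counter rises by more than `max_increase` within `window` checks."""

    def __init__(self, window: int = 20, max_increase: int = 5) -> None:
        self.readings = deque(maxlen=window)  # oldest readings fall off automatically
        self.max_increase = max_increase

    def check(self, counter_value: int) -> bool:
        self.readings.append(counter_value)
        return self.readings[-1] - self.readings[0] > self.max_increase


breaker = PressureWindow(window=3, max_increase=2)
print([breaker.check(v) for v in [0, 1, 2, 6, 6, 6, 6]])
# [False, False, False, True, True, False, False]
```

Inside the hook, the check would become `if breaker.check(pipeline.get_intelligence_summary()["context_pressure_events"]): return HookResult(continue_pipeline=False)`. The window is measured in checks, not seconds, and the breaker closes again once the counter stops climbing.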

---

## 📡 Telemetrics & Events

`antaris-pipeline` emits structured events at every phase. Events are written to JSONL files and can be consumed in real time.

### Output Files

```
./telemetrics/telemetrics_{session_id}.jsonl
```

Each line is a JSON object representing one event. Each session writes to its own file.
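
To consume events while a session is still running, one approach is to poll the JSONL file and parse only what was appended since the last read. This standard-library sketch uses a hypothetical `read_new_events` helper and assumes the writer appends whole lines, so a trailing half-written line is left for the next poll.

```python
import json
import tempfile
from pathlib import Path
from typing import List, Tuple


def read_new_events(path: Path, offset: int = 0) -> Tuple[List[dict], int]:
    """Parse events appended after byte `offset`; return them and the new offset."""
    if not path.exists():
        return [], offset
    with path.open("rb") as f:
        f.seek(offset)
        chunk = f.read()
    head, sep, _partial = chunk.rpartition(b"\n")
    if not sep:
        return [], offset  # no complete line yet; try again on the next poll
    events = [json.loads(line) for line in head.split(b"\n") if line.strip()]
    return events, offset + len(head) + len(sep)


# Demo: the second event is only half-written on the first poll
with tempfile.TemporaryDirectory() as tmp:
    log = Path(tmp) / "telemetrics_demo.jsonl"
    log.write_text('{"event_type": "PIPELINE_START"}\n{"event_type": "GUARD_')
    events, pos = read_new_events(log)
    print([e["event_type"] for e in events])  # ['PIPELINE_START']
    with log.open("a") as f:
        f.write('SCAN"}\n')
    events, pos = read_new_events(log, pos)
    print([e["event_type"] for e in events])  # ['GUARD_SCAN']
```

A polling loop would call it every second or so, passing back the returned offset each time.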

---

### Event Types

Events are emitted automatically. You don't need to instrument anything.

| EventType | When Emitted |
|---|---|
| `PIPELINE_START` | At the start of each `process()` call |
| `PIPELINE_COMPLETE` | When `process()` completes successfully |
| `PIPELINE_ERROR` | When an unhandled error occurs |
| `PIPELINE_DRY_RUN` | When `dry_run()` is called |
| `GUARD_SCAN` | Each time a guard scan runs |
| `GUARD_ALLOW` | When a guard scan allows the content |
| `GUARD_DENY` | When a guard scan blocks the content |
| `MEMORY_RETRIEVE` | After BM25 memory search |
| `MEMORY_INGEST` | After memory storage |
| `CONTEXT_BUILD` | After context assembly |
| `ROUTER_ROUTE` | After model selection |

---

### AntarisEvent Structure

```python
from antaris_pipeline import AntarisEvent, EventType

# Events on a PipelineResult returned by pipeline.process():
for event in result.events:
    print(f"[{event.event_type}] {event.timestamp} — {event.data}")
```

**`AntarisEvent` fields:**

| Field | Type | Description |
|---|---|---|
| `event_type` | `EventType` | The event category. |
| `timestamp` | `float` | Unix timestamp. |
| `session_id` | `str` | Session this event belongs to. |
| `trace_id` | `str` | Per-request trace ID for correlation. |
| `data` | `dict` | Event-specific payload. |

---

### Telemetrics Classes

| Class | Description |
|---|---|
| `TelemetricsCollector` | Buffers events and writes to JSONL. Configured via `TelemetricsConfig`. |
| `TelemetricsServer` | Optional HTTP server for real-time event streaming. |
| `PipelineTelemetry` | High-level interface used internally by `AntarisPipeline`. |
| `EventEmitter` | Base class for emitting typed events. |

---

### Event Helpers

Convenience functions for constructing events in hook callbacks or custom code:

```python
from antaris_pipeline import memory_event, router_event, guard_event, context_event

evt = memory_event(session_id="s1", retrieved=5, total=200)
evt = router_event(session_id="s1", model="claude-opus-4-6", cost=0.0042)
evt = guard_event(session_id="s1", blocked=False, risk_score=0.12)
evt = context_event(session_id="s1", tokens=4200, budget=8000)
```

---

### Parsing Telemetry Files

```python
import json
from pathlib import Path

session_id = "my-session"
log_file = Path(f"./telemetrics/telemetrics_{session_id}.jsonl")

# Skip any blank lines before parsing
events = [json.loads(line) for line in log_file.read_text().splitlines() if line.strip()]

# Filter to GUARD_DENY events
blocks = [e for e in events if e["event_type"] == "GUARD_DENY"]
print(f"Total blocks this session: {len(blocks)}")

# Calculate average latency (skipped if no request completed, to avoid dividing by zero)
pipeline_events = [e for e in events if e["event_type"] == "PIPELINE_COMPLETE"]
if pipeline_events:
    avg_latency = sum(e["data"]["total_latency_ms"] for e in pipeline_events) / len(pipeline_events)
    print(f"Avg latency: {avg_latency:.1f}ms")
```
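
Since every event carries a `trace_id`, the parsed events can be regrouped into per-request timelines, which also makes it easy to spot requests that never reached `PIPELINE_COMPLETE`. A sketch, assuming the JSONL records use the `AntarisEvent` field names from the table above; `group_by_trace` is a hypothetical helper and the sample records are made up.

```python
from collections import defaultdict
from typing import Dict, List


def group_by_trace(events: List[dict]) -> Dict[str, List[str]]:
    """Map each trace_id to its event types, in timestamp order."""
    traces: Dict[str, List[str]] = defaultdict(list)
    for event in sorted(events, key=lambda e: e["timestamp"]):
        traces[event["trace_id"]].append(event["event_type"])
    return dict(traces)


# Made-up records shaped like the telemetry lines
sample = [
    {"trace_id": "t1", "timestamp": 1.0, "event_type": "PIPELINE_START"},
    {"trace_id": "t2", "timestamp": 1.5, "event_type": "PIPELINE_START"},
    {"trace_id": "t1", "timestamp": 1.2, "event_type": "GUARD_SCAN"},
    {"trace_id": "t1", "timestamp": 2.0, "event_type": "PIPELINE_COMPLETE"},
]
timelines = group_by_trace(sample)
print(timelines)
# {'t1': ['PIPELINE_START', 'GUARD_SCAN', 'PIPELINE_COMPLETE'], 't2': ['PIPELINE_START']}

# Requests that started but never completed (errored, blocked or still running)
incomplete = [t for t, types in timelines.items() if "PIPELINE_COMPLETE" not in types]
print(incomplete)  # ['t2']
```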

---

### `PerformanceMetrics` and `ConfidenceBasis`

Additional telemetry types exposed for custom instrumentation:

```python
from antaris_pipeline import PerformanceMetrics, ConfidenceBasis

# PerformanceMetrics: attached to routing decisions
# Fields: latency_ms, tokens_used, cost_usd, success

# ConfidenceBasis: explains why the router chose a model
# Values: PERFORMANCE_HISTORY, COST_OPTIMIZATION, CAPABILITY_MATCH, FALLBACK, DEFAULT
```

---

## 🔗 Cross-Package Intelligence Flows

`antaris-pipeline` implements 5 automatic intelligence flows that wire the sub-packages together. These run in Phase 8 of every `process()` call and require no configuration.

---

### Flow 1 — Memory → Router

**What:** Model success rates and latency history (stored as `routing_feedback` memories) are retrieved and passed to the router as performance hints.

**Effect:** The router avoids models with poor recent performance and prefers those with proven reliability for the current task type.

```
Memory Store ──[routing_feedback memories]──► Router (Phase 4 model selection)
```

---

### Flow 2 — Router → Context

**What:** The selected model's context window size (from `model_context_limits` in `ContextConfig`) is used as the upper bound for the token budget.

**Effect:** Context assembly never tries to pack more tokens than the selected model can actually handle.

```
Router (selected_model) ──[context_limit]──► Context Builder (Phase 3 budget ceiling)
```
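
The ceiling in Flow 2 amounts to taking the smaller of the configured budget and the selected model's context window. A sketch of that rule; `effective_budget`, the example limits, and the behavior when a model is missing from `model_context_limits` are illustrative assumptions, not the package's internals.

```python
from typing import Dict, Optional


def effective_budget(
    total_budget: int,
    model: str,
    model_context_limits: Dict[str, int],
    default_limit: Optional[int] = None,
) -> int:
    """Cap the context token budget at the selected model's context window."""
    limit = model_context_limits.get(model, default_limit)
    return total_budget if limit is None else min(total_budget, limit)


limits = {"small-model": 4_096, "large-model": 200_000}  # hypothetical values
print(effective_budget(8_000, "small-model", limits))    # 4096
print(effective_budget(8_000, "large-model", limits))    # 8000
print(effective_budget(8_000, "unknown-model", limits))  # 8000 (no known limit)
```

A real budget may also need to leave room for the model's response; whether the context builder reserves output tokens is not covered here.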

---

### Flow 3 — Guard → Memory

**What:** When `risk_score > 0.7` on an input scan (Phase 1), the content is stored to memory as a **security fact** rather than a normal conversation entry.

**Effect:** High-risk inputs are flagged in memory for future reference without contaminating conversation history. The agent learns about threat patterns over time.

```
Guard (risk_score > 0.7) ──[security fact]──► Memory Store (Phase 6, alternate storage path)
```

---

### Flow 4 — Context → Guard

**What:** When compression ratio drops below 0.3 (i.e., the context had to be compressed to less than 30% of its original size), this is flagged as a potential DoS vector and the `context_pressure_events` counter is incremented.

**Effect:** Sustained context pressure (many high-compression events) is a signal of prompt stuffing or adversarial input. Visible via `get_intelligence_summary()["context_pressure_events"]`.

```
Context Builder (ratio < 0.3) ──[pressure signal]──► Guard/Intelligence (Phase 8 counter)
```
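
The Flow 4 check can be expressed as the ratio of tokens kept to tokens before compression. A sketch, assuming the ratio is computed on token counts (the document does not say whether tokens or characters are used); `compression_ratio` and `is_pressure_event` are hypothetical names.

```python
def compression_ratio(original_tokens: int, compressed_tokens: int) -> float:
    """Fraction of the original context that survived compression (1.0 = untouched)."""
    if original_tokens <= 0:
        return 1.0  # nothing to compress
    return compressed_tokens / original_tokens


def is_pressure_event(original_tokens: int, compressed_tokens: int, threshold: float = 0.3) -> bool:
    """Flag compression below 30% of the original size as a possible DoS signal."""
    return compression_ratio(original_tokens, compressed_tokens) < threshold


print(is_pressure_event(10_000, 8_000))  # False: ratio 0.8
print(is_pressure_event(40_000, 8_000))  # True: ratio 0.2
```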

---

### Flow 5 — Router → Memory

**What:** After each model call completes (Phase 5), the model name, latency, and success/failure are stored as a `routing_feedback` memory entry.

**Effect:** The pipeline builds up a performance history for each model over time, which feeds back into Flow 1 to improve future routing decisions.

```
Model Execution (Phase 5) ──[latency + success]──► Memory Store (routing_feedback)
                                                    ↑
                               (retrieved in Flow 1 next request)
```
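
The Flow 5 to Flow 1 loop boils down to folding each call's outcome into per-model statistics. A minimal sketch using a plain running mean; `ModelStats` and `FeedbackTracker` are hypothetical, and the package's actual aggregation (for example, whether older results decay) is not documented here.

```python
from dataclasses import dataclass
from typing import Dict


@dataclass
class ModelStats:
    calls: int = 0
    successes: int = 0
    total_latency_ms: float = 0.0

    @property
    def success_rate(self) -> float:
        return self.successes / self.calls if self.calls else 0.0

    @property
    def avg_latency_ms(self) -> float:
        return self.total_latency_ms / self.calls if self.calls else 0.0


class FeedbackTracker:
    """Record each model call's outcome (Flow 5) for later routing hints (Flow 1)."""

    def __init__(self) -> None:
        self.stats: Dict[str, ModelStats] = {}

    def record(self, model: str, latency_ms: float, success: bool) -> None:
        s = self.stats.setdefault(model, ModelStats())
        s.calls += 1
        s.successes += int(success)
        s.total_latency_ms += latency_ms


tracker = FeedbackTracker()
tracker.record("model-a", 400.0, True)
tracker.record("model-a", 600.0, False)
a = tracker.stats["model-a"]
print(f"{a.success_rate:.0%} success, {a.avg_latency_ms:.0f}ms avg")  # 50% success, 500ms avg
```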

---

### Intelligence Flow Summary

```
┌──────────────────────────────────────────────────────────────┐
│                  Cross-Package Intelligence                   │
│                                                              │
│  Memory ──Flow 1──► Router  (perf history → model select)    │
│  Router ──Flow 2──► Context (model limits → token budget)    │
│  Guard  ──Flow 3──► Memory  (high-risk → security facts)     │
│  Context──Flow 4──► Guard   (heavy compress → DoS signal)    │
│  Router ──Flow 5──► Memory  (latency+success → feedback)     │
└──────────────────────────────────────────────────────────────┘
```

All flows are wired at construction time. No configuration required.

---

## 🧩 Complete Integration Example

A full example showing `antaris-pipeline` integrated into a Discord bot:

```python
import asyncio

import anthropic
import tiktoken

from antaris_pipeline import AgentPipeline, create_pipeline

# ── Setup ────────────────────────────────────────────────────────────────────

client = anthropic.Anthropic()

pipeline = create_pipeline(
    storage_path="./memory_store",
    guard_config={"sensitivity": "balanced"},
    context_config={"total_budget": 8000},
    session_id="discord-bot-session",
    agent_name="MyBot",
)

# Swap in a real token counter
enc = tiktoken.get_encoding("cl100k_base")
pipeline.set_token_estimator(lambda text: len(enc.encode(text)))

agent = AgentPipeline(pipeline=pipeline, session_id="discord-bot-session")

# ── Session start ─────────────────────────────────────────────────────────────

ctx = pipeline.on_session_start(agent_name="MyBot")
BASE_SYSTEM = f"""
{ctx['prependContext']}

You are MyBot, a helpful assistant in a Discord server.
Be concise. Use markdown sparingly.
""".strip()


# ── Per-message handler ───────────────────────────────────────────────────────

async def handle_message(user_message: str) -> str:
    # Step 1: Pre-turn (guard input scan + memory retrieval)
    pre = agent.pre_turn(user_message, search_limit=8)

    if not pre.should_continue:
        risk = pre.guard_result.get("risk_score", 0)
        return f"⛔ I can't help with that. (risk: {risk:.2f})"

    # Step 2: Build context-aware system prompt
    system = BASE_SYSTEM
    if pre.context_packet:
        system = f"{pre.context_packet}\n\n{BASE_SYSTEM}"

    # Step 3: Call the LLM directly. Calling pipeline.process() here as well
    # would repeat the guard scan and retrieval from pre_turn() and store the
    # turn a second time alongside post_turn(). Note that pre_turn()/post_turn()
    # do not run the Phase 7 output scan.
    def llm_call() -> str:
        resp = client.messages.create(
            model="claude-opus-4-6",
            max_tokens=1024,
            system=system,
            messages=[{"role": "user", "content": user_message}],
        )
        return resp.content[0].text

    try:
        # Run the blocking SDK call in a worker thread to keep the event loop free
        response = await asyncio.to_thread(llm_call)
    except anthropic.APIError as exc:
        return f"Something went wrong: {exc}"

    # Step 4: Post-turn (store the exchange to memory)
    post = agent.post_turn(
        user_message=user_message,
        agent_response=response,
        agent_id="MyBot",
        channel_id="discord-main",
    )
    if not post.saved:
        print("Warning: this turn was not saved to memory")

    return response


# ── Periodic flush (call from your compaction/restart handler) ────────────────

def on_shutdown():
    n = pipeline.flush_to_memory(reason="shutdown")
    print(f"Flushed {n} memories before shutdown")
    summary = pipeline.get_compaction_summary()
    print(summary)


# ── Dry-run health check (call on startup) ────────────────────────────────────

def health_check():
    sim = pipeline.dry_run("Hello, how are you?")
    print("=== Pipeline Health Check ===")
    print(f"Guard: {'✅ allow' if sim['guard_input']['would_allow'] else '⛔ block'}")
    print(f"Memory: {sim['memory']['total_in_store']} entries in store")
    print(f"Router: → {sim['router']['would_select']} (${sim['router']['estimated_cost_usd']:.4f})")
    print(f"Context: {sim['context']['utilization']:.0%} budget utilization")
    print(f"Dry run time: {sim['dry_run_time_ms']:.1f}ms")
```

---

## 🗺️ Architecture Overview

```
                        antaris-pipeline
                        ┌───────────────────────────────────────────┐
  User Input            │                                           │
       │                │  Phase 1: Guard Input Scan                │
       ▼                │       ↓ (if blocked → return error)       │
  AntarisPipeline       │  Phase 2: Memory Retrieval (BM25)         │
  .process()            │       ↓                                   │
       │                │  Phase 3: Context Building + Optimization │
       │                │       ↓                                   │
       │                │  Phase 4: Smart Routing                   │
       │                │       ↓                                   │
       │                │  Phase 5: Model Execution                 │
       │                │       ↓                                   │
       │                │  Phase 6: Memory Storage                  │
       │                │       ↓                                   │
       │                │  Phase 7: Guard Output Scan               │
       │                │       ↓                                   │
       │                │  Phase 8: Intelligence Updates            │
       │                │                                           │
       ▼                └───────────────────────────────────────────┘
  PipelineResult
  (success, output, events, routing_decision, guard_decisions, ...)

  Sub-packages:
  ┌─────────────────┐  ┌──────────────────┐  ┌─────────────┐  ┌──────────────────┐
  │ antaris-memory  │  │  antaris-router  │  │antaris-guard│  │ antaris-context  │
  │ BM25 retrieval  │  │ Model selection  │  │ Safety scan │  │ Token budgeting  │
  │ Decay scoring   │  │ Provider routing │  │ Threat class│  │ Compression      │
  │ Fact storage    │  │ Fallback chains  │  │ Policy rules│  │ Adaptive budget  │
  └─────────────────┘  └──────────────────┘  └─────────────┘  └──────────────────┘
```

---

## 🔖 Changelog

### 4.9.20
- Cross-package intelligence flows (Sprint 3): all 5 flows wired automatically
- `on_session_start()` with recency-first + BM25 supplemental strategy
- `flush_to_memory()` and `get_compaction_summary()` for compaction support
- `AgentPipeline` with `PreTurnResult` / `PostTurnResult` typed returns
- Full hook system with 14 injection points (before/after pairs across 7 pipeline phases)
- `debug_pipeline()` shortcut with full telemetrics enabled
- `set_token_estimator()` for custom token counting
- `dry_run()` simulation with per-phase prediction fields
- JSONL telemetry output with session-scoped files

---

## 📄 License

Copyright © Antaris Analytics LLC. All rights reserved.
