Metadata-Version: 2.4
Name: llmops-observability
Version: 50.0.0
Summary: LLMOps Observability SDK: decorators + SQS dispatch with compression
Requires-Python: >=3.9
Description-Content-Type: text/markdown
Requires-Dist: python-dotenv
Requires-Dist: boto3
Requires-Dist: wrapt
Provides-Extra: http
Requires-Dist: httpx>=0.24.0; extra == "http"
Provides-Extra: test
Requires-Dist: pytest; extra == "test"
Requires-Dist: requests; extra == "test"
Requires-Dist: httpx>=0.24.0; extra == "test"

# LLMOps Observability SDK

A production-grade Python SDK for LLM observability. It streams events to AWS SQS to power a decoupled, scalable observability pipeline, and automatically captures traces, spans, token usage, costs, and metadata from your LLM applications.

## 🎯 Key Features

- ⚡ **SQS Event Streaming**: Batch events to AWS SQS with automatic spillover recovery
- 🎨 **Simple Decorators**: `@track_function`, `@track_llm_call`, `@track_llm_agent`, and `@track_llm_tool` for instant instrumentation
- 🔗 **Distributed Tracing**: `@track_external_service` (parent) and `@resume_external_service` (child) for cross-service trace correlation
- 🔄 **Sync & Async Support**: Works with both synchronous and asynchronous functions
- 🤖 **Provider Agnostic**: Compatible with any LLM provider (AWS Bedrock, OpenAI, Anthropic, etc.)
- 🪆 **Hierarchical Tracing**: Automatic parent-child span relationships with proper nesting, including multi-service spans
- 💰 **Cost Tracking**: Built-in token usage and cost calculation for AWS Bedrock models
- 🔍 **Smart Capture**: Optionally capture function locals and self for detailed debugging
- 📊 **Size Management**: Automatic truncation and compression with SQS payload limits (1 MB) to prevent data issues
- 🛡️ **Production-Ready**: Daemon workers, batch processing, clean shutdown handling
- 🌍 **Auto-Injection**: Environment and project_id automatically added to every span
- 🌐 **Multi-Service Merge**: Child segments from downstream services automatically merged with parent trace before fanout to observability platforms

## 📦 Installation

```bash
# From source (development)
cd llmops-observability_sdk
pip install -e .

# Or with uv
uv sync
```

## ⚙️ Configuration

### Environment Variables

Create a `.env` file in your application directory:

```bash
# Project Configuration (Required - Auto-injected into every span)
PROJECT_ID=my_project          # Your project identifier
ENV=uat                        # Environment: development, staging, uat, production

# AWS SQS Configuration (Required for trace streaming)
AWS_SQS_URL=https://sqs.us-east-1.amazonaws.com/123456789/my-queue
AWS_PROFILE=default            # AWS profile name
AWS_REGION=us-east-1           # AWS region

# Model Configuration (Optional)
MODEL_ID=anthropic.claude-3-5-sonnet-20241022-v2:0  # Default model for cost calculation
```

**Key Configuration Values:**
- **`PROJECT_ID`**: Unique identifier for your project. Auto-injected into every span's metadata.
- **`ENV`**: Environment name (development/staging/uat/production). Auto-injected into every span's metadata.
- **`AWS_SQS_URL`**: SQS queue URL for sending traces to Lambda processor.

> 💡 **Important**: `project_id` and `environment` are automatically injected into every span's metadata by the SDK. You don't need to add them to decorators manually.
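As a rough illustration of how these variables flow into span metadata, a configuration loader along these lines could look as follows. The helper name and defaults are hypothetical, not the SDK's actual internals:

```python
import os

def load_observability_config(env=os.environ):
    """Read the SDK-style settings (illustrative sketch, not the real loader)."""
    return {
        "project_id": env.get("PROJECT_ID", "default_project"),
        "environment": env.get("ENV", "development"),
        "sqs_url": env.get("AWS_SQS_URL"),           # required for trace streaming
        "aws_region": env.get("AWS_REGION", "us-east-1"),
    }

# Values resolved here would end up in every span's metadata
config = load_observability_config({"PROJECT_ID": "my_project", "ENV": "uat"})
```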

## 🚀 Quick Start

### 1. Basic Usage with Auto-Configuration
```python
from llmops_observability import (
    TraceManager, 
    track_function, 
    track_llm_call, 
    track_llm_agent, 
    track_llm_tool,
    track_external_service,
    resume_external_service,
    get_injected_headers,
)

# Start a trace - project_id and environment are auto-loaded from your configuration
TraceManager.start_trace(
    name="rag_pipeline_example",
    user_id="user_123",
    session_id="session_456",
    metadata={"version": "1.0.0"},
    tags=["example", "rag"]
)

# Track regular functions
@track_function()
def process_input(user_query: str):
    return {"query": user_query, "processed": True}

# Track LLM calls with automatic cost calculation
@track_llm_call(model="anthropic.claude-3-sonnet-20240229-v1:0")
def call_llm(prompt: str):
    response = bedrock_client.invoke_model(...)  # Your LLM call
    return response

# Track calls to downstream services (automatically injects trace context)
# Requires `import requests`; USER_SERVICE_URL is your downstream base URL
@track_external_service(service_name="user_service", transport="http")
def fetch_user_profile(user_id: str):
    headers = get_injected_headers()
    response = requests.get(f"{USER_SERVICE_URL}/profile/{user_id}", headers=headers)
    return response.json()

# Execute your workflow
result = process_input("What is Python?")
answer = call_llm("Context: ...\n\nQuestion: What is Python?")
profile = fetch_user_profile("user_123")

# Finalize and send to SQS (optional parameters)
TraceManager.finalize_and_send(
    trace_name="rag_pipeline_example",
    trace_input={"user_msg": "What is Python?"},
    trace_output={"bot_response": answer}
)
```

## 📌 Universal API Call Instrumentation (End-to-End)

The SDK provides outbound HTTP telemetry for a wide range of API call styles:

- `requests` (sync)
- `httpx.Client` (sync)
- `httpx.AsyncClient` (async)
- `urllib.request.urlopen` (sync, stdlib)
- FastAPI/Starlette inbound HTTP + outbound child calls in handlers/services

### What this solves

When your application calls external APIs (model gateway, internal REST, third-party tools),
the SDK now captures those as child spans with one common schema and redaction behavior,
independent of which Python client is used.

### Scope and constraints

- ✅ Sync + async outbound support
- ✅ Redaction for sensitive headers/query/body keys
- ✅ Parent-child correlation with active trace/span context
- ✅ Works with FastAPI and non-FastAPI apps
- ⚠️ Shell `curl` is not a Python HTTP client, so it is not monkeypatched by the SDK

### Public API surface for outbound instrumentation

Use these public helpers:

- `enable_outbound_http_instrumentation(...)`
- `disable_outbound_http_instrumentation()`
- `outbound_instrumentation_status()`
- `enable_requests_instrumentation()`
- `enable_httpx_instrumentation(sync=True, async_client=True)`
- `enable_urllib_instrumentation()`

### End-to-end architecture

1. App starts trace with `TraceManager.start_trace(...)`
2. Outbound instrumentation wraps HTTP client calls
3. Wrapper records method/url/status/latency/error + redacted payload details
4. Span is added to the active trace with `span.kind=client`
5. `TraceManager.finalize_and_send(...)` emits the full trace to SQS/Lambda pipeline
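The wrapper step (3) above can be sketched generically: time the call, record status or error into a span-like record, and always attach latency. This is an illustrative shape, not the SDK's actual wrapper:

```python
import time

def record_outbound_call(method, url, send):
    """Record method/url/status/latency for an outbound call (illustrative)."""
    span = {"kind": "client", "method": method, "url": url}
    start = time.perf_counter()
    try:
        status = send()                      # perform the actual HTTP call
        span["status_code"] = status
    except Exception as exc:                 # capture the error, then re-raise
        span["error"] = str(exc)
        raise
    finally:
        # Latency is recorded on both success and failure paths
        span["latency_ms"] = round((time.perf_counter() - start) * 1000, 2)
    return span

span = record_outbound_call("GET", "https://api.example.com/data", lambda: 200)
```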

### FastAPI integration pattern (recommended)

Use **both**:

- `LLMOpsASGIMiddleware` for inbound request lifecycle
- outbound instrumentation for client calls made by your route/service code

```python
from fastapi import FastAPI
import httpx

from llmops_observability import (
    LLMOpsASGIMiddleware,
    TraceManager,
    enable_outbound_http_instrumentation,
)

app = FastAPI()
app.add_middleware(LLMOpsASGIMiddleware)

enable_outbound_http_instrumentation(
    requests_lib=True,
    httpx_lib=True,
    urllib_lib=True,
    httpx_sync=True,
    httpx_async=True,
)

@app.get("/proxy")
async def proxy():
    TraceManager.start_trace(name="proxy_call")
    async with httpx.AsyncClient() as client:
        resp = await client.get("https://api.example.com/data?token=secret", timeout=10.0)
    TraceManager.finalize_and_send(
        trace_name="proxy_call",
        trace_input={},
        trace_output={"status_code": resp.status_code},
    )
    return {"status_code": resp.status_code}
```

### Local validation (simple script)

A dedicated local test script is provided:

- `scripts/local_outbound_http_demo.py`

Run it from the repo root:

```bash
pip install -e ".[http]"
python scripts/local_outbound_http_demo.py
```

What you should see:

- instrumentation status for `requests`, `httpx`, `urllib`
- 4 outbound spans printed (`requests`, `httpx`, `httpx-async`, `urllib`)
- redacted query parameter values (e.g., `token=[REDACTED]`)

Optional public endpoint test:

```bash
python scripts/local_outbound_http_demo.py --url https://httpbin.org/get
```

### Acceptance checklist for outbound

- [ ] `enable_outbound_http_instrumentation()` returns expected backend status
- [ ] Sync calls create spans (`requests`/`httpx.Client`/`urllib`)
- [ ] Async calls create spans (`httpx.AsyncClient`)
- [ ] Metadata includes `http.client.library`
- [ ] Sensitive values are redacted in URL/headers/body preview
- [ ] Outbound spans are correlated under active trace context
- [ ] `LLMOpsASGIMiddleware` behavior remains intact for inbound tracing

### 2. Automatic Environment & Project Injection

**Every span automatically gets `environment` and `project_id` in metadata:**

```python
import os

# Set environment variables
os.environ["LLMOPS_PROJECT_ID"] = "new_test"
os.environ["ENV"] = "uat"

# Start trace
TraceManager.start_trace("my_operation")

# Every @track_function, @track_llm_call, @track_llm_agent, and @track_llm_tool span will automatically have:
# span.metadata = {
#     "environment": "uat",
#     "project_id": "new_test",
#     # ... other metadata ...
# }
```

✅ **No manual injection needed!** The SDK automatically adds these to every span.

### 3. Nested Spans (Parent-Child Relationships)

```python
@track_function()
def parent_function():
    # This creates a parent span
    child_result = child_function()
    return child_result

@track_function()
def child_function():
    # This automatically becomes a child of parent_function
    grandchild_result = grandchild_function()
    return grandchild_result

@track_function()
def grandchild_function():
    # This becomes a child of child_function
    return "result"

# Proper hierarchy maintained in Langfuse/NewRelic/S3
```
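The automatic nesting above can be understood as a span stack: entering a decorated function pushes it, and any span created while it is on the stack records the top entry as its parent. A minimal sketch (illustrative, not the SDK's real bookkeeping):

```python
class SpanStack:
    """Toy span stack showing how parent-child links are derived."""

    def __init__(self):
        self.stack = []   # names of currently open spans
        self.spans = []   # all spans recorded, with their parent link

    def enter(self, name):
        parent = self.stack[-1] if self.stack else None
        span = {"name": name, "parent": parent}
        self.spans.append(span)
        self.stack.append(name)
        return span

    def exit(self):
        self.stack.pop()

stack = SpanStack()
stack.enter("parent_function")
stack.enter("child_function")
stack.enter("grandchild_function")
stack.exit(); stack.exit(); stack.exit()
```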

### 4. Outbound HTTP instrumentation (many clients, sync + async)

Use this when your app calls **external HTTP APIs** and you want each call as a **child span**
with the same redaction and metadata shape, regardless of client library.

**Supported clients (opt-in monkeypatching):**

| Library | Sync | Async | Notes |
|--------|------|-------|--------|
| **`requests`** | ✅ `Session.request` | — | Common sync client |
| **`httpx`** | ✅ `Client.request` | ✅ `AsyncClient.request` | Install: `pip install 'llmops-observability[http]'` or `pip install httpx` |
| **`urllib.request`** | ✅ `urlopen` | — | Stdlib; no extra dependency |

**FastAPI / Starlette:** use **`LLMOpsASGIMiddleware`** for **incoming** HTTP to your app. For **outgoing**
calls from route handlers or services, enable one of the clients above (`httpx` is typical for async apps).

**Shell `curl`:** not a Python API, so it is not monkeypatched. Use **`httpx`** or **`requests`** in code,
or wrap subprocess calls yourself if you must shell out.

**One-shot setup (recommended):**

```python
from llmops_observability import (
    TraceManager,
    enable_outbound_http_instrumentation,
)

# Enables requests + urllib always; enables httpx sync+async when httpx is installed
enable_outbound_http_instrumentation(
    requests_lib=True,
    httpx_lib=True,
    urllib_lib=True,
    httpx_sync=True,
    httpx_async=True,
)
```

**Or enable individually:** `enable_requests_instrumentation()`, `enable_httpx_instrumentation(sync=..., async_client=...)`, `enable_urllib_instrumentation()`. Disable all with `disable_outbound_http_instrumentation()`.

**`requests` example:**

```python
from llmops_observability import TraceManager, enable_requests_instrumentation
import requests

enable_requests_instrumentation()

TraceManager.start_trace(name="external_call_flow")

response = requests.get(
    "https://example.com/search?q=test&api_key=secret",
    headers={"Authorization": "Bearer super-secret-token"},
    timeout=5,
)

TraceManager.finalize_and_send(
    trace_name="external_call_flow",
    trace_input={"query": "test"},
    trace_output={"status_code": response.status_code},
)
```

**`httpx` async example:**

```python
import httpx
from llmops_observability import TraceManager, enable_httpx_instrumentation

enable_httpx_instrumentation(sync=True, async_client=True)
TraceManager.start_trace(name="async_http")

async def main():
    async with httpx.AsyncClient() as client:
        r = await client.get("https://api.example.com/v1/data", timeout=10.0)
    TraceManager.finalize_and_send(trace_name="async_http", trace_input={}, trace_output={"status": r.status_code})

# asyncio.run(main())
```

Captured outbound metadata includes (among others): `method`, `url` (redacted query params), `status_code`, `latency_ms`, `http.client.library` (`requests`, `httpx`, `httpx-async`, `urllib`), `http.call_mode` (`sync` or `async`), `error` (if any).

Quick filter snippet (demo-friendly):

```python
spans = TraceManager._active.get("spans", [])  # internal state; for demos only
outbound_spans = [s for s in spans if s.metadata and s.metadata.get("outbound_http")]
sync_spans = [s for s in outbound_spans if s.metadata.get("http.call_mode") == "sync"]
async_spans = [s for s in outbound_spans if s.metadata.get("http.call_mode") == "async"]

print("outbound total:", len(outbound_spans))
print("sync outbound:", len(sync_spans))
print("async outbound:", len(async_spans))
```

Default redaction behavior:
- Redacts sensitive query/header/body keys (examples: `token`, `password`, `api_key`, `authorization`).
- Request body is captured as a redacted/truncated summary (safe preview), not full raw content.
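The query-parameter redaction described above can be sketched with the standard library. The key list here is an example; the SDK's actual list and implementation may differ:

```python
from urllib.parse import urlsplit, urlunsplit, parse_qsl, urlencode

SENSITIVE_KEYS = {"token", "password", "api_key", "authorization"}  # example list

def redact_url(url):
    """Replace sensitive query parameter values with [REDACTED] (illustrative)."""
    parts = urlsplit(url)
    query = [
        (k, "[REDACTED]" if k.lower() in SENSITIVE_KEYS else v)
        for k, v in parse_qsl(parts.query, keep_blank_values=True)
    ]
    # safe="[]" keeps the [REDACTED] marker readable in the rebuilt URL
    return urlunsplit(parts._replace(query=urlencode(query, safe="[]")))
```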

Correlation:
- When a trace/span is active, outbound call spans are automatically linked as children.
- Inbound ASGI middleware behavior remains unchanged.

### 5. Response evaluation telemetry (score / label / rationale)

The SDK supports two evaluation paths:

1. **Decorator-first (recommended)**: enable evaluations directly on `@track_llm_call`
2. **Manual API**: call `record_evaluation(...)` yourself for custom/offline checks

#### Customer quick start (recommended)

Minimal tracing (no evaluation):

```python
from llmops_observability import TraceManager, track_llm_call

TraceManager.start_trace(name="chat_basic")

@track_llm_call()
def call_model(prompt: str):
    return {"output": {"message": {"content": [{"text": "answer"}]}}}
```

Enable built-in evaluation with one parameter:

```python
from llmops_observability import TraceManager, track_llm_call

TraceManager.start_trace(name="chat_with_eval")

@track_llm_call(evals=["correctness"])
def call_model(prompt: str):
    return {"output": {"message": {"content": [{"text": "Paris is the capital of France."}]}}}
```

Advanced usage with sampling (cost/latency control):

```python
from llmops_observability import TraceManager, track_llm_call

TraceManager.start_trace(name="chat_with_eval_sampling")

@track_llm_call(
    model="anthropic.claude-3-sonnet-20240229-v1:0",
    evals=["correctness"],
    sample_rate=0.2,  # evaluate ~20% of requests
    evaluation_metadata={"policy_version": "v1"},
)
def call_model(prompt: str):
    return {"output": {"message": {"content": [{"text": "answer"}]}}}
```
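The `sample_rate` gate can be understood as a simple per-call probability check: roughly that fraction of requests get evaluated, the rest skip evaluation entirely. A minimal sketch (illustrative, not the SDK's implementation):

```python
import random

def should_evaluate(sample_rate, rng=random):
    """Return True for approximately sample_rate fraction of calls."""
    return rng.random() < sample_rate

rng = random.Random(42)                      # seeded for a reproducible demo
decisions = [should_evaluate(0.2, rng) for _ in range(1000)]
hit_rate = sum(decisions) / len(decisions)   # ~0.2 on average
```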

#### What happens automatically

- The SDK runs internal **LLM-as-a-judge** logic behind the scenes (no customer boilerplate).
- Each evaluation emits an `evaluation` child span with:
  - `score` (numeric)
  - `label` (categorical)
  - `rationale` (text)
- Evaluation spans are attached to the generation span via `parent_span_id` / `evaluated_span_id`.
- Evaluation runs **non-blocking** in the background.
- If evaluation fails, your main function still succeeds (failure is isolated and logged).

#### Backward compatibility

`evaluation=True` still works as an alias for the default criterion:

```python
@track_llm_call(evaluation=True)
def call_model(prompt: str):
    ...
```

Equivalent to:

```python
@track_llm_call(evals=["correctness"])
def call_model(prompt: str):
    ...
```

#### Manual evaluation API (for custom/offline checks)

Use `record_evaluation(...)` when you need explicit control (for example: rule engines, offline batch scoring, non-LLM downstream checks):

```python
from llmops_observability import TraceManager, record_evaluation, track_function

TraceManager.start_trace(name="rest_downstream_quality")

@track_function()
def call_status_service() -> dict:
    # ... your HTTP call ...
    healthy = True
    record_evaluation(
        name="status_service_eval",
        score=1.0 if healthy else 0.0,
        label="healthy" if healthy else "unhealthy",
        rationale="HTTP and payload checks passed.",
        evaluator_metadata={"method": "rules_v1"},
    )
    return {"ok": healthy}
```

#### Pattern: downstream non-LLM service (REST) quality — evaluate **inside** the client span

Wrap the HTTP call in `@track_function`, run your quality rules on the JSON/status, then call `record_evaluation(...)` **before `return`** so the evaluation nests under that downstream span.

> **Note:** this example uses `requests` (install with `pip install requests`, or `pip install -e ".[test]"` from this repo). Swap for `httpx`, stdlib `urllib`, or your internal HTTP client.

```python
import requests
from llmops_observability import TraceManager, record_evaluation, track_function

TraceManager.start_trace(name="rest_downstream_quality")

@track_function()
def call_status_service() -> dict:
    resp = requests.get("https://api.example.com/v1/status", timeout=10.0)
    data = resp.json() if resp.content else {}
    healthy = resp.status_code == 200 and data.get("ok") is True

    record_evaluation(
        name="status_service_eval",
        score=1.0 if healthy else 0.0,
        label="healthy" if healthy else "unhealthy",
        rationale=f"HTTP {resp.status_code}, body.ok={data.get('ok')}",
        evaluator_metadata={"service": "example_status", "client": "requests"},
    )
    return data

call_status_service()
TraceManager.finalize_and_send(trace_name="rest_downstream_quality", trace_input={}, trace_output={})
```

- **Span type**: `evaluation` (see `EVALUATION_SPAN_TYPE`).
- **Payload**: `score`, `label`, `rationale`, `trace_id`, and optional `parent_span_id` / `evaluated_span_id` are stored under `input_data.evaluation`.
- **Evaluator metadata**: use `evaluator_metadata={...}` for judge/model details; merged into span metadata as `metadata.evaluator`.
- **Validation**: malformed payloads raise clear `ValueError` messages (for example: invalid score, empty label/rationale, wrong metadata types, missing score+label+rationale).
- **No trace**: `record_evaluation` returns `None` when there is no active/finalized trace.
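The validation behavior listed above might look roughly like the following. The exact checks and error messages are the SDK's; this sketch only shows the shape:

```python
def validate_evaluation(score=None, label=None, rationale=None):
    """Reject malformed evaluation payloads with clear ValueErrors (illustrative)."""
    if score is None and label is None and rationale is None:
        raise ValueError("at least one of score, label, rationale is required")
    if score is not None and not isinstance(score, (int, float)):
        raise ValueError("score must be numeric")
    if label is not None and not str(label).strip():
        raise ValueError("label must be a non-empty string")
    if rationale is not None and not str(rationale).strip():
        raise ValueError("rationale must be a non-empty string")
    return {"score": score, "label": label, "rationale": rationale}
```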

Async example:

```python
import asyncio
from llmops_observability import TraceManager, record_evaluation

async def evaluate_http_response(resp: dict) -> None:
    # Works for non-LLM downstream responses too.
    record_evaluation(
        score=1.0 if resp.get("status_code") == 200 else 0.0,
        label="pass" if resp.get("status_code") == 200 else "fail",
        rationale=f"HTTP status was {resp.get('status_code')}",
        evaluated_span_id=resp.get("span_id"),
        evaluator_metadata={"type": "rules", "name": "http_health_check"},
    )

TraceManager.start_trace(name="async_eval")
asyncio.run(evaluate_http_response({"status_code": 200, "span_id": "http_span_1"}))
TraceManager.finalize_and_send(trace_name="async_eval", trace_input={}, trace_output={})
```

### 6. Agentic crawl / search telemetry (Tavily / Scrapy-style workflows)

Model long-running **crawl or search sessions** with:
- a session span (`crawl_session`) for start/end + aggregate counters
- per-page spans (`crawl_page`) for fetches and URL/status/failure details
- per-event spans (`crawl_event`) for non-page workflow steps (search request, rerank, parse, etc.)

URLs and query payloads are **redacted** using the same policy as outbound HTTP metadata redaction.

```python
from llmops_observability import (
    TraceManager,
    crawl_session,
    record_crawl_page,
    record_crawl_event,
)

TraceManager.start_trace(name="research_agent")

with crawl_session(provider="scrapy", query="climate data", metadata={"run": "1"}):
    record_crawl_event(
        event_type="search_request",
        query={"q": "climate data", "api_key": "secret"},
        status="ok",
        duration_ms=15,
    )
    record_crawl_page(
        url="https://docs.example.com/guide?token=secret",
        query={"q": "climate data", "token": "secret"},
        status_code=200,
        latency_ms=120,
    )
    record_crawl_page(
        url="https://docs.example.com/missing",
        status_code=404,
        error="not found",
        latency_ms=40,
    )
    record_crawl_event(
        event_type="result_parse",
        status="failed",
        failure_reason="invalid html fragment",
        duration_ms=12,
    )

TraceManager.finalize_and_send(trace_name="research_agent", trace_input={}, trace_output={})
```

- **Span types**:
  - `crawl_session`: aggregate summary (`crawl.pages_ok`, `crawl.pages_fail`, `crawl.events_total`, `crawl.events_fail`, duration)
  - `crawl_page`: URL/query/status/failure/duration for each fetched page
  - `crawl_event`: generic step telemetry for search/crawl stages that are not page fetches
- **API**: `start_crawl_session` / `record_crawl_page` / `record_crawl_event` / `end_crawl_session`, or the **`crawl_session(...)`** context manager.
- **Stack**: Starting a session pushes a synthetic id on the span stack so page spans nest under it until `end_crawl_session`.
- **Partial failures**: failed pages/events increment failure counters while session completion telemetry is still emitted.
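The session-level aggregation can be sketched as a counter object fed by page and event records; this mirrors the example session above (illustrative, not the SDK's internals):

```python
class CrawlCounters:
    """Aggregate page/event outcomes for a crawl session (illustrative)."""

    def __init__(self):
        self.pages_ok = 0
        self.pages_fail = 0
        self.events_total = 0
        self.events_fail = 0

    def page(self, status_code=None, error=None):
        # A page fails on an explicit error or a 4xx/5xx status
        if error or (status_code is not None and status_code >= 400):
            self.pages_fail += 1
        else:
            self.pages_ok += 1

    def event(self, status="ok"):
        self.events_total += 1
        if status == "failed":
            self.events_fail += 1

c = CrawlCounters()
c.event(status="ok")                        # search_request
c.page(status_code=200)                     # successful fetch
c.page(status_code=404, error="not found")  # failed fetch
c.event(status="failed")                    # result_parse failure
```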

### 7. Transaction lifecycle telemetry (start/end + unique span id)

Use this API when you need an explicit transaction boundary (for example, "transaction logger Lambda" as a child call).

```python
import urllib.request
from llmops_observability import (
    TraceManager,
    start_transaction,
    end_transaction,
)

TraceManager.start_trace(name="transaction_flow")
txid = start_transaction(
    name="logger_lambda_transaction",
    metadata={"component": "transaction_logger"},
    input_data={"record_count": 3},
)

if txid:
    try:
        # Any outbound HTTP span recorded here becomes child of the transaction
        urllib.request.urlopen("https://api.example.com/lambda/log?token=secret", timeout=5)
        end_transaction(txid, status="success", output_data={"lambda_status": "accepted"})
    except Exception as exc:
        end_transaction(txid, status="error", error=str(exc))
        raise

TraceManager.finalize_and_send(trace_name="transaction_flow", trace_input={}, trace_output={})
```

Context manager / decorator helpers:

```python
from llmops_observability import transaction, track_transaction

with transaction(name="tx_with_context", metadata={"source": "orders"}):
    pass

@track_transaction(name="tx_decorated")
def run_business_step():
    return "ok"
```

Contract and guard rails:
- `start_transaction(...)` returns a unique span id (`uuid4().hex`) or `None` when tracing is unavailable.
- `end_transaction(transaction_id, ...)` returns `True` on success, `False` for unknown/already-ended ids (double-end guard).
- `get_open_transaction_ids()` helps detect missing-end situations in long-running workflows.
- Transaction span type is `transaction` (`TRANSACTION_SPAN_TYPE`) and is correlated using the active parent stack at start time.
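The id issuance and double-end guard above can be sketched with a small registry; `uuid4().hex` matches the id format the contract describes (illustrative, not the SDK's implementation):

```python
import uuid

class TransactionRegistry:
    """Track open transactions; ending twice is a no-op returning False."""

    def __init__(self):
        self._open = {}

    def start(self, name):
        txid = uuid.uuid4().hex
        self._open[txid] = {"name": name}
        return txid

    def end(self, txid):
        # Unknown or already-ended ids return False instead of raising
        return self._open.pop(txid, None) is not None

    def open_ids(self):
        return list(self._open)

reg = TransactionRegistry()
txid = reg.start("logger_lambda_transaction")
```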

## 📊 Data Flow Architecture

```
┌─────────────────────┐
│   Your LLM App      │
│  (with decorators)  │
└──────────┬──────────┘
           │
           ▼
┌─────────────────────┐
│  TraceManager       │
│  (collects spans)   │
│  + Auto-injects:    │
│    - environment    │
│    - project_id     │
└──────────┬──────────┘
           │
           ▼
┌─────────────────────┐
│  SQS Batch Workers  │
│  (compress & send)  │
└──────────┬──────────┘
           │
           ▼
┌─────────────────────┐
│  AWS SQS Queue      │
└──────────┬──────────┘
           │
           ▼
┌─────────────────────┐
│  Lambda Processor   │
│  (parallel routing) │
└─────┬──────┬────┬───┘
      │      │    │
      ▼      ▼    ▼
  Langfuse  S3  NewRelic
```
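The "compress & send" step can be sketched as gzip plus base64 with a size check against the payload limit cited above. The framing here is an assumption for illustration; the SDK's actual wire format may differ:

```python
import base64
import gzip
import json

SQS_LIMIT_BYTES = 1_000_000  # payload limit cited in the feature list

def prepare_sqs_payload(trace):
    """Serialize, compress, and check a trace against the size limit (illustrative)."""
    raw = json.dumps(trace).encode("utf-8")
    compressed = base64.b64encode(gzip.compress(raw)).decode("ascii")
    return compressed, len(compressed) <= SQS_LIMIT_BYTES

trace = {"name": "demo", "spans": [{"name": f"span_{i}"} for i in range(100)]}
payload, fits = prepare_sqs_payload(trace)
```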

## 🔗 Distributed Tracing (Multi-Service Correlation)

Track requests across multiple microservices and correlate them into a single unified trace.

### Use Case
When your FastAPI app calls a downstream microservice (e.g., user_service), both emit trace segments to the same SQS queue. The EKS processor merges them into one complete trace before fanout to Langfuse/NewRelic/S3.

### Parent Service: @track_external_service

Mark a function that calls a downstream service. Automatically generates trace context headers to inject into the HTTP request:

```python
from fastapi import FastAPI
from llmops_observability import TraceManager, track_external_service, get_injected_headers
import httpx

app = FastAPI()

@app.post("/generate")
@track_external_service(service_name="user_service", transport="http")
async def generate(prompt: str):
    # get_injected_headers() returns trace context to forward
    headers = get_injected_headers()
    
    async with httpx.AsyncClient() as client:
        response = await client.get(
            "http://user_service:8002/profile/123",
            headers=headers,  # Forward the trace context
            timeout=5,
        )
    
    # Parent trace automatically marks: has_external_segments=true
    # (only if downstream call succeeds)
    return {"response": response.json()}

# Lambda or manual invocation (generate() is async, so run it in an event loop)
import asyncio

TraceManager.start_trace(name="generate", user_id="alice")
result = asyncio.run(generate("My prompt"))
TraceManager.finalize_and_send(
    trace_name="generate",
    trace_input={"prompt": "My prompt"},
    trace_output=result,
)
```

**Key behavior:**
- Only marks `has_external_segments=true` if the downstream call **succeeds**
- If the downstream service is unavailable (connection refused, timeout), `has_external_segments` stays `false` and the parent processes immediately
- Prevents orphaned parent traces waiting for child segments that will never arrive

### Child Service: @resume_external_service

Consume the trace context from parent headers and emit a **child segment**:

```python
from fastapi import FastAPI
from llmops_observability import track_function, resume_external_service

app = FastAPI()

@app.get("/profile/{user_id}")
@resume_external_service()  # Extracts traceparent + x-llmops-trace-id
@track_function()
async def get_user_profile(user_id: str):
    # This function emits a child segment with the parent's trace_id
    profile = await db.fetch_user(user_id)
    return {"profile": profile}
```

**Key behavior:**
- Automatically extracts `traceparent` and `x-llmops-trace-id` headers from request
- Emits a **child segment** to SQS (marked `is_child_segment=true`)
- Child segment is staged in S3 under `child_traces/{trace_id}/` for later merge
- ASGI middleware skips auto-tracing (no duplicate envelope) when distributed headers are present
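The `traceparent` header follows the W3C Trace Context format: `version-traceid-parentid-flags`. A minimal generator/parser sketch (the SDK's exact header construction may differ):

```python
import uuid

def make_traceparent(trace_id=None, span_id=None, sampled=True):
    """Build a W3C traceparent header value (illustrative)."""
    trace_id = trace_id or uuid.uuid4().hex            # 32 hex chars
    span_id = span_id or uuid.uuid4().hex[:16]         # 16 hex chars
    return f"00-{trace_id}-{span_id}-{'01' if sampled else '00'}"

def parse_traceparent(header):
    """Split a traceparent header back into its fields."""
    version, trace_id, span_id, flags = header.split("-")
    return {"trace_id": trace_id, "span_id": span_id, "sampled": flags == "01"}

header = make_traceparent(trace_id="a" * 32, span_id="b" * 16)
```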

### Helper: get_injected_headers()

Call inside a `@track_external_service` function to get headers to forward:

```python
from llmops_observability import get_injected_headers

@track_external_service(service_name="downstream")
def call_api():
    headers = get_injected_headers()  # Returns {'traceparent': '...', 'x-llmops-trace-id': '...'}
    # Pass to your HTTP client
    requests.get("http://api.local:8002/endpoint", headers=headers)
```

### EKS Processor Flow

The Lambda handler in `otel_handler.py`:
1. **Decompresses** incoming messages from SQS
2. **Checks** if message is child (`is_child_segment=true`) or parent (`has_external_segments=true`)
3. **Stages child** segments in S3: `s3://bucket/child_traces/{trace_id}/{segment_id}.json`
4. **Merges** parent + all child segments (fetched from S3) into one trace
5. **Fans out** merged trace to Langfuse, NewRelic, S3 with complete span hierarchy

**Timeout & Cleanup:**
- If parent arrives without children after 60 seconds (sweep), it's treated as a standalone trace
- Orphaned children that arrive after parent has been processed are queued for late replay
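The merge step (4) can be sketched as combining the parent's spans with any child segments sharing its `trace_id`; this is a simplified shape of the idea, not the processor's code:

```python
def merge_trace(parent, child_segments):
    """Merge child segment spans into a copy of the parent trace (illustrative)."""
    merged = dict(parent)
    merged["spans"] = list(parent["spans"])  # copy so the parent stays untouched
    for child in child_segments:
        if child["trace_id"] == parent["trace_id"]:
            merged["spans"].extend(child["spans"])
    return merged

parent = {"trace_id": "t1", "name": "generate", "spans": [{"name": "generate"}]}
children = [{"trace_id": "t1", "spans": [{"name": "get_user_profile"}]}]
merged = merge_trace(parent, children)
```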

### End-to-End Example

**FastAPI (parent request):**
```python
# POST /generate
@app.post("/generate")
@track_external_service(service_name="user_service")
async def generate(prompt: str):
    headers = get_injected_headers()
    resp = httpx.get("http://user_service:8002/profile/123", headers=headers)
    return {"result": resp.json()}
```

**Downstream Microservice (child):**
```python
# GET /profile/{user_id}
@app.get("/profile/{user_id}")
@resume_external_service()
async def get_profile(user_id: str):
    return {"name": "Alice", "tier": "premium"}
```

**Result in Langfuse:**
- Single trace `trace_id=...` with name "generate"
- Parent spans from FastAPI + child spans from user_service merged together
- Full hierarchy preserved (parent → child operations visible)

---

## 🎨 Decorator Reference

### @track_function - Complete Guide

#### Basic Usage (No Parameters)

```python
@track_function()
def process_data(input_data):
    # Automatically captures:
    # - Function name as span name
    # - Function arguments (args, kwargs)
    # - Return value
    # - Execution time
    # - Environment and project_id (auto-injected)
    return {"processed": input_data}
```

#### Parameter: `name` (Custom Span Name)

```python
@track_function(name="custom_span_name")
def my_function():
    # Span will appear as "custom_span_name" instead of "my_function"
    return result

# Use case: Make span names more descriptive in traces
@track_function(name="fetch_user_profile_from_db")
def get_user(user_id):
    return db.query(user_id)
```

#### Parameter: `metadata` (Add Custom Metadata)

```python
@track_function(metadata={"service": "auth", "priority": "high"})
def authenticate_user(username, password):
    # Span metadata will include:
    # {
    #     "service": "auth",
    #     "priority": "high",
    #     "environment": "uat",      # auto-injected
    #     "project_id": "new_test"   # auto-injected
    # }
    return auth_result

# Use case: Tag spans with business context
@track_function(metadata={
    "database": "postgres",
    "table": "users",
    "operation": "read"
})
def query_users(filters):
    return db.execute(query)
```

#### Parameter: `capture_locals=True` (Capture All Local Variables)

```python
@track_function(capture_locals=True)
def process_payment(amount, currency):
    user_id = "user_123"
    transaction_id = generate_id()
    tax = amount * 0.1
    total = amount + tax
    
    # All local variables captured in span.input_data.locals:
    # {
    #     "user_id": "user_123",
    #     "transaction_id": "txn_abc",
    #     "tax": 10.0,
    #     "total": 110.0,
    #     "amount": 100.0,
    #     "currency": "USD"
    # }
    
    return {"total": total}

# ⚠️ Warning: Can capture large amounts of data. Use for debugging only.
```

#### Parameter: `capture_locals=["var1", "var2"]` (Capture Specific Variables)

```python
@track_function(capture_locals=["user_id", "total"])
def process_payment(amount, currency):
    user_id = "user_123"
    transaction_id = generate_id()
    tax = amount * 0.1
    total = amount + tax
    
    # Only specified variables captured in span.input_data.locals:
    # {
    #     "user_id": "user_123",
    #     "total": 110.0
    # }
    # Note: transaction_id, tax, amount, currency are NOT captured
    
    return {"total": total}

# ✅ Recommended: Capture only what you need for debugging
```

#### Parameter: `capture_self=True` (Capture `self` in Class Methods)

```python
class PaymentProcessor:
    def __init__(self, merchant_id):
        self.merchant_id = merchant_id
        self.fee_rate = 0.029
    
    @track_function(capture_self=True)
    def process(self, amount):
        # Captures self attributes in span.input_data.self:
        # {
        #     "merchant_id": "merch_123",
        #     "fee_rate": 0.029
        # }
        fee = amount * self.fee_rate
        return amount - fee

# Use case: Debug class state during execution
class DatabaseConnection:
    def __init__(self, host, port):
        self.host = host
        self.port = port
        self.connected = False
    
    @track_function(capture_self=True, capture_locals=["query"])
    def execute(self, query):
        # Captures both self and specific locals
        result = self._run_query(query)
        return result
```

#### Combined Parameters Example

```python
@track_function(
    name="complex_data_pipeline",
    metadata={"stage": "preprocessing", "version": "2.0"},
    capture_locals=["processed_count", "errors"],
    capture_self=False
)
def pipeline_stage(data):
    processed_count = 0
    errors = []
    temp_cache = {}  # Not captured
    
    for item in data:
        try:
            process_item(item)
            processed_count += 1
        except Exception as e:
            errors.append(str(e))
    
    return {"count": processed_count, "errors": errors}
```

---

### @track_llm_call - Complete Guide

#### Basic Usage (No Parameters)

```python
@track_llm_call()
def call_llm(prompt):
    # Automatically captures:
    # - Function arguments (prompt)
    # - LLM response
    # - Execution time
    # - Span type = "generation"
    # - Environment and project_id (auto-injected)
    response = bedrock_client.invoke_model(...)
    return response
```

#### Parameter: `name` (Custom Span Name)

```python
@track_llm_call(name="bedrock_claude_sonnet")
def call_llm(prompt):
    # Span appears as "bedrock_claude_sonnet" instead of "call_llm"
    response = bedrock_client.invoke_model(...)
    return response

# Use case: Distinguish between different LLM providers/models
@track_llm_call(name="openai_gpt4_turbo")
def call_openai(prompt):
    return openai.chat.completions.create(...)

@track_llm_call(name="anthropic_claude_opus")
def call_anthropic(prompt):
    return anthropic.messages.create(...)
```

#### Parameter: `model` (For Cost Calculation)

```python
@track_llm_call(model="anthropic.claude-3-sonnet-20240229-v1:0")
def call_bedrock(prompt):
    # SDK automatically calculates cost based on:
    # - Token usage from response
    # - Model pricing for Claude 3 Sonnet
    # 
    # Captured in span:
    # - usage.input_tokens
    # - usage.output_tokens
    # - cost.input_cost
    # - cost.output_cost
    # - cost.total_cost
    
    response = bedrock_client.invoke_model(
        modelId="anthropic.claude-3-sonnet-20240229-v1:0",
        body=json.dumps({"prompt": prompt})
    )
    return response

# Supported AWS Bedrock models (see pricing.py):
# - anthropic.claude-3-5-sonnet-20241022-v2:0
# - anthropic.claude-3-sonnet-20240229-v1:0
# - anthropic.claude-3-haiku-20240307-v1:0
# - anthropic.claude-3-opus-20240229-v1:0
# - And more...
```

#### Parameter: `metadata` (Add Custom Metadata)

```python
@track_llm_call(
    model="anthropic.claude-3-sonnet-20240229-v1:0",
    metadata={
        "temperature": 0.7,
        "max_tokens": 1000,
        "use_case": "code_generation"
    }
)
def generate_code(prompt):
    # Span metadata includes:
    # {
    #     "temperature": 0.7,
    #     "max_tokens": 1000,
    #     "use_case": "code_generation",
    #     "environment": "uat",      # auto-injected
    #     "project_id": "new_test"   # auto-injected
    # }
    return llm_response
```

#### Parameter: `capture_locals=True` (Capture All Locals)

```python
@track_llm_call(
    model="anthropic.claude-3-sonnet-20240229-v1:0",
    capture_locals=True
)
def enhanced_llm_call(user_query, context_docs):
    # Build prompt with context
    formatted_context = format_documents(context_docs)
    system_prompt = "You are a helpful assistant."
    final_prompt = f"{system_prompt}\n\nContext: {formatted_context}\n\nQuestion: {user_query}"
    
    # All locals captured:
    # {
    #     "user_query": "What is Python?",
    #     "context_docs": [...],
    #     "formatted_context": "...",
    #     "system_prompt": "You are a helpful assistant.",
    #     "final_prompt": "..."
    # }
    
    response = bedrock_client.invoke_model(...)
    return response
```

#### Parameter: `capture_locals=["prompt", "temperature"]` (Specific Variables)

```python
@track_llm_call(
    model="anthropic.claude-3-sonnet-20240229-v1:0",
    capture_locals=["final_prompt", "model_config"]
)
def call_with_config(user_input):
    model_config = {"temperature": 0.7, "max_tokens": 1000}
    system_message = "You are an AI assistant."  # NOT captured
    final_prompt = f"{system_message}\n\n{user_input}"
    temp_cache = {}  # NOT captured
    
    # Only captures:
    # {
    #     "final_prompt": "...",
    #     "model_config": {"temperature": 0.7, "max_tokens": 1000}
    # }
    
    response = call_llm(final_prompt, **model_config)
    return response
```

#### Parameter: `capture_self=True` (For Class Methods)

```python
class LLMOrchestrator:
    def __init__(self, model_id, api_key):
        self.model_id = model_id
        self.api_key = api_key
        self.request_count = 0
        self.total_cost = 0.0
    
    @track_llm_call(
        model="anthropic.claude-3-sonnet-20240229-v1:0",
        capture_self=True
    )
    def call_llm(self, prompt):
        # Captures self attributes:
        # {
        #     "model_id": "anthropic.claude-3-sonnet-20240229-v1:0",
        #     "request_count": 5,
        #     "total_cost": 0.042
        # }
        # Note: api_key might be captured - be careful with secrets!
        
        self.request_count += 1
        response = self._invoke_model(prompt)
        return response
```

#### Combined Parameters Example (Production Pattern)

```python
class ChatbotService:
    def __init__(self, model_id):
        self.model_id = model_id
        self.system_prompt = "You are a helpful chatbot."
    
    @track_llm_call(
        name="chatbot_generation",
        model="anthropic.claude-3-sonnet-20240229-v1:0",
        metadata={
            "service": "customer_support",
            "model_version": "v2.0",
            "priority": "high"
        },
        capture_locals=["full_prompt", "temperature"],
        capture_self=False  # Don't capture self to avoid secrets
    )
    def generate_response(self, user_message, conversation_history):
        temperature = 0.7
        full_prompt = self._build_prompt(user_message, conversation_history)
        cache_key = hash(full_prompt)  # Not captured
        
        response = bedrock_client.invoke_model(
            modelId=self.model_id,
            body=json.dumps({
                "prompt": full_prompt,
                "temperature": temperature,
                "max_tokens": 1000
            })
        )
        return response
```

---

### Quick Reference Table

| Parameter | @track_function | @track_llm_call | Type | Description |
|-----------|----------------|-----------------|------|-------------|
| `name` | ✅ | ✅ | `str` | Custom span name |
| `metadata` | ✅ | ✅ | `Dict[str, Any]` | Additional metadata |
| `capture_locals` | ✅ | ✅ | `bool` or `List[str]` | Capture local variables |
| `capture_self` | ✅ | ✅ | `bool` | Capture `self` in methods |
| `model` | ❌ | ✅ | `str` | Model ID for cost calculation |

---

### Best Practices

#### ✅ DO

```python
# Capture specific variables for debugging
@track_function(capture_locals=["error_code", "retry_count"])

# Use metadata for business context
@track_function(metadata={"team": "payments", "priority": "critical"})

# Specify model for accurate cost tracking
@track_llm_call(model="anthropic.claude-3-sonnet-20240229-v1:0")

# Use descriptive names
@track_llm_call(name="rag_retrieval_claude")
```

#### ❌ DON'T

```python
# Don't capture all locals in production (too much data)
@track_function(capture_locals=True)  # Only for debugging!

# Don't capture self if it contains secrets
@track_function(capture_self=True)  # Check for API keys first!

# Don't use generic names
@track_function(name="function_1")  # Not helpful

# Don't forget model for LLM calls
@track_llm_call()  # Without model, cost falls back to the MODEL_ID env default
```

---

## 📈 What Gets Captured

### Trace-Level Data
- `trace_id`, `trace_name`, `project_id`, `environment`
- `user_id`, `session_id`
- `start_time`, `end_time`, `duration_ms`
- `trace_input`, `trace_output`
- `metadata`, `tags`
- `total_spans`, `total_generations`
- `sdk_name`, `sdk_version`

### Span-Level Data (Auto-captured for every span)
- **Core**: `span_id`, `span_name`, `span_type`, `parent_span_id`
- **Timing**: `start_time`, `end_time`, `duration_ms`
- **I/O**: `input_data`, `output_data`
- **Status**: `status`, `status_message`, `error`
- **LLM**: `model_id`, `prompt`, `response`
- **Usage**: `usage.input_tokens`, `usage.output_tokens`, `usage.total_tokens`
- **Cost**: Calculated from model pricing
- **Metadata**: `environment`, `project_id` (auto-injected), custom metadata
- **Context**: `tags`, `level`

## 🔧 Configuration Reference

### Size Limits (in config.py)

```python
MAX_OUTPUT_SIZE = 200 * 1024      # 200 KB - max individual field
MAX_SPAN_IO_SIZE = 20_000          # 20 KB - span input/output
MAX_TRACE_IO_SIZE = 50_000         # 50 KB - trace input/output
MAX_SQS_SIZE = 200_000             # 200 KB - SQS message
PROMPT_RESPONSE_MAX_SIZE = 10_000  # 10 KB - prompt/response fields
```

### SQS Configuration

```python
SQS_WORKER_COUNT = 4           # Background worker threads
SQS_BATCH_SIZE = 10            # Batch size before flush
SQS_BATCH_TIMEOUT = 0.2        # Timeout in seconds
SQS_FLUSH_TIME_THRESHOLD = 0.15     # Age-based flush threshold (seconds)
SQS_SHUTDOWN_TIMEOUT = 1.0          # Max seconds to drain pending messages on shutdown
SQS_RETRY_ATTEMPTS = 3              # Send attempts before spilling to disk
SQS_RETRY_DELAY = 0.1               # Initial retry delay (seconds)
SQS_RETRY_BACKOFF_MULTIPLIER = 2.0  # Exponential backoff factor
SQS_RETRY_MAX_DELAY = 0.5           # Cap on backoff delay (seconds)
SQS_IMMEDIATE_MAX_BLOCK_MS = 250    # Max milliseconds an immediate send may block
```

### Safe export hardening knobs (Substory 5)

The SDK now applies hardening by default across exported telemetry:

- **Idempotency**: trace payload + each span includes deterministic `idempotency_key`.
- **Redaction**: export path applies centralized deep redaction before serialization.
- **Non-blocking sink behavior**: immediate sends use bounded retry + backoff; failures spill to disk.
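
The key derivation is internal to the SDK; one deterministic scheme, sketched here as an assumption (SHA-256 over stable identifiers — the function name and scheme are illustrative, not the SDK's actual code), shows why redeliveries can be deduplicated:

```python
import hashlib

def idempotency_key(trace_id: str, span_id: str = "") -> str:
    """Hypothetical derivation: hash the stable trace/span identifiers.

    Because the inputs are stable, SQS redeliveries of the same span
    produce the same key, so consumers can drop duplicates.
    """
    material = f"{trace_id}:{span_id}".encode("utf-8")
    return hashlib.sha256(material).hexdigest()
```

Any scheme works as long as it is a pure function of identifiers that do not change between delivery attempts.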

Tune via environment variables:

```bash
LLMOPS_SQS_IMMEDIATE_MAX_BLOCK_MS=250
LLMOPS_SQS_RETRY_BACKOFF_MULTIPLIER=2.0
LLMOPS_SQS_RETRY_MAX_DELAY=0.5
```

Operational guidance:
- Keep `LLMOPS_SQS_IMMEDIATE_MAX_BLOCK_MS` low (100–300ms) for request-path safety.
- Use spillover file volume alerts to detect prolonged sink outages.
- Recovery path is automatic: spillover entries are re-queued at next startup.
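
The spill-and-replay cycle can be sketched as follows (the directory location and file naming here are assumptions, not the SDK's actual layout):

```python
import json
import tempfile
import time
from pathlib import Path

# Hypothetical spillover location; the SDK's actual path may differ.
SPILL_DIR = Path(tempfile.gettempdir()) / "llmops_spillover_demo"

def spill(message: dict) -> Path:
    """Persist a message that could not be delivered to SQS."""
    SPILL_DIR.mkdir(parents=True, exist_ok=True)
    path = SPILL_DIR / f"{time.time_ns()}.json"
    path.write_text(json.dumps(message))
    return path

def replay(send) -> int:
    """At startup, re-queue every spilled message, deleting each on success."""
    count = 0
    for path in sorted(SPILL_DIR.glob("*.json")):
        send(json.loads(path.read_text()))
        path.unlink()
        count += 1
    return count
```

Alerting on the number of files in the spillover directory is a cheap proxy for sink health.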

## 🏭 Production Best Practices

### 1. Proper Trace Lifecycle

```python
try:
    # Start trace
    TraceManager.start_trace("operation_name")
    
    # Your application logic with decorators
    result = my_tracked_function()
    
    # Finalize with trace data
    TraceManager.finalize_and_send(
        trace_input={"request": "data"},
        trace_output={"response": result}
    )
except Exception as e:
    logger.error(f"Error: {e}")
    # Still flush the trace so the error information is captured
    TraceManager.end_trace()
```

### 2. Environment-Specific Configuration

```python
# production.env
PROJECT_ID=my_app
ENV=production
AWS_SQS_URL=https://sqs.us-east-1.amazonaws.com/123/prod-queue

# staging.env
PROJECT_ID=my_app
ENV=staging
AWS_SQS_URL=https://sqs.us-east-1.amazonaws.com/123/staging-queue
```

### 3. Async Support

```python
@track_function()
async def async_function():
    result = await some_async_operation()
    return result

@track_llm_call(model="...")
async def async_llm_call():
    response = await async_bedrock_call()
    return response
```

## 📝 Example: Complete RAG Pipeline

**Method 1: Basic Trace Lifecycle**

```python
from llmops_observability import TraceManager, track_function, track_llm_call

# Start the trace (PROJECT_ID and ENV are read from .env)
TraceManager.start_trace(name="rag_pipeline")

# Track regular functions
@track_function()
def process_data(data):
    return data.upper()

# Track LLM calls
@track_llm_call()
def call_bedrock(prompt):
    # Call your LLM
    response = bedrock_client.converse(
        modelId="anthropic.claude-3-sonnet-20240229-v1:0",
        messages=[{"role": "user", "content": prompt}]
    )
    return response

# Use the functions
result = process_data("some data")
llm_response = call_bedrock("Hello, world!")

# End the trace (flushes to Langfuse)
TraceManager.end_trace()
```

**Method 2: Explicit Project and Environment Override**
```python
# Override PROJECT_ID and ENV from .env
TraceManager.start_trace(
    name="chat_message",  # Operation name
    project_id="custom_project",  # Override PROJECT_ID
    environment="staging",  # Override ENV
    metadata={"user_id": "123"},
)

# Your code...

TraceManager.end_trace()
```

**Method 3: Using `finalize_and_send()` (llmops-observability)**
```python
# Start trace
TraceManager.start_trace(name="chat_session")

# Your code
user_input = "What is machine learning?"
response = await llm.generate(user_input)

# Finalize with input/output in one call
TraceManager.finalize_and_send(
    user_id="user_123",
    session_id="session_456",
    trace_name="chat_message",
    trace_input={"user_msg": user_input},
    trace_output={"bot_response": str(response)}
)
```

### 3. Capture Local Variables (Debugging)

```python
@track_function(capture_locals=True)
def complex_calculation(x, y, z):
    intermediate = x + y
    result = intermediate * z
    final = result ** 2
    # All local variables are captured in Langfuse
    return final

# Capture specific variables only
@track_function(capture_locals=["important_var", "result"])
def selective_capture(data):
    important_var = process(data)
    temp_var = "not captured"
    result = transform(important_var)
    return result
```

### 4. Nested Spans (Parent-Child Tracking)

```python
@track_function(name="parent_task")
def parent_function():
    data = fetch_data()
    # Child spans are automatically nested
    processed = child_function(data)
    return processed

@track_function(name="child_task")
def child_function(data):
    return data.upper()

# Langfuse will show: parent_task → child_task
```

### 5. ASGI Middleware (FastAPI Auto-Tracing)

```python
from fastapi import FastAPI
from llmops_observability import LLMOpsASGIMiddleware

app = FastAPI()
app.add_middleware(LLMOpsASGIMiddleware, service_name="my_api")

@app.get("/")
async def root():
    # Request is automatically traced
    return {"message": "Hello World"}

@app.post("/generate")
async def generate(prompt: str):
    # All decorated functions within request are nested
    result = await generate_text(prompt)
    return result
```

### 6. SQS Event Streaming (Event-Driven Architecture)

For event-driven, scalable deployments, the SDK supports optional event streaming to AWS SQS. Trace events are published to SQS queues where Lambda functions (or other consumers) can process them asynchronously:

```
Application (sends trace events)
    ↓
SQS Queue (decoupled message broker)
    ↓
Lambda Consumers (process & forward)
    ↓ ↓ ↓
  S3  New Relic  Datadog  (etc.)
```

**Setup:**

```bash
# Enable SQS streaming by setting AWS_SQS_URL
export AWS_SQS_URL=https://sqs.us-east-1.amazonaws.com/123456789/my-queue
export AWS_PROFILE=default
export AWS_REGION=us-east-1
```

```python
from llmops_observability import TraceManager, track_function

# When AWS_SQS_URL is set, events are automatically streamed to SQS
TraceManager.start_trace(
    name="chat_message",
    metadata={"channel": "web"}
)

@track_function()
def process_message(msg):
    return process(msg)

# All trace events are batched and sent to SQS (non-blocking)
TraceManager.end_trace()
```

**Lambda Consumer Example:**

```python
import json
import boto3

s3_client = boto3.client('s3')
cloudwatch = boto3.client('cloudwatch')  # Swap in the New Relic SDK if preferred

def lambda_handler(event, context):
    """Process trace events from SQS"""
    for record in event['Records']:
        # Parse trace event from SQS message
        trace_event = json.loads(record['body'])
        
        # Store to S3
        s3_client.put_object(
            Bucket='trace-events',
            Key=f"{trace_event['trace_id']}.json",
            Body=json.dumps(trace_event)
        )
        
        # Publish metrics (CloudWatch shown here; New Relic works the same way)
        if trace_event['event_type'] == 'llm_call':
            cloudwatch.put_metric_data(
                Namespace='LLMOps',
                MetricData=[{
                    'MetricName': 'TokenUsage',
                    'Value': trace_event['tokens_used'],
                    'Unit': 'Count'
                }]
            )
```

**SQS Features:**
- ✅ **Automatic Batching**: Groups events for efficient SQS sending (batch size 1-10)
- ✅ **Spillover Recovery**: Saves messages to disk if SQS is unavailable, retries on restart
- ✅ **Daemon Workers**: 4 background threads handle async SQS operations
- ✅ **Clean Shutdown**: Graceful shutdown flushes pending messages
- ✅ **Resilient**: Auto-restart failed workers, exponential backoff
- ✅ **No Blocking**: SQS operations never block main application thread
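
The batching and timeout behavior above can be sketched with a single daemon worker (a simplified illustration, not the SDK's actual worker pool; all names here are hypothetical):

```python
import queue
import threading
import time

class BatchSender:
    """Drain an in-memory queue into batches of up to `batch_size` events,
    flushing a partial batch when `batch_timeout` seconds elapse."""

    def __init__(self, send_batch, batch_size=10, batch_timeout=0.2):
        self.q = queue.Queue()
        self.send_batch = send_batch
        self.batch_size = batch_size
        self.batch_timeout = batch_timeout
        self._stop = threading.Event()
        self.worker = threading.Thread(target=self._run, daemon=True)
        self.worker.start()

    def put(self, event):
        self.q.put(event)  # enqueue only; never blocks the caller

    def _run(self):
        # Keep draining until stopped AND the queue is empty (clean shutdown).
        while not self._stop.is_set() or not self.q.empty():
            batch, deadline = [], time.monotonic() + self.batch_timeout
            while len(batch) < self.batch_size:
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    break
                try:
                    batch.append(self.q.get(timeout=remaining))
                except queue.Empty:
                    break
            if batch:
                self.send_batch(batch)

    def close(self):
        self._stop.set()
        self.worker.join()
```

The daemon flag means a crashed application never hangs on the worker, while `close()` gives the graceful path that flushes pending messages.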

**Events Streamed to SQS:**
- `trace_start`: Trace initialization with metadata
- `span_created`: Function execution tracking
- `llm_call`: LLM API calls with token usage
- `trace_end`: Trace completion with duration

**Configuration:**
```bash
# Required: SQS queue URL
export AWS_SQS_URL=https://sqs.us-east-1.amazonaws.com/123456789/llm-traces

# Optional: AWS authentication (defaults to IAM role if in Lambda/EC2)
export AWS_PROFILE=custom-profile  # Default: "default"
export AWS_REGION=eu-west-1        # Default: "us-east-1"
```

**Use Cases:**
- 📊 Send trace events to New Relic, Datadog, CloudWatch
- 💾 Archive all LLM interactions to S3 for compliance/audit
- 🔄 Post-processing: cost calculation, quality analysis, retraining data
- 🚀 Scale: decouple application from storage/monitoring infrastructure

### 7. 📥 Incoming SDK Message Schema

When SQS streaming is enabled, the SDK sends trace data in a compressed SQS message format that Lambda consumers can decompress and process. This section documents the message format and decompressed payload structure.

#### SQS Message Wrapper Format

```json
{
  "compressed": true,
  "data": "H4sIANPGn2YC/...",
  "trace_id": "87a41b12-cc61-4fdf-9bf2-a50a369b4d30",
  "type": "SDKTraceData"
}
```

**Wrapper Fields:**
- **`compressed`** (boolean): Indicates Base64 + Gzip compression is applied
- **`data`** (string): Base64-encoded, Gzip-compressed JSON payload
- **`trace_id`** (string): Unique trace identifier for deduplication
- **`type`** (string): Message type identifier ("SDKTraceData")
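
The producer side of this wrapper is the inverse of the Lambda decompression example further down; a self-contained sketch (function names are illustrative):

```python
import base64
import gzip
import json

def wrap_trace(trace_data: dict) -> dict:
    """Gzip + Base64 the trace payload and attach the SQS wrapper fields."""
    raw = json.dumps(trace_data).encode("utf-8")
    return {
        "compressed": True,
        "data": base64.b64encode(gzip.compress(raw)).decode("ascii"),
        "trace_id": trace_data["trace_id"],
        "type": "SDKTraceData",
    }

def unwrap_trace(message: dict) -> dict:
    """Inverse operation, as performed by the Lambda consumer."""
    if not message.get("compressed"):
        return message
    return json.loads(gzip.decompress(base64.b64decode(message["data"])))
```

Round-tripping a payload through `wrap_trace` and `unwrap_trace` returns the original dict, which is the property consumers rely on.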

#### Decompressed SDK Trace Data Schema

```json
{
  "trace_id": "87a41b12-cc61-4fdf-9bf2-a50a369b4d30",
  "trace_name": "rag_pipeline_example",
  "project_id": "new_test",
  "environment": "uat",
  "user_id": "user_123",
  "session_id": "session_456",
  
  "start_time": 1769446311.0,
  "end_time": 1769446318.021,
  "duration_ms": 7021,
  
  "trace_input": {
    "user_msg": "What is Android ????"
  },
  "trace_output": {
    "bot_response": "Android is a mobile operating system..."
  },
  
  "token_usage": {
    "total_input_tokens": 145,
    "total_output_tokens": 87,
    "total_tokens": 232
  },
  
  "cost": {
    "total_cost": 0.000456
  },
  
  "spans": [
    {
      "span_id": "87a41b12-cc61-4fdf-9bf2-a50a369b4d31",
      "span_name": "retrieve_context",
      "span_type": "span",
      "parent_span_id": null,
      
      "start_time": 1769446311.0,
      "end_time": 1769446313.0,
      "duration_ms": 2000,
      
      "input_data": {"args": [], "kwargs": {"query": "Android"}, "locals": {}},
      "output_data": {"output": ["Doc 1", "Doc 2"]},
      
      "error": null,
      "model_id": null,
      "status": "success",
      
      "metadata": {
        "environment": "uat",
        "project_id": "new_test"
      },
      "tags": []
    },
    {
      "span_id": "87a41b12-cc61-4fdf-9bf2-a50a369b4d32",
      "span_name": "call_llm",
      "span_type": "generation",
      "parent_span_id": "87a41b12-cc61-4fdf-9bf2-a50a369b4d31",
      
      "start_time": 1769446313.0,
      "end_time": 1769446318.021,
      "duration_ms": 5021,
      
      "input_data": {"args": [], "kwargs": {"prompt": "Context: Doc 1, Doc 2\n\nQuestion: What is Android ????"}, "locals": {}},
      "output_data": {"output": {"message": {"content": "Android is a mobile operating system..."}}},
      
      "error": null,
      "model_id": "anthropic.claude-3-sonnet-20240229-v1:0",
      
      "usage": {
        "input_tokens": 145,
        "output_tokens": 87,
        "total_tokens": 232
      },
      
      "prompt": "Context: Doc 1, Doc 2\n\nQuestion: What is Android ????",
      "response": "Android is a mobile operating system developed by Google...",
      
      "metadata": {
        "environment": "uat",
        "project_id": "new_test"
      },
      
      "status": "success",
      "tags": []
    }
  ],
  
  "metadata": {
    "version": "1.0.0"
  },
  "tags": ["example", "rag"],
  
  "total_spans": 2,
  "total_generations": 1,
  
  "sdk_name": "llmops-observability",
  "sdk_version": "2.0.0"
}
```

#### Key Fields Reference

| Field | Type | Description | Auto-Injected |
|-------|------|-------------|---|
| `trace_id` | string | Unique trace identifier | - |
| `trace_name` | string | Trace/operation name | - |
| `project_id` | string | Project identifier from `PROJECT_ID` env var | ✅ |
| `environment` | string | Environment from `ENV` env var | ✅ |
| `user_id` | string | User identifier (optional) | - |
| `session_id` | string | Session identifier (optional) | - |
| `start_time` | float | Unix timestamp (seconds) | - |
| `end_time` | float | Unix timestamp (seconds) | - |
| `duration_ms` | int | Trace duration in milliseconds | - |
| `spans[].metadata.environment` | string | Environment (auto-injected to every span) | ✅ |
| `spans[].metadata.project_id` | string | Project ID (auto-injected to every span) | ✅ |
| `token_usage` | object | Aggregated input/output token counts for the trace | - |
| `spans[].usage` | object | Per-span token counts (generation spans) | - |
| `cost` | object | Total cost calculated from model pricing (Bedrock models) | - |

#### Lambda Decompression Example

```python
import json
import gzip
import base64

def lambda_handler(event, context):
    """Decompress and process SDK trace messages from SQS"""
    for record in event['Records']:
        # Parse SQS message
        message = json.loads(record['body'])
        
        if message.get('compressed'):
            # Decode Base64
            compressed_data = base64.b64decode(message['data'])
            
            # Decompress Gzip
            decompressed_data = gzip.decompress(compressed_data)
            
            # Parse JSON
            trace_data = json.loads(decompressed_data)
        else:
            trace_data = message
        
        # Now trace_data contains the full SDK trace with spans
        print(f"Trace: {trace_data['trace_id']}")
        print(f"Project: {trace_data['project_id']}")
        print(f"Environment: {trace_data['environment']}")
        print(f"Spans: {len(trace_data['spans'])}")
        
        # Process further (send to Langfuse, S3, NewRelic, etc.)
        process_trace(trace_data)
```

#### Size Limits and Truncation

- **Max Message Size**: 256KB (SQS hard limit)
- **Auto-Truncation**: Fields > 200KB are automatically truncated
- **Fallback to Disk**: If SQS is unavailable, messages spill to disk and retry on restart
- **Compression**: Typical traces compress to 10-30% of original size
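
A minimal sketch of the truncation rule (the limit mirrors `MAX_OUTPUT_SIZE` from config.py; the `...[truncated]` marker is an assumption, not necessarily the SDK's exact marker):

```python
import json

MAX_OUTPUT_SIZE = 200 * 1024  # 200 KB per field, as in config.py

def truncate_field(value, limit=MAX_OUTPUT_SIZE):
    """Serialize a field and truncate it if it exceeds the limit."""
    text = value if isinstance(value, str) else json.dumps(value, default=str)
    if len(text) <= limit:
        return value  # small enough: pass through unchanged
    return text[:limit] + "...[truncated]"
```

Truncating before compression keeps the post-compression message safely under the SQS message-size limit even for pathological payloads.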

### 8. Token Pricing & Cost Calculation

The SDK includes built-in AWS Bedrock token pricing for cost analysis:

```python
from llmops_observability.pricing import calculate_cost

# Calculate cost for a single LLM call
cost = calculate_cost(
    model_id="anthropic.claude-3-sonnet-20240229-v1:0",
    input_tokens=1500,
    output_tokens=800
)

print(f"Cost: ${cost:.4f}")  # Exact value depends on the rates in pricing.py

# Analyze costs by model
models_used = {
    "anthropic.claude-3-sonnet-20240229-v1:0": {
        "input_tokens": 10000,
        "output_tokens": 5000
    },
    "anthropic.claude-3-haiku-20240307-v1:0": {
        "input_tokens": 20000,
        "output_tokens": 10000
    }
}

total_cost = sum(
    calculate_cost(model, data["input_tokens"], data["output_tokens"])
    for model, data in models_used.items()
)
print(f"Total cost: ${total_cost:.4f}")
```

**Supported Models:**
- Claude 3.5 Sonnet (all variants)
- Claude 3 Sonnet/Opus/Haiku
- Claude 2.1 & 2.0
- Amazon Titan Text (Express, Lite)
- Cohere Command
- AI21 Jurassic
- Meta Llama 2 & 3

**Pricing Reference:**
Prices are current as of 2024 and reflect official AWS Bedrock pricing. Update the pricing table in [src/llmops_observability/pricing.py](src/llmops_observability/pricing.py) as rates change.
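
The table in pricing.py is roughly a model-to-rates mapping; the sketch below is illustrative only (the per-1K-token rates are placeholders, not the SDK's actual values):

```python
# Hypothetical per-1K-token USD rates; the real values live in pricing.py.
PRICING = {
    "anthropic.claude-3-sonnet-20240229-v1:0": {"input": 0.003, "output": 0.015},
    "anthropic.claude-3-haiku-20240307-v1:0": {"input": 0.00025, "output": 0.00125},
}

def calculate_cost(model_id: str, input_tokens: int, output_tokens: int) -> float:
    """Cost = tokens scaled to thousands, multiplied by the model's rates."""
    rates = PRICING.get(model_id)
    if rates is None:
        return 0.0  # unknown model: no cost attribution
    return (input_tokens / 1000) * rates["input"] + (output_tokens / 1000) * rates["output"]
```

Keeping the table as plain data makes price updates a one-line change with no logic edits.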

### 9. Async Support

```python
@track_function()
async def async_process(data):
    return await some_async_operation(data)

@track_llm_call(name="summarize")
async def async_llm_call(text):
    return await chain.ainvoke({"text": text})

# Both sync and async work seamlessly
```

### Per-Application Configuration

Each Gen AI application using this SDK should have **its own Langfuse project and credentials**. This ensures proper isolation and organization.

#### Step 1: Create Langfuse Project
1. Go to your Langfuse instance
2. Create a new project for your application (e.g., "chatbot-api", "doc-analyzer")
3. Copy the project's public key, secret key, and base URL

#### Step 2: Configure in Your Application

**Method 1: Environment Variables** (Recommended for production)

```bash
# .env file in your application root
LANGFUSE_PUBLIC_KEY=pk-lf-abc123...
LANGFUSE_SECRET_KEY=sk-lf-xyz789...
LANGFUSE_BASE_URL=https://langfuse.company.com
LANGFUSE_VERIFY_SSL=false
```

```python
from llmops_observability import TraceManager
from dotenv import load_dotenv

load_dotenv()  # Loads .env from current directory
# SDK auto-configures from environment variables
```

**Method 2: Explicit Configuration** (Recommended for testing)

```python
from llmops_observability import configure
import os

# At application startup (e.g., main.py)
configure(
    public_key=os.getenv("LANGFUSE_PUBLIC_KEY"),
    secret_key=os.getenv("LANGFUSE_SECRET_KEY"),
    base_url=os.getenv("LANGFUSE_BASE_URL"),
    verify_ssl=False
)
```

### Environment Variables Reference

| Variable | Required | Default | Description |
|----------|----------|---------|-------------|
| `LANGFUSE_PUBLIC_KEY` | Yes | None | Langfuse public key from your project |
| `LANGFUSE_SECRET_KEY` | Yes | None | Langfuse secret key from your project |
| `LANGFUSE_BASE_URL` | Yes | None | Langfuse instance URL |
| `LANGFUSE_VERIFY_SSL` | No | `false` | Whether to verify SSL certificates |
| `PROJECT_ID` | No | `unknown_project` | Project identifier (used as trace name in Langfuse) |
| `ENV` | No | `development` | Environment name (production, staging, development, etc.) - automatically mapped to `LANGFUSE_TRACING_ENVIRONMENT` |
| `MODEL_ID` | No | `anthropic.claude-3-5-sonnet-20241022-v2:0` | Default model ID for cost calculation when not explicitly provided |
| `AWS_SQS_URL` | No | None | AWS SQS queue URL (when provided, enables SQS event streaming) |
| `AWS_PROFILE` | No | `default` | AWS profile name for SQS authentication |
| `AWS_REGION` | No | `us-east-1` | AWS region for SQS |
| `LANGFUSE_DEBUG` | No | `false` | Enable debug logging for Langfuse client |

**Environment Tracking:**
- The `ENV` variable is automatically mapped to Langfuse's `LANGFUSE_TRACING_ENVIRONMENT`
- This applies the environment as a top-level attribute to all traces and observations
- Allows easy filtering by environment in Langfuse UI
- Must follow regex: `^(?!langfuse)[a-z0-9-_]+$` with max 40 characters
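
That naming rule can be validated up front before starting traces; a small sketch using the documented regex:

```python
import re

# Regex and 40-character limit per the Langfuse environment constraint above.
ENV_PATTERN = re.compile(r"^(?!langfuse)[a-z0-9-_]+$")

def validate_environment(env: str) -> bool:
    """Return True if `env` satisfies the environment naming rule."""
    return len(env) <= 40 and ENV_PATTERN.match(env) is not None

assert validate_environment("production")
assert not validate_environment("langfuse-internal")  # reserved prefix
assert not validate_environment("Staging")            # uppercase not allowed
```
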

### `@track_function` Decorator

Track regular function execution with optional local variable capture.

```python
@track_function()
def my_function(x, y):
    return x + y

@track_function(name="custom_name", tags={"version": "1.0"})
def another_function():
    pass

# Capture all local variables for debugging
@track_function(capture_locals=True)
def debug_function(data):
    step1 = process(data)
    step2 = transform(step1)
    return step2  # All locals captured in Langfuse

# Capture specific variables only
@track_function(capture_locals=["result", "important_var"])
def selective_function(input):
    temp = input * 2  # Not captured
    result = temp + 10  # Captured
    important_var = compute(result)  # Captured
    return important_var
```

**Parameters:**
- `name`: Custom span name (default: function name)
- `tags`: Dictionary of tags/metadata
- `capture_locals`: Capture local variables - `True` (all), `False` (none), or list of variable names
- `capture_self`: Whether to capture `self` in methods (default: `True`)

## API Reference

### TraceManager

#### `start_trace(name, project_id=None, environment=None, metadata=None, user_id=None, session_id=None, tags=None)`
Start a new trace with project and environment tracking.

```python
TraceManager.start_trace(
    name="chat_message",  # Operation name (required)
    project_id="my_project",  # Optional: defaults to PROJECT_ID env var
    environment="production",  # Optional: defaults to ENV env var
    metadata={"custom": "data"},
    user_id="user_123",
    session_id="session_456",
    tags=["experiment"]
)
```

**Parameters:**
- `name` (required): Operation/trace name (e.g., "chat_message", "document_analysis")
- `project_id` (optional): Project identifier. Defaults to `PROJECT_ID` from `.env`. Used as trace name in Langfuse.
- `environment` (optional): Environment name (e.g., "production", "staging"). Defaults to `ENV` from `.env`. Automatically mapped to `LANGFUSE_TRACING_ENVIRONMENT`.
- `metadata` (optional): Custom metadata dictionary
- `user_id` (optional): User identifier
- `session_id` (optional): Session identifier
- `tags` (optional): List of tags

**Returns:** Trace ID (string)

**Example with .env auto-loading:**
```bash
# .env file
PROJECT_ID=chatbot-api
ENV=production
```

```python
# Automatically uses PROJECT_ID and ENV from .env
TraceManager.start_trace(
    name="user_query",
    metadata={"version": "2.0"}
)
# Trace name in Langfuse: "chatbot-api"
# Environment in Langfuse: "production"
```

#### `end_trace()`
End the current trace and flush to Langfuse.

```python
TraceManager.end_trace()
```

#### `finalize_and_send(user_id, session_id, trace_name, trace_input, trace_output)`
Finalize and send the trace with input/output metadata.

This is a convenience method that combines setting trace metadata and ending the trace in one call.

```python
TraceManager.start_trace(name="chat_message")

# ... your code executes ...

# Finalize with input/output details
TraceManager.finalize_and_send(
    user_id="user_123",
    session_id="session_456",
    trace_name="bedrock_chat_message",
    trace_input={"user_msg": "What is Python?"},
    trace_output={"bot_response": "Python is a programming language..."}
)
```

**Parameters:**
- `user_id`: User identifier
- `session_id`: Session identifier
- `trace_name`: Name for the trace (can override the initial name)
- `trace_input`: Dictionary containing the input data
- `trace_output`: Dictionary containing the output/response data

#### `end_trace()` vs `finalize_and_send()` - When to Use?

| Method | Purpose | When to Use | Example |
|--------|---------|------------|---------|
| `end_trace()` | Simply close trace, flush to Langfuse | Simple operations without trace-level input/output | Process data, internal workflows |
| `finalize_and_send()` | Close trace + capture end-to-end input/output | When you want full conversation/request visibility | User query → Bot response, LLM interactions |

**Code Comparison:**

```python
# Simple: Just close the trace
TraceManager.start_trace(name="chat_message")
result = process_data("some data")
llm_response = call_bedrock("Hello, world!")
TraceManager.end_trace()
# → Individual spans are captured, but no trace-level input/output
```

```python
# Full Visibility: Capture entire flow
TraceManager.start_trace(name="chat_session")
user_input = "What is machine learning?"
response = await llm.generate(user_input)
TraceManager.finalize_and_send(
    user_id="user_123",
    session_id="session_456",
    trace_name="chat_message",
    trace_input={"user_msg": user_input},        # ← What went in
    trace_output={"bot_response": str(response)} # ← What came out
)
# → Both span-level AND trace-level input/output captured for complete visibility
```

**In Langfuse UI:**
- `end_trace()`: Shows individual function spans with their inputs/outputs
- `finalize_and_send()`: Shows complete conversation flow + individual spans

### Decorators

Use these decorators together for agentic GenAI apps:

- `@track_function(...)` for non-LLM business logic and helper spans
- `@track_llm_call(...)` for model generation calls (`span_type = generation`)
- `@track_llm_agent(...)` for agent orchestrator loops (`span_type = agent`)
- `@track_llm_tool(...)` for tool execution and dispatch (`span_type = tool`)

#### `@track_function(name=None, metadata=None, ...)`
Track regular function execution.

```python
@track_function(name="retrieve", metadata={"component": "vector_search"})
def retrieve(query: str):
    return search_index(query)
```

#### `@track_llm_call(name=None, model=None, metadata=None, ...)`
Track LLM generation calls with automatic model and cost tracking.

```python
@track_llm_call(name="bedrock_converse", model="anthropic.claude-3-sonnet-20240229-v1:0")
def call_llm(messages):
    return bedrock.converse(
        modelId="anthropic.claude-3-sonnet-20240229-v1:0",
        messages=messages,
    )
```

#### `@track_llm_agent(name=None, metadata=None, ...)`
Track agent orchestrator spans (planner/loop/controller).

```python
@track_llm_agent(name="agent_loop", metadata={"component": "agent_orchestrator"})
def agent_loop(user_query: str):
    # planning + tool routing + iteration control
    ...
```

#### `@track_llm_tool(name=None, metadata=None, ...)`
Track tool execution spans.

```python
@track_llm_tool(name="handle_tool_call", metadata={"component": "tool_dispatch"})
def handle_tool_call(tool_name: str, tool_input: dict):
    return TOOL_REGISTRY[tool_name](**tool_input)
```

### Agentic Integration Pattern (Recommended)

```python
from llmops_observability import (
    TraceManager,
    track_function,
    track_llm_call,
    track_llm_agent,
    track_llm_tool,
)

@track_llm_tool(name="retrieve_context")
def retrieve_context_tool(query: str):
    return retrieve(query)

@track_llm_call(name="bedrock_converse")
def call_llm(messages, tools):
    return bedrock.converse(modelId="...", messages=messages, toolConfig={"tools": tools})

@track_llm_agent(name="agent_loop")
def agent_loop(user_query: str):
    # calls call_llm(...), handles tool_use, invokes retrieve_context_tool(...)
    ...
```

### Migration Notes for Existing Apps

1. Keep `@track_llm_call` on model invocation functions only.
2. Move top-level orchestrator functions from `@track_function` to `@track_llm_agent`.
3. Mark tool implementations and tool dispatch functions with `@track_llm_tool`.
4. Keep retrieval/parsing/helper utilities under `@track_function`.

This gives clear span types in observability backends and improves agent graph readability for debugging and performance analysis.

## Advanced Features

### Nested Spans & Parent-Child Relationships

The SDK automatically handles nested function calls, creating parent-child relationships in Langfuse:

```python
@track_function(name="orchestrator")
def main_workflow(user_query):
    # This is the parent span
    context = retrieve_documents(user_query)  # Child span 1
    answer = generate_response(user_query, context)  # Child span 2
    return answer

@track_function(name="retrieval")
def retrieve_documents(query):
    # This becomes a child of main_workflow
    return db.search(query)

@track_function(name="generation")
def generate_response(query, context):
    # This also becomes a child of main_workflow
    return llm.generate(query, context)
```

### Data Size Management

The SDK automatically limits captured output to **200 KB** to keep trace payloads within memory and transport limits:

- Outputs larger than 200 KB are truncated, and truncation metadata is recorded on the span
- A preview of the first ~1 KB is retained so the data remains inspectable
- This prevents memory and network issues with large responses

Finer-grained per-field limits are listed under "Size Limits & Truncation" below.
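The truncation behavior above can be sketched as a small helper. This is a hypothetical illustration of the logic, not the SDK's actual implementation; the function name and return shape are assumptions.

```python
import json

MAX_OUTPUT_BYTES = 200 * 1024  # 200 KB cap described above
PREVIEW_CHARS = 1024           # ~1 KB preview retained

def limit_output(output) -> dict:
    """Hypothetical sketch: truncate oversized outputs, keep a preview."""
    serialized = json.dumps(output, default=str)
    size = len(serialized.encode("utf-8"))
    if size <= MAX_OUTPUT_BYTES:
        # Small enough: pass through unchanged
        return {"output": output}
    # Too large: drop the payload, keep size info and a short preview
    return {
        "truncated": True,
        "original_size_bytes": size,
        "preview": serialized[:PREVIEW_CHARS],
    }
```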

### ASGI Middleware for FastAPI

Automatically trace all HTTP requests:

```python
from fastapi import FastAPI
from llmops_observability import LLMOpsASGIMiddleware, track_function

app = FastAPI()
app.add_middleware(LLMOpsASGIMiddleware, service_name="chatbot_api")

@app.post("/chat")
async def chat_endpoint(message: str):
    # Entire request is automatically traced
    response = process_message(message)
    return {"response": response}

@track_function()
def process_message(msg):
    # This becomes a child span of the HTTP request trace
    return "Response"
```

The middleware captures:
- Request method, path, headers
- Response status code
- Request duration
- User agent, client IP
- Automatic trace naming: `{project}_{hostname}`
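The `{project}_{hostname}` convention can be illustrated with a short snippet; the fallback value for `PROJECT_ID` here is hypothetical.

```python
import os
import socket

# PROJECT_ID comes from the environment (see Configuration); the
# fallback value is illustrative only.
project = os.environ.get("PROJECT_ID", "chatbot_api")

# Trace name combines project and the machine's hostname
trace_name = f"{project}_{socket.gethostname()}"
```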

## Project Structure

```
llmops-observability_sdk/
├── src/
│   └── llmops_observability/
│       ├── __init__.py                # Public API & exports
│       ├── config.py                  # Langfuse client + SQS configuration
│       ├── trace_manager.py           # Core TraceManager class & @track_function decorator
│       ├── llm.py                     # @track_llm_call decorator with LLM response parsing
│       ├── models.py                  # SpanContext, TraceConfig data models
│       ├── asgi_middleware.py         # FastAPI/Starlette ASGI middleware
│       ├── sqs.py                     # Production SQS sender with batching & spillover
│       └── pricing.py                 # AWS Bedrock token pricing calculator
├── pyproject.toml                     # Project metadata & dependencies
└── README.md                          # This file
```

**Module Details:**

- **config.py**: Manages Langfuse client initialization and SQS configuration
- **trace_manager.py**: Core orchestration - handles trace lifecycle, nested spans, Langfuse API calls
- **llm.py**: LLM call decorator with support for 10+ LLM provider response formats
- **sqs.py**: Production-grade SQS integration with 4 daemon workers, batching, spillover recovery
- **pricing.py**: Token cost calculator for 15+ AWS Bedrock model variants
- **asgi_middleware.py**: Automatic HTTP request tracing for FastAPI applications

## Architecture

### Direct Langfuse Mode (Default)

```
Application
    ↓
TraceManager
    ↓
Langfuse (Real-time)
```

Traces are sent immediately to Langfuse with no intermediate storage or batching.

### SQS Event Streaming Mode (Event-Driven)

```
Application
    ↓
TraceManager → SQS Events (Batched)
                    ↓
                Lambda Functions
                    ↓ ↓ ↓
              S3  New Relic  Datadog  (etc.)
```

When `AWS_SQS_URL` is set:
- Application sends trace events to SQS asynchronously
- Main application thread is never blocked
- Lambda functions or other services consume events from SQS
- Events forwarded to S3, New Relic, Datadog, or custom processors
- Failed sends are saved to spillover file on disk for recovery
- 4 daemon worker threads handle all SQS operations independently
- Automatic cleanup on application shutdown

## Best Practices

### 1. Configuration Management
- ✅ **Each application gets its own `.env` file** with unique Langfuse credentials
- ✅ Use `.gitignore` to exclude `.env` files from version control
- ✅ Call `configure()` at application startup before any tracing
- ❌ Never hardcode credentials in the SDK or application code

### 2. Trace Organization
```python
# Good: Descriptive trace names with context
TraceManager.start_trace(
    name="document_analysis_pipeline",
    user_id=user_id,
    session_id=session_id,
    metadata={"doc_type": "pdf", "version": "2.0"},
    tags=["production", "critical"]
)

# Bad: Generic names without context
TraceManager.start_trace(name="process")
```

### 3. Local Variables Capture
```python
# Use for debugging only - has performance impact
@track_function(capture_locals=True)  # Development
def debug_complex_logic(data):
    # All locals captured
    pass

# Production: disable locals capture entirely
@track_function(capture_locals=False)
def process_payment(data):
    ...

# Or capture only selected variables
@track_function(capture_locals=["final_result"])
def compute_result(data):
    ...
```

### 4. Always End Traces
```python
TraceManager.start_trace(name="workflow")
try:
    result = process()
    return result
finally:
    TraceManager.end_trace()  # Always flush, even on error
```

### 5. Trace Naming Convention
- **Trace Name (in Langfuse)**: Uses `PROJECT_ID` for easy project identification
- **Operation Name**: The `name` parameter describes what operation is being traced
- **Environment**: Tracked automatically from `ENV` variable

```python
# Example:
# .env: PROJECT_ID=payment-service, ENV=production

TraceManager.start_trace(name="process_payment")
# In Langfuse UI:
#   - Trace Name: "payment-service"
#   - Environment: "production"
#   - Operation: "process_payment" (in metadata)
```

## 📦 SQS Message Schema

### Message Wrapper (What SDK Sends to SQS)

```json
{
  "compressed": true,
  "data": "H4sIAAAAAAAC/+1Y...",
  "trace_id": "87a41b12-cc61-4fdf-9bf2-a50a369b4d30",
  "type": "trace"
}
```

**Decompression Steps:**
1. Base64 decode the `data` field → binary gzip data
2. Gzip decompress → JSON string  
3. JSON parse → Complete trace data
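For reference, the sending side is the inverse of these steps. The helper below is a hypothetical sketch that mirrors the wrapper format above; it is not the SDK's internal function.

```python
import base64
import gzip
import json

def compress_trace(trace_data: dict, trace_id: str) -> dict:
    """Build the SQS message wrapper shown above (illustrative sketch)."""
    raw = json.dumps(trace_data).encode("utf-8")
    return {
        "compressed": True,
        # JSON → gzip → base64, the reverse of the decompression steps
        "data": base64.b64encode(gzip.compress(raw)).decode("ascii"),
        "trace_id": trace_id,
        "type": "trace",
    }
```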

### Complete Trace Data (Decompressed)

```json
{
  "trace_id": "87a41b12-cc61-4fdf-9bf2-a50a369b4d30",
  "trace_name": "rag_pipeline_example",
  "project_id": "new_test",
  "environment": "uat",
  "user_id": "user_123",
  "session_id": "session_456",
  
  "start_time": 1769446311.0,
  "end_time": 1769446318.021,
  "duration_ms": 7021,
  
  "trace_input": {"user_msg": "What is Android ????"},
  "trace_output": {"bot_response": "Android is a mobile operating system..."},
  
  "spans": [
    {
      "span_id": "64a2a265-017e-4af1-bf49-15c3dd51e2fd",
      "span_name": "retrieve_context",
      "span_type": "span",
      "parent_span_id": null,
      
      "start_time": 1769446311.0,
      "end_time": 1769446312.0,
      "duration_ms": 1000,
      
      "input_data": {
        "args": ["What is Android ????"],
        "kwargs": {},
        "locals": {}
      },
      
      "output_data": {"output": {"documents": ["Doc 1", "Doc 2"]}},
      
      "error": null,
      "model_id": null,
      
      "metadata": {
        "environment": "uat",
        "project_id": "new_test"
      },
      
      "tags": [],
      "usage": null,
      "prompt": null,
      "response": null,
      "status": "success",
      "status_message": null,
      "level": "DEFAULT"
    },
    {
      "span_id": "6ba7b810-9dad-11d1-80b4-00c04fd430c8",
      "span_name": "call_llm",
      "span_type": "generation",
      "parent_span_id": null,
      
      "start_time": 1769446312.0,
      "end_time": 1769446316.0,
      "duration_ms": 4000,
      
      "input_data": {
        "args": [],
        "kwargs": {"prompt": "Context: Doc 1, Doc 2\n\nQuestion: What is Android ????"},
        "locals": {}
      },
      
      "output_data": {"output": {"message": {"content": "Android is a mobile operating system..."}}},
      
      "error": null,
      "model_id": "anthropic.claude-3-sonnet-20240229-v1:0",
      
      "metadata": {
        "environment": "uat",
        "project_id": "new_test"
      },
      
      "tags": [],
      
      "usage": {
        "input_tokens": 145,
        "output_tokens": 87,
        "total_tokens": 232
      },
      
      "prompt": "Context: Doc 1, Doc 2\n\nQuestion: What is Android ????",
      "response": "Android is a mobile operating system developed by Google...",
      
      "status": "success",
      "status_message": null,
      "level": "DEFAULT"
    }
  ],
  
  "metadata": {"version": "1.0.0"},
  "tags": ["example", "rag"],
  
  "total_spans": 2,
  "total_generations": 1,
  
  "sdk_name": "llmops-observability",
  "sdk_version": "2.0.0"
}
```

### Field Reference

**Trace Level:**

| Field | Auto-Injected | Description |
|-------|:-------------:|-------------|
| `trace_id` | ✅ | UUID generated on `start_trace()` |
| `trace_name` | ✅ | Operation name from `start_trace()` |
| `project_id` | ✅ | From `PROJECT_ID` env var |
| `environment` | ✅ | From `ENV` env var |
| `user_id` | ❌ | From `start_trace()` or `finalize_and_send()` |
| `session_id` | ❌ | From `start_trace()` or `finalize_and_send()` |
| `start_time` | ✅ | Unix timestamp (seconds) |
| `end_time` | ✅ | Unix timestamp (seconds) |
| `duration_ms` | ✅ | Calculated: `(end_time - start_time) * 1000` |
| `trace_input` | ❌ | From `finalize_and_send()` |
| `trace_output` | ❌ | From `finalize_and_send()` |
| `spans` | ✅ | Array of span objects |
| `total_spans` | ✅ | Count of all spans |
| `total_generations` | ✅ | Count of spans with `span_type == "generation"` |
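As a quick sanity check, the `duration_ms` formula reproduces the value in the example trace payload above:

```python
# Values taken from the example trace payload
start_time = 1769446311.0
end_time = 1769446318.021

duration_ms = round((end_time - start_time) * 1000)
# → 7021, matching duration_ms in the example
```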

**Span Level:**

| Field | Auto-Injected | Description |
|-------|:-------------:|-------------|
| `span_id` | ✅ | UUID for span |
| `span_name` | ✅ | Function name or custom name |
| `span_type` | ✅ | "span" or "generation" |
| `parent_span_id` | ✅ | Parent span ID (null for root) |
| `duration_ms` | ✅ | Execution time |
| `input_data` | ✅ | Function args, kwargs, locals |
| `output_data` | ✅ | Return value |
| `model_id` | ❌ | From `@track_llm_call(model=...)` |
| `usage` | ✅ | Token counts (generation spans only) |
| `prompt` | ✅ | Prompt text (generation spans only) |
| `response` | ✅ | Response text (generation spans only) |
| `metadata.environment` | ✅ | **Auto-injected from ENV** |
| `metadata.project_id` | ✅ | **Auto-injected from PROJECT_ID** |
| `status` | ✅ | "success" or "error" |

### Size Limits & Truncation

| Field | Limit | Behavior |
|-------|-------|----------|
| `trace_input` | 50 KB | Truncated with preview if exceeded |
| `trace_output` | 50 KB | Truncated with preview if exceeded |
| `span.input_data` | 20 KB | Truncated with preview if exceeded |
| `span.output_data` | 20 KB | Truncated with preview if exceeded |
| `span.prompt` | 10 KB | Truncated with preview if exceeded |
| `span.response` | 10 KB | Truncated with preview if exceeded |
| **Total Message** | 200 KB | Aggressive truncation applied |

### Lambda Decompression (Reference)

```python
import json
import base64
import gzip

def decompress_sqs_message(message_body: str) -> dict:
    """Decompress SDK trace data from SQS message."""
    sqs_message = json.loads(message_body)
    
    if not sqs_message.get("compressed"):
        return sqs_message
    
    # Decompress
    compressed_data = base64.b64decode(sqs_message['data'])
    decompressed = gzip.decompress(compressed_data)
    trace_data = json.loads(decompressed)
    
    return trace_data
```
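A sketch of a Lambda entry point that applies this helper to a batch of SQS records. The handler name and fan-out step are illustrative; the helper is repeated here so the sketch is self-contained.

```python
import json
import base64
import gzip

def decompress_sqs_message(message_body: str) -> dict:
    """Same helper as the reference above, repeated for self-containment."""
    sqs_message = json.loads(message_body)
    if not sqs_message.get("compressed"):
        return sqs_message
    return json.loads(gzip.decompress(base64.b64decode(sqs_message["data"])))

def lambda_handler(event, context):
    """Process a batch of SQS records delivered to Lambda."""
    traces = []
    for record in event.get("Records", []):
        trace = decompress_sqs_message(record["body"])
        traces.append(trace)
        # Fan out here: write to S3, forward to New Relic/Datadog, etc.
    return {"processed": len(traces)}
```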

---

## When to Use This SDK

✅ **Use llmops-observability when:**

**Development & Testing:**
- Developing and testing LLM applications locally
- Need quick debugging with local variable capture
- Want instant trace visibility in Langfuse (no delays)
- Simple, straightforward tracing without infrastructure setup

**Production Deployments:**
- Small to medium-scale with direct Langfuse integration
- Enterprise event-driven architectures with SQS + Lambda + S3
- Multi-destination observability (S3, New Relic, Datadog, custom systems)
- Centralized observability across multiple LLM applications
- Token cost tracking and analysis
- Compliance/audit: archive all LLM interactions with full traceability

**Common Use Cases:**
- RAG (Retrieval Augmented Generation) systems
- LLM-powered APIs and microservices
- Chat applications and conversational AI
- Document analysis and processing pipelines
- Real-time LLM inference monitoring
- Multi-step LLM workflows with nested tracking

**Key Advantages:**
- ✨ No external dependencies for basic tracing (Direct Langfuse mode)
- 🚀 Optional SQS integration for enterprise deployments
- 🔄 Automatic nested span tracking for complex workflows
- 💰 Built-in token cost calculation
- 🛡️ Production-ready with daemon workers and spillover recovery


## Troubleshooting

### Configuration Errors

**Error: "Langfuse not configured"**
```python
# Solution: Ensure env vars are set or call configure()
from dotenv import load_dotenv
load_dotenv()  # Load .env file

# Or configure explicitly
from llmops_observability import configure
configure(public_key="...", secret_key="...", base_url="...")
```

### Trace Not Appearing in Langfuse

1. Check that `TraceManager.end_trace()` is called
2. Verify credentials are correct
3. Check Langfuse URL is accessible
4. Look for error messages in console output

### SSL Certificate Issues

```python
# Disable SSL verification when using self-signed certs (avoid in production)
configure(
    public_key="...",
    secret_key="...",
    base_url="...",
    verify_ssl=False  # ← Disable SSL verification
)
```

## Version History

**v8.0.0** (Current) - Production-Ready Enterprise Release
- ✨ **Dual-Mode Tracing**: Direct Langfuse integration + optional SQS event streaming
- 🎯 **SQS Event Streaming**: Production-grade AWS SQS sender with:
  - Automatic batching for efficiency
  - Spillover recovery to disk
  - 4 daemon worker threads
  - Clean shutdown support
- 💰 **Token Pricing**: AWS Bedrock cost calculator for 15+ model variants
- 🪆 **Nested Spans**: Automatic parent-child relationship tracking
- 🔍 **Locals Capture**: Function local variable capture for debugging
- 🌐 **ASGI Middleware**: FastAPI/Starlette auto-tracing
- 📊 **Smart Serialization**: 200KB automatic data size limits
- 🔄 **Sync & Async**: Full async/await support
- 🛡️ **Resilient**: Auto-restart failed workers, graceful shutdown

## License

Proprietary - Verisk Analytics

## Contributing

Internal SDK - For questions or contributions, contact the LLMOps team.

## Example: Complete Workflow

```python
from llmops_observability import TraceManager, track_function, track_llm_call
import boto3

# Initialize Bedrock client
bedrock = boto3.client("bedrock-runtime", region_name="us-east-1")

@track_function()
def retrieve_context(query):
    # Simulate RAG retrieval
    return {"documents": ["Context doc 1", "Context doc 2"]}

@track_llm_call()
def generate_answer(prompt, context):
    response = bedrock.converse(
        modelId="anthropic.claude-3-sonnet-20240229-v1:0",
        messages=[{
            "role": "user",
            "content": f"Context: {context}\n\nQuestion: {prompt}"
        }]
    )
    return response

# Start trace
TraceManager.start_trace(
    name="rag_pipeline",
    user_id="user_123",
    metadata={"pipeline": "v1"}
)

# Execute workflow
context = retrieve_context("What is Python?")
answer = generate_answer("What is Python?", context)

# End trace
TraceManager.end_trace()
```

## Thanks to
Verisk LLMOps Team ❤️
