Metadata-Version: 2.4
Name: x64acp-server
Version: 0.2.0a5
Summary: x64acp Agent Communication Protocol (ACP)
Project-URL: Repository, https://github.com/0x0064/x64acp
Author-email: 0x0064 <user.frndvrgs@gmail.com>
License-Expression: MIT
Keywords: acp,assistant,fastapi,sdk,socket.io,x64acp
Requires-Python: >=3.11
Requires-Dist: asyncpg>=0.31.0
Requires-Dist: pydantic>=2.12.5
Requires-Dist: python-socketio[asgi]<6,>=5.13.0
Provides-Extra: dev
Requires-Dist: aiohttp>=3.11.0; extra == 'dev'
Requires-Dist: build>=1.2.2; extra == 'dev'
Requires-Dist: fastapi>=0.117.0; extra == 'dev'
Requires-Dist: httpx>=0.28.1; extra == 'dev'
Requires-Dist: mypy>=1.19.1; extra == 'dev'
Requires-Dist: poethepoet>=0.42.1; extra == 'dev'
Requires-Dist: pytest-asyncio>=1.3.0; extra == 'dev'
Requires-Dist: pytest-cov>=6.0.0; extra == 'dev'
Requires-Dist: pytest>=9.0.2; extra == 'dev'
Requires-Dist: ruff>=0.15.7; extra == 'dev'
Requires-Dist: uvicorn>=0.32.0; extra == 'dev'
Description-Content-Type: text/markdown

# x64acp-server

Python server SDK for the **x64acp Assistant Communication Protocol** — a small, opinionated, business-agnostic communication layer for assistant-driven threads where multiple users and multiple assistants interact in shared rooms.

Pairs with [`@x64acp/client`](https://www.npmjs.com/package/@x64acp/client) (React). A Node mirror (`@x64acp/server`) is in progress.

```python
from fastapi import FastAPI
import asyncpg
from x64acp import AcpServer, HandshakeData, PostgresThreadStore, TextPart, UserIdentity

async def authenticate(handshake: HandshakeData) -> UserIdentity | None:
    token = handshake.headers.get("authorization", "").removeprefix("Bearer ")
    user = await lookup_user(token)  # your code
    if user is None:
        return None
    return UserIdentity(
        id=user.id,
        name=user.name,
        metadata={"tenant": {"organization": user.org_id}},
    )

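# NOTE: top-level `await` is shown for brevity; in a real module, create the pool
# inside an async startup hook (for example a FastAPI lifespan handler).
pool = await asyncpg.create_pool("postgresql://localhost/myapp")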
acp = AcpServer(store=PostgresThreadStore(pool=pool), authenticate=authenticate)

@acp.assistant("helper")
async def helper(ctx, send):
    history = await ctx.events()
    yield send.reasoning("thinking about the request")
    yield send.tool_call(name="search", arguments={"q": "weather"}, id="call_1")
    yield send.tool_result(tool_id="call_1", result={"temp": 72})
    yield send.message(content=[TextPart(text="It's 72°F.")])

app = FastAPI()
app.state.acp = acp
app.include_router(acp.router, prefix="/acp")
asgi = acp.mount_socketio(app)
# Run with: uvicorn yourmodule:asgi --host 0.0.0.0 --port 8000
```

## Installation

```bash
pip install x64acp-server            # or: uv add x64acp-server
```

Requires Python 3.11+ and a Postgres instance. A Docker Compose fixture is included for local development (`docker compose -f docker-compose.test.yml up -d`).

## Primitives

The SDK ships exactly four primitives. Everything else — auth, tenancy, assistant routing, LLM calls, file storage, form rendering — is consumer territory.

| Primitive | What it is |
|---|---|
| **Identity** | `UserIdentity` / `AssistantIdentity` / `SystemIdentity`, each `{ id, name, metadata }`. `metadata` is opaque. |
| **Thread** | Ordered event log. Not owned; members join explicitly. 1:1 to a Socket.IO room. |
| **Event** | Discriminated union: `message`, `reasoning`, `tool.call`, `tool.result`, `thread.*`, `run.*` |
| **Run** | Assistant execution lifecycle (`pending → running → completed/failed/cancelled`) |

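The Run lifecycle in the table above can be read as a small state machine. The sketch below is illustrative only: `ALLOWED` and `can_transition` are hypothetical names, not part of the SDK, and the transition table assumes the only legal moves are the ones the arrow notation spells out.

```python
# Hypothetical illustration of the Run lifecycle; not the SDK's implementation.
TERMINAL = frozenset({"completed", "failed", "cancelled"})

# Assumed transition table, read directly off
# `pending → running → completed/failed/cancelled`.
ALLOWED: dict[str, frozenset[str]] = {
    "pending": frozenset({"running"}),
    "running": TERMINAL,
}


def can_transition(current: str, target: str) -> bool:
    """Return True if `target` is a legal next status after `current`."""
    return target in ALLOWED.get(current, frozenset())
```

Terminal statuses have no entry in `ALLOWED`, so any transition out of them is rejected.
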
## `AcpServer` Constructor

```python
AcpServer(
    *,
    store,                              # ThreadStore (PostgresThreadStore provided)
    authenticate,                       # async (HandshakeData) -> Identity | None
    authorize=None,                     # async (Identity, thread_id, action) -> bool
    on_analytics=None,                  # async (list[AnalyticsEvent]) -> None
    namespace_keys=None,                # list[str] | None — see Hardening
    run_timeout_seconds=120,
    replay_cap=500,
    broadcaster=None,                   # wired automatically by mount_socketio
)
```

| Param | Required | Purpose |
|---|---|---|
| `store` | ✅ | Thread/event/run/member persistence |
| `authenticate` | ✅ | Called once per WS handshake and per REST request. Returns `Identity \| None`. **SDK never inspects credentials.** |
| `authorize` | — | Runtime policy on top of tenant match + membership. Return `False` to deny. |
| `on_analytics` | — | Sidecar telemetry flush hook. Wire to PostHog / Segment / warehouse. |
| `namespace_keys` | — | Opt-in Socket.IO namespace isolation for paranoid multi-tenant SaaS. |
| `run_timeout_seconds` | — | Per-run handler timeout. Default 120. |
| `replay_cap` | — | Max events replayed on `thread:join` with a `since` cursor. Default 500. |

### Mounting onto FastAPI

```python
app = FastAPI()
app.state.acp = acp                       # required — REST deps read from here
app.include_router(acp.router, prefix="/acp")
asgi = acp.mount_socketio(app)            # returns wrapped ASGI app
```

Run the `asgi` object with uvicorn (or any ASGI server). REST and Socket.IO share the same HTTP port.

## Authorization Actions

If `authorize` is provided, it is called with one of these action strings **after** the tenant match and membership checks pass. Return `False` to deny. When no callback is provided (`authorize=None`), every action that passes those checks is allowed.

| Action | When |
|---|---|
| `thread.read` | GET thread / events / members / run, `thread:join` |
| `thread.update` | PATCH thread (including tenant changes) |
| `thread.delete` | DELETE thread |
| `message.send` | POST messages, `message:send` |
| `assistant.invoke` | POST invocations, `assistant:invoke` |
| `member.add` | POST members |
| `member.remove` | DELETE members/:id |
| `run.cancel` | DELETE runs/:id, `run:cancel` |

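As a minimal sketch of what an `authorize` callback could look like, the example below denies destructive actions unless the caller is an admin. `FakeIdentity` is a stand-in for the SDK's identity models so the snippet runs on its own, and the `"role"` metadata key is an assumption about the consuming application, not something the SDK defines.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any


@dataclass
class FakeIdentity:
    """Stand-in for an SDK identity: `{ id, name, metadata }`."""
    id: str
    name: str
    metadata: dict[str, Any] = field(default_factory=dict)


# Assumption: the application treats these two actions as destructive.
DESTRUCTIVE_ACTIONS = frozenset({"thread.delete", "member.remove"})


async def authorize(identity: FakeIdentity, thread_id: str, action: str) -> bool:
    """Allow everything except destructive actions, which require an admin role."""
    if action in DESTRUCTIVE_ACTIONS:
        # Assumption: the app stores a "role" key in identity.metadata.
        return identity.metadata.get("role") == "admin"
    return True
```

A callback shaped like this would be passed as `AcpServer(..., authorize=authorize)`.
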
## Identity & Tenant Model

Every event's author is one of three discrete identity types. `metadata` is opaque to the SDK — attach whatever you need.

| Model | Role | Used for |
|---|---|---|
| `UserIdentity` | `user` | Humans |
| `AssistantIdentity` | `assistant` | Events authored by assistant runs |
| `SystemIdentity` | `system` | Webhooks, automation, SDK-triggered events |

**`TenantScope = dict[str, str]`** — both identities and threads carry one. Visibility rule is **subset match**: a thread is visible to an identity iff every key the thread declares is present and equal on the identity.

| Thread tenant | Identity tenant | Visible? |
|---|---|---|
| `{org: A, ws: X}` | `{org: A, ws: X}` | ✅ |
| `{org: A, ws: X}` | `{org: A, ws: Y}` | ❌ |
| `{org: A}` | `{org: A, ws: X}` | ✅ (identity has extras) |
| `{}` (public) | any | ✅ |
| `{org: A}` | `{}` | ❌ |

Enforced at the SQL level via JSONB `@>` against the `threads_tenant_gin` index. Consumers read tenant from `identity.metadata["tenant"]`.

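The subset-match rule can be expressed in a few lines of plain Python. This is only an illustration of the rule as described above: `tenant_matches` is a hypothetical local helper, deliberately named differently from the SDK's exported `matches`, whose implementation is not shown here.

```python
TenantScope = dict[str, str]


def tenant_matches(thread_tenant: TenantScope, identity_tenant: TenantScope) -> bool:
    """True when every key the thread declares is present and equal on the identity."""
    return all(
        identity_tenant.get(key) == value
        for key, value in thread_tenant.items()
    )
```

An empty thread tenant has no keys to check, so it is visible to every identity, which matches the "public" row in the table.
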
## Handler API

Handlers are async generators. Two arguments: `ctx` (read) and `send` (write). Everything you yield is persisted and broadcast.

```python
@acp.assistant("helper")
async def handler(ctx, send):
    history = await ctx.events()                    # full thread history
    members = await ctx.members()                    # current membership
    me = ctx.assistant                               # AssistantIdentity
    thread = ctx.thread                              # Thread (metadata, tenant)
    run = ctx.run                                    # current Run

    ctx.analytics.track("llm.call", {"model": "claude"})

    yield send.reasoning("thinking...")
    yield send.tool_call(name="search", arguments={"q": "weather"}, id="call_1")
    yield send.tool_result(tool_id="call_1", result={"temp": 72})
    yield send.message(content=[TextPart(text="done")])

    await ctx.invoke("summarizer")                   # chain another assistant (new Run)
```

**Handlers react to thread state, not a single message.** They read `ctx.events()` for the full conversation history, the same pattern used by agentic SDKs such as OpenAI Assistants, Anthropic, Vercel AI SDK, and LangGraph.

### `ctx` (HandlerContext)

| Attribute | Purpose |
|---|---|
| `ctx.thread` | Current `Thread` (metadata, tenant) |
| `ctx.run` | Current `Run` this handler is executing |
| `ctx.assistant` | `AssistantIdentity` — me |
| `await ctx.events(limit=None)` | Full ordered thread history |
| `await ctx.members()` | Current `list[ThreadMember]` |
| `ctx.analytics.track(event, payload)` | Sidecar telemetry (not in thread log) |
| `await ctx.invoke(assistant_id)` | Chain another assistant (spawns new Run) |

### `send` (HandlerSend)

Pure factories — construct an `Event` with `author` auto-filled from `ctx.assistant`. Yielding is the **only** way to emit events; there is no side channel.

| Method | Yields |
|---|---|
| `send.message(content, metadata?)` | `MessageEvent` with a list of `ContentPart` |
| `send.reasoning(text, metadata?)` | `ReasoningEvent` (plain-text thinking trace) |
| `send.tool_call(name, arguments, id?, metadata?)` | `ToolCallEvent` |
| `send.tool_result(tool_id, result?, error?, metadata?)` | `ToolResultEvent` |

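To make the "yielding is the only way to emit events" contract concrete, here is a rough sketch of how a runner might consume an async-generator handler under a time budget. It is not the SDK's `RunExecutor`: `drive` and `demo_handler` are hypothetical, plain dicts stand in for real events, and `ctx` / `send` are left untyped placeholders.

```python
import asyncio
from collections.abc import AsyncIterator, Callable
from typing import Any


async def demo_handler(ctx: Any, send: Any) -> AsyncIterator[dict[str, Any]]:
    """Placeholder handler: yields simplified dicts instead of real events."""
    yield {"type": "reasoning"}
    yield {"type": "message"}


async def drive(
    handler: Callable[[Any, Any], AsyncIterator[dict[str, Any]]],
    ctx: Any,
    send: Any,
    timeout_seconds: float = 120,
) -> list[dict[str, Any]]:
    """Consume a handler and collect each yielded event in order."""
    emitted: list[dict[str, Any]] = []

    async def drain() -> None:
        async for event in handler(ctx, send):
            # A real runner would persist and broadcast the event here.
            emitted.append(event)

    # Abort the whole run if the handler exceeds its time budget.
    await asyncio.wait_for(drain(), timeout=timeout_seconds)
    return emitted
```

Because the runner only sees what the generator yields, ordering in the thread log follows the order of the `yield` statements.
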
## Wire Protocol

### REST endpoints (mounted under your chosen prefix, e.g. `/acp`)

| Method | Path | Purpose |
|---|---|---|
| `POST` | `/threads` | Create a thread (creator auto-added as member) |
| `GET` | `/threads` | List visible threads (tenant subset-match, cursor paginated) |
| `GET` | `/threads/:id` | Read a thread |
| `PATCH` | `/threads/:id` | Update tenant or metadata |
| `DELETE` | `/threads/:id` | Delete (cascades events + members) |
| `GET` | `/threads/:id/events` | Paginated event history |
| `GET` | `/threads/:id/members` | List members |
| `POST` | `/threads/:id/members` | Add a member (idempotent) |
| `DELETE` | `/threads/:id/members/:identity_id` | Remove a member |
| `POST` | `/threads/:id/messages` | Append a `message` event |
| `POST` | `/threads/:id/invocations` | Start runs for `{ assistant_ids: [...], idempotency_key? }` |
| `GET` | `/runs/:id` | Read a run's state |
| `DELETE` | `/runs/:id` | Cancel a run |

### Socket.IO events

| Direction | Event | Payload / Purpose |
|---|---|---|
| **C2S** | `thread:join` | `{ thread_id, since?: { created_at, id } }` — subscribe, optional resume cursor |
| **C2S** | `thread:leave` | `{ thread_id }` |
| **C2S** | `message:send` | `{ thread_id, draft: EventDraft }` — mirrors REST |
| **C2S** | `assistant:invoke` | `{ thread_id, assistant_ids, idempotency_key? }` |
| **C2S** | `run:cancel` | `{ run_id }` |
| **S2C** | `event` | A new event on a joined thread (any type) |
| **S2C** | `thread:updated` | Thread metadata or tenant changed |
| **S2C** | `members:updated` | `{ thread_id, members }` |
| **S2C** | `run:updated` | Run status changed |

Action operations (`message:send`, `assistant:invoke`, `run:cancel`) exist in **both** REST and WS as thin shells over the same internal handler — connected clients save a round-trip over WS, server-to-server / webhooks use REST.

### Event types (discriminated union on `type`)

| Type | Payload |
|---|---|
| `message` | `content: list[ContentPart]` (text / image / audio / document / form) |
| `reasoning` | `content: str` — plain-text reasoning trace |
| `tool.call` | `tool: { id, name, arguments }` |
| `tool.result` | `tool: { id, result?, error? }` |
| `thread.created` | `thread: { id, tenant }` |
| `thread.member_added` / `thread.member_removed` | `member: Identity` |
| `thread.tenant_changed` | `from: TenantScope, to: TenantScope` |
| `run.started` / `run.completed` / `run.cancelled` | — |
| `run.failed` | `error: { code, message }` |

Every event carries common fields `{ id, thread_id, run_id?, author, created_at, metadata, client_id? }`.

### ContentPart types

| Type | Fields |
|---|---|
| `TextPart` | `text: str` |
| `ImagePart` | `url, mime, name?, size?` |
| `AudioPart` | `url, mime, name?, size?, duration_ms?` |
| `DocumentPart` | `url, mime, name?, size?` |
| `FormPart` | `form_id, schema (JSON Schema), status, values?, answers_event_id?, title?, description?` |

**Binary parts are URL-only.** The SDK never touches bytes — consumers handle upload via their own endpoints (S3 presigned, custom upload route, etc.).

## Hardening: `namespace_keys` defense-in-depth (opt-in)

For paranoid multi-tenant SaaS (healthcare, finance, government), layer hard Socket.IO namespace isolation on top of the existing room-join authorization gate:

```python
acp = AcpServer(
    store=PostgresThreadStore(pool=pool),
    authenticate=my_auth,
    namespace_keys=["organization", "workspace"],   # opt-in
)
```

Each unique combination of those tenant field values becomes its own Socket.IO namespace. An identity with `{"organization": "A", "workspace": "X"}` connects to `/A/X`; one with `{"organization": "B", "workspace": "X"}` connects to `/B/X`. The two namespaces **cannot cross-talk at the transport layer** — rooms in one are physically separate from rooms in the other.

- **Identities** missing any required key are rejected at WS connect AND REST auth (401/403).
- **Threads** missing any required key are rejected at `POST /threads` (400). Pre-existing stranded threads return `not_found` on `thread:join`.
- **Value constraint:** `[A-Za-z0-9._-]{1,32}`. Map richer identifiers to safe form inside `authenticate` before storing them in `identity.metadata["tenant"]`.
- **Client side:** your React client must connect to the concrete namespace path (e.g. `url="wss://api.example.com/A/X"`). Compute it from the user's tenant before mounting the provider.

**Additive, not a replacement.** The existing `matches(thread.tenant, identity.tenant)` subset-match and explicit membership checks still run on every operation. Leave `namespace_keys=None` for single-tenant or if the existing gates are sufficient.

The `derive_namespace_path(tenant, namespace_keys=[...])` helper is exported so consumers can compute the client URL from the same logic the server uses.

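For intuition, the mapping from tenant values to a namespace path can be sketched as below. This is not the SDK's `derive_namespace_path`: `sketch_namespace_path` is a hypothetical helper that applies the value constraint and path layout described above, and raising `ValueError` is an assumption made for the example.

```python
import re

# Value constraint quoted from the hardening rules above.
_SAFE_VALUE = re.compile(r"[A-Za-z0-9._-]{1,32}")


def sketch_namespace_path(tenant: dict[str, str], namespace_keys: list[str]) -> str:
    """Join the tenant values for `namespace_keys`, in order, into a path like `/A/X`."""
    segments: list[str] = []
    for key in namespace_keys:
        value = tenant.get(key)
        if value is None:
            raise ValueError(f"tenant is missing required key {key!r}")
        if _SAFE_VALUE.fullmatch(value) is None:
            raise ValueError(f"unsafe namespace value for {key!r}: {value!r}")
        segments.append(value)
    return "/" + "/".join(segments)
```

Extra tenant keys that are not listed in `namespace_keys` are ignored in this sketch, so only the configured keys shape the path.
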
## Storage

`ThreadStore` is a `Protocol`. `PostgresThreadStore(pool=pool)` is the reference implementation (asyncpg). The schema lives at `src/x64acp/store/postgres/schema.sql` and is auto-applied via `CREATE TABLE IF NOT EXISTS` on first connect, so there are no migrations to manage manually.

**Hard invariants enforced at the database level** (not application code):

| Invariant | Mechanism |
|---|---|
| One active run per `(thread, assistant)` at a time | `runs_active_per_assistant` partial unique index |
| Duplicate `idempotency_key` per thread rejected | `runs_idempotency` partial unique index |
| Tenant subset-match in O(log n) | JSONB GIN on `threads.tenant` |
| Resume cursor is index-only | `(thread_id, created_at, id)` btree |

Implement the `ThreadStore` protocol yourself to swap in Redis, SQLite, in-memory, or any other backend.

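The "one active run per `(thread, assistant)`" invariant from the table above relies on a partial unique index. The sketch below demonstrates the technique with the standard-library `sqlite3` module so it runs without Postgres. The table layout and the index predicate are assumptions made for the example and are not taken from the SDK's `schema.sql`; only the index name is borrowed from the table.

```python
import sqlite3


def open_demo_db() -> sqlite3.Connection:
    """Create an in-memory database with a hypothetical, simplified runs table."""
    conn = sqlite3.connect(":memory:")
    conn.executescript(
        """
        CREATE TABLE runs (
            id TEXT PRIMARY KEY,
            thread_id TEXT NOT NULL,
            assistant_id TEXT NOT NULL,
            status TEXT NOT NULL
        );
        -- Uniqueness applies only to rows that are still active.
        CREATE UNIQUE INDEX runs_active_per_assistant
            ON runs (thread_id, assistant_id)
            WHERE status IN ('pending', 'running');
        """
    )
    return conn


def try_start_run(conn: sqlite3.Connection, run_id: str, thread_id: str, assistant_id: str) -> bool:
    """Insert a pending run; return False if an active run already exists."""
    try:
        with conn:  # commits on success, rolls back on error
            conn.execute(
                "INSERT INTO runs VALUES (?, ?, ?, 'pending')",
                (run_id, thread_id, assistant_id),
            )
        return True
    except sqlite3.IntegrityError:
        return False
```

Letting the database reject the duplicate means two concurrent invocations cannot both succeed, regardless of how the application code is scheduled.
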
## Errors

REST raises FastAPI `HTTPException`; Socket.IO handlers return `{ "error": { "code", "message" } }`. Common codes:

| Code | Where | Meaning |
|---|---|---|
| `unauthenticated` | WS connect, REST 401 | `authenticate` returned `None` |
| `forbidden` | WS + REST 403 | Not a member, or `authorize` returned `False` |
| `not_found` | WS + REST 404 | Thread invisible under tenant rule, or doesn't exist |
| `invalid_request` | WS + REST 400 | Missing or wrong-shaped payload |
| `namespace_invalid` | WS connect | Namespace path couldn't be parsed against `namespace_keys` |
| `namespace_mismatch` | WS connect | Identity tenant doesn't match the connected namespace |

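On the consuming side, the `{ "error": { "code", "message" } }` shape can be turned into an exception with a small helper. This is a sketch only: `AcpError` and `raise_for_ack` are hypothetical names that the SDK does not export, and the helper assumes a successful acknowledgement simply lacks an `error` key.

```python
from typing import Any


class AcpError(Exception):
    """Hypothetical exception carrying the error code and message from an ack."""

    def __init__(self, code: str, message: str) -> None:
        super().__init__(f"{code}: {message}")
        self.code = code
        self.message = message


def raise_for_ack(ack: Any) -> Any:
    """Raise AcpError if the ack carries an error payload, else return it unchanged."""
    error = ack.get("error") if isinstance(ack, dict) else None
    if error is not None:
        raise AcpError(error["code"], error["message"])
    return ack
```

Callers can then branch on `exc.code` (for example `forbidden` versus `not_found`) instead of inspecting raw dicts.
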
## Public API Surface

Everything you typically need is re-exported from the top-level `x64acp` package:

```python
from x64acp import (
    # Server
    AcpServer, HandshakeData, AuthenticateCallback, AuthorizeCallback,

    # Storage
    ThreadStore, PostgresThreadStore, Page, ThreadCursor, EventCursor,

    # Identity
    Identity, UserIdentity, AssistantIdentity, SystemIdentity, parse_identity,

    # Tenant
    TenantScope, matches,

    # Thread / Run
    Thread, ThreadMember, ThreadPatch, Run, RunStatus, RunError,

    # Events + content parts
    Event, EventDraft, parse_event,
    MessageEvent, ReasoningEvent, ToolCallEvent, ToolResultEvent,
    ThreadCreatedEvent, ThreadMemberAddedEvent, ThreadMemberRemovedEvent,
    ThreadTenantChangedEvent,
    RunStartedEvent, RunCompletedEvent, RunFailedEvent, RunCancelledEvent,
    ContentPart, TextPart, ImagePart, AudioPart, DocumentPart, FormPart,
    parse_content_part,

    # Handler API
    HandlerContext, HandlerSend, HandlerCallable,

    # Analytics
    AnalyticsEvent, AssistantAnalytics, OnAnalyticsCallback,

    # Broadcast (for tests / custom transports)
    Broadcaster, RecordingBroadcaster, SocketIOBroadcaster,

    # Namespace hardening
    NamespaceViolation, derive_namespace_path, parse_namespace_path,
    validate_namespace_value,
)
```

## Development

```bash
uv sync --all-extras                    # install deps
uv run poe dev                          # lint + typecheck + test
uv run poe format                       # auto-format (ruff)
uv run poe build                        # wheel + sdist
```

### Tests require Postgres

```bash
docker compose -f docker-compose.test.yml up -d     # bundled fixture
uv run poe test

# or point at your own:
createdb x64acp_test
DATABASE_URL=postgresql://localhost/x64acp_test uv run poe test
```

## Status & Roadmap

Current: **`0.2.0a5`**, covering slices 1-3 (Postgres store and REST router, handler API, Socket.IO transport) plus `namespace_keys` defense-in-depth.

- [x] Slice 1 — Postgres store + REST router
- [x] Slice 2 — Handler API, `RunExecutor`, invoke endpoint
- [x] Slice 3 — Socket.IO transport, resume cursor, broadcast funnel
- [x] `namespace_keys` hardening layer
- [ ] Slice 4 — Node mirror (`@x64acp/server`)
- [x] Slice 5 — React client (`@x64acp/client@0.2.0-alpha.3`)

## Intentionally out of scope

These are **not** bugs — they're explicit design decisions to keep the SDK small:

- Streaming within a single message (token-by-token)
- MCP client / A2A / knowledge sources
- Settings broadcaster
- Built-in blob storage (content parts are URL-only)
- Built-in form rendering UI (form parts carry JSON Schema)
- LLM provider abstractions
- Presence / typing indicators / read receipts

If you need these, bring them yourself.

## License

MIT
