Skip to content

Python API

This reference is generated from the source docstrings with mkdocstrings. It starts with the supported public surface, then documents the schema models and the advanced sub-modules.

Public surface

The supported, stable API exported from the top-level skeino package:

create_app

create_app(
    *,
    graphs: Mapping[str, GraphInput],
    settings: SkeinoSettings,
) -> FastAPI

Build a FastAPI application that exposes the skeino HTTP surface.

graphs maps each assistant id to either a precompiled CompiledStateGraph or a builder (checkpointer) -> CompiledStateGraph (sync or async). When a builder is supplied, skeino resolves a checkpointer via skeino.persistence.open_checkpointer and passes it in.

Source code in src/skeino/app.py
def create_app(
    *,
    graphs: Mapping[str, GraphInput],
    settings: SkeinoSettings,
) -> FastAPI:
    """Build a FastAPI application that exposes the skeino HTTP surface.

    ``graphs`` maps assistant id → either a precompiled :class:`CompiledStateGraph`
    or a builder ``(checkpointer) -> CompiledStateGraph`` (sync or async). When
    a builder is supplied, skeino resolves a checkpointer via
    :func:`skeino.persistence.open_checkpointer` and passes it in.

    Calling this function only configures the app (title/description/version,
    CORS middleware, routers). The checkpointer, the compiled graphs, the
    metadata store and the ops objects are created inside the app's lifespan
    and published on ``app.state.skeino`` and ``app.state.registry``.

    Parameters
    ----------
    graphs:
        Non-empty mapping of assistant id to a compiled graph or graph builder.
    settings:
        Configuration record; supplies the checkpointer scheme/URI/options,
        server metadata, CORS lists and assistant/streaming options read below.

    Returns
    -------
    FastAPI
        The application with the health, assistants, threads and runs routers
        included.

    Raises
    ------
    ValueError
        If ``graphs`` is empty.

    """
    if not graphs:
        raise ValueError("create_app requires at least one graph entry.")

    # Resolved once, up front; the lifespan closure below reuses the value.
    default_id = _resolve_default_id(graphs, settings)

    @asynccontextmanager
    async def lifespan(app_instance: FastAPI) -> AsyncIterator[None]:
        _check_metadata_durability(settings)
        # Everything entered on / pushed to this stack is released when the
        # lifespan exits, including on a failure part-way through start-up.
        async with AsyncExitStack() as stack:
            # The scheme alone selects the backend (default "memory"); the URI is
            # a connection detail. A URI without a matching scheme is ignored
            # (memory). Durable schemes that need a URI (postgres/mongodb/redis)
            # raise here if it's missing; sqlite defaults to ":memory:".
            checkpointer = await stack.enter_async_context(
                open_checkpointer(
                    settings.checkpointer_uri,
                    scheme=_scheme(settings),
                    options=dict(settings.checkpointer_options),
                )
            )

            # Every entry is materialised with the same shared checkpointer.
            compiled: dict[str, CompiledStateGraph] = {}
            for name, entry in graphs.items():
                compiled[name] = await _materialise_graph(entry, checkpointer)
            registry = GraphRegistry(compiled, default=default_id)
            default_graph = registry.default_graph

            metadata_store: MetadataStoreProtocol = _resolve_metadata_store(settings)
            if isinstance(metadata_store, InMemoryMetadataStore):
                logger.info(
                    "Using in-memory metadata store — thread/run metadata will "
                    "not persist across restarts. Use a durable scheme "
                    "(postgres/sqlite/mongodb) with checkpointer_uri to persist."
                )
            # Register cleanup BEFORE setup() so a setup failure that has already
            # opened resources (sqlite connection, motor client) is still closed
            # by the exit stack. aclose() is a no-op when nothing was opened.
            aclose = getattr(metadata_store, "aclose", None)
            if aclose is not None:
                stack.push_async_callback(aclose)
            await metadata_store.setup()

            # The streamer and all ops objects below are bound to the default
            # graph only; the full registry is exposed separately on app state.
            streamer = Streamer(
                default_graph,
                agent_nodes=settings.agent_nodes,
                status_field=settings.status_field,
            )
            assistant_ops = AssistantOps(
                graph=default_graph,
                default_assistant_id=default_id,
                assistant_name=settings.assistant_name,
                assistant_description=settings.assistant_description,
                supported_assistant_ids=settings.supported_assistant_ids,
                assistant_namespace=settings.assistant_namespace,
                # Captured once per start-up, not per request.
                now=datetime.now(UTC),
            )
            thread_ops = ThreadOps(
                graph=default_graph,
                metadata_store=metadata_store,
                logger=logger,
            )
            run_ops = RunOps(
                graph=default_graph,
                metadata_store=metadata_store,
                streamer=streamer,
                thread_ops=thread_ops,
                assistant_ops=assistant_ops,
                lock_manager=ThreadLockManager(),
                logger=logger,
            )

            app_instance.state.skeino = SkeinoState(
                thread_ops=thread_ops,
                run_ops=run_ops,
                assistant_ops=assistant_ops,
                settings=settings,
            )
            app_instance.state.registry = registry
            logger.info("skeino runtime initialised (graphs=%s)", list(compiled))
            try:
                # The application serves requests while suspended here; leaving
                # this block unwinds the AsyncExitStack above.
                yield
            finally:
                logger.info("skeino runtime shutting down")

    fastapi_app = FastAPI(
        title=settings.server_title,
        description=settings.server_description,
        version=settings.server_version,
        lifespan=lifespan,
    )
    # The settings fields are copied into fresh lists before being handed to
    # the middleware.
    fastapi_app.add_middleware(
        CORSMiddleware,
        allow_origins=list(settings.cors_origins),
        allow_methods=list(settings.cors_methods),
        allow_headers=list(settings.cors_headers),
    )
    fastapi_app.include_router(health_router)
    fastapi_app.include_router(assistants_router)
    fastapi_app.include_router(threads_router)
    fastapi_app.include_router(runs_router)
    return fastapi_app

from_langgraph_json

from_langgraph_json(
    manifest_path: str | Path,
    *,
    settings: SkeinoSettings | None = None,
) -> FastAPI

Build a skeino-backed FastAPI app from a langgraph.json manifest.

Parameters

manifest_path: Path to the langgraph.json file.

settings: Optional SkeinoSettings whose explicit fields override anything derived from the manifest. Useful for setting graph-specific options (agent_nodes, status_field) that langgraph.json does not describe.

Source code in src/skeino/langgraph_json.py
def from_langgraph_json(
    manifest_path: str | Path,
    *,
    settings: SkeinoSettings | None = None,
) -> FastAPI:
    """Build a skeino-backed FastAPI app from a ``langgraph.json`` manifest.

    Parameters
    ----------
    manifest_path:
        Path to the ``langgraph.json`` file.
    settings:
        Optional :class:`SkeinoSettings` whose explicit fields override anything
        derived from the manifest. Useful for setting graph-specific options
        (``agent_nodes``, ``status_field``) that ``langgraph.json`` does not
        describe.

    Returns
    -------
    FastAPI
        The application produced by :func:`create_app` for the resolved graphs
        and the effective settings.

    Raises
    ------
    FileNotFoundError
        If ``manifest_path`` does not exist.
    ValueError
        If the file is not valid JSON (``json.JSONDecodeError``), if its
        top-level value is not a JSON object, or if it declares no graphs.

    """
    path = Path(manifest_path).resolve()
    if not path.exists():
        raise FileNotFoundError(f"Manifest not found: {path}")

    with path.open("r", encoding="utf-8") as fh:
        manifest = json.load(fh)

    # json.load may legally return a list, string, number, ... Everything
    # below calls ``manifest.get``, so reject non-objects with a clear error
    # instead of failing later with an AttributeError.
    if not isinstance(manifest, dict):
        raise ValueError(
            f"Manifest {path} must contain a JSON object at the top level, "
            f"got {type(manifest).__name__}."
        )

    # Relative references in the manifest are resolved against its directory.
    manifest_dir = path.parent
    _maybe_load_dotenv(manifest_dir, manifest.get("env"))

    graphs_section = manifest.get("graphs") or {}
    if not isinstance(graphs_section, dict) or not graphs_section:
        raise ValueError(f"Manifest {path} declares no graphs.")
    resolved: dict[str, GraphInput] = {
        name: _resolve_graph_target(str(spec), manifest_dir)
        for name, spec in graphs_section.items()
    }

    # These sections are recognised but not acted on; warn rather than fail.
    for label in ("auth", "ui"):
        if label in manifest:
            logger.warning("manifest section %r is not implemented in skeino v1", label)

    effective_settings = _settings_from_manifest(manifest, overrides=settings)
    return create_app(graphs=resolved, settings=effective_settings)

SkeinoSettings

Bases: BaseModel

Configuration record passed to skeino.create_app.

Settings live in your code (typed, validated, version-controlled). For deployments that read from environment variables, use pydantic-settings in your project and pass the resulting object into SkeinoSettings.

GraphRegistry

GraphRegistry(
    graphs: Mapping[str, CompiledStateGraph],
    *,
    default: str | None = None,
)

Immutable mapping of assistant id to compiled graph.

Validate the input mapping and freeze the default selection.

Source code in src/skeino/registry.py
def __init__(
    self,
    graphs: Mapping[str, CompiledStateGraph],
    *,
    default: str | None = None,
) -> None:
    """Validate the input mapping and freeze the default selection.

    Parameters
    ----------
    graphs:
        Non-empty mapping of assistant id to compiled graph. It is copied, so
        later changes to the caller's mapping do not affect the registry.
    default:
        Assistant id to use as the default. ``None`` selects the first key of
        ``graphs`` in iteration order.

    Raises
    ------
    ValueError
        If ``graphs`` is empty, or ``default`` is given but not a key of
        ``graphs``.

    """
    if not graphs:
        raise ValueError("GraphRegistry requires at least one graph.")
    if default is not None and default not in graphs:
        raise ValueError(
            f"default={default!r} is not present in graphs={list(graphs)!r}"
        )
    self._graphs: dict[str, CompiledStateGraph] = dict(graphs)
    # Only ``None`` means "not specified" (see the validation above). Using
    # ``default or ...`` here would silently discard a validated but falsy
    # id such as "" and fall back to the first key instead.
    self._default: str = (
        default if default is not None else next(iter(self._graphs))
    )

default_id property

default_id: str

Identifier of the graph used when none is specified.

default_graph property

default_graph: CompiledStateGraph

Compiled graph instance for the default assistant.

get

get(assistant_id: str) -> CompiledStateGraph | None

Return the graph for assistant_id (or None when absent).

Source code in src/skeino/registry.py
def get(self, assistant_id: str) -> CompiledStateGraph | None:
    """Look up the compiled graph registered under ``assistant_id``.

    Unknown ids yield ``None`` instead of raising.
    """
    registered = self._graphs
    return registered[assistant_id] if assistant_id in registered else None

Schemas

The Pydantic request/response models behind the HTTP API.

Common types

common

Shared type aliases and the checkpoint selector model.

CheckpointConfigModel

Bases: BaseModel

Checkpoint selector for thread state or run resumption.

Threads

threads

Schemas for thread creation, search, state, and history endpoints.

ThreadTtlConfig

Bases: BaseModel

Time-to-live settings for a thread.

ThreadTtlInfo

Bases: BaseModel

TTL information returned for a thread.

ThreadSuperstepUpdate

Bases: BaseModel

Initial state update applied during thread creation.

ThreadSuperstep

Bases: BaseModel

A superstep container for thread bootstrap updates.

ThreadCreateRequest

Bases: BaseModel

Payload for creating a thread.

ThreadPatchRequest

Bases: BaseModel

Mutable fields updatable on an existing thread.

metadata is optional so an empty body is a no-op; send {"metadata": {}} to intentionally clear a thread's metadata.

ThreadStateUpdateRequest

Bases: BaseModel

Manually write/patch a thread's state (human-in-the-loop edit).

ThreadModel

Bases: BaseModel

LangGraph-compatible thread representation.

ThreadSearchRequest

Bases: BaseModel

Payload for listing or searching threads.

InterruptModel

Bases: BaseModel

Serialized interrupt entry.

ThreadTaskModel

Bases: BaseModel

Serialized pending task entry.

ThreadStateModel

Bases: BaseModel

Latest checkpointed state for a thread.

ThreadStateSearchRequest

Bases: BaseModel

Payload for retrieving thread history.

Runs

runs

Schemas for run creation, command payloads, and run responses.

CommandModel

Bases: BaseModel

Serializable LangGraph command payload.

RunCreateRequest

Bases: BaseModel

Payload for creating a run on an existing thread.

RunModel

Bases: BaseModel

LangGraph-compatible run metadata.

Assistants

assistants

Schemas for assistants and graph schema introspection.

AssistantSearchRequest

Bases: BaseModel

Payload for listing assistants.

AssistantModel

Bases: BaseModel

LangGraph-compatible assistant representation.

GraphSchemaModel

Bases: BaseModel

Schema description returned for an assistant.

Server

server

Server-info and generic response schemas.

HealthResponse

Bases: BaseModel

Health check response.

InitialMessageResponse

Bases: BaseModel

Welcome message response.

ErrorResponse

Bases: BaseModel

Error payload returned from the API.

ServerInfoModel

Bases: BaseModel

Minimal system information exposed by the server.

Persistence

Advanced

These are importable for advanced use (e.g. registering a custom checkpointer) but are not part of the stability contract. See Persistence & checkpointers and Write a custom checkpointer.

persistence

Persistence layer: metadata store, checkpointer registry, enrichment.

MetadataStoreProtocol

Bases: Protocol

Async CRUD surface for thread and run metadata.

setup async

setup() -> None

Initialise the backing storage (create tables, etc.).

Source code in src/skeino/persistence/base.py
async def setup(self) -> None:
    """Initialise the backing storage (create tables, etc.).

    Awaited before the store is used. Implementations with nothing to prepare
    may make this a no-op.
    """
    ...

fetch_thread_row async

fetch_thread_row(thread_id: str) -> ThreadRow | None

Return the stored metadata row for a thread, or None.

Source code in src/skeino/persistence/base.py
async def fetch_thread_row(self, thread_id: str) -> ThreadRow | None:
    """Return the stored metadata row for a thread, or ``None``.

    Parameters
    ----------
    thread_id:
        Identifier of the thread, in string form.

    """
    ...

create_thread async

create_thread(
    thread_id: str,
    *,
    metadata: dict[str, JsonValue],
    config: dict[str, JsonValue],
    ttl: ThreadTtlConfig | None,
    if_exists: ThreadIfExists,
) -> ThreadRow

Insert a thread row and return the stored record.

Source code in src/skeino/persistence/base.py
async def create_thread(
    self,
    thread_id: str,
    *,
    metadata: dict[str, JsonValue],
    config: dict[str, JsonValue],
    ttl: ThreadTtlConfig | None,
    if_exists: ThreadIfExists,
) -> ThreadRow:
    """Insert a thread row and return the stored record.

    Parameters
    ----------
    thread_id:
        Identifier of the thread to create, in string form.
    metadata:
        JSON-compatible metadata stored on the thread.
    config:
        JSON-compatible configuration stored on the thread.
    ttl:
        Optional time-to-live settings; ``None`` stores no TTL.
    if_exists:
        Conflict policy when ``thread_id`` is already present. The in-memory
        and PostgreSQL stores return the existing row for ``"do_nothing"``
        and raise an HTTP 409 otherwise.

    """
    ...

update_thread async

update_thread(
    thread_id: str,
    *,
    status_value: ThreadStatus | None = None,
    config: dict[str, JsonValue] | None = None,
    metadata: dict[str, JsonValue] | None = None,
    mark_state_updated: bool = False,
) -> None

Update mutable metadata for a thread.

Source code in src/skeino/persistence/base.py
async def update_thread(
    self,
    thread_id: str,
    *,
    status_value: ThreadStatus | None = None,
    config: dict[str, JsonValue] | None = None,
    metadata: dict[str, JsonValue] | None = None,
    mark_state_updated: bool = False,
) -> None:
    """Update mutable metadata for a thread.

    Parameters
    ----------
    thread_id:
        Identifier of the thread to update.
    status_value:
        New status, or ``None`` to leave the status unchanged.
    config:
        Replacement config, or ``None`` to leave it unchanged.
    metadata:
        Replacement metadata, or ``None`` to leave it unchanged. An empty
        dict is a real value and replaces the stored metadata.
    mark_state_updated:
        When true, also refresh the thread's ``state_updated_at`` timestamp.

    """
    ...

search_thread_rows async

search_thread_rows(
    request: ThreadSearchRequest,
) -> list[ThreadRow]

Return stored thread rows before graph-state enrichment.

Source code in src/skeino/persistence/base.py
async def search_thread_rows(self, request: ThreadSearchRequest) -> list[ThreadRow]:
    """Return stored thread rows before graph-state enrichment.

    Parameters
    ----------
    request:
        Search payload; the stores in this package read its ``ids`` and
        ``status`` filters, its sort options, and ``limit`` / ``offset``.

    """
    ...

delete_thread async

delete_thread(thread_id: str) -> None

Delete a thread row and its run rows.

Source code in src/skeino/persistence/base.py
async def delete_thread(self, thread_id: str) -> None:
    """Delete a thread row and its run rows.

    Parameters
    ----------
    thread_id:
        Identifier of the thread whose row and runs are removed.

    """
    ...

create_run async

create_run(
    run_id: str,
    thread_id: str,
    assistant_id: str,
    metadata: dict[str, JsonValue],
    kwargs: dict[str, JsonValue],
    multitask_strategy: MultitaskStrategy,
) -> RunRow

Insert a run row and return it.

Source code in src/skeino/persistence/base.py
async def create_run(
    self,
    run_id: str,
    thread_id: str,
    assistant_id: str,
    metadata: dict[str, JsonValue],
    kwargs: dict[str, JsonValue],
    multitask_strategy: MultitaskStrategy,
) -> RunRow:
    """Insert a run row and return it.

    Parameters
    ----------
    run_id:
        Identifier of the new run, in string form.
    thread_id:
        Identifier of the thread the run belongs to.
    assistant_id:
        Assistant the run is executed with.
    metadata:
        JSON-compatible metadata stored on the run.
    kwargs:
        JSON-compatible run arguments stored on the run.
    multitask_strategy:
        Multitask strategy recorded for the run.

    """
    ...

update_run_status async

update_run_status(
    run_id: str,
    status_value: RunStatus,
    *,
    error: str | None = None,
) -> None

Update the persisted run status.

Source code in src/skeino/persistence/base.py
async def update_run_status(
    self,
    run_id: str,
    status_value: RunStatus,
    *,
    error: str | None = None,
) -> None:
    """Update the persisted run status.

    Parameters
    ----------
    run_id:
        Identifier of the run to update.
    status_value:
        New status for the run.
    error:
        Error text stored with the status. The in-memory store always writes
        this field, so ``None`` clears a previously stored error there.

    """
    ...

fetch_run_row async

fetch_run_row(thread_id: str, run_id: str) -> RunRow | None

Return a single run row for a thread, or None.

Source code in src/skeino/persistence/base.py
async def fetch_run_row(self, thread_id: str, run_id: str) -> RunRow | None:
    """Return a single run row for a thread, or ``None``.

    Parameters
    ----------
    thread_id:
        Identifier of the thread the run is expected to belong to. The
        in-memory store treats a run owned by a different thread as absent.
    run_id:
        Identifier of the run.

    """
    ...

list_run_rows async

list_run_rows(
    thread_id: str,
    *,
    limit: int,
    offset: int,
    status_value: RunStatus | None,
) -> list[RunRow]

List run rows for a thread.

Source code in src/skeino/persistence/base.py
async def list_run_rows(
    self,
    thread_id: str,
    *,
    limit: int,
    offset: int,
    status_value: RunStatus | None,
) -> list[RunRow]:
    """List run rows for a thread.

    Parameters
    ----------
    thread_id:
        Identifier of the thread whose runs are listed.
    limit:
        Maximum number of rows to return.
    offset:
        Number of matching rows to skip before the returned page.
    status_value:
        When not ``None``, only runs with this status are returned.

    """
    ...

RunRow

Bases: TypedDict

Uniform run row shape every metadata store returns.

ThreadRow

Bases: TypedDict

Uniform thread row shape every metadata store returns.

CheckpointerSpec dataclass

CheckpointerSpec(
    scheme: str,
    uri: str | None = None,
    options: dict[str, Any] = dict(),
)

Declarative request for a checkpointer instance.

InMemoryMetadataStore

InMemoryMetadataStore()

Dict-backed thread + run metadata store.

Initialise empty thread and run maps.

Source code in src/skeino/persistence/in_memory_store.py
def __init__(self) -> None:
    """Start with no threads and no runs recorded."""
    # Both maps are keyed by the string id passed to create_thread/create_run.
    threads: dict[str, ThreadRow] = {}
    runs: dict[str, RunRow] = {}
    self._threads = threads
    self._runs = runs

setup async

setup() -> None

No-op; the dicts are already ready.

Source code in src/skeino/persistence/in_memory_store.py
async def setup(self) -> None:
    """No-op; the dicts are already ready."""
    return None

fetch_thread_row async

fetch_thread_row(thread_id: str) -> ThreadRow | None

Return the stored row for thread_id (or None).

Source code in src/skeino/persistence/in_memory_store.py
async def fetch_thread_row(self, thread_id: str) -> ThreadRow | None:
    """Look up ``thread_id`` and hand back its row, or ``None`` if unknown."""
    threads = self._threads
    return threads[thread_id] if thread_id in threads else None

create_thread async

create_thread(
    thread_id: str,
    *,
    metadata: dict[str, JsonValue],
    config: dict[str, JsonValue],
    ttl: ThreadTtlConfig | None,
    if_exists: ThreadIfExists,
) -> ThreadRow

Insert a thread row and return it.

Source code in src/skeino/persistence/in_memory_store.py
async def create_thread(
    self,
    thread_id: str,
    *,
    metadata: dict[str, JsonValue],
    config: dict[str, JsonValue],
    ttl: ThreadTtlConfig | None,
    if_exists: ThreadIfExists,
) -> ThreadRow:
    """Record a new thread and return its row.

    An already-known ``thread_id`` yields the existing row when ``if_exists``
    is ``"do_nothing"`` and an HTTP 409 otherwise. New threads start in the
    ``"idle"`` status with copies of ``metadata`` and ``config``.
    """
    known = self._threads
    if thread_id in known and if_exists == "do_nothing":
        return known[thread_id]
    if thread_id in known:
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail=f"Thread {thread_id} already exists.",
        )

    created = _utcnow()

    # A TTL payload is only stored when a TTL value was actually supplied.
    ttl_payload: dict[str, JsonValue] | None
    if ttl is None or ttl.ttl is None:
        ttl_payload = None
    else:
        expiry = created + timedelta(minutes=ttl.ttl)
        ttl_payload = {
            "strategy": ttl.strategy,
            "ttl_minutes": ttl.ttl,
            "expires_at": expiry.isoformat(),
        }

    record: ThreadRow = {
        "thread_id": UUID(thread_id),
        "created_at": created,
        "updated_at": created,
        "state_updated_at": None,
        "metadata": dict(metadata),
        "config": dict(config),
        "status": "idle",
        "ttl": ttl_payload,
    }
    known[thread_id] = record
    return record

update_thread async

update_thread(
    thread_id: str,
    *,
    status_value: ThreadStatus | None = None,
    config: dict[str, JsonValue] | None = None,
    metadata: dict[str, JsonValue] | None = None,
    mark_state_updated: bool = False,
) -> None

Update mutable thread fields.

Source code in src/skeino/persistence/in_memory_store.py
async def update_thread(
    self,
    thread_id: str,
    *,
    status_value: ThreadStatus | None = None,
    config: dict[str, JsonValue] | None = None,
    metadata: dict[str, JsonValue] | None = None,
    mark_state_updated: bool = False,
) -> None:
    """Update mutable thread fields.

    Only arguments that are not ``None`` are written. Unknown thread ids are
    ignored. A call that changes nothing leaves the row untouched, including
    ``updated_at``, matching the PostgreSQL store, which issues no UPDATE in
    that case.

    Parameters
    ----------
    thread_id:
        Identifier of the thread to update.
    status_value:
        New status, or ``None`` to keep the current one.
    config:
        Replacement config (copied), or ``None`` to keep the current one.
    metadata:
        Replacement metadata (copied), or ``None`` to keep the current one.
    mark_state_updated:
        When true, stamp ``state_updated_at`` with the update time.

    """
    row = self._threads.get(thread_id)
    if row is None:
        return

    # Nothing requested: do not bump ``updated_at`` for an empty update.
    nothing_to_write = (
        status_value is None
        and config is None
        and metadata is None
        and not mark_state_updated
    )
    if nothing_to_write:
        return

    # One timestamp per update so ``updated_at`` and ``state_updated_at``
    # agree when both are written by the same call.
    now = _utcnow()
    row["updated_at"] = now
    if status_value is not None:
        row["status"] = status_value
    if config is not None:
        row["config"] = dict(config)
    if metadata is not None:
        row["metadata"] = dict(metadata)
    if mark_state_updated:
        row["state_updated_at"] = now

search_thread_rows async

search_thread_rows(
    request: ThreadSearchRequest,
) -> list[ThreadRow]

List thread rows respecting basic filter / pagination flags.

Source code in src/skeino/persistence/in_memory_store.py
async def search_thread_rows(self, request: ThreadSearchRequest) -> list[ThreadRow]:
    """Return one page of thread rows matching the request's filters.

    Rows may be filtered by ``request.ids`` and ``request.status``. They are
    ordered by ``updated_at`` (descending unless ``sort_order`` is ``"asc"``)
    and then sliced with ``offset`` / ``limit``.
    """
    wanted_ids = {str(item) for item in request.ids} if request.ids else None
    wanted_status = request.status

    def _keep(row: ThreadRow) -> bool:
        # An empty or missing id list means "no id filter", not "match nothing".
        if wanted_ids is not None and str(row["thread_id"]) not in wanted_ids:
            return False
        return wanted_status is None or row["status"] == wanted_status

    ordered = sorted(
        filter(_keep, self._threads.values()),
        key=lambda row: row["updated_at"],
        reverse=request.sort_order != "asc",
    )
    start = request.offset
    return ordered[start : start + request.limit]

delete_thread async

delete_thread(thread_id: str) -> None

Delete a thread and its run rows.

Source code in src/skeino/persistence/in_memory_store.py
async def delete_thread(self, thread_id: str) -> None:
    """Forget a thread together with every run recorded for it.

    Unknown thread ids are ignored.
    """
    self._threads.pop(thread_id, None)

    # Build a fresh run map rather than deleting while iterating.
    surviving: dict[str, RunRow] = {}
    for run_id, run in self._runs.items():
        if str(run["thread_id"]) == thread_id:
            continue
        surviving[run_id] = run
    self._runs = surviving

create_run async

create_run(
    run_id: str,
    thread_id: str,
    assistant_id: str,
    metadata: dict[str, JsonValue],
    kwargs: dict[str, JsonValue],
    multitask_strategy: MultitaskStrategy,
) -> RunRow

Insert a run row and return it.

Source code in src/skeino/persistence/in_memory_store.py
async def create_run(
    self,
    run_id: str,
    thread_id: str,
    assistant_id: str,
    metadata: dict[str, JsonValue],
    kwargs: dict[str, JsonValue],
    multitask_strategy: MultitaskStrategy,
) -> RunRow:
    """Record a new run in the ``"pending"`` status and return its row.

    ``run_id`` and ``thread_id`` are parsed as UUIDs for the stored row, and
    ``metadata`` / ``kwargs`` are copied. The row is indexed by the original
    ``run_id`` string.
    """
    created = _utcnow()
    run_uuid = UUID(run_id)
    thread_uuid = UUID(thread_id)
    record: RunRow = {
        "run_id": run_uuid,
        "thread_id": thread_uuid,
        "assistant_id": assistant_id,
        "created_at": created,
        "updated_at": created,
        "status": "pending",
        "metadata": dict(metadata),
        "kwargs": dict(kwargs),
        "multitask_strategy": multitask_strategy,
        "error": None,
    }
    self._runs[run_id] = record
    return record

update_run_status async

update_run_status(
    run_id: str,
    status_value: RunStatus,
    *,
    error: str | None = None,
) -> None

Update a run's status field.

Source code in src/skeino/persistence/in_memory_store.py
async def update_run_status(
    self,
    run_id: str,
    status_value: RunStatus,
    *,
    error: str | None = None,
) -> None:
    """Set a run's status and error, refreshing ``updated_at``.

    Unknown run ids are ignored.
    """
    row = self._runs.get(run_id)
    if row is not None:
        row["status"] = status_value
        row["updated_at"] = _utcnow()
        # ``error`` is written unconditionally, so passing None clears a
        # previously stored error — the SQL/Mongo stores likewise always
        # write the error column on update.
        row["error"] = error

fetch_run_row async

fetch_run_row(thread_id: str, run_id: str) -> RunRow | None

Return a run row scoped to thread_id.

Source code in src/skeino/persistence/in_memory_store.py
async def fetch_run_row(self, thread_id: str, run_id: str) -> RunRow | None:
    """Return the run ``run_id`` only if it belongs to ``thread_id``.

    A run that is unknown, or recorded under a different thread, yields
    ``None``.
    """
    candidate = self._runs.get(run_id)
    if candidate is not None and str(candidate["thread_id"]) == thread_id:
        return candidate
    return None

list_run_rows async

list_run_rows(
    thread_id: str,
    *,
    limit: int,
    offset: int,
    status_value: RunStatus | None,
) -> list[RunRow]

List runs for a thread sorted newest-first.

Source code in src/skeino/persistence/in_memory_store.py
async def list_run_rows(
    self,
    thread_id: str,
    *,
    limit: int,
    offset: int,
    status_value: RunStatus | None,
) -> list[RunRow]:
    """Return one page of a thread's runs, most recently created first.

    ``status_value`` optionally restricts the listing to a single status;
    ``offset`` / ``limit`` are applied after sorting.
    """

    def _belongs(run: RunRow) -> bool:
        if str(run["thread_id"]) != thread_id:
            return False
        return status_value is None or run["status"] == status_value

    newest_first = sorted(
        (run for run in self._runs.values() if _belongs(run)),
        key=lambda run: run["created_at"],
        reverse=True,
    )
    return newest_first[offset : offset + limit]

MetadataStore

MetadataStore(postgres_uri: str)

Persist thread and run metadata alongside LangGraph checkpoints.

Store the PostgreSQL connection string used for metadata operations.

Source code in src/skeino/persistence/metadata_store.py
def __init__(self, postgres_uri: str) -> None:
    """Remember the PostgreSQL connection string; nothing is opened here."""
    # Each operation opens its own connection from this URI.
    self._postgres_uri = postgres_uri

setup async

setup() -> None

Create the metadata tables if they do not already exist.

Source code in src/skeino/persistence/metadata_store.py
async def setup(self) -> None:
    """Run the metadata DDL statements on a fresh connection and commit.

    Executes, in order, the threads-table, runs-table and runs-thread-index
    statements.
    """
    psycopg, _ = _pg()
    ddl_statements = (
        _CREATE_THREADS_TABLE_SQL,
        _CREATE_RUNS_TABLE_SQL,
        _CREATE_RUNS_THREAD_INDEX_SQL,
    )
    async with await psycopg.AsyncConnection.connect(self._postgres_uri) as conn:
        async with conn.cursor() as cursor:
            for statement in ddl_statements:
                await cursor.execute(statement)
        await conn.commit()

fetch_thread_row async

fetch_thread_row(thread_id: str) -> ThreadRow | None

Return the stored metadata row for a thread.

Source code in src/skeino/persistence/metadata_store.py
async def fetch_thread_row(self, thread_id: str) -> ThreadRow | None:
    """Read one thread's metadata row from ``app_threads``.

    Returns ``None`` when no row matches ``thread_id``.
    """
    psycopg, dict_row = _pg()
    connection = await psycopg.AsyncConnection.connect(
        self._postgres_uri, row_factory=dict_row
    )
    async with connection as conn:
        async with conn.cursor() as cursor:
            # The selected columns are exactly the ThreadRow keys, which is
            # what lets the dict_row result be returned as a ThreadRow.
            await cursor.execute(
                """
                SELECT thread_id, created_at, updated_at, state_updated_at,
                       metadata, config, status, ttl
                FROM app_threads
                WHERE thread_id = %s
                """,
                (thread_id,),
            )
            found: ThreadRow | None = await cursor.fetchone()
            return found

create_thread async

create_thread(
    thread_id: str,
    *,
    metadata: dict[str, JsonValue],
    config: dict[str, JsonValue],
    ttl: ThreadTtlConfig | None,
    if_exists: ThreadIfExists,
) -> ThreadRow

Insert a thread row and return the stored record.

Source code in src/skeino/persistence/metadata_store.py
async def create_thread(
    self,
    thread_id: str,
    *,
    metadata: dict[str, JsonValue],
    config: dict[str, JsonValue],
    ttl: ThreadTtlConfig | None,
    if_exists: ThreadIfExists,
) -> ThreadRow:
    """Insert a thread row and return the stored record.

    Parameters
    ----------
    thread_id:
        Identifier of the thread to insert.
    metadata:
        Thread metadata, stored as JSONB.
    config:
        Thread config, stored as JSONB.
    ttl:
        Optional TTL settings, converted by ``_build_ttl_payload``.
    if_exists:
        Conflict policy for a unique violation on insert: ``"do_nothing"``
        returns the existing row, anything else raises HTTP 409.

    Raises
    ------
    HTTPException
        409 when the thread already exists (or conflicted and could not be
        re-read); 500 when the insert returned no row.

    """
    psycopg, dict_row = _pg()
    ttl_payload = self._build_ttl_payload(ttl)
    async with await psycopg.AsyncConnection.connect(
        self._postgres_uri, row_factory=dict_row
    ) as conn:
        async with conn.cursor() as cursor:
            try:
                # New threads are always inserted with the idle status.
                await cursor.execute(
                    """
                    INSERT INTO app_threads (
                        thread_id, metadata, config, status, ttl
                    ) VALUES (%s, %s, %s, %s, %s)
                    RETURNING thread_id, created_at, updated_at, state_updated_at,
                              metadata, config, status, ttl
                    """,
                    (
                        thread_id,
                        _to_jsonb(metadata),
                        _to_jsonb(config),
                        THREAD_STATUS_IDLE,
                        _to_jsonb(ttl_payload),
                    ),
                )
            except psycopg.errors.UniqueViolation as exc:
                # Roll back the failed insert before deciding how to respond.
                await conn.rollback()
                if if_exists == "do_nothing":
                    # fetch_thread_row opens its own connection for the re-read.
                    existing_row = await self.fetch_thread_row(thread_id)
                    if existing_row is None:
                        raise HTTPException(
                            status_code=status.HTTP_409_CONFLICT,
                            detail=f"Thread {thread_id} insert conflicted but "
                            "the row could not be re-read (concurrent delete?).",
                        ) from exc
                    return existing_row
                raise HTTPException(
                    status_code=status.HTTP_409_CONFLICT,
                    detail=f"Thread {thread_id} already exists.",
                ) from exc
            # Only reached when the INSERT succeeded: read the RETURNING row.
            created_row: ThreadRow | None = await cursor.fetchone()
        await conn.commit()
    if created_row is None:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to create thread {thread_id}.",
        )
    return created_row

update_thread async

update_thread(
    thread_id: str,
    *,
    status_value: ThreadStatus | None = None,
    config: dict[str, JsonValue] | None = None,
    metadata: dict[str, JsonValue] | None = None,
    mark_state_updated: bool = False,
) -> None

Update mutable metadata for a thread.

Source code in src/skeino/persistence/metadata_store.py
async def update_thread(
    self,
    thread_id: str,
    *,
    status_value: ThreadStatus | None = None,
    config: dict[str, JsonValue] | None = None,
    metadata: dict[str, JsonValue] | None = None,
    mark_state_updated: bool = False,
) -> None:
    """Write the requested changes to a thread's row in ``app_threads``.

    Only arguments that are not ``None`` are written; ``updated_at`` is
    refreshed alongside them. When nothing is requested no statement is
    issued at all.
    """
    # (SET fragment, bound value) pairs for the optional columns.
    pending: list[tuple[str, Any]] = []
    if status_value is not None:
        pending.append(("status = %s", status_value))
    if config is not None:
        pending.append(("config = %s", _to_jsonb(config)))
    if metadata is not None:
        pending.append(("metadata = %s", _to_jsonb(metadata)))

    if not pending and not mark_state_updated:
        return

    assignments = ["updated_at = NOW()", *(fragment for fragment, _ in pending)]
    if mark_state_updated:
        assignments.append("state_updated_at = NOW()")
    values = [value for _, value in pending] + [thread_id]

    psycopg, _ = _pg()
    # nosec B608: `assignments` holds only hardcoded "column = %s"/NOW()
    # fragments built above; every user value is bound via %s parameters.
    query = f"""
        UPDATE app_threads
        SET {", ".join(assignments)}
        WHERE thread_id = %s
    """  # nosec B608
    async with await psycopg.AsyncConnection.connect(self._postgres_uri) as conn:
        async with conn.cursor() as cursor:
            await cursor.execute(query, values)
        await conn.commit()

delete_thread async

delete_thread(thread_id: str) -> None

Delete a thread row and its run rows.

Source code in src/skeino/persistence/metadata_store.py
async def delete_thread(self, thread_id: str) -> None:
    """Remove a thread's runs and then the thread row itself, in one commit."""
    psycopg, _ = _pg()
    # Runs are removed first, then the thread row they reference.
    statements = (
        "DELETE FROM app_runs WHERE thread_id = %s",
        "DELETE FROM app_threads WHERE thread_id = %s",
    )
    async with await psycopg.AsyncConnection.connect(self._postgres_uri) as conn:
        async with conn.cursor() as cursor:
            for statement in statements:
                await cursor.execute(statement, (thread_id,))
        await conn.commit()

search_thread_rows async

search_thread_rows(
    request: ThreadSearchRequest,
) -> list[ThreadRow]

Return stored thread rows before graph-state enrichment.

Source code in src/skeino/persistence/metadata_store.py
async def search_thread_rows(
    self,
    request: ThreadSearchRequest,
) -> list[ThreadRow]:
    """Return stored thread rows before graph-state enrichment.

    Applies the optional ``request.ids`` and ``request.status`` filters,
    orders by ``request.sort_by``/``request.sort_order`` (falling back to the
    module defaults when a value is missing or not whitelisted) and pages the
    result with ``request.limit``/``request.offset``.
    """
    filters: list[str] = []
    params: list[Any] = []
    if request.ids:
        filters.append("thread_id = ANY(%s)")
        params.append([str(thread_id) for thread_id in request.ids])
    if request.status is not None:
        filters.append("status = %s")
        params.append(request.status)
    # LIMIT/OFFSET are the last two placeholders in the statement below.
    params += [request.limit, request.offset]

    where_clause = ""
    if filters:
        where_clause = f"WHERE {' AND '.join(filters)}"

    # Sort column and direction are interpolated into the SQL text, so both
    # are forced onto a whitelist; anything else degrades to the default.
    requested_sort = request.sort_by or DEFAULT_SORT_BY
    sort_by = (
        requested_sort if requested_sort in THREAD_SORT_FIELDS else DEFAULT_SORT_BY
    )
    requested_order = request.sort_order or DEFAULT_SORT_ORDER
    sort_order = (
        requested_order if requested_order in {"asc", "desc"} else DEFAULT_SORT_ORDER
    )
    direction = sort_order.upper()

    # nosec B608: `where_clause` only contains the hardcoded fragments built
    # above, `sort_by`/`direction` are whitelisted, and every user-supplied
    # value is bound through a %s parameter.
    query = f"""
        SELECT thread_id, created_at, updated_at, state_updated_at,
               metadata, config, status, ttl
        FROM app_threads
        {where_clause}
        ORDER BY {sort_by} {direction}
        LIMIT %s
        OFFSET %s
    """  # nosec B608
    psycopg, dict_row = _pg()
    async with await psycopg.AsyncConnection.connect(
        self._postgres_uri, row_factory=dict_row
    ) as conn:
        async with conn.cursor() as cur:
            await cur.execute(query, params)
            fetched = await cur.fetchall()
    return list(fetched)

create_run async

create_run(
    run_id: str,
    thread_id: str,
    assistant_id: str,
    metadata: dict[str, JsonValue],
    kwargs: dict[str, JsonValue],
    multitask_strategy: MultitaskStrategy,
) -> RunRow

Insert a run row and return it.

Source code in src/skeino/persistence/metadata_store.py
async def create_run(
    self,
    run_id: str,
    thread_id: str,
    assistant_id: str,
    metadata: dict[str, JsonValue],
    kwargs: dict[str, JsonValue],
    multitask_strategy: MultitaskStrategy,
) -> RunRow:
    """Insert a new run in the pending state and return the stored row.

    The row comes back from the ``RETURNING`` clause of the insert. An
    ``HTTPException`` with status 500 is raised if the insert yields no row.
    """
    insert_sql = """
                INSERT INTO app_runs (
                    run_id, thread_id, assistant_id, status,
                    metadata, kwargs, multitask_strategy
                ) VALUES (%s, %s, %s, %s, %s, %s, %s)
                RETURNING run_id, thread_id, assistant_id, created_at, updated_at,
                          status, metadata, kwargs, multitask_strategy, error
                """
    psycopg, dict_row = _pg()
    async with await psycopg.AsyncConnection.connect(
        self._postgres_uri, row_factory=dict_row
    ) as conn:
        async with conn.cursor() as cur:
            params = (
                run_id,
                thread_id,
                assistant_id,
                RUN_STATUS_PENDING,
                _to_jsonb(metadata),
                _to_jsonb(kwargs),
                multitask_strategy,
            )
            await cur.execute(insert_sql, params)
            inserted: RunRow | None = await cur.fetchone()
        await conn.commit()
    if inserted is not None:
        return inserted
    raise HTTPException(
        status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
        detail=f"Failed to create run {run_id}.",
    )

update_run_status async

update_run_status(
    run_id: str,
    status_value: RunStatus,
    *,
    error: str | None = None,
) -> None

Update the persisted run status.

Source code in src/skeino/persistence/metadata_store.py
async def update_run_status(
    self,
    run_id: str,
    status_value: RunStatus,
    *,
    error: str | None = None,
) -> None:
    """Persist a new status for ``run_id``.

    ``updated_at`` is refreshed and the ``error`` column is overwritten with
    ``error`` on every call, so omitting ``error`` stores NULL.
    """
    update_sql = """
                UPDATE app_runs
                SET status = %s, updated_at = NOW(), error = %s
                WHERE run_id = %s
                """
    params = (status_value, error, run_id)
    psycopg, _ = _pg()
    async with await psycopg.AsyncConnection.connect(self._postgres_uri) as conn:
        async with conn.cursor() as cur:
            await cur.execute(update_sql, params)
        await conn.commit()

fetch_run_row async

fetch_run_row(thread_id: str, run_id: str) -> RunRow | None

Return a single run row for a thread.

Source code in src/skeino/persistence/metadata_store.py
async def fetch_run_row(self, thread_id: str, run_id: str) -> RunRow | None:
    """Look up one run by id, scoped to its owning thread.

    Returns ``None`` when no row matches both ``thread_id`` and ``run_id``.
    """
    select_sql = """
                SELECT run_id, thread_id, assistant_id, created_at, updated_at,
                       status, metadata, kwargs, multitask_strategy, error
                FROM app_runs
                WHERE thread_id = %s AND run_id = %s
                """
    psycopg, dict_row = _pg()
    async with await psycopg.AsyncConnection.connect(
        self._postgres_uri, row_factory=dict_row
    ) as conn:
        async with conn.cursor() as cur:
            await cur.execute(select_sql, (thread_id, run_id))
            found: RunRow | None = await cur.fetchone()
    return found

list_run_rows async

list_run_rows(
    thread_id: str,
    *,
    limit: int,
    offset: int,
    status_value: RunStatus | None,
) -> list[RunRow]

List run rows for a thread.

Source code in src/skeino/persistence/metadata_store.py
async def list_run_rows(
    self,
    thread_id: str,
    *,
    limit: int,
    offset: int,
    status_value: RunStatus | None,
) -> list[RunRow]:
    """Return one page of a thread's runs, newest first.

    ``status_value`` narrows the listing to a single status when it is not
    ``None``; ``limit``/``offset`` select the page.
    """
    filters: list[str] = ["thread_id = %s"]
    params: list[Any] = [thread_id]
    if status_value is not None:
        filters.append("status = %s")
        params.append(status_value)
    # LIMIT/OFFSET are the trailing placeholders of the statement.
    params += [limit, offset]
    where = " AND ".join(filters)

    # nosec B608: `where` is assembled solely from the hardcoded
    # "column = %s" fragments above; user values travel as %s parameters.
    query = f"""
        SELECT run_id, thread_id, assistant_id, created_at, updated_at,
               status, metadata, kwargs, multitask_strategy, error
        FROM app_runs
        WHERE {where}
        ORDER BY created_at DESC
        LIMIT %s
        OFFSET %s
    """  # nosec B608
    psycopg, dict_row = _pg()
    async with await psycopg.AsyncConnection.connect(
        self._postgres_uri, row_factory=dict_row
    ) as conn:
        async with conn.cursor() as cur:
            await cur.execute(query, params)
            fetched = await cur.fetchall()
    return list(fetched)

MongoMetadataStore

MongoMetadataStore(uri: str, *, db_name: str | None = None)

MongoDB-backed thread + run metadata store (MetadataStoreProtocol).

Store the URI; db_name defaults to the URI's path, else "skeino".

Source code in src/skeino/persistence/mongo_store.py
def __init__(self, uri: str, *, db_name: str | None = None) -> None:
    """Remember the connection URI and pick the database name.

    The name is the first non-empty of: the explicit ``db_name``, the value
    ``mongo_db_from_uri`` extracts from ``uri``, and the module default.
    """
    resolved_db_name = db_name or mongo_db_from_uri(uri) or _DEFAULT_DB_NAME
    self._uri = uri
    self._db_name = resolved_db_name
    # Client and collection handles start unset; ``setup()`` assigns them.
    self._client: Any = None
    self._threads: Any = None
    self._runs: Any = None

setup async

setup() -> None

Open the motor client (lazily) and ensure indexes.

Source code in src/skeino/persistence/mongo_store.py
async def setup(self) -> None:
    """Create the motor client, bind the collections and ensure indexes.

    ``motor`` is imported here rather than at module import time; a missing
    package is reported as a ``RuntimeError`` naming the extra to install.
    """
    try:
        import motor.motor_asyncio  # optional dependency: skeino[mongodb]
    except ImportError as exc:  # pragma: no cover - optional dependency
        raise RuntimeError(
            "The 'mongodb' metadata store requires the skeino[mongodb] extra "
            "(pip install 'skeino[mongodb]')."
        ) from exc

    client = motor.motor_asyncio.AsyncIOMotorClient(self._uri)
    self._client = client
    database = client[self._db_name]
    self._threads = database["app_threads"]
    self._runs = database["app_runs"]
    # Compound indexes on the fields the store filters and sorts by.
    thread_index = [("status", 1), ("updated_at", -1)]
    run_index = [("thread_id", 1), ("created_at", -1)]
    await self._threads.create_index(thread_index)
    await self._runs.create_index(run_index)

aclose async

aclose() -> None

Close the motor client (called on app shutdown).

Source code in src/skeino/persistence/mongo_store.py
async def aclose(self) -> None:
    """Close the motor client, if one is open, and forget it.

    Calling this when no client is set is a no-op.
    """
    client = self._client
    if client is None:
        return
    client.close()
    self._client = None

fetch_thread_row async

fetch_thread_row(thread_id: str) -> ThreadRow | None

Return the stored row for thread_id (or None).

Source code in src/skeino/persistence/mongo_store.py
async def fetch_thread_row(self, thread_id: str) -> ThreadRow | None:
    """Look up the thread document keyed by ``thread_id``.

    Returns the document converted by ``_thread_row``, or ``None`` when no
    document has that ``_id``.
    """
    found = await self._threads.find_one({"_id": thread_id})
    if found is None:
        return None
    return self._thread_row(found)

create_thread async

create_thread(
    thread_id: str,
    *,
    metadata: dict[str, JsonValue],
    config: dict[str, JsonValue],
    ttl: ThreadTtlConfig | None,
    if_exists: ThreadIfExists,
) -> ThreadRow

Insert a thread document and return its row.

Source code in src/skeino/persistence/mongo_store.py
async def create_thread(
    self,
    thread_id: str,
    *,
    metadata: dict[str, JsonValue],
    config: dict[str, JsonValue],
    ttl: ThreadTtlConfig | None,
    if_exists: ThreadIfExists,
) -> ThreadRow:
    """Insert a thread document and return its row.

    The new thread starts with status ``"idle"`` and no ``state_updated_at``.
    When a document with the same id already exists, ``if_exists`` decides
    the outcome: ``"do_nothing"`` returns the existing row, any other value
    raises an ``HTTPException`` with status 409. A 409 is also raised when
    the existing row cannot be re-read after the conflict.
    """
    from pymongo.errors import DuplicateKeyError

    created = _utcnow()
    ttl_payload = self._ttl_payload(ttl, created)
    thread_doc = {
        "_id": thread_id,
        "thread_id": thread_id,
        "created_at": created,
        "updated_at": created,
        "state_updated_at": None,
        "metadata": dict(metadata),
        "config": dict(config),
        "status": "idle",
        "ttl": ttl_payload,
    }
    try:
        await self._threads.insert_one(thread_doc)
    except DuplicateKeyError as exc:
        if if_exists != "do_nothing":
            raise HTTPException(
                status_code=status.HTTP_409_CONFLICT,
                detail=f"Thread {thread_id} already exists.",
            ) from exc
        current = await self.fetch_thread_row(thread_id)
        if current is None:
            # The duplicate was reported but the document is gone again.
            raise HTTPException(
                status_code=status.HTTP_409_CONFLICT,
                detail=f"Thread {thread_id} insert conflicted but the row "
                "could not be re-read (concurrent delete?).",
            ) from exc
        return current
    return self._thread_row(thread_doc)

update_thread async

update_thread(
    thread_id: str,
    *,
    status_value: ThreadStatus | None = None,
    config: dict[str, JsonValue] | None = None,
    metadata: dict[str, JsonValue] | None = None,
    mark_state_updated: bool = False,
) -> None

Update mutable thread fields.

Source code in src/skeino/persistence/mongo_store.py
async def update_thread(
    self,
    thread_id: str,
    *,
    status_value: ThreadStatus | None = None,
    config: dict[str, JsonValue] | None = None,
    metadata: dict[str, JsonValue] | None = None,
    mark_state_updated: bool = False,
) -> None:
    """Update mutable thread fields.

    Only arguments that are not ``None`` are written. ``updated_at`` is
    refreshed on every effective update, and ``state_updated_at`` is set as
    well when ``mark_state_updated`` is true. Both timestamps come from a
    single clock read, so they are identical within one call. The call is a
    no-op when nothing was requested.
    """
    nothing_requested = (
        status_value is None
        and config is None
        and metadata is None
        and not mark_state_updated
    )
    if nothing_requested:
        return

    # One clock read for the whole update keeps the two timestamps equal,
    # matching a single SQL statement that uses NOW() for both columns.
    now = _utcnow()
    updates: dict[str, Any] = {"updated_at": now}
    if status_value is not None:
        updates["status"] = status_value
    if config is not None:
        updates["config"] = dict(config)
    if metadata is not None:
        updates["metadata"] = dict(metadata)
    if mark_state_updated:
        updates["state_updated_at"] = now
    await self._threads.update_one({"_id": thread_id}, {"$set": updates})

search_thread_rows async

search_thread_rows(
    request: ThreadSearchRequest,
) -> list[ThreadRow]

Return stored thread rows (filtered by ids/status, sorted, paginated).

Source code in src/skeino/persistence/mongo_store.py
async def search_thread_rows(self, request: ThreadSearchRequest) -> list[ThreadRow]:
    """Find thread documents matching ``request`` and return them as rows.

    Filters on ``request.ids`` and ``request.status`` when set, sorts by a
    whitelisted field (default otherwise) and pages with
    ``request.offset``/``request.limit``. Any ``sort_order`` other than
    ``"asc"`` sorts descending.
    """
    filters: dict[str, Any] = {}
    if request.ids:
        wanted_ids = [str(item) for item in request.ids]
        filters["_id"] = {"$in": wanted_ids}
    if request.status is not None:
        filters["status"] = request.status

    requested_sort = request.sort_by or _DEFAULT_SORT_BY
    sort_field = (
        requested_sort if requested_sort in _THREAD_SORT_FIELDS else _DEFAULT_SORT_BY
    )
    direction = -1
    if request.sort_order == "asc":
        direction = 1

    ordered = self._threads.find(filters).sort(sort_field, direction)
    page = ordered.skip(request.offset).limit(request.limit)
    return [self._thread_row(doc) async for doc in page]

delete_thread async

delete_thread(thread_id: str) -> None

Delete a thread and its run documents.

Source code in src/skeino/persistence/mongo_store.py
async def delete_thread(self, thread_id: str) -> None:
    """Remove a thread's run documents, then the thread document itself."""
    owned_runs = {"thread_id": thread_id}
    await self._runs.delete_many(owned_runs)
    thread_key = {"_id": thread_id}
    await self._threads.delete_one(thread_key)

create_run async

create_run(
    run_id: str,
    thread_id: str,
    assistant_id: str,
    metadata: dict[str, JsonValue],
    kwargs: dict[str, JsonValue],
    multitask_strategy: MultitaskStrategy,
) -> RunRow

Insert a run document and return its row.

Source code in src/skeino/persistence/mongo_store.py
async def create_run(
    self,
    run_id: str,
    thread_id: str,
    assistant_id: str,
    metadata: dict[str, JsonValue],
    kwargs: dict[str, JsonValue],
    multitask_strategy: MultitaskStrategy,
) -> RunRow:
    """Store a new run document and return it converted to a row.

    The run starts with status ``"pending"``, no error, and identical
    ``created_at``/``updated_at`` timestamps. ``metadata`` and ``kwargs``
    are shallow-copied into the document.
    """
    created = _utcnow()
    run_doc = {
        # The run id doubles as the document's primary key.
        "_id": run_id,
        "run_id": run_id,
        "thread_id": thread_id,
        "assistant_id": assistant_id,
        "created_at": created,
        "updated_at": created,
        "status": "pending",
        "metadata": dict(metadata),
        "kwargs": dict(kwargs),
        "multitask_strategy": multitask_strategy,
        "error": None,
    }
    await self._runs.insert_one(run_doc)
    return self._run_row(run_doc)

update_run_status async

update_run_status(
    run_id: str,
    status_value: RunStatus,
    *,
    error: str | None = None,
) -> None

Update a run's status field.

Source code in src/skeino/persistence/mongo_store.py
async def update_run_status(
    self,
    run_id: str,
    status_value: RunStatus,
    *,
    error: str | None = None,
) -> None:
    """Set a run's status, refresh ``updated_at`` and overwrite ``error``.

    ``error`` is always written, so leaving it at ``None`` clears any
    previously stored error.
    """
    selector = {"_id": run_id}
    changes = {"status": status_value, "updated_at": _utcnow(), "error": error}
    await self._runs.update_one(selector, {"$set": changes})

fetch_run_row async

fetch_run_row(thread_id: str, run_id: str) -> RunRow | None

Return a run row scoped to thread_id.

Source code in src/skeino/persistence/mongo_store.py
async def fetch_run_row(self, thread_id: str, run_id: str) -> RunRow | None:
    """Look up a run by id, requiring that it belongs to ``thread_id``.

    Returns the document converted by ``_run_row``, or ``None`` when no
    document matches both ids.
    """
    selector = {"_id": run_id, "thread_id": thread_id}
    found = await self._runs.find_one(selector)
    if found is None:
        return None
    return self._run_row(found)

list_run_rows async

list_run_rows(
    thread_id: str,
    *,
    limit: int,
    offset: int,
    status_value: RunStatus | None,
) -> list[RunRow]

List runs for a thread sorted newest-first.

Source code in src/skeino/persistence/mongo_store.py
async def list_run_rows(
    self,
    thread_id: str,
    *,
    limit: int,
    offset: int,
    status_value: RunStatus | None,
) -> list[RunRow]:
    """Return one page of a thread's runs, most recently created first.

    ``status_value`` restricts the listing to that status when it is not
    ``None``; ``offset`` and ``limit`` select the page.
    """
    filters: dict[str, Any] = {"thread_id": thread_id}
    if status_value is not None:
        filters["status"] = status_value
    newest_first = self._runs.find(filters).sort("created_at", -1)
    page = newest_first.skip(offset).limit(limit)
    return [self._run_row(doc) async for doc in page]

SqliteMetadataStore

SqliteMetadataStore(path: str)

SQLite-backed thread + run metadata store satisfying MetadataStoreProtocol.

Store the SQLite path/URI (a file path, :memory:, or sqlite://).

Source code in src/skeino/persistence/sqlite_store.py
def __init__(self, path: str) -> None:
    """Remember the normalised SQLite location and prepare the store's lock.

    ``path`` may be a file path, ``:memory:``, or a ``sqlite://`` URI; it is
    passed through ``normalize_sqlite_uri`` before being stored.
    """
    normalized_path = normalize_sqlite_uri(path)
    self._path = normalized_path
    # No connection yet; ``setup()`` opens it.
    self._conn: Any = None
    # Held by the store's methods around each use of the shared connection.
    self._lock = asyncio.Lock()

setup async

setup() -> None

Open the connection (lazily importing aiosqlite) and create tables.

Source code in src/skeino/persistence/sqlite_store.py
async def setup(self) -> None:
    """Open the SQLite connection, apply pragmas and create the schema.

    ``aiosqlite`` is imported here rather than at module import time; a
    missing package is reported as a ``RuntimeError`` naming the extra to
    install.
    """
    try:
        import aiosqlite  # optional dependency: skeino[sqlite]
    except ImportError as exc:  # pragma: no cover - optional dependency
        raise RuntimeError(
            "The 'sqlite' metadata store requires the skeino[sqlite] extra "
            "(pip install 'skeino[sqlite]')."
        ) from exc

    conn = await aiosqlite.connect(self._path)
    self._conn = conn
    bootstrap_statements = (
        # Busy timeout goes first: the WAL switch right after it needs an
        # exclusive lock and has to be able to wait for a concurrent
        # checkpointer connection.
        f"PRAGMA busy_timeout = {_BUSY_TIMEOUT_MS}",
        # WAL allows readers alongside one writer on a shared file; on
        # ":memory:" it is a harmless no-op (journal_mode stays "memory").
        "PRAGMA journal_mode=WAL",
        "PRAGMA foreign_keys = ON",
        _CREATE_THREADS_SQL,
        _CREATE_RUNS_SQL,
        _CREATE_RUNS_INDEX_SQL,
    )
    for statement in bootstrap_statements:
        await conn.execute(statement)
    await conn.commit()

aclose async

aclose() -> None

Close the underlying connection (called on app shutdown).

Source code in src/skeino/persistence/sqlite_store.py
async def aclose(self) -> None:
    """Close the SQLite connection, if one is open, and forget it.

    Calling this when no connection is set is a no-op.
    """
    conn = self._conn
    if conn is None:
        return
    await conn.close()
    self._conn = None

fetch_thread_row async

fetch_thread_row(thread_id: str) -> ThreadRow | None

Return the stored row for thread_id (or None).

Source code in src/skeino/persistence/sqlite_store.py
async def fetch_thread_row(self, thread_id: str) -> ThreadRow | None:
    """Read the thread row for ``thread_id`` under the store lock.

    Returns the row converted by ``_thread_row``, or ``None`` when the
    thread does not exist.
    """
    select_sql = (
        "SELECT thread_id, created_at, updated_at, state_updated_at, "
        "metadata, config, status, ttl FROM app_threads WHERE thread_id = ?"
    )
    async with self._lock:
        cur = await self._conn.execute(select_sql, (thread_id,))
        found = await cur.fetchone()
    if found is None:
        return None
    return self._thread_row(found)

create_thread async

create_thread(
    thread_id: str,
    *,
    metadata: dict[str, JsonValue],
    config: dict[str, JsonValue],
    ttl: ThreadTtlConfig | None,
    if_exists: ThreadIfExists,
) -> ThreadRow

Insert a thread row and return it.

Source code in src/skeino/persistence/sqlite_store.py
async def create_thread(
    self,
    thread_id: str,
    *,
    metadata: dict[str, JsonValue],
    config: dict[str, JsonValue],
    ttl: ThreadTtlConfig | None,
    if_exists: ThreadIfExists,
) -> ThreadRow:
    """Insert a thread row and return it.

    The new thread starts with status ``"idle"`` and no ``state_updated_at``.
    On an integrity error the insert is rolled back and ``if_exists``
    decides the outcome: ``"do_nothing"`` returns the existing row, any
    other value raises an ``HTTPException`` with status 409. A 409 is also
    raised when the existing row cannot be re-read after the conflict.
    """
    created = _utcnow()
    ttl_payload = self._ttl_payload(ttl, created)
    insert_sql = (
        "INSERT INTO app_threads "
        "(thread_id, created_at, updated_at, metadata, config, status, ttl)"
        " VALUES (?, ?, ?, ?, ?, ?, ?)"
    )
    async with self._lock:
        try:
            stamp = created.isoformat()
            params = (
                thread_id,
                stamp,
                stamp,
                json.dumps(metadata),
                json.dumps(config),
                "idle",
                None if ttl_payload is None else json.dumps(ttl_payload),
            )
            await self._conn.execute(insert_sql, params)
            await self._conn.commit()
        except sqlite3.IntegrityError as exc:
            await self._conn.rollback()
            if if_exists != "do_nothing":
                raise HTTPException(
                    status_code=status.HTTP_409_CONFLICT,
                    detail=f"Thread {thread_id} already exists.",
                ) from exc
            # The lock is already held, hence the "_locked" reader.
            current = await self._fetch_thread_locked(thread_id)
            if current is None:
                raise HTTPException(
                    status_code=status.HTTP_409_CONFLICT,
                    detail=f"Thread {thread_id} insert conflicted but the row "
                    "could not be re-read (concurrent delete?).",
                ) from exc
            return current
    # Build the row from the inserted values instead of reading it back.
    return {
        "thread_id": UUID(thread_id),
        "created_at": created,
        "updated_at": created,
        "state_updated_at": None,
        "metadata": dict(metadata),
        "config": dict(config),
        "status": "idle",
        "ttl": ttl_payload,
    }

update_thread async

update_thread(
    thread_id: str,
    *,
    status_value: ThreadStatus | None = None,
    config: dict[str, JsonValue] | None = None,
    metadata: dict[str, JsonValue] | None = None,
    mark_state_updated: bool = False,
) -> None

Update mutable thread fields.

Source code in src/skeino/persistence/sqlite_store.py
async def update_thread(
    self,
    thread_id: str,
    *,
    status_value: ThreadStatus | None = None,
    config: dict[str, JsonValue] | None = None,
    metadata: dict[str, JsonValue] | None = None,
    mark_state_updated: bool = False,
) -> None:
    """Update mutable thread fields.

    Only arguments that are not ``None`` are written. ``updated_at`` is
    refreshed on every effective update, and ``state_updated_at`` is set as
    well when ``mark_state_updated`` is true. Both timestamps come from a
    single clock read, so they are identical within one call. The call is a
    no-op when nothing was requested.
    """
    nothing_requested = (
        status_value is None
        and config is None
        and metadata is None
        and not mark_state_updated
    )
    if nothing_requested:
        return

    # One clock read for the whole update keeps the two timestamps equal,
    # matching a single SQL statement that uses NOW() for both columns.
    now_iso = _utcnow().isoformat()
    assignments = ["updated_at = ?"]
    values: list[Any] = [now_iso]
    if status_value is not None:
        assignments.append("status = ?")
        values.append(status_value)
    if config is not None:
        assignments.append("config = ?")
        values.append(json.dumps(config))
    if metadata is not None:
        assignments.append("metadata = ?")
        values.append(json.dumps(metadata))
    if mark_state_updated:
        assignments.append("state_updated_at = ?")
        values.append(now_iso)
    values.append(thread_id)
    # `assignments` holds only hardcoded "column = ?" fragments; user values
    # are bound via ? parameters.
    query = f"UPDATE app_threads SET {', '.join(assignments)} WHERE thread_id = ?"  # nosec B608
    async with self._lock:
        await self._conn.execute(query, values)
        await self._conn.commit()

search_thread_rows async

search_thread_rows(
    request: ThreadSearchRequest,
) -> list[ThreadRow]

Return stored thread rows (filtered by ids/status, sorted, paginated).

Source code in src/skeino/persistence/sqlite_store.py
async def search_thread_rows(self, request: ThreadSearchRequest) -> list[ThreadRow]:
    """Select thread rows matching ``request`` and convert them.

    Filters on ``request.ids`` and ``request.status`` when set, sorts by a
    whitelisted column (default otherwise) and pages with
    ``request.limit``/``request.offset``. Any ``sort_order`` other than
    ``"asc"`` sorts descending.
    """
    filters: list[str] = []
    params: list[Any] = []
    if request.ids:
        wanted_ids = [str(item) for item in request.ids]
        # One "?" per id so each id is bound as its own parameter.
        placeholders = ", ".join(["?"] * len(wanted_ids))
        filters.append(f"thread_id IN ({placeholders})")
        params += wanted_ids
    if request.status is not None:
        filters.append("status = ?")
        params.append(request.status)
    params += [request.limit, request.offset]

    where = ""
    if filters:
        where = f"WHERE {' AND '.join(filters)}"
    requested_sort = request.sort_by or _DEFAULT_SORT_BY
    sort_by = (
        requested_sort if requested_sort in _THREAD_SORT_FIELDS else _DEFAULT_SORT_BY
    )
    order = "DESC"
    if request.sort_order == "asc":
        order = "ASC"

    # `where` contains only the hardcoded fragments built above, `sort_by`
    # is whitelisted and `order` is a literal; user values are bound via ?.
    query = f"SELECT {_THREAD_COLUMNS} FROM app_threads {where} ORDER BY {sort_by} {order} LIMIT ? OFFSET ?"  # nosec B608
    async with self._lock:
        cur = await self._conn.execute(query, params)
        fetched = await cur.fetchall()
    return [self._thread_row(row) for row in fetched]

delete_thread async

delete_thread(thread_id: str) -> None

Delete a thread and its run rows.

Source code in src/skeino/persistence/sqlite_store.py
async def delete_thread(self, thread_id: str) -> None:
    """Remove a thread's run rows, then the thread row, in one commit."""
    params = (thread_id,)
    statements = (
        "DELETE FROM app_runs WHERE thread_id = ?",
        "DELETE FROM app_threads WHERE thread_id = ?",
    )
    async with self._lock:
        for statement in statements:
            await self._conn.execute(statement, params)
        await self._conn.commit()

create_run async

create_run(
    run_id: str,
    thread_id: str,
    assistant_id: str,
    metadata: dict[str, JsonValue],
    kwargs: dict[str, JsonValue],
    multitask_strategy: MultitaskStrategy,
) -> RunRow

Insert a run row and return it.

Source code in src/skeino/persistence/sqlite_store.py
async def create_run(
    self,
    run_id: str,
    thread_id: str,
    assistant_id: str,
    metadata: dict[str, JsonValue],
    kwargs: dict[str, JsonValue],
    multitask_strategy: MultitaskStrategy,
) -> RunRow:
    """Insert a run row and return it.

    The run is stored with status ``"pending"`` and identical
    ``created_at``/``updated_at`` timestamps. If the insert or the commit
    fails with a ``sqlite3.Error``, the pending transaction is rolled back
    before the error is re-raised, so the shared connection is not left
    inside an open transaction.

    Raises:
        sqlite3.Error: Propagated unchanged after the rollback.
    """
    now = _utcnow()
    stamp = now.isoformat()
    async with self._lock:
        try:
            await self._conn.execute(
                "INSERT INTO app_runs "
                "(run_id, thread_id, assistant_id, created_at, updated_at, "
                "status, metadata, kwargs, multitask_strategy)"
                " VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
                (
                    run_id,
                    thread_id,
                    assistant_id,
                    stamp,
                    stamp,
                    "pending",
                    json.dumps(metadata),
                    json.dumps(kwargs),
                    multitask_strategy,
                ),
            )
            await self._conn.commit()
        except sqlite3.Error:
            # Same clean-up create_thread performs on a failed insert: end
            # the implicitly opened transaction instead of leaving it (and
            # its write lock) pending on the shared connection.
            await self._conn.rollback()
            raise
    # Build the row from the inserted values instead of reading it back.
    return {
        "run_id": UUID(run_id),
        "thread_id": UUID(thread_id),
        "assistant_id": assistant_id,
        "created_at": now,
        "updated_at": now,
        "status": "pending",
        "metadata": dict(metadata),
        "kwargs": dict(kwargs),
        "multitask_strategy": multitask_strategy,
        "error": None,
    }

update_run_status async

update_run_status(
    run_id: str,
    status_value: RunStatus,
    *,
    error: str | None = None,
) -> None

Update a run's status field.

Source code in src/skeino/persistence/sqlite_store.py
async def update_run_status(
    self,
    run_id: str,
    status_value: RunStatus,
    *,
    error: str | None = None,
) -> None:
    """Set a run's status, refresh ``updated_at`` and overwrite ``error``.

    ``error`` is always written, so leaving it at ``None`` stores NULL.
    """
    update_sql = (
        "UPDATE app_runs SET status = ?, updated_at = ?, error = ? "
        "WHERE run_id = ?"
    )
    async with self._lock:
        params = (status_value, _utcnow().isoformat(), error, run_id)
        await self._conn.execute(update_sql, params)
        await self._conn.commit()

fetch_run_row async

fetch_run_row(thread_id: str, run_id: str) -> RunRow | None

Return a run row scoped to thread_id.

Source code in src/skeino/persistence/sqlite_store.py
async def fetch_run_row(self, thread_id: str, run_id: str) -> RunRow | None:
    """Read one run by id, requiring that it belongs to ``thread_id``.

    Returns the row converted by ``_run_row``, or ``None`` when no row
    matches both ids.
    """
    select_sql = (
        f"SELECT {_RUN_COLUMNS} FROM app_runs "  # nosec B608 - static columns
        "WHERE thread_id = ? AND run_id = ?"
    )
    async with self._lock:
        cur = await self._conn.execute(select_sql, (thread_id, run_id))
        found = await cur.fetchone()
    if found is None:
        return None
    return self._run_row(found)

list_run_rows async

list_run_rows(
    thread_id: str,
    *,
    limit: int,
    offset: int,
    status_value: RunStatus | None,
) -> list[RunRow]

List runs for a thread sorted newest-first.

Source code in src/skeino/persistence/sqlite_store.py
async def list_run_rows(
    self,
    thread_id: str,
    *,
    limit: int,
    offset: int,
    status_value: RunStatus | None,
) -> list[RunRow]:
    """Return one page of a thread's runs, most recently created first.

    ``status_value`` restricts the listing to that status when it is not
    ``None``; ``limit`` and ``offset`` select the page.
    """
    filters = ["thread_id = ?"]
    params: list[Any] = [thread_id]
    if status_value is not None:
        filters.append("status = ?")
        params.append(status_value)
    # LIMIT/OFFSET are the trailing placeholders of the statement.
    params += [limit, offset]
    # `filters` holds only hardcoded "column = ?" fragments; user values are
    # bound via ? parameters.
    where = " AND ".join(filters)
    query = f"SELECT {_RUN_COLUMNS} FROM app_runs WHERE {where} ORDER BY created_at DESC LIMIT ? OFFSET ?"  # nosec B608
    async with self._lock:
        cur = await self._conn.execute(query, params)
        fetched = await cur.fetchall()
    return [self._run_row(row) for row in fetched]

open_checkpointer async

open_checkpointer(
    uri: str | None = None,
    *,
    scheme: str | None = None,
    setup_schema: bool = True,
    options: dict[str, Any] | None = None,
) -> AsyncIterator[BaseCheckpointSaver]

Yield a checkpointer instance, releasing its resources on exit.

Resolution: an explicit scheme wins; otherwise the scheme is derived from the uri, falling back to memory when both are absent.

Source code in src/skeino/persistence/checkpointer.py
@asynccontextmanager
async def open_checkpointer(
    uri: str | None = None,
    *,
    scheme: str | None = None,
    setup_schema: bool = True,
    options: dict[str, Any] | None = None,
) -> AsyncIterator[BaseCheckpointSaver]:
    """Yield a checkpointer instance, releasing its resources on exit.

    Resolution: an explicit ``scheme`` wins; otherwise the scheme is derived
    from ``uri`` by ``_scheme_for_uri``. The resolved scheme is lower-cased
    before the registry lookup.

    Raises:
        ValueError: If no builder is registered for the resolved scheme.
    """
    resolved = scheme or _scheme_for_uri(uri)
    effective_scheme = resolved.lower()
    builder = _REGISTRY.get(effective_scheme)
    if builder is None:
        raise ValueError(
            f"No checkpointer registered for scheme {effective_scheme!r}. "
            f"Known schemes: {sorted(_REGISTRY)}"
        )

    # Caller-supplied options are applied last, so an explicit
    # "setup_schema" key in ``options`` overrides the keyword argument.
    merged_options: dict[str, Any] = {"setup_schema": setup_schema}
    merged_options.update(options or {})
    spec = CheckpointerSpec(
        scheme=effective_scheme,
        uri=uri,
        options=merged_options,
    )
    async with builder(spec) as saver:
        yield saver

register_checkpointer

register_checkpointer(
    *schemes: str,
) -> Callable[[CheckpointerBuilder], CheckpointerBuilder]

Register a checkpointer builder for one or more URI schemes.

Source code in src/skeino/persistence/checkpointer.py
def register_checkpointer(
    *schemes: str,
) -> Callable[[CheckpointerBuilder], CheckpointerBuilder]:
    """Register a checkpointer builder for one or more URI schemes.

    Use as a decorator: the decorated builder is stored in the registry
    under every name in ``schemes`` and returned unchanged. Names are
    stored lower-cased because ``open_checkpointer`` lower-cases the scheme
    before looking it up; a mixed-case registration would otherwise never
    be found.
    """

    def decorator(builder: CheckpointerBuilder) -> CheckpointerBuilder:
        for scheme in schemes:
            # Normalise so registration and lookup agree on the key.
            _REGISTRY[scheme.lower()] = builder
        return builder

    return decorator

Streaming

streaming

Streaming layer: SSE encoding, retry, graph dispatch.

Streamer

Streamer(
    graph: Any,
    *,
    agent_nodes: frozenset[str] = frozenset(),
    status_field: str | None = None,
)

Dispatch graph streams across the supported LangGraph stream modes.

Capture the graph and the policy for incremental message streaming.

Source code in src/skeino/streaming/runner.py
def __init__(
    self,
    graph: Any,
    *,
    agent_nodes: frozenset[str] = frozenset(),
    status_field: str | None = None,
) -> None:
    """Store the graph and the incremental-streaming policy.

    ``agent_nodes`` and ``status_field`` are kept as given. The output-key
    set is computed once here via ``_resolve_output_keys`` and cached.
    """
    self._graph, self._agent_nodes, self._status_field = (
        graph,
        agent_nodes,
        status_field,
    )
    # Computed after the attributes above are assigned.
    # NOTE(review): presumably the helper inspects self._graph — confirm.
    self._output_keys: frozenset[str] | None = self._resolve_output_keys()

stream async

stream(
    runnable_input: Any,
    config: dict[str, Any],
    request: RunCreateRequest,
    stream_modes: list[str],
) -> AsyncIterator[tuple[str, dict[str, JsonValue]]]

Yield (event_name, payload) tuples for one streaming run.

Source code in src/skeino/streaming/runner.py
async def stream(
    self,
    runnable_input: Any,
    config: dict[str, Any],
    request: RunCreateRequest,
    stream_modes: list[str],
) -> AsyncIterator[tuple[str, dict[str, JsonValue]]]:
    """Yield ``(event_name, payload)`` tuples for one streaming run.

    Three dispatch paths are tried in order:

    1. ``stream_modes == ["events"]``: forward the graph's
       ``astream_events`` (``version="v2"``) output as ``"events"`` payloads.
    2. Incremental values: when ``agent_nodes`` was configured, ``"values"``
       is requested and every requested mode is in ``_ACCUMULATOR_MODES``,
       delegate to :func:`stream_incremental_values`.
    3. Generic: forward ``graph.astream`` chunks for the requested modes.

    ``"values"`` payloads that are dicts are passed through
    ``_filter_values`` on paths 2 and 3. On path 3, payloads that do not
    serialize to a ``dict`` are dropped.
    """
    if stream_modes == ["events"]:
        # NOTE(review): unlike the two ``astream`` paths below, this call does
        # not forward ``context`` or ``subgraphs`` — confirm that is intended.
        async for event in self._graph.astream_events(
            runnable_input,
            config,
            version="v2",
            interrupt_before=request.interrupt_before,
            interrupt_after=request.interrupt_after,
            durability=request.durability,
        ):
            yield "events", serialize_mapping(event)
        return

    # Token-by-token streaming. When the consumer opts in (agent_nodes set)
    # and the client wants "values", synthesize incremental "values" events
    # from the message stream so UIs can reveal text as the model speaks.
    # Real langgraph-sdk clients request ["values", "messages-tuple",
    # "custom"], so match on membership, not equality — an exact ["values"]
    # check never fires for them. The accumulator *covers* exactly those
    # modes (values incrementally; messages-tuple becomes redundant; custom
    # is forwarded), so only engage when every requested mode is one it
    # serves — a request for "updates" (or anything else) still needs the
    # generic path. "events" is exclusive (validated upstream), handled above.
    if (
        self._agent_nodes
        and "values" in stream_modes
        and set(stream_modes) <= _ACCUMULATOR_MODES
    ):
        async for event_name, payload in stream_incremental_values(
            self._graph,
            runnable_input,
            config,
            request,
            agent_nodes=self._agent_nodes,
            status_field=self._status_field,
        ):
            if event_name == "values" and isinstance(payload, dict):
                payload = self._filter_values(payload)
            yield event_name, payload
        return

    # Always hand the graph a *list* of modes, even for a single mode. With a
    # list, LangGraph tags every chunk as ``(mode, data)`` — or
    # ``(namespace, mode, data)`` when subgraphs are streamed — so the tuple
    # length below identifies the shape unambiguously. With a bare string it
    # yields untagged data (or ``(namespace, data)``), and a lone "messages"
    # chunk, itself a 2-tuple, or a subgraph namespace would be mistaken for
    # the event name.
    async for chunk in self._graph.astream(
        runnable_input,
        config,
        context=normalize_input_payload(request.context),
        stream_mode=list(stream_modes),
        interrupt_before=request.interrupt_before,
        interrupt_after=request.interrupt_after,
        durability=request.durability,
        subgraphs=request.stream_subgraphs,
    ):
        if isinstance(chunk, tuple) and len(chunk) == 3:
            # (namespace, mode, data): the namespace is not forwarded.
            event_name = str(chunk[1])
            event_payload = serialize_value(chunk[2])
        elif isinstance(chunk, tuple) and len(chunk) == 2:
            # (mode, data)
            event_name = str(chunk[0])
            event_payload = serialize_value(chunk[1])
        else:
            # Defensive fallback for a graph object that yields untagged
            # data: attribute it to the first requested mode.
            event_name = stream_modes[0]
            event_payload = serialize_value(chunk)
        if isinstance(event_payload, dict):
            if event_name == "values":
                event_payload = self._filter_values(event_payload)
            yield event_name, event_payload

stream_incremental_values async

stream_incremental_values(
    graph: Any,
    runnable_input: Any,
    config: dict[str, Any],
    request: RunCreateRequest,
    *,
    agent_nodes: frozenset[str] = frozenset(),
    status_field: str | None = None,
) -> AsyncIterator[tuple[str, dict[str, JsonValue]]]

Yield values (and optional status) events as the graph streams; custom events emitted by the graph are forwarded as well.

Source code in src/skeino/streaming/incremental.py
async def stream_incremental_values(
    graph: Any,
    runnable_input: Any,
    config: dict[str, Any],
    request: RunCreateRequest,
    *,
    agent_nodes: frozenset[str] = frozenset(),
    status_field: str | None = None,
) -> AsyncIterator[tuple[str, dict[str, JsonValue]]]:
    """Yield ``values`` (and optional ``status``) events as the graph streams.

    The graph is always driven with ``stream_mode=["messages", "values",
    "custom"]`` and each chunk is handled by its mode:

    * ``custom`` — serialized and forwarded as a ``"custom"`` event when the
      result is a dict; otherwise dropped.
    * ``values`` — the serialized state is yielded as a ``"values"`` event and
      its ``messages`` list becomes the base for later in-progress events.
      When ``status_field`` is set and that state entry is a list, each item
      not emitted before is yielded first as
      ``("status", {"message": str(item)})``.
    * ``messages`` — chunks from nodes listed in ``agent_nodes`` are
      accumulated into a synthetic in-progress ``ai`` message, and a
      ``"values"`` event carrying only ``{"messages": base + [in_progress]}``
      is yielded per non-empty chunk. Chunks from other nodes are ignored.

    ``request`` supplies ``context``, ``interrupt_before``,
    ``interrupt_after``, ``durability`` and ``stream_subgraphs`` for the
    ``astream`` call.
    """
    # Most recent serialized state snapshot received in "values" mode.
    last_values: dict[str, JsonValue] = {}
    # Node and synthetic message id of the message currently being streamed.
    streaming_node: str | None = None
    streaming_id: str | None = None
    # Text and tool calls collected so far for the in-progress message.
    accumulated_content = ""
    accumulated_tool_calls: list[dict[str, JsonValue]] = []
    # Messages from the latest snapshot; the in-progress message is appended
    # to this list when building incremental events.
    previous_messages: list[dict[str, JsonValue]] = []
    # Number of status items already emitted, so each one is sent only once.
    emitted_status_count: int = 0

    async for chunk in graph.astream(
        runnable_input,
        config,
        context=normalize_input_payload(request.context),
        stream_mode=["messages", "values", "custom"],
        interrupt_before=request.interrupt_before,
        interrupt_after=request.interrupt_after,
        durability=request.durability,
        subgraphs=request.stream_subgraphs,
    ):
        # 3-tuples carry the mode at index 1 and the data at index 2 (index 0
        # is presumably the subgraph namespace — it is not used here);
        # 2-tuples are (mode, data). Anything else is skipped.
        if isinstance(chunk, tuple) and len(chunk) == 3:
            event_name, payload = str(chunk[1]), chunk[2]
        elif isinstance(chunk, tuple) and len(chunk) == 2:
            event_name, payload = str(chunk[0]), chunk[1]
        else:
            continue

        if event_name == "custom":
            # Pass through graph-emitted custom (UI) events untouched so
            # generative-UI consumers keep working alongside token streaming.
            serialized = serialize_value(payload)
            if isinstance(serialized, dict):
                yield "custom", serialized
            continue

        if event_name == "values":
            last_values = serialize_mapping(payload)
            raw_messages = last_values.get("messages", [])
            # Copy the list so later concatenation never aliases the snapshot;
            # a non-list "messages" entry is treated as no messages.
            previous_messages = (
                list(raw_messages) if isinstance(raw_messages, list) else []
            )

            if status_field is not None:
                status_items = last_values.get(status_field) or []
                if isinstance(status_items, list):
                    # Emit only the tail that has not been sent yet.
                    new_items = status_items[emitted_status_count:]
                    for item in new_items:
                        yield "status", {"message": str(item)}
                    emitted_status_count = len(status_items)

            yield "values", last_values
            # A full snapshot supersedes any in-progress message, so the
            # accumulator starts over (including a fresh synthetic id).
            streaming_node = None
            streaming_id = None
            accumulated_content = ""
            accumulated_tool_calls = []
            continue

        if event_name == "messages":
            # The payload is unpacked as a (message chunk, metadata) pair.
            msg_chunk, meta = payload
            node = str(meta.get("langgraph_node", "")) if isinstance(meta, dict) else ""
            # Only nodes the caller opted in are streamed incrementally.
            if node not in agent_nodes:
                continue
            if node and node != streaming_node:
                # A different node started speaking: restart accumulation.
                # An already-assigned streaming_id is kept; it is only
                # cleared by a "values" snapshot.
                streaming_node = node
                streaming_id = streaming_id or f"ai_{uuid4().hex[:8]}"
                accumulated_content = ""
                accumulated_tool_calls = []
            # Missing or falsy content is normalized to an empty string.
            content = (
                getattr(msg_chunk, "content", None) or ""
                if hasattr(msg_chunk, "content")
                else ""
            )
            if isinstance(content, str):
                accumulated_content += content
            elif isinstance(content, list):
                # Block-list content: take the "text" of dict blocks and any
                # plain-string blocks; other block kinds are ignored.
                for block in content:
                    if isinstance(block, dict) and "text" in block:
                        accumulated_content += str(block["text"])
                    elif isinstance(block, str):
                        accumulated_content += block
            tool_calls = getattr(msg_chunk, "tool_calls", None) or []
            if tool_calls:
                # The latest chunk's tool calls replace (not extend) the
                # previous list. A tool call without an id gets a random one,
                # regenerated on every chunk that lacks it.
                accumulated_tool_calls = [
                    {
                        "id": str(tc.get("id", "") or uuid4().hex[:12]),
                        "name": str(tc.get("name", "")),
                        "args": serialize_value(tc.get("args", {})),
                    }
                    for tc in tool_calls
                    if isinstance(tc, dict)
                ]
            streaming_id = streaming_id or f"ai_{uuid4().hex[:8]}"
            # Nothing to show yet: skip emitting an empty in-progress message.
            if not accumulated_content and not accumulated_tool_calls:
                continue
            in_progress: dict[str, JsonValue] = {
                "id": streaming_id,
                "type": "ai",
                "content": accumulated_content,
            }
            if accumulated_tool_calls:
                in_progress["tool_calls"] = accumulated_tool_calls
            # The incremental event carries only the "messages" key; the other
            # keys of last_values are not repeated here.
            combined = previous_messages + [in_progress]
            yield "values", {"messages": combined}

is_retriable_stream_error

is_retriable_stream_error(exc: BaseException) -> bool

Return True for transient errors worth retrying during graph streaming.

Source code in src/skeino/streaming/sse.py
def is_retriable_stream_error(exc: BaseException) -> bool:
    """Decide whether ``exc`` looks like a transient failure worth retrying.

    The check is purely textual: the lower-cased ``str(exc)`` is searched for
    a fixed set of substrings associated with timeouts and dropped
    connections. The exception's type is not inspected.
    """
    text = str(exc).lower()
    transient_markers = (
        "timeout",
        "timed out",
        "ssl",
        "syscall",
        "connection",
        "could not receive data from server",
    )
    return any(marker in text for marker in transient_markers)

sse_event

sse_event(
    event: str, data: dict[str, JsonValue], event_id: int
) -> str

Format a server-sent event chunk.

Source code in src/skeino/streaming/sse.py
def sse_event(event: str, data: dict[str, JsonValue], event_id: int) -> str:
    """Render one server-sent event frame as text.

    The frame has three fields, in order ``id``, ``event`` and ``data`` (the
    payload encoded with ``_json_dumps``), one per line, followed by the
    blank line that terminates the event.
    """
    fields = (
        f"id: {event_id}",
        f"event: {event}",
        f"data: {_json_dumps(data)}",
    )
    return "\n".join(fields) + "\n\n"