
Checkpointers

Checkpointers persist agent state between runs. Oracle Autonomous Database is the production backend, offering native JSON columns, the versioned OracleCheckpointSaver (which matches the langgraph-oracledb shape), and pooled async access via python-oracledb.

For long-term memory (durable KV store, semantic recall), see Memory. This page covers the per-run state snapshot contract used by AgentConfig.checkpointer.

Contract

BaseCheckpointer

Bases: ABC

Abstract base class for checkpointer implementations.

Checkpointers handle saving and loading agent state, enabling features like:

  • Conversation persistence
  • Session recovery
  • Branching conversations
  • State inspection and debugging
  • Full-text search (backend-dependent)
  • Metadata queries (backend-dependent)

All methods are async to support various backends (file, database, network storage, etc.).

Use the capabilities property to check which features are available before calling extended methods.

Example

if checkpointer.capabilities.search:
    results = await checkpointer.search("error handling")
if checkpointer.capabilities.branching:
    await checkpointer.copy_thread("main", "experiment")

capabilities property

capabilities: CheckpointerCapabilities

Return the capabilities of this checkpointer.

Override in subclasses to advertise supported features.
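As a rough illustration of overriding the property, the sketch below uses stand-in classes rather than the real BaseCheckpointer and CheckpointerCapabilities. The names Capabilities, MinimalCheckpointer and BranchingCheckpointer are hypothetical and mirror only a few of the documented flags.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Capabilities:
    """Stand-in mirroring a few CheckpointerCapabilities flags (hypothetical)."""

    search: bool = False
    branching: bool = False
    vacuum: bool = False


class MinimalCheckpointer:
    """Stand-in base class: advertises no optional features by default."""

    @property
    def capabilities(self) -> Capabilities:
        return Capabilities()


class BranchingCheckpointer(MinimalCheckpointer):
    """Subclass that opts in to branching only."""

    @property
    def capabilities(self) -> Capabilities:
        return Capabilities(branching=True)


caps = BranchingCheckpointer().capabilities
print(caps.branching, caps.search)
```

Callers can then branch on the flags before invoking an extended method, as in the example above.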

save abstractmethod async

save(state: AgentState, thread_id: str, checkpoint_id: str | None = None, metadata: dict[str, Any] | None = None) -> str

Save agent state.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| state | AgentState | Current agent state to persist | required |
| thread_id | str | Unique identifier for the conversation thread | required |
| checkpoint_id | str \| None | Optional specific checkpoint ID. If not provided, a new ID will be generated. | None |
| metadata | dict[str, Any] \| None | Optional metadata for querying/filtering checkpoints | None |

Returns:

| Type | Description |
| --- | --- |
| str | Checkpoint ID that can be used to restore this state |

Source code in src/locus/memory/checkpointer.py
@abstractmethod
async def save(
    self,
    state: AgentState,
    thread_id: str,
    checkpoint_id: str | None = None,
    metadata: dict[str, Any] | None = None,
) -> str:
    """
    Save agent state.

    Args:
        state: Current agent state to persist
        thread_id: Unique identifier for the conversation thread
        checkpoint_id: Optional specific checkpoint ID. If not provided,
                      a new ID will be generated.
        metadata: Optional metadata for querying/filtering checkpoints

    Returns:
        Checkpoint ID that can be used to restore this state
    """
    ...

load abstractmethod async

load(thread_id: str, checkpoint_id: str | None = None) -> AgentState | None

Load agent state.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| thread_id | str | Thread identifier to load from | required |
| checkpoint_id | str \| None | Optional specific checkpoint ID. If not provided, loads the latest checkpoint. | None |

Returns:

| Type | Description |
| --- | --- |
| AgentState \| None | Restored AgentState or None if not found |

Source code in src/locus/memory/checkpointer.py
@abstractmethod
async def load(
    self,
    thread_id: str,
    checkpoint_id: str | None = None,
) -> AgentState | None:
    """
    Load agent state.

    Args:
        thread_id: Thread identifier to load from
        checkpoint_id: Optional specific checkpoint ID. If not provided,
                      loads the latest checkpoint.

    Returns:
        Restored AgentState or None if not found
    """
    ...

list_checkpoints abstractmethod async

list_checkpoints(thread_id: str, limit: int = 10) -> list[str]

List available checkpoints for a thread.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| thread_id | str | Thread identifier | required |
| limit | int | Maximum number of checkpoint IDs to return | 10 |

Returns:

| Type | Description |
| --- | --- |
| list[str] | List of checkpoint IDs, newest first |

Source code in src/locus/memory/checkpointer.py
@abstractmethod
async def list_checkpoints(
    self,
    thread_id: str,
    limit: int = 10,
) -> list[str]:
    """
    List available checkpoints for a thread.

    Args:
        thread_id: Thread identifier
        limit: Maximum number of checkpoint IDs to return

    Returns:
        List of checkpoint IDs, newest first
    """
    ...
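The three abstract methods above define the minimum a backend has to provide. A minimal in-memory sketch of that contract might look like the following. InMemoryCheckpointer is hypothetical, it stores plain dicts instead of AgentState, and it ignores metadata. It is meant only to illustrate the ordering and "latest if None" semantics described above.

```python
import asyncio
from uuid import uuid4


class InMemoryCheckpointer:
    """Illustrative stand-in; states are plain dicts rather than AgentState."""

    def __init__(self) -> None:
        # thread_id -> list of (checkpoint_id, state), oldest first
        self._threads: dict[str, list[tuple[str, dict]]] = {}

    async def save(self, state, thread_id, checkpoint_id=None, metadata=None) -> str:
        # metadata is accepted for signature parity but not stored in this sketch
        checkpoint_id = checkpoint_id or uuid4().hex
        self._threads.setdefault(thread_id, []).append((checkpoint_id, dict(state)))
        return checkpoint_id

    async def load(self, thread_id, checkpoint_id=None):
        entries = self._threads.get(thread_id, [])
        if not entries:
            return None
        if checkpoint_id is None:
            return entries[-1][1]  # latest checkpoint
        for cid, state in entries:
            if cid == checkpoint_id:
                return state
        return None

    async def list_checkpoints(self, thread_id, limit=10) -> list[str]:
        entries = self._threads.get(thread_id, [])
        return [cid for cid, _ in reversed(entries)][:limit]  # newest first


async def demo():
    cp = InMemoryCheckpointer()
    first = await cp.save({"step": 1}, "thread-a")
    second = await cp.save({"step": 2}, "thread-a")
    ids = await cp.list_checkpoints("thread-a")
    latest = await cp.load("thread-a")
    older = await cp.load("thread-a", first)
    return ids == [second, first], latest, older


print(asyncio.run(demo()))
```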

delete async

delete(thread_id: str, checkpoint_id: str | None = None) -> bool

Delete a checkpoint or all checkpoints for a thread.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| thread_id | str | Thread identifier | required |
| checkpoint_id | str \| None | Specific checkpoint to delete. If None, deletes all checkpoints for the thread. | None |

Returns:

| Type | Description |
| --- | --- |
| bool | True if deletion was successful |

Source code in src/locus/memory/checkpointer.py
async def delete(
    self,
    thread_id: str,
    checkpoint_id: str | None = None,
) -> bool:
    """
    Delete a checkpoint or all checkpoints for a thread.

    Args:
        thread_id: Thread identifier
        checkpoint_id: Specific checkpoint to delete. If None,
                      deletes all checkpoints for the thread.

    Returns:
        True if deletion was successful
    """
    raise NotImplementedError("delete not implemented for this backend")

exists async

exists(thread_id: str, checkpoint_id: str | None = None) -> bool

Check if a checkpoint exists.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| thread_id | str | Thread identifier | required |
| checkpoint_id | str \| None | Specific checkpoint to check. If None, checks if any checkpoint exists for the thread. | None |

Returns:

| Type | Description |
| --- | --- |
| bool | True if the checkpoint exists |

Source code in src/locus/memory/checkpointer.py
async def exists(
    self,
    thread_id: str,
    checkpoint_id: str | None = None,
) -> bool:
    """
    Check if a checkpoint exists.

    Args:
        thread_id: Thread identifier
        checkpoint_id: Specific checkpoint to check. If None,
                      checks if any checkpoint exists for the thread.

    Returns:
        True if the checkpoint exists
    """
    if checkpoint_id is None:
        checkpoints = await self.list_checkpoints(thread_id, limit=1)
        return len(checkpoints) > 0
    state = await self.load(thread_id, checkpoint_id)
    return state is not None

close async

close() -> None

Close any resources (connections, files, etc.).

Override in subclasses if cleanup is needed.

Source code in src/locus/memory/checkpointer.py
async def close(self) -> None:
    """
    Close any resources (connections, files, etc.).

    Override in subclasses if cleanup is needed.
    """

search async

search(query: str, limit: int = 10) -> list[dict[str, Any]]

Full-text search across checkpoints.

Requires: capabilities.search = True

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| query | str | Search query | required |
| limit | int | Maximum results | 10 |

Returns:

| Type | Description |
| --- | --- |
| list[dict[str, Any]] | List of matching checkpoints with scores |

Raises:

| Type | Description |
| --- | --- |
| NotImplementedError | If backend doesn't support search |

Source code in src/locus/memory/checkpointer.py
async def search(
    self,
    query: str,
    limit: int = 10,
) -> list[dict[str, Any]]:
    """
    Full-text search across checkpoints.

    Requires: capabilities.search = True

    Args:
        query: Search query
        limit: Maximum results

    Returns:
        List of matching checkpoints with scores

    Raises:
        NotImplementedError: If backend doesn't support search
    """
    self._require_capability("search")
    raise NotImplementedError("search not implemented")

query_by_metadata async

query_by_metadata(key: str, value: Any, limit: int = 100) -> list[dict[str, Any]]

Query checkpoints by metadata field.

Requires: capabilities.metadata_query = True

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| key | str | Metadata field name | required |
| value | Any | Value to match | required |
| limit | int | Maximum results | 100 |

Returns:

| Type | Description |
| --- | --- |
| list[dict[str, Any]] | List of matching checkpoints |

Source code in src/locus/memory/checkpointer.py
async def query_by_metadata(
    self,
    key: str,
    value: Any,
    limit: int = 100,
) -> list[dict[str, Any]]:
    """
    Query checkpoints by metadata field.

    Requires: capabilities.metadata_query = True

    Args:
        key: Metadata field name
        value: Value to match
        limit: Maximum results

    Returns:
        List of matching checkpoints
    """
    self._require_capability("metadata_query")
    raise NotImplementedError("query_by_metadata not implemented")

get_metadata async

get_metadata(thread_id: str, checkpoint_id: str | None = None) -> dict[str, Any] | None

Get checkpoint metadata.

Requires: capabilities.metadata_query = True

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| thread_id | str | Thread identifier | required |
| checkpoint_id | str \| None | Specific checkpoint (latest if None) | None |

Returns:

| Type | Description |
| --- | --- |
| dict[str, Any] \| None | Metadata dict or None if not found |

Source code in src/locus/memory/checkpointer.py
async def get_metadata(
    self,
    thread_id: str,
    checkpoint_id: str | None = None,
) -> dict[str, Any] | None:
    """
    Get checkpoint metadata.

    Requires: capabilities.metadata_query = True

    Args:
        thread_id: Thread identifier
        checkpoint_id: Specific checkpoint (latest if None)

    Returns:
        Metadata dict or None if not found
    """
    self._require_capability("metadata_query")
    raise NotImplementedError("get_metadata not implemented")

vacuum async

vacuum(older_than_days: int = 30) -> int

Delete old checkpoints.

Requires: capabilities.vacuum = True

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| older_than_days | int | Delete checkpoints older than this | 30 |

Returns:

| Type | Description |
| --- | --- |
| int | Number of deleted checkpoints |

Source code in src/locus/memory/checkpointer.py
async def vacuum(
    self,
    older_than_days: int = 30,
) -> int:
    """
    Delete old checkpoints.

    Requires: capabilities.vacuum = True

    Args:
        older_than_days: Delete checkpoints older than this

    Returns:
        Number of deleted checkpoints
    """
    self._require_capability("vacuum")
    raise NotImplementedError("vacuum not implemented")

copy_thread async

copy_thread(source_thread_id: str, dest_thread_id: str) -> bool

Copy a thread to create a branch.

Requires: capabilities.branching = True

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| source_thread_id | str | Source thread to copy from | required |
| dest_thread_id | str | Destination thread ID | required |

Returns:

| Type | Description |
| --- | --- |
| bool | True if successful |

Source code in src/locus/memory/checkpointer.py
async def copy_thread(
    self,
    source_thread_id: str,
    dest_thread_id: str,
) -> bool:
    """
    Copy a thread to create a branch.

    Requires: capabilities.branching = True

    Args:
        source_thread_id: Source thread to copy from
        dest_thread_id: Destination thread ID

    Returns:
        True if successful
    """
    self._require_capability("branching")
    raise NotImplementedError("copy_thread not implemented")

list_threads async

list_threads(limit: int = 100, pattern: str = '*') -> list[str]

List all thread IDs.

Requires: capabilities.list_threads = True

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| limit | int | Maximum threads to return | 100 |
| pattern | str | Pattern to filter threads (backend-specific) | '*' |

Returns:

| Type | Description |
| --- | --- |
| list[str] | List of thread IDs |

Source code in src/locus/memory/checkpointer.py
async def list_threads(
    self,
    limit: int = 100,
    pattern: str = "*",
) -> list[str]:
    """
    List all thread IDs.

    Requires: capabilities.list_threads = True

    Args:
        limit: Maximum threads to return
        pattern: Pattern to filter threads (backend-specific)

    Returns:
        List of thread IDs
    """
    self._require_capability("list_threads")
    raise NotImplementedError("list_threads not implemented")

list_with_metadata async

list_with_metadata(limit: int = 100) -> list[dict[str, Any]]

List checkpoints with their metadata.

Requires: capabilities.list_with_metadata = True

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| limit | int | Maximum results | 100 |

Returns:

| Type | Description |
| --- | --- |
| list[dict[str, Any]] | List of {thread_id, checkpoint_id, metadata, ...} dicts |

Source code in src/locus/memory/checkpointer.py
async def list_with_metadata(
    self,
    limit: int = 100,
) -> list[dict[str, Any]]:
    """
    List checkpoints with their metadata.

    Requires: capabilities.list_with_metadata = True

    Args:
        limit: Maximum results

    Returns:
        List of {thread_id, checkpoint_id, metadata, ...} dicts
    """
    self._require_capability("list_with_metadata")
    raise NotImplementedError("list_with_metadata not implemented")

CheckpointerCapabilities dataclass

CheckpointerCapabilities(search: bool = False, metadata_query: bool = False, vacuum: bool = False, branching: bool = False, ttl: bool = False, list_threads: bool = False, list_with_metadata: bool = False, persistent_checkpoint_ids: bool = False)

Capabilities supported by a checkpointer.

Use this to discover what features a checkpointer supports before calling optional methods.

Example

if checkpointer.capabilities.search:
    results = await checkpointer.search("error handling")
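The extended methods in the base class call self._require_capability(...) before doing any work, but the helper's body is not shown on this page. One plausible shape for such a guard, offered only as an assumption, is sketched below. The Capabilities and GuardedCheckpointer names are hypothetical, and NotImplementedError is used because that is the exception documented for search.

```python
from dataclasses import dataclass


@dataclass
class Capabilities:
    """Stand-in with two of the documented flags (hypothetical)."""

    search: bool = False
    vacuum: bool = False


class GuardedCheckpointer:
    """Stand-in backend that supports vacuum but not search."""

    capabilities = Capabilities(vacuum=True)

    def _require_capability(self, name: str) -> None:
        # Hypothetical body: the real helper's implementation is not shown here.
        if not getattr(self.capabilities, name, False):
            raise NotImplementedError(
                f"{type(self).__name__} does not support {name!r}"
            )


cp = GuardedCheckpointer()
cp._require_capability("vacuum")  # supported, so nothing is raised
try:
    cp._require_capability("search")
except NotImplementedError as exc:
    print(exc)
```

Checking the flag first, as in the example above, avoids relying on the exception for control flow.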

Oracle Database 26ai

Three flavours, depending on how you want state stored:

  • OracleBackend — async, JSON-column storage; the default full-checkpointer interface.
  • OracleCheckpointSaver — versioned (LangGraph-shape) saver, with explicit checkpoint versions and parent IDs for time-travel.
  • OracleSyncBackend / OracleSyncCheckpointSaver — sync companions for the langgraph-oracledb split, when you need to run inside a sync ORM session.

OracleBackend

OracleBackend(dsn: str | None = None, user: str = 'admin', password: str | SecretStr = '', wallet_location: str | None = None, wallet_password: str | SecretStr | None = None, host: str | None = None, port: int = 1521, service_name: str | None = None, **kwargs: Any)

Bases: BaseModel

Oracle Database checkpoint backend.

Production-grade persistent storage with JSON support and full-text search.

Features:

  • Connection pooling
  • JSON column storage with search
  • Metadata indexing
  • Vacuum (cleanup old checkpoints)
  • Works with Autonomous Database (wallet-based auth)

Example with DSN

backend = OracleBackend(
    dsn="mydb_high",  # TNS name from tnsnames.ora
    user="admin",
    password="secret",
    wallet_location="/path/to/wallet",
)
await backend.save(state, "thread_1")

Example with connection string

backend = OracleBackend(
    host="adb.us-ashburn-1.oraclecloud.com",
    port=1522,
    service_name="xxx_high.adb.oraclecloud.com",
    user="admin",
    password="secret",
)

Source code in src/locus/memory/backends/oracle.py
def __init__(
    self,
    dsn: str | None = None,
    user: str = "admin",
    password: str | SecretStr = "",
    wallet_location: str | None = None,
    wallet_password: str | SecretStr | None = None,
    host: str | None = None,
    port: int = 1521,
    service_name: str | None = None,
    **kwargs: Any,
) -> None:
    config = OracleConfig(
        dsn=dsn,
        user=user,
        password=SecretStr(password) if isinstance(password, str) else password,
        wallet_location=wallet_location,
        wallet_password=SecretStr(wallet_password)
        if isinstance(wallet_password, str)
        else wallet_password,
        host=host,
        port=port,
        service_name=service_name,
        **kwargs,
    )
    super().__init__(config=config)

save async

save(state: AgentState, thread_id: str, checkpoint_id: str | None = None, metadata: dict[str, Any] | None = None) -> str

Save agent state to Oracle Database.

Implements BaseCheckpointer.save. The parameter order is (state, thread_id, ...), matching the abstract method. The earlier (thread_id, data, ...) signature silently mismatched the agent runtime, which calls save(state, thread_id) and would therefore try to bind an AgentState to the VARCHAR2(255) thread_id column.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| state | AgentState | Current agent state. Serialized via AgentState.to_checkpoint (Pydantic JSON dump). | required |
| thread_id | str | Thread identifier (column primary key). | required |
| checkpoint_id | str \| None | Optional checkpoint ID. Generated if omitted. | None |
| metadata | dict[str, Any] \| None | Optional metadata for querying. | None |

Returns:

| Type | Description |
| --- | --- |
| str | Checkpoint ID. |
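The implementation listed in the source accepts both the BaseCheckpointer order (state, thread_id) and the older adapter order (key string, data dict), telling them apart by argument type. A stripped-down sketch of that detection, using a hypothetical helper name normalize_save_args and plain dicts in place of AgentState, could look like this:

```python
from typing import Any


def normalize_save_args(first: Any, second: Any) -> tuple[Any, Any]:
    """Return (payload, thread_id) regardless of which convention was used."""
    # Adapter order is (key: str, data: dict); swap it into contract order.
    if isinstance(first, str) and isinstance(second, dict):
        return second, first
    return first, second


print(normalize_save_args({"step": 1}, "thread-1"))
print(normalize_save_args("thread-1", {"step": 1}))
```

Both calls yield the payload first and the thread ID second. Type-based detection like this only works while the two conventions remain distinguishable, so new code is probably better off using the documented (state, thread_id) order.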

Source code in src/locus/memory/backends/oracle.py
async def save(
    self,
    state: AgentState,
    thread_id: str,
    checkpoint_id: str | None = None,
    metadata: dict[str, Any] | None = None,
) -> str:
    """
    Save agent state to Oracle Database.

    Implements :meth:`BaseCheckpointer.save`. Parameter order is
    ``(state, thread_id, ...)`` to match the abstract — the prior
    ``(thread_id, data, ...)`` signature silently mismatched the
    agent runtime, which calls ``save(state, thread_id)`` and would
    end up trying to bind an :class:`AgentState` to the
    ``VARCHAR2(255) thread_id`` column.

    Args:
        state: Current agent state. Serialized via
            :meth:`AgentState.to_checkpoint` (Pydantic JSON dump).
        thread_id: Thread identifier (column primary key).
        checkpoint_id: Optional checkpoint ID. Generated if omitted.
        metadata: Optional metadata for querying.

    Returns:
        Checkpoint ID.
    """
    from uuid import uuid4

    # Handle both calling conventions:
    #   1. BaseCheckpointer:        save(state, thread_id, ...)
    #   2. StorageBackendAdapter:   save(storage_key_str, data_dict, ...)
    # Detected by the type of the first arg.
    if isinstance(state, str) and isinstance(thread_id, dict):
        state, thread_id = thread_id, state  # swap to (data_dict, key_str)

    checkpoint_id = checkpoint_id or uuid4().hex
    data = state.to_checkpoint() if isinstance(state, AgentState) else state
    merge_sql = f"""
        MERGE INTO {self._full_table_name} t
        USING (SELECT :thread_id AS thread_id FROM dual) s
        ON (t.thread_id = s.thread_id)
        WHEN MATCHED THEN
            UPDATE SET
                checkpoint_id = :checkpoint_id,
                data = :data,
                updated_at = SYSTIMESTAMP,
                metadata = :metadata
        WHEN NOT MATCHED THEN
            INSERT (thread_id, checkpoint_id, data, metadata)
            VALUES (:thread_id, :checkpoint_id, :data, :metadata)
        """
    binds = {
        "thread_id": thread_id,
        "checkpoint_id": checkpoint_id,
        "data": json.dumps(data, default=str),
        "metadata": json.dumps(metadata or {}),
    }

    async def _do_save() -> None:
        await self._ensure_table()
        pool = await self._get_pool()
        async with safe_acquire(self, pool) as conn, conn.cursor() as cursor:
            # Stream large agent-state JSON through TEMPORARY CLOB
            # locators. The naive ``cursor.setinputsizes(data=CLOB)``
            # path inlines the payload in the TTC packet and corrupts
            # the protocol stream (ORA-03146 "invalid buffer length
            # for TTC field") once the JSON crosses a packet
            # boundary — easy to hit once an agent accumulates a few
            # turns + tool history + reflexion's confidence_history.
            # ``conn.createlob`` + ``await lob.write`` round-trips
            # the data via LOB locator instead, sidestepping the
            # inline buffer entirely.
            import oracledb as _oracledb

            data_lob = await conn.createlob(_oracledb.DB_TYPE_CLOB)
            await data_lob.write(binds["data"])
            meta_lob = await conn.createlob(_oracledb.DB_TYPE_CLOB)
            await meta_lob.write(binds["metadata"])
            await cursor.execute(
                merge_sql,
                {
                    "thread_id": binds["thread_id"],
                    "checkpoint_id": binds["checkpoint_id"],
                    "data": data_lob,
                    "metadata": meta_lob,
                },
            )
            await conn.commit()

    # Body-time retry: ADB intermittently kills pooled connections
    # mid-operation (ORA-03138 security policy, DPY-4011 idle close,
    # ORA-03146 TTC protocol desync). Drop the pool + retry up to
    # two times before surfacing the error — the first retry
    # rebuilds the pool, the second handles a fresh pool that
    # inherited a poisoned connection from the same broken loop.
    from locus._oracle_pool_cache import is_reconnectable

    max_attempts = 3
    for attempt in range(max_attempts):
        try:
            await _do_save()
            break
        except Exception as exc:
            if not is_reconnectable(exc) or attempt == max_attempts - 1:
                raise
            # Try to drain the dead pool politely; ignore errors.
            old_pool = self._pool
            self._pool = None
            self._initialized = False
            if old_pool is not None:
                try:
                    await old_pool.close(force=True)
                except Exception:  # noqa: BLE001
                    pass

    return checkpoint_id
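The retry loop at the end of save can be read as a general "reset and retry on reconnectable errors" pattern. The sketch below isolates that pattern under stated assumptions: TransientError, with_reconnect and the is_reconnectable stand-in defined here are hypothetical, and the real predicate in locus._oracle_pool_cache presumably inspects driver error codes instead.

```python
import asyncio


class TransientError(Exception):
    """Stand-in for a reconnectable driver error (hypothetical)."""


def is_reconnectable(exc: Exception) -> bool:
    # Assumption: the real predicate looks at Oracle/driver error codes.
    return isinstance(exc, TransientError)


async def with_reconnect(op, reset, max_attempts: int = 3):
    """Run op(); on a reconnectable error call reset() and try again."""
    for attempt in range(max_attempts):
        try:
            return await op()
        except Exception as exc:
            # Give up on non-reconnectable errors or on the final attempt.
            if not is_reconnectable(exc) or attempt == max_attempts - 1:
                raise
            await reset()


async def demo() -> tuple[str, int, int]:
    calls = {"op": 0, "reset": 0}

    async def flaky() -> str:
        calls["op"] += 1
        if calls["op"] < 3:
            raise TransientError("connection dropped")
        return "saved"

    async def reset() -> None:
        calls["reset"] += 1  # real code would discard and rebuild the pool

    result = await with_reconnect(flaky, reset)
    return result, calls["op"], calls["reset"]


print(asyncio.run(demo()))  # ('saved', 3, 2)
```

Factoring the loop out this way would also let load share it, since the listing for load repeats the same structure.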

load async

load(thread_id: str, checkpoint_id: str | None = None) -> dict | None

Load a saved payload from Oracle Database.

Returns the raw dict payload as written by save. The StorageBackendAdapter wrapper rehydrates this into an AgentState for the agent runtime; callers that hold a bare OracleBackend get the dict directly.

checkpoint_id is accepted for signature parity but ignored: this backend stores one row per thread_id (MERGE upsert), so the latest state is the only retrievable checkpoint.

Source code in src/locus/memory/backends/oracle.py
async def load(
    self,
    thread_id: str,
    checkpoint_id: str | None = None,
) -> dict | None:
    """Load a saved payload from Oracle Database.

    Returns the raw dict payload as written by :meth:`save`. The
    :class:`StorageBackendAdapter` wrapper rehydrates this into an
    :class:`AgentState` for the agent runtime; callers that hold a
    bare ``OracleBackend`` get the dict directly.

    ``checkpoint_id`` is accepted for signature parity but ignored —
    this backend stores one row per ``thread_id`` (MERGE upsert), so
    latest-state is the only retrievable checkpoint.
    """
    del checkpoint_id  # one-row-per-thread; arg present for parity

    async def _do_load() -> Any:
        await self._ensure_table()
        pool = await self._get_pool()
        async with safe_acquire(self, pool) as conn, conn.cursor() as cursor:
            await cursor.execute(
                f"SELECT data FROM {self._full_table_name} WHERE thread_id = :thread_id",
                {"thread_id": thread_id},
            )
            return await cursor.fetchone()

    from locus._oracle_pool_cache import is_reconnectable

    row = None
    max_attempts = 3
    for attempt in range(max_attempts):
        try:
            row = await _do_load()
            break
        except Exception as exc:
            if not is_reconnectable(exc) or attempt == max_attempts - 1:
                raise
            old_pool = self._pool
            self._pool = None
            self._initialized = False
            if old_pool is not None:
                try:
                    await old_pool.close(force=True)
                except Exception:  # noqa: BLE001
                    pass

    if row is None:
        return None

    # Handle CLOB - read if needed
    data = row[0]
    if hasattr(data, "read"):
        data = data.read()

    # oracledb might already return JSON as dict
    if isinstance(data, dict):
        return data
    loaded: dict = json.loads(data)
    return loaded
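The tail of load normalises three possible column shapes: a LOB-like object, a JSON string, or an already-decoded dict. That step can be sketched in isolation as follows. FakeLob and decode_payload are hypothetical names, and the sketch assumes a synchronous read() as in the listing. If the driver hands back an asynchronous LOB whose read() is a coroutine, the call would need to be awaited instead.

```python
import json
from typing import Any


class FakeLob:
    """Stand-in for a LOB-like object exposing a synchronous read()."""

    def __init__(self, text: str) -> None:
        self._text = text

    def read(self) -> str:
        return self._text


def decode_payload(value: Any) -> dict:
    """Normalise a fetched column value into a dict."""
    if hasattr(value, "read"):
        value = value.read()  # LOB-like object -> str
    if isinstance(value, dict):
        return value  # driver already decoded the JSON
    return json.loads(value)  # plain JSON text


print(decode_payload(FakeLob('{"step": 1}')))
print(decode_payload('{"step": 2}'))
print(decode_payload({"step": 3}))
```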

delete async

delete(thread_id: str) -> bool

Delete the checkpoint row for a thread from Oracle Database. Returns True if a row was deleted.

Source code in src/locus/memory/backends/oracle.py
async def delete(self, thread_id: str) -> bool:
    """Delete checkpoint from Oracle Database."""
    await self._ensure_table()
    pool = await self._get_pool()

    async with safe_acquire(self, pool) as conn, conn.cursor() as cursor:
        await cursor.execute(
            f"DELETE FROM {self._full_table_name} WHERE thread_id = :thread_id",
            {"thread_id": thread_id},
        )
        deleted: bool = cursor.rowcount > 0
        await conn.commit()

    return deleted

exists async

exists(thread_id: str) -> bool

Check if checkpoint exists.

Source code in src/locus/memory/backends/oracle.py
async def exists(self, thread_id: str) -> bool:
    """Check if checkpoint exists."""
    await self._ensure_table()
    pool = await self._get_pool()

    async with safe_acquire(self, pool) as conn, conn.cursor() as cursor:
        await cursor.execute(
            f"SELECT 1 FROM {self._full_table_name} WHERE thread_id = :thread_id",
            {"thread_id": thread_id},
        )
        row = await cursor.fetchone()

    return row is not None

list_threads async

list_threads(limit: int = 100, offset: int = 0, pattern: str = '%') -> list[str]

List all thread IDs matching pattern.

Source code in src/locus/memory/backends/oracle.py
async def list_threads(
    self,
    limit: int = 100,
    offset: int = 0,
    pattern: str = "%",
) -> list[str]:
    """List all thread IDs matching pattern."""
    await self._ensure_table()
    pool = await self._get_pool()

    async with safe_acquire(self, pool) as conn, conn.cursor() as cursor:
        await cursor.execute(
            f"""
                SELECT thread_id FROM {self._full_table_name}
                WHERE thread_id LIKE :pattern
                ORDER BY updated_at DESC
                OFFSET :offset ROWS FETCH NEXT :limit ROWS ONLY
                """,
            {"pattern": pattern, "limit": limit, "offset": offset},
        )
        rows = await cursor.fetchall()

    return [row[0] for row in rows]
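Note that the base contract defaults pattern to '*' while this backend defaults to '%' and passes the value straight to SQL LIKE. A caller that wants to keep using glob-style patterns could translate them first. The helper below, glob_to_like, is a hypothetical sketch and not part of the library. The escaped output is only meaningful if the query adds a matching ESCAPE clause, which the listing above does not do.

```python
def glob_to_like(pattern: str, escape: str = "\\") -> str:
    """Translate '*' and '?' globs into SQL LIKE wildcards, escaping literals."""
    out = []
    for ch in pattern:
        if ch == "*":
            out.append("%")  # any run of characters
        elif ch == "?":
            out.append("_")  # exactly one character
        elif ch in ("%", "_", escape):
            out.append(escape + ch)  # keep LIKE metacharacters literal
        else:
            out.append(ch)
    return "".join(out)


print(glob_to_like("*"))
print(glob_to_like("user-*"))
print(glob_to_like("job_?"))
```

With this helper, the base default '*' maps to the Oracle default '%', so the two defaults select the same set of threads.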

get_metadata async

get_metadata(thread_id: str) -> dict[str, Any] | None

Get checkpoint metadata.

Source code in src/locus/memory/backends/oracle.py
async def get_metadata(self, thread_id: str) -> dict[str, Any] | None:
    """Get checkpoint metadata."""
    await self._ensure_table()
    pool = await self._get_pool()

    async with safe_acquire(self, pool) as conn, conn.cursor() as cursor:
        await cursor.execute(
            f"""
                SELECT checkpoint_id, created_at, updated_at, metadata
                FROM {self._full_table_name}
                WHERE thread_id = :thread_id
                """,
            {"thread_id": thread_id},
        )
        row = await cursor.fetchone()

    if row is None:
        return None

    metadata = row[3]
    if hasattr(metadata, "read"):
        metadata = metadata.read()
    # oracledb might already return JSON as dict
    if isinstance(metadata, str):
        metadata = json.loads(metadata) if metadata else {}
    elif metadata is None:
        metadata = {}

    return {
        "checkpoint_id": row[0],
        "created_at": row[1].isoformat() if row[1] else None,
        "updated_at": row[2].isoformat() if row[2] else None,
        "metadata": metadata,
    }

query_by_metadata async

query_by_metadata(key: str, value: Any, limit: int = 100) -> list[dict[str, Any]]

Query checkpoints by metadata field.

Uses Oracle JSON path expressions.

Source code in src/locus/memory/backends/oracle.py
async def query_by_metadata(
    self,
    key: str,
    value: Any,
    limit: int = 100,
) -> list[dict[str, Any]]:
    """
    Query checkpoints by metadata field.

    Uses Oracle JSON path expressions.
    """
    await self._ensure_table()
    pool = await self._get_pool()

    async with safe_acquire(self, pool) as conn, conn.cursor() as cursor:
        # Validate key to prevent JSON path injection
        if not key.isidentifier():
            raise ValueError(f"Invalid metadata key: {key!r}")
        # Use JSON_VALUE for querying
        await cursor.execute(
            f"""
                SELECT thread_id, data, updated_at
                FROM {self._full_table_name}
                WHERE JSON_VALUE(metadata, '$.{key}') = :value
                ORDER BY updated_at DESC
                FETCH FIRST :limit ROWS ONLY
                """,
            {"value": str(value), "limit": limit},
        )
        rows = await cursor.fetchall()

    results = []
    for row in rows:
        data = row[1]
        if hasattr(data, "read"):
            data = data.read()
        # oracledb might already return JSON as dict
        if isinstance(data, str):
            data = json.loads(data)
        results.append(
            {
                "thread_id": row[0],
                "data": data,
                "updated_at": row[2].isoformat() if row[2] else None,
            }
        )

    return results
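Because the metadata key is interpolated into the JSON path rather than bound, the listing validates it with str.isidentifier() before building the query. The check can be exercised on its own, as in this sketch. The metadata_path helper is a hypothetical name; it reuses the same validation and error message as the source.

```python
def metadata_path(key: str) -> str:
    """Build a JSON path for a top-level metadata field, rejecting unsafe keys."""
    if not key.isidentifier():
        raise ValueError(f"Invalid metadata key: {key!r}")
    return f"$.{key}"


print(metadata_path("tenant"))
for bad in ("a.b", "x') OR ('1'='1", ""):
    try:
        metadata_path(bad)
    except ValueError as exc:
        print(exc)
```

The validation restricts lookups to top-level fields with identifier-like names. The value is bound as str(value), so non-string metadata values are compared by their string form and may need care.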

search async

search(query: str, limit: int = 10) -> list[dict[str, Any]]

Search checkpoints by content.

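Uses a case-insensitive LIKE match over the stored JSON text. As the source comments note, production deployments should use Oracle Text or a JSON search index instead.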

Source code in src/locus/memory/backends/oracle.py
async def search(
    self,
    query: str,
    limit: int = 10,
) -> list[dict[str, Any]]:
    """
    Search checkpoints by content.

    Uses a case-insensitive LIKE match over the stored JSON text.
    """
    await self._ensure_table()
    pool = await self._get_pool()

    async with safe_acquire(self, pool) as conn, conn.cursor() as cursor:
        # Simple LIKE search on JSON content
        # For production, use Oracle Text or JSON search index
        await cursor.execute(
            f"""
                SELECT thread_id, data, updated_at
                FROM {self._full_table_name}
                WHERE LOWER(data) LIKE LOWER(:query_pattern)
                ORDER BY updated_at DESC
                FETCH FIRST :limit ROWS ONLY
                """,
            {"query_pattern": f"%{query}%", "limit": limit},
        )
        rows = await cursor.fetchall()

    results = []
    for row in rows:
        data = row[1]
        if hasattr(data, "read"):
            data = data.read()
        # oracledb might already return JSON as dict
        if isinstance(data, str):
            data = json.loads(data)
        results.append(
            {
                "thread_id": row[0],
                "data": data,
                "updated_at": row[2].isoformat() if row[2] else None,
            }
        )

    return results

vacuum async

vacuum(older_than_days: int = 30) -> int

Delete old checkpoints.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| older_than_days | int | Delete checkpoints older than this | 30 |

Returns:

| Type | Description |
| --- | --- |
| int | Number of deleted rows |

Source code in src/locus/memory/backends/oracle.py
async def vacuum(self, older_than_days: int = 30) -> int:
    """
    Delete old checkpoints.

    Args:
        older_than_days: Delete checkpoints older than this

    Returns:
        Number of deleted rows
    """
    await self._ensure_table()
    pool = await self._get_pool()

    async with safe_acquire(self, pool) as conn, conn.cursor() as cursor:
        # Use NUMTODSINTERVAL for dynamic interval
        await cursor.execute(
            f"""
                DELETE FROM {self._full_table_name}
                WHERE updated_at < SYSTIMESTAMP - NUMTODSINTERVAL(:days, 'DAY')
                """,
            {"days": older_than_days},
        )
        deleted_count: int = cursor.rowcount
        await conn.commit()

    return deleted_count
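The DELETE above removes rows whose updated_at is strictly earlier than the current timestamp minus the given number of days. The same cutoff rule can be sketched in plain Python to make the boundary behaviour explicit. The stale_threads helper and the sample data are hypothetical.

```python
from datetime import datetime, timedelta, timezone


def stale_threads(
    updated_at: dict[str, datetime], older_than_days: int, now: datetime
) -> list[str]:
    """Return thread IDs whose last update is strictly before the cutoff."""
    cutoff = now - timedelta(days=older_than_days)
    return sorted(tid for tid, ts in updated_at.items() if ts < cutoff)


now = datetime(2025, 1, 31, tzinfo=timezone.utc)
rows = {
    "fresh": now - timedelta(days=1),
    "borderline": now - timedelta(days=30),  # exactly at the cutoff: kept
    "stale": now - timedelta(days=45),
}
print(stale_threads(rows, 30, now))  # ['stale']
```

Since this backend keeps one row per thread, a vacuum pass removes whole threads that have not been updated recently, not individual historical checkpoints.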

count async

count(pattern: str = '%') -> int

Count checkpoints matching pattern.

Source code in src/locus/memory/backends/oracle.py
async def count(self, pattern: str = "%") -> int:
    """Count checkpoints matching pattern."""
    await self._ensure_table()
    pool = await self._get_pool()

    async with safe_acquire(self, pool) as conn, conn.cursor() as cursor:
        await cursor.execute(
            f"SELECT COUNT(*) FROM {self._full_table_name} WHERE thread_id LIKE :pattern",
            {"pattern": pattern},
        )
        row = await cursor.fetchone()

    return row[0] if row else 0
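
Because the pattern is passed straight to LIKE, both `%` (any run of characters) and `_` (exactly one character) act as wildcards. The sketch below emulates those two rules in plain Python to show which thread IDs a pattern would select. `like_match` is a hypothetical helper for illustration only, not part of the backend, and it ignores ESCAPE clauses because the query above does not use one.

```python
import re


def like_match(value: str, pattern: str) -> bool:
    """Emulate SQL LIKE: '%' is any run of characters, '_' is one character."""
    parts = []
    for ch in pattern:
        if ch == "%":
            parts.append(".*")
        elif ch == "_":
            parts.append(".")
        else:
            parts.append(re.escape(ch))
    return re.fullmatch("".join(parts), value, re.DOTALL) is not None


thread_ids = ["user_1", "userX1", "admin_1"]
# The underscore in "user_%" is itself a wildcard, so "userX1" matches too.
matched = [t for t in thread_ids if like_match(t, "user_%")]
```

Thread IDs that contain underscores are therefore matched more loosely than a literal prefix would suggest.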

close async

close() -> None

Close the connection pool and reset provisioning state.

force=True is the right choice here — by the time we close the pool we are either tearing down at end of run_sync or recovering from a network-level error; waiting for in-flight operations to drain on a possibly-dead loop would just hang. Cleanup must never raise; an exception here would mask the primary error path callers actually care about.

Source code in src/locus/memory/backends/oracle.py
async def close(self) -> None:
    """Close the connection pool and reset provisioning state.

    ``force=True`` is the right choice here — by the time we close
    the pool we are either tearing down at end of ``run_sync`` or
    recovering from a network-level error; waiting for in-flight
    operations to drain on a possibly-dead loop would just hang.
    Cleanup must never raise; an exception here would mask the
    primary error path callers actually care about.
    """
    if self._pool is not None:
        try:
            await self._pool.close(force=True)
        except Exception:  # noqa: BLE001 — best-effort teardown
            pass
        self._pool = None
    # Reset so the next acquire re-verifies table existence on the
    # fresh pool / connection.
    self._initialized = False
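
The best-effort teardown described above can be exercised in isolation. This is a minimal sketch, assuming a hypothetical `FlakyPool` whose `close` always fails and a hypothetical `Holder` that mirrors only the `_pool` and `_initialized` attributes; neither class exists in locus.

```python
import asyncio


class FlakyPool:
    """Hypothetical stand-in for a pool whose close() always fails."""

    async def close(self, force: bool = False) -> None:
        raise ConnectionError("network went away")


class Holder:
    """Hypothetical owner mirroring the _pool / _initialized attributes."""

    def __init__(self) -> None:
        self._pool = FlakyPool()
        self._initialized = True

    async def close(self) -> None:
        if self._pool is not None:
            try:
                await self._pool.close(force=True)
            except Exception:
                pass  # cleanup must never raise
            self._pool = None
        # Force re-verification on the next acquire.
        self._initialized = False


holder = Holder()
asyncio.run(holder.close())  # swallows the ConnectionError
asyncio.run(holder.close())  # second call is a no-op
```

Swallowing the exception keeps whatever error triggered the teardown visible to the caller.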

OracleCheckpointSaver

OracleCheckpointSaver(dsn: str | None = None, user: str = 'admin', password: str | SecretStr = '', wallet_location: str | None = None, wallet_password: str | SecretStr | None = None, host: str | None = None, port: int = 1521, service_name: str | None = None, table_name: str = 'locus', schema_name: str | None = None, min_pool_size: int = 1, max_pool_size: int = 5, auto_create_table: bool = True, **kwargs: Any)

Bases: BaseModel

Versioned Oracle checkpoint saver with pending-writes durability.

locus-native — does not inherit from langgraph.checkpoint.base.BaseCheckpointSaver. The method surface matches the LangGraph shape (put / get / list_checkpoints / put_writes / get_writes / delete_thread) so adapter layers can wire it into LangGraph runtimes without dragging a langchain dependency into locus.

Example with TNS alias:

>>> saver = OracleCheckpointSaver(
...     dsn="mydb_low",
...     user="admin",
...     password="secret",
...     wallet_location="/path/to/wallet",
...     table_name="locus",
... )
>>> await saver.put(
...     thread_id="t1",
...     checkpoint_id="c1",
...     checkpoint_data={"step": 0, "values": {"x": 1}},
... )
>>> latest = await saver.get(thread_id="t1")

Example with pending writes (intra-step durability):

>>> await saver.put_writes(
...     thread_id="t1",
...     checkpoint_id="c1",
...     task_id="node-a",
...     writes=[("x", 2), ("y", 3)],
... )
Source code in src/locus/memory/backends/oracle_versioned.py
def __init__(
    self,
    dsn: str | None = None,
    user: str = "admin",
    password: str | SecretStr = "",
    wallet_location: str | None = None,
    wallet_password: str | SecretStr | None = None,
    host: str | None = None,
    port: int = 1521,
    service_name: str | None = None,
    table_name: str = "locus",
    schema_name: str | None = None,
    min_pool_size: int = 1,
    max_pool_size: int = 5,
    auto_create_table: bool = True,
    **kwargs: Any,
) -> None:
    # The two physical table names are derived from ``table_name``
    # so callers configure one base prefix. Validate the prefix here
    # (re-validate the derived suffixed name too) before letting
    # OracleConfig do its own validation.
    _validate_sql_identifier(table_name, "table_name")
    _validate_sql_identifier(f"{table_name}_checkpoints", "table_name")
    _validate_sql_identifier(f"{table_name}_writes", "table_name")
    if schema_name is not None:
        _validate_sql_identifier(schema_name, "schema_name")

    config = OracleConfig(
        dsn=dsn,
        user=user,
        password=SecretStr(password) if isinstance(password, str) else password,
        wallet_location=wallet_location,
        wallet_password=(
            SecretStr(wallet_password) if isinstance(wallet_password, str) else wallet_password
        ),
        host=host,
        port=port,
        service_name=service_name,
        # OracleConfig validates table_name itself — we pass the
        # checkpoint-table form so its identifier check covers the
        # actual DDL target.
        table_name=f"{table_name}_checkpoints",
        schema_name=schema_name,
        min_pool_size=min_pool_size,
        max_pool_size=max_pool_size,
        **kwargs,
    )
    super().__init__(config=config, auto_create_table=auto_create_table)
    # Stash the base prefix so derived names (_writes table) can
    # share it without re-deriving from the suffixed form.
    self.__dict__["_table_prefix"] = table_name
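
The constructor derives both physical table names from a single prefix and validates each form. The sketch below reproduces that derivation. The identifier rule is an assumption (letter first, then letters, digits, or underscores); the real `_validate_sql_identifier` may be stricter or looser, and `derive_table_names` is a hypothetical helper.

```python
import re

# Assumed identifier rule; the library's own check may differ.
_IDENT = re.compile(r"^[A-Za-z][A-Za-z0-9_]*$")


def derive_table_names(prefix: str) -> tuple:
    """Return (checkpoints_table, writes_table) for a base prefix."""
    names = (f"{prefix}_checkpoints", f"{prefix}_writes")
    # Validate the prefix and both derived names before any DDL uses them.
    for name in (prefix, *names):
        if not _IDENT.match(name):
            raise ValueError(f"invalid SQL identifier: {name!r}")
    return names


tables = derive_table_names("locus")
```

Validating before the names are interpolated into SQL matters because table names cannot be passed as bind variables.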

put async

put(*, thread_id: str, checkpoint_id: str, checkpoint_data: dict, checkpoint_ns: str = 'default', parent_checkpoint_id: str | None = None, metadata: dict | None = None) -> None

Persist one checkpoint row.

History is preserved: (thread_id, checkpoint_ns, checkpoint_id) is the primary key, so re-saving a different checkpoint_id for the same thread inserts a new row rather than overwriting.

Source code in src/locus/memory/backends/oracle_versioned.py
async def put(
    self,
    *,
    thread_id: str,
    checkpoint_id: str,
    checkpoint_data: dict,
    checkpoint_ns: str = "default",
    parent_checkpoint_id: str | None = None,
    metadata: dict | None = None,
) -> None:
    """Persist one checkpoint row.

    History is preserved: ``(thread_id, checkpoint_ns,
    checkpoint_id)`` is the primary key, so re-saving a different
    ``checkpoint_id`` for the same thread inserts a new row rather
    than overwriting.
    """
    await self._ensure_tables()
    pool = await self._get_pool()

    async with safe_acquire(self, pool) as conn, conn.cursor() as cursor:
        self._pin_checkpoint_clobs(cursor)
        await cursor.execute(
            f"""
            INSERT INTO {self._checkpoints_table} (
                thread_id,
                checkpoint_ns,
                checkpoint_id,
                parent_checkpoint_id,
                checkpoint_data,
                metadata
            ) VALUES (
                :thread_id,
                :checkpoint_ns,
                :checkpoint_id,
                :parent_checkpoint_id,
                :checkpoint_data,
                :metadata
            )
            """,
            {
                "thread_id": thread_id,
                "checkpoint_ns": checkpoint_ns,
                "checkpoint_id": checkpoint_id,
                "parent_checkpoint_id": parent_checkpoint_id,
                "checkpoint_data": json.dumps(checkpoint_data, default=str),
                "metadata": json.dumps(metadata or {}, default=str),
            },
        )
        await conn.commit()
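
The history-preserving behavior follows from the composite primary key. The sketch below shows the same effect with the standard-library `sqlite3` module standing in for Oracle. The table layout is a simplified assumption based on the docstring, not the real DDL.

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE checkpoints (
        thread_id TEXT NOT NULL,
        checkpoint_ns TEXT NOT NULL,
        checkpoint_id TEXT NOT NULL,
        checkpoint_data TEXT,
        PRIMARY KEY (thread_id, checkpoint_ns, checkpoint_id)
    )
    """
)


def put(thread_id, checkpoint_id, data, checkpoint_ns="default"):
    """Plain INSERT, mirroring the statement shape used by put()."""
    conn.execute(
        "INSERT INTO checkpoints VALUES (?, ?, ?, ?)",
        (thread_id, checkpoint_ns, checkpoint_id, json.dumps(data, default=str)),
    )


put("t1", "c1", {"step": 0})
put("t1", "c2", {"step": 1})  # new checkpoint_id: a second row

try:
    put("t1", "c1", {"step": 99})  # same key: rejected, not overwritten
    duplicate_rejected = False
except sqlite3.IntegrityError:
    duplicate_rejected = True

row_count = conn.execute("SELECT COUNT(*) FROM checkpoints").fetchone()[0]
```

Since `put` issues a plain INSERT, re-using an existing `checkpoint_id` would presumably surface as a constraint error on Oracle as well, so callers should generate a fresh ID per checkpoint.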

get async

get(*, thread_id: str, checkpoint_id: str | None = None, checkpoint_ns: str = 'default') -> dict | None

Fetch one checkpoint.

checkpoint_id=None returns the most recent row for the (thread_id, checkpoint_ns) pair, ordered by created_at DESC with FETCH FIRST 1 ROWS ONLY.

Returns None when no matching row exists; otherwise a dict with keys checkpoint_id, parent_checkpoint_id, checkpoint, metadata, created_at.

Source code in src/locus/memory/backends/oracle_versioned.py
async def get(
    self,
    *,
    thread_id: str,
    checkpoint_id: str | None = None,
    checkpoint_ns: str = "default",
) -> dict | None:
    """Fetch one checkpoint.

    ``checkpoint_id=None`` returns the most recent row for the
    ``(thread_id, checkpoint_ns)`` pair, ordered by ``created_at
    DESC`` with ``FETCH FIRST 1 ROWS ONLY``.

    Returns ``None`` when no matching row exists; otherwise a dict
    with keys ``checkpoint_id``, ``parent_checkpoint_id``,
    ``checkpoint``, ``metadata``, ``created_at``.
    """
    await self._ensure_tables()
    pool = await self._get_pool()

    async with safe_acquire(self, pool) as conn, conn.cursor() as cursor:
        if checkpoint_id is None:
            # Latest row for the thread.
            await cursor.execute(
                f"""
                SELECT
                    checkpoint_id,
                    parent_checkpoint_id,
                    checkpoint_data,
                    metadata,
                    created_at
                FROM {self._checkpoints_table}
                WHERE thread_id = :thread_id
                  AND checkpoint_ns = :checkpoint_ns
                ORDER BY created_at DESC
                FETCH FIRST 1 ROWS ONLY
                """,
                {"thread_id": thread_id, "checkpoint_ns": checkpoint_ns},
            )
        else:
            await cursor.execute(
                f"""
                SELECT
                    checkpoint_id,
                    parent_checkpoint_id,
                    checkpoint_data,
                    metadata,
                    created_at
                FROM {self._checkpoints_table}
                WHERE thread_id = :thread_id
                  AND checkpoint_ns = :checkpoint_ns
                  AND checkpoint_id = :checkpoint_id
                """,
                {
                    "thread_id": thread_id,
                    "checkpoint_ns": checkpoint_ns,
                    "checkpoint_id": checkpoint_id,
                },
            )
        row = await cursor.fetchone()

    if row is None:
        return None

    return {
        "checkpoint_id": row[0],
        "parent_checkpoint_id": row[1],
        "checkpoint": await self._decode_clob(row[2]),
        "metadata": await self._decode_clob(row[3]) or {},
        "created_at": (row[4].isoformat() if isinstance(row[4], datetime) else row[4]),
    }
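
Since each returned dict carries `parent_checkpoint_id`, a caller can walk a checkpoint's ancestry with repeated `get` calls. The following is a sketch under stated assumptions: `InMemorySaver` is a hypothetical stand-in that only mimics the keyword signature of `get`, and `lineage` is not a library function.

```python
import asyncio
from typing import Optional


class InMemorySaver:
    """Hypothetical stand-in exposing the same get() keyword signature."""

    def __init__(self, rows):
        self._rows = {row["checkpoint_id"]: row for row in rows}

    async def get(self, *, thread_id: str, checkpoint_id: Optional[str] = None,
                  checkpoint_ns: str = "default") -> Optional[dict]:
        return self._rows.get(checkpoint_id)


async def lineage(saver, thread_id: str, checkpoint_id: str) -> list:
    """Follow parent_checkpoint_id links from a checkpoint back to the root."""
    chain, seen = [], set()
    current = checkpoint_id
    while current is not None and current not in seen:  # guard against cycles
        seen.add(current)
        row = await saver.get(thread_id=thread_id, checkpoint_id=current)
        if row is None:
            break
        chain.append(row["checkpoint_id"])
        current = row["parent_checkpoint_id"]
    return chain


saver = InMemorySaver([
    {"checkpoint_id": "c1", "parent_checkpoint_id": None},
    {"checkpoint_id": "c2", "parent_checkpoint_id": "c1"},
    {"checkpoint_id": "c3", "parent_checkpoint_id": "c2"},
])
chain = asyncio.run(lineage(saver, "t1", "c3"))
```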

list_checkpoints async

list_checkpoints(*, thread_id: str, checkpoint_ns: str = 'default', limit: int = 10, before: str | None = None) -> list[dict]

Return checkpoints for a thread, newest first.

before is a checkpoint_id; when supplied, only rows strictly older than that row's created_at are returned — the typical "page back through history" pattern LangGraph's alist exposes.

Source code in src/locus/memory/backends/oracle_versioned.py
async def list_checkpoints(
    self,
    *,
    thread_id: str,
    checkpoint_ns: str = "default",
    limit: int = 10,
    before: str | None = None,
) -> list[dict]:
    """Return checkpoints for a thread, newest first.

    ``before`` is a ``checkpoint_id``; when supplied, only rows
    strictly older than that row's ``created_at`` are returned —
    the typical "page back through history" pattern LangGraph's
    ``alist`` exposes.
    """
    await self._ensure_tables()
    pool = await self._get_pool()

    async with safe_acquire(self, pool) as conn, conn.cursor() as cursor:
        if before is None:
            await cursor.execute(
                f"""
                SELECT
                    checkpoint_id,
                    parent_checkpoint_id,
                    checkpoint_data,
                    metadata,
                    created_at
                FROM {self._checkpoints_table}
                WHERE thread_id = :thread_id
                  AND checkpoint_ns = :checkpoint_ns
                ORDER BY created_at DESC
                FETCH FIRST :lim ROWS ONLY
                """,
                {
                    "thread_id": thread_id,
                    "checkpoint_ns": checkpoint_ns,
                    "lim": limit,
                },
            )
        else:
            await cursor.execute(
                f"""
                SELECT
                    checkpoint_id,
                    parent_checkpoint_id,
                    checkpoint_data,
                    metadata,
                    created_at
                FROM {self._checkpoints_table}
                WHERE thread_id = :thread_id
                  AND checkpoint_ns = :checkpoint_ns
                  AND created_at < (
                      SELECT created_at FROM {self._checkpoints_table}
                      WHERE thread_id = :thread_id
                        AND checkpoint_ns = :checkpoint_ns
                        AND checkpoint_id = :before
                  )
                ORDER BY created_at DESC
                FETCH FIRST :lim ROWS ONLY
                """,
                {
                    "thread_id": thread_id,
                    "checkpoint_ns": checkpoint_ns,
                    "before": before,
                    "lim": limit,
                },
            )
        rows = await cursor.fetchall()

    results: list[dict] = []
    for row in rows:
        results.append(
            {
                "checkpoint_id": row[0],
                "parent_checkpoint_id": row[1],
                "checkpoint": await self._decode_clob(row[2]),
                "metadata": await self._decode_clob(row[3]) or {},
                "created_at": (row[4].isoformat() if isinstance(row[4], datetime) else row[4]),
            }
        )
    return results
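
Paging back through history means feeding the last `checkpoint_id` of each page into the next call as `before`. A minimal sketch follows, assuming a hypothetical `InMemorySaver` that mimics the `list_checkpoints` signature; `all_checkpoint_ids` is an illustrative helper, not part of locus.

```python
import asyncio
from typing import Optional


class InMemorySaver:
    """Hypothetical stand-in mirroring the list_checkpoints() signature."""

    def __init__(self, ids_newest_first):
        self._ids = list(ids_newest_first)

    async def list_checkpoints(self, *, thread_id: str, checkpoint_ns: str = "default",
                               limit: int = 10, before: Optional[str] = None):
        start = 0 if before is None else self._ids.index(before) + 1
        return [{"checkpoint_id": cid} for cid in self._ids[start:start + limit]]


async def all_checkpoint_ids(saver, thread_id: str, page_size: int = 2) -> list:
    """Collect every checkpoint ID, newest first, one page at a time."""
    ids, before = [], None
    while True:
        page = await saver.list_checkpoints(
            thread_id=thread_id, limit=page_size, before=before
        )
        if not page:
            return ids
        ids.extend(row["checkpoint_id"] for row in page)
        before = page[-1]["checkpoint_id"]  # oldest row seen so far


saver = InMemorySaver(["c5", "c4", "c3", "c2", "c1"])
history = asyncio.run(all_checkpoint_ids(saver, "t1"))
```

Because the real query compares `created_at` with a strict `<`, two rows sharing an identical timestamp could fall on either side of a page boundary and one of them may be skipped.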

put_writes async

put_writes(*, thread_id: str, checkpoint_id: str, task_id: str, writes: list[tuple[str, Any]], checkpoint_ns: str = 'default') -> None

Persist pending writes for one (checkpoint, task) pair.

Idempotent: existing writes for the same (thread_id, checkpoint_ns, checkpoint_id, task_id) key are deleted first, then the new set is inserted with monotonic idx 0..N-1. Retries are safe.

Source code in src/locus/memory/backends/oracle_versioned.py
async def put_writes(
    self,
    *,
    thread_id: str,
    checkpoint_id: str,
    task_id: str,
    writes: list[tuple[str, Any]],
    checkpoint_ns: str = "default",
) -> None:
    """Persist pending writes for one ``(checkpoint, task)`` pair.

    Idempotent: existing writes for the same
    ``(thread_id, checkpoint_ns, checkpoint_id, task_id)`` key
    are deleted first, then the new set is inserted with monotonic
    ``idx`` 0..N-1. Retries are safe.
    """
    await self._ensure_tables()
    pool = await self._get_pool()

    async with safe_acquire(self, pool) as conn, conn.cursor() as cursor:
        # Delete any prior writes for this task. ``put_writes`` is a
        # replace, not an append.
        await cursor.execute(
            f"""
            DELETE FROM {self._writes_table}
            WHERE thread_id = :thread_id
              AND checkpoint_ns = :checkpoint_ns
              AND checkpoint_id = :checkpoint_id
              AND task_id = :task_id
            """,
            {
                "thread_id": thread_id,
                "checkpoint_ns": checkpoint_ns,
                "checkpoint_id": checkpoint_id,
                "task_id": task_id,
            },
        )

        for idx, (channel, value) in enumerate(writes):
            self._pin_value_clob(cursor)
            await cursor.execute(
                f"""
                INSERT INTO {self._writes_table} (
                    thread_id,
                    checkpoint_ns,
                    checkpoint_id,
                    task_id,
                    idx,
                    channel,
                    value
                ) VALUES (
                    :thread_id,
                    :checkpoint_ns,
                    :checkpoint_id,
                    :task_id,
                    :idx,
                    :channel,
                    :value
                )
                """,
                {
                    "thread_id": thread_id,
                    "checkpoint_ns": checkpoint_ns,
                    "checkpoint_id": checkpoint_id,
                    "task_id": task_id,
                    "idx": idx,
                    "channel": channel,
                    "value": json.dumps(value, default=str),
                },
            )

        await conn.commit()
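
The delete-then-insert sequence is what makes retries safe. The sketch below models it with a plain dict in place of the writes table. `replace_writes` and the dict layout are hypothetical and exist only to illustrate the replace semantics.

```python
import json


def replace_writes(table: dict, key: tuple, writes: list) -> None:
    """Drop prior rows for `key`, then store the new set with idx 0..N-1."""
    table.pop(key, None)
    table[key] = [
        {"idx": idx, "channel": channel, "value": json.dumps(value, default=str)}
        for idx, (channel, value) in enumerate(writes)
    ]


table = {}
key = ("t1", "default", "c1", "node-a")

replace_writes(table, key, [("x", 2), ("y", 3)])
first_attempt = list(table[key])

replace_writes(table, key, [("x", 2), ("y", 3)])  # retry of the same call
after_retry = list(table[key])

replace_writes(table, key, [("x", 5)])  # a smaller set replaces, never appends
after_shrink = list(table[key])
```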

get_writes async

get_writes(*, thread_id: str, checkpoint_id: str, checkpoint_ns: str = 'default', task_id: str | None = None) -> list[dict]

Fetch pending writes, ordered by (task_id, idx).

task_id=None returns writes for every task at the checkpoint; otherwise only the named task's rows.

Source code in src/locus/memory/backends/oracle_versioned.py
async def get_writes(
    self,
    *,
    thread_id: str,
    checkpoint_id: str,
    checkpoint_ns: str = "default",
    task_id: str | None = None,
) -> list[dict]:
    """Fetch pending writes, ordered by ``(task_id, idx)``.

    ``task_id=None`` returns writes for every task at the
    checkpoint; otherwise only the named task's rows.
    """
    await self._ensure_tables()
    pool = await self._get_pool()

    async with safe_acquire(self, pool) as conn, conn.cursor() as cursor:
        if task_id is None:
            await cursor.execute(
                f"""
                SELECT task_id, idx, channel, value
                FROM {self._writes_table}
                WHERE thread_id = :thread_id
                  AND checkpoint_ns = :checkpoint_ns
                  AND checkpoint_id = :checkpoint_id
                ORDER BY task_id, idx
                """,
                {
                    "thread_id": thread_id,
                    "checkpoint_ns": checkpoint_ns,
                    "checkpoint_id": checkpoint_id,
                },
            )
        else:
            await cursor.execute(
                f"""
                SELECT task_id, idx, channel, value
                FROM {self._writes_table}
                WHERE thread_id = :thread_id
                  AND checkpoint_ns = :checkpoint_ns
                  AND checkpoint_id = :checkpoint_id
                  AND task_id = :task_id
                ORDER BY idx
                """,
                {
                    "thread_id": thread_id,
                    "checkpoint_ns": checkpoint_ns,
                    "checkpoint_id": checkpoint_id,
                    "task_id": task_id,
                },
            )
        rows = await cursor.fetchall()

    results: list[dict] = []
    for row in rows:
        results.append(
            {
                "task_id": row[0],
                "idx": row[1],
                "channel": row[2],
                "value": await self._decode_clob(row[3]),
            }
        )
    return results
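
When `task_id` is omitted, the rows arrive ordered by `(task_id, idx)`, which makes them easy to regroup per task. A small sketch, using only the dict keys documented above; `group_by_task` is a hypothetical helper and the sample rows are made up.

```python
from collections import defaultdict


def group_by_task(rows: list) -> dict:
    """Group write rows into {task_id: [(channel, value), ...]}."""
    grouped = defaultdict(list)
    for row in rows:
        grouped[row["task_id"]].append((row["channel"], row["value"]))
    return dict(grouped)


rows = [
    {"task_id": "node-a", "idx": 0, "channel": "x", "value": 2},
    {"task_id": "node-a", "idx": 1, "channel": "y", "value": 3},
    {"task_id": "node-b", "idx": 0, "channel": "x", "value": 7},
]
by_task = group_by_task(rows)
```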

delete_thread async

delete_thread(thread_id: str) -> None

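Delete every checkpoint and pending write for a thread, across all checkpoint namespaces. Pending writes are removed first, then checkpoints, and both deletes are committed together.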

Source code in src/locus/memory/backends/oracle_versioned.py
async def delete_thread(self, thread_id: str) -> None:
    """Cascade-delete every checkpoint + pending write for a thread."""
    await self._ensure_tables()
    pool = await self._get_pool()

    async with safe_acquire(self, pool) as conn, conn.cursor() as cursor:
        await cursor.execute(
            f"DELETE FROM {self._writes_table} WHERE thread_id = :thread_id",
            {"thread_id": thread_id},
        )
        await cursor.execute(
            f"DELETE FROM {self._checkpoints_table} WHERE thread_id = :thread_id",
            {"thread_id": thread_id},
        )
        await conn.commit()

close async

close() -> None

Close the connection pool.

Source code in src/locus/memory/backends/oracle_versioned.py
async def close(self) -> None:
    """Close the connection pool."""
    if self._pool is not None:
        await self._pool.close()
        self._pool = None

OracleSyncBackend

OracleSyncBackend(dsn: str | None = None, user: str = 'admin', password: str | SecretStr = '', wallet_location: str | None = None, wallet_password: str | SecretStr | None = None, host: str | None = None, port: int = 1521, service_name: str | None = None, **kwargs: Any)

Sync companion to OracleBackend.

Constructor accepts the same arguments as OracleBackend; every public async method is mirrored as a blocking method with the same name (no _sync suffix) so porting code between the two surfaces is a single import change.

Source code in src/locus/memory/backends/oracle_sync.py
def __init__(
    self,
    dsn: str | None = None,
    user: str = "admin",
    password: str | SecretStr = "",
    wallet_location: str | None = None,
    wallet_password: str | SecretStr | None = None,
    host: str | None = None,
    port: int = 1521,
    service_name: str | None = None,
    **kwargs: Any,
) -> None:
    self._async = OracleBackend(
        dsn=dsn,
        user=user,
        password=password,
        wallet_location=wallet_location,
        wallet_password=wallet_password,
        host=host,
        port=port,
        service_name=service_name,
        **kwargs,
    )
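
The listing above shows only that the sync class wraps an async instance in `self._async`; how each call is driven is not shown. One common way to build such a facade is a private event loop plus `run_until_complete`. The sketch below illustrates that general technique with hypothetical `AsyncCounter` and `SyncCounter` classes and should not be read as the library's actual mechanism.

```python
import asyncio


class AsyncCounter:
    """Hypothetical async object standing in for an async backend."""

    def __init__(self) -> None:
        self._n = 0

    async def increment(self) -> int:
        await asyncio.sleep(0)  # pretend to do I/O
        self._n += 1
        return self._n


class SyncCounter:
    """Blocking facade: same method name, no _sync suffix."""

    def __init__(self) -> None:
        self._async = AsyncCounter()
        # One private loop so loop-bound resources survive across calls.
        self._loop = asyncio.new_event_loop()

    def increment(self) -> int:
        return self._loop.run_until_complete(self._async.increment())

    def close(self) -> None:
        self._loop.close()


counter = SyncCounter()
first = counter.increment()
second = counter.increment()
counter.close()
```

A facade built this way cannot be called from inside an already running event loop, since `run_until_complete` raises in that situation.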

OracleSyncCheckpointSaver

OracleSyncCheckpointSaver(dsn: str | None = None, user: str = 'admin', password: str | SecretStr = '', wallet_location: str | None = None, wallet_password: str | SecretStr | None = None, host: str | None = None, port: int = 1521, service_name: str | None = None, table_name: str = 'locus', schema_name: str | None = None, min_pool_size: int = 1, max_pool_size: int = 5, auto_create_table: bool = True, **kwargs: Any)

Sync companion to OracleCheckpointSaver.

Versioned, history-preserving checkpoint saver with pending-writes durability — synchronous API. Method names match the async counterpart exactly (put / get / list_checkpoints / put_writes / get_writes / delete_thread / close).

Source code in src/locus/memory/backends/oracle_sync.py
def __init__(
    self,
    dsn: str | None = None,
    user: str = "admin",
    password: str | SecretStr = "",
    wallet_location: str | None = None,
    wallet_password: str | SecretStr | None = None,
    host: str | None = None,
    port: int = 1521,
    service_name: str | None = None,
    table_name: str = "locus",
    schema_name: str | None = None,
    min_pool_size: int = 1,
    max_pool_size: int = 5,
    auto_create_table: bool = True,
    **kwargs: Any,
) -> None:
    self._async = OracleCheckpointSaver(
        dsn=dsn,
        user=user,
        password=password,
        wallet_location=wallet_location,
        wallet_password=wallet_password,
        host=host,
        port=port,
        service_name=service_name,
        table_name=table_name,
        schema_name=schema_name,
        min_pool_size=min_pool_size,
        max_pool_size=max_pool_size,
        auto_create_table=auto_create_table,
        **kwargs,
    )

OCI Object Storage

OCIBucketBackend

OCIBucketBackend(bucket_name: str, namespace: str, prefix: str = 'locus/checkpoints/', profile_name: str = 'DEFAULT', auth_type: str = 'api_key', region: str | None = None, retry_strategy: Any = None, **kwargs: Any)

Bases: BaseCheckpointer

OCI Object Storage-backed checkpointer.

Durable, per-checkpoint storage with lifecycle-policy support. Pass the instance directly to locus.agent.Agent; no adapter is needed.

Example:

checkpointer = OCIBucketBackend(
    bucket_name="my-checkpoints",
    namespace="yzhbfkqxqsx9",
    profile_name="API_KEY_AUTH",
)
agent = Agent(config=cfg, checkpointer=checkpointer)

With an OCI compute instance principal:

checkpointer = OCIBucketBackend(
    bucket_name="my-checkpoints",
    namespace="yzhbfkqxqsx9",
    auth_type="instance_principal",
)
Capabilities
  • list_threads — yes (via object prefix delimiter listing)
  • list_with_metadata — yes
  • metadata_query — yes (via get_metadata)
  • branching — yes (via copy_thread)
  • vacuum — yes (prefer bucket lifecycle policies for prod)
  • persistent_checkpoint_ids — yes
Source code in src/locus/memory/backends/oci_bucket.py
def __init__(
    self,
    bucket_name: str,
    namespace: str,
    prefix: str = "locus/checkpoints/",
    profile_name: str = "DEFAULT",
    auth_type: str = "api_key",
    region: str | None = None,
    retry_strategy: Any = None,
    **kwargs: Any,
) -> None:
    self.config = OCIBucketConfig(
        bucket_name=bucket_name,
        namespace=namespace,
        prefix=prefix,
        profile_name=profile_name,
        auth_type=auth_type,
        region=region,
        **kwargs,
    )
    self._client: ObjectStorageClient | None = None
    self._initialized = False
    # Override the default retry strategy by passing one explicitly.
    # Default (None) resolves to ``oci.retry.DEFAULT_RETRY_STRATEGY`` at
    # first call — exponential backoff with jitter on 429 / 5xx /
    # transport errors. Pass ``oci.retry.NoneRetryStrategy()`` to
    # disable retries (e.g. for tests).
    self._retry_strategy: Any = retry_strategy

close async

close() -> None

Close any resources (connections, files, etc.).

Override in subclasses if cleanup is needed.

Source code in src/locus/memory/checkpointer.py
async def close(self) -> None:
    """
    Close any resources (connections, files, etc.).

    Override in subclasses if cleanup is needed.
    """

search async

search(query: str, limit: int = 10) -> list[dict[str, Any]]

Full-text search across checkpoints.

Requires: capabilities.search = True

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| query | str | Search query | required |
| limit | int | Maximum results | 10 |

Returns:

| Type | Description |
| --- | --- |
| list[dict[str, Any]] | List of matching checkpoints with scores |

Raises:

| Type | Description |
| --- | --- |
| NotImplementedError | If backend doesn't support search |

Source code in src/locus/memory/checkpointer.py
async def search(
    self,
    query: str,
    limit: int = 10,
) -> list[dict[str, Any]]:
    """
    Full-text search across checkpoints.

    Requires: capabilities.search = True

    Args:
        query: Search query
        limit: Maximum results

    Returns:
        List of matching checkpoints with scores

    Raises:
        NotImplementedError: If backend doesn't support search
    """
    self._require_capability("search")
    raise NotImplementedError("search not implemented")
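
Callers can avoid the `NotImplementedError` by checking the capability flag first. A minimal sketch follows. `Capabilities`, the two backend classes, and the result dict shape are hypothetical stand-ins; only the `capabilities.search` check and the `search(query, limit)` signature come from this page.

```python
import asyncio
from dataclasses import dataclass


@dataclass(frozen=True)
class Capabilities:
    """Hypothetical stand-in for CheckpointerCapabilities."""
    search: bool = False


class NoSearchBackend:
    capabilities = Capabilities(search=False)

    async def search(self, query: str, limit: int = 10) -> list:
        raise NotImplementedError("search not implemented")


class SearchBackend:
    capabilities = Capabilities(search=True)

    async def search(self, query: str, limit: int = 10) -> list:
        return [{"thread_id": "t1", "score": 1.0}][:limit]


async def safe_search(checkpointer, query: str, limit: int = 10) -> list:
    """Return search hits, or an empty list when search is unsupported."""
    if not checkpointer.capabilities.search:
        return []
    return await checkpointer.search(query, limit=limit)


unsupported = asyncio.run(safe_search(NoSearchBackend(), "error handling"))
supported = asyncio.run(safe_search(SearchBackend(), "error handling"))
```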

query_by_metadata async

query_by_metadata(key: str, value: Any, limit: int = 100) -> list[dict[str, Any]]

Query checkpoints by metadata field.

Requires: capabilities.metadata_query = True

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| key | str | Metadata field name | required |
| value | Any | Value to match | required |
| limit | int | Maximum results | 100 |

Returns:

| Type | Description |
| --- | --- |
| list[dict[str, Any]] | List of matching checkpoints |

Source code in src/locus/memory/checkpointer.py
async def query_by_metadata(
    self,
    key: str,
    value: Any,
    limit: int = 100,
) -> list[dict[str, Any]]:
    """
    Query checkpoints by metadata field.

    Requires: capabilities.metadata_query = True

    Args:
        key: Metadata field name
        value: Value to match
        limit: Maximum results

    Returns:
        List of matching checkpoints
    """
    self._require_capability("metadata_query")
    raise NotImplementedError("query_by_metadata not implemented")

vacuum async

vacuum(older_than_days: int = 30) -> int

Delete threads whose latest checkpoint is older than the cutoff. Each call examines at most the first 1000 threads returned by list_threads.

For production, prefer an OCI Object Storage lifecycle rule: it runs server-side and costs nothing in client CPU.

Source code in src/locus/memory/backends/oci_bucket.py
async def vacuum(self, older_than_days: int = 30) -> int:
    """Delete threads whose latest checkpoint is older than the cutoff.

    For production, prefer an OCI Object Storage lifecycle rule — it runs
    server-side and costs nothing in client CPU.
    """
    await self._ensure_bucket()

    cutoff = datetime.now(UTC) - timedelta(days=older_than_days)
    threads = await self.list_threads(limit=1000)

    async def _maybe_delete(thread_id: str) -> bool:
        meta = await self.get_metadata(thread_id)
        if not meta:
            return False
        updated = meta.get("updated_at")
        if not updated:
            return False
        try:
            updated_dt = datetime.fromisoformat(updated.replace("Z", "+00:00"))
        except (ValueError, TypeError):
            return False
        if updated_dt >= cutoff:
            return False
        await self.delete(thread_id)
        return True

    results = await asyncio.gather(*(_maybe_delete(t) for t in threads))
    return sum(1 for r in results if r)
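
The staleness check inside `_maybe_delete` can be isolated as a pure function. The sketch below mirrors its parsing rules: a trailing `Z` is rewritten to `+00:00` because `datetime.fromisoformat` only accepts `Z` from Python 3.11 onward. `is_stale` is a hypothetical helper, and treating a timezone-less timestamp as UTC is an assumption added here, not something the listing above does.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def is_stale(updated: Optional[str], older_than_days: int, now: datetime) -> bool:
    """True when `updated` parses and lies before now - older_than_days."""
    if not updated:
        return False
    try:
        updated_dt = datetime.fromisoformat(updated.replace("Z", "+00:00"))
    except (ValueError, TypeError):
        return False  # unparseable timestamps are never deleted
    if updated_dt.tzinfo is None:
        # Assumption: naive timestamps are UTC (avoids a naive/aware TypeError).
        updated_dt = updated_dt.replace(tzinfo=timezone.utc)
    return updated_dt < now - timedelta(days=older_than_days)


now = datetime(2024, 3, 1, tzinfo=timezone.utc)
old = is_stale("2024-01-01T00:00:00Z", 30, now)
recent = is_stale("2024-02-20T00:00:00+00:00", 30, now)
garbage = is_stale("not-a-date", 30, now)
```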

copy_thread async

copy_thread(source_thread_id: str, dest_thread_id: str) -> bool

Copy every checkpoint under source to dest (for branching).

Source code in src/locus/memory/backends/oci_bucket.py
async def copy_thread(
    self,
    source_thread_id: str,
    dest_thread_id: str,
) -> bool:
    """Copy every checkpoint under source to dest (for branching)."""
    await self._ensure_bucket()

    checkpoints = await self.list_checkpoints(source_thread_id, limit=1000)
    if not checkpoints:
        return False

    for cp_id in checkpoints:
        payload, meta = await asyncio.gather(
            self._get_json(self._checkpoint_key(source_thread_id, cp_id)),
            self._get_json(self._meta_key(source_thread_id, cp_id)),
        )
        if payload is None:
            continue
        payload["thread_id"] = dest_thread_id
        if meta is not None:
            meta["thread_id"] = dest_thread_id
            await asyncio.gather(
                self._put_json(self._checkpoint_key(dest_thread_id, cp_id), payload),
                self._put_json(self._meta_key(dest_thread_id, cp_id), meta),
            )
        else:
            await self._put_json(self._checkpoint_key(dest_thread_id, cp_id), payload)

    # Point dest's latest at the most-recent source checkpoint.
    await self._put_bytes(
        self._latest_key(dest_thread_id),
        checkpoints[0].encode("utf-8"),
        "text/plain",
    )
    return True

Other backends

When you can't run on Oracle / OCI, the same BaseCheckpointer contract is implemented for Redis, PostgreSQL, OpenSearch, file system, and an HTTP-API adapter.

RedisBackend

RedisBackend(url: str = 'redis://localhost:6379', **kwargs: Any)

Bases: BaseModel

Redis checkpoint backend.

Fast key-value storage with optional TTL for checkpoints.

Example

backend = RedisBackend(url="redis://localhost:6379")
await backend.save("thread_1", state.model_dump())
data = await backend.load("thread_1")

Source code in src/locus/memory/backends/redis.py
def __init__(self, url: str = "redis://localhost:6379", **kwargs: Any) -> None:
    config = RedisConfig(url=url, **kwargs)
    super().__init__(config=config)

save async

save(thread_id: str, data: dict[str, Any]) -> None

Save checkpoint to Redis.

Source code in src/locus/memory/backends/redis.py
async def save(self, thread_id: str, data: dict[str, Any]) -> None:
    """Save checkpoint to Redis."""
    client = await self._get_client()
    key = self._key(thread_id)
    value = json.dumps(data)

    if self.config.ttl_seconds:
        await client.setex(key, self.config.ttl_seconds, value)
    else:
        await client.set(key, value)

load async

load(thread_id: str) -> dict[str, Any] | None

Load checkpoint from Redis.

Source code in src/locus/memory/backends/redis.py
async def load(self, thread_id: str) -> dict[str, Any] | None:
    """Load checkpoint from Redis."""
    client = await self._get_client()
    key = self._key(thread_id)
    value = await client.get(key)

    if value is None:
        return None

    data: dict[str, Any] = json.loads(value)
    return data

delete async

delete(thread_id: str) -> bool

Delete checkpoint from Redis.

Source code in src/locus/memory/backends/redis.py
async def delete(self, thread_id: str) -> bool:
    """Delete checkpoint from Redis."""
    client = await self._get_client()
    key = self._key(thread_id)
    result: int = await client.delete(key)
    return result > 0

exists async

exists(thread_id: str) -> bool

Check if checkpoint exists.

Source code in src/locus/memory/backends/redis.py
async def exists(self, thread_id: str) -> bool:
    """Check if checkpoint exists."""
    client = await self._get_client()
    key = self._key(thread_id)
    existing: int = await client.exists(key)
    return existing > 0

list_threads async

list_threads(limit: int = 100, offset: int = 0, pattern: str = '*') -> list[str]

List all thread IDs matching pattern.

Source code in src/locus/memory/backends/redis.py
async def list_threads(
    self,
    limit: int = 100,
    offset: int = 0,
    pattern: str = "*",
) -> list[str]:
    """List all thread IDs matching pattern."""
    client = await self._get_client()
    full_pattern = f"{self.config.prefix}{pattern}"
    keys = await client.keys(full_pattern)
    prefix_len = len(self.config.prefix)
    threads = [k[prefix_len:] for k in keys]
    # Apply offset and limit
    return threads[offset : offset + limit]
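
The prefix stripping and slicing in `list_threads` can be sketched as a pure function. This is an illustration under assumptions: `page_thread_ids` is a hypothetical helper, the byte decoding covers clients created without `decode_responses=True` (whether the backend's client decodes responses is not shown here), and the sort is an addition, since key order from Redis is not guaranteed to be stable between calls.

```python
def page_thread_ids(keys, prefix, offset=0, limit=100):
    """Strip the key prefix, then apply offset/limit in memory."""
    # Accept both bytes and str keys.
    decoded = [k.decode() if isinstance(k, bytes) else k for k in keys]
    prefix_len = len(prefix)
    # Sorting makes repeated pages consistent with each other (an addition).
    threads = sorted(k[prefix_len:] for k in decoded if k.startswith(prefix))
    return threads[offset : offset + limit]


page = page_thread_ids(
    [b"locus:b", "locus:a", "locus:c", "other:x"],
    prefix="locus:",
    offset=1,
    limit=1,
)
```

Note that paging happens after every matching key has been fetched, so `limit` bounds the size of the result, not the work done by the server.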

close async

close() -> None

Close Redis connection.

Source code in src/locus/memory/backends/redis.py
async def close(self) -> None:
    """Close Redis connection."""
    if self._client:
        await self._client.close()
        self._client = None

PostgreSQLBackend

PostgreSQLBackend(host: str = 'localhost', port: int = 5432, database: str = 'locus', user: str = 'postgres', password: str | SecretStr = '', dsn: str | None = None, **kwargs: Any)

Bases: BaseModel

PostgreSQL checkpoint backend.

Production-grade persistent storage with ACID guarantees.

Features:

- Connection pooling
- Transaction support
- JSON/JSONB storage
- Indexing for fast lookups
- Concurrent access safe

Example

backend = PostgreSQLBackend(
    host="localhost",
    database="myapp",
    user="postgres",
    password="secret",
)
await backend.save("thread_1", state.model_dump())
data = await backend.load("thread_1")

With DSN

backend = PostgreSQLBackend(dsn="postgresql://user:pass@localhost:5432/mydb")

Source code in src/locus/memory/backends/postgresql.py
def __init__(
    self,
    host: str = "localhost",
    port: int = 5432,
    database: str = "locus",
    user: str = "postgres",
    password: str | SecretStr = "",
    dsn: str | None = None,
    **kwargs: Any,
) -> None:
    config = PostgreSQLConfig(
        host=host,
        port=port,
        database=database,
        user=user,
        password=SecretStr(password) if isinstance(password, str) else password,
        dsn=dsn,
        **kwargs,
    )
    super().__init__(config=config)

save async

save(thread_id: str, data: dict[str, Any], checkpoint_id: str | None = None, metadata: dict[str, Any] | None = None) -> str

Save checkpoint to PostgreSQL.

Parameters:

Name Type Description Default
thread_id str

Thread identifier

required
data dict[str, Any]

Checkpoint data

required
checkpoint_id str | None

Optional checkpoint ID

None
metadata dict[str, Any] | None

Optional metadata for querying

None

Returns:

Type Description
str

Checkpoint ID

Source code in src/locus/memory/backends/postgresql.py
async def save(
    self,
    thread_id: str,
    data: dict[str, Any],
    checkpoint_id: str | None = None,
    metadata: dict[str, Any] | None = None,
) -> str:
    """
    Save checkpoint to PostgreSQL.

    Args:
        thread_id: Thread identifier
        data: Checkpoint data
        checkpoint_id: Optional checkpoint ID
        metadata: Optional metadata for querying

    Returns:
        Checkpoint ID
    """
    await self._ensure_table()
    pool = await self._get_pool()

    from uuid import uuid4

    checkpoint_id = checkpoint_id or uuid4().hex
    now = datetime.now(UTC)

    async with pool.acquire() as conn:
        await conn.execute(
            f"""
            INSERT INTO {self._full_table_name}
                (thread_id, checkpoint_id, data, created_at, updated_at, metadata)
            VALUES ($1, $2, $3::jsonb, $4, $5, $6::jsonb)
            ON CONFLICT (thread_id) DO UPDATE SET
                checkpoint_id = EXCLUDED.checkpoint_id,
                data = EXCLUDED.data,
                updated_at = EXCLUDED.updated_at,
                metadata = EXCLUDED.metadata
            """,
            thread_id,
            checkpoint_id,
            json.dumps(data),
            now,
            now,
            json.dumps(metadata or {}),
        )

    return checkpoint_id
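
The upsert above keeps one row per `thread_id`: a second save replaces `checkpoint_id`, `data`, and `updated_at`, while `created_at` keeps its original value because it is not in the `DO UPDATE SET` list. The sketch below reproduces that behavior with the standard-library `sqlite3` module as a stand-in for PostgreSQL. The table definition is an assumption (the real schema created by `_ensure_table` is not shown here), and the metadata column is omitted for brevity.

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE checkpoints (
        thread_id     TEXT PRIMARY KEY,  -- the conflict target needs a unique key
        checkpoint_id TEXT NOT NULL,
        data          TEXT NOT NULL,
        created_at    TEXT NOT NULL,
        updated_at    TEXT NOT NULL
    )
    """
)

UPSERT = """
INSERT INTO checkpoints (thread_id, checkpoint_id, data, created_at, updated_at)
VALUES (?, ?, ?, ?, ?)
ON CONFLICT (thread_id) DO UPDATE SET
    checkpoint_id = excluded.checkpoint_id,
    data = excluded.data,
    updated_at = excluded.updated_at
"""


def save(thread_id: str, checkpoint_id: str, data: dict, now: str) -> None:
    conn.execute(UPSERT, (thread_id, checkpoint_id, json.dumps(data), now, now))


save("thread_1", "cp-a", {"step": 1}, "2024-01-01T00:00:00+00:00")
save("thread_1", "cp-b", {"step": 2}, "2024-01-02T00:00:00+00:00")

rows = conn.execute(
    "SELECT checkpoint_id, data, created_at, updated_at FROM checkpoints"
).fetchall()
```

One consequence worth keeping in mind: with this statement the returned checkpoint ID identifies the latest save only, since earlier checkpoints for the same thread are overwritten rather than kept as history.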

load async

load(thread_id: str) -> dict[str, Any] | None

Load checkpoint from PostgreSQL.

Source code in src/locus/memory/backends/postgresql.py
async def load(self, thread_id: str) -> dict[str, Any] | None:
    """Load checkpoint from PostgreSQL."""
    await self._ensure_table()
    pool = await self._get_pool()

    async with pool.acquire() as conn:
        row = await conn.fetchrow(
            f"SELECT data FROM {self._full_table_name} WHERE thread_id = $1",
            thread_id,
        )

    if row is None:
        return None

    data: dict[str, Any] = json.loads(row["data"])
    return data

delete async

delete(thread_id: str) -> bool

Delete checkpoint from PostgreSQL.

Source code in src/locus/memory/backends/postgresql.py
async def delete(self, thread_id: str) -> bool:
    """Delete checkpoint from PostgreSQL."""
    await self._ensure_table()
    pool = await self._get_pool()

    async with pool.acquire() as conn:
        result: str = await conn.execute(
            f"DELETE FROM {self._full_table_name} WHERE thread_id = $1",
            thread_id,
        )

    return result == "DELETE 1"

exists async

exists(thread_id: str) -> bool

Check if checkpoint exists.

Source code in src/locus/memory/backends/postgresql.py
async def exists(self, thread_id: str) -> bool:
    """Check if checkpoint exists."""
    await self._ensure_table()
    pool = await self._get_pool()

    async with pool.acquire() as conn:
        row = await conn.fetchrow(
            f"SELECT 1 FROM {self._full_table_name} WHERE thread_id = $1",
            thread_id,
        )

    return row is not None

list_threads async

list_threads(pattern: str = '%', limit: int = 100, offset: int = 0) -> list[str]

List all thread IDs matching pattern.

Source code in src/locus/memory/backends/postgresql.py
async def list_threads(
    self,
    pattern: str = "%",
    limit: int = 100,
    offset: int = 0,
) -> list[str]:
    """List all thread IDs matching pattern."""
    await self._ensure_table()
    pool = await self._get_pool()

    async with pool.acquire() as conn:
        rows = await conn.fetch(
            f"""
            SELECT thread_id FROM {self._full_table_name}
            WHERE thread_id LIKE $1
            ORDER BY updated_at DESC
            LIMIT $2 OFFSET $3
            """,
            pattern,
            limit,
            offset,
        )

    return [row["thread_id"] for row in rows]
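
The `pattern` argument here is a SQL `LIKE` pattern (default `'%'`), whereas the Redis backend and the base class default to the glob `'*'`. Callers that switch backends may want a small translation step. The helper below is a hypothetical sketch, not part of the library; it handles `*` and `?` only (Redis character classes such as `[ab]` have no `LIKE` equivalent) and assumes PostgreSQL's default backslash escape character.

```python
def glob_to_like(pattern: str) -> str:
    """Translate a simple glob ('*', '?') into a SQL LIKE pattern."""
    out: list[str] = []
    for ch in pattern:
        if ch == "*":
            out.append("%")  # any run of characters
        elif ch == "?":
            out.append("_")  # exactly one character
        elif ch in "%_\\":
            out.append("\\" + ch)  # escape LIKE metacharacters meant literally
        else:
            out.append(ch)
    return "".join(out)


like_all = glob_to_like("*")
like_prefix = glob_to_like("user_*")
```

With such a helper, a call like `await backend.list_threads(pattern=glob_to_like("user_*"))` would match only IDs that start with the literal text `user_`.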

get_metadata async

get_metadata(thread_id: str) -> dict[str, Any] | None

Get checkpoint metadata.

Source code in src/locus/memory/backends/postgresql.py
async def get_metadata(self, thread_id: str) -> dict[str, Any] | None:
    """Get checkpoint metadata."""
    await self._ensure_table()
    pool = await self._get_pool()

    async with pool.acquire() as conn:
        row = await conn.fetchrow(
            f"""
            SELECT checkpoint_id, created_at, updated_at, metadata
            FROM {self._full_table_name}
            WHERE thread_id = $1
            """,
            thread_id,
        )

    if row is None:
        return None

    return {
        "checkpoint_id": row["checkpoint_id"],
        "created_at": row["created_at"].isoformat(),
        "updated_at": row["updated_at"].isoformat(),
        "metadata": json.loads(row["metadata"]) if row["metadata"] else {},
    }

query_by_metadata async

query_by_metadata(key: str, value: Any, limit: int = 100) -> list[dict[str, Any]]

Query checkpoints by metadata field.

Uses PostgreSQL JSONB operators for efficient querying.

Source code in src/locus/memory/backends/postgresql.py
async def query_by_metadata(
    self,
    key: str,
    value: Any,
    limit: int = 100,
) -> list[dict[str, Any]]:
    """
    Query checkpoints by metadata field.

    Uses PostgreSQL JSONB operators for efficient querying.
    """
    await self._ensure_table()
    pool = await self._get_pool()

    async with pool.acquire() as conn:
        rows = await conn.fetch(
            f"""
            SELECT thread_id, data, updated_at
            FROM {self._full_table_name}
            WHERE metadata @> $1::jsonb
            ORDER BY updated_at DESC
            LIMIT $2
            """,
            json.dumps({key: value}),
            limit,
        )

    return [
        {
            "thread_id": row["thread_id"],
            "data": json.loads(row["data"]),
            "updated_at": row["updated_at"].isoformat(),
        }
        for row in rows
    ]
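
The `metadata @> $1::jsonb` filter is a containment test: a row matches when its metadata contains the probe `{key: value}`. The function below is a simplified Python model of that rule, written for illustration. It covers objects, arrays, and scalars, and deliberately ignores PostgreSQL edge cases; `contains`, `rows`, and the sample data are all hypothetical.

```python
from typing import Any


def contains(doc: Any, probe: Any) -> bool:
    """Simplified model of JSONB containment (doc @> probe)."""
    if isinstance(probe, dict):
        # Every probe key must exist in doc with a contained value.
        return isinstance(doc, dict) and all(
            k in doc and contains(doc[k], v) for k, v in probe.items()
        )
    if isinstance(probe, list):
        # Every probe element must be contained in some doc element.
        return isinstance(doc, list) and all(
            any(contains(item, p) for item in doc) for p in probe
        )
    return doc == probe


rows = [
    {"thread_id": "t1", "metadata": {"user": "alice", "tags": ["prod", "eu"]}},
    {"thread_id": "t2", "metadata": {"user": "bob", "tags": ["dev"]}},
    {"thread_id": "t3", "metadata": {}},
]


def query_by_metadata(key: str, value: Any) -> list[str]:
    probe = {key: value}  # same probe shape as json.dumps({key: value})
    return [r["thread_id"] for r in rows if contains(r["metadata"], probe)]


by_user = query_by_metadata("user", "alice")
by_tag = query_by_metadata("tags", ["prod"])
```

Extra keys in the stored metadata never prevent a match, and an array value in the probe matches rows whose array includes all of the probe's elements.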

search_data async

search_data(path: str, value: Any, limit: int = 100) -> list[dict[str, Any]]

Search checkpoints by data field using JSON path.

Parameters:

Name Type Description Default
path str

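Top-level key in the checkpoint data (e.g., "messages", "confidence"). The key and value are matched with JSONB containment (@>), so a dotted path is treated as a literal key name rather than expanded.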

required
value Any

Value to match

required
limit int

Maximum results

100

Example

results = await backend.search_data("agent_id", "agent-123")

Source code in src/locus/memory/backends/postgresql.py
async def search_data(
    self,
    path: str,
    value: Any,
    limit: int = 100,
) -> list[dict[str, Any]]:
    """
    Search checkpoints by data field using JSON path.

    Args:
        path: JSON path (e.g., "messages", "confidence")
        value: Value to match
        limit: Maximum results

    Example:
        >>> results = await backend.search_data("agent_id", "agent-123")
    """
    await self._ensure_table()
    pool = await self._get_pool()

    async with pool.acquire() as conn:
        rows = await conn.fetch(
            f"""
            SELECT thread_id, data, updated_at
            FROM {self._full_table_name}
            WHERE data @> $1::jsonb
            ORDER BY updated_at DESC
            LIMIT $2
            """,
            json.dumps({path: value}),
            limit,
        )

    return [
        {
            "thread_id": row["thread_id"],
            "data": json.loads(row["data"]),
            "updated_at": row["updated_at"].isoformat(),
        }
        for row in rows
    ]
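
Since the listing builds its probe as `json.dumps({path: value})`, `path` acts as a single top-level key. The sketch below shows what the probe looks like for three inputs; `build_probe` is a hypothetical helper that mirrors that one expression, and the field names are made up.

```python
import json
from typing import Any


def build_probe(path: str, value: Any) -> str:
    """Mirror of the probe expression used by search_data()."""
    return json.dumps({path: value})


flat = build_probe("agent_id", "agent-123")
dotted = build_probe("config.model", "small")       # literal key "config.model"
nested = build_probe("config", {"model": "small"})  # nested object probe
```

To match a nested field, the last form is the one to reach for: pass the parent key as `path` and a dict as `value`.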

count async

count(pattern: str = '%') -> int

Count checkpoints matching pattern.

Source code in src/locus/memory/backends/postgresql.py
async def count(self, pattern: str = "%") -> int:
    """Count checkpoints matching pattern."""
    await self._ensure_table()
    pool = await self._get_pool()

    async with pool.acquire() as conn:
        row = await conn.fetchrow(
            f"SELECT COUNT(*) as cnt FROM {self._full_table_name} WHERE thread_id LIKE $1",
            pattern,
        )

    return row["cnt"] if row else 0

vacuum async

vacuum(older_than_days: int = 30) -> int

Delete old checkpoints.

Parameters:

Name Type Description Default
older_than_days int

Delete checkpoints older than this

30

Returns:

Type Description
int

Number of deleted rows

Source code in src/locus/memory/backends/postgresql.py
async def vacuum(self, older_than_days: int = 30) -> int:
    """
    Delete old checkpoints.

    Args:
        older_than_days: Delete checkpoints older than this

    Returns:
        Number of deleted rows
    """
    await self._ensure_table()
    pool = await self._get_pool()

    async with pool.acquire() as conn:
        result = await conn.execute(
            f"""
            DELETE FROM {self._full_table_name}
            WHERE updated_at < NOW() - make_interval(days => $1)
            """,
            older_than_days,
        )

    # Parse "DELETE N"
    try:
        return int(result.split()[1])
    except (IndexError, ValueError):
        return 0
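
Both `delete` and `vacuum` rely on the command status string that asyncpg returns from `execute`, such as `"DELETE 3"`. The sketch below is a hypothetical, slightly more general parser that reads the last token, so it also handles tags like `"INSERT 0 1"`, where the row count is not the second word.

```python
def affected_rows(status: str) -> int:
    """Return the row count from a command tag such as 'DELETE 3' or 'INSERT 0 1'."""
    parts = status.split()
    try:
        return int(parts[-1])  # the count is the final token
    except (IndexError, ValueError):
        return 0  # empty or non-numeric tags, e.g. 'CREATE TABLE'


deleted = affected_rows("DELETE 3")
inserted = affected_rows("INSERT 0 1")
```

For `DELETE` tags the two approaches agree; the difference only matters if the same helper is reused for other statement types.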

close async

close() -> None

Close connection pool.

Source code in src/locus/memory/backends/postgresql.py
async def close(self) -> None:
    """Close connection pool."""
    if self._pool:
        await self._pool.close()
        self._pool = None

OpenSearchBackend

OpenSearchBackend(hosts: list[str] | None = None, index_name: str = 'locus-checkpoints', username: str | None = None, password: str | None = None, **kwargs: Any)

Bases: BaseModel

OpenSearch checkpoint backend.

Scalable document storage with full-text search capabilities.

Example

backend = OpenSearchBackend(hosts=["localhost:9200"])
await backend.save("thread_1", state.model_dump())
data = await backend.load("thread_1")
results = await backend.search("user query")

Source code in src/locus/memory/backends/opensearch.py
def __init__(
    self,
    hosts: list[str] | None = None,
    index_name: str = "locus-checkpoints",
    username: str | None = None,
    password: str | None = None,
    **kwargs: Any,
) -> None:
    config = OpenSearchConfig(
        hosts=hosts or ["localhost:9200"],
        index_name=index_name,
        username=username,
        password=password,
        **kwargs,
    )
    super().__init__(config=config)

save async

save(thread_id: str, data: dict[str, Any], metadata: dict[str, Any] | None = None) -> None

Save checkpoint to OpenSearch.

Source code in src/locus/memory/backends/opensearch.py
async def save(
    self,
    thread_id: str,
    data: dict[str, Any],
    metadata: dict[str, Any] | None = None,
) -> None:
    """Save checkpoint to OpenSearch."""
    await self._ensure_index()
    client = await self._get_client()

    now = datetime.now(UTC).isoformat()

    doc = {
        "thread_id": thread_id,
        "data": data,
        "data_json": json.dumps(data),  # For text search
        "updated_at": now,
        "metadata": metadata or {},
    }

    # Check if exists for created_at
    try:
        existing = await client.get(
            index=self.config.index_name,
            id=thread_id,
        )
        doc["created_at"] = existing["_source"].get("created_at", now)
    except Exception:  # noqa: BLE001 — first-write path; any lookup failure == "no prior"
        doc["created_at"] = now

    await client.index(
        index=self.config.index_name,
        id=thread_id,
        body=doc,
        refresh=True,
    )
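
The read-before-write step above exists to keep `created_at` stable across overwrites. A minimal in-memory sketch of that pattern follows. `FakeDocStore` is a hypothetical stand-in that raises `KeyError` for a missing document (the real client raises its own error type, which the listing catches broadly), and timestamps are passed in explicitly so the result is deterministic.

```python
import asyncio
from typing import Any


class FakeDocStore:
    """Dictionary-backed stand-in for the get/index calls (illustration only)."""

    def __init__(self) -> None:
        self.docs: dict[tuple[str, str], dict[str, Any]] = {}

    async def get(self, index: str, id: str) -> dict[str, Any]:
        return {"_source": self.docs[(index, id)]}  # KeyError when missing

    async def index(self, index: str, id: str, body: dict[str, Any]) -> None:
        self.docs[(index, id)] = body


async def save(client: FakeDocStore, thread_id: str, data: dict, now: str) -> dict:
    doc = {"thread_id": thread_id, "data": data, "updated_at": now}
    try:
        existing = await client.get(index="locus-checkpoints", id=thread_id)
        doc["created_at"] = existing["_source"].get("created_at", now)
    except KeyError:
        doc["created_at"] = now  # first write for this thread
    await client.index(index="locus-checkpoints", id=thread_id, body=doc)
    return doc


async def demo() -> tuple[dict, dict]:
    client = FakeDocStore()
    first = await save(client, "thread_1", {"step": 1}, now="2024-01-01T00:00:00+00:00")
    second = await save(client, "thread_1", {"step": 2}, now="2024-01-02T00:00:00+00:00")
    return first, second


first, second = asyncio.run(demo())
```

The lookup and the write are two separate requests, so two concurrent first writes for the same thread could each record their own `created_at`; the later write wins.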

load async

load(thread_id: str) -> dict[str, Any] | None

Load checkpoint from OpenSearch.

Source code in src/locus/memory/backends/opensearch.py
async def load(self, thread_id: str) -> dict[str, Any] | None:
    """Load checkpoint from OpenSearch."""
    await self._ensure_index()
    client = await self._get_client()

    try:
        result = await client.get(
            index=self.config.index_name,
            id=thread_id,
        )
        data: dict[str, Any] = result["_source"]["data"]
        return data
    except Exception:  # noqa: BLE001 — missing document == None by design
        return None

delete async

delete(thread_id: str) -> bool

Delete checkpoint from OpenSearch.

Source code in src/locus/memory/backends/opensearch.py
async def delete(self, thread_id: str) -> bool:
    """Delete checkpoint from OpenSearch."""
    await self._ensure_index()
    client = await self._get_client()

    try:
        await client.delete(
            index=self.config.index_name,
            id=thread_id,
            refresh=True,
        )
        return True
    except Exception:  # noqa: BLE001 — delete is idempotent; report boolean result
        return False

exists async

exists(thread_id: str) -> bool

Check if checkpoint exists.

Source code in src/locus/memory/backends/opensearch.py
async def exists(self, thread_id: str) -> bool:
    """Check if checkpoint exists."""
    await self._ensure_index()
    client = await self._get_client()

    present: bool = await client.exists(
        index=self.config.index_name,
        id=thread_id,
    )
    return present

list_threads async

list_threads(limit: int = 100, offset: int = 0) -> list[str]

List all thread IDs.

Source code in src/locus/memory/backends/opensearch.py
async def list_threads(
    self,
    limit: int = 100,
    offset: int = 0,
) -> list[str]:
    """List all thread IDs."""
    await self._ensure_index()
    client = await self._get_client()

    result = await client.search(
        index=self.config.index_name,
        body={
            "query": {"match_all": {}},
            "size": limit,
            "from": offset,
            "_source": ["thread_id"],
            "sort": [{"updated_at": "desc"}],
        },
    )

    return [hit["_source"]["thread_id"] for hit in result["hits"]["hits"]]

search async

search(query: str, limit: int = 10) -> list[dict[str, Any]]

Search checkpoints by content.

Parameters:

Name Type Description Default
query str

Search query

required
limit int

Maximum results

10

Returns:

Type Description
list[dict[str, Any]]

List of matching checkpoints with scores

Source code in src/locus/memory/backends/opensearch.py
async def search(
    self,
    query: str,
    limit: int = 10,
) -> list[dict[str, Any]]:
    """
    Search checkpoints by content.

    Args:
        query: Search query
        limit: Maximum results

    Returns:
        List of matching checkpoints with scores
    """
    await self._ensure_index()
    client = await self._get_client()

    result = await client.search(
        index=self.config.index_name,
        body={
            "query": {
                "multi_match": {
                    "query": query,
                    "fields": ["data_json", "thread_id"],
                }
            },
            "size": limit,
            "_source": ["thread_id", "data", "updated_at"],
        },
    )

    return [
        {
            "thread_id": hit["_source"]["thread_id"],
            "data": hit["_source"]["data"],
            "score": hit["_score"],
            "updated_at": hit["_source"]["updated_at"],
        }
        for hit in result["hits"]["hits"]
    ]
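
The result shaping at the end of `search` can be checked against a hand-written response. The sample below is made up for illustration and contains only the fields the listing reads; `shape_hits` is a hypothetical name for the list comprehension shown above.

```python
from typing import Any


def shape_hits(response: dict[str, Any]) -> list[dict[str, Any]]:
    """Flatten search hits into the result dicts returned by search()."""
    return [
        {
            "thread_id": hit["_source"]["thread_id"],
            "data": hit["_source"]["data"],
            "score": hit["_score"],
            "updated_at": hit["_source"]["updated_at"],
        }
        for hit in response["hits"]["hits"]
    ]


sample_response = {
    "hits": {
        "hits": [
            {
                "_score": 1.7,
                "_source": {
                    "thread_id": "thread_1",
                    "data": {"messages": ["retry failed"]},
                    "updated_at": "2024-01-02T00:00:00+00:00",
                },
            }
        ]
    }
}

results = shape_hits(sample_response)
```

Each result carries the relevance score next to the stored data, so callers can apply their own threshold before loading a thread.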

get_by_metadata async

get_by_metadata(key: str, value: Any, limit: int = 100) -> list[dict[str, Any]]

Get checkpoints by metadata field.

Source code in src/locus/memory/backends/opensearch.py
async def get_by_metadata(
    self,
    key: str,
    value: Any,
    limit: int = 100,
) -> list[dict[str, Any]]:
    """Get checkpoints by metadata field."""
    await self._ensure_index()
    client = await self._get_client()

    result = await client.search(
        index=self.config.index_name,
        body={
            "query": {"term": {f"metadata.{key}": value}},
            "size": limit,
        },
    )

    return [
        {
            "thread_id": hit["_source"]["thread_id"],
            "data": hit["_source"]["data"],
        }
        for hit in result["hits"]["hits"]
    ]

close async

close() -> None

Close OpenSearch connection.

Source code in src/locus/memory/backends/opensearch.py
async def close(self) -> None:
    """Close OpenSearch connection."""
    if self._client:
        await self._client.close()
        self._client = None

FileCheckpointer

FileCheckpointer(base_dir: str | Path = '.locus_checkpoints', pretty: bool = True)

Bases: BaseCheckpointer

File-based checkpointer for persistent local storage.

Stores each checkpoint as a JSON file, organized by thread ID. Provides durable storage that survives process restarts.

Parameters:

Name Type Description Default
base_dir str | Path

Base directory for checkpoint storage. Defaults to ".locus_checkpoints" in current directory.

'.locus_checkpoints'
pretty bool

Whether to format JSON for readability (default True)

True

Example

checkpointer = FileCheckpointer("./checkpoints")

# Save state
checkpoint_id = await checkpointer.save(state, "thread-1")

# Load state
restored = await checkpointer.load("thread-1")

# Files are stored at: ./checkpoints/thread-1/{checkpoint_id}.json

Source code in src/locus/memory/backends/file.py
def __init__(
    self,
    base_dir: str | Path = ".locus_checkpoints",
    pretty: bool = True,
):
    self.base_dir = Path(base_dir)
    self.pretty = pretty
    self._lock = asyncio.Lock()

capabilities property

capabilities: CheckpointerCapabilities

Return the capabilities of this checkpointer.

Override in subclasses to advertise supported features.

exists async

exists(thread_id: str, checkpoint_id: str | None = None) -> bool

Check if a checkpoint exists.

Parameters:

Name Type Description Default
thread_id str

Thread identifier

required
checkpoint_id str | None

Specific checkpoint to check. If None, checks if any checkpoint exists for the thread.

None

Returns:

Type Description
bool

True if the checkpoint exists

Source code in src/locus/memory/checkpointer.py
async def exists(
    self,
    thread_id: str,
    checkpoint_id: str | None = None,
) -> bool:
    """
    Check if a checkpoint exists.

    Args:
        thread_id: Thread identifier
        checkpoint_id: Specific checkpoint to check. If None,
                      checks if any checkpoint exists for the thread.

    Returns:
        True if the checkpoint exists
    """
    if checkpoint_id is None:
        checkpoints = await self.list_checkpoints(thread_id, limit=1)
        return len(checkpoints) > 0
    state = await self.load(thread_id, checkpoint_id)
    return state is not None

close async

close() -> None

Close any resources (connections, files, etc.).

Override in subclasses if cleanup is needed.

Source code in src/locus/memory/checkpointer.py
async def close(self) -> None:
    """
    Close any resources (connections, files, etc.).

    Override in subclasses if cleanup is needed.
    """

search async

search(query: str, limit: int = 10) -> list[dict[str, Any]]

Full-text search across checkpoints.

Requires: capabilities.search = True

Parameters:

Name Type Description Default
query str

Search query

required
limit int

Maximum results

10

Returns:

Type Description
list[dict[str, Any]]

List of matching checkpoints with scores

Raises:

Type Description
NotImplementedError

If backend doesn't support search

Source code in src/locus/memory/checkpointer.py
async def search(
    self,
    query: str,
    limit: int = 10,
) -> list[dict[str, Any]]:
    """
    Full-text search across checkpoints.

    Requires: capabilities.search = True

    Args:
        query: Search query
        limit: Maximum results

    Returns:
        List of matching checkpoints with scores

    Raises:
        NotImplementedError: If backend doesn't support search
    """
    self._require_capability("search")
    raise NotImplementedError("search not implemented")
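
The extended methods follow one pattern: check the capability flag, then either run the backend-specific implementation or raise. The sketch below models that pattern with hypothetical classes. The body of `_require_capability` is an assumption (its source is not shown in this section), and `Capabilities` is a stand-in with only two flags.

```python
import asyncio
from dataclasses import dataclass


@dataclass(frozen=True)
class Capabilities:
    search: bool = False
    branching: bool = False


class MiniCheckpointer:
    """Base class: advertises nothing, so extended calls raise."""

    @property
    def capabilities(self) -> Capabilities:
        return Capabilities()

    def _require_capability(self, name: str) -> None:
        # Assumed behavior: fail fast when the flag is not set.
        if not getattr(self.capabilities, name, False):
            raise NotImplementedError(f"{type(self).__name__} does not support {name}")

    async def search(self, query: str, limit: int = 10) -> list[dict]:
        self._require_capability("search")
        raise NotImplementedError("search not implemented")


class SearchableCheckpointer(MiniCheckpointer):
    def __init__(self, docs: dict[str, str]) -> None:
        self._docs = docs

    @property
    def capabilities(self) -> Capabilities:
        return Capabilities(search=True)

    async def search(self, query: str, limit: int = 10) -> list[dict]:
        hits = [{"thread_id": t, "text": s} for t, s in self._docs.items() if query in s]
        return hits[:limit]


async def safe_search(cp: MiniCheckpointer, query: str) -> list[dict]:
    # Callers check the flag first instead of catching NotImplementedError.
    if not cp.capabilities.search:
        return []
    return await cp.search(query)


plain = asyncio.run(safe_search(MiniCheckpointer(), "error"))
found = asyncio.run(
    safe_search(SearchableCheckpointer({"t1": "error handling", "t2": "greeting"}), "error")
)
```

Checking `capabilities` up front keeps calling code backend-agnostic: the same function works with a file checkpointer and a search-capable one.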

query_by_metadata async

query_by_metadata(key: str, value: Any, limit: int = 100) -> list[dict[str, Any]]

Query checkpoints by metadata field.

Requires: capabilities.metadata_query = True

Parameters:

Name Type Description Default
key str

Metadata field name

required
value Any

Value to match

required
limit int

Maximum results

100

Returns:

Type Description
list[dict[str, Any]]

List of matching checkpoints

Source code in src/locus/memory/checkpointer.py
async def query_by_metadata(
    self,
    key: str,
    value: Any,
    limit: int = 100,
) -> list[dict[str, Any]]:
    """
    Query checkpoints by metadata field.

    Requires: capabilities.metadata_query = True

    Args:
        key: Metadata field name
        value: Value to match
        limit: Maximum results

    Returns:
        List of matching checkpoints
    """
    self._require_capability("metadata_query")
    raise NotImplementedError("query_by_metadata not implemented")

get_metadata async

get_metadata(thread_id: str, checkpoint_id: str | None = None) -> dict[str, Any] | None

Get checkpoint metadata.

Requires: capabilities.metadata_query = True

Parameters:

Name Type Description Default
thread_id str

Thread identifier

required
checkpoint_id str | None

Specific checkpoint (latest if None)

None

Returns:

Type Description
dict[str, Any] | None

Metadata dict or None if not found

Source code in src/locus/memory/checkpointer.py
async def get_metadata(
    self,
    thread_id: str,
    checkpoint_id: str | None = None,
) -> dict[str, Any] | None:
    """
    Get checkpoint metadata.

    Requires: capabilities.metadata_query = True

    Args:
        thread_id: Thread identifier
        checkpoint_id: Specific checkpoint (latest if None)

    Returns:
        Metadata dict or None if not found
    """
    self._require_capability("metadata_query")
    raise NotImplementedError("get_metadata not implemented")

vacuum async

vacuum(older_than_days: int = 30) -> int

Delete old checkpoints.

Requires: capabilities.vacuum = True

Parameters:

Name Type Description Default
older_than_days int

Delete checkpoints older than this

30

Returns:

Type Description
int

Number of deleted checkpoints

Source code in src/locus/memory/checkpointer.py
async def vacuum(
    self,
    older_than_days: int = 30,
) -> int:
    """
    Delete old checkpoints.

    Requires: capabilities.vacuum = True

    Args:
        older_than_days: Delete checkpoints older than this

    Returns:
        Number of deleted checkpoints
    """
    self._require_capability("vacuum")
    raise NotImplementedError("vacuum not implemented")

copy_thread async

copy_thread(source_thread_id: str, dest_thread_id: str) -> bool

Copy a thread to create a branch.

Requires: capabilities.branching = True

Parameters:

Name Type Description Default
source_thread_id str

Source thread to copy from

required
dest_thread_id str

Destination thread ID

required

Returns:

Type Description
bool

True if successful

Source code in src/locus/memory/checkpointer.py
async def copy_thread(
    self,
    source_thread_id: str,
    dest_thread_id: str,
) -> bool:
    """
    Copy a thread to create a branch.

    Requires: capabilities.branching = True

    Args:
        source_thread_id: Source thread to copy from
        dest_thread_id: Destination thread ID

    Returns:
        True if successful
    """
    self._require_capability("branching")
    raise NotImplementedError("copy_thread not implemented")

list_threads async

list_threads(limit: int = 100, pattern: str = '*') -> list[str]

List all thread IDs.

Requires: capabilities.list_threads = True

Parameters:

Name Type Description Default
limit int

Maximum threads to return

100
pattern str

Pattern to filter threads (backend-specific)

'*'

Returns:

Type Description
list[str]

List of thread IDs

Source code in src/locus/memory/checkpointer.py
async def list_threads(
    self,
    limit: int = 100,
    pattern: str = "*",
) -> list[str]:
    """
    List all thread IDs.

    Requires: capabilities.list_threads = True

    Args:
        limit: Maximum threads to return
        pattern: Pattern to filter threads (backend-specific)

    Returns:
        List of thread IDs
    """
    self._require_capability("list_threads")
    raise NotImplementedError("list_threads not implemented")

list_with_metadata async

list_with_metadata(limit: int = 100) -> list[dict[str, Any]]

List checkpoints with their metadata.

Requires: capabilities.list_with_metadata = True

Parameters:

Name Type Description Default
limit int

Maximum results

100

Returns:

Type Description
list[dict[str, Any]]

List of {thread_id, checkpoint_id, metadata, ...} dicts

Source code in src/locus/memory/checkpointer.py
async def list_with_metadata(
    self,
    limit: int = 100,
) -> list[dict[str, Any]]:
    """
    List checkpoints with their metadata.

    Requires: capabilities.list_with_metadata = True

    Args:
        limit: Maximum results

    Returns:
        List of {thread_id, checkpoint_id, metadata, ...} dicts
    """
    self._require_capability("list_with_metadata")
    raise NotImplementedError("list_with_metadata not implemented")

save async

save(state: AgentState, thread_id: str, checkpoint_id: str | None = None, metadata: dict[str, Any] | None = None) -> str

Save agent state to a JSON file.

Parameters:

Name Type Description Default
state AgentState

Current agent state

required
thread_id str

Thread identifier

required
checkpoint_id str | None

Optional specific checkpoint ID

None
metadata dict[str, Any] | None

Optional metadata for querying/filtering checkpoints

None

Returns:

Type Description
str

Checkpoint ID for the saved state

Source code in src/locus/memory/backends/file.py
async def save(
    self,
    state: AgentState,
    thread_id: str,
    checkpoint_id: str | None = None,
    metadata: dict[str, Any] | None = None,
) -> str:
    """
    Save agent state to a JSON file.

    Args:
        state: Current agent state
        thread_id: Thread identifier
        checkpoint_id: Optional specific checkpoint ID
        metadata: Optional metadata for querying/filtering checkpoints

    Returns:
        Checkpoint ID for the saved state
    """
    checkpoint_id = checkpoint_id or uuid4().hex

    async with self._lock:
        thread_dir = self._get_thread_dir(thread_id)
        thread_dir.mkdir(parents=True, exist_ok=True)

        checkpoint_path = self._get_checkpoint_path(thread_id, checkpoint_id)

        # Prepare data with metadata
        data = {
            "checkpoint_id": checkpoint_id,
            "thread_id": thread_id,
            "created_at": datetime.now(UTC).isoformat(),
            "state": state.to_checkpoint(),
            "metadata": metadata or {},
        }

        # Write to file (run in executor to not block)
        loop = asyncio.get_event_loop()
        await loop.run_in_executor(None, self._write_json, checkpoint_path, data)

    return checkpoint_id

load async

load(thread_id: str, checkpoint_id: str | None = None) -> AgentState | None

Load agent state from a JSON file.

Parameters:

Name Type Description Default
thread_id str

Thread identifier

required
checkpoint_id str | None

Specific checkpoint ID (latest if None)

None

Returns:

Type Description
AgentState | None

Restored AgentState or None if not found

Source code in src/locus/memory/backends/file.py
async def load(
    self,
    thread_id: str,
    checkpoint_id: str | None = None,
) -> AgentState | None:
    """
    Load agent state from a JSON file.

    Args:
        thread_id: Thread identifier
        checkpoint_id: Specific checkpoint ID (latest if None)

    Returns:
        Restored AgentState or None if not found
    """
    from locus.core.state import AgentState

    thread_dir = self._get_thread_dir(thread_id)

    if not thread_dir.exists():
        return None

    if checkpoint_id is None:
        # Get latest checkpoint
        checkpoints = await self.list_checkpoints(thread_id, limit=1)
        if not checkpoints:
            return None
        checkpoint_id = checkpoints[0]

    checkpoint_path = self._get_checkpoint_path(thread_id, checkpoint_id)

    loop = asyncio.get_event_loop()
    data = await loop.run_in_executor(None, self._read_json, checkpoint_path)

    if data is None:
        return None

    return AgentState.from_checkpoint(data["state"])

list_checkpoints async

list_checkpoints(thread_id: str, limit: int = 10) -> list[str]

List available checkpoints for a thread.

Reads checkpoint files and returns IDs sorted by creation time (newest first).

Parameters:

Name Type Description Default
thread_id str

Thread identifier

required
limit int

Maximum number to return

10

Returns:

Type Description
list[str]

List of checkpoint IDs, newest first

Source code in src/locus/memory/backends/file.py
async def list_checkpoints(
    self,
    thread_id: str,
    limit: int = 10,
) -> list[str]:
    """
    List available checkpoints for a thread.

    Reads checkpoint files and returns IDs sorted by creation time
    (newest first).

    Args:
        thread_id: Thread identifier
        limit: Maximum number to return

    Returns:
        List of checkpoint IDs, newest first
    """
    thread_dir = self._get_thread_dir(thread_id)

    if not thread_dir.exists():
        return []

    # Get all checkpoint files with their metadata
    checkpoints: list[tuple[str, datetime]] = []

    loop = asyncio.get_event_loop()

    for path in thread_dir.glob("*.json"):
        data = await loop.run_in_executor(None, self._read_json, path)
        if data and "checkpoint_id" in data:
            created_at = datetime.fromisoformat(
                data.get("created_at", "1970-01-01T00:00:00+00:00")
            )
            checkpoints.append((data["checkpoint_id"], created_at))

    # Sort by creation time descending
    checkpoints.sort(key=lambda x: x[1], reverse=True)

    return [cp_id for cp_id, _ in checkpoints[:limit]]
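
The ordering rule in `list_checkpoints` is easy to isolate: parse `created_at`, fall back to the Unix epoch when it is missing, and sort newest first. The sketch below applies it to already-loaded records; `newest_first` and the sample records are hypothetical.

```python
from datetime import datetime

EPOCH = "1970-01-01T00:00:00+00:00"


def newest_first(records: list[dict], limit: int = 10) -> list[str]:
    """Return checkpoint IDs ordered by created_at, newest first."""
    dated = [
        (r["checkpoint_id"], datetime.fromisoformat(r.get("created_at", EPOCH)))
        for r in records
        if "checkpoint_id" in r  # files without an ID are skipped
    ]
    dated.sort(key=lambda item: item[1], reverse=True)
    return [cp_id for cp_id, _ in dated[:limit]]


records = [
    {"checkpoint_id": "a", "created_at": "2024-01-01T00:00:00+00:00"},
    {"checkpoint_id": "b", "created_at": "2024-01-03T00:00:00+00:00"},
    {"checkpoint_id": "c"},          # no timestamp: sorts last
    {"note": "not a checkpoint"},    # ignored
]

ordered = newest_first(records, limit=3)
```

The fallback timestamp carries a UTC offset on purpose: Python cannot compare offset-aware and naive datetimes, so every `created_at` in a thread needs to be offset-aware for the sort to succeed. Every file in the thread directory is read before the limit is applied, so listing cost grows with the number of checkpoints.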

delete async

delete(thread_id: str, checkpoint_id: str | None = None) -> bool

Delete checkpoint file(s).

Parameters:

Name Type Description Default
thread_id str

Thread identifier

required
checkpoint_id str | None

Specific checkpoint to delete (all if None)

None

Returns:

Type Description
bool

True if the thread or checkpoint was found and deleted, False if it does not exist

Source code in src/locus/memory/backends/file.py
async def delete(
    self,
    thread_id: str,
    checkpoint_id: str | None = None,
) -> bool:
    """
    Delete checkpoint file(s).

    Args:
        thread_id: Thread identifier
        checkpoint_id: Specific checkpoint to delete (all if None)

    Returns:
        True if deletion was successful
    """
    import shutil

    thread_dir = self._get_thread_dir(thread_id)

    if not thread_dir.exists():
        return False

    async with self._lock:
        if checkpoint_id is None:
            # Delete entire thread directory
            loop = asyncio.get_running_loop()
            await loop.run_in_executor(
                None, lambda: shutil.rmtree(thread_dir, ignore_errors=True)
            )
            return True
        checkpoint_path = self._get_checkpoint_path(thread_id, checkpoint_id)
        if checkpoint_path.exists():
            checkpoint_path.unlink()
            return True
        return False
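The return values are easiest to see by running the same branches against a temporary directory. This is a rough sketch under assumptions: the `<base>/<thread_id>/<checkpoint_id>.json` layout is a guess (the real paths come from `_get_thread_dir` and `_get_checkpoint_path`, which are not shown here), the lock is omitted, and `asyncio.to_thread` stands in for `run_in_executor`.

```python
import asyncio
import shutil
import tempfile
from pathlib import Path


async def delete(base_dir, thread_id, checkpoint_id=None):
    thread_dir = base_dir / thread_id  # assumed layout, see lead-in
    if not thread_dir.exists():
        return False
    if checkpoint_id is None:
        # rmtree blocks, so it runs in a worker thread
        await asyncio.to_thread(shutil.rmtree, thread_dir, ignore_errors=True)
        return True
    path = thread_dir / f"{checkpoint_id}.json"
    if path.exists():
        path.unlink()
        return True
    return False


async def demo():
    with tempfile.TemporaryDirectory() as tmp:
        base = Path(tmp)
        (base / "t1").mkdir()
        (base / "t1" / "cp1.json").write_text("{}")
        (base / "t1" / "cp2.json").write_text("{}")
        return [
            await delete(base, "t1", "cp1"),  # True: file removed
            await delete(base, "t1", "cp1"),  # False: already gone
            await delete(base, "t1"),         # True: whole thread removed
            await delete(base, "t1"),         # False: thread no longer exists
        ]


results = asyncio.run(demo())
```

A False result therefore means "nothing to delete", not "the delete failed".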

get_storage_path

get_storage_path() -> Path

Get the base storage directory path.

Source code in src/locus/memory/backends/file.py
def get_storage_path(self) -> Path:
    """Get the base storage directory path."""
    return self.base_dir

get_disk_usage async

get_disk_usage(thread_id: str | None = None) -> int

Get total disk usage in bytes.

Parameters:

Name Type Description Default
thread_id str | None

Specific thread (all threads if None)

None

Returns:

Type Description
int

Total size in bytes

Source code in src/locus/memory/backends/file.py
async def get_disk_usage(self, thread_id: str | None = None) -> int:
    """
    Get total disk usage in bytes.

    Args:
        thread_id: Specific thread (all threads if None)

    Returns:
        Total size in bytes
    """
    if thread_id is not None:
        thread_dir = self._get_thread_dir(thread_id)
        if not thread_dir.exists():
            return 0
        return sum(f.stat().st_size for f in thread_dir.glob("*.json"))

    if not self.base_dir.exists():
        return 0

    total = 0
    for thread_dir in self.base_dir.iterdir():
        if thread_dir.is_dir():
            total += sum(f.stat().st_size for f in thread_dir.glob("*.json"))
    return total
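A small stand-alone sketch of the same accounting, assuming one directory per thread directly under the base directory (the real layout is decided by `_get_thread_dir`, which is not shown here). `json_bytes` and `disk_usage` are hypothetical helper names. Only `*.json` files are counted; anything else in a thread directory is ignored.

```python
import tempfile
from pathlib import Path


def json_bytes(directory):
    """Sum the sizes of the *.json files directly inside `directory`."""
    return sum(f.stat().st_size for f in directory.glob("*.json"))


def disk_usage(base_dir, thread_id=None):
    if thread_id is not None:
        thread_dir = base_dir / thread_id  # assumed layout
        return json_bytes(thread_dir) if thread_dir.exists() else 0
    if not base_dir.exists():
        return 0
    return sum(json_bytes(d) for d in base_dir.iterdir() if d.is_dir())


with tempfile.TemporaryDirectory() as tmp:
    base = Path(tmp)
    (base / "thread-1").mkdir()
    (base / "thread-2").mkdir()
    (base / "thread-1" / "a.json").write_bytes(b"x" * 10)
    (base / "thread-1" / "notes.txt").write_bytes(b"x" * 99)  # not counted
    (base / "thread-2" / "b.json").write_bytes(b"x" * 5)
    usage_one = disk_usage(base, "thread-1")
    usage_all = disk_usage(base)
    usage_missing = disk_usage(base, "no-such-thread")
```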

HTTPCheckpointer

HTTPCheckpointer(base_url: str, headers: dict[str, str] | None = None, auth: tuple[str, str] | None = None, timeout: float = 30.0)

Bases: BaseCheckpointer

HTTP API-based checkpointer for remote storage.

Stores checkpoints via HTTP API calls, suitable for distributed systems or cloud-based storage backends.

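The API is expected to implement the following endpoints:

- POST /threads/{thread_id}/checkpoints - Create checkpoint
- GET /threads/{thread_id}/checkpoints/{checkpoint_id} - Get checkpoint
- GET /threads/{thread_id}/checkpoints - List checkpoints (receives a limit query parameter)
- DELETE /threads/{thread_id}/checkpoints/{checkpoint_id} - Delete checkpoint
- DELETE /threads/{thread_id}/checkpoints - Delete all checkpoints for a thread (called by delete() when no checkpoint_id is given)
- GET /health - Health probe (called by health_check())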
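As an illustration of how those paths might be assembled, here is a minimal sketch. The methods on this page call a private `_encode_path_segment` helper whose body is not shown; `encode_path_segment` below is only a guess at its intent (percent-encoding so that a thread ID containing `/` cannot change the route), and both URL helpers are hypothetical.

```python
from urllib.parse import quote


def encode_path_segment(value):
    """Percent-encode a value so it is safe as a single URL path segment."""
    return quote(value, safe="")


def checkpoints_url(thread_id):
    """Collection path: create, list, or delete all checkpoints of a thread."""
    return f"/threads/{encode_path_segment(thread_id)}/checkpoints"


def checkpoint_url(thread_id, checkpoint_id):
    """Item path: get or delete one checkpoint."""
    return f"{checkpoints_url(thread_id)}/{encode_path_segment(checkpoint_id)}"


collection = checkpoints_url("team/a thread")
item = checkpoint_url("thread-1", "cp 1")
```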

Parameters:

Name Type Description Default
base_url str

Base URL of the checkpoint API

required
headers dict[str, str] | None

Additional headers to include in requests

None
auth tuple[str, str] | None

Authentication tuple (username, password) for basic auth

None
timeout float

Request timeout in seconds

30.0
Example
checkpointer = HTTPCheckpointer(
    base_url="https://api.example.com/v1",
    headers={"Authorization": "Bearer token"},
)

# Save state
checkpoint_id = await checkpointer.save(state, "thread-1")

# Load state
restored = await checkpointer.load("thread-1")
Source code in src/locus/memory/backends/http.py
def __init__(
    self,
    base_url: str,
    headers: dict[str, str] | None = None,
    auth: tuple[str, str] | None = None,
    timeout: float = 30.0,
):
    self.base_url = base_url.rstrip("/")
    self.headers = headers or {}
    self.auth = auth
    self.timeout = timeout
    self._client: Any = None

capabilities property

capabilities: CheckpointerCapabilities

Return the capabilities of this checkpointer.

Override in subclasses to advertise supported features.

exists async

exists(thread_id: str, checkpoint_id: str | None = None) -> bool

Check if a checkpoint exists.

Parameters:

Name Type Description Default
thread_id str

Thread identifier

required
checkpoint_id str | None

Specific checkpoint to check. If None, checks if any checkpoint exists for the thread.

None

Returns:

Type Description
bool

True if the checkpoint exists

Source code in src/locus/memory/checkpointer.py
async def exists(
    self,
    thread_id: str,
    checkpoint_id: str | None = None,
) -> bool:
    """
    Check if a checkpoint exists.

    Args:
        thread_id: Thread identifier
        checkpoint_id: Specific checkpoint to check. If None,
                      checks if any checkpoint exists for the thread.

    Returns:
        True if the checkpoint exists
    """
    if checkpoint_id is None:
        checkpoints = await self.list_checkpoints(thread_id, limit=1)
        return len(checkpoints) > 0
    state = await self.load(thread_id, checkpoint_id)
    return state is not None

search async

search(query: str, limit: int = 10) -> list[dict[str, Any]]

Full-text search across checkpoints.

Requires: capabilities.search = True

Parameters:

Name Type Description Default
query str

Search query

required
limit int

Maximum results

10

Returns:

Type Description
list[dict[str, Any]]

List of matching checkpoints with scores

Raises:

Type Description
NotImplementedError

If backend doesn't support search

Source code in src/locus/memory/checkpointer.py
async def search(
    self,
    query: str,
    limit: int = 10,
) -> list[dict[str, Any]]:
    """
    Full-text search across checkpoints.

    Requires: capabilities.search = True

    Args:
        query: Search query
        limit: Maximum results

    Returns:
        List of matching checkpoints with scores

    Raises:
        NotImplementedError: If backend doesn't support search
    """
    self._require_capability("search")
    raise NotImplementedError("search not implemented")
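The capability gate can be sketched without the library. Everything below is a stand-in: `Capabilities`, `DemoCheckpointer`, and this `_require_capability` body are hypothetical (the real helper is not shown on this page and may raise a different error). The point is the calling pattern, which is to check the flag before calling an extended method.

```python
import asyncio
from dataclasses import dataclass


@dataclass(frozen=True)
class Capabilities:
    search: bool = False
    branching: bool = False


class DemoCheckpointer:
    # This stand-in backend advertises branching but not search.
    capabilities = Capabilities(branching=True)

    def _require_capability(self, name):
        # Assumed behavior: refuse when the flag is off.
        if not getattr(self.capabilities, name, False):
            raise NotImplementedError(f"{name} is not supported by this backend")

    async def search(self, query, limit=10):
        self._require_capability("search")
        raise NotImplementedError("search not implemented")

    async def copy_thread(self, source_thread_id, dest_thread_id):
        self._require_capability("branching")
        return True


async def demo():
    checkpointer = DemoCheckpointer()
    results = []
    if checkpointer.capabilities.search:  # flag is off, so the call is skipped
        results = await checkpointer.search("error handling")
    branched = False
    if checkpointer.capabilities.branching:
        branched = await checkpointer.copy_thread("main", "experiment")
    return results, branched


outcome = asyncio.run(demo())
```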

query_by_metadata async

query_by_metadata(key: str, value: Any, limit: int = 100) -> list[dict[str, Any]]

Query checkpoints by metadata field.

Requires: capabilities.metadata_query = True

Parameters:

Name Type Description Default
key str

Metadata field name

required
value Any

Value to match

required
limit int

Maximum results

100

Returns:

Type Description
list[dict[str, Any]]

List of matching checkpoints

Source code in src/locus/memory/checkpointer.py
async def query_by_metadata(
    self,
    key: str,
    value: Any,
    limit: int = 100,
) -> list[dict[str, Any]]:
    """
    Query checkpoints by metadata field.

    Requires: capabilities.metadata_query = True

    Args:
        key: Metadata field name
        value: Value to match
        limit: Maximum results

    Returns:
        List of matching checkpoints
    """
    self._require_capability("metadata_query")
    raise NotImplementedError("query_by_metadata not implemented")
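The base method only raises, so the result shape is up to each backend. As a purely hypothetical illustration of what an exact-match metadata query could look like over an in-memory mapping of `{thread_id: {checkpoint_id: metadata}}`:

```python
def query_by_metadata(store, key, value, limit=100):
    """Return checkpoints whose metadata has `key` equal to `value`."""
    matches = []
    for thread_id, checkpoints in store.items():
        for checkpoint_id, metadata in checkpoints.items():
            if metadata.get(key) == value:
                matches.append(
                    {
                        "thread_id": thread_id,
                        "checkpoint_id": checkpoint_id,
                        "metadata": metadata,
                    }
                )
    return matches[:limit]


store = {
    "thread-1": {"cp1": {"user": "ana"}, "cp2": {"user": "bo"}},
    "thread-2": {"cp3": {"user": "ana", "tag": "draft"}},
}
hits = query_by_metadata(store, "user", "ana")
```

The dict keys used here are illustrative; a real backend may return additional fields.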

get_metadata async

get_metadata(thread_id: str, checkpoint_id: str | None = None) -> dict[str, Any] | None

Get checkpoint metadata.

Requires: capabilities.metadata_query = True

Parameters:

Name Type Description Default
thread_id str

Thread identifier

required
checkpoint_id str | None

Specific checkpoint (latest if None)

None

Returns:

Type Description
dict[str, Any] | None

Metadata dict or None if not found

Source code in src/locus/memory/checkpointer.py
async def get_metadata(
    self,
    thread_id: str,
    checkpoint_id: str | None = None,
) -> dict[str, Any] | None:
    """
    Get checkpoint metadata.

    Requires: capabilities.metadata_query = True

    Args:
        thread_id: Thread identifier
        checkpoint_id: Specific checkpoint (latest if None)

    Returns:
        Metadata dict or None if not found
    """
    self._require_capability("metadata_query")
    raise NotImplementedError("get_metadata not implemented")

vacuum async

vacuum(older_than_days: int = 30) -> int

Delete old checkpoints.

Requires: capabilities.vacuum = True

Parameters:

Name Type Description Default
older_than_days int

Delete checkpoints older than this

30

Returns:

Type Description
int

Number of deleted checkpoints

Source code in src/locus/memory/checkpointer.py
async def vacuum(
    self,
    older_than_days: int = 30,
) -> int:
    """
    Delete old checkpoints.

    Requires: capabilities.vacuum = True

    Args:
        older_than_days: Delete checkpoints older than this

    Returns:
        Number of deleted checkpoints
    """
    self._require_capability("vacuum")
    raise NotImplementedError("vacuum not implemented")
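How `older_than_days` translates into a cutoff is backend-specific. One plausible reading, sketched over a hypothetical in-memory mapping of `{thread_id: {checkpoint_id: created_at}}`, is "delete everything created before now minus N days". The `now` parameter exists only so the example is deterministic.

```python
from datetime import datetime, timedelta, timezone


def vacuum(store, older_than_days=30, now=None):
    """Remove checkpoints created before the cutoff and return the count."""
    now = now or datetime.now(timezone.utc)
    cutoff = now - timedelta(days=older_than_days)
    deleted = 0
    for thread_id in list(store):
        checkpoints = store[thread_id]
        expired = [cp for cp, created_at in checkpoints.items() if created_at < cutoff]
        for checkpoint_id in expired:
            del checkpoints[checkpoint_id]
            deleted += 1
        if not checkpoints:
            del store[thread_id]  # drop threads that are now empty
    return deleted


utc = timezone.utc
store = {
    "thread-1": {
        "old": datetime(2024, 1, 1, tzinfo=utc),
        "recent": datetime(2024, 2, 20, tzinfo=utc),
    },
    "thread-2": {"ancient": datetime(2023, 1, 1, tzinfo=utc)},
}
removed = vacuum(store, older_than_days=30, now=datetime(2024, 3, 1, tzinfo=utc))
```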

copy_thread async

copy_thread(source_thread_id: str, dest_thread_id: str) -> bool

Copy a thread to create a branch.

Requires: capabilities.branching = True

Parameters:

Name Type Description Default
source_thread_id str

Source thread to copy from

required
dest_thread_id str

Destination thread ID

required

Returns:

Type Description
bool

True if successful

Source code in src/locus/memory/checkpointer.py
async def copy_thread(
    self,
    source_thread_id: str,
    dest_thread_id: str,
) -> bool:
    """
    Copy a thread to create a branch.

    Requires: capabilities.branching = True

    Args:
        source_thread_id: Source thread to copy from
        dest_thread_id: Destination thread ID

    Returns:
        True if successful
    """
    self._require_capability("branching")
    raise NotImplementedError("copy_thread not implemented")
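Branching amounts to duplicating a thread's checkpoints under a new thread ID so the two histories can diverge. A hypothetical in-memory sketch follows. Refusing to overwrite an existing destination is a design choice made here, not documented behavior.

```python
import copy


def copy_thread(store, source_thread_id, dest_thread_id):
    """Copy every checkpoint of one thread to a new thread ID."""
    if source_thread_id not in store or dest_thread_id in store:
        return False
    # Deep copy so later writes to the branch do not leak into the source.
    store[dest_thread_id] = copy.deepcopy(store[source_thread_id])
    return True


store = {"main": {"cp1": {"messages": ["hello"]}}}
created = copy_thread(store, "main", "experiment")
store["experiment"]["cp1"]["messages"].append("try something else")
```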

list_threads async

list_threads(limit: int = 100, pattern: str = '*') -> list[str]

List all thread IDs.

Requires: capabilities.list_threads = True

Parameters:

Name Type Description Default
limit int

Maximum threads to return

100
pattern str

Pattern to filter threads (backend-specific)

'*'

Returns:

Type Description
list[str]

List of thread IDs

Source code in src/locus/memory/checkpointer.py
async def list_threads(
    self,
    limit: int = 100,
    pattern: str = "*",
) -> list[str]:
    """
    List all thread IDs.

    Requires: capabilities.list_threads = True

    Args:
        limit: Maximum threads to return
        pattern: Pattern to filter threads (backend-specific)

    Returns:
        List of thread IDs
    """
    self._require_capability("list_threads")
    raise NotImplementedError("list_threads not implemented")

list_with_metadata async

list_with_metadata(limit: int = 100) -> list[dict[str, Any]]

List checkpoints with their metadata.

Requires: capabilities.list_with_metadata = True

Parameters:

Name Type Description Default
limit int

Maximum results

100

Returns:

Type Description
list[dict[str, Any]]

List of {thread_id, checkpoint_id, metadata, ...} dicts

Source code in src/locus/memory/checkpointer.py
async def list_with_metadata(
    self,
    limit: int = 100,
) -> list[dict[str, Any]]:
    """
    List checkpoints with their metadata.

    Requires: capabilities.list_with_metadata = True

    Args:
        limit: Maximum results

    Returns:
        List of {thread_id, checkpoint_id, metadata, ...} dicts
    """
    self._require_capability("list_with_metadata")
    raise NotImplementedError("list_with_metadata not implemented")

close async

close() -> None

Close the HTTP client.

Source code in src/locus/memory/backends/http.py
async def close(self) -> None:
    """Close the HTTP client."""
    if self._client is not None:
        await self._client.aclose()
        self._client = None

save async

save(state: AgentState, thread_id: str, checkpoint_id: str | None = None, metadata: dict[str, Any] | None = None) -> str

Save agent state via HTTP POST.

Parameters:

Name Type Description Default
state AgentState

Current agent state

required
thread_id str

Thread identifier

required
checkpoint_id str | None

Optional specific checkpoint ID

None
metadata dict[str, Any] | None

Optional metadata for querying/filtering checkpoints

None

Returns:

Type Description
str

Checkpoint ID for the saved state

Raises:

Type Description
HTTPError

If the request fails

Source code in src/locus/memory/backends/http.py
async def save(
    self,
    state: AgentState,
    thread_id: str,
    checkpoint_id: str | None = None,
    metadata: dict[str, Any] | None = None,
) -> str:
    """
    Save agent state via HTTP POST.

    Args:
        state: Current agent state
        thread_id: Thread identifier
        checkpoint_id: Optional specific checkpoint ID
        metadata: Optional metadata for querying/filtering checkpoints

    Returns:
        Checkpoint ID for the saved state

    Raises:
        HTTPError: If the request fails
    """
    checkpoint_id = checkpoint_id or uuid4().hex

    client = await self._get_client()

    payload = {
        "checkpoint_id": checkpoint_id,
        "thread_id": thread_id,
        "created_at": datetime.now(UTC).isoformat(),
        "state": state.to_checkpoint(),
        "metadata": metadata or {},
    }

    response = await client.post(
        f"/threads/{_encode_path_segment(thread_id)}/checkpoints",
        json=payload,
    )
    response.raise_for_status()

    # Extract checkpoint_id from response if provided
    result: dict[str, Any] = response.json()
    returned_id: str = result.get("checkpoint_id", checkpoint_id)
    return returned_id
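For anyone implementing the server side, the request body and the ID resolution can be reproduced without a network call. In this sketch `build_payload` and `resolve_checkpoint_id` are hypothetical helper names, and `state_data` stands in for the result of `state.to_checkpoint()`.

```python
import json
from datetime import datetime, timezone
from uuid import uuid4


def build_payload(state_data, thread_id, checkpoint_id=None, metadata=None):
    """Assemble the JSON body sent with the POST request."""
    return {
        "checkpoint_id": checkpoint_id or uuid4().hex,
        "thread_id": thread_id,
        "created_at": datetime.now(timezone.utc).isoformat(),
        "state": state_data,
        "metadata": metadata or {},
    }


def resolve_checkpoint_id(response_body, requested_id):
    """Prefer the ID the server reports, else keep the one that was sent."""
    return response_body.get("checkpoint_id", requested_id)


payload = build_payload({"messages": []}, "thread-1", metadata={"user": "ana"})
body = json.dumps(payload)  # the payload must be JSON-serializable
kept = resolve_checkpoint_id({}, payload["checkpoint_id"])
replaced = resolve_checkpoint_id({"checkpoint_id": "srv-7"}, payload["checkpoint_id"])
```

Because the server's ID wins when present, callers should store the value returned by `save` and not the `checkpoint_id` they passed in.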

load async

load(thread_id: str, checkpoint_id: str | None = None) -> AgentState | None

Load agent state via HTTP GET.

Parameters:

Name Type Description Default
thread_id str

Thread identifier

required
checkpoint_id str | None

Specific checkpoint ID (latest if None)

None

Returns:

Type Description
AgentState | None

Restored AgentState, or None if the checkpoint is not found or the request fails

Source code in src/locus/memory/backends/http.py
async def load(
    self,
    thread_id: str,
    checkpoint_id: str | None = None,
) -> AgentState | None:
    """
    Load agent state via HTTP GET.

    Args:
        thread_id: Thread identifier
        checkpoint_id: Specific checkpoint ID (latest if None)

    Returns:
        Restored AgentState or None if not found
    """
    from locus.core.state import AgentState

    client = await self._get_client()

    if checkpoint_id is None:
        # Get latest checkpoint
        checkpoints = await self.list_checkpoints(thread_id, limit=1)
        if not checkpoints:
            return None
        checkpoint_id = checkpoints[0]

    try:
        response = await client.get(
            f"/threads/{_encode_path_segment(thread_id)}"
            f"/checkpoints/{_encode_path_segment(checkpoint_id)}",
        )
        response.raise_for_status()
    except Exception:  # noqa: BLE001 — missing/unreachable == absent by design
        return None

    data = response.json()

    # Handle both wrapped and unwrapped state formats
    state_data = data.get("state", data)
    return AgentState.from_checkpoint(state_data)

list_checkpoints async

list_checkpoints(thread_id: str, limit: int = 10) -> list[str]

List available checkpoints via HTTP GET.

Parameters:

Name Type Description Default
thread_id str

Thread identifier

required
limit int

Maximum number to return

10

Returns:

Type Description
list[str]

List of checkpoint IDs in the order the server returns them (expected newest first); empty if the request fails

Source code in src/locus/memory/backends/http.py
async def list_checkpoints(
    self,
    thread_id: str,
    limit: int = 10,
) -> list[str]:
    """
    List available checkpoints via HTTP GET.

    Args:
        thread_id: Thread identifier
        limit: Maximum number to return

    Returns:
        List of checkpoint IDs, newest first
    """
    client = await self._get_client()

    try:
        response = await client.get(
            f"/threads/{_encode_path_segment(thread_id)}/checkpoints",
            params={"limit": limit},
        )
        response.raise_for_status()
    except Exception:  # noqa: BLE001 — unreachable == empty by design
        return []

    data = response.json()

    # Handle various response formats
    if isinstance(data, list):
        # Direct list of checkpoints
        if data and isinstance(data[0], str):
            return data[:limit]
        if data and isinstance(data[0], dict):
            return [cp["checkpoint_id"] for cp in data[:limit]]
    elif isinstance(data, dict):
        # Wrapped response
        checkpoints = data.get("checkpoints", data.get("data", []))
        if checkpoints and isinstance(checkpoints[0], str):
            truncated: list[str] = checkpoints[:limit]
            return truncated
        if checkpoints and isinstance(checkpoints[0], dict):
            return [cp["checkpoint_id"] for cp in checkpoints[:limit]]

    return []
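The response handling accepts four shapes: a bare list of IDs, a bare list of objects, and either of those wrapped in a dict under `checkpoints` or `data`. The sketch below restates that logic as a pure function so it can be tested against sample bodies. `extract_checkpoint_ids` is a hypothetical name, and unlike the listing it returns an empty list when the wrapped value is not a list.

```python
def extract_checkpoint_ids(data, limit=10):
    """Normalize the supported response shapes into a list of IDs."""
    if isinstance(data, dict):
        # Wrapped response: look under "checkpoints", then "data".
        data = data.get("checkpoints", data.get("data", []))
    if not isinstance(data, list) or not data:
        return []
    if isinstance(data[0], str):
        return data[:limit]
    if isinstance(data[0], dict):
        return [item["checkpoint_id"] for item in data[:limit]]
    return []


bare_ids = extract_checkpoint_ids(["c3", "c2", "c1"], limit=2)
bare_objects = extract_checkpoint_ids([{"checkpoint_id": "c3"}, {"checkpoint_id": "c2"}])
wrapped_ids = extract_checkpoint_ids({"checkpoints": ["c3"]})
wrapped_objects = extract_checkpoint_ids({"data": [{"checkpoint_id": "c3"}]})
```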

delete async

delete(thread_id: str, checkpoint_id: str | None = None) -> bool

Delete checkpoint(s) via HTTP DELETE.

Parameters:

Name Type Description Default
thread_id str

Thread identifier

required
checkpoint_id str | None

Specific checkpoint to delete (all if None)

None

Returns:

Type Description
bool

True if the server accepted the delete, False on any HTTP or connection error

Source code in src/locus/memory/backends/http.py
async def delete(
    self,
    thread_id: str,
    checkpoint_id: str | None = None,
) -> bool:
    """
    Delete checkpoint(s) via HTTP DELETE.

    Args:
        thread_id: Thread identifier
        checkpoint_id: Specific checkpoint to delete (all if None)

    Returns:
        True if deletion was successful
    """
    client = await self._get_client()

    try:
        if checkpoint_id is None:
            # Delete all checkpoints for thread
            response = await client.delete(
                f"/threads/{_encode_path_segment(thread_id)}/checkpoints",
            )
        else:
            response = await client.delete(
                f"/threads/{_encode_path_segment(thread_id)}"
                f"/checkpoints/{_encode_path_segment(checkpoint_id)}",
            )
        response.raise_for_status()
        return True
    except Exception:  # noqa: BLE001 — delete is idempotent; report boolean result
        return False

health_check async

health_check() -> bool

Check if the API is reachable.

Returns:

Type Description
bool

True if the API responds successfully

Source code in src/locus/memory/backends/http.py
async def health_check(self) -> bool:
    """
    Check if the API is reachable.

    Returns:
        True if the API responds successfully
    """
    client = await self._get_client()

    try:
        response = await client.get("/health")
        ok: bool = response.status_code < 400
        return ok
    except Exception:  # noqa: BLE001 — health check is a boolean probe
        return False

__aenter__ async

__aenter__() -> Self

Enter async context manager.

Source code in src/locus/memory/backends/http.py
async def __aenter__(self) -> Self:
    """Enter async context manager."""
    await self._get_client()
    return self

__aexit__ async

__aexit__(exc_type: Any, exc_val: Any, exc_tb: Any) -> None

Exit async context manager.

Source code in src/locus/memory/backends/http.py
async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
    """Exit async context manager."""
    await self.close()
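Together, `__aenter__`, `__aexit__`, and `close` give the checkpointer a lazily created client that is released when the `async with` block ends. The sketch below imitates that lifecycle with stand-ins: `FakeClient` and `LazyClientOwner` are hypothetical, and the body of `_get_client` is an assumption because the real one is not shown on this page.

```python
import asyncio


class FakeClient:
    """Stand-in for an async HTTP client that exposes aclose()."""

    def __init__(self):
        self.closed = False

    async def aclose(self):
        self.closed = True


class LazyClientOwner:
    def __init__(self):
        self._client = None

    async def _get_client(self):
        # Assumed behavior: create the client on first use, then reuse it.
        if self._client is None:
            self._client = FakeClient()
        return self._client

    async def close(self):
        if self._client is not None:
            await self._client.aclose()
            self._client = None

    async def __aenter__(self):
        await self._get_client()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.close()


async def demo():
    owner = LazyClientOwner()
    async with owner as entered:
        client = entered._client
        open_inside = client is not None and not client.closed
    return open_inside, client.closed, owner._client is None


lifecycle = asyncio.run(demo())
```

Since `close` resets the client to None, calling it twice is harmless.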

MemoryCheckpointer

MemoryCheckpointer()

Bases: BaseCheckpointer

In-memory checkpointer for testing and development.

Stores all checkpoints in a dictionary. Data is not persistent and will be lost when the process terminates.

Useful for:

- Unit and integration testing
- Development and prototyping
- Short-lived agent sessions
- As a fast caching layer

Capabilities:

- list_threads: Yes
- persistent_checkpoint_ids: Yes (within process lifetime)

Example
checkpointer = MemoryCheckpointer()

# Save state
checkpoint_id = await checkpointer.save(state, "thread-1")

# Load state
restored = await checkpointer.load("thread-1")
Source code in src/locus/memory/backends/memory.py
def __init__(self) -> None:
    # Storage: {thread_id: {checkpoint_id: (state_data, timestamp, metadata)}}
    self._storage: dict[str, dict[str, tuple[dict[str, Any], datetime, dict[str, Any]]]] = {}

capabilities property

capabilities: CheckpointerCapabilities

Memory checkpointer capabilities.

exists async

exists(thread_id: str, checkpoint_id: str | None = None) -> bool

Check if a checkpoint exists.

Parameters:

Name Type Description Default
thread_id str

Thread identifier

required
checkpoint_id str | None

Specific checkpoint to check. If None, checks if any checkpoint exists for the thread.

None

Returns:

Type Description
bool

True if the checkpoint exists

Source code in src/locus/memory/checkpointer.py
async def exists(
    self,
    thread_id: str,
    checkpoint_id: str | None = None,
) -> bool:
    """
    Check if a checkpoint exists.

    Args:
        thread_id: Thread identifier
        checkpoint_id: Specific checkpoint to check. If None,
                      checks if any checkpoint exists for the thread.

    Returns:
        True if the checkpoint exists
    """
    if checkpoint_id is None:
        checkpoints = await self.list_checkpoints(thread_id, limit=1)
        return len(checkpoints) > 0
    state = await self.load(thread_id, checkpoint_id)
    return state is not None

close async

close() -> None

Close any resources (connections, files, etc.).

Override in subclasses if cleanup is needed.

Source code in src/locus/memory/checkpointer.py
async def close(self) -> None:
    """
    Close any resources (connections, files, etc.).

    Override in subclasses if cleanup is needed.
    """

search async

search(query: str, limit: int = 10) -> list[dict[str, Any]]

Full-text search across checkpoints.

Requires: capabilities.search = True

Parameters:

Name Type Description Default
query str

Search query

required
limit int

Maximum results

10

Returns:

Type Description
list[dict[str, Any]]

List of matching checkpoints with scores

Raises:

Type Description
NotImplementedError

If backend doesn't support search

Source code in src/locus/memory/checkpointer.py
async def search(
    self,
    query: str,
    limit: int = 10,
) -> list[dict[str, Any]]:
    """
    Full-text search across checkpoints.

    Requires: capabilities.search = True

    Args:
        query: Search query
        limit: Maximum results

    Returns:
        List of matching checkpoints with scores

    Raises:
        NotImplementedError: If backend doesn't support search
    """
    self._require_capability("search")
    raise NotImplementedError("search not implemented")

query_by_metadata async

query_by_metadata(key: str, value: Any, limit: int = 100) -> list[dict[str, Any]]

Query checkpoints by metadata field.

Requires: capabilities.metadata_query = True

Parameters:

Name Type Description Default
key str

Metadata field name

required
value Any

Value to match

required
limit int

Maximum results

100

Returns:

Type Description
list[dict[str, Any]]

List of matching checkpoints

Source code in src/locus/memory/checkpointer.py
async def query_by_metadata(
    self,
    key: str,
    value: Any,
    limit: int = 100,
) -> list[dict[str, Any]]:
    """
    Query checkpoints by metadata field.

    Requires: capabilities.metadata_query = True

    Args:
        key: Metadata field name
        value: Value to match
        limit: Maximum results

    Returns:
        List of matching checkpoints
    """
    self._require_capability("metadata_query")
    raise NotImplementedError("query_by_metadata not implemented")

get_metadata async

get_metadata(thread_id: str, checkpoint_id: str | None = None) -> dict[str, Any] | None

Get checkpoint metadata.

Requires: capabilities.metadata_query = True

Parameters:

Name Type Description Default
thread_id str

Thread identifier

required
checkpoint_id str | None

Specific checkpoint (latest if None)

None

Returns:

Type Description
dict[str, Any] | None

Metadata dict or None if not found

Source code in src/locus/memory/checkpointer.py
async def get_metadata(
    self,
    thread_id: str,
    checkpoint_id: str | None = None,
) -> dict[str, Any] | None:
    """
    Get checkpoint metadata.

    Requires: capabilities.metadata_query = True

    Args:
        thread_id: Thread identifier
        checkpoint_id: Specific checkpoint (latest if None)

    Returns:
        Metadata dict or None if not found
    """
    self._require_capability("metadata_query")
    raise NotImplementedError("get_metadata not implemented")

vacuum async

vacuum(older_than_days: int = 30) -> int

Delete old checkpoints.

Requires: capabilities.vacuum = True

Parameters:

Name Type Description Default
older_than_days int

Delete checkpoints older than this

30

Returns:

Type Description
int

Number of deleted checkpoints

Source code in src/locus/memory/checkpointer.py
async def vacuum(
    self,
    older_than_days: int = 30,
) -> int:
    """
    Delete old checkpoints.

    Requires: capabilities.vacuum = True

    Args:
        older_than_days: Delete checkpoints older than this

    Returns:
        Number of deleted checkpoints
    """
    self._require_capability("vacuum")
    raise NotImplementedError("vacuum not implemented")

copy_thread async

copy_thread(source_thread_id: str, dest_thread_id: str) -> bool

Copy a thread to create a branch.

Requires: capabilities.branching = True

Parameters:

Name Type Description Default
source_thread_id str

Source thread to copy from

required
dest_thread_id str

Destination thread ID

required

Returns:

Type Description
bool

True if successful

Source code in src/locus/memory/checkpointer.py
async def copy_thread(
    self,
    source_thread_id: str,
    dest_thread_id: str,
) -> bool:
    """
    Copy a thread to create a branch.

    Requires: capabilities.branching = True

    Args:
        source_thread_id: Source thread to copy from
        dest_thread_id: Destination thread ID

    Returns:
        True if successful
    """
    self._require_capability("branching")
    raise NotImplementedError("copy_thread not implemented")

list_with_metadata async

list_with_metadata(limit: int = 100) -> list[dict[str, Any]]

List checkpoints with their metadata.

Requires: capabilities.list_with_metadata = True

Parameters:

Name Type Description Default
limit int

Maximum results

100

Returns:

Type Description
list[dict[str, Any]]

List of {thread_id, checkpoint_id, metadata, ...} dicts

Source code in src/locus/memory/checkpointer.py
async def list_with_metadata(
    self,
    limit: int = 100,
) -> list[dict[str, Any]]:
    """
    List checkpoints with their metadata.

    Requires: capabilities.list_with_metadata = True

    Args:
        limit: Maximum results

    Returns:
        List of {thread_id, checkpoint_id, metadata, ...} dicts
    """
    self._require_capability("list_with_metadata")
    raise NotImplementedError("list_with_metadata not implemented")

save async

save(state: AgentState, thread_id: str, checkpoint_id: str | None = None, metadata: dict[str, Any] | None = None) -> str

Save agent state to memory.

Parameters:

Name Type Description Default
state AgentState

Current agent state

required
thread_id str

Thread identifier

required
checkpoint_id str | None

Optional specific checkpoint ID

None
metadata dict[str, Any] | None

Optional metadata for the checkpoint

None

Returns:

Type Description
str

Checkpoint ID for the saved state

Source code in src/locus/memory/backends/memory.py
async def save(
    self,
    state: AgentState,
    thread_id: str,
    checkpoint_id: str | None = None,
    metadata: dict[str, Any] | None = None,
) -> str:
    """
    Save agent state to memory.

    Args:
        state: Current agent state
        thread_id: Thread identifier
        checkpoint_id: Optional specific checkpoint ID
        metadata: Optional metadata for the checkpoint

    Returns:
        Checkpoint ID for the saved state
    """
    checkpoint_id = checkpoint_id or uuid4().hex

    if thread_id not in self._storage:
        self._storage[thread_id] = {}

    self._storage[thread_id][checkpoint_id] = (
        state.to_checkpoint(),
        datetime.now(UTC),
        metadata or {},
    )

    return checkpoint_id

load async

load(thread_id: str, checkpoint_id: str | None = None) -> AgentState | None

Load agent state from memory.

Parameters:

Name Type Description Default
thread_id str

Thread identifier

required
checkpoint_id str | None

Specific checkpoint ID (latest if None)

None

Returns:

Type Description
AgentState | None

Restored AgentState or None if not found

Source code in src/locus/memory/backends/memory.py
async def load(
    self,
    thread_id: str,
    checkpoint_id: str | None = None,
) -> AgentState | None:
    """
    Load agent state from memory.

    Args:
        thread_id: Thread identifier
        checkpoint_id: Specific checkpoint ID (latest if None)

    Returns:
        Restored AgentState or None if not found
    """
    from locus.core.state import AgentState

    if thread_id not in self._storage:
        return None

    thread_data = self._storage[thread_id]

    if not thread_data:
        return None

    if checkpoint_id is None:
        # Get latest checkpoint by timestamp
        latest_id = max(
            thread_data.keys(),
            key=lambda k: thread_data[k][1],
        )
        checkpoint_id = latest_id

    if checkpoint_id not in thread_data:
        return None

    state_data, _, _ = thread_data[checkpoint_id]
    return AgentState.from_checkpoint(state_data)
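The storage layout and the "latest by timestamp" rule can be exercised with plain dictionaries. This is a reduced sketch and not the class itself: the module-level `storage`, `save`, and `load` are hypothetical stand-ins, state is a plain dict where the library uses an `AgentState`, and timestamps are passed in explicitly so the result is deterministic.

```python
from datetime import datetime, timezone

# Same shape as the backend: {thread_id: {checkpoint_id: (state, timestamp, metadata)}}
storage = {}


def save(thread_id, checkpoint_id, state_data, timestamp, metadata=None):
    storage.setdefault(thread_id, {})[checkpoint_id] = (state_data, timestamp, metadata or {})
    return checkpoint_id


def load(thread_id, checkpoint_id=None):
    thread_data = storage.get(thread_id)
    if not thread_data:
        return None
    if checkpoint_id is None:
        # Latest means the entry with the greatest timestamp.
        checkpoint_id = max(thread_data, key=lambda k: thread_data[k][1])
    if checkpoint_id not in thread_data:
        return None
    state_data, _, _ = thread_data[checkpoint_id]
    return state_data


utc = timezone.utc
save("thread-1", "cp1", {"step": 1}, datetime(2024, 1, 1, tzinfo=utc))
save("thread-1", "cp2", {"step": 2}, datetime(2024, 1, 2, tzinfo=utc))
latest = load("thread-1")
first = load("thread-1", "cp1")
missing = load("thread-1", "nope")
```

Saving again under an existing `checkpoint_id` replaces the entry and refreshes its timestamp, so an overwritten checkpoint becomes the latest one.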

list_checkpoints async

list_checkpoints(thread_id: str, limit: int = 10) -> list[str]

List available checkpoints for a thread.

Parameters:

Name Type Description Default
thread_id str

Thread identifier

required
limit int

Maximum number to return

10

Returns:

Type Description
list[str]

List of checkpoint IDs, newest first

Source code in src/locus/memory/backends/memory.py
async def list_checkpoints(
    self,
    thread_id: str,
    limit: int = 10,
) -> list[str]:
    """
    List available checkpoints for a thread.

    Args:
        thread_id: Thread identifier
        limit: Maximum number to return

    Returns:
        List of checkpoint IDs, newest first
    """
    if thread_id not in self._storage:
        return []

    thread_data = self._storage[thread_id]

    # Sort by timestamp descending
    sorted_ids = sorted(
        thread_data.keys(),
        key=lambda k: thread_data[k][1],
        reverse=True,
    )

    return sorted_ids[:limit]

delete async

delete(thread_id: str, checkpoint_id: str | None = None) -> bool

Delete checkpoint(s) from memory.

Parameters:

Name Type Description Default
thread_id str

Thread identifier

required
checkpoint_id str | None

Specific checkpoint to delete (all if None)

None

Returns:

Type Description
bool

True if the thread or checkpoint was found and deleted, False if it does not exist

Source code in src/locus/memory/backends/memory.py
async def delete(
    self,
    thread_id: str,
    checkpoint_id: str | None = None,
) -> bool:
    """
    Delete checkpoint(s) from memory.

    Args:
        thread_id: Thread identifier
        checkpoint_id: Specific checkpoint to delete (all if None)

    Returns:
        True if deletion was successful
    """
    if thread_id not in self._storage:
        return False

    if checkpoint_id is None:
        # Delete all checkpoints for thread
        del self._storage[thread_id]
        return True
    if checkpoint_id in self._storage[thread_id]:
        del self._storage[thread_id][checkpoint_id]
        return True
    return False

clear

clear() -> None

Clear all stored checkpoints.

Source code in src/locus/memory/backends/memory.py
def clear(self) -> None:
    """Clear all stored checkpoints."""
    self._storage.clear()

get_thread_ids

get_thread_ids() -> list[str]

Get list of all thread IDs with checkpoints.

Source code in src/locus/memory/backends/memory.py
def get_thread_ids(self) -> list[str]:
    """Get list of all thread IDs with checkpoints."""
    return list(self._storage.keys())

list_threads async

list_threads(limit: int = 100, pattern: str = '*') -> list[str]

List all thread IDs.

Parameters:

Name Type Description Default
limit int

Maximum threads to return

100
pattern str

Pattern to filter (supports * as wildcard)

'*'

Returns:

Type Description
list[str]

List of thread IDs

Source code in src/locus/memory/backends/memory.py
async def list_threads(
    self,
    limit: int = 100,
    pattern: str = "*",
) -> list[str]:
    """
    List all thread IDs.

    Args:
        limit: Maximum threads to return
        pattern: Pattern to filter (supports * as wildcard)

    Returns:
        List of thread IDs
    """
    import fnmatch

    threads = list(self._storage.keys())

    if pattern != "*":
        threads = [t for t in threads if fnmatch.fnmatch(t, pattern)]

    return threads[:limit]
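The `pattern` argument uses shell-style matching from the standard library's `fnmatch` module, so `?` and `[seq]` work in addition to `*`. A minimal sketch of the filter-then-truncate order used above, with made-up thread IDs and a hypothetical `filter_threads` helper:

```python
import fnmatch

thread_ids = ["user-1", "user-2", "batch-2024", "user-10"]


def filter_threads(ids: list[str], pattern: str = "*", limit: int = 100) -> list[str]:
    """Filter by pattern first, then truncate to the limit."""
    if pattern != "*":
        ids = [t for t in ids if fnmatch.fnmatch(t, pattern)]
    return ids[:limit]


print(filter_threads(thread_ids, "user-*"))      # ['user-1', 'user-2', 'user-10']
print(filter_threads(thread_ids, "user-?"))      # ['user-1', 'user-2']
print(filter_threads(thread_ids, "*", limit=1))  # ['user-1']
```

Note that `fnmatch.fnmatch` normalizes case on case-insensitive platforms; `fnmatch.fnmatchcase` is the standard-library alternative when matching must be case-sensitive everywhere.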

get_checkpoint_count

get_checkpoint_count(thread_id: str | None = None) -> int

Get count of stored checkpoints.

Parameters:

Name Type Description Default
thread_id str | None

Specific thread (all threads if None)

None

Returns:

Type Description
int

Number of checkpoints

Source code in src/locus/memory/backends/memory.py
def get_checkpoint_count(self, thread_id: str | None = None) -> int:
    """
    Get count of stored checkpoints.

    Args:
        thread_id: Specific thread (all threads if None)

    Returns:
        Number of checkpoints
    """
    if thread_id is not None:
        return len(self._storage.get(thread_id, {}))
    return sum(len(t) for t in self._storage.values())

Adapters

StorageBackendAdapter wraps any of the simple key-value backends above into the full BaseCheckpointer interface. The convenience factory functions below are one-line shortcuts.

StorageBackendAdapter

StorageBackendAdapter(backend: Any)

Bases: BaseCheckpointer

Adapter that wraps simple storage backends to implement BaseCheckpointer.

Storage backends have a simple interface:
  • save(thread_id: str, data: dict) -> None
  • load(thread_id: str) -> dict | None
  • delete(thread_id: str) -> bool
  • exists(thread_id: str) -> bool
  • list_threads() -> list[str]

This adapter converts between AgentState and dict representations.
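To make the expected backend shape concrete, here is a minimal sketch of a dict-backed store exposing those five methods. `DictBackend` is a hypothetical name used only for illustration; the methods are declared `async` because the adapter awaits each backend call.

```python
import asyncio
from typing import Any, Optional


class DictBackend:
    """Hypothetical in-process backend with the five-method interface."""

    def __init__(self) -> None:
        self._data: dict = {}

    async def save(self, thread_id: str, data: dict[str, Any]) -> None:
        self._data[thread_id] = dict(data)  # store a shallow copy

    async def load(self, thread_id: str) -> Optional[dict[str, Any]]:
        found = self._data.get(thread_id)
        # Return a copy so callers can mutate the result safely.
        return dict(found) if found is not None else None

    async def delete(self, thread_id: str) -> bool:
        return self._data.pop(thread_id, None) is not None

    async def exists(self, thread_id: str) -> bool:
        return thread_id in self._data

    async def list_threads(self) -> list[str]:
        return list(self._data)


async def demo() -> list:
    backend = DictBackend()
    await backend.save("t1", {"messages": ["hi"]})
    return [
        await backend.exists("t1"),
        await backend.load("t1"),
        await backend.delete("t1"),
        await backend.load("t1"),
    ]


print(asyncio.run(demo()))
```

Returning a copy from `load` matters for an in-memory store: the adapter's `load` pops its bookkeeping keys from the returned dict, which would otherwise modify the stored record.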

Checkpoint IDs are stored in the backend rather than in process memory, so the list of checkpoints for a thread persists across restarts.

Storage schema:
  • {thread_id}:{checkpoint_id} -> checkpoint data
  • {thread_id}:latest -> latest checkpoint (for quick access)
  • {thread_id}:_checkpoints -> list of checkpoint metadata
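The key layout can be sketched with three small helpers. The function names are hypothetical; the key formats are the ones listed in the schema.

```python
def checkpoint_key(thread_id: str, checkpoint_id: str) -> str:
    """Key holding one checkpoint's data."""
    return f"{thread_id}:{checkpoint_id}"


def latest_key(thread_id: str) -> str:
    """Key holding a copy of the most recently saved checkpoint."""
    return f"{thread_id}:latest"


def index_key(thread_id: str) -> str:
    """Key holding the list of checkpoint metadata for the thread."""
    return f"{thread_id}:_checkpoints"


def keys_for_thread(thread_id: str, checkpoint_ids: list[str]) -> list[str]:
    """All backend keys a thread occupies once the given checkpoints are saved."""
    keys = [checkpoint_key(thread_id, cp) for cp in checkpoint_ids]
    return keys + [latest_key(thread_id), index_key(thread_id)]


print(keys_for_thread("chat-42", ["a1", "b2"]))
# ['chat-42:a1', 'chat-42:b2', 'chat-42:latest', 'chat-42:_checkpoints']

# A caller-supplied checkpoint ID equal to a reserved suffix collides with it.
print(checkpoint_key("chat-42", "latest") == latest_key("chat-42"))  # True
```

One consequence of this layout is that an explicit `checkpoint_id` of `latest` or `_checkpoints` would share a key with the pointer or the index, so those two values are best avoided when passing IDs by hand.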

Example

from locus.memory.backends import RedisBackend
from locus.memory.backends.adapters import StorageBackendAdapter

# Create storage backend
storage = RedisBackend(url="redis://localhost:6379")

# Wrap with adapter for use with Agent
checkpointer = StorageBackendAdapter(storage)

# Use with Agent
agent = Agent(model=model, checkpointer=checkpointer)

Initialize adapter with a storage backend.

Parameters:

Name Type Description Default
backend Any

Storage backend with save/load/delete/exists methods

required

Source code in src/locus/memory/backends/adapters.py
def __init__(self, backend: Any) -> None:
    """
    Initialize adapter with a storage backend.

    Args:
        backend: Storage backend with save/load/delete/exists methods
    """
    self._backend = backend
    self._capabilities_cache: CheckpointerCapabilities | None = None

capabilities property

capabilities: CheckpointerCapabilities

Derive capabilities from backend methods.
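The source for this property is not shown on this page. As a rough illustration only, one plausible way to derive feature flags from a backend is to probe for callable methods. `Capabilities` below is a hypothetical stand-in for `CheckpointerCapabilities`, and the field set is an assumption based on the capability names used elsewhere on this page.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Capabilities:
    """Hypothetical stand-in for CheckpointerCapabilities."""

    search: bool = False
    metadata_query: bool = False
    vacuum: bool = False
    list_threads: bool = False


def derive_capabilities(backend: object) -> Capabilities:
    """Set each flag according to which methods the backend exposes."""

    def has(name: str) -> bool:
        return callable(getattr(backend, name, None))

    return Capabilities(
        search=has("search"),
        # Either method name is accepted by query_by_metadata on this page.
        metadata_query=has("query_by_metadata") or has("get_by_metadata"),
        vacuum=has("vacuum"),
        list_threads=has("list_threads"),
    )


class TinyBackend:
    """Illustrative backend that supports search and metadata lookup only."""

    async def search(self, query: str, limit: int = 10) -> list:
        return []

    async def get_by_metadata(self, key: str, value: object, limit: int = 100) -> list:
        return []


print(derive_capabilities(TinyBackend()))
```

Callers would then branch on the flags, for example checking `capabilities.search` before calling `search`, as recommended for `BaseCheckpointer`.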

save async

save(state: AgentState, thread_id: str, checkpoint_id: str | None = None, metadata: dict[str, Any] | None = None) -> str

Save agent state with persistent checkpoint ID tracking.

Source code in src/locus/memory/backends/adapters.py
async def save(
    self,
    state: AgentState,
    thread_id: str,
    checkpoint_id: str | None = None,
    metadata: dict[str, Any] | None = None,
) -> str:
    """Save agent state with persistent checkpoint ID tracking."""
    checkpoint_id = checkpoint_id or uuid4().hex
    now = datetime.now(UTC)

    # Create storage key
    storage_key = f"{thread_id}:{checkpoint_id}"

    # Convert state to dict and save
    data = state.to_checkpoint()
    data["_checkpoint_id"] = checkpoint_id
    data["_checkpoint_timestamp"] = now.isoformat()
    data["_metadata"] = metadata or {}

    # Save the checkpoint (some backends support metadata parameter)
    save_method = self._backend.save
    import inspect

    sig = inspect.signature(save_method)
    if "metadata" in sig.parameters:
        await save_method(storage_key, data, metadata=metadata)
    else:
        await save_method(storage_key, data)

    # Also save as "latest" for easy retrieval
    await self._backend.save(f"{thread_id}:latest", data)

    # Update checkpoint index (persistent checkpoint ID list)
    await self._update_checkpoint_index(thread_id, checkpoint_id, now, metadata)

    return checkpoint_id
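The signature check in `save` can be exercised on its own. The sketch below uses two hypothetical backends, one whose `save` accepts `metadata` and one whose `save` does not, and a hypothetical `save_checkpoint` helper that mirrors the branch above.

```python
import asyncio
import inspect
from typing import Any, Optional


class PlainBackend:
    """Hypothetical backend whose save() takes only key and data."""

    def __init__(self) -> None:
        self.calls: list = []

    async def save(self, key: str, data: dict[str, Any]) -> None:
        self.calls.append((key, None))


class MetadataBackend(PlainBackend):
    """Hypothetical backend whose save() also accepts metadata."""

    async def save(
        self, key: str, data: dict[str, Any], metadata: Optional[dict[str, Any]] = None
    ) -> None:
        self.calls.append((key, metadata))


async def save_checkpoint(backend: Any, key: str, data: dict, metadata: Optional[dict]) -> None:
    # Pass metadata only when the backend's save() declares that parameter.
    params = inspect.signature(backend.save).parameters
    if "metadata" in params:
        await backend.save(key, data, metadata=metadata)
    else:
        await backend.save(key, data)


async def demo() -> tuple:
    plain, rich = PlainBackend(), MetadataBackend()
    for backend in (plain, rich):
        await save_checkpoint(backend, "t1:abc", {"step": 1}, {"user": "ada"})
    return plain.calls, rich.calls


print(asyncio.run(demo()))
```

A backend that accepts metadata only through `**kwargs` would not be detected by this check, because the parameter name seen by `inspect.signature` is then `kwargs`, not `metadata`.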

load async

load(thread_id: str, checkpoint_id: str | None = None) -> AgentState | None

Load agent state from the storage backend.

Source code in src/locus/memory/backends/adapters.py
async def load(
    self,
    thread_id: str,
    checkpoint_id: str | None = None,
) -> AgentState | None:
    """Load agent state from the storage backend."""
    from locus.core.state import AgentState

    # Determine storage key
    if checkpoint_id:
        storage_key = f"{thread_id}:{checkpoint_id}"
    else:
        storage_key = f"{thread_id}:latest"

    # Load data
    data = await self._backend.load(storage_key)
    if data is None:
        return None

    # Remove adapter metadata before restoring
    data.pop("_checkpoint_id", None)
    data.pop("_checkpoint_timestamp", None)
    data.pop("_metadata", None)

    return AgentState.from_checkpoint(data)

list_checkpoints async

list_checkpoints(thread_id: str, limit: int = 10) -> list[str]

List available checkpoints from persistent index.

Source code in src/locus/memory/backends/adapters.py
async def list_checkpoints(
    self,
    thread_id: str,
    limit: int = 10,
) -> list[str]:
    """List available checkpoints from persistent index."""
    index_key = f"{thread_id}:_checkpoints"

    existing = await self._backend.load(index_key)
    if existing is None:
        return []

    checkpoints = existing.get("checkpoints", [])
    return [cp.get("checkpoint_id") for cp in checkpoints[:limit] if cp.get("checkpoint_id")]

delete async

delete(thread_id: str, checkpoint_id: str | None = None) -> bool

Delete checkpoint(s) with index update.

Source code in src/locus/memory/backends/adapters.py
async def delete(
    self,
    thread_id: str,
    checkpoint_id: str | None = None,
) -> bool:
    """Delete checkpoint(s) with index update."""
    if checkpoint_id:
        # Delete specific checkpoint
        storage_key = f"{thread_id}:{checkpoint_id}"
        result: bool = await self._backend.delete(storage_key)

        # Update index
        await self._remove_from_index(thread_id, checkpoint_id)

        return result
    else:
        # Delete all checkpoints for thread
        deleted = False

        # Get all checkpoint IDs from index
        checkpoints = await self.list_checkpoints(thread_id, limit=1000)

        # Delete each checkpoint
        for cp_id in checkpoints:
            if await self._backend.delete(f"{thread_id}:{cp_id}"):
                deleted = True

        # Delete latest pointer
        if await self._backend.exists(f"{thread_id}:latest"):
            await self._backend.delete(f"{thread_id}:latest")
            deleted = True

        # Delete index
        if await self._backend.exists(f"{thread_id}:_checkpoints"):
            await self._backend.delete(f"{thread_id}:_checkpoints")
            deleted = True

        return deleted

exists async

exists(thread_id: str, checkpoint_id: str | None = None) -> bool

Check if checkpoint exists.

Source code in src/locus/memory/backends/adapters.py
async def exists(
    self,
    thread_id: str,
    checkpoint_id: str | None = None,
) -> bool:
    """Check if checkpoint exists."""
    if checkpoint_id:
        storage_key = f"{thread_id}:{checkpoint_id}"
    else:
        storage_key = f"{thread_id}:latest"

    present: bool = await self._backend.exists(storage_key)
    return present

search async

search(query: str, limit: int = 10) -> list[dict[str, Any]]

Delegate to backend search.

Source code in src/locus/memory/backends/adapters.py
async def search(
    self,
    query: str,
    limit: int = 10,
) -> list[dict[str, Any]]:
    """Delegate to backend search."""
    self._require_capability("search")
    results: list[dict[str, Any]] = await self._backend.search(query, limit=limit)
    return results

query_by_metadata async

query_by_metadata(key: str, value: Any, limit: int = 100) -> list[dict[str, Any]]

Delegate to backend metadata query.

Source code in src/locus/memory/backends/adapters.py
async def query_by_metadata(
    self,
    key: str,
    value: Any,
    limit: int = 100,
) -> list[dict[str, Any]]:
    """Delegate to backend metadata query."""
    self._require_capability("metadata_query")
    if hasattr(self._backend, "query_by_metadata"):
        results: list[dict[str, Any]] = await self._backend.query_by_metadata(
            key, value, limit=limit
        )
        return results
    if hasattr(self._backend, "get_by_metadata"):
        via_get: list[dict[str, Any]] = await self._backend.get_by_metadata(
            key, value, limit=limit
        )
        return via_get
    raise NotImplementedError("Backend has no metadata query method")

get_metadata async

get_metadata(thread_id: str, checkpoint_id: str | None = None) -> dict[str, Any] | None

Get checkpoint metadata from index or backend.

Source code in src/locus/memory/backends/adapters.py
async def get_metadata(
    self,
    thread_id: str,
    checkpoint_id: str | None = None,
) -> dict[str, Any] | None:
    """Get checkpoint metadata from index or backend."""
    # First try the backend's native method
    if hasattr(self._backend, "get_metadata"):
        storage_key = f"{thread_id}:{checkpoint_id}" if checkpoint_id else f"{thread_id}:latest"
        meta: dict[str, Any] | None = await self._backend.get_metadata(storage_key)
        return meta

    # Fallback to checkpoint index
    index_key = f"{thread_id}:_checkpoints"
    existing = await self._backend.load(index_key)
    if existing is None:
        return None

    checkpoints = existing.get("checkpoints", [])

    if checkpoint_id:
        for cp in checkpoints:
            if cp.get("checkpoint_id") == checkpoint_id:
                matched: dict[str, Any] = cp
                return matched
        return None
    # Return latest
    latest: dict[str, Any] | None = checkpoints[0] if checkpoints else None
    return latest
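The index fallback can be illustrated with a plain dict. The sketch assumes the index document holds a `checkpoints` list ordered newest first, which is what treating entry 0 as the latest implies. The `metadata` field follows the `meta.get("metadata")` call in `copy_thread`; the `timestamp` field name is a guess.

```python
from typing import Any, Optional

# Assumed index document, newest entry first.
index = {
    "checkpoints": [
        {"checkpoint_id": "c3", "timestamp": "2024-01-03T00:00:00+00:00", "metadata": {"step": 3}},
        {"checkpoint_id": "c2", "timestamp": "2024-01-02T00:00:00+00:00", "metadata": {"step": 2}},
        {"checkpoint_id": "c1", "timestamp": "2024-01-01T00:00:00+00:00", "metadata": {}},
    ]
}


def lookup(
    index_doc: Optional[dict[str, Any]],
    checkpoint_id: Optional[str] = None,
) -> Optional[dict[str, Any]]:
    """Return the index entry for one checkpoint, or the first entry if no ID is given."""
    if index_doc is None:
        return None
    entries = index_doc.get("checkpoints", [])
    if checkpoint_id:
        return next((e for e in entries if e.get("checkpoint_id") == checkpoint_id), None)
    return entries[0] if entries else None


print(lookup(index, "c2")["metadata"])  # {'step': 2}
print(lookup(index)["checkpoint_id"])   # c3
print(lookup(index, "missing"))         # None
```

Note that the value returned is the whole index entry, so user-supplied metadata sits one level down under the `metadata` key.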

vacuum async

vacuum(older_than_days: int = 30) -> int

Delegate to backend vacuum.

Source code in src/locus/memory/backends/adapters.py
async def vacuum(
    self,
    older_than_days: int = 30,
) -> int:
    """Delegate to backend vacuum."""
    self._require_capability("vacuum")
    deleted: int = await self._backend.vacuum(older_than_days)
    return deleted

copy_thread async

copy_thread(source_thread_id: str, dest_thread_id: str) -> bool

Copy all checkpoints from one thread to another (branching).

Source code in src/locus/memory/backends/adapters.py
async def copy_thread(
    self,
    source_thread_id: str,
    dest_thread_id: str,
) -> bool:
    """Copy all checkpoints from one thread to another (branching)."""
    self._require_capability("branching")

    # Always use manual implementation since adapter uses different key structure
    # ({thread_id}:{checkpoint_id}) than backends expect
    checkpoints = await self.list_checkpoints(source_thread_id, limit=1000)
    if not checkpoints:
        return False

    for cp_id in checkpoints:
        state = await self.load(source_thread_id, cp_id)
        if state:
            meta = await self.get_metadata(source_thread_id, cp_id)
            await self.save(
                state, dest_thread_id, cp_id, metadata=meta.get("metadata") if meta else None
            )
    return True

list_threads async

list_threads(limit: int = 100, pattern: str = '*') -> list[str]

Delegate to backend list_threads.

Source code in src/locus/memory/backends/adapters.py
async def list_threads(
    self,
    limit: int = 100,
    pattern: str = "*",
) -> list[str]:
    """Delegate to backend list_threads."""
    self._require_capability("list_threads")

    # Backend might have different signature
    if hasattr(self._backend, "list_threads"):
        import inspect

        sig = inspect.signature(self._backend.list_threads)
        if "pattern" in sig.parameters:
            with_pattern: list[str] = await self._backend.list_threads(
                pattern=pattern, limit=limit
            )
            return with_pattern
        if "limit" in sig.parameters:
            threads: list[str] = await self._backend.list_threads(limit=limit)
        else:
            threads = await self._backend.list_threads()

        # Apply pattern filter if backend doesn't support it
        if pattern != "*":
            import fnmatch

            threads = [t for t in threads if fnmatch.fnmatch(t, pattern)]

        return threads[:limit]

    raise NotImplementedError("Backend has no list_threads method")

list_with_metadata async

list_with_metadata(limit: int = 100) -> list[dict[str, Any]]

Delegate to backend list_with_metadata.

Source code in src/locus/memory/backends/adapters.py
async def list_with_metadata(
    self,
    limit: int = 100,
) -> list[dict[str, Any]]:
    """Delegate to backend list_with_metadata."""
    self._require_capability("list_with_metadata")
    items: list[dict[str, Any]] = await self._backend.list_with_metadata(limit=limit)
    return items

close async

close() -> None

Close the underlying backend if it supports it.

Source code in src/locus/memory/backends/adapters.py
async def close(self) -> None:
    """Close the underlying backend if it supports it."""
    if hasattr(self._backend, "close"):
        await self._backend.close()

oracle_checkpointer

oracle_checkpointer(dsn: str | None = None, user: str = 'admin', password: str = '', **kwargs: Any) -> StorageBackendAdapter

Create an Oracle Database-backed checkpointer.

Parameters:

Name Type Description Default
dsn str | None

Oracle connection string (e.g., "host:port/service_name")

None
user str

Database user

'admin'
password str

Database password

''
**kwargs Any

Additional OracleBackend options (table_name, wallet_location)

{}

Returns:

Type Description
StorageBackendAdapter

StorageBackendAdapter wrapping OracleBackend

Capabilities
  • search: Yes (via JSON search)
  • metadata_query: Yes
  • vacuum: Yes
  • list_threads: Yes
  • persistent_checkpoint_ids: Yes
Example

checkpointer = oracle_checkpointer(
    dsn="mydb_high",
    user="admin",
    password="secret",
)
agent = Agent(model=model, checkpointer=checkpointer)

Source code in src/locus/memory/backends/adapters.py
def oracle_checkpointer(
    dsn: str | None = None,
    user: str = "admin",
    password: str = "",
    **kwargs: Any,
) -> StorageBackendAdapter:
    """
    Create an Oracle Database-backed checkpointer.

    Args:
        dsn: Oracle connection string (e.g., "host:port/service_name")
        user: Database user
        password: Database password
        **kwargs: Additional OracleBackend options (table_name, wallet_location)

    Returns:
        StorageBackendAdapter wrapping OracleBackend

    Capabilities:
        - search: Yes (via JSON search)
        - metadata_query: Yes
        - vacuum: Yes
        - list_threads: Yes
        - persistent_checkpoint_ids: Yes

    Example:
        >>> checkpointer = oracle_checkpointer(
        ...     dsn="mydb_high",
        ...     user="admin",
        ...     password="secret",
        ... )
        >>> agent = Agent(model=model, checkpointer=checkpointer)
    """
    from locus.memory.backends.oracle import OracleBackend

    backend = OracleBackend(
        dsn=dsn,
        user=user,
        password=password,
        **kwargs,
    )
    return StorageBackendAdapter(backend)
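The docstring example hard-codes the password for brevity. In practice the connection settings are often read from the environment instead. The sketch below shows one way to do that; the environment variable names and the `oracle_settings_from_env` helper are assumptions made for illustration, and the final call is left commented out because it needs locus installed and a reachable database.

```python
import os
from collections.abc import Mapping
from typing import Optional


def oracle_settings_from_env(env: Optional[Mapping[str, str]] = None) -> dict[str, str]:
    """Collect dsn, user, and password from (hypothetical) environment variables."""
    env = os.environ if env is None else env
    password = env.get("LOCUS_ORACLE_PASSWORD", "")
    if not password:
        # Fail early rather than passing the factory's empty-string default.
        raise RuntimeError("LOCUS_ORACLE_PASSWORD is not set")
    return {
        "dsn": env.get("LOCUS_ORACLE_DSN", "mydb_high"),
        "user": env.get("LOCUS_ORACLE_USER", "admin"),
        "password": password,
    }


# A plain dict stands in for os.environ here so the example is repeatable.
settings = oracle_settings_from_env({"LOCUS_ORACLE_PASSWORD": "example-only"})
print(sorted(settings))  # ['dsn', 'password', 'user']

# checkpointer = oracle_checkpointer(**settings)
```

The returned keys match the factory's keyword parameters, so the dict can be unpacked directly into `oracle_checkpointer`.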

oci_bucket_checkpointer

oci_bucket_checkpointer(bucket_name: str, namespace: str, prefix: str = 'locus/checkpoints/', **kwargs: Any) -> BaseCheckpointer

Create an OCI Object Storage-backed checkpointer.

OCIBucketBackend is a native BaseCheckpointer — this factory is kept as a thin convenience alias for parity with the other backend factories. You can just as well instantiate the class directly.

Parameters:

Name Type Description Default
bucket_name str

OCI bucket name

required
namespace str

OCI namespace

required
prefix str

Object prefix

'locus/checkpoints/'
**kwargs Any

Additional OCIBucketBackend options (profile_name, auth_type, region)

{}
Example

checkpointer = oci_bucket_checkpointer(
    bucket_name="my-checkpoints",
    namespace="my-namespace",
)
agent = Agent(config=cfg, checkpointer=checkpointer)

Source code in src/locus/memory/backends/adapters.py
def oci_bucket_checkpointer(
    bucket_name: str,
    namespace: str,
    prefix: str = "locus/checkpoints/",
    **kwargs: Any,
) -> BaseCheckpointer:
    """
    Create an OCI Object Storage-backed checkpointer.

    ``OCIBucketBackend`` is a native ``BaseCheckpointer`` — this factory is
    kept as a thin convenience alias for parity with the other backend
    factories. You can just as well instantiate the class directly.

    Args:
        bucket_name: OCI bucket name
        namespace: OCI namespace
        prefix: Object prefix
        **kwargs: Additional OCIBucketBackend options (profile_name, auth_type, region)

    Example:
        >>> checkpointer = oci_bucket_checkpointer(
        ...     bucket_name="my-checkpoints",
        ...     namespace="my-namespace",
        ... )
        >>> agent = Agent(config=cfg, checkpointer=checkpointer)
    """
    from locus.memory.backends.oci_bucket import OCIBucketBackend

    return OCIBucketBackend(
        bucket_name=bucket_name,
        namespace=namespace,
        prefix=prefix,
        **kwargs,
    )

opensearch_checkpointer

opensearch_checkpointer(hosts: list[str] | None = None, index_name: str = 'locus-checkpoints', **kwargs: Any) -> StorageBackendAdapter

Create an OpenSearch-backed checkpointer.

Parameters:

Name Type Description Default
hosts list[str] | None

OpenSearch hosts

None
index_name str

Index name for checkpoints

'locus-checkpoints'
**kwargs Any

Additional OpenSearchBackend options (username, password, use_ssl)

{}

Returns:

Type Description
StorageBackendAdapter

StorageBackendAdapter wrapping OpenSearchBackend

Capabilities
  • search: Yes (full-text search)
  • metadata_query: Yes (via get_by_metadata)
  • list_threads: Yes
  • persistent_checkpoint_ids: Yes
Example

checkpointer = opensearch_checkpointer(hosts=["localhost:9200"])
agent = Agent(model=model, checkpointer=checkpointer)

Source code in src/locus/memory/backends/adapters.py
def opensearch_checkpointer(
    hosts: list[str] | None = None,
    index_name: str = "locus-checkpoints",
    **kwargs: Any,
) -> StorageBackendAdapter:
    """
    Create an OpenSearch-backed checkpointer.

    Args:
        hosts: OpenSearch hosts
        index_name: Index name for checkpoints
        **kwargs: Additional OpenSearchBackend options (username, password, use_ssl)

    Returns:
        StorageBackendAdapter wrapping OpenSearchBackend

    Capabilities:
        - search: Yes (full-text search)
        - metadata_query: Yes (via get_by_metadata)
        - list_threads: Yes
        - persistent_checkpoint_ids: Yes

    Example:
        >>> checkpointer = opensearch_checkpointer(hosts=["localhost:9200"])
        >>> agent = Agent(model=model, checkpointer=checkpointer)
    """
    from locus.memory.backends.opensearch import OpenSearchBackend

    backend = OpenSearchBackend(hosts=hosts, index_name=index_name, **kwargs)
    return StorageBackendAdapter(backend)

postgresql_checkpointer

postgresql_checkpointer(host: str = 'localhost', port: int = 5432, database: str = 'locus', user: str = 'postgres', password: str = '', dsn: str | None = None, **kwargs: Any) -> StorageBackendAdapter

Create a PostgreSQL-backed checkpointer.

Parameters:

Name Type Description Default
host str

PostgreSQL host

'localhost'
port int

PostgreSQL port

5432
database str

Database name

'locus'
user str

Database user

'postgres'
password str

Database password

''
dsn str | None

Connection string (overrides other params)

None
**kwargs Any

Additional PostgreSQLBackend options

{}

Returns:

Type Description
StorageBackendAdapter

StorageBackendAdapter wrapping PostgreSQLBackend

Capabilities
  • search: Yes (via search_data)
  • metadata_query: Yes (via query_by_metadata)
  • vacuum: Yes
  • list_threads: Yes
  • persistent_checkpoint_ids: Yes
Example

checkpointer = postgresql_checkpointer(database="myapp")
agent = Agent(model=model, checkpointer=checkpointer)

Source code in src/locus/memory/backends/adapters.py
def postgresql_checkpointer(
    host: str = "localhost",
    port: int = 5432,
    database: str = "locus",
    user: str = "postgres",
    password: str = "",
    dsn: str | None = None,
    **kwargs: Any,
) -> StorageBackendAdapter:
    """
    Create a PostgreSQL-backed checkpointer.

    Args:
        host: PostgreSQL host
        port: PostgreSQL port
        database: Database name
        user: Database user
        password: Database password
        dsn: Connection string (overrides other params)
        **kwargs: Additional PostgreSQLBackend options

    Returns:
        StorageBackendAdapter wrapping PostgreSQLBackend

    Capabilities:
        - search: Yes (via search_data)
        - metadata_query: Yes (via query_by_metadata)
        - vacuum: Yes
        - list_threads: Yes
        - persistent_checkpoint_ids: Yes

    Example:
        >>> checkpointer = postgresql_checkpointer(database="myapp")
        >>> agent = Agent(model=model, checkpointer=checkpointer)
    """
    from locus.memory.backends.postgresql import PostgreSQLBackend

    backend = PostgreSQLBackend(
        host=host,
        port=port,
        database=database,
        user=user,
        password=password,
        dsn=dsn,
        **kwargs,
    )
    return StorageBackendAdapter(backend)

redis_checkpointer

redis_checkpointer(url: str = 'redis://localhost:6379', prefix: str = 'locus:state:', **kwargs: Any) -> StorageBackendAdapter

Create a Redis-backed checkpointer.

Parameters:

Name Type Description Default
url str

Redis URL

'redis://localhost:6379'
prefix str

Key prefix for all checkpoints

'locus:state:'
**kwargs Any

Additional RedisBackend options (ttl_seconds, db)

{}

Returns:

Type Description
StorageBackendAdapter

StorageBackendAdapter wrapping RedisBackend

Capabilities
  • ttl: Yes (via ttl_seconds)
  • list_threads: Yes
  • persistent_checkpoint_ids: Yes
Example

checkpointer = redis_checkpointer("redis://localhost:6379")
agent = Agent(model=model, checkpointer=checkpointer)

Source code in src/locus/memory/backends/adapters.py
def redis_checkpointer(
    url: str = "redis://localhost:6379",
    prefix: str = "locus:state:",
    **kwargs: Any,
) -> StorageBackendAdapter:
    """
    Create a Redis-backed checkpointer.

    Args:
        url: Redis URL
        prefix: Key prefix for all checkpoints
        **kwargs: Additional RedisBackend options (ttl_seconds, db)

    Returns:
        StorageBackendAdapter wrapping RedisBackend

    Capabilities:
        - ttl: Yes (via ttl_seconds)
        - list_threads: Yes
        - persistent_checkpoint_ids: Yes

    Example:
        >>> checkpointer = redis_checkpointer("redis://localhost:6379")
        >>> agent = Agent(model=model, checkpointer=checkpointer)
    """
    from locus.memory.backends.redis import RedisBackend

    backend = RedisBackend(url=url, prefix=prefix, **kwargs)
    return StorageBackendAdapter(backend)