Checkpointers¶
State persistence between agent runs. Oracle Autonomous Database is
the production backend: native JSON columns, the
OracleCheckpointSaver versioned saver matching the
langgraph-oracledb shape, and pool-based async access via python-oracledb.
For long-term memory (durable KV store, semantic recall), see
Memory. This page covers the per-run state snapshot
contract used by AgentConfig.checkpointer.
Contract¶
BaseCheckpointer ¶
Bases: ABC
Abstract base class for checkpointer implementations.
Checkpointers handle saving and loading agent state, enabling features like: - Conversation persistence - Session recovery - Branching conversations - State inspection and debugging - Full-text search (backend-dependent) - Metadata queries (backend-dependent)
All methods are async to support various backends (file, database, network storage, etc.).
Use the capabilities property to check which features are available
before calling extended methods.
Example
if checkpointer.capabilities.search: ... results = await checkpointer.search("error handling") if checkpointer.capabilities.branching: ... await checkpointer.copy_thread("main", "experiment")
capabilities
property
¶
Return the capabilities of this checkpointer.
Override in subclasses to advertise supported features.
save
abstractmethod
async
¶
save(state: AgentState, thread_id: str, checkpoint_id: str | None = None, metadata: dict[str, Any] | None = None) -> str
Save agent state.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
state
|
AgentState
|
Current agent state to persist |
required |
thread_id
|
str
|
Unique identifier for the conversation thread |
required |
checkpoint_id
|
str | None
|
Optional specific checkpoint ID. If not provided, a new ID will be generated. |
None
|
metadata
|
dict[str, Any] | None
|
Optional metadata for querying/filtering checkpoints |
None
|
Returns:
| Type | Description |
|---|---|
str
|
Checkpoint ID that can be used to restore this state |
Source code in src/locus/memory/checkpointer.py
load
abstractmethod
async
¶
Load agent state.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
thread_id
|
str
|
Thread identifier to load from |
required |
checkpoint_id
|
str | None
|
Optional specific checkpoint ID. If not provided, loads the latest checkpoint. |
None
|
Returns:
| Type | Description |
|---|---|
AgentState | None
|
Restored AgentState or None if not found |
Source code in src/locus/memory/checkpointer.py
list_checkpoints
abstractmethod
async
¶
List available checkpoints for a thread.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
thread_id
|
str
|
Thread identifier |
required |
limit
|
int
|
Maximum number of checkpoint IDs to return |
10
|
Returns:
| Type | Description |
|---|---|
list[str]
|
List of checkpoint IDs, newest first |
Source code in src/locus/memory/checkpointer.py
delete
async
¶
Delete a checkpoint or all checkpoints for a thread.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
thread_id
|
str
|
Thread identifier |
required |
checkpoint_id
|
str | None
|
Specific checkpoint to delete. If None, deletes all checkpoints for the thread. |
None
|
Returns:
| Type | Description |
|---|---|
bool
|
True if deletion was successful |
Source code in src/locus/memory/checkpointer.py
exists
async
¶
Check if a checkpoint exists.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
thread_id
|
str
|
Thread identifier |
required |
checkpoint_id
|
str | None
|
Specific checkpoint to check. If None, checks if any checkpoint exists for the thread. |
None
|
Returns:
| Type | Description |
|---|---|
bool
|
True if the checkpoint exists |
Source code in src/locus/memory/checkpointer.py
close
async
¶
search
async
¶
Full-text search across checkpoints.
Requires: capabilities.search = True
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
query
|
str
|
Search query |
required |
limit
|
int
|
Maximum results |
10
|
Returns:
| Type | Description |
|---|---|
list[dict[str, Any]]
|
List of matching checkpoints with scores |
Raises:
| Type | Description |
|---|---|
NotImplementedError
|
If backend doesn't support search |
Source code in src/locus/memory/checkpointer.py
query_by_metadata
async
¶
Query checkpoints by metadata field.
Requires: capabilities.metadata_query = True
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
key
|
str
|
Metadata field name |
required |
value
|
Any
|
Value to match |
required |
limit
|
int
|
Maximum results |
100
|
Returns:
| Type | Description |
|---|---|
list[dict[str, Any]]
|
List of matching checkpoints |
Source code in src/locus/memory/checkpointer.py
get_metadata
async
¶
Get checkpoint metadata.
Requires: capabilities.metadata_query = True
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
thread_id
|
str
|
Thread identifier |
required |
checkpoint_id
|
str | None
|
Specific checkpoint (latest if None) |
None
|
Returns:
| Type | Description |
|---|---|
dict[str, Any] | None
|
Metadata dict or None if not found |
Source code in src/locus/memory/checkpointer.py
vacuum
async
¶
Delete old checkpoints.
Requires: capabilities.vacuum = True
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
older_than_days
|
int
|
Delete checkpoints older than this |
30
|
Returns:
| Type | Description |
|---|---|
int
|
Number of deleted checkpoints |
Source code in src/locus/memory/checkpointer.py
copy_thread
async
¶
Copy a thread to create a branch.
Requires: capabilities.branching = True
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
source_thread_id
|
str
|
Source thread to copy from |
required |
dest_thread_id
|
str
|
Destination thread ID |
required |
Returns:
| Type | Description |
|---|---|
bool
|
True if successful |
Source code in src/locus/memory/checkpointer.py
list_threads
async
¶
List all thread IDs.
Requires: capabilities.list_threads = True
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
limit
|
int
|
Maximum threads to return |
100
|
pattern
|
str
|
Pattern to filter threads (backend-specific) |
'*'
|
Returns:
| Type | Description |
|---|---|
list[str]
|
List of thread IDs |
Source code in src/locus/memory/checkpointer.py
list_with_metadata
async
¶
List checkpoints with their metadata.
Requires: capabilities.list_with_metadata = True
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
limit
|
int
|
Maximum results |
100
|
Returns:
| Type | Description |
|---|---|
list[dict[str, Any]]
|
List of {thread_id, checkpoint_id, metadata, ...} dicts |
Source code in src/locus/memory/checkpointer.py
CheckpointerCapabilities
dataclass
¶
CheckpointerCapabilities(search: bool = False, metadata_query: bool = False, vacuum: bool = False, branching: bool = False, ttl: bool = False, list_threads: bool = False, list_with_metadata: bool = False, persistent_checkpoint_ids: bool = False)
Capabilities supported by a checkpointer.
Use this to discover what features a checkpointer supports before calling optional methods.
Example
if checkpointer.capabilities.search: ... results = await checkpointer.search("error handling")
Oracle Database 26ai¶
Three flavours, depending on how you want state stored:
OracleBackend— async, JSON-column storage; the default full-checkpointer interface.OracleCheckpointSaver— versioned (LangGraph-shape) saver, with explicit checkpoint versions and parent IDs for time-travel.OracleSyncBackend/OracleSyncCheckpointSaver— sync companions for the langgraph-oracledb split, when you need to run inside a sync ORM session.
OracleBackend ¶
OracleBackend(dsn: str | None = None, user: str = 'admin', password: str | SecretStr = '', wallet_location: str | None = None, wallet_password: str | SecretStr | None = None, host: str | None = None, port: int = 1521, service_name: str | None = None, **kwargs: Any)
Bases: BaseModel
Oracle Database checkpoint backend.
Production-grade persistent storage with JSON support and full-text search.
Features: - Connection pooling - JSON column storage with search - Metadata indexing - Vacuum (cleanup old checkpoints) - Works with Autonomous Database (wallet-based auth)
Example with DSN
backend = OracleBackend( ... dsn="mydb_high", # TNS name from tnsnames.ora ... user="admin", ... password="secret", ... wallet_location="/path/to/wallet", ... ) await backend.save("thread_1", state.model_dump())
Example with connection string
backend = OracleBackend( ... host="adb.us-ashburn-1.oraclecloud.com", ... port=1522, ... service_name="xxx_high.adb.oraclecloud.com", ... user="admin", ... password="secret", ... )
Source code in src/locus/memory/backends/oracle.py
save
async
¶
save(state: AgentState, thread_id: str, checkpoint_id: str | None = None, metadata: dict[str, Any] | None = None) -> str
Save agent state to Oracle Database.
Implements :meth:BaseCheckpointer.save. Parameter order is
(state, thread_id, ...) to match the abstract — the prior
(thread_id, data, ...) signature silently mismatched the
agent runtime, which calls save(state, thread_id) and would
end up trying to bind an :class:AgentState to the
VARCHAR2(255) thread_id column.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
state
|
AgentState
|
Current agent state. Serialized via
:meth: |
required |
thread_id
|
str
|
Thread identifier (column primary key). |
required |
checkpoint_id
|
str | None
|
Optional checkpoint ID. Generated if omitted. |
None
|
metadata
|
dict[str, Any] | None
|
Optional metadata for querying. |
None
|
Returns:
| Type | Description |
|---|---|
str
|
Checkpoint ID. |
Source code in src/locus/memory/backends/oracle.py
184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 | |
load
async
¶
Load a saved payload from Oracle Database.
Returns the raw dict payload as written by :meth:save. The
:class:StorageBackendAdapter wrapper rehydrates this into an
:class:AgentState for the agent runtime; callers that hold a
bare OracleBackend get the dict directly.
checkpoint_id is accepted for signature parity but ignored —
this backend stores one row per thread_id (MERGE upsert), so
latest-state is the only retrievable checkpoint.
Source code in src/locus/memory/backends/oracle.py
delete
async
¶
Delete checkpoint from Oracle Database.
Source code in src/locus/memory/backends/oracle.py
exists
async
¶
Check if checkpoint exists.
Source code in src/locus/memory/backends/oracle.py
list_threads
async
¶
List all thread IDs matching pattern.
Source code in src/locus/memory/backends/oracle.py
get_metadata
async
¶
Get checkpoint metadata.
Source code in src/locus/memory/backends/oracle.py
query_by_metadata
async
¶
Query checkpoints by metadata field.
Uses Oracle JSON path expressions.
Source code in src/locus/memory/backends/oracle.py
search
async
¶
Search checkpoints by content.
Uses Oracle JSON_TEXTCONTAINS for full-text search within JSON.
Source code in src/locus/memory/backends/oracle.py
vacuum
async
¶
Delete old checkpoints.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
older_than_days
|
int
|
Delete checkpoints older than this |
30
|
Returns:
| Type | Description |
|---|---|
int
|
Number of deleted rows |
Source code in src/locus/memory/backends/oracle.py
count
async
¶
Count checkpoints matching pattern.
Source code in src/locus/memory/backends/oracle.py
close
async
¶
Close the connection pool and reset provisioning state.
force=True is the right choice here — by the time we close
the pool we are either tearing down at end of run_sync or
recovering from a network-level error; waiting for in-flight
operations to drain on a possibly-dead loop would just hang.
Cleanup must never raise; an exception here would mask the
primary error path callers actually care about.
Source code in src/locus/memory/backends/oracle.py
OracleCheckpointSaver ¶
OracleCheckpointSaver(dsn: str | None = None, user: str = 'admin', password: str | SecretStr = '', wallet_location: str | None = None, wallet_password: str | SecretStr | None = None, host: str | None = None, port: int = 1521, service_name: str | None = None, table_name: str = 'locus', schema_name: str | None = None, min_pool_size: int = 1, max_pool_size: int = 5, auto_create_table: bool = True, **kwargs: Any)
Bases: BaseModel
Versioned Oracle checkpoint saver with pending-writes durability.
locus-native — does not inherit from
langgraph.checkpoint.base.BaseCheckpointSaver. The method surface
matches the LangGraph shape (put / get / list_checkpoints
/ put_writes / get_writes / delete_thread) so adapter
layers can wire it into LangGraph runtimes without dragging a
langchain dependency into locus.
Example with TNS alias::
>>> saver = OracleCheckpointSaver(
... dsn="mydb_low",
... user="admin",
... password="secret",
... wallet_location="/path/to/wallet",
... table_name="locus",
... )
>>> await saver.put(
... thread_id="t1",
... checkpoint_id="c1",
... checkpoint_data={"step": 0, "values": {"x": 1}},
... )
>>> latest = await saver.get(thread_id="t1")
Example with pending writes (intra-step durability)::
>>> await saver.put_writes(
... thread_id="t1",
... checkpoint_id="c1",
... task_id="node-a",
... writes=[("x", 2), ("y", 3)],
... )
Source code in src/locus/memory/backends/oracle_versioned.py
put
async
¶
put(*, thread_id: str, checkpoint_id: str, checkpoint_data: dict, checkpoint_ns: str = 'default', parent_checkpoint_id: str | None = None, metadata: dict | None = None) -> None
Persist one checkpoint row.
History is preserved: (thread_id, checkpoint_ns,
checkpoint_id) is the primary key, so re-saving a different
checkpoint_id for the same thread inserts a new row rather
than overwriting.
Source code in src/locus/memory/backends/oracle_versioned.py
get
async
¶
get(*, thread_id: str, checkpoint_id: str | None = None, checkpoint_ns: str = 'default') -> dict | None
Fetch one checkpoint.
checkpoint_id=None returns the most recent row for the
(thread_id, checkpoint_ns) pair, ordered by created_at
DESC with FETCH FIRST 1 ROWS ONLY.
Returns None when no matching row exists; otherwise a dict
with keys checkpoint_id, parent_checkpoint_id,
checkpoint, metadata, created_at.
Source code in src/locus/memory/backends/oracle_versioned.py
list_checkpoints
async
¶
list_checkpoints(*, thread_id: str, checkpoint_ns: str = 'default', limit: int = 10, before: str | None = None) -> list[dict]
Return checkpoints for a thread, newest first.
before is a checkpoint_id; when supplied, only rows
strictly older than that row's created_at are returned —
the typical "page back through history" pattern LangGraph's
alist exposes.
Source code in src/locus/memory/backends/oracle_versioned.py
524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 | |
put_writes
async
¶
put_writes(*, thread_id: str, checkpoint_id: str, task_id: str, writes: list[tuple[str, Any]], checkpoint_ns: str = 'default') -> None
Persist pending writes for one (checkpoint, task) pair.
Idempotent: existing writes for the same
(thread_id, checkpoint_ns, checkpoint_id, task_id) triple
are deleted first, then the new set is inserted with monotonic
idx 0..N-1. Retries are safe.
Source code in src/locus/memory/backends/oracle_versioned.py
get_writes
async
¶
get_writes(*, thread_id: str, checkpoint_id: str, checkpoint_ns: str = 'default', task_id: str | None = None) -> list[dict]
Fetch pending writes, ordered by (task_id, idx).
task_id=None returns writes for every task at the
checkpoint; otherwise only the named task's rows.
Source code in src/locus/memory/backends/oracle_versioned.py
delete_thread
async
¶
Cascade-delete every checkpoint + pending write for a thread.
Source code in src/locus/memory/backends/oracle_versioned.py
OracleSyncBackend ¶
OracleSyncBackend(dsn: str | None = None, user: str = 'admin', password: str | SecretStr = '', wallet_location: str | None = None, wallet_password: str | SecretStr | None = None, host: str | None = None, port: int = 1521, service_name: str | None = None, **kwargs: Any)
Sync companion to :class:OracleBackend.
Constructor accepts the same arguments as
:class:OracleBackend; every public async method is mirrored as a
blocking method with the same name (no _sync suffix) so
porting code between the two surfaces is a single import change.
Source code in src/locus/memory/backends/oracle_sync.py
OracleSyncCheckpointSaver ¶
OracleSyncCheckpointSaver(dsn: str | None = None, user: str = 'admin', password: str | SecretStr = '', wallet_location: str | None = None, wallet_password: str | SecretStr | None = None, host: str | None = None, port: int = 1521, service_name: str | None = None, table_name: str = 'locus', schema_name: str | None = None, min_pool_size: int = 1, max_pool_size: int = 5, auto_create_table: bool = True, **kwargs: Any)
Sync companion to :class:OracleCheckpointSaver.
Versioned, history-preserving checkpoint saver with pending-writes
durability — synchronous API. Method names match the async
counterpart exactly (put / get / list_checkpoints /
put_writes / get_writes / delete_thread / close).
Source code in src/locus/memory/backends/oracle_sync.py
OCI Object Storage¶
OCIBucketBackend ¶
OCIBucketBackend(bucket_name: str, namespace: str, prefix: str = 'locus/checkpoints/', profile_name: str = 'DEFAULT', auth_type: str = 'api_key', region: str | None = None, retry_strategy: Any = None, **kwargs: Any)
Bases: BaseCheckpointer
OCI Object Storage-backed checkpointer.
Durable, per-checkpoint storage with lifecycle-policy support. Pass the
instance directly to :class:~locus.agent.Agent — no adapter needed.
Example::
checkpointer = OCIBucketBackend(
bucket_name="my-checkpoints",
namespace="yzhbfkqxqsx9",
profile_name="API_KEY_AUTH",
)
agent = Agent(config=cfg, checkpointer=checkpointer)
With an OCI compute instance principal::
checkpointer = OCIBucketBackend(
bucket_name="my-checkpoints",
namespace="yzhbfkqxqsx9",
auth_type="instance_principal",
)
Capabilities
list_threads— yes (via object prefix delimiter listing)list_with_metadata— yesmetadata_query— yes (viaget_metadata)branching— yes (viacopy_thread)vacuum— yes (prefer bucket lifecycle policies for prod)persistent_checkpoint_ids— yes
Source code in src/locus/memory/backends/oci_bucket.py
close
async
¶
search
async
¶
Full-text search across checkpoints.
Requires: capabilities.search = True
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
query
|
str
|
Search query |
required |
limit
|
int
|
Maximum results |
10
|
Returns:
| Type | Description |
|---|---|
list[dict[str, Any]]
|
List of matching checkpoints with scores |
Raises:
| Type | Description |
|---|---|
NotImplementedError
|
If backend doesn't support search |
Source code in src/locus/memory/checkpointer.py
query_by_metadata
async
¶
Query checkpoints by metadata field.
Requires: capabilities.metadata_query = True
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
key
|
str
|
Metadata field name |
required |
value
|
Any
|
Value to match |
required |
limit
|
int
|
Maximum results |
100
|
Returns:
| Type | Description |
|---|---|
list[dict[str, Any]]
|
List of matching checkpoints |
Source code in src/locus/memory/checkpointer.py
vacuum
async
¶
Delete threads whose latest checkpoint is older than the cutoff.
For production, prefer an OCI Object Storage lifecycle rule — it runs server-side and costs nothing in client CPU.
Source code in src/locus/memory/backends/oci_bucket.py
copy_thread
async
¶
Copy every checkpoint under source to dest (for branching).
Source code in src/locus/memory/backends/oci_bucket.py
Other backends¶
When you can't run on Oracle / OCI, the same BaseCheckpointer
contract is implemented for Redis, PostgreSQL, OpenSearch, file
system, and an HTTP-API adapter.
RedisBackend ¶
Bases: BaseModel
Redis checkpoint backend.
Fast key-value storage with optional TTL for checkpoints.
Example
backend = RedisBackend(url="redis://localhost:6379") await backend.save("thread_1", state.model_dump()) data = await backend.load("thread_1")
Source code in src/locus/memory/backends/redis.py
save
async
¶
Save checkpoint to Redis.
Source code in src/locus/memory/backends/redis.py
load
async
¶
Load checkpoint from Redis.
Source code in src/locus/memory/backends/redis.py
delete
async
¶
exists
async
¶
Check if checkpoint exists.
list_threads
async
¶
List all thread IDs matching pattern.
Source code in src/locus/memory/backends/redis.py
PostgreSQLBackend ¶
PostgreSQLBackend(host: str = 'localhost', port: int = 5432, database: str = 'locus', user: str = 'postgres', password: str | SecretStr = '', dsn: str | None = None, **kwargs: Any)
Bases: BaseModel
PostgreSQL checkpoint backend.
Production-grade persistent storage with ACID guarantees.
Features: - Connection pooling - Transaction support - JSON/JSONB storage - Indexing for fast lookups - Concurrent access safe
Example
backend = PostgreSQLBackend( ... host="localhost", ... database="myapp", ... user="postgres", ... password="secret", ... ) await backend.save("thread_1", state.model_dump()) data = await backend.load("thread_1")
With DSN
backend = PostgreSQLBackend(dsn="postgresql://user:pass@localhost:5432/mydb")
Source code in src/locus/memory/backends/postgresql.py
save
async
¶
save(thread_id: str, data: dict[str, Any], checkpoint_id: str | None = None, metadata: dict[str, Any] | None = None) -> str
Save checkpoint to PostgreSQL.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
thread_id
|
str
|
Thread identifier |
required |
data
|
dict[str, Any]
|
Checkpoint data |
required |
checkpoint_id
|
str | None
|
Optional checkpoint ID |
None
|
metadata
|
dict[str, Any] | None
|
Optional metadata for querying |
None
|
Returns:
| Type | Description |
|---|---|
str
|
Checkpoint ID |
Source code in src/locus/memory/backends/postgresql.py
load
async
¶
Load checkpoint from PostgreSQL.
Source code in src/locus/memory/backends/postgresql.py
delete
async
¶
Delete checkpoint from PostgreSQL.
Source code in src/locus/memory/backends/postgresql.py
exists
async
¶
Check if checkpoint exists.
Source code in src/locus/memory/backends/postgresql.py
list_threads
async
¶
List all thread IDs matching pattern.
Source code in src/locus/memory/backends/postgresql.py
get_metadata
async
¶
Get checkpoint metadata.
Source code in src/locus/memory/backends/postgresql.py
query_by_metadata
async
¶
Query checkpoints by metadata field.
Uses PostgreSQL JSONB operators for efficient querying.
Source code in src/locus/memory/backends/postgresql.py
search_data
async
¶
Search checkpoints by data field using JSON path.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
path
|
str
|
JSON path (e.g., "messages", "confidence") |
required |
value
|
Any
|
Value to match |
required |
limit
|
int
|
Maximum results |
100
|
Example
results = await backend.search_data("agent_id", "agent-123")
Source code in src/locus/memory/backends/postgresql.py
count
async
¶
Count checkpoints matching pattern.
Source code in src/locus/memory/backends/postgresql.py
vacuum
async
¶
Delete old checkpoints.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
older_than_days
|
int
|
Delete checkpoints older than this |
30
|
Returns:
| Type | Description |
|---|---|
int
|
Number of deleted rows |
Source code in src/locus/memory/backends/postgresql.py
OpenSearchBackend ¶
OpenSearchBackend(hosts: list[str] | None = None, index_name: str = 'locus-checkpoints', username: str | None = None, password: str | None = None, **kwargs: Any)
Bases: BaseModel
OpenSearch checkpoint backend.
Scalable document storage with full-text search capabilities.
Example
backend = OpenSearchBackend(hosts=["localhost:9200"]) await backend.save("thread_1", state.model_dump()) data = await backend.load("thread_1") results = await backend.search("user query")
Source code in src/locus/memory/backends/opensearch.py
save
async
¶
Save checkpoint to OpenSearch.
Source code in src/locus/memory/backends/opensearch.py
load
async
¶
Load checkpoint from OpenSearch.
Source code in src/locus/memory/backends/opensearch.py
delete
async
¶
Delete checkpoint from OpenSearch.
Source code in src/locus/memory/backends/opensearch.py
exists
async
¶
Check if checkpoint exists.
Source code in src/locus/memory/backends/opensearch.py
list_threads
async
¶
List all thread IDs.
Source code in src/locus/memory/backends/opensearch.py
search
async
¶
Search checkpoints by content.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
query
|
str
|
Search query |
required |
limit
|
int
|
Maximum results |
10
|
Returns:
| Type | Description |
|---|---|
list[dict[str, Any]]
|
List of matching checkpoints with scores |
Source code in src/locus/memory/backends/opensearch.py
get_by_metadata
async
¶
Get checkpoints by metadata field.
Source code in src/locus/memory/backends/opensearch.py
FileCheckpointer ¶
Bases: BaseCheckpointer
File-based checkpointer for persistent local storage.
Stores each checkpoint as a JSON file, organized by thread ID. Provides durable storage that survives process restarts.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
base_dir
|
str | Path
|
Base directory for checkpoint storage. Defaults to ".locus_checkpoints" in current directory. |
'.locus_checkpoints'
|
pretty
|
bool
|
Whether to format JSON for readability (default True) |
True
|
Example
Source code in src/locus/memory/backends/file.py
capabilities
property
¶
Return the capabilities of this checkpointer.
Override in subclasses to advertise supported features.
exists
async
¶
Check if a checkpoint exists.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
thread_id
|
str
|
Thread identifier |
required |
checkpoint_id
|
str | None
|
Specific checkpoint to check. If None, checks if any checkpoint exists for the thread. |
None
|
Returns:
| Type | Description |
|---|---|
bool
|
True if the checkpoint exists |
Source code in src/locus/memory/checkpointer.py
close
async
¶
search
async
¶
Full-text search across checkpoints.
Requires: capabilities.search = True
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
query
|
str
|
Search query |
required |
limit
|
int
|
Maximum results |
10
|
Returns:
| Type | Description |
|---|---|
list[dict[str, Any]]
|
List of matching checkpoints with scores |
Raises:
| Type | Description |
|---|---|
NotImplementedError
|
If backend doesn't support search |
Source code in src/locus/memory/checkpointer.py
query_by_metadata
async
¶
Query checkpoints by metadata field.
Requires: capabilities.metadata_query = True
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
key
|
str
|
Metadata field name |
required |
value
|
Any
|
Value to match |
required |
limit
|
int
|
Maximum results |
100
|
Returns:
| Type | Description |
|---|---|
list[dict[str, Any]]
|
List of matching checkpoints |
Source code in src/locus/memory/checkpointer.py
get_metadata
async
¶
Get checkpoint metadata.
Requires: capabilities.metadata_query = True
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
thread_id
|
str
|
Thread identifier |
required |
checkpoint_id
|
str | None
|
Specific checkpoint (latest if None) |
None
|
Returns:
| Type | Description |
|---|---|
dict[str, Any] | None
|
Metadata dict or None if not found |
Source code in src/locus/memory/checkpointer.py
vacuum
async
¶
Delete old checkpoints.
Requires: capabilities.vacuum = True
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
older_than_days
|
int
|
Delete checkpoints older than this |
30
|
Returns:
| Type | Description |
|---|---|
int
|
Number of deleted checkpoints |
Source code in src/locus/memory/checkpointer.py
copy_thread
async
¶
Copy a thread to create a branch.
Requires: capabilities.branching = True
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
source_thread_id
|
str
|
Source thread to copy from |
required |
dest_thread_id
|
str
|
Destination thread ID |
required |
Returns:
| Type | Description |
|---|---|
bool
|
True if successful |
Source code in src/locus/memory/checkpointer.py
list_threads
async
¶
List all thread IDs.
Requires: capabilities.list_threads = True
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
limit
|
int
|
Maximum threads to return |
100
|
pattern
|
str
|
Pattern to filter threads (backend-specific) |
'*'
|
Returns:
| Type | Description |
|---|---|
list[str]
|
List of thread IDs |
Source code in src/locus/memory/checkpointer.py
list_with_metadata
async
¶
List checkpoints with their metadata.
Requires: capabilities.list_with_metadata = True
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
limit
|
int
|
Maximum results |
100
|
Returns:
| Type | Description |
|---|---|
list[dict[str, Any]]
|
List of {thread_id, checkpoint_id, metadata, ...} dicts |
Source code in src/locus/memory/checkpointer.py
save
async
¶
save(state: AgentState, thread_id: str, checkpoint_id: str | None = None, metadata: dict[str, Any] | None = None) -> str
Save agent state to a JSON file.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
state
|
AgentState
|
Current agent state |
required |
thread_id
|
str
|
Thread identifier |
required |
checkpoint_id
|
str | None
|
Optional specific checkpoint ID |
None
|
metadata
|
dict[str, Any] | None
|
Optional metadata for querying/filtering checkpoints |
None
|
Returns:
| Type | Description |
|---|---|
str
|
Checkpoint ID for the saved state |
Source code in src/locus/memory/backends/file.py
load
async
¶
Load agent state from a JSON file.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
thread_id
|
str
|
Thread identifier |
required |
checkpoint_id
|
str | None
|
Specific checkpoint ID (latest if None) |
None
|
Returns:
| Type | Description |
|---|---|
AgentState | None
|
Restored AgentState or None if not found |
Source code in src/locus/memory/backends/file.py
list_checkpoints
async
¶
List available checkpoints for a thread.
Reads checkpoint files and returns IDs sorted by creation time (newest first).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
thread_id
|
str
|
Thread identifier |
required |
limit
|
int
|
Maximum number to return |
10
|
Returns:
| Type | Description |
|---|---|
list[str]
|
List of checkpoint IDs, newest first |
Source code in src/locus/memory/backends/file.py
delete
async
¶
Delete checkpoint file(s).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
thread_id
|
str
|
Thread identifier |
required |
checkpoint_id
|
str | None
|
Specific checkpoint to delete (all if None) |
None
|
Returns:
| Type | Description |
|---|---|
bool
|
True if deletion was successful |
Source code in src/locus/memory/backends/file.py
get_storage_path ¶
get_disk_usage
async
¶
Get total disk usage in bytes.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
thread_id
|
str | None
|
Specific thread (all threads if None) |
None
|
Returns:
| Type | Description |
|---|---|
int
|
Total size in bytes |
Source code in src/locus/memory/backends/file.py
HTTPCheckpointer ¶
HTTPCheckpointer(base_url: str, headers: dict[str, str] | None = None, auth: tuple[str, str] | None = None, timeout: float = 30.0)
Bases: BaseCheckpointer
HTTP API-based checkpointer for remote storage.
Stores checkpoints via HTTP API calls, suitable for distributed systems or cloud-based storage backends.
The API is expected to implement the following endpoints: - POST /threads/{thread_id}/checkpoints - Create checkpoint - GET /threads/{thread_id}/checkpoints/{checkpoint_id} - Get checkpoint - GET /threads/{thread_id}/checkpoints - List checkpoints - DELETE /threads/{thread_id}/checkpoints/{checkpoint_id} - Delete checkpoint
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
base_url
|
str
|
Base URL of the checkpoint API |
required |
headers
|
dict[str, str] | None
|
Additional headers to include in requests |
None
|
auth
|
tuple[str, str] | None
|
Authentication tuple (username, password) for basic auth |
None
|
timeout
|
float
|
Request timeout in seconds |
30.0
|
Example
Source code in src/locus/memory/backends/http.py
capabilities
property
¶
Return the capabilities of this checkpointer.
Override in subclasses to advertise supported features.
exists
async
¶
Check if a checkpoint exists.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
thread_id
|
str
|
Thread identifier |
required |
checkpoint_id
|
str | None
|
Specific checkpoint to check. If None, checks if any checkpoint exists for the thread. |
None
|
Returns:
| Type | Description |
|---|---|
bool
|
True if the checkpoint exists |
Source code in src/locus/memory/checkpointer.py
search
async
¶
Full-text search across checkpoints.
Requires: capabilities.search = True
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
query
|
str
|
Search query |
required |
limit
|
int
|
Maximum results |
10
|
Returns:
| Type | Description |
|---|---|
list[dict[str, Any]]
|
List of matching checkpoints with scores |
Raises:
| Type | Description |
|---|---|
NotImplementedError
|
If backend doesn't support search |
Source code in src/locus/memory/checkpointer.py
query_by_metadata
async
¶
Query checkpoints by metadata field.
Requires: capabilities.metadata_query = True
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
key
|
str
|
Metadata field name |
required |
value
|
Any
|
Value to match |
required |
limit
|
int
|
Maximum results |
100
|
Returns:
| Type | Description |
|---|---|
list[dict[str, Any]]
|
List of matching checkpoints |
Source code in src/locus/memory/checkpointer.py
get_metadata
async
¶
Get checkpoint metadata.
Requires: capabilities.metadata_query = True
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
thread_id
|
str
|
Thread identifier |
required |
checkpoint_id
|
str | None
|
Specific checkpoint (latest if None) |
None
|
Returns:
| Type | Description |
|---|---|
dict[str, Any] | None
|
Metadata dict or None if not found |
Source code in src/locus/memory/checkpointer.py
vacuum
async
¶
Delete old checkpoints.
Requires: capabilities.vacuum = True
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
older_than_days
|
int
|
Delete checkpoints older than this |
30
|
Returns:
| Type | Description |
|---|---|
int
|
Number of deleted checkpoints |
Source code in src/locus/memory/checkpointer.py
copy_thread
async
¶
Copy a thread to create a branch.
Requires: capabilities.branching = True
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
source_thread_id
|
str
|
Source thread to copy from |
required |
dest_thread_id
|
str
|
Destination thread ID |
required |
Returns:
| Type | Description |
|---|---|
bool
|
True if successful |
Source code in src/locus/memory/checkpointer.py
list_threads
async
¶
List all thread IDs.
Requires: capabilities.list_threads = True
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
limit
|
int
|
Maximum threads to return |
100
|
pattern
|
str
|
Pattern to filter threads (backend-specific) |
'*'
|
Returns:
| Type | Description |
|---|---|
list[str]
|
List of thread IDs |
Source code in src/locus/memory/checkpointer.py
list_with_metadata
async
¶
List checkpoints with their metadata.
Requires: capabilities.list_with_metadata = True
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
limit
|
int
|
Maximum results |
100
|
Returns:
| Type | Description |
|---|---|
list[dict[str, Any]]
|
List of {thread_id, checkpoint_id, metadata, ...} dicts |
Source code in src/locus/memory/checkpointer.py
close
async
¶
save
async
¶
save(state: AgentState, thread_id: str, checkpoint_id: str | None = None, metadata: dict[str, Any] | None = None) -> str
Save agent state via HTTP POST.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
state
|
AgentState
|
Current agent state |
required |
thread_id
|
str
|
Thread identifier |
required |
checkpoint_id
|
str | None
|
Optional specific checkpoint ID |
None
|
metadata
|
dict[str, Any] | None
|
Optional metadata for querying/filtering checkpoints |
None
|
Returns:
| Type | Description |
|---|---|
str
|
Checkpoint ID for the saved state |
Raises:
| Type | Description |
|---|---|
HTTPError
|
If the request fails |
Source code in src/locus/memory/backends/http.py
load
async
¶
Load agent state via HTTP GET.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
thread_id
|
str
|
Thread identifier |
required |
checkpoint_id
|
str | None
|
Specific checkpoint ID (latest if None) |
None
|
Returns:
| Type | Description |
|---|---|
AgentState | None
|
Restored AgentState or None if not found |
Source code in src/locus/memory/backends/http.py
list_checkpoints
async
¶
List available checkpoints via HTTP GET.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
thread_id
|
str
|
Thread identifier |
required |
limit
|
int
|
Maximum number to return |
10
|
Returns:
| Type | Description |
|---|---|
list[str]
|
List of checkpoint IDs, newest first |
Source code in src/locus/memory/backends/http.py
delete
async
¶
Delete checkpoint(s) via HTTP DELETE.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
thread_id
|
str
|
Thread identifier |
required |
checkpoint_id
|
str | None
|
Specific checkpoint to delete (all if None) |
None
|
Returns:
| Type | Description |
|---|---|
bool
|
True if deletion was successful |
Source code in src/locus/memory/backends/http.py
health_check
async
¶
Check if the API is reachable.
Returns:
| Type | Description |
|---|---|
bool
|
True if the API responds successfully |
Source code in src/locus/memory/backends/http.py
__aenter__
async
¶
MemoryCheckpointer ¶
Bases: BaseCheckpointer
In-memory checkpointer for testing and development.
Stores all checkpoints in a dictionary. Data is not persistent and will be lost when the process terminates.
Useful for: - Unit and integration testing - Development and prototyping - Short-lived agent sessions - As a fast caching layer
Capabilities: - list_threads: Yes - persistent_checkpoint_ids: Yes (within process lifetime)
Example
Source code in src/locus/memory/backends/memory.py
exists
async
¶
Check if a checkpoint exists.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
thread_id
|
str
|
Thread identifier |
required |
checkpoint_id
|
str | None
|
Specific checkpoint to check. If None, checks if any checkpoint exists for the thread. |
None
|
Returns:
| Type | Description |
|---|---|
bool
|
True if the checkpoint exists |
Source code in src/locus/memory/checkpointer.py
close
async
¶
search
async
¶
Full-text search across checkpoints.
Requires: capabilities.search = True
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
query
|
str
|
Search query |
required |
limit
|
int
|
Maximum results |
10
|
Returns:
| Type | Description |
|---|---|
list[dict[str, Any]]
|
List of matching checkpoints with scores |
Raises:
| Type | Description |
|---|---|
NotImplementedError
|
If backend doesn't support search |
Source code in src/locus/memory/checkpointer.py
query_by_metadata
async
¶
Query checkpoints by metadata field.
Requires: capabilities.metadata_query = True
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
key
|
str
|
Metadata field name |
required |
value
|
Any
|
Value to match |
required |
limit
|
int
|
Maximum results |
100
|
Returns:
| Type | Description |
|---|---|
list[dict[str, Any]]
|
List of matching checkpoints |
Source code in src/locus/memory/checkpointer.py
get_metadata
async
¶
Get checkpoint metadata.
Requires: capabilities.metadata_query = True
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
thread_id
|
str
|
Thread identifier |
required |
checkpoint_id
|
str | None
|
Specific checkpoint (latest if None) |
None
|
Returns:
| Type | Description |
|---|---|
dict[str, Any] | None
|
Metadata dict or None if not found |
Source code in src/locus/memory/checkpointer.py
vacuum
async
¶
Delete old checkpoints.
Requires: capabilities.vacuum = True
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
older_than_days
|
int
|
Delete checkpoints older than this |
30
|
Returns:
| Type | Description |
|---|---|
int
|
Number of deleted checkpoints |
Source code in src/locus/memory/checkpointer.py
copy_thread
async
¶
Copy a thread to create a branch.
Requires: capabilities.branching = True
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
source_thread_id
|
str
|
Source thread to copy from |
required |
dest_thread_id
|
str
|
Destination thread ID |
required |
Returns:
| Type | Description |
|---|---|
bool
|
True if successful |
Source code in src/locus/memory/checkpointer.py
list_with_metadata
async
¶
List checkpoints with their metadata.
Requires: capabilities.list_with_metadata = True
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
limit
|
int
|
Maximum results |
100
|
Returns:
| Type | Description |
|---|---|
list[dict[str, Any]]
|
List of {thread_id, checkpoint_id, metadata, ...} dicts |
Source code in src/locus/memory/checkpointer.py
save
async
¶
save(state: AgentState, thread_id: str, checkpoint_id: str | None = None, metadata: dict[str, Any] | None = None) -> str
Save agent state to memory.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
state
|
AgentState
|
Current agent state |
required |
thread_id
|
str
|
Thread identifier |
required |
checkpoint_id
|
str | None
|
Optional specific checkpoint ID |
None
|
metadata
|
dict[str, Any] | None
|
Optional metadata for the checkpoint |
None
|
Returns:
| Type | Description |
|---|---|
str
|
Checkpoint ID for the saved state |
Source code in src/locus/memory/backends/memory.py
load
async
¶
Load agent state from memory.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
thread_id
|
str
|
Thread identifier |
required |
checkpoint_id
|
str | None
|
Specific checkpoint ID (latest if None) |
None
|
Returns:
| Type | Description |
|---|---|
AgentState | None
|
Restored AgentState or None if not found |
Source code in src/locus/memory/backends/memory.py
list_checkpoints
async
¶
List available checkpoints for a thread.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
thread_id
|
str
|
Thread identifier |
required |
limit
|
int
|
Maximum number to return |
10
|
Returns:
| Type | Description |
|---|---|
list[str]
|
List of checkpoint IDs, newest first |
Source code in src/locus/memory/backends/memory.py
delete
async
¶
Delete checkpoint(s) from memory.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
thread_id
|
str
|
Thread identifier |
required |
checkpoint_id
|
str | None
|
Specific checkpoint to delete (all if None) |
None
|
Returns:
| Type | Description |
|---|---|
bool
|
True if deletion was successful |
Source code in src/locus/memory/backends/memory.py
clear ¶
get_thread_ids ¶
list_threads
async
¶
List all thread IDs.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
limit
|
int
|
Maximum threads to return |
100
|
pattern
|
str
|
Pattern to filter (supports * as wildcard) |
'*'
|
Returns:
| Type | Description |
|---|---|
list[str]
|
List of thread IDs |
Source code in src/locus/memory/backends/memory.py
get_checkpoint_count ¶
Get count of stored checkpoints.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
thread_id
|
str | None
|
Specific thread (all threads if None) |
None
|
Returns:
| Type | Description |
|---|---|
int
|
Number of checkpoints |
Source code in src/locus/memory/backends/memory.py
Adapters¶
StorageBackendAdapter wraps any of the simple key-value backends
above into the full BaseCheckpointer interface. The convenience
factory functions below are one-line shortcuts.
StorageBackendAdapter ¶
Bases: BaseCheckpointer
Adapter that wraps simple storage backends to implement BaseCheckpointer.
Storage backends have a simple interface: - save(thread_id: str, data: dict) -> None - load(thread_id: str) -> dict | None - delete(thread_id: str) -> bool - exists(thread_id: str) -> bool - list_threads() -> list[str]
This adapter converts between AgentState and dict representations.
Key improvement: Checkpoint IDs are now stored IN the backend, not in memory. This ensures persistence across restarts.
Storage schema:
- {thread_id}:{checkpoint_id} -> checkpoint data
- {thread_id}:latest -> latest checkpoint (for quick access)
- {thread_id}:_checkpoints -> list of checkpoint metadata
Example
from locus.memory.backends import RedisBackend from locus.memory.backends.adapters import StorageBackendAdapter
Create storage backend¶
storage = RedisBackend(url="redis://localhost:6379")
Wrap with adapter for use with Agent¶
checkpointer = StorageBackendAdapter(storage)
Use with Agent¶
agent = Agent(model=model, checkpointer=checkpointer)
Initialize adapter with a storage backend.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
backend
|
Any
|
Storage backend with save/load/delete/exists methods |
required |
Source code in src/locus/memory/backends/adapters.py
capabilities
property
¶
Derive capabilities from backend methods.
save
async
¶
save(state: AgentState, thread_id: str, checkpoint_id: str | None = None, metadata: dict[str, Any] | None = None) -> str
Save agent state with persistent checkpoint ID tracking.
Source code in src/locus/memory/backends/adapters.py
load
async
¶
Load agent state from the storage backend.
Source code in src/locus/memory/backends/adapters.py
list_checkpoints
async
¶
List available checkpoints from persistent index.
Source code in src/locus/memory/backends/adapters.py
delete
async
¶
Delete checkpoint(s) with index update.
Source code in src/locus/memory/backends/adapters.py
exists
async
¶
Check if checkpoint exists.
Source code in src/locus/memory/backends/adapters.py
search
async
¶
Delegate to backend search.
Source code in src/locus/memory/backends/adapters.py
query_by_metadata
async
¶
Delegate to backend metadata query.
Source code in src/locus/memory/backends/adapters.py
get_metadata
async
¶
Get checkpoint metadata from index or backend.
Source code in src/locus/memory/backends/adapters.py
vacuum
async
¶
Delegate to backend vacuum.
copy_thread
async
¶
Copy all checkpoints from one thread to another (branching).
Source code in src/locus/memory/backends/adapters.py
list_threads
async
¶
Delegate to backend list_threads.
Source code in src/locus/memory/backends/adapters.py
list_with_metadata
async
¶
Delegate to backend list_with_metadata.
Source code in src/locus/memory/backends/adapters.py
oracle_checkpointer ¶
oracle_checkpointer(dsn: str | None = None, user: str = 'admin', password: str = '', **kwargs: Any) -> StorageBackendAdapter
Create an Oracle Database-backed checkpointer.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
dsn
|
str | None
|
Oracle connection string (e.g., "host:port/service_name") |
None
|
user
|
str
|
Database user |
'admin'
|
password
|
str
|
Database password |
''
|
**kwargs
|
Any
|
Additional OracleBackend options (table_name, wallet_location) |
{}
|
Returns:
| Type | Description |
|---|---|
StorageBackendAdapter
|
StorageBackendAdapter wrapping OracleBackend |
Capabilities
- search: Yes (via JSON search)
- metadata_query: Yes
- vacuum: Yes
- list_threads: Yes
- persistent_checkpoint_ids: Yes
Example
checkpointer = oracle_checkpointer( ... dsn="mydb_high", ... user="admin", ... password="secret", ... ) agent = Agent(model=model, checkpointer=checkpointer)
Source code in src/locus/memory/backends/adapters.py
oci_bucket_checkpointer ¶
oci_bucket_checkpointer(bucket_name: str, namespace: str, prefix: str = 'locus/checkpoints/', **kwargs: Any) -> BaseCheckpointer
Create an OCI Object Storage-backed checkpointer.
OCIBucketBackend is a native BaseCheckpointer — this factory is
kept as a thin convenience alias for parity with the other backend
factories. You can just as well instantiate the class directly.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
bucket_name
|
str
|
OCI bucket name |
required |
namespace
|
str
|
OCI namespace |
required |
prefix
|
str
|
Object prefix |
'locus/checkpoints/'
|
**kwargs
|
Any
|
Additional OCIBucketBackend options (profile_name, auth_type, region) |
{}
|
Example
checkpointer = oci_bucket_checkpointer( ... bucket_name="my-checkpoints", ... namespace="my-namespace", ... ) agent = Agent(config=cfg, checkpointer=checkpointer)
Source code in src/locus/memory/backends/adapters.py
opensearch_checkpointer ¶
opensearch_checkpointer(hosts: list[str] | None = None, index_name: str = 'locus-checkpoints', **kwargs: Any) -> StorageBackendAdapter
Create an OpenSearch-backed checkpointer.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
hosts
|
list[str] | None
|
OpenSearch hosts |
None
|
index_name
|
str
|
Index name for checkpoints |
'locus-checkpoints'
|
**kwargs
|
Any
|
Additional OpenSearchBackend options (username, password, use_ssl) |
{}
|
Returns:
| Type | Description |
|---|---|
StorageBackendAdapter
|
StorageBackendAdapter wrapping OpenSearchBackend |
Capabilities
- search: Yes (full-text search)
- metadata_query: Yes (via get_by_metadata)
- list_threads: Yes
- persistent_checkpoint_ids: Yes
Example
checkpointer = opensearch_checkpointer(hosts=["localhost:9200"]) agent = Agent(model=model, checkpointer=checkpointer)
Source code in src/locus/memory/backends/adapters.py
postgresql_checkpointer ¶
postgresql_checkpointer(host: str = 'localhost', port: int = 5432, database: str = 'locus', user: str = 'postgres', password: str = '', dsn: str | None = None, **kwargs: Any) -> StorageBackendAdapter
Create a PostgreSQL-backed checkpointer.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
host
|
str
|
PostgreSQL host |
'localhost'
|
port
|
int
|
PostgreSQL port |
5432
|
database
|
str
|
Database name |
'locus'
|
user
|
str
|
Database user |
'postgres'
|
password
|
str
|
Database password |
''
|
dsn
|
str | None
|
Connection string (overrides other params) |
None
|
**kwargs
|
Any
|
Additional PostgreSQLBackend options |
{}
|
Returns:
| Type | Description |
|---|---|
StorageBackendAdapter
|
StorageBackendAdapter wrapping PostgreSQLBackend |
Capabilities
- search: Yes (via search_data)
- metadata_query: Yes (via query_by_metadata)
- vacuum: Yes
- list_threads: Yes
- persistent_checkpoint_ids: Yes
Example
checkpointer = postgresql_checkpointer(database="myapp") agent = Agent(model=model, checkpointer=checkpointer)
Source code in src/locus/memory/backends/adapters.py
redis_checkpointer ¶
redis_checkpointer(url: str = 'redis://localhost:6379', prefix: str = 'locus:state:', **kwargs: Any) -> StorageBackendAdapter
Create a Redis-backed checkpointer.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
url
|
str
|
Redis URL |
'redis://localhost:6379'
|
prefix
|
str
|
Key prefix for all checkpoints |
'locus:state:'
|
**kwargs
|
Any
|
Additional RedisBackend options (ttl_seconds, db) |
{}
|
Returns:
| Type | Description |
|---|---|
StorageBackendAdapter
|
StorageBackendAdapter wrapping RedisBackend |
Capabilities
- ttl: Yes (via ttl_seconds)
- list_threads: Yes
- persistent_checkpoint_ids: Yes
Example
checkpointer = redis_checkpointer("redis://localhost:6379") agent = Agent(model=model, checkpointer=checkpointer)