Skip to content

Tools

Decorator

The primary entry point — wrap a Python function and you get a Tool the agent can call. Parameters are introspected from the type hints and the JSON Schema is generated automatically.

tool

tool(fn: Callable[P, R]) -> Tool
tool(fn: None = None, *, name: str | None = None, description: str | None = None, idempotent: bool = False) -> Callable[[Callable[P, R]], Tool]
tool(fn: Callable[P, R] | None = None, *, name: str | None = None, description: str | None = None, idempotent: bool = False) -> Tool | Callable[[Callable[P, R]], Tool]

Decorator to create a tool from a function.

Usage

@tool
def search(query: str) -> str:
    '''Search the knowledge base.'''
    return "results..."

@tool(name="custom_name", description="Custom description")
def my_tool(x: int) -> int:
    return x * 2

@tool(idempotent=True)
def book_flight(flight_id: str, customer_id: str) -> dict:
    '''Book a flight — safe to mark idempotent because repeated calls with the same flight/customer would create duplicate bookings, which we never want.'''
    ...

Parameters:

Name Type Description Default
fn Callable[P, R] | None

The function to wrap

None
name str | None

Override tool name (defaults to function name)

None
description str | None

Override description (defaults to docstring)

None
idempotent bool

If True, the ReAct loop deduplicates calls with matching (name, arguments) within a single agent run. This prevents duplicate side-effects when a model re-issues a tool call it has already made earlier in the same run.

False

Returns:

Type Description
Tool | Callable[[Callable[P, R]], Tool]

Tool instance

Source code in src/locus/tools/decorator.py
def tool(
    fn: Callable[P, R] | None = None,
    *,
    name: str | None = None,
    description: str | None = None,
    idempotent: bool = False,
) -> Tool | Callable[[Callable[P, R]], Tool]:
    """
    Wrap a function as a Tool, either bare or with keyword options.

    Usage:
        @tool
        def search(query: str) -> str:
            '''Search the knowledge base.'''
            return "results..."

        @tool(name="custom_name", description="Custom description")
        def my_tool(x: int) -> int:
            return x * 2

        @tool(idempotent=True)
        def book_flight(flight_id: str, customer_id: str) -> dict:
            '''Book a flight — marked idempotent so a repeated call with
            the same flight/customer does not create a second booking.'''
            ...

    Args:
        fn: The function being decorated. Left as None when the decorator
            is written with parentheses, in which case a decorator is
            returned instead of a Tool.
        name: Tool name to use instead of the one in the generated schema.
        description: Description handed to ``generate_schema`` in place of
            the one it would otherwise derive.
        idempotent: Stored on the Tool. When True, the ReAct loop
            deduplicates calls with matching (name, arguments) within a
            single agent run, so a call the model re-issues does not
            repeat its side-effects.

    Returns:
        A Tool when ``fn`` is given, otherwise a decorator that produces
        one.
    """

    def build(target: Callable[P, R]) -> Tool:
        # Only the "function" section of the generated schema is needed.
        spec = generate_schema(target, description)["function"]
        tool_name = name if name else spec["name"]

        return Tool(
            name=tool_name,
            description=spec["description"],
            parameters=spec["parameters"],
            fn=target,
            idempotent=idempotent,
        )

    if fn is None:
        # Parenthesised form: @tool(name="...")
        return build

    # Bare form: @tool
    return build(fn)

Tool

Bases: BaseModel

A tool that can be called by agents.

Created via the @tool decorator.

idempotent class-attribute instance-attribute

idempotent: bool = False

When True, the ReAct loop deduplicates calls: if the model emits the same (tool_name, arguments) combination that has already been executed earlier in the current agent run, the prior result is reused and the tool function is not invoked again. Use for tools that either have side-effects you don't want duplicated (bookings, transfers, writes) or whose output is stable across the run (config/date lookups).

execute async

execute(ctx: ToolContext | None = None, **kwargs: Any) -> Any

Execute the tool with given arguments.

Parameters:

Name Type Description Default
ctx ToolContext | None

Optional tool context (injected if function accepts it)

None
**kwargs Any

Tool arguments

{}

Returns:

Type Description
Any

Tool result

Source code in src/locus/tools/decorator.py
async def execute(self, ctx: ToolContext | None = None, **kwargs: Any) -> Any:
    """
    Execute the tool with given arguments.

    Coroutine functions are awaited directly. Plain functions run in the
    event loop's default executor so they do not block the loop.

    Args:
        ctx: Optional tool context. It is passed to the function only when
            it is not None and the function declares a parameter named
            ``ctx`` or ``context``.
        **kwargs: Tool arguments, forwarded to the function as keywords.

    Returns:
        The function's result after passing through ``self._format_result``.
    """
    if ctx is not None:
        # Single pass over the signature: the first parameter named
        # "ctx" or "context" (if any) receives the context object.
        ctx_param = next(
            (p for p in inspect.signature(self.fn).parameters if p in ("ctx", "context")),
            None,
        )
        if ctx_param is not None:
            kwargs[ctx_param] = ctx

    # Execute function
    if asyncio.iscoroutinefunction(self.fn):
        result = await self.fn(**kwargs)
    else:
        # Run sync function in thread pool. Propagate the current
        # contextvars context so observability emits (run_id) and
        # any other contextvar-driven instrumentation see the same
        # state inside the worker thread.
        import contextvars  # noqa: PLC0415

        ctxvars_snapshot = contextvars.copy_context()
        # We are inside a coroutine, so a loop is guaranteed to be running;
        # get_running_loop() is the supported way to obtain it here.
        loop = asyncio.get_running_loop()
        result = await loop.run_in_executor(
            None,
            lambda: ctxvars_snapshot.run(self.fn, **kwargs),
        )

    return self._format_result(result)

to_openai_schema

to_openai_schema() -> dict[str, Any]

Get OpenAI-compatible tool schema.

Source code in src/locus/tools/decorator.py
def to_openai_schema(self) -> dict[str, Any]:
    """Return this tool described in the OpenAI function-tool format."""
    function_spec = {
        "name": self.name,
        "description": self.description,
        "parameters": self.parameters,
    }
    return {"type": "function", "function": function_spec}

__call__

__call__(*args: Any, **kwargs: Any) -> Any

Direct invocation of the tool.

Source code in src/locus/tools/decorator.py
def __call__(self, *args: Any, **kwargs: Any) -> Any:
    """Call the wrapped function with the given arguments and return its result as-is."""
    wrapped = self.fn
    return wrapped(*args, **kwargs)

Tool context

Inject per-call context (the agent's state, custom metadata, the hook orchestrator) into a tool by declaring a ToolContext parameter.

ToolContext

Bases: BaseModel

Context passed to tools during execution.

Provides access to agent state, metadata, and utilities.

messages property

messages: list[Any]

Get conversation messages (if state available).

confidence property

confidence: float

Get current confidence score (if state available).

get_metadata

get_metadata(key: str, default: Any = None) -> Any

Get a metadata value.

Source code in src/locus/tools/context.py
def get_metadata(self, key: str, default: Any = None) -> Any:
    """Look up ``key`` in the invocation metadata, falling back to ``default``."""
    metadata = self.invocation_metadata
    return metadata.get(key, default)

get_config

get_config(key: str, default: Any = None) -> Any

Get a tool config value.

Source code in src/locus/tools/context.py
def get_config(self, key: str, default: Any = None) -> Any:
    """Look up ``key`` in the tool config, falling back to ``default``."""
    config = self.tool_config
    return config.get(key, default)

Registry

The agent's compiled tool collection. Built once during Agent.__init__; mutating config.tools directly afterwards has no effect (use Agent.add_tool / add_tools instead).

ToolRegistry

Bases: BaseModel

Registry for managing available tools.

Handles tool registration, lookup, and schema generation.

register

register(tool: Tool) -> None

Register a tool.

Source code in src/locus/tools/registry.py
def register(self, tool: Tool) -> None:
    """Add ``tool`` under its name; raise ValueError if that name is already taken."""
    if tool.name not in self.tools:
        self.tools[tool.name] = tool
        return

    msg = f"Tool already registered: {tool.name}"
    raise ValueError(msg)

register_many

register_many(tools: list[Tool]) -> None

Register multiple tools.

Source code in src/locus/tools/registry.py
def register_many(self, tools: list[Tool]) -> None:
    """Register each tool in ``tools``, in order, via :meth:`register`."""
    for entry in tools:
        self.register(entry)

unregister

unregister(name: str) -> Tool | None

Unregister a tool by name.

Source code in src/locus/tools/registry.py
def unregister(self, name: str) -> Tool | None:
    """Remove and return the tool called ``name``, or None if there is none."""
    removed = self.tools.pop(name, None)
    return removed

get

get(name: str) -> Tool | None

Get a tool by name.

Source code in src/locus/tools/registry.py
def get(self, name: str) -> Tool | None:
    """Return the tool called ``name``, or None when it is not registered."""
    found = self.tools.get(name)
    return found

get_or_raise

get_or_raise(name: str) -> Tool

Get a tool by name, raising if not found.

Source code in src/locus/tools/registry.py
def get_or_raise(self, name: str) -> Tool:
    """Return the tool called ``name``; raise KeyError listing the known names otherwise."""
    found = self.tools.get(name)
    if found is not None:
        return found

    available = list(self.tools.keys())
    msg = f"Tool not found: {name}. Available: {available}"
    raise KeyError(msg)

list_tools

list_tools() -> list[str]

List all registered tool names.

Source code in src/locus/tools/registry.py
def list_tools(self) -> list[str]:
    """Return the names of all registered tools as a new list."""
    return [tool_name for tool_name in self.tools.keys()]

to_openai_schemas

to_openai_schemas() -> list[dict[str, Any]]

Get all tools as OpenAI-compatible schemas.

Source code in src/locus/tools/registry.py
def to_openai_schemas(self) -> list[dict[str, Any]]:
    """Collect the OpenAI-compatible schema of every registered tool."""
    schemas: list[dict[str, Any]] = []
    for registered in self.tools.values():
        schemas.append(registered.to_openai_schema())
    return schemas

__contains__

__contains__(name: str) -> bool

Check if a tool is registered.

Source code in src/locus/tools/registry.py
def __contains__(self, name: str) -> bool:
    """Report whether a tool called ``name`` is registered."""
    registered = self.tools
    return name in registered

__len__

__len__() -> int

Number of registered tools.

Source code in src/locus/tools/registry.py
def __len__(self) -> int:
    """Return how many tools are registered."""
    count = len(self.tools)
    return count

__iter__

__iter__() -> Iterator[Tool]

Iterate over tools.

Source code in src/locus/tools/registry.py
def __iter__(self) -> Iterator[Tool]:  # type: ignore[override]
    """Iterate over the registered Tool objects (values, not names)."""
    registered = self.tools.values()
    return iter(registered)

Executors

The strategy the agent uses to run a batch of tool calls. The default is ConcurrentExecutor (parallel up to max_concurrency); SequentialExecutor runs them one at a time for tools that share non-thread-safe state.

ToolExecutor

Bases: BaseModel, ABC

Base class for tool execution strategies.

Subclasses implement different execution patterns (sequential, concurrent, rate-limited, etc.)

execute abstractmethod async

execute(tool_calls: list[ToolCall], registry: ToolRegistry, ctx_factory: ToolContextFactory | None = None) -> list[ToolResult]

Execute a batch of tool calls.

Parameters:

Name Type Description Default
tool_calls list[ToolCall]

Tool calls to execute

required
registry ToolRegistry

Tool registry to look up tools

required
ctx_factory ToolContextFactory | None

Optional factory for creating tool contexts

None

Returns:

Type Description
list[ToolResult]

List of tool results

Source code in src/locus/tools/executor.py
@abstractmethod
async def execute(
    self,
    tool_calls: list[ToolCall],
    registry: ToolRegistry,
    ctx_factory: ToolContextFactory | None = None,
) -> list[ToolResult]:
    """
    Run a batch of tool calls and return their results.

    This is the abstract hook; concrete executors supply the strategy.

    Args:
        tool_calls: The calls to run.
        registry: Registry used to resolve each call to a tool.
        ctx_factory: Optional factory for building tool contexts.

    Returns:
        The list of tool results.
    """

execute_streaming async

execute_streaming(tool_calls: list[ToolCall], registry: ToolRegistry, ctx_factory: ToolContextFactory | None = None) -> AsyncIterator[tuple[int, ToolResult]]

Yield (input_index, result) as each tool call completes.

The default implementation falls back to :meth:execute and yields in input order — fine for executors with no useful streaming semantics (sequential, single-shot rate-limited). Concurrent executors override this to stream results in completion order.

Breaking out of the consumer async for (or raising inside its body) must cancel any tasks the executor has in flight. Concrete implementations are responsible for that guarantee — the runtime loop relies on it for interrupt-driven sibling cancellation.

Parameters:

Name Type Description Default
tool_calls list[ToolCall]

Tool calls to execute. input_index is the position in this list.

required
registry ToolRegistry

Tool registry to look up tools.

required
ctx_factory ToolContextFactory | None

Optional factory for creating tool contexts.

None

Yields:

Type Description
AsyncIterator[tuple[int, ToolResult]]

Tuples of (input_index, ToolResult), in completion order for concurrent executors, input order for sequential ones.

Source code in src/locus/tools/executor.py
async def execute_streaming(
    self,
    tool_calls: list[ToolCall],
    registry: ToolRegistry,
    ctx_factory: ToolContextFactory | None = None,
) -> AsyncIterator[tuple[int, ToolResult]]:
    """Yield ``(input_index, result)`` pairs for a batch of tool calls.

    This base version has no real streaming: it awaits :meth:`execute`
    for the whole batch and then replays the results in input order.
    That suits executors without useful streaming semantics
    (sequential, single-shot rate-limited). Concurrent executors
    override it to stream in completion order.

    Contract for overrides: when the consumer leaves its ``async for``
    early (``break`` or an exception in the loop body), any tasks still
    in flight must be cancelled. The runtime loop depends on this for
    interrupt-driven sibling cancellation.

    Args:
        tool_calls: Tool calls to execute. ``input_index`` is the
            position in this list.
        registry: Tool registry to look up tools.
        ctx_factory: Optional factory for creating tool contexts.

    Yields:
        Tuples of ``(input_index, ToolResult)``, in input order for
        this default implementation.
    """
    batch = await self.execute(tool_calls, registry, ctx_factory)
    position = 0
    for outcome in batch:
        yield position, outcome
        position += 1

ConcurrentExecutor

Bases: ToolExecutor

Execute tools concurrently with optional concurrency limit.

execute async

execute(tool_calls: list[ToolCall], registry: ToolRegistry, ctx_factory: ToolContextFactory | None = None) -> list[ToolResult]

Execute tools concurrently.

Source code in src/locus/tools/executor.py
async def execute(
    self,
    tool_calls: list[ToolCall],
    registry: ToolRegistry,
    ctx_factory: ToolContextFactory | None = None,
) -> list[ToolResult]:
    """Run all tool calls concurrently and return results in input order.

    At most ``self.max_concurrency`` calls are inside ``_execute_one``
    at any moment; the rest wait on a shared semaphore.
    """
    limiter = asyncio.Semaphore(self.max_concurrency)

    async def guarded(call: ToolCall) -> ToolResult:
        async with limiter:
            outcome = await self._execute_one(call, registry, ctx_factory)
        return outcome

    # gather() preserves argument order, so results line up with tool_calls.
    outcomes = await asyncio.gather(*(guarded(call) for call in tool_calls))
    return [*outcomes]

execute_streaming async

execute_streaming(tool_calls: list[ToolCall], registry: ToolRegistry, ctx_factory: ToolContextFactory | None = None) -> AsyncIterator[tuple[int, ToolResult]]

Yield (input_index, result) in completion order.

Fan out one asyncio.Task per tool call (bounded by max_concurrency via a shared semaphore). Results land on a queue as they finish; the consumer iterates the queue and sees events in completion order. If the consumer breaks early or raises inside the async for body, the finally cancels every still-in-flight task — that's how the runtime loop propagates an interrupt to siblings.

We deliberately use create_task + a stop-marker pattern instead of asyncio.TaskGroup: TaskGroup blocks __aexit__ on every task finishing, which deadlocks the streaming consumer when a producer is cancelled (the cancelled task never puts on the queue, so the consumer's queue.get() blocks forever). The per-task finally here unconditionally posts a stop-marker so the consumer always learns when each task is done, even on cancellation.

Source code in src/locus/tools/executor.py
async def execute_streaming(
    self,
    tool_calls: list[ToolCall],
    registry: ToolRegistry,
    ctx_factory: ToolContextFactory | None = None,
) -> AsyncIterator[tuple[int, ToolResult]]:
    """Yield ``(input_index, result)`` in completion order.

    Fan out one ``asyncio.Task`` per tool call (bounded by
    ``max_concurrency`` via a shared semaphore). Results land on a
    queue as they finish; the consumer iterates the queue and sees
    events in completion order. If the consumer breaks early or
    raises inside the ``async for`` body, the ``finally`` cancels
    every still-in-flight task — that's how the runtime loop
    propagates an interrupt to siblings.

    We deliberately use ``create_task`` + a stop-marker pattern
    instead of ``asyncio.TaskGroup``: TaskGroup blocks ``__aexit__``
    on every task finishing, which deadlocks the streaming consumer
    when a producer is cancelled (the cancelled task never puts on
    the queue, so the consumer's ``queue.get()`` blocks forever).
    The per-task ``finally`` here unconditionally posts a
    stop-marker so the consumer always learns when each task is
    done, even on cancellation.

    Args:
        tool_calls: Tool calls to execute. The yielded ``input_index``
            is the position of the call in this list.
        registry: Tool registry, forwarded to ``_execute_one``.
        ctx_factory: Optional context factory, forwarded to
            ``_execute_one``.

    Yields:
        ``(input_index, result)`` for every call whose
        ``_execute_one`` returned, in the order the results were
        queued. An empty ``tool_calls`` yields nothing.
    """
    # Empty batch: nothing to schedule, so the generator ends at once.
    if not tool_calls:
        return

    semaphore = asyncio.Semaphore(self.max_concurrency)
    # Items: ``(input_index, ToolResult)`` for completed work,
    # ``(input_index, _STOP)`` to signal that a producer finished
    # (normally or via cancellation). Using a sentinel rather than
    # a typed wrapper keeps the per-iteration overhead minimal.
    queue: asyncio.Queue[tuple[int, Any]] = asyncio.Queue()

    async def run_one(input_idx: int, tc: ToolCall) -> None:
        try:
            async with semaphore:
                result = await self._execute_one(tc, registry, ctx_factory)
            # Queued before the stop-marker below, so the consumer sees a
            # task's result before it counts that task as finished.
            queue.put_nowait((input_idx, result))
        finally:
            # Posted on success, exception, AND cancellation —
            # the consumer relies on exactly one stop-marker per
            # task to know when the batch has drained.
            queue.put_nowait((input_idx, _STOP))

    tasks = [
        asyncio.create_task(run_one(i, tc), name=f"tool-{tc.name}-{i}")
        for i, tc in enumerate(tool_calls)
    ]

    try:
        # Count stop-markers rather than results: a task that was
        # cancelled or raised posts a marker but no result.
        remaining = len(tasks)
        while remaining > 0:
            input_idx, item = await queue.get()
            if item is _STOP:
                remaining -= 1
                continue
            yield input_idx, item
    finally:
        for task in tasks:
            if not task.done():
                task.cancel()
        # ``return_exceptions=True`` swallows the CancelledError
        # bubbles so cleanup never raises into the caller.
        # NOTE(review): it also absorbs any exception raised by
        # ``_execute_one``; that call then yields no result and nothing
        # is re-raised. Presumably ``_execute_one`` turns tool failures
        # into a ToolResult itself — confirm.
        await asyncio.gather(*tasks, return_exceptions=True)

SequentialExecutor

Bases: ToolExecutor

Execute tools one at a time.

execute async

execute(tool_calls: list[ToolCall], registry: ToolRegistry, ctx_factory: ToolContextFactory | None = None) -> list[ToolResult]

Execute tools sequentially.

Source code in src/locus/tools/executor.py
async def execute(
    self,
    tool_calls: list[ToolCall],
    registry: ToolRegistry,
    ctx_factory: ToolContextFactory | None = None,
) -> list[ToolResult]:
    """Run the tool calls one after another and return results in input order."""
    # Each call is awaited to completion before the next one starts.
    return [
        await self._execute_one(call, registry, ctx_factory)
        for call in tool_calls
    ]

execute_streaming async

execute_streaming(tool_calls: list[ToolCall], registry: ToolRegistry, ctx_factory: ToolContextFactory | None = None) -> AsyncIterator[tuple[int, ToolResult]]

Yield (input_index, result) per call as it finishes.

Sequential execution means input order == completion order; this impl exists so the runtime loop can treat both executors through the same streaming interface. Breaking from the consumer simply stops further calls — there are no siblings to cancel.

Source code in src/locus/tools/executor.py
async def execute_streaming(
    self,
    tool_calls: list[ToolCall],
    registry: ToolRegistry,
    ctx_factory: ToolContextFactory | None = None,
) -> AsyncIterator[tuple[int, ToolResult]]:
    """Yield ``(input_index, result)`` after each call completes.

    Calls run strictly one at a time, so completion order equals
    input order. The method exists so the runtime loop can drive both
    executors through one streaming interface. If the consumer stops
    iterating, the remaining calls are simply never started — there
    are no sibling tasks to cancel.
    """
    position = 0
    for call in tool_calls:
        outcome = await self._execute_one(call, registry, ctx_factory)
        yield position, outcome
        position += 1

Schema generation

JSON Schema generation from Python type hints / Pydantic models — used by the @tool decorator but also callable directly when you need a schema for an external system (e.g. an MCP server).

generate_schema

generate_schema(fn: Callable[..., Any], description: str | None = None) -> dict[str, Any]

Generate OpenAI-compatible tool schema from a function.

Parameters:

Name Type Description Default
fn Callable[..., Any]

The function to generate schema for

required
description str | None

Override description (uses docstring if not provided)

None

Returns:

Type Description
dict[str, Any]

Tool schema in OpenAI function format

Source code in src/locus/tools/schema.py
def generate_schema(fn: Callable[..., Any], description: str | None = None) -> dict[str, Any]:
    """
    Generate OpenAI-compatible tool schema from a function.

    Each named parameter becomes a property of the ``parameters`` object.
    Parameters without a default are listed as required. The following
    are left out of the schema: ``self``/``cls``, parameters named
    ``ctx``/``context``, parameters typed as a tool context, and variadic
    ``*args`` / ``**kwargs``.

    Args:
        fn: The function to generate schema for
        description: Override description (uses docstring if not provided)

    Returns:
        Tool schema in OpenAI function format
    """
    sig = inspect.signature(fn)
    hints = get_type_hints(fn)

    # Get description from docstring if not provided
    if description is None:
        description = inspect.getdoc(fn) or f"Call the {fn.__name__} function"

    # Parse docstring for parameter descriptions
    param_descriptions = _parse_docstring_params(fn)

    # Build parameters schema
    properties: dict[str, Any] = {}
    required: list[str] = []

    # A JSON arguments object can only address parameters by name, so
    # variadic catch-alls have no schema representation.
    variadic_kinds = (
        inspect.Parameter.VAR_POSITIONAL,
        inspect.Parameter.VAR_KEYWORD,
    )

    for name, param in sig.parameters.items():
        # Skip self, cls, and context parameters
        if name in ("self", "cls", "ctx", "context"):
            continue

        # Skip *args / **kwargs: they would otherwise be emitted as
        # required properties, and ``*args`` cannot be passed by keyword.
        if param.kind in variadic_kinds:
            continue

        # Unannotated parameters are treated as strings
        hint = hints.get(name, str)

        # Skip ToolContext type
        if _is_tool_context(hint):
            continue

        # Convert to JSON schema
        prop = python_type_to_json_type(hint)

        # Add description from docstring
        if name in param_descriptions:
            prop["description"] = param_descriptions[name]

        # A default makes the parameter optional; otherwise it is required
        if param.default is not inspect.Parameter.empty:
            prop["default"] = param.default
        else:
            required.append(name)

        properties[name] = prop

    return {
        "type": "function",
        "function": {
            "name": fn.__name__,
            "description": description,
            "parameters": {
                "type": "object",
                "properties": properties,
                "required": required,
            },
        },
    }

pydantic_to_json_schema

pydantic_to_json_schema(model: type[BaseModel]) -> dict[str, Any]

Convert a Pydantic model to JSON Schema.

Source code in src/locus/tools/schema.py
def pydantic_to_json_schema(model: type[BaseModel]) -> dict[str, Any]:
    """Return the JSON Schema that ``model.model_json_schema()`` produces."""
    schema = model.model_json_schema()
    return schema

Built-in tools

get_today_date

get_today_date() -> dict

Return today's date plus common reference points for date arithmetic.

Call this whenever the user mentions a relative or partial date ("tomorrow", "next Monday", "in ten days", "April 20") so you can convert to an explicit YYYY-MM-DD before calling a date-sensitive tool.

Returns:

Type Description
dict

A dict with:

  • today — today's date (YYYY-MM-DD)
  • weekday — e.g. "Saturday"
  • year — current year
  • tomorrow / day_after_tomorrow
  • next_7_days_by_weekday — map of lower-cased weekday → ISO date for the next seven days, so "Monday" / "Friday" resolve without further arithmetic
  • one_week_from_now / two_weeks_from_now

Source code in src/locus/tools/builtins.py
@tool(idempotent=True)
def get_today_date() -> dict:
    """Return today's date plus common reference points for date arithmetic.

    Call this whenever the user mentions a relative or partial date
    ("tomorrow", "next Monday", "in ten days", "April 20") so you can
    convert to an explicit YYYY-MM-DD before calling a date-sensitive tool.

    Returns:
        A dict with:

        - ``today`` — today's date (YYYY-MM-DD)
        - ``weekday`` — e.g. ``"Saturday"``
        - ``year`` — current year
        - ``tomorrow`` / ``day_after_tomorrow``
        - ``next_7_days_by_weekday`` — map of lower-cased weekday → ISO date
            for the next seven days, so "Monday" / "Friday" resolve without
            further arithmetic
        - ``one_week_from_now`` / ``two_weeks_from_now``
    """
    # The docstring above is left untouched on purpose: it is what the
    # @tool decorator uses as the tool description.
    now = datetime.now().astimezone()
    today = now.date()

    def iso_after(days: int) -> str:
        # ISO date of the day ``days`` days after today.
        return (today + timedelta(days=days)).isoformat()

    # Days +1 through +7, in order.
    upcoming = [today + timedelta(days=offset) for offset in range(1, 8)]
    weekday_lookup = {day.strftime("%A").lower(): day.isoformat() for day in upcoming}

    return {
        "today": today.isoformat(),
        "weekday": now.strftime("%A"),
        "year": today.year,
        "tomorrow": iso_after(1),
        "day_after_tomorrow": iso_after(2),
        "next_7_days_by_weekday": weekday_lookup,
        "one_week_from_now": iso_after(7),
        "two_weeks_from_now": iso_after(14),
    }