Skip to content

Multi-agent

Composition

The agent-level pipelines. Same AgentResult shape at the boundary as a plain Agent.run_sync(...) call — these are drop-in for any place that takes an agent.

SequentialPipeline

Bases: BaseModel

Run agents in order, passing each output as the next agent's prompt.

Each agent receives either the original task (first agent) or the previous agent's output (subsequent agents). A prompt_template can customize how the previous output is passed.

Example

pipeline = SequentialPipeline( ... agents=[researcher, writer, editor], ... prompt_template="Based on the following:\n{previous_output}\n\nOriginal task: {task}", ... ) result = await pipeline.run("Write about quantum computing")

run async

run(task: str) -> PipelineResult

Execute agents sequentially, chaining outputs.

Source code in src/locus/agent/composition.py
async def run(self, task: str) -> PipelineResult:
    """Execute agents sequentially, chaining outputs."""
    # Local import — observability is optional; emit is a no-op
    # outside an active run_context.
    from locus.observability.emit import (  # noqa: PLC0415
        EV_PIPELINE_STAGE_COMPLETED,
        EV_PIPELINE_STAGE_STARTED,
        emit,
    )

    start_time = time.perf_counter()
    outputs: list[str] = []
    current_input = task

    try:
        for i, agent in enumerate(self.agents):
            if i > 0 and outputs:
                # Format prompt with previous output
                current_input = self.prompt_template.format(
                    previous_output=outputs[-1],
                    task=task,
                )

            stage_started = time.perf_counter()
            await emit(
                EV_PIPELINE_STAGE_STARTED,
                pipeline_kind="sequential",
                stage=i,
                stage_count=len(self.agents),
            )
            result = agent.run_sync(current_input)
            output = result.message or ""
            outputs.append(output)
            await emit(
                EV_PIPELINE_STAGE_COMPLETED,
                pipeline_kind="sequential",
                stage=i,
                output_length=len(output),
                duration_ms=(time.perf_counter() - stage_started) * 1000,
                success=True,
            )

        duration_ms = (time.perf_counter() - start_time) * 1000
        return PipelineResult(
            success=True,
            outputs=outputs,
            final_output=outputs[-1] if outputs else "",
            duration_ms=duration_ms,
        )

    except Exception as e:  # noqa: BLE001
        duration_ms = (time.perf_counter() - start_time) * 1000
        return PipelineResult(
            success=False,
            outputs=outputs,
            final_output=outputs[-1] if outputs else "",
            duration_ms=duration_ms,
            error=str(e),
        )

ParallelPipeline

Bases: BaseModel

Run agents concurrently and merge their results.

All agents receive the same task (or custom prompts via task_map). Results are collected and merged using the merge_strategy.

Example

pipeline = ParallelPipeline( ... agents=[fact_checker, analyst, summarizer], ... merge_strategy="concatenate", ... ) result = await pipeline.run("Analyze climate change impacts")

run async

run(task: str, task_map: dict[int, str] | None = None) -> PipelineResult

Execute agents in parallel and merge results.

Parameters:

Name Type Description Default
task str

Default task for all agents

required
task_map dict[int, str] | None

Optional mapping of agent index to custom task

None
Source code in src/locus/agent/composition.py
async def run(self, task: str, task_map: dict[int, str] | None = None) -> PipelineResult:
    """Execute agents in parallel and merge results.

    Args:
        task: Default task for all agents
        task_map: Optional mapping of agent index to custom task
    """
    from locus.observability.emit import (  # noqa: PLC0415
        EV_PIPELINE_FANOUT_COMPLETED,
        EV_PIPELINE_FANOUT_STARTED,
        emit,
    )

    start_time = time.perf_counter()
    await emit(
        EV_PIPELINE_FANOUT_STARTED,
        agent_count=len(self.agents),
        merge_strategy=self.merge_strategy,
    )

    async def run_agent(index: int, agent: Any) -> str:
        prompt = task_map.get(index, task) if task_map else task
        loop = asyncio.get_running_loop()
        result = await loop.run_in_executor(None, agent.run_sync, prompt)
        return result.message or ""

    # ``return_exceptions=True`` so one stuck/failed agent doesn't
    # collapse the whole result into an empty ``outputs=[]`` (which
    # forced every caller into defensive ``if result.success`` and
    # ate which-agent-failed context). Now each slot in ``outputs``
    # is either the agent's reply text or the stringified exception,
    # and ``error`` summarises the per-agent failures.
    tasks = [run_agent(i, agent) for i, agent in enumerate(self.agents)]
    gathered = await asyncio.gather(*tasks, return_exceptions=True)

    outputs: list[str] = []
    agent_errors: list[str] = []
    for i, item in enumerate(gathered):
        if isinstance(item, BaseException):
            outputs.append("")
            agent_errors.append(f"agent[{i}] {type(item).__name__}: {item}")
        else:
            outputs.append(item)

    if self.merge_strategy == "last":
        final = outputs[-1] if outputs else ""
    else:
        final = self.separator.join(o for o in outputs if o)

    duration_ms = (time.perf_counter() - start_time) * 1000
    await emit(
        EV_PIPELINE_FANOUT_COMPLETED,
        agents_succeeded=len(self.agents) - len(agent_errors),
        agents_failed=len(agent_errors),
        duration_ms=duration_ms,
    )
    return PipelineResult(
        success=not agent_errors,
        outputs=outputs,
        final_output=final,
        duration_ms=duration_ms,
        error="; ".join(agent_errors) if agent_errors else None,
    )

LoopAgent

Bases: BaseModel

Run an agent repeatedly until a condition is met.

The agent is called in a loop. After each iteration, the condition function is called with the latest output. If it returns True, the loop stops. A max_loops limit prevents infinite execution.

Example

loop = LoopAgent( ... agent=editor, ... condition=lambda output: "APPROVED" in output, ... max_loops=5, ... loop_prompt="Review and improve:\n{previous_output}\n\nSay APPROVED if quality is good.", ... ) result = await loop.run("Write a haiku about Python")

run async

run(task: str) -> PipelineResult

Execute agent in a loop until condition is met or max_loops reached.

Source code in src/locus/agent/composition.py
async def run(self, task: str) -> PipelineResult:
    """Execute agent in a loop until condition is met or max_loops reached."""
    from locus.observability.emit import (  # noqa: PLC0415
        EV_LOOP_ITERATION_COMPLETED,
        EV_LOOP_ITERATION_STARTED,
        EV_LOOP_TERMINATED,
        emit,
    )

    start_time = time.perf_counter()
    outputs: list[str] = []
    current_input = task
    terminated_by = "max_loops"
    try:
        for i in range(self.max_loops):
            await emit(
                EV_LOOP_ITERATION_STARTED,
                iteration=i,
                max_loops=self.max_loops,
            )
            result = self.agent.run_sync(current_input)
            output = result.message or ""
            outputs.append(output)
            stopped = self.condition(output)
            await emit(
                EV_LOOP_ITERATION_COMPLETED,
                iteration=i,
                output_length=len(output),
                condition_met=stopped,
            )

            # Check termination condition
            if stopped:
                terminated_by = "condition"
                break

            # Prepare next iteration prompt
            if i < self.max_loops - 1:
                current_input = self.loop_prompt.format(
                    previous_output=output,
                    task=task,
                )

        duration_ms = (time.perf_counter() - start_time) * 1000
        await emit(
            EV_LOOP_TERMINATED,
            terminated_by=terminated_by,
            iterations=len(outputs),
            duration_ms=duration_ms,
        )
        return PipelineResult(
            success=True,
            outputs=outputs,
            final_output=outputs[-1] if outputs else "",
            duration_ms=duration_ms,
        )

    except Exception as e:  # noqa: BLE001
        duration_ms = (time.perf_counter() - start_time) * 1000
        return PipelineResult(
            success=False,
            outputs=outputs,
            final_output=outputs[-1] if outputs else "",
            duration_ms=duration_ms,
            error=str(e),
        )

Orchestrator + Specialists

A central Orchestrator routes each turn to one of N domain-focused Specialists. RoutingDecision is the typed decision record; the built-in specialists below are factory-built for common roles.

Orchestrator

Bases: BaseModel

Orchestrator for coordinating specialist agents.

Features: - Selects which specialists to invoke based on the task - Routes tasks to appropriate specialists - Correlates findings from multiple specialists - Summarizes results into a coherent response

register_specialist

register_specialist(specialist: Specialist) -> None

Register a specialist with the orchestrator.

Source code in src/locus/multiagent/orchestrator.py
def register_specialist(self, specialist: Specialist) -> None:
    """Register a specialist with the orchestrator."""
    self.specialists[specialist.id] = specialist

register_specialists

register_specialists(specialists: list[Specialist]) -> None

Register multiple specialists.

Source code in src/locus/multiagent/orchestrator.py
def register_specialists(self, specialists: list[Specialist]) -> None:
    """Register multiple specialists."""
    for specialist in specialists:
        self.register_specialist(specialist)

execute async

execute(task: str, context: dict[str, Any] | None = None) -> OrchestratorResult

Execute the orchestration workflow.

Parameters:

Name Type Description Default
task str

The task to process

required
context dict[str, Any] | None

Optional additional context

None

Returns:

Type Description
OrchestratorResult

OrchestratorResult with summary and all findings

Source code in src/locus/multiagent/orchestrator.py
async def execute(
    self,
    task: str,
    context: dict[str, Any] | None = None,
) -> OrchestratorResult:
    """
    Execute the orchestration workflow.

    Args:
        task: The task to process
        context: Optional additional context

    Returns:
        OrchestratorResult with summary and all findings
    """
    # Local import — keeps observability optional. If the user
    # never enters a run_context, ``emit`` is a no-op and the
    # bus singleton is never instantiated.
    from locus.observability.emit import (  # noqa: PLC0415
        EV_ORCHESTRATOR_DECISION,
        EV_ORCHESTRATOR_ROUTING,
        EV_ORCHESTRATOR_SPECIALISTS_INVOKED,
        EV_ORCHESTRATOR_SUMMARY,
        emit,
    )

    start_time = time.perf_counter()
    decisions: list[RoutingDecision] = []

    try:
        await emit(
            EV_ORCHESTRATOR_ROUTING,
            orchestrator_id=self.id,
            task_preview=task[:160],
            specialist_count=len(self.specialists),
        )

        # Step 1: Make routing decision
        routing_decision = await self._make_routing_decision(task)
        decisions.append(routing_decision)

        await emit(
            EV_ORCHESTRATOR_DECISION,
            orchestrator_id=self.id,
            decision="invoke_specialist",
            specialists_selected=routing_decision.specialists,
            reasoning=routing_decision.reasoning,
        )

        # Step 2: Invoke specialists
        specialist_results = await self._invoke_specialists(task, routing_decision)

        await emit(
            EV_ORCHESTRATOR_SPECIALISTS_INVOKED,
            orchestrator_id=self.id,
            specialists_invoked=list(specialist_results.keys()),
            specialists_succeeded=[sid for sid, r in specialist_results.items() if r.success],
            specialists_failed=[sid for sid, r in specialist_results.items() if not r.success],
        )

        # Step 3: Correlate findings
        correlation_decision = RoutingDecision(
            decision_type="correlate",
            reasoning="Correlating findings from specialists",
        )
        decisions.append(correlation_decision)

        await emit(
            EV_ORCHESTRATOR_DECISION,
            orchestrator_id=self.id,
            decision="correlate",
            reasoning="Correlating specialist findings",
        )

        correlation = await self._correlate_findings(task, specialist_results)

        # Step 4: Summarize
        summary_decision = RoutingDecision(
            decision_type="summarize",
            reasoning="Generating final summary",
        )
        decisions.append(summary_decision)

        await emit(
            EV_ORCHESTRATOR_DECISION,
            orchestrator_id=self.id,
            decision="summarize",
            reasoning="Generating final summary",
        )

        summary = await self._summarize(task, correlation, specialist_results)
        await emit(
            EV_ORCHESTRATOR_SUMMARY,
            orchestrator_id=self.id,
            summary_length=len(summary or ""),
        )

        duration_ms = (time.perf_counter() - start_time) * 1000

        return OrchestratorResult(
            orchestrator_id=self.id,
            success=True,
            summary=summary,
            specialist_results=specialist_results,
            decisions=decisions,
            duration_ms=duration_ms,
        )

    except Exception as e:  # noqa: BLE001
        duration_ms = (time.perf_counter() - start_time) * 1000
        return OrchestratorResult(
            orchestrator_id=self.id,
            success=False,
            decisions=decisions,
            duration_ms=duration_ms,
            error=str(e),
        )

with_model

with_model(model: Any) -> Orchestrator

Return a copy of this orchestrator with the given model.

Source code in src/locus/multiagent/orchestrator.py
def with_model(self, model: Any) -> Orchestrator:
    """Return a copy of this orchestrator with the given model."""
    # Also update specialists with the model
    updated_specialists = {
        spec_id: spec.with_model(model) for spec_id, spec in self.specialists.items()
    }
    return self.model_copy(
        update={
            "model": model,
            "specialists": updated_specialists,
        }
    )

OrchestratorResult

Bases: BaseModel

Result from the orchestrator execution.

RoutingDecision

Bases: BaseModel

A decision made by the orchestrator.

create_orchestrator

create_orchestrator(name: str = 'Orchestrator', specialists: list[Specialist] | None = None, model: Any = None) -> Orchestrator

Create an orchestrator with the given specialists.

Parameters:

Name Type Description Default
name str

Orchestrator name

'Orchestrator'
specialists list[Specialist] | None

List of specialists to register

None
model Any

Model for decision making

None

Returns:

Type Description
Orchestrator

Configured Orchestrator instance

Source code in src/locus/multiagent/orchestrator.py
def create_orchestrator(
    name: str = "Orchestrator",
    specialists: list[Specialist] | None = None,
    model: Any = None,
) -> Orchestrator:
    """
    Create an orchestrator with the given specialists.

    Args:
        name: Orchestrator name
        specialists: List of specialists to register
        model: Model for decision making

    Returns:
        Configured Orchestrator instance
    """
    orchestrator = Orchestrator(name=name, model=model)

    if specialists:
        orchestrator.register_specialists(specialists)

    return orchestrator

Specialist

Bases: BaseModel

A specialist agent focused on a specific domain.

Features: - Domain-specific system prompt - Focused tool set - Optional playbook integration - Confidence-based execution

select_playbook

select_playbook(task: str) -> Playbook | None

Select the most appropriate playbook for a task.

Parameters:

Name Type Description Default
task str

The task description

required

Returns:

Type Description
Playbook | None

Best matching playbook or None

Source code in src/locus/multiagent/specialist.py
def select_playbook(self, task: str) -> Playbook | None:
    """
    Select the most appropriate playbook for a task.

    Args:
        task: The task description

    Returns:
        Best matching playbook or None
    """
    # Simple keyword matching - could be enhanced with embeddings
    task_lower = task.lower()

    best_match: Playbook | None = None
    best_score = 0

    for playbook in self.playbooks:
        # Count matching keywords
        score = 0
        playbook_words = set(playbook.name.lower().split())
        playbook_words.update(playbook.description.lower().split())

        for word in task_lower.split():
            if word in playbook_words:
                score += 1

        if score > best_score:
            best_score = score
            best_match = playbook

    return best_match

execute async

execute(task: str, context: dict[str, Any] | None = None, registry: ToolRegistry | None = None) -> SpecialistResult

Execute the specialist on a task.

Parameters:

Name Type Description Default
task str

The task to perform

required
context dict[str, Any] | None

Optional context from orchestrator or other specialists

None
registry ToolRegistry | None

Tool registry (uses self.tools if not provided)

None

Returns:

Type Description
SpecialistResult

SpecialistResult with output and confidence

Source code in src/locus/multiagent/specialist.py
async def execute(
    self,
    task: str,
    context: dict[str, Any] | None = None,
    registry: ToolRegistry | None = None,
) -> SpecialistResult:
    """
    Execute the specialist on a task.

    Args:
        task: The task to perform
        context: Optional context from orchestrator or other specialists
        registry: Tool registry (uses self.tools if not provided)

    Returns:
        SpecialistResult with output and confidence
    """
    if self.model is None:
        return SpecialistResult(
            specialist_id=self.id,
            specialist_type=self.specialist_type,
            error="No model configured for specialist",
        )

    # Local import — observability is optional; this is a no-op
    # outside a run_context.
    from locus.observability.emit import (  # noqa: PLC0415
        EV_SPECIALIST_COMPLETED,
        EV_SPECIALIST_STARTED,
        emit,
    )

    start_time = time.perf_counter()

    await emit(
        EV_SPECIALIST_STARTED,
        specialist_id=self.id,
        specialist_type=self.specialist_type,
        task_preview=task[:160],
    )

    # Construct the typed event for back-compat consumers that iterate
    # the agent loop's event stream. Stored under a leading underscore
    # because nothing currently consumes the local — the observability
    # emit above is the live publication path.
    _start_event = SpecialistStartEvent(
        specialist_id=self.id,
        specialist_type=self.specialist_type,
        task=task,
    )

    # Select appropriate playbook
    playbook = self.select_playbook(task)

    # Build system prompt
    system_prompt = self._build_system_prompt(task, playbook)

    # Initialize state
    state = AgentState(
        agent_id=self.id,
        max_iterations=self.max_iterations,
        confidence_threshold=self.confidence_threshold,
    )

    # Add system message
    state = state.with_message(Message.system(system_prompt))

    # Add context if provided
    if context:
        context_str = self._format_context(context)
        state = state.with_message(Message.user(context_str))

    # Add task message
    state = state.with_message(Message.user(task))

    # Get tool schemas
    tool_schemas = None
    if self.tools:
        tool_schemas = [tool.to_openai_schema() for tool in self.tools]

    try:
        # Simple single-turn execution for now
        # Full agentic loop would integrate with the main loop system
        response = await self.model.complete(
            messages=list(state.messages),
            tools=tool_schemas,
        )

        # Update state with response
        state = state.with_message(response.message)

        # Extract confidence from response (simple heuristic)
        confidence = self._estimate_confidence(response.message.content or "")

        duration_ms = (time.perf_counter() - start_time) * 1000

        await emit(
            EV_SPECIALIST_COMPLETED,
            specialist_id=self.id,
            specialist_type=self.specialist_type,
            output_preview=(response.message.content or "")[:200],
            confidence=confidence,
            duration_ms=duration_ms,
            success=True,
        )

        # Local construction kept for parity with other typed-event sites.
        complete_event = SpecialistCompleteEvent(  # noqa: F841
            specialist_id=self.id,
            specialist_type=self.specialist_type,
            result=response.message.content,
            confidence=confidence,
            duration_ms=duration_ms,
        )

        return SpecialistResult(
            specialist_id=self.id,
            specialist_type=self.specialist_type,
            output=response.message.content,
            confidence=confidence,
            duration_ms=duration_ms,
            state=state,
        )

    except Exception as e:  # noqa: BLE001
        duration_ms = (time.perf_counter() - start_time) * 1000
        await emit(
            EV_SPECIALIST_COMPLETED,
            specialist_id=self.id,
            specialist_type=self.specialist_type,
            error=str(e),
            duration_ms=duration_ms,
            success=False,
        )
        return SpecialistResult(
            specialist_id=self.id,
            specialist_type=self.specialist_type,
            error=str(e),
            duration_ms=duration_ms,
            state=state,
        )

with_model

with_model(model: Any) -> Specialist

Return a copy of this specialist with the given model.

Source code in src/locus/multiagent/specialist.py
def with_model(self, model: Any) -> Specialist:
    """Return a copy of this specialist with the given model."""
    return self.model_copy(update={"model": model})

SpecialistResult

Bases: BaseModel

Result from a specialist agent execution.

success property

success: bool

Whether the specialist completed successfully.

Playbook

Bases: BaseModel

A predefined procedure for a specialist to follow.

Playbooks provide structured guidance for domain-specific tasks.

to_prompt

to_prompt() -> str

Convert playbook to a prompt for the specialist.

Source code in src/locus/multiagent/specialist.py
def to_prompt(self) -> str:
    """Convert playbook to a prompt for the specialist."""
    lines = [
        f"## Playbook: {self.name}",
        "",
        self.description,
        "",
    ]

    if self.preconditions:
        lines.append("### Preconditions:")
        for pre in self.preconditions:
            lines.append(f"- {pre}")
        lines.append("")

    lines.append("### Steps:")
    for i, step in enumerate(self.steps, 1):
        lines.append(f"{i}. {step.instruction}")
        if step.required_tools:
            lines.append(f"   Tools: {', '.join(step.required_tools)}")
        if step.expected_output:
            lines.append(f"   Expected: {step.expected_output}")
        if step.on_failure:
            lines.append(f"   On failure: {step.on_failure}")

    if self.success_criteria:
        lines.append("")
        lines.append(f"### Success Criteria: {self.success_criteria}")

    return "\n".join(lines)

PlaybookStep

Bases: BaseModel

A step in a playbook procedure.

Built-in specialists

create_code_analyst

create_code_analyst(model: Any = None, tools: list[Tool] | None = None) -> Specialist

Create a code analysis specialist.

Source code in src/locus/multiagent/specialist.py
def create_code_analyst(
    model: Any = None,
    tools: list[Tool] | None = None,
) -> Specialist:
    """Create a code analysis specialist."""
    return Specialist(
        name="Code Analyst",
        specialist_type="code_analyst",
        description="Specializes in analyzing source code, understanding implementations, and identifying potential issues.",
        system_prompt="""You are an expert code analyst. Your responsibilities:
1. Analyze source code for bugs and issues
2. Understand code flow and logic
3. Identify potential performance problems
4. Review error handling
5. Suggest improvements

When analyzing code:
- Trace execution paths
- Look for error handling gaps
- Identify resource leaks
- Check for common antipatterns""",
        tools=tools or [],
        model=model,
    )

create_log_analyst

create_log_analyst(model: Any = None, tools: list[Tool] | None = None) -> Specialist

Create a log analysis specialist.

Source code in src/locus/multiagent/specialist.py
def create_log_analyst(
    model: Any = None,
    tools: list[Tool] | None = None,
) -> Specialist:
    """Create a log analysis specialist."""
    return Specialist(
        name="Log Analyst",
        specialist_type="log_analyst",
        description="Specializes in analyzing log files, identifying patterns, and extracting insights from system logs.",
        system_prompt="""You are an expert log analyst. Your responsibilities:
1. Parse and understand various log formats (syslog, JSON, application logs)
2. Identify error patterns and anomalies
3. Correlate events across log entries
4. Extract actionable insights from log data
5. Summarize findings clearly

When analyzing logs:
- Look for error codes, stack traces, and exception messages
- Note timestamps and event sequences
- Identify recurring patterns
- Highlight severity levels""",
        tools=tools or [],
        model=model,
    )

create_metrics_analyst

create_metrics_analyst(model: Any = None, tools: list[Tool] | None = None) -> Specialist

Create a metrics analysis specialist.

Source code in src/locus/multiagent/specialist.py
def create_metrics_analyst(
    model: Any = None,
    tools: list[Tool] | None = None,
) -> Specialist:
    """Create a metrics analysis specialist."""
    return Specialist(
        name="Metrics Analyst",
        specialist_type="metrics_analyst",
        description="Specializes in analyzing system metrics, identifying anomalies, and understanding performance trends.",
        system_prompt="""You are an expert metrics analyst. Your responsibilities:
1. Analyze time-series metrics data
2. Identify anomalies and deviations from baselines
3. Understand correlations between different metrics
4. Assess system performance and health
5. Provide actionable recommendations

When analyzing metrics:
- Compare against historical baselines
- Look for sudden spikes or drops
- Identify correlating metrics
- Consider seasonality and trends""",
        tools=tools or [],
        model=model,
    )

create_trace_analyst

create_trace_analyst(model: Any = None, tools: list[Tool] | None = None) -> Specialist

Create a distributed trace analysis specialist.

Source code in src/locus/multiagent/specialist.py
def create_trace_analyst(
    model: Any = None,
    tools: list[Tool] | None = None,
) -> Specialist:
    """Create a distributed trace analysis specialist."""
    return Specialist(
        name="Trace Analyst",
        specialist_type="trace_analyst",
        description="Specializes in analyzing distributed traces, understanding service dependencies, and identifying latency issues.",
        system_prompt="""You are an expert distributed systems analyst. Your responsibilities:
1. Analyze distributed traces across services
2. Identify latency bottlenecks
3. Map service dependencies
4. Detect failed spans and error propagation
5. Understand request flow through the system

When analyzing traces:
- Follow the request path through services
- Identify slow spans and their causes
- Look for retry patterns
- Map the dependency graph""",
        tools=tools or [],
        model=model,
    )

Swarm

Self-organizing agents that pick up tasks from a shared queue and share progress via SharedContext.

Swarm

Bases: BaseModel

A self-organizing swarm of agents.

Features: - Agents coordinate autonomously - Shared context/memory for communication - Dynamic task allocation based on capabilities - Parallel execution with coordination

add_agent

add_agent(agent: SwarmAgent) -> Swarm

Add an agent to the swarm.

If the swarm has a model configured and the incoming agent does not, the agent inherits the swarm's model. Without this, the agent's first work_on_task would fail with "No model configured for agent" — which used to be the most common silent-failure mode for new users of the swarm API.

Source code in src/locus/multiagent/swarm.py
def add_agent(self, agent: SwarmAgent) -> Swarm:
    """Add an agent to the swarm.

    If the swarm has a model configured and the incoming agent does
    not, the agent inherits the swarm's model. Without this, the
    agent's first ``work_on_task`` would fail with
    "No model configured for agent" — which used to be the most
    common silent-failure mode for new users of the swarm API.
    """
    if self.model is not None and agent.model is None:
        agent = agent.with_model(self.model)
    self.agents.append(agent)
    return self

add_task

add_task(description: str, priority: int = 0, parent_task_id: str | None = None, metadata: dict[str, Any] | None = None) -> SwarmTask

Add a task to the queue.

Source code in src/locus/multiagent/swarm.py
def add_task(
    self,
    description: str,
    priority: int = 0,
    parent_task_id: str | None = None,
    metadata: dict[str, Any] | None = None,
) -> SwarmTask:
    """Add a task to the queue."""
    task = SwarmTask(
        description=description,
        priority=priority,
        parent_task_id=parent_task_id,
        metadata=metadata or {},
    )
    self.task_queue.append(task)
    # Sort by priority (higher first)
    self.task_queue.sort(key=lambda t: -t.priority)
    return task

execute async

execute(initial_task: str | None = None, decompose_tasks: bool = True) -> SwarmResult

Execute the swarm.

Parameters:

Name Type Description Default
initial_task str | None

Optional initial task to add

None
decompose_tasks bool

Whether to decompose tasks into subtasks

True

Returns:

Type Description
SwarmResult

SwarmResult with all completed work

Source code in src/locus/multiagent/swarm.py
async def execute(
    self,
    initial_task: str | None = None,
    decompose_tasks: bool = True,
) -> SwarmResult:
    """
    Execute the swarm.

    Args:
        initial_task: Optional initial task to add
        decompose_tasks: Whether to decompose tasks into subtasks

    Returns:
        SwarmResult with all completed work
    """
    start_time = time.perf_counter()

    # Add initial task if provided
    if initial_task:
        main_task = self.add_task(initial_task, priority=10)

        # Optionally decompose into subtasks
        if decompose_tasks:
            await self._generate_subtasks(main_task)

    try:
        semaphore = asyncio.Semaphore(self.max_parallel_agents)

        async def run_with_limit(agent: SwarmAgent) -> list[SwarmTask]:
            async with semaphore:
                return await self._run_agent_loop(agent)

        # Group tasks by priority and run in waves (high priority first).
        # Tasks within the same priority wave run in parallel.
        # Lower-priority waves wait for higher-priority waves to complete,
        # so they can see the earlier findings in SharedContext.
        iteration = 0
        all_completed: list[SwarmTask] = []

        # Get unique priority levels (sorted descending = highest first)
        priority_levels = sorted({t.priority for t in self.task_queue}, reverse=True)

        for priority in priority_levels:
            if iteration >= self.max_iterations:
                break

            # Check if there are pending tasks at this priority
            pending_at_level = [
                t
                for t in self.task_queue
                if t.status == TaskStatus.PENDING and t.priority == priority
            ]
            if not pending_at_level:
                continue

            # Run agents for this priority wave.
            # Agents run sequentially to avoid concurrent API issues
            # (some providers return empty responses under parallel load).
            # Each agent claims one task, completes it, then the next agent goes.
            results = []
            for agent in self.agents:
                agent_results = await self._run_agent_loop(agent)
                results.append(agent_results)

            for completed_list in results:
                all_completed.extend(completed_list)

            iteration += 1

        # Handle any remaining pending tasks (fallback)
        remaining = [t for t in self.task_queue if t.status == TaskStatus.PENDING]
        while remaining and iteration < self.max_iterations:
            tasks = [run_with_limit(agent) for agent in self.agents]
            results = await asyncio.gather(*tasks)
            for completed_list in results:
                all_completed.extend(completed_list)
            remaining = [t for t in self.task_queue if t.status == TaskStatus.PENDING]
            iteration += 1

        # Collect results
        completed = [t for t in self.task_queue if t.status == TaskStatus.COMPLETED]
        failed = [t for t in self.task_queue if t.status == TaskStatus.FAILED]

        # Generate summary
        summary = await self._generate_summary()

        duration_ms = (time.perf_counter() - start_time) * 1000

        return SwarmResult(
            swarm_id=self.id,
            success=len(failed) == 0,
            completed_tasks=completed,
            failed_tasks=failed,
            context=self.context,
            summary=summary,
            duration_ms=duration_ms,
        )

    except Exception as e:  # noqa: BLE001
        duration_ms = (time.perf_counter() - start_time) * 1000
        return SwarmResult(
            swarm_id=self.id,
            success=False,
            duration_ms=duration_ms,
            error=str(e),
        )

with_model

with_model(model: Any) -> Swarm

Return a copy with the given model for all agents.

Source code in src/locus/multiagent/swarm.py
def with_model(self, model: Any) -> Swarm:
    """Return a copy with the given model for all agents."""
    updated_agents = [agent.with_model(model) for agent in self.agents]
    return self.model_copy(
        update={
            "model": model,
            "agents": updated_agents,
        }
    )

SharedContext

Bases: BaseModel

Shared context/memory for swarm agents.

All agents can read from and write to this shared state.

add_finding async

add_finding(key: str, value: Any, agent_id: str) -> None

Add a finding to the shared context.

Source code in src/locus/multiagent/swarm.py
async def add_finding(self, key: str, value: Any, agent_id: str) -> None:
    """Add a finding to the shared context."""
    async with self._lock:
        self.findings[key] = value
        self.discovery_log.append(
            {
                "type": "finding",
                "key": key,
                "value": value,
                "agent_id": agent_id,
                "timestamp": datetime.now(UTC).isoformat(),
            }
        )

post_to_blackboard async

post_to_blackboard(key: str, message: str, agent_id: str) -> None

Post a message to the blackboard.

Source code in src/locus/multiagent/swarm.py
async def post_to_blackboard(self, key: str, message: str, agent_id: str) -> None:
    """Post a message to the blackboard."""
    async with self._lock:
        self.blackboard[key] = message
        self.discovery_log.append(
            {
                "type": "blackboard",
                "key": key,
                "message": message,
                "agent_id": agent_id,
                "timestamp": datetime.now(UTC).isoformat(),
            }
        )

record_task_result async

record_task_result(task_id: str, result: str) -> None

Record a task result.

Source code in src/locus/multiagent/swarm.py
async def record_task_result(self, task_id: str, result: str) -> None:
    """Record a task result."""
    async with self._lock:
        self.task_results[task_id] = result

get_summary

get_summary() -> str

Get a summary of the current context state.

Source code in src/locus/multiagent/swarm.py
def get_summary(self) -> str:
    """Get a summary of the current context state."""
    lines = ["## Shared Context Summary"]

    if self.findings:
        lines.append("\n### Findings:")
        for key, value in self.findings.items():
            lines.append(f"- **{key}**: {value}")

    if self.task_results:
        lines.append("\n### Task Results:")
        for tid, result in self.task_results.items():
            snippet = result if len(result) <= 600 else result[:600].rstrip() + "…"
            lines.append(f"- **{tid}**:\n{snippet}")

    if self.blackboard:
        lines.append("\n### Blackboard Messages:")
        for key, msg in self.blackboard.items():
            lines.append(f"- **{key}**: {msg}")

    if len(self.discovery_log) > 5:
        lines.append(f"\n### Recent Activity: ({len(self.discovery_log)} total entries)")
        for entry in self.discovery_log[-5:]:
            lines.append(f"- [{entry['type']}] {entry.get('key', 'unknown')}")

    return "\n".join(lines)

SwarmAgent

Bases: BaseModel

An agent in the swarm.

Autonomously claims and works on tasks from the shared queue.

can_handle

can_handle(task: SwarmTask) -> bool

Decide whether this agent is eligible to claim task.

Resolution order:

  1. Tag set-membership — if the task declares required_tags, every required tag must appear in :attr:capabilities. This is the deterministic path — the swarm's primary discovery mechanism.
  2. Generalist — if neither side declares anything, the agent claims the task (capabilities=[] ⇒ generalist).
  3. Substring fallback — kept for backwards compatibility with pre-tag swarms: if the agent has capabilities but the task has no tags, match keywords against the description.
Source code in src/locus/multiagent/swarm.py
def can_handle(self, task: SwarmTask) -> bool:
    """Decide whether this agent is eligible to claim ``task``.

    Resolution order:

    1. **Tag set-membership** — if the task declares
       ``required_tags``, every required tag must appear in
       :attr:`capabilities`. This is the deterministic path —
       the swarm's primary discovery mechanism.
    2. **Generalist** — if neither side declares anything,
       the agent claims the task (``capabilities=[]`` ⇒
       generalist).
    3. **Substring fallback** — kept for backwards compatibility
       with pre-tag swarms: if the agent has capabilities but the
       task has no tags, match keywords against the description.
    """
    if task.required_tags:
        agent_tags = {c.lower() for c in self.capabilities}
        return all(req.lower() in agent_tags for req in task.required_tags)
    if not self.capabilities:
        return True
    task_lower = task.description.lower()
    return any(cap.lower() in task_lower for cap in self.capabilities)

priority_for_task

priority_for_task(task: SwarmTask) -> float

Score this agent's fit for task in the range [0, 1].

Tag-driven scoring (when the task declares tags):

  • Each required tag the agent advertises adds 1.0 weight.
  • Each preferred tag the agent advertises adds 0.5 weight.
  • Score = (sum of weights) / (max possible weights), clamped.

Falls through to the legacy substring score for tag-less tasks so pre-tag swarms keep their old behaviour.

Source code in src/locus/multiagent/swarm.py
def priority_for_task(self, task: SwarmTask) -> float:
    """Score this agent's fit for ``task`` in the range ``[0, 1]``.

    Tag-driven scoring (when the task declares tags):

    - Each required tag the agent advertises adds 1.0 weight.
    - Each preferred tag the agent advertises adds 0.5 weight.
    - Score = (sum of weights) / (max possible weights), clamped.

    Falls through to the legacy substring score for tag-less
    tasks so pre-tag swarms keep their old behaviour.
    """
    if task.required_tags or task.preferred_tags:
        agent_tags = {c.lower() for c in self.capabilities}
        req_hits = sum(1 for t in task.required_tags if t.lower() in agent_tags)
        pref_hits = sum(1 for t in task.preferred_tags if t.lower() in agent_tags)
        max_weight = float(len(task.required_tags)) + 0.5 * len(task.preferred_tags)
        if max_weight == 0:
            return 0.5
        return min(1.0, (req_hits + 0.5 * pref_hits) / max_weight)
    if not self.capabilities:
        return 0.5
    task_lower = task.description.lower()
    matches = sum(1 for cap in self.capabilities if cap.lower() in task_lower)
    return min(1.0, matches / len(self.capabilities))

work_on_task async

work_on_task(task: SwarmTask, context: SharedContext) -> tuple[str | None, str | None]

Work on a task using the shared context.

Parameters:

Name Type Description Default
task SwarmTask

The task to work on

required
context SharedContext

Shared context with other agents

required

Returns:

Type Description
tuple[str | None, str | None]

Tuple of (result, error)

Source code in src/locus/multiagent/swarm.py
    async def work_on_task(
        self,
        task: SwarmTask,
        context: SharedContext,
    ) -> tuple[str | None, str | None]:
        """
        Work on a task using the shared context.

        Args:
            task: The task to work on
            context: Shared context with other agents

        Returns:
            Tuple of (result, error)
        """
        if self.model is None:
            return None, "No model configured for agent"

        # Build prompt with context
        context_summary = context.get_summary()

        prompt = f"""## Your Role
{self.system_prompt}

## Shared Context
{context_summary}

## Task
{task.description}

## Instructions
1. Analyze the task and shared context
2. If you discover new findings, note them clearly
3. If you need information from other agents, post a request to the blackboard
4. Complete the task to the best of your ability
5. Report your findings clearly

Format your response as:
### Findings
(Any new discoveries)

### Analysis
(Your analysis and conclusions)

### Blackboard (optional)
(Any messages for other agents)"""

        messages = [
            Message.system("You are a collaborative agent in a swarm."),
            Message.user(prompt),
        ]

        try:
            response = await self.model.complete(messages=messages)
            content = response.message.content or ""

            # Extract findings and update context
            await self._extract_and_share(content, context, task.id)

            return content, None

        except Exception as e:  # noqa: BLE001
            return None, str(e)

with_model

with_model(model: Any) -> SwarmAgent

Return a copy with the given model.

Source code in src/locus/multiagent/swarm.py
def with_model(self, model: Any) -> SwarmAgent:
    """Return a copy with the given model."""
    return self.model_copy(update={"model": model})

SwarmTask

Bases: BaseModel

A task in the swarm task queue.

required_tags declares the capability tags an agent must advertise to claim this task — set-membership against :attr:SwarmAgent.capabilities. Empty means any agent may claim it; preferred_tags boost an agent's priority score without being a hard requirement.

Backwards-compat: agents that pre-date this change still work — if no tags are set, the prior substring-match path runs as a fallback (see SwarmAgent.can_handle).

SwarmResult

Bases: BaseModel

Result from swarm execution.

TaskStatus

Bases: StrEnum

Status of a task in the swarm.

create_swarm

create_swarm(name: str = 'Swarm', agents: list[SwarmAgent] | None = None, model: Any = None) -> Swarm

Create a swarm with the given agents.

Parameters:

Name Type Description Default
name str

Swarm name

'Swarm'
agents list[SwarmAgent] | None

List of agents to add

None
model Any

Model for agents and coordination. When provided, any agent in agents that doesn't already carry its own model inherits this one — otherwise Swarm.execute would later report "No model configured for agent" for those agents.

None

Returns:

Type Description
Swarm

Configured Swarm instance

Source code in src/locus/multiagent/swarm.py
def create_swarm(
    name: str = "Swarm",
    agents: list[SwarmAgent] | None = None,
    model: Any = None,
) -> Swarm:
    """
    Create a swarm with the given agents.

    Args:
        name: Swarm name
        agents: List of agents to add
        model: Model for agents and coordination. When provided, any agent
            in ``agents`` that doesn't already carry its own model inherits
            this one — otherwise ``Swarm.execute`` would later report
            "No model configured for agent" for those agents.

    Returns:
        Configured Swarm instance
    """
    swarm = Swarm(name=name, model=model)

    # ``Swarm.add_agent`` propagates the swarm's model into any agent that
    # doesn't already carry one, so we don't repeat that logic here.
    if agents:
        for agent in agents:
            swarm.add_agent(agent)

    return swarm

create_swarm_agent

create_swarm_agent(name: str, capabilities: list[str] | None = None, system_prompt: str = '', model: Any = None) -> SwarmAgent

Create a swarm agent.

Parameters:

Name Type Description Default
name str

Agent name

required
capabilities list[str] | None

List of capability keywords

None
system_prompt str

System prompt for the agent

''
model Any

Model for the agent

None

Returns:

Type Description
SwarmAgent

Configured SwarmAgent instance

Source code in src/locus/multiagent/swarm.py
def create_swarm_agent(
    name: str,
    capabilities: list[str] | None = None,
    system_prompt: str = "",
    model: Any = None,
) -> SwarmAgent:
    """
    Create a swarm agent.

    Args:
        name: Agent name
        capabilities: List of capability keywords
        system_prompt: System prompt for the agent
        model: Model for the agent

    Returns:
        Configured SwarmAgent instance
    """
    return SwarmAgent(
        name=name,
        capabilities=capabilities or [],
        system_prompt=system_prompt,
        model=model,
    )

Handoff

Agent-to-agent context transfer. HandoffReason is the typed reason the source agent emitted; HandoffContext is the carried state.

Handoff

Bases: BaseModel

Manages handoffs between agents.

Features: - Context transfer between agents - State preservation - Handoff event emission - Chain of custody tracking

register_agent

register_agent(agent: HandoffAgent) -> None

Register an agent for handoffs.

Source code in src/locus/multiagent/handoff.py
def register_agent(self, agent: HandoffAgent) -> None:
    """Register an agent for handoffs."""
    self.agents[agent.id] = agent

register_agents

register_agents(agents: list[HandoffAgent]) -> None

Register multiple agents.

Source code in src/locus/multiagent/handoff.py
def register_agents(self, agents: list[HandoffAgent]) -> None:
    """Register multiple agents."""
    for agent in agents:
        self.register_agent(agent)

create_handoff async

create_handoff(source_agent: HandoffAgent, target_agent_id: str, task: str, reason: HandoffReason, state: AgentState | None = None, findings: dict[str, Any] | None = None, instructions: str | None = None) -> HandoffContext

Create a handoff context.

Parameters:

Name Type Description Default
source_agent HandoffAgent

The agent initiating the handoff

required
target_agent_id str

ID of the target agent

required
task str

The original task

required
reason HandoffReason

Reason for the handoff

required
state AgentState | None

Current agent state

None
findings dict[str, Any] | None

Findings to transfer

None
instructions str | None

Specific instructions for target

None

Returns:

Type Description
HandoffContext

HandoffContext for the target agent

Source code in src/locus/multiagent/handoff.py
async def create_handoff(
    self,
    source_agent: HandoffAgent,
    target_agent_id: str,
    task: str,
    reason: HandoffReason,
    state: AgentState | None = None,
    findings: dict[str, Any] | None = None,
    instructions: str | None = None,
) -> HandoffContext:
    """
    Create a handoff context.

    Args:
        source_agent: The agent initiating the handoff
        target_agent_id: ID of the target agent
        task: The original task
        reason: Reason for the handoff
        state: Current agent state
        findings: Findings to transfer
        instructions: Specific instructions for target

    Returns:
        HandoffContext for the target agent
    """
    # Extract information from state
    key_messages: list[Message] = []
    state_snapshot: dict[str, Any] = {}
    conversation_summary: str | None = None
    confidence = 0.0

    if state:
        key_messages = self._extract_key_messages(state)
        conversation_summary = self._summarize_conversation(list(state.messages))
        state_snapshot = {
            "iteration": state.iteration,
            "tool_history": list(state.tool_history[-5:]),
            "errors": list(state.errors[-3:]),
        }
        confidence = state.confidence

    # Build handoff chain
    handoff_chain = [source_agent.id]
    if self.history:
        last_context = self.history[-1]
        if last_context.target_agent_id == source_agent.id:
            handoff_chain = last_context.handoff_chain + [source_agent.id]

    context = HandoffContext(
        source_agent_id=source_agent.id,
        target_agent_id=target_agent_id,
        reason=reason,
        original_task=task,
        conversation_summary=conversation_summary,
        key_messages=key_messages if self.preserve_full_history else [],
        state_snapshot=state_snapshot,
        findings=findings or {},
        confidence=confidence,
        instructions=instructions,
        handoff_chain=handoff_chain,
    )

    self.history.append(context)

    return context

execute_handoff async

execute_handoff(source_agent: HandoffAgent, target_agent_id: str, task: str, reason: HandoffReason, state: AgentState | None = None, findings: dict[str, Any] | None = None, instructions: str | None = None) -> HandoffResult

Execute a complete handoff.

Parameters:

Name Type Description Default
source_agent HandoffAgent

The agent initiating the handoff

required
target_agent_id str

ID of the target agent

required
task str

The original task

required
reason HandoffReason

Reason for the handoff

required
state AgentState | None

Current agent state

None
findings dict[str, Any] | None

Findings to transfer

None
instructions str | None

Specific instructions for target

None

Returns:

Type Description
HandoffResult

HandoffResult from the target agent

Source code in src/locus/multiagent/handoff.py
async def execute_handoff(
    self,
    source_agent: HandoffAgent,
    target_agent_id: str,
    task: str,
    reason: HandoffReason,
    state: AgentState | None = None,
    findings: dict[str, Any] | None = None,
    instructions: str | None = None,
) -> HandoffResult:
    """
    Execute a complete handoff.

    Args:
        source_agent: The agent initiating the handoff
        target_agent_id: ID of the target agent
        task: The original task
        reason: Reason for the handoff
        state: Current agent state
        findings: Findings to transfer
        instructions: Specific instructions for target

    Returns:
        HandoffResult from the target agent
    """
    # Validate target exists
    target_agent = self.agents.get(target_agent_id)
    if target_agent is None:
        return HandoffResult(
            handoff_id="",
            success=False,
            source_agent_id=source_agent.id,
            target_agent_id=target_agent_id,
            error=f"Target agent not found: {target_agent_id}",
        )

    # Check handoff chain limit
    chain_length = len(self.history)
    if chain_length >= self.max_handoff_chain:
        return HandoffResult(
            handoff_id="",
            success=False,
            source_agent_id=source_agent.id,
            target_agent_id=target_agent_id,
            error=f"Maximum handoff chain length ({self.max_handoff_chain}) exceeded",
        )

    # Create handoff context
    context = await self.create_handoff(
        source_agent=source_agent,
        target_agent_id=target_agent_id,
        task=task,
        reason=reason,
        state=state,
        findings=findings,
        instructions=instructions,
    )

    # Local import — observability is optional. The emit calls
    # below are no-ops when there is no run_context active.
    from locus.observability.emit import (  # noqa: PLC0415
        EV_HANDOFF_COMPLETED,
        EV_HANDOFF_INITIATED,
        emit,
    )

    await emit(
        EV_HANDOFF_INITIATED,
        source_agent_id=source_agent.id,
        target_agent_id=target_agent_id,
        reason=getattr(reason, "value", str(reason)),
        context_summary=context.progress_summary,
    )

    # Construct the typed event for back-compat consumers — emit
    # above is the live publication path.
    _handoff_event = HandoffEvent(  # noqa: F841
        source_agent_id=source_agent.id,
        target_agent_id=target_agent_id,
        reason=reason,
        context_summary=context.progress_summary,
    )

    # Execute handoff
    result = await target_agent.receive_handoff(context)
    result.returned_context = context

    await emit(
        EV_HANDOFF_COMPLETED,
        source_agent_id=source_agent.id,
        target_agent_id=target_agent_id,
        success=result.success,
        output_length=len(result.output or ""),
    )

    return result

chain_handoff async

chain_handoff(agent_chain: list[str], task: str, initial_state: AgentState | None = None) -> list[HandoffResult]

Execute a chain of handoffs through multiple agents.

Parameters:

Name Type Description Default
agent_chain list[str]

List of agent IDs to process through

required
task str

The task to process

required
initial_state AgentState | None

Initial state

None

Returns:

Type Description
list[HandoffResult]

List of results from each handoff

Source code in src/locus/multiagent/handoff.py
async def chain_handoff(
    self,
    agent_chain: list[str],
    task: str,
    initial_state: AgentState | None = None,
) -> list[HandoffResult]:
    """
    Execute a chain of handoffs through multiple agents.

    Args:
        agent_chain: List of agent IDs to process through
        task: The task to process
        initial_state: Initial state

    Returns:
        List of results from each handoff
    """
    results: list[HandoffResult] = []
    current_state = initial_state
    current_findings: dict[str, Any] = {}

    for i in range(len(agent_chain) - 1):
        source_id = agent_chain[i]
        target_id = agent_chain[i + 1]

        source_agent = self.agents.get(source_id)
        if source_agent is None:
            results.append(
                HandoffResult(
                    handoff_id="",
                    success=False,
                    source_agent_id=source_id,
                    target_agent_id=target_id,
                    error=f"Source agent not found: {source_id}",
                )
            )
            break

        result = await self.execute_handoff(
            source_agent=source_agent,
            target_agent_id=target_id,
            task=task,
            reason=HandoffReason.DELEGATION,
            state=current_state,
            findings=current_findings,
        )

        results.append(result)

        if not result.success:
            break

        # Update findings for next handoff
        if result.output:
            current_findings[f"from_{source_id}"] = result.output

    return results

HandoffAgent

Bases: BaseModel

An agent that can participate in handoffs.

Supports receiving context from other agents and transferring context when handing off.

receive_handoff async

receive_handoff(context: HandoffContext) -> HandoffResult

Receive a handoff from another agent.

Parameters:

Name Type Description Default
context HandoffContext

The handoff context

required

Returns:

Type Description
HandoffResult

HandoffResult with output

Source code in src/locus/multiagent/handoff.py
async def receive_handoff(
    self,
    context: HandoffContext,
) -> HandoffResult:
    """
    Receive a handoff from another agent.

    Args:
        context: The handoff context

    Returns:
        HandoffResult with output
    """
    if self.model is None:
        return HandoffResult(
            handoff_id=context.handoff_id,
            success=False,
            source_agent_id=context.source_agent_id,
            target_agent_id=self.id,
            error="No model configured for agent",
        )

    start_time = time.perf_counter()

    # Build prompt from context
    handoff_prompt = context.to_prompt()

    # Create messages
    messages = [
        Message.system(self.system_prompt),
        Message.user(handoff_prompt),
    ]

    # Add key messages from context
    for msg in context.key_messages:
        messages.append(msg)

    # Final instruction
    messages.append(
        Message.user("Continue working on the task. Report your findings and conclusions.")
    )

    try:
        # Get tool schemas
        tool_schemas = None
        if self.tools:
            tool_schemas = [tool.to_openai_schema() for tool in self.tools]

        response = await self.model.complete(
            messages=messages,
            tools=tool_schemas,
        )
        content = response.message.content or ""

        # If the provider returned an empty body (some endpoints do this
        # when the prompt is mostly structured headers), retry once with
        # a more direct continuation instruction so callers don't silently
        # receive an empty handoff result.
        if not content.strip():
            retry_messages = [
                *messages,
                Message.user(
                    "Please respond now with your findings and conclusions "
                    "as a short paragraph. Do not return an empty response."
                ),
            ]
            retry_response = await self.model.complete(
                messages=retry_messages,
                tools=tool_schemas,
            )
            content = retry_response.message.content or ""

        # Estimate new confidence
        confidence = self._estimate_confidence(content, context.confidence)

        duration_ms = (time.perf_counter() - start_time) * 1000

        return HandoffResult(
            handoff_id=context.handoff_id,
            success=bool(content.strip()),
            source_agent_id=context.source_agent_id,
            target_agent_id=self.id,
            output=content,
            final_confidence=confidence,
            duration_ms=duration_ms,
            error=None if content.strip() else "model returned empty content twice",
        )

    except Exception as e:  # noqa: BLE001
        duration_ms = (time.perf_counter() - start_time) * 1000
        return HandoffResult(
            handoff_id=context.handoff_id,
            success=False,
            source_agent_id=context.source_agent_id,
            target_agent_id=self.id,
            error=str(e),
            duration_ms=duration_ms,
        )

with_model

with_model(model: Any) -> HandoffAgent

Return a copy with the given model.

Source code in src/locus/multiagent/handoff.py
def with_model(self, model: Any) -> HandoffAgent:
    """Return a copy with the given model."""
    return self.model_copy(update={"model": model})

HandoffContext

Bases: BaseModel

Context transferred during a handoff.

Contains all information needed for the target agent to continue.

to_prompt

to_prompt() -> str

Convert handoff context to a prompt for the target agent.

Source code in src/locus/multiagent/handoff.py
def to_prompt(self) -> str:
    """Convert handoff context to a prompt for the target agent."""
    lines = [
        "## Handoff Context",
        "",
        f"**Reason:** {self.reason.value}",
        f"**From:** {self.source_agent_id}",
        f"**Confidence so far:** {self.confidence:.2f}",
        "",
        "### Original Task",
        self.original_task,
        "",
    ]

    if self.progress_summary:
        lines.extend(
            [
                "### Progress So Far",
                self.progress_summary,
                "",
            ]
        )

    if self.findings:
        lines.append("### Findings")
        for key, value in self.findings.items():
            lines.append(f"- **{key}:** {value}")
        lines.append("")

    if self.conversation_summary:
        lines.extend(
            [
                "### Conversation Summary",
                self.conversation_summary,
                "",
            ]
        )

    if self.instructions:
        lines.extend(
            [
                "### Instructions",
                self.instructions,
                "",
            ]
        )

    if self.handoff_chain:
        lines.extend(
            [
                "### Handoff Chain",
                " -> ".join(self.handoff_chain + [self.target_agent_id]),
                "",
            ]
        )

    return "\n".join(lines)

HandoffReason

Bases: StrEnum

Reason for a handoff between agents.

HandoffEvent

Bases: LocusEvent

Event emitted when a handoff occurs.

HandoffResult

Bases: BaseModel

Result from a handoff operation.

create_handoff_agent

create_handoff_agent(name: str, description: str = '', system_prompt: str = '', tools: list[Tool] | None = None, model: Any = None) -> HandoffAgent

Create a handoff-capable agent.

Parameters:

Name Type Description Default
name str

Agent name

required
description str

Agent description

''
system_prompt str

System prompt

''
tools list[Tool] | None

Available tools

None
model Any

Model for the agent

None

Returns:

Type Description
HandoffAgent

Configured HandoffAgent

Source code in src/locus/multiagent/handoff.py
def create_handoff_agent(
    name: str,
    description: str = "",
    system_prompt: str = "",
    tools: list[Tool] | None = None,
    model: Any = None,
) -> HandoffAgent:
    """
    Create a handoff-capable agent.

    Args:
        name: Agent name
        description: Agent description
        system_prompt: System prompt
        tools: Available tools
        model: Model for the agent

    Returns:
        Configured HandoffAgent
    """
    return HandoffAgent(
        name=name,
        description=description,
        system_prompt=system_prompt,
        tools=tools or [],
        model=model,
    )

create_handoff_manager

create_handoff_manager(agents: list[HandoffAgent] | None = None, max_chain: int = 5) -> Handoff

Create a handoff manager.

Parameters:

Name Type Description Default
agents list[HandoffAgent] | None

Agents to register

None
max_chain int

Maximum handoff chain length

5

Returns:

Type Description
Handoff

Configured Handoff manager

Source code in src/locus/multiagent/handoff.py
def create_handoff_manager(
    agents: list[HandoffAgent] | None = None,
    max_chain: int = 5,
) -> Handoff:
    """
    Create a handoff manager.

    Args:
        agents: Agents to register
        max_chain: Maximum handoff chain length

    Returns:
        Configured Handoff manager
    """
    manager = Handoff(max_handoff_chain=max_chain)

    if agents:
        manager.register_agents(agents)

    return manager

StateGraph

DAG-based workflow with explicit nodes, edges, reducers, and a typed state. The most expressive composition primitive — used by create_research_workflow (see DeepAgent) and the router's compiled Runnables.

StateGraph

Bases: BaseModel

A stateful graph for workflow execution.

Supports: - Conditional edges with dynamic routing - Cycles (optional, with max iteration limit) - Human-in-the-loop interrupts - Map-reduce via Send - Subgraph composition - State reducers for composable updates

model_post_init

model_post_init(__context: Any) -> None

Initialize after model creation.

Source code in src/locus/multiagent/graph.py
def model_post_init(self, __context: Any) -> None:
    """Initialize after model creation."""
    # Add virtual START and END nodes
    if START not in self.nodes:
        self.nodes[START] = Node(
            id=START,
            name="START",
            executor=lambda x: x,  # Pass-through
        )
    if END not in self.nodes:
        self.nodes[END] = Node(
            id=END,
            name="END",
            executor=lambda x: x,  # Pass-through
        )

    # Extract reducers from state schema
    if self.state_schema:
        from locus.core.reducers import extract_reducers_from_model

        self._reducers = extract_reducers_from_model(self.state_schema)

    self._rebuild_adjacency()

set_entry_point

set_entry_point(node_id: str) -> StateGraph

Set the entry point node (after START).

Source code in src/locus/multiagent/graph.py
def set_entry_point(self, node_id: str) -> StateGraph:
    """Set the entry point node (after START)."""
    if node_id not in self.nodes:
        raise ValueError(f"Node not found: {node_id}")
    self._entry_point = node_id
    # Add edge from START to entry point
    self.add_edge(START, node_id)
    return self

set_finish_point

set_finish_point(node_id: str) -> StateGraph

Set a finish point node (before END).

Source code in src/locus/multiagent/graph.py
def set_finish_point(self, node_id: str) -> StateGraph:
    """Set a finish point node (before END)."""
    if node_id not in self.nodes:
        raise ValueError(f"Node not found: {node_id}")
    self.add_edge(node_id, END)
    return self

add_node

add_node(node_id: str | Node, executor: Callable[..., Any] | StateGraph | None = None, *, description: str = '', condition: Callable[[dict[str, Any]], bool] | None = None, max_retries: int = 0, timeout_ms: float | None = None, retry_policy: RetryPolicy | None = None, cache_policy: CachePolicy | None = None, defer: bool = False) -> StateGraph

Add a node to the graph.

Parameters:

Name Type Description Default
node_id str | Node

Unique identifier for the node, or a Node object (for backward compatibility)

required
executor Callable[..., Any] | StateGraph | None

Function or subgraph to execute (optional if node_id is a Node)

None
description str

Node description

''
condition Callable[[dict[str, Any]], bool] | None

Optional condition for execution

None
max_retries int

Retry attempts on failure

0
timeout_ms float | None

Execution timeout

None

Returns:

Type Description
StateGraph

Self for chaining

Source code in src/locus/multiagent/graph.py
def add_node(
    self,
    node_id: str | Node,
    executor: Callable[..., Any] | StateGraph | None = None,
    *,
    description: str = "",
    condition: Callable[[dict[str, Any]], bool] | None = None,
    max_retries: int = 0,
    timeout_ms: float | None = None,
    retry_policy: RetryPolicy | None = None,
    cache_policy: CachePolicy | None = None,
    defer: bool = False,
) -> StateGraph:
    """
    Add a node to the graph.

    Args:
        node_id: Unique identifier for the node, or a Node object (for backward compatibility)
        executor: Function or subgraph to execute (optional if node_id is a Node)
        description: Node description
        condition: Optional condition for execution
        max_retries: Retry attempts on failure
        timeout_ms: Execution timeout

    Returns:
        Self for chaining
    """
    # Support old API: add_node(Node)
    if isinstance(node_id, Node):
        node = node_id
        if node.id in self.nodes:
            raise ValueError(f"Node already exists: {node.id}")
        self.nodes[node.id] = node
        self._rebuild_adjacency()
        return self

    # New API: add_node(node_id, executor)
    if executor is None:
        raise TypeError("add_node() missing 1 required positional argument: 'executor'")

    if node_id in self.nodes:
        raise ValueError(f"Node already exists: {node_id}")

    # Check if executor is a subgraph
    is_subgraph = isinstance(executor, StateGraph)

    # When ``is_subgraph`` is True, ``executor`` is a StateGraph
    # instance; mypy can't narrow the union without an isinstance
    # check, but the precondition is enforced upstream.
    node_executor: Callable[..., Any] = (
        executor.execute  # type: ignore[union-attr]
        if is_subgraph
        else executor  # type: ignore[assignment]
    )
    node = Node(
        id=node_id,
        name=node_id,
        description=description,
        executor=node_executor,
        condition=condition,
        max_retries=max_retries,
        timeout_ms=timeout_ms,
        retry_policy=retry_policy,
        cache_policy=cache_policy,
        defer=defer,
        is_subgraph=is_subgraph,
        subgraph=executor if is_subgraph else None,
    )
    self.nodes[node_id] = node
    self._rebuild_adjacency()
    return self

add_edge

add_edge(source: str | Node, target: str | Node, key_mapping: dict[str, str] | None = None, transform: Callable[[Any], Any] | None = None) -> StateGraph

Add a directed edge between nodes.

Parameters:

Name Type Description Default
source str | Node

Source node ID or Node object

required
target str | Node

Target node ID or Node object

required
key_mapping dict[str, str] | None

Optional key mapping for data transformation

None
transform Callable[[Any], Any] | None

Optional transform function

None

Returns:

Type Description
StateGraph

Self for chaining

Source code in src/locus/multiagent/graph.py
def add_edge(
    self,
    source: str | Node,
    target: str | Node,
    key_mapping: dict[str, str] | None = None,
    transform: Callable[[Any], Any] | None = None,
) -> StateGraph:
    """
    Add a directed edge between nodes.

    Args:
        source: Source node ID or Node object
        target: Target node ID or Node object
        key_mapping: Optional key mapping for data transformation
        transform: Optional transform function

    Returns:
        Self for chaining
    """
    # Support old API: add_edge(Node, Node)
    source_id = source.id if isinstance(source, Node) else source
    target_id = target.id if isinstance(target, Node) else target

    # Allow START and END as valid nodes
    valid_sources = set(self.nodes.keys()) | {START}
    valid_targets = set(self.nodes.keys()) | {END}

    if source_id not in valid_sources:
        raise ValueError(f"Source node not found: {source_id}")
    if target_id not in valid_targets:
        raise ValueError(f"Target node not found: {target_id}")

    edge = Edge(
        source_id=source_id,
        target_id=target_id,
        key_mapping=key_mapping,
        transform=transform,
    )
    self.edges.append(edge)
    self._rebuild_adjacency()

    # Validate no cycles (unless allowed)
    if not self.config.allow_cycles and self._has_cycle():
        self.edges.pop()
        self._rebuild_adjacency()
        raise ValueError(f"Adding edge {source_id} -> {target_id} would create a cycle")

    return self

add_conditional_edges

add_conditional_edges(source: str, router: Callable[[dict[str, Any]], str | list[str]], targets: dict[str, str] | None = None, default: str | None = None) -> StateGraph

Add conditional edges with dynamic routing.

Parameters:

Name Type Description Default
source str

Source node ID

required
router Callable[[dict[str, Any]], str | list[str]]

Function that returns target node ID(s) based on state

required
targets dict[str, str] | None

Optional mapping from router return values to node IDs

None
default str | None

Default target if router returns unmapped value

None

Returns:

Type Description
StateGraph

Self for chaining

Example

def route_by_type(state): return "error" if state["has_error"] else "success"

graph.add_conditional_edges("check", route_by_type, { "error": "handle_error", "success": "continue" })

Source code in src/locus/multiagent/graph.py
def add_conditional_edges(
    self,
    source: str,
    router: Callable[[dict[str, Any]], str | list[str]],
    targets: dict[str, str] | None = None,
    default: str | None = None,
) -> StateGraph:
    """
    Add conditional edges with dynamic routing.

    Args:
        source: Source node ID
        router: Function that returns target node ID(s) based on state
        targets: Optional mapping from router return values to node IDs
        default: Default target if router returns unmapped value

    Returns:
        Self for chaining

    Example:
        def route_by_type(state):
            return "error" if state["has_error"] else "success"

        graph.add_conditional_edges("check", route_by_type, {
            "error": "handle_error",
            "success": "continue"
        })
    """
    if source not in self.nodes:
        raise ValueError(f"Source node not found: {source}")

    cond_edge = ConditionalEdge(
        source_id=source,
        router=router,
        targets=targets or {},
        default_target=default,
    )
    self.conditional_edges.append(cond_edge)
    return self

execute async

execute(inputs: dict[str, Any] | Command | None = None, *, config: GraphConfig | None = None, _event_sink: Any = None) -> GraphResult

Execute the graph.

Parameters:

Name Type Description Default
inputs dict[str, Any] | Command | None

Initial state or Command (for resume)

None
config GraphConfig | None

Optional execution configuration

None
_event_sink Any

Internal-use only. When provided, an async callable (StreamEvent) -> None invoked after each node completes so :meth:stream can yield intermediate events. Public callers should use :meth:stream instead of touching this.

None

Returns:

Type Description
GraphResult

GraphResult with final state and outputs

Source code in src/locus/multiagent/graph.py
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
async def execute(
    self,
    inputs: dict[str, Any] | Command | None = None,
    *,
    config: GraphConfig | None = None,
    _event_sink: Any = None,
) -> GraphResult:
    """
    Execute the graph.

    Args:
        inputs: Initial state or Command (for resume)
        config: Optional execution configuration
        _event_sink: Internal-use only. When provided, an async callable
            ``(StreamEvent) -> None`` invoked after each node completes
            so :meth:`stream` can yield intermediate events. Public
            callers should use :meth:`stream` instead of touching this.

    Returns:
        GraphResult with final state and outputs
    """
    start_time = datetime.now(UTC)
    cfg = config or self.config

    # Handle resume from interrupt
    resume_value = None
    resume_node = None
    if isinstance(inputs, Command) and inputs.has_resume:
        resume_value = inputs.resume
        # Load state from checkpointer if available
        if cfg.checkpointer and cfg.thread_id:
            saved_state = await cfg.checkpointer.load(cfg.thread_id)
            if saved_state:
                inputs = saved_state.metadata.get("graph_state", {})
                resume_node = saved_state.metadata.get("interrupted_node")
                from locus.observability.emit import (  # noqa: PLC0415
                    EV_CHECKPOINT_LOADED,
                    emit,
                )

                await emit(
                    EV_CHECKPOINT_LOADED,
                    thread_id=cfg.thread_id,
                    backend=type(cfg.checkpointer).__name__,
                    resume_node=resume_node,
                )
        else:
            # Without checkpointer, get resume node from state
            state_data = inputs.update or {}
            resume_node = state_data.pop("__resume_node__", None)
            inputs = state_data
    elif isinstance(inputs, Command):
        inputs = inputs.update

    # Initialize state
    state: dict[str, Any] = dict(inputs or {})
    node_results: dict[str, NodeResult] = {}
    execution_order: list[str] = []
    iterations = 0

    # Determine starting node(s)
    if resume_node:
        current_nodes = [resume_node]
    else:
        current_nodes = self._adjacency.get(START, [])
        if not current_nodes and self._entry_point:
            current_nodes = [self._entry_point]

    # Main execution loop
    while current_nodes and iterations < cfg.max_iterations:
        iterations += 1
        next_nodes: list[str] = []

        # Check for interrupt_before
        for node_id in current_nodes:
            if node_id in cfg.interrupt_before:
                # Create a placeholder interrupt value for interrupt_before
                from locus.core.interrupt import InterruptValue

                placeholder_interrupt = InterruptValue(
                    payload={"type": "interrupt_before", "node": node_id},
                    node_id=node_id,
                    graph_id=self.id,
                )
                interrupt_state = InterruptState(
                    interrupt=placeholder_interrupt,
                    node_id=node_id,
                    pending_nodes=current_nodes,
                    state_snapshot=state,
                )
                # Include resume node in final state
                final_state_with_resume = {**state, "__resume_node__": node_id}
                return GraphResult(
                    graph_id=self.id,
                    success=False,
                    node_results=node_results,
                    final_state=final_state_with_resume,
                    execution_order=execution_order,
                    duration_ms=(datetime.now(UTC) - start_time).total_seconds() * 1000,
                    interrupt=interrupt_state,
                    iterations=iterations,
                )

        # Lazy import — observability is opt-in.
        from locus.observability.emit import (  # noqa: PLC0415
            EV_GRAPH_NODE_COMPLETED,
            EV_GRAPH_NODE_STARTED,
            emit,
        )

        # Execute current nodes (parallel if enabled)
        if cfg.parallel and len(current_nodes) > 1:
            tasks = []
            node_span_ids: dict[str, str] = {}
            for node_id in current_nodes:
                if node_id == END:
                    continue
                node = self.nodes[node_id]
                node_inputs = self._gather_inputs(node_id, state)
                is_resume = node_id == resume_node
                span_id = uuid4().hex[:8]
                node_span_ids[node_id] = span_id
                await emit(
                    EV_GRAPH_NODE_STARTED,
                    graph_id=self.id,
                    node_id=node_id,
                    iteration=iterations,
                    span_id=span_id,
                    parallel=True,
                    is_resuming=is_resume,
                )
                tasks.append(
                    node.execute(
                        node_inputs,
                        resume_value=resume_value if is_resume else None,
                        is_resuming=is_resume,
                    )
                )

            if tasks:
                parallel_started = time.perf_counter()
                results = await asyncio.gather(*tasks)
                for node_id, result in zip(
                    [n for n in current_nodes if n != END],
                    results,
                    strict=True,
                ):
                    node_results[node_id] = result
                    execution_order.append(node_id)
                    await emit(
                        EV_GRAPH_NODE_COMPLETED,
                        graph_id=self.id,
                        node_id=node_id,
                        span_id=node_span_ids.get(node_id),
                        status=str(result.status),
                        duration_ms=(time.perf_counter() - parallel_started) * 1000,
                        parallel=True,
                    )
                    await _emit_node_events(
                        _event_sink, node_id, result, state, cfg.stream_mode
                    )
        else:
            # Sequential execution
            for node_id in current_nodes:
                if node_id == END:
                    continue

                node = self.nodes[node_id]
                node_inputs = self._gather_inputs(node_id, state)
                is_resume = node_id == resume_node
                span_id = uuid4().hex[:8]
                started_at = time.perf_counter()
                await emit(
                    EV_GRAPH_NODE_STARTED,
                    graph_id=self.id,
                    node_id=node_id,
                    iteration=iterations,
                    span_id=span_id,
                    parallel=False,
                    is_resuming=is_resume,
                )
                result = await node.execute(
                    node_inputs,
                    resume_value=resume_value if is_resume else None,
                    is_resuming=is_resume,
                )
                node_results[node_id] = result
                execution_order.append(node_id)
                await emit(
                    EV_GRAPH_NODE_COMPLETED,
                    graph_id=self.id,
                    node_id=node_id,
                    span_id=span_id,
                    status=str(result.status),
                    duration_ms=(time.perf_counter() - started_at) * 1000,
                    parallel=False,
                )
                await _emit_node_events(_event_sink, node_id, result, state, cfg.stream_mode)

        # Clear resume context after first node
        resume_node = None
        resume_value = None

        # Process results and determine next nodes
        for node_id in [n for n in current_nodes if n != END]:
            # Use a different name from the earlier ``result`` so mypy
            # doesn't widen the previously narrowed type.
            node_result = node_results.get(node_id)
            if not node_result:
                continue
            result = node_result

            # Handle interrupt
            if result.status == NodeStatus.INTERRUPTED:
                interrupt_state = InterruptState(
                    interrupt=result.output,
                    node_id=node_id,
                    pending_nodes=[n for n in current_nodes if n != node_id],
                    state_snapshot=state,
                )

                # Save to checkpointer if available
                if cfg.checkpointer and cfg.thread_id:
                    await cfg.checkpointer.save(
                        state=None,
                        thread_id=cfg.thread_id,
                        metadata={
                            "graph_state": state,
                            "interrupted_node": node_id,
                            "interrupt": interrupt_state.model_dump(),
                        },
                    )
                    from locus.observability.emit import (  # noqa: PLC0415
                        EV_CHECKPOINT_SAVED,
                        emit,
                    )

                    await emit(
                        EV_CHECKPOINT_SAVED,
                        thread_id=cfg.thread_id,
                        backend=type(cfg.checkpointer).__name__,
                        trigger="graph_interrupt",
                        interrupted_node=node_id,
                    )

                # Store resume node in state for checkpointer-less resumption
                final_state_with_resume = {**state, "__resume_node__": node_id}

                return GraphResult(
                    graph_id=self.id,
                    success=False,
                    node_results=node_results,
                    final_state=final_state_with_resume,
                    execution_order=execution_order,
                    duration_ms=(datetime.now(UTC) - start_time).total_seconds() * 1000,
                    interrupt=interrupt_state,
                    iterations=iterations,
                )

            # Handle successful execution
            if result.success:
                # Apply state updates
                update, command = normalize_node_output(result.output)
                if update:
                    state = self._apply_state_update(state, update)
                    # Store raw output under namespaced key to avoid conflicts
                    state[f"_node_{node_id}"] = result.output

                # Handle Send (map-reduce)
                if result.sends:
                    send_results = await self._execute_sends(result.sends, state)
                    for sr in send_results:
                        if sr.success:
                            state[sr.send_id] = sr.result

                # Determine next nodes
                node_next = self._get_next_nodes(node_id, state, command)
                next_nodes.extend(node_next)

            # Check for interrupt_after
            if node_id in cfg.interrupt_after:
                interrupt_state = InterruptState(
                    interrupt=None,  # type: ignore
                    node_id=node_id,
                    pending_nodes=next_nodes,
                    state_snapshot=state,
                )
                return GraphResult(
                    graph_id=self.id,
                    success=True,
                    node_results=node_results,
                    final_state=state,
                    execution_order=execution_order,
                    duration_ms=(datetime.now(UTC) - start_time).total_seconds() * 1000,
                    interrupt=interrupt_state,
                    iterations=iterations,
                )

        # Check if we've reached END
        if END in current_nodes or END in next_nodes:
            break

        # Move to next nodes (deduplicate)
        current_nodes = list(dict.fromkeys(next_nodes))

    # Calculate duration
    end_time = datetime.now(UTC)
    duration_ms = (end_time - start_time).total_seconds() * 1000

    # Determine final outputs (from last executed nodes)
    final_outputs = {}
    for node_id in reversed(execution_order):
        if node_id in node_results and node_results[node_id].success:
            final_outputs[node_id] = node_results[node_id].output

    # Check success
    success = all(
        r.status in (NodeStatus.COMPLETED, NodeStatus.SKIPPED) for r in node_results.values()
    )

    return GraphResult(
        graph_id=self.id,
        success=success,
        node_results=node_results,
        final_state=state,
        final_outputs=final_outputs,
        execution_order=execution_order,
        duration_ms=duration_ms,
        iterations=iterations,
    )

stream async

stream(inputs: dict[str, Any] | Command | None = None, *, config: GraphConfig | None = None, mode: StreamMode | None = None) -> AsyncIterator[StreamEvent]

Stream graph execution events as nodes complete.

Drives :meth:execute on a background task with an async-queue sink wired in, then yields each node-completion event in real time (instead of collecting them at the end). The final state arrives as the last yielded event in VALUES mode.

Parameters:

Name Type Description Default
inputs dict[str, Any] | Command | None

Initial state or Command (for resume).

None
config GraphConfig | None

Execution configuration.

None
mode StreamMode | None

Stream mode override (VALUES / UPDATES / NODES / DEBUG / CUSTOM). Defaults to config.stream_mode.

None

Yields:

Type Description
AsyncIterator[StreamEvent]

StreamEvent per node, then a terminal VALUES event with

AsyncIterator[StreamEvent]

the final state when mode == VALUES. Re-raises any

AsyncIterator[StreamEvent]

exception the underlying execute raised so callers can react.

Source code in src/locus/multiagent/graph.py
async def stream(
    self,
    inputs: dict[str, Any] | Command | None = None,
    *,
    config: GraphConfig | None = None,
    mode: StreamMode | None = None,
) -> AsyncIterator[StreamEvent]:
    """Stream graph execution events as nodes complete.

    Drives :meth:`execute` on a background task with an async-queue
    sink wired in, then yields each node-completion event in real time
    (instead of collecting them at the end). The final state arrives as
    the last yielded event in ``VALUES`` mode.

    Args:
        inputs: Initial state or Command (for resume).
        config: Execution configuration.
        mode: Stream mode override (``VALUES`` / ``UPDATES`` / ``NODES``
            / ``DEBUG`` / ``CUSTOM``). Defaults to ``config.stream_mode``.

    Yields:
        ``StreamEvent`` per node, then a terminal ``VALUES`` event with
        the final state when ``mode == VALUES``. Re-raises any
        exception the underlying execute raised so callers can react.
    """
    cfg = config or self.config
    # The mode argument is per-call; reflect it in cfg so the sink in
    # execute() emits the right events. We don't mutate the caller's
    # cfg — clone it.
    stream_mode = mode or cfg.stream_mode
    if mode is not None and mode != cfg.stream_mode:
        cfg = cfg.model_copy(update={"stream_mode": mode})

    queue: asyncio.Queue[StreamEvent | None] = asyncio.Queue()
    sentinel = None  # marks normal completion in the queue

    async def _sink(event: StreamEvent) -> None:
        await queue.put(event)

    async def _drive() -> GraphResult:
        # Make the sink visible to ``emit_custom`` calls inside node
        # bodies via the module-level ContextVar. Reset on exit so we
        # don't leak the sink into unrelated tasks.
        token = _active_stream_sink.set(_sink)
        try:
            return await self.execute(inputs, config=cfg, _event_sink=_sink)
        finally:
            _active_stream_sink.reset(token)
            # Signal end-of-stream regardless of success/error so the
            # consumer never deadlocks.
            await queue.put(sentinel)

    task = asyncio.create_task(_drive())

    consumer_broke_early = False
    try:
        while True:
            event = await queue.get()
            if event is sentinel:
                break
            yield event
    except (GeneratorExit, asyncio.CancelledError):
        consumer_broke_early = True
        raise
    finally:
        # If the consumer broke out early, cancel the driver so the
        # background task doesn't leak.
        if consumer_broke_early and not task.done():
            task.cancel()

    # task is done (or was cancelled). Surface any exception execute
    # raised; otherwise emit the terminal final-state event in
    # VALUES mode.
    result = task.result()
    if stream_mode == StreamMode.VALUES:
        yield StreamEvent(mode=stream_mode, data=result.final_state)

ainvoke async

ainvoke(inputs: dict[str, Any] | None = None, config: GraphConfig | None = None, **kwargs: Any) -> dict[str, Any]

LangChain/LangGraph-compatible alias for execute() returning final_state.

Source code in src/locus/multiagent/graph.py
async def ainvoke(
    self,
    inputs: dict[str, Any] | None = None,
    config: GraphConfig | None = None,
    **kwargs: Any,
) -> dict[str, Any]:
    """LangChain/LangGraph-compatible alias for execute() returning final_state."""
    result = await self.execute(inputs or {}, config=config)
    return result.final_state

invoke

invoke(inputs: dict[str, Any] | None = None, config: GraphConfig | None = None, **kwargs: Any) -> dict[str, Any]

Synchronous entry point — LangGraph parity for CompiledStateGraph.invoke.

Thin sync wrapper around :meth:ainvoke. Refuses to run when called from inside a live event loop — use :meth:ainvoke there instead.

Source code in src/locus/multiagent/graph.py
def invoke(
    self,
    inputs: dict[str, Any] | None = None,
    config: GraphConfig | None = None,
    **kwargs: Any,
) -> dict[str, Any]:
    """Synchronous entry point — LangGraph parity for ``CompiledStateGraph.invoke``.

    Thin sync wrapper around :meth:`ainvoke`. Refuses to run when called
    from inside a live event loop — use :meth:`ainvoke` there instead.
    """
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        return asyncio.run(self.ainvoke(inputs, config=config, **kwargs))
    msg = (
        "StateGraph.invoke() called from inside a running event loop. "
        "Use `await graph.ainvoke(...)` from async code."
    )
    raise RuntimeError(msg)

run_sync

run_sync(inputs: dict[str, Any] | None = None, config: GraphConfig | None = None, **kwargs: Any) -> dict[str, Any]

Alias for :meth:invoke — matches the spelling used in the docs/concepts/multi-agent/graph.md examples.

Source code in src/locus/multiagent/graph.py
def run_sync(
    self,
    inputs: dict[str, Any] | None = None,
    config: GraphConfig | None = None,
    **kwargs: Any,
) -> dict[str, Any]:
    """Alias for :meth:`invoke` — matches the spelling used in the
    ``docs/concepts/multi-agent/graph.md`` examples."""
    return self.invoke(inputs, config=config, **kwargs)

astream async

astream(inputs: dict[str, Any] | None = None, config: GraphConfig | None = None, **kwargs: Any) -> AsyncIterator[StreamEvent]

LangChain/LangGraph-compatible alias for stream().

Source code in src/locus/multiagent/graph.py
async def astream(
    self,
    inputs: dict[str, Any] | None = None,
    config: GraphConfig | None = None,
    **kwargs: Any,
) -> AsyncIterator[StreamEvent]:
    """LangChain/LangGraph-compatible alias for stream()."""
    async for event in self.stream(inputs, config=config):
        yield event

get_graph

get_graph() -> StateGraph

LangGraph-compatible alias — returns self.

Self carries .nodes, .edges, .draw_mermaid(), and .draw_ascii() so the LangGraph chain compiled.get_graph().draw_mermaid() works out of the box.

Source code in src/locus/multiagent/graph.py
def get_graph(self) -> StateGraph:
    """LangGraph-compatible alias — returns self.

    Self carries ``.nodes``, ``.edges``, ``.draw_mermaid()``, and
    ``.draw_ascii()`` so the LangGraph chain ``compiled.get_graph().draw_mermaid()``
    works out of the box.
    """
    return self

draw_mermaid

draw_mermaid(*, direction: str = 'TD') -> str

Render this graph as a Mermaid flowchart.

LangGraph parity for compiled.get_graph().draw_mermaid(). Delegates to :func:locus.multiagent.visualize.draw_mermaid.

Source code in src/locus/multiagent/graph.py
def draw_mermaid(self, *, direction: str = "TD") -> str:
    """Render this graph as a Mermaid flowchart.

    LangGraph parity for ``compiled.get_graph().draw_mermaid()``.
    Delegates to :func:`locus.multiagent.visualize.draw_mermaid`.
    """
    from locus.multiagent.visualize import draw_mermaid as _draw_mermaid

    return _draw_mermaid(self, direction=direction)

draw_ascii

draw_ascii() -> str

Render this graph as ASCII.

LangGraph parity for compiled.get_graph().draw_ascii(). Delegates to :func:locus.multiagent.visualize.draw_ascii.

Source code in src/locus/multiagent/graph.py
def draw_ascii(self) -> str:
    """Render this graph as ASCII.

    LangGraph parity for ``compiled.get_graph().draw_ascii()``.
    Delegates to :func:`locus.multiagent.visualize.draw_ascii`.
    """
    from locus.multiagent.visualize import draw_ascii as _draw_ascii

    return _draw_ascii(self)

get_mermaid

get_mermaid(*, direction: str = 'TD') -> str

Alias for :meth:draw_mermaid — matches the spelling used in docs/concepts/multi-agent/graph.md.

Source code in src/locus/multiagent/graph.py
def get_mermaid(self, *, direction: str = "TD") -> str:
    """Alias for :meth:`draw_mermaid` — matches the spelling used in
    ``docs/concepts/multi-agent/graph.md``."""
    return self.draw_mermaid(direction=direction)

aget_state async

aget_state(config: Any = None) -> None

LangGraph-compatible stub — locus uses checkpointer.load directly.

Source code in src/locus/multiagent/graph.py
async def aget_state(self, config: Any = None) -> None:
    """LangGraph-compatible stub — locus uses checkpointer.load directly."""
    return

compile

compile(*, checkpointer: Any | None = None, interrupt_before: list[str] | None = None, interrupt_after: list[str] | None = None, store: Any | None = None) -> StateGraph

Compile the graph with configuration.

Parameters:

Name Type Description Default
checkpointer Any | None

Checkpointer for state persistence

None
interrupt_before list[str] | None

Nodes to pause before

None
interrupt_after list[str] | None

Nodes to pause after

None
store Any | None

Store for cross-thread memory

None

Returns:

Type Description
StateGraph

Configured graph (self)

Source code in src/locus/multiagent/graph.py
def compile(
    self,
    *,
    checkpointer: Any | None = None,
    interrupt_before: list[str] | None = None,
    interrupt_after: list[str] | None = None,
    store: Any | None = None,
) -> StateGraph:
    """
    Compile the graph with configuration.

    Args:
        checkpointer: Checkpointer for state persistence
        interrupt_before: Nodes to pause before
        interrupt_after: Nodes to pause after
        store: Store for cross-thread memory

    Returns:
        Configured graph (self)
    """
    if checkpointer:
        self.config.checkpointer = checkpointer
    if interrupt_before:
        self.config.interrupt_before = interrupt_before
    if interrupt_after:
        self.config.interrupt_after = interrupt_after
    if store:
        self.config.store = store
    return self

GraphConfig

Bases: BaseModel

Configuration for graph execution.

Graph

Graph(**data: Any)

Bases: StateGraph

Legacy Graph class for backwards compatibility.

Use StateGraph for new code.

Source code in src/locus/multiagent/graph.py
def __init__(self, **data: Any):
    # Disable cycles by default for legacy Graph
    if "config" not in data:
        data["config"] = GraphConfig(allow_cycles=False)
    super().__init__(**data)

model_post_init

model_post_init(__context: Any) -> None

Initialize after model creation.

Source code in src/locus/multiagent/graph.py
def model_post_init(self, __context: Any) -> None:
    """Initialize after model creation."""
    # Add virtual START and END nodes
    if START not in self.nodes:
        self.nodes[START] = Node(
            id=START,
            name="START",
            executor=lambda x: x,  # Pass-through
        )
    if END not in self.nodes:
        self.nodes[END] = Node(
            id=END,
            name="END",
            executor=lambda x: x,  # Pass-through
        )

    # Extract reducers from state schema
    if self.state_schema:
        from locus.core.reducers import extract_reducers_from_model

        self._reducers = extract_reducers_from_model(self.state_schema)

    self._rebuild_adjacency()

set_entry_point

set_entry_point(node_id: str) -> StateGraph

Set the entry point node (after START).

Source code in src/locus/multiagent/graph.py
def set_entry_point(self, node_id: str) -> StateGraph:
    """Set the entry point node (after START)."""
    if node_id not in self.nodes:
        raise ValueError(f"Node not found: {node_id}")
    self._entry_point = node_id
    # Add edge from START to entry point
    self.add_edge(START, node_id)
    return self

set_finish_point

set_finish_point(node_id: str) -> StateGraph

Set a finish point node (before END).

Source code in src/locus/multiagent/graph.py
def set_finish_point(self, node_id: str) -> StateGraph:
    """Set a finish point node (before END)."""
    if node_id not in self.nodes:
        raise ValueError(f"Node not found: {node_id}")
    self.add_edge(node_id, END)
    return self

add_node

add_node(node_id: str | Node, executor: Callable[..., Any] | StateGraph | None = None, *, description: str = '', condition: Callable[[dict[str, Any]], bool] | None = None, max_retries: int = 0, timeout_ms: float | None = None, retry_policy: RetryPolicy | None = None, cache_policy: CachePolicy | None = None, defer: bool = False) -> StateGraph

Add a node to the graph.

Parameters:

Name Type Description Default
node_id str | Node

Unique identifier for the node, or a Node object (for backward compatibility)

required
executor Callable[..., Any] | StateGraph | None

Function or subgraph to execute (optional if node_id is a Node)

None
description str

Node description

''
condition Callable[[dict[str, Any]], bool] | None

Optional condition for execution

None
max_retries int

Retry attempts on failure

0
timeout_ms float | None

Execution timeout

None

Returns:

Type Description
StateGraph

Self for chaining

Source code in src/locus/multiagent/graph.py
def add_node(
    self,
    node_id: str | Node,
    executor: Callable[..., Any] | StateGraph | None = None,
    *,
    description: str = "",
    condition: Callable[[dict[str, Any]], bool] | None = None,
    max_retries: int = 0,
    timeout_ms: float | None = None,
    retry_policy: RetryPolicy | None = None,
    cache_policy: CachePolicy | None = None,
    defer: bool = False,
) -> StateGraph:
    """
    Add a node to the graph.

    Args:
        node_id: Unique identifier for the node, or a Node object (for backward compatibility)
        executor: Function or subgraph to execute (optional if node_id is a Node)
        description: Node description
        condition: Optional condition for execution
        max_retries: Retry attempts on failure
        timeout_ms: Execution timeout

    Returns:
        Self for chaining
    """
    # Support old API: add_node(Node)
    if isinstance(node_id, Node):
        node = node_id
        if node.id in self.nodes:
            raise ValueError(f"Node already exists: {node.id}")
        self.nodes[node.id] = node
        self._rebuild_adjacency()
        return self

    # New API: add_node(node_id, executor)
    if executor is None:
        raise TypeError("add_node() missing 1 required positional argument: 'executor'")

    if node_id in self.nodes:
        raise ValueError(f"Node already exists: {node_id}")

    # Check if executor is a subgraph
    is_subgraph = isinstance(executor, StateGraph)

    # When ``is_subgraph`` is True, ``executor`` is a StateGraph
    # instance; mypy can't narrow the union without an isinstance
    # check, but the precondition is enforced upstream.
    node_executor: Callable[..., Any] = (
        executor.execute  # type: ignore[union-attr]
        if is_subgraph
        else executor  # type: ignore[assignment]
    )
    node = Node(
        id=node_id,
        name=node_id,
        description=description,
        executor=node_executor,
        condition=condition,
        max_retries=max_retries,
        timeout_ms=timeout_ms,
        retry_policy=retry_policy,
        cache_policy=cache_policy,
        defer=defer,
        is_subgraph=is_subgraph,
        subgraph=executor if is_subgraph else None,
    )
    self.nodes[node_id] = node
    self._rebuild_adjacency()
    return self

add_edge

add_edge(source: str | Node, target: str | Node, key_mapping: dict[str, str] | None = None, transform: Callable[[Any], Any] | None = None) -> StateGraph

Add a directed edge between nodes.

Parameters:

Name Type Description Default
source str | Node

Source node ID or Node object

required
target str | Node

Target node ID or Node object

required
key_mapping dict[str, str] | None

Optional key mapping for data transformation

None
transform Callable[[Any], Any] | None

Optional transform function

None

Returns:

Type Description
StateGraph

Self for chaining

Source code in src/locus/multiagent/graph.py
def add_edge(
    self,
    source: str | Node,
    target: str | Node,
    key_mapping: dict[str, str] | None = None,
    transform: Callable[[Any], Any] | None = None,
) -> StateGraph:
    """
    Add a directed edge between nodes.

    Args:
        source: Source node ID or Node object
        target: Target node ID or Node object
        key_mapping: Optional key mapping for data transformation
        transform: Optional transform function

    Returns:
        Self for chaining
    """
    # Support old API: add_edge(Node, Node)
    source_id = source.id if isinstance(source, Node) else source
    target_id = target.id if isinstance(target, Node) else target

    # Allow START and END as valid nodes
    valid_sources = set(self.nodes.keys()) | {START}
    valid_targets = set(self.nodes.keys()) | {END}

    if source_id not in valid_sources:
        raise ValueError(f"Source node not found: {source_id}")
    if target_id not in valid_targets:
        raise ValueError(f"Target node not found: {target_id}")

    edge = Edge(
        source_id=source_id,
        target_id=target_id,
        key_mapping=key_mapping,
        transform=transform,
    )
    self.edges.append(edge)
    self._rebuild_adjacency()

    # Validate no cycles (unless allowed)
    if not self.config.allow_cycles and self._has_cycle():
        self.edges.pop()
        self._rebuild_adjacency()
        raise ValueError(f"Adding edge {source_id} -> {target_id} would create a cycle")

    return self

add_conditional_edges

add_conditional_edges(source: str, router: Callable[[dict[str, Any]], str | list[str]], targets: dict[str, str] | None = None, default: str | None = None) -> StateGraph

Add conditional edges with dynamic routing.

Parameters:

Name Type Description Default
source str

Source node ID

required
router Callable[[dict[str, Any]], str | list[str]]

Function that returns target node ID(s) based on state

required
targets dict[str, str] | None

Optional mapping from router return values to node IDs

None
default str | None

Default target if router returns unmapped value

None

Returns:

Type Description
StateGraph

Self for chaining

Example

def route_by_type(state): return "error" if state["has_error"] else "success"

graph.add_conditional_edges("check", route_by_type, { "error": "handle_error", "success": "continue" })

Source code in src/locus/multiagent/graph.py
def add_conditional_edges(
    self,
    source: str,
    router: Callable[[dict[str, Any]], str | list[str]],
    targets: dict[str, str] | None = None,
    default: str | None = None,
) -> StateGraph:
    """
    Add conditional edges with dynamic routing.

    Args:
        source: Source node ID
        router: Function that returns target node ID(s) based on state
        targets: Optional mapping from router return values to node IDs
        default: Default target if router returns unmapped value

    Returns:
        Self for chaining

    Example:
        def route_by_type(state):
            return "error" if state["has_error"] else "success"

        graph.add_conditional_edges("check", route_by_type, {
            "error": "handle_error",
            "success": "continue"
        })
    """
    if source not in self.nodes:
        raise ValueError(f"Source node not found: {source}")

    cond_edge = ConditionalEdge(
        source_id=source,
        router=router,
        targets=targets or {},
        default_target=default,
    )
    self.conditional_edges.append(cond_edge)
    return self

execute async

execute(inputs: dict[str, Any] | Command | None = None, *, config: GraphConfig | None = None, _event_sink: Any = None) -> GraphResult

Execute the graph.

Parameters:

Name Type Description Default
inputs dict[str, Any] | Command | None

Initial state or Command (for resume)

None
config GraphConfig | None

Optional execution configuration

None
_event_sink Any

Internal-use only. When provided, an async callable (StreamEvent) -> None invoked after each node completes so :meth:stream can yield intermediate events. Public callers should use :meth:stream instead of touching this.

None

Returns:

Type Description
GraphResult

GraphResult with final state and outputs

Source code in src/locus/multiagent/graph.py
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
async def execute(
    self,
    inputs: dict[str, Any] | Command | None = None,
    *,
    config: GraphConfig | None = None,
    _event_sink: Any = None,
) -> GraphResult:
    """
    Execute the graph.

    Args:
        inputs: Initial state or Command (for resume)
        config: Optional execution configuration
        _event_sink: Internal-use only. When provided, an async callable
            ``(StreamEvent) -> None`` invoked after each node completes
            so :meth:`stream` can yield intermediate events. Public
            callers should use :meth:`stream` instead of touching this.

    Returns:
        GraphResult with final state and outputs
    """
    start_time = datetime.now(UTC)
    cfg = config or self.config

    # Handle resume from interrupt
    resume_value = None
    resume_node = None
    if isinstance(inputs, Command) and inputs.has_resume:
        resume_value = inputs.resume
        # Load state from checkpointer if available
        if cfg.checkpointer and cfg.thread_id:
            saved_state = await cfg.checkpointer.load(cfg.thread_id)
            if saved_state:
                inputs = saved_state.metadata.get("graph_state", {})
                resume_node = saved_state.metadata.get("interrupted_node")
                from locus.observability.emit import (  # noqa: PLC0415
                    EV_CHECKPOINT_LOADED,
                    emit,
                )

                await emit(
                    EV_CHECKPOINT_LOADED,
                    thread_id=cfg.thread_id,
                    backend=type(cfg.checkpointer).__name__,
                    resume_node=resume_node,
                )
        else:
            # Without checkpointer, get resume node from state
            state_data = inputs.update or {}
            resume_node = state_data.pop("__resume_node__", None)
            inputs = state_data
    elif isinstance(inputs, Command):
        inputs = inputs.update

    # Initialize state
    state: dict[str, Any] = dict(inputs or {})
    node_results: dict[str, NodeResult] = {}
    execution_order: list[str] = []
    iterations = 0

    # Determine starting node(s)
    if resume_node:
        current_nodes = [resume_node]
    else:
        current_nodes = self._adjacency.get(START, [])
        if not current_nodes and self._entry_point:
            current_nodes = [self._entry_point]

    # Main execution loop
    while current_nodes and iterations < cfg.max_iterations:
        iterations += 1
        next_nodes: list[str] = []

        # Check for interrupt_before
        for node_id in current_nodes:
            if node_id in cfg.interrupt_before:
                # Create a placeholder interrupt value for interrupt_before
                from locus.core.interrupt import InterruptValue

                placeholder_interrupt = InterruptValue(
                    payload={"type": "interrupt_before", "node": node_id},
                    node_id=node_id,
                    graph_id=self.id,
                )
                interrupt_state = InterruptState(
                    interrupt=placeholder_interrupt,
                    node_id=node_id,
                    pending_nodes=current_nodes,
                    state_snapshot=state,
                )
                # Include resume node in final state
                final_state_with_resume = {**state, "__resume_node__": node_id}
                return GraphResult(
                    graph_id=self.id,
                    success=False,
                    node_results=node_results,
                    final_state=final_state_with_resume,
                    execution_order=execution_order,
                    duration_ms=(datetime.now(UTC) - start_time).total_seconds() * 1000,
                    interrupt=interrupt_state,
                    iterations=iterations,
                )

        # Lazy import — observability is opt-in.
        from locus.observability.emit import (  # noqa: PLC0415
            EV_GRAPH_NODE_COMPLETED,
            EV_GRAPH_NODE_STARTED,
            emit,
        )

        # Execute current nodes (parallel if enabled)
        if cfg.parallel and len(current_nodes) > 1:
            tasks = []
            node_span_ids: dict[str, str] = {}
            for node_id in current_nodes:
                if node_id == END:
                    continue
                node = self.nodes[node_id]
                node_inputs = self._gather_inputs(node_id, state)
                is_resume = node_id == resume_node
                span_id = uuid4().hex[:8]
                node_span_ids[node_id] = span_id
                await emit(
                    EV_GRAPH_NODE_STARTED,
                    graph_id=self.id,
                    node_id=node_id,
                    iteration=iterations,
                    span_id=span_id,
                    parallel=True,
                    is_resuming=is_resume,
                )
                tasks.append(
                    node.execute(
                        node_inputs,
                        resume_value=resume_value if is_resume else None,
                        is_resuming=is_resume,
                    )
                )

            if tasks:
                parallel_started = time.perf_counter()
                results = await asyncio.gather(*tasks)
                for node_id, result in zip(
                    [n for n in current_nodes if n != END],
                    results,
                    strict=True,
                ):
                    node_results[node_id] = result
                    execution_order.append(node_id)
                    await emit(
                        EV_GRAPH_NODE_COMPLETED,
                        graph_id=self.id,
                        node_id=node_id,
                        span_id=node_span_ids.get(node_id),
                        status=str(result.status),
                        duration_ms=(time.perf_counter() - parallel_started) * 1000,
                        parallel=True,
                    )
                    await _emit_node_events(
                        _event_sink, node_id, result, state, cfg.stream_mode
                    )
        else:
            # Sequential execution
            for node_id in current_nodes:
                if node_id == END:
                    continue

                node = self.nodes[node_id]
                node_inputs = self._gather_inputs(node_id, state)
                is_resume = node_id == resume_node
                span_id = uuid4().hex[:8]
                started_at = time.perf_counter()
                await emit(
                    EV_GRAPH_NODE_STARTED,
                    graph_id=self.id,
                    node_id=node_id,
                    iteration=iterations,
                    span_id=span_id,
                    parallel=False,
                    is_resuming=is_resume,
                )
                result = await node.execute(
                    node_inputs,
                    resume_value=resume_value if is_resume else None,
                    is_resuming=is_resume,
                )
                node_results[node_id] = result
                execution_order.append(node_id)
                await emit(
                    EV_GRAPH_NODE_COMPLETED,
                    graph_id=self.id,
                    node_id=node_id,
                    span_id=span_id,
                    status=str(result.status),
                    duration_ms=(time.perf_counter() - started_at) * 1000,
                    parallel=False,
                )
                await _emit_node_events(_event_sink, node_id, result, state, cfg.stream_mode)

        # Clear resume context after first node
        resume_node = None
        resume_value = None

        # Process results and determine next nodes
        for node_id in [n for n in current_nodes if n != END]:
            # Use a different name from the earlier ``result`` so mypy
            # doesn't widen the previously narrowed type.
            node_result = node_results.get(node_id)
            if not node_result:
                continue
            result = node_result

            # Handle interrupt
            if result.status == NodeStatus.INTERRUPTED:
                interrupt_state = InterruptState(
                    interrupt=result.output,
                    node_id=node_id,
                    pending_nodes=[n for n in current_nodes if n != node_id],
                    state_snapshot=state,
                )

                # Save to checkpointer if available
                if cfg.checkpointer and cfg.thread_id:
                    await cfg.checkpointer.save(
                        state=None,
                        thread_id=cfg.thread_id,
                        metadata={
                            "graph_state": state,
                            "interrupted_node": node_id,
                            "interrupt": interrupt_state.model_dump(),
                        },
                    )
                    from locus.observability.emit import (  # noqa: PLC0415
                        EV_CHECKPOINT_SAVED,
                        emit,
                    )

                    await emit(
                        EV_CHECKPOINT_SAVED,
                        thread_id=cfg.thread_id,
                        backend=type(cfg.checkpointer).__name__,
                        trigger="graph_interrupt",
                        interrupted_node=node_id,
                    )

                # Store resume node in state for checkpointer-less resumption
                final_state_with_resume = {**state, "__resume_node__": node_id}

                return GraphResult(
                    graph_id=self.id,
                    success=False,
                    node_results=node_results,
                    final_state=final_state_with_resume,
                    execution_order=execution_order,
                    duration_ms=(datetime.now(UTC) - start_time).total_seconds() * 1000,
                    interrupt=interrupt_state,
                    iterations=iterations,
                )

            # Handle successful execution
            if result.success:
                # Apply state updates
                update, command = normalize_node_output(result.output)
                if update:
                    state = self._apply_state_update(state, update)
                    # Store raw output under namespaced key to avoid conflicts
                    state[f"_node_{node_id}"] = result.output

                # Handle Send (map-reduce)
                if result.sends:
                    send_results = await self._execute_sends(result.sends, state)
                    for sr in send_results:
                        if sr.success:
                            state[sr.send_id] = sr.result

                # Determine next nodes
                node_next = self._get_next_nodes(node_id, state, command)
                next_nodes.extend(node_next)

            # Check for interrupt_after
            if node_id in cfg.interrupt_after:
                interrupt_state = InterruptState(
                    interrupt=None,  # type: ignore
                    node_id=node_id,
                    pending_nodes=next_nodes,
                    state_snapshot=state,
                )
                return GraphResult(
                    graph_id=self.id,
                    success=True,
                    node_results=node_results,
                    final_state=state,
                    execution_order=execution_order,
                    duration_ms=(datetime.now(UTC) - start_time).total_seconds() * 1000,
                    interrupt=interrupt_state,
                    iterations=iterations,
                )

        # Check if we've reached END
        if END in current_nodes or END in next_nodes:
            break

        # Move to next nodes (deduplicate)
        current_nodes = list(dict.fromkeys(next_nodes))

    # Calculate duration
    end_time = datetime.now(UTC)
    duration_ms = (end_time - start_time).total_seconds() * 1000

    # Determine final outputs (from last executed nodes)
    final_outputs = {}
    for node_id in reversed(execution_order):
        if node_id in node_results and node_results[node_id].success:
            final_outputs[node_id] = node_results[node_id].output

    # Check success
    success = all(
        r.status in (NodeStatus.COMPLETED, NodeStatus.SKIPPED) for r in node_results.values()
    )

    return GraphResult(
        graph_id=self.id,
        success=success,
        node_results=node_results,
        final_state=state,
        final_outputs=final_outputs,
        execution_order=execution_order,
        duration_ms=duration_ms,
        iterations=iterations,
    )

stream async

stream(inputs: dict[str, Any] | Command | None = None, *, config: GraphConfig | None = None, mode: StreamMode | None = None) -> AsyncIterator[StreamEvent]

Stream graph execution events as nodes complete.

Drives :meth:execute on a background task with an async-queue sink wired in, then yields each node-completion event in real time (instead of collecting them at the end). The final state arrives as the last yielded event in VALUES mode.

Parameters:

Name Type Description Default
inputs dict[str, Any] | Command | None

Initial state or Command (for resume).

None
config GraphConfig | None

Execution configuration.

None
mode StreamMode | None

Stream mode override (VALUES / UPDATES / NODES / DEBUG / CUSTOM). Defaults to config.stream_mode.

None

Yields:

Type Description
AsyncIterator[StreamEvent]

StreamEvent per node, then a terminal VALUES event with

AsyncIterator[StreamEvent]

the final state when mode == VALUES. Re-raises any

AsyncIterator[StreamEvent]

exception the underlying execute raised so callers can react.

Source code in src/locus/multiagent/graph.py
async def stream(
    self,
    inputs: dict[str, Any] | Command | None = None,
    *,
    config: GraphConfig | None = None,
    mode: StreamMode | None = None,
) -> AsyncIterator[StreamEvent]:
    """Stream graph execution events as nodes complete.

    Drives :meth:`execute` on a background task with an async-queue
    sink wired in, then yields each node-completion event in real time
    (instead of collecting them at the end). The final state arrives as
    the last yielded event in ``VALUES`` mode.

    Args:
        inputs: Initial state or Command (for resume).
        config: Execution configuration.
        mode: Stream mode override (``VALUES`` / ``UPDATES`` / ``NODES``
            / ``DEBUG`` / ``CUSTOM``). Defaults to ``config.stream_mode``.

    Yields:
        ``StreamEvent`` per node, then a terminal ``VALUES`` event with
        the final state when ``mode == VALUES``. Re-raises any
        exception the underlying execute raised so callers can react.
    """
    cfg = config or self.config
    # The mode argument is per-call; reflect it in cfg so the sink in
    # execute() emits the right events. We don't mutate the caller's
    # cfg — clone it.
    stream_mode = mode or cfg.stream_mode
    if mode is not None and mode != cfg.stream_mode:
        cfg = cfg.model_copy(update={"stream_mode": mode})

    queue: asyncio.Queue[StreamEvent | None] = asyncio.Queue()
    sentinel = None  # marks normal completion in the queue

    async def _sink(event: StreamEvent) -> None:
        await queue.put(event)

    async def _drive() -> GraphResult:
        # Make the sink visible to ``emit_custom`` calls inside node
        # bodies via the module-level ContextVar. Reset on exit so we
        # don't leak the sink into unrelated tasks.
        token = _active_stream_sink.set(_sink)
        try:
            return await self.execute(inputs, config=cfg, _event_sink=_sink)
        finally:
            _active_stream_sink.reset(token)
            # Signal end-of-stream regardless of success/error so the
            # consumer never deadlocks.
            await queue.put(sentinel)

    task = asyncio.create_task(_drive())

    consumer_broke_early = False
    try:
        while True:
            event = await queue.get()
            if event is sentinel:
                break
            yield event
    except (GeneratorExit, asyncio.CancelledError):
        consumer_broke_early = True
        raise
    finally:
        # If the consumer broke out early, cancel the driver so the
        # background task doesn't leak.
        if consumer_broke_early and not task.done():
            task.cancel()

    # task is done (or was cancelled). Surface any exception execute
    # raised; otherwise emit the terminal final-state event in
    # VALUES mode.
    result = task.result()
    if stream_mode == StreamMode.VALUES:
        yield StreamEvent(mode=stream_mode, data=result.final_state)

ainvoke async

ainvoke(inputs: dict[str, Any] | None = None, config: GraphConfig | None = None, **kwargs: Any) -> dict[str, Any]

LangChain/LangGraph-compatible alias for execute() returning final_state.

Source code in src/locus/multiagent/graph.py
async def ainvoke(
    self,
    inputs: dict[str, Any] | None = None,
    config: GraphConfig | None = None,
    **kwargs: Any,
) -> dict[str, Any]:
    """LangChain/LangGraph-compatible alias for execute() returning final_state."""
    result = await self.execute(inputs or {}, config=config)
    return result.final_state

invoke

invoke(inputs: dict[str, Any] | None = None, config: GraphConfig | None = None, **kwargs: Any) -> dict[str, Any]

Synchronous entry point — LangGraph parity for CompiledStateGraph.invoke.

Thin sync wrapper around :meth:ainvoke. Refuses to run when called from inside a live event loop — use :meth:ainvoke there instead.

Source code in src/locus/multiagent/graph.py
def invoke(
    self,
    inputs: dict[str, Any] | None = None,
    config: GraphConfig | None = None,
    **kwargs: Any,
) -> dict[str, Any]:
    """Synchronous entry point — LangGraph parity for ``CompiledStateGraph.invoke``.

    Thin sync wrapper around :meth:`ainvoke`. Refuses to run when called
    from inside a live event loop — use :meth:`ainvoke` there instead.
    """
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        return asyncio.run(self.ainvoke(inputs, config=config, **kwargs))
    msg = (
        "StateGraph.invoke() called from inside a running event loop. "
        "Use `await graph.ainvoke(...)` from async code."
    )
    raise RuntimeError(msg)

run_sync

run_sync(inputs: dict[str, Any] | None = None, config: GraphConfig | None = None, **kwargs: Any) -> dict[str, Any]

Alias for :meth:invoke — matches the spelling used in the docs/concepts/multi-agent/graph.md examples.

Source code in src/locus/multiagent/graph.py
def run_sync(
    self,
    inputs: dict[str, Any] | None = None,
    config: GraphConfig | None = None,
    **kwargs: Any,
) -> dict[str, Any]:
    """Alias for :meth:`invoke` — matches the spelling used in the
    ``docs/concepts/multi-agent/graph.md`` examples."""
    return self.invoke(inputs, config=config, **kwargs)

astream async

astream(inputs: dict[str, Any] | None = None, config: GraphConfig | None = None, **kwargs: Any) -> AsyncIterator[StreamEvent]

LangChain/LangGraph-compatible alias for stream().

Source code in src/locus/multiagent/graph.py
async def astream(
    self,
    inputs: dict[str, Any] | None = None,
    config: GraphConfig | None = None,
    **kwargs: Any,
) -> AsyncIterator[StreamEvent]:
    """LangChain/LangGraph-compatible alias for stream()."""
    async for event in self.stream(inputs, config=config):
        yield event

get_graph

get_graph() -> StateGraph

LangGraph-compatible alias — returns self.

Self carries .nodes, .edges, .draw_mermaid(), and .draw_ascii() so the LangGraph chain compiled.get_graph().draw_mermaid() works out of the box.

Source code in src/locus/multiagent/graph.py
def get_graph(self) -> StateGraph:
    """LangGraph-compatible alias — returns self.

    Self carries ``.nodes``, ``.edges``, ``.draw_mermaid()``, and
    ``.draw_ascii()`` so the LangGraph chain ``compiled.get_graph().draw_mermaid()``
    works out of the box.
    """
    return self

draw_mermaid

draw_mermaid(*, direction: str = 'TD') -> str

Render this graph as a Mermaid flowchart.

LangGraph parity for compiled.get_graph().draw_mermaid(). Delegates to :func:locus.multiagent.visualize.draw_mermaid.

Source code in src/locus/multiagent/graph.py
def draw_mermaid(self, *, direction: str = "TD") -> str:
    """Render this graph as a Mermaid flowchart.

    LangGraph parity for ``compiled.get_graph().draw_mermaid()``.
    Delegates to :func:`locus.multiagent.visualize.draw_mermaid`.
    """
    from locus.multiagent.visualize import draw_mermaid as _draw_mermaid

    return _draw_mermaid(self, direction=direction)

draw_ascii

draw_ascii() -> str

Render this graph as ASCII.

LangGraph parity for compiled.get_graph().draw_ascii(). Delegates to :func:locus.multiagent.visualize.draw_ascii.

Source code in src/locus/multiagent/graph.py
def draw_ascii(self) -> str:
    """Render this graph as ASCII.

    LangGraph parity for ``compiled.get_graph().draw_ascii()``.
    Delegates to :func:`locus.multiagent.visualize.draw_ascii`.
    """
    from locus.multiagent.visualize import draw_ascii as _draw_ascii

    return _draw_ascii(self)

get_mermaid

get_mermaid(*, direction: str = 'TD') -> str

Alias for :meth:draw_mermaid — matches the spelling used in docs/concepts/multi-agent/graph.md.

Source code in src/locus/multiagent/graph.py
def get_mermaid(self, *, direction: str = "TD") -> str:
    """Alias for :meth:`draw_mermaid` — matches the spelling used in
    ``docs/concepts/multi-agent/graph.md``."""
    return self.draw_mermaid(direction=direction)

aget_state async

aget_state(config: Any = None) -> None

LangGraph-compatible stub — locus uses checkpointer.load directly.

Source code in src/locus/multiagent/graph.py
async def aget_state(self, config: Any = None) -> None:
    """LangGraph-compatible stub — locus uses checkpointer.load directly."""
    return

compile

compile(*, checkpointer: Any | None = None, interrupt_before: list[str] | None = None, interrupt_after: list[str] | None = None, store: Any | None = None) -> StateGraph

Compile the graph with configuration.

Parameters:

Name Type Description Default
checkpointer Any | None

Checkpointer for state persistence

None
interrupt_before list[str] | None

Nodes to pause before

None
interrupt_after list[str] | None

Nodes to pause after

None
store Any | None

Store for cross-thread memory

None

Returns:

Type Description
StateGraph

Configured graph (self)

Source code in src/locus/multiagent/graph.py
def compile(
    self,
    *,
    checkpointer: Any | None = None,
    interrupt_before: list[str] | None = None,
    interrupt_after: list[str] | None = None,
    store: Any | None = None,
) -> StateGraph:
    """
    Compile the graph with configuration.

    Args:
        checkpointer: Checkpointer for state persistence
        interrupt_before: Nodes to pause before
        interrupt_after: Nodes to pause after
        store: Store for cross-thread memory

    Returns:
        Configured graph (self)
    """
    if checkpointer:
        self.config.checkpointer = checkpointer
    if interrupt_before:
        self.config.interrupt_before = interrupt_before
    if interrupt_after:
        self.config.interrupt_after = interrupt_after
    if store:
        self.config.store = store
    return self

GraphResult

Bases: BaseModel

Result from executing a graph.

is_interrupted property

is_interrupted: bool

Whether execution was interrupted for human input.

Node

Bases: BaseModel

A node in the execution graph.

Wraps an agent or callable that processes inputs and produces outputs. Supports retry policies with exponential backoff and result caching.

execute async

execute(inputs: dict[str, Any], *, resume_value: Any = None, is_resuming: bool = False) -> NodeResult

Execute the node with given inputs.

Parameters:

Name Type Description Default
inputs dict[str, Any]

Dictionary of inputs from upstream nodes

required
resume_value Any

Value to pass if resuming from interrupt

None
is_resuming bool

Whether we're resuming from an interrupt

False

Returns:

Type Description
NodeResult

NodeResult with output or error

Source code in src/locus/multiagent/graph.py
async def execute(
    self,
    inputs: dict[str, Any],
    *,
    resume_value: Any = None,
    is_resuming: bool = False,
) -> NodeResult:
    """
    Execute the node with given inputs.

    Args:
        inputs: Dictionary of inputs from upstream nodes
        resume_value: Value to pass if resuming from interrupt
        is_resuming: Whether we're resuming from an interrupt

    Returns:
        NodeResult with output or error
    """
    started_at = datetime.now(UTC)
    attempts = 0

    # Determine retry limit from policy or legacy config
    max_attempts = self.retry_policy.max_attempts if self.retry_policy else self.max_retries + 1

    # Check cache
    if self.cache_policy and self.cache_policy.enabled:
        import hashlib
        import json as _json
        import time as _time

        cache_key = hashlib.sha256(  # noqa: S324
            f"{self.id}:{_json.dumps(inputs, sort_keys=True, default=str)}".encode()
        ).hexdigest()
        cached = _node_cache.get(cache_key)
        if cached is not None:
            cached_output, cached_time = cached
            if _time.time() - cached_time < self.cache_policy.ttl_seconds:
                return NodeResult(
                    node_id=self.id,
                    status=NodeStatus.COMPLETED,
                    output=cached_output,
                    started_at=started_at,
                    completed_at=datetime.now(UTC),
                    duration_ms=0.0,
                )

    while attempts < max_attempts:
        try:
            # Check condition
            if self.condition is not None and not self.condition(inputs):
                return NodeResult(
                    node_id=self.id,
                    status=NodeStatus.SKIPPED,
                    started_at=started_at,
                    completed_at=datetime.now(UTC),
                )

            # Set up execution context for interrupt handling
            async with NodeExecutionContext(
                node_id=self.id,
                resume_value=resume_value,
                is_resuming=is_resuming,
            ):
                # Handle subgraph execution
                if self.is_subgraph and self.subgraph is not None:
                    subgraph_result = await self.subgraph.execute(inputs)
                    # Use final_state which has all merged outputs from the subgraph
                    output = subgraph_result.final_state
                else:
                    # Execute with optional timeout.
                    # Check the executor itself AND its __call__ method so that
                    # callable class instances with async __call__ are awaited
                    # correctly (asyncio.iscoroutinefunction returns False for
                    # instances but True for bound methods and async functions).
                    _exec_call = getattr(type(self.executor), "__call__", None)  # noqa: B004
                    _is_async = asyncio.iscoroutinefunction(self.executor) or (
                        _exec_call is not None and asyncio.iscoroutinefunction(_exec_call)
                    )
                    if _is_async:
                        coro = self.executor(inputs)
                    else:
                        # Wrap sync function
                        loop = asyncio.get_running_loop()
                        coro = loop.run_in_executor(None, lambda: self.executor(inputs))

                    if self.timeout_ms:
                        output = await asyncio.wait_for(
                            coro,
                            timeout=self.timeout_ms / 1000,
                        )
                    else:
                        output = await coro

            completed_at = datetime.now(UTC)
            duration_ms = (completed_at - started_at).total_seconds() * 1000

            # Parse output for Commands and Sends
            command = None
            sends = None

            if is_command(output):
                command = output
            elif is_send_list(output) or isinstance(output, Send):
                sends = normalize_sends(output)

            # Store in cache if policy set
            if self.cache_policy and self.cache_policy.enabled:
                import time as _time

                _node_cache[cache_key] = (output, _time.time())

            return NodeResult(
                node_id=self.id,
                status=NodeStatus.COMPLETED,
                output=output,
                duration_ms=duration_ms,
                started_at=started_at,
                completed_at=completed_at,
                command=command,
                sends=sends,
            )

        except InterruptException as e:
            # Node is requesting human input
            completed_at = datetime.now(UTC)
            duration_ms = (completed_at - started_at).total_seconds() * 1000
            return NodeResult(
                node_id=self.id,
                status=NodeStatus.INTERRUPTED,
                output=e.value,
                duration_ms=duration_ms,
                started_at=started_at,
                completed_at=completed_at,
            )

        except TimeoutError:
            completed_at = datetime.now(UTC)
            return NodeResult(
                node_id=self.id,
                status=NodeStatus.FAILED,
                error=f"Node execution timed out after {self.timeout_ms}ms",
                duration_ms=self.timeout_ms,
                started_at=started_at,
                completed_at=completed_at,
            )

        except Exception as e:  # noqa: BLE001
            attempts += 1
            if attempts >= max_attempts:
                completed_at = datetime.now(UTC)
                duration_ms = (completed_at - started_at).total_seconds() * 1000
                return NodeResult(
                    node_id=self.id,
                    status=NodeStatus.FAILED,
                    error=str(e),
                    duration_ms=duration_ms,
                    started_at=started_at,
                    completed_at=completed_at,
                )

            # Calculate delay from policy or legacy config
            if self.retry_policy:
                delay = self.retry_policy.get_delay(attempts - 1)
            else:
                delay = self.retry_delay_ms / 1000
            await asyncio.sleep(delay)

    # Should not reach here
    return NodeResult(
        node_id=self.id,
        status=NodeStatus.FAILED,
        error="Unexpected execution path",
    )

NodeResult

Bases: BaseModel

Result from executing a node.

success property

success: bool

Whether the node executed successfully.

NodeStatus

Bases: StrEnum

Status of a node in the graph.

Edge

Bases: BaseModel

A directed edge connecting two nodes in the graph.

Represents data flow from source to target.

apply

apply(source_output: Any) -> dict[str, Any]

Transform source output to target input.

Source code in src/locus/multiagent/graph.py
def apply(self, source_output: Any) -> dict[str, Any]:
    """Transform source output to target input."""
    # Apply transformation if provided
    if self.transform is not None:
        source_output = self.transform(source_output)

    # Apply key mapping
    if self.key_mapping is not None:
        if isinstance(source_output, dict):
            return {
                target_key: source_output.get(source_key)
                for source_key, target_key in self.key_mapping.items()
            }
        first_target = next(iter(self.key_mapping.values()), self.source_id)
        return {first_target: source_output}

    # Default: pass entire output under source node id
    return {self.source_id: source_output}

ConditionalEdge

Bases: BaseModel

A conditional edge with dynamic target selection.

The router function determines which target to route to based on state.

resolve_target

resolve_target(state: dict[str, Any]) -> list[str]

Resolve the target node(s) based on state.

Source code in src/locus/multiagent/graph.py
def resolve_target(self, state: dict[str, Any]) -> list[str]:
    """Resolve the target node(s) based on state."""
    result = self.router(state)

    if isinstance(result, list):
        # Multiple targets (parallel execution)
        targets = []
        for r in result:
            if r in self.targets:
                targets.append(self.targets[r])
            elif self.default_target:
                targets.append(self.default_target)
            else:
                targets.append(r)
        return targets

    # Single target
    if result in self.targets:
        target = self.targets[result]
    elif self.default_target:
        target = self.default_target
    else:
        target = result
    return [target] if target else []

CachePolicy

Bases: BaseModel

Cache policy for node execution results.

Caches node outputs to avoid re-computation for identical inputs.

Example

node = Node( name="expensive_lookup", executor=lookup, cache_policy=CachePolicy(ttl_seconds=300), )

RetryPolicy

Bases: BaseModel

Retry policy for node execution.

Exponential backoff with optional jitter, matching LangGraph's pattern.

Example

node = Node( name="api_call", executor=call_api, retry_policy=RetryPolicy(max_attempts=3, backoff_factor=2.0), )

get_delay

get_delay(attempt: int) -> float

Calculate delay for a given attempt number.

Source code in src/locus/multiagent/graph.py
def get_delay(self, attempt: int) -> float:
    """Calculate delay for a given attempt number."""
    import random

    delay = min(
        self.initial_interval * (self.backoff_factor**attempt),
        self.max_interval,
    )
    if self.jitter:
        delay *= 0.5 + random.random() * 0.5  # noqa: S311
    return delay

StreamMode

Bases: StrEnum

Streaming output modes.

StreamEvent

Bases: BaseModel

Event emitted during streaming execution.

Special nodes

START module-attribute

START = '__START__'

END module-attribute

END = '__END__'

Convenience builders

create_graph

create_graph(name: str = '', description: str = '', allow_cycles: bool = False) -> StateGraph

Create a new graph.

Source code in src/locus/multiagent/graph.py
def create_graph(
    name: str = "",
    description: str = "",
    allow_cycles: bool = False,
) -> StateGraph:
    """Create a new graph."""
    config = GraphConfig(allow_cycles=allow_cycles)
    return StateGraph(name=name, description=description, config=config)

node

node(name: str, executor: Callable[..., Any], *, description: str = '', condition: Callable[[dict[str, Any]], bool] | None = None, max_retries: int = 0, timeout_ms: float | None = None) -> Node

Create a node with the given executor.

Source code in src/locus/multiagent/graph.py
def node(
    name: str,
    executor: Callable[..., Any],
    *,
    description: str = "",
    condition: Callable[[dict[str, Any]], bool] | None = None,
    max_retries: int = 0,
    timeout_ms: float | None = None,
) -> Node:
    """Create a node with the given executor."""
    return Node(
        name=name,
        description=description,
        executor=executor,
        condition=condition,
        max_retries=max_retries,
        timeout_ms=timeout_ms,
    )

emit_custom async

emit_custom(data: Any, *, node_id: str | None = None) -> None

Emit a StreamEvent(mode=CUSTOM) from inside a graph node.

Use this when your node wants to surface progress / partial output that isn't a state update. The event reaches consumers of StateGraph.stream() immediately; outside a streaming context it's a silent no-op so the same node code works under execute() too.

Example::

async def long_running_node(state):
    for i in range(10):
        await emit_custom({"progress": i / 10})
        await asyncio.sleep(0.1)
    return {"done": True}
Source code in src/locus/multiagent/graph.py
async def emit_custom(data: Any, *, node_id: str | None = None) -> None:
    """Emit a ``StreamEvent(mode=CUSTOM)`` from inside a graph node.

    Use this when your node wants to surface progress / partial output that
    isn't a state update. The event reaches consumers of
    ``StateGraph.stream()`` immediately; outside a streaming context it's
    a silent no-op so the same node code works under ``execute()`` too.

    Example::

        async def long_running_node(state):
            for i in range(10):
                await emit_custom({"progress": i / 10})
                await asyncio.sleep(0.1)
            return {"done": True}
    """
    sink = _active_stream_sink.get()
    if sink is None:
        return
    await sink(StreamEvent(mode=StreamMode.CUSTOM, node_id=node_id, data=data))

Functional API

task

task(fn: Callable | None = None, *, name: str | None = None, retry_attempts: int = 1, cache: bool = False) -> Any

Decorator that marks a function as a parallelizable task.

Tasks are tracked within an entrypoint for monitoring and can be configured with retry and caching.

Parameters:

Name Type Description Default
fn Callable | None

The function to decorate.

None
name str | None

Task name (defaults to function name).

None
retry_attempts int

Number of retry attempts on failure.

1
cache bool

If True, cache results for identical arguments.

False
Example

@task async def fetch(url: str) -> dict: return await httpx.get(url).json()

@task(retry_attempts=3) async def unreliable_api(query: str) -> str: return await call_api(query)

Source code in src/locus/multiagent/functional.py
def task(
    fn: Callable | None = None,
    *,
    name: str | None = None,
    retry_attempts: int = 1,
    cache: bool = False,
) -> Any:
    """Decorator that marks a function as a parallelizable task.

    Tasks are tracked within an entrypoint for monitoring and can be
    configured with retry and caching.

    Args:
        fn: The function to decorate.
        name: Task name (defaults to function name).
        retry_attempts: Number of retry attempts on failure.
        cache: If True, cache results for identical arguments.

    Example:
        @task
        async def fetch(url: str) -> dict:
            return await httpx.get(url).json()

        @task(retry_attempts=3)
        async def unreliable_api(query: str) -> str:
            return await call_api(query)
    """

    def decorator(func: Callable) -> Callable:
        task_name = name or func.__name__
        _cache: dict[str, Any] = {}

        @functools.wraps(func)
        async def wrapper(*args: Any, **kwargs: Any) -> Any:
            start = time.perf_counter()

            # Check cache
            if cache:
                cache_key = f"{args}:{kwargs}"
                if cache_key in _cache:
                    return _cache[cache_key]

            last_error = None
            for attempt in range(retry_attempts):
                try:
                    if asyncio.iscoroutinefunction(func):
                        result = await func(*args, **kwargs)
                    else:
                        result = func(*args, **kwargs)

                    duration = (time.perf_counter() - start) * 1000
                    _current_tasks.append(
                        TaskResult(
                            value=result,
                            duration_ms=duration,
                            task_name=task_name,
                        )
                    )

                    if cache:
                        _cache[cache_key] = result

                    return result

                except Exception as e:  # noqa: BLE001 — retry-any-failure semantics for user task bodies
                    last_error = e
                    if attempt < retry_attempts - 1:
                        await asyncio.sleep(0.1 * (attempt + 1))

            duration = (time.perf_counter() - start) * 1000
            _current_tasks.append(
                TaskResult(
                    duration_ms=duration,
                    task_name=task_name,
                    error=str(last_error),
                )
            )
            raise last_error  # type: ignore[misc]

        wrapper._is_task = True  # type: ignore[attr-defined]  # noqa: SLF001
        wrapper._task_name = task_name  # type: ignore[attr-defined]  # noqa: SLF001
        return wrapper

    if fn is not None:
        return decorator(fn)
    return decorator

entrypoint

entrypoint(fn: Callable | None = None, *, name: str | None = None) -> Any

Decorator that marks a function as a workflow entrypoint.

The entrypoint is the top-level function that orchestrates tasks. It tracks all task executions and returns an EntrypointResult.

Parameters:

Name Type Description Default
fn Callable | None

The function to decorate.

None
name str | None

Entrypoint name (defaults to function name).

None
Example

@entrypoint async def my_workflow(input_data: str) -> str: step1 = await fetch(input_data) step2 = await process(step1) return step2

result = await my_workflow("hello")

result is the raw return value

Access metadata via my_workflow.last_result

Source code in src/locus/multiagent/functional.py
def entrypoint(
    fn: Callable | None = None,
    *,
    name: str | None = None,
) -> Any:
    """Decorator that marks a function as a workflow entrypoint.

    The entrypoint is the top-level function that orchestrates tasks.
    It tracks all task executions and returns an EntrypointResult.

    Args:
        fn: The function to decorate.
        name: Entrypoint name (defaults to function name).

    Example:
        @entrypoint
        async def my_workflow(input_data: str) -> str:
            step1 = await fetch(input_data)
            step2 = await process(step1)
            return step2

        result = await my_workflow("hello")
        # result is the raw return value
        # Access metadata via my_workflow.last_result
    """

    def decorator(func: Callable) -> Callable:
        ep_name = name or func.__name__
        last_result: list[EntrypointResult] = [None]  # type: ignore[list-item]

        @functools.wraps(func)
        async def wrapper(*args: Any, **kwargs: Any) -> Any:
            global _current_tasks
            _current_tasks = []

            start = time.perf_counter()

            try:
                if asyncio.iscoroutinefunction(func):
                    result = await func(*args, **kwargs)
                else:
                    result = func(*args, **kwargs)

                duration = (time.perf_counter() - start) * 1000
                last_result[0] = EntrypointResult(
                    value=result,
                    tasks=list(_current_tasks),
                    duration_ms=duration,
                )
                return result

            except Exception as e:
                duration = (time.perf_counter() - start) * 1000
                last_result[0] = EntrypointResult(
                    tasks=list(_current_tasks),
                    duration_ms=duration,
                    error=str(e),
                )
                raise

        wrapper.last_result = property(lambda self: last_result[0])  # type: ignore[attr-defined]  # noqa: ARG005
        wrapper._last_result = last_result  # type: ignore[attr-defined]  # noqa: SLF001
        wrapper._is_entrypoint = True  # type: ignore[attr-defined]  # noqa: SLF001
        wrapper._entrypoint_name = ep_name  # type: ignore[attr-defined]  # noqa: SLF001

        def get_result() -> EntrypointResult | None:
            return last_result[0]

        wrapper.get_result = get_result  # type: ignore[attr-defined]
        return wrapper

    if fn is not None:
        return decorator(fn)
    return decorator

A2A protocol

A2AServer

A2AServer(agent: Any, name: str = 'Locus Agent', description: str = '', skills: list[AgentSkill] | list[str] | None = None, url: str = '', provider: AgentProvider | None = None, version: str = '0.1.0', api_key: str | None = None, allow_unauthenticated: bool = False)

Expose a Locus Agent as a spec-compliant A2A endpoint.

Parameters:

Name Type Description Default
agent Any

A Locus Agent (or anything with run(prompt) -> AsyncIterator).

required
name str

Display name for the Agent Card.

'Locus Agent'
description str

One-line description for the Agent Card.

''
skills list[AgentSkill] | list[str] | None

List of :class:AgentSkill (preferred) or plain strings (legacy — auto-promoted to skills with id == name).

None
url str

Public URL the agent is reachable at — set this for cross-process / cross-host deployments so the card's url field is correct. Defaults to a placeholder.

''
provider AgentProvider | None

Optional :class:AgentProvider (e.g. Oracle).

None
version str

Agent semver — useful for capability negotiation.

'0.1.0'
api_key str | None

Bearer token required on every route; if None, falls back to LOCUS_A2A_API_KEY.

None
allow_unauthenticated bool

Bind to non-loopback without a key. Use only behind an upstream proxy that terminates auth.

False

Example::

from locus import Agent
from locus.a2a import A2AServer
from locus.a2a.spec import AgentSkill

server = A2AServer(
    agent=my_agent,
    name="Research Agent",
    description="Open-web research with citations.",
    skills=[
        AgentSkill(
            id="research",
            name="Research",
            description="Answer with cited sources.",
            tags=["search", "summarise"],
        ),
    ],
    url="https://research.example.com",
    api_key="secret",
)
server.run(port=8001)
Source code in src/locus/a2a/protocol.py
def __init__(
    self,
    agent: Any,
    name: str = "Locus Agent",
    description: str = "",
    skills: list[AgentSkill] | list[str] | None = None,
    url: str = "",
    provider: AgentProvider | None = None,
    version: str = "0.1.0",
    api_key: str | None = None,
    allow_unauthenticated: bool = False,
) -> None:
    self._agent = agent
    self._name = name
    self._description = description or f"A2A-compatible {name}"
    self._skills = self._normalise_skills(skills)
    self._url = url
    self._provider = provider
    self._version = version
    self._api_key = api_key or os.environ.get("LOCUS_A2A_API_KEY") or None
    self._allow_unauthenticated = allow_unauthenticated
    self._app: Any = None
    self._store = _TaskStore()

run

run(host: str = '127.0.0.1', port: int = 8001, **kwargs: Any) -> None

Run the A2A server.

Defaults to loopback binding. Non-loopback bindings require either api_key to be set or allow_unauthenticated=True.

Source code in src/locus/a2a/protocol.py
def run(self, host: str = "127.0.0.1", port: int = 8001, **kwargs: Any) -> None:
    """Run the A2A server.

    Defaults to loopback binding. Non-loopback bindings require
    either ``api_key`` to be set or ``allow_unauthenticated=True``.
    """
    if self._api_key is None and not self._allow_unauthenticated and not _is_loopback(host):
        msg = (
            f"Refusing to bind A2AServer to {host!r} without an API "
            "key. Set LOCUS_A2A_API_KEY, pass api_key=... to "
            "A2AServer, or pass allow_unauthenticated=True if an "
            "upstream proxy terminates auth."
        )
        raise RuntimeError(msg)

    try:
        import uvicorn
    except ImportError as e:
        msg = "uvicorn required. Install with: pip install uvicorn"
        raise ImportError(msg) from e
    uvicorn.run(self.app, host=host, port=port, **kwargs)

A2AClient

A2AClient(url: str, api_key: str | None = None)

Call a remote A2A agent from Locus.

Spec-compliant methods:

  • :meth:get_agent_card — fetches /.well-known/agent-card.json, falling back to the legacy /agent-card endpoint.
  • :meth:send_message — JSON-RPC message/send; returns a :class:Task you can poll with :meth:get_task.
  • :meth:send_message_streaming — JSON-RPC message/stream; yields events from the SSE stream.
  • :meth:get_task, :meth:cancel_task — task lifecycle.

Plus the legacy convenience APIs preserved from the pre-spec implementation:

  • :meth:invoke — flat string-in / string-out over /a2a/invoke.
  • :meth:as_tool — wrap a remote agent as a Locus @tool.
Source code in src/locus/a2a/protocol.py
def __init__(self, url: str, api_key: str | None = None) -> None:
    self._url = url.rstrip("/")
    self._api_key = api_key

get_agent_card async

get_agent_card() -> AgentCard

Fetch the remote agent's capability card.

Tries the spec well-known URL first, falls back to the legacy /agent-card endpoint for older peers.

Source code in src/locus/a2a/protocol.py
async def get_agent_card(self) -> AgentCard:
    """Fetch the remote agent's capability card.

    Tries the spec well-known URL first, falls back to the legacy
    ``/agent-card`` endpoint for older peers.
    """
    import httpx

    async with httpx.AsyncClient() as client:
        for path in ("/.well-known/agent-card.json", "/agent-card"):
            try:
                resp = await client.get(f"{self._url}{path}", headers=self._auth_headers())
                if resp.status_code == 200:
                    data = resp.json()
                    # Legacy peers serve flat string skills — promote.
                    if data.get("skills") and isinstance(data["skills"][0], str):
                        data["skills"] = [
                            {"id": s, "name": s, "description": s} for s in data["skills"]
                        ]
                    # Legacy peers also omit url/capabilities.
                    data.setdefault("url", self._url)
                    return AgentCard.model_validate(data)
            except httpx.HTTPError:
                continue
    msg = f"Could not fetch Agent Card from {self._url}"
    raise RuntimeError(msg)

send_message async

send_message(message: Message) -> Task

Send a message via JSON-RPC message/send and return the Task.

Source code in src/locus/a2a/protocol.py
async def send_message(self, message: Message) -> Task:
    """Send a message via JSON-RPC ``message/send`` and return the Task."""
    result = await self._rpc("message/send", {"message": message.model_dump(exclude_none=True)})
    return Task.model_validate(result)

send_message_streaming async

send_message_streaming(message: Message) -> AsyncIterator[dict[str, Any]]

Send a message via JSON-RPC message/stream and yield events.

Source code in src/locus/a2a/protocol.py
async def send_message_streaming(self, message: Message) -> AsyncIterator[dict[str, Any]]:
    """Send a message via JSON-RPC ``message/stream`` and yield events."""
    import httpx

    body = {
        "jsonrpc": "2.0",
        "id": uuid.uuid4().hex,
        "method": "message/stream",
        "params": {"message": message.model_dump(exclude_none=True)},
    }
    headers = self._auth_headers() | {"Accept": "text/event-stream"}
    async with (
        httpx.AsyncClient(timeout=120.0) as client,
        client.stream("POST", f"{self._url}/", json=body, headers=headers) as resp,
    ):
        resp.raise_for_status()
        async for raw in resp.aiter_lines():
            if not raw or not raw.startswith("data: "):
                continue
            payload = raw[len("data: ") :]
            if payload.strip() == "[DONE]":
                break
            try:
                env = json.loads(payload)
            except json.JSONDecodeError:
                continue
            if "error" in env:
                yield env  # surface error envelope to caller
                break
            yield env.get("result", env)

get_task async

get_task(task_id: str, history_length: int | None = None) -> Task

Fetch a task by id (JSON-RPC tasks/get).

Source code in src/locus/a2a/protocol.py
async def get_task(self, task_id: str, history_length: int | None = None) -> Task:
    """Fetch a task by id (JSON-RPC ``tasks/get``)."""
    params: dict[str, Any] = {"id": task_id}
    if history_length is not None:
        params["historyLength"] = history_length
    result = await self._rpc("tasks/get", params)
    return Task.model_validate(result)

cancel_task async

cancel_task(task_id: str) -> Task

Cancel a task (JSON-RPC tasks/cancel).

Source code in src/locus/a2a/protocol.py
async def cancel_task(self, task_id: str) -> Task:
    """Cancel a task (JSON-RPC ``tasks/cancel``)."""
    result = await self._rpc("tasks/cancel", {"id": task_id})
    return Task.model_validate(result)

invoke async

invoke(prompt: str) -> str

Send a flat text prompt over the legacy /a2a/invoke.

Useful when you control both ends of the wire and want a one-line round-trip; spec-compliant peers should prefer :meth:send_message so they can read the full :class:Task.

Source code in src/locus/a2a/protocol.py
async def invoke(self, prompt: str) -> str:
    """Send a flat text prompt over the legacy ``/a2a/invoke``.

    Useful when you control both ends of the wire and want a one-line
    round-trip; spec-compliant peers should prefer
    :meth:`send_message` so they can read the full :class:`Task`.
    """
    import httpx

    request = A2ARequest(messages=[A2AMessage(role="user", content=prompt)])
    async with httpx.AsyncClient(timeout=120.0) as client:
        resp = await client.post(
            f"{self._url}/a2a/invoke",
            json=request.model_dump(),
            headers=self._auth_headers(),
        )
        resp.raise_for_status()
        response = A2AResponse.model_validate(resp.json())
    agent_msgs = [m for m in response.messages if m.role == "agent"]
    return agent_msgs[-1].content if agent_msgs else ""

as_tool

as_tool(name: str | None = None, description: str | None = None) -> Any

Wrap this remote agent as a Locus @tool.

Source code in src/locus/a2a/protocol.py
def as_tool(self, name: str | None = None, description: str | None = None) -> Any:
    """Wrap this remote agent as a Locus ``@tool``."""
    from locus.tools.decorator import tool as tool_decorator

    client = self
    tool_name = name or "remote_agent"
    tool_desc = description or "Call a remote A2A agent"

    @tool_decorator(name=tool_name, description=tool_desc)
    def call_remote(prompt: str) -> str:
        """Send a request to a remote agent."""
        import asyncio

        return asyncio.run(client.invoke(prompt))

    return call_remote