
Models

Two categories of provider: native (direct API connections — OpenAI, Anthropic, Ollama) and hosted (OCI Generative AI, which fronts OpenAI / Meta / xAI / Mistral / Gemini / Cohere via two transports).

For Oracle deployments the default path is OCIChatCompletionsModel — the OpenAI-compatible /openai/v1 transport with real SSE streaming, day-0 model support, and the OCI auth chain (instance principal, resource principal, session token, API key).

Registry

String factory — auto-routes "oci:openai.gpt-5", "openai:gpt-4o", "anthropic:claude-3-5-sonnet", etc. to the right transport.

get_model

get_model(model_string: str, **kwargs: Any) -> ModelProtocol

Get a model from a string identifier.

Format: "provider:model_name"

Examples:

  • "openai:gpt-4o"
  • "oci:cohere.command-r-plus"

Parameters:

  • model_string (str, required): Model identifier in "provider:model" format
  • **kwargs (Any): Provider-specific configuration

Returns:

  • ModelProtocol: Model instance

Raises:

  • ValueError: If provider is unknown or model string is invalid

Source code in src/locus/models/registry.py
def get_model(model_string: str, **kwargs: Any) -> ModelProtocol:
    """
    Get a model from a string identifier.

    Format: "provider:model_name"

    Examples:
        - "openai:gpt-4o"
        - "oci:cohere.command-r-plus"

    Args:
        model_string: Model identifier in "provider:model" format
        **kwargs: Provider-specific configuration

    Returns:
        Model instance

    Raises:
        ValueError: If provider is unknown or model string is invalid
    """
    if ":" not in model_string:
        raise ValueError(
            f"Model string must be 'provider:model', got: {model_string}. "
            f"Available providers: {list(_PROVIDERS.keys())}"
        )

    provider, model_id = model_string.split(":", 1)

    if provider not in _PROVIDERS:
        raise ValueError(f"Unknown provider: {provider}. Available: {list(_PROVIDERS.keys())}")

    return _PROVIDERS[provider](model_id, **kwargs)
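The routing rule above comes down to a single split(":", 1): everything before the first colon is the provider prefix, and everything after it is passed through untouched, so OCI ids that contain dots (or even further colons) survive intact. A standalone sketch that mirrors that logic; the _PROVIDERS table and its lambda factories are local stand-ins (they return tuples so the routing is visible), not Locus objects:

```python
from typing import Any, Callable

# Hypothetical stand-in for the registry's private provider table.
_PROVIDERS: dict[str, Callable[..., Any]] = {
    "openai": lambda model_id, **kw: ("openai", model_id, kw),
    "oci": lambda model_id, **kw: ("oci", model_id, kw),
}


def parse_model_string(model_string: str) -> tuple[str, str]:
    """Split 'provider:model' on the first colon only and validate the prefix."""
    if ":" not in model_string:
        raise ValueError(f"Model string must be 'provider:model', got: {model_string}")
    provider, model_id = model_string.split(":", 1)
    if provider not in _PROVIDERS:
        raise ValueError(f"Unknown provider: {provider}. Available: {list(_PROVIDERS)}")
    return provider, model_id


def get_model(model_string: str, **kwargs: Any) -> Any:
    provider, model_id = parse_model_string(model_string)
    # kwargs go straight to the provider factory.
    return _PROVIDERS[provider](model_id, **kwargs)


print(parse_model_string("oci:openai.gpt-5"))       # ('oci', 'openai.gpt-5')
print(get_model("openai:gpt-4o", temperature=0.2))  # ('openai', 'gpt-4o', {'temperature': 0.2})
```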

list_providers

list_providers() -> list[str]

List available provider prefixes.

Source code in src/locus/models/registry.py
def list_providers() -> list[str]:
    """List available provider prefixes."""
    return list(_PROVIDERS.keys())

register_provider

register_provider(prefix: str, factory: Callable[..., ModelProtocol]) -> None

Register a model provider.

Parameters:

  • prefix (str, required): Provider prefix (e.g., "openai", "oci")
  • factory (Callable[..., ModelProtocol], required): Factory function that takes model name and kwargs

Source code in src/locus/models/registry.py
def register_provider(prefix: str, factory: Callable[..., ModelProtocol]) -> None:
    """
    Register a model provider.

    Args:
        prefix: Provider prefix (e.g., "openai", "oci")
        factory: Factory function that takes model name and kwargs
    """
    _PROVIDERS[prefix] = factory
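Because register_provider only writes into the same table that get_model reads, a custom provider is just a callable taking (model_id, **kwargs). A self-contained sketch in which the registry functions are re-declared locally to mirror the listings, and EchoModel is a hypothetical toy standing in for a real ModelProtocol implementation (plain strings stand in for Message):

```python
import asyncio
from typing import Any, Callable

_PROVIDERS: dict[str, Callable[..., Any]] = {}


def register_provider(prefix: str, factory: Callable[..., Any]) -> None:
    _PROVIDERS[prefix] = factory


def list_providers() -> list[str]:
    return list(_PROVIDERS.keys())


def get_model(model_string: str, **kwargs: Any) -> Any:
    provider, model_id = model_string.split(":", 1)
    return _PROVIDERS[provider](model_id, **kwargs)


class EchoModel:
    """Hypothetical toy provider: echoes the last message back."""

    def __init__(self, model_id: str, prefix: str = "echo") -> None:
        self.model_id = model_id
        self.prefix = prefix

    async def complete(self, messages: list[str], tools: Any = None, **kwargs: Any) -> str:
        return f"{self.prefix}[{self.model_id}]: {messages[-1]}"


# The class itself is a valid factory: EchoModel(model_id, **kwargs).
register_provider("echo", EchoModel)

model = get_model("echo:v1", prefix="bot")
print(list_providers())                     # ['echo']
print(asyncio.run(model.complete(["hi"])))  # bot[v1]: hi
```

Since the table is a plain dict, registering a prefix that already exists silently replaces the earlier factory.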

Base contract

Every model provider implements ModelProtocol. RequestBuilder and ResponseParser are the per-provider seams for translating between Locus's ModelConfig / Message types and the provider's wire format.

ModelProtocol

Bases: Protocol

Protocol defining the model interface.

complete async

complete(messages: list[Message], tools: list[dict[str, Any]] | None = None, **kwargs: Any) -> ModelResponse

Complete a chat request.

Source code in src/locus/models/base.py
async def complete(
    self,
    messages: list[Message],
    tools: list[dict[str, Any]] | None = None,
    **kwargs: Any,
) -> ModelResponse:
    """Complete a chat request."""
    ...

stream

stream(messages: list[Message], tools: list[dict[str, Any]] | None = None, **kwargs: Any) -> AsyncIterator[ModelChunkEvent]

Stream a chat response.

Source code in src/locus/models/base.py
def stream(
    self,
    messages: list[Message],
    tools: list[dict[str, Any]] | None = None,
    **kwargs: Any,
) -> AsyncIterator[ModelChunkEvent]:
    """Stream a chat response."""
    ...

ModelConfig

Bases: BaseModel

Base configuration for models.

ModelResponse

Bases: BaseModel

Response from a model completion.

content property

content: str | None

Get response content.

tool_calls property

tool_calls: list[Any]

Get tool calls.

prompt_tokens property

prompt_tokens: int

Get prompt token count.

completion_tokens property

completion_tokens: int

Get completion token count.

total_tokens property

total_tokens: int

Get total token count.

RequestBuilder

Bases: Protocol

Protocol for building provider-specific requests.

build

build(messages: list[dict[str, Any]], tools: list[dict[str, Any]] | None, **kwargs: Any) -> Any

Build a provider-specific request.

Source code in src/locus/models/base.py
def build(
    self,
    messages: list[dict[str, Any]],
    tools: list[dict[str, Any]] | None,
    **kwargs: Any,
) -> Any:
    """Build a provider-specific request."""
    ...

ResponseParser

Bases: Protocol

Protocol for parsing provider-specific responses.

parse

parse(response: Any) -> ModelResponse

Parse a provider-specific response.

Source code in src/locus/models/base.py
def parse(self, response: Any) -> ModelResponse:
    """Parse a provider-specific response."""
    ...

OCI Generative AI

OpenAI-compatible transport (V1)

OCIChatCompletionsModel targets the /openai/v1 transport — real SSE streaming, day-0 support for new OpenAI / Meta / xAI / Mistral / Gemini releases. Use this for everything except Cohere R-series.

OCIChatCompletionsModel

OCIChatCompletionsModel(model: str, *, profile: str | None = None, auth_type: str | None = None, compartment_id: str | None = None, region: str = DEFAULT_OCI_GENAI_REGION, config_file: str = '~/.oci/config', base_url: str | None = None, max_tokens: int = 4096, temperature: float = 0.7, **kwargs: Any)

Bases: OpenAIModel

OCI GenAI model accessed through the /openai/v1 endpoint.

Reuses OpenAIModel for message conversion, tool handling, response parsing, and streaming. The only thing this class adds is the OCI-specific auth wiring.

Pass exactly one of profile or auth_type.

Initialize the OCI OpenAI-compat model.

Parameters:

  • model (str, required): OCI model identifier (e.g. openai.gpt-5, meta.llama-3.3-70b-instruct).
  • profile (str | None, default None): OCI config profile name from config_file. Mutually exclusive with auth_type.
  • auth_type (str | None, default None): "instance_principal" or "resource_principal" (the validation message in the source also names "security_token" and "delegation_token"; the accepted set is _VALID_AUTH_TYPES). Mutually exclusive with profile. Requires compartment_id.
  • compartment_id (str | None, default None): OCI compartment OCID, sent as opc-compartment-id. Under profile=, falls back to the OCI_COMPARTMENT / OCI_COMPARTMENT_ID environment variable and then to the profile's tenancy. Must be supplied explicitly under auth_type=; the environment variables are not consulted for that check.
  • region (str, default DEFAULT_OCI_GENAI_REGION): OCI region hosting the inference endpoint.
  • config_file (str, default '~/.oci/config'): Path to the OCI config file (used with profile).
  • base_url (str | None, default None): Override the derived endpoint URL (e.g. for a custom realm). Defaults to the OpenAI-compat URL for region.
  • max_tokens (int, default 4096): Default max tokens. For o1/o3/gpt-5* this is automatically forwarded as max_completion_tokens; see OpenAIModel.
  • temperature (float, default 0.7): Default sampling temperature.
  • **kwargs (Any): Forwarded to OCIChatCompletionsConfig (top_p, seed, frequency_penalty, presence_penalty, ...).

Raises:

  • ValueError: If zero or both auth modes are set, if auth_type is invalid, or if auth_type is set without compartment_id.

Source code in src/locus/models/providers/oci/openai_compat.py
def __init__(
    self,
    model: str,
    *,
    profile: str | None = None,
    auth_type: str | None = None,
    compartment_id: str | None = None,
    region: str = DEFAULT_OCI_GENAI_REGION,
    config_file: str = "~/.oci/config",
    base_url: str | None = None,
    max_tokens: int = 4096,
    temperature: float = 0.7,
    **kwargs: Any,
) -> None:
    """Initialize the OCI OpenAI-compat model.

    Args:
        model: OCI model identifier (e.g. ``openai.gpt-5``,
            ``meta.llama-3.3-70b-instruct``).
        profile: OCI config profile name from ``config_file``. Mutually
            exclusive with ``auth_type``.
        auth_type: ``"instance_principal"`` or ``"resource_principal"``.
            Mutually exclusive with ``profile``. Requires
            ``compartment_id``.
        compartment_id: OCI compartment OCID, sent as
            ``opc-compartment-id``. Auto-derived from the profile's
            tenancy under ``profile=``. Must be supplied explicitly
            under ``auth_type=``.
        region: OCI region hosting the inference endpoint.
        config_file: Path to the OCI config file (used with ``profile``).
        base_url: Override the derived endpoint URL (e.g. for a custom
            realm). Defaults to the OpenAI-compat URL for ``region``.
        max_tokens: Default max tokens. For ``o1``/``o3``/``gpt-5*``
            this is automatically forwarded as
            ``max_completion_tokens`` — see :class:`OpenAIModel`.
        temperature: Default sampling temperature.
        **kwargs: Forwarded to :class:`OCIChatCompletionsConfig` (top_p, seed,
            frequency_penalty, presence_penalty, ...).

    Raises:
        ValueError: If zero or both auth modes are set, if ``auth_type``
            is invalid, or if ``auth_type`` is set without
            ``compartment_id``.
    """
    modes_set = sum(x is not None for x in (profile, auth_type))
    if modes_set != 1:
        msg = (
            "OCIChatCompletionsModel: specify exactly one auth mode. "
            "Either profile='<section_from_~/.oci/config>' (api-key path; "
            "use profile='DEFAULT' for the default section), "
            "or auth_type='instance_principal'|'resource_principal'|"
            "'security_token'|'delegation_token' together with compartment_id=. "
            f"Got profile={profile!r}, auth_type={auth_type!r}."
        )
        raise ValueError(msg)
    if auth_type is not None and auth_type not in _VALID_AUTH_TYPES:
        msg = f"auth_type must be one of {_VALID_AUTH_TYPES}, got {auth_type!r}"
        raise ValueError(msg)
    if auth_type is not None and compartment_id is None:
        msg = "compartment_id is required when auth_type= is set"
        raise ValueError(msg)

    # Pop fields we set explicitly to avoid duplicate-kwarg errors
    # when callers splat a config dict that includes the same keys.
    for explicit in (
        "model",
        "profile",
        "auth_type",
        "compartment_id",
        "region",
        "config_file",
        "base_url",
        "max_tokens",
        "temperature",
    ):
        kwargs.pop(explicit, None)

    # Resolve compartment_id with this precedence:
    #   1. explicit ``compartment_id=`` arg
    #   2. ``OCI_COMPARTMENT`` env var (matches OCIModel + the test
    #      conftest convention so a single export drives every test)
    #   3. profile's tenancy (last-resort default; may not have GenAI
    #      policy if the profile lives in a different home tenancy than
    #      the GenAI compartment, which is the MY_PROFILE / cross-tenancy case)
    if compartment_id is None:
        import os

        compartment_id = os.getenv("OCI_COMPARTMENT") or os.getenv("OCI_COMPARTMENT_ID")
    if compartment_id is None and profile is not None:
        try:
            profile_cfg = _load_profile_config(profile, config_file)
            compartment_id = profile_cfg.get("tenancy")
        except Exception:  # noqa: BLE001 — profile load may fail; keep None
            compartment_id = None

    config = OCIChatCompletionsConfig(
        model=model,
        api_key=None,
        base_url=base_url or build_oci_openai_base_url(region),
        region=region,
        profile=profile,
        auth_type=auth_type,
        compartment_id=compartment_id,
        config_file=config_file,
        max_tokens=max_tokens,
        temperature=temperature,
        **kwargs,
    )
    # Skip OpenAIModel.__init__ — it would rebuild the config without
    # OCI fields. Go straight to the Pydantic BaseModel init.
    super(OpenAIModel, self).__init__(config=config)

client property

client: AsyncOpenAI

Build the AsyncOpenAI client wired with the OCI request signer.

supports_structured_output property

supports_structured_output: bool

Native response_format={"type":"json_schema",...} support.

OpenAI's chat-completions API accepts a JSON-schema response_format and guarantees a parseable instance. The agent loop uses this property to skip the prompted-JSON fallback when the provider ships native structured output.
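For reference, OpenAI's chat-completions API takes a json_schema response_format of the shape below, and complete() forwards a response_format keyword argument verbatim. The source points to locus.core.structured.build_response_format for building it; that helper's output is not shown here, so this sketch builds the envelope by hand. json_schema_response_format and the weather_report schema are hypothetical:

```python
from typing import Any


def json_schema_response_format(
    name: str, schema: dict[str, Any], strict: bool = True
) -> dict[str, Any]:
    """Wrap a JSON Schema in OpenAI's chat-completions response_format envelope."""
    return {
        "type": "json_schema",
        "json_schema": {"name": name, "schema": schema, "strict": strict},
    }


# Strict mode expects every property to be required and no extra keys.
schema = {
    "type": "object",
    "properties": {"city": {"type": "string"}, "temp_c": {"type": "number"}},
    "required": ["city", "temp_c"],
    "additionalProperties": False,
}
fmt = json_schema_response_format("weather_report", schema)
```

The resulting dict would be passed as response_format=fmt alongside the messages in a complete() call.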

close async

close() -> None

Close the OpenAI client and release resources.

Source code in src/locus/models/native/openai.py
async def close(self) -> None:
    """Close the OpenAI client and release resources."""
    if self._client is not None:
        await self._client.close()
        self._client = None

__aenter__ async

__aenter__() -> OpenAIModel

Async context manager entry.

Source code in src/locus/models/native/openai.py
async def __aenter__(self) -> OpenAIModel:
    """Async context manager entry."""
    return self

__aexit__ async

__aexit__(exc_type: Any, exc_val: Any, exc_tb: Any) -> None

Async context manager exit - close client.

Source code in src/locus/models/native/openai.py
async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
    """Async context manager exit - close client."""
    await self.close()

ainvoke async

ainvoke(messages: list[Any], tools: list[dict[str, Any]] | None = None, **kwargs: Any) -> Any

LangChain-compatible alias — returns Message (AIMessage equivalent).

Source code in src/locus/models/native/openai.py
async def ainvoke(
    self,
    messages: list[Any],
    tools: list[dict[str, Any]] | None = None,
    **kwargs: Any,
) -> Any:
    """LangChain-compatible alias — returns Message (AIMessage equivalent)."""
    response = await self.complete(messages, tools=tools, **kwargs)
    return response.message if hasattr(response, "message") else response

bind_tools

bind_tools(tools: list[Any], **kwargs: Any) -> OpenAIModel

LangChain-compatible bind_tools.

Source code in src/locus/models/native/openai.py
def bind_tools(self, tools: list[Any], **kwargs: Any) -> OpenAIModel:
    """LangChain-compatible bind_tools."""
    bound = self.model_copy()
    object.__setattr__(
        bound,
        "_bound_tools",
        [t.to_openai_schema() if hasattr(t, "to_openai_schema") else t for t in (tools or [])],
    )
    return bound

OCIChatCompletionsConfig

Bases: OpenAIConfig

Configuration for OCIChatCompletionsModel.

Inherits all OpenAI knobs from OpenAIConfig. Adds region, profile, auth_type, compartment_id, and config_file for OCI auth selection.

Cohere R-series transport

OCIModel targets the OCI SDK's /20231130/actions/v1 endpoint — required for cohere.command-r-* (Cohere's Chat API shape isn't served on the /openai/v1 transport).

OCIModel

OCIModel(model_id: str = 'cohere.command-r-plus', compartment_id: str | None = None, profile_name: str = 'DEFAULT', auth_type: str | OCIAuthType | None = None, config_file: str = '~/.oci/config', service_endpoint: str | None = None, max_tokens: int = 4096, temperature: float = 0.7, **kwargs: Any)

Bases: BaseModel

OCI GenAI model provider.

Automatically selects the appropriate provider based on model_id:

  • cohere.command-r-* → CohereProvider
  • cohere.command-a-* → GenericProvider (A series uses generic format)
  • meta.*, openai.*, google.*, xai.*, mistral.* → GenericProvider
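The routing rule is a prefix match on model_id. A sketch that returns the provider class name the docstring names for each prefix; pick_provider is hypothetical, and raising for undocumented prefixes is an assumption, since the docstring does not say what happens there:

```python
def pick_provider(model_id: str) -> str:
    """Return the provider class name the docstring assigns to model_id."""
    if model_id.startswith("cohere.command-r-"):
        return "CohereProvider"
    if model_id.startswith("cohere.command-a-"):
        return "GenericProvider"  # A series uses the generic format
    if model_id.startswith(("meta.", "openai.", "google.", "xai.", "mistral.")):
        return "GenericProvider"
    # Assumption: behaviour for other prefixes is not documented.
    raise ValueError(f"no documented provider for {model_id!r}")
```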

Example

model = OCIModel(
    model_id="openai.gpt-5.1-chat-latest",
    profile_name="DEFAULT",
    auth_type="api_key",
)
response = await model.complete([Message.user("Hello!")])

Initialize OCI GenAI model.

Parameters:

  • model_id (str, default 'cohere.command-r-plus'): OCI model identifier (e.g., "openai.gpt-oss-20b", "cohere.command-r-plus")
  • compartment_id (str | None, default None): OCI compartment OCID (defaults to OCI_COMPARTMENT / OCI_COMPARTMENT_ID env var, then to the tenancy from profile)
  • profile_name (str, default 'DEFAULT'): OCI config profile name from ~/.oci/config
  • auth_type (str | OCIAuthType | None, default None): Authentication type (api_key, security_token, instance_principal). When None (default), reads OCI_AUTH_TYPE from env, falling back to api_key.
  • config_file (str, default '~/.oci/config'): Path to OCI config file
  • service_endpoint (str | None, default None): Full OCI GenAI service endpoint URL
  • max_tokens (int, default 4096): Maximum tokens for response
  • temperature (float, default 0.7): Model temperature (0.0-1.0)
  • **kwargs (Any): Additional model parameters

Source code in src/locus/models/providers/oci/__init__.py
def __init__(
    self,
    model_id: str = "cohere.command-r-plus",
    compartment_id: str | None = None,
    profile_name: str = "DEFAULT",
    auth_type: str | OCIAuthType | None = None,
    config_file: str = "~/.oci/config",
    service_endpoint: str | None = None,
    max_tokens: int = 4096,
    temperature: float = 0.7,
    **kwargs: Any,
) -> None:
    """Initialize OCI GenAI model.

    Args:
        model_id: OCI model identifier (e.g., "openai.gpt-oss-20b", "cohere.command-r-plus")
        compartment_id: OCI compartment OCID (defaults to ``OCI_COMPARTMENT`` /
            ``OCI_COMPARTMENT_ID`` env var, then to the tenancy from profile)
        profile_name: OCI config profile name from ~/.oci/config
        auth_type: Authentication type (api_key, security_token,
            instance_principal). When ``None`` (default), reads
            ``OCI_AUTH_TYPE`` from env, falling back to ``api_key``.
        config_file: Path to OCI config file
        service_endpoint: Full OCI GenAI service endpoint URL
        max_tokens: Maximum tokens for response
        temperature: Model temperature (0.0-1.0)
        **kwargs: Additional model parameters
    """
    if auth_type is None:
        import os

        auth_type = os.getenv("OCI_AUTH_TYPE", "api_key")
    if isinstance(auth_type, str):
        auth_type = OCIAuthType(auth_type)

    # Resolve compartment_id with this precedence:
    #   1. explicit ``compartment_id=`` arg
    #   2. ``OCI_COMPARTMENT`` / ``OCI_COMPARTMENT_ID`` env var
    #   3. tenancy from the profile (handled inside OCIClient)
    if compartment_id is None:
        import os

        compartment_id = os.getenv("OCI_COMPARTMENT") or os.getenv("OCI_COMPARTMENT_ID")

    config = OCIConfig(
        model_id=model_id,
        compartment_id=compartment_id,
        profile_name=profile_name,
        auth_type=auth_type,
        config_file=config_file,
        service_endpoint=service_endpoint,
        max_tokens=max_tokens,
        temperature=temperature,
        **kwargs,
    )
    super().__init__(config=config)

supports_structured_output property

supports_structured_output: bool

OCI's native SDK transport (Cohere R-series) doesn't expose OpenAI-style response_format. Use the V1 transport (OCIChatCompletionsModel) for that.

client property

client: OCIClient

Get or create the OCI client.

provider property

provider: OCIModelProvider

Get the appropriate provider for this model.

complete async

complete(messages: list[Message], tools: list[dict[str, Any]] | None = None, **kwargs: Any) -> ModelResponse

Complete a chat request.

Parameters:

  • messages (list[Message], required): Conversation history
  • tools (list[dict[str, Any]] | None, default None): Tool schemas in OpenAI format (overrides bound tools if set)
  • **kwargs (Any): Additional OCI-specific options

Returns:

  • ModelResponse: Model response with message and metadata

Source code in src/locus/models/providers/oci/__init__.py
async def complete(
    self,
    messages: list[Message],
    tools: list[dict[str, Any]] | None = None,
    **kwargs: Any,
) -> ModelResponse:
    """Complete a chat request.

    Args:
        messages: Conversation history
        tools: Tool schemas in OpenAI format (overrides bound tools if set)
        **kwargs: Additional OCI-specific options

    Returns:
        Model response with message and metadata
    """
    from oci.generative_ai_inference import models

    # Use explicitly-passed tools; fall back to tools bound via bind_tools()
    effective_tools = tools if tools is not None else getattr(self, "_bound_tools", None)

    # Convert messages and tools using the provider
    # Pass model_id for model-specific handling (e.g., Gemini parallel tool calls)
    converted_messages = self.provider.convert_messages(messages, self.config.model_id)
    converted_tools = self.provider.convert_tools(effective_tools)

    # Build request kwargs - remove duplicates
    request_kwargs = {
        "max_tokens": kwargs.pop("max_tokens", self.config.max_tokens),
        "temperature": kwargs.pop("temperature", self.config.temperature),
        **kwargs,
    }

    # Build the request. Pass model_id so the provider can pick the right
    # token-limit field (OpenAI wants max_completion_tokens, Meta wants
    # max_tokens, others accept either).
    request_kwargs["model_id"] = self.config.model_id
    if isinstance(converted_messages, dict):
        # Cohere returns a dict with special keys
        request_kwargs = {**converted_messages, **request_kwargs}
        chat_request = self.provider.build_request([], converted_tools, **request_kwargs)
    else:
        chat_request = self.provider.build_request(
            converted_messages,
            converted_tools,
            **request_kwargs,
        )

    # Create chat details
    chat_details = models.ChatDetails(
        compartment_id=self.client.compartment_id,
        serving_mode=self.client.get_serving_mode(self.config.model_id),
        chat_request=chat_request,
    )

    # Execute request with retry for empty responses.
    # OCI GenAI sometimes returns empty content, especially under
    # concurrent load. Retry up to 3 times with backoff.
    loop = asyncio.get_running_loop()
    max_retries = 3

    for attempt in range(max_retries):
        response = await loop.run_in_executor(
            None,
            lambda: self.client.chat(chat_details),
        )

        # Parse response
        content, tool_calls, stop_reason = self.provider.parse_response(response)
        usage = self.provider.parse_usage(response)

        # If we got content or tool calls, we're good
        if content or tool_calls:
            break

        # Backoff before retry (0.5s, 1.0s)
        if attempt < max_retries - 1:
            await asyncio.sleep(0.5 * (attempt + 1))

    return ModelResponse(
        message=Message.assistant(content=content, tool_calls=tool_calls),
        usage=usage,
        stop_reason=stop_reason,
    )

ainvoke async

ainvoke(messages: list[Any], tools: list[dict[str, Any]] | None = None, **kwargs: Any) -> Any

LangChain-compatible alias — returns Message (AIMessage equivalent).

Source code in src/locus/models/providers/oci/__init__.py
async def ainvoke(
    self,
    messages: list[Any],
    tools: list[dict[str, Any]] | None = None,
    **kwargs: Any,
) -> Any:
    """LangChain-compatible alias — returns Message (AIMessage equivalent)."""
    response = await self.complete(messages, tools=tools, **kwargs)
    return response.message if hasattr(response, "message") else response

bind_tools

bind_tools(tools: list[Any], **kwargs: Any) -> OCIModel

LangChain-compatible bind_tools — stores tool schemas and passes them on complete().

Source code in src/locus/models/providers/oci/__init__.py
def bind_tools(self, tools: list[Any], **kwargs: Any) -> OCIModel:
    """LangChain-compatible bind_tools — stores tool schemas and passes them on complete()."""
    bound = self.model_copy()
    object.__setattr__(
        bound,
        "_bound_tools",
        [t.to_openai_schema() if hasattr(t, "to_openai_schema") else t for t in (tools or [])],
    )
    return bound

stream async

stream(messages: list[Message], tools: list[dict[str, Any]] | None = None, **kwargs: Any) -> AsyncIterator[ModelChunkEvent]

Stream a chat response via the OCI GenAI SDK.

Sets is_stream=True on the chat request so the SDK returns an SSE event stream. Each data: event carries a JSON chunk with message.content deltas and (on the last event) finishReason. Works for both OnDemandServingMode (model id) and DedicatedServingMode (DAC endpoint OCID).

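If the initial streaming request raises (for example, a DAC endpoint or model version that rejects is_stream), the stream falls back to the non-streaming complete() path and yields the full content and any tool calls as single chunks, followed by a done chunk. Errors raised while iterating events after the stream has started are not caught by this fallback.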

Source code in src/locus/models/providers/oci/__init__.py
async def stream(
    self,
    messages: list[Message],
    tools: list[dict[str, Any]] | None = None,
    **kwargs: Any,
) -> AsyncIterator[ModelChunkEvent]:
    """Stream a chat response via the OCI GenAI SDK.

    Sets ``is_stream=True`` on the chat request so the SDK returns
    an SSE event stream. Each ``data:`` event carries a JSON chunk
    with ``message.content`` deltas and (on the last event)
    ``finishReason``. Works for both ``OnDemandServingMode``
    (model id) and ``DedicatedServingMode`` (DAC endpoint OCID).

    On any exception the stream falls back to the non-streaming
    ``complete()`` path and yields a single chunk with the full
    content — robust to providers that reject ``is_stream``.
    """
    import json as _json

    from oci.generative_ai_inference import models

    # Build the same request shape as ``complete()`` but with
    # ``is_stream=True`` so the SDK returns a streaming response.
    converted_messages = self.provider.convert_messages(messages, model_id=self.config.model_id)
    converted_tools = self.provider.convert_tools(tools)
    request_kwargs = {
        "max_tokens": self.config.max_tokens,
        "temperature": self.config.temperature,
        "top_p": self.config.top_p,
        "model_id": self.config.model_id,
        "is_stream": True,
    }

    chat_request = self.provider.build_request(
        converted_messages,
        converted_tools,
        **request_kwargs,
    )
    # Some provider request builders (Cohere) take messages
    # under a different field — they may have ignored is_stream
    # in build_request. Set it on the resulting object as a
    # belt-and-braces step.
    if hasattr(chat_request, "is_stream"):
        chat_request.is_stream = True

    chat_details = models.ChatDetails(
        compartment_id=self.client.compartment_id,
        serving_mode=self.client.get_serving_mode(self.config.model_id),
        chat_request=chat_request,
    )

    loop = asyncio.get_running_loop()
    try:
        response = await loop.run_in_executor(None, lambda: self.client.chat(chat_details))
    except Exception:  # noqa: BLE001 — fall back on any provider error
        # Some DAC endpoints / model versions reject is_stream.
        # Hand the user a working stream by chunking the
        # non-streaming response.
        non_stream = await self.complete(messages, tools, **kwargs)
        if non_stream.content:
            yield ModelChunkEvent(content=non_stream.content)
        if non_stream.tool_calls:
            yield ModelChunkEvent(tool_calls=non_stream.tool_calls)
        yield ModelChunkEvent(done=True)
        return

    # ``response.data`` is the raw streaming body. Iterate the SSE
    # event stream synchronously in a worker thread so the asyncio
    # loop stays responsive — each event is a small JSON delta.
    events_iter = response.data.events()
    sentinel = object()

    def _next_event() -> Any:
        return next(events_iter, sentinel)

    while True:
        event = await loop.run_in_executor(None, _next_event)
        if event is sentinel:
            break
        data = getattr(event, "data", None)
        if not data:
            continue
        try:
            chunk = _json.loads(data)
        except (ValueError, TypeError):
            # Skip malformed deltas — keep the stream alive.
            continue
        content_delta, tool_calls_delta, _is_done = self.provider.parse_stream_chunk(chunk)
        if content_delta:
            yield ModelChunkEvent(content=content_delta)
        if tool_calls_delta:
            yield ModelChunkEvent(tool_calls=tool_calls_delta)

    yield ModelChunkEvent(done=True)

OCIConfig

Bases: ModelConfig

Configuration for OCI GenAI models.

OCIAuthType

Bases: StrEnum

OCI authentication types.

OpenAI

OpenAIModel

OpenAIModel(model: str = 'gpt-4o', api_key: str | None = None, base_url: str | None = None, max_tokens: int = 4096, temperature: float = 0.7, **kwargs: Any)

Bases: BaseModel

OpenAI model provider.

Supports GPT-4o, GPT-4, o1, o3 models with streaming and tool calling.

Example

model = OpenAIModel(model="gpt-4o")
response = await model.complete([Message.user("Hello!")])

Initialize OpenAI model.

Source code in src/locus/models/native/openai.py
def __init__(
    self,
    model: str = "gpt-4o",
    api_key: str | None = None,
    base_url: str | None = None,
    max_tokens: int = 4096,
    temperature: float = 0.7,
    **kwargs: Any,
) -> None:
    """Initialize OpenAI model."""
    config = OpenAIConfig(
        model=model,
        api_key=api_key,
        base_url=base_url,
        max_tokens=max_tokens,
        temperature=temperature,
        **kwargs,
    )
    super().__init__(config=config)

supports_structured_output property

supports_structured_output: bool

Native response_format={"type":"json_schema",...} support.

OpenAI's chat-completions API accepts a JSON-schema response_format and guarantees a parseable instance. The agent loop uses this property to skip the prompted-JSON fallback when the provider ships native structured output.

client property

client: AsyncOpenAI

Get or create the OpenAI client.

The client is configured with explicit max_retries and timeout from OpenAIConfig so transient errors (429, 5xx, network resets) don't kill the agent loop on the first try. The openai SDK retries with exponential backoff between attempts.

close async

close() -> None

Close the OpenAI client and release resources.

Source code in src/locus/models/native/openai.py
async def close(self) -> None:
    """Close the OpenAI client and release resources."""
    if self._client is not None:
        await self._client.close()
        self._client = None

__aenter__ async

__aenter__() -> OpenAIModel

Async context manager entry.

Source code in src/locus/models/native/openai.py
async def __aenter__(self) -> OpenAIModel:
    """Async context manager entry."""
    return self

__aexit__ async

__aexit__(exc_type: Any, exc_val: Any, exc_tb: Any) -> None

Async context manager exit - close client.

Source code in src/locus/models/native/openai.py
async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
    """Async context manager exit - close client."""
    await self.close()

complete async

complete(messages: list[Message], tools: list[dict[str, Any]] | None = None, **kwargs: Any) -> ModelResponse

Complete a chat request.

Parameters:

  • messages (list[Message], required): Conversation history
  • tools (list[dict[str, Any]] | None, default None): Tool schemas in OpenAI format
  • **kwargs (Any): Additional OpenAI-specific options

Returns:

  • ModelResponse: Model response with message and metadata

Source code in src/locus/models/native/openai.py
async def complete(
    self,
    messages: list[Message],
    tools: list[dict[str, Any]] | None = None,
    **kwargs: Any,
) -> ModelResponse:
    """
    Complete a chat request.

    Args:
        messages: Conversation history
        tools: Tool schemas in OpenAI format
        **kwargs: Additional OpenAI-specific options

    Returns:
        Model response with message and metadata
    """
    openai_messages = self._convert_messages(messages)
    openai_tools = self._convert_tools(tools)

    uses_completion_tokens = self._uses_max_completion_tokens(self.config.model)
    rejects_sampling = self._rejects_sampling_params(self.config.model)

    max_tokens_value = kwargs.get("max_tokens", self.config.max_tokens)

    request_kwargs: dict[str, Any] = {
        "model": self.config.model,
        "messages": openai_messages,
    }

    # Use appropriate token parameter based on model
    if uses_completion_tokens:
        request_kwargs["max_completion_tokens"] = max_tokens_value
    else:
        request_kwargs["max_tokens"] = max_tokens_value
        if not rejects_sampling:
            request_kwargs["temperature"] = kwargs.get("temperature", self.config.temperature)
            request_kwargs["top_p"] = kwargs.get("top_p", self.config.top_p)
            # Only send penalties when the user customized them. Some
            # providers (Grok) reject the parameter outright, even at
            # zero — server defaults are 0.0 anyway, so omitting the
            # default value is functionally identical for those that
            # accept it.
            freq = kwargs.get("frequency_penalty", self.config.frequency_penalty)
            if freq != 0.0:
                request_kwargs["frequency_penalty"] = freq
            pres = kwargs.get("presence_penalty", self.config.presence_penalty)
            if pres != 0.0:
                request_kwargs["presence_penalty"] = pres

    if openai_tools:
        request_kwargs["tools"] = openai_tools

    if self.config.seed is not None:
        request_kwargs["seed"] = self.config.seed

    if self.config.stop_sequences and not uses_completion_tokens:
        request_kwargs["stop"] = self.config.stop_sequences

    # Forward ``response_format`` for structured output. Caller is expected
    # to pass a fully-formed dict (see locus.core.structured.build_response_format).
    response_format = kwargs.get("response_format")
    if response_format is not None:
        request_kwargs["response_format"] = response_format

    response = await self.client.chat.completions.create(**request_kwargs)
    return self._parse_response(response)
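The token and sampling branch of `complete()` is the part most likely to trip a provider. The sketch below isolates it as a pure function so the outcome is easy to check. `build_request_kwargs` is a hypothetical name, and the boolean flags stand in for the private helpers `_uses_max_completion_tokens` and `_rejects_sampling_params`, whose model lists are not shown here. Tools, seed, stop sequences and `response_format` are left out.

```python
from typing import Any


def build_request_kwargs(
    model: str,
    messages: list[dict[str, Any]],
    *,
    max_tokens: int,
    temperature: float,
    top_p: float,
    frequency_penalty: float = 0.0,
    presence_penalty: float = 0.0,
    uses_completion_tokens: bool = False,
    rejects_sampling: bool = False,
) -> dict[str, Any]:
    """Sketch of complete()'s parameter selection, not the locus code itself."""
    kwargs: dict[str, Any] = {"model": model, "messages": messages}
    if uses_completion_tokens:
        # These models take max_completion_tokens and no sampling params.
        kwargs["max_completion_tokens"] = max_tokens
        return kwargs
    kwargs["max_tokens"] = max_tokens
    if not rejects_sampling:
        kwargs["temperature"] = temperature
        kwargs["top_p"] = top_p
        # Omit penalties at their 0.0 default; some providers reject the key.
        if frequency_penalty != 0.0:
            kwargs["frequency_penalty"] = frequency_penalty
        if presence_penalty != 0.0:
            kwargs["presence_penalty"] = presence_penalty
    return kwargs


msgs = [{"role": "user", "content": "Hi"}]
standard = build_request_kwargs("gpt-4o", msgs, max_tokens=256, temperature=0.7, top_p=1.0)
reasoning = build_request_kwargs(
    "some-reasoning-model",  # hypothetical model id
    msgs,
    max_tokens=256,
    temperature=0.7,
    top_p=1.0,
    uses_completion_tokens=True,
)
```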

ainvoke async

ainvoke(messages: list[Any], tools: list[dict[str, Any]] | None = None, **kwargs: Any) -> Any

LangChain-compatible alias — returns Message (AIMessage equivalent).

Source code in src/locus/models/native/openai.py
async def ainvoke(
    self,
    messages: list[Any],
    tools: list[dict[str, Any]] | None = None,
    **kwargs: Any,
) -> Any:
    """LangChain-compatible alias — returns Message (AIMessage equivalent)."""
    response = await self.complete(messages, tools=tools, **kwargs)
    return response.message if hasattr(response, "message") else response

bind_tools

bind_tools(tools: list[Any], **kwargs: Any) -> OpenAIModel

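LangChain-compatible bind_tools. Returns a copy of the model with the given tools stored as OpenAI schemas, converting each tool via to_openai_schema() when it provides one.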

Source code in src/locus/models/native/openai.py
def bind_tools(self, tools: list[Any], **kwargs: Any) -> OpenAIModel:
    """LangChain-compatible bind_tools."""
    bound = self.model_copy()
    object.__setattr__(
        bound,
        "_bound_tools",
        [t.to_openai_schema() if hasattr(t, "to_openai_schema") else t for t in (tools or [])],
    )
    return bound
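The bind-on-a-copy pattern can be sketched without pydantic. `EchoTool` and `ToyModel` are hypothetical, and `copy.copy` replaces pydantic's `model_copy` here. The excerpt on this page does not show whether `complete()` later reads `_bound_tools`, so the sketch stops at the binding step.

```python
import copy
from typing import Any


class EchoTool:
    """Hypothetical tool object exposing to_openai_schema(), as bind_tools expects."""

    def to_openai_schema(self) -> dict[str, Any]:
        return {
            "type": "function",
            "function": {
                "name": "echo",
                "description": "Echo the input text.",
                "parameters": {
                    "type": "object",
                    "properties": {"text": {"type": "string"}},
                    "required": ["text"],
                },
            },
        }


class ToyModel:
    """Stand-in for OpenAIModel holding a list of bound tool schemas."""

    def __init__(self) -> None:
        self._bound_tools: list[dict[str, Any]] = []

    def bind_tools(self, tools: list[Any]) -> "ToyModel":
        bound = copy.copy(self)  # shallow copy; reassigning below leaves self alone
        # Convert tool objects to schemas; plain dicts pass through unchanged.
        bound._bound_tools = [
            t.to_openai_schema() if hasattr(t, "to_openai_schema") else t
            for t in (tools or [])
        ]
        return bound


base = ToyModel()
raw_schema = {"type": "function", "function": {"name": "noop", "parameters": {"type": "object"}}}
bound = base.bind_tools([EchoTool(), raw_schema])
```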

stream async

stream(messages: list[Message], tools: list[dict[str, Any]] | None = None, **kwargs: Any) -> AsyncIterator[ModelChunkEvent]

Stream a chat response.

Parameters:

messages (list[Message], required): Conversation history
tools (list[dict[str, Any]] | None, default None): Tool schemas in OpenAI format
**kwargs (Any, default {}): Additional OpenAI-specific options

Yields:

AsyncIterator[ModelChunkEvent]: Streaming chunks with content and/or tool calls

Source code in src/locus/models/native/openai.py
async def stream(
    self,
    messages: list[Message],
    tools: list[dict[str, Any]] | None = None,
    **kwargs: Any,
) -> AsyncIterator[ModelChunkEvent]:
    """
    Stream a chat response.

    Args:
        messages: Conversation history
        tools: Tool schemas in OpenAI format
        **kwargs: Additional OpenAI-specific options

    Yields:
        Streaming chunks with content and/or tool calls
    """
    openai_messages = self._convert_messages(messages)
    openai_tools = self._convert_tools(tools)

    uses_completion_tokens = self._uses_max_completion_tokens(self.config.model)
    rejects_sampling = self._rejects_sampling_params(self.config.model)

    max_tokens_value = kwargs.get("max_tokens", self.config.max_tokens)

    request_kwargs: dict[str, Any] = {
        "model": self.config.model,
        "messages": openai_messages,
        "stream": True,
    }

    # Use appropriate token parameter based on model
    if uses_completion_tokens:
        request_kwargs["max_completion_tokens"] = max_tokens_value
    elif rejects_sampling:
        request_kwargs["max_tokens"] = max_tokens_value
    else:
        request_kwargs["max_tokens"] = max_tokens_value
        request_kwargs["temperature"] = kwargs.get("temperature", self.config.temperature)
        request_kwargs["top_p"] = kwargs.get("top_p", self.config.top_p)
        # See note in complete() — same penalty conditional.
        freq = kwargs.get("frequency_penalty", self.config.frequency_penalty)
        if freq != 0.0:
            request_kwargs["frequency_penalty"] = freq
        pres = kwargs.get("presence_penalty", self.config.presence_penalty)
        if pres != 0.0:
            request_kwargs["presence_penalty"] = pres

    if openai_tools:
        request_kwargs["tools"] = openai_tools

    if self.config.seed is not None:
        request_kwargs["seed"] = self.config.seed

    if self.config.stop_sequences and not uses_completion_tokens:
        request_kwargs["stop"] = self.config.stop_sequences

    # Forward ``response_format`` for streaming structured output —
    # symmetric with complete(). Caller is expected to pass a fully-
    # formed dict (see locus.core.structured.build_response_format).
    response_format = kwargs.get("response_format")
    if response_format is not None:
        request_kwargs["response_format"] = response_format

    # Track tool calls during streaming
    current_tool_calls: dict[int, dict[str, Any]] = {}

    stream = await self.client.chat.completions.create(**request_kwargs)

    async for chunk in stream:
        if not chunk.choices:
            continue

        choice = chunk.choices[0]
        delta = getattr(choice, "delta", None)

        # Some providers (Gemini) emit chunks whose ``delta`` is None.
        # Skip content/tool-call handling for those, but still let the
        # finish_reason check below run.
        if delta is None and not choice.finish_reason:
            continue

        # Handle content
        if delta is not None and delta.content:
            yield ModelChunkEvent(content=delta.content)

        # Handle tool calls
        if delta is not None and delta.tool_calls:
            for tc_delta in delta.tool_calls:
                idx = tc_delta.index
                if idx not in current_tool_calls:
                    current_tool_calls[idx] = {
                        "id": tc_delta.id or "",
                        "name": "",
                        "arguments": "",
                    }

                if tc_delta.id:
                    current_tool_calls[idx]["id"] = tc_delta.id
                if tc_delta.function:
                    if tc_delta.function.name:
                        current_tool_calls[idx]["name"] = tc_delta.function.name
                    if tc_delta.function.arguments:
                        current_tool_calls[idx]["arguments"] += tc_delta.function.arguments

        # Check for end of stream
        if choice.finish_reason:
            # Emit any accumulated tool calls
            if current_tool_calls:
                tool_calls = []
                for tc_data in current_tool_calls.values():
                    try:
                        arguments = (
                            json.loads(tc_data["arguments"]) if tc_data["arguments"] else {}
                        )
                    except json.JSONDecodeError:
                        arguments = {}
                    tool_calls.append(
                        ToolCall(
                            id=tc_data["id"],
                            name=tc_data["name"],
                            arguments=arguments,
                        )
                    )
                yield ModelChunkEvent(tool_calls=tool_calls)

            yield ModelChunkEvent(done=True)
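Streamed tool calls arrive as fragments keyed by `index`: the first fragment usually carries the id and function name, and later ones append pieces of the JSON argument string. The sketch below reproduces the accumulate-then-parse step from `stream()` in isolation. `accumulate_tool_calls` is a hypothetical name, and `SimpleNamespace` objects stand in for the SDK's delta types, with field names taken from the code above.

```python
import json
from types import SimpleNamespace as NS
from typing import Any


def accumulate_tool_calls(deltas: list[Any]) -> list[dict[str, Any]]:
    """Merge streamed tool-call fragments by index, then parse arguments once."""
    calls: dict[int, dict[str, Any]] = {}
    for d in deltas:
        slot = calls.setdefault(d.index, {"id": "", "name": "", "arguments": ""})
        if d.id:
            slot["id"] = d.id
        if d.function:
            if d.function.name:
                slot["name"] = d.function.name
            if d.function.arguments:
                slot["arguments"] += d.function.arguments  # JSON arrives in pieces
    result = []
    for c in calls.values():
        try:
            args = json.loads(c["arguments"]) if c["arguments"] else {}
        except json.JSONDecodeError:
            args = {}  # same lenient fallback as stream()
        result.append({"id": c["id"], "name": c["name"], "arguments": args})
    return result


fragments = [
    NS(index=0, id="call_1", function=NS(name="get_weather", arguments='{"city": ')),
    NS(index=0, id=None, function=NS(name=None, arguments='"Paris"}')),
    NS(index=1, id="call_2", function=NS(name="get_time", arguments="")),
]
calls = accumulate_tool_calls(fragments)
```

Parsing only at `finish_reason` matters because an individual fragment such as `'{"city": '` is not valid JSON on its own.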

OpenAIConfig

Bases: ModelConfig

Configuration for OpenAI models.

Anthropic

AnthropicModel

AnthropicModel(model: str = 'claude-sonnet-4-20250514', api_key: str | None = None, base_url: str | None = None, max_tokens: int = 4096, temperature: float = 0.7, prompt_cache: bool = False, **kwargs: Any)

Bases: BaseModel

Anthropic model provider.

Supports Claude 4.6, 4.5, 3.5 models with streaming and tool calling.

Example

model = AnthropicModel(model="claude-sonnet-4-20250514")
response = await model.complete([Message.user("Hello!")])

Source code in src/locus/models/native/anthropic.py
def __init__(
    self,
    model: str = "claude-sonnet-4-20250514",
    api_key: str | None = None,
    base_url: str | None = None,
    max_tokens: int = 4096,
    temperature: float = 0.7,
    prompt_cache: bool = False,
    **kwargs: Any,
) -> None:
    config = AnthropicConfig(
        model=model,
        api_key=api_key,
        base_url=base_url,
        max_tokens=max_tokens,
        temperature=temperature,
        prompt_cache=prompt_cache,
        **kwargs,
    )
    super().__init__(config=config)

supports_structured_output property

supports_structured_output: bool

Anthropic doesn't ship OpenAI-style response_format.

The agent loop falls back to the prompted-JSON path with post-hoc parsing for Anthropic models.
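When a caller passes an OpenAI-style `json_schema` response_format directly to `complete()`, the wrapper emulates it with a forced tool call, as described under complete() below. The body of `_structured_output_tool` is not shown in this excerpt, so the sketch assumes it copies `json_schema.schema` into the tool's `input_schema`. `response_format_to_anthropic` is a hypothetical name. The tool shape (`name`, `description`, `input_schema`) and the `tool_choice` shape follow the Anthropic Messages API.

```python
import json
from typing import Any

STRUCTURED_TOOL_NAME = "respond_with_schema"  # name taken from complete()'s docstring


def response_format_to_anthropic(
    response_format: dict[str, Any],
) -> tuple[dict[str, Any], dict[str, Any]]:
    """Hypothetical helper: map an OpenAI json_schema response_format to an
    Anthropic tool definition plus a tool_choice that forces that tool."""
    spec = response_format.get("json_schema", {})
    tool = {
        "name": STRUCTURED_TOOL_NAME,
        "description": spec.get("description", "Return the answer as JSON matching the schema."),
        "input_schema": spec.get("schema", {"type": "object"}),
    }
    tool_choice = {"type": "tool", "name": STRUCTURED_TOOL_NAME}
    return tool, tool_choice


rf = {
    "type": "json_schema",
    "json_schema": {
        "name": "weather",
        "schema": {
            "type": "object",
            "properties": {"temp_c": {"type": "number"}},
            "required": ["temp_c"],
        },
    },
}
tool, tool_choice = response_format_to_anthropic(rf)

# Claude answers with a tool_use block whose input follows the schema;
# complete() re-serialises that input as the message content.
simulated_tool_input = {"temp_c": 21.5}
content = json.dumps(simulated_tool_input)
```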

client property

client: AsyncAnthropic

Get or create the Anthropic client.

Configured with an explicit max_retries and timeout so that a transient 529 (overloaded) response, another 5xx error, or a connection reset doesn't abort the agent loop on the first attempt. The anthropic SDK performs these retries with exponential backoff.

close async

close() -> None

Close the underlying httpx client.

Agent.run_sync calls this in a finally block so the loop-bound httpx connections are shut down inside the same event loop that opened them. Without this, the next asyncio.run invocation closes the prior loop and the leftover client's __del__ later tries to aclose against it, raising RuntimeError: Event loop is closed.

Source code in src/locus/models/native/anthropic.py
async def close(self) -> None:
    """Close the underlying httpx client.

    ``Agent.run_sync`` calls this in a ``finally`` block so the
    loop-bound httpx connections are shut down inside the same
    event loop that opened them. Without this, the next
    ``asyncio.run`` invocation closes the prior loop and the
    leftover client's ``__del__`` later tries to ``aclose`` against
    it, raising ``RuntimeError: Event loop is closed``.
    """
    if self._client is not None:
        try:
            await self._client.close()
        finally:
            self._client = None
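The docstring above describes closing the client inside the same event loop that opened it. `Agent.run_sync` itself is not shown on this page, so the following is a hypothetical sketch of that pattern. `LoopBoundModel` records which loop "opened" its client, and `run_sync` awaits `close()` in a `finally` block before `asyncio.run` shuts the loop down.

```python
import asyncio
from typing import Any, Awaitable, Callable


class LoopBoundModel:
    """Hypothetical model whose client is tied to the loop that created it."""

    def __init__(self) -> None:
        self._client_loop: asyncio.AbstractEventLoop | None = None
        self.closed_in_same_loop: bool | None = None

    async def complete(self, prompt: str) -> str:
        if self._client_loop is None:
            self._client_loop = asyncio.get_running_loop()  # "open" the client
        return prompt.upper()

    async def close(self) -> None:
        if self._client_loop is not None:
            self.closed_in_same_loop = asyncio.get_running_loop() is self._client_loop
            self._client_loop = None


def run_sync(model: LoopBoundModel, fn: Callable[[], Awaitable[Any]]) -> Any:
    """Run fn() on a fresh loop and close the model before that loop is closed."""

    async def _main() -> Any:
        try:
            return await fn()
        finally:
            await model.close()  # still inside the loop that opened the client

    return asyncio.run(_main())


model = LoopBoundModel()
out = run_sync(model, lambda: model.complete("hello"))
```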

complete async

complete(messages: list[Message], tools: list[dict[str, Any]] | None = None, **kwargs: Any) -> ModelResponse

Complete a chat request.

Recognises an OpenAI-style response_format={"type": "json_schema", ...} kwarg and translates it into Anthropic's tool-use mechanism: a synthetic respond_with_schema tool is appended to the call and tool_choice is pinned to it. The tool arguments are then surfaced as the message content (canonical JSON), so callers can parse them with locus.core.structured.parse_structured exactly as they would with native response_format providers.

Source code in src/locus/models/native/anthropic.py
async def complete(
    self,
    messages: list[Message],
    tools: list[dict[str, Any]] | None = None,
    **kwargs: Any,
) -> ModelResponse:
    """Complete a chat request.

    Recognises an OpenAI-style ``response_format={"type": "json_schema", ...}``
    kwarg and translates it into Anthropic's tool-use mechanism: a synthetic
    ``respond_with_schema`` tool is appended to the call and ``tool_choice``
    is pinned to it. The tool arguments are then surfaced as the message
    content (canonical JSON) so callers can parse them with
    :func:`locus.core.structured.parse_structured` exactly as they would
    with native ``response_format`` providers.
    """
    import json as _json

    system_prompt, anthropic_messages = self._convert_messages(messages)
    anthropic_tools = self._convert_tools(tools) or []

    params: dict[str, Any] = {
        "model": self.config.model,
        "messages": anthropic_messages,
        "max_tokens": kwargs.get("max_tokens", self.config.max_tokens),
    }
    # Claude Opus 4.7 (and presumably later 4.x reasoning models) reject
    # `temperature` with `invalid_request_error: temperature is deprecated
    # for this model`. Silently drop the param for those models — locus's
    # own agent runtime_loop always passes `temperature=config.temperature`
    # in `complete_kwargs`, so honouring "caller intent" would still 400
    # every Agent(model="claude-opus-4-7") on the first turn. The
    # wrapper's job here is to keep the agent loop running; callers who
    # need the parameter back can pin to a model that accepts it.
    if not _rejects_temperature(self.config.model):
        params["temperature"] = kwargs.get("temperature", self.config.temperature)
    if system_prompt:
        # When prompt-caching is enabled, send the system prompt as a
        # block list with ``cache_control: ephemeral`` so subsequent
        # turns reuse the cached input at ~1/10x cost (Anthropic
        # ephemeral cache TTL is ~5 min).
        if self.config.prompt_cache:
            params["system"] = [
                {
                    "type": "text",
                    "text": system_prompt,
                    "cache_control": {"type": "ephemeral"},
                }
            ]
        else:
            params["system"] = system_prompt

    # Structured-output mode: emulate ``response_format`` via tool-use.
    response_format = kwargs.get("response_format")
    structured_mode = (
        isinstance(response_format, dict) and response_format.get("type") == "json_schema"
    )
    if structured_mode:
        assert isinstance(response_format, dict)  # narrowed by structured_mode
        anthropic_tools.append(self._structured_output_tool(response_format))
        params["tool_choice"] = {
            "type": "tool",
            "name": self._STRUCTURED_TOOL_NAME,
        }

    if anthropic_tools:
        # Cache the tool catalog too — it's typically the same across
        # turns and can be large. Anthropic walks the cache_control
        # markers in order; tagging the last tool covers the catalog.
        if self.config.prompt_cache:
            anthropic_tools = [
                *anthropic_tools[:-1],
                {
                    **anthropic_tools[-1],
                    "cache_control": {"type": "ephemeral"},
                },
            ]
        params["tools"] = anthropic_tools

    response = await self.client.messages.create(**params)

    # Parse response
    content: str | None = None
    tool_calls: list[ToolCall] = []
    structured_payload: dict[str, Any] | None = None

    for block in response.content:
        if block.type == "text":
            content = (content or "") + block.text
        elif block.type == "tool_use":
            if structured_mode and block.name == self._STRUCTURED_TOOL_NAME:
                structured_payload = block.input if isinstance(block.input, dict) else {}
                continue
            tool_calls.append(
                ToolCall(
                    id=block.id,
                    name=block.name,
                    arguments=block.input if isinstance(block.input, dict) else {},
                )
            )

    # In structured mode, surface the tool's arguments as the message
    # content so downstream ``parse_structured`` can validate it.
    if structured_mode and structured_payload is not None:
        content = _json.dumps(structured_payload)

    usage: dict[str, int] = {}
    if response.usage:
        usage = {
            "prompt_tokens": response.usage.input_tokens,
            "completion_tokens": response.usage.output_tokens,
        }
        # Anthropic returns these only when prompt caching is in play.
        # Surface them on usage so AgentResult.metrics can show
        # cache hits/misses and cost-saved estimates.
        cache_creation = getattr(response.usage, "cache_creation_input_tokens", None)
        cache_read = getattr(response.usage, "cache_read_input_tokens", None)
        if cache_creation is not None:
            usage["cache_creation_input_tokens"] = cache_creation
        if cache_read is not None:
            usage["cache_read_input_tokens"] = cache_read

    return ModelResponse(
        message=Message.assistant(content=content, tool_calls=tool_calls),
        usage=usage,
        stop_reason=response.stop_reason,
    )
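With `prompt_cache=True`, `complete()` marks two places with `cache_control: {"type": "ephemeral"}`: the system prompt, sent as a one-element block list, and the last tool in the catalog. The sketch below reproduces only that placement logic. `apply_prompt_cache` is a hypothetical name, and the tool dicts are illustrative.

```python
from typing import Any


def apply_prompt_cache(
    system_prompt: str, tools: list[dict[str, Any]], prompt_cache: bool
) -> dict[str, Any]:
    """Sketch of complete()'s cache_control placement (system prompt + last tool)."""
    params: dict[str, Any] = {}
    if system_prompt:
        params["system"] = (
            [{"type": "text", "text": system_prompt, "cache_control": {"type": "ephemeral"}}]
            if prompt_cache
            else system_prompt
        )
    if tools:
        if prompt_cache:
            # One marker on the last tool covers the whole tool catalog.
            # Builds new containers, so the caller's list is not mutated.
            tools = [*tools[:-1], {**tools[-1], "cache_control": {"type": "ephemeral"}}]
        params["tools"] = tools
    return params


tools = [
    {"name": "search", "description": "Web search", "input_schema": {"type": "object"}},
    {"name": "fetch", "description": "Fetch a URL", "input_schema": {"type": "object"}},
]
cached = apply_prompt_cache("You are terse.", tools, prompt_cache=True)
plain = apply_prompt_cache("You are terse.", tools, prompt_cache=False)
```

On cached turns, the `cache_creation_input_tokens` and `cache_read_input_tokens` fields that `complete()` copies into `usage` show whether the cache was written or read.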

stream async

stream(messages: list[Message], tools: list[dict[str, Any]] | None = None, **kwargs: Any) -> AsyncIterator[ModelChunkEvent]

Stream a chat response.

Source code in src/locus/models/native/anthropic.py
async def stream(
    self,
    messages: list[Message],
    tools: list[dict[str, Any]] | None = None,
    **kwargs: Any,
) -> AsyncIterator[ModelChunkEvent]:
    """Stream a chat response."""
    system_prompt, anthropic_messages = self._convert_messages(messages)
    anthropic_tools = self._convert_tools(tools)

    params: dict[str, Any] = {
        "model": self.config.model,
        "messages": anthropic_messages,
        "max_tokens": kwargs.get("max_tokens", self.config.max_tokens),
    }
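    if not _rejects_temperature(self.config.model):
        params["temperature"] = kwargs.get("temperature", self.config.temperature)
    if system_prompt:
        params["system"] = system_prompt
    if anthropic_tools:
        params["tools"] = anthropic_tools

    async with self.client.messages.stream(**params) as stream:
        async for text in stream.text_stream:
            yield ModelChunkEvent(content=text)
        # text_stream carries only text deltas; tool_use blocks are read
        # from the final assembled message instead.
        final_message = await stream.get_final_message()

    tool_calls = [
        ToolCall(
            id=block.id,
            name=block.name,
            arguments=block.input if isinstance(block.input, dict) else {},
        )
        for block in final_message.content
        if block.type == "tool_use"
    ]
    if tool_calls:
        yield ModelChunkEvent(tool_calls=tool_calls)
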
    yield ModelChunkEvent(done=True)

Ollama

OllamaModel

OllamaModel(model: str = 'llama3.3', base_url: str = 'http://localhost:11434', max_tokens: int = 4096, temperature: float = 0.7, **kwargs: Any)

Bases: BaseModel

Ollama model provider for local LLMs.

Supports any model available in Ollama (Llama, Mistral, Gemma, etc.), including tool calling.

Example

model = OllamaModel(model="llama3.3")
response = await model.complete([Message.user("Hello!")])

Source code in src/locus/models/native/ollama.py
def __init__(
    self,
    model: str = "llama3.3",
    base_url: str = "http://localhost:11434",
    max_tokens: int = 4096,
    temperature: float = 0.7,
    **kwargs: Any,
) -> None:
    config = OllamaConfig(
        model=model,
        base_url=base_url,
        max_tokens=max_tokens,
        temperature=temperature,
        **kwargs,
    )
    super().__init__(config=config)

supports_structured_output property

supports_structured_output: bool

Ollama doesn't yet ship OpenAI-style response_format.

The agent loop falls back to the prompted-JSON path with post-hoc parsing for Ollama models.

client property

client: Any

Get or create the Ollama async client.

complete async

complete(messages: list[Message], tools: list[dict[str, Any]] | None = None, **kwargs: Any) -> ModelResponse

Complete a chat request.

Source code in src/locus/models/native/ollama.py
async def complete(
    self,
    messages: list[Message],
    tools: list[dict[str, Any]] | None = None,
    **kwargs: Any,
) -> ModelResponse:
    """Complete a chat request."""
    ollama_messages = self._convert_messages(messages)
    ollama_tools = self._convert_tools(tools)

    params: dict[str, Any] = {
        "model": self.config.model,
        "messages": ollama_messages,
        "options": {
            "temperature": kwargs.get("temperature", self.config.temperature),
            "num_predict": kwargs.get("max_tokens", self.config.max_tokens),
        },
    }
    if ollama_tools:
        params["tools"] = ollama_tools

    response = await self.client.chat(**params)

    # Parse response — ollama returns Message object, not dict
    msg = response.get("message") or response
    if hasattr(msg, "content"):
        # ollama Message object
        content = msg.content
    else:
        content = msg.get("content") if isinstance(msg, dict) else str(msg)
    tool_calls: list[ToolCall] = []

    raw_tool_calls = (
        getattr(msg, "tool_calls", None)
        or (msg.get("tool_calls") if isinstance(msg, dict) else None)
        or []
    )
    for tc in raw_tool_calls:
        func = tc.get("function", {})
        args = func.get("arguments", {})
        if isinstance(args, str):
            try:
                args = json.loads(args)
            except json.JSONDecodeError:
                args = {}
        tool_calls.append(
            ToolCall(
                id=f"call_{func.get('name', 'unknown')}",
                name=func.get("name", "unknown"),
                arguments=args if isinstance(args, dict) else {},
            )
        )

    usage = {}
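    # Parenthesise the dict fallback: ``a or b if cond else c`` parses as
    # ``(a or b) if cond else c``, which would drop usage for object responses.
    prompt_tokens = getattr(response, "prompt_eval_count", None) or (
        response.get("prompt_eval_count") if isinstance(response, dict) else None
    )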
    if prompt_tokens:
        eval_count = getattr(response, "eval_count", None) or (
            response.get("eval_count") if isinstance(response, dict) else 0
        )
        usage = {"prompt_tokens": prompt_tokens, "completion_tokens": eval_count or 0}

    done = getattr(response, "done", None) or (
        response.get("done") if isinstance(response, dict) else None
    )

    return ModelResponse(
        message=Message.assistant(content=content, tool_calls=tool_calls),
        usage=usage,
        stop_reason="stop" if done else None,
    )
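The parsing above probes both attribute access and dict keys, on the assumption that the Ollama Python client may return either a response object or a plain dict depending on version. A small helper keeps that probing explicit and sidesteps a precedence trap: Python parses `a or b if cond else c` as `(a or b) if cond else c`. `read_field` and `extract_usage` are hypothetical names. `SimpleNamespace` stands in for the client's response object.

```python
from types import SimpleNamespace
from typing import Any


def read_field(obj: Any, name: str, default: Any = None) -> Any:
    """Read a field from an object (attribute) or a plain dict (key)."""
    value = getattr(obj, name, None)
    if value is None and isinstance(obj, dict):
        value = obj.get(name)
    return default if value is None else value


def extract_usage(response: Any) -> dict[str, int]:
    """Build the usage dict only when a prompt token count is present."""
    prompt_tokens = read_field(response, "prompt_eval_count")
    if not prompt_tokens:
        return {}
    return {
        "prompt_tokens": prompt_tokens,
        "completion_tokens": read_field(response, "eval_count", 0),
    }


as_dict = {"prompt_eval_count": 12, "eval_count": 34, "done": True}
as_obj = SimpleNamespace(prompt_eval_count=12, eval_count=34, done=True)
```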

stream async

stream(messages: list[Message], tools: list[dict[str, Any]] | None = None, **kwargs: Any) -> AsyncIterator[ModelChunkEvent]

Stream a chat response.

Source code in src/locus/models/native/ollama.py
async def stream(
    self,
    messages: list[Message],
    tools: list[dict[str, Any]] | None = None,
    **kwargs: Any,
) -> AsyncIterator[ModelChunkEvent]:
    """Stream a chat response."""
    ollama_messages = self._convert_messages(messages)

    params: dict[str, Any] = {
        "model": self.config.model,
        "messages": ollama_messages,
        "options": {
            "temperature": kwargs.get("temperature", self.config.temperature),
            "num_predict": kwargs.get("max_tokens", self.config.max_tokens),
        },
    }

    response = await self.client.chat(**params, stream=True)

    async for chunk in response:
        msg = chunk.get("message", {})
        content = msg.get("content", "")
        if content:
            yield ModelChunkEvent(content=content)
        if chunk.get("done"):
            yield ModelChunkEvent(done=True)
            return

    yield ModelChunkEvent(done=True)
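The Ollama `stream()` loop emits one event per non-empty content delta and exactly one `done` event, whether the server flags the last chunk or simply closes the stream. The sketch below reproduces that control flow with plain tuples instead of `ModelChunkEvent`. `fake_ollama_chunks` is a hypothetical stand-in for the client's streaming response, shaped like the dicts the code above reads.

```python
import asyncio
from typing import Any, AsyncIterator


async def fake_ollama_chunks() -> AsyncIterator[dict[str, Any]]:
    """Hypothetical chunks shaped like the dicts stream() reads above."""
    yield {"message": {"content": "Hi"}, "done": False}
    yield {"message": {"content": " there"}, "done": False}
    yield {"message": {"content": ""}, "done": True}


async def to_events(chunks: AsyncIterator[dict[str, Any]]) -> AsyncIterator[tuple[str, Any]]:
    """Yield ("content", text) per non-empty delta and exactly one ("done", True)."""
    async for chunk in chunks:
        content = chunk.get("message", {}).get("content", "")
        if content:
            yield ("content", content)
        if chunk.get("done"):
            yield ("done", True)
            return
    yield ("done", True)  # stream ended without a done flag


async def main() -> list[tuple[str, Any]]:
    return [event async for event in to_events(fake_ollama_chunks())]


events = asyncio.run(main())
```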