Workflows

This page covers the practical patterns for building workflows. For the underlying model (durability, replay, history, determinism), see Core Concepts > Workflows.

Workflow versus activity

When sketching the orchestration of your application, the first decision is what belongs in a workflow versus an activity.

A workflow is the orchestration brain. It coordinates steps, makes decisions, holds state across long durations (seconds to years), and waits for external events. Workflow code must be deterministic so the platform can replay it from history when a worker restarts.

An activity is the unit of real work — anything that touches the outside world. LLM calls, HTTP requests, database writes, file reads, and tool invocations all live in activities. Activities don't need to be deterministic; they typically run for seconds to minutes and are automatically retried with configurable backoff when they fail.
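
The retry-with-backoff behavior is handled by the platform. As a rough illustration of what "automatically retried with configurable backoff" means, here is a plain-Python sketch — the helper name and backoff values are invented for illustration, not SDK API:

```python
import asyncio

async def retry_with_backoff(fn, max_attempts=4, initial=0.01, factor=2.0):
    """Illustrative retry loop, not SDK code: retry a coroutine with exponential backoff."""
    delay = initial
    for attempt in range(1, max_attempts + 1):
        try:
            return await fn()
        except Exception:
            if attempt == max_attempts:
                raise  # out of attempts: surface the last failure
            await asyncio.sleep(delay)  # back off before the next attempt
            delay *= factor

calls = {"n": 0}

async def flaky_activity():
    # Fails twice, then succeeds — like a transient network error.
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("transient failure")
    return "ok"

result = asyncio.run(retry_with_backoff(flaky_activity))  # succeeds on the third attempt
```

In the real platform the retry policy lives in the activity configuration; the workflow code never writes this loop itself.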

For example, a workflow that processes incoming invoices might call a fetch_invoice_pdf activity (HTTP download), then an extract_fields activity (LLM call with structured output), branch on the extracted total to decide whether human approval is needed, wait for an approve signal, and finally call a post_to_accounting_system activity. The order, the branching, and the wait sit in the workflow; each side effect lives in an activity.

For the activity counterpart of this page, see Activities.

Defining a workflow

To define a workflow, use the @workflow.define decorator on a class and mark the entry function with @workflow.entrypoint:

import mistralai.workflows as workflows

@workflows.workflow.define(name="report_workflow")
class ReportWorkflow:
    @workflows.workflow.entrypoint
    async def run(self, report_type: str, include_details: bool = False) -> dict:
        """Workflow implementation."""
        # Orchestrate activities here
        ...

Workflow input

The run method accepts any JSON-serializable type — Pydantic models, plain dicts, or primitives all work. The platform validates inputs and generates Console UI schemas in every case. For the conceptual overview, see Core Concepts > Workflow input.

Primitive and multi-parameter inputs

from mistralai.workflows import workflow

@workflow.define(name="report_workflow")
class ReportWorkflow:
    @workflow.entrypoint
    async def run(self, report_type: str, include_details: bool = False) -> dict:
        ...

Input:

{"report_type": "daily", "include_details": false}

Single Pydantic model

When the entrypoint takes a single Pydantic BaseModel, its fields become the top-level input keys; there is no wrapper object:

from pydantic import BaseModel

class ReportParams(BaseModel):
    report_type: str
    include_details: bool = False

@workflow.define(name="report_workflow")
class ReportWorkflow:
    @workflow.entrypoint
    async def run(self, params: ReportParams) -> dict:
        ...

Input:

{"report_type": "daily", "include_details": true}

Union of Pydantic models

When a workflow can be triggered with different input shapes, use a union of BaseModel subclasses:

from pydantic import BaseModel, ConfigDict

class PromptInput(BaseModel):
    model_config = ConfigDict(extra="forbid")
    prompt: str

class CountInput(BaseModel):
    model_config = ConfigDict(extra="forbid")
    count: int

@workflow.define(name="flexible_workflow")
class FlexibleWorkflow:
    @workflow.entrypoint
    async def run(self, params: PromptInput | CountInput) -> str:
        if isinstance(params, PromptInput):
            return f"prompt: {params.prompt}"
        return f"count: {params.count}"

The SDK validates the input against each member in order and passes the first match to the handler:

{"prompt": "hello"}              → PromptInput
{"count": 42}                    → CountInput
{"count": 42, "extra": "field"}  → ValidationError
tip

Use extra="forbid" on each member model. It produces precise discrimination and clear error messages when the input doesn't match any expected shape.

Optional union (| None)

Append | None (or use Optional from typing) to make the input optional:

@workflow.define(name="optional_workflow")
class OptionalWorkflow:
    @workflow.entrypoint
    async def run(self, params: PromptInput | CountInput | None) -> str:
        if params is None:
            return "no input"
        if isinstance(params, PromptInput):
            return f"prompt: {params.prompt}"
        return f"count: {params.count}"

Mixing BaseModel with primitive types

Unions that combine a BaseModel with a non-model type (such as str or int) are not supported and fail when the @workflow.entrypoint decorator is applied:

# ❌ Raises an error at class definition time
@workflow.define(name="my_workflow")
class MyWorkflow:
    @workflow.entrypoint
    async def run(self, params: PromptInput | str) -> str:
        ...
Parameter 'params' of 'run' has unsupported union members: str.
Union inputs only support Pydantic BaseModel subclasses and None.
Use a dedicated BaseModel instead.

If you genuinely need a union that includes a non-model type, define a named RootModel subclass and use that instead:

from pydantic import BaseModel, RootModel

class PromptInput(BaseModel):
    prompt: str

class PromptOrRaw(RootModel[PromptInput | str]):
    pass

@workflow.define(name="my_workflow")
class MyWorkflow:
    @workflow.entrypoint
    async def run(self, params: PromptOrRaw) -> str:
        if isinstance(params.root, PromptInput):
            return f"structured: {params.root.prompt}"
        return f"raw: {params.root}"

The named subclass also produces a cleaner generated JSON schema.

Execution timeout

For the role and default of execution_timeout, see Core Concepts > Execution timeout. To override the 1-hour default, pass a timedelta to the decorator:

from datetime import timedelta
from mistralai.workflows import workflow

@workflow.define(name="long_running_workflow", execution_timeout=timedelta(days=7))
class LongRunningWorkflow:
    @workflow.entrypoint
    async def run(self, params: MyParams) -> MyResult:
        ...

A workflow still running after execution_timeout is canceled with a WORKFLOW_EXECUTION_TIMED_OUT error.

note

execution_timeout is a total wall-clock limit, not an activity timeout. Individual activity timeouts are controlled by start_to_close_timeout or schedule_to_close_timeout on each activity call.

Core workflow features

Signals

A signal is a fire-and-forget message delivered to a running workflow. The caller doesn't wait for a response — the workflow handles the signal asynchronously and updates its own state. Use signals for human approvals, external system callbacks, or any event that should nudge a workflow without expecting a return value.

For example, an invoice-processing workflow can pause until a reviewer signals approval:

@workflows.workflow.signal(name="approve", description="Approval signal")
async def approve_signal(self, data: ApprovalData) -> None:
    self.approved = True

Learn more about signals

Queries

A query reads workflow state without modifying it. Use queries to expose progress, current status, or computed values to external clients — for example, a dashboard polling for the percentage of items processed.

@workflows.workflow.query(name="get_status", description="Get current status")
def get_status(self) -> WorkflowStatus:
    return WorkflowStatus(
        progress=self.progress,
        status=self.current_status
    )

Learn more about queries

Updates

An update is like a signal, but the caller waits for the workflow to process the message and reply. Use updates when the caller needs confirmation or a computed result — for example, adjusting a workflow's configuration mid-run and getting the new state back, or submitting a value and receiving the validated, persisted version.

@workflows.workflow.update(name="update_config", description="Update workflow configuration")
async def update_config(self, config: UpdateConfig) -> UpdateResult:
    # Handle update
    return UpdateResult(success=True)

Learn more about updates

Schedules

Define scheduled workflow executions using cron expressions:

from mistralai.workflows.models import ScheduleDefinition

schedule = ScheduleDefinition(
    input={"report_type": "daily", "include_details": False},  # Input parameters for scheduled runs
    cron_expressions=["0 0 * * *"]  # Run daily at midnight
)

@workflows.workflow.define(name="report_workflow", schedules=[schedule])
class ReportWorkflow:
    @workflows.workflow.entrypoint
    async def run(self, report_type: str = "daily", include_details: bool = False) -> None:
        # Generate report
        pass

Key schedule features:

  • Cron-based scheduling with standard cron expressions
  • Input parameters for scheduled executions
  • Specify multiple cron expressions

Learn more about scheduling

Child workflows

Run other workflows as children:

from datetime import timedelta
from mistralai.workflows import workflow

result = await workflow.execute_workflow(
    ChildWorkflow,
    params=child_params,
    execution_timeout=timedelta(hours=1)
)

You can also start a child workflow without waiting for its result (fire-and-forget) by passing wait=False:

handle = await workflow.execute_workflow(
    ChildWorkflow,
    params=child_params,
    execution_timeout=timedelta(hours=1),
    wait=False,
)
# Parent continues immediately; child runs independently
# Optionally await later: result = await handle

By default, wait=False sets the parent close policy to ABANDON, so the child continues running even if the parent completes. You can override this with the parent_close_policy parameter:

from mistralai.workflows import ParentClosePolicy

handle = await workflow.execute_workflow(
    ChildWorkflow,
    params=child_params,
    execution_timeout=timedelta(hours=1),
    wait=False,
    parent_close_policy=ParentClosePolicy.TERMINATE,
)

Waiting for conditions

Pause the workflow until a condition becomes true (typically set by an incoming signal or update). The workflow sleeps without consuming resources. If the condition is not met within the timeout, a TimeoutError is raised.

from datetime import timedelta

from mistralai.workflows import workflow

await workflow.wait_condition(
    lambda: self.ready,
    timeout=timedelta(minutes=5)
)
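
The semantics are analogous to waiting on a flag in plain asyncio. This sketch (ordinary asyncio, not the durable SDK primitive — the Waiter class is invented for illustration) shows the wait-then-timeout shape:

```python
import asyncio

class Waiter:
    """Illustrative stand-in for workflow state flipped by a signal handler."""
    def __init__(self):
        self.ready = False
        self._event = asyncio.Event()

    def mark_ready(self):
        self.ready = True
        self._event.set()

async def main():
    w = Waiter()
    # An external event flips the flag shortly after we start waiting.
    asyncio.get_running_loop().call_later(0.01, w.mark_ready)
    # Raises TimeoutError if the flag is not set within the timeout.
    await asyncio.wait_for(w._event.wait(), timeout=5)
    return w.ready

result = asyncio.run(main())
```

The key difference: wait_condition is durable — the workflow can sleep for days across worker restarts, whereas a plain asyncio wait lives and dies with the process.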

Continue-as-new

Reset a workflow's history while keeping it logically alive — useful for indefinitely iterating workflows or runs that approach the 50K-event history limit.

For the full pattern (state parameters, when to trigger, determinism implications), see Continue-as-new.

Advanced features

Reset mechanism

You can "reset" a workflow execution to restart it from a specific point in its history. This is useful if your workflow gets stuck (for example, because of a non-deterministic error) or cannot complete. When you reset, the workflow ends its current run and starts again from the event you pick in the event history.

All workflow events up to the reset point are copied to the new execution. The workflow then continues running from there using the current version of your code. Any progress made after the reset event is lost.

Reset only after fixing the underlying problem. We recommend providing a reason for the reset, which is recorded in the workflow's event history.

from mistralai.client import Mistral

client = Mistral(api_key="your_api_key")

client.workflows.executions.reset_workflow(
    execution_id="your-execution-id",
    event_id=42,  # Must be a WORKFLOW_TASK_COMPLETED event
    reason="Bug fixed in activity logic",
    exclude_signals=True,  # Optional: skip replaying signals after this point
    exclude_updates=True   # Optional: skip replaying updates after this point
)

Workflow execution context

Retrieve the current execution's unique identifier at runtime. This is useful for logging, correlating events across services, or passing the execution ID to external systems so they can send signals back.

import mistralai.workflows as workflows

execution_id = workflows.get_execution_id()