Workflows
This page covers the practical patterns for building workflows. For the underlying model (durability, replay, history, determinism), see Core Concepts > Workflows.
Workflow versus activity
When sketching the orchestration of your application, the first decision is what belongs in a workflow versus an activity.
A workflow is the orchestration brain. It coordinates steps, makes decisions, holds state across long durations (seconds to years), and waits for external events. Workflow code must be deterministic so the platform can replay it from history when a worker restarts.
An activity is the unit of real work: anything that touches the outside world. LLM calls, HTTP requests, database writes, file reads, and tool invocations all live in activities. Activities don't need to be deterministic. They typically run for seconds to minutes and are retried automatically with configurable backoff when they fail.
For example, a workflow that processes incoming invoices might call a fetch_invoice_pdf activity (HTTP download), then an extract_fields activity (LLM call with structured output), branch on the extracted total to decide whether human approval is needed, wait for an approve signal, and finally call a post_to_accounting_system activity. The order, the branching, and the wait sit in the workflow; each side effect lives in an activity.
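The split described above can be sketched in plain asyncio, without the SDK. Everything here is an assumption made for illustration: the stub bodies, the `APPROVAL_THRESHOLD` value, and the use of an `asyncio.Event` in place of the approve signal. The point is only that the workflow-shaped function holds ordering, branching, and waiting, while each side effect sits in its own function.

```python
import asyncio

APPROVAL_THRESHOLD = 1000.0  # assumed cutoff for human approval (illustrative)


# Hypothetical stand-ins for activities. Real ones would perform the side
# effect (HTTP download, LLM call, accounting API write).
async def fetch_invoice_pdf(url: str) -> bytes:
    return b"%PDF-stub"


async def extract_fields(pdf: bytes) -> dict:
    return {"vendor": "ACME", "total": 1250.0}


async def post_to_accounting_system(fields: dict) -> str:
    return f"posted:{fields['vendor']}:{fields['total']}"


async def process_invoice(url: str, approved: asyncio.Event) -> str:
    """Workflow-shaped logic: ordering, branching, and waiting only."""
    pdf = await fetch_invoice_pdf(url)
    fields = await extract_fields(pdf)
    if fields["total"] > APPROVAL_THRESHOLD:
        # Stands in for waiting on the approve signal.
        await approved.wait()
    return await post_to_accounting_system(fields)


async def main() -> str:
    approved = asyncio.Event()
    task = asyncio.create_task(
        process_invoice("https://example.com/invoice.pdf", approved)
    )
    await asyncio.sleep(0)  # let the workflow run until it blocks on approval
    approved.set()          # simulate the reviewer approving
    return await task
```

Calling `asyncio.run(main())` drives the flow end to end. Unlike a real workflow, this sketch has no durability or replay; it only shows where each responsibility lives.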
For the activity counterpart of this page, see Activities.
Defining a workflow
To define a workflow, use the @workflow.define decorator on a class and mark the entry function with @workflow.entrypoint:
import mistralai.workflows as workflows

@workflows.workflow.define(name="report_workflow")
class ReportWorkflow:
    @workflows.workflow.entrypoint
    async def run(self, report_type: str, include_details: bool = False) -> dict:
        """Workflow implementation."""
        # Orchestrate activities here
        ...

Workflow input
The run method accepts any JSON-serializable type — Pydantic models, plain dicts, or primitives all work. The platform validates inputs and generates Console UI schemas in every case. For the conceptual overview, see Core Concepts > Workflow input.
Primitive and multi-parameter inputs
from mistralai.workflows import workflow

@workflow.define(name="report_workflow")
class ReportWorkflow:
    @workflow.entrypoint
    async def run(self, report_type: str, include_details: bool = False) -> dict:
        ...

Input:
{"report_type": "daily", "include_details": false}

Single Pydantic model
When the entrypoint takes a single Pydantic BaseModel, its fields become the top-level input keys; there is no wrapper object:
from pydantic import BaseModel

class ReportParams(BaseModel):
    report_type: str
    include_details: bool = False

@workflow.define(name="report_workflow")
class ReportWorkflow:
    @workflow.entrypoint
    async def run(self, params: ReportParams) -> dict:
        ...

Input:
{"report_type": "daily", "include_details": true}

Union of Pydantic models
When a workflow can be triggered with different input shapes, use a union of BaseModel subclasses:
from pydantic import BaseModel, ConfigDict

class PromptInput(BaseModel):
    model_config = ConfigDict(extra="forbid")
    prompt: str

class CountInput(BaseModel):
    model_config = ConfigDict(extra="forbid")
    count: int

@workflow.define(name="flexible_workflow")
class FlexibleWorkflow:
    @workflow.entrypoint
    async def run(self, params: PromptInput | CountInput) -> str:
        if isinstance(params, PromptInput):
            return f"prompt: {params.prompt}"
        return f"count: {params.count}"

The SDK validates the input against each member in order and passes the first match to the handler:
{"prompt": "hello"} → PromptInput
{"count": 42} → CountInput
{"count": 42, "extra": "field"} → ValidationError

Use extra="forbid" on each member model. It produces precise discrimination and clear error messages when the input doesn't match any expected shape.
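To see why forbidding extra keys sharpens discrimination, here is a minimal stdlib-only sketch of "try each member in order, take the first match". It is not how the SDK or Pydantic implements validation: the `match_first` helper and its `forbid_extra` flag are hypothetical, plain dataclasses replace the Pydantic models, and only key names are compared (no type checking).

```python
from dataclasses import dataclass, fields


@dataclass
class PromptInput:
    prompt: str


@dataclass
class CountInput:
    count: int


MEMBERS = (PromptInput, CountInput)


def match_first(payload: dict, members=MEMBERS, forbid_extra: bool = True):
    """Return an instance of the first member whose fields fit the payload."""
    keys = set(payload)
    for member in members:
        expected = {f.name for f in fields(member)}
        # Strict mode: keys must match exactly. Lenient mode: extras are ignored.
        fits = keys == expected if forbid_extra else expected <= keys
        if fits:
            return member(**{name: payload[name] for name in expected})
    names = [m.__name__ for m in members]
    raise ValueError(f"input matches none of {names}: {payload!r}")
```

In strict mode, `{"count": 42, "extra": "field"}` is rejected with an error naming the candidates. With `forbid_extra=False`, the same payload silently becomes a `CountInput` and the stray key is dropped, which is the ambiguity the recommendation above avoids.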
Optional union (| None)
Append | None (or use Optional from typing) to make the input optional:
@workflow.define(name="optional_workflow")
class OptionalWorkflow:
    @workflow.entrypoint
    async def run(self, params: PromptInput | CountInput | None) -> str:
        if params is None:
            return "no input"
        if isinstance(params, PromptInput):
            return f"prompt: {params.prompt}"
        return f"count: {params.count}"

Mixing BaseModel with primitive types
Unions that combine a BaseModel with a non-model type (such as str or int) are not supported and fail when the @workflow.entrypoint decorator is applied:
# ❌ Raises an error at class definition time
@workflow.define(name="my_workflow")
class MyWorkflow:
    @workflow.entrypoint
    async def run(self, params: PromptInput | str) -> str:
        ...

Parameter 'params' of 'run' has unsupported union members: str.
Union inputs only support Pydantic BaseModel subclasses and None.
Use a dedicated BaseModel instead.

If you genuinely need a union that includes a non-model type, define a named RootModel subclass and use that instead:
from pydantic import BaseModel, RootModel

class PromptInput(BaseModel):
    prompt: str

class PromptOrRaw(RootModel[PromptInput | str]):
    pass

@workflow.define(name="my_workflow")
class MyWorkflow:
    @workflow.entrypoint
    async def run(self, params: PromptOrRaw) -> str:
        if isinstance(params.root, PromptInput):
            return f"structured: {params.root.prompt}"
        return f"raw: {params.root}"

The named subclass also produces a cleaner generated JSON schema.
Execution timeout
For the role and default of execution_timeout, see Core Concepts > Execution timeout. To override the 1-hour default, pass a timedelta to the decorator:
from datetime import timedelta
from mistralai.workflows import workflow

@workflow.define(name="long_running_workflow", execution_timeout=timedelta(days=7))
class LongRunningWorkflow:
    @workflow.entrypoint
    async def run(self, params: MyParams) -> MyResult:
        ...

A workflow still running after execution_timeout is canceled with a WORKFLOW_EXECUTION_TIMED_OUT error.
execution_timeout is a total wall-clock limit, not an activity timeout. Individual activity timeouts are controlled by start_to_close_timeout or schedule_to_close_timeout on each activity call.
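The difference between a total limit and a per-step limit can be illustrated with plain asyncio. This is a sketch under assumptions, not the platform's mechanism: `StepTimeout`, `run_step`, and `run_workflow` are hypothetical names, `asyncio.sleep` stands in for activity work, and the returned string only mirrors the error name mentioned above.

```python
import asyncio


class StepTimeout(Exception):
    """Raised when a single step exceeds its own limit (hypothetical name)."""


async def activity(duration: float) -> str:
    await asyncio.sleep(duration)  # stands in for real work
    return "done"


async def run_step(duration: float, step_timeout: float) -> str:
    # Per-step limit: plays the role of an activity-level timeout.
    try:
        return await asyncio.wait_for(activity(duration), timeout=step_timeout)
    except asyncio.TimeoutError:
        raise StepTimeout(f"step exceeded {step_timeout}s") from None


async def run_steps(durations, step_timeout: float) -> list:
    return [await run_step(d, step_timeout) for d in durations]


async def run_workflow(durations, step_timeout: float, total_timeout: float):
    # Total limit: covers the whole run, however many steps it contains.
    try:
        return await asyncio.wait_for(
            run_steps(durations, step_timeout), timeout=total_timeout
        )
    except asyncio.TimeoutError:
        return "WORKFLOW_EXECUTION_TIMED_OUT"
```

A run can hit the total limit even when every individual step finishes well inside its own limit, for example three 50 ms steps under an 80 ms total. The reverse also holds: a generous total limit does not protect a single slow step from its per-step limit.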
Core workflow features
Signals
A signal is a fire-and-forget message delivered to a running workflow. The caller doesn't wait for a response — the workflow handles the signal asynchronously and updates its own state. Use signals for human approvals, external system callbacks, or any event that should nudge a workflow without expecting a return value.
For example, an invoice-processing workflow can pause until a reviewer signals approval:
@workflows.workflow.signal(name="approve", description="Approval signal")
async def approve_signal(self, data: ApprovalData) -> None:
    self.approved = True

Queries
A query reads workflow state without modifying it. Use queries to expose progress, current status, or computed values to external clients — for example, a dashboard polling for the percentage of items processed.
@workflows.workflow.query(name="get_status", description="Get current status")
def get_status(self) -> WorkflowStatus:
    return WorkflowStatus(
        progress=self.progress,
        status=self.current_status,
    )

Updates
An update is like a signal, but the caller waits for the workflow to process the message and reply. Use updates when the caller needs confirmation or a computed result — for example, adjusting a workflow's configuration mid-run and getting the new state back, or submitting a value and receiving the validated, persisted version.
@workflows.workflow.update(name="update_config", description="Update workflow configuration")
async def update_config(self, config: UpdateConfig) -> UpdateResult:
    # Handle update
    return UpdateResult(success=True)

Schedules
Define scheduled workflow executions using cron expressions:
import mistralai.workflows as workflows
from mistralai.workflows.models import ScheduleDefinition

schedule = ScheduleDefinition(
    input={"report_type": "daily", "include_details": False},  # Input parameters for scheduled runs
    cron_expressions=["0 0 * * *"],  # Run daily at midnight
)

@workflows.workflow.define(name="report_workflow", schedules=[schedule])
class ReportWorkflow:
    @workflows.workflow.entrypoint
    async def run(self, report_type: str = "daily", include_details: bool = False) -> None:
        # Generate report
        pass

Key schedule features:
- Cron-based scheduling with standard cron expressions
- Input parameters for scheduled executions
- Multiple cron expressions per schedule
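For readers less familiar with cron syntax, the helper below labels the fields of an expression such as "0 0 * * *". It assumes the conventional five-field layout (minute, hour, day of month, month, day of week), which matches the example above. `describe_cron` is a hypothetical helper, not part of the SDK, and it does not validate field values.

```python
CRON_FIELDS = ("minute", "hour", "day_of_month", "month", "day_of_week")


def describe_cron(expression: str) -> dict:
    """Map each field of a five-field cron expression to its name."""
    parts = expression.split()
    if len(parts) != len(CRON_FIELDS):
        raise ValueError(
            f"expected {len(CRON_FIELDS)} fields, got {len(parts)}: {expression!r}"
        )
    return dict(zip(CRON_FIELDS, parts))


# Two expressions that could sit in one cron_expressions list:
# every day at midnight, and noon on weekdays.
EXAMPLES = ["0 0 * * *", "0 12 * * 1-5"]
```

Running `describe_cron` over each entry in `EXAMPLES` is a quick way to check which field a value landed in before attaching the expressions to a schedule.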
Child workflows
Run other workflows as children:
from datetime import timedelta
from mistralai.workflows import workflow

result = await workflow.execute_workflow(
    ChildWorkflow,
    params=child_params,
    execution_timeout=timedelta(hours=1),
)

You can also start a child workflow without waiting for its result (fire-and-forget) by passing wait=False:
handle = await workflow.execute_workflow(
    ChildWorkflow,
    params=child_params,
    execution_timeout=timedelta(hours=1),
    wait=False,
)
# Parent continues immediately; child runs independently
# Optionally await later: result = await handle

By default, wait=False sets the parent close policy to ABANDON, so the child continues running even if the parent completes. You can override this with the parent_close_policy parameter:
from mistralai.workflows import ParentClosePolicy

handle = await workflow.execute_workflow(
    ChildWorkflow,
    params=child_params,
    execution_timeout=timedelta(hours=1),
    wait=False,
    parent_close_policy=ParentClosePolicy.TERMINATE,
)

Waiting for conditions
Pause the workflow until a condition becomes true (typically set by an incoming signal or update). The workflow sleeps without consuming resources. If the condition is not met within the timeout, a TimeoutError is raised.
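The mechanics can be illustrated without the SDK. The sketch below uses `asyncio.Condition` to wait on a predicate with a timeout; `WorkflowState`, `mark_ready`, and `demo` are hypothetical names, and the SDK's own implementation may work quite differently.

```python
import asyncio
from typing import Callable, Optional


class WorkflowState:
    """Holds a flag plus a condition used to wake waiters when it changes."""

    def __init__(self) -> None:
        self.ready = False
        self._changed = asyncio.Condition()

    async def mark_ready(self) -> None:
        # Plays the role of a signal handler mutating workflow state.
        async with self._changed:
            self.ready = True
            self._changed.notify_all()

    async def wait_condition(self, predicate: Callable[[], bool], timeout: float) -> None:
        # Sleeps until the predicate holds; raises a timeout error otherwise.
        async with self._changed:
            await asyncio.wait_for(self._changed.wait_for(predicate), timeout)


async def demo(signal_after: Optional[float], timeout: float) -> str:
    state = WorkflowState()

    async def send_signal() -> None:
        await asyncio.sleep(signal_after)
        await state.mark_ready()

    sender = asyncio.create_task(send_signal()) if signal_after is not None else None
    try:
        await state.wait_condition(lambda: state.ready, timeout)
        return "ready"
    except asyncio.TimeoutError:
        return "timed out"
    finally:
        if sender is not None:
            sender.cancel()
```

`demo(0.01, 1.0)` models a signal arriving in time, and `demo(None, 0.05)` models no signal at all. With the SDK, the equivalent wait is a single call: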
from datetime import timedelta
from mistralai.workflows import workflow

await workflow.wait_condition(
    lambda: self.ready,
    timeout=timedelta(minutes=5),
)

Continue-as-new
Reset a workflow's history while keeping it logically alive. This is useful for workflows that loop indefinitely or for runs that approach the 50K-event history limit.
For the full pattern (state parameters, when to trigger, determinism implications), see Continue-as-new.
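As a rough mental model only, the simulation below shows the shape of the pattern: each run works until its history budget is spent, then hands its state (here a cursor) to a fresh run with an empty history. It does not use the SDK; `HISTORY_LIMIT`, `run_once`, and `run_to_completion` are hypothetical, and the limit is shrunk to 5 so the behavior is visible.

```python
HISTORY_LIMIT = 5  # tiny stand-in for the platform's 50K-event limit


def run_once(cursor: int, total: int) -> tuple:
    """Process items from `cursor` until done or until the budget is spent.

    Returns (next_cursor, events_recorded_in_this_run).
    """
    events = 0
    while cursor < total and events < HISTORY_LIMIT:
        cursor += 1   # one unit of work
        events += 1   # each unit adds one event to this run's history
    return cursor, events


def run_to_completion(total: int) -> list:
    """Chain runs, carrying only the cursor across each restart."""
    cursor = 0
    events_per_run = []
    while True:
        cursor, events = run_once(cursor, total)
        events_per_run.append(events)
        if cursor >= total:
            return events_per_run
        # Otherwise "continue as new": next iteration starts with empty history.
```

The design point is that only the minimal state needed to resume (the cursor) crosses the boundary, so no single run's history grows past the limit.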
Advanced features
Reset mechanism
You can "reset" a workflow execution to restart it from a specific point in its history. This is useful if your workflow gets stuck (for example, because of a non-deterministic error) or cannot complete. When you reset, the workflow ends its current run and starts again from the event you pick in the event history.
All workflow events up to the reset point are copied to the new execution. The workflow then continues running from there using the current version of your code. Any progress made after the reset event is lost.
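The copy-and-truncate behavior can be pictured with a small simulation. This is a sketch under assumptions, not the platform's implementation: `reset_history` is a hypothetical helper, the event dictionaries and every type name other than WORKFLOW_TASK_COMPLETED are invented for illustration, and treating the reset event itself as included in the copy is also an assumption.

```python
RESETTABLE_TYPE = "WORKFLOW_TASK_COMPLETED"

# Illustrative history; only WORKFLOW_TASK_COMPLETED is a name from this page.
HISTORY = [
    {"id": 1, "type": "WORKFLOW_STARTED"},
    {"id": 2, "type": "WORKFLOW_TASK_COMPLETED"},
    {"id": 3, "type": "ACTIVITY_COMPLETED"},
    {"id": 4, "type": "WORKFLOW_TASK_COMPLETED"},
    {"id": 5, "type": "ACTIVITY_FAILED"},
]


def reset_history(history: list, event_id: int) -> list:
    """Return a new history containing events up to the reset point."""
    by_id = {event["id"]: event for event in history}
    target = by_id.get(event_id)
    if target is None:
        raise ValueError(f"no event with id {event_id}")
    if target["type"] != RESETTABLE_TYPE:
        raise ValueError(f"event {event_id} is not a {RESETTABLE_TYPE} event")
    # Copy the prefix; everything after the reset point is discarded.
    return [dict(event) for event in history if event["id"] <= event_id]
```

Resetting `HISTORY` to event 2 keeps events 1 and 2 and discards the rest, so the activity results recorded in events 3 and 5 would be produced again by the new run.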
Reset only after fixing the underlying problem. We recommend providing a reason for the reset, which is recorded in the workflow's event history.
from mistralai.client import Mistral

client = Mistral(api_key="your_api_key")
client.workflows.executions.reset_workflow(
    execution_id="your-execution-id",
    event_id=42,  # Must be a WORKFLOW_TASK_COMPLETED event
    reason="Bug fixed in activity logic",
    exclude_signals=True,  # Optional: skip replaying signals after this point
    exclude_updates=True,  # Optional: skip replaying updates after this point
)

Workflow execution context
Retrieve the current execution's unique identifier at runtime. This is useful for logging, correlating events across services, or passing the execution ID to external systems so they can send signals back.
import mistralai.workflows as workflows
execution_id = workflows.get_execution_id()