Workflows: The Brains of Your Application
Workflows define the high-level business logic and coordinate activities in your application.
What is a Workflow?
A workflow is a durable, deterministic process that orchestrates activities and manages execution state. Key characteristics:
- Defined as Python classes
- Can run for seconds to years (with proper checkpointing)
- Maintains complete execution history
- Automatically recovers from failures
- Input/output limited to 2MB
- Time out if synchronous, CPU-bound Python code blocks for more than 2 seconds
Workflow Determinism
Workflows must be deterministic: given the same inputs, they must always produce the same sequence of commands. This is fundamental to the replay mechanism: when a worker restarts or a workflow is recovered, the system re-executes the workflow code from the beginning, matching each command against the recorded event history. If the code produces different commands on replay, the workflow fails with a non-determinism error.
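A toy model of replay (plain Python, not the SDK's implementation) illustrates why this matters: the commands recorded on the first run must match those produced when the code is re-executed.

```python
import random

def run_workflow(emit, rng):
    # Each emit() call stands in for a command the workflow issues.
    emit(("schedule_activity", "fetch_data"))
    emit(("start_timer", rng.random()))  # value drawn from a workflow-provided source

def execute(seed):
    history = []
    run_workflow(history.append, random.Random(seed))
    return history

# The first execution records the command history.
recorded = execute(seed=42)

# Replay re-runs the code from the beginning; with deterministic inputs
# the emitted commands match the recorded history exactly.
assert execute(seed=42) == recorded
# An unseeded random.random() call here would diverge on replay and be
# flagged as a non-determinism error.
```

This is why the SDK provides seeded, replay-safe substitutes such as workflow.random() and workflow.now(): they return the recorded value on replay instead of a fresh one.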
Determinism is enforced by default. Workflow code runs inside a sandbox that intercepts non-deterministic calls and raises errors at runtime. You can opt out per-workflow with enforce_determinism=False or worker-wide with DEFAULT_ENFORCE_DETERMINISM=0. See Determinism Enforcement below for details.
Correct: Use Deterministic APIs
Always use these in workflow code:
from mistralai.workflows import workflow
# Use workflow.now() for current time
current_time = workflow.now()
# Use workflow.uuid4() for UUIDs
request_id = workflow.uuid4()
# Use workflow.random() for random values
random_value = workflow.random()
Dangerous: Standard Library Equivalents
Don't use these directly in workflow code:
# Dangerous - use workflow.now() instead
from datetime import datetime
current_time = datetime.now()
# Dangerous - use workflow.uuid4() instead
import uuid
request_id = uuid.uuid4()
# Dangerous - use workflow.random() instead
import random
rand_val = random.random()
Also dangerous in workflows:
- File system access (open(), os.listdir(), etc.)
- Direct HTTP calls or database queries
- Modifying global variables
- System calls (os.environ, os.getcwd(), etc.)
Move Non-Deterministic Work to Activities
For operations like external API calls, database queries, or file I/O, use activities:
from datetime import datetime

from mistralai.workflows import workflow
import mistralai.workflows as workflows

@workflows.activity()
async def fetch_external_data(params: DataParams) -> ExternalData:
    # Safe - activities are not replayed
    response = await http_client.get(params.url)  # http_client: any async HTTP client
    timestamp = datetime.now()  # OK in activities
    return ExternalData(data=response.json(), fetched_at=timestamp)

@workflow.define(name="my_workflow")
class MyWorkflow:
    @workflow.entrypoint
    async def run(self, params: MyParams) -> MyResult:
        # Correct - non-deterministic work happens in the activity
        data = await fetch_external_data(params)
        return MyResult(data=data)
Determinism Enforcement (Sandbox)
When determinism enforcement is enabled, your workflow code runs inside a sandbox. The sandbox:
- Re-imports modules in an isolated environment so that side-effectful module-level code is contained
- Intercepts dangerous standard library calls such as datetime.now(), random.random(), uuid.uuid4(), open(), os.environ, and other non-deterministic operations
- Restricts the asyncio event loop to prevent spawning uncontrolled coroutines
If your workflow code attempts any of these operations, the sandbox raises an error at runtime rather than silently producing a non-determinism bug that surfaces only on replay.
If you disable the sandbox, determinism is your responsibility. Your code will still break on replay if it's non-deterministic, you just won't get an immediate error telling you so.
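The interception idea can be sketched with a context manager that swaps out a non-deterministic call (purely illustrative: NonDeterminismError and the monkeypatching approach are inventions of this sketch, not the SDK's actual mechanism).

```python
import datetime
from contextlib import contextmanager

class NonDeterminismError(RuntimeError):
    """Stand-in for a sandbox error type (illustrative, not SDK API)."""

@contextmanager
def block_datetime_now():
    # Replace datetime.datetime with a guarded subclass whose now() raises.
    original = datetime.datetime
    class GuardedDatetime(original):
        @classmethod
        def now(cls, tz=None):
            raise NonDeterminismError(
                "datetime.now() is non-deterministic; use workflow.now() instead"
            )
    datetime.datetime = GuardedDatetime
    try:
        yield
    finally:
        datetime.datetime = original  # always restore the real class

caught = False
with block_datetime_now():
    try:
        datetime.datetime.now()  # blocked inside the guarded context
    except NonDeterminismError:
        caught = True
assert caught
```

The real sandbox applies this kind of interception broadly (imports, file I/O, os.environ, the event loop), but the principle is the same: the dangerous call fails loudly at the call site instead of silently corrupting replay.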
Disabling Per-Workflow
Determinism enforcement is enabled by default. To opt out for a specific workflow, pass enforce_determinism=False to the @workflow.define decorator:
from mistralai.workflows import workflow

@workflow.define(name="my_workflow", enforce_determinism=False)
class MyWorkflow:
    @workflow.entrypoint
    async def run(self, input: str) -> str:
        # This code runs WITHOUT the sandbox.
        # You are responsible for ensuring determinism.
        return f"Processed: {input}"
Disabling Worker-Wide
Set the DEFAULT_ENFORCE_DETERMINISM environment variable to 0 to disable sandboxing by default for all workflows on a worker:
DEFAULT_ENFORCE_DETERMINISM=0
This sets config.worker.default_enforce_determinism to False.
Precedence
The decorator-level setting takes priority over the environment variable:
| @workflow.define(enforce_determinism=...) | DEFAULT_ENFORCE_DETERMINISM | Result |
|---|---|---|
| True | any | sandboxed |
| False | any | not sandboxed |
| not set (default) | 1 / True (default) | sandboxed |
| not set (default) | 0 / False | not sandboxed |
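The precedence rule amounts to a one-line resolver, sketched below (resolve_enforcement is an illustrative name, not an SDK function):

```python
from typing import Optional

def resolve_enforcement(decorator_value: Optional[bool], env_default: bool) -> bool:
    """Decorator-level setting wins; otherwise fall back to the worker default."""
    if decorator_value is not None:
        return decorator_value
    return env_default

# Mirrors the precedence table:
assert resolve_enforcement(True, False) is True    # decorator True -> sandboxed
assert resolve_enforcement(False, True) is False   # decorator False -> not sandboxed
assert resolve_enforcement(None, True) is True     # both defaults -> sandboxed
assert resolve_enforcement(None, False) is False   # env disabled -> not sandboxed
```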
Temporarily Bypassing the Sandbox
These escape hatches defeat the purpose of determinism enforcement. Use them only when you understand the implications and have no alternative.
When determinism enforcement is enabled, you may occasionally need to perform an operation that the sandbox blocks, for example importing a module that has non-deterministic side effects at import time, or performing a one-off read that you know is safe.
The SDK exposes two context managers under workflow.unsafe:
workflow.unsafe.imports_passed_through() — Allows imports inside the context to bypass the sandbox's module re-import mechanism.
Use this when a third-party library performs side effects at import time that conflict
with the sandbox.
from mistralai.workflows import workflow

@workflow.define(name="my_workflow")
class MyWorkflow:
    @workflow.entrypoint
    async def run(self, input: str) -> str:
        with workflow.unsafe.imports_passed_through():
            import some_problematic_library
        # Use the library normally after the import
        return some_problematic_library.process(input)
workflow.unsafe.skip_determinism_enforcement() — Temporarily disables all sandbox restrictions within the context.
Code inside this block runs as if enforce_determinism=False.
from mistralai.workflows import workflow

@workflow.define(name="my_workflow")
class MyWorkflow:
    @workflow.entrypoint
    async def run(self, input: str) -> str:
        with workflow.unsafe.skip_determinism_enforcement():
            # Sandbox restrictions are lifted here.
            # You are responsible for ensuring determinism.
            import os
            value = os.environ.get("SOME_CONFIG", "default")
        return f"Processed: {input} with {value}"
Recommendations
- Keep enforcement enabled. Determinism enforcement is on by default. Catching non-determinism at development time is far cheaper than debugging replay failures in production. Only disable it temporarily while migrating legacy workflows.
- Migrate existing workflows incrementally. If you have workflows that are not yet compliant, set enforce_determinism=False on those specific workflows while you fix them. Avoid disabling enforcement worker-wide.
- Keep unsafe blocks small and documented. When you must bypass the sandbox, wrap the minimum amount of code and leave a comment explaining why.
- Move side effects to activities. The best way to avoid sandbox issues is to keep workflow code pure orchestration logic. All I/O, network calls, and non-deterministic operations belong in activities.
Defining a Workflow
Basic workflow structure:
import mistralai.workflows as workflows

@workflows.workflow.define(name="report_workflow")
class MyWorkflow:
    @workflows.workflow.entrypoint
    async def run(self, report_type: str, include_details: bool = False) -> dict:
        """Workflow implementation"""
        # Orchestrate activities here
        pass
Workflow Input
The run method accepts any JSON-serializable types as parameters.
Primitive and multi-parameter inputs
@workflow.define(name="report_workflow")
class ReportWorkflow:
    @workflow.entrypoint
    async def run(self, report_type: str, include_details: bool = False) -> dict:
        ...
Input:
{"report_type": "daily", "include_details": false}
Single Pydantic model
When the entrypoint takes a single Pydantic BaseModel, its fields become the top-level input keys — there is no wrapper object:
from pydantic import BaseModel

class ReportParams(BaseModel):
    report_type: str
    include_details: bool = False

@workflow.define(name="report_workflow")
class ReportWorkflow:
    @workflow.entrypoint
    async def run(self, params: ReportParams) -> dict:
        ...
Input:
{"report_type": "daily", "include_details": true}
Union of Pydantic models
When a workflow can be triggered with different input shapes, use a union of BaseModel subclasses:
from pydantic import BaseModel, ConfigDict

class PromptInput(BaseModel):
    model_config = ConfigDict(extra="forbid")
    prompt: str

class CountInput(BaseModel):
    model_config = ConfigDict(extra="forbid")
    count: int

@workflow.define(name="flexible_workflow")
class FlexibleWorkflow:
    @workflow.entrypoint
    async def run(self, params: PromptInput | CountInput) -> str:
        if isinstance(params, PromptInput):
            return f"prompt: {params.prompt}"
        return f"count: {params.count}"
The SDK validates the input against each member in order and passes the first match to the handler:
{"prompt": "hello"} → PromptInput
{"count": 42} → CountInput
{"count": 42, "extra": "field"} → ValidationError
Tip: Use extra="forbid" on each member model. It produces precise discrimination and clear error messages when the input doesn't match any expected shape.
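The first-match behavior can be mimicked with a standard-library sketch (dataclasses stand in for the Pydantic models; validate_union is illustrative and only checks key shape, not field types):

```python
from dataclasses import dataclass

@dataclass
class PromptInput:
    prompt: str

@dataclass
class CountInput:
    count: int

def validate_union(payload, members=(PromptInput, CountInput)):
    """Try each union member in order; a payload matches only if its keys are
    exactly the member's fields (the effect of extra='forbid'). First match wins."""
    for member in members:
        if set(payload) == set(member.__dataclass_fields__):
            return member(**payload)
    raise ValueError(f"input matches no union member: {payload}")

assert validate_union({"prompt": "hello"}) == PromptInput(prompt="hello")
assert validate_union({"count": 42}) == CountInput(count=42)
try:
    validate_union({"count": 42, "extra": "field"})
except ValueError:
    pass  # extra key rejected, mirroring extra="forbid"
```

Without the exact-key check, a payload like {"count": 42, "extra": "field"} could silently match a permissive member, which is exactly the ambiguity that extra="forbid" prevents.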
Optional union (| None)
Append | None (or use Optional from typing) to make the input optional:
@workflow.define(name="optional_workflow")
class OptionalWorkflow:
    @workflow.entrypoint
    async def run(self, params: PromptInput | CountInput | None) -> str:
        if params is None:
            return "no input"
        if isinstance(params, PromptInput):
            return f"prompt: {params.prompt}"
        return f"count: {params.count}"
Mixing BaseModel with primitive types
Unions that combine a BaseModel with a non-model type (str, int, etc.) are not supported and will fail when the @workflow.entrypoint decorator is applied:
# ❌ Raises an error at class definition time
@workflow.define(name="my_workflow")
class MyWorkflow:
    @workflow.entrypoint
    async def run(self, params: PromptInput | str) -> str:
        ...
Parameter 'params' of 'run' has unsupported union members: str.
Union inputs only support Pydantic BaseModel subclasses and None.
Use a dedicated BaseModel instead.
If you genuinely need a union that includes a non-model type, define a named RootModel subclass and use that instead:
from pydantic import BaseModel, RootModel

class PromptInput(BaseModel):
    prompt: str

class PromptOrRaw(RootModel[PromptInput | str]):
    pass

@workflow.define(name="my_workflow")
class MyWorkflow:
    @workflow.entrypoint
    async def run(self, params: PromptOrRaw) -> str:
        if isinstance(params.root, PromptInput):
            return f"structured: {params.root.prompt}"
        return f"raw: {params.root}"
The named subclass also produces a cleaner generated JSON schema.
Execution Timeout
Every workflow has a maximum total execution time — execution_timeout — that caps its lifetime including all retries and continue-as-new iterations.
Set it on the @workflow.define decorator:
from datetime import timedelta
from mistralai.workflows import workflow

@workflow.define(name="my_workflow", execution_timeout=timedelta(hours=4))
class MyWorkflow:
    @workflow.entrypoint
    async def run(self, params: MyParams) -> MyResult:
        ...
When the workflow is started via the API, the platform uses this value as the hard cap. A workflow that is still running (or waiting) after execution_timeout has elapsed is cancelled with a WORKFLOW_EXECUTION_TIMED_OUT error.
Default: 1 hour. Workflows that need to run longer must opt in explicitly:
@workflow.define(name="long_running_workflow", execution_timeout=timedelta(days=7))
class LongRunningWorkflow:
    ...
execution_timeout is a total wall-clock limit, not an activity timeout. Individual activity timeouts are controlled by start_to_close_timeout / schedule_to_close_timeout on each activity call.
Core Workflow Features
1. Signals
Handle external events with signals:
@workflows.workflow.signal(name="approve", description="Approval signal")
async def approve_signal(self, data: ApprovalData) -> None:
    self.approved = True
2. Queries
Expose workflow state through queries:
@workflows.workflow.query(name="get_status", description="Get current status")
def get_status(self) -> WorkflowStatus:
    return WorkflowStatus(
        progress=self.progress,
        status=self.current_status
    )
3. Updates
Handle workflow updates:
@workflows.workflow.update(name="update_config", description="Update workflow configuration")
async def update_config(self, config: UpdateConfig) -> UpdateResult:
    # Handle update
    return UpdateResult(success=True)
4. Schedules
Define scheduled workflow executions using cron expressions:
from mistralai.workflows.models import ScheduleDefinition

schedule = ScheduleDefinition(
    input={"report_type": "daily", "include_details": False},  # Input parameters for scheduled runs
    cron_expressions=["0 0 * * *"]  # Run daily at midnight
)

@workflows.workflow.define(name="report_workflow", schedules=[schedule])
class ReportWorkflow:
    @workflows.workflow.entrypoint
    async def run(self, report_type: str = "daily", include_details: bool = False) -> None:
        # Generate report
        pass
Key schedule features:
- Cron-based scheduling with standard cron expressions
- Input parameters for scheduled executions
- Multiple cron expressions can be specified
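As a sanity check on what "0 0 * * *" (minute 0, hour 0, every day) means, here is a toy next-fire-time calculation in plain Python; this is an illustration of the cron expression's semantics, not the platform's scheduler:

```python
from datetime import datetime, timedelta

def next_daily_midnight(after: datetime) -> datetime:
    """Next fire time strictly after `after` for the cron expression '0 0 * * *'."""
    midnight = after.replace(hour=0, minute=0, second=0, microsecond=0)
    if midnight <= after:
        midnight += timedelta(days=1)  # today's midnight has passed; fire tomorrow
    return midnight

# Mid-afternoon on May 1st: next run is midnight on May 2nd.
assert next_daily_midnight(datetime(2024, 5, 1, 15, 30)) == datetime(2024, 5, 2)
# Exactly at midnight: the next run (strictly after) is the following midnight.
assert next_daily_midnight(datetime(2024, 5, 1, 0, 0)) == datetime(2024, 5, 2)
```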
5. Child Workflows
Execute other workflows as children:
from datetime import timedelta
from mistralai.workflows import workflow

result = await workflow.execute_workflow(
    ChildWorkflow,
    params=child_params,
    execution_timeout=timedelta(hours=1)
)
You can also start a child workflow without waiting for its result (fire-and-forget) by passing wait=False:
handle = await workflow.execute_workflow(
    ChildWorkflow,
    params=child_params,
    execution_timeout=timedelta(hours=1),
    wait=False,
)
# Parent continues immediately — child runs independently
# Optionally await later: result = await handle
By default, wait=False sets the parent close policy to ABANDON, so the child continues running even if the parent completes. You can override this with the parent_close_policy parameter:
from mistralai.workflows import ParentClosePolicy

handle = await workflow.execute_workflow(
    ChildWorkflow,
    params=child_params,
    execution_timeout=timedelta(hours=1),
    wait=False,
    parent_close_policy=ParentClosePolicy.TERMINATE,
)
6. Waiting for Conditions
Pause the workflow until a condition becomes true — typically set by an incoming signal or update. The workflow sleeps without consuming resources. If the condition is not met within the timeout, a TimeoutError is raised.
from datetime import timedelta
from mistralai.workflows import workflow

await workflow.wait_condition(
    lambda: self.ready,
    timeout=timedelta(minutes=5)
)
7. Continue-As-New
Reset workflow history for long-running or iterative workflows. Use this to prevent history from growing too large when processing large datasets or running indefinitely.
Important: Your workflow's run method must accept a state parameter to restore its state when continuing as new. This parameter will receive the state passed to continue_as_new().
import mistralai.workflows as workflows

@workflows.workflow.define(name="long-running-processor")
class LongRunningProcessor:
    @workflows.workflow.entrypoint
    async def run(self, page: int = 0, total_processed: int = 0):  # State parameters are required
        # Process current page
        # Check if history is getting large
        if workflows.workflow.should_continue_as_new():
            # Continue with fresh history
            workflows.workflow.continue_as_new({"page": page + 1, "total_processed": total_processed + 100})
        return await self.run(page + 1, total_processed + 100)
When to use continue-as-new:
- Long-running workflows that iterate indefinitely
- When workflow history approaches size limits (~50K events)
- Workflows that need to run for weeks/months
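The mechanics can be modeled with a toy loop in plain Python (no SDK involved): the history counter resets while the state dict is carried forward, which is what continue_as_new achieves for a real workflow. The numbers (100 items per page, a limit of 5 events) are arbitrary.

```python
def process_all_pages(total_pages: int, history_limit: int = 5):
    """Toy model of continue-as-new: when the event count nears the limit,
    carry state forward and restart with an empty history."""
    state = {"page": 0, "total_processed": 0}
    history_len = 0
    runs = 1  # number of distinct workflow runs in the chain
    while state["page"] < total_pages:
        history_len += 1  # processing a page appends events to history
        state["total_processed"] += 100
        state["page"] += 1
        if history_len >= history_limit:  # plays the role of should_continue_as_new()
            history_len = 0               # continue_as_new(state): fresh history
            runs += 1
    return state, runs

state, runs = process_all_pages(12)
assert state == {"page": 12, "total_processed": 1200}
assert runs == 3  # history was reset twice, after pages 5 and 10
```

The key property to notice: the accumulated state survives every reset, while the history size stays bounded no matter how many pages are processed.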
Advanced Features
1. Reset Mechanism
You can "reset" a Workflow Execution to restart it from a specific point in its history. This is useful if your workflow gets stuck (for example, because of a non-deterministic error) or cannot complete. When you reset, the workflow ends its current run and starts again from the event you pick in the event history.
All workflow events up to the reset point are copied to the new execution. The workflow then continues running from there using the latest version of your code. Any progress made after the reset event will be lost.
Resetting should only be used after fixing the underlying problem. It's best practice to provide a reason for the reset, which is recorded in the workflow's event history.
from mistralai.client import Mistral

client = Mistral(api_key="your_api_key")
client.workflows.executions.reset_workflow(
    execution_id="your-execution-id",
    event_id=42,  # Must be a WORKFLOW_TASK_COMPLETED event
    reason="Bug fixed in activity logic",
    exclude_signals=True,  # Optional: skip replaying signals after this point
    exclude_updates=True   # Optional: skip replaying updates after this point
)
2. Workflow Execution Context
Retrieve the current execution's unique identifier at runtime. This is useful for logging, correlating events across services, or passing the execution ID to external systems so they can send signals back.
import mistralai.workflows as workflows

execution_id = workflows.get_execution_id()
Workflow vs Activity
| Feature | Workflow | Activity |
|---|---|---|
| Duration | Seconds to years | Seconds to minutes |
| State | Stateful | Stateless |
| Retries | Manual recovery | Automatic |
| Parallelism | Complex coordination | Single operation |