Workflows: The Brains of Your Application

Workflows define the high-level business logic and coordinate activities in your application.

What is a Workflow?

A workflow is a durable, deterministic process that orchestrates activities and manages execution state. Key characteristics:

  • Defined as Python classes
  • Can run for seconds to years (with proper checkpointing)
  • Maintains complete execution history
  • Automatically recovers from failures
  • Input/output limited to 2MB
  • Times out if synchronous, CPU-bound Python code runs for more than 2 seconds

Workflow Determinism

Workflows must be deterministic: given the same inputs, they must always produce the same sequence of commands. This is fundamental to the replay mechanism: when a worker restarts or a workflow is recovered, the system re-executes the workflow code from the beginning, matching each command against the recorded event history. If the code produces different commands on replay, the workflow fails with a non-determinism error.
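The replay check described above can be pictured with a small, self-contained simulation. This is only a sketch: replay, NonDeterminismError, and the tuple-shaped commands are hypothetical stand-ins chosen for illustration, not the SDK's internals.

```python
class NonDeterminismError(Exception):
    """Raised when replayed commands diverge from the recorded history."""


def replay(workflow_fn, inputs, history):
    """Re-run workflow_fn from the beginning and compare commands with history."""
    commands = list(workflow_fn(inputs))
    # Compare position by position, up to the shorter of the two sequences.
    for position, (recorded, produced) in enumerate(zip(history, commands)):
        if recorded != produced:
            raise NonDeterminismError(
                f"command {position}: history has {recorded!r}, code produced {produced!r}"
            )
    return commands


def stable_workflow(inputs):
    # The same inputs always yield the same sequence of commands.
    yield ("schedule_activity", "fetch", inputs["url"])
    yield ("schedule_activity", "store", inputs["url"])


def make_flaky_workflow():
    # Simulates hidden non-determinism: behaviour changes between runs.
    runs = {"count": 0}

    def flaky_workflow(inputs):
        runs["count"] += 1
        step = "fetch" if runs["count"] == 1 else "fetch_v2"
        yield ("schedule_activity", step, inputs["url"])

    return flaky_workflow
```

Replaying stable_workflow against its own history succeeds, while the workflow returned by make_flaky_workflow raises NonDeterminismError on its second run because the command at position 0 no longer matches.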

note

Determinism is enforced by default. Workflow code runs inside a sandbox that intercepts non-deterministic calls and raises errors at runtime. You can opt out per-workflow with enforce_determinism=False or worker-wide with DEFAULT_ENFORCE_DETERMINISM=0. See Determinism Enforcement below for details.

Correct: Use Deterministic APIs

Always use these in workflow code:

from mistralai.workflows import workflow

# Use workflow.now() for current time
current_time = workflow.now()

# Use workflow.uuid4() for UUIDs
request_id = workflow.uuid4()

# Use workflow.random() for random values
random_value = workflow.random()

Dangerous: Standard Library Equivalents

Don't use these directly in workflow code:

# Dangerous - use workflow.now() instead
from datetime import datetime
current_time = datetime.now()

# Dangerous - use workflow.uuid4() instead
import uuid
request_id = uuid.uuid4()

# Dangerous - use workflow.random() instead
import random
rand_val = random.random()

Also dangerous in workflows:

  • File system access (open(), os.listdir(), etc.)
  • Direct HTTP calls or database queries
  • Modifying global variables
  • System calls (os.environ, os.getcwd(), etc.)

Move Non-Deterministic Work to Activities

For operations like external API calls, database queries, or file I/O, use activities:

from datetime import datetime

from mistralai.workflows import workflow
import mistralai.workflows as workflows

# DataParams, ExternalData, MyParams, MyResult, and http_client are
# placeholders for your own models and HTTP client.

@workflows.activity()
async def fetch_external_data(params: DataParams) -> ExternalData:
    # Safe - Activities are not replayed
    response = await http_client.get(params.url)
    timestamp = datetime.now()  # OK in activities
    return ExternalData(data=response.json(), fetched_at=timestamp)

@workflow.define(name="my_workflow")
class MyWorkflow:
    @workflow.entrypoint
    async def run(self, params: MyParams) -> MyResult:
        # Correct - Non-deterministic work in activity
        data = await fetch_external_data(params)
        return MyResult(data=data)

Determinism Enforcement (Sandbox)

When determinism enforcement is enabled, your workflow code runs inside a sandbox. The sandbox:

  • Re-imports modules in an isolated environment so that side-effectful module-level code is contained
  • Intercepts dangerous standard library calls such as datetime.now(), random.random(), uuid.uuid4(), open(), os.environ, and other non-deterministic operations
  • Restricts the asyncio event loop to prevent spawning uncontrolled coroutines

If your workflow code attempts any of these operations, the sandbox raises an error at runtime rather than silently producing a non-determinism bug that surfaces only on replay.
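To illustrate what "intercepting" a call means, here is a toy guard that blocks random.random() inside a context manager. It is an assumption-laden sketch built on unittest.mock, not how the SDK's sandbox is implemented; toy_sandbox and RestrictedCallError are hypothetical names.

```python
import random
from contextlib import contextmanager
from unittest import mock


class RestrictedCallError(RuntimeError):
    """Raised when guarded code calls a blocked function."""


def _blocked(name):
    # Build a replacement that fails loudly instead of returning a value.
    def raiser(*args, **kwargs):
        raise RestrictedCallError(f"{name} is not allowed in workflow code")

    return raiser


@contextmanager
def toy_sandbox():
    # Patch one non-deterministic call for the duration of the block;
    # the original function is restored when the block exits.
    with mock.patch.object(random, "random", _blocked("random.random")):
        yield
```

Inside `with toy_sandbox():` a call to random.random() raises immediately; outside the block it behaves normally again. The point is the failure mode: an error at the call site rather than a silent divergence on replay.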

Information

If you disable the sandbox, determinism is your responsibility. Non-deterministic code will still break on replay; you just won't get an immediate error telling you so.

Disabling Per-Workflow

Determinism enforcement is enabled by default. To opt out for a specific workflow, pass enforce_determinism=False to the @workflow.define decorator:

from mistralai.workflows import workflow

@workflow.define(name="my_workflow", enforce_determinism=False)
class MyWorkflow:
    @workflow.entrypoint
    async def run(self, input: str) -> str:
        # This code runs WITHOUT the sandbox.
        # You are responsible for ensuring determinism.
        return f"Processed: {input}"

Disabling Worker-Wide

Set the DEFAULT_ENFORCE_DETERMINISM environment variable to disable sandboxing for all workflows on a worker by default:

DEFAULT_ENFORCE_DETERMINISM=0

This sets config.worker.default_enforce_determinism to False.

Precedence

The decorator-level setting takes priority over the environment variable:

| @workflow.define(enforce_determinism=...) | DEFAULT_ENFORCE_DETERMINISM | Result |
| --- | --- | --- |
| True | any | sandboxed |
| False | any | not sandboxed |
| not set (default) | 1 / True (default) | sandboxed |
| not set (default) | 0 / False | not sandboxed |
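The precedence rules can be restated as a small function. This is a sketch of the decision logic only: resolve_enforcement is a hypothetical helper, and the exact set of strings the SDK accepts for the environment variable is an assumption.

```python
from typing import Optional


def resolve_enforcement(decorator_value: Optional[bool], env_value: Optional[str]) -> bool:
    """Return True when the workflow should run sandboxed."""
    if decorator_value is not None:
        # An explicit decorator setting always wins.
        return decorator_value
    if env_value is None:
        # Neither setting is present: enforcement is on by default.
        return True
    # Assumption: "0" and "false" (any case) disable enforcement.
    return env_value.strip().lower() not in {"0", "false"}
```

For example, resolve_enforcement(True, "0") stays sandboxed because the decorator takes priority, while resolve_enforcement(None, "0") does not.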

Temporarily Bypassing the Sandbox

warning

These escape hatches defeat the purpose of determinism enforcement. Use them only when you understand the implications and have no alternative.

When determinism enforcement is enabled, you may occasionally need to perform an operation that the sandbox blocks, for example importing a module that has non-deterministic side effects at import time, or performing a one-off read that you know is safe.

The SDK exposes two context managers under workflow.unsafe:

workflow.unsafe.imports_passed_through() — Allows imports inside the context to bypass the sandbox's module re-import mechanism. Use this when a third-party library performs side effects at import time that conflict with the sandbox.

from mistralai.workflows import workflow

@workflow.define(name="my_workflow")
class MyWorkflow:
    @workflow.entrypoint
    async def run(self, input: str) -> str:
        with workflow.unsafe.imports_passed_through():
            import some_problematic_library
        # Use the library normally after the import
        return some_problematic_library.process(input)

workflow.unsafe.skip_determinism_enforcement() — Temporarily disables all sandbox restrictions within the context. Code inside this block runs as if enforce_determinism=False.

from mistralai.workflows import workflow

@workflow.define(name="my_workflow")
class MyWorkflow:
    @workflow.entrypoint
    async def run(self, input: str) -> str:
        with workflow.unsafe.skip_determinism_enforcement():
            # Sandbox restrictions are lifted here.
            # You are responsible for ensuring determinism.
            import os
            value = os.environ.get("SOME_CONFIG", "default")
        return f"Processed: {input} with {value}"

Recommendations

  1. Keep enforcement enabled. Determinism enforcement is on by default. Catching non-determinism at development time is far cheaper than debugging replay failures in production. Only disable it temporarily while migrating legacy workflows.
  2. Migrate existing workflows incrementally. If you have workflows that are not yet compliant, set enforce_determinism=False on those specific workflows while you fix them. Avoid disabling enforcement worker-wide.
  3. Keep unsafe blocks small and documented. When you must bypass the sandbox, wrap the minimum amount of code and leave a comment explaining why.
  4. Move side effects to activities. The best way to avoid sandbox issues is to keep workflow code pure orchestration logic. All I/O, network calls, and non-deterministic operations belong in activities.

Defining a Workflow

Basic workflow structure:

import mistralai.workflows as workflows

@workflows.workflow.define(name="report_workflow")
class MyWorkflow:
    @workflows.workflow.entrypoint  # Marks run as the workflow's entrypoint
    async def run(self, report_type: str, include_details: bool = False) -> dict:
        """Workflow implementation"""
        # Orchestrate activities here
        pass

Workflow Input

The run method accepts parameters of any JSON-serializable type.

Primitive and multi-parameter inputs

@workflow.define(name="report_workflow")
class ReportWorkflow:
    @workflow.entrypoint
    async def run(self, report_type: str, include_details: bool = False) -> dict:
        ...

Input:

{"report_type": "daily", "include_details": false}

Single Pydantic model

When the entrypoint takes a single Pydantic BaseModel, its fields become the top-level input keys — there is no wrapper object:

from pydantic import BaseModel

class ReportParams(BaseModel):
    report_type: str
    include_details: bool = False

@workflow.define(name="report_workflow")
class ReportWorkflow:
    @workflow.entrypoint
    async def run(self, params: ReportParams) -> dict:
        ...

Input:

{"report_type": "daily", "include_details": true}

Union of Pydantic models

When a workflow can be triggered with different input shapes, use a union of BaseModel subclasses:

from pydantic import BaseModel, ConfigDict

class PromptInput(BaseModel):
    model_config = ConfigDict(extra="forbid")
    prompt: str

class CountInput(BaseModel):
    model_config = ConfigDict(extra="forbid")
    count: int

@workflow.define(name="flexible_workflow")
class FlexibleWorkflow:
    @workflow.entrypoint
    async def run(self, params: PromptInput | CountInput) -> str:
        if isinstance(params, PromptInput):
            return f"prompt: {params.prompt}"
        return f"count: {params.count}"

The SDK validates the input against each member in order and passes the first match to the handler:

{"prompt": "hello"}              → PromptInput
{"count": 42}                    → CountInput
{"count": 42, "extra": "field"}  → ValidationError

Tip: Use extra="forbid" on each member model. It produces precise discrimination and clear error messages when the input doesn't match any expected shape.
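The "first match in order" behavior, and why strict models make it unambiguous, can be sketched with the standard library alone. The dataclasses below are stand-ins for the Pydantic models, and validate_strict and validate_union are hypothetical helpers that only mimic extra="forbid"; the SDK's actual validation goes through Pydantic.

```python
from dataclasses import dataclass, fields


@dataclass
class PromptInput:
    prompt: str


@dataclass
class CountInput:
    count: int


def validate_strict(model, payload: dict):
    """Build model from payload, rejecting missing, extra, or mistyped keys."""
    expected = {f.name: f.type for f in fields(model)}
    if set(payload) != set(expected):
        raise ValueError(f"{model.__name__}: keys {sorted(payload)} do not match")
    for name, expected_type in expected.items():
        if not isinstance(payload[name], expected_type):
            raise ValueError(f"{model.__name__}: {name} has the wrong type")
    return model(**payload)


def validate_union(members, payload: dict):
    """Try each member in order and return the first one that validates."""
    errors = []
    for model in members:
        try:
            return validate_strict(model, payload)
        except ValueError as exc:
            errors.append(str(exc))
    # No member accepted the payload: report every failure.
    raise ValueError("; ".join(errors))
```

With members (PromptInput, CountInput), a payload with an unexpected extra key is rejected by both members instead of being quietly accepted by one of them.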

Optional union (| None)

Append | None (or use Optional from typing) to make the input optional:

@workflow.define(name="optional_workflow")
class OptionalWorkflow:
    @workflow.entrypoint
    async def run(self, params: PromptInput | CountInput | None) -> str:
        if params is None:
            return "no input"
        if isinstance(params, PromptInput):
            return f"prompt: {params.prompt}"
        return f"count: {params.count}"

Mixing BaseModel with primitive types

Unions that combine a BaseModel with a non-model type (str, int, etc.) are not supported and will fail when the @workflow.entrypoint decorator is applied:

# ❌ Raises an error at class definition time
@workflow.define(name="my_workflow")
class MyWorkflow:
    @workflow.entrypoint
    async def run(self, params: PromptInput | str) -> str:
        ...

Error:

Parameter 'params' of 'run' has unsupported union members: str.
Union inputs only support Pydantic BaseModel subclasses and None.
Use a dedicated BaseModel instead.

If you genuinely need a union that includes a non-model type, define a named RootModel subclass and use that instead:

from pydantic import BaseModel, RootModel

class PromptInput(BaseModel):
    prompt: str

class PromptOrRaw(RootModel[PromptInput | str]):
    pass

@workflow.define(name="my_workflow")
class MyWorkflow:
    @workflow.entrypoint
    async def run(self, params: PromptOrRaw) -> str:
        if isinstance(params.root, PromptInput):
            return f"structured: {params.root.prompt}"
        return f"raw: {params.root}"

The named subclass also produces a cleaner generated JSON schema.

Execution Timeout

Every workflow has a maximum total execution time — execution_timeout — that caps its lifetime including all retries and continue-as-new iterations.

Set it on the @workflow.define decorator:

from datetime import timedelta
from mistralai.workflows import workflow

@workflow.define(name="my_workflow", execution_timeout=timedelta(hours=4))
class MyWorkflow:
    @workflow.entrypoint
    async def run(self, params: MyParams) -> MyResult:
        ...

When the workflow is started via the API, the platform uses this value as the hard cap. A workflow that is still running (or waiting) after execution_timeout has elapsed is cancelled with a WORKFLOW_EXECUTION_TIMED_OUT error.

Default: 1 hour. Workflows that need to run longer must opt in explicitly:

@workflow.define(name="long_running_workflow", execution_timeout=timedelta(days=7))
class LongRunningWorkflow:
    ...

note

execution_timeout is a total wall-clock limit, not an activity timeout. Individual activity timeouts are controlled by start_to_close_timeout / schedule_to_close_timeout on each activity call.
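The difference between a total wall-clock limit and per-activity limits can be shown with plain asyncio. This is an analogy only: fake_activity, workflow_body, and run_with_total_limit are hypothetical, and asyncio.wait_for stands in for the platform's timeout handling.

```python
import asyncio


async def fake_activity(delay: float) -> str:
    # Stand-in for an activity that takes `delay` seconds.
    await asyncio.sleep(delay)
    return "done"


async def workflow_body() -> list:
    results = []
    for _ in range(3):
        # Per-activity limit: each call gets its own 0.5 s budget.
        results.append(await asyncio.wait_for(fake_activity(0.05), timeout=0.5))
    return results


async def run_with_total_limit(total_timeout: float) -> list:
    # Total limit: caps the whole run, however many activities it contains.
    return await asyncio.wait_for(workflow_body(), timeout=total_timeout)
```

Every activity here finishes well inside its own budget, yet asyncio.run(run_with_total_limit(0.08)) still times out because the three calls together take about 0.15 seconds.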

Core Workflow Features

1. Signals

Handle external events with signals:

@workflows.workflow.signal(name="approve", description="Approval signal")
async def approve_signal(self, data: ApprovalData) -> None:
    self.approved = True

Learn more about signals

2. Queries

Expose workflow state through queries:

@workflows.workflow.query(name="get_status", description="Get current status")
def get_status(self) -> WorkflowStatus:
    return WorkflowStatus(
        progress=self.progress,
        status=self.current_status
    )

Learn more about queries

3. Updates

Handle workflow updates:

@workflows.workflow.update(name="update_config", description="Update workflow configuration")
async def update_config(self, config: UpdateConfig) -> UpdateResult:
    # Handle update
    return UpdateResult(success=True)

Learn more about updates

4. Schedules

Define scheduled workflow executions using cron expressions:

from mistralai.workflows.models import ScheduleDefinition

schedule = ScheduleDefinition(
    input={"report_type": "daily", "include_details": False},  # Input parameters for scheduled runs
    cron_expressions=["0 0 * * *"]  # Run daily at midnight
)

@workflows.workflow.define(name="report_workflow", schedules=[schedule])
class ReportWorkflow:
    @workflows.workflow.entrypoint
    async def run(self, report_type: str = "daily", include_details: bool = False) -> None:
        # Generate report
        pass

Key schedule features:

  • Cron-based scheduling with standard cron expressions
  • Input parameters for scheduled executions
  • Multiple cron expressions can be specified

Learn more about scheduling

5. Child Workflows

Execute other workflows as children:

from datetime import timedelta
from mistralai.workflows import workflow

result = await workflow.execute_workflow(
    ChildWorkflow,
    params=child_params,
    execution_timeout=timedelta(hours=1)
)

You can also start a child workflow without waiting for its result (fire-and-forget) by passing wait=False:

handle = await workflow.execute_workflow(
    ChildWorkflow,
    params=child_params,
    execution_timeout=timedelta(hours=1),
    wait=False,
)
# Parent continues immediately — child runs independently
# Optionally await later: result = await handle

By default, wait=False sets the parent close policy to ABANDON, so the child continues running even if the parent completes. You can override this with the parent_close_policy parameter:

from mistralai.workflows import ParentClosePolicy

handle = await workflow.execute_workflow(
    ChildWorkflow,
    params=child_params,
    execution_timeout=timedelta(hours=1),
    wait=False,
    parent_close_policy=ParentClosePolicy.TERMINATE,
)

6. Waiting for Conditions

Pause the workflow until a condition becomes true — typically set by an incoming signal or update. The workflow sleeps without consuming resources. If the condition is not met within the timeout, a TimeoutError is raised.

from datetime import timedelta

from mistralai.workflows import workflow

await workflow.wait_condition(
    lambda: self.ready,
    timeout=timedelta(minutes=5)
)
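The semantics of waiting on a condition (return once the predicate is true, raise TimeoutError otherwise) can be sketched in plain asyncio. wait_for_condition and ApprovalState are hypothetical, and the polling loop is for illustration only; the SDK is described as sleeping without consuming resources.

```python
import asyncio
from datetime import timedelta


async def wait_for_condition(predicate, timeout: timedelta, poll_interval: float = 0.01):
    """Return once predicate() is true; raise TimeoutError after timeout."""

    async def poll():
        while not predicate():
            await asyncio.sleep(poll_interval)

    try:
        await asyncio.wait_for(poll(), timeout=timeout.total_seconds())
    except asyncio.TimeoutError:
        raise TimeoutError("condition not met within timeout") from None


class ApprovalState:
    def __init__(self):
        self.ready = False


async def demo() -> bool:
    state = ApprovalState()

    async def send_signal():
        # Plays the role of an incoming signal that flips the flag.
        await asyncio.sleep(0.05)
        state.ready = True

    signal_task = asyncio.create_task(send_signal())
    await wait_for_condition(lambda: state.ready, timeout=timedelta(seconds=1))
    await signal_task
    return state.ready
```

In demo(), the waiter resumes shortly after send_signal sets the flag; with a predicate that never becomes true, the same call raises TimeoutError.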

7. Continue-As-New

Reset workflow history for long-running or iterative workflows. Use this to prevent history from growing too large when processing large datasets or running indefinitely.

Important: Your workflow's run method must accept the state it needs to resume as parameters (page and total_processed in the example below). These parameters receive the values passed to continue_as_new().

import mistralai.workflows as workflows

@workflows.workflow.define(name="long-running-processor")
class LongRunningProcessor:
    @workflows.workflow.entrypoint
    async def run(self, page: int = 0, total_processed: int = 0):  # State parameters are required
        # Loop instead of recursing so the call stack does not grow with each page
        while True:
            # Process the current page here (for example, by awaiting an activity)
            page += 1
            total_processed += 100

            # Check if history is getting large
            if workflows.workflow.should_continue_as_new():
                # Continue with fresh history, passing the state forward
                workflows.workflow.continue_as_new({"page": page, "total_processed": total_processed})

When to use continue-as-new:

  • Long-running workflows that iterate indefinitely
  • When workflow history approaches size limits (~50K events)
  • Workflows that need to run for weeks/months
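The idea behind continue-as-new, ending the current run and starting a fresh one with carried-over state, can be simulated without the SDK. Everything here is hypothetical: ContinueAsNew, run_once, execute, and the tiny HISTORY_LIMIT exist only to make the restart visible.

```python
HISTORY_LIMIT = 5  # Illustrative threshold; real limits are far larger.


class ContinueAsNew(Exception):
    """Carries the state for the next run."""

    def __init__(self, state: dict):
        super().__init__("continue as new")
        self.state = state


def run_once(page: int, total_processed: int, last_page: int) -> int:
    history = []  # Each run starts with an empty history.
    while page < last_page:
        history.append(f"processed page {page}")
        page += 1
        total_processed += 100
        if len(history) >= HISTORY_LIMIT:
            # History is "too large": hand the state to a fresh run.
            raise ContinueAsNew({"page": page, "total_processed": total_processed})
    return total_processed


def execute(last_page: int):
    """Drive runs until one completes; return (result, number_of_runs)."""
    state = {"page": 0, "total_processed": 0}
    runs = 0
    while True:
        runs += 1
        try:
            return run_once(last_page=last_page, **state), runs
        except ContinueAsNew as restart:
            state = restart.state
```

Processing 12 pages with a limit of 5 events per run takes three runs, and the total carried across them is the same as if a single run had processed everything.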

Advanced Features

1. Reset Mechanism

You can "reset" a Workflow Execution to restart it from a specific point in its history. This is useful if your workflow gets stuck (for example, because of a non-determinism error) or cannot complete. When you reset, the workflow ends its current run and starts again from the event you pick in the event history.

All workflow events up to the reset point are copied to the new execution. The workflow then continues running from there using the latest version of your code. Any progress made after the reset event will be lost.

Resetting should only be used after fixing the underlying problem. It's best practice to provide a reason for the reset, which is recorded in the workflow's event history.

from mistralai.client import Mistral

client = Mistral(api_key="your_api_key")

client.workflows.executions.reset_workflow(
    execution_id="your-execution-id",
    event_id=42,  # Must be a WORKFLOW_TASK_COMPLETED event
    reason="Bug fixed in activity logic",
    exclude_signals=True,  # Optional: skip replaying signals after this point
    exclude_updates=True   # Optional: skip replaying updates after this point
)
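What a reset does to the event history can be pictured with a short simulation. This is a sketch under assumptions: Event and reset_history are hypothetical, and apart from WORKFLOW_TASK_COMPLETED the event type names used with it are made up for illustration.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Event:
    event_id: int
    event_type: str


def reset_history(events, reset_event_id: int):
    """Return the events that would be copied into the new execution."""
    target = next((e for e in events if e.event_id == reset_event_id), None)
    if target is None:
        raise ValueError(f"no event with id {reset_event_id}")
    if target.event_type != "WORKFLOW_TASK_COMPLETED":
        raise ValueError("reset point must be a WORKFLOW_TASK_COMPLETED event")
    # Everything up to and including the reset point is kept;
    # progress recorded after it is discarded.
    return [e for e in events if e.event_id <= reset_event_id]
```

Given a five-event history with a WORKFLOW_TASK_COMPLETED event at id 3, reset_history(events, 3) keeps events 1 through 3, and the new execution would continue from there with the current code.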

2. Workflow Execution Context

Retrieve the current execution's unique identifier at runtime. This is useful for logging, correlating events across services, or passing the execution ID to external systems so they can send signals back.

import mistralai.workflows as workflows
execution_id = workflows.get_execution_id()

Workflow vs Activity

| Feature | Workflow | Activity |
| --- | --- | --- |
| Duration | Seconds to years | Seconds to minutes |
| State | Stateful | Stateless |
| Retries | Manual recovery | Automatic |
| Parallelism | Complex coordination | Single operation |