Workflows
A workflow is the composition layer of your application: it defines what to do and when, while activities define how. A workflow coordinates activities, waits for external events, manages timers, and decides how to proceed based on results.
Workflows can run for any time frame, from seconds to months. They survive process restarts, infrastructure failures, and transient errors without losing their place. This is possible because the platform records every significant event in the workflow's lifecycle and uses that record to reconstruct state whenever needed.
Defining a workflow
To define a workflow, use the @workflows.workflow.define decorator on a class and the @workflows.workflow.entrypoint decorator on the method that starts the workflow:
```python
import mistralai.workflows as workflows
from pydantic import BaseModel


class ReportParams(BaseModel):
    report_type: str
    include_details: bool = False


@workflows.workflow.define(name="report_workflow")
class ReportWorkflow:
    @workflows.workflow.entrypoint
    async def run(self, params: ReportParams) -> dict:
        result = await process_report(params.report_type)
        return {"result": result, "details": params.include_details}
```

Workflow input
The run method accepts any JSON-serializable types: Pydantic models, plain dicts, or primitives. The platform validates inputs and generates Console UI schemas in all cases.
When the entrypoint takes a single Pydantic BaseModel, its fields become the top-level input keys:
```json
{"report_type": "daily", "include_details": true}
```

For workflows that accept different input shapes, use a union of models:
The SDK validates the input against each member in order and passes the first match. Use extra="forbid" on each model for precise discrimination and clear error messages.
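As a sketch of the pattern: the model names below (`DailyParams`, `RangeParams`) are illustrative, not part of the SDK, and the small `first_match` helper only mimics the SDK's in-order validation to show why `extra="forbid"` matters.

```python
from pydantic import BaseModel, ConfigDict, ValidationError


class DailyParams(BaseModel):
    model_config = ConfigDict(extra="forbid")
    report_type: str
    day: str


class RangeParams(BaseModel):
    model_config = ConfigDict(extra="forbid")
    report_type: str
    start: str
    end: str


# The entrypoint would be annotated with the union:
#
#     @workflows.workflow.entrypoint
#     async def run(self, params: DailyParams | RangeParams) -> dict: ...
#
# This helper imitates the SDK's first-match behavior: try each member
# in order, pass the first one that validates.
def first_match(data: dict) -> BaseModel:
    for model in (DailyParams, RangeParams):
        try:
            return model.model_validate(data)
        except ValidationError:
            continue
    raise ValueError("input matches no model in the union")


print(type(first_match({"report_type": "daily", "day": "2024-01-01"})).__name__)
# DailyParams — extra="forbid" prevents it from also matching RangeParams
```

Without `extra="forbid"`, an input carrying extra keys could validate against an earlier, looser member of the union and silently be routed to the wrong shape.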
Determinism
There is one fundamental constraint on workflow code: it must be deterministic. Given the same starting conditions, a workflow must always produce the same sequence of operations.
The reason is replay. When a worker restarts or recovers a workflow, it re-executes the workflow code from the beginning, matching each operation against the recorded event history. If the code produces a different sequence (because it called datetime.now() and got a different timestamp, or random() and got a different number), the replay diverges from history and the workflow fails with a non-determinism error.
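Replay can be illustrated without the SDK. The sketch below is a toy, not the platform's actual mechanism: it records each operation's result on first execution, then checks replayed results against that recorded history.

```python
import random


class NonDeterminismError(Exception):
    pass


def run_with_history(workflow_fn, history):
    """Run workflow_fn, recording ops on first execution and
    checking them against history on replay."""
    recorded = list(history)
    position = 0

    def op(value):
        nonlocal position
        if position < len(recorded):
            # Replaying: the value must match what history recorded.
            if recorded[position] != value:
                raise NonDeterminismError(
                    f"replay diverged at step {position}: "
                    f"history={recorded[position]!r}, now={value!r}"
                )
        else:
            # First execution: record the value.
            recorded.append(value)
        position += 1
        return value

    workflow_fn(op)
    return recorded


# Deterministic workflow: replays cleanly after a "restart".
def good(op):
    op("fetch")
    op("process")


h = run_with_history(good, [])   # first execution records history
run_with_history(good, h)        # replay matches: no error


# Non-deterministic workflow: a fresh random value diverges on replay.
def bad(op):
    op(random.random())


h = run_with_history(bad, [])
try:
    run_with_history(bad, h)
except NonDeterminismError as e:
    print("replay failed:", e)
```

This is exactly the failure mode `workflow.now()`, `workflow.uuid4()`, and `workflow.random()` exist to prevent: on replay they return the values recorded in history instead of generating new ones.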
This is why you use SDK-provided replacements for non-deterministic operations:
```python
from mistralai.workflows import workflow

current_time = workflow.now()      # not datetime.now()
request_id = workflow.uuid4()      # not uuid.uuid4()
rand_value = workflow.random()     # not random.random()
```

Any operation that touches the outside world (an API call, a database query, a file read) belongs in an activity, not in the workflow itself.
The SDK enforces determinism by default: a sandbox intercepts calls to known non-deterministic APIs and raises an error immediately, rather than letting a subtle bug surface only on replay. You can opt out at two levels:
- Per workflow: pass `enforce_determinism=False` to `@workflows.workflow.define`.
- Worker-wide: set the `DEFAULT_ENFORCE_DETERMINISM` environment variable (which sets `config.worker.default_enforce_determinism` to `False`).
Keeping enforcement on is strongly recommended. With it disabled, a non-deterministic call does not fail at the call site. Instead it silently corrupts the workflow's history and surfaces as a divergence error on the next replay (worker restart, crash recovery), often hours or days later.
Execution timeout
Every workflow has an execution_timeout, the hard cap on its total lifetime including all retries and continue-as-new iterations. The default is 1 hour. Workflows that need to run longer must opt in:
```python
from datetime import timedelta


@workflows.workflow.define(
    name="long_running_workflow",
    execution_timeout=timedelta(days=7),
)
class LongRunningWorkflow:
    @workflows.workflow.entrypoint
    async def run(self, params: MyParams) -> MyResult:
        ...
```

Limitations
- 2-second timeout between activities. Workflow code must complete within 2 seconds between activity calls. Move heavy computation to activities.
- No I/O in workflows. All network calls, database queries, and file operations belong in activities.
- 2MB input/output limit. Each workflow invocation is limited to 2MB in and 2MB out. For larger payloads, use payload offloading to push inputs and outputs to your own blob storage.
- Execution history limit. Each execution is capped at 51,200 events or 50MB. See Events for details.