Activities

An activity is where the actual work happens. Sending an email. Calling an external API. Running a database query. Processing a file. Generating a response from an AI model. Anything that interacts with the outside world, performs I/O, or has side effects belongs in an activity — not in the workflow itself.

Activities are ordinary async functions. They have no constraints around determinism: they can call anything, use any library, read from disk, write to a database. The platform treats each activity call as an atomic unit — if an activity fails, it is retried automatically according to a configurable policy. Once it succeeds, its result is recorded in the execution history and the workflow moves on.

Because results are persisted, a successful activity is never re-executed on replay — only its recorded result is used. This means activities should be idempotent: running them twice with the same input should produce the same observable outcome, in case a retry happens after a partial execution.

Defining an activity

Defining an activity

import mistralai.workflows as workflows

@workflows.activity()
async def fetch_user_data(user_id: str) -> dict:
    response = await http_client.get(f"/users/{user_id}")
    return response.json()

Activities are called with await directly inside workflow code:

@workflows.workflow.define(name="user_report")
class UserReportWorkflow:
    @workflows.workflow.entrypoint
    async def run(self, user_id: str) -> dict:
        user = await fetch_user_data(user_id)
        report = await generate_report(user)
        return report
Retry policy

Retry policy

Configure retries and timeouts on the decorator. Always set start_to_close_timeout — without it, a hung activity blocks indefinitely:

from datetime import timedelta

@workflows.activity(
    start_to_close_timeout=timedelta(minutes=5),
    retry_policy_max_attempts=3,
    retry_policy_backoff_coefficient=2.0,
)
async def call_external_api(params: ApiParams) -> ApiResponse:
    async with httpx.AsyncClient() as client:
        response = await client.post(params.url, json=params.data)
        return ApiResponse(data=response.json())
Limitations

Limitations

Use async libraries for I/O. Blocking calls stall the worker for all concurrent activities:

# ✅ Async HTTP
async with httpx.AsyncClient() as client:
    response = await client.post(url, json=data)

# ❌ Blocking HTTP — stalls the worker
response = requests.post(url, json=data)

2MB input/output limit. Each activity call is limited to 2MB in and 2MB out.