Activities
An activity is where the actual work happens. Sending an email. Calling an external API. Running a database query. Processing a file. Generating a response from an AI model. Anything that interacts with the outside world, performs I/O, or has side effects belongs in an activity — not in the workflow itself.
Activities are ordinary async functions. They have no constraints around determinism: they can call anything, use any library, read from disk, write to a database. The platform treats each activity call as an atomic unit — if an activity fails, it is retried automatically according to a configurable policy. Once it succeeds, its result is recorded in the execution history and the workflow moves on.
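The retry-then-record behaviour can be pictured with a toy model. The sketch below is not the platform's implementation: `run_activity`, `history`, `flaky_lookup`, and the `initial_interval` parameter are hypothetical names chosen for illustration, and the real retry policy is configured on the activity decorator.

```python
import asyncio

# Toy stand-in for the execution history (hypothetical, in-memory only).
history: dict[str, object] = {}


async def run_activity(call_id, fn, *args, max_attempts=3,
                       initial_interval=0.01, backoff_coefficient=2.0):
    """Retry fn until it succeeds, then record its result under call_id."""
    delay = initial_interval
    for attempt in range(1, max_attempts + 1):
        try:
            result = await fn(*args)
        except Exception:
            if attempt == max_attempts:
                raise  # retries exhausted: surface the failure
            await asyncio.sleep(delay)
            delay *= backoff_coefficient  # wait longer before each retry
        else:
            history[call_id] = result  # recorded once, on success
            return result


attempts = 0


async def flaky_lookup(user_id: str) -> dict:
    """Fails twice, then succeeds, to exercise the retry loop."""
    global attempts
    attempts += 1
    if attempts < 3:
        raise ConnectionError("transient failure")
    return {"id": user_id}


result = asyncio.run(run_activity("lookup-1", flaky_lookup, "u42"))
```

Here the third attempt succeeds, and only that successful result ends up in `history`.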
Because results are persisted, a successful activity is never re-executed on replay; only its recorded result is used. A failed or timed-out attempt is different: it may have partially executed before the retry starts. Activities should therefore be idempotent: running them twice with the same input should produce the same observable outcome.
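One common way to make a side-effecting activity idempotent is to key the effect on a caller-supplied identifier, so that a retried attempt can detect that the work was already done. This is a minimal sketch under stated assumptions: `send_welcome_email`, `outbox`, and `sent_log` are hypothetical names, the store is in-memory, and the activity decorator is omitted so the snippet runs on its own.

```python
import asyncio

outbox: list[str] = []          # stands in for the real side effect
sent_log: dict[str, str] = {}   # idempotency key -> recorded outcome


async def send_welcome_email(idempotency_key: str, address: str) -> str:
    # If an earlier attempt already performed the effect, return its outcome.
    if idempotency_key in sent_log:
        return sent_log[idempotency_key]
    outbox.append(address)  # the side effect happens at most once per key
    sent_log[idempotency_key] = f"sent:{address}"
    return sent_log[idempotency_key]


async def main() -> tuple[str, str]:
    first = await send_welcome_email("signup-42", "ada@example.com")
    # Simulated retry with the same input.
    second = await send_welcome_email("signup-42", "ada@example.com")
    return first, second


first, second = asyncio.run(main())
```

In a real deployment the key would need to live in durable storage with an atomic check-and-set, or be forwarded to a downstream service that deduplicates on it; a process-local dictionary only illustrates the idea.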
Defining an activity
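import mistralai.workflows as workflows

# http_client is assumed to be an async HTTP client created elsewhere
# (for example, an httpx.AsyncClient with a base URL).
@workflows.activity()
async def fetch_user_data(user_id: str) -> dict:
    response = await http_client.get(f"/users/{user_id}")
    return response.json()

Activities are called with await directly inside workflow code: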
@workflows.workflow.define(name="user_report")
class UserReportWorkflow:
    @workflows.workflow.entrypoint
    async def run(self, user_id: str) -> dict:
        user = await fetch_user_data(user_id)
        # generate_report is assumed to be another activity, defined elsewhere
        report = await generate_report(user)
        return report

Retry policy
Configure retries and timeouts on the decorator. Always set start_to_close_timeout — without it, a hung activity blocks indefinitely:
from datetime import timedelta

import httpx

# ApiParams and ApiResponse are application-defined types (not shown).
@workflows.activity(
    start_to_close_timeout=timedelta(minutes=5),
    retry_policy_max_attempts=3,
    retry_policy_backoff_coefficient=2.0,
)
async def call_external_api(params: ApiParams) -> ApiResponse:
    async with httpx.AsyncClient() as client:
        response = await client.post(params.url, json=params.data)
        return ApiResponse(data=response.json())

Limitations
Use async libraries for I/O. Blocking calls stall the worker for all concurrent activities:
# ✅ Async HTTP
async with httpx.AsyncClient() as client:
    response = await client.post(url, json=data)

# ❌ Blocking HTTP: stalls the worker
response = requests.post(url, json=data)

2MB input/output limit. Each activity call is limited to 2MB in and 2MB out.
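A cheap way to avoid hitting the size limit at runtime is to measure a payload before handing it to an activity, and to pass a reference (for example, a storage key) when it is too large. The sketch below assumes payloads are serialized as JSON and that 2MB means 2 * 1024 * 1024 bytes; neither is confirmed here, and `payload_size` and `fits_activity_limit` are hypothetical helpers, not part of the library.

```python
import json

# Assumption: the limit is 2 MiB of serialized payload.
MAX_PAYLOAD_BYTES = 2 * 1024 * 1024


def payload_size(value) -> int:
    """Size in bytes of value when serialized as UTF-8 JSON."""
    return len(json.dumps(value).encode("utf-8"))


def fits_activity_limit(value) -> bool:
    """True if the serialized value is within the assumed limit."""
    return payload_size(value) <= MAX_PAYLOAD_BYTES


small = {"user_id": "u42"}
large = {"blob": "x" * (3 * 1024 * 1024)}
```

With these values, `fits_activity_limit(small)` holds and `fits_activity_limit(large)` does not, so the second payload would be a candidate for writing to external storage and passing by key.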