Activity Basics

Defining an Activity

Basic activity structure:

import mistralai.workflows as workflows

@workflows.activity()
async def my_activity(input_data: str, count: int = 1) -> dict:
    """Activity implementation"""
    # Perform work here
    return {"result": input_data, "processed_count": count}

Core Activity Features

Activity Naming

By default, an activity is registered using its Python function name. Use the name parameter to set a custom name. This name is the identifier used for registration and execution routing, and it is what appears in traces and the console.

@workflows.activity(
    name="custom_activity_name",  # Used for registration, execution, and observability
)
async def my_activity(input_data: str) -> dict:
    ...  # Activity body

Learn more about observability and naming
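
To make the default-versus-custom naming rule concrete, here is a toy registry in plain Python. It is not the library's implementation: REGISTRY, register, fetch_report, and send_report are hypothetical names that only illustrate the rule that the function name is the fallback identifier.

```python
from typing import Callable, Optional

# Hypothetical in-memory registry; the real library keeps its own bookkeeping.
REGISTRY: dict[str, Callable] = {}


def register(name: Optional[str] = None) -> Callable[[Callable], Callable]:
    """Toy decorator: register a function under `name`, or under its own name."""
    def decorator(func: Callable) -> Callable:
        key = name if name is not None else func.__name__
        REGISTRY[key] = func
        return func
    return decorator


@register()
def fetch_report() -> str:
    return "report"


@register(name="custom_activity_name")
def send_report() -> str:
    return "sent"


print(sorted(REGISTRY))  # ['custom_activity_name', 'fetch_report']
```

Note that send_report is reachable only under its custom name, which is why a custom name should be treated as part of the activity's public contract.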

Timeouts

start_to_close_timeout sets the maximum time an activity can run from the moment it starts executing to the moment it must return a result. If the activity exceeds this duration, it is terminated and treated as a failure (which may trigger a retry). Without a timeout, a hung activity blocks indefinitely.

from datetime import timedelta

@workflows.activity(
    start_to_close_timeout=timedelta(minutes=10)
)
async def my_activity(input_data: str) -> dict:
    ...  # Must return within 10 minutes of starting
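
The effect of a start-to-close limit can be approximated with the standard library alone. The sketch below uses asyncio.wait_for as an analogy; it does not show how the workflow engine enforces the timeout, and slow_task and run_with_timeout are hypothetical names.

```python
import asyncio


async def slow_task() -> str:
    await asyncio.sleep(0.5)  # Simulates work that overruns its budget
    return "done"


async def run_with_timeout(timeout_s: float) -> str:
    """Return the task result, or 'timed out' if the budget is exceeded."""
    try:
        return await asyncio.wait_for(slow_task(), timeout=timeout_s)
    except asyncio.TimeoutError:
        # The overrunning task is cancelled and reported as a failure.
        return "timed out"


outcome = asyncio.run(run_with_timeout(0.05))
print(outcome)  # timed out
```

In the workflow setting, the failure branch is where a retry may be scheduled instead of returning a string.
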

Retry Policies

When an activity fails, it is retried automatically up to retry_policy_max_attempts times. The retry_policy_backoff_coefficient controls exponential backoff between retries: with a coefficient of 2.0, the delay doubles after each attempt (e.g., 1s, 2s, 4s, 8s when the first delay is 1s). After all retries are exhausted, the failure propagates to the workflow.

@workflows.activity(
    retry_policy_max_attempts=5,
    retry_policy_backoff_coefficient=2.0
)
async def my_activity(input_data: str) -> dict:
    ...  # Retried with exponential backoff on failure
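
The delay sequence can be worked out with a few lines of arithmetic. This is a sketch under two assumptions that the text above does not confirm: the first delay is 1 second, and the attempt budget counts the initial try (so 5 attempts means 4 retries). The helper name backoff_delays is hypothetical, and a real engine may also cap the delay.

```python
def backoff_delays(initial_s: float, coefficient: float, max_attempts: int) -> list[float]:
    """Delays (in seconds) waited before each retry, given a total attempt budget."""
    # Assumption: max_attempts counts the first try, so there are
    # max_attempts - 1 retries and therefore max_attempts - 1 delays.
    return [initial_s * coefficient**i for i in range(max(max_attempts - 1, 0))]


delays = backoff_delays(initial_s=1.0, coefficient=2.0, max_attempts=5)
print(delays)       # [1.0, 2.0, 4.0, 8.0]
print(sum(delays))  # 15.0
```

Under these assumptions, the settings shown above spend at most 15 seconds waiting between attempts before the failure reaches the workflow.
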

Worker Stickiness

Execute activities on the same worker for performance optimization:

@workflows.activity(
    sticky_to_worker=True
)
async def my_activity(input_data: str) -> dict:
    ...  # Activity body

See Sticky Worker Sessions for detailed documentation

Heartbeat Timeout

Detect stuck activities quickly by requiring periodic heartbeat signals:

from datetime import timedelta

import mistralai.workflows as workflows
from mistralai.workflows import activity


async def process_item(item: str) -> str:
    """Placeholder for your own per-item work."""
    return item


@workflows.activity(
    start_to_close_timeout=timedelta(minutes=30),
    heartbeat_timeout=timedelta(seconds=30)
)
async def long_running_task(items: list[str]) -> dict:
    results = []
    for i, item in enumerate(items):
        result = await process_item(item)
        results.append(result)
        # Report progress so the heartbeat timeout does not fire
        activity.heartbeat({"processed": i + 1, "total": len(items)})
    return {"results": results}

When heartbeat_timeout is set, activities must call activity.heartbeat() periodically. If no heartbeat is received within the timeout, the activity is considered failed and a retry is triggered. This enables fast detection of stuck activities without waiting for the full start_to_close_timeout.

Note: Not supported for local activities.
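
To see why a heartbeat timeout detects a stuck activity sooner than start_to_close_timeout, the following sketch replays a list of heartbeat timestamps and reports when the first gap exceeds the limit. It is a plain-Python model, not the engine's logic, and stall_detected_at is a hypothetical helper.

```python
from typing import Optional


def stall_detected_at(
    start: float, heartbeats: list[float], heartbeat_timeout: float, end: float
) -> Optional[float]:
    """Return the time a missing heartbeat is noticed, or None if none is missed.

    All values are seconds on the same clock; `heartbeats` must be sorted.
    """
    last_seen = start
    for beat in heartbeats:
        if beat - last_seen > heartbeat_timeout:
            return last_seen + heartbeat_timeout
        last_seen = beat
    # Check the gap between the last heartbeat and the end of the observation.
    if end - last_seen > heartbeat_timeout:
        return last_seen + heartbeat_timeout
    return None


# Heartbeats every 10s, then silence: the stall is noticed 30s after the last one.
print(stall_detected_at(0, [10, 20, 30], heartbeat_timeout=30, end=1800))  # 60
# Regular heartbeats until the activity finishes: nothing is flagged.
print(stall_detected_at(0, [10, 20, 30], heartbeat_timeout=30, end=40))    # None
```

With the 30-minute start_to_close_timeout from the example above, the same stall would otherwise go unnoticed until the 1800-second mark.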

Granularity and Failure Handling

Design activities to be as granular as possible by breaking complex tasks into smaller, manageable activities. At the same time, each activity should encapsulate all the logic that is susceptible to failure. This granular approach has several benefits:

  • Isolation of Failures: Smaller activities make it easier to isolate and handle failures. If a failure occurs, only the affected activity needs to be retried.
  • Easier Debugging: Granular activities make it easier to identify the exact point of failure and debug issues.
  • Better Retry Mechanisms: When each activity is idempotent and can be retried independently, only the failed part of the process is retried, saving time and resources.

For example, if you have a process that involves fetching data from an API, processing that data, and then storing it in a database, you might define three separate activities:

  1. Fetch data from API
  2. Process the data
  3. Store the data in the database
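
The three steps above can be sketched as separate functions. To keep the example runnable without the library, the functions are plain coroutines with canned data; in a real worker each one would carry @workflows.activity(). The names fetch_data, process_data, store_data, run_pipeline, and FAKE_DB are hypothetical, and run_pipeline merely stands in for whatever orchestrates the activities.

```python
import asyncio

# Hypothetical stand-ins: in a real worker each function below would be
# decorated with @workflows.activity() and call a real API or database.
FAKE_DB: list[dict] = []


async def fetch_data(source: str) -> list[int]:
    """Step 1: fetch raw data (here, a canned response)."""
    return [1, 2, 3]


async def process_data(raw: list[int]) -> dict:
    """Step 2: transform the data; pure computation, cheap to retry."""
    return {"total": sum(raw), "count": len(raw)}


async def store_data(record: dict) -> int:
    """Step 3: persist the result and return the number of stored rows."""
    FAKE_DB.append(record)
    return len(FAKE_DB)


async def run_pipeline(source: str) -> int:
    # When each step is its own activity, a failure in store_data can be
    # retried without repeating the fetch or the processing.
    raw = await fetch_data(source)
    record = await process_data(raw)
    return await store_data(record)


rows = asyncio.run(run_pipeline("https://example.invalid/data"))
print(rows, FAKE_DB)  # 1 [{'total': 6, 'count': 3}]
```

Each function takes the previous step's output as input, which keeps the boundaries explicit and makes every step retryable on its own.
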

Nested Activities

Nesting affects how retries and state management work. When one activity calls another from inside its own body, only the parent activity is considered for retries and state management. State is saved before the nested activity runs, and if the nested activity fails, the parent activity as a whole is retried.

import mistralai.workflows as workflows

@workflows.activity()
async def parent_activity(input_data: str) -> dict:
    """Parent activity that encapsulates a nested activity."""
    # Some logic here
    nested_result = await nested_activity(input_data)
    return {"result_data": nested_result["result_data"]}

@workflows.activity()
async def nested_activity(input_data: str) -> dict:
    """Nested activity."""
    # Logic for the nested activity
    return {"result_data": "processed_data"}

In this example, if nested_activity fails, the entire parent_activity will be retried, not just the nested activity. The state will be saved before the nested activity is called.
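
The consequence of retrying the parent as a whole can be observed with a small simulation. It uses plain synchronous functions and a toy retry loop rather than the library, so run_with_retries, parent_step, nested_step, and the calls counter are all hypothetical.

```python
calls = {"parent": 0, "nested": 0}


def nested_step() -> str:
    calls["nested"] += 1
    if calls["nested"] == 1:
        raise RuntimeError("transient failure")  # Fails on the first call only
    return "processed_data"


def parent_step() -> dict:
    calls["parent"] += 1  # Work done before the nested call is repeated too
    return {"result_data": nested_step()}


def run_with_retries(func, max_attempts: int):
    """Toy retry loop: re-run `func` from the top until it succeeds."""
    for attempt in range(1, max_attempts + 1):
        try:
            return func()
        except RuntimeError:
            if attempt == max_attempts:
                raise


result = run_with_retries(parent_step, max_attempts=3)
print(result, calls)
# {'result_data': 'processed_data'} {'parent': 2, 'nested': 2}
```

The parent counter reaching 2 shows that everything the parent does before the nested call runs again on retry, so that work should be idempotent or moved into its own activity.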