Concurrency patterns: scale your workflows

Process thousands of items efficiently with Mistral Workflows' parallel execution patterns

Overview

Tip: For fewer than ~10,000 activities, asyncio.gather works well and keeps your code simple. Use the concurrency framework below when you need automatic continue-as-new or progress tracking, or when you're processing larger datasets that could exceed the 51,200-event history limit.

For small batches, asyncio.gather is the simplest approach:

import asyncio
import mistralai.workflows as workflows
from pydantic import BaseModel


@workflows.activity()
async def process_item(item: str) -> dict:
    return {"item": item, "result": f"processed:{item}"}


class Input(BaseModel):
    items: list[str]


@workflows.workflow.define(name="concurrency_workflow")
class ConcurrencyWorkflow:
    @workflows.workflow.entrypoint
    async def run(self, params: Input) -> list[dict]:
        results = await asyncio.gather(*[process_item(item) for item in params.items])
        return list(results)


async def main():
    result = await workflows.execute_workflow(
        ConcurrencyWorkflow,
        params=Input(items=["alpha", "beta", "gamma", "delta"]),
    )
    for r in result:
        print(r)


if __name__ == "__main__":
    asyncio.run(main())

Mistral Workflows provides a concurrency framework that enables you to execute activities in parallel across three distinct patterns:

  • List Executor: Process a known collection of items
  • Chain Executor: Process items sequentially from a stream/queue (token-based pagination)
  • Offset Pagination Executor: Process items by fetching pages/chunks by index

All patterns provide automatic fault tolerance, progress tracking, and scalability for large datasets.

Key Benefits

  • Massive Parallelism: Execute thousands of activities concurrently
  • Automatic Continue-As-New: Handle large datasets without hitting execution history limits (51,200 events)
  • Type Safety: Comprehensive type validation for all inputs and outputs
  • Fault Tolerance: Built-in error handling and retry mechanisms
  • Progress Tracking: Monitor execution progress through built-in observability features
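To see why continue-as-new matters, a back-of-the-envelope calculation helps. The 3-events-per-activity figure below is an assumption (scheduled/started/completed, as in Temporal-style engines), not something this page documents:

```python
EVENTS_PER_ACTIVITY = 3   # assumption: scheduled/started/completed per activity
HISTORY_LIMIT = 51_200    # the event-history limit mentioned above

# Upper bound on activities a single workflow run could record before
# continue-as-new must kick in, ignoring workflow bookkeeping events:
max_simple_activities = HISTORY_LIMIT // EVENTS_PER_ACTIVITY
print(max_simple_activities)  # roughly 17,000
```

Under these assumptions, a run tops out in the low tens of thousands of activities, which is why larger datasets need the automatic continue-as-new behavior.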

List Executor Pattern

Use Case

Process a known collection of items where you have all items upfront.

When to Use

  • Database query results
  • File contents or uploaded documents
  • API responses that return all items at once
  • Batch processing of users, files, or records

Code Pattern

import mistralai.workflows as workflows

@workflows.activity()
async def process_item(item_id: int, value: str) -> dict:
    # Process individual item
    return {"processed_value": f"processed_{value}"}

# ... (inside a workflow)
# Execute in parallel
items = [{"item_id": i, "value": f"item_{i}"} for i in range(1000)]
results = await workflows.execute_activities_in_parallel(
    activity=process_item,
    items=items,
    max_concurrent_scheduled_tasks=100  # Optional: limit concurrency
)
# ...

Configuration Options

Parameter | Description | Default
max_concurrent_scheduled_tasks | Maximum number of concurrent activity executions that can be scheduled simultaneously; limits how many activities are waiting to be executed at once. | 100
extra_params | Additional parameters to pass to the activity | None

Note: The List Executor doesn't use the max_concurrent_executions_per_worker parameter; that parameter is only relevant for the Offset Pagination Executor.

Chain Executor Pattern

Use Case

Process items sequentially from a stream/queue using token-based pagination.

When to Use

  • AWS S3 ListObjects (uses ContinuationToken)
  • DynamoDB Scan/Query (uses LastEvaluatedKey)
  • Azure Blob Storage (uses marker)
  • Any API that uses continuation tokens instead of page numbers

Code Pattern

import mistralai.workflows as workflows

@workflows.activity()
async def process_item(item_id: int, value: str) -> dict:
    # Process individual item
    return {"processed_value": f"processed_{value}"}

@workflows.activity()
async def get_next_item(prev_item: dict | None) -> dict | None:
    # Get next item from previous item
    if prev_item is None:
        # First item
        return {"item_id": 0, "value": "item_0"}

    next_id = prev_item["item_id"] + 1
    if next_id >= 1000:  # Stop condition
        return None

    return {"item_id": next_id, "value": f"item_{next_id}"}

# ... (inside a workflow)
# Execute chain
results = await workflows.execute_activities_in_parallel(
    activity=process_item,
    get_item_from_prev_item_activity=get_next_item
)
# ...

Note: The Chain Executor doesn't use the max_concurrent_scheduled_tasks or max_concurrent_executions_per_worker parameters.

How It Works

The Chain Executor separates item discovery (sequential) from item processing (parallel):

  1. Item fetching is chained: The executor calls get_item_from_prev_item_activity sequentially—first with None to get the first item, then with each result to get the next item, until the function returns None
  2. Processing is parallelized: As soon as an item is fetched, it's immediately dispatched for processing via the activity function. Items don't wait for each other to finish processing

This means fetching item N+1 depends on item N's result, but processing item N+1 runs concurrently with processing items 1 through N.

Offset Pagination Executor Pattern

Use Case

Process items by fetching pages/chunks using index-based pagination.

When to Use

  • Traditional REST APIs with page numbers
  • SQL database chunking with OFFSET/LIMIT
  • File processing by byte offset
  • Any index-based data fetching

Code Pattern

import mistralai.workflows as workflows

@workflows.activity()
async def process_item(item_id: int, value: str) -> dict:
    # Process individual item
    return {"processed_value": f"processed_{value}"}

@workflows.activity()
async def get_item_by_index(params: workflows.GetItemFromIndexParams) -> dict:
    # Get item by index
    return {
        "item_id": params.idx,
        "value": f"item_{params.idx}",
        "extra_data": params.extra_params
    }

# Execute with offset pagination
results = await workflows.execute_activities_in_parallel(
    activity=process_item,
    get_item_from_index_activity=get_item_by_index,
    n_items=1000,  # Total number of items
    max_concurrent_executions_per_worker=50,  # Optional: controls how many items are processed together
    max_concurrent_scheduled_tasks=100,  # Optional: limit concurrent activity executions
    extra_params={"batch_id": "daily_processing"}  # Optional: extra parameters
)

Configuration Options

Parameter | Description | Default
n_items | Total number of items to process | Required
max_concurrent_scheduled_tasks | Maximum number of concurrent activity executions that can be scheduled simultaneously; limits how many activities are waiting to be executed at once. | 100
max_concurrent_executions_per_worker | How many items are processed together in a single activity execution. Only used by the Offset Pagination Executor. | 100
extra_params | Additional parameters to pass to the activity | None

Note: Items processed together are wrapped in a single activity. If any item fails, the entire group is retried together.

Offset Pagination Executor Best Practices

Understanding Parameter Interactions

The Offset Pagination Executor uses two key parameters:

  1. max_concurrent_executions_per_worker: Controls how many items are processed together in a single activity execution.
  2. max_concurrent_scheduled_tasks: Controls how many of these activity executions can run concurrently.

Example: If you have 1000 items with max_concurrent_executions_per_worker=10 and max_concurrent_scheduled_tasks=5, the system will:

  • Process items in groups of 10
  • Execute up to 5 groups concurrently
  • Process all items efficiently while maintaining manageable group sizes
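The arithmetic behind that example can be spelled out. "Waves" is an informal simplification here; in practice new groups are scheduled as earlier ones finish:

```python
import math

n_items = 1000
items_per_group = 10      # max_concurrent_executions_per_worker
concurrent_groups = 5     # max_concurrent_scheduled_tasks

n_groups = math.ceil(n_items / items_per_group)    # activity executions scheduled in total
n_waves = math.ceil(n_groups / concurrent_groups)  # sequential "waves" at full concurrency
print(n_groups, n_waves)  # 100 groups, at most 20 waves
```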

Retry Behavior Considerations

Important: All items processed together are wrapped in a single activity. This means:

  • Pros: Efficient processing, reduced overhead
  • Cons: If any single item in a group fails, the entire group is retried

Best Practices:

  • Choose max_concurrent_executions_per_worker values that balance efficiency with retry granularity
  • Smaller values: More fine-grained retries, but higher overhead
  • Larger values: More efficient, but larger retry scope
  • Consider item failure rates when choosing values

Performance Optimization Strategies

  1. For I/O-bound tasks (API calls, database queries):

    • Use larger groups (50-100 items)
    • Higher max_concurrent_scheduled_tasks (100-200)
    • Example: max_concurrent_executions_per_worker=50, max_concurrent_scheduled_tasks=150
  2. For CPU-bound tasks:

    • Use smaller groups (5-20 items)
    • Lower max_concurrent_scheduled_tasks (20-50)
    • Example: max_concurrent_executions_per_worker=10, max_concurrent_scheduled_tasks=30
  3. For mixed workloads:

    • Medium groups (20-50 items)
    • Balanced concurrency (50-100)
    • Example: max_concurrent_executions_per_worker=25, max_concurrent_scheduled_tasks=75

Memory and Resource Considerations

  • Group size impacts memory usage: Larger groups consume more memory per activity
  • Concurrency impacts resource contention: Higher concurrency may lead to resource exhaustion
  • Size limits: Remember the 2MB input/output limit per activity
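Before tuning group sizes upward, it can help to estimate how large a serialized group would be relative to the 2MB limit. This is a hypothetical helper using JSON size as a rough proxy, not the framework's actual serialization:

```python
import json

ACTIVITY_PAYLOAD_LIMIT = 2 * 1024 * 1024  # the 2MB per-activity limit noted above


def estimated_group_payload(items: list) -> int:
    # Rough estimate: bytes of the JSON-serialized group a single activity
    # execution would receive as input.
    return len(json.dumps(items).encode("utf-8"))


group = [{"item_id": i, "value": "x" * 100} for i in range(100)]
print(estimated_group_payload(group))  # well under ACTIVITY_PAYLOAD_LIMIT
```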

Advanced Features

Error Handling

The concurrency framework automatically handles:

  • Activity failures with retry mechanisms
  • Type validation errors
  • Workflow continuation for large datasets
  • Progress tracking and state management

Performance Optimization

Concurrency Limits

Adjust concurrency based on your workload:

# For I/O-bound tasks (API calls, database queries)
results = await workflows.execute_activities_in_parallel(
    activity=api_call_activity,
    items=items,
    max_concurrent_scheduled_tasks=200  # Higher concurrency for I/O-bound
)

# For CPU-bound tasks
results = await workflows.execute_activities_in_parallel(
    activity=cpu_intensive_activity,
    items=items,
    max_concurrent_scheduled_tasks=50  # Lower concurrency for CPU-bound
)

Batch Processing

For very large datasets, the framework automatically handles continue-as-new:

# Process 100,000 items with List Executor - automatically continues as new workflow
results = await workflows.execute_activities_in_parallel(
    activity=process_item,
    items=large_item_list,  # 100,000 items
    max_concurrent_scheduled_tasks=100
)

# Process 100,000 items with Offset Pagination Executor
results = await workflows.execute_activities_in_parallel(
    activity=process_item,
    get_item_from_index_activity=get_item_by_index,
    n_items=100000,
    max_concurrent_executions_per_worker=100,  # Process 100 items together per activity
    max_concurrent_scheduled_tasks=50  # Execute 50 activities concurrently
)

Real-World Examples

Document Processing Pipeline

@workflows.activity()
async def extract_text(document_id: str, content: str, metadata: dict) -> dict:
    # Use OCR or text extraction
    extracted_text = await ocr_service.extract(content)
    return {
        "document_id": document_id,
        "extracted_text": extracted_text,
        "analysis_results": {}
    }

# Process all documents in parallel
documents = await fetch_documents_from_storage()
results = await workflows.execute_activities_in_parallel(
    activity=extract_text,
    items=documents
)

Batch Data Transformation

@workflows.activity()
async def enrich_user_data(user_id: str, name: str, email: str) -> dict:
    # Fetch additional data from external services
    profile_score = await analytics_service.calculate_score(user_id)
    recommendations = await recommendation_service.get_recommendations(user_id)

    return {
        "user_id": user_id,
        "name": name,
        "email": email,
        "profile_score": profile_score,
        "recommendations": recommendations
    }

# Enrich all users in parallel
users = await database.get_all_users()
enriched_users = await workflows.execute_activities_in_parallel(
    activity=enrich_user_data,
    items=users,
    max_concurrent_scheduled_tasks=50
)

Multi-Service Orchestration

@workflows.activity()
async def process_order(order_id: str, customer_id: str, items: list[str]) -> dict:
    # Call multiple services in parallel
    payment_result = await payment_service.process(order_id)
    inventory_result = await inventory_service.reserve(items)
    shipping_result = await shipping_service.schedule(order_id)

    return {
        "order_id": order_id,
        "status": "processed",
        "fulfillment_details": {
            "payment": payment_result,
            "inventory": inventory_result,
            "shipping": shipping_result
        }
    }

# Process all orders in parallel
orders = await fetch_pending_orders()
results = await workflows.execute_activities_in_parallel(
    activity=process_order,
    items=orders
)

Performance Considerations

Memory Usage

  • Large datasets are processed efficiently to avoid memory issues
  • The continue-as-new mechanism ensures workflow state remains manageable
  • Each activity execution is isolated with its own memory footprint

Network I/O Optimization

  • Activities can be executed on workers close to data sources
  • Use sticky_to_worker=True for activities that benefit from locality
  • Configure appropriate concurrency limits based on network bandwidth

Error Recovery

  • Failed activities are automatically retried according to their retry policy
  • The framework maintains progress state, so only failed items need reprocessing

Monitoring and Alerting

  • Implement custom event recording with workflows.record_event()
  • Set up alerts for long-running or failed executions

Troubleshooting

Common Issues

Issue: ValueError: 'activity' must be an activity, please decorate it with @workflows.activity

Solution: Ensure your activity function is decorated with @workflows.activity()

Issue: ValueError: Must specify one execution pattern

Solution: Provide exactly one of: items, get_item_from_prev_item_activity, or get_item_from_index_activity

Issue: Excessive retries with Offset Pagination Executor

Solution: If you're experiencing excessive retries with the Offset Pagination Executor, consider:

  • Reducing max_concurrent_executions_per_worker to create smaller groups of items
  • Investigating individual item failures that might be causing entire groups to retry
  • Adding better error handling within your activity to prevent failures
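One way to add that error handling is to catch per-item exceptions inside the activity and return failures as data, so a single bad item can't force the whole group to retry. A minimal sketch with plain asyncio (the validation logic is illustrative):

```python
import asyncio


async def process_item_safely(item: dict) -> dict:
    # Catch per-item errors and return them as data instead of raising,
    # so one bad item doesn't trigger a retry of its entire group.
    try:
        if not item.get("value"):
            raise ValueError("missing value")
        return {"item_id": item["item_id"], "processed_value": f"processed_{item['value']}"}
    except Exception as exc:
        return {"item_id": item["item_id"], "error": str(exc)}


async def main() -> list[dict]:
    items = [{"item_id": 0, "value": "a"}, {"item_id": 1, "value": ""}]
    return await asyncio.gather(*[process_item_safely(i) for i in items])


results = asyncio.run(main())
print(results)
```

Failed items can then be collected from the results and reprocessed separately, at the cost of the framework no longer seeing them as failures.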

Issue: Memory issues with large groups

Solution: If you encounter memory issues:

  • Reduce max_concurrent_executions_per_worker to process fewer items together
  • Ensure individual items are not too large (remember the 2MB limit)
  • Monitor memory usage and adjust group sizes accordingly

Debugging Tips

  1. Check Activity Signatures: Ensure all activities have proper type annotations
  2. Validate Concurrency Limits: Start with lower concurrency and increase gradually
  3. Use Logging: Add detailed logging in your activities for debugging
  4. Offset Pagination Debugging: For Offset Pagination Executor issues:
    • Start with small values (e.g., max_concurrent_executions_per_worker=1)
    • Check for individual item failures that might affect entire groups

API Reference

execute_activities_in_parallel() Function

async def execute_activities_in_parallel(
    activity: Callable[[T], Awaitable[U]],
    *,
    # List Executor
    items: List[T] | None = None,
    max_concurrent_scheduled_tasks: int = DEFAULT_MAX_CONCURRENT_SCHEDULED_TASKS,

    # Chain Executor
    get_item_from_prev_item_activity: Callable[[T | None], Awaitable[T | None]] | None = None,

    # Offset Pagination Executor
    get_item_from_index_activity: Callable[[GetItemFromIndexParams], Awaitable[T]] | None = None,
    n_items: int | None = None,
    max_concurrent_executions_per_worker: int = DEFAULT_MAX_CONCURRENT_EXECUTIONS_PER_WORKER,

    # Common
    extra_params: Dict[str, Any] | None = None,
) -> None | List[U]

Parameters:

  • activity: The activity function to execute on each item
  • items: List of items to process (List Executor)
  • get_item_from_prev_item_activity: Function to get next item from previous (Chain Executor)
  • get_item_from_index_activity: Function to get item by index (Offset Pagination Executor)
  • n_items: Total number of items (Offset Pagination Executor)
  • max_concurrent_scheduled_tasks: Maximum number of concurrent activity executions that can be scheduled simultaneously. Applies to List Executor and Offset Pagination Executor only.
  • max_concurrent_executions_per_worker: Only for Offset Pagination Executor - Controls how many items are processed together in a single activity execution.
  • extra_params: Extra parameters to pass to activities

Parameter Usage by Executor:

Executor | max_concurrent_scheduled_tasks | max_concurrent_executions_per_worker
List Executor | ✅ Yes | ❌ No
Chain Executor | ❌ No | ❌ No
Offset Pagination Executor | ✅ Yes | ✅ Yes

Returns:

  • None if the activity returns None
  • List[U]: the list of activity results otherwise

Raises:

  • ValueError: If activity is not properly decorated or parameters are invalid