Concurrency patterns: scale your workflows
Process thousands of items efficiently with Mistral Workflows' parallel execution patterns
Overview
Small batches: For fewer than ~10,000 activities, asyncio.gather works well and keeps your code simple. Use the concurrency framework below when you need automatic continue-as-new, progress tracking, or are processing larger datasets that could exceed the 51,200-event history limit.
For small batches, asyncio.gather is the simplest approach:
```python
import asyncio

import mistralai.workflows as workflows
from pydantic import BaseModel


@workflows.activity()
async def process_item(item: str) -> dict:
    return {"item": item, "result": f"processed:{item}"}


class Input(BaseModel):
    items: list[str]


@workflows.workflow.define(name="concurrency_workflow")
class ConcurrencyWorkflow:
    @workflows.workflow.entrypoint
    async def run(self, params: Input) -> list[dict]:
        results = await asyncio.gather(*[process_item(item) for item in params.items])
        return list(results)


async def main():
    result = await workflows.execute_workflow(
        ConcurrencyWorkflow,
        params=Input(items=["alpha", "beta", "gamma", "delta"]),
    )
    for r in result:
        print(r)
```

Mistral Workflows provides a concurrency framework that enables you to execute activities in parallel across three distinct patterns:
- List Executor: Process a known collection of items
- Chain Executor: Process items sequentially from a stream/queue (token-based pagination)
- Offset Pagination Executor: Process items by fetching pages/chunks by index
All patterns provide automatic fault tolerance, progress tracking, and scalability for large datasets.
Key Benefits
- Massive Parallelism: Execute thousands of activities concurrently
- Automatic Continue-As-New: Handle large datasets without hitting execution history limits (51,200 events)
- Type Safety: Comprehensive type validation for all inputs and outputs
- Fault Tolerance: Built-in error handling and retry mechanisms
- Progress Tracking: Monitor execution progress through built-in observability features
List Executor Pattern
Use Case
Process a known collection of items where you have all items upfront.
When to Use
- Database query results
- File contents or uploaded documents
- API responses that return all items at once
- Batch processing of users, files, or records
Code Pattern
```python
import mistralai.workflows as workflows


@workflows.activity()
async def process_item(item_id: int, value: str) -> dict:
    # Process an individual item
    return {"processed_value": f"processed_{value}"}


# ... (inside a workflow)
# Execute in parallel
items = [{"item_id": i, "value": f"item_{i}"} for i in range(1000)]
results = await workflows.execute_activities_in_parallel(
    activity=process_item,
    items=items,
    max_concurrent_scheduled_tasks=100,  # Optional: limit concurrency
)
# ...
```

Configuration Options
| Parameter | Description | Default |
|---|---|---|
| `max_concurrent_scheduled_tasks` | Maximum number of concurrent activity executions that can be scheduled simultaneously. This limits how many activities are waiting to be executed at once. | 100 |
| `extra_params` | Additional parameters to pass to the activity | None |
Note: The List Executor doesn't use the `max_concurrent_executions_per_worker` parameter; it is only relevant for the Offset Pagination Executor.
Chain Executor Pattern
Use Case
Process items sequentially from a stream/queue using token-based pagination.
When to Use
- AWS S3 ListObjects (uses ContinuationToken)
- DynamoDB Scan/Query (uses LastEvaluatedKey)
- Azure Blob Storage (uses marker)
- Any API that uses continuation tokens instead of page numbers
Code Pattern
```python
import mistralai.workflows as workflows


@workflows.activity()
async def process_item(item_id: int, value: str) -> dict:
    # Process an individual item
    return {"processed_value": f"processed_{value}"}


@workflows.activity()
async def get_next_item(prev_item: dict | None) -> dict | None:
    # Derive the next item from the previous one
    if prev_item is None:
        # First item
        return {"item_id": 0, "value": "item_0"}
    next_id = prev_item["item_id"] + 1
    if next_id >= 1000:  # Stop condition
        return None
    return {"item_id": next_id, "value": f"item_{next_id}"}


# ... (inside a workflow)
# Execute chain
results = await workflows.execute_activities_in_parallel(
    activity=process_item,
    get_item_from_prev_item_activity=get_next_item,
)
# ...
```

Note: The Chain Executor doesn't use the `max_concurrent_scheduled_tasks` or `max_concurrent_executions_per_worker` parameters.
How It Works
The Chain Executor separates item discovery (sequential) from item processing (parallel):
- Item fetching is chained: The executor calls `get_item_from_prev_item_activity` sequentially, first with `None` to get the first item, then with each result to get the next item, until the function returns `None`
- Processing is parallelized: As soon as an item is fetched, it's immediately dispatched for processing via the `activity` function. Items don't wait for each other to finish processing
This means fetching item N+1 depends on item N's result, but processing item N+1 runs concurrently with processing items 1 through N.
Offset Pagination Executor Pattern
Use Case
Process items by fetching pages/chunks using index-based pagination.
When to Use
- Traditional REST APIs with page numbers
- SQL database chunking with OFFSET/LIMIT
- File processing by byte offset
- Any index-based data fetching
Code Pattern
```python
import mistralai.workflows as workflows


@workflows.activity()
async def process_item(item_id: int, value: str) -> dict:
    # Process an individual item
    return {"processed_value": f"processed_{value}"}


@workflows.activity()
async def get_item_by_index(params: workflows.GetItemFromIndexParams) -> dict:
    # Fetch an item by its index
    return {
        "item_id": params.idx,
        "value": f"item_{params.idx}",
        "extra_data": params.extra_params,
    }


# Execute with offset pagination
results = await workflows.execute_activities_in_parallel(
    activity=process_item,
    get_item_from_index_activity=get_item_by_index,
    n_items=1000,  # Total number of items
    max_concurrent_executions_per_worker=50,  # Optional: how many items are processed together
    max_concurrent_scheduled_tasks=100,  # Optional: limit concurrent activity executions
    extra_params={"batch_id": "daily_processing"},  # Optional: extra parameters
)
```

Configuration Options
| Parameter | Description | Default |
|---|---|---|
| `n_items` | Total number of items to process | Required |
| `max_concurrent_scheduled_tasks` | Maximum number of concurrent activity executions that can be scheduled simultaneously. This limits how many activities are waiting to be executed at once. | 100 |
| `max_concurrent_executions_per_worker` | Controls how many items are processed together in a single activity execution. Only used by the Offset Pagination Executor. | 100 |
| `extra_params` | Additional parameters to pass to the activity | None |
Note: Items processed together are wrapped in a single activity. If any item fails, the entire group is retried together.
Offset Pagination Executor Best Practices
Understanding Parameter Interactions
The Offset Pagination Executor uses two key parameters:
- `max_concurrent_executions_per_worker`: Controls how many items are processed together in a single activity execution.
- `max_concurrent_scheduled_tasks`: Controls how many of these activity executions can run concurrently.
Example: If you have 1000 items with `max_concurrent_executions_per_worker=10` and `max_concurrent_scheduled_tasks=5`, the system will:
- Process items in groups of 10
- Execute up to 5 groups concurrently
- Process all items efficiently while maintaining manageable group sizes
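The grouping arithmetic from the example above can be sketched in plain Python, independent of the framework. The "wave" count is a simplification: in practice a new group can be scheduled as soon as any running group finishes, so this is an upper-bound mental model rather than the exact schedule.

```python
import math

n_items = 1000
group_size = 10     # max_concurrent_executions_per_worker
max_concurrent = 5  # max_concurrent_scheduled_tasks

# Items are split into fixed-size groups, each handled by one activity execution
n_groups = math.ceil(n_items / group_size)

# At most `max_concurrent` groups run at once, so work proceeds in "waves"
n_waves = math.ceil(n_groups / max_concurrent)

print(n_groups, n_waves)  # 100 groups, at most 20 waves of up to 5 groups
```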
Retry Behavior Considerations
Important: All items processed together are wrapped in a single activity. This means:
- Pros: Efficient processing, reduced overhead
- Cons: If any single item in a group fails, the entire group is retried
Best Practices:
- Choose `max_concurrent_executions_per_worker` values that balance efficiency with retry granularity
- Smaller values: More fine-grained retries, but higher overhead
- Larger values: More efficient, but larger retry scope
- Consider item failure rates when choosing values
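One way to keep a single bad item from failing, and therefore retrying, an entire group is to catch per-item errors inside the activity body and report them in the result instead of raising. A minimal sketch, with the `@workflows.activity()` decorator omitted so the function stands alone; the "empty item" failure is a stand-in for whatever can go wrong in your real per-item work:

```python
async def process_group(items: list[str]) -> list[dict]:
    """Process a group of items, capturing per-item failures instead of raising."""
    results = []
    for item in items:
        try:
            if not item:  # stand-in for real per-item work that may fail
                raise ValueError("empty item")
            results.append({"item": item, "status": "ok", "value": f"processed_{item}"})
        except Exception as exc:
            # A single failure no longer fails the whole activity (and group)
            results.append({"item": item, "status": "error", "error": str(exc)})
    return results
```

Failed items can then be collected from the results and retried selectively, rather than re-running the whole group through the framework's retry policy.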
Performance Optimization Strategies
- For I/O-bound tasks (API calls, database queries):
  - Use larger groups (50-100 items)
  - Higher `max_concurrent_scheduled_tasks` (100-200)
  - Example: `max_concurrent_executions_per_worker=50, max_concurrent_scheduled_tasks=150`
- For CPU-bound tasks:
  - Use smaller groups (5-20 items)
  - Lower `max_concurrent_scheduled_tasks` (20-50)
  - Example: `max_concurrent_executions_per_worker=10, max_concurrent_scheduled_tasks=30`
- For mixed workloads:
  - Medium groups (20-50 items)
  - Balanced concurrency (50-100)
  - Example: `max_concurrent_executions_per_worker=25, max_concurrent_scheduled_tasks=75`
Memory and Resource Considerations
- Group size impacts memory usage: Larger groups consume more memory per activity
- Concurrency impacts resource contention: Higher concurrency may lead to resource exhaustion
- Size limits: Remember the 2MB input/output limit per activity
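Where payload size is a concern, a rough pre-flight check on the serialized size of a result can catch oversized items before they hit the limit. A minimal sketch; the 2MB figure comes from the limit above, and measuring via JSON serialization is an assumption about how your own payloads are encoded:

```python
import json

MAX_PAYLOAD_BYTES = 2 * 1024 * 1024  # 2MB input/output limit per activity


def check_payload_size(payload: dict) -> int:
    """Return the serialized size in bytes, raising if it exceeds the limit."""
    size = len(json.dumps(payload).encode("utf-8"))
    if size > MAX_PAYLOAD_BYTES:
        raise ValueError(f"payload is {size} bytes, exceeds {MAX_PAYLOAD_BYTES}")
    return size


print(check_payload_size({"item_id": 1, "value": "item_1"}))  # small payload passes
```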
Advanced Features
Error Handling
The concurrency framework automatically handles:
- Activity failures with retry mechanisms
- Type validation errors
- Workflow continuation for large datasets
- Progress tracking and state management
Performance Optimization
Concurrency Limits
Adjust concurrency based on your workload:
```python
# For I/O-bound tasks (API calls, database queries)
results = await workflows.execute_activities_in_parallel(
    activity=api_call_activity,
    items=items,
    max_concurrent_scheduled_tasks=200,  # Higher concurrency for I/O-bound work
)

# For CPU-bound tasks
results = await workflows.execute_activities_in_parallel(
    activity=cpu_intensive_activity,
    items=items,
    max_concurrent_scheduled_tasks=50,  # Lower concurrency for CPU-bound work
)
```

Batch Processing
For very large datasets, the framework automatically handles continue-as-new:
```python
# Process 100,000 items with the List Executor; the framework automatically
# continues as a new workflow before the event-history limit is reached
results = await workflows.execute_activities_in_parallel(
    activity=process_item,
    items=large_item_list,  # 100,000 items
    max_concurrent_scheduled_tasks=100,
)

# Process 100,000 items with the Offset Pagination Executor
results = await workflows.execute_activities_in_parallel(
    activity=process_item,
    get_item_from_index_activity=get_item_by_index,
    n_items=100000,
    max_concurrent_executions_per_worker=100,  # Process 100 items together per activity
    max_concurrent_scheduled_tasks=50,  # Execute 50 activities concurrently
)
```

Real-World Examples
Document Processing Pipeline
```python
@workflows.activity()
async def extract_text(document_id: str, content: str, metadata: dict) -> dict:
    # Use OCR or text extraction
    extracted_text = await ocr_service.extract(content)
    return {
        "document_id": document_id,
        "extracted_text": extracted_text,
        "analysis_results": {},
    }


# Process all documents in parallel
documents = await fetch_documents_from_storage()
results = await workflows.execute_activities_in_parallel(
    activity=extract_text,
    items=documents,
)
```

Batch Data Transformation
```python
@workflows.activity()
async def enrich_user_data(user_id: str, name: str, email: str) -> dict:
    # Fetch additional data from external services
    profile_score = await analytics_service.calculate_score(user_id)
    recommendations = await recommendation_service.get_recommendations(user_id)
    return {
        "user_id": user_id,
        "name": name,
        "email": email,
        "profile_score": profile_score,
        "recommendations": recommendations,
    }


# Enrich all users in parallel
users = await database.get_all_users()
enriched_users = await workflows.execute_activities_in_parallel(
    activity=enrich_user_data,
    items=users,
    max_concurrent_scheduled_tasks=50,
)
```

Multi-Service Orchestration
```python
import asyncio


@workflows.activity()
async def process_order(order_id: str, customer_id: str, items: list[str]) -> dict:
    # Call multiple services concurrently
    payment_result, inventory_result, shipping_result = await asyncio.gather(
        payment_service.process(order_id),
        inventory_service.reserve(items),
        shipping_service.schedule(order_id),
    )
    return {
        "order_id": order_id,
        "status": "processed",
        "fulfillment_details": {
            "payment": payment_result,
            "inventory": inventory_result,
            "shipping": shipping_result,
        },
    }


# Process all orders in parallel
orders = await fetch_pending_orders()
results = await workflows.execute_activities_in_parallel(
    activity=process_order,
    items=orders,
)
```

Performance Considerations
Memory Usage
- Large datasets are processed efficiently to avoid memory issues
- The continue-as-new mechanism ensures workflow state remains manageable
- Each activity execution is isolated with its own memory footprint
Network I/O Optimization
- Activities can be executed on workers close to data sources
- Use `sticky_to_worker=True` for activities that benefit from locality
- Configure appropriate concurrency limits based on network bandwidth
Error Recovery
- Failed activities are automatically retried according to their retry policy
- The framework maintains progress state, so only failed items need reprocessing
Monitoring and Alerting
- Implement custom event recording with `workflows.record_event()`
- Set up alerts for long-running or failed executions
Troubleshooting
Common Issues
Issue: `ValueError: 'activity' must be an activity, please decorate it with @workflows.activity`
Solution: Ensure your activity function is decorated with `@workflows.activity()`
Issue: `ValueError: Must specify one execution pattern`
Solution: Provide exactly one of `items`, `get_item_from_prev_item_activity`, or `get_item_from_index_activity`
Issue: Excessive retries with Offset Pagination Executor
Solution: Consider the following:
- Reducing `max_concurrent_executions_per_worker` to create smaller groups of items
- Investigating individual item failures that might be causing entire groups to retry
- Adding better error handling within your activity to prevent failures
Issue: Memory issues with large groups
Solution: If you encounter memory issues:
- Reduce `max_concurrent_executions_per_worker` to process fewer items together
- Ensure individual items are not too large (remember the 2MB limit)
- Monitor memory usage and adjust group sizes accordingly
Debugging Tips
- Check Activity Signatures: Ensure all activities have proper type annotations
- Validate Concurrency Limits: Start with lower concurrency and increase gradually
- Use Logging: Add detailed logging in your activities for debugging
- Offset Pagination Debugging: For Offset Pagination Executor issues:
  - Start with small values (e.g., `max_concurrent_executions_per_worker=1`)
  - Check for individual item failures that might affect entire groups
API Reference
execute_activities_in_parallel() Function
```python
async def execute_activities_in_parallel(
    activity: Callable[[T], Awaitable[U]],
    *,
    # List Executor
    items: List[T] | None = None,
    max_concurrent_scheduled_tasks: int = DEFAULT_MAX_CONCURRENT_SCHEDULED_TASKS,
    # Chain Executor
    get_item_from_prev_item_activity: Callable[[T | None], Awaitable[T | None]] | None = None,
    # Offset Pagination Executor
    get_item_from_index_activity: Callable[[GetItemFromIndexParams], Awaitable[T]] | None = None,
    n_items: int | None = None,
    max_concurrent_executions_per_worker: int = DEFAULT_MAX_CONCURRENT_EXECUTIONS_PER_WORKER,
    # Common
    extra_params: Dict[str, Any] | None = None,
) -> None | List[U]: ...
```

Parameters:
- `activity`: The activity function to execute on each item
- `items`: List of items to process (List Executor)
- `get_item_from_prev_item_activity`: Function to get the next item from the previous one (Chain Executor)
- `get_item_from_index_activity`: Function to get an item by index (Offset Pagination Executor)
- `n_items`: Total number of items (Offset Pagination Executor)
- `max_concurrent_scheduled_tasks`: Maximum number of concurrent activity executions that can be scheduled simultaneously. Applies to the List Executor and Offset Pagination Executor only.
- `max_concurrent_executions_per_worker`: Offset Pagination Executor only; controls how many items are processed together in a single activity execution.
- `extra_params`: Extra parameters to pass to activities
Parameter Usage by Executor:
| Executor | max_concurrent_scheduled_tasks | max_concurrent_executions_per_worker |
|---|---|---|
| List Executor | ✅ Yes | ❌ No |
| Chain Executor | ❌ No | ❌ No |
| Offset Pagination Executor | ✅ Yes | ✅ Yes |
Returns:
- `None` if the activity returns `None`
- `List[U]`: a list of the activity results
Raises:
ValueError: If activity is not properly decorated or parameters are invalid