Modèles de concurrence : faites évoluer vos workflows
Traitez efficacement des milliers d'éléments grâce aux modèles d'exécution parallèle de Mistral Workflows
Vue d'ensemble
Petits lots : Pour moins de ~10 000 activités, asyncio.gather fonctionne bien et simplifie votre code. Utilisez le framework de concurrence ci-dessous lorsque vous avez besoin d'un continue-as-new automatique, d'un suivi de progression, ou que vous traitez des ensembles de données plus volumineux susceptibles de dépasser la limite de 51 200 événements d'historique.
Pour les petits lots, asyncio.gather est l'approche la plus simple :
import asyncio
import mistralai.workflows as workflows
from pydantic import BaseModel
@workflows.activity()
async def process_item(item: str) -> dict:
return {"item": item, "result": f"processed:{item}"}
class Input(BaseModel):
items: list[str]
@workflows.workflow.define(name="concurrency_workflow")
class ConcurrencyWorkflow:
@workflows.workflow.entrypoint
async def run(self, params: Input) -> list[dict]:
results = await asyncio.gather(*[process_item(item) for item in params.items])
return list(results)
async def main():
result = await workflows.execute_workflow(
ConcurrencyWorkflow,
params=Input(items=["alpha", "beta", "gamma", "delta"]),
)
for r in result:
print(r)Mistral Workflows fournit un framework de concurrence qui vous permet d'exécuter des activités en parallèle selon trois modèles distincts :
- List Executor : traiter une collection connue d'éléments
- Chain Executor : traiter des éléments de manière séquentielle depuis un flux/une file (pagination par jeton)
- Offset Pagination Executor : traiter des éléments en récupérant des pages/blocs par index
Tous les modèles offrent une tolérance aux pannes automatique, un suivi de progression et une scalabilité pour les grands ensembles de données.
Avantages clés
- Parallélisme massif : exécutez des milliers d'activités en simultané
- Continue-as-new automatique : gérez de grands ensembles de données sans atteindre les limites d'historique d'exécution (51 200 événements)
- Sécurité de type : validation complète de tous les inputs et outputs
- Tolérance aux pannes : mécanismes intégrés de gestion d'erreurs et de retry
- Suivi de progression : surveillez la progression d'exécution grâce aux fonctionnalités d'observabilité intégrées
Modèle List Executor
Cas d'usage
Traiter une collection connue d'éléments dont vous disposez tous dès le départ.
Quand l'utiliser
- Résultats de requêtes de base de données
- Contenus de fichiers ou documents téléchargés
- Réponses d'API qui retournent tous les éléments d'un coup
- batch d'utilisateurs, fichiers ou enregistrements
Modèle de code
import mistralai.workflows as workflows
@workflows.activity()
async def process_item(item_id: int, value: str) -> dict:
# Process individual item
return {"processed_value": f"processed_{value}"}
# ... (inside a workflow)
# Execute in parallel
items = [{"item_id": i, "value": f"item_{i}"} for i in range(1000)]
results = await workflows.execute_activities_in_parallel(
activity=process_item,
items=items,
max_concurrent_scheduled_tasks=100 # Optional: limit concurrency
)
# ...Options de configuration
| Paramètre | Description | Par défaut |
|---|---|---|
max_concurrent_scheduled_tasks | Nombre maximum d'exécutions d'activités concurrentes pouvant être planifiées simultanément. Limite le nombre d'activités en attente d'exécution à un instant donné. | 100 |
extra_params | Paramètres supplémentaires à transmettre à l'activité | None |
Remarque : List Executor n'utilise pas le paramètre max_concurrent_executions_per_worker. Ce paramètre n'est pertinent que pour Offset Pagination Executor.
Modèle Chain Executor
Cas d'usage
Traiter des éléments de manière séquentielle depuis un flux/une file au moyen d'une pagination par jeton.
Quand l'utiliser
- AWS S3 ListObjects (utilise ContinuationToken)
- DynamoDB Scan/Query (utilise LastEvaluatedKey)
- Azure Blob Storage (utilise marker)
- Toute API utilisant des jetons de continuation plutôt que des numéros de page
Modèle de code
import mistralai.workflows as workflows
@workflows.activity()
async def process_item(item_id: int, value: str) -> dict:
# Process individual item
return {"processed_value": f"processed_{value}"}
@workflows.activity()
async def get_next_item(prev_item: dict | None) -> dict | None:
# Get next item from previous item
if prev_item is None:
# First item
return {"item_id": 0, "value": "item_0"}
next_id = prev_item["item_id"] + 1
if next_id >= 1000: # Stop condition
return None
return {"item_id": next_id, "value": f"item_{next_id}"}
# ... (inside a workflow)
# Execute chain
results = await workflows.execute_activities_in_parallel(
activity=process_item,
get_item_from_prev_item_activity=get_next_item
)
# ...Remarque : Chain Executor n'utilise pas les paramètres max_concurrent_scheduled_tasks ou max_concurrent_executions_per_worker.
Comment ça fonctionne
Chain Executor sépare la découverte d'éléments (séquentielle) du traitement d'éléments (parallèle) :
- La récupération d'éléments est chaînée : l'executor appelle
get_item_from_prev_item_activityde manière séquentielle — d'abord avecNonepour obtenir le premier élément, puis avec chaque résultat pour obtenir l'élément suivant, jusqu'à ce que la fonction retourneNone - Le traitement est parallélisé : dès qu'un élément est récupéré, il est immédiatement envoyé pour traitement via la fonction
activity. Les éléments n'attendent pas que les autres terminent leur traitement
Cela signifie que la récupération de l'élément N+1 dépend du résultat de l'élément N, mais le traitement de l'élément N+1 s'exécute en parallèle avec le traitement des éléments 1 à N.
Modèle Offset Pagination Executor
Cas d'usage
Traiter des éléments en récupérant des pages/blocs au moyen d'une pagination par index.
Quand l'utiliser
- API REST traditionnelles avec numéros de page
- Découpage de base de données SQL avec OFFSET/LIMIT
- Traitement de fichiers par décalage d'octets
- Toute récupération de données basée sur un index
Modèle de code
import mistralai.workflows as workflows
@workflows.activity()
async def process_item(item_id: int, value: str) -> dict:
# Process individual item
return {"processed_value": f"processed_{value}"}
@workflows.activity()
async def get_item_by_index(params: workflows.GetItemFromIndexParams) -> dict:
# Get item by index
return {
"item_id": params.idx,
"value": f"item_{params.idx}",
"extra_data": params.extra_params
}
# Execute with offset pagination
results = await workflows.execute_activities_in_parallel(
activity=process_item,
get_item_from_index_activity=get_item_by_index,
n_items=1000, # Total number of items
max_concurrent_executions_per_worker=50, # Optional: controls how many items are processed together
max_concurrent_scheduled_tasks=100, # Optional: limit concurrent activity executions
extra_params={"batch_id": "daily_processing"} # Optional: extra parameters
)Options de configuration
| Paramètre | Description | Par défaut |
|---|---|---|
n_items | Nombre total d'éléments à traiter | Obligatoire |
max_concurrent_scheduled_tasks | Nombre maximum d'exécutions d'activités concurrentes pouvant être planifiées simultanément. Limite le nombre d'activités en attente d'exécution à un instant donné. | 100 |
max_concurrent_executions_per_worker | Contrôle le nombre d'éléments traités ensemble dans une seule exécution d'activité. Utilisé uniquement par Offset Pagination Executor. | 100 |
extra_params | Paramètres supplémentaires à transmettre à l'activité | None |
Remarque : les éléments traités ensemble sont encapsulés dans une seule activité. Si un élément échoue, le groupe entier est rejoué ensemble.
Bonnes pratiques pour Offset Pagination Executor
Comprendre les interactions entre paramètres
Offset Pagination Executor utilise deux paramètres clés :
max_concurrent_executions_per_worker: contrôle le nombre d'éléments traités ensemble dans une seule exécution d'activité.max_concurrent_scheduled_tasks: contrôle le nombre de ces exécutions d'activités pouvant s'exécuter en parallèle.
Exemple : si vous avez 1000 éléments avec max_concurrent_executions_per_worker=10 et max_concurrent_scheduled_tasks=5, le système va :
- Traiter les éléments par groupes de 10
- Exécuter jusqu'à 5 groupes en parallèle
- Traiter tous les éléments de manière efficace tout en maintenant des tailles de groupe gérables
Considérations sur le comportement de retry
Important : tous les éléments traités ensemble sont encapsulés dans une seule activité. Cela signifie :
- Avantages : traitement efficace, surcharge réduite
- Inconvénients : si un seul élément d'un groupe échoue, le groupe entier est rejoué
Bonnes pratiques :
- Choisissez des valeurs
max_concurrent_executions_per_workerqui équilibrent efficacité et granularité de retry - Valeurs plus petites : retries plus granulaires, mais surcharge plus élevée
- Valeurs plus grandes : plus efficaces, mais périmètre de retry plus large
- Tenez compte des taux d'échec des éléments lors du choix des valeurs
Stratégies d'optimisation des performances
-
Pour les tâches limitées par les I/O (appels d'API, requêtes de base de données) :
- Utilisez des groupes plus grands (50-100 éléments)
max_concurrent_scheduled_tasksplus élevé (100-200)- Exemple :
max_concurrent_executions_per_worker=50, max_concurrent_scheduled_tasks=150
-
Pour les tâches limitées par le CPU :
- Utilisez des groupes plus petits (5-20 éléments)
max_concurrent_scheduled_tasksplus bas (20-50)- Exemple :
max_concurrent_executions_per_worker=10, max_concurrent_scheduled_tasks=30
-
Pour les charges mixtes :
- Groupes moyens (20-50 éléments)
- Concurrence équilibrée (50-100)
- Exemple :
max_concurrent_executions_per_worker=25, max_concurrent_scheduled_tasks=75
Considérations sur la mémoire et les ressources
- La taille des groupes impacte l'utilisation de la mémoire : des groupes plus grands consomment plus de mémoire par activité
- La concurrence impacte la contention des ressources : une concurrence plus élevée peut mener à l'épuisement des ressources
- Limites de taille : n'oubliez pas la limite de 2 Mo d'input/output par activité
Fonctionnalités avancées
Gestion d'erreurs
Le framework de concurrence gère automatiquement :
- Les échecs d'activités avec mécanismes de retry
- Les erreurs de validation de type
- La continuation du workflow pour les grands ensembles de données
- Le suivi de progression et la gestion d'état
Optimisation des performances
Limites de concurrence
Ajustez la concurrence en fonction de votre charge de travail :
# For I/O-bound tasks (API calls, database queries)
results = await workflows.execute_activities_in_parallel(
activity=api_call_activity,
items=items,
max_concurrent_scheduled_tasks=200 # Higher concurrency for I/O-bound
)
# For CPU-bound tasks
results = await workflows.execute_activities_in_parallel(
activity=cpu_intensive_activity,
items=items,
max_concurrent_scheduled_tasks=50 # Lower concurrency for CPU-bound
)Batch
Pour les très grands ensembles de données, le framework gère automatiquement le continue-as-new :
# Process 100,000 items with List Executor - automatically continues as new workflow
results = await workflows.execute_activities_in_parallel(
activity=process_item,
items=large_item_list, # 100,000 items
max_concurrent_scheduled_tasks=100
)
# Process 100,000 items with Offset Pagination Executor
results = await workflows.execute_activities_in_parallel(
activity=process_item,
get_item_from_index_activity=get_item_by_index,
n_items=100000,
max_concurrent_executions_per_worker=100, # Process 100 items together per activity
max_concurrent_scheduled_tasks=50 # Execute 50 activities concurrently
)Exemples concrets
Pipeline de traitement de documents
Extraire du texte depuis un ensemble de documents en parallèle au moyen d'un service OCR. Chaque document est traité comme une activité distincte, de sorte que les échecs sont isolés — une seule extraction échouée ne bloque pas le reste.
@workflows.activity()
async def extract_text(document_id: str, content: str, metadata: dict) -> dict:
# Use OCR or text extraction
extracted_text = await ocr_service.extract(content)
return {
"document_id": document_id,
"extracted_text": extracted_text,
"analysis_results": {}
}
# Process all documents in parallel
documents = await fetch_documents_from_storage()
results = await workflows.execute_activities_in_parallel(
activity=extract_text,
items=documents
)Transformation de données par lots
Enrichir des enregistrements utilisateur en récupérant des données depuis plusieurs services externes. Le paramètre max_concurrent_scheduled_tasks plafonne le nombre d'activités s'exécutant simultanément, vous évitant ainsi de surcharger les services en aval.
@workflows.activity()
async def enrich_user_data(user_id: str, name: str, email: str) -> dict:
# Fetch additional data from external services
profile_score = await analytics_service.calculate_score(user_id)
recommendations = await recommendation_service.get_recommendations(user_id)
return {
"user_id": user_id,
"name": name,
"email": email,
"profile_score": profile_score,
"recommendations": recommendations
}
# Enrich all users in parallel
users = await database.get_all_users()
enriched_users = await workflows.execute_activities_in_parallel(
activity=enrich_user_data,
items=users,
max_concurrent_scheduled_tasks=50
)Orchestration multi-services
Traiter les commandes en attente en coordonnant les services de paiement, d'inventaire et de livraison. Chaque commande est gérée comme sa propre activité, et au sein de chaque activité les appels de services s'exécutent en parallèle.
@workflows.activity()
async def process_order(order_id: str, customer_id: str, items: list[str]) -> dict:
# Call multiple services in parallel
payment_result = await payment_service.process(order_id)
inventory_result = await inventory_service.reserve(items)
shipping_result = await shipping_service.schedule(order_id)
return {
"order_id": order_id,
"status": "processed",
"fulfillment_details": {
"payment": payment_result,
"inventory": inventory_result,
"shipping": shipping_result
}
}
# Process all orders in parallel
orders = await fetch_pending_orders()
results = await workflows.execute_activities_in_parallel(
activity=process_order,
items=orders
)Considérations sur les performances
Utilisation de la mémoire
- Les grands ensembles de données sont traités efficacement pour éviter les problèmes de mémoire
- Le mécanisme de continue-as-new garantit que l'état du workflow reste gérable
- Chaque exécution d'activité est isolée avec sa propre empreinte mémoire
Optimisation des I/O réseau
- Les activités peuvent être exécutées sur des workers proches des sources de données
- Utilisez
sticky_to_worker=Truepour les activités qui bénéficient de la localité - Configurez des limites de concurrence appropriées en fonction de la bande passante réseau
Reprise après erreur
- Les activités échouées sont automatiquement rejouées selon leur stratégie de retry
- Le framework maintient l'état de progression, de sorte que seuls les éléments échoués nécessitent un retraitement
Surveillance et alertes
- Configurez des alertes pour les exécutions longues ou échouées
Dépannage
Problèmes courants
Problème : ValueError: 'activity' must be an activity, please decorate it with @workflows.activity
Solution : assurez-vous que votre fonction d'activité est décorée avec @workflows.activity()
Problème : ValueError: Must specify one execution pattern
Solution : fournissez exactement un seul de ces paramètres : items, get_item_from_prev_item_activity, ou get_item_from_index_activity
Problème : retries excessifs avec Offset Pagination Executor
Solution : si vous rencontrez des retries excessifs avec Offset Pagination Executor, envisagez de :
- Réduire
max_concurrent_executions_per_workerpour créer des groupes d'éléments plus petits - Enquêter sur les échecs d'éléments individuels qui pourraient provoquer le retry de groupes entiers
- Ajouter une meilleure gestion d'erreurs au sein de votre activité pour prévenir les échecs
Problème : problèmes de mémoire avec de grands groupes
Solution : si vous rencontrez des problèmes de mémoire :
- Réduisez
max_concurrent_executions_per_workerpour traiter moins d'éléments ensemble - Assurez-vous que les éléments individuels ne sont pas trop volumineux (n'oubliez pas la limite de 2 Mo)
- Surveillez l'utilisation de la mémoire et ajustez les tailles de groupe en conséquence
Conseils de débogage
- Vérifiez les signatures d'activité : assurez-vous que toutes les activités ont des annotations de type appropriées
- Validez les limites de concurrence : commencez avec une concurrence faible et augmentez progressivement
- Utilisez les logs : ajoutez des logs détaillés dans vos activités pour le débogage
- Débogage d'Offset Pagination : pour les problèmes liés à Offset Pagination Executor :
- Commencez avec de petites valeurs (par exemple,
max_concurrent_executions_per_worker=1) - Vérifiez les échecs d'éléments individuels qui pourraient affecter des groupes entiers
- Commencez avec de petites valeurs (par exemple,
Référence d'API
execute_activities_in_parallel()Fonction execute_activities_in_parallel()
async def execute_activities_in_parallel(
activity: Callable[[T], Awaitable[U]],
*,
# List Executor
items: List[T] | None = None,
max_concurrent_scheduled_tasks: int = DEFAULT_MAX_CONCURRENT_SCHEDULED_TASKS,
# Chain Executor
get_item_from_prev_item_activity: Callable[[T | None], Awaitable[T | None]] | None = None,
# Offset Pagination Executor
get_item_from_index_activity: Callable[[GetItemFromIndexParams], Awaitable[T]] | None = None,
n_items: int | None = None,
max_concurrent_executions_per_worker: int = DEFAULT_MAX_CONCURRENT_EXECUTIONS_PER_WORKER,
# Common
extra_params: Dict[str, Any] | None = None,
) -> None | List[U]Paramètres :
activity: fonction d'activité à exécuter sur chaque élémentitems: liste d'éléments à traiter (List Executor)get_item_from_prev_item_activity: fonction pour obtenir l'élément suivant à partir du précédent (Chain Executor)get_item_from_index_activity: fonction pour obtenir un élément par index (Offset Pagination Executor)n_items: nombre total d'éléments (Offset Pagination Executor)max_concurrent_scheduled_tasks: nombre maximum d'exécutions d'activités concurrentes pouvant être planifiées simultanément. S'applique à List Executor et Offset Pagination Executor uniquement.max_concurrent_executions_per_worker: Uniquement pour Offset Pagination Executor - contrôle le nombre d'éléments traités ensemble dans une seule exécution d'activité.extra_params: paramètres supplémentaires à transmettre aux activités
Utilisation des paramètres par executor :
| Executor | max_concurrent_scheduled_tasks | max_concurrent_executions_per_worker |
|---|---|---|
| List Executor | ✅ Oui | ❌ Non |
| Chain Executor | ❌ Non | ❌ Non |
| Offset Pagination Executor | ✅ Oui | ✅ Oui |
Retourne :
Nonesi l'activité retourne NoneList[U]liste des résultats d'activités
Lève :
ValueError: si l'activité n'est pas correctement décorée ou si les paramètres sont invalides