Consommer les événements en streaming
Lorsqu’un workflow est en cours d’exécution, il émet des événements : démarrage de l’exécution, activité terminée, tokens générés. Consommer ces événements signifie que votre code écoute ce flux et réagit à chaque événement en temps réel. Vous disposez de deux méthodes :
- Client SDK (recommandé) : modèles typés Pydantic, gestion automatique de la reconnexion, prise en charge du filtrage et de la reprise à partir d’une séquence précise. Utilisez-le dans votre code applicatif.
- Points de terminaison HTTP bruts : deux routes SSE —
/v1/workflows/executions/{execution_id}/stream(abonnement à une seule exécution) et/v1/workflows/events/stream(filtrage croisé sur plusieurs exécutions). Utilisez cette méthode si vous n’êtes pas sous Python ou si vous devez consommer directement le flux SSE brut. Consultez la section Routes API ci-dessous.
Utiliser le client API Workflows
Consommation basique
import os
from mistralai.client import Mistral
client = Mistral(api_key=os.environ["MISTRAL_API_KEY"])
# Start a workflow
execution = client.workflows.execute_workflow(
workflow_identifier="my-workflow",
input=params,
)
# Stream all events for this execution
async for event in client.workflows.events.get_stream_events(
workflow_exec_id=execution.execution_id,
):
print(f"[{event.stream}] {event.data}")Détecter la fin de l’exécution
Utilisez les types d’événements du SDK pour savoir quand un workflow s’achève :
async for event in client.workflows.events.get_stream_events(
workflow_exec_id=execution_id,
):
if event.data is None:
continue
event_type = event.data.event_type
if event_type == "WORKFLOW_EXECUTION_COMPLETED":
print("Done!")
break
elif event_type in ("WORKFLOW_EXECUTION_FAILED", "WORKFLOW_EXECUTION_CANCELED"):
print(f"Ended: {event_type}")
break
else:
print(f"Event: {event_type}")Reprise depuis une séquence
Si la connexion est interrompue, reprenez là où vous vous étiez arrêté à l’aide de start_seq. broker_sequence est garanti strictement croissant et jamais réutilisé — voir Garantie de la séquence pour plus de détails. En passant start_seq=N, la livraison reprend à partir de la séquence N incluse, donc avancez après la dernière séquence traitée afin d’éviter de recevoir à nouveau le même événement.
last_seq = 0
while True:
try:
async for event in client.workflows.events.get_stream_events(
workflow_exec_id=execution_id,
start_seq=last_seq,
):
last_seq = event.broker_sequence + 1 # advance past this event
process(event)
break # Completed normally
except ConnectionError:
await asyncio.sleep(1)
# Loop will resume from last_seqRoutes API
La plateforme expose deux endpoints SSE pour consommer les événements. Choisissez selon que vous connaissez déjà ou non l’exécution à laquelle vous souhaitez vous abonner.
Flux par exécution
GET /v1/workflows/executions/{execution_id}/streamAbonnez-vous à l’exécution d’un workflow par son ID. Permet la reprise depuis une position SSE précédente.
| Paramètre | Description |
|---|---|
event_source | Filtrage facultatif par source de l’événement (ex : workflow, activity). Voir Types d’événements pour les valeurs possibles. |
last_event_id | Reprendre après un numéro de séquence spécifique (équivaut à start_seq). Peut aussi être fourni en header HTTP Last-Event-ID — mécanisme standard de reprise SSE. |
curl -N -H "Authorization: Bearer $MISTRAL_API_KEY" \
"https://api.mistral.ai/v1/workflows/executions/${EXEC_ID}/stream"Flux d’événements filtré
GET /v1/workflows/events/streamAbonnez-vous en filtrant sur plusieurs exécutions — utile pour des tableaux de bord, des outils d’observabilité ou pour consommer simultanément un workflow parent et tous ses descendants dans un même flux.
| Paramètre | Description | Exemple |
|---|---|---|
scope | workflow, activity ou * | ?scope=activity |
stream | Filtrer selon le nom du flux | ?stream=token |
workflow_name | Filtrer sur le nom du workflow | ?workflow_name=my-workflow |
workflow_exec_id | Filtrer par ID d’exécution | ?workflow_exec_id=exec789 |
root_workflow_exec_id | Filtrer par ID d’exécution racine | ?root_workflow_exec_id=root123 |
parent_workflow_exec_id | Filtrer par ID d’exécution parent | ?parent_workflow_exec_id=parent456 |
activity_name | Filtrer sur le nom de l’activité | ?activity_name=chat_activity |
activity_id | Filtrer sur l’ID d’exécution d’activité | ?activity_id=activity123 |
start_seq | Reprendre à partir d’une séquence précise | ?start_seq=42 |
metadata_filters | Filtrer par métadonnées (objet JSON) | ?metadata_filters={"key":"v"} |
workflow_event_types | Filtrer par type d’événement workflow | ?workflow_event_types=WORKFLOW_EXECUTION_COMPLETED |
# Token-stream events for a specific execution
curl -N -H "Authorization: Bearer $MISTRAL_API_KEY" \
"https://api.mistral.ai/v1/workflows/events/stream?workflow_exec_id=${EXEC_ID}&stream=token"
# Only activity events for an execution
curl -N -H "Authorization: Bearer $MISTRAL_API_KEY" \
"https://api.mistral.ai/v1/workflows/events/stream?workflow_exec_id=${EXEC_ID}&scope=activity"
# Only workflow-level events for an execution
curl -N -H "Authorization: Bearer $MISTRAL_API_KEY" \
"https://api.mistral.ai/v1/workflows/events/stream?workflow_exec_id=${EXEC_ID}&scope=workflow"Réponse (Server-Sent Events) :
data: {"stream":"token","data":{"event_type":"CUSTOM_TASK_IN_PROGRESS",...},"broker_sequence":1,"workflow_context":{...}}
data: {"stream":"token","data":{"event_type":"CUSTOM_TASK_IN_PROGRESS",...},"broker_sequence":2,"workflow_context":{...}}
data: {"stream":"token","data":{"event_type":"WORKFLOW_EXECUTION_COMPLETED",...},"broker_sequence":3,"workflow_context":{...}}Consommer les événements issus d’arbres de workflows
Quand un workflow génère des sous-workflows, chaque événement contient l’objet workflow_context avec workflow_exec_id, parent_workflow_exec_id et root_workflow_exec_id. Le workflow parent utilise execute_workflow pour lancer un enfant :
@workflows.workflow.define(name="parent-workflow")
class ParentWorkflow:
@workflows.workflow.entrypoint
async def run(self, params):
# Child workflow events will have parent_workflow_exec_id set to this execution
result = await workflows.workflow.execute_workflow(
ChildWorkflow,
params,
)
return resultPour recevoir les événements d’un workflow parent et de tous ses descendants dans un même flux, abonnez-vous en utilisant root_workflow_exec_id :
# Get events from parent AND all children
async for event in client.workflows.events.get_stream_events(
root_workflow_exec_id=parent_execution_id,
):
if event.workflow_context.parent_workflow_exec_id:
print(f"[child] {event.data}")
else:
print(f"[parent] {event.data}")Types d’événements
Le SDK fournit des réponses typées, distinguées selon event_type. Chaque événement correspond à un modèle Pydantic (par ex. WorkflowExecutionCompletedResponse, CustomTaskInProgressResponse) avec des champs tels que event_id, event_timestamp, workflow_exec_id, workflow_name et des attributes spécifiques au type d’événement.
Pour la liste complète des types d’événements workflow et activité, consultez Concepts clés > Événements > Types d’événements. L’API de streaming émet également des événements CUSTOM_TASK_* générés dans les activités (démarré, en cours, terminé, échec, timeout, annulé). Utilisez ces événements pour publier l’avancement depuis des activités de longue durée ; consultez la section Streaming pour le détail de la publication.
Voici les types d’événements que vous rencontrerez le plus souvent :
| Type d’événement | Description |
|---|---|
WORKFLOW_EXECUTION_STARTED | Le workflow a démarré |
WORKFLOW_EXECUTION_COMPLETED | Workflow terminé avec succès |
WORKFLOW_EXECUTION_FAILED | Workflow en échec |
WORKFLOW_EXECUTION_CANCELED | Workflow annulé |
WORKFLOW_EXECUTION_CONTINUED_AS_NEW | Workflow repris sous un nouvel ID |
WORKFLOW_TASK_TIMED_OUT | Tâche du workflow expirée |
WORKFLOW_TASK_FAILED | Tâche du workflow en échec |
CUSTOM_TASK_STARTED | Tâche personnalisée démarrée |
CUSTOM_TASK_IN_PROGRESS | Mise à jour de l’avancement |
CUSTOM_TASK_COMPLETED | Tâche personnalisée terminée |
CUSTOM_TASK_FAILED | Tâche personnalisée en échec |
CUSTOM_TASK_TIMED_OUT | Tâche personnalisée expirée |
CUSTOM_TASK_CANCELED | Tâche personnalisée annulée |
ACTIVITY_TASK_STARTED | Activité démarrée |
ACTIVITY_TASK_COMPLETED | Activité terminée |
ACTIVITY_TASK_RETRYING | Nouvelle tentative d’activité |
ACTIVITY_TASK_FAILED | Activité en échec |
Exemple d’événement
{
"stream": "token",
"broker_sequence": 1,
"timestamp": "2025-01-15T10:30:00Z",
"data": {
"event_type": "WORKFLOW_EXECUTION_STARTED",
"event_id": "evt_abc123",
"event_timestamp": 1736938200000000000,
"workflow_name": "my-workflow",
"workflow_exec_id": "abc123",
"root_workflow_exec_id": "abc123",
"parent_workflow_exec_id": null,
"workflow_run_id": "run_456",
"attributes": {}
},
"workflow_context": {
"workflow_name": "my-workflow",
"workflow_exec_id": "abc123",
"parent_workflow_exec_id": null
}
}Bonnes pratiques
Toujours gérer les déconnexions
Les connexions streaming peuvent être coupées à tout moment. Utilisez start_seq pour reprendre après le dernier événement reçu, et implémentez une logique de backoff exponentiel pour ne pas saturer le serveur. Le test is_terminal_status() ci-dessous est votre propre helper — vérifiez le type de l’événement par rapport aux statuts terminaux comme WORKFLOW_EXECUTION_COMPLETED, WORKFLOW_EXECUTION_FAILED ou WORKFLOW_EXECUTION_CANCELED.
async def resilient_consume(client, exec_id):
max_retries = 10
last_seq = 0
last_seq = 0
for attempt in range(max_retries):
try:
async for event in client.workflows.events.get_stream_events(
workflow_exec_id=exec_id,
start_seq=last_seq,
):
last_seq = event.broker_sequence + 1 # advance past this event
yield event
if is_terminal_status(event):
return
except ConnectionError:
await asyncio.sleep(min(2 ** attempt, 30))Filtrer le plus tôt possible
Filtrez dès l’abonnement, pas dans votre code :
# Good: Filter at source
async for event in client.workflows.events.get_stream_events(
workflow_exec_id=exec_id, stream="token"
):
process(event)
# Bad: Filter in code (wastes bandwidth)
async for event in client.workflows.events.get_stream_events(
workflow_exec_id=exec_id
):
if event.stream == "token":
process(event)Utilisez le scope approprié
Si seuls les événements d’activité vous intéressent :
async for event in client.workflows.events.get_stream_events(
workflow_exec_id=exec_id, scope="activity"
):
if event.data is not None:
# Only activity events, no workflow-level events
process(event.data)