Consommer les événements en streaming

Lorsqu’un workflow est en cours d’exécution, il émet des événements : démarrage de l’exécution, activité terminée, tokens générés. Consommer ces événements signifie que votre code écoute ce flux et réagit à chaque événement en temps réel. Vous disposez de deux méthodes :

  • Client SDK (recommandé) : modèles typés Pydantic, gestion automatique de la reconnexion, prise en charge du filtrage et de la reprise à partir d’une séquence précise. Utilisez-le dans votre code applicatif.
  • Points de terminaison HTTP bruts : deux routes SSE — /v1/workflows/executions/{execution_id}/stream (abonnement à une seule exécution) et /v1/workflows/events/stream (filtrage croisé sur plusieurs exécutions). Utilisez cette méthode si vous n’êtes pas sous Python ou si vous devez consommer directement le flux SSE brut. Consultez la section Routes API ci-dessous.
Utiliser le client API Workflows

Utiliser le client API Workflows

Consommation basique

Consommation basique

import os
from mistralai.client import Mistral

client = Mistral(api_key=os.environ["MISTRAL_API_KEY"])

# Start a workflow
execution = client.workflows.execute_workflow(
    workflow_identifier="my-workflow",
    input=params,
)

# Stream all events for this execution
async for event in client.workflows.events.get_stream_events(
    workflow_exec_id=execution.execution_id,
):
    print(f"[{event.stream}] {event.data}")
Détecter la fin de l’exécution

Détecter la fin de l’exécution

Utilisez les types d’événements du SDK pour savoir quand un workflow s’achève :

async for event in client.workflows.events.get_stream_events(
    workflow_exec_id=execution_id,
):
    if event.data is None:
        continue

    event_type = event.data.event_type

    if event_type == "WORKFLOW_EXECUTION_COMPLETED":
        print("Done!")
        break
    elif event_type in ("WORKFLOW_EXECUTION_FAILED", "WORKFLOW_EXECUTION_CANCELED"):
        print(f"Ended: {event_type}")
        break
    else:
        print(f"Event: {event_type}")
Reprise depuis une séquence

Reprise depuis une séquence

Si la connexion est interrompue, reprenez là où vous vous étiez arrêté à l’aide de start_seq. broker_sequence est garanti strictement croissant et jamais réutilisé — voir Garantie de la séquence pour plus de détails. En passant start_seq=N, la livraison reprend à partir de la séquence N incluse, donc avancez après la dernière séquence traitée afin d’éviter de recevoir à nouveau le même événement.

last_seq = 0
while True:
    try:
        async for event in client.workflows.events.get_stream_events(
            workflow_exec_id=execution_id,
            start_seq=last_seq,
        ):
            last_seq = event.broker_sequence + 1  # advance past this event
            process(event)
        break  # Completed normally
    except ConnectionError:
        await asyncio.sleep(1)
        # Loop will resume from last_seq
Routes API

Routes API

La plateforme expose deux endpoints SSE pour consommer les événements. Choisissez selon que vous connaissez déjà ou non l’exécution à laquelle vous souhaitez vous abonner.

Flux par exécution

Flux par exécution

GET /v1/workflows/executions/{execution_id}/stream

Abonnez-vous à l’exécution d’un workflow par son ID. Permet la reprise depuis une position SSE précédente.

ParamètreDescription
event_sourceFiltrage facultatif par source de l’événement (ex : workflow, activity). Voir Types d’événements pour les valeurs possibles.
last_event_idReprendre après un numéro de séquence spécifique (équivaut à start_seq). Peut aussi être fourni en header HTTP Last-Event-ID — mécanisme standard de reprise SSE.
curl -N -H "Authorization: Bearer $MISTRAL_API_KEY" \
  "https://api.mistral.ai/v1/workflows/executions/${EXEC_ID}/stream"
Flux d’événements filtré

Flux d’événements filtré

GET /v1/workflows/events/stream

Abonnez-vous en filtrant sur plusieurs exécutions — utile pour des tableaux de bord, des outils d’observabilité ou pour consommer simultanément un workflow parent et tous ses descendants dans un même flux.

ParamètreDescriptionExemple
scopeworkflow, activity ou *?scope=activity
streamFiltrer selon le nom du flux?stream=token
workflow_nameFiltrer sur le nom du workflow?workflow_name=my-workflow
workflow_exec_idFiltrer par ID d’exécution?workflow_exec_id=exec789
root_workflow_exec_idFiltrer par ID d’exécution racine?root_workflow_exec_id=root123
parent_workflow_exec_idFiltrer par ID d’exécution parent?parent_workflow_exec_id=parent456
activity_nameFiltrer sur le nom de l’activité?activity_name=chat_activity
activity_idFiltrer sur l’ID d’exécution d’activité?activity_id=activity123
start_seqReprendre à partir d’une séquence précise?start_seq=42
metadata_filtersFiltrer par métadonnées (objet JSON)?metadata_filters={"key":"v"}
workflow_event_typesFiltrer par type d’événement workflow?workflow_event_types=WORKFLOW_EXECUTION_COMPLETED
# Token-stream events for a specific execution
curl -N -H "Authorization: Bearer $MISTRAL_API_KEY" \
  "https://api.mistral.ai/v1/workflows/events/stream?workflow_exec_id=${EXEC_ID}&stream=token"

# Only activity events for an execution
curl -N -H "Authorization: Bearer $MISTRAL_API_KEY" \
  "https://api.mistral.ai/v1/workflows/events/stream?workflow_exec_id=${EXEC_ID}&scope=activity"

# Only workflow-level events for an execution
curl -N -H "Authorization: Bearer $MISTRAL_API_KEY" \
  "https://api.mistral.ai/v1/workflows/events/stream?workflow_exec_id=${EXEC_ID}&scope=workflow"

Réponse (Server-Sent Events) :

data: {"stream":"token","data":{"event_type":"CUSTOM_TASK_IN_PROGRESS",...},"broker_sequence":1,"workflow_context":{...}}

data: {"stream":"token","data":{"event_type":"CUSTOM_TASK_IN_PROGRESS",...},"broker_sequence":2,"workflow_context":{...}}

data: {"stream":"token","data":{"event_type":"WORKFLOW_EXECUTION_COMPLETED",...},"broker_sequence":3,"workflow_context":{...}}
Consommer les événements issus d’arbres de workflows

Consommer les événements issus d’arbres de workflows

Quand un workflow génère des sous-workflows, chaque événement contient l’objet workflow_context avec workflow_exec_id, parent_workflow_exec_id et root_workflow_exec_id. Le workflow parent utilise execute_workflow pour lancer un enfant :

@workflows.workflow.define(name="parent-workflow")
class ParentWorkflow:
    @workflows.workflow.entrypoint
    async def run(self, params):
        # Child workflow events will have parent_workflow_exec_id set to this execution
        result = await workflows.workflow.execute_workflow(
            ChildWorkflow,
            params,
        )
        return result

Pour recevoir les événements d’un workflow parent et de tous ses descendants dans un même flux, abonnez-vous en utilisant root_workflow_exec_id :

# Get events from parent AND all children
async for event in client.workflows.events.get_stream_events(
    root_workflow_exec_id=parent_execution_id,
):
    if event.workflow_context.parent_workflow_exec_id:
        print(f"[child] {event.data}")
    else:
        print(f"[parent] {event.data}")
Types d’événements

Types d’événements

Le SDK fournit des réponses typées, distinguées selon event_type. Chaque événement correspond à un modèle Pydantic (par ex. WorkflowExecutionCompletedResponse, CustomTaskInProgressResponse) avec des champs tels que event_id, event_timestamp, workflow_exec_id, workflow_name et des attributes spécifiques au type d’événement.

Pour la liste complète des types d’événements workflow et activité, consultez Concepts clés > Événements > Types d’événements. L’API de streaming émet également des événements CUSTOM_TASK_* générés dans les activités (démarré, en cours, terminé, échec, timeout, annulé). Utilisez ces événements pour publier l’avancement depuis des activités de longue durée ; consultez la section Streaming pour le détail de la publication.

Voici les types d’événements que vous rencontrerez le plus souvent :

Type d’événementDescription
WORKFLOW_EXECUTION_STARTEDLe workflow a démarré
WORKFLOW_EXECUTION_COMPLETEDWorkflow terminé avec succès
WORKFLOW_EXECUTION_FAILEDWorkflow en échec
WORKFLOW_EXECUTION_CANCELEDWorkflow annulé
WORKFLOW_EXECUTION_CONTINUED_AS_NEWWorkflow repris sous un nouvel ID
WORKFLOW_TASK_TIMED_OUTTâche du workflow expirée
WORKFLOW_TASK_FAILEDTâche du workflow en échec
CUSTOM_TASK_STARTEDTâche personnalisée démarrée
CUSTOM_TASK_IN_PROGRESSMise à jour de l’avancement
CUSTOM_TASK_COMPLETEDTâche personnalisée terminée
CUSTOM_TASK_FAILEDTâche personnalisée en échec
CUSTOM_TASK_TIMED_OUTTâche personnalisée expirée
CUSTOM_TASK_CANCELEDTâche personnalisée annulée
ACTIVITY_TASK_STARTEDActivité démarrée
ACTIVITY_TASK_COMPLETEDActivité terminée
ACTIVITY_TASK_RETRYINGNouvelle tentative d’activité
ACTIVITY_TASK_FAILEDActivité en échec
Exemple d’événement

Exemple d’événement

{
  "stream": "token",
  "broker_sequence": 1,
  "timestamp": "2025-01-15T10:30:00Z",
  "data": {
    "event_type": "WORKFLOW_EXECUTION_STARTED",
    "event_id": "evt_abc123",
    "event_timestamp": 1736938200000000000,
    "workflow_name": "my-workflow",
    "workflow_exec_id": "abc123",
    "root_workflow_exec_id": "abc123",
    "parent_workflow_exec_id": null,
    "workflow_run_id": "run_456",
    "attributes": {}
  },
  "workflow_context": {
    "workflow_name": "my-workflow",
    "workflow_exec_id": "abc123",
    "parent_workflow_exec_id": null
  }
}
Bonnes pratiques

Bonnes pratiques

Toujours gérer les déconnexions

Toujours gérer les déconnexions

Les connexions streaming peuvent être coupées à tout moment. Utilisez start_seq pour reprendre après le dernier événement reçu, et implémentez une logique de backoff exponentiel pour ne pas saturer le serveur. Le test is_terminal_status() ci-dessous est votre propre helper — vérifiez le type de l’événement par rapport aux statuts terminaux comme WORKFLOW_EXECUTION_COMPLETED, WORKFLOW_EXECUTION_FAILED ou WORKFLOW_EXECUTION_CANCELED.

async def resilient_consume(client, exec_id):
    max_retries = 10
    last_seq = 0

    last_seq = 0
    for attempt in range(max_retries):
        try:
            async for event in client.workflows.events.get_stream_events(
                workflow_exec_id=exec_id,
                start_seq=last_seq,
            ):
                last_seq = event.broker_sequence + 1  # advance past this event
                yield event

                if is_terminal_status(event):
                    return
        except ConnectionError:
            await asyncio.sleep(min(2 ** attempt, 30))
Filtrer le plus tôt possible

Filtrer le plus tôt possible

Filtrez dès l’abonnement, pas dans votre code :

# Good: Filter at source
async for event in client.workflows.events.get_stream_events(
    workflow_exec_id=exec_id, stream="token"
):
    process(event)

# Bad: Filter in code (wastes bandwidth)
async for event in client.workflows.events.get_stream_events(
    workflow_exec_id=exec_id
):
    if event.stream == "token":
        process(event)
Utilisez le scope approprié

Utilisez le scope approprié

Si seuls les événements d’activité vous intéressent :

async for event in client.workflows.events.get_stream_events(
    workflow_exec_id=exec_id, scope="activity"
):
    if event.data is not None:
        # Only activity events, no workflow-level events
        process(event.data)