Événements en streaming

Diffusez des événements en temps réel depuis vos workflows et activités pour alimenter des interfaces utilisateur en direct, afficher des indicateurs de progression et recevoir des réponses LLM jeton par jeton.

Cette page explique comment publier des événements en streaming depuis le code d’un workflow. Pour savoir comment les consommer côté client, voir Consommer des événements en streaming.

Guide de démarrage

Guide de démarrage

1. Publier depuis une activité

1. Publier depuis une activité

Dans une activité, utilisez le context manager Task pour publier des événements. Un objet Task représente un flux nommé de mises à jour d’état auquel des consommateurs externes peuvent s’abonner. Le paramètre type est un nom de sujet que vous choisissez (par exemple "token-stream"), et state correspond à la charge utile initiale. Chaque appel à update_state() publie le nouvel état comme événement à tous les abonnés en temps réel.

import mistralai.workflows as workflows
from mistralai.workflows.core.task import Task

@workflows.activity()
async def chat_activity(messages: list) -> dict:
    initial_state = {"tokens": []}

    async with Task(type="token-stream", state=initial_state) as task:

        async for chunk in llm.stream(messages):
            token = chunk.choices[0].delta.content
            if token:
                await task.update_state({"tokens": task.state["tokens"] + [token]})

    return {"response": "".join(task.state["tokens"])}
Astuce

Événement terminal à la fermeture : lorsque le bloc async with se termine, la tâche est automatiquement marquée comme complétée et un événement terminal est publié à tous les abonnés. Il n’est pas nécessaire d’appeler explicitement une méthode close() ou complete().

2. Consommer des événements

2. Consommer des événements

Voir Consommer des événements en streaming pour plus de détails sur l’abonnement aux événements.

Modèles de publication

Modèles de publication

Streaming de jetons (LLM)

Streaming de jetons (LLM)

Le modèle le plus courant : diffuser les jetons au fur et à mesure qu’ils sont générés :

@workflows.activity()
async def chat_activity(messages: list) -> dict:
    initial_state = {"tokens": []}

    async with Task(type="token-stream", state=initial_state) as task:

        async for chunk in llm.stream(messages):
            token = chunk.choices[0].delta.content
            if token:
                await task.update_state({"tokens": task.state["tokens"] + [token]})

    return {"response": "".join(task.state["tokens"])}
Mises à jour de progression

Mises à jour de progression

Signalez la progression lors d’opérations longues :

@workflows.activity(name="Processing text with explicit control")
async def streaming_tokens_with_progress_activity(text: str) -> dict:
    """
    Exemple de suivi explicite de la progression avec l’API Task.

    Ce modèle vous donne un contrôle total sur les mises à jour d’état et le suivi de la progression.
    """
    words = text.split()
    initial_state = {"processed_words": [], "progress_idx": 0, "progress_total": len(words)}

    async with Task(type="progress-stream", state=initial_state) as task:
        state = task.state
        for i, word in enumerate(words):
            await task.update_state({
                "processed_words": state["processed_words"] + [word],
                "progress_idx": i + 1,
            })
            state = task.state
            await asyncio.sleep(0.1)

    final_state = task.state
    return {"processed_text": " ".join(final_state["processed_words"]), "token_count": len(words)}
Noms de types de tâches

Noms de types de tâches

Le paramètre type est un nom de sujet que vous définissez. Choisissez des noms qui décrivent le contenu du flux pour que les consommateurs puissent s’abonner selon leur besoin.

RecommandéÀ éviterPourquoi
tokendatadata n’indique pas le contenu du flux
progressStream1Les noms numérotés deviennent ambigus si l’ordre change
search_resultmyStreamLa casse mixte n’est pas cohérente avec le reste du SDK
  • Utilisez des minuscules avec underscores
  • Gardez des noms courts et explicites
  • Utilisez une convention cohérente dans toute votre application
Limites des charges utiles

Limites des charges utiles

Les messages d’événements en streaming sont limités à 1 Mo, indépendamment de la limite de 2 Mo pour les entrées/sorties d’activité. Pour des charges plus volumineuses :

# À éviter : ne diffusez pas de charges volumineuses
def bad_example():
    # await task.update_state(large_document)

# Bon : stockez les données volumineuses ailleurs et diffusez une référence
url = await storage.upload(large_document)
await task.update_state({"url": url, "size": len(large_document)})
Schéma d’événement

Schéma d’événement

Chaque événement publié est enrichi de son contexte :

{
    "stream": "token",                          # Nom du flux ou sujet
    "broker_sequence": 42,                      # Numéro de séquence monotone
    "timestamp": "2026-04-15T10:30:00.000Z",    # Horodatage ISO
    "data": {                                   # Réponse typée (union discriminée)
        "event_type": "CUSTOM_TASK_IN_PROGRESS",
        "event_id": "evt_abc123",
        "event_timestamp": "2026-04-15T10:30:00.000Z",
        "workflow_name": "my-workflow",
        "workflow_exec_id": "abc123",
        "root_workflow_exec_id": "abc123",
        "parent_workflow_exec_id": null,
        "workflow_run_id": "run_456",
        "attributes": {"progress_idx": 4, "progress_total": 10}
    },
    "workflow_context": {
        "workflow_name": "my-workflow",
        "workflow_exec_id": "abc123",
        "parent_workflow_exec_id": null
    }
}
Garantie de séquence

Garantie de séquence

broker_sequence fournit une garantie stricte sur l’ordre :

  • Ordonné : les événements sont toujours livrés dans l’ordre croissant — vous ne recevrez jamais un numéro de séquence inférieur après un supérieur.
  • Unique : un numéro de séquence n’est jamais réutilisé, même après expiration des événements.
  • Reprise sûre : se reconnecter avec start_seq=N reprend exactement à partir de N : aucun événement n’est omis, aucun événement antérieur à N n’est renvoyé.
  • Durable : la garantie s’applique même lors des redémarrages du service ou lors de reconnexions. Si le flux a été interrompu en cours d’exécution, les événements historiques sont rejoués dans l’ordre avant la reprise des nouveaux événements.

Utilisez event.broker_sequence + 1 en tant que start_seq lors d’une reconnexion — consultez Consommer des événements en streaming pour le schéma complet de reprise.

Prochaine étape

Prochaine étape

Consommer des événements côté client : abonnez-vous au flux que vous venez de publier, depuis le SDK ou un endpoint HTTP brut.