Événements en streaming
Diffusez des événements en temps réel depuis vos workflows et activités pour alimenter des interfaces utilisateur en direct, afficher des indicateurs de progression et recevoir des réponses LLM jeton par jeton.
Cette page explique comment publier des événements en streaming depuis le code d’un workflow. Pour savoir comment les consommer côté client, voir Consommer des événements en streaming.
Guide de démarrage
1. Publier depuis une activité
Dans une activité, utilisez le context manager Task pour publier des événements. Un objet Task représente un flux nommé de mises à jour d’état auquel des consommateurs externes peuvent s’abonner. Le paramètre type est un nom de sujet que vous choisissez (par exemple "token-stream"), et state correspond à la charge utile initiale. Chaque appel à update_state() publie le nouvel état comme événement à tous les abonnés en temps réel.
import mistralai.workflows as workflows
from mistralai.workflows.core.task import Task
@workflows.activity()
async def chat_activity(messages: list) -> dict:
initial_state = {"tokens": []}
async with Task(type="token-stream", state=initial_state) as task:
async for chunk in llm.stream(messages):
token = chunk.choices[0].delta.content
if token:
await task.update_state({"tokens": task.state["tokens"] + [token]})
return {"response": "".join(task.state["tokens"])}Événement terminal à la fermeture : lorsque le bloc async with se termine, la tâche est automatiquement marquée comme complétée et un événement terminal est publié à tous les abonnés. Il n’est pas nécessaire d’appeler explicitement une méthode close() ou complete().
2. Consommer des événements
Voir Consommer des événements en streaming pour plus de détails sur l’abonnement aux événements.
Modèles de publication
Streaming de jetons (LLM)
Le modèle le plus courant : diffuser les jetons au fur et à mesure qu’ils sont générés :
@workflows.activity()
async def chat_activity(messages: list) -> dict:
initial_state = {"tokens": []}
async with Task(type="token-stream", state=initial_state) as task:
async for chunk in llm.stream(messages):
token = chunk.choices[0].delta.content
if token:
await task.update_state({"tokens": task.state["tokens"] + [token]})
return {"response": "".join(task.state["tokens"])}Mises à jour de progression
Signalez la progression lors d’opérations longues :
@workflows.activity(name="Processing text with explicit control")
async def streaming_tokens_with_progress_activity(text: str) -> dict:
"""
Exemple de suivi explicite de la progression avec l’API Task.
Ce modèle vous donne un contrôle total sur les mises à jour d’état et le suivi de la progression.
"""
words = text.split()
initial_state = {"processed_words": [], "progress_idx": 0, "progress_total": len(words)}
async with Task(type="progress-stream", state=initial_state) as task:
state = task.state
for i, word in enumerate(words):
await task.update_state({
"processed_words": state["processed_words"] + [word],
"progress_idx": i + 1,
})
state = task.state
await asyncio.sleep(0.1)
final_state = task.state
return {"processed_text": " ".join(final_state["processed_words"]), "token_count": len(words)}Noms de types de tâches
Le paramètre type est un nom de sujet que vous définissez. Choisissez des noms qui décrivent le contenu du flux pour que les consommateurs puissent s’abonner selon leur besoin.
| Recommandé | À éviter | Pourquoi |
|---|---|---|
token | data | data n’indique pas le contenu du flux |
progress | Stream1 | Les noms numérotés deviennent ambigus si l’ordre change |
search_result | myStream | La casse mixte n’est pas cohérente avec le reste du SDK |
- Utilisez des minuscules avec underscores
- Gardez des noms courts et explicites
- Utilisez une convention cohérente dans toute votre application
Limites des charges utiles
Les messages d’événements en streaming sont limités à 1 Mo, indépendamment de la limite de 2 Mo pour les entrées/sorties d’activité. Pour des charges plus volumineuses :
# À éviter : ne diffusez pas de charges volumineuses
def bad_example():
# await task.update_state(large_document)
# Bon : stockez les données volumineuses ailleurs et diffusez une référence
url = await storage.upload(large_document)
await task.update_state({"url": url, "size": len(large_document)})Schéma d’événement
Chaque événement publié est enrichi de son contexte :
{
"stream": "token", # Nom du flux ou sujet
"broker_sequence": 42, # Numéro de séquence monotone
"timestamp": "2026-04-15T10:30:00.000Z", # Horodatage ISO
"data": { # Réponse typée (union discriminée)
"event_type": "CUSTOM_TASK_IN_PROGRESS",
"event_id": "evt_abc123",
"event_timestamp": "2026-04-15T10:30:00.000Z",
"workflow_name": "my-workflow",
"workflow_exec_id": "abc123",
"root_workflow_exec_id": "abc123",
"parent_workflow_exec_id": null,
"workflow_run_id": "run_456",
"attributes": {"progress_idx": 4, "progress_total": 10}
},
"workflow_context": {
"workflow_name": "my-workflow",
"workflow_exec_id": "abc123",
"parent_workflow_exec_id": null
}
}Garantie de séquence
broker_sequence fournit une garantie stricte sur l’ordre :
- Ordonné : les événements sont toujours livrés dans l’ordre croissant — vous ne recevrez jamais un numéro de séquence inférieur après un supérieur.
- Unique : un numéro de séquence n’est jamais réutilisé, même après expiration des événements.
- Reprise sûre : se reconnecter avec
start_seq=Nreprend exactement à partir de N : aucun événement n’est omis, aucun événement antérieur à N n’est renvoyé. - Durable : la garantie s’applique même lors des redémarrages du service ou lors de reconnexions. Si le flux a été interrompu en cours d’exécution, les événements historiques sont rejoués dans l’ordre avant la reprise des nouveaux événements.
Utilisez event.broker_sequence + 1 en tant que start_seq lors d’une reconnexion — consultez Consommer des événements en streaming pour le schéma complet de reprise.
Prochaine étape
→ Consommer des événements côté client : abonnez-vous au flux que vous venez de publier, depuis le SDK ou un endpoint HTTP brut.