Streaming: Real-Time Events from Workflows
Stream events in real-time from your workflows and activities to power live UIs, progress indicators, and token-by-token LLM responses.
Quick Start
1. Publish from an Activity
```python
import mistralai.workflows as workflows
from mistralai.workflows.core.task import Task


@workflows.activity()
async def chat_activity(messages: list) -> dict:
    initial_state = {"tokens": []}
    async with Task(type="token-stream", state=initial_state) as task:
        async for chunk in llm.stream(messages):
            token = chunk.choices[0].delta.content
            if token:
                await task.update_state({"tokens": task.state["tokens"] + [token]})
    return {"response": "".join(task.state["tokens"])}
```

2. Consume Events
See Consuming Streaming Events for details on subscribing to events.
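Consumers receive full state snapshots rather than deltas, so a live UI only needs to append what is new in each snapshot. A minimal sketch in plain Python (no workflow API assumed; the snapshots are simulated to match the `{"tokens": [...]}` state shape above):

```python
def tokens_delta(prev_tokens: list, next_tokens: list) -> str:
    """Return the text added between two token-stream state snapshots.

    Assumes snapshots are append-only, as in the activity above.
    """
    return "".join(next_tokens[len(prev_tokens):])


# Simulated snapshots, in the order a subscriber might observe them:
snapshots = [
    {"tokens": []},
    {"tokens": ["Hel"]},
    {"tokens": ["Hel", "lo"]},
    {"tokens": ["Hel", "lo", " world"]},
]

rendered = ""
prev: list = []
for snap in snapshots:
    rendered += tokens_delta(prev, snap["tokens"])
    prev = snap["tokens"]
# rendered == "Hello world"
```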
Publishing Patterns
Token Streaming (LLM)
The most common pattern is to stream tokens as they're generated:
```python
@workflows.activity()
async def chat_activity(messages: list) -> dict:
    initial_state = {"tokens": []}
    async with Task(type="token-stream", state=initial_state) as task:
        async for chunk in llm.stream(messages):
            token = chunk.choices[0].delta.content
            if token:
                await task.update_state({"tokens": task.state["tokens"] + [token]})
    return {"response": "".join(task.state["tokens"])}
```

Progress Updates
Report progress during long operations:
```python
import asyncio

import mistralai.workflows as workflows
from mistralai.workflows.core.task import Task


@workflows.activity(name="Processing text with explicit control")
async def streaming_tokens_with_progress_activity(text: str) -> dict:
    """
    Example of explicit progress tracking with the Task API.

    This pattern gives you full control over state updates and progress tracking.
    """
    words = text.split()
    initial_state = {"processed_words": [], "progress_idx": 0, "progress_total": len(words)}
    async with Task(type="progress-stream", state=initial_state) as task:
        state = task.state
        for i, word in enumerate(words):
            await task.update_state({
                "processed_words": state["processed_words"] + [word],
                "progress_idx": i + 1,
            })
            state = task.state
            await asyncio.sleep(0.1)
        final_state = task.state
    return {"processed_text": " ".join(final_state["processed_words"]), "token_count": len(words)}
```
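On the consumer side, `progress_idx` and `progress_total` can drive a progress bar. A small sketch in plain Python (the field names come from the state above; the helper itself is illustrative):

```python
def progress_percent(state: dict) -> int:
    """Convert a progress-stream state snapshot into a 0-100 percentage."""
    total = state.get("progress_total") or 0
    if total <= 0:
        return 0
    return min(100, round(100 * state["progress_idx"] / total))


# Example snapshot mid-run:
pct = progress_percent({"progress_idx": 3, "progress_total": 12})
# pct == 25
```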
Task Type Names
The `type` parameter is a topic name you define. Best practices:
| Good | Bad |
|---|---|
| `token` | `data` |
| `progress` | `Stream1` |
| `search_result` | `myStream` |
- Use lowercase with underscores
- Keep names short and descriptive
- Use consistent naming across your app
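The conventions above can be enforced with a small check. A sketch in plain Python; the regex and length cap are illustrative choices, not part of the workflows API:

```python
import re

# Lowercase words separated by single underscores, starting with a letter.
_TOPIC_RE = re.compile(r"^[a-z][a-z0-9]*(_[a-z0-9]+)*$")


def is_valid_topic(name: str, max_len: int = 32) -> bool:
    """Check lowercase-with-underscores naming and a short length cap."""
    return bool(_TOPIC_RE.match(name)) and len(name) <= max_len


# is_valid_topic("search_result") -> True
# is_valid_topic("myStream")      -> False
```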
Payload Limits
Events have a 1MB message limit. For large data:
```python
# Bad: don't stream large payloads
# await task.update_state(large_document)

# Good: store large data externally, stream a reference
url = await storage.upload(large_document)
await task.update_state({"url": url, "size": len(large_document)})
```
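A simple guard before publishing, assuming the payload is JSON-serialized (the 1MB figure comes from the limit above; the helper names are illustrative):

```python
import json

MAX_EVENT_BYTES = 1_000_000  # 1MB event limit from above


def payload_size(payload: dict) -> int:
    """Size of the payload once JSON-serialized, in bytes."""
    return len(json.dumps(payload).encode("utf-8"))


def fits_in_event(payload: dict) -> bool:
    return payload_size(payload) <= MAX_EVENT_BYTES


# A small reference payload fits; a 2-million-character blob does not.
```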
Event Schema
Each published event is wrapped with context:
```json
{
  "stream": "token",                  # Your stream/topic name
  "broker_sequence": 42,              # Monotonic sequence number
  "timestamp": "...",                 # ISO timestamp (optional)
  "data": {                           # Typed event response (discriminated union)
    "event_type": "CUSTOM_TASK_IN_PROGRESS",
    "event_id": "...",
    "event_timestamp": 1234567890,
    "workflow_name": "my-workflow",
    "workflow_exec_id": "abc123",
    "root_workflow_exec_id": "abc123",
    "parent_workflow_exec_id": null,
    "workflow_run_id": "...",
    "attributes": { ... }             # Type-specific attributes
  },
  "workflow_context": {
    "workflow_name": "my-workflow",
    "workflow_exec_id": "abc123",
    "parent_workflow_exec_id": null
  }
}
```
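A consumer can use `broker_sequence` to order events and `stream` to route them. A minimal sketch over plain dicts shaped like the envelope above (the `attributes` contents are invented for illustration; the schema only says they are type-specific):

```python
def tokens_in_order(events: list[dict]) -> list[dict]:
    """Keep only events for the "token" stream, ordered by broker_sequence."""
    token_events = [e for e in events if e["stream"] == "token"]
    return sorted(token_events, key=lambda e: e["broker_sequence"])


# Events from different streams may arrive interleaved and out of order:
events = [
    {"stream": "progress", "broker_sequence": 41, "data": {"attributes": {"progress_idx": 1}}},
    {"stream": "token", "broker_sequence": 43, "data": {"attributes": {"token": "lo"}}},
    {"stream": "token", "broker_sequence": 42, "data": {"attributes": {"token": "Hel"}}},
]
ordered = tokens_in_order(events)
text = "".join(e["data"]["attributes"]["token"] for e in ordered)
# text == "Hello"
```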