Streaming: Real-Time Events from Workflows

Stream events in real-time from your workflows and activities to power live UIs, progress indicators, and token-by-token LLM responses.

Quick Start

Quick Start

1. Publish from an Activity

import mistralai.workflows as workflows
from mistralai.workflows.core.task import Task

@workflows.activity()
async def chat_activity(messages: list) -> dict:
    initial_state = {"tokens": []}

    async with Task(type="token-stream", state=initial_state) as task:

        async for chunk in llm.stream(messages):
            token = chunk.choices[0].delta.content
            if token:
                await task.update_state({"tokens": task.state["tokens"] + [token]})

    return {"response": "".join(task.state["tokens"])}

2. Consume Events

See Consuming Streaming Events for details on subscribing to events.

Publishing Patterns

Publishing Patterns

Token Streaming (LLM)

The most common pattern - stream tokens as they're generated:

@workflows.activity()
async def chat_activity(messages: list) -> dict:
    initial_state = {"tokens": []}

    async with Task(type="token-stream", state=initial_state) as task:

        async for chunk in llm.stream(messages):
            token = chunk.choices[0].delta.content
            if token:
                await task.update_state({"tokens": task.state["tokens"] + [token]})

    return {"response": "".join(task.state["tokens"])}

Progress Updates

Report progress during long operations:

@workflows.activity(name="Processing text with explicit control")
async def streaming_tokens_with_progress_activity(text: str) -> dict:
    """
    Example of explicit progress tracking with Task API.

    This pattern gives you full control over state updates and progress tracking.
    """
    words = text.split()
    initial_state = {"processed_words": [], "progress_idx": 0, "progress_total": len(words)}

    async with Task(type="progress-stream", state=initial_state) as task:
        state = task.state
        for i, word in enumerate(words):
            await task.update_state({
                "processed_words": state["processed_words"] + [word],
                "progress_idx": i + 1,
            })
            state = task.state
            await asyncio.sleep(0.1)

    final_state = task.state
    return {"processed_text": " ".join(final_state["processed_words"]), "token_count": len(words)}
Task Type Names

Task Type Names

The type parameter is a topic name you define. Best practices:

GoodBad
tokendata
progressStream1
search_resultmyStream
  • Use lowercase with underscores
  • Keep names short and descriptive
  • Use consistent naming across your app
Payload Limits

Payload Limits

Events have a 1MB message limit. For large data:

# Bad: Don't stream large payloads
# await task.update_state(large_document)

# Good: Store large data externally, stream a reference
url = await storage.upload(large_document)
await task.update_state({"url": url, "size": len(large_document)})
Event Schema

Event Schema

Each published event is wrapped with context:

{
    "stream": "token",              # Your stream/topic name
    "broker_sequence": 42,          # Monotonic sequence number
    "timestamp": "...",             # ISO timestamp (optional)
    "data": {                       # Typed event response (discriminated union)
        "event_type": "CUSTOM_TASK_IN_PROGRESS",
        "event_id": "...",
        "event_timestamp": 1234567890,
        "workflow_name": "my-workflow",
        "workflow_exec_id": "abc123",
        "root_workflow_exec_id": "abc123",
        "parent_workflow_exec_id": null,
        "workflow_run_id": "...",
        "attributes": { ... }      # Type-specific attributes
    },
    "workflow_context": {
        "workflow_name": "my-workflow",
        "workflow_exec_id": "abc123",
        "parent_workflow_exec_id": null
    }
}