Streaming: Real-Time Events from Workflows

Stream events in real-time from your workflows and activities to power live UIs, progress indicators, and token-by-token LLM responses.

Quick Start

Quick Start

1. Publish from an Activity

1. Publish from an Activity

Inside an activity, use the Task context manager to publish events. A Task represents a named stream of state updates that external consumers can subscribe to. The type parameter is a topic name you choose (e.g., "token-stream"), and state is the initial payload. Each call to update_state() publishes the new state as an event to all subscribers in real time.

import mistralai.workflows as workflows
from mistralai.workflows.core.task import Task

@workflows.activity()
async def chat_activity(messages: list) -> dict:
    initial_state = {"tokens": []}

    async with Task(type="token-stream", state=initial_state) as task:

        async for chunk in llm.stream(messages):
            token = chunk.choices[0].delta.content
            if token:
                await task.update_state({"tokens": task.state["tokens"] + [token]})

    return {"response": "".join(task.state["tokens"])}

When the async with block exits, the task is marked as complete and a terminal event is published.

2. Consume Events

2. Consume Events

See Consuming Streaming Events for details on subscribing to events.

Publishing Patterns

Publishing Patterns

Token Streaming (LLM)

Token Streaming (LLM)

The most common pattern - stream tokens as they're generated:

@workflows.activity()
async def chat_activity(messages: list) -> dict:
    initial_state = {"tokens": []}

    async with Task(type="token-stream", state=initial_state) as task:

        async for chunk in llm.stream(messages):
            token = chunk.choices[0].delta.content
            if token:
                await task.update_state({"tokens": task.state["tokens"] + [token]})

    return {"response": "".join(task.state["tokens"])}
Progress Updates

Progress Updates

Report progress during long operations:

@workflows.activity(name="Processing text with explicit control")
async def streaming_tokens_with_progress_activity(text: str) -> dict:
    """
    Example of explicit progress tracking with Task API.

    This pattern gives you full control over state updates and progress tracking.
    """
    words = text.split()
    initial_state = {"processed_words": [], "progress_idx": 0, "progress_total": len(words)}

    async with Task(type="progress-stream", state=initial_state) as task:
        state = task.state
        for i, word in enumerate(words):
            await task.update_state({
                "processed_words": state["processed_words"] + [word],
                "progress_idx": i + 1,
            })
            state = task.state
            await asyncio.sleep(0.1)

    final_state = task.state
    return {"processed_text": " ".join(final_state["processed_words"]), "token_count": len(words)}
Task Type Names

Task Type Names

The type parameter is a topic name you define. Best practices:

GoodBad
tokendata
progressStream1
search_resultmyStream
  • Use lowercase with underscores
  • Keep names short and descriptive
  • Use consistent naming across your app
Payload Limits

Payload Limits

Events have a 1MB message limit. For large data:

# Bad: Don't stream large payloads
# await task.update_state(large_document)

# Good: Store large data externally, stream a reference
url = await storage.upload(large_document)
await task.update_state({"url": url, "size": len(large_document)})
Event Schema

Event Schema

Each published event is wrapped with context:

{
    "stream": "token",              # Your stream/topic name
    "broker_sequence": 42,          # Monotonic sequence number
    "timestamp": "...",             # ISO timestamp (optional)
    "data": {                       # Typed event response (discriminated union)
        "event_type": "CUSTOM_TASK_IN_PROGRESS",
        "event_id": "...",
        "event_timestamp": 1234567890,
        "workflow_name": "my-workflow",
        "workflow_exec_id": "abc123",
        "root_workflow_exec_id": "abc123",
        "parent_workflow_exec_id": null,
        "workflow_run_id": "...",
        "attributes": { ... }      # Type-specific attributes
    },
    "workflow_context": {
        "workflow_name": "my-workflow",
        "workflow_exec_id": "abc123",
        "parent_workflow_exec_id": null
    }
}