Consuming Streaming Events

Subscribe to real-time events from workflows and activities using the Workflows API client.

Using the Workflows API Client

Basic Consumption

import os
from mistralai.client import Mistral

client = Mistral(api_key=os.environ["MISTRAL_API_KEY"])

# Start a workflow (the input payload shape is defined by your workflow)
params = {"prompt": "Hello"}
execution = client.workflows.execute_workflow(
    workflow_identifier="my-workflow",
    input=params,
)

# Stream all events for this execution
for event in client.workflows.events.get_stream_events(
    workflow_exec_id=execution.execution_id,
):
    print(f"[{event.stream}] {event.data}")

Filtering by Stream Name

Only receive events from a specific stream:

async for event in client.workflows.events.get_stream_events(
    workflow_exec_id=execution_id,
    stream="token",  # Only token events
):
    print(event.data)

Detecting Completion

Use SDK event types to detect when a workflow finishes:

async for event in client.workflows.events.get_stream_events(
    workflow_exec_id=execution_id,
):
    if event.data is None:
        continue

    event_type = event.data.event_type

    if event_type == "WORKFLOW_EXECUTION_COMPLETED":
        print("Done!")
        break
    elif event_type in ("WORKFLOW_EXECUTION_FAILED", "WORKFLOW_EXECUTION_CANCELED"):
        print(f"Ended: {event_type}")
        break
    else:
        record_event(event)

Resume from Sequence

If your connection drops, resume from where you left off:

last_seq = 0
while True:
    try:
        async for event in client.workflows.events.get_stream_events(
            workflow_exec_id=execution_id,
            start_seq=last_seq,
        ):
            last_seq = event.sequence
            process(event)
        break  # Completed normally
    except ConnectionError:
        await asyncio.sleep(1)
        # Loop will resume from last_seq

API Routes

Stream Events Endpoint

GET /v1/workflows/executions/{execution_id}/stream

Query parameters for filtering:

| Parameter | Description | Example |
| --- | --- | --- |
| `stream` | Filter by stream name | `?stream=token` |
| `workflow_name` | Filter by workflow name | `?workflow_name=my-workflow` |
| `workflow_exec_id` | Filter by execution ID | `?workflow_exec_id=exec789` |
| `root_workflow_exec_id` | Filter by root workflow exec ID | `?root_workflow_exec_id=root123` |
| `parent_workflow_exec_id` | Filter by parent workflow exec ID | `?parent_workflow_exec_id=parent456` |
| `activity_name` | Filter by activity name | `?activity_name=chat_activity` |
| `activity_id` | Filter by activity execution ID | `?activity_id=activity123` |
| `scope` | `workflow`, `activity`, or `*` | `?scope=activity` |
| `start_seq` | Resume from sequence | `?start_seq=42` |
| `metadata_filters` | Filter by metadata (JSON object) | `?metadata_filters={"key":"v"}` |
| `workflow_event_types` | Filter by workflow event types | `?workflow_event_types=WORKFLOW_EXECUTION_COMPLETED` |
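When calling the endpoint directly, JSON-valued parameters such as metadata_filters must be serialized before URL encoding. A small sketch of building a filtered stream URL with the standard library (build_stream_url is an illustrative helper, not part of any SDK):

```python
import json
from urllib.parse import urlencode


def build_stream_url(base_url, execution_id, **filters):
    """Build a stream URL with optional query-string filters.

    JSON-valued filters (e.g. metadata_filters) are serialized to a
    compact JSON string before URL encoding.
    """
    if "metadata_filters" in filters:
        filters["metadata_filters"] = json.dumps(
            filters["metadata_filters"], separators=(",", ":")
        )
    query = urlencode(filters)
    url = f"{base_url}/v1/workflows/executions/{execution_id}/stream"
    return f"{url}?{query}" if query else url
```

For example, `build_stream_url("http://localhost:8000", "exec789", stream="token", start_seq=42)` produces the `?stream=token&start_seq=42` query string shown in the table.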

Example: Token Streaming

curl -N "http://localhost:8000/v1/workflows/executions/${EXEC_ID}/stream?stream=token"

Response (Server-Sent Events):

data: {"stream":"token","data":{"event_type":"CUSTOM_TASK_IN_PROGRESS",...},"broker_sequence":1,"workflow_context":{...}}

data: {"stream":"token","data":{"event_type":"CUSTOM_TASK_IN_PROGRESS",...},"broker_sequence":2,"workflow_context":{...}}

data: {"stream":"token","data":{"event_type":"WORKFLOW_EXECUTION_COMPLETED",...},"broker_sequence":3,"workflow_context":{...}}

Example: All Events with Wildcard

# All events for an execution
curl -N "http://localhost:8000/v1/workflows/executions/${EXEC_ID}/stream"

# Only activity events
curl -N "http://localhost:8000/v1/workflows/executions/${EXEC_ID}/stream?scope=activity"

# Only workflow-level events
curl -N "http://localhost:8000/v1/workflows/executions/${EXEC_ID}/stream?scope=workflow"

Sub-Workflow Events

When a workflow spawns sub-workflows, events maintain parent-child relationships:

# Parent workflow
@workflows.workflow.define(name="parent-workflow")
class ParentWorkflow:
    @workflows.workflow.entrypoint
    async def run(self, params):
        # Sub-workflow events will have parent_workflow_exec_id set
        result = await workflows.workflow.execute_workflow(
            ChildWorkflow,
            params
        )
        return result
Consuming Events from Workflow Trees

To receive events from a parent workflow and all its children in a single stream, subscribe using root_workflow_exec_id. Each event includes workflow_context so you can tell which workflow in the tree emitted it — if parent_workflow_exec_id is set, the event came from a child workflow.

# Get events from parent AND all children
async for event in client.workflows.events.get_stream_events(
    root_workflow_exec_id=parent_execution_id,
):
    if event.workflow_context.parent_workflow_exec_id:
        print(f"[child] {event.data}")
    else:
        print(f"[parent] {event.data}")
Filtering by Root ID

Use the root_workflow_exec_id parameter to get all events in a workflow tree:

async for event in client.workflows.events.get_stream_events(
    root_workflow_exec_id="root123",
):
    process(event)

Event Types

The SDK uses typed event responses discriminated by event_type. Each event is a Pydantic model (e.g. WorkflowExecutionCompletedResponse, CustomTaskInProgressResponse) with fields like event_id, event_timestamp, workflow_exec_id, workflow_name, and type-specific attributes.

| Event Type | Description |
| --- | --- |
| `WORKFLOW_EXECUTION_STARTED` | Workflow started |
| `WORKFLOW_EXECUTION_COMPLETED` | Workflow finished successfully |
| `WORKFLOW_EXECUTION_FAILED` | Workflow failed |
| `WORKFLOW_EXECUTION_CANCELED` | Workflow was canceled |
| `WORKFLOW_EXECUTION_CONTINUED_AS_NEW` | Workflow continued as new |
| `WORKFLOW_TASK_TIMED_OUT` | Workflow task timed out |
| `WORKFLOW_TASK_FAILED` | Workflow task failed |
| `CUSTOM_TASK_STARTED` | Custom task started |
| `CUSTOM_TASK_IN_PROGRESS` | Custom task progress update |
| `CUSTOM_TASK_COMPLETED` | Custom task completed |
| `CUSTOM_TASK_FAILED` | Custom task failed |
| `CUSTOM_TASK_TIMED_OUT` | Custom task timed out |
| `CUSTOM_TASK_CANCELED` | Custom task canceled |
| `ACTIVITY_TASK_STARTED` | Activity started |
| `ACTIVITY_TASK_COMPLETED` | Activity finished |
| `ACTIVITY_TASK_RETRYING` | Activity is retrying |
| `ACTIVITY_TASK_FAILED` | Activity failed |
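The names above follow three prefixes (workflow, custom task, activity), which is handy for coarse routing. A small illustrative classifier based on the string names from the table (event_family is our own helper, not an SDK function):

```python
def event_family(event_type: str) -> str:
    """Classify an event_type string into its coarse family."""
    if event_type.startswith(("WORKFLOW_EXECUTION_", "WORKFLOW_TASK_")):
        return "workflow"
    if event_type.startswith("CUSTOM_TASK_"):
        return "custom_task"
    if event_type.startswith("ACTIVITY_TASK_"):
        return "activity"
    return "unknown"
```

This can back a dispatch table that sends workflow events to lifecycle handling and custom-task events to progress reporting.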

Example Event

{
  "stream": "token",
  "broker_sequence": 1,
  "timestamp": "2025-01-15T10:30:00Z",
  "data": {
    "event_type": "WORKFLOW_EXECUTION_STARTED",
    "event_id": "evt_abc123",
    "event_timestamp": 1736938200000000000,
    "workflow_name": "my-workflow",
    "workflow_exec_id": "abc123",
    "root_workflow_exec_id": "abc123",
    "parent_workflow_exec_id": null,
    "workflow_run_id": "run_456",
    "attributes": {}
  },
  "workflow_context": {
    "workflow_name": "my-workflow",
    "workflow_exec_id": "abc123",
    "parent_workflow_exec_id": null
  }
}

Best Practices

Always Handle Disconnection

Streaming connections can drop at any time. Use start_seq to resume from the last received event, and implement exponential backoff to avoid hammering the server. The is_terminal_status() check below is a helper you define yourself: compare the event type against terminal statuses such as WORKFLOW_EXECUTION_COMPLETED, WORKFLOW_EXECUTION_FAILED, and WORKFLOW_EXECUTION_CANCELED.

async def resilient_consume(client, exec_id):
    max_retries = 10
    last_seq = 0  # Resume point, updated as events arrive

    for attempt in range(max_retries):
        try:
            async for event in client.workflows.events.get_stream_events(
                workflow_exec_id=exec_id,
                start_seq=last_seq,
            ):
                last_seq = event.sequence
                yield event

                if is_terminal_status(event):
                    return
        except ConnectionError:
            # Exponential backoff, capped at 30 seconds
            await asyncio.sleep(min(2 ** attempt, 30))

    raise RuntimeError("Stream did not complete after retries")

Filter Early

Filter at the subscription level, not in your code:

# Good: Filter at source
async for event in client.workflows.events.get_stream_events(
    workflow_exec_id=exec_id, stream="token"
):
    process(event)

# Bad: Filter in code (wastes bandwidth)
async for event in client.workflows.events.get_stream_events(
    workflow_exec_id=exec_id
):
    if event.stream == "token":
        process(event)

Use Appropriate Scope

If you only care about activity events:

async for event in client.workflows.events.get_stream_events(
    workflow_exec_id=exec_id, scope="activity"
):
    if event.data is not None:
        # Only activity events, no workflow-level events
        process(event.data)