Consuming Streaming Events

Subscribe to real-time events from workflows and activities using the Workflows API client.

Using the Workflows API Client

Basic Consumption

import os

from mistralai.workflows.client import get_mistral_client

# Run this inside an async function; `params` is your workflow input.
async with get_mistral_client(
    server_url="https://api.mistral.ai",
    api_key=os.environ["MISTRAL_API_KEY"],
) as client:
    # Start a workflow
    execution = await client.workflows.execute_workflow_async(
        workflow_identifier="my-workflow",
        input=params,
    )

    # Stream all events for this execution
    async for event in client.workflows.events.get_stream_events(
        workflow_exec_id=execution.execution_id,
    ):
        print(f"[{event.stream}] {event.data}")

Filtering by Stream Name

Only receive events from a specific stream:

async for event in client.workflows.events.get_stream_events(
    workflow_exec_id=execution_id,
    stream="token",  # Only token events
):
    print(event.data)

Detecting Completion

Use SDK event types to detect when a workflow finishes:

async for event in client.workflows.events.get_stream_events(
    workflow_exec_id=execution_id,
):
    if event.data is None:
        continue

    event_type = event.data.event_type

    if event_type == "WORKFLOW_EXECUTION_COMPLETED":
        print("Done!")
        break
    elif event_type in ("WORKFLOW_EXECUTION_FAILED", "WORKFLOW_EXECUTION_CANCELED"):
        print(f"Ended: {event_type}")
        break
    else:
        record_event(event)
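
The terminal-state check can be factored into a small reusable helper. The sketch below is illustrative only: `is_terminal_status` and the `SimpleNamespace` stand-in events are hypothetical names, and the code assumes nothing beyond an event exposing `data.event_type` as in the loop above.

```python
from types import SimpleNamespace

# Event types that end an execution, taken from the loop above.
TERMINAL_EVENT_TYPES = frozenset({
    "WORKFLOW_EXECUTION_COMPLETED",
    "WORKFLOW_EXECUTION_FAILED",
    "WORKFLOW_EXECUTION_CANCELED",
})


def is_terminal_status(event) -> bool:
    """Return True if the event marks the end of a workflow execution."""
    data = getattr(event, "data", None)
    if data is None:
        return False
    return getattr(data, "event_type", None) in TERMINAL_EVENT_TYPES


# Stand-in events for illustration; real events come from get_stream_events().
completed = SimpleNamespace(
    data=SimpleNamespace(event_type="WORKFLOW_EXECUTION_COMPLETED")
)
progress = SimpleNamespace(
    data=SimpleNamespace(event_type="CUSTOM_TASK_IN_PROGRESS")
)
empty = SimpleNamespace(data=None)

print(is_terminal_status(completed))  # True
print(is_terminal_status(progress))   # False
print(is_terminal_status(empty))      # False
```

When streaming a whole workflow tree, a child's completion event would also match, so a real consumer may additionally want to compare the event's execution ID with the one it is waiting on.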

Resume from Sequence

If your connection drops, resume from where you left off:

import asyncio

last_seq = 0
while True:
    try:
        async for event in client.workflows.events.get_stream_events(
            workflow_exec_id=execution_id,
            start_seq=last_seq,
        ):
            last_seq = event.sequence
            process(event)
        break  # Completed normally
    except ConnectionError:
        await asyncio.sleep(1)
        # Loop will resume from last_seq
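
To see the resume loop in action without a server, the sketch below simulates a stream that drops once. `FlakyStream` is a hypothetical stand-in, not part of the SDK, and it assumes that `start_seq` is inclusive (the server replays the event at `start_seq`). Under that assumption the consumer skips sequences it has already processed, which is harmless if the real endpoint turns out to be exclusive.

```python
import asyncio


class FlakyStream:
    """Hypothetical stand-in for an event stream that drops once mid-way."""

    def __init__(self, total: int, drop_after: int):
        self.total = total
        self.drop_after = drop_after
        self.dropped = False

    async def events(self, start_seq: int = 0):
        # Assumption: events are replayed starting at start_seq (inclusive).
        for seq in range(max(start_seq, 1), self.total + 1):
            if not self.dropped and seq > self.drop_after:
                self.dropped = True
                raise ConnectionError("simulated disconnect")
            yield seq


async def consume(stream: FlakyStream) -> list:
    processed = []
    last_seq = 0
    while True:
        try:
            async for seq in stream.events(start_seq=last_seq):
                if seq <= last_seq:
                    continue  # Already handled before the disconnect
                last_seq = seq
                processed.append(seq)
            return processed  # Stream ended normally
        except ConnectionError:
            await asyncio.sleep(0)  # Real code would back off here


result = asyncio.run(consume(FlakyStream(total=5, drop_after=3)))
print(result)  # [1, 2, 3, 4, 5]
```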

API Routes

Stream Events Endpoint

GET /v1/workflows/executions/{execution_id}/stream

Query parameters for filtering:

| Parameter | Description | Example |
| --- | --- | --- |
| stream | Filter by stream name | ?stream=token |
| workflow_name | Filter by workflow name | ?workflow_name=my-workflow |
| workflow_exec_id | Filter by execution ID | ?workflow_exec_id=exec789 |
| root_workflow_exec_id | Filter by root workflow execution ID | ?root_workflow_exec_id=root123 |
| parent_workflow_exec_id | Filter by parent workflow execution ID | ?parent_workflow_exec_id=parent456 |
| activity_name | Filter by activity name | ?activity_name=chat_activity |
| activity_id | Filter by activity execution ID | ?activity_id=activity123 |
| scope | workflow, activity, or * | ?scope=activity |
| start_seq | Resume from sequence | ?start_seq=42 |
| metadata_filters | Filter by metadata (JSON object) | ?metadata_filters={"key":"v"} |
| workflow_event_types | Filter by workflow event types | ?workflow_event_types=WORKFLOW_EXECUTION_COMPLETED |
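
Values such as the `metadata_filters` JSON object need URL-encoding before they go into a query string. The sketch below builds a stream URL from keyword filters using only the standard library. `build_stream_url` and `BASE_URL` are hypothetical names chosen for this illustration, and serializing the JSON compactly is an assumption rather than a documented requirement.

```python
import json
from urllib.parse import urlencode

BASE_URL = "http://localhost:8000"  # Assumed host for local testing


def build_stream_url(execution_id: str, **filters) -> str:
    """Build a stream URL, JSON-encoding dict values such as metadata_filters."""
    params = {}
    for name, value in filters.items():
        if value is None:
            continue  # Skip unset filters
        if isinstance(value, dict):
            value = json.dumps(value, separators=(",", ":"))
        params[name] = value
    path = f"{BASE_URL}/v1/workflows/executions/{execution_id}/stream"
    return f"{path}?{urlencode(params)}" if params else path


url = build_stream_url(
    "exec789",
    stream="token",
    start_seq=42,
    metadata_filters={"key": "v"},
)
print(url)
# http://localhost:8000/v1/workflows/executions/exec789/stream?stream=token&start_seq=42&metadata_filters=%7B%22key%22%3A%22v%22%7D
```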

Example: Token Streaming

curl -N "http://localhost:8000/v1/workflows/executions/${EXEC_ID}/stream?stream=token"

Response (Server-Sent Events):

data: {"stream":"token","data":{"event_type":"CUSTOM_TASK_IN_PROGRESS",...},"broker_sequence":1,"workflow_context":{...}}

data: {"stream":"token","data":{"event_type":"CUSTOM_TASK_IN_PROGRESS",...},"broker_sequence":2,"workflow_context":{...}}

data: {"stream":"token","data":{"event_type":"WORKFLOW_EXECUTION_COMPLETED",...},"broker_sequence":3,"workflow_context":{...}}
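
If you consume the endpoint without the SDK, each `data:` line has to be decoded by hand. The following is a minimal sketch of that step, assuming every event's payload is a JSON document as in the response above. `parse_sse_data` is a hypothetical helper; it ignores other SSE fields (`event:`, `id:`, comments) and the sample payloads are shortened.

```python
import json
from typing import Iterable, Iterator


def parse_sse_data(lines: Iterable[str]) -> Iterator[dict]:
    """Yield one decoded JSON payload per SSE event.

    In the SSE format, consecutive "data:" lines belong to one event
    and a blank line dispatches it.
    """
    buffer = []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line.startswith("data:"):
            value = line[len("data:"):]
            # A single leading space after the colon is not part of the value.
            buffer.append(value[1:] if value.startswith(" ") else value)
        elif line == "" and buffer:
            yield json.loads("\n".join(buffer))
            buffer = []


# Payloads shortened for illustration; real events carry more fields.
sample = [
    'data: {"stream":"token","data":{"event_type":"CUSTOM_TASK_IN_PROGRESS"},"broker_sequence":1}',
    "",
    'data: {"stream":"token","data":{"event_type":"WORKFLOW_EXECUTION_COMPLETED"},"broker_sequence":2}',
    "",
]
events = list(parse_sse_data(sample))
print([e["broker_sequence"] for e in events])  # [1, 2]
```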

Example: All Events with Wildcard

# All events for an execution
curl -N "http://localhost:8000/v1/workflows/executions/${EXEC_ID}/stream"

# Only activity events
curl -N "http://localhost:8000/v1/workflows/executions/${EXEC_ID}/stream?scope=activity"

# Only workflow-level events
curl -N "http://localhost:8000/v1/workflows/executions/${EXEC_ID}/stream?scope=workflow"

Sub-Workflow Events

When a workflow spawns sub-workflows, events maintain parent-child relationships:

# Parent workflow
@workflows.workflow.define(name="parent-workflow")
class ParentWorkflow:
    @workflows.workflow.entrypoint
    async def run(self, params):
        # Sub-workflow events will have parent_workflow_exec_id set
        result = await workflows.workflow.execute_workflow(
            ChildWorkflow,
            params
        )
        return result

Consuming Events from Workflow Trees

# Get events from parent AND all children
async for event in client.workflows.events.get_stream_events(
    root_workflow_exec_id=parent_execution_id,
):
    if event.workflow_context.parent_workflow_exec_id:
        print(f"[child] {event.data}")
    else:
        print(f"[parent] {event.data}")

Filtering by Root ID

Use the root_workflow_exec_id parameter to get all events in a workflow tree:

async for event in client.workflows.events.get_stream_events(
    root_workflow_exec_id="root123",
):
    process(event)
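
Once events from a whole tree arrive on one stream, it is often useful to bucket them per execution. The sketch below groups events by `workflow_context.workflow_exec_id` and records each execution's parent. `group_by_execution` and `make_event` are hypothetical helpers, and the events are `SimpleNamespace` stand-ins rather than SDK types.

```python
from collections import defaultdict
from types import SimpleNamespace


def group_by_execution(events):
    """Group events by workflow_exec_id and record each execution's parent."""
    grouped = defaultdict(list)
    parents = {}
    for event in events:
        ctx = event.workflow_context
        grouped[ctx.workflow_exec_id].append(event)
        parents[ctx.workflow_exec_id] = ctx.parent_workflow_exec_id
    return dict(grouped), parents


def make_event(exec_id, parent_id, event_type):
    """Build a stand-in event for illustration (not an SDK type)."""
    return SimpleNamespace(
        data=SimpleNamespace(event_type=event_type),
        workflow_context=SimpleNamespace(
            workflow_exec_id=exec_id,
            parent_workflow_exec_id=parent_id,
        ),
    )


events = [
    make_event("root123", None, "WORKFLOW_EXECUTION_STARTED"),
    make_event("child1", "root123", "WORKFLOW_EXECUTION_STARTED"),
    make_event("child1", "root123", "WORKFLOW_EXECUTION_COMPLETED"),
    make_event("root123", None, "WORKFLOW_EXECUTION_COMPLETED"),
]
grouped, parents = group_by_execution(events)
for exec_id, items in grouped.items():
    role = "child" if parents[exec_id] else "parent"
    print(f"[{role}] {exec_id}: {len(items)} events")
# [parent] root123: 2 events
# [child] child1: 2 events
```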

Event Types

The SDK uses typed event responses discriminated by event_type. Each event is a Pydantic model (e.g. WorkflowExecutionCompletedResponse, CustomTaskInProgressResponse) with fields like event_id, event_timestamp, workflow_exec_id, workflow_name, and type-specific attributes.

| Event Type | Description |
| --- | --- |
| WORKFLOW_EXECUTION_STARTED | Workflow started |
| WORKFLOW_EXECUTION_COMPLETED | Workflow finished successfully |
| WORKFLOW_EXECUTION_FAILED | Workflow failed |
| WORKFLOW_EXECUTION_CANCELED | Workflow was canceled |
| WORKFLOW_EXECUTION_CONTINUED_AS_NEW | Workflow continued as new |
| WORKFLOW_TASK_TIMED_OUT | Workflow task timed out |
| WORKFLOW_TASK_FAILED | Workflow task failed |
| CUSTOM_TASK_STARTED | Custom task started |
| CUSTOM_TASK_IN_PROGRESS | Custom task progress update |
| CUSTOM_TASK_COMPLETED | Custom task completed |
| CUSTOM_TASK_FAILED | Custom task failed |
| CUSTOM_TASK_TIMED_OUT | Custom task timed out |
| CUSTOM_TASK_CANCELED | Custom task canceled |
| ACTIVITY_TASK_STARTED | Activity started |
| ACTIVITY_TASK_COMPLETED | Activity finished |
| ACTIVITY_TASK_RETRYING | Activity is retrying |
| ACTIVITY_TASK_FAILED | Activity failed |
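
The event type names share three prefixes, which makes coarse routing straightforward. The sketch below derives a category from the prefix. `event_category` and its return values are hypothetical, and the mapping relies only on the naming pattern visible in the table, not on any documented guarantee.

```python
from collections import Counter


def event_category(event_type: str) -> str:
    """Derive a coarse category from the event type's prefix."""
    if event_type.startswith("WORKFLOW_"):
        return "workflow"
    if event_type.startswith("CUSTOM_TASK_"):
        return "custom_task"
    if event_type.startswith("ACTIVITY_TASK_"):
        return "activity"
    return "unknown"


seen = [
    "WORKFLOW_EXECUTION_STARTED",
    "ACTIVITY_TASK_STARTED",
    "CUSTOM_TASK_IN_PROGRESS",
    "CUSTOM_TASK_IN_PROGRESS",
    "ACTIVITY_TASK_COMPLETED",
    "WORKFLOW_EXECUTION_COMPLETED",
]
counts = Counter(event_category(t) for t in seen)
print(dict(counts))  # {'workflow': 2, 'activity': 2, 'custom_task': 2}
```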

Example Event

{
  "stream": "token",
  "broker_sequence": 1,
  "timestamp": "2025-01-15T10:30:00Z",
  "data": {
    "event_type": "WORKFLOW_EXECUTION_STARTED",
    "event_id": "evt_abc123",
    "event_timestamp": 1736938200000000000,
    "workflow_name": "my-workflow",
    "workflow_exec_id": "abc123",
    "root_workflow_exec_id": "abc123",
    "parent_workflow_exec_id": null,
    "workflow_run_id": "run_456",
    "attributes": {}
  },
  "workflow_context": {
    "workflow_name": "my-workflow",
    "workflow_exec_id": "abc123",
    "parent_workflow_exec_id": null
  }
}
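
The `event_timestamp` field in the example is a large integer. Its magnitude suggests nanoseconds since the Unix epoch, but that unit is an assumption here and is not stated in the example itself. Under that assumption, a conversion to a timezone-aware `datetime` could look like the sketch below, where `parse_event_time` is a hypothetical helper.

```python
import json
from datetime import datetime, timezone

# Subset of the example event's "data" object.
raw = """
{
  "event_type": "WORKFLOW_EXECUTION_STARTED",
  "event_id": "evt_abc123",
  "event_timestamp": 1736938200000000000,
  "workflow_exec_id": "abc123"
}
"""


def parse_event_time(event_timestamp_ns: int) -> datetime:
    """Convert an integer timestamp to a UTC datetime.

    Assumption: the value is nanoseconds since the Unix epoch.
    """
    seconds, nanos = divmod(event_timestamp_ns, 1_000_000_000)
    # datetime only resolves microseconds, so finer digits are dropped.
    return datetime.fromtimestamp(seconds, tz=timezone.utc).replace(
        microsecond=nanos // 1_000
    )


data = json.loads(raw)
started_at = parse_event_time(data["event_timestamp"])
print(data["event_type"], started_at.isoformat())
# WORKFLOW_EXECUTION_STARTED 2025-01-15T10:50:00+00:00
```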

Best practices

1. Always Handle Disconnection

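import asyncio

async def resilient_consume(client, exec_id):
    max_retries = 10
    last_seq = 0  # Last sequence seen; lets us resume after a reconnect

    for attempt in range(max_retries):
        try:
            async for event in client.workflows.events.get_stream_events(
                workflow_exec_id=exec_id,
                start_seq=last_seq,
            ):
                last_seq = event.sequence
                yield event

                # is_terminal_status is a helper you define, e.g. a check for
                # the terminal event types listed under Detecting Completion
                if is_terminal_status(event):
                    return
            return  # Stream ended normally
        except ConnectionError:
            # Exponential backoff, capped at 30 seconds
            await asyncio.sleep(min(2 ** attempt, 30))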
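
To get a feel for how long the retry loop above keeps trying, the short sketch below lists the delays produced by `min(2 ** attempt, 30)` over ten attempts. `backoff_delays` is a hypothetical helper written for this illustration.

```python
def backoff_delays(max_retries: int = 10, cap: int = 30) -> list:
    """Return the sleep duration, in seconds, used after each failed attempt."""
    return [min(2 ** attempt, cap) for attempt in range(max_retries)]


delays = backoff_delays()
print(delays)       # [1, 2, 4, 8, 16, 30, 30, 30, 30, 30]
print(sum(delays))  # 181
```

With these defaults the consumer gives up after roughly three minutes of cumulative waiting, so raise `max_retries` if longer outages should be tolerated.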

2. Filter Early

Filter at the subscription level, not in your code:

# Good: Filter at source
async for event in client.workflows.events.get_stream_events(
    workflow_exec_id=exec_id, stream="token"
):
    process(event)

# Bad: Filter in code (wastes bandwidth)
async for event in client.workflows.events.get_stream_events(
    workflow_exec_id=exec_id
):
    if event.stream == "token":
        process(event)

3. Use Appropriate Scope

If you only care about activity events:

async for event in client.workflows.events.get_stream_events(
    workflow_exec_id=exec_id, scope="activity"
):
    if event.data is not None:
        # Only activity events, no workflow-level events
        process(event.data)