# Consuming Streaming Events
Subscribe to real-time events from workflows and activities using the Workflows API client.
## Using the Workflows API Client

### Basic Consumption

```python
import os

from mistralai.workflows.client import get_mistral_client

async with get_mistral_client(
    server_url="https://api.mistral.ai",
    api_key=os.environ["MISTRAL_API_KEY"],
) as client:
    # Start a workflow
    execution = await client.workflows.execute_workflow_async(
        workflow_identifier="my-workflow",
        input=params,
    )

    # Stream all events for this execution
    async for event in client.workflows.events.get_stream_events(
        workflow_exec_id=execution.execution_id,
    ):
        print(f"[{event.stream}] {event.data}")
```

### Filtering by Stream Name
Only receive events from a specific stream:
```python
async for event in client.workflows.events.get_stream_events(
    workflow_exec_id=execution_id,
    stream="token",  # Only token events
):
    print(event.data)
```

### Detecting Completion
Use SDK event types to detect when a workflow finishes:
```python
async for event in client.workflows.events.get_stream_events(
    workflow_exec_id=execution_id,
):
    if event.data is None:
        continue

    event_type = event.data.event_type
    if event_type == "WORKFLOW_EXECUTION_COMPLETED":
        print("Done!")
        break
    elif event_type in ("WORKFLOW_EXECUTION_FAILED", "WORKFLOW_EXECUTION_CANCELED"):
        print(f"Ended: {event_type}")
        break
    else:
        record_event(event)
```

### Resume from Sequence
If your connection drops, resume from where you left off:
```python
import asyncio

last_seq = 0
while True:
    try:
        async for event in client.workflows.events.get_stream_events(
            workflow_exec_id=execution_id,
            start_seq=last_seq,
        ):
            last_seq = event.sequence
            process(event)
        break  # Completed normally
    except ConnectionError:
        await asyncio.sleep(1)  # Loop will resume from last_seq
```

## API Routes
### Stream Events Endpoint

```
GET /v1/workflows/executions/{execution_id}/stream
```

Query parameters for filtering:
| Parameter | Description | Example |
|---|---|---|
| stream | Filter by stream name | ?stream=token |
| workflow_name | Filter by workflow name | ?workflow_name=my-workflow |
| workflow_exec_id | Filter by execution ID | ?workflow_exec_id=exec789 |
| root_workflow_exec_id | Filter by root workflow exec ID | ?root_workflow_exec_id=root123 |
| parent_workflow_exec_id | Filter by parent workflow exec ID | ?parent_workflow_exec_id=parent456 |
| activity_name | Filter by activity name | ?activity_name=chat_activity |
| activity_id | Filter by activity execution ID | ?activity_id=activity123 |
| scope | workflow, activity, or * | ?scope=activity |
| start_seq | Resume from sequence number | ?start_seq=42 |
| metadata_filters | Filter by metadata (JSON object) | ?metadata_filters={"key":"v"} |
| workflow_event_types | Filter by workflow event types | ?workflow_event_types=WORKFLOW_EXECUTION_COMPLETED |
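When calling the raw endpoint directly, the filters above are ordinary query parameters, and metadata_filters must be JSON-encoded before URL-encoding. A minimal helper for assembling the query string (illustrative only, not part of the SDK; the parameter names come from the table above):

```python
import json
from urllib.parse import urlencode

def build_stream_query(stream=None, scope=None, start_seq=None, metadata_filters=None):
    """Assemble the query string for the stream endpoint from optional filters."""
    params = {}
    if stream is not None:
        params["stream"] = stream
    if scope is not None:
        params["scope"] = scope
    if start_seq is not None:
        params["start_seq"] = start_seq
    if metadata_filters is not None:
        # metadata_filters is a JSON object serialized into the query string
        params["metadata_filters"] = json.dumps(metadata_filters, separators=(",", ":"))
    return urlencode(params)
```

For example, build_stream_query(stream="token", start_seq=42) yields stream=token&start_seq=42, and a metadata filter is percent-encoded JSON.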
### Example: Token Streaming
```bash
curl -N "http://localhost:8000/v1/workflows/executions/${EXEC_ID}/stream?stream=token"
```

Response (Server-Sent Events):

```
data: {"stream":"token","data":{"event_type":"CUSTOM_TASK_IN_PROGRESS",...},"broker_sequence":1,"workflow_context":{...}}
data: {"stream":"token","data":{"event_type":"CUSTOM_TASK_IN_PROGRESS",...},"broker_sequence":2,"workflow_context":{...}}
data: {"stream":"token","data":{"event_type":"WORKFLOW_EXECUTION_COMPLETED",...},"broker_sequence":3,"workflow_context":{...}}
```

### Example: All Events with Wildcard
```bash
# All events for an execution
curl -N "http://localhost:8000/v1/workflows/executions/${EXEC_ID}/stream"

# Only activity events
curl -N "http://localhost:8000/v1/workflows/executions/${EXEC_ID}/stream?scope=activity"

# Only workflow-level events
curl -N "http://localhost:8000/v1/workflows/executions/${EXEC_ID}/stream?scope=workflow"
```

## Sub-Workflow Events
When a workflow spawns sub-workflows, events maintain parent-child relationships:
```python
# Parent workflow
@workflows.workflow.define(name="parent-workflow")
class ParentWorkflow:
    @workflows.workflow.entrypoint
    async def run(self, params):
        # Sub-workflow events will have parent_workflow_exec_id set
        result = await workflows.workflow.execute_workflow(
            ChildWorkflow,
            params,
        )
        return result
```

### Consuming Events from Workflow Trees
```python
# Get events from the parent AND all children
async for event in client.workflows.events.get_stream_events(
    root_workflow_exec_id=parent_execution_id,
):
    if event.workflow_context.parent_workflow_exec_id:
        print(f"[child] {event.data}")
    else:
        print(f"[parent] {event.data}")
```

### Filtering by Root ID
Use the root_workflow_exec_id parameter to get all events in a workflow tree:
```python
async for event in client.workflows.events.get_stream_events(
    root_workflow_exec_id="root123",
):
    process(event)
```

## Event Types
The SDK uses typed event responses discriminated by event_type. Each event is a Pydantic model
(e.g. WorkflowExecutionCompletedResponse, CustomTaskInProgressResponse) with fields like
event_id, event_timestamp, workflow_exec_id, workflow_name, and type-specific attributes.
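The discriminator pattern can be illustrated with plain dataclasses (a sketch only; the SDK's actual Pydantic models carry more fields, and the class names here are simplified stand-ins):

```python
from dataclasses import dataclass

@dataclass
class WorkflowExecutionCompleted:
    event_id: str
    workflow_exec_id: str

@dataclass
class CustomTaskInProgress:
    event_id: str
    workflow_exec_id: str

# The event_type string selects which typed class a payload parses into.
EVENT_CLASSES = {
    "WORKFLOW_EXECUTION_COMPLETED": WorkflowExecutionCompleted,
    "CUSTOM_TASK_IN_PROGRESS": CustomTaskInProgress,
}

def parse_event(payload: dict):
    """Dispatch a raw event payload to its typed class via event_type."""
    cls = EVENT_CLASSES[payload["event_type"]]
    return cls(event_id=payload["event_id"], workflow_exec_id=payload["workflow_exec_id"])
```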
| Event Type | Description |
|---|---|
| WORKFLOW_EXECUTION_STARTED | Workflow started |
| WORKFLOW_EXECUTION_COMPLETED | Workflow finished successfully |
| WORKFLOW_EXECUTION_FAILED | Workflow failed |
| WORKFLOW_EXECUTION_CANCELED | Workflow was canceled |
| WORKFLOW_EXECUTION_CONTINUED_AS_NEW | Workflow continued as new |
| WORKFLOW_TASK_TIMED_OUT | Workflow task timed out |
| WORKFLOW_TASK_FAILED | Workflow task failed |
| CUSTOM_TASK_STARTED | Custom task started |
| CUSTOM_TASK_IN_PROGRESS | Custom task progress update |
| CUSTOM_TASK_COMPLETED | Custom task completed |
| CUSTOM_TASK_FAILED | Custom task failed |
| CUSTOM_TASK_TIMED_OUT | Custom task timed out |
| CUSTOM_TASK_CANCELED | Custom task canceled |
| ACTIVITY_TASK_STARTED | Activity started |
| ACTIVITY_TASK_COMPLETED | Activity finished |
| ACTIVITY_TASK_RETRYING | Activity is retrying |
| ACTIVITY_TASK_FAILED | Activity failed |
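For completion detection, the terminal workflow event types from this table can be collected in one place. Whether WORKFLOW_EXECUTION_CONTINUED_AS_NEW should stop your consumer depends on whether you follow the new run, so treat this as a sketch rather than a definitive list:

```python
# Terminal workflow event types, taken from the table above.
TERMINAL_EVENT_TYPES = frozenset({
    "WORKFLOW_EXECUTION_COMPLETED",
    "WORKFLOW_EXECUTION_FAILED",
    "WORKFLOW_EXECUTION_CANCELED",
    "WORKFLOW_EXECUTION_CONTINUED_AS_NEW",
})

def is_terminal(event_type: str) -> bool:
    """Return True when an event type marks the end of a workflow execution."""
    return event_type in TERMINAL_EVENT_TYPES
```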
### Example Event

```json
{
  "stream": "token",
  "broker_sequence": 1,
  "timestamp": "2025-01-15T10:30:00Z",
  "data": {
    "event_type": "WORKFLOW_EXECUTION_STARTED",
    "event_id": "evt_abc123",
    "event_timestamp": 1736938200000000000,
    "workflow_name": "my-workflow",
    "workflow_exec_id": "abc123",
    "root_workflow_exec_id": "abc123",
    "parent_workflow_exec_id": null,
    "workflow_run_id": "run_456",
    "attributes": {}
  },
  "workflow_context": {
    "workflow_name": "my-workflow",
    "workflow_exec_id": "abc123",
    "parent_workflow_exec_id": null
  }
}
```

## Best Practices
### 1. Always Handle Disconnection
```python
async def resilient_consume(client, exec_id):
    max_retries = 10
    last_seq = 0  # Start from the beginning on the first attempt
    for attempt in range(max_retries):
        try:
            async for event in client.workflows.events.get_stream_events(
                workflow_exec_id=exec_id,
                start_seq=last_seq,
            ):
                last_seq = event.sequence
                yield event
                if is_terminal_status(event):
                    return
        except ConnectionError:
            await asyncio.sleep(min(2 ** attempt, 30))
```

### 2. Filter Early
Filter at the subscription level, not in your code:
```python
# Good: Filter at source
async for event in client.workflows.events.get_stream_events(exec_id, stream="token"):
    process(event)

# Bad: Filter in code (wastes bandwidth)
async for event in client.workflows.events.get_stream_events(exec_id):
    if event.stream == "token":
        process(event)
```

### 3. Use Appropriate Scope
If you only care about activity events:
```python
async for event in client.workflows.events.get_stream_events(
    workflow_exec_id=exec_id, scope="activity"
):
    if event.data is not None:
        # Only activity events, no workflow-level events
        process(event.data)
```
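Finally, if you consume the raw SSE endpoint from API Routes without the SDK, each data: line carries one JSON-encoded event. A minimal parser for such lines (a sketch; real SSE streams may also interleave comments and keep-alive lines, which this skips):

```python
import json

def parse_sse_data_line(line: str):
    """Parse a single 'data: {...}' SSE line into a dict; return None for other lines."""
    prefix = "data: "
    if not line.startswith(prefix):
        return None  # blank keep-alives, ":" comments, id/event fields, etc.
    return json.loads(line[len(prefix):])
```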