Sub-workflows
A workflow can execute other workflows as sub-workflows, enabling hierarchical orchestration patterns.
Basic usage
Execute a sub-workflow and wait for its result:
import asyncio

import mistralai.workflows as workflows
from pydantic import BaseModel


@workflows.activity()
async def process_item(value: str) -> str:
    return f"processed:{value}"


class ChildInput(BaseModel):
    value: str


@workflows.workflow.define(name="child_workflow")
class ChildWorkflow:
    @workflows.workflow.entrypoint
    async def run(self, params: ChildInput) -> str:
        return await process_item(params.value)


class ParentInput(BaseModel):
    data: str


@workflows.workflow.define(name="parent_workflow")
class ParentWorkflow:
    @workflows.workflow.entrypoint
    async def run(self, params: ParentInput) -> str:
        # Run the child as a sub-workflow and wait for its result.
        child_result = await workflows.execute_workflow(
            ChildWorkflow, params=ChildInput(value=params.data)
        )
        return f"Parent got: {child_result}"


async def main():
    result = await workflows.execute_workflow(
        ParentWorkflow,
        params=ParentInput(data="hello"),
    )
    print(result)
Fire and forget
Start a sub-workflow without waiting for its result by passing wait=False:
from datetime import timedelta

# Inside a workflow entrypoint:
handle = await workflows.execute_workflow(
    ChildWorkflow,
    params=child_params,
    execution_timeout=timedelta(hours=1),
    wait=False,
)
# Parent continues immediately; the child runs independently.
# Optionally await later: result = await handle

By default, wait=False sets the parent close policy to ABANDON, so the child continues running even after the parent completes.
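The difference between waiting and fire-and-forget can be pictured with plain asyncio tasks. The sketch below is only an analogy built on the standard library, not the mistralai.workflows API: `child`, `parent_waits`, and `parent_fire_and_forget` are hypothetical names, and an `asyncio.Task` stands in for the handle returned when wait=False.

```python
import asyncio


async def child(value: str, log: list[str]) -> str:
    # Stand-in for a sub-workflow that takes some time to finish.
    await asyncio.sleep(0.01)
    log.append(f"child done:{value}")
    return f"processed:{value}"


async def parent_waits(log: list[str]) -> str:
    # Analogous to the default behaviour: block until the child returns.
    result = await child("a", log)
    log.append("parent done")
    return result


async def parent_fire_and_forget(log: list[str]) -> str:
    # Analogous to wait=False: start the child and keep a handle to it.
    handle = asyncio.create_task(child("b", log))
    log.append("parent continues")  # runs before the child has finished
    # Optionally collect the result later by awaiting the handle.
    return await handle
```

In the second function the parent records its progress before the child finishes, which mirrors how a parent workflow keeps running after starting a child with wait=False.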
Parent close policy
Override the default close policy with the parent_close_policy parameter:
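from datetime import timedelta

from mistralai.workflows import ParentClosePolicy

# Inside a workflow entrypoint:
handle = await workflows.execute_workflow(
    ChildWorkflow,
    params=child_params,
    execution_timeout=timedelta(hours=1),
    wait=False,
    # Stop the child when the parent closes instead of abandoning it.
    parent_close_policy=ParentClosePolicy.TERMINATE,
)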
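To make the two policies named above concrete, here is a small conceptual model using only asyncio. It is a sketch under stated assumptions, not how the library implements close policies: `ClosePolicy` is a hypothetical local enum that borrows the ABANDON and TERMINATE names, and cancelling a task stands in for terminating a child workflow.

```python
import asyncio
from enum import Enum


class ClosePolicy(Enum):
    # Hypothetical stand-in for illustration; not the library's ParentClosePolicy.
    ABANDON = "abandon"
    TERMINATE = "terminate"


async def run_parent(policy: ClosePolicy) -> str:
    """Start a child, close the parent, and report the child's final state."""

    async def child() -> str:
        await asyncio.sleep(0.02)
        return "completed"

    child_task = asyncio.create_task(child())
    await asyncio.sleep(0)  # yield once so the child actually starts

    # The parent "closes" here and applies its policy to the running child.
    if policy is ClosePolicy.TERMINATE:
        child_task.cancel()

    # From this point the coroutine acts as an outside observer of the child.
    try:
        return await child_task
    except asyncio.CancelledError:
        return "terminated"
```

Calling `asyncio.run(run_parent(ClosePolicy.ABANDON))` lets the child run to completion, while `ClosePolicy.TERMINATE` stops it as soon as the parent closes.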