Sub-workflows

A workflow can execute other workflows as sub-workflows, enabling hierarchical orchestration patterns.

Basic usage

Basic usage

Execute a sub-workflow and wait for its result:

import asyncio
import mistralai.workflows as workflows
from pydantic import BaseModel


@workflows.activity()
async def process_item(value: str) -> str:
    return f"processed:{value}"


class ChildInput(BaseModel):
    value: str


@workflows.workflow.define(name="child_workflow")
class ChildWorkflow:
    @workflows.workflow.entrypoint
    async def run(self, params: ChildInput) -> str:
        return await process_item(params.value)


class ParentInput(BaseModel):
    data: str


@workflows.workflow.define(name="parent_workflow")
class ParentWorkflow:
    @workflows.workflow.entrypoint
    async def run(self, params: ParentInput) -> str:
        child_result = await workflows.execute_workflow(
            ChildWorkflow, params=ChildInput(value=params.data)
        )
        return f"Parent got: {child_result}"


async def main():
    result = await workflows.execute_workflow(
        ParentWorkflow,
        params=ParentInput(data="hello"),
    )
    print(result)
Fire and forget

Fire and forget

Start a sub-workflow without waiting for its result by passing wait=False:

handle = await workflow.execute_workflow(
    ChildWorkflow,
    params=child_params,
    execution_timeout=timedelta(hours=1),
    wait=False,
)
# Parent continues immediately — child runs independently
# Optionally await later: result = await handle

By default, wait=False sets the parent close policy to ABANDON, so the child continues running even if the parent completes.

Parent close policy

Parent close policy

Override the default close policy with the parent_close_policy parameter:

from mistralai.workflows import ParentClosePolicy

handle = await workflow.execute_workflow(
    ChildWorkflow,
    params=child_params,
    execution_timeout=timedelta(hours=1),
    wait=False,
    parent_close_policy=ParentClosePolicy.TERMINATE,
)