Workflows enfants
Un workflow peut exécuter d'autres workflows en tant que workflows enfants, ce qui permet des schémas d'orchestration hiérarchiques.
Quand utiliser un workflow enfant vs une activité
Les deux permettent de composer du travail — choisissez selon la nécessité d'un historique durable séparé :
| Utilisez un workflow enfant si... | Utilisez une activité si... |
|---|---|
| L'unité de travail est elle-même de longue durée, avec ses propres relances ou nécessite son propre historique d'événements que vous pouvez rejouer indépendamment | L'unité de travail est un effet de bord unique (écriture en base, appel d'API, appel LLM) qui doit être relancé comme un tout |
| Vous souhaitez inspecter, signaler ou interroger ce sous-processus depuis l'extérieur | Vous n'avez pas besoin d'observabilité externe au-delà de l'historique du parent |
| Vous voulez l'exécuter sur un autre worker / déploiement | Le worker parent peut l'exécuter directement |
Les workflows enfants sont plus lourds que les activités : ils ont leur propre exécution, historique et politique de relance. Privilégiez d'abord une activité ; n'utilisez un workflow enfant que si vous avez besoin de sa durabilité indépendante.
Utilisation de base
Exécutez un workflow enfant et attendez son résultat :
import asyncio
import mistralai.workflows as workflows
from pydantic import BaseModel
@workflows.activity()
async def process_item(value: str) -> str:
return f"processed:{value}"
class ChildInput(BaseModel):
value: str
@workflows.workflow.define(name="child_workflow")
class ChildWorkflow:
@workflows.workflow.entrypoint
async def run(self, params: ChildInput) -> str:
return await process_item(params.value)
class ParentInput(BaseModel):
data: str
@workflows.workflow.define(name="parent_workflow")
class ParentWorkflow:
@workflows.workflow.entrypoint
async def run(self, params: ParentInput) -> str:
child_result = await workflows.execute_workflow(
ChildWorkflow, params=ChildInput(value=params.data)
)
return f"Parent got: {child_result}"
async def main():
result = await workflows.execute_workflow(
ParentWorkflow,
params=ParentInput(data="hello"),
)
print(result)Exécution sans attendre
Lancez un workflow enfant sans attendre son résultat en passant wait=False :
handle = await workflow.execute_workflow(
ChildWorkflow,
params=child_params,
execution_timeout=timedelta(hours=1),
wait=False,
)
# Le parent poursuit immédiatement — l'enfant s'exécute de façon indépendante
# Vous pouvez attendre le résultat plus tard si besoin : result = await handlePar défaut, wait=False place la politique de fermeture du parent à ABANDON, donc le workflow enfant poursuit son exécution même si le parent se termine.
Politique de fermeture du parent
Redéfinissez la politique de fermeture par défaut avec le paramètre parent_close_policy :
from mistralai.workflows import ParentClosePolicy
handle = await workflow.execute_workflow(
ChildWorkflow,
params=child_params,
execution_timeout=timedelta(hours=1),
wait=False,
parent_close_policy=ParentClosePolicy.TERMINATE,
)Observation des événements enfant
Chaque workflow enfant possède son propre ID d'exécution et son historique d'événements. Chaque événement contient workflow_context.parent_workflow_exec_id et workflow_context.root_workflow_exec_id, ce qui vous permet de vous abonner à un parent et à tous ses descendants en un seul flux, en filtrant sur root_workflow_exec_id.
Pour un workflow racine sans parent, parent_workflow_exec_id vaut null et root_workflow_exec_id correspond à l'ID d'exécution du workflow.
Voir Consommer des événements en streaming > Consommer les événements d'arbres de workflows pour le schéma d'abonnement.