Programmation des workflows
Exécutez des workflows de manière récurrente selon une planification cron, avec des politiques pour la reprise et le chevauchement.
Fonctionnement
Les workers enregistrent leurs planifications de workflows auprès de la plateforme au démarrage. La plateforme déclenche le workflow à l’heure prévue et applique les politiques définies pour la reprise et le chevauchement.
Les modifications de la planification nécessitent un redémarrage du worker pour prendre effet. Le worker lit les définitions de planification uniquement au démarrage : les changements des expressions cron ou des politiques dans votre code ne sont pas pris en compte par un worker déjà lancé.
Définir des planifications
Ajoutez des planifications à vos workflows à l’aide d’expressions cron. Les planifications utilisent UTC par défaut : convertissez depuis votre fuseau horaire local avant d’écrire l’expression.
import mistralai.workflows as workflows
from mistralai.workflows.models import ScheduleDefinition
schedule = ScheduleDefinition(
input={"report_type": "daily"},
cron_expressions=["0 0 * * *"] # Tous les jours à minuit UTC
)
@workflows.workflow.define(name="report_workflow", schedules=[schedule])
class ReportWorkflow:
@workflows.workflow.entrypoint
async def run(self, report_type: str = "daily") -> None:
# Générer le rapport
passDéfinir les politiques de planification
Les politiques contrôlent ce qui se passe lorsque des exécutions sont manquées ou se chevauchent :
catchup_window_seconds— Si la plateforme était indisponible ou a manqué des exécutions planifiées, elle déclenche rétroactivement toutes les exécutions manquées dans cette fenêtre temporelle. Les exécutions plus anciennes sont ignorées.overlap— Définit le comportement en cas de nouvelle exécution prévue alors que la précédente est toujours en cours :SKIP— ignore la nouvelle exécution. À utiliser si seules les données les plus récentes vous importent et que des exécutions parallèles feraient des doublons (ex : synchronisations périodiques).BUFFER_ONE— place une exécution en attente, ignore les suivantes tant que celle en attente n’a pas commencé. Pour garantir au moins une exécution de rattrapage sans accumulation excessive.ALLOW_ALL— démarre chaque exécution planifiée en concurrence. À réserver aux cas où chaque exécution est indépendante et la charge supporte le parallélisme.
import mistralai.workflows as workflows
from mistralai.workflows.models import ScheduleDefinition, SchedulePolicy, ScheduleOverlapPolicy
# Remplacer la politique de planification par défaut
schedule_policy = SchedulePolicy(
catchup_window_seconds=86400, # Autoriser 1 jour de rattrapage
overlap=ScheduleOverlapPolicy.SKIP, # Ignorer les exécutions qui se chevauchent
)
schedule = ScheduleDefinition(
input={"report_type": "daily"},
cron_expressions=["0 0 * * *"], # Tous les jours à minuit UTC
policy=schedule_policy
)
@workflows.workflow.define(name="report_workflow", schedules=[schedule])
class ReportWorkflow:
@workflows.workflow.entrypoint
async def run(self, report_type: str = "daily") -> None:
# Générer le rapport
passGestion des planifications
Les planifications peuvent être gérées de deux manières :
- Dans le code (recommandé dans la plupart des cas) : déclarez les planifications dans le workflow via
@workflows.workflow.define(..., schedules=[...]). Les workers les enregistrent au démarrage. Toute modification duScheduleDefinitionsera prise en compte après un redémarrage du worker. - Via l’API REST :
POST /v1/workflows/schedulespour créer,GET /v1/workflows/schedulespour lister, etDELETE /v1/workflows/schedules/{schedule_id}pour supprimer. Utilisez l’API si vous souhaitez gérer indépendamment les planifications des déploiements de workers (ex : planifications ponctuelles ou pilotées par un système externe).
Tous les workers doivent utiliser la même définition de planification : chaque worker d’un workflow enregistre sa propre copie des planifications définies dans le code. Si deux workers exécutent des versions différentes d’un workflow avec des ScheduleDefinition divergentes, la plateforme détecte des enregistrements en conflit et le comportement devient indéterminé. Déployez les mises à jour de workers de façon à ce qu’ils convergent tous sur la même définition.
Exemple complet
import mistralai.workflows as workflows
from mistralai.workflows.models import ScheduleDefinition, SchedulePolicy, ScheduleOverlapPolicy
# Exécuter chaque samedi à 3h00 UTC
backup_schedule = ScheduleDefinition(
input={"retention_days": 30},
cron_expressions=["0 3 * * 6"],
policy=SchedulePolicy(
catchup_window_seconds=604800, # 7 jours
overlap=ScheduleOverlapPolicy.SKIP,
)
)
@workflows.workflow.define(name="database_backup_workflow", schedules=[backup_schedule])
class DatabaseBackupWorkflow:
@workflows.workflow.entrypoint
async def run(self, retention_days: int = 30) -> None:
print(f"Début de la sauvegarde avec une rétention de {retention_days} jours")
# Implémentation de la sauvegarde ici
# Démarrer le worker avec :
# asyncio.run(workflows.run_worker([DatabaseBackupWorkflow]))Notes
- Les expressions cron suivent la syntaxe standard à 5 champs (
minute hour day-of-month month day-of-week). - Un workflow peut avoir plusieurs
cron_expressions: elles sont toutes déclenchées de manière indépendante. - Chaque planification transmet sa propre charge utile
inputau point d’entrée du workflow. - Les modifications de la planification nécessitent un redémarrage du worker pour être prises en compte.