Programmation des workflows

Exécutez des workflows de manière récurrente selon une planification cron, avec des politiques pour la reprise et le chevauchement.

Fonctionnement

Fonctionnement

Les workers enregistrent leurs planifications de workflows auprès de la plateforme au démarrage. La plateforme déclenche le workflow à l’heure prévue et applique les politiques définies pour la reprise et le chevauchement.

Note

Les modifications de la planification nécessitent un redémarrage du worker pour prendre effet. Le worker lit les définitions de planification uniquement au démarrage : les changements des expressions cron ou des politiques dans votre code ne sont pas pris en compte par un worker déjà lancé.

Définir des planifications

Définir des planifications

Ajoutez des planifications à vos workflows à l’aide d’expressions cron. Les planifications utilisent UTC par défaut : convertissez depuis votre fuseau horaire local avant d’écrire l’expression.

import mistralai.workflows as workflows
from mistralai.workflows.models import ScheduleDefinition

schedule = ScheduleDefinition(
    input={"report_type": "daily"},
    cron_expressions=["0 0 * * *"]  # Tous les jours à minuit UTC
)

@workflows.workflow.define(name="report_workflow", schedules=[schedule])
class ReportWorkflow:
    @workflows.workflow.entrypoint
    async def run(self, report_type: str = "daily") -> None:
        # Générer le rapport
        pass
Définir les politiques de planification

Définir les politiques de planification

Les politiques contrôlent ce qui se passe lorsque des exécutions sont manquées ou se chevauchent :

  • catchup_window_seconds — Si la plateforme était indisponible ou a manqué des exécutions planifiées, elle déclenche rétroactivement toutes les exécutions manquées dans cette fenêtre temporelle. Les exécutions plus anciennes sont ignorées.
  • overlap — Définit le comportement en cas de nouvelle exécution prévue alors que la précédente est toujours en cours :
    • SKIP — ignore la nouvelle exécution. À utiliser si seules les données les plus récentes vous importent et que des exécutions parallèles feraient des doublons (ex : synchronisations périodiques).
    • BUFFER_ONE — place une exécution en attente, ignore les suivantes tant que celle en attente n’a pas commencé. Pour garantir au moins une exécution de rattrapage sans accumulation excessive.
    • ALLOW_ALL — démarre chaque exécution planifiée en concurrence. À réserver aux cas où chaque exécution est indépendante et la charge supporte le parallélisme.
import mistralai.workflows as workflows
from mistralai.workflows.models import ScheduleDefinition, SchedulePolicy, ScheduleOverlapPolicy

# Remplacer la politique de planification par défaut
schedule_policy = SchedulePolicy(
    catchup_window_seconds=86400,  # Autoriser 1 jour de rattrapage
    overlap=ScheduleOverlapPolicy.SKIP,  # Ignorer les exécutions qui se chevauchent
)

schedule = ScheduleDefinition(
    input={"report_type": "daily"},
    cron_expressions=["0 0 * * *"],  # Tous les jours à minuit UTC
    policy=schedule_policy
)

@workflows.workflow.define(name="report_workflow", schedules=[schedule])
class ReportWorkflow:
    @workflows.workflow.entrypoint
    async def run(self, report_type: str = "daily") -> None:
        # Générer le rapport
        pass
Gestion des planifications

Gestion des planifications

Les planifications peuvent être gérées de deux manières :

  • Dans le code (recommandé dans la plupart des cas) : déclarez les planifications dans le workflow via @workflows.workflow.define(..., schedules=[...]). Les workers les enregistrent au démarrage. Toute modification du ScheduleDefinition sera prise en compte après un redémarrage du worker.
  • Via l’API REST : POST /v1/workflows/schedules pour créer, GET /v1/workflows/schedules pour lister, et DELETE /v1/workflows/schedules/{schedule_id} pour supprimer. Utilisez l’API si vous souhaitez gérer indépendamment les planifications des déploiements de workers (ex : planifications ponctuelles ou pilotées par un système externe).
Avertissement

Tous les workers doivent utiliser la même définition de planification : chaque worker d’un workflow enregistre sa propre copie des planifications définies dans le code. Si deux workers exécutent des versions différentes d’un workflow avec des ScheduleDefinition divergentes, la plateforme détecte des enregistrements en conflit et le comportement devient indéterminé. Déployez les mises à jour de workers de façon à ce qu’ils convergent tous sur la même définition.

Exemple complet

Exemple complet

import mistralai.workflows as workflows
from mistralai.workflows.models import ScheduleDefinition, SchedulePolicy, ScheduleOverlapPolicy

# Exécuter chaque samedi à 3h00 UTC
backup_schedule = ScheduleDefinition(
    input={"retention_days": 30},
    cron_expressions=["0 3 * * 6"],
    policy=SchedulePolicy(
        catchup_window_seconds=604800,  # 7 jours
        overlap=ScheduleOverlapPolicy.SKIP,
    )
)

@workflows.workflow.define(name="database_backup_workflow", schedules=[backup_schedule])
class DatabaseBackupWorkflow:
    @workflows.workflow.entrypoint
    async def run(self, retention_days: int = 30) -> None:
        print(f"Début de la sauvegarde avec une rétention de {retention_days} jours")
        # Implémentation de la sauvegarde ici

# Démarrer le worker avec :
# asyncio.run(workflows.run_worker([DatabaseBackupWorkflow]))
Notes

Notes

  • Les expressions cron suivent la syntaxe standard à 5 champs (minute hour day-of-month month day-of-week).
  • Un workflow peut avoir plusieurs cron_expressions : elles sont toutes déclenchées de manière indépendante.
  • Chaque planification transmet sa propre charge utile input au point d’entrée du workflow.
  • Les modifications de la planification nécessitent un redémarrage du worker pour être prises en compte.