Principes de base d’une activité

Les activités sont les unités de travail d’un workflow. Pour un aperçu conceptuel (atomicité, reprises, relecture), consultez Concepts de base > Activités. Cette page explique comment les définir, les configurer et les structurer.

Définir une activité

Définir une activité

Structure de base d’une activité :

import mistralai.workflows as workflows

@workflows.activity()
async def my_activity(input_data: str, count: int = 1) -> dict:
    """Activity implementation"""
    # Effectuez le travail ici
    return {"result": input_data, "processed_count": count}
Fonctionnalités clés des activités

Fonctionnalités clés des activités

Nommer une activité

Nommer une activité

Par défaut, une activité est enregistrée avec le nom de la fonction Python. Le paramètre name permet d’attribuer un nom personnalisé ; c’est cet identifiant qui sert à l’enregistrement, à l’acheminement à l’exécution et qui apparaît dans les traces et la console.

@workflows.activity(
    name="custom_activity_name",  # Utilisé pour l'enregistrement, l’exécution et l’observabilité
)

En savoir plus sur l’observabilité et la gestion des noms

Timeouts

Timeouts

start_to_close_timeout définit la durée maximale d’exécution d’une activité (du lancement au retour du résultat). Si cette durée est dépassée, l’activité est interrompue et considérée comme un échec (ce qui peut déclencher une reprise). Sans timeout, une activité non réactive bloque le workflow indéfiniment.

from datetime import timedelta

@workflows.activity(
    start_to_close_timeout=timedelta(minutes=10)
)
Politiques de reprise

Politiques de reprise

En cas d’échec, une activité est automatiquement relancée jusqu’à retry_policy_max_attempts fois. Le paramètre retry_policy_backoff_coefficient règle la progression exponentielle de l’attente entre deux reprises : un coefficient de 2.0 double le délai à chaque tentative (par exemple 1s, 2s, 4s, 8s). Une fois toutes les tentatives épuisées, l’échec est propagé au workflow.

@workflows.activity(
    retry_policy_max_attempts=5,
    retry_policy_backoff_coefficient=2.0
)
Affinité avec un worker

Affinité avec un worker

Pour associer une séquence d’activités au même worker (afin de partager une ressource en mémoire, comme un modèle chargé), placez sticky_to_worker=True et exécutez-les dans run_sticky_worker_session(). Pour le schéma complet, voir Sessions worker sticky.

Timeout de heartbeat

Timeout de heartbeat

Détectez les activités non réactives en exigeant des signaux de heartbeat réguliers :

from datetime import timedelta
from mistralai.workflows import activity

@workflows.activity(
    start_to_close_timeout=timedelta(minutes=30),
    heartbeat_timeout=timedelta(seconds=30)
)
async def long_running_task(items: list[str]) -> dict:
    results = []
    for i, item in enumerate(items):
        result = await process_item(item)
        results.append(result)
        # Signaler l’avancement pour éviter le timeout
        activity.heartbeat({"processed": i + 1, "total": len(items)})
    return {"results": results}

Quand heartbeat_timeout est défini, les activités doivent régulièrement appeler activity.heartbeat(). Si aucun heartbeat n’est reçu dans le délai imparti, l’activité est considérée comme échouée et une reprise est déclenchée. Ce mécanisme permet à la plateforme de réagir sans attendre la fin du start_to_close_timeout.

Cette sous-section fait référence pour le comportement des heartbeats ; les sections Concepts de base > Activités et Concepts de base > Workers renvoient ici.

Note
Granularité et gestion des erreurs

Granularité et gestion des erreurs

Nous recommandons de garder les activités aussi granulaires que possible. Découpez les tâches complexes en sous-activités simples, chacune encapsulant toute la logique susceptible d’échouer. Cette approche présente plusieurs avantages :

  • Meilleure isolation des erreurs : des activités simples facilitent l’identification et le traitement des échecs. En cas d’erreur, seule l’activité concernée est reprise.
  • Débogage facilité : plus une activité est courte, plus le point d’échec précis est facile à cibler.
  • Optimisation des reprises : des activités idempotentes, relançables indépendamment, garantissent que seule la partie du processus en échec est rejouée — un gain de temps et de ressources.

Par exemple, pour une suite : récupérer des données depuis une API, les traiter, puis les stocker en base, définissez trois activités distinctes :

  1. Récupération des données via l’API
  2. Traitement des données
  3. Stockage en base de données
Activités imbriquées

Activités imbriquées

Avertissement

Limite de reprise pour les activités imbriquées : Si une activité en appelle une autre, seule l’activité « principale » constitue la limite de reprise. L’état est sauvegardé avant cette activité, pas avant l’appel imbriqué. En cas d’échec de l’activité imbriquée, l’ensemble de l’activité principale est reprise depuis le début, y compris tout le travail déjà effectué avant l’appel. Pour isoler chaque étape, composez les activités directement au niveau du workflow.

Lorsque vous concevez des activités, tenez compte de la gestion de l’état, notamment dans des activités imbriquées. Si une activité est encapsulée dans une autre, seule l’activité parente est considérée pour les reprises et la gestion de l’état. L’état est sauvegardé avant l’activité imbriquée, et seule l’activité parente sera relancée en cas d’échec.

import mistralai.workflows as workflows

@workflows.activity()
async def parent_activity(input_data: str) -> dict:
    """Activité principale qui encapsule une activité imbriquée."""
    # Logique ici
    nested_result = await nested_activity(input_data)
    return {"result_data": nested_result["result_data"]}

@workflows.activity()
async def nested_activity(input_data: str) -> dict:
    """Activité imbriquée."""
    # Logique spécifique à cette activité
    return {"result_data": "processed_data"}

Dans cet exemple, si nested_activity échoue, toute l’activité parent_activity sera rejouée, pas uniquement l’activité imbriquée. L’état est sauvegardé avant l’appel de l’activité imbriquée.