Activités

Une activité est l’endroit où le travail effectif s’exécute : envoi d’un email, appel à une API externe, exécution d’une requête en base de données, traitement d’un fichier, génération d’une réponse par un modèle d’IA. Toute opération qui interagit avec le monde extérieur, réalise des E/S ou produit des effets de bord doit être placée dans une activité, jamais dans le workflow lui-même.

Les activités sont de simples fonctions asynchrones. Aucune contrainte de déterminisme : elles peuvent appeler n’importe quoi, utiliser n’importe quelle bibliothèque, lire sur le disque, écrire dans une base. La plateforme considère chaque appel d’activité comme une unité atomique. En cas d’échec, l’activité est relancée automatiquement selon une politique configurable. Dès qu’elle réussit, son résultat est enregistré dans l’historique d’exécution et le workflow poursuit.

Puisque les résultats sont persistés, une activité réussie n’est jamais rejouée lors d’un replay : seul son résultat enregistré est utilisé. Cependant, si une activité échoue ou expire pendant son exécution, la plateforme la relance, parfois après que certains effets de bord aient déjà été appliqués partiellement.

Cela signifie que les activités doivent être sûres à réexécuter : une relance après un échec partiel doit laisser le système dans un état cohérent. On parle parfois d’idempotence, mais la propriété recherchée est que les effets de bord convergent vers un état observable stable, pas nécessairement que la sortie soit strictement identique à l’octet près.

Par exemple, une activité LLM peut retourner des complétions différentes à chaque essai, mais l’activité reste réexécutable en toute sécurité, car son effet de bord (enregistrer une complétion) ne s’effectue qu’une seule fois pour chaque tentative réussie.

Définir une activité

Définir une activité

import mistralai.workflows as workflows

@workflows.activity()
async def fetch_user_data(user_id: str) -> dict:
    response = await http_client.get(f"/users/{user_id}")
    return response.json()

On appelle les activités directement avec await dans le code du workflow :

@workflows.workflow.define(name="user_report")
class UserReportWorkflow:
    @workflows.workflow.entrypoint
    async def run(self, user_id: str) -> dict:
        user = await fetch_user_data(user_id)
        report = await generate_report(user)
        return report
Politique de relance

Politique de relance

Définissez les politiques de relance et les timeouts dans le décorateur. Il est indispensable de renseigner start_to_close_timeout : sans cela, une activité bloquée peut geler indéfiniment :

from datetime import timedelta

@workflows.activity(
    start_to_close_timeout=timedelta(minutes=5),
    retry_policy_max_attempts=3,
    retry_policy_backoff_coefficient=2.0,
)
async def call_external_api(params: ApiParams) -> ApiResponse:
    async with httpx.AsyncClient() as client:
        response = await client.post(params.url, json=params.data)
        return ApiResponse(data=response.json())
Activités longues : battements (heartbeats)

Activités longues : battements (heartbeats)

Pour les activités qui durent plus de quelques minutes, utilisez les battements pour signaler que l’activité est toujours vivante. Un battement est un ping périodique envoyé à la plateforme ; il peut aussi embarquer la progression, afin de permettre à la reprise de repartir depuis le dernier checkpoint après une relance, sans tout recommencer.

Si la plateforme ne reçoit plus de battement dans le délai de heartbeat_timeout défini, elle considère l’instance comme morte et replanifie l’activité sur un autre worker.

from datetime import timedelta
import temporalio.activity

@workflows.activity(
    start_to_close_timeout=timedelta(minutes=30),
    heartbeat_timeout=timedelta(seconds=30),
)
async def process_large_file(path: str) -> int:
    processed = 0
    async for chunk in read_chunks(path):
        await do_work(chunk)
        processed += 1
        temporalio.activity.heartbeat(processed)  # ping + checkpoint
    return processed

Voir Créer des workflows > Activités > Bases pour toutes les options de configuration.

Limitations

Limitations

Utilisez des bibliothèques async pour l’E/S. Les appels bloquants gênent tous les workers en concurrence :

# ✅ HTTP asynchrone
async with httpx.AsyncClient() as client:
    response = await client.post(url, json=data)

# ❌ HTTP bloquant : bloque le worker
response = requests.post(url, json=data)

Limite de 2 Mo en entrée/sortie. Chaque activité est limitée à 2 Mo maximum en entrée et en sortie. Pour l’envoi de payloads plus volumineux, voir Externalisation des charges utiles.