Limitation du débit dans les workflows

La limitation du débit est un aspect essentiel de la gestion des workflows, permettant de contrôler l’utilisation des ressources et d’éviter qu’un workflow ou une activité ne monopolise les ressources partagées.

Fonctionnement de la limitation du débit

Fonctionnement de la limitation du débit

Les limites de débit sont toujours partagées entre tous les workers et workflows d’un même espace de tâches (ou dans la TEMPORAL_TASK_QUEUE si précisé). Le paramètre key contrôle la manière dont les activités partagent ces limites :

Cas 1 : Limite par activité (pas de clé)

Cas 1 : Limite par activité (pas de clé)

Quand aucune clé n’est fournie, la limite de débit s’applique à l’activité elle-même et est partagée par tous les workers et workflows qui l’utilisent.

À utiliser lorsque : vous souhaitez protéger une ressource partagée (comme un client API) qui doit disposer d’une limite globale, quel que soit le workflow qui l’utilise.

Cas 2 : Limite transversale avec clé

Cas 2 : Limite transversale avec clé

Quand une clé est fournie, plusieurs activités différentes qui utilisent la même clé partagent la même limite de débit à travers tous les workers et workflows.

À utiliser lorsque : il est nécessaire de coordonner une limite globale sur plusieurs activités différentes (par exemple, limiter le nombre total d’appels API en lecture, écriture et suppression utilisant le même service externe).

Exemple : Client API mutualisé avec limite de débit (pas de clé)

Exemple : Client API mutualisé avec limite de débit (pas de clé)

import mistralai.workflows as workflows
from mistralai.client import Mistral
from mistralai.workflows import Depends
from pydantic import BaseModel

def get_mistral_client() -> Mistral:
    """Creates a shared chat completion client with rate limiting"""
    client = Mistral(
        api_key="your_api_key",
    )
    return client

class CompletionParams(BaseModel):
    model: str
    messages: list

@workflows.activity(rate_limit=workflows.RateLimit(time_window_in_sec=1, max_execution=100))
async def generate_chat_response(
    params: CompletionParams,
    client: Mistral = Depends(get_mistral_client)
) -> dict:
    """Generates a chat response using a shared client"""
    # This activity can be called from multiple workflows
    # but will share the same rate limit across all of them
    return await client.chat.complete_async(model=params.model, messages=params.messages)

Comportement : Tous les workflows appelant generate_chat_response partagent la même limite de 100 exécutions/seconde. Si le workflow A effectue 60 appels et le workflow B 50 appels dans la même seconde, ils consomment le même quota.

Avec une clé : Ajoutez key="mistral_api" pour partager cette limite sur plusieurs activités (par exemple, generate_chat_response, generate_embeddings, moderate_content).

Clés de limitation de débit et limites de pollers

Clés de limitation de débit et limites de pollers

Chaque clé de limitation distincte entraîne la création d’un worker Temproral interne supplémentaire par le SDK. Ajoutés au worker principal et au worker sticky-activity, un même processus worker exécute :

  • 1 worker principal (workflows + activités par défaut)
  • 1 worker sticky-activity
  • 1 worker supplémentaire par clé de limitation distincte

Chaque worker interne ouvre 5 pollers workflows et 5 pollers activités. Avec de nombreuses clés différentes, le nombre total de pollers augmente rapidement :

Clés distinctesWorkers internesNombre total de pollers
0220
5770
1214140

Si le total dépasse le quota de pollers concurrents de votre namespace, vous verrez :

ResourceExhausted: namespace concurrent poller limit exceeded

Réduisez au maximum le nombre de clés distinctes. Les activités utilisant la même ressource externe et le même budget peuvent toutes utiliser la même clé :

# ❌ À éviter — chaque clé unique ajoute un worker interne
@workflows.activity(rate_limit=workflows.RateLimit(time_window_in_sec=1, max_execution=10, key="ocr"))
async def run_ocr(params: OcrParams) -> OcrResult: ...

@workflows.activity(rate_limit=workflows.RateLimit(time_window_in_sec=1, max_execution=10, key="classify"))
async def classify_document(params: ClassifyParams) -> ClassifyResult: ...


# ✅ Préférer — partager une seule clé pour les activités sur la même ressource
@workflows.activity(rate_limit=workflows.RateLimit(time_window_in_sec=1, max_execution=10, key="document_pipeline"))
async def run_ocr(params: OcrParams) -> OcrResult: ...

@workflows.activity(rate_limit=workflows.RateLimit(time_window_in_sec=1, max_execution=10, key="document_pipeline"))
async def classify_document(params: ClassifyParams) -> ClassifyResult: ...

La seconde approche utilise un seul worker interne pour les deux activités, ce qui divise par deux le nombre de pollers pour ce couple d’activités.