Limitation du débit dans les workflows
La limitation du débit est un aspect essentiel de la gestion des workflows, permettant de contrôler l’utilisation des ressources et d’éviter qu’un workflow ou une activité ne monopolise les ressources partagées.
Fonctionnement de la limitation du débit
Les limites de débit sont toujours partagées entre tous les workers et workflows d’un même espace de tâches (ou dans la TEMPORAL_TASK_QUEUE si précisé). Le paramètre key contrôle la manière dont les activités partagent ces limites :
Cas 1 : Limite par activité (pas de clé)
Quand aucune clé n’est fournie, la limite de débit s’applique à l’activité elle-même et est partagée par tous les workers et workflows qui l’utilisent.
À utiliser lorsque : vous souhaitez protéger une ressource partagée (comme un client API) qui doit disposer d’une limite globale, quel que soit le workflow qui l’utilise.
Cas 2 : Limite transversale avec clé
Quand une clé est fournie, plusieurs activités différentes qui utilisent la même clé partagent la même limite de débit à travers tous les workers et workflows.
À utiliser lorsque : il est nécessaire de coordonner une limite globale sur plusieurs activités différentes (par exemple, limiter le nombre total d’appels API en lecture, écriture et suppression utilisant le même service externe).
import mistralai.workflows as workflows
from mistralai.client import Mistral
from mistralai.workflows import Depends
from pydantic import BaseModel
def get_mistral_client() -> Mistral:
"""Creates a shared chat completion client with rate limiting"""
client = Mistral(
api_key="your_api_key",
)
return client
class CompletionParams(BaseModel):
model: str
messages: list
@workflows.activity(rate_limit=workflows.RateLimit(time_window_in_sec=1, max_execution=100))
async def generate_chat_response(
params: CompletionParams,
client: Mistral = Depends(get_mistral_client)
) -> dict:
"""Generates a chat response using a shared client"""
# This activity can be called from multiple workflows
# but will share the same rate limit across all of them
return await client.chat.complete_async(model=params.model, messages=params.messages)Comportement : Tous les workflows appelant generate_chat_response partagent la même limite de 100 exécutions/seconde. Si le workflow A effectue 60 appels et le workflow B 50 appels dans la même seconde, ils consomment le même quota.
Avec une clé : Ajoutez key="mistral_api" pour partager cette limite sur plusieurs activités (par exemple, generate_chat_response, generate_embeddings, moderate_content).
Clés de limitation de débit et limites de pollers
Chaque clé de limitation distincte entraîne la création d’un worker Temproral interne supplémentaire par le SDK. Ajoutés au worker principal et au worker sticky-activity, un même processus worker exécute :
- 1 worker principal (workflows + activités par défaut)
- 1 worker sticky-activity
- 1 worker supplémentaire par clé de limitation distincte
Chaque worker interne ouvre 5 pollers workflows et 5 pollers activités. Avec de nombreuses clés différentes, le nombre total de pollers augmente rapidement :
| Clés distinctes | Workers internes | Nombre total de pollers |
|---|---|---|
| 0 | 2 | 20 |
| 5 | 7 | 70 |
| 12 | 14 | 140 |
Si le total dépasse le quota de pollers concurrents de votre namespace, vous verrez :
ResourceExhausted: namespace concurrent poller limit exceededRéduisez au maximum le nombre de clés distinctes. Les activités utilisant la même ressource externe et le même budget peuvent toutes utiliser la même clé :
# ❌ À éviter — chaque clé unique ajoute un worker interne
@workflows.activity(rate_limit=workflows.RateLimit(time_window_in_sec=1, max_execution=10, key="ocr"))
async def run_ocr(params: OcrParams) -> OcrResult: ...
@workflows.activity(rate_limit=workflows.RateLimit(time_window_in_sec=1, max_execution=10, key="classify"))
async def classify_document(params: ClassifyParams) -> ClassifyResult: ...
# ✅ Préférer — partager une seule clé pour les activités sur la même ressource
@workflows.activity(rate_limit=workflows.RateLimit(time_window_in_sec=1, max_execution=10, key="document_pipeline"))
async def run_ocr(params: OcrParams) -> OcrResult: ...
@workflows.activity(rate_limit=workflows.RateLimit(time_window_in_sec=1, max_execution=10, key="document_pipeline"))
async def classify_document(params: ClassifyParams) -> ClassifyResult: ...La seconde approche utilise un seul worker interne pour les deux activités, ce qui divise par deux le nombre de pollers pour ce couple d’activités.