Sessions workers persistantes
Les sessions workers persistantes acheminent plusieurs activités vers la même instance de worker, ce qui permet de réutiliser les ressources et de conserver l’état entre différents appels d’activité.
Quand utiliser
Optez pour les sessions persistantes lorsque plusieurs activités consécutives doivent accéder à la même ressource coûteuse : un modèle de ML déjà chargé, une connexion base de données ouverte, un cache préchauffé. En reliant tous les appels au même worker, vous chargez la ressource une seule fois et la réutilisez pour toute la séquence, au lieu de payer le coût d’initialisation à chaque activité. Même logique pour les traitements avec état : si l’étape N+1 dépend d’un état en mémoire construit à l’étape N, une session persistante garantit l’accès à cet état sans avoir à le sérialiser via l’orchestrateur.
Le compromis concerne la disponibilité. Une session persistante est liée à un worker particulier : si ce worker tombe en panne ou est arrêté, la session se termine et les activités restantes doivent redémarrer sur un autre worker (potentiellement avec un cache froid). Les sessions longues mobilisent aussi durablement la capacité du worker, ce qui est à éviter pour des workflows devant tourner pendant des heures, ou quand le coût d’initialisation à chaque activité reste faible.
Utilisation de base
from mistralai.workflows import run_sticky_worker_session
# État côté worker
_loaded_model = None
@workflows.activity(sticky_to_worker=True)
async def load_model() -> None:
global _loaded_model
if _loaded_model is None:
_loaded_model = load_expensive_model()
@workflows.activity(sticky_to_worker=True)
async def predict(data: dict) -> dict:
return _loaded_model.predict(data)
@workflows.workflow.define(name="ml-inference")
class MLInferenceWorkflow:
@workflows.workflow.entrypoint
async def execute(self, batch: list[dict]) -> list[dict]:
# Toutes les activités s’exécutent sur le même worker
async with run_sticky_worker_session():
await load_model() # Chargé une fois
results = [await predict(item) for item in batch]
return resultsRéutilisation de session
Capturez explicitement une session à réutiliser dans plusieurs contextes :
@workflows.workflow.define(name="multi-batch")
class MultiBatchWorkflow:
@workflows.workflow.entrypoint
async def execute(self, batches: list[list[dict]]) -> list[list[dict]]:
# Capture le worker une seule fois
session = await get_sticky_worker_session()
all_results = []
for batch in batches:
async with run_sticky_worker_session(session):
results = [await process_item(item) for item in batch]
all_results.append(results)
return all_resultsLimites
État uniquement en mémoire : l’état au niveau du worker vit dans la mémoire du processus. Il est perdu en cas de redémarrage ou redéploiement du worker et n’est pas partagé entre différents workers. Utilisez une base de données ou un stockage externe pour tout état qui doit persister ou être accessible ailleurs.
La session se rompt en cas d’échec du worker :
- Si le worker tombe en panne ou est arrêté, la session s’arrête.
- Les activités suivantes sont dirigées vers un autre worker.
- Concevez les activités pour supporter les redémarrages à froid.
Contention sur les ressources du worker :
- Les sessions longues mobilisent durablement la capacité d’un worker.
- Peut générer des points chauds si de nombreux workflows ciblent le même worker.
- Surveillez l’utilisation des workers pour éviter la saturation des ressources.
Le paramètre sticky_to_worker ne s’applique pas aux activités locales, qui s’exécutent déjà dans le processus worker du workflow.
Pour une comparaison des différents types d’activités (régulières, persistantes ou locales), consultez Choisir un type d’activité sur la page parente Activités.