Injection de dépendances
L'injection de dépendances permet de passer proprement des ressources partagées (connexions à une base de données, clients d'API, configurations) à des activités sans avoir à les créer manuellement à chaque appel.
Guide de démarrage
Ajoutez un paramètre dans une activité en utilisant Depends(provider) et le worker transmettra le résultat du provider à chaque appel :
import mistralai.workflows as workflows
from mistralai.workflows import Depends
async def get_db() -> Database:
return await Database.connect("postgres://...")
@workflows.activity()
async def insert_user(name: str, db: Database = Depends(get_db)) -> int:
return await db.insert(name)Le provider (get_db) est appelé une seule fois au démarrage du worker. Son résultat est réutilisé pour chaque appel à une activité.
Durée de vie du provider : les dépendances déclarées via Depends() sont initialisées une fois au démarrage du worker et partagées entre toutes les exécutions d’activités. La même instance est donc réutilisée à chaque appel : pools de connexions, clients d’API et objets de configuration sont créés une seule fois. Assurez-vous que la valeur renvoyée est sans risque à partager entre activités concurrentes.
Fonctionnement
Le système prend en charge différents styles de providers. Choisissez celui qui correspond au cycle de vie de votre ressource :
- Fonctions synchrones : valeurs de configuration simples ou objets en mémoire
- Fonctions asynchrones : dépendances nécessitant des opérations d’E/S, comme des connexions à une base de données
- Gestionnaires de contexte (sync ou async) : ressources nécessitant un nettoyage
- Générateurs (sync ou async) : flux de données ou cycles de vie complexes
Quand vous déclarez une dépendance dans votre activité, le système invoque le provider, gère son cycle de vie et transmet la valeur comme valeur par défaut du paramètre.
Définir des dépendances
Provider par fonction synchrone
Pour des valeurs de configuration ou objets qui ne nécessitent pas d'initialisation asynchrone :
def get_config() -> dict:
"""Fournit la configuration de l'application"""
return {
"timeout": 30,
"retries": 3,
"api_url": "https://api.example.com"
}Provider par fonction asynchrone
Pour les dépendances nécessitant une initialisation asynchrone, comme une connexion à une base de données :
async def get_db_connection() -> DatabaseConnection:
"""Crée et retourne une connexion à la base de données"""
conn = await DatabaseConnection.create("postgres://user:pass@localhost/db")
return connProvider via gestionnaire de contexte
Pour les ressources qui nécessitent un nettoyage propre, comme les sessions requérant une déconnexion :
from contextlib import contextmanager
@contextmanager
def get_logged_in_session():
"""Fournit une session avec gestion automatique de connexion/déconnexion"""
session = Session()
session.login()
try:
yield session
finally:
session.logout()Provider via gestionnaire de contexte asynchrone
Pour les ressources asynchrones nécessitant un nettoyage, par exemple une connexion à une base de données :
from contextlib import asynccontextmanager
@asynccontextmanager
async def get_db_connection_with_cleanup():
"""Fournit une connexion base de données avec nettoyage automatique"""
conn = await DatabaseConnection.create("postgres://...")
try:
yield conn
finally:
await conn.close()Provider par générateur
Pour du flux de données ou une gestion de ressources complexe :
def get_data_stream():
"""Fournit un flux de données"""
with open("data.txt") as f:
for line in f:
yield line.strip()Provider par générateur asynchrone
Pour des sources de données asynchrones en streaming :
import aiofiles
async def get_async_data_stream():
"""Fournit un flux de données asynchrone"""
async with aiofiles.open("data.txt") as f:
async for line in f:
yield line.strip()Utiliser les dépendances dans des activités
Les activités peuvent déclarer leurs dépendances via le marqueur Depends(). Le système fournit automatiquement ces dépendances à l’exécution de l’activité :
import asyncio
import mistralai.workflows as workflows
from mistralai.workflows import Depends
from contextlib import contextmanager
from pydantic import BaseModel
def get_config() -> dict:
return {"timeout": 30, "retries": 3, "api_url": "https://api.example.com"}
class FakeDB:
def __init__(self):
self.records = []
def insert(self, name: str):
self.records.append(name)
return len(self.records)
async def get_db() -> FakeDB:
return FakeDB()
@contextmanager
def get_session():
session = {"logged_in": True, "events": []}
try:
yield session
finally:
session["logged_in"] = False
@workflows.activity()
async def create_user(
name: str,
db: FakeDB = Depends(get_db),
config: dict = Depends(get_config),
session: dict = Depends(get_session),
) -> dict:
record_id = db.insert(name)
session["events"].append("user_created")
return {
"status": "success",
"record_id": record_id,
"timeout_used": config["timeout"],
}
class Input(BaseModel):
name: str
@workflows.workflow.define(name="di_workflow")
class DIWorkflow:
@workflows.workflow.entrypoint
async def run(self, params: Input) -> dict:
return await create_user(params.name)
async def main():
result = await workflows.execute_workflow(
DIWorkflow,
params=Input(name="Alice"),
)
print(result)Cas d’usage courants
Le même motif Depends(provider) s’applique à toute ressource partagée. Deux exemples récurrents :
Connexions à une base de données — fournissez une factory de sessions asynchrone avec nettoyage automatique :
async def get_db_session() -> AsyncIterator[AsyncSession]:
engine = await create_db_engine("postgres://...")
SessionLocal = async_sessionmaker(bind=engine, expire_on_commit=False)
async with SessionLocal() as session:
try:
yield session
finally:
await session.close()
@workflows.activity()
async def write_record(payload: dict, session: AsyncSession = Depends(get_db_session)) -> int:
record = Record(**payload)
session.add(record)
await session.commit()
return record.idClients d’API — initialisez une seule fois et réutilisez la connexion persistante :
import os
import mistralai.workflows as workflows
from mistralai.workflows import Depends
async def get_payment_client() -> PaymentServiceClient:
client = PaymentServiceClient(api_key=os.environ["PAYMENT_API_KEY"])
await client.initialize()
return client
@workflows.activity()
async def charge(user_id: str, amount: int, client: PaymentServiceClient = Depends(get_payment_client)) -> str:
return await client.charge(user_id=user_id, amount_cents=amount)