Injection de dépendances

L'injection de dépendances permet de passer proprement des ressources partagées (connexions à une base de données, clients d'API, configurations) à des activités sans avoir à les créer manuellement à chaque appel.

Guide de démarrage

Guide de démarrage

Ajoutez un paramètre dans une activité en utilisant Depends(provider) et le worker transmettra le résultat du provider à chaque appel :

import mistralai.workflows as workflows
from mistralai.workflows import Depends

async def get_db() -> Database:
    return await Database.connect("postgres://...")

@workflows.activity()
async def insert_user(name: str, db: Database = Depends(get_db)) -> int:
    return await db.insert(name)

Le provider (get_db) est appelé une seule fois au démarrage du worker. Son résultat est réutilisé pour chaque appel à une activité.

Astuce

Durée de vie du provider : les dépendances déclarées via Depends() sont initialisées une fois au démarrage du worker et partagées entre toutes les exécutions d’activités. La même instance est donc réutilisée à chaque appel : pools de connexions, clients d’API et objets de configuration sont créés une seule fois. Assurez-vous que la valeur renvoyée est sans risque à partager entre activités concurrentes.

Fonctionnement

Fonctionnement

Le système prend en charge différents styles de providers. Choisissez celui qui correspond au cycle de vie de votre ressource :

  1. Fonctions synchrones : valeurs de configuration simples ou objets en mémoire
  2. Fonctions asynchrones : dépendances nécessitant des opérations d’E/S, comme des connexions à une base de données
  3. Gestionnaires de contexte (sync ou async) : ressources nécessitant un nettoyage
  4. Générateurs (sync ou async) : flux de données ou cycles de vie complexes

Quand vous déclarez une dépendance dans votre activité, le système invoque le provider, gère son cycle de vie et transmet la valeur comme valeur par défaut du paramètre.

Définir des dépendances

Définir des dépendances

Provider par fonction synchrone

Provider par fonction synchrone

Pour des valeurs de configuration ou objets qui ne nécessitent pas d'initialisation asynchrone :

def get_config() -> dict:
    """Fournit la configuration de l'application"""
    return {
        "timeout": 30,
        "retries": 3,
        "api_url": "https://api.example.com"
    }
Provider par fonction asynchrone

Provider par fonction asynchrone

Pour les dépendances nécessitant une initialisation asynchrone, comme une connexion à une base de données :

async def get_db_connection() -> DatabaseConnection:
    """Crée et retourne une connexion à la base de données"""
    conn = await DatabaseConnection.create("postgres://user:pass@localhost/db")
    return conn
Provider via gestionnaire de contexte

Provider via gestionnaire de contexte

Pour les ressources qui nécessitent un nettoyage propre, comme les sessions requérant une déconnexion :

from contextlib import contextmanager

@contextmanager
def get_logged_in_session():
    """Fournit une session avec gestion automatique de connexion/déconnexion"""
    session = Session()
    session.login()
    try:
        yield session
    finally:
        session.logout()
Provider via gestionnaire de contexte asynchrone

Provider via gestionnaire de contexte asynchrone

Pour les ressources asynchrones nécessitant un nettoyage, par exemple une connexion à une base de données :

from contextlib import asynccontextmanager

@asynccontextmanager
async def get_db_connection_with_cleanup():
    """Fournit une connexion base de données avec nettoyage automatique"""
    conn = await DatabaseConnection.create("postgres://...")
    try:
        yield conn
    finally:
        await conn.close()
Provider par générateur

Provider par générateur

Pour du flux de données ou une gestion de ressources complexe :

def get_data_stream():
    """Fournit un flux de données"""
    with open("data.txt") as f:
        for line in f:
            yield line.strip()
Provider par générateur asynchrone

Provider par générateur asynchrone

Pour des sources de données asynchrones en streaming :

import aiofiles

async def get_async_data_stream():
    """Fournit un flux de données asynchrone"""
    async with aiofiles.open("data.txt") as f:
        async for line in f:
            yield line.strip()
Utiliser les dépendances dans des activités

Utiliser les dépendances dans des activités

Les activités peuvent déclarer leurs dépendances via le marqueur Depends(). Le système fournit automatiquement ces dépendances à l’exécution de l’activité :

import asyncio
import mistralai.workflows as workflows
from mistralai.workflows import Depends
from contextlib import contextmanager
from pydantic import BaseModel


def get_config() -> dict:
    return {"timeout": 30, "retries": 3, "api_url": "https://api.example.com"}


class FakeDB:
    def __init__(self):
        self.records = []

    def insert(self, name: str):
        self.records.append(name)
        return len(self.records)


async def get_db() -> FakeDB:
    return FakeDB()


@contextmanager
def get_session():
    session = {"logged_in": True, "events": []}
    try:
        yield session
    finally:
        session["logged_in"] = False


@workflows.activity()
async def create_user(
    name: str,
    db: FakeDB = Depends(get_db),
    config: dict = Depends(get_config),
    session: dict = Depends(get_session),
) -> dict:
    record_id = db.insert(name)
    session["events"].append("user_created")
    return {
        "status": "success",
        "record_id": record_id,
        "timeout_used": config["timeout"],
    }


class Input(BaseModel):
    name: str


@workflows.workflow.define(name="di_workflow")
class DIWorkflow:
    @workflows.workflow.entrypoint
    async def run(self, params: Input) -> dict:
        return await create_user(params.name)


async def main():
    result = await workflows.execute_workflow(
        DIWorkflow,
        params=Input(name="Alice"),
    )
    print(result)
Cas d’usage courants

Cas d’usage courants

Le même motif Depends(provider) s’applique à toute ressource partagée. Deux exemples récurrents :

Connexions à une base de données — fournissez une factory de sessions asynchrone avec nettoyage automatique :

async def get_db_session() -> AsyncIterator[AsyncSession]:
    engine = await create_db_engine("postgres://...")
    SessionLocal = async_sessionmaker(bind=engine, expire_on_commit=False)
    async with SessionLocal() as session:
        try:
            yield session
        finally:
            await session.close()

@workflows.activity()
async def write_record(payload: dict, session: AsyncSession = Depends(get_db_session)) -> int:
    record = Record(**payload)
    session.add(record)
    await session.commit()
    return record.id

Clients d’API — initialisez une seule fois et réutilisez la connexion persistante :

import os
import mistralai.workflows as workflows
from mistralai.workflows import Depends

async def get_payment_client() -> PaymentServiceClient:
    client = PaymentServiceClient(api_key=os.environ["PAYMENT_API_KEY"])
    await client.initialize()
    return client

@workflows.activity()
async def charge(user_id: str, amount: int, client: PaymentServiceClient = Depends(get_payment_client)) -> str:
    return await client.charge(user_id=user_id, amount_cents=amount)