Queries

Les queries permettent aux systèmes externes de lire l'état actuel d'un workflow en cours d'exécution de manière synchrone.

Caractéristiques principales

Caractéristiques principales

Les queries utilisent une communication synchrone et sont en lecture seule — elles ne doivent pas modifier l'état du workflow. Elles peuvent être appelées à tout moment pendant l'exécution, mais doivent retourner rapidement, ce qui les rend inadaptées aux opérations de longue durée.

Exposition des queries

Exposition des queries

Le workflow ci-dessous suit sa propre progression au fur et à mesure de son exécution. Le gestionnaire de query get_status lit cet état et le renvoie immédiatement à l'appelant — sans interrompre ni modifier le workflow en cours.

import mistralai.workflows as workflows

@workflows.workflow.define(name="processing_workflow")
class ProcessingWorkflow:
    def __init__(self):
        self.progress = 0.0
        self.completed = False

    @workflows.workflow.query(name="get_status")
    def get_status(self) -> dict:
        return {
            "progress": self.progress,
            "completed": self.completed
        }

    @workflows.workflow.entrypoint
    async def run(self) -> None:
        # Simulate work
        for i in range(1, 11):
            self.progress = i * 10
            await asyncio.sleep(1)
        self.completed = True
Validation des entrées

Validation des entrées

Les gestionnaires de queries peuvent accepter des paramètres comme tout autre gestionnaire, et le SDK valide la charge utile avant d'invoquer le gestionnaire. Les queries valident les charges utiles entrantes par rapport à leurs paramètres déclarés. Les données entrantes sont vérifiées par rapport aux types attendus, et tout champ supplémentaire non déclaré dans la signature du gestionnaire est rejeté. Les échecs de validation renvoient HTTP 422 (Unprocessable Entity) avec un message d'erreur descriptif.

Pour les structures d'entrée complexes, utilisez des modèles Pydantic. Cela vous permet de passer des paramètres structurés à un gestionnaire de query tout en conservant une sécurité de type complète :

import pydantic

class StatusRequest(pydantic.BaseModel):
    include_details: bool = False

@workflows.workflow.query(name="get_status")
def get_status(self, req: StatusRequest) -> dict:
    result = {"progress": self.progress}
    if req.include_details:
        result["steps"] = self.completed_steps
    return result
Envoi d'une query

Envoi d'une query

Une fois votre workflow en cours d'exécution, vous pouvez interroger son état à tout moment depuis l'extérieur en utilisant le SDK ou l'API.

from mistralai.client import Mistral

client = Mistral(api_key="your_api_key")

result = client.workflows.executions.query_workflow_execution(
    execution_id="my-execution-id",
    name="get_status",
)
print(result.model_dump_json(indent=2))
Comparaison

Comparaison

FonctionnalitéType de communicationModifie l'étatRenvoie une valeurPeut exécuter des activités
SignalAsynchroneOuiNonNon
QuerySynchroneNonOuiNon
UpdateSynchroneOuiOuiOui