Queries
Les queries permettent aux systèmes externes de lire l'état actuel d'un workflow en cours d'exécution de manière synchrone.
Caractéristiques principales
Les queries utilisent une communication synchrone et sont en lecture seule — elles ne doivent pas modifier l'état du workflow. Elles peuvent être appelées à tout moment pendant l'exécution, mais doivent retourner rapidement, ce qui les rend inadaptées aux opérations de longue durée.
Exposition des queries
Le workflow ci-dessous suit sa propre progression au fur et à mesure de son exécution. Le gestionnaire de query get_status lit cet état et le renvoie immédiatement à l'appelant — sans interrompre ni modifier le workflow en cours.
import mistralai.workflows as workflows
@workflows.workflow.define(name="processing_workflow")
class ProcessingWorkflow:
def __init__(self):
self.progress = 0.0
self.completed = False
@workflows.workflow.query(name="get_status")
def get_status(self) -> dict:
return {
"progress": self.progress,
"completed": self.completed
}
@workflows.workflow.entrypoint
async def run(self) -> None:
# Simulate work
for i in range(1, 11):
self.progress = i * 10
await asyncio.sleep(1)
self.completed = TrueValidation des entrées
Les gestionnaires de queries peuvent accepter des paramètres comme tout autre gestionnaire, et le SDK valide la charge utile avant d'invoquer le gestionnaire. Les queries valident les charges utiles entrantes par rapport à leurs paramètres déclarés. Les données entrantes sont vérifiées par rapport aux types attendus, et tout champ supplémentaire non déclaré dans la signature du gestionnaire est rejeté. Les échecs de validation renvoient HTTP 422 (Unprocessable Entity) avec un message d'erreur descriptif.
Pour les structures d'entrée complexes, utilisez des modèles Pydantic. Cela vous permet de passer des paramètres structurés à un gestionnaire de query tout en conservant une sécurité de type complète :
import pydantic
class StatusRequest(pydantic.BaseModel):
include_details: bool = False
@workflows.workflow.query(name="get_status")
def get_status(self, req: StatusRequest) -> dict:
result = {"progress": self.progress}
if req.include_details:
result["steps"] = self.completed_steps
return resultEnvoi d'une query
Une fois votre workflow en cours d'exécution, vous pouvez interroger son état à tout moment depuis l'extérieur en utilisant le SDK ou l'API.
from mistralai.client import Mistral
client = Mistral(api_key="your_api_key")
result = client.workflows.executions.query_workflow_execution(
execution_id="my-execution-id",
name="get_status",
)
print(result.model_dump_json(indent=2))Comparaison
| Fonctionnalité | Type de communication | Modifie l'état | Renvoie une valeur | Peut exécuter des activités |
|---|---|---|---|---|
| Signal | Asynchrone | Oui | Non | Non |
| Query | Synchrone | Non | Oui | Non |
| Update | Synchrone | Oui | Oui | Oui |