Updates
Les updates permettent à des systèmes externes de modifier l'état d'un workflow et de recevoir une réponse. Contrairement aux signaux, les updates sont synchrones et peuvent retourner des valeurs.
Caractéristiques principales
Les updates utilisent une communication synchrone et retournent une réponse à l'appelant. Contrairement aux signaux, ils peuvent modifier l'état du workflow, retourner des valeurs et exécuter des activités dans le cadre du traitement de la requête.
Gestion des updates
Le workflow ci-dessous expose un gestionnaire update_data qui exécute une activité, met à jour l'état interne et retourne un résultat à l'appelant — le tout en un seul appel synchrone. Cela fait des updates le bon choix lorsque l'appelant a besoin d'une confirmation que l'opération s'est terminée.
import mistralai.workflows as workflows
import asyncio
# Activity definition
@workflows.activity()
async def process_update_data(data: str) -> str:
# Simulate processing
await asyncio.sleep(0.5)
return f"Processed: {data.upper()}"
@workflows.workflow.define(name="data_processing_workflow")
class DataProcessingWorkflow:
def __init__(self):
self.current_value = "default"
@workflows.workflow.update(name="update_data")
async def update_data(self, new_value: str) -> dict:
# Execute an activity as part of the update
processed = await process_update_data(new_value)
# Update workflow state
old_value = self.current_value
self.current_value = processed
return {
"success": True,
"processed_value": processed,
"message": f"Updated from '{old_value}' to '{processed}'"
}
@workflows.workflow.entrypoint
async def run(self) -> None:
print(f"Workflow started with value: {self.current_value}")
# Workflow continues running...Validation des entrées
Les gestionnaires d'updates déclarent leurs paramètres attendus et le SDK valide la charge utile avant l'exécution du gestionnaire. Les updates valident les charges utiles entrantes par rapport à leurs paramètres déclarés. Les données entrantes sont vérifiées par rapport aux types attendus et tout champ supplémentaire non déclaré dans la signature du gestionnaire est rejeté. Les échecs de validation retournent HTTP 422 (Unprocessable Entity) avec un message d'erreur descriptif.
Pour des structures d'entrée complexes, utilisez des modèles Pydantic. C'est particulièrement utile lorsqu'un update comporte plusieurs champs liés qui vont ensemble :
import pydantic
class ConfigUpdate(pydantic.BaseModel):
timeout: int
retry_count: int
@workflows.workflow.update(name="update_config")
async def update_config(self, config: ConfigUpdate) -> dict:
self.config = config
return {"success": True}Envoi d'un update
Une fois votre workflow en cours d'exécution, vous pouvez envoyer un update depuis l'extérieur et recevoir la valeur de retour du gestionnaire de manière synchrone.
from mistralai.client import Mistral
client = Mistral(api_key="your_api_key")
result = client.workflows.executions.update_workflow_execution(
execution_id="my-execution-id",
name="update_data",
input={"new_value": "hello"},
)
print(result.model_dump_json(indent=2))Comparaison
| Fonctionnalité | Type de communication | Modifie l'état | Retourne une valeur | Peut exécuter des activités |
|---|---|---|---|---|
| Signal | Asynchrone | Oui | Non | Non |
| Query | Synchrone | Non | Oui | Non |
| Update | Synchrone | Oui | Oui | Oui |