Signals
Les signaux permettent aux systèmes externes d'envoyer des messages aux workflows en cours d'exécution de manière asynchrone.
Caractéristiques principales
Les signaux utilisent une communication asynchrone et peuvent être envoyés à tout moment pendant l'exécution d'un workflow. Les workflows doivent gérer explicitement les signaux, et les signaux peuvent transporter des données utiles.
Écouter des signaux
Le workflow ci-dessous écoute indéfiniment les signaux add_notification. Chaque fois qu'un signal arrive, il ajoute le message à une liste interne et le traite. L'appel wait_condition suspend l'exécution jusqu'à ce que de nouvelles notifications soient disponibles, évitant ainsi l'attente active.
import mistralai.workflows as workflows
@workflows.workflow.define(name="notification_workflow")
class NotificationWorkflow:
def __init__(self):
self.notifications = []
@workflows.workflow.signal(name="add_notification")
async def add_notification(self, message: str, priority: int = 1) -> None:
self.notifications.append(message)
print(f"Received notification: {message} (Priority: {priority})")
@workflows.workflow.entrypoint
async def run(self) -> None:
print("Workflow started, waiting for notifications...")
while True:
await workflows.workflow.wait_condition(lambda: len(self.notifications) > 0)
print(f"Processing {len(self.notifications)} notifications")
self.notifications.clear()Validation des données d'entrée
Les gestionnaires de signaux déclarent explicitement leurs paramètres attendus, et le SDK applique ce contrat sur chaque payload entrant. Les signaux valident les payloads entrants par rapport à leurs paramètres déclarés. Les données entrantes sont vérifiées par rapport aux types attendus, et tous les champs supplémentaires non déclarés dans la signature du gestionnaire sont rejetés. Les échecs de validation renvoient HTTP 422 (Unprocessable Entity) avec un message d'erreur descriptif.
@workflows.workflow.signal(name="add_notification")
async def add_notification(self, message: str, priority: int = 1) -> None:
# Only 'message' and 'priority' fields are accepted
# Extra fields like {"message": "hi", "extra": "bad"} will be rejected
...Pour des structures d'entrée complexes, utilisez des modèles Pydantic. Le SDK désérialisera automatiquement le payload entrant dans le modèle et validera chaque champ :
import pydantic
class UserProfile(pydantic.BaseModel):
name: str
address: str
@workflows.workflow.signal(name="update_profile")
async def update_profile(self, profile: UserProfile) -> None:
self._profile = profileEnvoyer un signal
Une fois votre workflow en cours d'exécution et à l'écoute de signaux, vous pouvez en envoyer un depuis l'extérieur en utilisant le SDK ou l'API.
from mistralai.client import Mistral
client = Mistral(api_key="your_api_key")
client.workflows.executions.signal_workflow_execution(
execution_id="my-execution-id",
name="add_notification",
input={"message": "Deployment complete", "priority": 2},
)Comparaison
| Fonctionnalité | Type de communication | Modifie l'état | Renvoie une valeur | Peut exécuter des activités |
|---|---|---|---|---|
| Signal | Asynchrone | Oui | Non | Non |
| Query | Synchrone | Non | Oui | Non |
| Update | Synchrone | Oui | Oui | Oui |