Signals

Les signaux permettent aux systèmes externes d'envoyer des messages aux workflows en cours d'exécution de manière asynchrone.

Caractéristiques principales

Caractéristiques principales

Les signaux utilisent une communication asynchrone et peuvent être envoyés à tout moment pendant l'exécution d'un workflow. Les workflows doivent gérer explicitement les signaux, et les signaux peuvent transporter des données utiles.

Écouter des signaux

Écouter des signaux

Le workflow ci-dessous écoute indéfiniment les signaux add_notification. Chaque fois qu'un signal arrive, il ajoute le message à une liste interne et le traite. L'appel wait_condition suspend l'exécution jusqu'à ce que de nouvelles notifications soient disponibles, évitant ainsi l'attente active.

import mistralai.workflows as workflows

@workflows.workflow.define(name="notification_workflow")
class NotificationWorkflow:
    def __init__(self):
        self.notifications = []

    @workflows.workflow.signal(name="add_notification")
    async def add_notification(self, message: str, priority: int = 1) -> None:
        self.notifications.append(message)
        print(f"Received notification: {message} (Priority: {priority})")

    @workflows.workflow.entrypoint
    async def run(self) -> None:
        print("Workflow started, waiting for notifications...")
        while True:
            await workflows.workflow.wait_condition(lambda: len(self.notifications) > 0)
            print(f"Processing {len(self.notifications)} notifications")
            self.notifications.clear()
Validation des données d'entrée

Validation des données d'entrée

Les gestionnaires de signaux déclarent explicitement leurs paramètres attendus, et le SDK applique ce contrat sur chaque payload entrant. Les signaux valident les payloads entrants par rapport à leurs paramètres déclarés. Les données entrantes sont vérifiées par rapport aux types attendus, et tous les champs supplémentaires non déclarés dans la signature du gestionnaire sont rejetés. Les échecs de validation renvoient HTTP 422 (Unprocessable Entity) avec un message d'erreur descriptif.

@workflows.workflow.signal(name="add_notification")
async def add_notification(self, message: str, priority: int = 1) -> None:
    # Only 'message' and 'priority' fields are accepted
    # Extra fields like {"message": "hi", "extra": "bad"} will be rejected
    ...

Pour des structures d'entrée complexes, utilisez des modèles Pydantic. Le SDK désérialisera automatiquement le payload entrant dans le modèle et validera chaque champ :

import pydantic

class UserProfile(pydantic.BaseModel):
    name: str
    address: str

@workflows.workflow.signal(name="update_profile")
async def update_profile(self, profile: UserProfile) -> None:
    self._profile = profile
Envoyer un signal

Envoyer un signal

Une fois votre workflow en cours d'exécution et à l'écoute de signaux, vous pouvez en envoyer un depuis l'extérieur en utilisant le SDK ou l'API.

from mistralai.client import Mistral

client = Mistral(api_key="your_api_key")

client.workflows.executions.signal_workflow_execution(
    execution_id="my-execution-id",
    name="add_notification",
    input={"message": "Deployment complete", "priority": 2},
)
Comparaison

Comparaison

FonctionnalitéType de communicationModifie l'étatRenvoie une valeurPeut exécuter des activités
SignalAsynchroneOuiNonNon
QuerySynchroneNonOuiNon
UpdateSynchroneOuiOuiOui