Déport de charge utile (payload offloading)

Pour garantir la rapidité et la prévisibilité de la couche d'orchestration pour chaque workflow sur la plateforme, Mistral Workflows impose une limite de 2 Mo sur les entrées de workflow, les entrées et sorties d'activité. Lorsque vous devez transférer des données plus volumineuses entre les étapes, le schéma recommandé est de passer des références vers les données, pas les données elles-mêmes — par exemple, une clé d’objet dans un bucket, un identifiant de ligne dans une base de données ou un identifiant de document.

Pour rendre cela transparent, le SDK fournit deux assistants intégrés qui transforment ce schéma de référence en une simple annotation :

  • Déport de champ d'activité — marquez un champ comme déportable : le SDK le stocke automatiquement dans votre stockage d’objets lors de la sortie et le restaure à l’entrée. La couche d’orchestration ne voit qu’une petite référence.
  • Déport de payload — bascule automatiquement toute charge utile dépassant 2 Mo dans un stockage d’objets. Utile lorsque le workflow doit manipuler ou accéder à une grande valeur.

Les deux fonctionnalités s’exécutent côté worker et se configurent par déploiement.

i
Information

Vous souhaitez chiffrer les charges utiles en plus de les déporter ? Consultez Chiffrement — les deux fonctions sont compatibles.

Quand utiliser quoi ?

Quand utiliser quoi ?

ProblèmeSolution
Entrée/sortie d’activité > 2 Mo, accès uniquement côté activitéDéport de champ d’activité (recommandé)
Entrée de workflow ou entrée/sortie d’activité > 2 Mo, accès dans le code du workflowDéport de payload
Astuce

Guide rapide : si vous pouvez contenir vos données volumineuses dans une activité (cas le plus courant), privilégiez le déport de champ d’activité — il évite les coûts de relecture liés au déport de payload. Utilisez le déport de payload seulement si le workflow doit manipuler ou transmettre lui-même une valeur volumineuse.

Prérequis

Prérequis

Installez le SDK Workflows avec le connecteur vers votre fournisseur de stockage cloud :

# Support AWS S3
uv add "mistralai-workflows[s3]"

# Support Azure Blob Storage
uv add "mistralai-workflows[azure]"

# Support Google Cloud Storage
uv add "mistralai-workflows[gcs]"

# Tous les fournisseurs de stockage
uv add "mistralai-workflows[storage]"

Vous devez également disposer d’un conteneur ou d’un bucket accessible par tous les workers.

Avertissement

Définissez une politique d’expiration sur votre bucket : Workflows ne supprime pas automatiquement les charges utiles déportées. Configurez une règle de cycle de vie/expiration adaptée à votre période de rétention du workflow (par défaut : 30 jours). Les blobs sont préfixés par espace de noms, donc vous pouvez définir des règles par namespace.

Déport de champ d'activité (recommandé)

Déport de champ d'activité (recommandé)

Marquez un champ Pydantic comme déportable. Le SDK télécharge sa valeur dans votre stockage d’objets à la sortie d’une activité et la retélécharge à l’entrée de la suivante.

Points clés :

  • Les valeurs déportées sont disponibles uniquement dans les activités (pas dans le contexte de workflow).
  • Téléchargement et téléchargement se font dans le contexte de l’activité, sous le contrôle de son timeout.
  • Chaque OffloadableField est stocké comme un blob séparé : regroupez les champs liés au sein d’un même modèle Pydantic pour de meilleures performances.
Définir des champs déportables

Définir des champs déportables

from mistralai.workflows.core.encoding.fields_offloader import (
    OffloadableModel,
    OffloadableField,
)

class TranscriptionPayload(OffloadableModel):
    audio_id: str                                            # champ classique
    transcript: OffloadableField[str] = OffloadableField(    # déporté
        value=""
    )
Utilisation dans les activités

Utilisation dans les activités

Dans votre activité, accédez à la valeur déportée via la méthode get_value(). Le système gère automatiquement le déport et la restauration :

import mistralai.workflows as workflows

@workflows.activity()
async def transcribe(payload: TranscriptionPayload) -> TranscriptionPayload:
    audio = fetch_audio(payload.audio_id)
    text = await whisper(audio)

    return TranscriptionPayload(
        audio_id=payload.audio_id,
        transcript=OffloadableField(value=text),
    )
Utilisation dans les workflows

Utilisation dans les workflows

Dans le corps du workflow, ne faites pas appel à .get_value() : la valeur n’est pas forcément locale. Passez l’objet OffloadableField tel quel à l’activité suivante :

import mistralai.workflows as workflows

@workflows.workflow.define(name="transcribe-and-summarize")
class TranscribeWorkflow:
    @workflows.workflow.entrypoint
    async def run(self, audio_id: str) -> str:
        result = await transcribe(TranscriptionPayload(audio_id=audio_id))

        # Passez le champ déporté tel quel — NE LE décompressez PAS dans le workflow
        summary = await summarize(SummaryInput(transcript=result.transcript))
        return summary.text
Conseil performance

Conseil performance

Chaque OffloadableField implique un aller-retour séparé en blob. Si plusieurs champs sont toujours consultés ensemble, regroupez-les dans un même modèle Pydantic :

from pydantic import BaseModel
from mistralai.workflows.core.encoding.fields_offloader import (
    OffloadableModel,
    OffloadableField,
)

class LargeContext(BaseModel):
    transcript: str
    speaker_diarization: list[dict]
    audio_features: dict[str, float]

class Payload(OffloadableModel):
    large: OffloadableField[LargeContext]   # un blob, trois champs
Configuration

Configuration

Définissez ces variables d’environnement sur vos workers. La présence du bloc de configuration de stockage active la fonctionnalité — il n’y a pas de drapeau ENABLED. La taille minimale par défaut est 1 Mo ; vous pouvez l’ajuster avec MIN_SIZE_BYTES.

ACTIVITY_ATTRIBUTES_OFFLOADING__MIN_SIZE_BYTES=1048576   # 1 Mo (défaut)
ACTIVITY_ATTRIBUTES_OFFLOADING__STORAGE_CONFIG__STORAGE_PROVIDER=azure
ACTIVITY_ATTRIBUTES_OFFLOADING__STORAGE_CONFIG__CONTAINER_NAME=my-workflow-payloads
ACTIVITY_ATTRIBUTES_OFFLOADING__STORAGE_CONFIG__AZURE_CONNECTION_STRING="..."
Déport de payload (échappatoire globale)

Déport de payload (échappatoire globale)

Utilisez cette fonction si une entrée ou sortie de workflow est systématiquement volumineuse et ne peut pas être structurée en champs d’activité.

Le SDK déporte automatiquement tout payload supérieur à 2 Mo dans le stockage d’objets. La plateforme stocke une référence ; les workers téléchargent la charge utile à la demande.

Avertissement

Coût de relecture : lors d’un replay de workflow (redémarrage d’un worker, reprise après incident), chaque payload déporté est retéléchargé. Pour des workflows manipulant de nombreux gros volumes, cela peut provoquer des timeouts d’activité et des blocages worker. Privilégiez le déport de champ d’activité pour les parcours critiques.

Configuration

Configuration

Le déport de payload nécessite une configuration sur le client et le worker : le déport peut intervenir autant au démarrage d’un workflow (côté client) que lors des transferts entre activités (côté worker).

Les données déportées sont stockées dans votre propre cloud. Vous êtes responsable de :

  • Créer et gérer le bucket ou conteneur ;
  • Configurer les identifiants d’accès adéquats ;
  • Vérifier que le client et le worker accèdent tous deux au stockage ;
  • Mettre en place une politique d’expiration — la suppression n’est pas automatique. Ajustez l’expiration à la durée de rétention du workflow (par défaut : 30 jours).
Configuration worker

Configuration worker

Définissez ces variables d’environnement sur vos workers :

TEMPORAL_PAYLOAD_OFFLOADING__STORAGE_CONFIG__STORAGE_PROVIDER=azure
TEMPORAL_PAYLOAD_OFFLOADING__STORAGE_CONFIG__CONTAINER_NAME=workflow-payloads
TEMPORAL_PAYLOAD_OFFLOADING__STORAGE_CONFIG__AZURE_CONNECTION_STRING="..."

Comme pour le déport de champ, la simple présence du bloc de configuration active la fonctionnalité — il n’y a pas de drapeau ENABLED.

Configuration client

Configuration client

Si vous lancez des workflows via le SDK Python dont l’entrée peut dépasser 2 Mo, configurez également le déport côté client :

from mistralai.client import Mistral
from mistralai.extra.workflows.encoding import PayloadOffloadingConfig, BlobStorageConfig

client = Mistral(
    api_key="your_api_key",
    workflow_payload_offloading=PayloadOffloadingConfig(
        storage_config=BlobStorageConfig(
            storage_provider="azure",
            container_name="workflow-payloads",
            azure_connection_string="...",
        )
    ),
)

execution = client.workflows.execute_workflow(
    workflow_identifier="my-workflow",
    input={"large_data": "..."},  # Sera déporté si > 2 Mo
)