Realtime

La transcription en temps réel vous permet de transcrire l'audio au fur et à mesure qu'il est prononcé ou enregistré. Cela est utile pour les applications nécessitant un retour immédiat, comme le sous-titrage en direct, les assistants vocaux ou la prise de notes en temps réel.

Avant de commencer

Avant de commencer

Modèles avec capacités de streaming

Modèles compatibles temps réel :

  • Voxtral Mini Transcribe Realtime (voxtral-mini-transcribe-realtime-2602) : optimisé pour la transcription en direct avec une latence ultra-faible et une haute précision.
Note

Le mode temps réel n'est actuellement pas compatible avec le paramètre diarize. Utilisez l'un ou l'autre.

Utilisation de base

Utilisation de base

Note

Version Python : avant d'exécuter le script suivant, assurez-vous d'avoir installé le package mistralai[realtime] :

pip install mistralai[realtime]

Vous pouvez fournir n'importe quel flux audio en entrée comme suit.

from mistralai.client import Mistral
from mistralai.extra.realtime import UnknownRealtimeEvent
from mistralai.client.models import AudioFormat, RealtimeTranscriptionError, RealtimeTranscriptionSessionCreated, TranscriptionStreamDone, TranscriptionStreamTextDelta

import asyncio
import sys
from typing import AsyncIterator

api_key = "YOUR_MISTRAL_API_KEY"
client = Mistral(api_key=api_key)

audio_format = AudioFormat(encoding="pcm_s16le", sample_rate=16000)
audio_stream = ...

async def main():
    try:
        async for event in client.audio.realtime.transcribe_stream(
            audio_stream=audio_stream, # audio stream corresponds to any iterable of bytes
            model="voxtral-mini-transcribe-realtime-2602",
            audio_format=audio_format,
        ):
            if isinstance(event, RealtimeTranscriptionSessionCreated):
                print(f"Session created.")
            elif isinstance(event, TranscriptionStreamTextDelta):
                print(event.text, end="", flush=True)
            elif isinstance(event, TranscriptionStreamDone):
                print("Transcription done.")
            elif isinstance(event, RealtimeTranscriptionError):
                print(f"Error: {event}")
            elif isinstance(event, UnknownRealtimeEvent):
                print(f"Unknown event: {event}")
                continue
    except KeyboardInterrupt:
        print("Stopping...")

sys.exit(asyncio.run(main()))

Voici un exemple d'implémentation utilisant votre microphone pour diffuser l'audio directement.

from mistralai.client import Mistral
from mistralai.extra.realtime import UnknownRealtimeEvent
from mistralai.client.models import AudioFormat, RealtimeTranscriptionError, RealtimeTranscriptionSessionCreated, TranscriptionStreamDone, TranscriptionStreamTextDelta

import asyncio
import sys
from typing import AsyncIterator

api_key = "YOUR_MISTRAL_API_KEY"
client = Mistral(api_key=api_key)

#microphone is always pcm_s16le here
audio_format = AudioFormat(encoding="pcm_s16le", sample_rate=16000)

async def iter_microphone(
    *,
    sample_rate: int,
    chunk_duration_ms: int,
) -> AsyncIterator[bytes]:
    """
    Yield microphone PCM chunks using PyAudio (16-bit mono).
    Encoding is always pcm_s16le.
    """
    import pyaudio

    p = pyaudio.PyAudio()
    chunk_samples = int(sample_rate * chunk_duration_ms / 1000)

    stream = p.open(
        format=pyaudio.paInt16,
        channels=1,
        rate=sample_rate,
        input=True,
        frames_per_buffer=chunk_samples,
    )

    loop = asyncio.get_running_loop()
    try:
        while True:
            # stream.read is blocking; run it off-thread
            data = await loop.run_in_executor(None, stream.read, chunk_samples, False)
            yield data
    finally:
        stream.stop_stream()
        stream.close()
        p.terminate()

audio_stream = iter_microphone(sample_rate=audio_format.sample_rate, chunk_duration_ms=480)

async def main():
    try:
        async for event in client.audio.realtime.transcribe_stream(
            audio_stream=audio_stream, # audio stream corresponds to any iterable of bytes
            model="voxtral-mini-transcribe-realtime-2602",
            audio_format=audio_format,
        ):
            if isinstance(event, RealtimeTranscriptionSessionCreated):
                print(f"Session created.")
            elif isinstance(event, TranscriptionStreamTextDelta):
                print(event.text, end="", flush=True)
            elif isinstance(event, TranscriptionStreamDone):
                print("Transcription done.")
            elif isinstance(event, RealtimeTranscriptionError):
                print(f"Error: {event}")
            elif isinstance(event, UnknownRealtimeEvent):
                print(f"Unknown event: {event}")
                continue
    except KeyboardInterrupt:
        print("Stopping...")

sys.exit(asyncio.run(main()))
Délai cible

Délai cible

Le délai cible permet d'attendre un temps défini avant de démarrer la transcription. Cela permet de recueillir du contexte et d'améliorer la précision. Vous pouvez spécifier un délai cible via le paramètre target_streaming_delay_ms.

Utilisation de base

Utilisation de base

Voici un exemple simple d'utilisation :

import asyncio
import sys
from typing import AsyncIterator

from mistralai.client import Mistral
from mistralai.extra.realtime import UnknownRealtimeEvent
from mistralai.client.models import AudioFormat, RealtimeTranscriptionError, RealtimeTranscriptionSessionCreated, TranscriptionStreamDone, TranscriptionStreamTextDelta

api_key = "YOUR_MISTRAL_API_KEY"
client = Mistral(api_key=api_key)

audio_format = AudioFormat(encoding="pcm_s16le", sample_rate=16000)
audio_stream = ... # audio stream corresponds to any iterable of bytes


async def main():
    try:
        async for event in client.audio.realtime.transcribe_stream(
            audio_stream=audio_stream,
            model="voxtral-mini-transcribe-realtime-2602",
            audio_format=audio_format,
            target_streaming_delay_ms=1000,
        ):
            if isinstance(event, RealtimeTranscriptionSessionCreated):
                print(f"Session created.")
            elif isinstance(event, TranscriptionStreamTextDelta):
                print(event.text, end="", flush=True)
            elif isinstance(event, TranscriptionStreamDone):
                print("Transcription done.")
            elif isinstance(event, RealtimeTranscriptionError):
                print(f"Error: {event}")
            elif isinstance(event, UnknownRealtimeEvent):
                print(f"Unknown event: {event}")
                continue
    except KeyboardInterrupt:
        print("Stopping...")

sys.exit(asyncio.run(main()))
Exemple d'utilisation : double délai

Exemple d'utilisation : double délai

La transcription à double délai permet d'équilibrer rapidité et précision. Elle utilise deux flux parallèles :

  • Flux rapide : fournit des transcriptions rapides mais moins précises avec un délai minimal.
  • Flux lent : fournit des transcriptions plus précises en recueillant davantage de contexte, ce qui entraîne un délai plus long.

Cette approche est utile dans les scénarios où un retour immédiat et une haute précision sont nécessaires, comme le sous-titrage en direct ou la prise de notes en temps réel.

Comment ça fonctionne

  1. Flux rapide : transcrit l'audio avec un délai minimal (par exemple, 240 ms), fournissant des résultats rapides mais potentiellement moins précis.
  2. Flux lent : transcrit l'audio avec un délai plus long (par exemple, 2 400 ms), fournissant des résultats plus précis en exploitant un contexte supplémentaire.
  3. Sortie combinée : la sortie finale fusionne les résultats des deux flux, assurant un équilibre entre vitesse et précision.

Cas d'usage

  • Sous-titrage en direct pour présentations ou réunions
  • Applications de prise de notes en temps réel
  • Assistants vocaux nécessitant un retour immédiat

Étape 1 : Importer les dépendances

Dans cette étape, nous importons toutes les bibliothèques et modules nécessaires pour la transcription à double délai. Cela inclut :

  • Bibliothèques standard : argparse pour l'analyse des arguments de ligne de commande, asyncio pour les opérations asynchrones, et difflib pour comparer des séquences.
  • Bibliothèque Rich : utilisée pour créer une interface utilisateur de terminal visuellement attrayante.
  • SDK Mistral AI : fournit la fonctionnalité principale pour la transcription en temps réel.
  • Utilitaires PyAudio : gèrent l'entrée microphone et le streaming audio.
import argparse
import asyncio
import difflib
import os
import sys
from dataclasses import dataclass
from typing import AsyncIterator, Sequence

from rich.align import Align
from rich.console import Console
from rich.layout import Layout
from rich.live import Live
from rich.panel import Panel
from rich.text import Text

from mistralai.client import Mistral
from mistralai.extra.realtime import UnknownRealtimeEvent
from mistralai.client.models import (
    AudioFormat,
    RealtimeTranscriptionError,
    RealtimeTranscriptionSessionCreated,
    TranscriptionStreamDone,
    TranscriptionStreamTextDelta,
)

from pyaudio_utils import load_pyaudio

console = Console()

Étape 2 : Définir les classes d'état et d'affichage

Dans cette étape, nous définissons les classes responsables de la gestion de l'état de la transcription et du rendu de l'interface utilisateur :

  • DualTranscriptState : suit l'état des deux flux de transcription rapide et lent, incluant leurs sorties textuelles, leurs statuts et les éventuelles erreurs.
  • DualTranscriptDisplay : affiche une interface utilisateur en direct dans le terminal, présentant les résultats de transcription des deux flux. Elle utilise la bibliothèque rich pour créer une mise en page visuellement attrayante avec un en-tête, un corps de transcription et un pied de page.

La classe DualTranscriptDisplay inclut des méthodes pour :

  • Normaliser les mots pour comparaison.
  • Calculer les textes d'affichage pour fusionner les résultats des deux flux.
  • Appliquer des styles aux éléments d'interface utilisateur en fonction de leur statut.
@dataclass
class DualTranscriptState:
    """Tracks transcript state for dual-delay transcription."""

    fast_full_text: str = ""
    slow_full_text: str = ""
    fast_status: str = "🔌 Connecting..."
    slow_status: str = "🔌 Connecting..."
    error: str | None = None
    fast_done: bool = False
    slow_done: bool = False

    def set_error(self, message: str) -> None:
        self.error = message
        self.fast_status = "❌ Error"
        self.slow_status = "❌ Error"


class DualTranscriptDisplay:
    """Renders a live dual-delay transcription UI."""

    def __init__(
        self,
        *,
        model: str,
        fast_delay_ms: int,
        slow_delay_ms: int,
        state: DualTranscriptState,
    ) -> None:
        self.model = model
        self.fast_delay_ms = fast_delay_ms
        self.slow_delay_ms = slow_delay_ms
        self.state = state

    @staticmethod
    def _normalize_word(word: str) -> str:
        return word.strip(".,!?;:\"'()[]{}").lower()

    def _compute_display_texts(self) -> tuple[str, str]:
        slow_words = self.state.slow_full_text.split()
        fast_words = self.state.fast_full_text.split()

        if not slow_words:
            partial_text = f" {self.state.fast_full_text}".rstrip()
            return "", partial_text

        slow_norm = [self._normalize_word(word) for word in slow_words]
        fast_norm = [self._normalize_word(word) for word in fast_words]

        matcher = difflib.SequenceMatcher(None, slow_norm, fast_norm)
        last_fast_index = 0
        slow_progress = 0
        for block in matcher.get_matching_blocks():
            if block.size == 0:
                continue
            slow_end = block.a + block.size
            if slow_end > slow_progress:
                slow_progress = slow_end
                last_fast_index = block.b + block.size

        if last_fast_index < len(fast_words):
            ahead_words = fast_words[last_fast_index:]
            partial_text = " " + " ".join(ahead_words) if ahead_words else ""
        else:
            partial_text = ""

        return self.state.slow_full_text, partial_text

    @staticmethod
    def _status_style(status: str) -> str:
        if "Listening" in status:
            return "green"
        if "Connecting" in status:
            return "yellow dim"
        if "Done" in status or "Stopped" in status:
            return "dim"
        return "red"

    def render(self) -> Layout:
        layout = Layout()

        header_text = Text()
        header_text.append("│ ", style="dim")
        header_text.append(self.model, style="dim")
        header_text.append(" │ ", style="dim")
        header_text.append(
            f"fast {self.fast_delay_ms}ms", style="bright_yellow"
        )
        header_text.append(
            f" {self.state.fast_status}",
            style=self._status_style(self.state.fast_status),
        )
        header_text.append(" │ ", style="dim")
        header_text.append(f"slow {self.slow_delay_ms}ms", style="white")
        header_text.append(
            f" {self.state.slow_status}",
            style=self._status_style(self.state.slow_status),
        )

        header = Align.left(header_text, vertical="middle", pad=False)

        final_text, partial_text = self._compute_display_texts()
        transcript_text = Text()
        if final_text or partial_text:
            transcript_text.append(final_text, style="white")
            transcript_text.append(partial_text, style="bright_yellow")
        else:
            transcript_text.append("...", style="dim")

        transcript = Panel(
            Align.left(transcript_text, vertical="top"),
            border_style="dim",
            padding=(1, 2),
        )

        footer_text = Text()
        footer_text.append("ctrl+c", style="dim")
        footer_text.append(" quit", style="dim italic")
        footer = Align.left(footer_text, vertical="middle", pad=False)

        if self.state.error:
            layout.split_column(
                Layout(header, name="header", size=1),
                Layout(transcript, name="body"),
                Layout(
                    Panel(Text(self.state.error, style="red"), border_style="red"),
                    name="error",
                    size=4,
                ),
                Layout(footer, name="footer", size=1),
            )
        else:
            layout.split_column(
                Layout(header, name="header", size=1),
                Layout(transcript, name="body"),
                Layout(footer, name="footer", size=1),
            )

        return layout

Étape 3 : Définir les fonctions de gestion audio

Dans cette étape, nous définissons des fonctions pour gérer l'entrée audio du microphone et les flux audio pour la transcription rapide et lente :

  • iter_microphone : Capture l'audio du microphone avec PyAudio et génère des blocs de données audio de manière asynchrone. Cette fonction tourne en boucle, lisant en continu les données audio et les transmettant pour traitement.
  • queue_audio_iter : Lit les blocs audio d'une file d'attente et les génère jusqu'à réception d'une valeur sentinelle (None). Cela permet de découpler la capture audio du traitement.
  • broadcast_microphone : Lit l'audio du microphone une seule fois et le diffuse vers plusieurs files d'attente. Cela garantit que les flux de transcription rapide et lente reçoivent les mêmes données audio.

Ces fonctions fonctionnent ensemble pour assurer une capture efficace des données audio et leur distribution aux deux flux de transcription.

async def iter_microphone(
    *,
    sample_rate: int,
    chunk_duration_ms: int,
) -> AsyncIterator[bytes]:
    """
    Yield microphone PCM chunks using PyAudio (16-bit mono).
    Encoding is always pcm_s16le.
    """
    pyaudio = load_pyaudio()

    p = pyaudio.PyAudio()
    chunk_samples = int(sample_rate * chunk_duration_ms / 1000)

    stream = p.open(
        format=pyaudio.paInt16,
        channels=1,
        rate=sample_rate,
        input=True,
        frames_per_buffer=chunk_samples,
    )

    loop = asyncio.get_running_loop()
    try:
        while True:
            data = await loop.run_in_executor(None, stream.read, chunk_samples, False)
            yield data
    finally:
        stream.stop_stream()
        stream.close()
        p.terminate()


async def queue_audio_iter(
    queue: asyncio.Queue[bytes | None],
) -> AsyncIterator[bytes]:
    """Yield audio chunks from a queue until a None sentinel is received."""
    while True:
        chunk = await queue.get()
        if chunk is None:
            break
        yield chunk


async def broadcast_microphone(
    *,
    sample_rate: int,
    chunk_duration_ms: int,
    queues: Sequence[asyncio.Queue[bytes | None]],
) -> None:
    """Read from the microphone once and broadcast to multiple queues."""
    try:
        async for chunk in iter_microphone(
            sample_rate=sample_rate, chunk_duration_ms=chunk_duration_ms
        ):
            for queue in queues:
                await queue.put(chunk)
    finally:
        for queue in queues:
            while True:
                try:
                    queue.put_nowait(None)
                    break
                except asyncio.QueueFull:
                    try:
                        queue.get_nowait()
                    except asyncio.QueueEmpty:
                        break

Étape 4 : Définir les fonctions utilitaires

Dans cette étape, nous définissons des fonctions utilitaires pour gérer les statuts d'événements et analyser les arguments de ligne de commande :

  • _status_for_event : Convertit les événements de transcription en messages de statut lisibles. Par exemple, elle renvoie « 🎤 Listening... » lors de la création d'une session et « ✅ Done » lorsque la transcription est terminée.
  • parse_args : Analyse les arguments de ligne de commande pour configurer le processus de transcription. Cela inclut des paramètres tels que l'identifiant du modèle, les délais de transcription rapide et lente, le taux d'échantillonnage, la durée des blocs, la clé API et l'URL de base.

Ces fonctions utilitaires simplifient la logique principale et rendent le code plus modulaire et facile à maintenir.

def _status_for_event(event: object) -> str:
    if isinstance(event, RealtimeTranscriptionSessionCreated):
        return "🎤 Listening..."
    return "✅ Done"


def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser(
        description="Dual-delay real-time microphone transcription."
    )
    parser.add_argument(
        "--model",
        default="voxtral-mini-transcribe-realtime-2602",
        help="Model ID",
    )
    parser.add_argument(
        "--fast-delay-ms",
        type=int,
        default=240,
        help="Fast target streaming delay in ms",
    )
    parser.add_argument(
        "--slow-delay-ms",
        type=int,
        default=2400,
        help="Slow target streaming delay in ms",
    )
    parser.add_argument(
        "--sample-rate",
        type=int,
        default=16000,
        choices=[8000, 16000, 22050, 44100, 48000],
        help="Sample rate in Hz",
    )
    parser.add_argument(
        "--chunk-duration",
        type=int,
        default=10,
        help="Chunk duration in ms",
    )
    parser.add_argument(
        "--api-key",
        default=os.environ.get("MISTRAL_API_KEY"),
        help="Mistral API key",
    )
    parser.add_argument(
        "--base-url",
        default=os.environ.get("MISTRAL_BASE_URL", "wss://api.mistral.ai"),
    )
    return parser.parse_args()

Étape 5 : Définir les fonctions de gestion des flux

Dans cette étape, nous définissons des fonctions pour gérer les flux de transcription et mettre à jour l'interface utilisateur :

  • run_stream : Gère le flux de transcription pour le flux rapide ou lent. Elle traite les événements du flux de transcription, tels que la création de session, les deltas de texte et les événements de fin. Elle met à jour l'état en conséquence et déclenche les mises à jour de l'interface en envoyant des signaux à la file d'attente de mise à jour.
  • ui_loop : Met à jour en continu l'interface utilisateur en fonction des signaux de la file d'attente de mise à jour. Elle utilise le composant Live de la bibliothèque rich pour afficher l'interface en temps réel, offrant une expérience utilisateur fluide et réactive.

Ces fonctions fonctionnent ensemble pour garantir que les résultats de transcription s'affichent en temps réel et que l'interface reste réactive.

async def run_stream(
    *,
    client: Mistral,
    model: str,
    delay_ms: int,
    audio_stream: AsyncIterator[bytes],
    audio_format: AudioFormat,
    state: DualTranscriptState,
    update_queue: asyncio.Queue[None],
    is_fast: bool,
) -> None:
    try:
        async for event in client.audio.realtime.transcribe_stream(
            audio_stream=audio_stream,
            model=model,
            audio_format=audio_format,
            target_streaming_delay_ms=delay_ms,
        ):
            if isinstance(event, RealtimeTranscriptionSessionCreated):
                if is_fast:
                    state.fast_status = _status_for_event(event)
                else:
                    state.slow_status = _status_for_event(event)
            elif isinstance(event, TranscriptionStreamTextDelta):
                if is_fast:
                    state.fast_full_text += event.text
                else:
                    state.slow_full_text += event.text
            elif isinstance(event, TranscriptionStreamDone):
                if is_fast:
                    state.fast_status = _status_for_event(event)
                    state.fast_done = True
                else:
                    state.slow_status = _status_for_event(event)
                    state.slow_done = True
                break
            elif isinstance(event, RealtimeTranscriptionError):
                state.set_error(str(event.error))
                break
            elif isinstance(event, UnknownRealtimeEvent):
                continue

            if update_queue.empty():
                update_queue.put_nowait(None)
    except Exception as exc:  # pragma: no cover - safety net for UI demo
        state.set_error(str(exc))
        if update_queue.empty():
            update_queue.put_nowait(None)


async def ui_loop(
    display: DualTranscriptDisplay,
    update_queue: asyncio.Queue[None],
    stop_event: asyncio.Event,
    *,
    refresh_hz: float = 12.0,
) -> None:
    with Live(
        display.render(), console=console, refresh_per_second=refresh_hz, screen=True
    ) as live:
        while not stop_event.is_set():
            try:
                await asyncio.wait_for(update_queue.get(), timeout=0.25)
            except asyncio.TimeoutError:
                pass
            live.update(display.render())

Étape 6 : Définir la fonction principale

Dans cette étape finale, nous définissons la fonction main, qui orchestre l'ensemble du processus de transcription à double délai :

  1. Initialisation : Analyser les arguments de ligne de commande, charger PyAudio et initialiser les objets d'état et d'affichage.
  2. Configuration : Créer un client Mistral, configurer les files d'attente audio pour les flux rapide et lent, et initialiser les primitives de synchronisation comme stop_event et update_queue.
  3. Création des tâches : Lancer des tâches asynchrones pour diffuser l'audio vers les deux flux, exécuter les flux de transcription rapide et lent, et mettre à jour l'interface utilisateur en temps réel.
  4. Surveillance : Surveiller en continu les tâches pour détecter leur achèvement ou des erreurs. Si une erreur se produit ou si l'utilisateur interrompt le processus (par exemple, avec Ctrl+C), définir l'événement d'arrêt pour fermer toutes les tâches de manière propre.
  5. Nettoyage : Annuler toutes les tâches et attendre leur achèvement pour garantir une sortie propre.

La fonction main relie tous les éléments entre eux, garantissant que le processus de transcription à double délai s'exécute de manière fluide et efficace.

async def main() -> int:
    args = parse_args()
    api_key = args.api_key or os.environ["MISTRAL_API_KEY"]

    try:
        load_pyaudio()
    except RuntimeError as exc:
        console.print(str(exc), style="red")
        return 1

    state = DualTranscriptState()
    display = DualTranscriptDisplay(
        model=args.model,
        fast_delay_ms=args.fast_delay_ms,
        slow_delay_ms=args.slow_delay_ms,
        state=state,
    )

    client = Mistral(api_key=api_key, server_url=args.base_url)
    audio_format = AudioFormat(encoding="pcm_s16le", sample_rate=args.sample_rate)

    fast_queue: asyncio.Queue[bytes | None] = asyncio.Queue(maxsize=50)
    slow_queue: asyncio.Queue[bytes | None] = asyncio.Queue(maxsize=50)

    stop_event = asyncio.Event()
    update_queue: asyncio.Queue[None] = asyncio.Queue(maxsize=1)

    broadcaster = asyncio.create_task(
        broadcast_microphone(
            sample_rate=args.sample_rate,
            chunk_duration_ms=args.chunk_duration,
            queues=(fast_queue, slow_queue),
        )
    )

    fast_task = asyncio.create_task(
        run_stream(
            client=client,
            model=args.model,
            delay_ms=args.fast_delay_ms,
            audio_stream=queue_audio_iter(fast_queue),
            audio_format=audio_format,
            state=state,
            update_queue=update_queue,
            is_fast=True,
        )
    )

    slow_task = asyncio.create_task(
        run_stream(
            client=client,
            model=args.model,
            delay_ms=args.slow_delay_ms,
            audio_stream=queue_audio_iter(slow_queue),
            audio_format=audio_format,
            state=state,
            update_queue=update_queue,
            is_fast=False,
        )
    )

    ui_task = asyncio.create_task(
        ui_loop(display, update_queue, stop_event, refresh_hz=12.0)
    )

    try:
        while True:
            await asyncio.sleep(0.1)
            for task in (broadcaster, fast_task, slow_task):
                if not task.done():
                    continue
                exc = task.exception()
                if exc:
                    state.set_error(str(exc))
                    if update_queue.empty():
                        update_queue.put_nowait(None)
                    stop_event.set()
                    break
            if state.error:
                stop_event.set()
                break
            if state.fast_done and state.slow_done:
                stop_event.set()
                break
    except KeyboardInterrupt:
        state.fast_status = "⏹️ Stopped"
        state.slow_status = "⏹️ Stopped"
        stop_event.set()
    finally:
        broadcaster.cancel()
        fast_task.cancel()
        slow_task.cancel()
        await asyncio.gather(broadcaster, fast_task, slow_task, return_exceptions=True)
        await ui_task

    return 0 if not state.error else 1


if __name__ == "__main__":
    sys.exit(asyncio.run(main()))