Realtime
La transcription en temps réel vous permet de transcrire l'audio au fur et à mesure qu'il est prononcé ou enregistré. Cela est utile pour les applications nécessitant un retour immédiat, comme le sous-titrage en direct, les assistants vocaux ou la prise de notes en temps réel.
Avant de commencer
Modèles avec capacités de streaming
Modèles compatibles temps réel :
- Voxtral Mini Transcribe Realtime (
voxtral-mini-transcribe-realtime-2602) : optimisé pour la transcription en direct avec une latence ultra-faible et une haute précision.
Le mode temps réel n'est actuellement pas compatible avec le paramètre diarize. Utilisez l'un ou l'autre.
Utilisation de base
Version Python : avant d'exécuter le script suivant, assurez-vous d'avoir installé le package mistralai[realtime] :
pip install mistralai[realtime]Vous pouvez fournir n'importe quel flux audio en entrée comme suit.
from mistralai.client import Mistral
from mistralai.extra.realtime import UnknownRealtimeEvent
from mistralai.client.models import AudioFormat, RealtimeTranscriptionError, RealtimeTranscriptionSessionCreated, TranscriptionStreamDone, TranscriptionStreamTextDelta
import asyncio
import sys
from typing import AsyncIterator
api_key = "YOUR_MISTRAL_API_KEY"
client = Mistral(api_key=api_key)
audio_format = AudioFormat(encoding="pcm_s16le", sample_rate=16000)
audio_stream = ...
async def main():
try:
async for event in client.audio.realtime.transcribe_stream(
audio_stream=audio_stream, # audio stream corresponds to any iterable of bytes
model="voxtral-mini-transcribe-realtime-2602",
audio_format=audio_format,
):
if isinstance(event, RealtimeTranscriptionSessionCreated):
print(f"Session created.")
elif isinstance(event, TranscriptionStreamTextDelta):
print(event.text, end="", flush=True)
elif isinstance(event, TranscriptionStreamDone):
print("Transcription done.")
elif isinstance(event, RealtimeTranscriptionError):
print(f"Error: {event}")
elif isinstance(event, UnknownRealtimeEvent):
print(f"Unknown event: {event}")
continue
except KeyboardInterrupt:
print("Stopping...")
sys.exit(asyncio.run(main()))Voici un exemple d'implémentation utilisant votre microphone pour diffuser l'audio directement.
from mistralai.client import Mistral
from mistralai.extra.realtime import UnknownRealtimeEvent
from mistralai.client.models import AudioFormat, RealtimeTranscriptionError, RealtimeTranscriptionSessionCreated, TranscriptionStreamDone, TranscriptionStreamTextDelta
import asyncio
import sys
from typing import AsyncIterator
api_key = "YOUR_MISTRAL_API_KEY"
client = Mistral(api_key=api_key)
#microphone is always pcm_s16le here
audio_format = AudioFormat(encoding="pcm_s16le", sample_rate=16000)
async def iter_microphone(
*,
sample_rate: int,
chunk_duration_ms: int,
) -> AsyncIterator[bytes]:
"""
Yield microphone PCM chunks using PyAudio (16-bit mono).
Encoding is always pcm_s16le.
"""
import pyaudio
p = pyaudio.PyAudio()
chunk_samples = int(sample_rate * chunk_duration_ms / 1000)
stream = p.open(
format=pyaudio.paInt16,
channels=1,
rate=sample_rate,
input=True,
frames_per_buffer=chunk_samples,
)
loop = asyncio.get_running_loop()
try:
while True:
# stream.read is blocking; run it off-thread
data = await loop.run_in_executor(None, stream.read, chunk_samples, False)
yield data
finally:
stream.stop_stream()
stream.close()
p.terminate()
audio_stream = iter_microphone(sample_rate=audio_format.sample_rate, chunk_duration_ms=480)
async def main():
try:
async for event in client.audio.realtime.transcribe_stream(
audio_stream=audio_stream, # audio stream corresponds to any iterable of bytes
model="voxtral-mini-transcribe-realtime-2602",
audio_format=audio_format,
):
if isinstance(event, RealtimeTranscriptionSessionCreated):
print(f"Session created.")
elif isinstance(event, TranscriptionStreamTextDelta):
print(event.text, end="", flush=True)
elif isinstance(event, TranscriptionStreamDone):
print("Transcription done.")
elif isinstance(event, RealtimeTranscriptionError):
print(f"Error: {event}")
elif isinstance(event, UnknownRealtimeEvent):
print(f"Unknown event: {event}")
continue
except KeyboardInterrupt:
print("Stopping...")
sys.exit(asyncio.run(main()))Délai cible
Le délai cible permet d'attendre un temps défini avant de démarrer la transcription. Cela permet de recueillir du contexte et d'améliorer la précision. Vous pouvez spécifier un délai cible via le paramètre target_streaming_delay_ms.
Utilisation de base
Voici un exemple simple d'utilisation :
import asyncio
import sys
from typing import AsyncIterator
from mistralai.client import Mistral
from mistralai.extra.realtime import UnknownRealtimeEvent
from mistralai.client.models import AudioFormat, RealtimeTranscriptionError, RealtimeTranscriptionSessionCreated, TranscriptionStreamDone, TranscriptionStreamTextDelta
api_key = "YOUR_MISTRAL_API_KEY"
client = Mistral(api_key=api_key)
audio_format = AudioFormat(encoding="pcm_s16le", sample_rate=16000)
audio_stream = ... # audio stream corresponds to any iterable of bytes
async def main():
try:
async for event in client.audio.realtime.transcribe_stream(
audio_stream=audio_stream,
model="voxtral-mini-transcribe-realtime-2602",
audio_format=audio_format,
target_streaming_delay_ms=1000,
):
if isinstance(event, RealtimeTranscriptionSessionCreated):
print(f"Session created.")
elif isinstance(event, TranscriptionStreamTextDelta):
print(event.text, end="", flush=True)
elif isinstance(event, TranscriptionStreamDone):
print("Transcription done.")
elif isinstance(event, RealtimeTranscriptionError):
print(f"Error: {event}")
elif isinstance(event, UnknownRealtimeEvent):
print(f"Unknown event: {event}")
continue
except KeyboardInterrupt:
print("Stopping...")
sys.exit(asyncio.run(main()))Exemple d'utilisation : double délai
La transcription à double délai permet d'équilibrer rapidité et précision. Elle utilise deux flux parallèles :
- Flux rapide : fournit des transcriptions rapides mais moins précises avec un délai minimal.
- Flux lent : fournit des transcriptions plus précises en recueillant davantage de contexte, ce qui entraîne un délai plus long.
Cette approche est utile dans les scénarios où un retour immédiat et une haute précision sont nécessaires, comme le sous-titrage en direct ou la prise de notes en temps réel.
Comment ça fonctionne
- Flux rapide : transcrit l'audio avec un délai minimal (par exemple, 240 ms), fournissant des résultats rapides mais potentiellement moins précis.
- Flux lent : transcrit l'audio avec un délai plus long (par exemple, 2 400 ms), fournissant des résultats plus précis en exploitant un contexte supplémentaire.
- Sortie combinée : la sortie finale fusionne les résultats des deux flux, assurant un équilibre entre vitesse et précision.
Cas d'usage
- Sous-titrage en direct pour présentations ou réunions
- Applications de prise de notes en temps réel
- Assistants vocaux nécessitant un retour immédiat
Étape 1 : Importer les dépendances
Dans cette étape, nous importons toutes les bibliothèques et modules nécessaires pour la transcription à double délai. Cela inclut :
- Bibliothèques standard :
argparsepour l'analyse des arguments de ligne de commande,asynciopour les opérations asynchrones, etdifflibpour comparer des séquences. - Bibliothèque Rich : utilisée pour créer une interface utilisateur de terminal visuellement attrayante.
- SDK Mistral AI : fournit la fonctionnalité principale pour la transcription en temps réel.
- Utilitaires PyAudio : gèrent l'entrée microphone et le streaming audio.
import argparse
import asyncio
import difflib
import os
import sys
from dataclasses import dataclass
from typing import AsyncIterator, Sequence
from rich.align import Align
from rich.console import Console
from rich.layout import Layout
from rich.live import Live
from rich.panel import Panel
from rich.text import Text
from mistralai.client import Mistral
from mistralai.extra.realtime import UnknownRealtimeEvent
from mistralai.client.models import (
AudioFormat,
RealtimeTranscriptionError,
RealtimeTranscriptionSessionCreated,
TranscriptionStreamDone,
TranscriptionStreamTextDelta,
)
from pyaudio_utils import load_pyaudio
console = Console()Étape 2 : Définir les classes d'état et d'affichage
Dans cette étape, nous définissons les classes responsables de la gestion de l'état de la transcription et du rendu de l'interface utilisateur :
- DualTranscriptState : suit l'état des deux flux de transcription rapide et lent, incluant leurs sorties textuelles, leurs statuts et les éventuelles erreurs.
- DualTranscriptDisplay : affiche une interface utilisateur en direct dans le terminal, présentant les résultats de transcription des deux flux. Elle utilise la bibliothèque
richpour créer une mise en page visuellement attrayante avec un en-tête, un corps de transcription et un pied de page.
La classe DualTranscriptDisplay inclut des méthodes pour :
- Normaliser les mots pour comparaison.
- Calculer les textes d'affichage pour fusionner les résultats des deux flux.
- Appliquer des styles aux éléments d'interface utilisateur en fonction de leur statut.
@dataclass
class DualTranscriptState:
"""Tracks transcript state for dual-delay transcription."""
fast_full_text: str = ""
slow_full_text: str = ""
fast_status: str = "🔌 Connecting..."
slow_status: str = "🔌 Connecting..."
error: str | None = None
fast_done: bool = False
slow_done: bool = False
def set_error(self, message: str) -> None:
self.error = message
self.fast_status = "❌ Error"
self.slow_status = "❌ Error"
class DualTranscriptDisplay:
"""Renders a live dual-delay transcription UI."""
def __init__(
self,
*,
model: str,
fast_delay_ms: int,
slow_delay_ms: int,
state: DualTranscriptState,
) -> None:
self.model = model
self.fast_delay_ms = fast_delay_ms
self.slow_delay_ms = slow_delay_ms
self.state = state
@staticmethod
def _normalize_word(word: str) -> str:
return word.strip(".,!?;:\"'()[]{}").lower()
def _compute_display_texts(self) -> tuple[str, str]:
slow_words = self.state.slow_full_text.split()
fast_words = self.state.fast_full_text.split()
if not slow_words:
partial_text = f" {self.state.fast_full_text}".rstrip()
return "", partial_text
slow_norm = [self._normalize_word(word) for word in slow_words]
fast_norm = [self._normalize_word(word) for word in fast_words]
matcher = difflib.SequenceMatcher(None, slow_norm, fast_norm)
last_fast_index = 0
slow_progress = 0
for block in matcher.get_matching_blocks():
if block.size == 0:
continue
slow_end = block.a + block.size
if slow_end > slow_progress:
slow_progress = slow_end
last_fast_index = block.b + block.size
if last_fast_index < len(fast_words):
ahead_words = fast_words[last_fast_index:]
partial_text = " " + " ".join(ahead_words) if ahead_words else ""
else:
partial_text = ""
return self.state.slow_full_text, partial_text
@staticmethod
def _status_style(status: str) -> str:
if "Listening" in status:
return "green"
if "Connecting" in status:
return "yellow dim"
if "Done" in status or "Stopped" in status:
return "dim"
return "red"
def render(self) -> Layout:
layout = Layout()
header_text = Text()
header_text.append("│ ", style="dim")
header_text.append(self.model, style="dim")
header_text.append(" │ ", style="dim")
header_text.append(
f"fast {self.fast_delay_ms}ms", style="bright_yellow"
)
header_text.append(
f" {self.state.fast_status}",
style=self._status_style(self.state.fast_status),
)
header_text.append(" │ ", style="dim")
header_text.append(f"slow {self.slow_delay_ms}ms", style="white")
header_text.append(
f" {self.state.slow_status}",
style=self._status_style(self.state.slow_status),
)
header = Align.left(header_text, vertical="middle", pad=False)
final_text, partial_text = self._compute_display_texts()
transcript_text = Text()
if final_text or partial_text:
transcript_text.append(final_text, style="white")
transcript_text.append(partial_text, style="bright_yellow")
else:
transcript_text.append("...", style="dim")
transcript = Panel(
Align.left(transcript_text, vertical="top"),
border_style="dim",
padding=(1, 2),
)
footer_text = Text()
footer_text.append("ctrl+c", style="dim")
footer_text.append(" quit", style="dim italic")
footer = Align.left(footer_text, vertical="middle", pad=False)
if self.state.error:
layout.split_column(
Layout(header, name="header", size=1),
Layout(transcript, name="body"),
Layout(
Panel(Text(self.state.error, style="red"), border_style="red"),
name="error",
size=4,
),
Layout(footer, name="footer", size=1),
)
else:
layout.split_column(
Layout(header, name="header", size=1),
Layout(transcript, name="body"),
Layout(footer, name="footer", size=1),
)
return layoutÉtape 3 : Définir les fonctions de gestion audio
Dans cette étape, nous définissons des fonctions pour gérer l'entrée audio du microphone et les flux audio pour la transcription rapide et lente :
- iter_microphone : Capture l'audio du microphone avec PyAudio et génère des blocs de données audio de manière asynchrone. Cette fonction tourne en boucle, lisant en continu les données audio et les transmettant pour traitement.
- queue_audio_iter : Lit les blocs audio d'une file d'attente et les génère jusqu'à réception d'une valeur sentinelle (
None). Cela permet de découpler la capture audio du traitement. - broadcast_microphone : Lit l'audio du microphone une seule fois et le diffuse vers plusieurs files d'attente. Cela garantit que les flux de transcription rapide et lente reçoivent les mêmes données audio.
Ces fonctions fonctionnent ensemble pour assurer une capture efficace des données audio et leur distribution aux deux flux de transcription.
async def iter_microphone(
*,
sample_rate: int,
chunk_duration_ms: int,
) -> AsyncIterator[bytes]:
"""
Yield microphone PCM chunks using PyAudio (16-bit mono).
Encoding is always pcm_s16le.
"""
pyaudio = load_pyaudio()
p = pyaudio.PyAudio()
chunk_samples = int(sample_rate * chunk_duration_ms / 1000)
stream = p.open(
format=pyaudio.paInt16,
channels=1,
rate=sample_rate,
input=True,
frames_per_buffer=chunk_samples,
)
loop = asyncio.get_running_loop()
try:
while True:
data = await loop.run_in_executor(None, stream.read, chunk_samples, False)
yield data
finally:
stream.stop_stream()
stream.close()
p.terminate()
async def queue_audio_iter(
queue: asyncio.Queue[bytes | None],
) -> AsyncIterator[bytes]:
"""Yield audio chunks from a queue until a None sentinel is received."""
while True:
chunk = await queue.get()
if chunk is None:
break
yield chunk
async def broadcast_microphone(
*,
sample_rate: int,
chunk_duration_ms: int,
queues: Sequence[asyncio.Queue[bytes | None]],
) -> None:
"""Read from the microphone once and broadcast to multiple queues."""
try:
async for chunk in iter_microphone(
sample_rate=sample_rate, chunk_duration_ms=chunk_duration_ms
):
for queue in queues:
await queue.put(chunk)
finally:
for queue in queues:
while True:
try:
queue.put_nowait(None)
break
except asyncio.QueueFull:
try:
queue.get_nowait()
except asyncio.QueueEmpty:
breakÉtape 4 : Définir les fonctions utilitaires
Dans cette étape, nous définissons des fonctions utilitaires pour gérer les statuts d'événements et analyser les arguments de ligne de commande :
- _status_for_event : Convertit les événements de transcription en messages de statut lisibles. Par exemple, elle renvoie « 🎤 Listening... » lors de la création d'une session et « ✅ Done » lorsque la transcription est terminée.
- parse_args : Analyse les arguments de ligne de commande pour configurer le processus de transcription. Cela inclut des paramètres tels que l'identifiant du modèle, les délais de transcription rapide et lente, le taux d'échantillonnage, la durée des blocs, la clé API et l'URL de base.
Ces fonctions utilitaires simplifient la logique principale et rendent le code plus modulaire et facile à maintenir.
def _status_for_event(event: object) -> str:
if isinstance(event, RealtimeTranscriptionSessionCreated):
return "🎤 Listening..."
return "✅ Done"
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(
description="Dual-delay real-time microphone transcription."
)
parser.add_argument(
"--model",
default="voxtral-mini-transcribe-realtime-2602",
help="Model ID",
)
parser.add_argument(
"--fast-delay-ms",
type=int,
default=240,
help="Fast target streaming delay in ms",
)
parser.add_argument(
"--slow-delay-ms",
type=int,
default=2400,
help="Slow target streaming delay in ms",
)
parser.add_argument(
"--sample-rate",
type=int,
default=16000,
choices=[8000, 16000, 22050, 44100, 48000],
help="Sample rate in Hz",
)
parser.add_argument(
"--chunk-duration",
type=int,
default=10,
help="Chunk duration in ms",
)
parser.add_argument(
"--api-key",
default=os.environ.get("MISTRAL_API_KEY"),
help="Mistral API key",
)
parser.add_argument(
"--base-url",
default=os.environ.get("MISTRAL_BASE_URL", "wss://api.mistral.ai"),
)
return parser.parse_args()Étape 5 : Définir les fonctions de gestion des flux
Dans cette étape, nous définissons des fonctions pour gérer les flux de transcription et mettre à jour l'interface utilisateur :
- run_stream : Gère le flux de transcription pour le flux rapide ou lent. Elle traite les événements du flux de transcription, tels que la création de session, les deltas de texte et les événements de fin. Elle met à jour l'état en conséquence et déclenche les mises à jour de l'interface en envoyant des signaux à la file d'attente de mise à jour.
- ui_loop : Met à jour en continu l'interface utilisateur en fonction des signaux de la file d'attente de mise à jour. Elle utilise le composant
Livede la bibliothèquerichpour afficher l'interface en temps réel, offrant une expérience utilisateur fluide et réactive.
Ces fonctions fonctionnent ensemble pour garantir que les résultats de transcription s'affichent en temps réel et que l'interface reste réactive.
async def run_stream(
*,
client: Mistral,
model: str,
delay_ms: int,
audio_stream: AsyncIterator[bytes],
audio_format: AudioFormat,
state: DualTranscriptState,
update_queue: asyncio.Queue[None],
is_fast: bool,
) -> None:
try:
async for event in client.audio.realtime.transcribe_stream(
audio_stream=audio_stream,
model=model,
audio_format=audio_format,
target_streaming_delay_ms=delay_ms,
):
if isinstance(event, RealtimeTranscriptionSessionCreated):
if is_fast:
state.fast_status = _status_for_event(event)
else:
state.slow_status = _status_for_event(event)
elif isinstance(event, TranscriptionStreamTextDelta):
if is_fast:
state.fast_full_text += event.text
else:
state.slow_full_text += event.text
elif isinstance(event, TranscriptionStreamDone):
if is_fast:
state.fast_status = _status_for_event(event)
state.fast_done = True
else:
state.slow_status = _status_for_event(event)
state.slow_done = True
break
elif isinstance(event, RealtimeTranscriptionError):
state.set_error(str(event.error))
break
elif isinstance(event, UnknownRealtimeEvent):
continue
if update_queue.empty():
update_queue.put_nowait(None)
except Exception as exc: # pragma: no cover - safety net for UI demo
state.set_error(str(exc))
if update_queue.empty():
update_queue.put_nowait(None)
async def ui_loop(
display: DualTranscriptDisplay,
update_queue: asyncio.Queue[None],
stop_event: asyncio.Event,
*,
refresh_hz: float = 12.0,
) -> None:
with Live(
display.render(), console=console, refresh_per_second=refresh_hz, screen=True
) as live:
while not stop_event.is_set():
try:
await asyncio.wait_for(update_queue.get(), timeout=0.25)
except asyncio.TimeoutError:
pass
live.update(display.render())Étape 6 : Définir la fonction principale
Dans cette étape finale, nous définissons la fonction main, qui orchestre l'ensemble du processus de transcription à double délai :
- Initialisation : Analyser les arguments de ligne de commande, charger PyAudio et initialiser les objets d'état et d'affichage.
- Configuration : Créer un client Mistral, configurer les files d'attente audio pour les flux rapide et lent, et initialiser les primitives de synchronisation comme
stop_eventetupdate_queue. - Création des tâches : Lancer des tâches asynchrones pour diffuser l'audio vers les deux flux, exécuter les flux de transcription rapide et lent, et mettre à jour l'interface utilisateur en temps réel.
- Surveillance : Surveiller en continu les tâches pour détecter leur achèvement ou des erreurs. Si une erreur se produit ou si l'utilisateur interrompt le processus (par exemple, avec Ctrl+C), définir l'événement d'arrêt pour fermer toutes les tâches de manière propre.
- Nettoyage : Annuler toutes les tâches et attendre leur achèvement pour garantir une sortie propre.
La fonction main relie tous les éléments entre eux, garantissant que le processus de transcription à double délai s'exécute de manière fluide et efficace.
async def main() -> int:
args = parse_args()
api_key = args.api_key or os.environ["MISTRAL_API_KEY"]
try:
load_pyaudio()
except RuntimeError as exc:
console.print(str(exc), style="red")
return 1
state = DualTranscriptState()
display = DualTranscriptDisplay(
model=args.model,
fast_delay_ms=args.fast_delay_ms,
slow_delay_ms=args.slow_delay_ms,
state=state,
)
client = Mistral(api_key=api_key, server_url=args.base_url)
audio_format = AudioFormat(encoding="pcm_s16le", sample_rate=args.sample_rate)
fast_queue: asyncio.Queue[bytes | None] = asyncio.Queue(maxsize=50)
slow_queue: asyncio.Queue[bytes | None] = asyncio.Queue(maxsize=50)
stop_event = asyncio.Event()
update_queue: asyncio.Queue[None] = asyncio.Queue(maxsize=1)
broadcaster = asyncio.create_task(
broadcast_microphone(
sample_rate=args.sample_rate,
chunk_duration_ms=args.chunk_duration,
queues=(fast_queue, slow_queue),
)
)
fast_task = asyncio.create_task(
run_stream(
client=client,
model=args.model,
delay_ms=args.fast_delay_ms,
audio_stream=queue_audio_iter(fast_queue),
audio_format=audio_format,
state=state,
update_queue=update_queue,
is_fast=True,
)
)
slow_task = asyncio.create_task(
run_stream(
client=client,
model=args.model,
delay_ms=args.slow_delay_ms,
audio_stream=queue_audio_iter(slow_queue),
audio_format=audio_format,
state=state,
update_queue=update_queue,
is_fast=False,
)
)
ui_task = asyncio.create_task(
ui_loop(display, update_queue, stop_event, refresh_hz=12.0)
)
try:
while True:
await asyncio.sleep(0.1)
for task in (broadcaster, fast_task, slow_task):
if not task.done():
continue
exc = task.exception()
if exc:
state.set_error(str(exc))
if update_queue.empty():
update_queue.put_nowait(None)
stop_event.set()
break
if state.error:
stop_event.set()
break
if state.fast_done and state.slow_done:
stop_event.set()
break
except KeyboardInterrupt:
state.fast_status = "⏹️ Stopped"
state.slow_status = "⏹️ Stopped"
stop_event.set()
finally:
broadcaster.cancel()
fast_task.cancel()
slow_task.cancel()
await asyncio.gather(broadcaster, fast_task, slow_task, return_exceptions=True)
await ui_task
return 0 if not state.error else 1
if __name__ == "__main__":
sys.exit(asyncio.run(main()))