Realtime

Realtime transcription allows you to transcribe audio as it is being spoken or recorded. This is useful for applications requiring immediate feedback, such as live captioning, voice assistants, or real-time note-taking.

Before You Start

Models with streaming capabilities

Realtime capable models:

  • Voxtral Mini Transcribe Realtime (voxtral-mini-transcribe-realtime-2602): Optimized for live transcription with ultra-low latency and high accuracy.

Note: Realtime transcription is currently not compatible with the diarize parameter. Use one or the other, not both.

Basic Usage

note

Python Version: Before running the following script, ensure you have installed the mistralai[realtime] package:

pip install "mistralai[realtime]"

You can provide any stream of audio as input, as shown below.

from mistralai import Mistral
from mistralai.extra.realtime import UnknownRealtimeEvent
from mistralai.models import AudioFormat, RealtimeTranscriptionError, RealtimeTranscriptionSessionCreated, TranscriptionStreamDone, TranscriptionStreamTextDelta

import asyncio
import sys
from typing import AsyncIterator

# Replace with your own API key.
api_key = "YOUR_MISTRAL_API_KEY"
client = Mistral(api_key=api_key)

# Audio is declared as pcm_s16le at a 16 kHz sample rate.
audio_format = AudioFormat(encoding="pcm_s16le", sample_rate=16000)
# Placeholder: supply your own audio source here, e.g. an async iterator
# yielding chunks of audio bytes in the format declared above.
audio_stream = ...


async def main() -> None:
    """Stream ``audio_stream`` to the realtime API and print each event."""
    async for event in client.audio.realtime.transcribe_stream(
        audio_stream=audio_stream,
        model="voxtral-mini-transcribe-realtime-2602",
        audio_format=audio_format,
    ):
        if isinstance(event, RealtimeTranscriptionSessionCreated):
            print("Session created.")
        elif isinstance(event, TranscriptionStreamTextDelta):
            # Deltas are printed back to back so the transcript grows in place.
            print(event.text, end="", flush=True)
        elif isinstance(event, TranscriptionStreamDone):
            print("Transcription done.")
        elif isinstance(event, RealtimeTranscriptionError):
            print(f"Error: {event}")
        elif isinstance(event, UnknownRealtimeEvent):
            print(f"Unknown event: {event}")


try:
    asyncio.run(main())
except KeyboardInterrupt:
    # On Ctrl+C, asyncio.run() cancels main() and raises KeyboardInterrupt
    # from this call, so the interrupt is handled here rather than inside
    # the coroutine.
    print("Stopping...")
sys.exit(0)

Below we provide an implementation example using your microphone to stream audio input directly.

from mistralai import Mistral
from mistralai.extra.realtime import UnknownRealtimeEvent
from mistralai.models import AudioFormat, RealtimeTranscriptionError, RealtimeTranscriptionSessionCreated, TranscriptionStreamDone, TranscriptionStreamTextDelta

import asyncio
import sys
from typing import AsyncIterator

# Replace with your own API key.
api_key = "YOUR_MISTRAL_API_KEY"
client = Mistral(api_key=api_key)

# The microphone capture below always produces pcm_s16le audio.
audio_format = AudioFormat(encoding="pcm_s16le", sample_rate=16000)

async def iter_microphone(
    *,
    sample_rate: int,
    chunk_duration_ms: int,
) -> AsyncIterator[bytes]:
    """
    Yield microphone PCM chunks using PyAudio (16-bit mono).
    Encoding is always pcm_s16le.

    Args:
        sample_rate: Capture rate in Hz, passed to PyAudio as ``rate``.
        chunk_duration_ms: Length of each yielded chunk in milliseconds.

    Yields:
        The bytes returned by each ``stream.read`` call. The generator never
        finishes on its own; it runs until it is closed or cancelled.
    """
    import pyaudio

    p = pyaudio.PyAudio()
    try:
        chunk_samples = int(sample_rate * chunk_duration_ms / 1000)

        # Opened inside the outer try so that a failing open() still
        # terminates the PyAudio instance instead of leaking it.
        stream = p.open(
            format=pyaudio.paInt16,
            channels=1,
            rate=sample_rate,
            input=True,
            frames_per_buffer=chunk_samples,
        )
        try:
            loop = asyncio.get_running_loop()
            while True:
                # stream.read is blocking; run it off-thread. The trailing
                # False is PyAudio's exception_on_overflow argument.
                data = await loop.run_in_executor(
                    None, stream.read, chunk_samples, False
                )
                yield data
        finally:
            stream.stop_stream()
            stream.close()
    finally:
        p.terminate()

# Capture microphone audio in 480 ms chunks at the configured sample rate.
audio_stream = iter_microphone(sample_rate=audio_format.sample_rate, chunk_duration_ms=480)


async def main() -> None:
    """Stream microphone audio to the realtime API and print each event."""
    async for event in client.audio.realtime.transcribe_stream(
        audio_stream=audio_stream,  # async iterator of PCM byte chunks
        model="voxtral-mini-transcribe-realtime-2602",
        audio_format=audio_format,
    ):
        if isinstance(event, RealtimeTranscriptionSessionCreated):
            print("Session created.")
        elif isinstance(event, TranscriptionStreamTextDelta):
            # Deltas are printed back to back so the transcript grows in place.
            print(event.text, end="", flush=True)
        elif isinstance(event, TranscriptionStreamDone):
            print("Transcription done.")
        elif isinstance(event, RealtimeTranscriptionError):
            print(f"Error: {event}")
        elif isinstance(event, UnknownRealtimeEvent):
            print(f"Unknown event: {event}")


try:
    asyncio.run(main())
except KeyboardInterrupt:
    # On Ctrl+C, asyncio.run() cancels main() and raises KeyboardInterrupt
    # from this call, so the interrupt is handled here rather than inside
    # the coroutine.
    print("Stopping...")
sys.exit(0)

Target delay

The target delay makes the model wait for a specified amount of time before it starts transcribing, which lets it gather more context and improves accuracy. You can specify a target delay via the target_streaming_delay_ms parameter.

Basic Usage

Here's a basic example of how to use it:

import asyncio
import sys
from typing import AsyncIterator

from mistralai import Mistral
from mistralai.extra.realtime import UnknownRealtimeEvent
from mistralai.models import AudioFormat, RealtimeTranscriptionError, RealtimeTranscriptionSessionCreated, TranscriptionStreamDone, TranscriptionStreamTextDelta

# Replace with your own API key.
api_key = "YOUR_MISTRAL_API_KEY"
client = Mistral(api_key=api_key)

# Audio is declared as pcm_s16le at a 16 kHz sample rate.
audio_format = AudioFormat(encoding="pcm_s16le", sample_rate=16000)
# Placeholder: supply your own audio source here, e.g. an async iterator
# yielding chunks of audio bytes in the format declared above.
audio_stream = ...


async def main() -> None:
    """Transcribe ``audio_stream`` with a 1000 ms target delay and print events."""
    async for event in client.audio.realtime.transcribe_stream(
        audio_stream=audio_stream,
        model="voxtral-mini-transcribe-realtime-2602",
        audio_format=audio_format,
        target_streaming_delay_ms=1000,  # target delay, in milliseconds
    ):
        if isinstance(event, RealtimeTranscriptionSessionCreated):
            print("Session created.")
        elif isinstance(event, TranscriptionStreamTextDelta):
            # Deltas are printed back to back so the transcript grows in place.
            print(event.text, end="", flush=True)
        elif isinstance(event, TranscriptionStreamDone):
            print("Transcription done.")
        elif isinstance(event, RealtimeTranscriptionError):
            print(f"Error: {event}")
        elif isinstance(event, UnknownRealtimeEvent):
            print(f"Unknown event: {event}")


try:
    asyncio.run(main())
except KeyboardInterrupt:
    # On Ctrl+C, asyncio.run() cancels main() and raises KeyboardInterrupt
    # from this call, so the interrupt is handled here rather than inside
    # the coroutine.
    print("Stopping...")
sys.exit(0)

Example use: Dual delay

Dual delay transcription allows you to balance between speed and accuracy. It uses two parallel streams:

  • Fast Stream: Provides quick but less accurate transcriptions with minimal delay.
  • Slow Stream: Provides more accurate transcriptions by gathering additional context, resulting in a longer delay.

This approach is useful for scenarios where both immediate feedback and high accuracy are required, such as live captioning or real-time note-taking.

How It Works

  1. Fast Stream: Transcribes audio with minimal delay (e.g., 240ms), providing quick but potentially less accurate results.
  2. Slow Stream: Transcribes audio with a longer delay (e.g., 2400ms), providing more accurate results by leveraging additional context.
  3. Combined Output: The final output merges results from both streams, ensuring a balance between speed and accuracy.

Use Cases

  • Live captioning for presentations or meetings
  • Real-time note-taking applications
  • Voice assistants requiring immediate feedback

Step 1: Import Dependencies

In this step, we import all the necessary libraries and modules required for the dual delay transcription. This includes:

  • Standard libraries: argparse for command-line argument parsing, asyncio for asynchronous operations, and difflib for comparing sequences.
  • Rich library: Used for creating a visually appealing terminal UI.
  • Mistral AI SDK: Provides the core functionality for realtime transcription.
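  • PyAudio utilities: Handles microphone input and audio streaming. The load_pyaudio helper is imported from a separate pyaudio_utils module, which must be importable when you run the script.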
import argparse
import asyncio
import difflib
import os
import sys
from dataclasses import dataclass
from typing import AsyncIterator, Sequence

from rich.align import Align
from rich.console import Console
from rich.layout import Layout
from rich.live import Live
from rich.panel import Panel
from rich.text import Text

from mistralai import Mistral
from mistralai.extra.realtime import UnknownRealtimeEvent
from mistralai.models import (
    AudioFormat,
    RealtimeTranscriptionError,
    RealtimeTranscriptionSessionCreated,
    TranscriptionStreamDone,
    TranscriptionStreamTextDelta,
)

from pyaudio_utils import load_pyaudio

# Shared Rich console: drives the live UI and prints startup errors.
console = Console()

Step 2: Define State and Display Classes

In this step, we define the classes responsible for managing the state of the transcription and rendering the UI:

  • DualTranscriptState: Tracks the state of both fast and slow transcription streams, including their text outputs, statuses, and any errors.
  • DualTranscriptDisplay: Renders a live UI in the terminal, showing the transcription results from both streams. It uses the rich library to create a visually appealing layout with a header, transcript body, and footer.

The DualTranscriptDisplay class includes methods for:

  • Normalizing words for comparison.
  • Computing the display texts to merge results from both streams.
  • Applying styles to the UI elements based on their status.
@dataclass
class DualTranscriptState:
    """Tracks transcript state for dual-delay transcription."""

    fast_full_text: str = ""
    slow_full_text: str = ""
    fast_status: str = "🔌 Connecting..."
    slow_status: str = "🔌 Connecting..."
    error: str | None = None
    fast_done: bool = False
    slow_done: bool = False

    def set_error(self, message: str) -> None:
        self.error = message
        self.fast_status = "❌ Error"
        self.slow_status = "❌ Error"


class DualTranscriptDisplay:
    """Builds the terminal layout for a dual-delay transcription session."""

    def __init__(
        self,
        *,
        model: str,
        fast_delay_ms: int,
        slow_delay_ms: int,
        state: DualTranscriptState,
    ) -> None:
        """Remember the header values and the state object to render from."""
        self.state = state
        self.model = model
        self.fast_delay_ms = fast_delay_ms
        self.slow_delay_ms = slow_delay_ms

    @staticmethod
    def _normalize_word(word: str) -> str:
        """Strip surrounding punctuation and lowercase, for word comparison."""
        trimmed = word.strip(".,!?;:\"'()[]{}")
        return trimmed.lower()

    def _compute_display_texts(self) -> tuple[str, str]:
        """Return ``(final_text, partial_text)`` for the transcript panel.

        ``final_text`` is the slow transcript. ``partial_text`` is the part of
        the fast transcript lying beyond the last run of words that also
        appears in the slow transcript, prefixed with a space when non-empty.
        """
        confirmed = self.state.slow_full_text
        tentative = self.state.fast_full_text

        confirmed_words = confirmed.split()
        if not confirmed_words:
            # Nothing from the slow stream yet: everything is still tentative.
            return "", f" {tentative}".rstrip()

        tentative_words = tentative.split()
        matcher = difflib.SequenceMatcher(
            None,
            list(map(self._normalize_word, confirmed_words)),
            list(map(self._normalize_word, tentative_words)),
        )

        # Track the match reaching furthest into the slow transcript and
        # remember where that match ends in the fast transcript.
        furthest_confirmed = 0
        resume_at = 0
        for match in matcher.get_matching_blocks():
            confirmed_end = match.a + match.size
            if match.size and confirmed_end > furthest_confirmed:
                furthest_confirmed = confirmed_end
                resume_at = match.b + match.size

        pending = tentative_words[resume_at:]
        tail = " " + " ".join(pending) if pending else ""
        return confirmed, tail

    @staticmethod
    def _status_style(status: str) -> str:
        """Pick the Rich style for a status label; unrecognised labels are red."""
        style_table = (
            (("Listening",), "green"),
            (("Connecting",), "yellow dim"),
            (("Done", "Stopped"), "dim"),
        )
        for keywords, style in style_table:
            if any(keyword in status for keyword in keywords):
                return style
        return "red"

    def render(self) -> Layout:
        """Assemble header, transcript, optional error panel and footer."""
        state = self.state

        # Header: model name plus delay and status for each stream.
        header_text = Text()
        for fragment, fragment_style in (
            ("│ ", "dim"),
            (self.model, "dim"),
            (" │ ", "dim"),
            (f"fast {self.fast_delay_ms}ms", "bright_yellow"),
            (f" {state.fast_status}", self._status_style(state.fast_status)),
            (" │ ", "dim"),
            (f"slow {self.slow_delay_ms}ms", "white"),
            (f" {state.slow_status}", self._status_style(state.slow_status)),
        ):
            header_text.append(fragment, style=fragment_style)
        header = Align.left(header_text, vertical="middle", pad=False)

        # Body: slow text in white, fast-only tail in yellow, or a placeholder.
        final_text, partial_text = self._compute_display_texts()
        transcript_text = Text()
        if not (final_text or partial_text):
            transcript_text.append("...", style="dim")
        else:
            transcript_text.append(final_text, style="white")
            transcript_text.append(partial_text, style="bright_yellow")
        transcript = Panel(
            Align.left(transcript_text, vertical="top"),
            border_style="dim",
            padding=(1, 2),
        )

        footer_text = Text()
        footer_text.append("ctrl+c", style="dim")
        footer_text.append(" quit", style="dim italic")
        footer = Align.left(footer_text, vertical="middle", pad=False)

        # The error panel is inserted between body and footer only when set.
        rows = [
            Layout(header, name="header", size=1),
            Layout(transcript, name="body"),
        ]
        if state.error:
            rows.append(
                Layout(
                    Panel(Text(state.error, style="red"), border_style="red"),
                    name="error",
                    size=4,
                )
            )
        rows.append(Layout(footer, name="footer", size=1))

        layout = Layout()
        layout.split_column(*rows)
        return layout

Step 3: Define Audio Handling Functions

In this step, we define functions to handle audio input from the microphone and manage the audio streams for both fast and slow transcription:

  • iter_microphone: Captures audio from the microphone using PyAudio and yields chunks of audio data asynchronously. This function runs in a loop, continuously reading audio data and yielding it for processing.
  • queue_audio_iter: Reads audio chunks from a queue and yields them until a sentinel value (None) is received. This allows for decoupling audio capture from processing.
  • broadcast_microphone: Reads audio from the microphone once and broadcasts it to multiple queues. This ensures that both the fast and slow transcription streams receive the same audio data.

These functions work together to ensure that audio data is efficiently captured and distributed to both transcription streams.

async def iter_microphone(
    *,
    sample_rate: int,
    chunk_duration_ms: int,
) -> AsyncIterator[bytes]:
    """
    Yield microphone PCM chunks using PyAudio (16-bit mono).
    Encoding is always pcm_s16le.

    Args:
        sample_rate: Capture rate in Hz, passed to PyAudio as ``rate``.
        chunk_duration_ms: Length of each yielded chunk in milliseconds.

    Yields:
        The bytes returned by each ``stream.read`` call. The generator never
        finishes on its own; it runs until it is closed or cancelled.
    """
    # load_pyaudio() supplies the object used as the pyaudio module below.
    pyaudio = load_pyaudio()

    p = pyaudio.PyAudio()
    try:
        chunk_samples = int(sample_rate * chunk_duration_ms / 1000)

        # Opened inside the outer try so that a failing open() still
        # terminates the PyAudio instance instead of leaking it.
        stream = p.open(
            format=pyaudio.paInt16,
            channels=1,
            rate=sample_rate,
            input=True,
            frames_per_buffer=chunk_samples,
        )
        try:
            loop = asyncio.get_running_loop()
            while True:
                # stream.read is blocking; run it off-thread. The trailing
                # False is PyAudio's exception_on_overflow argument.
                data = await loop.run_in_executor(
                    None, stream.read, chunk_samples, False
                )
                yield data
        finally:
            stream.stop_stream()
            stream.close()
    finally:
        p.terminate()


async def queue_audio_iter(
    queue: asyncio.Queue[bytes | None],
) -> AsyncIterator[bytes]:
    """Yield audio chunks from a queue until a None sentinel is received."""
    while True:
        chunk = await queue.get()
        if chunk is None:
            break
        yield chunk


async def broadcast_microphone(
    *,
    sample_rate: int,
    chunk_duration_ms: int,
    queues: Sequence[asyncio.Queue[bytes | None]],
) -> None:
    """Fan a single microphone capture out to every queue in ``queues``.

    Each captured chunk is put on every queue in order. When the capture loop
    exits for any reason, each queue receives a ``None`` end-of-stream marker;
    if a queue is full, its oldest pending item is discarded to make room.
    """
    try:
        microphone = iter_microphone(
            sample_rate=sample_rate, chunk_duration_ms=chunk_duration_ms
        )
        async for chunk in microphone:
            for target in queues:
                await target.put(chunk)
    finally:
        for target in queues:
            delivered = False
            while not delivered:
                try:
                    target.put_nowait(None)
                except asyncio.QueueFull:
                    # Free one slot and retry; give up if there is nothing
                    # left to discard.
                    try:
                        target.get_nowait()
                    except asyncio.QueueEmpty:
                        break
                else:
                    delivered = True

Step 4: Define Helper Functions

In this step, we define helper functions to manage event statuses and parse command-line arguments:

  • _status_for_event: Converts transcription events into human-readable status messages. It returns "🎤 Listening..." when a session is created and "✅ Done" for any other event; the stream handler only calls it for session-created and transcription-done events.
  • parse_args: Parses command-line arguments to configure the transcription process. This includes settings like the model ID, fast and slow delay times, sample rate, chunk duration, API key, and base URL.

These helper functions simplify the main logic and make the code more modular and easier to maintain.

def _status_for_event(event: object) -> str:
    """Return the status label for ``event``.

    A session-created event maps to the listening label; every other event
    maps to the done label.
    """
    session_started = isinstance(event, RealtimeTranscriptionSessionCreated)
    return "🎤 Listening..." if session_started else "✅ Done"


def parse_args() -> argparse.Namespace:
    """Parse the command-line options for the dual-delay demo.

    Returns:
        A namespace with ``model``, ``fast_delay_ms``, ``slow_delay_ms``,
        ``sample_rate``, ``chunk_duration``, ``api_key`` and ``base_url``.
        ``api_key`` defaults to the ``MISTRAL_API_KEY`` environment variable
        (``None`` when unset) and ``base_url`` to ``MISTRAL_BASE_URL``.
    """
    parser = argparse.ArgumentParser(
        description="Dual-delay real-time microphone transcription."
    )
    parser.add_argument(
        "--model",
        default="voxtral-mini-transcribe-realtime-2602",
        help="Model ID",
    )
    parser.add_argument(
        "--fast-delay-ms",
        type=int,
        default=240,
        help="Fast target streaming delay in ms",
    )
    parser.add_argument(
        "--slow-delay-ms",
        type=int,
        default=2400,
        help="Slow target streaming delay in ms",
    )
    parser.add_argument(
        "--sample-rate",
        type=int,
        default=16000,
        choices=[8000, 16000, 22050, 44100, 48000],
        help="Sample rate in Hz",
    )
    parser.add_argument(
        "--chunk-duration",
        type=int,
        default=10,
        help="Chunk duration in ms",
    )
    parser.add_argument(
        "--api-key",
        default=os.environ.get("MISTRAL_API_KEY"),
        help="Mistral API key",
    )
    parser.add_argument(
        "--base-url",
        default=os.environ.get("MISTRAL_BASE_URL", "wss://api.mistral.ai"),
        help="Mistral API base URL",
    )
    return parser.parse_args()

Step 5: Define Stream Handling Functions

In this step, we define functions to handle the transcription streams and update the UI:

  • run_stream: Manages the transcription stream for either the fast or slow stream. It processes events from the transcription stream, such as session creation, text deltas, and completion events. It updates the state accordingly and triggers UI updates by sending signals to the update queue.
  • ui_loop: Continuously updates the UI based on signals from the update queue. It uses the Live component from the rich library to render the UI in real-time, providing a smooth and responsive user experience.

These functions work together to ensure that the transcription results are displayed in real-time and the UI remains responsive.

async def run_stream(
    *,
    client: Mistral,
    model: str,
    delay_ms: int,
    audio_stream: AsyncIterator[bytes],
    audio_format: AudioFormat,
    state: DualTranscriptState,
    update_queue: asyncio.Queue[None],
    is_fast: bool,
) -> None:
    """Consume one transcription stream and mirror its events into ``state``.

    ``is_fast`` selects which half of ``state`` (fast or slow) is updated.
    After a handled session-created or text-delta event a redraw is
    requested through ``update_queue``. The loop ends on a done or error
    event; any exception is recorded on ``state`` instead of propagating.
    """
    # All per-stream fields on the state share a "fast_"/"slow_" prefix.
    lane = "fast" if is_fast else "slow"
    status_field = f"{lane}_status"
    text_field = f"{lane}_full_text"
    done_field = f"{lane}_done"

    def request_redraw() -> None:
        # Only enqueue a wake-up when none is already waiting.
        if update_queue.empty():
            update_queue.put_nowait(None)

    try:
        events = client.audio.realtime.transcribe_stream(
            audio_stream=audio_stream,
            model=model,
            audio_format=audio_format,
            target_streaming_delay_ms=delay_ms,
        )
        async for event in events:
            if isinstance(event, RealtimeTranscriptionSessionCreated):
                setattr(state, status_field, _status_for_event(event))
            elif isinstance(event, TranscriptionStreamTextDelta):
                setattr(state, text_field, getattr(state, text_field) + event.text)
            elif isinstance(event, TranscriptionStreamDone):
                setattr(state, status_field, _status_for_event(event))
                setattr(state, done_field, True)
                break
            elif isinstance(event, RealtimeTranscriptionError):
                state.set_error(str(event.error))
                break
            elif isinstance(event, UnknownRealtimeEvent):
                continue

            request_redraw()
    except Exception as exc:  # pragma: no cover - safety net for UI demo
        state.set_error(str(exc))
        request_redraw()


async def ui_loop(
    display: DualTranscriptDisplay,
    update_queue: asyncio.Queue[None],
    stop_event: asyncio.Event,
    *,
    refresh_hz: float = 12.0,
) -> None:
    """Redraw ``display`` in a Rich ``Live`` view until ``stop_event`` is set.

    Each pass waits up to 0.25 s for a signal on ``update_queue`` and then
    re-renders, whether or not a signal arrived.
    """
    live_view = Live(
        display.render(),
        console=console,
        refresh_per_second=refresh_hz,
        screen=True,
    )
    with live_view as live:
        while True:
            if stop_event.is_set():
                break
            try:
                await asyncio.wait_for(update_queue.get(), timeout=0.25)
            except asyncio.TimeoutError:
                # No signal within the window: redraw anyway.
                pass
            live.update(display.render())

Step 6: Define the Main Function

In this final step, we define the main function, which orchestrates the entire dual delay transcription process:

  1. Initialization: Parse command-line arguments, load PyAudio, and initialize the state and display objects.
  2. Setup: Create a Mistral client, set up audio queues for fast and slow streams, and initialize synchronization primitives like stop_event and update_queue.
  3. Task Creation: Launch asynchronous tasks for:
    • Broadcasting audio to both streams.
    • Running the fast and slow transcription streams.
    • Updating the UI in real-time.
  4. Monitoring: Continuously monitor the tasks for completion or errors. If an error occurs or the user interrupts the process (e.g., with Ctrl+C), set the stop event to gracefully shut down all tasks.
  5. Cleanup: Cancel all tasks and wait for their completion to ensure a clean exit.

The main function ties everything together, ensuring that the dual delay transcription process runs smoothly and efficiently.

async def main() -> int:
    """Run the dual-delay microphone transcription demo.

    Starts one microphone broadcaster, a fast and a slow transcription
    stream and the live UI, then supervises them until both streams are
    done, an error is recorded, or the user interrupts.

    Returns:
        1 if no API key is available, PyAudio cannot be loaded, or an error
        was recorded on the shared state; 0 otherwise.
    """
    args = parse_args()
    # parse_args() already defaults --api-key to MISTRAL_API_KEY; the second
    # lookup only matters when an empty value was passed explicitly.
    api_key = args.api_key or os.environ.get("MISTRAL_API_KEY")
    if not api_key:
        console.print(
            "Missing API key: pass --api-key or set MISTRAL_API_KEY.",
            style="red",
        )
        return 1

    try:
        load_pyaudio()
    except RuntimeError as exc:
        console.print(str(exc), style="red")
        return 1

    state = DualTranscriptState()
    display = DualTranscriptDisplay(
        model=args.model,
        fast_delay_ms=args.fast_delay_ms,
        slow_delay_ms=args.slow_delay_ms,
        state=state,
    )

    client = Mistral(api_key=api_key, server_url=args.base_url)
    audio_format = AudioFormat(encoding="pcm_s16le", sample_rate=args.sample_rate)

    # One bounded queue per stream; the broadcaster feeds both.
    fast_queue: asyncio.Queue[bytes | None] = asyncio.Queue(maxsize=50)
    slow_queue: asyncio.Queue[bytes | None] = asyncio.Queue(maxsize=50)

    stop_event = asyncio.Event()
    # Capacity 1: a single pending "please redraw" signal is enough.
    update_queue: asyncio.Queue[None] = asyncio.Queue(maxsize=1)

    broadcaster = asyncio.create_task(
        broadcast_microphone(
            sample_rate=args.sample_rate,
            chunk_duration_ms=args.chunk_duration,
            queues=(fast_queue, slow_queue),
        )
    )

    fast_task = asyncio.create_task(
        run_stream(
            client=client,
            model=args.model,
            delay_ms=args.fast_delay_ms,
            audio_stream=queue_audio_iter(fast_queue),
            audio_format=audio_format,
            state=state,
            update_queue=update_queue,
            is_fast=True,
        )
    )

    slow_task = asyncio.create_task(
        run_stream(
            client=client,
            model=args.model,
            delay_ms=args.slow_delay_ms,
            audio_stream=queue_audio_iter(slow_queue),
            audio_format=audio_format,
            state=state,
            update_queue=update_queue,
            is_fast=False,
        )
    )

    ui_task = asyncio.create_task(
        ui_loop(display, update_queue, stop_event, refresh_hz=12.0)
    )

    try:
        while True:
            await asyncio.sleep(0.1)
            # Surface an exception from any worker task as a UI error.
            for task in (broadcaster, fast_task, slow_task):
                if not task.done():
                    continue
                exc = task.exception()
                if exc:
                    state.set_error(str(exc))
                    if update_queue.empty():
                        update_queue.put_nowait(None)
                    stop_event.set()
                    break
            if state.error:
                stop_event.set()
                break
            if state.fast_done and state.slow_done:
                stop_event.set()
                break
    except (KeyboardInterrupt, asyncio.CancelledError):
        # Under asyncio.run(), Ctrl+C reaches this coroutine as a task
        # cancellation (Python 3.11+), so both forms count as "user stopped".
        state.fast_status = "⏹️ Stopped"
        state.slow_status = "⏹️ Stopped"
    finally:
        # Set unconditionally: ui_loop only exits once the event is set, so
        # leaving it clear on any exit path would make `await ui_task` hang.
        stop_event.set()
        broadcaster.cancel()
        fast_task.cancel()
        slow_task.cancel()
        await asyncio.gather(broadcaster, fast_task, slow_task, return_exceptions=True)
        await ui_task

    return 0 if not state.error else 1


# Script entry point: run main() and use its return value as the exit code.
if __name__ == "__main__":
    sys.exit(asyncio.run(main()))