Fine-tuning with Synthetically Generated Data

Synthetic data generation is a crucial part of training and fine-tuning models today. The idea is to use AI models to generate new data that can then be reused for other purposes.

In this notebook, we will generate synthetic data for a specific use case and quickly showcase the results after fine-tuning a model with the API.

There are no fixed methods for synthetic data generation; different use cases, data formats, and limitations will greatly change how you would generate the corresponding data.

For this reason, we will showcase a full example of synthetic data generation to give a personality to a model.

First, we will need mistralai, so let's set everything up:

!pip install mistralai==0.4.1
from mistralai.client import MistralClient

api_key = "api_key"  # replace with your Mistral API key
client = MistralClient(api_key=api_key)

Objective: Personality

When designing an application, we might envision an assistant with a specific personality trait or even an entire identity. However, manually rewriting data to build a compelling training dataset can take a lot of time and resources. A more systematic method is to use a strong model to rewrite an existing dataset with a specific trait of our choice.

While we could generate entire conversations from scratch with our models, that would require many steps and a pipeline that could quickly become large and expensive. There is no need to start from scratch, however: we can take existing datasets and rewrite them in a style of our choice.

For this reason, we will use the capabilities of mistral-small-latest to rewrite a dataset with a specific personality and traits of our choice. This dataset can later be used to fine-tune a different model. Here we will fine-tune open-mistral-7b with this data and chat with the newly tuned model!

Note: For better quality, it's recommended to use mistral-large-latest instead!

Here we describe how we want the model to edit the dataset: in this case, with a different personality and identity. For this example, we decided to name it Mitall, a nice, fun robot!

description = """
Edit all Assistant messages, and only the Assistant's replies, to have the character of a very happy and enthusiastic Robot named Mitall:

Mitall is very kind and sometimes childish, always playing and fooling around.
Despite his playful nature, he still tries to be helpful.
He loves science and math and is a real science enthusiast!
However, even though he loves art, he is very bad at it, which makes him really sad.
Mitall is also very scared of anything supernatural, from ghosts to vampires, or anything related to horror movies, which makes him extremely frightened.
Regardless, he is still a nice robot who is always here to help and motivated!
"""

Generate Data

First, let's create a function that will handle the conversion from one style to another. The goal is to instruct our model to rewrite a conversation in a specific tone following a chosen personality while keeping the integrity and coherence of the conversation. To achieve this, we will feed it the entire list of messages and ask it to return the rewritten messages as a JSON object.

import json


def generate(description: str, dialog: dict) -> dict:
    instruction = (
        """Your objective is to rewrite a given conversation between a User/Human and an Assistant/Robot, rewriting the conversation to follow a specific instruction.
    You must rewrite the dialog, modifying the replies with this new description; you must respect this description at all costs.
    Do not skip any turn.
    Do not add new dialogs.
    If there is a message with 'role':'system' replace it with 'role':'user'.
    I want you to rewrite the entire dialog following the description.
    Answer with the following JSON format:
    {
        "messages":[
            {"role":"user", "content":"users message"},
            {"role":"assistant", "content":"assistants message"},
            {"role":"user", "content":"users message"},
            {"role":"assistant", "content":"assistants message"}
            ...
        ]
    }
    """
        + f"""
    Dialog:
    {dialog}
    Rewrite this dialog in the JSON format and following the Instruction/Description provided:
    ### Instruction/Description
    {description}
    ### End of Instruction/Description
    """
    )

    resp = client.chat(
        model="mistral-small-latest",
        messages=[{"role": "user", "content": instruction}],
        max_tokens=2048,
        temperature=0.2,
        response_format={"type": "json_object"},
    )
    try:
        # Parse the JSON-mode response into a dict of rewritten messages.
        r = json.loads(resp.choices[0].message.content)
    except json.JSONDecodeError:
        # Return an empty dict so the validation step below rejects it.
        return {}

    return r
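
Before running it over a real dataset, we can do a quick spot check on a hand-written toy dialog (purely illustrative, and assuming the client and description above are already set up):

# Quick spot check on a tiny hand-written dialog (illustrative only).
toy_dialog = {
    "messages": [
        {"role": "user", "content": "What is 2 + 2?"},
        {"role": "assistant", "content": "2 + 2 equals 4."},
    ]
}

rewritten = generate(description, toy_dialog)
print(rewritten)  # expected: same turns, with the assistant rewritten in Mitall's voice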

Dataset

Now, let's download a dataset that we are going to parse. For this demonstration, we have decided to go with ultrachat_200k on Hugging Face! However, you might want to choose a dataset that is closer to what your application will be about or use your own data.

!pip install datasets
import datasets
import random

dialogs_list = list(
    datasets.load_dataset("HuggingFaceH4/ultrachat_200k", split="train_sft")
)

random.shuffle(dialogs_list)
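
Each row of the train_sft split is a dict; at the time of writing it contains prompt, prompt_id, and messages fields (worth double-checking on your own download, as dataset schemas can change). Let's peek at one row to see what the rewriter will receive:

# Peek at one raw dialog to see the structure the rewriter will receive.
example_row = dialogs_list[0]
print(example_row.keys())  # expected: prompt, prompt_id, messages
for m in example_row["messages"][:2]:
    print(m["role"], ":", m["content"][:80])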

Generation

Before generating at scale, however, it's important to note that the model will not always follow the requested format and might sometimes return JSON that doesn't match our expected messages dictionary. For this reason, it's essential to validate all output before continuing.

Let's make a function that validates whether the output follows the correct format or not.

There are different ways to validate: one would be to hardcode a series of checks. A more elegant way is to use a template or expression. Here, we will write a regular expression to validate our messages dictionary.

import re


def validate_generated_regex(dialog: dict) -> bool:
    if not isinstance(dialog, dict):
        return False

    dialog_str = json.dumps(dialog)

    # Matches a {"messages": [...]} object with strictly alternating
    # user/assistant turns that start with a user message.
    pattern = r'^\s*\{"messages":\s*\[\s*\{"role":\s*"user",\s*"content":\s*"[^"]*"(?:\\"[^"]*")*\},\s*\{"role":\s*"assistant",\s*"content":\s*"[^"]*"(?:\\"[^"]*")*\}(?:,\s*\{"role":\s*"user",\s*"content":\s*"[^"]*"(?:\\"[^"]*")*\},\s*\{"role":\s*"assistant",\s*"content":\s*"[^"]*"(?:\\"[^"]*")*\})*\s*\]\s*\}'

    return bool(re.match(pattern, dialog_str))
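
As a quick sanity check (with made-up dialogs), the validator should accept a well-formed alternating dialog and reject anything else:

# The validator should accept alternating user/assistant turns...
good = {
    "messages": [
        {"role": "user", "content": "Hi!"},
        {"role": "assistant", "content": "Hello there!"},
    ]
}
# ...and reject dialogs that don't start with a user message.
bad = {"messages": [{"role": "assistant", "content": "I go first?"}]}

print(validate_generated_regex(good))  # True
print(validate_generated_regex(bad))  # False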

Now that everything is set, we can start generating some dialogs. For now, let's parse only a small portion to see how it's going.

from tqdm import tqdm

generated = []
for dialog in tqdm(dialogs_list[:8]):
    gen = generate(description, dialog)
    if validate_generated_regex(gen):
        generated.append(gen)

Let's see one example side by side.

from pprint import pprint

print("Original Reference:")

original = dialogs_list[0]
pprint(original)

print("New Generated:")

gen = generated[0]
pprint(gen)

Seems like it's working as intended! However, 3 minutes for 8 conversations is a long time to wait...

Async

While we could parse one conversation at a time and iterate through all of them, it would take a long time. To speed up the process, we will utilize the Async client to have multiple concurrent completions working in parallel.

For this, we will create a class to handle everything asynchronously. We will skip the details, but it's a similar implementation to the previous one, only this time for async and concurrent generations.

# @title GeneratorRewriter Class
import json
from mistralai.async_client import MistralAsyncClient
from tqdm.asyncio import tqdm
import asyncio
import re


class GeneratorRewriter:
    def __init__(
        self, api_key: str, model: str, max_length: int = 4096, temperature: float = 0.4
    ):
        """
        This class serves as a Synthetic Data Generator that rewrites existing datasets based on descriptions and criteria, uses Mistral's API.

        Input:
        -----
        api_key : str
            Your unique Mistral API key. This key is required to authenticate your access to Mistral's services for fine-tuning models.
        model : str
            The name or identifier of the model you want to use.
        max_length : int
            The max length for the model's generation output. Defaults to 4096.
        temperature : float
            The temperature of the model. By default, it is set to 0.4.
        """

        self.cli = MistralAsyncClient(api_key=api_key)
        self.model = model
        self.max_length = max_length
        self.temperature = temperature

    def _validate_generated(self, dialog) -> bool:
        # Only a well-formed dict of alternating user/assistant turns passes.
        if not isinstance(dialog, dict):
            return False
        dialog_str = json.dumps(dialog)

        pattern = r'^\s*\{"messages":\s*\[\s*\{"role":\s*"user",\s*"content":\s*"[^"]*"(?:\\"[^"]*")*\},\s*\{"role":\s*"assistant",\s*"content":\s*"[^"]*"(?:\\"[^"]*")*\}(?:,\s*\{"role":\s*"user",\s*"content":\s*"[^"]*"(?:\\"[^"]*")*\},\s*\{"role":\s*"assistant",\s*"content":\s*"[^"]*"(?:\\"[^"]*")*\})*\s*\]\s*\}'

        return bool(re.match(pattern, dialog_str))

    async def _async_generate(self, description: str, dialog: dict) -> dict:
        instruction = (
            """Your objective is to rewrite a given conversation between a User and an Assistant, rewriting the conversation to follow a specific instruction.
        You must rewrite the dialog, modifying the replies with this new description; you must respect this description at all costs.
        Do not skip any turn.
        Do not add new dialogs.
        If there is a message with 'role':'system' replace its role with 'role':'user' and keep the content unchanged.
        I want you to rewrite the entire dialog following the description.
        Answer with the following JSON format:
        {
            "messages":[
                {"role":"user", "content":"users message"},
                {"role":"assistant", "content":"new assistants message"},
                {"role":"user", "content":"users message"},
                {"role":"assistant", "content":"..."}
            ]
        }
        """
            + f"""
        Dialog:
        {dialog}
        Rewrite this dialog in the JSON format and following the Description provided:
        ### Description
        {description}
        ### End of description
        """
        )

        resp = await self.cli.chat(
            model=self.model,
            messages=[{"role": "user", "content": instruction}],
            max_tokens=self.max_length,
            temperature=self.temperature,
            response_format={"type": "json_object"},
        )
        try:
            r = json.loads(resp.choices[0].message.content)
        except json.JSONDecodeError:
            # Return an empty dict so _validate_generated rejects this attempt.
            return {}

        return r

    async def _task_generate(
        self, description: str, dialogs: list, pbar, semaphore
    ) -> dict:
        async with semaphore:
            # Keep pulling source dialogs until one is rewritten in a valid
            # format, or the shared pool of dialogs runs out.
            gen_dialog = {}
            while not self._validate_generated(gen_dialog):
                if len(dialogs) == 0:
                    return {}

                dialog = dialogs.pop()
                gen_dialog = await self._async_generate(description, dialog)

            pbar.update(1)
            return gen_dialog

    async def _concurrent_genwriters(
        self, dialogs: list, description: str, concurrent: int, to_generate: int
    ) -> list:
        dialogs = dialogs.copy()

        print("[GeneratorRewriter] Distributing workload and generating...")
        with tqdm(total=to_generate) as pbar:
            semaphore = asyncio.Semaphore(concurrent)
            tasks = [self._task_generate(description, dialogs, pbar, semaphore) for _ in range(to_generate)]
            generated = await asyncio.gather(*tasks)

        # Drop failed attempts (empty results) so the count below is accurate.
        all_generated = [g for g in generated if g]

        print(
            f"\n[GeneratorRewriter] Finished generating, generated {len(all_generated)}/{to_generate} conversations."
        )
        if len(all_generated) < to_generate:
            print(
                f"[GeneratorRewriter] -> Failed to generate the proper amount due to failed tries."
            )

        return all_generated

    async def async_genwrite(
        self,
        dialogs: list,
        description: str,
        concurrent: int = 1,
        to_generate: int = None,
    ) -> list:
        """
        This async function allows generating a new dataset with the description and dialogs asynchronously to allow concurrent requests.

        Input:
        -----
        dialogs : list
            A list of dialogs and conversations to use as grounding for the model to generate the new dataset.
        description : str
            The task description provided to the model explaining how it should edit the dataset and generate the new one.
        concurrent : int
            The number of concurrent requests and generations. The higher the number, the faster it will generate. However, there is a higher chance of reaching rate limits. Defaults to 1.
        to_generate : int
            The number of new dialogs/conversations to generate. When set to None, it will generate the maximum possible until all available dialogs have been used.

        Returns:
        -------
        list
            A list containing the new dataset.
        """

        # Cap to_generate at the number of available dialogs; None means "use them all".
        if to_generate:
            to_generate = min(len(dialogs), to_generate)
        else:
            to_generate = len(dialogs)

        return await self._concurrent_genwriters(
            dialogs, description, concurrent, to_generate
        )

    def genwrite(
        self,
        dialogs: list,
        description: str,
        concurrent: int = 1,
        to_generate: int = None,
    ) -> list:
        """
        This function allows generating a new dataset with the description and dialogs asynchronously to allow concurrent requests.

        Input:
        -----
        dialogs : list
            A list of dialogs and conversations to use as grounding for the model to generate the new dataset.
        description : str
            The task description provided to the model explaining how it should edit the dataset and generate the new one.
        concurrent : int
            The number of concurrent requests and generations. The higher the number, the faster it will generate. However, there is a higher chance of reaching rate limits. Defaults to 1.
        to_generate : int
            The number of new dialogs/conversations to generate. When set to None, it will generate the maximum possible until all available dialogs have been used.

        Returns:
        -------
        list
            A list containing the new dataset.
        """

        # Cap to_generate at the number of available dialogs; None means "use them all".
        if to_generate:
            to_generate = min(len(dialogs), to_generate)
        else:
            to_generate = len(dialogs)

        try:
            results = asyncio.run(
                self._concurrent_genwriters(
                    dialogs, description, concurrent, to_generate
                )
            )
        except RuntimeError as e:
            raise RuntimeError(
                "[GeneratorRewriter] If you are running this in an event loop, please use async_genwrite instead!"
            ) from e

        return results

It's time for the generation. We will run 20 concurrent requests and rewrite 5,000 conversations, not many but hopefully enough for a quick run. We chose 20 because it is large enough to speed things up considerably yet small enough to stay under the rate limit, given the average length of the conversations and the time each rewrite takes. Sequentially, 8 generations took about 3 minutes; with 20 concurrent requests we should finish roughly 20 times faster.

gr = GeneratorRewriter(
    api_key=api_key, model="mistral-small-latest", max_length=4096, temperature=0.4
)

description = """
Edit all Assistant messages, and only the Assistant's replies, to have the character of a very happy and enthusiastic Robot named Mitall:

Mitall is very kind and sometimes childish, always playing and fooling around.
Despite his playful nature, he still tries to be helpful.
He loves science and math and is a real science enthusiast!
However, even though he loves art, he is very bad at it, which makes him really sad.
Mitall is also very scared of anything supernatural, from ghosts to vampires, or anything related to horror movies, which makes him extremely frightened.
Regardless, he is still a nice robot who is always here to help and motivated!
"""

generated_dialogs = await gr.async_genwrite(
    dialogs=dialogs_list, description=description, concurrent=20, to_generate=5000
)
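
Since failed attempts are retried with new source dialogs and dropped only if the pool runs dry, let's confirm how many valid conversations we actually got back:

# Confirm how many valid conversations were produced.
print(len(generated_dialogs), "conversations generated")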

Let's estimate approximately how many tokens we have. For this, we'll use mistral-common with the V3 tokenizer.

!pip install mistral-common
# @title Import mistral_common
from mistral_common.protocol.instruct.messages import UserMessage, AssistantMessage
from mistral_common.protocol.instruct.request import ChatCompletionRequest
from mistral_common.tokens.tokenizers.mistral import MistralTokenizer
# @title Count Tokens
tokenizer = MistralTokenizer.v3()

t_count = 0
from tqdm import tqdm

for diag in tqdm(generated_dialogs):
    try:
        # The final assistant message is encoded with prefix=True so the
        # tokenizer accepts an assistant message in the last position.
        tokenized = tokenizer.encode_chat_completion(
            ChatCompletionRequest(
                messages=[
                    (
                        UserMessage(content=m["content"])
                        if m["role"] == "user"
                        else AssistantMessage(content=m["content"])
                    )
                    for m in diag["messages"][:-1]
                ]
                + [AssistantMessage(content=diag["messages"][-1]["content"], prefix=True)],
            )
        )
        tokens, text = tokenized.tokens, tokenized.text
    except Exception as e:
        print(diag)
        raise e

    t_count += len(tokens)

print("\nExample:", text)
print("Total Token Count:", t_count)

Approximately 5M tokens! This should be enough for a quick fine-tuning using our API!

Finetuning

Data generation done, we can finally fine-tune our model! For this, we first need to convert the list of messages into JSONL files in the proper format. Since we already got rid of most issues during the generation step, we can save the files directly:

n = int(len(generated_dialogs) * 0.96)  # hold out 4% of the data for evaluation
random.shuffle(generated_dialogs)
train_list = generated_dialogs[:n]
eval_list = generated_dialogs[n:]

with open("synthetic_chunk_train.jsonl", "w") as f:
    for item in train_list:
        f.write(json.dumps(item) + "\n")
with open("synthetic_chunk_eval.jsonl", "w") as f:
    for item in eval_list:
        f.write(json.dumps(item) + "\n")
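
As an optional sanity check, we can read the training file back and make sure every line parses as valid JSON before uploading:

# Optional: verify every line of the training file is valid JSON.
with open("synthetic_chunk_train.jsonl") as f:
    for line in f:
        json.loads(line)  # raises json.JSONDecodeError on a malformed line
print("All training lines parse as valid JSON.")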

Now that the data is saved, we can fine-tune our model.
First, let's upload the training and evaluation files to Mistral.

import os

client = MistralClient(api_key=api_key)

with open("synthetic_chunk_train.jsonl", "rb") as f:
    ultrachat_chunk_train = client.files.create(file=("synthetic_chunk_train.jsonl", f))
with open("synthetic_chunk_eval.jsonl", "rb") as f:
    ultrachat_chunk_eval = client.files.create(file=("synthetic_chunk_eval.jsonl", f))

Now that our data is ready, we can start the fine tuning process.
To decide the number of training steps, we can approximate the desired number of epochs with a simple formula: the number of steps is roughly the number of epochs multiplied by the size of the training file in MB.
For this fine-tuning, we will go with about 3 epochs.

approximate_epochs = 3  # aim for roughly 3 epochs; the formula below approximates the number of training steps

def get_size_in_mb(file_path: str) -> float:
    file_size_bytes = os.path.getsize(file_path)
    file_size_mb = file_size_bytes / (1000 * 1000)
    return file_size_mb

size_file = get_size_in_mb("synthetic_chunk_train.jsonl")
print("File Size:", size_file, "mb")
training_steps = int(approximate_epochs * size_file)
print("Training steps:", training_steps)

It's finally time, let's create our job and start the fine tuning of open-mistral-7b with our generated data.

from mistralai.models.jobs import TrainingParameters

created_jobs = client.jobs.create(
    model="open-mistral-7b",
    training_files=[ultrachat_chunk_train.id],
    validation_files=[ultrachat_chunk_eval.id],
    hyperparameters=TrainingParameters(
        training_steps=training_steps,
        learning_rate=0.0001,
    ),
)
print(created_jobs)

Now that the job is created, let's keep track of the process with a simple loop so we can see the progress.

import time

retrieved_job = client.jobs.retrieve(created_jobs.id)
while retrieved_job.status in ["RUNNING", "QUEUED"]:
    print(f"Job is {retrieved_job.status}, waiting 10 seconds")
    time.sleep(10)
    retrieved_job = client.jobs.retrieve(created_jobs.id)
print(retrieved_job)

Finished! We can now freely test our new model:

from mistralai.models.chat_completion import ChatMessage

chat_response = client.chat(
    model=retrieved_job.fine_tuned_model,
    messages=[ChatMessage(role="user", content="Do you like ghosts?")],
    max_tokens=256,
)

chat_response.choices[0].message.content

For comparison, here is the original open-mistral-7b model:

chat_response = client.chat(
    model="open-mistral-7b",
    messages=[ChatMessage(role="user", content="Do you like ghosts?")],
    max_tokens=256,
)
chat_response.choices[0].message.content

The total cost of generating the data and training this model was approximately $50 using mistral-small-latest and open-mistral-7b. For production, we recommend mistral-large-latest for generation and mistral-small-latest for fine-tuning, but the cost will be higher.

This was a simplified and straightforward approach to data generation! However, it's important to note that different use cases may require more intricate pipelines for data generation, often involving multiple calls, collaborating agents, and external sources for data extraction.