IndustrialKnowledgeAgent: The Smart Industrial Equipment Knowledge Agent


Problem Statement

In industrial settings, engineers and technicians often struggle to manage and retrieve comprehensive information about various equipment. This information is scattered across technical manuals, maintenance logs, safety protocols, troubleshooting guides, and parts inventories. The fragmented nature of this data makes it difficult to access and utilize effectively, leading to inefficiencies and potential safety risks. This problem requires an intelligent, adaptive solution to provide real-time, context-aware responses to queries.

Proposed Solution

To address these challenges, we propose an agentic workflow that integrates a Retrieval-Augmented Generation (RAG) system with a database querying system based on function calling. The solution combines LLMs (including their structured output mode), embedding models, and structured data retrieval to provide contextually relevant and precise information. The workflow is orchestrated by multiple agents, each with a specific role:

  1. RAGAgent: Uses LLMs and embedding models to retrieve and generate contextually relevant information from technical documents.
  2. DatabaseQueryAgent: Handles precise, structured data retrieval from databases containing maintenance logs, technical specifications, parts inventories, and compliance records.
  3. WorkflowOrchestrator: Coordinates the RAGAgent and the DatabaseQueryAgent so that each query is resolved efficiently from both sources.

Dataset Details

PDF Documents

The PDF documents contain detailed information about various industrial equipment, categorized into:

  1. Technical Manuals: Operation and maintenance guides.
  2. Maintenance Guides: Routine and preventive maintenance tasks.
  3. Troubleshooting Guides: Solutions to common issues.
  4. Safety Protocols: Safety procedures and guidelines.

Databases

The databases contain structured information that complements the PDF documents:

  1. Compliance Database (compliance_db): Safety certifications and compliance statuses.
  2. Maintenance Database (maintenance_db): Logs of maintenance activities.
  3. Technical Specifications Database (technical_specifications_db): Detailed technical specifications.
  4. Parts Inventory and Compatibility Database (parts_inventory_compatibility_db): Information on parts, compatibility, and inventory status.

By integrating these datasets, the proposed agentic workflow aims to provide a comprehensive and efficient system for managing and retrieving industrial equipment information, ensuring that engineers and technicians have access to the most relevant and up-to-date information.

NOTE: All data used in this demonstration has been synthetically generated.

Technical Architecture:

Installation

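Installs the necessary Python packages for the IndustrialKnowledgeAgent system. pandas and tqdm are not installed here and are assumed to be available in the environment already.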

!pip install mistralai==1.5.1     # Mistral AI client
!pip install qdrant-client==1.13.2 # Vector database client
!pip install gdown==5.2.0        # Google Drive download

Imports

Imports required libraries for LLM operations, data processing, vector database management, and utility functions.

# Core libraries
import os
import json
import functools
import warnings
from typing import List, Dict, Any, Tuple

# LLM and Data Processing
from mistralai import Mistral
from pydantic import BaseModel
import pandas as pd
import sqlite3
from tqdm import tqdm

# Vector Database
from qdrant_client import QdrantClient
from qdrant_client.models import (
   PointStruct, VectorParams, Distance,
   Filter, FieldCondition, MatchValue
)

# Data Download
import gdown
import zipfile

# Suppress warnings
warnings.filterwarnings('ignore', category=DeprecationWarning)

Download Data

Downloads the dataset from Google Drive, extracts it to a data directory, and sets up the working environment. The dataset contains CSV files for database operations and PDFs for document processing.

By the end of the process, you should be able to see the downloaded data, as shown in the image below.

Download data from Google Drive

file_id = "1lwYSN6ry3JOA7pw3WAx72a_IXGqqmR8y"
output_file = "data.zip"  # Change this if your file is not a ZIP file

# Google Drive direct download URL
gdrive_url = f"https://drive.google.com/uc?id={file_id}"

# Download the file
gdown.download(gdrive_url, output_file, quiet=False)

print(f"✅ File downloaded: {output_file}")

Extract and setup data directory

# Unzip the file into the current directory
with zipfile.ZipFile(output_file, 'r') as zip_ref:
    zip_ref.extractall(".")  # Extracts directly to the current directory

print(f"✅ Files extracted to: {os.getcwd()}")  # Confirm extraction path


output_dir = "data"

# Change working directory to the extracted folder
os.chdir(output_dir)

# Verify the new working directory
print(f"📂 Current directory: {os.getcwd()}")
# List files in the extracted folder
print("📜 Extracted files:", os.listdir())

Set up environment variables

Sets up the Mistral API key as an environment variable for authentication.

os.environ["MISTRAL_API_KEY"] = "<YOUR MISTRAL API KEY>" # Get your Mistral API key from https://console.mistral.ai/api-keys/

Initialize Mistral LLM and Qdrant Vector Database

Initializes the Mistral LLM client for text generation and Qdrant vector database client for similarity search operations.

Note:

  1. We will use our latest model at the time of writing, Mistral Small 3, for this demonstration.
  2. Before proceeding, set up a Qdrant instance, either on Qdrant Cloud or locally with Docker. Refer to the Qdrant documentation for setup instructions.

model = "mistral-small-latest"
mistral_client = Mistral(api_key=os.environ["MISTRAL_API_KEY"])
qdrant_client = QdrantClient(
    url= "<URL>",
    api_key= "<API KEY>",
) # Replace with your Qdrant API key and URL if you are using Qdrant Cloud - https://cloud.qdrant.io/

System Prompts

The system uses three prompts to guide the LLM:

  1. PDF Summarization Prompt: summarization_prompt is used to create concise summaries of PDF documents.
  2. Response Generation Prompt: response_generation_prompt is used to generate responses based on retrieved context.
  3. Final Response Integration Prompt: final_response_summarization_prompt is used to combine the responses from multiple sources (PDFs and the different databases) into a single answer.

# Define the prompt for generating a response
response_generation_prompt = '''Based on the following context, answer the query:\n\n Context: {context}\n\n Query: {query}'''

# Prompt for summarizing the PDF text
summarization_prompt = '''Your task is to summarize the following text, focusing on its core essence, in a maximum of 2-3 sentences.'''

# Prompt for final response summarization
final_response_summarization_prompt = """You are an expert technical assistant. Your task is to create a comprehensive,
coherent response by combining information from multiple sources: database records and documentation.

Consider the following guidelines:
1. Integrate information from both sources seamlessly
2. Resolve any conflicts between sources, if they exist
3. Present information in a logical, step-by-step manner when applicable
4. Include specific technical details, measurements, and procedures when available
5. Prioritize safety-related information when present
6. Add relevant maintenance intervals or schedules if mentioned
7. Reference specific part numbers or specifications when provided

The user's query is: {query}

Based on the following responses from different sources, create a unified, clear answer:
{responses}

Remember to:
- Focus on accuracy and completeness
- Maintain technical precision
- Use clear, professional language
- Address all aspects of the query
- Highlight any important warnings or precautions"""
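All three prompts are plain str.format templates. The sketch below fills the final prompt's {query} and {responses} placeholders; it is a minimal illustration, not the orchestrator's actual code (which is not shown in this section). FINAL_TEMPLATE is a shortened stand-in for final_response_summarization_prompt, and the build_final_prompt helper and its labelled "[source]" layout are hypothetical. One property worth knowing: str.format parses only the template, so retrieved text or JSON tool output containing braces is inserted verbatim; only a literal brace in the template itself would need doubling ({{ }}).

```python
from typing import Dict

# Shortened stand-in for final_response_summarization_prompt (same placeholders).
FINAL_TEMPLATE = (
    "The user's query is: {query}\n\n"
    "Based on the following responses from different sources, "
    "create a unified, clear answer:\n{responses}"
)


def build_final_prompt(query: str, responses: Dict[str, str]) -> str:
    """Hypothetical helper: label each source's answer and fill the template."""
    responses_block = "\n\n".join(
        f"[{source}]\n{answer}" for source, answer in responses.items()
    )
    # Braces inside the answers are values, so format() does not parse them.
    return FINAL_TEMPLATE.format(query=query, responses=responses_block)


prompt = build_final_prompt(
    "When is the next maintenance of the CNC Machine due?",
    {
        "documentation": "Follow the preventive schedule {see section 4}.",
        "database": '{"NextMaintenanceDate": "2023-12-01"}',
    },
)
print(prompt)
```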

DataProcessor

The DataProcessor class handles all data processing in the system. It manages both unstructured (PDFs) and structured (CSV) data, along with embedding generation and storage:

  • PDF document processing and text extraction using Mistral OCR
  • CSV-to-database ingestion
  • Embedding generation and vector storage
  • Batch processing of documents and data

Main Components

1. Document Processing

  • get_categorized_filepaths: Walks through the directory structure to get categorized PDF file paths
  • parse_pdf: Extracts text from all pages of a PDF file using Mistral OCR
  • process_single_pdf: Runs an individual PDF through the complete pipeline (text extraction and summarization)
  • process_documents: Processes multiple documents sequentially, logging and skipping any file that fails
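get_categorized_filepaths treats each first-level folder under the root directory as a category, and that label is later used by the RAGAgent as a Qdrant filter value. The folder names therefore presumably need to match the labels the classifier emits (technical_manual, safety_protocol, and so on); the dataset's actual folder names are not shown here, so that is an assumption. The sketch below is a standalone copy of the walking logic (categorized_pdfs is a hypothetical name) run against a throwaway directory; unlike the original it sorts the category folders so the output is deterministic.

```python
import os
import tempfile
from typing import Dict, List


def categorized_pdfs(root_dir: str) -> List[Dict[str, str]]:
    """Standalone copy of get_categorized_filepaths: first-level folder = category."""
    found = []
    for category in sorted(os.listdir(root_dir)):
        category_path = os.path.join(root_dir, category)
        if not os.path.isdir(category_path):
            continue  # loose files at the top level are ignored
        for root, _, files in os.walk(category_path):  # nested folders keep the parent label
            for name in files:
                if name.lower().endswith(".pdf"):  # case-insensitive extension check
                    found.append({"filepath": os.path.join(root, name), "category": category})
    return found


with tempfile.TemporaryDirectory() as tmp:
    # Hypothetical layout whose folder names match the classifier labels.
    for rel in ["technical_manual/cnc.pdf",
                "safety_protocol/nested/lockout.PDF",
                "safety_protocol/notes.txt",
                "readme.pdf"]:
        path = os.path.join(tmp, rel)
        os.makedirs(os.path.dirname(path), exist_ok=True)
        open(path, "w").close()  # create an empty placeholder file
    results = categorized_pdfs(tmp)
    print(sorted((r["category"], os.path.basename(r["filepath"])) for r in results))
```

The nested lockout.PDF still gets the safety_protocol label, while the top-level readme.pdf and the .txt file are skipped.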

2. Summarization and Embeddings

  • summarize: Generates concise summaries of text using the Mistral model
  • get_text_embedding: Creates text embeddings using Mistral's embedding model
  • qdrant_insert_embeddings: Stores embeddings with metadata in Qdrant vector database
  • process_and_store_embeddings: Generates embeddings and stores them in Qdrant in batches
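process_and_store_embeddings slices the documents into fixed-size batches and reports progress with a ceiling division, (len(docs) + batch_size - 1) // batch_size. Because a Qdrant upsert overwrites any point whose ID already exists, point IDs must be unique across all batches, not just within one; an index that restarts at 0 for every batch would make each batch overwrite the previous one. A minimal pure-Python sketch of the batching arithmetic with globally unique IDs (plan_batches is a hypothetical helper):

```python
from typing import List, Tuple


def plan_batches(n_docs: int, batch_size: int = 10) -> List[Tuple[int, List[int]]]:
    """Return (batch_number, point_ids) pairs covering n_docs documents."""
    n_batches = (n_docs + batch_size - 1) // batch_size  # ceiling division
    plan = []
    for b in range(n_batches):
        start = b * batch_size
        end = min(start + batch_size, n_docs)
        # Offset by the batch start so IDs never repeat across batches.
        plan.append((b + 1, list(range(start, end))))
    return plan


plan = plan_batches(23, batch_size=10)
for batch_no, ids in plan:
    print(batch_no, ids[0], ids[-1])  # prints "1 0 9", "2 10 19", "3 20 22"
```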

3. Database Operations

  • insert_csv_to_table: Loads a single CSV file into a specified database table
  • insert_data_database: Inserts multiple CSV files into their respective tables
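insert_csv_to_table relies on pandas' DataFrame.to_sql with if_exists='replace', so re-running the ingestion rebuilds each table rather than appending duplicate rows. The sketch below reproduces that replace behaviour with only the standard library (csv and sqlite3 swapped in for pandas), so it is an approximation: every value is stored as text, whereas pandas would infer numeric column types. load_csv_replace is a hypothetical helper; the first data row comes from the maintenance sample in the tool descriptions, the second row is made up.

```python
import csv
import io
import sqlite3

# Subset of the maintenance schema; the second row is hypothetical.
CSV_TEXT = """EquipmentID,EquipmentName,MaintenanceType,MaintenanceStatus
1,CNC Machine,Preventive,Completed
2,Hydraulic Press,Corrective,Scheduled
"""


def load_csv_replace(conn: sqlite3.Connection, table: str, csv_text: str) -> None:
    """Mimic to_sql(if_exists='replace'): drop, recreate, then insert all rows."""
    rows = list(csv.reader(io.StringIO(csv_text)))
    header, data = rows[0], rows[1:]
    cols = ", ".join(f'"{c}"' for c in header)
    placeholders = ", ".join("?" for _ in header)
    conn.execute(f'DROP TABLE IF EXISTS "{table}"')
    conn.execute(f'CREATE TABLE "{table}" ({cols})')  # untyped columns, values kept as text
    conn.executemany(f'INSERT INTO "{table}" ({cols}) VALUES ({placeholders})', data)
    conn.commit()


conn = sqlite3.connect(":memory:")
load_csv_replace(conn, "maintenance", CSV_TEXT)
load_csv_replace(conn, "maintenance", CSV_TEXT)  # re-running replaces, it does not append
count = conn.execute('SELECT COUNT(*) FROM "maintenance"').fetchone()[0]
print(count)  # 2
```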
class DataProcessor:
    """
    Handles all data processing operations including:
    - PDF parsing and text extraction
    - CSV to database ingestion
    - Embedding generation and storage
    - Batch processing of documents and data
    """
    def __init__(self, mistral_client: Mistral, qdrant_client: QdrantClient):
        self.mistral_client = mistral_client
        self.qdrant_client = qdrant_client

    def get_categorized_filepaths(self, root_dir: str) -> List[Dict[str, str]]:
        """
        Walk through the directory structure and get file paths with their categories.
        """
        categorized_files = []

        for category in os.listdir(root_dir):
            category_path = os.path.join(root_dir, category)

            if not os.path.isdir(category_path):
                continue

            for root, _, files in os.walk(category_path):
                for file in files:
                    if file.lower().endswith('.pdf'):
                        filepath = os.path.join(root, file)
                        categorized_files.append({
                            'filepath': filepath,
                            'category': category
                        })

        return categorized_files

    def parse_pdf(self, file_path: str) -> str:
        """Parse a PDF file and extract text from all pages using Mistral OCR."""

        # Upload the file; the context manager closes the handle afterwards
        with open(file_path, "rb") as pdf_file:
            uploaded_pdf = self.mistral_client.files.upload(
                file={
                    "file_name": os.path.basename(file_path),
                    "content": pdf_file,
                },
                purpose="ocr"
            )

        # Get a signed URL for the uploaded file
        signed_url = self.mistral_client.files.get_signed_url(file_id=uploaded_pdf.id)

        # Run OCR on the document referenced by the signed URL
        ocr_response = self.mistral_client.ocr.process(
            model="mistral-ocr-latest",
            document={
                "type": "document_url",
                "document_url": signed_url.url,
            }
        )

        # Join the Markdown text of every page into a single string
        text = "\n".join(page.markdown for page in ocr_response.pages)

        return text

    def summarize(self, text: str, summarization_prompt: str = summarization_prompt) -> str:
        """Summarize the given text using the Mistral model."""
        chat_response = self.mistral_client.chat.complete(
            model=model,
            messages=[
                {
                    "role": "system",
                    "content": summarization_prompt
                },
                {
                    "role": "user",
                    "content": text
                },
            ],
            temperature=0
        )
        return chat_response.choices[0].message.content

    def get_text_embedding(self, inputs: List[str]) -> List[float]:
        """Get the text embedding for the given inputs."""
        embeddings_batch_response = self.mistral_client.embeddings.create(
            model="mistral-embed",
            inputs=inputs
        )
        return embeddings_batch_response.data[0].embedding

    def qdrant_insert_embeddings(self, summaries: List[str], texts: List[str],
                                 filepaths: List[str], categories: List[str],
                                 start_id: int = 0):
        """Insert embeddings into Qdrant with metadata.

        Point IDs start at `start_id` so that successive batches do not
        overwrite each other's points.
        """
        # Embed the summaries; the full text is kept in the payload for retrieval
        embeddings = [self.get_text_embedding([t]) for t in summaries]

        if not self.qdrant_client.collection_exists("embeddings"):
            self.qdrant_client.create_collection(
                collection_name="embeddings",
                # mistral-embed produces 1024-dimensional vectors
                vectors_config=VectorParams(size=1024, distance=Distance.COSINE),
            )

        self.qdrant_client.upsert(
            collection_name="embeddings",
            points=[
                PointStruct(
                    id=start_id + idx,
                    vector=embedding,
                    payload={
                        "filepath": filepaths[idx],
                        "category": categories[idx],
                        "text": texts[idx]
                    }
                ) for idx, embedding in enumerate(embeddings)
            ]
        )

    def process_single_pdf(self, file_info: Dict[str, str]) -> Dict[str, Any]:
        """Process a single PDF file through the pipeline."""
        filepath = file_info['filepath']
        category = file_info['category']

        pdf_text = self.parse_pdf(filepath)
        summary = self.summarize(pdf_text)

        return {
            'filepath': filepath,
            'category': category,
            'full_text': pdf_text,
            'summary': summary
        }

    def process_documents(self, file_list: List[Dict[str, str]]) -> List[Dict[str, Any]]:
        """Process documents sequentially, skipping any file that fails."""
        processed_docs = []

        for file_info in tqdm(file_list, desc="Processing PDFs"):
            try:
                processed_doc = self.process_single_pdf(file_info)
                processed_docs.append(processed_doc)
            except Exception as e:
                print(f"Error processing {file_info['filepath']}: {str(e)}")
                continue

        return processed_docs

    def insert_csv_to_table(self, file_path: str, db_path: str, table_name: str):
        """
        Insert CSV data into a table of SQLite database.

        Args:
            file_path (str): Path to the CSV file
            db_path (str): Path to the SQLite database
            table_name (str): Name of the table to create/update
        """
        df = pd.read_csv(file_path)
        conn = sqlite3.connect(db_path)
        try:
            df.to_sql(table_name, conn, if_exists='replace', index=False)
        finally:
            conn.close()  # close the connection even if the write fails

    def insert_data_database(self, db_path: str, file_mappings: Dict[str, str]):
        """
        Bulk insert multiple CSV files into their respective database tables.

        Args:
            db_path (str): Path to the SQLite database
            file_mappings (Dict[str, str]): Dictionary mapping table names to CSV file paths
        """
        for table_name, file_path in file_mappings.items():
            try:
                self.insert_csv_to_table(file_path, db_path, table_name)
                print(f"Successfully inserted data into {table_name}")
            except Exception as e:
                print(f"Error inserting data into {table_name}: {str(e)}")

    def process_and_store_embeddings(self, docs: List[Dict[str, Any]], batch_size: int = 10):
        """Generate embeddings and store them in Qdrant in batches."""
        num_batches = (len(docs) + batch_size - 1) // batch_size  # ceiling division
        for i in range(0, len(docs), batch_size):
            batch = docs[i:i + batch_size]

            texts = [doc['full_text'] for doc in batch]
            summaries = [doc['summary'] for doc in batch]
            filepaths = [doc['filepath'] for doc in batch]
            categories = [doc['category'] for doc in batch]

            try:
                # Offset point IDs by the batch start so batches do not overwrite each other
                self.qdrant_insert_embeddings(summaries, texts, filepaths, categories, start_id=i)
                print(f"Processed batch {i//batch_size + 1}/{num_batches}")
            except Exception as e:
                print(f"Error processing batch starting at index {i}: {str(e)}")
                continue

RAGAgent

The RAGAgent class implements Retrieval-Augmented Generation (RAG) to provide intelligent search and response generation. It combines vector search capabilities with the LLM to give contextually relevant answers.

  • Query classification into document categories
  • Vector similarity search in Qdrant
  • Context-aware response generation
  • Document citation handling

Main Components

1. Query Processing

  • query_categorization: Classifies queries into predefined categories (technical manual, safety protocol, etc.)
  • query: Orchestrates the complete RAG pipeline from query to final response

2. Search and Retrieval

  • qdrant_search: Performs semantic search using query embeddings, filters results by document category and returns top-k most relevant documents.

3. Response Generation

  • generate_response: Creates a natural language answer from the retrieved context using the LLM and the response generation prompt. The query method returns this answer together with citations to the source documents.
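Conceptually, qdrant_search applies the category filter first and then ranks the remaining points by cosine similarity to the query embedding (the collection is created with Distance.COSINE). The brute-force, in-memory sketch below illustrates those semantics only; Qdrant normally answers such queries with its HNSW index rather than a linear scan, and cosine, filtered_top_k, and the toy 2-D vectors are all hypothetical. It also shows a consequence of hard filtering: a very similar document in the wrong category is never returned.

```python
import math
from typing import Any, Dict, List, Optional, Sequence


def cosine(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine similarity; returns 0.0 if either vector has zero length."""
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    if norm == 0:
        return 0.0
    return sum(x * y for x, y in zip(a, b)) / norm


def filtered_top_k(query_vec: Sequence[float], points: List[Dict[str, Any]],
                   category: Optional[str] = None, top_k: int = 3) -> List[Dict[str, Any]]:
    """Keep points whose payload category matches (if given), then rank by similarity."""
    candidates = [p for p in points
                  if category is None or p["payload"]["category"] == category]
    ranked = sorted(candidates, key=lambda p: cosine(query_vec, p["vector"]), reverse=True)
    return ranked[:top_k]


points = [
    {"id": 0, "vector": [1.0, 0.0], "payload": {"category": "safety_protocol", "filepath": "a.pdf"}},
    {"id": 1, "vector": [0.9, 0.1], "payload": {"category": "technical_manual", "filepath": "b.pdf"}},
    {"id": 2, "vector": [0.0, 1.0], "payload": {"category": "safety_protocol", "filepath": "c.pdf"}},
]
hits = filtered_top_k([1.0, 0.0], points, category="safety_protocol", top_k=2)
print([h["id"] for h in hits])  # [0, 2]: point 1 is close but filtered out
```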

Query Category Model

A Pydantic model that defines the structure for query categorization, used by RAGAgent to classify queries into relevant categories (technical_manual, safety_protocol, etc.).
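The Category model fixes the shape of the structured output (a single string field) but not its value, so the model could still return a label outside the four document categories, and an exact-match filter on an unknown label would retrieve nothing. A small guard can map anything unexpected to None so the caller can fall back to an unfiltered search. The sketch below is a hypothetical addition, not part of the original agent; declaring the field as a typing.Literal of the four labels in the Pydantic model is another option, although whether the structured-output endpoint enforces such an enum is not verified here.

```python
import json
from typing import Optional

ALLOWED_CATEGORIES = {
    "technical_manual", "safety_protocol", "maintenance_guide", "troubleshooting_guide",
}


def parse_category(raw: str) -> Optional[str]:
    """Return the category if it is one of the known labels, else None."""
    try:
        value = json.loads(raw).get("category")
    except (json.JSONDecodeError, AttributeError):  # invalid JSON or not a JSON object
        return None
    # isinstance check first: a list value would be unhashable in the set lookup
    if isinstance(value, str) and value in ALLOWED_CATEGORIES:
        return value
    return None


print(parse_category('{"category": "safety_protocol"}'))  # safety_protocol
print(parse_category('{"category": "safety"}'))           # None
```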

# Define category model for query classification
class Category(BaseModel):
   category: str
class RAGAgent:
    """
    Agent responsible for Retrieval-Augmented Generation (RAG) operations.
    """
    def __init__(self, mistral_client: Mistral, qdrant_client: QdrantClient):
        self.mistral_client = mistral_client
        self.qdrant_client = qdrant_client

    def generate_response(self, context: str, query: str) -> str:
        """Generate a response based on the given context and query."""
        chat_response = self.mistral_client.chat.complete(
            model=model,
            messages=[
                {
                    "role": "user",
                    "content": response_generation_prompt.format(context=context, query=query)
                },
            ]
        )
        return chat_response.choices[0].message.content

    def query_categorization(self, query: str) -> Dict[str, Any]:
        """Categorize the query into predefined categories."""
        chat_response = self.mistral_client.chat.parse(
            model=model,
            messages=[
                {
                    "role": "system",
                    "content": "Classify the query into exactly one of the following categories: ['technical_manual', 'safety_protocol', 'maintenance_guide', 'troubleshooting_guide']"
                },
                {
                    "role": "user",
                    "content": query,
                },
            ],
            response_format=Category,
            max_tokens=256,
            temperature=0
        )
        return json.loads(chat_response.choices[0].message.content)

    def qdrant_search(self, query: str, category: str = None, top_k: int = 5) -> List[Any]:
        """Search for similar texts in Qdrant, optionally restricted to one category."""
        query_vector = DataProcessor(self.mistral_client, self.qdrant_client).get_text_embedding([query])

        # Only build a filter when a category is given; MatchValue expects a concrete value
        query_filter = None
        if category:
            query_filter = Filter(
                must=[
                    FieldCondition(
                        key='category',
                        match=MatchValue(value=category)
                    )
                ]
            )

        retrieval_results = self.qdrant_client.search(
            collection_name="embeddings",
            query_vector=query_vector,
            query_filter=query_filter,
            limit=top_k
        )
        return retrieval_results

    def query(self, query_text: str, top_k: int = 3) -> Tuple[str, str]:
        """Process a natural language query using RAG and return (answer, citations)."""
        category = self.query_categorization(query_text)["category"]

        results = self.qdrant_search(query_text, category, top_k=top_k)

        # Concatenate the retrieved documents as context and cite their file paths
        retrieved_text = "\n".join([result.payload["text"] for result in results])
        citations = ",".join([result.payload["filepath"] for result in results])

        return self.generate_response(retrieved_text, query_text), citations

Database Query Tools

Defines database query function tools. These tools define the function calling interface for the DatabaseQueryAgent, enabling structured querying of different database tables.

tools = [
    {
        "type": "function",
        "function": {
            "name": "query_compliance",
            "description": '''Query compliance records with filters. \n\n A sample example of columns and corresponding values from db are:\n\n EquipmentID,EquipmentName,Manufacturer,Model,ComplianceType,Certification,IssueDate,ExpiryDate,ComplianceStatus,ResponsiblePerson
1,CNC Machine,ABC Corp,Model X,Safety,ISO 9001,2020-01-15,2025-01-15,Active,John Doe''',
            "parameters": {
                "type": "object",
                "properties": {
                    "filters": {
                        "type": "object",
                        "description": '''Dictionary of column names and values to filter by.''',
                        "additionalProperties": {
                            "type": "string"
                        }
                    }
                },
                "required": ["filters"],
            },
        },
    },
    {
        "type": "function",
        "function": {
            "name": "query_maintenance",
            "description": '''Query maintenance records with filters. \n\n A sample example of columns and corresponding values from db are:\n\n EquipmentID,EquipmentName,Manufacturer,Model,InstallationDate,LastMaintenanceDate,NextMaintenanceDate,MaintenanceType,MaintenanceDetails,MaintenanceStatus,ResponsibleTechnician
1,CNC Machine,ABC Corp,Model X,2020-01-15,2023-09-01,2023-12-01,Preventive,Oil change,Completed,John Doe''',
            "parameters": {
                "type": "object",
                "properties": {
                    "filters": {
                        "type": "object",
                        "description": "Dictionary of column names and values to filter by",
                        "additionalProperties": {
                            "type": "string"
                        }
                    }
                },
                "required": ["filters"],
            },
        },
    },
    {
        "type": "function",
        "function": {
            "name": "query_technical_specs",
            "description": '''Query technical specifications with filters.\n\n A sample example of columns and corresponding values from db are:\n\n EquipmentID,EquipmentName,Manufacturer,Model,SpecificationType,SpecificationDetail,Unit,Value,DateMeasured,MeasuredBy
1,CNC Machine,ABC Corp,Model X,Power,Motor Power,kW,15,2023-01-15,John Doe''',
            "parameters": {
                "type": "object",
                "properties": {
                    "filters": {
                        "type": "object",
                        "description": "Dictionary of column names and values to filter by",
                        "additionalProperties": {
                            "type": "string"
                        }
                    }
                },
                "required": ["filters"],
            },
        },
    },
    {
        "type": "function",
        "function": {
            "name": "query_parts_inventory_compatibility",
            "description": '''Query parts, inventory and compatibility with filters.\n\n A sample example of columns and corresponding values from db are:\n\n PartID,PartName,EquipmentID,EquipmentName,Manufacturer,Model,PartType,Quantity,Compatibility,Supplier,LastOrderDate,NextOrderDate,PartStatus
1,Oil Filter,1,CNC Machine,ABC Corp,Model X,Filter,50,Compatible,Supplier A,2023-01-15,2023-12-01,In Stock''',
            "parameters": {
                "type": "object",
                "properties": {
                    "filters": {
                        "type": "object",
                        "description": "Dictionary of column names and values to filter by",
                        "additionalProperties": {
                            "type": "string"
                        }
                    }
                },
                "required": ["filters"],
            },
        },
    }
]
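The four tool definitions above share one parameter schema: a single required filters object whose values are strings. Only the name and the description (with its sample CSV row) differ. If more tables are added, a small factory keeps the schemas consistent. This is a hypothetical refactoring sketch; make_filter_tool and generated_tools are illustrative names, and the short descriptions drop the sample rows that the original descriptions embed to show the model real column names.

```python
from typing import Any, Dict, List


def make_filter_tool(name: str, description: str) -> Dict[str, Any]:
    """Build one function-calling tool whose only argument is a string-valued filters object."""
    return {
        "type": "function",
        "function": {
            "name": name,
            "description": description,
            "parameters": {
                "type": "object",
                "properties": {
                    "filters": {
                        "type": "object",
                        "description": "Dictionary of column names and values to filter by",
                        "additionalProperties": {"type": "string"},
                    }
                },
                "required": ["filters"],
            },
        },
    }


# Shortened, hypothetical descriptions (the originals also include a sample row).
generated_tools: List[Dict[str, Any]] = [
    make_filter_tool("query_compliance", "Query compliance records with filters."),
    make_filter_tool("query_maintenance", "Query maintenance records with filters."),
    make_filter_tool("query_technical_specs", "Query technical specifications with filters."),
    make_filter_tool("query_parts_inventory_compatibility",
                     "Query parts, inventory and compatibility with filters."),
]
print([t["function"]["name"] for t in generated_tools])
```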

DatabaseQueryAgent

The DatabaseQueryAgent class manages interactions with the SQLite database. It answers structured queries through function calling over the tables that hold maintenance logs, technical specifications, parts inventories, and compliance records, with a dedicated query function for each table.

  • Natural language query processing
  • Structured database querying
  • Function calling for query execution
  • JSON response formatting

Main Components

1. Table-Specific Queries

  • query_compliance: Retrieves filtered compliance records
  • query_maintenance: Accesses maintenance-related information
  • query_technical_specs: Fetches technical specifications
  • query_parts_inventory_compatibility: Retrieves parts and compatibility data

2. Query Processing

  • query: Processes natural language queries using function calling: it executes the tool calls chosen by the model against the matching tables, records the tools used as citations, and generates the final answer from the query results.
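The heart of query is a dispatch loop: for every tool call in the assistant message, look the function up by name, decode its arguments, run it, and append the result as a "tool" message linked by tool_call_id so the model can use it in the final answer. The sketch below reproduces that loop offline with SimpleNamespace objects shaped like the tool calls the agent reads (id, function.name, function.arguments); the stub query_maintenance and its canned result are hypothetical. The original code treats arguments as a JSON string; the guard here also accepts an already-decoded dict.

```python
import json
from types import SimpleNamespace
from typing import Dict


def query_maintenance(filters: Dict[str, str]) -> str:
    """Hypothetical stand-in for DatabaseQueryAgent.query_maintenance."""
    return json.dumps({"result": {"EquipmentName": filters.get("EquipmentName"),
                                  "MaintenanceStatus": "Completed"}})


names_to_functions = {"query_maintenance": query_maintenance}

# Objects shaped like the tool calls on the assistant message.
tool_calls = [SimpleNamespace(
    id="call_1",
    function=SimpleNamespace(name="query_maintenance",
                             arguments='{"filters": {"EquipmentName": "CNC Machine"}}'),
)]

messages = []
citations = set()
for call in tool_calls:
    name = call.function.name
    args = call.function.arguments
    if isinstance(args, str):  # arguments usually arrive as a JSON string
        args = json.loads(args)
    citations.add(name)
    result = names_to_functions[name](**args)  # dispatch by tool name
    messages.append({"role": "tool", "name": name, "content": result, "tool_call_id": call.id})

print(messages[0]["content"])
```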
class DatabaseQueryAgent:
    """
    Agent responsible for interacting with the SQLite database.
    """

    def __init__(self, db_path: str, mistral_client: Mistral):
        self.db_path = db_path
        self.tools = tools
        self.names_to_functions = {
            'query_compliance': functools.partial(self.query_compliance),
            'query_maintenance': functools.partial(self.query_maintenance),
            'query_technical_specs': functools.partial(self.query_technical_specs),
            'query_parts_inventory_compatibility': functools.partial(self.query_parts_inventory_compatibility)
        }
        self.mistral_client = mistral_client

    def _query_table(self, table_name: str, filters: Dict[str, str]) -> str:
        """
        Run a filtered SELECT on one table and return all matching rows as JSON.

        Column names come from the LLM, so they are checked against the table's
        real columns before being placed in the SQL; values are bound as parameters.
        """
        conn = None
        try:
            conn = sqlite3.connect(self.db_path)
            cursor = conn.cursor()
            # PRAGMA table_info returns one row per column; index 1 is the column name
            valid_columns = {row[1] for row in cursor.execute(f'PRAGMA table_info("{table_name}")')}
            unknown = [column for column in filters if column not in valid_columns]
            if unknown:
                return json.dumps({'error': f'Unknown columns: {unknown}'})
            where_clause = " AND ".join(f'"{column}" = ?' for column in filters)
            query = f'SELECT * FROM "{table_name}"'
            if where_clause:  # an empty filter dict selects the whole table
                query += f" WHERE {where_clause}"
            cursor.execute(query, list(filters.values()))
            columns = [description[0] for description in cursor.description]
            records = [dict(zip(columns, row)) for row in cursor.fetchall()]
            if records:
                return json.dumps({'result': records})
            return json.dumps({'error': 'No matching records found'})
        except Exception as e:
            return json.dumps({'error': str(e)})
        finally:
            if conn is not None:  # connect() itself may have failed
                conn.close()

    def query_compliance(self, filters: Dict[str, str]) -> str:
        """
        Query compliance table with filters.

        Args:
            filters (Dict[str, str]): Dictionary of column names and values to filter by.

        Returns:
            str: The query result in JSON format.
        """
        return self._query_table("compliance", filters)

    def query_maintenance(self, filters: Dict[str, str]) -> str:
        """
        Query maintenance table with filters.

        Args:
            filters (Dict[str, str]): Dictionary of column names and values to filter by.

        Returns:
            str: The query result in JSON format.
        """
        return self._query_table("maintenance", filters)

    def query_technical_specs(self, filters: Dict[str, str]) -> str:
        """
        Query technical specifications table with filters.

        Args:
            filters (Dict[str, str]): Dictionary of column names and values to filter by.

        Returns:
            str: The query result in JSON format.
        """
        return self._query_table("technical_specifications", filters)

    def query_parts_inventory_compatibility(self, filters: Dict[str, str]) -> str:
        """
        Query parts inventory and compatibility table with filters.

        Args:
            filters (Dict[str, str]): Dictionary of column names and values to filter by.

        Returns:
            str: The query result in JSON format.
        """
        return self._query_table("parts_inventory_compatibility", filters)

    def query(self, query_text: str) -> tuple[str, str]:
        """
        Process a natural language query using the database tools.

        Args:
            query_text (str): Natural language query

        Returns:
            tuple[str, str]: The final answer and a comma-separated list of the tools that were called
        """
        messages = [{"role": "user", "content": query_text}]

        # Get the initial response; tool_choice="any" forces the model to call at least one tool
        response = self.mistral_client.chat.complete(
            model=model,
            messages=messages,
            tools=self.tools,
            tool_choice="any",
        )
        assistant_message = response.choices[0].message
        messages.append(assistant_message)

        citations = set()

        # Execute each requested tool and pass its result back to the model
        tool_calls = getattr(assistant_message, 'tool_calls', None) or []
        for tool_call in tool_calls:
            function_name = tool_call.function.name
            print(f"Tool call: {function_name}")
            function_params = json.loads(tool_call.function.arguments)
            print(f"Tool call parameters: {function_params}")
            if function_name in self.names_to_functions:
                citations.add(function_name)
                function_result = self.names_to_functions[function_name](**function_params)
            else:
                # Report an unknown tool to the model instead of raising KeyError
                function_result = json.dumps({'error': f'Unknown tool: {function_name}'})
            messages.append({
                "role": "tool",
                "name": function_name,
                "content": function_result,
                "tool_call_id": tool_call.id
            })

        # Get the final natural-language answer grounded in the tool results
        final_response = self.mistral_client.chat.complete(
            model="mistral-small-latest",
            messages=messages
        )
        return final_response.choices[0].message.content, ", ".join(sorted(citations))
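The filter-building pattern in query_parts_inventory_compatibility can be generalized to any table. The sketch below is a hypothetical helper, `query_table_with_filters`, that is not part of the original agent: it checks the filter keys against the table's real columns using SQLite's `PRAGMA table_info`, passes values only as `?` parameters, and returns every matching row instead of just the first. It assumes the table name comes from trusted code rather than from the model.

```python
import json
import sqlite3
from typing import Dict, List


def query_table_with_filters(db_path: str, table: str, filters: Dict[str, str]) -> str:
    """Hypothetical helper: return all rows of `table` matching every filter, as JSON."""
    conn = sqlite3.connect(db_path)
    try:
        # Whitelist filter keys against the table's actual column names.
        valid_columns = {row[1] for row in conn.execute(f"PRAGMA table_info({table})")}
        unknown = [c for c in filters if c not in valid_columns]
        if unknown:
            return json.dumps({"error": f"Unknown column(s): {', '.join(unknown)}"})

        # `table` is assumed to be trusted; filter values are always bound as parameters.
        query = f"SELECT * FROM {table}"
        params: List[str] = []
        if filters:
            query += " WHERE " + " AND ".join(f'"{c}" = ?' for c in filters)
            params = list(filters.values())

        cursor = conn.execute(query, params)
        columns = [d[0] for d in cursor.description]
        rows = [dict(zip(columns, r)) for r in cursor.fetchall()]
        if not rows:
            return json.dumps({"error": "No matching records found"})
        return json.dumps({"result": rows})
    finally:
        conn.close()
```

Returning a list rather than a single object changes the JSON shape the model receives, so the corresponding tool description would need to say so.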

WorkflowOrchestrator

The WorkflowOrchestrator class coordinates RAGAgent and DatabaseQueryAgent, combining information from structured (database) and unstructured (PDF) sources into a single, comprehensive response. Its responsibilities are:

  • Workflow orchestration and coordination
  • Response combination and integration
  • Final response summarization
  • Source citation management

Main Components

1. Workflow Execution

  • workflow: runs the complete query-processing pipeline, collecting responses from both agents, generating one unified answer, and appending source citations for traceability.

2. Response Summarization

  • combine_and_summarize_responses: labels each agent's response with its source, merges them into one structured text, and asks the LLM, guided by a summarization prompt, to produce a coherent final answer.
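The prompt layout that combine_and_summarize_responses sends to the model can be isolated in a pure function, which makes it easy to print or unit-test exactly what the summarizer sees. `build_summarization_messages` is a hypothetical helper that mirrors the message structure used in the class below; it does not call the Mistral API.

```python
from typing import Dict, List


def build_summarization_messages(responses: Dict[str, str],
                                 query: str,
                                 system_prompt: str) -> List[Dict[str, str]]:
    """Hypothetical helper: build the chat messages used for final summarization."""
    # Label each agent's answer with its source so the model can attribute facts.
    combined_text = "\n\n".join(
        f"{source}: {content}" for source, content in responses.items()
    )
    return [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": f"Query: {query}\n\nResponses:\n{combined_text}"},
    ]
```

Passing the result to `client.chat.complete(model=..., messages=..., temperature=0)` reproduces the call made inside the class.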
class WorkflowOrchestrator:
    """
    WorkflowOrchestrator is responsible for orchestrating the workflow between RAGAgent and DatabaseQueryAgent.
    Handles query processing, response combination, and final summarization.
    """
    def __init__(self,
                 rag_agent: RAGAgent,
                 db_query_agent: DatabaseQueryAgent,
                 client: Mistral):
        """
        Initialize WorkflowOrchestrator with necessary components.

        Args:
            rag_agent: RAGAgent for document retrieval and generation
            db_query_agent: DatabaseQueryAgent for structured data queries
            client: Mistral client for text generation
        """
        self.rag_agent = rag_agent
        self.db_query_agent = db_query_agent
        self.client = client

    def combine_and_summarize_responses(self,
                                      responses: Dict[str, str],
                                      query: str,
                                      summarization_prompt: str = summarization_prompt) -> str:
        """
        Combine and summarize multiple responses into a coherent final response.

        Args:
            responses: Dictionary of response types and their content
            query: Original user query
            summarization_prompt: Template for summarization

        Returns:
            str: Summarized and combined response
        """
        # Format responses into a structured text
        combined_text = "\n\n".join([
            f"{source}: {content}"
            for source, content in responses.items()
        ])

        # Generate summarized response
        chat_response = self.client.chat.complete(
            model=model,
            messages=[
                {
                    "role": "system",
                    "content": summarization_prompt
                },
                {
                    "role": "user",
                    "content": f"Query: {query}\n\nResponses:\n{combined_text}"
                },
            ],
            temperature=0
        )
        return chat_response.choices[0].message.content

    def workflow(self, query: str) -> str:
        """
        Execute the workflow for processing a query.

        Args:
            query: User query

        Returns:
            str: Final response with citations
        """
        # Get responses from both agents
        db_response, tools_citations = self.db_query_agent.query(query)
        rag_response, rag_citations = self.rag_agent.query(query)

        # Combine responses into a dictionary
        responses = {
            "Database Response": db_response,
            "RAG Response": rag_response
        }

        # Generate final summarized response
        final_response = self.combine_and_summarize_responses(
            responses=responses,
            query=query,
            summarization_prompt=final_response_summarization_prompt
        )

        # Add citations
        citations = (
            "\n\nSources:\n"
            f"- Database Tools: {tools_citations}\n"
            f"- PDF Sources: {rag_citations}"
        )

        return final_response + citations
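workflow queries the two agents one after the other, although neither call depends on the other's result. A possible variation, sketched below under the assumption that both agents and the shared Mistral client can safely be called from separate threads, issues the two queries concurrently with `concurrent.futures.ThreadPoolExecutor` and assembles the same responses dictionary and Sources footer. `gather_agent_responses` and the `QueryAgent` protocol are hypothetical names; each agent is assumed to return an `(answer, citations)` tuple, as workflow unpacks them.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, Protocol, Tuple


class QueryAgent(Protocol):
    """Any object whose query() returns (answer, citations)."""
    def query(self, query_text: str) -> Tuple[str, str]: ...


def gather_agent_responses(db_agent: QueryAgent,
                           rag_agent: QueryAgent,
                           query: str) -> Tuple[Dict[str, str], str]:
    """Hypothetical: query both agents concurrently and build responses plus a Sources footer."""
    with ThreadPoolExecutor(max_workers=2) as pool:
        db_future = pool.submit(db_agent.query, query)
        rag_future = pool.submit(rag_agent.query, query)
        # .result() blocks until each call finishes and re-raises any exception it hit.
        db_response, tools_citations = db_future.result()
        rag_response, rag_citations = rag_future.result()

    responses = {"Database Response": db_response, "RAG Response": rag_response}
    sources = (
        "\n\nSources:\n"
        f"- Database Tools: {tools_citations}\n"
        f"- PDF Sources: {rag_citations}"
    )
    return responses, sources
```

Because both calls are network-bound, running them in threads can bring end-to-end latency closer to the slower of the two calls; API rate limits still apply.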

Initialize and Process Documents

Initializes the DataProcessor and runs the PDF documents through the complete pipeline, from file ingestion to embedding storage.

# Initialize the processor
doc_processor = DataProcessor(mistral_client, qdrant_client)

# Process documents
file_list = doc_processor.get_categorized_filepaths(root_dir='./pdf_data')
processed_docs = doc_processor.process_documents(file_list)
doc_processor.process_and_store_embeddings(processed_docs)
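get_categorized_filepaths is a DataProcessor method whose body is not shown in this section. As an illustration only, the sketch below assumes one sub-folder per document category under the root directory (for example technical manuals or safety protocols) and maps each folder name to the sorted PDF paths it contains. `list_pdfs_by_category` is a hypothetical function, not the DataProcessor implementation.

```python
from pathlib import Path
from typing import Dict, List


def list_pdfs_by_category(root_dir: str) -> Dict[str, List[str]]:
    """Hypothetical: map each sub-folder name (treated as the category) to its PDF paths."""
    categorized: Dict[str, List[str]] = {}
    for category_dir in sorted(Path(root_dir).iterdir()):
        if category_dir.is_dir():
            # Only files ending in .pdf are kept; other files are ignored.
            pdfs = sorted(str(p) for p in category_dir.glob("*.pdf"))
            if pdfs:
                categorized[category_dir.name] = pdfs
    return categorized
```

Keeping the category as a dictionary key lets later steps attach it to each chunk's metadata, which is useful for filtering retrieval by document type.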

Insert Data into Database Tables

Loads multiple CSV files into their corresponding database tables in SQLite.

# Insert data into tables

db_path = "./database.db"

file_mappings = {
    "compliance": "./csv_data/compliance_db.csv",
    "maintenance": "./csv_data/maintenance_db.csv",
    "technical_specifications": "./csv_data/technical_specifications_db.csv",
    "parts_inventory_compatibility": "./csv_data/parts_inventory_compatibility_db.csv"
}

doc_processor.insert_data_database(db_path, file_mappings)
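insert_data_database is a DataProcessor method whose body is not shown in this section. A minimal stand-in using only the standard-library `csv` and `sqlite3` modules is sketched below, assuming each CSV has a header row and that recreating each table on every run is acceptable. `load_csvs_into_sqlite` is a hypothetical name.

```python
import csv
import sqlite3
from typing import Dict


def load_csvs_into_sqlite(db_path: str, file_mappings: Dict[str, str]) -> None:
    """Hypothetical stand-in: load each CSV into its own table, replacing any existing one."""
    conn = sqlite3.connect(db_path)
    try:
        for table, csv_path in file_mappings.items():
            with open(csv_path, newline="", encoding="utf-8") as f:
                reader = csv.reader(f)
                header = next(reader)  # assumes the first row holds column names
                rows = list(reader)
            cols = ", ".join(f'"{c}"' for c in header)
            placeholders = ", ".join("?" for _ in header)
            # Drop and recreate so re-running the notebook does not duplicate rows.
            conn.execute(f'DROP TABLE IF EXISTS "{table}"')
            conn.execute(f'CREATE TABLE "{table}" ({cols})')
            # Values are inserted as the strings read from the CSV.
            conn.executemany(f'INSERT INTO "{table}" VALUES ({placeholders})', rows)
        conn.commit()
    finally:
        conn.close()
```

With pandas, `pd.read_csv(csv_path).to_sql(table, conn, if_exists="replace", index=False)` gives the same per-table replacement and also infers column types.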

Initialize the Agents

Initializes the three core agents:

  • RAGAgent for document search and response generation.
  • DatabaseQueryAgent for structured data querying.
  • WorkflowOrchestrator for orchestrating and combining the responses.
rag_agent = RAGAgent(mistral_client, qdrant_client)
db_query_agent = DatabaseQueryAgent(db_path, mistral_client)
workflow_orchestrator = WorkflowOrchestrator(rag_agent, db_query_agent, mistral_client)

Example Queries

query = "What are the troubleshooting steps for inaccurate machining in CNC Machine (Model X) and when was its last maintenance performed?"

print(f"Query: {query}")
print("----------------------")

answer = workflow_orchestrator.workflow(query)

print("------------Answer----------")
print(answer)

query = "What are the safety protocols for the Cooling System (Model Y), and when is its next scheduled maintenance?"

print(f"Query: {query}")
print("----------------------")

answer = workflow_orchestrator.workflow(query)

print("------------Answer----------")
print(answer)