Problem Statement
In industrial settings, engineers and technicians often struggle to manage and retrieve comprehensive information about various equipment. This information is scattered across technical manuals, maintenance logs, safety protocols, troubleshooting guides, and parts inventories. The fragmented nature of this data makes it difficult to access and utilize effectively, leading to inefficiencies and potential safety risks. This problem requires an intelligent, adaptive solution to provide real-time, context-aware responses to queries.
Proposed Solution
To address these challenges, we propose an agentic workflow that integrates a Retrieval-Augmented Generation (RAG) system with a database querying system based on function calling. The solution leverages LLMs (including structured outputs and function calling), embedding models, and structured data retrieval to provide contextually relevant, precise information. The workflow is orchestrated by multiple agents, each with a specific role (a minimal sketch of the control flow follows the list below):
- RAGAgent: Utilizes LLMs and Embedding models to retrieve and generate contextually relevant information from technical documents.
- DatabaseQueryAgent: Handles precise and structured data retrieval from databases containing maintenance logs, technical specifications, parts inventories, and compliance records.
- WorkflowOrchestrator: Orchestrates interactions between the RAGAgent and the DatabaseQueryAgent, ensuring seamless and efficient query resolution.
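Before diving into the implementation, the sketch below illustrates the intended control flow. The function names and return values here are placeholders, not part of the actual system; the real agent classes are defined later in this notebook.
# Illustrative, self-contained sketch of the intended control flow. The stand-in
# functions are placeholders; the real RAGAgent, DatabaseQueryAgent and
# WorkflowOrchestrator classes are implemented later in this notebook.
def rag_lookup(query: str) -> str:
    return "passages retrieved from manuals, guides and protocols"  # stand-in for vector search + generation

def database_lookup(query: str) -> str:
    return "matching maintenance, compliance or inventory records"  # stand-in for SQL via function calling

def orchestrate(query: str) -> str:
    # The orchestrator gathers both answers and has an LLM merge them into one response.
    return f"{database_lookup(query)} | {rag_lookup(query)}"

print(orchestrate("When was the CNC Machine (Model X) last serviced?"))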
Dataset Details
PDF Documents
The PDF documents contain detailed information about various industrial equipment, categorized into:
- Technical Manuals: Operation and maintenance guides.
- Maintenance Guides: Routine and preventive maintenance tasks.
- Troubleshooting Guides: Solutions to common issues.
- Safety Protocols: Safety procedures and guidelines.
Databases
The databases contain structured information that complements the PDF documents:
- Compliance Database (compliance_db): Safety certifications and compliance statuses.
- Maintenance Database (maintenance_db): Logs of maintenance activities.
- Technical Specifications Database (technical_specifications_db): Detailed technical specifications.
- Parts Inventory and Compatibility Database (parts_inventory_compatibility_db): Information on parts, compatibility, and inventory status.
By integrating these datasets, the proposed agentic workflow aims to provide a comprehensive and efficient system for managing and retrieving industrial equipment information, ensuring that engineers and technicians have access to the most relevant and up-to-date information.
NOTE: All data used in this demonstration has been synthetically generated.
Technical Architecture
Installation
Installs the necessary Python packages for the IndusAgent system.
!pip install mistralai==1.5.1 # Mistral AI client
!pip install qdrant-client==1.13.2 # Vector database client
!pip install gdown==5.2.0 # Google Drive download
Imports
Imports required libraries for LLM operations, data processing, vector database management, and utility functions.
# Core libraries
import os
import json
import functools
import warnings
from typing import List, Dict, Any, Tuple
# LLM and Data Processing
from mistralai import Mistral
from pydantic import BaseModel
import pandas as pd
import sqlite3
from tqdm import tqdm
# Vector Database
from qdrant_client import QdrantClient
from qdrant_client.models import (
PointStruct, VectorParams, Distance,
Filter, FieldCondition, MatchValue
)
# Data Download
import gdown
import zipfile
# Suppress warnings
warnings.filterwarnings('ignore', category=DeprecationWarning)
Download Data
Downloads the dataset from Google Drive, extracts it to a data directory, and sets up the working environment. The dataset contains CSV files for database operations and PDFs for document processing.
By the end of the process, you should be able to see the downloaded data, as shown in the image below.
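For reference, the extracted layout should look roughly like the tree below; the category subfolder names are assumptions inferred from the paths and categories used later in the notebook.
data/
├── pdf_data/
│   ├── technical_manual/
│   ├── maintenance_guide/
│   ├── troubleshooting_guide/
│   └── safety_protocol/
└── csv_data/
    ├── compliance_db.csv
    ├── maintenance_db.csv
    ├── technical_specifications_db.csv
    └── parts_inventory_compatibility_db.csv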
Download data from Google Drive
file_id = "1lwYSN6ry3JOA7pw3WAx72a_IXGqqmR8y"
output_file = "data.zip" # Change this if your file is not a ZIP file
# Google Drive direct download URL
gdrive_url = f"https://drive.google.com/uc?id={file_id}"
# Download the file
gdown.download(gdrive_url, output_file, quiet=False)
print(f"✅ File downloaded: {output_file}")
Extract and set up the data directory
# Unzip the file into the current directory
with zipfile.ZipFile(output_file, 'r') as zip_ref:
zip_ref.extractall(".") # Extracts directly to the current directory
print(f"✅ Files extracted to: {os.getcwd()}") # Confirm extraction path
output_dir = "data"
# Change working directory to the extracted folder
os.chdir(output_dir)
# Verify the new working directory
print(f"📂 Current directory: {os.getcwd()}")
# List files in the extracted folder
print("📜 Extracted files:", os.listdir())
Set up environment variables
Sets up the Mistral API key as an environment variable for authentication.
os.environ["MISTRAL_API_KEY"] = "<YOUR MISTRAL API KEY>" # Get your Mistral API key from https://console.mistral.ai/api-keys/
Initialize Mistral LLM and Qdrant Vector Database
Initializes the Mistral LLM client for text generation and Qdrant vector database client for similarity search operations.
Note:
- We will use our latest model, Mistral Small 3, for the demonstration.
- You need to set up Qdrant Cloud or a Docker setup before proceeding. You can refer to the documentation for the setup instructions.
model = "mistral-small-latest"
mistral_client = Mistral(api_key=os.environ["MISTRAL_API_KEY"])
qdrant_client = QdrantClient(
url= "<URL>",
api_key= "<API KEY>",
) # Replace with your Qdrant API key and URL if you are using Qdrant Cloud - https://cloud.qdrant.io/
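If you are not using Qdrant Cloud, a local setup works as well. The snippet below sketches two common alternatives; note that the in-memory mode keeps data only for the lifetime of the process.
# In-memory Qdrant instance (no server required; data is not persisted across runs)
qdrant_client = QdrantClient(":memory:")
# Or connect to a local Docker container started with: docker run -p 6333:6333 qdrant/qdrant
# qdrant_client = QdrantClient(url="http://localhost:6333")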
System Prompts
The system uses three different types of prompts to guide the LLMs for response generation:
- PDF Summarization Prompt: summarization_prompt is used to create concise summaries of PDF documents.
- Response Generation Prompt: response_generation_prompt is used to generate responses based on retrieved context.
- Final Response Integration Prompt: final_response_summarization_prompt is used to summarize responses from multiple sources (PDFs and the different databases).
# Define the prompt for generating a response
response_generation_prompt = '''Based on the following context answer the query:\n\n Context: {context}\n\n Query: {query}'''
# Prompt for summarizing the PDF text
summarization_prompt = '''Your task is to summarize the following text focusing on the core essence of the text in maximum of 2-3 sentences.'''
# Prompt for final response summarization
final_response_summarization_prompt = """You are an expert technical assistant. Your task is to create a comprehensive,
coherent response by combining information from multiple sources: database records and documentation.
Consider the following guidelines:
1. Integrate information from both sources seamlessly
2. Resolve any conflicts between sources, if they exist
3. Present information in a logical, step-by-step manner when applicable
4. Include specific technical details, measurements, and procedures when available
5. Prioritize safety-related information when present
6. Add relevant maintenance intervals or schedules if mentioned
7. Reference specific part numbers or specifications when provided
The user's query is: {query}
Based on the following responses from different sources, create a unified, clear answer:
{responses}
Remember to:
- Focus on accuracy and completeness
- Maintain technical precision
- Use clear, professional language
- Address all aspects of the query
- Highlight any important warnings or precautions"""
DataProcessor
The DataProcessor class is a comprehensive component that handles all data processing operations in the system. It manages both unstructured (PDF) and structured (CSV) data, along with embedding generation and storage.
- PDF document processing and text extraction using Mistral OCR.
- CSV to database ingestion
- Embedding generation and vector storage
- Batch processing of documents and data
Main Components
1. Document Processing
- get_categorized_filepaths: Walks through the directory structure to collect categorized PDF file paths.
- parse_pdf: Extracts text from all pages of a PDF file using Mistral OCR.
- process_single_pdf: Processes an individual PDF through the complete pipeline.
- process_documents: Handles sequential processing of multiple documents.
2. Summarization and Embeddings
- summarize: Generates concise summaries of text using the Mistral model.
- get_text_embedding: Creates text embeddings using Mistral's embedding model.
- qdrant_insert_embeddings: Stores embeddings with metadata in the Qdrant vector database.
- process_and_store_embeddings: Handles batch processing of embeddings.
3. Database Operations
- insert_csv_to_table: Loads a single CSV file into a specified database table.
- insert_data_database: Handles insertion of multiple CSV files into their respective tables.
class DataProcessor:
"""
Handles all data processing operations including:
- PDF parsing and text extraction
- CSV to database ingestion
- Embedding generation and storage
- Batch processing of documents and data
"""
def __init__(self, mistral_client: Mistral, qdrant_client: QdrantClient):
self.mistral_client = mistral_client
self.qdrant_client = qdrant_client
def get_categorized_filepaths(self, root_dir: str) -> List[Dict[str, str]]:
"""
Walk through the directory structure and get file paths with their categories.
"""
categorized_files = []
for category in os.listdir(root_dir):
category_path = os.path.join(root_dir, category)
if not os.path.isdir(category_path):
continue
for root, _, files in os.walk(category_path):
for file in files:
if file.lower().endswith('.pdf'):
filepath = os.path.join(root, file)
categorized_files.append({
'filepath': filepath,
'category': category
})
return categorized_files
def parse_pdf(self, file_path: str) -> str:
"""Parse a PDF file and extract text from all pages using Mistral OCR."""
# Upload a file
uploaded_pdf = self.mistral_client.files.upload(
file={
"file_name": file_path,
"content": open(file_path, "rb"),
},
purpose="ocr"
)
# Get a signed URL for the uploaded file
signed_url = self.mistral_client.files.get_signed_url(file_id=uploaded_pdf.id)
# Get OCR results
ocr_response = self.mistral_client.ocr.process(
model="mistral-ocr-latest",
document={
"type": "document_url",
"document_url": signed_url.url,
}
)
# Extract text from the OCR response
text = "\n".join([x.markdown for x in (ocr_response.pages)])
return text
def summarize(self, text: str, summarization_prompt: str = summarization_prompt) -> str:
"""Summarize the given text using the Mistral model."""
chat_response = self.mistral_client.chat.complete(
model=model,
messages=[
{
"role": "system",
"content": summarization_prompt
},
{
"role": "user",
"content": text
},
],
temperature=0
)
return chat_response.choices[0].message.content
def get_text_embedding(self, inputs: List[str]) -> List[float]:
"""Get the text embedding for the given inputs."""
embeddings_batch_response = self.mistral_client.embeddings.create(
model="mistral-embed",
inputs=inputs
)
return embeddings_batch_response.data[0].embedding
def qdrant_insert_embeddings(self, summaries: List[str], texts: List[str],
filepaths: List[str], categories: List[str]):
"""Insert embeddings into Qdrant with metadata."""
embeddings = [self.get_text_embedding([t]) for t in summaries]
if not self.qdrant_client.collection_exists("embeddings"):
self.qdrant_client.create_collection(
collection_name="embeddings",
vectors_config=VectorParams(size=1024, distance=Distance.COSINE),
)
self.qdrant_client.upsert(
collection_name="embeddings",
points=[
PointStruct(
id=idx,
vector=embedding,
payload={
"filepath": filepaths[idx],
"category": categories[idx],
"text": texts[idx]
}
) for idx, embedding in enumerate(embeddings)
]
)
def process_single_pdf(self, file_info: Dict[str, str]) -> Dict[str, any]:
"""Process a single PDF file through the pipeline."""
filepath = file_info['filepath']
category = file_info['category']
pdf_text = self.parse_pdf(filepath)
summary = self.summarize(pdf_text)
return {
'filepath': filepath,
'category': category,
'full_text': pdf_text,
'summary': summary
}
def process_documents(self, file_list: List[Dict[str, str]]) -> List[Dict[str, any]]:
"""Process documents sequentially."""
processed_docs = []
for file_info in tqdm(file_list, desc="Processing PDFs"):
try:
processed_doc = self.process_single_pdf(file_info)
processed_docs.append(processed_doc)
except Exception as e:
print(f"Error processing {file_info['filepath']}: {str(e)}")
continue
return processed_docs
def insert_csv_to_table(self, file_path: str, db_path: str, table_name: str):
"""
Insert CSV data into a table of SQLite database.
Args:
file_path (str): Path to the CSV file
db_path (str): Path to the SQLite database
table_name (str): Name of the table to create/update
"""
df = pd.read_csv(file_path)
conn = sqlite3.connect(db_path)
df.to_sql(table_name, conn, if_exists='replace', index=False)
conn.close()
def insert_data_database(self, db_path: str, file_mappings: Dict[str, str]):
"""
Bulk insert multiple CSV files into their respective database tables.
Args:
db_path (str): Path to the SQLite database
file_mappings (Dict[str, str]): Dictionary mapping table names to CSV file paths
"""
for table_name, file_path in file_mappings.items():
try:
self.insert_csv_to_table(file_path, db_path, table_name)
print(f"Successfully inserted data into {table_name}")
except Exception as e:
print(f"Error inserting data into {table_name}: {str(e)}")
def process_and_store_embeddings(self, docs: List[Dict[str, any]], batch_size: int = 10):
"""Generate embeddings and store them in Qdrant in batches."""
for i in range(0, len(docs), batch_size):
batch = docs[i:i + batch_size]
texts = [doc['full_text'] for doc in batch]
summaries = [doc['summary'] for doc in batch]
filepaths = [doc['filepath'] for doc in batch]
categories = [doc['category'] for doc in batch]
try:
self.qdrant_insert_embeddings(summaries, texts, filepaths, categories)
print(f"Processed batch {i//batch_size + 1}/{(len(docs) + batch_size - 1)//batch_size}")
except Exception as e:
print(f"Error processing batch starting at index {i}: {str(e)}")
continue
RAGAgent
The RAGAgent class implements Retrieval-Augmented Generation (RAG) to provide intelligent search and response generation. It combines vector search capabilities with the LLM to give contextually relevant answers.
- Query categorization and classification
- Vector similarity search in Qdrant
- Context-aware response generation
- Document citation handling
Main Components
1. Query Processing
- query_categorization: Classifies queries into predefined categories (technical manual, safety protocol, etc.).
- query: Orchestrates the complete RAG pipeline from query to final response.
2. Search and Retrieval
- qdrant_search: Performs semantic search using query embeddings, filters results by document category, and returns the top-k most relevant documents.
3. Response Generation
- generate_response: Creates natural language responses from the retrieved context using the LLM with a specialized prompt; citations to source documents are returned alongside the response.
Query Category Model
A Pydantic model that defines the structure for query categorization, used by RAGAgent to classify queries into relevant categories (technical_manual, safety_protocol, etc.).
# Define category model for query classification
class Category(BaseModel):
category: str
class RAGAgent:
"""
Agent responsible for Retrieval-Augmented Generation (RAG) operations.
"""
def __init__(self, mistral_client: Mistral, qdrant_client: QdrantClient):
self.mistral_client = mistral_client
self.qdrant_client = qdrant_client
def generate_response(self, context: str, query: str) -> str:
"""Generate a response based on the given context and query."""
chat_response = self.mistral_client.chat.complete(
model=model,
messages=[
{
"role": "user",
"content": response_generation_prompt.format(context=context, query=query)
},
]
)
return chat_response.choices[0].message.content
    def query_categorization(self, query: str) -> Dict[str, Any]:
"""Categorize the query into predefined categories."""
chat_response = self.mistral_client.chat.parse(
model=model,
messages=[
{
"role": "system",
"content": "Classify the query into one or more categories of the following list: ['technical_manual', 'safety_protocol', 'maintenance_guide', 'troubleshooting_guide']"
},
{
"role": "user",
"content": query,
},
],
response_format=Category,
max_tokens=256,
temperature=0
)
return json.loads(chat_response.choices[0].message.content)
def qdrant_search(self, query: str, category: str = None, top_k: int = 5) -> List[Dict[str, Any]]:
"""Search for similar texts in Qdrant based on the query and category."""
query_vector = DataProcessor(self.mistral_client, self.qdrant_client).get_text_embedding([query])
retrieval_results = self.qdrant_client.search(
collection_name="embeddings",
query_vector=query_vector,
query_filter=Filter(
must=[
FieldCondition(
key='category',
match=MatchValue(value=category)
)
]
),
limit=top_k
)
return retrieval_results
def query(self, query_text: str, top_k: int = 3) -> Tuple[str, str]:
"""Process a natural language query using RAG."""
category = self.query_categorization(query_text)["category"]
results = self.qdrant_search(query_text, category, top_k=top_k)
        retrieved_text = "\n".join([result.payload["text"] for result in results])
        citations = ",".join([result.payload["filepath"] for result in results])
return self.generate_response(retrieved_text, query_text), citations
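As a quick standalone check of the RAG path (assuming the document-embedding step later in this notebook has already been run, so the embeddings collection is populated), the agent can be queried directly. The query below is illustrative:
# Standalone RAG check (illustrative; requires the "embeddings" collection to be populated)
rag_agent = RAGAgent(mistral_client, qdrant_client)
rag_answer, rag_sources = rag_agent.query("What are the safety protocols for the Cooling System (Model Y)?")
print(rag_answer)
print("Sources:", rag_sources)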
Database Query Tools
Defines database query function tools. These tools define the function calling interface for the DatabaseQueryAgent, enabling structured querying of different database tables.
tools = [
{
"type": "function",
"function": {
"name": "query_compliance",
"description": '''Query compliance records with filters. \n\n A sample example of columns and corresponding values from db are:\n\n EquipmentID,EquipmentName,Manufacturer,Model,ComplianceType,Certification,IssueDate,ExpiryDate,ComplianceStatus,ResponsiblePerson
1,CNC Machine,ABC Corp,Model X,Safety,ISO 9001,2020-01-15,2025-01-15,Active,John Doe''',
"parameters": {
"type": "object",
"properties": {
"filters": {
"type": "object",
"description": '''Dictionary of column names and values to filter by.''',
"additionalProperties": {
"type": "string"
}
}
},
"required": ["filters"],
},
},
},
{
"type": "function",
"function": {
"name": "query_maintenance",
"description": '''Query maintenance records with filters. \n\n A sample example of columns and corresponding values from db are:\n\n EquipmentID,EquipmentName,Manufacturer,Model,InstallationDate,LastMaintenanceDate,NextMaintenanceDate,MaintenanceType,MaintenanceDetails,MaintenanceStatus,ResponsibleTechnician
1,CNC Machine,ABC Corp,Model X,2020-01-15,2023-09-01,2023-12-01,Preventive,Oil change,Completed,John Doe''',
"parameters": {
"type": "object",
"properties": {
"filters": {
"type": "object",
"description": "Dictionary of column names and values to filter by",
"additionalProperties": {
"type": "string"
}
}
},
"required": ["filters"],
},
},
},
{
"type": "function",
"function": {
"name": "query_technical_specs",
"description": '''Query technical specifications with filters.\n\n A sample example of columns and corresponding values from db are:\n\n EquipmentID,EquipmentName,Manufacturer,Model,SpecificationType,SpecificationDetail,Unit,Value,DateMeasured,MeasuredBy
1,CNC Machine,ABC Corp,Model X,Power,Motor Power,kW,15,2023-01-15,John Doe''',
"parameters": {
"type": "object",
"properties": {
"filters": {
"type": "object",
"description": "Dictionary of column names and values to filter by",
"additionalProperties": {
"type": "string"
}
}
},
"required": ["filters"],
},
},
},
{
"type": "function",
"function": {
"name": "query_parts_inventory_compatibility",
"description": '''Query parts, inventory and compatibility with filters.\n\n A sample example of columns and corresponding values from db are:\n\n PartID,PartName,EquipmentID,EquipmentName,Manufacturer,Model,PartType,Quantity,Compatibility,Supplier,LastOrderDate,NextOrderDate,PartStatus
1,Oil Filter,1,CNC Machine,ABC Corp,Model X,Filter,50,Compatible,Supplier A,2023-01-15,2023-12-01,In Stock''',
"parameters": {
"type": "object",
"properties": {
"filters": {
"type": "object",
"description": "Dictionary of column names and values to filter by",
"additionalProperties": {
"type": "string"
}
}
},
"required": ["filters"],
},
},
}
]
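For intuition, when the model decides to call query_maintenance for a question such as "When was the CNC Machine (Model X) last maintained?", the tool-call arguments it produces are shaped roughly like the dictionary below (values are illustrative):
# Illustrative shape of the arguments passed to query_maintenance via function calling
example_arguments = {
    "filters": {
        "EquipmentName": "CNC Machine",
        "Model": "Model X"
    }
}
print(json.dumps(example_arguments, indent=2))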
DatabaseQueryAgent
The DatabaseQueryAgent class manages interactions with the SQLite database, handling structured data queries through function calling against tables containing maintenance logs, technical specifications, parts inventories, and compliance records. It provides specialized querying capabilities for the different database tables.
- Natural language query processing
- Structured database querying
- Function calling for query execution
- JSON response formatting
Main Components
1. Table-Specific Queries
- query_compliance: Retrieves filtered compliance records.
- query_maintenance: Accesses maintenance-related information.
- query_technical_specs: Fetches technical specifications.
- query_parts_inventory_compatibility: Retrieves parts and compatibility data.
2. Query Processing
- query: Processes natural language queries using function calling, dispatches tool calls to the appropriate database operations, tracks database tool citations, and formats the response with the query results.
class DatabaseQueryAgent:
"""
Agent responsible for interacting with the SQLite database.
"""
def __init__(self, db_path: str, mistral_client: Mistral):
self.db_path = db_path
self.tools = tools
self.names_to_functions = {
'query_compliance': functools.partial(self.query_compliance),
'query_maintenance': functools.partial(self.query_maintenance),
'query_technical_specs': functools.partial(self.query_technical_specs),
'query_parts_inventory_compatibility': functools.partial(self.query_parts_inventory_compatibility)
}
self.mistral_client = mistral_client
def query_compliance(self, filters: Dict[str, str]) -> str:
"""
Query compliance table with filters.
Args:
filters (Dict[str, str]): Dictionary of column names and values to filter by.
Returns:
str: The query result in JSON format.
"""
try:
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
where_conditions = []
params = []
for column, value in filters.items():
where_conditions.append(f"{column} = ?")
params.append(value)
where_clause = " AND ".join(where_conditions)
query = f"SELECT * FROM compliance WHERE {where_clause}"
cursor.execute(query, params)
columns = [description[0] for description in cursor.description]
result = cursor.fetchone()
if result:
record = dict(zip(columns, result))
return json.dumps({'result': record})
return json.dumps({'error': 'No matching records found'})
except Exception as e:
return json.dumps({'error': str(e)})
finally:
conn.close()
def query_maintenance(self, filters: Dict[str, str]) -> str:
"""
Query maintenance table with filters.
Args:
filters (Dict[str, str]): Dictionary of column names and values to filter by.
Returns:
str: The query result in JSON format.
"""
try:
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
where_conditions = []
params = []
for column, value in filters.items():
where_conditions.append(f"{column} = ?")
params.append(value)
where_clause = " AND ".join(where_conditions)
query = f"SELECT * FROM maintenance WHERE {where_clause}"
cursor.execute(query, params)
columns = [description[0] for description in cursor.description]
result = cursor.fetchone()
if result:
record = dict(zip(columns, result))
return json.dumps({'result': record})
return json.dumps({'error': 'No matching records found'})
except Exception as e:
return json.dumps({'error': str(e)})
finally:
conn.close()
def query_technical_specs(self, filters: Dict[str, str]) -> str:
"""
Query technical specifications table with filters.
Args:
filters (Dict[str, str]): Dictionary of column names and values to filter by.
Returns:
str: The query result in JSON format.
"""
try:
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
where_conditions = []
params = []
for column, value in filters.items():
where_conditions.append(f"{column} = ?")
params.append(value)
where_clause = " AND ".join(where_conditions)
query = f"SELECT * FROM technical_specifications WHERE {where_clause}"
cursor.execute(query, params)
columns = [description[0] for description in cursor.description]
result = cursor.fetchone()
if result:
record = dict(zip(columns, result))
return json.dumps({'result': record})
return json.dumps({'error': 'No matching records found'})
except Exception as e:
return json.dumps({'error': str(e)})
finally:
conn.close()
def query_parts_inventory_compatibility(self, filters: Dict[str, str]) -> str:
"""
Query parts inventory and compatibility table with filters.
Args:
filters (Dict[str, str]): Dictionary of column names and values to filter by.
Returns:
str: The query result in JSON format.
"""
try:
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
where_conditions = []
params = []
for column, value in filters.items():
where_conditions.append(f"{column} = ?")
params.append(value)
where_clause = " AND ".join(where_conditions)
query = f"SELECT * FROM parts_inventory_compatibility WHERE {where_clause}"
cursor.execute(query, params)
columns = [description[0] for description in cursor.description]
result = cursor.fetchone()
if result:
record = dict(zip(columns, result))
return json.dumps({'result': record})
return json.dumps({'error': 'No matching records found'})
except Exception as e:
return json.dumps({'error': str(e)})
finally:
conn.close()
    def query(self, query_text: str) -> Tuple[str, str]:
"""
Process a natural language query using the database tools.
Args:
query_text (str): Natural language query
Returns:
            Tuple[str, str]: Response text and a comma-separated list of the tools used
"""
messages = [{"role": "user", "content": query_text}]
# Get initial response with potential tool calls
response = self.mistral_client.chat.complete(
model=model,
messages=messages,
tools=self.tools,
tool_choice="any",
)
messages.append(response.choices[0].message)
citations = set()
# Handle any tool calls
if hasattr(response.choices[0].message, 'tool_calls') and response.choices[0].message.tool_calls:
for tool_call in response.choices[0].message.tool_calls:
function_name = tool_call.function.name
print(f"Tool call: {function_name}")
citations.add(function_name)
function_params = json.loads(tool_call.function.arguments)
print(f"Tool call parameters: {function_params}")
function_result = self.names_to_functions[function_name](**function_params)
messages.append({
"role": "tool",
"name": function_name,
"content": function_result,
"tool_call_id": tool_call.id
})
# Get final response
final_response = self.mistral_client.chat.complete(
model="mistral-small-latest",
messages=messages
)
return final_response.choices[0].message.content, ",".join(list((citations)))
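Similarly, once the CSV data has been loaded into SQLite (see the data-loading step further below), the database agent can be exercised on its own. The path and query mirror those used later in the notebook:
# Standalone function-calling check (illustrative; requires ./database.db to be populated)
db_query_agent = DatabaseQueryAgent("./database.db", mistral_client)
db_answer, used_tools = db_query_agent.query("When was the last maintenance performed on the CNC Machine (Model X)?")
print(db_answer)
print("Tools used:", used_tools)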
WorkflowOrchestrator
The WorkflowOrchestrator class orchestrates the interaction between the RAGAgent and the DatabaseQueryAgent to provide comprehensive responses by combining information from both structured and unstructured data sources.
- Workflow orchestration and coordination
- Response combination and integration
- Final response summarization
- Source citation management
Main Components
1. Workflow Execution
- workflow: Manages the complete query processing pipeline, coordinates responses from both agents, generates the final unified response, and maintains traceability through citations.
2. Response Summarization
- combine_and_summarize_responses: Merges and summarizes responses from both agents, applies structured formatting to the combined responses, and uses the summarization prompt to produce coherent output.
class WorkflowOrchestrator:
"""
    WorkflowOrchestrator is responsible for orchestrating the workflow between the RAGAgent and the DatabaseQueryAgent.
Handles query processing, response combination, and final summarization.
"""
def __init__(self,
rag_agent: RAGAgent,
db_query_agent: DatabaseQueryAgent,
client: Mistral):
"""
Initialize WorkflowOrchestrator with necessary components.
Args:
            rag_agent: RAGAgent for document retrieval and response generation
db_query_agent: DatabaseQueryAgent for structured data queries
client: Mistral client for text generation
"""
self.rag_agent = rag_agent
self.db_query_agent = db_query_agent
self.client = client
def combine_and_summarize_responses(self,
responses: Dict[str, str],
query: str,
summarization_prompt: str = summarization_prompt) -> str:
"""
Combine and summarize multiple responses into a coherent final response.
Args:
responses: Dictionary of response types and their content
query: Original user query
summarization_prompt: Template for summarization
Returns:
str: Summarized and combined response
"""
# Format responses into a structured text
combined_text = "\n\n".join([
f"{source}: {content}"
for source, content in responses.items()
])
# Generate summarized response
chat_response = self.client.chat.complete(
model=model,
messages=[
{
"role": "system",
"content": summarization_prompt
},
{
"role": "user",
"content": f"Query: {query}\n\nResponses:\n{combined_text}"
},
],
temperature=0
)
return chat_response.choices[0].message.content
def workflow(self, query: str) -> str:
"""
Execute the workflow for processing a query.
Args:
query: User query
Returns:
str: Final response with citations
"""
# Get responses from both agents
db_response, tools_citations = self.db_query_agent.query(query)
rag_response, rag_citations = self.rag_agent.query(query)
# Combine responses into a dictionary
responses = {
"Database Response": db_response,
"RAG Response": rag_response
}
# Generate final summarized response
final_response = self.combine_and_summarize_responses(
responses=responses,
query=query,
summarization_prompt=final_response_summarization_prompt
)
# Add citations
citations = (
f"\n\nSources:\n"
f"- Database Tools: {((tools_citations))}\n"
f"- PDF Sources: {rag_citations}"
)
return final_response + citations
Initialize and Process Documents
Initializes the DataProcessor and processes PDF documents through the complete pipeline - from file ingestion to embedding storage.
# Initialize the processor
doc_processor = DataProcessor(mistral_client, qdrant_client)
# Process documents
file_list = doc_processor.get_categorized_filepaths(root_dir='./pdf_data')
processed_docs = doc_processor.process_documents(file_list)
doc_processor.process_and_store_embeddings(processed_docs)
Insert Data into Database Tables
Loads multiple CSV files into their corresponding database tables in SQLite.
# Insert data into tables
db_path = "./database.db"
file_mappings = {
"compliance": "./csv_data/compliance_db.csv",
"maintenance": "./csv_data/maintenance_db.csv",
"technical_specifications": "./csv_data/technical_specifications_db.csv",
"parts_inventory_compatibility": "./csv_data/parts_inventory_compatibility_db.csv"
}
doc_processor.insert_data_database(db_path, file_mappings)
Initialize the Agents
Initializes the three core agents:
- RAGAgent for document search and response generation.
- DatabaseQueryAgent for structured data querying.
- WorkflowOrchestrator for orchestrating and combining responses.
rag_agent = RAGAgent(mistral_client, qdrant_client)
db_query_agent = DatabaseQueryAgent(db_path, mistral_client)
workflow_orchestrator = WorkflowOrchestrator(rag_agent, db_query_agent, mistral_client)
Example Queries
query = "What are the troubleshooting steps for inaccurate machining in CNC Machine (Model X) and when was its last maintenance performed?"
print(f"Query: {query}")
print("----------------------")
answer = workflow_orchestrator.workflow(query)
print("------------Answer----------")
print(answer)
query = "What are the safety protocols for the Cooling System (Model Y), and when is its next scheduled maintenance?"
print(f"Query: {query}")
print("----------------------")
answer = workflow_orchestrator.workflow(query)
print("------------Answer----------")
print(answer)