This cookbook demonstrates how to combine the Magistral reasoning model with a HubSpot CRM integration to build a multi-agent system that interprets complex business queries and carries out CRM operations automatically.
The system transforms natural language business questions into actionable insights and automated CRM updates, showcasing how advanced AI reasoning can streamline sales operations and strategic decision-making.
Problem Statement
Traditional CRM Challenges
Modern sales and marketing teams face several critical challenges when working with CRM systems like HubSpot:
- Manual Data Analysis: Teams spend hours manually analyzing deals, contacts, and companies to extract insights.
- Complex Query Processing: Business stakeholders struggle to get answers to multi-faceted questions that require data from multiple CRM objects.
- Strategic Planning: Market analysis and expansion planning require combining CRM data with business intelligence in ways that aren't natively supported.
Sample Query
"Assign priorities to all deals based on deal value"
Queries like this require:
- Understanding business context
- Analyzing multiple data sources
- Applying business logic
- Generating actionable recommendations
- Sometimes updating CRM records automatically
Solution Architecture
Core Innovation: Magistral Reasoning + HubSpot Integration + Multi-Agent Orchestration
Our solution combines Mistral's Magistral reasoning model with HubSpot's comprehensive CRM API through a sophisticated multi-agent system that can:
- Understand complex business queries using Magistral's advanced reasoning capabilities
- Plan multi-step execution strategies with dynamically created specialized agents
- Execute both data analysis and CRM updates through coordinated agent workflows
- Synthesize results into actionable business insights with strategic recommendations
AgentOrchestrator
Master coordinator that manages the entire multi-agent workflow and HubSpot integration. Orchestrates the complete flow from query analysis through sub-agent execution to final synthesis, while managing agent lifecycle and data connectivity.
LeadAgent
Powered by the Magistral reasoning model with <think> pattern processing, the Lead Agent performs sophisticated query analysis to understand business intent, determine data requirements, and create detailed execution plans specifying which sub-agents to create dynamically.
Dynamic Sub-Agents
Sub-agents are created on-the-fly based on specific query requirements - not pre-defined templates. Each agent is dynamically generated with specialized roles (e.g., priority_calculator, market_analyzer, deals_updater), specific tasks, and targeted data access patterns using Mistral Small for fast execution.
HubSpot API Connector
Dedicated connector providing comprehensive access to CRM data and operations:
- Property Discovery: Automatically maps all available HubSpot fields and valid values
- Data Fetching: Retrieves deals, contacts, and companies with full property sets
- Batch Updates: Efficiently updates multiple records in batches of 100
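The batching step can be sketched as a small helper. This is a minimal illustration rather than the connector's actual code: the name `chunked` is hypothetical, and the default size of 100 simply follows the batch size stated above.

```python
from typing import Any, Dict, Iterator, List


def chunked(items: List[Dict[str, Any]], size: int = 100) -> Iterator[List[Dict[str, Any]]]:
    """Yield consecutive slices of at most `size` items."""
    if size <= 0:
        raise ValueError("size must be positive")
    for start in range(0, len(items), size):
        yield items[start:start + size]


# 250 made-up update records, split into batches of at most 100.
updates = [{"id": str(i), "properties": {}} for i in range(250)]
batch_sizes = [len(batch) for batch in chunked(updates)]
print(batch_sizes)
```

Each yielded batch would then be sent as one request, so a failure affects at most one batch.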
SynthesisAgent
Final orchestrator that combines all sub-agent results into coherent, actionable business insights using Mistral Small. Transforms technical agent outputs into user-friendly responses with strategic recommendations and next steps.
Installation
We need the hubspot-api-client and mistralai packages for the demonstration.
!pip install hubspot-api-client=="12.0.0" mistralai=="1.9.3"
Imports
import requests
import json
from mistralai import Mistral, ThinkChunk, TextChunk
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional
import re
Setup API Keys
HUBSPOT_API_KEY = "<YOUR HUBSPOT API KEY>" # Replace with your HubSpot API key
MISTRAL_API_KEY = "<YOUR MISTRAL API KEY>" # Get it from https://console.mistral.ai/api-keys
Setup MistralAI Client
mistral_client = Mistral(api_key=MISTRAL_API_KEY)
HubSpot API connector
- get_data: Fetches CRM data from the HubSpot API, retrieving deals, contacts, and companies for the analysis.
- batch_update: Writes updates back to HubSpot records in batches of 100.
- get_properties: Automatically fetches and formats all HubSpot deal, contact, and company properties, including valid values and dropdown options, so agents can update data reliably without errors.
class HubSpotConnector:
    """Handles all HubSpot API operations"""

    def __init__(self, api_key: str):
        self.api_key = api_key
        self.base_url = "https://api.hubapi.com/crm/v3/objects"
        self.headers = {"Authorization": f"Bearer {api_key}"}

    def get_properties(self) -> Dict:
        """Load all HubSpot properties"""
        print("📡 HubSpotConnector: Loading properties...")
        properties = {}
        for obj_type in ['deals', 'contacts', 'companies']:
            url = f"https://api.hubapi.com/crm/v3/properties/{obj_type}"
            response = requests.get(url, headers=self.headers)
            if response.status_code == 200:
                props = response.json()['results']
                prop_list = []
                for prop in sorted(props, key=lambda x: x['name']):
                    prop_str = f"'{prop['name']}' - {prop['label']}"
                    if 'options' in prop and prop['options']:
                        valid_values = [opt['value'] for opt in prop['options']]
                        prop_str += f" | Valid values: {valid_values}"
                    prop_list.append(prop_str)
                properties[obj_type] = prop_list
        print(f"✅ HubSpotConnector: Loaded properties for {len(properties)} object types")
        return properties

    def get_data(self, object_type: str) -> List[Dict]:
        """Fetch data from HubSpot"""
        print(f"📡 HubSpotConnector: Fetching {object_type} data...")
        url = f"{self.base_url}/{object_type}"
        params = {"limit": 100}
        all_data = []
        # Follow the paging links until HubSpot stops returning a next page
        while url:
            response = requests.get(url, headers=self.headers, params=params)
            if response.status_code != 200:
                raise Exception(f"HubSpot API error: {response.text}")
            data = response.json()
            all_data.extend(data.get("results", []))
            url = data.get("paging", {}).get("next", {}).get("link")
            params = {}
        print(f"✅ HubSpotConnector: Loaded {len(all_data)} {object_type}")
        return all_data

    def batch_update(self, updates: Dict) -> None:
        """Perform batch updates to HubSpot"""
        for object_type, update_list in updates.items():
            if not update_list:
                continue
            print(f"📡 HubSpotConnector: Updating {len(update_list)} {object_type}...")
            url = f"{self.base_url}/{object_type}/batch/update"
            headers = {**self.headers, "Content-Type": "application/json"}
            # Process in batches of 100
            for i in range(0, len(update_list), 100):
                batch = update_list[i:i+100]
                payload = {"inputs": batch}
                response = requests.post(url, headers=headers, json=payload)
                if response.status_code not in [200, 202]:
                    raise Exception(f"HubSpot update error: {response.text}")
            print(f"✅ HubSpotConnector: {object_type} updates completed")
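To make the input that batch_update iterates over more concrete, here is a hedged sketch of building it by hand for the sample query. The dict is keyed by object type, and each entry carries a record `id` and a `properties` dict. The property name `hs_priority`, its values, and the amount thresholds are assumptions for illustration; check them against the output of get_properties for your portal. The helper names are hypothetical.

```python
from typing import Any, Dict, List


def priority_for(amount: float) -> str:
    """Map a deal amount to a priority label (thresholds are illustrative)."""
    if amount >= 50_000:
        return "high"
    if amount >= 10_000:
        return "medium"
    return "low"


def build_deal_updates(deals: List[Dict[str, Any]]) -> Dict[str, List[Dict[str, Any]]]:
    """Turn fetched deal records into the dict shape batch_update loops over."""
    inputs = []
    for deal in deals:
        raw_amount = deal.get("properties", {}).get("amount")
        if raw_amount in (None, ""):
            continue  # skip deals without an amount rather than guessing
        inputs.append({
            "id": deal["id"],
            # "hs_priority" is an assumed property name, not confirmed by this notebook
            "properties": {"hs_priority": priority_for(float(raw_amount))},
        })
    return {"deals": inputs}


# Made-up records in the shape returned by get_data (values arrive as strings).
sample = [
    {"id": "1", "properties": {"amount": "75000"}},
    {"id": "2", "properties": {"amount": "12000"}},
    {"id": "3", "properties": {"amount": None}},
]
updates = build_deal_updates(sample)
print(updates)
```

In the cookbook itself this structure is produced by a write-back sub-agent rather than by fixed rules, so the sketch is mainly useful for checking what a valid payload looks like.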
Magistral (reasoning) and Mistral small LLM functions
- magistral_reasoning: Uses the Magistral reasoning model for complex query analysis and execution planning, returning both the thinking process and the conclusion.
- mistral_small_execution: Uses the Mistral Small model for sub-agent task execution.
def magistral_reasoning(prompt: str) -> Dict[str, str]:
    """Use reasoning model for query analysis and planning"""
    response = mistral_client.chat.complete(
        model="magistral-medium-latest",
        messages=[{"role": "user", "content": prompt}]
    )
    content = response.choices[0].message.content
    reasoning = ""
    conclusion = ""
    # The content can be plain text instead of a list of chunks
    if isinstance(content, str):
        return {"reasoning": reasoning, "conclusion": content}
    for r in content:
        if isinstance(r, ThinkChunk):
            reasoning = r.thinking[0].text
        elif isinstance(r, TextChunk):
            conclusion = r.text
    return {
        "reasoning": reasoning,
        "conclusion": conclusion
    }


def mistral_small_execution(prompt: str) -> str:
    """Use Mistral Small for content generation"""
    response = mistral_client.chat.complete(
        model="mistral-small-latest",
        messages=[{"role": "user", "content": prompt}]
    )
    return response.choices[0].message.content
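The split between thinking and answer can be illustrated without calling the API. The sketch below is an assumption-laden stand-in: it uses `SimpleNamespace` objects in place of the SDK's chunk classes and assumes each chunk exposes a `type` attribute of `"thinking"` or `"text"`. The function name `split_reasoning` is hypothetical. Unlike the function above, it concatenates every part instead of keeping only the first thinking segment and the last text chunk.

```python
from types import SimpleNamespace
from typing import Any, Dict


def split_reasoning(content: Any) -> Dict[str, str]:
    """Separate thinking text from answer text in a chunked response."""
    if isinstance(content, str):
        # Plain-text content carries no separate reasoning trace.
        return {"reasoning": "", "conclusion": content}
    reasoning_parts, conclusion_parts = [], []
    for chunk in content:
        kind = getattr(chunk, "type", None)
        if kind == "thinking":
            reasoning_parts.extend(getattr(part, "text", "") for part in chunk.thinking)
        elif kind == "text":
            conclusion_parts.append(chunk.text)
    return {
        "reasoning": "".join(reasoning_parts),
        "conclusion": "".join(conclusion_parts),
    }


# Stand-in chunks that mimic the assumed shape of a reasoning response.
fake_content = [
    SimpleNamespace(type="thinking", thinking=[
        SimpleNamespace(text="The user wants priorities. "),
        SimpleNamespace(text="This needs a write-back."),
    ]),
    SimpleNamespace(type="text", text='{"sub_agents": []}'),
]
parsed = split_reasoning(fake_content)
print(parsed["conclusion"])
```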
LeadAgent
Powered by the Magistral reasoning model for sophisticated query analysis and execution planning.
- analyze_query: Uses Magistral's <think> pattern to understand business intent, determine data requirements, and create detailed execution plans with dynamic sub-agent specifications
- Determines whether queries require read-only analysis or write-back operations to HubSpot
class LeadAgent:
    """Lead Agent powered by Magistral reasoning model for query analysis and planning"""

    def __init__(self, hubspot_properties):
        self.hubspot_properties = hubspot_properties
        self.name = "LeadAgent"

    def analyze_query(self, query: str) -> Dict:
        """Analyze query using Magistral reasoning and create execution plan"""
        print(f"🧠 {self.name}: Analyzing query with Magistral reasoning...")
        analysis_prompt = f"""
        Analyze this HubSpot query and create a detailed execution plan based on different hubspot properties provided by following the shared rules:
        HUBSPOT_PROPERTIES: {self.hubspot_properties}
        QUERY: {query}
        RULES:
        1. What is the user asking for?
        2. Is this a read-only query or does it require HubSpot updates?
        3. What sub-agents are needed to accomplish this?
        4. What HubSpot data is required?
        5. What's the execution sequence?
        6. What should be the final output format?
        7. Query can also be combination of read-only and write-back.
        8. Query is read-only if it requires data read from HubSpot.
        9. Query is write-back if it requires an update to existing values or writing/ assigning new values.
        10. In the final conclusion just give only one JSON string nothing else. I don't need any explanation.
        Provide a JSON execution plan with:
        {{
            "sub_agents": [
                {{
                    "name": "agent_name",
                    "task": "specific task description",
                    "task_type": "read_only" or "write_back",
                    "input_data": ["deals", "contacts", "companies"],
                    "output_format": "expected output"
                }}
            ]
        }}
        """
        # Use existing magistral_reasoning function
        analysis = magistral_reasoning(analysis_prompt)
        try:
            # Extract JSON execution plan from conclusion
            json_match = re.search(r'\{.*\}', analysis["conclusion"], re.DOTALL)
            if json_match:
                execution_plan = json.loads(json_match.group(0))
            else:
                raise ValueError("No JSON found in analysis")
        except Exception as e:
            # Fall back to a single read-only agent so the query still gets an answer
            print(f"⚠️ {self.name}: JSON parsing failed ({e}), using fallback plan")
            execution_plan = {
                "sub_agents": [{
                    "name": "general_analyzer",
                    "task": query,
                    "task_type": "read_only",
                    "input_data": ["deals", "contacts", "companies"],
                    "output_format": "summary"
                }]
            }
        print(f"✅ {self.name}: Plan created - {len(execution_plan['sub_agents'])} sub-agents needed")
        return {
            "reasoning": analysis["reasoning"],
            "execution_plan": execution_plan,
            "conclusion": analysis["conclusion"]
        }
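Since the plan comes from a language model, it can help to validate its structure before creating sub-agents from it. The following is a minimal sketch under stated assumptions: `parse_plan` is a hypothetical helper, and the required keys and allowed values are taken from the JSON template in the prompt above, not from any schema the library enforces.

```python
import json
import re
from typing import Any, Dict

REQUIRED_KEYS = {"name", "task", "task_type", "input_data", "output_format"}
VALID_TASK_TYPES = {"read_only", "write_back"}
VALID_OBJECTS = {"deals", "contacts", "companies"}


def parse_plan(text: str) -> Dict[str, Any]:
    """Extract the first-to-last brace span from `text` and validate the plan."""
    match = re.search(r"\{.*\}", text, re.DOTALL)
    if not match:
        raise ValueError("no JSON object found")
    plan = json.loads(match.group(0))  # JSONDecodeError is a ValueError subclass
    agents = plan.get("sub_agents")
    if not isinstance(agents, list) or not agents:
        raise ValueError("plan has no sub_agents")
    for agent in agents:
        if not isinstance(agent, dict):
            raise ValueError("sub-agent entry is not an object")
        missing = REQUIRED_KEYS - agent.keys()
        if missing:
            raise ValueError(f"sub-agent is missing keys: {sorted(missing)}")
        if agent["task_type"] not in VALID_TASK_TYPES:
            raise ValueError(f"unknown task_type: {agent['task_type']}")
        unknown = set(agent["input_data"]) - VALID_OBJECTS
        if unknown:
            raise ValueError(f"unknown input_data: {sorted(unknown)}")
    return plan


# Made-up model output with text around the JSON object.
raw = (
    "Here is the plan:\n"
    '{"sub_agents": [{"name": "priority_calculator", "task": "Rank deals by amount", '
    '"task_type": "write_back", "input_data": ["deals"], "output_format": "JSON"}]}\n'
    "Done."
)
plan = parse_plan(raw)
print(plan["sub_agents"][0]["name"])
```

A caller could catch `ValueError` and fall back to a default plan, in the same spirit as the fallback in analyze_query.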
SubAgent
Dynamic agents created on-the-fly based on query complexity and requirements.
- execute: Uses Mistral Small for fast task execution including data analysis, business logic application, and CRM updates
- Specialized roles generated automatically (e.g., priority_calculator, market_analyzer, deals_updater)
- Handles both read-only operations and write-back operations with proper HubSpot property validation
class SubAgent:
    """Dynamic Sub-Agent created on-the-fly for specific tasks"""

    def __init__(self, name: str, task: str, task_type: str, input_data: List[str],
                 output_format: str):
        self.name = name
        self.task = task
        self.task_type = task_type
        self.input_data = input_data
        self.output_format = output_format

    def execute(self, data: Dict, properties_context: str, hubspot_updater=None) -> Dict:
        """Execute the assigned task"""
        print(f"🤖 {self.name} ({self.task_type}): Executing task...")
        if self.task_type == 'read_only':
            agent_prompt = f"""
            You are a {self.name} agent.
            TASK: {self.task}
            AVAILABLE HUBSPOT PROPERTIES:
            {properties_context}
            DATA AVAILABLE:
            {json.dumps(data, indent=2)}
            OUTPUT FORMAT: {self.output_format}
            Provide your analysis only based on the available data.
            """
        else:  # write_back
            agent_prompt = f"""
            You are a {self.name} agent.
            TASK: {self.task}
            AVAILABLE HUBSPOT PROPERTIES:
            {properties_context}
            DATA AVAILABLE:
            {json.dumps(data, indent=2)}
            CRITICAL: Use exact HubSpot property names from the list above in your JSON output.
            OUTPUT FORMAT: JSON format with the properties to be written to HubSpot
            Provide updates using exact HubSpot property names.
            """
        # Use existing mistral_small_execution function
        result = mistral_small_execution(agent_prompt)
        # Handle write-back operations
        if self.task_type == 'write_back' and hubspot_updater:
            try:
                json_match = re.search(r'\{.*\}', result, re.DOTALL)
                if not json_match:
                    # Without JSON there is nothing to write, so report an error
                    raise ValueError("No JSON found in sub-agent output")
                updates = json.loads(json_match.group(0))
                hubspot_updater.batch_update(updates)
                print(f"✅ {self.name}: Successfully updated HubSpot records")
            except Exception as e:
                print(f"❌ {self.name}: Update failed - {str(e)}")
                return {"status": "error", "error": str(e), "raw_result": result}
        print(f"✅ {self.name}: Task completed successfully")
        return {"status": "success", "result": result}
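The execute method serializes every fetched record into the prompt, which can grow large on a real portal. One possible mitigation, sketched here as an assumption rather than as part of the cookbook, is to trim each record to the properties a task needs before calling `json.dumps`. The helper name `compact_records` and the sample values are hypothetical. The record shape (`id` plus a `properties` dict) follows what the connector's batch payloads use.

```python
import json
from typing import Any, Dict, List


def compact_records(records: List[Dict[str, Any]], keep: List[str]) -> List[Dict[str, Any]]:
    """Keep only the record id and the non-empty properties listed in `keep`."""
    compact = []
    for record in records:
        props = record.get("properties") or {}
        compact.append({
            "id": record.get("id"),
            "properties": {
                name: props[name]
                for name in keep
                if props.get(name) not in (None, "")
            },
        })
    return compact


# Made-up deal records with extra fields that a prioritization task does not need.
deals = [
    {"id": "1", "createdAt": "2024-01-01T00:00:00Z",
     "properties": {"amount": "75000", "dealname": "Acme renewal", "dealstage": "qualified"}},
    {"id": "2", "createdAt": "2024-02-01T00:00:00Z",
     "properties": {"amount": "", "dealname": "Globex pilot", "dealstage": "new"}},
]
trimmed = compact_records(deals, keep=["amount", "dealname"])
print(json.dumps(trimmed, indent=2))
```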
SynthesisAgent
Final orchestrator that combines all sub-agent results into coherent business insights.
- synthesize: Uses Mistral Small to create user-friendly responses with actionable recommendations and next steps
- Transforms technical agent outputs into executive-ready summaries and strategic guidance
class SynthesisAgent:
    """Final agent to synthesize all results into user-friendly response"""

    def __init__(self):
        self.name = "SynthesisAgent"

    def synthesize(self, query: str, sub_agent_results: List[Dict], execution_plan: Dict) -> str:
        """Combine all sub-agent results into final answer"""
        print(f"🔄 {self.name}: Synthesizing results from {len(sub_agent_results)} agents...")
        # Prepare context from all sub-agent results
        results_context = ""
        for result in sub_agent_results:
            results_context += f"\n{result['agent'].upper()} ({result['task_type']}):\n"
            if result['result']['status'] == 'success':
                results_context += f"{result['result']['result']}\n"
            else:
                results_context += f"Error: {result['result'].get('error', 'Unknown error')}\n"
            results_context += "---\n"
        synthesis_prompt = f"""
        You are a final synthesizer agent. Create a comprehensive, user-friendly response based on all sub-agent results.
        ORIGINAL QUERY: {query}
        SUB-AGENT RESULTS:
        {results_context}
        TASK: Synthesize all the above results into a clear, actionable response for the user.
        Guidelines:
        1. Start with a direct answer to the user's query
        2. Include key insights and findings
        3. If updates were made, summarize what was changed
        4. Provide actionable next steps if relevant
        5. Keep it concise but comprehensive
        6. Use a professional but friendly tone
        Provide the final synthesized response:
        """
        # Use existing mistral_small_execution function
        final_answer = mistral_small_execution(synthesis_prompt)
        print(f"✅ {self.name}: Final answer synthesized")
        return final_answer
AgentOrchestrator
Master coordinator that manages the entire multi-agent workflow and HubSpot integration.
- process_query: Orchestrates the complete flow from query analysis through sub-agent execution to final synthesis
- Manages agent lifecycle, data flow between agents, and HubSpot connectivity
- Prints progress messages at each stage of the multi-agent process
class AgentOrchestrator:
    """Main orchestrator that coordinates all agents"""

    def __init__(self, hubspot_api_key: str, mistral_api_key: str):
        # Initialize global mistral client for existing functions
        global mistral_client
        mistral_client = Mistral(api_key=mistral_api_key)
        # Initialize HubSpot connector
        self.hubspot_connector = HubSpotConnector(hubspot_api_key)
        # Load HubSpot data and properties
        self.hubspot_properties = self.hubspot_connector.get_properties()
        self.hubspot_data = {
            "deals": self.hubspot_connector.get_data("deals"),
            "contacts": self.hubspot_connector.get_data("contacts"),
            "companies": self.hubspot_connector.get_data("companies")
        }
        # Initialize agents
        self.lead_agent = LeadAgent(self.hubspot_properties)
        self.synthesis_agent = SynthesisAgent()
        self.active_sub_agents = []
        print(f"🚀 AgentOrchestrator: System initialized with {sum(len(data) for data in self.hubspot_data.values())} HubSpot records")

    def process_query(self, query: str) -> Dict:
        """Main method to process user queries through multi-agent workflow"""
        print(f"\n🎯 Processing Query: {query[:100]}...")
        print("=" * 80)
        # Step 1: Lead Agent analyzes query using Magistral reasoning
        analysis = self.lead_agent.analyze_query(query)
        execution_plan = analysis["execution_plan"]
        # Step 2: Create and execute sub-agents dynamically
        sub_agent_results = []
        self.active_sub_agents = []
        for agent_config in execution_plan["sub_agents"]:
            # Create sub-agent dynamically
            sub_agent = SubAgent(
                name=agent_config["name"],
                task=agent_config["task"],
                task_type=agent_config["task_type"],
                input_data=agent_config["input_data"],
                output_format=agent_config["output_format"]
            )
            self.active_sub_agents.append(sub_agent)
            # Prepare data and context for this sub-agent
            agent_data = {data_type: self.hubspot_data.get(data_type, [])
                          for data_type in agent_config["input_data"]}
            # Build properties context
            properties_context = ""
            for data_type in agent_config["input_data"]:
                if data_type in self.hubspot_properties:
                    properties_context += f"\n{data_type.upper()} PROPERTIES:\n"
                    properties_context += "\n".join(self.hubspot_properties[data_type])
                    properties_context += "\n"
            # Execute sub-agent using mistral_small_execution
            result = sub_agent.execute(agent_data, properties_context, self.hubspot_connector)
            sub_agent_results.append({
                "agent": sub_agent.name,
                "task_type": sub_agent.task_type,
                "result": result
            })
        # Step 3: Synthesis Agent creates final answer using mistral_small_execution
        final_answer = self.synthesis_agent.synthesize(query, sub_agent_results, execution_plan)
        print("=" * 80)
        print("✨ Query processing completed!")
        return {
            "query": query,
            "reasoning": analysis["reasoning"],
            "execution_plan": execution_plan,
            "sub_agent_results": sub_agent_results,
            "active_agents": [agent.name for agent in self.active_sub_agents],
            "final_answer": final_answer
        }
Initialize the multi-agent system
orchestrator = AgentOrchestrator(
    hubspot_api_key=HUBSPOT_API_KEY,
    mistral_api_key=MISTRAL_API_KEY
)
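Write-back queries modify live CRM records, so it may be worth previewing updates before letting them through. The sketch below is one possible approach, not part of the original cookbook: `DryRunConnector` is a hypothetical stand-in that exposes the same batch_update(updates) method the sub-agents call, but only records what would have been sent. The `hs_priority` property in the demo call is an assumed name.

```python
from typing import Any, Dict, List


class DryRunConnector:
    """Stand-in updater that records updates instead of sending them."""

    def __init__(self) -> None:
        self.recorded: Dict[str, List[Dict[str, Any]]] = {}

    def batch_update(self, updates: Dict[str, List[Dict[str, Any]]]) -> None:
        for object_type, update_list in updates.items():
            self.recorded.setdefault(object_type, []).extend(update_list)
            print(f"[dry run] would update {len(update_list)} {object_type}")


# Made-up update in the shape the connector's batch_update receives.
dry_run = DryRunConnector()
dry_run.batch_update({"deals": [{"id": "1", "properties": {"hs_priority": "high"}}]})
print(dry_run.recorded)
```

Because process_query reads `self.hubspot_connector` at call time, assigning `orchestrator.hubspot_connector = dry_run` before running a query should route write-backs to the recorder. The data and properties loaded during initialization stay as they are.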
Test Queries
Query-1
query = "Assign priorities to all deals based on deal value."
result = orchestrator.process_query(query)
HubSpot status before the update
HubSpot status after the update
Dynamically Created Agents
from IPython.display import display, Markdown
agents = '\n'.join([f"{i + 1}. {agent}" for i, agent in enumerate(result['active_agents'])])
display(Markdown(agents))
Answer
display(Markdown(result['final_answer']))
Query-2
query = """We're considering expanding into three new industry verticals and need comprehensive market
intelligence to inform our go-to-market strategy. Analyze our current customer base to identify
patterns in successful account profiles, understand the characteristics that predict customer
success, and use these insights to evaluate market opportunities. The analysis should identify
which industries show the strongest fit with our solution, what use cases resonate most effectively,
and what competitive landscape we would face. Develop ideal customer profiles for each target
market, estimate market size and penetration potential, and create a prioritized market entry
strategy with resource requirements and timeline projections for successful market penetration."""
result = orchestrator.process_query(query)
Dynamically Created Agents
agents = '\n'.join([f"{i + 1}. {agent}" for i, agent in enumerate(result['active_agents'])])
display(Markdown(agents))
Answer
display(Markdown(result['final_answer']))