import os
import json
import logging
import ast
import re
from typing import Union, Generator, Iterator, List, Dict, Tuple
import requests
from pydantic import BaseModel, Field
# Optional SDKs (import failures are tolerated; the corresponding vendors simply fail over)
try:
from openai import OpenAI, APIError, RateLimitError
except Exception:
OpenAI = None
APIError = Exception # fallback type
RateLimitError = Exception # fallback type
try:
import google.generativeai as genai
except Exception:
genai = None
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
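# Smart Dispatcher pipe: classifies each incoming query (topic, difficulty,
# token needs, response style), routes it to the best-suited model from the
# allowed pool, and executes with automatic failover across vendors.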
class Pipe:
class Valves(BaseModel):
# API Configuration
openai_api_key: str = Field(
default="", description="OpenAI API Key for GPT models"
)
google_api_key: str = Field(
default="", description="Google API Key for Gemini models"
)
xai_api_key: str = Field(default="", description="xAI API Key for Grok models")
perplexity_api_key: str = Field(
default="", description="Perplexity API Key for Sonar models"
)
# Classifier Configuration
classifier_model: str = Field(
default="models/gemini-2.5-flash",
description="Model used for task classification (e.g., models/gemini-2.5-flash)",
)
# Model Management
allowed_models: str = Field(
default="chatgpt-4o-latest,models/gemini-2.5-pro,gpt-3.5-turbo,gpt-4,gpt-4.1,gpt-4o,gpt-4o-mini,gpt-5,gpt-5-mini,gpt-5-nano,grok-3,grok-3-fast,grok-4,models/gemini-2.5-flash,models/gemini-2.5-pro,sonar,sonar-deep-research,sonar-pro,sonar-reasoning,sonar-reasoning-pro",
description="Comma-separated list of allowed models",
)
disabled_models: str = Field(
default="", description="Comma-separated list of disabled models"
)
# Display Options
show_selection_info: bool = Field(
default=True, description="Show model selection information"
)
execute_with_selected_model: bool = Field(
default=True, description="Actually execute with the selected model"
)
show_reasoning_details: bool = Field(
default=True, description="Show detailed reasoning for model selection"
)
def __init__(self):
self.valves = self.Valves()
self._setup_available_models()
self._setup_category_mapping()
def _get_api_keys(self) -> Dict[str, str]:
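        """Resolve API keys from valves first, then environment variables."""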
logger.info("Fetching API keys")
api_keys = {
"openai": (
self.valves.openai_api_key or os.getenv("OPENAI_API_KEY") or ""
).strip(),
"google": (
self.valves.google_api_key or os.getenv("GOOGLE_API_KEY") or ""
).strip(),
"xai": (self.valves.xai_api_key or os.getenv("XAI_API_KEY") or "").strip(),
"perplexity": (
self.valves.perplexity_api_key or os.getenv("PERPLEXITY_API_KEY") or ""
).strip(),
}
logger.info(
f"API keys retrieved: {{ {', '.join(f'{k}: ' + ('set' if v else 'unset') for k, v in api_keys.items())} }}"
)
return api_keys
def _get_allowed_models(self) -> Dict[str, Dict]:
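        """Return models that are allowed, not disabled, and whose vendor API key (if one is required) is configured."""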
logger.info("Fetching allowed models")
api_keys = self._get_api_keys()
allowed_list = [
m.strip() for m in self.valves.allowed_models.split(",") if m.strip()
]
disabled_list = [
m.strip() for m in self.valves.disabled_models.split(",") if m.strip()
]
allowed_models = {}
for model_id in allowed_list:
if model_id in self.all_models and model_id not in disabled_list:
model_type = self.all_models[model_id].get("type")
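                # Keep the model if its vendor key is set, or if the vendor
                # type is unknown to the key map (no key required).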
                if (
                    (model_type in api_keys and api_keys[model_type])
                    or model_type not in api_keys
                ):
allowed_models[model_id] = self.all_models[model_id]
logger.info(f"Allowed models: {list(allowed_models.keys())}")
return allowed_models
def _setup_available_models(self):
"""Define available models and their specialties"""
self.all_models = {
# OpenAI Models
"chatgpt-4o-latest": {
"type": "openai",
"specialty": [
"multimodal",
"vision",
"reasoning",
"coding",
"advanced",
"creative_writing",
"question_answering",
],
"description": "Latest ChatGPT-4o model with enhanced capabilities",
"cost_tier": "high",
"context_size": "medium",
},
"gpt-3.5-turbo": {
"type": "openai",
"specialty": [
"general",
"fast_response",
"cost_effective",
"question_answering",
],
"description": "Efficient GPT-3.5 Turbo for general-purpose tasks",
"cost_tier": "low",
"context_size": "low",
},
"gpt-4": {
"type": "openai",
"specialty": ["reasoning", "coding", "analysis"],
"description": "GPT-4 model with strong reasoning",
"cost_tier": "high",
"context_size": "medium",
},
"gpt-4.1": {
"type": "openai",
"specialty": ["reasoning", "coding", "analysis", "improved"],
"description": "Improved GPT-4 model",
"cost_tier": "high",
"context_size": "medium",
},
"gpt-4o": {
"type": "openai",
"specialty": [
"multimodal",
"vision",
"reasoning",
"coding",
"creative_writing",
],
"description": "Advanced multimodal GPT-4o",
"cost_tier": "high",
"context_size": "medium",
},
"gpt-4o-mini": {
"type": "openai",
"specialty": [
"general",
"fast_response",
"cost_effective",
"question_answering",
],
"description": "Efficient GPT-4o variant for general use",
"cost_tier": "medium",
"context_size": "low",
},
            # Note: these gpt-5 identifiers are included for routing demonstration purposes
"gpt-5": {
"type": "openai",
"specialty": [
"multimodal",
"reasoning",
"coding",
"advanced",
"creative_writing",
"question_answering",
"agentic",
],
"description": "Next-gen GPT model",
"cost_tier": "premium",
"context_size": "high",
},
"gpt-5-mini": {
"type": "openai",
"specialty": [
"general",
"fast_response",
"cost_effective",
"question_answering",
"reasoning",
],
"description": "Efficient GPT-5 variant",
"cost_tier": "medium",
"context_size": "medium",
},
"gpt-5-nano": {
"type": "openai",
"specialty": [
"general",
"fast_response",
"highly_cost_effective",
"question_answering",
],
"description": "Ultra-efficient GPT-5 variant",
"cost_tier": "low",
"context_size": "low",
},
# xAI Grok Models
"grok-3": {
"type": "xai",
"specialty": [
"real_time",
"current_events",
"reasoning",
"advanced",
"data_analysis",
"technical_analysis",
],
"description": "Grok-3 with enhanced reasoning",
"cost_tier": "high",
"context_size": "medium",
},
"grok-3-fast": {
"type": "xai",
"specialty": ["real_time", "fast_response", "current_events"],
"description": "Fast Grok-3 variant",
"cost_tier": "medium",
"context_size": "low",
},
"grok-4": {
"type": "xai",
"specialty": [
"real_time",
"current_events",
"reasoning",
"multimodal",
"data_analysis",
"technical_analysis",
"vision",
"tool_use",
],
"description": "Grok-4 model with tool use and vision",
"cost_tier": "premium",
"context_size": "high",
},
# Google Gemini Models
"models/gemini-2.5-flash": {
"type": "google",
"specialty": [
"fast_response",
"general",
"classification",
"question_answering",
"simple_queries",
"multimodal",
"low_latency",
],
"description": "Gemini 2.5 Flash for fast, low-cost tasks",
"cost_tier": "low",
"context_size": "low",
},
"models/gemini-2.5-pro": {
"type": "google",
"specialty": [
"long_context",
"multimodal",
"analysis",
"reasoning",
"data_analysis",
"creative_writing",
"question_answering",
"simple_queries",
"complex_problems",
"coding",
],
"description": "Gemini 2.5 Pro with long context",
"cost_tier": "high",
"context_size": "high",
},
# Perplexity Sonar Models
"sonar": {
"type": "perplexity",
"specialty": [
"search",
"current_info",
"research",
"question_answering",
"grounded_answers",
],
"description": "Standard Sonar with web search",
"cost_tier": "low",
"context_size": "low",
},
"sonar-deep-research": {
"type": "perplexity",
"specialty": [
"search",
"current_info",
"research",
"deep_analysis",
"technical_analysis",
],
"description": "Sonar deep research",
"cost_tier": "medium",
"context_size": "medium",
},
"sonar-pro": {
"type": "perplexity",
"specialty": [
"search",
"current_info",
"research",
"analysis",
"question_answering",
],
"description": "Sonar Pro",
"cost_tier": "medium",
"context_size": "medium",
},
"sonar-reasoning": {
"type": "perplexity",
"specialty": ["search", "reasoning", "research", "question_answering"],
"description": "Sonar with enhanced reasoning",
"cost_tier": "medium",
"context_size": "medium",
},
"sonar-reasoning-pro": {
"type": "perplexity",
"specialty": [
"search",
"reasoning",
"research",
"advanced",
"technical_analysis",
],
"description": "Sonar advanced reasoning",
"cost_tier": "high",
"context_size": "high",
},
}
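        # Note: "cost_tier" and "context_size" are informational metadata;
        # routing uses the "specialty" lists and the category mapping defined
        # in _setup_category_mapping().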
def _setup_category_mapping(self):
"""Define mapping from categories to primary models and variants"""
self.category_to_model = {
"emotion": "gpt-5",
"news": "sonar-pro",
"research": "gpt-5",
"chat": "grok-4",
"profanity": "grok-4",
"coding": "gpt-5",
"knowledge": "sonar",
"math": "grok-4",
"creative": "gpt-5",
"multimodal": "models/gemini-2.5-pro",
"business": "gpt-5",
"entertainment": "grok-4",
"health": "gpt-5",
"technical": "models/gemini-2.5-pro",
"politics": "sonar",
"travel": "gpt-5",
"sports": "sonar",
"gaming": "grok-4",
"food": "models/gemini-2.5-pro",
"language": "models/gemini-2.5-pro",
"legal": "gpt-5",
"education": "grok-4",
"environment": "sonar",
"history": "models/gemini-2.5-pro",
"science": "grok-4",
"fiction": "gpt-5",
"spirituality": "gpt-5",
"shopping": "sonar",
"news_analysis": "sonar-reasoning-pro",
"coding_debugging": "gpt-5",
"ai_ethics": "gpt-5",
"general": "models/gemini-2.5-pro",
# Expanded business/tech ops categories
"finance": "gpt-5",
"security": "grok-4",
"devops": "gpt-5",
"data": "models/gemini-2.5-pro",
"sales": "gpt-5",
"marketing": "gpt-5",
"product": "gpt-5",
"operations": "models/gemini-2.5-pro",
"hr": "models/gemini-2.5-pro",
# Catch-alls
"other": "models/gemini-2.5-pro",
"unknown": "models/gemini-2.5-pro",
}
self.small_variants = {
"gpt-5": "gpt-5-nano",
"gpt-4o": "gpt-4o-mini",
"models/gemini-2.5-pro": "models/gemini-2.5-flash",
"grok-4": "grok-3-fast",
"sonar-pro": "sonar",
"sonar-reasoning-pro": "sonar",
}
self.pro_variants = {
"gpt-5": "gpt-5",
"gpt-4o": "gpt-4o",
"models/gemini-2.5-pro": "models/gemini-2.5-pro",
"grok-4": "grok-4",
"sonar": "sonar-reasoning-pro",
}
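        # Example resolution: a "coding" query classified as hard maps to
        # category_to_model["coding"] == "gpt-5" and stays on the pro variant;
        # the same query classified as easy would drop to
        # small_variants["gpt-5"] == "gpt-5-nano".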
def pipe(self, body: dict) -> Union[str, Generator, Iterator]:
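        """Entry point: select the best model for the latest user message,
        optionally execute with it (with failover), and return the response,
        prefixed with a selection banner when enabled."""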
logger.info(f"Processing query: {body.get('messages', [])}")
messages = body.get("messages", [])
if not messages:
return "❌ No messages provided in the request."
# Get the latest user message
user_message = ""
for msg in reversed(messages):
if msg.get("role") == "user":
user_message = msg.get("content", "")
break
if not user_message:
return "❌ No user message found in the request."
# Step 1: Select the best model
selected_model, reasoning, response_style = self._classify_and_select_model(
user_message, messages
)
# Step 2: Execute with selected model or failover
if self.valves.execute_with_selected_model:
model_response, used_model, used_reasoning = self._execute_with_model(
selected_model, reasoning, user_message, messages, response_style
)
else:
model_response = f"This task would be processed by {selected_model}.\n\nYour query: {user_message}"
used_model = selected_model
used_reasoning = reasoning
# Step 3: Build response header (avoid duplicate banner if previous assistant already included it)
response = ""
if self.valves.show_selection_info:
last_assistant = next(
(m for m in reversed(messages) if m.get("role") == "assistant"), None
)
already_shown = bool(
last_assistant
and isinstance(last_assistant.get("content"), str)
and "Smart Dispatcher Active" in last_assistant["content"]
)
if not already_shown:
response += "🎯 Smart Dispatcher Active\n"
response += f"• Selected Model: {used_model}\n"
if self.valves.show_reasoning_details:
response += f"• Reasoning: {used_reasoning}\n"
model_info = self.all_models.get(used_model, {})
response += f"• Specialties: {', '.join(model_info.get('specialty', []))}\n"
response += "\n---\n\n"
response += model_response
return response
def _classify_and_select_model(
self, user_message: str, messages: List[dict]
) -> Tuple[str, str, str]:
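        """Return (selected_model_id, reasoning, response_style) for the query."""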
allowed_models = self._get_allowed_models()
if not allowed_models:
return (
"models/gemini-2.5-pro",
"No allowed models configured - using default",
"brief",
)
# Rule-based for simple greetings to avoid classifier call
        user_lower = user_message.strip().lower()
        # Match whole words only, so that e.g. "this" does not trigger the "hi" rule
        greeting_words = re.findall(r"[a-z]+", user_lower)
        if len(greeting_words) <= 2 and any(
            g in greeting_words for g in ["hi", "hello", "hey", "greetings", "yo", "sup"]
        ):
return (
"models/gemini-2.5-flash",
"Rule-based selection for simple greeting query (topic: chat, difficulty: easy, tokens: low)",
"brief",
)
# AI classifier to get topic, difficulty, tokens, response_style
classification, reasoning = self._get_ai_classification(user_message, messages)
# Ensure dict and safe keys
if not isinstance(classification, dict):
classification = {}
topic = str(classification.get("topic", "general")).strip().lower() or "general"
difficulty = (
str(classification.get("difficulty", "medium")).strip().lower() or "medium"
)
tokens = str(classification.get("tokens", "medium")).strip().lower() or "medium"
response_style = (
str(classification.get("response_style", "brief")).strip().lower()
or "brief"
)
# If topic unknown, coerce to general
if topic not in self.category_to_model:
topic = "general"
reasoning += " (Unknown topic coerced to general)"
# Get primary model from category mapping
base_model = self.category_to_model.get(topic, "models/gemini-2.5-pro")
# Adjust for difficulty and tokens
if difficulty == "easy" or tokens == "low":
selected_model = self.small_variants.get(base_model, base_model)
reasoning += " (Adjusted to smaller variant for easy/low-token query)"
elif difficulty == "hard" or tokens == "high":
selected_model = self.pro_variants.get(base_model, base_model)
reasoning += " (Adjusted to pro variant for hard/high-token query)"
else:
selected_model = base_model
# Ensure selected is allowed; fall back if needed
if selected_model not in allowed_models:
selected_model = (
base_model
if base_model in allowed_models
else list(allowed_models.keys())[0]
)
reasoning += " (Fallback to allowed model)"
return selected_model, reasoning, response_style
def _get_ai_classification(
self, user_message: str, messages: List[dict]
) -> Tuple[Dict[str, str], str]:
"""Use the classifier model to classify the query"""
api_keys = self._get_api_keys()
classifier_model_id = self.valves.classifier_model
classifier_conf = self.all_models.get(classifier_model_id, {})
classifier_type = classifier_conf.get("type", "google")
categories = (
"emotion, news, research, chat, profanity, coding, knowledge, math, creative, multimodal, "
"business, entertainment, health, technical, politics, travel, sports, gaming, food, language, "
"legal, education, environment, history, science, fiction, spirituality, shopping, news_analysis, "
"coding_debugging, ai_ethics, finance, security, devops, data, sales, marketing, product, "
"operations, hr, general"
)
prompt = f"""
Classify this query into one primary topic from this list:
{categories}
Assess difficulty: easy, medium, hard.
Estimate token needs: low (<10k), medium (10-100k), high (>100k).
Assess response style: brief (short and concise sufficient) or detailed (longer explanation required).
Output strict JSON with double quotes only: {{"topic": "topic_name", "difficulty": "level", "tokens": "level", "response_style": "brief_or_detailed"}}
Query: {user_message}
""".strip()
try:
if classifier_type == "google":
if genai is None:
raise RuntimeError("google.generativeai SDK not available")
api_key = api_keys["google"]
if not api_key:
raise RuntimeError(
"Google API key not configured for classification"
)
genai.configure(api_key=api_key)
model_name = classifier_model_id.replace("models/", "")
model = genai.GenerativeModel(model_name)
response = model.generate_content(prompt)
raw = (getattr(response, "text", "") or "").strip()
# Strip code fences if present
raw = raw.replace("```json", "").replace("```", "").strip()
# Try JSON first, then regex extraction, then literal_eval
classification = None
try:
classification = json.loads(raw)
except Exception:
m = re.search(r"\{.*\}", raw, re.DOTALL)
if m:
blob = m.group(0)
try:
classification = json.loads(blob)
except Exception:
try:
classification = ast.literal_eval(blob)
except Exception:
classification = None
if not isinstance(classification, dict):
classification = {}
# Normalize values
for k in ("topic", "difficulty", "tokens", "response_style"):
if k in classification and isinstance(classification[k], str):
classification[k] = (
classification[k].strip().strip('"').strip("'").lower()
)
# Safe defaults
classification.setdefault("topic", "general")
classification.setdefault("difficulty", "medium")
classification.setdefault("tokens", "medium")
classification.setdefault("response_style", "brief")
return classification, "AI classification using Gemini"
# Default fallback classification if classifier type unsupported
return {
"topic": "general",
"difficulty": "medium",
"tokens": "medium",
"response_style": "brief",
}, "Fallback classification"
except Exception as e:
logger.error(f"Classification error: {str(e)}")
return {
"topic": "general",
"difficulty": "medium",
"tokens": "medium",
"response_style": "brief",
}, f"Error in classification: {str(e)}"
def _pick_alternate_models(
self, primary: str, allowed_models: Dict[str, Dict]
) -> List[str]:
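        """Build an ordered failover list: same-specialty models from other
        vendors first, then a small general-purpose safety net."""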
candidates: List[str] = []
primary_conf = self.all_models.get(primary, {})
primary_specs = set(primary_conf.get("specialty", []))
primary_type = primary_conf.get("type")
# Prefer same specialty different vendor
for m, conf in allowed_models.items():
if m == primary:
continue
if conf.get("type") != primary_type and (
primary_specs & set(conf.get("specialty", []))
):
candidates.append(m)
# Add general-purpose safety net in priority order, if allowed
safety_net = ["models/gemini-2.5-flash", "gpt-4o-mini", "sonar"]
candidates.extend(
[m for m in safety_net if m in allowed_models and m not in candidates]
)
return candidates
def _execute_with_model(
self,
selected_model: str,
reasoning: str,
user_message: str,
messages: List[dict],
response_style: str,
) -> Tuple[str, str, str]:
"""Execute with the selected model, with robust failover on errors only."""
logger.info(f"Attempting execution with model: {selected_model}")
allowed_models = self._get_allowed_models()
candidates = [selected_model] + self._pick_alternate_models(
selected_model, allowed_models
)
api_keys = self._get_api_keys()
model_response = ""
failover_msg = ""
used_model = selected_model
used_reasoning = reasoning
for candidate in candidates:
try:
out = self._execute_specific_model(
candidate, user_message, messages, api_keys, response_style
)
if self._bad_or_empty(out):
raise ValueError("Model produced empty or invalid content")
model_response = failover_msg + out
used_model = candidate
if failover_msg:
used_reasoning += f" (Failover to {candidate})"
break
except Exception as e:
logger.info(f"Failing over from {candidate}: {str(e)}")
failover_msg += f"⚠️ Failover from {candidate}: {str(e)}\n\n"
else:
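            # for/else: reached only when every candidate failed (no break)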
model_response = (
failover_msg
+ "❌ All models failed. I couldn’t reach any model to answer that. Could you rephrase or give a bit more detail?"
)
return model_response, used_model, used_reasoning
    def _bad_or_empty(self, text: str) -> bool:
        """Heuristic check for empty or error-like model output."""
        stripped = str(text).strip() if text else ""
        if len(stripped) < 2:
            return True
        # Scan for error fragments only in short responses: a long answer that
        # merely mentions "error" (e.g. a debugging explanation) should not
        # trigger failover. The 200-character cutoff is a heuristic choice.
        err_frag = ["error", "invalid", "policy", "blocked", "unavailable"]
        return len(stripped) < 200 and any(frag in stripped.lower() for frag in err_frag)
def _execute_specific_model(
self,
model_id: str,
user_message: str,
messages: List[dict],
api_keys: Dict[str, str],
response_style: str,
) -> str:
"""Execute task with a specific model without failover."""
model_config = self.all_models.get(model_id, {})
model_type = model_config.get("type", "unknown")
instruction = (
"Keep your response short and brief."
if response_style == "brief"
else "Provide a detailed and comprehensive response."
)
if model_type == "openai":
return self._execute_openai_model(
model_id, messages, api_keys["openai"], instruction
)
elif model_type == "google":
return self._execute_google_model(
model_id, user_message, messages, api_keys["google"], instruction
)
elif model_type == "xai":
return self._execute_xai_model(
model_id, messages, api_keys["xai"], instruction
)
elif model_type == "perplexity":
return self._execute_perplexity_model(
model_id, messages, api_keys["perplexity"], instruction
)
else:
raise ValueError(f"Model type '{model_type}' not supported.")
def _execute_openai_model(
self, model_id: str, messages: List[dict], api_key: str, instruction: str
) -> str:
"""Execute task using OpenAI models"""
if not OpenAI:
raise ValueError("OpenAI SDK not available.")
if not api_key:
raise ValueError("OpenAI API key not configured.")
client = OpenAI(api_key=api_key)
        openai_messages = [{"role": "system", "content": instruction}] + [
            {"role": msg.get("role", "user"), "content": str(msg.get("content", ""))}
            for msg in messages
            if msg.get("content")
        ]
        if len(openai_messages) <= 1:  # only the injected system message remains
            raise ValueError("No valid messages found to process.")
params = {"model": model_id, "messages": openai_messages}
# Handle param differences conservatively
if "gpt-5" in model_id:
params["max_completion_tokens"] = 2000
else:
params["max_tokens"] = 2000
params["temperature"] = 0.7
try:
completion = client.chat.completions.create(**params)
except RateLimitError as e:
raise ValueError(f"Quota exceeded for OpenAI: {str(e)}")
except APIError as e:
em = str(e).lower()
# Retry logic for parameter quirks
if "unsupported" in em and "temperature" in em:
params.pop("temperature", None)
completion = client.chat.completions.create(**params)
elif "unsupported" in em and "max_tokens" in em:
params.pop("max_tokens", None)
params["max_completion_tokens"] = 2000
completion = client.chat.completions.create(**params)
else:
raise ValueError(f"OpenAI API error: {str(e)}")
if not getattr(completion, "choices", None):
raise ValueError("No response choices returned from OpenAI API")
content = completion.choices[0].message.content
if not content:
raise ValueError("Empty content in OpenAI response")
return content
def _execute_google_model(
self,
model_id: str,
user_message: str,
messages: List[dict],
api_key: str,
instruction: str,
) -> str:
"""Execute task using Google Gemini models"""
if genai is None:
raise ValueError("google.generativeai SDK not available.")
if not api_key:
raise ValueError("Google API key not configured.")
genai.configure(api_key=api_key)
model_name = model_id.replace("models/", "")
model = genai.GenerativeModel(model_name)
conversation_context = ""
for msg in messages[:-1]:
if msg.get("content"):
role = "Human" if msg.get("role") == "user" else "Assistant"
conversation_context += f"{role}: {msg.get('content', '')}\n"
prompt = (
f"{instruction}\n\n{conversation_context}Human: {user_message}"
if conversation_context
else f"{instruction}\n\nHuman: {user_message}"
)
try:
response = model.generate_content(prompt)
except Exception as e:
if "quota" in str(e).lower() or "429" in str(e):
raise ValueError(f"Quota exceeded for Google: {str(e)}")
raise ValueError(f"Google API error: {str(e)}")
text = getattr(response, "text", None)
if text and str(text).strip():
return text
raise ValueError(f"Empty content in Google response for {model_id}")
def _execute_xai_model(
self, model_id: str, messages: List[dict], api_key: str, instruction: str
) -> str:
"""Execute task using xAI models"""
if not api_key:
raise ValueError("xAI API key not configured.")
try:
headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json",
}
xai_messages = [{"role": "system", "content": instruction}] + [
{"role": msg.get("role", "user"), "content": msg.get("content", "")}
for msg in messages
if msg.get("content")
]
data = {
"messages": xai_messages,
"model": model_id,
"temperature": 0.7,
"max_tokens": 2000,
}
response = requests.post(
"https://api.x.ai/v1/chat/completions",
headers=headers,
json=data,
timeout=60,
)
response.raise_for_status()
result = response.json()
if "choices" in result and len(result["choices"]) > 0:
content = result["choices"][0]["message"]["content"]
if content:
return content
else:
raise ValueError(f"Empty content in xAI response for {model_id}")
else:
raise ValueError(f"No choices in xAI response for {model_id}")
except requests.exceptions.HTTPError as e:
if e.response is not None and e.response.status_code == 429:
raise ValueError(f"Quota exceeded for xAI: {str(e)}")
code = e.response.status_code if e.response is not None else "unknown"
raise ValueError(f"xAI API error: HTTP {code}")
except Exception as e:
raise ValueError(f"xAI API error: {str(e)}")
def _execute_perplexity_model(
self, model_id: str, messages: List[dict], api_key: str, instruction: str
) -> str:
"""Execute task using Perplexity models"""
if not api_key:
raise ValueError("Perplexity API key not configured.")
try:
headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json",
}
perplexity_messages = [{"role": "system", "content": instruction}] + [
{"role": msg.get("role", "user"), "content": msg.get("content", "")}
for msg in messages
if msg.get("content")
]
data = {
"model": model_id,
"messages": perplexity_messages,
"temperature": 0.7,
"max_tokens": 2000,
}
response = requests.post(
"https://api.perplexity.ai/chat/completions",
headers=headers,
json=data,
timeout=60,
)
response.raise_for_status()
result = response.json()
if "choices" in result and len(result["choices"]) > 0:
content = result["choices"][0]["message"]["content"]
if content:
return content
else:
raise ValueError(
f"Empty content in Perplexity response for {model_id}"
)
else:
raise ValueError(f"No choices in Perplexity response for {model_id}")
except requests.exceptions.HTTPError as e:
if e.response is not None and e.response.status_code == 429:
raise ValueError(f"Quota exceeded for Perplexity: {str(e)}")
code = e.response.status_code if e.response is not None else "unknown"
raise ValueError(f"Perplexity API error: HTTP {code}")
except Exception as e:
raise ValueError(f"Perplexity API error: {str(e)}")