"""
title: Combined Gemini, Anthropic, Perplexity, and N8N Manifold Pipe
author: demodomain.dev
version: 1.0
license: MIT
description: A combined pipe that supports Google's Gemini, Anthropic's Claude, and Perplexity models, plus N8N integration
For the n8n integration:
- create an account at https://webui.demodomain.dev/ to try a live demo of this pipe chatting with an n8n workflow
- the pipe connects to n8n by sending user messages to an n8n webhook
- the pipe starts a polling loop that calls a second n8n webhook every 2 seconds for status updates and displays the latest status in the UI
- updates from n8n can be "status", "no update", or "error" (see the example response shapes below)
- handles errors gracefully and shuts down the status-polling loop
- uses OpenWebUI's metadata.get("chat_id") for chat session management with n8n
- creates a collapsible element containing CoT (chain-of-thought) <thinking> tags above the final message
- see accompanying demo n8n workflow here: https://github.com/yupguv/openwebui/blob/main/n8n_workflow_openwebui_chat
- the demo n8n workflow optionally uses Supabase; table definitions are here: https://github.com/yupguv/openwebui/blob/main/openwebui_n8n_supabase_tables
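- example status-check responses (shapes assumed from the status-handling code below; the demo workflow's actual payloads may differ):
    [{"output": "Running step 2 of 5"}]      -> shown as a transient status update
    [{"output": "no update"}]                -> ignored
    [{"output": "<details>...</details>"}]   -> appended as a collapsible CoT block
    [{"output": "Error: workflow failed"}]   -> stops the polling loop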
"""
import os
import json
import time
import requests
import aiohttp
import asyncio
from typing import List, Union, Generator, Iterator, Optional, Callable, Awaitable, Dict
from pydantic import BaseModel, Field
import google.generativeai as genai
from google.generativeai.types import GenerationConfig
from open_webui.utils.misc import pop_system_message
from requests.exceptions import Timeout, RequestException
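# requests handles the synchronous model APIs; aiohttp/asyncio drive the async
# n8n webhook calls and the background status-polling loop.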
class Pipe:
class Valves(BaseModel):
# API Keys
GOOGLE_API_KEY: str = Field(
default="", description="Your Google API key for accessing Gemini models"
)
ANTHROPIC_API_KEY: str = Field(
default="", description="Your Anthropic API key for accessing Claude models"
)
PERPLEXITY_API_KEY: str = Field(
default="", description="Your Perplexity API key for accessing Llama models"
)
PERPLEXITY_API_BASE_URL: str = Field(
default="https://api.perplexity.ai",
description="Base URL for Perplexity API endpoints",
)
PERPLEXITY_NAME_PREFIX: str = Field(
default="Perplexity/",
description="The prefix applied before Perplexity model names",
)
# N8N Configuration
N8N_URL: str = Field(
default="https://n8n.[your domain].com/webhook/[your webhook URL]",
description="Your N8N webhook URL",
)
N8N_BEARER_TOKEN: str = Field(
default="...", description="Your N8N bearer token"
)
N8N_INPUT_FIELD: str = Field(
default="chatInput", description="Input field name for N8N"
)
N8N_RESPONSE_FIELD: str = Field(
default="output", description="Response field name from N8N"
)
N8N_STATUS_CHECK: str = Field(
default="https://n8n.[your domain].com/webhook/[your webhook URL]",
description="N8N status check endpoint",
)
# Safety and Debug Settings
USE_PERMISSIVE_SAFETY: bool = Field(
default=False,
description="When enabled, sets minimal content filtering. Use with caution in controlled environments only.",
)
DEBUG: bool = Field(
default=False,
description="Enables detailed debug logging for troubleshooting.",
)
# Image Size Limits
ANTHROPIC_MAX_SINGLE_IMAGE_SIZE_MB: float = Field(
default=5.0,
description="Maximum size in megabytes (MB) for a single image when using Anthropic models (default: 5MB)",
)
ANTHROPIC_MAX_TOTAL_IMAGE_SIZE_MB: float = Field(
default=100.0,
description="Maximum total size in megabytes (MB) for all images in a single request to Anthropic models (default: 100MB)",
)
# API Configuration
ANTHROPIC_API_VERSION: str = Field(
default="2023-06-01",
description="Anthropic API version to use. Update this if you need features from a newer API version.",
)
ANTHROPIC_API_URL: str = Field(
default="https://api.anthropic.com/v1/messages",
description="Base URL for Anthropic's API. Only change if using a different endpoint.",
)
ANTHROPIC_MODELS: str = Field(
default="claude-3-5-sonnet-20241022:Claude 3.5 Sonnet,claude-3-5-haiku-20241022:Claude 3.5 Haiku,claude-3-opus-20240229:Claude 3 Opus,claude-3-sonnet-20240229:Claude 3 Sonnet,claude-3-haiku-20240307:Claude 3 Haiku",
description="Available Anthropic models in format 'model_id:display_name'. Separate multiple models with commas.",
)
# Default Generation Parameters
DEFAULT_TEMPERATURE: float = Field(
default=0.7,
description="Controls randomness in responses. Higher values (e.g., 0.8) create more diverse outputs, lower values (e.g., 0.2) are more focused.",
)
DEFAULT_TOP_P: float = Field(
default=0.9,
description="Controls diversity via nucleus sampling. Higher values allow more diverse tokens.",
)
DEFAULT_TOP_K: int = Field(
default=40,
description="Controls diversity by limiting the number of tokens considered at each step.",
)
DEFAULT_MAX_TOKENS: int = Field(
default=4096,
description="Maximum number of tokens to generate in the response.",
)
DEFAULT_STREAM: bool = Field(
default=False,
description="Whether to stream responses by default. Streaming provides faster initial responses.",
)
STREAM_DELAY: float = Field(
default=0.01,
description="Delay between streamed chunks in seconds to prevent overwhelming the client.",
)
# Request Timeouts
CONNECT_TIMEOUT: float = Field(
default=3.05,
description="Maximum time (in seconds) to wait for initial connection to API.",
)
READ_TIMEOUT: float = Field(
default=60.0,
description="Maximum time (in seconds) to wait for API response after connection.",
)
def __init__(self):
self.type = "manifold"
self.id = "combined_ai_n8n"
self.name = "Combined AI and N8N"
self.valves = self.Valves(
GOOGLE_API_KEY=os.getenv("GOOGLE_API_KEY", ""),
ANTHROPIC_API_KEY=os.getenv("ANTHROPIC_API_KEY", ""),
N8N_URL=os.getenv("N8N_URL", ""),
N8N_BEARER_TOKEN=os.getenv("N8N_BEARER_TOKEN", ""),
N8N_INPUT_FIELD=os.getenv("N8N_INPUT_FIELD", "chatInput"),
N8N_RESPONSE_FIELD=os.getenv("N8N_RESPONSE_FIELD", "output"),
N8N_STATUS_CHECK=os.getenv("N8N_STATUS_CHECK", ""),
            USE_PERMISSIVE_SAFETY=os.getenv("USE_PERMISSIVE_SAFETY", "false").lower()
            == "true",
)
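        # Handle for the background n8n status-polling task, plus a buffer of
        # <details> chain-of-thought blocks collected while polling.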
self.status_loop_task = None
self.thinking_messages = []
def pipes(self) -> List[dict]:
"""Get all available models"""
all_pipes = []
all_pipes.extend(self.get_google_models())
all_pipes.extend(self.get_anthropic_models())
all_pipes.extend(self.get_perplexity_models())
all_pipes.extend(
[
{"id": "all_models.n8n.Main", "name": "N8N: Main"},
{"id": "all_models.n8n.Brainstorm", "name": "N8N: Brainstorm"},
{"id": "all_models.n8n.Zoho", "name": "N8N: Zoho"},
{"id": "all_models.n8n.Processes", "name": "N8N: Processes"},
]
)
return all_pipes
async def emit_status(
self,
__event_emitter__: Callable[[dict], Awaitable[None]],
level: str,
message: str,
__metadata__: dict,
done: bool,
body: dict,
__user__: Optional[dict] = None,
):
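        """Send a status event to the UI via OpenWebUI's event emitter."""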
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {
"status": "complete" if done else "in_progress",
"level": level,
"description": message,
"done": done,
},
}
)
def get_google_models(self):
"""Get available Google models"""
if not self.valves.GOOGLE_API_KEY:
return []
try:
genai.configure(api_key=self.valves.GOOGLE_API_KEY)
models = genai.list_models()
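            # model.name looks like "models/gemini-1.5-pro"; strip the
            # "models/" prefix (7 characters) to build the pipe's model id.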
            return [
                {
                    "id": f"google.{model.name[7:]}",
                    "name": f"Google: {model.display_name}",
                }
                for model in models
                if "generateContent" in model.supported_generation_methods
                and model.name.startswith("models/")
            ]
except Exception as e:
            if self.valves.DEBUG:
print(f"Error fetching Google models: {e}")
return []
def get_anthropic_models(self):
"""Get available Anthropic models"""
if not self.valves.ANTHROPIC_API_KEY:
return []
try:
models = []
model_pairs = self.valves.ANTHROPIC_MODELS.split(",")
for pair in model_pairs:
if ":" in pair:
                    model_id, display_name = pair.strip().split(":", 1)
models.append({"id": model_id, "name": display_name})
else:
if self.valves.DEBUG:
print(f"Invalid model pair format: {pair}")
return models
except Exception as e:
if self.valves.DEBUG:
print(f"Error parsing Anthropic models: {e}")
return []
def get_perplexity_models(self):
"""Get available Perplexity models"""
if not self.valves.PERPLEXITY_API_KEY:
return []
return [
{"id": f"perplexity.{model_id}", "name": f"Perplexity: {display_name}"}
for model_id, display_name in [
(
"llama-3.1-sonar-small-128k-online",
"Llama 3.1 Sonar Small 128k Online",
),
(
"llama-3.1-sonar-large-128k-online",
"Llama 3.1 Sonar Large 128k Online",
),
(
"llama-3.1-sonar-huge-128k-online",
"Llama 3.1 Sonar Huge 128k Online",
),
("llama-3.1-sonar-small-128k-chat", "Llama 3.1 Sonar Small 128k Chat"),
("llama-3.1-sonar-large-128k-chat", "Llama 3.1 Sonar Large 128k Chat"),
("llama-3.1-8b-instruct", "Llama 3.1 8B Instruct"),
("llama-3.1-70b-instruct", "Llama 3.1 70B Instruct"),
]
]
def process_image(self, image_data):
"""Process image data with size validation."""
if image_data["image_url"]["url"].startswith("data:image"):
mime_type, base64_data = image_data["image_url"]["url"].split(",", 1)
media_type = mime_type.split(":")[1].split(";")[0]
# Calculate image size in MB from base64 data
image_size = len(base64_data) * 3 / 4 # size in bytes
image_size_mb = image_size / (1024 * 1024) # convert to MB
if image_size_mb > self.valves.ANTHROPIC_MAX_SINGLE_IMAGE_SIZE_MB:
raise ValueError(
f"Image size of {image_size_mb:.2f}MB exceeds {self.valves.ANTHROPIC_MAX_SINGLE_IMAGE_SIZE_MB}MB limit"
)
return {
"type": "image",
"source": {
"type": "base64",
"media_type": media_type,
"data": base64_data,
},
}
else:
url = image_data["image_url"]["url"]
response = requests.head(url, allow_redirects=True)
content_length = int(response.headers.get("content-length", 0))
content_length_mb = content_length / (1024 * 1024) # Convert to MB
if content_length_mb > self.valves.ANTHROPIC_MAX_SINGLE_IMAGE_SIZE_MB:
raise ValueError(
f"Image at URL exceeds size limit: {content_length_mb:.2f}MB > {self.valves.ANTHROPIC_MAX_SINGLE_IMAGE_SIZE_MB}MB"
)
return {
"type": "image",
"source": {"type": "url", "url": url},
}
def validate_api_keys(self, model_id: str) -> None:
"""Validate required API keys are present before making requests"""
if "n8n." in model_id:
if not self.valves.N8N_BEARER_TOKEN:
raise ValueError("N8N_BEARER_TOKEN is required for N8N operations")
return
if "google" in model_id.lower() or "gemini" in model_id.lower():
if not self.valves.GOOGLE_API_KEY:
raise ValueError("GOOGLE_API_KEY is required for Google/Gemini models")
elif "anthropic" in model_id.lower() or "claude" in model_id.lower():
if not self.valves.ANTHROPIC_API_KEY:
raise ValueError(
"ANTHROPIC_API_KEY is required for Anthropic/Claude models"
)
elif "perplexity" in model_id.lower() or "llama" in model_id.lower():
if not self.valves.PERPLEXITY_API_KEY:
raise ValueError(
"PERPLEXITY_API_KEY is required for Perplexity/Llama models"
)
else:
raise ValueError(
f"Unable to determine required API key for model: {model_id}"
)
def make_api_request(
self, url: str, headers: dict, payload: dict, stream: bool = False
) -> Union[requests.Response, str]:
"""Make API request with proper timeout handling"""
try:
response = requests.post(
url,
headers=headers,
json=payload,
stream=stream,
timeout=(self.valves.CONNECT_TIMEOUT, self.valves.READ_TIMEOUT),
)
if response.status_code != 200:
error_msg = f"HTTP {response.status_code}: {response.text}"
if self.valves.DEBUG:
print(error_msg)
raise RequestException(error_msg)
return response
except Timeout:
error_msg = f"Request timed out after {self.valves.READ_TIMEOUT} seconds"
if self.valves.DEBUG:
print(error_msg)
raise TimeoutError(error_msg)
except RequestException as e:
error_msg = f"Request failed: {str(e)}"
if self.valves.DEBUG:
print(error_msg)
raise
def handle_google_request(
self, model_id: str, body: dict
) -> Union[str, Iterator[str]]:
"""Handle requests for Google models"""
try:
genai.configure(api_key=self.valves.GOOGLE_API_KEY)
            # Normalize Gemini model IDs to the standard "gemini-" form
            if not model_id.startswith("gemini-"):
                if model_id.startswith("gemini2-"):
                    model_id = f"gemini-{model_id.split('-', 1)[1]}"
                else:
                    model_id = f"gemini-{model_id}"
            if self.valves.DEBUG:
print(f"Using Google model ID: {model_id}")
messages = body["messages"]
stream = body.get("stream", False)
            if self.valves.DEBUG:
print("Incoming body:", str(body))
system_message = next(
(msg["content"] for msg in messages if msg["role"] == "system"), None
)
contents = []
for message in messages:
if message["role"] != "system":
if isinstance(message.get("content"), list):
parts = []
for content in message["content"]:
if content["type"] == "text":
parts.append({"text": content["text"]})
elif content["type"] == "image_url":
image_url = content["image_url"]["url"]
if image_url.startswith("data:image"):
image_data = image_url.split(",")[1]
parts.append(
{
"inline_data": {
"mime_type": "image/jpeg",
"data": image_data,
}
}
)
else:
parts.append({"image_url": image_url})
                        contents.append(
                            {
                                "role": (
                                    "user" if message["role"] == "user" else "model"
                                ),
                                "parts": parts,
                            }
                        )
else:
contents.append(
{
"role": (
"user" if message["role"] == "user" else "model"
),
"parts": [{"text": message["content"]}],
}
)
if system_message:
contents.insert(
0,
{"role": "user", "parts": [{"text": f"System: {system_message}"}]},
)
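            # system_instruction is only passed for Gemini 1.5 models here; for
            # other models the system prompt was already prepended above as a
            # pseudo-user turn.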
if "gemini-1.5" in model_id:
model = genai.GenerativeModel(
model_name=model_id, system_instruction=system_message
)
else:
model = genai.GenerativeModel(model_name=model_id)
generation_config = GenerationConfig(
temperature=body.get("temperature", self.valves.DEFAULT_TEMPERATURE),
top_p=body.get("top_p", self.valves.DEFAULT_TOP_P),
top_k=body.get("top_k", self.valves.DEFAULT_TOP_K),
max_output_tokens=body.get(
"max_tokens", self.valves.DEFAULT_MAX_TOKENS
),
                stop_sequences=body.get("stop") or [],
)
# Safety settings
if self.valves.USE_PERMISSIVE_SAFETY:
safety_settings = {
genai.types.HarmCategory.HARM_CATEGORY_HARASSMENT: genai.types.HarmBlockThreshold.BLOCK_NONE,
genai.types.HarmCategory.HARM_CATEGORY_HATE_SPEECH: genai.types.HarmBlockThreshold.BLOCK_NONE,
genai.types.HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT: genai.types.HarmBlockThreshold.BLOCK_NONE,
genai.types.HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT: genai.types.HarmBlockThreshold.BLOCK_NONE,
}
else:
safety_settings = body.get("safety_settings")
if stream:
def stream_generator():
response = model.generate_content(
contents,
generation_config=generation_config,
safety_settings=safety_settings,
stream=True,
)
for chunk in response:
if chunk.text:
yield chunk.text
return stream_generator()
else:
response = model.generate_content(
contents,
generation_config=generation_config,
safety_settings=safety_settings,
stream=False,
)
return response.text
except Exception as e:
if self.valves.DEBUG:
print(f"Error in Google request: {e}")
return f"Error: {e}"
def handle_anthropic_request(
self, model_id: str, body: dict
) -> Union[str, Generator]:
try:
system_message, messages = pop_system_message(body["messages"])
processed_messages = []
total_image_size = 0 # in bytes
for message in messages:
processed_content = []
if isinstance(message.get("content"), list):
for item in message["content"]:
if item["type"] == "text":
processed_content.append(
{"type": "text", "text": item["text"]}
)
elif item["type"] == "image_url":
processed_image = self.process_image(item)
processed_content.append(processed_image)
if processed_image["source"]["type"] == "base64":
image_size = (
len(processed_image["source"]["data"]) * 3 / 4
) # size in bytes
total_image_size += image_size
total_image_size_mb = total_image_size / (
1024 * 1024
) # convert to MB
if (
total_image_size_mb
> self.valves.ANTHROPIC_MAX_TOTAL_IMAGE_SIZE_MB
):
raise ValueError(
f"Total size of images ({total_image_size_mb:.2f}MB) exceeds {self.valves.ANTHROPIC_MAX_TOTAL_IMAGE_SIZE_MB}MB limit for Anthropic"
)
else:
processed_content = [
{"type": "text", "text": message.get("content", "")}
]
processed_messages.append(
{"role": message["role"], "content": processed_content}
)
payload = {
"model": model_id,
"messages": processed_messages,
"max_tokens": body.get("max_tokens", self.valves.DEFAULT_MAX_TOKENS),
"temperature": body.get("temperature", self.valves.DEFAULT_TEMPERATURE),
"top_k": body.get("top_k", self.valves.DEFAULT_TOP_K),
"top_p": body.get("top_p", self.valves.DEFAULT_TOP_P),
"stop_sequences": body.get("stop", []),
**({"system": str(system_message)} if system_message else {}),
"stream": body.get("stream", self.valves.DEFAULT_STREAM),
}
headers = {
"x-api-key": self.valves.ANTHROPIC_API_KEY,
"anthropic-version": self.valves.ANTHROPIC_API_VERSION,
"content-type": "application/json",
}
url = self.valves.ANTHROPIC_API_URL
            if payload["stream"]:
return self.stream_anthropic_response(url, headers, payload)
else:
return self.non_stream_anthropic_response(url, headers, payload)
except Exception as e:
            if self.valves.DEBUG:
print(f"Error in Anthropic request: {e}")
return f"Error: {e}"
def stream_anthropic_response(self, url, headers, payload):
"""Handle streaming responses from Anthropic"""
try:
with requests.post(
url,
headers=headers,
json=payload,
stream=True,
timeout=(self.valves.CONNECT_TIMEOUT, self.valves.READ_TIMEOUT),
) as response:
if response.status_code != 200:
raise Exception(
f"HTTP Error {response.status_code}: {response.text}"
)
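                # Anthropic streams server-sent events: each "data: {...}" line
                # carries one JSON event (content_block_start,
                # content_block_delta, message_stop, ...).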
for line in response.iter_lines():
if line:
line = line.decode("utf-8")
if line.startswith("data: "):
try:
data = json.loads(line[6:])
if data["type"] == "content_block_start":
yield data["content_block"]["text"]
elif data["type"] == "content_block_delta":
yield data["delta"]["text"]
elif data["type"] == "message_stop":
break
elif data["type"] == "message":
for content in data.get("content", []):
if content["type"] == "text":
yield content["text"]
time.sleep(
self.valves.STREAM_DELAY
) # Small delay to prevent overwhelming client
except json.JSONDecodeError:
print(f"Failed to parse JSON: {line}")
except KeyError as e:
print(f"Unexpected data structure: {e}")
print(f"Full data: {data}")
except Exception as e:
yield f"Error in streaming response: {str(e)}"
def non_stream_anthropic_response(self, url, headers, payload):
"""Handle non-streaming responses from Anthropic"""
try:
response = self.make_api_request(url, headers, payload)
res = response.json()
return (
res["content"][0]["text"] if "content" in res and res["content"] else ""
)
except Exception as e:
return f"Error in non-streaming response: {str(e)}"
async def get_n8n_status(
self, body: dict, __metadata__: dict, __user__: Optional[dict] = None
) -> str:
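        """Poll the n8n status webhook once and return its latest text update."""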
headers = {
"Authorization": f"Bearer {self.valves.N8N_BEARER_TOKEN}",
"Content-Type": "application/json",
}
        payload = {
            "sessionId": __metadata__.get("chat_id"),
            "user_id": __user__["id"] if __user__ else None,
        }
        payload[self.valves.N8N_INPUT_FIELD] = "status check"
try:
async with aiohttp.ClientSession() as session:
async with session.post(
self.valves.N8N_STATUS_CHECK,
json=payload,
headers=headers,
timeout=5,
) as response:
if response.status == 200:
response_data = await response.json()
if isinstance(response_data, list) and len(response_data) > 0:
status_response = response_data[0].get(
self.valves.N8N_RESPONSE_FIELD,
"No response field found",
)
else:
status_response = response_data.get(
self.valves.N8N_RESPONSE_FIELD,
"No response field found",
)
return status_response
else:
raise Exception(
f"Error: {response.status} - {await response.text()}"
)
except Exception as e:
return f"Error fetching status: {str(e)}"
def handle_perplexity_request(
self, model_id: str, body: dict
) -> Union[str, Generator]:
"""Handle requests for Perplexity models"""
try:
# Fix model ID handling
if model_id.startswith("perplexity."):
model_id = model_id[len("perplexity.") :]
# Ensure the model ID has the correct prefix
if not model_id.startswith("llama-"):
model_id = f"llama-{model_id}"
# Make sure we have the complete model ID
if not model_id.startswith("llama-3."):
model_id = f"llama-3.{model_id.replace('llama-', '')}"
headers = {
"Authorization": f"Bearer {self.valves.PERPLEXITY_API_KEY}",
"Content-Type": "application/json",
"Accept": "application/json",
}
system_message, messages = pop_system_message(body.get("messages", []))
system_prompt = "You are a helpful assistant."
if system_message is not None:
system_prompt = system_message["content"]
payload = {
"model": model_id,
"messages": [{"role": "system", "content": system_prompt}, *messages],
"stream": body.get("stream", True),
"return_citations": True,
"return_images": True,
}
if self.valves.DEBUG:
print(f"Perplexity request model ID: {model_id}")
print(f"Perplexity request payload: {payload}")
response = self.make_api_request(
f"{self.valves.PERPLEXITY_API_BASE_URL}/chat/completions",
headers,
payload,
stream=body.get("stream", False),
)
if body.get("stream", False):
return response.iter_lines()
else:
response_json = response.json()
return response_json["choices"][0]["message"]["content"]
except Exception as e:
if self.valves.DEBUG:
print(f"Error in Perplexity request: {e}")
return f"Error: {e}"
async def status_loop(self, body, __user__, __event_emitter__, __metadata__: dict):
while True:
message = await self.get_n8n_status(body, __metadata__, __user__)
if "Error - " in message:
await __event_emitter__(
{
"type": "message",
"data": {"content": message},
}
)
await self.emit_status(
__event_emitter__,
"status",
"",
__metadata__,
True,
body,
__user__,
)
if self.status_loop_task is not None:
self.status_loop_task.cancel()
self.status_loop_task = None
break
else:
if message == "no update":
pass
elif "<details>" in message:
message = message.replace(r"\n", "\n")
self.thinking_messages.append(
{"role": "assistant", "content": message}
)
await __event_emitter__(
{
"type": "message",
"data": {"content": message},
}
)
else:
await self.emit_status(
__event_emitter__,
"status",
message,
__metadata__,
False,
body,
__user__,
)
await asyncio.sleep(2)
async def handle_n8n_request(
self,
body: dict,
__metadata__: dict,
__user__: Optional[dict] = None,
__event_emitter__: Optional[Callable[[dict], Awaitable[dict]]] = None,
) -> Optional[dict]:
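        """Send the latest user message to the n8n webhook, streaming status
        updates to the UI while waiting for the final response."""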
        self.thinking_messages = []  # reset the per-request thinking buffer
        messages = body.get("messages", [])
        if not messages:
            return {"error": "No messages found in request body"}
        question = messages[-1]["content"]
if "Prompt: " in question:
question = question.split("Prompt: ")[-1]
try:
await self.emit_status(
__event_emitter__,
"status",
"sending...",
__metadata__,
False,
body,
__user__,
)
self.status_loop_task = asyncio.create_task(
self.status_loop(body, __user__, __event_emitter__, __metadata__)
)
headers = {
"Authorization": f"Bearer {self.valves.N8N_BEARER_TOKEN}",
"Content-Type": "application/json",
}
payload = {
"sessionId": __metadata__.get("chat_id"),
"user_id": __user__["id"],
"model_id": body["model"],
}
payload[self.valves.N8N_INPUT_FIELD] = question
async with aiohttp.ClientSession() as session:
async with session.post(
self.valves.N8N_URL,
json=payload,
headers=headers,
timeout=90,
) as response:
if response.status == 200:
response_data = await asyncio.wait_for(
response.json(), timeout=60
)
                        n8n_response = response_data.get(
                            self.valves.N8N_RESPONSE_FIELD
                        )
else:
raise Exception(
f"Error: {response.status} - {await response.text()}"
)
if n8n_response is None:
n8n_response = (
"Received a NULL response from n8n. Please check the workflow."
)
else:
n8n_response = n8n_response.replace(r"\n", "\n")
# Cancel the status loop task BEFORE sending the final response
if self.status_loop_task is not None:
self.status_loop_task.cancel()
self.status_loop_task = None
            # Append the collected thinking blocks and the final response to
            # the chat history using the standard role/content message shape
            for thinking_message in self.thinking_messages:
                messages.append(
                    {"role": "assistant", "content": thinking_message["content"]}
                )
            messages.append({"role": "assistant", "content": n8n_response})
if __event_emitter__:
# Send final status to indicate completion
await self.emit_status(
__event_emitter__,
"status",
"Complete",
__metadata__,
True, # Mark as done
body,
__user__,
)
# Send the final response
await __event_emitter__(
{
"type": "message",
"data": {"content": n8n_response},
}
)
return messages
except Exception as e:
if self.status_loop_task:
self.status_loop_task.cancel()
self.status_loop_task = None
error_msg = f"Error in N8N request: {str(e)}"
if __event_emitter__:
await self.emit_status(
__event_emitter__,
"status",
error_msg,
__metadata__,
True,
body,
__user__,
)
return {"error": error_msg}
async def pipe(
self,
body: dict,
__metadata__: dict = None,
__user__: Optional[dict] = None,
__event_emitter__: Callable[[dict], Awaitable[dict]] = None,
__event_call__: Callable[[dict], Awaitable[dict]] = None,
) -> Union[str, Generator, Iterator, Optional[dict]]:
"""Main pipe method that routes requests to appropriate provider"""
if not body.get("model"):
return "Error: No model specified"
try:
model_id = body["model"]
# First, clean up the model ID by removing all_models prefix
if model_id.startswith("all_models."):
model_id = model_id[11:] # Remove "all_models."
# Now validate and check for n8n
self.validate_api_keys(model_id)
if "n8n." in model_id:
return await self.handle_n8n_request(
body, __metadata__, __user__, __event_emitter__
)
parts = model_id.split(".")
# If we have multiple parts, check for provider info
if len(parts) > 1:
# Look for provider indicators in all parts
if any(part == "anthropic" for part in parts):
provider = "anthropic"
model_id = parts[-1] # Get the last part as model name
elif any(part in ("google", "google_genai") for part in parts):
provider = "google"
# For Google models, preserve the full model name
if "gemini" in model_id:
model_id = model_id[model_id.find("gemini") :]
else:
model_id = parts[-1] # Fallback to last part if no gemini found
elif any(part == "perplexity" for part in parts):
provider = "perplexity"
model_id = parts[-1] # Get the last part as model name
else:
# Check the model name patterns
model_name = parts[-1]
if any(
model_name.startswith(prefix)
for prefix in ["gemini-", "gemini_", "gemini2-", "gemini-2"]
):
provider = "google"
model_id = model_name
elif model_name.startswith("claude-"):
provider = "anthropic"
model_id = model_name
else:
raise ValueError(
f"Unable to determine provider from model ID: {model_id}"
)
else:
# Single part - check the model name patterns
if any(
model_id.startswith(prefix)
for prefix in ["gemini-", "gemini_", "gemini2-", "gemini-2"]
):
provider = "google"
elif model_id.startswith("claude-"):
provider = "anthropic"
elif "perplexity" in model_id.lower() or "llama" in model_id.lower():
provider = "perplexity"
else:
raise ValueError(
f"Unable to determine provider from model ID: {model_id}"
)
            if self.valves.DEBUG:
                print(f"Original model ID: {body['model']}")
                print(f"Parsed provider: {provider}")
                print(f"Routing request to {provider} for model {model_id}")
# Route to appropriate handler
if provider == "google":
return self.handle_google_request(model_id, body)
elif provider == "anthropic":
return self.handle_anthropic_request(model_id, body)
elif provider == "perplexity":
return self.handle_perplexity_request(model_id, body)
else:
raise ValueError(f"Unknown provider: {provider}")
except Exception as e:
            if self.valves.DEBUG:
print(f"Error in pipe: {e}")
return f"Error: {str(e)}"
finally:
if self.status_loop_task is not None:
self.status_loop_task.cancel()
self.status_loop_task = None
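

# Minimal local smoke test: a sketch only, not part of the pipe. It assumes a
# valid ANTHROPIC_API_KEY in the environment and mirrors (but does not
# reproduce) how OpenWebUI invokes pipe(); the model id and chat_id below are
# illustrative.
if __name__ == "__main__":

    async def _demo():
        pipe = Pipe()
        result = await pipe.pipe(
            body={
                "model": "claude-3-5-haiku-20241022",
                "messages": [{"role": "user", "content": "Say hello."}],
            },
            __metadata__={"chat_id": "local-test"},
        )
        print(result)

    asyncio.run(_demo())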