"""
title: Unified Pipe for Flowise and Ollama with Corrected Ollama Endpoint and Response Parsing
version: 1.0
"""
from typing import Optional, Callable, Awaitable, Dict, Any, List, Union, Generator
import aiohttp
import json
import time
import asyncio
from pydantic import BaseModel, Field
class Pipe:
    """
    Unified pipeline for managing interactions with both Flowise and Ollama APIs.

    Routing:
        - Requests whose latest user message starts with
          ``SUMMARIZATION_PROMPT_PREFIX`` are sent to Ollama (title generation).
        - All other requests are sent to Flowise.

    Requires:
        - Flowise service: https://github.com/FlowiseAI/Flowise
        - Ollama service: https://ollama.com/

    NOTE:
        - Ensure Flowise is deployed on a different port (e.g., 3030) to avoid conflicts.
    """

    class Valves(BaseModel):
        """Runtime configuration for the pipe (editable in the host UI)."""

        # --- Flowise configuration ---
        FLOWISE_API_ENDPOINT: str = Field(
            default="http://host.docker.internal:3030/",
            description="Base URL for the Flowise API endpoint.",
        )
        FLOWISE_USERNAME: Optional[str] = Field(
            default=None, description="Username for Flowise API auth."
        )
        FLOWISE_PASSWORD: Optional[str] = Field(
            default=None, description="Password for Flowise API auth."
        )
        FLOWISE_CHATFLOW_ID: str = Field(
            default="", description="Chatflow ID for the Flowise API."
        )
        # --- Ollama configuration ---
        OLLAMA_API_ENDPOINT: str = Field(
            default="http://host.docker.internal:11435",
            description="Base URL for the Ollama API endpoint.",
        )
        OLLAMA_API_KEY: Optional[str] = Field(
            default=None, description="API key for Ollama API auth."
        )
        OLLAMA_MODEL_ID: str = Field(
            default="llama3.2", description="Model ID for the Ollama API."
        )
        # --- Summarization routing ---
        SUMMARIZATION_PROMPT_PREFIX: str = Field(
            default="Create a concise, 3-5 word title with an emoji as a title for the prompt in the given language.",
            description="Prefix that identifies a summarization request.",
        )
        # --- Common settings ---
        emit_interval: float = Field(
            default=1.0, description="Interval between status emissions."
        )
        enable_status_indicator: bool = Field(
            default=True, description="Enable/disable status indicator."
        )
        request_timeout: int = Field(
            default=300, description="HTTP client timeout in seconds."
        )
        debug: bool = Field(
            default=True, description="Enable or disable debug logging."
        )

    def __init__(self):
        self.valves = self.Valves()
        # Retained for backward compatibility (emit_periodic_status falls back
        # to it), but pipe() now uses a per-call event so concurrent requests
        # cannot stop each other's status emitter.
        self.stop_emitter = asyncio.Event()
        # Per-user session state: {user_id: {"chat_id": ..., "history": [...]}}
        self.chat_sessions: Dict[str, Dict[str, Any]] = {}

    def log(self, message: str):
        """Print a debug message when the ``debug`` valve is enabled."""
        if self.valves.debug:
            print(f"[DEBUG] {message}")

    def clean_response_text(self, text: str) -> str:
        """
        Remove surrounding whitespace and any number of matched outer double
        quotes from *text*.

        Whitespace is stripped *before* the quote check so inputs like
        ' "title" ' are still unquoted, and the ``len(text) > 1`` guard keeps a
        lone '"' from being collapsed to the empty string by the symmetric
        startswith/endswith test.
        """
        self.log(f"Original text before cleaning: {text!r}")
        text = text.strip()
        # Peel matched outer quote pairs, re-stripping between layers so
        # constructs like '" "x" "' are fully unwrapped.
        while len(text) > 1 and text.startswith('"') and text.endswith('"'):
            text = text[1:-1].strip()
        self.log(f"Text after stripping quotes: {text!r}")
        cleaned_text = text.strip()
        self.log(f"Final cleaned text: {cleaned_text!r}")
        return cleaned_text

    def _get_latest_user_message(self, messages: List[Dict[str, str]]) -> Optional[str]:
        """
        Return the content of the most recent non-empty user message, or None.

        Strips a leading "User: " prefix if present (the combined-prompt format
        produced by _get_combined_prompt uses that prefix).
        """
        for message in reversed(messages):
            if message.get("role") == "user":
                # `or ""` guards against an explicit ``"content": None`` entry,
                # which would otherwise raise AttributeError on .strip().
                content = (message.get("content") or "").strip()
                if content.startswith("User: "):
                    content = content[len("User: ") :].strip()
                    self.log(f"Stripped 'User: ' prefix. Content: {content}")
                else:
                    self.log(f"No 'User: ' prefix found. Content: {content}")
                if content:
                    self.log(f"Latest user question extracted: {content}")
                    return content
        self.log("No user message found in the messages.")
        return None

    def _get_combined_prompt(self, messages: List[Dict[str, str]]) -> str:
        """
        Combine user and assistant messages into a structured prompt.

        Example:
            User: hi
            Assistant: How can I assist you today?
            User: 5 words to describe ai
        """
        prompt_parts = [
            f"{message.get('role', 'user').capitalize()}: {message.get('content', '')}"
            for message in messages
        ]
        combined_prompt = "\n".join(prompt_parts)
        self.log(f"Combined prompt:\n{combined_prompt}")
        return combined_prompt

    async def emit_periodic_status(
        self,
        __event_emitter__: Optional[Callable[[dict], Awaitable[None]]],
        message: str,
        interval: float,
        stop_event: Optional[asyncio.Event] = None,
    ):
        """
        Emit a status update every ``interval`` seconds until stopped.

        Args:
            __event_emitter__: Async callback that receives status event dicts.
            message: Base status text; elapsed seconds are appended.
            interval: Delay between emissions, in seconds.
            stop_event: Event whose ``set()`` ends the loop. Defaults to the
                shared ``self.stop_emitter`` for backward compatibility;
                pipe() passes a per-call event so concurrent requests do not
                interfere with each other.
        """
        stop_event = stop_event if stop_event is not None else self.stop_emitter
        start_time = time.time()
        try:
            while not stop_event.is_set():
                elapsed_time = time.time() - start_time
                await self.emit_status(
                    __event_emitter__,
                    "info",
                    f"{message} (elapsed: {elapsed_time:.1f}s)",
                    False,
                )
                await asyncio.sleep(interval)
        except asyncio.CancelledError:
            self.log("Periodic status emission cancelled.")

    async def emit_status(
        self,
        __event_emitter__: Optional[Callable[[dict], Awaitable[None]]],
        level: str,
        message: str,
        done: bool,
    ):
        """
        Emit a single status event through *__event_emitter__* if it is callable.

        Note: *level* is currently unused in the event payload; it is kept in
        the signature for interface compatibility with callers.
        """
        if callable(__event_emitter__):
            event = {"type": "status", "data": {"description": message, "done": done}}
            self.log(f"Emitting status event: {event}")
            await __event_emitter__(event)
        else:
            self.log("No valid event emitter provided. Skipping event emission.")

    async def pipe(
        self,
        body: dict,
        __user__: Optional[dict] = None,
        __event_emitter__: Optional[Callable[[dict], Awaitable[None]]] = None,
    ) -> Union[str, Generator[str, None, None]]:
        """
        Main entry point: route the request to Ollama (summarization) or Flowise.

        Args:
            body: Request body; must contain a non-empty ``messages`` list.
            __user__: Optional user dict; ``user_id`` keys chat sessions.
            __event_emitter__: Optional async callback for status events.

        Returns:
            The backend's response text, or an "Error: ..." string on failure.
        """
        status_task = None
        # Per-call stop event: a shared event would let concurrent pipe()
        # invocations stop (or restart) each other's status emitters.
        stop_event = asyncio.Event()
        start_time = time.time()
        try:
            # Start the periodic status emitter if enabled and an emitter exists.
            if callable(__event_emitter__) and self.valves.enable_status_indicator:
                self.log("Starting periodic status emitter...")
                status_task = asyncio.create_task(
                    self.emit_periodic_status(
                        __event_emitter__,
                        "Processing request...",
                        self.valves.emit_interval,
                        stop_event,
                    )
                )
            else:
                self.log("No valid event emitter provided. Skipping periodic status.")

            # Extract messages and build the (debug-only, currently unused
            # beyond logging) combined prompt.
            messages = body.get("messages", [])
            self.log(f"Messages extracted: {messages}")
            if not messages:
                self.log("No messages found in the request body.")
                return "Error: No messages found."
            prompt = self._get_combined_prompt(messages)
            self.log(f"Prompt prepared: {prompt}")

            # Extract the latest user message; this is what gets sent upstream.
            latest_user_question = self._get_latest_user_message(messages)
            if not latest_user_question:
                self.log("No user message found in the messages.")
                return "Error: No user message found."

            # Route: summarization prefix -> Ollama, everything else -> Flowise.
            if self.is_summarization_request(latest_user_question):
                self.log("Summarization request detected. Routing to Ollama.")
                response = await self.handle_ollama_request(
                    latest_user_question, __user__, __event_emitter__
                )
            else:
                self.log("Regular request detected. Routing to Flowise.")
                response = await self.handle_flowise_request(
                    latest_user_question, __user__, __event_emitter__
                )

            # Emit final status with total elapsed time.
            elapsed_time = time.time() - start_time
            await self.emit_status(
                __event_emitter__,
                "info",
                f"Pipe Completed in {elapsed_time:.1f}s",
                True,
            )
            return response
        except Exception as e:
            # Top-level boundary: log and return the error as text so the host
            # UI can display it instead of crashing the pipeline.
            self.log(f"Error during pipe execution: {str(e)}")
            return f"Error: {e}"
        finally:
            if status_task:
                stop_event.set()
                # Cancel rather than just waiting: the emitter loop only
                # rechecks the event after sleeping, so a plain await could
                # delay the return by up to emit_interval seconds.
                status_task.cancel()
                try:
                    await status_task
                except asyncio.CancelledError:
                    pass

    def is_summarization_request(self, question: str) -> bool:
        """
        Return True when *question* starts with the configured summarization
        prompt prefix (the valve ``SUMMARIZATION_PROMPT_PREFIX``).
        """
        is_match = question.startswith(self.valves.SUMMARIZATION_PROMPT_PREFIX)
        self.log(f"Is summarization request (starts with prefix): {is_match}")
        return is_match

    async def handle_ollama_request(
        self,
        question: str,
        __user__: Optional[dict],
        __event_emitter__: Optional[Callable[[dict], Awaitable[None]]],
    ) -> Union[str, Generator[str, None, None]]:
        """
        Send a summarization request to Ollama's OpenAI-compatible
        ``/v1/completions`` endpoint and return the cleaned completion text.

        Args:
            question: The prompt to summarize (includes the prefix).
            __user__: Optional user dict; ``user_id`` keys chat sessions.
            __event_emitter__: Unused here; kept for interface symmetry with
                handle_flowise_request.

        Returns:
            The cleaned completion text, or an "Error: ..." string.
        """
        try:
            payload = {
                "prompt": question,
                "model": self.valves.OLLAMA_MODEL_ID,
            }
            self.log(f"Payload for Ollama: {payload}")
            # OpenAI-compatible completions endpoint exposed by Ollama.
            url = f"{self.valves.OLLAMA_API_ENDPOINT.rstrip('/')}/v1/completions"
            headers = {"Content-Type": "application/json"}
            # Bearer auth is optional; only attached when a key is configured.
            if self.valves.OLLAMA_API_KEY:
                headers["Authorization"] = f"Bearer {self.valves.OLLAMA_API_KEY}"
                self.log("Ollama authentication enabled.")
            async with aiohttp.ClientSession(
                timeout=aiohttp.ClientTimeout(total=self.valves.request_timeout)
            ) as session:
                async with session.post(url, json=payload, headers=headers) as response:
                    response_text = await response.text()
                    self.log(f"Ollama response status: {response.status}")
                    self.log(f"Ollama response text: {response_text}")
                    if response.status != 200:
                        self.log(
                            f"Ollama API call failed with status: {response.status}"
                        )
                        return f"Error: Ollama API call failed with status {response.status}"
                    try:
                        data = json.loads(response_text)
                        self.log(f"Parsed Ollama response data: {data}")
                    except json.JSONDecodeError:
                        self.log("Failed to decode JSON from Ollama's response.")
                        return "Error: Invalid JSON response from Ollama."
                    # OpenAI-compatible shape: text lives in choices[0]["text"].
                    choices = data.get("choices", [])
                    if not choices:
                        self.log("No choices found in Ollama's response.")
                        return "Error: No choices found in Ollama's response."
                    first_choice = choices[0]
                    raw_text = first_choice.get("text", "")
                    self.log(f"Raw text from first choice: {raw_text!r}")
                    text = self.clean_response_text(raw_text)
                    if not text:
                        self.log("No valid text found in Ollama's response.")
                        return "Error: Empty response from Ollama."
                    self.log(f"Extracted text from Ollama: {text!r}")
                    # NOTE(review): a standard /v1/completions response does
                    # not include "chat_id"; this branch is a defensive no-op
                    # unless a proxy adds one — confirm before relying on it.
                    new_chat_id = data.get("chat_id")
                    if new_chat_id:
                        user_id = (
                            __user__.get("user_id", "default_user")
                            if __user__
                            else "default_user"
                        )
                        if user_id not in self.chat_sessions:
                            self.chat_sessions[user_id] = {
                                "chat_id": None,
                                "history": [],
                            }
                        self.chat_sessions[user_id]["chat_id"] = new_chat_id
                    return text
        except Exception as e:
            self.log(f"Error during Ollama request handling: {str(e)}")
            return f"Error: {e}"

    async def handle_flowise_request(
        self,
        question: str,
        __user__: Optional[dict],
        __event_emitter__: Optional[Callable[[dict], Awaitable[None]]],
    ) -> Union[str, Generator[str, None, None]]:
        """
        Send a regular chat request to Flowise's prediction endpoint and
        return the cleaned response text, updating the per-user chat session.

        Args:
            question: The latest user message to send.
            __user__: Optional user dict; ``user_id`` keys chat sessions.
            __event_emitter__: Unused here; kept for interface symmetry with
                handle_ollama_request.

        Returns:
            The cleaned response text, or an "Error: ..." string.
        """
        try:
            # Guard against a malformed URL when no chatflow is configured.
            if not self.valves.FLOWISE_CHATFLOW_ID:
                self.log("FLOWISE_CHATFLOW_ID is not configured.")
                return "Error: FLOWISE_CHATFLOW_ID is not configured."
            payload = {"question": question}
            # Reuse the stored chatId (if any) so Flowise keeps conversation
            # context across calls for this user.
            user_id = (
                __user__.get("user_id", "default_user") if __user__ else "default_user"
            )
            chat_session = self.chat_sessions.get(user_id, {})
            chat_id = chat_session.get("chat_id")
            if chat_id:
                payload["chatId"] = chat_id
            self.log(f"Payload for Flowise: {payload}")
            endpoint = self.valves.FLOWISE_API_ENDPOINT.rstrip("/")
            url = f"{endpoint}/api/v1/prediction/{self.valves.FLOWISE_CHATFLOW_ID}"
            headers = {"Content-Type": "application/json"}
            # HTTP basic auth is optional; only used when both parts are set.
            auth = None
            if self.valves.FLOWISE_USERNAME and self.valves.FLOWISE_PASSWORD:
                auth = aiohttp.BasicAuth(
                    self.valves.FLOWISE_USERNAME, self.valves.FLOWISE_PASSWORD
                )
                self.log("Flowise authentication enabled.")
            async with aiohttp.ClientSession(
                timeout=aiohttp.ClientTimeout(total=self.valves.request_timeout),
                auth=auth,
            ) as session:
                async with session.post(url, json=payload, headers=headers) as response:
                    response_text = await response.text()
                    self.log(f"Flowise response status: {response.status}")
                    self.log(f"Flowise response text: {response_text}")
                    if response.status != 200:
                        self.log(
                            f"Flowise API call failed with status: {response.status}"
                        )
                        return f"Error: Flowise API call failed with status {response.status}"
                    try:
                        data = json.loads(response_text)
                        self.log(f"Parsed Flowise response data: {data}")
                    except json.JSONDecodeError:
                        self.log("Failed to decode JSON from Flowise's response.")
                        return "Error: Invalid JSON response from Flowise."
                    raw_text = data.get("text", "")
                    text = self.clean_response_text(raw_text)
                    # Keep the old chat_id if Flowise didn't return a new one.
                    new_chat_id = data.get("chatId", chat_id)
                    if not text:
                        self.log("No valid text found in Flowise's response.")
                        return "Error: Empty response from Flowise."
                    self.log(f"Extracted text from Flowise: {text!r}")
                    self.log(f"New chat ID from Flowise: {new_chat_id}")
                    # Update the per-user session with the latest chat id.
                    if user_id not in self.chat_sessions:
                        self.chat_sessions[user_id] = {"chat_id": None, "history": []}
                    self.chat_sessions[user_id]["chat_id"] = new_chat_id
                    # NOTE(review): only the assistant turn is recorded here;
                    # the user message is never appended — confirm whether any
                    # consumer of "history" expects both sides.
                    self.chat_sessions[user_id]["history"].append(
                        {"role": "assistant", "content": text}
                    )
                    return text
        except Exception as e:
            self.log(f"Error during Flowise request handling: {str(e)}")
            return f"Error: {e}"