Function
pipe
v1.0
Flowise Pipe
Makes use of a Flowise endpoint for inference; summarisation (e.g. chat titles) is handled by Ollama.
Function ID
flowise_pipe
Creator
@matthewh
Downloads
44+

Function Content
python
"""
title: Unified Pipe for Flowise and Ollama with Corrected Ollama Endpoint and Response Parsing
version: 1.0
"""

from typing import Optional, Callable, Awaitable, Dict, Any, List, Union, Generator
import aiohttp
import json
import time
import asyncio
from pydantic import BaseModel, Field


class Pipe:
    """
    Unified Pipeline for managing interactions with both Flowise and Ollama APIs.

    - Routes summarization requests to Ollama based on a predefined prefix.
    - Routes all other requests to Flowise.

    Requires:
    - Flowise service: https://github.com/FlowiseAI/Flowise
    - Ollama service: https://ollama.com/

    NOTE:
    - Ensure Flowise is deployed on a different port (e.g., 3030) to avoid conflicts.
    """

    class Valves(BaseModel):
        """Runtime configuration ("valves") for the pipe.

        Groups connection/auth settings for Flowise and Ollama plus common
        behaviour toggles (status emission interval, request timeout, debug
        logging). Values can be overridden by the host application.
        """

        # Configuration for Flowise
        FLOWISE_API_ENDPOINT: str = Field(
            default="http://host.docker.internal:3030/",
            description="Base URL for the Flowise API endpoint.",
        )
        # Optional HTTP basic-auth credentials; auth is only enabled when
        # BOTH username and password are set (see handle_flowise_request).
        FLOWISE_USERNAME: Optional[str] = Field(
            default=None, description="Username for Flowise API auth."
        )
        FLOWISE_PASSWORD: Optional[str] = Field(
            default=None, description="Password for Flowise API auth."
        )
        # NOTE(review): default is empty — requests will hit
        # /api/v1/prediction/ until this is configured.
        FLOWISE_CHATFLOW_ID: str = Field(
            default="", description="Chatflow ID for the Flowise API."
        )

        # Configuration for Ollama
        OLLAMA_API_ENDPOINT: str = Field(
            default="http://host.docker.internal:11435",
            description="Base URL for the Ollama API endpoint.",
        )
        # Sent as a Bearer token when set (see handle_ollama_request).
        OLLAMA_API_KEY: Optional[str] = Field(
            default=None, description="API key for Ollama API auth."
        )
        OLLAMA_MODEL_ID: str = Field(
            default="llama3.2", description="Model ID for the Ollama API."
        )

        # Summarization Valve: requests whose latest user message starts with
        # this prefix are routed to Ollama instead of Flowise.
        SUMMARIZATION_PROMPT_PREFIX: str = Field(
            default="Create a concise, 3-5 word title with an emoji as a title for the prompt in the given language.",
            description="Prefix that identifies a summarization request.",
        )

        # Common Settings
        emit_interval: float = Field(
            default=1.0, description="Interval between status emissions."
        )
        enable_status_indicator: bool = Field(
            default=True, description="Enable/disable status indicator."
        )
        request_timeout: int = Field(
            default=300, description="HTTP client timeout in seconds."
        )
        # Debug logging is ON by default; log() prints to stdout.
        debug: bool = Field(
            default=True, description="Enable or disable debug logging."
        )

    def __init__(self):
        """Initialize valves, the status-stop event, and chat-session storage."""
        self.valves = self.Valves()
        self.stop_emitter = asyncio.Event()
        # Per-user session state: {user_id: {"chat_id": ..., "history": [...]}}
        self.chat_sessions: Dict[str, Dict[str, Any]] = {}

    def log(self, message: str):
        """Print *message* with a [DEBUG] prefix when debug logging is enabled."""
        if not self.valves.debug:
            return
        print(f"[DEBUG] {message}")

    def clean_response_text(self, text: str) -> str:
        """
        Normalize a model response by removing wrapping quotes and whitespace.

        Repeatedly unwraps responses of the form '"..."'. Unlike the previous
        version, surrounding whitespace is stripped between unwrap steps, so
        inputs like ' "title" ' are also unwrapped; a lone '"' character is
        kept rather than collapsing to an empty string.

        Args:
            text: Raw response text from the model.

        Returns:
            The cleaned, unquoted, stripped text.
        """
        self.log(f"Original text before cleaning: {text!r}")
        # Strip first so quotes padded by whitespace are still detected.
        text = text.strip()
        # Require len >= 2 so a single '"' is not treated as a wrapped pair.
        while len(text) >= 2 and text.startswith('"') and text.endswith('"'):
            text = text[1:-1].strip()
            self.log(f"Text after stripping quotes: {text!r}")
        cleaned_text = text.strip()
        self.log(f"Final cleaned text: {cleaned_text!r}")
        return cleaned_text

    def _get_latest_user_message(self, messages: List[Dict[str, str]]) -> Optional[str]:
        """
        Return the content of the most recent non-empty user message.

        A leading "User: " prefix, if present, is removed before the content
        is returned. Returns None when no user message is found.
        """
        prefix = "User: "
        for entry in reversed(messages):
            if entry.get("role") != "user":
                continue
            content = entry.get("content", "").strip()
            if content.startswith(prefix):
                content = content[len(prefix):].strip()
                self.log(f"Stripped 'User: ' prefix. Content: {content}")
            else:
                self.log(f"No 'User: ' prefix found. Content: {content}")
            if content:
                self.log(f"Latest user question extracted: {content}")
                return content
        self.log("No user message found in the messages.")
        return None

    def _get_combined_prompt(self, messages: List[Dict[str, str]]) -> str:
        """
        Render the conversation as "Role: content" lines joined by newlines.

        Example:
            User: hi
            Assistant: How can I assist you today?
            User: 5 words to describe ai
        """
        lines = []
        for entry in messages:
            role = entry.get("role", "user").capitalize()
            lines.append(f"{role}: {entry.get('content', '')}")
        combined_prompt = "\n".join(lines)
        self.log(f"Combined prompt:\n{combined_prompt}")
        return combined_prompt

    async def emit_periodic_status(
        self,
        __event_emitter__: Optional[Callable[[dict], Awaitable[None]]],
        message: str,
        interval: float,
    ):
        """Emit an in-progress status line every *interval* seconds until
        self.stop_emitter is set or the task is cancelled."""
        started = time.time()
        try:
            while True:
                if self.stop_emitter.is_set():
                    break
                elapsed = time.time() - started
                await self.emit_status(
                    __event_emitter__,
                    "info",
                    f"{message} (elapsed: {elapsed:.1f}s)",
                    False,
                )
                await asyncio.sleep(interval)
        except asyncio.CancelledError:
            self.log("Periodic status emission cancelled.")

    async def emit_status(
        self,
        __event_emitter__: Optional[Callable[[dict], Awaitable[None]]],
        level: str,
        message: str,
        done: bool,
    ):
        """Send a single status event through the emitter, if one was supplied.

        Note: *level* is accepted for API symmetry but is not included in the
        emitted event payload.
        """
        if not callable(__event_emitter__):
            self.log("No valid event emitter provided. Skipping event emission.")
            return
        event = {"type": "status", "data": {"description": message, "done": done}}
        self.log(f"Emitting status event: {event}")
        await __event_emitter__(event)

    async def pipe(
        self,
        body: dict,
        __user__: Optional[dict] = None,
        __event_emitter__: Optional[Callable[[dict], Awaitable[None]]] = None,
    ) -> Union[str, Generator[str, None, None]]:
        """
        Main entry point: route the request to Ollama or Flowise.

        - Summarization requests (latest user message starts with the
          configured prefix) are routed to Ollama.
        - All other requests are routed to Flowise.

        Args:
            body: Request body; must contain a non-empty "messages" list.
            __user__: Optional user info dict used to key per-user sessions.
            __event_emitter__: Optional async callback for status events.

        Returns:
            The downstream response text, or an "Error: ..." string.
        """
        status_task = None
        start_time = time.time()

        try:
            # Emit periodic "Processing..." status while the request runs.
            # NOTE(review): self.stop_emitter is shared instance state, so
            # overlapping concurrent pipe() calls could interfere — confirm
            # the host invokes this sequentially per instance.
            if callable(__event_emitter__) and self.valves.enable_status_indicator:
                self.log("Starting periodic status emitter...")
                self.stop_emitter.clear()
                status_task = asyncio.create_task(
                    self.emit_periodic_status(
                        __event_emitter__,
                        "Processing request...",
                        self.valves.emit_interval,
                    )
                )
            else:
                self.log("No valid event emitter provided. Skipping periodic status.")

            messages = body.get("messages", [])
            self.log(f"Messages extracted: {messages}")
            if not messages:
                self.log("No messages found in the request body.")
                return "Error: No messages found."

            # Only the latest user message is sent downstream (the previous
            # combined-prompt computation was dead code); Flowise keeps the
            # conversational context server-side via chatId.
            latest_user_question = self._get_latest_user_message(messages)
            if not latest_user_question:
                self.log("No user message found in the messages.")
                return "Error: No user message found."

            if self.is_summarization_request(latest_user_question):
                self.log("Summarization request detected. Routing to Ollama.")
                response = await self.handle_ollama_request(
                    latest_user_question, __user__, __event_emitter__
                )
            else:
                self.log("Regular request detected. Routing to Flowise.")
                response = await self.handle_flowise_request(
                    latest_user_question, __user__, __event_emitter__
                )

            # Emit final "done" status with the elapsed time.
            elapsed_time = time.time() - start_time
            await self.emit_status(
                __event_emitter__,
                "info",
                f"Pipe Completed in {elapsed_time:.1f}s",
                True,
            )
            return response

        except Exception as e:
            self.log(f"Error during pipe execution: {str(e)}")
            # Emit a terminal status so the UI spinner does not hang on error;
            # best-effort only, a failing emitter must not mask the error.
            try:
                await self.emit_status(__event_emitter__, "error", f"Error: {e}", True)
            except Exception:
                self.log("Failed to emit error status.")
            return f"Error: {e}"

        finally:
            # Stop the periodic emitter; it exits on its next loop iteration.
            if status_task:
                self.stop_emitter.set()
                await status_task

    def is_summarization_request(self, question: str) -> bool:
        """Return True when *question* begins with the configured
        summarization prompt prefix."""
        prefix = self.valves.SUMMARIZATION_PROMPT_PREFIX
        is_match = question.startswith(prefix)
        self.log(f"Is summarization request (starts with prefix): {is_match}")
        return is_match

    async def handle_ollama_request(
        self,
        question: str,
        __user__: Optional[dict],
        __event_emitter__: Optional[Callable[[dict], Awaitable[None]]],
    ) -> Union[str, Generator[str, None, None]]:
        """
        Handles summarization requests by sending them to Ollama.

        Posts *question* to Ollama's OpenAI-compatible ``/v1/completions``
        endpoint and returns the cleaned text of the first choice.

        Args:
            question: The summarization prompt (latest user message).
            __user__: Optional user info dict; only "user_id" is read, to key
                the local chat-session store.
            __event_emitter__: Accepted for signature symmetry with
                handle_flowise_request; not used in this method.

        Returns:
            The cleaned completion text, or an "Error: ..." string on any
            failure (non-200 status, invalid JSON, empty choices/text).
        """
        try:
            # Prepare the payload for Ollama
            payload = {
                "prompt": question,
                "model": self.valves.OLLAMA_MODEL_ID,  # Ensure this matches Ollama's API requirements
                # Add other necessary fields as per Ollama's API requirements
            }

            self.log(f"Payload for Ollama: {payload}")

            # Construct the full Ollama API URL by appending '/v1/completions'
            url = f"{self.valves.OLLAMA_API_ENDPOINT.rstrip('/')}/v1/completions"
            headers = {"Content-Type": "application/json"}

            # Handle authentication if provided (Bearer token)
            if self.valves.OLLAMA_API_KEY:
                headers["Authorization"] = f"Bearer {self.valves.OLLAMA_API_KEY}"
                self.log("Ollama authentication enabled.")

            # One short-lived session per request, bounded by request_timeout.
            async with aiohttp.ClientSession(
                timeout=aiohttp.ClientTimeout(total=self.valves.request_timeout)
            ) as session:
                async with session.post(url, json=payload, headers=headers) as response:
                    # Read the body as text first so it can be logged verbatim
                    # even when JSON parsing fails below.
                    response_text = await response.text()
                    self.log(f"Ollama response status: {response.status}")
                    self.log(f"Ollama response text: {response_text}")

                    if response.status != 200:
                        self.log(
                            f"Ollama API call failed with status: {response.status}"
                        )
                        return f"Error: Ollama API call failed with status {response.status}"

                    # Extract and clean the response text
                    try:
                        data = json.loads(response_text)
                        self.log(f"Parsed Ollama response data: {data}")
                    except json.JSONDecodeError:
                        self.log("Failed to decode JSON from Ollama's response.")
                        return "Error: Invalid JSON response from Ollama."

                    # Extract the generated text from 'choices' (OpenAI-style
                    # completion shape: {"choices": [{"text": ...}, ...]}).
                    choices = data.get("choices", [])
                    if not choices:
                        self.log("No choices found in Ollama's response.")
                        return "Error: No choices found in Ollama's response."

                    first_choice = choices[0]
                    raw_text = first_choice.get("text", "")
                    self.log(f"Raw text from first choice: {raw_text!r}")

                    text = self.clean_response_text(raw_text)

                    if not text:
                        self.log("No valid text found in Ollama's response.")
                        return "Error: Empty response from Ollama."

                    self.log(f"Extracted text from Ollama: {text!r}")

                    # Optionally, update chat session if needed
                    # For example, if Ollama returns a new chat_id
                    # NOTE(review): OpenAI-style completion responses do not
                    # normally carry "chat_id"; this branch looks dead —
                    # confirm before relying on it.
                    new_chat_id = data.get("chat_id")
                    if new_chat_id:
                        user_id = (
                            __user__.get("user_id", "default_user")
                            if __user__
                            else "default_user"
                        )
                        if user_id not in self.chat_sessions:
                            self.chat_sessions[user_id] = {
                                "chat_id": None,
                                "history": [],
                            }
                        self.chat_sessions[user_id]["chat_id"] = new_chat_id

                    return text

        except Exception as e:
            # Broad catch by design: the pipe returns error strings rather
            # than raising (network errors, timeouts, etc. land here).
            self.log(f"Error during Ollama request handling: {str(e)}")
            return f"Error: {e}"

    async def handle_flowise_request(
        self,
        question: str,
        __user__: Optional[dict],
        __event_emitter__: Optional[Callable[[dict], Awaitable[None]]],
    ) -> Union[str, Generator[str, None, None]]:
        """
        Handles regular requests by sending them to Flowise.

        Posts *question* to the configured chatflow's prediction endpoint,
        threading the per-user chatId so Flowise continues the conversation
        server-side, and records both sides of the exchange in the local
        session history.

        Args:
            question: The latest user message.
            __user__: Optional user info dict; "user_id" keys the chat session
                (falls back to "default_user").
            __event_emitter__: Accepted for signature symmetry with
                handle_ollama_request; not used in this method.

        Returns:
            The cleaned response text, or an "Error: ..." string on any
            failure (non-200 status, invalid JSON, empty text).
        """
        try:
            payload = {"question": question}
            # Reuse the chatId from a previous turn, if any, so Flowise can
            # continue the same conversation server-side.
            user_id = (
                __user__.get("user_id", "default_user") if __user__ else "default_user"
            )
            chat_session = self.chat_sessions.get(user_id, {})
            chat_id = chat_session.get("chat_id")
            if chat_id:
                payload["chatId"] = chat_id

            self.log(f"Payload for Flowise: {payload}")

            # Build the prediction endpoint URL.
            endpoint = self.valves.FLOWISE_API_ENDPOINT.rstrip("/")
            url = f"{endpoint}/api/v1/prediction/{self.valves.FLOWISE_CHATFLOW_ID}"
            headers = {"Content-Type": "application/json"}

            # Optional HTTP basic auth (enabled only when both are set).
            auth = None
            if self.valves.FLOWISE_USERNAME and self.valves.FLOWISE_PASSWORD:
                auth = aiohttp.BasicAuth(
                    self.valves.FLOWISE_USERNAME, self.valves.FLOWISE_PASSWORD
                )
                self.log("Flowise authentication enabled.")

            async with aiohttp.ClientSession(
                timeout=aiohttp.ClientTimeout(total=self.valves.request_timeout),
                auth=auth,
            ) as session:
                async with session.post(url, json=payload, headers=headers) as response:
                    # Read as text first so the raw body can be logged even if
                    # JSON parsing fails.
                    response_text = await response.text()
                    self.log(f"Flowise response status: {response.status}")
                    self.log(f"Flowise response text: {response_text}")

                    if response.status != 200:
                        self.log(
                            f"Flowise API call failed with status: {response.status}"
                        )
                        return f"Error: Flowise API call failed with status {response.status}"

                    try:
                        data = json.loads(response_text)
                        self.log(f"Parsed Flowise response data: {data}")
                    except json.JSONDecodeError:
                        self.log("Failed to decode JSON from Flowise's response.")
                        return "Error: Invalid JSON response from Flowise."

                    raw_text = data.get("text", "")
                    text = self.clean_response_text(raw_text)
                    # Keep the old chat_id when the response carries none.
                    new_chat_id = data.get("chatId", chat_id)

                    if not text:
                        self.log("No valid text found in Flowise's response.")
                        return "Error: Empty response from Flowise."

                    self.log(f"Extracted text from Flowise: {text!r}")
                    self.log(f"New chat ID from Flowise: {new_chat_id}")

                    # Update (or create) the per-user chat session.
                    if user_id not in self.chat_sessions:
                        self.chat_sessions[user_id] = {"chat_id": None, "history": []}
                    self.chat_sessions[user_id]["chat_id"] = new_chat_id

                    # Record BOTH turns. Previously only the assistant reply
                    # was stored, leaving the history without user context.
                    history = self.chat_sessions[user_id]["history"]
                    history.append({"role": "user", "content": question})
                    history.append({"role": "assistant", "content": text})

                    return text

        except Exception as e:
            # Broad catch by design: the pipe returns error strings rather
            # than raising (network errors, timeouts, etc. land here).
            self.log(f"Error during Flowise request handling: {str(e)}")
            return f"Error: {e}"