Function
pipe
v0.1.0
AnythingLLM Pipe
OpenAI Compatible Endpoints Pipe for AnythingLLM
Function ID
anythingllm_pipe
Creator
@halu
Downloads
50+

Function Content
python
"""
title: OpenAI Compatible Endpoints Pipe for AnythingLLM
author: halu
version: 0.1.0
license: MIT
last_updated: 2024-11-19
description:
This Python code serves as a pipe to connect OpenAI Compatible Endpoints with the AnythingLLM API, 
enabling seamless interaction and utilization of the services provided by AnythingLLM within an OpenAI-like framework.
Additionally, it handles streaming and non-streaming message transmission between AnythingLLM and Open WebUI via these OpenAI Compatible Endpoints.
"""

import os
import json
from pydantic import BaseModel, Field
import requests
import time
from typing import List, Union, Iterator

# Set DEBUG to True to enable detailed logging
DEBUG = False


class Pipe:
    """Open WebUI manifold pipe that forwards chat completions to an
    AnythingLLM OpenAI-compatible endpoint.

    Supports both streaming (SSE) and non-streaming responses, and flattens
    OpenAI-style multi-part message content into plain text before forwarding.
    On failure, methods return an ``"Error: ..."`` string rather than raising,
    matching Open WebUI pipe conventions.
    """

    class Valves(BaseModel):
        # Full URL of the AnythingLLM OpenAI-compatible chat-completions endpoint.
        AnythingLLM_url: str = Field(
            default="http://host.docker.internal:3001/api/v1/openai/chat/completions"
        )
        # API key for AnythingLLM; sent as a Bearer token.
        AnythingLLM_API_KEY: str = Field(default="")
        # Default model name (the AnythingLLM workspace name) used for every request.
        DEFAULT_MODEL: str = Field(default="")

    def __init__(self):
        self.id = "finance"
        self.type = "manifold"
        self.name = "MyFinance: "
        # Environment variables override the declared field defaults.
        self.valves = self.Valves(
            AnythingLLM_url=os.getenv(
                "AnythingLLM_url",
                "http://host.docker.internal:3001/api/v1/openai/chat/completions",
            ),
            AnythingLLM_API_KEY=os.getenv("AnythingLLM_API_KEY", ""),
            DEFAULT_MODEL=os.getenv("AnythingLLM_DEFAULT_MODEL", ""),
        )

    def get_AnythingLLM_models(self) -> List[dict]:
        """Return the available models; AnythingLLM exposes a single fixed entry."""
        return [{"id": "finance", "name": "finance"}]

    def pipes(self) -> List[dict]:
        """Open WebUI hook: list the models provided by this manifold."""
        return self.get_AnythingLLM_models()

    @staticmethod
    def _flatten_messages(raw_messages: list) -> List[dict]:
        """Collapse OpenAI-style multi-part message content into plain text.

        Text parts are joined with single spaces; image parts are replaced by
        a textual ``[Image: <url>]`` placeholder, since the downstream
        endpoint may not accept image inputs.
        """
        messages = []
        for message in raw_messages:
            if isinstance(message.get("content"), list):
                text_parts = []
                for part in message["content"]:
                    if part["type"] == "text":
                        text_parts.append(part["text"])
                    elif part["type"] == "image_url":
                        text_parts.append(f"[Image: {part['image_url']['url']}]")
                messages.append(
                    {"role": message["role"], "content": " ".join(text_parts)}
                )
            else:
                # Simple text message: forward role/content unchanged.
                messages.append(
                    {"role": message["role"], "content": message["content"]}
                )
        return messages

    def pipe(self, body: dict) -> Union[str, Iterator[str]]:
        """Send the chat request in *body* to AnythingLLM.

        Returns the assistant reply as a string for non-streaming requests,
        or an iterator of text chunks when ``body["stream"]`` is true.
        On any failure, returns an ``"Error: ..."`` string instead of raising.
        """
        try:
            # AnythingLLM has a single endpoint; always use the configured model.
            model_id = self.valves.DEFAULT_MODEL

            messages = self._flatten_messages(body["messages"])

            if DEBUG:
                print("OpenAI API request:")
                print("  Model:", model_id)
                print("  Messages:", json.dumps(messages, indent=2))

            url = self.valves.AnythingLLM_url

            headers = {
                "accept": "*/*",
                "Authorization": f"Bearer {self.valves.AnythingLLM_API_KEY}",
                "Content-Type": "application/json",
            }

            # BUG FIX: the original defaulted "stream" to True in the payload
            # but to False in the branch below, so a request without a
            # "stream" key asked the server for SSE and then tried to parse
            # the SSE body as a single JSON document. Use one consistent value.
            stream = bool(body.get("stream", False))

            data = {
                "messages": messages,
                "model": model_id,
                "stream": stream,
                "temperature": body.get("temperature", 0.7),
            }

            # Forward stop sequences only when the caller supplied them.
            if body.get("stop"):
                data["stop"] = body["stop"]

            if stream:
                return self.stream_response(url, headers, data)

            # Non-streaming: add a (connect, read) timeout and surface HTTP
            # errors immediately instead of failing later on an unexpected body.
            response = requests.post(url, headers=headers, json=data, timeout=(2, 30))
            response.raise_for_status()
            return response.json()["choices"][0]["message"].get("content")

        except Exception as e:
            if DEBUG:
                print(f"Error in pipe method: {e}")
            return f"Error: {e}"

    # streaming requests
    def stream_response(self, url, headers, data) -> Iterator[str]:
        """Yield content chunks from an SSE streaming chat-completions response.

        Parses ``data: {...}`` lines, skips the ``[DONE]`` end-of-stream
        sentinel, and yields each non-empty ``choices[0].delta.content``
        fragment. Request failures yield a single ``"Error: ..."`` string.
        """
        try:
            # NOTE(review): verify=False disables TLS certificate checks.
            # Tolerable for the default local Docker endpoint, but unsafe if
            # AnythingLLM_url points at a remote host — confirm before reuse.
            with requests.post(
                url,
                headers=headers,
                json=data,
                stream=True,
                timeout=(2, 30),
                verify=False,
            ) as response:
                if response.status_code != 200:
                    raise Exception(
                        f"HTTP Error {response.status_code}: {response.text}"
                    )

                for raw_line in response.iter_lines():
                    if not raw_line:
                        continue
                    line = raw_line.decode("utf-8")
                    if not line.startswith("data: "):
                        continue
                    payload = line[6:]
                    # OpenAI-style streams terminate with a "[DONE]" sentinel,
                    # which is not JSON — stop cleanly instead of logging a
                    # spurious parse failure.
                    if payload.strip() == "[DONE]":
                        break
                    try:
                        # Distinct name avoids shadowing the request payload
                        # `data` (the original reused the same variable).
                        chunk = json.loads(payload)
                        content = chunk["choices"][0]["delta"].get("content")
                        if content:
                            yield content
                        time.sleep(0.01)  # gently throttle chunk delivery
                    except json.JSONDecodeError:
                        print(f"Failed to parse JSON: {line}")
                    except KeyError as e:
                        print(f"Unexpected data structure: {e}")
                        print(f"Full data: {chunk}")
        except requests.exceptions.RequestException as e:
            print(f"Request failed: {e}")
            yield f"Error: Request failed: {e}"
        except Exception as e:
            print(f"General error in stream_response method: {e}")
            yield f"Error: {e}"