"""
title: OpenAI Compatible Endpoints Pipe for AnythingLLM
author: halu
version: 0.1.0
license: MIT
last_updated:2024-11-19
description:
This Python code serves as a pipe to connect OpenAI Compatible Endpoints with the AnythingLLM API,
enabling seamless interaction and utilization of the services provided by AnythingLLM within an OpenAI-like framework.
It also relays both streaming and non-streaming responses between AnythingLLM and Open WebUI through the OpenAI-compatible endpoint.
"""
import os
import json
from pydantic import BaseModel, Field
import requests
import time
from typing import List, Union, Iterator
# Set DEBUG to True to enable detailed logging: Pipe.pipe then prints the
# outgoing model/messages and any exception it catches.
DEBUG: bool = False
class Pipe:
    """Manifold pipe that forwards chat requests to an AnythingLLM endpoint.

    The pipe converts the incoming request body into an OpenAI-style
    chat-completions payload, posts it to ``valves.AnythingLLM_url`` and
    returns either the complete reply (non-streaming) or a generator of
    text fragments (streaming).
    """

    class Valves(BaseModel):
        # Full URL of the AnythingLLM OpenAI-compatible chat-completions endpoint.
        AnythingLLM_url: str = Field(
            default="http://host.docker.internal:3001/api/v1/openai/chat/completions"
        )
        # API key, sent upstream as a Bearer token.
        AnythingLLM_API_KEY: str = Field(default="")
        # Model name sent upstream (the AnythingLLM workspace name).
        DEFAULT_MODEL: str = Field(default="")

    # (connect, read) timeouts in seconds. A blocking completion only answers
    # once the whole reply is generated, so it gets a longer read timeout than
    # a stream, where the read timeout applies between chunks.
    _STREAM_TIMEOUT = (2, 30)
    _BLOCKING_TIMEOUT = (2, 300)

    def __init__(self):
        """Set the pipe identity and load valve values from the environment."""
        self.id = "finance"
        self.type = "manifold"
        self.name = "MyFinance: "
        self.valves = self.Valves(
            AnythingLLM_url=os.getenv(
                "AnythingLLM_url",
                "http://host.docker.internal:3001/api/v1/openai/chat/completions",
            ),
            AnythingLLM_API_KEY=os.getenv("AnythingLLM_API_KEY", ""),
            DEFAULT_MODEL=os.getenv("AnythingLLM_DEFAULT_MODEL", ""),
        )

    def get_AnythingLLM_models(self):
        """Return available models - for AnythingLLM we'll return a fixed list"""
        return [{"id": "finance", "name": "finance"}]

    def pipes(self) -> List[dict]:
        """Return the list of models this manifold exposes."""
        return self.get_AnythingLLM_models()

    @staticmethod
    def _flatten_content(parts):
        """Join a list of OpenAI content parts into a single text string.

        Text parts contribute their text; image parts contribute an
        ``[Image: <url>]`` placeholder. Parts of any other type, or with no
        ``type`` key, are skipped.
        """
        text_parts = []
        for part in parts:
            kind = part.get("type")
            if kind == "text":
                text_parts.append(part["text"])
            elif kind == "image_url":
                # Images are not forwarded; leave a textual note instead.
                text_parts.append(f"[Image: {part['image_url']['url']}]")
        return " ".join(text_parts)

    def pipe(self, body: dict) -> Union[str, Iterator[str]]:
        """Forward a chat request to AnythingLLM.

        Args:
            body: OpenAI-style request body. ``messages`` is required;
                ``stream``, ``temperature`` and ``stop`` are optional.

        Returns:
            A generator of text fragments when ``body["stream"]`` is truthy,
            otherwise the content of the first choice's message. Any failure
            is reported as an ``"Error: ..."`` string instead of being raised.
        """
        try:
            # Use default model ID since AnythingLLM has a single endpoint
            model_id = self.valves.DEFAULT_MODEL

            # Normalise every message (system, user, assistant) to plain text.
            messages = []
            for message in body["messages"]:
                content = message.get("content")
                if isinstance(content, list):
                    content = self._flatten_content(content)
                messages.append({"role": message["role"], "content": content})

            if DEBUG:
                print("OpenAI API request:")
                print("  Model:", model_id)
                print("  Messages:", json.dumps(messages, indent=2))

            url = self.valves.AnythingLLM_url
            headers = {
                "accept": "*/*",
                "Authorization": f"Bearer {self.valves.AnythingLLM_API_KEY}",
                "Content-Type": "application/json",
            }

            # Resolve the flag once so the payload sent upstream and the way
            # the reply is read can never disagree.
            stream = bool(body.get("stream", False))
            data = {
                "messages": messages,
                "model": model_id,
                "stream": stream,
                "temperature": body.get("temperature", 0.7),
            }
            # Add stop sequences if provided
            if body.get("stop"):
                data["stop"] = body["stop"]

            if stream:
                return self.stream_response(url, headers, data)

            response = requests.post(
                url,
                headers=headers,
                json=data,
                timeout=self._BLOCKING_TIMEOUT,
            )
            # Surface HTTP failures as such rather than as a KeyError below.
            response.raise_for_status()
            return response.json()["choices"][0]["message"].get("content")
        except Exception as e:
            if DEBUG:
                print(f"Error in pipe method: {e}")
            return f"Error: {e}"

    def stream_response(self, url, headers, data):
        """Post a streaming request and yield text fragments as they arrive.

        Args:
            url: Endpoint to post to.
            headers: HTTP headers for the request.
            data: JSON-serialisable request payload.

        Yields:
            Each non-empty ``delta.content`` string from the response's
            ``data: `` lines. On failure a single ``"Error: ..."`` string is
            yielded instead of raising.
        """
        try:
            # TLS certificates are verified (the requests default), matching
            # the non-streaming path.
            with requests.post(
                url,
                headers=headers,
                json=data,
                stream=True,
                timeout=self._STREAM_TIMEOUT,
            ) as response:
                if response.status_code != 200:
                    raise Exception(
                        f"HTTP Error {response.status_code}: {response.text}"
                    )
                for raw_line in response.iter_lines():
                    if not raw_line:
                        continue
                    line = raw_line.decode("utf-8")
                    if not line.startswith("data: "):
                        continue
                    payload = line[6:]
                    # End-of-stream sentinel; it is not JSON.
                    if payload.strip() == "[DONE]":
                        break
                    try:
                        chunk = json.loads(payload)
                    except json.JSONDecodeError:
                        print(f"Failed to parse JSON: {line}")
                        continue
                    try:
                        content = chunk["choices"][0]["delta"].get("content")
                    except (KeyError, IndexError, TypeError, AttributeError) as e:
                        # A malformed or choice-less chunk must not end the stream.
                        print(f"Unexpected data structure: {e}")
                        print(f"Full data: {chunk}")
                        continue
                    if content:
                        yield content
                        # Brief pause between emitted fragments.
                        time.sleep(0.01)
        except requests.exceptions.RequestException as e:
            print(f"Request failed: {e}")
            yield f"Error: Request failed: {e}"
        except Exception as e:
            print(f"General error in stream_response method: {e}")
            yield f"Error: {e}"