"""
title: Diligent LLM
description: LLMs often perform poorly when most of the context window is occupied. This pipe handles long context by splitting the oversized message into multiple chunks, querying the model once per chunk, and concatenating the results.
authors: yage
author_url: https://github.com/grapoet/
funding_url: https://github.com/open-webui
version: 0.0.2
required_open_webui_version: 0.3.17
requirements: openai, tiktoken
license: MIT
"""
import asyncio
from typing import List, Union, Generator, Iterator, Tuple
from pydantic import BaseModel, Field
from openai import AsyncOpenAI, OpenAIError
from open_webui.utils.misc import pop_system_message
from time import time
import tiktoken
class Pipe:
class Valves(BaseModel):
base_url: str = Field(
# Default is OpenAI's API
default="https://api.openai.com/v1",
            # Use this if Open WebUI runs in Docker and Ollama runs on the host machine;
            # use localhost instead of host.docker.internal if not using Docker.
            # default="http://host.docker.internal:11434/v1",
            # Use this if you are running a vLLM server outside Docker.
            # default="http://192.168.180.137:8006/v1",
            description="The base URL of an OpenAI-compatible API: OpenAI, Ollama, or vLLM.",
)
api_key: str = Field(
default="Put your OpenAI API key here.",
description="The API key to use for the LLM.",
)
my_model_id: str = Field(
# default="qwen2.5:32b",
# default="Qwen/Qwen2.5-32B-Instruct-AWQ",
default="gpt-4o",
description="The model id to use for the pipe.",
)
message_token_limit: int = Field(
default=2000,
description="The maximum number of tokens in a chunk, used when splitting the messages into chunks.",
)
parallel_requests: bool = Field(
default=False,
description="Whether to send multiple requests in parallel. It will make the response appear slower, but it will be much faster after the first chunk.",
)
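        # Example valve settings for a local Ollama server (illustrative, mirroring the
        # commented-out defaults above): base_url="http://host.docker.internal:11434/v1",
        # my_model_id="qwen2.5:32b"; api_key is ignored by most local servers but the
        # OpenAI client still requires a non-empty value.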
def __init__(self):
self.type = "pipe"
self.id = "yage/"
self.name = "Long-Context"
self.valves = Pipe.Valves()
        # Tokenizer used only for counting tokens when deciding whether and where to split.
        self.encoding = tiktoken.encoding_for_model("gpt-4")
self.split_template = """Because the content is too long, let's divide and conquer. Your answers will be used to concatenate together to form the final answer. So if your part is the first part of the final answer, don't add any explanation after the answer. If your part is the last part of the final answer, don't put anything before the answer. For other parts, don't explain and only answer the question.
Focus on this part:
{content}"""
def pipes(self) -> List[dict]:
        return [
            {"id": "yage/yage-long-context", "name": f"/{self.valves.my_model_id}"}
        ]
async def pipe(self, body: dict) -> Union[str, Generator, Iterator]:
self.client = AsyncOpenAI(base_url=self.valves.base_url, api_key=self.valves.api_key)
        system_message, messages = pop_system_message(body["messages"])
        processed_messages, last_message_token_count = self.parse_messages(messages)
        # Re-attach the system prompt so it is not silently dropped from the requests.
        if system_message:
            processed_messages.insert(0, {"role": "system", "content": system_message.get("content", "")})
        # If the last message exceeds message_token_limit, output quality is likely to
        # degrade, so we split the messages into chunks.
print(f'last message token count: {last_message_token_count}')
if last_message_token_count > self.valves.message_token_limit:
print(f'Token count is greater than {self.valves.message_token_limit}, splitting the messages into chunks.')
all_messages = await self.split_messages(processed_messages)
else:
all_messages = [processed_messages]
        print('all_messages:')
        for chunk_messages in all_messages:
            print(f'messages: {chunk_messages}')
if self.valves.parallel_requests:
tasks = []
for processed_messages in all_messages:
payload = {
"model": self.valves.my_model_id,
"messages": processed_messages,
"max_tokens": body.get("max_tokens", 4096),
"temperature": body.get("temperature", 0.8),
"stream": True
}
task = asyncio.create_task(self.client.chat.completions.create(**payload))
tasks.append(task)
for task in tasks:
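                # Buffer streamed deltas for roughly the first second, flush them as one
                # piece, then pass deltas through as they arrive; a newline separates the
                # answers of consecutive chunks.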
buffer = ''
initial_time = time()
is_accumulating = True
async for chunk in await task:
if not chunk.choices:
continue
delta_content = chunk.choices[0].delta.content
if delta_content is None:
continue
if is_accumulating:
buffer += delta_content
if time() - initial_time > 1:
is_accumulating = False
yield buffer
buffer = ''
else:
yield delta_content
if buffer:
yield buffer
yield '\n'
else:
for processed_messages in all_messages:
payload = {
"model": self.valves.my_model_id,
"messages": processed_messages,
"max_tokens": body.get("max_tokens", 4096),
"temperature": body.get("temperature", 0.8),
"stream": body.get("stream", False),
}
if body.get("stream", False):
async for chunk in self.stream_response(payload):
yield chunk
yield '\n'
else:
yield await self.non_stream_response(payload)
def parse_messages(self, messages) -> Tuple[List[dict], int]:
"""
Parse the messages to conform to the LLM's expected format.
Returns:
processed_messages: List[dict] - The processed messages.
last_message_token_count: int - The token count of the last message.
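        Example (illustrative):
            parse_messages([{"role": "user", "content": [{"type": "text", "text": "hi"}]}])
            -> ([{"role": "user", "content": "hi"}], 1)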
"""
processed_messages = []
last_message_token_count = 0
for message in messages:
processed_content = []
if isinstance(message.get("content"), list):
for item in message["content"]:
if item["type"] == "text":
processed_content.append({"type": "text", "text": item["text"]})
else:
processed_content = [
{"type": "text", "text": message.get("content", "")}
]
            # Join all text parts; non-text items (e.g. images) are dropped above.
            processed_messages.append(
                {"role": message["role"], "content": "\n".join(item["text"] for item in processed_content)}
            )
        # Token count of the processed last message, used to decide whether to split.
        if processed_messages:
            last_message_token_count = len(self.encoding.encode(processed_messages[-1]["content"]))
return processed_messages, last_message_token_count
async def stream_response(self, payload) -> Generator:
try:
stream = await self.client.chat.completions.create(
**payload,
)
async for chunk in stream:
if not chunk.choices:
continue
delta_content = chunk.choices[0].delta.content
if delta_content is not None:
yield delta_content
        except OpenAIError as e:
print(f"Request failed: {e}")
yield f"Error: Request failed: {e}"
except Exception as e:
print(f"General error in stream_response method: {e}")
yield f"Error: {e}"
async def non_stream_response(self, payload) -> str:
try:
response = await self.client.chat.completions.create(
**payload,
)
return response.choices[0].message.content if response.choices else ""
        except OpenAIError as e:
print(f"Failed non-stream request: {e}")
return f"Error: {e}"
async def split_messages(self, messages) -> List[List[dict]]:
"""
Splits the latest message into blocks that do not exceed the message_token_limit.
Returns:
List[List[dict]] - A list of lists where each sublist contains the previous messages plus one block of the split latest message.
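        Example (illustrative): with message_token_limit=2000 and a last message of
        roughly 5000 tokens, three conversations are returned; each keeps the earlier
        messages and ends with one split_template-wrapped block of about 2000 tokens.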
"""
last_message = messages[-1]
# Split the content of the last message into paragraphs (lines)
lines = last_message["content"].split('\n')
blocks = []
current_block_content = ""
current_block_token_count = 0
for line in lines:
line_token_count = len(self.encoding.encode(line))
            if current_block_content and current_block_token_count + line_token_count > self.valves.message_token_limit:
                # If adding the new line exceeds the limit, finalize the current block.
                # Keep the earlier conversation but replace the oversized last message
                # with a single wrapped chunk of its content.
                blocks.append(messages[:-1] + [{
                    "role": last_message["role"],
                    "content": self.split_template.format(content=current_block_content)
                }])
current_block_content = ""
current_block_token_count = 0
# Add the line to the current block
current_block_content += line + '\n'
current_block_token_count += line_token_count
# Append any remaining content in the last block
if current_block_content:
            blocks.append(messages[:-1] + [{
"role": last_message["role"],
"content": self.split_template.format(content=current_block_content)
}])
return blocks