"""
title: CoT Augmented Smart LLM
author: yage
description: This pipe augments an LLM with Chain of Thought (CoT) reasoning and context-window management: it first generates an outline for the input, then answers each part of the outline separately and concatenates the results.
author_url: https://github.com/grapoet/
funding_url: https://github.com/open-webui
version: 0.0.2
required_open_webui_version: 0.3.17
requirements: openai
license: MIT
"""
import json
from time import time
from typing import AsyncGenerator, Generator, Iterator, List, Tuple, Union
from pydantic import BaseModel, Field
import asyncio
from openai import AsyncOpenAI
from open_webui.utils.misc import pop_system_message
DECOMPOSE_PROMPT_TEMPLATE = """Here is the background information:
{background}
Your task is "{system_prompt}"
Overall you need to respond to the following requests:
{overall_prompts}
But we will divide and conquer and answer one part at a time; the results will be concatenated together at the end. Treat each part as a second-level section and organize the response in Markdown accordingly.
Now focus on this part:
{current_prompt}
Bear in mind that the other parts will be covered in other turns, so avoid repeating information. Be specific, detailed, and insightful, and give deep, thought-provoking answers.
Now answer the question: {system_prompt} focusing on the part: {current_prompt}. Answer in the same language as the question. Don't wrap the answer in any code blocks.
"""
class Pipe:
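    # Valves are the user-tunable settings for this pipe; they can be edited from
    # the function's valves panel in the Open WebUI admin UI.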
class Valves(BaseModel):
base_url: str = Field(
            # Use this if Open WebUI runs in Docker and Ollama runs on the host; use localhost if not using Docker.
# default="http://host.docker.internal:11434/v1",
# Use this if you are running a vLLM server outside Docker.
# default="http://192.168.180.137:8006/v1",
# Default is OpenAI's API
default="https://api.openai.com/v1",
description="The base URL for the LLM. It could be OpenAI or Ollama or vLLM.",
)
api_key: str = Field(
default="Put your OpenAI API key here.",
description="The API key to use for the LLM.",
)
my_model_id: str = Field(
# default="qwen2.5:32b",
# default="Qwen/Qwen2.5-32B-Instruct-AWQ",
default="gpt-4o",
description="The model id to use for the pipe.",
)
def __init__(self):
self.type = "manifold"
self.id = "yage/"
self.name = "CoT Augmented"
self.valves = Pipe.Valves()
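        # Reserved for context-window management: splitting over-long content into
        # chunks. It is not referenced elsewhere in this pipe yet.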
        self.split_template = """Because the content is too long, let's divide and conquer. Your answers will be concatenated together to form the final answer. So if your part is the first part of the final answer, don't add any explanation after the answer. If your part is the last part of the final answer, don't put anything before the answer. For other parts, don't explain and only answer the question.
Focus on this part:
{content}"""
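    # Open WebUI prepends a manifold's display name to each entry it returns, so
    # the leading "/" below is intended to make the model list read like
    # "CoT Augmented/<model_id>" (assumption based on current Open WebUI behavior).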
def pipes(self) -> List[dict]:
return [{"id": "yage/yage-outline", "name": f"/{self.valves.my_model_id}"}]
async def pipe(
self,
body: dict,
__event_emitter__=None,
) -> Union[str, Generator, Iterator]:
self.client = AsyncOpenAI(
base_url=self.valves.base_url, api_key=self.valves.api_key
)
system_message, messages = pop_system_message(body["messages"])
        if __event_emitter__:
            await __event_emitter__(
                {
                    "type": "status",
                    "data": {"description": "Generating the outline", "done": False},
                }
            )
        processed_messages, flattened_messages = self.parse_messages(messages)
        # Take the latest message's content directly; splitting the flattened text
        # on blank lines would truncate multi-paragraph messages.
        flattened_last_message = (
            processed_messages[-1]["content"] if processed_messages else ""
        )
# Generate the outline for the whole content
actual_prompt = f"""{flattened_messages}
But don't answer it yet. Before you answer the question, think about how to answer it systematically: what key aspects or questions would you need to cover? Don't answer the question, just list out the questions in markdown. Each question should be there because it is important, deep and insightful, or thought-provoking, and the questions may have a hierarchical structure. Arrange them in a logical flow so they are convincing and easy to digest.
Now list out the key questions in markdown in the same language as the question. Hierarchical structures are allowed. Output in JSON format, with the following fields:
- tasks: a list of sub-questions, each as a string. It's OK to include escaped newlines (\\n) in the sub-question string, e.g. to use markdown formatting with hierarchical structure. As a rule of thumb, each task should be explainable clearly within 500 words.
Here is the question:
{flattened_last_message}"""
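        # Illustrative example of the JSON shape requested above:
        #   {"tasks": ["## Background", "## Key idea\n- ...", "## Implications"]}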
tasks_response = await self.query_llm_server_async(
actual_prompt, self.valves.my_model_id
)
        # Extract the JSON object from the response (the model may wrap it in prose).
        try:
            tasks = json.loads(
                tasks_response[tasks_response.find("{") : tasks_response.rfind("}") + 1]
            )["tasks"]
        except Exception as e:
            print(f"Error parsing tasks response: {e}")
            tasks = []
        if not tasks:
            # Fall back to treating the whole question as a single task.
            tasks = [flattened_last_message]
        tasks_text = "# Outline\n\n" + "\n\n".join(tasks) + "\n\n# Actual answer\n\n"
        if __event_emitter__:
            await __event_emitter__(
                {
                    "type": "message",
                    "data": {"content": tasks_text},
                }
            )
            await __event_emitter__(
                {
                    "type": "status",
                    "data": {"description": "Generating the answers...", "done": False},
                }
            )
        # Answer each part of the outline with concurrent LLM calls and stream
        # the results back in outline order.
responses = []
async for chunk in self.decompose_batch_query_llm_server_async(
system_prompt=flattened_last_message,
background=flattened_messages,
prompts=tasks,
model_name=self.valves.my_model_id,
):
if body.get("stream", False):
yield chunk
else:
responses.append(chunk)
        if not body.get("stream", False):
            # Join with no separator: the chunks are raw token deltas, so adding
            # spaces would corrupt the text.
            yield "".join(responses)
        if __event_emitter__:
            await __event_emitter__(
                {
                    "type": "status",
                    "data": {"description": "Done", "done": True},
                }
            )
def parse_messages(self, messages) -> Tuple[List[dict], str]:
"""
Parse the messages to conform to the LLM's expected format.
Returns:
processed_messages: List[dict] - The processed messages.
"""
processed_messages = []
for message in messages:
processed_content = []
if isinstance(message.get("content"), list):
for item in message["content"]:
if item["type"] == "text":
processed_content.append({"type": "text", "text": item["text"]})
else:
processed_content = [
{"type": "text", "text": message.get("content", "")}
]
            processed_messages.append(
                {
                    "role": message["role"],
                    # Guard against content lists that contain no text items.
                    "content": processed_content[0]["text"] if processed_content else "",
                }
            )
flattened_messages = "\n\n".join(
[
f'{message["role"]}: {message["content"]}'
for message in processed_messages
]
)
return processed_messages, flattened_messages
async def query_llm_server_async(
self, prompt: str, model_name: str, **kwargs
) -> str:
"""
Asynchronously queries a local LLM server with a given prompt and returns the response.
This version is not streaming.
"""
response = ""
async for chunk in self.query_llm_server_async_streaming(
prompt, model_name, **kwargs
):
response += chunk
return response
async def query_llm_server_async_streaming(
self, prompt: str, model_name: str, **kwargs
    ) -> AsyncGenerator[str, None]:
        """
        Queries the configured LLM server with a given prompt and streams back the response.
        Args:
            prompt (str): The input prompt for the LLM.
            model_name (str): The name of the LLM model to use.
            **kwargs: Additional keyword arguments to pass to the OpenAI API.
        Yields:
            str: The next chunk of the response from the LLM server.
        """
response = await self.client.chat.completions.create(
model=model_name,
messages=[{"role": "user", "content": prompt}],
stream=True,
**kwargs,
)
        async for chunk in response:
            # Some providers send keep-alive or usage chunks with no choices.
            if not chunk.choices:
                continue
            yield chunk.choices[0].delta.content or ""
async def decompose_batch_query_llm_server_async(
self,
system_prompt: str,
background: str,
prompts: List[str],
model_name: str,
    ) -> AsyncGenerator[str, None]:
        """
        Asynchronously queries the configured LLM server with a batch of prompts and
        yields the responses as one continuous stream, one prompt after another in
        outline order.
        """
overall_prompts = "\n\n".join(prompts)
# Create a list of tasks for each prompt
tasks = []
for prompt in prompts:
actual_prompt = DECOMPOSE_PROMPT_TEMPLATE.format(
system_prompt=system_prompt,
overall_prompts=overall_prompts,
current_prompt=prompt,
background=background if background is not None else "None",
)
task = asyncio.create_task(
self.client.chat.completions.create(
model=model_name,
messages=[{"role": "user", "content": actual_prompt}],
stream=True,
)
)
tasks.append(task)
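        # All requests are now in flight; consume them one at a time, in prompt
        # order, so the concatenated answer follows the outline.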
        for task in tasks:
            # Accumulate the first second of this part's output before emitting
            # anything, then stream deltas as they arrive.
            buffer = ""
            initial_time = time()
            is_accumulating = True
async for chunk in await task:
if not chunk.choices:
continue
delta_content = chunk.choices[0].delta.content
if delta_content is None:
continue
if is_accumulating:
buffer += delta_content
if time() - initial_time > 1:
is_accumulating = False
yield buffer
buffer = ""
else:
yield delta_content
if buffer:
yield buffer
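            # Separate this part's answer from the next one.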
yield "\n"