"""
title: Mixture of Expert Agents
author: techelpr
version: 1.0
required_open_webui_version: 0.3.9
"""
from pydantic import BaseModel, Field
from typing import Optional, List
import requests
import json


class Filter:
    class Valves(BaseModel):
        """
        Define the default values for each valve.
        """

        models: List[str] = Field(
            default=[], description="List of models to use in the MoEA architecture."
        )
        openai_api_base: str = Field(
            default="http://host.docker.internal:11434/v1",
            description="Base URL for the Ollama OpenAI-compatible API.",
        )
        num_layers: int = Field(default=1, description="Number of MoEA layers.")
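
        # Note: these defaults are only fallbacks. In Open WebUI the valves are
        # typically edited from the function's settings, so the model list and
        # API base do not need to be hard-coded here.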

    def __init__(self):
        """
        Initialize the Filter object.
        """
        self.valves = self.Valves()

    # Function: Inlet
    # Description: Processes incoming messages and applies the Multi-Agent architecture.
    def inlet(self, body: dict, user: Optional[dict] = None) -> dict:
        """
        Process incoming messages and apply the Multi-Agent architecture.

        Args:
            body (dict): The message to be processed.
            user (Optional[dict], optional): User information. Defaults to None.

        Returns:
            dict: The processed message.
        """
        messages = body.get("messages", [])
        if messages:
            last_message = messages[-1]["content"]
            moa_response = self.moa_process(last_message)
            body["messages"][-1]["content"] = moa_response
        return body
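
    # A typical body received by inlet() looks roughly like:
    #   {"model": "...", "messages": [{"role": "user", "content": "..."}]}
    # Only the content of the last message is rewritten; the rest of the payload
    # passes through unchanged.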

    # Function: Agent Prompt
    # Description: Create a prompt for the agents and the aggregator.
    def agent_prompt(self, original_prompt: str, previous_responses: List[str]) -> str:
        """
        Create a prompt for the agents and the aggregator.

        Args:
            original_prompt (str): The original prompt.
            previous_responses (List[str]): Previous responses from agents.

        Returns:
            str: The prompt for the agent or aggregator.
        """
        # Join the earlier agent responses so they read as plain text rather than
        # as a Python list literal.
        thoughts = "\n".join(previous_responses)
        return f"*Internal Thoughts:* \n{thoughts}\n\n*Prompt:* \n{original_prompt}"

    # Function: MoA Process
    # Description: Applies the Multi-Agent architecture to a given prompt.
    def moa_process(self, prompt: str) -> str:
        """
        Apply the Multi-Agent architecture to a given prompt.

        Args:
            prompt (str): The original user prompt.

        Returns:
            str: The aggregation prompt built from all agent responses.
        """
        layer_outputs = []
        if not self.valves.models or not self.valves.openai_api_base:
            return "Error: Required valve(s) not set."
        for layer in range(self.valves.num_layers):
            current_layer_outputs = []
            layer_agents = self.valves.models
            for agent in layer_agents:
                if layer == 0:
                    # First layer: agents see only the original prompt.
                    instruct_prompt = prompt
                else:
                    # Later layers: agents also see the previous layer's responses.
                    instruct_prompt = self.agent_prompt(prompt, layer_outputs[-1])
                response = self.query_ollama(agent, instruct_prompt)
                current_layer_outputs.append(response)
            layer_outputs.append(current_layer_outputs)
        # Simplify agent prompts and combine responses into a single dataset
        merged_responses = []
        for layer_responses in layer_outputs:
            merged_responses.extend(layer_responses)
        # Create a final aggregation prompt for the requesting model
        final_prompt = "*Guiding Principles:*\n"
        final_prompt += "Consider each internal thought as a potential piece of information to incorporate into my response.\n"
        final_prompt += "The internal thoughts provided are for your use only, and should never be referenced explicitly or mentioned in your response, unless directed by the prompt.\n"
        final_prompt += "My goal is to provide a complete and detailed reply that addresses the original prompt and incorporates relevant information in a seamless manner.\n\n"
        final_prompt += self.agent_prompt(prompt, merged_responses)
        return final_prompt

    # Function: Query Ollama
    # Description: Queries the Ollama API for a given model and prompt.
    def query_ollama(self, model: str, prompt: str) -> str:
        """
        Query the Ollama API for a given model and prompt.

        Args:
            model (str): The model to query.
            prompt (str): The prompt to be queried.

        Returns:
            str: The response from the Ollama API.
        """
        try:
            url = f"{self.valves.openai_api_base}/chat/completions"
            headers = {"Content-Type": "application/json"}
            data = {"model": model, "messages": [{"role": "user", "content": prompt}]}
            response = requests.post(url, headers=headers, data=json.dumps(data))
            response.raise_for_status()
            return response.json()["choices"][0]["message"]["content"]
        except requests.exceptions.RequestException as e:
            return f"Error: Unable to query model {model}: {e}"