We're Hiring!
Whitepaper
Docs
Sign In
@haervwe
·
2 months ago
·
a year ago
function
Letta Agent
Get
Last Updated
2 months ago
Created
a year ago
Function
pipe
v0.5.0
Name
Letta Agent
Downloads
315+
Saves
0+
Description
Pipe to interact with Letta (memgpt) agents
Function Code
Show
"""
title: Letta_Agent_Connector
author: Haervwe
author_url: https://github.com/Haervwe/open-webui-tools
version: 0.5.0
description: A pipe to connect with Letta agents, enabling seamless integration of autonomous agents into Open WebUI conversations. Supports task-specific processing, real-time Server-Sent Events (SSE) streaming for instant message emission, tool call handling with collapsible details blocks, configurable agent names, and maintains conversation context while communicating with the agent API. Messages now stream in real-time as they're generated by the agent.
"""

import asyncio
import json
import logging
import time
from typing import Any, Dict, List

import aiohttp
from pydantic import BaseModel, Field

from open_webui.constants import TASKS
from open_webui.main import generate_chat_completions
from open_webui.models.users import Users

name = "Letta Agent"


def setup_logger():
    """Return the module logger, attaching a DEBUG stream handler on first use.

    The handler is only added when the logger has none yet, so calling this
    more than once does not duplicate output. Propagation is disabled so the
    records are not also emitted by ancestor loggers.
    """
    logger = logging.getLogger(name)
    if not logger.handlers:
        logger.setLevel(logging.DEBUG)
        handler = logging.StreamHandler()
        handler.set_name(name)
        formatter = logging.Formatter(
            "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
        )
        handler.setFormatter(formatter)
        logger.addHandler(handler)
        logger.propagate = False
    return logger


logger = setup_logger()


class Pipe:
    """Manifold pipe that forwards the latest chat message to a Letta agent.

    The agent's reply is read as a Server-Sent Events stream and every
    recognised chunk is pushed to the chat through the event emitter as soon
    as it arrives.
    """

    class Valves(BaseModel):
        """User-configurable settings for the pipe."""

        Agent_ID: str = Field(
            default="agent-id",
            description="The ID of the Letta agent to communicate with",
        )
        Agent_Name: str = Field(
            default="Letta Agent",
            description="Display name of the agent (shown in status messages)",
        )
        API_URL: str = Field(
            default="http://localhost:8283",
            description="Base URL for the Letta agent API",
        )
        API_Token: str = Field(
            default="", description="Bearer token for API authentication"
        )
        Task_Model: str = Field(
            default="",
            description="Model to use for title/tags generation tasks. If empty, uses the default model.",
        )
        Custom_Name: str = Field(
            default="",
            description="Custom name for the pipe (if empty, uses 'Letta Agent')",
        )
        Timeout: int = Field(
            default=400,
            description="Timeout to wait for Letta agent response in seconds",
        )

    def __init__(self):
        self.type = "manifold"
        self.conversation_history = []
        self.valves = self.Valves()

    def pipes(self) -> List[Dict[str, str]]:
        """Return the single pipe entry exposed by this manifold."""
        pipe_name = self.valves.Custom_Name or name
        return [
            {
                "id": f"{name}-pipe",
                "name": f"{pipe_name} Pipe",
            }
        ]

    async def emit_message(self, message: str):
        """Send a chunk of chat content through the stored event emitter."""
        await self.__current_event_emitter__(
            {"type": "message", "data": {"content": message}}
        )

    async def emit_status(self, level: str, message: str, done: bool):
        """Send a status event; ``done`` selects "complete" vs "in_progress"."""
        await self.__current_event_emitter__(
            {
                "type": "status",
                "data": {
                    "status": "complete" if done else "in_progress",
                    "level": level,
                    "description": message,
                    "done": done,
                },
            }
        )

    async def format_messages(
        self, messages: List[Dict[str, str]]
    ) -> List[Dict[str, str]]:
        """Format messages according to the Letta API specification.

        Only ``user`` and ``system`` messages are kept, reduced to their
        ``role`` and ``content`` keys. If nothing survives the filter, a
        single ``{"role": "user", "content": "Hello"}`` message is returned.
        """
        formatted_messages = [
            {"role": msg.get("role", "user"), "content": msg.get("content", "")}
            for msg in messages
            if msg.get("role") in ("user", "system")
        ]

        # Ensure we have at least one message
        if not formatted_messages:
            formatted_messages.append({"role": "user", "content": "Hello"})

        logger.debug(f"Formatted messages: {json.dumps(formatted_messages, indent=2)}")
        return formatted_messages

    @staticmethod
    def _as_text(value: Any) -> str:
        """Coerce a chunk field to stripped text.

        Accepts ``None`` (-> ""), a string, or a list of content parts whose
        dict items carry a ``text`` key; anything else is passed through
        ``str``. This keeps a null or structured field from raising
        ``AttributeError`` on ``.strip()``.
        """
        if value is None:
            return ""
        if isinstance(value, str):
            return value.strip()
        if isinstance(value, list):
            parts = []
            for item in value:
                if isinstance(item, str):
                    parts.append(item)
                elif isinstance(item, dict) and isinstance(item.get("text"), str):
                    parts.append(item["text"])
            return "".join(parts).strip()
        return str(value).strip()

    def _render_reasoning(self, chunk: Dict[str, Any], elapsed: int) -> str:
        """Render a reasoning chunk as a collapsible block, or "" if empty."""
        reasoning = self._as_text(chunk.get("reasoning"))
        if not reasoning:
            return ""
        # Prefix every line so multi-line reasoning stays inside the blockquote.
        quoted = "\n> ".join(reasoning.splitlines())
        return (
            "<details>\n"
            f"<summary>Thought for {elapsed} seconds</summary>\n\n"
            f"> {quoted}\n\n"
            "</details>\n\n"
        )

    @staticmethod
    def _render_tool_call(chunk: Dict[str, Any]) -> str:
        """Render a tool-call chunk as a collapsible block with its arguments."""
        tool_call = chunk.get("tool_call")
        if not isinstance(tool_call, dict):
            # Covers both a missing key and an explicit null.
            tool_call = {}
        tool_name = tool_call.get("name", chunk.get("tool_name", "unknown_tool"))
        tool_args = tool_call.get("arguments", chunk.get("tool_arguments", {}))

        # Parse arguments if they're a JSON string
        if isinstance(tool_args, str):
            try:
                tool_args = json.loads(tool_args)
            except (json.JSONDecodeError, TypeError):
                pass

        call_message = f"<details>\n<summary>🔧 Calling tool: {tool_name}</summary>\n\n"
        if tool_args and isinstance(tool_args, dict):
            args_str = json.dumps(tool_args, indent=2)
            call_message += f"**Arguments:**\n```json\n{args_str}\n```\n\n"
        call_message += "</details>\n\n"
        return call_message

    @staticmethod
    def _render_tool_return(chunk: Dict[str, Any]) -> str:
        """Render a tool-return chunk as a collapsible block, or "" if empty.

        A JSON object result is reduced to its ``message``/``result``/
        ``content``/``output`` value (first truthy one), a JSON array is
        comma-joined, and anything else is shown verbatim.
        """
        tool_call_id = chunk.get("tool_call_id") or ""
        tool_name = chunk.get("name") or ""
        if not tool_name:
            tool_name = str(tool_call_id).split("-")[0] if tool_call_id else "tool"

        raw = chunk.get("tool_return")
        if raw is None:
            return ""
        if isinstance(raw, str):
            content: Any = raw.strip()
            if not content:
                return ""
            # Try to parse and extract meaningful content
            try:
                parsed = json.loads(content)
            except (json.JSONDecodeError, TypeError):
                parsed = None
        else:
            # Already-structured value: summarise it the same way as parsed JSON.
            parsed = raw
            content = str(raw)

        if isinstance(parsed, dict):
            content = (
                parsed.get("message")
                or parsed.get("result")
                or parsed.get("content")
                or parsed.get("output")
                or str(parsed)
            )
        elif isinstance(parsed, list):
            content = ", ".join(str(item) for item in parsed)

        return (
            "<details>\n"
            f"<summary>📊 {tool_name} result</summary>\n\n"
            f"{content}\n\n"
            "</details>\n\n"
        )

    def _render_chunk(self, chunk: Dict[str, Any], elapsed: int) -> str:
        """Turn one decoded SSE chunk into the text to emit ("" to emit nothing)."""
        msg_type = chunk.get("message_type")
        logger.debug(f"Received SSE chunk type: {msg_type}")

        if msg_type == "reasoning_message":
            return self._render_reasoning(chunk, elapsed)
        if msg_type == "tool_call_message":
            return self._render_tool_call(chunk)
        if msg_type == "tool_return_message":
            return self._render_tool_return(chunk)
        if msg_type == "assistant_message":
            content = self._as_text(chunk.get("content"))
            if content:
                logger.debug(f"Emitting assistant message: {content}")
            return content
        # Other message types are ignored.
        return ""

    async def get_letta_response(self, message: Dict[str, str]) -> str:
        """
        Send the user message and stream the response in real-time.
        Uses Server-Sent Events (SSE) streaming endpoint for progressive message emission.

        Args:
            message: The single message dict posted to the agent.

        Returns:
            Always "" -- every chunk is emitted through the event emitter as
            it arrives.

        Raises:
            ValueError: The API answered 422; the response body is included.
            aiohttp.ClientResponseError: Any other non-success HTTP status.
            aiohttp.ClientError, asyncio.TimeoutError: Transport failures or
                the ``Timeout`` valve being exceeded.
        """
        start_time = time.monotonic()
        headers = {
            "Content-Type": "application/json",
            "Accept": "text/event-stream",
        }
        # Only send credentials when a token is configured.
        if self.valves.API_Token:
            headers["Authorization"] = f"Bearer {self.valves.API_Token}"
        data = {"messages": [message]}

        # Use the streaming endpoint for real-time responses.
        # A trailing slash on the configured base URL would yield "//v1/...".
        base_url = self.valves.API_URL.rstrip("/")
        url = f"{base_url}/v1/agents/{self.valves.Agent_ID}/messages/stream"
        timeout = aiohttp.ClientTimeout(total=self.valves.Timeout)

        async with aiohttp.ClientSession(timeout=timeout) as session:
            async with session.post(url, headers=headers, json=data) as response:
                if response.status == 422:
                    text = await response.text()
                    logger.error(f"API Validation Error. Response: {text}")
                    raise ValueError(f"API Validation Error: {text}")
                response.raise_for_status()

                # Process Server-Sent Events (SSE) stream
                async for raw_line in response.content:
                    line = raw_line.decode("utf-8", errors="replace").strip()

                    # Skip empty lines, comments and non-data fields
                    if not line or line.startswith(":"):
                        continue
                    # SSE format: "data:{json}", the space after the colon is optional
                    if not line.startswith("data:"):
                        continue
                    data_str = line[5:].strip()

                    # Check for end of stream
                    if data_str == "[DONE]":
                        logger.debug("Stream completed")
                        break

                    try:
                        chunk = json.loads(data_str)
                    except json.JSONDecodeError as e:
                        logger.error(f"Failed to parse SSE data: {data_str}, error: {e}")
                        continue
                    if not isinstance(chunk, dict):
                        logger.debug(f"Ignoring non-object SSE payload: {data_str}")
                        continue

                    elapsed = int(time.monotonic() - start_time)
                    rendered = self._render_chunk(chunk, elapsed)
                    if rendered:
                        await self.emit_message(rendered)

        # All messages have been emitted as they arrived
        return ""

    async def pipe(
        self,
        body: dict,
        __user__: dict,
        __event_emitter__=None,
        __task__=None,
        __model__=None,
        __request__=None,
    ) -> str:
        """Process messages through the Letta agent pipe.

        Task requests (anything other than ``TASKS.DEFAULT``) are answered by
        ``generate_chat_completions`` using ``Task_Model`` or, when that is
        empty, ``__model__``. Regular requests send only the last entry of
        ``body["messages"]`` to the agent.

        Returns:
            "" when the reply was streamed through the event emitter, the
            prefixed task result for task requests, or a user-facing error
            string when the agent call failed.
        """
        # Store event_emitter in instance variable for future use
        if __event_emitter__:
            self.__current_event_emitter__ = __event_emitter__
        elif not getattr(self, "__current_event_emitter__", None):
            logger.error("Event emitter not provided")
            return ""

        # Log the id only; printing the whole user record leaks it to stdout.
        logger.debug(f"Handling request for user id: {__user__.get('id')}")
        self.__user__ = Users.get_user_by_id(__user__["id"])
        self.__model__ = __model__
        self.__request__ = __request__

        # Handle task-specific processing
        if __task__ and __task__ != TASKS.DEFAULT:
            try:
                task_model = (
                    self.valves.Task_Model if self.valves.Task_Model else self.__model__
                )
                response = await generate_chat_completions(
                    self.__request__,
                    {
                        "model": task_model,
                        "messages": body.get("messages"),
                        "stream": False,
                    },
                    user=self.__user__,
                )
                return f"{name}: {response['choices'][0]['message']['content']}"
            except Exception as e:
                logger.error(f"Error processing task {__task__}: {e}")
                return f"{name}: Error processing {__task__}"

        # Regular message processing
        messages = body.get("messages", [])
        if not messages:
            await self.emit_status("error", "No messages provided", True)
            return ""

        # Only send the last user message
        user_message = messages[-1]
        if isinstance(user_message, str):
            user_message = {"role": "user", "content": user_message}

        agent_name = self.valves.Agent_Name
        await self.emit_status("info", f"{agent_name} is thinking...", False)

        try:
            # Messages are emitted progressively in get_letta_response
            await self.get_letta_response(user_message)

            # Clear the thinking status
            await self.emit_status("info", f"{agent_name} responded", True)
            return ""
        except (asyncio.TimeoutError, TimeoutError) as e:
            error_msg = f"Letta agent timeout after {self.valves.Timeout}s"
            if str(e):
                error_msg = str(e)
            logger.error(error_msg)
            await self.emit_status("error", error_msg, True)
            return f"⏱️ {error_msg}"
        except aiohttp.ClientResponseError as e:
            error_msg = f"HTTP {e.status}: {e.message}"
            logger.error(f"HTTP error from Letta: {error_msg}")
            await self.emit_status("error", error_msg, True)
            return f"🔌 Connection error: {error_msg}. Check if Letta server is running and accessible."
        except aiohttp.ClientError as e:
            error_msg = f"Connection error: {str(e)}"
            logger.error(error_msg)
            await self.emit_status("error", error_msg, True)
            return f"🔌 {error_msg}. Check if Letta server is running at {self.valves.API_URL}"
        except json.JSONDecodeError as e:
            # Must precede ValueError: JSONDecodeError is a ValueError subclass.
            error_msg = f"Invalid JSON response: {str(e)}"
            logger.error(error_msg)
            await self.emit_status("error", error_msg, True)
            return f"📋 {error_msg}. The agent may have returned malformed data."
        except ValueError as e:
            error_msg = str(e)
            logger.error(f"Validation error: {error_msg}")
            await self.emit_status("error", error_msg, True)
            return f"⚠️ {error_msg}"
        except Exception as e:
            error_msg = f"Unexpected error: {str(e)}"
            logger.error(error_msg, exc_info=True)
            await self.emit_status("error", error_msg, True)
            return f"❌ {error_msg}"
Sponsored by Open WebUI Inc.
We are hiring!
Shape the way humanity engages with
intelligence
.
0