Whitepaper
Docs
Sign In
Tool
Tool
v1.5.0
GPT4 Memory Mimic
Tool ID
gpt4_memory_mimic
Creator
@mhio
Downloads
1.6K+
a tool for the llm to mimic the gpt4's memory consideration; --> A NEW UPDATE RELEASED : https://openwebui.com/t/mhio/met
Get
README
No README available
Tool Code
Show
""" title: Memory Enhancement Tool for LLM Web UI author: https://github.com/mhioi version: 1.5.0 license: MIT """ import os import json from typing import Callable, Any import asyncio import datetime from pydantic import BaseModel, Field import tarfile import socket import threading from http.server import SimpleHTTPRequestHandler from socketserver import TCPServer class MemoryFunctions: def __init__( self, memory_file="memory.json", debug=False, directory="memory_jsons" ): self.directory = directory os.makedirs(self.directory, exist_ok=True) # Ensure the directory exists self.memory_file = os.path.join(self.directory, memory_file) self.debug = debug self.memory_data = self.load_memory() self.tag_options = ["personal", "work", "education", "life", "person", "others"] def switch_memory_file(self, new_file: str): """Switch and initialize operations on a new memory file in designated directory.""" self.memory_file = os.path.join(self.directory, new_file) self.memory_data = self.load_memory() if self.debug: print(f"Switched to memory file: {self.memory_file}") def reindex_memory(self): if self.debug: print("Reindexing memory entries.") # Reindex memory in ascending order sorted_indices = sorted(self.memory_data.keys()) reindexed_memory = { new_index + 1: self.memory_data[old_index] for new_index, old_index in enumerate(sorted_indices) } self.memory_data = reindexed_memory self.save_memory() return "Memory reindexed successfully." def delete_memory_by_index(self, index: int): if index in self.memory_data: del self.memory_data[index] self.save_memory() return f"Memory index {index} deleted successfully." else: return f"Memory index {index} does not exist." def update_memory_by_index(self, index: int, tag: str, memo: str, by: str): if index in self.memory_data: if tag not in self.tag_options: tag = "others" # Update the entry self.memory_data[index]["tag"] = tag self.memory_data[index]["memo"] = memo self.memory_data[index]["by"] = by self.memory_data[index]["last_modified"] = datetime.datetime.now().strftime( "%Y-%m-%d_%H:%M:%S" ) self.save_memory() return f"Memory index {index} updated successfully." else: return f"Memory index {index} does not exist." async def update_multiple_memories( self, memory_updates: list, llm_wants_to_update: bool, __event_emitter__: Callable[[dict], Any] = None, ) -> str: """ Update multiple memory entries at once. :param memory_updates: A list of updates, each containing index, tag, memo, by. Example: [{'index': 1, 'tag': 'work', 'memo': 'Updated memo', 'by': 'LLM'}, ...] :param llm_wants_to_update: Boolean indicating if the LLM has requested the updates. :returns: A message indicating the success or failure of the operations. """ emitter = EventEmitter(__event_emitter__) responses = [] if not llm_wants_to_update: return "LLM has not requested to update multiple memories." for update in memory_updates: index = update.get("index") tag = update.get("tag", "others") memo = update.get("memo", "") by = update.get("by", "LLM") if tag not in self.memory.tag_options: tag = "others" # Default tag to 'others' if invalid if self.valves.DEBUG: print(f"Updating memory {index}: tag={tag}, memo={memo}, by={by}") # Update the memory update_message = self.memory.update_memory_by_index(index, tag, memo, by) responses.append(update_message) await emitter.emit( description=update_message, status="memory_update", done=False ) await emitter.emit( description="All requested memory updates have been processed.", status="memory_update_complete", done=True, ) return "\n".join(responses) def load_memory(self): if os.path.exists(self.memory_file): if self.debug: print(f"Loading memory from {self.memory_file}") with open(self.memory_file, "r") as file: return json.load(file) else: return {} def save_memory(self): if self.debug: print(f"Saving memory to {self.memory_file}") with open(self.memory_file, "w") as file: json.dump(self.memory_data, file, ensure_ascii=False, indent=4) def add_to_memory(self, tag: str, memo: str, by: str): if tag not in self.tag_options: tag = "others" index = len(self.memory_data) + 1 entry = { "tag": tag, "memo": memo, "by": by, "last_modified": datetime.datetime.now().strftime("%Y-%m-%d_%H:%M:%S"), } self.memory_data[index] = entry self.save_memory() # Other methods remain unchanged... def retrieve_from_memory(self, key: str): if self.debug: print(f"Retrieving from memory: {key}") return self.memory_data.get(key, None) def process_input_for_memory(self, input_text: str): return {"timestamp": str(datetime.datetime.now()), "input": input_text} def get_all_memories(self) -> dict: if self.debug: print("Retrieving all memories.") return self.memory_data def clear_memory(self): if self.debug: print("Clearing all memory entries.") self.memory_data.clear() self.save_memory() return "ALL MEMORIES CLEARED!" class EventEmitter: def __init__(self, event_emitter: Callable[[dict], Any] = None): self.event_emitter = event_emitter async def emit(self, description="Unknown state", status="in_progress", done=False): if self.event_emitter: await self.event_emitter( { "type": "status", "data": { "status": status, "description": description, "done": done, }, } ) class Tools: class Valves(BaseModel): USE_MEMORY: bool = Field( default=True, description="Enable or disable memory usage." ) MEMORY_REFRESH_INTERVAL: int = Field( default=60, description="Interval in minutes to refresh and analyze memory data.", ) DEBUG: bool = Field(default=True, description="Enable or disable debug mode.") def __init__(self): self.valves = self.Valves() self.memory = MemoryFunctions(debug=self.valves.DEBUG) self.confirmation_pending = False async def handle_input( self, input_text: str, tag: str, user_wants_to_add: bool, llm_wants_to_add: bool, by: str, __event_emitter__: Callable[[dict], Any] = None, ) -> str: """ AUTOMATICALLY Summarize user input and enhance responses using memory data. :params input_text: The TEXT . :returns: The response considering memory data. """ emitter = EventEmitter(__event_emitter__) if self.valves.DEBUG: print(f"Handling input: {input_text}") await emitter.emit(f"Analyzing input for memory: {input_text}") if self.valves.USE_MEMORY: # Assume 'by' is determined outside and 'tag' is selected by LLM if tag not in self.memory.tag_options: tag = "others" if user_wants_to_add: await emitter.emit( description=f"User requested to add to memory with tag {tag}", status="memory_update", done=False, ) self.memory.add_to_memory(tag, input_text, "user") return "added to memory by user's request!" elif llm_wants_to_add: await emitter.emit( description=f"LLM added to memory with tag {tag}", status="memory_update", done=False, ) self.memory.add_to_memory(tag, input_text, "LLM") return "added to memory by LLM's request!" # The remaining logic stays the same. async def recall_memories( self, __event_emitter__: Callable[[dict], Any] = None ) -> str: """ Retrieve all stored memories in current file and provide them to the user. :return: A structured representation of all memory contents. """ emitter = EventEmitter(__event_emitter__) await emitter.emit( "Retrieving all stored memories.", status="recall_in_progress" ) all_memories = self.memory.get_all_memories() if not all_memories: message = "No memory stored." if self.valves.DEBUG: print(message) await emitter.emit( description=message, status="recall_complete", done=True, ) return json.dumps({"message": message}, ensure_ascii=False) # Correctly format stored memories contents for readability formatted_memories = json.dumps(all_memories, ensure_ascii=False, indent=4) if self.valves.DEBUG: print(f"All stored memories retrieved: {formatted_memories}") await emitter.emit( description=f"All stored memories retrieved: {formatted_memories}", status="recall_complete", done=True, ) return f"Memories are : {formatted_memories}" async def clear_memories( self, user_confirmation: bool, __event_emitter__: Callable[[dict], Any] = None ) -> str: """ Clear all stored memories in current file after user confirmation;ask twice the user for confimation. :param user_confirmation: Boolean indicating user confirmation to clear memories. :return: A message indicating the status of the operation. """ emitter = EventEmitter(__event_emitter__) await emitter.emit( "Attempting to clear all memory entries.", status="clear_memory_attempt" ) if self.confirmation_pending and user_confirmation: self.memory.clear_memory() await emitter.emit( description="All memory entries have been cleared.", status="clear_memory_complete", done=True, ) self.confirmation_pending = False return json.dumps( {"message": "All memory entries cleared."}, ensure_ascii=False ) if not self.confirmation_pending: self.confirmation_pending = True await emitter.emit( description="Please confirm that you want to clear all memories. Call this function again with confirmation.", status="confirmation_required", done=False, ) return json.dumps( {"message": "Please confirm to clear all memories."}, ensure_ascii=False ) await emitter.emit( description="Clear memory operation aborted.", status="clear_memory_aborted", done=True, ) self.confirmation_pending = False return json.dumps( {"message": "Memory clear operation aborted."}, ensure_ascii=False ) async def refresh_memory(self, __event_emitter__: Callable[[dict], Any] = None): """ Periodically refresh and optimize memory data, includes reindexing. :returns: A message indicating the status of the refresh operation. """ emitter = EventEmitter(__event_emitter__) await emitter.emit("Starting memory refresh process.") if self.valves.DEBUG: print("Refreshing memory...") if self.valves.USE_MEMORY: refresh_message = self.memory.reindex_memory() if self.valves.DEBUG: print(refresh_message) await emitter.emit( description=refresh_message, status="memory_refresh", done=True ) return refresh_message if self.valves.DEBUG: print("Memory refreshed.") await emitter.emit( status="complete", description="Memory refresh completed.", done=True ) async def update_memory_entry( self, index: int, tag: str, memo: str, by: str, __event_emitter__: Callable[[dict], Any] = None, ) -> str: """ Update an existing memory entry based on its index. :param index: The index of the memory entry to update,STARTING FROM 1. :param tag: The tag for the memory entry. :param memo: The memory information to update. :param by: Who is making the update ('user' or 'LLM'). :returns: A message indicating the success or failure of the update. """ emitter = EventEmitter(__event_emitter__) if self.valves.DEBUG: print( f"Updating memory index {index} with tag: {tag}, memo: {memo}, by: {by}" ) update_message = self.memory.update_memory_by_index(index, tag, memo, by) await emitter.emit( description=update_message, status="memory_update", done=True ) return update_message async def add_multiple_memories( self, memory_entries: list, llm_wants_to_add: bool, __event_emitter__: Callable[[dict], Any] = None, ) -> str: """ Allows the LLM to add multiple memory entries at once. :param memory_entries: A list of dictionary entries, each containing tag, memo, by.Usage Example: memory_entries = [{"tag": "personal", "memo": "This is a personal note", "by": "LLM"},{"tag": "work", "memo": "Project deadline is tomorrow", "by": "LLM"}] :param llm_wants_to_add: Boolean indicating LLM's desire to add the memories. :returns: A message indicating the success or failure of the operations. """ emitter = EventEmitter(__event_emitter__) responses = [] if not llm_wants_to_add: return "LLM has not requested to add multiple memories." for idx, entry in enumerate(memory_entries): tag = entry.get("tag", "others") memo = entry.get("memo", "") by = entry.get("by", "LLM") if tag not in self.memory.tag_options: tag = "others" if self.valves.DEBUG: print(f"Adding memory {idx+1}: tag={tag}, memo={memo}, by={by}") # Add the memory self.memory.add_to_memory(tag, memo, by) response = f"Memory {idx+1} added with tag {tag} by {by}." responses.append(response) await emitter.emit(description=response, status="memory_update", done=False) await emitter.emit( description="All requested memories have been processed.", status="memory_update_complete", done=True, ) return "\n".join(responses) async def delete_memory_entry( self, index: int, llm_wants_to_delete: bool, __event_emitter__: Callable[[dict], Any] = None, ) -> str: """ Delete a memory entry based on its index. :param index: The index of the memory entry to delete,STARTING FROM 1. :param llm_wants_to_delete: Boolean indicating if the LLM has requested the deletion. :returns: A message indicating the success or failure of the deletion. """ emitter = EventEmitter(__event_emitter__) if not llm_wants_to_delete: return "LLM has not requested to delete a memory." if self.valves.DEBUG: print(f"Attempting to delete memory at index {index}") deletion_message = self.memory.delete_memory_by_index(index) await emitter.emit( description=deletion_message, status="memory_deletion", done=True ) return deletion_message async def delete_multiple_memories( self, indices: list, llm_wants_to_delete: bool, __event_emitter__: Callable[[dict], Any] = None, ) -> str: """ Delete multiple memory entries based on their indices. :param indices: A list of indices of the memory entries to delete,STARTING FROM 1. :param llm_wants_to_delete: Boolean indicating if the LLM has requested the deletions. :returns: A message indicating the success or failure of the deletions. """ emitter = EventEmitter(__event_emitter__) responses = [] if not llm_wants_to_delete: return "LLM has not requested to delete multiple memories." for index in indices: if self.valves.DEBUG: print(f"Attempting to delete memory at index {index}") deletion_message = self.memory.delete_memory_by_index(index) responses.append(deletion_message) await emitter.emit( description=deletion_message, status="memory_deletion", done=False ) await emitter.emit( description="All requested memory deletions have been processed.", status="memory_deletion_complete", done=True, ) return "\n".join(responses) async def create_or_switch_memory_file( self, new_file_name: str, __event_emitter__: Callable[[dict], Any] = None ) -> str: """ Create a new memory file or switch to an existing one. :param new_file_name: The name of the new or existing memory file. :returns: A message indicating the success or failure of the operation. """ emitter = EventEmitter(__event_emitter__) if self.valves.DEBUG: print(f"Switching to or creating memory file: {new_file_name}") self.memory.switch_memory_file(new_file_name + ".json") message = f"Memory file switched to {new_file_name}." await emitter.emit(description=message, status="file_switching", done=True) return message async def list_memory_files( self, __event_emitter__: Callable[[dict], Any] = None ) -> str: """ List available memory files in the designated directory. :returns: A message with the list of available memory files. """ emitter = EventEmitter(__event_emitter__) memory_files = [] if self.valves.DEBUG: print(f"Listing memory files in directory: {self.memory.directory}") try: for file in os.listdir(self.memory.directory): if file.endswith(".json"): if self.valves.DEBUG: print(f"Found memory file: {file}") memory_files.append(file) description = "Available memory files: " + ", ".join(memory_files) status = "file_listing_complete" except Exception as e: description = f"Error accessing directory: {str(e)}" status = "file_listing_error" await emitter.emit(description=description, status=status, done=True) return description async def current_memory_file( self, __event_emitter__: Callable[[dict], Any] = None ) -> str: """ Retrieve the name of the currently active memory file. :returns: A message indicating the current memory file. """ emitter = EventEmitter(__event_emitter__) current_file = self.memory.memory_file message = f"Currently using memory file: {current_file}" if self.valves.DEBUG: print(message) await emitter.emit( description=message, status="current_file_retrieved", done=True ) return message async def delete_memory_file( self, file_to_delete: str, user_confirmation: bool, __event_emitter__: Callable[[dict], Any] = None, ) -> str: """ Delete a memory file in the designated directory with confirmation and necessary file switching. :param file_to_delete: The name of the memory file to delete. :param user_confirmation: Boolean indicating user confirmation for deletion. :returns: A message indicating the success or failure of the deletion. """ emitter = EventEmitter(__event_emitter__) file_path = os.path.join(self.memory.directory, file_to_delete) available_files = [ f for f in os.listdir(self.memory.directory) if f.endswith(".json") ] if file_to_delete not in available_files: message = f"File '{file_to_delete}' does not exist in the directory." await emitter.emit(description=message, status="file_not_found", done=True) if self.valves.DEBUG: print(message) return message if self.confirmation_pending and user_confirmation: try: if self.memory.memory_file == file_path: # Switch to another file before deleting the current one alternative_file = next( (f for f in available_files if f != file_to_delete), None ) if not alternative_file: message = ( "No alternative memory file to switch to. Deletion aborted." ) await emitter.emit( description=message, status="no_alternative_file", done=True ) if self.valves.DEBUG: print(message) return message self.memory.switch_memory_file(alternative_file) switch_message = f"Switched to '{alternative_file}'. Now deleting '{file_to_delete}'." await emitter.emit( description=switch_message, status="file_switched", done=False ) if self.valves.DEBUG: print(switch_message) os.remove(file_path) message = f"File '{file_to_delete}' deleted successfully." status = "file_deletion_complete" except Exception as e: message = f"Error deleting file '{file_to_delete}': {str(e)}" status = "deletion_error" await emitter.emit(description=message, status=status, done=True) self.confirmation_pending = False return message if not self.confirmation_pending: self.confirmation_pending = True confirmation_message = ( "Please confirm that you want to delete the memory file. " "Call this function again with confirmation." ) await emitter.emit( description=confirmation_message, status="confirmation_required", done=False, ) return json.dumps( { "message": "Please confirm to delete the memory file.", "file": file_to_delete, }, ensure_ascii=False, ) await emitter.emit( description="Deletion of memory file aborted.", status="deletion_aborted", done=True, ) self.confirmation_pending = False return json.dumps( {"message": "Memory file deletion aborted.", "file": file_to_delete}, ensure_ascii=False, ) async def execute_functions_sequentially( self, function_calls: list, __event_emitter__: Callable[[dict], Any] = None, ) -> dict: """ Execute a series of functions in sequence. :param function_calls: A list of dictionaries each containing 'name' and 'params'. Example: [{'name': 'handle_input', 'params': {...}}, ...] :returns: A dictionary with results of each function call. """ emitter = EventEmitter(__event_emitter__) results = {} for call in function_calls: func_name = call.get("name") params = call.get("params", {}) if hasattr(self, func_name) and callable(getattr(self, func_name)): if self.valves.DEBUG: print(f"Executing function: {func_name} with params: {params}") await emitter.emit( f"Executing {func_name}", status="function_execution", done=False ) try: func = getattr(self, func_name) result = await func(__event_emitter__=__event_emitter__, **params) results[func_name] = result await emitter.emit( description=f"{func_name} executed successfully.", status="function_complete", done=False, ) except Exception as e: error_msg = f"Error executing {func_name}: {str(e)}" results[func_name] = error_msg await emitter.emit( description=error_msg, status="function_error", done=False ) else: error_msg = f"Function {func_name} not found or not callable." results[func_name] = error_msg await emitter.emit( description=error_msg, status="function_missing", done=False ) await emitter.emit( description="All requested functions have been processed.", status="execution_complete", done=True, ) return f"executed successfully :{results}" async def download_memory( self, memory_file_name: str, download_all: bool, __event_emitter__: Callable[[dict], Any] = None, ) -> str: """ Download a specific memory file or all memory files in a tarball;ONLY FOR 14 SECONDS, AND WHEN LLM ANSWERS THE LINK IS EXPIRED :param memory_file_name: Name of the memory file or target tarball name. :param download_all: Boolean indicating whether to download all memories as a tarball. :returns: A message with a link or status of the operation. """ emitter = EventEmitter(__event_emitter__) available_files = os.listdir(self.memory.directory) found_files = [] if download_all: tarball_path = os.path.join(self.memory.directory, "all_memories.tar.gz") with tarfile.open(tarball_path, "w:gz") as tar: for file in available_files: if file.endswith(".json"): tar.add(os.path.join(self.memory.directory, file), arcname=file) found_files.append(file) target_file = tarball_path else: file_path = os.path.join(self.memory.directory, memory_file_name) if memory_file_name in available_files: target_file = file_path found_files.append(memory_file_name) else: matched = sorted(f for f in available_files if memory_file_name in f) if matched: target_file = os.path.join(self.memory.directory, matched[0]) found_files.append(matched[0]) else: message = f"No memory file matching '{memory_file_name}' was found." await emitter.emit( description=message, status="file_not_found", done=True ) if self.valves.DEBUG: print(message) return message if not found_files: message = "No files were found to download." await emitter.emit(description=message, status="not_found", done=True) if self.valves.DEBUG: print(message) return message httpd = None try: handler = SimpleHTTPRequestHandler handler.directory = self.memory.directory httpd = TCPServer(("", 0), handler) ip, port = httpd.server_address server_url = f"http://{ip}:{port}/{target_file}" # Start the server in a new thread server_thread = threading.Thread(target=httpd.serve_forever, daemon=True) server_thread.start() message = f"Download available for 14 seconds in this link: {server_url}" await emitter.emit(description=message, status="download", done=True) if self.valves.DEBUG: print(message) # Give the user time to download await asyncio.sleep(14) return "TELL THE USER THAT LINK IS EXPIRED AND YOU SHOULD HAVE DOWNLOADED FILES!" except Exception as e: message = f"Error setting up download server: {str(e)}" await emitter.emit(description=message, status="download_error", done=True) if self.valves.DEBUG: print(message) finally: if httpd: httpd.shutdown() # Ensure server is shut down if os.path.exists(target_file): os.remove(target_file) # Delete the file after the server is shut down if self.valves.DEBUG: await emitter.emit( description="file deleted sucessfully", status=f"Deleted file: {target_file}", done=True, ) print(f"Deleted file: {target_file}") return message