Tool
v1.5.0
GPT4 Memory Mimic
A tool for the LLM to mimic GPT-4's memory handling --> A NEW UPDATE RELEASED: https://openwebui.com/t/mhio/met
Tool ID
gpt4_memory_mimic
Creator
@mhio
Downloads
674+

Tool Content
python
"""
title: Memory Enhancement Tool for LLM Web UI
author: https://github.com/mhioi
version: 1.5.0
license: MIT
"""

import os
import json
from typing import Callable, Any
import asyncio
import datetime
from pydantic import BaseModel, Field
import tarfile
import socket
import threading
from functools import partial
from http.server import SimpleHTTPRequestHandler
from socketserver import TCPServer


class MemoryFunctions:
    def __init__(
        self, memory_file="memory.json", debug=False, directory="memory_jsons"
    ):
        self.directory = directory
        os.makedirs(self.directory, exist_ok=True)  # Ensure the directory exists
        self.memory_file = os.path.join(self.directory, memory_file)
        self.debug = debug
        self.memory_data = self.load_memory()
        self.tag_options = ["personal", "work", "education", "life", "person", "others"]

    def switch_memory_file(self, new_file: str):
        """Switch and initialize operations on a new memory file in designated directory."""
        self.memory_file = os.path.join(self.directory, new_file)
        self.memory_data = self.load_memory()
        if self.debug:
            print(f"Switched to memory file: {self.memory_file}")

    def reindex_memory(self):
        if self.debug:
            print("Reindexing memory entries.")

        # Reindex memory in ascending order
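        # Example: entries keyed {2: ..., 5: ...} are renumbered to {1: ..., 2: ...}.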
        sorted_indices = sorted(self.memory_data.keys())
        reindexed_memory = {
            new_index + 1: self.memory_data[old_index]
            for new_index, old_index in enumerate(sorted_indices)
        }
        self.memory_data = reindexed_memory
        self.save_memory()

        return "Memory reindexed successfully."

    def delete_memory_by_index(self, index: int):
        if index in self.memory_data:
            del self.memory_data[index]
            self.save_memory()
            return f"Memory index {index} deleted successfully."
        else:
            return f"Memory index {index} does not exist."

    def update_memory_by_index(self, index: int, tag: str, memo: str, by: str):
        if index in self.memory_data:
            if tag not in self.tag_options:
                tag = "others"

            # Update the entry
            self.memory_data[index]["tag"] = tag
            self.memory_data[index]["memo"] = memo
            self.memory_data[index]["by"] = by
            self.memory_data[index]["last_modified"] = datetime.datetime.now().strftime(
                "%Y-%m-%d_%H:%M:%S"
            )
            self.save_memory()
            return f"Memory index {index} updated successfully."
        else:
            return f"Memory index {index} does not exist."

    async def update_multiple_memories(
        self,
        memory_updates: list,
        llm_wants_to_update: bool,
        __event_emitter__: Callable[[dict], Any] = None,
    ) -> str:
        """
        Update multiple memory entries at once.

        :param memory_updates: A list of updates, each containing index, tag, memo, by.
                               Example: [{'index': 1, 'tag': 'work', 'memo': 'Updated memo', 'by': 'LLM'}, ...]
        :param llm_wants_to_update: Boolean indicating if the LLM has requested the updates.
        :returns: A message indicating the success or failure of the operations.
        """
        emitter = EventEmitter(__event_emitter__)
        responses = []

        if not llm_wants_to_update:
            return "LLM has not requested to update multiple memories."

        for update in memory_updates:
            index = update.get("index")
            tag = update.get("tag", "others")
            memo = update.get("memo", "")
            by = update.get("by", "LLM")

            if tag not in self.tag_options:
                tag = "others"  # Default tag to 'others' if invalid

            if self.debug:
                print(f"Updating memory {index}: tag={tag}, memo={memo}, by={by}")

            # Update the memory
            update_message = self.update_memory_by_index(index, tag, memo, by)
            responses.append(update_message)

            await emitter.emit(
                description=update_message, status="memory_update", done=False
            )

        await emitter.emit(
            description="All requested memory updates have been processed.",
            status="memory_update_complete",
            done=True,
        )

        return "\n".join(responses)

    def load_memory(self):
        if os.path.exists(self.memory_file):
            if self.debug:
                print(f"Loading memory from {self.memory_file}")
            with open(self.memory_file, "r") as file:
                # JSON object keys are always strings; convert them back to ints
                # so index-based lookups keep working after a reload.
                return {int(k): v for k, v in json.load(file).items()}
        else:
            return {}

    def save_memory(self):
        if self.debug:
            print(f"Saving memory to {self.memory_file}")
        with open(self.memory_file, "w") as file:
            json.dump(self.memory_data, file, ensure_ascii=False, indent=4)

    def add_to_memory(self, tag: str, memo: str, by: str):
        if tag not in self.tag_options:
            tag = "others"

        # Use the next free index so re-adding after a deletion never overwrites an entry.
        index = max(self.memory_data, default=0) + 1
        entry = {
            "tag": tag,
            "memo": memo,
            "by": by,
            "last_modified": datetime.datetime.now().strftime("%Y-%m-%d_%H:%M:%S"),
        }
        self.memory_data[index] = entry
        self.save_memory()
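        # Illustrative shape of the persisted memory JSON after add_to_memory
        # (values below are made up; json.dump writes the integer keys as strings):
        # {
        #     "1": {
        #         "tag": "personal",
        #         "memo": "User prefers concise answers",
        #         "by": "user",
        #         "last_modified": "2024-01-01_12:00:00"
        #     }
        # }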

    def retrieve_from_memory(self, key: str):
        if self.debug:
            print(f"Retrieving from memory: {key}")
        return self.memory_data.get(key, None)

    def process_input_for_memory(self, input_text: str):
        return {"timestamp": str(datetime.datetime.now()), "input": input_text}

    def get_all_memories(self) -> dict:
        if self.debug:
            print("Retrieving all memories.")
        return self.memory_data

    def clear_memory(self):
        if self.debug:
            print("Clearing all memory entries.")
        self.memory_data.clear()
        self.save_memory()
        return "ALL MEMORIES CLEARED!"


class EventEmitter:
    def __init__(self, event_emitter: Callable[[dict], Any] = None):
        self.event_emitter = event_emitter

    async def emit(self, description="Unknown state", status="in_progress", done=False):
        if self.event_emitter:
            await self.event_emitter(
                {
                    "type": "status",
                    "data": {
                        "status": status,
                        "description": description,
                        "done": done,
                    },
                }
            )


class Tools:
    class Valves(BaseModel):
        USE_MEMORY: bool = Field(
            default=True, description="Enable or disable memory usage."
        )
        MEMORY_REFRESH_INTERVAL: int = Field(
            default=60,
            description="Interval in minutes to refresh and analyze memory data.",
        )
        DEBUG: bool = Field(default=True, description="Enable or disable debug mode.")

    def __init__(self):
        self.valves = self.Valves()
        self.memory = MemoryFunctions(debug=self.valves.DEBUG)
        self.confirmation_pending = False

    async def handle_input(
        self,
        input_text: str,
        tag: str,
        user_wants_to_add: bool,
        llm_wants_to_add: bool,
        by: str,
        __event_emitter__: Callable[[dict], Any] = None,
    ) -> str:
        """
        AUTOMATICALLY summarize user input and enhance responses using memory data.

        :param input_text: The text to consider and, if requested, store in memory.
        :param tag: The tag for the memory entry (one of the predefined tag options).
        :param user_wants_to_add: True if the user explicitly asked to add this to memory.
        :param llm_wants_to_add: True if the LLM decided to add this to memory.
        :param by: Who initiated the addition ('user' or 'LLM').
        :returns: The response considering memory data.
        """
        emitter = EventEmitter(__event_emitter__)

        if self.valves.DEBUG:
            print(f"Handling input: {input_text}")

        await emitter.emit(f"Analyzing input for memory: {input_text}")

        if self.valves.USE_MEMORY:
            # Assume 'by' is determined outside and 'tag' is selected by LLM
            if tag not in self.memory.tag_options:
                tag = "others"

            if user_wants_to_add:
                await emitter.emit(
                    description=f"User requested to add to memory with tag {tag}",
                    status="memory_update",
                    done=False,
                )
                self.memory.add_to_memory(tag, input_text, "user")
                return "added to memory by user's request!"
            elif llm_wants_to_add:
                await emitter.emit(
                    description=f"LLM added to memory with tag {tag}",
                    status="memory_update",
                    done=False,
                )
                self.memory.add_to_memory(tag, input_text, "LLM")
                return "added to memory by LLM's request!"

        return "No memory action was taken."

    async def recall_memories(
        self, __event_emitter__: Callable[[dict], Any] = None
    ) -> str:
        """
        Retrieve all stored memories in current file and provide them to the user.

        :return: A structured representation of all memory contents.
        """
        emitter = EventEmitter(__event_emitter__)
        await emitter.emit(
            "Retrieving all stored memories.", status="recall_in_progress"
        )

        all_memories = self.memory.get_all_memories()
        if not all_memories:
            message = "No memory stored."
            if self.valves.DEBUG:
                print(message)
            await emitter.emit(
                description=message,
                status="recall_complete",
                done=True,
            )
            return json.dumps({"message": message}, ensure_ascii=False)

        # Correctly format stored memories contents for readability
        formatted_memories = json.dumps(all_memories, ensure_ascii=False, indent=4)

        if self.valves.DEBUG:
            print(f"All stored memories retrieved: {formatted_memories}")

        await emitter.emit(
            description=f"All stored memories retrieved: {formatted_memories}",
            status="recall_complete",
            done=True,
        )

        return f"Memories are : {formatted_memories}"

    async def clear_memories(
        self, user_confirmation: bool, __event_emitter__: Callable[[dict], Any] = None
    ) -> str:
        """
        Clear all stored memories in the current file after user confirmation; ask the user twice for confirmation.

        :param user_confirmation: Boolean indicating user confirmation to clear memories.
        :return: A message indicating the status of the operation.
        """
        emitter = EventEmitter(__event_emitter__)
        await emitter.emit(
            "Attempting to clear all memory entries.", status="clear_memory_attempt"
        )

        if self.confirmation_pending and user_confirmation:
            self.memory.clear_memory()
            await emitter.emit(
                description="All memory entries have been cleared.",
                status="clear_memory_complete",
                done=True,
            )
            self.confirmation_pending = False
            return json.dumps(
                {"message": "All memory entries cleared."}, ensure_ascii=False
            )

        if not self.confirmation_pending:
            self.confirmation_pending = True
            await emitter.emit(
                description="Please confirm that you want to clear all memories. Call this function again with confirmation.",
                status="confirmation_required",
                done=False,
            )
            return json.dumps(
                {"message": "Please confirm to clear all memories."}, ensure_ascii=False
            )

        await emitter.emit(
            description="Clear memory operation aborted.",
            status="clear_memory_aborted",
            done=True,
        )
        self.confirmation_pending = False
        return json.dumps(
            {"message": "Memory clear operation aborted."}, ensure_ascii=False
        )

    async def refresh_memory(self, __event_emitter__: Callable[[dict], Any] = None):
        """
        Periodically refresh and optimize memory data, includes reindexing.

        :returns: A message indicating the status of the refresh operation.
        """
        emitter = EventEmitter(__event_emitter__)
        await emitter.emit("Starting memory refresh process.")

        if self.valves.DEBUG:
            print("Refreshing memory...")

        if self.valves.USE_MEMORY:
            refresh_message = self.memory.reindex_memory()

            if self.valves.DEBUG:
                print(refresh_message)

            await emitter.emit(
                description=refresh_message, status="memory_refresh", done=True
            )

            return refresh_message

        if self.valves.DEBUG:
            print("Memory refreshed.")

        await emitter.emit(
            status="complete", description="Memory refresh completed.", done=True
        )

    async def update_memory_entry(
        self,
        index: int,
        tag: str,
        memo: str,
        by: str,
        __event_emitter__: Callable[[dict], Any] = None,
    ) -> str:
        """
        Update an existing memory entry based on its index.

        :param index: The index of the memory entry to update, STARTING FROM 1.
        :param tag: The tag for the memory entry.
        :param memo: The memory information to update.
        :param by: Who is making the update ('user' or 'LLM').
        :returns: A message indicating the success or failure of the update.
        """
        emitter = EventEmitter(__event_emitter__)

        if self.valves.DEBUG:
            print(
                f"Updating memory index {index} with tag: {tag}, memo: {memo}, by: {by}"
            )

        update_message = self.memory.update_memory_by_index(index, tag, memo, by)

        await emitter.emit(
            description=update_message, status="memory_update", done=True
        )

        return update_message

    async def add_multiple_memories(
        self,
        memory_entries: list,
        llm_wants_to_add: bool,
        __event_emitter__: Callable[[dict], Any] = None,
    ) -> str:
        """
        Allows the LLM to add multiple memory entries at once.

        :param memory_entries: A list of dictionary entries, each containing tag, memo, by. Usage example: memory_entries = [{"tag": "personal", "memo": "This is a personal note", "by": "LLM"}, {"tag": "work", "memo": "Project deadline is tomorrow", "by": "LLM"}]
        :param llm_wants_to_add: Boolean indicating LLM's desire to add the memories.
        :returns: A message indicating the success or failure of the operations.
        """
        emitter = EventEmitter(__event_emitter__)
        responses = []

        if not llm_wants_to_add:
            return "LLM has not requested to add multiple memories."

        for idx, entry in enumerate(memory_entries):
            tag = entry.get("tag", "others")
            memo = entry.get("memo", "")
            by = entry.get("by", "LLM")

            if tag not in self.memory.tag_options:
                tag = "others"

            if self.valves.DEBUG:
                print(f"Adding memory {idx+1}: tag={tag}, memo={memo}, by={by}")

            # Add the memory
            self.memory.add_to_memory(tag, memo, by)
            response = f"Memory {idx+1} added with tag {tag} by {by}."
            responses.append(response)

            await emitter.emit(description=response, status="memory_update", done=False)

        await emitter.emit(
            description="All requested memories have been processed.",
            status="memory_update_complete",
            done=True,
        )

        return "\n".join(responses)

    async def delete_memory_entry(
        self,
        index: int,
        llm_wants_to_delete: bool,
        __event_emitter__: Callable[[dict], Any] = None,
    ) -> str:
        """
        Delete a memory entry based on its index.

        :param index: The index of the memory entry to delete, STARTING FROM 1.
        :param llm_wants_to_delete: Boolean indicating if the LLM has requested the deletion.
        :returns: A message indicating the success or failure of the deletion.
        """
        emitter = EventEmitter(__event_emitter__)

        if not llm_wants_to_delete:
            return "LLM has not requested to delete a memory."

        if self.valves.DEBUG:
            print(f"Attempting to delete memory at index {index}")

        deletion_message = self.memory.delete_memory_by_index(index)

        await emitter.emit(
            description=deletion_message, status="memory_deletion", done=True
        )

        return deletion_message

    async def delete_multiple_memories(
        self,
        indices: list,
        llm_wants_to_delete: bool,
        __event_emitter__: Callable[[dict], Any] = None,
    ) -> str:
        """
        Delete multiple memory entries based on their indices.

        :param indices: A list of indices of the memory entries to delete, STARTING FROM 1.
        :param llm_wants_to_delete: Boolean indicating if the LLM has requested the deletions.
        :returns: A message indicating the success or failure of the deletions.
        """
        emitter = EventEmitter(__event_emitter__)
        responses = []

        if not llm_wants_to_delete:
            return "LLM has not requested to delete multiple memories."

        for index in indices:
            if self.valves.DEBUG:
                print(f"Attempting to delete memory at index {index}")

            deletion_message = self.memory.delete_memory_by_index(index)
            responses.append(deletion_message)

            await emitter.emit(
                description=deletion_message, status="memory_deletion", done=False
            )

        await emitter.emit(
            description="All requested memory deletions have been processed.",
            status="memory_deletion_complete",
            done=True,
        )

        return "\n".join(responses)

    async def create_or_switch_memory_file(
        self, new_file_name: str, __event_emitter__: Callable[[dict], Any] = None
    ) -> str:
        """
        Create a new memory file or switch to an existing one.

        :param new_file_name: The name of the new or existing memory file.
        :returns: A message indicating the success or failure of the operation.
        """
        emitter = EventEmitter(__event_emitter__)

        if self.valves.DEBUG:
            print(f"Switching to or creating memory file: {new_file_name}")

        # Avoid doubling the extension if the name already includes ".json".
        if not new_file_name.endswith(".json"):
            new_file_name += ".json"
        self.memory.switch_memory_file(new_file_name)

        message = f"Memory file switched to {new_file_name}."

        await emitter.emit(description=message, status="file_switching", done=True)

        return message

    async def list_memory_files(
        self, __event_emitter__: Callable[[dict], Any] = None
    ) -> str:
        """
        List available memory files in the designated directory.

        :returns: A message with the list of available memory files.
        """
        emitter = EventEmitter(__event_emitter__)
        memory_files = []

        if self.valves.DEBUG:
            print(f"Listing memory files in directory: {self.memory.directory}")

        try:
            for file in os.listdir(self.memory.directory):
                if file.endswith(".json"):
                    if self.valves.DEBUG:
                        print(f"Found memory file: {file}")
                    memory_files.append(file)

            description = "Available memory files: " + ", ".join(memory_files)
            status = "file_listing_complete"

        except Exception as e:
            description = f"Error accessing directory: {str(e)}"
            status = "file_listing_error"

        await emitter.emit(description=description, status=status, done=True)

        return description

    async def current_memory_file(
        self, __event_emitter__: Callable[[dict], Any] = None
    ) -> str:
        """
        Retrieve the name of the currently active memory file.

        :returns: A message indicating the current memory file.
        """
        emitter = EventEmitter(__event_emitter__)

        current_file = self.memory.memory_file

        message = f"Currently using memory file: {current_file}"

        if self.valves.DEBUG:
            print(message)

        await emitter.emit(
            description=message, status="current_file_retrieved", done=True
        )

        return message

    async def delete_memory_file(
        self,
        file_to_delete: str,
        user_confirmation: bool,
        __event_emitter__: Callable[[dict], Any] = None,
    ) -> str:
        """
        Delete a memory file in the designated directory with confirmation and necessary file switching.

        :param file_to_delete: The name of the memory file to delete.
        :param user_confirmation: Boolean indicating user confirmation for deletion.
        :returns: A message indicating the success or failure of the deletion.
        """
        emitter = EventEmitter(__event_emitter__)
        file_path = os.path.join(self.memory.directory, file_to_delete)
        available_files = [
            f for f in os.listdir(self.memory.directory) if f.endswith(".json")
        ]

        if file_to_delete not in available_files:
            message = f"File '{file_to_delete}' does not exist in the directory."
            await emitter.emit(description=message, status="file_not_found", done=True)
            if self.valves.DEBUG:
                print(message)
            return message

        if self.confirmation_pending and user_confirmation:
            try:
                if self.memory.memory_file == file_path:
                    # Switch to another file before deleting the current one
                    alternative_file = next(
                        (f for f in available_files if f != file_to_delete), None
                    )
                    if not alternative_file:
                        message = (
                            "No alternative memory file to switch to. Deletion aborted."
                        )
                        await emitter.emit(
                            description=message, status="no_alternative_file", done=True
                        )
                        if self.valves.DEBUG:
                            print(message)
                        return message

                    self.memory.switch_memory_file(alternative_file)
                    switch_message = f"Switched to '{alternative_file}'. Now deleting '{file_to_delete}'."
                    await emitter.emit(
                        description=switch_message, status="file_switched", done=False
                    )
                    if self.valves.DEBUG:
                        print(switch_message)

                os.remove(file_path)
                message = f"File '{file_to_delete}' deleted successfully."
                status = "file_deletion_complete"
            except Exception as e:
                message = f"Error deleting file '{file_to_delete}': {str(e)}"
                status = "deletion_error"

            await emitter.emit(description=message, status=status, done=True)
            self.confirmation_pending = False
            return message

        if not self.confirmation_pending:
            self.confirmation_pending = True
            confirmation_message = (
                "Please confirm that you want to delete the memory file. "
                "Call this function again with confirmation."
            )
            await emitter.emit(
                description=confirmation_message,
                status="confirmation_required",
                done=False,
            )
            return json.dumps(
                {
                    "message": "Please confirm to delete the memory file.",
                    "file": file_to_delete,
                },
                ensure_ascii=False,
            )

        await emitter.emit(
            description="Deletion of memory file aborted.",
            status="deletion_aborted",
            done=True,
        )
        self.confirmation_pending = False
        return json.dumps(
            {"message": "Memory file deletion aborted.", "file": file_to_delete},
            ensure_ascii=False,
        )

    async def execute_functions_sequentially(
        self,
        function_calls: list,
        __event_emitter__: Callable[[dict], Any] = None,
    ) -> str:
        """
        Execute a series of functions in sequence.

        :param function_calls: A list of dictionaries each containing 'name' and 'params'.
                               Example: [{'name': 'handle_input', 'params': {...}}, ...]
        :returns: A string summarizing the result of each function call.
        """
        emitter = EventEmitter(__event_emitter__)
        results = {}

        for call in function_calls:
            func_name = call.get("name")
            params = call.get("params", {})

            if hasattr(self, func_name) and callable(getattr(self, func_name)):
                if self.valves.DEBUG:
                    print(f"Executing function: {func_name} with params: {params}")

                await emitter.emit(
                    f"Executing {func_name}", status="function_execution", done=False
                )

                try:
                    func = getattr(self, func_name)
                    result = await func(__event_emitter__=__event_emitter__, **params)
                    results[func_name] = result

                    await emitter.emit(
                        description=f"{func_name} executed successfully.",
                        status="function_complete",
                        done=False,
                    )
                except Exception as e:
                    error_msg = f"Error executing {func_name}: {str(e)}"
                    results[func_name] = error_msg

                    await emitter.emit(
                        description=error_msg, status="function_error", done=False
                    )
            else:
                error_msg = f"Function {func_name} not found or not callable."
                results[func_name] = error_msg

                await emitter.emit(
                    description=error_msg, status="function_missing", done=False
                )

        await emitter.emit(
            description="All requested functions have been processed.",
            status="execution_complete",
            done=True,
        )

        return f"executed successfully :{results}"

    async def download_memory(
        self,
        memory_file_name: str,
        download_all: bool,
        __event_emitter__: Callable[[dict], Any] = None,
    ) -> str:
        """
        Download a specific memory file, or all memory files as a tarball; the link is ONLY valid for 14 seconds, and by the time the LLM answers it has ALREADY EXPIRED.

        :param memory_file_name: Name of the memory file or target tarball name.
        :param download_all: Boolean indicating whether to download all memories as a tarball.
        :returns: A message with a link or status of the operation.
        """
        emitter = EventEmitter(__event_emitter__)
        available_files = os.listdir(self.memory.directory)
        found_files = []

        if download_all:
            tarball_path = os.path.join(self.memory.directory, "all_memories.tar.gz")
            with tarfile.open(tarball_path, "w:gz") as tar:
                for file in available_files:
                    if file.endswith(".json"):
                        tar.add(os.path.join(self.memory.directory, file), arcname=file)
                        found_files.append(file)

            target_file = tarball_path
        else:
            file_path = os.path.join(self.memory.directory, memory_file_name)
            if memory_file_name in available_files:
                target_file = file_path
                found_files.append(memory_file_name)
            else:
                matched = sorted(f for f in available_files if memory_file_name in f)
                if matched:
                    target_file = os.path.join(self.memory.directory, matched[0])
                    found_files.append(matched[0])
                else:
                    message = f"No memory file matching '{memory_file_name}' was found."
                    await emitter.emit(
                        description=message, status="file_not_found", done=True
                    )
                    if self.valves.DEBUG:
                        print(message)
                    return message

        if not found_files:
            message = "No files were found to download."
            await emitter.emit(description=message, status="not_found", done=True)
            if self.valves.DEBUG:
                print(message)
            return message

        httpd = None
        try:
            # SimpleHTTPRequestHandler accepts a 'directory' keyword (Python 3.7+);
            # functools.partial binds it so files are served from the memory directory.
            handler = partial(SimpleHTTPRequestHandler, directory=self.memory.directory)
            httpd = TCPServer(("", 0), handler)
            host = socket.gethostbyname(socket.gethostname())
            _, port = httpd.server_address
            server_url = f"http://{host}:{port}/{os.path.basename(target_file)}"

            # Start the server in a new thread
            server_thread = threading.Thread(target=httpd.serve_forever, daemon=True)
            server_thread.start()

            message = f"Download available for 14 seconds in this link: {server_url}"
            await emitter.emit(description=message, status="download", done=True)
            if self.valves.DEBUG:
                print(message)

            # Give the user time to download
            await asyncio.sleep(14)
            return "TELL THE USER THAT LINK IS EXPIRED AND YOU SHOULD HAVE DOWNLOADED FILES!"
        except Exception as e:
            message = f"Error setting up download server: {str(e)}"
            await emitter.emit(description=message, status="download_error", done=True)
            if self.valves.DEBUG:
                print(message)
        finally:
            if httpd:
                httpd.shutdown()  # Ensure server is shut down
            # Only clean up the temporary tarball; never delete an original memory file.
            if download_all and os.path.exists(target_file):
                os.remove(target_file)
                if self.valves.DEBUG:
                    await emitter.emit(
                        description="file deleted sucessfully",
                        status=f"Deleted file: {target_file}",
                        done=True,
                    )
                    print(f"Deleted file: {target_file}")
        return message
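
For local testing outside Open WebUI, the Tools class can be driven directly with asyncio. The sketch below is illustrative only: the module name gpt4_memory_mimic and the sample memo text are assumptions, and the event emitter is omitted (status emits become no-ops).

python
import asyncio

from gpt4_memory_mimic import Tools  # assumed module/file name


async def main():
    tools = Tools()

    # Store a memo as if the LLM had requested it
    print(await tools.handle_input(
        input_text="User prefers concise answers",
        tag="personal",
        user_wants_to_add=False,
        llm_wants_to_add=True,
        by="LLM",
    ))

    # Read everything back from the active memory file
    print(await tools.recall_memories())


asyncio.run(main())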