Function
filter
v0.1
Context Clip Chars
Reduces context to X characters.
Function ID
context_clip_chars
Creator
@anfi
Downloads
45+

Function Content
python
"""
title: Context Clip Filter To a maximum number of chars
author: anfi
author_url: None
funding_url: None
version: 0.1
"""

import json
import os
from datetime import datetime
from typing import Any, Dict, List, Optional

# This function reduces the total conversation length,
# including the system message, to about `self.MAX_CHAR_LIMIT`,
# with precision of the length of `MESSAGE_IF_CLIPPED`.

# 1. **MAX_CHAR_LIMIT**: The conversation is limited to this number of characters.
#    Text exceeding the limit is clipped, and a message (`MESSAGE_IF_CLIPPED`) is prepended to what remains.
# 2. **Custom Clipping Message**: Customize `MESSAGE_IF_CLIPPED` to inform the AI about text clipping.
#    It is prepended at the clip point of the partially kept message if it fits; if it doesn't, it occupies that entire message.

# When `DEBUG` is `True`, detailed filtering info is logged to `DEBUG_FILE_PATH` and errors to ERROR_FILE_PATH
# Function potentially eats a lot of resources for long conversations.
# I haven't used valves, because I was unable to figure out how to get values from them, sorry.


class Filter:
    """Clip a chat conversation to roughly ``MAX_CHAR_LIMIT`` characters.

    A leading system message is always kept in full. The remaining
    messages are kept from newest to oldest until the character budget is
    used up; the first message that does not fit is clipped (keeping its
    tail) and prefixed with ``MESSAGE_IF_CLIPPED``; everything older is
    dropped.

    Configuration lives in plain instance attributes:

    * ``MAX_CHAR_LIMIT`` - target size of the whole conversation, counted
      in characters of the ``content`` fields (system message included).
    * ``MESSAGE_IF_CLIPPED`` - marker text placed where clipping happened.
    * ``DEBUG`` - when true, ``inlet`` appends diagnostics to
      ``DEBUG_FILE_PATH`` and errors to ``ERROR_FILE_PATH``.
    """

    def __init__(self) -> None:
        # Initialize the Filter with configuration parameters.
        self.MAX_CHAR_LIMIT: int = 2000
        self.DEBUG_FILE_PATH: str = "/app/backend/data/debuginfo.txt"
        self.ERROR_FILE_PATH: str = "/app/backend/data/debuginfo_errors.txt"
        self.DEBUG: bool = False
        self.MESSAGE_IF_CLIPPED: str = "[The text has been clipped to this place due to system limitations. Ask user to resupply context if needed.]"

    def _filter_messages(
        self,
        messages_to_filter: List[Dict[str, Any]],
        prepend_clip_message: bool = True,
    ) -> tuple[List[Dict[str, Any]], int, int]:
        """Return the messages that fit into ``MAX_CHAR_LIMIT`` characters.

        The input list and its message dicts are never modified; a clipped
        message is a shallow copy with a new ``content``.

        Args:
            messages_to_filter: Conversation in chronological order. If the
                first entry has ``role == "system"`` it is always kept whole.
            prepend_clip_message: When true, ``MESSAGE_IF_CLIPPED`` marks the
                clip point; when false an empty marker is used.

        Returns:
            A tuple ``(filtered_messages, original_char_count,
            filtered_char_count)`` where both counts include the system
            message.

        Raises:
            TypeError: If a message that has to be clipped carries
                non-string content (it cannot be concatenated with the clip
                marker). ``inlet`` catches this and returns the body
                unfiltered.
        """
        clip_message: str = self.MESSAGE_IF_CLIPPED if prepend_clip_message else ""
        clip_message_length: int = len(clip_message)

        # Shallow copy: the caller's list must stay intact so that `inlet`
        # can still return the untouched conversation if anything fails.
        messages: List[Dict[str, Any]] = list(messages_to_filter)

        # Preserve the system message if it exists.
        system_message: Optional[Dict[str, Any]] = None
        system_char_count: int = 0
        if messages and messages[0].get("role") == "system":
            system_message = messages[0]
            messages = messages[1:]
            # `or ""` also covers an explicit `"content": None`.
            system_char_count = len(system_message.get("content") or "")

        original_char_count: int = system_char_count + sum(
            len(message.get("content") or "") for message in messages
        )

        # Characters left for non-system messages. The system message is
        # accounted for exactly once, here; `used_chars` tracks only the
        # non-system content. May be negative when the system message alone
        # exceeds the limit.
        budget: int = self.MAX_CHAR_LIMIT - system_char_count
        used_chars: int = 0

        # Collected newest-first and reversed once at the end, instead of
        # repeatedly inserting at index 0 (which is quadratic).
        kept_newest_first: List[Dict[str, Any]] = []

        for message in reversed(messages):
            content = message.get("content") or ""
            content_length = len(content)

            if used_chars + content_length <= budget:
                # The entire message fits: keep it as is.
                kept_newest_first.append(message)
                used_chars += content_length
                continue

            clipped_message = message.copy()
            if used_chars + clip_message_length < budget:
                # Room for the marker plus at least one character: keep the
                # tail of the message (its most recent part).
                remaining_chars = budget - used_chars - clip_message_length
                clipped_message["content"] = clip_message + content[-remaining_chars:]
            else:
                # No room for any content: the marker alone replaces the
                # message. This may exceed the limit by at most the marker
                # length, which is the documented precision of the filter.
                clipped_message["content"] = clip_message
            kept_newest_first.append(clipped_message)
            used_chars += len(clipped_message["content"])
            # Everything older than the clipped message is dropped.
            break

        filtered_messages: List[Dict[str, Any]] = (
            [] if system_message is None else [system_message]
        )
        filtered_messages.extend(reversed(kept_newest_first))

        return filtered_messages, original_char_count, system_char_count + used_chars

    def inlet(self, body: Dict[str, Any], __user__: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """Clip ``body["messages"]`` in place and return ``body``.

        Args:
            body: The incoming request body; its ``"messages"`` entry is
                replaced by the filtered list.
            __user__: Unused here except for debug logging; kept for future
                modifications.

        Returns:
            The same ``body`` object. If filtering raises, the body is
            returned as it was at that point (best effort) and, when
            ``DEBUG`` is on, the error is logged to ``ERROR_FILE_PATH``.
        """
        try:
            original_messages: List[Dict[str, Any]] = body.get("messages", [])
            filtered_messages, original_char_count, filtered_char_count = (
                self._filter_messages(original_messages, prepend_clip_message=True)
            )
            body["messages"] = filtered_messages

            if self.DEBUG:
                self._log_debug_info(
                    "inlet", body, __user__, self.DEBUG_FILE_PATH,
                    original_messages, original_char_count, filtered_char_count
                )

            return body
        except Exception as e:
            # Deliberate best effort: a filter failure must not block the
            # request, so the body is passed through.
            if self.DEBUG:
                self._log_debug_info(
                    "error", {"error": str(e)}, None, self.ERROR_FILE_PATH
                )
            return body

    def _log_debug_info(
        self,
        logging_source_method_name: str,
        body_content: Dict[str, Any],
        user_info: Optional[Dict[str, Any]],
        log_file_path: str,
        original_messages: Optional[List[Dict[str, Any]]] = None,
        original_char_count: Optional[int] = None,
        filtered_char_count: Optional[int] = None
    ) -> None:
        """Append one JSON entry, framed by separator lines, to a log file.

        Never raises: any failure is reported with ``print`` so that
        logging cannot break request processing.

        Args:
            logging_source_method_name: Stored under ``"method"``.
            body_content: Stored under ``"body"``; its ``"messages"`` length
                is stored as ``"filtered_message_count"``.
            user_info: Stored under ``"user"``.
            log_file_path: File to append to; its parent directory is
                created when missing.
            original_messages: When given, its length and
                ``original_char_count`` are added to the entry.
            original_char_count: Character count before filtering.
            filtered_char_count: Character count after filtering.
        """
        try:
            # A bare file name has an empty dirname, which makedirs rejects.
            log_directory = os.path.dirname(log_file_path)
            if log_directory:
                os.makedirs(log_directory, exist_ok=True)

            # Create a JSON structure for the log information
            log_data = {
                "timestamp": datetime.now().isoformat(),
                "method": logging_source_method_name,
                "filtered_message_count": len(body_content.get('messages', [])),
                "filtered_character_count": f"{filtered_char_count} out of {self.MAX_CHAR_LIMIT}",
                "body": body_content,  # Filtered body content
                "user": user_info
            }

            # Add original message information if available
            if original_messages is not None:
                log_data["original_message_count"] = len(original_messages)
                log_data["original_character_count"] = original_char_count

            # Serialize before opening the file so a serialization failure
            # cannot leave a half-written entry. `default=str` is deliberate:
            # this is a diagnostic dump, so values JSON cannot encode are
            # stringified rather than aborting the log entry.
            payload = json.dumps(log_data, indent=2, default=str)
            separator = "=" * 50

            with open(log_file_path, "a", encoding="utf-8") as log_file:
                log_file.write(f"\n{separator}\n{payload}\n{separator}\n")
        except Exception as e:
            print(f"Error writing debug info: {str(e)}")