"""
title: Context Clip Filter To a maximum number of chars
author: anfi
author_url: None
funding_url: None
version: 0.1
"""
import json
import os
from datetime import datetime
from typing import Any, Dict, List, Optional
# This function reduces the total conversation length,
# including the system message, to about `self.MAX_CHAR_LIMIT`,
# with precision of the length of `MESSAGE_IF_CLIPPED`.
# 1. **MAX_CHAR_LIMIT**: Messages limited to this num of characters
# Exceeding text is clipped with a prepended message (`MESSAGE_IF_CLIPPED`).
# 2. **Custom Clipping Message**: Customize `MESSAGE_IF_CLIPPED` to inform AI about text clipping.
# The message is prepended to the kept tail of the clipped message if it fits; if it doesn't, it occupies the entire message.
# When `DEBUG` is `True`, detailed filtering info is logged to `DEBUG_FILE_PATH` and errors to ERROR_FILE_PATH
# Function potentially eats a lot of resources for long conversations.
# I haven't used valves, because I was unable to figure out how to get values from them, sorry.
class Filter:
    """Clip a chat conversation to roughly ``MAX_CHAR_LIMIT`` characters.

    A leading system message is always kept. The remaining messages are kept
    from newest to oldest while they fit in the character budget; the first
    message that does not fit is cut down to its tail (prefixed with
    ``MESSAGE_IF_CLIPPED``) and everything older is dropped.

    Attributes are plain instance fields set in ``__init__``:
        MAX_CHAR_LIMIT: character budget for the whole conversation.
        DEBUG_FILE_PATH: file that receives filtering details when DEBUG is on.
        ERROR_FILE_PATH: file that receives error details when DEBUG is on.
        DEBUG: enables logging to the two files above.
        MESSAGE_IF_CLIPPED: notice inserted where text was cut.
    """

    def __init__(self) -> None:
        """Initialize the filter with its default configuration."""
        self.MAX_CHAR_LIMIT: int = 2000
        self.DEBUG_FILE_PATH: str = "/app/backend/data/debuginfo.txt"
        self.ERROR_FILE_PATH: str = "/app/backend/data/debuginfo_errors.txt"
        self.DEBUG: bool = False
        self.MESSAGE_IF_CLIPPED: str = "[The text has been clipped to this place due to system limitations. Ask user to resupply context if needed.]"

    def _filter_messages(
        self,
        messages_to_filter: List[Dict[str, str]],
        prepend_clip_message: bool = True,
    ) -> tuple[List[Dict[str, str]], int, int]:
        """Return the messages that fit into ``MAX_CHAR_LIMIT`` characters.

        The input list and its message dicts are never modified; a clipped
        message is a shallow copy with a new ``"content"``.

        Args:
            messages_to_filter: Conversation in chronological order. If the
                first entry has role ``"system"`` it is always kept, first.
            prepend_clip_message: When true, ``MESSAGE_IF_CLIPPED`` is put in
                front of the clipped text; otherwise nothing is prepended.

        Returns:
            ``(filtered_messages, original_char_count, total_chars)`` where
            the counts are the summed content lengths before and after
            filtering (system message included in both).

        Note:
            The result may exceed the limit by up to the length of the clip
            message (or by the system message, which is never clipped).
            Content is assumed to be ``str``; a missing or ``None`` content
            counts as empty.
        """
        char_limit: int = self.MAX_CHAR_LIMIT
        clip_message: str = self.MESSAGE_IF_CLIPPED if prepend_clip_message else ""
        clip_message_length: int = len(clip_message)

        # Split off a leading system message without touching the caller's list.
        system_message: Optional[Dict[str, str]] = None
        conversation: List[Dict[str, str]] = messages_to_filter
        if messages_to_filter and messages_to_filter[0].get("role") == "system":
            system_message = messages_to_filter[0]
            conversation = messages_to_filter[1:]

        # The system message is charged against the budget exactly once.
        total_chars: int = 0
        if system_message is not None:
            total_chars = len(system_message.get("content") or "")

        original_char_count: int = total_chars + sum(
            len(message.get("content") or "") for message in conversation
        )

        # Collected newest-first with O(1) appends, reversed once at the end.
        kept_newest_first: List[Dict[str, str]] = []
        for message in reversed(conversation):
            content = message.get("content") or ""
            content_length = len(content)

            if total_chars + content_length <= char_limit:
                kept_newest_first.append(message)
                total_chars += content_length
                continue

            # This message does not fit: keep its tail if there is room next
            # to the clip notice, otherwise keep only the notice. Either way
            # nothing older is included.
            remaining_chars = char_limit - total_chars - clip_message_length
            if remaining_chars > 0:
                clipped_content = clip_message + content[-remaining_chars:]
            else:
                clipped_content = clip_message

            clipped_message = message.copy()
            clipped_message["content"] = clipped_content
            kept_newest_first.append(clipped_message)
            total_chars += len(clipped_content)
            break

        kept_newest_first.reverse()
        filtered_messages: List[Dict[str, str]] = (
            [system_message] if system_message is not None else []
        )
        filtered_messages.extend(kept_newest_first)
        return filtered_messages, original_char_count, total_chars

    def inlet(
        self,
        body: Dict[str, Any],
        __user__: Optional[Dict[str, Any]] = None,
    ) -> Dict[str, Any]:
        """Replace ``body["messages"]`` with the clipped conversation.

        Args:
            body: Incoming request body; its ``"messages"`` entry is read and
                overwritten in place.
            __user__: Not used for filtering; only passed to the debug log.

        Returns:
            The same ``body`` object. If filtering raises, ``body`` is
            returned as it was received so the request is not blocked.
        """
        try:
            original_messages: List[Dict[str, str]] = body.get("messages") or []
            filtered_messages, original_char_count, filtered_char_count = (
                self._filter_messages(original_messages, prepend_clip_message=True)
            )
            body["messages"] = filtered_messages
            if self.DEBUG:
                self._log_debug_info(
                    "inlet", body, __user__, self.DEBUG_FILE_PATH,
                    original_messages, original_char_count, filtered_char_count
                )
            return body
        except Exception as e:
            # Deliberate best-effort boundary: a filtering failure must not
            # break the request, so the body is passed through.
            if self.DEBUG:
                self._log_debug_info(
                    "error", {"error": str(e)}, None, self.ERROR_FILE_PATH
                )
            return body

    def _log_debug_info(
        self,
        logging_source_method_name: str,
        body_content: Dict[str, Any],
        user_info: Optional[Dict[str, Any]],
        log_file_path: str,
        original_messages: Optional[List[Dict[str, str]]] = None,
        original_char_count: Optional[int] = None,
        filtered_char_count: Optional[int] = None
    ) -> None:
        """Append one JSON log entry, framed by separator lines, to a file.

        Args:
            logging_source_method_name: Label stored under ``"method"``.
            body_content: Data stored under ``"body"``; its ``"messages"``
                length is stored as ``"filtered_message_count"``.
            user_info: Stored under ``"user"``.
            log_file_path: File to append to; its directory is created if
                it has one and it does not exist.
            original_messages: When given, its length and
                ``original_char_count`` are added to the entry.
            original_char_count: Character count before filtering.
            filtered_char_count: Character count after filtering.

        Any failure is reported with ``print`` and otherwise ignored so that
        logging can never break the request.
        """
        try:
            # os.makedirs("") raises, so skip it for bare file names.
            log_directory = os.path.dirname(log_file_path)
            if log_directory:
                os.makedirs(log_directory, exist_ok=True)

            log_data = {
                "timestamp": datetime.now().isoformat(),
                "method": logging_source_method_name,
                "filtered_message_count": len(body_content.get('messages') or []),
                "filtered_character_count": f"{filtered_char_count} out of {self.MAX_CHAR_LIMIT}",
                "body": body_content,  # Filtered body content
                "user": user_info
            }
            if original_messages is not None:
                log_data["original_message_count"] = len(original_messages)
                log_data["original_character_count"] = original_char_count

            # Serialise before opening the file so a failure cannot leave a
            # half-written entry; values JSON cannot encode are logged via
            # repr(), which is acceptable for a debug dump.
            entry = json.dumps(log_data, indent=2, default=repr)
            separator = f"\n{'='*50}\n"
            with open(log_file_path, "a", encoding="utf-8") as log_file:
                log_file.write(f"{separator}{entry}{separator}")
        except Exception as e:
            print(f"Error writing debug info: {str(e)}")