Whitepaper
Docs
Sign In
Function
Function
filter
v0.1
Reduce context to a nubmer of characters
Function ID
reduce_context_to_a_nubmer_of_characters
Creator
@anfi
Downloads
102+
Reduces context to X characters - and automatically deletes the old stuff in conversation.
Get
README
No README available
Function Code
Show
""" title: Context Clip Filter To a maximum number of chars author: anfi author_url: None funding_url: None version: 0.1 """ from typing import Optional, Dict, List import json from datetime import datetime import os # This function reduces the total conversation length, # including the system message, to about `self.MAX_CHAR_LIMIT`, # with precision of the length of `MESSAGE_IF_CLIPPED`. # 1. **MAX_CHAR_LIMIT**: Messages limited to this num of characters # Exceeding text is clipped with a prepended message (`MESSAGE_IF_CLIPPED`). # 2. **Custom Clipping Message**: Customize `MESSAGE_IF_CLIPPED` to inform AI about text clipping. # This will be prepended before clip in middle of message if it fits, of if doesn't it will ocuppy entire message. # When `DEBUG` is `True`, detailed filtering info is logged to `DEBUG_FILE_PATH` and errors to ERROR_FILE_PATH # Function potentially eats a lot of resources for long conversations. # I haven't used valves, because I was unable to figure out how to get values from them, sorry. class Filter: def __init__(self) -> None: # Initialize the Filter with configuration parameters. self.MAX_CHAR_LIMIT: int = 2000 self.DEBUG_FILE_PATH: str = "/app/backend/data/debuginfo.txt" self.ERROR_FILE_PATH: str = "/app/backend/data/debuginfo_errors.txt" self.DEBUG: bool = False self.MESSAGE_IF_CLIPPED: str = "[The text has been clipped to this place due to system limitations. Ask user to resupply context if needed.]" def _filter_messages(self, messages_to_filter: List[Dict[str, str]], prepend_clip_message: bool = True) -> tuple[List[Dict[str, str]], int, int]: # Initialize an empty list to store filtered messages filtered_messages: List[Dict[str, str]] = [] # Initialize total character count total_chars: int = 0 # Initialize original character count original_char_count: int = 0 # Set character limit from class attribute char_limit: int = self.MAX_CHAR_LIMIT # Set clip message based on prepend_clip_message flag clip_message: str = self.MESSAGE_IF_CLIPPED if prepend_clip_message else "" # Calculate the length of the clip message clip_message_length: int = len(clip_message) # Preserve system message if it exists if messages_to_filter and messages_to_filter[0].get("role") == "system": # Pop the system message from the input list system_message = messages_to_filter.pop(0) # Add the system message to the filtered messages filtered_messages.append(system_message) # Calculate the character count of the system message system_char_count = len(system_message.get("content", "")) # Add system message length to total characters total_chars += system_char_count # Add system message length to original character count original_char_count += system_char_count # Reduce the character limit by the system message length char_limit -= system_char_count # Calculate original character count for message in messages_to_filter: # Add the length of each message's content to the original character count original_char_count += len(message.get("content", "")) # Process messages from the end, keeping as many as possible for message in reversed(messages_to_filter): # Get the content of the message content: str = message.get("content", "") # Calculate the length of the content content_length: int = len(content) if total_chars + content_length <= char_limit: # If the entire message fits, add it as is filtered_messages.insert(0, message) # Update the total character count total_chars += content_length elif total_chars + clip_message_length < char_limit: # If there's room for at least some content and the clip message # Calculate remaining characters remaining_chars = char_limit - total_chars - clip_message_length # Clip the content if there's room, otherwise use an empty string clipped_content = content[-remaining_chars:] if remaining_chars > 0 else "" # Prepend the clip message to the clipped content clipped_content = clip_message + clipped_content # Create a copy of the original message clipped_message = message.copy() # Update the content of the clipped message clipped_message["content"] = clipped_content # Insert the clipped message at the beginning of the filtered messages filtered_messages.insert(0, clipped_message) # Update the total character count total_chars += len(clipped_content) # Break the loop as we've reached the character limit break else: # If there's not enough room for both content and clip message, clipped message is our only way: # Create a copy of the original message clipped_message = message.copy() # Set the content of the clipped message to just the clip message clipped_message["content"] = clip_message # Insert the clipped message at the beginning of the filtered messages filtered_messages.insert(0, clipped_message) # Update the total character count total_chars += clip_message_length # Break the loop as we've reached the character limit break # Return the filtered messages, original character count, and total characters after filtering return filtered_messages, original_char_count, total_chars def inlet(self, body: Dict[str, any], __user__: Optional[Dict[str, any]] = None) -> Dict[str, any]: # Process incoming messages and apply filtering. # The __user__ parameter is unused but kept for future modifications. # Args: # body (Dict[str, any]): The incoming request body containing messages. # __user__ (Optional[Dict[str, any]]): Unused parameter for potential future use. # Returns: # Dict[str, any]: The processed request body with filtered messages. try: # Extract the original messages from the body original_messages: List[Dict[str, str]] = body.get("messages", []) # Apply filtering to the messages filtered_messages, original_char_count, filtered_char_count = self._filter_messages(original_messages, prepend_clip_message=True) # Update the body with the filtered messages body["messages"] = filtered_messages # If debug mode is enabled, log debug information if self.DEBUG: self._log_debug_info( "inlet", body, __user__, self.DEBUG_FILE_PATH, original_messages, original_char_count, filtered_char_count ) # Return the updated body return body except Exception as e: # If an exception occurs and debug mode is enabled, log the error if self.DEBUG: self._log_debug_info( "error", {"error": str(e)}, None, self.ERROR_FILE_PATH ) # Return the original body in case of an error return body def _log_debug_info( self, logging_source_method_name: str, body_content: Dict[str, any], user_info: Optional[Dict[str, any]], log_file_path: str, original_messages: Optional[List[Dict[str, str]]] = None, original_char_count: Optional[int] = None, filtered_char_count: Optional[int] = None ) -> None: try: os.makedirs(os.path.dirname(log_file_path), exist_ok=True) # Create a JSON structure for the log information log_data = { "timestamp": datetime.now().isoformat(), "method": logging_source_method_name, "filtered_message_count": len(body_content.get('messages', [])), "filtered_character_count": f"{filtered_char_count} out of {self.MAX_CHAR_LIMIT}", "body": body_content, # Filtered body content "user": user_info } # Add original message information if available if original_messages is not None: log_data["original_message_count"] = len(original_messages) log_data["original_character_count"] = original_char_count with open(log_file_path, "a") as log_file: # Write separator before the log entry log_file.write(f"\n{'='*50}\n") # Write the JSON structure json.dump(log_data, log_file, indent=2) # Write separator after the log entry log_file.write(f"\n{'='*50}\n") except Exception as e: print(f"Error writing debug info: {str(e)}")