We're Hiring!
Whitepaper
Docs
Sign In
@joeykot
·
4 months ago
·
8 months ago
function
Context Length Filter
Get
Last Updated
4 months ago
Created
8 months ago
Function
filter
v0.2
Name
Context Length Filter
Downloads
93+
Saves
0+
Description
Filters messages to keep the system prompt and the last N non-system messages, optionally further truncating based on a maximum character count limit.
Function Code
Show
""" title: Context Length Filter author: Joey Kot version: 0.2 description: Filters messages to keep the system prompt and the last N non-system messages, optionally further truncating based on a maximum character count limit. Handles image_url content by specific rules. """ from pydantic import BaseModel, Field, ValidationError from typing import Optional, List, Dict, Any import json class Filter: class Valves(BaseModel): priority: int = Field( default=0, description="Priority level for the filter operations." ) n_last_messages: int = Field( default=4, description="Number of last messages to retain.", gt=0 ) max_chars: int = Field( default=-1, description="Maximum characters to retain. (Remove messages one by one, at least one message will be kept.)", ) def __init__(self, config: Optional[Dict[str, Any]] = None): self.toggle = True # IMPORTANT: This creates a switch UI in Open WebUI # TIP: Use SVG Data URI! self.icon = """data:image/svg+xml;base64,PD94bWwgdmVyc2lvbj0iMS4wIiBzdGFuZGFsb25lPSJubyI/PjwhRE9DVFlQRSBzdmcgUFVCTElDICItLy9XM0MvL0RURCBTVkcgMS4xLy9FTiIgImh0dHA6Ly93d3cudzMub3JnL0dyYXBoaWNzL1NWRy8xLjEvRFREL3N2ZzExLmR0ZCI+PHN2ZyB0PSIxNzU0NzY2NjQxMjk5IiBjbGFzcz0iaWNvbiIgdmlld0JveD0iMCAwIDEwMjQgMTAyNCIgdmVyc2lvbj0iMS4xIiB4bWxucz0iaHR0cDovL3d3dy53My5vcmcvMjAwMC9zdmciIHAtaWQ9IjExMTY2IiB4bWxuczp4bGluaz0iaHR0cDovL3d3dy53My5vcmcvMTk5OS94bGluayIgd2lkdGg9IjIwMCIgaGVpZ2h0PSIyMDAiPjxwYXRoIGQ9Ik00MTMuMTQxMzMzIDMyMi41MTczMzNMNTEyIDQyMS4zMzMzMzNsMjcxLjk1NzMzMy0yNzEuOTU3MzMzYTg1LjMzMzMzMyA4NS4zMzMzMzMgMCAwIDEgMTIwLjcwNCAwbDMwLjE2NTMzNCAzMC4xNjUzMzNMNDEzLjE0MTMzMyA3MDEuMjI2NjY3YTE3MC42NjY2NjcgMTcwLjY2NjY2NyAwIDEgMS05MC40OTYtOTAuNDk2bDk4Ljg1ODY2Ny05OC44NTg2NjdMMzIyLjY0NTMzMyA0MTMuMDEzMzMzQTE3MC42NjY2NjcgMTcwLjY2NjY2NyAwIDEgMSA0MTMuMTQxMzMzIDMyMi41NnpNMjU2IDM0MS4yMDUzMzNhODUuMzMzMzMzIDg1LjMzMzMzMyAwIDEgMCAwLTE3MC42NjY2NjYgODUuMzMzMzMzIDg1LjMzMzMzMyAwIDAgMCAwIDE3MC42NjY2NjZ6IG0wIDUxMmE4NS4zMzMzMzMgODUuMzMzMzMzIDAgMSAwIDAtMTcwLjY2NjY2NiA4NS4zMzMzMzMgODUuMzMzMzMzIDAgMCAwIDAgMTcwLjY2N
jY2NnogbTQwNi44MjY2NjctMjgxLjA0NTMzM2wyNzIgMjcyLTMwLjE2NTMzNCAzMC4xNjUzMzNhODUuMzMzMzMzIDg1LjMzMzMzMyAwIDAgMS0xMjAuNzA0IDBsLTIxMS42MjY2NjYtMjExLjYyNjY2NiA5MC40NTMzMzMtOTAuNTM4NjY3eiIgZmlsbD0iIzAwMDAwMCIgcC1pZD0iMTExNjciPjwvcGF0aD48L3N2Zz4=""" if config is None: config = {} try: self.valves = self.Valves(**config) except ValidationError as e: print( f"Warning: Invalid configuration provided for filter. Default values will be used. Error: {e}" ) self.valves = self.Valves() def _get_message_char_count(self, message: Dict[str, Any]) -> int: try: msg_copy_for_count = json.loads(json.dumps(message)) # Exclude internal temporary ID from character count if "__temp_id__" in msg_copy_for_count: del msg_copy_for_count["__temp_id__"] return len(json.dumps(msg_copy_for_count, ensure_ascii=False)) except (TypeError, OverflowError) as e: # Fallback if JSON serialization fails print( f"Warning: Could not serialize message for char count: {message}. Error: {e}. Falling back to basic content length." ) content = message.get("content") if isinstance(content, str): return len(content) if isinstance( content, list ): # Sum lengths of text parts if content is a list count = 0 for part in content: if ( isinstance(part, dict) and part.get("type") == "text" and isinstance(part.get("text"), str) ): count += len(part["text"]) return count return 0 # Default for unhandled or empty content types def _get_total_char_count(self, messages: List[Dict[str, Any]]) -> int: # Sum character counts only for dictionary-type messages return sum( self._get_message_char_count(msg) for msg in messages if isinstance(msg, dict) ) def inlet( self, body: Dict[str, Any], __user__: Optional[Dict[str, Any]] = None ) -> Dict[str, Any]: if not isinstance(body.get("messages"), list): return body messages: List[Dict[str, Any]] = body["messages"] n_to_keep: int = self.valves.n_last_messages max_chars: int = self.valves.max_chars if not messages: return body # 0. 
Add unique temporary IDs to original messages for reliable tracking messages_with_ids: List[Dict[str, Any]] = [] for i, msg_data in enumerate(messages): if isinstance(msg_data, dict): msg_copy = json.loads(json.dumps(msg_data)) msg_copy["__temp_id__"] = f"msg_{i}" messages_with_ids.append(msg_copy) else: # Pass through non-dictionary items messages_with_ids.append(msg_data) # Identify the temporary ID of the last user message _last_user_msg_temp_id: Optional[str] = None for msg_data in reversed(messages_with_ids): if isinstance(msg_data, dict) and msg_data.get("role") == "user": _last_user_msg_temp_id = msg_data.get("__temp_id__") break saved_image_part_for_last_user: Optional[Dict[str, Any]] = None # 1. Create a new list of messages for processing: # - For the last user message, its *first* image_url part is saved and removed for processing. # - For all other messages, *all* image_url parts are removed for processing. processing_candidate_messages: List[Dict[str, Any]] = [] for msg_data_with_id in messages_with_ids: if not isinstance(msg_data_with_id, dict): processing_candidate_messages.append(msg_data_with_id) continue current_msg_copy = json.loads(json.dumps(msg_data_with_id)) content = current_msg_copy.get("content") if isinstance(content, list): new_content_list = [] is_current_msg_the_last_user = ( current_msg_copy.get("__temp_id__") == _last_user_msg_temp_id ) # Flag to ensure only the first image_url part from the last user message is saved image_saved_for_this_last_user_msg = False for part in content: if isinstance(part, dict) and part.get("type") == "image_url": if ( is_current_msg_the_last_user and not saved_image_part_for_last_user and not image_saved_for_this_last_user_msg ): saved_image_part_for_last_user = part image_saved_for_this_last_user_msg = True # This part is implicitly removed from new_content_list (not added back here) # else: image in non-last-user message, or subsequent image in last-user message: # it's removed by not being added to 
new_content_list. else: new_content_list.append(part) current_msg_copy["content"] = new_content_list # If content is not a list (e.g. string), it's processed as is (no image parts to remove in that format) processing_candidate_messages.append(current_msg_copy) # 2. Separate system prompt and other messages from the processed list system_prompt_obj: Optional[Dict[str, Any]] = None other_messages_list: List[Dict[str, Any]] = ( [] ) # Will contain messages (dicts) or passthrough items found_system_flag = False for msg_item in processing_candidate_messages: if not isinstance(msg_item, dict): other_messages_list.append(msg_item) continue if msg_item.get("role") == "system" and not found_system_flag: system_prompt_obj = msg_item found_system_flag = True else: other_messages_list.append(msg_item) # 3. Apply n_last_messages truncation to other_messages_list start_idx = max(0, len(other_messages_list) - n_to_keep) truncated_other_msgs = other_messages_list[start_idx:] # Preliminary final messages list (before char limit) current_final_messages: List[Dict[str, Any]] = [] if system_prompt_obj: current_final_messages.append(system_prompt_obj) current_final_messages.extend(truncated_other_msgs) # 4. 
Apply max_chars truncation if max_chars > 0: active_dict_messages_for_char_limit = [ m for m in truncated_other_msgs if isinstance(m, dict) ] # non_dict_messages_in_truncated are items that were not dicts and thus not subject to char removal by this logic non_dict_messages_in_truncated = [ m for m in truncated_other_msgs if not isinstance(m, dict) ] # Loop as long as total characters exceed limit AND there are active_dict_messages to remove while ( self._get_total_char_count(current_final_messages) > max_chars and len(active_dict_messages_for_char_limit) > 0 ): active_dict_messages_for_char_limit.pop( 0 ) # Remove the oldest 'other' dict message # Reconstruct current_final_messages current_final_messages = [] if system_prompt_obj: current_final_messages.append(system_prompt_obj) current_final_messages.extend(active_dict_messages_for_char_limit) current_final_messages.extend( non_dict_messages_in_truncated ) # Add non-dicts back (they weren't removed by char limit) # 5. Re-attach the saved image to the last user message if it survived filtering if saved_image_part_for_last_user and _last_user_msg_temp_id: for final_msg_dict_item in current_final_messages: if ( isinstance(final_msg_dict_item, dict) and final_msg_dict_item.get("__temp_id__") == _last_user_msg_temp_id ): # This is the original last user message that survived. content_field = final_msg_dict_item.get("content") if isinstance(content_field, list): content_field.append(saved_image_part_for_last_user) else: # If content is not a list (e.g., it was string, or became None/empty string after processing) # It needs to become a list to hold the image part. # Since an image was saved, original content for this message was a list. # So, content_field should ideally be a list here. # If it's not (e.g. empty string from prior processing), make it a list. final_msg_dict_item["content"] = [ saved_image_part_for_last_user ] break # 6. 
Clean up temporary IDs from the final list of messages final_messages_cleaned: List[Dict[str, Any]] = [] for msg_data_final_item in current_final_messages: if isinstance(msg_data_final_item, dict): cleaned_msg = json.loads(json.dumps(msg_data_final_item)) if "__temp_id__" in cleaned_msg: del cleaned_msg["__temp_id__"] final_messages_cleaned.append(cleaned_msg) else: # Pass through non-dictionary items from current_final_messages final_messages_cleaned.append(msg_data_final_item) body["messages"] = final_messages_cleaned return body
Sponsored by Open WebUI Inc.
We are hiring!
Shape the way humanity engages with intelligence.
0