"""
title: Context Length Filter
description: Intelligently manages chat context by applying three-tier filtering: conversation turns, image count, and token limits. Strips images from older messages while preserving text content to maintain conversation flow.
author: Kejun Luo
editor: Cloud Zhang
version: 0.10
edit date: 2025/7/21
"""
import tiktoken
from pydantic import BaseModel, Field
from typing import Optional, Callable, Any, Awaitable
import time
class Filter:
class Valves(BaseModel):
    # Admin-configurable limits for the three filtering stages.
    # For each limit, 0 means "unlimited" and disables that stage.
    priority: int = Field(default=0, description="Priority level")
    # Stage 1: maximum number of user/assistant turns kept.
    max_turns: int = Field(
        default=25,
        description="Number of conversation turns to retain. Set '0' for unlimited.",
    )
    # Stage 3: maximum total text tokens kept across all messages.
    token_limit: int = Field(
        default=10000,
        description="Number of token limit to retain. Set '0' for unlimited.",
    )
    # Stage 2: maximum number of images kept in the conversation history.
    max_images: int = Field(
        default=5,
        description="Maximum number of images to retain in conversation history. Set '0' for unlimited.",
    )
class UserValves(BaseModel):
    # No per-user configurable settings for this filter; empty placeholder.
    pass
def __init__(self):
    # Instantiate the admin-level valves with their defaults.
    self.valves = self.Valves()
    # tiktoken encoder used by count_text_tokens; cl100k_base is the
    # GPT-3.5/GPT-4-family encoding.
    self.encoding = tiktoken.get_encoding("cl100k_base")
async def inlet(
    self,
    body: dict,
    __event_emitter__: Callable[[Any], Awaitable[None]],
    __model__: Optional[dict] = None,
) -> dict:
    """
    Trim the incoming chat context in three stages before it reaches the model.

    Stage 1 caps the number of conversation turns, stage 2 strips images from
    the oldest messages down to ``max_images``, and stage 3 drops the oldest
    messages until the text-token total fits ``token_limit``. If any stage
    changed the context, one combined status event is emitted to the UI.

    Returns the request ``body`` with its ``messages`` list replaced by the
    filtered list.
    """
    print(
        f"[Context Length Filter] Inlet execution started. Timestamp: {time.time()}"
    )
    messages = body.get("messages", [])
    original_count = len(messages)
    original_images = sum(self.count_images_in_message(msg) for msg in messages)
    original_tokens = sum(self.count_text_tokens(msg) for msg in messages)
    print(
        f"[Context Length Filter] Original: {original_count} messages, {original_images} images, {original_tokens} tokens"
    )
    # Record what each stage does so a single combined status message can be
    # emitted at the end.
    processing_status = {
        "turn_limited": False,
        "image_limited": False,
        "token_limited": False,
        "original_stats": {
            "messages": original_count,
            "images": original_images,
            "tokens": original_tokens,
            # (count - 1) // 2: turns are counted as user/assistant pairs
            # after an assumed leading system message — TODO confirm.
            "turns": max(0, (original_count - 1) // 2),
        },
        "final_stats": {},
        "actions": [],
    }
    # === Stage 1: truncate by conversation turns ===
    if self.valves.max_turns > 0:
        current_turns = max(0, (len(messages) - 1) // 2)
        if current_turns > self.valves.max_turns:
            # Keep the newest max_turns pairs; "% 2" preserves the odd
            # leading message (presumably the system prompt) when present.
            sent_msg_count = self.valves.max_turns * 2 + (len(messages) % 2)
            messages = messages[-sent_msg_count:]
            processing_status["turn_limited"] = True
            processing_status["actions"].append(
                f"轮数限制:{current_turns} → {self.valves.max_turns} 轮"
            )
        print(
            f"[Context Length Filter] After turn limit: {len(messages)} messages"
        )
    # === Stage 2: truncate/strip images ===
    if self.valves.max_images > 0:
        messages, image_status = await self.truncate_and_strip_images(messages)
        if image_status["limited"]:
            processing_status["image_limited"] = True
            processing_status["actions"].append(
                f"图片限制:{image_status['original_images']} → {image_status['final_images']} 张图片"
            )
        print(
            f"[Context Length Filter] After image processing: {len(messages)} messages"
        )
    # === Stage 3: truncate by token count ===
    if self.valves.token_limit > 0:
        messages, token_status = await self.truncate_by_tokens(messages)
        if token_status["limited"]:
            processing_status["token_limited"] = True
            processing_status["actions"].append(
                f"Token限制:{token_status['original_tokens']} → {token_status['final_tokens']} tokens"
            )
        print(
            f"[Context Length Filter] After token limit: {len(messages)} messages"
        )
    # Compute the final statistics after all stages have run.
    final_images = sum(self.count_images_in_message(msg) for msg in messages)
    final_tokens = sum(self.count_text_tokens(msg) for msg in messages)
    final_turns = max(0, (len(messages) - 1) // 2)
    processing_status["final_stats"] = {
        "messages": len(messages),
        "images": final_images,
        "tokens": final_tokens,
        "turns": final_turns,
    }
    print(
        f"[Context Length Filter] Final: {len(messages)} messages, {final_images} images, {final_tokens} tokens"
    )
    # If any limit was triggered, send one combined status message.
    if any(
        [
            processing_status["turn_limited"],
            processing_status["image_limited"],
            processing_status["token_limited"],
        ]
    ):
        await self.show_comprehensive_status(__event_emitter__, processing_status)
    body["messages"] = messages
    return body
async def truncate_and_strip_images(self, messages: list) -> tuple[list, dict]:
    """
    Enforce the ``max_images`` valve by stripping images from the oldest
    messages first, until the total image count fits the limit.

    A message whose images are stripped keeps its text; a message that
    contained only images is removed entirely. Returns the processed
    message list plus a status dict with ``limited`` / ``original_images``
    / ``final_images``.
    """
    status = {"limited": False, "original_images": 0, "final_images": 0}
    if not messages:
        return messages, status

    # Per-message image counts, oldest first.
    per_message = [self.count_images_in_message(m) for m in messages]
    total_images = sum(per_message)
    status["original_images"] = total_images

    # Already within the limit — nothing to do.
    if total_images <= self.valves.max_images:
        status["final_images"] = total_images
        return messages, status

    status["limited"] = True
    excess = total_images - self.valves.max_images
    print(
        f"[Context Image Filter] Image limit exceeded: {total_images}/{self.valves.max_images}. Need to remove {excess} images."
    )

    working = list(messages)
    removed = 0
    drop_indices = set()
    # Walk oldest → newest, clearing all images from each message we visit
    # until enough images have been removed.
    for idx, img_count in enumerate(per_message):
        if removed >= excess:
            break
        if img_count == 0:
            continue
        replacement = self.strip_images_from_message(working[idx])
        if replacement:
            print(
                f"[Context Image Filter] Stripping images from message at index {idx}"
            )
            working[idx] = replacement
        else:
            # Nothing left after stripping — drop the whole message.
            print(
                f"[Context Image Filter] Marking message at index {idx} for removal (only contained images)"
            )
            drop_indices.add(idx)
        removed += img_count

    # Rebuild without the dropped messages (indices stay valid this way).
    working = [m for i, m in enumerate(working) if i not in drop_indices]
    status["final_images"] = total_images - removed
    return working, status
async def truncate_by_tokens(self, messages: list) -> tuple[list, dict]:
    """
    Enforce the ``token_limit`` valve by dropping the oldest messages until
    the total text-token count fits.

    The newest message is always retained, even when it exceeds the budget
    on its own. Returns the retained messages plus a status dict with
    ``limited`` / ``original_tokens`` / ``final_tokens``.
    """
    status = {"limited": False, "original_tokens": 0, "final_tokens": 0}
    if not messages:
        return messages, status

    total = sum(self.count_text_tokens(m) for m in messages)
    status["original_tokens"] = total

    # Under budget — keep everything unchanged.
    if total <= self.valves.token_limit:
        status["final_tokens"] = total
        return messages, status

    status["limited"] = True
    print(
        f"[Context Token Filter] Token limit exceeded: {total}/{self.valves.token_limit}."
    )

    kept = []
    running = 0
    # Walk newest → oldest, accumulating until the next message would
    # overflow the limit (but never leave the result empty).
    for msg in reversed(messages):
        cost = self.count_text_tokens(msg)
        if kept and running + cost > self.valves.token_limit:
            break
        kept.insert(0, msg)
        running += cost

    status["final_tokens"] = running
    return kept, status
def strip_images_from_message(self, msg: dict) -> Optional[dict]:
    """
    Remove image parts from a message, preserving text and other content.

    Returns a copy of the message with image parts removed; plain-string
    (non-multimodal) content is returned unchanged. Returns ``None`` when
    stripping leaves no content, signalling the caller to drop the message.
    (Return annotation fixed: the original claimed ``dict`` but returns
    ``None`` for image-only messages.)
    """
    content = msg.get("content", "")
    if not isinstance(content, list):
        # Not multimodal content (already plain text) — nothing to strip.
        return msg
    new_content = []
    for item in content:
        if not isinstance(item, dict):
            # Bug fix: non-dict parts were previously dropped silently.
            # They are not images, so keep them.
            new_content.append(item)
            continue
        item_type = item.get("type", "")
        if item_type == "text":
            text = item.get("text", "").strip()
            if text:  # keep only non-empty text parts
                new_content.append(item)
        elif item_type in ("image", "image_url"):
            continue  # drop direct image parts
        elif item_type == "file" and self.is_image_file(item):
            continue  # drop file attachments that are images
        else:
            new_content.append(item)  # keep any other non-image part
    if not new_content:
        # Only images were present — the whole message should be removed.
        return None
    # Copy the message before mutating content.
    modified_msg = msg.copy()
    if (
        len(new_content) == 1
        and isinstance(new_content[0], dict)
        and new_content[0].get("type") == "text"
    ):
        # A lone text part collapses to a plain string.
        modified_msg["content"] = new_content[0].get("text", "")
    else:
        modified_msg["content"] = new_content
    return modified_msg
def count_images_in_message(self, msg: dict) -> int:
    """Count image parts (direct images plus image-file attachments) in one message."""
    content = msg.get("content", "")
    if not isinstance(content, list):
        # Plain-string content carries no structured image parts.
        return 0
    total = 0
    for part in content:
        if not isinstance(part, dict):
            continue
        part_type = part.get("type", "")
        if part_type in ("image", "image_url"):
            total += 1
        elif part_type == "file" and self.is_image_file(part):
            total += 1
    return total
def is_image_file(self, file_item: dict) -> bool:
    """Heuristically decide whether a file attachment is an image, by extension or MIME type."""
    image_suffixes = (
        ".jpg",
        ".jpeg",
        ".png",
        ".gif",
        ".bmp",
        ".webp",
        ".svg",
        ".tiff",
        ".ico",
    )
    try:
        name = file_item.get("name", "").lower()
        if name.endswith(image_suffixes):
            return True
        # Fall back to the MIME type carried in "type".
        return file_item.get("type", "").lower().startswith("image/")
    except Exception:
        # Malformed item (e.g. non-string fields) — treat as not an image.
        return False
def count_text_tokens(self, msg: dict) -> int:
    """
    Count tokens in the text portions of a message.

    Includes the role name's tokens. On encoder failure, falls back to a
    rough ~4-characters-per-token estimate for plain-string content.
    """
    content = msg.get("content", "")
    tokens = 0
    try:
        if isinstance(content, list):
            # Multimodal content: only "text" parts contribute tokens.
            for part in content:
                if isinstance(part, dict) and part.get("type") == "text":
                    tokens += len(self.encoding.encode(part.get("text", "")))
        elif isinstance(content, str):
            tokens = len(self.encoding.encode(content))
        # The role string also occupies tokens in the prompt.
        role = msg.get("role", "")
        if role:
            tokens += len(self.encoding.encode(role))
    except Exception as e:
        print(f"[Context Length Filter] Text Token calculation error: {e}")
        if isinstance(content, str):
            # Rough character-based fallback estimate.
            tokens = len(content) // 4
    return tokens
async def show_comprehensive_status(
    self, __event_emitter__: Callable[[Any], Awaitable[None]], status: dict
) -> None:
    """
    Emit one combined status event summarizing everything the filter did.

    Joins the per-stage action descriptions recorded in ``status["actions"]``
    with a summary of the final retained statistics, and sends the result
    through the event emitter as a completed ("done") status message.
    """
    # Header first, then each stage's action description.
    description_parts = ["上下文已优化"]
    if status["actions"]:
        description_parts.extend(status["actions"])
    # Append the final retained statistics.
    # (Fix: removed the unused local read of status["original_stats"],
    # which served no purpose and raised KeyError when the key was absent.)
    final = status["final_stats"]
    summary = f"最终保留:{final['turns']} 轮对话 ({final['messages']} 条消息,{final['images']} 张图片,{final['tokens']} tokens)"
    description_parts.append(summary)
    # Assemble the full description and emit it.
    description = " | ".join(description_parts)
    await __event_emitter__(
        {"type": "status", "data": {"description": description, "done": True}}
    )