"""
title: Chat History Filter (gRPC, Optimized for Performance)
author: chrish71
version: 0.1.19.grpc
required_open_webui_version: 0.6.5+
description: >
Inlet: retrieve semantic summaries from Pinecone (using gRPC) & trim prompt to
[system summary + last N raw messages], with detailed Pipedream metrics.
Outlet: every M exchanges summarize via OpenRouter & upsert back to Pinecone (using gRPC),
reporting timings, counts, and scores via Pipedream.
This version is performance-optimized: blocking I/O-bound and CPU-bound calls are
offloaded to worker threads (asyncio.to_thread) so the event loop stays responsive.
Note: This version uses the Pinecone gRPC client. Ensure the dependencies are installed:
`pip install "pinecone[grpc]>=3.0.0" "openai>=1.0.0" "httpx>=0.20.0" "sentence-transformers" "python-dotenv"`
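Environment variables required at startup: PINECONE_API_KEY, PINECONE_INDEX_HOST.
Required for summarization: OPENROUTER_API_KEY, OPENROUTER_MODEL. Optional:
PINECONE_ENVIRONMENT, PINECONE_INDEX_NAME, PINECONE_NAMESPACE, VECTOR_DIM, TOP_K,
KEEP_RECENT_TURNS, TURNS_BEFORE_SUMMARY, PIPEDREAM_WEBHOOK_URL.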
"""
import os
import sys
import time
import logging
from datetime import datetime
from typing import Any, Dict, Optional
import asyncio
import httpx
from pydantic import BaseModel, Field
from dotenv import load_dotenv
from sentence_transformers import SentenceTransformer
from openai import AsyncOpenAI
from pinecone.grpc import PineconeGRPC
from pinecone import ServerlessSpec
# Load environment variables
load_dotenv()
# ─── Logging ─────────────────────────────────────────────────────────────────────
logging.basicConfig(
level=logging.DEBUG,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
handlers=[logging.StreamHandler(sys.stdout)],
)
logger = logging.getLogger("chat_history_filter_optimized")
logger.setLevel(logging.DEBUG)
logger.debug("✅ Debug logging is ACTIVE")
# ─── Global Embedder Initialization ──────────────────────────────────────────────
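# Loaded once at import time so every Filter instance (and every request) shares
# the same in-process model instead of paying a per-request model load.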
GLOBAL_EMBEDDER = SentenceTransformer("BAAI/bge-large-en-v1.5")
GLOBAL_VECTOR_DIM = GLOBAL_EMBEDDER.get_sentence_embedding_dimension()
logger.info(
"🧠 Global embedder loaded: model=BAAI/bge-large-en-v1.5, dim=%d", GLOBAL_VECTOR_DIM
)
class Filter:
class Valves(BaseModel):
pinecone_api_key: str = Field(
default_factory=lambda: os.getenv("PINECONE_API_KEY", "")
)
pinecone_environment: str = Field(
default_factory=lambda: os.getenv("PINECONE_ENVIRONMENT", "")
)
pinecone_index_host: str = Field(
default_factory=lambda: os.getenv("PINECONE_INDEX_HOST", "")
)
index_name: str = Field(
default_factory=lambda: os.getenv("PINECONE_INDEX_NAME", "")
)
namespace: str = Field(
default_factory=lambda: os.getenv("PINECONE_NAMESPACE", "")
)
        vector_dim: int = Field(
            # Fallback default matches BAAI/bge-large-en-v1.5 (1024 dimensions);
            # __init__ overrides this valve if the loaded model reports otherwise.
            default_factory=lambda: int(os.getenv("VECTOR_DIM", "1024"))
        )
        # TOP_K and KEEP_RECENT_TURNS fall back to arbitrary but workable defaults
        # (5 results, 8 retained messages) so an unset env var cannot crash int("").
        top_k: int = Field(default_factory=lambda: int(os.getenv("TOP_K", "5")))
        keep_recent: int = Field(
            default_factory=lambda: int(os.getenv("KEEP_RECENT_TURNS", "8"))
        )
openrouter_api_key: str = Field(
default_factory=lambda: os.getenv("OPENROUTER_API_KEY", "")
)
summary_model: str = Field(
default_factory=lambda: os.getenv("OPENROUTER_MODEL", "")
)
        turns_before: int = Field(
            # Fallback default of 5 turns; adjust via TURNS_BEFORE_SUMMARY.
            default_factory=lambda: int(os.getenv("TURNS_BEFORE_SUMMARY", "5"))
        )
pipedream_webhook_url: str = Field(
default_factory=lambda: os.getenv("PIPEDREAM_WEBHOOK_URL", "")
)
def __init__(self):
self.valves = self.Valves()
logger.info("⚙️ Filter configuration loaded: %s", self.valves.json())
if GLOBAL_VECTOR_DIM != self.valves.vector_dim:
logger.warning(
"⚠️ VECTOR_DIM mismatch %d → %d. Updating valve.",
self.valves.vector_dim,
GLOBAL_VECTOR_DIM,
)
self.valves.vector_dim = GLOBAL_VECTOR_DIM
if not self.valves.pinecone_api_key:
logger.error("PINECONE_API_KEY is not set.")
raise ValueError("PINECONE_API_KEY is not set.")
if not self.valves.pinecone_index_host:
logger.error(
"PINECONE_INDEX_HOST environment variable is not set. This is required for gRPC client."
)
raise ValueError("PINECONE_INDEX_HOST is not set.")
try:
self.pc = PineconeGRPC(api_key=self.valves.pinecone_api_key)
self.idx = self.pc.Index(host=self.valves.pinecone_index_host)
logger.info(
f"Successfully connected to Pinecone index via gRPC using host: {self.valves.pinecone_index_host}"
)
except Exception as e:
logger.error(
f"Failed to initialize Pinecone gRPC client or connect to index: {e}",
exc_info=True,
)
raise
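        # OpenRouter exposes an OpenAI-compatible API, so the stock AsyncOpenAI
        # client works once base_url points at openrouter.ai.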
self.router = AsyncOpenAI(
api_key=self.valves.openrouter_api_key,
base_url="https://openrouter.ai/api/v1",
)
        self.http_client = httpx.AsyncClient(timeout=10.0)  # shared client for webhook POSTs
        try:
            if self.valves.pinecone_environment:
                # Environment strings follow a "REGION-CLOUD" convention
                # (e.g. "us-east-1-aws"), so split on the last hyphen:
                # everything before it is the region, the suffix is the cloud.
                region, cloud = self.valves.pinecone_environment.rsplit("-", 1)
                self.spec = ServerlessSpec(cloud=cloud, region=region)
            else:
                self.spec = None
        except Exception:
            self.spec = None
logger.info(
"Filter initialized. Initial notification will be sent from async methods if URL is set."
)
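    # Fire-and-forget Pipedream metrics webhook: every failure is caught and
    # logged so observability problems can never break the chat flow itself.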
    async def notify(self, step: str, data: Optional[Dict[str, Any]] = None):
url = self.valves.pipedream_webhook_url
if not url:
logger.debug(f"🔔 notify skipped/no URL step={step} data={data}")
return
payload = {
"step": step,
"ts": datetime.utcnow().isoformat() + "Z",
**(data or {}),
}
logger.debug(f"🔔 notify → POST {url} payload={payload}")
try:
resp = await self.http_client.post(url, json=payload)
logger.debug(f"🔔 notify ← {resp.status_code} {resp.text[:200]!r}")
resp.raise_for_status()
except httpx.RequestError as e:
logger.error(
f"🔔 notify (httpx) error: {e.__class__.__name__} - {e}", exc_info=True
)
except Exception as e:
logger.error(f"🔔 notify general error: {e}", exc_info=True)
    # Run a synchronous CPU-bound function (e.g. embedding) in a worker thread
    # so it does not block the event loop.
    async def _run_cpu_bound(self, func, *args, **kwargs):
        return await asyncio.to_thread(func, *args, **kwargs)

    # Run a synchronous I/O-bound function (the Pinecone gRPC SDK exposes no
    # async API here) in a worker thread for the same reason.
    async def _run_io_bound(self, func, *args, **kwargs):
        return await asyncio.to_thread(func, *args, **kwargs)
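    # Inlet: embed the latest user message, query Pinecone for this chat's stored
    # summaries, and rebuild the prompt as [summary system message + last N raw
    # messages].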
async def inlet(
self, body: Dict[str, Any], __user__: Dict[str, Any], **_
) -> Dict[str, Any]:
        meta = body.get("metadata") or {}
chat_id = (
body.get("chat_id")
or body.get("id")
or body.get("session_id")
or meta.get("chat_id")
or meta.get("session_id")
)
        messages = body.get("messages") or []
await self.notify(
"inlet_start", {"chat_id": chat_id, "total_messages": len(messages)}
)
if not chat_id or len(messages) <= 1:
await self.notify(
"inlet_skip",
{
"reason": "no chat_id or too few messages",
"chat_id": chat_id,
"total_messages": len(messages),
},
)
return body
last_user = ""
user_messages = [m for m in messages if m.get("role") == "user"]
if user_messages:
last_user = user_messages[-1].get("content", "")
if not last_user:
logger.warning(f"No user message content found for chat_id: {chat_id}")
return body
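        # Step 1: embed the latest user message off the event loop (CPU-bound).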
t_encode = time.time()
        # SentenceTransformer truncates inputs to model.max_seq_length internally,
        # so no explicit truncation flag is passed to encode().
        vec = await self._run_cpu_bound(GLOBAL_EMBEDDER.encode, last_user)
        vec = vec.tolist()
encode_ms = int((time.time() - t_encode) * 1000)
await self.notify(
"inlet_encode",
{
"chars_encoded": len(last_user),
"encode_ms": encode_ms,
"chat_id": chat_id,
},
)
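        # Step 2: semantic query against Pinecone (gRPC), scoped by metadata
        # filter to this user and chat.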
t_query = time.time()
res = None
query_ms = None
try:
res = await self._run_io_bound(
self.idx.query,
vector=vec,
top_k=self.valves.top_k,
namespace=self.valves.namespace,
filter={
"user_id": {"$eq": __user__.get("id")},
"chat_id": {"$eq": chat_id},
},
include_metadata=True,
include_values=False,
)
query_ms = int((time.time() - t_query) * 1000)
except Exception as e:
logger.error(
f"Inlet gRPC query error for chat_id: {chat_id}: {e}", exc_info=True
)
await self.notify(
"inlet_query_error", {"error": str(e), "chat_id": chat_id}
)
matches = res.matches if res and hasattr(res, "matches") else []
all_scores = [round(getattr(m, "score", 0.0), 3) for m in matches]
avg_score = round(sum(all_scores) / len(all_scores), 3) if all_scores else None
min_score = min(all_scores) if all_scores else None
max_score = max(all_scores) if all_scores else None
await self.notify(
"inlet_query",
{
"top_k": self.valves.top_k,
"result_count": len(all_scores),
"all_scores": all_scores,
"avg_score": avg_score,
"min_score": min_score,
"max_score": max_score,
"query_ms": query_ms,
"chat_id": chat_id,
},
)
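        # Step 3: keep the text of the three highest-scoring summaries.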
bullets = [
m.metadata.get("text")
for m in sorted(
matches, key=lambda x: getattr(x, "score", 0.0), reverse=True
)[:3]
if m.metadata and m.metadata.get("text")
]
if not bullets:
logger.debug(
f"No bullets from primary query for chat_id: {chat_id}. Performing fallback gRPC query."
)
t_fb = time.time()
fb_res = None
fallback_ms = None
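            # Metadata-only fallback: a zero vector retrieves whatever matches the
            # filter. Caveat: Pinecone rejects all-zero query vectors on
            # cosine-metric indexes; if this index uses cosine, substitute a small
            # non-zero vector here.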
try:
fb_res = await self._run_io_bound(
self.idx.query,
vector=[0.0] * self.valves.vector_dim,
top_k=self.valves.top_k,
namespace=self.valves.namespace,
filter={
"user_id": {"$eq": __user__.get("id")},
"chat_id": {"$eq": chat_id},
},
include_metadata=True,
include_values=False,
)
fallback_ms = int((time.time() - t_fb) * 1000)
except Exception as e:
logger.error(
f"Inlet fallback gRPC query error for chat_id: {chat_id}: {e}",
exc_info=True,
)
await self.notify(
"inlet_fallback_query_error", {"error": str(e), "chat_id": chat_id}
)
fb_matches = fb_res.matches if fb_res and hasattr(fb_res, "matches") else []
await self.notify(
"inlet_fallback",
{
"fallback_count": len(fb_matches),
"fallback_ms": fallback_ms,
"chat_id": chat_id,
},
)
bullets = [
m.metadata.get("text")
for m in sorted(
fb_matches, key=lambda x: getattr(x, "score", 0.0), reverse=True
)[:3]
if m.metadata and m.metadata.get("text")
]
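        # Step 4: inject the summaries as a system message and trim the prompt to
        # the last keep_recent raw messages.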
if bullets:
await self.notify(
"inlet_inject",
{"injected_bullets_count": len(bullets), "chat_id": chat_id},
)
system_msg = {
"role": "system",
"content": "Relevant summary:\n" + "\n".join(f"- {b}" for b in bullets),
}
body["messages"] = [system_msg] + messages[-self.valves.keep_recent :]
await self.notify(
"inlet_end",
{"final_message_count": len(body.get("messages", [])), "chat_id": chat_id},
)
return body
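    # Outlet: once enough turns have accumulated, summarize the recent window via
    # OpenRouter, embed the summary, and upsert it to Pinecone for later retrieval.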
async def outlet(
self,
body: Dict[str, Any],
__user__: Dict[str, Any],
__event_emitter__: Any,
**_,
) -> Dict[str, Any]:
        meta = body.get("metadata") or {}
chat_id = (
body.get("chat_id")
or body.get("id")
or body.get("session_id")
or meta.get("chat_id")
or meta.get("session_id")
)
messages = body.get("messages", []) or []
await self.notify(
"outlet_start",
{
"chat_id": chat_id,
"total_messages": len(messages),
"turns_before": self.valves.turns_before,
},
)
if not chat_id or len(messages) < self.valves.turns_before * 2:
await self.notify(
"outlet_skip",
{
"reason": "not enough turns",
"chat_id": chat_id,
"total_messages": len(messages),
"turns_before": self.valves.turns_before,
},
)
return body
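        # A turn is a user/assistant pair, so the summarization window is the
        # last turns_before * 2 messages.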
window = messages[-self.valves.turns_before * 2 :]
await self.notify(
"outlet_summarize_start",
{
"chat_id": chat_id,
"window_size": len(window),
"window_snip_count": len(window[:2]),
"window_snip_0": (
window[0].get("content", "")[:50] if len(window) > 0 else ""
),
"window_snip_1": (
window[1].get("content", "")[:50] if len(window) > 1 else ""
),
},
)
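        # Request a deterministic (temperature 0), length-capped summary from
        # OpenRouter's chat completions endpoint.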
t_sum = time.time()
summary = ""
try:
            # Flatten the window into "role: content" lines. Single quotes are
            # used inside the f-string expression because backslashes are not
            # allowed there before Python 3.12.
user_content_for_summary = "\n".join(
f"{m.get('role', 'unknown')}: {m.get('content', '')}" for m in window
)
resp = await self.router.chat.completions.create(
model=self.valves.summary_model,
messages=[
{
"role": "system",
"content": (
"You are an insightful, precise summarizer designed to condense chat conversations "
"into clear and concise summaries for long-term storage and efficient retrieval in a vector database."
),
},
{
"role": "user",
"content": user_content_for_summary,
},
],
max_tokens=256,
temperature=0.0,
extra_headers={"X-Title": "OpenWebUI"},
)
            # Guard against a null content field in the completion response.
            summary = (resp.choices[0].message.content or "").strip()
summarize_ms = int((time.time() - t_sum) * 1000)
await self.notify(
"outlet_summary_end",
{
"chat_id": chat_id,
"summary_length_chars": len(summary),
"summarize_ms": summarize_ms,
},
)
except Exception as e:
logger.error(
f"Outlet summary error for chat_id: {chat_id}: {e}", exc_info=True
)
await self.notify("outlet_error", {"error": str(e), "chat_id": chat_id})
return body
if not summary:
logger.warning(
f"Empty summary generated for chat_id: {chat_id}. Skipping upsert."
)
return body
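        # Embed the summary (CPU-bound) before upserting it.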
t_encode_outlet = time.time()
        # As in inlet, encode() truncates long inputs to model.max_seq_length
        # internally, so no truncation flag is passed.
        vec = await self._run_cpu_bound(GLOBAL_EMBEDDER.encode, summary)
        vec = vec.tolist()
encode_outlet_ms = int((time.time() - t_encode_outlet) * 1000)
logger.info(
f"Outlet embedding took {encode_outlet_ms}ms for chat_id: {chat_id}"
)
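        # Record IDs have one-second resolution; two summaries for the same chat
        # within the same second would overwrite each other on upsert.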
rec_id = f"{chat_id}-{int(time.time())}"
metadata_to_upsert = {
"chat_id": chat_id,
"user_id": __user__.get("id"),
"timestamp": datetime.utcnow().isoformat() + "Z",
"text": summary,
"summary_id": rec_id,
"summary_length": len(summary),
"vector_dim": len(vec),
"model": self.valves.summary_model,
"version": "0.1.19.grpc", # Updated version
}
await self.notify(
"outlet_upsert_start",
{
"chat_id": chat_id,
"vector_dim": len(vec),
"metadata_keys": list(metadata_to_upsert.keys()),
},
)
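        # Upsert the summary vector plus its metadata into Pinecone via gRPC.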
t_up = time.time()
try:
upsert_response = await self._run_io_bound(
self.idx.upsert,
vectors=[{"id": rec_id, "values": vec, "metadata": metadata_to_upsert}],
namespace=self.valves.namespace,
)
upsert_ms = int((time.time() - t_up) * 1000)
await self.notify(
"outlet_upsert_end",
{
"upsert_ms": upsert_ms,
"chat_id": chat_id,
"upserted_count": getattr(upsert_response, "upserted_count", "N/A"),
},
)
except Exception as e:
logger.error(
f"Outlet gRPC upsert error for chat_id: {chat_id}: {e}", exc_info=True
)
await self.notify(
"outlet_upsert_error", {"error": str(e), "chat_id": chat_id}
)
await self.notify(
"outlet_end", {"summary_length_chars": len(summary), "chat_id": chat_id}
)
return body
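    # Async context-manager hooks so the shared httpx client can be opened and
    # closed cleanly when the filter is driven standalone (see the smoke test below).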
async def __aenter__(self):
if not hasattr(self, "http_client") or self.http_client.is_closed:
self.http_client = httpx.AsyncClient(timeout=10.0)
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
if hasattr(self, "http_client") and not self.http_client.is_closed:
await self.http_client.aclose()
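
# ─── Optional local smoke test ───────────────────────────────────────────────────
# A minimal sketch for exercising the filter outside Open WebUI, assuming the
# environment variables listed in the header docstring are set. The `body` and
# `__user__` payloads below are illustrative stand-ins for what Open WebUI passes.
if __name__ == "__main__":

    async def _smoke() -> None:
        async with Filter() as f:
            body = {
                "chat_id": "local-smoke-test",
                "messages": [
                    {"role": "user", "content": "Hello there."},
                    {"role": "assistant", "content": "Hi! How can I help?"},
                    {"role": "user", "content": "Remind me what we discussed."},
                ],
            }
            out = await f.inlet(body, __user__={"id": "local-user"})
            logger.info(
                "Smoke test: inlet returned %d messages", len(out["messages"])
            )

    asyncio.run(_smoke())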