"""
title: Web Search using Google PSE with Jina Reader API
author: Jacob DeLacerda
version: 0.1.0
license: MIT
deep-research: https://youtu.be/4qrVoMx4UV8
"""
import requests
import json
from urllib.parse import urlparse
import re
import unicodedata
from pydantic import BaseModel, Field
import asyncio
import concurrent.futures
from typing import Any, Callable, Optional
class HelpFunctions:
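    """Helpers for normalizing, filtering, and truncating scraped page content."""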
def get_base_url(self, url):
url_components = urlparse(url)
return f"{url_components.scheme}://{url_components.netloc}"
    def generate_excerpt(self, content, max_length=200):
        return (content[:max_length] + "...") if len(content) > max_length else content
def format_text(self, text):
text = unicodedata.normalize("NFKC", text)
text = re.sub(r"\s+", " ", text)
return text.strip()
def remove_emojis(self, text):
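        # Strip characters in the Unicode "So" (Symbol, other) category,
        # which covers most emoji.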
return "".join(c for c in text if not unicodedata.category(c).startswith("So"))
def process_search_result(self, result, valves):
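        """Scrape a single search result via Jina Reader, returning None for
        ignored or unreachable sites."""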
url_site = result["url"]
if valves.IGNORED_WEBSITES:
base_url = self.get_base_url(url_site)
if any(
ignored_site.strip() in base_url
for ignored_site in valves.IGNORED_WEBSITES.split(",")
):
return None
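        # Fetch a reader-friendly rendering of the page through Jina Reader
        # (JINA_API_BASE_URL/<page url>), bypassing Jina's cache and requesting
        # image and link summaries.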
try:
jina_url = f"{valves.JINA_API_BASE_URL}/{url_site}"
headers = {
"X-No-Cache": "true",
"X-With-Images-Summary": "true",
"X-With-Links-Summary": "true",
}
response = requests.get(jina_url, headers=headers, timeout=20)
response.raise_for_status()
content = response.text
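            # Drop everything from the "Images:" summary section onward,
            # keeping only the page body.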
content = (
content.split("Images:")[0].strip() if "Images:" in content else content
)
content = self.format_text(content)
return {
"title": self.remove_emojis(result["title"]),
"url": url_site,
"content": self.truncate_to_n_words(
content, valves.PAGE_CONTENT_WORDS_LIMIT
),
"snippet": self.remove_emojis(result.get("content", "")),
}
except requests.exceptions.RequestException:
return None
def truncate_to_n_words(self, text: str, n: int) -> str:
words = text.split()
return " ".join(words[:n])
class EventEmitter:
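    """Thin async wrapper around the optional Open WebUI event-emitter callback."""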
    def __init__(self, callback: Optional[Callable[[dict], Any]] = None):
        self.callback = callback
    async def emit(
        self,
        message: Optional[str] = None,
        status: Optional[str] = None,
        description: Optional[str] = None,
        done: bool = False,
    ):
if self.callback:
await self.callback(
{
"message": message,
"status": status,
"description": description,
"done": done,
}
)
class Tools:
class Valves(BaseModel):
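        """User-configurable settings exposed through the Open WebUI tool interface."""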
# Jina Reader configuration
JINA_API_BASE_URL: str = Field(
default="https://r.jina.ai", description="Jina Reader API base URL"
)
# Google Search configuration
GOOGLE_API_KEY: str = Field(
default="", # Replace with your API key
description="Google Custom Search API Key",
)
GOOGLE_CSE_ID: str = Field(
default="", # Replace with your Search Engine ID
description="Google Custom Search Engine ID",
)
# General settings
IGNORED_WEBSITES: str = Field(
default="", description="Comma-separated list of websites to ignore"
)
RETURNED_PAGES_COUNT: int = Field(
default=3, description="Number of pages to return"
)
TOTAL_PAGES_COUNT: int = Field(default=5, description="Total pages to search")
PAGE_CONTENT_WORDS_LIMIT: int = Field(
default=2000, description="Word limit per page for LLM context"
)
CITATION_LINKS: bool = Field(
default=True, description="Include citation metadata for LLM"
)
    def __init__(self):
        self.valves = self.Valves()
    async def search_web(
        self, query: str, __event_emitter__: Optional[Callable[[dict], Any]] = None
    ) -> str:
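        """
        Search the web using Google Programmable Search and scrape the top
        results with the Jina Reader API.

        :param query: The search query to look up on the web.
        :return: A JSON string of the scraped results (title, url, content, snippet).
        """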
functions = HelpFunctions()
emitter = EventEmitter(__event_emitter__)
try:
await emitter.emit(f"Searching for: {query}")
            google_search_url = "https://www.googleapis.com/customsearch/v1"
            # Google Custom Search caps "num" at 10 results per request.
            params = {
                "key": self.valves.GOOGLE_API_KEY,
                "cx": self.valves.GOOGLE_CSE_ID,
                "q": query,
                "num": min(self.valves.TOTAL_PAGES_COUNT, 10),
            }
            response = requests.get(google_search_url, params=params, timeout=120)
response.raise_for_status()
search_items = response.json().get("items", [])[
: self.valves.TOTAL_PAGES_COUNT
]
results = [
{
"title": item.get("title", ""),
"url": item.get("link", ""),
"content": item.get("snippet", ""),
}
for item in search_items
]
processed_results = []
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = [
executor.submit(
functions.process_search_result, result, self.valves
)
for result in results
]
for future in concurrent.futures.as_completed(futures):
if result := future.result():
processed_results.append(result)
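                    # Stop early once enough pages are collected; the executor's
                    # context manager still waits for any in-flight futures.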
if len(processed_results) >= self.valves.RETURNED_PAGES_COUNT:
break
if self.valves.CITATION_LINKS and __event_emitter__:
for result in processed_results:
await __event_emitter__(
{
"type": "citation",
"data": {
"document": [result["content"]],
"metadata": [{"source": result["url"]}],
"source": {"name": result["title"]},
},
}
)
await emitter.emit(
status="complete",
description=f"Retrieved {len(processed_results)} results",
done=True,
)
return json.dumps(processed_results, ensure_ascii=False)
except Exception as e:
await emitter.emit(
status="error", description=f"Error: {str(e)}", done=True
)
return json.dumps({"error": str(e)})
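

# Minimal local usage sketch, assuming the module is executed directly rather
# than loaded as an Open WebUI tool. The credential values are hypothetical
# placeholders; substitute a real Google API key and Custom Search Engine ID.
if __name__ == "__main__":
    tools = Tools()
    tools.valves.GOOGLE_API_KEY = "YOUR_GOOGLE_API_KEY"  # placeholder, not a real key
    tools.valves.GOOGLE_CSE_ID = "YOUR_GOOGLE_CSE_ID"  # placeholder, not a real ID
    results_json = asyncio.run(tools.search_web("open-source web search tools"))
    print(results_json)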