We're Hiring!
Whitepaper
Docs
Sign In
Tool
Tool
v10.3
Web Search
Last Updated
a month ago
Created
a year ago
Tool ID
web_search
Creator
@xiaopa233
Downloads
2.1K+
Get
Sponsored by Open WebUI Inc.
We are hiring!
Shape the way humanity engages with
intelligence
.
Description
Web Search using SearXNG and scrape the first N pages
README
Tool Code
Show
""" title: Web Search author: EntropyYue funding_url: https://github.com/EntropyYue/web_search version: 10.3 license: MIT """ import asyncio import ipaddress import json import re import unicodedata from collections.abc import Callable from typing import Any from urllib.parse import ParseResult, urlparse from aiohttp import ClientError, ClientSession, ClientTimeout from bs4 import BeautifulSoup from langchain_community.retrievers import BM25Retriever as LCBM25Retriever from pydantic import BaseModel, Field from tiktoken import get_encoding class MetaData(BaseModel): title: str url: str snippet: str | None = None def dict(self) -> dict[str, str | None]: return {"title": self.title, "url": self.url, "snippet": self.snippet} class LoadResult(BaseModel): text: str | None = None metadata: MetaData | None = None error: str | None = None def to_dict(self) -> dict[str, Any]: if self.error: return {"error": self.error} return { "text": self.text, "metadata": self.metadata.dict() if self.metadata else None, } class PageCleaner: def __init__(self, token_limit: int = 1000): self.token_limit = token_limit self.tokenizer = get_encoding("cl100k_base") self.invisible_chars = ["\ufeff", "\u200b", "\u2028", "\u2060"] def extract_title(self, soup: BeautifulSoup) -> str: title = ( soup.title.string if soup.title and soup.title.string else "No title found" ) return self._normalize_text(title) def extract_text(self, soup: BeautifulSoup) -> str: return soup.get_text(separator="\n", strip=True) def clean_text(self, text: str) -> str: text = self._normalize_text(text) text = re.sub("[ \\t]+", " ", text) text = self._remove_emojis(text) text = self._remove_invisible_chars(text) return text.strip() def truncate_tokens(self, text: str) -> str: tokens = self.tokenizer.encode(text) truncated = self.tokenizer.decode(tokens[: self.token_limit]) return self._remove_invisible_chars(truncated).strip() def _normalize_text(self, text: str) -> str: return unicodedata.normalize("NFKC", text).strip() def 
_remove_emojis(self, text: str) -> str: return "".join( c for c in text if not unicodedata.category(c).startswith("So") ) def _remove_invisible_chars(self, text: str) -> str: for ch in self.invisible_chars: text = text.replace(ch, "") return text class WebLoader: def __init__(self, ignore_websites: str, headers: dict, token_limit: int) -> None: self.ignore_websites = ignore_websites self.cleaner = PageCleaner(token_limit=token_limit) self.headers = headers def get_base_url(self, url: str) -> str: parsed_url: ParseResult = urlparse(url) return f"{parsed_url.scheme}://{parsed_url.netloc}" def _is_safe_url(self, url: str) -> bool: try: parsed = urlparse(url) if parsed.scheme != "https": return False hostname = parsed.hostname or "" try: ip = ipaddress.ip_address(hostname) if ( ip.is_private or ip.is_loopback or ip.is_reserved or ip.is_link_local ): return False except ValueError: pass return not ( hostname in ("localhost",) or hostname.endswith(".local") or hostname.endswith(".localdomain") ) except Exception: return False async def fetch_and_process_page( self, url: str, session: ClientSession ) -> LoadResult: if not self._is_safe_url(url): return LoadResult(error="不安全的URL, 仅支持HTTPS和公共网络") try: async with session.get(url, headers=self.headers) as response: response.raise_for_status() html = await response.text() except ClientError as e: return LoadResult(error=f"检索页面失败, 网络错误: {str(e)}") except Exception as e: return LoadResult(error=f"检索页面失败: {str(e)}") soup = BeautifulSoup(html, "html.parser") title = self.cleaner.extract_title(soup) raw_text = self.cleaner.extract_text(soup) clean_text = self.cleaner.clean_text(raw_text) truncated = self.cleaner.truncate_tokens(clean_text) return LoadResult(text=truncated, metadata=MetaData(title=title, url=url)) async def process_search_result( self, result: dict[str, str], session: ClientSession ) -> LoadResult | None: url = result["url"] snippet = result.get("content", "") if self.ignore_websites: base_url = 
self.get_base_url(url) ignored_sites = [s.strip() for s in self.ignore_websites.split(",")] if any(site in base_url for site in ignored_sites): return None result_data = await self.fetch_and_process_page(url, session) if result_data.text and result_data.metadata: result_data.metadata.snippet = self.cleaner._remove_emojis(snippet) return result_data return None class SearchEngine: def __init__(self, url: str, max_result: int, headers: dict) -> None: self.url = url self.max_result = max_result self.headers = headers async def search(self, query: str, session: ClientSession) -> dict[str, Any]: params = {"q": query, "format": "json"} try: async with session.get( self.url, params=params, headers=self.headers ) as resp: resp.raise_for_status() result = await resp.json() if "results" in result: result["results"] = result["results"][: self.max_result] return result except ClientError as e: raise RuntimeError(str(e)) from e class BM25Retriever: def __init__(self, documents: list[LoadResult], k=5) -> None: texts = [doc.text or "" for doc in documents if doc.text] metadatas = [doc.metadata or {} for doc in documents if doc.metadata] self.retriever = LCBM25Retriever.from_texts( texts=texts, metadatas=(metadata.dict() for metadata in metadatas) ) self.retriever.k = k async def ainvoke(self, query: str) -> list[LoadResult]: results = await self.retriever.ainvoke(query) return [ LoadResult(text=doc.page_content, metadata=MetaData(**doc.metadata)) for doc in results ] class EventEmitter: def __init__( self, enable_status: bool, enable_citation: bool, event_emitter: Callable[[dict], Any] | None = None, ): self.enable_status = enable_status self.enable_citation = enable_citation self.event_emitter = event_emitter async def _emit(self, type, data: dict[str, Any]) -> None: if not self.event_emitter: return await self.event_emitter({"type": type, "data": data}) async def status( self, description: str | None = None, status: str = "in_progress", done: bool = False, action: str | None = 
"web_search", queries: list[str] | None = None, count: int | None = None, urls: list[str] | None = None, ) -> None: if not self.enable_status: return await self._emit( type="status", data={ "description": description, "status": status, "done": done, "action": action, "queries": queries, "count": count, "urls": urls, }, ) async def queries(self, queries: list[str]) -> None: await self.status(action="web_search_queries_generated", queries=queries) async def urls(self, urls: list[str]) -> None: await self.status( action="web_search", description="Searched {{count}} sites", urls=urls ) async def retrieval(self, queries: list[str]) -> None: await self.status(action="queries_generated", queries=queries) async def fetched(self, count: int) -> None: await self.status(action="sources_retrieved", count=count, done=True) async def citation( self, document: list[str], metadata: list[dict[str, str]], source: dict[str, str], ) -> None: if not self.enable_citation: return await self._emit( type="citation", data={"document": document, "metadata": metadata, "source": source}, ) class Tools: class Valves(BaseModel): SEARXNG_ENGINE_API_BASE_URL: str = Field( default="https://example.com/search", description="搜索引擎的基础URL" ) IGNORED_WEBSITES: str = Field( default="", description="以逗号分隔的要忽略的网站列表" ) MAX_SEARCH_RESULTS: int = Field( default=3, description="单个关键词要返回的结果数" ) MAX_PROCESSED_RESULTS: int = Field(default=10, description="要处理的最大结果数") SEARCH_PAGE_TOKENS_LIMIT: int = Field( default=2000, description="搜索结果每页的限制Token数" ) GET_WEBSITE_TOKENS_LIMIT: int = Field( default=5000, description="获取网站的限制Token数" ) BM25_RERANK_TOP_K: int = Field( default=5, description="使用BM25重新排序时的Top-K" ) USE_ENV_PROXY: bool = Field(default=False, description="使用环境变量中的代理") WEB_LOAD_TIMEOUT: int = Field(default=5, description="网页抓取超时时间 (秒)") CITATION_LINKS: bool = Field( default=False, description="发送带有链接的自定义引用" ) STATUS: bool = Field(default=True, description="发送状态") def __init__(self): self.valves = 
self.Valves() self.timeout = ClientTimeout(total=self.valves.WEB_LOAD_TIMEOUT) self.headers = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3" } async def search_web( self, queries: list[str], __event_emitter__: Callable[[dict], Any] | None = None ) -> str: """ 搜索网络并获取相关页面的内容,搜索未知知识、新闻、信息、公共联系信息、天气等 :params queries: 搜索中使用的关键词列表 :return: 网站内容的json格式 """ loader = WebLoader( ignore_websites=self.valves.IGNORED_WEBSITES, headers=self.headers, token_limit=self.valves.GET_WEBSITE_TOKENS_LIMIT, ) emitter = EventEmitter( enable_status=self.valves.STATUS, enable_citation=self.valves.CITATION_LINKS, event_emitter=__event_emitter__, ) search_engine = SearchEngine( url=self.valves.SEARXNG_ENGINE_API_BASE_URL, max_result=self.valves.MAX_SEARCH_RESULTS, headers=self.headers, ) await emitter.status("Searching the web") await emitter.queries(queries) async with ClientSession( trust_env=self.valves.USE_ENV_PROXY, timeout=self.timeout ) as session: tasks = [ asyncio.create_task(search_engine.search(query, session)) for query in queries ] results: list[dict[str, str]] = [] for done in asyncio.as_completed(tasks): try: search_result = await done except Exception as e: await emitter.status( status="error", description=f"搜索时出错: {str(e)}", done=True ) search_result = {} if "results" in search_result: results.extend(search_result["results"]) if len(results) == 0: await emitter.status( status="error", description="未找到搜索结果", done=True ) return json.dumps( {"error": "No search results found"}, indent=4, ensure_ascii=False ) await emitter.urls([result.get("url", "") for result in results]) results_json: list[LoadResult] = [] async with ClientSession( trust_env=self.valves.USE_ENV_PROXY, timeout=self.timeout ) as session: tasks = [ asyncio.create_task(loader.process_search_result(result, session)) for result in results ] for done in asyncio.as_completed(tasks): try: result_json = await done except Exception: 
continue if result_json: results_json.append(result_json) if len(results_json) >= self.valves.MAX_PROCESSED_RESULTS: for task in tasks: if not task.done(): task.cancel() await asyncio.gather(*tasks, return_exceptions=True) break if len(results_json) == 0: await emitter.fetched(0) return json.dumps( {"error": "No fetched results found"}, indent=4, ensure_ascii=False ) await emitter.retrieval(queries) bm25_retriever = BM25Retriever( results_json, k=self.valves.BM25_RERANK_TOP_K ) results_json = await bm25_retriever.ainvoke(" ".join(queries)) for result in results_json: if result.text and result.metadata: await emitter.citation( document=[result.text], metadata=[{"source": result.metadata.url}], source={"name": result.metadata.title}, ) await emitter.fetched(len(results_json)) return json.dumps( [r.to_dict() for r in results_json], indent=4, ensure_ascii=False ) async def get_website( self, urls: list[str], __event_emitter__: Callable[[dict], Any] | None = None ) -> str: """ 打开输入的网站并获取其内容 :params urls: 需要打开的网站列表 :return: 网站内容的json格式 """ loader = WebLoader( ignore_websites=self.valves.IGNORED_WEBSITES, token_limit=self.valves.GET_WEBSITE_TOKENS_LIMIT, headers=self.headers, ) emitter = EventEmitter( enable_status=self.valves.STATUS, enable_citation=self.valves.CITATION_LINKS, event_emitter=__event_emitter__, ) await emitter.status("Searching the web") await emitter.queries(urls) results_json: list[LoadResult] = [] if urls == []: return "" async with ClientSession( trust_env=self.valves.USE_ENV_PROXY, timeout=self.timeout ) as session: tasks = [ asyncio.create_task(loader.fetch_and_process_page(url, session)) for url in urls ] for task in asyncio.as_completed(tasks): try: result_site = await task except Exception: continue if result_site: results_json.append(result_site) if result_site.text and result_site.metadata: await emitter.citation( document=[result_site.text], metadata=[{"source": result_site.metadata.url}], source={"name": result_site.metadata.url}, ) await 
emitter.fetched(len(results_json)) return json.dumps( [r.to_dict() for r in results_json], indent=4, ensure_ascii=False )