import asyncio
import aiohttp
from typing import Callable, Any, Optional, List
from urllib.parse import quote
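
# The hosting framework (e.g. Open WebUI) passes tools an async callable as
# __event_emitter__; EmitterType models that callable, which receives event
# dicts shaped like {"type": "<event>", "data": {...}}.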
EmitterType = Optional[Callable[[dict], Any]]


class EventEmitter:
def __init__(self, event_emitter: EmitterType):
self.event_emitter = event_emitter

    async def emit(self, event_type: str, data: dict):
if self.event_emitter:
await self.event_emitter({"type": event_type, "data": data})

    async def update_status(
self, description: str, done: bool, action: str, urls: List[str]
):
await self.emit(
"status",
{"done": done, "action": action, "description": description, "urls": urls},
)

    async def send_citation(self, title: str, url: str, content: str):
await self.emit(
"citation",
{
"document": [content],
"metadata": [{"name": title, "source": url, "html": False}],
},
)


class Tools:
@staticmethod
async def web_search(
query: str, user_request: str, __event_emitter__: EmitterType = None
) -> str:
"""
        Perform a web search using the SearXNG API, then scrape the top 4 results.
:param query: The search query
:param user_request: The user's original request or query
:param __event_emitter__: Optional event emitter for status updates
:return: Combined results from search and web scraping
"""
emitter = EventEmitter(__event_emitter__)
await emitter.update_status(f"正在搜索: {query}", False, "web_search", [])
encoded_query = quote(query)
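        # Note: this assumes the SearXNG instance has its JSON output format
        # enabled; many public instances only serve HTML, so the URL below may
        # need to point at a self-hosted instance.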
search_url = (
f"https://searxng.site/search?format=json&q={encoded_query}"
)
try:
async with aiohttp.ClientSession() as session:
async with session.get(search_url) as response:
response.raise_for_status()
search_results = await response.json()
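                    # SearXNG's JSON payload lists hits under "results"; only
                    # the first four URLs are kept for scraping below.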
urls = [result["url"] for result in search_results.get("results", [])[:4]]
if not urls:
await emitter.update_status(
"搜索未返回任何结果", True, "web_search", []
)
return "搜索未返回任何结果"
await emitter.update_status(
f"搜索完成,正在读取前 {len(urls)} 个结果", False, "web_search", urls
)
scraped_content = await Tools.web_scrape(
urls, user_request, __event_emitter__
)
# 构建最终返回的字符串
final_result = f"用户查询: {query}\n用户原始请求: {user_request}\n\n搜索结果及网页内容:\n{scraped_content}"
await emitter.update_status("搜索和网页抓取完成", True, "web_search", urls)
return final_result
        except (aiohttp.ClientError, asyncio.TimeoutError) as e:
            error_message = f"Error during search: {str(e)}"
await emitter.update_status(error_message, True, "web_search", [])
return error_message

    @staticmethod
async def web_scrape(
urls: List[str], user_request: str, __event_emitter__: EmitterType = None
) -> str:
"""
        Scrape multiple web pages using the r.jina.ai API.
:param urls: List of URLs of the web pages to scrape.
:param user_request: The user's original request or query.
:param __event_emitter__: Optional event emitter for status updates.
:return: Combined scraped contents, or error messages.
"""
emitter = EventEmitter(__event_emitter__)
combined_results = []
await emitter.update_status(
f"Jina 正在读取 {len(urls)} 个网页", False, "web_search", urls
)
async def process_url(url):
jina_url = f"https://r.jina.ai/{url}"
headers = {
"X-No-Cache": "true",
"X-With-Images-Summary": "true",
"X-With-Links-Summary": "true",
}
try:
async with aiohttp.ClientSession() as session:
async with session.get(jina_url, headers=headers) as response:
response.raise_for_status()
content = await response.text()
title = f"Scraped content from {url}"
await emitter.send_citation(title, url, content)
return f"URL: {url}\n内容: {content}\n"
            except (aiohttp.ClientError, asyncio.TimeoutError) as e:
                error_message = f"Error reading page {url}: {str(e)}"
                await emitter.update_status(error_message, False, "web_scrape", [url])
                return f"URL: {url}\nError: {error_message}\n"
tasks = [process_url(url) for url in urls]
results = await asyncio.gather(*tasks)
combined_results.extend(results)
await emitter.update_status(
f"已完成 {len(urls)} 个网页的读取", True, "web_search", urls
)
        # Join all results into a single string
return "\n".join(combined_results)