import asyncio
import aiohttp
from typing import Callable, Any, Optional, List
# Signature of the optional callback used to push events back to the caller.
EmitterType = Optional[Callable[[dict], Any]]


class EventEmitter:
    """Thin convenience wrapper around an optional event-emitter callback.

    All helpers are safe no-ops when no callback was supplied.
    """

    def __init__(self, event_emitter: EmitterType):
        self.event_emitter = event_emitter

    async def emit(self, event_type: str, data: dict):
        """Send one event of *event_type* carrying *data*; no-op without a callback."""
        if not self.event_emitter:
            return
        await self.event_emitter({"type": event_type, "data": data})

    async def update_status(
        self, description: str, done: bool, action: str, urls: List[str]
    ):
        """Emit a progress/status event for the given action and URLs."""
        payload = {
            "done": done,
            "action": action,
            "description": description,
            "urls": urls,
        }
        await self.emit("status", payload)

    async def send_citation(self, title: str, url: str, content: str):
        """Emit a citation event attaching *content* with its source metadata."""
        metadata = {"name": title, "source": url, "html": False}
        await self.emit("citation", {"document": [content], "metadata": [metadata]})
class Tools:
    @staticmethod
    async def web_scrape(
        urls: List[str], user_request: str, __event_emitter__: EmitterType = None
    ) -> str:
        """
        Scrape multiple web pages concurrently using the r.jina.ai reader API.

        :param urls: List of URLs of the web pages to scrape.
        :param user_request: The user's original request or query.
        :param __event_emitter__: Optional event emitter for status updates.
        :return: Combined scraped contents, or error messages.
        """
        emitter = EventEmitter(__event_emitter__)

        await emitter.update_status(
            f"Jina 正在读取 {len(urls)} 个网页", False, "web_search", urls
        )

        # Loop-invariant request headers — built once, shared by every request.
        headers = {
            "X-No-Cache": "true",
            "X-With-Images-Summary": "true",
            "X-With-Links-Summary": "true",
        }

        async def process_url(session: aiohttp.ClientSession, url: str) -> str:
            # Prefixing the target URL with r.jina.ai yields the page rendered
            # as plain, LLM-friendly text.
            jina_url = f"https://r.jina.ai/{url}"
            try:
                async with session.get(jina_url, headers=headers) as response:
                    response.raise_for_status()
                    content = await response.text()
                title = f"Scraped content from {url}"
                await emitter.send_citation(title, url, content)
                return f"URL: {url}\n内容: {content}\n"
            # asyncio.TimeoutError is raised on total-timeout expiry and is NOT
            # an aiohttp.ClientError — catch it too so one slow page reports an
            # error instead of crashing the whole gather.
            except (aiohttp.ClientError, asyncio.TimeoutError) as e:
                error_message = f"读取网页 {url} 时出错: {str(e)}"
                await emitter.update_status(error_message, False, "web_scrape", [url])
                return f"URL: {url}\n错误: {error_message}\n"

        # One shared session gives all concurrent requests a common connection
        # pool, instead of opening (and tearing down) a session per URL.
        async with aiohttp.ClientSession() as session:
            results = await asyncio.gather(
                *(process_url(session, url) for url in urls)
            )

        await emitter.update_status(
            f"已完成 {len(urls)} 个网页的读取", True, "web_search", urls
        )

        # Merge all per-URL results into a single string.
        final_result = "\n".join(results)
        return f"用户需求: {user_request}\n\n对请求的处理结果:\n{final_result}"