import asyncio
import json
import aiohttp
from typing import Callable, Any, List, Optional


class EventEmitter:
    """Small helper that wraps an optional event-emitter callback for status and citation events."""

    def __init__(self, event_emitter: Optional[Callable[[dict], Any]] = None):
        self.event_emitter = event_emitter

    async def emit(self, event_type: str, data: dict):
        if self.event_emitter:
            await self.event_emitter({"type": event_type, "data": data})

    async def update_status(
        self, description: str, done: bool, action: str, urls: List[str]
    ):
        await self.emit(
            "status",
            {"done": done, "action": action, "description": description, "urls": urls},
        )

    async def send_citation(self, title: str, url: str, content: str):
        await self.emit(
            "citation",
            {
                "document": [content],
                "metadata": [{"name": title, "source": url, "html": False}],
            },
        )
class Tools:
    @staticmethod
    async def web_scrape(
        urls: List[str], __event_emitter__: Optional[Callable[[dict], Any]] = None
    ) -> str:
        """
        Scrapes content from the provided URLs and returns a summary.
        Only call this function for explicit web search requests or when URLs are provided.

        :param urls: List of URLs to scrape. Each URL must include http or https.
        :param __event_emitter__: Optional event emitter for status updates.
        :return: Combined scraped and processed contents, or error messages.
        """
        emitter = EventEmitter(__event_emitter__)
        api_url = "https://gpts.webpilot.ai/api/read"
        headers = {"Content-Type": "application/json", "WebPilot-Friend-UID": "0"}

        await emitter.update_status(
            f"Started reading {len(urls)} web page(s)", False, "web_search", urls
        )
        async def process_url(url: str) -> str:
            try:
                # One session per URL; all URLs are fetched concurrently via asyncio.gather below.
                async with aiohttp.ClientSession() as session:
                    async with session.post(
                        api_url,
                        headers=headers,
                        # Request body for the WebPilot read endpoint.
                        json={
                            "link": url,
                            "ur": "summary of the page",
                            "lp": True,
                            "rt": False,
                            "l": "en",
                        },
                    ) as response:
                        response.raise_for_status()
                        result = await response.json()
                        result.pop("rules", None)
                        content = json.dumps(result, ensure_ascii=False)
                        title = result.get("title") or url
                        await emitter.send_citation(title, url, content)
                        return f"{content}\n"
            except Exception as e:
                error_message = f"Error reading web page {url}: {str(e)}"
                await emitter.update_status(error_message, False, "web_scrape", [url])
                await emitter.send_citation(f"Error from {url}", url, str(e))
                return f"URL: {url}\nError: {error_message}\n"

        results = await asyncio.gather(*[process_url(url) for url in urls])

        await emitter.update_status(
            f"Finished reading {len(urls)} web page(s)", True, "web_search", urls
        )
        return "\n".join(results)
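

# A minimal usage sketch, assuming the module is run directly rather than by a host
# application that injects __event_emitter__ (e.g. an Open WebUI-style tool runner).
# The URL below is a placeholder; replace it with a real http(s) address.
if __name__ == "__main__":

    async def _demo() -> None:
        summary = await Tools.web_scrape(
            ["https://example.com"], __event_emitter__=print_event
        )
        print(summary)

    asyncio.run(_demo())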