Tool
WebSearch
使用searxng和jina进行联网
Tool ID
websearch
Creator
@2171
Downloads
276+

Tool Content
python
import asyncio
import aiohttp
from typing import Callable, Any, Optional, List
from urllib.parse import quote

EmitterType = Optional[Callable[[dict], Any]]


class EventEmitter:
    def __init__(self, event_emitter: EmitterType):
        self.event_emitter = event_emitter

    async def emit(self, event_type: str, data: dict):
        if self.event_emitter:
            await self.event_emitter({"type": event_type, "data": data})

    async def update_status(
        self, description: str, done: bool, action: str, urls: List[str]
    ):
        await self.emit(
            "status",
            {"done": done, "action": action, "description": description, "urls": urls},
        )

    async def send_citation(self, title: str, url: str, content: str):
        await self.emit(
            "citation",
            {
                "document": [content],
                "metadata": [{"name": title, "source": url, "html": False}],
            },
        )


class Tools:
    @staticmethod
    async def web_search(
        query: str, user_request: str, __event_emitter__: EmitterType = None
    ) -> str:
        """
        Perform a web search using SearXNG API and then scrape the top 4 results

        :param query: The search query
        :param user_request: The user's original request or query
        :param __event_emitter__: Optional event emitter for status updates
        :return: Combined results from search and web scraping
        """
        emitter = EventEmitter(__event_emitter__)

        await emitter.update_status(f"正在搜索: {query}", False, "web_search", [])

        encoded_query = quote(query)
        search_url = (
            f"https://searxng.site/search?format=json&q={encoded_query}"
        )

        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(search_url) as response:
                    response.raise_for_status()
                    search_results = await response.json()

            urls = [result["url"] for result in search_results.get("results", [])[:4]]

            if not urls:
                await emitter.update_status(
                    "搜索未返回任何结果", True, "web_search", []
                )
                return "搜索未返回任何结果"

            await emitter.update_status(
                f"搜索完成,正在读取前 {len(urls)} 个结果", False, "web_search", urls
            )

            scraped_content = await Tools.web_scrape(
                urls, user_request, __event_emitter__
            )

            # 构建最终返回的字符串
            final_result = f"用户查询: {query}\n用户原始请求: {user_request}\n\n搜索结果及网页内容:\n{scraped_content}"

            await emitter.update_status("搜索和网页抓取完成", True, "web_search", urls)

            return final_result

        except aiohttp.ClientError as e:
            error_message = f"搜索时发生错误: {str(e)}"
            await emitter.update_status(error_message, True, "web_search", [])
            return error_message

    @staticmethod
    async def web_scrape(
        urls: List[str], user_request: str, __event_emitter__: EmitterType = None
    ) -> str:
        """
        Scrape multiple web pages using r.jina.ai API

        :param urls: List of URLs of the web pages to scrape.
        :param user_request: The user's original request or query.
        :param __event_emitter__: Optional event emitter for status updates.
        :return: Combined scraped contents, or error messages.
        """
        emitter = EventEmitter(__event_emitter__)

        combined_results = []

        await emitter.update_status(
            f"Jina 正在读取 {len(urls)} 个网页", False, "web_search", urls
        )

        async def process_url(url):
            jina_url = f"https://r.jina.ai/{url}"
            headers = {
                "X-No-Cache": "true",
                "X-With-Images-Summary": "true",
                "X-With-Links-Summary": "true",
            }

            try:
                async with aiohttp.ClientSession() as session:
                    async with session.get(jina_url, headers=headers) as response:
                        response.raise_for_status()
                        content = await response.text()

                title = f"Scraped content from {url}"
                await emitter.send_citation(title, url, content)
                return f"URL: {url}\n内容: {content}\n"

            except aiohttp.ClientError as e:
                error_message = f"读取网页 {url} 时出错: {str(e)}"
                await emitter.update_status(error_message, False, "web_scrape", [url])
                return f"URL: {url}\n错误: {error_message}\n"

        tasks = [process_url(url) for url in urls]
        results = await asyncio.gather(*tasks)
        combined_results.extend(results)

        await emitter.update_status(
            f"已完成 {len(urls)} 个网页的读取", True, "web_search", urls
        )

        # 将所有结果合并为一个字符串
        return "\n".join(combined_results)