Tool
v0.0.1
Crawl4ai Web Scrape
Tool ID
crawl4ai_web_scrape
Creator
@focuses
Downloads
472+
Web scraper backed by a Crawl4ai server; faster, and keeps the extracted content smaller (see the configuration sketch below).
README
No README available
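In place of a README, here is a minimal configuration sketch. It assumes the Tools and Valves classes defined in the tool code below; the URL and token values simply mirror the valve defaults and should be replaced with those of your own Crawl4ai deployment.

tool = Tools()
tool.valves = Tools.Valves(
    CRAWL4AI_URL="http://127.0.0.1:11235/",  # base URL of the Crawl4ai server (valve default)
    CRAWL4AI_TOKEN="123456",                 # optional bearer token (valve default)
)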
Tool Code
""" title: Crawl4ai Web Scrape description: An improved web scraping tool that extracts text content using crawl4ai server author: focuses version: 0.0.1 license: MIT """ import aiohttp import asyncio import re from typing import Callable, Optional, Awaitable, Dict, Any from enum import Enum from pydantic import BaseModel, Field import time import urllib.parse import unittest class Event(Enum): START = "start" WAITING = "waiting" FINISHED = "finished" ERROR = "error" class Crawler: def __init__(self, base_url: str, token: str = None, timeout=300): self.base_url = base_url self.token = token self.timeout = timeout async def submit_and_wait( self, request_data: dict, token: str = None, timeout: int = 0, hook: Optional[Callable[[Event, Dict[str, Any]], Awaitable[None]]] = None, ) -> dict: """Submit crawl job and wait for completion using aiohttp with a unified hook.""" if token is None: token = self.token if timeout == 0: timeout = self.timeout headers = {"Authorization": f"Bearer {token}"} async with aiohttp.ClientSession(headers=headers) as session: # Submit crawl job endpoint = self.get_crawler_url("crawl") # print(endpoint) try: # Attempt to construct the task_id before the request. If this fails, the task_id will be None # task_id = request_data.get("task_id", None) # attempt to get a task ID from the request # start_info: Dict[str, Any] = {"request_data": request_data, "task_id": task_id} if hook: await hook(Event.START, {"url": request_data["urls"]}) async with session.post(endpoint, json=request_data) as response: response_json = await response.json() # print(response_json) task_id = response_json["task_id"] # print(f"Task ID: {task_id}") # Poll for result start_time = time.time() waiting_info: Dict[str, Any] = {"task_id": task_id} if hook: await hook(Event.WAITING, waiting_info) while True: if time.time() - start_time > timeout: raise TimeoutError(f"Task {task_id} timeout") async with session.get( self.get_crawler_url(f"task/{task_id}") ) as result: status = await result.json() if status["status"] == "completed": finished_info: Dict[str, Any] = { "task_id": task_id, "status": status["status"], } if hook: await hook(Event.FINISHED, finished_info) return status await asyncio.sleep(2) except Exception as e: error_info: Dict[str, Any] = {"task_id": task_id, "exception": e} if hook: await hook(Event.ERROR, error_info) raise # Re-raise the exception to ensure it's not swallowed finally: await session.close() def get_crawler_url(self, sub_dir): full_url = urllib.parse.urljoin(self.base_url, sub_dir) return full_url # ensure session is closed. A better pattern might be to use `async with` more narrowly, but this will do for this context. 
class EventEmitter:
    def __init__(self, event_emitter: Callable[[dict], Any] = None):
        self.event_emitter = event_emitter

    async def progress_update(self, description):
        await self.emit(description)

    async def error_update(self, description):
        await self.emit(description, "error", True)

    async def success_update(self, description):
        await self.emit(description, "success", True)

    async def emit(self, description="Unknown State", status="in_progress", done=False):
        if self.event_emitter:
            await self.event_emitter(
                {
                    "type": "status",
                    "data": {
                        "status": status,
                        "description": description,
                        "done": done,
                    },
                }
            )


class Tools:
    class Valves(BaseModel):
        CRAWL4AI_URL: str = Field(
            default="http://127.0.0.1:11235/",
            description="Crawl4ai server URL, for example: http://127.0.0.1:11235/",
        )
        CRAWL4AI_TOKEN: str = Field(
            default="123456",
            description="(Optional) Crawl4ai server token",
        )

    def __init__(self):
        self.valves = self.Valves()
        self.citation = True

    async def web_scrape(
        self,
        url: str,
        __event_emitter__: Callable[[dict], Any] = None,
        __user__: dict = {},
    ) -> str:
        """
        Scrape and process a web page using a Crawl4ai server.

        :param url: The URL of the web page to scrape.
        :return: The scraped and processed webpage content, or an error message.
        """
        emitter = EventEmitter(__event_emitter__)
        await emitter.progress_update(f"Scraping {url}")
        crawl4ai_url = self.valves.CRAWL4AI_URL

        # Initialize the Crawler with the configured base URL and token
        crawler = Crawler(crawl4ai_url, self.valves.CRAWL4AI_TOKEN)

        # Hook that forwards crawl events to the status emitter
        async def hook(event: Event, data: Dict[str, Any]):
            if event == Event.START:
                await emitter.progress_update(f"Starting crawl for {data}")
            elif event == Event.WAITING:
                await emitter.progress_update(f"Waiting for crawl results of {data}")
            elif event == Event.FINISHED:
                await emitter.success_update(f"Crawl completed for {data}")
            elif event == Event.ERROR:
                error_message = f"Error during crawl for {url}: {data.get('exception', 'Unknown error')}"
                await emitter.error_update(error_message)

        # Prepare the request data
        request_data = {
            "urls": url,
            "crawler_params": {
                # Browser configuration
                "headless": True,  # Run in headless mode
                "browser_type": "chromium",  # chromium/firefox/webkit
                "user_agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36",
                # Anti-detection features
                "simulate_user": True,  # Simulate human behavior
                "magic": True,  # Advanced anti-detection
                "override_navigator": True,  # Override navigator properties
            },
            "extra": {"only_text": True},
            # Optional site-specific content selectors, kept as commented examples:
            # "css_selector": "#js_content",
            # Sohu: "css_selector": ".article",
            # The Paper (澎湃): "css_selector": "[class^='index_cententWrap']",
        }
        try:
            # Submit the crawl job and wait for completion
            result = await crawler.submit_and_wait(request_data, hook=hook)
            # return result["result"]["markdown_v2"]["markdown_with_citations"]
            return remove_img(result["result"]["markdown"])
        except Exception as e:
            error_message = f"Error scraping web page: {str(e)}"
            await emitter.error_update(error_message)
            return error_message


def remove_img(markdown_text):
    # Strip Markdown image tags (base64 data URIs as well as regular image links)
    pattern = r"!\[.*?\]\((?:data:image/[a-zA-Z]+;base64,[^)]+|[^)]*)\)"
    return re.sub(pattern, "", markdown_text)


class WebScrapeTest(unittest.IsolatedAsyncioTestCase):
    async def test_web_scrape(self):
        url = "https://mp.weixin.qq.com/s/GHN4p5Gi0sUIPRbM-A02GQ"
        tool = Tools()
        content = await tool.web_scrape(url)
        print(content)
        self.assertEqual(1, 1)


async def main():
    tools = Tools()
    content = await tools.web_scrape(
        "https://mp.weixin.qq.com/s/GHN4p5Gi0sUIPRbM-A02GQ"
    )
    print(content)


if __name__ == "__main__":
    print("Running tests...")
    # unittest.main()
    asyncio.run(main())
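The main() entry point above exercises the tool without a status callback. The sketch below shows how the same call might look with a simple __event_emitter__ that prints the status dicts built by EventEmitter.emit. It is a minimal sketch, assuming the tool code above is available and a Crawl4ai server is reachable at the configured valve URL; print_status, demo, and the example URL are illustrative names, not part of the tool.

async def print_status(event: dict):
    # Receives dicts of the form {"type": "status", "data": {...}} from EventEmitter.emit
    data = event.get("data", {})
    print(f"[{data.get('status')}] {data.get('description')}")


async def demo():
    tool = Tools()
    content = await tool.web_scrape(
        "https://example.com/some-article",  # placeholder URL
        __event_emitter__=print_status,
    )
    print(content)


# asyncio.run(demo())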