"""
title: Jina Reader and YouTube Loader Scraper
description: Scrapes web pages using a locally hosted [Reader](https://github.com/intergalacticalvariable/reader) instance (with optional r.jina.ai API key support) and extracts YouTube transcripts via YoutubeLoader.from_youtube_url, featuring asynchronous processing, per-domain throttling, URL caching, and content-cleaning utilities.
author: @iamg30
author_url: https://openwebui.com/u/iamg30/
funding_url: https://github.com/open-webui
version: 0.1.2
license: MIT
"""
import requests
import re
import urllib.parse
import logging
import time
import random
from typing import Callable, Any, Dict, List, Set, Optional
from pydantic import BaseModel, Field
import asyncio
from bs4 import BeautifulSoup
from langchain_community.document_loaders import YoutubeLoader
import traceback
from urllib.parse import urlparse
from datetime import datetime, timedelta
# Setup Logging for Extensive Debugging
logging.basicConfig(
level=logging.DEBUG, format="%(asctime)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger()
class EventEmitter:
def __init__(
self, event_emitter: Callable[[dict], Any] = None, show_logs: bool = True
):
"""
:param event_emitter: Function to emit events to the chat.
:param show_logs: Toggle to enable or disable event emitting (for debugging).
"""
self.event_emitter = event_emitter
self.show_logs = show_logs
async def progress_update(self, description):
if self.show_logs:
await self.emit(description)
async def error_update(self, description):
if self.show_logs:
await self.emit(description, "error", True)
async def success_update(self, description):
if self.show_logs:
await self.emit(description, "success", True)
async def emit(self, description="Unknown State", status="in_progress", done=False):
if self.event_emitter:
logger.debug(f"Emitting {status} event: {description}")
await self.event_emitter(
{
"type": "status",
"data": {
"status": status,
"description": description,
"done": done,
},
}
)
class DomainThrottler:
"""Manages request rates to different domains to avoid rate limiting."""
def __init__(self, default_rate_limit: float = 1.0):
"""
Initialize with default rate limit in seconds between requests.
:param default_rate_limit: Default minimum time between requests to the same domain
"""
self.default_rate_limit = default_rate_limit
self.domain_last_request: Dict[str, datetime] = {}
self.domain_rate_limits: Dict[str, float] = {}
self.domain_backoff_multiplier: Dict[str, float] = {}
def get_domain(self, url: str) -> str:
"""Extract domain from URL."""
parsed_url = urlparse(url)
return parsed_url.netloc
async def wait_if_needed(self, url: str) -> None:
"""
Wait if needed to respect rate limits for the given domain.
:param url: The URL being requested
"""
domain = self.get_domain(url)
# Initialize domain if not seen before
if domain not in self.domain_last_request:
self.domain_last_request[domain] = datetime.now() - timedelta(days=1)
self.domain_rate_limits[domain] = self.default_rate_limit
self.domain_backoff_multiplier[domain] = 1.0
# Calculate time since last request
now = datetime.now()
time_since_last = (now - self.domain_last_request[domain]).total_seconds()
current_rate_limit = (
self.domain_rate_limits[domain] * self.domain_backoff_multiplier[domain]
)
# Wait if needed
if time_since_last < current_rate_limit:
wait_time = current_rate_limit - time_since_last
logger.debug(f"Throttling requests to {domain}, waiting {wait_time:.2f}s")
await asyncio.sleep(wait_time)
# Update last request time
self.domain_last_request[domain] = datetime.now()
def report_success(self, url: str) -> None:
"""
Report successful request to gradually reduce backoff.
:param url: The URL of the successful request
"""
domain = self.get_domain(url)
if (
domain in self.domain_backoff_multiplier
and self.domain_backoff_multiplier[domain] > 1.0
):
# Gradually reduce backoff
self.domain_backoff_multiplier[domain] = max(
1.0, self.domain_backoff_multiplier[domain] * 0.8
)
def report_rate_limited(self, url: str) -> None:
"""
Report rate limited request to increase backoff.
:param url: The URL that triggered rate limiting
"""
domain = self.get_domain(url)
# Increase backoff multiplier
self.domain_backoff_multiplier[domain] = min(
20.0, self.domain_backoff_multiplier[domain] * 2.0
)
logger.warning(
f"Rate limit detected for {domain}. Increasing backoff to "
f"{self.domain_backoff_multiplier[domain]}x"
)
class URLCache:
"""Manages caching of already fetched URLs to avoid redundant scraping."""
def __init__(self, max_cache_size: int = 1000):
"""
Initialize URL cache.
:param max_cache_size: Maximum number of URLs to cache
"""
self.cache: Dict[str, Dict[str, Any]] = {}
self.max_cache_size = max_cache_size
self.access_order: List[str] = []
def get(self, url: str) -> Optional[Dict[str, Any]]:
"""
Get cached data for URL if available.
:param url: URL to look up
:return: Cached data or None if not in cache
"""
if url in self.cache:
# Update access order (move to end)
self.access_order.remove(url)
self.access_order.append(url)
return self.cache[url]
return None
def put(self, url: str, data: Dict[str, Any]) -> None:
"""
Add or update URL data in cache.
:param url: URL to cache
:param data: Data to cache
"""
# Add to cache
self.cache[url] = data
# Update access order
if url in self.access_order:
self.access_order.remove(url)
self.access_order.append(url)
# Enforce size limit
while len(self.cache) > self.max_cache_size:
oldest_url = self.access_order.pop(0)
self.cache.pop(oldest_url, None)
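# Illustrative sketch (not called anywhere in this tool): URLCache behaves as a simple LRU --
# get() refreshes recency, and a put() that exceeds max_cache_size evicts the least recently
# used URL. The URLs below are placeholders.
def _example_url_cache_usage() -> None:
    cache = URLCache(max_cache_size=2)
    cache.put("https://a.example/", {"content": "A"})
    cache.put("https://b.example/", {"content": "B"})
    cache.get("https://a.example/")                    # "a" becomes the most recently used entry
    cache.put("https://c.example/", {"content": "C"})  # evicts "b", the least recently used
    assert cache.get("https://b.example/") is None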
class UserAgentRotator:
"""Rotates through different user agents to appear more like normal traffic."""
def __init__(self):
self.user_agents = [
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.1 Safari/605.1.15",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:89.0) Gecko/20100101 Firefox/89.0",
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.107 Safari/537.36",
"Mozilla/5.0 (iPhone; CPU iPhone OS 14_6 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0 Mobile/15E148 Safari/604.1",
"Mozilla/5.0 (iPad; CPU OS 14_6 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0 Mobile/15E148 Safari/604.1",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36 Edg/91.0.864.59",
]
def get_random_user_agent(self) -> str:
"""Get a random user agent from the list."""
return random.choice(self.user_agents)
class Tools:
class Valves(BaseModel):
DISABLE_CACHING: bool = Field(
default=False, description="Bypass Jina Cache when scraping"
)
JINA_INSTANCE_URL: str = Field(
default="http://host.docker.internal:3000",
description="The URL of your self-hosted Jina instance",
)
GLOBAL_JINA_API_KEY: str = Field(
default="",
description="(Optional) Jina API key. Allows a higher rate limit when scraping. Used when a User-specific API key is not available.",
)
SHOW_LOGS: bool = Field(
default=True,
description="Toggle Event Emitters. If False, no status updates are shown.",
)
DEFAULT_RATE_LIMIT: float = Field(
default=2.0,
description="Default minimum time (in seconds) between requests to the same domain",
)
MAX_CACHE_SIZE: int = Field(
default=1000,
description="Maximum number of URLs to cache in memory",
)
class UserValves(BaseModel):
CLEAN_CONTENT: bool = Field(
default=True,
description="Remove links and image urls from scraped content. This reduces the number of tokens.",
)
JINA_API_KEY: str = Field(
default="",
description="(Optional) Jina API key. Allows a higher rate limit when scraping.",
)
def __init__(self, valves: Valves = None, user_valves: UserValves = None):
self.valves = valves or self.Valves()
self.user_valves = user_valves or self.UserValves()
self.citation = True
# Initialize helpers
self.throttler = DomainThrottler(
default_rate_limit=self.valves.DEFAULT_RATE_LIMIT
)
self.url_cache = URLCache(max_cache_size=self.valves.MAX_CACHE_SIZE)
self.user_agent_rotator = UserAgentRotator()
# URL deduplication for batch operations
self.processed_urls: Set[str] = set()
async def web_scrape(
self, url: str, __event_emitter__: Callable[[dict], Any] = None
) -> str:
"""
Scrape and process a web page using a self-hosted Jina instance with bot protection.
"""
emitter = EventEmitter(__event_emitter__, self.valves.SHOW_LOGS)
# Check cache first
cached_data = self.url_cache.get(url)
if cached_data and not self.valves.DISABLE_CACHING:
await emitter.progress_update(f"Retrieved {url} from cache")
return cached_data.get("content", "")
# Respect rate limits
await self.throttler.wait_if_needed(url)
# Encode the URL to ensure it is properly formatted
encoded_url = urllib.parse.quote(url, safe="/:")
jina_url = f"{self.valves.JINA_INSTANCE_URL}/{encoded_url}"
headers = {
"X-No-Cache": "true" if self.valves.DISABLE_CACHING else "false",
"X-With-Generated-Alt": "true",
"User-Agent": self.user_agent_rotator.get_random_user_agent(),
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
"Accept-Language": "en-US,en;q=0.5",
"Accept-Encoding": "gzip, deflate, br",
"Connection": "keep-alive",
"Upgrade-Insecure-Requests": "1",
"TE": "Trailers",
}
        # Add API key to headers if available; the user-specific key takes precedence,
        # with the global key as the fallback (see the GLOBAL_JINA_API_KEY description)
        api_key = self.user_valves.JINA_API_KEY or self.valves.GLOBAL_JINA_API_KEY
if api_key:
headers["Authorization"] = f"Bearer {api_key}"
        try:
            # Run the blocking HTTP call in a worker thread so the event loop stays responsive,
            # and set a timeout so a hung request cannot stall the tool indefinitely
            response = await asyncio.to_thread(
                requests.get, jina_url, headers=headers, timeout=60
            )
# Handle rate limiting
if response.status_code == 429:
self.throttler.report_rate_limited(url)
await emitter.error_update(f"Rate limited for {url}, backing off...")
# Retry with increased backoff
await asyncio.sleep(random.uniform(2.0, 5.0))
return await self.web_scrape(url, __event_emitter__)
response.raise_for_status()
# Report success to potentially reduce backoff
self.throttler.report_success(url)
content = response.text
# Cache the result
self.url_cache.put(
url, {"content": content, "timestamp": datetime.now().isoformat()}
)
logger.debug(f"Successfully scraped: {url}")
return content
except requests.RequestException as e:
logger.error(f"Error scraping web page {url}: {str(e)}")
await emitter.error_update(f"Error scraping web page: {str(e)}")
return f"Error scraping web page: {str(e)}"
async def web_scrape_async(
self, url: str, __event_emitter__: Callable[[dict], Any] = None
) -> str:
"""
Scrape and process a web page asynchronously.
"""
emitter = EventEmitter(__event_emitter__, self.valves.SHOW_LOGS)
await emitter.progress_update(f"Scraping {url}")
content = await self.web_scrape(url, __event_emitter__)
should_clean = self.user_valves.CLEAN_CONTENT
if should_clean:
await emitter.progress_update("Received content, cleaning up ...")
cleaned_content = clean_urls(content) if should_clean else content
title = extract_title(cleaned_content)
await emitter.success_update(f"Successfully Scraped {title if title else url}")
return cleaned_content
async def extract_info(
self, url: str, __event_emitter__: Callable[[dict], Any] = None
):
"""
Extract information from the scraped content.
"""
emitter = EventEmitter(__event_emitter__, self.valves.SHOW_LOGS)
content = await self.web_scrape_async(url, __event_emitter__)
title = extract_title(content)
links = extract_links(content)
images = extract_images(content)
await emitter.success_update(
f"Extracted information for {title if title else url}"
)
return {"title": title, "links": links, "images": images}
async def get_youtube_transcript(
self,
url: str,
        __event_emitter__: Callable[[dict], Any] = None,
) -> str:
"""
Provides the title and full transcript of a YouTube video in English.
Only use if the user supplied a valid YouTube URL.
Examples of valid YouTube URLs: https://youtu.be/dQw4w9WgXcQ, https://www.youtube.com/watch?v=dQw4w9WgXcQ
:param url: The URL of the youtube video that you want the transcript for.
:return: The title and full transcript of the YouTube video in English, or an error message.
"""
        emitter = EventEmitter(__event_emitter__, self.valves.SHOW_LOGS)
        try:
            if "dQw4w9WgXcQ" in url:
                # Guard against the placeholder video ID from the docstring example being
                # passed through verbatim instead of a genuine user-supplied URL
                await emitter.error_update(f"{url} is not a valid youtube link")
                return "The tool failed with an error. No transcript has been provided."
            # Check cache first
            cached_data = self.url_cache.get(url)
            if cached_data and not self.valves.DISABLE_CACHING:
                await emitter.success_update(
                    f"Retrieved YouTube transcript for {url} from cache"
                )
                return cached_data.get("content", "")
# Respect rate limits for YouTube
await self.throttler.wait_if_needed(url)
            loader = YoutubeLoader.from_youtube_url(
                youtube_url=url,
                add_video_info=False,  # video info seems to be broken
                language=["en", "en_auto"],
            )
            # Run the blocking transcript fetch in a worker thread so the event loop stays responsive
            data = await asyncio.to_thread(loader.load)
            if not data:
                await emitter.error_update(
                    f"Failed to retrieve transcript for {url}. No results"
                )
                return "The tool failed with an error. No transcript has been provided."
transcript_content = f"Title: {data[0].metadata.get('title')}\nTranscript:\n{data[0].page_content}"
# Cache the result
self.url_cache.put(
url,
{
"content": transcript_content,
"timestamp": datetime.now().isoformat(),
},
)
            # Report success to potentially reduce backoff
            self.throttler.report_success(url)
            await emitter.success_update(f"Successfully retrieved transcript for {url}")
            return transcript_content
        except Exception:
            await emitter.error_update(f"Failed to retrieve transcript for {url}.")
return f"The tool failed with an error. No transcript has been provided.\nError Traceback: \n{traceback.format_exc()}"
async def multi_input_handler(
self,
urls: str,
__event_emitter__: Callable[[dict], Any] = None,
) -> str:
"""
Handles multiple URLs input by the user, either comma-separated or newline-separated.
Avoids scraping duplicate URLs.
:param urls: A string containing one or more URLs, separated by commas or newlines.
:return: A string containing the results of the tool's processing for each URL.
"""
results = []
urls = [url.strip() for url in re.split(r"[,\n]", urls) if url.strip()]
# Deduplicate URLs
unique_urls = []
for url in urls:
if url not in self.processed_urls:
unique_urls.append(url)
self.processed_urls.add(url)
if len(unique_urls) < len(urls):
await EventEmitter(
__event_emitter__, self.valves.SHOW_LOGS
).progress_update(f"Removed {len(urls) - len(unique_urls)} duplicate URLs")
if not unique_urls:
return "No new URLs to process. All URLs have already been processed."
async def process_url(url):
if "youtube" in url:
return await self.get_youtube_transcript(url, __event_emitter__)
else:
return await self.web_scrape_async(url, __event_emitter__)
# Create a semaphore to limit parallel requests
semaphore = asyncio.Semaphore(3) # Max 3 concurrent requests
async def limited_process(url):
async with semaphore:
return url, await process_url(url)
tasks = [limited_process(url) for url in unique_urls]
responses = await asyncio.gather(*tasks)
for url, response in responses:
results.append(f"URL: {url}\nResult: {response}")
return "\n\n".join(results)
def clean_urls(text) -> str:
"""
Clean URLs from a string containing structured text.
"""
return re.sub(r"\((http[^)]+)\)", "", text)
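# Example: clean_urls("[Books](https://books.toscrape.com/)") returns "[Books]" -- the
# parenthesised URL is dropped so the link text survives with fewer tokens.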
def extract_title(text):
"""
Extract the title from a string containing structured text.
"""
match = re.search(r"Title: (.*)\n", text)
return match.group(1).strip() if match else None
def extract_links(text):
"""
Extract links from a string containing structured text.
"""
soup = BeautifulSoup(text, "html.parser")
links = [a.get("href") for a in soup.find_all("a", href=True)]
return links
def extract_images(text):
"""
Extract images from a string containing structured text.
"""
soup = BeautifulSoup(text, "html.parser")
images = [img.get("src") for img in soup.find_all("img", src=True)]
return images
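# Note on the helpers above (an assumption based on typical Reader output, not guaranteed):
# extract_title and clean_urls operate on Reader's markdown-style text, which usually looks like
#
#   Title: Scraping Sandbox
#   URL Source: https://toscrape.com/
#   Markdown Content:
#   [Quotes](https://quotes.toscrape.com/) ...
#
# while extract_links and extract_images parse raw HTML, so they will return empty lists when
# given Reader's markdown output.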
# Example Test Cases (unittest)
import unittest
class WebScrapeTest(unittest.IsolatedAsyncioTestCase):
async def test_web_scrape(self):
url = "https://toscrape.com/"
content = await Tools().web_scrape_async(url)
self.assertEqual("Scraping Sandbox", extract_title(content))
self.assertEqual(len(content), 770)
async def test_youtube_transcript(self):
url = "https://www.youtube.com/watch?v=dQw4w9WgXcQ"
content = await Tools().get_youtube_transcript(url)
self.assertEqual(
"The tool failed with an error. No transcript has been provided.", content
)
async def test_multi_input_handler(self):
urls = "https://toscrape.com/, https://www.youtube.com/watch?v=dQw4w9WgXcQ, https://www.google.com/"
content = await Tools().multi_input_handler(urls)
self.assertTrue("Scraping Sandbox" in content)
self.assertTrue("Title: Despacito" in content)
self.assertTrue("URL: https://www.google.com/" in content)
async def test_deduplication(self):
tools = Tools()
urls = "https://toscrape.com/, https://toscrape.com/"
content = await tools.multi_input_handler(urls)
# Should only process once
self.assertEqual(content.count("URL: https://toscrape.com/"), 1)
async def test_throttling(self):
tools = Tools()
urls = (
"https://example.com/, https://example.com/page1, https://example.com/page2"
)
start_time = time.time()
content = await tools.multi_input_handler(urls)
end_time = time.time()
# Should take at least DEFAULT_RATE_LIMIT * 2 seconds due to throttling
self.assertTrue(end_time - start_time >= tools.valves.DEFAULT_RATE_LIMIT * 2)
if __name__ == "__main__":
print("Running tests...")
unittest.main()