"""
title: Universal Scraper
description: A versatile scraping tool that automatically fetches content from standard web pages using Jina Reader (converting Reddit URLs to old.reddit.com for better results) or retrieves transcripts from YouTube videos, supporting multiple mixed URLs with detailed UI feedback and citations.
original_author: Amos
funding_url: https://github.com/open-webui
version: 0.6.0
license: MIT
requirements: requests, youtube-transcript-api
"""
import requests
from typing import List, Dict, Any
import re
import xml.etree.ElementTree as ET # For parsing YouTube transcript XML
from pydantic import (
BaseModel,
Field,
ValidationError,
) # Import ValidationError for Pydantic issues
# Removed unittest and related mocks as tests are removed
import asyncio
from datetime import datetime # Needed for citation timestamp
from youtube_transcript_api import YouTubeTranscriptApi
from youtube_transcript_api.formatters import TextFormatter
# --- Helper Functions (now outside the Tools class) ---
def extract_title(text: str) -> str | None:
"""
Extracts the title from a string containing structured text (like Jina Reader output)
assuming the format includes "Title: ...\n". Uses a non-greedy match.
:param text: The input string containing the title.
:return: The extracted title string, or None if the title is not found.
"""
match = re.search(r"Title: (.+?)\n", text)
return match.group(1).strip() if match else None
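# A minimal illustrative example, assuming Jina Reader's "Title: ..." header format
# (the input string below is hypothetical):
#   extract_title("Title: Example Domain\nURL Source: https://example.com\n...")
#   -> "Example Domain"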
def extract_youtube_video_id(url: str) -> str | None:
"""
Extract YouTube video ID from various YouTube URL formats.
Supports youtube.com/watch?v=..., youtu.be/..., youtube.com/embed/...,
youtube.com/v/..., and potential variations with query parameters.
:param url: The YouTube video URL
:return: Extracted video ID or None if invalid
"""
# Regex to capture the video ID from various formats
match = re.search(
r"(?:https?://)?(?:www\.)?(?:youtube\.com/(?:watch\?v=|embed/|v/)|youtu\.be/)([^&?\n]+)",
url,
)
if match:
return match.group(1)
return None
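# Illustrative examples of the URL shapes the regex above accepts (placeholder video ID):
#   extract_youtube_video_id("https://www.youtube.com/watch?v=abc123XYZ_-")  # -> "abc123XYZ_-"
#   extract_youtube_video_id("https://youtu.be/abc123XYZ_-?t=42")            # -> "abc123XYZ_-"
#   extract_youtube_video_id("https://example.com/page")                     # -> None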
def extract_youtube_video_title_from_html(html_content: str) -> str | None:
"""
Extracts the video title from the raw HTML content of a YouTube page.
Looks for the <title> tag content.
:param html_content: The HTML source of the YouTube video page.
:return: The extracted video title, or None if not found.
"""
match = re.search(r"<title>(.*?)</title>", html_content, re.DOTALL)
if match:
# Clean up common YouTube title suffixes like " - YouTube"
title = match.group(1).strip()
title = re.sub(r"\s*-\s*YouTube$", "", title)
return title
return None
def extract_youtube_caption_url(
captions_data: str, language_code: str = "en"
) -> str | None:
"""
Extracts the caption URL for a specific language from captions metadata (JSON string found in HTML).
    Prioritizes the specified language (preferring manual over auto-generated tracks),
    then falls back to 'en', then to the first valid track found; as a final fallback it
    uses the original script's simple string search for the first "baseUrl".
:param captions_data: The caption metadata string (usually a JSON array as string).
:param language_code: The preferred language code (e.g., 'en', 'fr').
:return: Extracted caption URL or None if no suitable caption found.
"""
# --- Attempt to find preferred or English caption first ---
track_matches = re.finditer(r"\{.*?\}", captions_data)
preferred_url = None
english_url = None
first_valid_url = None
for match in track_matches:
track_str = match.group(0)
try:
language_match = re.search(r'"vssId":"(?P<id>.*?)"', track_str)
baseurl_match = re.search(r'"baseUrl":"(?P<url>.*?)"', track_str)
if not language_match or not baseurl_match:
continue
lang_id = language_match.group("id")
base_url = baseurl_match.group("url").replace("\\u0026", "&")
            # vssId values look like ".en" (manual) or "a.en" (auto-generated)
            cleaned_lang_id = re.sub(r"^a?\.", "", lang_id).split(".", 1)[0]
if first_valid_url is None:
first_valid_url = base_url
is_auto_generated = lang_id.startswith("a.")
if cleaned_lang_id == language_code:
if not is_auto_generated:
preferred_url = base_url
break # Found manual preferred, stop
elif preferred_url is None:
preferred_url = base_url # Store auto preferred as fallback
if (
cleaned_lang_id == "en" and preferred_url is None
): # Only check English if preferred not found yet
if not is_auto_generated:
english_url = base_url # Manual English preferred
elif english_url is None:
english_url = (
base_url # Store auto English only if no manual English
)
except Exception as e:
print(f"Warning: Failed to parse caption track entry: {e}")
continue
# Return in priority: preferred manual/auto -> english manual/auto -> first valid found by language checks
if preferred_url or english_url or first_valid_url:
return preferred_url or english_url or first_valid_url
# --- End Attempt to find preferred or English caption first ---
# --- Fallback: Revert to original script's simple method ---
url_start = captions_data.find('"baseUrl":"')
if url_start == -1:
return None
url_start += len('"baseUrl":"')
url_end = captions_data.find('"', url_start)
if url_end == -1:
return None
fallback_url = captions_data[url_start:url_end].replace("\\u0026", "&")
print(
f"Warning: Language/priority search failed, falling back to first found URL: {fallback_url}"
)
return fallback_url
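# Illustrative, simplified shape of a caption track entry inside captions_data (real
# YouTube player responses contain more fields; this is an assumption for clarity):
#   {"baseUrl":"https://www.youtube.com/api/timedtext?...","vssId":".en"}    manual track
#   {"baseUrl":"https://www.youtube.com/api/timedtext?...","vssId":"a.fr"}   auto-generated track
# The "a." prefix on vssId marks auto-generated captions, which this helper deprioritizes.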
# --- End Fallback ---
def extract_transcript_from_xml(xml_content: str) -> str:
"""
Extracts readable text from YouTube XML captions.
Cleans common HTML entities and joins text blocks.
:param xml_content: The XML content of the captions
:return: The cleaned transcript text
"""
try:
root = ET.fromstring(xml_content)
transcript_lines = []
for text_element in root.findall(".//text"):
line = text_element.text
if line:
# Decode common XML/HTML entities
line = (
                    line.replace("&#39;", "'")
                    .replace("&quot;", '"')
                    .replace("&lt;", "<")
                    .replace("&gt;", ">")
                    .replace("&amp;", "&")
)
transcript_lines.append(line)
# Join lines, ensuring a space separation
return " ".join(transcript_lines)
except ET.ParseError as e:
print(f"Error parsing XML transcript: {e}")
return ""
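# Illustrative (assumed) shape of the timedtext XML this parser expects:
#   <transcript><text start="0.0" dur="1.5">Hello &amp; welcome</text>...</transcript>
#   -> "Hello & welcome ..."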
def clean_urls(text: str) -> str:
"""
Cleans URLs from a string containing structured text.
Specifically targets Jina Reader's link format like `(http://...)`.
:param text: The input string containing the URLs.
:return: The cleaned string with URLs removed.
"""
    # Removes parenthesized URLs such as " (http://...)" or " (https://...)",
    # along with any whitespace before them, using a non-greedy match for the URL part.
cleaned_text = re.sub(r"\s*\((https?://[^)]+?)\)", "", text)
return cleaned_text
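# Illustrative example of the markdown-style links Jina Reader emits and what gets stripped:
#   clean_urls("[Example](https://example.com/page) and ![img](https://example.com/a.png)")
#   -> "[Example] and ![img]"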
# Helper function for Reddit URL manipulation
def _modify_reddit_url(url: str) -> str:
"""
Converts www.reddit.com or reddit.com URLs to old.reddit.com.
Leaves other URLs unchanged.
:param url: The input URL.
:return: Modified URL or original URL.
"""
# Use regex for case-insensitive check and robustness
match = re.match(r"^(https?://)(www\.)?(reddit\.com)(.*)$", url, re.IGNORECASE)
if match:
protocol = match.group(1)
path_and_query = match.group(4)
return f"{protocol}old.reddit.com{path_and_query}"
return url
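# Illustrative examples (hypothetical paths):
#   _modify_reddit_url("https://www.reddit.com/r/python/comments/abc/thread/")
#   -> "https://old.reddit.com/r/python/comments/abc/thread/"
#   URLs that are already old.reddit.com, or not Reddit at all, are returned unchanged.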
# Helper functions for emitting status/citation, also moved outside
# Note: These expect the raw event_emitter callable passed by Open WebUI
async def _emit_status(
__event_emitter__, description: str, status: str = "in_progress", done: bool = False
):
if __event_emitter__:
try:
if asyncio.iscoroutinefunction(__event_emitter__):
await __event_emitter__(
{
"type": "status",
"data": {
"status": status,
"description": description,
"done": done,
},
}
)
else:
# Handle synchronous event_emitter if necessary
__event_emitter__(
{
"type": "status",
"data": {
"status": status,
"description": description,
"done": done,
},
}
)
except Exception as e:
print(f"Error emitting status event: {e}")
async def _emit_citation(__event_emitter__, url: str, title: str, content: str):
if __event_emitter__:
try:
citation_data = {
"type": "citation",
"data": {
"document": [content],
"metadata": [
{
"date_accessed": datetime.now().isoformat(),
"source": title,
}
],
"source": {"name": title, "url": url},
},
}
if asyncio.iscoroutinefunction(__event_emitter__):
await __event_emitter__(citation_data)
else:
__event_emitter__(citation_data)
except Exception as e:
print(f"Error emitting citation event for {url}: {e}")
# --- Core scraping logic helpers (now outside Tools class) ---
# They now accept a 'tools_instance' parameter to access Valves and UserValves type
# and the global valves instance.
async def _scrape_web(
tools_instance: Any, # Accept the Tools instance (used for accessing Valves/UserValves definitions and global valves instance)
url: str, # This is the original URL from the user
user_valves: Any, # Accept Any, will validate/instantiate internally
__event_emitter__,
item_description: str,
) -> Dict[str, Any]:
"""Internal helper to scrape a standard web page using Jina Reader."""
# --- Apply Reddit URL Modification ---
url_to_fetch = _modify_reddit_url(url)
# --- End Modification ---
# --- Robust UserValves Handling ---
    # Type hint for clarity; refers to the UserValves class on the passed Tools instance
    effective_user_valves: tools_instance.UserValves
if not isinstance(user_valves, tools_instance.UserValves):
if isinstance(user_valves, dict):
try:
# Instantiate using the UserValves class from the passed Tools instance
effective_user_valves = tools_instance.UserValves(**user_valves)
except (ValidationError, TypeError) as e:
error_msg = f"Warning: Failed to parse user valves from dict in _scrape_web: {e}. Using default valves."
print(error_msg)
await _emit_status(
__event_emitter__, error_msg, status="error", done=False
)
effective_user_valves = (
tools_instance.UserValves()
) # Use default from the class
else:
error_msg = f"Warning: Received unexpected type for user_valves in _scrape_web: {type(user_valves).__name__}. Using default valves."
print(error_msg)
await _emit_status(__event_emitter__, error_msg, status="error", done=False)
effective_user_valves = (
tools_instance.UserValves()
) # Use default from the class
else:
effective_user_valves = user_valves
# --- End Robust UserValves Handling ---
jina_url = (
f"https://r.jina.ai/{url_to_fetch}" # Use the potentially modified URL here
)
headers = {
"X-With-Generated-Alt": "true",
"User-Agent": "Mozilla/5.0 (compatible; OpenWebUI/1.0; +https://github.com/open-webui/open-webui-tools/tree/main/universal_scrape)",
}
# Access global valves from the passed Tools instance
if tools_instance.valves.DISABLE_CACHING:
headers["X-No-Cache"] = "true"
api_key = (
effective_user_valves.JINA_API_KEY or tools_instance.valves.GLOBAL_JINA_API_KEY
)
if api_key:
headers["Authorization"] = f"Bearer {api_key}"
try:
        # The status description shows the original URL; note the rewritten URL only when it differs.
scrape_description = f"Scraping {item_description} using Jina Reader..."
if url != url_to_fetch:
scrape_description += f" (via {url_to_fetch})"
await _emit_status(__event_emitter__, scrape_description)
response = requests.get(jina_url, headers=headers, timeout=60)
response.raise_for_status()
content = response.text
title = extract_title(content) or url # Use original URL as title if not found
if effective_user_valves.CLEAN_CONTENT:
await _emit_status(
__event_emitter__, f"Cleaning content for {item_description}..."
)
content = clean_urls(content)
await _emit_citation(
__event_emitter__, url, title, content
) # Use original URL in citation
await _emit_status(
__event_emitter__,
f"Successfully scraped web page: {title}",
status="success",
done=False,
)
return {
"url": url,
"title": title,
"content": content,
"status": "success",
} # Return original URL
except requests.exceptions.Timeout:
error_message = (
f"Error scraping {item_description}: Request timed out after 60 seconds."
)
await _emit_status(__event_emitter__, error_message, status="error", done=False)
return {"url": url, "error": error_message, "status": "timeout"}
except requests.RequestException as e:
error_message = f"Error scraping {item_description}: {str(e)}"
if (
isinstance(e, requests.exceptions.HTTPError)
and e.response is not None
and e.response.status_code in [401, 403]
):
error_message += " (Potential Jina API Key issue)"
await _emit_status(__event_emitter__, error_message, status="error", done=False)
return {"url": url, "error": error_message, "status": "error"}
except Exception as e:
error_message = f"An unexpected error occurred while scraping {item_description}: {type(e).__name__} - {e}"
await _emit_status(__event_emitter__, error_message, status="error", done=False)
return {"url": url, "error": error_message, "status": "unexpected_error"}
async def _fetch_youtube_transcript(
tools_instance: Any,
url: str,
user_valves: Any,
__event_emitter__,
item_description: str,
) -> Dict[str, Any]:
"""Internal helper to fetch YouTube video transcript using youtube-transcript-api."""
# --- Robust UserValves Handling ---
effective_user_valves: tools_instance.UserValves
if not isinstance(user_valves, tools_instance.UserValves):
if isinstance(user_valves, dict):
try:
effective_user_valves = tools_instance.UserValves(**user_valves)
except (ValidationError, TypeError) as e:
error_msg = f"Warning: Failed to parse user valves from dict in _fetch_youtube_transcript: {e}. Using default valves."
print(error_msg)
await _emit_status(
__event_emitter__, error_msg, status="error", done=False
)
effective_user_valves = tools_instance.UserValves()
else:
error_msg = f"Warning: Received unexpected type for user_valves in _fetch_youtube_transcript: {type(user_valves).__name__}. Using default valves."
print(error_msg)
await _emit_status(__event_emitter__, error_msg, status="error", done=False)
effective_user_valves = tools_instance.UserValves()
else:
effective_user_valves = user_valves
video_id = extract_youtube_video_id(url)
if not video_id:
error_message = f"Invalid YouTube URL format for {item_description}."
await _emit_status(__event_emitter__, error_message, status="error", done=False)
return {"url": url, "error": error_message, "status": "invalid_url"}
try:
# Initialize the API
youtube_api = YouTubeTranscriptApi()
await _emit_status(
__event_emitter__,
f"Fetching available transcripts for {item_description}...",
)
        # The transcript API does not expose the video title, so use a generic label.
        video_title = f"YouTube Video {video_id}"
        # Listing the available transcripts up front surfaces availability problems early.
        try:
            youtube_api.list(video_id)
        except Exception:
            # If listing fails, we still attempt a direct fetch below.
await _emit_status(
__event_emitter__,
f"Could not list available transcripts for {item_description}, trying direct fetch...",
status="warning",
done=False,
)
await _emit_status(
__event_emitter__,
f"Fetching transcript ({effective_user_valves.YOUTUBE_LANGUAGE}) for {item_description}...",
)
# Set up language preferences - primary language, then English fallback
languages = [effective_user_valves.YOUTUBE_LANGUAGE]
if effective_user_valves.YOUTUBE_LANGUAGE != "en":
languages.append("en")
# Fetch the transcript
try:
fetched_transcript = youtube_api.fetch(video_id, languages=languages)
# Extract the actual language that was used
actual_language = fetched_transcript.language_code
if actual_language != effective_user_valves.YOUTUBE_LANGUAGE:
await _emit_status(
__event_emitter__,
f"Using {actual_language} transcript (requested: {effective_user_valves.YOUTUBE_LANGUAGE}) for {item_description}...",
status="warning",
done=False,
)
await _emit_status(
__event_emitter__,
f"Processing transcript content for {item_description}...",
)
# Convert transcript to text
# Option 1: Simple text extraction (joining all text snippets)
transcript_text_parts = []
for snippet in fetched_transcript:
transcript_text_parts.append(snippet.text)
transcript_content = " ".join(transcript_text_parts)
# Option 2: If you want more detailed formatting with timestamps, use this instead:
# formatter = TextFormatter()
# transcript_content = formatter.format_transcript(fetched_transcript)
if not transcript_content.strip():
error_message = f"Transcript content is empty for {item_description}."
await _emit_status(
__event_emitter__, error_message, status="error", done=False
)
return {
"url": url,
"title": video_title,
"content": "",
"error": error_message,
"status": "empty_transcript",
}
            # The API only exposes the video ID, not the title; refresh the generic label from it
if hasattr(fetched_transcript, "video_id"):
video_title = f"YouTube Video: {fetched_transcript.video_id}"
await _emit_citation(
__event_emitter__, url, video_title, transcript_content
)
await _emit_status(
__event_emitter__,
f"Successfully fetched transcript for: {video_title}",
status="success",
done=False,
)
return {
"url": url,
"title": video_title,
"content": transcript_content,
"status": "success",
}
except Exception as transcript_error:
# Handle specific youtube-transcript-api errors
error_type = type(transcript_error).__name__
if "NoTranscriptFound" in error_type:
error_message = f"No transcript found for {item_description}. Video may not have captions available."
elif "TranscriptsDisabled" in error_type:
error_message = f"Transcripts are disabled for {item_description}."
elif "NoTranscriptAvailable" in error_type:
error_message = f"No transcript available in requested languages ({', '.join(languages)}) for {item_description}."
elif "VideoUnavailable" in error_type:
error_message = f"Video is unavailable for {item_description}. It may be private, deleted, or region-restricted."
elif "TooManyRequests" in error_type:
error_message = f"Too many requests to YouTube for {item_description}. Please try again later."
elif "RequestBlocked" in error_type or "IpBlocked" in error_type:
error_message = f"Request blocked by YouTube for {item_description}. Your IP may be temporarily banned."
else:
error_message = f"Error fetching transcript for {item_description}: {transcript_error}"
await _emit_status(
__event_emitter__, error_message, status="error", done=False
)
return {
"url": url,
"title": video_title,
"content": "",
"error": error_message,
"status": "transcript_error",
}
except Exception as e:
error_message = f"An unexpected error occurred while processing {item_description}: {type(e).__name__} - {e}"
await _emit_status(__event_emitter__, error_message, status="error", done=False)
return {
"url": url,
"title": f"YouTube Video {video_id}",
"error": error_message,
"status": "unexpected_error",
}
# --- Tools Class (Main Tool Definition) ---
class Tools:
"""
Container class for the web scraping tool, including configuration models
and the main scraping logic.
"""
class Valves(BaseModel):
"""Global configuration valves for the tool."""
DISABLE_CACHING: bool = Field(
default=False, description="Bypass Jina Cache when scraping"
)
GLOBAL_JINA_API_KEY: str = Field(
default="",
description="(Optional) Jina API key. Allows a higher rate limit when scraping standard web pages.",
)
class UserValves(BaseModel):
"""User-specific configuration valves for the tool."""
CLEAN_CONTENT: bool = Field(
default=True,
description="For standard web pages: Remove links and image urls from scraped content. This reduces the number of tokens.",
)
JINA_API_KEY: str = Field(
default="",
description="(Optional) Jina API key. Allows a higher rate limit when scraping standard web pages.",
)
YOUTUBE_LANGUAGE: str = Field(
default="en",
description="For YouTube videos: Preferred transcript language code (e.g., 'en', 'fr', 'es'). Falls back to English if preferred not available.",
)
def __init__(self):
self.valves = self.Valves()
        self.citation = False  # Disable built-in citations; this tool emits its own via _emit_citation
async def web_scrape(
self,
urls: List[str], # Accept a list of URLs
__event_emitter__=None, # Untyped emitter
__user__: dict = {}, # Accept the __user__ dictionary as per docs
) -> str:
"""
Scrape and process multiple web pages or fetch YouTube transcripts.
Automatically detects URL type. Handles multiple URLs in a list.
Converts Reddit URLs to old.reddit.com before scraping.
Content (web text or transcript) from each URL is returned as a separate block.
Emits status updates and citations for each processed URL.
:param urls: A list of URLs (strings) to process (web pages or YouTube videos).
:param __event_emitter__: Callable to emit real-time updates (status, citations). (auto-injected by Open WebUI)
:param __user__: Dictionary containing user information, including 'valves'. (auto-injected by Open WebUI)
:return: A combined string containing the processed content (or error messages)
for each URL. Each URL's result is delineated by '--- Start Content for URL: ... ---'
        and '--- End Content for URL: ... ---' markers.
Returns an error message string if no valid URLs are provided.
"""
# Get the raw user valves data from __user__. This will be passed to helpers.
# The helpers are now responsible for validating/instantiating it robustly.
user_valves_data = __user__.get("valves")
# Optional initial warning if __user__['valves'] isn't the expected type
if user_valves_data is None or (
not isinstance(user_valves_data, self.UserValves)
and not isinstance(user_valves_data, dict)
):
print(
f"Warning: __user__['valves'] is missing or unexpected type ({type(user_valves_data).__name__}). Helpers will use default valves."
)
await _emit_status(
__event_emitter__,
"Warning: User valves config issue. Using defaults or attempting parsing per item.",
status="error",
done=False,
)
if not isinstance(urls, list) or not urls:
await _emit_status(
__event_emitter__, "No URLs provided.", status="error", done=True
)
return "Error: No URLs provided to the tool."
# Basic filtering for non-empty strings that look somewhat like URLs (must contain scheme or www.)
potential_urls = [
url.strip()
for url in urls
if isinstance(url, str) and re.match(r"^(https?://|www\.)\S+", url.strip())
]
        invalid_urls = [
            url
            for url in urls
            if not isinstance(url, str) or url.strip() not in potential_urls
        ]
        if invalid_urls:
            invalid_preview = ", ".join(str(u) for u in invalid_urls[:5]) + (
                "..." if len(invalid_urls) > 5 else ""
            )
await _emit_status(
__event_emitter__,
f"Skipping {len(invalid_urls)} invalid or improperly formatted URL(s): {invalid_preview}",
status="error",
done=False,
)
if not potential_urls:
await _emit_status(
__event_emitter__,
"No valid URLs found after initial filtering.",
status="error",
done=True,
)
            return f"Error: No valid URLs found after initial filtering. Original inputs: {', '.join(str(u) for u in urls)}"
results: List[Dict[str, Any]] = []
total_urls = len(potential_urls)
await _emit_status(
__event_emitter__, f"Starting processing for {total_urls} potential URL(s)."
)
for i, url in enumerate(potential_urls):
item_description = f"Item {i + 1}/{total_urls}: {url}"
# --- Determine URL Type and Call Appropriate Helper ---
# Check YouTube using the original URL
video_id = extract_youtube_video_id(url)
if video_id:
# Pass 'self' (the Tools instance) to the helper
result = await _fetch_youtube_transcript(
self, url, user_valves_data, __event_emitter__, item_description
)
results.append(result)
else:
# Assume it's a standard web URL.
# The _scrape_web helper will handle the Reddit URL modification internally.
# Pass 'self' (the Tools instance) to the helper
result = await _scrape_web(
self, url, user_valves_data, __event_emitter__, item_description
)
results.append(result)
formatted_output = ""
if not results:
formatted_output = "No URLs were processed successfully."
final_status_msg = "No URLs processed."
overall_status = "error"
else:
for result in results:
formatted_output += f"--- Start Content for URL: {result['url']} ---\n"
display_title = result.get("title", result["url"])
formatted_output += f"Source Title: {display_title}\n"
if result["status"] == "success":
formatted_output += f"Status: Success\n"
content_to_add = result.get("content")
if content_to_add is not None:
formatted_output += f"Content:\n{content_to_add}\n"
else:
formatted_output += "Content: (Empty)\n"
else:
formatted_output += f"Status: {result['status']}\n"
error_detail = result.get("error", "Details not available.")
if error_detail is not None:
formatted_output += f"Error:\n{error_detail}\n"
else:
formatted_output += "Error: (Details not available)\n"
formatted_output += f"--- End Content for URL: {result['url']} ---\n\n"
formatted_output = formatted_output.rstrip("\n")
overall_status = "success"
if any(r["status"] != "success" for r in results):
overall_status = "error"
successful_count = sum(1 for r in results if r["status"] == "success")
final_status_msg = f"Finished processing {len(results)} item(s). Successfully processed {successful_count}."
await _emit_status(
__event_emitter__, final_status_msg, status=overall_status, done=True
)
return formatted_output
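# --- Optional local sanity check (not part of the Open WebUI tool contract) ---
# A minimal, offline sketch for running this file directly: it only exercises the pure
# helpers above (no network calls, no event emitter), and the sample URLs are hypothetical.
if __name__ == "__main__":
    sample_urls = [
        "https://www.reddit.com/r/python/comments/abc123/example_thread/",
        "https://www.youtube.com/watch?v=abc123XYZ_-",
    ]
    for sample in sample_urls:
        vid = extract_youtube_video_id(sample)
        if vid:
            print(f"{sample} -> YouTube video ID: {vid}")
        else:
            print(f"{sample} -> web page, would be fetched as: {_modify_reddit_url(sample)}")
    print(clean_urls("Sample [link](https://example.com/page) text"))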