"""
Web Scrape (Wikipedia Query Normalization + DDG Lite Search + Wikipedia JSON Summary
+ Frankfurter FX Instant Answers + API Key Overrides + Defensive Valve Access)
version: 1.5.4
license: MIT
"""
import re
import requests
from typing import Optional, Dict, Any
from urllib.parse import urlparse, quote
from pydantic import BaseModel, Field
# ---------------------------------------------------------
# Utility functions
# ---------------------------------------------------------
def extract_title(text: str) -> Optional[str]:
match = re.search(r"^Title:\s*(.*?)\s*$", text, re.MULTILINE)
return match.group(1).strip() if match else None
def clean_urls(text: str) -> str:
    # Strip markdown links first so their labels survive: [label](https://...) -> label
    text = re.sub(r"\[([^\]]+)\]\(https?://[^)]+\)", r"\1", text)
    # Remove bare URLs wrapped in parentheses: (https://...)
    text = re.sub(r"\(https?://[^)]+\)", "", text)
    # Remove any remaining bare URLs
    text = re.sub(r"https?://\S+", "", text)
    return text
def extract_domain(url: str) -> str:
m = re.search(r"https?://([^/]+)", url)
return m.group(1).lower() if m else ""
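# Illustrative expectations for the helpers above (comments only, not executed):
#   extract_title("Title: Foo\n...")                    -> "Foo"
#   clean_urls("see [docs](https://example.com) here")  -> "see docs here"
#   extract_domain("https://en.wikipedia.org/wiki/Foo") -> "en.wikipedia.org"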
# ---------------------------------------------------------
# WEATHER RULE
# ---------------------------------------------------------
def enforce_weather_rule(url: str) -> Optional[str]:
    if "wttr.in" in url:
        base = url.split("?")[0]
        loc = base.split("wttr.in/")[-1] if "wttr.in/" in base else ""
        return f"https://wttr.in/{quote(loc)}?format=3"
    lowered = url.lower()
    if any(k in lowered for k in ["weather", "forecast", "temperature"]):
        loc = url.split("/")[-1].split("?")[0].split("&")[0].split("#")[0] or "Perth"
        return f"https://wttr.in/{quote(loc)}?format=3"
    return None
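# Example behaviour (illustrative):
#   enforce_weather_rule("https://wttr.in/Tokyo?format=j1") -> "https://wttr.in/Tokyo?format=3"
#   enforce_weather_rule("forecast/Perth")                  -> "https://wttr.in/Perth?format=3"
#   enforce_weather_rule("https://example.com/news")        -> None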
# ---------------------------------------------------------
# FX QUERY PARSER (Frankfurter)
# ---------------------------------------------------------
def _normalize_currency_token(token: str) -> str:
"""
Map common currency names/aliases to ISO 4217 codes where obvious.
Keeps 3-letter codes as-is when already valid.
"""
if not token:
return ""
t = token.upper()
aliases = {
"YEN": "JPY",
"EURO": "EUR",
"EUROS": "EUR",
"POUND": "GBP",
"POUNDS": "GBP",
}
return aliases.get(t, t)
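# Illustrative: _normalize_currency_token("yen") -> "JPY",
#               _normalize_currency_token("usd") -> "USD" (uppercased, not validated)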
def parse_fx_query(text: str):
"""
Try to recognize FX-style queries and return (amount, from_ccy, to_ccy),
or None if it doesn't look like an FX query.
Handles examples like:
'1 aud to jpy'
'aud to jpy'
'convert aud to jpy'
'exchange rate aud to jpy'
"what's the current exchange rate of aud to yen"
"""
raw = text.strip()
lo = raw.lower()
# Quick filter: must mention 'to' or 'in' or 'exchange rate' / 'convert'
if not any(k in lo for k in [" to ", " in ", "exchange rate", "convert "]):
return None
# Pattern with explicit amount: "10 usd to eur"
    m = re.search(
        r"(?i)\b(\d+(?:\.\d+)?)\s*([a-z]{3}|yen|euro|euros|pound|pounds)\s+"
        r"(?:to|in)\s+([a-z]{3}|yen|euro|euros|pound|pounds)\b",
        raw,
    )
if m:
amount = float(m.group(1))
from_ccy = _normalize_currency_token(m.group(2))
to_ccy = _normalize_currency_token(m.group(3))
return amount, from_ccy, to_ccy
# Pattern without amount: "usd to eur", "exchange rate aud to jpy"
    m = re.search(
        r"(?i)\b([a-z]{3}|yen|euro|euros|pound|pounds)\s+"
        r"(?:to|in)\s+([a-z]{3}|yen|euro|euros|pound|pounds)\b",
        raw,
    )
if m:
amount = 1.0
from_ccy = _normalize_currency_token(m.group(1))
to_ccy = _normalize_currency_token(m.group(2))
return amount, from_ccy, to_ccy
return None
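# Illustrative return values (comments only, not executed):
#   parse_fx_query("convert 10 usd to eur")    -> (10.0, "USD", "EUR")
#   parse_fx_query("exchange rate aud to yen") -> (1.0, "AUD", "JPY")
#   parse_fx_query("tell me a joke")           -> None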
# ---------------------------------------------------------
# Wikipedia query normalizer (plain text → /wiki/ URL)
# ---------------------------------------------------------
def interpret_wikipedia_query(text: str) -> Optional[str]:
"""
Turn text like:
- "Search wikipedia for Boris Becker"
- "search wikipedia Boris Becker"
- "wikipedia Boris Becker"
- "wiki Boris Becker"
into a canonical Wikipedia article URL:
- "https://en.wikipedia.org/wiki/Boris_Becker"
"""
    raw = text.strip()
    if not raw:
        return None
    if raw.startswith(("http://", "https://")):
        return None  # already a URL; don't touch
    lo = raw.lower()
# Only trigger if the user clearly mentions wikipedia/wiki
wikipedia_markers = [
"search wikipedia for ",
"search wikipedia ",
"wikipedia ",
"wiki ",
]
for marker in wikipedia_markers:
if lo.startswith(marker):
topic = raw[len(marker) :].strip()
if not topic:
return None
# Normalize title: spaces -> underscores, URL-encode
title = topic.replace(" ", "_")
title_enc = quote(title)
return f"https://en.wikipedia.org/wiki/{title_enc}"
return None
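# Illustrative: interpret_wikipedia_query("search wikipedia for Boris Becker")
#   -> "https://en.wikipedia.org/wiki/Boris_Becker"
# Plain URLs and non-wiki queries return None so later handlers can run.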
# ---------------------------------------------------------
# DDG Lite Rewriter
# ---------------------------------------------------------
def rewrite_to_ddg_lite(url: str) -> Optional[str]:
    lo = url.lower()
    # IMPORTANT: don't hijack explicit Wikipedia queries
    if "wikipedia" in lo or "wiki " in lo:
        return None
    if "google.com" in lo:
        m = re.search(r"[?&]q=([^&]+)", lo)
        q = m.group(1) if m else lo  # already percent-encoded when taken from ?q=
        return f"https://lite.duckduckgo.com/lite/?q={q}"
    keys = ["search for ", "search ", "find ", "look up ", "look for "]
    if any(k in lo for k in keys):
        q = lo
        for k in keys:
            q = q.replace(k, "")
        return f"https://lite.duckduckgo.com/lite/?q={quote(q.strip())}"
    if lo.startswith("news ") or " news" in lo:
        # Strip only the whole word "news" so e.g. "newspaper" stays intact
        q = re.sub(r"\bnews\b", "", lo).strip()
        return f"https://lite.duckduckgo.com/lite/?q={quote(q)}"
    if " " in url and not url.startswith("http"):
        return f"https://lite.duckduckgo.com/lite/?q={quote(url.strip())}"
    return None
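# Illustrative: rewrite_to_ddg_lite("search for rust async runtimes")
#   -> "https://lite.duckduckgo.com/lite/?q=rust%20async%20runtimes"
# Wikipedia-flavoured input is deliberately left alone (returns None).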
# ---------------------------------------------------------
# Wikipedia URL Rewriter → JSON summary endpoint
# ---------------------------------------------------------
def rewrite_wikipedia_to_summary(url: str) -> Optional[str]:
"""
If URL is a Wikipedia HTML article, rewrite it to the JSON summary endpoint.
e.g. https://en.wikipedia.org/wiki/Albert_Einstein
-> https://en.wikipedia.org/api/rest_v1/page/summary/Albert_Einstein
"""
parsed = urlparse(url)
dom = parsed.netloc.lower()
if "wikipedia.org" not in dom:
return None
path = parsed.path or ""
# Typical article URLs: /wiki/Title
    if path.startswith("/wiki/") and len(path) > len("/wiki/"):
        title = path[len("/wiki/") :].rstrip("/")
        if not title:
            return None
        # Slashes inside titles (e.g. "AC/DC") must be escaped for the REST path
        title = title.replace("/", "%2F")
        return f"https://{dom}/api/rest_v1/page/summary/{title}"
return None
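# Illustrative: rewrite_wikipedia_to_summary("https://en.wikipedia.org/wiki/Albert_Einstein")
#   -> "https://en.wikipedia.org/api/rest_v1/page/summary/Albert_Einstein"
# Non-article paths (e.g. "/w/index.php?...") fall through and return None.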
# ---------------------------------------------------------
# Event Emitter
# ---------------------------------------------------------
class EventEmitter:
def __init__(self, emitter=None):
self.e = emitter
async def emit(self, desc="Unknown", status="in_progress", done=False):
if not self.e:
return
payload = {
"type": "status",
"data": {"status": status, "description": desc, "done": done},
}
r = self.e(payload)
if hasattr(r, "__await__"):
await r
async def progress_update(self, d: str):
await self.emit(d)
async def error_update(self, d: str):
await self.emit(d, "error", True)
async def success_update(self, d: str):
await self.emit(d, "success", True)
# ---------------------------------------------------------
# TOOL CONFIG (COGWHEEL VALVES)
# ---------------------------------------------------------
class Tools:
class Valves(BaseModel):
DISABLE_CACHING: bool = False
GLOBAL_JINA_API_KEY: str = ""
CITATION: bool = True
MAX_CHARS: int = 3500
MAX_LINES: int = 120
API_OVERRIDE_TEXT: str = Field(
default="",
description=(
"Per-domain API overrides, one per line:\n"
" api.nasa.gov=api_key=YOUR_KEY\n"
" api.openweathermap.org=appid=YOUR_KEY"
),
)
class UserValves(BaseModel):
CLEAN_CONTENT: bool = True
JINA_API_KEY: str = ""
MAX_CHARS: Optional[int] = None
EXTRACT_MODE: bool = False
def __init__(self):
self.valves = self.Valves()
# ---------------------------------------------------------
# Defensive user valve accessor
# ---------------------------------------------------------
def _uv(self, uv: Any, key: str, default=None):
if isinstance(uv, dict):
return uv.get(key, default)
return getattr(uv, key, default)
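    # Illustrative: self._uv(uv, "MAX_CHARS", None) works whether 'uv' is a
    # UserValves model, a plain dict from __user__["valves"], or None.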
# ---------------------------------------------------------
# API override parser
# ---------------------------------------------------------
def _parse_api_overrides(self) -> Dict[str, str]:
out: Dict[str, str] = {}
raw = (self.valves.API_OVERRIDE_TEXT or "").strip()
if not raw:
return out
for line in raw.splitlines():
line = line.strip()
if "=" not in line:
continue
d, pair = line.split("=", 1)
out[d.strip().lower()] = pair.strip()
return out
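    # Illustrative: with API_OVERRIDE_TEXT = "api.nasa.gov=api_key=YOUR_KEY",
    # this returns {"api.nasa.gov": "api_key=YOUR_KEY"}; the pair is appended
    # to matching request URLs in _fetch().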
# ---------------------------------------------------------
# FX INSTANT ANSWER VIA FRANKFURTER
# ---------------------------------------------------------
async def _handle_fx_query(
self,
text: str,
emitter: Any = None,
uv: Any = None,
) -> Optional[str]:
"""
Try to answer FX queries via Frankfurter.
On success: return a short text answer.
On any failure or non-FX input: return None and let caller fall back.
Designed to be safe both when called internally from web_scrape
(with a real EventEmitter and uv) and when invoked directly as a
tool with just 'text' (in which case emitter/uv may be omitted).
"""
parsed = parse_fx_query(text)
if not parsed:
return None
# Ensure uv is something our helpers can handle
if uv is None:
uv = self.UserValves()
amount, from_ccy, to_ccy = parsed
url = (
f"https://api.frankfurter.app/latest"
f"?from={from_ccy}&to={to_ccy}&amount={amount}"
)
if hasattr(emitter, "progress_update"):
await emitter.progress_update(
f"FX query detected → Frankfurter: {amount} {from_ccy} to {to_ccy}"
)
try:
r = requests.get(url, timeout=10)
r.raise_for_status()
data = r.json()
except requests.RequestException:
# Any network/API issue: fall back to normal search
if hasattr(emitter, "error_update"):
await emitter.error_update(
"Frankfurter FX request failed; falling back"
)
return None
except ValueError:
if hasattr(emitter, "error_update"):
await emitter.error_update(
"Frankfurter returned invalid JSON; falling back"
)
return None
rates = data.get("rates") or {}
if to_ccy not in rates:
# Unsupported currency pair or malformed response → fall back
return None
rate = rates[to_ccy]
date = data.get("date") or "unknown date"
if amount == 1.0:
core = f"1 {from_ccy} ≈ {rate} {to_ccy}"
else:
core = f"{amount} {from_ccy} ≈ {rate} {to_ccy}"
text_out = (
f"{core} (as of {date}, source: European Central Bank via frankfurter.app)"
)
if self.valves.CITATION:
text_out = (
f"Title: FX rate {from_ccy}→{to_ccy}\n"
f"Source: https://www.frankfurter.app/\n\n"
f"{text_out}"
)
return self._trim_text(text_out, uv)
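    # Sketch of the expected flow (values illustrative; response shape per the
    # public Frankfurter docs):
    #   parse_fx_query("10 aud to jpy") -> (10.0, "AUD", "JPY")
    #   GET https://api.frankfurter.app/latest?from=AUD&to=JPY&amount=10.0
    #   -> {"amount": 10.0, "base": "AUD", "date": "...", "rates": {"JPY": ...}}
    #   answer: "10.0 AUD ≈ <rate> JPY (as of <date>, ...)"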
# ---------------------------------------------------------
# MAIN SCRAPE ENTRYPOINT
# ---------------------------------------------------------
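    # Routing order inside web_scrape:
    #   1. weather rule (wttr.in)      2. FX instant answer (Frankfurter)
    #   3. explicit Wikipedia queries  4. direct /wiki/ URLs -> summary API
    #   5. generic searches -> DuckDuckGo Lite   6. plain fetch of the URL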
    async def web_scrape(self, url: str, __event_emitter__=None, __user__=None):
        emitter = EventEmitter(__event_emitter__)
        # Load user valves safely (either a UserValves model or a plain dict;
        # _uv() handles both)
        if (__user__ is None) or ("valves" not in __user__):
            uv: Any = self.UserValves()
        else:
            uv = __user__["valves"]
# WEATHER RULE
w = enforce_weather_rule(url)
if w:
await emitter.progress_update(f"WEATHER RULE applied → {w}")
return await self._fetch(w, emitter, uv, original_url=w)
# FX INSTANT ANSWER (Frankfurter)
fx_answer = await self._handle_fx_query(url, emitter, uv)
if fx_answer is not None:
await emitter.success_update("FX instant answer via Frankfurter")
return fx_answer
# 1) Normalize explicit Wikipedia queries ("search wikipedia X", etc.)
wiki_query_url = interpret_wikipedia_query(url)
if wiki_query_url:
            await emitter.progress_update(
                f"Wikipedia query detected → {wiki_query_url}"
            )
wiki_api_url = rewrite_wikipedia_to_summary(wiki_query_url)
if wiki_api_url:
await emitter.progress_update(f"Wikipedia summary API: {wiki_api_url}")
return await self._fetch(
wiki_api_url,
emitter,
uv,
original_url=wiki_api_url,
original_human_url=wiki_query_url,
)
# If for some reason rewrite fails, fall through and just scrape the article URL
url = wiki_query_url
# 2) Wikipedia HTML → JSON summary rewrite (for direct /wiki/ URLs)
wiki_api_url = rewrite_wikipedia_to_summary(url)
if wiki_api_url:
await emitter.progress_update(f"Wikipedia summary API: {wiki_api_url}")
return await self._fetch(
wiki_api_url,
emitter,
uv,
original_url=wiki_api_url,
original_human_url=url,
)
# 3) DuckDuckGo Lite for other generic search-like queries
ddg = rewrite_to_ddg_lite(url)
if ddg:
await emitter.progress_update(f"DuckDuckGo Lite: {ddg}")
url = ddg
await emitter.progress_update(f"Scraping {url}")
return await self._fetch(url, emitter, uv, original_url=url)
# ---------------------------------------------------------
# TEXT PROCESSING HELPERS
# ---------------------------------------------------------
def _trim_text(self, t: str, uv: Any) -> str:
t = re.sub(r"[ \t]+", " ", t)
lines = t.splitlines()
if len(lines) > self.valves.MAX_LINES:
lines = lines[: self.valves.MAX_LINES] + [
"...[truncated: max lines reached]"
]
t = "\n".join(lines).strip()
mc = self._uv(uv, "MAX_CHARS", None) or self.valves.MAX_CHARS
if len(t) > mc:
sn = t[:mc]
cut = sn.rfind("\n")
if cut > mc * 0.6:
sn = sn[:cut]
t = sn.rstrip() + "\n\n...[truncated: max chars reached]"
return t
def _extract_relevant(self, t: str, uv: Any) -> str:
t = re.sub(r"[ \t]+", " ", t)
ps = [p.strip() for p in re.split(r"\n\s*\n", t) if p.strip()]
if len(ps) > 3:
ps = ps[:3] + ["...[additional paragraphs omitted]"]
return self._trim_text("\n\n".join(ps), uv)
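    # Illustrative: with MAX_LINES=120 and MAX_CHARS=3500 (defaults), _trim_text
    # appends "...[truncated: ...]" markers; _extract_relevant keeps at most the
    # first three paragraphs before trimming.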
# ---------------------------------------------------------
# FETCH + FALLBACK + NATIVE API MODE
# ---------------------------------------------------------
async def _fetch(
self,
url: str,
emitter, # intentionally untyped to avoid Pydantic schema issues
uv: Any,
original_url: Optional[str] = None,
original_human_url: Optional[str] = None,
):
headers = {
"User-Agent": "Mozilla/5.0",
"X-No-Cache": "true" if self.valves.DISABLE_CACHING else "false",
"X-With-Generated-Alt": "true",
}
        # Jina reader token: attach it only when the request actually targets a
        # jina.ai endpoint, so the key is never sent to arbitrary third-party sites.
        token = self._uv(uv, "JINA_API_KEY", "") or self.valves.GLOBAL_JINA_API_KEY
        token = (token or "").strip()
        if token and "jina.ai" in extract_domain(original_url or url):
            headers["Authorization"] = f"Bearer {token}"
if original_url is None:
original_url = url
# API override (e.g., NASA; Wikipedia does not need this)
overrides = self._parse_api_overrides()
dom = extract_domain(original_url)
if dom in overrides:
join = "&" if "?" in original_url else "?"
original_url = f"{original_url}{join}{overrides[dom]}"
try:
resp = requests.get(original_url, headers=headers, timeout=15)
resp.raise_for_status()
ct = resp.headers.get("Content-Type", "").lower()
dom = extract_domain(original_url)
# -------------------------------------------------
# SPECIAL HANDLING: Wikipedia JSON summary endpoint
# -------------------------------------------------
if "wikipedia.org" in dom and "/api/rest_v1/page/summary/" in original_url:
data = resp.json()
title = data.get("title") or None
# Human-readable article URL
source = (
(data.get("content_urls") or {}).get("desktop", {}).get("page")
or original_human_url
or original_url
)
summary = data.get("extract") or data.get("description") or ""
if not summary:
summary = str(data)
# Build base text with citation (if enabled)
if self.valves.CITATION:
if title:
text = f"Title: {title}\nSource: {source}\n\n{summary}"
else:
text = f"Source: {source}\n\n{summary}"
else:
text = summary
# Always append "for more" continuation after the summary
text = f"{text}\n\nFor more, click on this URL: {source}"
await emitter.success_update(
f"Wikipedia summary fetched: {title or original_url}"
)
return text
# -------------------------------------------------
# Generic JSON fast-path (APIs)
# -------------------------------------------------
if "json" in ct or dom.startswith("api."):
await emitter.success_update("Fetched API (JSON bypass mode)")
return resp.json()
# -------------------------------------------------
# HTML scrape path
# -------------------------------------------------
raw = resp.text
low = raw.lower()
bad = [
"captcha",
"cloudflare",
"enable javascript",
"please enable javascript",
"access denied",
"forbidden",
"unauthorized",
"checking your browser",
"verification required",
"bot protection",
]
if any(b in low for b in bad):
await emitter.error_update(
"Site not scrape-accessible (JS/Auth required)"
)
return {
"type": "markdown",
"data": (
"Unable to access this site automatically.\n\n"
f"[Open this site in your browser]({original_url})"
),
}
text = raw
if self._uv(uv, "CLEAN_CONTENT", True):
await emitter.progress_update("Cleaning content")
text = clean_urls(text)
if self._uv(uv, "EXTRACT_MODE", False):
await emitter.progress_update("Extract Mode enabled")
text = self._extract_relevant(text, uv)
else:
text = self._trim_text(text, uv)
title = extract_title(text)
await emitter.success_update(f"Scraped {title or url}")
if title:
return f"Title: {title}\nSource: {original_url}\n\n{text}"
return f"Source: {original_url}\n\n{text}"
        except (requests.RequestException, ValueError) as e:
            # ValueError also covers malformed JSON from resp.json() above
            await emitter.error_update(f"Error scraping {original_url}: {e}")
return {
"type": "markdown",
"data": (
"Unable to access this site automatically.\n\n"
f"[Open this site in your browser]({original_url})"
),
}