"""
Enhanced Web Scrape (Refactored + DDG Lite Search + API Key Overrides + Native API Mode)
version: 1.3.9
license: MIT
Features:
- WEATHER RULE enforcement
- DuckDuckGo Lite rewriting (blocks Google SERPs)
- Token-efficient scraping
- JS/Auth/Captcha fallback detection
- CLICKABLE LINK FALLBACK (Markdown output)
- Jina API key support (tool valve)
- Per-domain API key overrides via cogwheel (multiline text)
- **Native API Mode (JSON bypass)** → If endpoint returns JSON, Jina is skipped and raw JSON returned
"""
import re
import requests
from typing import Optional, Dict
from urllib.parse import quote_plus
from pydantic import BaseModel, Field
# ---------------------------------------------------------
# Utility functions
# ---------------------------------------------------------
def extract_title(text: str) -> Optional[str]:
match = re.search(r"^Title:\s*(.*?)\s*$", text, re.MULTILINE)
return match.group(1).strip() if match else None
def clean_urls(text: str) -> str:
    # Collapse markdown links to their label first; stripping bare
    # parenthesized URLs before this would leave orphaned [labels].
    text = re.sub(r"\[([^\]]+)\]\(https?://[^)]+\)", r"\1", text)
    text = re.sub(r"\(https?://[^)]+\)", "", text)
    text = re.sub(r"https?://\S+", "", text)
    return text
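# Illustration (hypothetical input), given the ordering above:
#   clean_urls("see [docs](https://x.io) and https://y.io")
#       -> "see docs and "
# Markdown links collapse to their label; bare URLs are removed outright.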
def extract_domain(url: str) -> str:
m = re.search(r"https?://([^/]+)", url)
return m.group(1).lower() if m else ""
# ---------------------------------------------------------
# WEATHER RULE
# ---------------------------------------------------------
def enforce_weather_rule(url: str) -> Optional[str]:
    # Any wttr.in URL is normalized to the compact one-line format.
    if "wttr.in" in url:
        base = url.split("?")[0]
        loc = base.split("wttr.in", 1)[-1].lstrip("/")
        return f"https://wttr.in/{loc}?format=3"
    # Generic weather queries are routed to wttr.in; the last path segment
    # is treated as the location, defaulting to Perth.
    lowered = url.lower()
    if any(k in lowered for k in ["weather", "forecast", "temperature"]):
        loc = url.split("/")[-1].split("?")[0].split("&")[0].split("#")[0] or "Perth"
        return f"https://wttr.in/{loc}?format=3"
    return None
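# Illustrations of the rule (hypothetical inputs):
#   enforce_weather_rule("https://wttr.in/London?format=4")
#       -> "https://wttr.in/London?format=3"
#   enforce_weather_rule("https://example.com/weather/Sydney")
#       -> "https://wttr.in/Sydney?format=3"
#   enforce_weather_rule("https://example.com/blog") -> None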
# ---------------------------------------------------------
# DDG Lite Rewriter
# ---------------------------------------------------------
def rewrite_to_ddg_lite(url: str) -> Optional[str]:
    lo = url.lower()
    # Google SERPs are rewritten to DuckDuckGo Lite, preserving the
    # (already percent-encoded) query when one is present.
    if "google.com" in lo:
        m = re.search(r"[?&]q=([^&]+)", lo)
        q = m.group(1) if m else quote_plus(lo)
        return f"https://lite.duckduckgo.com/lite/?q={q}"
    # Natural-language search phrases become Lite queries.
    keys = ["search for ", "search ", "find ", "look up ", "look for "]
    if any(k in lo for k in keys):
        q = lo
        for k in keys:
            q = q.replace(k, "")
        return f"https://lite.duckduckgo.com/lite/?q={quote_plus(q.strip())}"
    if lo.startswith("news ") or " news" in lo:
        q = re.sub(r"\s+", " ", re.sub(r"\bnews\b", "", lo)).strip()
        return f"https://lite.duckduckgo.com/lite/?q={quote_plus(q)}"
    # Bare multi-word input that is not a URL is treated as a search.
    if " " in url and not url.startswith("http"):
        return f"https://lite.duckduckgo.com/lite/?q={quote_plus(url.strip())}"
    return None
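# Illustrations (hypothetical inputs):
#   rewrite_to_ddg_lite("https://www.google.com/search?q=rust+lang")
#       -> "https://lite.duckduckgo.com/lite/?q=rust+lang"
#   rewrite_to_ddg_lite("search for open source licenses")
#       -> "https://lite.duckduckgo.com/lite/?q=open+source+licenses"
#   rewrite_to_ddg_lite("https://example.com/page") -> None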
# ---------------------------------------------------------
# Event Emitter
# ---------------------------------------------------------
class EventEmitter:
def __init__(self, emitter=None):
self.e = emitter
async def emit(self, desc="Unknown", status="in_progress", done=False):
if not self.e:
return
payload = {"type": "status", "data": {"status": status, "description": desc, "done": done}}
r = self.e(payload)
if hasattr(r, "__await__"):
await r
async def progress_update(self, d): await self.emit(d)
async def error_update(self, d): await self.emit(d, "error", True)
async def success_update(self, d): await self.emit(d, "success", True)
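# EventEmitter accepts either a plain function or an async callback: emit()
# awaits the result only when it is awaitable. A sketch (hypothetical
# callbacks, illustration only):
#   EventEmitter(print)          # sync callback; return value is not awaited
#   async def cb(payload): ...
#   EventEmitter(cb)             # async callback; the coroutine is awaited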
# ---------------------------------------------------------
# TOOL CONFIG (COGWHEEL VALVES)
# ---------------------------------------------------------
class Tools:
class Valves(BaseModel):
DISABLE_CACHING: bool = False
GLOBAL_JINA_API_KEY: str = ""
CITATION: bool = True
MAX_CHARS: int = 3500
MAX_LINES: int = 120
API_OVERRIDE_TEXT: str = Field(
default="",
description=(
"Per-domain API overrides, one per line:\n"
" api.nasa.gov=api_key=YOUR_KEY\n"
" api.openweathermap.org=appid=YOUR_KEY"
),
)
class UserValves(BaseModel):
CLEAN_CONTENT: bool = True
JINA_API_KEY: str = ""
MAX_CHARS: Optional[int] = None
EXTRACT_MODE: bool = False
def __init__(self):
self.valves = self.Valves()
# ---------------------------------------------------------
# API override parser
# ---------------------------------------------------------
def _parse_api_overrides(self) -> Dict[str, str]:
out = {}
raw = (self.valves.API_OVERRIDE_TEXT or "").strip()
if not raw:
return out
for line in raw.splitlines():
line = line.strip()
if "=" not in line:
continue
d, pair = line.split("=", 1)
out[d.strip().lower()] = pair.strip()
return out
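    # Example (hypothetical valve content): with API_OVERRIDE_TEXT set to
    #   "api.nasa.gov=api_key=DEMO_KEY\napi.openweathermap.org=appid=DEMO_KEY"
    # this returns:
    #   {"api.nasa.gov": "api_key=DEMO_KEY",
    #    "api.openweathermap.org": "appid=DEMO_KEY"}
    # i.e. everything after the first "=" is kept verbatim as a query pair.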
# ---------------------------------------------------------
# MAIN SCRAPE ENTRYPOINT
# ---------------------------------------------------------
async def web_scrape(self, url: str, __event_emitter__=None, __user__=None):
emitter = EventEmitter(__event_emitter__)
# Load user valves safely
if (__user__ is None) or ("valves" not in __user__):
uv = self.UserValves()
else:
raw = __user__["valves"]
if isinstance(raw, dict):
uv = self.UserValves(**raw)
elif isinstance(raw, Tools.UserValves):
uv = raw
else:
uv = self.UserValves()
# WEATHER RULE
w = enforce_weather_rule(url)
if w:
await emitter.progress_update(f"WEATHER RULE applied → {w}")
return await self._fetch(w, emitter, uv, original_url=w)
# DDG LITE
ddg = rewrite_to_ddg_lite(url)
if ddg:
await emitter.progress_update(f"DuckDuckGo Lite: {ddg}")
url = ddg
await emitter.progress_update(f"Scraping {url}")
return await self._fetch(url, emitter, uv, original_url=url)
# ---------------------------------------------------------
# TEXT PROCESSING HELPERS
# ---------------------------------------------------------
def _trim_text(self, t: str, uv):
t = re.sub(r"[ \t]+", " ", t)
lines = t.splitlines()
if len(lines) > self.valves.MAX_LINES:
lines = lines[: self.valves.MAX_LINES] + ["...[truncated: max lines reached]"]
t = "\n".join(lines).strip()
mc = uv.MAX_CHARS or self.valves.MAX_CHARS
        if len(t) > mc:
            sn = t[:mc]
            # Cut at the last line break if it falls in the final 40% of the
            # window, so truncation lands on a line boundary where possible.
            cut = sn.rfind("\n")
            if cut > mc * 0.6:
                sn = sn[:cut]
            t = sn.rstrip() + "\n\n...[truncated: max chars reached]"
return t
def _extract_relevant(self, t: str, uv):
t = re.sub(r"[ \t]+", " ", t)
ps = [p.strip() for p in re.split(r"\n\s*\n", t) if p.strip()]
if len(ps) > 3:
ps = ps[:3] + ["...[additional paragraphs omitted]"]
return self._trim_text("\n\n".join(ps), uv)
# ---------------------------------------------------------
# FETCH + FALLBACK + NATIVE API MODE
# ---------------------------------------------------------
    async def _fetch(self, url: str, emitter, uv, original_url=None):
        if original_url is None:
            original_url = url
        # Per-domain API override injection (appends e.g. "api_key=...")
        overrides = self._parse_api_overrides()
        dom = extract_domain(original_url)
        if dom in overrides:
            join = "&" if "?" in original_url else "?"
            original_url = f"{original_url}{join}{overrides[dom]}"
        # Jina Reader headers: sent only to r.jina.ai, never to the target
        # site, so the API key cannot leak to third parties.
        jina_headers = {
            "User-Agent": "Mozilla/5.0",
            "X-No-Cache": "true" if self.valves.DISABLE_CACHING else "false",
            "X-With-Generated-Alt": "true",
        }
        token = (uv.JINA_API_KEY or "").strip() or self.valves.GLOBAL_JINA_API_KEY.strip()
        if token:
            jina_headers["Authorization"] = f"Bearer {token}"
        try:
            # -------------------------
            # NATIVE API MODE (JSON)
            # -------------------------
            # Probe the target directly; JSON endpoints bypass Jina entirely.
            resp = requests.get(original_url, headers={"User-Agent": "Mozilla/5.0"}, timeout=15)
            resp.raise_for_status()
            ct = resp.headers.get("Content-Type", "").lower()
            if "json" in ct or dom.startswith("api."):
                try:
                    data = resp.json()
                    await emitter.success_update("Fetched API (JSON bypass mode)")
                    return data
                except ValueError:
                    pass  # not actually JSON; fall through to scraping
            # -------------------------
            # HTML scraping path (via Jina Reader)
            # -------------------------
            resp = requests.get(f"https://r.jina.ai/{original_url}", headers=jina_headers, timeout=15)
            resp.raise_for_status()
            raw = resp.text
low = raw.lower()
            bad = [
                "captcha", "cloudflare", "enable javascript", "please enable javascript",
                "access denied", "forbidden", "unauthorized", "checking your browser",
                "verification required", "bot protection",
            ]
            if any(b in low for b in bad):
                await emitter.error_update("Site not scrape-accessible (JS/Auth required)")
                return {
                    "type": "markdown",
                    "data": f"Unable to access this site automatically.\n\n[Open this site in your browser]({original_url})",
                }
text = raw
if uv.CLEAN_CONTENT:
await emitter.progress_update("Cleaning content")
text = clean_urls(text)
if uv.EXTRACT_MODE:
await emitter.progress_update("Extract Mode enabled")
text = self._extract_relevant(text, uv)
else:
text = self._trim_text(text, uv)
title = extract_title(text)
await emitter.success_update(f"Scraped {title or url}")
if title:
return f"Title: {title}\nSource: {original_url}\n\n{text}"
return f"Source: {original_url}\n\n{text}"
        except requests.RequestException as e:
            await emitter.error_update(f"Error scraping {original_url}: {e}")
            return {
                "type": "markdown",
                "data": f"Unable to access this site automatically.\n\n[Open this site in your browser]({original_url})",
            }