"""
title: Cloudflare Workers AI Whisper
description: Transcribe audio/video via Cloudflare Workers AI Whisper. Finds files even without explicit attachment and can return WebVTT with timestamps.
author: you
version: 0.3.0
license: MIT
"""
from pydantic import BaseModel, Field
from typing import Any, Dict, Optional, List, Tuple
import os, json, time, re, hashlib, mimetypes, shutil, subprocess, tempfile
# Prefer requests; fallback to urllib
try:
import requests
_USE_REQUESTS = True
except Exception:
import urllib.request
_USE_REQUESTS = False
# Base URL of the Cloudflare REST API; the account/model path is appended per request.
CLOUDFLARE_API_BASE = "https://api.cloudflare.com/client/v4"
# Idempotency (avoid duplicate CF calls for same bytes) + write guard for VTT
# _IDEMP_CACHE maps a stable audio key -> (timestamp, parsed Cloudflare result).
_IDEMP_CACHE: Dict[str, Tuple[float, Dict[str, Any]]] = {}
# _WRITE_GUARD maps a stable audio key -> time of the last .vtt write for that audio.
_WRITE_GUARD: Dict[str, float] = {}
# Minimum number of seconds between two .vtt writes for the same audio key.
WRITE_GUARD_SECONDS = 90.0
class Pipe:
class Valves(BaseModel):
# User-configurable settings for the pipe, read elsewhere as self.valves.<NAME>.
# Each field carries its own default and a description string.
# --- Cloudflare credentials and model selection ---
CLOUDFLARE_ACCOUNT_ID: str = Field(
default="", description="Cloudflare Account ID (Workers AI)."
)
CLOUDFLARE_API_TOKEN: str = Field(
default="", description="Cloudflare API Token (AI:Run/Read)."
)
MODEL: str = Field(
default="@cf/openai/whisper",
description="e.g. @cf/openai/whisper or @cf/openai/whisper-large-v3-turbo",
)
# --- Logging and return shape ---
DEBUG: bool = Field(
default=False, description="Verbose logs to backend stdout."
)
SIMPLE_RETURN: bool = Field(
default=True, description="Return a bare string (best for 0.6.29 UI)."
)
# Output selection
RETURN_FORMAT: str = Field(
default="both", description="'text' | 'vtt' | 'both' | 'auto'"
)
WRITE_VTT_FILE: bool = Field(
default=True,
description="Write a .vtt file to disk (guarded to once per audio).",
)
VTT_OUT_DIR: str = Field(
default="/app/backend/data/whisper_out",
description="Directory to write .vtt files when enabled.",
)
# Size + adaptation
# pipe() clamps MAX_BYTES to at least 512_000 before using it.
MAX_BYTES: int = Field(
default=1_200_000, description="Target max size per request (bytes)."
)
STRATEGY: str = Field(
default="auto", description="'auto' | 'downsample' | 'chunk' | 'none'"
)
# pipe() clamps CHUNK_SECONDS to at least 10 before chunking.
CHUNK_SECONDS: int = Field(
default=60, description="Chunk length when chunking (seconds)."
)
TARGET_KBPS: int = Field(
default=24, description="Re-encode target audio bitrate (kbps)."
)
TARGET_HZ: int = Field(
default=16000, description="Re-encode target sample rate (Hz)."
)
# Where to look for uploads (inside the container)
UPLOADS_BASE_PATHS: List[str] = Field(
default=[
"/app/backend/data/uploads",
"/app/backend/data/files",
"/app/backend/data",
],
description="Primary search roots for uploaded files.",
)
# Set to 0 to ignore the time window; the newest audio file on disk is taken.
RECENT_WINDOW_S: int = Field(
default=0,
description="Look-back window for FS scanning (0 = ignore and pick newest).",
)
# The filesystem scan clamps FS_SCAN_LIMIT to at least 100.
FS_SCAN_LIMIT: int = Field(
default=10000, description="Max files to scan recursively."
)
# VTT cue building (when CF doesn't return vtt)
VTT_MAX_CUE_SECS: float = Field(
default=4.0, description="Max seconds per cue when synthesizing VTT."
)
VTT_MAX_WORDS_PER_CUE: int = Field(
default=16, description="Max words per cue when synthesizing VTT."
)
VTT_GAP_BREAK_SECS: float = Field(
default=0.8,
description="Start a new cue if gap between words exceeds this.",
)
def __init__(self):
self.type = "pipe"
self.id = "cf.whisper.transcribe"
self.name = "Cloudflare Whisper Transcriber"
self.valves = self.Valves()
def pipes(self):
    """Return the one-element list describing the model entry this pipe exposes."""
    entry = {}
    entry["id"] = self.id
    entry["name"] = ""  # model label
    entry["provider"] = "Cloudflare AI"  # provider label (if supported)
    return [entry]
# ---------------- utils ----------------
def _log(self, *args):
if self.valves.DEBUG:
print("[CF-Whisper]", *args)
def _now(self) -> float:
return time.time()
def _have_ffmpeg(self) -> bool:
return shutil.which("ffmpeg") is not None
def _guess_mimetype_from_name(self, name: Optional[str]) -> str:
if not name:
return "application/octet-stream"
mt, _ = mimetypes.guess_type(name)
return mt or "application/octet-stream"
def _sniff_mimetype_from_bytes(self, b: bytes, fallback: str) -> str:
try:
if not b or len(b) < 12:
return fallback
head = b[:12]
if head[:3] == b"ID3" or head[:2] in (
b"\xff\xfb",
b"\xff\xf3",
b"\xff\xf2",
):
return "audio/mpeg"
if head[:4] == b"RIFF" and b"WAVE" in b[:16]:
return "audio/wav"
if head[:4] == b"fLaC":
return "audio/flac"
if head[:4] == b"OggS":
return "audio/ogg"
if b"ftyp" in head:
return "audio/mp4"
except Exception:
pass
return fallback
def _is_audioish(self, f: Dict[str, Any]) -> bool:
name = (f.get("name") or f.get("filename") or "").lower()
ctype = (f.get("content_type") or f.get("mimetype") or "").lower()
if "audio" in ctype or "video" in ctype:
return True
audio_exts = (
".mp3",
".wav",
".m4a",
".flac",
".ogg",
".opus",
".mp4",
".aac",
".wma",
".amr",
".webm",
".mov",
".mkv",
)
return any(name.endswith(ext) for ext in audio_exts)
# ---------------- file IO ----------------
def _candidate_roots(self) -> List[str]:
roots = set(self.valves.UPLOADS_BASE_PATHS or [])
roots.update(
{
"/app/backend/data/uploads",
"/app/backend/data",
"/data/uploads",
"/data",
"/app/data/uploads",
"/app/data",
"/var/lib/open-webui/data",
"/var/lib/open-webui",
"/srv/openwebui/data",
}
)
env_root = os.environ.get("OPENWEBUI_DATA_DIR")
if env_root:
roots.add(env_root)
roots.add(os.path.join(env_root, "uploads"))
r = [p for p in sorted(roots) if p and os.path.exists(p)]
self._log("Scanning roots:", r)
return r
def _read_by_path(self, path: str) -> Optional[bytes]:
if not path:
return None
try:
if os.path.isabs(path) and os.path.exists(path):
with open(path, "rb") as fh:
return fh.read()
rel = path.lstrip("/\\")
for root in self._candidate_roots():
p = os.path.join(root, rel)
if os.path.exists(p):
with open(p, "rb") as fh:
return fh.read()
except Exception as e:
self._log("read_by_path failed:", path, e)
return None
def _read_by_name(self, name: str) -> Optional[bytes]:
if not name:
return None
for root in self._candidate_roots():
p = os.path.join(root, name)
if os.path.exists(p):
try:
with open(p, "rb") as fh:
return fh.read()
except Exception as e:
self._log("read_by_name failed:", p, e)
return None
def _row_get(self, row: Any, key: str) -> Any:
if isinstance(row, dict):
return row.get(key)
return getattr(row, key, None)
def _read_info_by_file_id(
self, file_id: str
) -> Optional[Tuple[bytes, Optional[str], Optional[str]]]:
if not file_id:
return None
try:
from open_webui.models.files import Files # type: ignore
meta = Files.get_file_by_id(file_id)
if meta is None:
return None
ctype = self._row_get(meta, "content_type")
name = self._row_get(meta, "name")
path = (
self._row_get(meta, "path")
or self._row_get(meta, "filepath")
or self._row_get(meta, "file_path")
)
if path and os.path.exists(path):
with open(path, "rb") as fh:
return fh.read(), ctype, name
blob = self._row_get(meta, "data")
if isinstance(blob, (bytes, bytearray)) and blob:
return bytes(blob), ctype, name
if isinstance(blob, str) and blob and blob not in ("{}", "null", "None"):
import base64
try:
return base64.b64decode(blob), ctype, name
except Exception:
pass
for root in self._candidate_roots():
for dirpath, _dirs, files in os.walk(root):
for fn in files:
full = os.path.join(dirpath, fn)
if file_id in full:
try:
with open(full, "rb") as fh:
return fh.read(), ctype, name or fn
except Exception:
continue
except Exception as e:
self._log("Files.get_file_by_id failed:", e)
return None
def _read_by_inline_b64(self, b64: str) -> Optional[bytes]:
if not b64:
return None
try:
import base64
return base64.b64decode(b64)
except Exception as e:
self._log("base64 decode failed:", e)
return None
# ---------------- request-body gathering ----------------
def _gather_files_anywhere(self, body: Dict[str, Any]) -> List[Dict[str, Any]]:
cands: List[Dict[str, Any]] = []
# Root-level common keys
for key in ("__files__", "attachments", "files", "file"):
v = body.get(key)
if isinstance(v, list):
cands.extend([x for x in v if isinstance(x, dict)])
elif isinstance(v, dict):
cands.append(v)
# Inside messages (0.6.29 often puts them here)
for m in reversed(body.get("messages") or []):
for key in ("attachments", "files", "__files__"):
v = m.get(key)
if isinstance(v, list):
cands.extend([x for x in v if isinstance(x, dict)])
elif isinstance(v, dict):
cands.append(v)
# Some builds tuck a single file under message["file"]
v = m.get("file")
if isinstance(v, dict):
cands.append(v)
aud = [f for f in cands if self._is_audioish(f)]
return aud or cands
def _extract_first_file_dict(
self, body: Dict[str, Any]
) -> Optional[Dict[str, Any]]:
cands = self._gather_files_anywhere(body)
return cands[0] if cands else None
def _extract_file_id_from_messages(self, body: Dict[str, Any]) -> Optional[str]:
for m in reversed(body.get("messages") or []):
if m.get("role") and m.get("role") != "user":
continue
if isinstance(m.get("file_id"), str):
return m["file_id"]
content = (m.get("content") or "").strip()
m1 = re.search(r"file_id\s*[:=]\s*([0-9a-fA-F\-]{36})", content)
if m1:
return m1.group(1)
m2 = re.search(
r"\b[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}\b",
content,
)
if m2:
return m2.group(0)
return None
# ---------------- DB + FS fallbacks ----------------
def _rows_from_files_model(self, user_id: Optional[str]) -> Optional[List[Any]]:
try:
from open_webui.models.files import Files # type: ignore
except Exception as e:
self._log("Files model import failed:", e)
return None
rows = None
for meth in ("get_all_files_by_user", "get_files_by_user", "get_all"):
fn = getattr(Files, meth, None)
if callable(fn):
try:
rows = fn(user_id) if meth.endswith("_by_user") else fn()
if rows:
break
except TypeError:
try:
rows = fn()
if rows:
break
except Exception as e:
self._log(f"Files.{meth} failed:", e)
except Exception as e:
self._log(f"Files.{meth} failed:", e)
return rows
def _read_recent_from_files_model(
self, user_id: Optional[str]
) -> Optional[Tuple[bytes, str, Dict[str, Any]]]:
rows = self._rows_from_files_model(user_id)
if not rows:
return None
def as_epoch(x):
for fld in ("created_at", "updated_at", "timestamp", "ts"):
v = self._row_get(x, fld)
if v is None:
continue
try:
if isinstance(v, (int, float)):
return float(v)
if isinstance(v, str):
from datetime import datetime
for fmt in (
"%Y-%m-%dT%H:%M:%S.%f",
"%Y-%m-%dT%H:%M:%S",
"%Y-%m-%d %H:%M:%S",
):
try:
return datetime.strptime(
v.split("Z")[0], fmt
).timestamp()
except Exception:
pass
except Exception:
pass
return 0.0
def is_audio_meta(m):
name = (self._row_get(m, "name") or "").lower()
ctype = (self._row_get(m, "content_type") or "").lower()
if "audio" in ctype or "video" in ctype:
return True
return any(
name.endswith(ext)
for ext in (
".mp3",
".wav",
".m4a",
".flac",
".ogg",
".opus",
".mp4",
".aac",
".wma",
".amr",
".webm",
".mov",
".mkv",
)
)
cands = [m for m in rows if is_audio_meta(m)]
cands.sort(key=lambda m: as_epoch(m), reverse=True)
for meta in cands:
name = self._row_get(meta, "name")
p = (
self._row_get(meta, "path")
or self._row_get(meta, "filepath")
or self._row_get(meta, "file_path")
)
if p and os.path.exists(p):
try:
with open(p, "rb") as fh:
b = fh.read()
mt = self._row_get(
meta, "content_type"
) or self._guess_mimetype_from_name(name)
mt = self._sniff_mimetype_from_bytes(b, mt)
return (
b,
mt,
{
"source": "files_model",
"file_id": self._row_get(meta, "id")
or self._row_get(meta, "file_id"),
"name": name,
"path": p,
},
)
except Exception as e:
self._log("open path from Files failed:", e)
continue
blob = self._row_get(meta, "data")
try:
if isinstance(blob, (bytes, bytearray)) and blob:
b = bytes(blob)
elif (
isinstance(blob, str)
and blob
and blob not in ("{}", "null", "None")
):
import base64
b = base64.b64decode(blob)
else:
b = None
except Exception:
b = None
if b:
mt = self._row_get(
meta, "content_type"
) or self._guess_mimetype_from_name(name)
mt = self._sniff_mimetype_from_bytes(b, mt)
return (
b,
mt,
{
"source": "files_model_blob",
"file_id": self._row_get(meta, "id")
or self._row_get(meta, "file_id"),
"name": name,
},
)
return None
def _read_recent_from_filesystem(
self,
) -> Optional[Tuple[bytes, str, Dict[str, Any]]]:
now = time.time()
window = int(self.valves.RECENT_WINDOW_S)
limit = max(100, int(self.valves.FS_SCAN_LIMIT))
exts = (
".mp3",
".wav",
".m4a",
".flac",
".ogg",
".opus",
".mp4",
".aac",
".wma",
".amr",
".webm",
".mov",
".mkv",
)
newest_overall = (None, 0.0)
newest_in_window = (None, 0.0)
scanned = 0
roots = self._candidate_roots()
if not roots:
self._log("No readable roots found on disk.")
return None
for root in roots:
for dirpath, _dirs, files in os.walk(root):
for fn in files:
if not fn.lower().endswith(exts):
continue
p = os.path.join(dirpath, fn)
try:
st = os.stat(p)
except Exception:
continue
scanned += 1
if scanned > limit:
break
mtime = st.st_mtime
if mtime > newest_overall[1]:
newest_overall = (p, mtime)
if (
window > 0
and (now - mtime) <= window
and mtime > newest_in_window[1]
):
newest_in_window = (p, mtime)
if scanned > limit:
break
chosen_path = None
if window <= 0:
chosen_path = newest_overall[0]
self._log("FS pick (no window):", chosen_path)
else:
chosen_path = newest_in_window[0] or newest_overall[0]
self._log("FS pick (window/overall):", chosen_path)
if chosen_path and os.path.exists(chosen_path):
try:
with open(chosen_path, "rb") as fh:
b = fh.read()
mt = self._guess_mimetype_from_name(os.path.basename(chosen_path))
mt = self._sniff_mimetype_from_bytes(b, mt)
return (
b,
mt,
{
"source": "filesystem",
"path": chosen_path,
"name": os.path.basename(chosen_path),
},
)
except Exception as e:
self._log("filesystem open failed:", e)
return None
# ---------------- main reader ----------------
def _read_uploaded(
self, body: Dict[str, Any], __user__: Dict[str, Any]
) -> Optional[Tuple[bytes, str, Dict[str, Any]]]:
debug: Dict[str, Any] = {}
# A) Direct attachment in the request
f = self._extract_first_file_dict(body)
if f:
debug["source"] = "attached"
debug["file_dict_keys"] = list(f.keys())
ctype = (f.get("content_type") or f.get("mimetype") or "").strip() or None
name = f.get("name") or f.get("filename")
if name:
debug["attached_name"] = name
if f.get("path"):
b = self._read_by_path(f["path"])
if b:
mt = ctype or self._guess_mimetype_from_name(name)
mt = self._sniff_mimetype_from_bytes(b, mt)
return b, mt, debug
if name:
b = self._read_by_name(name)
if b:
mt = ctype or self._guess_mimetype_from_name(name)
mt = self._sniff_mimetype_from_bytes(b, mt)
return b, mt, debug
if f.get("file_id"):
info = self._read_info_by_file_id(f["file_id"])
if info:
b, ctype2, name2 = info
debug["name"] = name2 or debug.get("attached_name")
mt = ctype or ctype2 or self._guess_mimetype_from_name(name2)
mt = self._sniff_mimetype_from_bytes(b, mt)
return b, mt, {**debug, "file_id": f["file_id"]}
if isinstance(f.get("data"), str) and f["data"]:
b = self._read_by_inline_b64(f["data"])
if b:
mt = ctype or self._guess_mimetype_from_name(name)
mt = self._sniff_mimetype_from_bytes(b, mt)
return b, mt, debug
# B) UUID mentioned in message content
fid = self._extract_file_id_from_messages(body)
if fid:
info = self._read_info_by_file_id(fid)
if info:
self._log("Read via file_id from message:", fid)
b, ctype3, name3 = info
mt = ctype3 or self._guess_mimetype_from_name(name3)
mt = self._sniff_mimetype_from_bytes(b, mt)
return b, mt, {"file_id_from_message": fid, "name": name3}
# C) Most recent file for this user from Files model
user_id = (__user__ or {}).get("id")
recent = self._read_recent_from_files_model(user_id)
if recent:
self._log("DB pick:", recent[2])
return recent
# D) Filesystem newest audio
fs = self._read_recent_from_filesystem()
if fs:
self._log("FS pick:", fs[2])
return fs
return None
# ---------------- audio transforms ----------------
def _ffmpeg_write_bytes(self, b: bytes, suffix: str) -> Tuple[str, str]:
tmpdir = tempfile.mkdtemp(prefix="cfwhisp_")
inp = os.path.join(tmpdir, f"in.{suffix.lstrip('.')}")
with open(inp, "wb") as f:
f.write(b)
return tmpdir, inp
def _downsample(
self, b: bytes, target_kbps: int, target_hz: int
) -> Optional[Tuple[bytes, str]]:
if not self._have_ffmpeg():
return None
tmpdir, inp = self._ffmpeg_write_bytes(b, "bin")
outp = os.path.join(tmpdir, "out.mp3")
cmd = [
"ffmpeg",
"-y",
"-hide_banner",
"-loglevel",
"error",
"-i",
inp,
"-ac",
"1",
"-ar",
str(target_hz),
"-b:a",
f"{target_kbps}k",
outp,
]
try:
subprocess.run(cmd, check=True)
with open(outp, "rb") as f:
return f.read(), "audio/mpeg"
except Exception as e:
self._log("ffmpeg downsample failed:", e)
return None
finally:
try:
shutil.rmtree(tmpdir)
except Exception:
pass
def _chunk(
self, b: bytes, chunk_sec: int, target_kbps: int, target_hz: int
) -> Optional[List[Tuple[bytes, str]]]:
if not self._have_ffmpeg():
return None
tmpdir, inp = self._ffmpeg_write_bytes(b, "bin")
pat = os.path.join(tmpdir, "seg_%03d.mp3")
cmd = [
"ffmpeg",
"-y",
"-hide_banner",
"-loglevel",
"error",
"-i",
inp,
"-ac",
"1",
"-ar",
str(target_hz),
"-b:a",
f"{target_kbps}k",
"-f",
"segment",
"-segment_time",
str(chunk_sec),
"-reset_timestamps",
"1",
pat,
]
try:
subprocess.run(cmd, check=True)
parts: List[Tuple[bytes, str]] = []
for name in sorted(os.listdir(tmpdir)):
if not name.startswith("seg_"):
continue
with open(os.path.join(tmpdir, name), "rb") as f:
parts.append((f.read(), "audio/mpeg"))
return parts or None
except Exception as e:
self._log("ffmpeg chunk failed:", e)
return None
finally:
try:
shutil.rmtree(tmpdir)
except Exception:
pass
# ---------------- VTT helpers ----------------
def _fmt_ts(self, seconds: float) -> str:
if seconds < 0:
seconds = 0.0
ms = int(round(seconds * 1000.0))
h = ms // 3_600_000
ms %= 3_600_000
m = ms // 60_000
ms %= 60_000
s = ms // 1000
ms %= 1000
return f"{h:02d}:{m:02d}:{s:02d}.{ms:03d}"
def _words_to_vtt(self, words: List[Dict[str, Any]]) -> str:
if not words:
return "WEBVTT\n\n"
max_secs = float(self.valves.VTT_MAX_CUE_SECS)
max_words = int(self.valves.VTT_MAX_WORDS_PER_CUE)
gap_break = float(self.valves.VTT_GAP_BREAK_SECS)
vtt_lines = ["WEBVTT", ""]
cue_words: List[Dict[str, Any]] = []
cue_start = None
last_end = None
def flush():
nonlocal cue_words, cue_start, last_end
if not cue_words:
return
start = (
cue_start if cue_start is not None else cue_words[0].get("start", 0.0)
)
end = last_end if last_end is not None else cue_words[-1].get("end", start)
text = " ".join(w.get("word", "").strip() for w in cue_words).strip()
vtt_lines.append(
f"{self._fmt_ts(float(start))} --> {self._fmt_ts(float(end))}"
)
vtt_lines.append(text)
vtt_lines.append("")
cue_words = []
cue_start = None
last_end = None
for w in words:
wstart = float(w.get("start", 0.0))
wend = float(w.get("end", wstart))
if cue_start is None:
cue_start = wstart
last_end = wend
cue_words = [w]
continue
if wstart - (last_end or wstart) > gap_break:
flush()
cue_start = wstart
last_end = wend
cue_words = [w]
continue
cue_words.append(w)
last_end = wend
if (wend - (cue_start or wstart)) >= max_secs or len(
cue_words
) >= max_words:
flush()
flush()
return "\n".join(vtt_lines)
def _maybe_write_vtt(
self, audio_bytes: bytes, vtt: str, suggested_name: Optional[str] = None
) -> Optional[str]:
if not (self.valves.WRITE_VTT_FILE and vtt):
return None
key = self._stable_key_for_audio(audio_bytes)
last = _WRITE_GUARD.get(key, 0.0)
now = self._now()
if (now - last) < WRITE_GUARD_SECONDS:
self._log("VTT write suppressed by guard (recent write).")
return None
_WRITE_GUARD[key] = now
os.makedirs(self.valves.VTT_OUT_DIR, exist_ok=True)
base = "transcript"
if suggested_name:
import re as _re
base = (
_re.sub(
r"[^A-Za-z0-9\-]+",
"-",
os.path.splitext(os.path.basename(suggested_name))[0],
)
or base
)
fname = f"{base}-{int(now)}.vtt"
fpath = os.path.join(self.valves.VTT_OUT_DIR, fname)
with open(fpath, "w", encoding="utf-8") as fh:
fh.write(vtt)
self._log("VTT written:", fpath)
return fpath
# ---------------- Cloudflare call ----------------
def _parse_or_raise(self, raw_text: str, status: int = 200) -> Dict[str, Any]:
try:
data = json.loads(raw_text)
except Exception:
raise RuntimeError(f"Cloudflare returned non-JSON:\n{raw_text[:2000]}")
if isinstance(data, dict) and data.get("success") is False:
errs = data.get("errors") or []
for e in errs:
if isinstance(e, dict) and str(e.get("code")) == "6001":
raise RuntimeError(
f"Transient network error (6001): {raw_text[:2000]}"
)
result = data.get("result", data)
if not isinstance(result, dict):
raise RuntimeError(f"Unexpected response shape:\n{data}")
result["_cf_raw"] = raw_text[:2000]
result["_cf_status"] = status
return result
def _cf_whisper(self, audio_bytes: bytes, mimetype: str) -> Dict[str, Any]:
acc = self.valves.CLOUDFLARE_ACCOUNT_ID.strip()
tok = self.valves.CLOUDFLARE_API_TOKEN.strip()
if not acc or not tok:
raise RuntimeError(
"Missing Cloudflare credentials (Account ID / API Token)."
)
url = f"{CLOUDFLARE_API_BASE}/accounts/{acc}/ai/run/{self.valves.MODEL}"
transports = ["octet", "multipart", "octet"]
for attempt, mode in enumerate(transports, start=1):
try:
if _USE_REQUESTS:
if mode == "octet":
headers = {
"Authorization": f"Bearer {tok}",
"Content-Type": mimetype or "application/octet-stream",
"Accept": "application/json",
}
self._log(
f"POST attempt#{attempt} ({mode})",
url,
"bytes:",
len(audio_bytes),
"ctype:",
headers["Content-Type"],
)
r = requests.post(
url, headers=headers, data=audio_bytes, timeout=300
)
if r.status_code >= 500 or r.status_code in (
520,
521,
522,
523,
524,
525,
526,
):
raise RuntimeError(
f"Cloudflare AI HTTP {r.status_code}: {r.text[:2000]}"
)
if r.status_code in (400, 415):
raise RuntimeError(
f"Bad request/ctype {r.status_code}: {r.text[:2000]}"
)
if r.status_code == 413:
raise RuntimeError(
f"Cloudflare AI HTTP 413: {r.text[:2000]}"
)
if r.status_code >= 400:
raise RuntimeError(
f"Cloudflare AI HTTP {r.status_code}: {r.text[:2000]}"
)
return self._parse_or_raise(r.text, r.status_code)
else:
headers = {
"Authorization": f"Bearer {tok}",
"Accept": "application/json",
}
files = {
"file": (
"audio",
audio_bytes,
mimetype or "application/octet-stream",
)
}
self._log(
f"POST attempt#{attempt} ({mode})",
url,
"bytes:",
len(audio_bytes),
"ctype:",
files["file"][2],
)
r = requests.post(
url, headers=headers, files=files, timeout=300
)
if r.status_code >= 500 or r.status_code in (
520,
521,
522,
523,
524,
525,
526,
):
raise RuntimeError(
f"Cloudflare AI HTTP {r.status_code}: {r.text[:2000]}"
)
if r.status_code == 413:
raise RuntimeError(
f"Cloudflare (multipart) HTTP 413: {r.text[:2000]}"
)
if r.status_code >= 400:
raise RuntimeError(
f"Cloudflare (multipart) HTTP {r.status_code}: {r.text[:2000]}"
)
return self._parse_or_raise(r.text, r.status_code)
else:
headers = {
"Authorization": f"Bearer {tok}",
"Content-Type": mimetype or "application/octet-stream",
"Accept": "application/json",
}
self._log(
f"POST attempt#{attempt} (octet/urllib)",
url,
"bytes:",
len(audio_bytes),
"ctype:",
headers["Content-Type"],
)
req = urllib.request.Request(
url, data=audio_bytes, headers=headers, method="POST"
)
with urllib.request.urlopen(req, timeout=300) as resp:
raw_text = resp.read().decode("utf-8")
status = getattr(resp, "status", 200)
return self._parse_or_raise(raw_text, status)
except Exception as e:
msg = str(e)
retryable = (
("HTTP 5" in msg)
or ("HTTP 52" in msg)
or ("Transient network error" in msg)
or ("Network connection lost" in msg)
)
if attempt < len(transports) and retryable:
sleep_s = 0.6 * (2 ** (attempt - 1))
self._log(f"Retrying after {sleep_s:.1f}s due to: {msg[:120]}")
time.sleep(sleep_s)
continue
raise
raise RuntimeError("Unexpected: all attempts exhausted without return.")
# ---------------- idempotency ----------------
def _stable_key_for_audio(self, audio_bytes: bytes) -> str:
h = hashlib.sha256(audio_bytes).hexdigest()[:32]
return f"{h}:{self.valves.MODEL}"
# ---------------- main ----------------
def pipe(self, body: dict, __user__: dict, __request__=None) -> Any:
    """Transcribe the audio attached to (or discoverable for) this request.

    Flow: choose the return mode, locate the audio, then either send it
    as-is (small enough, or STRATEGY "none") or shrink it with ffmpeg by
    downsampling and/or chunking before sending.

    Args:
        body: Chat request body; ``messages`` and file attachments are read.
        __user__: Current user dict (its ``id`` is used for file lookup).
        __request__: Unused.

    Returns:
        With SIMPLE_RETURN, a string (transcript, VTT, both, or an error
        message). Otherwise a dict with ``ok`` and either ``text``/``vtt``/
        ``meta`` or ``error``.

    Notes:
        * Chunk timestamps are offset by ``chunk index * chunk length``.
          Accumulating the last word's end time would lose silence at chunk
          boundaries and silent chunks entirely.
        * In "auto" mode the last message's content may be a string or a
          list of parts.
        * Expired idempotency-cache entries are evicted on every call.
        * Failures during ffmpeg preparation are reported like any other
          transcription failure.
    """
    t0 = time.time()
    simple = self.valves.SIMPLE_RETURN
    cache_ttl_s = 10

    def last_message_text() -> str:
        """Text of the final message, flattening list-style content."""
        msgs = body.get("messages") or []
        if not msgs or not isinstance(msgs[-1], dict):
            return ""
        content = msgs[-1].get("content")
        if isinstance(content, str):
            return content
        if isinstance(content, list):
            pieces = []
            for part in content:
                if isinstance(part, str):
                    pieces.append(part)
                elif isinstance(part, dict) and isinstance(part.get("text"), str):
                    pieces.append(part["text"])
            return " ".join(pieces)
        return ""

    # auto-return selection based on user prompt keywords
    ret_mode = (self.valves.RETURN_FORMAT or "text").lower()
    if ret_mode == "auto":
        last = last_message_text().lower()
        keywords = ("vtt", "subtitle", "subtitles", "webvtt", "timestamps", "srt")
        ret_mode = "vtt" if any(k in last for k in keywords) else "text"

    got = self._read_uploaded(body, __user__)
    if not got:
        return (
            (
                "I didn't detect an audio/video file in your message. "
                "Please attach a file (mp3/wav/m4a/ogg/mp4/etc.) and try again."
            )
            if simple
            else {"ok": False, "error": "No readable audio found in the message."}
        )
    audio, mimetype, debug = got
    max_bytes = max(512_000, int(self.valves.MAX_BYTES))
    strategy = (self.valves.STRATEGY or "auto").lower()
    suggested_name = debug.get("attached_name") or debug.get("name")

    def call_cloudflare(bts: bytes, mt: str) -> Dict[str, Any]:
        """Call Cloudflare, reusing a result for identical bytes seen recently."""
        key = self._stable_key_for_audio(bts)
        now = self._now()
        stale = [k for k, (ts, _res) in _IDEMP_CACHE.items() if now - ts >= cache_ttl_s]
        for k in stale:
            _IDEMP_CACHE.pop(k, None)
        hit = _IDEMP_CACHE.get(key)
        if hit:
            return hit[1]
        result = self._cf_whisper(bts, mt)
        _IDEMP_CACHE[key] = (now, result)
        return result

    def shifted(words: List[Dict[str, Any]], offset: float) -> List[Dict[str, Any]]:
        """Copy word timings, moved forward by ``offset`` seconds."""
        return [
            {
                "word": w.get("word", ""),
                "start": float(w.get("start", 0.0)) + offset,
                "end": float(w.get("end", 0.0)) + offset,
            }
            for w in words
        ]

    def success(text: str, vtt: str, cf_status: Any, cf_raw: Any) -> Any:
        """Write the optional VTT file and shape the successful response."""
        vtt_path = self._maybe_write_vtt(audio, vtt, suggested_name=suggested_name)
        if simple:
            if ret_mode == "vtt":
                return vtt or "WEBVTT\n\n"
            if ret_mode == "both":
                block = "```vtt\n" + (vtt or "WEBVTT\n\n") + "\n```"
                if vtt_path:
                    block += f"\n\n(saved on server: {vtt_path})"
                return (text or "(empty)") + "\n\n" + block
            return text or "(empty transcription)"
        return {
            "ok": True,
            "text": text,
            "vtt": vtt,
            "meta": {
                "model": self.valves.MODEL,
                "duration_s": round(self._now() - t0, 2),
                "cf_status": cf_status,
                "cf_raw": cf_raw,
                "debug": {"mimetype": mimetype, **debug, "vtt_path": vtt_path},
            },
        }

    def failure(exc: Exception) -> Any:
        """Shape the response for a failed transcription."""
        if simple:
            return f"Transcription failed:\n\n{str(exc)[:2000]}"
        return {
            "ok": False,
            "error": f"Transcription request failed: {str(exc)}",
            "meta": {
                "model": self.valves.MODEL,
                "duration_s": round(self._now() - t0, 2),
                "cf_status": "error",
                "cf_raw": str(exc)[:2000],
                "debug": {"mimetype": mimetype, **debug},
            },
        }

    try:
        # Fast path: small enough already, or adaptation disabled.
        if len(audio) <= max_bytes or strategy == "none":
            res = call_cloudflare(audio, mimetype or "application/octet-stream")
            text = (res.get("text") or "").strip()
            vtt = res.get("vtt") or ""
            words = res.get("words") or []
            if not vtt and words:
                vtt = self._words_to_vtt(words)
            return success(text, vtt, res.get("_cf_status", 200), res.get("_cf_raw"))

        # Adaptive (downsample -> chunk)
        if not self._have_ffmpeg():
            msg = "File is large and ffmpeg isn't available to shrink/chunk it. Please install ffmpeg or try a smaller file."
            return msg if simple else {"ok": False, "error": msg}

        shrunk: Optional[Tuple[bytes, str]] = None
        parts: Optional[List[Tuple[bytes, str]]] = None
        chunk_sec = 0

        if strategy in ("auto", "downsample"):
            self._log(
                "Attempting downsample...",
                f"{self.valves.TARGET_KBPS}kbps @ {self.valves.TARGET_HZ}Hz",
            )
            shrunk = self._downsample(
                audio, self.valves.TARGET_KBPS, self.valves.TARGET_HZ
            )
            if shrunk and len(shrunk[0]) > max_bytes:
                self._log("Downsample still > MAX_BYTES; will chunk.")
                shrunk = None

        if not shrunk and strategy in ("auto", "chunk"):
            chunk_sec = max(10, int(self.valves.CHUNK_SECONDS))
            for _ in range(4):
                self._log(f"Chunking at {chunk_sec}s ...")
                parts = self._chunk(
                    audio, chunk_sec, self.valves.TARGET_KBPS, self.valves.TARGET_HZ
                )
                if not parts:
                    break
                if all(len(p[0]) <= max_bytes for p in parts):
                    break
                parts = None
                if chunk_sec <= 10:
                    # Already at the minimum length; another pass would
                    # repeat identical work.
                    break
                chunk_sec = max(10, chunk_sec // 2)

        if not shrunk and not parts:
            # Last resort: a harder downsample at half the target bitrate.
            shrunk = self._downsample(
                audio, max(16, self.valves.TARGET_KBPS // 2), self.valves.TARGET_HZ
            )
            if not shrunk or len(shrunk[0]) > max_bytes:
                raise RuntimeError(
                    "Unable to adapt audio under MAX_BYTES. Increase chunking or install ffmpeg."
                )

        texts: List[str] = []
        all_words: List[Dict[str, Any]] = []
        cf_status, cf_raw = None, None
        single_vtt = ""

        if shrunk:
            b2, mt2 = shrunk
            res = call_cloudflare(b2, mt2)
            texts.append((res.get("text") or "").strip())
            all_words.extend(shifted(res.get("words") or [], 0.0))
            # A single file keeps its original timeline, so a VTT returned by
            # Cloudflare can be used directly when no word timings are given.
            single_vtt = res.get("vtt") or ""
            cf_status, cf_raw = res.get("_cf_status", 200), res.get("_cf_raw")
        else:
            for idx, (pb, pmt) in enumerate(parts):
                self._log(f"Sending chunk {idx+1}/{len(parts)} ({len(pb)} bytes)")
                res = call_cloudflare(pb, pmt)
                texts.append((res.get("text") or "").strip())
                # Segment idx starts at about idx * chunk_sec in the source.
                all_words.extend(
                    shifted(res.get("words") or [], float(idx * chunk_sec))
                )
                cf_status, cf_raw = res.get("_cf_status", 200), res.get("_cf_raw")

        final_text = "\n".join(t for t in texts if t).strip()
        vtt_combined = self._words_to_vtt(all_words) if all_words else single_vtt
        return success(final_text, vtt_combined, cf_status, cf_raw)
    except Exception as e:
        return failure(e)