NOTICE
Open WebUI Community is currently undergoing a major revamp to improve user experience and performance ✨

Function
pipe
v0.1.6
Cloudfare WorkerAI
A Python wrapper for Cloudflare WorkersAI that enables seamless integration with their REST API, allowing direct access to AI models through a simplified manifold pipeline pattern.
Function ID
cloudfare_workerai
Creator
@christiant
Downloads
26+

Function Content
python
"""
title: CloudFlare Manifold Pipe
authors: christian-taillon
author_url: https://christiant.io
funding_url: https://github.com/open-webui
version: 0.1.6
required_open_webui_version: 0.3.17
license: MIT
"""

import os
import requests
import json
import time
from typing import List, Union, Generator, Iterator
from pydantic import BaseModel, Field
from open_webui.utils.misc import pop_system_message


class Pipe:
    class Valves(BaseModel):
        CF_ACCOUNT_ID: str = Field(default="")
        CF_API_KEY: str = Field(default="")
        CF_AI_Gateway: str = Field(default="")

    def __init__(self):
        self.type = "manifold"
        self.id = "cloudflare"
        self.name = "cloudflare/"
        self.valves = self.Valves(
            **{
                "CF_ACCOUNT_ID": os.getenv("CF_ACCOUNT_ID", ""),
                "CF_API_KEY": os.getenv("CF_API_KEY", ""),
                "CF_AI_Gateway": os.getenv("CF_AI_Gateway", ""),
            }
        )

    def get_cloudflare_models(self):
        return [
            {"id": "@cf/meta/llama-3.1-8b-instruct", "name": "llama3.1:8b"},
            {"id": "@cf/meta/llama-2-7b-chat-fp16", "name": "llama2:7b"},
            {"id": "@cf/meta/llama-3.3-70b-instruct-fp8-fast", "name": "llama3.3:70b"},
            {"id": "@cf/meta/llama-3.2-11b-vision-instruct", "name": "llama3.2:11b"},
            {
                "id": "@hf/thebloke/deepseek-coder-6.7b-instruct-awq",
                "name": "deepseek:6.7b",
            },
        ]

    def pipes(self) -> List[dict]:
        return self.get_cloudflare_models()

    def pipe(self, body: dict) -> Union[str, Generator, Iterator]:
        system_message, messages = pop_system_message(body["messages"])
        model_id = (
            body["model"].split("cloudfare_workerai.")[-1]
            if "cloudfare_workerai." in body["model"]
            else body["model"]
        )
        url = f"https://gateway.ai.cloudflare.com/v1/{self.valves.CF_ACCOUNT_ID}/{self.valves.CF_AI_Gateway}/workers-ai/{model_id}"
        headers = {
            "Authorization": f"Bearer {self.valves.CF_API_KEY}",
            "Content-Type": "application/json",
        }
        payload = {
            "prompt": messages[-1]["content"],
            "stream": body.get("stream", False),
            **({"system": str(system_message)} if system_message else {}),
        }

        try:
            if body.get("stream", False):
                return self.stream_response(url, headers, payload)
            else:
                return self.non_stream_response(url, headers, payload)
        except requests.exceptions.RequestException as e:
            print(f"Request failed: {e}")
            return f"Error: Request failed: {e}"
        except Exception as e:
            print(f"Error in pipe method: {e}")
            return f"Error: {e}"

    def stream_response(self, url, headers, payload):
        try:
            with requests.post(
                url, headers=headers, json=payload, stream=True
            ) as response:
                if response.status_code != 200:
                    raise Exception(
                        f"HTTP Error {response.status_code}: {response.text}"
                    )

                for chunk in response.iter_lines():
                    if chunk:
                        line = chunk.decode("utf-8")
                        if line.startswith("data: "):
                            if line.strip() == "data: [DONE]":
                                break
                            try:
                                json_str = line[6:]
                                data = json.loads(json_str)
                                if "response" in data:
                                    yield data["response"]
                            except json.JSONDecodeError:
                                continue
        except Exception as e:
            yield f"Error: {str(e)}"