NOTICE
Open WebUI Community is currently undergoing a major revamp to improve user experience and performance. Expected completion by year-end! ✨

Function
pipe
v6.6.6r9
DOOM Pipeline (Incompatible with 0.3.33)
Play DOOM in your browser, just don't let the boss catch you!
Function ID
doom
Creator
@justinrahb
Downloads
1.1K+

Function Content
python
"""
title: DOOM
author: justinh-rahb
author_url: https://github.com/justinh-rahb/webui-doom
funding_url: https://github.com/justinh-rahb/webui-doom
version: 6.6.6r9
license: MIT
"""

from pydantic import BaseModel, Field
from typing import Union, Generator, Iterator, Dict
from utils.misc import get_last_user_message
from apps.webui.models.files import Files
from apps.webui.models.users import Users

from main import generate_chat_completions

import requests
import time
import uuid
import os
import json

from config import UPLOAD_DIR


class Pipe:
    """Pipe function that installs a browser build of DOOM and embeds it in chat.

    Any user message starting with ``/`` triggers the game flow: the WAD,
    config, WASM, JavaScript and HTML assets are downloaded, stored through
    ``Files`` for the requesting user, and a ``{{HTML_FILE_ID_<id>}}``
    placeholder for the stored ``index.html`` is streamed back. Every other
    message is forwarded to ``Valves.FALLBACK_MODEL_ID`` through
    ``generate_chat_completions``.
    """

    # Timeout (seconds) handed to ``requests.get`` so that a stalled asset
    # download cannot block the request forever.
    REQUEST_TIMEOUT_SECONDS = 30

    class Valves(BaseModel):
        MODEL_ID: str = Field(
            default="DOOM:latest", description="Identifier for the DOOM model."
        )
        MODEL_NAME: str = Field(default="DOOM", description="Name for the DOOM model.")
        FALLBACK_MODEL_ID: str = Field(
            default="llama3:latest", description="Title-generation model."
        )
        GITHUB_REPO_URL: str = Field(
            default="https://raw.githubusercontent.com/justinh-rahb/webui-doom/main/src/",
            description="Base URL to load assets from.",
        )
        WAD_FILE_URL: str = Field(
            default="https://raw.githubusercontent.com/justinh-rahb/webui-doom/main/src/doom1.wad",
            description="Bring your own .WAD file!",
        )

    class _FileForm(BaseModel):
        # Record handed to ``Files.insert_new_file``; declared once here
        # instead of being rebuilt on every ``create_file`` call.
        id: str
        filename: str
        meta: dict = {}

    def __init__(self):
        self.type = "manifold"
        self.valves = self.Valves()
        self.token = None
        # Last user id seen by ``pipe``. Kept only as a fallback for callers
        # that do not pass ``user_id`` explicitly; the instance may be shared
        # between requests, so the explicit parameter is preferred.
        self.user_id = None

    def pipes(self):
        """Return the single model entry exposed by this pipe."""
        return [{"name": self.valves.MODEL_NAME, "id": self.valves.MODEL_ID}]

    def create_file(
        self,
        file_name: str,
        title: str,
        content: Union[str, bytes],
        content_type: str,
        user_id: Union[str, None] = None,
    ):
        """Write ``content`` under ``UPLOAD_DIR`` and register it with ``Files``.

        Args:
            file_name: Name stored on the record; also the suffix of the
                on-disk name (``<uuid>_<file_name>``).
            title: Title stored in the record's metadata.
            content: Text or bytes to write. Text is encoded as UTF-8.
            content_type: MIME type stored in the record's metadata.
            user_id: Owner of the record. Defaults to ``self.user_id``.

        Returns:
            The id of the newly inserted file record.

        Raises:
            RuntimeError: If ``Files.insert_new_file`` returns nothing.
        """
        owner_id = self.user_id if user_id is None else user_id
        file_id = str(uuid.uuid4())
        file_path = os.path.join(str(UPLOAD_DIR), f"{file_id}_{file_name}")

        # Always write bytes: text is encoded explicitly so the stored file
        # does not depend on the server's locale encoding.
        data = content.encode("utf-8") if isinstance(content, str) else content
        with open(file_path, "wb") as f:
            f.write(data)

        meta = {
            "source": file_path,
            "title": title,
            "content_type": content_type,
            "size": os.path.getsize(file_path),
            "path": file_path,
        }

        form_data = self._FileForm(id=file_id, filename=file_name, meta=meta)
        file = Files.insert_new_file(owner_id, form_data)
        if file is None:
            # Fail with a clear message rather than an AttributeError on
            # ``None.id`` below.
            raise RuntimeError(f"Could not register file {file_name}")
        return file.id

    def get_file_url(self, file_id: str) -> str:
        """Return the API path that serves the content of ``file_id``."""
        return f"/api/v1/files/{file_id}/content"

    @staticmethod
    def _html_placeholder(file_id: str) -> str:
        """Return the ``{{HTML_FILE_ID_<id>}}`` marker for a stored HTML file."""
        return "{{HTML_FILE_ID_{FILE_ID}}}".replace("{FILE_ID}", file_id)

    def _fetch(self, url: str):
        """GET ``url`` with a timeout and raise on an HTTP error status."""
        response = requests.get(url, timeout=self.REQUEST_TIMEOUT_SECONDS)
        response.raise_for_status()
        return response

    def _download_text(self, file_name: str, url: str) -> str:
        """Download ``url`` as text, reporting failures like the binary path."""
        try:
            return self._fetch(url).text
        except Exception as e:
            raise Exception(f"Error handling {file_name}: {str(e)}") from e

    def _game_messages(self, user_id) -> Iterator[str]:
        """Yield the chat messages for the game flow, doing the work lazily.

        Each download runs only when the consumer asks for the message that
        follows it, so the progress lines reflect real progress.
        """
        yield "![DOOM]()\n\n"

        # Reuse a previously installed game if this user already has one.
        existing_html = next(
            (
                file
                for file in Files.get_files()
                if file.user_id == user_id and "index.html" in file.filename
            ),
            None,
        )
        if existing_html is not None:
            yield "Looks like you already have the game files... 🤘\n\n"
            yield self._html_placeholder(existing_html.id)
            return

        wad_url = self.valves.WAD_FILE_URL
        cfg_url = f"{self.valves.GITHUB_REPO_URL}default.cfg"
        html_url = f"{self.valves.GITHUB_REPO_URL}index.html"
        js_url = f"{self.valves.GITHUB_REPO_URL}websockets-doom.js"
        wasm_url = f"{self.valves.GITHUB_REPO_URL}websockets-doom.wasm"

        # Step 1: Download and upload the WAD file as doom1.wad
        yield "```console\nDownloading WAD.......... "
        wad_id = self.download_and_create_file(
            "doom1.wad", wad_url, "application/x-doom", user_id=user_id
        )
        yield f"ID: {wad_id} DONE\n"
        print(f"doom1.wad ID: {wad_id}")

        # Step 2: Download and upload the default.cfg file
        yield "Downloading CFG.......... "
        cfg_id = self.download_and_create_file(
            "default.cfg", cfg_url, "application/octet-stream", user_id=user_id
        )
        yield f"ID: {cfg_id} DONE\n"
        print(f"default.cfg ID: {cfg_id}")

        # Step 3: Download and upload the WASM file
        yield "Downloading WASM......... "
        wasm_id = self.download_and_create_file(
            "websockets-doom.wasm", wasm_url, "application/wasm", user_id=user_id
        )
        yield f"ID: {wasm_id} DONE\n"
        print(f"websockets-doom.wasm ID: {wasm_id}")

        # Step 4: Download the JavaScript loader and point its asset
        # references at the stored files before uploading it.
        yield "Downloading JS........... "
        js_content = self._download_text("websockets-doom.js", js_url)
        js_content = js_content.replace(
            "websockets-doom.wasm", self.get_file_url(wasm_id)
        )
        js_content = js_content.replace(
            "doom1.wad", self.get_file_url(wad_id)
        ).replace("default.cfg", self.get_file_url(cfg_id))
        js_id = self.create_file(
            "websockets-doom.js",
            "websockets-doom.js",
            js_content,
            "application/javascript",
            user_id=user_id,
        )
        yield f"ID: {js_id} DONE\n"
        print(f"websockets-doom.js ID: {js_id}")

        # Step 5: Download the HTML page, rewrite its preload calls and its
        # script reference to the stored files, then upload it.
        yield "Downloading HTML......... "
        html_content = self._download_text("index.html", html_url)
        html_content = html_content.replace(
            'Module.FS.createPreloadedFile("", "doom1.wad", "doom1.wad", true, true);',
            f'Module.FS.createPreloadedFile("", "doom1.wad", "{self.get_file_url(wad_id)}/doom1.wad", true, true);',
        )
        html_content = html_content.replace(
            'Module.FS.createPreloadedFile("", "default.cfg", "default.cfg", true, true);',
            f'Module.FS.createPreloadedFile("", "default.cfg", "{self.get_file_url(cfg_id)}/default.cfg", true, true);',
        )
        html_content = html_content.replace(
            "websockets-doom.js", self.get_file_url(js_id)
        )
        html_id = self.create_file(
            "index.html", "index.html", html_content, "text/html", user_id=user_id
        )
        yield f"ID: {html_id} DONE\n```\n\n"
        print(f"index.html ID: {html_id}")

        # Step 6: Provide the final HTML ID to display in iframe
        yield "Time to play... 😈\n\n"
        yield self._html_placeholder(html_id)

    def responses(
        self, command: str, messages: list, user_id: Union[str, None] = None
    ) -> Union[str, Generator, Iterator]:
        """Stream the game-setup messages for a slash command.

        Args:
            command: The command word without its leading ``/``. It is only
                used to log the remainder of the message; every command
                starts the same game flow.
            messages: The chat messages of the request.
            user_id: Owner of the game files. Defaults to ``self.user_id``;
                passing it explicitly keeps this lazily executed generator
                independent of later ``pipe`` calls on the same instance.

        Yields:
            Markdown fragments, one every 0.5 seconds, ending with the
            ``{{HTML_FILE_ID_<id>}}`` placeholder.
        """
        print(f"responses:{__name__}")
        if user_id is None:
            user_id = self.user_id

        # Text after "/<command> "; the ``or ""`` guards against a request
        # without any user message.
        command_body = (get_last_user_message(messages) or "")[len(command) + 2 :]
        print("Command Body:", command_body)

        for response in self._game_messages(user_id):
            time.sleep(0.5)
            yield response

        return "Done"

    def download_and_create_file(
        self,
        file_name: str,
        url: str,
        content_type: str,
        user_id: Union[str, None] = None,
    ):
        """Download ``url`` and store its bytes as ``file_name``.

        Args:
            file_name: Name (and title) of the stored file.
            url: Location to download from.
            content_type: MIME type recorded for the stored file.
            user_id: Owner of the record. Defaults to ``self.user_id``.

        Returns:
            The id of the newly inserted file record.

        Raises:
            Exception: If the download or the file creation fails; the
                original error is chained as the cause.
        """
        try:
            content = self._fetch(url).content

            # Create the file with the correct content type
            return self.create_file(
                file_name, file_name, content, content_type, user_id=user_id
            )
        except Exception as e:
            raise Exception(f"Error handling {file_name}: {str(e)}") from e

    async def pipe(self, body: dict, __user__: dict) -> Union[str, Generator, Iterator]:
        """Handle one chat request.

        Args:
            body: Request body; ``messages`` is required, ``stream`` optional.
            __user__: Requesting user; its ``id`` owns any created files.

        Returns:
            The ``responses`` generator when the last user message starts
            with ``/``, otherwise the result of ``generate_chat_completions``
            for ``Valves.FALLBACK_MODEL_ID``.
        """
        print(f"pipe:{__name__}")
        user_id = __user__["id"]
        # Still recorded for code that reads the attribute, but the id is
        # passed explicitly below so concurrent requests cannot mix users.
        self.user_id = user_id

        messages = body["messages"]
        user_message = get_last_user_message(messages)

        # ``user_message`` may be missing when the request has no user turn;
        # such requests fall through to the fallback model.
        if user_message and user_message.startswith("/"):
            command = user_message.split(" ")[0][1:]
            print(f"Command: {command}")
            return self.responses(command, messages, user_id=user_id)

        print("No command found - calling API")

        # Forward the conversation unchanged to the fallback model.
        payload = {
            "model": self.valves.FALLBACK_MODEL_ID,
            "messages": body["messages"],
            "stream": body.get("stream", False),
        }

        print(f"call_api:{__name__}")
        print(f"call_api:{payload}")

        user = Users.get_user_by_id(user_id)
        return await generate_chat_completions(form_data=payload, user=user)