NOTICE
Open WebUI Community is currently undergoing a major revamp to improve user experience and performance. Expected completion by year-end! ✨

Function
action
v0.1.0
LibreTranslate Action
Translate the response to a new language using LibreTranslate API
Function ID
libretranslate_action
Creator
@jthesse
Downloads
248+

Function Content
python
"""
title: LibreTranslate Action
description: Translate the response to a new language using LibreTranslate API
author: Jan-Timo Hesse
author_url: https://github.com/JTHesse/
funding_url: https://github.com/JTHesse/
version: 0.1.0
license: MIT
icon_url: 
required_open_webui_version: 0.3.9
"""

from pydantic import BaseModel, Field
from typing import Optional

from utils.misc import get_last_assistant_message

import requests
import asyncio


class Action:
    """Action that appends a LibreTranslate translation of the last assistant message.

    The translation is delivered through the event emitter as an extra
    ``message`` event, preceded by a horizontal rule, so it shows up below
    the original answer.
    """

    class Valves(BaseModel):
        # Instance-wide settings.
        LIBRE_TRANSLATE_URL: str = Field(
            default="http://localhost:5000",
            description="The URL of the LibreTranslate instance.",
        )
        REQUEST_TIMEOUT: float = Field(
            default=60.0,
            description="Seconds to wait for the LibreTranslate server before giving up.",
        )

    class UserValves(BaseModel):
        # Per-user language preferences.
        SOURCE_LANGUAGE: str = Field(
            default="auto",
            description="User-specific source language for assistant messages",
        )
        TARGET_LANGUAGE: str = Field(
            default="de",
            description="User-specific target language for assistant messages",
        )

    def __init__(self):
        self.valves = self.Valves()

    @staticmethod
    async def _emit_status(emitter, description: str, *, done: bool, error: bool = False) -> None:
        """Send one ``status`` event; ``error=True`` marks it as an error status."""
        data = {"description": description, "done": done}
        if error:
            data["status"] = "error"
        await emitter({"type": "status", "data": data})

    async def action(
        self,
        body: dict,
        __user__=None,
        __event_emitter__=None,
        __event_call__=None,
    ) -> Optional[dict]:
        """Translate the last assistant message and emit the result.

        Args:
            body: Request body; its ``"messages"`` entry is passed to
                ``get_last_assistant_message``.
            __user__: Mapping that may carry a ``"valves"`` entry exposing
                ``SOURCE_LANGUAGE`` and ``TARGET_LANGUAGE``. When it is
                missing, the ``UserValves`` defaults are used.
            __event_emitter__: Async callable receiving status and message
                events. If it is not provided the action does nothing,
                because there is no channel to deliver the translation.
            __event_call__: Unused.

        Returns:
            Always ``None``; all output goes through ``__event_emitter__``.
        """
        print(f"action:{__name__}")

        # Every outcome (progress, errors, the translation itself) is reported
        # through the emitter, so without one there is nothing useful to do.
        if not __event_emitter__:
            return None

        # Normalise the URL so a configured trailing slash does not yield "//translate".
        base_url = (self.valves.LIBRE_TRANSLATE_URL or "").strip().rstrip("/")
        if not base_url:
            await self._emit_status(
                __event_emitter__,
                "Please set your LibreTranslate URL",
                done=True,
                error=True,
            )
            return None

        # Fall back to the declared defaults when no per-user valves were supplied.
        user_valves = (__user__ or {}).get("valves") or self.UserValves()
        source = user_valves.SOURCE_LANGUAGE
        target = user_valves.TARGET_LANGUAGE

        await self._emit_status(
            __event_emitter__,
            f"Translating text from {source} to {target}",
            done=False,
        )

        # NOTE(review): get_last_assistant_message is defined outside this file;
        # presumably it returns the text of the most recent assistant message
        # and something falsy when there is none - confirm against utils.misc.
        assistant_message = get_last_assistant_message(body.get("messages") or [])
        if not assistant_message:
            await self._emit_status(
                __event_emitter__,
                "Error: no assistant message to translate",
                done=True,
                error=True,
            )
            return None

        payload = {
            "q": assistant_message,
            "source": source,
            "target": target,
        }

        try:
            # requests is synchronous: run it in a worker thread so the event
            # loop stays responsive, and bound the wait with a timeout.
            response = await asyncio.to_thread(
                requests.post,
                f"{base_url}/translate",
                json=payload,
                timeout=self.valves.REQUEST_TIMEOUT,
            )
        except requests.RequestException as e:
            await self._emit_status(__event_emitter__, f"Error: {e}", done=True, error=True)
            return None

        if response.status_code != 200:
            await self._emit_status(
                __event_emitter__,
                f"Error: {response.status_code}; {response.text}",
                done=True,
                error=True,
            )
            return None

        try:
            translated = response.json()["translatedText"]
        except (ValueError, KeyError, TypeError):
            # Body was not JSON, or did not have the expected "translatedText" field.
            await self._emit_status(
                __event_emitter__,
                "Error: unexpected response from LibreTranslate",
                done=True,
                error=True,
            )
            return None

        # Blank line plus a horizontal rule separates the translation from the
        # original answer; the trailing spaces are kept from the original literal.
        separator = "\n\n---                    \n"
        await __event_emitter__(
            {"type": "message", "data": {"content": separator + translated}}
        )
        await self._emit_status(__event_emitter__, "Answer Translated", done=True)
        return None