"""
title: MultiLanguage LibreTranslate Action Button
description: Translate the response to multiple languages at once using the LibreTranslate API
author: @iamg30
author_url: https://openwebui.com/u/iamg30/
forked from https://openwebui.com/f/jthesse/libretranslate_action
funding_url: https://github.com/open-webui
version: 0.1.1
license: MIT
icon_url: data:image/svg+xml;base64,PHN2ZyB4bWxucz0iaHR0cDovL3d3dy53My5vcmcvMjAwMC9zdmciIHdpZHRoPSIxZW0iIGhlaWdodD0iMWVtIiB2aWV3Qm94PSIwIDAgMjQgMjQiPjxwYXRoIGZpbGw9ImN1cnJlbnRDb2xvciIgZD0ibTEyLjg3IDE1LjA3bC0yLjU0LTIuNTFsLjAzLS4wM0ExNy41IDE3LjUgMCAwIDAgMTQuMDcgNkgxN1Y0aC03VjJIOHYySDF2MmgxMS4xN0MxMS41IDcuOTIgMTAuNDQgOS43NSA5IDExLjM1QzguMDcgMTAuMzIgNy4zIDkuMTkgNi42OSA4aC0yYy43MyAxLjYzIDEuNzMgMy4xNyAyLjk4IDQuNTZsLTUuMDkgNS4wMkw0IDE5bDUtNWwzLjExIDMuMTF6TTE4LjUgMTBoLTJMMTIgMjJoMmwxLjEyLTNoNC43NUwyMSAyMmgyem0tMi42MiA3bDEuNjItNC4zM0wxOS4xMiAxN3oiLz48L3N2Zz4=
required_open_webui_version: 0.3.32
"""
from pydantic import BaseModel, Field
from typing import Optional
from open_webui.utils.misc import get_last_assistant_message
import requests
import asyncio
class Action:
    """Open WebUI action that translates the last assistant message.

    For every language listed in the user's ``TARGET_LANGUAGES`` the last
    assistant message is posted to the ``/translate`` endpoint of the
    configured LibreTranslate instance, and the translation is appended to
    the chat as a ``message`` event. Progress and failures are reported
    through ``status`` events; a failure for one language does not prevent
    the remaining languages from being translated.
    """

    # Upper bound (seconds) for a single LibreTranslate request, so that an
    # unreachable server cannot stall the action indefinitely.
    _REQUEST_TIMEOUT_SECONDS = 30

    class Valves(BaseModel):
        # Instance-wide setting: base URL of the LibreTranslate server.
        LIBRE_TRANSLATE_URL: str = Field(
            default="http://localhost:5000",
            description="The URL of the LibreTranslate instance.",
        )

    class UserValves(BaseModel):
        # Per-user setting: language code of the text to translate.
        SOURCE_LANGUAGE: str = Field(
            default="auto",
            description="User-specific source language for assistant messages",
        )
        # Per-user setting: language codes to translate into, in order.
        TARGET_LANGUAGES: list[str] = Field(
            default=["de", "fr", "es"],
            description="User-specific target languages for assistant messages",
        )

    def __init__(self):
        """Create the action with default valve values."""
        self.valves = self.Valves()

    @staticmethod
    async def _emit(emitter, event: dict) -> None:
        """Send *event* through *emitter*; do nothing when no emitter was given."""
        if emitter:
            await emitter(event)

    async def _emit_status(
        self,
        emitter,
        description: str,
        *,
        done: bool,
        error: bool = False,
    ) -> None:
        """Emit a ``status`` event, adding ``"status": "error"`` for failures."""
        data = {"description": description, "done": done}
        if error:
            data = {"status": "error", **data}
        await self._emit(emitter, {"type": "status", "data": data})

    def _translate(self, text: str, source: str, target: str) -> str:
        """Request one translation from LibreTranslate (blocking).

        Returns:
            The ``translatedText`` field of the JSON response.

        Raises:
            requests.RequestException: The HTTP request itself failed.
            RuntimeError: The server answered with a non-200 status code.
            ValueError: The response body was not JSON or had no
                ``translatedText`` field.
        """
        # Tolerate a configured base URL that ends with a slash.
        endpoint = f"{self.valves.LIBRE_TRANSLATE_URL.rstrip('/')}/translate"
        response = requests.post(
            endpoint,
            json={"q": text, "source": source, "target": target},
            timeout=self._REQUEST_TIMEOUT_SECONDS,
        )
        if response.status_code != 200:
            raise RuntimeError(f"{response.status_code}; {response.text}")
        try:
            return response.json()["translatedText"]
        except (ValueError, KeyError, TypeError) as exc:
            raise ValueError("response did not contain 'translatedText'") from exc

    async def action(
        self,
        body: dict,
        __user__=None,
        __event_emitter__=None,
        __event_call__=None,
    ) -> Optional[dict]:
        """Translate the last assistant message into each target language.

        Args:
            body: Request body; its ``"messages"`` list is searched for the
                last assistant message.
            __user__: Optional user info dict. Its ``"valves"`` entry supplies
                ``SOURCE_LANGUAGE`` and ``TARGET_LANGUAGES``; the
                ``UserValves`` defaults are used when it is missing.
            __event_emitter__: Optional async callable that receives the
                ``status`` and ``message`` events. Events are dropped when it
                is ``None``.
            __event_call__: Unused.

        Returns:
            Always ``None``; results are delivered through the event emitter.
        """
        print(f"action:{__name__}")

        if not self.valves.LIBRE_TRANSLATE_URL:
            await self._emit_status(
                __event_emitter__,
                "Please set your LibreTranslate URL",
                done=True,
                error=True,
            )
            return None

        # Fall back to the default user valves when no user info is supplied.
        user_valves = (__user__ or {}).get("valves") or self.UserValves()
        source = user_valves.SOURCE_LANGUAGE
        target_languages = user_valves.TARGET_LANGUAGES

        # The text to translate is the same for every target language, so it
        # is looked up once instead of once per iteration.
        assistant_message = get_last_assistant_message(body.get("messages", []))
        print(f"Assistant message: {assistant_message}")
        if not assistant_message:
            await self._emit_status(
                __event_emitter__,
                "No assistant message to translate",
                done=True,
                error=True,
            )
            return None

        for target in target_languages:
            await self._emit_status(
                __event_emitter__,
                f"Translating text from {source} to {target}",
                done=False,
            )
            try:
                # requests is synchronous; run it in a worker thread so the
                # event loop stays responsive while waiting for the server.
                translated = await asyncio.to_thread(
                    self._translate, assistant_message, source, target
                )
            except (requests.RequestException, RuntimeError, ValueError) as exc:
                # Report the failure and carry on with the next language.
                await self._emit_status(
                    __event_emitter__,
                    f"Error translating to {target}: {exc}",
                    done=True,
                    error=True,
                )
                continue

            content = f"\n--- {target} -------------------------\n{translated}"
            await self._emit(
                __event_emitter__,
                {"type": "message", "data": {"content": content}},
            )
            await self._emit_status(
                __event_emitter__,
                f"Translation to {target} done",
                done=True,
            )
        return None