"""
title: LibreTranslate Action
description: Translate the response to a new language using LibreTranslate API
author: Jan-Timo Hesse
author_url: https://github.com/JTHesse/
funding_url: https://github.com/JTHesse/
version: 0.1.0
license: MIT
icon_url: data:image/svg+xml;base64,PHN2ZyB4bWxucz0iaHR0cDovL3d3dy53My5vcmcvMjAwMC9zdmciIHdpZHRoPSIxZW0iIGhlaWdodD0iMWVtIiB2aWV3Qm94PSIwIDAgMjQgMjQiPjxwYXRoIGZpbGw9ImN1cnJlbnRDb2xvciIgZD0ibTEyLjg3IDE1LjA3bC0yLjU0LTIuNTFsLjAzLS4wM0ExNy41IDE3LjUgMCAwIDAgMTQuMDcgNkgxN1Y0aC03VjJIOHYySDF2MmgxMS4xN0MxMS41IDcuOTIgMTAuNDQgOS43NSA5IDExLjM1QzguMDcgMTAuMzIgNy4zIDkuMTkgNi42OSA4aC0yYy43MyAxLjYzIDEuNzMgMy4xNyAyLjk4IDQuNTZsLTUuMDkgNS4wMkw0IDE5bDUtNWwzLjExIDMuMTF6TTE4LjUgMTBoLTJMMTIgMjJoMmwxLjEyLTNoNC43NUwyMSAyMmgyem0tMi42MiA3bDEuNjItNC4zM0wxOS4xMiAxN3oiLz48L3N2Zz4=
required_open_webui_version: 0.3.9
"""
from pydantic import BaseModel, Field
from typing import Optional
from utils.misc import get_last_assistant_message
import requests
import asyncio
class Action:
class Valves(BaseModel):
LIBRE_TRANSLATE_URL: str = Field(
default="http://localhost:5000",
description="The URL of the LibreTranslate instance.",
)
class UserValves(BaseModel):
SOURCE_LANGUAGE: str = Field(
default="auto", description="User-specific source language for assistant messages"
)
TARGET_LANGUAGE: str = Field(
default="de", description="User-specific target language for assistant messages"
)
def __init__(self):
self.valves = self.Valves()
pass
async def action(
self,
body: dict,
__user__=None,
__event_emitter__=None,
__event_call__=None,
) -> Optional[dict]:
print(f"action:{__name__}")
source = __user__["valves"].SOURCE_LANGUAGE
target = __user__["valves"].TARGET_LANGUAGE
if not self.valves.LIBRE_TRANSLATE_URL:
await __event_emitter__(
{
"type": "status",
"data": {"status": "error", "description": "Please set your LibreTranslate URL", "done": True},
}
)
return
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {"description": f"Translating text from {source} to {target}", "done": False},
}
)
await asyncio.sleep(1)
messages = body["messages"]
assistant_message = get_last_assistant_message(messages)
print(f"Assistant message: {assistant_message}")
payload = {
"q": assistant_message,
"source": source,
"target": target,
}
try:
response = requests.post(
f"{self.valves.LIBRE_TRANSLATE_URL}/translate", json=payload
)
except Exception as e:
await __event_emitter__(
{
"type": "status",
"data": {"status": "error", "description": f"Error: {e}", "done": True},
}
)
return
if response.status_code != 200:
await __event_emitter__(
{
"type": "status",
"data": {
"status": "error",
"description": f"Error: {response.status_code}; {response.text}",
"done": True,
},
}
)
return
data = response.json()
content = ("""
---
"""
+ data["translatedText"]
)
await __event_emitter__({"type": "message", "data": {"content": content}})
await __event_emitter__(
{
"type": "status",
"data": {"description": "Answer Translated", "done": True},
}
)