"""
title: OPENAI ASSISTANT IN OPENWEBUI
author:
[email protected]
version: 1.0.1
description: A tool to use the OPENAI ASSISTANTS
requirements: requests
"""
import asyncio
import time
from datetime import datetime

import requests
from pydantic import BaseModel
# Base URL shared by every Assistants API endpoint used below.
_API_BASE = "https://api.openai.com/v1"
# Seconds allowed for each individual HTTP call (requests applies no timeout by default).
_HTTP_TIMEOUT_SECONDS = 30
# Pause between two consecutive status polls of a run.
_POLL_INTERVAL_SECONDS = 1
# Upper bound on the total time spent waiting for a run to finish.
_MAX_WAIT_SECONDS = 600
# Terminal run states that can never turn into "completed".
_FAILED_RUN_STATUSES = ("failed", "cancelled", "expired", "incomplete")


def _request_json(method, url, headers, payload=None):
    """Perform one blocking HTTP call and return ``(status_code, body)``.

    ``body`` is the decoded JSON document, or the raw response text when the
    response body is not valid JSON, so the caller can still report the HTTP
    status instead of failing on the decode.
    """
    response = requests.request(
        method,
        url,
        headers=headers,
        json=payload,
        timeout=_HTTP_TIMEOUT_SECONDS,
    )
    try:
        body = response.json()
    except ValueError:
        body = response.text
    return response.status_code, body


def _assistant_reply_text(messages):
    """Return the text of the first assistant message that carries text, else ``None``.

    Only content parts of type ``"text"`` are used; several text parts of the
    same message are joined with newlines.
    """
    for message in messages.get("data", []):
        if message.get("role") != "assistant":
            continue
        texts = [
            part["text"]["value"]
            for part in message.get("content", [])
            if part.get("type") == "text"
        ]
        if texts:
            return "\n".join(texts)
    return None


class Tools:
    """Tool container exposing one coroutine that queries an OpenAI Assistant."""

    async def conversar_com_assistant(
        self, prompt: str, __event_emitter__=None, __user__=None
    ) -> str:
        """
        Send a prompt to the configured OpenAI Assistant (Assistants API v2) and return its text reply.

        A new thread containing the prompt is created and run, the run is polled
        until it reaches a terminal state, and the assistant's text is read back
        from the thread. Failures are returned as bracketed error strings
        instead of being raised.

        :param prompt: The user message to send to the assistant.
        :param __event_emitter__: Optional async callable; when given, it is awaited with an event dict describing HTTP errors and unexpected exceptions.
        :param __user__: Accepted for interface compatibility; not used.
        :return: The assistant's reply text, or an error message starting with "[ERRO" or "[EXCEÇÃO]".
        """

        async def report(message):
            # Surface the error through the emitter (if any) and hand the same
            # text back so the caller can simply ``return await report(...)``.
            if __event_emitter__:
                await __event_emitter__({
                    "type": "message",
                    "data": {"content": message},
                })
            return message

        try:
            # NOTE(review): credentials are hard-coded in source; consider loading
            # them from configuration or the environment instead.
            api_key = "INSERT HERE YOUR OPENAI KEY"
            assistant_id = "INSERT HERE YOUR ASSISTANT ID KEY"
            headers = {
                "Authorization": f"Bearer {api_key}",
                "Content-Type": "application/json",
                "OpenAI-Beta": "assistants=v2",
            }
            loop = asyncio.get_running_loop()

            async def call_api(method, url, payload=None):
                # requests is synchronous: run it in the default executor so the
                # event loop stays responsive while the HTTP call is in flight.
                return await loop.run_in_executor(
                    None, _request_json, method, url, headers, payload
                )

            # Create a thread holding the prompt and start a run on it in one call.
            status_code, run_data = await call_api(
                "POST",
                f"{_API_BASE}/threads/runs",
                {
                    "assistant_id": assistant_id,
                    "thread": {
                        "messages": [
                            {"role": "user", "content": prompt},
                        ]
                    },
                },
            )
            if status_code != 200 or not isinstance(run_data, dict):
                return await report(
                    f"[ERRO AO INICIAR RUN]\nStatus: {status_code}\nResposta: {run_data}"
                )
            run_id = run_data["id"]
            thread_id = run_data["thread_id"]

            # Poll until the run reaches a terminal state or the deadline passes.
            deadline = time.monotonic() + _MAX_WAIT_SECONDS
            while True:
                await asyncio.sleep(_POLL_INTERVAL_SECONDS)
                status_code, run_check = await call_api(
                    "GET", f"{_API_BASE}/threads/{thread_id}/runs/{run_id}"
                )
                if (
                    status_code != 200
                    or not isinstance(run_check, dict)
                    or "status" not in run_check
                ):
                    return await report(
                        f"[ERRO AO CONSULTAR STATUS DO RUN]\nStatus: {status_code}\nResposta: {run_check}"
                    )
                status = run_check["status"]
                if status == "completed":
                    break
                if status in _FAILED_RUN_STATUSES:
                    # Every non-recoverable terminal state ends the wait, not
                    # only "failed"; otherwise the loop would never exit.
                    return f"[ERRO] FAIL. Status: {status}"
                if status == "requires_action":
                    return "[ERRO] Assistant need a action (tool_call), not yet supported in this simplified version."
                if time.monotonic() >= deadline:
                    return (
                        f"[ERRO] Timeout after {_MAX_WAIT_SECONDS}s waiting for the run. "
                        f"Last status: {status}"
                    )

            # Answer: read the thread back and pick the assistant's text.
            status_code, messages = await call_api(
                "GET", f"{_API_BASE}/threads/{thread_id}/messages"
            )
            if status_code != 200 or not isinstance(messages, dict):
                return await report(
                    f"[ERRO AO LER MENSAGENS]\nStatus: {status_code}\nResposta: {messages}"
                )
            reply = _assistant_reply_text(messages)
            if reply is None:
                return "[ERRO] No assistant text message found in the thread."
            return reply
        except Exception as e:
            # Boundary of the tool: convert any unexpected failure into a
            # readable string rather than propagating the exception.
            if __event_emitter__:
                await __event_emitter__({
                    "type": "status",
                    "data": {"description": f"Erro: {e}", "done": True},
                })
            return f"[EXCEÇÃO] {e}"