Whitepaper
Docs
Sign In
Function
Function
pipe
v1.0
RAGFlow CHAT ASSISTANT pipe
Function ID
ragflow_chat_assistant
Creator
@codepoint
Downloads
122+
Pipe function for integrating with RAGFlow CHAT ASSISTANT
Get
README
No README available
Function Code
Show
""" title: RAGFlow CHAT ASSISTANT pipe function author: Per Morten Sandstad version: 1.0 """ from pydantic import BaseModel, Field import requests import json import uuid import re class Pipe: class Valves(BaseModel): RAGFLOW_API_BASE_URL: str = Field( default="http://ragflow.mnemonic.no", description="Base URL for accessing RAGFlow API endpoints.", ) RAGFLOW_API_KEY: str = Field( default="", description="API key for authenticating requests to the RAGFlow API.", ) RAGFLOW_CHAT_ASSISTANT: str = Field( default="Johnny Mnemonic", description="RAGFlow CHAT ASSISTANT to access via the API.", ) RAGFLOW_REFERENCE: str = Field( default="No", description="Include references from RAGFlow in response (Yes/No)", ) def __init__(self): self.type = "pipe" self.id = "ragflow_pipe" self.name = "RAGFlow Pipe" self.valves = self.Valves() self.CHAT_ID = "" def get_chat_assistant_id(self): """Fetch the chat assistant ID by name.""" HEADERS = { "Authorization": f"Bearer {self.valves.RAGFLOW_API_KEY}", "Content-Type": "application/json", } url = f"{self.valves.RAGFLOW_API_BASE_URL}/api/v1/chats?page=1&page_size=10&name={self.valves.RAGFLOW_CHAT_ASSISTANT}" response = requests.get(url, headers=HEADERS) if response.status_code == 200: data = response.json() if "data" in data and data["data"]: for assistant in data["data"]: if assistant["name"] == self.valves.RAGFLOW_CHAT_ASSISTANT: return assistant["id"] return None def create_chat_session(self, chat_assistant_id, session_name): """Create a chat session with the given chat assistant ID.""" HEADERS = { "Authorization": f"Bearer {self.valves.RAGFLOW_API_KEY}", "Content-Type": "application/json", } url = f"{self.valves.RAGFLOW_API_BASE_URL}/api/v1/chats/{chat_assistant_id}/sessions" payload = {"name": session_name} response = requests.post(url, json=payload, headers=HEADERS) if response.status_code == 200: response_json = response.json() chat_id = response_json.get("data", {}).get("id") return response_json return chat_id else: return {"error": f"Failed to create session: {response.text}"} def chat_session(self, chat_assistant_id, chat_id, chat_message): """Create a chat session""" HEADERS = { "Authorization": f"Bearer {self.valves.RAGFLOW_API_KEY}", "Content-Type": "application/json", } url = f"{self.valves.RAGFLOW_API_BASE_URL}/api/v1/chats/{chat_assistant_id}/completions" payload = {"question": chat_message, "stream": False, "session_id": chat_id} # payload = {"question": chat_message, "stream": False} response = requests.post(url, json=payload, headers=HEADERS) if response.status_code == 200: response_json = response.json() return response_json else: return {"error": f"Failed to create session: {response.text}"} def delete_chat_session(self, chat_assistant_id, session_id): """Deletes a chat session by ID.""" HEADERS = { "Authorization": f"Bearer {self.valves.RAGFLOW_API_KEY}", "Content-Type": "application/json", } url = f"{self.valves.RAGFLOW_API_BASE_URL}/api/v1/chats/{chat_assistant_id}/sessions" payload = {"ids": [session_id]} # API expects a list of IDs response = requests.delete(url, json=payload, headers=HEADERS) if response.status_code == 200: return response.json() else: return {"error": f"Failed to delete session: {response.text}"} def pipe(self, body: dict, __user__: dict): # print(f"pipe:{__name__}") self.CHAT_ID = "" RAGFLOW_DOCS = [] messages = body.get("messages", []) if messages: user_question = messages[-1]["content"] try: CHAT_ASSISTANT_ID = self.get_chat_assistant_id() if self.CHAT_ID == "": unique_id = str(uuid.uuid4())[:8] CHAT_SESSION_NAME = f"OpenWebUI-{unique_id}" create_chat_session = self.create_chat_session( CHAT_ASSISTANT_ID, CHAT_SESSION_NAME ) self.CHAT_ID = create_chat_session.get("data", {}).get("id") # Get the response from RAGFlow RAGFLOW_RESPONSE = self.chat_session( CHAT_ASSISTANT_ID, self.CHAT_ID, user_question ) try: RAGFLOW_DOCS = "\n".join( [ f"[{doc['doc_name']}](http://osl-rag1.mnemonic.no/document/{doc['doc_id']}?ext=pdf&prefix=document)" for doc in RAGFLOW_RESPONSE["data"]["reference"]["doc_aggs"] ] ) except: pass RAGFLOW_RESPONSE_ANSWER = RAGFLOW_RESPONSE.get("data", {}).get( "answer", None ) self.delete_chat_session(CHAT_ASSISTANT_ID, self.CHAT_ID) # return f"{RAGFLOW_RESPONSE}" if self.valves.RAGFLOW_REFERENCE == "Yes": # Extract reference chunks and format them RAGFLOW_RESPONSE_CLEAN = re.sub( r"##(\d{1,2})\$\$", r"#\1", RAGFLOW_RESPONSE_ANSWER ) reference_list = [] reference_number = 0 for chunk in RAGFLOW_RESPONSE["data"]["reference"]["chunks"]: document_name = chunk["document_name"] content = chunk["content"] formatted_entry = f"**\#{reference_number}:**\nFile name:{document_name}\n{content}" reference_list.append(formatted_entry) reference_number += 1 # Join all formatted entries into a single string RAGFLOW_CHUNKS = "\n\n".join(reference_list) if ( "The answer you are looking for is not found in the knowledge base" in RAGFLOW_RESPONSE_CLEAN ): return f"{RAGFLOW_RESPONSE_CLEAN}" return f"{RAGFLOW_RESPONSE_CLEAN}\n\n**Documents referenced:**\n{RAGFLOW_DOCS}\n\n**Chunks used from knowledge base:**\n{RAGFLOW_CHUNKS}" RAGFLOW_RESPONSE_CLEAN = re.sub( r"##\d{1,2}\$\$", "", RAGFLOW_RESPONSE_ANSWER ) if ( "The answer you are looking for is not found in the knowledge base" in RAGFLOW_RESPONSE_CLEAN ): return f"{RAGFLOW_RESPONSE_CLEAN}" return ( f"{RAGFLOW_RESPONSE_CLEAN}\n\n**Documents referenced:**\n{RAGFLOW_DOCS}" ) except Exception as e: self.delete_chat_session(CHAT_ASSISTANT_ID, self.CHAT_ID) return f"Error: {e}"