Whitepaper
Docs
Sign In
Function
Function
pipe
v0.1.2
DIFY
Function ID
dify_manifold
Creator
@xuzhougneg
Downloads
380+
Very good compatibility to DIFY backend
Get
README
No README available
Function Code
Show
""" title: DIFY Manifold Pipe authors: xuzhougeng author_url: https://github.com/xuzhougeng funding_url: https://github.com/open-webui version: 0.1.2 description: 该流程用于DIFY的API接口,用于与DIFY的API进行交互 """ import os import requests import json import time from typing import List, Union, Generator, Iterator, Optional from pydantic import BaseModel, Field from open_webui.utils.misc import pop_system_message from open_webui.config import UPLOAD_DIR import base64 import tempfile def get_file_extension(file_name: str) -> str: """ 获取文件的扩展名 os.path.splitext(file_name) 返回一个元组, 第一个元素是文件名, 第二个元素是扩展名 """ return os.path.splitext(file_name)[1].strip(".") # 黑魔法, 从__event_emitter__中获取闭包变量 def get_closure_info(func): # 获取函数的闭包变量 if hasattr(func, "__closure__") and func.__closure__: for cell in func.__closure__: if isinstance(cell.cell_contents, dict): return cell.cell_contents return None class Pipe: class Valves(BaseModel): # 环境变量的设置 DIFY_BASE_URL: str = Field(default="https://api.dify.ai/v1") DIFY_KEY: str = Field(default="") FILE_SERVER: str = Field(default="") def __init__(self): self.type = "manifold" self.id = "dify" self.name = "dify/" self.valves = self.Valves(**{"DIFY_KEY": os.getenv("DIFY_KEY", "")}) # 存储格式: # { # "chat_id_1": { # "dify_conversation_id": "xxx", # "messages": [{"chat_message_id_1": "dify_message_id_1"}, ...] # } # } self.chat_message_mapping = {} # 存储格式: # { # "chat_id_1": "gpt-3.5-turbo", # "chat_id_2": "gpt-4" # } self.dify_chat_model = {} # 存储格式: # { # "chat_id_1": { # "file_id1":{ # "local_file_path": "/path/to/file1.pdf", # "dify_file_id": "dify_file_123", # "file_name": "file1.pdf" # }, # "file_id2":{ # "local_file_path": "/path/to/file2.jpg", # "dify_file_id": "dify_file_456", # "file_name": "file2.jpg" # } # } # } self.dify_file_list = {} self.data_cache_dir = "data/dify" self.load_state() def save_state(self): """持久化Dify相关的状态变量到文件""" os.makedirs(self.data_cache_dir, exist_ok=True) # chat_message_mapping.json chat_mapping_file = os.path.join( self.data_cache_dir, "chat_message_mapping.json" ) with open(chat_mapping_file, "w", encoding="utf-8") as f: json.dump(self.chat_message_mapping, f, ensure_ascii=False, indent=2) # chat_model.json chat_model_file = os.path.join(self.data_cache_dir, "chat_model.json") with open(chat_model_file, "w", encoding="utf-8") as f: json.dump(self.dify_chat_model, f, ensure_ascii=False, indent=2) # file_list.json file_list_file = os.path.join(self.data_cache_dir, "file_list.json") with open(file_list_file, "w", encoding="utf-8") as f: json.dump(self.dify_file_list, f, ensure_ascii=False, indent=2) def load_state(self): """从文件加载Dify相关的状态变量""" try: # chat_message_mapping.json chat_mapping_file = os.path.join( self.data_cache_dir, "chat_message_mapping.json" ) if os.path.exists(chat_mapping_file): with open(chat_mapping_file, "r", encoding="utf-8") as f: self.chat_message_mapping = json.load(f) else: self.chat_message_mapping = {} # chat_model.json chat_model_file = os.path.join(self.data_cache_dir, "chat_model.json") if os.path.exists(chat_model_file): with open(chat_model_file, "r", encoding="utf-8") as f: self.dify_chat_model = json.load(f) else: self.dify_chat_model = {} # file_list.json file_list_file = os.path.join(self.data_cache_dir, "file_list.json") if os.path.exists(file_list_file): with open(file_list_file, "r", encoding="utf-8") as f: self.dify_file_list = json.load(f) else: self.dify_file_list = {} except Exception as e: print(f"加载Dify状态文件失败: {e}") # 加载失败时使用空字典 self.chat_message_mapping = {} self.dify_chat_model = {} self.dify_file_list = {} def get_models(self): """ 获取DIFY的模型列表 """ return [ {"id": "claude3.5-haiku", "name": "claude3.5-haiku"}, {"id": "claude3.5-sonnet", "name": "claude3.5-sonnet"}, {"id": "gpt4o", "name": "gpt4o"}, {"id": "gpt4o-mini", "name": "gpt4o-mini"}, ] def upload_file(self, user_id: str, file_path: str, mime_type: str) -> str: """ 这个函数负责上传文件到DIFY的服务器, 并返回文件的ID """ import requests url = f"{self.valves.DIFY_BASE_URL}/files/upload" headers = { "Authorization": f"Bearer {self.valves.DIFY_KEY}", } file_name = os.path.basename(file_path) files = { # 文件字段:(文件名, 文件对象, MIME类型) "file": (file_name, open(file_path, "rb"), mime_type), # 普通表单字段:(None, 值) "user": (None, user_id), } response = requests.post(url, headers=headers, files=files) file_id = response.json()["id"] # Optional: print response return file_id def is_doc_file(self, file_path: str) -> bool: """ 判断文件是否是文档文件 'TXT', 'MD', 'MARKDOWN', 'PDF', 'HTML', 'XLSX', 'XLS', 'DOCX', 'CSV', 'EML', 'MSG', 'PPTX', 'PPT', 'XML', 'EPUB' """ file_extension = get_file_extension(file_path).upper() if file_extension in [ "PDF", "XLSX", "XLS", "DOCX", "EML", "MSG", "PPTX", "PPT", "XML", "EPUB", ]: return True return False def is_text_file(self, mime_type: str) -> bool: """ 判断文件是否是文本文件 """ if "text" in mime_type: return True return False def is_audio_file(self, file_path: str) -> bool: """ 判断文件是否是音频文件 'MP3', 'M4A', 'WAV', 'WEBM', 'AMR'. """ if get_file_extension(file_path).upper() in [ "MP3", "M4A", "WAV", "WEBM", "AMR", ]: return True return False def is_video_file(self, file_path: str) -> bool: """ 判断文件是否是视频文件 'MP4', 'MOV', 'MPEG', 'MPGA' """ if get_file_extension(file_path).upper() in ["MP4", "MOV", "MPEG", "MPGA"]: return True return False def upload_text_file(self, user_id: str, file_path: str) -> str: """ 上传文本文件到服务器,第一行添加文件名 支持类型: 纯文本文件 Args: file_path: 文本文件路径 user_id: 用户ID Returns: str: 上传后的文件ID """ try: # 获取原始文件名 filename = os.path.basename(file_path) # 读取原文件内容 with open(file_path, "r", encoding="utf-8") as f: content = f.read() # 创建带文件名标记的新内容 new_content = f"#{filename}\n{content}" # 创建临时文件 with tempfile.NamedTemporaryFile( delete=False, suffix=".txt", mode="w", encoding="utf-8" ) as tmp_file: tmp_file.write(new_content) temp_file_path = tmp_file.name try: # 上传文件 file_id = self.upload_file(user_id, temp_file_path, "text/plain") return file_id finally: # 清理临时文件 os.remove(temp_file_path) except UnicodeDecodeError: raise ValueError("文件编码不是UTF-8格式") except Exception as e: raise ValueError(f"处理文本文件失败: {str(e)}") def upload_images(self, image_data_base64: str, user_id: str) -> str: """ 上传 base64 编码的图片到 DIFY 服务器,返回图片路径 支持类型: 'JPG', 'JPEG', 'PNG', 'GIF', 'WEBP', 'SVG' """ try: # Remove the data URL scheme prefix if present if image_data_base64.startswith("data:"): # Extract the base64 data after the comma image_data_base64 = image_data_base64.split(",", 1)[1] # 解码 base64 图像数据 image_data = base64.b64decode(image_data_base64) # Create and save temporary file with tempfile.NamedTemporaryFile(delete=False, suffix=".png") as tmp_file: tmp_file.write(image_data) temp_file_path = tmp_file.name try: file_id = self.upload_file(user_id, temp_file_path, "image/png") finally: os.remove(temp_file_path) return file_id except Exception as e: raise ValueError(f"Failed to process base64 image data: {str(e)}") def pipes(self) -> List[dict]: return self.get_models() def pipe(self, body: dict, __event_emitter__: dict = None, __user__: Optional[dict] = None, __task__=None) -> Union[str, Generator, Iterator]: """处理对话请求""" # 获取模型名称 model_name = body["model"][body["model"].find(".") + 1 :] # 处理特殊任务 if __task__ is not None: if __task__ == "title_generation": return model_name elif __task__ == "tags_generation": return f'{{"tags":[{model_name}]}}' # 获取当前用户 current_user = __user__["email"] # 处理系统消息和普通消息 system_message, messages = pop_system_message(body["messages"]) print(f"system_message:{system_message}") print(f"messages:{messages}, {len(messages)}") # 从event_emitter获取chat_id和message_id cell_contents = get_closure_info(__event_emitter__) chat_id = cell_contents["chat_id"] message_id = cell_contents["message_id"] # 处理对话模型和上下文 parent_message_id = None # 在pipe函数中修改对话历史的处理逻辑 if len(messages) == 1: # 新对话逻辑保持不变 self.dify_chat_model[chat_id] = model_name self.chat_message_mapping[chat_id] = { "dify_conversation_id": "", "messages": [] } self.dify_file_list[chat_id] = {} else: # 检查是否存在历史记录 if chat_id in self.chat_message_mapping: # 首先验证模型 if chat_id in self.dify_chat_model: if self.dify_chat_model[chat_id] != model_name: raise ValueError(f"Cannot change model in an existing conversation. This conversation was started with {self.dify_chat_model[chat_id]}") else: # 如果somehow没有记录模型(异常情况),记录当前模型 self.dify_chat_model[chat_id] = model_name chat_history = self.chat_message_mapping[chat_id]["messages"] current_msg_index = len(messages) - 1 # 当前消息的索引 # 如果不是第一条消息,获取前一条消息的dify_id作为parent if current_msg_index > 0 and len(chat_history) >= current_msg_index: previous_msg = chat_history[current_msg_index - 1] parent_message_id = list(previous_msg.values())[0] # 关键修改:截断当前位置之后的消息历史 self.chat_message_mapping[chat_id]["messages"] = chat_history[:current_msg_index] # 获取最后一条消息作为query message = messages[-1] query = "" inputs = {"model": model_name} file_list = [] # 处理消息内容 if isinstance(message.get("content"), list): for item in message["content"]: if item["type"] == "text": query += item["text"] if item["type"] == "image_url": upload_file_id = self.upload_images(item["image_url"]["url"], current_user) upload_file_dict = { "type": "image", "transfer_method": "local_file", "url": "", "upload_file_id": upload_file_id } file_list.append(upload_file_dict) else: query = message.get("content", "") # 处理文件上传 if "upload_files" in body: for file in body["upload_files"]: if file["type"] != "file": continue file_id = file["id"] if chat_id in self.dify_file_list and file_id in self.dify_file_list[chat_id]: file_list.append(self.dify_file_list[chat_id][file_id]) continue # 获取文件信息并上传 if "collection_name" in file: file_path = os.path.join(UPLOAD_DIR, file["file"]["filename"]) else: file_path = file["file"]["path"] file_mime_type = file["file"]["meta"]["content_type"] upload_file_dict = { "transfer_method": "local_file", "url": "", } # 处理不同类型的文件 if self.is_doc_file(file_path): upload_file_id = self.upload_file(current_user, file_path, file_mime_type) upload_file_dict.update({"type": "document", "upload_file_id": upload_file_id}) elif self.is_text_file(file_mime_type): upload_file_id = self.upload_text_file(current_user, file_path) upload_file_dict.update({"type": "document", "upload_file_id": upload_file_id}) elif self.is_audio_file(file_path): upload_file_id = self.upload_file(current_user, file_path, file_mime_type) upload_file_dict.update({"type": "audio", "upload_file_id": upload_file_id}) elif self.is_video_file(file_path): upload_file_id = self.upload_file(current_user, file_path, file_mime_type) upload_file_dict.update({"type": "video", "upload_file_id": upload_file_id}) else: raise ValueError(f"Unsupported file type: {file_path}") file_list.append(upload_file_dict) if chat_id not in self.dify_file_list: self.dify_file_list[chat_id] = {} self.dify_file_list[chat_id][file_id] = upload_file_dict # 准备请求载荷 payload = { "inputs": inputs, "parent_message_id": parent_message_id, "query": query, "response_mode": "streaming" if body.get("stream", False) else "blocking", "conversation_id": self.chat_message_mapping[chat_id].get("dify_conversation_id", ""), "user": current_user, "files": file_list, } # 设置请求头 headers = { "Authorization": f"Bearer {self.valves.DIFY_KEY}", "content-type": "application/json", } url = f"{self.valves.DIFY_BASE_URL}/chat-messages" try: if body.get("stream", False): return self.stream_response(url, headers, payload, chat_id, message_id) else: return self.non_stream_response(url, headers, payload, chat_id, message_id) except requests.exceptions.RequestException as e: print(f"Request failed: {e}") return f"Error: Request failed: {e}" except Exception as e: print(f"Error in pipe method: {e}") return f"Error: {e}" def stream_response(self, url, headers, payload, chat_id, message_id): """处理流式响应""" try: with requests.post(url, headers=headers, json=payload, stream=True, timeout=(3.05, 60)) as response: if response.status_code != 200: raise Exception(f"HTTP Error {response.status_code}: {response.text}") for line in response.iter_lines(): if line: line = line.decode("utf-8") if line.startswith("data: "): try: data = json.loads(line[6:]) event = data.get("event") if event == "message": # 处理普通文本消息 yield data.get("answer", "") elif event == "message_file": # 处理文件(图片)消息 pass elif event == "message_end": # 保存会话和消息ID映射 dify_conversation_id = data.get("conversation_id", "") dify_message_id = data.get("message_id", "") self.chat_message_mapping[chat_id]["dify_conversation_id"] = dify_conversation_id self.chat_message_mapping[chat_id]["messages"].append({message_id: dify_message_id}) # 保存状态 self.save_state() break elif event == "error": # 处理错误 error_msg = f"Error {data.get('status')}: {data.get('message')} ({data.get('code')})" yield f"Error: {error_msg}" break time.sleep(0.01) except json.JSONDecodeError: print(f"Failed to parse JSON: {line}") except KeyError as e: print(f"Unexpected data structure: {e}") print(f"Full data: {data}") except requests.exceptions.RequestException as e: print(f"Request failed: {e}") yield f"Error: Request failed: {e}" except Exception as e: print(f"General error in stream_response method: {e}") yield f"Error: {e}" def non_stream_response(self, url, headers, payload, chat_id, message_id): """处理非流式响应""" try: response = requests.post(url, headers=headers, json=payload, timeout=(3.05, 60)) if response.status_code != 200: raise Exception(f"HTTP Error {response.status_code}: {response.text}") res = response.json() # 保存会话和消息ID映射 dify_conversation_id = res.get("conversation_id", "") dify_message_id = res.get("message_id", "") self.chat_message_mapping[chat_id]["dify_conversation_id"] = dify_conversation_id self.chat_message_mapping[chat_id]["messages"].append({message_id: dify_message_id}) # 保存状态 self.save_state() return res.get("answer", "") except requests.exceptions.RequestException as e: print(f"Failed non-stream request: {e}") return f"Error: {e}"