"""
title: Dify_Workflow
authors: JunghyunMoon
author_url: https://github.com/JungHyunMoon
funding_url: https://github.com/open-webui
version: 0.1
description: This function is for the DIFY API interface, used to connect with Dify's workflows
"""
from typing import List, Union, Generator, Iterator, Optional
from pydantic import BaseModel, Field
import requests
import json
import time
class Pipe:
class Valves(BaseModel):
DIFY_API_URL: str = Field(default="http://192.168.2.56/v1")
DIFY_API_KEY: str = Field(default="app-Fx3d3owfIYktuKFPatL6Myyl")
IS_STREAMING: bool = Field(default=True)
DEBUG: bool = Field(default=False)
def __init__(self):
self.valves = self.Valves()
self.name = "Dify Agent - v1"
# 2중 딕셔너리: user_id -> { chat_id -> conversation_id }
self.conversation_ids = {}
# user_id 별로 마지막에 사용한 chat_id 기억
self.latest_chat_id_by_user = {}
async def inlet(self, body: dict, user: Optional[dict] = None) -> dict:
"""
- body 안에 있는 chat_id, user_id를 추출
- 2중 딕셔너리(conversation_ids) 구조를 미리 초기화
- user가 pipe()를 호출할 때 사용해야 하는 chat_id를 latest_chat_id_by_user에 저장
"""
if self.valves.DEBUG:
print("[DEBUG][inlet] Start inlet method.")
print(f"[DEBUG][inlet] Received user: {user}")
print(f"[DEBUG][inlet] Received body: {body}")
print("====================================================")
user_id = user.get("id") if user else None
chat_id = body.get("chat_id")
if user_id is None:
# user_id가 없으면 식별 불가
if self.valves.DEBUG:
print("[DEBUG][inlet] No user_id found, returning body as is.")
return body
# conversation_ids 안에 user_id 키가 없으면 딕셔너리 초기화
if user_id not in self.conversation_ids:
self.conversation_ids[user_id] = {}
if self.valves.DEBUG:
print(f"[DEBUG][inlet] Initialized conversation_ids[{user_id}] as empty dict.")
# 해당 user_id에 대해 아직 chat_id가 없으면 ""로 초기화
if chat_id and chat_id not in self.conversation_ids[user_id]:
self.conversation_ids[user_id][chat_id] = ""
if self.valves.DEBUG:
print(f"[DEBUG][inlet] Set conversation_ids[{user_id}][{chat_id}] = '' (no conv_id yet)")
# 해당 user가 마지막으로 사용한 chat_id 갱신
if chat_id:
self.latest_chat_id_by_user[user_id] = chat_id
if self.valves.DEBUG:
print(f"[DEBUG][inlet] latest_chat_id_by_user[{user_id}] = {chat_id}")
if self.valves.DEBUG:
print("[DEBUG][inlet] Finished inlet method.")
print("====================================================")
return body
async def outlet(self, body: dict, user: Optional[dict] = None) -> dict:
"""
- outlet에서 별도 처리할 로직이 없다면 그대로 반환만
"""
if self.valves.DEBUG:
print("[DEBUG][outlet] Start outlet method.")
print(f"[DEBUG][outlet] Received body: {body}")
print(f"[DEBUG][outlet] Received user: {user}")
print("[DEBUG][outlet] Finished outlet method.")
print("====================================================")
return body
def pipe(
self,
user_message: str,
model_id: str,
messages: List[dict],
body: dict
) -> Union[str, Generator, Iterator]:
# 디버그 모드면 자세히 출력
if self.valves.DEBUG:
print("[DEBUG][pipe] Start pipe method.")
print(f"[DEBUG][pipe] user_message: {user_message}")
print(f"[DEBUG][pipe] model_id: {model_id}")
print(f"[DEBUG][pipe] messages: {messages}")
print(f"[DEBUG][pipe] body: {body}")
print("====================================================")
# 1) streaming / blocking 모드 결정
response_mode = "streaming" if self.valves.IS_STREAMING else "blocking"
if self.valves.DEBUG:
print(f"[DEBUG][pipe] response_mode set to: {response_mode}")
# 2) user_id 추출
user_data = body.get("user", {})
user_id = user_data.get("id")
if not user_id:
if self.valves.DEBUG:
print("[DEBUG][pipe] Error: No user_id found in body.")
return "Error: No user_id found in body."
# 3) user_id로부터 chat_id 가져오기
chat_id = self.latest_chat_id_by_user.get(user_id, "default_chat")
if self.valves.DEBUG:
print(f"[DEBUG][pipe] Using chat_id: {chat_id}")
# 4) 기존 conversation_id가 있으면 사용, 없으면 ""
conversation_id = ""
if user_id in self.conversation_ids:
conversation_id = self.conversation_ids[user_id].get(chat_id, "")
if self.valves.DEBUG:
print(f"[DEBUG][pipe] Current conversation_id: {conversation_id}")
# 5) API 요청 payload
headers = {
'Authorization': f'Bearer {self.valves.DIFY_API_KEY}',
'Content-Type': 'application/json'
}
payload = {
"inputs": {},
"query": user_message,
"response_mode": response_mode,
"conversation_id": conversation_id,
"user": user_data.get("email", ""),
}
if self.valves.DEBUG:
print("[DEBUG][pipe] Final payload for Dify API:")
print(json.dumps(payload, indent=2, ensure_ascii=False))
# 6) 호출
try:
if response_mode == "streaming":
return self.streaming_response(headers, payload, user_id, chat_id)
else:
return self.blocking_response(headers, payload, user_id, chat_id)
except requests.exceptions.RequestException as e:
print(f"Request failed: {e}")
return f"Error: Request failed: {e}"
except Exception as e:
print(f"General error in pipe method: {e}")
return f"Error: {e}"
finally:
if self.valves.DEBUG:
print("[DEBUG][pipe] Finished pipe method.")
print("====================================================")
def streaming_response(self, headers, payload, user_id: str, chat_id: str):
url = f"{self.valves.DIFY_API_URL}/chat-messages"
if self.valves.DEBUG:
print("[DEBUG][streaming_response] Start streaming_response method.")
print(f"[DEBUG][streaming_response] URL: {url}")
try:
with requests.post(url, headers=headers, json=payload, stream=True, timeout=60) as response:
if response.status_code != 200:
msg = f"HTTP Error {response.status_code}: {response.text}"
if self.valves.DEBUG:
print(f"[DEBUG][streaming_response] {msg}")
raise Exception(msg)
for line in response.iter_lines():
if line:
line = line.decode("utf-8")
if line.startswith("data: "):
try:
data = json.loads(line[6:])
event = data.get("event")
if event == "message":
yield data.get("answer", "")
elif event == "workflow_finished":
if "conversation_id" in data:
new_conv_id = data["conversation_id"]
self._update_conversation_id(user_id, chat_id, new_conv_id)
if self.valves.DEBUG:
print(f"[DEBUG][streaming_response] Updated conv_id to: {new_conv_id}")
print("[DEBUG][streaming_response] Workflow finished. Yielding extra message.")
elif event == "message_end":
if self.valves.DEBUG:
print("[DEBUG][streaming_response] Received message_end event. Break stream.")
break
elif event == "error":
error_msg = (
f"Error {data.get('status')}: {data.get('message')} "
f"({data.get('code')})"
)
yield f"Error: {error_msg}"
break
time.sleep(0.01)
except json.JSONDecodeError:
print(f"Failed to parse JSON: {line}")
except KeyError as e:
if self.valves.DEBUG:
print(f"[DEBUG][streaming_response] Unexpected data structure. Missing key: {e}")
print(f"[DEBUG][streaming_response] Full data: {data}")
except requests.exceptions.RequestException as e:
print(f"Request failed: {e}")
yield f"Error: Request failed: {e}"
except Exception as e:
print(f"General error in streaming_response method: {e}")
yield f"Error: {e}"
finally:
if self.valves.DEBUG:
print("[DEBUG][streaming_response] Finished streaming_response method.")
print("====================================================")
def blocking_response(self, headers, payload, user_id: str, chat_id: str):
url = f"{self.valves.DIFY_API_URL}/chat-messages"
if self.valves.DEBUG:
print("[DEBUG][blocking_response] Start blocking_response method.")
print(f"[DEBUG][blocking_response] URL: {url}")
print(f"[DEBUG][blocking_response] Payload: {payload}")
try:
response = requests.post(url, headers=headers, json=payload, timeout=30)
if response.status_code == 200:
try:
json_resp = response.json()
if self.valves.DEBUG:
print("[DEBUG][blocking_response] Response JSON:", json_resp)
# 새로운 conversation_id가 있으면 저장
if "conversation_id" in json_resp:
new_conv_id = json_resp["conversation_id"]
self._update_conversation_id(user_id, chat_id, new_conv_id)
if self.valves.DEBUG:
print(f"[DEBUG][blocking_response] Updated conv_id to: {new_conv_id}")
result = json_resp.get("answer")
if result:
return str(result)
else:
return "No Key answer"
except Exception as e:
if self.valves.DEBUG:
print(f"[DEBUG][blocking_response] Error parsing JSON: {e}")
return f"Error: {e}"
else:
err_msg = f"HTTP Error {response.status_code}: {response.text}"
if self.valves.DEBUG:
print(f"[DEBUG][blocking_response] {err_msg}")
return err_msg
except requests.exceptions.RequestException as e:
print(f"Request failed: {e}")
return f"Error: Request failed: {e}"
finally:
if self.valves.DEBUG:
print("[DEBUG][blocking_response] Finished blocking_response method.")
print("====================================================")
def _update_conversation_id(self, user_id: str, chat_id: str, new_conv_id: str):
"""
내부적으로 2중 딕셔너리에 conversation_id를 업데이트하는 헬퍼 함수
"""
if self.valves.DEBUG:
print("[DEBUG][_update_conversation_id] Start updating conversation_id.")
print(f"[DEBUG][_update_conversation_id] user_id: {user_id}, chat_id: {chat_id}, new_conv_id: {new_conv_id}")
if user_id not in self.conversation_ids:
self.conversation_ids[user_id] = {}
if not chat_id:
chat_id = "default_chat"
self.conversation_ids[user_id][chat_id] = new_conv_id
if self.valves.DEBUG:
print(f"[DEBUG][_update_conversation_id] conversation_ids[{user_id}][{chat_id}] = {new_conv_id}")
print("[DEBUG][_update_conversation_id] Finished updating conversation_id.")
print("====================================================")