"""
title: Langflow Pipe Function
author: moss
version: 0.7.0
This module defines a Pipe class that integrates Open WebUI with Langflow.
"""
from typing import Optional, Callable, Awaitable
from pydantic import BaseModel, Field
import requests
import time
class Pipe:
class Valves(BaseModel):
langflow_base_url: str = Field(
default="http://192.168.1.212:7860/api/v1/run",
description="The base URL of the Langflow API endpoint (e.g., 'http://192.168.1.212:7860/api/v1/run'). Do not include the workflow ID here.",
)
workflow_id: str = Field(
default="afd2bfb8-b03c-4a74-9608-b9a0422c4379",
description="The unique identifier of the Langflow workflow (e.g., 'afd2bfb8-b03c-4a74-9608-b9a0422c4379').",
)
api_key: str = Field(
default="",
description="API key for authenticating with Langflow. Leave empty if no API key is required.",
)
input_field: str = Field(
default="input_value",
description="The field name for the input message in the Langflow API request (e.g., 'input_value').",
)
response_field: str = Field(
default="[0]['message']",
description="The path to the chat text in the Langflow API response (e.g., '[0][\"message\"]'). If unsure, inspect the response structure using the debug logs.",
)
emit_interval: float = Field(
default=2.0,
description="Interval in seconds between status emissions (e.g., 2.0).",
)
enable_status_indicator: bool = Field(
default=True,
description="Enable or disable status indicator emissions (e.g., True or False).",
)
def __init__(self):
self.type = "pipe"
self.id = "langflow_pipe"
self.name = "Langflow Pipe"
self.valves = self.Valves()
self.last_emit_time = 0
async def emit_status(
self,
__event_emitter__: Callable[[dict], Awaitable[None]],
level: str,
message: str,
done: bool,
):
current_time = time.time()
if (
__event_emitter__
and self.valves.enable_status_indicator
and (
current_time - self.last_emit_time >= self.valves.emit_interval or done
)
):
await __event_emitter__(
{
"type": "status",
"data": {
"status": "complete" if done else "in_progress",
"level": level,
"description": message,
"done": done,
},
}
)
self.last_emit_time = current_time
async def pipe(
self,
body: dict,
__user__: Optional[dict] = None,
__event_emitter__: Callable[[dict], Awaitable[None]] = None,
__event_call__: Callable[[dict], Awaitable[dict]] = None,
) -> Optional[dict]:
await self.emit_status(
__event_emitter__, "info", "Calling Langflow Workflow...", False
)
messages = body.get("messages", [])
if messages:
user_message = messages[-1]["content"]
try:
# Construct the full URL using the base URL and workflow ID
full_url = f"{self.valves.langflow_base_url}/{self.valves.workflow_id}"
headers = {
"Content-Type": "application/json",
"Authorization": (
f"Bearer {self.valves.api_key}" if self.valves.api_key else None
),
}
payload = {
self.valves.input_field: user_message,
"output_type": "chat",
"input_type": "chat",
"tweaks": {
"Agent-MwX6S": {},
"ChatInput-tdORN": {},
"ChatOutput-5onBh": {},
"TavilyAISearch-hqJ7Z": {},
},
}
response = requests.post(full_url, json=payload, headers=headers)
if response.status_code == 200:
# Inspect the response structure
response_data = response.json()
print(
"Langflow Response:", response_data
) # Debugging: Log the response
# Extract the chat text from the response structure
if isinstance(response_data, list) and len(response_data) > 0:
first_item = response_data[0]
if isinstance(first_item, dict) and "message" in first_item:
langflow_response = first_item["message"]
else:
langflow_response = (
"Invalid response structure: 'message' key not found"
)
else:
langflow_response = (
"Invalid response structure: expected a non-empty list"
)
# Append the assistant's response to the chat
body["messages"].append(
{"role": "assistant", "content": langflow_response}
)
else:
# Handle API errors
error_message = f"Error: {response.status_code} - {response.text}"
print(
"Langflow API Error:", error_message
) # Debugging: Log the error
raise Exception(error_message)
except Exception as e:
await self.emit_status(
__event_emitter__,
"error",
f"Error during Langflow execution: {str(e)}",
True,
)
return {"error": str(e)}
else:
await self.emit_status(
__event_emitter__,
"error",
"No messages found in the request body",
True,
)
body["messages"].append(
{
"role": "assistant",
"content": "No messages found in the request body",
}
)
await self.emit_status(__event_emitter__, "info", "Complete", True)
return body