We're Hiring!
Whitepaper
Docs
Sign In
Function
Function
pipe
v0.1.0
LangChain Pipe
Last Updated
a year ago
Created
a year ago
Function ID
langchain_pipe
Creator
@colbysawyer
Downloads
1.7K+
Get
Sponsored by Open WebUI Inc.
We are hiring!
Shape the way humanity engages with intelligence.
Description
Example Pipe Function to utilize LangChain
README
No README available
Function Code
Show
""" title: LangChain Pipe Function author: Colby Sawyer @ Attollo LLC (mailto:colby.sawyer@attollodefense.com) author_url: https://github.com/ColbySawyer7 version: 0.1.0 This module defines a Pipe class that utilizes LangChain """ from typing import Optional, Callable, Awaitable from pydantic import BaseModel, Field import os import time # import LangChain dependencies from langchain_core.prompts import ChatPromptTemplate from langchain.schema import StrOutputParser from langchain_core.runnables import RunnablePassthrough from langchain_community.llms import Ollama # Uncomment to use OpenAI and FAISS #from langchain_openai import ChatOpenAI #from langchain_community.vectorstores import FAISS class Pipe: class Valves(BaseModel): base_url: str = Field(default="http://localhost:11434") ollama_embed_model: str = Field(default="nomic-embed-text") ollama_model: str = Field(default="llama3.1") openai_api_key: str = Field(default="...") openai_model: str = Field(default="gpt3.5-turbo") emit_interval: float = Field( default=2.0, description="Interval in seconds between status emissions" ) enable_status_indicator: bool = Field( default=True, description="Enable or disable status indicator emissions" ) def __init__(self): self.type = "pipe" self.id = "langchain_pipe" self.name = "LangChain Pipe" self.valves = self.Valves() self.last_emit_time = 0 pass async def emit_status( self, __event_emitter__: Callable[[dict], Awaitable[None]], level: str, message: str, done: bool, ): current_time = time.time() if ( __event_emitter__ and self.valves.enable_status_indicator and ( current_time - self.last_emit_time >= self.valves.emit_interval or done ) ): await __event_emitter__( { "type": "status", "data": { "status": "complete" if done else "in_progress", "level": level, "description": message, "done": done, }, } ) self.last_emit_time = current_time async def pipe(self, body: dict, __user__: Optional[dict] = None, __event_emitter__: Callable[[dict], Awaitable[None]] = None, 
__event_call__: Callable[[dict], Awaitable[dict]] = None, ) -> Optional[dict]: await self.emit_status( __event_emitter__, "info", "/initiating Chain", False ) # ====================================================================================================================================== # MODEL SETUP # ====================================================================================================================================== # Setup the model for generating responses # ========================================================================== # Ollama Usage _model = Ollama( model=self.valves.ollama_model, base_url=self.valves.base_url ) # ========================================================================== # OpenAI Usage # _model = ChatOpenAI( # openai_api_key=self.valves.openai_api_key, # model=self.valves.openai_model # ) # ========================================================================== # Example usage of FAISS for retreival # vectorstore = FAISS.from_texts( # texts, embedding=OpenAIEmbeddings(openai_api_key=self.valves.openai_api_key) # ) # ====================================================================================================================================== # PROMPTS SETUP # ========================================================================== _prompt = ChatPromptTemplate.from_messages([ ("system", "You are a helpful bot"), ("human", "{question}") ]) # ====================================================================================================================================== # CHAIN SETUP # ========================================================================== # Basic Chain chain = ( {"question": RunnablePassthrough()} | _prompt | _model | StrOutputParser() ) # ====================================================================================================================================== # Langchain Calling # 
====================================================================================================================================== await self.emit_status( __event_emitter__, "info", "Starting Chain", False ) messages = body.get("messages", []) # Verify a message is available if messages: question = messages[-1]["content"] try: # Invoke Chain response = chain.invoke(question) # Set assitant message with chain reply body["messages"].append({"role": "assistant", "content": response}) except Exception as e: await self.emit_status(__event_emitter__, "error", f"Error during sequence execution: {str(e)}", True) return {"error": str(e)} # If no message is available alert user else: await self.emit_status(__event_emitter__, "error", "No messages found in the request body", True) body["messages"].append({"role": "assistant", "content": "No messages found in the request body"}) await self.emit_status(__event_emitter__, "info", "Complete", True) return response