Function
pipe
v0.1.0
LangChain Pipe
Example Pipe Function to utilize LangChain
Function ID
langchain_pipe
Creator
@colbysawyer
Downloads
172+

Function Content
python
"""
title: LangChain Pipe Function
author: Colby Sawyer @ Attollo LLC (mailto:colby.sawyer@attollodefense.com)
author_url: https://github.com/ColbySawyer7
version: 0.1.0

This module defines a Pipe class that utilizes LangChain
"""

from typing import Optional, Callable, Awaitable
from pydantic import BaseModel, Field
import os
import time

# import LangChain dependencies
from langchain_core.prompts import ChatPromptTemplate
from langchain.schema import StrOutputParser
from langchain_core.runnables import RunnablePassthrough
from langchain_community.llms import Ollama
# Uncomment to use OpenAI and FAISS
#from langchain_openai import ChatOpenAI
#from langchain_community.vectorstores import FAISS

class Pipe:
    class Valves(BaseModel):
        base_url: str = Field(default="http://localhost:11434")
        ollama_embed_model: str = Field(default="nomic-embed-text")
        ollama_model: str = Field(default="llama3.1")
        openai_api_key: str = Field(default="...")
        openai_model: str = Field(default="gpt3.5-turbo")
        emit_interval: float = Field(
            default=2.0, description="Interval in seconds between status emissions"
        )
        enable_status_indicator: bool = Field(
            default=True, description="Enable or disable status indicator emissions"
        )

    def __init__(self):
        self.type = "pipe"
        self.id = "langchain_pipe"
        self.name = "LangChain Pipe"
        self.valves = self.Valves()
        self.last_emit_time = 0
        pass

    async def emit_status(
        self,
        __event_emitter__: Callable[[dict], Awaitable[None]],
        level: str,
        message: str,
        done: bool,
    ):
        current_time = time.time()
        if (
            __event_emitter__
            and self.valves.enable_status_indicator
            and (
                current_time - self.last_emit_time >= self.valves.emit_interval or done
            )
        ):
            await __event_emitter__(
                {
                    "type": "status",
                    "data": {
                        "status": "complete" if done else "in_progress",
                        "level": level,
                        "description": message,
                        "done": done,
                    },
                }
            )
            self.last_emit_time = current_time

    async def pipe(self, body: dict,
             __user__: Optional[dict] = None,
        __event_emitter__: Callable[[dict], Awaitable[None]] = None,
        __event_call__: Callable[[dict], Awaitable[dict]] = None,
        ) -> Optional[dict]:
        await self.emit_status(
            __event_emitter__, "info", "/initiating Chain", False
        )

        # ======================================================================================================================================
        # MODEL SETUP
        # ======================================================================================================================================
        # Setup the model for generating responses
        # ==========================================================================
        # Ollama Usage
        _model = Ollama(
            model=self.valves.ollama_model,
            base_url=self.valves.base_url
        )
        # ==========================================================================
        # OpenAI Usage
        # _model = ChatOpenAI(
        #     openai_api_key=self.valves.openai_api_key,
        #     model=self.valves.openai_model
        # )
        # ==========================================================================

        # Example usage of FAISS for retreival
        # vectorstore = FAISS.from_texts(
        #     texts, embedding=OpenAIEmbeddings(openai_api_key=self.valves.openai_api_key)
        # )

        # ======================================================================================================================================
        # PROMPTS SETUP
        # ==========================================================================
        _prompt = ChatPromptTemplate.from_messages([
            ("system", "You are a helpful bot"),
            ("human", "{question}")
        ])
        # ======================================================================================================================================
        # CHAIN SETUP
        # ==========================================================================
        # Basic Chain
        chain = (
            {"question": RunnablePassthrough()}
            | _prompt
            | _model
            | StrOutputParser()
        )
        # ======================================================================================================================================
        # Langchain Calling
        # ======================================================================================================================================
        await self.emit_status(
            __event_emitter__, "info", "Starting Chain", False
        )
        messages = body.get("messages", [])
        # Verify a message is available
        if messages:
            question = messages[-1]["content"]
            try:
                # Invoke Chain
                response = chain.invoke(question)
                # Set assitant message with chain reply
                body["messages"].append({"role": "assistant", "content": response})
            except Exception as e:
                await self.emit_status(__event_emitter__, "error", f"Error during sequence execution: {str(e)}", True)
                return {"error": str(e)}
        # If no message is available alert user
        else:
            await self.emit_status(__event_emitter__, "error", "No messages found in the request body", True)
            body["messages"].append({"role": "assistant", "content": "No messages found in the request body"})

        await self.emit_status(__event_emitter__, "info", "Complete", True)
        return response