"""
title: Ollama Api Facade Metadata
author: Gregor Biswanger
author_url: https://github.com/GregorBiswanger/OllamaApiFacade
version: 1.0.0
date: 2024-11-28
license: Apache 2.0
description: A Pipe that enhances .NET Ollama API Facade requests by including additional metadata fields such as id, chat_id, and session_id for advanced context handling.
"""
from pydantic import BaseModel, Field
from typing import Union, AsyncGenerator
import aiohttp
import json
import requests
class Pipe:
    """Manifold pipe that relays chat requests to an Ollama API Facade.

    The pipe lists the models exposed by the facade's ``/api/tags`` endpoint
    and streams chat completions from ``/api/chat``, adding the caller's
    ``session_id``, ``chat_id`` and user ``id`` to every request body.
    """

    class Valves(BaseModel):
        # Root URL of the facade; "/api/tags" and "/api/chat" are appended.
        OLLAMA_API_BASE_URL: str = Field(
            default="http://host.docker.internal:11434",
            description="Base URL for accessing the Ollama API Facade.",
        )

    def __init__(self):
        self.type = "manifold"
        self.valves = self.Valves()

    def pipes(self):
        """Return the models offered by the facade.

        Returns:
            A list of ``{"id": ..., "name": ...}`` dicts, one per model
            reported by ``/api/tags``. On any failure a single placeholder
            entry with id ``"error"`` is returned instead of raising.
        """
        try:
            headers = {"Content-Type": "application/json"}
            response = requests.get(
                f"{self.valves.OLLAMA_API_BASE_URL}/api/tags",
                headers=headers,
                # Without a timeout an unreachable facade would block this
                # call forever; a timeout error falls into the handler below.
                timeout=10,
            )
            response.raise_for_status()
            models = response.json().get("models", [])
            print(f"Debug: Available models: {models}")
            return [{"id": model["model"], "name": model["name"]} for model in models]
        except Exception as e:
            # Best-effort boundary: report the problem as a pseudo-model.
            print(f"Error fetching models: {e}")
            return [{"id": "error", "name": "Failed to fetch models."}]

    async def pipe(
        self, body: dict, __user__: dict, __metadata__: dict
    ) -> AsyncGenerator[str, None]:
        """Stream a chat completion from the facade.

        Args:
            body: Incoming chat request; ``model`` is required, ``messages``,
                ``options`` and ``max_tokens`` are optional.
            __user__: Caller information; its ``id`` is forwarded as ``id``.
            __metadata__: Request metadata; ``session_id`` and ``chat_id``
                are forwarded.

        Yields:
            The ``message.content`` text of each streamed response line. If
            the HTTP request fails, a single JSON string of the form
            ``{"error": "Request error: ..."}`` is yielded instead.

        Raises:
            KeyError: If ``body`` has no ``model`` entry.
        """
        headers = {"Content-Type": "application/json"}

        # Strip only the leading "<prefix>." from the model id. Splitting on
        # every dot would truncate tags that contain dots themselves
        # (e.g. "llama3.1:8b" would become "1:8b").
        # NOTE(review): assumes the host prefixes ids as "<pipe-id>.<model>"
        # and that the pipe id contains no dot - confirm against the host.
        model_name = body["model"].split(".", 1)[-1]

        transformed_body = {
            "stream": True,
            "model": model_name,
            "messages": body.get("messages", []),
            "options": body.get("options", {}),
            "session_id": __metadata__.get("session_id"),
            "chat_id": __metadata__.get("chat_id"),
            "id": __user__.get("id"),
            "max_tokens": body.get("max_tokens", 256),
        }
        print(f"Request to Ollama: {json.dumps(transformed_body, indent=2)}")

        try:
            async with aiohttp.ClientSession() as session:
                async with session.post(
                    url=f"{self.valves.OLLAMA_API_BASE_URL}/api/chat",
                    json=transformed_body,
                    headers=headers,
                ) as response:
                    response.raise_for_status()
                    print("Debug: Starting stream processing...")
                    # The response is consumed line by line; each non-empty
                    # line is parsed as a standalone JSON document.
                    async for raw_line in response.content:
                        line = raw_line.decode("utf-8").strip()
                        if not line:
                            continue
                        try:
                            parsed_line = json.loads(line)
                        except json.JSONDecodeError as e:
                            print(f"Error parsing line: {e}")
                            continue
                        print(f"Debug: Received data: {parsed_line}")
                        # Guard against lines that are valid JSON but not the
                        # expected {"message": {"content": ...}} shape, so a
                        # stray scalar or null message cannot abort the stream.
                        if not isinstance(parsed_line, dict):
                            continue
                        message = parsed_line.get("message")
                        if isinstance(message, dict) and "content" in message:
                            yield message["content"]
        except aiohttp.ClientError as e:
            print(f"Error during request: {e}")
            yield json.dumps({"error": f"Request error: {e}"})