"""
title: Pollinations.ai OpenAI-Compatible Pipe
author: Raiyan Hasan
version: 0.1.0
license: MIT
"""
import os
import json
from pydantic import BaseModel, Field
from openai import OpenAI
from typing import List, Union, Iterator
# Set DEBUG to True to enable detailed console logging of outgoing requests
# and of errors caught in pipe()/health_check().
DEBUG = False
class Pipe:
    """Manifold pipe exposing the Pollinations.ai OpenAI-compatible
    chat-completions endpoint (https://text.pollinations.ai/openai)
    as a single "openai" model.
    """

    class Valves(BaseModel):
        # Optional API key; the endpoint may not require one, but some
        # OpenAI client builds reject an empty key (see _init_client).
        openai_API_KEY: str = Field(default="")
        # Model identifier sent with every request.
        DEFAULT_MODEL: str = Field(default="openai")

    def __init__(self):
        self.id = "openai"
        self.type = "manifold"
        self.name = "openai: "
        # Valves are populated from the environment so deployments can be
        # configured without code changes.
        self.valves = self.Valves(
            **{
                "openai_API_KEY": os.getenv("openai_API_KEY", ""),
                "DEFAULT_MODEL": os.getenv("openai_DEFAULT_MODEL", "openai"),
            }
        )
        self._init_client()

    def _init_client(self):
        """Initialize the OpenAI client against the Pollinations.ai endpoint."""
        self.client = OpenAI(
            # Some clients require a non-empty API key even when the server
            # ignores it, hence the dummy fallback.
            api_key=self.valves.openai_API_KEY or "dummy-key",
            base_url="https://text.pollinations.ai/openai",
        )

    def get_openai_models(self) -> List[dict]:
        """Return available models - for openai we'll return a fixed list."""
        return [{"id": "openai", "name": "openai AI"}]

    def pipes(self) -> List[dict]:
        """Manifold hook: list the models this pipe makes selectable."""
        return self.get_openai_models()

    @staticmethod
    def _normalize_messages(raw_messages: list) -> List[dict]:
        """Flatten multi-part message content into single text strings.

        Image parts are replaced with a textual placeholder since the
        endpoint is addressed here as text-only.
        """
        messages = []
        for message in raw_messages:
            content = message.get("content")
            if isinstance(content, list):
                text_parts = []
                for part in content:
                    if part["type"] == "text":
                        text_parts.append(part["text"])
                    elif part["type"] == "image_url":
                        text_parts.append(f"[Image: {part['image_url']['url']}]")
                messages.append(
                    {"role": message["role"], "content": " ".join(text_parts)}
                )
            else:
                # Simple text message — pass through unchanged.
                messages.append({"role": message["role"], "content": content})
        return messages

    def pipe(self, body: dict) -> Union[str, Iterator[str]]:
        """Execute a chat completion against the endpoint.

        Returns the full response text, or a generator yielding text deltas
        when ``body["stream"]`` is truthy. Errors are returned as
        human-readable strings rather than raised.
        """
        try:
            # Use default model ID since openai has a single endpoint.
            model_id = self.valves.DEFAULT_MODEL
            messages = self._normalize_messages(body["messages"])
            if DEBUG:
                print("openai API request:")
                print(" Model:", model_id)
                print(" Messages:", json.dumps(messages, indent=2))
            # BUG FIX: the original defaulted "stream" to True when building
            # the request kwargs but to False when choosing the return path,
            # so an unspecified "stream" sent a streaming request and then
            # tried to read it as a plain response. Resolve the flag once and
            # use it consistently for both.
            stream = bool(body.get("stream", False))
            kwargs = {
                "model": model_id,
                "messages": messages,
                "temperature": body.get("temperature", 0.7),
                "top_p": body.get("top_p", 0.9),
                "max_tokens": body.get("max_tokens", 8192),
                "stream": stream,
            }
            # Add stop sequences if provided.
            if body.get("stop"):
                kwargs["stop"] = body["stop"]
            if stream:

                def stream_generator():
                    try:
                        response = self.client.chat.completions.create(**kwargs)
                        for chunk in response:
                            # Guard: some servers emit keep-alive chunks with
                            # an empty choices list or a None delta.
                            if chunk.choices and chunk.choices[0].delta.content:
                                yield chunk.choices[0].delta.content
                    except Exception as e:
                        if DEBUG:
                            print(f"Streaming error: {e}")
                        yield f"Error during streaming: {str(e)}"

                return stream_generator()
            response = self.client.chat.completions.create(**kwargs)
            return response.choices[0].message.content
        except Exception as e:
            if DEBUG:
                print(f"Error in pipe method: {e}")
            return f"Error: {e}"

    def health_check(self) -> bool:
        """Return True if the endpoint answers a minimal completion request."""
        try:
            # Simple health check with a basic prompt; the reply content is
            # irrelevant — only that the call succeeds.
            self.client.chat.completions.create(
                model=self.valves.DEFAULT_MODEL,
                messages=[{"role": "user", "content": "Hello"}],
                max_tokens=5,
            )
            return True
        except Exception as e:
            if DEBUG:
                print(f"Health check failed: {e}")
            return False