Vertex AI Streaming
@feynman · a year ago

Function · pipe · v0.0.1
Name: Vertex AI Streaming
Last Updated: a year ago
Created: a year ago
Downloads: 238+
Saves: 0+

Description
Allows access to Vertex AI models, with streaming (forked from https://openwebui.com/f/aa995/vertex_ai/).

Function Code
""" title: Multi-Model Manifold Pipe author: Feynman Liang (feynman@blueteam.ai) version: 0.0.1 license: MIT """ import json from pydantic import BaseModel, Field from typing import Union, Iterator, List from google.oauth2 import service_account from google.auth.transport.requests import AuthorizedSession # Set DEBUG to True to enable detailed logging DEBUG = True class Pipe: class Valves(BaseModel): PROJECT_ID: str = Field( default="", # Set this to your Project ID description="Your Google Cloud Project ID.", ) LOCATION: str = Field( default="", description="Default region for Vertex AI models." ) SERVICE_ACCOUNT_JSON: str = Field( default="", description="Service Account JSON" ) def __init__(self): self.id = "vertex_ai_streaming" # Set this to your function's ID in OpenWebUI self.type = "manifold" self.name = "Vertex AI Models" self.valves = self.Valves() self.authed_session = None def _ensure_auth(self): credentials_info = json.loads(self.valves.SERVICE_ACCOUNT_JSON) self.credentials = service_account.Credentials.from_service_account_info( credentials_info, scopes=["https://www.googleapis.com/auth/cloud-platform"] ) self.authed_session = AuthorizedSession(self.credentials) def pipes(self) -> List[dict]: # Return a list of models return [ {"id": "gemini-1.5-pro-002", "name": "Gemini 1.5 Pro (Feynman Dev)"}, # {"id": "claude-3-5-sonnet-v2@20241022", "name": "Claude 3.5 Sonnet V2"}, # {"id": "claude-3-haiku@20240307", "name": "Claude 3 Haiku"}, ] def pipe(self, body: dict) -> Union[str, Iterator[str]]: self._ensure_auth() project_id = self.valves.PROJECT_ID default_location = self.valves.LOCATION model_id = body.get("model") stream = body.get("stream", False) if not model_id: return "Error: No model specified." if DEBUG: print(f"Original model_id: {model_id}") # Remove the function ID prefix if present function_id_with_dot = f"{self.id}." 
if model_id.startswith(function_id_with_dot): model_id = model_id[len(function_id_with_dot) :] if DEBUG: print( f"Removed prefix '{function_id_with_dot}' from model_id, new model_id: {model_id}" ) else: if DEBUG: print("No prefix to remove from model_id.") if DEBUG: print(f"Using model_id: {model_id}") try: # Determine which model is selected and call the appropriate method if model_id.startswith("gemini"): return self.call_gemini_model( body, project_id, default_location, model_id ) elif model_id.startswith("claude"): # Use 'us-east5' for Claude models return self.call_claude_model(body, project_id, "us-east5", model_id) elif model_id.startswith("llama"): return self.call_llama_model( body, project_id, default_location, model_id ) else: return f"Error: Unknown model {model_id}" except Exception as e: if DEBUG: print(f"Error in pipe method: {e}") return f"Error: {e}" def call_gemini_model(self, body, project_id, location, model_id): url = f"https://{location}-aiplatform.googleapis.com/v1beta1/projects/{project_id}/locations/{location}/endpoints/openapi/chat/completions" stream = body.get("stream", False) # Prepare the prompt messages messages = body["messages"] formatted_messages = self.format_messages(messages) # Prepare request data data = { "model": f"google/{model_id}", "messages": formatted_messages, "temperature": body.get("temperature", 0.7), "max_tokens": body.get("max_tokens", 512), "top_p": body.get("top_p", 0.8), "top_k": body.get("top_k", 40), "stream": stream, } if DEBUG: print("Request URL:", url) print("Request Data:", json.dumps(data, indent=2)) if stream: return self._stream_response(url, data) else: # Make the POST request response = self.authed_session.post(url, json=data) # Check the response if response.status_code == 200: response_data = response.json() if DEBUG: print("Response Data:", json.dumps(response_data, indent=2)) if "choices" in response_data and len(response_data["choices"]) > 0: # Extract the generated text generated_text = response_data["choices"][0]["message"]["content"] return generated_text else: if DEBUG: print("No choices found in the response.") print("Response Data:", json.dumps(response_data, indent=2)) return "No response generated by the model." 
else: if DEBUG: print(f"Error: {response.status_code}") print("Response Headers:", response.headers) print("Response Content:", response.text) return f"Error: {response.status_code} {response.text}" def _stream_response(self, url: str, data: dict) -> Iterator[str]: """Stream the response from the API.""" response = self.authed_session.post(url, json=data, stream=True) if response.status_code != 200: if DEBUG: print(f"Error: {response.status_code}") print("Response Headers:", response.headers) print("Response Content:", response.text) yield f"Error: {response.status_code} {response.text}" return for line in response.iter_lines(): if line: try: json_response = json.loads(line.decode("utf-8").lstrip("data: ")) if DEBUG: print("Streaming chunk:", json_response) if "choices" in json_response and len(json_response["choices"]) > 0: delta = json_response["choices"][0].get("delta", {}) if "content" in delta: yield delta["content"] except json.JSONDecodeError as e: if DEBUG: print(f"Error decoding JSON: {e}") continue except Exception as e: if DEBUG: print(f"Error processing stream: {e}") continue def call_claude_model(self, body, project_id, location, model_id): url = f"https://{location}-aiplatform.googleapis.com/v1/projects/{project_id}/locations/{location}/publishers/anthropic/models/{model_id}:predict" stream = body.get("stream", False) # Prepare the prompt messages messages = body["messages"] formatted_messages = self.format_claude_messages(messages) # Prepare request data data = { "instances": [ { "messages": formatted_messages, "anthropic_version": "vertex-2023-10-16", } ], "parameters": { "temperature": body.get("temperature", 0.7), "max_output_tokens": body.get("max_tokens", 512), "top_p": body.get("top_p", 0.8), "top_k": body.get("top_k", 40), "stream": stream, }, } if DEBUG: print("Request URL:", url) print("Request Data:", json.dumps(data, indent=2)) if stream: return self._stream_claude_response(url, data) else: # Make the POST request response = self.authed_session.post(url, json=data) # Check the response if response.status_code == 200: response_data = response.json() if DEBUG: print("Response Data:", json.dumps(response_data, indent=2)) if ( "predictions" in response_data and len(response_data["predictions"]) > 0 ): generated_text = response_data["predictions"][0].get("content", "") return generated_text else: if DEBUG: print("No predictions found in the response.") print("Response Data:", json.dumps(response_data, indent=2)) return "No response generated by the model." 
else: if DEBUG: print(f"Error: {response.status_code}") print("Response Headers:", response.headers) print("Response Content:", response.text) return f"Error: {response.status_code} {response.text}" def _stream_claude_response(self, url: str, data: dict) -> Iterator[str]: """Stream the response from the Claude API.""" response = self.authed_session.post(url, json=data, stream=True) if response.status_code != 200: if DEBUG: print(f"Error: {response.status_code}") print("Response Headers:", response.headers) print("Response Content:", response.text) yield f"Error: {response.status_code} {response.text}" return for line in response.iter_lines(): if line: try: json_response = json.loads(line.decode("utf-8").lstrip("data: ")) if DEBUG: print("Streaming chunk:", json_response) if ( "predictions" in json_response and len(json_response["predictions"]) > 0 ): chunk = json_response["predictions"][0].get("content", "") if chunk: yield chunk except json.JSONDecodeError as e: if DEBUG: print(f"Error decoding JSON: {e}") continue except Exception as e: if DEBUG: print(f"Error processing stream: {e}") continue def format_messages(self, messages): formatted_messages = [] for msg in messages: role = msg.get("role") content = msg.get("content") if role == "user": formatted_messages.append({"role": "user", "content": content}) elif role == "assistant": formatted_messages.append({"role": "assistant", "content": content}) elif role == "system": formatted_messages.append({"role": "system", "content": content}) return formatted_messages def format_claude_messages(self, messages): # For Claude models, messages might need to be formatted differently formatted_messages = [] for msg in messages: role = msg.get("role") content = msg.get("content") formatted_messages.append({"role": role, "content": content}) return formatted_messages