Whitepaper
Docs
Sign In
Function
Function
pipe
v1.1.0
Azure AI for DeepSeek-R1
Function ID
azure_ai_for_deepseek_r1
Creator
@owndev
Downloads
2.4K+
OBSOLETE: Please use: https://openwebui.com/f/owndev/azure_ai
Get
README
No README available
Function Code
Show
""" OBSOLETE # Please use: https://openwebui.com/f/owndev/azure_ai --- title: Azure AI Foundry Pipeline for DeepSeek-R1 author: owndev author_url: https://github.com/owndev project_url: https://github.com/owndev/Open-WebUI-Functions funding_url: https://github.com/owndev/Open-WebUI-Functions version: 1.1.0 license: MIT description: A pipeline for interacting with DeepSeek-R1 in Azure AI services. features: - Supports dynamic model specification via headers. - Filters valid parameters to ensure clean requests. - Handles streaming and non-streaming responses. - Provides flexible timeout and error handling mechanisms. - Compatible with Azure AI DeepSeek-R1. """ from typing import List, Union, Generator, Iterator, Optional, Dict, Any from fastapi.responses import StreamingResponse from pydantic import BaseModel, Field from starlette.background import BackgroundTask from open_webui.env import AIOHTTP_CLIENT_TIMEOUT, SRC_LOG_LEVELS import aiohttp import json import os import logging # Helper functions async def cleanup_response( response: Optional[aiohttp.ClientResponse], session: Optional[aiohttp.ClientSession], ) -> None: """ Clean up the response and session objects. Args: response: The ClientResponse object to close session: The ClientSession object to close """ if response: response.close() if session: await session.close() class Pipe: # Environment variables for API key, endpoint, and optional model class Valves(BaseModel): # API key for Azure AI AZURE_AI_API_KEY: str = Field( default=os.getenv("AZURE_AI_API_KEY", "API_KEY"), description="API key for Azure AI" ) # Endpoint for DeepSeek-R1 in Azure AI (e.g. "https://<your-endpoint>.eastus2.models.ai.azure.com/chat/completions") AZURE_AI_ENDPOINT: str = Field( default=os.getenv("AZURE_AI_ENDPOINT", "https://<your-endpoint>.eastus2.models.ai.azure.com/chat/completions"), description="Endpoint for DeepSeek-R1 in Azure AI" ) def __init__(self): self.valves = self.Valves() self.name: str = "Azure AI: DeepSeek-R1" self.validate_environment() def validate_environment(self): """ Validates that required environment variables are set. Raises: ValueError: If required environment variables are not set. """ if not self.valves.AZURE_AI_API_KEY: raise ValueError("AZURE_AI_API_KEY is not set!") if not self.valves.AZURE_AI_ENDPOINT: raise ValueError("AZURE_AI_ENDPOINT is not set!") def get_headers(self) -> dict: """ Constructs the headers for the API request, including the model name if defined. Returns: Dictionary containing the required headers for the API request. """ headers = { "Authorization": "Bearer " + self.valves.AZURE_AI_API_KEY, "Content-Type": "application/json" } return headers def validate_body(self, body: Dict[str, Any]) -> None: """ Validates the request body to ensure required fields are present. Args: body: The request body to validate Raises: ValueError: If required fields are missing or invalid. """ if "messages" not in body or not isinstance(body["messages"], list): raise ValueError("The 'messages' field is required and must be a list.") async def pipe(self, body: Dict[str, Any]) -> Union[str, Generator, Iterator, Dict[str, Any], StreamingResponse]: """ Main method for sending requests to the Azure AI endpoint. The model name is passed as a header if defined. Args: body: The request body containing messages and other parameters Returns: Response from Azure AI API, which could be a string, dictionary or streaming response """ log = logging.getLogger("azure_ai_deepseek_r1.pipe") log.setLevel(SRC_LOG_LEVELS["OPENAI"]) # Validate the request body self.validate_body(body) # Construct headers headers = self.get_headers() # Filter allowed parameters allowed_params = { "messages", "frequency_penalty", "max_tokens", "presence_penalty", "response_format", "seed", "stop", "stream", "temperature", "tool_choice", "tools", "top_p", } filtered_body = {k: v for k, v in body.items() if k in allowed_params} # Convert the modified body back to JSON payload = json.dumps(filtered_body) request = None session = None streaming = False response = None try: session = aiohttp.ClientSession( trust_env=True, timeout=aiohttp.ClientTimeout(total=AIOHTTP_CLIENT_TIMEOUT), ) request = await session.request( method="POST", url=self.valves.AZURE_AI_ENDPOINT, data=payload, headers=headers, ) # Check if response is SSE if "text/event-stream" in request.headers.get("Content-Type", ""): streaming = True return StreamingResponse( request.content, status_code=request.status, headers=dict(request.headers), background=BackgroundTask( cleanup_response, response=request, session=session ), ) else: try: response = await request.json() except Exception as e: log.error(f"Error parsing JSON response: {e}") response = await request.text() request.raise_for_status() return response except Exception as e: log.exception(f"Error in Azure AI request: {e}") detail = f"Exception: {str(e)}" if isinstance(response, dict): if "error" in response: detail = f"{response['error']['message'] if 'message' in response['error'] else response['error']}" elif isinstance(response, str): detail = response return f"Error: {detail}" finally: if not streaming and session: if request: request.close() await session.close()