We're Hiring!
Whitepaper
Docs
Sign In
@juriaan
·
10 months ago
·
10 months ago
function
Deepseek R1 Asynchronous Azure AI Foundry
Get
Last Updated
10 months ago
Created
10 months ago
Function
pipe
Name
Deepseek R1 Asynchronous Azure AI Foundry
Downloads
84+
Saves
0+
Description
Asynchronous request processing
Function Code
Show
""" Azure AI Foundry Parallel Pipeline for DeepSeek-R1 Version: 1.2.0 Updates: - Enhanced streaming performance - Smoother output formatting - Better connection management """ from typing import Union, AsyncGenerator, Optional from pydantic import BaseModel, Field import aiohttp import asyncio import os class Pipe: class Valves(BaseModel): AZURE_AI_API_KEY: str = Field( default=os.getenv("AZURE_AI_API_KEY", "API_KEY"), description="Azure AI API key", ) AZURE_AI_ENDPOINT: str = Field( default=os.getenv( "AZURE_AI_ENDPOINT", "https://<your-endpoint>.eastus2.models.ai.azure.com/chat/completions", ), description="Azure AI endpoint URL", ) MAX_CONCURRENT_REQUESTS: int = Field( default=os.getenv("MAX_CONCURRENT_REQUESTS", 10), description="Max concurrent requests", ) TIMEOUT: int = Field( default=os.getenv("TIMEOUT", 600), description="Request timeout in seconds" ) def __init__(self): self.valves = self.Valves() self.validate_environment() self.semaphore = asyncio.Semaphore(self.valves.MAX_CONCURRENT_REQUESTS) self.session: Optional[aiohttp.ClientSession] = None async def __aenter__(self): await self.create_session() return self async def __aexit__(self, *exc): await self.close_session() async def create_session(self): self.session = aiohttp.ClientSession( headers=self.get_headers(), timeout=aiohttp.ClientTimeout(total=self.valves.TIMEOUT), ) async def close_session(self): if self.session and not self.session.closed: await self.session.close() def validate_environment(self): if "models.ai.azure.com" not in self.valves.AZURE_AI_ENDPOINT: raise ValueError("Invalid endpoint URL format!") def get_headers(self) -> dict: return { "api-key": self.valves.AZURE_AI_API_KEY, "Content-Type": "application/json", "x-ms-model-mesh-model-name": "DeepSeek-R1", } async def process_request(self, filtered_body: dict) -> aiohttp.ClientResponse: async with self.semaphore: if not self.session or self.session.closed: await self.create_session() return await self.session.post( 
url=self.valves.AZURE_AI_ENDPOINT, json=filtered_body, ) async def pipe(self, body: dict) -> Union[dict, AsyncGenerator[str, None]]: self.validate_body(body) filtered_body = self.filter_parameters(body) try: response = await self.process_request(filtered_body) response.raise_for_status() if filtered_body.get("stream"): return self.stream_response(response) return await response.json() except aiohttp.ClientError as e: return {"error": f"Client error: {str(e)}"} except asyncio.TimeoutError: return {"error": "Request timed out"} async def stream_response( self, response: aiohttp.ClientResponse ) -> AsyncGenerator[str, None]: try: async for chunk in response.content.iter_chunked(8192): # 8KB chunks if not chunk: continue decoded = chunk.decode("utf-8").strip() if not decoded: continue # Split multiple data chunks in single read for data_chunk in decoded.split("data: "): if data_chunk.strip(): yield f"data: {data_chunk.strip()}\n\n" # Allow event loop to process other tasks await asyncio.sleep(0.001) except Exception as e: yield f"data: {{'error': 'Stream error: {str(e)}'}}\n\n" finally: await response.release() def filter_parameters(self, body: dict) -> dict: allowed_params = { "messages", "temperature", "top_p", "max_tokens", "stream", "tools", "tool_choice", "response_format", "frequency_penalty", "presence_penalty", } return {k: v for k, v in body.items() if k in allowed_params} def validate_body(self, body: dict): if not body.get("messages"): raise ValueError("Request body must contain 'messages' array") # Usage Example async def main(): async with Pipe() as pipe: response = await pipe.pipe( {"messages": [{"role": "user", "content": "Hello"}], "stream": True} ) if isinstance(response, AsyncGenerator): async for chunk in response: print(chunk, end="", flush=True) # Immediate output else: print(response) if __name__ == "__main__": asyncio.run(main())
Sponsored by Open WebUI Inc.
We are hiring!
Shape the way humanity engages with intelligence.