Whitepaper
Docs
Sign In
Function
Function
pipe
v0.1.0
Browser-Use Pipe (Operator)
Function ID
operator
Creator
@mballesterosc
Downloads
308+
A Browser-Use alternative to OpenAI's Operator for controlling your browser with AI agents
Get
README
No README available
Function Code
Show
""" title: Browser Agent Pipe author: Manuel Ballesteros Cardador version: 0.1.0 requirements: playwright,langchain-openai,browser-use Visit https://github.com/open-webui/open-webui/discussions/10384 for error installing browser-use """ from typing import Optional, Callable, Awaitable, List, Dict from pydantic import BaseModel, Field import os import asyncio import time from dataclasses import dataclass from langchain_openai import ChatOpenAI from browser_use import Agent from browser_use.browser.browser import Browser, BrowserConfig from langchain_google_genai import ChatGoogleGenerativeAI @dataclass class ActionResult: is_done: bool extracted_content: Optional[str] error: Optional[str] include_in_memory: bool @dataclass class AgentHistoryList: all_results: List[ActionResult] all_model_outputs: List[Dict] class Pipe: class Valves(BaseModel): openai_api_key: str = Field( default="", description="OpenAI API key para el agente de navegador", ) model: str = Field( default="gpt-4o", description="Modelo a usar (ej: 'gpt-4o', 'gpt-4', 'gpt-3.5-turbo')", ) headless: bool = Field( default=True, description="Ejecutar navegador en modo headless", ) emit_interval: float = Field( default=2.0, description="Intervalo entre actualizaciones de estado en segundos", ) enable_status_indicator: bool = Field( default=True, description="Habilitar indicador de estado", ) def __init__(self): self.type = "pipe" self.id = "browser_agent" self.name = "Browser Agent" self.valves = self.Valves() self.last_emit_time = 0 self.current_task: Optional[asyncio.Task] = None async def emit_status( self, emitter: Callable[[dict], Awaitable[None]], level: str, message: str, done: bool, ): current_time = time.time() if ( emitter and self.valves.enable_status_indicator and ( current_time - self.last_emit_time >= self.valves.emit_interval or done ) ): await emitter( { "type": "status", "data": { "status": "complete" if done else "in_progress", "level": level, "description": message, "done": done, }, } ) 
self.last_emit_time = current_time async def run_browser_task(self, task: str) -> str: """Ejecuta la tarea en segundo plano con manejo de errores""" if not self.valves.openai_api_key.strip(): return "❌ Error: API key de OpenAI requerida" os.environ["OPENAI_API_KEY"] = self.valves.openai_api_key try: browser = Browser( config=BrowserConfig( headless=False, ) ) agent = Agent( task=task, llm=ChatGoogleGenerativeAI( model=self.valves.model, api_key=self.valves.openai_api_key ), # ChatOpenAI(model=self.valves.model), browser=browser, ) return await agent.run() await browser.close() except Exception as e: return f"❌ Error crítico: {str(e)}" def _parse_results(self, result) -> str: parsed = [] process_steps = [] final_result = None # Debug: Imprimir tipo de resultado print(f"Tipo de resultado recibido: {type(result)}") if isinstance(result, AgentHistoryList): print(f"Número de acciones en historial: {len(result.all_results)}") for i, action_result in enumerate(result.all_results, 1): # Debug: Imprimir metadatos de cada acción print(f"Acción {i}:") print(f"Done: {action_result.is_done}") print( f"Contenido: {action_result.extracted_content[:50] if action_result.extracted_content else None}" ) print(f"Error: {action_result.error}") # Capturar resultado final if action_result.is_done: final_result = action_result.extracted_content print(f"Resultado final detectado en acción {i}") # Construir pasos del proceso step_content = [] if action_result.extracted_content: step_content.append( f"**Paso {i}:** {action_result.extracted_content}" ) if action_result.error and action_result.error != "None": step_content.append(f"❌ _Error: {action_result.error}_") if step_content: process_steps.append("\n".join(step_content)) # Fallback para string representation else: str_result = str(result) print("Resultado en formato string:") print(str_result[:500]) # Primeros 500 caracteres para debug sections = str_result.split("ActionResult(") for i, section in enumerate(sections[1:], 1): # Extraer 
done is_done = "is_done=True" in section content = error = None if "extracted_content=" in section: content = ( section.split("extracted_content=")[1].split(",")[0].strip("'") ) if "error=" in section: error = section.split("error=")[1].split(",")[0].strip("'") if is_done: final_result = content print(f"Resultado final detectado en acción {i} (string fallback)") step_content = [] if content: step_content.append(f"**Paso {i}:** {content}") if error and error != "None": step_content.append(f"❌ _Error: {error}_") if step_content: process_steps.append("\n".join(step_content)) # Construir respuesta final output = [] if final_result: output.append(f"## 🔍 Resultado Final\n\n{final_result}") if process_steps: output.append( "\n\n## 📋 Detalles del Proceso\n\n" + "\n\n".join(process_steps) ) return ( "\n\n".join(output) if output else "✅ Tarea completada sin resultados visibles" ) async def pipe( self, body: dict, user: Optional[dict] = None, __event_emitter__: Callable[[dict], Awaitable[None]] = None, __event_call__: Callable[[dict], Awaitable[dict]] = None, ) -> Optional[dict]: messages = body.get("messages", []) if not messages: error = "No se encontraron mensajes" await self.emit_status(__event_emitter__, "error", error, True) body["messages"].append({"role": "assistant", "content": error}) return body try: user_message = messages[-1]["content"] # Debug: Inicio de proceso print(f"\n{'='*50}\nIniciando tarea con mensaje: {user_message}") print(f"Modelo: {self.valves.model}") print( f"API Key: {'configured' if self.valves.openai_api_key else 'missing'}" ) await self.emit_status( __event_emitter__, "info", "🚀 Iniciando navegador...", False, ) self.current_task = asyncio.create_task(self.run_browser_task(user_message)) raw_result = await self.current_task # Debug: Resultado crudo print("\nResultado crudo recibido:") print(f"Tipo: {type(raw_result)}") print(f"Contenido: {str(raw_result)[:500]}...") # Primeros 500 caracteres parsed_result = self._parse_results(raw_result) # 
Debug: Resultado parseado print("\nResultado parseado:") print(parsed_result[:1000]) # Primeros 1000 caracteres formatted_output = f"{parsed_result}" await self.emit_status( __event_emitter__, "success", "🏁 Tarea completada", True ) body["messages"].append({"role": "assistant", "content": formatted_output}) return formatted_output except Exception as e: error = f"## ❌ Error en el pipe\n\n{str(e)}" print(f"\nError en pipe: {str(e)}") await self.emit_status(__event_emitter__, "error", error, True) body["messages"].append({"role": "assistant", "content": error}) return body finally: self.current_task = None