Whitepaper
Docs
Sign In
Tool
Tool
v2.1.0
docker
Tool ID
docker
Creator
@sinaimhw
Downloads
127+
control docker via ssh
Get
README
No README available
Tool Code
Show
""" title: Remote Docker Management Tool author: Open WebUI Security Team version: 2.1.0 license: MIT description: Secure SSH-based Docker management with connection pooling and audit logging requirements: docker, paramiko """ import paramiko import logging import asyncio import os from typing import Optional, Callable from pydantic import BaseModel, Field # Configure logging logging.basicConfig(level=logging.INFO) class Tools: class Valves(BaseModel): SSH_HOST: str = Field( default=os.getenv("SSH_HOST", "108.181.152.141"), description="SSH server hostname or IP address", ) SSH_USER: str = Field( default=os.getenv("SSH_USER", "admin"), description="SSH username" ) SSH_PORT: int = Field(default=22, ge=1, le=65535, description="SSH port number") USE_SSH_KEY: bool = Field( default=os.getenv("USE_SSH_KEY", "false").lower() == "true", description="Use SSH key instead of password", ) CONTAINER_MANAGEMENT: bool = Field( default=True, description="Enable container operations" ) IMAGE_MANAGEMENT: bool = Field( default=True, description="Enable image operations" ) AUDIT_LOGGING: bool = Field(default=True, description="Enable command auditing") class UserValves(BaseModel): SSH_PASSWORD: Optional[str] = Field( default=os.getenv("SSH_PASSWORD", ""), description="SSH password (if not using SSH key)", ) SESSION_TIMEOUT: int = Field( default=300, description="SSH session timeout in seconds" ) SSH_KEY_PATH: Optional[str] = Field( default=os.getenv("SSH_KEY_PATH", "~/.ssh/id_rsa"), description="Path to SSH private key (if using SSH key authentication)", ) def __init__(self): self.valves = self.Valves() self.user_valves = self.UserValves() self.ssh_client = None self._connect_ssh() def _connect_ssh(self): """Establishes SSH connection with authentication pooling""" if ( not self.ssh_client or self.ssh_client.get_transport() is None or not self.ssh_client.get_transport().is_active() ): self.ssh_client = paramiko.SSHClient() self.ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) try: if self.valves.USE_SSH_KEY: private_key = paramiko.RSAKey( filename=os.path.expanduser(self.user_valves.SSH_KEY_PATH) ) self.ssh_client.connect( hostname=self.valves.SSH_HOST, port=self.valves.SSH_PORT, username=self.valves.SSH_USER, pkey=private_key, timeout=10, ) else: self.ssh_client.connect( hostname=self.valves.SSH_HOST, port=self.valves.SSH_PORT, username=self.valves.SSH_USER, password=self.user_valves.SSH_PASSWORD, timeout=10, ) logging.info("SSH connection established successfully") except Exception as e: logging.error(f"SSH connection failed: {str(e)}") raise def _execute_remote(self, command: str) -> str: """Execute a remote command securely""" if self.valves.AUDIT_LOGGING: logging.info(f"Executing remote command: {command}") try: stdin, stdout, stderr = self.ssh_client.exec_command(command) exit_status = stdout.channel.recv_exit_status() output = stdout.read().decode().strip() errors = stderr.read().decode().strip() if exit_status != 0: return f"Error ({exit_status}): {errors}" return output or "Command executed successfully" except Exception as e: return f"Execution error: {str(e)}" async def _docker_command( self, operation: str, target: str = "", __event_emitter__: Optional[Callable] = None, ) -> str: """Executes Docker commands securely over SSH""" valid_operations = { "list-containers": "docker ps -a", "start": f"docker start {target}", "stop": f"docker stop {target}", "remove-container": f"docker rm {target}", "pull-image": f"docker pull {target}", "remove-image": f"docker rmi {target}", } if operation not in valid_operations: return "❌ Invalid Docker operation" if __event_emitter__: await __event_emitter__( { "type": "status", "data": {"description": f"Executing {operation}...", "done": False}, } ) result = self._execute_remote(valid_operations[operation]) if __event_emitter__: await __event_emitter__( { "type": "status", "data": {"description": f"Completed {operation}", "done": True}, } ) return result # Container Operations async def list_containers( self, __event_emitter__: Optional[Callable] = None ) -> str: return await self._docker_command( "list-containers", __event_emitter__=__event_emitter__ ) async def start_container( self, container_id: str, __event_emitter__: Optional[Callable] = None ) -> str: return await self._docker_command("start", container_id, __event_emitter__) async def stop_container( self, container_id: str, __event_emitter__: Optional[Callable] = None ) -> str: return await self._docker_command("stop", container_id, __event_emitter__) async def remove_container( self, container_id: str, __event_emitter__: Optional[Callable] = None ) -> str: return await self._docker_command( "remove-container", container_id, __event_emitter__ ) # Image Operations async def pull_image( self, image_name: str, __event_emitter__: Optional[Callable] = None ) -> str: return await self._docker_command("pull-image", image_name, __event_emitter__) async def remove_image( self, image_name: str, __event_emitter__: Optional[Callable] = None ) -> str: return await self._docker_command("remove-image", image_name, __event_emitter__) def __del__(self): """Cleans up SSH connection""" if self.ssh_client: self.ssh_client.close() logging.info("SSH connection closed") # Example Usage if __name__ == "__main__": async def event_logger(event: dict): logging.info(f"Event: {event}") async def main(): tool = Tools() try: # List containers print(await tool.list_containers(event_logger)) except Exception as e: logging.error(f"Operation failed: {str(e)}") import asyncio asyncio.run(main())