"""
id: run_code
title: Run code
description: Run arbitrary Python or Bash code safely in a gVisor sandbox.
author: EtiennePerot
author_url: https://github.com/EtiennePerot/open-webui-code-execution
funding_url: https://github.com/EtiennePerot/open-webui-code-execution
version: 0.5.0
license: Apache-2.0
"""
# NOTE: If running Open WebUI in a container, you *need* to set up this container to allow sandboxed code execution.
# Please read the docs here:
#
# https://github.com/EtiennePerot/open-webui-code-execution/blob/master/README.md
#
# This is an OpenWebUI *function*. It can run code within LLM-generated code blocks.
# If you are looking for an OpenWebUI *tool* to allow the LLM to run its own code,
# see here instead: https://openwebui.com/t/etienneperot/run_code/
#
# See https://github.com/EtiennePerot/open-webui-code-execution for more info.
#
# Protip: You can test this tool manually by running it as a Python script, like so:
# (Run this inside the Open WebUI container)
#
# python3 run_code.py --self_test
#
# This will simulate that OpenWebUI would do if it asked this tool to evaluate the Python code `print("Hello world!")`.
# This can be useful when setting up this tool to verify that it works in your environment.
# You can also use it for one-off code execution like this:
#
# echo 'print("Hello world!")' | python3 run_code.py
#
import asyncio
import argparse
import base64
import contextlib
import copy
import fcntl
import json
import hashlib
import inspect
import mimetypes
import os
import os.path
import platform
import pydantic
import re
import shutil
import stat
import subprocess
import sys
import tempfile
import time
import typing
import urllib.parse
import urllib.request
import uuid
class _Action:
class Valves(pydantic.BaseModel):
_VALVE_OVERRIDE_ENVIRONMENT_VARIABLE_NAME_PREFIX = "CODE_EVAL_VALVE_OVERRIDE_"
NETWORKING_ALLOWED: bool = pydantic.Field(
default=True,
description=f"Whether to allow network access during code execution; may be overridden by environment variable {_VALVE_OVERRIDE_ENVIRONMENT_VARIABLE_NAME_PREFIX}NETWORKING_ALLOWED.",
)
MAX_RUNTIME_SECONDS: int = pydantic.Field(
ge=1,
default=30,
description=f"Maximum number of seconds code is given to run; may be overridden by environment variable {_VALVE_OVERRIDE_ENVIRONMENT_VARIABLE_NAME_PREFIX}MAX_RUNTIME_SECONDS.",
)
MAX_RAM_MEGABYTES: int = pydantic.Field(
ge=0,
default=128,
description=f"Maximum number of megabytes that the interpreter has when running. Must run as root with host cgroups writable (`--mount=type=bind,source=/sys/fs/cgroup,target=/sys/fs/cgroup,readonly=false`) for this to work. Set to 0 to disable memory limits. May be overridden by environment variable {_VALVE_OVERRIDE_ENVIRONMENT_VARIABLE_NAME_PREFIX}MAX_RAM_MEGABYTES",
)
AUTO_INSTALL: bool = pydantic.Field(
default=True,
description=f"Whether to automatically install gVisor if not installed on the system; may be overridden by environment variable {_VALVE_OVERRIDE_ENVIRONMENT_VARIABLE_NAME_PREFIX}AUTO_INSTALL.",
)
DEBUG: bool = pydantic.Field(
default=False,
description=f"Whether to produce debug logs during execution; may be overridden by environment variable {_VALVE_OVERRIDE_ENVIRONMENT_VARIABLE_NAME_PREFIX}DEBUG.",
)
MAX_FILES_PER_EXECUTION: int = pydantic.Field(
ge=1,
default=32,
description=f"Maximum number of generated files to allow per code execution; may be overridden by environment variable {_VALVE_OVERRIDE_ENVIRONMENT_VARIABLE_NAME_PREFIX}MAX_FILES_PER_EXECUTION.",
)
MAX_FILES_PER_USER: int = pydantic.Field(
ge=1,
default=4096,
description=f"Maximum number of files to keep around for a given user; may be overridden by environment variable {_VALVE_OVERRIDE_ENVIRONMENT_VARIABLE_NAME_PREFIX}MAX_FILES_PER_USER.",
)
MAX_MEGABYTES_PER_USER: int = pydantic.Field(
ge=1,
default=256,
description=f"Maximum total size of files to keep around for a given user; may be overridden by environment variable {_VALVE_OVERRIDE_ENVIRONMENT_VARIABLE_NAME_PREFIX}MAX_MEGABYTES_PER_USER.",
)
WEB_ACCESSIBLE_DIRECTORY_PATH: str = pydantic.Field(
default="/data/cache/functions/run_code",
description=f"Path of the directory to write files that should be accessible for user download in; may be overridden by environment variable {_VALVE_OVERRIDE_ENVIRONMENT_VARIABLE_NAME_PREFIX}WEB_ACCESSIBLE_DIRECTORY_PATH.",
)
WEB_ACCESSIBLE_DIRECTORY_URL: str = pydantic.Field(
default="/cache/functions/run_code",
description=f"URL corresponding to WEB_ACCESSIBLE_DIRECTORY_PATH. May start with '/' to make it relative to the Open WebUI serving domain. may be overridden by environment variable {_VALVE_OVERRIDE_ENVIRONMENT_VARIABLE_NAME_PREFIX}WEB_ACCESSIBLE_DIRECTORY_URL.",
)
def __init__(self, valves):
self.valves = valves
for valve_name, valve_value in valves.dict().items():
override = os.getenv(
self.valves._VALVE_OVERRIDE_ENVIRONMENT_VARIABLE_NAME_PREFIX
+ valve_name
)
if override is None:
continue
try:
if type(valve_value) is type(True):
assert override.lower() in (
"true",
"false",
), 'Value must be "true" or "false"'
override = override.lower() == "true"
elif type(valve_value) is type(42):
override = int(override)
elif type(valve_value) is type(""):
pass
else:
valve_value_type = type(valve_value)
raise ValueError(f"Unknown valve type: {valve_value_type}")
except Exception as e:
raise ValueError(
f"Valve override {self.valves._VALVE_OVERRIDE_ENVIRONMENT_VARIABLE_NAME_PREFIX}{valve_name}={valve_value}: bad value: {e}"
)
else:
setattr(self.valves, valve_name, override)
async def action(
self,
body: dict,
__event_emitter__: typing.Callable[[dict], typing.Any] = None,
__id__: typing.Optional[str] = None,
__user__: typing.Optional[dict] = None,
) -> typing.Optional[dict]:
valves = self.valves
debug = valves.DEBUG
emitter = self._EventEmitter(__event_emitter__, debug=debug)
storage = self._UserStorage(
storage_root_path=os.path.join(
valves.WEB_ACCESSIBLE_DIRECTORY_PATH, "user_files"
),
storage_root_url=valves.WEB_ACCESSIBLE_DIRECTORY_URL.rstrip("/")
+ "/user_files",
__user__=__user__,
max_files_per_user=valves.MAX_FILES_PER_USER,
max_bytes_per_user=valves.MAX_MEGABYTES_PER_USER * 1024 * 1024,
)
async def _fail(error_message, status="SANDBOX_ERROR"):
await emitter.fail(error_message)
await emitter.code_execution_result(f"{status}: {error_message}")
return json.dumps({"status": status, "output": error_message})
await emitter.status("Checking messages for code blocks...")
if len(body.get("messages", ())) == 0:
return await _fail("No messages in conversation.", status="INVALID_INPUT")
last_message = body["messages"][-1]
if last_message["role"] != "assistant":
return await _fail(
"Last message was not from the AI model.", status="INVALID_INPUT"
)
split_three_backticks = last_message["content"].split("```")
if len(split_three_backticks) < 3:
return await _fail(
"Last message did not contain code blocks.", status="INVALID_INPUT"
)
if len(split_three_backticks) % 2 != 1:
return await _fail(
"Last message did not contain well-formed code blocks.",
status="INVALID_INPUT",
)
chosen_code_block = None
language = None
for code_block in split_three_backticks[-2:0:-2]:
if code_block.startswith("python\n") or code_block.startswith("python3\n"):
chosen_code_block = code_block
language = Sandbox.LANGUAGE_PYTHON
if (
code_block.startswith("bash\n")
or code_block.startswith("sh\n")
or code_block.startswith("shell\n")
):
chosen_code_block = code_block
language = Sandbox.LANGUAGE_BASH
break
if chosen_code_block is None:
# Try to see if the last code block looks like Python or bash.
last_code_block = split_three_backticks[-2]
# Look for an interpreter line.
first_line = last_code_block.strip().split("\n")[0]
if first_line.startswith("#!") and (
first_line.endswith("python") or first_line.endswith("python3")
):
chosen_code_block = code_block
language = Sandbox.LANGUAGE_PYTHON
elif first_line.startswith("#!") and first_line.endswith("sh"):
chosen_code_block = code_block
language = Sandbox.LANGUAGE_BASH
elif any(
python_like in last_code_block
for python_like in ("import ", "print(", "print ")
):
chosen_code_block = code_block
language = Sandbox.LANGUAGE_PYTHON
elif any(
bash_like in last_code_block
for bash_like in ("echo ", "if [", "; do", "esac\n")
):
chosen_code_block = code_block
language = Sandbox.LANGUAGE_BASH
if chosen_code_block is None:
return await _fail(
"Message does not contain code blocks detected as Python or Bash."
)
try:
max_ram_bytes = None
if self.valves.MAX_RAM_MEGABYTES != 0:
max_ram_bytes = self.valves.MAX_RAM_MEGABYTES * 1024 * 1024
await emitter.status("Checking if environment supports sandboxing...")
Sandbox.check_setup(
language=language,
auto_install_allowed=self.valves.AUTO_INSTALL,
)
if self.valves.AUTO_INSTALL and Sandbox.runsc_needs_installation():
await emitter.status("Auto-installing gVisor...")
Sandbox.install_runsc()
await emitter.status("Initializing sandbox configuration...")
status = "UNKNOWN"
output = None
generated_files = []
language_title = language.title()
# If the provided code starts/ends with "```SOME_LANGUAGE", remove that.
code = chosen_code_block
if language == Sandbox.LANGUAGE_PYTHON:
code = code.removeprefix("python3")
code = code.removeprefix("python")
elif language == Sandbox.LANGUAGE_BASH:
code = code.removeprefix("shell")
code = code.removeprefix("bash")
code = code.removeprefix("sh")
code = code.strip()
with tempfile.TemporaryDirectory(prefix="sandbox_") as tmp_dir:
sandbox_storage_path = os.path.join(tmp_dir, "storage")
os.makedirs(sandbox_storage_path, mode=0o777)
sandbox = Sandbox(
tmp_dir=tmp_dir,
language=language,
code=code,
debug=debug,
networking_allowed=valves.NETWORKING_ALLOWED,
max_runtime_seconds=valves.MAX_RUNTIME_SECONDS,
max_ram_bytes=max_ram_bytes,
persistent_home_dir=sandbox_storage_path,
)
await emitter.status(
f"Running {language_title} code in gVisor sandbox..."
)
try:
result = sandbox.run()
except Sandbox.ExecutionTimeoutError as e:
await emitter.fail(
f"Code timed out after {valves.MAX_RUNTIME_SECONDS} seconds"
)
status = "TIMEOUT"
output = e.stderr
except Sandbox.InterruptedExecutionError as e:
await emitter.fail(f"Code used too many resources")
status = "INTERRUPTED"
output = e.stderr
except Sandbox.CodeExecutionError as e:
await emitter.fail(f"{language_title}: {e}")
status = "ERROR"
output = e.stderr
else:
status = "OK"
output = result.stdout or result.stderr
output_num_files, _ = storage.measure_directory(
sandbox_storage_path
)
if output_num_files > valves.MAX_FILES_PER_EXECUTION:
output = f"Code produced {output_num_files} files, exceeding per-execution quota of {valves.MAX_FILES_PER_EXECUTION}"
await emitter.fail(output)
status = "STORAGE_ERROR"
elif output_num_files > 0:
try:
with storage:
generated_files = storage.copy(
__id__=__id__,
intake_path=sandbox_storage_path,
)
except self._UserStorage.OutOfStorageException as e:
status = "STORAGE_ERROR"
output = f"Storage quota exceeded: {e}"
await emitter.fail(output)
if status == "OK":
await emitter.status(
status="complete",
done=True,
description=f"{language_title} code executed successfully.",
)
if output:
output = output.strip()
if debug:
def _log(filename: str, log_line: str):
print(f"[{filename}] {log_line}", file=sys.stderr)
sandbox.debug_logs(_log)
await emitter.code_execution_result(output)
if status == "OK":
generated_files_output = ""
if len(generated_files) > 0:
generated_files_output = "* " + "\n* ".join(
f.markdown()
for f in sorted(generated_files, key=lambda f: f.name)
)
if output and len(generated_files) > 0:
await emitter.message(
f"\n\n---\nI executed this {language_title} code and got:\n```Output\n{output}\n```\n**Files**:\n{generated_files_output}"
)
elif output and len(generated_files) == 0:
await emitter.message(
f"\n\n---\nI executed this {language_title} code and got:\n```Output\n{output}\n```"
)
elif len(generated_files) > 0:
await emitter.message(
f"\n\n---\nI executed this {language_title} code and it generated these files:\n{generated_files_output}"
)
else:
await emitter.message(
f"\n\n---\nI executed this {language_title} code and it ran successfully with no output."
)
return json.dumps(
{
"status": status,
"output": output,
"generated_files": {
f.name: f.markdown() for f in generated_files
},
}
)
if status == "TIMEOUT":
if output:
await emitter.message(
f"\n\n---\nI executed this {language_title} code and it timed out after {self.valves.MAX_RUNTIME_SECONDS} seconds:\n```Error\n{output}\n```\n"
)
else:
await emitter.message(
f"\n\n---\nI executed this {language_title} code and it timed out after {self.valves.MAX_RUNTIME_SECONDS} seconds.\n"
)
if status == "ERROR" and output:
await emitter.message(
f"\n\n---\nI executed this {language_title} code and got the following error:\n```Error\n{output}\n```\n"
)
else:
await emitter.message(
f"\n\n---\nI executed this {language_title} code but got an unexplained error.\n"
)
return json.dumps({"status": status, "output": output})
except Sandbox.PlatformNotSupportedException as e:
return await _fail(f"Sandbox cannot run on this machine: {e}")
except Sandbox.SandboxRuntimeException as e:
return await _fail(f"Sandbox runtime failed: {e}")
except Sandbox.FixableException as e:
return await _fail(f"Environment needs setup work: {e}")
except Sandbox.SandboxException as e:
return await _fail(f"Sandbox exception: {e}")
except Exception as e:
return await _fail(f"Unhandled exception: {e}")
class _EventEmitter:
"""
Helper wrapper for event emissions.
"""
def __init__(
self,
event_emitter: typing.Callable[[dict], typing.Any] = None,
debug: bool = False,
):
self.event_emitter = event_emitter
self._debug = debug
async def _emit(self, typ, data):
if self._debug:
print(f"Emitting {typ} event: {data}", file=sys.stderr)
if not self.event_emitter:
return None
maybe_future = self.event_emitter(
{
"type": typ,
"data": data,
}
)
if asyncio.isfuture(maybe_future) or inspect.isawaitable(maybe_future):
return await maybe_future
async def status(
self, description="Unknown state", status="in_progress", done=False
):
await self._emit(
"status",
{
"status": status,
"description": description,
"done": done,
},
)
if not done and len(description) <= 1024:
# Emit it again; Open WebUI does not seem to flush this reliably.
# Only do it for relatively small statuses; when debug mode is enabled,
# this can take up a lot of space.
await self._emit(
"status",
{
"status": status,
"description": description,
"done": done,
},
)
async def fail(self, description="Unknown error"):
await self.status(description=description, status="error", done=True)
async def code_execution_result(self, output):
await self._emit(
"code_execution_result",
{
"output": output,
},
)
async def message(self, content):
await self._emit(
"message",
{
"content": content,
},
)
class _UserFile:
MAX_INLINE_URL_SIZE = 0
MAX_INLINE_TEXT_LINES = 128
MAX_INLINE_TEXT_BYTES = 65535
def __init__(self, file_path, file_relative_path, file_url, file_size):
self._file_path = file_path
self._file_relative_path = file_relative_path
self._file_url = file_url
self._size_bytes = file_size
mimetypes.init()
mime_type, _ = mimetypes.guess_type(file_path)
if mime_type is None:
mime_type, _ = mimetypes.guess_type(file_url)
if mime_type is None:
# Check if the file is valid UTF-8 text.
is_utf8 = True
is_empty = True
with open(self._file_path, "rb") as f:
for line in f:
is_empty = is_empty or len(line) > 0
try:
line.decode("utf-8")
except UnicodeDecodeError:
is_utf8 = False
mime_type = (
"text/plain"
if (is_utf8 and not is_empty)
else "application/octet-stream"
)
self._mime_type = mime_type
self._cached_markdown = None
@property
def name(self):
return self._file_relative_path
@property
def url(self):
if self._size_bytes > 0 and self._size_bytes <= self.MAX_INLINE_URL_SIZE:
# Try to use an inline URL.
with open(self._file_path, "rb") as f:
contents = f.read()
inline_url = f"data:{self._mime_type}," + urllib.parse.quote_from_bytes(
contents
)
inline_base64 = (
f"data:{self._mime_type};base64,"
+ base64.standard_b64encode(contents).decode("ascii")
)
shortest_url = (
inline_url
if len(inline_url) < len(inline_base64)
else inline_base64
)
if len(shortest_url) <= self.MAX_INLINE_URL_SIZE:
return shortest_url
return self._file_url
def _inline_markdown(self):
"""Render the file as inline text or markdown if small enough; otherwise return None."""
if self._size_bytes > self.MAX_INLINE_TEXT_BYTES:
return None
if not self._mime_type.startswith("text/"):
return None
with open(self._file_path, "rb") as f:
try:
contents = f.read().decode("utf-8")
except UnicodeDecodeError:
return None
lines = contents.split("\n")
if len(lines) > self.MAX_INLINE_TEXT_LINES:
return None
if self._mime_type != "text/markdown":
if "```" in contents:
return None
if contents and contents[-1] == "\n":
contents = contents[:-1]
return f"📄 [{self.name}]({self.url}):\n```\n{contents}\n```"
components = [f"📃 [{self.name}]({self.url}):"]
for line in lines:
components.append(f"> {line}")
if components[-1] == "> ":
components = components[:-1]
return "\n".join(components)
def _markdown(self):
if self._size_bytes == 0:
return f"⁉️ `{self.name}` (empty)"
inline_markdown = self._inline_markdown()
if inline_markdown is not None:
return inline_markdown
icon = "💾"
if self._mime_type.startswith("text/"):
icon = "📄"
elif self._mime_type.startswith("image/"):
icon = "🖼️"
elif self._mime_type.startswith("audio/"):
icon = "🎵"
elif self._mime_type.startswith("video/"):
icon = "🎬"
size = f"{self._size_bytes} bytes"
if self._size_bytes > 1024 * 1024 * 1024:
size = f"{self._size_bytes // 1024 // 1024 // 1024} GiB"
elif self._size_bytes > 1024 * 1024:
size = f"{self._size_bytes // 1024 // 1024} MiB"
elif self._size_bytes > 1024:
size = f"{self._size_bytes // 1024} KiB"
return f"{icon} [{self.name}]({self.url}) ({size})"
def markdown(self):
if self._cached_markdown is None:
self._cached_markdown = self._markdown()
return self._cached_markdown
class _UserStorage:
class StorageException(Exception):
"""Base class for storage-related exceptions."""
class OutOfStorageException(Exception):
"""Not enough files or bytes quota."""
class EnvironmentNeedsSetupException(Exception):
"""Storage is badly configured."""
# Number of zeroes to use in nonces (in per-day storage directories).
# This effectively limits the number of nonces usable in a given day.
NUM_NONCE_ZEROS = 4
# Free space buffer to keep free on the underlying storage device,
# rather than allowing user storage to fill it to the brim.
MUST_KEEP_FREE_MARGIN_MEGABYTES = 512
@classmethod
def measure_directory(cls, path, predicate=None):
"""
Measure storage cost of a directory.
:param path: Path to the directory to measure.
:param predicate: Optional predicate to filter files and directories, called with absolute paths.
:return: 2-tuple `(total_files, total_bytes)`. Note that `total_files` counts the number of non-root directories as well, and `total_bytes` also includes storage necessary to store filenames and directory names.
"""
path = os.path.normpath(os.path.abspath(path))
total_files = 0
total_bytes = 0
try:
for dirpath, subdirs, subfiles in os.walk(
path, onerror=None, followlinks=False
):
dirpath = os.path.normpath(os.path.abspath(dirpath))
for subdir in subdirs:
if predicate is None or predicate(
os.path.join(dirpath, subdir)
):
total_files += 1
total_bytes += len(subdir)
for subfile in subfiles:
subfile_path = os.path.join(dirpath, subfile)
if predicate is not None and not predicate(subfile_path):
continue
try:
subfile_stat = os.stat(subfile_path, follow_symlinks=False)
except FileNotFoundError:
continue # Likely raced with another execution.
if not stat.S_ISREG(subfile_stat.st_mode):
continue # Ignore non-regular files.
total_files += 1
total_bytes += len(subfile)
total_bytes += subfile_stat.st_size
except OSError as e:
raise cls.EnvironmentNeedsSetupException(
f"Failed to explore directory {path} (please adjust permissions): {e}"
)
return total_files, total_bytes
def __init__(
self,
storage_root_path,
storage_root_url,
__user__: typing.Optional[dict] = None,
max_files_per_user=None,
max_bytes_per_user=None,
):
self._storage_root_path = os.path.normpath(
os.path.abspath(storage_root_path)
)
self._storage_root_url = storage_root_url.rstrip("/")
self._date = time.strftime("%Y/%m/%d")
self._max_files_per_user = max_files_per_user
self._max_bytes_per_user = max_bytes_per_user
self._user = f"anon_{self._date}"
if __user__ is not None:
if type(__user__) is type({}):
self._user = str(
"|".join(
f"{k}={v}"
for k, v in sorted(__user__.items(), key=lambda x: x[0])
)
)
else:
self._user = str(__user__)
user_hash = hashlib.sha512()
user_hash.update(self._user.encode("utf-8"))
self._user_hash = (
base64.b32encode(user_hash.digest()).decode("ascii").lower()[:12]
)
self._user_path = os.path.join(storage_root_path, self._user_hash)
self._lock_fd = None
def __enter__(self):
assert self._lock_fd is None
os.makedirs(self._user_path, mode=0o755, exist_ok=True)
lock_fd = os.open(
os.path.join(self._user_path, ".lock"),
os.O_RDWR | os.O_CREAT | os.O_TRUNC,
)
deadline = time.time() + 10
last_exception = None
while time.time() < deadline:
try:
fcntl.flock(lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
except (IOError, OSError) as e:
last_exception = e
time.sleep(0.01)
else:
self._lock_fd = lock_fd
break
if self._lock_fd is None:
os.close(lock_fd)
raise self.StorageException(
f"Cannot lock storage directory (too many concurrent code executions?) {last_exception}"
)
def __exit__(self, *args, **kwargs):
assert self._lock_fd is not None
fcntl.flock(self._lock_fd, fcntl.LOCK_UN)
os.close(self._lock_fd)
self._lock_fd = None
def _is_user_file(self, path):
"""Used as predicate when measuring user storage directories."""
assert path.startswith(self._user_path + os.sep)
path = path[len(self._user_path) + len(os.sep) :]
# User files are under:
# YYYY/MM/DD/NONCE/HASH (5 levels of nesting).
# So a real user file has at least 6 components, which means it
# must have at least 5 slashes.
return path.count(os.sep) >= 5
def copy(self, __id__, intake_path):
"""
Copy a directory to user storage.
Ensure that the user storage has room for the given number of files totaling the given number of bytes.
If this is feasible by deleting previous user files, it will do so.
Must be done while holding the lock.
:param __id__: Chat or action ID.
:param intake_path: Path to a directory that should be copied to the user storage.
:raises OutOfStorageException: If there is not enough available file or bytes quota.
:return: A list of `_UserFile`s from each file copied from `intake_path`.
"""
assert (
self._lock_fd is not None
), "Cannot perform this operation without holding the lock"
want_num_files, want_num_bytes = self.measure_directory(intake_path)
if want_num_files == 0:
return () # Nothing to copy.
if self._max_files_per_user <= want_num_files:
raise self.OutOfStorageException(
f"Cannot allocate storage for {want_num_files} files; maximum is {self._max_files_per_user} files per user"
)
if self._max_bytes_per_user <= want_num_bytes:
raise self.OutOfStorageException(
f"Cannot allocate storage for {want_num_bytes} bytes; maximum is {self._max_bytes_per_user} bytes per user"
)
disk_usage_free = shutil.disk_usage(self._user_path).free
if (
disk_usage_free
<= want_num_bytes + self.MUST_KEEP_FREE_MARGIN_MEGABYTES * 1024 * 1024
):
raise self.OutOfStorageException(
f"Not enough free disk space for {want_num_bytes} bytes; current free space is {disk_usage_free} bytes and must keep at least {MUST_KEEP_FREE_MARGIN_MEGABYTES} megabytes free"
)
user_root_num_files, user_root_num_bytes = self.measure_directory(
self._user_path,
predicate=self._is_user_file,
)
user_root_remaining_files = self._max_files_per_user - user_root_num_files
user_root_remaining_bytes = self._max_bytes_per_user - user_root_num_bytes
while (
user_root_remaining_files < want_num_files
or user_root_remaining_bytes < want_num_bytes
):
oldest_directory = None
try:
oldest_yyyy = next(
iter(
sorted(
f
for f in os.listdir(self._user_path)
if len(f) >= 4 and f.isdigit()
)
)
)
oldest_mm = next(
iter(
sorted(
f
for f in os.listdir(
os.path.join(self._user_path, oldest_yyyy)
)
if len(f) == 2 and f.isdigit()
)
)
)
oldest_dd = next(
iter(
sorted(
f
for f in os.listdir(
os.path.join(
self._user_path, oldest_yyyy, oldest_mm
)
)
if len(f) == 2 and f.isdigit()
)
)
)
oldest_nonce = next(
iter(
sorted(
f
for f in os.listdir(
os.path.join(
self._user_path,
oldest_yyyy,
oldest_mm,
oldest_dd,
)
)
if len(f) == self.NUM_NONCE_ZEROS and f.isdigit()
)
)
)
oldest_directory = os.path.join(
self._user_path, oldest_yyyy, oldest_mm, oldest_dd, oldest_nonce
)
except StopIteration:
raise self.OutOfStorageException(
f"Cannot find directory to clear in order to make enough room for new user storage ({want_num_files} files, {want_num_bytes} bytes)"
)
assert oldest_directory is not None
if not shutil.rmtree.avoids_symlink_attacks:
raise self.EnvironmentNeedsSetupException(
"Only supported on platforms with symlink-attack-resistant rmtree implementations"
)
shutil.rmtree(oldest_directory)
for parent_directory in (
os.path.join(self._user_path, oldest_yyyy, oldest_mm, oldest_dd),
os.path.join(self._user_path, oldest_yyyy, oldest_mm),
os.path.join(self._user_path, oldest_yyyy),
):
if len(os.listdir(parent_directory)) == 0:
os.rmdir(parent_directory)
user_root_num_files, user_root_num_bytes = self.measure_directory(
self._user_path,
predicate=self._is_user_file,
)
user_root_remaining_files = (
self._max_files_per_user - user_root_num_files
)
user_root_remaining_bytes = (
self._max_bytes_per_user - user_root_num_bytes
)
# We now have enough. Find new directory name.
path_with_counter = None
max_nonce = 10**self.NUM_NONCE_ZEROS - 1
for nonce in range(
1, min(self._max_files_per_user or max_nonce, max_nonce)
):
path_with_counter = os.path.join(
self._user_path, self._date, str(nonce).zfill(self.NUM_NONCE_ZEROS)
)
try:
os.makedirs(path_with_counter, mode=0o755, exist_ok=False)
except FileExistsError:
pass
else:
break
if path_with_counter is None:
raise self.OutOfStorageException("No free storage directory available!")
id_str = str(__id__) if __id__ is not None else self._date
id_hash = hashlib.sha512()
id_hash.update(self._user.encode("utf-8"))
id_hash.update(b"||||")
id_hash.update(self._date.encode("utf-8"))
id_hash.update(b"||||")
id_hash.update(path_with_counter.encode("utf-8"))
id_hash.update(b"||||")
id_hash.update(str(uuid.uuid4()).encode("utf-8"))
id_hash.update(b"||||")
id_hash.update(id_str.encode("utf-8"))
id_hash_component = (
base64.b32encode(id_hash.digest()).decode("ascii").lower()[:12]
)
final_path = os.path.normpath(
os.path.abspath(os.path.join(path_with_counter, id_hash_component))
)
# Now do the copy.
# This doesn't use `shutil.copytree` because we explicitly avoid copying anything but regular files.
user_files = []
for dirpath, subdirs, subfiles in os.walk(
intake_path, onerror=None, followlinks=False
):
dirpath = os.path.normpath(os.path.abspath(dirpath))
relative_dirpath = None
if dirpath == intake_path:
relative_dirpath = "."
elif dirpath.startswith(intake_path + os.sep):
relative_dirpath = dirpath[len(intake_path) + len(os.sep) :]
else:
assert (
False
), f"Bad traversal: expected all paths to starts with {intake_path} but got path that does not: {dirpath}"
assert relative_dirpath is not None
assert not os.path.isabs(relative_dirpath)
copy_dirpath = os.path.join(final_path, relative_dirpath)
os.makedirs(copy_dirpath, mode=0o755, exist_ok=True)
for subfile in subfiles:
subfile_path = os.path.join(dirpath, subfile)
subfile_relative_path = os.path.normpath(
os.path.join(relative_dirpath, subfile)
)
assert not os.path.isabs(subfile_relative_path)
subfile_stat = os.stat(subfile_path, follow_symlinks=False)
if not stat.S_ISREG(subfile_stat.st_mode):
continue # Ignore non-regular files.
subfile_copy = os.path.join(copy_dirpath, subfile)
assert subfile_copy.startswith(self._storage_root_path + os.sep)
subfile_url = f"{self._storage_root_url}/" + urllib.parse.quote(
subfile_copy[len(self._storage_root_path) + len(os.sep) :],
safe=os.sep,
)
shutil.move(subfile_path, subfile_copy, copy_function=shutil.copy)
user_files.append(
_Action._UserFile(
file_path=subfile_copy,
file_relative_path=subfile_relative_path,
file_url=subfile_url,
file_size=subfile_stat.st_size,
)
)
# We are done.
return user_files
class Action:
Valves = _Action.Valves
def __init__(self):
self.valves = self.Valves()
async def action(
self,
body: dict,
__event_emitter__: typing.Callable[[dict], typing.Any] = None,
) -> typing.Optional[dict]:
return await _Action(self.valves).action(
body=body, __event_emitter__=__event_emitter__
)
class Sandbox:
"""
Sandbox manages a gVisor sandbox's lifecycle.
"""
# Set of supported programming languages.
LANGUAGE_PYTHON = "python"
LANGUAGE_BASH = "bash"
SUPPORTED_LANGUAGES = [LANGUAGE_PYTHON, LANGUAGE_BASH]
# The following directories will be exposed as read-only to the
# sandboxed environment. This must contain at least the necessary
# files and libraries necessary to run the code interpreter.
# Subdirectories of these directories may be hidden by adding them
# to the `EMPTY_READ_ONLY_DIRECTORIES` or `EMPTY_WRITABLE_DIRECTORIES`
# lists below.
EXPOSED_SYSTEM_DIRECTORIES = [
"/bin",
"/etc/alternatives",
"/etc/ssl/certs",
"/lib",
"/lib32",
"/lib64",
"/opt",
"/sbin",
"/usr",
"/var/lib",
]
# The following files will be exposed as read-only to the sandboxed
# environment. This should contain the set of files necessary by the
# code interpreter to function correctly, e.g. `/etc/resolv.conf`
# is necessary to properly resolve hosts through DNS.
EXPOSED_SYSTEM_FILES = [
"/etc/hosts",
"/etc/localtime",
"/etc/mime.types",
"/etc/nsswitch.conf",
"/etc/os-release",
"/etc/resolv.conf",
"/etc/shells",
]
# The following directories will exist in the sandbox environment but
# will appear as empty and read-only.
# This is useful to have a filesystem that feels like a normal Linux
# environment without actually revealing these directories to the
# sandbox.
EMPTY_READ_ONLY_DIRECTORIES = [
"/etc",
"/home",
"/lost+found",
"/root",
"/run",
"/run/user",
"/sys",
"/var",
]
# The following directories will exist in the sandbox environment but
# will appear as empty and writable.
# This is useful to have a filesystem that feels like a normal Linux
# environment without actually revealing these directories to the
# sandbox.
EMPTY_WRITABLE_DIRECTORIES = [
"/dev/shm",
"/home/user",
"/run/user/1000",
"/var/run",
"/var/tmp",
"/tmp",
]
# Static parts of the OCI configuration.
OCI_CONFIG_SKELETON = {
"ociVersion": "1.0.0",
"process": {
"user": {"uid": 1000, "gid": 1000},
"args": ["/bin/INVALID"], # Will be filled in.
"env": [
# Basic environment variables.
"EDITOR=cat",
"LANG=C.UTF-8",
"LC_ALL=C.UTF-8",
"LC_CTYPE=C.UTF-8",
"HOME=/home/user",
"HOSTNAME=sandbox",
"PAGER=cat",
"PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin",
"PWD=/home/user",
"SHLVL=1",
"TERM=xterm",
"USER=user",
],
"cwd": "/home/user",
"capabilities": {
# No capabilities whatsoever.
"bounding": [],
"effective": [],
"inheritable": [],
"permitted": [],
},
"rlimits": [
{"type": "RLIMIT_NOFILE", "hard": 1048576, "soft": 1048576},
],
"noNewPrivileges": True,
},
"root": {
"path": "/invalid", # Will be filled in.
"readonly": True,
},
"hostname": "sandbox",
"mounts": [
{"destination": "/dev", "type": "dev"},
{"destination": "/proc", "type": "proc"},
],
"linux": {
"namespaces": [
{"type": "pid"},
{"type": "ipc"},
{"type": "uts"},
{"type": "mount"},
],
"resources": {
"memory": {
# `limit` may be be filled in here depending on user configuration.
"disableOOMKiller": False,
},
},
},
}
# The path where the `runsc` binary will be downloaded and installed if
# requested.
AUTO_INSTALLATION_PATH = "/tmp/gvisor/runsc"
# Regular expression for log filename prefixes generated by `runsc`.
_LOG_FILENAME_TRUNCATE_RE = re.compile(r"^runsc\.log\.\d{8}-\d{6}(?:\.\d+)?\.")
# Other files worth logging when dumping debug logs.
_EXTRA_DEBUG_LOG_PATHS = (
"/etc/os-release",
"/proc/self/cgroup",
"/proc/self/personality",
"/proc/self/mountinfo",
"/proc/self/setgroups",
"/proc/self/status",
"/proc/self/uid_map",
"/proc/self/gid_map",
"/proc/cmdline",
"/proc/cpuinfo",
"/proc/cgroups",
"/proc/mounts",
"/proc/version",
)
# Other commands worth running when dumping debug logs.
_EXTRA_DEBUG_LOG_COMMANDS = (
("pwd",),
("id",),
("uname", "-a"),
("ls", "-l", "/proc/self/ns"),
(sys.executable, "--version"),
)
# Environment variable used to detect interpreter re-execution.
_MARKER_ENVIRONMENT_VARIABLE = "__CODE_EXECUTION_STAGE"
class _Switcheroo:
"""
Management of the switcheroo procedure for running in a usable cgroup namespace and node.
"""
_CGROUP_ROOT = "/sys/fs/cgroup"
_CGROUP_NAME_PREFIX = "codeeval_"
_CGROUP_MAX_COUNT = 4096
_CGROUP_SANDBOX_NAME = "sandbox"
_CGROUP_SUPERVISOR_NAME = "supervisor"
_CGROUP_LEAF = "leaf"
def __init__(self, log_path, max_sandbox_ram_bytes):
self._log_path = log_path
self._max_sandbox_ram_bytes = max_sandbox_ram_bytes
self._my_euid = None
self._my_egid = None
self._checkpoint = None
self._cgroup_controllers = None
self._needed_controllers = set()
if max_sandbox_ram_bytes is not None:
self._needed_controllers.add("memory")
self._initial_cgroup_name = None
self._codeeval_cgroup_name = None
self._moved = False
self._operations = (
# Save EUID and EGID before we move to a new user namespace.
("save_euid", self._save_euid),
("save_egid", self._save_egid),
("unshare_user", self._unshare_user),
# Map our current user as being root in the new user namespace.
("write_uid_map", self._write_uid_map),
("write_setgroups", self._write_setgroups),
("write_gid_map", self._write_gid_map),
# cgroupfs's view does not take into account cgroup namespaces.
# Weird, right?
# This means `/proc/PID/cgroup` will show the namespaced view of
# the cgroup that the PID is in, but `/sys/fs/cgroup` will still
# contain the whole system cgroup hierarchy regardless of namespace.
# Instead, namespaces act as "boundary box" around process movement
# requests when writing to cgroup.procs or creating new cgroups.
# So our first order of business here is to find out which cgroup we
# are running in. We do this by scanning the whole cgroupfs hierarchy
# and looking for our PID. This will populate
# `self._initial_cgroup_name`.
("find_self_in_cgroup_hierarchy", self._find_self_in_cgroup_hierarchy),
# The cgroup nesting rules are complicated, but the short of it is:
# A cgroup can either **contain processes** OR **have limits**.
# Also, cgroups that contain processes must be leaf nodes.
# Also, cgroups that enforce limits must have their parent cgroup
# also have the same limit "controller" be active.
# So we will have two types of cgroups:
# - Leaf nodes with no controllers
# - Non-leaf nodes with controllers
# So initially, all the processes in the container's initial
# namespace need to be moved out to a new leaf node,
# otherwise we cannot turn on controllers on the initial
# cgroup.
# So we will set up the following hierarchy:
# /sys/fs/cgroup/$INITIAL:
# The cgroup where the container's processes were running
# the first time we run any Sandbox in the container.
# It may initially have no controllers enabled, but we will
# turn them on later.
# /sys/fs/cgroup/$INITIAL/leaf:
# The cgroup where the container's processes are moved to
# from the $INITIAL cgroup upon first run of any Sandbox in
# this container. When this code runs again, processes that
# are already in `$INITIAL/leaf` are not moved.
# /sys/fs/cgroup/$INITIAL/codeeval_$NUM:
# A per-Sandbox cgroup that never contains any processes.
# It will have controllers enabled on it but will never have
# specific limits enforced.
# /sys/fs/cgroup/$INITIAL/codeeval_$NUM/sandbox:
# A per-Sandbox cgroup that never contains any processes.
# It will have controllers enabled on it and will enforce
# resource limits for the processes running in its /leaf.
# /sys/fs/cgroup/$INITIAL/codeeval_$NUM/sandbox/leaf:
# A per-Sandbox cgroup that is running `runsc` (gVisor).
# It has no controllers enabled on it, but resources are
# being enforced by virtue of being a child of
# `$INITIAL/codeeval_$NUM/sandbox` which does enforce limits.
# /sys/fs/cgroup/$INITIAL/codeeval_$NUM/supervisor:
# A per-Sandbox cgroup that never contains any processes.
# It will have controllers enabled on it and will enforce
# resource limits for the processes running in its /leaf.
# /sys/fs/cgroup/$INITIAL/codeeval_$NUM/supervisor/leaf:
# A per-Sandbox cgroup that is running a Python interpreter
# that manages the lifetime of the `runsc` process.
# It will run `Sandbox.maybe_main`.
# It has no controllers enabled on it, but resources are
# being enforced by virtue of being a child of
# `$INITIAL/codeeval_$NUM/sandbox` which does enforce limits.
#
# This particular step creates the `$INITIAL/leaf` cgroup.
# If already created, it does nothing.
("create_initial_leaf_cgroup", self._create_initial_leaf_cgroup),
# Move all processes in `$INITIAL` to `$INITIAL/leaf`.
(
"move_initial_cgroup_processes_to_initial_leaf_cgroup",
self._move_initial_cgroup_processes_to_initial_leaf_cgroup,
),
# Read the cgroup controllers enabled in `$INITIAL`. This acts
# as a bounding set on the ones we can enable in any child of it.
("read_cgroup_controllers", self._read_cgroup_controllers),
# Cleanup old `$INITIAL/codeeval_*` cgroups that may be lying
# around from past runs.
("cleanup_old_cgroups", self._cleanup_old_cgroups),
# Create a new `$INITIAL/codeeval_$NUM` cgroup.
("create_codeeval_cgroup", self._create_codeeval_cgroup),
# Create a new `$INITIAL/codeeval_$NUM/sandbox` cgroup.
("create_sandbox_cgroup", self._create_sandbox_cgroup),
# Create a new `$INITIAL/codeeval_$NUM/sandbox/leaf` cgroup.
("create_sandbox_leaf_cgroup", self._create_sandbox_leaf_cgroup),
# Create a new `$INITIAL/codeeval_$NUM/supervisor` cgroup.
("create_supervisor_cgroup", self._create_supervisor_cgroup),
# Create a new `$INITIAL/codeeval_$NUM/supervisor/leaf` cgroup.
("create_supervisor_leaf_cgroup", self._create_supervisor_leaf_cgroup),
# Add controllers to `$INITIAL`.
(
"add_cgroup_controllers_to_root",
self._add_cgroup_controllers_to_root,
),
# Add controllers to `$INITIAL/codeeval_$NUM`.
(
"add_cgroup_controllers_to_codeeval",
self._add_cgroup_controllers_to_codeeval,
),
# Add controllers to `$INITIAL/codeeval_$NUM/sandbox`.
(
"add_cgroup_controllers_to_sandbox",
self._add_cgroup_controllers_to_sandbox,
),
# Set resource limits on `$INITIAL/codeeval_$NUM`.
("set_sandbox_cgroup_limits", self._set_sandbox_cgroup_limits),
# Add controllers to `$INITIAL/codeeval_$NUM/supervisor`.
(
"add_cgroup_controllers_to_supervisor",
self._add_cgroup_controllers_to_supervisor,
),
# Set resource limits on `$INITIAL/codeeval_$NUM/supervisor`.
("set_supervisor_cgroup_limits", self._set_supervisor_cgroup_limits),
# Move current process to
# `$INITIAL/codeeval_$NUM/supervisor/leaf`.
(
"move_process_to_supervisor_leaf",
self._move_process_to_supervisor_leaf,
),
# Double-check that we have moved to
# `$INITIAL/codeeval_$NUM/supervisor/leaf`.
("sanity_check_own_cgroup", self._sanity_check_own_cgroup),
)
def _status(self):
"""
Return the current switcheroo status.
:return: The last successful operation name, "UNSTARTED" if unstarted, or "OK" if all done, and some information.
"""
main_status = self._checkpoint
if self._checkpoint is None:
main_status = "UNSTARTED"
if self._checkpoint == self._operations[-1][0]:
main_status = "OK"
my_pid = os.getpid()
status_line = f"{main_status} (euid={self._my_euid} egid={self._my_egid} pid={my_pid} initial_cgroup_name={self._initial_cgroup_name} codeeval_cgroup_name={self._codeeval_cgroup_name} controllers={self._cgroup_controllers})"
cgroupfs_data = []
for cgroup_components in (
(self._initial_cgroup_name,),
(self._initial_cgroup_name, self._CGROUP_LEAF),
(self._initial_cgroup_name, self._codeeval_cgroup_name),
(
self._initial_cgroup_name,
self._codeeval_cgroup_name,
self._CGROUP_LEAF,
),
(
self._initial_cgroup_name,
self._codeeval_cgroup_name,
self._CGROUP_SUPERVISOR_NAME,
),
(
self._initial_cgroup_name,
self._codeeval_cgroup_name,
self._CGROUP_SUPERVISOR_NAME,
self._CGROUP_LEAF,
),
(
self._initial_cgroup_name,
self._codeeval_cgroup_name,
self._CGROUP_SANDBOX_NAME,
),
(
self._initial_cgroup_name,
self._codeeval_cgroup_name,
self._CGROUP_SANDBOX_NAME,
self._CGROUP_LEAF,
),
):
if any(c is None for c in cgroup_components):
continue
file_data = []
for filename in ("procs", "controllers", "subtree_control"):
data = None
try:
with self._open(
self._cgroup_path(
*(cgroup_components + (f"cgroup.{filename}",))
),
"rb",
) as f:
data = f.read().decode("ascii").replace("\n", " ")
except Exception as e:
data = f"[fail: {e}]"
file_data.append(f"{filename}: {data}")
cgroup_components_joined = os.sep.join(cgroup_components)
file_data_joined = ", ".join(file_data)
cgroupfs_data.append(f"{cgroup_components_joined}=[{file_data_joined}]")
if len(cgroupfs_data) > 0:
cgroupfs_data_joined = " ".join(cgroupfs_data)
status_line += f" {cgroupfs_data_joined}"
return status_line
def _cgroup_path(self, *components):
assert all(
c is not None for c in components
), f"Tried to build cgroup path with not-yet-determined component: {components}"
return os.path.join(self._CGROUP_ROOT, *(c for c in components if c))
def _log(self, log_f, message):
"""
Log a message to `log_f`.
:param log_f: Log file object.
"""
timestamp = time.strftime("%H:%M:%S")
status = self._status()
log_f.write(f"[{timestamp}] {message} [{status}]\n".encode("utf-8"))
def do(self):
"""
Do the switcheroo.
:raises OSError: If anything goes wrong. Progress is saved.
"""
op_index = -1
for i, (op, _) in enumerate(self._operations):
if self._checkpoint == op:
op_index = i
break
with self._open(self._log_path, "ab") as log_f:
do_log = lambda s: self._log(log_f, s)
for op, fn in self._operations[op_index + 1 :]:
do_log(f"Starting operation: {op}")
errors = []
success = False
for attempt in range(1, 4):
try:
fn()
except OSError as e:
do_log(f"OSError #{attempt}: {op}: {e}")
errors.append(OSError(f"OSError in {op} (#{attempt}): {e}"))
except Exception as e:
do_log(f"Exception #{attempt}: {op}: {e}")
errors.append(OSError(f"{op} failed (#{attempt}): {e}"))
else:
success = True
break
time.sleep(0.1)
if success:
self._checkpoint = op
do_log(f"Success: {op}")
continue
assert len(errors) > 0, "Logic error"
first_exception = errors[0]
if len(errors) == 1:
raise first_exception
other_exceptions = "; ".join(str(e) for e in errors[1:])
raise errors[0].__class__(
f"{first_exception} (other attempts: {other_exceptions})"
)
def _best_effort_remove_cgroup_subtree(self, codeeval_name):
for cgroup_components in (
(
self._initial_cgroup_name,
codeeval_name,
self._CGROUP_SANDBOX_NAME,
self._CGROUP_LEAF,
),
(
self._initial_cgroup_name,
codeeval_name,
self._CGROUP_SUPERVISOR_NAME,
self._CGROUP_LEAF,
),
(self._initial_cgroup_name, codeeval_name, self._CGROUP_SANDBOX_NAME),
(
self._initial_cgroup_name,
codeeval_name,
self._CGROUP_SUPERVISOR_NAME,
),
(self._initial_cgroup_name, codeeval_name),
):
try:
os.rmdir(self._cgroup_path(*cgroup_components))
except OSError:
pass
def cleanup(self):
if self._moved:
self._move_process_back()
if self._codeeval_cgroup_name is not None:
self._best_effort_remove_cgroup_subtree(self._codeeval_cgroup_name)
def _open(self, path, mode):
try:
return open(path, mode)
except OSError as e:
raise OSError(f"opening {path} mode={mode}: {e}")
def _save_euid(self):
self._my_euid = os.geteuid()
def _save_egid(self):
self._my_egid = os.getegid()
def _unshare_user(self):
Sandbox.unshare(
os.CLONE_NEWUSER if "CLONE_NEWUSER" in os.__dict__ else 0x10000000
)
def _write_uid_map(self):
with self._open("/proc/self/uid_map", "wb") as uid_map_f:
uid_map_f.write(f"0 {self._my_euid} 1\n".encode("ascii"))
def _write_setgroups(self):
with self._open("/proc/self/setgroups", "wb") as setgroups_f:
setgroups_f.write(b"deny")
def _write_gid_map(self):
with self._open("/proc/self/gid_map", "wb") as gid_map_f:
gid_map_f.write(f"0 {self._my_egid} 1\n".encode("ascii"))
def _find_self_in_cgroup_hierarchy(self):
my_pid = os.getpid()
cgroup_root_slash = self._CGROUP_ROOT + os.sep
found_cgroup = None
num_checked = 0
num_except = 0
sample_exception = None
for dirpath, _, subfiles in os.walk(
self._CGROUP_ROOT, onerror=None, followlinks=False
):
if not dirpath.startswith(cgroup_root_slash):
continue
if "cgroup.procs" not in subfiles:
continue
num_checked += 1
found_pid = False
try:
with self._open(
os.path.join(dirpath, "cgroup.procs"), "rb"
) as cgroup_procs_f:
for line in cgroup_procs_f:
for pid_str in line.strip().split(b" "):
if not pid_str:
continue
try:
pid = int(pid_str)
except ValueError:
continue
if pid == my_pid:
found_pid = True
break
except Exception as e:
num_except += 1
if sample_exception is None:
sample_exception = e.__class__(f"{dirpath}: {e}")
continue
if not found_pid:
continue
current_cgroup = dirpath[len(cgroup_root_slash) :]
if found_cgroup is not None:
raise OSError(
f"Found PID {my_pid} in two separate cgroups: {found_cgroup} and {current_cgroup}; racing with another process?"
)
found_cgroup = current_cgroup
if found_cgroup is None:
raise OSError(
f"PID {my_pid} could not be found in any cgroup (checked {num_checked} cgroups, {num_except} exceptions; sample: {sample_exception})"
)
if found_cgroup.endswith(os.sep + self._CGROUP_LEAF):
found_cgroup = found_cgroup[: -len(os.sep + self._CGROUP_LEAF)]
self._initial_cgroup_name = found_cgroup
def _read_cgroup_controllers(self):
cgroup_controllers = []
with self._open(
self._cgroup_path(self._initial_cgroup_name, "cgroup.controllers"), "rb"
) as cgroup_controllers_f:
for line in cgroup_controllers_f:
for controller in line.strip().split(b" "):
if controller and controller not in cgroup_controllers:
cgroup_controllers.append(controller.decode("ascii"))
self._cgroup_controllers = cgroup_controllers
def _cleanup_old_cgroups(self):
initial_cgroup_path = self._cgroup_path(self._initial_cgroup_name)
for filename in os.listdir(initial_cgroup_path):
if not filename.startswith(self._CGROUP_NAME_PREFIX):
continue
cgroup_path = os.path.join(initial_cgroup_path, filename)
if not os.path.isdir(cgroup_path):
continue
self._best_effort_remove_cgroup_subtree(filename)
def _create_initial_leaf_cgroup(self):
try:
os.mkdir(
self._cgroup_path(self._initial_cgroup_name, self._CGROUP_LEAF),
mode=0o755,
)
except FileExistsError:
pass
def _create_codeeval_cgroup(self):
for counter in range(0, self._CGROUP_MAX_COUNT):
codeeval_cgroup_name_candidate = f"{self._CGROUP_NAME_PREFIX}{counter}"
cgroup_path = self._cgroup_path(
self._initial_cgroup_name, codeeval_cgroup_name_candidate
)
try:
os.mkdir(cgroup_path, mode=0o755)
except FileExistsError:
pass
else:
self._codeeval_cgroup_name = codeeval_cgroup_name_candidate
return
initial_cgroup_path_prefix = self._cgroup_path(
self._initial_cgroup_name, self._CGROUP_NAME_PREFIX
)
raise OSError(
f"Out of cgroups (tried {initial_cgroup_path_prefix}NUM with NUM from 0 to {self._CGROUP_MAX_COUNT-1} and they all already exist)"
)
def _create_sandbox_cgroup(self):
os.mkdir(
self._cgroup_path(
self._initial_cgroup_name,
self._codeeval_cgroup_name,
self._CGROUP_SANDBOX_NAME,
),
mode=0o755,
)
def _create_sandbox_leaf_cgroup(self):
os.mkdir(
self._cgroup_path(
self._initial_cgroup_name,
self._codeeval_cgroup_name,
self._CGROUP_SANDBOX_NAME,
self._CGROUP_LEAF,
),
mode=0o755,
)
def _create_supervisor_cgroup(self):
os.mkdir(
self._cgroup_path(
self._initial_cgroup_name,
self._codeeval_cgroup_name,
self._CGROUP_SUPERVISOR_NAME,
),
mode=0o755,
)
def _create_supervisor_leaf_cgroup(self):
os.mkdir(
self._cgroup_path(
self._initial_cgroup_name,
self._codeeval_cgroup_name,
self._CGROUP_SUPERVISOR_NAME,
self._CGROUP_LEAF,
),
mode=0o755,
)
def _add_cgroup_controllers_to(self, *cgroup_components):
add_controllers = tuple(
controller
for controller in self._cgroup_controllers
if controller in self._needed_controllers
)
cgroup_components = tuple(cgroup_components) + ("cgroup.subtree_control",)
cgroup_subtree_control_path = self._cgroup_path(*cgroup_components)
try:
with self._open(
cgroup_subtree_control_path, "wb"
) as cgroup_subtree_control_f:
controllers_data = (
" ".join(f"+{controller}" for controller in add_controllers)
+ "\n"
)
cgroup_subtree_control_f.write(controllers_data.encode("ascii"))
except OSError as e:
raise OSError(
f"Adding controllers {add_controllers} on {cgroup_subtree_control_path}: {e}"
)
got_controllers = set()
try:
with self._open(
cgroup_subtree_control_path, "rb"
) as cgroup_subtree_control_f:
for line in cgroup_subtree_control_f:
for controller in line.strip().split(b" "):
if not controller:
continue
got_controllers.add(controller.decode("ascii"))
except OSError as e:
raise OSError(
f"Reading controllers from {cgroup_subtree_control_path}: {e}"
)
assert all(
controller in got_controllers for controller in add_controllers
), f"Missing controllers in {cgroup_subtree_control_path}: got {got_controllers} expected {add_controllers}"
def _add_cgroup_controllers_to_root(self):
return self._add_cgroup_controllers_to(self._initial_cgroup_name)
def _add_cgroup_controllers_to_codeeval(self):
return self._add_cgroup_controllers_to(
self._initial_cgroup_name, self._codeeval_cgroup_name
)
def _add_cgroup_controllers_to_sandbox(self):
return self._add_cgroup_controllers_to(
self._initial_cgroup_name,
self._codeeval_cgroup_name,
self._CGROUP_SANDBOX_NAME,
)
def _add_cgroup_controllers_to_supervisor(self):
return self._add_cgroup_controllers_to(
self._initial_cgroup_name,
self._codeeval_cgroup_name,
self._CGROUP_SUPERVISOR_NAME,
)
def _move_initial_cgroup_processes_to_initial_leaf_cgroup(self):
initial_cgroup_procs_path = self._cgroup_path(
self._initial_cgroup_name, "cgroup.procs"
)
initial_leaf_cgroup_procs_path = self._cgroup_path(
self._initial_cgroup_name, self._CGROUP_LEAF, "cgroup.procs"
)
done_zero_pid = False
while True:
moving_process_pid = None
with self._open(
initial_cgroup_procs_path, "rb"
) as initial_cgroup_procs_f:
for line in initial_cgroup_procs_f:
if moving_process_pid is not None:
continue
for pid_str in line.strip().split(b" "):
if not pid_str:
continue
try:
pid = int(pid_str)
except ValueError:
continue
if pid == 0:
if done_zero_pid:
continue
done_zero_pid = True
moving_process_pid = pid
break
if moving_process_pid is None:
break
with self._open(
initial_leaf_cgroup_procs_path, "wb"
) as initial_leaf_cgroup_procs_f:
initial_leaf_cgroup_procs_f.write(
f"{moving_process_pid}\n".encode("ascii")
)
def _move_process_to_supervisor_leaf(self):
supervisor_leaf_cgroup_procs_path = self._cgroup_path(
self._initial_cgroup_name,
self._codeeval_cgroup_name,
self._CGROUP_SUPERVISOR_NAME,
self._CGROUP_LEAF,
"cgroup.procs",
)
f = self._open(supervisor_leaf_cgroup_procs_path, "wb")
try:
f.write(b"0\n")
self._moved = True
finally:
try:
f.close()
except OSError as e:
pass
def _move_process_back(self):
initial_leaf_cgroup_procs_path = self._cgroup_path(
self._initial_cgroup_name, self._CGROUP_LEAF, "cgroup.procs"
)
f = self._open(initial_leaf_cgroup_procs_path, "wb")
try:
f.write(b"0\n")
self._moved = False
finally:
try:
f.close()
except OSError:
pass
def _set_cgroup_limits(self, *cgroup_components):
cgroup_components = tuple(cgroup_components) + ("memory.max",)
cgroup_memory_max_path = self._cgroup_path(*cgroup_components)
if self._max_sandbox_ram_bytes is not None:
try:
with self._open(cgroup_memory_max_path, "wb") as memory_max_f:
memory_max_f.write(
f"{self._max_sandbox_ram_bytes}\n".encode("ascii")
)
except OSError as e:
raise OSError(
f"Trying to set max RAM limit to {self._max_sandbox_ram_bytes} bytes: {e}"
)
def _set_supervisor_cgroup_limits(self):
return self._set_cgroup_limits(
self._initial_cgroup_name,
self._codeeval_cgroup_name,
self._CGROUP_SUPERVISOR_NAME,
)
def _set_sandbox_cgroup_limits(self):
return self._set_cgroup_limits(
self._initial_cgroup_name,
self._codeeval_cgroup_name,
self._CGROUP_SANDBOX_NAME,
)
def _sanity_check_own_cgroup(self):
supervisor_cgroup_path = self._cgroup_path(
self._initial_cgroup_name,
self._codeeval_cgroup_name,
self._CGROUP_SUPERVISOR_NAME,
)
with self._open("/proc/self/cgroup", "rb") as cgroup_f:
cgroup_data = cgroup_f.read().decode("ascii").strip()
assert cgroup_data.endswith(
os.sep + os.path.join(self._CGROUP_SUPERVISOR_NAME, self._CGROUP_LEAF)
), f"Unexpected self cgroup after moving to {supervisor_cgroup_path}: {cgroup_data}"
def move_process_to_sandbox_leaf_cgroup_lambda(self):
"""
Returns a function that can move the current process to the sandbox leaf cgroup.
:return: A function to move the current process to the sandbox cgroup.
:raises SandboxException: If not queried after we have already chosen a new cgroup name.
"""
if self._codeeval_cgroup_name is None:
raise Sandbox.SandboxException(
"Tried to move process to sandbox leaf cgroup before we know it"
)
def _move(cgroup_path):
"""Dependency-free preexec_fn-compatible function to move to the given cgroup.procs."""
try:
f = open(cgroup_path, "wb")
except OSError as e:
raise OSError(f"Cannot open cgroup path {cgroup_path}: {e}")
try:
f.write(b"0\n")
except OSError as e:
raise OSError(f"Cannot move process to {cgroup_path}: {e}")
finally:
try:
f.close()
except OSError:
pass
clone_newcgroup = (
os.CLONE_NEWCGROUP
if "CLONE_NEWCGROUP" in os.__dict__
else 0x2000000
)
if "unshare" in os.__dict__: # Python >= 3.12.
try:
os.unshare(clone_newcgroup)
except OSError as e:
raise OSError(f"unshare({clone_newcgroup}) failed: {e}")
else:
import ctypes
libc = ctypes.CDLL(None)
libc.unshare.argtypes = [ctypes.c_int]
rc = libc.unshare(clone_newcgroup)
if rc == -1:
raise OSError(f"unshare({clone_newcgroup}) failed")
sandbox_cgroup_procs_path = self._cgroup_path(
self._initial_cgroup_name,
self._codeeval_cgroup_name,
self._CGROUP_SANDBOX_NAME,
self._CGROUP_LEAF,
"cgroup.procs",
)[:]
return lambda: _move(sandbox_cgroup_procs_path)
class SandboxException(Exception):
"""
Base class for all exceptions generated by `Sandbox`.
"""
def __init__(self, *args, **kwargs):
self._sandbox_exception_args = tuple(args)
self._sandbox_exception_kwargs = dict(kwargs)
super().__init__(*args, **kwargs)
class PlatformNotSupportedException(SandboxException):
"""
Raised when the sandbox cannot run on the current platform.
The only way to fix this is to run on a different platform.
"""
class SandboxRuntimeException(SandboxException):
"""
Raised when the sandbox fails to run properly.
This means gVisor itself is failing, not the code in the sandbox.
"""
class ExecutionError(subprocess.CalledProcessError):
"""
Raised when the sandboxed code fails to run.
This means the sandbox worked, but the code that ran within failed.
"""
def __init__(self, code, **kwargs):
super().__init__(**kwargs)
self._code = code
self._sandbox_exception_args = ()
self._sandbox_exception_kwargs = kwargs.copy()
self._sandbox_exception_kwargs["code"] = code
def __str__(self):
super_str = super().__str__()
full_code = self._code
short_code = full_code.replace("\n", ";")
if len(short_code) >= 128:
short_code = short_code[:60] + "…" + short_code[-60:]
if self.stderr:
lines = [l.strip() for l in self.stderr.split("\n") if l.strip()]
if len(lines) >= 2:
first_line, last_line = lines[0], lines[-1]
return f"{first_line} […] {last_line} (`{short_code}`)\n{super_str}\n```\n{full_code}\n```"
if len(lines) == 1:
first_line = lines[0]
return f"{first_line} (`{short_code}`)\n{super_str}\n```\n{full_code}\n```"
return f"`{short_code}` failed\n{super_str}\n```\n{full_code}\n```"
class CodeExecutionError(ExecutionError):
"""
Raised when the sandboxed code returns with a non-zero exit code.
"""
def __str__(self):
super_str = super().__str__()
return f"Code error: {super_str}"
class ExecutionTimeoutError(ExecutionError):
"""
Raised when the code runs for too long relative to the timeout.
"""
def __str__(self):
super_str = super().__str__()
return f"Timeout: {super_str}"
class InterruptedExecutionError(ExecutionError):
"""
Raised when the code runs but is interrupted before it finishes.
This could happen from running out of resources.
"""
def __str__(self):
super_str = super().__str__()
return f"Interrupted: {super_str}"
class FixableException(SandboxException):
"""
Base class for exceptions which can be addressed by the user.
"""
class GVisorNotInstalledException(FixableException):
"""
Raised when gVisor is not installed (`runsc` not found in $PATH).
"""
class CorruptDownloadException(FixableException):
"""
Raised when auto-downloading gVisor resulted in a hash mismatch.
"""
class EnvironmentNeedsSetupException(FixableException):
"""
Raised when the environment does not give adequate control over the
system to run gVisor properly.
"""
@classmethod
def check_platform(cls):
"""
Verifies that this tool is running on a supported platform.
:return: Nothing.
:raises PlatformNotSupportedException: If running on an unsupported platform.
"""
uname = platform.uname()
if uname.system != "Linux":
raise cls.PlatformNotSupportedException(f"{uname.system} is not supported")
if uname.machine not in ("x86_64", "aarch64"):
raise cls.PlatformNotSupportedException(f"{uname.machine} is not supported")
@classmethod
def check_cgroups(cls):
"""
Verifies that cgroupfs is mounted and usable for resource limit enforcement.
:return: Nothing.
:raises EnvironmentNeedsSetupException: If cgroupfs is not mounted or unusable for resource limit enforcement.
"""
if not os.path.exists("/sys/fs/cgroup"):
raise cls.EnvironmentNeedsSetupException(
"cgroupfs not mounted as /sys/fs/cgroup but necessary for the sandbox to enforce memory limits; please mount it (`--mount=type=bind,source=/sys/fs/cgroup,target=/sys/fs/cgroup,readonly=false`)"
)
if not os.path.exists("/sys/fs/cgroup/cgroup.subtree_control"):
raise cls.EnvironmentNeedsSetupException(
"/sys/fs/cgroup/cgroup.subtree_control not found; make sure you are using cgroups v2"
)
# Try to open the file for writing to see if we can actually control cgroups.
# They may be mounted read-only, as is default with Docker.
try:
with open(
"/sys/fs/cgroup/cgroup.subtree_control", "wb"
) as subtree_control_f:
pass
except OSError:
if os.geteuid() != 0:
raise cls.EnvironmentNeedsSetupException(
"This script is not running as root, but it needs to do so in order to enforce resource limits; please run as root"
)
raise cls.EnvironmentNeedsSetupException(
"cgroupfs is not mounted writable but necessary for the sandbox to enforce memory limits; please remount it as writable (`--mount=type=bind,source=/sys/fs/cgroup,target=/sys/fs/cgroup,readonly=false`)"
)
with open("/sys/fs/cgroup/cgroup.controllers", "rb") as subtree_control_f:
controllers = subtree_control_f.read().decode("ascii").split(" ")
if "memory" not in controllers:
raise cls.EnvironmentNeedsSetupException(
"cgroupfs does not have the 'memory' controller enabled, necessary to enforce memory limits; please enable it"
)
@classmethod
def cgroups_available(cls) -> bool:
"""
Returns whether cgroupfs is mounted and usable for resource limit enforcement.
:return: Whether cgroupfs is mounted and usable for resource limit enforcement.
"""
try:
cls.check_cgroups()
except:
return False
else:
return True
@staticmethod
def unshare(flags):
"""
Implementation of `os.unshare` that works on Python < 3.12.
:param flags: Flags to pass to the `unshare(2)` system call.
:raises OSError: If something goes wrong.
"""
if "unshare" in os.__dict__: # Python >= 3.12.
return os.unshare(flags)
# Python <= 3.11:
import ctypes
libc = ctypes.CDLL(None)
libc.unshare.argtypes = [ctypes.c_int]
rc = libc.unshare(flags)
if rc == -1:
raise OSError(f"unshare({flags}) failed")
@classmethod
def check_unshare(cls):
"""
Verifies that the `unshare(2)` system call is available.
:return: Nothing.
:raises EnvironmentNeedsSetupException: If `unshare(2)` is not available.
"""
try:
cls.unshare(0)
except OSError:
raise cls.EnvironmentNeedsSetupException(
"`unshare(2)` syscall is unavailable but necessary for the sandbox to isolate itself; please remove the seccomp filter (`--security-opt=seccomp=unconfined`)"
)
@classmethod
def get_runsc_path(cls):
"""
Returns the absolute path where the `runsc` binary is installed.
:return: Absolute path to `runsc` binary, or `None` if not installed.
"""
runsc_path = shutil.which("runsc")
if runsc_path:
return runsc_path
if os.path.exists(cls.AUTO_INSTALLATION_PATH):
return cls.AUTO_INSTALLATION_PATH
return None
@classmethod
def runsc_needs_installation(cls):
"""
Checks whether the `runsc` binary is installed.
:return: Whether the `runsc` binary is installed.
"""
return cls.get_runsc_path() is None
@classmethod
def install_runsc(cls):
"""
Download and install the `runsc` binary if not already present.
:return: Nothing.
:raises CorruptDownloadException: If the download resulted in a hash mismatch.
"""
if not cls.runsc_needs_installation():
return
uname = platform.uname()
release_url_dir = f"https://storage.googleapis.com/gvisor/releases/release/latest/{uname.machine}"
os.makedirs(
os.path.dirname(cls.AUTO_INSTALLATION_PATH), mode=0o755, exist_ok=True
)
with tempfile.TemporaryDirectory(
prefix="sandbox_download_"
) as download_tmp_dir:
download_path = os.path.join(download_tmp_dir, "runsc")
urllib.request.urlretrieve(
url=f"{release_url_dir}/runsc",
filename=download_path,
)
sha512_raw = urllib.request.urlopen(
f"{release_url_dir}/runsc.sha512"
).read()
want_sha512 = sha512_raw.decode("ascii").split(" ")[0]
runsc_hash = hashlib.sha512()
with open(download_path, "rb") as runsc_f:
while True:
chunk = runsc_f.read(65536)
if not chunk:
break
runsc_hash.update(chunk)
runsc_sha512 = runsc_hash.hexdigest()
if runsc_sha512 != want_sha512:
raise cls.CorruptDownloadException(
"gVisor hash mismatch when auto-installing; please install gVisor manually"
)
os.chmod(download_path, mode=0o755)
os.rename(download_path, cls.AUTO_INSTALLATION_PATH)
@classmethod
def check_setup(cls, language: str, auto_install_allowed: bool):
"""
Verifies that the environment is compatible with running sandboxes.
:param language: The programming language to run.
:param auto_install_allowed: Whether auto-installation of `runsc` is allowed.
:return: Nothing.
:raises ValueError: If provided an invalid language name.
:raises PlatformNotSupportedException: If running on an unsupported platform.
:raises FixableException: If another issue occurs but that can be fixed by the user.
"""
if language not in cls.SUPPORTED_LANGUAGES:
raise ValueError(f"Unsupported language: {language}")
if shutil.which("bash") is None:
raise cls.EnvironmentNeedsSetupException(
"bash is not installed (`bash` binary not found in $PATH); please install it"
)
if shutil.which("unshare") is None:
raise cls.EnvironmentNeedsSetupException(
"unshare is not installed (`unshare` binary not found in $PATH); please install it"
)
cls.check_platform()
cls.check_unshare()
cls.check_cgroups()
if not auto_install_allowed and cls.get_runsc_path() is None:
raise cls.GVisorNotInstalledException(
"gVisor is not installed (runsc binary not found in $PATH); please install it or enable AUTO_INSTALL valve for auto installation"
)
@classmethod
def maybe_main(cls):
"""
Entry-point for re-execution.
May call `sys.exit` if this is intended to be a code evaluation re-execution.
"""
if os.environ.get(cls._MARKER_ENVIRONMENT_VARIABLE) is None:
return
directives = json.load(sys.stdin)
try:
result = cls(**directives["settings"])._run()
except Exception as e:
exception_info = {
"name": e.__class__.__name__,
"str": str(e),
}
if isinstance(e, cls.SandboxException) or isinstance(e, cls.ExecutionError):
exception_info["args"] = e._sandbox_exception_args
exception_info["kwargs"] = e._sandbox_exception_kwargs
json.dump(
{
"exception": exception_info,
},
sys.stdout,
)
else:
stdout = result.stdout
if type(stdout) != type(b""):
stdout = stdout.encode("utf-8", errors="replace")
stderr = result.stderr
if type(stderr) != type(b""):
stderr = stderr.encode("utf-8", errors="replace")
json.dump(
{
"result": {
"args": result.args,
"returncode": result.returncode,
"stdout": base64.b64encode(stdout).decode("utf-8"),
"stderr": base64.b64encode(stderr).decode("utf-8"),
},
},
sys.stdout,
)
finally:
sys.stdout.flush()
sys.exit(0)
def __init__(
self,
tmp_dir: str,
language: str,
code: str,
debug: bool,
networking_allowed: bool,
max_runtime_seconds: int,
max_ram_bytes: typing.Optional[int],
persistent_home_dir: typing.Optional[str],
):
"""
Constructor.
:param tmp_dir: Temporary directory exclusive to this sandbox. Must outlive the Sandbox object.
:param language: The language of the code; must be one of `SUPPORTED_LANGUAGES`.
:param code: Arbitrary code that needs to run in the sandbox.
:param debug: Whether or not to enable debug-level logging for the sandbox.
:param networking_allowed: Whether the code should be given access to the network.
:param max_runtime_seconds: How long the code should be allowed to run, in seconds.
:param max_ram_bytes: How many bytes of RAM the interpreter should be allowed to use, or `None` for no limit.
:param persistent_home_dir: Optional directory which will be mapped read-write to this real host directory.
"""
self._init(
{
"tmp_dir": tmp_dir,
"language": language,
"code": code,
"debug": debug,
"networking_allowed": networking_allowed,
"max_runtime_seconds": max_runtime_seconds,
"max_ram_bytes": max_ram_bytes,
"persistent_home_dir": persistent_home_dir,
}
)
def _init(self, settings):
self._settings = settings
self._tmp_dir = self._settings["tmp_dir"]
self._bundle_path = os.path.join(self._tmp_dir, "bundle")
self._runtime_root_path = os.path.join(self._tmp_dir, "runtime")
self._logs_path = os.path.join(self._tmp_dir, "logs")
self._gotmp_dir = os.path.join(self._tmp_dir, "gotmp")
self._sandbox_shared_path = os.path.join(self._tmp_dir, "sandbox")
self._language = self._settings["language"]
self._code = self._settings["code"]
self._debug = self._settings["debug"]
self._networking_allowed = self._settings["networking_allowed"]
self._max_runtime_seconds = self._settings["max_runtime_seconds"]
self._max_ram_bytes = self._settings["max_ram_bytes"]
self._persistent_home_dir = self._settings["persistent_home_dir"]
self._sandboxed_command = None
self._switcheroo = self._Switcheroo(
log_path=os.path.join(self._logs_path, "switcheroo.txt"),
max_sandbox_ram_bytes=self._max_ram_bytes,
)
def _setup_sandbox(self):
"""
Set up the sandbox's root filesystem and OCI config prior to execution.
Runs in separate forked process. Performs the switcheroo.
:raises FixableException: If an issue occurs but that can be fixed by the user.
"""
# Set up basic configuration options.
oci_config = copy.deepcopy(self.OCI_CONFIG_SKELETON)
if self._max_ram_bytes:
oci_config["linux"]["resources"]["memory"]["limit"] = self._max_ram_bytes
os.makedirs(self._bundle_path, mode=0o711)
os.makedirs(self._runtime_root_path, mode=0o711)
os.makedirs(self._logs_path, mode=0o711)
os.makedirs(self._sandbox_shared_path, mode=0o777)
os.makedirs(self._gotmp_dir, mode=0o711)
os.chmod(self._sandbox_shared_path, mode=0o777, follow_symlinks=False)
rootfs_path = os.path.join(self._tmp_dir, "rootfs")
os.makedirs(rootfs_path, mode=0o755)
if self._persistent_home_dir is not None:
if not os.path.isdir(self._persistent_home_dir):
raise self.SandboxException(
f"Persistent home directory {self._persistent_home_dir} does not exist"
)
try:
self._switcheroo.do()
except Exception as e:
try:
switcheroo_status = self._switcheroo._status()
except:
raise e
else:
raise e.__class__(f"{e}; {switcheroo_status}")
oci_config["root"]["path"] = rootfs_path
# Locate the interpreter to use.
interpreter_path = sys.executable
if self._language == self.LANGUAGE_BASH:
interpreter_path = shutil.which("bash")
if interpreter_path is None:
raise RuntimeError("Interpreter not found")
oci_config["mounts"].append(
{
"type": "bind",
"source": interpreter_path,
"destination": interpreter_path,
"options": ["ro"],
}
)
# Populate rootfs. This is a multi-step process.
# Create writable empty directories.
for d in self.EMPTY_WRITABLE_DIRECTORIES:
rootfs_subdir = os.path.join(rootfs_path, d.removeprefix(os.path.sep))
os.makedirs(rootfs_subdir, mode=0o755, exist_ok=True)
oci_config["mounts"].append(
{
"type": "tmpfs",
"destination": d,
"options": [],
}
)
# Create read-only empty directories.
for d in self.EMPTY_READ_ONLY_DIRECTORIES + [os.path.dirname(interpreter_path)]:
rootfs_subdir = os.path.join(rootfs_path, d.removeprefix(os.path.sep))
os.makedirs(rootfs_subdir, mode=0o755, exist_ok=True)
# Handle exposed host symlinks. These will show up as symlinks with the same
# target path in the sandbox, so they do not expose the host's view of the
# directory they point to.
symlinks = set()
for l in self.EXPOSED_SYSTEM_DIRECTORIES + self.EXPOSED_SYSTEM_FILES:
if not os.path.islink(l):
continue
rootfs_subpath = os.path.join(rootfs_path, l.removeprefix(os.path.sep))
os.makedirs(os.path.dirname(rootfs_subpath), mode=0o755, exist_ok=True)
os.symlink(src=os.readlink(l), dst=rootfs_subpath)
symlinks.add(l)
# Handle exposed host directories.
for d in self.EXPOSED_SYSTEM_DIRECTORIES:
if d in symlinks:
continue # It is a symlink, so already handled.
if not os.path.isdir(d):
continue # The host does not have a directory at this path.
rootfs_subdir = os.path.join(rootfs_path, d.removeprefix(os.path.sep))
os.makedirs(rootfs_subdir, mode=0o755, exist_ok=True)
oci_config["mounts"].append(
{
"type": "bind",
"source": d,
"destination": d,
"options": ["ro", "rbind"],
}
)
# Handle exposed host files.
for f in self.EXPOSED_SYSTEM_FILES:
if f in symlinks:
continue # It is a symlink, so already handled.
if not os.path.isfile(f):
continue # The host does not have a file at this path.
rootfs_subpath = os.path.join(rootfs_path, f.removeprefix(os.path.sep))
rootfs_subdir = os.path.dirname(rootfs_subpath)
os.makedirs(rootfs_subdir, mode=0o755, exist_ok=True)
oci_config["mounts"].append(
{
"type": "bind",
"source": f,
"destination": f,
"options": ["ro"],
}
)
# Shared sandbox directory to propagate exit code and persistent files.
oci_config["mounts"].append(
{
"type": "bind",
"source": self._sandbox_shared_path,
"destination": "/sandbox",
"options": ["rw"],
}
)
if self._persistent_home_dir is not None:
oci_config["mounts"].append(
{
"type": "bind",
"source": self._persistent_home_dir,
"destination": "/sandbox/persistent",
"options": ["rw"],
}
)
# Sort mounts to ensure proper overlay order.
oci_config["mounts"].sort(key=lambda m: m["destination"])
# Generate some /etc files that look normal.
with open(os.path.join(rootfs_path, "etc/hostname"), "w") as hostname_f:
hostname_f.write("sandbox\n")
with open(os.path.join(rootfs_path, "etc/passwd"), "w") as passwd_f:
passwd_f.write("user:x:1000:1000:user:/home/user:/bin/bash\n")
# Generate command line to run in the sandbox.
self._sandboxed_command = [
shutil.which("bash"),
"-c",
"; ".join(
(
"echo OK > /sandbox/started",
f"{interpreter_path} /dev/stdin",
'echo "$?" > /sandbox/.pre_exit_code || exit 1',
"if [[ -d /sandbox/persistent ]]; then cp -rd --one-file-system /home/user/. /sandbox/persistent/ || exit 2; fi",
"mv /sandbox/.pre_exit_code /sandbox/exit_code || exit 3",
"exit 0",
)
),
]
# Work around issue that gVisor does not preserve correct UID mappings when running as non-root user in the sandbox.
# So map current user to 0:0, then create a new userns immediately before running command and remap to correct UID/GID.
oci_config["process"]["user"]["uid"] = 0
oci_config["process"]["user"]["gid"] = 0
oci_config["process"]["args"] = [
shutil.which("unshare"),
"--map-user=1000",
"--map-group=1000",
] + self._sandboxed_command
# We are done. Write OCI config to bundle directory.
with open(os.path.join(self._bundle_path, "config.json"), "w") as bundle_f:
json.dump(oci_config, bundle_f, indent=2, sort_keys=True)
def _run(self) -> subprocess.CompletedProcess:
"""
Spawn and wait for the sandbox. Runs in separate forked process.
:return: A `CompletedProcess` object representing the return code and stdout/stderr of the code interpreter.
:raises Sandbox.SandboxRuntimeException: If the sandbox failed to start or behaved incorrectly regardless of the code being evaluated.
:raises Sandbox.ExecutionTimeoutError: If the code interpreter ran for longer than configured.
:raises Sandbox.InterruptedExecutionError: If the code interpreter died without providing a return code; usually due to running over resource limits.
:raises sandbox.CodeExecutionError: If the code interpreter failed to execute the given code. This does not represent a sandbox failure.
"""
try:
self._setup_sandbox()
network_mode = "host" if self._networking_allowed else "none"
runsc_argv = [
self.get_runsc_path(),
"--rootless=true",
"--directfs=false",
f"--network={network_mode}",
f"--ignore-cgroups=true", # We already took care of cgroups manually.
f"--root={self._runtime_root_path}",
f"--debug-log={self._logs_path}/",
"run",
f"--bundle={self._bundle_path}",
"sandbox",
]
runsc_env = os.environ.copy()
runsc_env["TMPDIR"] = self._gotmp_dir
started_marker_path = os.path.join(self._sandbox_shared_path, "started")
try:
result = subprocess.run(
runsc_argv,
env=runsc_env,
preexec_fn=self._switcheroo.move_process_to_sandbox_leaf_cgroup_lambda(),
input=self._code + "\n",
text=True,
capture_output=True,
timeout=self._max_runtime_seconds,
check=True,
)
except subprocess.TimeoutExpired as e:
raise self.ExecutionTimeoutError(
code=self._code,
returncode=126,
cmd=self._sandboxed_command,
output=e.stdout,
stderr=e.stderr,
)
except subprocess.CalledProcessError as e:
if os.path.isfile(started_marker_path):
raise self.InterruptedExecutionError(
code=self._code,
returncode=127,
cmd=self._sandboxed_command,
output=e.stdout,
stderr=e.stderr,
)
logs = {}
def process_log(filename, log_line):
if self._debug or (
log_line and log_line[0] in "WEF"
): # Warning, Error, Fatal
if filename not in logs:
logs[filename] = []
logs[filename].append(log_line)
self.debug_logs(process_log)
stderr = e.stderr.strip()
json_logs = json.dumps(logs)
if self._debug:
raise self.SandboxRuntimeException(
f"Sandbox failed to start: {e}; stderr: {stderr}; logs: {json_logs}"
)
raise self.SandboxRuntimeException(
f"Sandbox failed to start: {e} (turn on debug mode to see more information); stderr: {stderr}; logs: {json_logs}"
)
if not os.path.isfile(started_marker_path):
raise self.SandboxRuntimeException(
f"Sandbox failed to start up properly"
)
exit_code_path = os.path.join(self._sandbox_shared_path, "exit_code")
if not os.path.isfile(exit_code_path):
raise self.SandboxRuntimeException(
"Sandbox failed to record an exit code"
)
with open(exit_code_path, "r") as exit_code_f:
exit_code_str = exit_code_f.read()
try:
exit_code = int(exit_code_str.strip())
except ValueError as e:
raise self.SandboxRuntimeException(
f"Sandbox recorded non-integer exit code: {e}"
)
if exit_code != 0:
raise self.CodeExecutionError(
code=self._code,
returncode=exit_code,
cmd=self._sandboxed_command,
output=result.stdout,
stderr=result.stderr,
)
return result
finally:
if self._switcheroo is not None:
self._switcheroo.cleanup()
def run(self) -> subprocess.CompletedProcess:
"""
Set up and run the sandbox in a separate process.
:return: A `CompletedProcess` object representing the return code and stdout/stderr of the code interpreter.
:raises FixableException: If an issue occurs but that can be fixed by the user.
:raises Sandbox.SandboxRuntimeException: If the sandbox failed to start or behaved incorrectly regardless of the code being evaluated.
:raises Sandbox.ExecutionTimeoutError: If the code interpreter ran for longer than configured.
:raises Sandbox.InterruptedExecutionError: If the code interpreter died without providing a return code; usually due to running over resource limits.
:raises sandbox.CodeExecutionError: If the code interpreter failed to execute the given code. This does not represent a sandbox failure.
"""
new_env = os.environ.copy()
new_env[self._MARKER_ENVIRONMENT_VARIABLE] = "1"
data = json.dumps({"settings": self._settings})
try:
result = subprocess.run(
(sys.executable, os.path.abspath(__file__)),
env=new_env,
input=data,
text=True,
capture_output=True,
check=True,
)
except subprocess.CalledProcessError as e:
raise self.SandboxRuntimeException(f"{e} (stderr: {e.stderr})")
else:
if not result.stdout:
raise self.SandboxRuntimeException(
f"Subprocess interpreter did not produce any output (stderr: {result.stderr})"
)
try:
output = json.loads(result.stdout)
except json.decoder.JSONDecodeError as e:
raise self.SandboxRuntimeException(
f"Subprocess interpreter produced invalid JSON (stdout: {result.stdout})"
)
if "exception" in output:
class_name = output["exception"]["name"]
found_class = None
for ex_class in (
self.PlatformNotSupportedException,
self.SandboxRuntimeException,
self.CodeExecutionError,
self.ExecutionTimeoutError,
self.InterruptedExecutionError,
self.GVisorNotInstalledException,
self.CorruptDownloadException,
self.EnvironmentNeedsSetupException,
self.ExecutionError,
self.SandboxException,
):
if ex_class.__name__ == class_name:
found_class = ex_class
break
if found_class is None:
exception_str = output["exception"]["str"]
raise self.SandboxException(f"{class_name}: {exception_str}")
raise found_class(
*output["exception"]["args"], **output["exception"]["kwargs"]
)
if "result" not in output:
raise self.SandboxException(
f"Invalid response from subprocess: {output}"
)
return subprocess.CompletedProcess(
args=output["result"]["args"],
returncode=output["result"]["returncode"],
stdout=base64.b64decode(output["result"]["stdout"]).decode(
"utf-8", errors="replace"
),
stderr=base64.b64decode(output["result"]["stderr"]).decode(
"utf-8", errors="replace"
),
)
def debug_logs(self, write_fn: typing.Callable[[str, str], typing.Any]):
"""
Write debug logs and other system information to the given function.
May only be called after `run` returns, but may be called even when
`run` fails.
:param write_fn: A function that takes (filename, log line) as arguments.
"""
log_paths = []
all_logs = []
if os.path.isdir(self._logs_path):
for log_filename in sorted(os.listdir(self._logs_path)):
if not log_filename.endswith(".txt"):
continue
log_paths.append(os.path.join(self._logs_path, log_filename))
else:
all_logs.append(
(
"[meta]",
f"Logs path {self._logs_path} does not exist or is not a directory.",
)
)
log_paths.extend(self._EXTRA_DEBUG_LOG_PATHS)
runsc_filenames = set()
for log_path in log_paths:
log_filename = os.path.basename(log_path)
if log_filename.startswith("runsc.log.") and log_filename.endswith(".txt"):
log_filename = "runsc." + self._LOG_FILENAME_TRUNCATE_RE.sub(
"", log_filename
).removesuffix(".txt")
if log_filename in runsc_filenames:
runsc_filename_suffix = 2
while f"{log_filename}.{runsc_filename_suffix}" in runsc_filenames:
runsc_filename_suffix += 1
log_filename = f"{log_filename}.{runsc_filename_suffix}"
runsc_filenames.add(log_filename)
try:
with open(log_path, "rb") as log_f:
for log_line in log_f:
log_line = log_line.replace(b"\x00", b"\n").rstrip()
if not log_line:
continue
for line in log_line.split(b"\n"):
all_logs.append(
(log_filename, line.decode("utf-8", errors="replace"))
)
except Exception as e:
all_logs.append((log_filename, f"Failed to open: {e}"))
for cmd in self._EXTRA_DEBUG_LOG_COMMANDS:
cmd_str = "`" + " ".join(cmd) + "`"
try:
result = subprocess.run(cmd, capture_output=True, timeout=1, check=True)
except subprocess.CalledProcessError as e:
all_logs.append(
(cmd_str, f"Failed: {e} (stdout={e.stdout}, stderr={e.stderr})")
)
except Exception as e:
all_logs.append((cmd_str, f"Failed: {e}"))
else:
for line in result.stdout.replace(b"\x00", b"\n").split(b"\n"):
line = line.rstrip()
if not line:
continue
all_logs.append((cmd_str, line.decode("utf-8", errors="replace")))
for filename, log_entry in all_logs:
write_fn(filename, log_entry)
_SAMPLE_BASH_INSTRUCTIONS = (
"echo 'Hello from the sandbox!'",
"date",
"dmesg",
"echo 'Bye from the sandbox!'",
)
_SAMPLE_PYTHON_INSTRUCTIONS = (
"print('Hello from the sandbox!')",
"import datetime, sys",
"print('Current date and time:', datetime.datetime.now())",
"sys.stdout.flush()",
"import shutil, subprocess",
"subprocess.run([shutil.which('dmesg')], check=True)",
"print('Bye from the sandbox!')",
)
def _do_self_tests(debug):
user_storage_path = None
def _want_generated_files(want_generated_files):
def _verify(got_generated_files):
try:
for name in want_generated_files.keys():
if name not in got_generated_files:
raise ValueError(f"File {name} is missing")
for name in got_generated_files.keys():
if name not in want_generated_files:
raise ValueError(f"Unexpected generated file: {name}")
for name, predicates in want_generated_files.items():
if type(predicates) not in (type(()), type([])):
predicates = (predicates,)
for i, predicate in enumerate(predicates):
try:
result = predicate(got_generated_files[name])
if result is not None:
raise ValueError(result)
except ValueError as e:
raise e.__class__(
f"File {name} predicate #{i}: {e} (got: {got_generated_files[name]})"
)
except ValueError as e:
return e
else:
return None
return _verify
def _markdown_contains(substring):
return lambda md: (
f"substring '{substring}' not found" if substring not in md else None
)
def _markdown_does_not_contain(substring):
return lambda md: (
f"substring '{substring}' unexpectedly found" if substring in md else None
)
def _want_user_storage_num_files(num_files_predicate):
if type(num_files_predicate) is type(42):
want_num_files = num_files_predicate
num_files_predicate = lambda n: n == want_num_files
def _verify():
total_files = 0
for _, _, subfiles in os.walk(user_storage_path):
total_files += len(list(f for f in subfiles if f != ".lock"))
if not num_files_predicate(total_files):
raise ValueError(f"User storage unexpectedly has {total_files} files")
return _verify
_self_tests = (
{
"name": "simple_python",
"language": "python",
"code": _SAMPLE_PYTHON_INSTRUCTIONS,
"debug": True,
"status": "OK",
},
{
"name": "simple_bash",
"language": "bash",
"code": _SAMPLE_BASH_INSTRUCTIONS,
"debug": True,
"status": "OK",
"post": _want_user_storage_num_files(0),
},
{
"name": "bad_syntax_python",
"language": "python",
"code": ("print('foo",),
"debug": True,
"status": "ERROR",
},
{
"name": "bad_syntax_bash",
"language": "bash",
"code": ("echo 'foo",),
"debug": True,
"status": "ERROR",
},
{
"name": "simple_file_creation",
"language": "bash",
"code": ("echo 'Hello world!' > today.txt",),
"status": "OK",
"generated_files": {
"today.txt": (
_markdown_contains("today.txt"),
_markdown_contains("Hello world!\n"),
)
},
"post": _want_user_storage_num_files(1),
},
{
"name": "generated_file_rendering",
"language": "bash",
"code": (
"mkdir -p foo/bar",
"dmesg > foo/bar/dmesg.txt",
"touch empty.txt",
"(echo '# Example markdown file'; echo 'This is markdown!') > markdown.md",
"echo 'This file cannot `''`''` be inlined' > no_inline.txt",
"echo 'Might be an image' > picture.png",
"echo 'Might be music' > song.flac",
"echo 'Might be a video' > movie.mkv",
"echo 'Might be random data' > random_data",
"echo 'Might be random data' > random_data.bin",
),
"valves": {
"WEB_ACCESSIBLE_DIRECTORY_URL": "http://myhost/URL/ROOT",
},
"status": "OK",
"generated_files": {
"foo/bar/dmesg.txt": (
_markdown_contains("📄"),
_markdown_contains("[foo/bar/dmesg.txt]"),
_markdown_contains("(http://myhost/URL/ROOT/"),
_markdown_contains("/foo/bar/dmesg.txt)"),
_markdown_contains("gVisor"),
_markdown_contains("```"),
),
"empty.txt": (
_markdown_contains("⁉️"),
_markdown_does_not_contain("📄"),
_markdown_contains("`empty.txt`"),
_markdown_contains("(empty)"),
_markdown_does_not_contain("[empty.txt]"),
_markdown_does_not_contain("(http://myhost/URL/ROOT"),
),
"markdown.md": (
_markdown_contains("📃"),
_markdown_does_not_contain("📄"),
_markdown_contains(
"\n> # Example markdown file\n> This is markdown!"
),
_markdown_does_not_contain("```"),
),
"no_inline.txt": (
_markdown_contains("📄"),
_markdown_does_not_contain("This file cannot"),
_markdown_does_not_contain("```"),
),
"picture.png": (
_markdown_contains("🖼️"),
_markdown_does_not_contain("Might be"),
),
"song.flac": (
_markdown_contains("🎵"),
_markdown_does_not_contain("Might be"),
),
"movie.mkv": (
_markdown_contains("🎬"),
_markdown_does_not_contain("Might be"),
),
"random_data": (
_markdown_contains("💾"),
_markdown_does_not_contain("Might be"),
),
"random_data.bin": (
_markdown_contains("💾"),
_markdown_does_not_contain("Might be"),
),
},
"post": _want_user_storage_num_files(10),
},
{
"name": "generated_text_files_too_large_to_render",
"language": "bash",
"code": (
f"yes boop | head -{_Action._UserFile.MAX_INLINE_TEXT_LINES-1} > ok_lines.md",
f"yes boop | head -{_Action._UserFile.MAX_INLINE_TEXT_LINES-1} > ok_lines.txt",
f"yes boop | head -{_Action._UserFile.MAX_INLINE_TEXT_LINES+1} > too_many_lines.md",
f"yes boop | head -{_Action._UserFile.MAX_INLINE_TEXT_LINES+1} > too_many_lines.txt",
f"yes boop | tr '\\n' ' ' | head -c{_Action._UserFile.MAX_INLINE_TEXT_BYTES-1} > ok_bytes.txt",
f"yes boop | tr '\\n' ' ' | head -c{_Action._UserFile.MAX_INLINE_TEXT_BYTES+1} > too_many_bytes.txt",
),
"status": "OK",
"generated_files": {
"ok_lines.md": _markdown_contains("boop"),
"ok_lines.txt": _markdown_contains("boop"),
"too_many_lines.md": _markdown_does_not_contain("boop"),
"too_many_lines.txt": _markdown_does_not_contain("boop"),
"ok_bytes.txt": _markdown_contains("boop"),
"too_many_bytes.txt": _markdown_does_not_contain("boop"),
},
"post": _want_user_storage_num_files(16),
},
{
"name": "too_large_file_for_user_quota",
"language": "bash",
"code": (f"head -c{64 * 1024 * 1024} /dev/urandom > random_data.bin",),
"valves": {
"MAX_MEGABYTES_PER_USER": 32,
},
"status": "STORAGE_ERROR",
"post": _want_user_storage_num_files(16),
},
{
"name": "too_many_files_generated_in_single_execution",
"language": "bash",
"code": ("for i in $(seq 1 128); do touch file$i.txt; done",),
"valves": {
"MAX_FILES_PER_EXECUTION": 64,
},
"status": "STORAGE_ERROR",
"post": _want_user_storage_num_files(16),
},
{
"name": "too_many_files_for_user_quota",
"language": "bash",
"code": ("for i in $(seq 1 128); do touch file$i.txt; done",),
"valves": {
"MAX_FILES_PER_EXECUTION": 256,
"MAX_FILES_PER_USER": 64,
},
"status": "STORAGE_ERROR",
"post": _want_user_storage_num_files(16),
},
{
"name": "clear_out_existing_user_storage",
"language": "bash",
"code": ("for i in $(seq 1 128); do touch file$i.txt; done",),
"valves": {
"MAX_FILES_PER_EXECUTION": 256,
# Covers enough quota for the last test's files to not be
# deleted (6 of them), but all previous tests' files will.
# We add +2 which covers 2 of the previous-previous tests'
# files, but since files from one execution are either all
# deleted or not, this should not make a difference, so the
# extra +2 should not add to the expected number of files.
"MAX_FILES_PER_USER": 128 + 6 + 2,
},
"status": "OK",
"generated_files": {f"file{i}.txt": () for i in range(1, 128 + 1)},
# Verify that only this test + the last test's files are here:
"post": _want_user_storage_num_files(128 + 6),
},
{
"name": "long_running_code",
"language": "python",
"code": (
"import time",
"time.sleep(15)",
"print('Managed to sleep for 15 seconds.')",
),
"valves": {
"MAX_RUNTIME_SECONDS": 5,
},
"status": "TIMEOUT",
},
{
"name": "ram_hog",
"language": "python",
"code": (
"import time",
"f = open('/dev/urandom', 'rb')",
"s = []",
"for i in range(256): s.append(f.read(1024 * 1024))",
"time.sleep(1)",
"print('\\n'.join(line for line in open('/proc/self/status').read().split('\\n') if line.startswith('Vm')))",
"print('Managed to hog', len(s), 'megabytes.')",
),
"valves": {
"MAX_RAM_MEGABYTES": 128,
},
"status": "INTERRUPTED",
},
)
def _print_output(obj):
if obj.stdout:
print(" 🗨️ Output:", file=sys.stderr)
for stdout_line in obj.stdout.split("\n"):
print(f" {stdout_line}")
if obj.stderr:
print(" 🐞 Debug:", file=sys.stderr)
for stderr_line in obj.stderr.split("\n"):
print(f" {stderr_line}")
success = True
with tempfile.TemporaryDirectory(prefix="sandbox_self_test") as tmp_dir:
user_storage_path = os.path.join(tmp_dir, "user_storage")
os.makedirs(user_storage_path, mode=0o755)
valve_name_prefix = (
_Action.Valves()._VALVE_OVERRIDE_ENVIRONMENT_VARIABLE_NAME_PREFIX
)
for self_test in _self_tests:
name = self_test["name"]
language = self_test["language"]
code = "\n".join(self_test["code"]) + "\n"
want_status = self_test["status"]
valves = self_test.get("valves", {})
want_generated_files = self_test.get("generated_files", {})
if type(want_generated_files) is type({}):
want_generated_files = _want_generated_files(want_generated_files)
post_tests = self_test.get("post", ())
if type(post_tests) not in (type(()), type([])):
post_tests = (post_tests,)
test_env = os.environ.copy()
test_env[f"{valve_name_prefix}WEB_ACCESSIBLE_DIRECTORY_PATH"] = (
user_storage_path
)
for valve_name, valve_value in valves.items():
test_env[f"{valve_name_prefix}{valve_name}"] = str(valve_value)
test_argv = [
sys.executable,
os.path.abspath(__file__),
f"--language={language}",
]
if debug or self_test.get("debug", False):
test_argv.append("--debug")
print(f"⏳ Running self-test: {name}", file=sys.stderr)
try:
result = subprocess.run(
test_argv,
env=test_env,
input=code,
text=True,
capture_output=True,
timeout=20,
check=True,
)
except subprocess.CalledProcessError as e:
success = False
_print_output(e)
print(
f"❌ Self-test {name} failed: process failed: {e}", file=sys.stderr
)
except Exception as e:
success = False
exception_class = e.__class__
print(
f"❌ Self-test {name} failed: {exception_class}: {e}",
file=sys.stderr,
)
else:
try:
result_data = json.loads(result.stdout)
except json.decoder.JSONDecodeError as e:
_print_output(result)
success = False
print(
f"❌ Self-test {name} failed: JSON decoding failed: {e}; got: {result.stdout}",
file=sys.stderr,
)
else:
if debug:
_print_output(result)
got_status = result_data["status"]
got_generated_files = result_data.get("generated_files", {})
if got_status != want_status:
success = False
print(
f"❌ Self-test {name} failed: status was {got_status}, expected {want_status}",
file=sys.stderr,
)
elif want_generated_files(got_generated_files) is not None:
success = False
generated_files_error = want_generated_files(
got_generated_files
)
print(
f"❌ Self-test {name} failed: generated files are incorrect: {generated_files_error}",
file=sys.stderr,
)
else:
post_test_failure = None
for post_test in post_tests:
try:
post_test()
except Exception as e:
post_test_failure = e
break
if post_test_failure is not None:
success = False
print(
f"❌ Self-test {name} failed: post-test verification failed: {post_test_failure}",
file=sys.stderr,
)
else:
print(f"✔️ Self-test {name} passed.", file=sys.stderr)
if success:
print("✅ All self-tests passed, good go to!", file=sys.stderr)
sys.exit(0)
else:
print("☠️ One or more self-tests failed.", file=sys.stderr)
sys.exit(1)
assert False, "Unreachable"
# Debug utility: Run code from stdin if running as a normal Python script.
if __name__ == "__main__":
Sandbox.maybe_main()
parser = argparse.ArgumentParser(
description="Run arbitrary code in a gVisor sandbox."
)
parser.add_argument(
"--language",
choices=("python", "bash"),
default="python",
help="Language of the code to run.",
)
parser.add_argument(
"--use_sample_code",
action="store_true",
default=False,
help="Run sample code for the given language; otherwise, read code from stdin.",
)
parser.add_argument(
"--self_test",
action="store_true",
default=False,
help="Run series of self-tests.",
)
parser.add_argument(
"--debug", action="store_true", default=False, help="Enable debug mode."
)
args = parser.parse_args()
if args.debug:
os.environ[
_Action.Valves()._VALVE_OVERRIDE_ENVIRONMENT_VARIABLE_NAME_PREFIX + "DEBUG"
] = "true"
if args.self_test:
_do_self_tests(args.debug)
if args.use_sample_code:
if args.language == "bash":
code = "\n".join(_SAMPLE_BASH_INSTRUCTIONS) + "\n"
else:
code = "\n".join(_SAMPLE_PYTHON_INSTRUCTIONS) + "\n"
else:
code = sys.stdin.read()
async def _local_run():
def _dummy_emitter(event):
print(f"Event: {event}", file=sys.stderr)
action = Action()
body = {
"messages": [
{
"role": "assistant",
"content": f"```{args.language}\n{code}\n```\n",
},
],
}
output = await action.action(body=body, __event_emitter__=_dummy_emitter)
print(output)
asyncio.run(_local_run())