diff --git a/runtimes/aleph-debian-12-node18/create_disk_image.sh b/runtimes/aleph-debian-12-node18/create_disk_image.sh new file mode 100644 index 000000000..297c6d34e --- /dev/null +++ b/runtimes/aleph-debian-12-node18/create_disk_image.sh @@ -0,0 +1,108 @@ +#!/bin/sh + +rm ./rootfs.squashfs + +set -euf + +rm -fr ./rootfs +mkdir ./rootfs + +debootstrap --variant=minbase bookworm ./rootfs http://deb.debian.org/debian/ + +chroot ./rootfs /bin/sh <> /etc/ssh/sshd_config +echo "PasswordAuthentication no" >> /etc/ssh/sshd_config +echo "ChallengeResponseAuthentication no" >> /etc/ssh/sshd_config +echo "PermitRootLogin yes" >> /etc/ssh/sshd_config + +mkdir -p /overlay + +# Set up a login terminal on the serial console (ttyS0): +ln -s agetty /etc/init.d/agetty.ttyS0 +echo ttyS0 > /etc/securetty +EOT + +# In case we want to pre-install the latest lts version (v18.18.0) +chroot ./rootfs /bin/bash < ./rootfs/etc/inittab +# /etc/inittab + +::sysinit:/sbin/init sysinit +::sysinit:/sbin/init boot +::wait:/sbin/init default + +# Set up a couple of getty's +tty1::respawn:/sbin/getty 38400 tty1 +tty2::respawn:/sbin/getty 38400 tty2 +tty3::respawn:/sbin/getty 38400 tty3 +tty4::respawn:/sbin/getty 38400 tty4 +tty5::respawn:/sbin/getty 38400 tty5 +tty6::respawn:/sbin/getty 38400 tty6 + +# Put a getty on the serial port +ttyS0::respawn:/sbin/getty -L ttyS0 115200 vt100 + +# Stuff to do for the 3-finger salute +::ctrlaltdel:/sbin/reboot + +# Stuff to do before rebooting +::shutdown:/sbin/init shutdown +EOT + +# Reduce size +rm -fr ./rootfs/root/.cache +rm -fr ./rootfs/var/cache +mkdir -p ./rootfs/var/cache/apt/archives/partial +rm -fr ./rootfs/usr/share/doc +rm -fr ./rootfs/usr/share/man +rm -fr ./rootfs/var/lib/apt/lists/ + +# Custom init +cp ./init0.sh ./rootfs/sbin/init +cp ./init1.py ./rootfs/root/init1.py +cp ./loading.html ./rootfs/root/loading.html +cp ./run.sh ./rootfs/root/run.sh +chmod +x ./rootfs/sbin/init +chmod +x ./rootfs/root/init1.py +chmod +x ./rootfs/root/run.sh + +mksquashfs ./rootfs/ ./rootfs.squashfs diff --git a/runtimes/aleph-debian-12-node18/init0.sh b/runtimes/aleph-debian-12-node18/init0.sh new file mode 100644 index 000000000..23e9be3b9 --- /dev/null +++ b/runtimes/aleph-debian-12-node18/init0.sh @@ -0,0 +1,59 @@ +#!/bin/sh + +set -euf + +mount -t proc proc /proc -o nosuid,noexec,nodev + +log() { + echo "$(awk '{print $1}' /proc/uptime)" '|S' "$@" +} +log "init0.sh is launching" + +# Switch root from read-only ext4 to to read-write overlay +mkdir -p /overlay +/bin/mount -t tmpfs -o noatime,mode=0755 tmpfs /overlay +mkdir -p /overlay/root /overlay/work +/bin/mount -o noatime,lowerdir=/,upperdir=/overlay/root,workdir=/overlay/work -t overlay "overlayfs:/overlay/root" /mnt +mkdir -p /mnt/rom +pivot_root /mnt /mnt/rom + +mount --move /rom/proc /proc +mount --move /rom/dev /dev + +mkdir -p /dev/pts +mkdir -p /dev/shm + +mount -t sysfs sys /sys -o nosuid,noexec,nodev +mount -t tmpfs run /run -o mode=0755,nosuid,nodev +#mount -t devtmpfs dev /dev -o mode=0755,nosuid +mount -t devpts devpts /dev/pts -o mode=0620,gid=5,nosuid,noexec +mount -t tmpfs shm /dev/shm -omode=1777,nosuid,nodev + +# Required by Docker +cgroupfs-mount +update-alternatives --set iptables /usr/sbin/iptables-legacy +update-alternatives --set ip6tables /usr/sbin/ip6tables-legacy + +# Enable the following to force the storage driver used by Docker. +# See https://docs.docker.com/storage/storagedriver/select-storage-driver/ +#echo '{\n"storage-driver": "overlay2"\n}\n' > /etc/docker/daemon.json + +# List block devices +lsblk + +#cat /proc/sys/kernel/random/entropy_avail + +# TODO: Move in init1 +mkdir -p /run/sshd +/usr/sbin/sshd & +log "SSH UP" + +log "Setup socat" +socat UNIX-LISTEN:/tmp/socat-socket,fork,reuseaddr VSOCK-CONNECT:2:53 & +log "Socat ready" + +export PATH=$PATH:/usr/local/bin:/usr/bin:/usr/sbin +export PYTHONPATH=/opt/aleph/libs + +# Replace this script with the manager +exec /root/init1.py diff --git a/runtimes/aleph-debian-12-node18/init1.py b/runtimes/aleph-debian-12-node18/init1.py new file mode 100644 index 000000000..60b699de8 --- /dev/null +++ b/runtimes/aleph-debian-12-node18/init1.py @@ -0,0 +1,567 @@ +#!/usr/bin/python3 -OO + +import logging + +logging.basicConfig( + level=logging.DEBUG, + format="%(relativeCreated)4f |V %(levelname)s | %(message)s", +) +logger = logging.getLogger(__name__) + +logger.debug("Imports starting") + +import ctypes +import asyncio +import os +import socket +from enum import Enum +import subprocess +import sys +import traceback +from contextlib import redirect_stdout +from dataclasses import dataclass, field +from io import StringIO +from os import system +from shutil import make_archive +from typing import Optional, Dict, Any, Tuple, List, NewType, Union, AsyncIterable + +import aiohttp +import msgpack + +logger.debug("Imports finished") + +ASGIApplication = NewType("AsgiApplication", Any) + + +class Encoding(str, Enum): + plain = "plain" + zip = "zip" + squashfs = "squashfs" + + +class Interface(str, Enum): + asgi = "asgi" + executable = "executable" + + +class ShutdownException(Exception): + pass + + +@dataclass +class Volume: + mount: str + device: str + read_only: bool + + +@dataclass +class ConfigurationPayload: + code: bytes + encoding: Encoding + entrypoint: str + input_data: bytes + interface: Interface + vm_hash: str + ip: Optional[str] = None + route: Optional[str] = None + dns_servers: List[str] = field(default_factory=list) + volumes: List[Volume] = field(default_factory=list) + variables: Optional[Dict[str, str]] = None + + +@dataclass +class RunCodePayload: + scope: Dict + + +# Open a socket to receive instructions from the host +s = socket.socket(socket.AF_VSOCK, socket.SOCK_STREAM) +s.bind((socket.VMADDR_CID_ANY, 52)) +s.listen() + +# Send the host that we are ready +s0 = socket.socket(socket.AF_VSOCK, socket.SOCK_STREAM) +s0.connect((2, 52)) +s0.close() + +# Configure aleph-client to use the guest API +os.environ["ALEPH_API_HOST"] = "http://localhost" +os.environ["ALEPH_API_UNIX_SOCKET"] = "/tmp/socat-socket" +os.environ["ALEPH_REMOTE_CRYPTO_HOST"] = "http://localhost" +os.environ["ALEPH_REMOTE_CRYPTO_UNIX_SOCKET"] = "/tmp/socat-socket" + +logger.debug("init1.py is launching") + + +def setup_hostname(hostname: str): + os.environ["ALEPH_ADDRESS_TO_USE"] = hostname + system(f"hostname {hostname}") + + +def setup_variables(variables: Optional[Dict[str, str]]): + if variables is None: + return + for key, value in variables.items(): + os.environ[key] = value + + +def setup_network( + ip: Optional[str], route: Optional[str], dns_servers: Optional[List[str]] = None +): + """Setup the system with info from the host.""" + dns_servers = dns_servers or [] + if not os.path.exists("/sys/class/net/eth0"): + logger.info("No network interface eth0") + return + + if not ip: + logger.info("No network IP") + return + + logger.debug("Setting up networking") + system("ip addr add 127.0.0.1/8 dev lo brd + scope host") + system("ip addr add ::1/128 dev lo") + system("ip link set lo up") + + if "/" in ip: + # Forward compatibility with future supervisors that pass the mask with the IP. + system(f"ip addr add {ip} dev eth0") + else: + logger.warning("Not passing the mask with the IP is deprecated and will be unsupported") + system(f"ip addr add {ip}/24 dev eth0") + + system("ip link set eth0 up") + + if route: + system(f"ip route add default via {route} dev eth0") + logger.debug("IP and route set") + else: + logger.warning("IP set with no network route") + + with open("/etc/resolv.conf", "wb") as resolvconf_fd: + for server in dns_servers: + resolvconf_fd.write(f"nameserver {server}\n".encode()) + + +def setup_input_data(input_data: bytes): + logger.debug("Extracting data") + if input_data: + # Unzip in /data + if not os.path.exists("/opt/input.zip"): + open("/opt/input.zip", "wb").write(input_data) + os.makedirs("/data", exist_ok=True) + os.system("unzip -q /opt/input.zip -d /data") + + +def setup_volumes(volumes: List[Volume]): + for volume in volumes: + logger.debug(f"Mounting /dev/{volume.device} on {volume.mount}") + os.makedirs(volume.mount, exist_ok=True) + if volume.read_only: + system(f"mount -t squashfs -o ro /dev/{volume.device} {volume.mount}") + else: + system(f"mount -o rw /dev/{volume.device} {volume.mount}") + + system("mount") + + +def setup_code_asgi( + code: bytes, encoding: Encoding, entrypoint: str +) -> ASGIApplication: + # Allow importing packages from /opt/packages + sys.path.append("/opt/packages") + + logger.debug("Extracting code") + if encoding == Encoding.squashfs: + sys.path.append("/opt/code") + module_name, app_name = entrypoint.split(":", 1) + logger.debug("import module") + module = __import__(module_name) + for level in module_name.split(".")[1:]: + module = getattr(module, level) + app: ASGIApplication = getattr(module, app_name) + elif encoding == Encoding.zip: + # Unzip in /opt and import the entrypoint from there + if not os.path.exists("/opt/archive.zip"): + open("/opt/archive.zip", "wb").write(code) + logger.debug("Run unzip") + os.system("unzip -q /opt/archive.zip -d /opt") + sys.path.append("/opt") + module_name, app_name = entrypoint.split(":", 1) + logger.debug("import module") + module = __import__(module_name) + for level in module_name.split(".")[1:]: + module = getattr(module, level) + app: ASGIApplication = getattr(module, app_name) + elif encoding == Encoding.plain: + # Execute the code and extract the entrypoint + locals: Dict[str, Any] = {} + exec(code, globals(), locals) + app: ASGIApplication = locals[entrypoint] + else: + raise ValueError(f"Unknown encoding '{encoding}'") + return app + + +def setup_code_executable( + code: bytes, encoding: Encoding, entrypoint: str +) -> subprocess.Popen: + logger.debug("Extracting code") + if encoding == Encoding.squashfs: + path = f"/opt/code/{entrypoint}" + if not os.path.isfile(path): + os.system("find /opt/code/") + raise FileNotFoundError(f"No such file: {path}") + os.system(f"chmod +x {path}") + elif encoding == Encoding.zip: + open("/opt/archive.zip", "wb").write(code) + logger.debug("Run unzip") + os.makedirs("/opt/code", exist_ok=True) + os.system("unzip /opt/archive.zip -d /opt/code") + path = f"/opt/code/{entrypoint}" + if not os.path.isfile(path): + os.system("find /opt/code") + raise FileNotFoundError(f"No such file: {path}") + os.system(f"chmod +x {path}") + elif encoding == Encoding.plain: + os.makedirs("/opt/code", exist_ok=True) + path = f"/opt/code/executable {entrypoint}" + open(path, "wb").write(code) + os.system(f"chmod +x {path}") + else: + raise ValueError(f"Unknown encoding '{encoding}'. This should never happen.") + + stdout = open("log_stdout.txt", "w") + stderr = open("log_stderr.txt", "w") + + # note: https://medium.com/python-pandemonium/a-trap-of-shell-true-in-the-subprocess-module-6db7fc66cdfd + process = subprocess.run(f"/root/run.sh {path}", stdout=stdout, stderr=stderr, shell=True) + + return process + +def setup_code( + code: bytes, encoding: Encoding, entrypoint: str, interface: Interface +) -> Union[ASGIApplication, subprocess.Popen]: + + if interface == Interface.asgi: + return setup_code_asgi(code=code, encoding=encoding, entrypoint=entrypoint) + elif interface == Interface.executable: + return setup_code_executable( + code=code, encoding=encoding, entrypoint=entrypoint + ) + else: + raise ValueError("Invalid interface. This should never happen.") + + + +async def run_python_code_http( + application: ASGIApplication, scope: dict +) -> Tuple[Dict, Dict, str, Optional[bytes]]: + + logger.debug("Running code") + with StringIO() as buf, redirect_stdout(buf): + # Execute in the same process, saves ~20ms than a subprocess + + # The body should not be part of the ASGI scope itself + body: bytes = scope.pop("body") + + async def receive(): + type_ = ( + "http.request" + if scope["type"] in ("http", "websocket") + else "aleph.message" + ) + return {"type": type_, "body": body, "more_body": False} + + send_queue: asyncio.Queue = asyncio.Queue() + + async def send(dico): + await send_queue.put(dico) + + # TODO: Better error handling + logger.debug("Awaiting application...") + await application(scope, receive, send) + + logger.debug("Waiting for headers") + headers: Dict + if scope["type"] == "http": + headers = await send_queue.get() + else: + headers = {} + + logger.debug("Waiting for body") + body: Dict = await send_queue.get() + + logger.debug("Waiting for buffer") + output = buf.getvalue() + + logger.debug(f"Headers {headers}") + logger.debug(f"Body {body}") + logger.debug(f"Output {output}") + + logger.debug("Getting output data") + output_data: bytes + if os.path.isdir("/data") and os.listdir("/data"): + make_archive("/opt/output", "zip", "/data") + with open("/opt/output.zip", "rb") as output_zipfile: + output_data = output_zipfile.read() + else: + output_data = b"" + + logger.debug("Returning result") + return headers, body, output, output_data + + +async def make_request(session, scope): + async with session.request( + scope["method"], + url="http://localhost:8080{}".format(scope["path"]), + params=scope["query_string"], + headers=[(a.decode("utf-8"), b.decode("utf-8")) for a, b in scope["headers"]], + data=scope.get("body", None), + ) as resp: + headers = { + "headers": [ + (a.encode("utf-8"), b.encode("utf-8")) for a, b in resp.headers.items() + ], + "status": resp.status, + } + body = {"body": await resp.content.read()} + return headers, body + + +async def run_executable_http(scope: dict) -> Tuple[Dict, Dict, str, Optional[bytes]]: + logger.debug("Calling localhost") + + tries = 0 + headers = None + body = None + + timeout = aiohttp.ClientTimeout(total=5) + async with aiohttp.ClientSession(timeout=timeout) as session: + while not body: + try: + tries += 1 + headers, body = await make_request(session, scope) + except aiohttp.ClientConnectorError: + if tries > 20: + raise + await asyncio.sleep(0.05) + + output = "" # Process stdout is not captured per request + output_data = None + logger.debug("Returning result") + return headers, body, output, output_data + + +async def process_instruction( + instruction: bytes, + interface: Interface, + application: Union[ASGIApplication, subprocess.Popen], +) -> AsyncIterable[bytes]: + + if instruction == b"halt": + logger.info("Received halt command") + system("sync") + logger.debug("Filesystems synced") + if isinstance(application, subprocess.Popen): + application.terminate() + logger.debug("Application terminated") + # application.communicate() + else: + # Close the cached session in aleph_client: + from aleph_client.asynchronous import get_fallback_session + + session: aiohttp.ClientSession = get_fallback_session() + await session.close() + logger.debug("Aiohttp cached session closed") + yield b"STOP\n" + logger.debug("Supervisor informed of halt") + raise ShutdownException + elif instruction.startswith(b"!"): + # Execute shell commands in the form `!ls /` + msg = instruction[1:].decode() + try: + process_output = subprocess.check_output( + msg, stderr=subprocess.STDOUT, shell=True + ) + yield process_output + except subprocess.CalledProcessError as error: + yield str(error).encode() + b"\n" + error.output + else: + # Python + logger.debug("msgpack.loads (") + msg_ = msgpack.loads(instruction, raw=False) + logger.debug("msgpack.loads )") + payload = RunCodePayload(**msg_) + + output: Optional[str] = None + try: + headers: Dict + body: Dict + output_data: Optional[bytes] + + if interface == Interface.asgi: + headers, body, output, output_data = await run_python_code_http( + application=application, scope=payload.scope + ) + elif interface == Interface.executable: + headers, body, output, output_data = await run_executable_http( + scope=payload.scope + ) + else: + raise ValueError("Unknown interface. This should never happen") + + result = { + "headers": headers, + "body": body, + "output": output, + "output_data": output_data, + } + yield msgpack.dumps(result, use_bin_type=True) + except Exception as error: + yield msgpack.dumps( + { + "error": str(error), + "traceback": str(traceback.format_exc()), + "output": output, + } + ) + + +def receive_data_length(client) -> int: + """Receive the length of the data to follow.""" + buffer = b"" + for _ in range(9): + byte = client.recv(1) + if byte == b"\n": + break + else: + buffer += byte + return int(buffer) + + +def load_configuration(data: bytes) -> ConfigurationPayload: + msg_ = msgpack.loads(data, raw=False) + msg_["volumes"] = [Volume(**volume_dict) for volume_dict in msg_.get("volumes")] + return ConfigurationPayload(**msg_) + + +def receive_config(client) -> ConfigurationPayload: + length = receive_data_length(client) + data = b"" + while len(data) < length: + data += client.recv(1024 * 1024) + return load_configuration(data) + + +def setup_system(config: ConfigurationPayload): + setup_hostname(config.vm_hash) + setup_variables(config.variables) + setup_volumes(config.volumes) + setup_network(config.ip, config.route, config.dns_servers) + setup_input_data(config.input_data) + logger.debug("Setup finished") + + +def umount_volumes(volumes: List[Volume]): + "Umount user related filesystems" + system("sync") + for volume in volumes: + logger.debug(f"Umounting /dev/{volume.device} on {volume.mount}") + system(f"umount {volume.mount}") + + +async def main(): + client, addr = s.accept() + + logger.debug("Receiving setup...") + config = receive_config(client) + setup_system(config) + + try: + app: Union[ASGIApplication, subprocess.Popen] = setup_code( + config.code, config.encoding, config.entrypoint, config.interface + ) + client.send(msgpack.dumps({"success": True})) + except Exception as error: + client.send( + msgpack.dumps( + { + "success": False, + "error": str(error), + "traceback": str(traceback.format_exc()), + } + ) + ) + logger.exception("Program could not be started") + raise + + class ServerReference: + "Reference used to close the server from within `handle_instruction" + server: asyncio.AbstractServer + + server_reference = ServerReference() + + async def handle_instruction(reader, writer): + data = await reader.read(1000_1000) # Max 1 Mo + + logger.debug("Init received msg") + if logger.level <= logging.DEBUG: + data_to_print = f"{data[:500]}..." if len(data) > 500 else data + logger.debug(f"<<<\n\n{data_to_print}\n\n>>>") + + try: + async for result in process_instruction( + instruction=data, interface=config.interface, application=app + ): + writer.write(result) + await writer.drain() + + logger.debug("Instruction processed") + except ShutdownException: + logger.info("Initiating shutdown") + writer.write(b"STOPZ\n") + await writer.drain() + logger.debug("Shutdown confirmed to supervisor") + server_reference.server.close() + logger.debug("Supervisor socket server closed") + finally: + writer.close() + + server = await asyncio.start_server(handle_instruction, sock=s) + server_reference.server = server + + addr = server.sockets[0].getsockname() + print(f"Serving on {addr}") + + try: + async with server: + await server.serve_forever() + except asyncio.CancelledError: + logger.debug("Server was properly cancelled") + finally: + logger.warning("System shutdown") + server.close() + logger.debug("Server closed") + umount_volumes(config.volumes) + logger.debug("User volumes unmounted") + + +if __name__ == "__main__": + logging.basicConfig(level=logging.DEBUG) + asyncio.run(main()) + + logger.info("Unmounting system filesystems") + system("umount /dev/shm") + system("umount /dev/pts") + system("umount -a") + + logger.info("Sending reboot syscall") + # Send reboot syscall, see man page + # https://man7.org/linux/man-pages/man2/reboot.2.html + libc = ctypes.CDLL(None) + libc.syscall(169, 0xFEE1DEAD, 672274793, 0x4321FEDC, None) + # The exit should not happen due to system halt. + sys.exit(0) diff --git a/runtimes/aleph-debian-12-node18/run.sh b/runtimes/aleph-debian-12-node18/run.sh new file mode 100644 index 000000000..fdcaaa225 --- /dev/null +++ b/runtimes/aleph-debian-12-node18/run.sh @@ -0,0 +1,26 @@ +#!/bin/bash + +cd /opt/code + +NVM=/root/.nvm/nvm.sh +NVMRC=/opt/code/.nvmrc +VERSION=v18.18.0 + +if [[ -f "$NVM" ]]; then + . $NVM +fi + +if [[ -f "$NVMRC" ]]; then + VERSION="$(cat $NVMRC)" +fi + +echo $VERSION + +nvm install $VERSION +nvm use $VERSION + +if [[ $1 == *.js ]]; then + node $1 & +else + exec $1 & +fi