# ---------------------------------------------------------------------------
# File: scripts/lib/__init__.py (new file, mode 100644)
# ---------------------------------------------------------------------------
# Copyright 2025 © Institute of Software, CAS. All rights reserved.
# SPDX-License-Identifier: Apache-2.0

# Supported architectures (arch used in kernel)
SUPPORT_ARCHS = ["arm64", "x86_64", "riscv"]

# Map arch used in linux kernel to arch understandable for Rust
MAP_RUST_ARCH = {"arm64": "aarch64", "x86_64": "x86_64", "riscv": "riscv64"}


# ---------------------------------------------------------------------------
# File: scripts/lib/kernel_source.py (new file, mode 100644)
# ---------------------------------------------------------------------------
# Copyright 2025 © Institute of Software, CAS. All rights reserved.
# SPDX-License-Identifier: Apache-2.0

import os
import re
import tarfile
import requests
import subprocess
import tempfile
from lib import SUPPORT_ARCHS

KERNEL_ORG_CDN = "https://cdn.kernel.org/pub/linux/kernel"

# Seconds to wait for the CDN to connect / send data before giving up.
_REQUEST_TIMEOUT = 15


def prepare_source(args):
    """Download a kernel release and install its userspace headers.

    Args:
        args: Parsed command-line namespace. The attributes ``version``,
            ``arch`` and ``install_path`` are read. ``arch`` set to ``None``
            means "every architecture in ``SUPPORT_ARCHS``".

    Returns:
        The directory under which the ``<arch>_headers`` directories were
        installed.

    Raises:
        ValueError: If ``args.version`` is not of the form X.Y or X.Y.Z.
        RuntimeError: If checking, downloading, extracting or installing
            fails.
    """
    check_kernel_version(args.version)

    # Create `temp_dir` under `/tmp`
    temp_dir = create_temp_dir(args.version)

    # Download kernel tarball from https://cdn.kernel.org/
    tarball = download_kernel(args.version, temp_dir)

    # Extract kernel source
    src_dir = extract_kernel(tarball, temp_dir)

    # If arch is not provided, install headers for all supported archs
    archs = SUPPORT_ARCHS if args.arch is None else [args.arch]
    for arch in archs:
        installed_header_path = install_headers(
            src_dir=src_dir,
            arch=arch,
            install_path=args.install_path,
        )

    print(f"\nSuccessfully installed kernel headers to {installed_header_path}")
    return installed_header_path


def check_kernel_version(version):
    """Validate that the requested kernel version exists on the remote CDN.

    Supports both X.Y (namely X.Y.0, where the trailing .0 must be omitted)
    and X.Y.Z formats.

    Args:
        version: Kernel version string, e.g. ``"6.12"`` or ``"6.12.8"``.

    Raises:
        ValueError: If ``version`` is not of the form X.Y or X.Y.Z.
        RuntimeError: If the series or the version does not exist on the
            remote, or the listing cannot be fetched.
    """
    # `fullmatch` (unlike `match` with `$`) also rejects a trailing newline.
    if not re.fullmatch(r"\d+\.\d+(\.\d+)?", version):
        raise ValueError("Invalid version format. Use X.Y or X.Y.Z")

    main_ver = version.split(".")[0]
    base_url = f"{KERNEL_ORG_CDN}/v{main_ver}.x/"
    tarball = f"linux-{version}.tar.xz"

    try:
        # Fetch content of `base_url`
        response = requests.get(base_url, timeout=_REQUEST_TIMEOUT)
        response.raise_for_status()
    except requests.exceptions.HTTPError as e:
        if e.response.status_code == 404:
            raise RuntimeError(f"Kernel series v{main_ver}.x does not exist") from e
        raise RuntimeError(f"HTTP error ({e.response.status_code}): {str(e)}") from e
    except requests.exceptions.Timeout as e:
        raise RuntimeError("Connection timeout while checking version") from e
    except requests.exceptions.RequestException as e:
        raise RuntimeError(f"Network error: {str(e)}") from e

    # Check for the tarball file name in the directory listing
    if tarball not in response.text:
        raise RuntimeError(f"Kernel version {version} not found in remote")

    print(f"Kernel version {version} found in remote")


def create_temp_dir(version):
    """Create a persistent working directory under ``/tmp``.

    The directory is intentionally not removed automatically: by default the
    installed headers live inside it.

    Args:
        version: Kernel version, used only to build a readable prefix.

    Returns:
        Path of the newly created directory.

    Raises:
        RuntimeError: If the directory cannot be created.
    """
    prefix = f"linux-{version}-source-"
    try:
        # `mkdtemp` never auto-deletes and, unlike
        # `TemporaryDirectory(delete=False)`, does not require Python 3.12.
        return tempfile.mkdtemp(prefix=prefix, dir="/tmp")
    except OSError as e:
        raise RuntimeError(f"Failed to create temp directory: {e}") from e


def download_kernel(version, temp_dir):
    """Download the ``linux-<version>.tar.xz`` tarball into ``temp_dir``.

    Args:
        version: Kernel version (X.Y or X.Y.Z).
        temp_dir: Existing directory that receives the tarball.

    Returns:
        Path of the downloaded tarball.

    Raises:
        RuntimeError: If the download fails for any reason.
    """
    version_major = version.split(".")[0]
    url = f"{KERNEL_ORG_CDN}/v{version_major}.x/linux-{version}.tar.xz"
    tarball_path = os.path.join(temp_dir, f"linux-{version}.tar.xz")
    print(f"Downloading {url} to {tarball_path}")

    try:
        # The timeout bounds connecting and each read, so a stalled
        # connection cannot block the script forever.
        with requests.get(url, stream=True, timeout=_REQUEST_TIMEOUT) as response:
            response.raise_for_status()
            total_size = int(response.headers.get("content-length", 0))
            downloaded = 0

            with open(tarball_path, "wb") as f:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)
                    downloaded += len(chunk)
                    # Progress can only be shown when the server sent a size.
                    if total_size > 0:
                        progress = downloaded / total_size * 100
                        print(f"\rDownloading: {progress:.1f}%", end="")
        print()
        return tarball_path
    except Exception as e:
        raise RuntimeError(f"Download failed: {e}") from e


def extract_kernel(tarball_path, temp_dir):
    """Extract the kernel tarball into ``temp_dir``.

    Args:
        tarball_path: Path of a ``linux-<version>.tar.xz`` archive.
        temp_dir: Directory to extract into.

    Returns:
        Path of the extracted source tree, i.e. ``temp_dir`` joined with the
        tarball name up to ``.tar``.

    Raises:
        RuntimeError: If the archive cannot be read or extracted.
    """
    print("Extracting...")
    try:
        with tarfile.open(tarball_path, "r:xz") as tar:
            # Use the "data" extraction filter where available so that
            # archive members cannot escape `temp_dir`.
            if hasattr(tarfile, "data_filter"):
                tar.extractall(path=temp_dir, filter="data")
            else:
                tar.extractall(path=temp_dir)
        extract_path = os.path.join(
            temp_dir, f"{os.path.basename(tarball_path).split('.tar')[0]}"
        )
        print(f"Extracted to {extract_path}")
        return extract_path
    except (tarfile.TarError, IOError) as e:
        raise RuntimeError(f"Extraction failed: {e}") from e


def install_headers(src_dir, arch, install_path):
    """Run ``make headers_install`` for one architecture.

    Args:
        src_dir: Extracted kernel source tree.
        arch: Kernel architecture name passed as ``ARCH=``.
        install_path: Directory that receives ``<arch>_headers``. When
            ``None``, the parent directory of ``src_dir`` is used.

    Returns:
        ``install_path`` (after defaulting), i.e. the parent of the
        ``<arch>_headers`` directory.

    Raises:
        RuntimeError: If ``make`` exits with a non-zero status.
    """
    # If install_path is not provided, install to parent directory of src_dir to
    # prevent messing up with extracted kernel source code
    if install_path is None:
        install_path = os.path.dirname(src_dir)

    abs_install_path = os.path.abspath(os.path.join(install_path, f"{arch}_headers"))

    try:
        os.makedirs(install_path, exist_ok=True)

        print(f"Installing to {abs_install_path}")
        result = subprocess.run(
            [
                "make",
                "-C",
                f"{src_dir}",
                f"ARCH={arch}",
                f"INSTALL_HDR_PATH={abs_install_path}",
                "headers_install",
            ],
            check=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
        )
    except subprocess.CalledProcessError as e:
        # stderr carries the actual make error; stdout alone is not enough.
        raise RuntimeError(
            f"Header installation failed:\n{e.stdout or ''}{e.stderr or ''}\n"
            f"Temporary files kept at: {os.path.dirname(src_dir)}"
        ) from e

    print(result.stdout)
    return install_path


# ---------------------------------------------------------------------------
# File: scripts/lib/kvm_bindings.py (new file, mode 100644)
# ---------------------------------------------------------------------------
# Copyright 2025 © Institute of Software, CAS. All rights reserved.
# SPDX-License-Identifier: Apache-2.0

import re
import os
import subprocess
from pathlib import Path
from lib.kernel_source import prepare_source
from lib import SUPPORT_ARCHS


KVM_BINDINGS_DIR = "kvm-bindings/src/"


def generate_kvm_bindings(args):
    """Prepare kernel headers and generate KVM bindings from them.

    Args:
        args: Parsed command-line namespace. ``arch``, ``attribute`` and
            ``output_path`` are read here; ``prepare_source`` reads the rest.
            ``arch`` set to ``None`` means every architecture in
            ``SUPPORT_ARCHS``.

    Raises:
        RuntimeError: If header preparation or binding generation fails.
    """
    installed_header_path = prepare_source(args)

    # If arch is not provided, generate bindings for all supported archs
    archs = SUPPORT_ARCHS if args.arch is None else [args.arch]
    for arch in archs:
        generate_bindings(installed_header_path, arch, args.attribute, args.output_path)


def generate_bindings(
    installed_header_path: str, arch: str, attribute: str, output_path: str
):
    """Run ``bindgen`` on ``kvm.h`` and write ``<output_path>/<arch>/bindings.rs``.

    Args:
        installed_header_path: Directory containing ``<arch>_headers``.
        arch: Kernel architecture name.
        attribute: Custom attribute attached to every struct listed in the
            architecture's ``serde_impls!`` block. When ``None`` or empty no
            custom attribute is passed to bindgen.
        output_path: Directory under which ``<arch>/bindings.rs`` is written.

    Raises:
        RuntimeError: If the header or struct list is missing, bindgen fails,
            the output cannot be written, or rustfmt fails.
    """
    try:
        # Locate `kvm.h` of specific architecture
        arch_headers = os.path.join(installed_header_path, f"{arch}_headers")
        kvm_header = Path(arch_headers, "include/linux/kvm.h")
        if not kvm_header.is_file():
            raise FileNotFoundError(f"KVM header missing at {kvm_header}")

        structs = capture_serde(arch)
        if not structs:
            raise RuntimeError(
                f"No structs found for {arch}, you need to invoke this command under rustvmm/kvm repo root"
            )

        # Build bindgen-cli command with dynamic paths and custom attribute for
        # structures
        base_cmd = [
            "bindgen",
            os.path.abspath(kvm_header),
            "--impl-debug",
            "--impl-partialeq",
            "--with-derive-default",
            "--with-derive-partialeq",
        ]

        # `--attribute` is optional on the command line; without this guard
        # the literal text "None" would be emitted as an attribute.
        if attribute:
            for struct in structs:
                base_cmd += ["--with-attribute-custom-struct", f"{struct}={attribute}"]

        # Use absolute include path
        include_dir = os.path.abspath(os.path.join(arch_headers, "include"))
        base_cmd += ["--", f"-I{include_dir}"]

        print(f"\nGenerating bindings for {arch}...")
        bindings = subprocess.run(
            base_cmd, check=True, capture_output=True, text=True, encoding="utf-8"
        ).stdout

        print("Successfully generated bindings")

        output_file_path = f"{output_path}/{arch}/bindings.rs"

        print(f"Generating to: {output_file_path}")

    except subprocess.CalledProcessError as e:
        err_msg = f"Bindgen failed (code {e.returncode})"
        # Surface bindgen's own diagnostics instead of discarding them.
        if e.stderr:
            err_msg += f":\n{e.stderr}"
        raise RuntimeError(err_msg) from e
    except Exception as e:
        raise RuntimeError(f"Generation failed: {str(e)}") from e

    try:
        # The per-arch output directory may not exist yet.
        os.makedirs(os.path.dirname(output_file_path), exist_ok=True)
        with open(output_file_path, "w", encoding="utf-8") as f:
            f.write(bindings)

        # Format with rustfmt
        subprocess.run(["rustfmt", output_file_path], check=True)
        print(f"Generation succeeded: {output_file_path}")
    except subprocess.CalledProcessError as e:
        raise RuntimeError("rustfmt formatting failed") from e
    except IOError as e:
        raise RuntimeError(f"File write error: {str(e)}") from e


def capture_serde(arch: str) -> list[str]:
    """Parse the struct names listed in an architecture's ``serde_impls!`` block.

    The file ``<KVM_BINDINGS_DIR>/<arch>/serialize.rs`` is read relative to
    the current working directory.

    Args:
        arch: Architecture directory name under ``KVM_BINDINGS_DIR``.

    Returns:
        Struct names in the order they appear in the first
        ``serde_impls! { ... }`` block.

    Raises:
        FileNotFoundError: If ``serialize.rs`` does not exist.
        ValueError: If the file contains no ``serde_impls!`` block.
    """
    # Locate `serialize.rs` of specific architecture
    target_path = Path(f"{KVM_BINDINGS_DIR}/{arch}/serialize.rs")

    # Validate file existence
    if not target_path.is_file():
        raise FileNotFoundError(
            f"Serialization file not found for {arch}: {target_path}"
        )

    print(f"Extracting serde structs of {arch} from: {target_path}")

    content = target_path.read_text(encoding="utf-8")

    # The named group is read back below via `match.group("struct")`.
    pattern = re.compile(r"serde_impls!\s*\{\s*(?P<struct>.*?)\s*\}", re.DOTALL)

    # Extract struct list from matched block
    match = pattern.search(content)
    if not match:
        raise ValueError(f"No serde_impls! block found in {target_path}")

    # Names are separated by commas and/or arbitrary whitespace.
    return [name for name in re.split(r"[\s,]+", match.group("struct")) if name]


# ---------------------------------------------------------------------------
# File: scripts/lib/seccompiler.py (new file, mode 100644)
# ---------------------------------------------------------------------------
# Copyright 2025 © Institute of Software, CAS. All rights reserved.
# SPDX-License-Identifier: Apache-2.0

import subprocess
import os
import re
from lib.kernel_source import prepare_source
from lib import MAP_RUST_ARCH, SUPPORT_ARCHS
from pathlib import Path

SECCOMPILER_SYSCALL_DIR = "src/syscall_table"


def generate_seccompiler(args):
    """Prepare kernel headers and generate seccompiler syscall tables.

    Args:
        args: Parsed command-line namespace. ``arch`` and ``output_path`` are
            read here; ``prepare_source`` reads the rest. ``arch`` set to
            ``None`` means every architecture in ``SUPPORT_ARCHS``.

    Raises:
        FileNotFoundError: If a syscall header is missing.
        RuntimeError: If header preparation or code generation fails.
    """
    installed_header_path = prepare_source(args)

    # If arch is not provided, generate tables for all supported archs
    archs = SUPPORT_ARCHS if args.arch is None else [args.arch]
    for arch in archs:
        generate_rust_code(installed_header_path, arch, args.output_path)


def generate_rust_code(installed_header_path: str, arch: str, output_path: str):
    """Generate Rust code and format with rustfmt.

    Args:
        installed_header_path: Directory containing ``<arch>_headers``.
        arch: Kernel architecture name; it is mapped through
            ``MAP_RUST_ARCH`` to name the output file.
        output_path: Directory that receives ``<rust_arch>.rs``.

    Raises:
        FileNotFoundError: If ``include/asm/unistd_64.h`` is missing.
        RuntimeError: If the file cannot be written or rustfmt fails.
    """
    # Generate syscall table
    arch_headers = os.path.join(installed_header_path, f"{arch}_headers")
    syscall_header = Path(arch_headers, "include/asm/unistd_64.h")
    if not syscall_header.is_file():
        raise FileNotFoundError(f"syscall headers missing at {syscall_header}")
    syscalls = generate_syscall_table(syscall_header)

    rust_arch = MAP_RUST_ARCH[arch]
    output_file_path = f"{output_path}/{rust_arch}.rs"

    print(f"Generating to: {output_file_path}")
    code = f"""use std::collections::HashMap;
pub(crate) fn make_syscall_table() -> HashMap<&'static str, i64> {{
    vec![
        {syscalls}
    ].into_iter().collect()
}}
"""
    try:
        # The output directory may not exist yet.
        os.makedirs(output_path, exist_ok=True)
        with open(output_file_path, "w") as f:
            f.write(code)

        # Format with rustfmt
        subprocess.run(["rustfmt", output_file_path], check=True)
        print(f"Generation succeeded: {output_file_path}")
    except subprocess.CalledProcessError as e:
        raise RuntimeError("rustfmt formatting failed") from e
    except IOError as e:
        raise RuntimeError(f"File write error: {str(e)}") from e


def generate_syscall_table(syscall_header_path: str):
    """Generate syscall table from specified header file.

    Args:
        syscall_header_path: Path of a header containing
            ``#define __NR_<name> <number>`` lines.

    Returns:
        A single string of space-separated ``("<name>", <number>),`` entries,
        sorted alphabetically by syscall name.

    Raises:
        RuntimeError: If the header cannot be read or decoded.
    """
    pattern = re.compile(r"^#define __NR_(\w+)\s+(\d+)")

    try:
        with open(syscall_header_path, "r") as f:
            matches = (pattern.match(line.strip()) for line in f)
            syscalls = [(m.group(1), int(m.group(2))) for m in matches if m]
    except (OSError, ValueError) as e:
        raise RuntimeError(f"File processing failed: {str(e)}") from e

    # Sort alphabetically by syscall name
    syscalls.sort(key=lambda entry: entry[0])
    return " ".join(f'("{name}", {num}),' for name, num in syscalls)


# ---------------------------------------------------------------------------
# File: scripts/rustvmm_gen.py (new file, mode 100755)
# ---------------------------------------------------------------------------
#!/usr/bin/env python3
#
# Copyright 2025 © Institute of Software, CAS. All rights reserved.
# SPDX-License-Identifier: Apache-2.0

import argparse
import os
from pathlib import Path
from lib import SUPPORT_ARCHS
from lib.kernel_source import prepare_source
from lib.seccompiler import generate_seccompiler, SECCOMPILER_SYSCALL_DIR
from lib.kvm_bindings import generate_kvm_bindings, KVM_BINDINGS_DIR


def main():
    """Parse the command line and dispatch to the selected subcommand.

    Global options (``--arch``, ``--version``, ``--install_path``,
    ``--keep``) belong to the top-level parser and therefore must be given
    before the subcommand name.
    """
    parser = argparse.ArgumentParser(prog="rustvmm_gen")
    subparsers = parser.add_subparsers(dest="command", required=True)
    parser.add_argument(
        "--arch",
        # Reject unknown architectures up front instead of failing later.
        choices=SUPPORT_ARCHS,
        help="Target architecture (x86_64, arm64, riscv)",
    )
    parser.add_argument("--version", required=True, help="Kernel version (e.g. 6.12.8)")
    parser.add_argument(
        "--install_path",
        default=None,
        help="Header installation directory path",
    )
    # A boolean switch: without `store_true` argparse would demand a value.
    parser.add_argument(
        "--keep", action="store_true", help="Keep temporary build files"
    )

    # Prepare subcommand
    prepare_parser = subparsers.add_parser("prepare", help="Prepare kernel headers")
    prepare_parser.set_defaults(func=prepare_source)

    # Generate seccompiler subcommand
    generate_syscall_parser = subparsers.add_parser(
        "generate_seccompiler",
        help="Generate syscall for `rust-vmm/seccompiler` from prepared kernel headers",
    )
    default_seccompiler_syscall_path_prefix = f"{os.getcwd()}/{SECCOMPILER_SYSCALL_DIR}"
    generate_syscall_parser.add_argument(
        "--output_path",
        default=default_seccompiler_syscall_path_prefix,
        help=f"Output directory path (default: {default_seccompiler_syscall_path_prefix})",
    )
    generate_syscall_parser.set_defaults(func=generate_seccompiler)

    # Generate kvm-bindings subcommand
    generate_kvm_bindings_parser = subparsers.add_parser(
        "generate_kvm_bindings",
        help="Generate bindings for `rust-vmm/kvm/kvm-bindings` from prepared kernel headers",
    )
    default_kvm_bindings_path_prefix = f"{os.getcwd()}/{KVM_BINDINGS_DIR}"
    generate_kvm_bindings_parser.add_argument(
        "--output_path",
        default=default_kvm_bindings_path_prefix,
        help=f"Output directory path (default: {default_kvm_bindings_path_prefix})",
    )
    generate_kvm_bindings_parser.add_argument(
        "--attribute",
        help="Custom attribute to be added for structures",
    )
    generate_kvm_bindings_parser.set_defaults(func=generate_kvm_bindings)

    args = parser.parse_args()
    args.func(args)


if __name__ == "__main__":
    main()