From c4206a8142ac6cab269b3539190c6fe193e23524 Mon Sep 17 00:00:00 2001 From: JSCU-CNI <121175071+JSCU-CNI@users.noreply.github.com> Date: Wed, 9 Oct 2024 16:02:42 +0200 Subject: [PATCH 01/12] first attempt to make TargetPath 3.13 compatible --- dissect/target/helpers/compat/path_313.py | 510 ++++++++++++++++++++++ dissect/target/helpers/fsutil.py | 4 +- 2 files changed, 513 insertions(+), 1 deletion(-) create mode 100644 dissect/target/helpers/compat/path_313.py diff --git a/dissect/target/helpers/compat/path_313.py b/dissect/target/helpers/compat/path_313.py new file mode 100644 index 000000000..8e6b9d1e4 --- /dev/null +++ b/dissect/target/helpers/compat/path_313.py @@ -0,0 +1,510 @@ +"""A pathlib.Path compatible implementation for dissect.target. + +This allows for the majority of the pathlib.Path API to "just work" on dissect.target filesystems. + +Most of this consists of subclassed internal classes with dissect.target specific patches, +but sometimes the change to a function is small, so the entire internal function is copied +and only a small part changed. To ease updating this code, the order of functions, comments +and code style is kept largely the same as the original pathlib.py. + +Yes, we know, this is playing with fire and it can break on new CPython releases. + +The implementation is split up in multiple files, one for each CPython version. +You're currently looking at the CPython 3.13 implementation. + +Commit hash we're in sync with: a7aa7c41ebfce5bf537c939c8dfc0605adcfabd8 + +Notes: + - https://docs.python.org/3.13/whatsnew/3.13.html#pathlib + - https://github.com/python/cpython/blob/3.13/Lib/pathlib/_local.py +""" + +from __future__ import annotations + +import posixpath +import sys +from pathlib import Path, PurePath +from glob import _Globber +from stat import S_ISBLK, S_ISCHR, S_ISFIFO, S_ISSOCK +from typing import IO, TYPE_CHECKING, Optional + +from dissect.target import filesystem +from dissect.target.exceptions import FilesystemError, SymlinkRecursionError +from dissect.target.helpers import polypath +from dissect.target.helpers.compat.path_common import ( + io_open, + isjunction, + realpath, + scandir, +) + +if TYPE_CHECKING: + from dissect.target.filesystem import Filesystem, FilesystemEntry + from dissect.target.helpers.compat.path_common import _DissectScandirIterator + from dissect.target.helpers.fsutil import stat_result + + +class _DissectFlavour: + sep = "/" + altsep = "" + case_sensitive = False + + __variant_instances = {} + + def __new__(cls, case_sensitive: bool = False, alt_separator: str = ""): + idx = (case_sensitive, alt_separator) + instance = cls.__variant_instances.get(idx, None) + if instance is None: + instance = object.__new__(cls) + cls.__variant_instances[idx] = instance + + return instance + + def __init__(self, case_sensitive: bool = False, alt_separator: str = ""): + self.altsep = alt_separator + self.case_sensitive = case_sensitive + + def normcase(self, s: str) -> str: + return s if self.case_sensitive else s.lower() + + splitdrive = staticmethod(posixpath.splitdrive) + + def splitroot(self, part: str) -> tuple[str, str]: + return polypath.splitroot(part, alt_separator=self.altsep) + + def join(self, *args) -> str: + return polypath.join(*args, alt_separator=self.altsep) + + # NOTE: Fallback implementation from older versions of pathlib.py + def ismount(self, path: TargetPath) -> bool: + # Need to exist and be a dir + if not path.exists() or not path.is_dir(): + return False + + try: + parent_dev = path.parent.stat().st_dev + except FilesystemError: + return False + + dev = path.stat().st_dev + if dev != parent_dev: + return True + ino = path.stat().st_ino + parent_ino = path.parent.stat().st_ino + return ino == parent_ino + + isjunction = staticmethod(isjunction) + + samestat = staticmethod(posixpath.samestat) + + def isabs(self, path: str) -> bool: + return polypath.isabs(path, alt_separator=self.altsep) + + realpath = staticmethod(realpath) + + +# NOTE: ported from 3.13 Lib/glob.py +class _DissectGlobber(_Globber): + pass + + +class PureDissectPath(PurePath): + _fs: Filesystem + _flavour = _DissectFlavour(case_sensitive=False) + _globber = _DissectGlobber + + def __reduce__(self) -> tuple: + raise TypeError("TargetPath pickling is currently not supported") + + def __init__(self, fs: Filesystem, *pathsegments): + if not isinstance(fs, filesystem.Filesystem): + raise TypeError( + "invalid PureDissectPath initialization: missing filesystem, " + "got %r (this might be a bug, please report)" % pathsegments + ) + + alt_separator = fs.alt_separator + path_args = [] + for arg in pathsegments: + if isinstance(arg, str): + arg = polypath.normalize(arg, alt_separator=alt_separator) + path_args.append(arg) + + super().__init__(*path_args) + self._fs = fs + self._flavour = _DissectFlavour(alt_separator=fs.alt_separator, case_sensitive=fs.case_sensitive) + + def with_segments(self, *pathsegments) -> TargetPath: + return type(self)(self._fs, *pathsegments) + + # NOTE: This is copied from pathlib.py but turned into an instance method so we get access to the correct flavour + def _parse_path(self, path: str) -> tuple[str, str, list[str]]: + if not path: + return "", "", [] + sep = self._flavour.sep + altsep = self._flavour.altsep + if altsep: + path = path.replace(altsep, sep) + drv, root, rel = self._flavour.splitroot(path) + if not root and drv.startswith(sep) and not drv.endswith(sep): + drv_parts = drv.split(sep) + if len(drv_parts) == 4 and drv_parts[2] not in "?.": + # e.g. //server/share + root = sep + elif len(drv_parts) == 6: + # e.g. //?/unc/server/share + root = sep + parsed = [sys.intern(str(x)) for x in rel.split(sep) if x and x != "."] + return drv, root, parsed + + def is_reserved(self) -> bool: + """Return True if the path contains one of the special names reserved + by the system, if any.""" + return False + + +class TargetPath(Path, PureDissectPath): + __slots__ = ("_entry",) + + def get(self) -> FilesystemEntry: + try: + return self._entry + except AttributeError: + self._entry = self._fs.get(str(self)) + return self._entry + + def stat(self, *, follow_symlinks: bool = True) -> stat_result: + """ + Return the result of the stat() system call on this path, like + os.stat() does. + """ + if follow_symlinks: + return self.get().stat() + else: + return self.get().lstat() + + def exists(self, *, follow_symlinks: bool = True) -> bool: + """ + Whether this path exists. + + This method normally follows symlinks; to check whether a symlink exists, + add the argument follow_symlinks=False. + """ + try: + # .exists() must resolve possible symlinks + self.stat(follow_symlinks=follow_symlinks) + return True + except (FilesystemError, ValueError): + return False + + def is_dir(self) -> bool: + """ + Whether this path is a directory. + """ + try: + return self.get().is_dir() + except (FilesystemError, ValueError): + return False + + def is_file(self) -> bool: + """ + Whether this path is a regular file (also True for symlinks pointing + to regular files). + """ + try: + return self.get().is_file() + except (FilesystemError, ValueError): + return False + + def is_symlink(self) -> bool: + """ + Whether this path is a symbolic link. + """ + try: + return self.get().is_symlink() + except (FilesystemError, ValueError): + return False + + def is_block_device(self) -> bool: + """ + Whether this path is a block device. + """ + try: + return S_ISBLK(self.stat().st_mode) + except (FilesystemError, ValueError): + return False + + def is_char_device(self) -> bool: + """ + Whether this path is a character device. + """ + try: + return S_ISCHR(self.stat().st_mode) + except (FilesystemError, ValueError): + return False + + def is_fifo(self) -> bool: + """ + Whether this path is a FIFO. + """ + try: + return S_ISFIFO(self.stat().st_mode) + except (FilesystemError, ValueError): + return False + + def is_socket(self) -> bool: + """ + Whether this path is a socket. + """ + try: + return S_ISSOCK(self.stat().st_mode) + except (FilesystemError, ValueError): + return False + + def open( + self, + mode: str = "rb", + buffering: int = 0, + encoding: Optional[str] = None, + errors: Optional[str] = None, + newline: Optional[str] = None, + ) -> IO: + """Open file and return a stream. + + Supports a subset of features of the real pathlib.open/io.open. + + Note: in contrast to regular Python, the mode is binary by default. Text mode + has to be explicitly specified. Buffering is also disabled by default. + """ + return io_open(self, mode, buffering, encoding, errors, newline) + + def write_bytes(self, data: bytes) -> int: + """ + Open the file in bytes mode, write to it, and close the file. + """ + raise NotImplementedError("TargetPath.write_bytes() is unsupported") + + def write_text( + self, data: str, encoding: Optional[str] = None, errors: Optional[str] = None, newline: Optional[str] = None + ) -> int: + """ + Open the file in text mode, write to it, and close the file. + """ + raise NotImplementedError("TargetPath.write_text() is unsupported") + + # NOTE: ported from 3.13 Lib/pathlib/_local.py + def iterdir(self): + """Yield path objects of the directory contents. + + The children are yielded in arbitrary order, and the + special entries '.' and '..' are not included. + """ + root_dir = str(self) + with scandir(self) as scandir_it: + paths = [entry.path for entry in scandir_it] + if root_dir == '.': + paths = map(self._remove_leading_dot, paths) + return map(self._from_parsed_string, paths) + + def _scandir(self) -> _DissectScandirIterator: + return scandir(self) + + # NOTE: ported from 3.13 Lib/pathlib/_local.py + def glob(self, pattern, *, case_sensitive=None, recurse_symlinks=False): + """Iterate over this subtree and yield all existing files (of any + kind, including directories) matching the given relative pattern. + """ + sys.audit("pathlib.Path.glob", self, pattern) + if not isinstance(pattern, TargetPath): + pattern = self.with_segments(pattern) + if pattern.anchor: + raise NotImplementedError("Non-relative patterns are unsupported") + parts = pattern._tail.copy() + if not parts: + raise ValueError("Unacceptable pattern: {!r}".format(pattern)) + raw = pattern._raw_path + if raw[-1] in (self.parser.sep, self.parser.altsep): + # GH-65238: pathlib doesn't preserve trailing slash. Add it back. + parts.append('') + + select = self._glob_selector(parts[::-1], case_sensitive, recurse_symlinks) + return select(self) + + # NOTE: ported from 3.13 Lib/pathlib/_abc.py + def _glob_selector(self, parts, case_sensitive, recurse_symlinks): + case_sensitive = self._fs.case_sensitive + case_pedantic = False + globber = self._globber(self.parser.sep, case_sensitive, case_pedantic, recurse_symlinks) + return globber.selector(parts) + + # TODO: rglob + + @classmethod + def cwd(cls) -> TargetPath: + """Return a new path pointing to the current working directory.""" + raise NotImplementedError("TargetPath.cwd() is unsupported") + + @classmethod + def home(cls) -> TargetPath: + """Return a new path pointing to the user's home directory (as + returned by os.path.expanduser('~')). + """ + raise NotImplementedError("TargetPath.home() is unsupported") + + def absolute(self) -> TargetPath: + """Return an absolute version of this path by prepending the current + working directory. No normalization or symlink resolution is performed. + + Use resolve() to get the canonical path to a file. + """ + raise NotImplementedError("TargetPath.absolute() is unsupported in Dissect") + + # NOTE: We changed some of the error handling here to deal with our own exception types + def resolve(self, strict: bool = False) -> TargetPath: + """ + Make the path absolute, resolving all symlinks on the way and also + normalizing it. + """ + + s = self._flavour.realpath(self, strict=strict) + p = self.with_segments(s) + + # In non-strict mode, realpath() doesn't raise on symlink loops. + # Ensure we get an exception by calling stat() + if not strict: + try: + p.stat() + except FilesystemError as e: + if isinstance(e, SymlinkRecursionError): + raise + return p + + def owner(self) -> str: + """ + Return the login name of the file owner. + """ + raise NotImplementedError("TargetPath.owner() is unsupported") + + def group(self) -> str: + """ + Return the group name of the file gid. + """ + raise NotImplementedError("TargetPath.group() is unsupported") + + def readlink(self) -> TargetPath: + """ + Return the path to which the symbolic link points. + """ + return self.with_segments(self.get().readlink()) + + def touch(self, mode: int = 0o666, exist_ok: bool = True) -> None: + """ + Create this file with the given access mode, if it doesn't exist. + """ + raise NotImplementedError("TargetPath.touch() is unsupported") + + def mkdir(self, mode: int = 0o777, parents: bool = False, exist_ok: bool = False) -> None: + """ + Create a new directory at this given path. + """ + raise NotImplementedError("TargetPath.mkdir() is unsupported") + + def chmod(self, mode: int, *, follow_symlinks: bool = True) -> None: + """ + Change the permissions of the path, like os.chmod(). + """ + raise NotImplementedError("TargetPath.chmod() is unsupported") + + def lchmod(self, mode: int) -> None: + """ + Like chmod(), except if the path points to a symlink, the symlink's + permissions are changed, rather than its target's. + """ + raise NotImplementedError("TargetPath.lchmod() is unsupported") + + def unlink(self, missing_ok: bool = False) -> None: + """ + Remove this file or link. + If the path is a directory, use rmdir() instead. + """ + raise NotImplementedError("TargetPath.unlink() is unsupported") + + def rmdir(self) -> None: + """ + Remove this directory. The directory must be empty. + """ + raise NotImplementedError("TargetPath.rmdir() is unsupported") + + def rename(self, target: str) -> TargetPath: + """ + Rename this path to the target path. + + The target path may be absolute or relative. Relative paths are + interpreted relative to the current working directory, *not* the + directory of the Path object. + + Returns the new Path instance pointing to the target path. + """ + raise NotImplementedError("TargetPath.rename() is unsupported") + + def replace(self, target: str) -> TargetPath: + """ + Rename this path to the target path, overwriting if that path exists. + + The target path may be absolute or relative. Relative paths are + interpreted relative to the current working directory, *not* the + directory of the Path object. + + Returns the new Path instance pointing to the target path. + """ + raise NotImplementedError("TargetPath.replace() is unsupported") + + def symlink_to(self, target: str, target_is_directory: bool = False) -> None: + """ + Make this path a symlink pointing to the target path. + Note the order of arguments (link, target) is the reverse of os.symlink. + """ + raise NotImplementedError("TargetPath.symlink_to() is unsupported") + + def hardlink_to(self, target: str) -> None: + """ + Make this path a hard link pointing to the same file as *target*. + + Note the order of arguments (self, target) is the reverse of os.link's. + """ + raise NotImplementedError("TargetPath.hardlink_to() is unsupported") + + def expanduser(self) -> TargetPath: + """Return a new path with expanded ~ and ~user constructs + (as returned by os.path.expanduser) + """ + raise NotImplementedError("TargetPath.expanduser() is unsupported") + + + + +# class PathGlobber(_GlobberBase): +# """ +# Class providing shell-style globbing for path objects. +# """ + +# lexists = operator.methodcaller('exists', follow_symlinks=False) +# add_slash = operator.methodcaller('joinpath', '') + +# @staticmethod +# def scandir(path): +# """Emulates os.scandir(), which returns an object that can be used as +# a context manager. This method is called by walk() and glob(). +# """ +# import contextlib +# return contextlib.nullcontext(path.iterdir()) + +# @staticmethod +# def concat_path(path, text): +# """Appends text to the given path.""" +# return path.with_segments(path._raw_path + text) + +# @staticmethod +# def parse_entry(entry): +# """Returns the path of an entry yielded from scandir().""" +# return entry diff --git a/dissect/target/helpers/fsutil.py b/dissect/target/helpers/fsutil.py index 1b1eec9b4..96fa0a314 100644 --- a/dissect/target/helpers/fsutil.py +++ b/dissect/target/helpers/fsutil.py @@ -52,7 +52,9 @@ splitroot, ) -if sys.version_info >= (3, 12): +if sys.version_info >= (3, 13): + from dissect.target.helpers.compat.path_313 import PureDissectPath, TargetPath +elif sys.version_info >= (3, 12): from dissect.target.helpers.compat.path_312 import PureDissectPath, TargetPath elif sys.version_info >= (3, 11): from dissect.target.helpers.compat.path_311 import PureDissectPath, TargetPath From af6fea3238b136ef918dd1ebf416aefb6a47c982 Mon Sep 17 00:00:00 2001 From: Schamper <1254028+Schamper@users.noreply.github.com> Date: Wed, 4 Dec 2024 22:14:07 +0100 Subject: [PATCH 02/12] Improve TargetPath compatibility --- dissect/target/exceptions.py | 11 + dissect/target/helpers/compat/path_310.py | 119 +----- dissect/target/helpers/compat/path_311.py | 119 +----- dissect/target/helpers/compat/path_312.py | 115 +----- dissect/target/helpers/compat/path_313.py | 369 +++++++------------ dissect/target/helpers/compat/path_39.py | 111 +----- dissect/target/helpers/compat/path_common.py | 10 +- dissect/target/helpers/fsutil.py | 26 +- dissect/target/helpers/polypath.py | 2 +- tests/helpers/test_fsutil.py | 62 +++- tests/plugins/os/windows/test_recyclebin.py | 5 +- 11 files changed, 279 insertions(+), 670 deletions(-) diff --git a/dissect/target/exceptions.py b/dissect/target/exceptions.py index 77dc540b8..b0cae7476 100644 --- a/dissect/target/exceptions.py +++ b/dissect/target/exceptions.py @@ -1,5 +1,6 @@ from __future__ import annotations +import errno import os import sys import traceback @@ -76,22 +77,32 @@ class PluginNotFoundError(PluginError): class FileNotFoundError(FilesystemError, FileNotFoundError): """The requested path could not be found.""" + errno = errno.ENOENT + class IsADirectoryError(FilesystemError, IsADirectoryError): """The entry is a directory.""" + errno = errno.EISDIR + class NotADirectoryError(FilesystemError, NotADirectoryError): """The entry is not a directory.""" + errno = errno.ENOTDIR + class NotASymlinkError(FilesystemError): """The entry is not a symlink.""" + errno = errno.EINVAL + class SymlinkRecursionError(FilesystemError): """A symlink loop is detected for the entry.""" + errno = errno.ELOOP + class RegistryError(Error): """A registry error occurred.""" diff --git a/dissect/target/helpers/compat/path_310.py b/dissect/target/helpers/compat/path_310.py index bee647c31..150c2ceba 100644 --- a/dissect/target/helpers/compat/path_310.py +++ b/dissect/target/helpers/compat/path_310.py @@ -20,23 +20,15 @@ import fnmatch import re from pathlib import Path, PurePath, _Accessor, _PosixFlavour -from stat import S_ISBLK, S_ISCHR, S_ISFIFO, S_ISSOCK -from typing import IO, TYPE_CHECKING, Any, Callable, Iterator, Optional +from typing import IO, TYPE_CHECKING, Any, Callable, Iterator from dissect.target import filesystem from dissect.target.exceptions import FilesystemError, SymlinkRecursionError -from dissect.target.helpers.compat.path_common import ( - _DissectPathParents, - io_open, - isjunction, - realpath, - scandir, -) -from dissect.target.helpers.polypath import normalize +from dissect.target.helpers import polypath +from dissect.target.helpers.compat import path_common if TYPE_CHECKING: from dissect.target.filesystem import Filesystem, FilesystemEntry - from dissect.target.helpers.compat.path_common import _DissectScandirIterator from dissect.target.helpers.fsutil import stat_result @@ -82,9 +74,9 @@ def open( path: TargetPath, mode: str = "rb", buffering: int = 0, - encoding: Optional[str] = None, - errors: Optional[str] = None, - newline: Optional[str] = None, + encoding: str | None = None, + errors: str | None = None, + newline: str | None = None, ) -> IO: """Open file and return a stream. @@ -93,15 +85,15 @@ def open( Note: in contrast to regular Python, the mode is binary by default. Text mode has to be explicitly specified. Buffering is also disabled by default. """ - return io_open(path, mode, buffering, encoding, errors, newline) + return path_common.io_open(path, mode, buffering, encoding, errors, newline) @staticmethod def listdir(path: TargetPath) -> Iterator[str]: return path.get().listdir() @staticmethod - def scandir(path: TargetPath) -> _DissectScandirIterator: - return scandir(path) + def scandir(path: TargetPath) -> path_common._DissectScandirIterator: + return path_common.scandir(path) @staticmethod def chmod(path: TargetPath, mode: int, *, follow_symlinks: bool = True) -> None: @@ -163,10 +155,10 @@ def getcwd() -> str: def expanduser(path: str) -> str: raise NotImplementedError("TargetPath.expanduser() is unsupported") - realpath = staticmethod(realpath) + realpath = staticmethod(path_common.realpath) # NOTE: Forward compatibility with CPython >= 3.12 - isjunction = staticmethod(isjunction) + isjunction = staticmethod(path_common.isjunction) _dissect_accessor = _DissectAccessor() @@ -193,7 +185,7 @@ def _from_parts(cls, args: list) -> TargetPath: path_args = [] for arg in args[1:]: if isinstance(arg, str): - arg = normalize(arg, alt_separator=alt_separator) + arg = polypath.normalize(arg, alt_separator=alt_separator) path_args.append(arg) self = super()._from_parts(path_args) @@ -247,8 +239,8 @@ def parent(self) -> TargetPath: return result @property - def parents(self) -> _DissectPathParents: - return _DissectPathParents(self) + def parents(self) -> path_common._DissectPathParents: + return path_common._DissectPathParents(self) class TargetPath(Path, PureDissectPath): @@ -382,9 +374,9 @@ def open( self, mode: str = "rb", buffering: int = 0, - encoding: Optional[str] = None, - errors: Optional[str] = None, - newline: Optional[str] = None, + encoding: str | None = None, + errors: str | None = None, + newline: str | None = None, ) -> IO: """Open file and return a stream. @@ -402,7 +394,7 @@ def write_bytes(self, data: bytes) -> int: raise NotImplementedError("TargetPath.write_bytes() is unsupported") def write_text( - self, data: str, encoding: Optional[str] = None, errors: Optional[str] = None, newline: Optional[str] = None + self, data: str, encoding: str | None = None, errors: str | None = None, newline: str | None = None ) -> int: """ Open the file in text mode, write to it, and close the file. @@ -417,45 +409,6 @@ def readlink(self) -> TargetPath: obj = self._from_parts((self._fs, path)) return obj - def exists(self) -> bool: - """ - Whether this path exists. - """ - try: - # .exists() must resolve possible symlinks - self.get().stat() - return True - except (FilesystemError, ValueError): - return False - - def is_dir(self) -> bool: - """ - Whether this path is a directory. - """ - try: - return self.get().is_dir() - except (FilesystemError, ValueError): - return False - - def is_file(self) -> bool: - """ - Whether this path is a regular file (also True for symlinks pointing - to regular files). - """ - try: - return self.get().is_file() - except (FilesystemError, ValueError): - return False - - def is_symlink(self) -> bool: - """ - Whether this path is a symbolic link. - """ - try: - return self.get().is_symlink() - except (FilesystemError, ValueError): - return False - # NOTE: Forward compatibility with CPython >= 3.12 def is_junction(self) -> bool: """ @@ -463,42 +416,6 @@ def is_junction(self) -> bool: """ return self._accessor.isjunction(self) - def is_block_device(self) -> bool: - """ - Whether this path is a block device. - """ - try: - return S_ISBLK(self.stat().st_mode) - except (FilesystemError, ValueError): - return False - - def is_char_device(self) -> bool: - """ - Whether this path is a character device. - """ - try: - return S_ISCHR(self.stat().st_mode) - except (FilesystemError, ValueError): - return False - - def is_fifo(self) -> bool: - """ - Whether this path is a FIFO. - """ - try: - return S_ISFIFO(self.stat().st_mode) - except (FilesystemError, ValueError): - return False - - def is_socket(self) -> bool: - """ - Whether this path is a socket. - """ - try: - return S_ISSOCK(self.stat().st_mode) - except (FilesystemError, ValueError): - return False - def expanduser(self) -> TargetPath: """Return a new path with expanded ~ and ~user constructs (as returned by os.path.expanduser) diff --git a/dissect/target/helpers/compat/path_311.py b/dissect/target/helpers/compat/path_311.py index d090921c0..f7430ee8c 100644 --- a/dissect/target/helpers/compat/path_311.py +++ b/dissect/target/helpers/compat/path_311.py @@ -17,34 +17,24 @@ Notes: - CPython 3.11 ditched the _Accessor class, so we override the methods that should use it """ + from __future__ import annotations import fnmatch import re from pathlib import Path, PurePath, _PosixFlavour -from stat import S_ISBLK, S_ISCHR, S_ISFIFO, S_ISSOCK -from typing import IO, TYPE_CHECKING, Any, Callable, Iterator, Optional +from typing import IO, TYPE_CHECKING, Any, Callable, Iterator from dissect.target import filesystem from dissect.target.exceptions import ( - FileNotFoundError, FilesystemError, - NotADirectoryError, - NotASymlinkError, SymlinkRecursionError, ) -from dissect.target.helpers.compat.path_common import ( - _DissectPathParents, - io_open, - isjunction, - realpath, - scandir, -) -from dissect.target.helpers.polypath import normalize +from dissect.target.helpers import polypath +from dissect.target.helpers.compat import path_common if TYPE_CHECKING: from dissect.target.filesystem import Filesystem, FilesystemEntry - from dissect.target.helpers.compat.path_common import _DissectScandirIterator from dissect.target.helpers.fsutil import stat_result @@ -101,7 +91,7 @@ def _from_parts(cls, args: list) -> TargetPath: path_args = [] for arg in args[1:]: if isinstance(arg, str): - arg = normalize(arg, alt_separator=alt_separator) + arg = polypath.normalize(arg, alt_separator=alt_separator) path_args.append(arg) self = super()._from_parts(path_args) @@ -155,8 +145,8 @@ def parent(self) -> TargetPath: return result @property - def parents(self) -> _DissectPathParents: - return _DissectPathParents(self) + def parents(self) -> path_common._DissectPathParents: + return path_common._DissectPathParents(self) class TargetPath(Path, PureDissectPath): @@ -193,7 +183,7 @@ def iterdir(self) -> Iterator[TargetPath]: """Iterate over the files in this directory. Does not yield any result for the special paths '.' and '..'. """ - for entry in scandir(self): + for entry in path_common.scandir(self): if entry.name in {".", ".."}: # Yielding a path object for these makes little sense continue @@ -201,8 +191,8 @@ def iterdir(self) -> Iterator[TargetPath]: child_path._entry = entry yield child_path - def _scandir(self) -> _DissectScandirIterator: - return scandir(self) + def _scandir(self) -> path_common._DissectScandirIterator: + return path_common.scandir(self) # NOTE: Forward compatibility with CPython >= 3.12 def walk( @@ -267,7 +257,7 @@ def resolve(self, strict: bool = False) -> TargetPath: normalizing it. """ - s = realpath(self, strict=strict) + s = path_common.realpath(self, strict=strict) p = self._from_parts((self._fs, s)) # In non-strict mode, realpath() doesn't raise on symlink loops. @@ -306,9 +296,9 @@ def open( self, mode: str = "rb", buffering: int = 0, - encoding: Optional[str] = None, - errors: Optional[str] = None, - newline: Optional[str] = None, + encoding: str | None = None, + errors: str | None = None, + newline: str | None = None, ) -> IO: """Open file and return a stream. @@ -317,7 +307,7 @@ def open( Note: in contrast to regular Python, the mode is binary by default. Text mode has to be explicitly specified. Buffering is also disabled by default. """ - return io_open(self, mode, buffering, encoding, errors, newline) + return path_common.io_open(self, mode, buffering, encoding, errors, newline) def write_bytes(self, data: bytes) -> int: """ @@ -326,7 +316,7 @@ def write_bytes(self, data: bytes) -> int: raise NotImplementedError("TargetPath.write_bytes() is unsupported") def write_text( - self, data: str, encoding: Optional[str] = None, errors: Optional[str] = None, newline: Optional[str] = None + self, data: str, encoding: str | None = None, errors: str | None = None, newline: str | None = None ) -> int: """ Open the file in text mode, write to it, and close the file. @@ -430,36 +420,6 @@ def link_to(self, target: str) -> None: """ raise NotImplementedError("TargetPath.link_to() is unsupported") - def exists(self) -> bool: - """ - Whether this path exists. - """ - try: - # .exists() must resolve possible symlinks - self.get().stat() - return True - except (FileNotFoundError, NotADirectoryError, NotASymlinkError, SymlinkRecursionError, ValueError): - return False - - def is_dir(self) -> bool: - """ - Whether this path is a directory. - """ - try: - return self.get().is_dir() - except (FilesystemError, ValueError): - return False - - def is_file(self) -> bool: - """ - Whether this path is a regular file (also True for symlinks pointing - to regular files). - """ - try: - return self.get().is_file() - except (FilesystemError, ValueError): - return False - def is_mount(self) -> bool: """ Check if this path is a POSIX mount point @@ -480,57 +440,12 @@ def is_mount(self) -> bool: parent_ino = self.parent.stat().st_ino return ino == parent_ino - def is_symlink(self) -> bool: - """ - Whether this path is a symbolic link. - """ - try: - return self.get().is_symlink() - except (FilesystemError, ValueError): - return False - # NOTE: Forward compatibility with CPython >= 3.12 def is_junction(self) -> bool: """ Whether this path is a junction. """ - return isjunction(self) - - def is_block_device(self) -> bool: - """ - Whether this path is a block device. - """ - try: - return S_ISBLK(self.stat().st_mode) - except (FilesystemError, ValueError): - return False - - def is_char_device(self) -> bool: - """ - Whether this path is a character device. - """ - try: - return S_ISCHR(self.stat().st_mode) - except (FilesystemError, ValueError): - return False - - def is_fifo(self) -> bool: - """ - Whether this path is a FIFO. - """ - try: - return S_ISFIFO(self.stat().st_mode) - except (FilesystemError, ValueError): - return False - - def is_socket(self) -> bool: - """ - Whether this path is a socket. - """ - try: - return S_ISSOCK(self.stat().st_mode) - except (FilesystemError, ValueError): - return False + return path_common.isjunction(self) def expanduser(self) -> TargetPath: """Return a new path with expanded ~ and ~user constructs diff --git a/dissect/target/helpers/compat/path_312.py b/dissect/target/helpers/compat/path_312.py index 374617901..043322f7a 100644 --- a/dissect/target/helpers/compat/path_312.py +++ b/dissect/target/helpers/compat/path_312.py @@ -12,7 +12,7 @@ The implementation is split up in multiple files, one for each CPython version. You're currently looking at the CPython 3.12 implementation. -Commit hash we're in sync with: +Commit hash we're in sync with: f49221a Notes: - CPython 3.12 changed a lot in preparation of proper subclassing, so our patches differ @@ -25,22 +25,15 @@ import posixpath import sys from pathlib import Path, PurePath -from stat import S_ISBLK, S_ISCHR, S_ISFIFO, S_ISSOCK -from typing import IO, TYPE_CHECKING, Iterator, Optional +from typing import IO, TYPE_CHECKING, Iterator from dissect.target import filesystem from dissect.target.exceptions import FilesystemError, SymlinkRecursionError from dissect.target.helpers import polypath -from dissect.target.helpers.compat.path_common import ( - io_open, - isjunction, - realpath, - scandir, -) +from dissect.target.helpers.compat import path_common if TYPE_CHECKING: from dissect.target.filesystem import Filesystem, FilesystemEntry - from dissect.target.helpers.compat.path_common import _DissectScandirIterator from dissect.target.helpers.fsutil import stat_result @@ -69,7 +62,7 @@ def normcase(self, s: str) -> str: splitdrive = staticmethod(posixpath.splitdrive) - def splitroot(self, part: str) -> tuple[str, str]: + def splitroot(self, part: str) -> tuple[str, str, str]: return polypath.splitroot(part, alt_separator=self.altsep) def join(self, *args) -> str: @@ -93,14 +86,14 @@ def ismount(self, path: TargetPath) -> bool: parent_ino = path.parent.stat().st_ino return ino == parent_ino - isjunction = staticmethod(isjunction) + isjunction = staticmethod(path_common.isjunction) samestat = staticmethod(posixpath.samestat) def isabs(self, path: str) -> bool: return polypath.isabs(path, alt_separator=self.altsep) - realpath = staticmethod(realpath) + realpath = staticmethod(path_common.realpath) class PureDissectPath(PurePath): @@ -114,7 +107,7 @@ def __init__(self, fs: Filesystem, *pathsegments): if not isinstance(fs, filesystem.Filesystem): raise TypeError( "invalid PureDissectPath initialization: missing filesystem, " - "got %r (this might be a bug, please report)" % pathsegments + "got %r (this might be a bug, please report)" % (fs, *pathsegments) ) alt_separator = fs.alt_separator @@ -177,91 +170,13 @@ def stat(self, *, follow_symlinks: bool = True) -> stat_result: else: return self.get().lstat() - def exists(self, *, follow_symlinks: bool = True) -> bool: - """ - Whether this path exists. - - This method normally follows symlinks; to check whether a symlink exists, - add the argument follow_symlinks=False. - """ - try: - # .exists() must resolve possible symlinks - self.stat(follow_symlinks=follow_symlinks) - return True - except (FilesystemError, ValueError): - return False - - def is_dir(self) -> bool: - """ - Whether this path is a directory. - """ - try: - return self.get().is_dir() - except (FilesystemError, ValueError): - return False - - def is_file(self) -> bool: - """ - Whether this path is a regular file (also True for symlinks pointing - to regular files). - """ - try: - return self.get().is_file() - except (FilesystemError, ValueError): - return False - - def is_symlink(self) -> bool: - """ - Whether this path is a symbolic link. - """ - try: - return self.get().is_symlink() - except (FilesystemError, ValueError): - return False - - def is_block_device(self) -> bool: - """ - Whether this path is a block device. - """ - try: - return S_ISBLK(self.stat().st_mode) - except (FilesystemError, ValueError): - return False - - def is_char_device(self) -> bool: - """ - Whether this path is a character device. - """ - try: - return S_ISCHR(self.stat().st_mode) - except (FilesystemError, ValueError): - return False - - def is_fifo(self) -> bool: - """ - Whether this path is a FIFO. - """ - try: - return S_ISFIFO(self.stat().st_mode) - except (FilesystemError, ValueError): - return False - - def is_socket(self) -> bool: - """ - Whether this path is a socket. - """ - try: - return S_ISSOCK(self.stat().st_mode) - except (FilesystemError, ValueError): - return False - def open( self, mode: str = "rb", buffering: int = 0, - encoding: Optional[str] = None, - errors: Optional[str] = None, - newline: Optional[str] = None, + encoding: str | None = None, + errors: str | None = None, + newline: str | None = None, ) -> IO: """Open file and return a stream. @@ -270,7 +185,7 @@ def open( Note: in contrast to regular Python, the mode is binary by default. Text mode has to be explicitly specified. Buffering is also disabled by default. """ - return io_open(self, mode, buffering, encoding, errors, newline) + return path_common.io_open(self, mode, buffering, encoding, errors, newline) def write_bytes(self, data: bytes) -> int: """ @@ -279,7 +194,7 @@ def write_bytes(self, data: bytes) -> int: raise NotImplementedError("TargetPath.write_bytes() is unsupported") def write_text( - self, data: str, encoding: Optional[str] = None, errors: Optional[str] = None, newline: Optional[str] = None + self, data: str, encoding: str | None = None, errors: str | None = None, newline: str | None = None ) -> int: """ Open the file in text mode, write to it, and close the file. @@ -290,7 +205,7 @@ def iterdir(self) -> Iterator[TargetPath]: """Iterate over the files in this directory. Does not yield any result for the special paths '.' and '..'. """ - for entry in scandir(self): + for entry in path_common.scandir(self): if entry.name in {".", ".."}: # Yielding a path object for these makes little sense continue @@ -298,8 +213,8 @@ def iterdir(self) -> Iterator[TargetPath]: child_path._entry = entry yield child_path - def _scandir(self) -> _DissectScandirIterator: - return scandir(self) + def _scandir(self) -> path_common._DissectScandirIterator: + return path_common.scandir(self) @classmethod def cwd(cls) -> TargetPath: diff --git a/dissect/target/helpers/compat/path_313.py b/dissect/target/helpers/compat/path_313.py index 8e6b9d1e4..cef3c8a09 100644 --- a/dissect/target/helpers/compat/path_313.py +++ b/dissect/target/helpers/compat/path_313.py @@ -12,7 +12,7 @@ The implementation is split up in multiple files, one for each CPython version. You're currently looking at the CPython 3.13 implementation. -Commit hash we're in sync with: a7aa7c41ebfce5bf537c939c8dfc0605adcfabd8 +Commit hash we're in sync with: 094d95f Notes: - https://docs.python.org/3.13/whatsnew/3.13.html#pathlib @@ -23,28 +23,22 @@ import posixpath import sys -from pathlib import Path, PurePath from glob import _Globber -from stat import S_ISBLK, S_ISCHR, S_ISFIFO, S_ISSOCK -from typing import IO, TYPE_CHECKING, Optional +from pathlib import Path, PurePath +from pathlib._abc import PathBase, UnsupportedOperation +from typing import IO, TYPE_CHECKING, Callable, Iterator from dissect.target import filesystem from dissect.target.exceptions import FilesystemError, SymlinkRecursionError from dissect.target.helpers import polypath -from dissect.target.helpers.compat.path_common import ( - io_open, - isjunction, - realpath, - scandir, -) +from dissect.target.helpers.compat import path_common if TYPE_CHECKING: from dissect.target.filesystem import Filesystem, FilesystemEntry - from dissect.target.helpers.compat.path_common import _DissectScandirIterator from dissect.target.helpers.fsutil import stat_result -class _DissectFlavour: +class _DissectParser: sep = "/" altsep = "" case_sensitive = False @@ -67,50 +61,34 @@ def __init__(self, case_sensitive: bool = False, alt_separator: str = ""): def normcase(self, s: str) -> str: return s if self.case_sensitive else s.lower() + def split(self, part: str) -> tuple[str, str]: + return polypath.split(part, alt_separator=self.altsep) + splitdrive = staticmethod(posixpath.splitdrive) - def splitroot(self, part: str) -> tuple[str, str]: + def splitroot(self, part: str) -> tuple[str, str, str]: return polypath.splitroot(part, alt_separator=self.altsep) def join(self, *args) -> str: return polypath.join(*args, alt_separator=self.altsep) - # NOTE: Fallback implementation from older versions of pathlib.py - def ismount(self, path: TargetPath) -> bool: - # Need to exist and be a dir - if not path.exists() or not path.is_dir(): - return False - - try: - parent_dev = path.parent.stat().st_dev - except FilesystemError: - return False - - dev = path.stat().st_dev - if dev != parent_dev: - return True - ino = path.stat().st_ino - parent_ino = path.parent.stat().st_ino - return ino == parent_ino - - isjunction = staticmethod(isjunction) - - samestat = staticmethod(posixpath.samestat) + isjunction = staticmethod(path_common.isjunction) def isabs(self, path: str) -> bool: - return polypath.isabs(path, alt_separator=self.altsep) + return polypath.isabs(str(path), alt_separator=self.altsep) - realpath = staticmethod(realpath) + realpath = staticmethod(path_common.realpath) -# NOTE: ported from 3.13 Lib/glob.py class _DissectGlobber(_Globber): - pass + @staticmethod + def add_slash(path: TargetPath) -> TargetPath: + return _GlobberTargetPath(path._fs, path, "") class PureDissectPath(PurePath): _fs: Filesystem - _flavour = _DissectFlavour(case_sensitive=False) + parser: _DissectParser = _DissectParser(case_sensitive=False) _globber = _DissectGlobber def __reduce__(self) -> tuple: @@ -120,7 +98,7 @@ def __init__(self, fs: Filesystem, *pathsegments): if not isinstance(fs, filesystem.Filesystem): raise TypeError( "invalid PureDissectPath initialization: missing filesystem, " - "got %r (this might be a bug, please report)" % pathsegments + "got %r (this might be a bug, please report)" % (fs, *pathsegments) ) alt_separator = fs.alt_separator @@ -132,7 +110,7 @@ def __init__(self, fs: Filesystem, *pathsegments): super().__init__(*path_args) self._fs = fs - self._flavour = _DissectFlavour(alt_separator=fs.alt_separator, case_sensitive=fs.case_sensitive) + self.parser = _DissectParser(alt_separator=fs.alt_separator, case_sensitive=fs.case_sensitive) def with_segments(self, *pathsegments) -> TargetPath: return type(self)(self._fs, *pathsegments) @@ -141,11 +119,11 @@ def with_segments(self, *pathsegments) -> TargetPath: def _parse_path(self, path: str) -> tuple[str, str, list[str]]: if not path: return "", "", [] - sep = self._flavour.sep - altsep = self._flavour.altsep + sep = self.parser.sep + altsep = self.parser.altsep if altsep: path = path.replace(altsep, sep) - drv, root, rel = self._flavour.splitroot(path) + drv, root, rel = self.parser.splitroot(path) if not root and drv.startswith(sep) and not drv.endswith(sep): drv_parts = drv.split(sep) if len(drv_parts) == 4 and drv_parts[2] not in "?.": @@ -157,15 +135,14 @@ def _parse_path(self, path: str) -> tuple[str, str, list[str]]: parsed = [sys.intern(str(x)) for x in rel.split(sep) if x and x != "."] return drv, root, parsed - def is_reserved(self) -> bool: - """Return True if the path contains one of the special names reserved - by the system, if any.""" - return False - class TargetPath(Path, PureDissectPath): __slots__ = ("_entry",) + @classmethod + def _unsupported_msg(cls, attribute: str) -> str: + return f"{cls.__name__}.{attribute} is unsupported" + def get(self) -> FilesystemEntry: try: return self._entry @@ -197,77 +174,21 @@ def exists(self, *, follow_symlinks: bool = True) -> bool: except (FilesystemError, ValueError): return False - def is_dir(self) -> bool: - """ - Whether this path is a directory. - """ - try: - return self.get().is_dir() - except (FilesystemError, ValueError): - return False - - def is_file(self) -> bool: - """ - Whether this path is a regular file (also True for symlinks pointing - to regular files). - """ - try: - return self.get().is_file() - except (FilesystemError, ValueError): - return False - - def is_symlink(self) -> bool: - """ - Whether this path is a symbolic link. - """ - try: - return self.get().is_symlink() - except (FilesystemError, ValueError): - return False - - def is_block_device(self) -> bool: - """ - Whether this path is a block device. - """ - try: - return S_ISBLK(self.stat().st_mode) - except (FilesystemError, ValueError): - return False - - def is_char_device(self) -> bool: - """ - Whether this path is a character device. - """ - try: - return S_ISCHR(self.stat().st_mode) - except (FilesystemError, ValueError): - return False - - def is_fifo(self) -> bool: - """ - Whether this path is a FIFO. - """ - try: - return S_ISFIFO(self.stat().st_mode) - except (FilesystemError, ValueError): - return False + is_mount = PathBase.is_mount - def is_socket(self) -> bool: + def is_junction(self) -> bool: """ - Whether this path is a socket. + Whether this path is a junction. """ - try: - return S_ISSOCK(self.stat().st_mode) - except (FilesystemError, ValueError): - return False + return self.parser.isjunction(self) def open( self, mode: str = "rb", buffering: int = 0, - encoding: Optional[str] = None, - errors: Optional[str] = None, - newline: Optional[str] = None, + encoding: str | None = None, + errors: str | None = None, + newline: str | None = None, ) -> IO: """Open file and return a stream. @@ -276,88 +197,92 @@ def open( Note: in contrast to regular Python, the mode is binary by default. Text mode has to be explicitly specified. Buffering is also disabled by default. """ - return io_open(self, mode, buffering, encoding, errors, newline) + return path_common.io_open(self, mode, buffering, encoding, errors, newline) def write_bytes(self, data: bytes) -> int: """ Open the file in bytes mode, write to it, and close the file. """ - raise NotImplementedError("TargetPath.write_bytes() is unsupported") + raise UnsupportedOperation(self._unsupported_msg("write_bytes()")) def write_text( - self, data: str, encoding: Optional[str] = None, errors: Optional[str] = None, newline: Optional[str] = None + self, data: str, encoding: str | None = None, errors: str | None = None, newline: str | None = None ) -> int: """ Open the file in text mode, write to it, and close the file. """ - raise NotImplementedError("TargetPath.write_text() is unsupported") + raise UnsupportedOperation(self._unsupported_msg("write_text()")) - # NOTE: ported from 3.13 Lib/pathlib/_local.py - def iterdir(self): + def iterdir(self) -> Iterator[TargetPath]: """Yield path objects of the directory contents. The children are yielded in arbitrary order, and the special entries '.' and '..' are not included. """ root_dir = str(self) - with scandir(self) as scandir_it: - paths = [entry.path for entry in scandir_it] - if root_dir == '.': - paths = map(self._remove_leading_dot, paths) - return map(self._from_parsed_string, paths) - - def _scandir(self) -> _DissectScandirIterator: - return scandir(self) - - # NOTE: ported from 3.13 Lib/pathlib/_local.py - def glob(self, pattern, *, case_sensitive=None, recurse_symlinks=False): + with path_common.scandir(self) as scandir_it: + for entry in scandir_it: + name = entry.name + if root_dir == ".": + name = self._remove_leading_dot(name) + child_path = self.joinpath(name) + child_path._entry = entry + yield child_path + + def glob( + self, pattern: str, *, case_sensitive: bool | None = None, recurse_symlinks: bool = False + ) -> Iterator[TargetPath]: """Iterate over this subtree and yield all existing files (of any kind, including directories) matching the given relative pattern. """ - sys.audit("pathlib.Path.glob", self, pattern) - if not isinstance(pattern, TargetPath): - pattern = self.with_segments(pattern) - if pattern.anchor: - raise NotImplementedError("Non-relative patterns are unsupported") - parts = pattern._tail.copy() - if not parts: - raise ValueError("Unacceptable pattern: {!r}".format(pattern)) - raw = pattern._raw_path - if raw[-1] in (self.parser.sep, self.parser.altsep): - # GH-65238: pathlib doesn't preserve trailing slash. Add it back. - parts.append('') - - select = self._glob_selector(parts[::-1], case_sensitive, recurse_symlinks) - return select(self) - - # NOTE: ported from 3.13 Lib/pathlib/_abc.py - def _glob_selector(self, parts, case_sensitive, recurse_symlinks): - case_sensitive = self._fs.case_sensitive - case_pedantic = False - globber = self._globber(self.parser.sep, case_sensitive, case_pedantic, recurse_symlinks) - return globber.selector(parts) - - # TODO: rglob + return PathBase.glob(self, pattern, case_sensitive=case_sensitive, recurse_symlinks=recurse_symlinks) + + def rglob( + self, pattern: str, *, case_sensitive: bool | None = None, recurse_symlinks: str = False + ) -> Iterator[TargetPath]: + """Recursively yield all existing files (of any kind, including + directories) matching the given relative pattern, anywhere in + this subtree. + """ + return PathBase.rglob(self, pattern, case_sensitive=case_sensitive, recurse_symlinks=recurse_symlinks) + + def walk( + self, top_down: bool = True, on_error: Callable[[Exception], None] = None, follow_symlinks: bool = False + ) -> Iterator[tuple[TargetPath, list[str], list[str]]]: + """Walk the directory tree from this directory, similar to os.walk().""" + return PathBase.walk(self, top_down=top_down, on_error=on_error, follow_symlinks=follow_symlinks) + + def absolute(self) -> TargetPath: + """Return an absolute version of this path + No normalization or symlink resolution is performed. + + Use resolve() to resolve symlinks and remove '..' segments. + """ + raise UnsupportedOperation(self._unsupported_msg("absolute()")) @classmethod def cwd(cls) -> TargetPath: """Return a new path pointing to the current working directory.""" - raise NotImplementedError("TargetPath.cwd() is unsupported") + raise UnsupportedOperation(cls._unsupported_msg("cwd()")) + + def expanduser(self) -> TargetPath: + """Return a new path with expanded ~ and ~user constructs + (as returned by os.path.expanduser) + """ + raise UnsupportedOperation(self._unsupported_msg("expanduser()")) @classmethod def home(cls) -> TargetPath: """Return a new path pointing to the user's home directory (as returned by os.path.expanduser('~')). """ - raise NotImplementedError("TargetPath.home() is unsupported") - - def absolute(self) -> TargetPath: - """Return an absolute version of this path by prepending the current - working directory. No normalization or symlink resolution is performed. + raise UnsupportedOperation(cls._unsupported_msg("home()")) - Use resolve() to get the canonical path to a file. + def readlink(self) -> TargetPath: + """ + Return the path to which the symbolic link points. """ - raise NotImplementedError("TargetPath.absolute() is unsupported in Dissect") + return self.with_segments(self.get().readlink()) # NOTE: We changed some of the error handling here to deal with our own exception types def resolve(self, strict: bool = False) -> TargetPath: @@ -366,7 +291,7 @@ def resolve(self, strict: bool = False) -> TargetPath: normalizing it. """ - s = self._flavour.realpath(self, strict=strict) + s = self.parser.realpath(self, strict=strict) p = self.with_segments(s) # In non-strict mode, realpath() doesn't raise on symlink loops. @@ -379,61 +304,32 @@ def resolve(self, strict: bool = False) -> TargetPath: raise return p - def owner(self) -> str: + def symlink_to(self, target: str, target_is_directory: bool = False) -> None: """ - Return the login name of the file owner. + Make this path a symlink pointing to the target path. + Note the order of arguments (link, target) is the reverse of os.symlink. """ - raise NotImplementedError("TargetPath.owner() is unsupported") + raise UnsupportedOperation(self._unsupported_msg("symlink_to()")) - def group(self) -> str: - """ - Return the group name of the file gid. + def hardlink_to(self, target: str) -> None: """ - raise NotImplementedError("TargetPath.group() is unsupported") + Make this path a hard link pointing to the same file as *target*. - def readlink(self) -> TargetPath: - """ - Return the path to which the symbolic link points. + Note the order of arguments (self, target) is the reverse of os.link's. """ - return self.with_segments(self.get().readlink()) + raise UnsupportedOperation(self._unsupported_msg("hardlink_to()")) def touch(self, mode: int = 0o666, exist_ok: bool = True) -> None: """ Create this file with the given access mode, if it doesn't exist. """ - raise NotImplementedError("TargetPath.touch() is unsupported") + raise UnsupportedOperation(self._unsupported_msg("touch()")) def mkdir(self, mode: int = 0o777, parents: bool = False, exist_ok: bool = False) -> None: """ Create a new directory at this given path. """ - raise NotImplementedError("TargetPath.mkdir() is unsupported") - - def chmod(self, mode: int, *, follow_symlinks: bool = True) -> None: - """ - Change the permissions of the path, like os.chmod(). - """ - raise NotImplementedError("TargetPath.chmod() is unsupported") - - def lchmod(self, mode: int) -> None: - """ - Like chmod(), except if the path points to a symlink, the symlink's - permissions are changed, rather than its target's. - """ - raise NotImplementedError("TargetPath.lchmod() is unsupported") - - def unlink(self, missing_ok: bool = False) -> None: - """ - Remove this file or link. - If the path is a directory, use rmdir() instead. - """ - raise NotImplementedError("TargetPath.unlink() is unsupported") - - def rmdir(self) -> None: - """ - Remove this directory. The directory must be empty. - """ - raise NotImplementedError("TargetPath.rmdir() is unsupported") + raise UnsupportedOperation(self._unsupported_msg("mkdir()")) def rename(self, target: str) -> TargetPath: """ @@ -445,7 +341,7 @@ def rename(self, target: str) -> TargetPath: Returns the new Path instance pointing to the target path. """ - raise NotImplementedError("TargetPath.rename() is unsupported") + raise UnsupportedOperation(self._unsupported_msg("rename()")) def replace(self, target: str) -> TargetPath: """ @@ -457,54 +353,47 @@ def replace(self, target: str) -> TargetPath: Returns the new Path instance pointing to the target path. """ - raise NotImplementedError("TargetPath.replace() is unsupported") + raise UnsupportedOperation(self._unsupported_msg("replace()")) - def symlink_to(self, target: str, target_is_directory: bool = False) -> None: + def chmod(self, mode: int, *, follow_symlinks: bool = True) -> None: """ - Make this path a symlink pointing to the target path. - Note the order of arguments (link, target) is the reverse of os.symlink. + Change the permissions of the path, like os.chmod(). """ - raise NotImplementedError("TargetPath.symlink_to() is unsupported") + raise UnsupportedOperation(self._unsupported_msg("chmod()")) - def hardlink_to(self, target: str) -> None: + def lchmod(self, mode: int) -> None: """ - Make this path a hard link pointing to the same file as *target*. - - Note the order of arguments (self, target) is the reverse of os.link's. + Like chmod(), except if the path points to a symlink, the symlink's + permissions are changed, rather than its target's. """ - raise NotImplementedError("TargetPath.hardlink_to() is unsupported") + raise UnsupportedOperation(self._unsupported_msg("lchmod()")) - def expanduser(self) -> TargetPath: - """Return a new path with expanded ~ and ~user constructs - (as returned by os.path.expanduser) + def unlink(self, missing_ok: bool = False) -> None: """ - raise NotImplementedError("TargetPath.expanduser() is unsupported") - - - + Remove this file or link. + If the path is a directory, use rmdir() instead. + """ + raise UnsupportedOperation(self._unsupported_msg("unlink()")) -# class PathGlobber(_GlobberBase): -# """ -# Class providing shell-style globbing for path objects. -# """ + def rmdir(self) -> None: + """ + Remove this directory. The directory must be empty. + """ + raise UnsupportedOperation(self._unsupported_msg("rmdir()")) -# lexists = operator.methodcaller('exists', follow_symlinks=False) -# add_slash = operator.methodcaller('joinpath', '') + def owner(self) -> str: + """ + Return the login name of the file owner. + """ + raise UnsupportedOperation(self._unsupported_msg("owner()")) -# @staticmethod -# def scandir(path): -# """Emulates os.scandir(), which returns an object that can be used as -# a context manager. This method is called by walk() and glob(). -# """ -# import contextlib -# return contextlib.nullcontext(path.iterdir()) + def group(self) -> str: + """ + Return the group name of the file gid. + """ + raise UnsupportedOperation(self._unsupported_msg("group()")) -# @staticmethod -# def concat_path(path, text): -# """Appends text to the given path.""" -# return path.with_segments(path._raw_path + text) -# @staticmethod -# def parse_entry(entry): -# """Returns the path of an entry yielded from scandir().""" -# return entry +class _GlobberTargetPath(TargetPath): + def __str__(self) -> str: + return self._raw_path diff --git a/dissect/target/helpers/compat/path_39.py b/dissect/target/helpers/compat/path_39.py index 70566ef90..2af3189b0 100644 --- a/dissect/target/helpers/compat/path_39.py +++ b/dissect/target/helpers/compat/path_39.py @@ -20,8 +20,7 @@ import fnmatch import re from pathlib import Path, PurePath, _Accessor, _PosixFlavour -from stat import S_ISBLK, S_ISCHR, S_ISFIFO, S_ISSOCK -from typing import IO, TYPE_CHECKING, Any, BinaryIO, Callable, Iterator, Optional +from typing import IO, TYPE_CHECKING, Any, BinaryIO, Callable, Iterator from dissect.target import filesystem from dissect.target.exceptions import ( @@ -29,13 +28,8 @@ NotASymlinkError, SymlinkRecursionError, ) -from dissect.target.helpers.compat.path_common import ( - _DissectPathParents, - io_open, - isjunction, - scandir, -) -from dissect.target.helpers.polypath import normalize, normpath +from dissect.target.helpers import polypath +from dissect.target.helpers.compat import path_common if TYPE_CHECKING: from dissect.target.filesystem import Filesystem, FilesystemEntry @@ -147,7 +141,7 @@ def listdir(path: TargetPath) -> list[str]: @staticmethod def scandir(path: TargetPath) -> _DissectScandirIterator: - return scandir(path) + return path_common.scandir(path) @staticmethod def chmod(path: TargetPath, mode: int, *, follow_symlinks: bool = True) -> None: @@ -204,7 +198,7 @@ def group(path: TargetPath) -> str: raise NotImplementedError("TargetPath.group() is unsupported") # NOTE: Forward compatibility with CPython >= 3.12 - isjunction = staticmethod(isjunction) + isjunction = staticmethod(path_common.isjunction) _dissect_accessor = _DissectAccessor() @@ -231,7 +225,7 @@ def _from_parts(cls, args: list, init: bool = True) -> TargetPath: path_args = [] for arg in args[1:]: if isinstance(arg, str): - arg = normalize(arg, alt_separator=alt_separator) + arg = polypath.normalize(arg, alt_separator=alt_separator) path_args.append(arg) self = super()._from_parts(path_args, init=init) @@ -285,14 +279,14 @@ def parent(self) -> TargetPath: return result @property - def parents(self) -> _DissectPathParents: - return _DissectPathParents(self) + def parents(self) -> path_common._DissectPathParents: + return path_common._DissectPathParents(self) class TargetPath(Path, PureDissectPath): __slots__ = ("_entry",) - def _init(self, template: Optional[Path] = None) -> None: + def _init(self, template: Path | None = None) -> None: self._accessor = _dissect_accessor def _make_child_relpath(self, part: str) -> TargetPath: @@ -403,7 +397,7 @@ def resolve(self, strict: bool = False) -> TargetPath: self.stat() s = str(self.absolute()) # Now we have no symlinks in the path, it's safe to normalize it. - normed = normpath(s, self._flavour.altsep) + normed = polypath.normpath(s, self._flavour.altsep) obj = self._from_parts((self._fs, normed), init=False) obj._init(template=self) return obj @@ -421,9 +415,9 @@ def open( self, mode: str = "rb", buffering: int = 0, - encoding: Optional[str] = None, - errors: Optional[str] = None, - newline: Optional[str] = None, + encoding: str | None = None, + errors: str | None = None, + newline: str | None = None, ) -> IO: """Open file and return a stream. @@ -432,7 +426,7 @@ def open( Note: in contrast to regular Python, the mode is binary by default. Text mode has to be explicitly specified. Buffering is also disabled by default. """ - return io_open(self, mode, buffering, encoding, errors, newline) + return path_common.io_open(self, mode, buffering, encoding, errors, newline) def write_bytes(self, data: bytes) -> int: """ @@ -441,7 +435,7 @@ def write_bytes(self, data: bytes) -> int: raise NotImplementedError("TargetPath.write_bytes() is unsupported") def write_text( - self, data: str, encoding: Optional[str] = None, errors: Optional[str] = None, newline: Optional[str] = None + self, data: str, encoding: str | None = None, errors: str | None = None, newline: str | None = None ) -> int: """ Open the file in text mode, write to it, and close the file. @@ -456,45 +450,6 @@ def readlink(self) -> TargetPath: obj = self._from_parts((self._fs, path), init=False) return obj - def exists(self) -> bool: - """ - Whether this path exists. - """ - try: - # .exists() must resolve possible symlinks - self.get().stat() - return True - except (FilesystemError, ValueError): - return False - - def is_dir(self) -> bool: - """ - Whether this path is a directory. - """ - try: - return self.get().is_dir() - except (FilesystemError, ValueError): - return False - - def is_file(self) -> bool: - """ - Whether this path is a regular file (also True for symlinks pointing - to regular files). - """ - try: - return self.get().is_file() - except (FilesystemError, ValueError): - return False - - def is_symlink(self) -> bool: - """ - Whether this path is a symbolic link. - """ - try: - return self.get().is_symlink() - except (FilesystemError, ValueError): - return False - # NOTE: Forward compatibility with CPython >= 3.12 def is_junction(self) -> bool: """ @@ -502,42 +457,6 @@ def is_junction(self) -> bool: """ return self._accessor.isjunction(self) - def is_block_device(self) -> bool: - """ - Whether this path is a block device. - """ - try: - return S_ISBLK(self.stat().st_mode) - except (FilesystemError, ValueError): - return False - - def is_char_device(self) -> bool: - """ - Whether this path is a character device. - """ - try: - return S_ISCHR(self.stat().st_mode) - except (FilesystemError, ValueError): - return False - - def is_fifo(self) -> bool: - """ - Whether this path is a FIFO. - """ - try: - return S_ISFIFO(self.stat().st_mode) - except (FilesystemError, ValueError): - return False - - def is_socket(self) -> bool: - """ - Whether this path is a socket. - """ - try: - return S_ISSOCK(self.stat().st_mode) - except (FilesystemError, ValueError): - return False - def expanduser(self) -> TargetPath: """Return a new path with expanded ~ and ~user constructs (as returned by os.path.expanduser) diff --git a/dissect/target/helpers/compat/path_common.py b/dissect/target/helpers/compat/path_common.py index eda3962a7..d8bde46b6 100644 --- a/dissect/target/helpers/compat/path_common.py +++ b/dissect/target/helpers/compat/path_common.py @@ -4,11 +4,11 @@ import posixpath import stat import sys -from typing import IO, TYPE_CHECKING, Iterator, Literal, Optional +from typing import IO, TYPE_CHECKING, Iterator, Literal if TYPE_CHECKING: - from dissect.target.helpers.fsutil import TargetPath from dissect.target.filesystem import Filesystem, FilesystemEntry + from dissect.target.helpers.fsutil import TargetPath from dissect.target.exceptions import FilesystemError, SymlinkRecursionError from dissect.target.helpers.polypath import abspath, normalize @@ -173,9 +173,9 @@ def io_open( path: TargetPath, mode: str = "rb", buffering: int = 0, - encoding: Optional[str] = None, - errors: Optional[str] = None, - newline: Optional[str] = None, + encoding: str | None = None, + errors: str | None = None, + newline: str | None = None, ) -> IO: """Open file and return a stream. diff --git a/dissect/target/helpers/fsutil.py b/dissect/target/helpers/fsutil.py index 96fa0a314..589538590 100644 --- a/dissect/target/helpers/fsutil.py +++ b/dissect/target/helpers/fsutil.py @@ -11,7 +11,7 @@ import re import sys from pathlib import Path -from typing import Any, BinaryIO, Iterator, Optional, Sequence, TextIO, Union +from typing import Any, BinaryIO, Iterator, Sequence, TextIO try: import lzma @@ -102,9 +102,9 @@ ] -def generate_addr(path: Union[str, Path], alt_separator: str = "") -> int: +def generate_addr(path: str | Path, alt_separator: str = "") -> int: if not alt_separator and isinstance(path, Path): - alt_separator = path._flavour.altsep + alt_separator = (getattr(path, "parser", None) or path._flavour).altsep path = normalize(str(path), alt_separator=alt_separator) return int(hashlib.sha256(path.encode()).hexdigest()[:8], 16) @@ -235,7 +235,7 @@ def __repr__(self) -> str: ) return f"dissect.target.stat_result({values})" - def _parse_time(self, ts: Union[int, float]) -> tuple[int, float, int]: + def _parse_time(self, ts: int | float) -> tuple[int, float, int]: ts_int = int(ts) ts_ns = int(ts * 1e9) @@ -313,7 +313,7 @@ def glob_split(pattern: str, alt_separator: str = "") -> tuple[str, str]: Args: pattern: A glob pattern to match names of filesystem entries against. - alt_separator: An optional alternative path separator in use by the filesystem being matched. + alt_separator: An alternative | Nonepath separator in use by the filesystem being matched. Returns: A tuple of a string with path parts up to the first path part that has a glob pattern and a string of @@ -490,14 +490,14 @@ def resolve_link( def open_decompress( - path: Optional[TargetPath] = None, + path: TargetPath | None = None, mode: str = "rb", *, - fileobj: Optional[BinaryIO] = None, - encoding: Optional[str] = "UTF-8", - errors: Optional[str] = "backslashreplace", - newline: Optional[str] = None, -) -> Union[BinaryIO, TextIO]: + fileobj: BinaryIO | None = None, + encoding: str | None = "UTF-8", + errors: str | None = "backslashreplace", + newline: str | None = None, +) -> BinaryIO | TextIO: """Open and decompress a file. Handles gz, bz2 and zstd files. Uncompressed files are opened as-is. When passing in an already opened ``fileobj``, the mode, encoding, errors and newline arguments are ignored. @@ -605,9 +605,9 @@ def reverse_readlines(fh: TextIO, chunk_size: int = 1024 * 1024 * 8) -> Iterator def fs_attrs( - path: Union[os.PathLike, str, bytes], + path: os.PathLike | str | bytes, follow_symlinks: bool = True, -) -> dict[Union[os.PathLike, str, bytes], bytes]: +) -> dict[os.PathLike | str | bytes, bytes]: """Return the extended attributes for a given path on the local filesystem. This is currently only implemented for Linux using os.listxattr and related functions. diff --git a/dissect/target/helpers/polypath.py b/dissect/target/helpers/polypath.py index dfe44cf70..01e5b2947 100644 --- a/dissect/target/helpers/polypath.py +++ b/dissect/target/helpers/polypath.py @@ -37,7 +37,7 @@ def split(path: str, alt_separator: str = "") -> str: splitdrive = posixpath.splitdrive -def splitroot(path: str, alt_separator: str = "") -> tuple[str, str]: +def splitroot(path: str, alt_separator: str = "") -> tuple[str, str, str]: return posixpath.splitroot(normalize(path, alt_separator=alt_separator)) diff --git a/tests/helpers/test_fsutil.py b/tests/helpers/test_fsutil.py index 7a88886a2..9826d4624 100644 --- a/tests/helpers/test_fsutil.py +++ b/tests/helpers/test_fsutil.py @@ -226,10 +226,13 @@ def path_fs() -> Iterator[VirtualFilesystem]: vfs = VirtualFilesystem() vfs.makedirs("/some/dir") + vfs.makedirs("/some/dir/nested") vfs.symlink("/some/dir/file.txt", "/some/symlink.txt") vfs.symlink("nonexistent", "/some/dir/link.txt") + vfs.symlink("/some/dir/nested", "/some/dirlink") vfs.map_file_fh("/some/file.txt", io.BytesIO(b"content")) vfs.map_file_fh("/some/dir/file.txt", io.BytesIO(b"")) + vfs.map_file_fh("/some/dir/nested/file.txt", io.BytesIO(b"")) yield vfs @@ -374,14 +377,48 @@ def test_target_path_glob(path_fs: VirtualFilesystem) -> None: def test_target_path_rglob(path_fs: VirtualFilesystem) -> None: - assert list(path_fs.path("/some").rglob("*.txt")) == [ - path_fs.path("/some/symlink.txt"), - path_fs.path("/some/file.txt"), - path_fs.path("/some/dir/link.txt"), - path_fs.path("/some/dir/file.txt"), + assert list(map(str, path_fs.path("/some").rglob("*.txt"))) == [ + "/some/symlink.txt", + "/some/file.txt", + "/some/dir/link.txt", + "/some/dir/file.txt", + "/some/dir/nested/file.txt", ] + assert list(path_fs.path("/some").rglob("*.TXT")) == [] assert list(path_fs.path("/some").rglob("*.csv")) == [] + with patch.object(path_fs, "case_sensitive", False): + assert list(map(str, path_fs.path("/some").rglob("*.TXT"))) == [ + "/some/symlink.txt", + "/some/file.txt", + "/some/dir/link.txt", + "/some/dir/file.txt", + "/some/dir/nested/file.txt", + ] + + +@pytest.mark.skipif(sys.version_info < (3, 12), reason="requires Python 3.12+") +def test_target_path_rglob_case_sensitive(path_fs: VirtualFilesystem) -> None: + assert list(map(str, path_fs.path("/some").rglob("*.TXT", case_sensitive=False))) == [ + "/some/symlink.txt", + "/some/file.txt", + "/some/dir/link.txt", + "/some/dir/file.txt", + "/some/dir/nested/file.txt", + ] + + +@pytest.mark.skipif(sys.version_info < (3, 13), reason="requires Python 3.13+") +def test_target_path_rglob_recurse_symlinks(path_fs: VirtualFilesystem) -> None: + assert list(map(str, path_fs.path("/some").rglob("*.txt", recurse_symlinks=True))) == [ + "/some/symlink.txt", + "/some/file.txt", + "/some/dirlink/file.txt", + "/some/dir/link.txt", + "/some/dir/file.txt", + "/some/dir/nested/file.txt", + ] + def test_target_path_is_dir(path_fs: VirtualFilesystem) -> None: assert path_fs.path("/some/dir").is_dir() @@ -439,14 +476,16 @@ def test_target_path_iterdir(path_fs: VirtualFilesystem) -> None: assert list(path_fs.path("/some").iterdir()) == [ path_fs.path("/some/dir"), path_fs.path("/some/symlink.txt"), + path_fs.path("/some/dirlink"), path_fs.path("/some/file.txt"), ] def test_target_path_walk(path_fs: VirtualFilesystem) -> None: assert list(path_fs.path("/some").walk()) == [ - (path_fs.path("/some"), ["dir"], ["symlink.txt", "file.txt"]), - (path_fs.path("/some/dir"), [], ["link.txt", "file.txt"]), + (path_fs.path("/some"), ["dir"], ["symlink.txt", "dirlink", "file.txt"]), + (path_fs.path("/some/dir"), ["nested"], ["link.txt", "file.txt"]), + (path_fs.path("/some/dir/nested"), [], ["file.txt"]), ] @@ -625,8 +664,9 @@ def test_pure_dissect_path__from_parts_flavour(alt_separator: str, case_sensitiv vfs = VirtualFilesystem(alt_separator=alt_separator, case_sensitive=case_sensitive) pure_dissect_path = fsutil.PureDissectPath(vfs, "/some/dir") - assert pure_dissect_path._flavour.altsep == alt_separator - assert pure_dissect_path._flavour.case_sensitive == case_sensitive + obj = getattr(pure_dissect_path, "parser", None) or pure_dissect_path._flavour + assert obj.altsep == alt_separator + assert obj.case_sensitive == case_sensitive def test_pure_dissect_path__from_parts_no_fs_exception() -> None: @@ -698,7 +738,9 @@ def test_reverse_readlines() -> None: ] vfs.map_file_fh("file_multi_long_single", io.BytesIO((("🦊" * 8000) + ("a" * 200)).encode())) - assert list(fsutil.reverse_readlines(vfs.path("file_multi_long_single").open("rt"))) == [("🦊" * 8000) + ("a" * 200)] + assert list(fsutil.reverse_readlines(vfs.path("file_multi_long_single").open("rt"))) == [ + ("🦊" * 8000) + ("a" * 200) + ] vfs.map_file_fh("empty", io.BytesIO(b"")) assert list(fsutil.reverse_readlines(vfs.path("empty").open("rt"))) == [] diff --git a/tests/plugins/os/windows/test_recyclebin.py b/tests/plugins/os/windows/test_recyclebin.py index 733a9198f..e2b33a551 100644 --- a/tests/plugins/os/windows/test_recyclebin.py +++ b/tests/plugins/os/windows/test_recyclebin.py @@ -1,3 +1,4 @@ +import io from pathlib import Path from tempfile import TemporaryDirectory from unittest.mock import Mock, mock_open, patch @@ -103,7 +104,7 @@ def test_read_bin_file_unknown(target_win, path): def test_recyclebin_plugin_file(target_win, recycle_bin): - recycle_bin.map_file("$ihello_world", None) + recycle_bin.map_file_fh("$ihello_world", io.BytesIO(b"")) target_win.fs.mount("C:\\$recycle.bin", recycle_bin) target_win.add_plugin(RecyclebinPlugin) @@ -114,7 +115,7 @@ def test_recyclebin_plugin_file(target_win, recycle_bin): def test_recyclebin_plugin_wrong_prefix(target_win, recycle_bin): - recycle_bin.map_file("hello_world", None) + recycle_bin.map_file_fh("hello_world", io.BytesIO(b"")) target_win.fs.mount("C:\\$recycle.bin", recycle_bin) target_win.add_plugin(RecyclebinPlugin) From ac8fb8f0353e9ad81e493b71cc2ff98bad4f4b99 Mon Sep 17 00:00:00 2001 From: Schamper <1254028+Schamper@users.noreply.github.com> Date: Thu, 5 Dec 2024 16:38:58 +0100 Subject: [PATCH 03/12] More Python 3.13 fixes --- dissect/target/container.py | 2 +- dissect/target/helpers/compat/path_311.py | 5 +---- dissect/target/helpers/compat/path_313.py | 3 --- dissect/target/helpers/configutil.py | 1 - .../target/plugins/apps/container/docker.py | 14 +++++++------ dissect/target/plugins/os/windows/clfs.py | 2 -- dissect/target/plugins/os/windows/datetime.py | 21 +++++++++++++------ dissect/target/target.py | 2 +- dissect/target/tools/fsutils.py | 8 ++++--- tests/test_container.py | 2 +- 10 files changed, 32 insertions(+), 28 deletions(-) diff --git a/dissect/target/container.py b/dissect/target/container.py index 90a5d60c2..da4428413 100644 --- a/dissect/target/container.py +++ b/dissect/target/container.py @@ -169,7 +169,7 @@ def close(self) -> None: Override this if you need to clean-up anything. """ - raise NotImplementedError() + pass def register(module: str, class_name: str, internal: bool = True): diff --git a/dissect/target/helpers/compat/path_311.py b/dissect/target/helpers/compat/path_311.py index f7430ee8c..ffc4d539e 100644 --- a/dissect/target/helpers/compat/path_311.py +++ b/dissect/target/helpers/compat/path_311.py @@ -26,10 +26,7 @@ from typing import IO, TYPE_CHECKING, Any, Callable, Iterator from dissect.target import filesystem -from dissect.target.exceptions import ( - FilesystemError, - SymlinkRecursionError, -) +from dissect.target.exceptions import FilesystemError, SymlinkRecursionError from dissect.target.helpers import polypath from dissect.target.helpers.compat import path_common diff --git a/dissect/target/helpers/compat/path_313.py b/dissect/target/helpers/compat/path_313.py index cef3c8a09..d8e21126c 100644 --- a/dissect/target/helpers/compat/path_313.py +++ b/dissect/target/helpers/compat/path_313.py @@ -219,12 +219,9 @@ def iterdir(self) -> Iterator[TargetPath]: The children are yielded in arbitrary order, and the special entries '.' and '..' are not included. """ - root_dir = str(self) with path_common.scandir(self) as scandir_it: for entry in scandir_it: name = entry.name - if root_dir == ".": - name = self._remove_leading_dot(name) child_path = self.joinpath(name) child_path._entry = entry yield child_path diff --git a/dissect/target/helpers/configutil.py b/dissect/target/helpers/configutil.py index d676303d9..38c9a218a 100644 --- a/dissect/target/helpers/configutil.py +++ b/dissect/target/helpers/configutil.py @@ -305,7 +305,6 @@ def parse_file(self, fh: TextIO) -> None: class Bin(ConfigurationParser): - """Read the file into ``binary`` and show the number of bytes read""" def parse_file(self, fh: io.BytesIO) -> None: diff --git a/dissect/target/plugins/apps/container/docker.py b/dissect/target/plugins/apps/container/docker.py index a33dce5f9..dfc479914 100644 --- a/dissect/target/plugins/apps/container/docker.py +++ b/dissect/target/plugins/apps/container/docker.py @@ -260,9 +260,11 @@ def logs(self, raw_messages: bool = False, remove_backspaces: bool = False) -> I ts=log_entry.get("time"), container=container.name, # container hash stream=log_entry.get("stream"), - message=log_entry.get("log") - if raw_messages - else strip_log(log_entry.get("log"), remove_backspaces), + message=( + log_entry.get("log") + if raw_messages + else strip_log(log_entry.get("log"), remove_backspaces) + ), _target=self.target, ) @@ -273,9 +275,9 @@ def logs(self, raw_messages: bool = False, remove_backspaces: bool = False) -> I ts=ts.from_unix_us(log_entry.ts // 1000), container=container.parent.name, # container hash stream=log_entry.source, - message=log_entry.message - if raw_messages - else strip_log(log_entry.message, remove_backspaces), + message=( + log_entry.message if raw_messages else strip_log(log_entry.message, remove_backspaces) + ), _target=self.target, ) diff --git a/dissect/target/plugins/os/windows/clfs.py b/dissect/target/plugins/os/windows/clfs.py index f07cadfec..e386a0f92 100644 --- a/dissect/target/plugins/os/windows/clfs.py +++ b/dissect/target/plugins/os/windows/clfs.py @@ -4,7 +4,6 @@ from dissect.clfs.exceptions import InvalidBLFError, InvalidRecordBlockError from dissect.target.exceptions import UnsupportedPluginError -from dissect.target.helpers import fsutil from dissect.target.helpers.record import TargetRecordDescriptor from dissect.target.plugin import Plugin, export from dissect.target.target import Target @@ -91,7 +90,6 @@ def clfs(self) -> Iterator[ClfsRecord]: continue container_path = blf_container.name.replace("%BLF%", str(blf_path.parent)) - container_path = fsutil.normalize(container_path, alt_separator=blf_path._flavour.altsep) container_file = self.target.fs.path(container_path) fh = container_file.open() diff --git a/dissect/target/plugins/os/windows/datetime.py b/dissect/target/plugins/os/windows/datetime.py index 55d7c9967..a4ffe8784 100644 --- a/dissect/target/plugins/os/windows/datetime.py +++ b/dissect/target/plugins/os/windows/datetime.py @@ -1,7 +1,6 @@ import calendar from collections import namedtuple from datetime import datetime, timedelta, timezone, tzinfo -from typing import Dict, Tuple from dissect.cstruct import cstruct @@ -88,7 +87,7 @@ def parse_systemtime_transition(systemtime: c_tz._SYSTEMTIME, year: int) -> date ) -def parse_dynamic_dst(key: RegistryKey) -> Dict[int, TimezoneInformation]: +def parse_dynamic_dst(key: RegistryKey) -> dict[int, TimezoneInformation]: """Parse dynamic DST information from a given TimeZoneInformation registry key. If a timezone has dynamic DST information, there's a "Dynamic DST" subkey with values for each year. @@ -121,7 +120,7 @@ def parse_tzi(tzi: bytes) -> TimezoneInformation: ) -def get_dst_range(tzi: TimezoneInformation, year: int) -> Tuple[datetime, datetime]: +def get_dst_range(tzi: TimezoneInformation, year: int) -> tuple[datetime, datetime]: """Get the start and end date of DST for the given year.""" start = parse_systemtime_transition(tzi.daylight_date, year) end = parse_systemtime_transition(tzi.standard_date, year) @@ -160,9 +159,19 @@ def is_dst(self, dt: datetime) -> bool: flip = True start, end = end, start - # Can't compare naive to aware objects, so strip the timezone from - # dt first. - dt = dt.replace(tzinfo=None) + # Can't compare naive to aware objects, so strip the timezone from dt first + # Cast to a proper datetime object to avoid issues with subclassed datetime objects + dt = datetime( + dt.year, + dt.month, + dt.day, + dt.hour, + dt.minute, + dt.second, + dt.microsecond, + tzinfo=None, + fold=dt.fold, + ) result = False if start + HOUR <= dt < end - HOUR: diff --git a/dissect/target/target.py b/dissect/target/target.py index c3a2ab516..79e36cf41 100644 --- a/dissect/target/target.py +++ b/dissect/target/target.py @@ -615,7 +615,7 @@ def get_function(self, function: str) -> FunctionTuple: raise UnsupportedPluginError( f"Unsupported function `{function}` for target with OS plugin {self._os_plugin}", extra=causes[1:] if len(causes) > 1 else None, - ) from causes[0] if causes else None + ) from (causes[0] if causes else None) # We still ended up with no compatible plugins if function not in self._functions: diff --git a/dissect/target/tools/fsutils.py b/dissect/target/tools/fsutils.py index c79b8dfc8..954046864 100644 --- a/dissect/target/tools/fsutils.py +++ b/dissect/target/tools/fsutils.py @@ -217,9 +217,11 @@ def filetype(path: TargetPath) -> str: atime=datetime.fromtimestamp(s.st_atime, tz=timezone.utc).isoformat(timespec="microseconds"), mtime=datetime.fromtimestamp(s.st_mtime, tz=timezone.utc).isoformat(timespec="microseconds"), ctime=datetime.fromtimestamp(s.st_ctime, tz=timezone.utc).isoformat(timespec="microseconds"), - btime=datetime.fromtimestamp(s.st_birthtime, tz=timezone.utc).isoformat(timespec="microseconds") - if hasattr(s, "st_birthtime") and s.st_birthtime - else "?", + btime=( + datetime.fromtimestamp(s.st_birthtime, tz=timezone.utc).isoformat(timespec="microseconds") + if hasattr(s, "st_birthtime") and s.st_birthtime + else "?" + ), ) print(res, file=stdout) diff --git a/tests/test_container.py b/tests/test_container.py index 80087541f..22e3c9cf4 100644 --- a/tests/test_container.py +++ b/tests/test_container.py @@ -38,7 +38,7 @@ def test_open_fallback_fh(tmp_path): (bytes(range(256)) * 2) + b"conectix" + (b"\x00" * 8) - + (b"\xFF" * 8) + + (b"\xff" * 8) + (b"\x00" * 24) + struct.pack(">Q", 512) + (b"\x00" * 455) From fde971e107766b6e26c6aa01e3b5349328850eda Mon Sep 17 00:00:00 2001 From: JSCU-CNI <121175071+JSCU-CNI@users.noreply.github.com> Date: Thu, 5 Dec 2024 17:05:52 +0100 Subject: [PATCH 04/12] fix most DeprecationWarnings --- dissect/target/helpers/config.py | 3 +++ dissect/target/helpers/utils.py | 16 ++++++++++------ dissect/target/plugins/apps/vpn/wireguard.py | 6 +++--- dissect/target/target.py | 2 +- tests/helpers/test_fsutil.py | 4 +--- tests/plugins/apps/vpn/test_wireguard.py | 12 +++++++----- tests/tools/test_diff.py | 2 +- 7 files changed, 26 insertions(+), 19 deletions(-) diff --git a/dissect/target/helpers/config.py b/dissect/target/helpers/config.py index 7a1cc3faa..e12c1eff7 100644 --- a/dissect/target/helpers/config.py +++ b/dissect/target/helpers/config.py @@ -70,6 +70,9 @@ def _find_config_file(paths: list[Path | str] | None) -> Path | None: config_file = None for path in paths: + if not path: + continue + path = Path(path) cur_path = path.absolute() diff --git a/dissect/target/helpers/utils.py b/dissect/target/helpers/utils.py index 7648e94e2..1cb39750f 100644 --- a/dissect/target/helpers/utils.py +++ b/dissect/target/helpers/utils.py @@ -143,15 +143,19 @@ def year_rollover_helper( log.debug("Skipping line: %s", line) continue + # We have to append the current_year to strptime instead of adding it using replace later. + # This prevents DeprecationWarnings on cpython >= 3.13 and Exceptions on cpython >= 3.15. + # See https://github.com/python/cpython/issues/70647 and https://github.com/python/cpython/pull/117107. + compare_ts = datetime.strptime(f"{timestamp.group(0)};1900", f"{ts_format};%Y") + if last_seen_month and compare_ts.month > last_seen_month: + current_year -= 1 + last_seen_month = compare_ts.month + try: - relative_ts = datetime.strptime(timestamp.group(0), ts_format) + relative_ts = datetime.strptime(f"{timestamp.group(0)};{current_year}", f"{ts_format};%Y") except ValueError as e: log.warning("Timestamp '%s' does not match format '%s', skipping line.", timestamp.group(0), ts_format) log.debug("", exc_info=e) continue - if last_seen_month and relative_ts.month > last_seen_month: - current_year -= 1 - last_seen_month = relative_ts.month - - yield relative_ts.replace(year=current_year, tzinfo=tzinfo), line + yield relative_ts.replace(tzinfo=tzinfo), line diff --git a/dissect/target/plugins/apps/vpn/wireguard.py b/dissect/target/plugins/apps/vpn/wireguard.py index a68a03403..5fbb1b1be 100644 --- a/dissect/target/plugins/apps/vpn/wireguard.py +++ b/dissect/target/plugins/apps/vpn/wireguard.py @@ -24,7 +24,7 @@ ("string", "postup"), ("string", "predown"), ("string", "postdown"), - ("string", "source"), + ("path", "source"), ], ) @@ -37,7 +37,7 @@ ("net.ipnetwork[]", "allowed_ips"), ("string", "endpoint"), ("varint", "persistent_keep_alive"), - ("string", "source"), + ("path", "source"), ], ) @@ -77,7 +77,7 @@ def __init__(self, target) -> None: super().__init__(target) self.configs: list[Path] = [] for path in self.CONFIG_GLOBS: - self.configs.extend(self.target.fs.path().glob(path.lstrip("/"))) + self.configs.extend(self.target.fs.path("/").glob(path.lstrip("/"))) def check_compatible(self) -> None: if not self.configs: diff --git a/dissect/target/target.py b/dissect/target/target.py index 79e36cf41..b69aa8774 100644 --- a/dissect/target/target.py +++ b/dissect/target/target.py @@ -95,7 +95,7 @@ def __init__(self, path: Union[str, Path] = None): try: self._config = config.load(config_paths) except Exception as e: - self.log.warning("Error loading config file: %s", self.path) + self.log.warning("Error loading config file: %s", config_paths) self.log.debug("", exc_info=e) self._config = config.load(None) # This loads an empty config. diff --git a/tests/helpers/test_fsutil.py b/tests/helpers/test_fsutil.py index 9826d4624..04d0b8794 100644 --- a/tests/helpers/test_fsutil.py +++ b/tests/helpers/test_fsutil.py @@ -738,9 +738,7 @@ def test_reverse_readlines() -> None: ] vfs.map_file_fh("file_multi_long_single", io.BytesIO((("🦊" * 8000) + ("a" * 200)).encode())) - assert list(fsutil.reverse_readlines(vfs.path("file_multi_long_single").open("rt"))) == [ - ("🦊" * 8000) + ("a" * 200) - ] + assert list(fsutil.reverse_readlines(vfs.path("file_multi_long_single").open("rt"))) == [("🦊" * 8000) + ("a" * 200)] vfs.map_file_fh("empty", io.BytesIO(b"")) assert list(fsutil.reverse_readlines(vfs.path("empty").open("rt"))) == [] diff --git a/tests/plugins/apps/vpn/test_wireguard.py b/tests/plugins/apps/vpn/test_wireguard.py index a5639e47f..79dd6c02c 100644 --- a/tests/plugins/apps/vpn/test_wireguard.py +++ b/tests/plugins/apps/vpn/test_wireguard.py @@ -1,10 +1,12 @@ +from dissect.target.filesystem import VirtualFilesystem from dissect.target.plugins.apps.vpn.wireguard import WireGuardPlugin +from dissect.target.target import Target from tests._utils import absolute_path -def test_wireguard_plugin_global_log(target_unix_users, fs_unix): +def test_wireguard_plugin_global_log(target_unix_users: Target, fs_unix: VirtualFilesystem): wireguard_config_file = absolute_path("_data/plugins/apps/vpn/wireguard/wg0.conf") - fs_unix.map_file("etc/wireguard/wg0.conf", wireguard_config_file) + fs_unix.map_file("/etc/wireguard/wg0.conf", wireguard_config_file) target_unix_users.add_plugin(WireGuardPlugin) records = list(target_unix_users.wireguard.config()) @@ -16,7 +18,7 @@ def test_wireguard_plugin_global_log(target_unix_users, fs_unix): assert str(record.address) == "10.13.37.1" assert record.private_key == "UHJpdmF0ZUtleQ==" assert record.listen_port == "12345" - assert record.source == "etc/wireguard/wg0.conf" + assert record.source == "/etc/wireguard/wg0.conf" assert record.dns is None # Peer @@ -24,11 +26,11 @@ def test_wireguard_plugin_global_log(target_unix_users, fs_unix): assert record.name is None assert [str(addr) for addr in record.allowed_ips] == ["10.13.37.2/32", "::/0"] assert record.public_key == "UHVibGljS2V5MQ==" - assert record.source == "etc/wireguard/wg0.conf" + assert record.source == "/etc/wireguard/wg0.conf" # Peer record = records[2] assert record.name is None assert [str(addr) for addr in record.allowed_ips] == ["10.13.37.3/32", "::/0"] assert record.public_key == "UHVibGljS2V5Mg==" - assert record.source == "etc/wireguard/wg0.conf" + assert record.source == "/etc/wireguard/wg0.conf" diff --git a/tests/tools/test_diff.py b/tests/tools/test_diff.py index df6bae6ce..26a65f191 100644 --- a/tests/tools/test_diff.py +++ b/tests/tools/test_diff.py @@ -310,7 +310,7 @@ def test_shell_plugin(src_target: Target, dst_target: Target, capsys) -> None: def test_target_diff_shell(capsys, monkeypatch) -> None: with monkeypatch.context() as m: m.setattr(fsutils, "LS_COLORS", {}) - m.setenv("NO_COLOR", 1) + m.setenv("NO_COLOR", "1") src_target_path = absolute_path("_data/tools/diff/src.tar") dst_target_path = absolute_path("_data/tools/diff/dst.tar") m.setattr("sys.argv", ["target-diff", "--deep", "shell", src_target_path, dst_target_path]) From b86057639090d6b58313b9971651bbcf306b3f8e Mon Sep 17 00:00:00 2001 From: Schamper <1254028+Schamper@users.noreply.github.com> Date: Fri, 6 Dec 2024 10:22:21 +0100 Subject: [PATCH 05/12] Add isreserved --- dissect/target/helpers/fsutil.py | 2 ++ dissect/target/helpers/polypath.py | 8 ++++++++ tests/helpers/test_fsutil.py | 27 ++++++++++++++++++++++++--- 3 files changed, 34 insertions(+), 3 deletions(-) diff --git a/dissect/target/helpers/fsutil.py b/dissect/target/helpers/fsutil.py index 589538590..da29d40b3 100644 --- a/dissect/target/helpers/fsutil.py +++ b/dissect/target/helpers/fsutil.py @@ -42,6 +42,7 @@ commonpath, dirname, isabs, + isreserved, join, normalize, normpath, @@ -82,6 +83,7 @@ "glob_split", "has_glob_magic", "isabs", + "isreserved", "join", "normalize", "normpath", diff --git a/dissect/target/helpers/polypath.py b/dissect/target/helpers/polypath.py index 01e5b2947..ca7694478 100644 --- a/dissect/target/helpers/polypath.py +++ b/dissect/target/helpers/polypath.py @@ -71,3 +71,11 @@ def relpath(path: str, start: str, alt_separator: str = "") -> str: def commonpath(paths: list[str], alt_separator: str = "") -> str: return posixpath.commonpath([normalize(path, alt_separator=alt_separator) for path in paths]) + + +def isreserved(path: str) -> bool: + """Return True if the path is a reserved name. + + We currently do not have any reserved names. + """ + return False diff --git a/tests/helpers/test_fsutil.py b/tests/helpers/test_fsutil.py index 04d0b8794..2e74edb92 100644 --- a/tests/helpers/test_fsutil.py +++ b/tests/helpers/test_fsutil.py @@ -165,6 +165,26 @@ def test_relpath(path: str, start: str, alt_separator: str, result: str) -> None assert fsutil.relpath(path, start, alt_separator=alt_separator) == result +@pytest.mark.parametrize( + ("paths, alt_separator, result"), + [ + (["/some/dir/some/file", "/some/dir/some/other"], "", "/some/dir/some"), + (["/some/dir/some/file", "/some/dir/some/other"], "\\", "/some/dir/some"), + (["\\some\\dir\\some\\file", "\\some\\dir\\some\\other"], "\\", "/some/dir/some"), + (["/some/dir/some/file", "/some/dir/other"], "", "/some/dir"), + (["/some/dir/some/file", "/some/other"], "", "/some"), + (["/some/dir/some/file", "/some/other"], "\\", "/some"), + ], +) +def test_commonpath(paths: list[str], alt_separator: str, result: str) -> None: + assert fsutil.commonpath(paths, alt_separator=alt_separator) == result + + +def test_isreserved() -> None: + assert not fsutil.isreserved("CON") + assert not fsutil.isreserved("foo") + + def test_generate_addr() -> None: slash_path = "/some/dir/some/file" slash_vfs = VirtualFilesystem(alt_separator="") @@ -312,9 +332,10 @@ def test_target_path_is_relative_to(path_fs: VirtualFilesystem) -> None: def test_target_path_is_reserved(path_fs: VirtualFilesystem) -> None: - # We currently do not have any reserved names for TargetPath - assert not path_fs.path("CON").is_reserved() - assert not path_fs.path("foo").is_reserved() + if sys.version_info < (3, 13): + # We currently do not have any reserved names for TargetPath + assert not path_fs.path("CON").is_reserved() + assert not path_fs.path("foo").is_reserved() def test_target_path_join(path_fs: VirtualFilesystem) -> None: From 7b93a900fc289467f1c215425066f0a587e6cc61 Mon Sep 17 00:00:00 2001 From: Schamper <1254028+Schamper@users.noreply.github.com> Date: Fri, 6 Dec 2024 10:33:05 +0100 Subject: [PATCH 06/12] Add comment explaining _GlobberTargetPath --- dissect/target/helpers/compat/path_313.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/dissect/target/helpers/compat/path_313.py b/dissect/target/helpers/compat/path_313.py index d8e21126c..28a39957a 100644 --- a/dissect/target/helpers/compat/path_313.py +++ b/dissect/target/helpers/compat/path_313.py @@ -115,7 +115,7 @@ def __init__(self, fs: Filesystem, *pathsegments): def with_segments(self, *pathsegments) -> TargetPath: return type(self)(self._fs, *pathsegments) - # NOTE: This is copied from pathlib.py but turned into an instance method so we get access to the correct flavour + # NOTE: This is copied from pathlib/_local.py but turned into an instance method so we get access to the correct flavour def _parse_path(self, path: str) -> tuple[str, str, list[str]]: if not path: return "", "", [] @@ -393,4 +393,7 @@ def group(self) -> str: class _GlobberTargetPath(TargetPath): def __str__(self) -> str: + # This is necessary because the _Globber class expects an added `/` at the end + # However, only PurePathBase properly adds that, PurePath doesn't + # We do want to operate on Path objects rather than strings, so do a little hack here return self._raw_path From f455fcc1b5dffebb189211255155001cbe4ee8a4 Mon Sep 17 00:00:00 2001 From: Schamper <1254028+Schamper@users.noreply.github.com> Date: Mon, 9 Dec 2024 10:44:45 +0100 Subject: [PATCH 07/12] Slightly change is_reserved test --- tests/helpers/test_fsutil.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/tests/helpers/test_fsutil.py b/tests/helpers/test_fsutil.py index 2e74edb92..3c68b97f3 100644 --- a/tests/helpers/test_fsutil.py +++ b/tests/helpers/test_fsutil.py @@ -331,11 +331,11 @@ def test_target_path_is_relative_to(path_fs: VirtualFilesystem) -> None: assert not path_fs.path("/some/dir/file.txt").is_relative_to("/some/other") +@pytest.mark.skipif(sys.version_info >= (3, 13), reason="deprecated on Python 3.13+") def test_target_path_is_reserved(path_fs: VirtualFilesystem) -> None: - if sys.version_info < (3, 13): - # We currently do not have any reserved names for TargetPath - assert not path_fs.path("CON").is_reserved() - assert not path_fs.path("foo").is_reserved() + # We currently do not have any reserved names for TargetPath + assert not path_fs.path("CON").is_reserved() + assert not path_fs.path("foo").is_reserved() def test_target_path_join(path_fs: VirtualFilesystem) -> None: @@ -759,7 +759,9 @@ def test_reverse_readlines() -> None: ] vfs.map_file_fh("file_multi_long_single", io.BytesIO((("🦊" * 8000) + ("a" * 200)).encode())) - assert list(fsutil.reverse_readlines(vfs.path("file_multi_long_single").open("rt"))) == [("🦊" * 8000) + ("a" * 200)] + assert list(fsutil.reverse_readlines(vfs.path("file_multi_long_single").open("rt"))) == [ + ("🦊" * 8000) + ("a" * 200) + ] vfs.map_file_fh("empty", io.BytesIO(b"")) assert list(fsutil.reverse_readlines(vfs.path("empty").open("rt"))) == [] From e86bd28a23c9a5364dd92e67eff5784e915d62c7 Mon Sep 17 00:00:00 2001 From: JSCU-CNI <121175071+JSCU-CNI@users.noreply.github.com> Date: Thu, 9 Jan 2025 13:54:18 +0100 Subject: [PATCH 08/12] fix linter --- tests/helpers/test_fsutil.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/tests/helpers/test_fsutil.py b/tests/helpers/test_fsutil.py index 3c68b97f3..df4aeaf2d 100644 --- a/tests/helpers/test_fsutil.py +++ b/tests/helpers/test_fsutil.py @@ -759,9 +759,7 @@ def test_reverse_readlines() -> None: ] vfs.map_file_fh("file_multi_long_single", io.BytesIO((("🦊" * 8000) + ("a" * 200)).encode())) - assert list(fsutil.reverse_readlines(vfs.path("file_multi_long_single").open("rt"))) == [ - ("🦊" * 8000) + ("a" * 200) - ] + assert list(fsutil.reverse_readlines(vfs.path("file_multi_long_single").open("rt"))) == [("🦊" * 8000) + ("a" * 200)] vfs.map_file_fh("empty", io.BytesIO(b"")) assert list(fsutil.reverse_readlines(vfs.path("empty").open("rt"))) == [] From 03555c2d6bef7587dfb92062ad2674fd4712537b Mon Sep 17 00:00:00 2001 From: Schamper <1254028+Schamper@users.noreply.github.com> Date: Thu, 9 Jan 2025 14:15:21 +0100 Subject: [PATCH 09/12] Bump flow.record dependency --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index d6c6cedc8..58b0b0c4d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -34,7 +34,7 @@ dependencies = [ "dissect.regf>=3.3,<4", "dissect.util>=3,<4", "dissect.volume>=2,<4", - "flow.record~=3.18.0", + "flow.record~=3.19.0", "structlog", ] dynamic = ["version"] From 3f5c65cd404cf2ec06be73eb38ac5981b8f9fbea Mon Sep 17 00:00:00 2001 From: Schamper <1254028+Schamper@users.noreply.github.com> Date: Thu, 9 Jan 2025 14:30:20 +0100 Subject: [PATCH 10/12] Add some more testing type hints --- tests/plugins/apps/vpn/test_wireguard.py | 2 +- tests/plugins/os/windows/test_recyclebin.py | 24 +++++++++++---------- tests/test_container.py | 7 +++--- tests/tools/test_diff.py | 18 ++++++++++------ 4 files changed, 29 insertions(+), 22 deletions(-) diff --git a/tests/plugins/apps/vpn/test_wireguard.py b/tests/plugins/apps/vpn/test_wireguard.py index 79dd6c02c..7d6d081ca 100644 --- a/tests/plugins/apps/vpn/test_wireguard.py +++ b/tests/plugins/apps/vpn/test_wireguard.py @@ -4,7 +4,7 @@ from tests._utils import absolute_path -def test_wireguard_plugin_global_log(target_unix_users: Target, fs_unix: VirtualFilesystem): +def test_wireguard_plugin_global_log(target_unix_users: Target, fs_unix: VirtualFilesystem) -> None: wireguard_config_file = absolute_path("_data/plugins/apps/vpn/wireguard/wg0.conf") fs_unix.map_file("/etc/wireguard/wg0.conf", wireguard_config_file) diff --git a/tests/plugins/os/windows/test_recyclebin.py b/tests/plugins/os/windows/test_recyclebin.py index e2b33a551..c6e5b5554 100644 --- a/tests/plugins/os/windows/test_recyclebin.py +++ b/tests/plugins/os/windows/test_recyclebin.py @@ -1,6 +1,7 @@ import io from pathlib import Path from tempfile import TemporaryDirectory +from typing import Iterator from unittest.mock import Mock, mock_open, patch import pytest @@ -8,10 +9,11 @@ from dissect.target.exceptions import UnsupportedPluginError from dissect.target.filesystem import VirtualFilesystem from dissect.target.plugins.os.windows.recyclebin import RecyclebinPlugin, c_recyclebin +from dissect.target.target import Target @pytest.fixture -def recycle_bin(): +def recycle_bin() -> Iterator[VirtualFilesystem]: recycle_bin = VirtualFilesystem() with TemporaryDirectory() as tmp_dir: @@ -19,17 +21,17 @@ def recycle_bin(): yield recycle_bin -def test_recycle_bin_compatibility_failed(target_win): +def test_recycle_bin_compatibility_failed(target_win: Target) -> None: with pytest.raises(UnsupportedPluginError): RecyclebinPlugin(target_win).check_compatible() -def test_recycle_bin_compat_succeeded(target_win, recycle_bin): +def test_recycle_bin_compat_succeeded(target_win: Target, recycle_bin: VirtualFilesystem) -> None: target_win.fs.mount("C:\\$recycle.bin", recycle_bin) assert RecyclebinPlugin(target_win).check_compatible() is None -def test_read_recycle_bin(target_win): +def test_read_recycle_bin(target_win: Target) -> None: mocked_file = Mock() mocked_file.is_file.return_value = True @@ -38,7 +40,7 @@ def test_read_recycle_bin(target_win): assert [mocked_bin_file.return_value] == list(RecyclebinPlugin(target_win).read_recycle_bin(mocked_file)) -def test_filtered_name(target_win): +def test_filtered_name(target_win: Target) -> None: mocked_file = Mock() mocked_file.is_file.return_value = True mocked_file.is_dir.return_value = False @@ -48,7 +50,7 @@ def test_filtered_name(target_win): assert [] == list(RecyclebinPlugin(target_win).read_recycle_bin(mocked_file)) -def test_read_recycle_bin_directory(target_win): +def test_read_recycle_bin_directory(target_win: Target) -> None: mocked_dir = Mock() mocked_dir.is_file.return_value = False mocked_dir.is_dir.return_value = True @@ -75,7 +77,7 @@ def test_read_recycle_bin_directory(target_win): (b"\x02" + b"\x00" * 7, "header_v2"), ], ) -def test_parse_header(target_win, version_number, expected_header): +def test_parse_header(target_win: Target, version_number: bytes, expected_header: str) -> None: recycle_plugin = RecyclebinPlugin(target_win) header = recycle_plugin.select_header(version_number) @@ -89,7 +91,7 @@ def test_parse_header(target_win, version_number, expected_header): "file/to/$recycle.bin/sid", ], ) -def test_read_bin_file_unknown(target_win, path): +def test_read_bin_file_unknown(target_win: Target, path: str) -> None: recycle_plugin = RecyclebinPlugin(target_win) header_1 = c_recyclebin.header_v1(version=0, file_size=0x20, timestamp=0x20, filename="hello_world" + "\x00" * 249) @@ -103,7 +105,7 @@ def test_read_bin_file_unknown(target_win, path): assert output.path == "hello_world" -def test_recyclebin_plugin_file(target_win, recycle_bin): +def test_recyclebin_plugin_file(target_win: Target, recycle_bin: VirtualFilesystem) -> None: recycle_bin.map_file_fh("$ihello_world", io.BytesIO(b"")) target_win.fs.mount("C:\\$recycle.bin", recycle_bin) @@ -114,7 +116,7 @@ def test_recyclebin_plugin_file(target_win, recycle_bin): assert recycle_bin_entries == [mocked_bin_file.return_value] -def test_recyclebin_plugin_wrong_prefix(target_win, recycle_bin): +def test_recyclebin_plugin_wrong_prefix(target_win: Target, recycle_bin: VirtualFilesystem) -> None: recycle_bin.map_file_fh("hello_world", io.BytesIO(b"")) target_win.fs.mount("C:\\$recycle.bin", recycle_bin) target_win.add_plugin(RecyclebinPlugin) @@ -133,7 +135,7 @@ def test_recyclebin_plugin_wrong_prefix(target_win, recycle_bin): ("C:/$Recycle.bin/just_another_file", "unknown"), ], ) -def test_find_sid_from_path(target_win, path, expected_output): +def test_find_sid_from_path(target_win: Target, path: str, expected_output: str) -> None: recycle_plugin = RecyclebinPlugin(target_win) assert recycle_plugin.find_sid(Path(path)) == expected_output diff --git a/tests/test_container.py b/tests/test_container.py index 22e3c9cf4..218d76324 100644 --- a/tests/test_container.py +++ b/tests/test_container.py @@ -1,6 +1,7 @@ import struct from io import BytesIO from pathlib import Path +from typing import Iterator from unittest.mock import Mock, patch import pytest @@ -11,7 +12,7 @@ @pytest.fixture -def mocked_ewf_detect(): +def mocked_ewf_detect() -> Iterator[Mock]: mocked_ewf = Mock() mocked_ewf.EwfContainer.detect.return_value = True mocked_ewf.EwfContainer.detect @@ -27,12 +28,12 @@ def mocked_ewf_detect(): ([Path("hello")], [Path("hello")]), ], ) -def test_open_inputs(mocked_ewf_detect: Mock, path, expected_output): +def test_open_inputs(mocked_ewf_detect: Mock, path: str | list[str] | Path, expected_output: Path | list[Path]) -> None: container.open(path) mocked_ewf_detect.assert_called_with(expected_output) -def test_open_fallback_fh(tmp_path): +def test_open_fallback_fh(tmp_path: Path) -> None: # Create a valid VHD file fake_vhd = ( (bytes(range(256)) * 2) diff --git a/tests/tools/test_diff.py b/tests/tools/test_diff.py index 26a65f191..75701ce5b 100644 --- a/tests/tools/test_diff.py +++ b/tests/tools/test_diff.py @@ -223,7 +223,9 @@ def test_differentiate_plugins(src_target: Target, dst_target: Target) -> None: assert deleted[0].record.hostname == "src_target" -def test_shell_ls(src_target: Target, dst_target: Target, capsys, monkeypatch) -> None: +def test_shell_ls( + src_target: Target, dst_target: Target, capsys: pytest.CaptureFixture, monkeypatch: pytest.MonkeyPatch +) -> None: monkeypatch.setattr(fsutils, "LS_COLORS", {}) cli = DifferentialCli(src_target, dst_target, deep=True) @@ -248,7 +250,9 @@ def test_shell_ls(src_target: Target, dst_target: Target, capsys, monkeypatch) - assert captured.out == "\n".join(expected) + "\n" -def test_shell_find(src_target: Target, dst_target: Target, capsys, monkeypatch) -> None: +def test_shell_find( + src_target: Target, dst_target: Target, capsys: pytest.CaptureFixture, monkeypatch: pytest.MonkeyPatch +) -> None: monkeypatch.setattr(fsutils, "LS_COLORS", {}) cli = DifferentialCli(src_target, dst_target, deep=True) @@ -275,7 +279,7 @@ def test_shell_find(src_target: Target, dst_target: Target, capsys, monkeypatch) assert captured.out == "\n".join(expected) + "\n" -def test_shell_cat(src_target: Target, dst_target: Target, capsys) -> None: +def test_shell_cat(src_target: Target, dst_target: Target, capsys: pytest.CaptureFixture) -> None: cli = DifferentialCli(src_target, dst_target, deep=True) cli.onecmd("cat /changes/unchanged") @@ -296,7 +300,7 @@ def test_shell_cat(src_target: Target, dst_target: Target, capsys) -> None: assert captured.out == "Hello From Destination Target\n" -def test_shell_plugin(src_target: Target, dst_target: Target, capsys) -> None: +def test_shell_plugin(src_target: Target, dst_target: Target, capsys: pytest.CaptureFixture) -> None: cli = DifferentialCli(src_target, dst_target, deep=True) cli.onecmd("plugin users") @@ -307,7 +311,7 @@ def test_shell_plugin(src_target: Target, dst_target: Target, capsys) -> None: assert "differential/record/deleted" in captured.out -def test_target_diff_shell(capsys, monkeypatch) -> None: +def test_target_diff_shell(capsys: pytest.CaptureFixture, monkeypatch: pytest.MonkeyPatch) -> None: with monkeypatch.context() as m: m.setattr(fsutils, "LS_COLORS", {}) m.setenv("NO_COLOR", "1") @@ -333,7 +337,7 @@ def test_target_diff_shell(capsys, monkeypatch) -> None: assert "unrecognized arguments" not in err -def test_target_diff_fs(capsys, monkeypatch) -> None: +def test_target_diff_fs(capsys: pytest.CaptureFixture, monkeypatch: pytest.MonkeyPatch) -> None: with monkeypatch.context() as m: src_target_path = absolute_path("_data/tools/diff/src.tar") dst_target_path = absolute_path("_data/tools/diff/dst.tar") @@ -346,7 +350,7 @@ def test_target_diff_fs(capsys, monkeypatch) -> None: assert "differential/file/deleted" in out -def test_target_diff_query(capsys, monkeypatch) -> None: +def test_target_diff_query(capsys: pytest.CaptureFixture, monkeypatch: pytest.MonkeyPatch) -> None: with monkeypatch.context() as m: src_target_path = absolute_path("_data/tools/diff/src.tar") dst_target_path = absolute_path("_data/tools/diff/dst.tar") From 2cb3c2637d5b7e3bea5db1cba6dbcaf744cf511a Mon Sep 17 00:00:00 2001 From: Erik Schamper <1254028+Schamper@users.noreply.github.com> Date: Fri, 10 Jan 2025 14:11:25 +0100 Subject: [PATCH 11/12] Apply suggestions from code review Co-authored-by: Stefan de Reuver <9864602+Horofic@users.noreply.github.com> --- dissect/target/helpers/compat/path_310.py | 2 +- dissect/target/helpers/compat/path_313.py | 3 ++- dissect/target/helpers/fsutil.py | 2 +- tests/test_container.py | 2 ++ 4 files changed, 6 insertions(+), 3 deletions(-) diff --git a/dissect/target/helpers/compat/path_310.py b/dissect/target/helpers/compat/path_310.py index 150c2ceba..da581cb1c 100644 --- a/dissect/target/helpers/compat/path_310.py +++ b/dissect/target/helpers/compat/path_310.py @@ -88,7 +88,7 @@ def open( return path_common.io_open(path, mode, buffering, encoding, errors, newline) @staticmethod - def listdir(path: TargetPath) -> Iterator[str]: + def listdir(path: TargetPath) -> list[str]: return path.get().listdir() @staticmethod diff --git a/dissect/target/helpers/compat/path_313.py b/dissect/target/helpers/compat/path_313.py index 28a39957a..7e704f8ed 100644 --- a/dissect/target/helpers/compat/path_313.py +++ b/dissect/target/helpers/compat/path_313.py @@ -115,7 +115,8 @@ def __init__(self, fs: Filesystem, *pathsegments): def with_segments(self, *pathsegments) -> TargetPath: return type(self)(self._fs, *pathsegments) - # NOTE: This is copied from pathlib/_local.py but turned into an instance method so we get access to the correct flavour + # NOTE: This is copied from pathlib/_local.py + # but turned into an instance method so we get access to the correct flavour def _parse_path(self, path: str) -> tuple[str, str, list[str]]: if not path: return "", "", [] diff --git a/dissect/target/helpers/fsutil.py b/dissect/target/helpers/fsutil.py index da29d40b3..3b614dcec 100644 --- a/dissect/target/helpers/fsutil.py +++ b/dissect/target/helpers/fsutil.py @@ -315,7 +315,7 @@ def glob_split(pattern: str, alt_separator: str = "") -> tuple[str, str]: Args: pattern: A glob pattern to match names of filesystem entries against. - alt_separator: An alternative | Nonepath separator in use by the filesystem being matched. + alt_separator: An alternative path separator in use by the filesystem being matched. Returns: A tuple of a string with path parts up to the first path part that has a glob pattern and a string of diff --git a/tests/test_container.py b/tests/test_container.py index 218d76324..4e659968e 100644 --- a/tests/test_container.py +++ b/tests/test_container.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import struct from io import BytesIO from pathlib import Path From 3fe62791a7d618750814e2eecca3f8d0f37a5b45 Mon Sep 17 00:00:00 2001 From: Schamper <1254028+Schamper@users.noreply.github.com> Date: Fri, 10 Jan 2025 15:36:20 +0100 Subject: [PATCH 12/12] Fix linting --- dissect/target/helpers/compat/path_313.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dissect/target/helpers/compat/path_313.py b/dissect/target/helpers/compat/path_313.py index 7e704f8ed..b974ce16c 100644 --- a/dissect/target/helpers/compat/path_313.py +++ b/dissect/target/helpers/compat/path_313.py @@ -115,7 +115,7 @@ def __init__(self, fs: Filesystem, *pathsegments): def with_segments(self, *pathsegments) -> TargetPath: return type(self)(self._fs, *pathsegments) - # NOTE: This is copied from pathlib/_local.py + # NOTE: This is copied from pathlib/_local.py # but turned into an instance method so we get access to the correct flavour def _parse_path(self, path: str) -> tuple[str, str, list[str]]: if not path: