diff --git a/dissect/target/container.py b/dissect/target/container.py index 90a5d60c2..da4428413 100644 --- a/dissect/target/container.py +++ b/dissect/target/container.py @@ -169,7 +169,7 @@ def close(self) -> None: Override this if you need to clean-up anything. """ - raise NotImplementedError() + pass def register(module: str, class_name: str, internal: bool = True): diff --git a/dissect/target/exceptions.py b/dissect/target/exceptions.py index 77dc540b8..b0cae7476 100644 --- a/dissect/target/exceptions.py +++ b/dissect/target/exceptions.py @@ -1,5 +1,6 @@ from __future__ import annotations +import errno import os import sys import traceback @@ -76,22 +77,32 @@ class PluginNotFoundError(PluginError): class FileNotFoundError(FilesystemError, FileNotFoundError): """The requested path could not be found.""" + errno = errno.ENOENT + class IsADirectoryError(FilesystemError, IsADirectoryError): """The entry is a directory.""" + errno = errno.EISDIR + class NotADirectoryError(FilesystemError, NotADirectoryError): """The entry is not a directory.""" + errno = errno.ENOTDIR + class NotASymlinkError(FilesystemError): """The entry is not a symlink.""" + errno = errno.EINVAL + class SymlinkRecursionError(FilesystemError): """A symlink loop is detected for the entry.""" + errno = errno.ELOOP + class RegistryError(Error): """A registry error occurred.""" diff --git a/dissect/target/helpers/compat/path_310.py b/dissect/target/helpers/compat/path_310.py index bee647c31..da581cb1c 100644 --- a/dissect/target/helpers/compat/path_310.py +++ b/dissect/target/helpers/compat/path_310.py @@ -20,23 +20,15 @@ import fnmatch import re from pathlib import Path, PurePath, _Accessor, _PosixFlavour -from stat import S_ISBLK, S_ISCHR, S_ISFIFO, S_ISSOCK -from typing import IO, TYPE_CHECKING, Any, Callable, Iterator, Optional +from typing import IO, TYPE_CHECKING, Any, Callable, Iterator from dissect.target import filesystem from dissect.target.exceptions import FilesystemError, SymlinkRecursionError -from dissect.target.helpers.compat.path_common import ( - _DissectPathParents, - io_open, - isjunction, - realpath, - scandir, -) -from dissect.target.helpers.polypath import normalize +from dissect.target.helpers import polypath +from dissect.target.helpers.compat import path_common if TYPE_CHECKING: from dissect.target.filesystem import Filesystem, FilesystemEntry - from dissect.target.helpers.compat.path_common import _DissectScandirIterator from dissect.target.helpers.fsutil import stat_result @@ -82,9 +74,9 @@ def open( path: TargetPath, mode: str = "rb", buffering: int = 0, - encoding: Optional[str] = None, - errors: Optional[str] = None, - newline: Optional[str] = None, + encoding: str | None = None, + errors: str | None = None, + newline: str | None = None, ) -> IO: """Open file and return a stream. @@ -93,15 +85,15 @@ def open( Note: in contrast to regular Python, the mode is binary by default. Text mode has to be explicitly specified. Buffering is also disabled by default. """ - return io_open(path, mode, buffering, encoding, errors, newline) + return path_common.io_open(path, mode, buffering, encoding, errors, newline) @staticmethod - def listdir(path: TargetPath) -> Iterator[str]: + def listdir(path: TargetPath) -> list[str]: return path.get().listdir() @staticmethod - def scandir(path: TargetPath) -> _DissectScandirIterator: - return scandir(path) + def scandir(path: TargetPath) -> path_common._DissectScandirIterator: + return path_common.scandir(path) @staticmethod def chmod(path: TargetPath, mode: int, *, follow_symlinks: bool = True) -> None: @@ -163,10 +155,10 @@ def getcwd() -> str: def expanduser(path: str) -> str: raise NotImplementedError("TargetPath.expanduser() is unsupported") - realpath = staticmethod(realpath) + realpath = staticmethod(path_common.realpath) # NOTE: Forward compatibility with CPython >= 3.12 - isjunction = staticmethod(isjunction) + isjunction = staticmethod(path_common.isjunction) _dissect_accessor = _DissectAccessor() @@ -193,7 +185,7 @@ def _from_parts(cls, args: list) -> TargetPath: path_args = [] for arg in args[1:]: if isinstance(arg, str): - arg = normalize(arg, alt_separator=alt_separator) + arg = polypath.normalize(arg, alt_separator=alt_separator) path_args.append(arg) self = super()._from_parts(path_args) @@ -247,8 +239,8 @@ def parent(self) -> TargetPath: return result @property - def parents(self) -> _DissectPathParents: - return _DissectPathParents(self) + def parents(self) -> path_common._DissectPathParents: + return path_common._DissectPathParents(self) class TargetPath(Path, PureDissectPath): @@ -382,9 +374,9 @@ def open( self, mode: str = "rb", buffering: int = 0, - encoding: Optional[str] = None, - errors: Optional[str] = None, - newline: Optional[str] = None, + encoding: str | None = None, + errors: str | None = None, + newline: str | None = None, ) -> IO: """Open file and return a stream. @@ -402,7 +394,7 @@ def write_bytes(self, data: bytes) -> int: raise NotImplementedError("TargetPath.write_bytes() is unsupported") def write_text( - self, data: str, encoding: Optional[str] = None, errors: Optional[str] = None, newline: Optional[str] = None + self, data: str, encoding: str | None = None, errors: str | None = None, newline: str | None = None ) -> int: """ Open the file in text mode, write to it, and close the file. @@ -417,45 +409,6 @@ def readlink(self) -> TargetPath: obj = self._from_parts((self._fs, path)) return obj - def exists(self) -> bool: - """ - Whether this path exists. - """ - try: - # .exists() must resolve possible symlinks - self.get().stat() - return True - except (FilesystemError, ValueError): - return False - - def is_dir(self) -> bool: - """ - Whether this path is a directory. - """ - try: - return self.get().is_dir() - except (FilesystemError, ValueError): - return False - - def is_file(self) -> bool: - """ - Whether this path is a regular file (also True for symlinks pointing - to regular files). - """ - try: - return self.get().is_file() - except (FilesystemError, ValueError): - return False - - def is_symlink(self) -> bool: - """ - Whether this path is a symbolic link. - """ - try: - return self.get().is_symlink() - except (FilesystemError, ValueError): - return False - # NOTE: Forward compatibility with CPython >= 3.12 def is_junction(self) -> bool: """ @@ -463,42 +416,6 @@ def is_junction(self) -> bool: """ return self._accessor.isjunction(self) - def is_block_device(self) -> bool: - """ - Whether this path is a block device. - """ - try: - return S_ISBLK(self.stat().st_mode) - except (FilesystemError, ValueError): - return False - - def is_char_device(self) -> bool: - """ - Whether this path is a character device. - """ - try: - return S_ISCHR(self.stat().st_mode) - except (FilesystemError, ValueError): - return False - - def is_fifo(self) -> bool: - """ - Whether this path is a FIFO. - """ - try: - return S_ISFIFO(self.stat().st_mode) - except (FilesystemError, ValueError): - return False - - def is_socket(self) -> bool: - """ - Whether this path is a socket. - """ - try: - return S_ISSOCK(self.stat().st_mode) - except (FilesystemError, ValueError): - return False - def expanduser(self) -> TargetPath: """Return a new path with expanded ~ and ~user constructs (as returned by os.path.expanduser) diff --git a/dissect/target/helpers/compat/path_311.py b/dissect/target/helpers/compat/path_311.py index d090921c0..ffc4d539e 100644 --- a/dissect/target/helpers/compat/path_311.py +++ b/dissect/target/helpers/compat/path_311.py @@ -17,34 +17,21 @@ Notes: - CPython 3.11 ditched the _Accessor class, so we override the methods that should use it """ + from __future__ import annotations import fnmatch import re from pathlib import Path, PurePath, _PosixFlavour -from stat import S_ISBLK, S_ISCHR, S_ISFIFO, S_ISSOCK -from typing import IO, TYPE_CHECKING, Any, Callable, Iterator, Optional +from typing import IO, TYPE_CHECKING, Any, Callable, Iterator from dissect.target import filesystem -from dissect.target.exceptions import ( - FileNotFoundError, - FilesystemError, - NotADirectoryError, - NotASymlinkError, - SymlinkRecursionError, -) -from dissect.target.helpers.compat.path_common import ( - _DissectPathParents, - io_open, - isjunction, - realpath, - scandir, -) -from dissect.target.helpers.polypath import normalize +from dissect.target.exceptions import FilesystemError, SymlinkRecursionError +from dissect.target.helpers import polypath +from dissect.target.helpers.compat import path_common if TYPE_CHECKING: from dissect.target.filesystem import Filesystem, FilesystemEntry - from dissect.target.helpers.compat.path_common import _DissectScandirIterator from dissect.target.helpers.fsutil import stat_result @@ -101,7 +88,7 @@ def _from_parts(cls, args: list) -> TargetPath: path_args = [] for arg in args[1:]: if isinstance(arg, str): - arg = normalize(arg, alt_separator=alt_separator) + arg = polypath.normalize(arg, alt_separator=alt_separator) path_args.append(arg) self = super()._from_parts(path_args) @@ -155,8 +142,8 @@ def parent(self) -> TargetPath: return result @property - def parents(self) -> _DissectPathParents: - return _DissectPathParents(self) + def parents(self) -> path_common._DissectPathParents: + return path_common._DissectPathParents(self) class TargetPath(Path, PureDissectPath): @@ -193,7 +180,7 @@ def iterdir(self) -> Iterator[TargetPath]: """Iterate over the files in this directory. Does not yield any result for the special paths '.' and '..'. """ - for entry in scandir(self): + for entry in path_common.scandir(self): if entry.name in {".", ".."}: # Yielding a path object for these makes little sense continue @@ -201,8 +188,8 @@ def iterdir(self) -> Iterator[TargetPath]: child_path._entry = entry yield child_path - def _scandir(self) -> _DissectScandirIterator: - return scandir(self) + def _scandir(self) -> path_common._DissectScandirIterator: + return path_common.scandir(self) # NOTE: Forward compatibility with CPython >= 3.12 def walk( @@ -267,7 +254,7 @@ def resolve(self, strict: bool = False) -> TargetPath: normalizing it. """ - s = realpath(self, strict=strict) + s = path_common.realpath(self, strict=strict) p = self._from_parts((self._fs, s)) # In non-strict mode, realpath() doesn't raise on symlink loops. @@ -306,9 +293,9 @@ def open( self, mode: str = "rb", buffering: int = 0, - encoding: Optional[str] = None, - errors: Optional[str] = None, - newline: Optional[str] = None, + encoding: str | None = None, + errors: str | None = None, + newline: str | None = None, ) -> IO: """Open file and return a stream. @@ -317,7 +304,7 @@ def open( Note: in contrast to regular Python, the mode is binary by default. Text mode has to be explicitly specified. Buffering is also disabled by default. """ - return io_open(self, mode, buffering, encoding, errors, newline) + return path_common.io_open(self, mode, buffering, encoding, errors, newline) def write_bytes(self, data: bytes) -> int: """ @@ -326,7 +313,7 @@ def write_bytes(self, data: bytes) -> int: raise NotImplementedError("TargetPath.write_bytes() is unsupported") def write_text( - self, data: str, encoding: Optional[str] = None, errors: Optional[str] = None, newline: Optional[str] = None + self, data: str, encoding: str | None = None, errors: str | None = None, newline: str | None = None ) -> int: """ Open the file in text mode, write to it, and close the file. @@ -430,36 +417,6 @@ def link_to(self, target: str) -> None: """ raise NotImplementedError("TargetPath.link_to() is unsupported") - def exists(self) -> bool: - """ - Whether this path exists. - """ - try: - # .exists() must resolve possible symlinks - self.get().stat() - return True - except (FileNotFoundError, NotADirectoryError, NotASymlinkError, SymlinkRecursionError, ValueError): - return False - - def is_dir(self) -> bool: - """ - Whether this path is a directory. - """ - try: - return self.get().is_dir() - except (FilesystemError, ValueError): - return False - - def is_file(self) -> bool: - """ - Whether this path is a regular file (also True for symlinks pointing - to regular files). - """ - try: - return self.get().is_file() - except (FilesystemError, ValueError): - return False - def is_mount(self) -> bool: """ Check if this path is a POSIX mount point @@ -480,57 +437,12 @@ def is_mount(self) -> bool: parent_ino = self.parent.stat().st_ino return ino == parent_ino - def is_symlink(self) -> bool: - """ - Whether this path is a symbolic link. - """ - try: - return self.get().is_symlink() - except (FilesystemError, ValueError): - return False - # NOTE: Forward compatibility with CPython >= 3.12 def is_junction(self) -> bool: """ Whether this path is a junction. """ - return isjunction(self) - - def is_block_device(self) -> bool: - """ - Whether this path is a block device. - """ - try: - return S_ISBLK(self.stat().st_mode) - except (FilesystemError, ValueError): - return False - - def is_char_device(self) -> bool: - """ - Whether this path is a character device. - """ - try: - return S_ISCHR(self.stat().st_mode) - except (FilesystemError, ValueError): - return False - - def is_fifo(self) -> bool: - """ - Whether this path is a FIFO. - """ - try: - return S_ISFIFO(self.stat().st_mode) - except (FilesystemError, ValueError): - return False - - def is_socket(self) -> bool: - """ - Whether this path is a socket. - """ - try: - return S_ISSOCK(self.stat().st_mode) - except (FilesystemError, ValueError): - return False + return path_common.isjunction(self) def expanduser(self) -> TargetPath: """Return a new path with expanded ~ and ~user constructs diff --git a/dissect/target/helpers/compat/path_312.py b/dissect/target/helpers/compat/path_312.py index 374617901..043322f7a 100644 --- a/dissect/target/helpers/compat/path_312.py +++ b/dissect/target/helpers/compat/path_312.py @@ -12,7 +12,7 @@ The implementation is split up in multiple files, one for each CPython version. You're currently looking at the CPython 3.12 implementation. -Commit hash we're in sync with: +Commit hash we're in sync with: f49221a Notes: - CPython 3.12 changed a lot in preparation of proper subclassing, so our patches differ @@ -25,22 +25,15 @@ import posixpath import sys from pathlib import Path, PurePath -from stat import S_ISBLK, S_ISCHR, S_ISFIFO, S_ISSOCK -from typing import IO, TYPE_CHECKING, Iterator, Optional +from typing import IO, TYPE_CHECKING, Iterator from dissect.target import filesystem from dissect.target.exceptions import FilesystemError, SymlinkRecursionError from dissect.target.helpers import polypath -from dissect.target.helpers.compat.path_common import ( - io_open, - isjunction, - realpath, - scandir, -) +from dissect.target.helpers.compat import path_common if TYPE_CHECKING: from dissect.target.filesystem import Filesystem, FilesystemEntry - from dissect.target.helpers.compat.path_common import _DissectScandirIterator from dissect.target.helpers.fsutil import stat_result @@ -69,7 +62,7 @@ def normcase(self, s: str) -> str: splitdrive = staticmethod(posixpath.splitdrive) - def splitroot(self, part: str) -> tuple[str, str]: + def splitroot(self, part: str) -> tuple[str, str, str]: return polypath.splitroot(part, alt_separator=self.altsep) def join(self, *args) -> str: @@ -93,14 +86,14 @@ def ismount(self, path: TargetPath) -> bool: parent_ino = path.parent.stat().st_ino return ino == parent_ino - isjunction = staticmethod(isjunction) + isjunction = staticmethod(path_common.isjunction) samestat = staticmethod(posixpath.samestat) def isabs(self, path: str) -> bool: return polypath.isabs(path, alt_separator=self.altsep) - realpath = staticmethod(realpath) + realpath = staticmethod(path_common.realpath) class PureDissectPath(PurePath): @@ -114,7 +107,7 @@ def __init__(self, fs: Filesystem, *pathsegments): if not isinstance(fs, filesystem.Filesystem): raise TypeError( "invalid PureDissectPath initialization: missing filesystem, " - "got %r (this might be a bug, please report)" % pathsegments + "got %r (this might be a bug, please report)" % (fs, *pathsegments) ) alt_separator = fs.alt_separator @@ -177,91 +170,13 @@ def stat(self, *, follow_symlinks: bool = True) -> stat_result: else: return self.get().lstat() - def exists(self, *, follow_symlinks: bool = True) -> bool: - """ - Whether this path exists. - - This method normally follows symlinks; to check whether a symlink exists, - add the argument follow_symlinks=False. - """ - try: - # .exists() must resolve possible symlinks - self.stat(follow_symlinks=follow_symlinks) - return True - except (FilesystemError, ValueError): - return False - - def is_dir(self) -> bool: - """ - Whether this path is a directory. - """ - try: - return self.get().is_dir() - except (FilesystemError, ValueError): - return False - - def is_file(self) -> bool: - """ - Whether this path is a regular file (also True for symlinks pointing - to regular files). - """ - try: - return self.get().is_file() - except (FilesystemError, ValueError): - return False - - def is_symlink(self) -> bool: - """ - Whether this path is a symbolic link. - """ - try: - return self.get().is_symlink() - except (FilesystemError, ValueError): - return False - - def is_block_device(self) -> bool: - """ - Whether this path is a block device. - """ - try: - return S_ISBLK(self.stat().st_mode) - except (FilesystemError, ValueError): - return False - - def is_char_device(self) -> bool: - """ - Whether this path is a character device. - """ - try: - return S_ISCHR(self.stat().st_mode) - except (FilesystemError, ValueError): - return False - - def is_fifo(self) -> bool: - """ - Whether this path is a FIFO. - """ - try: - return S_ISFIFO(self.stat().st_mode) - except (FilesystemError, ValueError): - return False - - def is_socket(self) -> bool: - """ - Whether this path is a socket. - """ - try: - return S_ISSOCK(self.stat().st_mode) - except (FilesystemError, ValueError): - return False - def open( self, mode: str = "rb", buffering: int = 0, - encoding: Optional[str] = None, - errors: Optional[str] = None, - newline: Optional[str] = None, + encoding: str | None = None, + errors: str | None = None, + newline: str | None = None, ) -> IO: """Open file and return a stream. @@ -270,7 +185,7 @@ def open( Note: in contrast to regular Python, the mode is binary by default. Text mode has to be explicitly specified. Buffering is also disabled by default. """ - return io_open(self, mode, buffering, encoding, errors, newline) + return path_common.io_open(self, mode, buffering, encoding, errors, newline) def write_bytes(self, data: bytes) -> int: """ @@ -279,7 +194,7 @@ def write_bytes(self, data: bytes) -> int: raise NotImplementedError("TargetPath.write_bytes() is unsupported") def write_text( - self, data: str, encoding: Optional[str] = None, errors: Optional[str] = None, newline: Optional[str] = None + self, data: str, encoding: str | None = None, errors: str | None = None, newline: str | None = None ) -> int: """ Open the file in text mode, write to it, and close the file. @@ -290,7 +205,7 @@ def iterdir(self) -> Iterator[TargetPath]: """Iterate over the files in this directory. Does not yield any result for the special paths '.' and '..'. """ - for entry in scandir(self): + for entry in path_common.scandir(self): if entry.name in {".", ".."}: # Yielding a path object for these makes little sense continue @@ -298,8 +213,8 @@ def iterdir(self) -> Iterator[TargetPath]: child_path._entry = entry yield child_path - def _scandir(self) -> _DissectScandirIterator: - return scandir(self) + def _scandir(self) -> path_common._DissectScandirIterator: + return path_common.scandir(self) @classmethod def cwd(cls) -> TargetPath: diff --git a/dissect/target/helpers/compat/path_313.py b/dissect/target/helpers/compat/path_313.py new file mode 100644 index 000000000..b974ce16c --- /dev/null +++ b/dissect/target/helpers/compat/path_313.py @@ -0,0 +1,400 @@ +"""A pathlib.Path compatible implementation for dissect.target. + +This allows for the majority of the pathlib.Path API to "just work" on dissect.target filesystems. + +Most of this consists of subclassed internal classes with dissect.target specific patches, +but sometimes the change to a function is small, so the entire internal function is copied +and only a small part changed. To ease updating this code, the order of functions, comments +and code style is kept largely the same as the original pathlib.py. + +Yes, we know, this is playing with fire and it can break on new CPython releases. + +The implementation is split up in multiple files, one for each CPython version. +You're currently looking at the CPython 3.13 implementation. + +Commit hash we're in sync with: 094d95f + +Notes: + - https://docs.python.org/3.13/whatsnew/3.13.html#pathlib + - https://github.com/python/cpython/blob/3.13/Lib/pathlib/_local.py +""" + +from __future__ import annotations + +import posixpath +import sys +from glob import _Globber +from pathlib import Path, PurePath +from pathlib._abc import PathBase, UnsupportedOperation +from typing import IO, TYPE_CHECKING, Callable, Iterator + +from dissect.target import filesystem +from dissect.target.exceptions import FilesystemError, SymlinkRecursionError +from dissect.target.helpers import polypath +from dissect.target.helpers.compat import path_common + +if TYPE_CHECKING: + from dissect.target.filesystem import Filesystem, FilesystemEntry + from dissect.target.helpers.fsutil import stat_result + + +class _DissectParser: + sep = "/" + altsep = "" + case_sensitive = False + + __variant_instances = {} + + def __new__(cls, case_sensitive: bool = False, alt_separator: str = ""): + idx = (case_sensitive, alt_separator) + instance = cls.__variant_instances.get(idx, None) + if instance is None: + instance = object.__new__(cls) + cls.__variant_instances[idx] = instance + + return instance + + def __init__(self, case_sensitive: bool = False, alt_separator: str = ""): + self.altsep = alt_separator + self.case_sensitive = case_sensitive + + def normcase(self, s: str) -> str: + return s if self.case_sensitive else s.lower() + + def split(self, part: str) -> tuple[str, str]: + return polypath.split(part, alt_separator=self.altsep) + + splitdrive = staticmethod(posixpath.splitdrive) + + def splitroot(self, part: str) -> tuple[str, str, str]: + return polypath.splitroot(part, alt_separator=self.altsep) + + def join(self, *args) -> str: + return polypath.join(*args, alt_separator=self.altsep) + + isjunction = staticmethod(path_common.isjunction) + + def isabs(self, path: str) -> bool: + return polypath.isabs(str(path), alt_separator=self.altsep) + + realpath = staticmethod(path_common.realpath) + + +class _DissectGlobber(_Globber): + @staticmethod + def add_slash(path: TargetPath) -> TargetPath: + return _GlobberTargetPath(path._fs, path, "") + + +class PureDissectPath(PurePath): + _fs: Filesystem + parser: _DissectParser = _DissectParser(case_sensitive=False) + _globber = _DissectGlobber + + def __reduce__(self) -> tuple: + raise TypeError("TargetPath pickling is currently not supported") + + def __init__(self, fs: Filesystem, *pathsegments): + if not isinstance(fs, filesystem.Filesystem): + raise TypeError( + "invalid PureDissectPath initialization: missing filesystem, " + "got %r (this might be a bug, please report)" % (fs, *pathsegments) + ) + + alt_separator = fs.alt_separator + path_args = [] + for arg in pathsegments: + if isinstance(arg, str): + arg = polypath.normalize(arg, alt_separator=alt_separator) + path_args.append(arg) + + super().__init__(*path_args) + self._fs = fs + self.parser = _DissectParser(alt_separator=fs.alt_separator, case_sensitive=fs.case_sensitive) + + def with_segments(self, *pathsegments) -> TargetPath: + return type(self)(self._fs, *pathsegments) + + # NOTE: This is copied from pathlib/_local.py + # but turned into an instance method so we get access to the correct flavour + def _parse_path(self, path: str) -> tuple[str, str, list[str]]: + if not path: + return "", "", [] + sep = self.parser.sep + altsep = self.parser.altsep + if altsep: + path = path.replace(altsep, sep) + drv, root, rel = self.parser.splitroot(path) + if not root and drv.startswith(sep) and not drv.endswith(sep): + drv_parts = drv.split(sep) + if len(drv_parts) == 4 and drv_parts[2] not in "?.": + # e.g. //server/share + root = sep + elif len(drv_parts) == 6: + # e.g. //?/unc/server/share + root = sep + parsed = [sys.intern(str(x)) for x in rel.split(sep) if x and x != "."] + return drv, root, parsed + + +class TargetPath(Path, PureDissectPath): + __slots__ = ("_entry",) + + @classmethod + def _unsupported_msg(cls, attribute: str) -> str: + return f"{cls.__name__}.{attribute} is unsupported" + + def get(self) -> FilesystemEntry: + try: + return self._entry + except AttributeError: + self._entry = self._fs.get(str(self)) + return self._entry + + def stat(self, *, follow_symlinks: bool = True) -> stat_result: + """ + Return the result of the stat() system call on this path, like + os.stat() does. + """ + if follow_symlinks: + return self.get().stat() + else: + return self.get().lstat() + + def exists(self, *, follow_symlinks: bool = True) -> bool: + """ + Whether this path exists. + + This method normally follows symlinks; to check whether a symlink exists, + add the argument follow_symlinks=False. + """ + try: + # .exists() must resolve possible symlinks + self.stat(follow_symlinks=follow_symlinks) + return True + except (FilesystemError, ValueError): + return False + + is_mount = PathBase.is_mount + + def is_junction(self) -> bool: + """ + Whether this path is a junction. + """ + return self.parser.isjunction(self) + + def open( + self, + mode: str = "rb", + buffering: int = 0, + encoding: str | None = None, + errors: str | None = None, + newline: str | None = None, + ) -> IO: + """Open file and return a stream. + + Supports a subset of features of the real pathlib.open/io.open. + + Note: in contrast to regular Python, the mode is binary by default. Text mode + has to be explicitly specified. Buffering is also disabled by default. + """ + return path_common.io_open(self, mode, buffering, encoding, errors, newline) + + def write_bytes(self, data: bytes) -> int: + """ + Open the file in bytes mode, write to it, and close the file. + """ + raise UnsupportedOperation(self._unsupported_msg("write_bytes()")) + + def write_text( + self, data: str, encoding: str | None = None, errors: str | None = None, newline: str | None = None + ) -> int: + """ + Open the file in text mode, write to it, and close the file. + """ + raise UnsupportedOperation(self._unsupported_msg("write_text()")) + + def iterdir(self) -> Iterator[TargetPath]: + """Yield path objects of the directory contents. + + The children are yielded in arbitrary order, and the + special entries '.' and '..' are not included. + """ + with path_common.scandir(self) as scandir_it: + for entry in scandir_it: + name = entry.name + child_path = self.joinpath(name) + child_path._entry = entry + yield child_path + + def glob( + self, pattern: str, *, case_sensitive: bool | None = None, recurse_symlinks: bool = False + ) -> Iterator[TargetPath]: + """Iterate over this subtree and yield all existing files (of any + kind, including directories) matching the given relative pattern. + """ + return PathBase.glob(self, pattern, case_sensitive=case_sensitive, recurse_symlinks=recurse_symlinks) + + def rglob( + self, pattern: str, *, case_sensitive: bool | None = None, recurse_symlinks: str = False + ) -> Iterator[TargetPath]: + """Recursively yield all existing files (of any kind, including + directories) matching the given relative pattern, anywhere in + this subtree. + """ + return PathBase.rglob(self, pattern, case_sensitive=case_sensitive, recurse_symlinks=recurse_symlinks) + + def walk( + self, top_down: bool = True, on_error: Callable[[Exception], None] = None, follow_symlinks: bool = False + ) -> Iterator[tuple[TargetPath, list[str], list[str]]]: + """Walk the directory tree from this directory, similar to os.walk().""" + return PathBase.walk(self, top_down=top_down, on_error=on_error, follow_symlinks=follow_symlinks) + + def absolute(self) -> TargetPath: + """Return an absolute version of this path + No normalization or symlink resolution is performed. + + Use resolve() to resolve symlinks and remove '..' segments. + """ + raise UnsupportedOperation(self._unsupported_msg("absolute()")) + + @classmethod + def cwd(cls) -> TargetPath: + """Return a new path pointing to the current working directory.""" + raise UnsupportedOperation(cls._unsupported_msg("cwd()")) + + def expanduser(self) -> TargetPath: + """Return a new path with expanded ~ and ~user constructs + (as returned by os.path.expanduser) + """ + raise UnsupportedOperation(self._unsupported_msg("expanduser()")) + + @classmethod + def home(cls) -> TargetPath: + """Return a new path pointing to the user's home directory (as + returned by os.path.expanduser('~')). + """ + raise UnsupportedOperation(cls._unsupported_msg("home()")) + + def readlink(self) -> TargetPath: + """ + Return the path to which the symbolic link points. + """ + return self.with_segments(self.get().readlink()) + + # NOTE: We changed some of the error handling here to deal with our own exception types + def resolve(self, strict: bool = False) -> TargetPath: + """ + Make the path absolute, resolving all symlinks on the way and also + normalizing it. + """ + + s = self.parser.realpath(self, strict=strict) + p = self.with_segments(s) + + # In non-strict mode, realpath() doesn't raise on symlink loops. + # Ensure we get an exception by calling stat() + if not strict: + try: + p.stat() + except FilesystemError as e: + if isinstance(e, SymlinkRecursionError): + raise + return p + + def symlink_to(self, target: str, target_is_directory: bool = False) -> None: + """ + Make this path a symlink pointing to the target path. + Note the order of arguments (link, target) is the reverse of os.symlink. + """ + raise UnsupportedOperation(self._unsupported_msg("symlink_to()")) + + def hardlink_to(self, target: str) -> None: + """ + Make this path a hard link pointing to the same file as *target*. + + Note the order of arguments (self, target) is the reverse of os.link's. + """ + raise UnsupportedOperation(self._unsupported_msg("hardlink_to()")) + + def touch(self, mode: int = 0o666, exist_ok: bool = True) -> None: + """ + Create this file with the given access mode, if it doesn't exist. + """ + raise UnsupportedOperation(self._unsupported_msg("touch()")) + + def mkdir(self, mode: int = 0o777, parents: bool = False, exist_ok: bool = False) -> None: + """ + Create a new directory at this given path. + """ + raise UnsupportedOperation(self._unsupported_msg("mkdir()")) + + def rename(self, target: str) -> TargetPath: + """ + Rename this path to the target path. + + The target path may be absolute or relative. Relative paths are + interpreted relative to the current working directory, *not* the + directory of the Path object. + + Returns the new Path instance pointing to the target path. + """ + raise UnsupportedOperation(self._unsupported_msg("rename()")) + + def replace(self, target: str) -> TargetPath: + """ + Rename this path to the target path, overwriting if that path exists. + + The target path may be absolute or relative. Relative paths are + interpreted relative to the current working directory, *not* the + directory of the Path object. + + Returns the new Path instance pointing to the target path. + """ + raise UnsupportedOperation(self._unsupported_msg("replace()")) + + def chmod(self, mode: int, *, follow_symlinks: bool = True) -> None: + """ + Change the permissions of the path, like os.chmod(). + """ + raise UnsupportedOperation(self._unsupported_msg("chmod()")) + + def lchmod(self, mode: int) -> None: + """ + Like chmod(), except if the path points to a symlink, the symlink's + permissions are changed, rather than its target's. + """ + raise UnsupportedOperation(self._unsupported_msg("lchmod()")) + + def unlink(self, missing_ok: bool = False) -> None: + """ + Remove this file or link. + If the path is a directory, use rmdir() instead. + """ + raise UnsupportedOperation(self._unsupported_msg("unlink()")) + + def rmdir(self) -> None: + """ + Remove this directory. The directory must be empty. + """ + raise UnsupportedOperation(self._unsupported_msg("rmdir()")) + + def owner(self) -> str: + """ + Return the login name of the file owner. + """ + raise UnsupportedOperation(self._unsupported_msg("owner()")) + + def group(self) -> str: + """ + Return the group name of the file gid. + """ + raise UnsupportedOperation(self._unsupported_msg("group()")) + + +class _GlobberTargetPath(TargetPath): + def __str__(self) -> str: + # This is necessary because the _Globber class expects an added `/` at the end + # However, only PurePathBase properly adds that, PurePath doesn't + # We do want to operate on Path objects rather than strings, so do a little hack here + return self._raw_path diff --git a/dissect/target/helpers/compat/path_39.py b/dissect/target/helpers/compat/path_39.py index 70566ef90..2af3189b0 100644 --- a/dissect/target/helpers/compat/path_39.py +++ b/dissect/target/helpers/compat/path_39.py @@ -20,8 +20,7 @@ import fnmatch import re from pathlib import Path, PurePath, _Accessor, _PosixFlavour -from stat import S_ISBLK, S_ISCHR, S_ISFIFO, S_ISSOCK -from typing import IO, TYPE_CHECKING, Any, BinaryIO, Callable, Iterator, Optional +from typing import IO, TYPE_CHECKING, Any, BinaryIO, Callable, Iterator from dissect.target import filesystem from dissect.target.exceptions import ( @@ -29,13 +28,8 @@ NotASymlinkError, SymlinkRecursionError, ) -from dissect.target.helpers.compat.path_common import ( - _DissectPathParents, - io_open, - isjunction, - scandir, -) -from dissect.target.helpers.polypath import normalize, normpath +from dissect.target.helpers import polypath +from dissect.target.helpers.compat import path_common if TYPE_CHECKING: from dissect.target.filesystem import Filesystem, FilesystemEntry @@ -147,7 +141,7 @@ def listdir(path: TargetPath) -> list[str]: @staticmethod def scandir(path: TargetPath) -> _DissectScandirIterator: - return scandir(path) + return path_common.scandir(path) @staticmethod def chmod(path: TargetPath, mode: int, *, follow_symlinks: bool = True) -> None: @@ -204,7 +198,7 @@ def group(path: TargetPath) -> str: raise NotImplementedError("TargetPath.group() is unsupported") # NOTE: Forward compatibility with CPython >= 3.12 - isjunction = staticmethod(isjunction) + isjunction = staticmethod(path_common.isjunction) _dissect_accessor = _DissectAccessor() @@ -231,7 +225,7 @@ def _from_parts(cls, args: list, init: bool = True) -> TargetPath: path_args = [] for arg in args[1:]: if isinstance(arg, str): - arg = normalize(arg, alt_separator=alt_separator) + arg = polypath.normalize(arg, alt_separator=alt_separator) path_args.append(arg) self = super()._from_parts(path_args, init=init) @@ -285,14 +279,14 @@ def parent(self) -> TargetPath: return result @property - def parents(self) -> _DissectPathParents: - return _DissectPathParents(self) + def parents(self) -> path_common._DissectPathParents: + return path_common._DissectPathParents(self) class TargetPath(Path, PureDissectPath): __slots__ = ("_entry",) - def _init(self, template: Optional[Path] = None) -> None: + def _init(self, template: Path | None = None) -> None: self._accessor = _dissect_accessor def _make_child_relpath(self, part: str) -> TargetPath: @@ -403,7 +397,7 @@ def resolve(self, strict: bool = False) -> TargetPath: self.stat() s = str(self.absolute()) # Now we have no symlinks in the path, it's safe to normalize it. - normed = normpath(s, self._flavour.altsep) + normed = polypath.normpath(s, self._flavour.altsep) obj = self._from_parts((self._fs, normed), init=False) obj._init(template=self) return obj @@ -421,9 +415,9 @@ def open( self, mode: str = "rb", buffering: int = 0, - encoding: Optional[str] = None, - errors: Optional[str] = None, - newline: Optional[str] = None, + encoding: str | None = None, + errors: str | None = None, + newline: str | None = None, ) -> IO: """Open file and return a stream. @@ -432,7 +426,7 @@ def open( Note: in contrast to regular Python, the mode is binary by default. Text mode has to be explicitly specified. Buffering is also disabled by default. """ - return io_open(self, mode, buffering, encoding, errors, newline) + return path_common.io_open(self, mode, buffering, encoding, errors, newline) def write_bytes(self, data: bytes) -> int: """ @@ -441,7 +435,7 @@ def write_bytes(self, data: bytes) -> int: raise NotImplementedError("TargetPath.write_bytes() is unsupported") def write_text( - self, data: str, encoding: Optional[str] = None, errors: Optional[str] = None, newline: Optional[str] = None + self, data: str, encoding: str | None = None, errors: str | None = None, newline: str | None = None ) -> int: """ Open the file in text mode, write to it, and close the file. @@ -456,45 +450,6 @@ def readlink(self) -> TargetPath: obj = self._from_parts((self._fs, path), init=False) return obj - def exists(self) -> bool: - """ - Whether this path exists. - """ - try: - # .exists() must resolve possible symlinks - self.get().stat() - return True - except (FilesystemError, ValueError): - return False - - def is_dir(self) -> bool: - """ - Whether this path is a directory. - """ - try: - return self.get().is_dir() - except (FilesystemError, ValueError): - return False - - def is_file(self) -> bool: - """ - Whether this path is a regular file (also True for symlinks pointing - to regular files). - """ - try: - return self.get().is_file() - except (FilesystemError, ValueError): - return False - - def is_symlink(self) -> bool: - """ - Whether this path is a symbolic link. - """ - try: - return self.get().is_symlink() - except (FilesystemError, ValueError): - return False - # NOTE: Forward compatibility with CPython >= 3.12 def is_junction(self) -> bool: """ @@ -502,42 +457,6 @@ def is_junction(self) -> bool: """ return self._accessor.isjunction(self) - def is_block_device(self) -> bool: - """ - Whether this path is a block device. - """ - try: - return S_ISBLK(self.stat().st_mode) - except (FilesystemError, ValueError): - return False - - def is_char_device(self) -> bool: - """ - Whether this path is a character device. - """ - try: - return S_ISCHR(self.stat().st_mode) - except (FilesystemError, ValueError): - return False - - def is_fifo(self) -> bool: - """ - Whether this path is a FIFO. - """ - try: - return S_ISFIFO(self.stat().st_mode) - except (FilesystemError, ValueError): - return False - - def is_socket(self) -> bool: - """ - Whether this path is a socket. - """ - try: - return S_ISSOCK(self.stat().st_mode) - except (FilesystemError, ValueError): - return False - def expanduser(self) -> TargetPath: """Return a new path with expanded ~ and ~user constructs (as returned by os.path.expanduser) diff --git a/dissect/target/helpers/compat/path_common.py b/dissect/target/helpers/compat/path_common.py index eda3962a7..d8bde46b6 100644 --- a/dissect/target/helpers/compat/path_common.py +++ b/dissect/target/helpers/compat/path_common.py @@ -4,11 +4,11 @@ import posixpath import stat import sys -from typing import IO, TYPE_CHECKING, Iterator, Literal, Optional +from typing import IO, TYPE_CHECKING, Iterator, Literal if TYPE_CHECKING: - from dissect.target.helpers.fsutil import TargetPath from dissect.target.filesystem import Filesystem, FilesystemEntry + from dissect.target.helpers.fsutil import TargetPath from dissect.target.exceptions import FilesystemError, SymlinkRecursionError from dissect.target.helpers.polypath import abspath, normalize @@ -173,9 +173,9 @@ def io_open( path: TargetPath, mode: str = "rb", buffering: int = 0, - encoding: Optional[str] = None, - errors: Optional[str] = None, - newline: Optional[str] = None, + encoding: str | None = None, + errors: str | None = None, + newline: str | None = None, ) -> IO: """Open file and return a stream. diff --git a/dissect/target/helpers/config.py b/dissect/target/helpers/config.py index 7a1cc3faa..e12c1eff7 100644 --- a/dissect/target/helpers/config.py +++ b/dissect/target/helpers/config.py @@ -70,6 +70,9 @@ def _find_config_file(paths: list[Path | str] | None) -> Path | None: config_file = None for path in paths: + if not path: + continue + path = Path(path) cur_path = path.absolute() diff --git a/dissect/target/helpers/configutil.py b/dissect/target/helpers/configutil.py index d676303d9..38c9a218a 100644 --- a/dissect/target/helpers/configutil.py +++ b/dissect/target/helpers/configutil.py @@ -305,7 +305,6 @@ def parse_file(self, fh: TextIO) -> None: class Bin(ConfigurationParser): - """Read the file into ``binary`` and show the number of bytes read""" def parse_file(self, fh: io.BytesIO) -> None: diff --git a/dissect/target/helpers/fsutil.py b/dissect/target/helpers/fsutil.py index f79155b12..f0f5cacc4 100644 --- a/dissect/target/helpers/fsutil.py +++ b/dissect/target/helpers/fsutil.py @@ -11,7 +11,7 @@ import re import sys from pathlib import Path -from typing import Any, BinaryIO, Iterator, Optional, Sequence, TextIO, Union +from typing import Any, BinaryIO, Iterator, Sequence, TextIO try: import lzma @@ -42,6 +42,7 @@ commonpath, dirname, isabs, + isreserved, join, normalize, normpath, @@ -52,7 +53,9 @@ splitroot, ) -if sys.version_info >= (3, 12): +if sys.version_info >= (3, 13): + from dissect.target.helpers.compat.path_313 import PureDissectPath, TargetPath +elif sys.version_info >= (3, 12): from dissect.target.helpers.compat.path_312 import PureDissectPath, TargetPath elif sys.version_info >= (3, 11): from dissect.target.helpers.compat.path_311 import PureDissectPath, TargetPath @@ -80,6 +83,7 @@ "glob_split", "has_glob_magic", "isabs", + "isreserved", "join", "normalize", "normpath", @@ -100,9 +104,9 @@ ] -def generate_addr(path: Union[str, Path], alt_separator: str = "") -> int: +def generate_addr(path: str | Path, alt_separator: str = "") -> int: if not alt_separator and isinstance(path, Path): - alt_separator = path._flavour.altsep + alt_separator = (getattr(path, "parser", None) or path._flavour).altsep path = normalize(str(path), alt_separator=alt_separator) return int(hashlib.sha256(path.encode()).hexdigest()[:8], 16) @@ -233,7 +237,7 @@ def __repr__(self) -> str: ) return f"dissect.target.stat_result({values})" - def _parse_time(self, ts: Union[int, float]) -> tuple[int, float, int]: + def _parse_time(self, ts: int | float) -> tuple[int, float, int]: ts_int = int(ts) ts_ns = int(ts * 1e9) @@ -311,7 +315,7 @@ def glob_split(pattern: str, alt_separator: str = "") -> tuple[str, str]: Args: pattern: A glob pattern to match names of filesystem entries against. - alt_separator: An optional alternative path separator in use by the filesystem being matched. + alt_separator: An alternative path separator in use by the filesystem being matched. Returns: A tuple of a string with path parts up to the first path part that has a glob pattern and a string of @@ -488,14 +492,14 @@ def resolve_link( def open_decompress( - path: Optional[TargetPath] = None, + path: TargetPath | None = None, mode: str = "rb", *, - fileobj: Optional[BinaryIO] = None, - encoding: Optional[str] = "UTF-8", - errors: Optional[str] = "backslashreplace", - newline: Optional[str] = None, -) -> Union[BinaryIO, TextIO]: + fileobj: BinaryIO | None = None, + encoding: str | None = "UTF-8", + errors: str | None = "backslashreplace", + newline: str | None = None, +) -> BinaryIO | TextIO: """Open and decompress a file. Handles gz, bz2 and zstd files. Uncompressed files are opened as-is. When passing in an already opened ``fileobj``, the mode, encoding, errors and newline arguments are ignored. @@ -606,9 +610,9 @@ def reverse_readlines(fh: TextIO, chunk_size: int = 1024 * 1024 * 8) -> Iterator def fs_attrs( - path: Union[os.PathLike, str, bytes], + path: os.PathLike | str | bytes, follow_symlinks: bool = True, -) -> dict[Union[os.PathLike, str, bytes], bytes]: +) -> dict[os.PathLike | str | bytes, bytes]: """Return the extended attributes for a given path on the local filesystem. This is currently only implemented for Linux using os.listxattr and related functions. diff --git a/dissect/target/helpers/polypath.py b/dissect/target/helpers/polypath.py index dfe44cf70..ca7694478 100644 --- a/dissect/target/helpers/polypath.py +++ b/dissect/target/helpers/polypath.py @@ -37,7 +37,7 @@ def split(path: str, alt_separator: str = "") -> str: splitdrive = posixpath.splitdrive -def splitroot(path: str, alt_separator: str = "") -> tuple[str, str]: +def splitroot(path: str, alt_separator: str = "") -> tuple[str, str, str]: return posixpath.splitroot(normalize(path, alt_separator=alt_separator)) @@ -71,3 +71,11 @@ def relpath(path: str, start: str, alt_separator: str = "") -> str: def commonpath(paths: list[str], alt_separator: str = "") -> str: return posixpath.commonpath([normalize(path, alt_separator=alt_separator) for path in paths]) + + +def isreserved(path: str) -> bool: + """Return True if the path is a reserved name. + + We currently do not have any reserved names. + """ + return False diff --git a/dissect/target/helpers/utils.py b/dissect/target/helpers/utils.py index 7648e94e2..1cb39750f 100644 --- a/dissect/target/helpers/utils.py +++ b/dissect/target/helpers/utils.py @@ -143,15 +143,19 @@ def year_rollover_helper( log.debug("Skipping line: %s", line) continue + # We have to append the current_year to strptime instead of adding it using replace later. + # This prevents DeprecationWarnings on cpython >= 3.13 and Exceptions on cpython >= 3.15. + # See https://github.com/python/cpython/issues/70647 and https://github.com/python/cpython/pull/117107. + compare_ts = datetime.strptime(f"{timestamp.group(0)};1900", f"{ts_format};%Y") + if last_seen_month and compare_ts.month > last_seen_month: + current_year -= 1 + last_seen_month = compare_ts.month + try: - relative_ts = datetime.strptime(timestamp.group(0), ts_format) + relative_ts = datetime.strptime(f"{timestamp.group(0)};{current_year}", f"{ts_format};%Y") except ValueError as e: log.warning("Timestamp '%s' does not match format '%s', skipping line.", timestamp.group(0), ts_format) log.debug("", exc_info=e) continue - if last_seen_month and relative_ts.month > last_seen_month: - current_year -= 1 - last_seen_month = relative_ts.month - - yield relative_ts.replace(year=current_year, tzinfo=tzinfo), line + yield relative_ts.replace(tzinfo=tzinfo), line diff --git a/dissect/target/plugins/apps/container/docker.py b/dissect/target/plugins/apps/container/docker.py index a33dce5f9..dfc479914 100644 --- a/dissect/target/plugins/apps/container/docker.py +++ b/dissect/target/plugins/apps/container/docker.py @@ -260,9 +260,11 @@ def logs(self, raw_messages: bool = False, remove_backspaces: bool = False) -> I ts=log_entry.get("time"), container=container.name, # container hash stream=log_entry.get("stream"), - message=log_entry.get("log") - if raw_messages - else strip_log(log_entry.get("log"), remove_backspaces), + message=( + log_entry.get("log") + if raw_messages + else strip_log(log_entry.get("log"), remove_backspaces) + ), _target=self.target, ) @@ -273,9 +275,9 @@ def logs(self, raw_messages: bool = False, remove_backspaces: bool = False) -> I ts=ts.from_unix_us(log_entry.ts // 1000), container=container.parent.name, # container hash stream=log_entry.source, - message=log_entry.message - if raw_messages - else strip_log(log_entry.message, remove_backspaces), + message=( + log_entry.message if raw_messages else strip_log(log_entry.message, remove_backspaces) + ), _target=self.target, ) diff --git a/dissect/target/plugins/apps/vpn/wireguard.py b/dissect/target/plugins/apps/vpn/wireguard.py index 2b5942d80..57c2e0d1b 100644 --- a/dissect/target/plugins/apps/vpn/wireguard.py +++ b/dissect/target/plugins/apps/vpn/wireguard.py @@ -24,7 +24,7 @@ ("string", "postup"), ("string", "predown"), ("string", "postdown"), - ("string", "source"), + ("path", "source"), ], ) @@ -37,7 +37,7 @@ ("net.ipnetwork[]", "allowed_ips"), ("string", "endpoint"), ("varint", "persistent_keep_alive"), - ("string", "source"), + ("path", "source"), ], ) @@ -77,7 +77,7 @@ def __init__(self, target) -> None: super().__init__(target) self.configs: list[Path] = [] for path in self.CONFIG_GLOBS: - self.configs.extend(self.target.fs.path().glob(path.lstrip("/"))) + self.configs.extend(self.target.fs.path("/").glob(path.lstrip("/"))) def check_compatible(self) -> None: if not self.configs: diff --git a/dissect/target/plugins/os/windows/clfs.py b/dissect/target/plugins/os/windows/clfs.py index f07cadfec..e386a0f92 100644 --- a/dissect/target/plugins/os/windows/clfs.py +++ b/dissect/target/plugins/os/windows/clfs.py @@ -4,7 +4,6 @@ from dissect.clfs.exceptions import InvalidBLFError, InvalidRecordBlockError from dissect.target.exceptions import UnsupportedPluginError -from dissect.target.helpers import fsutil from dissect.target.helpers.record import TargetRecordDescriptor from dissect.target.plugin import Plugin, export from dissect.target.target import Target @@ -91,7 +90,6 @@ def clfs(self) -> Iterator[ClfsRecord]: continue container_path = blf_container.name.replace("%BLF%", str(blf_path.parent)) - container_path = fsutil.normalize(container_path, alt_separator=blf_path._flavour.altsep) container_file = self.target.fs.path(container_path) fh = container_file.open() diff --git a/dissect/target/plugins/os/windows/datetime.py b/dissect/target/plugins/os/windows/datetime.py index 55d7c9967..a4ffe8784 100644 --- a/dissect/target/plugins/os/windows/datetime.py +++ b/dissect/target/plugins/os/windows/datetime.py @@ -1,7 +1,6 @@ import calendar from collections import namedtuple from datetime import datetime, timedelta, timezone, tzinfo -from typing import Dict, Tuple from dissect.cstruct import cstruct @@ -88,7 +87,7 @@ def parse_systemtime_transition(systemtime: c_tz._SYSTEMTIME, year: int) -> date ) -def parse_dynamic_dst(key: RegistryKey) -> Dict[int, TimezoneInformation]: +def parse_dynamic_dst(key: RegistryKey) -> dict[int, TimezoneInformation]: """Parse dynamic DST information from a given TimeZoneInformation registry key. If a timezone has dynamic DST information, there's a "Dynamic DST" subkey with values for each year. @@ -121,7 +120,7 @@ def parse_tzi(tzi: bytes) -> TimezoneInformation: ) -def get_dst_range(tzi: TimezoneInformation, year: int) -> Tuple[datetime, datetime]: +def get_dst_range(tzi: TimezoneInformation, year: int) -> tuple[datetime, datetime]: """Get the start and end date of DST for the given year.""" start = parse_systemtime_transition(tzi.daylight_date, year) end = parse_systemtime_transition(tzi.standard_date, year) @@ -160,9 +159,19 @@ def is_dst(self, dt: datetime) -> bool: flip = True start, end = end, start - # Can't compare naive to aware objects, so strip the timezone from - # dt first. - dt = dt.replace(tzinfo=None) + # Can't compare naive to aware objects, so strip the timezone from dt first + # Cast to a proper datetime object to avoid issues with subclassed datetime objects + dt = datetime( + dt.year, + dt.month, + dt.day, + dt.hour, + dt.minute, + dt.second, + dt.microsecond, + tzinfo=None, + fold=dt.fold, + ) result = False if start + HOUR <= dt < end - HOUR: diff --git a/dissect/target/target.py b/dissect/target/target.py index 5dd6f81ec..1489a9b53 100644 --- a/dissect/target/target.py +++ b/dissect/target/target.py @@ -95,7 +95,7 @@ def __init__(self, path: Union[str, Path] = None): try: self._config = config.load(config_paths) except Exception as e: - self.log.warning("Error loading config file: %s", self.path) + self.log.warning("Error loading config file: %s", config_paths) self.log.debug("", exc_info=e) self._config = config.load(None) # This loads an empty config. @@ -616,7 +616,7 @@ def get_function(self, function: str) -> FunctionTuple: raise UnsupportedPluginError( f"Unsupported function `{function}` for target with OS plugin {self._os_plugin}", extra=causes[1:] if len(causes) > 1 else None, - ) from causes[0] if causes else None + ) from (causes[0] if causes else None) # We still ended up with no compatible plugins if function not in self._functions: diff --git a/dissect/target/tools/fsutils.py b/dissect/target/tools/fsutils.py index c79b8dfc8..954046864 100644 --- a/dissect/target/tools/fsutils.py +++ b/dissect/target/tools/fsutils.py @@ -217,9 +217,11 @@ def filetype(path: TargetPath) -> str: atime=datetime.fromtimestamp(s.st_atime, tz=timezone.utc).isoformat(timespec="microseconds"), mtime=datetime.fromtimestamp(s.st_mtime, tz=timezone.utc).isoformat(timespec="microseconds"), ctime=datetime.fromtimestamp(s.st_ctime, tz=timezone.utc).isoformat(timespec="microseconds"), - btime=datetime.fromtimestamp(s.st_birthtime, tz=timezone.utc).isoformat(timespec="microseconds") - if hasattr(s, "st_birthtime") and s.st_birthtime - else "?", + btime=( + datetime.fromtimestamp(s.st_birthtime, tz=timezone.utc).isoformat(timespec="microseconds") + if hasattr(s, "st_birthtime") and s.st_birthtime + else "?" + ), ) print(res, file=stdout) diff --git a/pyproject.toml b/pyproject.toml index d6c6cedc8..58b0b0c4d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -34,7 +34,7 @@ dependencies = [ "dissect.regf>=3.3,<4", "dissect.util>=3,<4", "dissect.volume>=2,<4", - "flow.record~=3.18.0", + "flow.record~=3.19.0", "structlog", ] dynamic = ["version"] diff --git a/tests/helpers/test_fsutil.py b/tests/helpers/test_fsutil.py index d5fa89bdf..63c1f5fe5 100644 --- a/tests/helpers/test_fsutil.py +++ b/tests/helpers/test_fsutil.py @@ -165,6 +165,26 @@ def test_relpath(path: str, start: str, alt_separator: str, result: str) -> None assert fsutil.relpath(path, start, alt_separator=alt_separator) == result +@pytest.mark.parametrize( + ("paths, alt_separator, result"), + [ + (["/some/dir/some/file", "/some/dir/some/other"], "", "/some/dir/some"), + (["/some/dir/some/file", "/some/dir/some/other"], "\\", "/some/dir/some"), + (["\\some\\dir\\some\\file", "\\some\\dir\\some\\other"], "\\", "/some/dir/some"), + (["/some/dir/some/file", "/some/dir/other"], "", "/some/dir"), + (["/some/dir/some/file", "/some/other"], "", "/some"), + (["/some/dir/some/file", "/some/other"], "\\", "/some"), + ], +) +def test_commonpath(paths: list[str], alt_separator: str, result: str) -> None: + assert fsutil.commonpath(paths, alt_separator=alt_separator) == result + + +def test_isreserved() -> None: + assert not fsutil.isreserved("CON") + assert not fsutil.isreserved("foo") + + def test_generate_addr() -> None: slash_path = "/some/dir/some/file" slash_vfs = VirtualFilesystem(alt_separator="") @@ -226,10 +246,13 @@ def path_fs() -> Iterator[VirtualFilesystem]: vfs = VirtualFilesystem() vfs.makedirs("/some/dir") + vfs.makedirs("/some/dir/nested") vfs.symlink("/some/dir/file.txt", "/some/symlink.txt") vfs.symlink("nonexistent", "/some/dir/link.txt") + vfs.symlink("/some/dir/nested", "/some/dirlink") vfs.map_file_fh("/some/file.txt", io.BytesIO(b"content")) vfs.map_file_fh("/some/dir/file.txt", io.BytesIO(b"")) + vfs.map_file_fh("/some/dir/nested/file.txt", io.BytesIO(b"")) yield vfs @@ -308,6 +331,7 @@ def test_target_path_is_relative_to(path_fs: VirtualFilesystem) -> None: assert not path_fs.path("/some/dir/file.txt").is_relative_to("/some/other") +@pytest.mark.skipif(sys.version_info >= (3, 13), reason="deprecated on Python 3.13+") def test_target_path_is_reserved(path_fs: VirtualFilesystem) -> None: # We currently do not have any reserved names for TargetPath assert not path_fs.path("CON").is_reserved() @@ -374,14 +398,48 @@ def test_target_path_glob(path_fs: VirtualFilesystem) -> None: def test_target_path_rglob(path_fs: VirtualFilesystem) -> None: - assert list(path_fs.path("/some").rglob("*.txt")) == [ - path_fs.path("/some/symlink.txt"), - path_fs.path("/some/file.txt"), - path_fs.path("/some/dir/link.txt"), - path_fs.path("/some/dir/file.txt"), + assert list(map(str, path_fs.path("/some").rglob("*.txt"))) == [ + "/some/symlink.txt", + "/some/file.txt", + "/some/dir/link.txt", + "/some/dir/file.txt", + "/some/dir/nested/file.txt", ] + assert list(path_fs.path("/some").rglob("*.TXT")) == [] assert list(path_fs.path("/some").rglob("*.csv")) == [] + with patch.object(path_fs, "case_sensitive", False): + assert list(map(str, path_fs.path("/some").rglob("*.TXT"))) == [ + "/some/symlink.txt", + "/some/file.txt", + "/some/dir/link.txt", + "/some/dir/file.txt", + "/some/dir/nested/file.txt", + ] + + +@pytest.mark.skipif(sys.version_info < (3, 12), reason="requires Python 3.12+") +def test_target_path_rglob_case_sensitive(path_fs: VirtualFilesystem) -> None: + assert list(map(str, path_fs.path("/some").rglob("*.TXT", case_sensitive=False))) == [ + "/some/symlink.txt", + "/some/file.txt", + "/some/dir/link.txt", + "/some/dir/file.txt", + "/some/dir/nested/file.txt", + ] + + +@pytest.mark.skipif(sys.version_info < (3, 13), reason="requires Python 3.13+") +def test_target_path_rglob_recurse_symlinks(path_fs: VirtualFilesystem) -> None: + assert list(map(str, path_fs.path("/some").rglob("*.txt", recurse_symlinks=True))) == [ + "/some/symlink.txt", + "/some/file.txt", + "/some/dirlink/file.txt", + "/some/dir/link.txt", + "/some/dir/file.txt", + "/some/dir/nested/file.txt", + ] + def test_target_path_is_dir(path_fs: VirtualFilesystem) -> None: assert path_fs.path("/some/dir").is_dir() @@ -439,14 +497,16 @@ def test_target_path_iterdir(path_fs: VirtualFilesystem) -> None: assert list(path_fs.path("/some").iterdir()) == [ path_fs.path("/some/dir"), path_fs.path("/some/symlink.txt"), + path_fs.path("/some/dirlink"), path_fs.path("/some/file.txt"), ] def test_target_path_walk(path_fs: VirtualFilesystem) -> None: assert list(path_fs.path("/some").walk()) == [ - (path_fs.path("/some"), ["dir"], ["symlink.txt", "file.txt"]), - (path_fs.path("/some/dir"), [], ["link.txt", "file.txt"]), + (path_fs.path("/some"), ["dir"], ["symlink.txt", "dirlink", "file.txt"]), + (path_fs.path("/some/dir"), ["nested"], ["link.txt", "file.txt"]), + (path_fs.path("/some/dir/nested"), [], ["file.txt"]), ] @@ -625,8 +685,9 @@ def test_pure_dissect_path__from_parts_flavour(alt_separator: str, case_sensitiv vfs = VirtualFilesystem(alt_separator=alt_separator, case_sensitive=case_sensitive) pure_dissect_path = fsutil.PureDissectPath(vfs, "/some/dir") - assert pure_dissect_path._flavour.altsep == alt_separator - assert pure_dissect_path._flavour.case_sensitive == case_sensitive + obj = getattr(pure_dissect_path, "parser", None) or pure_dissect_path._flavour + assert obj.altsep == alt_separator + assert obj.case_sensitive == case_sensitive def test_pure_dissect_path__from_parts_no_fs_exception() -> None: diff --git a/tests/plugins/apps/vpn/test_wireguard.py b/tests/plugins/apps/vpn/test_wireguard.py index d9d96ab5b..94e8edd40 100644 --- a/tests/plugins/apps/vpn/test_wireguard.py +++ b/tests/plugins/apps/vpn/test_wireguard.py @@ -1,10 +1,12 @@ +from dissect.target.filesystem import VirtualFilesystem from dissect.target.plugins.apps.vpn.wireguard import WireGuardPlugin +from dissect.target.target import Target from tests._utils import absolute_path -def test_wireguard_plugin_global_log(target_unix_users, fs_unix): +def test_wireguard_plugin_global_log(target_unix_users: Target, fs_unix: VirtualFilesystem) -> None: wireguard_config_file = absolute_path("_data/plugins/apps/vpn/wireguard/wg0.conf") - fs_unix.map_file("etc/wireguard/wg0.conf", wireguard_config_file) + fs_unix.map_file("/etc/wireguard/wg0.conf", wireguard_config_file) target_unix_users.add_plugin(WireGuardPlugin) records = list(target_unix_users.wireguard.config()) @@ -16,7 +18,7 @@ def test_wireguard_plugin_global_log(target_unix_users, fs_unix): assert str(record.address) == "10.13.37.1" assert record.private_key == "UHJpdmF0ZUtleQ==" assert record.listen_port == 12345 - assert record.source == "etc/wireguard/wg0.conf" + assert record.source == "/etc/wireguard/wg0.conf" assert record.dns is None # Peer @@ -24,11 +26,11 @@ def test_wireguard_plugin_global_log(target_unix_users, fs_unix): assert record.name is None assert [str(addr) for addr in record.allowed_ips] == ["10.13.37.2/32", "::/0"] assert record.public_key == "UHVibGljS2V5MQ==" - assert record.source == "etc/wireguard/wg0.conf" + assert record.source == "/etc/wireguard/wg0.conf" # Peer record = records[2] assert record.name is None assert [str(addr) for addr in record.allowed_ips] == ["10.13.37.3/32", "::/0"] assert record.public_key == "UHVibGljS2V5Mg==" - assert record.source == "etc/wireguard/wg0.conf" + assert record.source == "/etc/wireguard/wg0.conf" diff --git a/tests/plugins/os/windows/test_recyclebin.py b/tests/plugins/os/windows/test_recyclebin.py index 733a9198f..c6e5b5554 100644 --- a/tests/plugins/os/windows/test_recyclebin.py +++ b/tests/plugins/os/windows/test_recyclebin.py @@ -1,5 +1,7 @@ +import io from pathlib import Path from tempfile import TemporaryDirectory +from typing import Iterator from unittest.mock import Mock, mock_open, patch import pytest @@ -7,10 +9,11 @@ from dissect.target.exceptions import UnsupportedPluginError from dissect.target.filesystem import VirtualFilesystem from dissect.target.plugins.os.windows.recyclebin import RecyclebinPlugin, c_recyclebin +from dissect.target.target import Target @pytest.fixture -def recycle_bin(): +def recycle_bin() -> Iterator[VirtualFilesystem]: recycle_bin = VirtualFilesystem() with TemporaryDirectory() as tmp_dir: @@ -18,17 +21,17 @@ def recycle_bin(): yield recycle_bin -def test_recycle_bin_compatibility_failed(target_win): +def test_recycle_bin_compatibility_failed(target_win: Target) -> None: with pytest.raises(UnsupportedPluginError): RecyclebinPlugin(target_win).check_compatible() -def test_recycle_bin_compat_succeeded(target_win, recycle_bin): +def test_recycle_bin_compat_succeeded(target_win: Target, recycle_bin: VirtualFilesystem) -> None: target_win.fs.mount("C:\\$recycle.bin", recycle_bin) assert RecyclebinPlugin(target_win).check_compatible() is None -def test_read_recycle_bin(target_win): +def test_read_recycle_bin(target_win: Target) -> None: mocked_file = Mock() mocked_file.is_file.return_value = True @@ -37,7 +40,7 @@ def test_read_recycle_bin(target_win): assert [mocked_bin_file.return_value] == list(RecyclebinPlugin(target_win).read_recycle_bin(mocked_file)) -def test_filtered_name(target_win): +def test_filtered_name(target_win: Target) -> None: mocked_file = Mock() mocked_file.is_file.return_value = True mocked_file.is_dir.return_value = False @@ -47,7 +50,7 @@ def test_filtered_name(target_win): assert [] == list(RecyclebinPlugin(target_win).read_recycle_bin(mocked_file)) -def test_read_recycle_bin_directory(target_win): +def test_read_recycle_bin_directory(target_win: Target) -> None: mocked_dir = Mock() mocked_dir.is_file.return_value = False mocked_dir.is_dir.return_value = True @@ -74,7 +77,7 @@ def test_read_recycle_bin_directory(target_win): (b"\x02" + b"\x00" * 7, "header_v2"), ], ) -def test_parse_header(target_win, version_number, expected_header): +def test_parse_header(target_win: Target, version_number: bytes, expected_header: str) -> None: recycle_plugin = RecyclebinPlugin(target_win) header = recycle_plugin.select_header(version_number) @@ -88,7 +91,7 @@ def test_parse_header(target_win, version_number, expected_header): "file/to/$recycle.bin/sid", ], ) -def test_read_bin_file_unknown(target_win, path): +def test_read_bin_file_unknown(target_win: Target, path: str) -> None: recycle_plugin = RecyclebinPlugin(target_win) header_1 = c_recyclebin.header_v1(version=0, file_size=0x20, timestamp=0x20, filename="hello_world" + "\x00" * 249) @@ -102,8 +105,8 @@ def test_read_bin_file_unknown(target_win, path): assert output.path == "hello_world" -def test_recyclebin_plugin_file(target_win, recycle_bin): - recycle_bin.map_file("$ihello_world", None) +def test_recyclebin_plugin_file(target_win: Target, recycle_bin: VirtualFilesystem) -> None: + recycle_bin.map_file_fh("$ihello_world", io.BytesIO(b"")) target_win.fs.mount("C:\\$recycle.bin", recycle_bin) target_win.add_plugin(RecyclebinPlugin) @@ -113,8 +116,8 @@ def test_recyclebin_plugin_file(target_win, recycle_bin): assert recycle_bin_entries == [mocked_bin_file.return_value] -def test_recyclebin_plugin_wrong_prefix(target_win, recycle_bin): - recycle_bin.map_file("hello_world", None) +def test_recyclebin_plugin_wrong_prefix(target_win: Target, recycle_bin: VirtualFilesystem) -> None: + recycle_bin.map_file_fh("hello_world", io.BytesIO(b"")) target_win.fs.mount("C:\\$recycle.bin", recycle_bin) target_win.add_plugin(RecyclebinPlugin) @@ -132,7 +135,7 @@ def test_recyclebin_plugin_wrong_prefix(target_win, recycle_bin): ("C:/$Recycle.bin/just_another_file", "unknown"), ], ) -def test_find_sid_from_path(target_win, path, expected_output): +def test_find_sid_from_path(target_win: Target, path: str, expected_output: str) -> None: recycle_plugin = RecyclebinPlugin(target_win) assert recycle_plugin.find_sid(Path(path)) == expected_output diff --git a/tests/test_container.py b/tests/test_container.py index 80087541f..4e659968e 100644 --- a/tests/test_container.py +++ b/tests/test_container.py @@ -1,6 +1,9 @@ +from __future__ import annotations + import struct from io import BytesIO from pathlib import Path +from typing import Iterator from unittest.mock import Mock, patch import pytest @@ -11,7 +14,7 @@ @pytest.fixture -def mocked_ewf_detect(): +def mocked_ewf_detect() -> Iterator[Mock]: mocked_ewf = Mock() mocked_ewf.EwfContainer.detect.return_value = True mocked_ewf.EwfContainer.detect @@ -27,18 +30,18 @@ def mocked_ewf_detect(): ([Path("hello")], [Path("hello")]), ], ) -def test_open_inputs(mocked_ewf_detect: Mock, path, expected_output): +def test_open_inputs(mocked_ewf_detect: Mock, path: str | list[str] | Path, expected_output: Path | list[Path]) -> None: container.open(path) mocked_ewf_detect.assert_called_with(expected_output) -def test_open_fallback_fh(tmp_path): +def test_open_fallback_fh(tmp_path: Path) -> None: # Create a valid VHD file fake_vhd = ( (bytes(range(256)) * 2) + b"conectix" + (b"\x00" * 8) - + (b"\xFF" * 8) + + (b"\xff" * 8) + (b"\x00" * 24) + struct.pack(">Q", 512) + (b"\x00" * 455) diff --git a/tests/tools/test_diff.py b/tests/tools/test_diff.py index df6bae6ce..75701ce5b 100644 --- a/tests/tools/test_diff.py +++ b/tests/tools/test_diff.py @@ -223,7 +223,9 @@ def test_differentiate_plugins(src_target: Target, dst_target: Target) -> None: assert deleted[0].record.hostname == "src_target" -def test_shell_ls(src_target: Target, dst_target: Target, capsys, monkeypatch) -> None: +def test_shell_ls( + src_target: Target, dst_target: Target, capsys: pytest.CaptureFixture, monkeypatch: pytest.MonkeyPatch +) -> None: monkeypatch.setattr(fsutils, "LS_COLORS", {}) cli = DifferentialCli(src_target, dst_target, deep=True) @@ -248,7 +250,9 @@ def test_shell_ls(src_target: Target, dst_target: Target, capsys, monkeypatch) - assert captured.out == "\n".join(expected) + "\n" -def test_shell_find(src_target: Target, dst_target: Target, capsys, monkeypatch) -> None: +def test_shell_find( + src_target: Target, dst_target: Target, capsys: pytest.CaptureFixture, monkeypatch: pytest.MonkeyPatch +) -> None: monkeypatch.setattr(fsutils, "LS_COLORS", {}) cli = DifferentialCli(src_target, dst_target, deep=True) @@ -275,7 +279,7 @@ def test_shell_find(src_target: Target, dst_target: Target, capsys, monkeypatch) assert captured.out == "\n".join(expected) + "\n" -def test_shell_cat(src_target: Target, dst_target: Target, capsys) -> None: +def test_shell_cat(src_target: Target, dst_target: Target, capsys: pytest.CaptureFixture) -> None: cli = DifferentialCli(src_target, dst_target, deep=True) cli.onecmd("cat /changes/unchanged") @@ -296,7 +300,7 @@ def test_shell_cat(src_target: Target, dst_target: Target, capsys) -> None: assert captured.out == "Hello From Destination Target\n" -def test_shell_plugin(src_target: Target, dst_target: Target, capsys) -> None: +def test_shell_plugin(src_target: Target, dst_target: Target, capsys: pytest.CaptureFixture) -> None: cli = DifferentialCli(src_target, dst_target, deep=True) cli.onecmd("plugin users") @@ -307,10 +311,10 @@ def test_shell_plugin(src_target: Target, dst_target: Target, capsys) -> None: assert "differential/record/deleted" in captured.out -def test_target_diff_shell(capsys, monkeypatch) -> None: +def test_target_diff_shell(capsys: pytest.CaptureFixture, monkeypatch: pytest.MonkeyPatch) -> None: with monkeypatch.context() as m: m.setattr(fsutils, "LS_COLORS", {}) - m.setenv("NO_COLOR", 1) + m.setenv("NO_COLOR", "1") src_target_path = absolute_path("_data/tools/diff/src.tar") dst_target_path = absolute_path("_data/tools/diff/dst.tar") m.setattr("sys.argv", ["target-diff", "--deep", "shell", src_target_path, dst_target_path]) @@ -333,7 +337,7 @@ def test_target_diff_shell(capsys, monkeypatch) -> None: assert "unrecognized arguments" not in err -def test_target_diff_fs(capsys, monkeypatch) -> None: +def test_target_diff_fs(capsys: pytest.CaptureFixture, monkeypatch: pytest.MonkeyPatch) -> None: with monkeypatch.context() as m: src_target_path = absolute_path("_data/tools/diff/src.tar") dst_target_path = absolute_path("_data/tools/diff/dst.tar") @@ -346,7 +350,7 @@ def test_target_diff_fs(capsys, monkeypatch) -> None: assert "differential/file/deleted" in out -def test_target_diff_query(capsys, monkeypatch) -> None: +def test_target_diff_query(capsys: pytest.CaptureFixture, monkeypatch: pytest.MonkeyPatch) -> None: with monkeypatch.context() as m: src_target_path = absolute_path("_data/tools/diff/src.tar") dst_target_path = absolute_path("_data/tools/diff/dst.tar")