From 23347be484cb69a8d71de9fd6f9dae673d365562 Mon Sep 17 00:00:00 2001 From: Matthijs Vos Date: Tue, 5 Nov 2024 22:48:27 +0100 Subject: [PATCH 01/11] Add ZIP loader --- dissect/target/loader.py | 1 + dissect/target/loaders/zip.py | 97 +++++++++++++++++++++++++++++++++++ 2 files changed, 98 insertions(+) create mode 100644 dissect/target/loaders/zip.py diff --git a/dissect/target/loader.py b/dissect/target/loader.py index 08d5e29b0..1b92149c1 100644 --- a/dissect/target/loader.py +++ b/dissect/target/loader.py @@ -179,6 +179,7 @@ def open(item: Union[str, Path], *args, **kwargs) -> Loader: register("mqtt", "MQTTLoader") register("asdf", "AsdfLoader") register("tar", "TarLoader") +register("zip", "ZipLoader") register("vmx", "VmxLoader") register("vmwarevm", "VmwarevmLoader") register("hyperv", "HyperVLoader") diff --git a/dissect/target/loaders/zip.py b/dissect/target/loaders/zip.py new file mode 100644 index 000000000..6b356c425 --- /dev/null +++ b/dissect/target/loaders/zip.py @@ -0,0 +1,97 @@ +import logging +import re +from pathlib import Path +from typing import Union +from zipfile import ZipFile + +from dissect.target import filesystem, target +from dissect.target.filesystems.zip import ZipFilesystemDirectoryEntry, ZipFilesystemEntry +from dissect.target.helpers import fsutil, loaderutil +from dissect.target.loader import Loader + +log = logging.getLogger(__name__) + +ANON_FS_RE = re.compile(r"^fs[0-9]+$") + + +class ZipLoader(Loader): + """Load zip files.""" + + def __init__(self, path: Union[Path, str], **kwargs): + super().__init__(path) + + self.zip = ZipFile(path) + + @staticmethod + def detect(path: Path) -> bool: + return path.name.lower().endswith((".zip")) + + def map(self, target: target.Target) -> None: + volumes = {} + + for member in self.zip.infolist(): + if member.filename == ".": + continue + + if not member.filename.startswith(("/fs/", "fs/", "/sysvol/", "sysvol/")): + # Not an acquire tar + if "/" not in volumes: + vol = filesystem.VirtualFilesystem(case_sensitive=True) + vol.zip = self.zip + volumes["/"] = vol + target.filesystems.add(vol) + + volume = volumes["/"] + mname = member.filename + else: + if member.filename.startswith(("/fs/", "fs/")): + # Current acquire + parts = member.filename.replace("fs/", "").split("/") + if parts[0] == "": + parts.pop(0) + else: + # Legacy acquire + parts = member.filename.lstrip("/").split("/") + volume_name = parts[0].lower() + + # NOTE: older versions of acquire would write to "sysvol" instead of a driver letter + # Figuring out the sysvol from the drive letters is easier than the drive letter from "sysvol", + # so this was swapped in acquire 3.12. Now we map all volumes to a drive letter and let the + # Windows OS plugin figure out which is the sysvol + # For backwards compatibility we're forced to keep this check, and assume that "c:" is our sysvol + if volume_name == "sysvol": + volume_name = "c:" + + if volume_name == "$fs$": + if len(parts) == 1: + # The fs/$fs$ entry is ignored, only the directories below it are processed. + continue + fs_name = parts[1] + if ANON_FS_RE.match(fs_name): + parts.pop(0) + volume_name = f"{volume_name}/{fs_name}" + + if volume_name not in volumes: + vol = filesystem.VirtualFilesystem(case_sensitive=False) + vol.zip = self.zip + volumes[volume_name] = vol + target.filesystems.add(vol) + + volume = volumes[volume_name] + mname = "/".join(parts[1:]) + + entry_cls = ZipFilesystemDirectoryEntry if member.is_dir() else ZipFilesystemEntry + entry = entry_cls(volume, fsutil.normpath(mname), member) + volume.map_file_entry(entry.path, entry) + + for vol_name, vol in volumes.items(): + loaderutil.add_virtual_ntfs_filesystem( + target, + vol, + usnjrnl_path=[ + "$Extend/$Usnjrnl:$J", + "$Extend/$Usnjrnl:J", # Old versions of acquire used $Usnjrnl:J + ], + ) + + target.fs.mount(vol_name, vol) From 08b50274b90ec7d80415321b462dfab213320c65 Mon Sep 17 00:00:00 2001 From: Matthijs Vos Date: Mon, 18 Nov 2024 16:31:15 +0100 Subject: [PATCH 02/11] Move acquire logic into dedicated AcquireLoader --- dissect/target/loader.py | 2 +- dissect/target/loaders/acquire.py | 55 ++++++++++++++++++ dissect/target/loaders/tar.py | 65 +++++---------------- dissect/target/loaders/zip.py | 97 ------------------------------- 4 files changed, 72 insertions(+), 147 deletions(-) create mode 100644 dissect/target/loaders/acquire.py delete mode 100644 dissect/target/loaders/zip.py diff --git a/dissect/target/loader.py b/dissect/target/loader.py index 1b92149c1..cd5f541a0 100644 --- a/dissect/target/loader.py +++ b/dissect/target/loader.py @@ -179,7 +179,7 @@ def open(item: Union[str, Path], *args, **kwargs) -> Loader: register("mqtt", "MQTTLoader") register("asdf", "AsdfLoader") register("tar", "TarLoader") -register("zip", "ZipLoader") +register("acquire", "AcquireLoader") register("vmx", "VmxLoader") register("vmwarevm", "VmwarevmLoader") register("hyperv", "HyperVLoader") diff --git a/dissect/target/loaders/acquire.py b/dissect/target/loaders/acquire.py new file mode 100644 index 000000000..e66e975e6 --- /dev/null +++ b/dissect/target/loaders/acquire.py @@ -0,0 +1,55 @@ +from __future__ import annotations + +import logging +import zipfile +from pathlib import Path +from typing import Union + +from dissect.target import target +from dissect.target.filesystem import TarFilesystem +from dissect.target.loader import Loader +from dissect.target.loaders.dir import find_and_map_dirs + +log = logging.getLogger(__name__) + +FILESYSTEMS_ROOT = "fs" + + +def _get_root(path: Union[Path, str]): + if path.suffix == ".zip": + return zipfile.Path(path.open("rb")) + elif path.suffix == ".tar": + return TarFilesystem(path.open("rb")).path() + elif path.suffix in [".tar.gz", ".tgz"]: + log.warning( + f"Tar file {path!r} is compressed, which will affect performance. " + "Consider uncompressing the archive before passing the tar file to Dissect." + ) + return TarFilesystem(path.open("rb")).path() + else: + return path + + +class AcquireLoader(Loader): + """ + Load acquire collect files. + Supports both the zip and tar output + Only compatible with acquire >= 3.12 due to changed structure + """ + + def __init__(self, path: Union[Path, str], **kwargs): + super().__init__(path) + + self.root = _get_root(path) + + @staticmethod + def detect(path: Path) -> bool: + path = _get_root(path) + + return path.joinpath(FILESYSTEMS_ROOT).exists() + + def map(self, target: target.Target) -> None: + find_and_map_dirs( + target, + self.root.joinpath(FILESYSTEMS_ROOT), + ) diff --git a/dissect/target/loaders/tar.py b/dissect/target/loaders/tar.py index 092cd1a31..576520b98 100644 --- a/dissect/target/loaders/tar.py +++ b/dissect/target/loaders/tar.py @@ -15,7 +15,6 @@ log = logging.getLogger(__name__) - ANON_FS_RE = re.compile(r"^fs[0-9]+$") @@ -38,7 +37,14 @@ def __init__(self, path: Path | str, **kwargs): @staticmethod def detect(path: Path) -> bool: - return path.name.lower().endswith((".tar", ".tar.gz", ".tgz")) + if not path.name.lower().endswith((".tar", ".tar.gz", ".tgz")): + return False + + # Check that this is not an acquire collect, that is handled by AcquireLoader + tar = tarfile.open(fileobj=path.open("rb")) + acquire_members = [m for m in tar.getmembers() if m.name.startswith(("/fs/", "fs/", "/sysvol/", "sysvol/"))] + + return len(acquire_members) == 0 def is_compressed(self, path: Path | str) -> bool: return str(path).lower().endswith((".tar.gz", ".tgz")) @@ -50,52 +56,14 @@ def map(self, target: target.Target) -> None: if member.name == ".": continue - if not member.name.startswith(("/fs/", "fs/", "/sysvol/", "sysvol/")): - # Not an acquire tar - if "/" not in volumes: - vol = filesystem.VirtualFilesystem(case_sensitive=True) - vol.tar = self.tar - volumes["/"] = vol - target.filesystems.add(vol) - - volume = volumes["/"] - mname = member.name - else: - if member.name.startswith(("/fs/", "fs/")): - # Current acquire - parts = member.name.replace("fs/", "").split("/") - if parts[0] == "": - parts.pop(0) - else: - # Legacy acquire - parts = member.name.lstrip("/").split("/") - volume_name = parts[0].lower() - - # NOTE: older versions of acquire would write to "sysvol" instead of a driver letter - # Figuring out the sysvol from the drive letters is easier than the drive letter from "sysvol", - # so this was swapped in acquire 3.12. Now we map all volumes to a drive letter and let the - # Windows OS plugin figure out which is the sysvol - # For backwards compatibility we're forced to keep this check, and assume that "c:" is our sysvol - if volume_name == "sysvol": - volume_name = "c:" - - if volume_name == "$fs$": - if len(parts) == 1: - # The fs/$fs$ entry is ignored, only the directories below it are processed. - continue - fs_name = parts[1] - if ANON_FS_RE.match(fs_name): - parts.pop(0) - volume_name = f"{volume_name}/{fs_name}" - - if volume_name not in volumes: - vol = filesystem.VirtualFilesystem(case_sensitive=False) - vol.tar = self.tar - volumes[volume_name] = vol - target.filesystems.add(vol) - - volume = volumes[volume_name] - mname = "/".join(parts[1:]) + if "/" not in volumes: + vol = filesystem.VirtualFilesystem(case_sensitive=True) + vol.tar = self.tar + volumes["/"] = vol + target.filesystems.add(vol) + + volume = volumes["/"] + mname = member.name entry_cls = TarFilesystemDirectoryEntry if member.isdir() else TarFilesystemEntry entry = entry_cls(volume, fsutil.normpath(mname), member) @@ -107,7 +75,6 @@ def map(self, target: target.Target) -> None: vol, usnjrnl_path=[ "$Extend/$Usnjrnl:$J", - "$Extend/$Usnjrnl:J", # Old versions of acquire used $Usnjrnl:J ], ) diff --git a/dissect/target/loaders/zip.py b/dissect/target/loaders/zip.py deleted file mode 100644 index 6b356c425..000000000 --- a/dissect/target/loaders/zip.py +++ /dev/null @@ -1,97 +0,0 @@ -import logging -import re -from pathlib import Path -from typing import Union -from zipfile import ZipFile - -from dissect.target import filesystem, target -from dissect.target.filesystems.zip import ZipFilesystemDirectoryEntry, ZipFilesystemEntry -from dissect.target.helpers import fsutil, loaderutil -from dissect.target.loader import Loader - -log = logging.getLogger(__name__) - -ANON_FS_RE = re.compile(r"^fs[0-9]+$") - - -class ZipLoader(Loader): - """Load zip files.""" - - def __init__(self, path: Union[Path, str], **kwargs): - super().__init__(path) - - self.zip = ZipFile(path) - - @staticmethod - def detect(path: Path) -> bool: - return path.name.lower().endswith((".zip")) - - def map(self, target: target.Target) -> None: - volumes = {} - - for member in self.zip.infolist(): - if member.filename == ".": - continue - - if not member.filename.startswith(("/fs/", "fs/", "/sysvol/", "sysvol/")): - # Not an acquire tar - if "/" not in volumes: - vol = filesystem.VirtualFilesystem(case_sensitive=True) - vol.zip = self.zip - volumes["/"] = vol - target.filesystems.add(vol) - - volume = volumes["/"] - mname = member.filename - else: - if member.filename.startswith(("/fs/", "fs/")): - # Current acquire - parts = member.filename.replace("fs/", "").split("/") - if parts[0] == "": - parts.pop(0) - else: - # Legacy acquire - parts = member.filename.lstrip("/").split("/") - volume_name = parts[0].lower() - - # NOTE: older versions of acquire would write to "sysvol" instead of a driver letter - # Figuring out the sysvol from the drive letters is easier than the drive letter from "sysvol", - # so this was swapped in acquire 3.12. Now we map all volumes to a drive letter and let the - # Windows OS plugin figure out which is the sysvol - # For backwards compatibility we're forced to keep this check, and assume that "c:" is our sysvol - if volume_name == "sysvol": - volume_name = "c:" - - if volume_name == "$fs$": - if len(parts) == 1: - # The fs/$fs$ entry is ignored, only the directories below it are processed. - continue - fs_name = parts[1] - if ANON_FS_RE.match(fs_name): - parts.pop(0) - volume_name = f"{volume_name}/{fs_name}" - - if volume_name not in volumes: - vol = filesystem.VirtualFilesystem(case_sensitive=False) - vol.zip = self.zip - volumes[volume_name] = vol - target.filesystems.add(vol) - - volume = volumes[volume_name] - mname = "/".join(parts[1:]) - - entry_cls = ZipFilesystemDirectoryEntry if member.is_dir() else ZipFilesystemEntry - entry = entry_cls(volume, fsutil.normpath(mname), member) - volume.map_file_entry(entry.path, entry) - - for vol_name, vol in volumes.items(): - loaderutil.add_virtual_ntfs_filesystem( - target, - vol, - usnjrnl_path=[ - "$Extend/$Usnjrnl:$J", - "$Extend/$Usnjrnl:J", # Old versions of acquire used $Usnjrnl:J - ], - ) - - target.fs.mount(vol_name, vol) From ee3cb91728132ba1cdcaea6439a86858a97366a4 Mon Sep 17 00:00:00 2001 From: Matthijs Vos Date: Fri, 3 Jan 2025 14:08:08 +0100 Subject: [PATCH 03/11] Make aquire loader work with TAR files --- dissect/target/loader.py | 2 +- dissect/target/loaders/acquire.py | 49 ++++++++++++++----------------- dissect/target/loaders/dir.py | 3 ++ dissect/target/loaders/tar.py | 12 +------- 4 files changed, 27 insertions(+), 39 deletions(-) diff --git a/dissect/target/loader.py b/dissect/target/loader.py index cd5f541a0..7c93128e1 100644 --- a/dissect/target/loader.py +++ b/dissect/target/loader.py @@ -178,8 +178,8 @@ def open(item: Union[str, Path], *args, **kwargs) -> Loader: register("remote", "RemoteLoader") register("mqtt", "MQTTLoader") register("asdf", "AsdfLoader") -register("tar", "TarLoader") register("acquire", "AcquireLoader") +register("tar", "TarLoader") register("vmx", "VmxLoader") register("vmwarevm", "VmwarevmLoader") register("hyperv", "HyperVLoader") diff --git a/dissect/target/loaders/acquire.py b/dissect/target/loaders/acquire.py index e66e975e6..a5086246a 100644 --- a/dissect/target/loaders/acquire.py +++ b/dissect/target/loaders/acquire.py @@ -3,53 +3,48 @@ import logging import zipfile from pathlib import Path -from typing import Union -from dissect.target import target -from dissect.target.filesystem import TarFilesystem +from dissect.target.filesystems.tar import TarFilesystem +from dissect.target.filesystems.zip import ZipFilesystem from dissect.target.loader import Loader from dissect.target.loaders.dir import find_and_map_dirs +from dissect.target.target import Target log = logging.getLogger(__name__) FILESYSTEMS_ROOT = "fs" -def _get_root(path: Union[Path, str]): - if path.suffix == ".zip": - return zipfile.Path(path.open("rb")) - elif path.suffix == ".tar": - return TarFilesystem(path.open("rb")).path() - elif path.suffix in [".tar.gz", ".tgz"]: - log.warning( - f"Tar file {path!r} is compressed, which will affect performance. " - "Consider uncompressing the archive before passing the tar file to Dissect." - ) - return TarFilesystem(path.open("rb")).path() - else: - return path +def _get_root(path: Path): + if path.is_file(): + fh = path.open("rb") + if TarFilesystem._detect(fh): + return TarFilesystem(fh).path() + # test this + if ZipFilesystem._detect(fh): + return zipfile.Path(path.open("rb")) + + return None -class AcquireLoader(Loader): - """ - Load acquire collect files. - Supports both the zip and tar output - Only compatible with acquire >= 3.12 due to changed structure - """ - def __init__(self, path: Union[Path, str], **kwargs): +class AcquireLoader(Loader): + def __init__(self, path: Path, **kwargs): super().__init__(path) self.root = _get_root(path) @staticmethod def detect(path: Path) -> bool: - path = _get_root(path) + root = _get_root(path) + + if not root: + return False - return path.joinpath(FILESYSTEMS_ROOT).exists() + return root.joinpath(FILESYSTEMS_ROOT).exists() - def map(self, target: target.Target) -> None: + def map(self, target: Target) -> None: find_and_map_dirs( target, - self.root.joinpath(FILESYSTEMS_ROOT), + self.root.joinpath(FILESYSTEMS_ROOT) ) diff --git a/dissect/target/loaders/dir.py b/dissect/target/loaders/dir.py index 22a55e549..e305e0ffd 100644 --- a/dissect/target/loaders/dir.py +++ b/dissect/target/loaders/dir.py @@ -6,6 +6,7 @@ from typing import TYPE_CHECKING from dissect.target.filesystem import LayerFilesystem +from dissect.target.filesystems.tar import TarFilesystem from dissect.target.filesystems.dir import DirectoryFilesystem from dissect.target.filesystems.zip import ZipFilesystem from dissect.target.helpers import loaderutil @@ -60,6 +61,8 @@ def map_dirs(target: Target, dirs: list[Path | tuple[str, Path]], os_type: str, if isinstance(path, zipfile.Path): dfs = ZipFilesystem(path.root.fp, path.at, alt_separator=alt_separator, case_sensitive=case_sensitive) + elif hasattr(path, '_fs') and isinstance(path._fs, TarFilesystem): + dfs = TarFilesystem(path._fs.tar.fileobj, str(path), alt_separator=alt_separator, case_sensitive=case_sensitive) else: dfs = DirectoryFilesystem(path, alt_separator=alt_separator, case_sensitive=case_sensitive) diff --git a/dissect/target/loaders/tar.py b/dissect/target/loaders/tar.py index 576520b98..d06346f52 100644 --- a/dissect/target/loaders/tar.py +++ b/dissect/target/loaders/tar.py @@ -1,7 +1,6 @@ from __future__ import annotations import logging -import re import tarfile from pathlib import Path @@ -15,8 +14,6 @@ log = logging.getLogger(__name__) -ANON_FS_RE = re.compile(r"^fs[0-9]+$") - class TarLoader(Loader): """Load tar files.""" @@ -37,14 +34,7 @@ def __init__(self, path: Path | str, **kwargs): @staticmethod def detect(path: Path) -> bool: - if not path.name.lower().endswith((".tar", ".tar.gz", ".tgz")): - return False - - # Check that this is not an acquire collect, that is handled by AcquireLoader - tar = tarfile.open(fileobj=path.open("rb")) - acquire_members = [m for m in tar.getmembers() if m.name.startswith(("/fs/", "fs/", "/sysvol/", "sysvol/"))] - - return len(acquire_members) == 0 + return path.name.lower().endswith((".tar", ".tar.gz", ".tgz")) def is_compressed(self, path: Path | str) -> bool: return str(path).lower().endswith((".tar.gz", ".tgz")) From 29528b91386d8c4d22dfcd8defc09ea103a0ad57 Mon Sep 17 00:00:00 2001 From: Matthijs Vos Date: Fri, 3 Jan 2025 14:10:36 +0100 Subject: [PATCH 04/11] Fix comments --- dissect/target/loaders/dir.py | 1 + 1 file changed, 1 insertion(+) diff --git a/dissect/target/loaders/dir.py b/dissect/target/loaders/dir.py index 96f26f675..ce1118945 100644 --- a/dissect/target/loaders/dir.py +++ b/dissect/target/loaders/dir.py @@ -55,6 +55,7 @@ def map_dirs( os_type: The operating system type, used to determine how the filesystem should be mounted. dirfs: The filesystem class to use for directory filesystems. zipfs: The filesystem class to use for ZIP filesystems. + tarfs: The filesystem class to use for TAR filesystems. """ alt_separator = "" case_sensitive = True From a0c5d891ba03a9aea125af247dfba2d6daa66f71 Mon Sep 17 00:00:00 2001 From: Matthijs Vos Date: Fri, 3 Jan 2025 15:22:06 +0100 Subject: [PATCH 05/11] Add tests --- dissect/target/loaders/acquire.py | 10 ++- dissect/target/loaders/dir.py | 5 +- .../test-anon-filesystems.tar | 0 .../test-windows-fs-c-absolute.tar | 0 .../test-windows-fs-c-relative.tar | 0 .../{tar => acquire}/test-windows-fs-x.tar | 0 .../test-windows-sysvol-absolute.tar | 0 .../test-windows-sysvol-relative.tar | 0 .../uppercase_driveletter.tar | 0 tests/loaders/test_acquire.py | 70 +++++++++++++++++++ tests/loaders/test_tar.py | 68 +++--------------- 11 files changed, 91 insertions(+), 62 deletions(-) rename tests/_data/loaders/{tar => acquire}/test-anon-filesystems.tar (100%) rename tests/_data/loaders/{tar => acquire}/test-windows-fs-c-absolute.tar (100%) rename tests/_data/loaders/{tar => acquire}/test-windows-fs-c-relative.tar (100%) rename tests/_data/loaders/{tar => acquire}/test-windows-fs-x.tar (100%) rename tests/_data/loaders/{tar => acquire}/test-windows-sysvol-absolute.tar (100%) rename tests/_data/loaders/{tar => acquire}/test-windows-sysvol-relative.tar (100%) rename tests/_data/loaders/{tar => acquire}/uppercase_driveletter.tar (100%) create mode 100644 tests/loaders/test_acquire.py diff --git a/dissect/target/loaders/acquire.py b/dissect/target/loaders/acquire.py index a5086246a..bea0a81ea 100644 --- a/dissect/target/loaders/acquire.py +++ b/dissect/target/loaders/acquire.py @@ -13,6 +13,7 @@ log = logging.getLogger(__name__) FILESYSTEMS_ROOT = "fs" +FILESYSTEMS_LEGACY_ROOT = "sysvol" def _get_root(path: Path): @@ -41,10 +42,15 @@ def detect(path: Path) -> bool: if not root: return False - return root.joinpath(FILESYSTEMS_ROOT).exists() + return root.joinpath(FILESYSTEMS_ROOT).exists() or root.joinpath(FILESYSTEMS_LEGACY_ROOT).exists() def map(self, target: Target) -> None: + # Handle both root dir 'fs' and 'sysvol' (legacy) + fs_root = self.root + if fs_root.joinpath(FILESYSTEMS_ROOT).exists(): + fs_root = fs_root.joinpath(FILESYSTEMS_ROOT) + find_and_map_dirs( target, - self.root.joinpath(FILESYSTEMS_ROOT) + fs_root ) diff --git a/dissect/target/loaders/dir.py b/dissect/target/loaders/dir.py index ce1118945..aa938bbbc 100644 --- a/dissect/target/loaders/dir.py +++ b/dissect/target/loaders/dir.py @@ -135,7 +135,10 @@ def find_dirs(path: Path) -> tuple[str, list[Path]]: for p in path.iterdir(): # Look for directories like C or C: if p.is_dir() and (is_drive_letter_path(p) or p.name in ("sysvol", "$rootfs$")): - dirs.append(p) + if p.name == "sysvol": + dirs.append(('c', p)) + else: + dirs.append(p) if not os_type: os_type = os_type_from_path(p) diff --git a/tests/_data/loaders/tar/test-anon-filesystems.tar b/tests/_data/loaders/acquire/test-anon-filesystems.tar similarity index 100% rename from tests/_data/loaders/tar/test-anon-filesystems.tar rename to tests/_data/loaders/acquire/test-anon-filesystems.tar diff --git a/tests/_data/loaders/tar/test-windows-fs-c-absolute.tar b/tests/_data/loaders/acquire/test-windows-fs-c-absolute.tar similarity index 100% rename from tests/_data/loaders/tar/test-windows-fs-c-absolute.tar rename to tests/_data/loaders/acquire/test-windows-fs-c-absolute.tar diff --git a/tests/_data/loaders/tar/test-windows-fs-c-relative.tar b/tests/_data/loaders/acquire/test-windows-fs-c-relative.tar similarity index 100% rename from tests/_data/loaders/tar/test-windows-fs-c-relative.tar rename to tests/_data/loaders/acquire/test-windows-fs-c-relative.tar diff --git a/tests/_data/loaders/tar/test-windows-fs-x.tar b/tests/_data/loaders/acquire/test-windows-fs-x.tar similarity index 100% rename from tests/_data/loaders/tar/test-windows-fs-x.tar rename to tests/_data/loaders/acquire/test-windows-fs-x.tar diff --git a/tests/_data/loaders/tar/test-windows-sysvol-absolute.tar b/tests/_data/loaders/acquire/test-windows-sysvol-absolute.tar similarity index 100% rename from tests/_data/loaders/tar/test-windows-sysvol-absolute.tar rename to tests/_data/loaders/acquire/test-windows-sysvol-absolute.tar diff --git a/tests/_data/loaders/tar/test-windows-sysvol-relative.tar b/tests/_data/loaders/acquire/test-windows-sysvol-relative.tar similarity index 100% rename from tests/_data/loaders/tar/test-windows-sysvol-relative.tar rename to tests/_data/loaders/acquire/test-windows-sysvol-relative.tar diff --git a/tests/_data/loaders/tar/uppercase_driveletter.tar b/tests/_data/loaders/acquire/uppercase_driveletter.tar similarity index 100% rename from tests/_data/loaders/tar/uppercase_driveletter.tar rename to tests/_data/loaders/acquire/uppercase_driveletter.tar diff --git a/tests/loaders/test_acquire.py b/tests/loaders/test_acquire.py new file mode 100644 index 000000000..ac9bb532f --- /dev/null +++ b/tests/loaders/test_acquire.py @@ -0,0 +1,70 @@ +from pathlib import Path + +import pytest + +from dissect.target import Target +from dissect.target.loaders.acquire import AcquireLoader +from dissect.target.loaders.tar import TarLoader +from dissect.target.plugins.os.windows._os import WindowsPlugin +from tests._utils import absolute_path + + + +# def test_tar_sensitive_drive_letter(target_bare: Target) -> None: +# # TODO: determine if we need this test +# tar_file = absolute_path("_data/loaders/acquire/uppercase_driveletter.tar") +# +# loader = AcquireLoader(Path(tar_file)) +# loader.map(target_bare) +# +# # mounts = / and c: +# assert sorted(target_bare.fs.mounts.keys()) == ["/", "c:"] +# assert "C:" not in target_bare.fs.mounts.keys() +# +# # Initialize our own WindowsPlugin to override the detection +# target_bare._os_plugin = WindowsPlugin.create(target_bare, target_bare.fs.mounts["c:"]) +# target_bare._init_os() +# +# # sysvol is now added +# assert sorted(target_bare.fs.mounts.keys()) == ["/", "c:", "sysvol"] +# +# # WindowsPlugin sets the case sensitivity to False +# assert target_bare.fs.get("C:/test.file").open().read() == b"hello_world" +# assert target_bare.fs.get("c:/test.file").open().read() == b"hello_world" + + +@pytest.mark.parametrize( + "archive, expected_drive_letter", + [ + ("_data/loaders/acquire/test-windows-sysvol-absolute.tar", "c:"), # C: due to backwards compatibility + ("_data/loaders/acquire/test-windows-sysvol-relative.tar", "c:"), # C: due to backwards compatibility + ("_data/loaders/acquire/test-windows-fs-c-relative.tar", "c:"), + ("_data/loaders/acquire/test-windows-fs-c-absolute.tar", "c:"), + ("_data/loaders/acquire/test-windows-fs-x.tar", "x:"), + ("_data/loaders/acquire/test-windows-fs-c.zip", "c:"), + ], +) +def test_tar_loader_windows_sysvol_formats(target_default: Target, archive: str, expected_drive_letter: str) -> None: + loader = AcquireLoader(Path(absolute_path(archive))) + loader.map(target_default) + + assert WindowsPlugin.detect(target_default) + # NOTE: for the sysvol archives, this also tests the backwards compatibility + assert sorted(target_default.fs.mounts.keys()) == [expected_drive_letter] + assert target_default.fs.get(f"{expected_drive_letter}/Windows/System32/foo.txt") + + +# TODO check this one +# def test_tar_anonymous_filesystems(target_default: Target) -> None: +# tar_file = absolute_path("_data/loaders/tar/test-anon-filesystems.tar") +# +# loader = AcquireLoader(Path(tar_file)) +# loader.map(target_default) +# +# # mounts = $fs$/fs0, $fs$/fs1 and / +# assert len(target_default.fs.mounts) == 3 +# assert "$fs$/fs0" in target_default.fs.mounts.keys() +# assert "$fs$/fs1" in target_default.fs.mounts.keys() +# assert "/" in target_default.fs.mounts.keys() +# assert target_default.fs.get("$fs$/fs0/foo").open().read() == b"hello world\n" +# assert target_default.fs.get("$fs$/fs1/bar").open().read() == b"hello world\n" diff --git a/tests/loaders/test_tar.py b/tests/loaders/test_tar.py index a591b3c21..a3d6cc0cf 100644 --- a/tests/loaders/test_tar.py +++ b/tests/loaders/test_tar.py @@ -2,12 +2,18 @@ from dissect.target import Target from dissect.target.loaders.tar import TarLoader -from dissect.target.plugins.os.windows._os import WindowsPlugin from tests._utils import absolute_path -def test_tar_loader_compressed_tar_file(target_win: Target) -> None: - archive_path = absolute_path("_data/loaders/tar/test-archive.tar.gz") +@pytest.mark.parametrize( + "archive", + [ + "_data/loaders/tar/test-archive.tar", + "_data/loaders/tar/test-archive.tar.gz", + ], +) +def test_tar_loader_compressed_tar_file(target_win: Target, archive) -> None: + archive_path = absolute_path(archive) loader = TarLoader(archive_path) loader.map(target_win) @@ -20,28 +26,6 @@ def test_tar_loader_compressed_tar_file(target_win: Target) -> None: assert test_file.open().read() == b"test-value\n" -def test_tar_sensitive_drive_letter(target_bare: Target) -> None: - tar_file = absolute_path("_data/loaders/tar/uppercase_driveletter.tar") - - loader = TarLoader(tar_file) - loader.map(target_bare) - - # mounts = / and c: - assert sorted(target_bare.fs.mounts.keys()) == ["/", "c:"] - assert "C:" not in target_bare.fs.mounts.keys() - - # Initialize our own WindowsPlugin to override the detection - target_bare._os_plugin = WindowsPlugin.create(target_bare, target_bare.fs.mounts["c:"]) - target_bare._init_os() - - # sysvol is now added - assert sorted(target_bare.fs.mounts.keys()) == ["/", "c:", "sysvol"] - - # WindowsPlugin sets the case sensitivity to False - assert target_bare.fs.get("C:/test.file").open().read() == b"hello_world" - assert target_bare.fs.get("c:/test.file").open().read() == b"hello_world" - - def test_tar_loader_compressed_tar_file_with_empty_dir(target_unix: Target) -> None: archive_path = absolute_path("_data/loaders/tar/test-archive-empty-folder.tgz") loader = TarLoader(archive_path) @@ -55,37 +39,3 @@ def test_tar_loader_compressed_tar_file_with_empty_dir(target_unix: Target) -> N empty_folder = target_unix.fs.path("test/empty_dir") assert empty_folder.exists() assert empty_folder.is_dir() - - -@pytest.mark.parametrize( - "archive, expected_drive_letter", - [ - ("_data/loaders/tar/test-windows-sysvol-absolute.tar", "c:"), # C: due to backwards compatibility - ("_data/loaders/tar/test-windows-sysvol-relative.tar", "c:"), # C: due to backwards compatibility - ("_data/loaders/tar/test-windows-fs-c-relative.tar", "c:"), - ("_data/loaders/tar/test-windows-fs-c-absolute.tar", "c:"), - ("_data/loaders/tar/test-windows-fs-x.tar", "x:"), - ], -) -def test_tar_loader_windows_sysvol_formats(target_default: Target, archive: str, expected_drive_letter: str) -> None: - loader = TarLoader(absolute_path(archive)) - loader.map(target_default) - - assert WindowsPlugin.detect(target_default) - # NOTE: for the sysvol archives, this also tests the backwards compatibility - assert sorted(target_default.fs.mounts.keys()) == [expected_drive_letter] - - -def test_tar_anonymous_filesystems(target_default: Target) -> None: - tar_file = absolute_path("_data/loaders/tar/test-anon-filesystems.tar") - - loader = TarLoader(tar_file) - loader.map(target_default) - - # mounts = $fs$/fs0, $fs$/fs1 and / - assert len(target_default.fs.mounts) == 3 - assert "$fs$/fs0" in target_default.fs.mounts.keys() - assert "$fs$/fs1" in target_default.fs.mounts.keys() - assert "/" in target_default.fs.mounts.keys() - assert target_default.fs.get("$fs$/fs0/foo").open().read() == b"hello world\n" - assert target_default.fs.get("$fs$/fs1/bar").open().read() == b"hello world\n" From 1ba45902dbcb717d11e0394546ff404a9506346b Mon Sep 17 00:00:00 2001 From: Matthijs Vos Date: Wed, 8 Jan 2025 09:26:15 +0100 Subject: [PATCH 06/11] Update tests/loaders/test_acquire.py Co-authored-by: Stefan de Reuver <9864602+Horofic@users.noreply.github.com> --- tests/loaders/test_acquire.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/loaders/test_acquire.py b/tests/loaders/test_acquire.py index ac9bb532f..226e0a071 100644 --- a/tests/loaders/test_acquire.py +++ b/tests/loaders/test_acquire.py @@ -4,7 +4,6 @@ from dissect.target import Target from dissect.target.loaders.acquire import AcquireLoader -from dissect.target.loaders.tar import TarLoader from dissect.target.plugins.os.windows._os import WindowsPlugin from tests._utils import absolute_path From 6fd58697ac5701def652e30f2b37a3ad7691f17e Mon Sep 17 00:00:00 2001 From: Matthijs Vos Date: Wed, 8 Jan 2025 09:26:28 +0100 Subject: [PATCH 07/11] Update dissect/target/loaders/dir.py Co-authored-by: Stefan de Reuver <9864602+Horofic@users.noreply.github.com> --- dissect/target/loaders/dir.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dissect/target/loaders/dir.py b/dissect/target/loaders/dir.py index aa938bbbc..05396b774 100644 --- a/dissect/target/loaders/dir.py +++ b/dissect/target/loaders/dir.py @@ -73,8 +73,8 @@ def map_dirs( if isinstance(path, zipfile.Path): dfs = zipfs(path.root.fp, path.at, alt_separator=alt_separator, case_sensitive=case_sensitive) - elif hasattr(path, '_fs') and isinstance(path._fs, TarFilesystem): - dfs = tarfs(path._fs.tar.fileobj, str(path), alt_separator=alt_separator, case_sensitive=case_sensitive) + elif hasattr(path, "_fs") and isinstance(path._fs, TarFilesystem): + dfs = tarfs(path._fs.tar.fileobj, str(path), alt_separator=alt_separator, case_sensitive=case_sensitive) else: dfs = dirfs(path, alt_separator=alt_separator, case_sensitive=case_sensitive) From eddc1616490b3febf2b4044ec615dd5ceb6668e0 Mon Sep 17 00:00:00 2001 From: Matthijs Vos Date: Wed, 8 Jan 2025 09:33:24 +0100 Subject: [PATCH 08/11] Apply suggestions from code review Co-authored-by: Stefan de Reuver <9864602+Horofic@users.noreply.github.com> --- dissect/target/loaders/acquire.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/dissect/target/loaders/acquire.py b/dissect/target/loaders/acquire.py index bea0a81ea..0345ed4ac 100644 --- a/dissect/target/loaders/acquire.py +++ b/dissect/target/loaders/acquire.py @@ -16,13 +16,12 @@ FILESYSTEMS_LEGACY_ROOT = "sysvol" -def _get_root(path: Path): +def _get_root(path: Path) -> Path | None: if path.is_file(): fh = path.open("rb") if TarFilesystem._detect(fh): return TarFilesystem(fh).path() - # test this if ZipFilesystem._detect(fh): return zipfile.Path(path.open("rb")) From 5ae9fec14df7c564bbb0d7d39681148bba2648f4 Mon Sep 17 00:00:00 2001 From: Matthijs Vos Date: Wed, 8 Jan 2025 09:23:47 +0100 Subject: [PATCH 09/11] Add ZIP acquire test data --- tests/_data/loaders/acquire/test-windows-fs-c.zip | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 tests/_data/loaders/acquire/test-windows-fs-c.zip diff --git a/tests/_data/loaders/acquire/test-windows-fs-c.zip b/tests/_data/loaders/acquire/test-windows-fs-c.zip new file mode 100644 index 000000000..9e1300062 --- /dev/null +++ b/tests/_data/loaders/acquire/test-windows-fs-c.zip @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:fa791d1d27f0888e1d799ce0e818a1b21f9372bbab99e3ab818d3ee9f2bd3c1e +size 2418 From 00a606b90e0d53e92aa8ab3dde24df70d1dc19b3 Mon Sep 17 00:00:00 2001 From: Matthijs Vos Date: Wed, 8 Jan 2025 12:43:47 +0100 Subject: [PATCH 10/11] Add support for anon filesystems --- dissect/target/loaders/acquire.py | 2 +- dissect/target/loaders/dir.py | 18 ++++++-- tests/loaders/test_acquire.py | 70 +++++++++++++++---------------- 3 files changed, 49 insertions(+), 41 deletions(-) diff --git a/dissect/target/loaders/acquire.py b/dissect/target/loaders/acquire.py index 0345ed4ac..8a73ad743 100644 --- a/dissect/target/loaders/acquire.py +++ b/dissect/target/loaders/acquire.py @@ -16,7 +16,7 @@ FILESYSTEMS_LEGACY_ROOT = "sysvol" -def _get_root(path: Path) -> Path | None: +def _get_root(path: Path) -> Path | None: if path.is_file(): fh = path.open("rb") if TarFilesystem._detect(fh): diff --git a/dissect/target/loaders/dir.py b/dissect/target/loaders/dir.py index 05396b774..4366ce1e8 100644 --- a/dissect/target/loaders/dir.py +++ b/dissect/target/loaders/dir.py @@ -1,5 +1,6 @@ from __future__ import annotations +import re import zipfile from collections import defaultdict from pathlib import Path @@ -17,7 +18,7 @@ from dissect.target import Target PREFIXES = ["", "fs"] - +ANON_FS_RE = re.compile(r"^fs[0-9]+$") class DirLoader(Loader): """Load a directory as a filesystem.""" @@ -91,7 +92,10 @@ def map_dirs( vfs = dfs[0] fs_to_add.append(vfs) - target.fs.mount(drive_letter.lower() + ":", vfs) + mount_letter = drive_letter.lower() + if mount_letter != "$fs$": + mount_letter += ":" + target.fs.mount(mount_letter, vfs) else: fs_to_add.extend(dfs) @@ -138,12 +142,18 @@ def find_dirs(path: Path) -> tuple[str, list[Path]]: if p.name == "sysvol": dirs.append(('c', p)) else: - dirs.append(p) + dirs.append((p.name[0], p)) if not os_type: os_type = os_type_from_path(p) - if not os_type: + if p.name == "$fs$": + dirs.append(('$fs$', p)) + for anon_fs in p.iterdir(): + if ANON_FS_RE.match(anon_fs.name): + dirs.append(anon_fs) + + if len(dirs) == 0: os_type = os_type_from_path(path) dirs = [path] diff --git a/tests/loaders/test_acquire.py b/tests/loaders/test_acquire.py index 226e0a071..7e3446e37 100644 --- a/tests/loaders/test_acquire.py +++ b/tests/loaders/test_acquire.py @@ -4,32 +4,32 @@ from dissect.target import Target from dissect.target.loaders.acquire import AcquireLoader +from dissect.target.loaders.tar import TarLoader from dissect.target.plugins.os.windows._os import WindowsPlugin from tests._utils import absolute_path -# def test_tar_sensitive_drive_letter(target_bare: Target) -> None: -# # TODO: determine if we need this test -# tar_file = absolute_path("_data/loaders/acquire/uppercase_driveletter.tar") -# -# loader = AcquireLoader(Path(tar_file)) -# loader.map(target_bare) -# -# # mounts = / and c: -# assert sorted(target_bare.fs.mounts.keys()) == ["/", "c:"] -# assert "C:" not in target_bare.fs.mounts.keys() -# -# # Initialize our own WindowsPlugin to override the detection -# target_bare._os_plugin = WindowsPlugin.create(target_bare, target_bare.fs.mounts["c:"]) -# target_bare._init_os() -# -# # sysvol is now added -# assert sorted(target_bare.fs.mounts.keys()) == ["/", "c:", "sysvol"] -# -# # WindowsPlugin sets the case sensitivity to False -# assert target_bare.fs.get("C:/test.file").open().read() == b"hello_world" -# assert target_bare.fs.get("c:/test.file").open().read() == b"hello_world" +def test_tar_sensitive_drive_letter(target_bare: Target) -> None: + tar_file = absolute_path("_data/loaders/acquire/uppercase_driveletter.tar") + + loader = AcquireLoader(Path(tar_file)) + assert loader.detect(Path(tar_file)) + loader.map(target_bare) + + # mounts = c: + assert sorted(target_bare.fs.mounts.keys()) == ["c:"] + + # Initialize our own WindowsPlugin to override the detection + target_bare._os_plugin = WindowsPlugin.create(target_bare, target_bare.fs.mounts["c:"]) + target_bare._init_os() + + # sysvol is now added + assert sorted(target_bare.fs.mounts.keys()) == ["c:", "sysvol"] + + # WindowsPlugin sets the case sensitivity to False + assert target_bare.fs.get("C:/test.file").open().read() == b"hello_world" + assert target_bare.fs.get("c:/test.file").open().read() == b"hello_world" @pytest.mark.parametrize( @@ -44,7 +44,10 @@ ], ) def test_tar_loader_windows_sysvol_formats(target_default: Target, archive: str, expected_drive_letter: str) -> None: - loader = AcquireLoader(Path(absolute_path(archive))) + path = Path(absolute_path(archive)) + assert AcquireLoader.detect(path) + + loader = AcquireLoader(path) loader.map(target_default) assert WindowsPlugin.detect(target_default) @@ -53,17 +56,12 @@ def test_tar_loader_windows_sysvol_formats(target_default: Target, archive: str, assert target_default.fs.get(f"{expected_drive_letter}/Windows/System32/foo.txt") -# TODO check this one -# def test_tar_anonymous_filesystems(target_default: Target) -> None: -# tar_file = absolute_path("_data/loaders/tar/test-anon-filesystems.tar") -# -# loader = AcquireLoader(Path(tar_file)) -# loader.map(target_default) -# -# # mounts = $fs$/fs0, $fs$/fs1 and / -# assert len(target_default.fs.mounts) == 3 -# assert "$fs$/fs0" in target_default.fs.mounts.keys() -# assert "$fs$/fs1" in target_default.fs.mounts.keys() -# assert "/" in target_default.fs.mounts.keys() -# assert target_default.fs.get("$fs$/fs0/foo").open().read() == b"hello world\n" -# assert target_default.fs.get("$fs$/fs1/bar").open().read() == b"hello world\n" +def test_tar_anonymous_filesystems(target_default: Target) -> None: + tar_file = Path(absolute_path("_data/loaders/acquire/test-anon-filesystems.tar")) + assert AcquireLoader.detect(tar_file) + + loader = AcquireLoader(tar_file) + loader.map(target_default) + + assert target_default.fs.get("$fs$/fs0/foo").open().read() == b"hello world\n" + assert target_default.fs.get("$fs$/fs1/bar").open().read() == b"hello world\n" From 4072e20532e62e8513a9689ef2b12e74ca498858 Mon Sep 17 00:00:00 2001 From: Matthijs Vos Date: Wed, 29 Jan 2025 11:44:04 +0100 Subject: [PATCH 11/11] Apply suggestions from code review Co-authored-by: Stefan de Reuver <9864602+Horofic@users.noreply.github.com> --- dissect/target/loaders/acquire.py | 5 +---- dissect/target/loaders/dir.py | 3 ++- tests/loaders/test_acquire.py | 2 -- 3 files changed, 3 insertions(+), 7 deletions(-) diff --git a/dissect/target/loaders/acquire.py b/dissect/target/loaders/acquire.py index 8a73ad743..04bcb1a47 100644 --- a/dissect/target/loaders/acquire.py +++ b/dissect/target/loaders/acquire.py @@ -49,7 +49,4 @@ def map(self, target: Target) -> None: if fs_root.joinpath(FILESYSTEMS_ROOT).exists(): fs_root = fs_root.joinpath(FILESYSTEMS_ROOT) - find_and_map_dirs( - target, - fs_root - ) + find_and_map_dirs(target, fs_root) diff --git a/dissect/target/loaders/dir.py b/dissect/target/loaders/dir.py index 4366ce1e8..ff8621b21 100644 --- a/dissect/target/loaders/dir.py +++ b/dissect/target/loaders/dir.py @@ -7,8 +7,8 @@ from typing import TYPE_CHECKING from dissect.target.filesystem import LayerFilesystem -from dissect.target.filesystems.tar import TarFilesystem from dissect.target.filesystems.dir import DirectoryFilesystem +from dissect.target.filesystems.tar import TarFilesystem from dissect.target.filesystems.zip import ZipFilesystem from dissect.target.helpers import loaderutil from dissect.target.loader import Loader @@ -20,6 +20,7 @@ PREFIXES = ["", "fs"] ANON_FS_RE = re.compile(r"^fs[0-9]+$") + class DirLoader(Loader): """Load a directory as a filesystem.""" diff --git a/tests/loaders/test_acquire.py b/tests/loaders/test_acquire.py index 7e3446e37..6e6f51a06 100644 --- a/tests/loaders/test_acquire.py +++ b/tests/loaders/test_acquire.py @@ -4,12 +4,10 @@ from dissect.target import Target from dissect.target.loaders.acquire import AcquireLoader -from dissect.target.loaders.tar import TarLoader from dissect.target.plugins.os.windows._os import WindowsPlugin from tests._utils import absolute_path - def test_tar_sensitive_drive_letter(target_bare: Target) -> None: tar_file = absolute_path("_data/loaders/acquire/uppercase_driveletter.tar")