Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Acquire collection loader #935

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
10 changes: 8 additions & 2 deletions dissect/target/loaders/acquire.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
log = logging.getLogger(__name__)

FILESYSTEMS_ROOT = "fs"
FILESYSTEMS_LEGACY_ROOT = "sysvol"


def _get_root(path: Path):
Matthijsy marked this conversation as resolved.
Show resolved Hide resolved
Expand Down Expand Up @@ -41,10 +42,15 @@ def detect(path: Path) -> bool:
if not root:
return False

return root.joinpath(FILESYSTEMS_ROOT).exists()
return root.joinpath(FILESYSTEMS_ROOT).exists() or root.joinpath(FILESYSTEMS_LEGACY_ROOT).exists()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will be very slow on large tar files, where there will be a double performance penalty. First the entire tar file will be parsed just for this check, then if it exists, it will be parsed again in __init__. However, if it doesn't exist, it will still be parsed again in the actual tar loader.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I agree, but is there a way to prevent that? We do need to check whether the fs directory exists in the file, since otherwise the tar is not an Acquire collection and thus needs to be handled by the TarLoader.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"Ideally" (not really, since it's only for performance and the code path itself will be more confusing) it's a subpath in the zip and tar loaders. So you piggyback on the detection logic of those, and upon mapping you do a fast check to see if you need to divert to the Acquire logic (which could exist in loaders/acquire.py).

At least, that's the "best" idea I can come up with right now.


def map(self, target: Target) -> None:
# Handle both root dir 'fs' and 'sysvol' (legacy)
fs_root = self.root
if fs_root.joinpath(FILESYSTEMS_ROOT).exists():
fs_root = fs_root.joinpath(FILESYSTEMS_ROOT)

find_and_map_dirs(
target,
self.root.joinpath(FILESYSTEMS_ROOT)
fs_root
)
5 changes: 4 additions & 1 deletion dissect/target/loaders/dir.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,10 @@ def find_dirs(path: Path) -> tuple[str, list[Path]]:
for p in path.iterdir():
# Look for directories like C or C:
if p.is_dir() and (is_drive_letter_path(p) or p.name in ("sysvol", "$rootfs$")):
dirs.append(p)
if p.name == "sysvol":
dirs.append(('c', p))
else:
dirs.append(p)

if not os_type:
os_type = os_type_from_path(p)
Expand Down
70 changes: 70 additions & 0 deletions tests/loaders/test_acquire.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
from pathlib import Path

import pytest

from dissect.target import Target
from dissect.target.loaders.acquire import AcquireLoader
from dissect.target.loaders.tar import TarLoader
from dissect.target.plugins.os.windows._os import WindowsPlugin
from tests._utils import absolute_path
Matthijsy marked this conversation as resolved.
Show resolved Hide resolved



Matthijsy marked this conversation as resolved.
Show resolved Hide resolved
# def test_tar_sensitive_drive_letter(target_bare: Target) -> None:
# # TODO: determine if we need this test
# tar_file = absolute_path("_data/loaders/acquire/uppercase_driveletter.tar")
#
# loader = AcquireLoader(Path(tar_file))
# loader.map(target_bare)
#
# # mounts = / and c:
# assert sorted(target_bare.fs.mounts.keys()) == ["/", "c:"]
# assert "C:" not in target_bare.fs.mounts.keys()
#
# # Initialize our own WindowsPlugin to override the detection
# target_bare._os_plugin = WindowsPlugin.create(target_bare, target_bare.fs.mounts["c:"])
# target_bare._init_os()
#
# # sysvol is now added
# assert sorted(target_bare.fs.mounts.keys()) == ["/", "c:", "sysvol"]
#
# # WindowsPlugin sets the case sensitivity to False
# assert target_bare.fs.get("C:/test.file").open().read() == b"hello_world"
# assert target_bare.fs.get("c:/test.file").open().read() == b"hello_world"
Horofic marked this conversation as resolved.
Show resolved Hide resolved


@pytest.mark.parametrize(
    "archive, expected_drive_letter",
    [
        # C: due to backwards compatibility
        ("_data/loaders/acquire/test-windows-sysvol-absolute.tar", "c:"),
        # C: due to backwards compatibility
        ("_data/loaders/acquire/test-windows-sysvol-relative.tar", "c:"),
        ("_data/loaders/acquire/test-windows-fs-c-relative.tar", "c:"),
        ("_data/loaders/acquire/test-windows-fs-c-absolute.tar", "c:"),
        ("_data/loaders/acquire/test-windows-fs-x.tar", "x:"),
        ("_data/loaders/acquire/test-windows-fs-c.zip", "c:"),
    ],
)
def test_tar_loader_windows_sysvol_formats(target_default: Target, archive: str, expected_drive_letter: str) -> None:
    """Map each archive with the AcquireLoader and check the resulting Windows mount.

    Every parametrized archive must be detected as Windows, expose exactly one
    mount named after the expected drive letter, and contain
    ``Windows/System32/foo.txt`` below that mount.
    """
    archive_path = Path(absolute_path(archive))
    AcquireLoader(archive_path).map(target_default)

    assert WindowsPlugin.detect(target_default)

    # NOTE: for the sysvol archives, this also tests the backwards compatibility
    mount_names = sorted(target_default.fs.mounts.keys())
    assert mount_names == [expected_drive_letter]

    probe_path = f"{expected_drive_letter}/Windows/System32/foo.txt"
    assert target_default.fs.get(probe_path)


# TODO check this one
# def test_tar_anonymous_filesystems(target_default: Target) -> None:
# tar_file = absolute_path("_data/loaders/tar/test-anon-filesystems.tar")
#
# loader = AcquireLoader(Path(tar_file))
# loader.map(target_default)
#
# # mounts = $fs$/fs0, $fs$/fs1 and /
# assert len(target_default.fs.mounts) == 3
# assert "$fs$/fs0" in target_default.fs.mounts.keys()
# assert "$fs$/fs1" in target_default.fs.mounts.keys()
# assert "/" in target_default.fs.mounts.keys()
# assert target_default.fs.get("$fs$/fs0/foo").open().read() == b"hello world\n"
# assert target_default.fs.get("$fs$/fs1/bar").open().read() == b"hello world\n"
Horofic marked this conversation as resolved.
Show resolved Hide resolved
68 changes: 9 additions & 59 deletions tests/loaders/test_tar.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,18 @@

from dissect.target import Target
from dissect.target.loaders.tar import TarLoader
from dissect.target.plugins.os.windows._os import WindowsPlugin
from tests._utils import absolute_path


def test_tar_loader_compressed_tar_file(target_win: Target) -> None:
archive_path = absolute_path("_data/loaders/tar/test-archive.tar.gz")
@pytest.mark.parametrize(
"archive",
[
"_data/loaders/tar/test-archive.tar",
"_data/loaders/tar/test-archive.tar.gz",
],
)
def test_tar_loader_compressed_tar_file(target_win: Target, archive) -> None:
archive_path = absolute_path(archive)

loader = TarLoader(archive_path)
loader.map(target_win)
Expand All @@ -20,28 +26,6 @@ def test_tar_loader_compressed_tar_file(target_win: Target) -> None:
assert test_file.open().read() == b"test-value\n"


def test_tar_sensitive_drive_letter(target_bare: Target) -> None:
    """Check drive-letter casing when mapping the ``uppercase_driveletter.tar`` archive.

    After mapping, only ``/`` and a lower-case ``c:`` mount are expected. Once a
    WindowsPlugin is forced onto the target, ``sysvol`` must appear as well and
    the test file must be readable through both ``C:`` and ``c:``.
    """
    archive_path = absolute_path("_data/loaders/tar/uppercase_driveletter.tar")
    TarLoader(archive_path).map(target_bare)

    # mounts = / and c:
    mounts_after_map = target_bare.fs.mounts.keys()
    assert sorted(mounts_after_map) == ["/", "c:"]
    assert "C:" not in target_bare.fs.mounts.keys()

    # Initialize our own WindowsPlugin to override the detection
    windows_plugin = WindowsPlugin.create(target_bare, target_bare.fs.mounts["c:"])
    target_bare._os_plugin = windows_plugin
    target_bare._init_os()

    # sysvol is now added
    mounts_after_init = target_bare.fs.mounts.keys()
    assert sorted(mounts_after_init) == ["/", "c:", "sysvol"]

    # WindowsPlugin sets the case sensitivity to False
    for file_path in ("C:/test.file", "c:/test.file"):
        assert target_bare.fs.get(file_path).open().read() == b"hello_world"


def test_tar_loader_compressed_tar_file_with_empty_dir(target_unix: Target) -> None:
archive_path = absolute_path("_data/loaders/tar/test-archive-empty-folder.tgz")
loader = TarLoader(archive_path)
Expand All @@ -55,37 +39,3 @@ def test_tar_loader_compressed_tar_file_with_empty_dir(target_unix: Target) -> N
empty_folder = target_unix.fs.path("test/empty_dir")
assert empty_folder.exists()
assert empty_folder.is_dir()


@pytest.mark.parametrize(
    "archive, expected_drive_letter",
    [
        # C: due to backwards compatibility
        ("_data/loaders/tar/test-windows-sysvol-absolute.tar", "c:"),
        # C: due to backwards compatibility
        ("_data/loaders/tar/test-windows-sysvol-relative.tar", "c:"),
        ("_data/loaders/tar/test-windows-fs-c-relative.tar", "c:"),
        ("_data/loaders/tar/test-windows-fs-c-absolute.tar", "c:"),
        ("_data/loaders/tar/test-windows-fs-x.tar", "x:"),
    ],
)
def test_tar_loader_windows_sysvol_formats(target_default: Target, archive: str, expected_drive_letter: str) -> None:
    """Map each archive with the TarLoader and check the resulting Windows mount.

    Every parametrized archive must be detected as Windows and expose exactly
    one mount named after the expected drive letter.
    """
    archive_path = absolute_path(archive)
    TarLoader(archive_path).map(target_default)

    assert WindowsPlugin.detect(target_default)

    # NOTE: for the sysvol archives, this also tests the backwards compatibility
    mount_names = sorted(target_default.fs.mounts.keys())
    assert mount_names == [expected_drive_letter]


def test_tar_anonymous_filesystems(target_default: Target) -> None:
    """Check that ``test-anon-filesystems.tar`` maps to ``$fs$/fs0``, ``$fs$/fs1`` and ``/``.

    Each of the two ``$fs$`` mounts must also serve its expected file content.
    """
    archive_path = absolute_path("_data/loaders/tar/test-anon-filesystems.tar")
    TarLoader(archive_path).map(target_default)

    # mounts = $fs$/fs0, $fs$/fs1 and /
    assert len(target_default.fs.mounts) == 3
    for mount_name in ("$fs$/fs0", "$fs$/fs1", "/"):
        assert mount_name in target_default.fs.mounts.keys()

    for file_path in ("$fs$/fs0/foo", "$fs$/fs1/bar"):
        assert target_default.fs.get(file_path).open().read() == b"hello world\n"
Loading