Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Acquire collection loader #935

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
1 change: 1 addition & 0 deletions dissect/target/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ def open(item: Union[str, Path], *args, **kwargs) -> Loader:
register("remote", "RemoteLoader")
register("mqtt", "MQTTLoader")
register("asdf", "AsdfLoader")
register("acquire", "AcquireLoader")
register("tar", "TarLoader")
register("vmx", "VmxLoader")
register("vmwarevm", "VmwarevmLoader")
Expand Down
55 changes: 55 additions & 0 deletions dissect/target/loaders/acquire.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
from __future__ import annotations

import logging
import zipfile
from pathlib import Path

from dissect.target.filesystems.tar import TarFilesystem
from dissect.target.filesystems.zip import ZipFilesystem
from dissect.target.loader import Loader
from dissect.target.loaders.dir import find_and_map_dirs
from dissect.target.target import Target

log = logging.getLogger(__name__)

# Directory in the archive root that holds the collected filesystems.
FILESYSTEMS_ROOT = "fs"
# Legacy layout: the collected filesystem lives in a "sysvol" directory in the archive root.
FILESYSTEMS_LEGACY_ROOT = "sysvol"


def _get_root(path: Path) -> Path | None:
    """Return a path-like object for the root of the archive at ``path``.

    Args:
        path: Location of a candidate tar or zip archive on disk.

    Returns:
        The root path of a ``TarFilesystem`` for tar archives, a ``zipfile.Path``
        for zip archives, or ``None`` when ``path`` is not a regular file or is
        neither of those formats.
    """
    if not path.is_file():
        return None

    fh = path.open("rb")
    root = None
    try:
        if TarFilesystem._detect(fh):
            root = TarFilesystem(fh).path()
        elif ZipFilesystem._detect(fh):
            # Reuse the already opened handle instead of opening (and leaking) a second one.
            root = zipfile.Path(fh)
    finally:
        # The handle is only handed off when a root was created; in every other
        # case (no format matched, or detection raised) it must be closed here.
        if root is None:
            fh.close()

    return root


class AcquireLoader(Loader):
    """Loader for acquire collections stored in a tar or zip archive."""

    def __init__(self, path: Path, **kwargs):
        super().__init__(path)

        # Root of the opened archive, or None when ``path`` is not a tar/zip file.
        self.root = _get_root(path)

    @staticmethod
    def detect(path: Path) -> bool:
        """Return whether ``path`` is an archive containing an ``fs`` or ``sysvol`` root directory."""
        archive_root = _get_root(path)
        if not archive_root:
            return False

        # Current layout is checked first, then the legacy one.
        candidates = (FILESYSTEMS_ROOT, FILESYSTEMS_LEGACY_ROOT)
        return any(archive_root.joinpath(name).exists() for name in candidates)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will be very slow on large tar files, where there will be a double performance penalty. First the entire tar file will be parsed just for this check, then if it exists, it will be parsed again in __init__. However, if it doesn't exist, it will still be parsed again in the actual tar loader.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I agree, but is there a way to prevent that? We do need to check whether the fs directory exists in the file, since otherwise the tar is not an Acquire collection and thus needs to be handled by the TarLoader.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"Ideally" (not really, since it's only for performance and the code path itself will be more confusing) it's a subpath in the zip and tar loaders. So you piggyback on the detection logic of those, and upon mapping you do a fast check to see if you need to divert to the Acquire logic (which could exist in loaders/acquire.py).

At least, that's the "best" idea I can come up with right now.


def map(self, target: Target) -> None:
    """Map the filesystems found in the archive into ``target``.

    When the archive root contains an ``fs`` directory, that directory is
    searched; otherwise the archive root itself is searched, which covers
    the legacy ``sysvol`` layout.
    """
    current_layout_root = self.root.joinpath(FILESYSTEMS_ROOT)
    fs_root = current_layout_root if current_layout_root.exists() else self.root

    find_and_map_dirs(target, fs_root)
Matthijsy marked this conversation as resolved.
Show resolved Hide resolved
26 changes: 22 additions & 4 deletions dissect/target/loaders/dir.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
from __future__ import annotations

import re
import zipfile
from collections import defaultdict
from pathlib import Path
from typing import TYPE_CHECKING

from dissect.target.filesystem import LayerFilesystem
from dissect.target.filesystems.tar import TarFilesystem
from dissect.target.filesystems.dir import DirectoryFilesystem
from dissect.target.filesystems.zip import ZipFilesystem
from dissect.target.helpers import loaderutil
Matthijsy marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -16,7 +18,7 @@
from dissect.target import Target

PREFIXES = ["", "fs"]

# Matches directory names of anonymous filesystems, e.g. "fs0" or "fs12".
ANON_FS_RE = re.compile(r"^fs[0-9]+$")

Matthijsy marked this conversation as resolved.
Show resolved Hide resolved
class DirLoader(Loader):
"""Load a directory as a filesystem."""
Expand All @@ -43,6 +45,7 @@ def map_dirs(
*,
dirfs: type[DirectoryFilesystem] = DirectoryFilesystem,
zipfs: type[ZipFilesystem] = ZipFilesystem,
tarfs: type[TarFilesystem] = TarFilesystem,
**kwargs,
) -> None:
"""Map directories as filesystems into the given target.
Expand All @@ -53,6 +56,7 @@ def map_dirs(
os_type: The operating system type, used to determine how the filesystem should be mounted.
dirfs: The filesystem class to use for directory filesystems.
zipfs: The filesystem class to use for ZIP filesystems.
tarfs: The filesystem class to use for TAR filesystems.
"""
alt_separator = ""
case_sensitive = True
Expand All @@ -70,6 +74,8 @@ def map_dirs(

if isinstance(path, zipfile.Path):
dfs = zipfs(path.root.fp, path.at, alt_separator=alt_separator, case_sensitive=case_sensitive)
elif hasattr(path, "_fs") and isinstance(path._fs, TarFilesystem):
dfs = tarfs(path._fs.tar.fileobj, str(path), alt_separator=alt_separator, case_sensitive=case_sensitive)
else:
dfs = dirfs(path, alt_separator=alt_separator, case_sensitive=case_sensitive)

Expand All @@ -86,7 +92,10 @@ def map_dirs(
vfs = dfs[0]

fs_to_add.append(vfs)
target.fs.mount(drive_letter.lower() + ":", vfs)
mount_letter = drive_letter.lower()
if mount_letter != "$fs$":
mount_letter += ":"
target.fs.mount(mount_letter, vfs)
else:
fs_to_add.extend(dfs)

Expand Down Expand Up @@ -130,12 +139,21 @@ def find_dirs(path: Path) -> tuple[str, list[Path]]:
for p in path.iterdir():
# Look for directories like C or C:
if p.is_dir() and (is_drive_letter_path(p) or p.name in ("sysvol", "$rootfs$")):
dirs.append(p)
if p.name == "sysvol":
dirs.append(('c', p))
else:
dirs.append((p.name[0], p))
Comment on lines +143 to +146
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This changes what the function returns. Not bad necessarily, but it might break some other loaders that use this function. So those would need to be changed.


if not os_type:
os_type = os_type_from_path(p)

if not os_type:
if p.name == "$fs$":
dirs.append(('$fs$', p))
for anon_fs in p.iterdir():
if ANON_FS_RE.match(anon_fs.name):
dirs.append(anon_fs)

if len(dirs) == 0:
os_type = os_type_from_path(path)
dirs = [path]

Expand Down
59 changes: 8 additions & 51 deletions dissect/target/loaders/tar.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
from __future__ import annotations

import logging
import re
import tarfile
from pathlib import Path

Expand All @@ -16,9 +15,6 @@
log = logging.getLogger(__name__)


ANON_FS_RE = re.compile(r"^fs[0-9]+$")
Matthijsy marked this conversation as resolved.
Show resolved Hide resolved


class TarLoader(Loader):
"""Load tar files."""

Expand Down Expand Up @@ -50,52 +46,14 @@ def map(self, target: target.Target) -> None:
if member.name == ".":
continue

if not member.name.startswith(("/fs/", "fs/", "/sysvol/", "sysvol/")):
# Not an acquire tar
if "/" not in volumes:
vol = filesystem.VirtualFilesystem(case_sensitive=True)
vol.tar = self.tar
volumes["/"] = vol
target.filesystems.add(vol)

volume = volumes["/"]
mname = member.name
else:
if member.name.startswith(("/fs/", "fs/")):
# Current acquire
parts = member.name.replace("fs/", "").split("/")
if parts[0] == "":
parts.pop(0)
else:
# Legacy acquire
parts = member.name.lstrip("/").split("/")
volume_name = parts[0].lower()

# NOTE: older versions of acquire would write to "sysvol" instead of a driver letter
# Figuring out the sysvol from the drive letters is easier than the drive letter from "sysvol",
# so this was swapped in acquire 3.12. Now we map all volumes to a drive letter and let the
# Windows OS plugin figure out which is the sysvol
# For backwards compatibility we're forced to keep this check, and assume that "c:" is our sysvol
if volume_name == "sysvol":
volume_name = "c:"

if volume_name == "$fs$":
if len(parts) == 1:
# The fs/$fs$ entry is ignored, only the directories below it are processed.
continue
fs_name = parts[1]
if ANON_FS_RE.match(fs_name):
parts.pop(0)
volume_name = f"{volume_name}/{fs_name}"

if volume_name not in volumes:
vol = filesystem.VirtualFilesystem(case_sensitive=False)
vol.tar = self.tar
volumes[volume_name] = vol
target.filesystems.add(vol)

volume = volumes[volume_name]
mname = "/".join(parts[1:])
if "/" not in volumes:
vol = filesystem.VirtualFilesystem(case_sensitive=True)
vol.tar = self.tar
volumes["/"] = vol
target.filesystems.add(vol)

volume = volumes["/"]
mname = member.name

entry_cls = TarFilesystemDirectoryEntry if member.isdir() else TarFilesystemEntry
entry = entry_cls(volume, fsutil.normpath(mname), member)
Expand All @@ -107,7 +65,6 @@ def map(self, target: target.Target) -> None:
vol,
usnjrnl_path=[
"$Extend/$Usnjrnl:$J",
"$Extend/$Usnjrnl:J", # Old versions of acquire used $Usnjrnl:J
],
)

Expand Down
3 changes: 3 additions & 0 deletions tests/_data/loaders/acquire/test-windows-fs-c.zip
Horofic marked this conversation as resolved.
Show resolved Hide resolved
Git LFS file not shown
67 changes: 67 additions & 0 deletions tests/loaders/test_acquire.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
from pathlib import Path

import pytest

from dissect.target import Target
from dissect.target.loaders.acquire import AcquireLoader
from dissect.target.loaders.tar import TarLoader
from dissect.target.plugins.os.windows._os import WindowsPlugin
from tests._utils import absolute_path
Matthijsy marked this conversation as resolved.
Show resolved Hide resolved



Matthijsy marked this conversation as resolved.
Show resolved Hide resolved
def test_tar_sensitive_drive_letter(target_bare: Target) -> None:
    """An archive with an uppercase drive letter is mounted as lowercase ``c:``."""
    archive = Path(absolute_path("_data/loaders/acquire/uppercase_driveletter.tar"))

    def mount_names() -> list[str]:
        return sorted(target_bare.fs.mounts.keys())

    loader = AcquireLoader(archive)
    assert loader.detect(archive)
    loader.map(target_bare)

    # Only the drive letter mount exists right after mapping
    assert mount_names() == ["c:"]

    # Initialize our own WindowsPlugin to override the detection
    target_bare._os_plugin = WindowsPlugin.create(target_bare, target_bare.fs.mounts["c:"])
    target_bare._init_os()

    # Initializing the OS adds the sysvol mount
    assert mount_names() == ["c:", "sysvol"]

    # WindowsPlugin sets the case sensitivity to False, so both spellings resolve
    for file_path in ("C:/test.file", "c:/test.file"):
        assert target_bare.fs.get(file_path).open().read() == b"hello_world"


@pytest.mark.parametrize(
    "archive, expected_drive_letter",
    [
        # The two sysvol archives map to C: due to backwards compatibility
        ("_data/loaders/acquire/test-windows-sysvol-absolute.tar", "c:"),
        ("_data/loaders/acquire/test-windows-sysvol-relative.tar", "c:"),
        ("_data/loaders/acquire/test-windows-fs-c-relative.tar", "c:"),
        ("_data/loaders/acquire/test-windows-fs-c-absolute.tar", "c:"),
        ("_data/loaders/acquire/test-windows-fs-x.tar", "x:"),
        ("_data/loaders/acquire/test-windows-fs-c.zip", "c:"),
    ],
)
def test_tar_loader_windows_sysvol_formats(target_default: Target, archive: str, expected_drive_letter: str) -> None:
    """Each supported archive layout is detected and mounted on the expected drive letter."""
    archive_path = Path(absolute_path(archive))
    assert AcquireLoader.detect(archive_path)

    loader = AcquireLoader(archive_path)
    loader.map(target_default)

    assert WindowsPlugin.detect(target_default)

    # NOTE: for the sysvol archives, this also tests the backwards compatibility
    mounted = sorted(target_default.fs.mounts.keys())
    assert mounted == [expected_drive_letter]

    system_file = f"{expected_drive_letter}/Windows/System32/foo.txt"
    assert target_default.fs.get(system_file)


def test_tar_anonymous_filesystems(target_default: Target) -> None:
    """Anonymous filesystems in the archive are readable below ``$fs$``."""
    archive = Path(absolute_path("_data/loaders/acquire/test-anon-filesystems.tar"))
    assert AcquireLoader.detect(archive)

    loader = AcquireLoader(archive)
    loader.map(target_default)

    for file_path in ("$fs$/fs0/foo", "$fs$/fs1/bar"):
        assert target_default.fs.get(file_path).open().read() == b"hello world\n"
68 changes: 9 additions & 59 deletions tests/loaders/test_tar.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,18 @@

from dissect.target import Target
from dissect.target.loaders.tar import TarLoader
from dissect.target.plugins.os.windows._os import WindowsPlugin
from tests._utils import absolute_path


def test_tar_loader_compressed_tar_file(target_win: Target) -> None:
archive_path = absolute_path("_data/loaders/tar/test-archive.tar.gz")
@pytest.mark.parametrize(
"archive",
[
"_data/loaders/tar/test-archive.tar",
"_data/loaders/tar/test-archive.tar.gz",
],
)
def test_tar_loader_compressed_tar_file(target_win: Target, archive) -> None:
archive_path = absolute_path(archive)

loader = TarLoader(archive_path)
loader.map(target_win)
Expand All @@ -20,28 +26,6 @@ def test_tar_loader_compressed_tar_file(target_win: Target) -> None:
assert test_file.open().read() == b"test-value\n"


def test_tar_sensitive_drive_letter(target_bare: Target) -> None:
    """A tar with an uppercase drive letter is mounted as lowercase ``c:`` next to ``/``."""
    archive = absolute_path("_data/loaders/tar/uppercase_driveletter.tar")

    def mount_names() -> list[str]:
        return sorted(target_bare.fs.mounts.keys())

    loader = TarLoader(archive)
    loader.map(target_bare)

    # Mapping yields the root mount and the lowercase drive letter only
    assert mount_names() == ["/", "c:"]
    assert "C:" not in target_bare.fs.mounts.keys()

    # Initialize our own WindowsPlugin to override the detection
    target_bare._os_plugin = WindowsPlugin.create(target_bare, target_bare.fs.mounts["c:"])
    target_bare._init_os()

    # Initializing the OS adds the sysvol mount
    assert mount_names() == ["/", "c:", "sysvol"]

    # WindowsPlugin sets the case sensitivity to False, so both spellings resolve
    for file_path in ("C:/test.file", "c:/test.file"):
        assert target_bare.fs.get(file_path).open().read() == b"hello_world"


def test_tar_loader_compressed_tar_file_with_empty_dir(target_unix: Target) -> None:
archive_path = absolute_path("_data/loaders/tar/test-archive-empty-folder.tgz")
loader = TarLoader(archive_path)
Expand All @@ -55,37 +39,3 @@ def test_tar_loader_compressed_tar_file_with_empty_dir(target_unix: Target) -> N
empty_folder = target_unix.fs.path("test/empty_dir")
assert empty_folder.exists()
assert empty_folder.is_dir()


@pytest.mark.parametrize(
    "archive, expected_drive_letter",
    [
        # The two sysvol archives map to C: due to backwards compatibility
        ("_data/loaders/tar/test-windows-sysvol-absolute.tar", "c:"),
        ("_data/loaders/tar/test-windows-sysvol-relative.tar", "c:"),
        ("_data/loaders/tar/test-windows-fs-c-relative.tar", "c:"),
        ("_data/loaders/tar/test-windows-fs-c-absolute.tar", "c:"),
        ("_data/loaders/tar/test-windows-fs-x.tar", "x:"),
    ],
)
def test_tar_loader_windows_sysvol_formats(target_default: Target, archive: str, expected_drive_letter: str) -> None:
    """Each supported tar layout is mounted on the expected drive letter."""
    archive_path = absolute_path(archive)

    loader = TarLoader(archive_path)
    loader.map(target_default)

    assert WindowsPlugin.detect(target_default)

    # NOTE: for the sysvol archives, this also tests the backwards compatibility
    mounted = sorted(target_default.fs.mounts.keys())
    assert mounted == [expected_drive_letter]

def test_tar_anonymous_filesystems(target_default: Target) -> None:
    """Anonymous filesystems in the tar get their own ``$fs$/fsN`` mounts next to ``/``."""
    archive = absolute_path("_data/loaders/tar/test-anon-filesystems.tar")

    loader = TarLoader(archive)
    loader.map(target_default)

    # mounts = $fs$/fs0, $fs$/fs1 and /
    assert len(target_default.fs.mounts) == 3
    for mount_point in ("$fs$/fs0", "$fs$/fs1", "/"):
        assert mount_point in target_default.fs.mounts.keys()

    for file_path, content in (("$fs$/fs0/foo", b"hello world\n"), ("$fs$/fs1/bar", b"hello world\n")):
        assert target_default.fs.get(file_path).open().read() == content