Skip to content

Commit

Permalink
Fix invalid Windows XML Task error (#986)
Browse files Browse the repository at this point in the history
Horofic authored Jan 13, 2025
1 parent 214b438 commit bdfc048
Showing 4 changed files with 60 additions and 25 deletions.
8 changes: 5 additions & 3 deletions dissect/target/plugins/os/windows/task_helpers/tasks_xml.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from __future__ import annotations

import warnings
from typing import Iterator, Optional
from xml.etree.ElementTree import Element
@@ -157,8 +159,8 @@ def strip_namespace(self, data: Element) -> Element:
return data

def get_element(
self, xml_path: str, xml_data: Optional[Element] = None, attribute: Optional[str] = None
) -> Optional[str]:
self, xml_path: str, xml_data: Element | None = None, attribute: Optional[str] = None
) -> str | None:
"""Get the value of the specified XML element.
Args:
@@ -179,7 +181,7 @@ def get_element(

return data.text

def get_raw(self, xml_path: Optional[str] = None) -> str:
def get_raw(self, xml_path: str | None = None) -> str:
"""Get the raw XML data of the specified element.
Args:
18 changes: 9 additions & 9 deletions dissect/target/plugins/os/windows/tasks.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,16 @@
from __future__ import annotations

import logging
import warnings
from typing import Iterator, Union
from typing import Iterator

from flow.record import GroupedRecord

from dissect.target import Target
from dissect.target.exceptions import UnsupportedPluginError
from dissect.target.exceptions import InvalidTaskError, UnsupportedPluginError
from dissect.target.helpers.record import DynamicDescriptor, TargetRecordDescriptor
from dissect.target.plugin import Plugin, export
from dissect.target.plugins.os.windows.task_helpers.tasks_job import AtTask
from dissect.target.plugins.os.windows.task_helpers.tasks_xml import ScheduledTasks

warnings.simplefilter(action="ignore", category=FutureWarning)
log = logging.getLogger(__name__)

TaskRecord = TargetRecordDescriptor(
"filesystem/windows/task",
[
@@ -118,7 +113,7 @@ def check_compatible(self) -> None:
raise UnsupportedPluginError("No task files")

@export(record=DynamicDescriptor(["path", "datetime"]))
def tasks(self) -> Iterator[Union[TaskRecord, GroupedRecord]]:
def tasks(self) -> Iterator[TaskRecord | GroupedRecord]:
"""Return all scheduled tasks on a Windows system.
On a Windows system, a scheduled task is a program or script that is executed on a specific time or at specific
@@ -132,7 +127,12 @@ def tasks(self) -> Iterator[Union[TaskRecord, GroupedRecord]]:
"""
for task_file in self.task_files:
if not task_file.suffix or task_file.suffix == ".xml":
task_objects = ScheduledTasks(task_file).tasks
try:
task_objects = ScheduledTasks(task_file).tasks
except InvalidTaskError as e:
self.target.log.warning("Invalid task file encountered: %s", task_file)
self.target.log.debug("", exc_info=e)
continue
else:
task_objects = [AtTask(task_file, self.target)]

3 changes: 3 additions & 0 deletions tests/_data/plugins/os/windows/tasks/InvalidTask
Git LFS file not shown
56 changes: 43 additions & 13 deletions tests/plugins/os/windows/test_tasks.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
import logging
import re
from datetime import datetime, timezone
from typing import Callable

import pytest
from flow.record import GroupedRecord

from dissect.target.plugins.os.windows.tasks import TasksPlugin
from dissect.target.filesystem import Filesystem
from dissect.target.plugins.os.windows.tasks import TaskRecord, TasksPlugin
from dissect.target.target import Target
from tests._utils import absolute_path


@pytest.fixture
def setup_tasks_test(target_win, fs_win):
def setup_tasks_test(target_win: Target, fs_win: Filesystem) -> None:
xml_task_file = absolute_path("_data/plugins/os/windows/tasks/MapsToastTask")
atjob_task_file = absolute_path("_data/plugins/os/windows/tasks/AtTask.job")

@@ -22,7 +26,20 @@ def setup_tasks_test(target_win, fs_win):
target_win.add_plugin(TasksPlugin)


def assert_xml_task_properties(xml_task):
@pytest.fixture
def setup_invalid_tasks_test(target_win: Target, fs_win: Filesystem, setup_tasks_test) -> None:
xml_task_file_invalid = absolute_path("_data/plugins/os/windows/tasks/InvalidTask")

fs_win.map_file("windows/system32/tasks/Microsoft/Windows/Maps/InvalidTask", xml_task_file_invalid)
fs_win.map_file(
"windows/system32/GroupPolicy/DataStore/ANY_SID/Machine/Preferences/ScheduledTasks/invalid_xml.xml",
xml_task_file_invalid,
)

target_win.add_plugin(TasksPlugin)


def assert_xml_task_properties(xml_task: TaskRecord) -> None:
assert str(xml_task.uri) == "\\Microsoft\\Windows\\Maps\\MapsToastTask"
assert (
xml_task.security_descriptor
@@ -70,7 +87,7 @@ def assert_xml_task_properties(xml_task):
assert xml_task.data is None


def assert_at_task_properties(at_task):
def assert_at_task_properties(at_task: TaskRecord) -> None:
assert at_task.uri is None
assert at_task.security_descriptor is None
assert str(at_task.task_path) == "sysvol\\windows\\tasks\\AtTask.job"
@@ -115,20 +132,20 @@ def assert_at_task_properties(at_task):
assert at_task.data == "[]"


def assert_xml_task_grouped_properties(xml_task_grouped):
def assert_xml_task_grouped_properties(xml_task_grouped: GroupedRecord) -> None:
assert xml_task_grouped.action_type == "ComHandler"
assert xml_task_grouped.class_id == "{9885AEF2-BD9F-41E0-B15E-B3141395E803}"
assert xml_task_grouped.data is None


def assert_at_task_grouped_exec(at_task_grouped):
def assert_at_task_grouped_exec(at_task_grouped: GroupedRecord) -> None:
assert at_task_grouped.action_type == "Exec"
assert at_task_grouped.arguments == ""
assert at_task_grouped.command == "C:\\WINDOWS\\NOTEPAD.EXE"
assert at_task_grouped.working_directory == "C:\\Documents and Settings\\John"


def assert_at_task_grouped_daily(at_task_grouped):
def assert_at_task_grouped_daily(at_task_grouped: GroupedRecord) -> None:
assert at_task_grouped.days_between_triggers == 3
assert at_task_grouped.end_boundary == "2023-05-12"
assert at_task_grouped.execution_time_limit == "P3D"
@@ -139,13 +156,13 @@ def assert_at_task_grouped_daily(at_task_grouped):
assert_at_task_grouped_padding(at_task_grouped)


def assert_at_task_grouped_padding(at_task_grouped):
def assert_at_task_grouped_padding(at_task_grouped: GroupedRecord) -> None:
assert at_task_grouped.padding == 0
assert at_task_grouped.reserved2 == 0
assert at_task_grouped.reserved3 == 0


def assert_at_task_grouped_monthlydow(at_task_grouped):
def assert_at_task_grouped_monthlydow(at_task_grouped: GroupedRecord) -> None:
assert at_task_grouped.records[1].enabled == "True"
assert at_task_grouped.start_boundary == "2023-05-11"
assert at_task_grouped.end_boundary == "2023-05-20"
@@ -159,7 +176,7 @@ def assert_at_task_grouped_monthlydow(at_task_grouped):
assert_at_task_grouped_padding(at_task_grouped)


def assert_at_task_grouped_weekly(at_task_grouped):
def assert_at_task_grouped_weekly(at_task_grouped: GroupedRecord) -> None:
assert at_task_grouped.records[1].enabled == "True"
assert at_task_grouped.end_boundary == "2023-05-27"
assert at_task_grouped.execution_time_limit == "P3D"
@@ -173,7 +190,7 @@ def assert_at_task_grouped_weekly(at_task_grouped):
assert_at_task_grouped_padding(at_task_grouped)


def assert_at_task_grouped_monthly_date(at_task_grouped):
def assert_at_task_grouped_monthly_date(at_task_grouped: GroupedRecord) -> None:
assert at_task_grouped.day_of_month == "15"
assert at_task_grouped.months_of_year == ["March", "May", "June", "July", "August", "October"]
assert at_task_grouped.records[1].enabled == "True"
@@ -193,7 +210,9 @@ def assert_at_task_grouped_monthly_date(at_task_grouped):
(assert_at_task_properties, "AtTask"),
],
)
def test_single_record_properties(target_win, setup_tasks_test, assert_func, marker):
def test_single_record_properties(
target_win: Target, setup_tasks_test: pytest.fixture, assert_func: Callable, marker: str
) -> None:
records = list(target_win.tasks())
assert len(records) == 10
pat = re.compile(rf"{marker}")
@@ -213,9 +232,20 @@ def test_single_record_properties(target_win, setup_tasks_test, assert_func, mar
(assert_at_task_grouped_monthly_date, "2023-05-29"),
],
)
def test_grouped_record_properties(target_win, setup_tasks_test, assert_func, marker):
def test_grouped_record_properties(
target_win, setup_invalid_tasks_test: pytest.fixture, assert_func: Callable, marker: str
) -> None:
records = list(target_win.tasks())
assert len(records) == 10
pat = re.compile(rf"{marker}")
grouped_records = filter(lambda x: re.findall(pat, str(x)) and isinstance(x, GroupedRecord), records)
assert_func(list(grouped_records)[0])


def test_xml_task_invalid(
target_win: Target, setup_invalid_tasks_test: pytest.fixture, caplog: pytest.LogCaptureFixture
) -> None:
caplog.clear()
with caplog.at_level(logging.WARNING):
assert len(list(target_win.tasks())) == 10
assert "Invalid task file encountered:" in caplog.text

0 comments on commit bdfc048

Please sign in to comment.