Skip to content

Commit

Permalink
Prevent enumerating entire non-ISO formatted syslog files in `is_iso_fmt` (#972)
Browse files Browse the repository at this point in the history
JSCU-CNI authored Jan 30, 2025
1 parent e81af5f commit 7869285
Showing 2 changed files with 55 additions and 5 deletions.
18 changes: 13 additions & 5 deletions dissect/target/plugins/os/unix/log/helpers.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import itertools
from __future__ import annotations

import logging
import re
from datetime import datetime
@@ -22,12 +23,17 @@
)


def iso_readlines(file: Path) -> Iterator[tuple[datetime, str]]:
def iso_readlines(file: Path, max_lines: int | None = None) -> Iterator[tuple[datetime, str]]:
"""Iterator reading the provided log file in ISO format. Mimics ``year_rollover_helper`` behaviour."""
with open_decompress(file, "rt") as fh:
for line in fh:
for i, line in enumerate(fh):
if max_lines is not None and i >= max_lines:
log.debug("Stopping iso_readlines enumeration in %s: max_lines=%s was reached", file, max_lines)
break

if not (match := RE_TS_ISO.match(line)):
log.warning("No timestamp found in one of the lines in %s!", file)
if not max_lines:
log.warning("No timestamp found in one of the lines in %s!", file)
log.debug("Skipping line: %s", line)
continue

@@ -43,4 +49,6 @@ def iso_readlines(file: Path) -> Iterator[tuple[datetime, str]]:

def is_iso_fmt(file: Path) -> bool:
    """Determine if the provided log file uses ISO 8601 timestamp format logging or not.

    Only the first three lines of ``file`` are inspected, so a large non-ISO log file
    is never read in full just to answer this question.

    Args:
        file: Path of the log file to inspect; it is opened by ``iso_readlines``.

    Returns:
        ``True`` if ``iso_readlines`` yields at least one entry within the first
        three lines of the file, ``False`` otherwise.
    """
    # The line limit has to be enforced inside iso_readlines itself. Wrapping the
    # generator in itertools.islice only stops after N *yielded* entries, and a
    # non-ISO file yields nothing, so islice would consume the whole file.
    return any(iso_readlines(file, max_lines=3))
42 changes: 42 additions & 0 deletions tests/plugins/os/unix/log/test_helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import gzip
import textwrap
from io import BytesIO

import pytest

from dissect.target.filesystem import VirtualFilesystem
from dissect.target.plugins.os.unix.log.helpers import is_iso_fmt, iso_readlines

# Sample log content shared by the tests in this module. The first four lines use a
# syslog-style "Mon D HH:MM:SS" timestamp; the last four start with an ISO 8601
# timestamp. The first ISO line is therefore the fifth line of the sample.
syslog = """\
Dec 31 03:14:15 localhost systemd[1]: Starting Journal Service...
Jan 1 13:21:34 localhost systemd: Stopped target Swap.
Jan 2 03:14:15 localhost systemd[1]: Starting Journal Service...
Jan 3 13:21:34 localhost systemd: Stopped target Swap.
2024-12-31T13:37:00.123456+02:00 hostname systemd[1]: Started anacron.service - Run anacron jobs.
2024-12-31T13:37:00.123456+02:00 hostname anacron[1337]: Anacron 2.3 started on 2024-12-31
2024-12-31T13:37:00.123456+02:00 hostname anacron[1337]: Normal exit (0 jobs run)
2024-12-31T13:37:00.123456+02:00 hostname systemd[1]: anacron.service: Deactivated successfully.
"""


@pytest.mark.parametrize(
    "max_lines, expected_return_value",
    [(3, False), (4, False), (5, True), (9, True)],
)
def test_iso_readlines_max_lines(fs_unix: VirtualFilesystem, max_lines: int, expected_return_value: bool) -> None:
    """Check that ``iso_readlines`` stops reading once ``max_lines`` lines have been seen.

    The sample log only contains ISO 8601 timestamps from its fifth line onwards, so
    limits below five are expected to yield nothing.
    """
    # Map a gzip-compressed copy of the sample log into the virtual filesystem.
    compressed_log = gzip.compress(textwrap.dedent(syslog).encode())
    fs_unix.map_file_fh("/var/log/syslog.2", BytesIO(compressed_log))

    log_path = fs_unix.path("/var/log/syslog.2")
    yielded_anything = any(iso_readlines(log_path, max_lines))
    assert yielded_anything == expected_return_value


def test_is_iso_fmt(fs_unix: VirtualFilesystem) -> None:
    """Check that ``is_iso_fmt`` reports ``False`` for the sample log.

    The sample's first ISO 8601 line is its fifth line, so a check that is limited
    to the first few lines must not classify the file as ISO formatted.
    """
    # Map a gzip-compressed copy of the sample log into the virtual filesystem.
    compressed_log = gzip.compress(textwrap.dedent(syslog).encode())
    fs_unix.map_file_fh("/var/log/syslog.3", BytesIO(compressed_log))

    log_path = fs_unix.path("/var/log/syslog.3")
    assert not is_iso_fmt(log_path)

0 comments on commit 7869285

Please sign in to comment.