
fix(file source)!: use uncompressed content for fingerprinting files (lines and ignored_header_bytes) #22050

Open · roykim98 wants to merge 11 commits into master from roy/fingerprint-rotation
Conversation

@roykim98 commented Dec 18, 2024

Summary

This PR addresses #13193.

This PR handles the case where compressed content is being fingerprinted. It also handles a log-rotation use case, where an uncompressed active file is monitored and then rotated by a log rotation service into a compressed file with a new inode. By comparing the uncompressed lines as the source of truth, it prevents the same messages from being processed twice.

I chose to explicitly check the magic-byte headers by enumerating the signatures, which keeps the checks for the different formats consistent. I looked at a couple of compression libraries and figured it would be complicated to move ownership of a reader into multiple decoders with inconsistent APIs that don't all allow easy inspection of the magic header. Of course, if only gzip is expected to be supported for the foreseeable future, this is a moot point and we can simply fall back to instantiating the gzip library to check.
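
For illustration, a minimal sketch of what enumerating the signatures could look like; the enum and method names here are hypothetical, and only the gzip and zstd magic bytes are shown:

// Hypothetical enum of supported compression formats, each exposing its magic bytes.
#[derive(Clone, Copy, Debug, PartialEq)]
enum CompressionAlgorithm {
    Gzip,
    Zstd,
}

impl CompressionAlgorithm {
    // The magic bytes each format writes at the start of a file.
    fn magic_header_bytes(&self) -> &'static [u8] {
        match self {
            CompressionAlgorithm::Gzip => &[0x1f, 0x8b],
            CompressionAlgorithm::Zstd => &[0x28, 0xb5, 0x2f, 0xfd],
        }
    }
}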

BREAKING CHANGE: When sourcing from compressed files, ignored_header_bytes will no longer be applied to the compressed file's bytes, which would include the magic bytes of the compression header. Instead, it will ignore bytes of the uncompressed content. Similarly, lines will no longer look for newline delimiters in the compressed content, but in the uncompressed content. Arguably, both of the current behaviors are bugs, since compressed content has no explicit lines or intentional header aside from the magic bytes.
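
As a rough sketch of the new behavior (not the PR's actual code; it assumes the flate2 crate for gzip and hypothetical names), ignored_header_bytes and lines are applied to the decoded stream rather than to the .gz bytes:

use std::fs::File;
use std::io::{self, BufRead, BufReader, Read};

use flate2::read::GzDecoder;

// Hypothetical helper: read `lines` newline-delimited lines from the
// *uncompressed* content, after skipping `ignored_header_bytes` of it.
fn fingerprint_input(path: &str, ignored_header_bytes: u64, lines: usize) -> io::Result<Vec<u8>> {
    let file = File::open(path)?;
    // Decode gzip transparently; an uncompressed file would use the plain reader instead.
    let mut reader = BufReader::new(GzDecoder::new(file));
    // Skip the ignored header bytes of the uncompressed content, not of the .gz file.
    io::copy(&mut reader.by_ref().take(ignored_header_bytes), &mut io::sink())?;
    let mut buf = Vec::new();
    for _ in 0..lines {
        reader.read_until(b'\n', &mut buf)?;
    }
    Ok(buf)
}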

Change Type

  • Bug fix
  • New feature
  • Non-functional (chore, refactoring, docs)
  • Performance

Is this a breaking change?

  • Yes
  • No

How did you test this PR?

Unit tests

cargo test --package file-source --lib -- fingerprinter::test --show-output

Does this PR include user facing changes?

  • Yes. Please add a changelog fragment based on our guidelines.
  • No. A maintainer will apply the "no-changelog" label to this PR.

Checklist

References

bits-bot commented Dec 18, 2024

CLA assistant check
All committers have signed the CLA.

@github-actions github-actions bot added the domain: external docs Anything related to Vector's external, public documentation label Dec 18, 2024
@roykim98 roykim98 force-pushed the roy/fingerprint-rotation branch from ccdfe60 to ae39cc0 Compare December 18, 2024 06:32
@github-actions github-actions bot added the domain: sources Anything related to the Vector's sources label Dec 18, 2024
@roykim98 roykim98 marked this pull request as ready for review December 18, 2024 07:12
@roykim98 roykim98 requested review from a team as code owners December 18, 2024 07:12
@jszwedko (Member) left a comment

Nice work @roykim98 ! I left a couple of comments about error handling, but overall this approach seems good to me. I appreciate the added tests.

let mut magic = vec![0u8; magic_header_bytes.len()];

if fp.read_exact(&mut magic).is_ok()
&& fp.seek(SeekFrom::Start(0)).is_ok()
jszwedko (Member)

I think we always need to seek back even if the read_exact failed, no?

Also, if the seek fails, I think we should return an error (i.e. change the function signature here to return a Result) since the file pointer will be in an invalid state.
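
For instance, a minimal sketch (hypothetical helper, not this PR's code) where the rewind happens regardless of the read result, and either failure is then propagated:

use std::io::{Read, Seek, SeekFrom};

// The seek back to the start happens whether or not the read succeeded,
// and both failures are surfaced to the caller.
fn read_magic(fp: &mut std::fs::File, len: usize) -> std::io::Result<Vec<u8>> {
    let mut magic = vec![0u8; len];
    let read_result = fp.read_exact(&mut magic);
    fp.seek(SeekFrom::Start(0))?;
    read_result?;
    Ok(magic)
}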

roykim98 (Author)

Yes, that's a good point. Will change.

let magic_header_bytes = compression_algorithm.magic_header_bytes();
let mut magic = vec![0u8; magic_header_bytes.len()];

if fp.read_exact(&mut magic).is_ok()
jszwedko (Member)

If this fails, I think it could be due to one of two reasons:

  • Not enough bytes
  • Read interrupted

If the read is interrupted, I think we likely want to return an error since we weren't able to detect whether the file is compressed.

If there aren't enough bytes, we probably want to try again later, since even uncompressed files should, I think, be a minimum of two bytes (a character and a newline) 🤔 The risk is that the file is actually compressed, but there just aren't enough bytes to determine that yet.

jszwedko (Member)

Errors from read_exact are documented here: https://doc.rust-lang.org/std/io/trait.Read.html#method.read_exact
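
A sketch of distinguishing those documented failure modes (hypothetical types, not the code in this PR):

use std::io::{ErrorKind, Read};

// Classify a failed header read, per the discussion above.
enum HeaderReadFailure {
    // Fewer bytes available than the magic header: can't tell yet, try again later.
    NotEnoughBytes,
    // Any other I/O failure: we couldn't determine compression, surface the error.
    Io(std::io::Error),
}

fn read_header(fp: &mut impl Read, magic: &mut [u8]) -> Result<(), HeaderReadFailure> {
    match fp.read_exact(magic) {
        Ok(()) => Ok(()),
        Err(e) if e.kind() == ErrorKind::UnexpectedEof => Err(HeaderReadFailure::NotEnoughBytes),
        Err(e) => Err(HeaderReadFailure::Io(e)),
    }
}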

roykim98 (Author)

Thanks @jszwedko , I appreciate the review. I think I follow most of your points.

I guess it would make the most sense to:

  • Check each magic-byte header in the file. If we find one that matches an algorithm, short-circuit and return the corresponding enum variant.
  • If all header checks fail, it means that this file is either:
    • Uncompressed, in which case we should return None, so we can fingerprint as-is
    • Using an unsupported compression algorithm, in which case we should return None, so we can fingerprint as-is
    • Using a supported compression algorithm, but the read was interrupted; return an error if read_exact fails with ErrorKind::Interrupted.
    • Using a supported compression algorithm, but there aren't enough bytes to determine that yet
      • In this scenario, I guess you're suggesting this is some kind of compressed stream that doesn't have a header written to it yet, and we should throw an error, as this would manifest as ErrorKind::UnexpectedEof.
      • There is one tiny edge case here that would cause a potentially valid assessment to fail instead: the compression algorithm we're checking might happen to have a bigger header than the file's actual compression algorithm. For example, zstd's magic bytes are 4 bytes; suppose the file is 3 bytes of gzipped content (the gzip magic is 2 bytes, plus 1 byte of compressed content). If we throw an error at zstd before checking gzip, we'd needlessly try again.
      • I guess this can be prevented by deterministically checking from the smallest magic header to the largest (sketched below).
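
A rough sketch of that flow, building on a hypothetical CompressionAlgorithm enum like the one sketched earlier: candidates are checked from the shortest magic header to the longest, short-circuiting on the first match:

use std::fs::File;
use std::io::{ErrorKind, Read, Seek, SeekFrom};

fn detect_compression(fp: &mut File) -> std::io::Result<Option<CompressionAlgorithm>> {
    // Order candidates by magic header length so a short file doesn't fail a
    // longer check before a shorter one has had the chance to match.
    let mut candidates = [CompressionAlgorithm::Gzip, CompressionAlgorithm::Zstd];
    candidates.sort_by_key(|c| c.magic_header_bytes().len());

    for candidate in candidates {
        let expected = candidate.magic_header_bytes();
        let mut magic = vec![0u8; expected.len()];
        let read_result = fp.read_exact(&mut magic);
        // Always rewind so later checks (and fingerprinting itself) start from byte 0.
        fp.seek(SeekFrom::Start(0))?;
        match read_result {
            Ok(()) if magic == expected => return Ok(Some(candidate)),
            Ok(()) => continue,
            // UnexpectedEof means the file is shorter than this header (retry later);
            // anything else is a real I/O error. Either way, surface it.
            Err(e) => return Err(e),
        }
    }
    // No known magic header: uncompressed or unsupported, fingerprint as-is.
    Ok(None)
}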

Lastly:

we probably want to try again later

I presume this is covered up the stack if fingerprinting fails? I can investigate myself, but if you happen to have the answer handy, it'd be helpful.

roykim98 (Author)

Made some final tests here:

2024-12-20T03:03:53.093248Z  INFO vector::app: Loading configs. paths=["/Users/roy/vector.yaml"]
2024-12-20T03:03:53.322430Z  INFO vector::topology::running: Running healthchecks.
2024-12-20T03:03:53.324317Z  INFO vector::topology::builder: Healthcheck passed.
2024-12-20T03:03:53.326881Z  INFO vector: Vector has started. debug="true" version="0.44.0" arch="aarch64" revision=""
2024-12-20T03:03:53.327409Z  INFO vector::app: API is disabled, enable by setting `api.enabled` to `true` and use commands like `vector top`.
2024-12-20T03:03:53.327408Z  INFO source{component_kind="source" component_id=file component_type=file}: vector::sources::file: Starting file server. include=["/Users/roy/sample-data/*.log*"] exclude=[]
2024-12-20T03:03:53.331783Z  INFO source{component_kind="source" component_id=file component_type=file}:file_server: file_source::checkpointer: Attempting to read legacy checkpoint files.
2024-12-20T03:04:30.271335Z  INFO source{component_kind="source" component_id=file component_type=file}:file_server: vector::internal_events::file::source: Found new file to watch. file=/Users/roy/sample-data/sample.log
{"file":"/Users/roy/sample-data/sample.log","message":"this is a test","source_type":"file","timestamp":"2024-12-20T03:04:30.273128Z"}
{"file":"/Users/roy/sample-data/sample.log","message":"this is a second line test","source_type":"file","timestamp":"2024-12-20T03:04:30.273833Z"}
{"file":"/Users/roy/sample-data/sample.log","message":"this is a third line test","source_type":"file","timestamp":"2024-12-20T03:04:30.273962Z"}
2024-12-20T03:04:52.859856Z  INFO source{component_kind="source" component_id=file component_type=file}:file_server: file_source::file_server: Watched file has been renamed. path="/Users/roy/sample-data/sample.log.gz" old_path="/Users/roy/sample-data/sample.log"
2024-12-20T03:05:19.523732Z  INFO source{component_kind="source" component_id=file component_type=file}:file_server: file_source::file_server: More than one file has the same fingerprint. path="/Users/roy/sample-data/test.log" old_path="/Users/roy/sample-data/sample.log.gz"
2024-12-20T03:05:19.524053Z  INFO source{component_kind="source" component_id=file component_type=file}:file_server: file_source::file_server: Switching to watch most recently modified file. new_modified_time=SystemTime { tv_sec: 1734663918, tv_nsec: 74963277 } old_modified_time=SystemTime { tv_sec: 1734663869, tv_nsec: 255448000 }
2024-12-20T03:05:21.576312Z  INFO source{component_kind="source" component_id=file component_type=file}:file_server: file_source::file_server: Watched file has been renamed. path="/Users/roy/sample-data/sample.log.gz" old_path="/Users/roy/sample-data/test.log"

Made a 3-line file called sample.log

this is a test
this is a second line test
this is a third line test

Loaded it up in the source, and then compressed it to a new inode, sample.log.gz

Created a similarly named file test.log

this is a test
this is a different test

All of them had the same fingerprint with the following config:

sources:
  file:
    type: file
    include: 
      - /Users/roy/sample-data/*.log*
    fingerprint:
      strategy: checksum
      ignored_header_bytes: 0
      lines: 1
    ignore_checkpoints: true  # just so testing is easily repeatable
    data_dir: data/  # make sure this directory exists where you run the pipeline
sinks:
  console:
    type: console
    inputs: ["file"]
    encoding:
      codec: json

@roykim98 roykim98 requested a review from jszwedko December 20, 2024 03:09
@jszwedko (Member) left a comment

Thanks @roykim98 ! I refactored the check function, but otherwise this looks good to me! I appreciate the thorough testing.

@jszwedko jszwedko enabled auto-merge December 20, 2024 22:47
Comment on lines +106 to +107
fp.seek(SeekFrom::Start(0))?;
fp.read_exact(&mut magic)?;
roykim98 (Author)

@jszwedko For my own understanding, wouldn't you still want to reset the position in the event that read_exact fails? That's sort of why I did it the roundabout way of checking the errors after executing both functions, instead of using the ? syntactic sugar.

auto-merge was automatically disabled December 21, 2024 00:49

Head branch was pushed to by a user without write access

roykim98 (Author)

One of the workflows was failing due to some whitespace in the *.cue files, which I just removed, but that push revoked the automerge label and the build checks. Looks like you may have to re-add the automerge label; sorry for the inconvenience, @jszwedko.

Ran make check-fmt locally; it passes now.

@roykim98 roykim98 requested a review from jszwedko December 21, 2024 00:54
roykim98 (Author)

# Run the Clippy linter to catch common mistakes.
cargo vdev check rust --clippy
# Ensure all code is properly formatted. Code can be run through `rustfmt` using `cargo fmt` to ensure it is properly formatted.
cargo vdev check fmt
# Ensure the internal metrics that Vector emits conform to standards.
cargo vdev check events
# Ensure the `LICENSE-3rdparty.csv` file is up to date with the licenses each of Vector's dependencies are published under.
cargo vdev check licenses

These additional checks pass locally.

roykim98 (Author)

Also ran these:

make check-component-docs
cd rust-doc && make docs

Labels
domain: external docs (Anything related to Vector's external, public documentation)
domain: sources (Anything related to the Vector's sources)
Development

Successfully merging this pull request may close these issues.

file source: checksum fingerprint is not correct with gzipped files
4 participants