Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: do not leak file handles from Compactor.write #25725

Merged
merged 6 commits into from
Jan 3, 2025
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 18 additions & 17 deletions tsdb/engine/tsm1/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -1064,11 +1064,11 @@ func (c *Compactor) writeNewFiles(generation, sequence int, src []string, iter K

// We've hit the max file limit and there is more to write. Create a new file
// and continue.
if err == errMaxFileExceeded || err == ErrMaxBlocksExceeded {
if errors.Is(err, errMaxFileExceeded) || errors.Is(err, ErrMaxBlocksExceeded) {
Copy link

@devanbenz devanbenz Jan 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if it matters, but since you are joining errors and then checking with errors.Is: if there are multiple errors joined together and errMaxFileExceeded or ErrMaxBlocksExceeded is among them, this check will pass.

See the following example where I check for a single error. The function that returns two errors combined still passes the test.
https://goplay.tools/snippet/b_YVLCYn5-j

I'm sure that's what you were expecting, but it's just something I noticed with errors.Is.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Although the original code discards file-closing errors on errMaxFileExceeded or ErrMaxBlocksExceeded — behavior that using errors.Is essentially replicates because of the feature you mention above — on reflection I think we should report and abort compaction on file-closing errors, which will require an additional commit to this PR.

files = append(files, fileName)
logger.Debug("file size or block count exceeded, opening another output file", zap.String("output_file", fileName))
continue
} else if err == ErrNoValues {
} else if errors.Is(err, ErrNoValues) {
logger.Debug("Dropping empty file", zap.String("output_file", fileName))
// If the file only contained tombstoned entries, then it would be a 0 length
// file that we can drop.
Expand Down Expand Up @@ -1128,33 +1128,34 @@ func (c *Compactor) write(path string, iter KeyIterator, throttle bool, logger *
// in memory.
if iter.EstimatedIndexSize() > 64*1024*1024 {
w, err = NewTSMWriterWithDiskBuffer(limitWriter)
if err != nil {
return err
}
} else {
w, err = NewTSMWriter(limitWriter)
if err != nil {
return err
}
}

if err != nil {
// Close the file and return if we can't create the TSMWriter
return errors.Join(err, fd.Close())
}
defer func() {
errs := make([]error, 0, 2)
errs = append(errs, err)
closeErr := w.Close()
if err == nil {
// Save closeErr as err for later checks
err = closeErr
}
gwossum marked this conversation as resolved.
Show resolved Hide resolved
errs = append(errs, closeErr)

// Check for errors where we should not remove the file
_, inProgress := err.(errCompactionInProgress)
maxBlocks := err == ErrMaxBlocksExceeded
maxFileSize := err == errMaxFileExceeded
maxBlocks := errors.Is(err, ErrMaxBlocksExceeded)
maxFileSize := errors.Is(err, errMaxFileExceeded)
if inProgress || maxBlocks || maxFileSize {
err = errors.Join(errs...)
return
} else if err != nil {
errs = append(errs, w.Remove())
}

if err != nil {
_ = w.Remove()
}
err = errors.Join(errs...)
}()

lastLogSize := w.Size()
Expand All @@ -1179,7 +1180,7 @@ func (c *Compactor) write(path string, iter KeyIterator, throttle bool, logger *
}

// Write the key and value
if err := w.WriteBlock(key, minTime, maxTime, block); err == ErrMaxBlocksExceeded {
if err := w.WriteBlock(key, minTime, maxTime, block); errors.Is(err, ErrMaxBlocksExceeded) {
if err := w.WriteIndex(); err != nil {
return err
}
Expand All @@ -1188,7 +1189,7 @@ func (c *Compactor) write(path string, iter KeyIterator, throttle bool, logger *
return err
}

// If we have a max file size configured and we're over it, close out the file
// If we're over maxTSMFileSize, close out the file
// and return the error.
if w.Size() > maxTSMFileSize {
if err := w.WriteIndex(); err != nil {
Expand Down
66 changes: 31 additions & 35 deletions tsdb/engine/tsm1/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ import (
"bufio"
"bytes"
"encoding/binary"
"errors"
"fmt"
"hash/crc32"
"io"
Expand Down Expand Up @@ -511,19 +512,15 @@ func (d *directIndex) Size() uint32 {
}

func (d *directIndex) Close() error {
errs := make([]error, 0, 3)
// Flush anything remaining in the index
if err := d.w.Flush(); err != nil {
return err
}

if d.fd == nil {
return nil
errs = append(errs, d.w.Flush())
if d.fd != nil {
// Close and remove the temporary index file
errs = append(errs, d.fd.Close())
errs = append(errs, os.Remove(d.fd.Name()))
}

if err := d.fd.Close(); err != nil {
return err
}
return os.Remove(d.fd.Name())
return errors.Join(errs...)
}

// Remove removes the index from any temporary storage
Expand All @@ -532,11 +529,14 @@ func (d *directIndex) Remove() error {
return nil
}

// Close the file handle to prevent leaking. We ignore the error because
// we just want to cleanup and remove the file.
_ = d.fd.Close()

return os.Remove(d.fd.Name())
errs := make([]error, 0, 2)
// Close the file handle to prevent leaking.
// We don't let an error stop the removal.
if err := d.fd.Close(); err != nil && !errors.Is(err, os.ErrClosed) {
errs = append(errs, err)
}
errs = append(errs, os.Remove(d.fd.Name()))
return errors.Join(errs...)
}

// tsmWriter writes keys and values in the TSM format
Expand Down Expand Up @@ -756,25 +756,19 @@ func (t *tsmWriter) sync() error {
}

func (t *tsmWriter) Close() error {
if err := t.Flush(); err != nil {
return err
}

if err := t.index.Close(); err != nil {
return err
}

errs := make([]error, 0, 3)
errs = append(errs, t.Flush())
errs = append(errs, t.index.Close())
if c, ok := t.wrapped.(io.Closer); ok {
return c.Close()
errs = append(errs, c.Close())
}
return nil
return errors.Join(errs...)
}

// Remove removes any temporary storage used by the writer.
func (t *tsmWriter) Remove() error {
if err := t.index.Remove(); err != nil {
return err
}
errs := make([]error, 0, 3)
errs = append(errs, t.index.Remove())

// nameCloser is the most permissive interface we can close the wrapped
// value with.
Expand All @@ -783,14 +777,16 @@ func (t *tsmWriter) Remove() error {
Name() string
}

// If the writer is not a memory buffer, we can remove the file.
if f, ok := t.wrapped.(nameCloser); ok {
// Close the file handle to prevent leaking. We ignore the error because
// we just want to cleanup and remove the file.
_ = f.Close()

return os.Remove(f.Name())
// Close the file handle to prevent leaking.
if err := f.Close(); err != nil && !errors.Is(err, os.ErrClosed) {
errs = append(errs, err)
}
// Remove the file
errs = append(errs, os.Remove(f.Name()))
}
return nil
return errors.Join(errs...)
}

func (t *tsmWriter) Size() uint32 {
Expand Down
Loading