fix: do not leak file handles from Compactor.write #25725

Merged
merged 6 commits on Jan 3, 2025
Changes from all commits
93 changes: 51 additions & 42 deletions tsdb/engine/tsm1/compact.go
@@ -17,6 +17,7 @@ import (
 	"errors"
 	"fmt"
 	"io"
+	"io/fs"
 	"math"
 	"os"
 	"path/filepath"
@@ -65,6 +66,10 @@ func (e errCompactionInProgress) Error() string {
 	return "compaction in progress"
 }
 
+func (e errCompactionInProgress) Unwrap() error {
+	return e.err
+}
+
 type errCompactionAborted struct {
 	err error
 }
@@ -1051,6 +1056,7 @@ func (c *Compactor) removeTmpFiles(files []string) error {
 func (c *Compactor) writeNewFiles(generation, sequence int, src []string, iter KeyIterator, throttle bool, logger *zap.Logger) ([]string, error) {
 	// These are the new TSM files written
 	var files []string
+	var eInProgress errCompactionInProgress
 
 	for {
 		sequence++
@@ -1060,25 +1066,30 @@ func (c *Compactor) writeNewFiles(generation, sequence int, src []string, iter K
 		logger.Debug("Compacting files", zap.Int("file_count", len(src)), zap.String("output_file", fileName))
 
 		// Write as much as possible to this file
-		err := c.write(fileName, iter, throttle, logger)
+		rollToNext, err := c.write(fileName, iter, throttle, logger)
 
-		// We've hit the max file limit and there is more to write. Create a new file
-		// and continue.
-		if err == errMaxFileExceeded || err == ErrMaxBlocksExceeded {
+		if rollToNext {
+			// We've hit the max file limit and there is more to write. Create a new file
+			// and continue.
 			files = append(files, fileName)
 			logger.Debug("file size or block count exceeded, opening another output file", zap.String("output_file", fileName))
 			continue
-		} else if err == ErrNoValues {
+		} else if errors.Is(err, ErrNoValues) {
 			logger.Debug("Dropping empty file", zap.String("output_file", fileName))
 			// If the file only contained tombstoned entries, then it would be a 0 length
 			// file that we can drop.
 			if err := os.RemoveAll(fileName); err != nil {
 				return nil, err
 			}
 			break
-		} else if _, ok := err.(errCompactionInProgress); ok {
-			// Don't clean up the file as another compaction is using it. This should not happen as the
-			// planner keeps track of which files are assigned to compaction plans now.
+		} else if errors.As(err, &eInProgress) {
+			if !errors.Is(eInProgress.err, fs.ErrExist) {
+				logger.Error("error creating compaction file", zap.String("output_file", fileName), zap.Error(err))
+			} else {
+				// Don't clean up the file as another compaction is using it. This should not happen as the
+				// planner keeps track of which files are assigned to compaction plans now.
+				logger.Warn("file exists, compaction in progress already", zap.String("output_file", fileName))
+			}
 			return nil, err
 		} else if err != nil {
 			// We hit an error and didn't finish the compaction. Abort.
@@ -1100,10 +1111,10 @@ func (c *Compactor) writeNewFiles(generation, sequence int, src []string, iter K
 	return files, nil
 }
 
-func (c *Compactor) write(path string, iter KeyIterator, throttle bool, logger *zap.Logger) (err error) {
+func (c *Compactor) write(path string, iter KeyIterator, throttle bool, logger *zap.Logger) (rollToNext bool, err error) {
 	fd, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR|os.O_EXCL, 0666)
 	if err != nil {
-		return errCompactionInProgress{err: err}
+		return false, errCompactionInProgress{err: err}
 	}
 
 	// syncingWriter ensures that whatever we wrap the above file descriptor in
@@ -1128,33 +1139,31 @@ func (c *Compactor) write(path string, iter KeyIterator, throttle bool, logger *
 	// in memory.
 	if iter.EstimatedIndexSize() > 64*1024*1024 {
 		w, err = NewTSMWriterWithDiskBuffer(limitWriter)
-		if err != nil {
-			return err
-		}
 	} else {
 		w, err = NewTSMWriter(limitWriter)
-		if err != nil {
-			return err
-		}
 	}
 
+	if err != nil {
+		// Close the file and return if we can't create the TSMWriter
+		return false, errors.Join(err, fd.Close())
+	}
 	defer func() {
+		var eInProgress errCompactionInProgress
+
+		errs := make([]error, 0, 3)
+		errs = append(errs, err)
 		closeErr := w.Close()
-		if err == nil {
-			err = closeErr
-		}
+		errs = append(errs, closeErr)
 
-		// Check for errors where we should not remove the file
-		_, inProgress := err.(errCompactionInProgress)
-		maxBlocks := err == ErrMaxBlocksExceeded
-		maxFileSize := err == errMaxFileExceeded
-		if inProgress || maxBlocks || maxFileSize {
+		// Check for conditions where we should not remove the file
+		inProgress := errors.As(err, &eInProgress) && errors.Is(eInProgress.err, fs.ErrExist)
+		if (closeErr == nil) && (inProgress || rollToNext) {
+			// do not join errors, there is only the one.
 			return
+		} else if err != nil || closeErr != nil {
+			// Remove the file, we have had a problem
+			errs = append(errs, w.Remove())
 		}
 
-		if err != nil {
-			_ = w.Remove()
-		}
+		err = errors.Join(errs...)
 	}()
 
 	lastLogSize := w.Size()
@@ -1164,38 +1173,38 @@ func (c *Compactor) write(path string, iter KeyIterator, throttle bool, logger *
 		c.mu.RUnlock()
 
 		if !enabled {
-			return errCompactionAborted{}
+			return false, errCompactionAborted{}
 		}
 		// Each call to read returns the next sorted key (or the prior one if there are
 		// more values to write). The size of values will be less than or equal to our
 		// chunk size (1000)
 		key, minTime, maxTime, block, err := iter.Read()
 		if err != nil {
-			return err
+			return false, err
 		}
 
 		if minTime > maxTime {
-			return fmt.Errorf("invalid index entry for block. min=%d, max=%d", minTime, maxTime)
+			return false, fmt.Errorf("invalid index entry for block. min=%d, max=%d", minTime, maxTime)
 		}
 
 		// Write the key and value
-		if err := w.WriteBlock(key, minTime, maxTime, block); err == ErrMaxBlocksExceeded {
+		if err := w.WriteBlock(key, minTime, maxTime, block); errors.Is(err, ErrMaxBlocksExceeded) {
 			if err := w.WriteIndex(); err != nil {
-				return err
+				return false, err
 			}
-			return err
+			return true, err
 		} else if err != nil {
-			return err
+			return false, err
 		}
 
-		// If we have a max file size configured and we're over it, close out the file
+		// If we're over maxTSMFileSize, close out the file
 		// and return the error.
 		if w.Size() > maxTSMFileSize {
 			if err := w.WriteIndex(); err != nil {
-				return err
+				return false, err
 			}
 
-			return errMaxFileExceeded
+			return true, errMaxFileExceeded
 		} else if (w.Size() - lastLogSize) > logEvery {
 			logger.Debug("Compaction progress", zap.String("output_file", path), zap.Uint32("size", w.Size()))
 			lastLogSize = w.Size()
@@ -1204,15 +1213,15 @@ func (c *Compactor) write(path string, iter KeyIterator, throttle bool, logger *
 
 	// Were there any errors encountered during iteration?
 	if err := iter.Err(); err != nil {
-		return err
+		return false, err
 	}
 
 	// We're all done. Close out the file.
 	if err := w.WriteIndex(); err != nil {
-		return err
+		return false, err
 	}
 	logger.Debug("Compaction finished", zap.String("output_file", path), zap.Uint32("size", w.Size()))
-	return nil
+	return false, nil
 }
 
 func (c *Compactor) add(files []string) bool {
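The control flow in `writeNewFiles` hinges on the Go `errors` package (`Is`/`As` since 1.13, `Join` since 1.20): the new `Unwrap` method on `errCompactionInProgress` is what lets callers recover the wrapper with `errors.As` and then test with `errors.Is` whether the underlying cause of the `O_EXCL` open failure was `fs.ErrExist`. A minimal, self-contained sketch of that pattern, where `errInProgress` and `create` are illustrative stand-ins rather than InfluxDB code:

```go
package main

import (
	"errors"
	"fmt"
	"io/fs"
	"os"
)

// errInProgress mirrors errCompactionInProgress: it wraps the underlying
// cause and exposes it via Unwrap so errors.Is/errors.As can see through
// the wrapper.
type errInProgress struct {
	err error
}

func (e errInProgress) Error() string { return "operation in progress" }
func (e errInProgress) Unwrap() error { return e.err }

// create mimics Compactor.write's use of O_EXCL: a second creation
// attempt fails with an error that wraps fs.ErrExist.
func create(path string) error {
	fd, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR|os.O_EXCL, 0666)
	if err != nil {
		return errInProgress{err: err}
	}
	return fd.Close()
}

func main() {
	path := "example.tmp"
	defer os.Remove(path)

	_ = create(path)    // first call succeeds
	err := create(path) // second call hits O_EXCL

	var eInProgress errInProgress
	if errors.As(err, &eInProgress) && errors.Is(eInProgress.err, fs.ErrExist) {
		fmt.Println("file exists, another writer got there first:", err)
	} else if err != nil {
		fmt.Println("unexpected creation failure:", err)
	}
}
```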
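The leak fix itself is the deferred cleanup in `Compactor.write`: the file is now closed on every exit path, and close/remove failures are surfaced through `errors.Join` instead of being dropped. A reduced sketch of that shape, with a hypothetical `writeFile` that omits the keep-the-file cases (`rollToNext` and in-progress compactions) the real code handles:

```go
package main

import (
	"errors"
	"fmt"
	"os"
)

// writeFile is a hypothetical reduction of Compactor.write's cleanup:
// the deferred function always closes the file, removes it on failure,
// and folds every cleanup error into the returned error via errors.Join.
func writeFile(path string, payload []byte) (err error) {
	fd, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR|os.O_EXCL, 0666)
	if err != nil {
		return err
	}

	defer func() {
		errs := make([]error, 0, 3)
		errs = append(errs, err)        // the write error, possibly nil
		errs = append(errs, fd.Close()) // always close: this is the leak fix

		if err != nil {
			// The write failed; remove the partial file and record
			// any removal error as well.
			errs = append(errs, os.Remove(path))
		}
		err = errors.Join(errs...) // nil entries are discarded by Join
	}()

	_, err = fd.Write(payload)
	return err
}

func main() {
	defer os.Remove("out.tmp")
	if err := writeFile("out.tmp", []byte("data")); err != nil {
		fmt.Println("write failed:", err)
	}
}
```

Returning the joined error rather than overwriting it keeps the original write failure visible alongside any close or remove failure, which is why the PR collects them in a slice instead of the old `if err == nil { err = closeErr }` dance.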