Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: do not leak file handles from Compactor.write #25725

Merged
merged 6 commits into from
Jan 3, 2025
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 50 additions & 42 deletions tsdb/engine/tsm1/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ func (e errCompactionInProgress) Error() string {
return "compaction in progress"
}

// Unwrap returns the underlying error carried by errCompactionInProgress so
// that callers can inspect the cause with errors.Is / errors.As and with
// os.IsExist (as done in Compactor.writeNewFiles to distinguish a genuine
// "file already exists" collision from other file-creation failures).
func (e errCompactionInProgress) Unwrap() error {
return e.err
}

// errCompactionAborted signals that a compaction was stopped before it
// completed; Compactor.write returns it when compactions are disabled.
// err optionally carries the underlying cause (it may be nil, as in the
// errCompactionAborted{} value returned by Compactor.write).
type errCompactionAborted struct {
err error
}
Expand Down Expand Up @@ -1051,6 +1055,7 @@ func (c *Compactor) removeTmpFiles(files []string) error {
func (c *Compactor) writeNewFiles(generation, sequence int, src []string, iter KeyIterator, throttle bool, logger *zap.Logger) ([]string, error) {
// These are the new TSM files written
var files []string
var eInProgress errCompactionInProgress

for {
sequence++
Expand All @@ -1060,25 +1065,30 @@ func (c *Compactor) writeNewFiles(generation, sequence int, src []string, iter K
logger.Debug("Compacting files", zap.Int("file_count", len(src)), zap.String("output_file", fileName))

// Write as much as possible to this file
err := c.write(fileName, iter, throttle, logger)
rollToNext, err := c.write(fileName, iter, throttle, logger)

// We've hit the max file limit and there is more to write. Create a new file
// and continue.
if err == errMaxFileExceeded || err == ErrMaxBlocksExceeded {
if rollToNext {
// We've hit the max file limit and there is more to write. Create a new file
// and continue.
files = append(files, fileName)
logger.Debug("file size or block count exceeded, opening another output file", zap.String("output_file", fileName))
continue
} else if err == ErrNoValues {
} else if errors.Is(err, ErrNoValues) {
logger.Debug("Dropping empty file", zap.String("output_file", fileName))
// If the file only contained tombstoned entries, then it would be a 0 length
// file that we can drop.
if err := os.RemoveAll(fileName); err != nil {
return nil, err
}
break
} else if _, ok := err.(errCompactionInProgress); ok {
// Don't clean up the file as another compaction is using it. This should not happen as the
// planner keeps track of which files are assigned to compaction plans now.
} else if errors.As(err, &eInProgress) {
if !os.IsExist(eInProgress.err) {
gwossum marked this conversation as resolved.
Show resolved Hide resolved
logger.Error("error creating compaction file", zap.String("output_file", fileName), zap.Error(err))
} else {
// Don't clean up the file as another compaction is using it. This should not happen as the
// planner keeps track of which files are assigned to compaction plans now.
logger.Warn("file exists, compaction in progress already", zap.String("output_file", fileName))
gwossum marked this conversation as resolved.
Show resolved Hide resolved
}
return nil, err
} else if err != nil {
// We hit an error and didn't finish the compaction. Abort.
Expand All @@ -1100,10 +1110,10 @@ func (c *Compactor) writeNewFiles(generation, sequence int, src []string, iter K
return files, nil
}

func (c *Compactor) write(path string, iter KeyIterator, throttle bool, logger *zap.Logger) (err error) {
func (c *Compactor) write(path string, iter KeyIterator, throttle bool, logger *zap.Logger) (rollToNext bool, err error) {
fd, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR|os.O_EXCL, 0666)
if err != nil {
return errCompactionInProgress{err: err}
return false, errCompactionInProgress{err: err}
}

// syncingWriter ensures that whatever we wrap the above file descriptor in
Expand All @@ -1128,33 +1138,31 @@ func (c *Compactor) write(path string, iter KeyIterator, throttle bool, logger *
// in memory.
if iter.EstimatedIndexSize() > 64*1024*1024 {
w, err = NewTSMWriterWithDiskBuffer(limitWriter)
if err != nil {
return err
}
} else {
w, err = NewTSMWriter(limitWriter)
if err != nil {
return err
}
}

if err != nil {
// Close the file and return if we can't create the TSMWriter
return false, errors.Join(err, fd.Close())
}
defer func() {
var eInProgress errCompactionInProgress

errs := make([]error, 0, 3)
errs = append(errs, err)
closeErr := w.Close()
if err == nil {
err = closeErr
}
errs = append(errs, closeErr)

// Check for errors where we should not remove the file
_, inProgress := err.(errCompactionInProgress)
maxBlocks := err == ErrMaxBlocksExceeded
maxFileSize := err == errMaxFileExceeded
if inProgress || maxBlocks || maxFileSize {
// Check for conditions where we should not remove the file
inProgress := errors.As(err, &eInProgress) && os.IsExist(eInProgress.err)
if (closeErr == nil) && (inProgress || rollToNext) {
// do not join errors, there is only the one.
return
} else if err != nil || closeErr != nil {
// Remove the file, we have had a problem
errs = append(errs, w.Remove())
}

if err != nil {
_ = w.Remove()
}
err = errors.Join(errs...)
}()

lastLogSize := w.Size()
Expand All @@ -1164,38 +1172,38 @@ func (c *Compactor) write(path string, iter KeyIterator, throttle bool, logger *
c.mu.RUnlock()

if !enabled {
return errCompactionAborted{}
return false, errCompactionAborted{}
}
// Each call to read returns the next sorted key (or the prior one if there are
// more values to write). The size of values will be less than or equal to our
// chunk size (1000)
key, minTime, maxTime, block, err := iter.Read()
if err != nil {
return err
return false, err
}

if minTime > maxTime {
return fmt.Errorf("invalid index entry for block. min=%d, max=%d", minTime, maxTime)
return false, fmt.Errorf("invalid index entry for block. min=%d, max=%d", minTime, maxTime)
}

// Write the key and value
if err := w.WriteBlock(key, minTime, maxTime, block); err == ErrMaxBlocksExceeded {
if err := w.WriteBlock(key, minTime, maxTime, block); errors.Is(err, ErrMaxBlocksExceeded) {
if err := w.WriteIndex(); err != nil {
return err
return false, err
}
return err
return true, err
} else if err != nil {
return err
return false, err
}

// If we have a max file size configured and we're over it, close out the file
// If we're over maxTSMFileSize, close out the file
// and return the error.
if w.Size() > maxTSMFileSize {
if err := w.WriteIndex(); err != nil {
return err
return false, err
}

return errMaxFileExceeded
return true, errMaxFileExceeded
} else if (w.Size() - lastLogSize) > logEvery {
logger.Debug("Compaction progress", zap.String("output_file", path), zap.Uint32("size", w.Size()))
lastLogSize = w.Size()
Expand All @@ -1204,15 +1212,15 @@ func (c *Compactor) write(path string, iter KeyIterator, throttle bool, logger *

// Were there any errors encountered during iteration?
if err := iter.Err(); err != nil {
return err
return false, err
}

// We're all done. Close out the file.
if err := w.WriteIndex(); err != nil {
return err
return false, err
}
logger.Debug("Compaction finished", zap.String("output_file", path), zap.Uint32("size", w.Size()))
return nil
return false, nil
}

func (c *Compactor) add(files []string) bool {
Expand Down
57 changes: 57 additions & 0 deletions tsdb/engine/tsm1/compact_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package tsm1_test

import (
"errors"
"fmt"
"math"
"os"
Expand All @@ -12,6 +13,7 @@ import (

"github.com/influxdata/influxdb/tsdb"
"github.com/influxdata/influxdb/tsdb/engine/tsm1"
"github.com/stretchr/testify/assert"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -1086,6 +1088,61 @@ func TestCompactor_CompactFull_MaxKeys(t *testing.T) {
}
}

// TestCompactor_CompactFull_InProgress verifies that when the compactor's
// intended output file already exists on disk (simulating a concurrent,
// in-progress compaction), CompactFull fails with an error that unwraps to
// an os.IsExist-satisfying *os.PathError rather than clobbering the file.
func TestCompactor_CompactFull_InProgress(t *testing.T) {
// This test creates a lot of data and causes timeout failures for these envs
if testing.Short() || os.Getenv("CI") != "" || os.Getenv("GORACE") != "" {
t.Skip("Skipping in progress compaction test")
}
dir := MustTempDir()
defer os.RemoveAll(dir)

// Build a source TSM file (generation 2) containing two 1000-value blocks
// for a single series key; returns its path for the compaction below.
f2Name := func() string {
values := make([]tsm1.Value, 1000)

// Write a new file with 2 blocks
f2, f2Name := MustTSMWriter(dir, 2)
defer func() {
assert.NoError(t, f2.Close(), "closing TSM file %s", f2Name)
}()
for i := 0; i < 2; i++ {
// Reuse the backing array; refill with 1000 sequential timestamps per block.
values = values[:0]
for j := 0; j < 1000; j++ {
values = append(values, tsm1.NewValue(int64(i*1000+j), int64(1)))
}
assert.NoError(t, f2.Write([]byte("cpu,host=A#!~#value"), values), "writing TSM file: %s", f2Name)
}
assert.NoError(t, f2.WriteIndex(), "writing TSM file index for %s", f2Name)
return f2Name
}()
fs := &fakeFileStore{}
defer fs.Close()
compactor := tsm1.NewCompactor()
compactor.Dir = dir
compactor.FileStore = fs
compactor.Open()

// Predict the exact output file name the compactor will choose: same
// generation as the source, next sequence number.
expGen, expSeq, err := tsm1.DefaultParseFileName(f2Name)
assert.NoError(t, err, "unexpected error parsing file name %s", f2Name)
expSeq = expSeq + 1

fileName := filepath.Join(compactor.Dir, tsm1.DefaultFormatFileName(expGen, expSeq)+"."+tsm1.TSMFileExtension+"."+tsm1.TmpTSMFileExtension)

// Create a temp file to simulate an in progress compaction
f, err := os.Create(fileName)
assert.NoError(t, err, "creating in-progress compaction file %s", fileName)
defer func() {
// The pre-created file must still be closeable afterwards — i.e. the
// failed compaction did not remove or take over the handle.
assert.NoError(t, f.Close(), "closing in-progress compaction file %s", fileName)
}()
_, err = compactor.CompactFull([]string{f2Name}, zap.NewNop())
assert.Errorf(t, err, "expected an error writing snapshot for %s", f2Name)
// The returned error should wrap the OS-level "file exists" error
// (errCompactionInProgress.Unwrap exposes it).
e := errors.Unwrap(err)
assert.NotNil(t, e, "expected an error wrapped by errCompactionInProgress")
assert.Truef(t, os.IsExist(e), "error did not indicate file existence: %v", e)
pathErr := &os.PathError{}
assert.Truef(t, errors.As(e, &pathErr), "expected path error, got %v", e)
assert.Truef(t, os.IsExist(pathErr), "error did not indicate file existence: %v", pathErr)
}

func newTSMKeyIterator(size int, fast bool, interrupt chan struct{}, readers ...*tsm1.TSMReader) (tsm1.KeyIterator, error) {
files := []string{}
for _, r := range readers {
Expand Down
66 changes: 31 additions & 35 deletions tsdb/engine/tsm1/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ import (
"bufio"
"bytes"
"encoding/binary"
"errors"
"fmt"
"hash/crc32"
"io"
Expand Down Expand Up @@ -511,19 +512,15 @@ func (d *directIndex) Size() uint32 {
}

func (d *directIndex) Close() error {
errs := make([]error, 0, 3)
// Flush anything remaining in the index
if err := d.w.Flush(); err != nil {
return err
}

if d.fd == nil {
return nil
errs = append(errs, d.w.Flush())
if d.fd != nil {
// Close and remove the temporary index file
errs = append(errs, d.fd.Close())
errs = append(errs, os.Remove(d.fd.Name()))
}

if err := d.fd.Close(); err != nil {
return err
}
return os.Remove(d.fd.Name())
return errors.Join(errs...)
}

// Remove removes the index from any tempory storage
Expand All @@ -532,11 +529,14 @@ func (d *directIndex) Remove() error {
return nil
}

// Close the file handle to prevent leaking. We ignore the error because
// we just want to cleanup and remove the file.
_ = d.fd.Close()

return os.Remove(d.fd.Name())
errs := make([]error, 0, 2)
// Close the file handle to prevent leaking.
// We don't let an error stop the removal.
if err := d.fd.Close(); err != nil && !errors.Is(err, os.ErrClosed) {
errs = append(errs, err)
}
errs = append(errs, os.Remove(d.fd.Name()))
return errors.Join(errs...)
}

// tsmWriter writes keys and values in the TSM format
Expand Down Expand Up @@ -756,25 +756,19 @@ func (t *tsmWriter) sync() error {
}

func (t *tsmWriter) Close() error {
if err := t.Flush(); err != nil {
return err
}

if err := t.index.Close(); err != nil {
return err
}

errs := make([]error, 0, 3)
errs = append(errs, t.Flush())
errs = append(errs, t.index.Close())
if c, ok := t.wrapped.(io.Closer); ok {
return c.Close()
errs = append(errs, c.Close())
}
return nil
return errors.Join(errs...)
}

// Remove removes any temporary storage used by the writer.
func (t *tsmWriter) Remove() error {
if err := t.index.Remove(); err != nil {
return err
}
errs := make([]error, 0, 3)
errs = append(errs, t.index.Remove())

// nameCloser is the most permissive interface we can close the wrapped
// value with.
Expand All @@ -783,14 +777,16 @@ func (t *tsmWriter) Remove() error {
Name() string
}

// If the writer is not a memory buffer, we can remove the file.
if f, ok := t.wrapped.(nameCloser); ok {
// Close the file handle to prevent leaking. We ignore the error because
// we just want to cleanup and remove the file.
_ = f.Close()

return os.Remove(f.Name())
// Close the file handle to prevent leaking.
if err := f.Close(); err != nil && !errors.Is(err, os.ErrClosed) {
errs = append(errs, err)
}
// Remove the file
errs = append(errs, os.Remove(f.Name()))
}
return nil
return errors.Join(errs...)
}

func (t *tsmWriter) Size() uint32 {
Expand Down
Loading