diff --git a/tsdb/engine/tsm1/compact.go b/tsdb/engine/tsm1/compact.go index c4be1156766..6da71e136cf 100644 --- a/tsdb/engine/tsm1/compact.go +++ b/tsdb/engine/tsm1/compact.go @@ -17,6 +17,7 @@ import ( "errors" "fmt" "io" + "io/fs" "math" "os" "path/filepath" @@ -65,6 +66,10 @@ func (e errCompactionInProgress) Error() string { return "compaction in progress" } +func (e errCompactionInProgress) Unwrap() error { + return e.err +} + type errCompactionAborted struct { err error } @@ -1051,6 +1056,7 @@ func (c *Compactor) removeTmpFiles(files []string) error { func (c *Compactor) writeNewFiles(generation, sequence int, src []string, iter KeyIterator, throttle bool, logger *zap.Logger) ([]string, error) { // These are the new TSM files written var files []string + var eInProgress errCompactionInProgress for { sequence++ @@ -1060,15 +1066,15 @@ func (c *Compactor) writeNewFiles(generation, sequence int, src []string, iter K logger.Debug("Compacting files", zap.Int("file_count", len(src)), zap.String("output_file", fileName)) // Write as much as possible to this file - err := c.write(fileName, iter, throttle, logger) + rollToNext, err := c.write(fileName, iter, throttle, logger) - // We've hit the max file limit and there is more to write. Create a new file - // and continue. - if err == errMaxFileExceeded || err == ErrMaxBlocksExceeded { + if rollToNext { + // We've hit the max file limit and there is more to write. Create a new file + // and continue. files = append(files, fileName) logger.Debug("file size or block count exceeded, opening another output file", zap.String("output_file", fileName)) continue - } else if err == ErrNoValues { + } else if errors.Is(err, ErrNoValues) { logger.Debug("Dropping empty file", zap.String("output_file", fileName)) // If the file only contained tombstoned entries, then it would be a 0 length // file that we can drop. @@ -1076,9 +1082,14 @@ func (c *Compactor) writeNewFiles(generation, sequence int, src []string, iter K return nil, err } break - } else if _, ok := err.(errCompactionInProgress); ok { - // Don't clean up the file as another compaction is using it. This should not happen as the - // planner keeps track of which files are assigned to compaction plans now. + } else if errors.As(err, &eInProgress) { + if !errors.Is(eInProgress.err, fs.ErrExist) { + logger.Error("error creating compaction file", zap.String("output_file", fileName), zap.Error(err)) + } else { + // Don't clean up the file as another compaction is using it. This should not happen as the + // planner keeps track of which files are assigned to compaction plans now. + logger.Warn("file exists, compaction in progress already", zap.String("output_file", fileName)) + } return nil, err } else if err != nil { // We hit an error and didn't finish the compaction. Abort. @@ -1100,10 +1111,10 @@ func (c *Compactor) writeNewFiles(generation, sequence int, src []string, iter K return files, nil } -func (c *Compactor) write(path string, iter KeyIterator, throttle bool, logger *zap.Logger) (err error) { +func (c *Compactor) write(path string, iter KeyIterator, throttle bool, logger *zap.Logger) (rollToNext bool, err error) { fd, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR|os.O_EXCL, 0666) if err != nil { - return errCompactionInProgress{err: err} + return false, errCompactionInProgress{err: err} } // syncingWriter ensures that whatever we wrap the above file descriptor in @@ -1128,33 +1139,31 @@ func (c *Compactor) write(path string, iter KeyIterator, throttle bool, logger * // in memory. if iter.EstimatedIndexSize() > 64*1024*1024 { w, err = NewTSMWriterWithDiskBuffer(limitWriter) - if err != nil { - return err - } } else { w, err = NewTSMWriter(limitWriter) - if err != nil { - return err - } } - + if err != nil { + // Close the file and return if we can't create the TSMWriter + return false, errors.Join(err, fd.Close()) + } defer func() { + var eInProgress errCompactionInProgress + + errs := make([]error, 0, 3) + errs = append(errs, err) closeErr := w.Close() - if err == nil { - err = closeErr - } + errs = append(errs, closeErr) - // Check for errors where we should not remove the file - _, inProgress := err.(errCompactionInProgress) - maxBlocks := err == ErrMaxBlocksExceeded - maxFileSize := err == errMaxFileExceeded - if inProgress || maxBlocks || maxFileSize { + // Check for conditions where we should not remove the file + inProgress := errors.As(err, &eInProgress) && errors.Is(eInProgress.err, fs.ErrExist) + if (closeErr == nil) && (inProgress || rollToNext) { + // do not join errors, there is only the one. return + } else if err != nil || closeErr != nil { + // Remove the file, we have had a problem + errs = append(errs, w.Remove()) } - - if err != nil { - _ = w.Remove() - } + err = errors.Join(errs...) }() lastLogSize := w.Size() @@ -1164,38 +1173,38 @@ func (c *Compactor) write(path string, iter KeyIterator, throttle bool, logger * c.mu.RUnlock() if !enabled { - return errCompactionAborted{} + return false, errCompactionAborted{} } // Each call to read returns the next sorted key (or the prior one if there are // more values to write). The size of values will be less than or equal to our // chunk size (1000) key, minTime, maxTime, block, err := iter.Read() if err != nil { - return err + return false, err } if minTime > maxTime { - return fmt.Errorf("invalid index entry for block. min=%d, max=%d", minTime, maxTime) + return false, fmt.Errorf("invalid index entry for block. min=%d, max=%d", minTime, maxTime) } // Write the key and value - if err := w.WriteBlock(key, minTime, maxTime, block); err == ErrMaxBlocksExceeded { + if err := w.WriteBlock(key, minTime, maxTime, block); errors.Is(err, ErrMaxBlocksExceeded) { if err := w.WriteIndex(); err != nil { - return err + return false, err } - return err + return true, err } else if err != nil { - return err + return false, err } - // If we have a max file size configured and we're over it, close out the file + // If we're over maxTSMFileSize, close out the file // and return the error. if w.Size() > maxTSMFileSize { if err := w.WriteIndex(); err != nil { - return err + return false, err } - return errMaxFileExceeded + return true, errMaxFileExceeded } else if (w.Size() - lastLogSize) > logEvery { logger.Debug("Compaction progress", zap.String("output_file", path), zap.Uint32("size", w.Size())) lastLogSize = w.Size() @@ -1204,15 +1213,15 @@ func (c *Compactor) write(path string, iter KeyIterator, throttle bool, logger * // Were there any errors encountered during iteration? if err := iter.Err(); err != nil { - return err + return false, err } // We're all done. Close out the file. if err := w.WriteIndex(); err != nil { - return err + return false, err } logger.Debug("Compaction finished", zap.String("output_file", path), zap.Uint32("size", w.Size())) - return nil + return false, nil } func (c *Compactor) add(files []string) bool { diff --git a/tsdb/engine/tsm1/compact_test.go b/tsdb/engine/tsm1/compact_test.go index c90beb6b1cf..ccfa6d4f469 100644 --- a/tsdb/engine/tsm1/compact_test.go +++ b/tsdb/engine/tsm1/compact_test.go @@ -1,7 +1,9 @@ package tsm1_test import ( + "errors" "fmt" + "io/fs" "math" "os" "path/filepath" @@ -12,6 +14,7 @@ import ( "github.com/influxdata/influxdb/tsdb" "github.com/influxdata/influxdb/tsdb/engine/tsm1" + "github.com/stretchr/testify/assert" "go.uber.org/zap" ) @@ -113,11 +116,11 @@ func TestCompactor_CompactFullLastTimestamp(t *testing.T) { } f2 := MustWriteTSM(dir, 2, writes) - fs := &fakeFileStore{} - defer fs.Close() + ffs := &fakeFileStore{} + defer ffs.Close() compactor := tsm1.NewCompactor() compactor.Dir = dir - compactor.FileStore = fs + compactor.FileStore = ffs compactor.Open() files, err := compactor.CompactFull([]string{f1, f2}, zap.NewNop()) @@ -170,11 +173,11 @@ func TestCompactor_CompactFull(t *testing.T) { } f3 := MustWriteTSM(dir, 3, writes) - fs := &fakeFileStore{} - defer fs.Close() + ffs := &fakeFileStore{} + defer ffs.Close() compactor := tsm1.NewCompactor() compactor.Dir = dir - compactor.FileStore = fs + compactor.FileStore = ffs files, err := compactor.CompactFull([]string{f1, f2, f3}, zap.NewNop()) if err == nil { @@ -280,11 +283,11 @@ func TestCompactor_DecodeError(t *testing.T) { f.WriteAt([]byte("ffff"), 10) // skip over header f.Close() - fs := &fakeFileStore{} - defer fs.Close() + ffs := &fakeFileStore{} + defer ffs.Close() compactor := tsm1.NewCompactor() compactor.Dir = dir - compactor.FileStore = fs + compactor.FileStore = ffs files, err := compactor.CompactFull([]string{f1, f2, f3}, zap.NewNop()) if err == nil { @@ -326,11 +329,11 @@ func TestCompactor_Compact_OverlappingBlocks(t *testing.T) { } f3 := MustWriteTSM(dir, 3, writes) - fs := &fakeFileStore{} - defer fs.Close() + ffs := &fakeFileStore{} + defer ffs.Close() compactor := tsm1.NewCompactor() compactor.Dir = dir - compactor.FileStore = fs + compactor.FileStore = ffs compactor.Size = 2 compactor.Open() @@ -406,11 +409,11 @@ func TestCompactor_Compact_OverlappingBlocksMultiple(t *testing.T) { } f3 := MustWriteTSM(dir, 3, writes) - fs := &fakeFileStore{} - defer fs.Close() + ffs := &fakeFileStore{} + defer ffs.Close() compactor := tsm1.NewCompactor() compactor.Dir = dir - compactor.FileStore = fs + compactor.FileStore = ffs compactor.Size = 2 compactor.Open() @@ -620,11 +623,11 @@ func TestCompactor_CompactFull_SkipFullBlocks(t *testing.T) { } f3 := MustWriteTSM(dir, 3, writes) - fs := &fakeFileStore{} - defer fs.Close() + ffs := &fakeFileStore{} + defer ffs.Close() compactor := tsm1.NewCompactor() compactor.Dir = dir - compactor.FileStore = fs + compactor.FileStore = ffs compactor.Size = 2 compactor.Open() @@ -722,11 +725,11 @@ func TestCompactor_CompactFull_TombstonedSkipBlock(t *testing.T) { } f3 := MustWriteTSM(dir, 3, writes) - fs := &fakeFileStore{} - defer fs.Close() + ffs := &fakeFileStore{} + defer ffs.Close() compactor := tsm1.NewCompactor() compactor.Dir = dir - compactor.FileStore = fs + compactor.FileStore = ffs compactor.Size = 2 compactor.Open() @@ -825,11 +828,11 @@ func TestCompactor_CompactFull_TombstonedPartialBlock(t *testing.T) { } f3 := MustWriteTSM(dir, 3, writes) - fs := &fakeFileStore{} - defer fs.Close() + ffs := &fakeFileStore{} + defer ffs.Close() compactor := tsm1.NewCompactor() compactor.Dir = dir - compactor.FileStore = fs + compactor.FileStore = ffs compactor.Size = 2 compactor.Open() @@ -933,11 +936,11 @@ func TestCompactor_CompactFull_TombstonedMultipleRanges(t *testing.T) { } f3 := MustWriteTSM(dir, 3, writes) - fs := &fakeFileStore{} - defer fs.Close() + ffs := &fakeFileStore{} + defer ffs.Close() compactor := tsm1.NewCompactor() compactor.Dir = dir - compactor.FileStore = fs + compactor.FileStore = ffs compactor.Size = 2 compactor.Open() @@ -1049,11 +1052,11 @@ func TestCompactor_CompactFull_MaxKeys(t *testing.T) { } f2.Close() - fs := &fakeFileStore{} - defer fs.Close() + ffs := &fakeFileStore{} + defer ffs.Close() compactor := tsm1.NewCompactor() compactor.Dir = dir - compactor.FileStore = fs + compactor.FileStore = ffs compactor.Open() // Compact both files, should get 2 files back @@ -1086,6 +1089,61 @@ func TestCompactor_CompactFull_MaxKeys(t *testing.T) { } } +func TestCompactor_CompactFull_InProgress(t *testing.T) { + // This test creates a lot of data and causes timeout failures for these envs + if testing.Short() || os.Getenv("CI") != "" || os.Getenv("GORACE") != "" { + t.Skip("Skipping in progress compaction test") + } + dir := MustTempDir() + defer os.RemoveAll(dir) + + f2Name := func() string { + values := make([]tsm1.Value, 1000) + + // Write a new file with 2 blocks + f2, f2Name := MustTSMWriter(dir, 2) + defer func() { + assert.NoError(t, f2.Close(), "closing TSM file %s", f2Name) + }() + for i := 0; i < 2; i++ { + values = values[:0] + for j := 0; j < 1000; j++ { + values = append(values, tsm1.NewValue(int64(i*1000+j), int64(1))) + } + assert.NoError(t, f2.Write([]byte("cpu,host=A#!~#value"), values), "writing TSM file: %s", f2Name) + } + assert.NoError(t, f2.WriteIndex(), "writing TSM file index for %s", f2Name) + return f2Name + }() + ffs := &fakeFileStore{} + defer ffs.Close() + compactor := tsm1.NewCompactor() + compactor.Dir = dir + compactor.FileStore = ffs + compactor.Open() + + expGen, expSeq, err := tsm1.DefaultParseFileName(f2Name) + assert.NoError(t, err, "unexpected error parsing file name %s", f2Name) + expSeq = expSeq + 1 + + fileName := filepath.Join(compactor.Dir, tsm1.DefaultFormatFileName(expGen, expSeq)+"."+tsm1.TSMFileExtension+"."+tsm1.TmpTSMFileExtension) + + // Create a temp file to simulate an in progress compaction + f, err := os.Create(fileName) + assert.NoError(t, err, "creating in-progress compaction file %s", fileName) + defer func() { + assert.NoError(t, f.Close(), "closing in-progress compaction file %s", fileName) + }() + _, err = compactor.CompactFull([]string{f2Name}, zap.NewNop()) + assert.Errorf(t, err, "expected an error writing snapshot for %s", f2Name) + e := errors.Unwrap(err) + assert.NotNil(t, e, "expected an error wrapped by errCompactionInProgress") + assert.Truef(t, errors.Is(e, fs.ErrExist), "error did not indicate file existence: %v", e) + pathErr := &os.PathError{} + assert.Truef(t, errors.As(e, &pathErr), "expected path error, got %v", e) + assert.Truef(t, errors.Is(pathErr, fs.ErrExist), "error did not indicate file existence: %v", pathErr) +} + func newTSMKeyIterator(size int, fast bool, interrupt chan struct{}, readers ...*tsm1.TSMReader) (tsm1.KeyIterator, error) { files := []string{} for _, r := range readers { @@ -2529,14 +2587,14 @@ func TestDefaultPlanner_Plan_SkipPlanningAfterFull(t *testing.T) { }, } - fs := &fakeFileStore{ + ffs := &fakeFileStore{ PathsFn: func() []tsm1.FileStat { return testSet }, blockCount: 1000, } - cp := tsm1.NewDefaultPlanner(fs, time.Nanosecond) + cp := tsm1.NewDefaultPlanner(ffs, time.Nanosecond) plan, pLen := cp.Plan(time.Now().Add(-time.Second)) // first verify that our test set would return files if exp, got := 4, len(plan[0]); got != exp { @@ -2595,9 +2653,9 @@ func TestDefaultPlanner_Plan_SkipPlanningAfterFull(t *testing.T) { } cp.Release(plan) - cp.FileStore = fs + cp.FileStore = ffs // ensure that it will plan if last modified has changed - fs.lastModified = time.Now() + ffs.lastModified = time.Now() cGroups, pLen := cp.Plan(time.Now()) if exp, got := 4, len(cGroups[0]); got != exp { @@ -2693,7 +2751,7 @@ func TestDefaultPlanner_Plan_NotFullOverMaxsize(t *testing.T) { }, } - fs := &fakeFileStore{ + ffs := &fakeFileStore{ PathsFn: func() []tsm1.FileStat { return testSet }, @@ -2701,7 +2759,7 @@ func TestDefaultPlanner_Plan_NotFullOverMaxsize(t *testing.T) { } cp := tsm1.NewDefaultPlanner( - fs, + ffs, time.Nanosecond, ) diff --git a/tsdb/engine/tsm1/writer.go b/tsdb/engine/tsm1/writer.go index 4784dc6d558..70f0fd77e22 100644 --- a/tsdb/engine/tsm1/writer.go +++ b/tsdb/engine/tsm1/writer.go @@ -66,6 +66,7 @@ import ( "bufio" "bytes" "encoding/binary" + "errors" "fmt" "hash/crc32" "io" @@ -511,19 +512,15 @@ func (d *directIndex) Size() uint32 { } func (d *directIndex) Close() error { + errs := make([]error, 0, 3) // Flush anything remaining in the index - if err := d.w.Flush(); err != nil { - return err - } - - if d.fd == nil { - return nil + errs = append(errs, d.w.Flush()) + if d.fd != nil { + // Close and remove the temporary index file + errs = append(errs, d.fd.Close()) + errs = append(errs, os.Remove(d.fd.Name())) } - - if err := d.fd.Close(); err != nil { - return err - } - return os.Remove(d.fd.Name()) + return errors.Join(errs...) } // Remove removes the index from any tempory storage @@ -532,11 +529,14 @@ func (d *directIndex) Remove() error { return nil } - // Close the file handle to prevent leaking. We ignore the error because - // we just want to cleanup and remove the file. - _ = d.fd.Close() - - return os.Remove(d.fd.Name()) + errs := make([]error, 0, 2) + // Close the file handle to prevent leaking. + // We don't let an error stop the removal. + if err := d.fd.Close(); err != nil && !errors.Is(err, os.ErrClosed) { + errs = append(errs, err) + } + errs = append(errs, os.Remove(d.fd.Name())) + return errors.Join(errs...) } // tsmWriter writes keys and values in the TSM format @@ -756,25 +756,19 @@ func (t *tsmWriter) sync() error { } func (t *tsmWriter) Close() error { - if err := t.Flush(); err != nil { - return err - } - - if err := t.index.Close(); err != nil { - return err - } - + errs := make([]error, 0, 3) + errs = append(errs, t.Flush()) + errs = append(errs, t.index.Close()) if c, ok := t.wrapped.(io.Closer); ok { - return c.Close() + errs = append(errs, c.Close()) } - return nil + return errors.Join(errs...) } // Remove removes any temporary storage used by the writer. func (t *tsmWriter) Remove() error { - if err := t.index.Remove(); err != nil { - return err - } + errs := make([]error, 0, 3) + errs = append(errs, t.index.Remove()) // nameCloser is the most permissive interface we can close the wrapped // value with. @@ -783,14 +777,16 @@ func (t *tsmWriter) Remove() error { Name() string } + // If the writer is not a memory buffer, we can remove the file. if f, ok := t.wrapped.(nameCloser); ok { - // Close the file handle to prevent leaking. We ignore the error because - // we just want to cleanup and remove the file. - _ = f.Close() - - return os.Remove(f.Name()) + // Close the file handle to prevent leaking. + if err := f.Close(); err != nil && !errors.Is(err, os.ErrClosed) { + errs = append(errs, err) + } + // Remove the file + errs = append(errs, os.Remove(f.Name())) } - return nil + return errors.Join(errs...) } func (t *tsmWriter) Size() uint32 {