diff --git a/internal/fs/inode/file.go b/internal/fs/inode/file.go index 54015e33d7..32693a60cd 100644 --- a/internal/fs/inode/file.go +++ b/internal/fs/inode/file.go @@ -499,13 +499,13 @@ func (f *FileInode) DeRegisterFileHandle(readOnly bool) { // LOCKS_REQUIRED(f.mu) // UpdateSize updates the size of the backing GCS object. It also calls -// updateMRDWrapper to ensure that the multi-range downloader (which is used +// updateMRD to ensure that the multi-range downloader (which is used // for random reads) is aware of the new size. This prevents the downloader // from operating on stale object information. func (f *FileInode) UpdateSize(size uint64) { f.src.Size = size f.attrs.Size = size - f.updateMRDWrapper() + f.updateMRD() } // LOCKS_REQUIRED(f.mu) @@ -829,7 +829,7 @@ func (f *FileInode) SetMtime( minObj = *minObjPtr } f.src = minObj - f.updateMRDWrapper() + f.updateMRD() return } @@ -989,7 +989,7 @@ func (f *FileInode) updateInodeStateAfterSync(minObj *gcs.MinObject) { if minObj != nil && !f.localFileCache { f.src = *minObj // Update MRDWrapper - f.updateMRDWrapper() + f.updateMRD() // Convert localFile to nonLocalFile after it is synced to GCS. if f.IsLocal() { f.local = false @@ -1001,12 +1001,15 @@ func (f *FileInode) updateInodeStateAfterSync(minObj *gcs.MinObject) { } } -// Updates the min object stored in MRDWrapper corresponding to the inode. +// Updates the min object stored in MRDWrapper & MRDInstance corresponding to the inode. // Should be called when minObject associated with inode is updated. -func (f *FileInode) updateMRDWrapper() { - err := f.MRDWrapper.SetMinObject(f.Source()) - if err != nil { - logger.Errorf("FileInode::updateMRDWrapper Error in setting minObject %v", err) +func (f *FileInode) updateMRD() { + minObj := f.Source() + if err := f.mrdInstance.SetMinObject(minObj); err != nil { + logger.Errorf("FileInode::updateMRD Error in setting minObject for MrdInstance %v", err) + } + if err := f.MRDWrapper.SetMinObject(minObj); err != nil { + logger.Errorf("FileInode::updateMRD Error in setting minObject for MRDWrapper %v", err) } } diff --git a/internal/fs/inode/file_test.go b/internal/fs/inode/file_test.go index 68a2b3820f..adc8b01621 100644 --- a/internal/fs/inode/file_test.go +++ b/internal/fs/inode/file_test.go @@ -172,6 +172,22 @@ func (t *FileTest) createBufferedWriteHandler(shouldInitialize bool, openMode ut } } +func (t *FileTest) validateMrdInstanceMinObject() { + t.T().Helper() + // Validate MinObject in inode and MRDInstance points to different copy of MinObject. + assert.NotSame(t.T(), &t.in.src, t.in.mrdInstance.GetMinObject()) + // Validate MinObject in MRDInstance is equal to the MinObject in inode. + assert.Equal(t.T(), &t.in.src, t.in.mrdInstance.GetMinObject()) +} + +func (t *FileTest) validateMrdWrapperMinObject() { + t.T().Helper() + // Validate MinObject in inode and MRDWrapper points to different copy of MinObject. + assert.NotSame(t.T(), &t.in.src, t.in.MRDWrapper.GetMinObject()) + // Validate MinObject in MRDWrapper is equal to the MinObject in inode. + assert.Equal(t.T(), &t.in.src, t.in.MRDWrapper.GetMinObject()) +} + //////////////////////////////////////////////////////////////////////// // Tests //////////////////////////////////////////////////////////////////////// @@ -609,10 +625,8 @@ func (t *FileTest) TestWriteThenSync() { // The generation should have advanced. assert.Less(t.T(), t.backingObj.Generation, t.in.SourceGeneration().Object) - // Validate MinObject in inode and MRDWrapper points to different copy of MinObject. - assert.NotSame(t.T(), &t.in.src, t.in.MRDWrapper.GetMinObject()) - // Validate MinObject in MRDWrapper is equal to the MinObject in inode. - assert.Equal(t.T(), &t.in.src, t.in.MRDWrapper.GetMinObject()) + t.validateMrdWrapperMinObject() + t.validateMrdInstanceMinObject() // Stat the current object in the bucket. statReq := &gcs.StatObjectRequest{Name: t.in.Name().GcsObjectName()} @@ -699,10 +713,8 @@ func (t *FileTest) TestWriteToLocalFileThenSync() { assert.Equal(t.T(), writeTime.UTC().Format(time.RFC3339Nano), m.Metadata["gcsfuse_mtime"]) - // Validate MinObject in inode and MRDWrapper points to different copy of MinObject. - assert.NotSame(t.T(), &t.in.src, t.in.MRDWrapper.GetMinObject()) - // Validate MinObject in MRDWrapper is same as the MinObject in inode. - assert.Equal(t.T(), &t.in.src, t.in.MRDWrapper.GetMinObject()) + t.validateMrdWrapperMinObject() + t.validateMrdInstanceMinObject() // Read the object's contents. contents, err := storageutil.ReadObject(t.ctx, t.bucket, t.in.Name().GcsObjectName()) require.NoError(t.T(), err) @@ -762,10 +774,8 @@ func (t *FileTest) TestSyncEmptyLocalFile() { assert.Equal(t.T(), t.in.SourceGeneration().Metadata, m.MetaGeneration) assert.Equal(t.T(), t.in.SourceGeneration().Size, m.Size) assert.Equal(t.T(), uint64(0), m.Size) - // Validate MinObject in inode and MRDWrapper points to different copy of MinObject. - assert.NotSame(t.T(), &t.in.src, t.in.MRDWrapper.GetMinObject()) - // Validate MinObject in MRDWrapper is equal to the MinObject in inode. - assert.Equal(t.T(), &t.in.src, t.in.MRDWrapper.GetMinObject()) + t.validateMrdWrapperMinObject() + t.validateMrdInstanceMinObject() // Validate the mtime. mtimeInBucket, ok := m.Metadata["gcsfuse_mtime"] assert.True(t.T(), ok) @@ -840,10 +850,8 @@ func (t *FileTest) TestAppendThenSync() { assert.Equal(t.T(), writeTime.UTC().Format(time.RFC3339Nano), m.Metadata["gcsfuse_mtime"]) - // Validate MinObject in inode and MRDWrapper points to different copy of MinObject. - assert.NotSame(t.T(), &t.in.src, t.in.MRDWrapper.GetMinObject()) - // Validate MinObject in MRDWrapper is equal to the MinObject in inode. - assert.Equal(t.T(), &t.in.src, t.in.MRDWrapper.GetMinObject()) + t.validateMrdWrapperMinObject() + t.validateMrdInstanceMinObject() // Read the object's contents. contents, err := storageutil.ReadObject(t.ctx, t.bucket, t.in.Name().GcsObjectName()) @@ -936,10 +944,8 @@ func (t *FileTest) TestTruncateDownwardThenSync() { // The generation should have advanced. assert.Less(t.T(), t.backingObj.Generation, t.in.SourceGeneration().Object) - // Validate MinObject in inode and MRDWrapper points to different copy of MinObject. - assert.NotSame(t.T(), &t.in.src, t.in.MRDWrapper.GetMinObject()) - // Validate MinObject in MRDWrapper is equal to the MinObject in inode. - assert.Equal(t.T(), &t.in.src, t.in.MRDWrapper.GetMinObject()) + t.validateMrdWrapperMinObject() + t.validateMrdInstanceMinObject() // Stat the current object in the bucket. statReq := &gcs.StatObjectRequest{Name: t.in.Name().GcsObjectName()} @@ -1009,10 +1015,8 @@ func (t *FileTest) TestTruncateUpwardThenFlush() { // The generation should have advanced. assert.Less(t.T(), t.backingObj.Generation, t.in.SourceGeneration().Object) - // Validate MinObject in inode and MRDWrapper points to different copy of MinObject. - assert.NotSame(t.T(), &t.in.src, t.in.MRDWrapper.GetMinObject()) - // Validate MinObject in MRDWrapper is equal to the MinObject in inode. - assert.Equal(t.T(), &t.in.src, t.in.MRDWrapper.GetMinObject()) + t.validateMrdWrapperMinObject() + t.validateMrdInstanceMinObject() // Stat the current object in the bucket. statReq := &gcs.StatObjectRequest{Name: t.in.Name().GcsObjectName()} @@ -1309,10 +1313,8 @@ func (t *FileTest) TestSyncFlush_Clobbered() { err = t.in.Flush(t.ctx) } - // Validate MinObject in inode and MRDWrapper points to different copy of MinObject. - assert.NotSame(t.T(), &t.in.src, t.in.MRDWrapper.GetMinObject()) - // Validate MinObject in MRDWrapper is equal to the MinObject in inode. - assert.Equal(t.T(), &t.in.src, t.in.MRDWrapper.GetMinObject()) + t.validateMrdWrapperMinObject() + t.validateMrdInstanceMinObject() // Check if the error is a FileClobberedError var fcErr *gcsfuse_errors.FileClobberedError @@ -1441,10 +1443,8 @@ func (t *FileTest) TestSetMtime_ContentDirty() { statReq := &gcs.StatObjectRequest{Name: t.in.Name().GcsObjectName()} m, _, err := t.bucket.StatObject(t.ctx, statReq) - // Validate MinObject in inode and MRDWrapper points to different copy of MinObject. - assert.NotSame(t.T(), &t.in.src, t.in.MRDWrapper.GetMinObject()) - // Validate MinObject in MRDWrapper is equal to the MinObject in inode. - assert.Equal(t.T(), &t.in.src, t.in.MRDWrapper.GetMinObject()) + t.validateMrdWrapperMinObject() + t.validateMrdInstanceMinObject() require.NoError(t.T(), err) assert.NotNil(t.T(), m) diff --git a/internal/gcsx/mrd_instance.go b/internal/gcsx/mrd_instance.go index 5c71bea15a..9c5dd2c3de 100644 --- a/internal/gcsx/mrd_instance.go +++ b/internal/gcsx/mrd_instance.go @@ -32,6 +32,8 @@ import ( "github.com/jacobsa/fuse/fuseops" ) +const mrdPoolCloseTimeout = 120 * time.Second + // MrdInstance manages a pool of Multi-Range Downloader (MRD) instances for a // single file inode. It handles the lifecycle of the MRD pool, including // creation, destruction, and caching. @@ -57,15 +59,64 @@ type MrdInstance struct { } // NewMrdInstance creates a new MrdInstance for a given GCS object. -// Passed config should not be nil. Code will panic otherwise. func NewMrdInstance(obj *gcs.MinObject, bucket gcs.Bucket, cache *lru.Cache, inodeId fuseops.InodeID, config *cfg.Config) *MrdInstance { - return &MrdInstance{ - object: obj, + mrdInstance := MrdInstance{ bucket: bucket, mrdCache: cache, inodeId: inodeId, config: config, + object: obj, + } + return &mrdInstance +} + +// SetMinObject sets the gcs.MinObject stored in the MrdInstance to passed value, only if it's non nil. +// If the generation of the object has changed, it recreates the MRD pool to ensure consistency. +func (mi *MrdInstance) SetMinObject(minObj *gcs.MinObject) error { + if minObj == nil { + return fmt.Errorf("MrdInstance::SetMinObject: Missing MinObject") + } + + mi.poolMu.Lock() + defer mi.poolMu.Unlock() + + oldObj := mi.object + // If generation matches, just update the object (e.g. for size updates) and return. + if oldObj != nil && oldObj.Generation == minObj.Generation { + mi.object = minObj + return nil + } + + // Generations differ, need to create and swap a new pool. + if err := mi.createAndSwapPool(minObj); err != nil { + return fmt.Errorf("MrdInstance::SetMinObject: failed to create and swap pool: %w", err) + } + + return nil +} + +// createAndSwapPool creates a new MRD pool and swaps it with the existing one. +// It also updates the minObject with the passed object & closes the old pool. +// LOCKS_REQUIRED(mi.poolMu). +func (mi *MrdInstance) createAndSwapPool(obj *gcs.MinObject) error { + newPool, err := NewMRDPool(&MRDPoolConfig{PoolSize: int(mi.config.Mrd.PoolSize), object: obj, bucket: mi.bucket}, nil) + if err != nil { + return err } + + oldPool := mi.mrdPool + mi.mrdPool = newPool + mi.object = obj + + closePoolWithTimeout(oldPool, "MrdInstance::createAndSwapPool", mrdPoolCloseTimeout) + return nil +} + +// GetMinObject returns the gcs.MinObject stored in MrdInstance. Used only for unit testing. +func (mi *MrdInstance) GetMinObject() *gcs.MinObject { + mi.poolMu.RLock() + defer mi.poolMu.RUnlock() + return mi.object } // getMRDEntry returns a valid MRDEntry from the pool. @@ -182,25 +233,18 @@ func (mi *MrdInstance) ensureMRDPool() (err error) { // file inode when the backing GCS object's generation changes, invalidating // all existing downloader instances. func (mi *MrdInstance) RecreateMRD() error { - // Create the new pool first to avoid a period where mrdPool is nil. - newPool, err := NewMRDPool(&MRDPoolConfig{ - PoolSize: int(mi.config.Mrd.PoolSize), - object: mi.object, - bucket: mi.bucket, - }, nil) - if err != nil { - return fmt.Errorf("MrdInstance::RecreateMRD Error in recreating MRD: %w", err) - } - mi.poolMu.Lock() - oldPool := mi.mrdPool - mi.mrdPool = newPool - mi.poolMu.Unlock() + defer mi.poolMu.Unlock() + + obj := mi.object + if obj == nil { + return fmt.Errorf("MrdInstance::RecreateMRD: object is nil") + } - // Close the old pool after swapping. - if oldPool != nil { - oldPool.Close() + if err := mi.createAndSwapPool(obj); err != nil { + return fmt.Errorf("MrdInstance::RecreateMRD: failed to create new pool: %w", err) } + return nil } @@ -208,11 +252,35 @@ func (mi *MrdInstance) RecreateMRD() error { func (mi *MrdInstance) closePool() { mi.poolMu.Lock() defer mi.poolMu.Unlock() - if mi.mrdPool != nil { - // Close the pool. - mi.mrdPool.Close() - mi.mrdPool = nil + pool := mi.mrdPool + mi.mrdPool = nil + closePoolWithTimeout(pool, "MrdInstance::closePool", mrdPoolCloseTimeout) +} + +// closePoolWithTimeout closes the given MRD pool in a separate goroutine with a timeout. +// If closing the pool takes longer than the specified timeout, a warning is logged. +func closePoolWithTimeout(pool *MRDPool, caller string, timeout time.Duration) { + if pool == nil { + return } + + go func() { + done := make(chan struct{}) + go func() { + defer close(done) + pool.Close() + }() + + select { + case <-done: + case <-time.After(timeout): + var objectName string + if pool.poolConfig != nil && pool.poolConfig.object != nil { + objectName = pool.poolConfig.object.Name + } + logger.Warnf("%s: MRDPool.Close() timed out after %v for object %s", caller, timeout, objectName) + } + }() } // Destroy completely destroys the MrdInstance, cleaning up @@ -254,7 +322,7 @@ func (mi *MrdInstance) IncrementRefCount() { // Remove from cache deletedEntry := mi.mrdCache.Erase(getKey(mi.inodeId)) if deletedEntry != nil { - logger.Tracef("MrdInstance::IncrementRefCount: MrdInstance (%s) erased from cache", mi.object.Name) + logger.Tracef("MrdInstance::IncrementRefCount: MrdInstance Inode (%d) erased from cache", mi.inodeId) } } } @@ -303,13 +371,14 @@ func (mi *MrdInstance) DecrementRefCount() { // This is a safe order. evictedValues, err := mi.mrdCache.Insert(getKey(mi.inodeId), mi) if err != nil { - logger.Errorf("MrdInstance::DecrementRefCount: Failed to insert MrdInstance for object (%s) into cache, destroying immediately: %v", mi.object.Name, err) + logger.Errorf("MrdInstance::DecrementRefCount: Failed to insert MrdInstance for inode (%d) into cache, destroying immediately: %v", mi.inodeId, err) // The instance could not be inserted into the cache. Since the refCount is 0, // we must close it now to prevent it from being leaked. mi.closePool() return } - logger.Tracef("MrdInstance::DecrementRefCount: MrdInstance for object (%s) added to cache", mi.object.Name) + + logger.Tracef("MrdInstance::DecrementRefCount: MrdInstance for inode (%d) added to cache", mi.inodeId) // Do not proceed if no eviction happened. if evictedValues == nil { diff --git a/internal/gcsx/mrd_instance_test.go b/internal/gcsx/mrd_instance_test.go index 06d31a9d4d..703222073e 100644 --- a/internal/gcsx/mrd_instance_test.go +++ b/internal/gcsx/mrd_instance_test.go @@ -20,6 +20,7 @@ import ( "fmt" "os" "strconv" + "sync" "testing" "time" @@ -257,7 +258,7 @@ func (t *MrdInstanceTest) TestRecreateMRD() { assert.NoError(t.T(), err) assert.NotNil(t.T(), t.mrdInstance.mrdPool) - assert.NotEqual(t.T(), pool1, t.mrdInstance.mrdPool) + assert.NotSame(t.T(), pool1, t.mrdInstance.mrdPool) } func (t *MrdInstanceTest) TestDestroy() { @@ -542,3 +543,174 @@ func (t *MrdInstanceTest) TestHandleEviction_SafeToClose() { assert.Nil(t.T(), t.mrdInstance.mrdPool) t.mrdInstance.poolMu.RUnlock() } + +func (t *MrdInstanceTest) TestCreateAndSwapPool_Success() { + // Setup initial state + initialObj := &gcs.MinObject{Name: "old", Generation: 1} + t.mrdInstance.object = initialObj + // Create an initial pool + fakeMRD1 := fake.NewFakeMultiRangeDownloader(initialObj, nil) + t.bucket.On("NewMultiRangeDownloader", mock.Anything, mock.Anything).Return(fakeMRD1, nil).Once() + err := t.mrdInstance.ensureMRDPool() + assert.NoError(t.T(), err) + oldPool := t.mrdInstance.mrdPool + assert.NotNil(t.T(), oldPool) + // Prepare for new pool creation + newObj := &gcs.MinObject{Name: "new", Generation: 2} + fakeMRD2 := fake.NewFakeMultiRangeDownloader(newObj, nil) + t.bucket.On("NewMultiRangeDownloader", mock.Anything, mock.Anything).Return(fakeMRD2, nil).Once() + + // Call createAndSwapPool + t.mrdInstance.poolMu.Lock() + err = t.mrdInstance.createAndSwapPool(newObj) + t.mrdInstance.poolMu.Unlock() + + assert.NoError(t.T(), err) + assert.NotSame(t.T(), oldPool, t.mrdInstance.mrdPool) + assert.Equal(t.T(), newObj, t.mrdInstance.object) + assert.Equal(t.T(), fakeMRD2, t.mrdInstance.mrdPool.entries[0].mrd) +} + +func (t *MrdInstanceTest) TestCreateAndSwapPool_Failure() { + // Setup initial state + initialObj := &gcs.MinObject{Name: "old", Generation: 1} + t.mrdInstance.object = initialObj + // Create an initial pool + fakeMRD1 := fake.NewFakeMultiRangeDownloader(initialObj, nil) + t.bucket.On("NewMultiRangeDownloader", mock.Anything, mock.Anything).Return(fakeMRD1, nil).Once() + err := t.mrdInstance.ensureMRDPool() + assert.NoError(t.T(), err) + oldPool := t.mrdInstance.mrdPool + assert.NotNil(t.T(), oldPool) + // Prepare for new pool creation failure + newObj := &gcs.MinObject{Name: "new", Generation: 2} + t.bucket.On("NewMultiRangeDownloader", mock.Anything, mock.Anything).Return(nil, fmt.Errorf("creation failed")).Once() + + // Call createAndSwapPool + t.mrdInstance.poolMu.Lock() + err = t.mrdInstance.createAndSwapPool(newObj) + t.mrdInstance.poolMu.Unlock() + + assert.Error(t.T(), err) + assert.Contains(t.T(), err.Error(), "creation failed") + // Verify state remains unchanged + assert.Equal(t.T(), oldPool, t.mrdInstance.mrdPool) + assert.Equal(t.T(), initialObj, t.mrdInstance.object) +} + +func (t *MrdInstanceTest) TestSetMinObject_NilObject() { + err := t.mrdInstance.SetMinObject(nil) + + assert.Error(t.T(), err) + + assert.Contains(t.T(), err.Error(), "Missing MinObject") +} + +func (t *MrdInstanceTest) TestSetMinObject_SameGeneration() { + // Setup + initialObj := t.mrdInstance.GetMinObject() + // Ensure pool exists. + fakeMRD1 := fake.NewFakeMultiRangeDownloader(initialObj, nil) + t.bucket.On("NewMultiRangeDownloader", mock.Anything, mock.Anything).Return(fakeMRD1, nil).Once() + err := t.mrdInstance.ensureMRDPool() + assert.NoError(t.T(), err) + t.mrdInstance.poolMu.RLock() + initialPool := t.mrdInstance.mrdPool + t.mrdInstance.poolMu.RUnlock() + // Same generation update (e.g. size change). + newObj := &gcs.MinObject{ + Name: initialObj.Name, + Generation: initialObj.Generation, + Size: initialObj.Size + 100, + } + + err = t.mrdInstance.SetMinObject(newObj) + + assert.NoError(t.T(), err) + assert.Equal(t.T(), newObj, t.mrdInstance.GetMinObject()) + // Pool should not change for same generation. + t.mrdInstance.poolMu.RLock() + assert.Equal(t.T(), initialPool, t.mrdInstance.mrdPool) + t.mrdInstance.poolMu.RUnlock() +} + +func (t *MrdInstanceTest) TestSetMinObject_DifferentGeneration() { + // Setup + initialObj := t.mrdInstance.GetMinObject() + // Ensure pool exists. + fakeMRD1 := fake.NewFakeMultiRangeDownloader(initialObj, nil) + t.bucket.On("NewMultiRangeDownloader", mock.Anything, mock.Anything).Return(fakeMRD1, nil).Once() + err := t.mrdInstance.ensureMRDPool() + assert.NoError(t.T(), err) + t.mrdInstance.poolMu.RLock() + initialPool := t.mrdInstance.mrdPool + t.mrdInstance.poolMu.RUnlock() + // New generation + newObj := &gcs.MinObject{ + Name: initialObj.Name, + Generation: initialObj.Generation + 1, + Size: initialObj.Size, + } + // Mock creation of new pool + fakeMRD2 := fake.NewFakeMultiRangeDownloader(newObj, nil) + t.bucket.On("NewMultiRangeDownloader", mock.Anything, mock.Anything).Return(fakeMRD2, nil).Once() + + err = t.mrdInstance.SetMinObject(newObj) + + assert.NoError(t.T(), err) + assert.Equal(t.T(), newObj, t.mrdInstance.GetMinObject()) + t.mrdInstance.poolMu.RLock() + assert.NotSame(t.T(), initialPool, t.mrdInstance.mrdPool) + assert.NotNil(t.T(), t.mrdInstance.mrdPool) + t.mrdInstance.poolMu.RUnlock() +} + +func (t *MrdInstanceTest) TestGetMinObject() { + obj := t.mrdInstance.GetMinObject() + + assert.Equal(t.T(), t.object, obj) +} + +func (t *MrdInstanceTest) TestClosePoolWithTimeout_LogWarningOnTimeout() { + // 1. Capture logs. + var buf logBuffer + logger.SetOutput(&buf) + defer logger.SetOutput(os.Stdout) + // 2. Create a pool that blocks on Close(). + // MRDPool.Close() waits on creationWg. We increment it to block Close(). + pool := &MRDPool{ + poolConfig: &MRDPoolConfig{ + object: t.object, + }, + } + pool.creationWg.Add(1) + + // 3. Call the function. + closePoolWithTimeout(pool, "TestCaller", 10*time.Millisecond) + + // 4. Wait enough time for timeout to trigger. + time.Sleep(50 * time.Millisecond) + // 5. Verify log. + assert.Contains(t.T(), buf.String(), "TestCaller: MRDPool.Close() timed out") + assert.Contains(t.T(), buf.String(), t.object.Name) + // 7. Cleanup: Unblock the pool closure to avoid goroutine leak. + pool.creationWg.Done() +} + +// logBuffer is a thread-safe buffer for capturing logs in tests. +type logBuffer struct { + mu sync.Mutex + buf bytes.Buffer +} + +func (b *logBuffer) Write(p []byte) (n int, err error) { + b.mu.Lock() + defer b.mu.Unlock() + return b.buf.Write(p) +} + +func (b *logBuffer) String() string { + b.mu.Lock() + defer b.mu.Unlock() + return b.buf.String() +} diff --git a/internal/gcsx/mrd_simple_reader.go b/internal/gcsx/mrd_simple_reader.go index e6588373d0..5ad91287f5 100644 --- a/internal/gcsx/mrd_simple_reader.go +++ b/internal/gcsx/mrd_simple_reader.go @@ -101,8 +101,9 @@ func (msr *MrdSimpleReader) Destroy() { // No need to take lock as Destroy will only be called when file handle is being released // and there will be no read calls at that point. if msr.mrdInstance != nil { - msr.mrdInstanceInUse.Store(false) - msr.mrdInstance.DecrementRefCount() + if msr.mrdInstanceInUse.CompareAndSwap(true, false) { + msr.mrdInstance.DecrementRefCount() + } msr.mrdInstance = nil } } diff --git a/internal/gcsx/multi_range_downloader_wrapper.go b/internal/gcsx/multi_range_downloader_wrapper.go index e4e4e90869..0ef5db6be4 100644 --- a/internal/gcsx/multi_range_downloader_wrapper.go +++ b/internal/gcsx/multi_range_downloader_wrapper.go @@ -80,6 +80,8 @@ func (mrdWrapper *MultiRangeDownloaderWrapper) SetMinObject(minObj *gcs.MinObjec if minObj == nil { return fmt.Errorf("MultiRangeDownloaderWrapper::SetMinObject: Missing MinObject") } + mrdWrapper.mu.Lock() + defer mrdWrapper.mu.Unlock() mrdWrapper.object = minObj return nil } @@ -93,6 +95,8 @@ func wrapperKey(wrapper *MultiRangeDownloaderWrapper) string { // GetMinObject returns the minObject stored in MultiRangeDownloaderWrapper. Used only for unit testing. func (mrdWrapper *MultiRangeDownloaderWrapper) GetMinObject() *gcs.MinObject { + mrdWrapper.mu.RLock() + defer mrdWrapper.mu.RUnlock() return mrdWrapper.object }