Skip to content
Open
21 changes: 12 additions & 9 deletions internal/fs/inode/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -499,13 +499,13 @@ func (f *FileInode) DeRegisterFileHandle(readOnly bool) {

// LOCKS_REQUIRED(f.mu)
// UpdateSize updates the size of the backing GCS object. It also calls
// updateMRDWrapper to ensure that the multi-range downloader (which is used
// updateMRD to ensure that the multi-range downloader (which is used
// for random reads) is aware of the new size. This prevents the downloader
// from operating on stale object information.
func (f *FileInode) UpdateSize(size uint64) {
f.src.Size = size
f.attrs.Size = size
f.updateMRDWrapper()
f.updateMRD()
}

// LOCKS_REQUIRED(f.mu)
Expand Down Expand Up @@ -826,7 +826,7 @@ func (f *FileInode) SetMtime(
minObj = *minObjPtr
}
f.src = minObj
f.updateMRDWrapper()
f.updateMRD()
return
}

Expand Down Expand Up @@ -986,7 +986,7 @@ func (f *FileInode) updateInodeStateAfterSync(minObj *gcs.MinObject) {
if minObj != nil && !f.localFileCache {
f.src = *minObj
// Update MRDWrapper
f.updateMRDWrapper()
f.updateMRD()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

after the inode gets updated with new generation we should invalidate the MRD instance and create it again

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we're leaving object unfinalized, then the generation won't change and we don't have to recreate the MRD. I can recreate it if FinalizeFileForRapid is set to true. What do you think?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Handled generation change scenario in SetMinObject.

// Convert localFile to nonLocalFile after it is synced to GCS.
if f.IsLocal() {
f.local = false
Expand All @@ -998,12 +998,15 @@ func (f *FileInode) updateInodeStateAfterSync(minObj *gcs.MinObject) {
}
}

// Updates the min object stored in MRDWrapper corresponding to the inode.
// Updates the min object stored in MRDWrapper & MRDInstance corresponding to the inode.
// Should be called when minObject associated with inode is updated.
func (f *FileInode) updateMRDWrapper() {
err := f.MRDWrapper.SetMinObject(f.Source())
if err != nil {
logger.Errorf("FileInode::updateMRDWrapper Error in setting minObject %v", err)
func (f *FileInode) updateMRD() {
minObj := f.Source()
if err := f.mrdInstance.SetMinObject(minObj); err != nil {
logger.Errorf("FileInode::updateMRD Error in setting minObject for MrdInstance %v", err)
}
if err := f.MRDWrapper.SetMinObject(minObj); err != nil {
logger.Errorf("FileInode::updateMRD Error in setting minObject for MRDWrapper %v", err)
}
}

Expand Down
64 changes: 32 additions & 32 deletions internal/fs/inode/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,22 @@ func (t *FileTest) createBufferedWriteHandler(shouldInitialize bool, openMode ut
}
}

func (t *FileTest) validateMrdInstanceMinObject() {
t.T().Helper()
// Validate MinObject in inode and MRDInstance points to different copy of MinObject.
assert.NotSame(t.T(), &t.in.src, t.in.mrdInstance.GetMinObject())
// Validate MinObject in MRDInstance is equal to the MinObject in inode.
assert.Equal(t.T(), &t.in.src, t.in.mrdInstance.GetMinObject())
}

func (t *FileTest) validateMrdWrapperMinObject() {
t.T().Helper()
// Validate MinObject in inode and MRDWrapper points to different copy of MinObject.
assert.NotSame(t.T(), &t.in.src, t.in.MRDWrapper.GetMinObject())
// Validate MinObject in MRDWrapper is equal to the MinObject in inode.
assert.Equal(t.T(), &t.in.src, t.in.MRDWrapper.GetMinObject())
}

////////////////////////////////////////////////////////////////////////
// Tests
////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -589,10 +605,8 @@ func (t *FileTest) TestWriteThenSync() {
// The generation should have advanced.
assert.Less(t.T(), t.backingObj.Generation, t.in.SourceGeneration().Object)

// Validate MinObject in inode and MRDWrapper points to different copy of MinObject.
assert.NotSame(t.T(), &t.in.src, t.in.MRDWrapper.GetMinObject())
// Validate MinObject in MRDWrapper is equal to the MinObject in inode.
assert.Equal(t.T(), &t.in.src, t.in.MRDWrapper.GetMinObject())
t.validateMrdWrapperMinObject()
t.validateMrdInstanceMinObject()

// Stat the current object in the bucket.
statReq := &gcs.StatObjectRequest{Name: t.in.Name().GcsObjectName()}
Expand Down Expand Up @@ -679,10 +693,8 @@ func (t *FileTest) TestWriteToLocalFileThenSync() {
assert.Equal(t.T(),
writeTime.UTC().Format(time.RFC3339Nano),
m.Metadata["gcsfuse_mtime"])
// Validate MinObject in inode and MRDWrapper points to different copy of MinObject.
assert.NotSame(t.T(), &t.in.src, t.in.MRDWrapper.GetMinObject())
// Validate MinObject in MRDWrapper is same as the MinObject in inode.
assert.Equal(t.T(), &t.in.src, t.in.MRDWrapper.GetMinObject())
t.validateMrdWrapperMinObject()
t.validateMrdInstanceMinObject()
// Read the object's contents.
contents, err := storageutil.ReadObject(t.ctx, t.bucket, t.in.Name().GcsObjectName())
require.NoError(t.T(), err)
Expand Down Expand Up @@ -742,10 +754,8 @@ func (t *FileTest) TestSyncEmptyLocalFile() {
assert.Equal(t.T(), t.in.SourceGeneration().Metadata, m.MetaGeneration)
assert.Equal(t.T(), t.in.SourceGeneration().Size, m.Size)
assert.Equal(t.T(), uint64(0), m.Size)
// Validate MinObject in inode and MRDWrapper points to different copy of MinObject.
assert.NotSame(t.T(), &t.in.src, t.in.MRDWrapper.GetMinObject())
// Validate MinObject in MRDWrapper is equal to the MinObject in inode.
assert.Equal(t.T(), &t.in.src, t.in.MRDWrapper.GetMinObject())
t.validateMrdWrapperMinObject()
t.validateMrdInstanceMinObject()
// Validate the mtime.
mtimeInBucket, ok := m.Metadata["gcsfuse_mtime"]
assert.True(t.T(), ok)
Expand Down Expand Up @@ -820,10 +830,8 @@ func (t *FileTest) TestAppendThenSync() {
assert.Equal(t.T(),
writeTime.UTC().Format(time.RFC3339Nano),
m.Metadata["gcsfuse_mtime"])
// Validate MinObject in inode and MRDWrapper points to different copy of MinObject.
assert.NotSame(t.T(), &t.in.src, t.in.MRDWrapper.GetMinObject())
// Validate MinObject in MRDWrapper is equal to the MinObject in inode.
assert.Equal(t.T(), &t.in.src, t.in.MRDWrapper.GetMinObject())
t.validateMrdWrapperMinObject()
t.validateMrdInstanceMinObject()

// Read the object's contents.
contents, err := storageutil.ReadObject(t.ctx, t.bucket, t.in.Name().GcsObjectName())
Expand Down Expand Up @@ -916,10 +924,8 @@ func (t *FileTest) TestTruncateDownwardThenSync() {
// The generation should have advanced.
assert.Less(t.T(), t.backingObj.Generation, t.in.SourceGeneration().Object)

// Validate MinObject in inode and MRDWrapper points to different copy of MinObject.
assert.NotSame(t.T(), &t.in.src, t.in.MRDWrapper.GetMinObject())
// Validate MinObject in MRDWrapper is equal to the MinObject in inode.
assert.Equal(t.T(), &t.in.src, t.in.MRDWrapper.GetMinObject())
t.validateMrdWrapperMinObject()
t.validateMrdInstanceMinObject()

// Stat the current object in the bucket.
statReq := &gcs.StatObjectRequest{Name: t.in.Name().GcsObjectName()}
Expand Down Expand Up @@ -989,10 +995,8 @@ func (t *FileTest) TestTruncateUpwardThenFlush() {
// The generation should have advanced.
assert.Less(t.T(), t.backingObj.Generation, t.in.SourceGeneration().Object)

// Validate MinObject in inode and MRDWrapper points to different copy of MinObject.
assert.NotSame(t.T(), &t.in.src, t.in.MRDWrapper.GetMinObject())
// Validate MinObject in MRDWrapper is equal to the MinObject in inode.
assert.Equal(t.T(), &t.in.src, t.in.MRDWrapper.GetMinObject())
t.validateMrdWrapperMinObject()
t.validateMrdInstanceMinObject()

// Stat the current object in the bucket.
statReq := &gcs.StatObjectRequest{Name: t.in.Name().GcsObjectName()}
Expand Down Expand Up @@ -1289,10 +1293,8 @@ func (t *FileTest) TestSyncFlush_Clobbered() {
err = t.in.Flush(t.ctx)
}

// Validate MinObject in inode and MRDWrapper points to different copy of MinObject.
assert.NotSame(t.T(), &t.in.src, t.in.MRDWrapper.GetMinObject())
// Validate MinObject in MRDWrapper is equal to the MinObject in inode.
assert.Equal(t.T(), &t.in.src, t.in.MRDWrapper.GetMinObject())
t.validateMrdWrapperMinObject()
t.validateMrdInstanceMinObject()

// Check if the error is a FileClobberedError
var fcErr *gcsfuse_errors.FileClobberedError
Expand Down Expand Up @@ -1421,10 +1423,8 @@ func (t *FileTest) TestSetMtime_ContentDirty() {
statReq := &gcs.StatObjectRequest{Name: t.in.Name().GcsObjectName()}
m, _, err := t.bucket.StatObject(t.ctx, statReq)

// Validate MinObject in inode and MRDWrapper points to different copy of MinObject.
assert.NotSame(t.T(), &t.in.src, t.in.MRDWrapper.GetMinObject())
// Validate MinObject in MRDWrapper is equal to the MinObject in inode.
assert.Equal(t.T(), &t.in.src, t.in.MRDWrapper.GetMinObject())
t.validateMrdWrapperMinObject()
t.validateMrdInstanceMinObject()

require.NoError(t.T(), err)
assert.NotNil(t.T(), m)
Expand Down
32 changes: 24 additions & 8 deletions internal/gcsx/mrd_instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"io"
"strconv"
"sync"
"sync/atomic"

"github.com/googlecloudplatform/gcsfuse/v3/cfg"
"github.com/googlecloudplatform/gcsfuse/v3/internal/cache/lru"
Expand All @@ -38,7 +39,7 @@ type MrdInstance struct {
// inodeId is the ID of the file inode associated with this instance.
inodeId fuseops.InodeID
// object is the GCS object for which the downloaders are created.
object *gcs.MinObject
object atomic.Pointer[gcs.MinObject]
// bucket is the GCS bucket containing the object.
bucket gcs.Bucket
// refCount tracks the number of active users of this instance.
Expand All @@ -55,13 +56,28 @@ type MrdInstance struct {

// NewMrdInstance creates a new MrdInstance for a given GCS object.
func NewMrdInstance(obj *gcs.MinObject, bucket gcs.Bucket, cache *lru.Cache, inodeId fuseops.InodeID, cfg cfg.MrdConfig) *MrdInstance {
return &MrdInstance{
object: obj,
mrdInstance := MrdInstance{
bucket: bucket,
mrdCache: cache,
inodeId: inodeId,
mrdConfig: cfg,
}
mrdInstance.object.Store(obj)
return &mrdInstance
}

// SetMinObject sets the gcs.MinObject stored in the MrdInstance to passed value, only if it's non nil.
func (mi *MrdInstance) SetMinObject(minObj *gcs.MinObject) error {
if minObj == nil {
return fmt.Errorf("MrdInstance::SetMinObject: Missing MinObject")
}
mi.object.Store(minObj)
return nil
}

// GetMinObject returns the gcs.MinObject stored in MrdInstance.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this only for testing? If yes, can we mention that in the comment.

func (mi *MrdInstance) GetMinObject() *gcs.MinObject {
return mi.object.Load()
}

// getMRDEntry returns a valid MRDEntry from the pool.
Expand Down Expand Up @@ -157,7 +173,7 @@ func (mi *MrdInstance) ensureMRDPool() (err error) {
}

// Creating a new pool. Not reusing any handle while creating a new pool.
mi.mrdPool, err = NewMRDPool(&MRDPoolConfig{PoolSize: int(mi.mrdConfig.PoolSize), object: mi.object, bucket: mi.bucket}, nil)
mi.mrdPool, err = NewMRDPool(&MRDPoolConfig{PoolSize: int(mi.mrdConfig.PoolSize), object: mi.object.Load(), bucket: mi.bucket}, nil)
if err != nil {
err = fmt.Errorf("MrdInstance::ensureMRDPool Error in creating MRDPool: %w", err)
}
Expand All @@ -171,7 +187,7 @@ func (mi *MrdInstance) RecreateMRD() error {
// Create the new pool first to avoid a period where mrdPool is nil.
newPool, err := NewMRDPool(&MRDPoolConfig{
PoolSize: int(mi.mrdConfig.PoolSize),
object: mi.object,
object: mi.object.Load(),
bucket: mi.bucket,
}, nil)
if err != nil {
Expand Down Expand Up @@ -218,7 +234,7 @@ func (mi *MrdInstance) IncrementRefCount() {
// Remove from cache
deletedEntry := mi.mrdCache.Erase(getKey(mi.inodeId))
if deletedEntry != nil {
logger.Tracef("MrdInstance::IncrementRefCount: MrdInstance (%s) erased from cache", mi.object.Name)
logger.Tracef("MrdInstance::IncrementRefCount: MrdInstance (%s) erased from cache", mi.object.Load().Name)
}
}
}
Expand Down Expand Up @@ -269,13 +285,13 @@ func (mi *MrdInstance) DecrementRefCount() {
// This is a safe order.
evictedValues, err := mi.mrdCache.Insert(getKey(mi.inodeId), mi)
if err != nil {
logger.Errorf("MrdInstance::DecrementRefCount: Failed to insert MrdInstance for object (%s) into cache, destroying immediately: %v", mi.object.Name, err)
logger.Errorf("MrdInstance::DecrementRefCount: Failed to insert MrdInstance for object (%s) into cache, destroying immediately: %v", mi.object.Load().Name, err)
// The instance could not be inserted into the cache. Since the refCount is 0,
// we must destroy it now to prevent it from being leaked.
mi.Destroy()
return
}
logger.Tracef("MrdInstance::DecrementRefCount: MrdInstance for object (%s) added to cache", mi.object.Name)
logger.Tracef("MrdInstance::DecrementRefCount: MrdInstance for object (%s) added to cache", mi.object.Load().Name)

// Do not proceed if no eviction happened.
if evictedValues == nil {
Expand Down
2 changes: 1 addition & 1 deletion internal/gcsx/mrd_instance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (t *MrdInstanceTest) SetupTest() {
}

func (t *MrdInstanceTest) TestNewMrdInstance() {
assert.Equal(t.T(), t.object, t.mrdInstance.object)
assert.Equal(t.T(), t.object, t.mrdInstance.object.Load())
assert.Equal(t.T(), t.bucket, t.mrdInstance.bucket)
assert.Equal(t.T(), t.cache, t.mrdInstance.mrdCache)
assert.Equal(t.T(), t.inodeID, t.mrdInstance.inodeId)
Expand Down
5 changes: 3 additions & 2 deletions internal/gcsx/mrd_simple_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,9 @@ func (msr *MrdSimpleReader) Destroy() {
// No need to take lock as Destroy will only be called when file handle is being released
// and there will be no read calls at that point.
if msr.mrdInstance != nil {
msr.mrdInstanceInUse.Store(false)
msr.mrdInstance.DecrementRefCount()
if msr.mrdInstanceInUse.CompareAndSwap(true, false) {
msr.mrdInstance.DecrementRefCount()
}
msr.mrdInstance = nil
}
}
4 changes: 4 additions & 0 deletions internal/gcsx/multi_range_downloader_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ func (mrdWrapper *MultiRangeDownloaderWrapper) SetMinObject(minObj *gcs.MinObjec
if minObj == nil {
return fmt.Errorf("MultiRangeDownloaderWrapper::SetMinObject: Missing MinObject")
}
mrdWrapper.mu.Lock()
defer mrdWrapper.mu.Unlock()
mrdWrapper.object = minObj
return nil
}
Expand All @@ -93,6 +95,8 @@ func wrapperKey(wrapper *MultiRangeDownloaderWrapper) string {

// GetMinObject returns the minObject stored in MultiRangeDownloaderWrapper. Used only for unit testing.
func (mrdWrapper *MultiRangeDownloaderWrapper) GetMinObject() *gcs.MinObject {
mrdWrapper.mu.RLock()
defer mrdWrapper.mu.RUnlock()
return mrdWrapper.object
}

Expand Down