Skip to content
Open
21 changes: 12 additions & 9 deletions internal/fs/inode/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -499,13 +499,13 @@ func (f *FileInode) DeRegisterFileHandle(readOnly bool) {

// LOCKS_REQUIRED(f.mu)
// UpdateSize updates the size of the backing GCS object. It also calls
// updateMRDWrapper to ensure that the multi-range downloader (which is used
// updateMRD to ensure that the multi-range downloader (which is used
// for random reads) is aware of the new size. This prevents the downloader
// from operating on stale object information.
func (f *FileInode) UpdateSize(size uint64) {
f.src.Size = size
f.attrs.Size = size
f.updateMRDWrapper()
f.updateMRD()
}

// LOCKS_REQUIRED(f.mu)
Expand Down Expand Up @@ -829,7 +829,7 @@ func (f *FileInode) SetMtime(
minObj = *minObjPtr
}
f.src = minObj
f.updateMRDWrapper()
f.updateMRD()
return
}

Expand Down Expand Up @@ -989,7 +989,7 @@ func (f *FileInode) updateInodeStateAfterSync(minObj *gcs.MinObject) {
if minObj != nil && !f.localFileCache {
f.src = *minObj
// Update MRDWrapper
f.updateMRDWrapper()
f.updateMRD()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After the inode gets updated with a new generation, we should invalidate the MRD instance and create it again.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we're leaving the object unfinalized, the generation won't change and we don't have to recreate the MRD. I can recreate it if FinalizeFileForRapid is set to true. What do you think?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Handled generation change scenario in SetMinObject.

// Convert localFile to nonLocalFile after it is synced to GCS.
if f.IsLocal() {
f.local = false
Expand All @@ -1001,12 +1001,15 @@ func (f *FileInode) updateInodeStateAfterSync(minObj *gcs.MinObject) {
}
}

// Updates the min object stored in MRDWrapper corresponding to the inode.
// Updates the min object stored in MRDWrapper & MRDInstance corresponding to the inode.
// Should be called when minObject associated with inode is updated.
func (f *FileInode) updateMRDWrapper() {
err := f.MRDWrapper.SetMinObject(f.Source())
if err != nil {
logger.Errorf("FileInode::updateMRDWrapper Error in setting minObject %v", err)
func (f *FileInode) updateMRD() {
minObj := f.Source()
if err := f.mrdInstance.SetMinObject(minObj); err != nil {
logger.Errorf("FileInode::updateMRD Error in setting minObject for MrdInstance %v", err)
}
if err := f.MRDWrapper.SetMinObject(minObj); err != nil {
logger.Errorf("FileInode::updateMRD Error in setting minObject for MRDWrapper %v", err)
}
}

Expand Down
64 changes: 32 additions & 32 deletions internal/fs/inode/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,22 @@ func (t *FileTest) createBufferedWriteHandler(shouldInitialize bool, openMode ut
}
}

// validateMrdInstanceMinObject asserts that the MinObject held by the inode's
// MrdInstance is a distinct pointer from the inode's own src, yet compares
// equal to it.
func (t *FileTest) validateMrdInstanceMinObject() {
	t.T().Helper()

	inodeObj := &t.in.src
	mrdObj := t.in.mrdInstance.GetMinObject()

	// The MrdInstance must not share the inode's MinObject pointer.
	assert.NotSame(t.T(), inodeObj, mrdObj)
	// The MrdInstance's MinObject must still match the inode's MinObject.
	assert.Equal(t.T(), inodeObj, mrdObj)
}

// validateMrdWrapperMinObject asserts that the MinObject held by the inode's
// MRDWrapper is a distinct pointer from the inode's own src, yet compares
// equal to it.
func (t *FileTest) validateMrdWrapperMinObject() {
	t.T().Helper()

	inodeObj := &t.in.src
	wrapperObj := t.in.MRDWrapper.GetMinObject()

	// The MRDWrapper must not share the inode's MinObject pointer.
	assert.NotSame(t.T(), inodeObj, wrapperObj)
	// The MRDWrapper's MinObject must still match the inode's MinObject.
	assert.Equal(t.T(), inodeObj, wrapperObj)
}

////////////////////////////////////////////////////////////////////////
// Tests
////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -609,10 +625,8 @@ func (t *FileTest) TestWriteThenSync() {
// The generation should have advanced.
assert.Less(t.T(), t.backingObj.Generation, t.in.SourceGeneration().Object)

// Validate MinObject in inode and MRDWrapper points to different copy of MinObject.
assert.NotSame(t.T(), &t.in.src, t.in.MRDWrapper.GetMinObject())
// Validate MinObject in MRDWrapper is equal to the MinObject in inode.
assert.Equal(t.T(), &t.in.src, t.in.MRDWrapper.GetMinObject())
t.validateMrdWrapperMinObject()
t.validateMrdInstanceMinObject()

// Stat the current object in the bucket.
statReq := &gcs.StatObjectRequest{Name: t.in.Name().GcsObjectName()}
Expand Down Expand Up @@ -699,10 +713,8 @@ func (t *FileTest) TestWriteToLocalFileThenSync() {
assert.Equal(t.T(),
writeTime.UTC().Format(time.RFC3339Nano),
m.Metadata["gcsfuse_mtime"])
// Validate MinObject in inode and MRDWrapper points to different copy of MinObject.
assert.NotSame(t.T(), &t.in.src, t.in.MRDWrapper.GetMinObject())
// Validate MinObject in MRDWrapper is same as the MinObject in inode.
assert.Equal(t.T(), &t.in.src, t.in.MRDWrapper.GetMinObject())
t.validateMrdWrapperMinObject()
t.validateMrdInstanceMinObject()
// Read the object's contents.
contents, err := storageutil.ReadObject(t.ctx, t.bucket, t.in.Name().GcsObjectName())
require.NoError(t.T(), err)
Expand Down Expand Up @@ -762,10 +774,8 @@ func (t *FileTest) TestSyncEmptyLocalFile() {
assert.Equal(t.T(), t.in.SourceGeneration().Metadata, m.MetaGeneration)
assert.Equal(t.T(), t.in.SourceGeneration().Size, m.Size)
assert.Equal(t.T(), uint64(0), m.Size)
// Validate MinObject in inode and MRDWrapper points to different copy of MinObject.
assert.NotSame(t.T(), &t.in.src, t.in.MRDWrapper.GetMinObject())
// Validate MinObject in MRDWrapper is equal to the MinObject in inode.
assert.Equal(t.T(), &t.in.src, t.in.MRDWrapper.GetMinObject())
t.validateMrdWrapperMinObject()
t.validateMrdInstanceMinObject()
// Validate the mtime.
mtimeInBucket, ok := m.Metadata["gcsfuse_mtime"]
assert.True(t.T(), ok)
Expand Down Expand Up @@ -840,10 +850,8 @@ func (t *FileTest) TestAppendThenSync() {
assert.Equal(t.T(),
writeTime.UTC().Format(time.RFC3339Nano),
m.Metadata["gcsfuse_mtime"])
// Validate MinObject in inode and MRDWrapper points to different copy of MinObject.
assert.NotSame(t.T(), &t.in.src, t.in.MRDWrapper.GetMinObject())
// Validate MinObject in MRDWrapper is equal to the MinObject in inode.
assert.Equal(t.T(), &t.in.src, t.in.MRDWrapper.GetMinObject())
t.validateMrdWrapperMinObject()
t.validateMrdInstanceMinObject()

// Read the object's contents.
contents, err := storageutil.ReadObject(t.ctx, t.bucket, t.in.Name().GcsObjectName())
Expand Down Expand Up @@ -936,10 +944,8 @@ func (t *FileTest) TestTruncateDownwardThenSync() {
// The generation should have advanced.
assert.Less(t.T(), t.backingObj.Generation, t.in.SourceGeneration().Object)

// Validate MinObject in inode and MRDWrapper points to different copy of MinObject.
assert.NotSame(t.T(), &t.in.src, t.in.MRDWrapper.GetMinObject())
// Validate MinObject in MRDWrapper is equal to the MinObject in inode.
assert.Equal(t.T(), &t.in.src, t.in.MRDWrapper.GetMinObject())
t.validateMrdWrapperMinObject()
t.validateMrdInstanceMinObject()

// Stat the current object in the bucket.
statReq := &gcs.StatObjectRequest{Name: t.in.Name().GcsObjectName()}
Expand Down Expand Up @@ -1009,10 +1015,8 @@ func (t *FileTest) TestTruncateUpwardThenFlush() {
// The generation should have advanced.
assert.Less(t.T(), t.backingObj.Generation, t.in.SourceGeneration().Object)

// Validate MinObject in inode and MRDWrapper points to different copy of MinObject.
assert.NotSame(t.T(), &t.in.src, t.in.MRDWrapper.GetMinObject())
// Validate MinObject in MRDWrapper is equal to the MinObject in inode.
assert.Equal(t.T(), &t.in.src, t.in.MRDWrapper.GetMinObject())
t.validateMrdWrapperMinObject()
t.validateMrdInstanceMinObject()

// Stat the current object in the bucket.
statReq := &gcs.StatObjectRequest{Name: t.in.Name().GcsObjectName()}
Expand Down Expand Up @@ -1309,10 +1313,8 @@ func (t *FileTest) TestSyncFlush_Clobbered() {
err = t.in.Flush(t.ctx)
}

// Validate MinObject in inode and MRDWrapper points to different copy of MinObject.
assert.NotSame(t.T(), &t.in.src, t.in.MRDWrapper.GetMinObject())
// Validate MinObject in MRDWrapper is equal to the MinObject in inode.
assert.Equal(t.T(), &t.in.src, t.in.MRDWrapper.GetMinObject())
t.validateMrdWrapperMinObject()
t.validateMrdInstanceMinObject()

// Check if the error is a FileClobberedError
var fcErr *gcsfuse_errors.FileClobberedError
Expand Down Expand Up @@ -1441,10 +1443,8 @@ func (t *FileTest) TestSetMtime_ContentDirty() {
statReq := &gcs.StatObjectRequest{Name: t.in.Name().GcsObjectName()}
m, _, err := t.bucket.StatObject(t.ctx, statReq)

// Validate MinObject in inode and MRDWrapper points to different copy of MinObject.
assert.NotSame(t.T(), &t.in.src, t.in.MRDWrapper.GetMinObject())
// Validate MinObject in MRDWrapper is equal to the MinObject in inode.
assert.Equal(t.T(), &t.in.src, t.in.MRDWrapper.GetMinObject())
t.validateMrdWrapperMinObject()
t.validateMrdInstanceMinObject()

require.NoError(t.T(), err)
assert.NotNil(t.T(), m)
Expand Down
97 changes: 71 additions & 26 deletions internal/gcsx/mrd_instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,15 +57,66 @@ type MrdInstance struct {
}

// NewMrdInstance creates a new MrdInstance for a given GCS object.
// Passed config should not be nil. Code will panic otherwise.
func NewMrdInstance(obj *gcs.MinObject, bucket gcs.Bucket, cache *lru.Cache, inodeId fuseops.InodeID, config *cfg.Config) *MrdInstance {
return &MrdInstance{
object: obj,
mrdInstance := MrdInstance{
bucket: bucket,
mrdCache: cache,
inodeId: inodeId,
config: config,
object: obj,
}
return &mrdInstance
}

// SetMinObject replaces the gcs.MinObject tracked by this MrdInstance.
//
// A nil minObj is rejected with an error and leaves the instance untouched.
// When an object is already stored and its generation equals minObj's, only
// the stored pointer is replaced (e.g. for size updates). In every other case
// (no stored object, or a different generation) a new MRD pool is created for
// minObj and swapped in; a failure to do so is returned wrapped.
func (mi *MrdInstance) SetMinObject(minObj *gcs.MinObject) error {
	if minObj == nil {
		return fmt.Errorf("MrdInstance::SetMinObject: Missing MinObject")
	}

	mi.poolMu.Lock()
	defer mi.poolMu.Unlock()

	// The pool has to be rebuilt when nothing is stored yet or when the
	// incoming object carries a different generation than the stored one.
	generationChanged := mi.object == nil || mi.object.Generation != minObj.Generation
	if generationChanged {
		if err := mi.createAndSwapPool(minObj); err != nil {
			return fmt.Errorf("MrdInstance::SetMinObject: failed to create and swap pool: %w", err)
		}
		return nil
	}

	// Same generation: the existing pool is kept, only the object is refreshed.
	mi.object = minObj
	return nil
}

// createAndSwapPool builds a new MRD pool for obj and installs it in place of
// the current one, recording obj as the instance's MinObject.
//
// If the new pool cannot be created, the error is returned and neither the
// pool nor the stored object is modified. A previously installed pool, if
// any, is closed in a separate goroutine after the swap.
// LOCKS_REQUIRED(mi.poolMu).
func (mi *MrdInstance) createAndSwapPool(obj *gcs.MinObject) error {
	poolCfg := &MRDPoolConfig{
		PoolSize: int(mi.config.Mrd.PoolSize),
		object:   obj,
		bucket:   mi.bucket,
	}
	replacement, err := NewMRDPool(poolCfg, nil)
	if err != nil {
		return err
	}

	previous := mi.mrdPool
	mi.mrdPool, mi.object = replacement, obj

	// Closing happens off the calling goroutine so the caller does not wait
	// for it.
	if previous != nil {
		go previous.Close()
	}
	return nil
}

// GetMinObject returns the gcs.MinObject stored in MrdInstance.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this only for testing? If yes, can we mention that in the comment?

func (mi *MrdInstance) GetMinObject() *gcs.MinObject {
	// Copy the pointer while holding the read lock, then release the lock
	// before returning it.
	mi.poolMu.RLock()
	current := mi.object
	mi.poolMu.RUnlock()
	return current
}

// getMRDEntry returns a valid MRDEntry from the pool.
Expand Down Expand Up @@ -182,36 +233,29 @@ func (mi *MrdInstance) ensureMRDPool() (err error) {
// file inode when the backing GCS object's generation changes, invalidating
// all existing downloader instances.
func (mi *MrdInstance) RecreateMRD() error {
// Create the new pool first to avoid a period where mrdPool is nil.
newPool, err := NewMRDPool(&MRDPoolConfig{
PoolSize: int(mi.config.Mrd.PoolSize),
object: mi.object,
bucket: mi.bucket,
}, nil)
if err != nil {
return fmt.Errorf("MrdInstance::RecreateMRD Error in recreating MRD: %w", err)
}

mi.poolMu.Lock()
oldPool := mi.mrdPool
mi.mrdPool = newPool
mi.poolMu.Unlock()
defer mi.poolMu.Unlock()

// Close the old pool after swapping.
if oldPool != nil {
oldPool.Close()
obj := mi.object
if obj == nil {
return fmt.Errorf("MrdInstance::RecreateMRD: object is nil")
}

if err := mi.createAndSwapPool(obj); err != nil {
return fmt.Errorf("MrdInstance::RecreateMRD: failed to create new pool: %w", err)
}

return nil
}

// closePool closes all MRD instances in the pool and releases associated resources.
func (mi *MrdInstance) closePool() {
mi.poolMu.Lock()
defer mi.poolMu.Unlock()
if mi.mrdPool != nil {
// Close the pool.
mi.mrdPool.Close()
mi.mrdPool = nil
pool := mi.mrdPool
mi.mrdPool = nil
if pool != nil {
go pool.Close()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this just for performance? Do we know how much will be saved? I am just worried about a goroutine resource leak in case multiple such Close calls get stuck.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it's for performance. It will wait for the active requests on the MRD to complete and then destroy the MRD. It will do this serially for each MRD in the pool.
Could you explain the resource leak that you're thinking about in slightly more detail? Why do you think this can get stuck?

}
}

Expand Down Expand Up @@ -254,7 +298,7 @@ func (mi *MrdInstance) IncrementRefCount() {
// Remove from cache
deletedEntry := mi.mrdCache.Erase(getKey(mi.inodeId))
if deletedEntry != nil {
logger.Tracef("MrdInstance::IncrementRefCount: MrdInstance (%s) erased from cache", mi.object.Name)
logger.Tracef("MrdInstance::IncrementRefCount: MrdInstance Inode (%d) erased from cache", mi.inodeId)
}
}
}
Expand Down Expand Up @@ -303,13 +347,14 @@ func (mi *MrdInstance) DecrementRefCount() {
// This is a safe order.
evictedValues, err := mi.mrdCache.Insert(getKey(mi.inodeId), mi)
if err != nil {
logger.Errorf("MrdInstance::DecrementRefCount: Failed to insert MrdInstance for object (%s) into cache, destroying immediately: %v", mi.object.Name, err)
logger.Errorf("MrdInstance::DecrementRefCount: Failed to insert MrdInstance for inode (%d) into cache, destroying immediately: %v", mi.inodeId, err)
// The instance could not be inserted into the cache. Since the refCount is 0,
// we must close it now to prevent it from being leaked.
mi.closePool()
return
}
logger.Tracef("MrdInstance::DecrementRefCount: MrdInstance for object (%s) added to cache", mi.object.Name)

logger.Tracef("MrdInstance::DecrementRefCount: MrdInstance for inode (%d) added to cache", mi.inodeId)

// Do not proceed if no eviction happened.
if evictedValues == nil {
Expand Down
Loading
Loading