Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 12 additions & 9 deletions internal/fs/inode/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -499,13 +499,13 @@ func (f *FileInode) DeRegisterFileHandle(readOnly bool) {

// LOCKS_REQUIRED(f.mu)
// UpdateSize updates the size of the backing GCS object. It also calls
// updateMRDWrapper to ensure that the multi-range downloader (which is used
// updateMRD to ensure that the multi-range downloader (which is used
// for random reads) is aware of the new size. This prevents the downloader
// from operating on stale object information.
func (f *FileInode) UpdateSize(size uint64) {
f.src.Size = size
f.attrs.Size = size
f.updateMRDWrapper()
f.updateMRD()
}

// LOCKS_REQUIRED(f.mu)
Expand Down Expand Up @@ -829,7 +829,7 @@ func (f *FileInode) SetMtime(
minObj = *minObjPtr
}
f.src = minObj
f.updateMRDWrapper()
f.updateMRD()
return
}

Expand Down Expand Up @@ -989,7 +989,7 @@ func (f *FileInode) updateInodeStateAfterSync(minObj *gcs.MinObject) {
if minObj != nil && !f.localFileCache {
f.src = *minObj
// Update MRDWrapper
f.updateMRDWrapper()
f.updateMRD()
// Convert localFile to nonLocalFile after it is synced to GCS.
if f.IsLocal() {
f.local = false
Expand All @@ -1001,12 +1001,15 @@ func (f *FileInode) updateInodeStateAfterSync(minObj *gcs.MinObject) {
}
}

// Updates the min object stored in MRDWrapper corresponding to the inode.
// Updates the min object stored in MRDWrapper & MRDInstance corresponding to the inode.
// Should be called when minObject associated with inode is updated.
func (f *FileInode) updateMRDWrapper() {
err := f.MRDWrapper.SetMinObject(f.Source())
if err != nil {
logger.Errorf("FileInode::updateMRDWrapper Error in setting minObject %v", err)
func (f *FileInode) updateMRD() {
minObj := f.Source()
if err := f.mrdInstance.SetMinObject(minObj); err != nil {
logger.Errorf("FileInode::updateMRD Error in setting minObject for MrdInstance %v", err)
}
if err := f.MRDWrapper.SetMinObject(minObj); err != nil {
logger.Errorf("FileInode::updateMRD Error in setting minObject for MRDWrapper %v", err)
}
}

Expand Down
64 changes: 32 additions & 32 deletions internal/fs/inode/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,22 @@ func (t *FileTest) createBufferedWriteHandler(shouldInitialize bool, openMode ut
}
}

// validateMrdInstanceMinObject asserts that the MrdInstance attached to the
// inode under test holds its own copy of the inode's MinObject: the two
// pointers must differ while the objects they point to must compare equal.
func (t *FileTest) validateMrdInstanceMinObject() {
	t.T().Helper()

	inodeObj := &t.in.src
	mrdObj := t.in.mrdInstance.GetMinObject()

	// Different pointers: the MrdInstance must not alias the inode's MinObject.
	assert.NotSame(t.T(), inodeObj, mrdObj)
	// Equal contents: the MrdInstance copy matches what the inode currently holds.
	assert.Equal(t.T(), inodeObj, mrdObj)
}

// validateMrdWrapperMinObject asserts that the MRDWrapper attached to the
// inode under test holds its own copy of the inode's MinObject: the two
// pointers must differ while the objects they point to must compare equal.
func (t *FileTest) validateMrdWrapperMinObject() {
	t.T().Helper()

	inodeObj := &t.in.src
	wrapperObj := t.in.MRDWrapper.GetMinObject()

	// Different pointers: the MRDWrapper must not alias the inode's MinObject.
	assert.NotSame(t.T(), inodeObj, wrapperObj)
	// Equal contents: the MRDWrapper copy matches what the inode currently holds.
	assert.Equal(t.T(), inodeObj, wrapperObj)
}

////////////////////////////////////////////////////////////////////////
// Tests
////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -609,10 +625,8 @@ func (t *FileTest) TestWriteThenSync() {
// The generation should have advanced.
assert.Less(t.T(), t.backingObj.Generation, t.in.SourceGeneration().Object)

// Validate MinObject in inode and MRDWrapper points to different copy of MinObject.
assert.NotSame(t.T(), &t.in.src, t.in.MRDWrapper.GetMinObject())
// Validate MinObject in MRDWrapper is equal to the MinObject in inode.
assert.Equal(t.T(), &t.in.src, t.in.MRDWrapper.GetMinObject())
t.validateMrdWrapperMinObject()
t.validateMrdInstanceMinObject()

// Stat the current object in the bucket.
statReq := &gcs.StatObjectRequest{Name: t.in.Name().GcsObjectName()}
Expand Down Expand Up @@ -699,10 +713,8 @@ func (t *FileTest) TestWriteToLocalFileThenSync() {
assert.Equal(t.T(),
writeTime.UTC().Format(time.RFC3339Nano),
m.Metadata["gcsfuse_mtime"])
// Validate MinObject in inode and MRDWrapper points to different copy of MinObject.
assert.NotSame(t.T(), &t.in.src, t.in.MRDWrapper.GetMinObject())
// Validate MinObject in MRDWrapper is same as the MinObject in inode.
assert.Equal(t.T(), &t.in.src, t.in.MRDWrapper.GetMinObject())
t.validateMrdWrapperMinObject()
t.validateMrdInstanceMinObject()
// Read the object's contents.
contents, err := storageutil.ReadObject(t.ctx, t.bucket, t.in.Name().GcsObjectName())
require.NoError(t.T(), err)
Expand Down Expand Up @@ -762,10 +774,8 @@ func (t *FileTest) TestSyncEmptyLocalFile() {
assert.Equal(t.T(), t.in.SourceGeneration().Metadata, m.MetaGeneration)
assert.Equal(t.T(), t.in.SourceGeneration().Size, m.Size)
assert.Equal(t.T(), uint64(0), m.Size)
// Validate MinObject in inode and MRDWrapper points to different copy of MinObject.
assert.NotSame(t.T(), &t.in.src, t.in.MRDWrapper.GetMinObject())
// Validate MinObject in MRDWrapper is equal to the MinObject in inode.
assert.Equal(t.T(), &t.in.src, t.in.MRDWrapper.GetMinObject())
t.validateMrdWrapperMinObject()
t.validateMrdInstanceMinObject()
// Validate the mtime.
mtimeInBucket, ok := m.Metadata["gcsfuse_mtime"]
assert.True(t.T(), ok)
Expand Down Expand Up @@ -840,10 +850,8 @@ func (t *FileTest) TestAppendThenSync() {
assert.Equal(t.T(),
writeTime.UTC().Format(time.RFC3339Nano),
m.Metadata["gcsfuse_mtime"])
// Validate MinObject in inode and MRDWrapper points to different copy of MinObject.
assert.NotSame(t.T(), &t.in.src, t.in.MRDWrapper.GetMinObject())
// Validate MinObject in MRDWrapper is equal to the MinObject in inode.
assert.Equal(t.T(), &t.in.src, t.in.MRDWrapper.GetMinObject())
t.validateMrdWrapperMinObject()
t.validateMrdInstanceMinObject()

// Read the object's contents.
contents, err := storageutil.ReadObject(t.ctx, t.bucket, t.in.Name().GcsObjectName())
Expand Down Expand Up @@ -936,10 +944,8 @@ func (t *FileTest) TestTruncateDownwardThenSync() {
// The generation should have advanced.
assert.Less(t.T(), t.backingObj.Generation, t.in.SourceGeneration().Object)

// Validate MinObject in inode and MRDWrapper points to different copy of MinObject.
assert.NotSame(t.T(), &t.in.src, t.in.MRDWrapper.GetMinObject())
// Validate MinObject in MRDWrapper is equal to the MinObject in inode.
assert.Equal(t.T(), &t.in.src, t.in.MRDWrapper.GetMinObject())
t.validateMrdWrapperMinObject()
t.validateMrdInstanceMinObject()

// Stat the current object in the bucket.
statReq := &gcs.StatObjectRequest{Name: t.in.Name().GcsObjectName()}
Expand Down Expand Up @@ -1009,10 +1015,8 @@ func (t *FileTest) TestTruncateUpwardThenFlush() {
// The generation should have advanced.
assert.Less(t.T(), t.backingObj.Generation, t.in.SourceGeneration().Object)

// Validate MinObject in inode and MRDWrapper points to different copy of MinObject.
assert.NotSame(t.T(), &t.in.src, t.in.MRDWrapper.GetMinObject())
// Validate MinObject in MRDWrapper is equal to the MinObject in inode.
assert.Equal(t.T(), &t.in.src, t.in.MRDWrapper.GetMinObject())
t.validateMrdWrapperMinObject()
t.validateMrdInstanceMinObject()

// Stat the current object in the bucket.
statReq := &gcs.StatObjectRequest{Name: t.in.Name().GcsObjectName()}
Expand Down Expand Up @@ -1309,10 +1313,8 @@ func (t *FileTest) TestSyncFlush_Clobbered() {
err = t.in.Flush(t.ctx)
}

// Validate MinObject in inode and MRDWrapper points to different copy of MinObject.
assert.NotSame(t.T(), &t.in.src, t.in.MRDWrapper.GetMinObject())
// Validate MinObject in MRDWrapper is equal to the MinObject in inode.
assert.Equal(t.T(), &t.in.src, t.in.MRDWrapper.GetMinObject())
t.validateMrdWrapperMinObject()
t.validateMrdInstanceMinObject()

// Check if the error is a FileClobberedError
var fcErr *gcsfuse_errors.FileClobberedError
Expand Down Expand Up @@ -1441,10 +1443,8 @@ func (t *FileTest) TestSetMtime_ContentDirty() {
statReq := &gcs.StatObjectRequest{Name: t.in.Name().GcsObjectName()}
m, _, err := t.bucket.StatObject(t.ctx, statReq)

// Validate MinObject in inode and MRDWrapper points to different copy of MinObject.
assert.NotSame(t.T(), &t.in.src, t.in.MRDWrapper.GetMinObject())
// Validate MinObject in MRDWrapper is equal to the MinObject in inode.
assert.Equal(t.T(), &t.in.src, t.in.MRDWrapper.GetMinObject())
t.validateMrdWrapperMinObject()
t.validateMrdInstanceMinObject()

require.NoError(t.T(), err)
assert.NotNil(t.T(), m)
Expand Down
121 changes: 95 additions & 26 deletions internal/gcsx/mrd_instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ import (
"github.com/jacobsa/fuse/fuseops"
)

const mrdPoolCloseTimeout = 120 * time.Second

// MrdInstance manages a pool of Multi-Range Downloader (MRD) instances for a
// single file inode. It handles the lifecycle of the MRD pool, including
// creation, destruction, and caching.
Expand All @@ -57,15 +59,64 @@ type MrdInstance struct {
}

// NewMrdInstance creates a new MrdInstance for a given GCS object.
// Passed config should not be nil. Code will panic otherwise.
func NewMrdInstance(obj *gcs.MinObject, bucket gcs.Bucket, cache *lru.Cache, inodeId fuseops.InodeID, config *cfg.Config) *MrdInstance {
return &MrdInstance{
object: obj,
mrdInstance := MrdInstance{
bucket: bucket,
mrdCache: cache,
inodeId: inodeId,
config: config,
object: obj,
}
return &mrdInstance
}

// SetMinObject replaces the gcs.MinObject tracked by the MrdInstance.
//
// A nil minObj is rejected with an error and nothing is modified. When the
// currently stored object has the same generation as minObj, only the stored
// object is replaced (e.g. for size updates) and the existing MRD pool is
// kept. In every other case (no object stored yet, or a different generation)
// a new MRD pool is created for minObj and swapped in; a failure to do so is
// returned wrapped and the stored object is left as it was.
func (mi *MrdInstance) SetMinObject(minObj *gcs.MinObject) error {
	if minObj == nil {
		return fmt.Errorf("MrdInstance::SetMinObject: Missing MinObject")
	}

	mi.poolMu.Lock()
	defer mi.poolMu.Unlock()

	sameGeneration := mi.object != nil && mi.object.Generation == minObj.Generation
	if !sameGeneration {
		// Generation changed (or nothing was stored yet): build a fresh pool
		// for the new object and swap it in.
		err := mi.createAndSwapPool(minObj)
		if err != nil {
			return fmt.Errorf("MrdInstance::SetMinObject: failed to create and swap pool: %w", err)
		}
		return nil
	}

	// Same generation: the existing pool stays, only the object is refreshed.
	mi.object = minObj
	return nil
}

// createAndSwapPool builds a new MRD pool for obj and installs it in place of
// the current one. On success mi.object is set to obj and the previous pool
// (possibly nil) is handed to closePoolWithTimeout. If the new pool cannot be
// created, the error from NewMRDPool is returned as-is and neither the pool
// nor the stored object is touched.
// LOCKS_REQUIRED(mi.poolMu).
func (mi *MrdInstance) createAndSwapPool(obj *gcs.MinObject) error {
	poolCfg := &MRDPoolConfig{
		PoolSize: int(mi.config.Mrd.PoolSize),
		object:   obj,
		bucket:   mi.bucket,
	}
	freshPool, err := NewMRDPool(poolCfg, nil)
	if err != nil {
		return err
	}

	// Install the new pool and object before disposing of the old pool.
	stalePool := mi.mrdPool
	mi.mrdPool, mi.object = freshPool, obj

	closePoolWithTimeout(stalePool, "MrdInstance::createAndSwapPool", mrdPoolCloseTimeout)
	return nil
}

// GetMinObject returns the gcs.MinObject currently stored in the MrdInstance,
// read while holding the poolMu read lock. Used only for unit testing.
func (mi *MrdInstance) GetMinObject() *gcs.MinObject {
	mi.poolMu.RLock()
	current := mi.object
	mi.poolMu.RUnlock()
	return current
}

// getMRDEntry returns a valid MRDEntry from the pool.
Expand Down Expand Up @@ -182,37 +233,54 @@ func (mi *MrdInstance) ensureMRDPool() (err error) {
// file inode when the backing GCS object's generation changes, invalidating
// all existing downloader instances.
func (mi *MrdInstance) RecreateMRD() error {
// Create the new pool first to avoid a period where mrdPool is nil.
newPool, err := NewMRDPool(&MRDPoolConfig{
PoolSize: int(mi.config.Mrd.PoolSize),
object: mi.object,
bucket: mi.bucket,
}, nil)
if err != nil {
return fmt.Errorf("MrdInstance::RecreateMRD Error in recreating MRD: %w", err)
}

mi.poolMu.Lock()
oldPool := mi.mrdPool
mi.mrdPool = newPool
mi.poolMu.Unlock()
defer mi.poolMu.Unlock()

obj := mi.object
if obj == nil {
return fmt.Errorf("MrdInstance::RecreateMRD: object is nil")
}

// Close the old pool after swapping.
if oldPool != nil {
oldPool.Close()
if err := mi.createAndSwapPool(obj); err != nil {
return fmt.Errorf("MrdInstance::RecreateMRD: failed to create new pool: %w", err)
}

return nil
}

// closePool closes all MRD instances in the pool and releases associated resources.
func (mi *MrdInstance) closePool() {
mi.poolMu.Lock()
defer mi.poolMu.Unlock()
if mi.mrdPool != nil {
// Close the pool.
mi.mrdPool.Close()
mi.mrdPool = nil
pool := mi.mrdPool
mi.mrdPool = nil
closePoolWithTimeout(pool, "MrdInstance::closePool", mrdPoolCloseTimeout)
}

// closePoolWithTimeout closes pool in the background and returns immediately.
// A nil pool is ignored. pool.Close() runs on its own goroutine; if it has not
// returned within timeout, a warning naming the caller and, when available,
// the pool's object name is logged. The close itself is not interrupted.
func closePoolWithTimeout(pool *MRDPool, caller string, timeout time.Duration) {
	if pool == nil {
		return
	}

	watchClose := func() {
		closed := make(chan struct{})
		go func() {
			defer close(closed)
			pool.Close()
		}()

		select {
		case <-closed:
			// Close finished in time; nothing to report.
			return
		case <-time.After(timeout):
		}

		// Timed out: report which object's pool is slow to close, if known.
		objectName := ""
		if pc := pool.poolConfig; pc != nil && pc.object != nil {
			objectName = pc.object.Name
		}
		logger.Warnf("%s: MRDPool.Close() timed out after %v for object %s", caller, timeout, objectName)
	}

	go watchClose()
}

// Destroy completely destroys the MrdInstance, cleaning up
Expand Down Expand Up @@ -254,7 +322,7 @@ func (mi *MrdInstance) IncrementRefCount() {
// Remove from cache
deletedEntry := mi.mrdCache.Erase(getKey(mi.inodeId))
if deletedEntry != nil {
logger.Tracef("MrdInstance::IncrementRefCount: MrdInstance (%s) erased from cache", mi.object.Name)
logger.Tracef("MrdInstance::IncrementRefCount: MrdInstance Inode (%d) erased from cache", mi.inodeId)
}
}
}
Expand Down Expand Up @@ -303,13 +371,14 @@ func (mi *MrdInstance) DecrementRefCount() {
// This is a safe order.
evictedValues, err := mi.mrdCache.Insert(getKey(mi.inodeId), mi)
if err != nil {
logger.Errorf("MrdInstance::DecrementRefCount: Failed to insert MrdInstance for object (%s) into cache, destroying immediately: %v", mi.object.Name, err)
logger.Errorf("MrdInstance::DecrementRefCount: Failed to insert MrdInstance for inode (%d) into cache, destroying immediately: %v", mi.inodeId, err)
// The instance could not be inserted into the cache. Since the refCount is 0,
// we must close it now to prevent it from being leaked.
mi.closePool()
return
}
logger.Tracef("MrdInstance::DecrementRefCount: MrdInstance for object (%s) added to cache", mi.object.Name)

logger.Tracef("MrdInstance::DecrementRefCount: MrdInstance for inode (%d) added to cache", mi.inodeId)

// Do not proceed if no eviction happened.
if evictedValues == nil {
Expand Down
Loading
Loading