Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 12 additions & 9 deletions internal/fs/inode/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -499,13 +499,13 @@ func (f *FileInode) DeRegisterFileHandle(readOnly bool) {

// LOCKS_REQUIRED(f.mu)
// UpdateSize updates the size of the backing GCS object. It also calls
// updateMRDWrapper to ensure that the multi-range downloader (which is used
// updateMRD to ensure that the multi-range downloader (which is used
// for random reads) is aware of the new size. This prevents the downloader
// from operating on stale object information.
func (f *FileInode) UpdateSize(size uint64) {
f.src.Size = size
f.attrs.Size = size
f.updateMRDWrapper()
f.updateMRD()
}

// LOCKS_REQUIRED(f.mu)
Expand Down Expand Up @@ -826,7 +826,7 @@ func (f *FileInode) SetMtime(
minObj = *minObjPtr
}
f.src = minObj
f.updateMRDWrapper()
f.updateMRD()
return
}

Expand Down Expand Up @@ -986,7 +986,7 @@ func (f *FileInode) updateInodeStateAfterSync(minObj *gcs.MinObject) {
if minObj != nil && !f.localFileCache {
f.src = *minObj
// Update MRDWrapper
f.updateMRDWrapper()
f.updateMRD()
// Convert localFile to nonLocalFile after it is synced to GCS.
if f.IsLocal() {
f.local = false
Expand All @@ -998,12 +998,15 @@ func (f *FileInode) updateInodeStateAfterSync(minObj *gcs.MinObject) {
}
}

// Updates the min object stored in MRDWrapper corresponding to the inode.
// Updates the min object stored in MRDWrapper & MRDInstance corresponding to the inode.
// Should be called when minObject associated with inode is updated.
func (f *FileInode) updateMRDWrapper() {
err := f.MRDWrapper.SetMinObject(f.Source())
if err != nil {
logger.Errorf("FileInode::updateMRDWrapper Error in setting minObject %v", err)
func (f *FileInode) updateMRD() {
minObj := f.Source()
if err := f.mrdInstance.SetMinObject(minObj); err != nil {
logger.Errorf("FileInode::updateMRD Error in setting minObject for MrdInstance %v", err)
}
if err := f.MRDWrapper.SetMinObject(minObj); err != nil {
logger.Errorf("FileInode::updateMRD Error in setting minObject for MRDWrapper %v", err)
}
}

Expand Down
64 changes: 32 additions & 32 deletions internal/fs/inode/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,22 @@ func (t *FileTest) createBufferedWriteHandler(shouldInitialize bool, openMode ut
}
}

func (t *FileTest) validateMrdInstanceMinObject() {
t.T().Helper()
// Validate MinObject in inode and MRDInstance points to different copy of MinObject.
assert.NotSame(t.T(), &t.in.src, t.in.mrdInstance.GetMinObject())
// Validate MinObject in MRDInstance is equal to the MinObject in inode.
assert.Equal(t.T(), &t.in.src, t.in.mrdInstance.GetMinObject())
}

func (t *FileTest) validateMrdWrapperMinObject() {
t.T().Helper()
// Validate MinObject in inode and MRDWrapper points to different copy of MinObject.
assert.NotSame(t.T(), &t.in.src, t.in.MRDWrapper.GetMinObject())
// Validate MinObject in MRDWrapper is equal to the MinObject in inode.
assert.Equal(t.T(), &t.in.src, t.in.MRDWrapper.GetMinObject())
}

////////////////////////////////////////////////////////////////////////
// Tests
////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -589,10 +605,8 @@ func (t *FileTest) TestWriteThenSync() {
// The generation should have advanced.
assert.Less(t.T(), t.backingObj.Generation, t.in.SourceGeneration().Object)

// Validate MinObject in inode and MRDWrapper points to different copy of MinObject.
assert.NotSame(t.T(), &t.in.src, t.in.MRDWrapper.GetMinObject())
// Validate MinObject in MRDWrapper is equal to the MinObject in inode.
assert.Equal(t.T(), &t.in.src, t.in.MRDWrapper.GetMinObject())
t.validateMrdWrapperMinObject()
t.validateMrdInstanceMinObject()

// Stat the current object in the bucket.
statReq := &gcs.StatObjectRequest{Name: t.in.Name().GcsObjectName()}
Expand Down Expand Up @@ -679,10 +693,8 @@ func (t *FileTest) TestWriteToLocalFileThenSync() {
assert.Equal(t.T(),
writeTime.UTC().Format(time.RFC3339Nano),
m.Metadata["gcsfuse_mtime"])
// Validate MinObject in inode and MRDWrapper points to different copy of MinObject.
assert.NotSame(t.T(), &t.in.src, t.in.MRDWrapper.GetMinObject())
// Validate MinObject in MRDWrapper is same as the MinObject in inode.
assert.Equal(t.T(), &t.in.src, t.in.MRDWrapper.GetMinObject())
t.validateMrdWrapperMinObject()
t.validateMrdInstanceMinObject()
// Read the object's contents.
contents, err := storageutil.ReadObject(t.ctx, t.bucket, t.in.Name().GcsObjectName())
require.NoError(t.T(), err)
Expand Down Expand Up @@ -742,10 +754,8 @@ func (t *FileTest) TestSyncEmptyLocalFile() {
assert.Equal(t.T(), t.in.SourceGeneration().Metadata, m.MetaGeneration)
assert.Equal(t.T(), t.in.SourceGeneration().Size, m.Size)
assert.Equal(t.T(), uint64(0), m.Size)
// Validate MinObject in inode and MRDWrapper points to different copy of MinObject.
assert.NotSame(t.T(), &t.in.src, t.in.MRDWrapper.GetMinObject())
// Validate MinObject in MRDWrapper is equal to the MinObject in inode.
assert.Equal(t.T(), &t.in.src, t.in.MRDWrapper.GetMinObject())
t.validateMrdWrapperMinObject()
t.validateMrdInstanceMinObject()
// Validate the mtime.
mtimeInBucket, ok := m.Metadata["gcsfuse_mtime"]
assert.True(t.T(), ok)
Expand Down Expand Up @@ -820,10 +830,8 @@ func (t *FileTest) TestAppendThenSync() {
assert.Equal(t.T(),
writeTime.UTC().Format(time.RFC3339Nano),
m.Metadata["gcsfuse_mtime"])
// Validate MinObject in inode and MRDWrapper points to different copy of MinObject.
assert.NotSame(t.T(), &t.in.src, t.in.MRDWrapper.GetMinObject())
// Validate MinObject in MRDWrapper is equal to the MinObject in inode.
assert.Equal(t.T(), &t.in.src, t.in.MRDWrapper.GetMinObject())
t.validateMrdWrapperMinObject()
t.validateMrdInstanceMinObject()

// Read the object's contents.
contents, err := storageutil.ReadObject(t.ctx, t.bucket, t.in.Name().GcsObjectName())
Expand Down Expand Up @@ -916,10 +924,8 @@ func (t *FileTest) TestTruncateDownwardThenSync() {
// The generation should have advanced.
assert.Less(t.T(), t.backingObj.Generation, t.in.SourceGeneration().Object)

// Validate MinObject in inode and MRDWrapper points to different copy of MinObject.
assert.NotSame(t.T(), &t.in.src, t.in.MRDWrapper.GetMinObject())
// Validate MinObject in MRDWrapper is equal to the MinObject in inode.
assert.Equal(t.T(), &t.in.src, t.in.MRDWrapper.GetMinObject())
t.validateMrdWrapperMinObject()
t.validateMrdInstanceMinObject()

// Stat the current object in the bucket.
statReq := &gcs.StatObjectRequest{Name: t.in.Name().GcsObjectName()}
Expand Down Expand Up @@ -989,10 +995,8 @@ func (t *FileTest) TestTruncateUpwardThenFlush() {
// The generation should have advanced.
assert.Less(t.T(), t.backingObj.Generation, t.in.SourceGeneration().Object)

// Validate MinObject in inode and MRDWrapper points to different copy of MinObject.
assert.NotSame(t.T(), &t.in.src, t.in.MRDWrapper.GetMinObject())
// Validate MinObject in MRDWrapper is equal to the MinObject in inode.
assert.Equal(t.T(), &t.in.src, t.in.MRDWrapper.GetMinObject())
t.validateMrdWrapperMinObject()
t.validateMrdInstanceMinObject()

// Stat the current object in the bucket.
statReq := &gcs.StatObjectRequest{Name: t.in.Name().GcsObjectName()}
Expand Down Expand Up @@ -1289,10 +1293,8 @@ func (t *FileTest) TestSyncFlush_Clobbered() {
err = t.in.Flush(t.ctx)
}

// Validate MinObject in inode and MRDWrapper points to different copy of MinObject.
assert.NotSame(t.T(), &t.in.src, t.in.MRDWrapper.GetMinObject())
// Validate MinObject in MRDWrapper is equal to the MinObject in inode.
assert.Equal(t.T(), &t.in.src, t.in.MRDWrapper.GetMinObject())
t.validateMrdWrapperMinObject()
t.validateMrdInstanceMinObject()

// Check if the error is a FileClobberedError
var fcErr *gcsfuse_errors.FileClobberedError
Expand Down Expand Up @@ -1421,10 +1423,8 @@ func (t *FileTest) TestSetMtime_ContentDirty() {
statReq := &gcs.StatObjectRequest{Name: t.in.Name().GcsObjectName()}
m, _, err := t.bucket.StatObject(t.ctx, statReq)

// Validate MinObject in inode and MRDWrapper points to different copy of MinObject.
assert.NotSame(t.T(), &t.in.src, t.in.MRDWrapper.GetMinObject())
// Validate MinObject in MRDWrapper is equal to the MinObject in inode.
assert.Equal(t.T(), &t.in.src, t.in.MRDWrapper.GetMinObject())
t.validateMrdWrapperMinObject()
t.validateMrdInstanceMinObject()

require.NoError(t.T(), err)
assert.NotNil(t.T(), m)
Expand Down
32 changes: 24 additions & 8 deletions internal/gcsx/mrd_instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"io"
"strconv"
"sync"
"sync/atomic"

"github.com/googlecloudplatform/gcsfuse/v3/cfg"
"github.com/googlecloudplatform/gcsfuse/v3/internal/cache/lru"
Expand All @@ -38,7 +39,7 @@ type MrdInstance struct {
// inodeId is the ID of the file inode associated with this instance.
inodeId fuseops.InodeID
// object is the GCS object for which the downloaders are created.
object *gcs.MinObject
object atomic.Pointer[gcs.MinObject]
// bucket is the GCS bucket containing the object.
bucket gcs.Bucket
// refCount tracks the number of active users of this instance.
Expand All @@ -55,13 +56,28 @@ type MrdInstance struct {

// NewMrdInstance creates a new MrdInstance for a given GCS object.
func NewMrdInstance(obj *gcs.MinObject, bucket gcs.Bucket, cache *lru.Cache, inodeId fuseops.InodeID, cfg cfg.MrdConfig) *MrdInstance {
return &MrdInstance{
object: obj,
mrdInstance := MrdInstance{
bucket: bucket,
mrdCache: cache,
inodeId: inodeId,
mrdConfig: cfg,
}
mrdInstance.object.Store(obj)
return &mrdInstance
}

// SetMinObject sets the gcs.MinObject stored in the MrdInstance to passed value, only if it's non nil.
func (mi *MrdInstance) SetMinObject(minObj *gcs.MinObject) error {
if minObj == nil {
return fmt.Errorf("MrdInstance::SetMinObject: Missing MinObject")
}
mi.object.Store(minObj)
return nil
}

// GetMinObject returns the gcs.MinObject stored in MrdInstance.
func (mi *MrdInstance) GetMinObject() *gcs.MinObject {
return mi.object.Load()
}

// getMRDEntry returns a valid MRDEntry from the pool.
Expand Down Expand Up @@ -157,7 +173,7 @@ func (mi *MrdInstance) ensureMRDPool() (err error) {
}

// Creating a new pool. Not reusing any handle while creating a new pool.
mi.mrdPool, err = NewMRDPool(&MRDPoolConfig{PoolSize: int(mi.mrdConfig.PoolSize), object: mi.object, bucket: mi.bucket}, nil)
mi.mrdPool, err = NewMRDPool(&MRDPoolConfig{PoolSize: int(mi.mrdConfig.PoolSize), object: mi.object.Load(), bucket: mi.bucket}, nil)
if err != nil {
err = fmt.Errorf("MrdInstance::ensureMRDPool Error in creating MRDPool: %w", err)
}
Expand All @@ -171,7 +187,7 @@ func (mi *MrdInstance) RecreateMRD() error {
// Create the new pool first to avoid a period where mrdPool is nil.
newPool, err := NewMRDPool(&MRDPoolConfig{
PoolSize: int(mi.mrdConfig.PoolSize),
object: mi.object,
object: mi.object.Load(),
bucket: mi.bucket,
}, nil)
if err != nil {
Expand Down Expand Up @@ -218,7 +234,7 @@ func (mi *MrdInstance) IncrementRefCount() {
// Remove from cache
deletedEntry := mi.mrdCache.Erase(getKey(mi.inodeId))
if deletedEntry != nil {
logger.Tracef("MrdInstance::IncrementRefCount: MrdInstance (%s) erased from cache", mi.object.Name)
logger.Tracef("MrdInstance::IncrementRefCount: MrdInstance (%s) erased from cache", mi.object.Load().Name)
}
}
}
Expand Down Expand Up @@ -269,13 +285,13 @@ func (mi *MrdInstance) DecrementRefCount() {
// This is a safe order.
evictedValues, err := mi.mrdCache.Insert(getKey(mi.inodeId), mi)
if err != nil {
logger.Errorf("MrdInstance::DecrementRefCount: Failed to insert MrdInstance for object (%s) into cache, destroying immediately: %v", mi.object.Name, err)
logger.Errorf("MrdInstance::DecrementRefCount: Failed to insert MrdInstance for object (%s) into cache, destroying immediately: %v", mi.object.Load().Name, err)
// The instance could not be inserted into the cache. Since the refCount is 0,
// we must destroy it now to prevent it from being leaked.
mi.Destroy()
return
}
logger.Tracef("MrdInstance::DecrementRefCount: MrdInstance for object (%s) added to cache", mi.object.Name)
logger.Tracef("MrdInstance::DecrementRefCount: MrdInstance for object (%s) added to cache", mi.object.Load().Name)

// Do not proceed if no eviction happened.
if evictedValues == nil {
Expand Down
5 changes: 3 additions & 2 deletions internal/gcsx/mrd_simple_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,9 @@ func (msr *MrdSimpleReader) Destroy() {
// No need to take lock as Destroy will only be called when file handle is being released
// and there will be no read calls at that point.
if msr.mrdInstance != nil {
msr.mrdInstanceInUse.Store(false)
msr.mrdInstance.DecrementRefCount()
if msr.mrdInstanceInUse.CompareAndSwap(true, false) {
msr.mrdInstance.DecrementRefCount()
}
msr.mrdInstance = nil
}
}
Loading