diff --git a/a.fio b/a.fio new file mode 100644 index 0000000000..a421d18cfb --- /dev/null +++ b/a.fio @@ -0,0 +1,20 @@ +[global] +ioengine=libaio +direct=0 +verify=0 +bs=1m +iodepth=2 +runtime=60s +time_based=0 +fadvise_hint=0 +nrfiles=10 +thread=1 +openfiles=1 +group_reporting=1 +filename_format=test.$jobnum.$filenum + +[test] +rw=read +filesize=1g +directory=/home/princer_google_com/gcs1/1g +numjobs=96 diff --git a/cfg/config.go b/cfg/config.go index ef521de8be..08ea164ff1 100644 --- a/cfg/config.go +++ b/cfg/config.go @@ -25,7 +25,14 @@ import ( ) // AllFlagOptimizationRules is the generated map from a flag's config-path to its specific rules. -var AllFlagOptimizationRules = map[string]shared.OptimizationRules{"file-system.enable-kernel-reader": { +var AllFlagOptimizationRules = map[string]shared.OptimizationRules{"file-system.congestion-threshold": { + BucketTypeOptimization: []shared.BucketTypeOptimization{ + { + BucketType: "zonal", + Value: int64(DefaultCongestionThreshold()), + }, + }, +}, "file-system.enable-kernel-reader": { BucketTypeOptimization: []shared.BucketTypeOptimization{ { BucketType: "zonal", @@ -71,6 +78,20 @@ var AllFlagOptimizationRules = map[string]shared.OptimizationRules{"file-system. Value: int64(-1), }, }, +}, "file-system.max-background": { + BucketTypeOptimization: []shared.BucketTypeOptimization{ + { + BucketType: "zonal", + Value: int64(DefaultMaxBackground()), + }, + }, +}, "file-system.max-read-ahead-kb": { + BucketTypeOptimization: []shared.BucketTypeOptimization{ + { + BucketType: "zonal", + Value: int64(16384), + }, + }, }, "metadata-cache.negative-ttl-secs": { MachineBasedOptimization: []shared.MachineBasedOptimization{ { @@ -224,6 +245,18 @@ func (c *Config) ApplyOptimizations(isSet IsValueSet, input *OptimizationInput) c.MachineType = machineType // Apply optimizations for each flag that has rules defined. 
+ if !isSet.IsSet("congestion-threshold") { + rules := AllFlagOptimizationRules["file-system.congestion-threshold"] + result := getOptimizedValue(&rules, c.FileSystem.CongestionThreshold, profileName, machineType, input, machineTypeToGroupMap) + if result.Optimized { + if val, ok := result.FinalValue.(int64); ok { + if c.FileSystem.CongestionThreshold != val { + c.FileSystem.CongestionThreshold = val + optimizedFlags["file-system.congestion-threshold"] = result + } + } + } + } if !isSet.IsSet("enable-kernel-reader") { rules := AllFlagOptimizationRules["file-system.enable-kernel-reader"] result := getOptimizedValue(&rules, c.FileSystem.EnableKernelReader, profileName, machineType, input, machineTypeToGroupMap) @@ -272,6 +305,30 @@ func (c *Config) ApplyOptimizations(isSet IsValueSet, input *OptimizationInput) } } } + if !isSet.IsSet("max-background") { + rules := AllFlagOptimizationRules["file-system.max-background"] + result := getOptimizedValue(&rules, c.FileSystem.MaxBackground, profileName, machineType, input, machineTypeToGroupMap) + if result.Optimized { + if val, ok := result.FinalValue.(int64); ok { + if c.FileSystem.MaxBackground != val { + c.FileSystem.MaxBackground = val + optimizedFlags["file-system.max-background"] = result + } + } + } + } + if !isSet.IsSet("max-read-ahead-kb") { + rules := AllFlagOptimizationRules["file-system.max-read-ahead-kb"] + result := getOptimizedValue(&rules, c.FileSystem.MaxReadAheadKb, profileName, machineType, input, machineTypeToGroupMap) + if result.Optimized { + if val, ok := result.FinalValue.(int64); ok { + if c.FileSystem.MaxReadAheadKb != val { + c.FileSystem.MaxReadAheadKb = val + optimizedFlags["file-system.max-read-ahead-kb"] = result + } + } + } + } if !isSet.IsSet("metadata-cache-negative-ttl-secs") { rules := AllFlagOptimizationRules["metadata-cache.negative-ttl-secs"] result := getOptimizedValue(&rules, c.MetadataCache.NegativeTtlSecs, profileName, machineType, input, machineTypeToGroupMap) diff --git 
a/cfg/config_test.go b/cfg/config_test.go index 6d107c9258..f86b955547 100644 --- a/cfg/config_test.go +++ b/cfg/config_test.go @@ -23,6 +23,75 @@ import ( ) func TestApplyOptimizations(t *testing.T) { + // Tests for file-system.congestion-threshold + t.Run("file-system.congestion-threshold", func(t *testing.T) { + testCases := []struct { + name string + config Config + isSet *mockIsValueSet + input *OptimizationInput + expectOptimized bool + expectedValue any + }{ + { + name: "user_set", + config: Config{}, + isSet: &mockIsValueSet{ + setFlags: map[string]bool{ + "congestion-threshold": true, + "machine-type": true, + }, + }, + input: &OptimizationInput{BucketType: BucketTypeZonal}, + expectOptimized: false, + expectedValue: int64(98765), + }, + { + name: "no_optimization", + config: Config{Profile: "non_existent_profile"}, + isSet: &mockIsValueSet{ + setFlags: map[string]bool{"machine-type": true}, + stringFlags: map[string]string{"machine-type": "low-end-machine"}, + }, + input: nil, + expectOptimized: false, + expectedValue: 0, + }, + { + name: "bucket_type_zonal", + config: Config{Profile: ""}, + isSet: &mockIsValueSet{ + setFlags: map[string]bool{}, + }, + input: &OptimizationInput{BucketType: BucketTypeZonal}, + expectOptimized: true, + expectedValue: DefaultCongestionThreshold(), + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + // We need a copy of the config for each test case. + c := tc.config + // Set the default or non-default value on the config object. 
+ if tc.name == "user_set" { + c.FileSystem.CongestionThreshold = tc.expectedValue.(int64) + } else { + c.FileSystem.CongestionThreshold = 0 + } + + optimizedFlags := c.ApplyOptimizations(tc.isSet, tc.input) + + if tc.expectOptimized { + assert.Contains(t, optimizedFlags, "file-system.congestion-threshold") + } else { + assert.NotContains(t, optimizedFlags, "file-system.congestion-threshold") + } + // Use EqualValues to handle the int vs int64 type mismatch for default values. + assert.EqualValues(t, tc.expectedValue, c.FileSystem.CongestionThreshold) + }) + } + }) // Tests for file-system.enable-kernel-reader t.Run("file-system.enable-kernel-reader", func(t *testing.T) { testCases := []struct { @@ -358,6 +427,144 @@ func TestApplyOptimizations(t *testing.T) { }) } }) + // Tests for file-system.max-background + t.Run("file-system.max-background", func(t *testing.T) { + testCases := []struct { + name string + config Config + isSet *mockIsValueSet + input *OptimizationInput + expectOptimized bool + expectedValue any + }{ + { + name: "user_set", + config: Config{}, + isSet: &mockIsValueSet{ + setFlags: map[string]bool{ + "max-background": true, + "machine-type": true, + }, + }, + input: &OptimizationInput{BucketType: BucketTypeZonal}, + expectOptimized: false, + expectedValue: int64(98765), + }, + { + name: "no_optimization", + config: Config{Profile: "non_existent_profile"}, + isSet: &mockIsValueSet{ + setFlags: map[string]bool{"machine-type": true}, + stringFlags: map[string]string{"machine-type": "low-end-machine"}, + }, + input: nil, + expectOptimized: false, + expectedValue: 0, + }, + { + name: "bucket_type_zonal", + config: Config{Profile: ""}, + isSet: &mockIsValueSet{ + setFlags: map[string]bool{}, + }, + input: &OptimizationInput{BucketType: BucketTypeZonal}, + expectOptimized: true, + expectedValue: DefaultMaxBackground(), + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + // We need a copy of the config for each test case. 
+ c := tc.config + // Set the default or non-default value on the config object. + if tc.name == "user_set" { + c.FileSystem.MaxBackground = tc.expectedValue.(int64) + } else { + c.FileSystem.MaxBackground = 0 + } + + optimizedFlags := c.ApplyOptimizations(tc.isSet, tc.input) + + if tc.expectOptimized { + assert.Contains(t, optimizedFlags, "file-system.max-background") + } else { + assert.NotContains(t, optimizedFlags, "file-system.max-background") + } + // Use EqualValues to handle the int vs int64 type mismatch for default values. + assert.EqualValues(t, tc.expectedValue, c.FileSystem.MaxBackground) + }) + } + }) + // Tests for file-system.max-read-ahead-kb + t.Run("file-system.max-read-ahead-kb", func(t *testing.T) { + testCases := []struct { + name string + config Config + isSet *mockIsValueSet + input *OptimizationInput + expectOptimized bool + expectedValue any + }{ + { + name: "user_set", + config: Config{}, + isSet: &mockIsValueSet{ + setFlags: map[string]bool{ + "max-read-ahead-kb": true, + "machine-type": true, + }, + }, + input: &OptimizationInput{BucketType: BucketTypeZonal}, + expectOptimized: false, + expectedValue: int64(98765), + }, + { + name: "no_optimization", + config: Config{Profile: "non_existent_profile"}, + isSet: &mockIsValueSet{ + setFlags: map[string]bool{"machine-type": true}, + stringFlags: map[string]string{"machine-type": "low-end-machine"}, + }, + input: nil, + expectOptimized: false, + expectedValue: 0, + }, + { + name: "bucket_type_zonal", + config: Config{Profile: ""}, + isSet: &mockIsValueSet{ + setFlags: map[string]bool{}, + }, + input: &OptimizationInput{BucketType: BucketTypeZonal}, + expectOptimized: true, + expectedValue: 16384, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + // We need a copy of the config for each test case. + c := tc.config + // Set the default or non-default value on the config object. 
+ if tc.name == "user_set" { + c.FileSystem.MaxReadAheadKb = tc.expectedValue.(int64) + } else { + c.FileSystem.MaxReadAheadKb = 0 + } + + optimizedFlags := c.ApplyOptimizations(tc.isSet, tc.input) + + if tc.expectOptimized { + assert.Contains(t, optimizedFlags, "file-system.max-read-ahead-kb") + } else { + assert.NotContains(t, optimizedFlags, "file-system.max-read-ahead-kb") + } + // Use EqualValues to handle the int vs int64 type mismatch for default values. + assert.EqualValues(t, tc.expectedValue, c.FileSystem.MaxReadAheadKb) + }) + } + }) // Tests for metadata-cache.negative-ttl-secs t.Run("metadata-cache.negative-ttl-secs", func(t *testing.T) { testCases := []struct { diff --git a/cfg/config_util.go b/cfg/config_util.go index be4b9eda9d..faa67cca38 100644 --- a/cfg/config_util.go +++ b/cfg/config_util.go @@ -25,6 +25,15 @@ func DefaultMaxParallelDownloads() int { return max(16, 2*runtime.NumCPU()) } +func DefaultMaxBackground() int { + return max(12, 4*runtime.NumCPU()) +} + +func DefaultCongestionThreshold() int { + // 75 % of DefaultMaxBackground + return (3 * DefaultMaxBackground()) / 4 +} + func IsFileCacheEnabled(mountConfig *Config) bool { return mountConfig.FileCache.MaxSizeMb != 0 && string(mountConfig.CacheDir) != "" } diff --git a/cfg/config_util_test.go b/cfg/config_util_test.go index 3ac07dbb4d..f601c1b846 100644 --- a/cfg/config_util_test.go +++ b/cfg/config_util_test.go @@ -25,6 +25,14 @@ func Test_DefaultMaxParallelDownloads(t *testing.T) { assert.GreaterOrEqual(t, DefaultMaxParallelDownloads(), 16) } +func Test_DefaultMaxBackground(t *testing.T) { + assert.GreaterOrEqual(t, DefaultMaxBackground(), 12) +} + +func Test_DefaultCongestionThreshold(t *testing.T) { + assert.GreaterOrEqual(t, DefaultCongestionThreshold(), 9) +} + func TestIsFileCacheEnabled(t *testing.T) { testCases := []struct { name string diff --git a/cfg/params.yaml b/cfg/params.yaml index 04f22fc41b..830df4057d 100644 --- a/cfg/params.yaml +++ b/cfg/params.yaml @@ -340,6 +340,10 @@
params: new requests. 0 means system default (typically 75% of max-background; 9). default: "0" hide-flag: true + optimizations: + bucket-type-optimization: + - bucket-type: "zonal" + value: "DefaultCongestionThreshold()" - config-path: "file-system.dir-mode" flag-name: "dir-mode" @@ -453,6 +457,10 @@ params: (typically 12). default: "0" hide-flag: true + optimizations: + bucket-type-optimization: + - bucket-type: "zonal" + value: "DefaultMaxBackground()" - config-path: "file-system.max-read-ahead-kb" flag-name: "max-read-ahead-kb" @@ -463,6 +471,10 @@ params: and system default will be used. default: "0" hide-flag: true + optimizations: + bucket-type-optimization: + - bucket-type: "zonal" + value: 16384 # 16 MiB - config-path: "file-system.precondition-errors" flag-name: "precondition-errors" diff --git a/cmd/mount.go b/cmd/mount.go index 0442c7f424..e6c306e051 100644 --- a/cmd/mount.go +++ b/cmd/mount.go @@ -180,6 +180,7 @@ func getFuseMountConfig(fsName string, newConfig *cfg.Config) *fuse.MountConfig // Enables ReadDirPlus, allowing the kernel to retrieve directory entries and their // attributes in a single operation. EnableReaddirplus: newConfig.FileSystem.ExperimentalEnableReaddirplus, + EnableAsyncReads: newConfig.FileSystem.EnableKernelReader, } // GCSFuse to Jacobsa Fuse Log Level mapping: diff --git a/go.mod b/go.mod index b451950915..48273dc62a 100644 --- a/go.mod +++ b/go.mod @@ -108,3 +108,5 @@ require ( google.golang.org/genproto/googleapis/api v0.0.0-20251222181119-0a764e51fe1b // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20251222181119-0a764e51fe1b // indirect ) + +// replace github.com/jacobsa/fuse => /home/princer_google_com/dev/fuse diff --git a/internal/fs/fs.go b/internal/fs/fs.go index 2fadd56701..e706983a70 100644 --- a/internal/fs/fs.go +++ b/internal/fs/fs.go @@ -2916,7 +2916,17 @@ func (fs *fileSystem) ReadFile( } // Serve the read. 
- if fs.newConfig.EnableNewReader { + if fs.newConfig.FileSystem.EnableKernelReader && fh.Inode().Bucket().BucketType().Zonal { + var resp gcsx.ReadResponse + req := &gcsx.ReadRequest{ + Buffer: op.Dst, + Offset: op.Offset, + } + resp, err = fh.ReadWithMrdSimpleReader(ctx, req) + op.BytesRead = resp.Size + op.Data = resp.Data + op.Callback = resp.Callback + } else if fs.newConfig.EnableNewReader { var resp gcsx.ReadResponse req := &gcsx.ReadRequest{ Buffer: op.Dst, diff --git a/internal/fs/handle/file.go b/internal/fs/handle/file.go index c1c7d2a474..8db91177b4 100644 --- a/internal/fs/handle/file.go +++ b/internal/fs/handle/file.go @@ -56,6 +56,9 @@ type FileHandle struct { // GUARDED_BY(mu) readManager gcsx.ReadManager + // A mrdSimpleReader is a new reader based on MRD and reads whatever is + // requested using MrdInstance. + mrdSimpleReader *gcsx.MrdSimpleReader // fileCacheHandler is used to get file cache handle and read happens using that. // This will be nil if the file cache is disabled. fileCacheHandler *file.CacheHandler @@ -95,6 +98,10 @@ func NewFileHandle(inode *inode.FileInode, fileCacheHandler *file.CacheHandler, handleID: handleID, } + if c.FileSystem.EnableKernelReader && inode != nil && inode.Bucket().BucketType().Zonal { + fh.mrdSimpleReader = gcsx.NewMrdSimpleReader(inode.GetMRDInstance()) + } + fh.inode.RegisterFileHandle(fh.openMode.AccessMode() == util.ReadOnly) fh.mu = syncutil.NewInvariantMutex(fh.checkInvariants) @@ -117,6 +124,9 @@ func (fh *FileHandle) Destroy() { if fh.readManager != nil { fh.readManager.Destroy() } + if fh.mrdSimpleReader != nil { + fh.mrdSimpleReader.Destroy() + } } // Inode returns the inode backing this handle. @@ -246,6 +256,37 @@ func (fh *FileHandle) ReadWithReadManager(ctx context.Context, req *gcsx.ReadReq return readResponse, nil } +// ReadWithMrdSimpleReader reads data at the given offset using the mrd simple reader. 
+// +// LOCKS_REQUIRED(fh.inode.mu) +// UNLOCK_FUNCTION(fh.inode.mu) +func (fh *FileHandle) ReadWithMrdSimpleReader(ctx context.Context, req *gcsx.ReadRequest) (gcsx.ReadResponse, error) { + // If content cache enabled, CacheEnsureContent forces the file handler to fall through to the inode + // and fh.inode.SourceGenerationIsAuthoritative() will return false. + if err := fh.inode.CacheEnsureContent(ctx); err != nil { + fh.inode.Unlock() + return gcsx.ReadResponse{}, fmt.Errorf("failed to ensure inode content: %w", err) + } + + if !fh.inode.SourceGenerationIsAuthoritative() { + // Read from inode if source generation is not authoritative. + defer fh.inode.Unlock() + n, err := fh.inode.Read(ctx, req.Buffer, req.Offset) + return gcsx.ReadResponse{Size: n}, err + } + + fh.lockHandleAndRelockInode(true) + + if fh.mrdSimpleReader == nil { + fh.unlockHandleAndInode(true) + return gcsx.ReadResponse{}, errors.New("mrdSimpleReader is not initialized") + } + + fh.inode.Unlock() + defer fh.mu.RUnlock() + return fh.mrdSimpleReader.ReadAt(ctx, req) +} + // Equivalent to locking fh.Inode() and calling fh.Inode().Read, but may be // more efficient. 
// diff --git a/internal/fs/handle/file_test.go b/internal/fs/handle/file_test.go index f2f38e809f..6f6027921f 100644 --- a/internal/fs/handle/file_test.go +++ b/internal/fs/handle/file_test.go @@ -33,6 +33,7 @@ import ( "github.com/googlecloudplatform/gcsfuse/v3/internal/fs/inode" "github.com/googlecloudplatform/gcsfuse/v3/internal/gcsx" "github.com/googlecloudplatform/gcsfuse/v3/internal/gcsx/read_manager" + "github.com/googlecloudplatform/gcsfuse/v3/internal/storage" "github.com/googlecloudplatform/gcsfuse/v3/internal/storage/fake" "github.com/googlecloudplatform/gcsfuse/v3/internal/storage/gcs" "github.com/googlecloudplatform/gcsfuse/v3/internal/util" @@ -293,7 +294,7 @@ func (t *fileTest) Test_IsValidReader_GenerationValidation() { func (t *fileTest) Test_Read_Success() { expectedData := []byte("hello from reader") parent := createDirInode(&t.bucket, &t.clock) - in := createFileInode(t.T(), &t.bucket, &t.clock, nil, parent, "test_obj_reader", expectedData, false) + in := createFileInode(t.T(), &t.bucket, &t.clock, &cfg.Config{}, parent, "test_obj_reader", expectedData, false) fh := NewFileHandle(in, nil, false, metrics.NewNoopMetrics(), readMode, &cfg.Config{}, nil, nil, 0) buf := make([]byte, len(expectedData)) fh.inode.Lock() @@ -309,7 +310,7 @@ func (t *fileTest) Test_Read_Success() { func (t *fileTest) Test_ReadWithReadManager_Success() { expectedData := []byte("hello from readManager") parent := createDirInode(&t.bucket, &t.clock) - in := createFileInode(t.T(), &t.bucket, &t.clock, nil, parent, "test_obj_readManager", expectedData, false) + in := createFileInode(t.T(), &t.bucket, &t.clock, &cfg.Config{}, parent, "test_obj_readManager", expectedData, false) fh := NewFileHandle(in, nil, false, metrics.NewNoopMetrics(), readMode, &cfg.Config{}, nil, nil, 0) buf := make([]byte, len(expectedData)) fh.inode.Lock() @@ -459,7 +460,7 @@ func (t *fileTest) Test_ReadWithReadManager_ErrorScenarios() { t.Run(tc.name, func() { t.SetupTest() parent := 
createDirInode(&t.bucket, &t.clock) - testInode := createFileInode(t.T(), &t.bucket, &t.clock, nil, parent, object.Name, []byte("data"), false) + testInode := createFileInode(t.T(), &t.bucket, &t.clock, &cfg.Config{}, parent, object.Name, []byte("data"), false) fh := NewFileHandle(testInode, nil, false, metrics.NewNoopMetrics(), readMode, &cfg.Config{}, nil, nil, 0) fh.inode.Lock() mockRM := new(read_manager.MockReadManager) @@ -499,7 +500,7 @@ func (t *fileTest) Test_Read_ErrorScenarios() { t.Run(tc.name, func() { t.SetupTest() parent := createDirInode(&t.bucket, &t.clock) - testInode := createFileInode(t.T(), &t.bucket, &t.clock, nil, parent, object.Name, []byte("data"), false) + testInode := createFileInode(t.T(), &t.bucket, &t.clock, &cfg.Config{}, parent, object.Name, []byte("data"), false) fh := NewFileHandle(testInode, nil, false, metrics.NewNoopMetrics(), readMode, &cfg.Config{}, nil, nil, 0) fh.inode.Lock() mockReader := new(gcsx.MockRandomReader) @@ -524,7 +525,7 @@ func (t *fileTest) Test_ReadWithReadManager_FallbackToInode() { objectData := []byte("fallback data") object := gcs.MinObject{Name: "test_obj"} parent := createDirInode(&t.bucket, &t.clock) - in := createFileInode(t.T(), &t.bucket, &t.clock, nil, parent, object.Name, objectData, true) + in := createFileInode(t.T(), &t.bucket, &t.clock, &cfg.Config{}, parent, object.Name, objectData, true) fh := NewFileHandle(in, nil, false, metrics.NewNoopMetrics(), readMode, &cfg.Config{}, nil, nil, 0) fh.inode.Lock() mockRM := new(read_manager.MockReadManager) @@ -548,7 +549,7 @@ func (t *fileTest) Test_Read_FallbackToInode() { objectData := []byte("fallback data") object := gcs.MinObject{Name: "test_obj"} parent := createDirInode(&t.bucket, &t.clock) - in := createFileInode(t.T(), &t.bucket, &t.clock, nil, parent, object.Name, objectData, true) + in := createFileInode(t.T(), &t.bucket, &t.clock, &cfg.Config{}, parent, object.Name, objectData, true) fh := NewFileHandle(in, nil, false, 
metrics.NewNoopMetrics(), readMode, &cfg.Config{}, nil, nil, 0) fh.inode.Lock() mockR := new(gcsx.MockRandomReader) @@ -648,6 +649,137 @@ func (t *fileTest) Test_Read_ReaderInvalidatedByGenerationChange() { assert.Equal(t.T(), content2, output) } +func (t *fileTest) Test_ReadWithMrdSimpleReader_Success() { + // 1. Setup + expectedData := []byte("hello from mrd reader") + objectName := "test_obj_mrd_reader" + // Create a mock bucket that behaves like a zonal bucket. + mockBucket := new(storage.TestifyMockBucket) + mockBucket.On("BucketType").Return(gcs.BucketType{Zonal: true}) + // Mock CreateObject which is called by createFileInode. + mockBucket.On("CreateObject", mock.Anything, mock.Anything).Return(&gcs.Object{}, nil).Once() + // Create File Inode. + mockSyncerBucket := gcsx.NewSyncerBucket(1, 10, ".gcsfuse_tmp/", mockBucket) + parent := createDirInode(&mockSyncerBucket, &t.clock) + in := createFileInode(t.T(), &mockSyncerBucket, &t.clock, &cfg.Config{}, parent, objectName, expectedData, false) + // Create File Handle. + fh := NewFileHandle(in, nil, false, metrics.NewNoopMetrics(), readMode, &cfg.Config{FileSystem: cfg.FileSystemConfig{EnableKernelReader: true}}, nil, nil, 0) + require.NotNil(t.T(), fh.mrdSimpleReader) + // Mock the downloader that mrdSimpleReader will use. + fakeMRD := fake.NewFakeMultiRangeDownloader(in.Source(), expectedData) + mockBucket.On("NewMultiRangeDownloader", mock.Anything, mock.Anything).Return(fakeMRD, nil).Once() + // Create read request and take inode lock. + buf := make([]byte, len(expectedData)) + req := &gcsx.ReadRequest{ + Buffer: buf, + Offset: 0, + } + fh.inode.Lock() // Required by the function signature. + + // 2. Call ReadWithMrdSimpleReader. + resp, err := fh.ReadWithMrdSimpleReader(t.ctx, req) + + // 3. 
Assertions + assert.NoError(t.T(), err) + assert.Equal(t.T(), len(expectedData), resp.Size) + assert.Equal(t.T(), expectedData, buf[:resp.Size]) + mockBucket.AssertExpectations(t.T()) +} + +func (t *fileTest) Test_ReadWithMrdSimpleReader_NotAuthoritative() { + // 1. Setup + zonalBucket := gcsx.NewSyncerBucket(1, 10, ".gcsfuse_tmp/", fake.NewFakeBucket(&t.clock, "zonal_bucket", gcs.BucketType{Zonal: true})) + originalData := []byte("some data") // 9 bytes + parent := createDirInode(&zonalBucket, &t.clock) + in := createFileInode(t.T(), &zonalBucket, &t.clock, &cfg.Config{FileSystem: cfg.FileSystemConfig{EnableKernelReader: true}}, parent, "test_obj", originalData, false) + // Make inode dirty. + in.Lock() + _, err := in.Write(t.ctx, []byte("dirty"), 0, writeMode) // 5 bytes + in.Unlock() + require.NoError(t.T(), err) + // After write, content should be "dirtydata". + expectedReadData := "dirtydata" + // Create file handle. + fh := NewFileHandle(in, nil, false, metrics.NewNoopMetrics(), readMode, &cfg.Config{FileSystem: cfg.FileSystemConfig{EnableKernelReader: true}}, nil, nil, 0) + require.NotNil(t.T(), fh.mrdSimpleReader) + // Create read request and take inode lock. + buf := make([]byte, len(expectedReadData)) + req := &gcsx.ReadRequest{ + Buffer: buf, + Offset: 0, + } + fh.inode.Lock() + + // 2. Call ReadWithMrdSimpleReader + resp, err := fh.ReadWithMrdSimpleReader(t.ctx, req) + + // 3. Assertions + // It should read from inode, which contains "dirty data". + assert.NoError(t.T(), err) + assert.Equal(t.T(), len(expectedReadData), resp.Size) + assert.Equal(t.T(), expectedReadData, string(buf[:resp.Size])) +} + +func (t *fileTest) Test_ReadWithMrdSimpleReader_NilReader() { + // 1. Setup with a non-zonal bucket. 
+ nonZonalBucket := gcsx.NewSyncerBucket(1, 10, ".gcsfuse_tmp/", fake.NewFakeBucket(&t.clock, "non_zonal_bucket", gcs.BucketType{Zonal: false})) + parent := createDirInode(&nonZonalBucket, &t.clock) + in := createFileInode(t.T(), &nonZonalBucket, &t.clock, &cfg.Config{}, parent, "test_obj", []byte("data"), false) + // Create file handle. + fh := NewFileHandle(in, nil, false, metrics.NewNoopMetrics(), readMode, &cfg.Config{FileSystem: cfg.FileSystemConfig{EnableKernelReader: true}}, nil, nil, 0) + require.Nil(t.T(), fh.mrdSimpleReader) + // Create read request and take inode lock. + req := &gcsx.ReadRequest{ + Buffer: make([]byte, 4), + Offset: 0, + } + fh.inode.Lock() + + // 2. Call ReadWithMrdSimpleReader. + _, err := fh.ReadWithMrdSimpleReader(t.ctx, req) + + // 3. Assertions + assert.Error(t.T(), err) + assert.Equal(t.T(), "mrdSimpleReader is not initialized", err.Error()) +} + +func (t *fileTest) Test_ReadWithMrdSimpleReader_ReadAtError() { + // 1. Setup + expectedData := []byte("hello from mrd reader") + objectName := "test_obj_mrd_reader_error" + // Mock required functions from mock bucket. + mockBucket := new(storage.TestifyMockBucket) + mockBucket.On("BucketType").Return(gcs.BucketType{Zonal: true}) + mockBucket.On("CreateObject", mock.Anything, mock.Anything).Return(&gcs.Object{}, nil).Once() + // Create file inode. + mockSyncerBucket := gcsx.NewSyncerBucket(1, 10, ".gcsfuse_tmp/", mockBucket) + parent := createDirInode(&mockSyncerBucket, &t.clock) + in := createFileInode(t.T(), &mockSyncerBucket, &t.clock, &cfg.Config{FileSystem: cfg.FileSystemConfig{EnableKernelReader: true}}, parent, objectName, expectedData, false) + // Create file handle. + fh := NewFileHandle(in, nil, false, metrics.NewNoopMetrics(), readMode, &cfg.Config{FileSystem: cfg.FileSystemConfig{EnableKernelReader: true}}, nil, nil, 0) + require.NotNil(t.T(), fh.mrdSimpleReader) + // Mock the downloader to return an error. 
+ expectedErr := errors.New("mrd read error") + fakeMRD := fake.NewFakeMultiRangeDownloaderWithSleepAndDefaultError(in.Source(), expectedData, 0, expectedErr) + mockBucket.On("NewMultiRangeDownloader", mock.Anything, mock.Anything).Return(fakeMRD, nil).Once() + // Create read request & take inode lock. + buf := make([]byte, len(expectedData)) + req := &gcsx.ReadRequest{ + Buffer: buf, + Offset: 0, + } + fh.inode.Lock() + + // 2. Call ReadWithMrdSimpleReader. + resp, err := fh.ReadWithMrdSimpleReader(t.ctx, req) + + // 3. Assertions + assert.Error(t.T(), err) + assert.Contains(t.T(), err.Error(), expectedErr.Error()) + assert.Zero(t.T(), resp.Size) + mockBucket.AssertExpectations(t.T()) +} + func (t *fileTest) TestOpenMode() { testCases := []struct { name string diff --git a/internal/fs/inode/file.go b/internal/fs/inode/file.go index 1b5bc477ce..2078062707 100644 --- a/internal/fs/inode/file.go +++ b/internal/fs/inode/file.go @@ -124,6 +124,9 @@ type FileInode struct { // Limits the max number of blocks that can be created across file system when // streaming writes are enabled. globalMaxWriteBlocksSem *semaphore.Weighted + + // mrdInstance manages the MultiRangeDownloader instances for this inode. + mrdInstance *gcsx.MrdInstance } var _ Inode = &FileInode{} @@ -167,6 +170,7 @@ func NewFileInode( unlinked: false, config: cfg, globalMaxWriteBlocksSem: globalMaxBlocksSem, + mrdInstance: gcsx.NewMrdInstance(&minObj, bucket, mrdCache, id, cfg.Mrd), } var err error f.MRDWrapper, err = gcsx.NewMultiRangeDownloaderWrapper(bucket, &minObj, cfg, mrdCache) @@ -414,6 +418,11 @@ func (f *FileInode) Source() *gcs.MinObject { return &o } +// Returns MrdInstace for this inode. +func (f *FileInode) GetMRDInstance() *gcsx.MrdInstance { + return f.mrdInstance +} + // If true, it is safe to serve reads directly from the object given by // f.Source(), rather than calling f.ReadAt. 
Doing so may be more efficient, // because f.ReadAt may cause the entire object to be faulted in and requires diff --git a/internal/gcsx/mrd_instance.go b/internal/gcsx/mrd_instance.go new file mode 100644 index 0000000000..4886eb6a88 --- /dev/null +++ b/internal/gcsx/mrd_instance.go @@ -0,0 +1,206 @@ +// Copyright 2026 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package gcsx + +import ( + "fmt" + "strconv" + "sync" + + "github.com/googlecloudplatform/gcsfuse/v3/cfg" + "github.com/googlecloudplatform/gcsfuse/v3/internal/cache/lru" + "github.com/googlecloudplatform/gcsfuse/v3/internal/logger" + "github.com/googlecloudplatform/gcsfuse/v3/internal/storage/gcs" + "github.com/jacobsa/fuse/fuseops" +) + +// MrdInstance manages a pool of Multi-Range Downloader (MRD) instances for a +// single file inode. It handles the lifecycle of the MRD pool, including +// creation, destruction, and caching. +type MrdInstance struct { + // mrdPool holds the pool of MultiRangeDownloader instances. + mrdPool *MRDPool + // inodeId is the ID of the file inode associated with this instance. + inodeId fuseops.InodeID + // object is the GCS object for which the downloaders are created. + object *gcs.MinObject + // bucket is the GCS bucket containing the object. + bucket gcs.Bucket + // refCount tracks the number of active users of this instance. + refCount int64 + // refCountMu protects access to refCount. 
+ refCountMu sync.Mutex + // poolMu protects access to mrdPool. + poolMu sync.RWMutex + // mrdCache is a shared cache for inactive MrdInstance objects. + mrdCache *lru.Cache + // mrdConfig holds configuration for the MRD pool. + mrdConfig cfg.MrdConfig +} + +// NewMrdInstance creates a new MrdInstance for a given GCS object. +func NewMrdInstance(obj *gcs.MinObject, bucket gcs.Bucket, cache *lru.Cache, inodeId fuseops.InodeID, cfg cfg.MrdConfig) *MrdInstance { + return &MrdInstance{ + object: obj, + bucket: bucket, + mrdCache: cache, + inodeId: inodeId, + mrdConfig: cfg, + } +} + +// GetMRDEntry returns the next available MRDEntry from the pool using a +// round-robin strategy. It is thread-safe. +func (mi *MrdInstance) GetMRDEntry() *MRDEntry { + mi.poolMu.RLock() + defer mi.poolMu.RUnlock() + if mi.mrdPool != nil { + return mi.mrdPool.Next() + } + return nil +} + +// EnsureMrdInstance ensures that the MRD pool is initialized. If the pool +// already exists, this function is a no-op. +func (mi *MrdInstance) EnsureMrdInstance() (err error) { + mi.poolMu.Lock() + defer mi.poolMu.Unlock() + + // Return early if pool exists. + if mi.mrdPool != nil { + return + } + + // Creating a new pool. Not reusing any handle while creating a new pool. + mi.mrdPool, err = NewMRDPool(&MRDPoolConfig{PoolSize: int(mi.mrdConfig.PoolSize), object: mi.object, bucket: mi.bucket}, nil) + if err != nil { + err = fmt.Errorf("MrdInstance::EnsureMrdInstance Error in creating MRDPool: %w", err) + } + return +} + +// RecreateMRDEntry recreates a specific, potentially failed, entry in the MRD pool. 
+func (mi *MrdInstance) RecreateMRDEntry(entry *MRDEntry) (err error) { + mi.poolMu.RLock() + defer mi.poolMu.RUnlock() + if mi.mrdPool != nil { + err = mi.mrdPool.RecreateMRD(entry, nil) + if err != nil { + err = fmt.Errorf("MrdInstance::RecreateMRDEntry Error in recreating MRD: %w", err) + } + return + } + return fmt.Errorf("MrdInstance::RecreateMRDEntry MRDPool is nil") +} + +// RecreateMRD recreates the entire MRD pool. This is typically called by the +// file inode when the backing GCS object's generation changes, invalidating +// all existing downloader instances. +func (mi *MrdInstance) RecreateMRD(object *gcs.MinObject) error { + mi.Destroy() + err := mi.EnsureMrdInstance() + if err != nil { + return fmt.Errorf("MrdInstance::RecreateMRD Error in recreating MRD: %w", err) + } + return nil +} + +// Destroy closes all MRD instances in the pool and releases associated resources. +func (mi *MrdInstance) Destroy() { + mi.poolMu.Lock() + defer mi.poolMu.Unlock() + if mi.mrdPool != nil { + // Delete the instance. + mi.mrdPool.Close() + mi.mrdPool = nil + } +} + +// IncrementRefCount increases the reference count for the MrdInstance. When the +// instance is actively used (refCount > 0), it is removed from the inactive +// MRD cache to prevent eviction. +func (mi *MrdInstance) IncrementRefCount() { + mi.refCountMu.Lock() + defer mi.refCountMu.Unlock() + mi.refCount++ + + if mi.refCount == 1 && mi.mrdCache != nil { + // Remove from cache + deletedEntry := mi.mrdCache.Erase(strconv.FormatUint(uint64(mi.inodeId), 10)) + if deletedEntry != nil { + logger.Tracef("MrdInstance::IncrementRefCount: MrdInstance (%s) erased from cache", mi.object.Name) + } + } +} + +// DecrementRefCount decreases the reference count. When the count drops to zero, the +// instance is considered inactive and is added to the LRU cache for potential +// reuse. If the cache is full, this may trigger the eviction and closure of the +// least recently used MRD instances. 
+func (mi *MrdInstance) DecrementRefCount() { + mi.refCountMu.Lock() + mi.refCount-- + if mi.refCount < 0 { + logger.Errorf("MrdInstance::DecrementRefCount: Invalid refcount") + mi.refCountMu.Unlock() + return + } + if mi.refCount > 0 || mi.mrdCache == nil { + mi.refCountMu.Unlock() + return + } + + // Add to cache. + // Lock order: refCountMu -> cache.mu -> poolMu (via Size() inside Insert). + // This is a safe order. + evictedValues, err := mi.mrdCache.Insert(strconv.FormatUint(uint64(mi.inodeId), 10), mi) + if err != nil { + logger.Errorf("MrdInstance::DecrementRefCount: Failed to insert MrdInstance for object (%s) into cache, destroying immediately: %v", mi.object.Name, err) + // The instance could not be inserted into the cache. Since the refCount is 0, + // we must destroy it now to prevent it from being leaked. + mi.Destroy() + mi.refCountMu.Unlock() + return + } + logger.Tracef("MrdInstance::DecrementRefCount: MrdInstance for object (%s) added to cache", mi.object.Name) + mi.refCountMu.Unlock() + + for _, instance := range evictedValues { + mrdInstance, ok := instance.(*MrdInstance) + if !ok { + logger.Errorf("MrdInstance::DecrementRefCount: Invalid value type, expected *MrdInstance, got %T", mrdInstance) + } else { + // Check if the instance was resurrected. + mrdInstance.refCountMu.Lock() + if mrdInstance.refCount > 0 { + mrdInstance.refCountMu.Unlock() + continue + } + // Safe to destroy. Hold refCountMu to prevent concurrent resurrection. + mrdInstance.Destroy() + mrdInstance.refCountMu.Unlock() + } + } +} + +// Size returns the number of active MRDs. 
+func (mi *MrdInstance) Size() uint64 { + mi.poolMu.RLock() + defer mi.poolMu.RUnlock() + if mi.mrdPool != nil { + return mi.mrdPool.Size() + } + return 1 +} diff --git a/internal/gcsx/mrd_instance_test.go b/internal/gcsx/mrd_instance_test.go new file mode 100644 index 0000000000..661fe9e7a6 --- /dev/null +++ b/internal/gcsx/mrd_instance_test.go @@ -0,0 +1,217 @@ +// Copyright 2026 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package gcsx + +import ( + "strconv" + "testing" + + "github.com/googlecloudplatform/gcsfuse/v3/cfg" + "github.com/googlecloudplatform/gcsfuse/v3/internal/cache/lru" + "github.com/googlecloudplatform/gcsfuse/v3/internal/storage" + "github.com/googlecloudplatform/gcsfuse/v3/internal/storage/fake" + "github.com/googlecloudplatform/gcsfuse/v3/internal/storage/gcs" + "github.com/jacobsa/fuse/fuseops" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/suite" +) + +type MrdInstanceTest struct { + suite.Suite + object *gcs.MinObject + bucket *storage.TestifyMockBucket + cache *lru.Cache + inodeID fuseops.InodeID + mrdConfig cfg.MrdConfig + mrdInstance *MrdInstance +} + +func TestMrdInstanceTestSuite(t *testing.T) { + suite.Run(t, new(MrdInstanceTest)) +} + +func (t *MrdInstanceTest) SetupTest() { + t.object = &gcs.MinObject{ + Name: "foo", + Size: 1024 * MiB, + Generation: 1234, + } + t.bucket = new(storage.TestifyMockBucket) + t.cache = lru.NewCache(2) // Small cache 
size for testing eviction + t.inodeID = 100 + t.mrdConfig = cfg.MrdConfig{PoolSize: 1} + + t.mrdInstance = NewMrdInstance(t.object, t.bucket, t.cache, t.inodeID, t.mrdConfig) +} + +func (t *MrdInstanceTest) TestNewMrdInstance() { + assert.Equal(t.T(), t.object, t.mrdInstance.object) + assert.Equal(t.T(), t.bucket, t.mrdInstance.bucket) + assert.Equal(t.T(), t.cache, t.mrdInstance.mrdCache) + assert.Equal(t.T(), t.inodeID, t.mrdInstance.inodeId) + assert.Equal(t.T(), t.mrdConfig, t.mrdInstance.mrdConfig) + assert.Nil(t.T(), t.mrdInstance.mrdPool) + assert.Equal(t.T(), int64(0), t.mrdInstance.refCount) +} + +func (t *MrdInstanceTest) TestEnsureMrdInstance() { + fakeMRD := fake.NewFakeMultiRangeDownloader(t.object, nil) + t.bucket.On("NewMultiRangeDownloader", mock.Anything, mock.Anything).Return(fakeMRD, nil).Once() + assert.Nil(t.T(), t.mrdInstance.mrdPool) + + err := t.mrdInstance.EnsureMrdInstance() + + assert.NoError(t.T(), err) + assert.NotNil(t.T(), t.mrdInstance.mrdPool) + t.bucket.AssertExpectations(t.T()) +} + +func (t *MrdInstanceTest) TestEnsureMrdInstance_AlreadyExists() { + fakeMRD := fake.NewFakeMultiRangeDownloader(t.object, nil) + t.bucket.On("NewMultiRangeDownloader", mock.Anything, mock.Anything).Return(fakeMRD, nil).Once() + err := t.mrdInstance.EnsureMrdInstance() + assert.NoError(t.T(), err) + pool := t.mrdInstance.mrdPool + + // Call again + err = t.mrdInstance.EnsureMrdInstance() + + assert.NoError(t.T(), err) + assert.Equal(t.T(), pool, t.mrdInstance.mrdPool) + t.bucket.AssertExpectations(t.T()) // Should only be called once +} + +func (t *MrdInstanceTest) TestGetMRDEntry() { + fakeMRD := fake.NewFakeMultiRangeDownloader(t.object, nil) + t.bucket.On("NewMultiRangeDownloader", mock.Anything, mock.Anything).Return(fakeMRD, nil).Once() + err := t.mrdInstance.EnsureMrdInstance() + assert.NoError(t.T(), err) + + entry := t.mrdInstance.GetMRDEntry() + + assert.NotNil(t.T(), entry) + assert.Equal(t.T(), fakeMRD, entry.mrd) +} + +func (t 
*MrdInstanceTest) TestGetMRDEntry_NilPool() { + entry := t.mrdInstance.GetMRDEntry() + + assert.Nil(t.T(), entry) +} + +func (t *MrdInstanceTest) TestRecreateMRDEntry() { + fakeMRD1 := fake.NewFakeMultiRangeDownloader(t.object, nil) + fakeMRD2 := fake.NewFakeMultiRangeDownloader(t.object, nil) + // Initial creation + t.bucket.On("NewMultiRangeDownloader", mock.Anything, mock.Anything).Return(fakeMRD1, nil).Once() + err := t.mrdInstance.EnsureMrdInstance() + assert.NoError(t.T(), err) + entry := t.mrdInstance.GetMRDEntry() + assert.Equal(t.T(), fakeMRD1, entry.mrd) + + // Recreate + t.bucket.On("NewMultiRangeDownloader", mock.Anything, mock.Anything).Return(fakeMRD2, nil).Once() + err = t.mrdInstance.RecreateMRDEntry(entry) + + assert.NoError(t.T(), err) + assert.Equal(t.T(), fakeMRD2, entry.mrd) +} + +func (t *MrdInstanceTest) TestRecreateMRD() { + fakeMRD1 := fake.NewFakeMultiRangeDownloader(t.object, nil) + fakeMRD2 := fake.NewFakeMultiRangeDownloader(t.object, nil) + // Initial creation + t.bucket.On("NewMultiRangeDownloader", mock.Anything, mock.Anything).Return(fakeMRD1, nil).Once() + err := t.mrdInstance.EnsureMrdInstance() + assert.NoError(t.T(), err) + pool1 := t.mrdInstance.mrdPool + + // Recreate + t.bucket.On("NewMultiRangeDownloader", mock.Anything, mock.Anything).Return(fakeMRD2, nil).Once() + err = t.mrdInstance.RecreateMRD(t.object) + + assert.NoError(t.T(), err) + assert.NotNil(t.T(), t.mrdInstance.mrdPool) + assert.NotEqual(t.T(), pool1, t.mrdInstance.mrdPool) +} + +func (t *MrdInstanceTest) TestDestroy() { + fakeMRD := fake.NewFakeMultiRangeDownloader(t.object, nil) + t.bucket.On("NewMultiRangeDownloader", mock.Anything, mock.Anything).Return(fakeMRD, nil).Once() + err := t.mrdInstance.EnsureMrdInstance() + assert.NoError(t.T(), err) + assert.NotNil(t.T(), t.mrdInstance.mrdPool) + + t.mrdInstance.Destroy() + + assert.Nil(t.T(), t.mrdInstance.mrdPool) +} + +func (t *MrdInstanceTest) TestIncrementRefCount() { + // Setup: Put something in cache first 
to verify removal + fakeMRD := fake.NewFakeMultiRangeDownloader(t.object, nil) + t.bucket.On("NewMultiRangeDownloader", mock.Anything, mock.Anything).Return(fakeMRD, nil).Once() + err := t.mrdInstance.EnsureMrdInstance() + assert.NoError(t.T(), err) + // Manually insert into cache to simulate it being inactive + key := strconv.FormatUint(uint64(t.inodeID), 10) + _, err = t.cache.Insert(key, t.mrdInstance) + assert.NoError(t.T(), err) + assert.NotNil(t.T(), t.cache.LookUpWithoutChangingOrder(key)) + + t.mrdInstance.IncrementRefCount() + + assert.Equal(t.T(), int64(1), t.mrdInstance.refCount) + assert.Nil(t.T(), t.cache.LookUpWithoutChangingOrder(key)) +} + +func (t *MrdInstanceTest) TestDecrementRefCount() { + fakeMRD := fake.NewFakeMultiRangeDownloader(t.object, nil) + t.bucket.On("NewMultiRangeDownloader", mock.Anything, mock.Anything).Return(fakeMRD, nil).Once() + err := t.mrdInstance.EnsureMrdInstance() + assert.NoError(t.T(), err) + t.mrdInstance.refCount = 1 + + t.mrdInstance.DecrementRefCount() + + assert.Equal(t.T(), int64(0), t.mrdInstance.refCount) + key := strconv.FormatUint(uint64(t.inodeID), 10) + assert.NotNil(t.T(), t.cache.LookUpWithoutChangingOrder(key)) +} + +func (t *MrdInstanceTest) TestDecrementRefCount_Eviction() { + // Fill cache with other items + localMrdInstance := &MrdInstance{mrdPool: &MRDPool{}} + localMrdInstance.mrdPool.currentSize.Store(1) + _, err := t.cache.Insert("other1", localMrdInstance) + assert.NoError(t.T(), err) + _, err = t.cache.Insert("other2", localMrdInstance) + assert.NoError(t.T(), err) + fakeMRD := fake.NewFakeMultiRangeDownloader(t.object, nil) + t.bucket.On("NewMultiRangeDownloader", mock.Anything, mock.Anything).Return(fakeMRD, nil).Once() + err = t.mrdInstance.EnsureMrdInstance() + assert.NoError(t.T(), err) + t.mrdInstance.refCount = 1 + + // This should trigger eviction of "other1" (LRU) + t.mrdInstance.DecrementRefCount() + + assert.Equal(t.T(), int64(0), t.mrdInstance.refCount) + key := 
strconv.FormatUint(uint64(t.inodeID), 10) + assert.NotNil(t.T(), t.cache.LookUpWithoutChangingOrder(key)) + assert.Nil(t.T(), t.cache.LookUpWithoutChangingOrder("other1")) + assert.NotNil(t.T(), t.cache.LookUpWithoutChangingOrder("other2")) +} diff --git a/internal/gcsx/mrd_pool.go b/internal/gcsx/mrd_pool.go index 60d87c7184..9f20c58639 100644 --- a/internal/gcsx/mrd_pool.go +++ b/internal/gcsx/mrd_pool.go @@ -24,7 +24,9 @@ import ( "github.com/googlecloudplatform/gcsfuse/v3/internal/storage/gcs" ) -const smallFileThresholdMiB = 500 +const smallFileThresholdMiB = 200 +const midiumFileThresholdMiB = 1500 +const largeFileThresholdMiB = 2500 // MRDEntry holds a single MultiRangeDownloader instance and a mutex to protect access to it. type MRDEntry struct { @@ -60,6 +62,12 @@ type MRDPool struct { func (mrdPoolConfig *MRDPoolConfig) determinePoolSize() { if mrdPoolConfig.object.Size < smallFileThresholdMiB*MiB { mrdPoolConfig.PoolSize = 1 + } else if mrdPoolConfig.object.Size < midiumFileThresholdMiB*MiB { + mrdPoolConfig.PoolSize = 2 + } else if mrdPoolConfig.object.Size < largeFileThresholdMiB*MiB { + mrdPoolConfig.PoolSize = 3 + } else { + mrdPoolConfig.PoolSize = 4 } } @@ -148,13 +156,15 @@ func (p *MRDPool) RecreateMRD(entry *MRDEntry, fallbackHandle []byte) error { if &p.entries[i] == entry { continue } - p.entries[i].mu.RLock() - if p.entries[i].mrd != nil { - handle = p.entries[i].mrd.GetHandle() + // Use TryRLock to avoid deadlock if multiple entries are being recreated simultaneously. 
+ if p.entries[i].mu.TryRLock() { + if p.entries[i].mrd != nil { + handle = p.entries[i].mrd.GetHandle() + p.entries[i].mu.RUnlock() + break + } p.entries[i].mu.RUnlock() - break } - p.entries[i].mu.RUnlock() } } @@ -197,3 +207,7 @@ func (p *MRDPool) Close() (handle []byte) { } return } + +func (p *MRDPool) Size() uint64 { + return p.currentSize.Load() +} diff --git a/internal/gcsx/mrd_pool_test.go b/internal/gcsx/mrd_pool_test.go index 87ca08050c..fe6a37a656 100644 --- a/internal/gcsx/mrd_pool_test.go +++ b/internal/gcsx/mrd_pool_test.go @@ -136,25 +136,26 @@ func (t *mrdPoolTest) TestDeterminePoolSize() { testCases := []struct { name string objectSize uint64 - initialPoolSize int expectedPoolSize int }{ { name: "SmallFile", objectSize: 100 * MiB, - initialPoolSize: 4, expectedPoolSize: 1, }, { - name: "LargeFile", + name: "MediumFile", objectSize: 1000 * MiB, - initialPoolSize: 4, - expectedPoolSize: 4, + expectedPoolSize: 2, + }, + { + name: "Large File", + objectSize: 2000 * MiB, + expectedPoolSize: 3, }, { - name: "ThresholdFile", - objectSize: smallFileThresholdMiB * MiB, - initialPoolSize: 4, + name: "Very large ThresholdFile", + objectSize: 3000 * MiB, expectedPoolSize: 4, }, } @@ -162,7 +163,6 @@ func (t *mrdPoolTest) TestDeterminePoolSize() { for _, tc := range testCases { t.Run(tc.name, func() { t.object.Size = tc.objectSize - t.poolConfig.PoolSize = tc.initialPoolSize t.poolConfig.determinePoolSize() diff --git a/internal/gcsx/mrd_simple_reader.go b/internal/gcsx/mrd_simple_reader.go new file mode 100644 index 0000000000..328a12f53e --- /dev/null +++ b/internal/gcsx/mrd_simple_reader.go @@ -0,0 +1,185 @@ +// Copyright 2026 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package gcsx + +import ( + "bytes" + "context" + "fmt" + "io" + "sync" + "sync/atomic" +) + +// MrdSimpleReader is a reader that uses an MRD Instance to read data from a GCS object. +// This reader is simpler than the full `randomReader` as it doesn't have complex logic +// +// to switch between sequential and random read strategies. +type MrdSimpleReader struct { + // mu protects the internal state of the reader, specifically access to mrdInstance. + mu sync.RWMutex + mrdInstanceInUse atomic.Bool + mrdInstance *MrdInstance +} + +// NewMrdSimpleReader creates a new MrdSimpleReader that uses the provided +// MrdInstance to manage MRD connections. +func NewMrdSimpleReader(mrdInstance *MrdInstance) *MrdSimpleReader { + return &MrdSimpleReader{ + mrdInstance: mrdInstance, + } +} + +// getValidEntry handles the logic of obtaining a usable entry from the MRDInstance, +// including initialization and recreation of entries if they are in a bad state. +func (msr *MrdSimpleReader) getValidEntry() (*MRDEntry, error) { + msr.mu.RLock() + defer msr.mu.RUnlock() + + if msr.mrdInstance == nil { + return nil, fmt.Errorf("MrdSimpleReader: mrdInstance is nil") + } + + if msr.mrdInstanceInUse.CompareAndSwap(false, true) { + msr.mrdInstance.IncrementRefCount() + } + + // Attempt to get an entry. + entry := msr.mrdInstance.GetMRDEntry() + + // If no entry is available, the pool might not be initialized. 
+ var err error + if entry == nil { + err = msr.mrdInstance.EnsureMrdInstance() + if err != nil { + return nil, err + } + // After initialization, get the next available entry. + entry = msr.mrdInstance.GetMRDEntry() + } else { + // If an entry was retrieved, check if it's usable. + entry.mu.RLock() + needsRecreation := entry.mrd == nil || entry.mrd.Error() != nil + entry.mu.RUnlock() + + if needsRecreation { + err = msr.mrdInstance.RecreateMRDEntry(entry) + if err != nil { + return nil, err + } + // After recreation, get the next available entry. + entry = msr.mrdInstance.GetMRDEntry() + } + } + + return entry, err +} + +// ReadAt reads data into the provided request buffer starting at the specified +// offset. It retrieves an available MRD entry and uses it to download the +// requested byte range. If an MRD entry is not available or is in an error +// state, it attempts to create or recreate one. +func (msr *MrdSimpleReader) ReadAt(ctx context.Context, req *ReadRequest) (ReadResponse, error) { + // If the destination buffer is empty, there's nothing to read. + if len(req.Buffer) == 0 { + return ReadResponse{}, nil + } + + entry, err := msr.getValidEntry() + if err != nil { + return ReadResponse{}, err + } + if entry == nil { + return ReadResponse{}, fmt.Errorf("MrdSimpleReader: failed to get a valid MRD entry") + } + + // Prepare the buffer for the read operation. + buffer := bytes.NewBuffer(req.Buffer) + buffer.Reset() + done := make(chan readResult, 1) + + // Local mutex for the callback closure to prevent race conditions on the 'done' channel. + var cbMu sync.Mutex + defer func() { + cbMu.Lock() + // The channel must be closed only once. + if done != nil { + close(done) + done = nil + } + cbMu.Unlock() + }() + + // Lock the entry to safely access its MRD instance. + entry.mu.RLock() + mrd := entry.mrd + if mrd == nil { + entry.mu.RUnlock() + return ReadResponse{}, fmt.Errorf("MrdSimpleReader: mrd is nil") + } + // Add the read request to the MRD instance. 
The read will be performed + // asynchronously, and the callback will be invoked upon completion. + mrd.Add(buffer, req.Offset, int64(len(req.Buffer)), func(offsetAddCallback int64, bytesReadAddCallback int64, e error) { + defer func() { + cbMu.Lock() + // Send the result to the 'done' channel if it's still open. + if done != nil { + done <- readResult{bytesRead: int(bytesReadAddCallback), err: e} + } + cbMu.Unlock() + }() + + // Wrap non-EOF errors for better context. + if e != nil && e != io.EOF { + e = fmt.Errorf("error in Add call: %w", e) + } + }) + entry.mu.RUnlock() + + var bytesRead int + + // Wait for the read to complete or for the context to be cancelled. + select { + case <-ctx.Done(): + err = ctx.Err() + case res := <-done: + bytesRead = res.bytesRead + err = res.err + } + + if err != nil { + return ReadResponse{}, err + } + + return ReadResponse{Size: bytesRead}, nil +} + +// Destroy cleans up the resources used by the reader, primarily by destroying +// the associated MrdInstance. This should be called when the reader is no +// longer needed. +func (msr *MrdSimpleReader) Destroy() { + msr.mu.Lock() + defer msr.mu.Unlock() + if msr.mrdInstance != nil { + msr.mrdInstanceInUse.Store(false) + msr.mrdInstance.DecrementRefCount() + msr.mrdInstance = nil + } +} + +// CheckInvariants is a no-op for this reader type. It's included to satisfy +// the gcsx.Reader interface. +// func (msr *MrdSimpleReader) CheckInvariants() { +// } diff --git a/internal/gcsx/mrd_simple_reader_test.go b/internal/gcsx/mrd_simple_reader_test.go new file mode 100644 index 0000000000..736c4f6c32 --- /dev/null +++ b/internal/gcsx/mrd_simple_reader_test.go @@ -0,0 +1,188 @@ +// Copyright 2026 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package gcsx + +import ( + "context" + "errors" + "io" + "testing" + "time" + + "github.com/googlecloudplatform/gcsfuse/v3/cfg" + "github.com/googlecloudplatform/gcsfuse/v3/internal/cache/lru" + "github.com/googlecloudplatform/gcsfuse/v3/internal/storage" + "github.com/googlecloudplatform/gcsfuse/v3/internal/storage/fake" + "github.com/googlecloudplatform/gcsfuse/v3/internal/storage/gcs" + "github.com/jacobsa/fuse/fuseops" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/suite" +) + +type MrdSimpleReaderTest struct { + suite.Suite + object *gcs.MinObject + bucket *storage.TestifyMockBucket + cache *lru.Cache + inodeID fuseops.InodeID + mrdConfig cfg.MrdConfig + mrdInstance *MrdInstance + reader *MrdSimpleReader +} + +func TestMrdSimpleReaderTestSuite(t *testing.T) { + suite.Run(t, new(MrdSimpleReaderTest)) +} + +func (t *MrdSimpleReaderTest) SetupTest() { + t.object = &gcs.MinObject{ + Name: "foo", + Size: 1024 * MiB, + Generation: 1234, + } + t.bucket = new(storage.TestifyMockBucket) + t.cache = lru.NewCache(2) + t.inodeID = 100 + t.mrdConfig = cfg.MrdConfig{PoolSize: 1} + + t.mrdInstance = NewMrdInstance(t.object, t.bucket, t.cache, t.inodeID, t.mrdConfig) + t.reader = NewMrdSimpleReader(t.mrdInstance) +} + +func (t *MrdSimpleReaderTest) TestNewMrdSimpleReader() { + assert.NotNil(t.T(), t.reader) + assert.Equal(t.T(), t.mrdInstance, t.reader.mrdInstance) +} + +func (t *MrdSimpleReaderTest) TestReadAt_EmptyBuffer() { + req := &ReadRequest{ + Buffer: []byte{}, + Offset: 0, + } + + resp, 
err := t.reader.ReadAt(context.Background(), req) + + assert.NoError(t.T(), err) + assert.Equal(t.T(), 0, resp.Size) +} + +func (t *MrdSimpleReaderTest) TestReadAt_Success() { + data := []byte("hello world") + fakeMRD := fake.NewFakeMultiRangeDownloader(t.object, data) + t.bucket.On("NewMultiRangeDownloader", mock.Anything, mock.Anything).Return(fakeMRD, nil).Once() + req := &ReadRequest{ + Buffer: make([]byte, 5), + Offset: 0, + } + + resp, err := t.reader.ReadAt(context.Background(), req) + + assert.NoError(t.T(), err) + assert.Equal(t.T(), 5, resp.Size) + assert.Equal(t.T(), "hello", string(req.Buffer)) +} + +func (t *MrdSimpleReaderTest) TestReadAt_ContextCancelled() { + fakeMRD := fake.NewFakeMultiRangeDownloaderWithSleep(t.object, []byte("hello"), 100*time.Millisecond) + t.bucket.On("NewMultiRangeDownloader", mock.Anything, mock.Anything).Return(fakeMRD, nil).Once() + ctx, cancel := context.WithCancel(context.Background()) + req := &ReadRequest{ + Buffer: make([]byte, 5), + Offset: 0, + } + cancel() + + resp, err := t.reader.ReadAt(ctx, req) + + assert.Error(t.T(), err) + assert.Equal(t.T(), context.Canceled, err) + assert.Equal(t.T(), 0, resp.Size) +} + +func (t *MrdSimpleReaderTest) TestReadAt_MrdError() { + fakeMRD := fake.NewFakeMultiRangeDownloaderWithSleepAndDefaultError(t.object, []byte("hello"), 0, errors.New("read error")) + t.bucket.On("NewMultiRangeDownloader", mock.Anything, mock.Anything).Return(fakeMRD, nil).Once() + req := &ReadRequest{ + Buffer: make([]byte, 5), + Offset: 0, + } + + resp, err := t.reader.ReadAt(context.Background(), req) + + assert.Error(t.T(), err) + assert.Contains(t.T(), err.Error(), "read error") + assert.Equal(t.T(), 0, resp.Size) +} + +func (t *MrdSimpleReaderTest) TestReadAt_MrdEOF() { + fakeMRD := fake.NewFakeMultiRangeDownloaderWithSleepAndDefaultError(t.object, []byte("hello"), 0, io.EOF) + t.bucket.On("NewMultiRangeDownloader", mock.Anything, mock.Anything).Return(fakeMRD, nil).Once() + req := &ReadRequest{ + 
Buffer: make([]byte, 5), + Offset: 0, + } + + resp, err := t.reader.ReadAt(context.Background(), req) + + assert.Error(t.T(), err) + assert.Equal(t.T(), io.EOF, err) + assert.Equal(t.T(), 0, resp.Size) +} + +func (t *MrdSimpleReaderTest) TestReadAt_NilMrdInstance() { + t.reader.mrdInstance = nil + req := &ReadRequest{ + Buffer: make([]byte, 5), + Offset: 0, + } + + resp, err := t.reader.ReadAt(context.Background(), req) + + assert.Error(t.T(), err) + assert.Contains(t.T(), err.Error(), "mrdInstance is nil") + assert.Equal(t.T(), 0, resp.Size) +} + +func (t *MrdSimpleReaderTest) TestReadAt_Recreation() { + // 1. Initial creation with a broken MRD + fakeMRD1 := fake.NewFakeMultiRangeDownloaderWithStatusError(t.object, []byte("data"), errors.New("broken")) + t.bucket.On("NewMultiRangeDownloader", mock.Anything, mock.Anything).Return(fakeMRD1, nil).Once() + // Initialize pool manually to simulate existing bad state + err := t.mrdInstance.EnsureMrdInstance() + assert.NoError(t.T(), err) + // 2. ReadAt called. getValidEntry gets entry with fakeMRD1. + // It sees error, calls RecreateMRDEntry. 
+ fakeMRD2 := fake.NewFakeMultiRangeDownloader(t.object, []byte("data")) + t.bucket.On("NewMultiRangeDownloader", mock.Anything, mock.Anything).Return(fakeMRD2, nil).Once() + req := &ReadRequest{ + Buffer: make([]byte, 4), + Offset: 0, + } + + resp, err := t.reader.ReadAt(context.Background(), req) + + assert.NoError(t.T(), err) + assert.Equal(t.T(), 4, resp.Size) + assert.Equal(t.T(), "data", string(req.Buffer)) +} + +func (t *MrdSimpleReaderTest) TestDestroy() { + t.reader.Destroy() + + assert.Nil(t.T(), t.reader.mrdInstance) + // Verify that calling Destroy again doesn't panic + t.reader.Destroy() +} diff --git a/test_io_depth.sh b/test_io_depth.sh new file mode 100755 index 0000000000..779d9da9d9 --- /dev/null +++ b/test_io_depth.sh @@ -0,0 +1,38 @@ +#!/bin/bash +umount ~/gcs1 || true +echo "Prince" > ~/logs.txt +echo 256 | sudo tee /proc/sys/fs/fuse/max_pages_limit + +sevirity="trace" + +go install . && gcsfuse --max-read-ahead-kb 65536 --log-severity=$sevirity --log-format=text --log-file ~/logs.txt princer-gcsfuse-test-zonal-us-west4a ~/gcs1 + +# mkdir -p /home/princer_google_com/gcs1/5M +# fio --name=multi_file_5000mb \ +# --directory=/home/princer_google_com/gcs1/5M \ +# --rw=read \ +# --bs=4K \ +# --nrfiles=1 \ +# --filesize=5M \ +# --numjobs=1 \ +# --openfiles=1 \ +# --ioengine=libaio \ +# --direct=0 \ +# --group_reporting + + + mkdir -p /home/princer_google_com/gcs1/2G +fio --name=multi_file_64gb \ + --directory=/home/princer_google_com/gcs1/2G \ + --rw=read \ + --bs=1M \ + --nrfiles=2 \ + --filesize=2G \ + --numjobs=1 \ + --openfiles=1 \ + --ioengine=libaio \ + --direct=0 \ + --iodepth=1 \ + --group_reporting + +umount "$MOUNT_POINT" || true \ No newline at end of file diff --git a/tools/config-gen/main.go b/tools/config-gen/main.go index ad6ebc29b5..d16d00daf5 100644 --- a/tools/config-gen/main.go +++ b/tools/config-gen/main.go @@ -155,10 +155,16 @@ func main() { // formatValue is a custom template function that correctly formats values for Go code. 
// It adds quotes to strings and leaves other types as-is. +// Special case: if a string looks like a function call (ends with ()), it's output as-is. func formatValue(v any) string { rv := reflect.ValueOf(v) switch rv.Kind() { case reflect.String: + s := v.(string) + // Check if it looks like a function call - if so, output as-is without quotes + if len(s) > 2 && s[len(s)-2:] == "()" { + return s + } // Use %q to safely quote strings, e.g., "my-string" return fmt.Sprintf("%q", v) default: