diff --git a/cmd/legacy_main.go b/cmd/legacy_main.go index 30bb8a9c78..160a343005 100644 --- a/cmd/legacy_main.go +++ b/cmd/legacy_main.go @@ -34,6 +34,7 @@ import ( "time" "github.com/googlecloudplatform/gcsfuse/v3/metrics" + "github.com/googlecloudplatform/gcsfuse/v3/tracing" "golang.org/x/sys/unix" "github.com/googlecloudplatform/gcsfuse/v3/cfg" @@ -179,7 +180,7 @@ func createStorageHandle(newConfig *cfg.Config, userAgent string, metricHandle m //////////////////////////////////////////////////////////////////////// // Mount the file system according to arguments in the supplied context. -func mountWithArgs(bucketName string, mountPoint string, newConfig *cfg.Config, metricHandle metrics.MetricHandle, isSet cfg.IsValueSet) (mfs *fuse.MountedFileSystem, err error) { +func mountWithArgs(bucketName string, mountPoint string, newConfig *cfg.Config, metricHandle metrics.MetricHandle, traceHandle tracing.TraceHandle, isSet cfg.IsValueSet) (mfs *fuse.MountedFileSystem, err error) { // Enable invariant checking if requested. if newConfig.Debug.ExitOnInvariantViolation { locker.EnableInvariantsCheck() @@ -214,6 +215,7 @@ func mountWithArgs(bucketName string, mountPoint string, newConfig *cfg.Config, newConfig, storageHandle, metricHandle, + traceHandle, isSet) if err != nil { @@ -558,6 +560,11 @@ func Mount(mountInfo *mountInfo, bucketName, mountPoint string) (err error) { } } shutdownTracingFn := monitor.SetupTracing(ctx, newConfig, logger.MountInstanceID(fsName(bucketName))) + traceHandle := tracing.NewNoopTracer() + if cfg.IsTracingEnabled(newConfig) { + traceHandle = tracing.NewOTELTracer() + } + shutdownFn := common.JoinShutdownFunc(metricExporterShutdownFn, shutdownTracingFn) // No-op if profiler is disabled. @@ -570,7 +577,7 @@ func Mount(mountInfo *mountInfo, bucketName, mountPoint string) (err error) { var mfs *fuse.MountedFileSystem { startTime := time.Now() - mfs, err = mountWithArgs(bucketName, mountPoint, newConfig, metricHandle, mountInfo.isUserSet) + mfs, err = mountWithArgs(bucketName, mountPoint, newConfig, metricHandle, traceHandle, mountInfo.isUserSet) // This utility is to absorb the error // returned by daemonize.SignalOutcome calls by simply diff --git a/cmd/mount.go b/cmd/mount.go index a268e3195c..e59dd4fef9 100644 --- a/cmd/mount.go +++ b/cmd/mount.go @@ -23,6 +23,7 @@ import ( "github.com/googlecloudplatform/gcsfuse/v3/internal/mount" "github.com/googlecloudplatform/gcsfuse/v3/internal/storage" "github.com/googlecloudplatform/gcsfuse/v3/metrics" + "github.com/googlecloudplatform/gcsfuse/v3/tracing" "golang.org/x/net/context" "github.com/googlecloudplatform/gcsfuse/v3/internal/fs" @@ -43,6 +44,7 @@ func mountWithStorageHandle( newConfig *cfg.Config, storageHandle storage.StorageHandle, metricHandle metrics.MetricHandle, + traceHandle tracing.TraceHandle, isUserSet cfg.IsValueSet) (mfs *fuse.MountedFileSystem, err error) { // Sanity check: make sure the temporary directory exists and is writable @@ -127,6 +129,7 @@ be interacting with the file system.`) NewConfig: newConfig, IsUserSet: isUserSet, MetricHandle: metricHandle, + TraceHandle: traceHandle, } if serverCfg.NewConfig.FileSystem.ExperimentalEnableDentryCache { serverCfg.Notifier = fuse.NewNotifier() diff --git a/internal/bufferedread/buffered_reader.go b/internal/bufferedread/buffered_reader.go index a56214b24a..1e8204b057 100644 --- a/internal/bufferedread/buffered_reader.go +++ b/internal/bufferedread/buffered_reader.go @@ -31,6 +31,7 @@ import ( "github.com/googlecloudplatform/gcsfuse/v3/internal/storage/gcs" 
"github.com/googlecloudplatform/gcsfuse/v3/internal/workerpool" "github.com/googlecloudplatform/gcsfuse/v3/metrics" + "github.com/googlecloudplatform/gcsfuse/v3/tracing" "github.com/jacobsa/fuse/fuseops" "golang.org/x/sync/semaphore" ) @@ -72,6 +73,8 @@ type BufferedReader struct { metricHandle metrics.MetricHandle + traceHandle tracing.TraceHandle + // handleID is the file handle id, used for logging. handleID fuseops.HandleID @@ -123,6 +126,7 @@ type BufferedReaderOptions struct { GlobalMaxBlocksSem *semaphore.Weighted WorkerPool workerpool.WorkerPool MetricHandle metrics.MetricHandle + TraceHandle tracing.TraceHandle ReadTypeClassifier *gcsx.ReadTypeClassifier HandleID fuseops.HandleID } @@ -155,6 +159,7 @@ func NewBufferedReader(opts *BufferedReaderOptions) (*BufferedReader, error) { blockPool: blockpool, workerPool: opts.WorkerPool, metricHandle: opts.MetricHandle, + traceHandle: opts.TraceHandle, handleID: opts.HandleID, prefetchMultiplier: defaultPrefetchMultiplier, randomReadsThreshold: opts.Config.RandomSeekThreshold, diff --git a/internal/cache/file/cache_handle_test.go b/internal/cache/file/cache_handle_test.go index 17ae0b4249..2b8cee5d50 100644 --- a/internal/cache/file/cache_handle_test.go +++ b/internal/cache/file/cache_handle_test.go @@ -39,6 +39,7 @@ import ( "github.com/googlecloudplatform/gcsfuse/v3/internal/storage/storageutil" "github.com/googlecloudplatform/gcsfuse/v3/metrics" "github.com/googlecloudplatform/gcsfuse/v3/tools/integration_tests/util/operations" + "github.com/googlecloudplatform/gcsfuse/v3/tracing" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" @@ -156,6 +157,7 @@ func (cht *cacheHandleTest) SetupTest() { fileCacheConfig, semaphore.NewWeighted(math.MaxInt64), metrics.NewNoopMetrics(), + tracing.NewNoopTracer(), ) cht.cacheHandle = NewCacheHandle(readLocalFileHandle, fileDownloadJob, cht.cache, false, 0) @@ -859,6 +861,7 @@ func (cht *cacheHandleTest) Test_SequentialRead_Parallel_Download_True() { fileCacheConfig, semaphore.NewWeighted(math.MaxInt64), metrics.NewNoopMetrics(), + tracing.NewNoopTracer(), ) cht.cacheHandle.fileDownloadJob = fileDownloadJob @@ -894,6 +897,7 @@ func (cht *cacheHandleTest) Test_RandomRead_Parallel_Download_True() { fileCacheConfig, semaphore.NewWeighted(math.MaxInt64), metrics.NewNoopMetrics(), + tracing.NewNoopTracer(), ) cht.cacheHandle.fileDownloadJob = fileDownloadJob @@ -929,6 +933,7 @@ func (cht *cacheHandleTest) Test_RandomRead_CacheForRangeReadFalse_And_ParallelD fileCacheConfig, semaphore.NewWeighted(math.MaxInt64), metrics.NewNoopMetrics(), + tracing.NewNoopTracer(), ) // Since, it's a random read, download job will not start. 
diff --git a/internal/cache/file/cache_handler_test.go b/internal/cache/file/cache_handler_test.go index 956a983a6b..e6dd1d1d7b 100644 --- a/internal/cache/file/cache_handler_test.go +++ b/internal/cache/file/cache_handler_test.go @@ -37,6 +37,7 @@ import ( "github.com/googlecloudplatform/gcsfuse/v3/internal/storage/storageutil" "github.com/googlecloudplatform/gcsfuse/v3/metrics" "github.com/googlecloudplatform/gcsfuse/v3/tools/integration_tests/util/operations" + "github.com/googlecloudplatform/gcsfuse/v3/tracing" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" @@ -85,7 +86,7 @@ func initializeCacheHandlerTestArgs(t *testing.T, fileCacheConfig *cfg.FileCache // Job manager jobManager := downloader.NewJobManager(cache, util.DefaultFilePerm, - util.DefaultDirPerm, cacheDir, DefaultSequentialReadSizeMb, fileCacheConfig, metrics.NewNoopMetrics()) + util.DefaultDirPerm, cacheDir, DefaultSequentialReadSizeMb, fileCacheConfig, metrics.NewNoopMetrics(), tracing.NewNoopTracer()) // Mocked cached handler object. isSparse := fileCacheConfig != nil && fileCacheConfig.ExperimentalEnableChunkCache diff --git a/internal/cache/file/downloader/downloader.go b/internal/cache/file/downloader/downloader.go index 850b7e6285..2cbbc6705d 100644 --- a/internal/cache/file/downloader/downloader.go +++ b/internal/cache/file/downloader/downloader.go @@ -25,6 +25,7 @@ import ( "github.com/googlecloudplatform/gcsfuse/v3/internal/locker" "github.com/googlecloudplatform/gcsfuse/v3/internal/storage/gcs" "github.com/googlecloudplatform/gcsfuse/v3/metrics" + "github.com/googlecloudplatform/gcsfuse/v3/tracing" "golang.org/x/sync/semaphore" ) @@ -62,11 +63,12 @@ type JobManager struct { mu locker.Locker maxParallelismSem *semaphore.Weighted metricHandle metrics.MetricHandle + traceHandle tracing.TraceHandle } func NewJobManager(fileInfoCache *lru.Cache, filePerm os.FileMode, dirPerm os.FileMode, cacheDir string, sequentialReadSizeMb int32, c *cfg.FileCacheConfig, - metricHandle metrics.MetricHandle) (jm *JobManager) { + metricHandle metrics.MetricHandle, traceHandle tracing.TraceHandle) (jm *JobManager) { maxParallelDownloads := int64(math.MaxInt64) if c.MaxParallelDownloads > 0 { maxParallelDownloads = c.MaxParallelDownloads @@ -81,6 +83,7 @@ func NewJobManager(fileInfoCache *lru.Cache, filePerm os.FileMode, dirPerm os.Fi // Shared between jobs - Limits the overall concurrency of downloads. 
maxParallelismSem: semaphore.NewWeighted(maxParallelDownloads), metricHandle: metricHandle, + traceHandle: traceHandle, } jm.mu = locker.New("JobManager", func() {}) jm.jobs = make(map[string]*Job) @@ -118,7 +121,7 @@ func (jm *JobManager) CreateJobIfNotExists(object *gcs.MinObject, bucket gcs.Buc removeJobCallback := func() { jm.removeJob(object.Name, bucket.Name()) } - job = NewJob(object, bucket, jm.fileInfoCache, jm.sequentialReadSizeMb, fileSpec, removeJobCallback, jm.fileCacheConfig, jm.maxParallelismSem, jm.metricHandle) + job = NewJob(object, bucket, jm.fileInfoCache, jm.sequentialReadSizeMb, fileSpec, removeJobCallback, jm.fileCacheConfig, jm.maxParallelismSem, jm.metricHandle, jm.traceHandle) jm.jobs[objectPath] = job return job } diff --git a/internal/cache/file/downloader/downloader_test.go b/internal/cache/file/downloader/downloader_test.go index adda4da988..60752d4197 100644 --- a/internal/cache/file/downloader/downloader_test.go +++ b/internal/cache/file/downloader/downloader_test.go @@ -33,6 +33,7 @@ import ( testutil "github.com/googlecloudplatform/gcsfuse/v3/internal/util" "github.com/googlecloudplatform/gcsfuse/v3/metrics" "github.com/googlecloudplatform/gcsfuse/v3/tools/integration_tests/util/operations" + "github.com/googlecloudplatform/gcsfuse/v3/tracing" . "github.com/jacobsa/ogletest" "github.com/stretchr/testify/mock" ) @@ -70,7 +71,7 @@ func (dt *downloaderTest) setupHelper() { ExpectEq(nil, err) dt.initJobTest(DefaultObjectName, []byte("taco"), DefaultSequentialReadSizeMb, CacheMaxSize, func() {}) - dt.jm = NewJobManager(dt.cache, util.DefaultFilePerm, util.DefaultDirPerm, cacheDir, DefaultSequentialReadSizeMb, dt.defaultFileCacheConfig, metrics.NewNoopMetrics()) + dt.jm = NewJobManager(dt.cache, util.DefaultFilePerm, util.DefaultDirPerm, cacheDir, DefaultSequentialReadSizeMb, dt.defaultFileCacheConfig, metrics.NewNoopMetrics(), tracing.NewNoopTracer()) } func (dt *downloaderTest) SetUp(*TestInfo) { diff --git a/internal/cache/file/downloader/jm_parallel_downloads_test.go b/internal/cache/file/downloader/jm_parallel_downloads_test.go index a9f60d2dd9..52f9265233 100644 --- a/internal/cache/file/downloader/jm_parallel_downloads_test.go +++ b/internal/cache/file/downloader/jm_parallel_downloads_test.go @@ -31,6 +31,7 @@ import ( "github.com/googlecloudplatform/gcsfuse/v3/internal/storage/gcs" "github.com/googlecloudplatform/gcsfuse/v3/internal/storage/storageutil" "github.com/googlecloudplatform/gcsfuse/v3/metrics" + "github.com/googlecloudplatform/gcsfuse/v3/tracing" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" @@ -156,7 +157,7 @@ func TestParallelDownloads(t *testing.T) { WriteBufferSize: 4 * 1024 * 1024, EnableODirect: tc.enableODirect, } - jm := NewJobManager(cache, util.DefaultFilePerm, util.DefaultDirPerm, cacheDir, 2, fileCacheConfig, metrics.NewNoopMetrics()) + jm := NewJobManager(cache, util.DefaultFilePerm, util.DefaultDirPerm, cacheDir, 2, fileCacheConfig, metrics.NewNoopMetrics(), tracing.NewNoopTracer()) job := jm.CreateJobIfNotExists(&minObj, bucket) subscriberC := job.subscribe(tc.subscribedOffset) @@ -199,7 +200,7 @@ func TestMultipleConcurrentDownloads(t *testing.T) { MaxParallelDownloads: 2, WriteBufferSize: 4 * 1024 * 1024, } - jm := NewJobManager(cache, util.DefaultFilePerm, util.DefaultDirPerm, cacheDir, 2, fileCacheConfig, metrics.NewNoopMetrics()) + jm := NewJobManager(cache, util.DefaultFilePerm, util.DefaultDirPerm, cacheDir, 2, fileCacheConfig, metrics.NewNoopMetrics(), 
tracing.NewNoopTracer()) job1 := jm.CreateJobIfNotExists(&minObj1, bucket) job2 := jm.CreateJobIfNotExists(&minObj2, bucket) s1 := job1.subscribe(10 * util.MiB) diff --git a/internal/cache/file/downloader/job.go b/internal/cache/file/downloader/job.go index 291d39c447..b41d1a3fd2 100644 --- a/internal/cache/file/downloader/job.go +++ b/internal/cache/file/downloader/job.go @@ -32,6 +32,8 @@ import ( "github.com/googlecloudplatform/gcsfuse/v3/internal/logger" "github.com/googlecloudplatform/gcsfuse/v3/internal/storage/gcs" "github.com/googlecloudplatform/gcsfuse/v3/metrics" + "github.com/googlecloudplatform/gcsfuse/v3/tracing" + "golang.org/x/net/context" "golang.org/x/sync/semaphore" ) @@ -97,6 +99,8 @@ type Job struct { rangeChan chan data.ObjectRange metricsHandle metrics.MetricHandle + + traceHandle tracing.TraceHandle } // JobStatus represents the status of job. @@ -123,6 +127,7 @@ func NewJob( fileCacheConfig *cfg.FileCacheConfig, maxParallelismSem *semaphore.Weighted, metricHandle metrics.MetricHandle, + traceHandle tracing.TraceHandle, ) (job *Job) { job = &Job{ object: object, @@ -134,6 +139,7 @@ func NewJob( fileCacheConfig: fileCacheConfig, maxParallelismSem: maxParallelismSem, metricsHandle: metricHandle, + traceHandle: traceHandle, } job.mu = locker.New("Job-"+fileSpec.Path, job.checkInvariants) job.init() diff --git a/internal/cache/file/downloader/job_test.go b/internal/cache/file/downloader/job_test.go index f74207ffa4..2787ea6a39 100644 --- a/internal/cache/file/downloader/job_test.go +++ b/internal/cache/file/downloader/job_test.go @@ -35,6 +35,7 @@ import ( "github.com/googlecloudplatform/gcsfuse/v3/internal/storage/storageutil" testutil "github.com/googlecloudplatform/gcsfuse/v3/internal/util" "github.com/googlecloudplatform/gcsfuse/v3/metrics" + "github.com/googlecloudplatform/gcsfuse/v3/tracing" . 
"github.com/jacobsa/ogletest" "golang.org/x/sync/semaphore" ) @@ -63,7 +64,7 @@ func (dt *downloaderTest) initJobTest(objectName string, objectContent []byte, s } dt.cache = lru.NewCache(lruCacheSize) - dt.job = NewJob(&dt.object, dt.bucket, dt.cache, sequentialReadSize, dt.fileSpec, removeCallback, dt.defaultFileCacheConfig, semaphore.NewWeighted(math.MaxInt64), metrics.NewNoopMetrics()) + dt.job = NewJob(&dt.object, dt.bucket, dt.cache, sequentialReadSize, dt.fileSpec, removeCallback, dt.defaultFileCacheConfig, semaphore.NewWeighted(math.MaxInt64), metrics.NewNoopMetrics(), tracing.NewNoopTracer()) fileInfoKey := data.FileInfoKey{ BucketName: storage.TestBucketName, ObjectName: objectName, diff --git a/internal/cache/file/downloader/job_testify_test.go b/internal/cache/file/downloader/job_testify_test.go index 7adcd7df6c..987493e9f5 100644 --- a/internal/cache/file/downloader/job_testify_test.go +++ b/internal/cache/file/downloader/job_testify_test.go @@ -24,6 +24,7 @@ import ( "github.com/googlecloudplatform/gcsfuse/v3/internal/storage/fake" "github.com/googlecloudplatform/gcsfuse/v3/metrics" + "github.com/googlecloudplatform/gcsfuse/v3/tracing" "golang.org/x/net/context" "golang.org/x/sync/semaphore" @@ -65,7 +66,7 @@ func (t *JobTestifyTest) initReadCacheTestifyTest(objectName string, objectConte DirPerm: util.DefaultDirPerm, } t.cache = lru.NewCache(lruCacheSize) - t.job = NewJob(&t.object, t.mockBucket, t.cache, sequentialReadSize, t.fileSpec, removeCallback, t.defaultFileCacheConfig, semaphore.NewWeighted(math.MaxInt64), metrics.NewNoopMetrics()) + t.job = NewJob(&t.object, t.mockBucket, t.cache, sequentialReadSize, t.fileSpec, removeCallback, t.defaultFileCacheConfig, semaphore.NewWeighted(math.MaxInt64), metrics.NewNoopMetrics(), tracing.NewNoopTracer()) fileInfoKey := data.FileInfoKey{ BucketName: storage.TestBucketName, ObjectName: objectName, diff --git a/internal/fs/fs.go b/internal/fs/fs.go index 179125040b..e38cf24253 100644 --- a/internal/fs/fs.go +++ b/internal/fs/fs.go @@ -34,11 +34,10 @@ import ( "github.com/googlecloudplatform/gcsfuse/v3/internal/fs/gcsfuse_errors" "github.com/googlecloudplatform/gcsfuse/v3/internal/workerpool" "github.com/googlecloudplatform/gcsfuse/v3/metrics" + "github.com/googlecloudplatform/gcsfuse/v3/tracing" "golang.org/x/sync/semaphore" - "go.opentelemetry.io/otel/trace" - "github.com/googlecloudplatform/gcsfuse/v3/cfg" "github.com/googlecloudplatform/gcsfuse/v3/internal/cache/file" "github.com/googlecloudplatform/gcsfuse/v3/internal/cache/file/downloader" @@ -143,6 +142,8 @@ type ServerConfig struct { MetricHandle metrics.MetricHandle + TraceHandle tracing.TraceHandle + // Notifier allows the file system to send invalidation messages to the FUSE // kernel module. 
This enables proactive cache invalidation (e.g., for dentries) // when underlying content changes, improving consistency while still leveraging @@ -213,6 +214,7 @@ func NewFileSystem(ctx context.Context, serverCfg *ServerConfig) (fuseutil.FileS fileCacheHandler: fileCacheHandler, cacheFileForRangeRead: serverCfg.NewConfig.FileCache.CacheFileForRangeRead, metricHandle: serverCfg.MetricHandle, + traceHandle: serverCfg.TraceHandle, enableAtomicRenameObject: serverCfg.NewConfig.EnableAtomicRenameObject, isTracingEnabled: cfg.IsTracingEnabled(serverCfg.NewConfig), globalMaxWriteBlocksSem: semaphore.NewWeighted(serverCfg.NewConfig.Write.GlobalMaxBlocks), @@ -313,7 +315,7 @@ func createFileCacheHandler(serverCfg *ServerConfig) (fileCacheHandler *file.Cac return nil, fmt.Errorf("createFileCacheHandler: while creating file cache directory: %w", cacheDirErr) } - jobManager := downloader.NewJobManager(fileInfoCache, filePerm, dirPerm, cacheDir, serverCfg.SequentialReadSizeMb, &serverCfg.NewConfig.FileCache, serverCfg.MetricHandle) + jobManager := downloader.NewJobManager(fileInfoCache, filePerm, dirPerm, cacheDir, serverCfg.SequentialReadSizeMb, &serverCfg.NewConfig.FileCache, serverCfg.MetricHandle, serverCfg.TraceHandle) fileCacheHandler = file.NewCacheHandler(fileInfoCache, jobManager, cacheDir, filePerm, dirPerm, serverCfg.NewConfig.FileCache.ExcludeRegex, serverCfg.NewConfig.FileCache.IncludeRegex, serverCfg.NewConfig.FileCache.ExperimentalEnableChunkCache) return } @@ -550,6 +552,8 @@ type fileSystem struct { metricHandle metrics.MetricHandle + traceHandle tracing.TraceHandle + enableAtomicRenameObject bool isTracingEnabled bool @@ -1724,16 +1728,6 @@ func (fs *fileSystem) StatFS( return } -// When tracing is enabled ensure span & trace context from oldCtx is passed on to newCtx -func maybePropagateTraceContext(newCtx context.Context, oldCtx context.Context, isTracingEnabled bool) context.Context { - if !isTracingEnabled { - return newCtx - } - - span := trace.SpanFromContext(oldCtx) - return trace.ContextWithSpan(newCtx, span) -} - // getInterruptlessContext returns a new context that is not cancellable by the // parent context if the ignore-interrupts flag is set. Otherwise, it returns // the original context. @@ -1742,7 +1736,7 @@ func (fs *fileSystem) getInterruptlessContext(ctx context.Context) context.Conte // When ignore interrupts config is set, we are creating a new context not // cancellable by parent context. newCtx := context.Background() - return maybePropagateTraceContext(newCtx, ctx, fs.isTracingEnabled) + return tracing.MaybePropagateTraceContext(newCtx, ctx, fs.isTracingEnabled) } return ctx @@ -2088,7 +2082,7 @@ func (fs *fileSystem) CreateFile( // CreateFile() invoked to create new files, can be safely considered as filehandle // opened in append mode. - fs.handles[op.Handle] = handle.NewFileHandle(child.(*inode.FileInode), fs.fileCacheHandler, fs.cacheFileForRangeRead, fs.metricHandle, openMode, fs.newConfig, fs.bufferedReadWorkerPool, fs.globalMaxReadBlocksSem, op.Handle) + fs.handles[op.Handle] = handle.NewFileHandle(child.(*inode.FileInode), fs.fileCacheHandler, fs.cacheFileForRangeRead, fs.metricHandle, fs.traceHandle, openMode, fs.newConfig, fs.bufferedReadWorkerPool, fs.globalMaxReadBlocksSem, op.Handle) fs.mu.Unlock() @@ -2876,7 +2870,7 @@ func (fs *fileSystem) OpenFile( // Figure out the mode in which the file is being opened. 
openMode := util.FileOpenMode(op.OpenFlags) - fs.handles[op.Handle] = handle.NewFileHandle(in, fs.fileCacheHandler, fs.cacheFileForRangeRead, fs.metricHandle, openMode, fs.newConfig, fs.bufferedReadWorkerPool, fs.globalMaxReadBlocksSem, op.Handle) + fs.handles[op.Handle] = handle.NewFileHandle(in, fs.fileCacheHandler, fs.cacheFileForRangeRead, fs.metricHandle, fs.traceHandle, openMode, fs.newConfig, fs.bufferedReadWorkerPool, fs.globalMaxReadBlocksSem, op.Handle) // When we observe object generations that we didn't create, we assign them // new inode IDs. So for a given inode, all modifications go through the diff --git a/internal/fs/handle/file.go b/internal/fs/handle/file.go index cc6d4506c0..ff8428d980 100644 --- a/internal/fs/handle/file.go +++ b/internal/fs/handle/file.go @@ -29,6 +29,7 @@ import ( "github.com/googlecloudplatform/gcsfuse/v3/internal/workerpool" "github.com/googlecloudplatform/gcsfuse/v3/internal/workloadinsight" "github.com/googlecloudplatform/gcsfuse/v3/metrics" + "github.com/googlecloudplatform/gcsfuse/v3/tracing" "github.com/jacobsa/fuse/fuseops" "github.com/jacobsa/syncutil" "golang.org/x/net/context" @@ -67,6 +68,8 @@ type FileHandle struct { // will be downloaded for random reads as well too. cacheFileForRangeRead bool metricHandle metrics.MetricHandle + traceHandle tracing.TraceHandle + // openMode is used to store the mode in which the file is opened. openMode util.OpenMode @@ -85,12 +88,13 @@ type FileHandle struct { } // LOCKS_REQUIRED(fh.inode.mu) -func NewFileHandle(inode *inode.FileInode, fileCacheHandler *file.CacheHandler, cacheFileForRangeRead bool, metricHandle metrics.MetricHandle, openMode util.OpenMode, c *cfg.Config, bufferedReadWorkerPool workerpool.WorkerPool, globalMaxReadBlocksSem *semaphore.Weighted, handleID fuseops.HandleID) (fh *FileHandle) { +func NewFileHandle(inode *inode.FileInode, fileCacheHandler *file.CacheHandler, cacheFileForRangeRead bool, metricHandle metrics.MetricHandle, traceHandle tracing.TraceHandle, openMode util.OpenMode, c *cfg.Config, bufferedReadWorkerPool workerpool.WorkerPool, globalMaxReadBlocksSem *semaphore.Weighted, handleID fuseops.HandleID) (fh *FileHandle) { fh = &FileHandle{ inode: inode, fileCacheHandler: fileCacheHandler, cacheFileForRangeRead: cacheFileForRangeRead, metricHandle: metricHandle, + traceHandle: traceHandle, openMode: openMode, config: c, bufferedReadWorkerPool: bufferedReadWorkerPool, @@ -215,6 +219,7 @@ func (fh *FileHandle) ReadWithReadManager(ctx context.Context, req *gcsx.ReadReq FileCacheHandler: fh.fileCacheHandler, CacheFileForRangeRead: fh.cacheFileForRangeRead, MetricHandle: fh.metricHandle, + TraceHandle: fh.traceHandle, MrdWrapper: mrdWrapper, Config: fh.config, GlobalMaxBlocksSem: fh.globalMaxReadBlocksSem, @@ -318,7 +323,7 @@ func (fh *FileHandle) Read(ctx context.Context, dst []byte, offset int64, sequen fh.destroyReader() // Attempt to create an appropriate reader. 
- fh.reader = gcsx.NewRandomReader(minObj, bucket, sequentialReadSizeMb, fh.fileCacheHandler, fh.cacheFileForRangeRead, fh.metricHandle, mrdWrapper, fh.config, fh.handleID) + fh.reader = gcsx.NewRandomReader(minObj, bucket, sequentialReadSizeMb, fh.fileCacheHandler, fh.cacheFileForRangeRead, fh.metricHandle, fh.traceHandle, mrdWrapper, fh.config, fh.handleID) // Release RWLock and take RLock on file handle again fh.mu.Unlock() diff --git a/internal/fs/handle/file_test.go b/internal/fs/handle/file_test.go index d29f999966..1c67554855 100644 --- a/internal/fs/handle/file_test.go +++ b/internal/fs/handle/file_test.go @@ -38,6 +38,7 @@ import ( "github.com/googlecloudplatform/gcsfuse/v3/internal/util" "github.com/googlecloudplatform/gcsfuse/v3/internal/workerpool" "github.com/googlecloudplatform/gcsfuse/v3/metrics" + "github.com/googlecloudplatform/gcsfuse/v3/tracing" "github.com/jacobsa/fuse/fuseops" "github.com/jacobsa/timeutil" "github.com/stretchr/testify/assert" @@ -165,7 +166,7 @@ func (t *fileTest) TestFileHandleWrite() { parent := createDirInode(&t.bucket, &t.clock) config := &cfg.Config{Write: cfg.WriteConfig{EnableStreamingWrites: false}} in := createFileInode(t.T(), &t.bucket, &t.clock, config, parent, "test_obj", nil, false) - fh := NewFileHandle(in, nil, false, nil, writeMode, &cfg.Config{}, nil, nil, 0) + fh := NewFileHandle(in, nil, false, metrics.NewNoopMetrics(), tracing.NewNoopTracer(), writeMode, &cfg.Config{}, nil, nil, 0) data := []byte("hello") _, err := fh.Write(t.ctx, data, 0) @@ -189,7 +190,7 @@ func (t *fileTest) Test_IsValidReadManager_NilReadManager() { const objectName = "test_obj" const objectContent = "some data" in := createFileInode(t.T(), &t.bucket, &t.clock, config, parent, objectName, []byte(objectContent), false) - fh := NewFileHandle(in, nil, false, nil, readMode, config, nil, nil, 0) + fh := NewFileHandle(in, nil, false, metrics.NewNoopMetrics(), tracing.NewNoopTracer(), readMode, config, nil, nil, 0) fh.inode.Lock() defer fh.inode.Unlock() fh.readManager = nil @@ -205,7 +206,7 @@ func (t *fileTest) Test_IsValidReadManager_GenerationValidation() { const objectName = "test_obj" const objectContent = "some data" in := createFileInode(t.T(), &t.bucket, &t.clock, config, parent, objectName, []byte(objectContent), false) - fh := NewFileHandle(in, nil, false, nil, readMode, config, nil, nil, 0) + fh := NewFileHandle(in, nil, false, metrics.NewNoopMetrics(), tracing.NewNoopTracer(), readMode, config, nil, nil, 0) fh.inode.Lock() defer fh.inode.Unlock() @@ -245,7 +246,7 @@ func (t *fileTest) Test_IsValidReader_NilReader() { const objectName = "test_obj" const objectContent = "some data" in := createFileInode(t.T(), &t.bucket, &t.clock, config, parent, objectName, []byte(objectContent), false) - fh := NewFileHandle(in, nil, false, nil, readMode, config, nil, nil, 0) + fh := NewFileHandle(in, nil, false, metrics.NewNoopMetrics(), tracing.NewNoopTracer(), readMode, config, nil, nil, 0) fh.inode.Lock() defer fh.inode.Unlock() fh.reader = nil @@ -261,7 +262,7 @@ func (t *fileTest) Test_IsValidReader_GenerationValidation() { const objectName = "test_obj" const objectContent = "some data" in := createFileInode(t.T(), &t.bucket, &t.clock, config, parent, objectName, []byte(objectContent), false) - fh := NewFileHandle(in, nil, false, nil, readMode, config, nil, nil, 0) + fh := NewFileHandle(in, nil, false, metrics.NewNoopMetrics(), tracing.NewNoopTracer(), readMode, config, nil, nil, 0) fh.inode.Lock() defer fh.inode.Unlock() @@ -286,7 +287,7 @@ func (t *fileTest) 
Test_IsValidReader_GenerationValidation() { t.Run(tc.name, func() { minObj := in.Source() minObj.Generation = tc.readerGeneration - fh.reader = gcsx.NewRandomReader(minObj, &t.bucket, 200, nil, false, metrics.NewNoopMetrics(), in.MRDWrapper, config, 0) + fh.reader = gcsx.NewRandomReader(minObj, &t.bucket, 200, nil, false, metrics.NewNoopMetrics(), tracing.NewNoopTracer(), in.MRDWrapper, config, 0) result := fh.isValidReader() @@ -300,7 +301,7 @@ func (t *fileTest) Test_Read_Success() { expectedData := []byte("hello from reader") parent := createDirInode(&t.bucket, &t.clock) in := createFileInode(t.T(), &t.bucket, &t.clock, &cfg.Config{}, parent, "test_obj_reader", expectedData, false) - fh := NewFileHandle(in, nil, false, metrics.NewNoopMetrics(), readMode, &cfg.Config{}, nil, nil, 0) + fh := NewFileHandle(in, nil, false, metrics.NewNoopMetrics(), tracing.NewNoopTracer(), readMode, &cfg.Config{}, nil, nil, 0) buf := make([]byte, len(expectedData)) fh.inode.Lock() @@ -316,7 +317,7 @@ func (t *fileTest) Test_ReadWithReadManager_Success() { expectedData := []byte("hello from readManager") parent := createDirInode(&t.bucket, &t.clock) in := createFileInode(t.T(), &t.bucket, &t.clock, &cfg.Config{}, parent, "test_obj_readManager", expectedData, false) - fh := NewFileHandle(in, nil, false, metrics.NewNoopMetrics(), readMode, &cfg.Config{}, nil, nil, 0) + fh := NewFileHandle(in, nil, false, metrics.NewNoopMetrics(), tracing.NewNoopTracer(), readMode, &cfg.Config{}, nil, nil, 0) buf := make([]byte, len(expectedData)) fh.inode.Lock() @@ -345,7 +346,7 @@ func (t *fileTest) Test_ReadWithReadManager_Concurrent() { parent := createDirInode(&t.bucket, &t.clock) in := createFileInode(t.T(), &t.bucket, &t.clock, &cfg.Config{}, parent, "concurrent_read_obj", objectContent, false) - fh := NewFileHandle(in, nil, false, metrics.NewNoopMetrics(), readMode, &cfg.Config{}, nil, nil, 0) + fh := NewFileHandle(in, nil, false, metrics.NewNoopMetrics(), tracing.NewNoopTracer(), readMode, &cfg.Config{}, nil, nil, 0) var wg sync.WaitGroup wg.Add(numReaders) @@ -404,7 +405,7 @@ func (t *fileTest) Test_Read_Concurrent() { parent := createDirInode(&t.bucket, &t.clock) in := createFileInode(t.T(), &t.bucket, &t.clock, &cfg.Config{}, parent, "concurrent_read_obj", objectContent, false) - fh := NewFileHandle(in, nil, false, metrics.NewNoopMetrics(), readMode, &cfg.Config{}, nil, nil, 0) + fh := NewFileHandle(in, nil, false, metrics.NewNoopMetrics(), tracing.NewNoopTracer(), readMode, &cfg.Config{}, nil, nil, 0) var wg sync.WaitGroup wg.Add(numReaders) @@ -466,7 +467,7 @@ func (t *fileTest) Test_ReadWithReadManager_ErrorScenarios() { t.SetupTest() parent := createDirInode(&t.bucket, &t.clock) testInode := createFileInode(t.T(), &t.bucket, &t.clock, &cfg.Config{}, parent, object.Name, []byte("data"), false) - fh := NewFileHandle(testInode, nil, false, metrics.NewNoopMetrics(), readMode, &cfg.Config{}, nil, nil, 0) + fh := NewFileHandle(testInode, nil, false, metrics.NewNoopMetrics(), tracing.NewNoopTracer(), readMode, &cfg.Config{}, nil, nil, 0) fh.inode.Lock() mockRM := new(read_manager.MockReadManager) mockRM.On("ReadAt", t.ctx, mock.AnythingOfType("*gcsx.ReadRequest")).Return(gcsx.ReadResponse{}, tc.returnErr) @@ -506,7 +507,7 @@ func (t *fileTest) Test_Read_ErrorScenarios() { t.SetupTest() parent := createDirInode(&t.bucket, &t.clock) testInode := createFileInode(t.T(), &t.bucket, &t.clock, &cfg.Config{}, parent, object.Name, []byte("data"), false) - fh := NewFileHandle(testInode, nil, false, metrics.NewNoopMetrics(), 
readMode, &cfg.Config{}, nil, nil, 0) + fh := NewFileHandle(testInode, nil, false, metrics.NewNoopMetrics(), tracing.NewNoopTracer(), readMode, &cfg.Config{}, nil, nil, 0) fh.inode.Lock() mockReader := new(gcsx.MockRandomReader) mockReader.On("ReadAt", t.ctx, dst, int64(0)).Return(gcsx.ObjectData{}, tc.returnErr) @@ -531,7 +532,7 @@ func (t *fileTest) Test_ReadWithReadManager_FallbackToInode() { object := gcs.MinObject{Name: "test_obj"} parent := createDirInode(&t.bucket, &t.clock) in := createFileInode(t.T(), &t.bucket, &t.clock, &cfg.Config{}, parent, object.Name, objectData, true) - fh := NewFileHandle(in, nil, false, metrics.NewNoopMetrics(), readMode, &cfg.Config{}, nil, nil, 0) + fh := NewFileHandle(in, nil, false, metrics.NewNoopMetrics(), tracing.NewNoopTracer(), readMode, &cfg.Config{}, nil, nil, 0) fh.inode.Lock() mockRM := new(read_manager.MockReadManager) fh.readManager = mockRM @@ -555,7 +556,7 @@ func (t *fileTest) Test_Read_FallbackToInode() { object := gcs.MinObject{Name: "test_obj"} parent := createDirInode(&t.bucket, &t.clock) in := createFileInode(t.T(), &t.bucket, &t.clock, &cfg.Config{}, parent, object.Name, objectData, true) - fh := NewFileHandle(in, nil, false, metrics.NewNoopMetrics(), readMode, &cfg.Config{}, nil, nil, 0) + fh := NewFileHandle(in, nil, false, metrics.NewNoopMetrics(), tracing.NewNoopTracer(), readMode, &cfg.Config{}, nil, nil, 0) fh.inode.Lock() mockR := new(gcsx.MockRandomReader) fh.reader = mockR @@ -576,7 +577,7 @@ func (t *fileTest) Test_ReadWithReadManager_ReadManagerInvalidatedByGenerationCh parent := createDirInode(&t.bucket, &t.clock) in := createFileInode(t.T(), &t.bucket, &t.clock, &cfg.Config{}, parent, objectName, content1, false) - fh := NewFileHandle(in, nil, false, metrics.NewNoopMetrics(), readMode, &cfg.Config{}, nil, nil, 0) + fh := NewFileHandle(in, nil, false, metrics.NewNoopMetrics(), tracing.NewNoopTracer(), readMode, &cfg.Config{}, nil, nil, 0) // First read, to create a readManager. fh.inode.Lock() @@ -622,7 +623,7 @@ func (t *fileTest) Test_Read_ReaderInvalidatedByGenerationChange() { parent := createDirInode(&t.bucket, &t.clock) in := createFileInode(t.T(), &t.bucket, &t.clock, &cfg.Config{}, parent, objectName, content1, false) - fh := NewFileHandle(in, nil, false, metrics.NewNoopMetrics(), readMode, &cfg.Config{}, nil, nil, 0) + fh := NewFileHandle(in, nil, false, metrics.NewNoopMetrics(), tracing.NewNoopTracer(), readMode, &cfg.Config{}, nil, nil, 0) // First read, to create a reader. fh.inode.Lock() @@ -668,7 +669,7 @@ func (t *fileTest) Test_ReadWithMrdSimpleReader_Success() { parent := createDirInode(&mockSyncerBucket, &t.clock) in := createFileInode(t.T(), &mockSyncerBucket, &t.clock, &cfg.Config{}, parent, objectName, expectedData, false) // Create File Handle. - fh := NewFileHandle(in, nil, false, metrics.NewNoopMetrics(), readMode, &cfg.Config{FileSystem: cfg.FileSystemConfig{EnableKernelReader: true}}, nil, nil, 0) + fh := NewFileHandle(in, nil, false, metrics.NewNoopMetrics(), tracing.NewNoopTracer(), readMode, &cfg.Config{FileSystem: cfg.FileSystemConfig{EnableKernelReader: true}}, nil, nil, 0) require.NotNil(t.T(), fh.mrdSimpleReader) // Mock the downloader that mrdSimpleReader will use. fakeMRD := fake.NewFakeMultiRangeDownloader(in.Source(), expectedData) @@ -705,7 +706,7 @@ func (t *fileTest) Test_ReadWithMrdSimpleReader_NotAuthoritative() { // After write, content should be "dirtydata". expectedReadData := "dirtydata" // Create file handle. 
- fh := NewFileHandle(in, nil, false, metrics.NewNoopMetrics(), readMode, &cfg.Config{FileSystem: cfg.FileSystemConfig{EnableKernelReader: true}}, nil, nil, 0) + fh := NewFileHandle(in, nil, false, metrics.NewNoopMetrics(), tracing.NewNoopTracer(), readMode, &cfg.Config{FileSystem: cfg.FileSystemConfig{EnableKernelReader: true}}, nil, nil, 0) require.NotNil(t.T(), fh.mrdSimpleReader) // Create read request and take inode lock. buf := make([]byte, len(expectedReadData)) @@ -731,7 +732,7 @@ func (t *fileTest) Test_ReadWithMrdSimpleReader_NilReader() { parent := createDirInode(&nonZonalBucket, &t.clock) in := createFileInode(t.T(), &nonZonalBucket, &t.clock, &cfg.Config{}, parent, "test_obj", []byte("data"), false) // Create file handle. - fh := NewFileHandle(in, nil, false, metrics.NewNoopMetrics(), readMode, &cfg.Config{FileSystem: cfg.FileSystemConfig{EnableKernelReader: false}}, nil, nil, 0) + fh := NewFileHandle(in, nil, false, metrics.NewNoopMetrics(), tracing.NewNoopTracer(), readMode, &cfg.Config{FileSystem: cfg.FileSystemConfig{EnableKernelReader: false}}, nil, nil, 0) require.Nil(t.T(), fh.mrdSimpleReader) // Create read request and take inode lock. req := &gcsx.ReadRequest{ @@ -761,7 +762,7 @@ func (t *fileTest) Test_ReadWithMrdSimpleReader_ReadAtError() { parent := createDirInode(&mockSyncerBucket, &t.clock) in := createFileInode(t.T(), &mockSyncerBucket, &t.clock, &cfg.Config{FileSystem: cfg.FileSystemConfig{EnableKernelReader: true}}, parent, objectName, expectedData, false) // Create file handle. - fh := NewFileHandle(in, nil, false, metrics.NewNoopMetrics(), readMode, &cfg.Config{FileSystem: cfg.FileSystemConfig{EnableKernelReader: true}}, nil, nil, 0) + fh := NewFileHandle(in, nil, false, metrics.NewNoopMetrics(), tracing.NewNoopTracer(), readMode, &cfg.Config{FileSystem: cfg.FileSystemConfig{EnableKernelReader: true}}, nil, nil, 0) require.NotNil(t.T(), fh.mrdSimpleReader) // Mock the downloader to return an error. 
expectedErr := errors.New("mrd read error") @@ -819,7 +820,7 @@ func (t *fileTest) TestOpenMode() { parent := createDirInode(&t.bucket, &t.clock) config := &cfg.Config{Write: cfg.WriteConfig{EnableStreamingWrites: false}} in := createFileInode(t.T(), &t.bucket, &t.clock, config, parent, "test_obj", nil, false) - fh := NewFileHandle(in, nil, false, nil, tc.openMode, &cfg.Config{}, nil, nil, 0) + fh := NewFileHandle(in, nil, false, metrics.NewNoopMetrics(), tracing.NewNoopTracer(), tc.openMode, &cfg.Config{}, nil, nil, 0) openMode := fh.OpenMode() @@ -838,7 +839,7 @@ func (t *fileTest) TestFileHandle_Destroy_WithReaderAndReadManager() { mockReader.On("Destroy").Once() mockReadManager.On("Destroy").Once() // Construct file handle with mocks - fh := NewFileHandle(fileInode, nil, false, nil, readMode, config, nil, nil, 0) + fh := NewFileHandle(fileInode, nil, false, metrics.NewNoopMetrics(), tracing.NewNoopTracer(), readMode, config, nil, nil, 0) fh.reader = mockReader fh.readManager = mockReadManager @@ -854,7 +855,7 @@ func (t *fileTest) TestFileHandle_Destroy_WithNilReaderAndReadManager() { config := &cfg.Config{} fileInode := createFileInode(t.T(), &t.bucket, &t.clock, config, parent, "destroy_test_nil_obj", nil, false) // Construct file handle with nils - fh := NewFileHandle(fileInode, nil, false, nil, readMode, config, nil, nil, 0) + fh := NewFileHandle(fileInode, nil, false, metrics.NewNoopMetrics(), tracing.NewNoopTracer(), readMode, config, nil, nil, 0) fh.reader = nil fh.readManager = nil @@ -874,7 +875,7 @@ func (t *fileTest) TestFileHandle_CheckInvariants_WithNonNilReaderAndManager() { // Expectations mockReader.On("CheckInvariants").Once() mockRM.On("CheckInvariants").Once() - fh := NewFileHandle(fileInode, nil, false, nil, readMode, config, nil, nil, 0) + fh := NewFileHandle(fileInode, nil, false, metrics.NewNoopMetrics(), tracing.NewNoopTracer(), readMode, config, nil, nil, 0) fh.reader = mockReader fh.readManager = mockRM @@ -891,7 +892,7 @@ func (t *fileTest) TestFileHandle_CheckInvariants_WithNilReaderAndManager() { config := &cfg.Config{} in := createFileInode(t.T(), &t.bucket, &t.clock, config, parent, "test_check_invariants_nil", nil, false) - fh := NewFileHandle(in, nil, false, nil, readMode, config, nil, nil, 0) + fh := NewFileHandle(in, nil, false, metrics.NewNoopMetrics(), tracing.NewNoopTracer(), readMode, config, nil, nil, 0) // Should not panic even if both are nil assert.NotPanics(t.T(), func() { @@ -903,7 +904,7 @@ func (t *fileTest) Test_LockHandleAndRelockInode_Lock_NoDeadlockWithContention() parent := createDirInode(&t.bucket, &t.clock) config := &cfg.Config{} in := createFileInode(t.T(), &t.bucket, &t.clock, config, parent, "test_obj_deadlock", []byte("content"), false) - fh := NewFileHandle(in, nil, false, nil, readMode, config, nil, nil, 0) + fh := NewFileHandle(in, nil, false, metrics.NewNoopMetrics(), tracing.NewNoopTracer(), readMode, config, nil, nil, 0) var wg sync.WaitGroup const numContenders = 10 wg.Add(2 * numContenders) @@ -949,7 +950,7 @@ func (t *fileTest) Test_LockHandleAndRelockInode_RLock_NoDeadlockWithContention( parent := createDirInode(&t.bucket, &t.clock) config := &cfg.Config{} in := createFileInode(t.T(), &t.bucket, &t.clock, config, parent, "test_obj_deadlock", []byte("content"), false) - fh := NewFileHandle(in, nil, false, nil, readMode, config, nil, nil, 0) + fh := NewFileHandle(in, nil, false, metrics.NewNoopMetrics(), tracing.NewNoopTracer(), readMode, config, nil, nil, 0) var wg sync.WaitGroup const numContenders = 10 wg.Add(2 * 
numContenders) @@ -995,7 +996,7 @@ func (t *fileTest) Test_LockHandleAndRelockInode_Mixed_NoDeadlockWithContention( parent := createDirInode(&t.bucket, &t.clock) config := &cfg.Config{} in := createFileInode(t.T(), &t.bucket, &t.clock, config, parent, "test_obj_deadlock", []byte("content"), false) - fh := NewFileHandle(in, nil, false, nil, readMode, config, nil, nil, 0) + fh := NewFileHandle(in, nil, false, metrics.NewNoopMetrics(), tracing.NewNoopTracer(), readMode, config, nil, nil, 0) var wg sync.WaitGroup const numRContenders = 10 const numWContenders = 10 @@ -1041,7 +1042,7 @@ func (t *fileTest) Test_UnlockHandleAndInode() { parent := createDirInode(&t.bucket, &t.clock) config := &cfg.Config{} in := createFileInode(t.T(), &t.bucket, &t.clock, config, parent, "test_obj_deadlock", []byte("content"), false) - fh := NewFileHandle(in, nil, false, nil, readMode, config, nil, nil, 0) + fh := NewFileHandle(in, nil, false, metrics.NewNoopMetrics(), tracing.NewNoopTracer(), readMode, config, nil, nil, 0) var wg sync.WaitGroup const numContenders = 10 @@ -1109,7 +1110,7 @@ func (t *fileTest) Test_ReadWithReadManager_FullReadSuccessWithBufferedRead() { globalSemaphore := semaphore.NewWeighted(20) // Sufficient blocks for the test parent := createDirInode(&t.bucket, &t.clock) in := createFileInode(t.T(), &t.bucket, &t.clock, config, parent, "read_obj", expectedData, false) - fh := NewFileHandle(in, nil, false, metrics.NewNoopMetrics(), readMode, config, workerPool, globalSemaphore, 0) + fh := NewFileHandle(in, nil, false, metrics.NewNoopMetrics(), tracing.NewNoopTracer(), readMode, config, workerPool, globalSemaphore, 0) fh.inode.Lock() buf := make([]byte, fileSize) @@ -1211,7 +1212,7 @@ func (t *fileTest) Test_ShouldSkipSizeChecks() { parent := createDirInode(&t.bucket, &t.clock) config := &cfg.Config{} in := createFileInode(t.T(), &t.bucket, &t.clock, config, parent, tc.object.Name, nil, false) - fh := NewFileHandle(in, nil, false, nil, tc.openMode, config, nil, nil, 0) + fh := NewFileHandle(in, nil, false, metrics.NewNoopMetrics(), tracing.NewNoopTracer(), tc.openMode, config, nil, nil, 0) if tc.useNilReadManager { fh.readManager = nil @@ -1255,7 +1256,7 @@ func (t *fileTest) Test_ReadWithReadManager_ConcurrentReadsWithBufferedReader() // Create mock inode and file handle. parent := createDirInode(&t.bucket, &t.clock) in := createFileInode(t.T(), &t.bucket, &t.clock, config, parent, "read_obj", expectedData, false) - fh := NewFileHandle(in, nil, false, metrics.NewNoopMetrics(), readMode, config, workerPool, globalSemaphore, 0) + fh := NewFileHandle(in, nil, false, metrics.NewNoopMetrics(), tracing.NewNoopTracer(), readMode, config, workerPool, globalSemaphore, 0) // Use a WaitGroup to synchronize goroutines. var wg sync.WaitGroup wg.Add(numGoroutines) @@ -1311,7 +1312,7 @@ func (t *fileTest) Test_ReadWithReadManager_WorkloadInsightVisual() { parent := createDirInode(&t.bucket, &t.clock) in := createFileInode(t.T(), &t.bucket, &t.clock, config, parent, "test_obj_visual", content, false) in.Lock() - fh := NewFileHandle(in, nil, false, metrics.NewNoopMetrics(), readMode, config, nil, nil, 0) + fh := NewFileHandle(in, nil, false, metrics.NewNoopMetrics(), tracing.NewNoopTracer(), readMode, config, nil, nil, 0) in.Unlock() // Perform multiple reads and destroy the file-handle. 
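The fs-local maybePropagateTraceContext helper deleted from internal/fs/fs.go above is now called as tracing.MaybePropagateTraceContext. Assuming it moved into the tracing package unchanged apart from being exported, it would read:

package tracing

import (
	"context"

	"go.opentelemetry.io/otel/trace"
)

// MaybePropagateTraceContext carries the span from oldCtx over to newCtx when
// tracing is enabled, so a detached context still reports into the caller's
// trace instead of starting an orphan one.
func MaybePropagateTraceContext(newCtx, oldCtx context.Context, isTracingEnabled bool) context.Context {
	if !isTracingEnabled {
		return newCtx
	}

	span := trace.SpanFromContext(oldCtx)
	return trace.ContextWithSpan(newCtx, span)
}

This preserves the ignore-interrupts semantics: getInterruptlessContext swaps in a fresh context.Background() that the kernel cannot cancel, then re-attaches the live span so the operation stays inside its trace.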
diff --git a/internal/fs/server.go b/internal/fs/server.go index a38fd32e3f..6eff8f2213 100644 --- a/internal/fs/server.go +++ b/internal/fs/server.go @@ -33,7 +33,7 @@ func NewServer(ctx context.Context, cfg *ServerConfig) (fuse.Server, error) { fs = wrappers.WithErrorMapping(fs, cfg.NewConfig.FileSystem.PreconditionErrors) if newcfg.IsTracingEnabled(cfg.NewConfig) { - fs = wrappers.WithTracing(fs) + fs = wrappers.WithTracing(fs, cfg.TraceHandle) } fs = wrappers.WithMonitoring(fs, cfg.MetricHandle) if cfg.Notifier != nil { diff --git a/internal/fs/tracing_test.go b/internal/fs/tracing_test.go index b3d77b7242..912b165f18 100644 --- a/internal/fs/tracing_test.go +++ b/internal/fs/tracing_test.go @@ -18,11 +18,14 @@ import ( "context" "testing" + "github.com/stretchr/testify/suite" + "github.com/googlecloudplatform/gcsfuse/v3/cfg" "github.com/googlecloudplatform/gcsfuse/v3/internal/fs" "github.com/googlecloudplatform/gcsfuse/v3/internal/fs/wrappers" "github.com/googlecloudplatform/gcsfuse/v3/internal/storage/fake" "github.com/googlecloudplatform/gcsfuse/v3/internal/storage/gcs" + "github.com/googlecloudplatform/gcsfuse/v3/tracing" "github.com/jacobsa/fuse/fuseops" "github.com/jacobsa/fuse/fuseutil" "github.com/jacobsa/timeutil" @@ -34,6 +37,19 @@ import ( "go.opentelemetry.io/otel/trace" ) +type TracingTestSuite struct { + suite.Suite + globalExporter *tracetest.InMemoryExporter +} + +func (s *TracingTestSuite) SetupSuite() { + s.globalExporter = newInMemoryExporter() +} + +func (s *TracingTestSuite) SetupSubTest() { + s.globalExporter.Reset() +} + func createTestFileSystemWithTraces(ctx context.Context, t *testing.T, ignoreInterrupts bool) (gcs.Bucket, fuseutil.FileSystem) { t.Helper() @@ -73,17 +89,13 @@ func createTestFileSystemWithTraces(ctx context.Context, t *testing.T, ignoreInt return bucket, server } -func newInMemoryExporter(t *testing.T) *tracetest.InMemoryExporter { - t.Helper() +func newInMemoryExporter() *tracetest.InMemoryExporter { ex := tracetest.NewInMemoryExporter() - t.Cleanup(func() { - ex.Reset() - }) otel.SetTracerProvider(sdktrace.NewTracerProvider(sdktrace.WithSyncer(ex))) return ex } -func TestTraceLookupInode(t *testing.T) { +func (s *TracingTestSuite) TestTraceLookupInode() { ctx := context.Background() testCases := []struct { name string @@ -94,10 +106,10 @@ func TestTraceLookupInode(t *testing.T) { {"disabled", false, []string{"LookUpInode"}}, } for _, tt := range testCases { - t.Run(tt.name, func(t *testing.T) { - ex := newInMemoryExporter(t) + s.Run(tt.name, func() { + t := s.T() bucket, server := createTestFileSystemWithTraces(ctx, t, tt.ignoreInterrupts) - m := wrappers.WithTracing(server) + m := wrappers.WithTracing(server, tracing.NewOTELTracer()) ctx := context.Background() fileName := "test.txt" content := "test content" @@ -110,7 +122,7 @@ func TestTraceLookupInode(t *testing.T) { err := m.LookUpInode(context.Background(), lookupOp) require.NoError(t, err) - ss := ex.GetSpans() + ss := s.globalExporter.GetSpans() require.Len(t, ss, len(tt.spans)) for i, spanName := range tt.spans { assert.Equal(t, spanName, ss[i].Name) @@ -120,7 +132,7 @@ func TestTraceLookupInode(t *testing.T) { } } -func TestTraceStatFS(t *testing.T) { +func (s *TracingTestSuite) TestTraceStatFS() { ctx := context.Background() testCases := []struct { name string @@ -131,10 +143,10 @@ func TestTraceStatFS(t *testing.T) { {"disabled", false, []string{"StatFS"}}, } for _, tt := range testCases { - t.Run(tt.name, func(t *testing.T) { - ex := newInMemoryExporter(t) + s.Run(tt.name, func() 
{ + t := s.T() bucket, server := createTestFileSystemWithTraces(ctx, t, tt.ignoreInterrupts) - m := wrappers.WithTracing(server) + m := wrappers.WithTracing(server, tracing.NewOTELTracer()) ctx := context.Background() fileName := "test.txt" content := "test content" @@ -144,7 +156,7 @@ func TestTraceStatFS(t *testing.T) { err := m.StatFS(context.Background(), statFsOp) require.NoError(t, err) - ss := ex.GetSpans() + ss := s.globalExporter.GetSpans() require.Len(t, ss, len(tt.spans)) for i, spanName := range tt.spans { assert.Equal(t, spanName, ss[i].Name) @@ -154,7 +166,7 @@ func TestTraceStatFS(t *testing.T) { } } -func TestTraceGetInodeAttributes(t *testing.T) { +func (s *TracingTestSuite) TestTraceGetInodeAttributes() { ctx := context.Background() testCases := []struct { name string @@ -165,10 +177,10 @@ func TestTraceGetInodeAttributes(t *testing.T) { {"disabled", false, []string{"GetInodeAttributes"}}, } for _, tt := range testCases { - t.Run(tt.name, func(t *testing.T) { - ex := newInMemoryExporter(t) + s.Run(tt.name, func() { + t := s.T() bucket, server := createTestFileSystemWithTraces(ctx, t, tt.ignoreInterrupts) - m := wrappers.WithTracing(server) + m := wrappers.WithTracing(server, tracing.NewOTELTracer()) ctx := context.Background() fileName := "test.txt" content := "test content" @@ -180,7 +192,7 @@ func TestTraceGetInodeAttributes(t *testing.T) { err := m.GetInodeAttributes(context.Background(), op) require.NoError(t, err) - ss := ex.GetSpans() + ss := s.globalExporter.GetSpans() require.Len(t, ss, len(tt.spans)) for i, spanName := range tt.spans { assert.Equal(t, spanName, ss[i].Name) @@ -190,7 +202,7 @@ func TestTraceGetInodeAttributes(t *testing.T) { } } -func TestTraceSetInodeAttributes(t *testing.T) { +func (s *TracingTestSuite) TestTraceSetInodeAttributes() { ctx := context.Background() testCases := []struct { name string @@ -201,10 +213,10 @@ func TestTraceSetInodeAttributes(t *testing.T) { {"disabled", false, []string{"LookUpInode", "SetInodeAttributes"}}, } for _, tt := range testCases { - t.Run(tt.name, func(t *testing.T) { - ex := newInMemoryExporter(t) + s.Run(tt.name, func() { + t := s.T() bucket, server := createTestFileSystemWithTraces(ctx, t, tt.ignoreInterrupts) - m := wrappers.WithTracing(server) + m := wrappers.WithTracing(server, tracing.NewOTELTracer()) ctx := context.Background() fileName := "test.txt" content := "test content" @@ -223,7 +235,7 @@ func TestTraceSetInodeAttributes(t *testing.T) { err = m.SetInodeAttributes(context.Background(), op) require.NoError(t, err) - ss := ex.GetSpans() + ss := s.globalExporter.GetSpans() require.Len(t, ss, len(tt.spans)) for i, spanName := range tt.spans { assert.Equal(t, spanName, ss[i].Name) @@ -233,7 +245,7 @@ func TestTraceSetInodeAttributes(t *testing.T) { } } -func TestTraceForgetInode(t *testing.T) { +func (s *TracingTestSuite) TestTraceForgetInode() { ctx := context.Background() testCases := []struct { name string @@ -244,14 +256,14 @@ func TestTraceForgetInode(t *testing.T) { {"disabled", false, []string{"LookUpInode", "ForgetInode"}}, } for _, tt := range testCases { - t.Run(tt.name, func(t *testing.T) { - ex := newInMemoryExporter(t) + s.Run(tt.name, func() { + t := s.T() bucket, server := createTestFileSystemWithTraces(ctx, t, tt.ignoreInterrupts) ctx := context.Background() fileName := "test.txt" content := "test content" createWithContents(ctx, t, bucket, fileName, content) - m := wrappers.WithTracing(server) + m := wrappers.WithTracing(server, tracing.NewOTELTracer()) lookUpOp := 
&fuseops.LookUpInodeOp{ Parent: fuseops.RootInodeID, Name: fileName, @@ -266,7 +278,7 @@ func TestTraceForgetInode(t *testing.T) { err = m.ForgetInode(ctx, op) require.NoError(t, err) - ss := ex.GetSpans() + ss := s.globalExporter.GetSpans() require.Len(t, ss, len(tt.spans)) for i, spanName := range tt.spans { assert.Equal(t, spanName, ss[i].Name) @@ -276,7 +288,7 @@ func TestTraceForgetInode(t *testing.T) { } } -func TestTraceMkDir(t *testing.T) { +func (s *TracingTestSuite) TestTraceMkDir() { ctx := context.Background() testCases := []struct { name string @@ -287,10 +299,10 @@ func TestTraceMkDir(t *testing.T) { {"disabled", false, []string{"MkDir"}}, } for _, tt := range testCases { - t.Run(tt.name, func(t *testing.T) { - ex := newInMemoryExporter(t) + s.Run(tt.name, func() { + t := s.T() bucket, server := createTestFileSystemWithTraces(ctx, t, tt.ignoreInterrupts) - m := wrappers.WithTracing(server) + m := wrappers.WithTracing(server, tracing.NewOTELTracer()) ctx := context.Background() fileName := "test.txt" content := "test content" @@ -303,7 +315,7 @@ func TestTraceMkDir(t *testing.T) { err := m.MkDir(ctx, op) require.NoError(t, err) - ss := ex.GetSpans() + ss := s.globalExporter.GetSpans() require.Len(t, ss, len(tt.spans)) for i, spanName := range tt.spans { assert.Equal(t, spanName, ss[i].Name) @@ -313,7 +325,7 @@ func TestTraceMkDir(t *testing.T) { } } -func TestTraceMkNode(t *testing.T) { +func (s *TracingTestSuite) TestTraceMkNode() { ctx := context.Background() testCases := []struct { name string @@ -324,10 +336,10 @@ func TestTraceMkNode(t *testing.T) { {"disabled", false, []string{"MkNode"}}, } for _, tt := range testCases { - t.Run(tt.name, func(t *testing.T) { - ex := newInMemoryExporter(t) + s.Run(tt.name, func() { + t := s.T() bucket, server := createTestFileSystemWithTraces(ctx, t, tt.ignoreInterrupts) - m := wrappers.WithTracing(server) + m := wrappers.WithTracing(server, tracing.NewOTELTracer()) ctx := context.Background() fileName := "test.txt" content := "test content" @@ -340,7 +352,7 @@ func TestTraceMkNode(t *testing.T) { err := m.MkNode(ctx, op) require.NoError(t, err) - ss := ex.GetSpans() + ss := s.globalExporter.GetSpans() require.Len(t, ss, len(tt.spans)) for i, spanName := range tt.spans { assert.Equal(t, spanName, ss[i].Name) @@ -350,7 +362,7 @@ func TestTraceMkNode(t *testing.T) { } } -func TestTraceCreateFile(t *testing.T) { +func (s *TracingTestSuite) TestTraceCreateFile() { ctx := context.Background() testCases := []struct { name string @@ -361,10 +373,10 @@ func TestTraceCreateFile(t *testing.T) { {"disabled", false, []string{"CreateFile"}}, } for _, tt := range testCases { - t.Run(tt.name, func(t *testing.T) { - ex := newInMemoryExporter(t) + s.Run(tt.name, func() { + t := s.T() bucket, server := createTestFileSystemWithTraces(ctx, t, tt.ignoreInterrupts) - m := wrappers.WithTracing(server) + m := wrappers.WithTracing(server, tracing.NewOTELTracer()) ctx := context.Background() fileName := "test.txt" content := "test content" @@ -377,7 +389,7 @@ func TestTraceCreateFile(t *testing.T) { err := m.CreateFile(ctx, op) require.NoError(t, err) - ss := ex.GetSpans() + ss := s.globalExporter.GetSpans() require.Len(t, ss, len(tt.spans)) for i, spanName := range tt.spans { assert.Equal(t, spanName, ss[i].Name) @@ -387,7 +399,7 @@ func TestTraceCreateFile(t *testing.T) { } } -func TestTraceCreateLink(t *testing.T) { +func (s *TracingTestSuite) TestTraceCreateLink() { ctx := context.Background() testCases := []struct { name string @@ -398,10 +410,10 @@ func 
TestTraceCreateLink(t *testing.T) { {"disabled", false, []string{"LookUpInode", "CreateLink"}}, } for _, tt := range testCases { - t.Run(tt.name, func(t *testing.T) { - ex := newInMemoryExporter(t) + s.Run(tt.name, func() { + t := s.T() bucket, server := createTestFileSystemWithTraces(ctx, t, tt.ignoreInterrupts) - m := wrappers.WithTracing(server) + m := wrappers.WithTracing(server, tracing.NewOTELTracer()) ctx := context.Background() fileName := "test.txt" content := "test content" @@ -421,7 +433,7 @@ func TestTraceCreateLink(t *testing.T) { err = m.CreateLink(ctx, op) assert.Error(t, err) // The operation is not implemented, so we expect an error. - ss := ex.GetSpans() + ss := s.globalExporter.GetSpans() require.Len(t, ss, len(tt.spans)) for i, spanName := range tt.spans { assert.Equal(t, spanName, ss[i].Name) @@ -431,7 +443,7 @@ func TestTraceCreateLink(t *testing.T) { } } -func TestTraceCreateSymlink(t *testing.T) { +func (s *TracingTestSuite) TestTraceCreateSymlink() { ctx := context.Background() testCases := []struct { name string @@ -442,10 +454,10 @@ func TestTraceCreateSymlink(t *testing.T) { {"disabled", false, []string{"CreateSymlink"}}, } for _, tt := range testCases { - t.Run(tt.name, func(t *testing.T) { - ex := newInMemoryExporter(t) + s.Run(tt.name, func() { + t := s.T() bucket, server := createTestFileSystemWithTraces(ctx, t, tt.ignoreInterrupts) - m := wrappers.WithTracing(server) + m := wrappers.WithTracing(server, tracing.NewOTELTracer()) ctx := context.Background() fileName := "test.txt" content := "test content" @@ -459,7 +471,7 @@ func TestTraceCreateSymlink(t *testing.T) { err := m.CreateSymlink(ctx, op) require.NoError(t, err) - ss := ex.GetSpans() + ss := s.globalExporter.GetSpans() require.Len(t, ss, len(tt.spans)) for i, spanName := range tt.spans { assert.Equal(t, spanName, ss[i].Name) @@ -469,7 +481,7 @@ func TestTraceCreateSymlink(t *testing.T) { } } -func TestTraceRename(t *testing.T) { +func (s *TracingTestSuite) TestTraceRename() { ctx := context.Background() testCases := []struct { name string @@ -480,10 +492,10 @@ func TestTraceRename(t *testing.T) { {"disabled", false, []string{"Rename"}}, } for _, tt := range testCases { - t.Run(tt.name, func(t *testing.T) { - ex := newInMemoryExporter(t) + s.Run(tt.name, func() { + t := s.T() bucket, server := createTestFileSystemWithTraces(ctx, t, tt.ignoreInterrupts) - m := wrappers.WithTracing(server) + m := wrappers.WithTracing(server, tracing.NewOTELTracer()) ctx := context.Background() oldName := "old" newName := "new" @@ -499,7 +511,7 @@ func TestTraceRename(t *testing.T) { err := m.Rename(ctx, op) require.NoError(t, err) - ss := ex.GetSpans() + ss := s.globalExporter.GetSpans() require.Len(t, ss, len(tt.spans)) for i, spanName := range tt.spans { assert.Equal(t, spanName, ss[i].Name) @@ -509,7 +521,7 @@ func TestTraceRename(t *testing.T) { } } -func TestTraceRmDir(t *testing.T) { +func (s *TracingTestSuite) TestTraceRmDir() { ctx := context.Background() testCases := []struct { name string @@ -520,10 +532,10 @@ func TestTraceRmDir(t *testing.T) { {"disabled", false, []string{"MkDir", "RmDir"}}, } for _, tt := range testCases { - t.Run(tt.name, func(t *testing.T) { - ex := newInMemoryExporter(t) + s.Run(tt.name, func() { + t := s.T() bucket, server := createTestFileSystemWithTraces(ctx, t, tt.ignoreInterrupts) - m := wrappers.WithTracing(server) + m := wrappers.WithTracing(server, tracing.NewOTELTracer()) ctx := context.Background() fileName := "test.txt" content := "test content" @@ -543,7 +555,7 @@ func 
TestTraceRmDir(t *testing.T) { err = m.RmDir(ctx, op) require.NoError(t, err) - ss := ex.GetSpans() + ss := s.globalExporter.GetSpans() require.Len(t, ss, len(tt.spans)) for i, spanName := range tt.spans { assert.Equal(t, spanName, ss[i].Name) @@ -553,7 +565,7 @@ func TestTraceRmDir(t *testing.T) { } } -func TestTraceUnlink(t *testing.T) { +func (s *TracingTestSuite) TestTraceUnlink() { ctx := context.Background() testCases := []struct { name string @@ -564,10 +576,10 @@ func TestTraceUnlink(t *testing.T) { {"disabled", false, []string{"Unlink"}}, } for _, tt := range testCases { - t.Run(tt.name, func(t *testing.T) { - ex := newInMemoryExporter(t) + s.Run(tt.name, func() { + t := s.T() bucket, server := createTestFileSystemWithTraces(ctx, t, tt.ignoreInterrupts) - m := wrappers.WithTracing(server) + m := wrappers.WithTracing(server, tracing.NewOTELTracer()) ctx := context.Background() fileName := "test.txt" content := "test content" @@ -580,7 +592,7 @@ func TestTraceUnlink(t *testing.T) { err := m.Unlink(ctx, op) require.NoError(t, err) - ss := ex.GetSpans() + ss := s.globalExporter.GetSpans() require.Len(t, ss, len(tt.spans)) for i, spanName := range tt.spans { assert.Equal(t, spanName, ss[i].Name) @@ -590,7 +602,7 @@ func TestTraceUnlink(t *testing.T) { } } -func TestTraceOpenDir(t *testing.T) { +func (s *TracingTestSuite) TestTraceOpenDir() { ctx := context.Background() testCases := []struct { name string @@ -601,10 +613,10 @@ func TestTraceOpenDir(t *testing.T) { {"disabled", false, []string{"OpenDir"}}, } for _, tt := range testCases { - t.Run(tt.name, func(t *testing.T) { - ex := newInMemoryExporter(t) + s.Run(tt.name, func() { + t := s.T() bucket, server := createTestFileSystemWithTraces(ctx, t, tt.ignoreInterrupts) - m := wrappers.WithTracing(server) + m := wrappers.WithTracing(server, tracing.NewOTELTracer()) ctx := context.Background() fileName := "test.txt" content := "test content" @@ -616,7 +628,7 @@ func TestTraceOpenDir(t *testing.T) { err := m.OpenDir(ctx, op) require.NoError(t, err) - ss := ex.GetSpans() + ss := s.globalExporter.GetSpans() require.Len(t, ss, len(tt.spans)) for i, spanName := range tt.spans { assert.Equal(t, spanName, ss[i].Name) @@ -626,7 +638,7 @@ func TestTraceOpenDir(t *testing.T) { } } -func TestTraceReadDir(t *testing.T) { +func (s *TracingTestSuite) TestTraceReadDir() { ctx := context.Background() testCases := []struct { name string @@ -637,10 +649,10 @@ func TestTraceReadDir(t *testing.T) { {"disabled", false, []string{"OpenDir", "ReadDir"}}, } for _, tt := range testCases { - t.Run(tt.name, func(t *testing.T) { - ex := newInMemoryExporter(t) + s.Run(tt.name, func() { + t := s.T() bucket, server := createTestFileSystemWithTraces(ctx, t, tt.ignoreInterrupts) - m := wrappers.WithTracing(server) + m := wrappers.WithTracing(server, tracing.NewOTELTracer()) ctx := context.Background() fileName := "test.txt" content := "test content" @@ -660,7 +672,7 @@ func TestTraceReadDir(t *testing.T) { err = m.ReadDir(ctx, op) require.NoError(t, err) - ss := ex.GetSpans() + ss := s.globalExporter.GetSpans() require.Len(t, ss, len(tt.spans)) for i, spanName := range tt.spans { assert.Equal(t, spanName, ss[i].Name) @@ -670,7 +682,7 @@ func TestTraceReadDir(t *testing.T) { } } -func TestTraceReadDirPlus(t *testing.T) { +func (s *TracingTestSuite) TestTraceReadDirPlus() { ctx := context.Background() testCases := []struct { name string @@ -681,10 +693,10 @@ func TestTraceReadDirPlus(t *testing.T) { {"disabled", false, []string{"OpenDir", "ReadDirPlus"}}, } for _, tt := 
range testCases { - t.Run(tt.name, func(t *testing.T) { - ex := newInMemoryExporter(t) + s.Run(tt.name, func() { + t := s.T() bucket, server := createTestFileSystemWithTraces(ctx, t, tt.ignoreInterrupts) - m := wrappers.WithTracing(server) + m := wrappers.WithTracing(server, tracing.NewOTELTracer()) ctx := context.Background() fileName := "test.txt" content := "test content" @@ -706,7 +718,7 @@ func TestTraceReadDirPlus(t *testing.T) { err = m.ReadDirPlus(ctx, op) require.NoError(t, err) - ss := ex.GetSpans() + ss := s.globalExporter.GetSpans() require.Len(t, ss, len(tt.spans)) for i, spanName := range tt.spans { assert.Equal(t, spanName, ss[i].Name) @@ -716,7 +728,7 @@ func TestTraceReadDirPlus(t *testing.T) { } } -func TestTraceReleaseDirHandle(t *testing.T) { +func (s *TracingTestSuite) TestTraceReleaseDirHandle() { ctx := context.Background() testCases := []struct { name string @@ -727,10 +739,10 @@ func TestTraceReleaseDirHandle(t *testing.T) { {"disabled", false, []string{"OpenDir", "ReleaseDirHandle"}}, } for _, tt := range testCases { - t.Run(tt.name, func(t *testing.T) { - ex := newInMemoryExporter(t) + s.Run(tt.name, func() { + t := s.T() bucket, server := createTestFileSystemWithTraces(ctx, t, tt.ignoreInterrupts) - m := wrappers.WithTracing(server) + m := wrappers.WithTracing(server, tracing.NewOTELTracer()) ctx := context.Background() fileName := "test.txt" content := "test content" @@ -747,7 +759,7 @@ func TestTraceReleaseDirHandle(t *testing.T) { err = m.ReleaseDirHandle(ctx, op) require.NoError(t, err) - ss := ex.GetSpans() + ss := s.globalExporter.GetSpans() require.Len(t, ss, len(tt.spans)) for i, spanName := range tt.spans { assert.Equal(t, spanName, ss[i].Name) @@ -757,7 +769,7 @@ func TestTraceReleaseDirHandle(t *testing.T) { } } -func TestTraceOpenFile(t *testing.T) { +func (s *TracingTestSuite) TestTraceOpenFile() { ctx := context.Background() testCases := []struct { name string @@ -768,10 +780,10 @@ func TestTraceOpenFile(t *testing.T) { {"disabled", false, []string{"LookUpInode", "OpenFile"}}, } for _, tt := range testCases { - t.Run(tt.name, func(t *testing.T) { - ex := newInMemoryExporter(t) + s.Run(tt.name, func() { + t := s.T() bucket, server := createTestFileSystemWithTraces(ctx, t, tt.ignoreInterrupts) - m := wrappers.WithTracing(server) + m := wrappers.WithTracing(server, tracing.NewOTELTracer()) ctx := context.Background() fileName := "test.txt" content := "test content" @@ -789,7 +801,7 @@ func TestTraceOpenFile(t *testing.T) { err = m.OpenFile(ctx, op) require.NoError(t, err) - ss := ex.GetSpans() + ss := s.globalExporter.GetSpans() require.Len(t, ss, len(tt.spans)) for i, spanName := range tt.spans { assert.Equal(t, spanName, ss[i].Name) @@ -799,7 +811,7 @@ func TestTraceOpenFile(t *testing.T) { } } -func TestTraceReadFile(t *testing.T) { +func (s *TracingTestSuite) TestTraceReadFile() { ctx := context.Background() testCases := []struct { name string @@ -810,10 +822,10 @@ func TestTraceReadFile(t *testing.T) { {"disabled", false, []string{"LookUpInode", "OpenFile", "ReadFile"}}, } for _, tt := range testCases { - t.Run(tt.name, func(t *testing.T) { - ex := newInMemoryExporter(t) + s.Run(tt.name, func() { + t := s.T() bucket, server := createTestFileSystemWithTraces(ctx, t, tt.ignoreInterrupts) - m := wrappers.WithTracing(server) + m := wrappers.WithTracing(server, tracing.NewOTELTracer()) ctx := context.Background() fileName := "test" content := "test content" @@ -838,7 +850,7 @@ func TestTraceReadFile(t *testing.T) { err = m.ReadFile(ctx, op) 
require.NoError(t, err) - ss := ex.GetSpans() + ss := s.globalExporter.GetSpans() require.Len(t, ss, len(tt.spans)) for i, spanName := range tt.spans { assert.Equal(t, spanName, ss[i].Name) @@ -848,7 +860,7 @@ func TestTraceReadFile(t *testing.T) { } } -func TestTraceWriteFile(t *testing.T) { +func (s *TracingTestSuite) TestTraceWriteFile() { ctx := context.Background() testCases := []struct { name string @@ -859,10 +871,10 @@ func TestTraceWriteFile(t *testing.T) { {"disabled", false, []string{"LookUpInode", "OpenFile", "WriteFile"}}, } for _, tt := range testCases { - t.Run(tt.name, func(t *testing.T) { - ex := newInMemoryExporter(t) + s.Run(tt.name, func() { + t := s.T() bucket, server := createTestFileSystemWithTraces(ctx, t, tt.ignoreInterrupts) - m := wrappers.WithTracing(server) + m := wrappers.WithTracing(server, tracing.NewOTELTracer()) ctx := context.Background() fileName := "test.txt" content := "" @@ -888,7 +900,7 @@ func TestTraceWriteFile(t *testing.T) { err = m.WriteFile(ctx, op) require.NoError(t, err) - ss := ex.GetSpans() + ss := s.globalExporter.GetSpans() require.Len(t, ss, len(tt.spans)) for i, spanName := range tt.spans { assert.Equal(t, spanName, ss[i].Name) @@ -898,7 +910,7 @@ func TestTraceWriteFile(t *testing.T) { } } -func TestTraceSyncFile(t *testing.T) { +func (s *TracingTestSuite) TestTraceSyncFile() { ctx := context.Background() testCases := []struct { name string @@ -909,10 +921,10 @@ func TestTraceSyncFile(t *testing.T) { {"disabled", false, []string{"LookUpInode", "SyncFile"}}, } for _, tt := range testCases { - t.Run(tt.name, func(t *testing.T) { - ex := newInMemoryExporter(t) + s.Run(tt.name, func() { + t := s.T() bucket, server := createTestFileSystemWithTraces(ctx, t, tt.ignoreInterrupts) - m := wrappers.WithTracing(server) + m := wrappers.WithTracing(server, tracing.NewOTELTracer()) ctx := context.Background() fileName := "test.txt" content := "test content" @@ -930,7 +942,7 @@ func TestTraceSyncFile(t *testing.T) { err = m.SyncFile(ctx, op) require.NoError(t, err) - ss := ex.GetSpans() + ss := s.globalExporter.GetSpans() require.Len(t, ss, len(tt.spans)) for i, spanName := range tt.spans { assert.Equal(t, spanName, ss[i].Name) @@ -940,7 +952,7 @@ func TestTraceSyncFile(t *testing.T) { } } -func TestTraceFlushFile(t *testing.T) { +func (s *TracingTestSuite) TestTraceFlushFile() { ctx := context.Background() testCases := []struct { name string @@ -951,10 +963,10 @@ func TestTraceFlushFile(t *testing.T) { {"disabled", false, []string{"LookUpInode", "OpenFile", "FlushFile"}}, } for _, tt := range testCases { - t.Run(tt.name, func(t *testing.T) { - ex := newInMemoryExporter(t) + s.Run(tt.name, func() { + t := s.T() bucket, server := createTestFileSystemWithTraces(ctx, t, tt.ignoreInterrupts) - m := wrappers.WithTracing(server) + m := wrappers.WithTracing(server, tracing.NewOTELTracer()) ctx := context.Background() fileName := "test.txt" content := "test content" @@ -978,7 +990,7 @@ func TestTraceFlushFile(t *testing.T) { err = m.FlushFile(ctx, op) require.NoError(t, err) - ss := ex.GetSpans() + ss := s.globalExporter.GetSpans() require.Len(t, ss, len(tt.spans)) for i, spanName := range tt.spans { assert.Equal(t, spanName, ss[i].Name) @@ -988,7 +1000,7 @@ func TestTraceFlushFile(t *testing.T) { } } -func TestTraceReleaseFileHandle(t *testing.T) { +func (s *TracingTestSuite) TestTraceReleaseFileHandle() { ctx := context.Background() testCases := []struct { name string @@ -999,10 +1011,10 @@ func TestTraceReleaseFileHandle(t *testing.T) { {"disabled", false, 
[]string{"LookUpInode", "OpenFile", "ReleaseFileHandle"}}, } for _, tt := range testCases { - t.Run(tt.name, func(t *testing.T) { - ex := newInMemoryExporter(t) + s.Run(tt.name, func() { + t := s.T() bucket, server := createTestFileSystemWithTraces(ctx, t, tt.ignoreInterrupts) - m := wrappers.WithTracing(server) + m := wrappers.WithTracing(server, tracing.NewOTELTracer()) ctx := context.Background() fileName := "test.txt" content := "test content" @@ -1025,7 +1037,7 @@ func TestTraceReleaseFileHandle(t *testing.T) { err = m.ReleaseFileHandle(ctx, op) require.NoError(t, err) - ss := ex.GetSpans() + ss := s.globalExporter.GetSpans() require.Len(t, ss, len(tt.spans)) for i, spanName := range tt.spans { assert.Equal(t, spanName, ss[i].Name) @@ -1035,7 +1047,7 @@ func TestTraceReleaseFileHandle(t *testing.T) { } } -func TestTraceReadSymlink(t *testing.T) { +func (s *TracingTestSuite) TestTraceReadSymlink() { ctx := context.Background() testCases := []struct { name string @@ -1046,10 +1058,10 @@ func TestTraceReadSymlink(t *testing.T) { {"disabled", false, []string{"CreateSymlink", "ReadSymlink"}}, } for _, tt := range testCases { - t.Run(tt.name, func(t *testing.T) { - ex := newInMemoryExporter(t) + s.Run(tt.name, func() { + t := s.T() _, server := createTestFileSystemWithTraces(ctx, t, tt.ignoreInterrupts) - m := wrappers.WithTracing(server) + m := wrappers.WithTracing(server, tracing.NewOTELTracer()) ctx := context.Background() symlinkName := "test" target := "target" @@ -1067,7 +1079,7 @@ func TestTraceReadSymlink(t *testing.T) { err = m.ReadSymlink(ctx, op) require.NoError(t, err) - ss := ex.GetSpans() + ss := s.globalExporter.GetSpans() require.Len(t, ss, len(tt.spans)) for i, spanName := range tt.spans { assert.Equal(t, spanName, ss[i].Name) @@ -1077,7 +1089,7 @@ func TestTraceReadSymlink(t *testing.T) { } } -func TestTraceRemoveXattr(t *testing.T) { +func (s *TracingTestSuite) TestTraceRemoveXattr() { ctx := context.Background() testCases := []struct { name string @@ -1088,10 +1100,10 @@ func TestTraceRemoveXattr(t *testing.T) { {"disabled", false, []string{"LookUpInode", "RemoveXattr"}}, } for _, tt := range testCases { - t.Run(tt.name, func(t *testing.T) { - ex := newInMemoryExporter(t) + s.Run(tt.name, func() { + t := s.T() bucket, server := createTestFileSystemWithTraces(ctx, t, tt.ignoreInterrupts) - m := wrappers.WithTracing(server) + m := wrappers.WithTracing(server, tracing.NewOTELTracer()) ctx := context.Background() fileName := "test" content := "test content" @@ -1110,7 +1122,7 @@ func TestTraceRemoveXattr(t *testing.T) { err = m.RemoveXattr(ctx, op) assert.Error(t, err) // The operation is not implemented, so we expect an error. 
- ss := ex.GetSpans() + ss := s.globalExporter.GetSpans() require.Len(t, ss, len(tt.spans)) for i, spanName := range tt.spans { assert.Equal(t, spanName, ss[i].Name) @@ -1120,7 +1132,7 @@ func TestTraceRemoveXattr(t *testing.T) { } } -func TestTraceGetXattr(t *testing.T) { +func (s *TracingTestSuite) TestTraceGetXattr() { ctx := context.Background() testCases := []struct { name string @@ -1131,10 +1143,10 @@ func TestTraceGetXattr(t *testing.T) { {"disabled", false, []string{"LookUpInode", "GetXattr"}}, } for _, tt := range testCases { - t.Run(tt.name, func(t *testing.T) { - ex := newInMemoryExporter(t) + s.Run(tt.name, func() { + t := s.T() bucket, server := createTestFileSystemWithTraces(ctx, t, tt.ignoreInterrupts) - m := wrappers.WithTracing(server) + m := wrappers.WithTracing(server, tracing.NewOTELTracer()) ctx := context.Background() fileName := "test" content := "test content" @@ -1153,7 +1165,7 @@ func TestTraceGetXattr(t *testing.T) { err = m.GetXattr(ctx, op) assert.NotNil(t, err) - ss := ex.GetSpans() + ss := s.globalExporter.GetSpans() require.Len(t, ss, len(tt.spans)) for i, spanName := range tt.spans { assert.Equal(t, spanName, ss[i].Name) @@ -1163,7 +1175,7 @@ func TestTraceGetXattr(t *testing.T) { } } -func TestTraceListXattr(t *testing.T) { +func (s *TracingTestSuite) TestTraceListXattr() { ctx := context.Background() testCases := []struct { name string @@ -1174,10 +1186,10 @@ func TestTraceListXattr(t *testing.T) { {"disabled", false, []string{"LookUpInode", "ListXattr"}}, } for _, tt := range testCases { - t.Run(tt.name, func(t *testing.T) { - ex := newInMemoryExporter(t) + s.Run(tt.name, func() { + t := s.T() bucket, server := createTestFileSystemWithTraces(ctx, t, tt.ignoreInterrupts) - m := wrappers.WithTracing(server) + m := wrappers.WithTracing(server, tracing.NewOTELTracer()) ctx := context.Background() fileName := "test.txt" content := "test content" @@ -1195,7 +1207,7 @@ func TestTraceListXattr(t *testing.T) { err = m.ListXattr(ctx, op) assert.NotNil(t, err) - ss := ex.GetSpans() + ss := s.globalExporter.GetSpans() require.Len(t, ss, len(tt.spans)) for i, spanName := range tt.spans { assert.Equal(t, spanName, ss[i].Name) @@ -1205,7 +1217,7 @@ func TestTraceListXattr(t *testing.T) { } } -func TestTraceSetXattr(t *testing.T) { +func (s *TracingTestSuite) TestTraceSetXattr() { ctx := context.Background() testCases := []struct { name string @@ -1216,10 +1228,10 @@ func TestTraceSetXattr(t *testing.T) { {"disabled", false, []string{"LookUpInode", "SetXattr"}}, } for _, tt := range testCases { - t.Run(tt.name, func(t *testing.T) { - ex := newInMemoryExporter(t) + s.Run(tt.name, func() { + t := s.T() bucket, server := createTestFileSystemWithTraces(ctx, t, tt.ignoreInterrupts) - m := wrappers.WithTracing(server) + m := wrappers.WithTracing(server, tracing.NewOTELTracer()) ctx := context.Background() fileName := "test.txt" content := "test content" @@ -1239,7 +1251,7 @@ func TestTraceSetXattr(t *testing.T) { err = m.SetXattr(ctx, op) assert.NotNil(t, err) - ss := ex.GetSpans() + ss := s.globalExporter.GetSpans() require.Len(t, ss, len(tt.spans)) for i, spanName := range tt.spans { assert.Equal(t, spanName, ss[i].Name) @@ -1249,7 +1261,7 @@ func TestTraceSetXattr(t *testing.T) { } } -func TestTraceFallocate(t *testing.T) { +func (s *TracingTestSuite) TestTraceFallocate() { ctx := context.Background() testCases := []struct { name string @@ -1260,10 +1272,10 @@ func TestTraceFallocate(t *testing.T) { {"disabled", false, []string{"LookUpInode", "OpenFile", "Fallocate"}}, } 
for _, tt := range testCases { - t.Run(tt.name, func(t *testing.T) { - ex := newInMemoryExporter(t) + s.Run(tt.name, func() { + t := s.T() bucket, server := createTestFileSystemWithTraces(ctx, t, tt.ignoreInterrupts) - m := wrappers.WithTracing(server) + m := wrappers.WithTracing(server, tracing.NewOTELTracer()) ctx := context.Background() fileName := "test.txt" content := "test content" @@ -1290,7 +1302,7 @@ func TestTraceFallocate(t *testing.T) { err = m.Fallocate(ctx, op) assert.Error(t, err) // The operation is not implemented, so we expect an error. - ss := ex.GetSpans() + ss := s.globalExporter.GetSpans() require.Len(t, ss, len(tt.spans)) for i, spanName := range tt.spans { assert.Equal(t, spanName, ss[i].Name) @@ -1300,7 +1312,7 @@ func TestTraceFallocate(t *testing.T) { } } -func TestTraceSyncFS(t *testing.T) { +func (s *TracingTestSuite) TestTraceSyncFS() { ctx := context.Background() testCases := []struct { name string @@ -1311,10 +1323,10 @@ func TestTraceSyncFS(t *testing.T) { {"disabled", false, []string{"LookUpInode", "SyncFS"}}, } for _, tt := range testCases { - t.Run(tt.name, func(t *testing.T) { - ex := newInMemoryExporter(t) + s.Run(tt.name, func() { + t := s.T() bucket, server := createTestFileSystemWithTraces(ctx, t, tt.ignoreInterrupts) - m := wrappers.WithTracing(server) + m := wrappers.WithTracing(server, tracing.NewOTELTracer()) ctx := context.Background() fileName := "test.txt" content := "test content" @@ -1332,7 +1344,7 @@ func TestTraceSyncFS(t *testing.T) { err = m.SyncFS(ctx, op) assert.Error(t, err) // The operation is not implemented, so we expect an error. - ss := ex.GetSpans() + ss := s.globalExporter.GetSpans() require.Len(t, ss, len(tt.spans)) for i, spanName := range tt.spans { assert.Equal(t, spanName, ss[i].Name) @@ -1341,3 +1353,7 @@ func TestTraceSyncFS(t *testing.T) { }) } } + +func TestTracingTestSuite(t *testing.T) { + suite.Run(t, new(TracingTestSuite)) +} diff --git a/internal/fs/wrappers/tracing.go b/internal/fs/wrappers/tracing.go index 5ef16af61c..1cb096072a 100644 --- a/internal/fs/wrappers/tracing.go +++ b/internal/fs/wrappers/tracing.go @@ -19,162 +19,160 @@ import ( "github.com/jacobsa/fuse/fuseops" "github.com/jacobsa/fuse/fuseutil" - "go.opentelemetry.io/otel" - "go.opentelemetry.io/otel/codes" - "go.opentelemetry.io/otel/trace" + + tracing "github.com/googlecloudplatform/gcsfuse/v3/tracing" ) -type tracing struct { - wrapped fuseutil.FileSystem - tracer trace.Tracer +type tracedFS struct { + wrapped fuseutil.FileSystem + traceHandle tracing.TraceHandle } // WithTracing wraps a FileSystem and starts a root span for every operation it serves. -func WithTracing(wrapped fuseutil.FileSystem) fuseutil.FileSystem { - return &tracing{ - wrapped: wrapped, - tracer: otel.Tracer(name), +func WithTracing(wrapped fuseutil.FileSystem, traceHandle tracing.TraceHandle) fuseutil.FileSystem { + return &tracedFS{ + wrapped: wrapped, + traceHandle: traceHandle, } } -func (fs *tracing) Destroy() { +func (fs *tracedFS) Destroy() { fs.wrapped.Destroy() } -func (fs *tracing) invokeWrapped(ctx context.Context, opName string, w wrappedCall) error { +func (fs *tracedFS) invokeWrapped(ctx context.Context, opName string, w wrappedCall) error { // The span's SpanKind is set to trace.SpanKindServer because GCSFuse acts as the server for requests sent by the kernel.
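// In OpenTelemetry terms, a server span marks the entry point of a request into this process; spans started further down the stack with the returned ctx (for example, around GCS calls) become its children, so each kernel request yields one coherent trace.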
- ctx, span := fs.tracer.Start(ctx, opName, trace.WithSpanKind(trace.SpanKindServer)) - defer span.End() + ctx, span := fs.traceHandle.StartServerSpan(ctx, opName) + defer fs.traceHandle.EndSpan(span) err := w(ctx) if err != nil { - span.RecordError(err) - span.SetStatus(codes.Error, err.Error()) + fs.traceHandle.RecordError(span, err) } return err } -func (fs *tracing) StatFS(ctx context.Context, op *fuseops.StatFSOp) error { - return fs.invokeWrapped(ctx, "StatFS", func(ctx context.Context) error { return fs.wrapped.StatFS(ctx, op) }) +func (fs *tracedFS) StatFS(ctx context.Context, op *fuseops.StatFSOp) error { + return fs.invokeWrapped(ctx, tracing.StatFS, func(ctx context.Context) error { return fs.wrapped.StatFS(ctx, op) }) } -func (fs *tracing) LookUpInode(ctx context.Context, op *fuseops.LookUpInodeOp) error { - return fs.invokeWrapped(ctx, "LookUpInode", func(ctx context.Context) error { return fs.wrapped.LookUpInode(ctx, op) }) +func (fs *tracedFS) LookUpInode(ctx context.Context, op *fuseops.LookUpInodeOp) error { + return fs.invokeWrapped(ctx, tracing.LookUpInode, func(ctx context.Context) error { return fs.wrapped.LookUpInode(ctx, op) }) } -func (fs *tracing) GetInodeAttributes(ctx context.Context, op *fuseops.GetInodeAttributesOp) error { - return fs.invokeWrapped(ctx, "GetInodeAttributes", func(ctx context.Context) error { return fs.wrapped.GetInodeAttributes(ctx, op) }) +func (fs *tracedFS) GetInodeAttributes(ctx context.Context, op *fuseops.GetInodeAttributesOp) error { + return fs.invokeWrapped(ctx, tracing.GetInodeAttributes, func(ctx context.Context) error { return fs.wrapped.GetInodeAttributes(ctx, op) }) } -func (fs *tracing) SetInodeAttributes(ctx context.Context, op *fuseops.SetInodeAttributesOp) error { - return fs.invokeWrapped(ctx, "SetInodeAttributes", func(ctx context.Context) error { return fs.wrapped.SetInodeAttributes(ctx, op) }) +func (fs *tracedFS) SetInodeAttributes(ctx context.Context, op *fuseops.SetInodeAttributesOp) error { + return fs.invokeWrapped(ctx, tracing.SetInodeAttributes, func(ctx context.Context) error { return fs.wrapped.SetInodeAttributes(ctx, op) }) } -func (fs *tracing) ForgetInode(ctx context.Context, op *fuseops.ForgetInodeOp) error { - return fs.invokeWrapped(ctx, "ForgetInode", func(ctx context.Context) error { return fs.wrapped.ForgetInode(ctx, op) }) +func (fs *tracedFS) ForgetInode(ctx context.Context, op *fuseops.ForgetInodeOp) error { + return fs.invokeWrapped(ctx, tracing.ForgetInode, func(ctx context.Context) error { return fs.wrapped.ForgetInode(ctx, op) }) } -func (fs *tracing) BatchForget(ctx context.Context, op *fuseops.BatchForgetOp) error { - return fs.invokeWrapped(ctx, "BatchForget", func(ctx context.Context) error { return fs.wrapped.BatchForget(ctx, op) }) +func (fs *tracedFS) BatchForget(ctx context.Context, op *fuseops.BatchForgetOp) error { + return fs.invokeWrapped(ctx, tracing.BatchForget, func(ctx context.Context) error { return fs.wrapped.BatchForget(ctx, op) }) } -func (fs *tracing) MkDir(ctx context.Context, op *fuseops.MkDirOp) error { - return fs.invokeWrapped(ctx, "MkDir", func(ctx context.Context) error { return fs.wrapped.MkDir(ctx, op) }) +func (fs *tracedFS) MkDir(ctx context.Context, op *fuseops.MkDirOp) error { + return fs.invokeWrapped(ctx, tracing.MkDir, func(ctx context.Context) error { return fs.wrapped.MkDir(ctx, op) }) } -func (fs *tracing) MkNode(ctx context.Context, op *fuseops.MkNodeOp) error { - return fs.invokeWrapped(ctx, "MkNode", func(ctx context.Context) error { return 
fs.wrapped.MkNode(ctx, op) }) +func (fs *tracedFS) MkNode(ctx context.Context, op *fuseops.MkNodeOp) error { + return fs.invokeWrapped(ctx, tracing.MkNode, func(ctx context.Context) error { return fs.wrapped.MkNode(ctx, op) }) } -func (fs *tracing) CreateFile(ctx context.Context, op *fuseops.CreateFileOp) error { - return fs.invokeWrapped(ctx, "CreateFile", func(ctx context.Context) error { return fs.wrapped.CreateFile(ctx, op) }) +func (fs *tracedFS) CreateFile(ctx context.Context, op *fuseops.CreateFileOp) error { + return fs.invokeWrapped(ctx, tracing.CreateFile, func(ctx context.Context) error { return fs.wrapped.CreateFile(ctx, op) }) } -func (fs *tracing) CreateLink(ctx context.Context, op *fuseops.CreateLinkOp) error { - return fs.invokeWrapped(ctx, "CreateLink", func(ctx context.Context) error { return fs.wrapped.CreateLink(ctx, op) }) +func (fs *tracedFS) CreateLink(ctx context.Context, op *fuseops.CreateLinkOp) error { + return fs.invokeWrapped(ctx, tracing.CreateLink, func(ctx context.Context) error { return fs.wrapped.CreateLink(ctx, op) }) } -func (fs *tracing) CreateSymlink(ctx context.Context, op *fuseops.CreateSymlinkOp) error { - return fs.invokeWrapped(ctx, "CreateSymlink", func(ctx context.Context) error { return fs.wrapped.CreateSymlink(ctx, op) }) +func (fs *tracedFS) CreateSymlink(ctx context.Context, op *fuseops.CreateSymlinkOp) error { + return fs.invokeWrapped(ctx, tracing.CreateSymlink, func(ctx context.Context) error { return fs.wrapped.CreateSymlink(ctx, op) }) } -func (fs *tracing) Rename(ctx context.Context, op *fuseops.RenameOp) error { - return fs.invokeWrapped(ctx, "Rename", func(ctx context.Context) error { return fs.wrapped.Rename(ctx, op) }) +func (fs *tracedFS) Rename(ctx context.Context, op *fuseops.RenameOp) error { + return fs.invokeWrapped(ctx, tracing.Rename, func(ctx context.Context) error { return fs.wrapped.Rename(ctx, op) }) } -func (fs *tracing) RmDir(ctx context.Context, op *fuseops.RmDirOp) error { - return fs.invokeWrapped(ctx, "RmDir", func(ctx context.Context) error { return fs.wrapped.RmDir(ctx, op) }) +func (fs *tracedFS) RmDir(ctx context.Context, op *fuseops.RmDirOp) error { + return fs.invokeWrapped(ctx, tracing.RmDir, func(ctx context.Context) error { return fs.wrapped.RmDir(ctx, op) }) } -func (fs *tracing) Unlink(ctx context.Context, op *fuseops.UnlinkOp) error { - return fs.invokeWrapped(ctx, "Unlink", func(ctx context.Context) error { return fs.wrapped.Unlink(ctx, op) }) +func (fs *tracedFS) Unlink(ctx context.Context, op *fuseops.UnlinkOp) error { + return fs.invokeWrapped(ctx, tracing.Unlink, func(ctx context.Context) error { return fs.wrapped.Unlink(ctx, op) }) } -func (fs *tracing) OpenDir(ctx context.Context, op *fuseops.OpenDirOp) error { - return fs.invokeWrapped(ctx, "OpenDir", func(ctx context.Context) error { return fs.wrapped.OpenDir(ctx, op) }) +func (fs *tracedFS) OpenDir(ctx context.Context, op *fuseops.OpenDirOp) error { + return fs.invokeWrapped(ctx, tracing.OpenDir, func(ctx context.Context) error { return fs.wrapped.OpenDir(ctx, op) }) } -func (fs *tracing) ReadDir(ctx context.Context, op *fuseops.ReadDirOp) error { - return fs.invokeWrapped(ctx, "ReadDir", func(ctx context.Context) error { return fs.wrapped.ReadDir(ctx, op) }) +func (fs *tracedFS) ReadDir(ctx context.Context, op *fuseops.ReadDirOp) error { + return fs.invokeWrapped(ctx, tracing.ReadDir, func(ctx context.Context) error { return fs.wrapped.ReadDir(ctx, op) }) } -func (fs *tracing) ReadDirPlus(ctx context.Context, op *fuseops.ReadDirPlusOp) error 
{ - return fs.invokeWrapped(ctx, "ReadDirPlus", func(ctx context.Context) error { return fs.wrapped.ReadDirPlus(ctx, op) }) +func (fs *tracedFS) ReadDirPlus(ctx context.Context, op *fuseops.ReadDirPlusOp) error { + return fs.invokeWrapped(ctx, tracing.ReadDirPlus, func(ctx context.Context) error { return fs.wrapped.ReadDirPlus(ctx, op) }) } -func (fs *tracing) ReleaseDirHandle(ctx context.Context, op *fuseops.ReleaseDirHandleOp) error { - return fs.invokeWrapped(ctx, "ReleaseDirHandle", func(ctx context.Context) error { return fs.wrapped.ReleaseDirHandle(ctx, op) }) +func (fs *tracedFS) ReleaseDirHandle(ctx context.Context, op *fuseops.ReleaseDirHandleOp) error { + return fs.invokeWrapped(ctx, tracing.ReleaseDirHandle, func(ctx context.Context) error { return fs.wrapped.ReleaseDirHandle(ctx, op) }) } -func (fs *tracing) OpenFile(ctx context.Context, op *fuseops.OpenFileOp) error { - return fs.invokeWrapped(ctx, "OpenFile", func(ctx context.Context) error { return fs.wrapped.OpenFile(ctx, op) }) +func (fs *tracedFS) OpenFile(ctx context.Context, op *fuseops.OpenFileOp) error { + return fs.invokeWrapped(ctx, tracing.OpenFile, func(ctx context.Context) error { return fs.wrapped.OpenFile(ctx, op) }) } -func (fs *tracing) ReadFile(ctx context.Context, op *fuseops.ReadFileOp) error { - return fs.invokeWrapped(ctx, "ReadFile", func(ctx context.Context) error { return fs.wrapped.ReadFile(ctx, op) }) +func (fs *tracedFS) ReadFile(ctx context.Context, op *fuseops.ReadFileOp) error { + return fs.invokeWrapped(ctx, tracing.ReadFile, func(ctx context.Context) error { return fs.wrapped.ReadFile(ctx, op) }) } -func (fs *tracing) WriteFile(ctx context.Context, op *fuseops.WriteFileOp) error { - return fs.invokeWrapped(ctx, "WriteFile", func(ctx context.Context) error { return fs.wrapped.WriteFile(ctx, op) }) +func (fs *tracedFS) WriteFile(ctx context.Context, op *fuseops.WriteFileOp) error { + return fs.invokeWrapped(ctx, tracing.WriteFile, func(ctx context.Context) error { return fs.wrapped.WriteFile(ctx, op) }) } -func (fs *tracing) SyncFile(ctx context.Context, op *fuseops.SyncFileOp) error { - return fs.invokeWrapped(ctx, "SyncFile", func(ctx context.Context) error { return fs.wrapped.SyncFile(ctx, op) }) +func (fs *tracedFS) SyncFile(ctx context.Context, op *fuseops.SyncFileOp) error { + return fs.invokeWrapped(ctx, tracing.SyncFile, func(ctx context.Context) error { return fs.wrapped.SyncFile(ctx, op) }) } -func (fs *tracing) FlushFile(ctx context.Context, op *fuseops.FlushFileOp) error { - return fs.invokeWrapped(ctx, "FlushFile", func(ctx context.Context) error { return fs.wrapped.FlushFile(ctx, op) }) +func (fs *tracedFS) FlushFile(ctx context.Context, op *fuseops.FlushFileOp) error { + return fs.invokeWrapped(ctx, tracing.FlushFile, func(ctx context.Context) error { return fs.wrapped.FlushFile(ctx, op) }) } -func (fs *tracing) ReleaseFileHandle(ctx context.Context, op *fuseops.ReleaseFileHandleOp) error { - return fs.invokeWrapped(ctx, "ReleaseFileHandle", func(ctx context.Context) error { return fs.wrapped.ReleaseFileHandle(ctx, op) }) +func (fs *tracedFS) ReleaseFileHandle(ctx context.Context, op *fuseops.ReleaseFileHandleOp) error { + return fs.invokeWrapped(ctx, tracing.ReleaseFileHandle, func(ctx context.Context) error { return fs.wrapped.ReleaseFileHandle(ctx, op) }) } -func (fs *tracing) ReadSymlink(ctx context.Context, op *fuseops.ReadSymlinkOp) error { - return fs.invokeWrapped(ctx, "ReadSymlink", func(ctx context.Context) error { return fs.wrapped.ReadSymlink(ctx, op) }) +func (fs 
*tracedFS) ReadSymlink(ctx context.Context, op *fuseops.ReadSymlinkOp) error { + return fs.invokeWrapped(ctx, tracing.ReadSymlink, func(ctx context.Context) error { return fs.wrapped.ReadSymlink(ctx, op) }) } -func (fs *tracing) RemoveXattr(ctx context.Context, op *fuseops.RemoveXattrOp) error { - return fs.invokeWrapped(ctx, "RemoveXattr", func(ctx context.Context) error { return fs.wrapped.RemoveXattr(ctx, op) }) +func (fs *tracedFS) RemoveXattr(ctx context.Context, op *fuseops.RemoveXattrOp) error { + return fs.invokeWrapped(ctx, tracing.RemoveXattr, func(ctx context.Context) error { return fs.wrapped.RemoveXattr(ctx, op) }) } -func (fs *tracing) GetXattr(ctx context.Context, op *fuseops.GetXattrOp) error { - return fs.invokeWrapped(ctx, "GetXattr", func(ctx context.Context) error { return fs.wrapped.GetXattr(ctx, op) }) +func (fs *tracedFS) GetXattr(ctx context.Context, op *fuseops.GetXattrOp) error { + return fs.invokeWrapped(ctx, tracing.GetXattr, func(ctx context.Context) error { return fs.wrapped.GetXattr(ctx, op) }) } -func (fs *tracing) ListXattr(ctx context.Context, op *fuseops.ListXattrOp) error { - return fs.invokeWrapped(ctx, "ListXattr", func(ctx context.Context) error { return fs.wrapped.ListXattr(ctx, op) }) +func (fs *tracedFS) ListXattr(ctx context.Context, op *fuseops.ListXattrOp) error { + return fs.invokeWrapped(ctx, tracing.ListXattr, func(ctx context.Context) error { return fs.wrapped.ListXattr(ctx, op) }) } -func (fs *tracing) SetXattr(ctx context.Context, op *fuseops.SetXattrOp) error { - return fs.invokeWrapped(ctx, "SetXattr", func(ctx context.Context) error { return fs.wrapped.SetXattr(ctx, op) }) +func (fs *tracedFS) SetXattr(ctx context.Context, op *fuseops.SetXattrOp) error { + return fs.invokeWrapped(ctx, tracing.SetXattr, func(ctx context.Context) error { return fs.wrapped.SetXattr(ctx, op) }) } -func (fs *tracing) Fallocate(ctx context.Context, op *fuseops.FallocateOp) error { - return fs.invokeWrapped(ctx, "Fallocate", func(ctx context.Context) error { return fs.wrapped.Fallocate(ctx, op) }) +func (fs *tracedFS) Fallocate(ctx context.Context, op *fuseops.FallocateOp) error { + return fs.invokeWrapped(ctx, tracing.Fallocate, func(ctx context.Context) error { return fs.wrapped.Fallocate(ctx, op) }) } -func (fs *tracing) SyncFS(ctx context.Context, op *fuseops.SyncFSOp) error { - return fs.invokeWrapped(ctx, "SyncFS", func(ctx context.Context) error { return fs.wrapped.SyncFS(ctx, op) }) +func (fs *tracedFS) SyncFS(ctx context.Context, op *fuseops.SyncFSOp) error { + return fs.invokeWrapped(ctx, tracing.SyncFS, func(ctx context.Context) error { return fs.wrapped.SyncFS(ctx, op) }) } diff --git a/internal/fs/wrappers/tracing_test.go b/internal/fs/wrappers/tracing_test.go index b621111032..94693261a1 100644 --- a/internal/fs/wrappers/tracing_test.go +++ b/internal/fs/wrappers/tracing_test.go @@ -25,6 +25,8 @@ import ( sdktrace "go.opentelemetry.io/otel/sdk/trace" "go.opentelemetry.io/otel/sdk/trace/tracetest" "go.opentelemetry.io/otel/trace" + + tracing "github.com/googlecloudplatform/gcsfuse/v3/tracing" ) func newInMemoryExporter(t *testing.T) *tracetest.InMemoryExporter { @@ -170,9 +172,9 @@ func TestSpanCreation(t *testing.T) { t.Cleanup(func() { ex.Reset() }) - m := tracing{ - wrapped: dummyFS{}, - tracer: otel.Tracer("test"), + m := tracedFS{ + wrapped: dummyFS{}, + traceHandle: tracing.NewOTELTracer(), } err := m.StatFS(context.Background(), nil) diff --git a/internal/gcsx/file_cache_reader.go b/internal/gcsx/file_cache_reader.go index 
2df5f17940..edc4058f0d 100644 --- a/internal/gcsx/file_cache_reader.go +++ b/internal/gcsx/file_cache_reader.go @@ -29,6 +29,7 @@ import ( "github.com/googlecloudplatform/gcsfuse/v3/internal/logger" "github.com/googlecloudplatform/gcsfuse/v3/internal/storage/gcs" "github.com/googlecloudplatform/gcsfuse/v3/metrics" + "github.com/googlecloudplatform/gcsfuse/v3/tracing" "github.com/jacobsa/fuse/fuseops" ) @@ -66,16 +67,19 @@ type FileCacheReader struct { metricHandle metrics.MetricHandle + traceHandle tracing.TraceHandle + handleID fuseops.HandleID } -func NewFileCacheReader(o *gcs.MinObject, bucket gcs.Bucket, fileCacheHandler *file.CacheHandler, cacheFileForRangeRead bool, metricHandle metrics.MetricHandle, handleID fuseops.HandleID) *FileCacheReader { +func NewFileCacheReader(o *gcs.MinObject, bucket gcs.Bucket, fileCacheHandler *file.CacheHandler, cacheFileForRangeRead bool, metricHandle metrics.MetricHandle, traceHandle tracing.TraceHandle, handleID fuseops.HandleID) *FileCacheReader { return &FileCacheReader{ object: o, bucket: bucket, fileCacheHandler: fileCacheHandler, cacheFileForRangeRead: cacheFileForRangeRead, metricHandle: metricHandle, + traceHandle: traceHandle, handleID: handleID, } } diff --git a/internal/gcsx/file_cache_reader_test.go b/internal/gcsx/file_cache_reader_test.go index ae2c97dc18..6960616385 100644 --- a/internal/gcsx/file_cache_reader_test.go +++ b/internal/gcsx/file_cache_reader_test.go @@ -36,6 +36,7 @@ import ( "github.com/googlecloudplatform/gcsfuse/v3/internal/storage/gcs" testutil "github.com/googlecloudplatform/gcsfuse/v3/internal/util" "github.com/googlecloudplatform/gcsfuse/v3/metrics" + "github.com/googlecloudplatform/gcsfuse/v3/tracing" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" @@ -92,10 +93,10 @@ func (t *fileCacheReaderTest) SetupTest() { t.cacheDir = path.Join(os.Getenv("HOME"), "test_cache_dir") lruCache := lru.NewCache(cacheMaxSize) fileCacheConfig := &cfg.FileCacheConfig{EnableCrc: false} - t.jobManager = downloader.NewJobManager(lruCache, util.DefaultFilePerm, util.DefaultDirPerm, t.cacheDir, sequentialReadSizeInMb, fileCacheConfig, metrics.NewNoopMetrics()) + t.jobManager = downloader.NewJobManager(lruCache, util.DefaultFilePerm, util.DefaultDirPerm, t.cacheDir, sequentialReadSizeInMb, fileCacheConfig, metrics.NewNoopMetrics(), tracing.NewNoopTracer()) t.cacheHandler = file.NewCacheHandler(lruCache, t.jobManager, t.cacheDir, util.DefaultFilePerm, util.DefaultDirPerm, "", "", false) - t.reader = NewFileCacheReader(t.object, t.mockBucket, t.cacheHandler, true, metrics.NewNoopMetrics(), 0) - t.reader_unfinalized_object = NewFileCacheReader(t.unfinalized_object, t.mockBucket, t.cacheHandler, true, metrics.NewNoopMetrics(), 0) + t.reader = NewFileCacheReader(t.object, t.mockBucket, t.cacheHandler, true, metrics.NewNoopMetrics(), tracing.NewNoopTracer(), 0) + t.reader_unfinalized_object = NewFileCacheReader(t.unfinalized_object, t.mockBucket, t.cacheHandler, true, metrics.NewNoopMetrics(), tracing.NewNoopTracer(), 0) t.ctx = context.Background() } @@ -120,19 +121,20 @@ func getReadCloser(content []byte) io.ReadCloser { } func (t *fileCacheReaderTest) TestNewFileCacheReader() { - reader := NewFileCacheReader(t.object, t.mockBucket, t.cacheHandler, true, nil, 0) + reader := NewFileCacheReader(t.object, t.mockBucket, t.cacheHandler, true, metrics.NewNoopMetrics(), tracing.NewNoopTracer(), 0) assert.NotNil(t.T(), reader) assert.Equal(t.T(), t.object, reader.object) assert.Equal(t.T(), 
t.mockBucket, reader.bucket) assert.Equal(t.T(), t.cacheHandler, reader.fileCacheHandler) assert.True(t.T(), reader.cacheFileForRangeRead) - assert.Nil(t.T(), reader.metricHandle) + assert.NotNil(t.T(), reader.metricHandle) + assert.NotNil(t.T(), reader.traceHandle) assert.Nil(t.T(), reader.fileCacheHandle) } func (t *fileCacheReaderTest) Test_ReadAt_NilFileCacheHandlerThrowFallBackError() { - reader := NewFileCacheReader(t.object, t.mockBucket, nil, true, nil, 0) + reader := NewFileCacheReader(t.object, t.mockBucket, nil, true, metrics.NewNoopMetrics(), tracing.NewNoopTracer(), 0) readResponse, err := reader.ReadAt(t.ctx, &ReadRequest{ Buffer: make([]byte, 10), diff --git a/internal/gcsx/random_reader.go b/internal/gcsx/random_reader.go index 09d7840d98..179a345e2d 100644 --- a/internal/gcsx/random_reader.go +++ b/internal/gcsx/random_reader.go @@ -32,6 +32,7 @@ import ( "github.com/googlecloudplatform/gcsfuse/v3/internal/logger" "github.com/googlecloudplatform/gcsfuse/v3/internal/storage/gcs" "github.com/googlecloudplatform/gcsfuse/v3/metrics" + "github.com/googlecloudplatform/gcsfuse/v3/tracing" "github.com/jacobsa/fuse/fuseops" "golang.org/x/net/context" ) @@ -107,7 +108,7 @@ const ( // NewRandomReader creates a random reader for the supplied object record that // reads using the given bucket. -func NewRandomReader(o *gcs.MinObject, bucket gcs.Bucket, sequentialReadSizeMb int32, fileCacheHandler *file.CacheHandler, cacheFileForRangeRead bool, metricHandle metrics.MetricHandle, mrdWrapper *MultiRangeDownloaderWrapper, config *cfg.Config, handleID fuseops.HandleID) RandomReader { +func NewRandomReader(o *gcs.MinObject, bucket gcs.Bucket, sequentialReadSizeMb int32, fileCacheHandler *file.CacheHandler, cacheFileForRangeRead bool, metricHandle metrics.MetricHandle, traceHandle tracing.TraceHandle, mrdWrapper *MultiRangeDownloaderWrapper, config *cfg.Config, handleID fuseops.HandleID) RandomReader { return &randomReader{ object: o, bucket: bucket, @@ -118,6 +119,7 @@ func NewRandomReader(o *gcs.MinObject, bucket gcs.Bucket, sequentialReadSizeMb i cacheFileForRangeRead: cacheFileForRangeRead, mrdWrapper: mrdWrapper, metricHandle: metricHandle, + traceHandle: traceHandle, config: config, handleID: handleID, } @@ -178,6 +180,8 @@ type randomReader struct { metricHandle metrics.MetricHandle + traceHandle tracing.TraceHandle + config *cfg.Config // Specifies the next expected offset for the reads.
Used to distinguish between diff --git a/internal/gcsx/random_reader_stretchr_test.go b/internal/gcsx/random_reader_stretchr_test.go index 1009539027..caf2138c68 100644 --- a/internal/gcsx/random_reader_stretchr_test.go +++ b/internal/gcsx/random_reader_stretchr_test.go @@ -35,6 +35,7 @@ import ( "github.com/googlecloudplatform/gcsfuse/v3/internal/storage/gcs" testutil "github.com/googlecloudplatform/gcsfuse/v3/internal/util" "github.com/googlecloudplatform/gcsfuse/v3/metrics" + "github.com/googlecloudplatform/gcsfuse/v3/tracing" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" @@ -75,11 +76,11 @@ func (t *RandomReaderStretchrTest) SetupTest() { fileCacheConfig := &cfg.FileCacheConfig{ EnableCrc: false, } - t.jobManager = downloader.NewJobManager(lruCache, util.DefaultFilePerm, util.DefaultDirPerm, t.cacheDir, sequentialReadSizeInMb, fileCacheConfig, nil) + t.jobManager = downloader.NewJobManager(lruCache, util.DefaultFilePerm, util.DefaultDirPerm, t.cacheDir, sequentialReadSizeInMb, fileCacheConfig, metrics.NewNoopMetrics(), tracing.NewNoopTracer()) t.cacheHandler = file.NewCacheHandler(lruCache, t.jobManager, t.cacheDir, util.DefaultFilePerm, util.DefaultDirPerm, "", "", false) // Set up the reader. - rr := NewRandomReader(t.object, t.mockBucket, sequentialReadSizeInMb, nil, false, metrics.NewNoopMetrics(), nil, nil, 0) + rr := NewRandomReader(t.object, t.mockBucket, sequentialReadSizeInMb, nil, false, metrics.NewNoopMetrics(), tracing.NewNoopTracer(), nil, nil, 0) t.rr.wrapped = rr.(*randomReader) } diff --git a/internal/gcsx/random_reader_test.go b/internal/gcsx/random_reader_test.go index 6c61889b59..aadb2db181 100644 --- a/internal/gcsx/random_reader_test.go +++ b/internal/gcsx/random_reader_test.go @@ -37,6 +37,7 @@ import ( "github.com/googlecloudplatform/gcsfuse/v3/internal/storage/gcs" testutil "github.com/googlecloudplatform/gcsfuse/v3/internal/util" "github.com/googlecloudplatform/gcsfuse/v3/metrics" + "github.com/googlecloudplatform/gcsfuse/v3/tracing" "github.com/jacobsa/fuse/fuseops" . "github.com/jacobsa/oglematchers" . "github.com/jacobsa/oglemock" @@ -182,11 +183,11 @@ func (t *RandomReaderTest) SetUp(ti *TestInfo) { fileCacheConfig := &cfg.FileCacheConfig{ EnableCrc: false, } - t.jobManager = downloader.NewJobManager(lruCache, util.DefaultFilePerm, util.DefaultDirPerm, t.cacheDir, sequentialReadSizeInMb, fileCacheConfig, metrics.NewNoopMetrics()) + t.jobManager = downloader.NewJobManager(lruCache, util.DefaultFilePerm, util.DefaultDirPerm, t.cacheDir, sequentialReadSizeInMb, fileCacheConfig, metrics.NewNoopMetrics(), tracing.NewNoopTracer()) t.cacheHandler = file.NewCacheHandler(lruCache, t.jobManager, t.cacheDir, util.DefaultFilePerm, util.DefaultDirPerm, "", "", false) // Set up the reader. - rr := NewRandomReader(t.object, t.bucket, sequentialReadSizeInMb, nil, false, metrics.NewNoopMetrics(), nil, nil, 0) + rr := NewRandomReader(t.object, t.bucket, sequentialReadSizeInMb, nil, false, metrics.NewNoopMetrics(), tracing.NewNoopTracer(), nil, nil, 0) t.rr.wrapped = rr.(*randomReader) } @@ -548,7 +549,7 @@ func (t *RandomReaderTest) UpgradesSequentialReads_NoExistingReader() { t.object.Size = 1 << 40 const readSize = 1 * MiB // Set up the custom randomReader. 
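// NewRandomReader now requires a TraceHandle immediately after the MetricHandle; these tests pass tracing.NewNoopTracer() so span creation has no side effects.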
- rr := NewRandomReader(t.object, t.bucket, readSize/MiB, nil, false, metrics.NewNoopMetrics(), nil, nil, 0) + rr := NewRandomReader(t.object, t.bucket, readSize/MiB, nil, false, metrics.NewNoopMetrics(), tracing.NewNoopTracer(), nil, nil, 0) t.rr.wrapped = rr.(*randomReader) // Simulate a previous exhausted reader that ended at the offset from which diff --git a/internal/gcsx/read_manager/read_manager.go b/internal/gcsx/read_manager/read_manager.go index 2a72c8423f..1ce43590db 100644 --- a/internal/gcsx/read_manager/read_manager.go +++ b/internal/gcsx/read_manager/read_manager.go @@ -31,6 +31,7 @@ import ( "github.com/googlecloudplatform/gcsfuse/v3/internal/storage/gcs" "github.com/googlecloudplatform/gcsfuse/v3/internal/workerpool" "github.com/googlecloudplatform/gcsfuse/v3/metrics" + "github.com/googlecloudplatform/gcsfuse/v3/tracing" "golang.org/x/sync/semaphore" ) @@ -45,6 +46,8 @@ type ReadManager struct { // readTypeClassifier tracks the read access pattern (e.g., sequential, random) // across all readers for a file handle to optimize read strategies. readTypeClassifier *gcsx.ReadTypeClassifier + + traceHandle tracing.TraceHandle } // ReadManagerConfig holds the configuration parameters for creating a new ReadManager. @@ -53,6 +56,7 @@ type ReadManagerConfig struct { FileCacheHandler *file.CacheHandler CacheFileForRangeRead bool MetricHandle metrics.MetricHandle + TraceHandle tracing.TraceHandle MrdWrapper *gcsx.MultiRangeDownloaderWrapper Config *cfg.Config GlobalMaxBlocksSem *semaphore.Weighted @@ -75,6 +79,7 @@ func NewReadManager(object *gcs.MinObject, bucket gcs.Bucket, config *ReadManage config.FileCacheHandler, config.CacheFileForRangeRead, config.MetricHandle, + config.TraceHandle, config.HandleID, ) readers = append(readers, fileCacheReader) // File cache reader is prioritized. @@ -99,6 +104,7 @@ func NewReadManager(object *gcs.MinObject, bucket gcs.Bucket, config *ReadManage GlobalMaxBlocksSem: config.GlobalMaxBlocksSem, WorkerPool: config.WorkerPool, MetricHandle: config.MetricHandle, + TraceHandle: config.TraceHandle, ReadTypeClassifier: readClassifier, HandleID: config.HandleID, } @@ -128,6 +134,7 @@ func NewReadManager(object *gcs.MinObject, bucket gcs.Bucket, config *ReadManage object: object, readers: readers, // Readers are prioritized: file cache first, then GCS. 
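// The TraceHandle from ReadManagerConfig is retained on the ReadManager as well and passed through to the individual readers constructed above.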
readTypeClassifier: readClassifier, + traceHandle: config.TraceHandle, } } diff --git a/internal/gcsx/read_manager/read_manager_test.go b/internal/gcsx/read_manager/read_manager_test.go index c1627ba98e..a85ea97e11 100644 --- a/internal/gcsx/read_manager/read_manager_test.go +++ b/internal/gcsx/read_manager/read_manager_test.go @@ -40,6 +40,7 @@ import ( testUtil "github.com/googlecloudplatform/gcsfuse/v3/internal/util" "github.com/googlecloudplatform/gcsfuse/v3/internal/workerpool" "github.com/googlecloudplatform/gcsfuse/v3/metrics" + "github.com/googlecloudplatform/gcsfuse/v3/tracing" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" @@ -83,7 +84,7 @@ func (t *readManagerTest) readManagerConfig(fileCacheEnable bool, bufferedReadEn cacheDir := path.Join(os.Getenv("HOME"), "test_cache_dir") lruCache := lru.NewCache(cacheMaxSize) fileCacheConfig := &cfg.FileCacheConfig{EnableCrc: false} - jobManager := downloader.NewJobManager(lruCache, util.DefaultFilePerm, util.DefaultDirPerm, cacheDir, sequentialReadSizeInMb, fileCacheConfig, metrics.NewNoopMetrics()) + jobManager := downloader.NewJobManager(lruCache, util.DefaultFilePerm, util.DefaultDirPerm, cacheDir, sequentialReadSizeInMb, fileCacheConfig, metrics.NewNoopMetrics(), tracing.NewNoopTracer()) config.FileCacheHandler = file.NewCacheHandler(lruCache, jobManager, cacheDir, util.DefaultFilePerm, util.DefaultDirPerm, "", "", false) } else { config.FileCacheHandler = nil diff --git a/tracing/noop_tracer.go b/tracing/noop_tracer.go new file mode 100644 index 0000000000..4ffc0f16a8 --- /dev/null +++ b/tracing/noop_tracer.go @@ -0,0 +1,42 @@ +// Copyright 2026 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tracing + +import ( + "context" + + "go.opentelemetry.io/otel/trace" + "go.opentelemetry.io/otel/trace/noop" +) + +// noopTracer implements TraceHandle with methods that do nothing; it is used when tracing is disabled. +type noopTracer struct{} + +func (*noopTracer) StartSpan(ctx context.Context, traceName string) (context.Context, trace.Span) { + return ctx, noop.Span{} +} + +func (*noopTracer) StartServerSpan(ctx context.Context, traceName string) (context.Context, trace.Span) { + return ctx, noop.Span{} +} + +func (*noopTracer) EndSpan(span trace.Span) {} + +func (*noopTracer) RecordError(span trace.Span, err error) {} + +// NewNoopTracer returns a TraceHandle that discards all spans. +func NewNoopTracer() TraceHandle { + return new(noopTracer) +} diff --git a/tracing/otel_tracer.go b/tracing/otel_tracer.go new file mode 100644 index 0000000000..0222e07811 --- /dev/null +++ b/tracing/otel_tracer.go @@ -0,0 +1,48 @@ +// Copyright 2026 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and +// limitations under the License. + +package tracing + +import ( + "context" + + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" +) + +// otelTracer is a TraceHandle that records spans through the gcsfuse OpenTelemetry tracer. +type otelTracer struct{} + +func (o *otelTracer) StartSpan(ctx context.Context, traceName string) (context.Context, trace.Span) { + return GCSFuseTracer().Start(ctx, traceName) +} + +func (o *otelTracer) StartServerSpan(ctx context.Context, traceName string) (context.Context, trace.Span) { + return GCSFuseTracer().Start(ctx, traceName, trace.WithSpanKind(trace.SpanKindServer)) +} + +func (o *otelTracer) EndSpan(span trace.Span) { + span.End() +} + +func (o *otelTracer) RecordError(span trace.Span, err error) { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) +} + +// NewOTELTracer returns a TraceHandle backed by OpenTelemetry. +func NewOTELTracer() TraceHandle { + var o otelTracer + return &o +} diff --git a/tracing/span_names.go b/tracing/span_names.go new file mode 100644 index 0000000000..ed458673fc --- /dev/null +++ b/tracing/span_names.go @@ -0,0 +1,55 @@ +// Copyright 2026 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package tracing contains the constants and utilities for OpenTelemetry +// instrumentation within gcsfuse. +package tracing + +// Span name constants for GCSFuse operations. +// These constants define the canonical names for spans created during FUSE +// operations. Using them keeps span names consistent and readable, and provides a single source of truth for all span names. + +const ( + StatFS = "StatFS" + LookUpInode = "LookUpInode" + GetInodeAttributes = "GetInodeAttributes" + SetInodeAttributes = "SetInodeAttributes" + ForgetInode = "ForgetInode" + BatchForget = "BatchForget" + MkDir = "MkDir" + MkNode = "MkNode" + CreateFile = "CreateFile" + CreateLink = "CreateLink" + CreateSymlink = "CreateSymlink" + Rename = "Rename" + RmDir = "RmDir" + Unlink = "Unlink" + OpenDir = "OpenDir" + ReadDir = "ReadDir" + ReadDirPlus = "ReadDirPlus" + ReleaseDirHandle = "ReleaseDirHandle" + OpenFile = "OpenFile" + ReadFile = "ReadFile" + WriteFile = "WriteFile" + SyncFile = "SyncFile" + FlushFile = "FlushFile" + ReleaseFileHandle = "ReleaseFileHandle" + ReadSymlink = "ReadSymlink" + RemoveXattr = "RemoveXattr" + GetXattr = "GetXattr" + ListXattr = "ListXattr" + SetXattr = "SetXattr" + Fallocate = "Fallocate" + SyncFS = "SyncFS" +) diff --git a/tracing/trace_handle.go b/tracing/trace_handle.go new file mode 100644 index 0000000000..2df8a2d300 --- /dev/null +++ b/tracing/trace_handle.go @@ -0,0 +1,36 @@ +// Copyright 2026 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tracing + +import ( + "context" + + "go.opentelemetry.io/otel/trace" +) + +// TraceHandle is the interface through which gcsfuse creates and finishes trace spans. It decouples callers from the underlying tracer, so implementations can be swapped easily, e.g. an OpenTelemetry-backed tracer when tracing is enabled and a custom no-op tracer when it is not. +type TraceHandle interface { + // StartSpan starts a span with the given name on the given context. + StartSpan(ctx context.Context, traceName string) (context.Context, trace.Span) + + // StartServerSpan starts a span of kind trace.SpanKindServer with the given name on the given context. + StartServerSpan(ctx context.Context, traceName string) (context.Context, trace.Span) + + // EndSpan ends the given span. + EndSpan(span trace.Span) + + // RecordError records err on the span and sets the span status to error, so the failure is visible on export. + RecordError(span trace.Span, err error) +} diff --git a/tracing/util.go b/tracing/util.go new file mode 100644 index 0000000000..aea0f49ac9 --- /dev/null +++ b/tracing/util.go @@ -0,0 +1,41 @@ +// Copyright 2026 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tracing + +import ( + "context" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/trace" +) + +const name = "cloud.google.com/gcsfuse" + +var tracer = otel.Tracer(name) + +// GCSFuseTracer returns the process-wide OpenTelemetry tracer used by gcsfuse. +func GCSFuseTracer() trace.Tracer { + return tracer +} + +// MaybePropagateTraceContext carries the span and trace context from oldCtx over to newCtx when tracing is enabled, so that work continued on a detached context stays in the same trace. +func MaybePropagateTraceContext(newCtx context.Context, oldCtx context.Context, isTracingEnabled bool) context.Context { + if !isTracingEnabled { + return newCtx + } + + span := trace.SpanFromContext(oldCtx) + return trace.ContextWithSpan(newCtx, span) }
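For reviewers, a minimal sketch of how a component is expected to consume the new TraceHandle. The package, function, and helper names below (example, readThroughCache, doRead) are illustrative only and not part of this change; only the tracing package calls are real:

package example

import (
	"context"

	"github.com/googlecloudplatform/gcsfuse/v3/tracing"
)

// readThroughCache shows the intended pattern: start a span named after the
// operation, always end it, and record any error before returning.
func readThroughCache(ctx context.Context, th tracing.TraceHandle) (err error) {
	// With the OTEL tracer this opens a real span; with the no-op tracer the
	// call is free, so callers never need to check whether tracing is enabled.
	ctx, span := th.StartSpan(ctx, tracing.ReadFile)
	defer th.EndSpan(span)

	if err = doRead(ctx); err != nil {
		// Marks the span as failed so exporters surface the error.
		th.RecordError(span, err)
	}
	return err
}

// doRead stands in for the real read path.
func doRead(ctx context.Context) error { return nil }

Work continued on a detached context (for example, an asynchronous prefetch) can stay on the same trace via MaybePropagateTraceContext:

	bgCtx := tracing.MaybePropagateTraceContext(context.Background(), ctx, true)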