Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions cmd/legacy_main.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"time"

"github.com/googlecloudplatform/gcsfuse/v3/metrics"
"github.com/googlecloudplatform/gcsfuse/v3/tracing"
"golang.org/x/sys/unix"

"github.com/googlecloudplatform/gcsfuse/v3/cfg"
Expand Down Expand Up @@ -179,7 +180,7 @@ func createStorageHandle(newConfig *cfg.Config, userAgent string, metricHandle m
////////////////////////////////////////////////////////////////////////

// Mount the file system according to arguments in the supplied context.
func mountWithArgs(bucketName string, mountPoint string, newConfig *cfg.Config, metricHandle metrics.MetricHandle, isSet cfg.IsValueSet) (mfs *fuse.MountedFileSystem, err error) {
func mountWithArgs(bucketName string, mountPoint string, newConfig *cfg.Config, metricHandle metrics.MetricHandle, traceHandle tracing.TraceHandle, isSet cfg.IsValueSet) (mfs *fuse.MountedFileSystem, err error) {
// Enable invariant checking if requested.
if newConfig.Debug.ExitOnInvariantViolation {
locker.EnableInvariantsCheck()
Expand Down Expand Up @@ -214,6 +215,7 @@ func mountWithArgs(bucketName string, mountPoint string, newConfig *cfg.Config,
newConfig,
storageHandle,
metricHandle,
traceHandle,
isSet)

if err != nil {
Expand Down Expand Up @@ -558,6 +560,11 @@ func Mount(mountInfo *mountInfo, bucketName, mountPoint string) (err error) {
}
}
shutdownTracingFn := monitor.SetupTracing(ctx, newConfig, logger.MountInstanceID(fsName(bucketName)))
traceHandle := tracing.NewNoopTracer()
if cfg.IsTracingEnabled(newConfig) {
traceHandle = tracing.NewOtelTracer()
}

shutdownFn := common.JoinShutdownFunc(metricExporterShutdownFn, shutdownTracingFn)

// No-op if profiler is disabled.
Expand All @@ -570,7 +577,7 @@ func Mount(mountInfo *mountInfo, bucketName, mountPoint string) (err error) {
var mfs *fuse.MountedFileSystem
{
startTime := time.Now()
mfs, err = mountWithArgs(bucketName, mountPoint, newConfig, metricHandle, mountInfo.isUserSet)
mfs, err = mountWithArgs(bucketName, mountPoint, newConfig, metricHandle, traceHandle, mountInfo.isUserSet)

// This utility is to absorb the error
// returned by daemonize.SignalOutcome calls by simply
Expand Down
4 changes: 3 additions & 1 deletion cmd/mount.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/googlecloudplatform/gcsfuse/v3/internal/mount"
"github.com/googlecloudplatform/gcsfuse/v3/internal/storage"
"github.com/googlecloudplatform/gcsfuse/v3/metrics"
"github.com/googlecloudplatform/gcsfuse/v3/tracing"
"golang.org/x/net/context"

"github.com/googlecloudplatform/gcsfuse/v3/internal/fs"
Expand All @@ -43,8 +44,8 @@ func mountWithStorageHandle(
newConfig *cfg.Config,
storageHandle storage.StorageHandle,
metricHandle metrics.MetricHandle,
traceHandle tracing.TraceHandle,
isUserSet cfg.IsValueSet) (mfs *fuse.MountedFileSystem, err error) {

// Sanity check: make sure the temporary directory exists and is writable
// currently. This gives a better user experience than harder to debug EIO
// errors when reading files in the future.
Expand Down Expand Up @@ -127,6 +128,7 @@ be interacting with the file system.`)
NewConfig: newConfig,
IsUserSet: isUserSet,
MetricHandle: metricHandle,
TraceHandle: traceHandle,
}
if serverCfg.NewConfig.FileSystem.ExperimentalEnableDentryCache {
serverCfg.Notifier = fuse.NewNotifier()
Expand Down
7 changes: 7 additions & 0 deletions internal/bufferedread/buffered_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/googlecloudplatform/gcsfuse/v3/internal/storage/gcs"
"github.com/googlecloudplatform/gcsfuse/v3/internal/workerpool"
"github.com/googlecloudplatform/gcsfuse/v3/metrics"
"github.com/googlecloudplatform/gcsfuse/v3/tracing"
"github.com/jacobsa/fuse/fuseops"
"golang.org/x/sync/semaphore"
)
Expand Down Expand Up @@ -72,6 +73,8 @@ type BufferedReader struct {

metricHandle metrics.MetricHandle

traceHandle tracing.TraceHandle

// handleID is the file handle id, used for logging.
handleID fuseops.HandleID

Expand Down Expand Up @@ -123,6 +126,7 @@ type BufferedReaderOptions struct {
GlobalMaxBlocksSem *semaphore.Weighted
WorkerPool workerpool.WorkerPool
MetricHandle metrics.MetricHandle
TraceHandle tracing.TraceHandle
ReadTypeClassifier *gcsx.ReadTypeClassifier
HandleID fuseops.HandleID
}
Expand All @@ -136,7 +140,9 @@ func NewBufferedReader(opts *BufferedReaderOptions) (*BufferedReader, error) {
// the file, capped by the configured minimum.
blocksInFile := (int64(opts.Object.Size) + opts.Config.PrefetchBlockSizeBytes - 1) / opts.Config.PrefetchBlockSizeBytes
numBlocksToReserve := min(blocksInFile, opts.Config.MinBlocksPerHandle)
_, span := opts.TraceHandle.StartTrace(context.Background(), tracing.ReadPrefetchBlockPoolGen)
blockpool, err := block.NewPrefetchBlockPool(opts.Config.PrefetchBlockSizeBytes, opts.Config.MaxPrefetchBlockCnt, numBlocksToReserve, opts.GlobalMaxBlocksSem)
opts.TraceHandle.EndTrace(span)
if err != nil {
if errors.Is(err, block.CantAllocateAnyBlockError) {
opts.MetricHandle.BufferedReadFallbackTriggerCount(1, "insufficient_memory")
Expand All @@ -155,6 +161,7 @@ func NewBufferedReader(opts *BufferedReaderOptions) (*BufferedReader, error) {
blockPool: blockpool,
workerPool: opts.WorkerPool,
metricHandle: opts.MetricHandle,
traceHandle: opts.TraceHandle,
handleID: opts.HandleID,
prefetchMultiplier: defaultPrefetchMultiplier,
randomReadsThreshold: opts.Config.RandomSeekThreshold,
Expand Down
7 changes: 5 additions & 2 deletions internal/cache/file/downloader/downloader.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2023 Google LLC

Check failure on line 1 in internal/cache/file/downloader/downloader.go

View workflow job for this annotation

GitHub Actions / Lint

: # github.com/googlecloudplatform/gcsfuse/v3/internal/cache/file/downloader [github.com/googlecloudplatform/gcsfuse/v3/internal/cache/file/downloader.test]
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -25,6 +25,7 @@
"github.com/googlecloudplatform/gcsfuse/v3/internal/locker"
"github.com/googlecloudplatform/gcsfuse/v3/internal/storage/gcs"
"github.com/googlecloudplatform/gcsfuse/v3/metrics"
"github.com/googlecloudplatform/gcsfuse/v3/tracing"
"golang.org/x/sync/semaphore"
)

Expand Down Expand Up @@ -62,11 +63,12 @@
mu locker.Locker
maxParallelismSem *semaphore.Weighted
metricHandle metrics.MetricHandle
traceHandle tracing.TraceHandle
}

func NewJobManager(fileInfoCache *lru.Cache, filePerm os.FileMode, dirPerm os.FileMode,
cacheDir string, sequentialReadSizeMb int32, c *cfg.FileCacheConfig,
metricHandle metrics.MetricHandle) (jm *JobManager) {
metricHandle metrics.MetricHandle, traceHandle tracing.TraceHandle) (jm *JobManager) {
maxParallelDownloads := int64(math.MaxInt64)
if c.MaxParallelDownloads > 0 {
maxParallelDownloads = c.MaxParallelDownloads
Expand All @@ -81,6 +83,7 @@
// Shared between jobs - Limits the overall concurrency of downloads.
maxParallelismSem: semaphore.NewWeighted(maxParallelDownloads),
metricHandle: metricHandle,
traceHandle: traceHandle,
}
jm.mu = locker.New("JobManager", func() {})
jm.jobs = make(map[string]*Job)
Expand Down Expand Up @@ -118,7 +121,7 @@
removeJobCallback := func() {
jm.removeJob(object.Name, bucket.Name())
}
job = NewJob(object, bucket, jm.fileInfoCache, jm.sequentialReadSizeMb, fileSpec, removeJobCallback, jm.fileCacheConfig, jm.maxParallelismSem, jm.metricHandle)
job = NewJob(object, bucket, jm.fileInfoCache, jm.sequentialReadSizeMb, fileSpec, removeJobCallback, jm.fileCacheConfig, jm.maxParallelismSem, jm.metricHandle, jm.traceHandle)
jm.jobs[objectPath] = job
return job
}
Expand Down
17 changes: 15 additions & 2 deletions internal/cache/file/downloader/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ import (
"github.com/googlecloudplatform/gcsfuse/v3/internal/logger"
"github.com/googlecloudplatform/gcsfuse/v3/internal/storage/gcs"
"github.com/googlecloudplatform/gcsfuse/v3/metrics"
"github.com/googlecloudplatform/gcsfuse/v3/tracing"
"go.opentelemetry.io/otel/trace"
"golang.org/x/net/context"
"golang.org/x/sync/semaphore"
)
Expand Down Expand Up @@ -97,6 +99,8 @@ type Job struct {
rangeChan chan data.ObjectRange

metricsHandle metrics.MetricHandle

traceHandle tracing.TraceHandle
}

// JobStatus represents the status of job.
Expand All @@ -123,6 +127,7 @@ func NewJob(
fileCacheConfig *cfg.FileCacheConfig,
maxParallelismSem *semaphore.Weighted,
metricHandle metrics.MetricHandle,
traceHandle tracing.TraceHandle,
) (job *Job) {
job = &Job{
object: object,
Expand All @@ -134,6 +139,7 @@ func NewJob(
fileCacheConfig: fileCacheConfig,
maxParallelismSem: maxParallelismSem,
metricsHandle: metricHandle,
traceHandle: traceHandle,
}
job.mu = locker.New("Job-"+fileSpec.Path, job.checkInvariants)
job.init()
Expand Down Expand Up @@ -486,8 +492,15 @@ func (job *Job) Download(ctx context.Context, offset int64, waitForDownload bool
} else if job.status.Name == NotStarted {
// Start the async download
job.status.Name = Downloading
job.cancelCtx, job.cancelFunc = context.WithCancel(context.Background())
go job.downloadObjectAsync()
span := trace.SpanFromContext(ctx)
newCtx := context.Background()
newCtx = trace.ContextWithSpan(newCtx, span)
newCtx, downloadSpan := job.traceHandle.StartTraceLink(newCtx, tracing.FileCacheRead)
job.cancelCtx, job.cancelFunc = context.WithCancel(newCtx)
go func() {
defer job.traceHandle.EndTrace(downloadSpan)
job.downloadObjectAsync()
}()
} else if job.status.Name == Failed || job.status.Name == Invalid || job.status.Offset >= offset {
defer job.mu.Unlock()
return job.status, nil
Expand Down
26 changes: 10 additions & 16 deletions internal/fs/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,10 @@ import (
"github.com/googlecloudplatform/gcsfuse/v3/internal/fs/gcsfuse_errors"
"github.com/googlecloudplatform/gcsfuse/v3/internal/workerpool"
"github.com/googlecloudplatform/gcsfuse/v3/metrics"
"github.com/googlecloudplatform/gcsfuse/v3/tracing"

"golang.org/x/sync/semaphore"

"go.opentelemetry.io/otel/trace"

"github.com/googlecloudplatform/gcsfuse/v3/cfg"
"github.com/googlecloudplatform/gcsfuse/v3/internal/cache/file"
"github.com/googlecloudplatform/gcsfuse/v3/internal/cache/file/downloader"
Expand Down Expand Up @@ -143,6 +142,8 @@ type ServerConfig struct {

MetricHandle metrics.MetricHandle

TraceHandle tracing.TraceHandle

// Notifier allows the file system to send invalidation messages to the FUSE
// kernel module. This enables proactive cache invalidation (e.g., for dentries)
// when underlying content changes, improving consistency while still leveraging
Expand Down Expand Up @@ -213,6 +214,7 @@ func NewFileSystem(ctx context.Context, serverCfg *ServerConfig) (fuseutil.FileS
fileCacheHandler: fileCacheHandler,
cacheFileForRangeRead: serverCfg.NewConfig.FileCache.CacheFileForRangeRead,
metricHandle: serverCfg.MetricHandle,
traceHandle: serverCfg.TraceHandle,
enableAtomicRenameObject: serverCfg.NewConfig.EnableAtomicRenameObject,
isTracingEnabled: cfg.IsTracingEnabled(serverCfg.NewConfig),
globalMaxWriteBlocksSem: semaphore.NewWeighted(serverCfg.NewConfig.Write.GlobalMaxBlocks),
Expand Down Expand Up @@ -313,7 +315,7 @@ func createFileCacheHandler(serverCfg *ServerConfig) (fileCacheHandler *file.Cac
return nil, fmt.Errorf("createFileCacheHandler: while creating file cache directory: %w", cacheDirErr)
}

jobManager := downloader.NewJobManager(fileInfoCache, filePerm, dirPerm, cacheDir, serverCfg.SequentialReadSizeMb, &serverCfg.NewConfig.FileCache, serverCfg.MetricHandle)
jobManager := downloader.NewJobManager(fileInfoCache, filePerm, dirPerm, cacheDir, serverCfg.SequentialReadSizeMb, &serverCfg.NewConfig.FileCache, serverCfg.MetricHandle, serverCfg.TraceHandle)
fileCacheHandler = file.NewCacheHandler(fileInfoCache, jobManager, cacheDir, filePerm, dirPerm, serverCfg.NewConfig.FileCache.ExcludeRegex, serverCfg.NewConfig.FileCache.IncludeRegex, serverCfg.NewConfig.FileCache.ExperimentalEnableChunkCache)
return
}
Expand Down Expand Up @@ -552,6 +554,8 @@ type fileSystem struct {

metricHandle metrics.MetricHandle

traceHandle tracing.TraceHandle

enableAtomicRenameObject bool

isTracingEnabled bool
Expand Down Expand Up @@ -1732,16 +1736,6 @@ func (fs *fileSystem) StatFS(
return
}

// When tracing is enabled ensure span & trace context from oldCtx is passed on to newCtx
func maybePropagateTraceContext(newCtx context.Context, oldCtx context.Context, isTracingEnabled bool) context.Context {
if !isTracingEnabled {
return newCtx
}

span := trace.SpanFromContext(oldCtx)
return trace.ContextWithSpan(newCtx, span)
}

// getInterruptlessContext returns a new context that is not cancellable by the
// parent context if the ignore-interrupts flag is set. Otherwise, it returns
// the original context.
Expand All @@ -1750,7 +1744,7 @@ func (fs *fileSystem) getInterruptlessContext(ctx context.Context) context.Conte
// When ignore interrupts config is set, we are creating a new context not
// cancellable by parent context.
newCtx := context.Background()
return maybePropagateTraceContext(newCtx, ctx, fs.isTracingEnabled)
return tracing.MaybePropagateTraceContext(newCtx, ctx, fs.isTracingEnabled)
}

return ctx
Expand Down Expand Up @@ -2096,7 +2090,7 @@ func (fs *fileSystem) CreateFile(

// CreateFile() invoked to create new files, can be safely considered as filehandle
// opened in append mode.
fs.handles[op.Handle] = handle.NewFileHandle(child.(*inode.FileInode), fs.fileCacheHandler, fs.cacheFileForRangeRead, fs.metricHandle, openMode, fs.newConfig, fs.bufferedReadWorkerPool, fs.globalMaxReadBlocksSem, op.Handle)
fs.handles[op.Handle] = handle.NewFileHandle(child.(*inode.FileInode), fs.fileCacheHandler, fs.cacheFileForRangeRead, fs.metricHandle, fs.traceHandle, openMode, fs.newConfig, fs.bufferedReadWorkerPool, fs.globalMaxReadBlocksSem, op.Handle)

fs.mu.Unlock()

Expand Down Expand Up @@ -2870,7 +2864,7 @@ func (fs *fileSystem) OpenFile(

// Figure out the mode in which the file is being opened.
openMode := util.FileOpenMode(op.OpenFlags)
fs.handles[op.Handle] = handle.NewFileHandle(in, fs.fileCacheHandler, fs.cacheFileForRangeRead, fs.metricHandle, openMode, fs.newConfig, fs.bufferedReadWorkerPool, fs.globalMaxReadBlocksSem, op.Handle)
fs.handles[op.Handle] = handle.NewFileHandle(in, fs.fileCacheHandler, fs.cacheFileForRangeRead, fs.metricHandle, fs.traceHandle, openMode, fs.newConfig, fs.bufferedReadWorkerPool, fs.globalMaxReadBlocksSem, op.Handle)

// When we observe object generations that we didn't create, we assign them
// new inode IDs. So for a given inode, all modifications go through the
Expand Down
9 changes: 7 additions & 2 deletions internal/fs/handle/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/googlecloudplatform/gcsfuse/v3/internal/workerpool"
"github.com/googlecloudplatform/gcsfuse/v3/internal/workloadinsight"
"github.com/googlecloudplatform/gcsfuse/v3/metrics"
"github.com/googlecloudplatform/gcsfuse/v3/tracing"
"github.com/jacobsa/fuse/fuseops"
"github.com/jacobsa/syncutil"
"golang.org/x/net/context"
Expand Down Expand Up @@ -64,6 +65,8 @@ type FileHandle struct {
// will be downloaded for random reads as well too.
cacheFileForRangeRead bool
metricHandle metrics.MetricHandle
traceHandle tracing.TraceHandle

// openMode is used to store the mode in which the file is opened.
openMode util.OpenMode

Expand All @@ -82,12 +85,13 @@ type FileHandle struct {
}

// LOCKS_REQUIRED(fh.inode.mu)
func NewFileHandle(inode *inode.FileInode, fileCacheHandler *file.CacheHandler, cacheFileForRangeRead bool, metricHandle metrics.MetricHandle, openMode util.OpenMode, c *cfg.Config, bufferedReadWorkerPool workerpool.WorkerPool, globalMaxReadBlocksSem *semaphore.Weighted, handleID fuseops.HandleID) (fh *FileHandle) {
func NewFileHandle(inode *inode.FileInode, fileCacheHandler *file.CacheHandler, cacheFileForRangeRead bool, metricHandle metrics.MetricHandle, traceHandle tracing.TraceHandle, openMode util.OpenMode, c *cfg.Config, bufferedReadWorkerPool workerpool.WorkerPool, globalMaxReadBlocksSem *semaphore.Weighted, handleID fuseops.HandleID) (fh *FileHandle) {
fh = &FileHandle{
inode: inode,
fileCacheHandler: fileCacheHandler,
cacheFileForRangeRead: cacheFileForRangeRead,
metricHandle: metricHandle,
traceHandle: traceHandle,
openMode: openMode,
config: c,
bufferedReadWorkerPool: bufferedReadWorkerPool,
Expand Down Expand Up @@ -204,6 +208,7 @@ func (fh *FileHandle) ReadWithReadManager(ctx context.Context, req *gcsx.ReadReq
FileCacheHandler: fh.fileCacheHandler,
CacheFileForRangeRead: fh.cacheFileForRangeRead,
MetricHandle: fh.metricHandle,
TraceHandle: fh.traceHandle,
MrdWrapper: mrdWrapper,
Config: fh.config,
GlobalMaxBlocksSem: fh.globalMaxReadBlocksSem,
Expand Down Expand Up @@ -284,7 +289,7 @@ func (fh *FileHandle) Read(ctx context.Context, dst []byte, offset int64, sequen

fh.destroyReader()
// Attempt to create an appropriate reader.
fh.reader = gcsx.NewRandomReader(minObj, bucket, sequentialReadSizeMb, fh.fileCacheHandler, fh.cacheFileForRangeRead, fh.metricHandle, mrdWrapper, fh.config, fh.handleID)
fh.reader = gcsx.NewRandomReader(minObj, bucket, sequentialReadSizeMb, fh.fileCacheHandler, fh.cacheFileForRangeRead, fh.metricHandle, fh.traceHandle, mrdWrapper, fh.config, fh.handleID)

// Release RWLock and take RLock on file handle again
fh.mu.Unlock()
Expand Down
Loading
Loading