Skip to content
11 changes: 9 additions & 2 deletions cmd/legacy_main.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"time"

"github.com/googlecloudplatform/gcsfuse/v3/metrics"
"github.com/googlecloudplatform/gcsfuse/v3/tracing"
"golang.org/x/sys/unix"

"github.com/googlecloudplatform/gcsfuse/v3/cfg"
Expand Down Expand Up @@ -179,7 +180,7 @@ func createStorageHandle(newConfig *cfg.Config, userAgent string, metricHandle m
////////////////////////////////////////////////////////////////////////

// Mount the file system according to arguments in the supplied context.
func mountWithArgs(bucketName string, mountPoint string, newConfig *cfg.Config, metricHandle metrics.MetricHandle, isSet cfg.IsValueSet) (mfs *fuse.MountedFileSystem, err error) {
func mountWithArgs(bucketName string, mountPoint string, newConfig *cfg.Config, metricHandle metrics.MetricHandle, traceHandle tracing.TraceHandle, isSet cfg.IsValueSet) (mfs *fuse.MountedFileSystem, err error) {
// Enable invariant checking if requested.
if newConfig.Debug.ExitOnInvariantViolation {
locker.EnableInvariantsCheck()
Expand Down Expand Up @@ -214,6 +215,7 @@ func mountWithArgs(bucketName string, mountPoint string, newConfig *cfg.Config,
newConfig,
storageHandle,
metricHandle,
traceHandle,
isSet)

if err != nil {
Expand Down Expand Up @@ -558,6 +560,11 @@ func Mount(mountInfo *mountInfo, bucketName, mountPoint string) (err error) {
}
}
shutdownTracingFn := monitor.SetupTracing(ctx, newConfig, logger.MountInstanceID(fsName(bucketName)))
traceHandle := tracing.NewNoopTracer()
if cfg.IsTracingEnabled(newConfig) {
traceHandle = tracing.NewOTELTracer()
}

shutdownFn := common.JoinShutdownFunc(metricExporterShutdownFn, shutdownTracingFn)

// No-op if profiler is disabled.
Expand All @@ -570,7 +577,7 @@ func Mount(mountInfo *mountInfo, bucketName, mountPoint string) (err error) {
var mfs *fuse.MountedFileSystem
{
startTime := time.Now()
mfs, err = mountWithArgs(bucketName, mountPoint, newConfig, metricHandle, mountInfo.isUserSet)
mfs, err = mountWithArgs(bucketName, mountPoint, newConfig, metricHandle, traceHandle, mountInfo.isUserSet)

// This utility is to absorb the error
// returned by daemonize.SignalOutcome calls by simply
Expand Down
3 changes: 3 additions & 0 deletions cmd/mount.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/googlecloudplatform/gcsfuse/v3/internal/mount"
"github.com/googlecloudplatform/gcsfuse/v3/internal/storage"
"github.com/googlecloudplatform/gcsfuse/v3/metrics"
"github.com/googlecloudplatform/gcsfuse/v3/tracing"
"golang.org/x/net/context"

"github.com/googlecloudplatform/gcsfuse/v3/internal/fs"
Expand All @@ -43,6 +44,7 @@ func mountWithStorageHandle(
newConfig *cfg.Config,
storageHandle storage.StorageHandle,
metricHandle metrics.MetricHandle,
traceHandle tracing.TraceHandle,
isUserSet cfg.IsValueSet) (mfs *fuse.MountedFileSystem, err error) {

// Sanity check: make sure the temporary directory exists and is writable
Expand Down Expand Up @@ -127,6 +129,7 @@ be interacting with the file system.`)
NewConfig: newConfig,
IsUserSet: isUserSet,
MetricHandle: metricHandle,
TraceHandle: traceHandle,
}
if serverCfg.NewConfig.FileSystem.ExperimentalEnableDentryCache {
serverCfg.Notifier = fuse.NewNotifier()
Expand Down
5 changes: 5 additions & 0 deletions internal/bufferedread/buffered_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/googlecloudplatform/gcsfuse/v3/internal/storage/gcs"
"github.com/googlecloudplatform/gcsfuse/v3/internal/workerpool"
"github.com/googlecloudplatform/gcsfuse/v3/metrics"
"github.com/googlecloudplatform/gcsfuse/v3/tracing"
"github.com/jacobsa/fuse/fuseops"
"golang.org/x/sync/semaphore"
)
Expand Down Expand Up @@ -72,6 +73,8 @@ type BufferedReader struct {

metricHandle metrics.MetricHandle

traceHandle tracing.TraceHandle

// handleID is the file handle id, used for logging.
handleID fuseops.HandleID

Expand Down Expand Up @@ -123,6 +126,7 @@ type BufferedReaderOptions struct {
GlobalMaxBlocksSem *semaphore.Weighted
WorkerPool workerpool.WorkerPool
MetricHandle metrics.MetricHandle
TraceHandle tracing.TraceHandle
ReadTypeClassifier *gcsx.ReadTypeClassifier
HandleID fuseops.HandleID
}
Expand Down Expand Up @@ -155,6 +159,7 @@ func NewBufferedReader(opts *BufferedReaderOptions) (*BufferedReader, error) {
blockPool: blockpool,
workerPool: opts.WorkerPool,
metricHandle: opts.MetricHandle,
traceHandle: opts.TraceHandle,
handleID: opts.HandleID,
prefetchMultiplier: defaultPrefetchMultiplier,
randomReadsThreshold: opts.Config.RandomSeekThreshold,
Expand Down
5 changes: 5 additions & 0 deletions internal/cache/file/cache_handle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/googlecloudplatform/gcsfuse/v3/internal/storage/storageutil"
"github.com/googlecloudplatform/gcsfuse/v3/metrics"
"github.com/googlecloudplatform/gcsfuse/v3/tools/integration_tests/util/operations"
"github.com/googlecloudplatform/gcsfuse/v3/tracing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
Expand Down Expand Up @@ -156,6 +157,7 @@ func (cht *cacheHandleTest) SetupTest() {
fileCacheConfig,
semaphore.NewWeighted(math.MaxInt64),
metrics.NewNoopMetrics(),
tracing.NewNoopTracer(),
)

cht.cacheHandle = NewCacheHandle(readLocalFileHandle, fileDownloadJob, cht.cache, false, 0)
Expand Down Expand Up @@ -859,6 +861,7 @@ func (cht *cacheHandleTest) Test_SequentialRead_Parallel_Download_True() {
fileCacheConfig,
semaphore.NewWeighted(math.MaxInt64),
metrics.NewNoopMetrics(),
tracing.NewNoopTracer(),
)
cht.cacheHandle.fileDownloadJob = fileDownloadJob

Expand Down Expand Up @@ -894,6 +897,7 @@ func (cht *cacheHandleTest) Test_RandomRead_Parallel_Download_True() {
fileCacheConfig,
semaphore.NewWeighted(math.MaxInt64),
metrics.NewNoopMetrics(),
tracing.NewNoopTracer(),
)
cht.cacheHandle.fileDownloadJob = fileDownloadJob

Expand Down Expand Up @@ -929,6 +933,7 @@ func (cht *cacheHandleTest) Test_RandomRead_CacheForRangeReadFalse_And_ParallelD
fileCacheConfig,
semaphore.NewWeighted(math.MaxInt64),
metrics.NewNoopMetrics(),
tracing.NewNoopTracer(),
)

// Since, it's a random read, download job will not start.
Expand Down
3 changes: 2 additions & 1 deletion internal/cache/file/cache_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/googlecloudplatform/gcsfuse/v3/internal/storage/storageutil"
"github.com/googlecloudplatform/gcsfuse/v3/metrics"
"github.com/googlecloudplatform/gcsfuse/v3/tools/integration_tests/util/operations"
"github.com/googlecloudplatform/gcsfuse/v3/tracing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -85,7 +86,7 @@ func initializeCacheHandlerTestArgs(t *testing.T, fileCacheConfig *cfg.FileCache

// Job manager
jobManager := downloader.NewJobManager(cache, util.DefaultFilePerm,
util.DefaultDirPerm, cacheDir, DefaultSequentialReadSizeMb, fileCacheConfig, metrics.NewNoopMetrics())
util.DefaultDirPerm, cacheDir, DefaultSequentialReadSizeMb, fileCacheConfig, metrics.NewNoopMetrics(), tracing.NewNoopTracer())

// Mocked cached handler object.
isSparse := fileCacheConfig != nil && fileCacheConfig.ExperimentalEnableChunkCache
Expand Down
7 changes: 5 additions & 2 deletions internal/cache/file/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/googlecloudplatform/gcsfuse/v3/internal/locker"
"github.com/googlecloudplatform/gcsfuse/v3/internal/storage/gcs"
"github.com/googlecloudplatform/gcsfuse/v3/metrics"
"github.com/googlecloudplatform/gcsfuse/v3/tracing"
"golang.org/x/sync/semaphore"
)

Expand Down Expand Up @@ -62,11 +63,12 @@ type JobManager struct {
mu locker.Locker
maxParallelismSem *semaphore.Weighted
metricHandle metrics.MetricHandle
traceHandle tracing.TraceHandle
}

func NewJobManager(fileInfoCache *lru.Cache, filePerm os.FileMode, dirPerm os.FileMode,
cacheDir string, sequentialReadSizeMb int32, c *cfg.FileCacheConfig,
metricHandle metrics.MetricHandle) (jm *JobManager) {
metricHandle metrics.MetricHandle, traceHandle tracing.TraceHandle) (jm *JobManager) {
maxParallelDownloads := int64(math.MaxInt64)
if c.MaxParallelDownloads > 0 {
maxParallelDownloads = c.MaxParallelDownloads
Expand All @@ -81,6 +83,7 @@ func NewJobManager(fileInfoCache *lru.Cache, filePerm os.FileMode, dirPerm os.Fi
// Shared between jobs - Limits the overall concurrency of downloads.
maxParallelismSem: semaphore.NewWeighted(maxParallelDownloads),
metricHandle: metricHandle,
traceHandle: traceHandle,
}
jm.mu = locker.New("JobManager", func() {})
jm.jobs = make(map[string]*Job)
Expand Down Expand Up @@ -118,7 +121,7 @@ func (jm *JobManager) CreateJobIfNotExists(object *gcs.MinObject, bucket gcs.Buc
removeJobCallback := func() {
jm.removeJob(object.Name, bucket.Name())
}
job = NewJob(object, bucket, jm.fileInfoCache, jm.sequentialReadSizeMb, fileSpec, removeJobCallback, jm.fileCacheConfig, jm.maxParallelismSem, jm.metricHandle)
job = NewJob(object, bucket, jm.fileInfoCache, jm.sequentialReadSizeMb, fileSpec, removeJobCallback, jm.fileCacheConfig, jm.maxParallelismSem, jm.metricHandle, jm.traceHandle)
jm.jobs[objectPath] = job
return job
}
Expand Down
3 changes: 2 additions & 1 deletion internal/cache/file/downloader/downloader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
testutil "github.com/googlecloudplatform/gcsfuse/v3/internal/util"
"github.com/googlecloudplatform/gcsfuse/v3/metrics"
"github.com/googlecloudplatform/gcsfuse/v3/tools/integration_tests/util/operations"
"github.com/googlecloudplatform/gcsfuse/v3/tracing"
. "github.com/jacobsa/ogletest"
"github.com/stretchr/testify/mock"
)
Expand Down Expand Up @@ -70,7 +71,7 @@ func (dt *downloaderTest) setupHelper() {
ExpectEq(nil, err)

dt.initJobTest(DefaultObjectName, []byte("taco"), DefaultSequentialReadSizeMb, CacheMaxSize, func() {})
dt.jm = NewJobManager(dt.cache, util.DefaultFilePerm, util.DefaultDirPerm, cacheDir, DefaultSequentialReadSizeMb, dt.defaultFileCacheConfig, metrics.NewNoopMetrics())
dt.jm = NewJobManager(dt.cache, util.DefaultFilePerm, util.DefaultDirPerm, cacheDir, DefaultSequentialReadSizeMb, dt.defaultFileCacheConfig, metrics.NewNoopMetrics(), tracing.NewNoopTracer())
}

func (dt *downloaderTest) SetUp(*TestInfo) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/googlecloudplatform/gcsfuse/v3/internal/storage/gcs"
"github.com/googlecloudplatform/gcsfuse/v3/internal/storage/storageutil"
"github.com/googlecloudplatform/gcsfuse/v3/metrics"
"github.com/googlecloudplatform/gcsfuse/v3/tracing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -156,7 +157,7 @@ func TestParallelDownloads(t *testing.T) {
WriteBufferSize: 4 * 1024 * 1024,
EnableODirect: tc.enableODirect,
}
jm := NewJobManager(cache, util.DefaultFilePerm, util.DefaultDirPerm, cacheDir, 2, fileCacheConfig, metrics.NewNoopMetrics())
jm := NewJobManager(cache, util.DefaultFilePerm, util.DefaultDirPerm, cacheDir, 2, fileCacheConfig, metrics.NewNoopMetrics(), tracing.NewNoopTracer())
job := jm.CreateJobIfNotExists(&minObj, bucket)
subscriberC := job.subscribe(tc.subscribedOffset)

Expand Down Expand Up @@ -199,7 +200,7 @@ func TestMultipleConcurrentDownloads(t *testing.T) {
MaxParallelDownloads: 2,
WriteBufferSize: 4 * 1024 * 1024,
}
jm := NewJobManager(cache, util.DefaultFilePerm, util.DefaultDirPerm, cacheDir, 2, fileCacheConfig, metrics.NewNoopMetrics())
jm := NewJobManager(cache, util.DefaultFilePerm, util.DefaultDirPerm, cacheDir, 2, fileCacheConfig, metrics.NewNoopMetrics(), tracing.NewNoopTracer())
job1 := jm.CreateJobIfNotExists(&minObj1, bucket)
job2 := jm.CreateJobIfNotExists(&minObj2, bucket)
s1 := job1.subscribe(10 * util.MiB)
Expand Down
6 changes: 6 additions & 0 deletions internal/cache/file/downloader/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ import (
"github.com/googlecloudplatform/gcsfuse/v3/internal/logger"
"github.com/googlecloudplatform/gcsfuse/v3/internal/storage/gcs"
"github.com/googlecloudplatform/gcsfuse/v3/metrics"
"github.com/googlecloudplatform/gcsfuse/v3/tracing"

"golang.org/x/net/context"
"golang.org/x/sync/semaphore"
)
Expand Down Expand Up @@ -97,6 +99,8 @@ type Job struct {
rangeChan chan data.ObjectRange

metricsHandle metrics.MetricHandle

traceHandle tracing.TraceHandle
}

// JobStatus represents the status of job.
Expand All @@ -123,6 +127,7 @@ func NewJob(
fileCacheConfig *cfg.FileCacheConfig,
maxParallelismSem *semaphore.Weighted,
metricHandle metrics.MetricHandle,
traceHandle tracing.TraceHandle,
) (job *Job) {
job = &Job{
object: object,
Expand All @@ -134,6 +139,7 @@ func NewJob(
fileCacheConfig: fileCacheConfig,
maxParallelismSem: maxParallelismSem,
metricsHandle: metricHandle,
traceHandle: traceHandle,
}
job.mu = locker.New("Job-"+fileSpec.Path, job.checkInvariants)
job.init()
Expand Down
3 changes: 2 additions & 1 deletion internal/cache/file/downloader/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/googlecloudplatform/gcsfuse/v3/internal/storage/storageutil"
testutil "github.com/googlecloudplatform/gcsfuse/v3/internal/util"
"github.com/googlecloudplatform/gcsfuse/v3/metrics"
"github.com/googlecloudplatform/gcsfuse/v3/tracing"
. "github.com/jacobsa/ogletest"
"golang.org/x/sync/semaphore"
)
Expand Down Expand Up @@ -63,7 +64,7 @@ func (dt *downloaderTest) initJobTest(objectName string, objectContent []byte, s
}
dt.cache = lru.NewCache(lruCacheSize)

dt.job = NewJob(&dt.object, dt.bucket, dt.cache, sequentialReadSize, dt.fileSpec, removeCallback, dt.defaultFileCacheConfig, semaphore.NewWeighted(math.MaxInt64), metrics.NewNoopMetrics())
dt.job = NewJob(&dt.object, dt.bucket, dt.cache, sequentialReadSize, dt.fileSpec, removeCallback, dt.defaultFileCacheConfig, semaphore.NewWeighted(math.MaxInt64), metrics.NewNoopMetrics(), tracing.NewNoopTracer())
fileInfoKey := data.FileInfoKey{
BucketName: storage.TestBucketName,
ObjectName: objectName,
Expand Down
3 changes: 2 additions & 1 deletion internal/cache/file/downloader/job_testify_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"github.com/googlecloudplatform/gcsfuse/v3/internal/storage/fake"
"github.com/googlecloudplatform/gcsfuse/v3/metrics"
"github.com/googlecloudplatform/gcsfuse/v3/tracing"
"golang.org/x/net/context"
"golang.org/x/sync/semaphore"

Expand Down Expand Up @@ -65,7 +66,7 @@ func (t *JobTestifyTest) initReadCacheTestifyTest(objectName string, objectConte
DirPerm: util.DefaultDirPerm,
}
t.cache = lru.NewCache(lruCacheSize)
t.job = NewJob(&t.object, t.mockBucket, t.cache, sequentialReadSize, t.fileSpec, removeCallback, t.defaultFileCacheConfig, semaphore.NewWeighted(math.MaxInt64), metrics.NewNoopMetrics())
t.job = NewJob(&t.object, t.mockBucket, t.cache, sequentialReadSize, t.fileSpec, removeCallback, t.defaultFileCacheConfig, semaphore.NewWeighted(math.MaxInt64), metrics.NewNoopMetrics(), tracing.NewNoopTracer())
fileInfoKey := data.FileInfoKey{
BucketName: storage.TestBucketName,
ObjectName: objectName,
Expand Down
Loading
Loading