Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 33 additions & 2 deletions internal/gcsx/mrd_simple_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,12 @@ package gcsx

import (
"context"
"errors"
"fmt"
"io"
"sync/atomic"

"github.com/googlecloudplatform/gcsfuse/v3/internal/logger"
)

// MrdSimpleReader is a reader that uses an MRD Instance to read data from a GCS object.
Expand All @@ -36,6 +40,17 @@ func NewMrdSimpleReader(mrdInstance *MrdInstance) *MrdSimpleReader {
}
}

// isShortRead checks if the read operation returned fewer bytes than requested
// without encountering a fatal error.
// It returns true if bytesRead < bufferSize and err is either nil, io.EOF, or io.ErrUnexpectedEOF.
func isShortRead(bytesRead int, bufferSize int, err error) bool {
if bytesRead >= bufferSize {
return false
}

return err == nil || errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF)
}

// ReadAt reads data into the provided request buffer starting at the specified
// offset. It retrieves an available MRD entry and uses it to download the
// requested byte range.
Expand All @@ -55,8 +70,24 @@ func (msr *MrdSimpleReader) ReadAt(ctx context.Context, req *ReadRequest) (ReadR
msr.mrdInstance.IncrementRefCount()
}

n, err := msr.mrdInstance.Read(ctx, req.Buffer, req.Offset)
return ReadResponse{Size: n}, err
bytesRead, err := msr.mrdInstance.Read(ctx, req.Buffer, req.Offset)
if isShortRead(bytesRead, len(req.Buffer), err) {
originalErr := err
if err = msr.mrdInstance.RecreateMRD(); err != nil {
logger.Warnf("Failed to recreate MRD for short read retry. Will retry with older MRD: %v", err)
}
retryOffset := req.Offset + int64(bytesRead)
retryBuffer := req.Buffer[bytesRead:]
var bytesReadOnRetry int
bytesReadOnRetry, err = msr.mrdInstance.Read(ctx, retryBuffer, retryOffset)
bytesRead += bytesReadOnRetry
// In case the offset is greater than object size, we can get OutOfRange error which should not be propagated
// to user. Also, MRD will have to be recreated in that scenario which will happen automatically during next read.
if bytesReadOnRetry == 0 {
err = originalErr
}
}
return ReadResponse{Size: bytesRead}, err
}

// Destroy cleans up the resources used by the reader, primarily by destroying
Expand Down
55 changes: 55 additions & 0 deletions internal/gcsx/mrd_simple_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package gcsx

import (
"context"
"io"
"testing"

"github.com/googlecloudplatform/gcsfuse/v3/cfg"
Expand All @@ -27,6 +28,8 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

type MrdSimpleReaderTest struct {
Expand Down Expand Up @@ -138,6 +141,58 @@ func (t *MrdSimpleReaderTest) TestReadAt_NilMrdInstance() {
assert.Equal(t.T(), 0, resp.Size)
}

func (t *MrdSimpleReaderTest) TestReadAt_ShortRead_RetrySuccess() {
data := []byte("hello world")
// First MRD returns short read.
fakeMRD1 := fake.NewFakeMultiRangeDownloaderWithShortRead(t.object, data)
// Second MRD returns full read.
fakeMRD2 := fake.NewFakeMultiRangeDownloader(t.object, data)
// Expectation:
// 1. Initial Read calls ensureMRDPool -> NewMRDPool -> NewMultiRangeDownloader. Returns fakeMRD1.
// 2. Read returns short read.
// 3. ReadAt calls RecreateMRD -> NewMRDPool -> NewMultiRangeDownloader. Returns fakeMRD2.
t.bucket.On("NewMultiRangeDownloader", mock.Anything, mock.Anything).Return(fakeMRD1, nil).Once()
t.bucket.On("NewMultiRangeDownloader", mock.Anything, mock.Anything).Return(fakeMRD2, nil).Once()
buf := make([]byte, len(data))
req := &ReadRequest{
Buffer: buf,
Offset: 0,
}

resp, err := t.reader.ReadAt(context.Background(), req)

assert.NoError(t.T(), err)
assert.Equal(t.T(), len(data), resp.Size)
assert.Equal(t.T(), string(data), string(buf))
// Verify refCount incremented
t.mrdInstance.refCountMu.Lock()
assert.Equal(t.T(), int64(1), t.mrdInstance.refCount)
t.mrdInstance.refCountMu.Unlock()
t.bucket.AssertExpectations(t.T())
}

func (t *MrdSimpleReaderTest) TestReadAt_ShortRead_RetryFails_ReturnsOriginalError() {
data := []byte("hello world")
// First MRD returns short read with io.EOF.
fakeMRD1 := fake.NewFakeMultiRangeDownloaderWithShortRead(t.object, data)
// Second MRD returns 0 bytes and an error.
fakeMRD2 := fake.NewFakeMultiRangeDownloaderWithSleepAndDefaultError(t.object, []byte{}, 0, status.Error(codes.OutOfRange, "Out of range error"))
t.bucket.On("NewMultiRangeDownloader", mock.Anything, mock.Anything).Return(fakeMRD1, nil).Once()
t.bucket.On("NewMultiRangeDownloader", mock.Anything, mock.Anything).Return(fakeMRD2, nil).Once()
buf := make([]byte, len(data))
req := &ReadRequest{
Buffer: buf,
Offset: 0,
}

resp, err := t.reader.ReadAt(context.Background(), req)

assert.ErrorIs(t.T(), err, io.EOF)
assert.Equal(t.T(), 5, resp.Size)
assert.Equal(t.T(), "hello", string(buf[:5]))
t.bucket.AssertExpectations(t.T())
}

func (t *MrdSimpleReaderTest) TestDestroy() {
// Setup state where refCount is incremented
t.reader.mrdInstanceInUse.Store(true)
Expand Down
Loading