Skip to content

Commit b9ebe29

Browse files
committed
feat(gcs): implement Seek, ReadAt, and WriteAt methods for GCS provider
1 parent 071e2f9 commit b9ebe29

File tree

6 files changed

+514
-29
lines changed

6 files changed

+514
-29
lines changed

pkg/gofr/datasource/file/gcs/file.go

Lines changed: 170 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package gcs
33
import (
44
"context"
55
"errors"
6+
"fmt"
67
"io"
78
"time"
89
)
@@ -25,10 +26,8 @@ type File struct {
2526
}
2627

2728
var (
28-
errNilGCSFileBody = errors.New("gcs file body is nil")
29-
errSeekNotSupported = errors.New("seek not supported on GCSFile")
30-
errReadAtNotSupported = errors.New("readAt not supported on GCSFile")
31-
errWriteAtNotSupported = errors.New("writeAt not supported on GCSFile (read-only)")
29+
errNilGCSFileBody = errors.New("gcs file body is nil")
30+
errOffesetOutOfRange = errors.New("offset out of range")
3231
)
3332

3433
const (
@@ -127,17 +126,177 @@ func (f *File) Close() error {
127126
return nil
128127
}
129128

130-
func (*File) Seek(_ int64, _ int) (int64, error) {
131-
// Not supported: Seek requires reopening with range.
132-
return 0, errSeekNotSupported
129+
func (f *File) check(whence int, offset, length int64, msg *string) (int64, error) {
130+
switch whence {
131+
case io.SeekStart:
132+
case io.SeekEnd:
133+
offset += length
134+
case io.SeekCurrent:
135+
offset += f.size
136+
default:
137+
return 0, errOffesetOutOfRange
138+
}
139+
140+
if offset < 0 || offset > length {
141+
*msg = fmt.Sprintf("Offset %v out of bounds %v", offset, length)
142+
return 0, errOffesetOutOfRange
143+
}
144+
145+
f.size = offset
146+
147+
return f.size, nil
133148
}
134149

135-
func (*File) ReadAt(_ []byte, _ int64) (int, error) {
136-
return 0, errReadAtNotSupported
150+
func (f *File) Seek(offset int64, whence int) (int64, error) {
151+
bucketName := getBucketName(f.name)
152+
153+
var msg string
154+
155+
status := statusErr
156+
157+
defer f.sendOperationStats(&FileLog{
158+
Operation: "SEEK",
159+
Location: getLocation(bucketName),
160+
Status: &status,
161+
Message: &msg,
162+
}, time.Now())
163+
164+
ctx := context.Background()
165+
166+
attrs, err := f.conn.StatObject(ctx, f.name)
167+
if err != nil {
168+
msg = fmt.Sprintf("could not get object attrs: %v", err)
169+
f.logger.Errorf(msg)
170+
171+
return 0, err
172+
}
173+
174+
newPos, err := f.check(whence, offset, attrs.Size, &msg)
175+
if err != nil {
176+
f.logger.Errorf("Seek failed. Error: %v", err)
177+
return 0, err
178+
}
179+
180+
if f.body != nil {
181+
_ = f.body.Close()
182+
}
183+
184+
reader, err := f.conn.NewRangeReader(ctx, getObjectName(f.name), newPos, -1)
185+
if err != nil {
186+
f.logger.Errorf("failed to set new range reader: %v", err)
187+
188+
return 0, err
189+
}
190+
191+
f.body = reader
192+
f.size = newPos
193+
194+
status = statusSuccess
195+
196+
f.logger.Logf("Seek repositioned reader to offset %v for %q", newPos, f.name)
197+
198+
return newPos, nil
199+
}
200+
201+
func (f *File) ReadAt(p []byte, off int64) (int, error) {
202+
bucketName := getBucketName(f.name)
203+
204+
var msg string
205+
206+
status := statusErr
207+
208+
defer f.sendOperationStats(&FileLog{
209+
Operation: "READ_AT",
210+
Location: getLocation(bucketName),
211+
Status: &status,
212+
Message: &msg,
213+
}, time.Now())
214+
215+
ctx := context.Background()
216+
217+
rdr, err := f.conn.NewRangeReader(ctx, getObjectName(f.name), off, int64(len(p)))
218+
if err != nil {
219+
f.logger.Errorf("failed to create range reader: %v", err)
220+
221+
return 0, err
222+
}
223+
defer rdr.Close()
224+
225+
n, err := io.ReadFull(rdr, p)
226+
if err != nil && !errors.Is(err, io.EOF) && !errors.Is(err, io.ErrUnexpectedEOF) {
227+
msg = fmt.Sprintf("read failed: %v", err)
228+
f.logger.Errorf(msg)
229+
230+
return n, err
231+
}
232+
233+
status = statusSuccess
234+
235+
f.logger.Debugf("ReadAt read %d bytes from offset %d for file %q", n, off, f.name)
236+
237+
return n, nil
137238
}
138239

139-
func (*File) WriteAt(_ []byte, _ int64) (int, error) {
140-
return 0, errWriteAtNotSupported
240+
func (f *File) WriteAt(p []byte, off int64) (int, error) {
241+
bucketName := getBucketName(f.name)
242+
243+
var msg string
244+
245+
status := statusErr
246+
247+
defer f.sendOperationStats(&FileLog{
248+
Operation: "WRITE_AT",
249+
Location: getLocation(bucketName),
250+
Status: &status,
251+
Message: &msg,
252+
}, time.Now())
253+
254+
objectName := getObjectName(f.name)
255+
ctx := context.Background()
256+
rdr, err := f.conn.NewReader(ctx, objectName)
257+
258+
var oldData []byte
259+
if err == nil {
260+
oldData, _ = io.ReadAll(rdr)
261+
_ = rdr.Close()
262+
}
263+
264+
if int64(len(oldData)) < off {
265+
pad := make([]byte, off-int64(len(oldData)))
266+
oldData = append(oldData, pad...)
267+
}
268+
269+
end := off + int64(len(p))
270+
if end > int64(len(oldData)) {
271+
newData := make([]byte, end)
272+
copy(newData, oldData)
273+
copy(newData[off:], p)
274+
oldData = newData
275+
} else {
276+
copy(oldData[off:end], p)
277+
}
278+
279+
w := f.conn.NewWriter(ctx, objectName)
280+
if _, err := w.Write(oldData); err != nil {
281+
_ = w.Close()
282+
msg = fmt.Sprintf("failed to write updated data: %v", err)
283+
f.logger.Errorf(msg)
284+
285+
return 0, err
286+
}
287+
288+
if err := w.Close(); err != nil {
289+
msg = fmt.Sprintf("failed to close writer: %v", err)
290+
f.logger.Errorf(msg)
291+
292+
return 0, err
293+
}
294+
295+
status = statusSuccess
296+
297+
f.logger.Debugf("WriteAt wrote %d bytes at offset %d in %q", len(p), off, f.name)
298+
299+
return len(p), nil
141300
}
142301

143302
func (f *File) sendOperationStats(fl *FileLog, startTime time.Time) {

0 commit comments

Comments
 (0)