
Commit 2bf6eea

*: LOAD DATA support compressed source file (pingcap#42813)

ref pingcap#40499

1 parent: ad59092

11 files changed: +94 -20 lines changed

br/pkg/lightning/importer/chunk_process.go

+1 -1

@@ -60,7 +60,7 @@ func newChunkProcessor(
 ) (*chunkProcessor, error) {
 	blockBufSize := int64(cfg.Mydumper.ReadBlockSize)

-	reader, err := mydump.OpenReader(ctx, chunk.FileMeta, store)
+	reader, err := mydump.OpenReader(ctx, &chunk.FileMeta, store)
 	if err != nil {
 		return nil, errors.Trace(err)
 	}

br/pkg/lightning/importer/get_pre_info.go

+2 -2

@@ -457,7 +457,7 @@ func (p *PreImportInfoGetterImpl) ReadFirstNRowsByTableName(ctx context.Context,
 // ReadFirstNRowsByFileMeta reads the first N rows of an data file.
 // It implements the PreImportInfoGetter interface.
 func (p *PreImportInfoGetterImpl) ReadFirstNRowsByFileMeta(ctx context.Context, dataFileMeta mydump.SourceFileMeta, n int) ([]string, [][]types.Datum, error) {
-	reader, err := mydump.OpenReader(ctx, dataFileMeta, p.srcStorage)
+	reader, err := mydump.OpenReader(ctx, &dataFileMeta, p.srcStorage)
 	if err != nil {
 		return nil, nil, errors.Trace(err)
 	}
@@ -606,7 +606,7 @@ func (p *PreImportInfoGetterImpl) sampleDataFromTable(
 		return resultIndexRatio, isRowOrdered, nil
 	}
 	sampleFile := tableMeta.DataFiles[0].FileMeta
-	reader, err := mydump.OpenReader(ctx, sampleFile, p.srcStorage)
+	reader, err := mydump.OpenReader(ctx, &sampleFile, p.srcStorage)
 	if err != nil {
 		return 0.0, false, errors.Trace(err)
 	}

br/pkg/lightning/mydump/parser.go

+5 -2

@@ -652,8 +652,11 @@ func ReadUntil(parser Parser, pos int64) error {
 }

 // OpenReader opens a reader for the given file and storage.
-func OpenReader(ctx context.Context, fileMeta SourceFileMeta, store storage.ExternalStorage) (
-	reader storage.ReadSeekCloser, err error) {
+func OpenReader(
+	ctx context.Context,
+	fileMeta *SourceFileMeta,
+	store storage.ExternalStorage,
+) (reader storage.ReadSeekCloser, err error) {
 	switch {
 	case fileMeta.Type == SourceTypeParquet:
 		reader, err = OpenParquetReader(ctx, store, fileMeta.Path, fileMeta.FileSize)
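For orientation, a minimal sketch of the new call shape, not taken from the commit: the caller fills in a SourceFileMeta (including the Compression field that the executor/importer hunks below start populating) and passes a pointer. The helper name and the SourceTypeCSV choice here are illustrative assumptions.

package example // hypothetical

import (
	"context"

	"github.com/pingcap/tidb/br/pkg/lightning/mydump"
	"github.com/pingcap/tidb/br/pkg/storage"
)

// openDataFile infers the compression type from the file name (via the helper
// added in router.go below), then opens the file through the unified entry
// point; per this commit, the returned reader decompresses transparently.
func openDataFile(ctx context.Context, store storage.ExternalStorage, path string, size int64) (storage.ReadSeekCloser, error) {
	meta := mydump.SourceFileMeta{
		Path:        path,
		FileSize:    size, // size of the stored (possibly compressed) file
		Compression: mydump.ParseCompressionOnFileExtension(path),
		Type:        mydump.SourceTypeCSV, // assumed source type for this sketch
	}
	return mydump.OpenReader(ctx, &meta, store)
}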

br/pkg/lightning/mydump/router.go

+15

@@ -2,6 +2,7 @@ package mydump

 import (
 	"net/url"
+	"path/filepath"
 	"regexp"
 	"strconv"
 	"strings"
@@ -128,6 +129,20 @@ func (s SourceType) String() string {
 	}
 }

+// ParseCompressionOnFileExtension parses the compression type from the file extension.
+func ParseCompressionOnFileExtension(filename string) Compression {
+	fileExt := strings.ToLower(filepath.Ext(filename))
+	if len(fileExt) == 0 {
+		return CompressionNone
+	}
+	tp, err := parseCompressionType(fileExt[1:])
+	if err != nil {
+		// file extension is not a compression type, just ignore it
+		return CompressionNone
+	}
+	return tp
+}
+
 func parseCompressionType(t string) (Compression, error) {
 	switch strings.ToLower(strings.TrimSpace(t)) {
 	case "gz", "gzip":

br/pkg/storage/compress.go

+7 -4

@@ -46,7 +46,7 @@ func (w *withCompression) Open(ctx context.Context, path string) (ExternalFileRe
 	if err != nil {
 		return nil, errors.Trace(err)
 	}
-	uncompressReader, err := newInterceptReader(fileReader, w.compressType)
+	uncompressReader, err := InterceptDecompressReader(fileReader, w.compressType)
 	if err != nil {
 		return nil, errors.Trace(err)
 	}
@@ -87,8 +87,11 @@ type compressReader struct {
 	io.Closer
 }

-// nolint:interfacer
-func newInterceptReader(fileReader ExternalFileReader, compressType CompressType) (ExternalFileReader, error) {
+// InterceptDecompressReader intercepts the reader and wraps it with a decompress
+// reader on the given io.ReadSeekCloser. Note that the returned
+// io.ReadSeekCloser does not have the property that Seek(0, io.SeekCurrent)
+// equals total bytes Read() if the decompress reader is used.
+func InterceptDecompressReader(fileReader io.ReadSeekCloser, compressType CompressType) (io.ReadSeekCloser, error) {
 	if compressType == NoCompression {
 		return fileReader, nil
 	}
@@ -114,7 +117,7 @@ func NewLimitedInterceptReader(fileReader ExternalFileReader, compressType Compr
 			Closer: fileReader,
 		}
 	}
-	return newInterceptReader(newFileReader, compressType)
+	return InterceptDecompressReader(newFileReader, compressType)
 }

 func (c *compressReader) Seek(offset int64, whence int) (int64, error) {
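Because the helper is now exported, any io.ReadSeekCloser can be wrapped, not just an ExternalFileReader. A minimal sketch under two assumptions: that a storage.Gzip constant exists alongside the NoCompression constant visible above, and that a local *os.File (which satisfies io.ReadSeekCloser) is an acceptable input:

package example // hypothetical

import (
	"io"
	"os"

	"github.com/pingcap/tidb/br/pkg/storage"
)

// openGzippedLocal wraps a local gzip file so callers read decompressed bytes.
// As the doc comment above warns, Seek(0, io.SeekCurrent) on the result does
// not report the number of decompressed bytes read.
func openGzippedLocal(path string) (io.ReadSeekCloser, error) {
	f, err := os.Open(path) // *os.File implements io.ReadSeekCloser
	if err != nil {
		return nil, err
	}
	return storage.InterceptDecompressReader(f, storage.Gzip)
}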

br/pkg/storage/storage.go

+1 -2

@@ -107,8 +107,7 @@ type ExternalStorage interface {

 // ExternalFileReader represents the streaming external file reader.
 type ExternalFileReader interface {
-	io.ReadCloser
-	io.Seeker
+	io.ReadSeekCloser
 }

 // ExternalFileWriter represents the streaming external file writer.
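This hunk is behavior-preserving: io.ReadSeekCloser, added in Go 1.16, bundles exactly the two embedded interfaces it replaces plus io.Closer (already present via io.ReadCloser), so existing ExternalFileReader implementations keep compiling. For reference, the standard library definition:

// From the Go standard library (io package, go1.16+):
type ReadSeekCloser interface {
	Reader
	Seeker
	Closer
}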

executor/importer/import.go

+9 -5

@@ -669,9 +669,11 @@ func (e *LoadDataController) InitDataFiles(ctx context.Context) error {
 		if err3 != nil {
 			return exeerrors.ErrLoadDataCantRead.GenWithStackByArgs(GetMsgFromBRError(err2), "failed to read file size by seek in LOAD DATA")
 		}
+		compressTp := mydump.ParseCompressionOnFileExtension(path)
 		dataFiles = append(dataFiles, &mydump.SourceFileMeta{
-			Path:     path,
-			FileSize: size,
+			Path:        path,
+			FileSize:    size,
+			Compression: compressTp,
 		})
 	} else {
 		commonPrefix := path[:idx]
@@ -685,9 +687,11 @@ func (e *LoadDataController) InitDataFiles(ctx context.Context) error {
 		if !match {
 			return nil
 		}
+		compressTp := mydump.ParseCompressionOnFileExtension(remotePath)
 		dataFiles = append(dataFiles, &mydump.SourceFileMeta{
-			Path:     remotePath,
-			FileSize: size,
+			Path:        remotePath,
+			FileSize:    size,
+			Compression: compressTp,
 		})
 		return nil
 	})
@@ -708,7 +712,7 @@ func (e *LoadDataController) GetLoadDataReaderInfos() []LoadDataReaderInfo {
 		f := e.dataFiles[i]
 		result = append(result, LoadDataReaderInfo{
 			Opener: func(ctx context.Context) (io.ReadSeekCloser, error) {
-				fileReader, err2 := e.dataStore.Open(ctx, f.Path)
+				fileReader, err2 := mydump.OpenReader(ctx, f, e.dataStore)
 				if err2 != nil {
 					return nil, exeerrors.ErrLoadDataCantRead.GenWithStackByArgs(GetMsgFromBRError(err2), "Please check the INFILE path is correct")
 				}

executor/importer/table_import.go

+1 -1

@@ -198,7 +198,7 @@ var _ io.Closer = &tableImporter{}
 func (ti *tableImporter) getParser(ctx context.Context, chunk *checkpoints.ChunkCheckpoint) (mydump.Parser, error) {
 	info := LoadDataReaderInfo{
 		Opener: func(ctx context.Context) (io.ReadSeekCloser, error) {
-			reader, err := mydump.OpenReader(ctx, chunk.FileMeta, ti.dataStore)
+			reader, err := mydump.OpenReader(ctx, &chunk.FileMeta, ti.dataStore)
 			if err != nil {
 				return nil, errors.Trace(err)
 			}

executor/loadremotetest/multi_file_test.go

+40

@@ -16,11 +16,13 @@ package loadremotetest

 import (
 	"bytes"
+	"compress/gzip"
 	"fmt"
 	"strconv"

 	"github.com/fsouza/fake-gcs-server/fakestorage"
 	"github.com/pingcap/tidb/testkit"
+	"github.com/stretchr/testify/require"
 )

 func (s *mockGCSSuite) TestFilenameAsterisk() {
@@ -131,3 +133,41 @@ func (s *mockGCSSuite) TestMultiBatchWithIgnoreLines() {
 		"13", "14", "15", "16", "17", "18", "19", "20",
 	))
 }
+
+func (s *mockGCSSuite) TestMixedCompression() {
+	s.tk.MustExec("DROP DATABASE IF EXISTS multi_load;")
+	s.tk.MustExec("CREATE DATABASE multi_load;")
+	s.tk.MustExec("CREATE TABLE multi_load.t (i INT PRIMARY KEY, s varchar(32));")
+
+	// gzip content
+	var buf bytes.Buffer
+	w := gzip.NewWriter(&buf)
+	_, err := w.Write([]byte("1\ttest1\n" +
+		"2\ttest2"))
+	require.NoError(s.T(), err)
+	err = w.Close()
+	require.NoError(s.T(), err)
+
+	s.server.CreateObject(fakestorage.Object{
+		ObjectAttrs: fakestorage.ObjectAttrs{
+			BucketName: "test-multi-load",
+			Name:       "compress.001.tsv.gz",
+		},
+		Content: buf.Bytes(),
+	})
+	s.server.CreateObject(fakestorage.Object{
+		ObjectAttrs: fakestorage.ObjectAttrs{
+			BucketName: "test-multi-load",
+			Name:       "compress.002.tsv",
+		},
+		Content: []byte("3\ttest3\n" +
+			"4\ttest4"),
+	})
+
+	sql := fmt.Sprintf(`LOAD DATA INFILE 'gs://test-multi-load/compress.*?endpoint=%s'
+		INTO TABLE multi_load.t WITH thread=1;`, gcsEndpoint)
+	s.tk.MustExec(sql)
+	s.tk.MustQuery("SELECT * FROM multi_load.t;").Check(testkit.Rows(
+		"1 test1", "2 test2", "3 test3", "4 test4",
+	))
+}

server/BUILD.bazel

+2

@@ -28,6 +28,8 @@ go_library(
     visibility = ["//visibility:public"],
     deps = [
         "//autoid_service",
+        "//br/pkg/lightning/mydump",
+        "//br/pkg/storage",
         "//config",
         "//ddl",
         "//domain",

server/conn.go

+11 -3

@@ -56,6 +56,8 @@ import (

 	"github.com/pingcap/errors"
 	"github.com/pingcap/failpoint"
+	"github.com/pingcap/tidb/br/pkg/lightning/mydump"
+	"github.com/pingcap/tidb/br/pkg/storage"
 	"github.com/pingcap/tidb/config"
 	"github.com/pingcap/tidb/domain/infosync"
 	"github.com/pingcap/tidb/errno"
@@ -1580,8 +1582,13 @@ func (cc *clientConn) handleLoadData(ctx context.Context, loadDataWorker *execut
 	if loadDataWorker == nil {
 		return errors.New("load data info is empty")
 	}
-
-	err := cc.writeReq(ctx, loadDataWorker.GetInfilePath())
+	infile := loadDataWorker.GetInfilePath()
+	compressTp := mydump.ParseCompressionOnFileExtension(infile)
+	compressTp2, err := mydump.ToStorageCompressType(compressTp)
+	if err != nil {
+		return err
+	}
+	err = cc.writeReq(ctx, infile)
 	if err != nil {
 		return err
 	}
@@ -1628,7 +1635,8 @@ func (cc *clientConn) handleLoadData(ctx context.Context, loadDataWorker *execut
 	ctx = kv.WithInternalSourceType(ctx, kv.InternalLoadData)
 	_, err = loadDataWorker.Load(ctx, []importer.LoadDataReaderInfo{{
 		Opener: func(_ context.Context) (io.ReadSeekCloser, error) {
-			return executor.NewSimpleSeekerOnReadCloser(r), nil
+			addedSeekReader := executor.NewSimpleSeekerOnReadCloser(r)
+			return storage.InterceptDecompressReader(addedSeekReader, compressTp2)
 		}}})
 	_ = r.Close()
 	wg.Wait()
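Note the two compression enums in play here: mydump.Compression (parsed from the client-declared file name) and the storage layer's CompressType (consumed by InterceptDecompressReader), bridged by mydump.ToStorageCompressType. A condensed restatement of the flow above, error handling elided, with r standing for the client-data reader from the surrounding code:

// Only the extension of the path the client names matters here.
infile := loadDataWorker.GetInfilePath()                     // e.g. "/tmp/data.tsv.gz"
compressTp := mydump.ParseCompressionOnFileExtension(infile) // mydump.Compression
compressTp2, _ := mydump.ToStorageCompressType(compressTp)   // storage.CompressType
// The client stream is not seekable, so adapt it first, then layer the
// decompressor on top; Seek support on the result is limited accordingly.
seeker := executor.NewSimpleSeekerOnReadCloser(r)
reader, _ := storage.InterceptDecompressReader(seeker, compressTp2)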
