Skip to content

Commit 2df8ad7

Browse files
author
ffffwh
committed
compressing: replace snappy with gzip
1 parent 24118c7 commit 2df8ad7

File tree

2 files changed

+26
-5
lines changed

2 files changed

+26
-5
lines changed

drivers/mysql/common/common.go

+22-3
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,14 @@ package common
22

33
import (
44
"bytes"
5+
compress "compress/gzip"
56
"encoding/gob"
67
"fmt"
78
"github.com/actiontech/dtle/g"
8-
"github.com/golang/snappy"
99
"github.com/pingcap/tidb/types"
1010
"github.com/satori/go.uuid"
1111
"github.com/siddontang/go-mysql/mysql"
12+
"io/ioutil"
1213
"os"
1314
"time"
1415
)
@@ -56,16 +57,34 @@ func EncodeTable(v *Table) ([]byte, error) {
5657
}
5758
return b.Bytes(), nil
5859
}
60+
func Compress(bs []byte) (outBs []byte, err error) {
61+
var buf bytes.Buffer
62+
w, _ := compress.NewWriterLevel(&buf, compress.BestSpeed)
63+
_, err = w.Write(bs)
64+
if err != nil {
65+
return nil, err
66+
}
67+
err = w.Close()
68+
if err != nil {
69+
return nil, err
70+
}
71+
return buf.Bytes(), nil
72+
}
5973
func Encode(v GencodeType) ([]byte, error) {
6074
bs, err := v.Marshal(nil)
6175
if err != nil {
6276
return nil, err
6377
}
64-
return snappy.Encode(nil, bs), nil
78+
79+
return Compress(bs)
6580
}
6681

6782
func Decode(data []byte, out GencodeType) (err error) {
68-
msg, err := snappy.Decode(nil, data)
83+
r, err := compress.NewReader(bytes.NewReader(data))
84+
if err != nil {
85+
return err
86+
}
87+
msg, err := ioutil.ReadAll(r)
6988
if err != nil {
7089
return err
7190
}

drivers/mysql/mysql/extractor.go

+4-2
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ import (
2727
"sync/atomic"
2828
"time"
2929

30-
"github.com/golang/snappy"
3130
gonats "github.com/nats-io/go-nats"
3231
gomysql "github.com/siddontang/go-mysql/mysql"
3332

@@ -1447,7 +1446,10 @@ func (e *Extractor) encodeAndSendDumpEntry(entry *common.DumpEntry) error {
14471446
if err != nil {
14481447
return err
14491448
}
1450-
txMsg := snappy.Encode(nil, bs)
1449+
txMsg, err := common.Compress(bs)
1450+
if err != nil {
1451+
return errors.Wrap(err, "common.Compress")
1452+
}
14511453
if err := e.publish(ctx, fmt.Sprintf("%s_full", e.subject), txMsg, 0); err != nil {
14521454
return err
14531455
}

0 commit comments

Comments
 (0)