Skip to content

Commit 24d6424

Browse files
author
ffffwh
committed
use context
1 parent 3e38983 commit 24d6424

15 files changed

+66
-56
lines changed

api/handler/v2/database.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package v2
22

33
import (
4+
"context"
45
"fmt"
56
"net/http"
67
"strings"
@@ -123,7 +124,7 @@ func listOracleSchema(logger hclog.Logger, reqParam *models.ListDatabaseSchemasR
123124
}
124125
reqParam.Password = realPwd
125126
}
126-
oracleDb, err := config.NewDB(&config.OracleConfig{
127+
oracleDb, err := config.NewDB(context.TODO(), &config.OracleConfig{
127128
User: reqParam.User,
128129
Password: reqParam.Password,
129130
Host: reqParam.Host,
@@ -206,7 +207,7 @@ func ListDatabaseColumnsV2(c echo.Context) error {
206207
}
207208
reqParam.Password = realPwd
208209
}
209-
oracleDb, err := config.NewDB(&config.OracleConfig{
210+
oracleDb, err := config.NewDB(context.TODO(), &config.OracleConfig{
210211
User: reqParam.User,
211212
Password: reqParam.Password,
212213
Host: reqParam.Host,

driver/common/dumper.go

+4-1
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
package common
88

99
import (
10+
"context"
1011
"sync"
1112

1213
"github.com/actiontech/dtle/g"
@@ -16,6 +17,7 @@ type GetChunkDataFn func() (nRows int64, err error)
1617
type PrepareFn func() (err error)
1718

1819
type Dumper struct {
20+
Ctx context.Context
1921
Logger g.LoggerType
2022
ChunkSize int64
2123
TableSchema string
@@ -36,9 +38,10 @@ type Dumper struct {
3638
PrepareForDumping PrepareFn
3739
}
3840

39-
func NewDumper(table *Table, chunkSize int64, logger g.LoggerType, memory *int64) *Dumper {
41+
func NewDumper(ctx context.Context, table *Table, chunkSize int64, logger g.LoggerType, memory *int64) *Dumper {
4042

4143
dumper := &Dumper{
44+
Ctx: ctx,
4245
Logger: logger,
4346
TableSchema: table.TableSchema,
4447
TableName: table.TableName,

driver/driver.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -560,7 +560,7 @@ func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *drive
560560
return nil, nil, errors.Wrap(err, "SetDriverState")
561561
}
562562

563-
h := newDtleTaskHandle(d.logger, cfg, drivers.TaskStateRunning, time.Now().Round(time.Millisecond))
563+
h := newDtleTaskHandle(d.ctx, d.logger, cfg, drivers.TaskStateRunning, time.Now().Round(time.Millisecond))
564564
h.driverConfig = &common.MySQLDriverConfig{DtleTaskConfig: dtleTaskConfig}
565565
d.tasks.Set(cfg.ID, h)
566566
AllocIdTaskNameToTaskHandler.Set(cfg.AllocID, cfg.Name, cfg.ID, h)

driver/handle.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ type taskHandle struct {
4141
driverConfig *common.MySQLDriverConfig
4242
}
4343

44-
func newDtleTaskHandle(logger g.LoggerType, cfg *drivers.TaskConfig, state drivers.TaskState, started time.Time) *taskHandle {
44+
func newDtleTaskHandle(ctx context.Context, logger g.LoggerType, cfg *drivers.TaskConfig, state drivers.TaskState, started time.Time) *taskHandle {
4545
h := &taskHandle{
4646
logger: logger,
4747
stateLock: sync.RWMutex{},
@@ -52,7 +52,7 @@ func newDtleTaskHandle(logger g.LoggerType, cfg *drivers.TaskConfig, state drive
5252
exitResult: nil,
5353
waitCh: make(chan *drivers.ExitResult, 1),
5454
}
55-
h.ctx, h.cancelFunc = context.WithCancel(context.TODO())
55+
h.ctx, h.cancelFunc = context.WithCancel(ctx)
5656
return h
5757
}
5858

@@ -168,7 +168,7 @@ func (h *taskHandle) NewRunner(d *Driver) (runner DriverHandle, err error) {
168168
case common.TaskTypeSrc:
169169
if h.driverConfig.OracleConfig != nil {
170170
h.logger.Debug("found oracle src", "OracleConfig", h.driverConfig.OracleConfig)
171-
runner, err = extractor.NewExtractorOracle(ctx, h.driverConfig, h.logger, d.storeManager, h.waitCh)
171+
runner, err = extractor.NewExtractorOracle(ctx, h.driverConfig, h.logger, d.storeManager, h.waitCh, h.ctx)
172172
if err != nil {
173173
return nil, errors.Wrap(err, "NewExtractor")
174174
}

driver/mysql/applier.go

+2-3
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,6 @@ func NewApplier(
106106
logger.Info("NewApplier", "job", execCtx.Subject)
107107

108108
a = &Applier{
109-
ctx: ctx,
110109
logger: logger.Named("applier").With("job", execCtx.Subject),
111110
subject: execCtx.Subject,
112111
mysqlContext: cfg,
@@ -125,7 +124,7 @@ func NewApplier(
125124
taskConfig: taskConfig,
126125
}
127126

128-
a.ctx, a.cancelFunc = context.WithCancel(context.TODO())
127+
a.ctx, a.cancelFunc = context.WithCancel(ctx)
129128

130129
stubFullApplyDelayStr := os.Getenv(g.ENV_FULL_APPLY_DELAY)
131130
if stubFullApplyDelayStr == "" {
@@ -668,7 +667,7 @@ func (a *Applier) initDBConnections() (err error) {
668667
}
669668
a.db.SetMaxOpenConns(10 + a.mysqlContext.ParallelWorkers)
670669
a.logger.Debug("CreateConns", "ParallelWorkers", a.mysqlContext.ParallelWorkers)
671-
if a.dbs, err = sql.CreateConns(a.db, a.mysqlContext.ParallelWorkers); err != nil {
670+
if a.dbs, err = sql.CreateConns(a.ctx, a.db, a.mysqlContext.ParallelWorkers); err != nil {
672671
a.logger.Debug("beging connetion mysql 2 create conns err")
673672
return err
674673
}

driver/mysql/applier_gtid_executed.go

+2-3
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package mysql
22

33
import (
4-
"context"
54
gosql "database/sql"
65
"fmt"
76
"strconv"
@@ -263,7 +262,7 @@ func (a *ApplierIncr) cleanGtidExecuted(sid uuid.UUID, txSid string) error {
263262
// However, consider `binlog_group_commit_sync_delay > 0`,
264263
// `begin; delete; insert; commit;` (1 TX) is faster than `insert; delete;` (2 TX)
265264
dbApplier := a.dbs[0]
266-
tx, err := dbApplier.Db.BeginTx(context.Background(), &gosql.TxOptions{})
265+
tx, err := dbApplier.Db.BeginTx(a.ctx, &gosql.TxOptions{})
267266
if err != nil {
268267
return err
269268
}
@@ -282,7 +281,7 @@ func (a *ApplierIncr) cleanGtidExecuted(sid uuid.UUID, txSid string) error {
282281
}
283282

284283
a.logger.Debug("compactation gtid. new interval", "intervalStr", intervalStr)
285-
_, err = dbApplier.Db.ExecContext(context.TODO(),
284+
_, err = dbApplier.Db.ExecContext(a.ctx,
286285
fmt.Sprintf("insert into %v.%v values (?,?,0,?)", g.DtleSchemaName, g.GtidExecutedTableV4),
287286
a.subject, sid.Bytes(), intervalStr)
288287
if err != nil {

driver/mysql/binlog/binlog_reader.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -959,7 +959,7 @@ func (b *BinlogReader) DataStreamEvents(entriesChannel chan<- *common.EntryConte
959959
}
960960
b.logger.Trace("b.HasBigTx.Wait. after")
961961

962-
ev, err := b.binlogStreamer.GetEvent(context.Background())
962+
ev, err := b.binlogStreamer.GetEvent(b.ctx)
963963
if err != nil {
964964
b.logger.Error("error GetEvent.", "err", err)
965965
return err

driver/mysql/dumper.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
package mysql
88

99
import (
10+
"context"
1011
"fmt"
1112
"os"
1213
"strings"
@@ -36,10 +37,10 @@ type dumper struct {
3637
sentTableDef bool
3738
}
3839

39-
func NewDumper(db usql.QueryAble, table *common.Table, chunkSize int64,
40+
func NewDumper(ctx context.Context, db usql.QueryAble, table *common.Table, chunkSize int64,
4041
logger g.LoggerType, memory *int64) *dumper {
4142
dumper := &dumper{
42-
common.NewDumper(table, chunkSize, logger, memory),
43+
common.NewDumper(ctx, table, chunkSize, logger, memory),
4344
umconf.EscapeName(table.TableSchema),
4445
umconf.EscapeName(table.TableName),
4546
db,

driver/mysql/extractor.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -1404,7 +1404,7 @@ func (e *Extractor) mysqlDump() error {
14041404
e.logger.Info("Step n: - scanning table (i of N tables)",
14051405
"n", step, "schema", t.TableSchema, "table", t.TableName, "i", counter, "N", e.tableCount)
14061406

1407-
d := NewDumper(tx, t, e.mysqlContext.ChunkSize, e.logger.ResetNamed("dumper"), e.memory1)
1407+
d := NewDumper(e.ctx, tx, t, e.mysqlContext.ChunkSize, e.logger.ResetNamed("dumper"), e.memory1)
14081408
if err := d.Dump(); err != nil {
14091409
e.onError(common.TaskStateDead, err)
14101410
}

driver/mysql/sql/sqlutils.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -148,15 +148,15 @@ func CreateDB(mysql_uri string) (*gosql.DB, error) {
148148
return db, nil
149149
}
150150

151-
func CreateConns(db *gosql.DB, count int) ([]*Conn, error) {
151+
func CreateConns(ctx context.Context, db *gosql.DB, count int) ([]*Conn, error) {
152152
conns := make([]*Conn, count)
153153
for i := 0; i < count; i++ {
154-
conn, err := db.Conn(context.Background())
154+
conn, err := db.Conn(ctx)
155155
if err != nil {
156156
return nil, err
157157
}
158158

159-
_, err = conn.ExecContext(context.Background(), "SET @@session.foreign_key_checks = 0")
159+
_, err = conn.ExecContext(ctx, "SET @@session.foreign_key_checks = 0")
160160
if err != nil {
161161
return nil, err
162162
}

driver/oracle/config/db_config.go

+19-16
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ type OracleConfig struct {
2020
}
2121

2222
type OracleDB struct {
23+
ctx context.Context
2324
_db *sql.DB
2425
LogMinerConn *sql.Conn
2526
MetaDataConn *sql.Conn
@@ -42,7 +43,7 @@ func OpenDb(meta *OracleConfig) (*sql.DB, error) {
4243
return sqlDb, nil
4344
}
4445

45-
func NewDB(meta *OracleConfig) (*OracleDB, error) {
46+
func NewDB(ctx context.Context, meta *OracleConfig) (*OracleDB, error) {
4647
sqlDB, err := OpenDb(meta)
4748
if err != nil {
4849
return nil, err
@@ -51,13 +52,16 @@ func NewDB(meta *OracleConfig) (*OracleDB, error) {
5152
if err != nil {
5253
return nil, err
5354
}
54-
oracleDB := &OracleDB{_db: sqlDB}
55+
oracleDB := &OracleDB{
56+
ctx: ctx,
57+
_db: sqlDB,
58+
}
5559

56-
oracleDB.LogMinerConn, err = sqlDB.Conn(context.TODO())
60+
oracleDB.LogMinerConn, err = sqlDB.Conn(ctx)
5761
if err != nil {
5862
return nil, fmt.Errorf("error on get connection:%v", err)
5963
}
60-
oracleDB.MetaDataConn, err = sqlDB.Conn(context.TODO())
64+
oracleDB.MetaDataConn, err = sqlDB.Conn(ctx)
6165
if err != nil {
6266
return nil, fmt.Errorf("error on get connection:%v", err)
6367
}
@@ -76,7 +80,7 @@ func (o *OracleDB) Close() error {
7680

7781
func (o *OracleDB) CurrentRedoLogSequenceFp() (string, error) {
7882
query := `SELECT GROUP#, THREAD#, SEQUENCE# FROM V$LOG WHERE STATUS = 'CURRENT'`
79-
rows, err := o.LogMinerConn.QueryContext(context.TODO(), query)
83+
rows, err := o.LogMinerConn.QueryContext(o.ctx, query)
8084
if err != nil {
8185
return "", err
8286
}
@@ -101,12 +105,12 @@ func (o *OracleDB) CurrentRedoLogSequenceFp() (string, error) {
101105
// reset date/timestamp format
102106
func (o *OracleDB) NLS_DATE_FORMAT() error {
103107
SQL_ALTER_DATE_FORMAT := `ALTER SESSION SET NLS_DATE_FORMAT = 'SYYYY-MM-DD HH24:MI:SS'`
104-
_, err := o.LogMinerConn.ExecContext(context.TODO(), SQL_ALTER_DATE_FORMAT)
108+
_, err := o.LogMinerConn.ExecContext(o.ctx, SQL_ALTER_DATE_FORMAT)
105109
if err != nil {
106110
return err
107111
}
108112
NLS_TIMESTAMP_FORMAT := "ALTER SESSION SET NLS_TIMESTAMP_FORMAT = 'SYYYY-MM-DD HH24:MI:SS.FF6'"
109-
_, err = o.LogMinerConn.ExecContext(context.TODO(), NLS_TIMESTAMP_FORMAT)
113+
_, err = o.LogMinerConn.ExecContext(o.ctx, NLS_TIMESTAMP_FORMAT)
110114
if err != nil {
111115
return err
112116
}
@@ -126,7 +130,7 @@ FROM
126130
WHERE
127131
owner = '%s'`, asOfSCN, schema)
128132

129-
rows, err := o.MetaDataConn.QueryContext(context.TODO(), query)
133+
rows, err := o.MetaDataConn.QueryContext(o.ctx, query)
130134
if err != nil {
131135
return nil, err
132136
}
@@ -157,7 +161,7 @@ func (o *OracleDB) GetSchemas() ([]string, error) {
157161
WHERE
158162
USERNAME NOT IN ( 'SYS', 'SYSTEM', 'ANONYMOUS', 'APEX_PUBLIC_USER', 'APEX_040000', 'OUTLN', 'XS$NULL', 'FLOWS_FILES', 'MDSYS', 'CTXSYS', 'XDB', 'HR' )`, asOfSCN)
159163

160-
rows, err := o.MetaDataConn.QueryContext(context.TODO(), query)
164+
rows, err := o.MetaDataConn.QueryContext(o.ctx, query)
161165
if err != nil {
162166
return nil, err
163167
}
@@ -187,7 +191,7 @@ func (o *OracleDB) GetColumns(schema, table string) ([]string, error) {
187191
AND owner = '%s'
188192
ORDER BY COLUMN_ID`, asOfSCN, table, schema)
189193

190-
rows, err := o.MetaDataConn.QueryContext(context.TODO(), query)
194+
rows, err := o.MetaDataConn.QueryContext(o.ctx, query)
191195
if err != nil {
192196
return nil, err
193197
}
@@ -206,11 +210,10 @@ func (o *OracleDB) GetColumns(schema, table string) ([]string, error) {
206210
}
207211

208212
func (o *OracleDB) GetTableDDL(schema, table string) (string, error) {
209-
ctx := context.TODO()
210-
o.MetaDataConn.ExecContext(ctx, `begin dbms_metadata.set_transform_param(DBMS_METADATA.SESSION_TRANSFORM, 'STORAGE', false); end;`)
211-
o.MetaDataConn.ExecContext(ctx, `begin dbms_metadata.set_transform_param(DBMS_METADATA.SESSION_TRANSFORM, 'SEGMENT_ATTRIBUTES', false); end;`)
212-
o.MetaDataConn.ExecContext(ctx, `begin dbms_metadata.set_transform_param(DBMS_METADATA.SESSION_TRANSFORM, 'SQLTERMINATOR', true); end;`)
213-
row := o.MetaDataConn.QueryRowContext(ctx, fmt.Sprintf(`
213+
o.MetaDataConn.ExecContext(o.ctx, `begin dbms_metadata.set_transform_param(DBMS_METADATA.SESSION_TRANSFORM, 'STORAGE', false); end;`)
214+
o.MetaDataConn.ExecContext(o.ctx, `begin dbms_metadata.set_transform_param(DBMS_METADATA.SESSION_TRANSFORM, 'SEGMENT_ATTRIBUTES', false); end;`)
215+
o.MetaDataConn.ExecContext(o.ctx, `begin dbms_metadata.set_transform_param(DBMS_METADATA.SESSION_TRANSFORM, 'SQLTERMINATOR', true); end;`)
216+
row := o.MetaDataConn.QueryRowContext(o.ctx, fmt.Sprintf(`
214217
SELECT dbms_metadata.get_ddl('TABLE','%s','%s') FROM dual`, table, schema))
215218
var query string
216219
err := row.Scan(&query)
@@ -231,7 +234,7 @@ func (o *OracleDB) NewTx(ctx context.Context) (*sql.Tx, error) {
231234
func (o *OracleDB) GetCurrentSnapshotSCN() (int64, error) {
232235
var globalSCN int64
233236
// 获取当前 SCN 号
234-
err := o.MetaDataConn.QueryRowContext(context.TODO(), "SELECT CURRENT_SCN FROM V$DATABASE").Scan(&globalSCN)
237+
err := o.MetaDataConn.QueryRowContext(o.ctx, "SELECT CURRENT_SCN FROM V$DATABASE").Scan(&globalSCN)
235238
if err != nil {
236239
return 0, err
237240
}

driver/oracle/extractor/dumper.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,10 @@ type dumper struct {
1818
snapshotSCN int64
1919
}
2020

21-
func NewDumper(db *config.OracleDB, table *common.Table, chunkSize int64,
21+
func NewDumper(ctx context.Context, db *config.OracleDB, table *common.Table, chunkSize int64,
2222
logger g.LoggerType, memory *int64, scn int64) *dumper {
2323
d := &dumper{
24-
common.NewDumper(table, chunkSize, logger, memory),
24+
common.NewDumper(ctx, table, chunkSize, logger, memory),
2525
db,
2626
scn,
2727
}
@@ -74,7 +74,7 @@ func (d *dumper) getChunkData() (nRows int64, err error) {
7474
keepGoing = false
7575
case <-timer.C:
7676
d.Logger.Debug("resultsChannel full. waiting and ping conn")
77-
errPing := d.db.MetaDataConn.PingContext(context.TODO())
77+
errPing := d.db.MetaDataConn.PingContext(d.Ctx)
7878
if errPing != nil {
7979
d.Logger.Debug("ping query row got error.", "err", errPing)
8080
}
@@ -101,7 +101,7 @@ func (d *dumper) getChunkData() (nRows int64, err error) {
101101

102102
// this must be increased after building query
103103
d.Iteration += 1
104-
rows, err := d.db.MetaDataConn.QueryContext(context.TODO(), query)
104+
rows, err := d.db.MetaDataConn.QueryContext(d.Ctx, query)
105105
if err != nil {
106106
newErr := fmt.Errorf("error at select chunk. err: %v", err)
107107
d.Logger.Error(newErr.Error())

0 commit comments

Comments
 (0)