From 54dfd6005bcdd599604d42a6771cdf5a1025d7d9 Mon Sep 17 00:00:00 2001 From: Noble Mittal <62551163+beingnoble03@users.noreply.github.com> Date: Sat, 11 Jan 2025 02:01:48 +0530 Subject: [PATCH] Move VDiff related workflow server APIs to `vdiff.go` and add unit tests (#17466) Signed-off-by: Noble Mittal --- go/vt/vtctl/workflow/server.go | 284 ---------------------- go/vt/vtctl/workflow/server_test.go | 253 -------------------- go/vt/vtctl/workflow/vdiff.go | 291 +++++++++++++++++++++++ go/vt/vtctl/workflow/vdiff_test.go | 355 ++++++++++++++++++++++++++++ 4 files changed, 646 insertions(+), 537 deletions(-) diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index 8123416eb41..f27851275b6 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -20,7 +20,6 @@ import ( "context" "errors" "fmt" - "math" "slices" "sort" "strings" @@ -28,7 +27,6 @@ import ( "text/template" "time" - "github.com/google/uuid" "golang.org/x/sync/errgroup" "golang.org/x/sync/semaphore" "google.golang.org/grpc/codes" @@ -41,7 +39,6 @@ import ( "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/trace" "vitess.io/vitess/go/vt/concurrency" - "vitess.io/vitess/go/vt/discovery" "vitess.io/vitess/go/vt/key" "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/logutil" @@ -1249,287 +1246,6 @@ func (s *Server) ReshardCreate(ctx context.Context, req *vtctldatapb.ReshardCrea }) } -// VDiffCreate is part of the vtctlservicepb.VtctldServer interface. -// It passes on the request to the target primary tablets that are -// participating in the given workflow and VDiff. -func (s *Server) VDiffCreate(ctx context.Context, req *vtctldatapb.VDiffCreateRequest) (*vtctldatapb.VDiffCreateResponse, error) { - span, ctx := trace.NewSpan(ctx, "workflow.Server.VDiffCreate") - defer span.Finish() - - span.Annotate("keyspace", req.TargetKeyspace) - span.Annotate("workflow", req.Workflow) - span.Annotate("uuid", req.Uuid) - span.Annotate("source_cells", req.SourceCells) - span.Annotate("target_cells", req.TargetCells) - span.Annotate("tablet_types", req.TabletTypes) - span.Annotate("tables", req.Tables) - span.Annotate("auto_retry", req.AutoRetry) - span.Annotate("max_diff_duration", req.MaxDiffDuration) - if req.AutoStart != nil { - span.Annotate("auto_start", req.GetAutoStart()) - } - - var err error - req.Uuid = strings.TrimSpace(req.Uuid) - if req.Uuid == "" { // Generate a UUID - req.Uuid = uuid.New().String() - } else { // Validate UUID if provided - if err = uuid.Validate(req.Uuid); err != nil { - return nil, vterrors.Wrapf(err, "invalid UUID provided: %s", req.Uuid) - } - } - - tabletTypesStr := discovery.BuildTabletTypesString(req.TabletTypes, req.TabletSelectionPreference) - - if req.Limit == 0 { // This would produce no useful results - req.Limit = math.MaxInt64 - } - // This is a pointer so there's no ZeroValue in the message - // and an older v18 client will not provide it. - if req.MaxDiffDuration == nil { - req.MaxDiffDuration = &vttimepb.Duration{} - } - // The other vttime.Duration vars should not be nil as the - // client should always provide them, but we check anyway to - // be safe. - if req.FilteredReplicationWaitTime == nil { - // A value of 0 is not valid as the vdiff will never succeed. - req.FilteredReplicationWaitTime = &vttimepb.Duration{ - Seconds: int64(DefaultTimeout.Seconds()), - } - } - if req.WaitUpdateInterval == nil { - req.WaitUpdateInterval = &vttimepb.Duration{} - } - - autoStart := true - if req.AutoStart != nil { - autoStart = req.GetAutoStart() - } - - options := &tabletmanagerdatapb.VDiffOptions{ - PickerOptions: &tabletmanagerdatapb.VDiffPickerOptions{ - TabletTypes: tabletTypesStr, - SourceCell: strings.Join(req.SourceCells, ","), - TargetCell: strings.Join(req.TargetCells, ","), - }, - CoreOptions: &tabletmanagerdatapb.VDiffCoreOptions{ - Tables: strings.Join(req.Tables, ","), - AutoRetry: req.AutoRetry, - MaxRows: req.Limit, - TimeoutSeconds: req.FilteredReplicationWaitTime.Seconds, - MaxExtraRowsToCompare: req.MaxExtraRowsToCompare, - UpdateTableStats: req.UpdateTableStats, - MaxDiffSeconds: req.MaxDiffDuration.Seconds, - AutoStart: &autoStart, - }, - ReportOptions: &tabletmanagerdatapb.VDiffReportOptions{ - OnlyPks: req.OnlyPKs, - DebugQuery: req.DebugQuery, - MaxSampleRows: req.MaxReportSampleRows, - RowDiffColumnTruncateAt: req.RowDiffColumnTruncateAt, - }, - } - - tabletreq := &tabletmanagerdatapb.VDiffRequest{ - Keyspace: req.TargetKeyspace, - Workflow: req.Workflow, - Action: string(vdiff.CreateAction), - Options: options, - VdiffUuid: req.Uuid, - } - - ts, err := s.buildTrafficSwitcher(ctx, req.TargetKeyspace, req.Workflow) - if err != nil { - return nil, err - } - if ts.frozen { - return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "invalid VDiff run: writes have been already been switched for workflow %s.%s", - req.TargetKeyspace, req.Workflow) - } - - workflowStatus, err := s.getWorkflowStatus(ctx, req.TargetKeyspace, req.Workflow) - if err != nil { - return nil, err - } - if workflowStatus != binlogdatapb.VReplicationWorkflowState_Running { - s.Logger().Infof("Workflow %s.%s is not running, cannot start VDiff in state %s", req.TargetKeyspace, req.Workflow, workflowStatus) - return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, - "not all streams are running in workflow %s.%s", req.TargetKeyspace, req.Workflow) - } - - err = ts.ForAllTargets(func(target *MigrationTarget) error { - _, err := s.tmc.VDiff(ctx, target.GetPrimary().Tablet, tabletreq) - return err - }) - if err != nil { - s.Logger().Errorf("Error executing vdiff create action: %v", err) - return nil, err - } - - return &vtctldatapb.VDiffCreateResponse{ - UUID: req.Uuid, - }, nil -} - -// VDiffDelete is part of the vtctlservicepb.VtctldServer interface. -func (s *Server) VDiffDelete(ctx context.Context, req *vtctldatapb.VDiffDeleteRequest) (*vtctldatapb.VDiffDeleteResponse, error) { - span, ctx := trace.NewSpan(ctx, "workflow.Server.VDiffDelete") - defer span.Finish() - - span.Annotate("keyspace", req.TargetKeyspace) - span.Annotate("workflow", req.Workflow) - span.Annotate("argument", req.Arg) - - tabletreq := &tabletmanagerdatapb.VDiffRequest{ - Keyspace: req.TargetKeyspace, - Workflow: req.Workflow, - Action: string(vdiff.DeleteAction), - ActionArg: req.Arg, - } - - ts, err := s.buildTrafficSwitcher(ctx, req.TargetKeyspace, req.Workflow) - if err != nil { - return nil, err - } - - err = ts.ForAllTargets(func(target *MigrationTarget) error { - _, err := s.tmc.VDiff(ctx, target.GetPrimary().Tablet, tabletreq) - return err - }) - if err != nil { - s.Logger().Errorf("Error executing vdiff delete action: %v", err) - return nil, err - } - - return &vtctldatapb.VDiffDeleteResponse{}, nil -} - -// VDiffResume is part of the vtctlservicepb.VtctldServer interface. -func (s *Server) VDiffResume(ctx context.Context, req *vtctldatapb.VDiffResumeRequest) (*vtctldatapb.VDiffResumeResponse, error) { - span, ctx := trace.NewSpan(ctx, "workflow.Server.VDiffResume") - defer span.Finish() - - targetShards := req.GetTargetShards() - - span.Annotate("keyspace", req.TargetKeyspace) - span.Annotate("workflow", req.Workflow) - span.Annotate("uuid", req.Uuid) - span.Annotate("target_shards", targetShards) - - tabletreq := &tabletmanagerdatapb.VDiffRequest{ - Keyspace: req.TargetKeyspace, - Workflow: req.Workflow, - Action: string(vdiff.ResumeAction), - VdiffUuid: req.Uuid, - } - - ts, err := s.buildTrafficSwitcher(ctx, req.TargetKeyspace, req.Workflow) - if err != nil { - return nil, err - } - - if len(targetShards) > 0 { - if err := applyTargetShards(ts, targetShards); err != nil { - return nil, err - } - } - - err = ts.ForAllTargets(func(target *MigrationTarget) error { - _, err := s.tmc.VDiff(ctx, target.GetPrimary().Tablet, tabletreq) - return err - }) - if err != nil { - s.Logger().Errorf("Error executing vdiff resume action: %v", err) - return nil, err - } - - return &vtctldatapb.VDiffResumeResponse{}, nil -} - -// VDiffShow is part of the vtctlservicepb.VtctldServer interface. -func (s *Server) VDiffShow(ctx context.Context, req *vtctldatapb.VDiffShowRequest) (*vtctldatapb.VDiffShowResponse, error) { - span, ctx := trace.NewSpan(ctx, "workflow.Server.VDiffShow") - defer span.Finish() - - span.Annotate("keyspace", req.TargetKeyspace) - span.Annotate("workflow", req.Workflow) - span.Annotate("argument", req.Arg) - - tabletreq := &tabletmanagerdatapb.VDiffRequest{ - Keyspace: req.TargetKeyspace, - Workflow: req.Workflow, - Action: string(vdiff.ShowAction), - ActionArg: req.Arg, - } - - ts, err := s.buildTrafficSwitcher(ctx, req.TargetKeyspace, req.Workflow) - if err != nil { - return nil, err - } - - output := &vdiffOutput{ - responses: make(map[string]*tabletmanagerdatapb.VDiffResponse, len(ts.targets)), - err: nil, - } - output.err = ts.ForAllTargets(func(target *MigrationTarget) error { - resp, err := s.tmc.VDiff(ctx, target.GetPrimary().Tablet, tabletreq) - output.mu.Lock() - defer output.mu.Unlock() - output.responses[target.GetShard().ShardName()] = resp - return err - }) - if output.err != nil { - s.Logger().Errorf("Error executing vdiff show action: %v", output.err) - return nil, output.err - } - return &vtctldatapb.VDiffShowResponse{ - TabletResponses: output.responses, - }, nil -} - -// VDiffStop is part of the vtctlservicepb.VtctldServer interface. -func (s *Server) VDiffStop(ctx context.Context, req *vtctldatapb.VDiffStopRequest) (*vtctldatapb.VDiffStopResponse, error) { - span, ctx := trace.NewSpan(ctx, "workflow.Server.VDiffStop") - defer span.Finish() - - targetShards := req.GetTargetShards() - - span.Annotate("keyspace", req.TargetKeyspace) - span.Annotate("workflow", req.Workflow) - span.Annotate("uuid", req.Uuid) - span.Annotate("target_shards", targetShards) - - tabletreq := &tabletmanagerdatapb.VDiffRequest{ - Keyspace: req.TargetKeyspace, - Workflow: req.Workflow, - Action: string(vdiff.StopAction), - VdiffUuid: req.Uuid, - } - - ts, err := s.buildTrafficSwitcher(ctx, req.TargetKeyspace, req.Workflow) - if err != nil { - return nil, err - } - - if len(targetShards) > 0 { - if err := applyTargetShards(ts, targetShards); err != nil { - return nil, err - } - } - - err = ts.ForAllTargets(func(target *MigrationTarget) error { - _, err := s.tmc.VDiff(ctx, target.GetPrimary().Tablet, tabletreq) - return err - }) - if err != nil { - s.Logger().Errorf("Error executing vdiff stop action: %v", err) - return nil, err - } - - return &vtctldatapb.VDiffStopResponse{}, nil -} - // WorkflowDelete is part of the vtctlservicepb.VtctldServer interface. // It passes on the request to the target primary tablets that are // participating in the given workflow. diff --git a/go/vt/vtctl/workflow/server_test.go b/go/vt/vtctl/workflow/server_test.go index 26d722f1de0..8bb4a06f23a 100644 --- a/go/vt/vtctl/workflow/server_test.go +++ b/go/vt/vtctl/workflow/server_test.go @@ -27,7 +27,6 @@ import ( "testing" "time" - "github.com/google/uuid" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "google.golang.org/protobuf/encoding/prototext" @@ -39,7 +38,6 @@ import ( "vitess.io/vitess/go/vt/topo/topoproto" "vitess.io/vitess/go/vt/topotools" "vitess.io/vitess/go/vt/vtenv" - "vitess.io/vitess/go/vt/vttablet/tabletmanager/vdiff" "vitess.io/vitess/go/vt/vttablet/tmclient" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" @@ -191,257 +189,6 @@ func TestCheckReshardingJournalExistsOnTablet(t *testing.T) { } } -// TestVDiffCreate performs some basic tests of the VDiffCreate function -// to ensure that it behaves as expected given a specific request. -func TestVDiffCreate(t *testing.T) { - ctx := context.Background() - workflowName := "wf1" - sourceKeyspace := &testKeyspace{ - KeyspaceName: "source", - ShardNames: []string{"0"}, - } - targetKeyspace := &testKeyspace{ - KeyspaceName: "target", - ShardNames: []string{"-80", "80-"}, - } - env := newTestEnv(t, ctx, defaultCellName, sourceKeyspace, targetKeyspace) - defer env.close() - - tests := []struct { - name string - req *vtctldatapb.VDiffCreateRequest - wantErr string - }{ - { - name: "no values", - req: &vtctldatapb.VDiffCreateRequest{}, - // We did not provide any keyspace or shard. - wantErr: "FindAllShardsInKeyspace() invalid keyspace name: UnescapeID err: invalid input identifier ''", - }, - { - name: "generated UUID", - req: &vtctldatapb.VDiffCreateRequest{ - TargetKeyspace: targetKeyspace.KeyspaceName, - Workflow: workflowName, - }, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - if tt.wantErr == "" { - env.tmc.expectVRQueryResultOnKeyspaceTablets(targetKeyspace.KeyspaceName, &queryResult{ - query: "select vrepl_id, table_name, lastpk from _vt.copy_state where vrepl_id in (1) and id in (select max(id) from _vt.copy_state where vrepl_id in (1) group by vrepl_id, table_name)", - result: &querypb.QueryResult{}, - }) - } - got, err := env.ws.VDiffCreate(ctx, tt.req) - if tt.wantErr != "" { - require.EqualError(t, err, tt.wantErr) - return - } - require.NoError(t, err) - require.NotNil(t, got) - // Ensure that we always use a valid UUID. - err = uuid.Validate(got.UUID) - require.NoError(t, err) - }) - } -} - -func TestVDiffResume(t *testing.T) { - ctx := context.Background() - sourceKeyspace := &testKeyspace{ - KeyspaceName: "sourceks", - ShardNames: []string{"0"}, - } - targetKeyspace := &testKeyspace{ - KeyspaceName: "targetks", - ShardNames: []string{"-80", "80-"}, - } - workflow := "testwf" - uuid := uuid.New().String() - env := newTestEnv(t, ctx, defaultCellName, sourceKeyspace, targetKeyspace) - defer env.close() - - env.tmc.strict = true - action := string(vdiff.ResumeAction) - - tests := []struct { - name string - req *vtctldatapb.VDiffResumeRequest // vtctld requests - expectedVDiffRequests map[*topodatapb.Tablet]*vdiffRequestResponse // tablet requests - wantErr string - }{ - { - name: "basic resume", // Both target shards - req: &vtctldatapb.VDiffResumeRequest{ - TargetKeyspace: targetKeyspace.KeyspaceName, - Workflow: workflow, - Uuid: uuid, - }, - expectedVDiffRequests: map[*topodatapb.Tablet]*vdiffRequestResponse{ - env.tablets[targetKeyspace.KeyspaceName][startingTargetTabletUID]: { - req: &tabletmanagerdatapb.VDiffRequest{ - Keyspace: targetKeyspace.KeyspaceName, - Workflow: workflow, - Action: action, - VdiffUuid: uuid, - }, - }, - env.tablets[targetKeyspace.KeyspaceName][startingTargetTabletUID+tabletUIDStep]: { - req: &tabletmanagerdatapb.VDiffRequest{ - Keyspace: targetKeyspace.KeyspaceName, - Workflow: workflow, - Action: action, - VdiffUuid: uuid, - }, - }, - }, - }, - { - name: "resume on first shard", - req: &vtctldatapb.VDiffResumeRequest{ - TargetKeyspace: targetKeyspace.KeyspaceName, - TargetShards: targetKeyspace.ShardNames[:1], - Workflow: workflow, - Uuid: uuid, - }, - expectedVDiffRequests: map[*topodatapb.Tablet]*vdiffRequestResponse{ - env.tablets[targetKeyspace.KeyspaceName][startingTargetTabletUID]: { - req: &tabletmanagerdatapb.VDiffRequest{ - Keyspace: targetKeyspace.KeyspaceName, - Workflow: workflow, - Action: action, - VdiffUuid: uuid, - }, - }, - }, - }, - { - name: "resume on invalid shard", - req: &vtctldatapb.VDiffResumeRequest{ - TargetKeyspace: targetKeyspace.KeyspaceName, - TargetShards: []string{"0"}, - Workflow: workflow, - Uuid: uuid, - }, - wantErr: fmt.Sprintf("specified target shard 0 not a valid target for workflow %s", workflow), - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - for tab, vdr := range tt.expectedVDiffRequests { - env.tmc.expectVDiffRequest(tab, vdr) - } - got, err := env.ws.VDiffResume(ctx, tt.req) - if tt.wantErr != "" { - require.EqualError(t, err, tt.wantErr) - } else { - require.NoError(t, err) - require.NotNil(t, got) - } - env.tmc.confirmVDiffRequests(t) - }) - } -} - -func TestVDiffStop(t *testing.T) { - ctx := context.Background() - sourceKeyspace := &testKeyspace{ - KeyspaceName: "sourceks", - ShardNames: []string{"0"}, - } - targetKeyspace := &testKeyspace{ - KeyspaceName: "targetks", - ShardNames: []string{"-80", "80-"}, - } - workflow := "testwf" - uuid := uuid.New().String() - env := newTestEnv(t, ctx, defaultCellName, sourceKeyspace, targetKeyspace) - defer env.close() - - env.tmc.strict = true - action := string(vdiff.StopAction) - - tests := []struct { - name string - req *vtctldatapb.VDiffStopRequest // vtctld requests - expectedVDiffRequests map[*topodatapb.Tablet]*vdiffRequestResponse // tablet requests - wantErr string - }{ - { - name: "basic stop", // Both target shards - req: &vtctldatapb.VDiffStopRequest{ - TargetKeyspace: targetKeyspace.KeyspaceName, - Workflow: workflow, - Uuid: uuid, - }, - expectedVDiffRequests: map[*topodatapb.Tablet]*vdiffRequestResponse{ - env.tablets[targetKeyspace.KeyspaceName][startingTargetTabletUID]: { - req: &tabletmanagerdatapb.VDiffRequest{ - Keyspace: targetKeyspace.KeyspaceName, - Workflow: workflow, - Action: action, - VdiffUuid: uuid, - }, - }, - env.tablets[targetKeyspace.KeyspaceName][startingTargetTabletUID+tabletUIDStep]: { - req: &tabletmanagerdatapb.VDiffRequest{ - Keyspace: targetKeyspace.KeyspaceName, - Workflow: workflow, - Action: action, - VdiffUuid: uuid, - }, - }, - }, - }, - { - name: "stop on first shard", - req: &vtctldatapb.VDiffStopRequest{ - TargetKeyspace: targetKeyspace.KeyspaceName, - TargetShards: targetKeyspace.ShardNames[:1], - Workflow: workflow, - Uuid: uuid, - }, - expectedVDiffRequests: map[*topodatapb.Tablet]*vdiffRequestResponse{ - env.tablets[targetKeyspace.KeyspaceName][startingTargetTabletUID]: { - req: &tabletmanagerdatapb.VDiffRequest{ - Keyspace: targetKeyspace.KeyspaceName, - Workflow: workflow, - Action: action, - VdiffUuid: uuid, - }, - }, - }, - }, - { - name: "stop on invalid shard", - req: &vtctldatapb.VDiffStopRequest{ - TargetKeyspace: targetKeyspace.KeyspaceName, - TargetShards: []string{"0"}, - Workflow: workflow, - Uuid: uuid, - }, - wantErr: fmt.Sprintf("specified target shard 0 not a valid target for workflow %s", workflow), - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - for tab, vdr := range tt.expectedVDiffRequests { - env.tmc.expectVDiffRequest(tab, vdr) - } - got, err := env.ws.VDiffStop(ctx, tt.req) - if tt.wantErr != "" { - require.EqualError(t, err, tt.wantErr) - } else { - require.NoError(t, err) - require.NotNil(t, got) - } - env.tmc.confirmVDiffRequests(t) - }) - } -} - func TestMoveTablesComplete(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) defer cancel() diff --git a/go/vt/vtctl/workflow/vdiff.go b/go/vt/vtctl/workflow/vdiff.go index 6be5fe3c3b5..30953868b0b 100644 --- a/go/vt/vtctl/workflow/vdiff.go +++ b/go/vt/vtctl/workflow/vdiff.go @@ -17,16 +17,26 @@ limitations under the License. package workflow import ( + "context" "encoding/json" "math" "sort" "strings" "time" + "github.com/google/uuid" + "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/trace" + "vitess.io/vitess/go/vt/discovery" + "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/vttablet/tabletmanager/vdiff" + binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" + tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata" + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" + vttimepb "vitess.io/vitess/go/vt/proto/vttime" ) // TableSummary aggregates the current state of the table diff from all shards. @@ -278,3 +288,284 @@ func BuildProgressReport(rowsCompared int64, rowsToCompare int64, startedAt stri } return report } + +// VDiffCreate is part of the vtctlservicepb.VtctldServer interface. +// It passes on the request to the target primary tablets that are +// participating in the given workflow and VDiff. +func (s *Server) VDiffCreate(ctx context.Context, req *vtctldatapb.VDiffCreateRequest) (*vtctldatapb.VDiffCreateResponse, error) { + span, ctx := trace.NewSpan(ctx, "workflow.Server.VDiffCreate") + defer span.Finish() + + span.Annotate("keyspace", req.TargetKeyspace) + span.Annotate("workflow", req.Workflow) + span.Annotate("uuid", req.Uuid) + span.Annotate("source_cells", req.SourceCells) + span.Annotate("target_cells", req.TargetCells) + span.Annotate("tablet_types", req.TabletTypes) + span.Annotate("tables", req.Tables) + span.Annotate("auto_retry", req.AutoRetry) + span.Annotate("max_diff_duration", req.MaxDiffDuration) + if req.AutoStart != nil { + span.Annotate("auto_start", req.GetAutoStart()) + } + + var err error + req.Uuid = strings.TrimSpace(req.Uuid) + if req.Uuid == "" { // Generate a UUID + req.Uuid = uuid.New().String() + } else { // Validate UUID if provided + if err = uuid.Validate(req.Uuid); err != nil { + return nil, vterrors.Wrapf(err, "invalid UUID provided: %s", req.Uuid) + } + } + + tabletTypesStr := discovery.BuildTabletTypesString(req.TabletTypes, req.TabletSelectionPreference) + + if req.Limit == 0 { // This would produce no useful results + req.Limit = math.MaxInt64 + } + // This is a pointer so there's no ZeroValue in the message + // and an older v18 client will not provide it. + if req.MaxDiffDuration == nil { + req.MaxDiffDuration = &vttimepb.Duration{} + } + // The other vttime.Duration vars should not be nil as the + // client should always provide them, but we check anyway to + // be safe. + if req.FilteredReplicationWaitTime == nil { + // A value of 0 is not valid as the vdiff will never succeed. + req.FilteredReplicationWaitTime = &vttimepb.Duration{ + Seconds: int64(DefaultTimeout.Seconds()), + } + } + if req.WaitUpdateInterval == nil { + req.WaitUpdateInterval = &vttimepb.Duration{} + } + + autoStart := true + if req.AutoStart != nil { + autoStart = req.GetAutoStart() + } + + options := &tabletmanagerdatapb.VDiffOptions{ + PickerOptions: &tabletmanagerdatapb.VDiffPickerOptions{ + TabletTypes: tabletTypesStr, + SourceCell: strings.Join(req.SourceCells, ","), + TargetCell: strings.Join(req.TargetCells, ","), + }, + CoreOptions: &tabletmanagerdatapb.VDiffCoreOptions{ + Tables: strings.Join(req.Tables, ","), + AutoRetry: req.AutoRetry, + MaxRows: req.Limit, + TimeoutSeconds: req.FilteredReplicationWaitTime.Seconds, + MaxExtraRowsToCompare: req.MaxExtraRowsToCompare, + UpdateTableStats: req.UpdateTableStats, + MaxDiffSeconds: req.MaxDiffDuration.Seconds, + AutoStart: &autoStart, + }, + ReportOptions: &tabletmanagerdatapb.VDiffReportOptions{ + OnlyPks: req.OnlyPKs, + DebugQuery: req.DebugQuery, + MaxSampleRows: req.MaxReportSampleRows, + RowDiffColumnTruncateAt: req.RowDiffColumnTruncateAt, + }, + } + + tabletreq := &tabletmanagerdatapb.VDiffRequest{ + Keyspace: req.TargetKeyspace, + Workflow: req.Workflow, + Action: string(vdiff.CreateAction), + Options: options, + VdiffUuid: req.Uuid, + } + + ts, err := s.buildTrafficSwitcher(ctx, req.TargetKeyspace, req.Workflow) + if err != nil { + return nil, err + } + if ts.frozen { + return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "invalid VDiff run: writes have been already been switched for workflow %s.%s", + req.TargetKeyspace, req.Workflow) + } + + workflowStatus, err := s.getWorkflowStatus(ctx, req.TargetKeyspace, req.Workflow) + if err != nil { + return nil, err + } + if workflowStatus != binlogdatapb.VReplicationWorkflowState_Running { + s.Logger().Infof("Workflow %s.%s is not running, cannot start VDiff in state %s", req.TargetKeyspace, req.Workflow, workflowStatus) + return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, + "not all streams are running in workflow %s.%s", req.TargetKeyspace, req.Workflow) + } + + err = ts.ForAllTargets(func(target *MigrationTarget) error { + _, err := s.tmc.VDiff(ctx, target.GetPrimary().Tablet, tabletreq) + return err + }) + if err != nil { + s.Logger().Errorf("Error executing vdiff create action: %v", err) + return nil, err + } + + return &vtctldatapb.VDiffCreateResponse{ + UUID: req.Uuid, + }, nil +} + +// VDiffDelete is part of the vtctlservicepb.VtctldServer interface. +func (s *Server) VDiffDelete(ctx context.Context, req *vtctldatapb.VDiffDeleteRequest) (*vtctldatapb.VDiffDeleteResponse, error) { + span, ctx := trace.NewSpan(ctx, "workflow.Server.VDiffDelete") + defer span.Finish() + + span.Annotate("keyspace", req.TargetKeyspace) + span.Annotate("workflow", req.Workflow) + span.Annotate("argument", req.Arg) + + tabletreq := &tabletmanagerdatapb.VDiffRequest{ + Keyspace: req.TargetKeyspace, + Workflow: req.Workflow, + Action: string(vdiff.DeleteAction), + ActionArg: req.Arg, + } + + ts, err := s.buildTrafficSwitcher(ctx, req.TargetKeyspace, req.Workflow) + if err != nil { + return nil, err + } + + err = ts.ForAllTargets(func(target *MigrationTarget) error { + _, err := s.tmc.VDiff(ctx, target.GetPrimary().Tablet, tabletreq) + return err + }) + if err != nil { + s.Logger().Errorf("Error executing vdiff delete action: %v", err) + return nil, err + } + + return &vtctldatapb.VDiffDeleteResponse{}, nil +} + +// VDiffResume is part of the vtctlservicepb.VtctldServer interface. +func (s *Server) VDiffResume(ctx context.Context, req *vtctldatapb.VDiffResumeRequest) (*vtctldatapb.VDiffResumeResponse, error) { + span, ctx := trace.NewSpan(ctx, "workflow.Server.VDiffResume") + defer span.Finish() + + targetShards := req.GetTargetShards() + + span.Annotate("keyspace", req.TargetKeyspace) + span.Annotate("workflow", req.Workflow) + span.Annotate("uuid", req.Uuid) + span.Annotate("target_shards", targetShards) + + tabletreq := &tabletmanagerdatapb.VDiffRequest{ + Keyspace: req.TargetKeyspace, + Workflow: req.Workflow, + Action: string(vdiff.ResumeAction), + VdiffUuid: req.Uuid, + } + + ts, err := s.buildTrafficSwitcher(ctx, req.TargetKeyspace, req.Workflow) + if err != nil { + return nil, err + } + + if len(targetShards) > 0 { + if err := applyTargetShards(ts, targetShards); err != nil { + return nil, err + } + } + + err = ts.ForAllTargets(func(target *MigrationTarget) error { + _, err := s.tmc.VDiff(ctx, target.GetPrimary().Tablet, tabletreq) + return err + }) + if err != nil { + s.Logger().Errorf("Error executing vdiff resume action: %v", err) + return nil, err + } + + return &vtctldatapb.VDiffResumeResponse{}, nil +} + +// VDiffShow is part of the vtctlservicepb.VtctldServer interface. +func (s *Server) VDiffShow(ctx context.Context, req *vtctldatapb.VDiffShowRequest) (*vtctldatapb.VDiffShowResponse, error) { + span, ctx := trace.NewSpan(ctx, "workflow.Server.VDiffShow") + defer span.Finish() + + span.Annotate("keyspace", req.TargetKeyspace) + span.Annotate("workflow", req.Workflow) + span.Annotate("argument", req.Arg) + + tabletreq := &tabletmanagerdatapb.VDiffRequest{ + Keyspace: req.TargetKeyspace, + Workflow: req.Workflow, + Action: string(vdiff.ShowAction), + ActionArg: req.Arg, + } + + ts, err := s.buildTrafficSwitcher(ctx, req.TargetKeyspace, req.Workflow) + if err != nil { + return nil, err + } + + output := &vdiffOutput{ + responses: make(map[string]*tabletmanagerdatapb.VDiffResponse, len(ts.targets)), + err: nil, + } + output.err = ts.ForAllTargets(func(target *MigrationTarget) error { + resp, err := s.tmc.VDiff(ctx, target.GetPrimary().Tablet, tabletreq) + output.mu.Lock() + defer output.mu.Unlock() + output.responses[target.GetShard().ShardName()] = resp + return err + }) + if output.err != nil { + s.Logger().Errorf("Error executing vdiff show action: %v", output.err) + return nil, output.err + } + return &vtctldatapb.VDiffShowResponse{ + TabletResponses: output.responses, + }, nil +} + +// VDiffStop is part of the vtctlservicepb.VtctldServer interface. +func (s *Server) VDiffStop(ctx context.Context, req *vtctldatapb.VDiffStopRequest) (*vtctldatapb.VDiffStopResponse, error) { + span, ctx := trace.NewSpan(ctx, "workflow.Server.VDiffStop") + defer span.Finish() + + targetShards := req.GetTargetShards() + + span.Annotate("keyspace", req.TargetKeyspace) + span.Annotate("workflow", req.Workflow) + span.Annotate("uuid", req.Uuid) + span.Annotate("target_shards", targetShards) + + tabletreq := &tabletmanagerdatapb.VDiffRequest{ + Keyspace: req.TargetKeyspace, + Workflow: req.Workflow, + Action: string(vdiff.StopAction), + VdiffUuid: req.Uuid, + } + + ts, err := s.buildTrafficSwitcher(ctx, req.TargetKeyspace, req.Workflow) + if err != nil { + return nil, err + } + + if len(targetShards) > 0 { + if err := applyTargetShards(ts, targetShards); err != nil { + return nil, err + } + } + + err = ts.ForAllTargets(func(target *MigrationTarget) error { + _, err := s.tmc.VDiff(ctx, target.GetPrimary().Tablet, tabletreq) + return err + }) + if err != nil { + s.Logger().Errorf("Error executing vdiff stop action: %v", err) + return nil, err + } + + return &vtctldatapb.VDiffStopResponse{}, nil +} diff --git a/go/vt/vtctl/workflow/vdiff_test.go b/go/vt/vtctl/workflow/vdiff_test.go index e5578afc170..0da4a3ef480 100644 --- a/go/vt/vtctl/workflow/vdiff_test.go +++ b/go/vt/vtctl/workflow/vdiff_test.go @@ -17,13 +17,21 @@ limitations under the License. package workflow import ( + "context" + "fmt" "math" "testing" "time" + "github.com/google/uuid" "github.com/stretchr/testify/require" "vitess.io/vitess/go/vt/vttablet/tabletmanager/vdiff" + + querypb "vitess.io/vitess/go/vt/proto/query" + tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" + vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata" ) func TestBuildProgressReport(t *testing.T) { @@ -134,3 +142,350 @@ func TestBuildProgressReport(t *testing.T) { }) } } + +// TestVDiffCreate performs some basic tests of the VDiffCreate function +// to ensure that it behaves as expected given a specific request. +func TestVDiffCreate(t *testing.T) { + ctx := context.Background() + workflowName := "wf1" + sourceKeyspace := &testKeyspace{ + KeyspaceName: "source", + ShardNames: []string{"0"}, + } + targetKeyspace := &testKeyspace{ + KeyspaceName: "target", + ShardNames: []string{"-80", "80-"}, + } + env := newTestEnv(t, ctx, defaultCellName, sourceKeyspace, targetKeyspace) + defer env.close() + + tests := []struct { + name string + req *vtctldatapb.VDiffCreateRequest + wantErr string + }{ + { + name: "no values", + req: &vtctldatapb.VDiffCreateRequest{}, + // We did not provide any keyspace or shard. + wantErr: "FindAllShardsInKeyspace() invalid keyspace name: UnescapeID err: invalid input identifier ''", + }, + { + name: "generated UUID", + req: &vtctldatapb.VDiffCreateRequest{ + TargetKeyspace: targetKeyspace.KeyspaceName, + Workflow: workflowName, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if tt.wantErr == "" { + env.tmc.expectVRQueryResultOnKeyspaceTablets(targetKeyspace.KeyspaceName, &queryResult{ + query: "select vrepl_id, table_name, lastpk from _vt.copy_state where vrepl_id in (1) and id in (select max(id) from _vt.copy_state where vrepl_id in (1) group by vrepl_id, table_name)", + result: &querypb.QueryResult{}, + }) + } + got, err := env.ws.VDiffCreate(ctx, tt.req) + if tt.wantErr != "" { + require.EqualError(t, err, tt.wantErr) + return + } + require.NoError(t, err) + require.NotNil(t, got) + // Ensure that we always use a valid UUID. + err = uuid.Validate(got.UUID) + require.NoError(t, err) + }) + } +} + +func TestVDiffResume(t *testing.T) { + ctx := context.Background() + sourceKeyspace := &testKeyspace{ + KeyspaceName: "sourceks", + ShardNames: []string{"0"}, + } + targetKeyspace := &testKeyspace{ + KeyspaceName: "targetks", + ShardNames: []string{"-80", "80-"}, + } + workflow := "testwf" + uuid := uuid.New().String() + env := newTestEnv(t, ctx, defaultCellName, sourceKeyspace, targetKeyspace) + defer env.close() + + env.tmc.strict = true + action := string(vdiff.ResumeAction) + + tests := []struct { + name string + req *vtctldatapb.VDiffResumeRequest // vtctld requests + expectedVDiffRequests map[*topodatapb.Tablet]*vdiffRequestResponse // tablet requests + wantErr string + }{ + { + name: "basic resume", // Both target shards + req: &vtctldatapb.VDiffResumeRequest{ + TargetKeyspace: targetKeyspace.KeyspaceName, + Workflow: workflow, + Uuid: uuid, + }, + expectedVDiffRequests: map[*topodatapb.Tablet]*vdiffRequestResponse{ + env.tablets[targetKeyspace.KeyspaceName][startingTargetTabletUID]: { + req: &tabletmanagerdatapb.VDiffRequest{ + Keyspace: targetKeyspace.KeyspaceName, + Workflow: workflow, + Action: action, + VdiffUuid: uuid, + }, + }, + env.tablets[targetKeyspace.KeyspaceName][startingTargetTabletUID+tabletUIDStep]: { + req: &tabletmanagerdatapb.VDiffRequest{ + Keyspace: targetKeyspace.KeyspaceName, + Workflow: workflow, + Action: action, + VdiffUuid: uuid, + }, + }, + }, + }, + { + name: "resume on first shard", + req: &vtctldatapb.VDiffResumeRequest{ + TargetKeyspace: targetKeyspace.KeyspaceName, + TargetShards: targetKeyspace.ShardNames[:1], + Workflow: workflow, + Uuid: uuid, + }, + expectedVDiffRequests: map[*topodatapb.Tablet]*vdiffRequestResponse{ + env.tablets[targetKeyspace.KeyspaceName][startingTargetTabletUID]: { + req: &tabletmanagerdatapb.VDiffRequest{ + Keyspace: targetKeyspace.KeyspaceName, + Workflow: workflow, + Action: action, + VdiffUuid: uuid, + }, + }, + }, + }, + { + name: "resume on invalid shard", + req: &vtctldatapb.VDiffResumeRequest{ + TargetKeyspace: targetKeyspace.KeyspaceName, + TargetShards: []string{"0"}, + Workflow: workflow, + Uuid: uuid, + }, + wantErr: fmt.Sprintf("specified target shard 0 not a valid target for workflow %s", workflow), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + for tab, vdr := range tt.expectedVDiffRequests { + env.tmc.expectVDiffRequest(tab, vdr) + } + got, err := env.ws.VDiffResume(ctx, tt.req) + if tt.wantErr != "" { + require.EqualError(t, err, tt.wantErr) + } else { + require.NoError(t, err) + require.NotNil(t, got) + } + env.tmc.confirmVDiffRequests(t) + }) + } +} + +func TestVDiffStop(t *testing.T) { + ctx := context.Background() + sourceKeyspace := &testKeyspace{ + KeyspaceName: "sourceks", + ShardNames: []string{"0"}, + } + targetKeyspace := &testKeyspace{ + KeyspaceName: "targetks", + ShardNames: []string{"-80", "80-"}, + } + workflow := "testwf" + uuid := uuid.New().String() + env := newTestEnv(t, ctx, defaultCellName, sourceKeyspace, targetKeyspace) + defer env.close() + + env.tmc.strict = true + action := string(vdiff.StopAction) + + tests := []struct { + name string + req *vtctldatapb.VDiffStopRequest // vtctld requests + expectedVDiffRequests map[*topodatapb.Tablet]*vdiffRequestResponse // tablet requests + wantErr string + }{ + { + name: "basic stop", // Both target shards + req: &vtctldatapb.VDiffStopRequest{ + TargetKeyspace: targetKeyspace.KeyspaceName, + Workflow: workflow, + Uuid: uuid, + }, + expectedVDiffRequests: map[*topodatapb.Tablet]*vdiffRequestResponse{ + env.tablets[targetKeyspace.KeyspaceName][startingTargetTabletUID]: { + req: &tabletmanagerdatapb.VDiffRequest{ + Keyspace: targetKeyspace.KeyspaceName, + Workflow: workflow, + Action: action, + VdiffUuid: uuid, + }, + }, + env.tablets[targetKeyspace.KeyspaceName][startingTargetTabletUID+tabletUIDStep]: { + req: &tabletmanagerdatapb.VDiffRequest{ + Keyspace: targetKeyspace.KeyspaceName, + Workflow: workflow, + Action: action, + VdiffUuid: uuid, + }, + }, + }, + }, + { + name: "stop on first shard", + req: &vtctldatapb.VDiffStopRequest{ + TargetKeyspace: targetKeyspace.KeyspaceName, + TargetShards: targetKeyspace.ShardNames[:1], + Workflow: workflow, + Uuid: uuid, + }, + expectedVDiffRequests: map[*topodatapb.Tablet]*vdiffRequestResponse{ + env.tablets[targetKeyspace.KeyspaceName][startingTargetTabletUID]: { + req: &tabletmanagerdatapb.VDiffRequest{ + Keyspace: targetKeyspace.KeyspaceName, + Workflow: workflow, + Action: action, + VdiffUuid: uuid, + }, + }, + }, + }, + { + name: "stop on invalid shard", + req: &vtctldatapb.VDiffStopRequest{ + TargetKeyspace: targetKeyspace.KeyspaceName, + TargetShards: []string{"0"}, + Workflow: workflow, + Uuid: uuid, + }, + wantErr: fmt.Sprintf("specified target shard 0 not a valid target for workflow %s", workflow), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + for tab, vdr := range tt.expectedVDiffRequests { + env.tmc.expectVDiffRequest(tab, vdr) + } + got, err := env.ws.VDiffStop(ctx, tt.req) + if tt.wantErr != "" { + require.EqualError(t, err, tt.wantErr) + } else { + require.NoError(t, err) + require.NotNil(t, got) + } + env.tmc.confirmVDiffRequests(t) + }) + } +} + +func TestVDiffDelete(t *testing.T) { + ctx := context.Background() + sourceKeyspace := &testKeyspace{ + KeyspaceName: "sourceks", + ShardNames: []string{"0"}, + } + targetKeyspace := &testKeyspace{ + KeyspaceName: "targetks", + ShardNames: []string{"-80", "80-"}, + } + workflow := "testwf" + uuid := uuid.New().String() + env := newTestEnv(t, ctx, defaultCellName, sourceKeyspace, targetKeyspace) + defer env.close() + + env.tmc.strict = true + action := string(vdiff.DeleteAction) + + tests := []struct { + name string + req *vtctldatapb.VDiffDeleteRequest + expectedVDiffRequests map[*topodatapb.Tablet]*vdiffRequestResponse + wantErr string + }{ + { + name: "basic delete", + req: &vtctldatapb.VDiffDeleteRequest{ + TargetKeyspace: targetKeyspace.KeyspaceName, + Workflow: workflow, + Arg: uuid, + }, + expectedVDiffRequests: map[*topodatapb.Tablet]*vdiffRequestResponse{ + env.tablets[targetKeyspace.KeyspaceName][startingTargetTabletUID]: { + req: &tabletmanagerdatapb.VDiffRequest{ + Keyspace: targetKeyspace.KeyspaceName, + Workflow: workflow, + Action: action, + ActionArg: uuid, + }, + }, + env.tablets[targetKeyspace.KeyspaceName][startingTargetTabletUID+tabletUIDStep]: { + req: &tabletmanagerdatapb.VDiffRequest{ + Keyspace: targetKeyspace.KeyspaceName, + Workflow: workflow, + Action: action, + ActionArg: uuid, + }, + }, + }, + }, + { + name: "invalid delete", + req: &vtctldatapb.VDiffDeleteRequest{ + TargetKeyspace: targetKeyspace.KeyspaceName, + Workflow: workflow, + Arg: uuid, + }, + expectedVDiffRequests: map[*topodatapb.Tablet]*vdiffRequestResponse{ + env.tablets[targetKeyspace.KeyspaceName][startingTargetTabletUID]: { + req: &tabletmanagerdatapb.VDiffRequest{ + Keyspace: targetKeyspace.KeyspaceName, + Workflow: workflow, + Action: action, + ActionArg: uuid, + }, + err: fmt.Errorf("error on invalid delete"), + }, + env.tablets[targetKeyspace.KeyspaceName][startingTargetTabletUID+tabletUIDStep]: { + req: &tabletmanagerdatapb.VDiffRequest{ + Keyspace: targetKeyspace.KeyspaceName, + Workflow: workflow, + Action: action, + ActionArg: uuid, + }, + }, + }, + wantErr: "error on invalid delete", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + for tab, vdr := range tt.expectedVDiffRequests { + env.tmc.expectVDiffRequest(tab, vdr) + } + got, err := env.ws.VDiffDelete(ctx, tt.req) + if tt.wantErr != "" { + require.EqualError(t, err, tt.wantErr) + } else { + require.NoError(t, err) + require.NotNil(t, got) + } + env.tmc.confirmVDiffRequests(t) + }) + } +}