diff --git a/DEPS.bzl b/DEPS.bzl index c7411c13ab79..c0abf84c5457 100644 --- a/DEPS.bzl +++ b/DEPS.bzl @@ -10577,10 +10577,10 @@ def go_deps(): ], build_file_proto_mode = "default", importpath = "go.etcd.io/raft/v3", - sha256 = "7540ce70b9c79987eb5a09f693302931ac88e34757397451c5d44c202649adb3", - strip_prefix = "go.etcd.io/raft/v3@v3.0.0-20230626154957-a10cd4571633", + sha256 = "292954320a69953ac8366cd7a83e4a50b1cfd858643a4cdf9947ab9607f1a142", + strip_prefix = "go.etcd.io/raft/v3@v3.0.0-20230717153924-72a6e6c9f3ee", urls = [ - "https://storage.googleapis.com/cockroach-godeps/gomod/go.etcd.io/raft/v3/io_etcd_go_raft_v3-v3.0.0-20230626154957-a10cd4571633.zip", + "https://storage.googleapis.com/cockroach-godeps/gomod/go.etcd.io/raft/v3/io_etcd_go_raft_v3-v3.0.0-20230717153924-72a6e6c9f3ee.zip", ], ) go_repository( diff --git a/build/bazelutil/distdir_files.bzl b/build/bazelutil/distdir_files.bzl index be1b0bae93ba..bfeaddc71aa0 100644 --- a/build/bazelutil/distdir_files.bzl +++ b/build/bazelutil/distdir_files.bzl @@ -1023,7 +1023,7 @@ DISTDIR_FILES = { "https://storage.googleapis.com/cockroach-godeps/gomod/go.etcd.io/etcd/client/pkg/v3/io_etcd_go_etcd_client_pkg_v3-v3.5.0.zip": "c0ca209767c5734c6ed023888ba5be02aab5bd3c4d018999467f2bfa8bf65ee3", "https://storage.googleapis.com/cockroach-godeps/gomod/go.etcd.io/etcd/client/v2/io_etcd_go_etcd_client_v2-v2.305.0.zip": "91fcb507fe8c193844b56bfb6c8741aaeb6ffa11ee9043de2af0f141173679f3", "https://storage.googleapis.com/cockroach-godeps/gomod/go.etcd.io/etcd/io_etcd_go_etcd-v0.5.0-alpha.5.0.20200910180754-dd1b699fc489.zip": "d982ee501979b41b68625693bad77d15e4ae79ab9d0eae5f6028205f96a74e49", - "https://storage.googleapis.com/cockroach-godeps/gomod/go.etcd.io/raft/v3/io_etcd_go_raft_v3-v3.0.0-20230626154957-a10cd4571633.zip": "7540ce70b9c79987eb5a09f693302931ac88e34757397451c5d44c202649adb3", + "https://storage.googleapis.com/cockroach-godeps/gomod/go.etcd.io/raft/v3/io_etcd_go_raft_v3-v3.0.0-20230717153924-72a6e6c9f3ee.zip": "292954320a69953ac8366cd7a83e4a50b1cfd858643a4cdf9947ab9607f1a142", "https://storage.googleapis.com/cockroach-godeps/gomod/go.mongodb.org/mongo-driver/org_mongodb_go_mongo_driver-v1.5.1.zip": "446cff132e82c64af7ffcf48e268eb16ec81f694914aa6baecb06cbbae1be0d7", "https://storage.googleapis.com/cockroach-godeps/gomod/go.mozilla.org/pkcs7/org_mozilla_go_pkcs7-v0.0.0-20200128120323-432b2356ecb1.zip": "3c4c1667907ff3127e371d44696326bad9e965216d4257917ae28e8b82a9e08d", "https://storage.googleapis.com/cockroach-godeps/gomod/go.opencensus.io/io_opencensus_go-v0.24.0.zip": "203a767d7f8e7c1ebe5588220ad168d1e15b14ae70a636de7ca9a4a88a7e0d0c", diff --git a/go.mod b/go.mod index e88253890bcf..9d47f6a9da32 100644 --- a/go.mod +++ b/go.mod @@ -219,7 +219,7 @@ require ( github.com/xdg-go/scram v1.1.2 github.com/xdg-go/stringprep v1.0.4 github.com/zabawaba99/go-gitignore v0.0.0-20200117185801-39e6bddfb292 - go.etcd.io/raft/v3 v3.0.0-20230626154957-a10cd4571633 + go.etcd.io/raft/v3 v3.0.0-20230717153924-72a6e6c9f3ee go.opentelemetry.io/otel v1.0.0-RC3 go.opentelemetry.io/otel/exporters/jaeger v1.0.0-RC3 go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.0.0-RC3 diff --git a/go.sum b/go.sum index 1e01c2b29833..77761eb86ef7 100644 --- a/go.sum +++ b/go.sum @@ -2320,8 +2320,8 @@ go.etcd.io/etcd v0.5.0-alpha.5.0.20200910180754-dd1b699fc489/go.mod h1:yVHk9ub3C go.etcd.io/etcd/api/v3 v3.5.0/go.mod h1:cbVKeC6lCfl7j/8jBhAK6aIYO9XOjdptoxU/nLQcPvs= go.etcd.io/etcd/client/pkg/v3 v3.5.0/go.mod h1:IJHfcCEKxYu1Os13ZdwCwIUTUVGYTSAM3YSwc9/Ac1g= 
go.etcd.io/etcd/client/v2 v2.305.0/go.mod h1:h9puh54ZTgAKtEbut2oe9P4L/oqKCVB6xsXlzd7alYQ= -go.etcd.io/raft/v3 v3.0.0-20230626154957-a10cd4571633 h1:C6cmDqdkeGrJ6fT7OFCbnPauTEJvFY+HZSWk2N0PUvI= -go.etcd.io/raft/v3 v3.0.0-20230626154957-a10cd4571633/go.mod h1:tP6U+sRzrl75ltgmFcdZg9reZVEyM3vKTxAWmwpHtB8= +go.etcd.io/raft/v3 v3.0.0-20230717153924-72a6e6c9f3ee h1:7KfWqaO+jirXdEFWpwK1fa2NIm1nmBf3fP0AH6NydCk= +go.etcd.io/raft/v3 v3.0.0-20230717153924-72a6e6c9f3ee/go.mod h1:tP6U+sRzrl75ltgmFcdZg9reZVEyM3vKTxAWmwpHtB8= go.mongodb.org/mongo-driver v1.0.3/go.mod h1:u7ryQJ+DOzQmeO7zB6MHyr8jkEQvC8vH7qLUO4lqsUM= go.mongodb.org/mongo-driver v1.1.1/go.mod h1:u7ryQJ+DOzQmeO7zB6MHyr8jkEQvC8vH7qLUO4lqsUM= go.mongodb.org/mongo-driver v1.1.2/go.mod h1:u7ryQJ+DOzQmeO7zB6MHyr8jkEQvC8vH7qLUO4lqsUM= diff --git a/pkg/ccl/backupccl/backup_job.go b/pkg/ccl/backupccl/backup_job.go index 3a81b7ff396f..200d3f3f5300 100644 --- a/pkg/ccl/backupccl/backup_job.go +++ b/pkg/ccl/backupccl/backup_job.go @@ -28,6 +28,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/ccl/utilccl" "github.com/cockroachdb/cockroach/pkg/cloud" "github.com/cockroachdb/cockroach/pkg/cloud/cloudpb" + "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/joberror" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" @@ -221,6 +222,9 @@ func backup( // Create a channel that is large enough that it does not block. perNodeProgressCh := make(chan map[execinfrapb.ComponentID]float32, numTotalSpans) storePerNodeProgressLoop := func(ctx context.Context) error { + if !execCtx.ExecCfg().Settings.Version.IsActive(ctx, clusterversion.V23_1) { + return nil + } for { select { case prog, ok := <-perNodeProgressCh: @@ -477,6 +481,11 @@ func (b *backupResumer) ForceRealSpan() bool { return true } +// DumpTraceAfterRun implements the TraceableJob interface. +func (b *backupResumer) DumpTraceAfterRun() bool { + return true +} + // Resume is part of the jobs.Resumer interface. func (b *backupResumer) Resume(ctx context.Context, execCtx interface{}) error { // The span is finished by the registry executing the job. diff --git a/pkg/ccl/backupccl/file_sst_sink.go b/pkg/ccl/backupccl/file_sst_sink.go index fc0ff88b937a..7763ae3a1e15 100644 --- a/pkg/ccl/backupccl/file_sst_sink.go +++ b/pkg/ccl/backupccl/file_sst_sink.go @@ -47,17 +47,25 @@ type fileSSTSink struct { out io.WriteCloser outName string - flushedFiles []backuppb.BackupManifest_File - flushedSize int64 + flushedFiles []backuppb.BackupManifest_File + flushedSize int64 + + // flushedRevStart is the earliest start time of the export responses + // written to this sink since the last flush. Resets on each flush. flushedRevStart hlc.Timestamp - completedSpans int32 + // completedSpans contain the number of completed spans since the last + // flush. This counter resets on each flush. + completedSpans int32 + + // stats contain statistics about the actions of the fileSSTSink over its + // entire lifespan. stats struct { - files int - flushes int - oooFlushes int - sizeFlushes int - spanGrows int + files int // number of files created. + flushes int // number of flushes. + oooFlushes int // number of out of order flushes. + sizeFlushes int // number of flushes due to file exceeding targetFileSize. + spanGrows int // number of times a span was extended. 
} } diff --git a/pkg/ccl/backupccl/file_sst_sink_test.go b/pkg/ccl/backupccl/file_sst_sink_test.go index 7ddba58970e2..c4abb644ef32 100644 --- a/pkg/ccl/backupccl/file_sst_sink_test.go +++ b/pkg/ccl/backupccl/file_sst_sink_test.go @@ -12,20 +12,28 @@ import ( "bytes" "context" "fmt" + "reflect" "testing" "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/blobs" "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backuppb" + "github.com/cockroachdb/cockroach/pkg/ccl/storageccl" "github.com/cockroachdb/cockroach/pkg/cloud" + "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/security/username" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/isql" "github.com/cockroachdb/cockroach/pkg/storage" + "github.com/cockroachdb/cockroach/pkg/testutils/skip" + "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/errors" "github.com/gogo/protobuf/types" "github.com/stretchr/testify/require" ) @@ -41,30 +49,6 @@ func TestFileSSTSinkExtendOneFile(t *testing.T) { tc, sqlDB, _, cleanup := backupRestoreTestSetup(t, singleNode, 1, InitManualReplication) defer cleanup() - store, err := cloud.ExternalStorageFromURI(ctx, "userfile:///0", - base.ExternalIODirConfig{}, - tc.Servers[0].ClusterSettings(), - blobs.TestEmptyBlobClientFactory, - username.RootUserName(), - tc.Servers[0].InternalDB().(isql.DB), - nil, /* limiters */ - cloud.NilMetrics, - ) - require.NoError(t, err) - sqlDB.Exec(t, `SET CLUSTER SETTING bulkio.backup.file_size = '20B'`) - - // Never block. - progCh := make(chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress, 10) - - sinkConf := sstSinkConf{ - id: 1, - enc: nil, - progCh: progCh, - settings: &tc.Servers[0].ClusterSettings().SV, - } - - sink := makeFileSSTSink(sinkConf, store) - getKeys := func(prefix string, n int) []byte { var b bytes.Buffer sst := storage.MakeBackupSSTWriter(ctx, nil, &b) @@ -115,13 +99,17 @@ func TestFileSSTSinkExtendOneFile(t *testing.T) { atKeyBoundary: true, } + sqlDB.Exec(t, `SET CLUSTER SETTING bulkio.backup.file_size = '20B'`) + + sink, _ := fileSSTSinkTestSetUp(ctx, t, tc, sqlDB) + require.NoError(t, sink.write(ctx, exportResponse1)) require.NoError(t, sink.write(ctx, exportResponse2)) - close(progCh) + close(sink.conf.progCh) var progs []execinfrapb.RemoteProducerMetadata_BulkProcessorProgress - for p := range progCh { + for p := range sink.conf.progCh { progs = append(progs, p) } @@ -135,3 +123,837 @@ func TestFileSSTSinkExtendOneFile(t *testing.T) { // file in the progress details. require.Equal(t, 1, len(progDetails.Files)) } + +// TestFileSSTSinkWrite tests the contents of flushed files and the internal +// unflushed files of the FileSSTSink under different write scenarios. Each test +// writes a sequence of exportedSpans into a fileSSTSink. The test then verifies +// the spans of the flushed files and the unflushed files still left in the +// sink, as well as makes sure all keys in each file fall within the span +// boundaries. 
+func TestFileSSTSinkWrite(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + tc, sqlDB, _, cleanup := backupRestoreTestSetup(t, singleNode, 1, InitManualReplication) + defer cleanup() + + sqlDB.Exec(t, `SET CLUSTER SETTING bulkio.backup.file_size = '10KB'`) + + type testCase struct { + name string + exportSpans []exportedSpan + flushedSpans []roachpb.Spans + unflushedSpans []roachpb.Spans + skipReason string + } + + for _, tt := range []testCase{ + { + name: "out-of-order-key-boundary", + exportSpans: []exportedSpan{ + newExportedSpanBuilder("a", "c", true).withKVs([]kvAndTS{{key: "a", timestamp: 10}, {key: "c", timestamp: 10}}).build(), + newExportedSpanBuilder("b", "d", true).withKVs([]kvAndTS{{key: "b", timestamp: 10}, {key: "d", timestamp: 10}}).build(), + }, + flushedSpans: []roachpb.Spans{{roachpb.Span{Key: []byte("a"), EndKey: []byte("c")}}}, + unflushedSpans: []roachpb.Spans{{roachpb.Span{Key: []byte("b"), EndKey: []byte("d")}}}, + }, + { + // Test that even if the most recently ingested export span does not + // end at a key boundary, a flush will still occur on the writing of + // an out-of-order export span. + name: "out-of-order-not-key-boundary", + exportSpans: []exportedSpan{ + newExportedSpanBuilder("a", "c", false).withKVs([]kvAndTS{{key: "a", timestamp: 10}, {key: "c", timestamp: 10}}).build(), + newExportedSpanBuilder("b", "d", true).withKVs([]kvAndTS{{key: "b", timestamp: 10}, {key: "d", timestamp: 10}}).build(), + newExportedSpanBuilder("c", "e", true).withKVs([]kvAndTS{{key: "c", timestamp: 9}, {key: "e", timestamp: 10}}).build(), + }, + flushedSpans: []roachpb.Spans{ + {roachpb.Span{Key: []byte("a"), EndKey: []byte("c")}}, + {roachpb.Span{Key: []byte("b"), EndKey: []byte("d")}}, + }, + unflushedSpans: []roachpb.Spans{{roachpb.Span{Key: []byte("c"), EndKey: []byte("e")}}}, + }, + { + name: "extend-key-boundary-1-file", + exportSpans: []exportedSpan{ + newExportedSpanBuilder("a", "c", true).withKVs([]kvAndTS{{key: "a", timestamp: 10}, {key: "b", timestamp: 10}}).build(), + newExportedSpanBuilder("c", "e", true).withKVs([]kvAndTS{{key: "c", timestamp: 10}, {key: "d", timestamp: 10}}).build(), + }, + flushedSpans: []roachpb.Spans{}, + unflushedSpans: []roachpb.Spans{{{Key: []byte("a"), EndKey: []byte("e")}}}, + }, + { + name: "extend-key-boundary-2-files", + exportSpans: []exportedSpan{ + newExportedSpanBuilder("a", "c", true).withKVs([]kvAndTS{{key: "a", timestamp: 10}, {key: "b", timestamp: 10}}).build(), + newExportedSpanBuilder("c", "e", true).withKVs([]kvAndTS{{key: "c", timestamp: 10}, {key: "d", timestamp: 10}}).build(), + newExportedSpanBuilder("e", "g", true).withKVs([]kvAndTS{{key: "e", timestamp: 10}, {key: "f", timestamp: 10}}).build(), + }, + flushedSpans: []roachpb.Spans{}, + unflushedSpans: []roachpb.Spans{{{Key: []byte("a"), EndKey: []byte("g")}}}, + }, + { + name: "extend-not-key-boundary", + exportSpans: []exportedSpan{ + newExportedSpanBuilder("a", "c", false).withKVs([]kvAndTS{{key: "a", timestamp: 10}, {key: "c", timestamp: 10}}).build(), + newExportedSpanBuilder("c", "e", true).withKVs([]kvAndTS{{key: "c", timestamp: 9}, {key: "d", timestamp: 10}}).build(), + }, + flushedSpans: []roachpb.Spans{}, + unflushedSpans: []roachpb.Spans{{{Key: []byte("a"), EndKey: []byte("e")}}}, + }, + { + // TODO(rui): currently it is possible to make the sink error if we + // write different times of the revision history for the same key + // out of order. 
+ // Issue: https://github.com/cockroachdb/cockroach/issues/105372 + name: "extend-same-key", + skipReason: "incorrectly fails with pebble: keys must be added in strictly increasing order", + exportSpans: []exportedSpan{ + newExportedSpanBuilder("a", "a", false).withKVs([]kvAndTS{{key: "a", timestamp: 10}, {key: "a", timestamp: 9}}).build(), + newExportedSpanBuilder("a", "a", false).withKVs([]kvAndTS{{key: "a", timestamp: 5}, {key: "a", timestamp: 4}}).build(), + newExportedSpanBuilder("a", "a", false).withKVs([]kvAndTS{{key: "a", timestamp: 8}, {key: "a", timestamp: 7}}).build(), + }, + flushedSpans: []roachpb.Spans{}, + unflushedSpans: []roachpb.Spans{{{Key: []byte("a"), EndKey: []byte("e")}}}, + }, + { + name: "extend-metadata-same-timestamp", + exportSpans: []exportedSpan{ + newExportedSpanBuilder("a", "c", true). + withKVs([]kvAndTS{{key: "a", timestamp: 5}, {key: "b", timestamp: 5}}). + withStartTime(5). + withEndTime(10). + build(), + newExportedSpanBuilder("c", "e", true). + withKVs([]kvAndTS{{key: "c", timestamp: 10}, {key: "d", timestamp: 10}}). + withStartTime(5). + withEndTime(10). + build(), + }, + flushedSpans: []roachpb.Spans{}, + unflushedSpans: []roachpb.Spans{ + {{Key: []byte("a"), EndKey: []byte("e")}}, + }, + }, + { + name: "no-extend-metadata-timestamp-mismatch", + exportSpans: []exportedSpan{ + newExportedSpanBuilder("a", "c", true). + withKVs([]kvAndTS{{key: "a", timestamp: 5}, {key: "b", timestamp: 5}}). + withEndTime(5). + build(), + newExportedSpanBuilder("c", "e", true). + withKVs([]kvAndTS{{key: "c", timestamp: 10}, {key: "d", timestamp: 10}}). + withStartTime(5). + withEndTime(10). + build(), + }, + flushedSpans: []roachpb.Spans{}, + unflushedSpans: []roachpb.Spans{ + {{Key: []byte("a"), EndKey: []byte("c")}, {Key: []byte("c"), EndKey: []byte("e")}}, + }, + }, + { + name: "size-flush", + exportSpans: []exportedSpan{ + newExportedSpanBuilder("a", "c", true).withKVs([]kvAndTS{{key: "a", timestamp: 10, value: make([]byte, 20<<20)}, {key: "b", timestamp: 10}}).build(), + newExportedSpanBuilder("d", "f", true).withKVs([]kvAndTS{{key: "d", timestamp: 10}, {key: "e", timestamp: 10}}).build(), + }, + flushedSpans: []roachpb.Spans{ + {{Key: []byte("a"), EndKey: []byte("c")}}, + }, + unflushedSpans: []roachpb.Spans{ + {{Key: []byte("d"), EndKey: []byte("f")}}, + }, + }, + { + name: "no-size-flush-if-not-at-boundary", + exportSpans: []exportedSpan{ + newExportedSpanBuilder("a", "c", false).withKVs([]kvAndTS{{key: "a", timestamp: 10, value: make([]byte, 20<<20)}, {key: "b", timestamp: 10}}).build(), + newExportedSpanBuilder("d", "f", false).withKVs([]kvAndTS{{key: "d", timestamp: 10}, {key: "e", timestamp: 10}}).build(), + }, + flushedSpans: []roachpb.Spans{}, + unflushedSpans: []roachpb.Spans{ + {{Key: []byte("a"), EndKey: []byte("c")}, {Key: []byte("d"), EndKey: []byte("f")}}, + }, + }, + } { + t.Run(tt.name, func(t *testing.T) { + if tt.skipReason != "" { + skip.IgnoreLint(t, tt.skipReason) + } + + sink, store := fileSSTSinkTestSetUp(ctx, t, tc, sqlDB) + defer func() { + require.NoError(t, sink.Close()) + }() + + for _, es := range tt.exportSpans { + require.NoError(t, sink.write(ctx, es)) + } + + progress := make([]backuppb.BackupManifest_File, 0) + + Loop: + for { + select { + case p := <-sink.conf.progCh: + var progDetails backuppb.BackupManifest_Progress + if err := types.UnmarshalAny(&p.ProgressDetails, &progDetails); err != nil { + t.Fatal(err) + } + + progress = append(progress, progDetails.Files...) 
+ default: + break Loop + } + } + + // progCh contains the files that have already been created with + // flushes. Verify the contents. + require.NoError(t, checkFiles(ctx, store, progress, tt.flushedSpans)) + + // flushedFiles contain the files that are in queue to be created on the + // next flush. Save these and then flush the sink to check their contents. + var actualUnflushedFiles []backuppb.BackupManifest_File + actualUnflushedFiles = append(actualUnflushedFiles, sink.flushedFiles...) + require.NoError(t, sink.flush(ctx)) + require.NoError(t, checkFiles(ctx, store, actualUnflushedFiles, tt.unflushedSpans)) + require.Empty(t, sink.flushedFiles) + }) + } + +} + +// TestFileSSTSinkStats tests the internal counters and stats of the FileSSTSink under +// different write scenarios. +func TestFileSSTSinkStats(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + tc, sqlDB, _, cleanup := backupRestoreTestSetup(t, singleNode, 1, InitManualReplication) + defer cleanup() + + sqlDB.Exec(t, `SET CLUSTER SETTING bulkio.backup.file_size = '10KB'`) + + sink, _ := fileSSTSinkTestSetUp(ctx, t, tc, sqlDB) + + defer func() { + require.NoError(t, sink.Close()) + }() + + type sinkStats struct { + flushedRevStart hlc.Timestamp + completedSpans int32 + files int + flushes int + oooFlushes int + sizeFlushes int + spanGrows int + } + + type inputAndExpectedStats struct { + input exportedSpan + expectedStats sinkStats + } + + // no extends, extends, flush due to size, flush out of order + inputs := []inputAndExpectedStats{ + { + // Write the first exported span to the sink. + newExportedSpanBuilder("a", "c", true).withKVs([]kvAndTS{{key: "a", timestamp: 10}, {key: "b", timestamp: 10}}).build(), + sinkStats{hlc.Timestamp{}, 1, 1, 0, 0, 0, 0}, + }, + { + // Write another exported span after the first span that doesn't + // extend the previous span. This ES also has a revStartTime. + newExportedSpanBuilder("d", "e", true).withKVs([]kvAndTS{{key: "d", timestamp: 10}}).withRevStartTime(5).build(), + sinkStats{hlc.Timestamp{WallTime: 5}, 2, 2, 0, 0, 0, 0}}, + { + // Write an exported span that extends the previous span. This ES + // also has a later revStartTime. + newExportedSpanBuilder("e", "f", true).withKVs([]kvAndTS{{key: "e", timestamp: 10}}).withRevStartTime(10).build(), + sinkStats{hlc.Timestamp{WallTime: 10}, 3, 3, 0, 0, 0, 1}}, + { + // Write an exported span that comes after all spans so far. This span has enough data for a size flush. + newExportedSpanBuilder("g", "h", true).withKVs([]kvAndTS{{key: "g", timestamp: 10, value: make([]byte, 20<<20)}}).build(), + sinkStats{hlc.Timestamp{WallTime: 0}, 0, 4, 1, 0, 1, 1}}, + { + // Write the first exported span after the flush. + newExportedSpanBuilder("i", "k", true).withKVs([]kvAndTS{{key: "i", timestamp: 10}, {key: "j", timestamp: 10}}).build(), + sinkStats{hlc.Timestamp{}, 1, 5, 1, 0, 1, 1}}, + { + // Write another exported span that causes an out of order flush. 
+ newExportedSpanBuilder("j", "l", true).withKVs([]kvAndTS{{key: "j", timestamp: 10}, {key: "k", timestamp: 10}}).build(), + sinkStats{hlc.Timestamp{}, 1, 6, 2, 1, 1, 1}}, + } + + for _, input := range inputs { + require.NoError(t, sink.write(ctx, input.input)) + + actualStats := sinkStats{ + flushedRevStart: sink.flushedRevStart, + completedSpans: sink.completedSpans, + files: sink.stats.files, + flushes: sink.stats.flushes, + oooFlushes: sink.stats.oooFlushes, + sizeFlushes: sink.stats.sizeFlushes, + spanGrows: sink.stats.spanGrows, + } + + require.Equal(t, input.expectedStats, actualStats, "stats after write for span %v", input.input.metadata.Span) + } + + require.NoError(t, sink.flush(ctx)) +} + +func TestFileSSTSinkCopyPointKeys(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + settings := cluster.MakeTestingClusterSettings() + + type testInput struct { + input []kvAndTS + expectErr string + } + + type testCase struct { + name string + inputs []testInput + } + + for _, tt := range []testCase{ + { + name: "single-exported-span", + inputs: []testInput{ + { + input: []kvAndTS{ + {key: "a", value: []byte("1"), timestamp: 10}, + {key: "a", value: []byte("2"), timestamp: 9}, + {key: "b", value: []byte("2"), timestamp: 10}, + }, + }, + }, + }, + { + name: "multiple-exported-spans", + inputs: []testInput{ + { + input: []kvAndTS{ + {key: "a", value: []byte("1"), timestamp: 10}, + {key: "a", value: []byte("2"), timestamp: 9}, + {key: "b", value: []byte("2"), timestamp: 10}, + }, + }, + { + input: []kvAndTS{ + {key: "c", value: []byte("3"), timestamp: 10}, + {key: "d", value: []byte("4"), timestamp: 9}, + {key: "e", value: []byte("5"), timestamp: 8}, + }, + }, + { + input: []kvAndTS{ + {key: "e", value: []byte("3"), timestamp: 6}, + {key: "f", value: []byte("4"), timestamp: 10}, + {key: "g", value: []byte("5"), timestamp: 10}, + }, + }, + }, + }, + { + name: "out-of-order-key", + inputs: []testInput{ + { + input: []kvAndTS{ + {key: "a", value: []byte("1"), timestamp: 10}, + {key: "c", value: []byte("2"), timestamp: 10}, + }, + }, + { + input: []kvAndTS{ + {key: "b", value: []byte("3"), timestamp: 10}, + {key: "d", value: []byte("4"), timestamp: 10}, + }, + expectErr: "keys must be added in strictly increasing order", + }, + }, + }, + { + name: "out-of-order-timestamp", + inputs: []testInput{ + { + input: []kvAndTS{ + {key: "a", value: []byte("1"), timestamp: 10}, + {key: "b", value: []byte("2"), timestamp: 10}, + }, + }, + { + input: []kvAndTS{ + {key: "b", value: []byte("3"), timestamp: 11}, + {key: "c", value: []byte("4"), timestamp: 10}, + }, + expectErr: "keys must be added in strictly increasing order", + }, + }, + }, + } { + t.Run(tt.name, func(t *testing.T) { + buf := &bytes.Buffer{} + sst := storage.MakeBackupSSTWriter(ctx, settings, buf) + sink := fileSSTSink{sst: sst} + compareSST := true + + for _, input := range tt.inputs { + kvs := input.input + // Add a range key in the input as well. + es := newExportedSpanBuilder(kvs[0].key, kvs[len(kvs)-1].key, false). + withKVs(kvs). + withRangeKeys([]rangeKeyAndTS{{"a", "z", 10}}). + build() + err := sink.copyPointKeys(es.dataSST) + if input.expectErr != "" { + // Do not compare resulting SSTs if we expect errors. + require.ErrorContains(t, err, input.expectErr) + compareSST = false + } else { + require.NoError(t, err) + } + } + + sst.Close() + + if !compareSST { + return + } + + var expected []kvAndTS + for _, input := range tt.inputs { + expected = append(expected, input.input...) 
+ } + + iterOpts := storage.IterOptions{ + KeyTypes: storage.IterKeyTypePointsAndRanges, + LowerBound: keys.LocalMax, + UpperBound: keys.MaxKey, + } + + iter, err := storage.NewMemSSTIterator(buf.Bytes(), false, iterOpts) + if err != nil { + t.Fatal(err) + } + defer iter.Close() + + var actual []kvAndTS + for iter.SeekGE(storage.MVCCKey{}); ; iter.Next() { + if ok, err := iter.Valid(); err != nil { + t.Fatal(err) + } else if !ok { + break + } + + key := iter.UnsafeKey() + value, err := iter.UnsafeValue() + if err != nil { + t.Fatal(err) + } + + kv := kvAndTS{key: string(key.Key), timestamp: key.Timestamp.WallTime} + kv.value = append(kv.value, value...) + + actual = append(actual, kv) + } + + require.Equal(t, expected, actual) + }) + } +} + +func TestFileSSTSinkCopyRangeKeys(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + settings := cluster.MakeTestingClusterSettings() + + type testInput struct { + input []rangeKeyAndTS + expectErr string + } + + type testCase struct { + name string + inputs []testInput + } + + for _, tt := range []testCase{ + { + name: "single-exported-span", + inputs: []testInput{ + { + input: []rangeKeyAndTS{ + {key: "a", endKey: "b", timestamp: 10}, + {key: "b", endKey: "c", timestamp: 9}, + }, + }, + }, + }, + { + name: "multiple-exported-spans", + inputs: []testInput{ + { + input: []rangeKeyAndTS{ + {key: "a", endKey: "b", timestamp: 10}, + {key: "b", endKey: "c", timestamp: 9}, + }, + }, + { + input: []rangeKeyAndTS{ + {key: "c", endKey: "d", timestamp: 10}, + {key: "c", endKey: "d", timestamp: 9}, + }, + }, + { + input: []rangeKeyAndTS{ + {key: "c", endKey: "d", timestamp: 8}, + }, + }, + }, + }, + { + name: "out-of-order-range", + inputs: []testInput{ + { + input: []rangeKeyAndTS{ + {key: "a", endKey: "b", timestamp: 10}, + {key: "b", endKey: "d", timestamp: 9}, + }, + }, + { + input: []rangeKeyAndTS{ + {key: "a", endKey: "c", timestamp: 8}, + {key: "c", endKey: "e", timestamp: 7}, + }, + expectErr: "spans must be added in order", + }, + }, + }, + { + name: "out-of-order-timestamp", + inputs: []testInput{ + { + input: []rangeKeyAndTS{ + {key: "a", endKey: "b", timestamp: 10}, + {key: "b", endKey: "d", timestamp: 9}, + }, + }, + { + input: []rangeKeyAndTS{ + {key: "b", endKey: "d", timestamp: 11}, + {key: "c", endKey: "e", timestamp: 7}, + }, + }, + }, + }, + } { + t.Run(tt.name, func(t *testing.T) { + buf := &bytes.Buffer{} + sst := storage.MakeBackupSSTWriter(ctx, settings, buf) + sink := fileSSTSink{sst: sst} + compareSST := true + + for _, input := range tt.inputs { + rangeKeys := input.input + // Add some point key values in the input as well. + es := newExportedSpanBuilder(rangeKeys[0].key, rangeKeys[len(rangeKeys)-1].key, false). + withRangeKeys(rangeKeys). + withKVs([]kvAndTS{{rangeKeys[0].key, nil, rangeKeys[0].timestamp}}). + build() + err := sink.copyRangeKeys(es.dataSST) + if input.expectErr != "" { + // Do not compare resulting SSTs if we expect errors. 
+ require.ErrorContains(t, err, input.expectErr) + compareSST = false + } else { + require.NoError(t, err) + } + } + + sst.Close() + + if !compareSST { + return + } + + expected := make(map[int64]*roachpb.SpanGroup) + for _, input := range tt.inputs { + for _, rk := range input.input { + if expected[rk.timestamp] == nil { + expected[rk.timestamp] = &roachpb.SpanGroup{} + } + + expected[rk.timestamp].Add(rk.span()) + } + } + + iterOpts := storage.IterOptions{ + KeyTypes: storage.IterKeyTypePointsAndRanges, + LowerBound: keys.LocalMax, + UpperBound: keys.MaxKey, + } + + iter, err := storage.NewMemSSTIterator(buf.Bytes(), false, iterOpts) + if err != nil { + t.Fatal(err) + } + + defer iter.Close() + + var actual []rangeKeyAndTS + for iter.SeekGE(storage.MVCCKey{}); ; iter.Next() { + if ok, err := iter.Valid(); err != nil { + t.Fatal(err) + } else if !ok { + break + } + + rangeKeys := iter.RangeKeys() + for _, v := range rangeKeys.Versions { + rk := rangeKeyAndTS{ + key: string(rangeKeys.Bounds.Key), + endKey: string(rangeKeys.Bounds.EndKey), + timestamp: v.Timestamp.WallTime, + } + actual = append(actual, rk) + + } + } + + for _, rk := range actual { + sp := rk.span() + if !expected[rk.timestamp].Encloses(sp) { + t.Fatalf("expected to copy range %v at timestamp %d", sp, rk.timestamp) + } + expected[rk.timestamp].Sub(sp) + } + + for ts, sg := range expected { + for _, missing := range sg.Slice() { + t.Fatalf("expected range %v at timestamp %d to be copied", missing, ts) + } + } + }) + } +} + +type kvAndTS struct { + key string + value []byte + timestamp int64 +} + +type rangeKeyAndTS struct { + key string + endKey string + timestamp int64 +} + +func (rk rangeKeyAndTS) span() roachpb.Span { + return roachpb.Span{ + Key: []byte(rk.key), + EndKey: []byte(rk.endKey), + } +} + +type exportedSpanBuilder struct { + es *exportedSpan + keyValues []kvAndTS + rangeKeys []rangeKeyAndTS +} + +func newExportedSpanBuilder(spanStart, spanEnd string, atKeyBoundary bool) *exportedSpanBuilder { + return &exportedSpanBuilder{ + es: &exportedSpan{ + metadata: backuppb.BackupManifest_File{ + Span: roachpb.Span{ + Key: []byte(spanStart), + EndKey: []byte(spanEnd), + }, + EntryCounts: roachpb.RowCount{ + DataSize: 1, + Rows: 1, + IndexEntries: 0, + }, + }, + + completedSpans: 1, + atKeyBoundary: atKeyBoundary, + }, + } +} + +func (b *exportedSpanBuilder) withKVs(keyValues []kvAndTS) *exportedSpanBuilder { + b.keyValues = keyValues + return b +} + +func (b *exportedSpanBuilder) withRangeKeys(rangeKeys []rangeKeyAndTS) *exportedSpanBuilder { + b.rangeKeys = rangeKeys + return b +} + +func (b *exportedSpanBuilder) withStartTime(time int64) *exportedSpanBuilder { + b.es.metadata.StartTime = hlc.Timestamp{WallTime: time} + return b +} + +func (b *exportedSpanBuilder) withEndTime(time int64) *exportedSpanBuilder { + b.es.metadata.EndTime = hlc.Timestamp{WallTime: time} + return b +} + +func (b *exportedSpanBuilder) withRevStartTime(time int64) *exportedSpanBuilder { + b.es.revStart = hlc.Timestamp{WallTime: time} + return b +} + +func (b *exportedSpanBuilder) build() exportedSpan { + ctx := context.Background() + settings := cluster.MakeTestingClusterSettings() + buf := &bytes.Buffer{} + sst := storage.MakeBackupSSTWriter(ctx, settings, buf) + for _, d := range b.keyValues { + err := sst.Put(storage.MVCCKey{ + Key: []byte(d.key), + Timestamp: hlc.Timestamp{WallTime: d.timestamp}, + }, d.value) + if err != nil { + panic(err) + } + } + + for _, d := range b.rangeKeys { + err := sst.PutMVCCRangeKey(storage.MVCCRangeKey{ + 
Timestamp: hlc.Timestamp{WallTime: d.timestamp}, + StartKey: []byte(d.key), + EndKey: []byte(d.endKey), + }, storage.MVCCValue{}) + + if err != nil { + panic(err) + } + } + + sst.Close() + + b.es.dataSST = buf.Bytes() + + return *b.es +} + +func fileSSTSinkTestSetUp( + ctx context.Context, t *testing.T, tc *testcluster.TestCluster, sqlDB *sqlutils.SQLRunner, +) (*fileSSTSink, cloud.ExternalStorage) { + store, err := cloud.ExternalStorageFromURI(ctx, "userfile:///0", + base.ExternalIODirConfig{}, + tc.Servers[0].ClusterSettings(), + blobs.TestEmptyBlobClientFactory, + username.RootUserName(), + tc.Servers[0].InternalDB().(isql.DB), + nil, /* limiters */ + cloud.NilMetrics, + ) + require.NoError(t, err) + + // Never block. + progCh := make(chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress, 100) + + sinkConf := sstSinkConf{ + id: 1, + enc: nil, + progCh: progCh, + settings: &tc.Servers[0].ClusterSettings().SV, + } + + sink := makeFileSSTSink(sinkConf, store) + return sink, store +} + +func checkFiles( + ctx context.Context, + store cloud.ExternalStorage, + files []backuppb.BackupManifest_File, + expectedFileSpans []roachpb.Spans, +) error { + iterOpts := storage.IterOptions{ + KeyTypes: storage.IterKeyTypePointsOnly, + LowerBound: keys.LocalMax, + UpperBound: keys.MaxKey, + } + + var filePaths []string + filePathToSpans := make(map[string]roachpb.Spans) + for _, f := range files { + if _, ok := filePathToSpans[f.Path]; !ok { + filePaths = append(filePaths, f.Path) + } + filePathToSpans[f.Path] = append(filePathToSpans[f.Path], f.Span) + } + + // First, check that we got the expected file spans. + if len(expectedFileSpans) != len(filePaths) { + return errors.Newf("expected %d files, got %d", len(expectedFileSpans), len(filePaths)) + } + + for i := range expectedFileSpans { + actualSpans := filePathToSpans[filePaths[i]] + + if !reflect.DeepEqual(expectedFileSpans[i], actualSpans) { + return errors.Newf("expected file at idx %d to have spans %v, got %v", i, expectedFileSpans[i], actualSpans) + } + } + + // Also check that all keys within the flushed files fall within the + // manifest file metadata spans that point to the file. + for f, spans := range filePathToSpans { + iter, err := storageccl.ExternalSSTReader(ctx, []storageccl.StoreFile{{Store: store, FilePath: f}}, nil, iterOpts) + if err != nil { + return err + } + + defer iter.Close() + for iter.SeekGE(storage.MVCCKey{}); ; iter.Next() { + if ok, err := iter.Valid(); err != nil { + return err + } else if !ok { + break + } + + key := iter.UnsafeKey() + + if !endKeyInclusiveSpansContainsKey(spans, key.Key) { + return errors.Newf("key %v in file %s not contained by its spans [%v]", key.Key, f, spans) + } + } + + } + + return nil +} + +func endKeyInclusiveSpansContainsKey(spans roachpb.Spans, key roachpb.Key) bool { + for _, sp := range spans { + if sp.ContainsKey(key) { + return true + } + if sp.EndKey.Compare(key) == 0 { + return true + } + } + + return false +} diff --git a/pkg/ccl/backupccl/restore_job.go b/pkg/ccl/backupccl/restore_job.go index e5ff52bc9206..105fea2eff1a 100644 --- a/pkg/ccl/backupccl/restore_job.go +++ b/pkg/ccl/backupccl/restore_job.go @@ -531,6 +531,11 @@ func (r *restoreResumer) ForceRealSpan() bool { return true } +// DumpTraceAfterRun implements the TraceableJob interface. 
+func (r *restoreResumer) DumpTraceAfterRun() bool { + return true +} + // remapAndFilterRelevantStatistics changes the table ID references in // the stats from those they had in the backed up database to what // they should be in the restored database. diff --git a/pkg/ccl/changefeedccl/BUILD.bazel b/pkg/ccl/changefeedccl/BUILD.bazel index 9bec478574fb..b104870436cd 100644 --- a/pkg/ccl/changefeedccl/BUILD.bazel +++ b/pkg/ccl/changefeedccl/BUILD.bazel @@ -324,6 +324,7 @@ go_test( "@org_golang_google_api//option", "@org_golang_google_grpc//:go_default_library", "@org_golang_google_grpc//credentials/insecure", + "@org_golang_x_exp//slices", "@org_golang_x_text//collate", ], ) diff --git a/pkg/ccl/changefeedccl/parquet.go b/pkg/ccl/changefeedccl/parquet.go index 1d26f0ca9421..b3338b640866 100644 --- a/pkg/ccl/changefeedccl/parquet.go +++ b/pkg/ccl/changefeedccl/parquet.go @@ -116,7 +116,8 @@ func newParquetWriterFromRow( return nil, err } } - writer, err := newParquetWriter(schemaDef, sink, opts...) + writer, err := parquet.NewWriter(schemaDef, sink, opts...) + if err != nil { return nil, err } @@ -324,17 +325,3 @@ func deserializeMap(s string) (orderedKeys []string, m map[string]int, err error } return orderedKeys, m, nil } - -// newParquetWriter allocates a new parquet writer using the provided -// schema definition. -func newParquetWriter( - sch *parquet.SchemaDefinition, sink io.Writer, opts ...parquet.Option, -) (*parquet.Writer, error) { - if includeParquestTestMetadata { - // To use parquet test utils for reading datums, the writer needs to be - // configured with additional metadata. - return parquet.NewWriterWithReaderMeta(sch, sink, opts...) - } - - return parquet.NewWriter(sch, sink, opts...) -} diff --git a/pkg/ccl/changefeedccl/parquet_sink_cloudstorage.go b/pkg/ccl/changefeedccl/parquet_sink_cloudstorage.go index 007182be03a2..170b862e1381 100644 --- a/pkg/ccl/changefeedccl/parquet_sink_cloudstorage.go +++ b/pkg/ccl/changefeedccl/parquet_sink_cloudstorage.go @@ -148,7 +148,7 @@ func (parquetSink *parquetCloudStorageSink) EmitResolvedTimestamp( // TODO: Ideally, we do not create a new schema and writer every time // we emit a resolved timestamp. Currently, util/parquet does not support it. - writer, err := newParquetWriter(sch, &buf) + writer, err := parquet.NewWriter(sch, &buf) if err != nil { return err } diff --git a/pkg/ccl/changefeedccl/parquet_test.go b/pkg/ccl/changefeedccl/parquet_test.go index f4310019e073..c545fed3b3fd 100644 --- a/pkg/ccl/changefeedccl/parquet_test.go +++ b/pkg/ccl/changefeedccl/parquet_test.go @@ -21,6 +21,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/sql/sem/eval" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" @@ -33,8 +34,12 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" "github.com/stretchr/testify/require" + "golang.org/x/exp/slices" ) +// TestParquetRows tests that the parquetWriter correctly writes datums. It does +// this by setting up a rangefeed on a table with data and verifying the writer +// writes the correct datums to the parquet file. 
func TestParquetRows(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -62,11 +67,13 @@ func TestParquetRows(t *testing.T) { }{ { testName: "mixed", - createTable: `CREATE TABLE foo ( - int32Col INT4 PRIMARY KEY, - stringCol STRING , - uuidCol UUID - )`, + createTable: ` + CREATE TABLE foo ( + int32Col INT4 PRIMARY KEY, + stringCol STRING, + uuidCol UUID + ) + `, stmts: []string{ `INSERT INTO foo VALUES (0, 'a1', '2fec7a4b-0a78-40ce-92e0-d1c0fac70436')`, `INSERT INTO foo VALUES (1, 'b1', '0ce43188-e4a9-4b73-803b-a253abc57e6b')`, @@ -152,7 +159,24 @@ func TestParquetRows(t *testing.T) { err = writer.close() require.NoError(t, err) - parquet.ReadFileAndVerifyDatums(t, f.Name(), numRows, numCols, writer.inner, datums) + meta, readDatums, err := parquet.ReadFile(f.Name()) + require.NoError(t, err) + require.Equal(t, meta.NumRows, numRows) + require.Equal(t, meta.NumCols, numCols) + // NB: Rangefeeds have per-key ordering, so the rows in the parquet + // file may not match the order we insert them. To accommodate for + // this, sort the expected and actual datums by the primary key. + slices.SortFunc(datums, func(a []tree.Datum, b []tree.Datum) bool { + return a[0].Compare(&eval.Context{}, b[0]) == -1 + }) + slices.SortFunc(readDatums, func(a []tree.Datum, b []tree.Datum) bool { + return a[0].Compare(&eval.Context{}, b[0]) == -1 + }) + for r := 0; r < numRows; r++ { + for c := 0; c < numCols; c++ { + parquet.ValidateDatum(t, datums[r][c], readDatums[r][c]) + } + } }) } } @@ -175,6 +199,8 @@ func makeRangefeedReaderAndDecoder( return popRow, cleanup, decoder } +// TestParquetResolvedTimestamps runs tests a changefeed with format=parquet and +// resolved timestamps enabled. func TestParquetResolvedTimestamps(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go index 015891f09184..bc3f3725940d 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go @@ -734,8 +734,10 @@ func (c *cutoverProgressTracker) onCompletedCallback( return nil } -func (s *streamIngestionResumer) ForceRealSpan() bool { return true } +func (s *streamIngestionResumer) ForceRealSpan() bool { return true } +func (s *streamIngestionResumer) DumpTraceAfterRun() bool { return true } +var _ jobs.TraceableJob = &streamIngestionResumer{} var _ jobs.Resumer = &streamIngestionResumer{} func init() { diff --git a/pkg/cli/debug_job_trace_test.go b/pkg/cli/debug_job_trace_test.go index 86cee3db0155..dc505437ae67 100644 --- a/pkg/cli/debug_job_trace_test.go +++ b/pkg/cli/debug_job_trace_test.go @@ -45,6 +45,10 @@ func (r *traceSpanResumer) ForceRealSpan() bool { return true } +func (r *traceSpanResumer) DumpTraceAfterRun() bool { + return true +} + type traceSpanResumer struct { ctx context.Context recordedSpanCh chan struct{} diff --git a/pkg/jobs/BUILD.bazel b/pkg/jobs/BUILD.bazel index 13f387fb12be..9b288826a91e 100644 --- a/pkg/jobs/BUILD.bazel +++ b/pkg/jobs/BUILD.bazel @@ -6,6 +6,7 @@ go_library( "adopt.go", "config.go", "errors.go", + "execution_detail_utils.go", "executor_impl.go", "helpers.go", "job_info_storage.go", @@ -31,6 +32,7 @@ go_library( "//pkg/base", "//pkg/clusterversion", "//pkg/jobs/jobspb", + "//pkg/jobs/jobsprofiler/profilerconstants", "//pkg/kv", "//pkg/kv/kvpb", "//pkg/multitenant", @@ -75,6 +77,7 @@ go_library( "@com_github_cockroachdb_redact//:redact", 
"@com_github_gogo_protobuf//jsonpb", "@com_github_gogo_protobuf//types", + "@com_github_klauspost_compress//gzip", "@com_github_prometheus_client_model//go", "@com_github_robfig_cron_v3//:cron", "@io_opentelemetry_go_otel//attribute", diff --git a/pkg/jobs/adopt.go b/pkg/jobs/adopt.go index 5dbdb33b0425..24c6743b1d56 100644 --- a/pkg/jobs/adopt.go +++ b/pkg/jobs/adopt.go @@ -13,7 +13,6 @@ package jobs import ( "context" "fmt" - "strconv" "sync" "github.com/cockroachdb/cockroach/pkg/clusterversion" @@ -27,6 +26,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/protoutil" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb" "github.com/cockroachdb/errors" @@ -59,36 +59,39 @@ const ( RETURNING id;` ) -func (r *Registry) maybeDumpTrace( - resumerCtx context.Context, resumer Resumer, jobID, traceID int64, jobErr error, -) { - if _, ok := resumer.(TraceableJob); !ok || r.td == nil { - return - } - dumpMode := traceableJobDumpTraceMode.Get(&r.settings.SV) - if dumpMode == int64(noDump) { +// maybeDumpTrace will conditionally persist the trace recording of the job's +// current resumer for consumption by job profiler tools. This method must be +// invoked before the tracing span corresponding to the job's current resumer is +// Finish()'ed. +func (r *Registry) maybeDumpTrace(resumerCtx context.Context, resumer Resumer, jobID jobspb.JobID) { + if tj, ok := resumer.(TraceableJob); !ok || !tj.DumpTraceAfterRun() { return } // Make a new ctx to use in the trace dumper. This is because the resumerCtx // could have been canceled at this point. dumpCtx, _ := r.makeCtx() + sp := tracing.SpanFromContext(resumerCtx) + if sp == nil || sp.IsNoop() { + // Should never be true since TraceableJobs force real tracing spans to be + // attached to the context. + return + } - ieNotBoundToTxn := r.db.Executor() - - // If the job has failed, and the dump mode is set to anything - // except noDump, then we should dump the trace. - // The string comparison is unfortunate but is used to differentiate a job - // that has failed from a job that has been canceled. - if jobErr != nil && !HasErrJobCanceled(jobErr) && resumerCtx.Err() == nil { - r.td.Dump(dumpCtx, strconv.Itoa(int(jobID)), traceID, ieNotBoundToTxn) + if !r.settings.Version.IsActive(dumpCtx, clusterversion.V23_1) { return } - // If the dump mode is set to `dumpOnStop` then we should dump the - // trace when the job is any of paused, canceled, succeeded or failed state. - if dumpMode == int64(dumpOnStop) { - r.td.Dump(dumpCtx, strconv.Itoa(int(jobID)), traceID, ieNotBoundToTxn) + resumerTraceFilename := fmt.Sprintf("resumer-trace-n%s.%s.txt", + r.ID().String(), timeutil.Now().Format("20060102_150405.00")) + td := jobspb.TraceData{CollectedSpans: sp.GetConfiguredRecording()} + b, err := protoutil.Marshal(&td) + if err != nil { + return + } + if err := WriteExecutionDetailFile(dumpCtx, resumerTraceFilename, b, r.db, jobID); err != nil { + log.Warning(dumpCtx, "failed to write trace on resumer trace file") + return } } @@ -497,7 +500,7 @@ func (r *Registry) runJob( // and further updates to the job record from this node may // fail. 
r.maybeClearLease(job, err) - r.maybeDumpTrace(ctx, resumer, int64(job.ID()), int64(span.TraceID()), err) + r.maybeDumpTrace(ctx, resumer, job.ID()) if r.knobs.AfterJobStateMachine != nil { r.knobs.AfterJobStateMachine() } diff --git a/pkg/sql/jobs_profiler_bundle.go b/pkg/jobs/execution_detail_utils.go similarity index 50% rename from pkg/sql/jobs_profiler_bundle.go rename to pkg/jobs/execution_detail_utils.go index f2dfee1342e4..7505b852f147 100644 --- a/pkg/sql/jobs_profiler_bundle.go +++ b/pkg/jobs/execution_detail_utils.go @@ -8,7 +8,7 @@ // by the Apache License, Version 2.0, included in the file // licenses/APL.txt. -package sql +package jobs import ( "bytes" @@ -17,16 +17,9 @@ import ( "io" "strings" - "github.com/cockroachdb/cockroach/pkg/clusterversion" - "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/jobs/jobsprofiler/profilerconstants" - "github.com/cockroachdb/cockroach/pkg/server/serverpb" "github.com/cockroachdb/cockroach/pkg/sql/isql" - "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" - "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" - "github.com/cockroachdb/cockroach/pkg/util/log" - "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" "github.com/klauspost/compress/gzip" ) @@ -34,31 +27,6 @@ import ( const bundleChunkSize = 1 << 20 // 1 MiB const finalChunkSuffix = "#_final" -// RequestExecutionDetails implements the JobProfiler interface. -func (p *planner) RequestExecutionDetails(ctx context.Context, jobID jobspb.JobID) error { - execCfg := p.ExecCfg() - if !execCfg.Settings.Version.IsActive(ctx, clusterversion.V23_1) { - return errors.Newf("execution details can only be requested on a cluster with version >= %s", - clusterversion.V23_1.String()) - } - - e := MakeJobProfilerExecutionDetailsBuilder(execCfg.SQLStatusServer, execCfg.InternalDB, jobID) - // TODO(adityamaru): When we start collecting more information we can consider - // parallelize the collection of the various pieces. - e.addDistSQLDiagram(ctx) - e.addLabelledGoroutines(ctx) - - return nil -} - -// ExecutionDetailsBuilder can be used to read and write execution details corresponding -// to a job. -type ExecutionDetailsBuilder struct { - srv serverpb.SQLStatusServer - db isql.DB - jobID jobspb.JobID -} - func compressChunk(chunkBuf []byte) ([]byte, error) { gzipBuf := bytes.NewBuffer([]byte{}) gz := gzip.NewWriter(gzipBuf) @@ -71,15 +39,15 @@ func compressChunk(chunkBuf []byte) ([]byte, error) { return gzipBuf.Bytes(), nil } -// WriteExecutionDetail will break up data into chunks of a fixed size, and +// WriteExecutionDetailFile will break up data into chunks of a fixed size, and // gzip compress them before writing them to the job_info table. -func (e *ExecutionDetailsBuilder) WriteExecutionDetail( - ctx context.Context, filename string, data []byte, +func WriteExecutionDetailFile( + ctx context.Context, filename string, data []byte, db isql.DB, jobID jobspb.JobID, ) error { - return e.db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { + return db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { // Take a copy of the data to operate on inside the txn closure. 
chunkData := data[:] - jobInfo := jobs.InfoStorageForJob(txn, e.jobID) + jobInfo := InfoStorageForJob(txn, jobID) var chunkCounter int var chunkName string @@ -114,19 +82,19 @@ func (e *ExecutionDetailsBuilder) WriteExecutionDetail( }) } -// ReadExecutionDetail will stitch together all the chunks corresponding to the +// ReadExecutionDetailFile will stitch together all the chunks corresponding to the // filename and return the uncompressed data of the file. -func (e *ExecutionDetailsBuilder) ReadExecutionDetail( - ctx context.Context, filename string, +func ReadExecutionDetailFile( + ctx context.Context, filename string, db isql.DB, jobID jobspb.JobID, ) ([]byte, error) { // TODO(adityamaru): If filename=all add logic to zip up all the files corresponding // to the job's execution details and return the zipped bundle instead. buf := bytes.NewBuffer([]byte{}) - if err := e.db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { + if err := db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { // Reset the buf inside the txn closure to guard against txn retries. buf.Reset() - jobInfo := jobs.InfoStorageForJob(txn, e.jobID) + jobInfo := InfoStorageForJob(txn, jobID) // Iterate over all the chunks of the requested file and return the unzipped // chunks of data. @@ -161,10 +129,12 @@ func (e *ExecutionDetailsBuilder) ReadExecutionDetail( // ListExecutionDetailFiles lists all the files that have been generated as part // of a job's execution details. -func (e *ExecutionDetailsBuilder) ListExecutionDetailFiles(ctx context.Context) ([]string, error) { +func ListExecutionDetailFiles( + ctx context.Context, db isql.DB, jobID jobspb.JobID, +) ([]string, error) { var res []string - if err := e.db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { - jobInfo := jobs.InfoStorageForJob(txn, e.jobID) + if err := db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { + jobInfo := InfoStorageForJob(txn, jobID) // Iterate over all the files that have been stored as part of the job's // execution details. @@ -187,53 +157,3 @@ func (e *ExecutionDetailsBuilder) ListExecutionDetailFiles(ctx context.Context) return res, nil } - -// MakeJobProfilerExecutionDetailsBuilder returns an instance of an ExecutionDetailsBuilder. -func MakeJobProfilerExecutionDetailsBuilder( - srv serverpb.SQLStatusServer, db isql.DB, jobID jobspb.JobID, -) ExecutionDetailsBuilder { - e := ExecutionDetailsBuilder{ - srv: srv, db: db, jobID: jobID, - } - return e -} - -// addLabelledGoroutines collects and persists goroutines from all nodes in the -// cluster that have a pprof label tying it to the job whose execution details -// are being collected. -func (e *ExecutionDetailsBuilder) addLabelledGoroutines(ctx context.Context) { - profileRequest := serverpb.ProfileRequest{ - NodeId: "all", - Type: serverpb.ProfileRequest_GOROUTINE, - Labels: true, - LabelFilter: fmt.Sprintf("%d", e.jobID), - } - resp, err := e.srv.Profile(ctx, &profileRequest) - if err != nil { - log.Errorf(ctx, "failed to collect goroutines for job %d: %+v", e.jobID, err.Error()) - return - } - filename := fmt.Sprintf("goroutines.%s.txt", timeutil.Now().Format("20060102_150405.00")) - if err := e.WriteExecutionDetail(ctx, filename, resp.Data); err != nil { - log.Errorf(ctx, "failed to write goroutine for job %d: %+v", e.jobID, err.Error()) - } -} - -// addDistSQLDiagram generates and persists a `distsql..html` file. 
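
The execution-detail helpers above (WriteExecutionDetailFile, ReadExecutionDetailFile, ListExecutionDetailFiles) are what the new maybeDumpTrace builds on: the resumer's trace recording is marshaled into jobspb.TraceData and persisted as a chunked, gzip-compressed file in the job_info table. For reference, a minimal sketch of reading such a trace back out, mirroring what the updated checkTraceFiles test helper does below; the package name and the readResumerTraces function are hypothetical, while the jobs/jobspb/protoutil calls are taken from this patch.

package jobsutil // hypothetical package, for illustration only

import (
	"context"
	"strings"

	"github.com/cockroachdb/cockroach/pkg/jobs"
	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
	"github.com/cockroachdb/cockroach/pkg/sql/isql"
	"github.com/cockroachdb/cockroach/pkg/util/protoutil"
)

// readResumerTraces lists a job's execution-detail files and decodes the ones
// written by maybeDumpTrace into jobspb.TraceData recordings.
func readResumerTraces(
	ctx context.Context, db isql.DB, jobID jobspb.JobID,
) ([]jobspb.TraceData, error) {
	files, err := jobs.ListExecutionDetailFiles(ctx, db, jobID)
	if err != nil {
		return nil, err
	}
	var traces []jobspb.TraceData
	for _, f := range files {
		// Resumer traces are written with a "resumer-trace" filename prefix.
		if !strings.HasPrefix(f, "resumer-trace") {
			continue
		}
		data, err := jobs.ReadExecutionDetailFile(ctx, f, db, jobID)
		if err != nil {
			return nil, err
		}
		var td jobspb.TraceData
		if err := protoutil.Unmarshal(data, &td); err != nil {
			return nil, err
		}
		traces = append(traces, td)
	}
	return traces, nil
}
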
-func (e *ExecutionDetailsBuilder) addDistSQLDiagram(ctx context.Context) { - query := `SELECT plan_diagram FROM [SHOW JOB $1 WITH EXECUTION DETAILS]` - row, err := e.db.Executor().QueryRowEx(ctx, "profiler-bundler-add-diagram", nil, /* txn */ - sessiondata.NoSessionDataOverride, query, e.jobID) - if err != nil { - log.Errorf(ctx, "failed to write DistSQL diagram for job %d: %+v", e.jobID, err.Error()) - return - } - if row != nil && row[0] != tree.DNull { - dspDiagramURL := string(tree.MustBeDString(row[0])) - filename := fmt.Sprintf("distsql.%s.html", timeutil.Now().Format("20060102_150405.00")) - if err := e.WriteExecutionDetail(ctx, filename, - []byte(fmt.Sprintf(``, dspDiagramURL))); err != nil { - log.Errorf(ctx, "failed to write DistSQL diagram for job %d: %+v", e.jobID, err.Error()) - } - } -} diff --git a/pkg/jobs/helpers_test.go b/pkg/jobs/helpers_test.go index db478d35c817..8ecf081ee8eb 100644 --- a/pkg/jobs/helpers_test.go +++ b/pkg/jobs/helpers_test.go @@ -32,6 +32,10 @@ func (d FakeResumer) ForceRealSpan() bool { return d.TraceRealSpan } +func (d FakeResumer) DumpTraceAfterRun() bool { + return true +} + var _ Resumer = FakeResumer{} func (d FakeResumer) Resume(ctx context.Context, execCtx interface{}) error { diff --git a/pkg/jobs/jobs.go b/pkg/jobs/jobs.go index 38230e2382d9..99d362c90d5a 100644 --- a/pkg/jobs/jobs.go +++ b/pkg/jobs/jobs.go @@ -23,7 +23,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/security/username" - "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/isql" @@ -44,34 +43,6 @@ import ( "github.com/gogo/protobuf/jsonpb" ) -// jobDumpTraceMode is the type that represents the mode in which a traceable -// job will dump a trace zip. -type jobDumpTraceMode int64 - -const ( - // A Traceable job will not dump a trace zip. - noDump jobDumpTraceMode = iota - // A Traceable job will dump a trace zip on failure. - dumpOnFail - // A Traceable job will dump a trace zip in any of paused, canceled, failed, - // succeeded states. - dumpOnStop -) - -var traceableJobDumpTraceMode = settings.RegisterEnumSetting( - settings.TenantWritable, - "jobs.trace.force_dump_mode", - "determines the state in which all traceable jobs will dump their cluster wide, inflight, "+ - "trace recordings. Traces may be dumped never, on fail, "+ - "or on any status change i.e paused, canceled, failed, succeeded.", - "never", - map[int64]string{ - int64(noDump): "never", - int64(dumpOnFail): "onFail", - int64(dumpOnStop): "onStop", - }, -) - // Job manages logging the progress of long-running system processes, like // backups and restores, to the system.jobs table. type Job struct { @@ -159,6 +130,9 @@ type TraceableJob interface { // ForceRealSpan forces the registry to create a real Span instead of a // low-overhead non-recordable noop span. ForceRealSpan() bool + // DumpTraceAfterRun determines whether the job's trace is dumped to disk at + // the end of every adoption. 
+ DumpTraceAfterRun() bool } func init() { diff --git a/pkg/jobs/jobs_test.go b/pkg/jobs/jobs_test.go index b82a38206a7f..4fbac3945117 100644 --- a/pkg/jobs/jobs_test.go +++ b/pkg/jobs/jobs_test.go @@ -11,15 +11,12 @@ package jobs_test import ( - "archive/zip" "context" gosql "database/sql" "fmt" - "os" "path/filepath" "reflect" "runtime/pprof" - "sort" "strings" "sync" "sync/atomic" @@ -1031,6 +1028,7 @@ func TestRegistryLifecycle(t *testing.T) { }) t.Run("dump traces on pause-unpause-success", func(t *testing.T) { + ctx := context.Background() completeCh := make(chan struct{}) rts := registryTestSuite{traceRealSpan: true, afterJobStateMachine: func() { completeCh <- struct{}{} @@ -1053,7 +1051,7 @@ func TestRegistryLifecycle(t *testing.T) { rts.check(t, jobs.StatusPaused) <-completeCh - checkTraceFiles(t, rts.registry, expectedNumFiles) + checkTraceFiles(ctx, t, expectedNumFiles, j.ID(), rts.s) rts.sqlDB.Exec(t, "RESUME JOB $1", j.ID()) @@ -1067,19 +1065,13 @@ func TestRegistryLifecycle(t *testing.T) { rts.check(t, jobs.StatusSucceeded) <-completeCh - checkTraceFiles(t, rts.registry, expectedNumFiles) + checkTraceFiles(ctx, t, expectedNumFiles+1, j.ID(), rts.s) } - rts.sqlDB.Exec(t, `SET CLUSTER SETTING jobs.trace.force_dump_mode='never'`) - pauseUnpauseJob(0) - - rts.sqlDB.Exec(t, `SET CLUSTER SETTING jobs.trace.force_dump_mode='onFail'`) - pauseUnpauseJob(0) - - rts.sqlDB.Exec(t, `SET CLUSTER SETTING jobs.trace.force_dump_mode='onStop'`) pauseUnpauseJob(1) }) t.Run("dump traces on fail", func(t *testing.T) { + ctx := context.Background() completeCh := make(chan struct{}) rts := registryTestSuite{traceRealSpan: true, afterJobStateMachine: func() { completeCh <- struct{}{} @@ -1088,7 +1080,7 @@ func TestRegistryLifecycle(t *testing.T) { defer rts.tearDown() runJobAndFail := func(expectedNumFiles int) { - j, err := jobs.TestingCreateAndStartJob(context.Background(), rts.registry, rts.idb(), rts.mockJob) + j, err := jobs.TestingCreateAndStartJob(ctx, rts.registry, rts.idb(), rts.mockJob) if err != nil { t.Fatal(err) } @@ -1109,16 +1101,9 @@ func TestRegistryLifecycle(t *testing.T) { rts.check(t, jobs.StatusFailed) <-completeCh - checkTraceFiles(t, rts.registry, expectedNumFiles) + checkTraceFiles(ctx, t, expectedNumFiles, j.ID(), rts.s) } - rts.sqlDB.Exec(t, `SET CLUSTER SETTING jobs.trace.force_dump_mode='never'`) - runJobAndFail(0) - - rts.sqlDB.Exec(t, `SET CLUSTER SETTING jobs.trace.force_dump_mode='onFail'`) - runJobAndFail(1) - - rts.sqlDB.Exec(t, `SET CLUSTER SETTING jobs.trace.force_dump_mode='onStop'`) runJobAndFail(1) }) @@ -1129,7 +1114,6 @@ func TestRegistryLifecycle(t *testing.T) { }} rts.setUp(t) defer rts.tearDown() - rts.sqlDB.Exec(t, `SET CLUSTER SETTING jobs.trace.force_dump_mode='onStop'`) j, err := jobs.TestingCreateAndStartJob(rts.ctx, rts.registry, rts.idb(), rts.mockJob) if err != nil { t.Fatal(err) @@ -1143,7 +1127,7 @@ func TestRegistryLifecycle(t *testing.T) { rts.sqlDB.Exec(t, "CANCEL JOB $1", j.ID()) <-completeCh - checkTraceFiles(t, rts.registry, 1) + checkTraceFiles(rts.ctx, t, 1, j.ID(), rts.s) rts.mu.e.OnFailOrCancelStart = true rts.check(t, jobs.StatusReverting) @@ -1197,28 +1181,29 @@ func TestRegistryLifecycle(t *testing.T) { }) } -func checkTraceFiles(t *testing.T, registry *jobs.Registry, expectedNumFiles int) { +func checkTraceFiles( + ctx context.Context, + t *testing.T, + expectedNumFiles int, + jobID jobspb.JobID, + s serverutils.TestServerInterface, +) { t.Helper() - // Check the configured inflight trace dir for dumped zip files. 
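
With the jobs.trace.force_dump_mode setting removed, persisting a trace is now a per-resumer opt-in expressed through the TraceableJob interface shown above. A minimal sketch of a resumer opting in; myResumer is a placeholder type, its other jobs.Resumer methods are elided, and the sketch assumes TraceableJob consists of just these two methods.

package myjob // hypothetical package, for illustration only

import (
	"github.com/cockroachdb/cockroach/pkg/jobs"
)

// myResumer stands in for a real job resumer; its jobs.Resumer methods
// (Resume, OnFailOrCancel) are omitted from this sketch.
type myResumer struct{}

// ForceRealSpan asks the registry to attach a real, recordable tracing span
// to the resumer's context instead of a noop span.
func (r *myResumer) ForceRealSpan() bool { return true }

// DumpTraceAfterRun asks the registry to persist the span's recording as an
// execution-detail file once the run completes.
func (r *myResumer) DumpTraceAfterRun() bool { return true }

var _ jobs.TraceableJob = (*myResumer)(nil)
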
- expList := []string{"node1-trace.txt", "node1-jaeger.json"} - traceDumpDir := jobs.TestingGetTraceDumpDir(registry) - files := make([]string, 0) - require.NoError(t, filepath.Walk(traceDumpDir, func(path string, info os.FileInfo, err error) error { - if info.IsDir() { - return nil - } - files = append(files, path) - return nil - })) - require.Equal(t, expectedNumFiles, len(files)) - for _, file := range files { - checkBundle(t, file, expList) - } + recordings := make([]jobspb.TraceData, 0) + execCfg := s.TenantOrServer().ExecutorConfig().(sql.ExecutorConfig) + edFiles, err := jobs.ListExecutionDetailFiles(ctx, execCfg.InternalDB, jobID) + require.NoError(t, err) - // Cleanup files for next iteration of the test. - for _, file := range files { - require.NoError(t, os.Remove(file)) + for _, f := range edFiles { + data, err := jobs.ReadExecutionDetailFile(ctx, f, execCfg.InternalDB, jobID) + require.NoError(t, err) + td := jobspb.TraceData{} + require.NoError(t, protoutil.Unmarshal(data, &td)) + recordings = append(recordings, td) + } + if len(recordings) != expectedNumFiles { + t.Fatalf("expected %d entries but found %d", expectedNumFiles, len(recordings)) } } @@ -3151,25 +3136,6 @@ func TestLoseLeaseDuringExecution(t *testing.T) { require.Regexp(t, `expected session "\w+" but found NULL`, <-resumed) } -func checkBundle(t *testing.T, zipFile string, expectedFiles []string) { - t.Helper() - r, err := zip.OpenReader(zipFile) - defer func() { _ = r.Close() }() - require.NoError(t, err) - - // Make sure the bundle contains the expected list of files. - filesInZip := make([]string, 0) - for _, f := range r.File { - if f.UncompressedSize64 == 0 { - t.Fatalf("file %s is empty", f.Name) - } - filesInZip = append(filesInZip, f.Name) - } - sort.Strings(filesInZip) - sort.Strings(expectedFiles) - require.Equal(t, expectedFiles, filesInZip) -} - type resumeStartedSignaler struct { syncutil.Mutex cond *sync.Cond diff --git a/pkg/jobs/jobspb/BUILD.bazel b/pkg/jobs/jobspb/BUILD.bazel index 0c19e1ab7ac8..53607669ae8d 100644 --- a/pkg/jobs/jobspb/BUILD.bazel +++ b/pkg/jobs/jobspb/BUILD.bazel @@ -35,6 +35,7 @@ proto_library( "//pkg/sql/catalog/descpb:descpb_proto", "//pkg/sql/sessiondatapb:sessiondatapb_proto", "//pkg/util/hlc:hlc_proto", + "//pkg/util/tracing/tracingpb:tracingpb_proto", "@com_github_cockroachdb_errors//errorspb:errorspb_proto", "@com_github_gogo_protobuf//gogoproto:gogo_proto", "@com_google_protobuf//:any_proto", diff --git a/pkg/jobs/jobspb/jobs.proto b/pkg/jobs/jobspb/jobs.proto index eecb95fa708a..452c869e9f7e 100644 --- a/pkg/jobs/jobspb/jobs.proto +++ b/pkg/jobs/jobspb/jobs.proto @@ -25,6 +25,7 @@ import "util/hlc/timestamp.proto"; import "clusterversion/cluster_version.proto"; import "google/protobuf/timestamp.proto"; import "server/autoconfig/autoconfigpb/autoconfig.proto"; +import "util/tracing/tracingpb/recorded_span.proto"; enum EncryptionMode { Passphrase = 0; @@ -1414,3 +1415,7 @@ message RetriableExecutionFailure { // some information will be preserved. 
string truncated_error = 6; } + +message TraceData { + repeated util.tracing.tracingpb.RecordedSpan collected_spans = 1 [(gogoproto.nullable) = false]; +} diff --git a/pkg/jobs/jobsprofiler/BUILD.bazel b/pkg/jobs/jobsprofiler/BUILD.bazel index 6b345922d7ac..c14958351927 100644 --- a/pkg/jobs/jobsprofiler/BUILD.bazel +++ b/pkg/jobs/jobsprofiler/BUILD.bazel @@ -39,10 +39,13 @@ go_test( "//pkg/sql/execinfrapb", "//pkg/sql/isql", "//pkg/testutils", + "//pkg/testutils/jobutils", "//pkg/testutils/serverutils", + "//pkg/testutils/sqlutils", "//pkg/testutils/testcluster", "//pkg/util/leaktest", "//pkg/util/log", + "//pkg/util/protoutil", "//pkg/util/randutil", "//pkg/util/uuid", "@com_github_cockroachdb_errors//:errors", diff --git a/pkg/jobs/jobsprofiler/profiler.go b/pkg/jobs/jobsprofiler/profiler.go index 43c37151a3ae..fd365d17964c 100644 --- a/pkg/jobs/jobsprofiler/profiler.go +++ b/pkg/jobs/jobsprofiler/profiler.go @@ -48,7 +48,10 @@ func StorePlanDiagram( infoStorage := jobs.InfoStorageForJob(txn, jobID) return infoStorage.Write(ctx, dspKey, []byte(diagURL.String())) }) - if err != nil { + // Don't log the error if the context has been canceled. This will likely be + // when the node is shutting down and so it doesn't add value to spam the + // logs with the error. + if err != nil && ctx.Err() == nil { log.Warningf(ctx, "failed to generate and write DistSQL diagram for job %d: %v", jobID, err.Error()) } diff --git a/pkg/jobs/jobsprofiler/profiler_test.go b/pkg/jobs/jobsprofiler/profiler_test.go index 501717164d0f..b731c822b7c9 100644 --- a/pkg/jobs/jobsprofiler/profiler_test.go +++ b/pkg/jobs/jobsprofiler/profiler_test.go @@ -27,9 +27,12 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/isql" "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/jobutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" "github.com/stretchr/testify/require" @@ -174,3 +177,70 @@ func TestStorePerNodeProcessorProgressFraction(t *testing.T) { } } } + +func TestTraceRecordingOnResumerCompletion(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{ + Knobs: base.TestingKnobs{ + JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(), + }, + }) + ctx := context.Background() + defer s.Stopper().Stop(ctx) + + _, err := sqlDB.Exec(`CREATE DATABASE test`) + require.NoError(t, err) + _, err = sqlDB.Exec(`CREATE TABLE foo (id INT PRIMARY KEY)`) + require.NoError(t, err) + _, err = sqlDB.Exec(`INSERT INTO foo VALUES (1), (2)`) + require.NoError(t, err) + _, err = sqlDB.Exec(`SET CLUSTER SETTING jobs.debug.pausepoints = 'backup.before.flow'`) + require.NoError(t, err) + + var jobID int + err = sqlDB.QueryRow(`BACKUP INTO 'userfile:///foo' WITH detached`).Scan(&jobID) + require.NoError(t, err) + runner := sqlutils.MakeSQLRunner(sqlDB) + jobutils.WaitForJobToPause(t, runner, jobspb.JobID(jobID)) + + _, err = sqlDB.Exec(`SET CLUSTER SETTING jobs.debug.pausepoints = ''`) + require.NoError(t, err) + + runner.Exec(t, `RESUME JOB $1`, jobID) + jobutils.WaitForJobToSucceed(t, runner, jobspb.JobID(jobID)) + + // At this point there 
should have been two resumers, and so we expect two + // trace recordings. + testutils.SucceedsSoon(t, func() error { + recordings := make([]jobspb.TraceData, 0) + execCfg := s.TenantOrServer().ExecutorConfig().(sql.ExecutorConfig) + edFiles, err := jobs.ListExecutionDetailFiles(ctx, execCfg.InternalDB, jobspb.JobID(jobID)) + if err != nil { + return err + } + var traceFiles []string + for _, f := range edFiles { + if strings.Contains(f, "resumer-trace") { + traceFiles = append(traceFiles, f) + } + } + + for _, f := range traceFiles { + data, err := jobs.ReadExecutionDetailFile(ctx, f, execCfg.InternalDB, jobspb.JobID(jobID)) + if err != nil { + return err + } + td := jobspb.TraceData{} + if err := protoutil.Unmarshal(data, &td); err != nil { + return err + } + recordings = append(recordings, td) + } + if len(recordings) != 2 { + return errors.Newf("expected 2 entries but found %d", len(recordings)) + } + return nil + }) +} diff --git a/pkg/jobs/jobsprofiler/profilerconstants/constants.go b/pkg/jobs/jobsprofiler/profilerconstants/constants.go index 32623fcec04e..e3c314c16012 100644 --- a/pkg/jobs/jobsprofiler/profilerconstants/constants.go +++ b/pkg/jobs/jobsprofiler/profilerconstants/constants.go @@ -37,6 +37,14 @@ func MakeNodeProcessorProgressInfoKey(flowID string, instanceID string, processo return fmt.Sprintf("%s%s,%s,%d", NodeProcessorProgressInfoKeyPrefix, flowID, instanceID, processorID) } +const ResumerTraceInfoKeyPrefix = "~resumer-trace-" + +// MakeResumerTraceInfoKey returns the info_key used for rows that store the +// traces on completion of a resumer's execution. +func MakeResumerTraceInfoKey(traceID uint64, nodeID string) string { + return fmt.Sprintf("%s%d-%s", ResumerTraceInfoKeyPrefix, traceID, nodeID) +} + // ExecutionDetailsChunkKeyPrefix is the prefix of the info key used for rows that // store chunks of a job's execution details. const ExecutionDetailsChunkKeyPrefix = "~profiler/" diff --git a/pkg/kv/kvserver/client_raft_test.go b/pkg/kv/kvserver/client_raft_test.go index 00409740a6f1..4dce434da100 100644 --- a/pkg/kv/kvserver/client_raft_test.go +++ b/pkg/kv/kvserver/client_raft_test.go @@ -32,6 +32,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/isolation" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" @@ -7030,3 +7031,65 @@ func TestStoreMetricsOnIncomingOutgoingMsg(t *testing.T) { require.Equal(t, expected, actual) }) } + +// TestInvalidConfChangeRejection is a regression test for [1]. It proposes +// an (intentionally) invalid configuration change and makes sure that raft +// does not drop it. +// +// [1]: https://github.com/cockroachdb/cockroach/issues/105797 +func TestInvalidConfChangeRejection(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + // This is a regression test against a stuck command, so set a timeout to get + // a shot at a graceful failure on regression. 
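// An illustrative, self-contained sketch (assuming the go.etcd.io/raft/v3 API pulled in by this
// change) of the knob that the pkg/kv/kvserver/store.go hunk further below turns on. With
// DisableConfChangeValidation set, raft proposes configuration changes verbatim instead of
// silently swapping ones it deems invalid for empty entries, leaving validation to the state
// machine above raft, which is what this test exercises. Everything here is hypothetical example
// code, not CockroachDB source.
package main

import (
	"fmt"

	"go.etcd.io/raft/v3"
)

func main() {
	cfg := &raft.Config{
		ID:              1,
		ElectionTick:    10,
		HeartbeatTick:   1,
		Storage:         raft.NewMemoryStorage(),
		MaxSizePerMsg:   1 << 20,
		MaxInflightMsgs: 256,
		// Leave conf-change validation to the layer above raft; invalid
		// proposals now reach the applier instead of being replaced.
		DisableConfChangeValidation: true,
	}
	n := raft.StartNode(cfg, []raft.Peer{{ID: 1}})
	defer n.Stop()
	fmt.Println("DisableConfChangeValidation:", cfg.DisableConfChangeValidation)
}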
+ ctx, cancel := context.WithTimeout(context.Background(), testutils.DefaultSucceedsSoonDuration) + defer cancel() + + // When our configuration change shows up below raft, we need to apply it as a + // no-op, since the config change is intentionally invalid and assertions + // would fail if we were to try to actually apply it. + injErr := errors.New("injected error") + tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, + ServerArgs: base.TestServerArgs{Knobs: base.TestingKnobs{Store: &kvserver.StoreTestingKnobs{ + TestingApplyCalledTwiceFilter: func(args kvserverbase.ApplyFilterArgs) (int, *kvpb.Error) { + if args.Req != nil && args.Req.Txn != nil && args.Req.Txn.Name == "fake" { + return 0, kvpb.NewError(injErr) + } + return 0, nil + }}}}, + }) + defer tc.Stopper().Stop(ctx) + + k := tc.ScratchRange(t) + + repl := tc.GetFirstStoreFromServer(t, 0).LookupReplica(keys.MustAddr(k)) + + // Try to leave a joint config even though we're not in one. This is something + // that will lead raft to propose an empty entry instead of our conf change. + // + // See: https://github.com/cockroachdb/cockroach/issues/105797 + var ba kvpb.BatchRequest + now := tc.Server(0).Clock().Now() + txn := roachpb.MakeTransaction("fake", k, isolation.Serializable, roachpb.NormalUserPriority, now, 500*time.Millisecond.Nanoseconds(), 1) + ba.Txn = &txn + ba.Timestamp = now + ba.Add(&kvpb.EndTxnRequest{ + RequestHeader: kvpb.RequestHeader{ + Key: k, + }, + Commit: true, + InternalCommitTrigger: &roachpb.InternalCommitTrigger{ + ChangeReplicasTrigger: &roachpb.ChangeReplicasTrigger{ + Desc: repl.Desc(), + }, + }, + }) + + _, pErr := repl.Send(ctx, &ba) + // Verify that we see the configuration change below raft, where we rejected it + // (since it would've otherwise blown up the Replica: after all, we intentionally + // proposed an invalid configuration change).
+ require.True(t, errors.Is(pErr.GoError(), injErr), "%+v", pErr.GoError()) +} diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index 996def487bca..41aefb1f7a52 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -305,18 +305,19 @@ func newRaftConfig( logger raft.Logger, ) *raft.Config { return &raft.Config{ - ID: id, - Applied: uint64(appliedIndex), - AsyncStorageWrites: true, - ElectionTick: storeCfg.RaftElectionTimeoutTicks, - HeartbeatTick: storeCfg.RaftHeartbeatIntervalTicks, - MaxUncommittedEntriesSize: storeCfg.RaftMaxUncommittedEntriesSize, - MaxCommittedSizePerReady: storeCfg.RaftMaxCommittedSizePerReady, - MaxSizePerMsg: storeCfg.RaftMaxSizePerMsg, - MaxInflightMsgs: storeCfg.RaftMaxInflightMsgs, - MaxInflightBytes: storeCfg.RaftMaxInflightBytes, - Storage: strg, - Logger: logger, + ID: id, + Applied: uint64(appliedIndex), + AsyncStorageWrites: true, + ElectionTick: storeCfg.RaftElectionTimeoutTicks, + HeartbeatTick: storeCfg.RaftHeartbeatIntervalTicks, + MaxUncommittedEntriesSize: storeCfg.RaftMaxUncommittedEntriesSize, + MaxCommittedSizePerReady: storeCfg.RaftMaxCommittedSizePerReady, + DisableConfChangeValidation: true, // see https://github.com/cockroachdb/cockroach/issues/105797 + MaxSizePerMsg: storeCfg.RaftMaxSizePerMsg, + MaxInflightMsgs: storeCfg.RaftMaxInflightMsgs, + MaxInflightBytes: storeCfg.RaftMaxInflightBytes, + Storage: strg, + Logger: logger, PreVote: true, CheckQuorum: storeCfg.RaftEnableCheckQuorum, diff --git a/pkg/server/status.go b/pkg/server/status.go index 6949903a3e9f..8413533ac252 100644 --- a/pkg/server/status.go +++ b/pkg/server/status.go @@ -4139,8 +4139,7 @@ func (s *statusServer) GetJobProfilerExecutionDetails( jobID := jobspb.JobID(req.JobId) execCfg := s.sqlServer.execCfg - eb := sql.MakeJobProfilerExecutionDetailsBuilder(execCfg.SQLStatusServer, execCfg.InternalDB, jobID) - data, err := eb.ReadExecutionDetail(ctx, req.Filename) + data, err := jobs.ReadExecutionDetailFile(ctx, req.Filename, execCfg.InternalDB, jobID) if err != nil { return nil, err } @@ -4161,8 +4160,7 @@ func (s *statusServer) ListJobProfilerExecutionDetails( jobID := jobspb.JobID(req.JobId) execCfg := s.sqlServer.execCfg - eb := sql.MakeJobProfilerExecutionDetailsBuilder(execCfg.SQLStatusServer, execCfg.InternalDB, jobID) - files, err := eb.ListExecutionDetailFiles(ctx) + files, err := jobs.ListExecutionDetailFiles(ctx, execCfg.InternalDB, jobID) if err != nil { return nil, err } diff --git a/pkg/settings/registry.go b/pkg/settings/registry.go index 522c757836d6..e151a2c976cb 100644 --- a/pkg/settings/registry.go +++ b/pkg/settings/registry.go @@ -171,6 +171,7 @@ var retiredSettings = map[string]struct{}{ "sql.auth.createrole_allows_grant_role_membership.enabled": {}, "changefeed.replan_flow_frequency": {}, "changefeed.replan_flow_threshold": {}, + "jobs.trace.force_dump_mode": {}, } // sqlDefaultSettings is the list of "grandfathered" existing sql.defaults diff --git a/pkg/sql/BUILD.bazel b/pkg/sql/BUILD.bazel index 77722665b591..bc2e2e5b049d 100644 --- a/pkg/sql/BUILD.bazel +++ b/pkg/sql/BUILD.bazel @@ -141,8 +141,7 @@ go_library( "job_exec_context.go", "job_exec_context_test_util.go", "jobs_collection.go", - "jobs_execution_details.go", - "jobs_profiler_bundle.go", + "jobs_profiler_execution_details.go", "join.go", "join_predicate.go", "join_token.go", @@ -577,7 +576,6 @@ go_library( "@com_github_dustin_go_humanize//:go-humanize", "@com_github_gogo_protobuf//proto", "@com_github_gogo_protobuf//types", - 
"@com_github_klauspost_compress//gzip", "@com_github_lib_pq//:pq", "@com_github_lib_pq//oid", "@com_github_prometheus_client_model//go", @@ -649,8 +647,7 @@ go_test( "indexbackfiller_test.go", "instrumentation_test.go", "internal_test.go", - "jobs_execution_details_test.go", - "jobs_profiler_bundle_test.go", + "jobs_profiler_execution_details_test.go", "join_token_test.go", "main_test.go", "materialized_view_test.go", diff --git a/pkg/sql/importer/export_base.go b/pkg/sql/importer/export_base.go index 162d8b57a53d..fbf3518512da 100644 --- a/pkg/sql/importer/export_base.go +++ b/pkg/sql/importer/export_base.go @@ -35,11 +35,6 @@ var eventMemoryMultipier = settings.RegisterFloatSetting( // ExportTestingKnobs contains testing knobs for Export. type ExportTestingKnobs struct { - // EnableParquetTestMetadata makes `EXPORT INTO ` with the - // parquet format write CRDB-specific metadata that is required - // for tests to read raw data in parquet files into CRDB datums. - EnableParquetTestMetadata bool - // MemoryMonitor is a test memory monitor to report allocations to. MemoryMonitor *mon.BytesMonitor } diff --git a/pkg/sql/importer/exportparquet.go b/pkg/sql/importer/exportparquet.go index 4b483482c166..e26ce0c1564e 100644 --- a/pkg/sql/importer/exportparquet.go +++ b/pkg/sql/importer/exportparquet.go @@ -670,7 +670,7 @@ func (sp *parquetWriterProcessor) Run(ctx context.Context, output execinfra.RowR for { var rows int64 buf.Reset() - writer, err := newWriter(sch, &buf, compression, knobs) + writer, err := crlparquet.NewWriter(sch, &buf, crlparquet.WithCompressionCodec(compression)) if err != nil { return err } @@ -796,21 +796,6 @@ func (sp *parquetWriterProcessor) testingKnobsOrNil() *ExportTestingKnobs { return sp.flowCtx.TestingKnobs().Export.(*ExportTestingKnobs) } -func newWriter( - sch *crlparquet.SchemaDefinition, - buf *bytes.Buffer, - compression crlparquet.CompressionCodec, - knobs *ExportTestingKnobs, /* may be nil */ -) (*crlparquet.Writer, error) { - if knobs != nil && knobs.EnableParquetTestMetadata { - // In tests, configure the writer to add metadata to allow CRDB datums - // to be reconstructed after being written to parquet files. 
- return crlparquet.NewWriterWithReaderMeta(sch, buf, crlparquet.WithCompressionCodec(compression)) - } else { - return crlparquet.NewWriter(sch, buf, crlparquet.WithCompressionCodec(compression)) - } -} - func init() { rowexec.NewParquetWriterProcessor = newParquetWriterProcessor } diff --git a/pkg/sql/importer/exportparquet_test.go b/pkg/sql/importer/exportparquet_test.go index 6ffbf4fec016..f477fdb2d6e4 100644 --- a/pkg/sql/importer/exportparquet_test.go +++ b/pkg/sql/importer/exportparquet_test.go @@ -179,11 +179,6 @@ func TestRandomParquetExports(t *testing.T) { DefaultTestTenant: base.TODOTestTenantDisabled, UseDatabase: dbName, ExternalIODir: dir, - Knobs: base.TestingKnobs{ - DistSQL: &execinfra.TestingKnobs{ - Export: &importer.ExportTestingKnobs{EnableParquetTestMetadata: true}, - }, - }, }) ctx := context.Background() defer srv.Stopper().Stop(ctx) @@ -283,11 +278,6 @@ func TestBasicParquetTypes(t *testing.T) { DefaultTestTenant: base.TODOTestTenantDisabled, UseDatabase: dbName, ExternalIODir: dir, - Knobs: base.TestingKnobs{ - DistSQL: &execinfra.TestingKnobs{ - Export: &importer.ExportTestingKnobs{EnableParquetTestMetadata: true}, - }, - }, }) ctx := context.Background() defer srv.Stopper().Stop(ctx) @@ -425,8 +415,7 @@ func TestMemoryMonitor(t *testing.T) { Knobs: base.TestingKnobs{ DistSQL: &execinfra.TestingKnobs{ Export: &importer.ExportTestingKnobs{ - EnableParquetTestMetadata: true, - MemoryMonitor: mm, + MemoryMonitor: mm, }, }, }, diff --git a/pkg/sql/importer/import_job.go b/pkg/sql/importer/import_job.go index d14d60c005f9..2a038ff0b9c5 100644 --- a/pkg/sql/importer/import_job.go +++ b/pkg/sql/importer/import_job.go @@ -91,6 +91,11 @@ func (r *importResumer) ForceRealSpan() bool { return true } +// DumpTraceAfterRun implements the TraceableJob interface. +func (r *importResumer) DumpTraceAfterRun() bool { + return true +} + var _ jobs.Resumer = &importResumer{} var processorsPerNode = settings.RegisterIntSetting( diff --git a/pkg/sql/jobs_execution_details_test.go b/pkg/sql/jobs_execution_details_test.go deleted file mode 100644 index 7ea0677c0c34..000000000000 --- a/pkg/sql/jobs_execution_details_test.go +++ /dev/null @@ -1,131 +0,0 @@ -// Copyright 2023 The Cockroach Authors. -// -// Use of this software is governed by the Business Source License -// included in the file licenses/BSL.txt. -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0, included in the file -// licenses/APL.txt. 
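// An illustrative sketch, not taken from this patch: the minimal shape of a resumer that opts
// into post-run trace dumping, mirroring the backupResumer and importResumer changes above. The
// package and type names are made up; only the jobs interfaces are real. Jobs returning true
// from both methods end up with "resumer-trace" execution-detail files, as asserted in the
// profiler tests further below.
package exampleresumer

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/jobs"
)

type toyResumer struct{}

var _ jobs.Resumer = toyResumer{}
var _ jobs.TraceableJob = toyResumer{}

// ForceRealSpan asks the registry for a real (recordable) tracing span.
func (toyResumer) ForceRealSpan() bool { return true }

// DumpTraceAfterRun has the registry persist the job's trace recording at the
// end of every adoption.
func (toyResumer) DumpTraceAfterRun() bool { return true }

// Resume is where the job's work would go.
func (toyResumer) Resume(ctx context.Context, execCtx interface{}) error { return nil }

// OnFailOrCancel would run the job's cleanup on failure or cancelation.
func (toyResumer) OnFailOrCancel(ctx context.Context, execCtx interface{}, jobErr error) error {
	return nil
}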
- -package sql_test - -import ( - "context" - "testing" - "time" - - "github.com/cockroachdb/cockroach/pkg/base" - _ "github.com/cockroachdb/cockroach/pkg/cloud/impl" // register cloud storage providers - "github.com/cockroachdb/cockroach/pkg/jobs" - "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" - "github.com/cockroachdb/cockroach/pkg/jobs/jobsprofiler" - "github.com/cockroachdb/cockroach/pkg/jobs/jobsprofiler/profilerconstants" - "github.com/cockroachdb/cockroach/pkg/settings/cluster" - "github.com/cockroachdb/cockroach/pkg/sql" - "github.com/cockroachdb/cockroach/pkg/sql/isql" - "github.com/cockroachdb/cockroach/pkg/sql/physicalplan" - "github.com/cockroachdb/cockroach/pkg/sql/tests" - "github.com/cockroachdb/cockroach/pkg/testutils" - "github.com/cockroachdb/cockroach/pkg/testutils/jobutils" - "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" - "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" - "github.com/cockroachdb/cockroach/pkg/util/leaktest" - "github.com/cockroachdb/cockroach/pkg/util/log" - "github.com/cockroachdb/cockroach/pkg/util/uuid" - "github.com/cockroachdb/errors" - "github.com/stretchr/testify/require" -) - -// fakeExecResumer calls optional callbacks during the job lifecycle. -type fakeExecResumer struct { - OnResume func(context.Context) error - FailOrCancel func(context.Context) error -} - -var _ jobs.Resumer = fakeExecResumer{} - -func (d fakeExecResumer) Resume(ctx context.Context, execCtx interface{}) error { - if d.OnResume != nil { - if err := d.OnResume(ctx); err != nil { - return err - } - } - return nil -} - -func (d fakeExecResumer) OnFailOrCancel(ctx context.Context, _ interface{}, _ error) error { - if d.FailOrCancel != nil { - return d.FailOrCancel(ctx) - } - return nil -} - -// checkForPlanDiagram is a method used in tests to wait for the existence of a -// DSP diagram for the provided jobID. -func checkForPlanDiagrams( - ctx context.Context, t *testing.T, db isql.DB, jobID jobspb.JobID, expectedNumDiagrams int, -) { - testutils.SucceedsSoon(t, func() error { - return db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { - infoStorage := jobs.InfoStorageForJob(txn, jobID) - var found int - err := infoStorage.Iterate(ctx, profilerconstants.DSPDiagramInfoKeyPrefix, - func(infoKey string, value []byte) error { - found++ - return nil - }) - if err != nil { - return err - } - if found != expectedNumDiagrams { - return errors.Newf("found %d diagrams, expected %d", found, expectedNumDiagrams) - } - return nil - }) - }) -} - -// TestJobsExecutionDetails tests that a job's execution details are retrieved -// and rendered correctly. -func TestJobsExecutionDetails(t *testing.T) { - defer leaktest.AfterTest(t)() - defer log.Scope(t).Close(t) - - // Timeout the test in a few minutes if it hasn't succeeded. 
- ctx := context.Background() - ctx, cancel := context.WithTimeout(ctx, time.Minute*2) - defer cancel() - - params, _ := tests.CreateTestServerParams() - params.Knobs.JobsTestingKnobs = jobs.NewTestingKnobsWithShortIntervals() - defer jobs.ResetConstructors()() - s, sqlDB, _ := serverutils.StartServer(t, params) - defer s.Stopper().Stop(ctx) - - runner := sqlutils.MakeSQLRunner(sqlDB) - - jobs.RegisterConstructor(jobspb.TypeImport, func(j *jobs.Job, _ *cluster.Settings) jobs.Resumer { - return fakeExecResumer{ - OnResume: func(ctx context.Context) error { - p := sql.PhysicalPlan{} - infra := physicalplan.NewPhysicalInfrastructure(uuid.FastMakeV4(), base.SQLInstanceID(1)) - p.PhysicalInfrastructure = infra - jobsprofiler.StorePlanDiagram(ctx, s.Stopper(), &p, s.InternalDB().(isql.DB), j.ID()) - checkForPlanDiagrams(ctx, t, s.InternalDB().(isql.DB), j.ID(), 1) - return nil - }, - } - }, jobs.UsesTenantCostControl) - - runner.Exec(t, `CREATE TABLE t (id INT)`) - runner.Exec(t, `INSERT INTO t SELECT generate_series(1, 100)`) - var importJobID int - runner.QueryRow(t, `IMPORT INTO t CSV DATA ('nodelocal://1/foo') WITH DETACHED`).Scan(&importJobID) - jobutils.WaitForJobToSucceed(t, runner, jobspb.JobID(importJobID)) - - var count int - runner.QueryRow(t, `SELECT count(*) FROM [SHOW JOB $1 WITH EXECUTION DETAILS] WHERE plan_diagram IS NOT NULL`, importJobID).Scan(&count) - require.NotZero(t, count) - runner.CheckQueryResults(t, `SELECT count(*) FROM [SHOW JOBS WITH EXECUTION DETAILS] WHERE plan_diagram IS NOT NULL`, [][]string{{"1"}}) -} diff --git a/pkg/sql/jobs_execution_details.go b/pkg/sql/jobs_profiler_execution_details.go similarity index 63% rename from pkg/sql/jobs_execution_details.go rename to pkg/sql/jobs_profiler_execution_details.go index 1a0238dfba2b..756a59f2127d 100644 --- a/pkg/sql/jobs_execution_details.go +++ b/pkg/sql/jobs_profiler_execution_details.go @@ -13,16 +13,22 @@ package sql import ( "context" gojson "encoding/json" + "fmt" "net/url" "strconv" "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/jobs/jobsprofiler/profilerconstants" + "github.com/cockroachdb/cockroach/pkg/server/serverpb" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/isql" "github.com/cockroachdb/cockroach/pkg/sql/sem/eval" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" + "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" ) @@ -166,3 +172,79 @@ func constructBackupExecutionDetails( j, err := gojson.Marshal(executionDetails) return j, err } + +// RequestExecutionDetailFiles implements the JobProfiler interface. +func (p *planner) RequestExecutionDetailFiles(ctx context.Context, jobID jobspb.JobID) error { + execCfg := p.ExecCfg() + if !execCfg.Settings.Version.IsActive(ctx, clusterversion.V23_1) { + return errors.Newf("execution details can only be requested on a cluster with version >= %s", + clusterversion.V23_1.String()) + } + + e := makeJobProfilerExecutionDetailsBuilder(execCfg.SQLStatusServer, execCfg.InternalDB, jobID) + // TODO(adityamaru): When we start collecting more information we can consider + // parallelize the collection of the various pieces. 
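// An illustrative usage sketch, not taken from this patch: how a client might trigger collection
// and then read the plan diagram back, assuming a running cluster reachable over the PG wire
// protocol. The connection string and job ID are made up; the two SQL statements are the ones
// used elsewhere in this change.
package main

import (
	gosql "database/sql"
	"fmt"
	"log"

	_ "github.com/lib/pq"
)

func main() {
	db, err := gosql.Open("postgres", "postgresql://root@localhost:26257/defaultdb?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	const jobID = 123456 // hypothetical job ID
	// Ask the cluster to collect execution-detail files for the job (currently
	// the DistSQL diagram and labelled goroutine dumps).
	if _, err := db.Exec(`SELECT crdb_internal.request_job_execution_details($1)`, jobID); err != nil {
		log.Fatal(err)
	}
	// The latest plan diagram is also surfaced directly through SHOW JOB.
	var diagram gosql.NullString
	if err := db.QueryRow(
		`SELECT plan_diagram FROM [SHOW JOB $1 WITH EXECUTION DETAILS]`, jobID,
	).Scan(&diagram); err != nil {
		log.Fatal(err)
	}
	fmt.Println("plan diagram URL:", diagram.String)
}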
+ e.addDistSQLDiagram(ctx) + e.addLabelledGoroutines(ctx) + + return nil +} + +// executionDetailsBuilder can be used to read and write execution details corresponding +// to a job. +type executionDetailsBuilder struct { + srv serverpb.SQLStatusServer + db isql.DB + jobID jobspb.JobID +} + +// makeJobProfilerExecutionDetailsBuilder returns an instance of an executionDetailsBuilder. +func makeJobProfilerExecutionDetailsBuilder( + srv serverpb.SQLStatusServer, db isql.DB, jobID jobspb.JobID, +) executionDetailsBuilder { + e := executionDetailsBuilder{ + srv: srv, db: db, jobID: jobID, + } + return e +} + +// addLabelledGoroutines collects and persists goroutines from all nodes in the +// cluster that have a pprof label tying it to the job whose execution details +// are being collected. +func (e *executionDetailsBuilder) addLabelledGoroutines(ctx context.Context) { + profileRequest := serverpb.ProfileRequest{ + NodeId: "all", + Type: serverpb.ProfileRequest_GOROUTINE, + Labels: true, + LabelFilter: fmt.Sprintf("%d", e.jobID), + } + resp, err := e.srv.Profile(ctx, &profileRequest) + if err != nil { + log.Errorf(ctx, "failed to collect goroutines for job %d: %+v", e.jobID, err.Error()) + return + } + filename := fmt.Sprintf("goroutines.%s.txt", timeutil.Now().Format("20060102_150405.00")) + if err := jobs.WriteExecutionDetailFile(ctx, filename, resp.Data, e.db, e.jobID); err != nil { + log.Errorf(ctx, "failed to write goroutine for job %d: %+v", e.jobID, err.Error()) + } +} + +// addDistSQLDiagram generates and persists a `distsql.<timestamp>.html` file. +func (e *executionDetailsBuilder) addDistSQLDiagram(ctx context.Context) { + query := `SELECT plan_diagram FROM [SHOW JOB $1 WITH EXECUTION DETAILS]` + row, err := e.db.Executor().QueryRowEx(ctx, "profiler-bundler-add-diagram", nil, /* txn */ + sessiondata.NoSessionDataOverride, query, e.jobID) + if err != nil { + log.Errorf(ctx, "failed to write DistSQL diagram for job %d: %+v", e.jobID, err.Error()) + return + } + if row[0] != tree.DNull { + dspDiagramURL := string(tree.MustBeDString(row[0])) + filename := fmt.Sprintf("distsql.%s.html", timeutil.Now().Format("20060102_150405.00")) + if err := jobs.WriteExecutionDetailFile(ctx, filename, + []byte(fmt.Sprintf(`<meta http-equiv="Refresh" content="0; url=%s">`, dspDiagramURL)), + e.db, e.jobID); err != nil { + log.Errorf(ctx, "failed to write DistSQL diagram for job %d: %+v", e.jobID, err.Error()) + } + } +} diff --git a/pkg/sql/jobs_profiler_bundle_test.go b/pkg/sql/jobs_profiler_execution_details_test.go similarity index 70% rename from pkg/sql/jobs_profiler_bundle_test.go rename to pkg/sql/jobs_profiler_execution_details_test.go index 0d670d27e554..4768013b027c 100644 --- a/pkg/sql/jobs_profiler_bundle_test.go +++ b/pkg/sql/jobs_profiler_execution_details_test.go @@ -23,15 +23,18 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/base" + _ "github.com/cockroachdb/cockroach/pkg/cloud/impl" // register cloud storage providers "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/jobs/jobsprofiler" + "github.com/cockroachdb/cockroach/pkg/jobs/jobsprofiler/profilerconstants" "github.com/cockroachdb/cockroach/pkg/server/serverpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/isql" "github.com/cockroachdb/cockroach/pkg/sql/physicalplan" "github.com/cockroachdb/cockroach/pkg/sql/tests" + "github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/jobutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" @@ -40,9 +43,112 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/cockroach/pkg/util/uuid" + "github.com/cockroachdb/errors" "github.com/stretchr/testify/require" ) +// fakeExecResumer calls optional callbacks during the job lifecycle. +type fakeExecResumer struct { + OnResume func(context.Context) error + FailOrCancel func(context.Context) error +} + +func (d fakeExecResumer) ForceRealSpan() bool { + return true +} + +func (d fakeExecResumer) DumpTraceAfterRun() bool { + return true +} + +var _ jobs.Resumer = fakeExecResumer{} +var _ jobs.TraceableJob = fakeExecResumer{} + +func (d fakeExecResumer) Resume(ctx context.Context, execCtx interface{}) error { + if d.OnResume != nil { + if err := d.OnResume(ctx); err != nil { + return err + } + } + return nil +} + +func (d fakeExecResumer) OnFailOrCancel(ctx context.Context, _ interface{}, _ error) error { + if d.FailOrCancel != nil { + return d.FailOrCancel(ctx) + } + return nil +} + +// checkForPlanDiagram is a method used in tests to wait for the existence of a +// DSP diagram for the provided jobID. +func checkForPlanDiagrams( + ctx context.Context, t *testing.T, db isql.DB, jobID jobspb.JobID, expectedNumDiagrams int, +) { + testutils.SucceedsSoon(t, func() error { + return db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { + infoStorage := jobs.InfoStorageForJob(txn, jobID) + var found int + err := infoStorage.Iterate(ctx, profilerconstants.DSPDiagramInfoKeyPrefix, + func(infoKey string, value []byte) error { + found++ + return nil + }) + if err != nil { + return err + } + if found != expectedNumDiagrams { + return errors.Newf("found %d diagrams, expected %d", found, expectedNumDiagrams) + } + return nil + }) + }) +} + +// TestJobsExecutionDetails tests that a job's execution details are retrieved +// and rendered correctly. +func TestJobsExecutionDetails(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + // Timeout the test in a few minutes if it hasn't succeeded. 
+ ctx := context.Background() + ctx, cancel := context.WithTimeout(ctx, time.Minute*2) + defer cancel() + + params, _ := tests.CreateTestServerParams() + params.Knobs.JobsTestingKnobs = jobs.NewTestingKnobsWithShortIntervals() + defer jobs.ResetConstructors()() + s, sqlDB, _ := serverutils.StartServer(t, params) + defer s.Stopper().Stop(ctx) + + runner := sqlutils.MakeSQLRunner(sqlDB) + + jobs.RegisterConstructor(jobspb.TypeImport, func(j *jobs.Job, _ *cluster.Settings) jobs.Resumer { + return fakeExecResumer{ + OnResume: func(ctx context.Context) error { + p := sql.PhysicalPlan{} + infra := physicalplan.NewPhysicalInfrastructure(uuid.FastMakeV4(), base.SQLInstanceID(1)) + p.PhysicalInfrastructure = infra + jobsprofiler.StorePlanDiagram(ctx, s.Stopper(), &p, s.InternalDB().(isql.DB), j.ID()) + checkForPlanDiagrams(ctx, t, s.InternalDB().(isql.DB), j.ID(), 1) + return nil + }, + } + }, jobs.UsesTenantCostControl) + + runner.Exec(t, `CREATE TABLE t (id INT)`) + runner.Exec(t, `INSERT INTO t SELECT generate_series(1, 100)`) + var importJobID int + runner.QueryRow(t, `IMPORT INTO t CSV DATA ('nodelocal://1/foo') WITH DETACHED`).Scan(&importJobID) + jobutils.WaitForJobToSucceed(t, runner, jobspb.JobID(importJobID)) + + var count int + runner.QueryRow(t, `SELECT count(*) FROM [SHOW JOB $1 WITH EXECUTION DETAILS] WHERE plan_diagram IS NOT NULL`, importJobID).Scan(&count) + require.NotZero(t, count) + runner.CheckQueryResults(t, `SELECT count(*) FROM [SHOW JOBS WITH EXECUTION DETAILS] WHERE plan_diagram IS NOT NULL`, [][]string{{"1"}}) +} + // TestReadWriteProfilerExecutionDetails is an end-to-end test of requesting and collecting // execution details for a job. func TestReadWriteProfilerExecutionDetails(t *testing.T) { @@ -162,9 +268,10 @@ func TestListProfilerExecutionDetails(t *testing.T) { runner.Exec(t, `SELECT crdb_internal.request_job_execution_details($1)`, importJobID) files := listExecutionDetails(t, s, jobspb.JobID(importJobID)) - require.Len(t, files, 2) + require.Len(t, files, 3) require.Regexp(t, "distsql\\..*\\.html", files[0]) require.Regexp(t, "goroutines\\..*\\.txt", files[1]) + require.Regexp(t, "resumer-trace-n[0-9]\\..*\\.txt", files[2]) // Resume the job, so it can write another DistSQL diagram and goroutine // snapshot. @@ -174,11 +281,13 @@ func TestListProfilerExecutionDetails(t *testing.T) { jobutils.WaitForJobToSucceed(t, runner, jobspb.JobID(importJobID)) runner.Exec(t, `SELECT crdb_internal.request_job_execution_details($1)`, importJobID) files = listExecutionDetails(t, s, jobspb.JobID(importJobID)) - require.Len(t, files, 4) + require.Len(t, files, 6) require.Regexp(t, "distsql\\..*\\.html", files[0]) require.Regexp(t, "distsql\\..*\\.html", files[1]) require.Regexp(t, "goroutines\\..*\\.txt", files[2]) require.Regexp(t, "goroutines\\..*\\.txt", files[3]) + require.Regexp(t, "resumer-trace-n[0-9]\\..*\\.txt", files[4]) + require.Regexp(t, "resumer-trace-n[0-9]\\..*\\.txt", files[5]) }) } diff --git a/pkg/sql/sem/builtins/builtins.go b/pkg/sql/sem/builtins/builtins.go index 676ea0206fcc..6c8150036edb 100644 --- a/pkg/sql/sem/builtins/builtins.go +++ b/pkg/sql/sem/builtins/builtins.go @@ -7722,7 +7722,7 @@ specified store on the node it's run from. 
One of 'mvccGC', 'merge', 'split', } jobID := int(tree.MustBeDInt(args[0])) - if err := evalCtx.JobsProfiler.RequestExecutionDetails( + if err := evalCtx.JobsProfiler.RequestExecutionDetailFiles( ctx, jobspb.JobID(jobID), ); err != nil { diff --git a/pkg/sql/sem/eval/context.go b/pkg/sql/sem/eval/context.go index 7c0dd9248ddc..4900b9696845 100644 --- a/pkg/sql/sem/eval/context.go +++ b/pkg/sql/sem/eval/context.go @@ -287,12 +287,12 @@ type JobsProfiler interface { // execution details. GenerateExecutionDetailsJSON(ctx context.Context, evalCtx *Context, jobID jobspb.JobID) ([]byte, error) - // RequestExecutionDetails triggers the collection of execution details for - // the specified jobID that are then persisted to `system.job_info`. This + // RequestExecutionDetailFiles triggers the collection of execution details + // for the specified jobID that are then persisted to `system.job_info`. This // currently includes the following pieces of information: // // - Latest DistSQL diagram of the job - RequestExecutionDetails(ctx context.Context, jobID jobspb.JobID) error + RequestExecutionDetailFiles(ctx context.Context, jobID jobspb.JobID) error } // DescIDGenerator generates unique descriptor IDs. diff --git a/pkg/util/parquet/BUILD.bazel b/pkg/util/parquet/BUILD.bazel index dea384112f3b..2c39f7b8ce7a 100644 --- a/pkg/util/parquet/BUILD.bazel +++ b/pkg/util/parquet/BUILD.bazel @@ -22,6 +22,7 @@ go_library( "//pkg/sql/types", "//pkg/util", "//pkg/util/bitarray", + "//pkg/util/buildutil", "//pkg/util/duration", "//pkg/util/encoding", "//pkg/util/timeofday", diff --git a/pkg/util/parquet/testutils.go b/pkg/util/parquet/testutils.go index d6f88bfc10ad..047a298bc435 100644 --- a/pkg/util/parquet/testutils.go +++ b/pkg/util/parquet/testutils.go @@ -13,7 +13,6 @@ package parquet import ( "bytes" "fmt" - "io" "math" "os" "strconv" @@ -31,15 +30,6 @@ import ( "github.com/stretchr/testify/require" ) -// NewWriterWithReaderMeta constructs a Writer that writes additional metadata -// required to use the reader utility functions below. -func NewWriterWithReaderMeta( - sch *SchemaDefinition, sink io.Writer, opts ...Option, -) (*Writer, error) { - opts = append(opts, WithMetadata(MakeReaderMetadata(sch))) - return NewWriter(sch, sink, opts...) -} - // ReadFileAndVerifyDatums asserts that a parquet file's metadata matches the // metadata from the writer and its data matches writtenDatums. func ReadFileAndVerifyDatums( @@ -69,7 +59,7 @@ func ReadFileAndVerifyDatums( // ReadFile reads a parquet file and returns the contained metadata and datums. // // To use this function, the Writer must be configured to write CRDB-specific -// metadata for the reader. See NewWriterWithReaderMeta. +// metadata for the reader. See NewWriter() and buildutil.CrdbTestBuild. // // NB: The returned datums may not be hydrated or identical to the ones // which were written. See comment on ValidateDatum for more info. @@ -95,7 +85,7 @@ func ReadFile(parquetFile string) (meta ReadDatumsMetadata, datums [][]tree.Datu if typFamiliesMeta == nil { return ReadDatumsMetadata{}, nil, errors.AssertionFailedf("missing type family metadata. ensure the writer is configured" + - " to write reader metadata. see NewWriterWithReaderMeta()") + " to write reader metadata. 
see NewWriter() and buildutil.CrdbTestBuild") } typFamilies, err := deserializeIntArray(*typFamiliesMeta) if err != nil { @@ -112,7 +102,7 @@ func ReadFile(parquetFile string) (meta ReadDatumsMetadata, datums [][]tree.Datu if typOidsMeta == nil { return ReadDatumsMetadata{}, nil, errors.AssertionFailedf("missing type oid metadata. ensure the writer is configured" + - " to write reader metadata. see NewWriterWithReaderMeta()") + " to write reader metadata. see NewWriter() and buildutil.CrdbTestBuild)") } typOids, err := deserializeIntArray(*typOidsMeta) if err != nil { diff --git a/pkg/util/parquet/writer.go b/pkg/util/parquet/writer.go index 4120e9d4574b..a4f646b8ce60 100644 --- a/pkg/util/parquet/writer.go +++ b/pkg/util/parquet/writer.go @@ -18,6 +18,7 @@ import ( "github.com/apache/arrow/go/v11/parquet/file" "github.com/apache/arrow/go/v11/parquet/metadata" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/util/buildutil" "github.com/cockroachdb/errors" ) @@ -165,6 +166,13 @@ func NewWriter(sch *SchemaDefinition, sink io.Writer, opts ...Option) (*Writer, return nil, err } } + // Add additional metadata required to use the reader utility functions in + // testutils.go. + if buildutil.CrdbTestBuild { + if err := WithMetadata(MakeReaderMetadata(sch)).apply(&cfg); err != nil { + return nil, err + } + } parquetOpts := []parquet.WriterProperty{ parquet.WithCreatedBy("cockroachdb"), diff --git a/pkg/util/parquet/writer_test.go b/pkg/util/parquet/writer_test.go index 7ef4ba569384..9190d6077938 100644 --- a/pkg/util/parquet/writer_test.go +++ b/pkg/util/parquet/writer_test.go @@ -128,7 +128,7 @@ func TestRandomDatums(t *testing.T) { schemaDef, err := NewSchema(sch.columnNames, sch.columnTypes) require.NoError(t, err) - writer, err := NewWriterWithReaderMeta(schemaDef, f, WithMaxRowGroupLength(maxRowGroupSize)) + writer, err := NewWriter(schemaDef, f, WithMaxRowGroupLength(maxRowGroupSize)) require.NoError(t, err) for _, row := range datums { @@ -501,7 +501,7 @@ func TestBasicDatums(t *testing.T) { schemaDef, err := NewSchema(tc.sch.columnNames, tc.sch.columnTypes) require.NoError(t, err) - writer, err := NewWriterWithReaderMeta(schemaDef, f, WithMaxRowGroupLength(maxRowGroupSize)) + writer, err := NewWriter(schemaDef, f, WithMaxRowGroupLength(maxRowGroupSize)) require.NoError(t, err) for _, row := range datums {
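// An illustrative sketch, not taken from this patch, of the new test-only round trip: under a
// crdb_test build (buildutil.CrdbTestBuild == true), plain NewWriter now embeds the reader
// metadata itself, so the testutils reader works without the removed NewWriterWithReaderMeta
// helper. The test name and temp-file handling are made up.
package parquet_test

import (
	"os"
	"testing"

	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
	"github.com/cockroachdb/cockroach/pkg/sql/types"
	"github.com/cockroachdb/cockroach/pkg/util/parquet"
	"github.com/stretchr/testify/require"
)

func TestWriterEmbedsReaderMetadata(t *testing.T) {
	f, err := os.CreateTemp(t.TempDir(), "roundtrip*.parquet")
	require.NoError(t, err)

	schemaDef, err := parquet.NewSchema([]string{"id"}, []*types.T{types.Int})
	require.NoError(t, err)

	// Plain NewWriter: in test builds it now also writes the CRDB-specific
	// type metadata that ReadFile needs.
	writer, err := parquet.NewWriter(schemaDef, f)
	require.NoError(t, err)
	require.NoError(t, writer.AddRow([]tree.Datum{tree.NewDInt(tree.DInt(1))}))
	require.NoError(t, writer.Close())

	_, datums, err := parquet.ReadFile(f.Name())
	require.NoError(t, err)
	require.Len(t, datums, 1)
}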