Skip to content

Commit a4890bd

Browse files
authored
Merge pull request #722 from segmentio/fix-writer-tests
Make Writer tests more resilient
2 parents 05c9b5b + bd2b3b1 commit a4890bd

File tree

6 files changed

+94
-26
lines changed

6 files changed

+94
-26
lines changed

compress/compress_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ import (
1616
"text/tabwriter"
1717
"time"
1818

19-
kafka "github.com/segmentio/kafka-go"
19+
"github.com/segmentio/kafka-go"
2020
pkg "github.com/segmentio/kafka-go/compress"
2121
"github.com/segmentio/kafka-go/compress/gzip"
2222
"github.com/segmentio/kafka-go/compress/lz4"
@@ -147,7 +147,7 @@ func testCompressedMessages(t *testing.T, codec pkg.Codec) {
147147
}
148148
offset++
149149
}
150-
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
150+
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
151151
if err := w.WriteMessages(ctx, batch...); err != nil {
152152
t.Errorf("error sending batch %d, reason: %+v", i+1, err)
153153
}

consumergroup_test.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,6 @@ package kafka
33
import (
44
"context"
55
"errors"
6-
"log"
7-
"os"
86
"reflect"
97
"strings"
108
"sync"
@@ -323,7 +321,7 @@ func TestConsumerGroup(t *testing.T) {
323321
HeartbeatInterval: 2 * time.Second,
324322
RebalanceTimeout: 2 * time.Second,
325323
RetentionTime: time.Hour,
326-
Logger: log.New(os.Stdout, "cg-test: ", 0),
324+
Logger: &testKafkaLogger{T: t},
327325
})
328326
if err != nil {
329327
t.Fatal(err)
@@ -569,7 +567,7 @@ func TestConsumerGroupErrors(t *testing.T) {
569567
connect: func(*Dialer, ...string) (coordinator, error) {
570568
return mc, nil
571569
},
572-
Logger: log.New(os.Stdout, "cg-errors-test: ", 0),
570+
Logger: &testKafkaLogger{T: t},
573571
})
574572
if err != nil {
575573
t.Fatal(err)

describegroups_test.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,9 +63,6 @@ func TestClientDescribeGroups(t *testing.T) {
6363
t.Skip("Skipping because kafka version is 2.3.1")
6464
}
6565

66-
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
67-
defer cancel()
68-
6966
client, shutdown := newLocalClient()
7067
defer shutdown()
7168

@@ -78,6 +75,10 @@ func TestClientDescribeGroups(t *testing.T) {
7875
w := newTestWriter(WriterConfig{
7976
Topic: topic,
8077
})
78+
79+
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
80+
defer cancel()
81+
8182
err := w.WriteMessages(
8283
ctx,
8384
Message{

listgroups_test.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,6 @@ func TestListGroupsResponseV1(t *testing.T) {
4242
}
4343

4444
func TestClientListGroups(t *testing.T) {
45-
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
46-
defer cancel()
47-
4845
client, shutdown := newLocalClient()
4946
defer shutdown()
5047

@@ -57,6 +54,10 @@ func TestClientListGroups(t *testing.T) {
5754
w := newTestWriter(WriterConfig{
5855
Topic: topic,
5956
})
57+
58+
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
59+
defer cancel()
60+
6061
err := w.WriteMessages(
6162
ctx,
6263
Message{

reader_test.go

Lines changed: 61 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -266,14 +266,20 @@ func testReaderOutOfRangeGetsCanceled(t *testing.T, ctx context.Context, r *Read
266266
}
267267

268268
func createTopic(t *testing.T, topic string, partitions int) {
269+
t.Helper()
270+
271+
t.Logf("createTopic(%s, %d)", topic, partitions)
272+
269273
conn, err := Dial("tcp", "localhost:9092")
270274
if err != nil {
275+
err = fmt.Errorf("createTopic, Dial: %w", err)
271276
t.Fatal(err)
272277
}
273278
defer conn.Close()
274279

275280
controller, err := conn.Controller()
276281
if err != nil {
282+
err = fmt.Errorf("createTopic, conn.Controller: %w", err)
277283
t.Fatal(err)
278284
}
279285

@@ -282,7 +288,7 @@ func createTopic(t *testing.T, topic string, partitions int) {
282288
t.Fatal(err)
283289
}
284290

285-
conn.SetDeadline(time.Now().Add(2 * time.Second))
291+
conn.SetDeadline(time.Now().Add(10 * time.Second))
286292

287293
_, err = conn.createTopics(createTopicsRequestV0{
288294
Topics: []createTopicsRequestV0Topic{
@@ -300,12 +306,65 @@ func createTopic(t *testing.T, topic string, partitions int) {
300306
case TopicAlreadyExists:
301307
// ok
302308
default:
309+
err = fmt.Errorf("creaetTopic, conn.createtTopics: %w", err)
303310
t.Error(err)
304311
t.FailNow()
305312
}
313+
314+
315+
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
316+
defer cancel()
317+
318+
waitForTopic(ctx, t, topic)
319+
}
320+
321+
// Block until topic exists
322+
func waitForTopic(ctx context.Context, t *testing.T, topic string) {
323+
t.Helper()
324+
325+
for {
326+
select {
327+
case <-ctx.Done():
328+
t.Fatalf("reached deadline before verifying topic existence")
329+
default:
330+
}
331+
332+
cli := &Client{
333+
Addr: TCP("localhost:9092"),
334+
Timeout: 5 * time.Second,
335+
}
336+
337+
response, err := cli.Metadata(ctx, &MetadataRequest{
338+
Addr: cli.Addr,
339+
Topics: []string{topic},
340+
})
341+
if err != nil {
342+
t.Fatalf("waitForTopic: error listing topics: %s", err.Error())
343+
}
344+
345+
// Find a topic which has at least 1 partition in the metadata response
346+
for _, top := range response.Topics {
347+
if top.Name != topic {
348+
continue
349+
}
350+
351+
numPartitions := len(top.Partitions)
352+
t.Logf("waitForTopic: found topic %q with %d partitions",
353+
topic, numPartitions)
354+
355+
if numPartitions > 0 {
356+
return
357+
}
358+
}
359+
360+
t.Logf("retrying after 1s")
361+
time.Sleep(time.Second)
362+
continue
363+
}
306364
}
307365

308366
func deleteTopic(t *testing.T, topic ...string) {
367+
t.Helper()
309368
conn, err := Dial("tcp", "localhost:9092")
310369
if err != nil {
311370
t.Fatal(err)
@@ -322,7 +381,7 @@ func deleteTopic(t *testing.T, topic ...string) {
322381
t.Fatal(err)
323382
}
324383

325-
conn.SetDeadline(time.Now().Add(2 * time.Second))
384+
conn.SetDeadline(time.Now().Add(10 * time.Second))
326385

327386
if err := conn.DeleteTopics(topic...); err != nil {
328387
t.Fatal(err)

writer_test.go

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package kafka
22

33
import (
44
"context"
5+
"fmt"
56
"io"
67
"math"
78
"testing"
@@ -281,15 +282,23 @@ func testWriterMaxBytes(t *testing.T) {
281282
}
282283
}
283284

285+
// readOffset gets the latest offset for the given topic/partition
284286
func readOffset(topic string, partition int) (offset int64, err error) {
285287
var conn *Conn
286288

287-
if conn, err = DialLeader(context.Background(), "tcp", "localhost:9092", topic, partition); err != nil {
289+
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
290+
defer cancel()
291+
292+
if conn, err = DialLeader(ctx, "tcp", "localhost:9092", topic, partition); err != nil {
293+
err = fmt.Errorf("readOffset, DialLeader: %w", err)
288294
return
289295
}
290296
defer conn.Close()
291297

292298
offset, err = conn.ReadLastOffset()
299+
if err != nil {
300+
err = fmt.Errorf("readOffset, conn.ReadLastOffset: %w", err)
301+
}
293302
return
294303
}
295304

@@ -302,7 +311,7 @@ func readPartition(topic string, partition int, offset int64) (msgs []Message, e
302311
defer conn.Close()
303312

304313
conn.Seek(offset, SeekAbsolute)
305-
conn.SetReadDeadline(time.Now().Add(100 * time.Millisecond))
314+
conn.SetReadDeadline(time.Now().Add(10 * time.Second))
306315
batch := conn.ReadBatch(0, 1000000000)
307316
defer batch.Close()
308317

@@ -321,10 +330,7 @@ func readPartition(topic string, partition int, offset int64) (msgs []Message, e
321330
}
322331

323332
func testWriterBatchBytes(t *testing.T) {
324-
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
325-
defer cancel()
326-
327-
const topic = "test-writer-1-bytes"
333+
topic := makeTopic()
328334
createTopic(t, topic, 1)
329335
defer deleteTopic(t, topic)
330336

@@ -341,6 +347,8 @@ func testWriterBatchBytes(t *testing.T) {
341347
})
342348
defer w.Close()
343349

350+
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
351+
defer cancel()
344352
if err := w.WriteMessages(ctx, []Message{
345353
Message{Value: []byte("Hi")}, // 24 Bytes
346354
Message{Value: []byte("By")}, // 24 Bytes
@@ -374,8 +382,6 @@ func testWriterBatchBytes(t *testing.T) {
374382
}
375383

376384
func testWriterBatchSize(t *testing.T) {
377-
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
378-
defer cancel()
379385

380386
topic := makeTopic()
381387
createTopic(t, topic, 1)
@@ -394,6 +400,9 @@ func testWriterBatchSize(t *testing.T) {
394400
})
395401
defer w.Close()
396402

403+
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
404+
defer cancel()
405+
397406
if err := w.WriteMessages(ctx, []Message{
398407
Message{Value: []byte("Hi")}, // 24 Bytes
399408
Message{Value: []byte("By")}, // 24 Bytes
@@ -427,8 +436,6 @@ func testWriterBatchSize(t *testing.T) {
427436
}
428437

429438
func testWriterSmallBatchBytes(t *testing.T) {
430-
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
431-
defer cancel()
432439

433440
topic := makeTopic()
434441
createTopic(t, topic, 1)
@@ -447,6 +454,8 @@ func testWriterSmallBatchBytes(t *testing.T) {
447454
})
448455
defer w.Close()
449456

457+
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
458+
defer cancel()
450459
if err := w.WriteMessages(ctx, []Message{
451460
Message{Value: []byte("Hi")}, // 24 Bytes
452461
Message{Value: []byte("By")}, // 24 Bytes
@@ -480,8 +489,6 @@ func testWriterSmallBatchBytes(t *testing.T) {
480489
}
481490

482491
func testWriterMultipleTopics(t *testing.T) {
483-
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
484-
defer cancel()
485492

486493
topic1 := makeTopic()
487494
createTopic(t, topic1, 1)
@@ -509,6 +516,8 @@ func testWriterMultipleTopics(t *testing.T) {
509516
msg1 := Message{Topic: topic1, Value: []byte("Hello")}
510517
msg2 := Message{Topic: topic2, Value: []byte("World")}
511518

519+
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
520+
defer cancel()
512521
if err := w.WriteMessages(ctx, msg1, msg2); err != nil {
513522
t.Error(err)
514523
return

0 commit comments

Comments
 (0)