Skip to content
This repository was archived by the owner on Sep 28, 2022. It is now read-only.

Commit 7dd81a2

Browse files
committed
reproduce hang in test and fix
needed to cover the case where import routines have errors and exit early
1 parent 5176ede commit 7dd81a2

File tree

2 files changed

+75
-5
lines changed

2 files changed

+75
-5
lines changed

client.go

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1078,25 +1078,39 @@ func (c *Client) ExperimentalReplayImport(r io.Reader, concurrency int) error {
10781078
})
10791079
}
10801080

1081+
// wait on importers asynchronously in case they return early with errors
1082+
waitErrCh := make(chan error)
1083+
go func() {
1084+
waitErrCh <- eg.Wait()
1085+
close(waitErrCh)
1086+
}()
1087+
10811088
// populate work channel
10821089
dec := newImportLogDecoder(r)
10831090
var err error
10841091
for {
10851092
log := importLog{}
10861093
err = dec.Decode(&log)
10871094
if err != nil {
1095+
err = errors.Wrap(err, "decoding")
10881096
break
10891097
}
1090-
work <- &log
1098+
select {
1099+
case work <- &log:
1100+
continue
1101+
case err = <-waitErrCh:
1102+
err = errors.Wrap(err, "waiting")
1103+
}
1104+
break
10911105
}
10921106

10931107
// close work channel (now workers can exit)
10941108
close(work)
1095-
waitErr := eg.Wait()
1096-
if err != io.EOF {
1097-
return errors.Wrap(err, "decoding")
1109+
<-waitErrCh // make sure workers are complete
1110+
if errors.Cause(err) == io.EOF {
1111+
return nil
10981112
}
1099-
return errors.Wrap(waitErr, "waiting")
1113+
return err
11001114
}
11011115

11021116
func defaultProtobufHeaders() map[string]string {

client_internal_it_test.go

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,10 @@ import (
88
"io/ioutil"
99
"net/http"
1010
"reflect"
11+
"strings"
1112
"testing"
1213
"testing/iotest"
14+
"time"
1315
)
1416

1517
func TestNewClientFromAddresses(t *testing.T) {
@@ -115,3 +117,57 @@ func TestImportWithReplay(t *testing.T) {
115117
}
116118
}
117119
}
120+
121+
func TestImportWithReplayErrors(t *testing.T) {
122+
buf := &bytes.Buffer{}
123+
client := getClient(ExperimentalOptClientLogImports(buf))
124+
125+
// the first iterator for creating the target
126+
iterator := &ColumnGenerator{numRows: 10, numColumns: 1000}
127+
target := map[uint64][]uint64{}
128+
for {
129+
rec, err := iterator.NextRecord()
130+
if err == io.EOF {
131+
break
132+
}
133+
if err != nil {
134+
t.Fatal(err)
135+
}
136+
if col, ok := rec.(Column); ok {
137+
target[col.RowID] = append(target[col.RowID], col.ColumnID)
138+
}
139+
}
140+
// the second iterator for the actual import
141+
iterator = &ColumnGenerator{numRows: 10, numColumns: 1000}
142+
field := index.Field("importfield-batchsize")
143+
err := client.EnsureField(field)
144+
if err != nil {
145+
t.Fatal(err)
146+
}
147+
148+
statusChan := make(chan ImportStatusUpdate, 1000)
149+
err = client.ImportField(field, iterator, OptImportStatusChannel(statusChan), OptImportThreadCount(2), OptImportBatchSize(100))
150+
if err != nil {
151+
t.Fatal(err)
152+
}
153+
154+
// delete index, but don't recreate
155+
Reset()
156+
157+
result := make(chan error)
158+
go func() {
159+
result <- client.ExperimentalReplayImport(buf, 2)
160+
}()
161+
162+
select {
163+
case err := <-result:
164+
if err == nil {
165+
t.Fatalf("import shouldn't have worked")
166+
}
167+
if !strings.Contains(err.Error(), "waiting: ") {
168+
t.Fatalf("unexpected error: %v", err)
169+
}
170+
case <-time.After(time.Second * 5):
171+
t.Fatal("import replay hanging when no schema created")
172+
}
173+
}

0 commit comments

Comments
 (0)