This repository has been archived by the owner on Apr 28, 2020. It is now read-only.
forked from mediocregopher/radix
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpipeliner_test.go
129 lines (95 loc) · 3.22 KB
/
pipeliner_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
package radix
import (
"net"
. "testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// TestPipeliner drives a pipeliner through two scenarios, a batch containing
// a failing command and a batch which runs into the read timeout, first on a
// pipeliner wrapped around a single connection and then on the pipeliner
// owned by a Pool.
func TestPipeliner(t *T) {
	// Every connection used below is dialed with a one second read timeout;
	// the timeout scenario depends on that value.
	opts := []DialOpt{DialReadTimeout(time.Second)}

	// requireTimeout asserts that res is a *net.OpError which reports itself
	// as both temporary and a timeout. It accepts interface{} so that a value
	// received from a command's resCh can be handed over unchanged.
	requireTimeout := func(t *T, res interface{}) {
		require.IsType(t, (*net.OpError)(nil), res)
		netErr := res.(net.Error)
		require.True(t, netErr.Temporary())
		require.True(t, netErr.Timeout())
	}

	// recoverable flushes SET, GET, a command expected to fail, and another
	// GET as one batch. Only the third command may report an error; both GETs
	// must still return the value written by the SET.
	recoverable := func(t *T, p *pipeliner) {
		k := randStr()
		var got1, got2 string

		set := getPipelinerCmd(Cmd(nil, "SET", k, k))
		get1 := getPipelinerCmd(Cmd(&got1, "GET", k))
		bogus := getPipelinerCmd(Cmd(nil, "RADIXISAWESOME"))
		get2 := getPipelinerCmd(Cmd(&got2, "GET", k))

		batch := []CmdAction{set, get1, bogus, get2}
		p.flush(batch)

		require.Nil(t, <-set.resCh)

		require.Nil(t, <-get1.resCh)
		require.Equal(t, k, got1)

		require.NotNil(t, <-bogus.resCh)

		require.Nil(t, <-get2.resCh)
		require.Equal(t, k, got2)
	}

	// timeout seeds a three element list, then flushes LPOP, CLIENT PAUSE and
	// two more LPOPs as one batch. The first LPOP and the pause must succeed,
	// while both trailing LPOPs must fail with a timeout error and leave their
	// receivers empty.
	timeout := func(t *T, p *pipeliner) {
		k := randStr()

		// Start from a clean key holding the list "1", "2", "3".
		del := getPipelinerCmd(Cmd(nil, "DEL", k))
		push := getPipelinerCmd(Cmd(nil, "LPUSH", k, "3", "2", "1"))
		p.flush([]CmdAction{del, push})
		require.Nil(t, <-del.resCh)
		require.Nil(t, <-push.resCh)

		var pop1Res, pauseRes, pop2Res, pop3Res string
		pop1 := getPipelinerCmd(Cmd(&pop1Res, "LPOP", k))
		// NOTE(review): 1100 is presumably a millisecond duration chosen to
		// exceed the one second read timeout - confirm against the Redis
		// CLIENT PAUSE documentation.
		pause := getPipelinerCmd(Cmd(&pauseRes, "CLIENT", "PAUSE", "1100"))
		pop2 := getPipelinerCmd(Cmd(&pop2Res, "LPOP", k))
		pop3 := getPipelinerCmd(Cmd(&pop3Res, "LPOP", k))
		p.flush([]CmdAction{pop1, pause, pop2, pop3})

		require.Nil(t, <-pop1.resCh)
		require.Equal(t, "1", pop1Res)

		require.Nil(t, <-pause.resCh)
		require.Equal(t, "OK", pauseRes)

		requireTimeout(t, <-pop2.resCh)
		assert.Empty(t, pop2Res)

		requireTimeout(t, <-pop3.resCh)
		assert.Empty(t, pop3Res)
	}

	// The scenarios, in the order in which they are run for each backend.
	scenarios := []struct {
		name string
		run  func(*T, *pipeliner)
	}{
		{"RecoverableError", recoverable},
		{"Timeout", timeout},
	}

	t.Run("Conn", func(t *T) {
		// onConn adapts a scenario into a sub-test which runs it against a
		// pipeliner created around a freshly dialed connection.
		onConn := func(run func(*T, *pipeliner)) func(*T) {
			return func(t *T) {
				conn := dial(opts...)
				defer conn.Close()

				p := newPipeliner(conn, 0, 0, 0)
				defer p.Close()

				run(t, p)
			}
		}

		for _, sc := range scenarios {
			t.Run(sc.name, onConn(sc.run))
		}
	})

	// The Pool is covered on its own since it uses ioErrConn directly and may
	// therefore behave differently from a plain connection.
	t.Run("Pool", func(t *T) {
		poolOpts := []PoolOpt{
			PoolConnFunc(func(string, string) (Conn, error) {
				return dial(opts...), nil
			}),
			PoolPipelineConcurrency(1),
			PoolPipelineWindow(time.Hour, 0),
		}

		// onPool adapts a scenario into a sub-test which runs it against the
		// pipeliner of a newly created Pool.
		onPool := func(run func(*T, *pipeliner)) func(*T) {
			return func(t *T) {
				pool := testPool(1, poolOpts...)
				defer pool.Close()

				run(t, pool.pipeliner)
			}
		}

		for _, sc := range scenarios {
			t.Run(sc.name, onPool(sc.run))
		}
	})
}