Skip to content

Commit 06fe601

Browse files
authored
Merge pull request #116 from ohkinozomu/mlist-tests
Add tests for mlist
2 parents 775c81d + 4b26ed3 commit 06fe601

File tree

4 files changed

+243
-17
lines changed

4 files changed

+243
-17
lines changed

cluster/agent_test.go

Lines changed: 79 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,13 @@ import (
66
"time"
77

88
"github.com/stretchr/testify/require"
9+
"github.com/wind-c/comqtt/v2/cluster/discovery/mlist"
910
"github.com/wind-c/comqtt/v2/cluster/log"
1011
"github.com/wind-c/comqtt/v2/cluster/utils"
1112
"github.com/wind-c/comqtt/v2/config"
1213
)
1314

14-
func TestCluster(t *testing.T) {
15-
log.Init(log.DefaultOptions())
16-
15+
func TestCluster_Hashicorp_Serf(t *testing.T) {
1716
bindPort1, err := utils.GetFreePort()
1817
require.NoError(t, err, "Failed to get free port for node1")
1918
raftPort1, err := utils.GetFreePort()
@@ -48,10 +47,6 @@ func TestCluster(t *testing.T) {
4847
DiscoveryWay: config.DiscoveryWaySerf,
4948
NodesFileDir: t.TempDir(),
5049
}
51-
agent1 := NewAgent(conf1)
52-
err = agent1.Start()
53-
require.NoError(t, err, "Agent start failed for node: %s", conf1.NodeName)
54-
5550
conf2 := &config.Cluster{
5651
NodeName: "node2",
5752
RaftImpl: config.RaftImplHashicorp,
@@ -65,11 +60,6 @@ func TestCluster(t *testing.T) {
6560
DiscoveryWay: config.DiscoveryWaySerf,
6661
NodesFileDir: t.TempDir(),
6762
}
68-
agent2 := NewAgent(conf2)
69-
err = agent2.Start()
70-
defer agent2.Stop()
71-
require.NoError(t, err, "Agent start failed for node: %s", conf2.NodeName)
72-
7363
conf3 := &config.Cluster{
7464
NodeName: "node3",
7565
RaftImpl: config.RaftImplHashicorp,
@@ -83,6 +73,79 @@ func TestCluster(t *testing.T) {
8373
DiscoveryWay: config.DiscoveryWaySerf,
8474
NodesFileDir: t.TempDir(),
8575
}
76+
testCluster(t, conf1, conf2, conf3)
77+
}
78+
79+
func TestCluster_Hashicorp_Memberlist(t *testing.T) {
80+
bindPort1, err := utils.GetFreePort()
81+
require.NoError(t, err, "Failed to get free port for node1")
82+
83+
bindPort2, err := utils.GetFreePort()
84+
require.NoError(t, err, "Failed to get free port for node2")
85+
86+
bindPort3, err := utils.GetFreePort()
87+
require.NoError(t, err, "Failed to get free port for node3")
88+
89+
members := []string{
90+
"127.0.0.1:" + strconv.Itoa(bindPort1),
91+
"127.0.0.1:" + strconv.Itoa(bindPort2),
92+
"127.0.0.1:" + strconv.Itoa(bindPort3),
93+
}
94+
95+
conf1 := &config.Cluster{
96+
NodeName: "node1",
97+
RaftImpl: config.RaftImplHashicorp,
98+
BindAddr: "127.0.0.1",
99+
BindPort: bindPort1,
100+
RaftPort: mlist.GetRaftPortFromBindPort(bindPort1),
101+
RaftBootstrap: true,
102+
RaftDir: t.TempDir(),
103+
GrpcEnable: false,
104+
Members: members,
105+
DiscoveryWay: config.DiscoveryWayMemberlist,
106+
NodesFileDir: t.TempDir(),
107+
}
108+
conf2 := &config.Cluster{
109+
NodeName: "node2",
110+
RaftImpl: config.RaftImplHashicorp,
111+
BindAddr: "127.0.0.1",
112+
BindPort: bindPort2,
113+
RaftPort: mlist.GetRaftPortFromBindPort(bindPort2),
114+
RaftBootstrap: false,
115+
RaftDir: t.TempDir(),
116+
GrpcEnable: false,
117+
Members: members,
118+
DiscoveryWay: config.DiscoveryWayMemberlist,
119+
NodesFileDir: t.TempDir(),
120+
}
121+
conf3 := &config.Cluster{
122+
NodeName: "node3",
123+
RaftImpl: config.RaftImplHashicorp,
124+
BindAddr: "127.0.0.1",
125+
BindPort: bindPort3,
126+
RaftPort: mlist.GetRaftPortFromBindPort(bindPort3),
127+
RaftBootstrap: false,
128+
RaftDir: t.TempDir(),
129+
GrpcEnable: false,
130+
Members: members,
131+
DiscoveryWay: config.DiscoveryWayMemberlist,
132+
NodesFileDir: t.TempDir(),
133+
}
134+
testCluster(t, conf1, conf2, conf3)
135+
}
136+
137+
func testCluster(t *testing.T, conf1 *config.Cluster, conf2 *config.Cluster, conf3 *config.Cluster) {
138+
log.Init(log.DefaultOptions())
139+
140+
agent1 := NewAgent(conf1)
141+
err := agent1.Start()
142+
require.NoError(t, err, "Agent start failed for node: %s", conf1.NodeName)
143+
144+
agent2 := NewAgent(conf2)
145+
err = agent2.Start()
146+
defer agent2.Stop()
147+
require.NoError(t, err, "Agent start failed for node: %s", conf2.NodeName)
148+
86149
agent3 := NewAgent(conf3)
87150
err = agent3.Start()
88151
defer agent3.Stop()
@@ -121,13 +184,14 @@ func TestCluster(t *testing.T) {
121184
}
122185

123186
// Restart agent1 and verify it is a follower
124-
err = agent1.Start()
187+
restartedAgent1 := NewAgent(conf1)
188+
err = restartedAgent1.Start()
125189
require.NoError(t, err, "Agent restart failed for node: %s", conf1.NodeName)
126-
defer agent1.Stop()
190+
defer restartedAgent1.Stop()
127191

128192
time.Sleep(5 * time.Second)
129193

130-
_, leaderAfterRestart1 := agent1.raftPeer.GetLeader()
194+
_, leaderAfterRestart1 := restartedAgent1.raftPeer.GetLeader()
131195
_, leaderAfterRestart2 := agent2.raftPeer.GetLeader()
132196
_, leaderAfterRestart3 := agent3.raftPeer.GetLeader()
133197

cluster/discovery/mlist/delegate.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,13 +50,14 @@ func NewDelegate(inboundMsgCh chan<- []byte) *Delegate {
5050
d := &Delegate{
5151
msgCh: inboundMsgCh,
5252
State: make(map[string]int64, 2),
53+
stop: make(chan struct{}),
5354
}
5455
go d.handleQueueDepth()
5556
return d
5657
}
5758

5859
func (d *Delegate) Stop() {
59-
d.stop <- struct{}{}
60+
close(d.stop)
6061
close(d.msgCh)
6162
}
6263

cluster/discovery/mlist/membership.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ func (m *Membership) LocalAddr() string {
104104
return m.list.LocalNode().Addr.String()
105105
}
106106

107-
func (m *Membership) NumMembers() int {
107+
func (m *Membership) numMembers() int {
108108
return m.list.NumMembers()
109109
}
110110

Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
package mlist
2+
3+
import (
4+
"strconv"
5+
"testing"
6+
"time"
7+
8+
"github.com/stretchr/testify/assert"
9+
"github.com/wind-c/comqtt/v2/cluster/utils"
10+
"github.com/wind-c/comqtt/v2/config"
11+
)
12+
13+
func TestJoinAndLeave(t *testing.T) {
14+
bindPort1, err := utils.GetFreePort()
15+
assert.NoError(t, err)
16+
conf1 := &config.Cluster{
17+
BindAddr: "127.0.0.1",
18+
BindPort: bindPort1,
19+
NodeName: "test-node-1",
20+
}
21+
inboundMsgCh1 := make(chan []byte)
22+
membership1 := New(conf1, inboundMsgCh1)
23+
err = membership1.Setup()
24+
assert.NoError(t, err)
25+
defer membership1.Stop()
26+
27+
assert.Equal(t, 1, membership1.numMembers())
28+
29+
bindPort2, err := utils.GetFreePort()
30+
assert.NoError(t, err)
31+
conf2 := &config.Cluster{
32+
BindAddr: "127.0.0.1",
33+
BindPort: bindPort2,
34+
NodeName: "test-node-2",
35+
}
36+
inboundMsgCh2 := make(chan []byte)
37+
membership2 := New(conf2, inboundMsgCh2)
38+
err = membership2.Setup()
39+
assert.NoError(t, err)
40+
defer membership2.Stop()
41+
42+
numJoined, err := membership2.Join([]string{"127.0.0.1:" + strconv.Itoa(bindPort1)})
43+
assert.NoError(t, err)
44+
time.Sleep(3 * time.Second)
45+
assert.Equal(t, numJoined, 1)
46+
assert.Equal(t, 2, membership1.numMembers())
47+
assert.Equal(t, 2, membership2.numMembers())
48+
49+
t.Log("Leave node 2")
50+
err = membership2.Leave()
51+
assert.NoError(t, err)
52+
53+
time.Sleep(5 * time.Second)
54+
assert.Equal(t, 1, membership1.numMembers())
55+
}
56+
57+
func TestSendToNode(t *testing.T) {
58+
bindPort1, err := utils.GetFreePort()
59+
assert.NoError(t, err)
60+
bindPort2, err := utils.GetFreePort()
61+
assert.NoError(t, err)
62+
63+
conf1 := &config.Cluster{
64+
BindAddr: "127.0.0.1",
65+
BindPort: bindPort1,
66+
NodeName: "test-node-1",
67+
}
68+
conf2 := &config.Cluster{
69+
BindAddr: "127.0.0.1",
70+
BindPort: bindPort2,
71+
NodeName: "test-node-2",
72+
Members: []string{"127.0.0.1:" + strconv.Itoa(bindPort1)},
73+
}
74+
inboundMsgCh1 := make(chan []byte)
75+
inboundMsgCh2 := make(chan []byte)
76+
77+
membership1 := New(conf1, inboundMsgCh1)
78+
err = membership1.Setup()
79+
assert.NoError(t, err)
80+
defer membership1.Stop()
81+
82+
membership2 := New(conf2, inboundMsgCh2)
83+
err = membership2.Setup()
84+
assert.NoError(t, err)
85+
defer membership2.Stop()
86+
87+
time.Sleep(3 * time.Second)
88+
89+
err = membership1.SendToNode("test-node-2", []byte("test message"))
90+
assert.NoError(t, err)
91+
92+
select {
93+
case msg := <-inboundMsgCh2:
94+
assert.Equal(t, []byte("test message"), msg)
95+
case <-time.After(5 * time.Second):
96+
t.Fatal("Did not receive the message in membership2")
97+
}
98+
}
99+
100+
func TestSendToOthers(t *testing.T) {
101+
bindPort1, err := utils.GetFreePort()
102+
assert.NoError(t, err)
103+
bindPort2, err := utils.GetFreePort()
104+
assert.NoError(t, err)
105+
bindPort3, err := utils.GetFreePort()
106+
assert.NoError(t, err)
107+
108+
conf1 := &config.Cluster{
109+
BindAddr: "127.0.0.1",
110+
BindPort: bindPort1,
111+
NodeName: "test-node-1",
112+
}
113+
conf2 := &config.Cluster{
114+
BindAddr: "127.0.0.1",
115+
BindPort: bindPort2,
116+
NodeName: "test-node-2",
117+
Members: []string{"127.0.0.1:" + strconv.Itoa(bindPort1)},
118+
}
119+
conf3 := &config.Cluster{
120+
BindAddr: "127.0.0.1",
121+
BindPort: bindPort3,
122+
NodeName: "test-node-3",
123+
Members: []string{"127.0.0.1:" + strconv.Itoa(bindPort1)},
124+
}
125+
inboundMsgCh1 := make(chan []byte)
126+
inboundMsgCh2 := make(chan []byte)
127+
inboundMsgCh3 := make(chan []byte)
128+
129+
membership1 := New(conf1, inboundMsgCh1)
130+
err = membership1.Setup()
131+
assert.NoError(t, err)
132+
defer membership1.Stop()
133+
134+
membership2 := New(conf2, inboundMsgCh2)
135+
err = membership2.Setup()
136+
assert.NoError(t, err)
137+
defer membership2.Stop()
138+
139+
membership3 := New(conf3, inboundMsgCh3)
140+
err = membership3.Setup()
141+
assert.NoError(t, err)
142+
defer membership3.Stop()
143+
144+
time.Sleep(3 * time.Second)
145+
146+
membership1.SendToOthers([]byte("test message"))
147+
148+
select {
149+
case msg := <-inboundMsgCh2:
150+
assert.Equal(t, []byte("test message"), msg)
151+
case <-time.After(5 * time.Second):
152+
t.Fatal("Did not receive the message in membership2")
153+
}
154+
155+
select {
156+
case msg := <-inboundMsgCh3:
157+
assert.Equal(t, []byte("test message"), msg)
158+
case <-time.After(5 * time.Second):
159+
t.Fatal("Did not receive the message in membership3")
160+
}
161+
}

0 commit comments

Comments
 (0)