Skip to content

Commit 060d706

Browse files
authored
Merge pull request #118 from ohkinozomu/testcluster-etcd-tests
testCluster: Add etcd integration tests
2 parents 06fe601 + d26aa34 commit 060d706

File tree

2 files changed

+135
-6
lines changed

2 files changed

+135
-6
lines changed

cluster/agent_test.go

+132-5
Original file line numberDiff line numberDiff line change
@@ -134,18 +134,144 @@ func TestCluster_Hashicorp_Memberlist(t *testing.T) {
134134
testCluster(t, conf1, conf2, conf3)
135135
}
136136

137+
func TestCluster_Etcd_Serf(t *testing.T) {
138+
bindPort1, err := utils.GetFreePort()
139+
require.NoError(t, err, "Failed to get free port for node1")
140+
raftPort1, err := utils.GetFreePort()
141+
require.NoError(t, err, "Failed to get free port for node1 Raft")
142+
143+
bindPort2, err := utils.GetFreePort()
144+
require.NoError(t, err, "Failed to get free port for node2")
145+
raftPort2, err := utils.GetFreePort()
146+
require.NoError(t, err, "Failed to get free port for node2 Raft")
147+
148+
bindPort3, err := utils.GetFreePort()
149+
require.NoError(t, err, "Failed to get free port for node3")
150+
raftPort3, err := utils.GetFreePort()
151+
require.NoError(t, err, "Failed to get free port for node3 Raft")
152+
153+
members := []string{
154+
"127.0.0.1:" + strconv.Itoa(bindPort1),
155+
"127.0.0.1:" + strconv.Itoa(bindPort2),
156+
"127.0.0.1:" + strconv.Itoa(bindPort3),
157+
}
158+
159+
conf1 := &config.Cluster{
160+
NodeName: "1",
161+
RaftImpl: config.RaftImplEtcd,
162+
BindAddr: "127.0.0.1",
163+
BindPort: bindPort1,
164+
RaftPort: raftPort1,
165+
RaftBootstrap: true,
166+
RaftDir: t.TempDir(),
167+
GrpcEnable: false,
168+
Members: members,
169+
DiscoveryWay: config.DiscoveryWaySerf,
170+
NodesFileDir: t.TempDir(),
171+
}
172+
conf2 := &config.Cluster{
173+
NodeName: "2",
174+
RaftImpl: config.RaftImplEtcd,
175+
BindAddr: "127.0.0.1",
176+
BindPort: bindPort2,
177+
RaftPort: raftPort2,
178+
RaftBootstrap: false,
179+
RaftDir: t.TempDir(),
180+
GrpcEnable: false,
181+
Members: members,
182+
DiscoveryWay: config.DiscoveryWaySerf,
183+
NodesFileDir: t.TempDir(),
184+
}
185+
conf3 := &config.Cluster{
186+
NodeName: "3",
187+
RaftImpl: config.RaftImplEtcd,
188+
BindAddr: "127.0.0.1",
189+
BindPort: bindPort3,
190+
RaftPort: raftPort3,
191+
RaftBootstrap: false,
192+
RaftDir: t.TempDir(),
193+
GrpcEnable: false,
194+
Members: members,
195+
DiscoveryWay: config.DiscoveryWaySerf,
196+
NodesFileDir: t.TempDir(),
197+
}
198+
testCluster(t, conf1, conf2, conf3)
199+
}
200+
201+
func TestCluster_Etcd_Memberlist(t *testing.T) {
202+
bindPort1, err := utils.GetFreePort()
203+
require.NoError(t, err, "Failed to get free port for node1")
204+
205+
bindPort2, err := utils.GetFreePort()
206+
require.NoError(t, err, "Failed to get free port for node2")
207+
208+
bindPort3, err := utils.GetFreePort()
209+
require.NoError(t, err, "Failed to get free port for node3")
210+
211+
members := []string{
212+
"127.0.0.1:" + strconv.Itoa(bindPort1),
213+
"127.0.0.1:" + strconv.Itoa(bindPort2),
214+
"127.0.0.1:" + strconv.Itoa(bindPort3),
215+
}
216+
217+
conf1 := &config.Cluster{
218+
NodeName: "1",
219+
RaftImpl: config.RaftImplEtcd,
220+
BindAddr: "127.0.0.1",
221+
BindPort: bindPort1,
222+
RaftPort: mlist.GetRaftPortFromBindPort(bindPort1),
223+
RaftBootstrap: true,
224+
RaftDir: t.TempDir(),
225+
GrpcEnable: false,
226+
Members: members,
227+
DiscoveryWay: config.DiscoveryWayMemberlist,
228+
NodesFileDir: t.TempDir(),
229+
}
230+
conf2 := &config.Cluster{
231+
NodeName: "2",
232+
RaftImpl: config.RaftImplEtcd,
233+
BindAddr: "127.0.0.1",
234+
BindPort: bindPort2,
235+
RaftPort: mlist.GetRaftPortFromBindPort(bindPort2),
236+
RaftBootstrap: false,
237+
RaftDir: t.TempDir(),
238+
GrpcEnable: false,
239+
Members: members,
240+
DiscoveryWay: config.DiscoveryWayMemberlist,
241+
NodesFileDir: t.TempDir(),
242+
}
243+
conf3 := &config.Cluster{
244+
NodeName: "3",
245+
RaftImpl: config.RaftImplEtcd,
246+
BindAddr: "127.0.0.1",
247+
BindPort: bindPort3,
248+
RaftPort: mlist.GetRaftPortFromBindPort(bindPort3),
249+
RaftBootstrap: false,
250+
RaftDir: t.TempDir(),
251+
GrpcEnable: false,
252+
Members: members,
253+
DiscoveryWay: config.DiscoveryWayMemberlist,
254+
NodesFileDir: t.TempDir(),
255+
}
256+
testCluster(t, conf1, conf2, conf3)
257+
}
258+
137259
func testCluster(t *testing.T, conf1 *config.Cluster, conf2 *config.Cluster, conf3 *config.Cluster) {
138260
log.Init(log.DefaultOptions())
139261

140262
agent1 := NewAgent(conf1)
141263
err := agent1.Start()
142264
require.NoError(t, err, "Agent start failed for node: %s", conf1.NodeName)
143265

266+
time.Sleep(1 * time.Second)
267+
144268
agent2 := NewAgent(conf2)
145269
err = agent2.Start()
146270
defer agent2.Stop()
147271
require.NoError(t, err, "Agent start failed for node: %s", conf2.NodeName)
148272

273+
time.Sleep(1 * time.Second)
274+
149275
agent3 := NewAgent(conf3)
150276
err = agent3.Start()
151277
defer agent3.Stop()
@@ -157,9 +283,9 @@ func testCluster(t *testing.T, conf1 *config.Cluster, conf2 *config.Cluster, con
157283
_, leader2 := agent2.raftPeer.GetLeader()
158284
_, leader3 := agent3.raftPeer.GetLeader()
159285

160-
require.Equal(t, leader1, "node1")
161-
require.Equal(t, leader2, "node1")
162-
require.Equal(t, leader3, "node1")
286+
require.Equal(t, leader1, conf1.NodeName)
287+
require.Equal(t, leader2, conf1.NodeName)
288+
require.Equal(t, leader3, conf1.NodeName)
163289

164290
members1 := agent1.GetMemberList()
165291
members2 := agent2.GetMemberList()
@@ -177,13 +303,14 @@ func testCluster(t *testing.T, conf1 *config.Cluster, conf2 *config.Cluster, con
177303
_, newLeader3 := agent3.raftPeer.GetLeader()
178304

179305
// Check that either agent2 or agent3 becomes the new leader
180-
if newLeader2 == "node2" || newLeader2 == "node3" {
306+
if newLeader2 == conf2.NodeName || newLeader3 == conf3.NodeName {
181307
require.Equal(t, newLeader2, newLeader3, "Leaders should be the same for agent2 and agent3")
182308
} else {
183309
require.Fail(t, "New leader should be either node2 or node3")
184310
}
185311

186312
// Restart agent1 and verify it is a follower
313+
t.Log("Restarting agent1")
187314
restartedAgent1 := NewAgent(conf1)
188315
err = restartedAgent1.Start()
189316
require.NoError(t, err, "Agent restart failed for node: %s", conf1.NodeName)
@@ -198,7 +325,7 @@ func testCluster(t *testing.T, conf1 *config.Cluster, conf2 *config.Cluster, con
198325
require.Equal(t, leaderAfterRestart2, leaderAfterRestart1)
199326
require.Equal(t, leaderAfterRestart3, leaderAfterRestart1)
200327

201-
require.NotEqual(t, leaderAfterRestart1, "node1", "After restart, node1 should not be the leader")
328+
require.NotEqual(t, leaderAfterRestart1, conf1.NodeName, "After restart, node1 should not be the leader")
202329

203330
t.Log("Test completed successfully")
204331
}

cluster/raft/etcd/peer.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -368,7 +368,7 @@ func (p *Peer) serveChannels() {
368368
return
369369

370370
case <-p.stopC:
371-
p.Stop()
371+
//p.Stop()
372372
return
373373
}
374374
}
@@ -603,6 +603,8 @@ func (p *Peer) writeError(err error) {
603603

604604
func (p *Peer) Stop() {
605605
p.stopHTTP()
606+
close(p.proposeC)
607+
close(p.confChangeC)
606608
close(p.commitC)
607609
close(p.errorC)
608610
p.node.Stop()

0 commit comments

Comments
 (0)