Skip to content

Commit 6838bcc

Browse files
authored
Merge pull request #93 from rstudio/fix-leader-ping
Fix unhandled ping requests from the same PgxLeader node
2 parents 2908159 + de10d3e commit 6838bcc

File tree

2 files changed

+96
-13
lines changed

2 files changed

+96
-13
lines changed

pkg/rselection/impls/pgx/leader.go

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"time"
1313

1414
"github.com/google/uuid"
15+
1516
"github.com/rstudio/platform-lib/pkg/rselection"
1617
"github.com/rstudio/platform-lib/pkg/rselection/electiontypes"
1718
"github.com/rstudio/platform-lib/pkg/rsnotify/broadcaster"
@@ -134,10 +135,10 @@ func (p *PgxLeader) lead(pingTick, sweepTick <-chan time.Time, stop chan bool) {
134135
l := p.awb.Subscribe(electiontypes.ClusterMessageTypePingResponse)
135136
defer p.awb.Unsubscribe(l)
136137

137-
// Listen for other ping requests. If an expected ping request is received,
138+
// Listen for other ping requests. If an unexpected ping request is received,
138139
// we must demote the leader since there's another leader.
139-
unexpectedPings := p.awb.Subscribe(electiontypes.ClusterMessageTypePing)
140-
defer p.awb.Unsubscribe(unexpectedPings)
140+
leaderPings := p.awb.Subscribe(electiontypes.ClusterMessageTypePing)
141+
defer p.awb.Unsubscribe(leaderPings)
141142

142143
// Listen for requests to enumerate nodes in the cluster.
143144
nodesCh := p.awb.Subscribe(electiontypes.ClusterMessageTypeNodes)
@@ -171,9 +172,9 @@ func (p *PgxLeader) lead(pingTick, sweepTick <-chan time.Time, stop chan bool) {
171172
if cn, ok := n.(*electiontypes.ClusterPingResponse); ok {
172173
go p.handlePingResponse(cn)
173174
}
174-
case n := <-unexpectedPings:
175+
case n := <-leaderPings:
175176
if cn, ok := n.(*electiontypes.ClusterPingRequest); ok {
176-
go p.handleUnexpectedPing(cn)
177+
go p.handleLeaderPing(cn)
177178
}
178179
case n := <-nodesCh:
179180
// This supports receiving a request to enumerate cluster nodes. The leader
@@ -215,12 +216,12 @@ func (p *PgxLeader) info() string {
215216
// this to ensure that the cluster is healthy before running scheduled tasks.
216217
// This helps to prevent split-brain issues in the cluster.
217218
//
218-
// 1. The task handler is ready to run a scheduled task.
219-
// 2. The task handler sends a `chan bool` to its verify channel.
220-
// 3. We receive the `chan bool` here and:
221-
// a. Verify that the cluster is healthy.
222-
// b. Respond with `true` over the channel if the cluster is healthy.
223-
// 4. The task handler runs the scheduled task when the cluster is healthy.
219+
// 1. The task handler is ready to run a scheduled task.
220+
// 2. The task handler sends a `chan bool` to its verify channel.
221+
// 3. We receive the `chan bool` here and:
222+
// a. Verify that the cluster is healthy.
223+
// b. Respond with `true` over the channel if the cluster is healthy.
224+
// 4. The task handler runs the scheduled task when the cluster is healthy.
224225
func (p *PgxLeader) verify(vCh chan bool) {
225226
var err error
226227

@@ -329,11 +330,14 @@ func (p *PgxLeader) handleNodesRequest(cn *electiontypes.ClusterNodesRequest) {
329330
}
330331
}
331332

332-
func (p *PgxLeader) handleUnexpectedPing(cn *electiontypes.ClusterPingRequest) {
333+
func (p *PgxLeader) handleLeaderPing(cn *electiontypes.ClusterPingRequest) {
333334
// If we received a ping from another leader, then stop leading
334335
if cn.SrcAddr != p.address {
335336
p.debugLogger.Debugf("Leader received ping from another leader. Stopping and moving back to the follower loop.")
336337
p.stop <- true
338+
} else {
339+
resp := electiontypes.NewClusterPingResponse(p.address, cn.SrcAddr, p.awb.IP())
340+
p.handlePingResponse(resp)
337341
}
338342
}
339343

pkg/rselection/impls/pgx/leader_test.go

Lines changed: 80 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,13 @@ import (
1010

1111
"github.com/fortytw2/leaktest"
1212
"github.com/jackc/pgx/v4/pgxpool"
13+
"gopkg.in/check.v1"
14+
1315
"github.com/rstudio/platform-lib/pkg/rselection"
1416
"github.com/rstudio/platform-lib/pkg/rselection/electiontypes"
1517
"github.com/rstudio/platform-lib/pkg/rsnotify/broadcaster"
1618
"github.com/rstudio/platform-lib/pkg/rsnotify/listener"
1719
"github.com/rstudio/platform-lib/pkg/rsnotify/listeners/postgrespgx"
18-
"gopkg.in/check.v1"
1920
)
2021

2122
type fakeTaskHandler struct {
@@ -363,6 +364,84 @@ func (s *LeaderSuite) TestPingNodes(c *check.C) {
363364
c.Check(leader.unsuccessfulPing(), check.Equals, true)
364365
}
365366

367+
func (s *LeaderSuite) TestLeaderPingSelf(c *check.C) {
368+
defer leaktest.Check(c)
369+
370+
channel := c.TestName()
371+
// Use a real notifier to send the Ping Request notification.
372+
realNotifier := &PgxPgNotifier{pool: s.pool}
373+
// Use a fake notifier to record the Ping Response notification.
374+
fakeNotifier := &dummyNotifier{}
375+
matcher := listener.NewMatcher("MessageType")
376+
matcher.Register(electiontypes.ClusterMessageTypePingResponse, &electiontypes.ClusterPingResponse{})
377+
matcher.Register(electiontypes.ClusterMessageTypePing, &electiontypes.ClusterPingRequest{})
378+
matcher.Register(electiontypes.ClusterMessageTypeNodes, &electiontypes.ClusterNodesNotification{})
379+
plf := postgrespgx.NewPgxListener(postgrespgx.PgxListenerArgs{
380+
Name: channel + "_leader",
381+
Pool: s.pool,
382+
Matcher: matcher,
383+
IpReporter: &listener.TestIPReporter{Ip: "192.168.5.11"},
384+
})
385+
defer plf.Stop()
386+
awbStop := make(chan bool)
387+
awb, err := broadcaster.NewNotificationBroadcaster(plf, awbStop)
388+
c.Assert(err, check.IsNil)
389+
defer func() {
390+
awbStop <- true
391+
}()
392+
stop := make(chan bool)
393+
394+
pingCh := make(chan bool)
395+
defer close(pingCh)
396+
397+
leader := &PgxLeader{
398+
awb: awb,
399+
notify: fakeNotifier,
400+
chLeader: "leader",
401+
chFollower: "follower",
402+
address: "leader",
403+
stop: stop,
404+
nodes: map[string]*electiontypes.ClusterNode{},
405+
pingResponseChTEST: pingCh,
406+
taskHandler: &fakeTaskHandler{},
407+
debugLogger: &fakeLogger{},
408+
traceLogger: &fakeLogger{},
409+
}
410+
411+
// Notified when leader exits
412+
done := make(chan struct{})
413+
go func() {
414+
defer close(done)
415+
leader.lead(nil, nil, stop)
416+
}()
417+
418+
// Receive notification from self
419+
msgBytes, err := json.Marshal(&electiontypes.ClusterPingRequest{
420+
ClusterNotification: electiontypes.ClusterNotification{
421+
GuidVal: "65db0d7d-8db1-4fa8-bc2a-58fad248507f",
422+
MessageType: electiontypes.ClusterMessageTypePing,
423+
SrcAddr: "leader",
424+
},
425+
})
426+
c.Assert(err, check.IsNil)
427+
428+
now := time.Now()
429+
wait(pingCh, func() {
430+
err = realNotifier.Notify(channel+"_leader", msgBytes)
431+
c.Assert(err, check.IsNil)
432+
})
433+
c.Assert(len(leader.nodes), check.Equals, 1)
434+
node, ok := leader.nodes["leader_192.168.5.11"]
435+
c.Assert(ok, check.Equals, true)
436+
c.Assert(node.Ping.After(now), check.Equals, true)
437+
438+
// Stop
439+
close(stop)
440+
441+
// Wait for exit
442+
<-done
443+
}
444+
366445
func (s *LeaderSuite) TestLeaderDemotion(c *check.C) {
367446
defer leaktest.Check(c)
368447

0 commit comments

Comments
 (0)