Implement leader sync task
kysre committed Feb 10, 2024
1 parent ae59cb9 commit 7619e1c
Showing 5 changed files with 90 additions and 11 deletions.
leader/cmd/config.go (4 changes: 3 additions & 1 deletion)
@@ -23,7 +23,8 @@ type QueueConfig struct {
}

type LeaderConfig struct {
ReplicaHost string
ReplicaHost string
LeaderSyncPeriod int

DataNodeStateCheckPeriod int
DataNodeRemainingCheckPeriod int
@@ -48,6 +49,7 @@ func LoadConfig(cmd *cobra.Command) (*Config, error) {
viper.SetDefault("queue.listenPort", 8888)

viper.SetDefault("leader.ReplicaHost", "leader_1")
viper.SetDefault("leader.LeaderSyncPeriod", 10)
viper.SetDefault("leader.DataNodeStateCheckPeriod", 30)
viper.SetDefault("leader.DataNodeRemainingCheckPeriod", 2)
viper.SetDefault("leader.DataNodePartitionCount", 4)
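For context, LeaderSyncPeriod is an interval in seconds (default 10). A minimal sketch, not part of this commit and assuming only the standard viper API, of how the key resolves and converts into the ticker interval used by the new sync task:

// Illustrative only: resolve leader.LeaderSyncPeriod (default 10, in seconds)
// and turn it into a time.Duration, as the leader sync task does with
// conf.Leader.LeaderSyncPeriod.
package main

import (
	"fmt"
	"time"

	"github.com/spf13/viper"
)

func main() {
	viper.SetDefault("leader.LeaderSyncPeriod", 10) // same default as LoadConfig

	period := viper.GetInt("leader.LeaderSyncPeriod")
	interval := time.Duration(period) * time.Second
	fmt.Println(interval) // 10s unless overridden by a config file or environment
}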
leader/cmd/serve.go (24 changes: 20 additions & 4 deletions)
@@ -48,7 +48,9 @@ func serve(cmd *cobra.Command, args []string) {
logger := getLoggerOrPanic(config)
directory := getDataNodeDirectoryOrPanic()
balancer := getLoadBalancerOrPanic(logger, directory)
runDataNodesTasks(config, logger, directory, balancer)
consensusHandler := getConsensusOrPanic(config)

runTasks(config, logger, directory, balancer, consensusHandler)

// Get grpc server cores
queueCore := getQueueCore(logger, directory, balancer)
@@ -101,13 +103,27 @@ func getLoadBalancerOrPanic(log *logrus.Logger, directory *models.DataNodeDirect
return loadbalancer.NewBalancer(log, directory)
}

func runDataNodesTasks(
conf *Config, log *logrus.Logger, directory *models.DataNodeDirectory, balancer loadbalancer.Balancer,
func getConsensusOrPanic(conf *Config) *models.LeaderConsensusHandler {
return models.NewLeaderConsensusHandler(conf.Leader.ReplicaHost)
}

func runTasks(
conf *Config,
log *logrus.Logger,
directory *models.DataNodeDirectory,
balancer loadbalancer.Balancer,
consensusHandler *models.LeaderConsensusHandler,
) {
// Init resources
syncer := tasks.NewDataNodeSyncer(
log, directory, balancer, conf.Leader.DataNodePartitionCount, conf.Leader.DataNodeSyncTimeout)
log, directory, balancer, conf.Leader.DataNodePartitionCount,
conf.Leader.DataNodeSyncTimeout, consensusHandler.AmIMaster(),
)
healthChecker := tasks.NewDataNodeHealthChecker(directory, conf.Leader.DataNodeStateCheckPeriod, syncer)
leaderSyncer := tasks.NewLeaderSyncer(log, consensusHandler, directory, conf.Leader.LeaderSyncPeriod)
// Run tasks
go healthChecker.RunHealthChecks()
go leaderSyncer.RunLeaderSync()
go tasks.RunRemainingCheck(directory, conf.Leader.DataNodeRemainingCheckPeriod)
}

leader/cmd/tasks/datanode_data_sync.go (7 changes: 7 additions & 0 deletions)
@@ -20,6 +20,7 @@ type DataNodeDataSyncer struct {
balancer loadbalancer.Balancer
partitionCount int
dataSyncTimeout int
shouldSync bool
}

func NewDataNodeSyncer(
@@ -28,13 +29,15 @@ func NewDataNodeSyncer(
balancer loadbalancer.Balancer,
partitionCount int,
dataSyncTimeout int,
shouldSync bool,
) *DataNodeDataSyncer {
return &DataNodeDataSyncer{
logger: logger,
dataNodeDirectory: dataNodeDirectory,
balancer: balancer,
partitionCount: partitionCount,
dataSyncTimeout: dataSyncTimeout,
shouldSync: shouldSync,
}
}

@@ -46,6 +49,10 @@ func NewDataNodeSyncer(
// - Data of datanode[i-1] should be read -> write to datanode[i+1] replica data

func (s *DataNodeDataSyncer) SyncData(failedDataNode *models.DataNode) {
// Do not sync data when running as the leader's replica
if !s.shouldSync {
return
}
s.logger.Info(fmt.Sprintf("Start datasync for datanode[%d]", failedDataNode.ID))
// Create context for requests
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(s.dataSyncTimeout)*time.Second)
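The guard above means only the instance whose consensus handler reports AmIMaster() as true performs data-node recovery; the replica leader returns immediately. The comment over SyncData describes a neighbour scheme over the data nodes. Purely as an illustration, not code from the repository, and assuming the data nodes form a ring of size n with wrap-around indices:

// Hypothetical helper: the wrap-around indexing implied by the SyncData comment
// ("data of datanode[i-1] ... datanode[i+1] replica data"). The real recovery
// logic is only partially visible in this diff.
package main

import "fmt"

func ringNeighbours(i, n int) (prev, next int) {
	prev = (i - 1 + n) % n // datanode[i-1], wrapping below zero
	next = (i + 1) % n     // datanode[i+1], wrapping past the last node
	return prev, next
}

func main() {
	prev, next := ringNeighbours(0, 4)
	fmt.Printf("node 0 of 4: prev=%d next=%d\n", prev, next) // prev=3 next=1
}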
leader/cmd/tasks/leader_sync.go (47 changes: 47 additions & 0 deletions)
@@ -0,0 +1,47 @@
package tasks

import (
	"context"
	"time"

	"github.com/sirupsen/logrus"

	"github.com/kysre/TurtleMQ/leader/internal/models"
)

type LeaderSyncer struct {
	logger    *logrus.Logger
	handler   *models.LeaderConsensusHandler
	directory *models.DataNodeDirectory
	period    int
}

func NewLeaderSyncer(
	logger *logrus.Logger,
	handler *models.LeaderConsensusHandler,
	directory *models.DataNodeDirectory,
	period int,
) *LeaderSyncer {
	return &LeaderSyncer{
		logger:    logger,
		handler:   handler,
		directory: directory,
		period:    period,
	}
}

func (ls *LeaderSyncer) RunLeaderSync() {
	ls.logger.Info("Running leader sync")
	tickerPeriod := time.Duration(ls.period) * time.Second
	ticker := time.NewTicker(tickerPeriod)
	for {
		select {
		case <-ticker.C:
			if ls.handler.IsReplicaAvailable() {
				ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
				ls.handler.AddDataNodesToReplica(ctx, ls.directory.DataNodes)
				cancel()
			}
		}
	}
}
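RunLeaderSync ticks every LeaderSyncPeriod seconds and, whenever the replica answers its health check, pushes the current data-node directory to it. The single-case select behaves the same as ranging over ticker.C, and the ticker is never stopped because the task runs for the lifetime of the process. As a sketch only, not code from the repository, the same loop shape with an added stop channel would look like this:

// Sketch of the loop shape used above, with a stop channel added purely for
// illustration; the committed RunLeaderSync intentionally runs forever.
package main

import (
	"fmt"
	"time"
)

func runPeriodic(period time.Duration, stop <-chan struct{}, work func()) {
	ticker := time.NewTicker(period)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			work()
		case <-stop:
			return
		}
	}
}

func main() {
	stop := make(chan struct{})
	go runPeriodic(200*time.Millisecond, stop, func() { fmt.Println("sync round") })
	time.Sleep(time.Second)
	close(stop)
}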
leader/internal/models/consensus_handler.go (19 changes: 13 additions & 6 deletions)
@@ -2,6 +2,7 @@ package models

import (
"context"
"fmt"
"time"

"github.com/sirupsen/logrus"
@@ -10,32 +11,38 @@ import (
"github.com/kysre/TurtleMQ/leader/pkg/leader"
)

const MasterAddr = "leader_0"
const MasterHost = "leader_0"

type LeaderConsensusHandler struct {
replicaAddr string
replicaClient clients.LeaderClient
isMaster bool
}

func NewLeaderConsensusHandler(addr string) *LeaderConsensusHandler {
client, err := clients.NewLeaderClient(addr)
func NewLeaderConsensusHandler(replicaHost string) *LeaderConsensusHandler {
client, err := clients.NewLeaderClient(fmt.Sprintf("%s:8888", replicaHost))
if err != nil {
logrus.Error(err)
}
return &LeaderConsensusHandler{
replicaAddr: addr,
replicaAddr: replicaHost,
replicaClient: client,
isMaster: addr == MasterAddr,
isMaster: replicaHost != MasterHost,
}
}

func (l *LeaderConsensusHandler) AmIMaster() bool {
return l.isMaster
}

func (l *LeaderConsensusHandler) IsReplicaAvailable() bool {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
return l.replicaClient.IsHealthy(ctx)
}

func (l *LeaderConsensusHandler) AddDataNodesToReplica(ctx context.Context, dataNodes []*DataNode) {
tickerPeriod := time.Duration(500) * time.Millisecond
tickerPeriod := time.Duration(200) * time.Millisecond
ticker := time.NewTicker(tickerPeriod)
for _, node := range dataNodes {
select {
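The inverted comparison is easy to misread: the handler is constructed with the host of the other leader, so an instance whose replica is not leader_0 must itself be leader_0, the master. Only leader_0's default (ReplicaHost set to leader_1) is visible in this commit; the leader_1 side (ReplicaHost set to leader_0) is assumed below. An illustration, not code from the repository:

// Hypothetical illustration of the isMaster check in NewLeaderConsensusHandler.
package main

import "fmt"

const masterHost = "leader_0"

func amIMaster(replicaHost string) bool {
	// A leader whose replica is not leader_0 must itself be leader_0.
	return replicaHost != masterHost
}

func main() {
	fmt.Println(amIMaster("leader_1")) // true: leader_0 keeps data-node sync enabled
	fmt.Println(amIMaster("leader_0")) // false: the replica constructs its syncer with shouldSync=false
}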
