
Commit

Implement datanode data syncer
kysre committed Feb 10, 2024
1 parent b745f21 commit 5b10788
Showing 7 changed files with 200 additions and 43 deletions.
2 changes: 2 additions & 0 deletions leader/cmd/config.go
@@ -29,6 +29,7 @@ type LeaderConfig struct {
DataNodeStateCheckPeriod int
DataNodeRemainingCheckPeriod int
DataNodePartitionCount int
DataNodeSyncTimeout int
}

type MetricConfig struct {
@@ -52,6 +53,7 @@ func LoadConfig(cmd *cobra.Command) (*Config, error) {
viper.SetDefault("leader.DataNodeStateCheckPeriod", 30)
viper.SetDefault("leader.DataNodeRemainingCheckPeriod", 2)
viper.SetDefault("leader.DataNodePartitionCount", 4)
viper.SetDefault("leader.DataNodeSyncTimeout", 300)

viper.SetDefault("metric.ListenPort", 9000)

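The new DataNodeSyncTimeout default is an integer number of seconds bounding one full data-sync pass. A minimal standalone sketch (not the leader's actual wiring; the main function here is only illustrative) of how a viper default like this becomes a context deadline:

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/spf13/viper"
)

func main() {
	// Same default the commit registers in LoadConfig.
	viper.SetDefault("leader.DataNodeSyncTimeout", 300)

	// The syncer treats the value as seconds and derives a deadline for one sync pass.
	timeout := time.Duration(viper.GetInt("leader.DataNodeSyncTimeout")) * time.Second
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	deadline, _ := ctx.Deadline()
	fmt.Println("data sync must finish by:", deadline)
}
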
17 changes: 13 additions & 4 deletions leader/cmd/serve.go
@@ -46,8 +46,9 @@ func serve(cmd *cobra.Command, args []string) {

// Get shared resources
logger := getLoggerOrPanic(config)
directory := getDataNodeDirectoryOrPanic(config)
directory := getDataNodeDirectoryOrPanic()
balancer := getLoadBalancerOrPanic(logger, directory)
runDataNodesTasks(config, logger, directory, balancer)

// Get grpc server cores
queueCore := getQueueCore(logger, directory, balancer)
@@ -88,20 +89,28 @@ func getLoggerOrPanic(conf *Config) *logrus.Logger {
return logger
}

func getDataNodeDirectoryOrPanic(conf *Config) *models.DataNodeDirectory {
func getDataNodeDirectoryOrPanic() *models.DataNodeDirectory {
directory := models.NewDataNodeDirectory()
if directory == nil {
panic("DataNodeDirectory is nil")
}
go tasks.RunHealthChecks(directory, conf.Leader.DataNodeStateCheckPeriod)
go tasks.RunRemainingCheck(directory, conf.Leader.DataNodeRemainingCheckPeriod)
return directory
}

func getLoadBalancerOrPanic(log *logrus.Logger, directory *models.DataNodeDirectory) loadbalancer.Balancer {
return loadbalancer.NewBalancer(log, directory)
}

func runDataNodesTasks(
conf *Config, log *logrus.Logger, directory *models.DataNodeDirectory, balancer loadbalancer.Balancer,
) {
syncer := tasks.NewDataNodeSyncer(
log, directory, balancer, conf.Leader.DataNodePartitionCount, conf.Leader.DataNodeSyncTimeout)
healthChecker := tasks.NewDataNodeHealthChecker(directory, conf.Leader.DataNodeStateCheckPeriod, syncer)
go healthChecker.RunHealthChecks()
go tasks.RunRemainingCheck(directory, conf.Leader.DataNodeRemainingCheckPeriod)
}

func getQueueCore(
log *logrus.Logger, directory *models.DataNodeDirectory, balancer loadbalancer.Balancer,
) queue.QueueServer {
108 changes: 108 additions & 0 deletions leader/cmd/tasks/datanode_data_sync.go
@@ -0,0 +1,108 @@
package tasks

import (
"context"
"fmt"
"time"

"github.com/sirupsen/logrus"

"github.com/kysre/TurtleMQ/leader/internal/app/loadbalancer"
"github.com/kysre/TurtleMQ/leader/internal/clients"
"github.com/kysre/TurtleMQ/leader/internal/models"
"github.com/kysre/TurtleMQ/leader/pkg/datanode"
)

type DataNodeDataSyncer struct {
logger *logrus.Logger
dataNodeDirectory *models.DataNodeDirectory
balancer loadbalancer.Balancer
partitionCount int
dataSyncTimeout int
}

func NewDataNodeSyncer(
logger *logrus.Logger,
dataNodeDirectory *models.DataNodeDirectory,
balancer loadbalancer.Balancer,
partitionCount int,
dataSyncTimeout int,
) *DataNodeDataSyncer {
return &DataNodeDataSyncer{
logger: logger,
dataNodeDirectory: dataNodeDirectory,
balancer: balancer,
partitionCount: partitionCount,
dataSyncTimeout: dataSyncTimeout,
}
}

// Data of datanode[i] is replicated in datanode[i+1]
// After datanode[i] fails:
// - Replica data of datanode[i+1] should be read -> push to datanode[i+1] data
// - Replica data of datanode[i+1] should be purged
// - Data of datanode[i-1] should be read -> write to datanode[i+1] replica data
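// Example (assuming a ring of four datanodes in hash order 0 -> 1 -> 2 -> 3, all healthy
// before the failure): if datanode[1] fails, prev is datanode[0] and after is datanode[2].
// Step 1 reads datanode[2]'s replica partitions (they hold datanode[1]'s data) and pushes
// those messages into datanode[2]'s own data. Step 2 purges datanode[2]'s replica partitions.
// Step 3 reads datanode[0]'s data partitions and writes them into datanode[2]'s replica store,
// so every surviving node's data is again replicated on its ring successor.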

func (s *DataNodeDataSyncer) SyncData(failedDataNode *models.DataNode) {
s.logger.Info(fmt.Sprintf("Start datasync for datanode[%d]", failedDataNode.ID))
// Create context for requests
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(s.dataSyncTimeout)*time.Second)
defer cancel()
// Get dataNodes that are present in data sync
prevDataNode, afterDataNode, err := s.balancer.GetPreviousAndAfterDataNodesForSync(failedDataNode.Address)
if err != nil {
s.logger.Error(err)
return
}
// Halt push & pull to dataNodes that are present in data sync
s.dataNodeDirectory.UpdateDataNodeState(failedDataNode.ID, models.DataNodeStatePENDING)
s.dataNodeDirectory.UpdateDataNodeState(prevDataNode.ID, models.DataNodeStatePENDING)
s.dataNodeDirectory.UpdateDataNodeState(afterDataNode.ID, models.DataNodeStatePENDING)
// Step 1
for i := 0; i < s.partitionCount; i++ {
res, err := afterDataNode.Client.ReadPartition(
ctx, &datanode.ReadPartitionRequest{PartitionIndex: int32(i), IsReplica: true})
if err != nil {
s.logger.Error(err)
continue
}
// Push synchronously so the replica copy is fully re-ingested before it is purged in step 2
s.pushMessagesToDataNode(ctx, afterDataNode.Client, res.PartitionMessages)
}
// Step 2
err = afterDataNode.Client.PurgeReplicaData(ctx)
if err != nil {
s.logger.Error(err)
}
// Step 3
for i := 0; i < s.partitionCount; i++ {
res, err := prevDataNode.Client.ReadPartition(
ctx, &datanode.ReadPartitionRequest{PartitionIndex: int32(i), IsReplica: false})
if err != nil {
s.logger.Error(err)
continue
}
req := datanode.WritePartitionRequest{PartitionIndex: int32(i), IsReplica: true}
req.PartitionMessages = append(req.PartitionMessages, res.PartitionMessages...)
err = afterDataNode.Client.WritePartition(ctx, &req)
if err != nil {
s.logger.Error(err)
}
}
// Resume push & pull to dataNodes that are present in data sync
s.dataNodeDirectory.UpdateDataNodeState(failedDataNode.ID, models.DataNodeStateUNHEALTHY)
s.dataNodeDirectory.UpdateDataNodeState(prevDataNode.ID, models.DataNodeStateAVAILABLE)
s.dataNodeDirectory.UpdateDataNodeState(afterDataNode.ID, models.DataNodeStateAVAILABLE)
s.logger.Info(fmt.Sprintf("Done datasync for datanode[%d]", failedDataNode.ID))
}

func (s *DataNodeDataSyncer) pushMessagesToDataNode(
ctx context.Context, client clients.DataNodeClient, messages []*datanode.QueueMessage) {
for _, message := range messages {
req := datanode.PushRequest{IsReplica: false, Message: message}
reqCtx, cancel := context.WithTimeout(ctx, time.Duration(3)*time.Second)
_, err := client.Push(reqCtx, &req)
if err != nil {
s.logger.Error(err)
}
cancel()
}
}
38 changes: 30 additions & 8 deletions leader/cmd/tasks/datanode_health_check.go
@@ -10,26 +10,48 @@ import (
"github.com/kysre/TurtleMQ/leader/internal/models"
)

func RunHealthChecks(dataNodeDirectory *models.DataNodeDirectory, healthCheckPeriod int) {
tickerPeriod := time.Duration(healthCheckPeriod) * time.Second
type DataNodeHealthChecker struct {
dataNodeDirectory *models.DataNodeDirectory
healthCheckPeriod int
dataNodeDataSyncer *DataNodeDataSyncer
}

func NewDataNodeHealthChecker(
dataNodeDirectory *models.DataNodeDirectory,
healthCheckPeriod int,
dataNodeDataSyncer *DataNodeDataSyncer,
) *DataNodeHealthChecker {
return &DataNodeHealthChecker{
dataNodeDirectory: dataNodeDirectory,
healthCheckPeriod: healthCheckPeriod,
dataNodeDataSyncer: dataNodeDataSyncer,
}
}

func (hc *DataNodeHealthChecker) RunHealthChecks() {
tickerPeriod := time.Duration(hc.healthCheckPeriod) * time.Second
ticker := time.NewTicker(tickerPeriod)
for {
select {
case <-ticker.C:
logrus.Info("Running DataNode health-check")
checkNodes(dataNodeDirectory)
hc.checkNodes()
}
}
}

func checkNodes(dataNodeDirectory *models.DataNodeDirectory) {
dataNodes := dataNodeDirectory.DataNodes
func (hc *DataNodeHealthChecker) checkNodes() {
dataNodes := hc.dataNodeDirectory.DataNodes
for i, node := range dataNodes {
if node.State == models.DataNodeStateUNHEALTHY {
if node.State != models.DataNodeStatePENDING {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
isHealthy := node.Client.IsHealthy(ctx)
if isHealthy {
dataNodeDirectory.UpdateDataNodeState(i, models.DataNodeStateAVAILABLE)
if node.State == models.DataNodeStateAVAILABLE && !isHealthy {
hc.dataNodeDirectory.UpdateDataNodeState(i, models.DataNodeStateUNHEALTHY)
logrus.Info(fmt.Sprintf("DataNode [%d] is not healthy!", i))
go hc.dataNodeDataSyncer.SyncData(node)
} else if node.State == models.DataNodeStateUNHEALTHY && isHealthy {
hc.dataNodeDirectory.UpdateDataNodeState(i, models.DataNodeStateAVAILABLE)
logrus.Info(fmt.Sprintf("DataNode [%d] became healthy!", i))
}
cancel()
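The transition rules the reworked health check applies per node are compact enough to restate as a standalone sketch (the nextState helper below is hypothetical and not part of this commit):

package main

import "fmt"

type state string

const (
	available state = "AVAILABLE"
	unhealthy state = "UNHEALTHY"
	pending   state = "PENDING"
)

// nextState mirrors checkNodes: a PENDING node (already part of an ongoing data sync) is
// skipped; an AVAILABLE node that fails its probe becomes UNHEALTHY and starts a data sync;
// an UNHEALTHY node that passes its probe becomes AVAILABLE again.
func nextState(current state, healthy bool) (next state, startSync bool) {
	switch {
	case current == pending:
		return current, false
	case current == available && !healthy:
		return unhealthy, true
	case current == unhealthy && healthy:
		return available, false
	default:
		return current, false
	}
}

func main() {
	fmt.Println(nextState(available, false)) // UNHEALTHY true
	fmt.Println(nextState(unhealthy, true))  // AVAILABLE false
	fmt.Println(nextState(pending, false))   // PENDING false
}
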
2 changes: 1 addition & 1 deletion leader/cmd/tasks/datanode_remaining_check.go
@@ -25,7 +25,7 @@ func RunRemainingCheck(dataNodeDirectory *models.DataNodeDirectory, remainingChe
func updateNodesRemaining(dataNodeDirectory *models.DataNodeDirectory) {
dataNodes := dataNodeDirectory.DataNodes
for _, node := range dataNodes {
if node.State != models.DataNodeStateUNHEALTHY {
if node.State == models.DataNodeStateAVAILABLE {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*1)
count, err := node.Client.GetRemainingMessagesCount(ctx)
if err != nil {
55 changes: 44 additions & 11 deletions leader/internal/app/loadbalancer/balancer.go
@@ -12,13 +12,16 @@ import (

"github.com/kysre/TurtleMQ/leader/internal/clients"
"github.com/kysre/TurtleMQ/leader/internal/models"
"github.com/kysre/TurtleMQ/leader/pkg/errors"
)

type Balancer interface {
AddDataNodeToHashCircle(datanode *models.DataNode) error
GetPushDataNodeAndReplicaClient(
ctx context.Context, token string) (clients.DataNodeClient, clients.DataNodeClient, error)
GetPullDataNodeClient(ctx context.Context) (clients.DataNodeClient, error)

GetPreviousAndAfterDataNodesForSync(addr string) (*models.DataNode, *models.DataNode, error)
}

type balancer struct {
@@ -50,40 +53,52 @@ func (b *balancer) GetPushDataNodeAndReplicaClient(
if err != nil {
return nil, nil, err
}
replicaIndex := b.getDataNodeReplicaIndex(index)
// Get datanode
dataNodeHash := b.datanodeHashSortedSlice[index]
dataNode, err := b.directory.GetDataNode(ctx, b.dataNodeHashMap[dataNodeHash])
if err != nil {
dataNode := b.directory.GetDataNode(b.dataNodeHashMap[dataNodeHash])
if dataNode.State == models.DataNodeStatePENDING {
return nil, nil, err
}
if dataNode.State == models.DataNodeStateUNHEALTHY {
index = (index + 1) % len(b.datanodeHashSortedSlice)
dataNodeHash = b.datanodeHashSortedSlice[index]
dataNode = b.directory.GetDataNode(b.dataNodeHashMap[dataNodeHash])
}
// Get datanode replica
replicaIndex := b.getDataNodeReplicaIndex(index)
dataNodeReplicaHash := b.datanodeHashSortedSlice[replicaIndex]
dataNodeReplica, err := b.directory.GetDataNode(ctx, b.dataNodeHashMap[dataNodeReplicaHash])
if err != nil {
dataNodeReplica := b.directory.GetDataNode(b.dataNodeHashMap[dataNodeReplicaHash])
if dataNodeReplica.State == models.DataNodeStatePENDING {
return nil, nil, err
}
if dataNodeReplica.State == models.DataNodeStateUNHEALTHY {
replicaIndex = (replicaIndex + 1) % len(b.datanodeHashSortedSlice)
dataNodeReplicaHash = b.datanodeHashSortedSlice[replicaIndex]
dataNodeReplica = b.directory.GetDataNode(b.dataNodeHashMap[dataNodeReplicaHash])
}
return dataNode.Client, dataNodeReplica.Client, nil
}

// TODO: Add randomness to loadbalancer for pull

func (b *balancer) GetPullDataNodeClient(ctx context.Context) (clients.DataNodeClient, error) {
maxRemainingMsgHash := ""
maxRemainingMsgCount := 0
for i := 0; i < 2*len(b.datanodeHashSortedSlice); i++ {
dnHash := b.datanodeHashSortedSlice[i%len(b.datanodeHashSortedSlice)]
dn, _ := b.directory.GetDataNode(ctx, b.dataNodeHashMap[dnHash])
if dn == nil {
dn := b.directory.GetDataNode(b.dataNodeHashMap[dnHash])
if dn.State != models.DataNodeStateAVAILABLE {
continue
}
if dn.RemainingMsgCount > maxRemainingMsgCount {
maxRemainingMsgHash = dnHash
maxRemainingMsgCount = dn.RemainingMsgCount
}
}
dn, err := b.directory.GetDataNode(ctx, b.dataNodeHashMap[maxRemainingMsgHash])
if err != nil {
return nil, err
if maxRemainingMsgHash == "" {
return nil, errors.New("No AVAILABLE datanode!")
}
dn := b.directory.GetDataNode(b.dataNodeHashMap[maxRemainingMsgHash])
return dn.Client, nil
}

@@ -145,7 +160,25 @@ func (b *balancer) getLessOrEqualIndexInHashCircle(hash string) (int, error) {
return index, nil
}

// Hash Ring implementation
// Hash Ring implementation (Datanode[i]'s data is replicated in Datanode[i+1])
func (b *balancer) getDataNodeReplicaIndex(i int) int {
return (i + 1) % len(b.datanodeHashSortedSlice)
}

func (b *balancer) GetPreviousAndAfterDataNodesForSync(addr string) (*models.DataNode, *models.DataNode, error) {
datanodeHash, err := b.getHash(addr)
if err != nil {
return nil, nil, err
}
index, err := b.getLessOrEqualIndexInHashCircle(datanodeHash)
if err != nil {
return nil, nil, err
}
prevIndex := (index - 1 + len(b.datanodeHashSortedSlice)) % len(b.datanodeHashSortedSlice)
prevDataNodeHash := b.datanodeHashSortedSlice[prevIndex]
afterIndex := (index + 1) % len(b.datanodeHashSortedSlice)
afterDataNodeHash := b.datanodeHashSortedSlice[afterIndex]
prevDataNode := b.directory.GetDataNode(b.dataNodeHashMap[prevDataNodeHash])
afterDataNode := b.directory.GetDataNode(b.dataNodeHashMap[afterDataNodeHash])
return prevDataNode, afterDataNode, nil
}
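
The neighbour lookup above is plain modular ring arithmetic: step one slot back and one slot forward in the sorted hash slice, wrapping at both ends. A tiny standalone sketch (hypothetical neighbours helper, four-slot ring) of the same index math:

package main

import "fmt"

// neighbours returns the ring positions immediately before and after slot i
// in a ring of n slots, using the same modular arithmetic as the balancer.
func neighbours(i, n int) (prev, next int) {
	prev = (i - 1 + n) % n
	next = (i + 1) % n
	return prev, next
}

func main() {
	for i := 0; i < 4; i++ {
		p, nx := neighbours(i, 4)
		fmt.Printf("slot %d: prev=%d next=%d\n", i, p, nx)
	}
	// slot 0 wraps around: prev=3 next=1; slot 3 wraps: prev=2 next=0
}
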
21 changes: 2 additions & 19 deletions leader/internal/models/datanode.go
@@ -1,11 +1,9 @@
package models

import (
"context"
"sync"

"github.com/kysre/TurtleMQ/leader/internal/clients"
"github.com/kysre/TurtleMQ/leader/pkg/errors"
)

type DataNodeState string
@@ -44,24 +42,9 @@ func (d *DataNodeDirectory) AddDataNode(dataNode *DataNode) error {
return nil
}

func (d *DataNodeDirectory) GetDataNode(ctx context.Context, index int) (*DataNode, error) {
func (d *DataNodeDirectory) GetDataNode(index int) *DataNode {
dataNode := d.DataNodes[index]
if dataNode.State != DataNodeStateAVAILABLE {
return nil, errors.New("PENDING")
}
healthy := dataNode.Client.IsHealthy(ctx)
if !healthy {
d.MX.Lock()
dataNode.State = DataNodeStateUNHEALTHY
d.MX.Unlock()
}
if dataNode.State == DataNodeStatePENDING {
return nil, errors.New("PENDING")
}
if dataNode.State == DataNodeStateUNHEALTHY {
return nil, errors.New("PENDING")
}
return dataNode, nil
return dataNode
}

func (d *DataNodeDirectory) UpdateDataNodeState(index int, state DataNodeState) {
