Implement data sync between datanodes (#8)
* Implement hash ring in load balancer (sketched below, after the file summary)

* Replicate messages in datanode's replica

* Add more error logs

* Add datanode partition count to leader configs

* Add data sync rpcs to datanode client

* Implement datanode data syncer

* Increase max workers of threadpool

* Fix env vars of leader

* Remove datanode from hash circle after failing
kysre authored Feb 10, 2024
1 parent 3425e85 commit 29e2cd9
Showing 12 changed files with 331 additions and 57 deletions.
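
The load-balancer changes referenced in the first bullet are not among the hunks loaded below. As rough orientation, here is a minimal, self-contained sketch of the consistent-hash placement this commit describes: datanode addresses are hashed onto a circle, a key is served by the first node clockwise from its hash, and the next node on the circle holds the replica (matching the datanode[i]/datanode[i+1] comment in datanode_data_sync.go). The type and function names, the FNV hash, and the one-point-per-node layout are illustrative assumptions, not the actual code in leader/internal/app/loadbalancer.

package main

import (
	"fmt"
	"hash/fnv"
	"sort"
)

// hashRing is an illustrative stand-in for the balancer's hash circle.
type hashRing struct {
	points []uint32          // sorted positions on the circle
	nodes  map[uint32]string // position -> datanode address
}

func hashOf(s string) uint32 {
	h := fnv.New32a()
	h.Write([]byte(s))
	return h.Sum32()
}

func (r *hashRing) add(addr string) {
	if r.nodes == nil {
		r.nodes = make(map[uint32]string)
	}
	p := hashOf(addr)
	r.nodes[p] = addr
	r.points = append(r.points, p)
	sort.Slice(r.points, func(i, j int) bool { return r.points[i] < r.points[j] })
}

func (r *hashRing) remove(addr string) {
	p := hashOf(addr)
	delete(r.nodes, p)
	for i, v := range r.points {
		if v == p {
			r.points = append(r.points[:i], r.points[i+1:]...)
			break
		}
	}
}

// locate returns the datanode that owns key and the next node on the circle,
// which holds the key's replica.
func (r *hashRing) locate(key string) (primary, replica string) {
	if len(r.points) == 0 {
		return "", ""
	}
	k := hashOf(key)
	i := sort.Search(len(r.points), func(j int) bool { return r.points[j] >= k })
	if i == len(r.points) {
		i = 0 // wrap around the circle
	}
	next := (i + 1) % len(r.points)
	return r.nodes[r.points[i]], r.nodes[r.points[next]]
}

func main() {
	ring := &hashRing{}
	for _, addr := range []string{"datanode-0:8000", "datanode-1:8000", "datanode-2:8000"} {
		ring.add(addr)
	}
	p, rep := ring.locate("some-queue-key")
	fmt.Println("primary:", p, "replica:", rep)
	ring.remove("datanode-1:8000") // e.g. after the health checker marks it failed
	p, rep = ring.locate("some-queue-key")
	fmt.Println("after removal -> primary:", p, "replica:", rep)
}
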
2 changes: 1 addition & 1 deletion datanode/src/datanode_server.py
@@ -120,7 +120,7 @@ def serve():

datanode_name = ConfigManager.get_prop('datanode_name')

- server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
+ server = grpc.server(futures.ThreadPoolExecutor(max_workers=50))
datanode_pb2_grpc.add_DataNodeServicer_to_server(DataNode(partitions_count, home_path), server)

server.add_insecure_port('[::]:' + port)
4 changes: 4 additions & 0 deletions leader/cmd/config.go
@@ -28,6 +28,8 @@ type LeaderConfig struct {

DataNodeStateCheckPeriod int
DataNodeRemainingCheckPeriod int
DataNodePartitionCount int
DataNodeSyncTimeout int
}

type MetricConfig struct {
@@ -50,6 +52,8 @@ func LoadConfig(cmd *cobra.Command) (*Config, error) {
viper.SetDefault("leader.ReplicaHost", "localhost")
viper.SetDefault("leader.DataNodeStateCheckPeriod", 30)
viper.SetDefault("leader.DataNodeRemainingCheckPeriod", 2)
viper.SetDefault("leader.DataNodePartitionCount", 4)
viper.SetDefault("leader.DataNodeSyncTimeout", 300)

viper.SetDefault("metric.ListenPort", 9000)

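
The two new leader settings above default to 4 partitions per datanode and a 300-second sync timeout. A small sketch of how they could be overridden from the environment, assuming the leader binds environment variables through viper's AutomaticEnv with a dot-to-underscore key replacer (the actual binding in LoadConfig is not shown in this hunk):

package main

import (
	"fmt"
	"strings"

	"github.com/spf13/viper"
)

func main() {
	// Defaults mirroring the diff above.
	viper.SetDefault("leader.DataNodePartitionCount", 4)
	viper.SetDefault("leader.DataNodeSyncTimeout", 300) // seconds

	// Assumed binding: with this setup, LEADER_DATANODESYNCTIMEOUT=600 in the
	// environment would override the 300-second default.
	viper.SetEnvKeyReplacer(strings.NewReplacer(".", "_"))
	viper.AutomaticEnv()

	fmt.Println("partition count:", viper.GetInt("leader.DataNodePartitionCount"))
	fmt.Println("sync timeout (s):", viper.GetInt("leader.DataNodeSyncTimeout"))
}
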
17 changes: 13 additions & 4 deletions leader/cmd/serve.go
@@ -46,8 +46,9 @@ func serve(cmd *cobra.Command, args []string) {

// Get shared resources
logger := getLoggerOrPanic(config)
- directory := getDataNodeDirectoryOrPanic(config)
+ directory := getDataNodeDirectoryOrPanic()
balancer := getLoadBalancerOrPanic(logger, directory)
runDataNodesTasks(config, logger, directory, balancer)

// Get grpc server cores
queueCore := getQueueCore(logger, directory, balancer)
@@ -88,20 +89,28 @@ func getLoggerOrPanic(conf *Config) *logrus.Logger {
return logger
}

- func getDataNodeDirectoryOrPanic(conf *Config) *models.DataNodeDirectory {
+ func getDataNodeDirectoryOrPanic() *models.DataNodeDirectory {
directory := models.NewDataNodeDirectory()
if directory == nil {
panic("DataNodeDirectory is nil")
}
- go tasks.RunHealthChecks(directory, conf.Leader.DataNodeStateCheckPeriod)
- go tasks.RunRemainingCheck(directory, conf.Leader.DataNodeRemainingCheckPeriod)
return directory
}

func getLoadBalancerOrPanic(log *logrus.Logger, directory *models.DataNodeDirectory) loadbalancer.Balancer {
return loadbalancer.NewBalancer(log, directory)
}

func runDataNodesTasks(
conf *Config, log *logrus.Logger, directory *models.DataNodeDirectory, balancer loadbalancer.Balancer,
) {
syncer := tasks.NewDataNodeSyncer(
log, directory, balancer, conf.Leader.DataNodePartitionCount, conf.Leader.DataNodeSyncTimeout)
healthChecker := tasks.NewDataNodeHealthChecker(directory, conf.Leader.DataNodeStateCheckPeriod, syncer)
go healthChecker.RunHealthChecks()
go tasks.RunRemainingCheck(directory, conf.Leader.DataNodeRemainingCheckPeriod)
}

func getQueueCore(
log *logrus.Logger, directory *models.DataNodeDirectory, balancer loadbalancer.Balancer,
) queue.QueueServer {
116 changes: 116 additions & 0 deletions leader/cmd/tasks/datanode_data_sync.go
@@ -0,0 +1,116 @@
package tasks

import (
"context"
"fmt"
"os"
"time"

"github.com/sirupsen/logrus"

"github.com/kysre/TurtleMQ/leader/internal/app/loadbalancer"
"github.com/kysre/TurtleMQ/leader/internal/clients"
"github.com/kysre/TurtleMQ/leader/internal/models"
"github.com/kysre/TurtleMQ/leader/pkg/datanode"
)

type DataNodeDataSyncer struct {
logger *logrus.Logger
dataNodeDirectory *models.DataNodeDirectory
balancer loadbalancer.Balancer
partitionCount int
dataSyncTimeout int
}

func NewDataNodeSyncer(
logger *logrus.Logger,
dataNodeDirectory *models.DataNodeDirectory,
balancer loadbalancer.Balancer,
partitionCount int,
dataSyncTimeout int,
) *DataNodeDataSyncer {
return &DataNodeDataSyncer{
logger: logger,
dataNodeDirectory: dataNodeDirectory,
balancer: balancer,
partitionCount: partitionCount,
dataSyncTimeout: dataSyncTimeout,
}
}

// Data of datanode[i] is replicated in datanode[i+1]
// After datanode[i] fails:
// - Remove datanode[i] from load balancer
// - Replica data of datanode[i+1] should be read -> push to datanode[i+1] data
// - Replica data of datanode[i+1] should be purged
// - Data of datanode[i-1] should be read -> write to datanode[i+1] replica data

func (s *DataNodeDataSyncer) SyncData(failedDataNode *models.DataNode) {
s.logger.Info(fmt.Sprintf("Start datasync for datanode[%d]", failedDataNode.ID))
// Create context for requests
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(s.dataSyncTimeout)*time.Second)
defer cancel()
// Get dataNodes that are present in data sync
prevDataNode, afterDataNode, err := s.balancer.GetPreviousAndAfterDataNodesForSync(failedDataNode.Address)
if err != nil {
s.logger.Error(err)
return
}
// Halt push & pull to dataNodes that are present in data sync
s.dataNodeDirectory.UpdateDataNodeState(failedDataNode.ID, models.DataNodeStatePENDING)
s.dataNodeDirectory.UpdateDataNodeState(prevDataNode.ID, models.DataNodeStatePENDING)
s.dataNodeDirectory.UpdateDataNodeState(afterDataNode.ID, models.DataNodeStatePENDING)
// Step 1
err = s.balancer.RemoveDataNodeFromHashCircle(failedDataNode.Address)
if err != nil {
logrus.Error(err)
os.Exit(-1)
}
// Step 2
for i := 0; i < s.partitionCount; i++ {
res, err := afterDataNode.Client.ReadPartition(
ctx, &datanode.ReadPartitionRequest{PartitionIndex: int32(i), IsReplica: true})
if err != nil {
s.logger.Error(err)
continue
}
go s.pushMessagesToDataNode(ctx, afterDataNode.Client, res.PartitionMessages)
}
// Step 3
err = afterDataNode.Client.PurgeReplicaData(ctx)
if err != nil {
s.logger.Error(err)
}
// Step 4
for i := 0; i < s.partitionCount; i++ {
res, err := prevDataNode.Client.ReadPartition(
ctx, &datanode.ReadPartitionRequest{PartitionIndex: int32(i), IsReplica: false})
if err != nil {
s.logger.Error(err)
continue
}
req := datanode.WritePartitionRequest{PartitionIndex: int32(i), IsReplica: true}
req.PartitionMessages = append(req.PartitionMessages, res.PartitionMessages...)
err = afterDataNode.Client.WritePartition(ctx, &req)
if err != nil {
s.logger.Error(err)
}
}
// Resume push & pull to dataNodes that are present in data sync
s.dataNodeDirectory.UpdateDataNodeState(failedDataNode.ID, models.DataNodeStateUNHEALTHY)
s.dataNodeDirectory.UpdateDataNodeState(prevDataNode.ID, models.DataNodeStateAVAILABLE)
s.dataNodeDirectory.UpdateDataNodeState(afterDataNode.ID, models.DataNodeStateAVAILABLE)
s.logger.Info(fmt.Sprintf("Done datasync for datanode[%d]", failedDataNode.ID))
}

func (s *DataNodeDataSyncer) pushMessagesToDataNode(
ctx context.Context, client clients.DataNodeClient, messages []*datanode.QueueMessage) {
for _, message := range messages {
req := datanode.PushRequest{IsReplica: false, Message: message}
reqCtx, cancel := context.WithTimeout(ctx, time.Duration(3)*time.Second)
_, err := client.Push(reqCtx, &req)
if err != nil {
s.logger.Error(err)
}
cancel()
}
}
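
The comment block at the top of this file compresses the recovery protocol into four steps. A worked example on a three-node ring may make the prev/after roles easier to follow; the node names and the modular index arithmetic below are illustrative only, since the real neighbours come from the balancer's hash circle via GetPreviousAndAfterDataNodesForSync.

package main

import "fmt"

func main() {
	nodes := []string{"datanode-0", "datanode-1", "datanode-2"}
	failed := 1 // suppose datanode-1 stops answering health checks

	prev := (failed - 1 + len(nodes)) % len(nodes) // datanode-0
	after := (failed + 1) % len(nodes)             // datanode-2

	fmt.Printf("failed=%s prev=%s after=%s\n", nodes[failed], nodes[prev], nodes[after])

	// Step 1: remove datanode-1 from the hash circle, so new keys skip it.
	// Step 2: datanode-2's replica partitions hold datanode-1's data; read them
	//         and push the messages into datanode-2's own (primary) partitions.
	// Step 3: purge datanode-2's replica partitions, now that they are promoted.
	// Step 4: read datanode-0's primary partitions and write them into
	//         datanode-2's replica, so datanode-0 is replicated again on the
	//         shrunken ring.
}
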
38 changes: 30 additions & 8 deletions leader/cmd/tasks/datanode_health_check.go
@@ -10,26 +10,48 @@ import (
"github.com/kysre/TurtleMQ/leader/internal/models"
)

- func RunHealthChecks(dataNodeDirectory *models.DataNodeDirectory, healthCheckPeriod int) {
- tickerPeriod := time.Duration(healthCheckPeriod) * time.Second
type DataNodeHealthChecker struct {
dataNodeDirectory *models.DataNodeDirectory
healthCheckPeriod int
dataNodeDataSyncer *DataNodeDataSyncer
}

func NewDataNodeHealthChecker(
dataNodeDirectory *models.DataNodeDirectory,
healthCheckPeriod int,
dataNodeDataSyncer *DataNodeDataSyncer,
) *DataNodeHealthChecker {
return &DataNodeHealthChecker{
dataNodeDirectory: dataNodeDirectory,
healthCheckPeriod: healthCheckPeriod,
dataNodeDataSyncer: dataNodeDataSyncer,
}
}

func (hc *DataNodeHealthChecker) RunHealthChecks() {
tickerPeriod := time.Duration(hc.healthCheckPeriod) * time.Second
ticker := time.NewTicker(tickerPeriod)
for {
select {
case <-ticker.C:
logrus.Info("Running DataNode health-check")
- checkNodes(dataNodeDirectory)
+ hc.checkNodes()
}
}
}

- func checkNodes(dataNodeDirectory *models.DataNodeDirectory) {
- dataNodes := dataNodeDirectory.DataNodes
+ func (hc *DataNodeHealthChecker) checkNodes() {
+ dataNodes := hc.dataNodeDirectory.DataNodes
for i, node := range dataNodes {
- if node.State == models.DataNodeStateUNHEALTHY {
+ if node.State != models.DataNodeStatePENDING {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
isHealthy := node.Client.IsHealthy(ctx)
- if isHealthy {
- dataNodeDirectory.UpdateDataNodeState(i, models.DataNodeStateAVAILABLE)
if node.State == models.DataNodeStateAVAILABLE && !isHealthy {
hc.dataNodeDirectory.UpdateDataNodeState(i, models.DataNodeStateUNHEALTHY)
logrus.Info(fmt.Sprintf("DataNode [%d] is not healthy!", i))
go hc.dataNodeDataSyncer.SyncData(node)
} else if node.State == models.DataNodeStateUNHEALTHY && isHealthy {
hc.dataNodeDirectory.UpdateDataNodeState(i, models.DataNodeStateAVAILABLE)
logrus.Info(fmt.Sprintf("DataNode [%d] became healthy!", i))
}
cancel()
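
For readers skimming the hunk above, here is a compact restatement of the state machine it implements. The state names mirror models.DataNodeState*; the string type and the helper function are illustrative only, reconstructed from this diff rather than taken from the repository.

package main

import "fmt"

type state string

const (
	available state = "AVAILABLE" // serves push/pull; checked for health and remaining count
	unhealthy state = "UNHEALTHY" // failed a health check; a data sync has been started
	pending   state = "PENDING"   // participating in a data sync; skipped by both periodic checks
)

// next models one health-check tick for a node in the given state.
func next(s state, healthy bool) state {
	switch {
	case s == available && !healthy:
		return unhealthy // the checker also launches go syncer.SyncData(node)
	case s == unhealthy && healthy:
		return available
	default:
		return s // PENDING nodes are left alone until the syncer restores them
	}
}

func main() {
	fmt.Println(next(available, false)) // UNHEALTHY
	fmt.Println(next(unhealthy, true))  // AVAILABLE
	fmt.Println(next(pending, true))    // PENDING
}
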
2 changes: 1 addition & 1 deletion leader/cmd/tasks/datanode_remaining_check.go
@@ -25,7 +25,7 @@ func RunRemainingCheck(dataNodeDirectory *models.DataNodeDirectory, remainingChe
func updateNodesRemaining(dataNodeDirectory *models.DataNodeDirectory) {
dataNodes := dataNodeDirectory.DataNodes
for _, node := range dataNodes {
- if node.State != models.DataNodeStateUNHEALTHY {
+ if node.State == models.DataNodeStateAVAILABLE {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*1)
count, err := node.Client.GetRemainingMessagesCount(ctx)
if err != nil {
3 changes: 3 additions & 0 deletions leader/internal/app/core/leader_core.go
@@ -39,6 +39,7 @@ func (lc *leaderCore) AddDataNode(ctx context.Context, request *leader.AddDataNo
address := request.GetAddress()
client, err := clients.NewDataNodeClient(address)
if err != nil {
lc.logger.Error(err)
return nil, err
}
dataNode := models.DataNode{
@@ -50,10 +51,12 @@ func (lc *leaderCore) AddDataNode(ctx context.Context, request *leader.AddDataNo
}
err = lc.directory.AddDataNode(&dataNode)
if err != nil {
lc.logger.Error(err)
return nil, err
}
err = lc.balancer.AddDataNodeToHashCircle(&dataNode)
if err != nil {
lc.logger.Error(err)
return nil, err
}
lc.logger.Info(fmt.Sprintf("Added DataNode %v", dataNode))
55 changes: 47 additions & 8 deletions leader/internal/app/core/queue_core.go
@@ -21,7 +21,9 @@ type queueCore struct {
balancer loadbalancer.Balancer
}

- func NewQueueCore(logger *logrus.Logger, directory *models.DataNodeDirectory, balancer loadbalancer.Balancer) queue.QueueServer {
+ func NewQueueCore(
+ logger *logrus.Logger, directory *models.DataNodeDirectory, balancer loadbalancer.Balancer,
+ ) queue.QueueServer {
return &queueCore{
logger: logger,
directory: directory,
@@ -37,16 +39,32 @@ func (c *queueCore) Push(
c.collectRpsMetrics("Push")
defer c.collectLatencyMetrics("Push", startTime)

// Get datanode & it's replica client
key := request.GetKey()
- client, err := c.balancer.GetPushDataNodeClient(ctx, key)
- c.logger.Info(fmt.Sprintf("Push key: %s to DataNode %v", key, client))
+ dataNodeClient, dataNodeReplicaClient, err := c.balancer.GetPushDataNodeAndReplicaClient(ctx, key)
if err != nil {
c.logger.Error(err)
return nil, err
}
c.logger.Info(fmt.Sprintf("Push key: %s to DataNode %v", key, dataNodeClient))
c.logger.Info(fmt.Sprintf("Push key: %s to DataNodeReplica %v", key, dataNodeReplicaClient))
// Create grpc requests
messagePb := datanode.QueueMessage{Key: key}
messagePb.Value = append(messagePb.Value, request.GetValue()...)
- dataNodeReq := datanode.PushRequest{Message: &messagePb}
- return client.Push(ctx, &dataNodeReq)
+ dataNodeReq := datanode.PushRequest{Message: &messagePb, IsReplica: false}
+ dataNodeReplicaReq := datanode.PushRequest{Message: &messagePb, IsReplica: true}
// Send grpc requests
_, err = dataNodeClient.Push(ctx, &dataNodeReq)
if err != nil {
c.logger.Error(err)
return nil, err
}
_, err = dataNodeReplicaClient.Push(ctx, &dataNodeReplicaReq)
if err != nil {
c.logger.Error(err)
return nil, err
}
return &emptypb.Empty{}, nil
}

func (c *queueCore) Pull(
@@ -58,15 +76,20 @@ func (c *queueCore) Pull(
defer c.collectLatencyMetrics("Pull", startTime)

c.logger.Info("Received Pull request")
// Get datanode
client, err := c.balancer.GetPullDataNodeClient(ctx)
if err != nil {
c.logger.Error(err)
return nil, err
}
c.logger.Info(fmt.Sprintf("Pull req datanode client: %v", client))
// Get pull response
dataNodeRes, err := client.Pull(ctx, request)
if err != nil {
c.logger.Error(err)
return nil, err
}
// Return response in correct format
message := dataNodeRes.GetMessage()
c.logger.Info(fmt.Sprintf("Pull response with key: %s", message.GetKey()))
response := queue.PullResponse{Key: message.GetKey()}
@@ -84,12 +107,28 @@ func (c *queueCore) AcknowledgePull(

key := request.GetKey()
c.logger.Info(fmt.Sprintf("Received Ack Pull key=%s", key))
- client, err := c.balancer.GetPushDataNodeClient(ctx, key)
// Get datanode & it's replica client
+ dataNodeClient, dataNodeReplicaClient, err := c.balancer.GetPushDataNodeAndReplicaClient(ctx, key)
if err != nil {
c.logger.Error(err)
return nil, err
}
c.logger.Info(fmt.Sprintf("Ack key: %s to DataNode %v", key, dataNodeClient))
c.logger.Info(fmt.Sprintf("Ack key: %s to DataNodeReplica %v", key, dataNodeReplicaClient))
// Create requests & send Ack to datanode & it's replica
dataNodeReq := datanode.AcknowledgePullRequest{Key: key, IsReplica: false}
dataNodeReplicaReq := datanode.AcknowledgePullRequest{Key: key, IsReplica: true}
_, err = dataNodeClient.AcknowledgePull(ctx, &dataNodeReq)
if err != nil {
c.logger.Error(err)
return nil, err
}
_, err = dataNodeReplicaClient.AcknowledgePull(ctx, &dataNodeReplicaReq)
if err != nil {
c.logger.Error(err)
return nil, err
}
- dataNodeReq := datanode.AcknowledgePullRequest{Key: key}
- return client.AcknowledgePull(ctx, &dataNodeReq)
+ return &emptypb.Empty{}, nil
}

func (c *queueCore) collectLatencyMetrics(methodName string, startTime time.Time) {