Skip to content

Commit

Permalink
Add replica for leader (#33)
Browse files Browse the repository at this point in the history
* Add leader replica host to config

* Add leader client

* Add leader consensus handler

* Handle not adding duplicate datanodes

* Implement leader sync task

* Add leader replica support to client_go

* Add leader replica support to client_py

* Increase leader health check timeout

* Fix debug level logs log-level
  • Loading branch information
kysre authored Feb 10, 2024
1 parent d3c7179 commit 7cbc882
Show file tree
Hide file tree
Showing 14 changed files with 270 additions and 36 deletions.
30 changes: 22 additions & 8 deletions client_go/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"github.com/kysre/TurtleMQ/client_go/queue"
)

const HOST = "localhost"

type SubscribeFunction func(key string, value []byte)

type QueueClient interface {
Expand All @@ -21,19 +23,29 @@ type QueueClient interface {
}

func GetQueueClient() QueueClient {
leaderAddr := "localhost:8000"
leaderAddr := fmt.Sprintf("%s:8000", HOST)
leaderReplicaAddr := fmt.Sprintf("%s:8001", HOST)

conn, err := grpc.Dial(leaderAddr, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
panic("Can't connect to TurtleMQ cluster")
}
client := queue.NewQueueClient(conn)
repConn, err := grpc.Dial(leaderReplicaAddr, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
panic("Can't connect to TurtleMQ cluster")
}
replicaClient := queue.NewQueueClient(repConn)

return &queueClient{
client: client,
client: client,
replicaClient: replicaClient,
}
}

type queueClient struct {
client queue.QueueClient
client queue.QueueClient
replicaClient queue.QueueClient
}

func (c *queueClient) Push(key string, value []byte) {
Expand All @@ -42,16 +54,18 @@ func (c *queueClient) Push(key string, value []byte) {
req.Value = append(req.Value, value...)
_, err := c.client.Push(ctx, &req)
if err != nil {
fmt.Print(err)
_, _ = c.replicaClient.Push(ctx, &req)
}
}

func (c *queueClient) Pull() (string, []byte) {
ctx := context.Background()
res, err := c.client.Pull(ctx, &emptypb.Empty{})
if err != nil {
fmt.Print(err)
return "", nil
res, err = c.replicaClient.Pull(ctx, &emptypb.Empty{})
if err != nil {
return "", nil
}
}
defer c.acknowledgePull(ctx, res.GetKey())
return res.GetKey(), res.GetValue()
Expand All @@ -65,7 +79,7 @@ func (c *queueClient) acknowledgePull(ctx context.Context, key string) {
req := queue.AcknowledgePullRequest{Key: key}
_, err := c.client.AcknowledgePull(ctx, &req)
if err != nil {
fmt.Print(err)
_, _ = c.replicaClient.AcknowledgePull(ctx, &req)
}
}

Expand All @@ -76,7 +90,7 @@ func (c *queueClient) runSubscribe(function SubscribeFunction) {
select {
case <-ticker.C:
key, value := c.Pull()
if value == nil {
if key == "" {
continue
}
function(key, value)
Expand Down
37 changes: 29 additions & 8 deletions client_py/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import logging
from threading import Thread
import grpc
# import google.protobuf.empty_pb2 as empty_pb2
from google.protobuf import empty_pb2 as _empty_pb2
from concurrent.futures import ThreadPoolExecutor
from client_py import queue_pb2_grpc
Expand All @@ -13,7 +12,9 @@

class QueueClient:
stub = None
HOST, PORT = "localhost", "8000"
replica_stub = None
HOST = "localhost"
PORT, REPLICA_PORT = "8000", "8001"
SUBSCRIBE_WORKERS = 3
SUBSCRIBE_SLEEP_TIMEOUT = 2

Expand All @@ -24,14 +25,24 @@ def get_stub(cls, host: str, port: str):
cls.stub = queue_pb2_grpc.QueueStub(channel)
return cls.stub

@classmethod
def get_replica_stub(cls, host: str, port: str):
if cls.replica_stub is None:
channel = grpc.insecure_channel(f"{host}:{port}")
cls.replica_stub = queue_pb2_grpc.QueueStub(channel)
return cls.replica_stub

def push(self, key: str, value: bytes):
try:
stub = self.get_stub(self.HOST, self.PORT)

stub.Push(queue_pb2.PushRequest(key=key, value=value))

except grpc.RpcError as e:
print(f"Error in pushing: {e}.")
try:
stub = self.get_replica_stub(self.HOST, self.REPLICA_PORT)
stub.Push(queue_pb2.PushRequest(key=key, value=value))
except grpc.RpcError as e:
pass

def pull(self) -> (str, bytes):
try:
Expand All @@ -40,16 +51,26 @@ def pull(self) -> (str, bytes):
self.ack(response.key)
return response.key, response.value
except grpc.RpcError as e:
print(f"Error in pulling: {e}.")
try:
stub = self.get_replica_stub(self.HOST, self.REPLICA_PORT)
response = stub.Pull(_empty_pb2.Empty())
self.ack(response.key)
return response.key, response.value
except grpc.RpcError as e:
return '', None

def ack(self, acknowledgement: str):
try:
stub = self.get_stub(self.HOST, self.PORT)
stub.AcknowledgePull(queue_pb2.AcknowledgePullRequest(key=acknowledgement))
return None
except grpc.RpcError as e:
print(f"Error in acknowledgement: {e}")
return False
try:
stub = self.get_replica_stub(self.HOST, self.REPLICA_PORT)
stub.AcknowledgePull(queue_pb2.AcknowledgePullRequest(key=acknowledgement))
return None
except grpc.RpcError as e:
return None

def subscribe(self, f):
thread = Thread(target=self.run_subscribe, args=(f,))
Expand All @@ -62,7 +83,7 @@ def run_subscribe(self, f):
with ThreadPoolExecutor(max_workers=QueueClient.SUBSCRIBE_WORKERS) as executer:
for _ in range(QueueClient.SUBSCRIBE_WORKERS):
pull_response = self.pull()
if pull_response is not None and pull_response is not False:
if pull_response is not None and pull_response is not False and pull_response[0] != '':
futures.append(executer.submit(f, pull_response[0], pull_response[1]))
time.sleep(QueueClient.SUBSCRIBE_SLEEP_TIMEOUT)

Expand Down
8 changes: 4 additions & 4 deletions leader/cmd/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ type QueueConfig struct {
}

type LeaderConfig struct {
ListenPort int
ReplicaHost string
ReplicaHost string
LeaderSyncPeriod int

DataNodeStateCheckPeriod int
DataNodeRemainingCheckPeriod int
Expand All @@ -48,8 +48,8 @@ func LoadConfig(cmd *cobra.Command) (*Config, error) {

viper.SetDefault("queue.listenPort", 8888)

viper.SetDefault("leader.ListenPort", 8080) // Not used as of now
viper.SetDefault("leader.ReplicaHost", "localhost")
viper.SetDefault("leader.ReplicaHost", "leader_1")
viper.SetDefault("leader.LeaderSyncPeriod", 10)
viper.SetDefault("leader.DataNodeStateCheckPeriod", 30)
viper.SetDefault("leader.DataNodeRemainingCheckPeriod", 2)
viper.SetDefault("leader.DataNodePartitionCount", 4)
Expand Down
24 changes: 20 additions & 4 deletions leader/cmd/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@ func serve(cmd *cobra.Command, args []string) {
logger := getLoggerOrPanic(config)
directory := getDataNodeDirectoryOrPanic()
balancer := getLoadBalancerOrPanic(logger, directory)
runDataNodesTasks(config, logger, directory, balancer)
consensusHandler := getConsensusOrPanic(config)

runTasks(config, logger, directory, balancer, consensusHandler)

// Get grpc server cores
queueCore := getQueueCore(logger, directory, balancer)
Expand Down Expand Up @@ -101,13 +103,27 @@ func getLoadBalancerOrPanic(log *logrus.Logger, directory *models.DataNodeDirect
return loadbalancer.NewBalancer(log, directory)
}

func runDataNodesTasks(
conf *Config, log *logrus.Logger, directory *models.DataNodeDirectory, balancer loadbalancer.Balancer,
func getConsensusOrPanic(conf *Config) *models.LeaderConsensusHandler {
return models.NewLeaderConsensusHandler(conf.Leader.ReplicaHost)
}

func runTasks(
conf *Config,
log *logrus.Logger,
directory *models.DataNodeDirectory,
balancer loadbalancer.Balancer,
consensusHandler *models.LeaderConsensusHandler,
) {
// Init resources
syncer := tasks.NewDataNodeSyncer(
log, directory, balancer, conf.Leader.DataNodePartitionCount, conf.Leader.DataNodeSyncTimeout)
log, directory, balancer, conf.Leader.DataNodePartitionCount,
conf.Leader.DataNodeSyncTimeout, consensusHandler.AmIMaster(),
)
healthChecker := tasks.NewDataNodeHealthChecker(directory, conf.Leader.DataNodeStateCheckPeriod, syncer)
leaderSyncer := tasks.NewLeaderSyncer(log, consensusHandler, directory, conf.Leader.LeaderSyncPeriod)
// Run tasks
go healthChecker.RunHealthChecks()
go leaderSyncer.RunLeaderSync()
go tasks.RunRemainingCheck(directory, conf.Leader.DataNodeRemainingCheckPeriod)
}

Expand Down
11 changes: 9 additions & 2 deletions leader/cmd/tasks/datanode_data_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ type DataNodeDataSyncer struct {
balancer loadbalancer.Balancer
partitionCount int
dataSyncTimeout int
shouldSync bool
}

func NewDataNodeSyncer(
Expand All @@ -28,13 +29,15 @@ func NewDataNodeSyncer(
balancer loadbalancer.Balancer,
partitionCount int,
dataSyncTimeout int,
shouldSync bool,
) *DataNodeDataSyncer {
return &DataNodeDataSyncer{
logger: logger,
dataNodeDirectory: dataNodeDirectory,
balancer: balancer,
partitionCount: partitionCount,
dataSyncTimeout: dataSyncTimeout,
shouldSync: shouldSync,
}
}

Expand All @@ -46,7 +49,11 @@ func NewDataNodeSyncer(
// - Data of datanode[i-1] should be read -> write to datanode[i+1] replica data

func (s *DataNodeDataSyncer) SyncData(failedDataNode *models.DataNode) {
s.logger.Info(fmt.Sprintf("Start datasync for datanode[%d]", failedDataNode.ID))
// Not sync data in leader's replica
if !s.shouldSync {
return
}
s.logger.Debug(fmt.Sprintf("Start datasync for datanode[%d]", failedDataNode.ID))
// Create context for requests
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(s.dataSyncTimeout)*time.Second)
defer cancel()
Expand Down Expand Up @@ -99,7 +106,7 @@ func (s *DataNodeDataSyncer) SyncData(failedDataNode *models.DataNode) {
s.dataNodeDirectory.UpdateDataNodeState(failedDataNode.ID, models.DataNodeStateUNHEALTHY)
s.dataNodeDirectory.UpdateDataNodeState(prevDataNode.ID, models.DataNodeStateAVAILABLE)
s.dataNodeDirectory.UpdateDataNodeState(afterDataNode.ID, models.DataNodeStateAVAILABLE)
s.logger.Info(fmt.Sprintf("Done datasync for datanode[%d]", failedDataNode.ID))
s.logger.Debug(fmt.Sprintf("Done datasync for datanode[%d]", failedDataNode.ID))
}

func (s *DataNodeDataSyncer) pushMessagesToDataNode(
Expand Down
2 changes: 1 addition & 1 deletion leader/cmd/tasks/datanode_health_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func (hc *DataNodeHealthChecker) RunHealthChecks() {
for {
select {
case <-ticker.C:
logrus.Info("Running DataNode health-check")
logrus.Debug("Running DataNode health-check")
hc.checkNodes()
}
}
Expand Down
2 changes: 1 addition & 1 deletion leader/cmd/tasks/datanode_remaining_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func RunRemainingCheck(dataNodeDirectory *models.DataNodeDirectory, remainingChe
for {
select {
case <-ticker.C:
logrus.Info("Running DataNode remaining-check")
logrus.Debug("Running DataNode remaining-check")
updateNodesRemaining(dataNodeDirectory)
}
}
Expand Down
47 changes: 47 additions & 0 deletions leader/cmd/tasks/leader_sync.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package tasks

import (
"context"
"time"

"github.com/sirupsen/logrus"

"github.com/kysre/TurtleMQ/leader/internal/models"
)

type LeaderSyncer struct {
logger *logrus.Logger
handler *models.LeaderConsensusHandler
directory *models.DataNodeDirectory
period int
}

func NewLeaderSyncer(
logger *logrus.Logger,
handler *models.LeaderConsensusHandler,
directory *models.DataNodeDirectory,
period int,
) *LeaderSyncer {
return &LeaderSyncer{
logger: logger,
handler: handler,
directory: directory,
period: period,
}
}

func (ls *LeaderSyncer) RunLeaderSync() {
ls.logger.Debug("Running leader sync")
tickerPeriod := time.Duration(ls.period) * time.Second
ticker := time.NewTicker(tickerPeriod)
for {
select {
case <-ticker.C:
if ls.handler.IsReplicaAvailable() {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
ls.handler.AddDataNodesToReplica(ctx, ls.directory.DataNodes)
cancel()
}
}
}
}
3 changes: 3 additions & 0 deletions leader/internal/app/core/leader_core.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ func (lc *leaderCore) IsHealthy(ctx context.Context, request *empty.Empty) (*emp

func (lc *leaderCore) AddDataNode(ctx context.Context, request *leader.AddDataNodeRequest) (*empty.Empty, error) {
address := request.GetAddress()
if lc.directory.DoesDataNodeExist(address) {
return &emptypb.Empty{}, nil
}
client, err := clients.NewDataNodeClient(address)
if err != nil {
lc.logger.Error(err)
Expand Down
Loading

0 comments on commit 7cbc882

Please sign in to comment.