Skip to content

Commit

Permalink
add desync monitor, change err level
Browse files Browse the repository at this point in the history
  • Loading branch information
powerpook committed Dec 10, 2021
1 parent 1351a75 commit fc5f307
Show file tree
Hide file tree
Showing 6 changed files with 268 additions and 1 deletion.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ public/

packages/api/helper_test.go
/tempdir/*
/tools/
#/tools/
.scannerwork/
sonar-project.properties
tempdir/
Expand Down
8 changes: 8 additions & 0 deletions tools/desync_monitor/config.toml.temp
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
nodes_list = [
# "http://127.0.0.1:7079",
# "http://127.0.0.1:2079",
# "http://127.0.0.1:3079",
]
[Daemon]
Daemon = true
querying_period = 4
20 changes: 20 additions & 0 deletions tools/desync_monitor/config/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package config

import (
"github.com/BurntSushi/toml"
)

type Daemon struct {
DaemonMode bool `toml:"daemon"`
QueryingPeriod int `toml:"querying_period"`
}

type Config struct {
Daemon Daemon `toml:"daemon"`
NodesList []string `toml:"nodes_list"`
}

func (c *Config) Read(fileName string) error {
_, err := toml.DecodeFile(fileName, c)
return err
}
108 changes: 108 additions & 0 deletions tools/desync_monitor/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package main

import (
"flag"
"fmt"
"math"
"strings"
"time"

"github.com/IBAX-io/go-ibax/tools/desync_monitor/config"
"github.com/IBAX-io/go-ibax/tools/desync_monitor/query"

log "github.com/sirupsen/logrus"
)

const confPathFlagName = "confPath"
const nodesListFlagName = "nodesList"
const daemonModeFlagName = "daemonMode"
const queryingPeriodFlagName = "queryingPeriod"

var configPath = flag.String(confPathFlagName, "config.toml", "path to desync monitor config")
var nodesList = flag.String(nodesListFlagName, "127.0.0.1:7079", "which nodes to query, in format url1,url2,url3")
var daemonMode = flag.Bool(daemonModeFlagName, false, "start as daemon")
var queryingPeriod = flag.Int(queryingPeriodFlagName, 1, "period of querying nodes in seconds, if started as daemon")

func minElement(slice []int64) int64 {
var min int64 = math.MaxInt64
for _, blockID := range slice {
if blockID < min {
min = blockID
}
}
return min
}

func flagsOverrideConfig(conf *config.Config) {
flag.Visit(func(flag *flag.Flag) {
switch flag.Name {
case nodesListFlagName:
nodesList := strings.Split(*nodesList, ",")
conf.NodesList = nodesList
case daemonModeFlagName:
conf.Daemon.DaemonMode = *daemonMode
case queryingPeriodFlagName:
conf.Daemon.QueryingPeriod = *queryingPeriod
}
})
}

func monitor(conf *config.Config) {
maxBlockIDs, err := query.MaxBlockIDs(conf.NodesList)
if err != nil {
log.WithFields(log.Fields{"err": err}).Error("on sending max block request")
return
}

log.Infoln("max blocks ", maxBlockIDs)

blockInfos, err := query.BlockInfo(conf.NodesList, minElement(maxBlockIDs))
if err != nil {
log.WithFields(log.Fields{"err": err}).Error("on sending block info request")
return
}

hash2Node := map[string][]string{}
for node, blockInfo := range blockInfos {
rollbacksHash := string(blockInfo.RollbacksHash)
if _, ok := hash2Node[rollbacksHash]; !ok {
hash2Node[rollbacksHash] = []string{}
}
hash2Node[rollbacksHash] = append(hash2Node[rollbacksHash], node)
}

log.Infof("requested nodes: %v", conf.NodesList)

if len(hash2Node) <= 1 {
log.Infoln("nodes synced")
return
}

hash2NodeStrResults := []string{}
for k, v := range hash2Node {
hash2NodeStrResults = append(hash2NodeStrResults, fmt.Sprintf("%x: %s", k, v))
}

log.Infof("nodes unsynced. Rollback hashes are: %s", strings.Join(hash2NodeStrResults, ","))
}

func main() {
flag.Parse()
conf := &config.Config{}
if err := conf.Read(*configPath); err != nil {
log.WithFields(log.Fields{"error": err}).Fatal("reading config")
}

flagsOverrideConfig(conf)

if conf.Daemon.DaemonMode {
log.Infoln("MODE: daemon")
ticker := time.NewTicker(time.Second * time.Duration(conf.Daemon.QueryingPeriod))
for range ticker.C {
monitor(conf)
}
} else {
log.Println("MODE: single request")
monitor(conf)
}
}
78 changes: 78 additions & 0 deletions tools/desync_monitor/query/query.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package query

import (
"fmt"
"sync"
)

const maxBlockIDEndpoint = "/api/v2/maxblockid"
const blockInfoEndpoint = "/api/v2/block/%d"

type MaxBlockID struct {
MaxBlockID int64 `json:"max_block_id"`
}

type blockInfoResult struct {
Hash []byte `json:"hash"`
EcosystemID int64 `json:"ecosystem_id"`
KeyID int64 `json:"key_id"`
Time int64 `json:"time"`
Tx int32 `json:"tx_count"`
RollbacksHash []byte `json:"rollbacks_hash"`
}

func MaxBlockIDs(nodesList []string) ([]int64, error) {
wg := sync.WaitGroup{}
workResults := ConcurrentMap{m: map[string]interface{}{}}
for _, nodeUrl := range nodesList {
wg.Add(1)
go func(url string) {
defer wg.Done()
maxBlockID := &MaxBlockID{}
if err := sendGetRequest(url+maxBlockIDEndpoint, maxBlockID); err != nil {
workResults.Set(url, err)
return
}
workResults.Set(url, maxBlockID.MaxBlockID)
}(nodeUrl)
}
wg.Wait()
maxBlockIds := []int64{}
for _, result := range workResults.m {
switch res := result.(type) {
case int64:
maxBlockIds = append(maxBlockIds, res)
case error:
return nil, res
}
}
return maxBlockIds, nil
}

func BlockInfo(nodesList []string, blockID int64) (map[string]*blockInfoResult, error) {
wg := sync.WaitGroup{}
workResults := ConcurrentMap{m: map[string]interface{}{}}
for _, nodeUrl := range nodesList {
wg.Add(1)
go func(url string) {
defer wg.Done()
blockInfo := &blockInfoResult{}
if err := sendGetRequest(url+fmt.Sprintf(blockInfoEndpoint, blockID), blockInfo); err != nil {
workResults.Set(url, err)
return
}
workResults.Set(url, blockInfo)
}(nodeUrl)
}
wg.Wait()
result := map[string]*blockInfoResult{}
for nodeUrl, blockInfoOrError := range workResults.m {
switch res := blockInfoOrError.(type) {
case error:
return nil, res
case *blockInfoResult:
result[nodeUrl] = res
}
}
return result, nil
}
53 changes: 53 additions & 0 deletions tools/desync_monitor/query/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package query

import (
"encoding/json"
"fmt"
"io"
"net/http"
"sync"

log "github.com/sirupsen/logrus"
)

type ConcurrentMap struct {
m map[string]interface{}
mu sync.RWMutex
}

func (c *ConcurrentMap) Set(key string, value interface{}) {
c.mu.Lock()
defer c.mu.Unlock()
c.m[key] = value
}

func (c *ConcurrentMap) Get(key string) (bool, interface{}) {
c.mu.RLock()
defer c.mu.RUnlock()
res, ok := c.m[key]
return ok, res
}

func sendGetRequest(url string, v interface{}) error {
resp, err := http.Get(url)
if err != nil {
log.WithFields(log.Fields{"url": url, "error": err}).Error("get requesting url")
return err
}
if resp.StatusCode != http.StatusOK {
err := fmt.Errorf("status code is not OK %d", resp.StatusCode)
log.WithFields(log.Fields{"url": url, "error": err}).Error("incorrect status code")
return err
}
defer resp.Body.Close()
data, err := io.ReadAll(resp.Body)
if err != nil {
log.WithFields(log.Fields{"url": url, "error": err}).Error("reading response body")
return err
}
if err := json.Unmarshal(data, v); err != nil {
log.WithFields(log.Fields{"data": string(data), "error": err}).Error("unmarshalling json to struct")
return err
}
return nil
}

0 comments on commit fc5f307

Please sign in to comment.