Skip to content
This repository has been archived by the owner on Nov 16, 2022. It is now read-only.

Commit

Permalink
Merge pull request #2302 from bandprotocol/syncable-flusher
Browse files Browse the repository at this point in the history
flusher: Add offset check before sync flusher
  • Loading branch information
Benzbeeb authored Jul 23, 2020
2 parents 903066c + af61229 commit d4f90dc
Show file tree
Hide file tree
Showing 9 changed files with 16 additions and 65 deletions.
1 change: 1 addition & 0 deletions CHANGELOG_UNRELEASED.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

### Emitter & Flusher

- (impv) [\#2302](https://github.com/bandprotocol/bandchain/pull/2302) Add offset check before sync flusher.
- (bugs) [\#2298](https://github.com/bandprotocol/bandchain/pull/2298) Fix bug `accumulated_commission` in `emitSetValidator`.
- (bugs) [\#2295](https://github.com/bandprotocol/bandchain/pull/2295) Truncate `accumulated_commission` precision.

Expand Down
1 change: 0 additions & 1 deletion chain/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,3 @@ vendor
**/target/
/pkg/owasm/res/*.wasm
/docker-config/genesis.json
/docker-config/single-validator/genesis.json
4 changes: 2 additions & 2 deletions chain/docker-config/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ cp -r /chain/docker-config/files ~/.bandd
if [ "$1" == "emitter" ];then
sleep 60
bandd start --with-emitter [email protected]:9092 \
--p2p.persistent_peers [email protected]:26656,[email protected]:26656,[email protected]:26656,[email protected]:26656 --pruning nothing
--p2p.persistent_peers [email protected]:26656,[email protected]:26656,[email protected]:26656,[email protected]:26656
else
bandd start --rpc.laddr tcp://0.0.0.0:26657 \
--p2p.persistent_peers [email protected]:26656,[email protected]:26656,[email protected]:26656,[email protected]:26656 --pruning nothing
--p2p.persistent_peers [email protected]:26656,[email protected]:26656,[email protected]:26656,[email protected]:26656
fi
41 changes: 0 additions & 41 deletions chain/docker-config/single-validator/generate-genesis.sh

This file was deleted.

16 changes: 0 additions & 16 deletions chain/docker-config/single-validator/run.sh

This file was deleted.

2 changes: 1 addition & 1 deletion chain/scripts/start_bandd.sh
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@ dropdb my_db
createdb my_db

# start bandchain
bandd start --rpc.laddr tcp://0.0.0.0:26657 --pruning=nothing
bandd start --rpc.laddr tcp://0.0.0.0:26657
3 changes: 1 addition & 2 deletions chain/scripts/start_bandd_with_emitter.sh
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,4 @@ python ../flusher/main.py init bandchain test --db localhost:5432/my_db


# start bandchain
bandd start --with-emitter test@localhost:9092 \
--rpc.laddr tcp://0.0.0.0:26657 --pruning=nothing
bandd start --with-emitter test@localhost:9092 --rpc.laddr tcp://0.0.0.0:26657
1 change: 0 additions & 1 deletion flusher/flusher/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,3 @@
def cli():
"""BandChain's flusher utility program."""
pass

12 changes: 11 additions & 1 deletion flusher/flusher/sync.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import json
import click
import sys
import time

from kafka import KafkaConsumer, TopicPartition
from loguru import logger
from sqlalchemy import create_engine
Expand Down Expand Up @@ -39,7 +41,15 @@ def sync(commit_interval, db, servers, echo_sqlalchemy):
partitions = consumer.partitions_for_topic(topic)
if len(partitions) != 1:
raise Exception("Only exact 1 partition is supported.")
consumer.seek(TopicPartition(topic, partitions.pop()), tracking_info.kafka_offset + 1)
tp = TopicPartition(topic, partitions.pop())
while True:
consumer.seek_to_end(tp)
last_offset = consumer.position(tp)
if tracking_info.kafka_offset < last_offset:
break
logger.info("Waiting emitter sync current emitter offset is {}", last_offset)
time.sleep(5)
consumer.seek(tp, tracking_info.kafka_offset + 1)
consumer_iter = iter(consumer)
# Main loop
while True:
Expand Down

0 comments on commit d4f90dc

Please sign in to comment.