Skip to content

Commit 8e41204

Browse files
committed
update for pool
1 parent f7c5cee commit 8e41204

File tree

7 files changed

+101
-49
lines changed

7 files changed

+101
-49
lines changed

common/condTimeout.go

+17-3
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,19 @@
11
package common
22

33
import (
4-
"github.com/pkg/errors"
4+
"fmt"
55
"sync"
6+
"sync/atomic"
67
"time"
8+
9+
"github.com/pkg/errors"
710
)
811

912
var TimeoutErr = errors.New("timeout")
1013

1114
type TimeoutCond struct {
12-
cd *sync.Cond
15+
cd *sync.Cond
16+
notifyNum uint32
1317
}
1418

1519
func NewTimeoutCond() *TimeoutCond {
@@ -18,12 +22,21 @@ func NewTimeoutCond() *TimeoutCond {
1822
}
1923

2024
func (self *TimeoutCond) Wait() {
25+
old := atomic.SwapUint32(&self.notifyNum, 0)
26+
if old > 0 {
27+
return
28+
}
2129
self.cd.L.Lock()
2230
defer self.cd.L.Unlock()
2331
self.cd.Wait()
2432
}
2533

2634
func (self *TimeoutCond) WaitTimeout(t time.Duration) error {
35+
old := atomic.SwapUint32(&self.notifyNum, 0)
36+
if old > 0 {
37+
fmt.Printf("t:%s, num:%d\n", time.Now(), old)
38+
return nil
39+
}
2740
done := make(chan struct{})
2841
go func() {
2942
self.Wait()
@@ -40,9 +53,10 @@ func (self *TimeoutCond) WaitTimeout(t time.Duration) error {
4053
}
4154

4255
func (self *TimeoutCond) Broadcast() {
56+
atomic.AddUint32(&self.notifyNum, 1)
4357
self.cd.Broadcast()
44-
4558
}
4659
func (self *TimeoutCond) Signal() {
60+
atomic.AddUint32(&self.notifyNum, 1)
4761
self.cd.Signal()
4862
}

common/types/address.go

+11-2
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,12 @@ import (
55
"crypto/rand"
66
"encoding/hex"
77
"fmt"
8+
"math/big"
9+
"strings"
10+
811
"github.com/vitelabs/go-vite/common/helper"
912
vcrypto "github.com/vitelabs/go-vite/crypto"
1013
"github.com/vitelabs/go-vite/crypto/ed25519"
11-
"math/big"
12-
"strings"
1314
)
1415

1516
const (
@@ -85,6 +86,14 @@ func HexToAddress(hexStr string) (Address, error) {
8586
}
8687
}
8788

89+
func HexToAddressPanic(hexstr string) Address {
90+
h, err := HexToAddress(hexstr)
91+
if err != nil {
92+
panic(err)
93+
}
94+
return h
95+
}
96+
8897
func IsValidHexAddress(hexStr string) bool {
8998
if len(hexStr) != hexAddressLength || !strings.HasPrefix(hexStr, AddressPrefix) {
9099
return false

pool/account_pool.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -628,8 +628,8 @@ func (self *accountPool) makePackage(q Package, info *offsetInfo) (uint64, error
628628
self.pool.RLock()
629629
defer self.pool.RUnLock()
630630

631-
self.rMu.Lock()
632-
defer self.rMu.Unlock()
631+
self.chainTailMu.Lock()
632+
defer self.chainTailMu.Unlock()
633633

634634
cp := self.chainpool
635635
current := cp.current
@@ -679,7 +679,7 @@ func (self *accountPool) makePackage(q Package, info *offsetInfo) (uint64, error
679679
func (self *accountPool) tryInsertItems(items []*Item, latestSb *ledger.SnapshotBlock) error {
680680
// if current size is empty, do nothing.
681681
if self.chainpool.current.size() <= 0 {
682-
return errors.New("empty chainpool")
682+
return errors.Errorf("empty chainpool, but item size:%d", len(items))
683683
}
684684

685685
self.chainTailMu.Lock()
@@ -720,7 +720,7 @@ func (self *accountPool) tryInsertItems(items []*Item, latestSb *ledger.Snapshot
720720
fmt.Println(self.address, item.commonBlock.(*accountPoolBlock).block.IsSendBlock())
721721
return errors.New("tail not match")
722722
}
723-
fmt.Printf("try to insert account block[%s]%d-%d success.\n", block.Hash(), i, len(items))
723+
self.log.Info(fmt.Sprintf("try to insert account block[%s]%d-%d [latency:%s]success.\n", block.Hash(), i, len(items), block.Latency()))
724724
}
725725
return nil
726726
}

pool/pool.go

+25-12
Original file line numberDiff line numberDiff line change
@@ -79,17 +79,19 @@ type commonBlock interface {
7979
resetForkVersion()
8080
forkVersion() int
8181
Source() types.BlockSource
82+
Latency() time.Duration
8283
ReferHashes() ([]types.Hash, []types.Hash, *types.Hash)
8384
}
8485

8586
func newForkBlock(v *ForkVersion, source types.BlockSource) *forkBlock {
86-
return &forkBlock{firstV: v.Val(), v: v, source: source}
87+
return &forkBlock{firstV: v.Val(), v: v, source: source, nTime: time.Now()}
8788
}
8889

8990
type forkBlock struct {
9091
firstV int
9192
v *ForkVersion
9293
source types.BlockSource
94+
nTime time.Time
9395
}
9496

9597
func (self *forkBlock) forkVersion() int {
@@ -102,6 +104,16 @@ func (self *forkBlock) resetForkVersion() {
102104
val := self.v.Val()
103105
self.firstV = val
104106
}
107+
func (self *forkBlock) Latency() time.Duration {
108+
if self.Source() == types.RemoteBroadcast || self.Source() == types.RemoteFetch {
109+
return time.Now().Sub(self.nTime)
110+
}
111+
return time.Duration(0)
112+
}
113+
114+
func (self *forkBlock) Source() types.BlockSource {
115+
return self.source
116+
}
105117

106118
type pool struct {
107119
pendingSc *snapshotPool
@@ -117,6 +129,9 @@ type pool struct {
117129
accountSubId int
118130
snapshotSubId int
119131

132+
newAccBlockCond *common.TimeoutCond
133+
newSnapshotBlockCond *common.TimeoutCond
134+
120135
rwMutex sync.RWMutex
121136
version *ForkVersion
122137

@@ -189,6 +204,8 @@ func NewPool(bc chainDb) (*pool, error) {
189204
self.addrCache = cache
190205

191206
self.hashBlacklist, err = NewBlacklist()
207+
self.newAccBlockCond = common.NewTimeoutCond()
208+
self.newSnapshotBlockCond = common.NewTimeoutCond()
192209
if err != nil {
193210
return nil, err
194211
}
@@ -205,7 +222,7 @@ func (self *pool) Init(s syncer,
205222
fe := &snapshotSyncer{fetcher: s, log: self.log.New("t", "snapshot")}
206223
v := &snapshotVerifier{v: snapshotV}
207224
self.accountVerifier = accountV
208-
snapshotPool := newSnapshotPool("snapshotPool", self.version, v, fe, rw, self.hashBlacklist, self.log)
225+
snapshotPool := newSnapshotPool("snapshotPool", self.version, v, fe, rw, self.hashBlacklist, self.newSnapshotBlockCond, self.log)
209226
snapshotPool.init(
210227
newTools(fe, rw),
211228
self)
@@ -335,6 +352,8 @@ func (self *pool) AddSnapshotBlock(block *ledger.SnapshotBlock, source types.Blo
335352
return
336353
}
337354
self.pendingSc.AddBlock(newSnapshotPoolBlock(block, self.version, source))
355+
356+
self.newSnapshotBlockCond.Broadcast()
338357
}
339358

340359
func (self *pool) AddDirectSnapshotBlock(block *ledger.SnapshotBlock) error {
@@ -375,6 +394,7 @@ func (self *pool) AddAccountBlock(address types.Address, block *ledger.AccountBl
375394
ac.AddBlock(newAccountPoolBlock(block, nil, self.version, source))
376395
ac.AddReceivedBlock(block)
377396

397+
self.newAccBlockCond.Broadcast()
378398
}
379399

380400
func (self *pool) AddDirectAccountBlock(address types.Address, block *vm_db.VmAccountBlock) error {
@@ -408,6 +428,7 @@ func (self *pool) AddAccountBlocks(address types.Address, blocks []*ledger.Accou
408428
self.AddAccountBlock(address, b, source)
409429
}
410430

431+
self.newAccBlockCond.Broadcast()
411432
return nil
412433
}
413434

@@ -625,24 +646,16 @@ func (self *pool) loopCompact() {
625646
self.wg.Add(1)
626647
defer self.wg.Done()
627648

628-
t := time.NewTicker(time.Millisecond * 40)
629-
defer t.Stop()
630649
sum := 0
631650
for {
632651
select {
633652
case <-self.closed:
634653
return
635-
case <-t.C:
654+
default:
636655
if sum == 0 {
637-
//self.accountCond.L.Lock()
638-
//self.accountCond.Wait()
639-
//self.accountCond.L.Unlock()
640-
time.Sleep(200 * time.Millisecond)
656+
self.newAccBlockCond.WaitTimeout(30 * time.Millisecond)
641657
}
642658
sum = 0
643-
644-
sum += self.accountsCompact()
645-
default:
646659
sum += self.accountsCompact()
647660
}
648661
}

pool/pool_loop.go

+22-12
Original file line numberDiff line numberDiff line change
@@ -17,19 +17,29 @@ loop:
1717
2. insert for queue
1818
*/
1919
func (self *pool) loopQueue() {
20+
self.wg.Add(1)
21+
defer self.wg.Done()
2022
for {
21-
q := self.makeQueue()
22-
size := q.Size()
23-
if size == 0 {
24-
time.Sleep(20 * time.Millisecond)
25-
continue
26-
}
27-
err := self.insertQueue(q)
28-
if err != nil {
29-
fmt.Printf("insert queue err:%s\n", err)
30-
fmt.Printf("all queue:%s\n", q.Info())
31-
time.Sleep(time.Second * 2)
32-
self.log.Crit("loop pool exit")
23+
select {
24+
case <-self.closed:
25+
return
26+
default:
27+
t1 := time.Now()
28+
q := self.makeQueue()
29+
size := q.Size()
30+
if size == 0 {
31+
time.Sleep(2 * time.Millisecond)
32+
continue
33+
}
34+
err := self.insertQueue(q)
35+
if err != nil {
36+
fmt.Printf("insert queue err:%s\n", err)
37+
fmt.Printf("all queue:%s\n", q.Info())
38+
time.Sleep(time.Second * 2)
39+
self.log.Crit("loop pool exit")
40+
}
41+
t2 := time.Now()
42+
self.log.Info(fmt.Sprintf("time duration:%s, size:%d", t2.Sub(t1), size))
3343
}
3444
}
3545
}

pool/processor_snapshot_package.go

+6
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,12 @@ func (self *snapshotPackage) AddItem(b *Item) error {
163163
if max > self.current {
164164
self.current = max
165165
}
166+
if !b.Snapshot() {
167+
err := self.accountExistsF(b.Hash())
168+
if err == nil {
169+
panic(fmt.Sprintf("panic for account block[%s][%d][%s] exist", b.Hash(), b.Height(), b.ownerWrapper))
170+
}
171+
}
166172
return err
167173
}
168174

pool/snapshot_pool.go

+16-16
Original file line numberDiff line numberDiff line change
@@ -18,16 +18,18 @@ type snapshotPool struct {
1818
BCPool
1919
//rwMu *sync.RWMutex
2020
//consensus consensus.AccountsConsensus
21-
closed chan struct{}
22-
wg sync.WaitGroup
23-
pool *pool
24-
rw *snapshotCh
25-
v *snapshotVerifier
26-
f *snapshotSyncer
27-
nextFetchTime time.Time
28-
nextInsertTime time.Time
29-
nextCompactTime time.Time
30-
hashBlacklist Blacklist
21+
closed chan struct{}
22+
wg sync.WaitGroup
23+
pool *pool
24+
rw *snapshotCh
25+
v *snapshotVerifier
26+
f *snapshotSyncer
27+
28+
nextFetchTime time.Time
29+
nextInsertTime time.Time
30+
nextCompactTime time.Time
31+
hashBlacklist Blacklist
32+
newSnapshotBlockCond *common.TimeoutCond
3133
}
3234

3335
func newSnapshotPoolBlock(block *ledger.SnapshotBlock, version *ForkVersion, source types.BlockSource) *snapshotPoolBlock {
@@ -68,17 +70,14 @@ func (self *snapshotPoolBlock) PrevHash() types.Hash {
6870
return self.block.PrevHash
6971
}
7072

71-
func (self *snapshotPoolBlock) Source() types.BlockSource {
72-
return self.source
73-
}
74-
7573
func newSnapshotPool(
7674
name string,
7775
version *ForkVersion,
7876
v *snapshotVerifier,
7977
f *snapshotSyncer,
8078
rw *snapshotCh,
8179
hashBlacklist Blacklist,
80+
cond *common.TimeoutCond,
8281
log log15.Logger,
8382
) *snapshotPool {
8483
pool := &snapshotPool{}
@@ -93,6 +92,7 @@ func newSnapshotPool(
9392
pool.nextInsertTime = now
9493
pool.nextCompactTime = now
9594
pool.hashBlacklist = hashBlacklist
95+
pool.newSnapshotBlockCond = cond
9696
return pool
9797
}
9898

@@ -305,9 +305,9 @@ func (self *snapshotPool) loop() {
305305
s1 := self.nextCompactTime.Sub(n2)
306306
s2 := self.nextInsertTime.Sub(n2)
307307
if s1 > s2 {
308-
time.Sleep(s2)
308+
self.newSnapshotBlockCond.WaitTimeout(s2)
309309
} else {
310-
time.Sleep(s1)
310+
self.newSnapshotBlockCond.WaitTimeout(s1)
311311
}
312312
monitor.LogTime("pool", "snapshotRealSleep", n2)
313313
last = time.Now()

0 commit comments

Comments
 (0)