Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

indexer: static vote_count plus fixes #1173

Merged
merged 3 commits into from
Oct 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions vochain/indexer/db/db.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

30 changes: 30 additions & 0 deletions vochain/indexer/db/models.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

65 changes: 25 additions & 40 deletions vochain/indexer/db/processes.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

38 changes: 33 additions & 5 deletions vochain/indexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ type Indexer struct {
blockQueries *indexerdb.Queries
// blockUpdateProcs is the list of process IDs that require sync with the state database.
// The key is a types.ProcessID as a string, so that it can be used as a map key.
blockUpdateProcs map[string]bool
blockUpdateProcs map[string]bool
blockUpdateProcVoteCounts map[string]bool

// list of live processes (those on which the votes will be computed on arrival)
// TODO: we could query the procs table, perhaps memoizing to avoid querying the same over and over again?
Expand All @@ -100,7 +101,13 @@ func NewIndexer(dataDir string, app *vochain.BaseApplication, countLiveResults b
App: app,
ignoreLiveResults: !countLiveResults,

blockUpdateProcs: make(map[string]bool),
// TODO(mvdan): these three maps are all keyed by process ID,
// and each of them needs to query existing data from the DB.
// Since the map keys very often overlap, consider joining the maps
// so that we can also reuse queries to the DB.
votePool: make(map[string]map[string]*state.Vote),
blockUpdateProcs: make(map[string]bool),
blockUpdateProcVoteCounts: make(map[string]bool),
}
log.Infow("indexer initialization", "dataDir", dataDir, "liveResults", countLiveResults)

Expand Down Expand Up @@ -405,6 +412,25 @@ func (idx *Indexer) Commit(height uint32) error {
}
}()
}
clear(idx.votePool)

// Note that we re-compute each process vote count from the votes table,
// since simply incrementing the vote count would break with vote overwrites.
for pidStr := range idx.blockUpdateProcVoteCounts {
pid := []byte(pidStr)
voteCount, err := queries.CountVotesByProcessID(ctx, pid)
if err != nil {
log.Errorw(err, "could not get vote count")
continue
}
if _, err := queries.SetProcessVoteCount(ctx, indexerdb.SetProcessVoteCountParams{
ID: pid,
VoteCount: voteCount,
}); err != nil {
log.Errorw(err, "could not set vote count")
}
}
clear(idx.blockUpdateProcVoteCounts)

if err := idx.blockTx.Commit(); err != nil {
log.Errorw(err, "could not commit tx")
Expand All @@ -424,14 +450,15 @@ func (idx *Indexer) Commit(height uint32) error {
func (idx *Indexer) Rollback() {
idx.blockMu.Lock()
defer idx.blockMu.Unlock()
idx.votePool = make(map[string]map[string]*state.Vote)
clear(idx.votePool)
clear(idx.blockUpdateProcs)
clear(idx.blockUpdateProcVoteCounts)
if idx.blockTx != nil {
if err := idx.blockTx.Rollback(); err != nil {
log.Errorw(err, "could not rollback tx")
}
idx.blockTx = nil
}
clear(idx.blockUpdateProcs)
}

// OnProcess indexer stores the processID
Expand All @@ -450,9 +477,9 @@ func (idx *Indexer) OnProcess(pid, _ []byte, _, _ string, _ int32) {
// voterID is the identifier of the voter, the most common case is an ethereum address
// but can be any kind of id expressed as bytes.
func (idx *Indexer) OnVote(vote *state.Vote, txIndex int32) {
pid := string(vote.ProcessID)
if !idx.ignoreLiveResults && idx.isProcessLiveResults(vote.ProcessID) {
// Since []byte in Go isn't comparable, but we can convert any bytes to string.
pid := string(vote.ProcessID)
nullifier := string(vote.Nullifier)
if idx.votePool[pid] == nil {
idx.votePool[pid] = make(map[string]*state.Vote)
Expand All @@ -471,6 +498,7 @@ func (idx *Indexer) OnVote(vote *state.Vote, txIndex int32) {
if err := idx.addVoteIndex(context.TODO(), queries, vote, txIndex); err != nil {
log.Errorw(err, "could not index vote")
}
idx.blockUpdateProcVoteCounts[pid] = true
}

// OnCancel indexer stores the processID and entityID
Expand Down
5 changes: 2 additions & 3 deletions vochain/indexer/indexertypes/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type Process struct {
StartBlock uint32 `json:"startBlock"`
EndBlock uint32 `json:"endBlock"`
BlockCount uint32 `json:"blockCount"`
VoteCount uint64 `json:"voteCount"`
CensusRoot types.HexBytes `json:"censusRoot"`
CensusURI string `json:"censusURI"`
Metadata string `json:"metadata"`
Expand All @@ -42,8 +43,6 @@ type Process struct {
PrivateKeys json.RawMessage `json:"-"` // json array
PublicKeys json.RawMessage `json:"-"` // json array

VoteCount uint64 `json:"-"` // via LEFT JOIN

ResultsVotes [][]*types.BigInt `json:"-"`
ResultsWeight *types.BigInt `json:"-"`
ResultsBlockHeight uint32 `json:"-"`
Expand All @@ -60,7 +59,7 @@ func (p *Process) Results() *results.Results {
}
}

func ProcessFromDB(dbproc *indexerdb.GetProcessRow) *Process {
func ProcessFromDB(dbproc *indexerdb.Process) *Process {
proc := &Process{
ID: dbproc.ID,
EntityID: nonEmptyBytes(dbproc.EntityID),
Expand Down
1 change: 1 addition & 0 deletions vochain/indexer/migrations/0001_create_table_processes.sql
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ CREATE TABLE processes (
start_block INTEGER NOT NULL,
end_block INTEGER NOT NULL,
block_count INTEGER NOT NULL,
vote_count INTEGER NOT NULL,

have_results BOOLEAN NOT NULL,
final_results BOOLEAN NOT NULL,
Expand Down
1 change: 1 addition & 0 deletions vochain/indexer/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ func (idx *Indexer) newEmptyProcess(pid []byte) error {
StartBlock: int64(p.StartBlock),
EndBlock: int64(p.BlockCount + p.StartBlock),
BlockCount: int64(p.BlockCount),
VoteCount: 0, // an empty process has no votes yet
HaveResults: !p.EnvelopeType.EncryptedVotes, // like isOpenProcess, but on the state type
CensusRoot: nonNullBytes(p.CensusRoot),
MaxCensusSize: int64(p.GetMaxCensusSize()),
Expand Down
17 changes: 10 additions & 7 deletions vochain/indexer/queries/processes.sql
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
-- name: CreateProcess :execresult
INSERT INTO processes (
id, entity_id, start_block, end_block, block_count,
have_results, final_results, census_root,
vote_count, have_results, final_results, census_root,
max_census_size, census_uri, metadata,
census_origin, status, namespace,
envelope, mode, vote_opts,
Expand All @@ -12,7 +12,7 @@ INSERT INTO processes (
results_votes, results_weight, results_block_height
) VALUES (
?, ?, ?, ?, ?,
?, ?, ?,
?, ?, ?, ?,
?, ?, ?,
?, ?, ?,
?, ?, ?,
Expand All @@ -24,11 +24,9 @@ INSERT INTO processes (
);

-- name: GetProcess :one
SELECT p.*, COUNT(v.nullifier) AS vote_count FROM processes AS p
LEFT JOIN votes AS v
ON p.id = v.process_id
WHERE p.id = ?
GROUP BY p.id
SELECT * FROM processes
WHERE id = ?
GROUP BY id
LIMIT 1;

-- name: SearchProcesses :many
Expand Down Expand Up @@ -80,6 +78,11 @@ UPDATE processes
SET have_results = FALSE, final_results = TRUE
WHERE id = sqlc.arg(id);

-- name: SetProcessVoteCount :execresult
UPDATE processes
SET vote_count = sqlc.arg(vote_count)
WHERE id = sqlc.arg(id);

-- name: GetProcessCount :one
SELECT COUNT(*) FROM processes;

Expand Down
2 changes: 1 addition & 1 deletion vochain/indexer/vote.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ func (*Indexer) addLiveVote(process *indexertypes.Process, VotePackage []byte, w
// This method is triggered by Commit callback for each vote added to the blockchain.
// If txn is provided the vote will be added on the transaction (without performing a commit).
func (*Indexer) addVoteIndex(ctx context.Context, queries *indexerdb.Queries, vote *state.Vote, txIndex int32) error {
weightStr := "1"
weightStr := `"1"` // note that weight is stored as a JSON string-quoted number
if vote.Weight != nil {
weightStr = indexertypes.EncodeJSON((*types.BigInt)(vote.Weight))
}
Expand Down