
Commit

add support to process archive retrieval using IPNS
Signed-off-by: p4u <[email protected]>
p4u committed Nov 2, 2023
1 parent f541236 commit 8b25d62
Showing 11 changed files with 147 additions and 89 deletions.
6 changes: 6 additions & 0 deletions cmd/node/main.go
@@ -116,6 +116,7 @@ func newConfig() (*config.Config, config.Error) {
"enable IPFS group synchronization using the given secret key")
conf.Ipfs.ConnectPeers = *flag.StringSlice("ipfsConnectPeers", []string{},
"use custom ipfsconnect peers/bootnodes for accessing the DHT (comma-separated)")
conf.Vochain.Indexer.ArchiveURL = *flag.String("archiveURL", types.ArchiveURL, "enable archive retrieval from the given IPNS URL")

// vochain
conf.Vochain.P2PListen = *flag.String("vochainP2PListen", "0.0.0.0:26656",
@@ -198,6 +199,11 @@ func newConfig() (*config.Config, config.Error) {
// use different datadirs for different chains
conf.DataDir = filepath.Join(conf.DataDir, conf.Vochain.Chain)

if err = viper.BindPFlag("archiveURL", flag.Lookup("archiveURL")); err != nil {
log.Fatalf("failed to bind archiveURL flag to viper: %v", err)
}
conf.Vochain.Indexer.ArchiveURL = viper.GetString("archiveURL")

// add viper config path (now we know it)
viper.AddConfigPath(conf.DataDir)

2 changes: 2 additions & 0 deletions config/config.go
@@ -144,6 +144,8 @@ type IndexerCfg struct {
Enabled bool
// Disables live results computation on indexer
IgnoreLiveResults bool
// ArchiveURL is the URL where the archive is retrieved from (usually IPNS)
ArchiveURL string
}

// MetricsCfg initializes the metrics config
1 change: 1 addition & 0 deletions data/data.go
@@ -16,6 +16,7 @@ type Storage interface {
Publish(ctx context.Context, data []byte) (string, error)
PublishReader(ctx context.Context, data io.Reader) (string, error)
Retrieve(ctx context.Context, id string, maxSize int64) ([]byte, error)
RetrieveDir(ctx context.Context, id string, maxSize int64) (map[string][]byte, error)
Pin(ctx context.Context, path string) error
Unpin(ctx context.Context, path string) error
ListPins(ctx context.Context) (map[string]string, error)
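For illustration, a minimal sketch of how a caller might use the new RetrieveDir method. The dumpArchiveDir helper, the example package name, and the per-file size limit are assumptions of this sketch, not part of the commit:

package example

import (
	"context"
	"fmt"
	"log"
	"time"

	"go.vocdoni.io/dvote/data"
)

// dumpArchiveDir is a hypothetical helper: it fetches every file of an
// IPFS/IPNS directory through the Storage interface and prints its size.
func dumpArchiveDir(storage data.Storage, path string) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
	defer cancel()

	// RetrieveDir returns a map of file name -> file content; per the IPFS
	// implementation below, only one level of directory depth is walked.
	files, err := storage.RetrieveDir(ctx, path, 1024*100)
	if err != nil {
		log.Fatalf("cannot retrieve directory %s: %v", path, err)
	}
	for name, content := range files {
		fmt.Printf("%s: %d bytes\n", name, len(content))
	}
}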
5 changes: 5 additions & 0 deletions data/datamocktest.go
@@ -57,6 +57,11 @@ func (d *DataMockTest) Retrieve(_ context.Context, id string, _ int64) ([]byte, error) {
return d.rnd.RandomBytes(256), nil
}

func (d *DataMockTest) RetrieveDir(_ context.Context, id string, _ int64) (map[string][]byte, error) {
// TODO: Implement
return nil, nil
}

func (d *DataMockTest) Pin(_ context.Context, path string) error {
d.filesMu.Lock()
defer d.filesMu.Unlock()
76 changes: 55 additions & 21 deletions data/ipfs/ipfs.go
@@ -260,6 +260,41 @@ func (i *Handler) ListPins(ctx context.Context) (map[string]string, error) {
return pinMap, nil
}

// RetrieveDir gets an IPFS directory and returns a map of all files and their content.
// It only supports 1 level of directory depth, so subdirectories are ignored.
func (i *Handler) RetrieveDir(ctx context.Context, path string, maxSize int64) (map[string][]byte, error) {
path = strings.Replace(path, "ipfs://", "/ipfs/", 1)

// first resolve the path
cpath, err := i.CoreAPI.ResolvePath(ctx, corepath.New(path))
if err != nil {
return nil, fmt.Errorf("could not resolve path %s", path)
}
// then get the file
f, err := i.CoreAPI.Unixfs().Get(ctx, cpath)
if err != nil {
return nil, fmt.Errorf("could not retrieve unixfs file: %w", err)
}

dirMap := make(map[string][]byte)
if dir := files.ToDir(f); dir != nil {
if err := files.Walk(dir, func(path string, node files.Node) error {
if file := files.ToFile(node); file != nil {
content, err := fetchFileContent(file)
if err != nil {
log.Warnw("could not retrieve file from directory", "path", path, "error", err)
return nil
}
dirMap[path] = content
}
return nil
}); err != nil {
return nil, err
}
}
return dirMap, nil
}

// Retrieve gets an IPFS file (either from the p2p network or from the local cache).
// If maxSize is 0, it is set to the hardcoded maximum of MaxFileSizeBytes.
func (i *Handler) Retrieve(ctx context.Context, path string, maxSize int64) ([]byte, error) {
@@ -277,32 +312,13 @@ func (i *Handler) Retrieve(ctx context.Context, path string, maxSize int64) ([]byte, error) {
if err != nil {
return nil, fmt.Errorf("could not resolve path %s", path)
}

// then get the file
f, err := i.CoreAPI.Unixfs().Get(ctx, cpath)
if err != nil {
return nil, fmt.Errorf("could not retrieve unixfs file: %w", err)
}
file := files.ToFile(f)
if file == nil {
return nil, fmt.Errorf("object is not a file")
}
defer file.Close()

fsize, err := file.Size()
if err != nil {
return nil, err
}

if maxSize == 0 {
maxSize = MaxFileSizeBytes
}

if fsize > maxSize {
return nil, fmt.Errorf("file too big: %d", fsize)
}

content, err := io.ReadAll(file)
content, err := fetchFileContent(f)
if err != nil {
return nil, err
}
@@ -322,10 +338,28 @@ func (i *Handler) Retrieve(ctx context.Context, path string, maxSize int64) ([]byte, error) {
// Save file to cache for future attempts
i.retrieveCache.Add(path, content)

log.Infow("retrieved file", "path", path, "size", fsize)
log.Infow("retrieved file", "path", path, "size", len(content))
return content, nil
}

func fetchFileContent(node files.Node) ([]byte, error) {
file := files.ToFile(node)
if file == nil {
return nil, fmt.Errorf("object is not a file")
}
defer file.Close()

fsize, err := file.Size()
if err != nil {
return nil, err
}

if fsize > MaxFileSizeBytes {
return nil, fmt.Errorf("file too big: %d", fsize)
}
return io.ReadAll(io.LimitReader(file, MaxFileSizeBytes))
}

// PublishIPNSpath creates or updates an IPNS record with the content of a
// filesystem path (a single file or a directory).
//
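As a side note, the Walk-based collection in RetrieveDir can be exercised without a running IPFS node by using an in-memory directory. This standalone sketch assumes the github.com/ipfs/go-ipfs-files module (the import path may differ across kubo versions):

package main

import (
	"fmt"
	"io"

	files "github.com/ipfs/go-ipfs-files"
)

func main() {
	// Build an in-memory directory to exercise the same files.Walk pattern
	// RetrieveDir uses; in the real handler the node comes from Unixfs().Get.
	dir := files.NewMapDirectory(map[string]files.Node{
		"a.json": files.NewBytesFile([]byte(`{"x":1}`)),
		"b.json": files.NewBytesFile([]byte(`{"y":2}`)),
	})

	out := make(map[string][]byte)
	_ = files.Walk(dir, func(path string, node files.Node) error {
		// files.ToFile returns nil for directories, so subdirectories
		// are skipped, matching the one-level behavior of RetrieveDir.
		if f := files.ToFile(node); f != nil {
			content, err := io.ReadAll(f)
			if err != nil {
				return nil // skip unreadable entries, as RetrieveDir does
			}
			out[path] = content
		}
		return nil
	})
	fmt.Println(len(out), "files collected") // 2 files collected
}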
1 change: 1 addition & 0 deletions dockerfiles/vocdoninode/env.example
@@ -12,6 +12,7 @@ VOCDONI_ENABLEAPI=True
VOCDONI_ENABLERPC=False
VOCDONI_LISTENHOST=0.0.0.0
VOCDONI_LISTENPORT=9090
VOCDONI_ARCHIVEURL=/ipns/k2k4r8mdn544n7f8nprwqeo27jr1v1unsu74th57s1j8mumjck7y7cbz
#VOCDONI_TLS_DIRCERT=
#VOCDONI_TLS_DOMAIN=
#VOCDONI_ADMINTOKEN=
6 changes: 6 additions & 0 deletions service/indexer.go
@@ -21,5 +21,11 @@ func (vs *VocdoniService) VochainIndexer() error {
}
// launch the indexer after sync routine (executed when the blockchain is ready)
go vs.Indexer.AfterSyncBootstrap(false)

if vs.Config.Indexer.ArchiveURL != "" {
log.Infow("starting archive retrieval", "path", vs.Config.Indexer.ArchiveURL)
go vs.Indexer.StartArchiveRetrieval(vs.Storage, vs.Config.Indexer.ArchiveURL)
}

return nil
}
3 changes: 3 additions & 0 deletions types/consts.go
@@ -110,6 +110,9 @@ const (
// on a specific block
IndexerProcessEndingPrefix = byte(0x25)

// ArchiveURL is the default URL where the archive is retrieved from
ArchiveURL = "/ipns/k2k4r8mdn544n7f8nprwqeo27jr1v1unsu74th57s1j8mumjck7y7cbz"

// Vochain

// PetitionSign contains the string that needs to match with the received vote type
66 changes: 61 additions & 5 deletions vochain/indexer/archive.go
@@ -2,16 +2,26 @@ package indexer

import (
"context"
"encoding/json"
"fmt"
"time"

"go.vocdoni.io/dvote/data"
"go.vocdoni.io/dvote/log"
"go.vocdoni.io/dvote/types"
indexerdb "go.vocdoni.io/dvote/vochain/indexer/db"
"go.vocdoni.io/dvote/vochain/indexer/indexertypes"
"go.vocdoni.io/dvote/vochain/results"
"go.vocdoni.io/proto/build/go/models"
)

const (
maxArchiveFileSize = 1024 * 100 // 100 KiB
timeoutArchiveRetrieval = 60 * time.Second
archiveFetchInterval = 20 * time.Minute
archiveFileNameSize = types.ProcessIDsize * 2 // 64 hex chars
)

// ArchiveProcess is the struct used to store the process data in the archive.
type ArchiveProcess struct {
ChainID string `json:"chainId,omitempty"`
@@ -25,16 +35,22 @@ type ArchiveProcess struct {
func (idx *Indexer) ImportArchive(archive []*ArchiveProcess) error {
tx, err := idx.readWriteDB.Begin()
if err != nil {
panic(err) // shouldn't happen, use an error return if it ever does
return err
}
queries := idx.blockQueries.WithTx(tx)
defer tx.Rollback()
height := idx.App.State.CurrentHeight()

queries := indexerdb.New(tx)
addCount := 0
for _, p := range archive {
if idx.App.ChainID() == p.ChainID {
log.Warnw("skipping import of archive process from current chain", "chainID", p.ChainID, "processID", p.ProcessInfo.ID.String())
log.Debugw("skipping import of archive process from current chain", "chainID", p.ChainID, "processID", p.ProcessInfo.ID.String())
continue
}
if p.ProcessInfo == nil {
log.Debugw("skipping import of archive process with nil process info")
continue
}

// Check if election already exists
if _, err := idx.ProcessInfo(p.ProcessInfo.ID); err != nil {
if err != ErrProcessNotFound {
@@ -72,11 +88,51 @@ func (idx *Indexer) ImportArchive(archive []*ArchiveProcess) error {
SourceNetworkID: int64(models.SourceNetworkId_value[p.ProcessInfo.SourceNetworkId]),
Metadata: p.ProcessInfo.Metadata,
ResultsVotes: indexertypes.EncodeJSON(p.Results.Votes),
// TODO: Add a boolean field to the process table to indicate if the process is archived
VoteCount: int64(p.ProcessInfo.VoteCount),
}
if _, err := queries.CreateProcess(context.TODO(), procParams); err != nil {
return fmt.Errorf("create archive process: %w", err)
}
addCount++
}
if addCount > 0 {
log.Infow("archive new elections imported", "elections", addCount)
}
return tx.Commit()
}

// StartArchiveRetrieval starts the archive retrieval process. It is a blocking function that runs continuously.
// It retrieves the archive directory from the storage and imports the processes into the indexer database.
func (idx *Indexer) StartArchiveRetrieval(storage data.Storage, archiveURL string) {
for {
ctx, cancel := context.WithTimeout(context.Background(), timeoutArchiveRetrieval)
dirMap, err := storage.RetrieveDir(ctx, archiveURL, maxArchiveFileSize)
cancel()
if err != nil {
log.Warnw("cannot retrieve archive directory", "url", archiveURL, "err", err)
time.Sleep(archiveFetchInterval)
continue
}
archive := []*ArchiveProcess{}
for name, data := range dirMap {
if len(data) == 0 {
continue
}
if len(name) != archiveFileNameSize {
continue
}
var p ArchiveProcess
if err := json.Unmarshal(data, &p); err != nil {
log.Warnw("cannot unmarshal archive process", "name", name, "err", err)
continue
}
archive = append(archive, &p)
}

log.Debugw("archive processes unmarshaled", "processes", len(archive))
if err := idx.ImportArchive(archive); err != nil {
log.Warnw("cannot import archive", "err", err)
}

time.Sleep(archiveFetchInterval)
}
}
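For reference, the loop above keeps only directory entries whose name is exactly archiveFileNameSize (types.ProcessIDsize * 2 = 64) characters, i.e. a hex-encoded process ID, before trying to unmarshal them. A standalone sketch of that filter follows; the hex-decode check is an extra strictness assumed here, the commit itself only checks the length:

package main

import (
	"encoding/hex"
	"encoding/json"
	"fmt"
)

const archiveFileNameSize = 32 * 2 // types.ProcessIDsize * 2: 64 hex chars

// looksLikeArchiveFile mimics the name filter in StartArchiveRetrieval.
// The hex-decode validation is an assumption of this sketch.
func looksLikeArchiveFile(name string) bool {
	if len(name) != archiveFileNameSize {
		return false
	}
	_, err := hex.DecodeString(name)
	return err == nil
}

func main() {
	name := "c5d2460186f7d8fcfaa76192aa69cceddaec554b1d82b0166dc9020000000031"
	fmt.Println(looksLikeArchiveFile(name)) // true

	// Entries that pass the filter are unmarshaled into ArchiveProcess;
	// ones that fail to parse are logged and skipped.
	var p struct {
		ChainID string `json:"chainId,omitempty"`
	}
	if err := json.Unmarshal([]byte(`{"chainId":"vocdoni-stage-8"}`), &p); err == nil {
		fmt.Println(p.ChainID) // vocdoni-stage-8
	}
}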
67 changes: 6 additions & 61 deletions vochain/indexer/archive_test.go
@@ -2,7 +2,6 @@ package indexer

import (
"encoding/json"
"fmt"
"testing"

qt "github.com/frankban/quicktest"
@@ -15,20 +14,20 @@ func TestImportArchive(t *testing.T) {

archive := []*ArchiveProcess{}
archiveProcess1 := &ArchiveProcess{}
archiveProcess2 := &ArchiveProcess{}
err := json.Unmarshal([]byte(testArchiveProcess1), archiveProcess1)
qt.Assert(t, err, qt.IsNil)
err = json.Unmarshal([]byte(testArchiveProcess2), archiveProcess2)
qt.Assert(t, err, qt.IsNil)
archive = append(archive, archiveProcess1)
archive = append(archive, archiveProcess2)

err = idx.ImportArchive(archive)
qt.Assert(t, err, qt.IsNil)

process1, err := idx.ProcessInfo(archiveProcess1.ProcessInfo.ID)
qt.Assert(t, err, qt.IsNil)
fmt.Printf("process1: %+v\n", process1)
qt.Assert(t, process1.ID.String(), qt.Equals, archiveProcess1.ProcessInfo.ID.String())
qt.Assert(t, process1.Results().Votes[0][0].MathBigInt().Int64(), qt.Equals, int64(342))
qt.Assert(t, process1.Results().Votes[0][1].MathBigInt().Int64(), qt.Equals, int64(365))
qt.Assert(t, process1.Results().Votes[0][2].MathBigInt().Int64(), qt.Equals, int64(21))
// TODO: qt.Assert(t, process1.Results().Weight.MathBigInt().Int64(), qt.Equals, int64(342+365+21))
}

var testArchiveProcess1 = `
Expand All @@ -40,6 +39,7 @@ var testArchiveProcess1 = `
"startBlock": 236631,
"endBlock": 240060,
"blockCount": 3515,
"voteCount": 728,
"censusRoot": "09397c5b65efc0e95b338b610ef38394312e94418d4a828f8820008378758451",
"censusURI": "ipfs://bafybeifduki6huacmtufg5avyo6g2pxxefwohsks3fp63w3dd733ixzecu",
"metadata": "ipfs://bafybeiel2cvsrg7d4frqxziberrpnjj4yxzs5peyxkgsvamylpxxdn4m7q",
@@ -88,58 +88,3 @@ var testArchiveProcess1 = `
"startDate": "2023-10-20T07:59:49.977338317Z"
}
`

var testArchiveProcess2 = `
{
"chainId": "vocdoni-stage-8",
"process": {
"processId": "c5d2460186f7d8fcfaa76192aa69cceddaec554b1d82b0166dc9020000000031",
"entityId": "d8fcfaa76192aa69cceddaec554b1d82b0166dc9",
"startBlock": 287823,
"endBlock": 287858,
"blockCount": 35,
"censusRoot": "8b840a7ddbadfbc98145bf86cf3e8f53f3670a703b1b92f9d0ee4b065eae970b",
"censusURI": "ipfs://bafybeigff4k3pqxzokvixov6s34yaw6dje474ghnehz6duaxryyf2hd5ke",
"metadata": "ipfs://bafybeicyi4o3hmp27ejkakzqrdk37e2n3julvmwg3txwyyqfub3pwz6ora",
"censusOrigin": 2,
"status": 5,
"namespace": 0,
"envelopeType": {},
"processMode": {
"autoStart": true,
"interruptible": true
},
"voteOptions": {
"maxCount": 1,
"maxValue": 2,
"costExponent": 10000
},
"questionIndex": 0,
"creationTime": "2023-10-26T13:24:25Z",
"haveResults": true,
"finalResults": true,
"sourceBlockHeight": 0,
"sourceNetworkId": "UNKNOWN",
"maxCensusSize": 2
},
"results": {
"processId": "c5d2460186f7d8fcfaa76192aa69cceddaec554b1d82b0166dc9020000000031",
"votes": [
[
"0",
"1000000000000000000000",
"0"
]
],
"weight": null,
"envelopeType": {},
"voteOptions": {
"maxCount": 1,
"maxValue": 2,
"costExponent": 10000
},
"blockHeight": 0
},
"startDate": "2023-10-26T13:24:56.609443588Z"
}
`