add support for process archive retrieval using IPNS
Signed-off-by: p4u <[email protected]>
p4u committed Oct 30, 2023
1 parent fcd11e5 commit 7b0d3fb
Showing 10 changed files with 138 additions and 24 deletions.
6 changes: 6 additions & 0 deletions cmd/node/main.go
@@ -117,6 +117,7 @@ func newConfig() (*config.Config, config.Error) {
"enable IPFS group synchronization using the given secret key")
conf.Ipfs.ConnectPeers = *flag.StringSlice("ipfsConnectPeers", []string{},
"use custom ipfsconnect peers/bootnodes for accessing the DHT (comma-separated)")
conf.Vochain.Indexer.ArchiveURL = *flag.String("archiveURL", types.ArchiveURL, "enable archive retrieval from the given IPNS URL")

// vochain
conf.Vochain.P2PListen = *flag.String("vochainP2PListen", "0.0.0.0:26656",
@@ -199,6 +200,11 @@ func newConfig() (*config.Config, config.Error) {
// use different datadirs for different chains
conf.DataDir = filepath.Join(conf.DataDir, conf.Vochain.Chain)

if err = viper.BindPFlag("archiveURL", flag.Lookup("archiveURL")); err != nil {
log.Fatalf("failed to bind archiveURL flag to viper: %v", err)
}
conf.Vochain.Indexer.ArchiveURL = viper.GetString("archiveURL")

// add viper config path (now we know it)
viper.AddConfigPath(conf.DataDir)

2 changes: 2 additions & 0 deletions config/config.go
@@ -144,6 +144,8 @@ type IndexerCfg struct {
Enabled bool
// Disables live results computation on indexer
IgnoreLiveResults bool
// ArchiveURL is the URL where the archive is retrieved from (usually IPNS)
ArchiveURL string
}

// MetricsCfg initializes the metrics config
1 change: 1 addition & 0 deletions data/data.go
@@ -14,6 +14,7 @@ type Storage interface {
Init(d *types.DataStore) error
Publish(ctx context.Context, data []byte) (string, error)
Retrieve(ctx context.Context, id string, maxSize int64) ([]byte, error)
RetrieveDir(ctx context.Context, id string, maxSize int64) (map[string][]byte, error)
Pin(ctx context.Context, path string) error
Unpin(ctx context.Context, path string) error
ListPins(ctx context.Context) (map[string]string, error)
5 changes: 5 additions & 0 deletions data/datamocktest.go
@@ -48,6 +48,11 @@ func (d *DataMockTest) Retrieve(_ context.Context, id string, _ int64) ([]byte,
return d.rnd.RandomBytes(256), nil
}

func (d *DataMockTest) RetrieveDir(_ context.Context, id string, _ int64) (map[string][]byte, error) {
	// Mock a single-file directory keyed by the requested id, mirroring Retrieve.
	return map[string][]byte{id: d.rnd.RandomBytes(256)}, nil
}

func (d *DataMockTest) Pin(_ context.Context, path string) error {
d.filesMu.Lock()
defer d.filesMu.Unlock()
77 changes: 56 additions & 21 deletions data/ipfs/ipfs.go
@@ -251,6 +251,41 @@ func (i *Handler) ListPins(ctx context.Context) (map[string]string, error) {
return pinMap, nil
}

// RetrieveDir gets an IPFS directory and returns a map of all files and their content.
// It only supports 1 level of directory depth, so subdirectories are ignored.
func (i *Handler) RetrieveDir(ctx context.Context, path string, maxSize int64) (map[string][]byte, error) {
path = strings.Replace(path, "ipfs://", "/ipfs/", 1)

// first resolve the path
cpath, err := i.CoreAPI.ResolvePath(ctx, corepath.New(path))
if err != nil {
return nil, fmt.Errorf("could not resolve path %s", path)
}
// then get the file
f, err := i.CoreAPI.Unixfs().Get(ctx, cpath)
if err != nil {
return nil, fmt.Errorf("could not retrieve unixfs file: %w", err)
}

dirMap := make(map[string][]byte)
if dir := files.ToDir(f); dir != nil {
if err := files.Walk(dir, func(path string, node files.Node) error {
if file := files.ToFile(node); file != nil {
content, err := fetchFileContent(file, maxSize)
if err != nil {
log.Warnw("could not retrieve file from directory", "path", path, "error", err)
return nil
}
dirMap[path] = content
}
return nil
}); err != nil {
return nil, err
}
}
return dirMap, nil
}
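
A minimal usage sketch of the new method (illustrative only, not part of this commit; it assumes an initialized *ipfs.Handler named handler and uses a placeholder IPNS path):

	// Fetch every top-level file of an IPNS directory, capping each file at 100 KiB.
	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
	defer cancel()
	dir, err := handler.RetrieveDir(ctx, "/ipns/<archive-key>", 100*1024)
	if err != nil {
		return err
	}
	for name, content := range dir {
		fmt.Printf("%s: %d bytes\n", name, len(content))
	}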

// Retrieve gets an IPFS file (either from the p2p network or from the local cache).
// If maxSize is 0, it is set to the hardcoded maximum of MaxFileSizeBytes.
func (i *Handler) Retrieve(ctx context.Context, path string, maxSize int64) ([]byte, error) {
@@ -268,32 +303,13 @@ func (i *Handler) Retrieve(ctx context.Context, path string, maxSize int64) ([]byte, error) {
if err != nil {
return nil, fmt.Errorf("could not resolve path %s", path)
}

// then get the file
f, err := i.CoreAPI.Unixfs().Get(ctx, cpath)
if err != nil {
return nil, fmt.Errorf("could not retrieve unixfs file: %w", err)
}
file := files.ToFile(f)
if file == nil {
return nil, fmt.Errorf("object is not a file")
}
defer file.Close()

fsize, err := file.Size()
if err != nil {
return nil, err
}

if maxSize == 0 {
maxSize = MaxFileSizeBytes
}

if fsize > maxSize {
return nil, fmt.Errorf("file too big: %d", fsize)
}

content, err := io.ReadAll(file)
content, err := fetchFileContent(f, maxSize)
if err != nil {
return nil, err
}
@@ -313,10 +329,29 @@ func (i *Handler) Retrieve(ctx context.Context, path string, maxSize int64) ([]byte, error) {
// Save file to cache for future attempts
i.retrieveCache.Add(path, content)

log.Infow("retrieved file", "path", path, "size", fsize)
log.Infow("retrieved file", "path", path, "size", len(content))
return content, nil
}

func fetchFileContent(node files.Node, maxSize int64) ([]byte, error) {
file := files.ToFile(node)
if file == nil {
return nil, fmt.Errorf("object is not a file")
}
defer file.Close()

fsize, err := file.Size()
if err != nil {
return nil, err
}

if maxSize == 0 {
	maxSize = MaxFileSizeBytes
}
if fsize > maxSize {
	return nil, fmt.Errorf("file too big: %d", fsize)
}

return io.ReadAll(file)
}

// PublishIPNSpath creates or updates an IPNS record with the content of a
// filesystem path (a single file or a directory).
//
1 change: 1 addition & 0 deletions dockerfiles/vocdoninode/env.example
@@ -12,6 +12,7 @@ VOCDONI_ENABLEAPI=True
VOCDONI_ENABLERPC=False
VOCDONI_LISTENHOST=0.0.0.0
VOCDONI_LISTENPORT=9090
VOCDONI_ARCHIVEURL=/ipns/k2k4r8mdn544n7f8nprwqeo27jr1v1unsu74th57s1j8mumjck7y7cbz
#VOCDONI_TLS_DIRCERT=
#VOCDONI_TLS_DOMAIN=
#VOCDONI_ADMINTOKEN=
6 changes: 6 additions & 0 deletions service/indexer.go
@@ -21,5 +21,11 @@ func (vs *VocdoniService) VochainIndexer() error {
}
// launch the indexer after sync routine (executed when the blockchain is ready)
go vs.Indexer.AfterSyncBootstrap(false)

if vs.Config.Indexer.ArchiveURL != "" {
log.Infow("starting archive retrieval", "path", vs.Config.Indexer.ArchiveURL)
go vs.Indexer.StartArchiveRetrieval(vs.Storage, vs.Config.Indexer.ArchiveURL)
}

return nil
}
3 changes: 3 additions & 0 deletions types/consts.go
@@ -110,6 +110,9 @@ const (
// on a specific block
IndexerProcessEndingPrefix = byte(0x25)

// ArchiveURL is the default URL where the archive is retrieved from
ArchiveURL = "/ipns/k2k4r8mdn544n7f8nprwqeo27jr1v1unsu74th57s1j8mumjck7y7cbz"

// Vochain

// PetitionSign contains the string that needs to match with the received vote type
58 changes: 57 additions & 1 deletion vochain/indexer/archive.go
@@ -2,16 +2,26 @@ package indexer

import (
"context"
"encoding/json"
"fmt"
"time"

"go.vocdoni.io/dvote/data"
"go.vocdoni.io/dvote/log"
"go.vocdoni.io/dvote/types"
indexerdb "go.vocdoni.io/dvote/vochain/indexer/db"
"go.vocdoni.io/dvote/vochain/indexer/indexertypes"
"go.vocdoni.io/dvote/vochain/results"
"go.vocdoni.io/proto/build/go/models"
)

const (
maxArchiveFileSize = 1024 * 100 // 100KB
timeoutArchiveRetrieval = 60 * time.Second
archiveFetchInterval = 20 * time.Minute
archiveFileNameSize = types.ProcessIDsize * 2 // 64 hex chars
)

// ArchiveProcess is the struct used to store the process data in the archive.
type ArchiveProcess struct {
ChainID string `json:"chainId,omitempty"`
@@ -30,11 +40,17 @@ func (idx *Indexer) ImportArchive(archive []*ArchiveProcess) error {
queries := idx.blockQueries.WithTx(tx)
height := idx.App.State.CurrentHeight()

addCount := 0
for _, p := range archive {
if idx.App.ChainID() == p.ChainID {
log.Warnw("skipping import of archive process from current chain", "chainID", p.ChainID, "processID", p.ProcessInfo.ID.String())
log.Debugw("skipping import of archive process from current chain", "chainID", p.ChainID, "processID", p.ProcessInfo.ID.String())
continue
}
if p.ProcessInfo == nil {
log.Debugw("skipping import of archive process with nil process info")
continue
}

// Check if election already exists
if _, err := idx.ProcessInfo(p.ProcessInfo.ID); err != nil {
if err != ErrProcessNotFound {
@@ -77,6 +93,46 @@ func (idx *Indexer) ImportArchive(archive []*ArchiveProcess) error {
if _, err := queries.CreateProcess(context.TODO(), procParams); err != nil {
return fmt.Errorf("create archive process: %w", err)
}
addCount++
}
if addCount > 0 {
log.Infow("archive new elections imported", "elections", addCount)
}
return tx.Commit()
}

// StartArchiveRetrieval starts the archive retrieval process. It is a blocking function that runs continuously.
// It retrieves the archive directory from the storage and imports the processes into the indexer database.
func (idx *Indexer) StartArchiveRetrieval(storage data.Storage, archiveURL string) {
for {
ctx, cancel := context.WithTimeout(context.Background(), timeoutArchiveRetrieval)
dirMap, err := storage.RetrieveDir(ctx, archiveURL, maxArchiveFileSize)
cancel()
if err != nil {
log.Warnw("cannot retrieve archive directory", "url", archiveURL, "err", err)
time.Sleep(archiveFetchInterval) // back off before retrying to avoid a tight loop
continue
}
archive := []*ArchiveProcess{}
for name, data := range dirMap {
if len(data) == 0 {
continue
}
if len(name) != archiveFileNameSize {
continue
}
var p ArchiveProcess
if err := json.Unmarshal(data, &p); err != nil {
log.Warnw("cannot unmarshal archive process", "name", name, "err", err)
continue
}
archive = append(archive, &p)
}

log.Debugw("archive processes unmarshaled", "processes", len(archive))
if err := idx.ImportArchive(archive); err != nil {
log.Warnw("cannot import archive", "err", err)
}

time.Sleep(archiveFetchInterval)
}
}
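
For reference, the loop above only accepts directory entries whose name length equals archiveFileNameSize, i.e. the hex-encoded process ID. A hypothetical helper (not part of this commit) showing the expected file naming:

	// archiveFileName returns the directory entry name expected for a process:
	// types.ProcessIDsize raw bytes encoded as 64 hex characters.
	func archiveFileName(processID []byte) string {
		return hex.EncodeToString(processID)
	}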
3 changes: 1 addition & 2 deletions vochain/processarchive/processarchive.go
@@ -163,8 +163,7 @@ func BuildIndex(datadir string) (*Index, error) {
// The key parameter must be either a valid IPFS base64 encoded private key
// or empty (a new key will be generated).
// If ipfs is nil, only JSON archive storage will be performed.
func NewProcessArchive(s *indexer.Indexer, ipfs *ipfs.Handler,
datadir, key string) (*ProcessArchive, error) {
func NewProcessArchive(s *indexer.Indexer, ipfs *ipfs.Handler, datadir, key string) (*ProcessArchive, error) {
js, err := NewJsonStorage(datadir)
if err != nil {
return nil, fmt.Errorf("could not create process archive: %w", err)
