Skip to content

Commit

Permalink
archive: some minor fixes
Browse files Browse the repository at this point in the history
Signed-off-by: p4u <[email protected]>
  • Loading branch information
p4u committed Nov 28, 2023
1 parent 244ed50 commit 05b72b8
Show file tree
Hide file tree
Showing 13 changed files with 102 additions and 119 deletions.
6 changes: 5 additions & 1 deletion cmd/node/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -544,7 +544,7 @@ func main() {
conf.Vochain.OffChainDataDownloader = conf.Vochain.OffChainDataDownloader &&
conf.Mode == types.ModeGateway

// create the vochain service
// create the vochain service
if err = srv.Vochain(); err != nil {
log.Fatal(err)
}
Expand All @@ -556,6 +556,10 @@ func main() {
}
// create the indexer service
if conf.Vochain.Indexer.Enabled {
// disable archive if process archive mode is enabled
if conf.Vochain.ProcessArchive {
conf.Vochain.Indexer.ArchiveURL = ""
}
if err := srv.VochainIndexer(); err != nil {
log.Fatal(err)
}
Expand Down
46 changes: 9 additions & 37 deletions data/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"fmt"
"io"

"go.vocdoni.io/dvote/data/ipfs"
"go.vocdoni.io/dvote/types"
)

Expand All @@ -25,40 +24,13 @@ type Storage interface {
Stop() error
}

// StorageID is the type for the different storage providers.
// Currently only IPFS is supported.
type StorageID int

const (
// IPFS is the InterPlanetary File System.
IPFS StorageID = iota + 1
var (
// ErrInvalidPath is returned when the path provided is not valid.
ErrInvalidPath = fmt.Errorf("invalid storage path")
// ErrUnavailable is returned when the storage path is not available.
ErrUnavailable = fmt.Errorf("storage path is unavailable")
// ErrNotFound is returned when the file is not found (cannot be fetch).
ErrNotFound = fmt.Errorf("storage file not found")
// ErrTimeout is returned when the storage context times out.
ErrTimeout = fmt.Errorf("storage context timeout")
)

// StorageIDFromString returns the Storage identifier from a string.
func StorageIDFromString(i string) StorageID {
switch i {
case "IPFS":
return IPFS
default:
return -1
}
}

// IPFSNewConfig returns a new DataStore configuration for IPFS.
func IPFSNewConfig(path string) *types.DataStore {
datastore := new(types.DataStore)
datastore.Datadir = path
return datastore
}

// Init returns a new Storage instance of type `t`.
func Init(t StorageID, d *types.DataStore) (Storage, error) {
switch t {
case IPFS:
s := new(ipfs.Handler)
err := s.Init(d)
return s, err
default:
return nil, fmt.Errorf("bad storage type or DataStore specification")
}
}
8 changes: 4 additions & 4 deletions data/datamocktest.go → data/datamocktest/datamocktest.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
package data
package datamocktest

import (
"context"
"io"
"maps"
"os"
"sync"
"time"

"go.vocdoni.io/dvote/data"
"go.vocdoni.io/dvote/data/ipfs"
"go.vocdoni.io/dvote/test/testcommon/testutil"
"go.vocdoni.io/dvote/types"
Expand Down Expand Up @@ -51,7 +51,7 @@ func (d *DataMockTest) Retrieve(_ context.Context, id string, _ int64) ([]byte,
return []byte(data), nil
}
if d.rnd.RandomIntn(2) == 0 {
return nil, os.ErrDeadlineExceeded
return nil, data.ErrTimeout
}
time.Sleep(200 * time.Millisecond)
return d.rnd.RandomBytes(256), nil
Expand All @@ -77,7 +77,7 @@ func (d *DataMockTest) Unpin(_ context.Context, path string) error {
d.filesMu.Lock()
defer d.filesMu.Unlock()
if _, ok := d.files[path]; !ok {
return os.ErrNotExist
return data.ErrNotFound
}
delete(d.files, path)
return nil
Expand Down
27 changes: 17 additions & 10 deletions data/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"errors"
"fmt"
"maps"
"os"
"strings"
"sync"
"sync/atomic"
Expand All @@ -21,10 +20,10 @@ const (
ImportQueueRoutines = 32
// ImportRetrieveTimeout the maximum duration the import queue will wait
// for retrieving a remote file.
ImportRetrieveTimeout = 5 * time.Minute
ImportRetrieveTimeout = 3 * time.Minute
// ImportPinTimeout is the maximum duration the import queue will wait
// for pinning a remote file.
ImportPinTimeout = 3 * time.Minute
ImportPinTimeout = 2 * time.Minute
// MaxFileSize is the maximum size of a file that can be imported.
MaxFileSize = 100 * 1024 * 1024 // 100MB

Expand Down Expand Up @@ -122,16 +121,16 @@ func (d *Downloader) handleImport(item *DownloadItem) {
d.queueAddDelta(1)
defer d.queueAddDelta(-1)
ctx, cancel := context.WithTimeout(context.Background(), ImportRetrieveTimeout)
data, err := d.RemoteStorage.Retrieve(ctx, item.URI, MaxFileSize)
file, err := d.RemoteStorage.Retrieve(ctx, item.URI, MaxFileSize)
cancel()
if err != nil {
if os.IsTimeout(err) || errors.Is(err, context.DeadlineExceeded) {
if errors.Is(err, data.ErrTimeout) {
log.Warnw("timeout importing file, adding it to failed queue for retry", "uri", item.URI)
d.failedQueueLock.Lock()
d.failedQueue[item.URI] = item
d.failedQueueLock.Unlock()
} else {
log.Warnw("could not retrieve file", "uri", item.URI, "error", fmt.Sprintf("%v", err))
log.Warnw("could not retrieve file", "uri", item.URI, "error", err)
}
return
}
Expand All @@ -145,7 +144,7 @@ func (d *Downloader) handleImport(item *DownloadItem) {
}
}()
if item.Callback != nil {
go item.Callback(item.URI, data)
go item.Callback(item.URI, file)
}

}
Expand Down Expand Up @@ -181,11 +180,19 @@ func (d *Downloader) importFailedQueue() map[string]*DownloadItem {
// handleImportFailedQueue tries to import files that failed.
func (d *Downloader) handleImportFailedQueue() {
for cid, item := range d.importFailedQueue() {
log.Debugf("retrying download %s", cid)
log.Debugw("retrying failed download", "cid", cid)
ctx, cancel := context.WithTimeout(context.Background(), ImportRetrieveTimeout)
data, err := d.RemoteStorage.Retrieve(ctx, strings.TrimPrefix(item.URI, d.RemoteStorage.URIprefix()), 0)
file, err := d.RemoteStorage.Retrieve(ctx, strings.TrimPrefix(item.URI, d.RemoteStorage.URIprefix()), MaxFileSize)
cancel()
if err != nil {
if !errors.Is(err, data.ErrTimeout) {
// if the error is not a timeout, we remove the item from the failed queue
d.failedQueueLock.Lock()
delete(d.failedQueue, cid)
d.failedQueueLock.Unlock()
log.Debugw("removed download from failed queue, we won't try anymore", "cid", cid)
}
// if the error is a timeout, we just continue to retry
continue
}
d.failedQueueLock.Lock()
Expand All @@ -199,7 +206,7 @@ func (d *Downloader) handleImportFailedQueue() {
log.Warnf("could not pin file %q: %v", uri, err)
}
if item.Callback != nil {
go item.Callback(uri, data)
go item.Callback(uri, file)
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions data/downloader/downloader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ import (
"testing"

qt "github.com/frankban/quicktest"
"go.vocdoni.io/dvote/data"
"go.vocdoni.io/dvote/data/datamocktest"
)

func TestDownloader(t *testing.T) {
stg := data.DataMockTest{}
stg := datamocktest.DataMockTest{}
stg.Init(nil)
d := NewDownloader(&stg)
d.Start()
Expand Down
31 changes: 21 additions & 10 deletions data/ipfs/ipfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package ipfs
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"os"
Expand Down Expand Up @@ -30,6 +31,7 @@ import (
manet "github.com/multiformats/go-multiaddr/net"
"github.com/multiformats/go-multicodec"
"github.com/multiformats/go-multihash"
"go.vocdoni.io/dvote/data"
"go.vocdoni.io/dvote/log"
"go.vocdoni.io/dvote/types"
)
Expand Down Expand Up @@ -87,7 +89,7 @@ func (i *Handler) Init(d *types.DataStore) error {
log.Errorw(err, "error running ipfs garbage collector")
}
}()
log.Infow("IPFS initialization",
log.Debugw("IPFS initialization",
"peerID", node.Identity.String(),
"addresses", node.PeerHost.Addrs(),
"pubKey", node.PrivateKey.GetPublic(),
Expand Down Expand Up @@ -189,7 +191,7 @@ func (i *Handler) Pin(ctx context.Context, path string) error {
path = strings.Replace(path, "ipfs://", "/ipfs/", 1)
p, err := ipfspath.NewPath(path)
if err != nil {
return err
return data.ErrInvalidPath
}
return i.CoreAPI.Pin().Add(ctx, p)
}
Expand All @@ -216,7 +218,7 @@ func (i *Handler) Unpin(ctx context.Context, path string) error {
path = strings.Replace(path, "ipfs://", "/ipfs/", 1)
cpath, err := ipfspath.NewPath(path)
if err != nil {
return err
return data.ErrInvalidPath
}
log.Debugf("removing pin %s", cpath)
return i.CoreAPI.Pin().Rm(ctx, cpath, options.Pin.RmRecursive(true))
Expand Down Expand Up @@ -272,16 +274,20 @@ func (i *Handler) RetrieveDir(ctx context.Context, path string, maxSize int64) (
// first resolve the path
p, err := ipfspath.NewPath(path)
if err != nil {
return nil, err
return nil, data.ErrInvalidPath
}
cpath, _, err := i.CoreAPI.ResolvePath(ctx, p)
if err != nil {
return nil, fmt.Errorf("could not resolve path %s", path)
return nil, data.ErrUnavailable
}
// then get the file
f, err := i.CoreAPI.Unixfs().Get(ctx, cpath)
if err != nil {
return nil, fmt.Errorf("could not retrieve unixfs file: %w", err)
// if error is timeout or context deadline exceeded, return timeout error
if os.IsTimeout(err) || errors.Is(err, context.DeadlineExceeded) {
return nil, data.ErrTimeout
}
return nil, data.ErrNotFound
}

dirMap := make(map[string][]byte)
Expand Down Expand Up @@ -311,23 +317,28 @@ func (i *Handler) Retrieve(ctx context.Context, path string, maxSize int64) ([]b
// check if we have the file in the local cache
ccontent, _ := i.retrieveCache.Get(path)
if ccontent != nil {
log.Debugf("retrieved file %s from cache", path)
return ccontent, nil
}

// first resolve the path
p, err := ipfspath.NewPath(path)
if err != nil {
return nil, err
log.Warnw("invalid path", "path", path, "error", err)
return nil, data.ErrInvalidPath
}
cpath, _, err := i.CoreAPI.ResolvePath(ctx, p)
if err != nil {
return nil, fmt.Errorf("could not resolve path %s", path)
return nil, data.ErrUnavailable
}
// then get the file
f, err := i.CoreAPI.Unixfs().Get(ctx, cpath)
if err != nil {
return nil, fmt.Errorf("could not retrieve unixfs file: %w", err)
// if error is timeout or context deadline exceeded, return timeout error
if os.IsTimeout(err) || errors.Is(err, context.DeadlineExceeded) {
return nil, data.ErrTimeout
}
log.Warnw("could not retrieve file", "path", path, "error", err)
return nil, data.ErrNotFound
}

content, err := fetchFileContent(f)
Expand Down
19 changes: 10 additions & 9 deletions service/ipfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,19 @@ import (
"go.vocdoni.io/dvote/data/ipfs"
"go.vocdoni.io/dvote/data/ipfs/ipfsconnect"
"go.vocdoni.io/dvote/log"
"go.vocdoni.io/dvote/types"
)

// IPFS starts the IPFS service
func (vs *VocdoniService) IPFS(ipfsconfig *config.IPFSCfg) (storage data.Storage, err error) {
func (vs *VocdoniService) IPFS(ipfsconfig *config.IPFSCfg) (data.Storage, error) {
log.Info("creating ipfs service")
os.Setenv("IPFS_FD_MAX", "1024")
ipfsStore := data.IPFSNewConfig(ipfsconfig.ConfigPath)
storage, err = data.Init(data.StorageIDFromString("IPFS"), ipfsStore)
if err != nil {
return
if err := os.Setenv("IPFS_FD_MAX", "1024"); err != nil {
log.Warnw("could not set IPFS_FD_MAX", "err", err)
}
storage := new(ipfs.Handler)
if err := storage.Init(&types.DataStore{Datadir: ipfsconfig.ConfigPath}); err != nil {
return nil, err
}

go func() {
for {
time.Sleep(time.Second * 120)
Expand All @@ -32,13 +33,13 @@ func (vs *VocdoniService) IPFS(ipfsconfig *config.IPFSCfg) (storage data.Storage
log.Infow("starting ipfsconnect service", "key", ipfsconfig.ConnectKey)
ipfsconn := ipfsconnect.New(
ipfsconfig.ConnectKey,
storage.(*ipfs.Handler),
storage,
)
if len(ipfsconfig.ConnectPeers) > 0 && len(ipfsconfig.ConnectPeers[0]) > 8 {
log.Debugf("using custom ipfsconnect bootnodes %s", ipfsconfig.ConnectPeers)
ipfsconn.Transport.BootNodes = ipfsconfig.ConnectPeers
}
ipfsconn.Start()
}
return
return storage, nil
}
4 changes: 3 additions & 1 deletion test/testcommon/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"net/url"
"testing"

"go.vocdoni.io/dvote/data/datamocktest"

qt "github.com/frankban/quicktest"
"go.vocdoni.io/dvote/api"
"go.vocdoni.io/dvote/api/censusdb"
Expand Down Expand Up @@ -44,7 +46,7 @@ func (d *APIserver) Start(t testing.TB, apis ...string) {
}

// create the IPFS storage
d.Storage = &data.DataMockTest{}
d.Storage = &datamocktest.DataMockTest{}
d.Storage.Init(&types.DataStore{Datadir: t.TempDir()})

// create the API router
Expand Down
7 changes: 3 additions & 4 deletions vochain/cometbft.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,8 +260,7 @@ func (app *BaseApplication) FinalizeBlock(_ context.Context,
if len(req.Txs) > 0 {
log.Debugw("finalize block", "height", height,
"txs", len(req.Txs), "hash", hex.EncodeToString(root),
"blockSeconds", time.Since(req.GetTime()).Seconds(),
"elapsedSeconds", time.Since(start).Seconds(),
"milliSeconds", time.Since(start).Milliseconds(),
"proposer", hex.EncodeToString(req.GetProposerAddress()))
}

Expand Down Expand Up @@ -425,7 +424,7 @@ func (app *BaseApplication) PrepareProposal(ctx context.Context,
// Rollback the state to discard the changes made
app.State.Rollback()
log.Debugw("prepare proposal", "height", app.Height(), "txs", len(validTxs),
"elapsedSeconds", time.Since(startTime).Seconds())
"milliSeconds", time.Since(startTime).Milliseconds())
return &abcitypes.ResponsePrepareProposal{
Txs: validTxs,
}, nil
Expand Down Expand Up @@ -477,7 +476,7 @@ func (app *BaseApplication) ProcessProposal(_ context.Context,
app.lastBlockHash = req.GetHash()
log.Debugw("process proposal", "height", app.Height(), "txs", len(req.Txs), "action", "accept",
"blockHash", hex.EncodeToString(app.lastBlockHash),
"hash", hex.EncodeToString(resp.Root), "elapsedSeconds", time.Since(startTime).Seconds())
"hash", hex.EncodeToString(resp.Root), "milliSeconds", time.Since(startTime).Milliseconds())
return &abcitypes.ResponseProcessProposal{
Status: abcitypes.ResponseProcessProposal_ACCEPT,
}, nil
Expand Down
Loading

0 comments on commit 05b72b8

Please sign in to comment.