From 05b72b861d0196c33968fa882c4b5cadc7d0bd3f Mon Sep 17 00:00:00 2001 From: p4u Date: Tue, 28 Nov 2023 10:57:58 +0100 Subject: [PATCH] archive: some minor fixes Signed-off-by: p4u --- cmd/node/main.go | 6 +++- data/data.go | 46 +++++------------------- data/{ => datamocktest}/datamocktest.go | 8 ++--- data/downloader/downloader.go | 27 ++++++++------ data/downloader/downloader_test.go | 4 +-- data/ipfs/ipfs.go | 31 ++++++++++------ service/ipfs.go | 19 +++++----- test/testcommon/api.go | 4 ++- vochain/cometbft.go | 7 ++-- vochain/indexer/archive.go | 6 ++-- vochain/indexer/process.go | 18 ++++------ vochain/indexer/vote.go | 10 ++++-- vochain/processarchive/processarchive.go | 35 ++++++------------ 13 files changed, 102 insertions(+), 119 deletions(-) rename data/{ => datamocktest}/datamocktest.go (95%) diff --git a/cmd/node/main.go b/cmd/node/main.go index 9f4644948..dbfb3ae76 100644 --- a/cmd/node/main.go +++ b/cmd/node/main.go @@ -544,7 +544,7 @@ func main() { conf.Vochain.OffChainDataDownloader = conf.Vochain.OffChainDataDownloader && conf.Mode == types.ModeGateway - // create the vochain service + // create the vochain service if err = srv.Vochain(); err != nil { log.Fatal(err) } @@ -556,6 +556,10 @@ func main() { } // create the indexer service if conf.Vochain.Indexer.Enabled { + // disable archive if process archive mode is enabled + if conf.Vochain.ProcessArchive { + conf.Vochain.Indexer.ArchiveURL = "" + } if err := srv.VochainIndexer(); err != nil { log.Fatal(err) } diff --git a/data/data.go b/data/data.go index 582f8942e..7588fbee1 100644 --- a/data/data.go +++ b/data/data.go @@ -6,7 +6,6 @@ import ( "fmt" "io" - "go.vocdoni.io/dvote/data/ipfs" "go.vocdoni.io/dvote/types" ) @@ -25,40 +24,13 @@ type Storage interface { Stop() error } -// StorageID is the type for the different storage providers. -// Currently only IPFS is supported. -type StorageID int - -const ( - // IPFS is the InterPlanetary File System. - IPFS StorageID = iota + 1 +var ( + // ErrInvalidPath is returned when the path provided is not valid. + ErrInvalidPath = fmt.Errorf("invalid storage path") + // ErrUnavailable is returned when the storage path is not available. + ErrUnavailable = fmt.Errorf("storage path is unavailable") + // ErrNotFound is returned when the file is not found (cannot be fetch). + ErrNotFound = fmt.Errorf("storage file not found") + // ErrTimeout is returned when the storage context times out. + ErrTimeout = fmt.Errorf("storage context timeout") ) - -// StorageIDFromString returns the Storage identifier from a string. -func StorageIDFromString(i string) StorageID { - switch i { - case "IPFS": - return IPFS - default: - return -1 - } -} - -// IPFSNewConfig returns a new DataStore configuration for IPFS. -func IPFSNewConfig(path string) *types.DataStore { - datastore := new(types.DataStore) - datastore.Datadir = path - return datastore -} - -// Init returns a new Storage instance of type `t`. -func Init(t StorageID, d *types.DataStore) (Storage, error) { - switch t { - case IPFS: - s := new(ipfs.Handler) - err := s.Init(d) - return s, err - default: - return nil, fmt.Errorf("bad storage type or DataStore specification") - } -} diff --git a/data/datamocktest.go b/data/datamocktest/datamocktest.go similarity index 95% rename from data/datamocktest.go rename to data/datamocktest/datamocktest.go index 2ed3f0e1a..2511fe661 100644 --- a/data/datamocktest.go +++ b/data/datamocktest/datamocktest.go @@ -1,13 +1,13 @@ -package data +package datamocktest import ( "context" "io" "maps" - "os" "sync" "time" + "go.vocdoni.io/dvote/data" "go.vocdoni.io/dvote/data/ipfs" "go.vocdoni.io/dvote/test/testcommon/testutil" "go.vocdoni.io/dvote/types" @@ -51,7 +51,7 @@ func (d *DataMockTest) Retrieve(_ context.Context, id string, _ int64) ([]byte, return []byte(data), nil } if d.rnd.RandomIntn(2) == 0 { - return nil, os.ErrDeadlineExceeded + return nil, data.ErrTimeout } time.Sleep(200 * time.Millisecond) return d.rnd.RandomBytes(256), nil @@ -77,7 +77,7 @@ func (d *DataMockTest) Unpin(_ context.Context, path string) error { d.filesMu.Lock() defer d.filesMu.Unlock() if _, ok := d.files[path]; !ok { - return os.ErrNotExist + return data.ErrNotFound } delete(d.files, path) return nil diff --git a/data/downloader/downloader.go b/data/downloader/downloader.go index 8b817fe4a..5a1754e8e 100644 --- a/data/downloader/downloader.go +++ b/data/downloader/downloader.go @@ -5,7 +5,6 @@ import ( "errors" "fmt" "maps" - "os" "strings" "sync" "sync/atomic" @@ -21,10 +20,10 @@ const ( ImportQueueRoutines = 32 // ImportRetrieveTimeout the maximum duration the import queue will wait // for retrieving a remote file. - ImportRetrieveTimeout = 5 * time.Minute + ImportRetrieveTimeout = 3 * time.Minute // ImportPinTimeout is the maximum duration the import queue will wait // for pinning a remote file. - ImportPinTimeout = 3 * time.Minute + ImportPinTimeout = 2 * time.Minute // MaxFileSize is the maximum size of a file that can be imported. MaxFileSize = 100 * 1024 * 1024 // 100MB @@ -122,16 +121,16 @@ func (d *Downloader) handleImport(item *DownloadItem) { d.queueAddDelta(1) defer d.queueAddDelta(-1) ctx, cancel := context.WithTimeout(context.Background(), ImportRetrieveTimeout) - data, err := d.RemoteStorage.Retrieve(ctx, item.URI, MaxFileSize) + file, err := d.RemoteStorage.Retrieve(ctx, item.URI, MaxFileSize) cancel() if err != nil { - if os.IsTimeout(err) || errors.Is(err, context.DeadlineExceeded) { + if errors.Is(err, data.ErrTimeout) { log.Warnw("timeout importing file, adding it to failed queue for retry", "uri", item.URI) d.failedQueueLock.Lock() d.failedQueue[item.URI] = item d.failedQueueLock.Unlock() } else { - log.Warnw("could not retrieve file", "uri", item.URI, "error", fmt.Sprintf("%v", err)) + log.Warnw("could not retrieve file", "uri", item.URI, "error", err) } return } @@ -145,7 +144,7 @@ func (d *Downloader) handleImport(item *DownloadItem) { } }() if item.Callback != nil { - go item.Callback(item.URI, data) + go item.Callback(item.URI, file) } } @@ -181,11 +180,19 @@ func (d *Downloader) importFailedQueue() map[string]*DownloadItem { // handleImportFailedQueue tries to import files that failed. func (d *Downloader) handleImportFailedQueue() { for cid, item := range d.importFailedQueue() { - log.Debugf("retrying download %s", cid) + log.Debugw("retrying failed download", "cid", cid) ctx, cancel := context.WithTimeout(context.Background(), ImportRetrieveTimeout) - data, err := d.RemoteStorage.Retrieve(ctx, strings.TrimPrefix(item.URI, d.RemoteStorage.URIprefix()), 0) + file, err := d.RemoteStorage.Retrieve(ctx, strings.TrimPrefix(item.URI, d.RemoteStorage.URIprefix()), MaxFileSize) cancel() if err != nil { + if !errors.Is(err, data.ErrTimeout) { + // if the error is not a timeout, we remove the item from the failed queue + d.failedQueueLock.Lock() + delete(d.failedQueue, cid) + d.failedQueueLock.Unlock() + log.Debugw("removed download from failed queue, we won't try anymore", "cid", cid) + } + // if the error is a timeout, we just continue to retry continue } d.failedQueueLock.Lock() @@ -199,7 +206,7 @@ func (d *Downloader) handleImportFailedQueue() { log.Warnf("could not pin file %q: %v", uri, err) } if item.Callback != nil { - go item.Callback(uri, data) + go item.Callback(uri, file) } } } diff --git a/data/downloader/downloader_test.go b/data/downloader/downloader_test.go index bc7aac0e8..6aa96974a 100644 --- a/data/downloader/downloader_test.go +++ b/data/downloader/downloader_test.go @@ -4,11 +4,11 @@ import ( "testing" qt "github.com/frankban/quicktest" - "go.vocdoni.io/dvote/data" + "go.vocdoni.io/dvote/data/datamocktest" ) func TestDownloader(t *testing.T) { - stg := data.DataMockTest{} + stg := datamocktest.DataMockTest{} stg.Init(nil) d := NewDownloader(&stg) d.Start() diff --git a/data/ipfs/ipfs.go b/data/ipfs/ipfs.go index 42a7a231d..49dff43b2 100644 --- a/data/ipfs/ipfs.go +++ b/data/ipfs/ipfs.go @@ -3,6 +3,7 @@ package ipfs import ( "bytes" "context" + "errors" "fmt" "io" "os" @@ -30,6 +31,7 @@ import ( manet "github.com/multiformats/go-multiaddr/net" "github.com/multiformats/go-multicodec" "github.com/multiformats/go-multihash" + "go.vocdoni.io/dvote/data" "go.vocdoni.io/dvote/log" "go.vocdoni.io/dvote/types" ) @@ -87,7 +89,7 @@ func (i *Handler) Init(d *types.DataStore) error { log.Errorw(err, "error running ipfs garbage collector") } }() - log.Infow("IPFS initialization", + log.Debugw("IPFS initialization", "peerID", node.Identity.String(), "addresses", node.PeerHost.Addrs(), "pubKey", node.PrivateKey.GetPublic(), @@ -189,7 +191,7 @@ func (i *Handler) Pin(ctx context.Context, path string) error { path = strings.Replace(path, "ipfs://", "/ipfs/", 1) p, err := ipfspath.NewPath(path) if err != nil { - return err + return data.ErrInvalidPath } return i.CoreAPI.Pin().Add(ctx, p) } @@ -216,7 +218,7 @@ func (i *Handler) Unpin(ctx context.Context, path string) error { path = strings.Replace(path, "ipfs://", "/ipfs/", 1) cpath, err := ipfspath.NewPath(path) if err != nil { - return err + return data.ErrInvalidPath } log.Debugf("removing pin %s", cpath) return i.CoreAPI.Pin().Rm(ctx, cpath, options.Pin.RmRecursive(true)) @@ -272,16 +274,20 @@ func (i *Handler) RetrieveDir(ctx context.Context, path string, maxSize int64) ( // first resolve the path p, err := ipfspath.NewPath(path) if err != nil { - return nil, err + return nil, data.ErrInvalidPath } cpath, _, err := i.CoreAPI.ResolvePath(ctx, p) if err != nil { - return nil, fmt.Errorf("could not resolve path %s", path) + return nil, data.ErrUnavailable } // then get the file f, err := i.CoreAPI.Unixfs().Get(ctx, cpath) if err != nil { - return nil, fmt.Errorf("could not retrieve unixfs file: %w", err) + // if error is timeout or context deadline exceeded, return timeout error + if os.IsTimeout(err) || errors.Is(err, context.DeadlineExceeded) { + return nil, data.ErrTimeout + } + return nil, data.ErrNotFound } dirMap := make(map[string][]byte) @@ -311,23 +317,28 @@ func (i *Handler) Retrieve(ctx context.Context, path string, maxSize int64) ([]b // check if we have the file in the local cache ccontent, _ := i.retrieveCache.Get(path) if ccontent != nil { - log.Debugf("retrieved file %s from cache", path) return ccontent, nil } // first resolve the path p, err := ipfspath.NewPath(path) if err != nil { - return nil, err + log.Warnw("invalid path", "path", path, "error", err) + return nil, data.ErrInvalidPath } cpath, _, err := i.CoreAPI.ResolvePath(ctx, p) if err != nil { - return nil, fmt.Errorf("could not resolve path %s", path) + return nil, data.ErrUnavailable } // then get the file f, err := i.CoreAPI.Unixfs().Get(ctx, cpath) if err != nil { - return nil, fmt.Errorf("could not retrieve unixfs file: %w", err) + // if error is timeout or context deadline exceeded, return timeout error + if os.IsTimeout(err) || errors.Is(err, context.DeadlineExceeded) { + return nil, data.ErrTimeout + } + log.Warnw("could not retrieve file", "path", path, "error", err) + return nil, data.ErrNotFound } content, err := fetchFileContent(f) diff --git a/service/ipfs.go b/service/ipfs.go index b80459df1..d05052cd4 100644 --- a/service/ipfs.go +++ b/service/ipfs.go @@ -9,18 +9,19 @@ import ( "go.vocdoni.io/dvote/data/ipfs" "go.vocdoni.io/dvote/data/ipfs/ipfsconnect" "go.vocdoni.io/dvote/log" + "go.vocdoni.io/dvote/types" ) // IPFS starts the IPFS service -func (vs *VocdoniService) IPFS(ipfsconfig *config.IPFSCfg) (storage data.Storage, err error) { +func (vs *VocdoniService) IPFS(ipfsconfig *config.IPFSCfg) (data.Storage, error) { log.Info("creating ipfs service") - os.Setenv("IPFS_FD_MAX", "1024") - ipfsStore := data.IPFSNewConfig(ipfsconfig.ConfigPath) - storage, err = data.Init(data.StorageIDFromString("IPFS"), ipfsStore) - if err != nil { - return + if err := os.Setenv("IPFS_FD_MAX", "1024"); err != nil { + log.Warnw("could not set IPFS_FD_MAX", "err", err) + } + storage := new(ipfs.Handler) + if err := storage.Init(&types.DataStore{Datadir: ipfsconfig.ConfigPath}); err != nil { + return nil, err } - go func() { for { time.Sleep(time.Second * 120) @@ -32,7 +33,7 @@ func (vs *VocdoniService) IPFS(ipfsconfig *config.IPFSCfg) (storage data.Storage log.Infow("starting ipfsconnect service", "key", ipfsconfig.ConnectKey) ipfsconn := ipfsconnect.New( ipfsconfig.ConnectKey, - storage.(*ipfs.Handler), + storage, ) if len(ipfsconfig.ConnectPeers) > 0 && len(ipfsconfig.ConnectPeers[0]) > 8 { log.Debugf("using custom ipfsconnect bootnodes %s", ipfsconfig.ConnectPeers) @@ -40,5 +41,5 @@ func (vs *VocdoniService) IPFS(ipfsconfig *config.IPFSCfg) (storage data.Storage } ipfsconn.Start() } - return + return storage, nil } diff --git a/test/testcommon/api.go b/test/testcommon/api.go index 55720231a..47ef9ff98 100644 --- a/test/testcommon/api.go +++ b/test/testcommon/api.go @@ -4,6 +4,8 @@ import ( "net/url" "testing" + "go.vocdoni.io/dvote/data/datamocktest" + qt "github.com/frankban/quicktest" "go.vocdoni.io/dvote/api" "go.vocdoni.io/dvote/api/censusdb" @@ -44,7 +46,7 @@ func (d *APIserver) Start(t testing.TB, apis ...string) { } // create the IPFS storage - d.Storage = &data.DataMockTest{} + d.Storage = &datamocktest.DataMockTest{} d.Storage.Init(&types.DataStore{Datadir: t.TempDir()}) // create the API router diff --git a/vochain/cometbft.go b/vochain/cometbft.go index 27011cff7..b2bd2b981 100644 --- a/vochain/cometbft.go +++ b/vochain/cometbft.go @@ -260,8 +260,7 @@ func (app *BaseApplication) FinalizeBlock(_ context.Context, if len(req.Txs) > 0 { log.Debugw("finalize block", "height", height, "txs", len(req.Txs), "hash", hex.EncodeToString(root), - "blockSeconds", time.Since(req.GetTime()).Seconds(), - "elapsedSeconds", time.Since(start).Seconds(), + "milliSeconds", time.Since(start).Milliseconds(), "proposer", hex.EncodeToString(req.GetProposerAddress())) } @@ -425,7 +424,7 @@ func (app *BaseApplication) PrepareProposal(ctx context.Context, // Rollback the state to discard the changes made app.State.Rollback() log.Debugw("prepare proposal", "height", app.Height(), "txs", len(validTxs), - "elapsedSeconds", time.Since(startTime).Seconds()) + "milliSeconds", time.Since(startTime).Milliseconds()) return &abcitypes.ResponsePrepareProposal{ Txs: validTxs, }, nil @@ -477,7 +476,7 @@ func (app *BaseApplication) ProcessProposal(_ context.Context, app.lastBlockHash = req.GetHash() log.Debugw("process proposal", "height", app.Height(), "txs", len(req.Txs), "action", "accept", "blockHash", hex.EncodeToString(app.lastBlockHash), - "hash", hex.EncodeToString(resp.Root), "elapsedSeconds", time.Since(startTime).Seconds()) + "hash", hex.EncodeToString(resp.Root), "milliSeconds", time.Since(startTime).Milliseconds()) return &abcitypes.ResponseProcessProposal{ Status: abcitypes.ResponseProcessProposal_ACCEPT, }, nil diff --git a/vochain/indexer/archive.go b/vochain/indexer/archive.go index 456ca8851..6873010f8 100644 --- a/vochain/indexer/archive.go +++ b/vochain/indexer/archive.go @@ -24,11 +24,11 @@ const ( // ArchiveProcess is the struct used to store the process data in the archive. type ArchiveProcess struct { - ChainID string `json:"chainId,omitempty"` + ChainID string `json:"chainId,omitempty"` // Legacy ProcessInfo *indexertypes.Process `json:"process"` Results *results.Results `json:"results"` - StartDate *time.Time `json:"startDate,omitempty"` - EndDate *time.Time `json:"endDate,omitempty"` + StartDate *time.Time `json:"startDate,omitempty"` // Legacy + EndDate *time.Time `json:"endDate,omitempty"` // Legacy } // ImportArchive imports an archive list of processes into the indexer database. diff --git a/vochain/indexer/process.go b/vochain/indexer/process.go index 870d9a8a7..427fa1c0b 100644 --- a/vochain/indexer/process.go +++ b/vochain/indexer/process.go @@ -16,6 +16,7 @@ import ( ) var ( + // ErrProcessNotFound is returned if the process is not found in the indexer database. ErrProcessNotFound = fmt.Errorf("process not found") zeroBytes = []byte("") ) @@ -46,14 +47,8 @@ func (idx *Indexer) ProcessInfo(pid []byte) (*indexertypes.Process, error) { // EntityID, searchTerm, namespace, status, and withResults are optional filters, if // declared as zero-values will be ignored. SearchTerm is a partial or full PID. // Status is one of READY, CANCELED, ENDED, PAUSED, RESULTS -func (idx *Indexer) ProcessList(entityID []byte, - from, - max int, - searchTerm string, - namespace uint32, - srcNetworkId int32, - status string, - withResults bool) ([][]byte, error) { +func (idx *Indexer) ProcessList(entityID []byte, from, max int, searchTerm string, namespace uint32, + srcNetworkId int32, status string, withResults bool) ([][]byte, error) { if from < 0 { return nil, fmt.Errorf("processList: invalid value: from is invalid value %d", from) } @@ -203,10 +198,11 @@ func (idx *Indexer) updateProcess(ctx context.Context, queries *indexerdb.Querie return fmt.Errorf("updateProcess: cannot fetch process %x: %w", pid, err) } - previousStatus, err := queries.GetProcessStatus(ctx, pid) + dbProc, err := queries.GetProcess(ctx, pid) if err != nil { - return err + return fmt.Errorf("updateProcess: cannot fetch process %x: %w", pid, err) } + previousStatus := dbProc.Status // We need to use the time of start/end from the block header, as we might be syncing the blockchain currentBlockTime := func() time.Time { @@ -218,7 +214,7 @@ func (idx *Indexer) updateProcess(ctx context.Context, queries *indexerdb.Querie } // Update start date with the block time if the process starts on this block - var startDate time.Time + startDate := dbProc.StartDate if idx.App.Height() == p.StartBlock { startDate = currentBlockTime() } diff --git a/vochain/indexer/vote.go b/vochain/indexer/vote.go index 2de795abd..ddea4f00a 100644 --- a/vochain/indexer/vote.go +++ b/vochain/indexer/vote.go @@ -7,6 +7,7 @@ import ( "errors" "fmt" "math/big" + "time" "go.vocdoni.io/proto/build/go/models" @@ -107,8 +108,12 @@ func (idx *Indexer) CountTotalVotes() (uint64, error) { func (idx *Indexer) finalizeResults(ctx context.Context, queries *indexerdb.Queries, process *models.Process) error { height := idx.App.Height() processID := process.ProcessId - log.Debugw("finalize results", "processID", hex.EncodeToString(processID), "height", height) - + endDate := idx.App.TimestampFromBlock(int64(idx.App.Height())) + if endDate == nil { + endDate = new(time.Time) + *endDate = time.Now() + } + log.Debugw("finalize results", "processID", hex.EncodeToString(processID), "height", height, "endDate", endDate.String()) // Get the results r := results.ProtoToResults(process.Results) if _, err := queries.SetProcessResultsReady(ctx, indexerdb.SetProcessResultsReadyParams{ @@ -116,6 +121,7 @@ func (idx *Indexer) finalizeResults(ctx context.Context, queries *indexerdb.Quer Votes: indexertypes.EncodeJSON(r.Votes), Weight: indexertypes.EncodeJSON(r.Weight), BlockHeight: int64(r.BlockHeight), + EndDate: *endDate, }); err != nil { return err } diff --git a/vochain/processarchive/processarchive.go b/vochain/processarchive/processarchive.go index 4e57c397d..fe75f8e8a 100644 --- a/vochain/processarchive/processarchive.go +++ b/vochain/processarchive/processarchive.go @@ -135,7 +135,6 @@ func BuildIndex(datadir string) (*Index, error) { name := entry.Name() path := filepath.Join(datadir, name) if len(entry.Name()) != 64 { - log.Warnf("archive file %s has an invalid name", path) continue } content, err := os.ReadFile(filepath.Join(datadir, name)) @@ -158,7 +157,7 @@ func BuildIndex(datadir string) (*Index, error) { if err != nil { return nil, err } - log.Infof("archive index build with %d processes", count) + log.Infow("archive index build", "processes", count) return i, os.WriteFile(filepath.Join(datadir, "index.json"), indexData, 0o644) } @@ -232,9 +231,7 @@ func (pa *ProcessArchive) ProcessScan(fromBlock int) error { if err != nil { return err } - - log.Infof("scanning blockchain processes from block %d (ended:%d results:%d ready:%d)", - fromBlock, len(pids2), len(pids), len(pids3)) + log.Infow("archive scan started", "fromBlock", fromBlock, "ended", len(pids2), "results", len(pids), "ready", len(pids3)) added := 0 for _, p := range append(append(pids, pids2...), pids3...) { exists, err := pa.storage.ProcessExist(p) @@ -249,58 +246,46 @@ func (pa *ProcessArchive) ProcessScan(fromBlock int) error { if err != nil { return err } - // If status is READY but the process is not yet finished, ignore - if procInfo.Status == int32(models.ProcessStatus_READY) { - if pa.indexer.App.Height() < procInfo.EndBlock { - continue - } + // If status is not results, we skip it. + if procInfo.Status != int32(models.ProcessStatus_RESULTS) { + continue } - results := procInfo.Results() if err := pa.storage.AddProcess(&Process{ ProcessInfo: procInfo, - Results: results, - StartDate: pa.indexer.App.TimestampFromBlock(int64(procInfo.StartBlock)), - EndDate: pa.indexer.App.TimestampFromBlock(int64(results.BlockHeight)), - ChainID: pa.indexer.App.ChainID(), + Results: procInfo.Results(), }); err != nil { log.Warnf("processScan: %v", err) } added++ } - log.Infof("archive scan added %d archive processes, took %s", added, time.Since(startTime)) + log.Infow("archive scan finished", "added", added, "took", time.Since(startTime).String()) return nil } // OnComputeResults implements the indexer event callback. // On this event the results are set always and the process info only if it // does not exist yet in the json storage. -func (pa *ProcessArchive) OnComputeResults(results *results.Results, - proc *indexertypes.Process, height uint32) { +func (pa *ProcessArchive) OnComputeResults(results *results.Results, proc *indexertypes.Process, _ uint32) { // Get the process (if exist) jsProc, err := pa.storage.GetProcess(results.ProcessID) if err != nil { if os.IsNotExist(err) { // if it does not exist yet, we create it jsProc = &Process{ ProcessInfo: proc, - Results: results, - ChainID: pa.indexer.App.ChainID(), } } else { - log.Errorf("cannot get json store process: %v", err) + log.Errorw(err, "cannot get json store process") return } } jsProc.Results = results - jsProc.StartDate = pa.indexer.App.TimestampFromBlock(int64(proc.StartBlock)) - jsProc.EndDate = pa.indexer.App.TimestampFromBlock(int64(results.BlockHeight)) if err := pa.storage.AddProcess(jsProc); err != nil { log.Errorf("cannot add json process: %v", err) return } - log.Infof("stored json process %x for compute results event", proc.ID) + log.Infow("stored json archive process", "id", proc.ID.String()) // send publish signal - log.Debugf("sending archive publish signal for height %d", height) select { case pa.publish <- true: default: // do nothing