diff --git a/cmd/node/main.go b/cmd/node/main.go
index 07381946d..9f4644948 100644
--- a/cmd/node/main.go
+++ b/cmd/node/main.go
@@ -548,6 +548,12 @@ func main() {
 	if err = srv.Vochain(); err != nil {
 		log.Fatal(err)
 	}
+	// create the offchain data downloader service
+	if conf.Vochain.OffChainDataDownloader {
+		if err := srv.OffChainDataHandler(); err != nil {
+			log.Fatal(err)
+		}
+	}
 	// create the indexer service
 	if conf.Vochain.Indexer.Enabled {
 		if err := srv.VochainIndexer(); err != nil {
@@ -560,12 +566,6 @@ func main() {
 			log.Fatal(err)
 		}
 	}
-	// create the offchain data downloader service
-	if conf.Vochain.OffChainDataDownloader {
-		if err := srv.OffChainDataHandler(); err != nil {
-			log.Fatal(err)
-		}
-	}
 	// start the service and block until finish fast sync
 	if err := srv.Start(); err != nil {
 		log.Fatal(err)
diff --git a/service/indexer.go b/service/indexer.go
index 788a26b41..b96811770 100644
--- a/service/indexer.go
+++ b/service/indexer.go
@@ -24,7 +24,7 @@ func (vs *VocdoniService) VochainIndexer() error {
 
 	if vs.Config.Indexer.ArchiveURL != "" {
 		log.Infow("starting archive retrieval", "path", vs.Config.Indexer.ArchiveURL)
-		go vs.Indexer.StartArchiveRetrieval(vs.Storage, vs.Config.Indexer.ArchiveURL)
+		go vs.Indexer.StartArchiveRetrieval(vs.DataDownloader, vs.Config.Indexer.ArchiveURL)
 	}
 
 	return nil
diff --git a/vochain/indexer/archive.go b/vochain/indexer/archive.go
index 1310f4d2e..456ca8851 100644
--- a/vochain/indexer/archive.go
+++ b/vochain/indexer/archive.go
@@ -6,7 +6,7 @@ import (
 	"fmt"
 	"time"
 
-	"go.vocdoni.io/dvote/data"
+	"go.vocdoni.io/dvote/data/downloader"
 	"go.vocdoni.io/dvote/log"
 	"go.vocdoni.io/dvote/types"
 	indexerdb "go.vocdoni.io/dvote/vochain/indexer/db"
@@ -16,7 +16,7 @@ import (
 )
 
 const (
-	marxArchiveFileSize     = 1024 * 100 // 100KB
+	maxArchiveFileSize      = 1024 * 100 // 100KB
 	timeoutArchiveRetrieval = 120 * time.Second
 	archiveFetchInterval    = 60 * time.Minute
 	archiveFileNameSize     = types.ProcessIDsize * 2 // 64 hex chars
@@ -40,6 +40,7 @@ func (idx *Indexer) ImportArchive(archive []*ArchiveProcess) ([]*ArchiveProcess,
 		return nil, err
 	}
 	defer tx.Rollback()
 
+	queries := indexerdb.New(tx)
 	added := []*ArchiveProcess{}
 	for _, p := range archive {
@@ -120,10 +121,14 @@ func (idx *Indexer) ImportArchive(archive []*ArchiveProcess) ([]*ArchiveProcess,
 
 // StartArchiveRetrieval starts the archive retrieval process. It is a blocking function that runs continuously.
 // Retrieves the archive directory from the storage and imports the processes into the indexer database.
-func (idx *Indexer) StartArchiveRetrieval(storage data.Storage, archiveURL string) {
+func (idx *Indexer) StartArchiveRetrieval(storage *downloader.Downloader, archiveURL string) {
+	if storage == nil || archiveURL == "" {
+		log.Warnw("cannot start archive retrieval", "downloader", storage != nil, "url", archiveURL)
+		return
+	}
 	for {
 		ctx, cancel := context.WithTimeout(context.Background(), timeoutArchiveRetrieval)
-		dirMap, err := storage.RetrieveDir(ctx, archiveURL, marxArchiveFileSize)
+		dirMap, err := storage.RemoteStorage.RetrieveDir(ctx, archiveURL, maxArchiveFileSize)
 		cancel()
 		if err != nil {
 			log.Warnw("cannot retrieve archive directory", "url", archiveURL, "err", err)
@@ -153,11 +158,9 @@ func (idx *Indexer) StartArchiveRetrieval(storage data.Storage, archiveURL strin
 		if len(added) > 0 {
 			log.Infow("new archive imported", "count", len(added))
 			for _, p := range added {
-				ctx, cancel := context.WithTimeout(context.Background(), timeoutArchiveRetrieval)
-				if err := storage.Pin(ctx, p.ProcessInfo.Metadata); err != nil {
-					log.Warnw("cannot pin metadata", "err", err.Error())
+				if p.ProcessInfo.Metadata != "" {
+					storage.AddToQueue(p.ProcessInfo.Metadata, nil, true)
 				}
-				cancel()
 			}
 		}
 		time.Sleep(archiveFetchInterval)