diff --git a/crdt.go b/crdt.go index 3d7635c9..12c0d7b4 100644 --- a/crdt.go +++ b/crdt.go @@ -297,7 +297,7 @@ func New( "crdt Datastore created. Number of heads: %d. Current max-height: %d. Dirty: %t", len(headList), maxHeight, - dstore.isDirty(), + dstore.IsDirty(), ) // sendJobWorker + NumWorkers @@ -372,7 +372,7 @@ func (store *Datastore) handleNext() { // processed, thus it did not leave a branch // half-processed and there's nothign to // recover. - // disabled: store.markDirty() + // disabled: store.MarkDirty() } } @@ -501,7 +501,7 @@ func (store *Datastore) repair() { } return case <-timer.C: - if !store.isDirty() { + if !store.IsDirty() { store.logger.Info("store is marked clean. No need to repair") } else { store.logger.Warn("store is marked dirty. Starting DAG repair operation") @@ -564,7 +564,7 @@ func (store *Datastore) logStats() { len(heads), height, len(store.jobQueue), - store.isDirty(), + store.IsDirty(), ) case <-store.ctx.Done(): ticker.Stop() @@ -631,7 +631,7 @@ func (store *Datastore) dagWorker() { if err != nil { store.logger.Error(err) - store.markDirty() + store.MarkDirty() job.session.Done() continue } @@ -639,7 +639,7 @@ func (store *Datastore) dagWorker() { err := store.sendNewJobs(j.session, j.nodeGetter, j.root, j.rootPrio, children) if err != nil { store.logger.Error(err) - store.markDirty() + store.MarkDirty() } j.session.Done() }(job) @@ -722,7 +722,7 @@ func (store *Datastore) sendJobWorker() { case <-store.ctx.Done(): if len(store.sendJobs) > 0 { // we left something in the queue - store.markDirty() + store.MarkDirty() } close(store.jobQueue) return @@ -748,15 +748,17 @@ func (store *Datastore) dirtyKey() ds.Key { return store.namespace.ChildString(dirtyBitKey) } -func (store *Datastore) markDirty() { - store.logger.Error("marking datastore as dirty") +// MarkDirty marks the Datastore as dirty. +func (store *Datastore) MarkDirty() { + store.logger.Warn("marking datastore as dirty") err := store.store.Put(store.ctx, store.dirtyKey(), nil) if err != nil { store.logger.Errorf("error setting dirty bit: %s", err) } } -func (store *Datastore) isDirty() bool { +// IsDirty returns whether the datastore is marked dirty. +func (store *Datastore) IsDirty() bool { ok, err := store.store.Has(store.ctx, store.dirtyKey()) if err != nil { store.logger.Errorf("error checking dirty bit: %s", err) @@ -764,7 +766,8 @@ func (store *Datastore) isDirty() bool { return ok } -func (store *Datastore) markClean() { +// MarkClean removes the dirty mark from the datastore. +func (store *Datastore) MarkClean() { store.logger.Info("marking datastore as clean") err := store.store.Delete(store.ctx, store.dirtyKey()) if err != nil { @@ -880,7 +883,7 @@ func (store *Datastore) processNode(ng *crdtNodeGetter, root cid.Cid, rootPrio u return children, nil } -// repairDAG is used to walk down the chain until a non-processed node is +// RepairDAG is used to walk down the chain until a non-processed node is // found and at that moment, queues it for processing. func (store *Datastore) repairDAG() error { start := time.Now() @@ -986,7 +989,7 @@ func (store *Datastore) repairDAG() error { // If we are here we have successfully reprocessed the chain until the // bottom. - store.markClean() + store.MarkClean() return nil } @@ -1023,15 +1026,14 @@ func (store *Datastore) GetSize(ctx context.Context, key ds.Key) (size int, err // Query searches the datastore and returns a query result. This function // may return before the query actually runs. To wait for the query: // -// result, _ := ds.Query(q) +// result, _ := ds.Query(q) // -// // use the channel interface; result may come in at different times -// for entry := range result.Next() { ... } -// -// // or wait for the query to be completely done -// entries, _ := result.Rest() -// for entry := range entries { ... } +// // use the channel interface; result may come in at different times +// for entry := range result.Next() { ... } // +// // or wait for the query to be completely done +// entries, _ := result.Rest() +// for entry := range entries { ... } func (store *Datastore) Query(ctx context.Context, q query.Query) (query.Results, error) { qr, err := store.set.Elements(ctx, q) if err != nil { @@ -1108,7 +1110,7 @@ func (store *Datastore) Sync(ctx context.Context, prefix ds.Key) error { func (store *Datastore) Close() error { store.cancel() store.wg.Wait() - if store.isDirty() { + if store.IsDirty() { store.logger.Warn("datastore is being closed marked as dirty") } return nil @@ -1257,7 +1259,7 @@ func (store *Datastore) addDAGNode(delta *pb.Delta) (cid.Cid, error) { nd, ) if err != nil { - store.markDirty() // not sure if this will fix much if this happens. + store.MarkDirty() // not sure if this will fix much if this happens. return cid.Undef, errors.Wrap(err, "error processing new block") } if len(children) != 0 { @@ -1464,6 +1466,26 @@ func (store *Datastore) dotDAGRec(w io.Writer, from cid.Cid, depth uint64, ng *c return nil } +// Stats wraps internal information about the datastore. +// Might be expanded in the future. +type Stats struct { + Heads []cid.Cid + MaxHeight uint64 + QueuedJobs int +} + +// InternalStats returns internal datastore information like the current heads +// and max height. +func (store *Datastore) InternalStats() Stats { + heads, height, _ := store.heads.List() + + return Stats{ + Heads: heads, + MaxHeight: height, + QueuedJobs: len(store.jobQueue), + } +} + type cidSafeSet struct { set map[cid.Cid]struct{} mux sync.RWMutex