diff --git a/extern/boostd-data/yugabyte/service.go b/extern/boostd-data/yugabyte/service.go index 33f787064..571e54c99 100644 --- a/extern/boostd-data/yugabyte/service.go +++ b/extern/boostd-data/yugabyte/service.go @@ -537,28 +537,54 @@ func (s *Store) addMultihashesToPieces(ctx context.Context, pieceCid cid.Cid, re insertPieceOffsetsQry := `INSERT INTO PayloadToPieces (PayloadMultihash, PieceCid) VALUES (?, ?)` pieceCidBytes := pieceCid.Bytes() - var batch *gocql.Batch - for allIdx, rec := range recs { - if batch == nil { - batch = s.session.NewBatch(gocql.UnloggedBatch).WithContext(ctx) - batch.Entries = make([]gocql.BatchEntry, 0, s.settings.InsertBatchSize) + threadBatch := len(recs) / 32 // split the slice into go-routine batches for ~32 workers + + var eg errgroup.Group + for i := 0; i < len(recs); i += threadBatch { + i := i + j := i + threadBatch + if j >= len(recs) { + j = len(recs) } - batch.Entries = append(batch.Entries, gocql.BatchEntry{ - Stmt: insertPieceOffsetsQry, - Args: []interface{}{trimMultihash(rec.Cid.Hash()), pieceCidBytes}, - Idempotent: true, - }) + // Process batch recs[i:j] - if allIdx == len(recs)-1 || len(batch.Entries) == s.settings.InsertBatchSize { - err := s.session.ExecuteBatch(batch) - if err != nil { - return fmt.Errorf("inserting into PayloadToPieces: %w", err) + eg.Go(func() error { + var batch *gocql.Batch + recsb := recs[i:j] + for allIdx, rec := range recsb { + if batch == nil { + batch = s.session.NewBatch(gocql.UnloggedBatch).WithContext(ctx) + batch.Entries = make([]gocql.BatchEntry, 0, s.settings.InsertBatchSize) + } + + batch.Entries = append(batch.Entries, gocql.BatchEntry{ + Stmt: insertPieceOffsetsQry, + Args: []interface{}{trimMultihash(rec.Cid.Hash()), pieceCidBytes}, + Idempotent: true, + }) + + if allIdx == len(recsb)-1 || len(batch.Entries) == s.settings.InsertBatchSize { + err := s.session.ExecuteBatch(batch) + if err != nil { + return fmt.Errorf("inserting into PayloadToPieces: %w", err) + } + batch = nil + + // emit progress only from batch 0 + if i == 0 { + numberOfGoroutines := len(recs)/threadBatch + 1 + progress(float64(numberOfGoroutines) * float64(allIdx+1) / float64(len(recs))) + } + } } - batch = nil + return nil + }) + } - progress(float64(allIdx+1) / float64(len(recs))) - } + err := eg.Wait() + if err != nil { + return err } return nil } @@ -570,28 +596,54 @@ func (s *Store) addPieceInfos(ctx context.Context, pieceCid cid.Cid, recs []mode insertPieceOffsetsQry := `INSERT INTO PieceBlockOffsetSize (PieceCid, PayloadMultihash, BlockOffset, BlockSize) VALUES (?, ?, ?, ?)` pieceCidBytes := pieceCid.Bytes() - var batch *gocql.Batch - for allIdx, rec := range recs { - if batch == nil { - batch = s.session.NewBatch(gocql.UnloggedBatch).WithContext(ctx) - batch.Entries = make([]gocql.BatchEntry, 0, s.settings.InsertBatchSize) + threadBatch := len(recs) / 32 // split the slice into go-routine batches for ~32 workers + + var eg errgroup.Group + for i := 0; i < len(recs); i += threadBatch { + i := i + j := i + threadBatch + if j >= len(recs) { + j = len(recs) } - batch.Entries = append(batch.Entries, gocql.BatchEntry{ - Stmt: insertPieceOffsetsQry, - Args: []any{pieceCidBytes, rec.Cid.Hash(), rec.Offset, rec.Size}, - Idempotent: true, - }) + // Process batch recs[i:j] - if allIdx == len(recs)-1 || len(batch.Entries) == s.settings.InsertBatchSize { - err := s.session.ExecuteBatch(batch) - if err != nil { - return fmt.Errorf("executing offset / size batch insert for piece %s: %w", pieceCid, err) + eg.Go(func() error { + var batch *gocql.Batch + recsb := recs[i:j] + for allIdx, rec := range recsb { + if batch == nil { + batch = s.session.NewBatch(gocql.UnloggedBatch).WithContext(ctx) + batch.Entries = make([]gocql.BatchEntry, 0, s.settings.InsertBatchSize) + } + + batch.Entries = append(batch.Entries, gocql.BatchEntry{ + Stmt: insertPieceOffsetsQry, + Args: []any{pieceCidBytes, rec.Cid.Hash(), rec.Offset, rec.Size}, + Idempotent: true, + }) + + if allIdx == len(recsb)-1 || len(batch.Entries) == s.settings.InsertBatchSize { + err := s.session.ExecuteBatch(batch) + if err != nil { + return fmt.Errorf("executing offset / size batch insert for piece %s: %w", pieceCid, err) + } + batch = nil + + // emit progress only from batch 0 + if i == 0 { + numberOfGoroutines := len(recs)/threadBatch + 1 + progress(float64(numberOfGoroutines) * float64(allIdx+1) / float64(len(recs))) + } + } } - batch = nil + return nil + }) + } - progress(float64(allIdx+1) / float64(len(recs))) - } + err := eg.Wait() + if err != nil { + return err } return nil