Skip to content

Commit

Permalink
fix posting align with series
Browse files Browse the repository at this point in the history
Signed-off-by: Ben Ye <[email protected]>
  • Loading branch information
yeya24 committed Dec 26, 2024
1 parent 0228c3d commit c898baa
Showing 1 changed file with 17 additions and 15 deletions.
32 changes: 17 additions & 15 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -2606,7 +2606,7 @@ func (r *bucketIndexReader) ExpandedPostings(
return nil, nil
}

hit, postings, err := r.fetchSingleExpandedPostingFromCacheAndExpand(ctx, ms, bytesLimiter, tenant)
hit, postings, err := r.fetchAlignedExpandedPostingFromCacheAndExpand(ctx, ms, bytesLimiter, tenant)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -2956,8 +2956,8 @@ type postingPtr struct {
ptr index.Range
}

func (r *bucketIndexReader) fetchSingleExpandedPostingFromCacheAndExpand(ctx context.Context, ms []*labels.Matcher, bytesLimiter BytesLimiter, tenant string) (bool, []storage.SeriesRef, error) {
postings, closers, err := r.fetchExpandedPostingsFromCache(ctx, [][]*labels.Matcher{ms}, bytesLimiter, tenant)
func (r *bucketIndexReader) fetchAlignedExpandedPostingFromCacheAndExpand(ctx context.Context, ms []*labels.Matcher, bytesLimiter BytesLimiter, tenant string) (bool, []storage.SeriesRef, error) {
postings, closers, err := r.fetchExpandedPostingsFromCache(ctx, [][]*labels.Matcher{ms}, bytesLimiter, tenant, true)
defer func() {
for _, closer := range closers {
closer()
Expand All @@ -2980,7 +2980,7 @@ func (r *bucketIndexReader) fetchSingleExpandedPostingFromCacheAndExpand(ctx con
return true, ps, nil
}

func (r *bucketIndexReader) fetchExpandedPostingsFromCache(ctx context.Context, ms [][]*labels.Matcher, bytesLimiter BytesLimiter, tenant string) ([]index.Postings, []func(), error) {
func (r *bucketIndexReader) fetchExpandedPostingsFromCache(ctx context.Context, ms [][]*labels.Matcher, bytesLimiter BytesLimiter, tenant string, seriesByteAligned bool) ([]index.Postings, []func(), error) {
if len(ms) == 0 {
return nil, nil, nil
}
Expand Down Expand Up @@ -3013,17 +3013,19 @@ func (r *bucketIndexReader) fetchExpandedPostingsFromCache(ctx context.Context,
continue
}

// As of version two all series entries are 16 byte padded. All references
// we get have to account for that to get the correct offset.
version, err := r.IndexVersion()
if err != nil {
return nil, closeFns, errors.Wrap(err, "get index version")
}

res[i] = p
if version >= 2 {
// Index version 2 series are padded by 16 bytes.
res[i] = newSeriesByteAlignedPostings(p)
if seriesByteAligned {
// As of version two all series entries are 16 byte padded. All references
// we get have to account for that to get the correct offset.
version, err := r.IndexVersion()
if err != nil {
return nil, closeFns, errors.Wrap(err, "get index version")
}

if version >= 2 {
// Index version 2 series are padded by 16 bytes.
res[i] = newSeriesByteAlignedPostings(p)
}
}
}
}
Expand Down Expand Up @@ -3082,7 +3084,7 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, postingGroups []*
timer := prometheus.NewTimer(r.block.metrics.postingsFetchDuration.WithLabelValues(tenant))
defer timer.ObserveDuration()

expandedPostings, closers, err := r.fetchExpandedPostingsFromCache(ctx, expandedPostingMatchers, bytesLimiter, tenant)
expandedPostings, closers, err := r.fetchExpandedPostingsFromCache(ctx, expandedPostingMatchers, bytesLimiter, tenant, false)
closeFns = append(closeFns, closers...)
if err != nil {
return nil, closeFns, errors.Wrap(err, "fetch expanded postings from cache")
Expand Down

0 comments on commit c898baa

Please sign in to comment.