From c898baa2251d1a60d9f38ff98463e55474dd0d17 Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Thu, 26 Dec 2024 15:54:07 -0800 Subject: [PATCH] fix posting align with series Signed-off-by: Ben Ye --- pkg/store/bucket.go | 32 +++++++++++++++++--------------- 1 file changed, 17 insertions(+), 15 deletions(-) diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 357bb291b8..cf3ed6c2cd 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -2606,7 +2606,7 @@ func (r *bucketIndexReader) ExpandedPostings( return nil, nil } - hit, postings, err := r.fetchSingleExpandedPostingFromCacheAndExpand(ctx, ms, bytesLimiter, tenant) + hit, postings, err := r.fetchAlignedExpandedPostingFromCacheAndExpand(ctx, ms, bytesLimiter, tenant) if err != nil { return nil, err } @@ -2956,8 +2956,8 @@ type postingPtr struct { ptr index.Range } -func (r *bucketIndexReader) fetchSingleExpandedPostingFromCacheAndExpand(ctx context.Context, ms []*labels.Matcher, bytesLimiter BytesLimiter, tenant string) (bool, []storage.SeriesRef, error) { - postings, closers, err := r.fetchExpandedPostingsFromCache(ctx, [][]*labels.Matcher{ms}, bytesLimiter, tenant) +func (r *bucketIndexReader) fetchAlignedExpandedPostingFromCacheAndExpand(ctx context.Context, ms []*labels.Matcher, bytesLimiter BytesLimiter, tenant string) (bool, []storage.SeriesRef, error) { + postings, closers, err := r.fetchExpandedPostingsFromCache(ctx, [][]*labels.Matcher{ms}, bytesLimiter, tenant, true) defer func() { for _, closer := range closers { closer() @@ -2980,7 +2980,7 @@ func (r *bucketIndexReader) fetchSingleExpandedPostingFromCacheAndExpand(ctx con return true, ps, nil } -func (r *bucketIndexReader) fetchExpandedPostingsFromCache(ctx context.Context, ms [][]*labels.Matcher, bytesLimiter BytesLimiter, tenant string) ([]index.Postings, []func(), error) { +func (r *bucketIndexReader) fetchExpandedPostingsFromCache(ctx context.Context, ms [][]*labels.Matcher, bytesLimiter BytesLimiter, tenant string, seriesByteAligned bool) ([]index.Postings, []func(), error) { if len(ms) == 0 { return nil, nil, nil } @@ -3013,17 +3013,19 @@ func (r *bucketIndexReader) fetchExpandedPostingsFromCache(ctx context.Context, continue } - // As of version two all series entries are 16 byte padded. All references - // we get have to account for that to get the correct offset. - version, err := r.IndexVersion() - if err != nil { - return nil, closeFns, errors.Wrap(err, "get index version") - } - res[i] = p - if version >= 2 { - // Index version 2 series are padded by 16 bytes. - res[i] = newSeriesByteAlignedPostings(p) + if seriesByteAligned { + // As of version two all series entries are 16 byte padded. All references + // we get have to account for that to get the correct offset. + version, err := r.IndexVersion() + if err != nil { + return nil, closeFns, errors.Wrap(err, "get index version") + } + + if version >= 2 { + // Index version 2 series are padded by 16 bytes. + res[i] = newSeriesByteAlignedPostings(p) + } } } } @@ -3082,7 +3084,7 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, postingGroups []* timer := prometheus.NewTimer(r.block.metrics.postingsFetchDuration.WithLabelValues(tenant)) defer timer.ObserveDuration() - expandedPostings, closers, err := r.fetchExpandedPostingsFromCache(ctx, expandedPostingMatchers, bytesLimiter, tenant) + expandedPostings, closers, err := r.fetchExpandedPostingsFromCache(ctx, expandedPostingMatchers, bytesLimiter, tenant, false) closeFns = append(closeFns, closers...) if err != nil { return nil, closeFns, errors.Wrap(err, "fetch expanded postings from cache")