diff --git a/internal/consuming/offset_state.go b/internal/consuming/offset_state.go index cbd0262..4a0b712 100644 --- a/internal/consuming/offset_state.go +++ b/internal/consuming/offset_state.go @@ -640,8 +640,9 @@ func (s *defaultOffsetState) setMap(key OffsetStoreKey, value *Offset) bool { } // 1x1 replace + isNewValue := !value.Equals(&list[offsetIndex].value) list[offsetIndex].value = *value - return true + return isNewValue } if !Intersects(item.start, item.end, start, end) { diff --git a/internal/consuming/offset_state_test.go b/internal/consuming/offset_state_test.go index 0760b64..fea393b 100644 --- a/internal/consuming/offset_state_test.go +++ b/internal/consuming/offset_state_test.go @@ -418,6 +418,23 @@ var _ = Describe("defaultOffsetState", func() { Expect(s.offsetMap[key][4]).To(Equal(offsetRange{start: startC3T2_3, end: endC3T2_3, value: offsetEnd})) }) + It("should return false when offset already exist with the same value", func () { + s := newTestOffsetState(offsetMap, consumerRanges) + result := s.Set(group, topic, valueC3_T0_1, OffsetCommitNone) + Expect(result).To(BeFalse()) + + // Even with different source + value := valueC3_T0_1 + value.Source.Timestamp = time.Now().UnixMicro() + result = s.Set(group, topic, value, OffsetCommitNone) + Expect(result).To(BeFalse()) + + // When changing the actual value, it should return true + value.Offset++ + result = s.Set(group, topic, value, OffsetCommitNone) + Expect(result).To(BeTrue(), "should return true when changing the actual value") + }) + It("should insert a range at the beginning", func() { // Empty initial map s := newTestOffsetState(nil, consumerRanges) diff --git a/internal/types/models_offset.go b/internal/types/models_offset.go index a444ea0..f27ac73 100644 --- a/internal/types/models_offset.go +++ b/internal/types/models_offset.go @@ -60,6 +60,14 @@ func NewDefaultOffset(topic *TopicDataId, clusterSize int, value int64) Offset { } } +// Compares the values in the struct, except the source +func (o *Offset) Equals(other *Offset) bool { + if o.GenId() == other.GenId() && o.Index == other.Index && o.Offset == other.Offset { + return true + } + return false +} + func (o *Offset) GenId() GenId { return GenId{ Start: o.Token,