Skip to content

Commit

Permalink
fix: Only replicate offsets when the value changed
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgebay authored Nov 23, 2022
1 parent a19d3d4 commit cdb02d5
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 1 deletion.
3 changes: 2 additions & 1 deletion internal/consuming/offset_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -640,8 +640,9 @@ func (s *defaultOffsetState) setMap(key OffsetStoreKey, value *Offset) bool {
}

// 1x1 replace
isNewValue := !value.Equals(&list[offsetIndex].value)
list[offsetIndex].value = *value
return true
return isNewValue
}

if !Intersects(item.start, item.end, start, end) {
Expand Down
17 changes: 17 additions & 0 deletions internal/consuming/offset_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,23 @@ var _ = Describe("defaultOffsetState", func() {
Expect(s.offsetMap[key][4]).To(Equal(offsetRange{start: startC3T2_3, end: endC3T2_3, value: offsetEnd}))
})

It("should return false when offset already exist with the same value", func () {
s := newTestOffsetState(offsetMap, consumerRanges)
result := s.Set(group, topic, valueC3_T0_1, OffsetCommitNone)
Expect(result).To(BeFalse())

// Even with different source
value := valueC3_T0_1
value.Source.Timestamp = time.Now().UnixMicro()
result = s.Set(group, topic, value, OffsetCommitNone)
Expect(result).To(BeFalse())

// When changing the actual value, it should return true
value.Offset++
result = s.Set(group, topic, value, OffsetCommitNone)
Expect(result).To(BeTrue(), "should return true when changing the actual value")
})

It("should insert a range at the beginning", func() {
// Empty initial map
s := newTestOffsetState(nil, consumerRanges)
Expand Down
8 changes: 8 additions & 0 deletions internal/types/models_offset.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,14 @@ func NewDefaultOffset(topic *TopicDataId, clusterSize int, value int64) Offset {
}
}

// Compares the values in the struct, except the source
func (o *Offset) Equals(other *Offset) bool {
if o.GenId() == other.GenId() && o.Index == other.Index && o.Offset == other.Offset {
return true
}
return false
}

func (o *Offset) GenId() GenId {
return GenId{
Start: o.Token,
Expand Down

0 comments on commit cdb02d5

Please sign in to comment.