Skip to content

Commit

Permalink
test: add unit test
Browse files Browse the repository at this point in the history
  • Loading branch information
huskar-t committed Nov 8, 2024
1 parent 526835e commit 3090beb
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 20 deletions.
32 changes: 12 additions & 20 deletions af/tmq/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,12 @@ func (c *Consumer) SubscribeTopics(topics []string, rebalanceCb RebalanceCb) err
for _, topic := range topics {
errCode := wrapper.TMQListAppend(topicList, topic)
if errCode != 0 {
return c.tmqError(errCode)
return tmqError(errCode)
}
}
errCode := wrapper.TMQSubscribe(c.cConsumer, topicList)
if errCode != 0 {
return c.tmqError(errCode)
return tmqError(errCode)
}
return nil
}
Expand All @@ -79,7 +79,7 @@ func (c *Consumer) SubscribeTopics(topics []string, rebalanceCb RebalanceCb) err
func (c *Consumer) Unsubscribe() error {
errCode := wrapper.TMQUnsubscribe(c.cConsumer)
if errCode != taosError.SUCCESS {
return c.tmqError(errCode)
return tmqError(errCode)
}
return nil
}
Expand Down Expand Up @@ -199,7 +199,7 @@ func (c *Consumer) getData(message unsafe.Pointer) ([]*tmq.Data, error) {
func (c *Consumer) Commit() ([]tmq.TopicPartition, error) {
errCode := wrapper.TMQCommitSync(c.cConsumer, nil)
if errCode != taosError.SUCCESS {
return nil, c.tmqError(errCode)
return nil, tmqError(errCode)
}
partitions, err := c.Assignment()
if err != nil {
Expand All @@ -208,26 +208,18 @@ func (c *Consumer) Commit() ([]tmq.TopicPartition, error) {
return c.Committed(partitions, 0)
}

func (c *Consumer) doCommit(message unsafe.Pointer) ([]tmq.TopicPartition, error) {
errCode := wrapper.TMQCommitSync(c.cConsumer, message)
if errCode != taosError.SUCCESS {
return nil, c.tmqError(errCode)
}
return nil, nil
}

func (c *Consumer) Assignment() (partitions []tmq.TopicPartition, err error) {
errCode, list := wrapper.TMQSubscription(c.cConsumer)
if errCode != taosError.SUCCESS {
return nil, c.tmqError(errCode)
return nil, tmqError(errCode)
}
defer wrapper.TMQListDestroy(list)
size := wrapper.TMQListGetSize(list)
topics := wrapper.TMQListToCArray(list, int(size))
for _, topic := range topics {
errCode, assignment := wrapper.TMQGetTopicAssignment(c.cConsumer, topic)
if errCode != taosError.SUCCESS {
return nil, c.tmqError(errCode)
return nil, tmqError(errCode)
}
for i := 0; i < len(assignment); i++ {
topicName := topic
Expand All @@ -244,7 +236,7 @@ func (c *Consumer) Assignment() (partitions []tmq.TopicPartition, err error) {
func (c *Consumer) Seek(partition tmq.TopicPartition, ignoredTimeoutMs int) error {
errCode := wrapper.TMQOffsetSeek(c.cConsumer, *partition.Topic, partition.Partition, int64(partition.Offset))
if errCode != taosError.SUCCESS {
return c.tmqError(errCode)
return tmqError(errCode)
}
return nil
}
Expand All @@ -255,7 +247,7 @@ func (c *Consumer) Committed(partitions []tmq.TopicPartition, timeoutMs int) (of
cOffset := wrapper.TMQCommitted(c.cConsumer, *partitions[i].Topic, partitions[i].Partition)
offset := tmq.Offset(cOffset)
if !offset.Valid() {
return nil, c.tmqError(int32(offset))
return nil, tmqError(int32(offset))
}
offsets[i] = tmq.TopicPartition{
Topic: partitions[i].Topic,
Expand All @@ -270,7 +262,7 @@ func (c *Consumer) CommitOffsets(offsets []tmq.TopicPartition) ([]tmq.TopicParti
for i := 0; i < len(offsets); i++ {
errCode := wrapper.TMQCommitOffsetSync(c.cConsumer, *offsets[i].Topic, offsets[i].Partition, int64(offsets[i].Offset))
if errCode != taosError.SUCCESS {
return nil, c.tmqError(errCode)
return nil, tmqError(errCode)
}
}
return c.Committed(offsets, 0)
Expand All @@ -281,7 +273,7 @@ func (c *Consumer) Position(partitions []tmq.TopicPartition) (offsets []tmq.Topi
for i := 0; i < len(partitions); i++ {
position := wrapper.TMQPosition(c.cConsumer, *partitions[i].Topic, partitions[i].Partition)
if position < 0 {
return nil, c.tmqError(int32(position))
return nil, tmqError(int32(position))
}
offsets[i] = tmq.TopicPartition{
Topic: partitions[i].Topic,
Expand All @@ -296,12 +288,12 @@ func (c *Consumer) Position(partitions []tmq.TopicPartition) (offsets []tmq.Topi
func (c *Consumer) Close() error {
errCode := wrapper.TMQConsumerClose(c.cConsumer)
if errCode != 0 {
return c.tmqError(errCode)
return tmqError(errCode)
}
return nil
}

func (c *Consumer) tmqError(errCode int32) error {
func tmqError(errCode int32) error {
errStr := wrapper.TMQErr2Str(errCode)
return taosError.NewError(int(errCode), errStr)
}
6 changes: 6 additions & 0 deletions af/tmq/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,3 +493,9 @@ func TestMeta(t *testing.T) {
}
}
}

func Test_tmqError(t *testing.T) {
err := tmqError(-1)
expectError := &errors.TaosError{Code: 65535, ErrStr: "fail"}
assert.Equal(t, expectError, err)
}

0 comments on commit 3090beb

Please sign in to comment.