diff --git a/af/tmq/consumer.go b/af/tmq/consumer.go index aab447c..ddc2e0a 100644 --- a/af/tmq/consumer.go +++ b/af/tmq/consumer.go @@ -65,12 +65,12 @@ func (c *Consumer) SubscribeTopics(topics []string, rebalanceCb RebalanceCb) err for _, topic := range topics { errCode := wrapper.TMQListAppend(topicList, topic) if errCode != 0 { - return c.tmqError(errCode) + return tmqError(errCode) } } errCode := wrapper.TMQSubscribe(c.cConsumer, topicList) if errCode != 0 { - return c.tmqError(errCode) + return tmqError(errCode) } return nil } @@ -79,7 +79,7 @@ func (c *Consumer) SubscribeTopics(topics []string, rebalanceCb RebalanceCb) err func (c *Consumer) Unsubscribe() error { errCode := wrapper.TMQUnsubscribe(c.cConsumer) if errCode != taosError.SUCCESS { - return c.tmqError(errCode) + return tmqError(errCode) } return nil } @@ -199,7 +199,7 @@ func (c *Consumer) getData(message unsafe.Pointer) ([]*tmq.Data, error) { func (c *Consumer) Commit() ([]tmq.TopicPartition, error) { errCode := wrapper.TMQCommitSync(c.cConsumer, nil) if errCode != taosError.SUCCESS { - return nil, c.tmqError(errCode) + return nil, tmqError(errCode) } partitions, err := c.Assignment() if err != nil { @@ -208,18 +208,10 @@ func (c *Consumer) Commit() ([]tmq.TopicPartition, error) { return c.Committed(partitions, 0) } -func (c *Consumer) doCommit(message unsafe.Pointer) ([]tmq.TopicPartition, error) { - errCode := wrapper.TMQCommitSync(c.cConsumer, message) - if errCode != taosError.SUCCESS { - return nil, c.tmqError(errCode) - } - return nil, nil -} - func (c *Consumer) Assignment() (partitions []tmq.TopicPartition, err error) { errCode, list := wrapper.TMQSubscription(c.cConsumer) if errCode != taosError.SUCCESS { - return nil, c.tmqError(errCode) + return nil, tmqError(errCode) } defer wrapper.TMQListDestroy(list) size := wrapper.TMQListGetSize(list) @@ -227,7 +219,7 @@ func (c *Consumer) Assignment() (partitions []tmq.TopicPartition, err error) { for _, topic := range topics { errCode, assignment := wrapper.TMQGetTopicAssignment(c.cConsumer, topic) if errCode != taosError.SUCCESS { - return nil, c.tmqError(errCode) + return nil, tmqError(errCode) } for i := 0; i < len(assignment); i++ { topicName := topic @@ -244,7 +236,7 @@ func (c *Consumer) Assignment() (partitions []tmq.TopicPartition, err error) { func (c *Consumer) Seek(partition tmq.TopicPartition, ignoredTimeoutMs int) error { errCode := wrapper.TMQOffsetSeek(c.cConsumer, *partition.Topic, partition.Partition, int64(partition.Offset)) if errCode != taosError.SUCCESS { - return c.tmqError(errCode) + return tmqError(errCode) } return nil } @@ -255,7 +247,7 @@ func (c *Consumer) Committed(partitions []tmq.TopicPartition, timeoutMs int) (of cOffset := wrapper.TMQCommitted(c.cConsumer, *partitions[i].Topic, partitions[i].Partition) offset := tmq.Offset(cOffset) if !offset.Valid() { - return nil, c.tmqError(int32(offset)) + return nil, tmqError(int32(offset)) } offsets[i] = tmq.TopicPartition{ Topic: partitions[i].Topic, @@ -270,7 +262,7 @@ func (c *Consumer) CommitOffsets(offsets []tmq.TopicPartition) ([]tmq.TopicParti for i := 0; i < len(offsets); i++ { errCode := wrapper.TMQCommitOffsetSync(c.cConsumer, *offsets[i].Topic, offsets[i].Partition, int64(offsets[i].Offset)) if errCode != taosError.SUCCESS { - return nil, c.tmqError(errCode) + return nil, tmqError(errCode) } } return c.Committed(offsets, 0) @@ -281,7 +273,7 @@ func (c *Consumer) Position(partitions []tmq.TopicPartition) (offsets []tmq.Topi for i := 0; i < len(partitions); i++ { position := wrapper.TMQPosition(c.cConsumer, *partitions[i].Topic, partitions[i].Partition) if position < 0 { - return nil, c.tmqError(int32(position)) + return nil, tmqError(int32(position)) } offsets[i] = tmq.TopicPartition{ Topic: partitions[i].Topic, @@ -296,12 +288,12 @@ func (c *Consumer) Position(partitions []tmq.TopicPartition) (offsets []tmq.Topi func (c *Consumer) Close() error { errCode := wrapper.TMQConsumerClose(c.cConsumer) if errCode != 0 { - return c.tmqError(errCode) + return tmqError(errCode) } return nil } -func (c *Consumer) tmqError(errCode int32) error { +func tmqError(errCode int32) error { errStr := wrapper.TMQErr2Str(errCode) return taosError.NewError(int(errCode), errStr) } diff --git a/af/tmq/consumer_test.go b/af/tmq/consumer_test.go index 47252b7..ebee650 100644 --- a/af/tmq/consumer_test.go +++ b/af/tmq/consumer_test.go @@ -493,3 +493,9 @@ func TestMeta(t *testing.T) { } } } + +func Test_tmqError(t *testing.T) { + err := tmqError(-1) + expectError := &errors.TaosError{Code: 65535, ErrStr: "fail"} + assert.Equal(t, expectError, err) +}