go get github.com/lantern-db/[email protected]
topic := gpubsub.NewTopic[int](topicName, concurrency, interval, ttl)
subscription := topic.NewSubscription("DummyConsumer")
subscription.Subscribe(ctx, func (m *gpubsub.Message[int]) {
// get the content of message which has type T
message := m.Body()
// some consumer process
if err == nil {
// Ack if succeed
m.Ack()
} else {
// Nack if failed, retry later
m.Nack()
}
})
topic.Publish(1)
ctx, cancel := context.WithCancel(context.Background())
topicName := "DummyData"
concurrency := int64(2)
interval := 30 * time.Second
ttl := 1 * time.Hour
topic := gpubsub.NewTopic[int](topicName, concurrency, interval, ttl)
subscription := topic.NewSubscription("DummyConsumer")
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
subscription.Subscribe(ctx, func(m gpubsub.Message[int]) {
log.Printf("data: %d\n", m.Body())
m.Ack()
})
}()
for i := 0; i < 10; i++ {
topic.Publish(i)
}
cancel()
wg.Wait()
It will show belows.
2022/04/10 13:59:26 closing subscription: DummyConsumer
2022/04/10 13:59:27 data: 0
2022/04/10 13:59:27 data: 1
2022/04/10 13:59:28 data: 4
2022/04/10 13:59:28 data: 3
2022/04/10 13:59:29 data: 5
2022/04/10 13:59:29 data: 6
2022/04/10 13:59:30 data: 7
2022/04/10 13:59:30 data: 8
2022/04/10 13:59:31 data: 9
2022/04/10 13:59:31 data: 2