-
Notifications
You must be signed in to change notification settings - Fork 1.2k
Open
Labels
status: pending-design-workNeeds design work before any code can be developedNeeds design work before any code can be developed
Description
We are continuously listening for messages on a stream. When we use block > 0(We do not want to set it to 0) in the XREADGROUP command, spring is creating new connections for every read. This results in so many TIME_WAIT ports on the client which builds up rapidly if there are many incoming messages. For our use case, we need to set block > 0 and also re-use the same connection. Checking the code in DefaultStreamReceiver
's constructor, looks like overriding the StreamReadOptions.isBlocking()
to always return false will solve our problem but the DefaultStreamReceiver
does not take a custom StreamReadOptions
. Can you expose a method like setReadOptions(reaOptions)
on DefaultStreamReceiver
to take it as an argument?
Sample code I use:
StreamReceiverOptions<String, MapRecord<String, String, String>> options = StreamReceiverOptions.builder().pollTimeout(Duration.ofSeconds(30)).build();
StreamReceiver<String, MapRecord<String, String, String>> receiver = StreamReceiver.create(factory, options);
receiver.receiveAutoAck(Consumer.from("group", "consumer"), StreamOffset.create("stream", ReadOffset.lastConsumed()))//
.flatMap(record -> Mono.fromCallable(() -> consume(record)).subscribeOn(Schedulers.boundedElastic()))//
.subscribe();
chingyi-lin and tinyhhj
Metadata
Metadata
Assignees
Labels
status: pending-design-workNeeds design work before any code can be developedNeeds design work before any code can be developed