1717package org .apache .kafka .common .requests ;
1818
1919import org .apache .kafka .common .TopicPartition ;
20- import org .apache .kafka .common .Uuid ;
2120import org .apache .kafka .common .message .ListOffsetsResponseData ;
2221import org .apache .kafka .common .message .ListOffsetsResponseData .ListOffsetsPartitionResponse ;
2322import org .apache .kafka .common .message .ListOffsetsResponseData .ListOffsetsTopicResponse ;
2827
2928import java .util .Collections ;
3029import java .util .EnumMap ;
31- import java .util .HashMap ;
3230import java .util .List ;
3331import java .util .Map ;
34- import java .util .function .Function ;
3532
3633/**
3734 * Possible error codes:
@@ -109,14 +106,6 @@ public static boolean useTopicIds(short version) {
109106 return version >= 12 ;
110107 }
111108
112- public static Builder newBuilder (boolean useTopicIds ) {
113- if (useTopicIds ) {
114- return new TopicIdBuilder ();
115- } else {
116- return new TopicNameBuilder ();
117- }
118- }
119-
120109 public static ListOffsetsTopicResponse singletonListOffsetsTopicResponse (TopicPartition tp , Errors error , long timestamp , long offset , int epoch ) {
121110 return new ListOffsetsTopicResponse ()
122111 .setName (tp .topic ())
@@ -128,154 +117,5 @@ public static ListOffsetsTopicResponse singletonListOffsetsTopicResponse(TopicPa
128117 .setLeaderEpoch (epoch )));
129118 }
130119
131- public abstract static class Builder {
132- protected ListOffsetsResponseData data = new ListOffsetsResponseData ();
133-
134- protected abstract void add (
135- ListOffsetsTopicResponse topic
136- );
137-
138- protected abstract ListOffsetsTopicResponse get (
139- Uuid topicId ,
140- String topicName
141- );
142-
143- protected abstract ListOffsetsTopicResponse getOrCreate (
144- Uuid topicId ,
145- String topicName
146- );
147-
148- public Builder addPartition (
149- Uuid topicId ,
150- String topicName ,
151- int partitionIndex ,
152- Errors error
153- ) {
154- final ListOffsetsTopicResponse topicResponse = getOrCreate (topicId , topicName );
155- topicResponse .partitions ().add (new ListOffsetsPartitionResponse ()
156- .setPartitionIndex (partitionIndex )
157- .setErrorCode (error .code ()));
158- return this ;
159- }
160-
161- public <P > Builder addPartitions (
162- Uuid topicId ,
163- String topicName ,
164- List <P > partitions ,
165- Function <P , Integer > partitionIndex ,
166- Errors error
167- ) {
168- final ListOffsetsTopicResponse topicResponse = getOrCreate (topicId , topicName );
169- partitions .forEach (partition ->
170- topicResponse .partitions ().add (new ListOffsetsPartitionResponse ()
171- .setPartitionIndex (partitionIndex .apply (partition ))
172- .setErrorCode (error .code ()))
173- );
174- return this ;
175- }
176-
177- public Builder merge (
178- ListOffsetsResponseData newData
179- ) {
180- if (data .topics ().isEmpty ()) {
181- // If the current data is empty, we can discard it and use the new data.
182- data = newData ;
183- } else {
184- // Otherwise, we have to merge them together.
185- newData .topics ().forEach (newTopic -> {
186- ListOffsetsTopicResponse existingTopic = get (newTopic .topicId (), newTopic .name ());
187- if (existingTopic == null ) {
188- // If no topic exists, we can directly copy the new topic data.
189- add (newTopic );
190- } else {
191- // Otherwise, we add the partitions to the existing one. Note we
192- // expect non-overlapping partitions here as we don't verify
193- // if the partition is already in the list before adding it.
194- existingTopic .partitions ().addAll (newTopic .partitions ());
195- }
196- });
197- }
198- return this ;
199- }
200-
201- public ListOffsetsResponse build () {
202- return new ListOffsetsResponse (data );
203- }
204-
205- }
206-
207- public static class TopicIdBuilder extends Builder {
208- private final HashMap <Uuid , ListOffsetsTopicResponse > byTopicId = new HashMap <>();
209-
210- @ Override
211- protected void add (ListOffsetsTopicResponse topic ) {
212- throwIfTopicIdIsNull (topic .topicId ());
213- data .topics ().add (topic );
214- byTopicId .put (topic .topicId (), topic );
215- }
216-
217- @ Override
218- protected ListOffsetsTopicResponse get (Uuid topicId , String topicName ) {
219- throwIfTopicIdIsNull (topicId );
220- return byTopicId .get (topicId );
221- }
222-
223- @ Override
224- protected ListOffsetsResponseData .ListOffsetsTopicResponse getOrCreate (Uuid topicId , String topicName ) {
225- throwIfTopicIdIsNull (topicId );
226- ListOffsetsResponseData .ListOffsetsTopicResponse topic = byTopicId .get (topicId );
227- if (topic == null ) {
228- topic = new ListOffsetsResponseData .ListOffsetsTopicResponse ()
229- .setName (topicName )
230- .setTopicId (topicId );
231- data .topics ().add (topic );
232- byTopicId .put (topicId , topic );
233- }
234- return topic ;
235- }
236-
237- private static void throwIfTopicIdIsNull (Uuid topicId ) {
238- if (topicId == null ) {
239- throw new IllegalArgumentException ("TopicId cannot be null." );
240- }
241- }
242- }
243-
244- public static class TopicNameBuilder extends Builder {
245- private final HashMap <String , ListOffsetsTopicResponse > byTopicName = new HashMap <>();
246-
247- @ Override
248- protected void add (ListOffsetsTopicResponse topic ) {
249- throwIfTopicNameIsNull (topic .name ());
250- data .topics ().add (topic );
251- byTopicName .put (topic .name (), topic );
252- }
253-
254- @ Override
255- protected ListOffsetsTopicResponse get (Uuid topicId , String topicName ) {
256- throwIfTopicNameIsNull (topicName );
257- return byTopicName .get (topicName );
258- }
259-
260- @ Override
261- protected ListOffsetsTopicResponse getOrCreate (Uuid topicId , String topicName ) {
262- throwIfTopicNameIsNull (topicName );
263- ListOffsetsTopicResponse topic = byTopicName .get (topicName );
264- if (topic == null ) {
265- topic = new ListOffsetsTopicResponse ()
266- .setName (topicName )
267- .setTopicId (topicId );
268- data .topics ().add (topic );
269- byTopicName .put (topicName , topic );
270- }
271- return topic ;
272- }
273-
274- private void throwIfTopicNameIsNull (String topicName ) {
275- if (topicName == null ) {
276- throw new IllegalArgumentException ("TopicName cannot be null." );
277- }
278- }
279- }
280120
281121}
0 commit comments