44
44
import org .apache .kafka .clients .consumer .internals .events .BackgroundEvent ;
45
45
import org .apache .kafka .clients .consumer .internals .events .BackgroundEventHandler ;
46
46
import org .apache .kafka .clients .consumer .internals .events .CommitApplicationEvent ;
47
+ import org .apache .kafka .clients .consumer .internals .events .CommitOnCloseApplicationEvent ;
47
48
import org .apache .kafka .clients .consumer .internals .events .CompletableApplicationEvent ;
48
49
import org .apache .kafka .clients .consumer .internals .events .ConsumerRebalanceListenerCallbackCompletedEvent ;
49
50
import org .apache .kafka .clients .consumer .internals .events .ConsumerRebalanceListenerCallbackNeededEvent ;
50
51
import org .apache .kafka .clients .consumer .internals .events .ErrorBackgroundEvent ;
51
52
import org .apache .kafka .clients .consumer .internals .events .EventProcessor ;
52
53
import org .apache .kafka .clients .consumer .internals .events .FetchCommittedOffsetsApplicationEvent ;
53
54
import org .apache .kafka .clients .consumer .internals .events .GroupMetadataUpdateEvent ;
55
+ import org .apache .kafka .clients .consumer .internals .events .LeaveOnCloseApplicationEvent ;
54
56
import org .apache .kafka .clients .consumer .internals .events .ListOffsetsApplicationEvent ;
55
57
import org .apache .kafka .clients .consumer .internals .events .NewTopicsMetadataUpdateRequestEvent ;
56
58
import org .apache .kafka .clients .consumer .internals .events .ResetPositionsApplicationEvent ;
83
85
import org .apache .kafka .common .utils .LogContext ;
84
86
import org .apache .kafka .common .utils .Time ;
85
87
import org .apache .kafka .common .utils .Timer ;
88
+ import org .apache .kafka .common .utils .Utils ;
86
89
import org .slf4j .Logger ;
90
+ import org .slf4j .event .Level ;
87
91
88
92
import java .net .InetSocketAddress ;
89
93
import java .time .Duration ;
94
98
import java .util .HashSet ;
95
99
import java .util .List ;
96
100
import java .util .Map ;
101
+ import java .util .Objects ;
97
102
import java .util .Optional ;
98
103
import java .util .OptionalLong ;
99
104
import java .util .Set ;
100
105
import java .util .SortedSet ;
106
+ import java .util .TreeSet ;
101
107
import java .util .concurrent .BlockingQueue ;
102
108
import java .util .concurrent .CompletableFuture ;
103
109
import java .util .concurrent .Future ;
124
130
import static org .apache .kafka .common .utils .Utils .closeQuietly ;
125
131
import static org .apache .kafka .common .utils .Utils .isBlank ;
126
132
import static org .apache .kafka .common .utils .Utils .join ;
133
+ import static org .apache .kafka .common .utils .Utils .swallow ;
127
134
128
135
/**
129
136
* This {@link Consumer} implementation uses an {@link ApplicationEventHandler event handler} to process
@@ -245,7 +252,7 @@ private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) {
245
252
246
253
private final ApplicationEventHandler applicationEventHandler ;
247
254
private final Time time ;
248
- private Optional <ConsumerGroupMetadata > groupMetadata ;
255
+ private Optional <ConsumerGroupMetadata > groupMetadata = Optional . empty () ;
249
256
private final KafkaConsumerMetrics kafkaConsumerMetrics ;
250
257
private Logger log ;
251
258
private final String clientId ;
@@ -268,6 +275,7 @@ private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) {
268
275
private final Metrics metrics ;
269
276
private final long retryBackoffMs ;
270
277
private final int defaultApiTimeoutMs ;
278
+ private final boolean autoCommitEnabled ;
271
279
private volatile boolean closed = false ;
272
280
private final List <ConsumerPartitionAssignor > assignors ;
273
281
private final Optional <ClientTelemetryReporter > clientTelemetryReporter ;
@@ -313,6 +321,7 @@ private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) {
313
321
GroupRebalanceConfig .ProtocolType .CONSUMER
314
322
);
315
323
this .clientId = config .getString (CommonClientConfigs .CLIENT_ID_CONFIG );
324
+ this .autoCommitEnabled = config .getBoolean (ConsumerConfig .ENABLE_AUTO_COMMIT_CONFIG );
316
325
LogContext logContext = createLogContext (config , groupRebalanceConfig );
317
326
this .log = logContext .logger (getClass ());
318
327
@@ -434,6 +443,51 @@ private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) {
434
443
}
435
444
436
445
// Visible for testing
446
    /**
     * Test-only constructor that wires in pre-built collaborators directly instead of deriving them
     * from a {@link ConsumerConfig}. Visible for testing.
     *
     * <p>Note: {@code log} is assigned first so that any logging done by
     * {@link #initializeGroupMetadata} below is safe.
     */
    AsyncKafkaConsumer(LogContext logContext,
                       String clientId,
                       Deserializers<K, V> deserializers,
                       FetchBuffer fetchBuffer,
                       FetchCollector<K, V> fetchCollector,
                       ConsumerInterceptors<K, V> interceptors,
                       Time time,
                       ApplicationEventHandler applicationEventHandler,
                       BlockingQueue<BackgroundEvent> backgroundEventQueue,
                       ConsumerRebalanceListenerInvoker rebalanceListenerInvoker,
                       Metrics metrics,
                       SubscriptionState subscriptions,
                       ConsumerMetadata metadata,
                       long retryBackoffMs,
                       int defaultApiTimeoutMs,
                       List<ConsumerPartitionAssignor> assignors,
                       String groupId,
                       boolean autoCommitEnabled) {
        this.log = logContext.logger(getClass());
        this.subscriptions = subscriptions;
        this.clientId = clientId;
        this.fetchBuffer = fetchBuffer;
        this.fetchCollector = fetchCollector;
        // Hard-coded isolation level; tests exercising READ_COMMITTED use the config-based constructor.
        this.isolationLevel = IsolationLevel.READ_UNCOMMITTED;
        this.interceptors = Objects.requireNonNull(interceptors);
        this.time = time;
        // The background event processor drains events produced by the network thread and invokes
        // rebalance-listener callbacks on the application thread.
        this.backgroundEventProcessor = new BackgroundEventProcessor(
            logContext,
            backgroundEventQueue,
            applicationEventHandler,
            rebalanceListenerInvoker
        );
        this.metrics = metrics;
        // Empty server-assignor: tests supply client-side assignors via the `assignors` parameter.
        this.groupMetadata = initializeGroupMetadata(groupId, Optional.empty());
        this.metadata = metadata;
        this.retryBackoffMs = retryBackoffMs;
        this.defaultApiTimeoutMs = defaultApiTimeoutMs;
        this.deserializers = deserializers;
        this.applicationEventHandler = applicationEventHandler;
        this.assignors = assignors;
        this.kafkaConsumerMetrics = new KafkaConsumerMetrics(metrics, "consumer");
        // No telemetry reporter in the test path.
        this.clientTelemetryReporter = Optional.empty();
        this.autoCommitEnabled = autoCommitEnabled;
    }
490
+
437
491
AsyncKafkaConsumer (LogContext logContext ,
438
492
Time time ,
439
493
ConsumerConfig config ,
@@ -446,6 +500,7 @@ private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) {
446
500
this .log = logContext .logger (getClass ());
447
501
this .subscriptions = subscriptions ;
448
502
this .clientId = config .getString (ConsumerConfig .CLIENT_ID_CONFIG );
503
+ this .autoCommitEnabled = config .getBoolean (ConsumerConfig .ENABLE_AUTO_COMMIT_CONFIG );
449
504
this .fetchBuffer = new FetchBuffer (logContext );
450
505
this .isolationLevel = IsolationLevel .READ_UNCOMMITTED ;
451
506
this .interceptors = new ConsumerInterceptors <>(Collections .emptyList ());
@@ -1159,15 +1214,12 @@ private void close(Duration timeout, boolean swallowException) {
1159
1214
final Timer closeTimer = time .timer (timeout );
1160
1215
clientTelemetryReporter .ifPresent (reporter -> reporter .initiateClose (timeout .toMillis ()));
1161
1216
closeTimer .update ();
1162
-
1217
+ // Prepare shutting down the network thread
1218
+ prepareShutdown (closeTimer , firstException );
1219
+ closeTimer .update ();
1163
1220
if (applicationEventHandler != null )
1164
- closeQuietly (() -> applicationEventHandler .close (Duration .ofMillis (closeTimer .remainingMs ())), "Failed to close application event handler with a timeout(ms)=" + closeTimer .remainingMs (), firstException );
1165
-
1166
- // Invoke all callbacks after the background thread exists in case if there are unsent async
1167
- // commits
1168
- maybeInvokeCommitCallbacks ();
1169
-
1170
- closeQuietly (fetchBuffer , "Failed to close the fetch buffer" , firstException );
1221
+ closeQuietly (() -> applicationEventHandler .close (Duration .ofMillis (closeTimer .remainingMs ())), "Failed shutting down network thread" , firstException );
1222
+ closeTimer .update ();
1171
1223
closeQuietly (interceptors , "consumer interceptors" , firstException );
1172
1224
closeQuietly (kafkaConsumerMetrics , "kafka consumer metrics" , firstException );
1173
1225
closeQuietly (metrics , "consumer metrics" , firstException );
@@ -1185,6 +1237,74 @@ private void close(Duration timeout, boolean swallowException) {
1185
1237
}
1186
1238
}
1187
1239
1240
    /**
     * Prior to closing the network thread, we need to make sure the following operations happen in the right sequence:
     * 1. autocommit offsets
     * 2. revoke all partitions
     * 3. if partition revocation completes successfully, send leave group
     * 4. invoke all async commit callbacks if there is any
     *
     * <p>If this consumer is not part of a group ({@code groupMetadata} empty), none of the above
     * applies and the method returns immediately. Failures from steps 2–4 are recorded into
     * {@code firstException} (or logged) rather than thrown, so close can proceed.
     *
     * @param timer          bounds the time spent on the blocking steps (auto-commit and leave-group)
     * @param firstException collects the first non-timeout failure encountered, for the caller to rethrow
     */
    void prepareShutdown(final Timer timer, final AtomicReference<Throwable> firstException) {
        if (!groupMetadata.isPresent())
            return;
        maybeAutoCommitSync(autoCommitEnabled, timer, firstException);
        // Tell the background thread to stop accepting new commits before we leave the group.
        applicationEventHandler.add(new CommitOnCloseApplicationEvent());
        completeQuietly(
            () -> {
                // Revocation must complete (or throw) before the leave-group is sent: the
                // leave event is only added if maybeRevokePartitions() returns normally.
                maybeRevokePartitions();
                applicationEventHandler.addAndGet(new LeaveOnCloseApplicationEvent(), timer);
            },
            "Failed to send leaveGroup heartbeat with a timeout(ms)=" + timer.timeoutMs(), firstException);
        // Run any pending async commit callbacks now, while user code can still observe them.
        swallow(log, Level.ERROR, "Failed invoking asynchronous commit callback.", this::maybeInvokeCommitCallbacks,
            firstException);
    }
1261
+
1262
+ // Visible for testing
1263
+ void maybeAutoCommitSync (final boolean shouldAutoCommit ,
1264
+ final Timer timer ,
1265
+ final AtomicReference <Throwable > firstException ) {
1266
+ if (!shouldAutoCommit )
1267
+ return ;
1268
+ Map <TopicPartition , OffsetAndMetadata > allConsumed = subscriptions .allConsumed ();
1269
+ log .debug ("Sending synchronous auto-commit of offsets {} on closing" , allConsumed );
1270
+ try {
1271
+ commitSync (allConsumed , Duration .ofMillis (timer .remainingMs ()));
1272
+ } catch (Exception e ) {
1273
+ // consistent with async auto-commit failures, we do not propagate the exception
1274
+ log .warn ("Synchronous auto-commit of offsets {} failed: {}" , allConsumed , e .getMessage ());
1275
+ }
1276
+ timer .update ();
1277
+ }
1278
+
1279
+ // Visible for testing
1280
+ void maybeRevokePartitions () {
1281
+ if (!subscriptions .hasAutoAssignedPartitions () || subscriptions .assignedPartitions ().isEmpty ())
1282
+ return ;
1283
+ try {
1284
+ SortedSet <TopicPartition > droppedPartitions = new TreeSet <>(MembershipManagerImpl .TOPIC_PARTITION_COMPARATOR );
1285
+ droppedPartitions .addAll (subscriptions .assignedPartitions ());
1286
+ if (subscriptions .rebalanceListener ().isPresent ())
1287
+ subscriptions .rebalanceListener ().get ().onPartitionsRevoked (droppedPartitions );
1288
+ } catch (Exception e ) {
1289
+ throw new KafkaException (e );
1290
+ } finally {
1291
+ subscriptions .assignFromSubscribed (Collections .emptySet ());
1292
+ }
1293
+ }
1294
+
1295
+ // Visible for testing
1296
+ void completeQuietly (final Utils .ThrowingRunnable function ,
1297
+ final String msg ,
1298
+ final AtomicReference <Throwable > firstException ) {
1299
+ try {
1300
+ function .run ();
1301
+ } catch (TimeoutException e ) {
1302
+ log .debug ("Timeout expired before the {} operation could complete." , msg );
1303
+ } catch (Exception e ) {
1304
+ firstException .compareAndSet (null , e );
1305
+ }
1306
+ }
1307
+
1188
1308
@ Override
1189
1309
public void wakeup () {
1190
1310
wakeupTrigger .wakeup ();
0 commit comments