Skip to content

Commit 4df0441

Browse files
celikfatihmjsax
authored andcommitted
KAFKA-19724: Global stream thread should not ignore any exceptions (#20668)
Kafka Streams does not catch Error types that occur during `GlobalStreamThread` initiation, and therefore it is not possible to trace the error (for example, an `ExceptionInInitializerError` occurs when RocksDB is not found for a global store). This is because errors are not caught and logged. The catch block in `GlobalStreamThread#initialize()` has been ensured to catch `Throwable` instead of `Exception`. Additionally, the empty `setUncaughtHandler` set operation that prevented this from taking effect when users employed setUncaughtExceptionHandler has been removed. Reviewers: Matthias J. Sax <[email protected]>
1 parent 8b11e9f commit 4df0441

File tree

3 files changed

+48
-13
lines changed

3 files changed

+48
-13
lines changed

streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -476,11 +476,6 @@ public void setUncaughtExceptionHandler(final StreamsUncaughtExceptionHandler us
476476
}
477477
processStreamThread(thread -> thread.setUncaughtExceptionHandler((t, e) -> { }
478478
));
479-
480-
if (globalStreamThread != null) {
481-
globalStreamThread.setUncaughtExceptionHandler((t, e) -> { }
482-
);
483-
}
484479
} else {
485480
throw new IllegalStateException("Can only set UncaughtExceptionHandler before calling start(). " +
486481
"Current state is: " + state);

streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -243,7 +243,7 @@ static class StateConsumer {
243243
}
244244

245245
/**
246-
* @throws IllegalStateException If store gets registered after initialized is already finished
246+
* @throws IllegalStateException If a store gets registered after initialized is already finished
247247
* @throws StreamsException if the store's change log does not contain the partition
248248
*/
249249
void initialize() {
@@ -433,7 +433,7 @@ private StateConsumer initialize() {
433433
} catch (final StreamsException fatalException) {
434434
closeStateConsumer(stateConsumer, false);
435435
startupException = fatalException;
436-
} catch (final Exception fatalException) {
436+
} catch (final Throwable fatalException) {
437437
closeStateConsumer(stateConsumer, false);
438438
startupException = new StreamsException("Exception caught during initialization of GlobalStreamThread", fatalException);
439439
}

streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java

Lines changed: 46 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -121,12 +121,7 @@ public void process(final Record<Object, Object> record) {
121121
);
122122

123123
baseDirectoryName = TestUtils.tempDirectory().getAbsolutePath();
124-
final HashMap<String, Object> properties = new HashMap<>();
125-
properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "blah");
126-
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "testAppId");
127-
properties.put(StreamsConfig.STATE_DIR_CONFIG, baseDirectoryName);
128-
properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.ByteArraySerde.class.getName());
129-
properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArraySerde.class.getName());
124+
final HashMap<String, Object> properties = getStreamProperties();
130125
config = new StreamsConfig(properties);
131126
globalStreamThread = new GlobalStreamThread(
132127
builder.rewriteTopology(config).buildGlobalStateTopology(),
@@ -406,6 +401,51 @@ public void shouldTimeOutOnGlobalConsumerInstanceId() throws Exception {
406401
}
407402
}
408403

404+
@Test
405+
public void shouldThrowStreamsExceptionOnStartupIfThrowableOccurred() throws Exception {
406+
final String exceptionMessage = "Throwable occurred!";
407+
final MockConsumer<byte[], byte[]> consumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST) {
408+
@Override
409+
public List<PartitionInfo> partitionsFor(final String topic) {
410+
throw new ExceptionInInitializerError(exceptionMessage);
411+
}
412+
};
413+
final StateStore globalStore = builder.globalStateStores().get(GLOBAL_STORE_NAME);
414+
globalStreamThread = new GlobalStreamThread(
415+
builder.buildGlobalStateTopology(),
416+
config,
417+
consumer,
418+
new StateDirectory(config, time, true, false),
419+
0,
420+
new StreamsMetricsImpl(new Metrics(), "test-client", "processId", time),
421+
time,
422+
"clientId",
423+
stateRestoreListener,
424+
e -> { }
425+
);
426+
427+
try {
428+
globalStreamThread.start();
429+
fail("Should have thrown StreamsException if start up failed");
430+
} catch (final StreamsException e) {
431+
assertThat(e.getCause(), instanceOf(Throwable.class));
432+
assertThat(e.getCause().getMessage(), equalTo(exceptionMessage));
433+
}
434+
globalStreamThread.join();
435+
assertThat(globalStore.isOpen(), is(false));
436+
assertFalse(globalStreamThread.stillRunning());
437+
}
438+
439+
private HashMap<String, Object> getStreamProperties() {
440+
final HashMap<String, Object> properties = new HashMap<>();
441+
properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "blah");
442+
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "testAppId");
443+
properties.put(StreamsConfig.STATE_DIR_CONFIG, baseDirectoryName);
444+
properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.ByteArraySerde.class.getName());
445+
properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArraySerde.class.getName());
446+
return properties;
447+
}
448+
409449
private void initializeConsumer() {
410450
mockConsumer.updatePartitions(
411451
GLOBAL_STORE_TOPIC_NAME,

0 commit comments

Comments
 (0)