diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java index b479446389a9a..8594ea68c95d3 100644 --- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java @@ -281,6 +281,11 @@ public void shouldPassMetrics(final String topologyType, final String groupProto streamsApplicationProperties = props(groupProtocol); final Topology topology = topologyType.equals("simple") ? simpleTopology(false) : complexTopology(); + shouldPassMetrics(topology, FIRST_INSTANCE_CLIENT); + shouldPassMetrics(topology, SECOND_INSTANCE_CLIENT); + } + + private void shouldPassMetrics(final Topology topology, final int clientInstance) throws Exception { try (final KafkaStreams streams = new KafkaStreams(topology, streamsApplicationProperties)) { IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams); @@ -292,8 +297,8 @@ public void shouldPassMetrics(final String topologyType, final String groupProto - final List consumerPassedStreamThreadMetricNames = INTERCEPTING_CONSUMERS.get(FIRST_INSTANCE_CLIENT).passedMetrics().stream().map(KafkaMetric::metricName).toList(); - final List adminPassedStreamClientMetricNames = INTERCEPTING_ADMIN_CLIENTS.get(FIRST_INSTANCE_CLIENT).passedMetrics.stream().map(KafkaMetric::metricName).toList(); + final List consumerPassedStreamThreadMetricNames = INTERCEPTING_CONSUMERS.get(clientInstance).passedMetrics().stream().map(KafkaMetric::metricName).toList(); + final List adminPassedStreamClientMetricNames = INTERCEPTING_ADMIN_CLIENTS.get(clientInstance).passedMetrics.stream().map(KafkaMetric::metricName).toList(); assertEquals(streamsThreadMetrics.size(), consumerPassedStreamThreadMetricNames.size()); diff --git a/streams/rocksdb/my-store/000840.sst b/streams/rocksdb/my-store/000840.sst new file mode 100644 index 0000000000000..78e88d70960fb Binary files /dev/null and b/streams/rocksdb/my-store/000840.sst differ diff --git a/streams/rocksdb/my-store/000841.sst b/streams/rocksdb/my-store/000841.sst new file mode 100644 index 0000000000000..03d2ae2aba22b Binary files /dev/null and b/streams/rocksdb/my-store/000841.sst differ diff --git a/streams/rocksdb/my-store/000848.sst b/streams/rocksdb/my-store/000848.sst new file mode 100644 index 0000000000000..5b4e4e20df20e Binary files /dev/null and b/streams/rocksdb/my-store/000848.sst differ diff --git a/streams/rocksdb/my-store/000851.log b/streams/rocksdb/my-store/000851.log new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/000855.sst b/streams/rocksdb/my-store/000855.sst new file mode 100644 index 0000000000000..318a1c9e90ba1 Binary files /dev/null and b/streams/rocksdb/my-store/000855.sst differ diff --git a/streams/rocksdb/my-store/CURRENT b/streams/rocksdb/my-store/CURRENT new file mode 100644 index 0000000000000..108d099be0ac4 --- /dev/null +++ b/streams/rocksdb/my-store/CURRENT @@ -0,0 +1 @@ +MANIFEST-000852 diff --git a/streams/rocksdb/my-store/IDENTITY b/streams/rocksdb/my-store/IDENTITY new file mode 100644 index 0000000000000..a7a9cd06ff835 --- /dev/null +++ b/streams/rocksdb/my-store/IDENTITY @@ -0,0 +1 @@ +7b84a3b9-3c9c-48be-8c17-824a92b86a98 \ No newline at end of file diff --git a/streams/rocksdb/my-store/LOCK b/streams/rocksdb/my-store/LOCK new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG b/streams/rocksdb/my-store/LOG new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323792011407 b/streams/rocksdb/my-store/LOG.old.1761323792011407 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323792046501 b/streams/rocksdb/my-store/LOG.old.1761323792046501 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323792077198 b/streams/rocksdb/my-store/LOG.old.1761323792077198 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323792119146 b/streams/rocksdb/my-store/LOG.old.1761323792119146 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323792157400 b/streams/rocksdb/my-store/LOG.old.1761323792157400 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323792186608 b/streams/rocksdb/my-store/LOG.old.1761323792186608 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323792220476 b/streams/rocksdb/my-store/LOG.old.1761323792220476 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323792280519 b/streams/rocksdb/my-store/LOG.old.1761323792280519 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323792307235 b/streams/rocksdb/my-store/LOG.old.1761323792307235 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323792330690 b/streams/rocksdb/my-store/LOG.old.1761323792330690 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323792365936 b/streams/rocksdb/my-store/LOG.old.1761323792365936 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323792404121 b/streams/rocksdb/my-store/LOG.old.1761323792404121 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323792440846 b/streams/rocksdb/my-store/LOG.old.1761323792440846 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323792464592 b/streams/rocksdb/my-store/LOG.old.1761323792464592 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323792499803 b/streams/rocksdb/my-store/LOG.old.1761323792499803 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323792558712 b/streams/rocksdb/my-store/LOG.old.1761323792558712 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323792582399 b/streams/rocksdb/my-store/LOG.old.1761323792582399 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323792608813 b/streams/rocksdb/my-store/LOG.old.1761323792608813 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323792632113 b/streams/rocksdb/my-store/LOG.old.1761323792632113 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323792669559 b/streams/rocksdb/my-store/LOG.old.1761323792669559 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323792703358 b/streams/rocksdb/my-store/LOG.old.1761323792703358 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323792748619 b/streams/rocksdb/my-store/LOG.old.1761323792748619 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323792775665 b/streams/rocksdb/my-store/LOG.old.1761323792775665 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323792797977 b/streams/rocksdb/my-store/LOG.old.1761323792797977 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323792832641 b/streams/rocksdb/my-store/LOG.old.1761323792832641 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323792871278 b/streams/rocksdb/my-store/LOG.old.1761323792871278 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323792908607 b/streams/rocksdb/my-store/LOG.old.1761323792908607 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323792943763 b/streams/rocksdb/my-store/LOG.old.1761323792943763 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323792983512 b/streams/rocksdb/my-store/LOG.old.1761323792983512 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323793017405 b/streams/rocksdb/my-store/LOG.old.1761323793017405 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323793054607 b/streams/rocksdb/my-store/LOG.old.1761323793054607 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323793088341 b/streams/rocksdb/my-store/LOG.old.1761323793088341 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323793121349 b/streams/rocksdb/my-store/LOG.old.1761323793121349 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323793152806 b/streams/rocksdb/my-store/LOG.old.1761323793152806 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323793188814 b/streams/rocksdb/my-store/LOG.old.1761323793188814 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323793223410 b/streams/rocksdb/my-store/LOG.old.1761323793223410 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323793260326 b/streams/rocksdb/my-store/LOG.old.1761323793260326 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323793283769 b/streams/rocksdb/my-store/LOG.old.1761323793283769 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323793307469 b/streams/rocksdb/my-store/LOG.old.1761323793307469 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323793374975 b/streams/rocksdb/my-store/LOG.old.1761323793374975 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323793399618 b/streams/rocksdb/my-store/LOG.old.1761323793399618 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323793423906 b/streams/rocksdb/my-store/LOG.old.1761323793423906 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323793457220 b/streams/rocksdb/my-store/LOG.old.1761323793457220 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323793495748 b/streams/rocksdb/my-store/LOG.old.1761323793495748 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323793562578 b/streams/rocksdb/my-store/LOG.old.1761323793562578 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323793588606 b/streams/rocksdb/my-store/LOG.old.1761323793588606 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323793655601 b/streams/rocksdb/my-store/LOG.old.1761323793655601 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323793678108 b/streams/rocksdb/my-store/LOG.old.1761323793678108 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323793704379 b/streams/rocksdb/my-store/LOG.old.1761323793704379 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323793739212 b/streams/rocksdb/my-store/LOG.old.1761323793739212 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323793811243 b/streams/rocksdb/my-store/LOG.old.1761323793811243 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323793838647 b/streams/rocksdb/my-store/LOG.old.1761323793838647 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323793906202 b/streams/rocksdb/my-store/LOG.old.1761323793906202 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323793931033 b/streams/rocksdb/my-store/LOG.old.1761323793931033 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323793957198 b/streams/rocksdb/my-store/LOG.old.1761323793957198 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323793983860 b/streams/rocksdb/my-store/LOG.old.1761323793983860 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323794008122 b/streams/rocksdb/my-store/LOG.old.1761323794008122 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323794034255 b/streams/rocksdb/my-store/LOG.old.1761323794034255 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323794072516 b/streams/rocksdb/my-store/LOG.old.1761323794072516 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323794101292 b/streams/rocksdb/my-store/LOG.old.1761323794101292 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323794165485 b/streams/rocksdb/my-store/LOG.old.1761323794165485 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323794188031 b/streams/rocksdb/my-store/LOG.old.1761323794188031 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323794214060 b/streams/rocksdb/my-store/LOG.old.1761323794214060 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323794238118 b/streams/rocksdb/my-store/LOG.old.1761323794238118 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323794262160 b/streams/rocksdb/my-store/LOG.old.1761323794262160 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323794330657 b/streams/rocksdb/my-store/LOG.old.1761323794330657 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323794356353 b/streams/rocksdb/my-store/LOG.old.1761323794356353 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323794419717 b/streams/rocksdb/my-store/LOG.old.1761323794419717 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323794450337 b/streams/rocksdb/my-store/LOG.old.1761323794450337 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323794476574 b/streams/rocksdb/my-store/LOG.old.1761323794476574 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323794501540 b/streams/rocksdb/my-store/LOG.old.1761323794501540 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323794524838 b/streams/rocksdb/my-store/LOG.old.1761323794524838 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323794548891 b/streams/rocksdb/my-store/LOG.old.1761323794548891 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323794616306 b/streams/rocksdb/my-store/LOG.old.1761323794616306 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323794641749 b/streams/rocksdb/my-store/LOG.old.1761323794641749 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323794667884 b/streams/rocksdb/my-store/LOG.old.1761323794667884 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323794694258 b/streams/rocksdb/my-store/LOG.old.1761323794694258 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323794720822 b/streams/rocksdb/my-store/LOG.old.1761323794720822 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323794748068 b/streams/rocksdb/my-store/LOG.old.1761323794748068 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323885026226 b/streams/rocksdb/my-store/LOG.old.1761323885026226 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323885074665 b/streams/rocksdb/my-store/LOG.old.1761323885074665 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323885108076 b/streams/rocksdb/my-store/LOG.old.1761323885108076 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323885134334 b/streams/rocksdb/my-store/LOG.old.1761323885134334 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323885172161 b/streams/rocksdb/my-store/LOG.old.1761323885172161 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323885219794 b/streams/rocksdb/my-store/LOG.old.1761323885219794 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323885243845 b/streams/rocksdb/my-store/LOG.old.1761323885243845 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323885279190 b/streams/rocksdb/my-store/LOG.old.1761323885279190 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323885315885 b/streams/rocksdb/my-store/LOG.old.1761323885315885 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323885354261 b/streams/rocksdb/my-store/LOG.old.1761323885354261 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323885379604 b/streams/rocksdb/my-store/LOG.old.1761323885379604 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323885418996 b/streams/rocksdb/my-store/LOG.old.1761323885418996 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323885444833 b/streams/rocksdb/my-store/LOG.old.1761323885444833 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323885480664 b/streams/rocksdb/my-store/LOG.old.1761323885480664 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323885505552 b/streams/rocksdb/my-store/LOG.old.1761323885505552 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323885539132 b/streams/rocksdb/my-store/LOG.old.1761323885539132 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323885586663 b/streams/rocksdb/my-store/LOG.old.1761323885586663 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323885614112 b/streams/rocksdb/my-store/LOG.old.1761323885614112 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323885641755 b/streams/rocksdb/my-store/LOG.old.1761323885641755 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323885670093 b/streams/rocksdb/my-store/LOG.old.1761323885670093 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323885705579 b/streams/rocksdb/my-store/LOG.old.1761323885705579 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323885738586 b/streams/rocksdb/my-store/LOG.old.1761323885738586 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323885783722 b/streams/rocksdb/my-store/LOG.old.1761323885783722 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323885808728 b/streams/rocksdb/my-store/LOG.old.1761323885808728 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323885832851 b/streams/rocksdb/my-store/LOG.old.1761323885832851 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323885870708 b/streams/rocksdb/my-store/LOG.old.1761323885870708 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323885907147 b/streams/rocksdb/my-store/LOG.old.1761323885907147 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323885941452 b/streams/rocksdb/my-store/LOG.old.1761323885941452 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323885973554 b/streams/rocksdb/my-store/LOG.old.1761323885973554 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323886013270 b/streams/rocksdb/my-store/LOG.old.1761323886013270 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323886046625 b/streams/rocksdb/my-store/LOG.old.1761323886046625 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323886084501 b/streams/rocksdb/my-store/LOG.old.1761323886084501 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323886120566 b/streams/rocksdb/my-store/LOG.old.1761323886120566 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323886162303 b/streams/rocksdb/my-store/LOG.old.1761323886162303 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323886200538 b/streams/rocksdb/my-store/LOG.old.1761323886200538 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323886240053 b/streams/rocksdb/my-store/LOG.old.1761323886240053 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323886276399 b/streams/rocksdb/my-store/LOG.old.1761323886276399 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323886310994 b/streams/rocksdb/my-store/LOG.old.1761323886310994 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323886338722 b/streams/rocksdb/my-store/LOG.old.1761323886338722 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323886366805 b/streams/rocksdb/my-store/LOG.old.1761323886366805 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323886437685 b/streams/rocksdb/my-store/LOG.old.1761323886437685 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323886465143 b/streams/rocksdb/my-store/LOG.old.1761323886465143 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323886492674 b/streams/rocksdb/my-store/LOG.old.1761323886492674 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323886530538 b/streams/rocksdb/my-store/LOG.old.1761323886530538 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323886569207 b/streams/rocksdb/my-store/LOG.old.1761323886569207 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323886637625 b/streams/rocksdb/my-store/LOG.old.1761323886637625 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323886664587 b/streams/rocksdb/my-store/LOG.old.1761323886664587 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323886744431 b/streams/rocksdb/my-store/LOG.old.1761323886744431 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323886770910 b/streams/rocksdb/my-store/LOG.old.1761323886770910 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323886800440 b/streams/rocksdb/my-store/LOG.old.1761323886800440 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323886835992 b/streams/rocksdb/my-store/LOG.old.1761323886835992 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323886910514 b/streams/rocksdb/my-store/LOG.old.1761323886910514 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323886936764 b/streams/rocksdb/my-store/LOG.old.1761323886936764 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323887007025 b/streams/rocksdb/my-store/LOG.old.1761323887007025 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323887032292 b/streams/rocksdb/my-store/LOG.old.1761323887032292 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323887056625 b/streams/rocksdb/my-store/LOG.old.1761323887056625 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323887081797 b/streams/rocksdb/my-store/LOG.old.1761323887081797 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323887106420 b/streams/rocksdb/my-store/LOG.old.1761323887106420 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323887132311 b/streams/rocksdb/my-store/LOG.old.1761323887132311 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323887168673 b/streams/rocksdb/my-store/LOG.old.1761323887168673 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323887194513 b/streams/rocksdb/my-store/LOG.old.1761323887194513 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323887261686 b/streams/rocksdb/my-store/LOG.old.1761323887261686 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323887286410 b/streams/rocksdb/my-store/LOG.old.1761323887286410 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323887309222 b/streams/rocksdb/my-store/LOG.old.1761323887309222 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323887336666 b/streams/rocksdb/my-store/LOG.old.1761323887336666 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323887362742 b/streams/rocksdb/my-store/LOG.old.1761323887362742 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323887432782 b/streams/rocksdb/my-store/LOG.old.1761323887432782 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323887458898 b/streams/rocksdb/my-store/LOG.old.1761323887458898 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323887526859 b/streams/rocksdb/my-store/LOG.old.1761323887526859 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323887552826 b/streams/rocksdb/my-store/LOG.old.1761323887552826 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323887578854 b/streams/rocksdb/my-store/LOG.old.1761323887578854 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323887604642 b/streams/rocksdb/my-store/LOG.old.1761323887604642 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323887631684 b/streams/rocksdb/my-store/LOG.old.1761323887631684 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323887659683 b/streams/rocksdb/my-store/LOG.old.1761323887659683 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323887737835 b/streams/rocksdb/my-store/LOG.old.1761323887737835 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323887764946 b/streams/rocksdb/my-store/LOG.old.1761323887764946 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323887792621 b/streams/rocksdb/my-store/LOG.old.1761323887792621 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323887818140 b/streams/rocksdb/my-store/LOG.old.1761323887818140 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323887845853 b/streams/rocksdb/my-store/LOG.old.1761323887845853 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761323887872730 b/streams/rocksdb/my-store/LOG.old.1761323887872730 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761324670955504 b/streams/rocksdb/my-store/LOG.old.1761324670955504 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761324698515288 b/streams/rocksdb/my-store/LOG.old.1761324698515288 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761336584328259 b/streams/rocksdb/my-store/LOG.old.1761336584328259 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/LOG.old.1761336681465458 b/streams/rocksdb/my-store/LOG.old.1761336681465458 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/rocksdb/my-store/MANIFEST-000852 b/streams/rocksdb/my-store/MANIFEST-000852 new file mode 100644 index 0000000000000..5cf9e12d01b39 Binary files /dev/null and b/streams/rocksdb/my-store/MANIFEST-000852 differ diff --git a/streams/rocksdb/my-store/OPTIONS-000847 b/streams/rocksdb/my-store/OPTIONS-000847 new file mode 100644 index 0000000000000..ed127d1204b61 --- /dev/null +++ b/streams/rocksdb/my-store/OPTIONS-000847 @@ -0,0 +1,215 @@ +# This is a RocksDB option file. +# +# For detailed file format spec, please refer to the example file +# in examples/rocksdb_option_file_example.ini +# + +[Version] + rocksdb_version=10.1.3 + options_file_version=1.1 + +[DBOptions] + max_background_flushes=-1 + compaction_readahead_size=2097152 + strict_bytes_per_sync=false + wal_bytes_per_sync=0 + max_open_files=-1 + stats_history_buffer_size=1048576 + max_total_wal_size=0 + stats_persist_period_sec=600 + stats_dump_period_sec=600 + avoid_flush_during_shutdown=false + max_subcompactions=1 + bytes_per_sync=0 + delayed_write_rate=16777216 + max_background_compactions=-1 + max_background_jobs=14 + delete_obsolete_files_period_micros=21600000000 + writable_file_max_buffer_size=1048576 + follower_catchup_retry_wait_ms=100 + file_checksum_gen_factory=nullptr + allow_data_in_errors=false + max_bgerror_resume_count=2147483647 + best_efforts_recovery=false + wal_write_temperature=kUnknown + write_identity_file=true + write_dbid_to_manifest=true + atomic_flush=false + manual_wal_flush=false + two_write_queues=false + avoid_flush_during_recovery=false + dump_malloc_stats=false + info_log_level=ERROR_LEVEL + write_thread_slow_yield_usec=3 + unordered_write=false + follower_refresh_catchup_period_ms=10000 + log_readahead_size=0 + enable_pipelined_write=false + allow_ingest_behind=false + fail_if_options_file_error=true + persist_stats_to_disk=false + WAL_ttl_seconds=0 + bgerror_resume_retry_interval=1000000 + allow_concurrent_memtable_write=true + paranoid_checks=true + WAL_size_limit_MB=0 + metadata_write_temperature=kUnknown + lowest_used_cache_tier=kNonVolatileBlockTier + keep_log_file_num=1000 + table_cache_numshardbits=6 + max_file_opening_threads=16 + background_close_inactive_wals=false + wal_recovery_mode=kPointInTimeRecovery + follower_catchup_retry_count=10 + db_write_buffer_size=0 + allow_2pc=false + skip_checking_sst_file_sizes_on_db_open=false + skip_stats_update_on_db_open=false + recycle_log_file_num=0 + db_host_id=__hostname__ + track_and_verify_wals_in_manifest=false + use_fsync=false + wal_compression=kNoCompression + compaction_verify_record_count=true + error_if_exists=false + manifest_preallocation_size=4194304 + is_fd_close_on_exec=true + enable_write_thread_adaptive_yield=true + track_and_verify_wals=false + enable_thread_tracking=false + avoid_unnecessary_blocking_io=false + allow_fallocate=true + max_log_file_size=0 + advise_random_on_open=true + create_missing_column_families=false + max_write_batch_group_size_bytes=1048576 + use_adaptive_mutex=false + prefix_seek_opt_in_only=false + wal_filter=nullptr + create_if_missing=true + enforce_single_del_contracts=true + allow_mmap_writes=false + verify_sst_unique_id_in_manifest=true + log_file_time_to_roll=0 + use_direct_io_for_flush_and_compaction=false + flush_verify_memtable_count=true + max_manifest_file_size=1073741824 + write_thread_max_yield_usec=100 + use_direct_reads=false + allow_mmap_reads=false + + +[CFOptions "default"] + bottommost_file_compaction_delay=0 + memtable_protection_bytes_per_key=0 + bottommost_compression=kDisableCompressionOption + sample_for_compression=0 + blob_compression_type=kNoCompression + paranoid_memory_checks=false + blob_garbage_collection_age_cutoff=0.250000 + preclude_last_level_data_seconds=0 + compaction_options_universal={allow_trivial_move=false;stop_style=kCompactionStopStyleTotalSize;max_read_amp=-1;min_merge_width=2;compression_size_percent=-1;max_size_amplification_percent=200;incremental=false;max_merge_width=4294967295;size_ratio=1;} + target_file_size_base=67108864 + memtable_whole_key_filtering=false + blob_file_starting_level=0 + soft_pending_compaction_bytes_limit=68719476736 + max_write_buffer_number=3 + ttl=2592000 + compaction_options_fifo={file_temperature_age_thresholds=;allow_compaction=false;age_for_warm=0;max_table_files_size=1073741824;} + memtable_huge_page_size=0 + max_sequential_skip_in_iterations=8 + strict_max_successive_merges=false + max_successive_merges=0 + enable_blob_garbage_collection=false + arena_block_size=1048576 + bottommost_compression_opts={use_zstd_dict_trainer=true;enabled=false;zstd_max_train_bytes=0;parallel_threads=1;max_compressed_bytes_per_kb=896;checksum=false;max_dict_bytes=0;strategy=0;max_dict_buffer_bytes=0;level=32767;window_bits=-14;} + target_file_size_multiplier=1 + max_bytes_for_level_multiplier_additional=1:1:1:1:1:1:1 + prepopulate_blob_cache=kDisable + blob_compaction_readahead_size=0 + min_blob_size=0 + level0_stop_writes_trigger=36 + blob_garbage_collection_force_threshold=1.000000 + enable_blob_files=false + level0_slowdown_writes_trigger=20 + default_write_temperature=kUnknown + compression=kNoCompression + level0_file_num_compaction_trigger=4 + block_protection_bytes_per_key=0 + prefix_extractor=nullptr + max_bytes_for_level_multiplier=10.000000 + write_buffer_size=16777216 + uncache_aggressiveness=0 + disable_auto_compactions=false + max_compaction_bytes=1677721600 + inplace_update_num_locks=10000 + periodic_compaction_seconds=2592000 + experimental_mempurge_threshold=0.000000 + memtable_prefix_bloom_size_ratio=0.000000 + max_bytes_for_level_base=268435456 + paranoid_file_checks=false + blob_file_size=268435456 + preserve_internal_time_seconds=0 + memtable_max_range_deletions=0 + compression_opts={use_zstd_dict_trainer=true;enabled=false;zstd_max_train_bytes=0;parallel_threads=1;max_compressed_bytes_per_kb=896;checksum=false;max_dict_bytes=0;strategy=0;max_dict_buffer_bytes=0;level=32767;window_bits=-14;} + hard_pending_compaction_bytes_limit=274877906944 + last_level_temperature=kUnknown + table_factory=BlockBasedTable + report_bg_io_stats=false + persist_user_defined_timestamps=true + sst_partitioner_factory=nullptr + compaction_pri=kMinOverlappingRatio + compaction_style=kCompactionStyleUniversal + memtable_factory=SkipListFactory + comparator=leveldb.BytewiseComparator + bloom_locality=0 + compaction_filter_factory=nullptr + min_write_buffer_number_to_merge=1 + max_write_buffer_size_to_maintain=0 + max_write_buffer_number_to_maintain=0 + compaction_filter=nullptr + optimize_filters_for_hits=true + default_temperature=kUnknown + force_consistency_checks=true + merge_operator=nullptr + num_levels=7 + memtable_insert_with_hint_prefix_extractor=nullptr + level_compaction_dynamic_level_bytes=false + disallow_memtable_writes=false + inplace_update_support=false + +[TableOptions/BlockBasedTable "default"] + num_file_reads_for_auto_readahead=2 + metadata_cache_options={unpartitioned_pinning=kFallback;partition_pinning=kFallback;top_level_index_pinning=kFallback;} + read_amp_bytes_per_bit=0 + verify_compression=false + format_version=6 + detect_filter_construct_corruption=false + optimize_filters_for_memory=true + decouple_partitioned_filters=false + partition_filters=false + initial_auto_readahead_size=8192 + max_auto_readahead_size=262144 + enable_index_compression=true + checksum=kXXH3 + index_block_restart_interval=1 + pin_top_level_index_and_filter=true + block_align=false + block_size=4096 + index_type=kBinarySearch + filter_policy=bloomfilter:10:false + metadata_block_size=4096 + no_block_cache=false + whole_key_filtering=true + index_shortening=kShortenSeparators + block_size_deviation=10 + data_block_index_type=kDataBlockBinarySearch + use_delta_encoding=true + data_block_hash_table_util_ratio=0.750000 + cache_index_and_filter_blocks=false + prepopulate_block_cache=kDisable + block_restart_interval=16 + pin_l0_filter_and_index_blocks_in_cache=false + cache_index_and_filter_blocks_with_high_priority=true + flush_block_policy_factory=FlushBlockBySizePolicyFactory + diff --git a/streams/rocksdb/my-store/OPTIONS-000854 b/streams/rocksdb/my-store/OPTIONS-000854 new file mode 100644 index 0000000000000..ed127d1204b61 --- /dev/null +++ b/streams/rocksdb/my-store/OPTIONS-000854 @@ -0,0 +1,215 @@ +# This is a RocksDB option file. +# +# For detailed file format spec, please refer to the example file +# in examples/rocksdb_option_file_example.ini +# + +[Version] + rocksdb_version=10.1.3 + options_file_version=1.1 + +[DBOptions] + max_background_flushes=-1 + compaction_readahead_size=2097152 + strict_bytes_per_sync=false + wal_bytes_per_sync=0 + max_open_files=-1 + stats_history_buffer_size=1048576 + max_total_wal_size=0 + stats_persist_period_sec=600 + stats_dump_period_sec=600 + avoid_flush_during_shutdown=false + max_subcompactions=1 + bytes_per_sync=0 + delayed_write_rate=16777216 + max_background_compactions=-1 + max_background_jobs=14 + delete_obsolete_files_period_micros=21600000000 + writable_file_max_buffer_size=1048576 + follower_catchup_retry_wait_ms=100 + file_checksum_gen_factory=nullptr + allow_data_in_errors=false + max_bgerror_resume_count=2147483647 + best_efforts_recovery=false + wal_write_temperature=kUnknown + write_identity_file=true + write_dbid_to_manifest=true + atomic_flush=false + manual_wal_flush=false + two_write_queues=false + avoid_flush_during_recovery=false + dump_malloc_stats=false + info_log_level=ERROR_LEVEL + write_thread_slow_yield_usec=3 + unordered_write=false + follower_refresh_catchup_period_ms=10000 + log_readahead_size=0 + enable_pipelined_write=false + allow_ingest_behind=false + fail_if_options_file_error=true + persist_stats_to_disk=false + WAL_ttl_seconds=0 + bgerror_resume_retry_interval=1000000 + allow_concurrent_memtable_write=true + paranoid_checks=true + WAL_size_limit_MB=0 + metadata_write_temperature=kUnknown + lowest_used_cache_tier=kNonVolatileBlockTier + keep_log_file_num=1000 + table_cache_numshardbits=6 + max_file_opening_threads=16 + background_close_inactive_wals=false + wal_recovery_mode=kPointInTimeRecovery + follower_catchup_retry_count=10 + db_write_buffer_size=0 + allow_2pc=false + skip_checking_sst_file_sizes_on_db_open=false + skip_stats_update_on_db_open=false + recycle_log_file_num=0 + db_host_id=__hostname__ + track_and_verify_wals_in_manifest=false + use_fsync=false + wal_compression=kNoCompression + compaction_verify_record_count=true + error_if_exists=false + manifest_preallocation_size=4194304 + is_fd_close_on_exec=true + enable_write_thread_adaptive_yield=true + track_and_verify_wals=false + enable_thread_tracking=false + avoid_unnecessary_blocking_io=false + allow_fallocate=true + max_log_file_size=0 + advise_random_on_open=true + create_missing_column_families=false + max_write_batch_group_size_bytes=1048576 + use_adaptive_mutex=false + prefix_seek_opt_in_only=false + wal_filter=nullptr + create_if_missing=true + enforce_single_del_contracts=true + allow_mmap_writes=false + verify_sst_unique_id_in_manifest=true + log_file_time_to_roll=0 + use_direct_io_for_flush_and_compaction=false + flush_verify_memtable_count=true + max_manifest_file_size=1073741824 + write_thread_max_yield_usec=100 + use_direct_reads=false + allow_mmap_reads=false + + +[CFOptions "default"] + bottommost_file_compaction_delay=0 + memtable_protection_bytes_per_key=0 + bottommost_compression=kDisableCompressionOption + sample_for_compression=0 + blob_compression_type=kNoCompression + paranoid_memory_checks=false + blob_garbage_collection_age_cutoff=0.250000 + preclude_last_level_data_seconds=0 + compaction_options_universal={allow_trivial_move=false;stop_style=kCompactionStopStyleTotalSize;max_read_amp=-1;min_merge_width=2;compression_size_percent=-1;max_size_amplification_percent=200;incremental=false;max_merge_width=4294967295;size_ratio=1;} + target_file_size_base=67108864 + memtable_whole_key_filtering=false + blob_file_starting_level=0 + soft_pending_compaction_bytes_limit=68719476736 + max_write_buffer_number=3 + ttl=2592000 + compaction_options_fifo={file_temperature_age_thresholds=;allow_compaction=false;age_for_warm=0;max_table_files_size=1073741824;} + memtable_huge_page_size=0 + max_sequential_skip_in_iterations=8 + strict_max_successive_merges=false + max_successive_merges=0 + enable_blob_garbage_collection=false + arena_block_size=1048576 + bottommost_compression_opts={use_zstd_dict_trainer=true;enabled=false;zstd_max_train_bytes=0;parallel_threads=1;max_compressed_bytes_per_kb=896;checksum=false;max_dict_bytes=0;strategy=0;max_dict_buffer_bytes=0;level=32767;window_bits=-14;} + target_file_size_multiplier=1 + max_bytes_for_level_multiplier_additional=1:1:1:1:1:1:1 + prepopulate_blob_cache=kDisable + blob_compaction_readahead_size=0 + min_blob_size=0 + level0_stop_writes_trigger=36 + blob_garbage_collection_force_threshold=1.000000 + enable_blob_files=false + level0_slowdown_writes_trigger=20 + default_write_temperature=kUnknown + compression=kNoCompression + level0_file_num_compaction_trigger=4 + block_protection_bytes_per_key=0 + prefix_extractor=nullptr + max_bytes_for_level_multiplier=10.000000 + write_buffer_size=16777216 + uncache_aggressiveness=0 + disable_auto_compactions=false + max_compaction_bytes=1677721600 + inplace_update_num_locks=10000 + periodic_compaction_seconds=2592000 + experimental_mempurge_threshold=0.000000 + memtable_prefix_bloom_size_ratio=0.000000 + max_bytes_for_level_base=268435456 + paranoid_file_checks=false + blob_file_size=268435456 + preserve_internal_time_seconds=0 + memtable_max_range_deletions=0 + compression_opts={use_zstd_dict_trainer=true;enabled=false;zstd_max_train_bytes=0;parallel_threads=1;max_compressed_bytes_per_kb=896;checksum=false;max_dict_bytes=0;strategy=0;max_dict_buffer_bytes=0;level=32767;window_bits=-14;} + hard_pending_compaction_bytes_limit=274877906944 + last_level_temperature=kUnknown + table_factory=BlockBasedTable + report_bg_io_stats=false + persist_user_defined_timestamps=true + sst_partitioner_factory=nullptr + compaction_pri=kMinOverlappingRatio + compaction_style=kCompactionStyleUniversal + memtable_factory=SkipListFactory + comparator=leveldb.BytewiseComparator + bloom_locality=0 + compaction_filter_factory=nullptr + min_write_buffer_number_to_merge=1 + max_write_buffer_size_to_maintain=0 + max_write_buffer_number_to_maintain=0 + compaction_filter=nullptr + optimize_filters_for_hits=true + default_temperature=kUnknown + force_consistency_checks=true + merge_operator=nullptr + num_levels=7 + memtable_insert_with_hint_prefix_extractor=nullptr + level_compaction_dynamic_level_bytes=false + disallow_memtable_writes=false + inplace_update_support=false + +[TableOptions/BlockBasedTable "default"] + num_file_reads_for_auto_readahead=2 + metadata_cache_options={unpartitioned_pinning=kFallback;partition_pinning=kFallback;top_level_index_pinning=kFallback;} + read_amp_bytes_per_bit=0 + verify_compression=false + format_version=6 + detect_filter_construct_corruption=false + optimize_filters_for_memory=true + decouple_partitioned_filters=false + partition_filters=false + initial_auto_readahead_size=8192 + max_auto_readahead_size=262144 + enable_index_compression=true + checksum=kXXH3 + index_block_restart_interval=1 + pin_top_level_index_and_filter=true + block_align=false + block_size=4096 + index_type=kBinarySearch + filter_policy=bloomfilter:10:false + metadata_block_size=4096 + no_block_cache=false + whole_key_filtering=true + index_shortening=kShortenSeparators + block_size_deviation=10 + data_block_index_type=kDataBlockBinarySearch + use_delta_encoding=true + data_block_hash_table_util_ratio=0.750000 + cache_index_and_filter_blocks=false + prepopulate_block_cache=kDisable + block_restart_interval=16 + pin_l0_filter_and_index_blocks_in_cache=false + cache_index_and_filter_blocks_with_high_priority=true + flush_block_policy_factory=FlushBlockBySizePolicyFactory + diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index 69012f0c3135a..c62e24aae5a54 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -1376,7 +1376,7 @@ private static HostInfo parseHostInfo(final String endPoint) { public synchronized void start() throws IllegalStateException, StreamsException { if (setState(State.REBALANCING)) { log.debug("Initializing STANDBY tasks for existing local state"); - stateDirectory.initializeStartupTasks(topologyMetadata, streamsMetrics, logContext); + stateDirectory.initializeStartupTasks(topologyMetadata, logContext); log.debug("Starting Streams client"); diff --git a/streams/src/main/java/org/apache/kafka/streams/TopologyContext.java b/streams/src/main/java/org/apache/kafka/streams/TopologyContext.java new file mode 100644 index 0000000000000..2e0757dcb4cfe --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/TopologyContext.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams; + +import java.util.Map; + +public interface TopologyContext { + + String topologyName(); + + Map appConfigs(); + +} diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java b/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java index 38a3e23e28a1e..079957eb2fef0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java @@ -71,6 +71,8 @@ public interface StateStore { */ void init(final StateStoreContext stateStoreContext, final StateStore root); + default void preInit(final StateStoreContext stateStoreContext) {}; + /** * Flush any cached data */ diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyContext.java new file mode 100644 index 0000000000000..f9bba8a04d7d0 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyContext.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.internals; + +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.TopologyContext; + +import java.io.File; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +public class InternalTopologyContext implements TopologyContext { + + private final String topologyName; + private final File stateDir; + private final Map appConfigs = new HashMap<>(); + + public InternalTopologyContext(final String topologyName, final StreamsConfig appConfig, final File stateDir) { + this.topologyName = topologyName; + this.stateDir = stateDir; + appConfigs.putAll(appConfig.originals()); + appConfigs.putAll(appConfig.values()); + } + + + @Override + public String topologyName() { + return topologyName; + } + + @Override + public Map appConfigs() { + return Collections.unmodifiableMap(appConfigs); + } +} diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java index 3506845d288af..5aebffd97fe71 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java @@ -49,6 +49,7 @@ import java.util.Map; import java.util.OptionalLong; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; import static java.lang.String.format; @@ -176,6 +177,7 @@ public String toString() { // must be maintained in topological order private final FixedOrderMap stores = new FixedOrderMap<>(); + private final Map startupStores = new HashMap<>(); private final FixedOrderMap globalStores = new FixedOrderMap<>(); private final File baseDir; @@ -185,6 +187,7 @@ public String toString() { private TaskType taskType; private Logger log; private Task.State taskState; + private final AtomicBoolean startupState; public static String storeChangelogTopic(final String prefix, final String storeName, final String namedTopology) { if (namedTopology == null) { @@ -205,7 +208,8 @@ public ProcessorStateManager(final TaskId taskId, final ChangelogRegister changelogReader, final Map storeToChangelogTopic, final Collection sourcePartitions, - final boolean stateUpdaterEnabled) throws ProcessorStateException { + final boolean stateUpdaterEnabled, + final boolean startupState) throws ProcessorStateException { this.storeToChangelogTopic = storeToChangelogTopic; this.log = logContext.logger(ProcessorStateManager.class); this.logPrefix = logContext.logPrefix(); @@ -220,6 +224,22 @@ public ProcessorStateManager(final TaskId taskId, this.checkpointFile = new OffsetCheckpoint(stateDirectory.checkpointFileFor(taskId)); log.debug("Created state store manager for task {}", taskId); + this.startupState = new AtomicBoolean(startupState); + } + + /** + * @throws ProcessorStateException if the task directory does not exist and could not be created + */ + public ProcessorStateManager(final TaskId taskId, + final TaskType taskType, + final boolean eosEnabled, + final LogContext logContext, + final StateDirectory stateDirectory, + final ChangelogRegister changelogReader, + final Map storeToChangelogTopic, + final Collection sourcePartitions, + final boolean stateUpdaterEnabled) throws ProcessorStateException { + this(taskId, taskType, eosEnabled, logContext, stateDirectory, changelogReader, storeToChangelogTopic, sourcePartitions, stateUpdaterEnabled, false); } /** @@ -234,7 +254,7 @@ static ProcessorStateManager createStartupTaskStateManager(final TaskId taskId, final Map storeToChangelogTopic, final Set sourcePartitions, final boolean stateUpdaterEnabled) { - return new ProcessorStateManager(taskId, TaskType.STANDBY, eosEnabled, logContext, stateDirectory, null, storeToChangelogTopic, sourcePartitions, stateUpdaterEnabled); + return new ProcessorStateManager(taskId, TaskType.STANDBY, eosEnabled, logContext, stateDirectory, null, storeToChangelogTopic, sourcePartitions, stateUpdaterEnabled, true); } /** @@ -255,6 +275,10 @@ void assignToStreamThread(final LogContext logContext, this.sourcePartitions.addAll(sourcePartitions); } + void reuseState() { + startupState.set(false); + } + void registerStateStores(final List allStores, final InternalProcessorContext processorContext) { processorContext.uninitialize(); for (final StateStore store : allStores) { @@ -263,7 +287,13 @@ void registerStateStores(final List allStores, final InternalProcess maybeRegisterStoreWithChangelogReader(store.name()); } } else { - store.init(processorContext, store); + if (startupState.get()) { + store.preInit(processorContext); + startupStores.put(store.name(), store); + } else { + store.init(processorContext, store); + startupStores.remove(store.name()); + } } log.trace("Registered state store {}", store.name()); } @@ -649,9 +679,19 @@ else if (exception instanceof StreamsException) } } + stores.clear(); } + if (!startupStores.isEmpty()) { + for (final Map.Entry entry : startupStores.entrySet()) { + final StateStore store = entry.getValue(); + store.close(); + } + startupStores.clear(); + } + + if (firstException != null) { throw firstException; } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java index eb8bcafea695a..bf518e43468c8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java @@ -165,6 +165,35 @@ StandbyTask createStandbyTask(final TaskId taskId, return task; } + StandbyTask createStandbyTaskFromStartupLocalStore(final TaskId taskId, + final Set inputPartitions, + final ProcessorTopology topology, + final ProcessorStateManager stateManager) { + stateManager.reuseState(); + final InternalProcessorContext context = new ProcessorContextImpl( + taskId, + applicationConfig, + stateManager, + streamsMetrics, + dummyCache + ); + final StandbyTask task = new StandbyTask( + taskId, + inputPartitions, + topology, + topologyMetadata.taskConfig(taskId), + streamsMetrics, + stateManager, + stateDirectory, + dummyCache, + context + ); + + log.trace("Created standby task {} with assigned partitions {}", taskId, inputPartitions); + createTaskSensor.record(); + return task; + } + private LogContext getLogContext(final TaskId taskId) { final String threadIdPrefix = String.format("stream-thread [%s] ", Thread.currentThread().getName()); final String logPrefix = threadIdPrefix + String.format("%s [%s] ", "standby-task", taskId); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java index 8ea2d3ae65a51..a021f76f3771a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java @@ -17,16 +17,24 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.ProcessorStateException; import org.apache.kafka.streams.errors.StreamsException; -import org.apache.kafka.streams.errors.TaskCorruptedException; import org.apache.kafka.streams.internals.StreamsConfigUtils; +import org.apache.kafka.streams.processor.Cancellable; +import org.apache.kafka.streams.processor.PunctuationType; +import org.apache.kafka.streams.processor.Punctuator; +import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.To; +import org.apache.kafka.streams.processor.api.FixedKeyRecord; +import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.query.Position; import org.apache.kafka.streams.state.internals.ThreadCache; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; @@ -46,6 +54,8 @@ import java.nio.file.Path; import java.nio.file.StandardOpenOption; import java.nio.file.attribute.PosixFilePermission; +import java.nio.file.attribute.PosixFilePermissions; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -112,7 +122,7 @@ public StateDirectoryProcessFile() { private FileLock stateDirLock; private final StreamsConfig config; - private final ConcurrentMap tasksForLocalState = new ConcurrentHashMap<>(); + private final ConcurrentMap tasksForLocalState = new ConcurrentHashMap<>(); /** * Ensures that the state base directory as well as the application's sub-directory are created. @@ -207,11 +217,9 @@ private boolean lockStateDirectory() { } public void initializeStartupTasks(final TopologyMetadata topologyMetadata, - final StreamsMetricsImpl streamsMetrics, final LogContext logContext) { final List nonEmptyTaskDirectories = listNonEmptyTaskDirectories(); if (hasPersistentStores && !nonEmptyTaskDirectories.isEmpty()) { - final ThreadCache dummyCache = new ThreadCache(logContext, 0, streamsMetrics); final boolean eosEnabled = StreamsConfigUtils.eosEnabled(config); final boolean stateUpdaterEnabled = StreamsConfig.InternalConfig.stateUpdaterEnabled(config.originals()); @@ -230,44 +238,23 @@ public void initializeStartupTasks(final TopologyMetadata topologyMetadata, .map(t -> new TopicPartition(t, id.partition())) .collect(Collectors.toSet()); final ProcessorStateManager stateManager = ProcessorStateManager.createStartupTaskStateManager( - id, - eosEnabled, - logContext, - this, - subTopology.storeToChangelogTopic(), - inputPartitions, - stateUpdaterEnabled + id, + eosEnabled, + logContext, + this, + subTopology.storeToChangelogTopic(), + inputPartitions, + stateUpdaterEnabled ); - - final InternalProcessorContext context = new ProcessorContextImpl( - id, - config, - stateManager, - streamsMetrics, - dummyCache - ); - - final Task task = new StandbyTask( - id, - inputPartitions, - subTopology, - topologyMetadata.taskConfig(id), - streamsMetrics, - stateManager, - this, - dummyCache, - context - ); - - try { - task.initializeIfNeeded(); - - tasksForLocalState.put(id, task); - } catch (final TaskCorruptedException e) { - // Task is corrupt - wipe it out (under EOS) and don't initialize a Standby for it - task.suspend(); - task.closeDirty(); + final StartupContext initContext = new StartupContext(id, config, stateManager); + // TODO: we need to pass a proper logPrefix + StateManagerUtil.registerStateStores(log, "", subTopology, stateManager, this, initContext); + for (final StateStore stateStore : subTopology.stateStores()) { + if (!stateStore.isOpen()) { + throw new IllegalStateException("StateStore [" + stateStore.name() + "] is not open"); + } } + tasksForLocalState.put(id, new StartupState(id, subTopology, stateManager)); } } } @@ -277,8 +264,8 @@ public boolean hasStartupTasks() { return !tasksForLocalState.isEmpty(); } - public Task removeStartupTask(final TaskId taskId) { - final Task task = tasksForLocalState.remove(taskId); + public StartupState removeStartupTask(final TaskId taskId) { + final StartupState task = tasksForLocalState.remove(taskId); if (task != null) { lockedTasksToOwner.replace(taskId, Thread.currentThread()); } @@ -289,11 +276,11 @@ public void closeStartupTasks() { closeStartupTasks(t -> true); } - private void closeStartupTasks(final Predicate predicate) { + private void closeStartupTasks(final Predicate predicate) { if (!tasksForLocalState.isEmpty()) { // "drain" Tasks first to ensure that we don't try to close Tasks that another thread is attempting to close - final Set drainedTasks = new HashSet<>(tasksForLocalState.size()); - for (final Map.Entry entry : tasksForLocalState.entrySet()) { + final Set drainedTasks = new HashSet<>(tasksForLocalState.size()); + for (final Map.Entry entry : tasksForLocalState.entrySet()) { if (predicate.test(entry.getValue()) && removeStartupTask(entry.getKey()) != null) { // only add to our list of drained Tasks if we exclusively "claimed" a Task from tasksForLocalState // to ensure we don't accidentally try to drain the same Task multiple times from concurrent threads @@ -302,9 +289,8 @@ private void closeStartupTasks(final Predicate predicate) { } // now that we have exclusive ownership of the drained tasks, close them - for (final Task task : drainedTasks) { - task.suspend(); - task.closeClean(); + for (final StartupState localState : drainedTasks) { + localState.close(); } } } @@ -633,7 +619,7 @@ private IOException maybeCleanEmptyNamedTopologyDirs(final boolean logExceptionA ); if (namedTopologyDirs != null) { for (final File namedTopologyDir : namedTopologyDirs) { - closeStartupTasks(task -> task.id().topologyName().equals(parseNamedTopologyFromDirectory(namedTopologyDir.getName()))); + closeStartupTasks(localState -> localState.getTaskId().topologyName().equals(parseNamedTopologyFromDirectory(namedTopologyDir.getName()))); final File[] contents = namedTopologyDir.listFiles(); if (contents != null && contents.length == 0) { try { @@ -671,7 +657,7 @@ public void clearLocalStateForNamedTopology(final String topologyName) { log.debug("Tried to clear out the local state for NamedTopology {} but none was found", topologyName); } try { - closeStartupTasks(task -> task.id().topologyName().equals(topologyName)); + closeStartupTasks(localState -> localState.getTaskId().topologyName().equals(topologyName)); Utils.delete(namedTopologyDir); } catch (final IOException e) { log.error("Hit an unexpected error while clearing local state for topology " + topologyName, e); @@ -815,4 +801,129 @@ public int hashCode() { return Objects.hash(file, namedTopology); } } + + public class StartupState { + private final ProcessorTopology topology; + private final ProcessorStateManager stateMngr; + private final TaskId taskId; + + public StartupState(final TaskId taskId, final ProcessorTopology topology, final ProcessorStateManager stateMngr) { + this.topology = topology; + this.stateMngr = stateMngr; + this.taskId = taskId; + } + + public ProcessorStateManager getStateMngr() { + return stateMngr; + } + + public ProcessorTopology getTopology() { + return topology; + } + + public TaskId getTaskId() { + return taskId; + } + + public void close() { + if (lock(taskId)) { + try { + stateMngr.close(); + } finally { + unlock(taskId); + } + } + } + } + + private static class StartupContext extends AbstractProcessorContext { + + private final StateManager stateManager; + + public StartupContext(final TaskId taskId, final StreamsConfig config, final StateManager stateManager) { + super(taskId, config, null, null); + this.stateManager = stateManager; + } + + @Override + protected StateManager stateManager() { + return stateManager; + } + + @Override + public void transitionToActive(final StreamTask streamTask, final RecordCollector recordCollector, final ThreadCache newCache) { + throw new IllegalStateException("Should not be called"); + } + + @Override + public void transitionToStandby(final ThreadCache newCache) { + throw new IllegalStateException("Should not be called"); + } + + @Override + public void registerCacheFlushListener(final String namespace, final ThreadCache.DirtyEntryFlushListener listener) { + throw new IllegalStateException("Should not be called"); + } + + @Override + public void logChange(final String storeName, final Bytes key, final byte[] value, final long timestamp, final Position position) { + throw new IllegalStateException("Should not be called"); + } + + @Override + public void forward(final K key, final V value) { + throw new IllegalStateException("Should not be called"); + } + + @Override + public void forward(final K key, final V value, final To to) { + throw new IllegalStateException("Should not be called"); + } + + @Override + public void commit() { + throw new IllegalStateException("Should not be called"); + } + + @Override + public long currentStreamTimeMs() { + throw new IllegalStateException("Should not be called"); + } + + @Override + public StreamsMetricsImpl metrics() { + throw new IllegalStateException("Should not be called"); + } + + @Override + public S getStateStore(final String name) { + throw new IllegalStateException("Should not be called"); + } + + @Override + public Cancellable schedule(final Duration interval, final PunctuationType type, final Punctuator callback) { + throw new IllegalStateException("Should not be called"); + } + + + @Override + public void forward(final FixedKeyRecord record) { + throw new IllegalStateException("Should not be called"); + } + + @Override + public void forward(final FixedKeyRecord record, final String childName) { + throw new IllegalStateException("Should not be called"); + } + + @Override + public void forward(final Record record) { + throw new IllegalStateException("Should not be called"); + } + + @Override + public void forward(final Record record, final String childName) { + throw new IllegalStateException("Should not be called"); + } + } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java index 67d009b037f78..5ae69bc2d5376 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java @@ -333,25 +333,19 @@ private void closeDirtyAndRevive(final Collection taskWithChangelogs, fina } } - private Map> assignStartupTasks(final Map> tasksToAssign, - final String threadLogPrefix, - final TopologyMetadata topologyMetadata, - final ChangelogRegister changelogReader) { + private Map> assignStartupTasks(final Map> tasksToAssign) { if (stateDirectory.hasStartupTasks()) { final Map> assignedTasks = new HashMap<>(tasksToAssign.size()); for (final Map.Entry> entry : tasksToAssign.entrySet()) { final TaskId taskId = entry.getKey(); - final Task task = stateDirectory.removeStartupTask(taskId); - if (task != null) { + final StateDirectory.StartupState localState = stateDirectory.removeStartupTask(taskId); + if (localState != null) { // replace our dummy values with the real ones, now we know our thread and assignment final Set inputPartitions = entry.getValue(); - task.stateManager().assignToStreamThread(new LogContext(threadLogPrefix), changelogReader, inputPartitions); - updateInputPartitionsOfStandbyTaskIfTheyChanged(task, inputPartitions); - + final Task task = standbyTaskCreator.createStandbyTaskFromStartupLocalStore(taskId, inputPartitions, localState.getTopology(), localState.getStateMngr()); assignedTasks.put(task, inputPartitions); } } - return assignedTasks; } else { return Collections.emptyMap(); @@ -487,8 +481,8 @@ private void handleTasksWithoutStateUpdater(final Map> standbyTasksToCreate, final Map> tasksToRecycle, final Set tasksToCloseClean) { - final Map> startupStandbyTasksToRecycle = assignStartupTasks(activeTasksToCreate, logPrefix, topologyMetadata, changelogReader); - final Map> startupStandbyTasksToUse = assignStartupTasks(standbyTasksToCreate, logPrefix, topologyMetadata, changelogReader); + final Map> startupStandbyTasksToRecycle = assignStartupTasks(activeTasksToCreate); + final Map> startupStandbyTasksToUse = assignStartupTasks(standbyTasksToCreate); // recycle the startup standbys to active tasks.addStandbyTasks(startupStandbyTasksToRecycle.keySet()); @@ -571,8 +565,8 @@ private void handleTasksPendingInitialization() { private void handleStartupTaskReuse(final Map> activeTasksToCreate, final Map> standbyTasksToCreate, final Map failedTasks) { - final Map> startupStandbyTasksToRecycle = assignStartupTasks(activeTasksToCreate, logPrefix, topologyMetadata, changelogReader); - final Map> startupStandbyTasksToUse = assignStartupTasks(standbyTasksToCreate, logPrefix, topologyMetadata, changelogReader); + final Map> startupStandbyTasksToRecycle = assignStartupTasks(activeTasksToCreate); + final Map> startupStandbyTasksToUse = assignStartupTasks(standbyTasksToCreate); // recycle the startup standbys to active, and remove them from the set of actives that need to be created if (!startupStandbyTasksToRecycle.isEmpty()) { diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java index ede618237cf35..55492fe301488 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java @@ -80,6 +80,7 @@ import java.util.Objects; import java.util.Optional; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; import static org.apache.kafka.streams.StreamsConfig.InternalConfig.IQ_CONSISTENCY_OFFSET_VECTOR_ENABLED; @@ -132,6 +133,7 @@ public class RocksDBStore implements KeyValueStore, BatchWritingS protected StateStoreContext context; protected Position position; private OffsetCheckpoint positionCheckpoint; + private final AtomicBoolean initialized = new AtomicBoolean(false); public RocksDBStore(final String name, final String metricsScope) { @@ -157,9 +159,14 @@ public RocksDBStore(final String name, @Override public void init(final StateStoreContext stateStoreContext, final StateStore root) { + initialized.set(true); // open the DB dir metricsRecorder.init(metricsImpl(stateStoreContext), stateStoreContext.taskId()); - openDB(stateStoreContext.appConfigs(), stateStoreContext.stateDir()); + if (!open) { + preInit(stateStoreContext); + } + + addValueProvidersToMetricsRecorder(); final File positionCheckpointFile = new File(stateStoreContext.stateDir(), name() + ".position"); this.positionCheckpoint = new OffsetCheckpoint(positionCheckpointFile); @@ -179,6 +186,11 @@ public void init(final StateStoreContext stateStoreContext, false); } + @Override + public void preInit(final StateStoreContext stateStoreContext) { + openDB(stateStoreContext.appConfigs(), stateStoreContext.stateDir()); + } + @SuppressWarnings("unchecked") void openDB(final Map configs, final File stateDir) { // initialize the default rocksdb options @@ -242,7 +254,6 @@ void openDB(final Map configs, final File stateDir) { dbAccessor = new DirectDBAccessor(db, fOptions, wOptions); open = true; - addValueProvidersToMetricsRecorder(); } private void setupStatistics(final Map configs, final DBOptions dbOptions) { @@ -677,8 +688,9 @@ public synchronized void close() { configSetter.close(name, userSpecifiedOptions); configSetter = null; } - - metricsRecorder.removeValueProviders(name); + if (initialized.get()) { + metricsRecorder.removeValueProviders(name); + } // Important: do not rearrange the order in which the below objects are closed! // Order of closing must follow: ColumnFamilyHandle > RocksDB > DBOptions > ColumnFamilyOptions diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java index adbb7568c87c5..a2bcc50d1ea39 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java @@ -58,6 +58,11 @@ public WrappedStateStore(final S wrapped) { this.wrapped = wrapped; } + @Override + public void preInit(final StateStoreContext stateStoreContext) { + wrapped.preInit(stateStoreContext); + } + @Override public void init(final StateStoreContext stateStoreContext, final StateStore root) { wrapped.init(stateStoreContext, root); diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java index 14d4cf8c21fb3..2068a5ff9a725 100644 --- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java @@ -416,9 +416,9 @@ public void shouldInitializeTasksForLocalStateOnStart() { try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) { assertEquals(1, constructed.constructed().size()); final StateDirectory stateDirectory = constructed.constructed().get(0); - verify(stateDirectory, times(0)).initializeStartupTasks(any(), any(), any()); + verify(stateDirectory, times(0)).initializeStartupTasks(any(), any()); streams.start(); - verify(stateDirectory, times(1)).initializeStartupTasks(any(), any(), any()); + verify(stateDirectory, times(1)).initializeStartupTasks(any(), any()); } } } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java index 5447b6f39761b..2e25ad3ec24f9 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java @@ -17,7 +17,6 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.utils.LogCaptureAppender; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.MockTime; @@ -28,7 +27,6 @@ import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.internals.StateDirectory.TaskDirectory; -import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.apache.kafka.streams.state.internals.OffsetCheckpoint; import org.apache.kafka.test.MockKeyValueStore; import org.apache.kafka.test.TestUtils; @@ -877,15 +875,15 @@ public void shouldInitializeStandbyTasksForLocalState() { public void shouldNotAssignStartupTasksWeDontHave() { final TaskId taskId = new TaskId(0, 0); initializeStartupTasks(taskId, false); - final Task task = directory.removeStartupTask(taskId); + final StateDirectory.StartupState task = directory.removeStartupTask(taskId); assertNull(task); } private class FakeStreamThread extends Thread { private final TaskId taskId; - private final AtomicReference result; + private final AtomicReference result; - private FakeStreamThread(final TaskId taskId, final AtomicReference result) { + private FakeStreamThread(final TaskId taskId, final AtomicReference result) { this.taskId = taskId; this.result = result; } @@ -906,14 +904,13 @@ public void shouldAssignStartupTaskToStreamThread() throws InterruptedException assertThat(directory.lockOwner(taskId), is(Thread.currentThread())); // spawn off a "fake" StreamThread, so we can verify the lock was updated to the correct thread - final AtomicReference result = new AtomicReference<>(); + final AtomicReference result = new AtomicReference<>(); final Thread streamThread = new FakeStreamThread(taskId, result); streamThread.start(); streamThread.join(); - final Task task = result.get(); + final StateDirectory.StartupState localState = result.get(); - assertNotNull(task); - assertThat(task, instanceOf(StandbyTask.class)); + assertNotNull(localState); // verify the owner of the task directory lock has been shifted over to our assigned StreamThread assertThat(directory.lockOwner(taskId), is(instanceOf(FakeStreamThread.class))); @@ -987,7 +984,7 @@ private StateStore initializeStartupTasks(final TaskId taskId, final boolean cre Mockito.when(metadata.buildSubtopology(ArgumentMatchers.any())).thenReturn(processorTopology); Mockito.when(metadata.taskConfig(ArgumentMatchers.any())).thenReturn(topologyConfig.getTaskConfig()); - directory.initializeStartupTasks(metadata, new StreamsMetricsImpl(new Metrics(), "test", "processId", time), new LogContext("test")); + directory.initializeStartupTasks(metadata, new LogContext("test")); return store; } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java index 63cbc441f8a82..458409e6d6598 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java @@ -4669,14 +4669,16 @@ public void shouldListNotPausedTasks() { public void shouldRecycleStartupTasksFromStateDirectoryAsActive() { final Tasks taskRegistry = new Tasks(new LogContext()); final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, taskRegistry); + final StateDirectory.StartupState startupState = mock(StateDirectory.StartupState.class); final StandbyTask startupTask = standbyTask(taskId00, taskId00ChangelogPartitions).build(); final StreamTask activeTask = statefulTask(taskId00, taskId00ChangelogPartitions).build(); when(activeTaskCreator.createActiveTaskFromStandby(eq(startupTask), eq(taskId00Partitions), any())) .thenReturn(activeTask); + when(standbyTaskCreator.createStandbyTaskFromStartupLocalStore(eq(taskId00), eq(taskId00Partitions), any(), any())).thenReturn(startupTask); when(stateDirectory.hasStartupTasks()).thenReturn(true, false); - when(stateDirectory.removeStartupTask(taskId00)).thenReturn(startupTask, (Task) null); + when(stateDirectory.removeStartupTask(taskId00)).thenReturn(startupState, (StateDirectory.StartupState) null); taskManager.handleAssignment(taskId00Assignment, Collections.emptyMap()); @@ -4707,10 +4709,13 @@ public void shouldRecycleStartupTasksFromStateDirectoryAsActive() { public void shouldUseStartupTasksFromStateDirectoryAsStandby() { final Tasks taskRegistry = new Tasks(new LogContext()); final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, taskRegistry); + final StateDirectory.StartupState startupState = mock(StateDirectory.StartupState.class); final StandbyTask startupTask = standbyTask(taskId00, taskId00ChangelogPartitions).build(); + when(standbyTaskCreator.createStandbyTaskFromStartupLocalStore(eq(taskId00), eq(taskId00Partitions), any(), any())).thenReturn(startupTask); + when(stateDirectory.hasStartupTasks()).thenReturn(true, true, false); - when(stateDirectory.removeStartupTask(taskId00)).thenReturn(startupTask, (Task) null); + when(stateDirectory.removeStartupTask(taskId00)).thenReturn(startupState, (StateDirectory.StartupState) null); assertFalse(taskRegistry.hasPendingTasksToInit()); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java index 70224c8013c97..b647bd70dd333 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java @@ -208,6 +208,7 @@ public void shouldAddValueProvidersWithoutStatisticsToInjectedMetricsRecorderWhe context = getProcessorContext(RecordingLevel.INFO); rocksDBStore.openDB(context.appConfigs(), context.stateDir()); + rocksDBStore.init(context, rocksDBStore); verify(metricsRecorder).addValueProviders(eq(DB_NAME), notNull(), notNull(), isNull()); } @@ -218,6 +219,7 @@ public void shouldAddValueProvidersWithStatisticsToInjectedMetricsRecorderWhenRe context = getProcessorContext(RecordingLevel.DEBUG); rocksDBStore.openDB(context.appConfigs(), context.stateDir()); + rocksDBStore.init(context, rocksDBStore); verify(metricsRecorder).addValueProviders(eq(DB_NAME), notNull(), notNull(), notNull()); } @@ -227,7 +229,9 @@ public void shouldRemoveValueProvidersFromInjectedMetricsRecorderOnClose() { rocksDBStore = getRocksDBStoreWithRocksDBMetricsRecorder(); try { context = getProcessorContext(RecordingLevel.DEBUG); + rocksDBStore.openDB(context.appConfigs(), context.stateDir()); + rocksDBStore.init(context, rocksDBStore); } finally { rocksDBStore.close(); } @@ -259,6 +263,7 @@ public void shouldNotSetStatisticsInValueProvidersWhenUserProvidesStatistics() { context = getProcessorContext(RecordingLevel.DEBUG, RocksDBConfigSetterWithUserProvidedStatistics.class); rocksDBStore.openDB(context.appConfigs(), context.stateDir()); + rocksDBStore.init(context, rocksDBStore); verify(metricsRecorder).addValueProviders(eq(DB_NAME), notNull(), notNull(), isNull()); } @@ -287,6 +292,7 @@ public void shouldSetStatisticsInValueProvidersWhenUserProvidesNoStatistics() th context = getProcessorContext(RecordingLevel.DEBUG); rocksDBStore.openDB(context.appConfigs(), context.stateDir()); + rocksDBStore.init(context, rocksDBStore); verify(metricsRecorder).addValueProviders(eq(DB_NAME), notNull(), notNull(), eq(getStatistics(rocksDBStore))); } @@ -325,9 +331,10 @@ public void shouldThrowWhenUserProvidesNewBlockBasedTableFormatConfig() { RecordingLevel.DEBUG, RocksDBConfigSetterWithUserProvidedNewBlockBasedTableFormatConfig.class ); + rocksDBStore.openDB(context.appConfigs(), context.stateDir()); assertThrows( ProcessorStateException.class, - () -> rocksDBStore.openDB(context.appConfigs(), context.stateDir()), + () -> rocksDBStore.init(context, rocksDBStore), "The used block-based table format configuration does not expose the " + "block cache. Use the BlockBasedTableConfig instance provided by Options#tableFormatConfig() to configure " + "the block-based table format of RocksDB. Do not provide a new instance of BlockBasedTableConfig to " + @@ -356,6 +363,7 @@ public void shouldNotSetCacheInValueProvidersWhenUserProvidesPlainTableFormatCon ); rocksDBStore.openDB(context.appConfigs(), context.stateDir()); + rocksDBStore.init(context, rocksDBStore); verify(metricsRecorder).addValueProviders(eq(DB_NAME), notNull(), isNull(), notNull()); } diff --git a/streams/src/test/java/org/apache/kafka/test/MockKeyValueStore.java b/streams/src/test/java/org/apache/kafka/test/MockKeyValueStore.java index a2a6ac43dfd76..5106ea57454cd 100644 --- a/streams/src/test/java/org/apache/kafka/test/MockKeyValueStore.java +++ b/streams/src/test/java/org/apache/kafka/test/MockKeyValueStore.java @@ -56,6 +56,11 @@ public String name() { return name; } + @Override + public void preInit(StateStoreContext stateStoreContext) { + closed = false; + } + @Override public void init(final StateStoreContext stateStoreContext, final StateStore root) {