Skip to content
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,11 @@ public void shouldPassMetrics(final String topologyType, final String groupProto
streamsApplicationProperties = props(groupProtocol);
final Topology topology = topologyType.equals("simple") ? simpleTopology(false) : complexTopology();

shouldPassMetrics(topology, FIRST_INSTANCE_CLIENT);
shouldPassMetrics(topology, SECOND_INSTANCE_CLIENT);
}

private void shouldPassMetrics(final Topology topology, final int clientInstance) throws Exception {
try (final KafkaStreams streams = new KafkaStreams(topology, streamsApplicationProperties)) {
IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);

Expand All @@ -292,8 +297,8 @@ public void shouldPassMetrics(final String topologyType, final String groupProto



final List<MetricName> consumerPassedStreamThreadMetricNames = INTERCEPTING_CONSUMERS.get(FIRST_INSTANCE_CLIENT).passedMetrics().stream().map(KafkaMetric::metricName).toList();
final List<MetricName> adminPassedStreamClientMetricNames = INTERCEPTING_ADMIN_CLIENTS.get(FIRST_INSTANCE_CLIENT).passedMetrics.stream().map(KafkaMetric::metricName).toList();
final List<MetricName> consumerPassedStreamThreadMetricNames = INTERCEPTING_CONSUMERS.get(clientInstance).passedMetrics().stream().map(KafkaMetric::metricName).toList();
final List<MetricName> adminPassedStreamClientMetricNames = INTERCEPTING_ADMIN_CLIENTS.get(clientInstance).passedMetrics.stream().map(KafkaMetric::metricName).toList();


assertEquals(streamsThreadMetrics.size(), consumerPassedStreamThreadMetricNames.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1376,7 +1376,7 @@ private static HostInfo parseHostInfo(final String endPoint) {
public synchronized void start() throws IllegalStateException, StreamsException {
if (setState(State.REBALANCING)) {
log.debug("Initializing STANDBY tasks for existing local state");
stateDirectory.initializeStartupTasks(topologyMetadata, streamsMetrics, logContext);
stateDirectory.initializeStartupTasks(topologyMetadata, logContext);

log.debug("Starting Streams client");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ public interface StateStore {
*/
void init(final StateStoreContext stateStoreContext, final StateStore root);

default void preInit(final StateStoreContext stateStoreContext) {};

/**
* Flush any cached data
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import java.util.Map;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

import static java.lang.String.format;
Expand Down Expand Up @@ -176,6 +177,7 @@ public String toString() {

// must be maintained in topological order
private final FixedOrderMap<String, StateStoreMetadata> stores = new FixedOrderMap<>();
private final Map<String, StateStore> startupStores = new HashMap<>();
private final FixedOrderMap<String, StateStore> globalStores = new FixedOrderMap<>();

private final File baseDir;
Expand All @@ -185,6 +187,7 @@ public String toString() {
private TaskType taskType;
private Logger log;
private Task.State taskState;
private final AtomicBoolean startupState;

public static String storeChangelogTopic(final String prefix, final String storeName, final String namedTopology) {
if (namedTopology == null) {
Expand All @@ -205,7 +208,8 @@ public ProcessorStateManager(final TaskId taskId,
final ChangelogRegister changelogReader,
final Map<String, String> storeToChangelogTopic,
final Collection<TopicPartition> sourcePartitions,
final boolean stateUpdaterEnabled) throws ProcessorStateException {
final boolean stateUpdaterEnabled,
final boolean startupState) throws ProcessorStateException {
this.storeToChangelogTopic = storeToChangelogTopic;
this.log = logContext.logger(ProcessorStateManager.class);
this.logPrefix = logContext.logPrefix();
Expand All @@ -220,6 +224,22 @@ public ProcessorStateManager(final TaskId taskId,
this.checkpointFile = new OffsetCheckpoint(stateDirectory.checkpointFileFor(taskId));

log.debug("Created state store manager for task {}", taskId);
this.startupState = new AtomicBoolean(startupState);
}

/**
* @throws ProcessorStateException if the task directory does not exist and could not be created
*/
public ProcessorStateManager(final TaskId taskId,
final TaskType taskType,
final boolean eosEnabled,
final LogContext logContext,
final StateDirectory stateDirectory,
final ChangelogRegister changelogReader,
final Map<String, String> storeToChangelogTopic,
final Collection<TopicPartition> sourcePartitions,
final boolean stateUpdaterEnabled) throws ProcessorStateException {
this(taskId, taskType, eosEnabled, logContext, stateDirectory, changelogReader, storeToChangelogTopic, sourcePartitions, stateUpdaterEnabled, false);
}

/**
Expand All @@ -234,7 +254,7 @@ static ProcessorStateManager createStartupTaskStateManager(final TaskId taskId,
final Map<String, String> storeToChangelogTopic,
final Set<TopicPartition> sourcePartitions,
final boolean stateUpdaterEnabled) {
return new ProcessorStateManager(taskId, TaskType.STANDBY, eosEnabled, logContext, stateDirectory, null, storeToChangelogTopic, sourcePartitions, stateUpdaterEnabled);
return new ProcessorStateManager(taskId, TaskType.STANDBY, eosEnabled, logContext, stateDirectory, null, storeToChangelogTopic, sourcePartitions, stateUpdaterEnabled, true);
}

/**
Expand All @@ -255,6 +275,10 @@ void assignToStreamThread(final LogContext logContext,
this.sourcePartitions.addAll(sourcePartitions);
}

void reuseState() {
startupState.set(false);
}

void registerStateStores(final List<StateStore> allStores, final InternalProcessorContext<?, ?> processorContext) {
processorContext.uninitialize();
for (final StateStore store : allStores) {
Expand All @@ -263,7 +287,13 @@ void registerStateStores(final List<StateStore> allStores, final InternalProcess
maybeRegisterStoreWithChangelogReader(store.name());
}
} else {
store.init(processorContext, store);
if (startupState.get()) {
store.preInit(processorContext);
startupStores.put(store.name(), store);
} else {
store.init(processorContext, store);
startupStores.remove(store.name());
}
}
log.trace("Registered state store {}", store.name());
}
Expand Down Expand Up @@ -649,9 +679,19 @@ else if (exception instanceof StreamsException)
}
}


stores.clear();
}

if (!startupStores.isEmpty()) {
for (final Map.Entry<String, StateStore> entry : startupStores.entrySet()) {
final StateStore store = entry.getValue();
store.close();
}
startupStores.clear();
}


if (firstException != null) {
throw firstException;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,35 @@ StandbyTask createStandbyTask(final TaskId taskId,
return task;
}

StandbyTask createStandbyTaskFromStartupLocalStore(final TaskId taskId,
final Set<TopicPartition> inputPartitions,
final ProcessorTopology topology,
final ProcessorStateManager stateManager) {
stateManager.reuseState();
final InternalProcessorContext<Object, Object> context = new ProcessorContextImpl(
taskId,
applicationConfig,
stateManager,
streamsMetrics,
dummyCache
);
final StandbyTask task = new StandbyTask(
taskId,
inputPartitions,
topology,
topologyMetadata.taskConfig(taskId),
streamsMetrics,
stateManager,
stateDirectory,
dummyCache,
context
);

log.trace("Created standby task {} with assigned partitions {}", taskId, inputPartitions);
createTaskSensor.record();
return task;
}

private LogContext getLogContext(final TaskId taskId) {
final String threadIdPrefix = String.format("stream-thread [%s] ", Thread.currentThread().getName());
final String logPrefix = threadIdPrefix + String.format("%s [%s] ", "standby-task", taskId);
Expand Down
Loading