Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,11 @@ public void cancel(String key) {
if (prevTask != null) prevTask.cancel();
}

/**
 * Reports whether the task map currently holds an entry for the given key.
 *
 * @param key The key.
 * @return True if {@code tasks} contains the key, false otherwise.
 */
@Override
public boolean isScheduled(String key) {
    final boolean present = tasks.containsKey(key);
    return present;
}

public void cancelAll() {
Iterator<Map.Entry<String, TimerTask>> iterator = tasks.entrySet().iterator();
while (iterator.hasNext()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,4 +84,12 @@ interface TimeoutOperation<T, U> {
* @param key The key.
*/
void cancel(String key);

/**
 * Check whether an operation with the given key is currently scheduled.
 *
 * @param key The key.
 * @return True if an operation with the key is scheduled, false otherwise.
 */
boolean isScheduled(String key);
}
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,14 @@ public void cancel(String key) {
}
}

/**
 * Tells whether the timeout map holds an entry for the given key.
 *
 * @param key The key.
 * @return True if a timeout is registered under the key; false otherwise.
 */
@Override
public boolean isScheduled(String key) {
    final boolean registered = timeoutMap.containsKey(key);
    return registered;
}

/**
* @return True if a timeout with the key exists; false otherwise.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ public final class GroupConfig extends AbstractConfig {

public static final String STREAMS_NUM_STANDBY_REPLICAS_CONFIG = "streams.num.standby.replicas";

public static final String STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG = "streams.initial.rebalance.delay.ms";

public final int consumerSessionTimeoutMs;

public final int consumerHeartbeatIntervalMs;
Expand All @@ -93,6 +95,8 @@ public final class GroupConfig extends AbstractConfig {

public final int streamsNumStandbyReplicas;

public final int streamsInitialRebalanceDelayMs;

public final String shareIsolationLevel;

private static final ConfigDef CONFIG = new ConfigDef()
Expand Down Expand Up @@ -155,7 +159,13 @@ public final class GroupConfig extends AbstractConfig {
GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_DEFAULT,
atLeast(0),
MEDIUM,
GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_DOC);
GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_DOC)
.define(STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG,
INT,
GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_DEFAULT,
atLeast(0),
MEDIUM,
GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_DOC);

public GroupConfig(Map<?, ?> props) {
super(CONFIG, props, false);
Expand All @@ -168,6 +178,7 @@ public GroupConfig(Map<?, ?> props) {
this.streamsSessionTimeoutMs = getInt(STREAMS_SESSION_TIMEOUT_MS_CONFIG);
this.streamsHeartbeatIntervalMs = getInt(STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG);
this.streamsNumStandbyReplicas = getInt(STREAMS_NUM_STANDBY_REPLICAS_CONFIG);
this.streamsInitialRebalanceDelayMs = getInt(STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG);
this.shareIsolationLevel = getString(SHARE_ISOLATION_LEVEL_CONFIG);
}

Expand Down Expand Up @@ -379,6 +390,13 @@ public int streamsNumStandbyReplicas() {
return streamsNumStandbyReplicas;
}

/**
 * Returns the value read from {@code STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG}:
 * the initial rebalance delay for streams groups.
 *
 * @return The initial rebalance delay for streams groups.
 */
public int streamsInitialRebalanceDelayMs() {
    return this.streamsInitialRebalanceDelayMs;
}

/**
* The share group isolation level.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,10 @@ public class GroupCoordinatorConfig {
public static final int STREAMS_GROUP_MAX_STANDBY_REPLICAS_DEFAULT = 2;
public static final String STREAMS_GROUP_MAX_STANDBY_REPLICAS_DOC = "The maximum allowed value for the group-level configuration of " + GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG;

public static final String STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG = "group.streams.initial.rebalance.delay.ms";
public static final int STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_DEFAULT = 3000;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mjsax Are we good with a default of 3 seconds for delaying the initial rebalance?

public static final String STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_DOC = "The amount of time the group coordinator will wait for more streams clients to join a new group before performing the first rebalance. A longer delay means potentially fewer rebalances.";

public static final String SHARE_GROUP_INITIALIZE_RETRY_INTERVAL_MS_CONFIG = "group.share.initialize.retry.interval.ms";
// Because persister retries with exp backoff 5 times and upper cap of 30 secs.
public static final int SHARE_GROUP_INITIALIZE_RETRY_INTERVAL_MS_DEFAULT = 30_000;
Expand Down Expand Up @@ -352,7 +356,8 @@ public class GroupCoordinatorConfig {
.define(STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, INT, STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DOC)
.define(STREAMS_GROUP_MAX_SIZE_CONFIG, INT, STREAMS_GROUP_MAX_SIZE_DEFAULT, atLeast(1), MEDIUM, STREAMS_GROUP_MAX_SIZE_DOC)
.define(STREAMS_GROUP_NUM_STANDBY_REPLICAS_CONFIG, INT, STREAMS_GROUP_NUM_STANDBY_REPLICAS_DEFAULT, atLeast(0), MEDIUM, STREAMS_GROUP_NUM_STANDBY_REPLICAS_DOC)
.define(STREAMS_GROUP_MAX_STANDBY_REPLICAS_CONFIG, INT, STREAMS_GROUP_MAX_STANDBY_REPLICAS_DEFAULT, atLeast(0), MEDIUM, STREAMS_GROUP_MAX_STANDBY_REPLICAS_DOC);
.define(STREAMS_GROUP_MAX_STANDBY_REPLICAS_CONFIG, INT, STREAMS_GROUP_MAX_STANDBY_REPLICAS_DEFAULT, atLeast(0), MEDIUM, STREAMS_GROUP_MAX_STANDBY_REPLICAS_DOC)
.define(STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, INT, STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_DEFAULT, atLeast(0), MEDIUM, STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_DOC);


/**
Expand Down Expand Up @@ -405,6 +410,7 @@ public class GroupCoordinatorConfig {
private final int streamsGroupMaxSize;
private final int streamsGroupNumStandbyReplicas;
private final int streamsGroupMaxStandbyReplicas;
private final int streamsGroupInitialRebalanceDelayMs;

@SuppressWarnings("this-escape")
public GroupCoordinatorConfig(AbstractConfig config) {
Expand Down Expand Up @@ -457,6 +463,7 @@ public GroupCoordinatorConfig(AbstractConfig config) {
this.streamsGroupMaxSize = config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_MAX_SIZE_CONFIG);
this.streamsGroupNumStandbyReplicas = config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_CONFIG);
this.streamsGroupMaxStandbyReplicas = config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_MAX_STANDBY_REPLICAS_CONFIG);
this.streamsGroupInitialRebalanceDelayMs = config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG);

// New group coordinator configs validation.
require(consumerGroupMaxHeartbeatIntervalMs >= consumerGroupMinHeartbeatIntervalMs,
Expand Down Expand Up @@ -961,4 +968,11 @@ public int streamsGroupNumStandbyReplicas() {
public int streamsGroupMaxNumStandbyReplicas() {
return streamsGroupMaxStandbyReplicas;
}

/**
 * Returns the value read from {@code STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG}:
 * the initial rebalance delay for streams groups.
 *
 * @return The initial rebalance delay for streams groups.
 */
public int streamsGroupInitialRebalanceDelayMs() {
    return this.streamsGroupInitialRebalanceDelayMs;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1985,30 +1985,54 @@ private CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> stream

// Actually bump the group epoch
int groupEpoch = group.groupEpoch();
boolean isInitialRebalance = group.isEmpty();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The group being empty does not necessarily mean this is the initial rebalance, right? It could be that the group became empty again.

if (bumpGroupEpoch) {
groupEpoch += 1;
if (isInitialRebalance) {
groupEpoch = 2;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the group becomes empty at epoch 9, we are going back in time here to epoch 2, right?

} else {
groupEpoch += 1;
}
records.add(newStreamsGroupMetadataRecord(groupId, groupEpoch, metadataHash, validatedTopologyEpoch, currentAssignmentConfigs));
log.info("[GroupId {}][MemberId {}] Bumped streams group epoch to {} with metadata hash {} and validated topic epoch {}.", groupId, memberId, groupEpoch, metadataHash, validatedTopologyEpoch);
metrics.record(STREAMS_GROUP_REBALANCES_SENSOR_NAME);
group.setMetadataRefreshDeadline(currentTimeMs + METADATA_REFRESH_INTERVAL_MS, groupEpoch);
}

// Schedule initial rebalance delay for new streams groups to coalesce joins.
int initialDelayMs = streamsGroupInitialRebalanceDelayMs(groupId);
if (isInitialRebalance & initialDelayMs > 0) {
timer.scheduleIfAbsent(
streamsInitialRebalanceKey(groupId),
initialDelayMs,
TimeUnit.MILLISECONDS,
false,
() -> fireStreamsInitialRebalance(groupId)
);
}

// 4. Update the target assignment if the group epoch is larger than the target assignment epoch or a static member
// replaces an existing static member.
// The delta between the existing and the new target assignment is persisted to the partition.
int targetAssignmentEpoch;
TasksTuple targetAssignment;
if (groupEpoch > group.assignmentEpoch()) {
targetAssignment = updateStreamsTargetAssignment(
group,
groupEpoch,
updatedMember,
updatedConfiguredTopology,
metadataImage,
records,
currentAssignmentConfigs
);
targetAssignmentEpoch = groupEpoch;
boolean initialDelayActive = timer.isScheduled(streamsInitialRebalanceKey(groupId));
if (initialDelayActive && group.assignmentEpoch() == 0) {
// During initial rebalance delay, return empty assignment to first joining members.
targetAssignmentEpoch = group.assignmentEpoch();
targetAssignment = TasksTuple.EMPTY;
} else {
targetAssignment = updateStreamsTargetAssignment(
group,
groupEpoch,
updatedMember,
updatedConfiguredTopology,
metadataImage,
records,
currentAssignmentConfigs
);
targetAssignmentEpoch = groupEpoch;
}
} else {
targetAssignmentEpoch = group.assignmentEpoch();
targetAssignment = group.targetAssignment(updatedMember.memberId());
Expand Down Expand Up @@ -8570,6 +8594,10 @@ private boolean maybeDeleteEmptyStreamsGroup(String groupId, List<CoordinatorRec
// Add tombstones for the previous streams group. The tombstones won't actually be
// replayed because its coordinator result has a non-null appendFuture.
createGroupTombstoneRecords(group, records);
// Cancel any pending initial rebalance timer.
if (timer.isScheduled(streamsInitialRebalanceKey(groupId))) {
timer.cancel(streamsInitialRebalanceKey(groupId));
}
removeGroup(groupId);
return true;
}
Expand Down Expand Up @@ -8659,6 +8687,15 @@ private int streamsGroupHeartbeatIntervalMs(String groupId) {
.orElse(config.streamsGroupHeartbeatIntervalMs());
}

/**
 * Resolve the initial rebalance delay that applies to the provided streams group.
 *
 * The group-level config is used when the group config manager has one for the
 * group; otherwise the coordinator-level value from {@code config} is returned.
 *
 * @param groupId The group id.
 *
 * @return The initial rebalance delay for the group.
 */
private int streamsGroupInitialRebalanceDelayMs(String groupId) {
    return groupConfigManager
        .groupConfig(groupId)
        .map(groupLevelConfig -> groupLevelConfig.streamsInitialRebalanceDelayMs())
        .orElse(config.streamsGroupInitialRebalanceDelayMs());
}

/**
* Get the assignor of the provided streams group.
*/
Expand Down Expand Up @@ -8716,6 +8753,31 @@ static String classicGroupSyncKey(String groupId) {
return "sync-" + groupId;
}

/**
* Callback invoked when the initial rebalance delay timer expires.
* This is a no-op as the actual assignment computation happens on the next heartbeat.
*
* @param groupId The group id.
*
* @return An empty result.
*/
private CoordinatorResult<Void, CoordinatorRecord> fireStreamsInitialRebalance(String groupId) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this is not doing anything, I think we can inline this.

return EMPTY_RESULT;
}

/**
 * Build the timer key used for the initial rebalance delay of a streams group.
 *
 * Package private for testing.
 *
 * @param groupId The group id.
 *
 * @return The prefix "initial-rebalance-timeout-" followed by the group id.
 */
static String streamsInitialRebalanceKey(String groupId) {
    final String keyPrefix = "initial-rebalance-timeout-";
    return keyPrefix + groupId;
}

/**
* Generate a consumer group join key for the timer.
*
Expand Down
Loading