Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ public final class GroupConfig extends AbstractConfig {

public static final String STREAMS_NUM_STANDBY_REPLICAS_CONFIG = "streams.num.standby.replicas";

public static final String STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG = "streams.initial.rebalance.delay.ms";

public final int consumerSessionTimeoutMs;

public final int consumerHeartbeatIntervalMs;
Expand All @@ -93,6 +95,8 @@ public final class GroupConfig extends AbstractConfig {

public final int streamsNumStandbyReplicas;

public final int streamsInitialRebalanceDelayMs;

public final String shareIsolationLevel;

private static final ConfigDef CONFIG = new ConfigDef()
Expand Down Expand Up @@ -155,7 +159,13 @@ public final class GroupConfig extends AbstractConfig {
GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_DEFAULT,
atLeast(0),
MEDIUM,
GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_DOC);
GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_DOC)
.define(STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG,
INT,
GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_DEFAULT,
atLeast(0),
MEDIUM,
GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_DOC);

public GroupConfig(Map<?, ?> props) {
super(CONFIG, props, false);
Expand All @@ -168,6 +178,7 @@ public GroupConfig(Map<?, ?> props) {
this.streamsSessionTimeoutMs = getInt(STREAMS_SESSION_TIMEOUT_MS_CONFIG);
this.streamsHeartbeatIntervalMs = getInt(STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG);
this.streamsNumStandbyReplicas = getInt(STREAMS_NUM_STANDBY_REPLICAS_CONFIG);
this.streamsInitialRebalanceDelayMs = getInt(STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG);
this.shareIsolationLevel = getString(SHARE_ISOLATION_LEVEL_CONFIG);
}

Expand Down Expand Up @@ -379,6 +390,13 @@ public int streamsNumStandbyReplicas() {
return streamsNumStandbyReplicas;
}

/**
* The initial rebalance delay for streams groups.
*/
public int streamsInitialRebalanceDelayMs() {
return streamsInitialRebalanceDelayMs;
}

/**
* The share group isolation level.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,10 @@ public class GroupCoordinatorConfig {
public static final int STREAMS_GROUP_MAX_STANDBY_REPLICAS_DEFAULT = 2;
public static final String STREAMS_GROUP_MAX_STANDBY_REPLICAS_DOC = "The maximum allowed value for the group-level configuration of " + GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG;

public static final String STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG = "group.streams.initial.rebalance.delay.ms";
public static final int STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_DEFAULT = 3000;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mjsax Are we good with a default of 3 seconds for delaying the initial rebalance?

public static final String STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_DOC = "The amount of time the group coordinator will wait for more streams clients to join a new group before performing the first rebalance. A longer delay means potentially fewer rebalances.";

public static final String SHARE_GROUP_INITIALIZE_RETRY_INTERVAL_MS_CONFIG = "group.share.initialize.retry.interval.ms";
// Because persister retries with exp backoff 5 times and upper cap of 30 secs.
public static final int SHARE_GROUP_INITIALIZE_RETRY_INTERVAL_MS_DEFAULT = 30_000;
Expand Down Expand Up @@ -352,7 +356,8 @@ public class GroupCoordinatorConfig {
.define(STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, INT, STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DOC)
.define(STREAMS_GROUP_MAX_SIZE_CONFIG, INT, STREAMS_GROUP_MAX_SIZE_DEFAULT, atLeast(1), MEDIUM, STREAMS_GROUP_MAX_SIZE_DOC)
.define(STREAMS_GROUP_NUM_STANDBY_REPLICAS_CONFIG, INT, STREAMS_GROUP_NUM_STANDBY_REPLICAS_DEFAULT, atLeast(0), MEDIUM, STREAMS_GROUP_NUM_STANDBY_REPLICAS_DOC)
.define(STREAMS_GROUP_MAX_STANDBY_REPLICAS_CONFIG, INT, STREAMS_GROUP_MAX_STANDBY_REPLICAS_DEFAULT, atLeast(0), MEDIUM, STREAMS_GROUP_MAX_STANDBY_REPLICAS_DOC);
.define(STREAMS_GROUP_MAX_STANDBY_REPLICAS_CONFIG, INT, STREAMS_GROUP_MAX_STANDBY_REPLICAS_DEFAULT, atLeast(0), MEDIUM, STREAMS_GROUP_MAX_STANDBY_REPLICAS_DOC)
.define(STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, INT, STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_DEFAULT, atLeast(0), MEDIUM, STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_DOC);


/**
Expand Down Expand Up @@ -405,6 +410,7 @@ public class GroupCoordinatorConfig {
private final int streamsGroupMaxSize;
private final int streamsGroupNumStandbyReplicas;
private final int streamsGroupMaxStandbyReplicas;
private final int streamsGroupInitialRebalanceDelayMs;

@SuppressWarnings("this-escape")
public GroupCoordinatorConfig(AbstractConfig config) {
Expand Down Expand Up @@ -457,6 +463,7 @@ public GroupCoordinatorConfig(AbstractConfig config) {
this.streamsGroupMaxSize = config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_MAX_SIZE_CONFIG);
this.streamsGroupNumStandbyReplicas = config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_CONFIG);
this.streamsGroupMaxStandbyReplicas = config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_MAX_STANDBY_REPLICAS_CONFIG);
this.streamsGroupInitialRebalanceDelayMs = config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG);

// New group coordinator configs validation.
require(consumerGroupMaxHeartbeatIntervalMs >= consumerGroupMinHeartbeatIntervalMs,
Expand Down Expand Up @@ -961,4 +968,11 @@ public int streamsGroupNumStandbyReplicas() {
public int streamsGroupMaxNumStandbyReplicas() {
return streamsGroupMaxStandbyReplicas;
}

/**
* The initial rebalance delay for streams groups.
*/
public int streamsGroupInitialRebalanceDelayMs() {
return streamsGroupInitialRebalanceDelayMs;
}
}
Loading