Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,20 @@
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Date;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.net.UnknownHostException;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.ListTablesRequest;
import software.amazon.awssdk.services.dynamodb.model.ListTablesResponse;
import software.amazon.awssdk.services.dynamodb.model.DeleteTableRequest;

public class Consumers {

Expand Down Expand Up @@ -149,6 +156,16 @@ private void reconfigureConsumers(Set<String> newRules, Set<String> currentRules
log.error(e.getMessage(), e);
}

// Optionally add dynamic suffix to the KCL application name
if (kinesisConfig.isAppNameDynamicSuffixEnabled()) {
String suffix = resolveDynamicSuffix(kinesisConfig.getAppNameDynamicSuffixFormat());
kinesisApplicationName = kinesisApplicationName + suffix;
// Optionally kick off async cleanup of old lease tables
if (kinesisConfig.isCleanupOldLeaseTablesEnabled()) {
asyncCleanupOldLeaseTables(getKinesisApplicationName(kinesisStreamName, hostName, carbonjEnv));
}
}

Counter initRetryCounter = metricRegistry.counter(MetricRegistry.name("kinesis.consumer." + kinesisStreamName + ".initRetryCounter"));
KinesisConsumer kinesisConsumer = new KinesisConsumer(metricRegistry, pointProcessor, recoveryPointProcessor, kinesisStreamName,
kinesisApplicationName, kinesisConfig, checkPointMgr, initRetryCounter, kinesisConsumerRegion, kinesisConsumerTracebackMinutes);
Expand Down Expand Up @@ -185,6 +202,50 @@ private String getKinesisApplicationName(String streamName, String hostName, Str
return streamName + "-" + hostName + "-" + carbonjEnv;
}

private String resolveDynamicSuffix(String format) {
String suffix = format;
try {
suffix = suffix.replace("{epoch}", Long.toString(System.currentTimeMillis()))
.replace("{hostname}", InetAddress.getLocalHost().getHostName())
.replace("{uuid}", java.util.UUID.randomUUID().toString());
} catch (Throwable t) {
// fallback minimal suffix
suffix = "-" + System.currentTimeMillis();
}
if (!suffix.startsWith("-")) {
suffix = "-" + suffix;
}
return suffix;
}

private void asyncCleanupOldLeaseTables(String baseApplicationName) {
ExecutorService es = Executors.newSingleThreadExecutor(r -> {
Thread t = new Thread(r, "kcl-lease-cleanup");
t.setDaemon(true);
return t;
});
es.submit(() -> {
try (DynamoDbClient ddb = DynamoDbClient.builder().build()) {
ListTablesResponse resp = ddb.listTables(ListTablesRequest.builder().build());
for (String table : resp.tableNames()) {
// KCL v2/v3 table name prefix is usually application name; be conservative
if (table.startsWith(baseApplicationName) && !table.equals(baseApplicationName)) {
try {
ddb.deleteTable(DeleteTableRequest.builder().tableName(table).build());
log.info("Submitted async deletion for old KCL lease/checkpoint table {}", table);
} catch (Throwable t) {
log.warn("Failed to delete table {}: {}", table, t.getMessage());
}
}
}
} catch (Throwable t) {
log.warn("KCL lease cleanup encountered an error: {}", t.getMessage());
}
});
es.shutdown();
try { es.awaitTermination(1, TimeUnit.SECONDS); } catch (InterruptedException ignored) {}
}

private void close(Set<String> consumerSet) {
if (null == consumerSet) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,18 @@ public class KinesisConfig {
private final int maxRecords;
private final boolean aggregationEnabled;

// Controls dynamic application name behavior and cleanup of old lease tables
private final boolean appNameDynamicSuffixEnabled;
private final String appNameDynamicSuffixFormat;
private final boolean cleanupOldLeaseTablesEnabled;

public KinesisConfig(boolean kinesisConsumerEnabled, boolean recoveryEnabled,
long recoveryIdleTimeMillis, long checkPointIntervalMillis, long retryTimeInMillis,
int recoveryThreads, Path checkPointDir, int initRetryTimeInSecs,
int leaseExpirationTimeInSecs, String recoveryProvider, int gapsTableProvThroughput,
int maxRecords, boolean aggregationEnabled) {
int maxRecords, boolean aggregationEnabled,
boolean appNameDynamicSuffixEnabled, String appNameDynamicSuffixFormat,
boolean cleanupOldLeaseTablesEnabled) {
this.kinesisConsumerEnabled = kinesisConsumerEnabled;
this.recoveryEnabled = recoveryEnabled;
this.recoveryIdleTimeMillis = recoveryIdleTimeMillis;
Expand All @@ -46,6 +53,9 @@ public KinesisConfig(boolean kinesisConsumerEnabled, boolean recoveryEnabled,
this.gapsTableProvisionedThroughput = gapsTableProvThroughput;
this.maxRecords = maxRecords;
this.aggregationEnabled = aggregationEnabled;
this.appNameDynamicSuffixEnabled = appNameDynamicSuffixEnabled;
this.appNameDynamicSuffixFormat = appNameDynamicSuffixFormat;
this.cleanupOldLeaseTablesEnabled = cleanupOldLeaseTablesEnabled;

if( recoveryProvider.equalsIgnoreCase("dynamodb")) {
this.recoveryProvider = KinesisRecoveryProvider.DYNAMODB;
Expand Down Expand Up @@ -102,4 +112,16 @@ public int getMaxRecords() {
public boolean isAggregationEnabled() {
return aggregationEnabled;
}

public boolean isAppNameDynamicSuffixEnabled() {
return appNameDynamicSuffixEnabled;
}

public String getAppNameDynamicSuffixFormat() {
return appNameDynamicSuffixFormat;
}

public boolean isCleanupOldLeaseTablesEnabled() {
return cleanupOldLeaseTablesEnabled;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,24 @@ public class cfgKinesis
@Value( "${aggregation.enabled:true}" )
private boolean aggregationEnabled;

// Optional: dynamically suffix the KCL application name and cleanup old lease tables
@Value( "${kinesis.consumer.appName.dynamicSuffix.enabled:false}" )
private boolean appNameDynamicSuffixEnabled;

// Supports tokens: {epoch}, {uuid}, {hostname}
@Value( "${kinesis.consumer.appName.dynamicSuffix.format:-{epoch}}" )
private String appNameDynamicSuffixFormat;

@Value( "${kinesis.consumer.cleanupOldLeaseTables.enabled:false}" )
private boolean cleanupOldLeaseTablesEnabled;

@Bean
KinesisConfig kinesisConfig()
{
return new KinesisConfig(kinesisConsumerEnabled, recoveryEnabled, recoveryIdleTimeInMillis,
checkPointIntervalMillis, retryTimeInMillis, recoveryThreads, Paths.get(checkPointDir),
initRetryTimeInSecs, leaseExpirationTimeInSecs, recoveryProvider, gapsTableProvisionedThroughput,
maxRecords, aggregationEnabled);
maxRecords, aggregationEnabled,
appNameDynamicSuffixEnabled, appNameDynamicSuffixFormat, cleanupOldLeaseTablesEnabled);
}
}
Loading