
[FLINK-33566] HBase sql-connector needs overwrite the rowKey #34


Open · wants to merge 4 commits into base: main
@@ -51,6 +51,7 @@
import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_BUFFER_FLUSH_MAX_ROWS;
import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_BUFFER_FLUSH_MAX_SIZE;
import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_IGNORE_NULL_VALUE;
import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_OVERWRITE_KEY;
import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_PARALLELISM;
import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.TABLE_NAME;
import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.ZOOKEEPER_QUORUM;
@@ -156,6 +157,7 @@ public Set<ConfigOption<?>> optionalOptions() {
set.add(SINK_BUFFER_FLUSH_INTERVAL);
set.add(SINK_PARALLELISM);
set.add(SINK_IGNORE_NULL_VALUE);
set.add(SINK_OVERWRITE_KEY);
set.add(LOOKUP_ASYNC);
set.add(LOOKUP_CACHE_MAX_ROWS);
set.add(LOOKUP_CACHE_TTL);
@@ -182,7 +184,8 @@ public Set<ConfigOption<?>> forwardOptions() {
SINK_BUFFER_FLUSH_MAX_SIZE,
SINK_BUFFER_FLUSH_MAX_ROWS,
SINK_BUFFER_FLUSH_INTERVAL,
- SINK_IGNORE_NULL_VALUE)
+ SINK_IGNORE_NULL_VALUE,
+ SINK_OVERWRITE_KEY)
.collect(Collectors.toSet());
}
}
@@ -82,7 +82,8 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
writeOptions.isIgnoreNullValue()),
writeOptions.getBufferFlushMaxSizeInBytes(),
writeOptions.getBufferFlushMaxRows(),
- writeOptions.getBufferFlushIntervalMillis());
+ writeOptions.getBufferFlushIntervalMillis(),
+ writeOptions.isOverwriteKey());
return SinkFunctionProvider.of(sinkFunction, writeOptions.getParallelism());
}

@@ -596,7 +596,8 @@ public void testHBaseSinkFunctionTableExistence() throws Exception {
false),
2 * 1024 * 1024,
1000,
- 1000);
+ 1000,
+ false);

assertThatThrownBy(() -> sinkFunction.open(new Configuration()))
.getRootCause()
@@ -35,18 +35,21 @@ public class HBaseWriteOptions implements Serializable {
private final long bufferFlushMaxRows;
private final long bufferFlushIntervalMillis;
private final boolean ignoreNullValue;
private final boolean overwriteKey;
private final Integer parallelism;

private HBaseWriteOptions(
long bufferFlushMaxSizeInBytes,
long bufferFlushMaxMutations,
long bufferFlushIntervalMillis,
boolean ignoreNullValue,
boolean overwriteKey,
Integer parallelism) {
this.bufferFlushMaxSizeInBytes = bufferFlushMaxSizeInBytes;
this.bufferFlushMaxRows = bufferFlushMaxMutations;
this.bufferFlushIntervalMillis = bufferFlushIntervalMillis;
this.ignoreNullValue = ignoreNullValue;
this.overwriteKey = overwriteKey;
this.parallelism = parallelism;
}

@@ -66,6 +69,10 @@ public boolean isIgnoreNullValue() {
return ignoreNullValue;
}

public boolean isOverwriteKey() {
return overwriteKey;
}

public Integer getParallelism() {
return parallelism;
}
@@ -81,6 +88,8 @@ public String toString() {
+ bufferFlushIntervalMillis
+ ", ignoreNullValue="
+ ignoreNullValue
+ ", overwriteKey="
+ overwriteKey
+ ", parallelism="
+ parallelism
+ '}';
@@ -99,6 +108,7 @@ public boolean equals(Object o) {
&& bufferFlushMaxRows == that.bufferFlushMaxRows
&& bufferFlushIntervalMillis == that.bufferFlushIntervalMillis
&& ignoreNullValue == that.ignoreNullValue
&& overwriteKey == that.overwriteKey
&& parallelism == that.parallelism;
}

@@ -123,6 +133,7 @@ public static class Builder {
private long bufferFlushMaxRows = 0;
private long bufferFlushIntervalMillis = 0;
private boolean ignoreNullValue;
private boolean overwriteKey;
private Integer parallelism;

/**
@@ -160,6 +171,12 @@ public Builder setIgnoreNullValue(boolean ignoreNullValue) {
return this;
}

/** Optional. Writing option. If enabled, delete this key first before adding new data. */
public Builder setOverwriteKey(boolean overwriteKey) {
this.overwriteKey = overwriteKey;
return this;
}

/**
* Optional. Defines the parallelism of the HBase sink operator. By default, the parallelism
* is determined by the framework using the same parallelism of the upstream chained
@@ -177,6 +194,7 @@ public HBaseWriteOptions build() {
bufferFlushMaxRows,
bufferFlushIntervalMillis,
ignoreNullValue,
overwriteKey,
parallelism);
}
}
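For readers of this diff, a minimal sketch of building HBaseWriteOptions with the new flag; the builder() factory method, the flush-size setter, and the import path are assumed from the connector's existing layout rather than shown in this PR:

```java
import org.apache.flink.connector.hbase.options.HBaseWriteOptions;

/** Sketch only: builder() and the flush-size setter are assumed from the existing class. */
public final class WriteOptionsSketch {
    public static void main(String[] args) {
        HBaseWriteOptions writeOptions =
                HBaseWriteOptions.builder()
                        .setBufferFlushMaxSizeInBytes(2 * 1024 * 1024)
                        .setIgnoreNullValue(false)
                        .setOverwriteKey(true) // new: delete the existing row key before writing
                        .build();

        // toString(), equals(), and the getter now cover overwriteKey as well.
        System.out.println(writeOptions.isOverwriteKey());
    }
}
```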
@@ -35,6 +35,7 @@
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
import org.slf4j.Logger;
@@ -73,6 +74,7 @@ public class HBaseSinkFunction<T> extends RichSinkFunction<T>
private final long bufferFlushMaxSizeInBytes;
private final long bufferFlushMaxMutations;
private final long bufferFlushIntervalMillis;
private final boolean overwriteKey;
private final HBaseMutationConverter<T> mutationConverter;

private transient Connection connection;
@@ -93,6 +95,24 @@ public class HBaseSinkFunction<T> extends RichSinkFunction<T>
*/
private final AtomicReference<Throwable> failureThrowable = new AtomicReference<>();

public HBaseSinkFunction(
String hTableName,
org.apache.hadoop.conf.Configuration conf,
HBaseMutationConverter<T> mutationConverter,
long bufferFlushMaxSizeInBytes,
long bufferFlushMaxMutations,
long bufferFlushIntervalMillis,
boolean overwriteKey) {
this.hTableName = hTableName;
// Configuration is not serializable
this.serializedConfig = HBaseConfigurationUtil.serializeConfiguration(conf);
this.mutationConverter = mutationConverter;
this.bufferFlushMaxSizeInBytes = bufferFlushMaxSizeInBytes;
this.bufferFlushMaxMutations = bufferFlushMaxMutations;
this.bufferFlushIntervalMillis = bufferFlushIntervalMillis;
this.overwriteKey = overwriteKey;
}

public HBaseSinkFunction(
String hTableName,
org.apache.hadoop.conf.Configuration conf,
@@ -203,7 +223,15 @@ private void checkErrorAndRethrow() {
public void invoke(T value, Context context) throws Exception {
checkErrorAndRethrow();

- mutator.mutate(mutationConverter.convertToMutation(value));
+ Mutation mutation = mutationConverter.convertToMutation(value);
+ // If overwrite is enabled, delete the existing row for this key before writing the new data.
+ if (overwriteKey) {
+     long now = System.currentTimeMillis();
+     mutator.mutate(new Delete(mutation.getRow()).setTimestamp(now));
+     mutator.mutate(mutation);
+ } else {
+     mutator.mutate(mutation);
+ }

// flush when the buffer number of mutations greater than the configured max size.
if (bufferFlushMaxMutations > 0
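For context, the overwrite path above issues an explicit Delete for the row before the converted mutation is written. A hedged sketch of the same sequence against the plain HBase client API; the table, row key, and column names below are made up for illustration:

```java
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

/** Sketch only: hypothetical table, row key, and column names. */
public final class OverwriteRowSketch {

    static void overwriteRow(Connection connection) throws Exception {
        try (BufferedMutator mutator =
                connection.getBufferedMutator(TableName.valueOf("my_table"))) {
            byte[] rowKey = Bytes.toBytes("row-1");
            long now = System.currentTimeMillis();

            // The Delete removes every existing cell of the row with a timestamp <= now.
            mutator.mutate(new Delete(rowKey).setTimestamp(now));

            // The new cells for the same row key are written afterwards.
            mutator.mutate(
                    new Put(rowKey)
                            .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("value")));
            mutator.flush();
        }
    }
}
```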
@@ -94,6 +94,14 @@ public class HBaseConnectorOptions {
.defaultValue(false)
.withDescription("Writing option, whether ignore null value or not.");

public static final ConfigOption<Boolean> SINK_OVERWRITE_KEY =
ConfigOptions.key("sink.overwrite-key")
.booleanType()
.defaultValue(false)
.withDescription(
"Writing option, Do you want to overwrite this key? If necessary, "
+ "delete this key first before adding new data.");

public static final ConfigOption<Boolean> LOOKUP_ASYNC =
ConfigOptions.key("lookup.async")
.booleanType()
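To show how the option is meant to be used from the Table API, a hedged DDL sketch; the connector identifier, table name, schema, and ZooKeeper address are assumptions for illustration, and only 'sink.overwrite-key' comes from this change:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

/** Sketch only: connector identifier, schema, and connection settings are made up. */
public final class OverwriteKeyDdlSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        tEnv.executeSql(
                "CREATE TABLE hbase_sink (\n"
                        + "  rowkey STRING,\n"
                        + "  cf ROW<q STRING>,\n"
                        + "  PRIMARY KEY (rowkey) NOT ENFORCED\n"
                        + ") WITH (\n"
                        + "  'connector' = 'hbase-2.2',\n"
                        + "  'table-name' = 'my_table',\n"
                        + "  'zookeeper.quorum' = 'localhost:2181',\n"
                        + "  'sink.overwrite-key' = 'true'\n"
                        + ")");
    }
}
```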
@@ -35,6 +35,7 @@
import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_BUFFER_FLUSH_MAX_ROWS;
import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_BUFFER_FLUSH_MAX_SIZE;
import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_IGNORE_NULL_VALUE;
import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_OVERWRITE_KEY;
import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_PARALLELISM;
import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.ZOOKEEPER_QUORUM;
import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.ZOOKEEPER_ZNODE_PARENT;
@@ -91,6 +92,7 @@ public static HBaseWriteOptions getHBaseWriteOptions(ReadableConfig tableOptions
builder.setBufferFlushMaxSizeInBytes(
tableOptions.get(SINK_BUFFER_FLUSH_MAX_SIZE).getBytes());
builder.setIgnoreNullValue(tableOptions.get(SINK_IGNORE_NULL_VALUE));
builder.setOverwriteKey(tableOptions.get(SINK_OVERWRITE_KEY));
builder.setParallelism(tableOptions.getOptional(SINK_PARALLELISM).orElse(null));
return builder.build();
}
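Finally, a hedged sketch of how the table option flows into HBaseWriteOptions; the enclosing class and package names are assumed from the connector's layout rather than shown in this PR:

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.hbase.options.HBaseWriteOptions;
import org.apache.flink.connector.hbase.table.HBaseConnectorOptions;
import org.apache.flink.connector.hbase.table.HBaseConnectorOptionsUtil;

/** Sketch only: assumes the method above lives in HBaseConnectorOptionsUtil. */
public final class OptionWiringSketch {
    public static void main(String[] args) {
        Configuration tableOptions = new Configuration();
        tableOptions.set(HBaseConnectorOptions.SINK_OVERWRITE_KEY, true);

        HBaseWriteOptions writeOptions =
                HBaseConnectorOptionsUtil.getHBaseWriteOptions(tableOptions);

        // isOverwriteKey() is now true and is forwarded to the HBaseSinkFunction.
        System.out.println(writeOptions.isOverwriteKey());
    }
}
```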