diff --git a/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/HBase2DynamicTableFactory.java b/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/HBase2DynamicTableFactory.java
index 07c324d1..413d91b3 100644
--- a/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/HBase2DynamicTableFactory.java
+++ b/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/HBase2DynamicTableFactory.java
@@ -51,6 +51,7 @@
 import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_BUFFER_FLUSH_MAX_ROWS;
 import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_BUFFER_FLUSH_MAX_SIZE;
 import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_IGNORE_NULL_VALUE;
+import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_OVERWRITE_KEY;
 import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_PARALLELISM;
 import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.TABLE_NAME;
 import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.ZOOKEEPER_QUORUM;
@@ -156,6 +157,7 @@ public Set<ConfigOption<?>> optionalOptions() {
         set.add(SINK_BUFFER_FLUSH_INTERVAL);
         set.add(SINK_PARALLELISM);
         set.add(SINK_IGNORE_NULL_VALUE);
+        set.add(SINK_OVERWRITE_KEY);
         set.add(LOOKUP_ASYNC);
         set.add(LOOKUP_CACHE_MAX_ROWS);
         set.add(LOOKUP_CACHE_TTL);
@@ -182,7 +184,8 @@ public Set<ConfigOption<?>> forwardOptions() {
                         SINK_BUFFER_FLUSH_MAX_SIZE,
                         SINK_BUFFER_FLUSH_MAX_ROWS,
                         SINK_BUFFER_FLUSH_INTERVAL,
-                        SINK_IGNORE_NULL_VALUE)
+                        SINK_IGNORE_NULL_VALUE,
+                        SINK_OVERWRITE_KEY)
                 .collect(Collectors.toSet());
     }
 }
diff --git a/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/sink/HBaseDynamicTableSink.java b/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/sink/HBaseDynamicTableSink.java
index fa8ab78c..7913d637 100644
--- a/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/sink/HBaseDynamicTableSink.java
+++ b/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/sink/HBaseDynamicTableSink.java
@@ -82,7 +82,8 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
                                 writeOptions.isIgnoreNullValue()),
                         writeOptions.getBufferFlushMaxSizeInBytes(),
                         writeOptions.getBufferFlushMaxRows(),
-                        writeOptions.getBufferFlushIntervalMillis());
+                        writeOptions.getBufferFlushIntervalMillis(),
+                        writeOptions.isOverwriteKey());

         return SinkFunctionProvider.of(sinkFunction, writeOptions.getParallelism());
     }
diff --git a/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java b/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java
index c73bbc3c..cb8b86ee 100644
--- a/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java
+++ b/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java
@@ -596,7 +596,8 @@ public void testHBaseSinkFunctionTableExistence() throws Exception {
                                 false),
                         2 * 1024 * 1024,
                         1000,
-                        1000);
+                        1000,
+                        false);

         assertThatThrownBy(() -> sinkFunction.open(new Configuration()))
                 .getRootCause()
diff --git a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/options/HBaseWriteOptions.java b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/options/HBaseWriteOptions.java
index 94627834..08593ea1 100644
--- a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/options/HBaseWriteOptions.java
+++ b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/options/HBaseWriteOptions.java
@@ -35,6 +35,7 @@ public class HBaseWriteOptions implements Serializable {
     private final long bufferFlushMaxRows;
     private final long bufferFlushIntervalMillis;
     private final boolean ignoreNullValue;
+    private final boolean overwriteKey;
     private final Integer parallelism;

     private HBaseWriteOptions(
@@ -42,11 +43,13 @@ private HBaseWriteOptions(
             long bufferFlushMaxMutations,
             long bufferFlushIntervalMillis,
             boolean ignoreNullValue,
+            boolean overwriteKey,
             Integer parallelism) {
         this.bufferFlushMaxSizeInBytes = bufferFlushMaxSizeInBytes;
         this.bufferFlushMaxRows = bufferFlushMaxMutations;
         this.bufferFlushIntervalMillis = bufferFlushIntervalMillis;
         this.ignoreNullValue = ignoreNullValue;
+        this.overwriteKey = overwriteKey;
         this.parallelism = parallelism;
     }

@@ -66,6 +69,10 @@ public boolean isIgnoreNullValue() {
         return ignoreNullValue;
     }

+    public boolean isOverwriteKey() {
+        return overwriteKey;
+    }
+
     public Integer getParallelism() {
         return parallelism;
     }
@@ -81,6 +88,8 @@ public String toString() {
                 + bufferFlushIntervalMillis
                 + ", ignoreNullValue="
                 + ignoreNullValue
+                + ", overwriteKey="
+                + overwriteKey
                 + ", parallelism="
                 + parallelism
                 + '}';
@@ -99,6 +108,7 @@ public boolean equals(Object o) {
                 && bufferFlushMaxRows == that.bufferFlushMaxRows
                 && bufferFlushIntervalMillis == that.bufferFlushIntervalMillis
                 && ignoreNullValue == that.ignoreNullValue
+                && overwriteKey == that.overwriteKey
                 && parallelism == that.parallelism;
     }

@@ -123,6 +133,7 @@ public static class Builder {
         private long bufferFlushMaxRows = 0;
         private long bufferFlushIntervalMillis = 0;
         private boolean ignoreNullValue;
+        private boolean overwriteKey;
         private Integer parallelism;

         /**
@@ -160,6 +171,12 @@ public Builder setIgnoreNullValue(boolean ignoreNullValue) {
             return this;
         }

+        /** Optional. Writing option, whether to delete the row key before writing new data. */
+        public Builder setOverwriteKey(boolean overwriteKey) {
+            this.overwriteKey = overwriteKey;
+            return this;
+        }
+
         /**
          * Optional. Defines the parallelism of the HBase sink operator. By default, the parallelism
          * is determined by the framework using the same parallelism of the upstream chained
@@ -177,6 +194,7 @@ public HBaseWriteOptions build() {
                     bufferFlushMaxRows,
                     bufferFlushIntervalMillis,
                     ignoreNullValue,
+                    overwriteKey,
                     parallelism);
         }
     }
diff --git a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/HBaseSinkFunction.java b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/HBaseSinkFunction.java
index 0ffad05d..cfbae5f1 100644
--- a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/HBaseSinkFunction.java
+++ b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/HBaseSinkFunction.java
@@ -35,6 +35,7 @@
 import org.apache.hadoop.hbase.client.BufferedMutatorParams;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
 import org.slf4j.Logger;
@@ -73,6 +74,7 @@ public class HBaseSinkFunction<T> extends RichSinkFunction<T>
     private final long bufferFlushMaxSizeInBytes;
     private final long bufferFlushMaxMutations;
     private final long bufferFlushIntervalMillis;
+    private boolean overwriteKey;
     private final HBaseMutationConverter<T> mutationConverter;

     private transient Connection connection;
@@ -93,6 +95,24 @@ public class HBaseSinkFunction<T> extends RichSinkFunction<T>
      */
     private final AtomicReference<Throwable> failureThrowable = new AtomicReference<>();

+    public HBaseSinkFunction(
+            String hTableName,
+            org.apache.hadoop.conf.Configuration conf,
+            HBaseMutationConverter<T> mutationConverter,
+            long bufferFlushMaxSizeInBytes,
+            long bufferFlushMaxMutations,
+            long bufferFlushIntervalMillis,
+            boolean overwriteKey) {
+        this.hTableName = hTableName;
+        // Configuration is not serializable
+        this.serializedConfig = HBaseConfigurationUtil.serializeConfiguration(conf);
+        this.mutationConverter = mutationConverter;
+        this.bufferFlushMaxSizeInBytes = bufferFlushMaxSizeInBytes;
+        this.bufferFlushMaxMutations = bufferFlushMaxMutations;
+        this.bufferFlushIntervalMillis = bufferFlushIntervalMillis;
+        this.overwriteKey = overwriteKey;
+    }
+
     public HBaseSinkFunction(
             String hTableName,
             org.apache.hadoop.conf.Configuration conf,
@@ -203,7 +223,15 @@ private void checkErrorAndRethrow() {
     public void invoke(T value, Context context) throws Exception {
         checkErrorAndRethrow();

-        mutator.mutate(mutationConverter.convertToMutation(value));
+        Mutation mutation = mutationConverter.convertToMutation(value);
+        // If overwrite is enabled, delete the row key first before adding the new data.
+        if (overwriteKey) {
+            long now = System.currentTimeMillis();
+            mutator.mutate(new Delete(mutation.getRow()).setTimestamp(now));
+            mutator.mutate(mutation);
+        } else {
+            mutator.mutate(mutation);
+        }

         // flush when the buffer number of mutations greater than the configured max size.
         if (bufferFlushMaxMutations > 0
diff --git a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/table/HBaseConnectorOptions.java b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/table/HBaseConnectorOptions.java
index d760c034..996db88c 100644
--- a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/table/HBaseConnectorOptions.java
+++ b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/table/HBaseConnectorOptions.java
@@ -94,6 +94,14 @@ public class HBaseConnectorOptions {
                     .defaultValue(false)
                     .withDescription("Writing option, whether ignore null value or not.");

+    public static final ConfigOption<Boolean> SINK_OVERWRITE_KEY =
+            ConfigOptions.key("sink.overwrite-key")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Writing option, whether to overwrite the row key. If enabled, the "
+                                    + "row key is deleted before the new data is written.");
+
     public static final ConfigOption<Boolean> LOOKUP_ASYNC =
             ConfigOptions.key("lookup.async")
                     .booleanType()
diff --git a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/table/HBaseConnectorOptionsUtil.java b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/table/HBaseConnectorOptionsUtil.java
index 482644fd..3b6b6f94 100644
--- a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/table/HBaseConnectorOptionsUtil.java
+++ b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/table/HBaseConnectorOptionsUtil.java
@@ -35,6 +35,7 @@
 import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_BUFFER_FLUSH_MAX_ROWS;
 import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_BUFFER_FLUSH_MAX_SIZE;
 import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_IGNORE_NULL_VALUE;
+import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_OVERWRITE_KEY;
 import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_PARALLELISM;
 import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.ZOOKEEPER_QUORUM;
 import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.ZOOKEEPER_ZNODE_PARENT;
@@ -91,6 +92,7 @@ public static HBaseWriteOptions getHBaseWriteOptions(ReadableConfig tableOptions
         builder.setBufferFlushMaxSizeInBytes(
                 tableOptions.get(SINK_BUFFER_FLUSH_MAX_SIZE).getBytes());
         builder.setIgnoreNullValue(tableOptions.get(SINK_IGNORE_NULL_VALUE));
+        builder.setOverwriteKey(tableOptions.get(SINK_OVERWRITE_KEY));
         builder.setParallelism(tableOptions.getOptional(SINK_PARALLELISM).orElse(null));
         return builder.build();
     }
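Usage sketch (illustrative, not part of the patch): assuming the existing HBaseWriteOptions.builder() entry point, the new flag is set like the other write options; the 'sink.overwrite-key' table option is mapped onto the same builder call by HBaseConnectorOptionsUtil.getHBaseWriteOptions. The buffer values below are placeholders.

    // Build write options with row-key overwrite enabled (only setOverwriteKey is new here).
    HBaseWriteOptions writeOptions =
            HBaseWriteOptions.builder()
                    .setBufferFlushMaxRows(1000)
                    .setIgnoreNullValue(false)
                    .setOverwriteKey(true) // a Delete for the row key is buffered before each write
                    .build();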