diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java index f9433df11a4d..20464d38e19b 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java @@ -254,6 +254,12 @@ public class PipeConnectorConstant { public static final String SINK_MARK_AS_PIPE_REQUEST_KEY = "sink.mark-as-pipe-request"; public static final boolean CONNECTOR_MARK_AS_PIPE_REQUEST_DEFAULT_VALUE = true; + public static final String CONNECTOR_MARK_AS_GENERAL_WRITE_REQUEST_KEY = + "connector.mark-as-general-write-request"; + public static final String SINK_MARK_AS_GENERAL_WRITE_REQUEST_KEY = + "sink.mark-as-general-write-request"; + public static final boolean CONNECTOR_MARK_AS_GENERAL_WRITE_REQUEST_DEFAULT_VALUE = false; + public static final String CONNECTOR_SKIP_IF_KEY = "connector.skipif"; public static final String SINK_SKIP_IF_KEY = "sink.skipif"; public static final String CONNECTOR_IOTDB_SKIP_IF_NO_PRIVILEGES = "no-privileges"; diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java index 34e579aba5c2..d96596bc8ac4 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java @@ -103,6 +103,8 @@ import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_TSFILE_STRATEGY_SYNC_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_TSFILE_VALIDATION_DEFAULT_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_TSFILE_VALIDATION_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_MARK_AS_GENERAL_WRITE_REQUEST_DEFAULT_VALUE; +import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_MARK_AS_GENERAL_WRITE_REQUEST_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_MARK_AS_PIPE_REQUEST_DEFAULT_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_MARK_AS_PIPE_REQUEST_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_RATE_LIMIT_DEFAULT_VALUE; @@ -132,6 +134,7 @@ import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_LOAD_BALANCE_STRATEGY_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_LOAD_TSFILE_STRATEGY_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_LOAD_TSFILE_VALIDATION_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_MARK_AS_GENERAL_WRITE_REQUEST_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_MARK_AS_PIPE_REQUEST_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_RATE_LIMIT_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_SKIP_IF_KEY; @@ -229,6 +232,13 @@ public void validate(final PipeParameterValidator validator) throws Exception { Arrays.asList(CONNECTOR_IOTDB_BATCH_DELAY_KEY, SINK_IOTDB_BATCH_DELAY_KEY), false); + // Check coexistence of mark-as-pipe-request and mark-as-general-write-request + validator.validateSynonymAttributes( + Arrays.asList(CONNECTOR_MARK_AS_PIPE_REQUEST_KEY, SINK_MARK_AS_PIPE_REQUEST_KEY), + Arrays.asList( + CONNECTOR_MARK_AS_GENERAL_WRITE_REQUEST_KEY, SINK_MARK_AS_GENERAL_WRITE_REQUEST_KEY), + false); + username = parameters.getStringOrDefault( Arrays.asList( @@ -383,10 +393,20 @@ public void customize( .equals(CONNECTOR_FORMAT_TS_FILE_VALUE); LOGGER.info("IoTDBConnector isTabletBatchModeEnabled: {}", isTabletBatchModeEnabled); - shouldMarkAsPipeRequest = + final boolean shouldMarkAsGeneralWriteRequest = parameters.getBooleanOrDefault( - Arrays.asList(CONNECTOR_MARK_AS_PIPE_REQUEST_KEY, SINK_MARK_AS_PIPE_REQUEST_KEY), - CONNECTOR_MARK_AS_PIPE_REQUEST_DEFAULT_VALUE); + Arrays.asList( + CONNECTOR_MARK_AS_GENERAL_WRITE_REQUEST_KEY, + SINK_MARK_AS_GENERAL_WRITE_REQUEST_KEY), + CONNECTOR_MARK_AS_GENERAL_WRITE_REQUEST_DEFAULT_VALUE); + if (shouldMarkAsGeneralWriteRequest) { + shouldMarkAsPipeRequest = false; + } else { + shouldMarkAsPipeRequest = + parameters.getBooleanOrDefault( + Arrays.asList(CONNECTOR_MARK_AS_PIPE_REQUEST_KEY, SINK_MARK_AS_PIPE_REQUEST_KEY), + CONNECTOR_MARK_AS_PIPE_REQUEST_DEFAULT_VALUE); + } LOGGER.info("IoTDBConnector shouldMarkAsPipeRequest: {}", shouldMarkAsPipeRequest); final String connectorSkipIfValue =