Commit 70fe118

add write blocking properties to control write blocking.
1 parent 11f4976 commit 70fe118

File tree

3 files changed, +56 -2 lines changed

spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java

Lines changed: 10 additions & 0 deletions

@@ -90,6 +90,16 @@ public interface ConfigurationOptions {
 
     int DORIS_SINK_BATCH_INTERVAL_MS_DEFAULT = 50;
 
+    String DORIS_SINK_MAX_BLOCKING_TIMES = "doris.sink.max.blocking.times";
+    int SINK_MAX_BLOCKING_TIMES_DEFAULT = 1;
+
+    String DORIS_SINK_MAX_BLOCKING_INTERVAL_MS = "doris.sink.max.blocking.interval.ms";
+    int SINK_MAX_BLOCKING_INTERVAL_MS_DEFAULT = 1000;
+
+    String DORIS_SINK_BLOCKING_TRIGGER_KEYS = "doris.sink.block.trigger.keys";
+    String SINK_BLOCKING_TRIGGER_KEYS_DEFAULT = "-235";
+
+
     /**
      * set types to ignore, split by comma
      * e.g.
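
The three new keys drive the write-blocking retry added in Utils.scala below: doris.sink.max.blocking.times caps how many blocking retries are attempted, doris.sink.max.blocking.interval.ms caps the backoff wait, and doris.sink.block.trigger.keys lists comma-separated substrings of an error message that mark a failure as "blocking" (the default, -235, is the Doris error code typically seen when a tablet has too many unmerged versions). A minimal sketch of setting them on a DataFrame write; the "doris" format name and connection options are the connector's standard ones, and all values here are illustrative:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("doris-write-blocking-demo").getOrCreate()
val df = spark.range(100).selectExpr("id", "cast(id as string) as name") // sample data

df.write
  .format("doris")
  .option("doris.fenodes", "fe_host:8030")
  .option("doris.table.identifier", "example_db.example_table")
  .option("user", "root")
  .option("password", "")
  .option("doris.sink.max.blocking.times", "3")           // default 1
  .option("doris.sink.max.blocking.interval.ms", "10000") // default 1000
  .option("doris.sink.block.trigger.keys", "-235")        // default "-235"
  .save()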

spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/Utils.scala

Lines changed: 37 additions & 0 deletions

@@ -170,6 +170,43 @@ private[spark] object Utils {
     finalParams
   }
 
+  @tailrec
+  def retry[R, T <: Throwable : ClassTag](retryTimes: Int, maxBlockTimes: Int, batchInterval: Duration, maxBlockInterval: Duration,
+                                          blockTriggerKeysArray: Array[String], logger: Logger)(f: => R): Try[R] = {
+    assert(retryTimes >= 0)
+    assert(maxBlockTimes >= 0)
+    var currentBlockInterval = batchInterval
+
+    def increaseBackoffTime(): Unit = {
+      currentBlockInterval = Duration.ofNanos(Math.min(batchInterval.toNanos * 2, maxBlockInterval.toNanos))
+    }
+
+    def shouldBlock(exception: String): Boolean = {
+      blockTriggerKeysArray.exists(exception.contains)
+    }
+
+    val result = Try(f)
+    result match {
+      case Success(result) =>
+        LockSupport.parkNanos(currentBlockInterval.toNanos)
+        Success(result)
+      case Failure(exception: T) if retryTimes > 0 && !shouldBlock(exception.getMessage) =>
+        logger.warn(s"Execution failed caused by: ", exception)
+        logger.warn(s"$retryTimes times retry remaining, the next will be in ${batchInterval.toMillis}ms")
+        LockSupport.parkNanos(batchInterval.toNanos)
+        retry(retryTimes - 1, maxBlockTimes, currentBlockInterval, maxBlockInterval, blockTriggerKeysArray, logger)(f)
+      case Failure(exception: T) if maxBlockTimes > 0 && shouldBlock(exception.getMessage) =>
+        logger.warn(s"Execution failed caused by: ", exception)
+        increaseBackoffTime()
+        logger.warn(s"$maxBlockTimes times write blocking retry remaining, the next will be in ${currentBlockInterval.toMillis}ms")
+        LockSupport.parkNanos(currentBlockInterval.toNanos)
+        retry(retryTimes, maxBlockTimes - 1, currentBlockInterval, maxBlockInterval, blockTriggerKeysArray, logger)(f)
+      case Failure(exception) =>
+        logger.warn(s"Execution failed caused by: ", exception)
+        Failure(exception)
+    }
+  }
+
   @tailrec
   def retry[R, T <: Throwable : ClassTag](retryTimes: Int, interval: Duration, logger: Logger)(f: => R): Try[R] = {
     assert(retryTimes >= 0)
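
Two retry budgets coexist in this overload: ordinary failures consume retryTimes and wait a fixed batchInterval, while failures whose message contains a trigger key consume maxBlockTimes and wait a doubled interval. Because the doubled currentBlockInterval is passed as the next call's batchInterval, the blocking wait grows exponentially up to maxBlockInterval. A sketch of calling it directly, with a hypothetical flakyLoad standing in for a stream load that intermittently hits a -235 error (names and values here are assumptions for illustration):

import java.time.Duration
import java.util.concurrent.ThreadLocalRandom
import org.slf4j.LoggerFactory
import scala.util.{Failure, Success}

val logger = LoggerFactory.getLogger("retry-demo")

// Hypothetical operation: fails with a "-235"-tagged message about half the time.
def flakyLoad(): String =
  if (ThreadLocalRandom.current().nextBoolean()) "load ok"
  else throw new RuntimeException("stream load failed, err=-235")

Utils.retry[String, Exception](
  retryTimes = 3,                             // budget for non-blocking failures
  maxBlockTimes = 2,                          // budget for trigger-key (blocking) failures
  batchInterval = Duration.ofMillis(50),      // initial wait; doubles on blocking failures
  maxBlockInterval = Duration.ofMillis(1000), // backoff ceiling
  blockTriggerKeysArray = Array("-235"),      // substrings that mark a failure as blocking
  logger = logger
)(flakyLoad()) match {
  case Success(v) => logger.info(v)
  case Failure(e) => logger.error("gave up after exhausting both retry budgets", e)
}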

spark-doris-connector/src/main/scala/org/apache/doris/spark/writer/DorisWriter.scala

Lines changed: 9 additions & 2 deletions

@@ -48,6 +48,13 @@ class DorisWriter(settings: SparkSettings) extends Serializable {
     ConfigurationOptions.DORIS_SINK_TASK_USE_REPARTITION_DEFAULT.toString).toBoolean
   private val batchInterValMs: Integer = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_BATCH_INTERVAL_MS,
     ConfigurationOptions.DORIS_SINK_BATCH_INTERVAL_MS_DEFAULT)
+  private val maxSinkBlocks: Integer = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_MAX_BLOCKING_TIMES,
+    ConfigurationOptions.SINK_MAX_BLOCKING_TIMES_DEFAULT)
+  private val blockTriggerKeys: String = settings.getProperty(ConfigurationOptions.DORIS_SINK_BLOCKING_TRIGGER_KEYS,
+    ConfigurationOptions.SINK_BLOCKING_TRIGGER_KEYS_DEFAULT)
+  private val maxBlockInterValMs: Integer = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_MAX_BLOCKING_INTERVAL_MS,
+    ConfigurationOptions.SINK_MAX_BLOCKING_INTERVAL_MS_DEFAULT)
+  private val blockTriggerKeysArray: Array[String] = blockTriggerKeys.split(",")
 
   private val enable2PC: Boolean = settings.getBooleanProperty(ConfigurationOptions.DORIS_SINK_ENABLE_2PC,
     ConfigurationOptions.DORIS_SINK_ENABLE_2PC_DEFAULT);
 
@@ -80,8 +87,8 @@ class DorisWriter(settings: SparkSettings) extends Serializable {
    *
    */
   def flush(batch: Seq[util.List[Object]], dfColumns: Array[String]): Unit = {
-    Utils.retry[util.List[Integer], Exception](maxRetryTimes, Duration.ofMillis(batchInterValMs.toLong), logger) {
-      dorisStreamLoader.loadV2(batch.asJava, dfColumns, enable2PC)
+    Utils.retry[util.List[Integer], Exception](maxRetryTimes, maxSinkBlocks, Duration.ofMillis(batchInterValMs.toLong), Duration.ofMillis(maxBlockInterValMs.toLong), blockTriggerKeysArray, logger) {
+      dorisStreamLoader.loadV2(batch.toList.asJava, dfColumns, enable2PC)
     } match {
       case Success(txnIds) => if (enable2PC) handleLoadSuccess(txnIds.asScala, preCommittedTxnAcc)
       case Failure(e) =>
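
For intuition, a standalone sketch (not part of the commit) of the blocking backoff schedule flush() now follows under the defaults, a 50 ms batch interval and a 1000 ms ceiling, mirroring increaseBackoffTime's doubling-with-cap:

import java.time.Duration

val batchInterval = Duration.ofMillis(50)
val maxBlockInterval = Duration.ofMillis(1000)

// Each blocking failure doubles the previous wait, capped at the ceiling.
val waits = Iterator
  .iterate(batchInterval)(d => Duration.ofNanos(math.min(d.toNanos * 2, maxBlockInterval.toNanos)))
  .drop(1) // the first doubling happens after the first blocking failure
  .take(6)
  .map(_.toMillis)
  .toList

println(waits) // List(100, 200, 400, 800, 1000, 1000)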
