File tree Expand file tree Collapse file tree 3 files changed +4
-3
lines changed
spark-doris-connector/src/main
java/org/apache/doris/spark/cfg
scala/org/apache/doris/spark Expand file tree Collapse file tree 3 files changed +4
-3
lines changed Original file line number Diff line number Diff line change @@ -94,10 +94,10 @@ public interface ConfigurationOptions {
9494 int SINK_MAX_BLOCKING_TIMES_DEFAULT = 1 ;
9595
9696 String DORIS_SINK_MAX_BLOCKING_INTERVAL_MS = "doris.sink.max.blocking.interval.ms" ;
97- int SINK_MAX_BLOCKING_INTERVAL_MS_DEFAULT = 1000 ;
97+ int SINK_MAX_BLOCKING_INTERVAL_MS_DEFAULT = 300000 ;
9898
9999 String DORIS_SINK_BLOCKING_TRIGGER_KEYS = "doris.sink.block.trigger.keys" ;
100- String SINK_BLOCKING_TRIGGER_KEYS_DEFAULT = "-235 " ;
100+ String SINK_BLOCKING_TRIGGER_KEYS_DEFAULT = "" ;
101101
102102
103103 /**
Original file line number Diff line number Diff line change @@ -182,7 +182,7 @@ private[spark] object Utils {
182182 }
183183
184184 def shouldBlock (exception : String ): Boolean = {
185- blockTriggerKeysArray.exists(exception.contains)
185+ blockTriggerKeysArray.nonEmpty && blockTriggerKeysArray.exists(exception.contains)
186186 }
187187
188188 val result = Try (f)
Original file line number Diff line number Diff line change @@ -79,6 +79,7 @@ class DorisWriter(settings: SparkSettings) extends Serializable {
7979
8080 var resultRdd = dataFrame.queryExecution.toRdd
8181 val schema = dataFrame.schema
82+ val dfColumns = dataFrame.columns
8283 if (Objects .nonNull(sinkTaskPartitionSize)) {
8384 resultRdd = if (sinkTaskUseRepartition) resultRdd.repartition(sinkTaskPartitionSize) else resultRdd.coalesce(sinkTaskPartitionSize)
8485 }
You can’t perform that action at this time.
0 commit comments