|
21 | 21 | import com.bytedance.bitsail.base.connector.writer.v1.WriterCommitter; |
22 | 22 | import com.bytedance.bitsail.base.serializer.BinarySerializer; |
23 | 23 | import com.bytedance.bitsail.base.serializer.SimpleBinarySerializer; |
| 24 | +import com.bytedance.bitsail.common.BitSailException; |
24 | 25 | import com.bytedance.bitsail.common.configuration.BitSailConfiguration; |
25 | 26 | import com.bytedance.bitsail.common.exception.CommonErrorCode; |
26 | 27 | import com.bytedance.bitsail.common.model.ColumnInfo; |
27 | 28 | import com.bytedance.bitsail.common.type.TypeInfoConverter; |
28 | 29 | import com.bytedance.bitsail.common.type.filemapping.FileMappingTypeInfoConverter; |
| 30 | +import com.bytedance.bitsail.common.util.DateUtil; |
29 | 31 | import com.bytedance.bitsail.connector.doris.DorisConnectionHolder; |
30 | 32 | import com.bytedance.bitsail.connector.doris.committer.DorisCommittable; |
31 | 33 | import com.bytedance.bitsail.connector.doris.committer.DorisCommittableSerializer; |
|
36 | 38 | import com.bytedance.bitsail.connector.doris.option.DorisWriterOptions; |
37 | 39 | import com.bytedance.bitsail.connector.doris.partition.DorisPartition; |
38 | 40 | import com.bytedance.bitsail.connector.doris.partition.DorisPartitionManager; |
| 41 | +import com.bytedance.bitsail.connector.doris.partition.DorisPartitionTemplate; |
39 | 42 | import com.bytedance.bitsail.connector.doris.sink.ddl.DorisSchemaManagerGenerator; |
40 | 43 |
|
41 | 44 | import com.alibaba.fastjson.JSON; |
|
44 | 47 |
|
45 | 48 | import java.io.IOException; |
46 | 49 | import java.sql.SQLException; |
47 | | -import java.util.HashMap; |
48 | | -import java.util.List; |
49 | | -import java.util.Map; |
50 | | -import java.util.Optional; |
51 | | -import java.util.Properties; |
| 50 | +import java.text.ParseException; |
| 51 | +import java.text.SimpleDateFormat; |
| 52 | +import java.util.*; |
52 | 53 | import java.util.stream.Collectors; |
53 | 54 |
|
54 | 55 | public class DorisSink<InputT> implements Sink<InputT, DorisCommittable, DorisWriterState> { |
@@ -137,16 +138,38 @@ private void initDorisOptions(BitSailConfiguration writerConfiguration) { |
137 | 138 | // Need partition info in batch replace modes. |
138 | 139 | if (isHasPartition && this.writeMode.equals(DorisExecutionOptions.WRITE_MODE.BATCH_REPLACE)) { |
139 | 140 | //BATCH and REPLACE mode need the partition infos |
140 | | - List<Map<String, Object>> partitionList = writerConfiguration.getNecessaryOption(DorisWriterOptions.PARTITIONS, CommonErrorCode.CONFIG_ERROR); |
141 | | - builder.partitions( |
142 | | - partitionList.stream() |
143 | | - .map(partition -> JSON.parseObject(JSON.toJSONString(partition), DorisPartition.class)) |
144 | | - .collect(Collectors.toList()) |
145 | | - ); |
| 141 | + DorisPartitionTemplate dorisPartitionTemplate = writerConfiguration.getNecessaryOption(DorisWriterOptions.PARTITIONS, CommonErrorCode.CONFIG_ERROR); |
| 142 | + List<DorisPartition> dorisPartitions = parseTemplateToDorisPartitions(dorisPartitionTemplate); |
| 143 | + builder.partitions(dorisPartitions); |
146 | 144 | } |
147 | 145 | dorisOptions = builder.build(); |
148 | 146 | } |
149 | 147 |
|
| 148 | + private List<DorisPartition> parseTemplateToDorisPartitions(DorisPartitionTemplate dorisPartitionTemplate){ |
| 149 | + String pattern = dorisPartitionTemplate.getPattern(); |
| 150 | + String prefix = dorisPartitionTemplate.getPrefix(); |
| 151 | + String start = dorisPartitionTemplate.getStartRange(); |
| 152 | + String end = dorisPartitionTemplate.getEndRange(); |
| 153 | + |
| 154 | + SimpleDateFormat sdf = new SimpleDateFormat(pattern); |
| 155 | + try { |
| 156 | + Date dStart = sdf.parse(start); |
| 157 | + Date dEnd = sdf.parse(end); |
| 158 | + List<DorisPartition> dorisPartitions = new ArrayList<>(); |
| 159 | + List<Date> listDate = DateUtil.getDatesBetweenTwoDate(dStart, dEnd); |
| 160 | + listDate.forEach(date->{ |
| 161 | + DorisPartition dorisPartition = new DorisPartition(); |
| 162 | + dorisPartition.setName(prefix + sdf.format(date)); |
| 163 | + dorisPartition.setStartRange(Collections.singletonList(sdf.format(date))); |
| 164 | + dorisPartition.setEndRange(Collections.singletonList(sdf.format(DateUtil.getNDaysAfterDate(date, 1)))); |
| 165 | + dorisPartitions.add(dorisPartition); |
| 166 | + }); |
| 167 | + return dorisPartitions; |
| 168 | + } catch (ParseException e) { |
| 169 | + throw BitSailException.asBitSailException(CommonErrorCode.CONFIG_ERROR, "Can't parse configuration info: " + dorisPartitionTemplate, e); |
| 170 | + } |
| 171 | + } |
| 172 | + |
150 | 173 | private void initDorisExecutionOptions(BitSailConfiguration writerConfiguration) { |
151 | 174 | LOG.info("Start to init DorisExecutionOptions!"); |
152 | 175 | final DorisExecutionOptions.DorisExecutionOptionsBuilder builder = DorisExecutionOptions.builder(); |
|
0 commit comments