Skip to content

Commit 870a134

Browse files
authored
[Fix](job) fix drop db error and duplicate fragment_id when retry (#57307)
### What problem does this PR solve? This PR fixes the following issues: 1. In a cloud environment, if the database had already been deleted, attempting to drop a job resulted in an error. 2. In a cloud environment, offsets were not successfully put to the meta service (MS), and the job statistics were inaccurate. 3. When a StreamingTask timed out and was retried, a duplicate fragment (fragment_id) error was reported.
1 parent bd45c50 commit 870a134

File tree

8 files changed

+317
-67
lines changed

8 files changed

+317
-67
lines changed

fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java

Lines changed: 44 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@
8989
@Log4j2
9090
public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, Map<Object, Object>> implements
9191
TxnStateChangeCallback, GsonPostProcessable {
92-
private final long dbId;
92+
private long dbId;
9393
private StreamingJobStatistic jobStatistic = new StreamingJobStatistic();
9494
@Getter
9595
@Setter
@@ -129,7 +129,6 @@ public StreamingInsertJob(String jobName,
129129
Map<String, String> properties) {
130130
super(Env.getCurrentEnv().getNextId(), jobName, jobStatus, dbName, comment, createUser,
131131
jobConfig, createTimeMs, executeSql);
132-
this.dbId = ConnectContext.get().getCurrentDbId();
133132
this.properties = properties;
134133
init();
135134
}
@@ -246,11 +245,12 @@ public void updateJobStatus(JobStatus status) throws JobException {
246245
try {
247246
super.updateJobStatus(status);
248247
if (JobStatus.PAUSED.equals(getJobStatus())) {
249-
clearRunningStreamTask();
248+
clearRunningStreamTask(status);
250249
}
251250
if (isFinalStatus()) {
252251
Env.getCurrentGlobalTransactionMgr().getCallbackFactory().removeCallback(getJobId());
253252
}
253+
log.info("Streaming insert job {} update status to {}", getJobId(), getJobStatus());
254254
} finally {
255255
lock.writeLock().unlock();
256256
}
@@ -296,6 +296,8 @@ protected StreamingInsertTask createStreamingInsertTask() {
296296
offsetProvider, getCurrentDbName(), jobProperties, originTvfProps, getCreateUser());
297297
Env.getCurrentEnv().getJobManager().getStreamingTaskManager().registerTask(runningStreamTask);
298298
this.runningStreamTask.setStatus(TaskStatus.PENDING);
299+
log.info("create new streaming insert task for job {}, task {} ",
300+
getJobId(), runningStreamTask.getTaskId());
299301
return runningStreamTask;
300302
}
301303

@@ -314,18 +316,19 @@ public boolean needScheduleTask() {
314316
return (getJobStatus().equals(JobStatus.RUNNING) || getJobStatus().equals(JobStatus.PENDING));
315317
}
316318

317-
public void clearRunningStreamTask() {
319+
public void clearRunningStreamTask(JobStatus newJobStatus) {
318320
if (runningStreamTask != null) {
321+
log.info("clear running streaming insert task for job {}, task {}, status {} ",
322+
getJobId(), runningStreamTask.getTaskId(), runningStreamTask.getStatus());
323+
runningStreamTask.cancel(JobStatus.STOPPED.equals(newJobStatus) ? false : true);
319324
runningStreamTask.closeOrReleaseResources();
320-
runningStreamTask = null;
321325
}
322326
}
323327

324328
public boolean hasMoreDataToConsume() {
325329
return offsetProvider.hasMoreDataToConsume();
326330
}
327331

328-
329332
@Override
330333
public void onTaskFail(StreamingJobSchedulerTask task) throws JobException {
331334
if (task.getErrMsg() != null) {
@@ -358,6 +361,8 @@ public void onStreamTaskSuccess(StreamingInsertTask task) {
358361
Env.getCurrentEnv().getJobManager().getStreamingTaskManager().removeRunningTask(task);
359362
StreamingInsertTask nextTask = createStreamingInsertTask();
360363
this.runningStreamTask = nextTask;
364+
log.info("Streaming insert job {} create next streaming insert task {} after task {} success",
365+
getJobId(), nextTask.getTaskId(), task.getTaskId());
361366
} finally {
362367
writeUnlock();
363368
}
@@ -374,6 +379,17 @@ private void updateJobStatisticAndOffset(StreamingTaskTxnCommitAttachment attach
374379
offsetProvider.updateOffset(offsetProvider.deserializeOffset(attachment.getOffset()));
375380
}
376381

382+
private void updateCloudJobStatisticAndOffset(StreamingTaskTxnCommitAttachment attachment) {
383+
if (this.jobStatistic == null) {
384+
this.jobStatistic = new StreamingJobStatistic();
385+
}
386+
this.jobStatistic.setScannedRows(attachment.getScannedRows());
387+
this.jobStatistic.setLoadBytes(attachment.getLoadBytes());
388+
this.jobStatistic.setFileNumber(attachment.getNumFiles());
389+
this.jobStatistic.setFileSize(attachment.getFileBytes());
390+
offsetProvider.updateOffset(offsetProvider.deserializeOffset(attachment.getOffset()));
391+
}
392+
377393
@Override
378394
public void onRegister() throws JobException {
379395
Env.getCurrentGlobalTransactionMgr().getCallbackFactory().addCallback(this);
@@ -555,6 +571,12 @@ public void beforeCommitted(TransactionState txnState) throws TransactionExcepti
555571
boolean shouldReleaseLock = false;
556572
writeLock();
557573
try {
574+
if (runningStreamTask.getIsCanceled().get()) {
575+
log.info("streaming insert job {} task {} is canceled, skip beforeCommitted",
576+
getJobId(), runningStreamTask.getTaskId());
577+
return;
578+
}
579+
558580
ArrayList<Long> taskIds = new ArrayList<>();
559581
taskIds.add(runningStreamTask.getTaskId());
560582
// todo: Check whether the taskid of runningtask is consistent with the taskid associated with txn
@@ -601,14 +623,26 @@ public void replayOnCommitted(TransactionState txnState) {
601623
succeedTaskCount.incrementAndGet();
602624
}
603625

626+
public long getDbId() {
627+
if (dbId <= 0) {
628+
try {
629+
this.dbId = Env.getCurrentInternalCatalog().getDbOrAnalysisException(getCurrentDbName()).getId();
630+
} catch (AnalysisException e) {
631+
log.warn("failed to get db id for streaming insert job {}, db name: {}, msg: {}",
632+
getJobId(), getCurrentDbName(), e.getMessage());
633+
}
634+
}
635+
return dbId;
636+
}
637+
604638
public void replayOnCloudMode() throws JobException {
605639
Cloud.GetStreamingTaskCommitAttachRequest.Builder builder =
606640
Cloud.GetStreamingTaskCommitAttachRequest.newBuilder();
607641
builder.setCloudUniqueId(Config.cloud_unique_id);
608-
builder.setDbId(dbId);
642+
builder.setDbId(getDbId());
609643
builder.setJobId(getJobId());
610644

611-
Cloud.GetStreamingTaskCommitAttachResponse response;
645+
Cloud.GetStreamingTaskCommitAttachResponse response = null;
612646
try {
613647
response = MetaServiceProxy.getInstance().getStreamingTaskCommitAttach(builder.build());
614648
if (response.getStatus().getCode() != Cloud.MetaServiceCode.OK) {
@@ -621,13 +655,13 @@ public void replayOnCloudMode() throws JobException {
621655
}
622656
}
623657
} catch (RpcException e) {
624-
log.info("failed to get streaming task commit attach {}", e);
658+
log.info("failed to get streaming task commit attach {}", response, e);
625659
throw new JobException(e.getMessage());
626660
}
627661

628662
StreamingTaskTxnCommitAttachment commitAttach =
629663
new StreamingTaskTxnCommitAttachment(response.getCommitAttach());
630-
updateJobStatisticAndOffset(commitAttach);
664+
updateCloudJobStatisticAndOffset(commitAttach);
631665
}
632666

633667
@Override

fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java

Lines changed: 55 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.doris.job.extensions.insert.InsertTask;
2828
import org.apache.doris.job.offset.Offset;
2929
import org.apache.doris.job.offset.SourceOffsetProvider;
30+
import org.apache.doris.job.offset.s3.S3Offset;
3031
import org.apache.doris.nereids.StatementContext;
3132
import org.apache.doris.nereids.glue.LogicalPlanAdapter;
3233
import org.apache.doris.nereids.parser.NereidsParser;
@@ -67,10 +68,12 @@ public class StreamingInsertTask {
6768
private UserIdentity userIdentity;
6869
private ConnectContext ctx;
6970
private Offset runningOffset;
71+
@Getter
7072
private AtomicBoolean isCanceled = new AtomicBoolean(false);
7173
private StreamingJobProperties jobProperties;
7274
private Map<String, String> originTvfProps;
7375
SourceOffsetProvider offsetProvider;
76+
private int retryCount = 0;
7477

7578
public StreamingInsertTask(long jobId,
7679
long taskId,
@@ -93,22 +96,32 @@ public StreamingInsertTask(long jobId,
9396
}
9497

9598
public void execute() throws JobException {
96-
try {
97-
before();
98-
run();
99-
onSuccess();
100-
} catch (Exception e) {
101-
if (TaskStatus.CANCELED.equals(status)) {
99+
while (retryCount <= MAX_RETRY) {
100+
try {
101+
before();
102+
run();
103+
onSuccess();
102104
return;
103-
}
104-
log.warn("execute task error, job id is {}, task id is {}", jobId, taskId, e);
105-
onFail(e.getMessage());
106-
} finally {
107-
// The cancel logic will call the closeOrReleased Resources method by itself.
108-
// If it is also called here,
109-
// it may result in the inability to obtain relevant information when canceling the task
110-
if (!TaskStatus.CANCELED.equals(status)) {
111-
closeOrReleaseResources();
105+
} catch (Exception e) {
106+
if (TaskStatus.CANCELED.equals(status)) {
107+
return;
108+
}
109+
this.errMsg = e.getMessage();
110+
retryCount++;
111+
if (retryCount > MAX_RETRY) {
112+
log.error("Task execution failed after {} retries.", MAX_RETRY, e);
113+
onFail(e.getMessage());
114+
return;
115+
}
116+
log.warn("execute streaming task error, job id is {}, task id is {}, retrying {}/{}: {}",
117+
jobId, taskId, retryCount, MAX_RETRY, e.getMessage());
118+
} finally {
119+
// The cancel logic will call the closeOrReleased Resources method by itself.
120+
// If it is also called here,
121+
// it may result in the inability to obtain relevant information when canceling the task
122+
if (!TaskStatus.CANCELED.equals(status)) {
123+
closeOrReleaseResources();
124+
}
112125
}
113126
}
114127
}
@@ -118,7 +131,8 @@ private void before() throws Exception {
118131
this.startTimeMs = System.currentTimeMillis();
119132

120133
if (isCanceled.get()) {
121-
throw new JobException("Streaming insert task has been canceled, task id: {}", getTaskId());
134+
log.info("streaming insert task has been canceled, task id is {}", getTaskId());
135+
return;
122136
}
123137
ctx = InsertTask.makeConnectContext(userIdentity, currentDb);
124138
ctx.setSessionVariable(jobProperties.getSessionVariable());
@@ -135,42 +149,36 @@ private void before() throws Exception {
135149
throw new JobException("Can not get Parsed plan");
136150
}
137151
this.taskCommand = offsetProvider.rewriteTvfParams(baseCommand, runningOffset);
138-
this.taskCommand.setLabelName(Optional.of(getJobId() + LABEL_SPLITTER + getTaskId()));
152+
this.taskCommand.setLabelName(Optional.of(labelName));
139153
this.stmtExecutor = new StmtExecutor(ctx, new LogicalPlanAdapter(taskCommand, ctx.getStatementContext()));
140154
}
141155

142156
private void run() throws JobException {
157+
StreamingInsertJob job =
158+
(StreamingInsertJob) Env.getCurrentEnv().getJobManager().getJob(getJobId());
159+
StreamingInsertTask runningStreamTask = job.getRunningStreamTask();
160+
log.info("current running stream task id is {} for job id {}",
161+
runningStreamTask == null ? -1 : runningStreamTask.getTaskId(), getJobId());
162+
if (isCanceled.get()) {
163+
log.info("task has been canceled, task id is {}", getTaskId());
164+
return;
165+
}
166+
log.info("start to run streaming insert task, label {}, offset is {}, filepath {}",
167+
labelName, runningOffset.toString(), ((S3Offset) runningOffset).getFileLists());
143168
String errMsg = null;
144-
int retry = 0;
145-
while (retry <= MAX_RETRY) {
146-
try {
147-
if (isCanceled.get()) {
148-
log.info("task has been canceled, task id is {}", getTaskId());
149-
return;
150-
}
151-
taskCommand.run(ctx, stmtExecutor);
152-
if (ctx.getState().getStateType() == QueryState.MysqlStateType.OK) {
153-
return;
154-
} else {
155-
errMsg = ctx.getState().getErrorMessage();
156-
}
157-
log.error(
158-
"streaming insert failed with {}, reason {}, to retry",
159-
taskCommand.getLabelName(),
160-
errMsg);
161-
if (retry == MAX_RETRY) {
162-
errMsg = "reached max retry times, failed with " + errMsg;
163-
}
164-
} catch (Exception e) {
165-
log.warn("execute insert task error, label is {},offset is {}", taskCommand.getLabelName(),
166-
runningOffset.toString(), e);
167-
errMsg = Util.getRootCauseMessage(e);
169+
try {
170+
taskCommand.run(ctx, stmtExecutor);
171+
if (ctx.getState().getStateType() == QueryState.MysqlStateType.OK) {
172+
return;
173+
} else {
174+
errMsg = ctx.getState().getErrorMessage();
168175
}
169-
retry++;
176+
throw new JobException(errMsg);
177+
} catch (Exception e) {
178+
log.warn("execute insert task error, label is {},offset is {}", taskCommand.getLabelName(),
179+
runningOffset.toString(), e);
180+
throw new JobException(Util.getRootCauseMessage(e));
170181
}
171-
log.error("streaming insert task failed, job id is {}, task id is {}, offset is {}, errMsg is {}",
172-
getJobId(), getTaskId(), runningOffset.toString(), errMsg);
173-
throw new JobException(errMsg);
174182
}
175183

176184
public boolean onSuccess() throws JobException {
@@ -218,6 +226,8 @@ public void cancel(boolean needWaitCancelComplete) {
218226
}
219227
isCanceled.getAndSet(true);
220228
if (null != stmtExecutor) {
229+
log.info("cancelling streaming insert task, job id is {}, task id is {}",
230+
getJobId(), getTaskId());
221231
stmtExecutor.cancel(new Status(TStatusCode.CANCELLED, "streaming insert task cancelled"),
222232
needWaitCancelComplete);
223233
}

fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,13 @@
3030
import org.apache.doris.thrift.TCell;
3131
import org.apache.doris.thrift.TRow;
3232

33+
import lombok.extern.log4j.Log4j2;
3334
import org.apache.commons.lang3.StringUtils;
3435

3536
import java.util.Arrays;
3637
import java.util.List;
3738

39+
@Log4j2
3840
public class StreamingJobSchedulerTask extends AbstractTask {
3941
private static final long BACK_OFF_BASIC_TIME_SEC = 10L;
4042
private static final long MAX_BACK_OFF_TIME_SEC = 60 * 5;
@@ -108,6 +110,7 @@ protected void closeOrReleaseResources() {
108110

109111
@Override
110112
protected void executeCancelLogic(boolean needWaitCancelComplete) throws Exception {
113+
log.info("cancelling streaming insert job scheduler task for job id {}", streamingInsertJob.getJobId());
111114
if (streamingInsertJob.getRunningStreamTask() != null) {
112115
streamingInsertJob.getRunningStreamTask().cancel(needWaitCancelComplete);
113116
}
@@ -119,6 +122,10 @@ public TRow getTvfInfo(String jobName) {
119122
if (runningTask == null) {
120123
return null;
121124
}
125+
if (!streamingInsertJob.needScheduleTask()) {
126+
//todo: should list history task
127+
return null;
128+
}
122129
TRow trow = new TRow();
123130
trow.addToColumnValue(new TCell().setStringVal(String.valueOf(runningTask.getTaskId())));
124131
trow.addToColumnValue(new TCell().setStringVal(String.valueOf(runningTask.getJobId())));

fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import org.apache.doris.nereids.trees.expressions.Expression;
5252
import org.apache.doris.nereids.trees.plans.commands.AlterJobCommand;
5353
import org.apache.doris.qe.ConnectContext;
54+
import org.apache.doris.rpc.RpcException;
5455

5556
import com.google.common.collect.Lists;
5657
import lombok.Getter;
@@ -241,25 +242,25 @@ public void dropJobInternal(T job, boolean isReplay) throws JobException {
241242
}
242243

243244
private void deleteStremingJob(AbstractJob<?, C> job) throws JobException {
244-
boolean isStreamingJob = job.getJobConfig().getExecuteType().equals(JobExecuteType.STREAMING);
245-
if (!(Config.isCloudMode() && isStreamingJob)) {
245+
if (!(Config.isCloudMode() && job instanceof StreamingInsertJob)) {
246246
return;
247247
}
248+
StreamingInsertJob streamingJob = (StreamingInsertJob) job;
249+
Cloud.DeleteStreamingJobResponse resp = null;
248250
try {
249-
long dbId = Env.getCurrentInternalCatalog().getDbOrDdlException(job.getCurrentDbName()).getId();
250251
Cloud.DeleteStreamingJobRequest req = Cloud.DeleteStreamingJobRequest.newBuilder()
251252
.setCloudUniqueId(Config.cloud_unique_id)
252-
.setDbId(dbId)
253+
.setDbId(streamingJob.getDbId())
253254
.setJobId(job.getJobId())
254255
.build();
255-
Cloud.DeleteStreamingJobResponse resp = MetaServiceProxy.getInstance().deleteStreamingJob(req);
256+
resp = MetaServiceProxy.getInstance().deleteStreamingJob(req);
256257
if (resp.getStatus().getCode() != Cloud.MetaServiceCode.OK) {
257-
throw new JobException("deleteJobKey failed for jobId={}, dbId={}, status={}",
258-
job.getJobId(), dbId, resp.getStatus());
258+
log.warn("failed to delete streaming job, response: {}", resp);
259+
throw new JobException("deleteJobKey failed for jobId=%s, dbId=%s, status=%s",
260+
job.getJobId(), job.getJobId(), resp.getStatus());
259261
}
260-
} catch (Exception e) {
261-
throw new JobException("deleteJobKey exception for jobId={}, dbName={}",
262-
job.getJobId(), job.getCurrentDbName(), e);
262+
} catch (RpcException e) {
263+
log.warn("failed to delete streaming job {}", resp, e);
263264
}
264265
}
265266

0 commit comments

Comments
 (0)