Skip to content

Commit efa2f77

Browse files
authored
IoTV2: Make consensus event retry forever. (#15565)
1 parent 4342166 commit efa2f77

File tree

5 files changed

+41
-37
lines changed

5 files changed

+41
-37
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusSyncConnector.java

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.iotdb.commons.client.container.PipeConsensusClientMgrContainer;
2828
import org.apache.iotdb.commons.client.sync.SyncPipeConsensusServiceClient;
2929
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
30+
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeConnectorRetryTimesConfigurableException;
3031
import org.apache.iotdb.commons.pipe.config.PipeConfig;
3132
import org.apache.iotdb.commons.pipe.connector.payload.pipeconsensus.response.PipeConsensusTransferFilePieceResp;
3233
import org.apache.iotdb.commons.pipe.connector.protocol.IoTDBConnector;
@@ -53,7 +54,6 @@
5354
import org.apache.iotdb.pipe.api.event.Event;
5455
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
5556
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
56-
import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
5757
import org.apache.iotdb.pipe.api.exception.PipeException;
5858
import org.apache.iotdb.rpc.TSStatusCode;
5959

@@ -146,11 +146,11 @@ public void transfer(final TabletInsertionEvent tabletInsertionEvent) throws Exc
146146
pipeConsensusConnectorMetrics.recordRetryWALTransferTimer(duration);
147147
}
148148
} catch (final Exception e) {
149-
throw new PipeConnectionException(
149+
throw new PipeRuntimeConnectorRetryTimesConfigurableException(
150150
String.format(
151151
"Failed to transfer tablet insertion event %s, because %s.",
152152
tabletInsertionEvent, e.getMessage()),
153-
e);
153+
Integer.MAX_VALUE);
154154
}
155155
}
156156

@@ -169,11 +169,11 @@ public void transfer(final TsFileInsertionEvent tsFileInsertionEvent) throws Exc
169169
final long duration = System.nanoTime() - startTime;
170170
pipeConsensusConnectorMetrics.recordRetryTsFileTransferTimer(duration);
171171
} catch (Exception e) {
172-
throw new PipeConnectionException(
172+
throw new PipeRuntimeConnectorRetryTimesConfigurableException(
173173
String.format(
174174
"Failed to transfer tsfile insertion event %s, because %s.",
175175
tsFileInsertionEvent, e.getMessage()),
176-
e);
176+
Integer.MAX_VALUE);
177177
}
178178
}
179179

@@ -215,14 +215,14 @@ private void doTransfer() {
215215

216216
tabletBatchBuilder.onSuccess();
217217
} catch (final Exception e) {
218-
throw new PipeConnectionException(
218+
throw new PipeRuntimeConnectorRetryTimesConfigurableException(
219219
String.format(
220220
PIPE_CONSENSUS_SYNC_CONNECTION_FAILED_FORMAT,
221221
getFollowerUrl().getIp(),
222222
getFollowerUrl().getPort(),
223223
TABLET_BATCH_SCENARIO,
224224
e.getMessage()),
225-
e);
225+
Integer.MAX_VALUE);
226226
}
227227
}
228228

@@ -265,14 +265,14 @@ private void doTransfer(final PipeDeleteDataNodeEvent pipeDeleteDataNodeEvent)
265265
progressIndex,
266266
thisDataNodeId));
267267
} catch (final Exception e) {
268-
throw new PipeConnectionException(
268+
throw new PipeRuntimeConnectorRetryTimesConfigurableException(
269269
String.format(
270270
PIPE_CONSENSUS_SYNC_CONNECTION_FAILED_FORMAT,
271271
getFollowerUrl().getIp(),
272272
getFollowerUrl().getPort(),
273273
DELETION_SCENARIO,
274274
e.getMessage()),
275-
e);
275+
Integer.MAX_VALUE);
276276
}
277277

278278
final TSStatus status = resp.getStatus();
@@ -344,14 +344,14 @@ private void doTransfer(PipeInsertNodeTabletInsertionEvent pipeInsertNodeTabletI
344344
thisDataNodeId));
345345
}
346346
} catch (final Exception e) {
347-
throw new PipeConnectionException(
347+
throw new PipeRuntimeConnectorRetryTimesConfigurableException(
348348
String.format(
349349
PIPE_CONSENSUS_SYNC_CONNECTION_FAILED_FORMAT,
350350
getFollowerUrl().getIp(),
351351
getFollowerUrl().getPort(),
352352
TABLET_INSERTION_NODE_SCENARIO,
353353
e.getMessage()),
354-
e);
354+
Integer.MAX_VALUE);
355355
}
356356

357357
final TSStatus status = resp.getStatus();
@@ -418,14 +418,14 @@ private void doTransfer(final PipeTsFileInsertionEvent pipeTsFileInsertionEvent)
418418
thisDataNodeId));
419419
}
420420
} catch (final Exception e) {
421-
throw new PipeConnectionException(
421+
throw new PipeRuntimeConnectorRetryTimesConfigurableException(
422422
String.format(
423423
PIPE_CONSENSUS_SYNC_CONNECTION_FAILED_FORMAT,
424424
getFollowerUrl().getIp(),
425425
getFollowerUrl().getPort(),
426426
TSFILE_SCENARIO,
427427
e.getMessage()),
428-
e);
428+
Integer.MAX_VALUE);
429429
}
430430

431431
final TSStatus status = resp.getStatus();
@@ -483,10 +483,10 @@ protected void transferFilePieces(
483483
tConsensusGroupId,
484484
thisDataNodeId)));
485485
} catch (Exception e) {
486-
throw new PipeConnectionException(
486+
throw new PipeRuntimeConnectorRetryTimesConfigurableException(
487487
String.format(
488488
"Network error when transfer file %s, because %s.", file, e.getMessage()),
489-
e);
489+
Integer.MAX_VALUE);
490490
}
491491

492492
position += readLength;
Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,12 @@
1717
* under the License.
1818
*/
1919

20-
package org.apache.iotdb.pipe.api.exception;
20+
package org.apache.iotdb.commons.exception.pipe;
2121

22-
public class PipeConsensusRetryWithIncreasingIntervalException extends PipeException {
22+
public class PipeConsensusRetryWithIncreasingIntervalException
23+
extends PipeRuntimeConnectorRetryTimesConfigurableException {
2324

24-
public PipeConsensusRetryWithIncreasingIntervalException(String message) {
25-
super(message);
25+
public PipeConsensusRetryWithIncreasingIntervalException(String message, int retryTimes) {
26+
super(message, retryTimes);
2627
}
2728
}

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractConnectorSubtask.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import org.apache.iotdb.pipe.api.PipeConnector;
2727
import org.apache.iotdb.pipe.api.event.Event;
2828
import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
29-
import org.apache.iotdb.pipe.api.exception.PipeConsensusRetryWithIncreasingIntervalException;
3029

3130
import com.google.common.util.concurrent.Futures;
3231
import com.google.common.util.concurrent.ListenableFuture;
@@ -124,8 +123,7 @@ public synchronized void onFailure(final Throwable throwable) {
124123
// Notice that the PipeRuntimeConnectorCriticalException must be thrown here
125124
// because the upper layer relies on this to stop all the related pipe tasks
126125
// Other exceptions may cause the subtask to stop forever and can not be restarted
127-
if (throwable instanceof PipeRuntimeConnectorCriticalException
128-
|| throwable instanceof PipeConsensusRetryWithIncreasingIntervalException) {
126+
if (throwable instanceof PipeRuntimeConnectorCriticalException) {
129127
super.onFailure(throwable);
130128
} else {
131129
// Print stack trace for better debugging

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeReportableSubtask.java

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,11 @@
1919

2020
package org.apache.iotdb.commons.pipe.agent.task.subtask;
2121

22+
import org.apache.iotdb.commons.exception.pipe.PipeConsensusRetryWithIncreasingIntervalException;
2223
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeConnectorRetryTimesConfigurableException;
2324
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException;
2425
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
2526
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
26-
import org.apache.iotdb.pipe.api.exception.PipeConsensusRetryWithIncreasingIntervalException;
2727

2828
import org.slf4j.Logger;
2929
import org.slf4j.LoggerFactory;
@@ -56,6 +56,20 @@ public synchronized void onFailure(final Throwable throwable) {
5656
// is dropped or the process is running normally.
5757
}
5858

59+
private long getSleepIntervalBasedOnThrowable(final Throwable throwable) { // returns the time to sleep, in ms, before the next retry
60+
long sleepInterval = Math.min(1000L * retryCount.get(), 10000); // default: grows by 1s per retry, capped at 10s
61+
// if receiver is read-only/internal-error/write-reject, connector will retry with a
62+
// quadratically increasing interval: 1s * retryCount^2, then a fixed 20s once retryCount >= 5
63+
if (throwable instanceof PipeConsensusRetryWithIncreasingIntervalException) {
64+
if (retryCount.get() >= 5) {
65+
sleepInterval = 1000L * 20;
66+
} else {
67+
sleepInterval = 1000L * retryCount.get() * retryCount.get();
68+
}
69+
}
70+
return sleepInterval;
71+
}
72+
5973
private void onEnrichedEventFailure(final Throwable throwable) {
6074
final int maxRetryTimes =
6175
throwable instanceof PipeRuntimeConnectorRetryTimesConfigurableException
@@ -85,7 +99,7 @@ private void onEnrichedEventFailure(final Throwable throwable) {
8599
throwable.getMessage(),
86100
throwable);
87101
try {
88-
Thread.sleep(Math.min(1000L * retryCount.get(), 10000));
102+
Thread.sleep(getSleepIntervalBasedOnThrowable(throwable));
89103
} catch (final InterruptedException e) {
90104
LOGGER.warn(
91105
"Interrupted when retrying to execute subtask {} (creation time: {}, simple class: {})",
@@ -152,17 +166,7 @@ private void onNonEnrichedEventFailure(final Throwable throwable) {
152166
throwable.getMessage(),
153167
throwable);
154168
try {
155-
long sleepInterval = Math.min(1000L * retryCount.get(), 10000);
156-
// if receiver is read-only/internal-error/write-reject, connector will retry will
157-
// power-increasing interval
158-
if (throwable instanceof PipeConsensusRetryWithIncreasingIntervalException) {
159-
if (retryCount.get() >= 5) {
160-
sleepInterval = 1000L * 20;
161-
} else {
162-
sleepInterval = 1000L * retryCount.get() * retryCount.get();
163-
}
164-
}
165-
Thread.sleep(sleepInterval);
169+
Thread.sleep(getSleepIntervalBasedOnThrowable(throwable));
166170
} catch (final InterruptedException e) {
167171
LOGGER.warn(
168172
"Interrupted when retrying to execute subtask {} (creation time: {}, simple class: {})",

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,12 @@
2020
package org.apache.iotdb.commons.pipe.receiver;
2121

2222
import org.apache.iotdb.common.rpc.thrift.TSStatus;
23+
import org.apache.iotdb.commons.exception.pipe.PipeConsensusRetryWithIncreasingIntervalException;
2324
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeConnectorCriticalException;
2425
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeConnectorRetryTimesConfigurableException;
2526
import org.apache.iotdb.commons.pipe.agent.task.subtask.PipeSubtask;
2627
import org.apache.iotdb.commons.utils.RetryUtils;
2728
import org.apache.iotdb.pipe.api.event.Event;
28-
import org.apache.iotdb.pipe.api.exception.PipeConsensusRetryWithIncreasingIntervalException;
2929
import org.apache.iotdb.pipe.api.exception.PipeException;
3030
import org.apache.iotdb.rpc.TSStatusCode;
3131

@@ -98,7 +98,8 @@ public void handle(
9898

9999
if (RetryUtils.needRetryForConsensus(status.getCode())) {
100100
LOGGER.info("IoTConsensusV2: will retry with increasing interval. status: {}", status);
101-
throw new PipeConsensusRetryWithIncreasingIntervalException(exceptionMessage);
101+
throw new PipeConsensusRetryWithIncreasingIntervalException(
102+
exceptionMessage, Integer.MAX_VALUE);
102103
}
103104

104105
switch (status.getCode()) {

0 commit comments

Comments
 (0)