Skip to content

IoTV2: Make consensus event retry forever. #15565

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
May 28, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.iotdb.commons.client.container.PipeConsensusClientMgrContainer;
import org.apache.iotdb.commons.client.sync.SyncPipeConsensusServiceClient;
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeConnectorRetryTimesConfigurableException;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.connector.payload.pipeconsensus.response.PipeConsensusTransferFilePieceResp;
import org.apache.iotdb.commons.pipe.connector.protocol.IoTDBConnector;
Expand All @@ -53,7 +54,6 @@
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.rpc.TSStatusCode;

Expand Down Expand Up @@ -146,11 +146,11 @@ public void transfer(final TabletInsertionEvent tabletInsertionEvent) throws Exc
pipeConsensusConnectorMetrics.recordRetryWALTransferTimer(duration);
}
} catch (final Exception e) {
throw new PipeConnectionException(
throw new PipeRuntimeConnectorRetryTimesConfigurableException(
String.format(
"Failed to transfer tablet insertion event %s, because %s.",
tabletInsertionEvent, e.getMessage()),
e);
Integer.MAX_VALUE);
}
}

Expand All @@ -169,11 +169,11 @@ public void transfer(final TsFileInsertionEvent tsFileInsertionEvent) throws Exc
final long duration = System.nanoTime() - startTime;
pipeConsensusConnectorMetrics.recordRetryTsFileTransferTimer(duration);
} catch (Exception e) {
throw new PipeConnectionException(
throw new PipeRuntimeConnectorRetryTimesConfigurableException(
String.format(
"Failed to transfer tsfile insertion event %s, because %s.",
tsFileInsertionEvent, e.getMessage()),
e);
Integer.MAX_VALUE);
}
}

Expand Down Expand Up @@ -215,14 +215,14 @@ private void doTransfer() {

tabletBatchBuilder.onSuccess();
} catch (final Exception e) {
throw new PipeConnectionException(
throw new PipeRuntimeConnectorRetryTimesConfigurableException(
String.format(
PIPE_CONSENSUS_SYNC_CONNECTION_FAILED_FORMAT,
getFollowerUrl().getIp(),
getFollowerUrl().getPort(),
TABLET_BATCH_SCENARIO,
e.getMessage()),
e);
Integer.MAX_VALUE);
}
}

Expand Down Expand Up @@ -265,14 +265,14 @@ private void doTransfer(final PipeDeleteDataNodeEvent pipeDeleteDataNodeEvent)
progressIndex,
thisDataNodeId));
} catch (final Exception e) {
throw new PipeConnectionException(
throw new PipeRuntimeConnectorRetryTimesConfigurableException(
String.format(
PIPE_CONSENSUS_SYNC_CONNECTION_FAILED_FORMAT,
getFollowerUrl().getIp(),
getFollowerUrl().getPort(),
DELETION_SCENARIO,
e.getMessage()),
e);
Integer.MAX_VALUE);
}

final TSStatus status = resp.getStatus();
Expand Down Expand Up @@ -344,14 +344,14 @@ private void doTransfer(PipeInsertNodeTabletInsertionEvent pipeInsertNodeTabletI
thisDataNodeId));
}
} catch (final Exception e) {
throw new PipeConnectionException(
throw new PipeRuntimeConnectorRetryTimesConfigurableException(
String.format(
PIPE_CONSENSUS_SYNC_CONNECTION_FAILED_FORMAT,
getFollowerUrl().getIp(),
getFollowerUrl().getPort(),
TABLET_INSERTION_NODE_SCENARIO,
e.getMessage()),
e);
Integer.MAX_VALUE);
}

final TSStatus status = resp.getStatus();
Expand Down Expand Up @@ -418,14 +418,14 @@ private void doTransfer(final PipeTsFileInsertionEvent pipeTsFileInsertionEvent)
thisDataNodeId));
}
} catch (final Exception e) {
throw new PipeConnectionException(
throw new PipeRuntimeConnectorRetryTimesConfigurableException(
String.format(
PIPE_CONSENSUS_SYNC_CONNECTION_FAILED_FORMAT,
getFollowerUrl().getIp(),
getFollowerUrl().getPort(),
TSFILE_SCENARIO,
e.getMessage()),
e);
Integer.MAX_VALUE);
}

final TSStatus status = resp.getStatus();
Expand Down Expand Up @@ -483,10 +483,10 @@ protected void transferFilePieces(
tConsensusGroupId,
thisDataNodeId)));
} catch (Exception e) {
throw new PipeConnectionException(
throw new PipeRuntimeConnectorRetryTimesConfigurableException(
String.format(
"Network error when transfer file %s, because %s.", file, e.getMessage()),
e);
Integer.MAX_VALUE);
}

position += readLength;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@
* under the License.
*/

package org.apache.iotdb.pipe.api.exception;
package org.apache.iotdb.commons.exception.pipe;

public class PipeConsensusRetryWithIncreasingIntervalException extends PipeException {
public class PipeConsensusRetryWithIncreasingIntervalException
extends PipeRuntimeConnectorRetryTimesConfigurableException {

public PipeConsensusRetryWithIncreasingIntervalException(String message) {
super(message);
public PipeConsensusRetryWithIncreasingIntervalException(String message, int retryTimes) {
super(message, retryTimes);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.apache.iotdb.pipe.api.PipeConnector;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
import org.apache.iotdb.pipe.api.exception.PipeConsensusRetryWithIncreasingIntervalException;

import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
Expand Down Expand Up @@ -124,8 +123,7 @@ public synchronized void onFailure(final Throwable throwable) {
// Note that the PipeRuntimeConnectorCriticalException must be thrown here
// because the upper layer relies on this to stop all the related pipe tasks.
// Any other exception may cause the subtask to stop forever, after which it cannot be restarted.
if (throwable instanceof PipeRuntimeConnectorCriticalException
|| throwable instanceof PipeConsensusRetryWithIncreasingIntervalException) {
if (throwable instanceof PipeRuntimeConnectorCriticalException) {
super.onFailure(throwable);
} else {
// Print stack trace for better debugging
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@

package org.apache.iotdb.commons.pipe.agent.task.subtask;

import org.apache.iotdb.commons.exception.pipe.PipeConsensusRetryWithIncreasingIntervalException;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeConnectorRetryTimesConfigurableException;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.pipe.api.exception.PipeConsensusRetryWithIncreasingIntervalException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -56,6 +56,20 @@ public synchronized void onFailure(final Throwable throwable) {
// is dropped or the process is running normally.
}

/**
 * Chooses how long to sleep before the next retry, depending on the kind of failure.
 *
 * <p>For an ordinary failure the interval grows by one second per recorded retry and is capped
 * at ten seconds. For a {@link PipeConsensusRetryWithIncreasingIntervalException} (raised when
 * the receiver is read-only / internal-error / write-reject) the interval is the square of the
 * retry count in seconds, and becomes a flat twenty seconds once the retry count reaches five.
 *
 * @param throwable the failure that triggered the retry
 * @return the interval to sleep, in milliseconds
 */
private long getSleepIntervalBasedOnThrowable(final Throwable throwable) {
  if (!(throwable instanceof PipeConsensusRetryWithIncreasingIntervalException)) {
    // Linear back-off: one second per retry, never more than ten seconds.
    return Math.min(1000L * retryCount.get(), 10000);
  }
  // Quadratic back-off for the first retries, then a flat twenty seconds.
  return retryCount.get() >= 5
      ? 1000L * 20
      : 1000L * retryCount.get() * retryCount.get();
}

private void onEnrichedEventFailure(final Throwable throwable) {
final int maxRetryTimes =
throwable instanceof PipeRuntimeConnectorRetryTimesConfigurableException
Expand Down Expand Up @@ -85,7 +99,7 @@ private void onEnrichedEventFailure(final Throwable throwable) {
throwable.getMessage(),
throwable);
try {
Thread.sleep(Math.min(1000L * retryCount.get(), 10000));
Thread.sleep(getSleepIntervalBasedOnThrowable(throwable));
} catch (final InterruptedException e) {
LOGGER.warn(
"Interrupted when retrying to execute subtask {} (creation time: {}, simple class: {})",
Expand Down Expand Up @@ -152,17 +166,7 @@ private void onNonEnrichedEventFailure(final Throwable throwable) {
throwable.getMessage(),
throwable);
try {
long sleepInterval = Math.min(1000L * retryCount.get(), 10000);
// if receiver is read-only/internal-error/write-reject, connector will retry with
// power-increasing interval
if (throwable instanceof PipeConsensusRetryWithIncreasingIntervalException) {
if (retryCount.get() >= 5) {
sleepInterval = 1000L * 20;
} else {
sleepInterval = 1000L * retryCount.get() * retryCount.get();
}
}
Thread.sleep(sleepInterval);
Thread.sleep(getSleepIntervalBasedOnThrowable(throwable));
} catch (final InterruptedException e) {
LOGGER.warn(
"Interrupted when retrying to execute subtask {} (creation time: {}, simple class: {})",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@
package org.apache.iotdb.commons.pipe.receiver;

import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.exception.pipe.PipeConsensusRetryWithIncreasingIntervalException;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeConnectorCriticalException;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeConnectorRetryTimesConfigurableException;
import org.apache.iotdb.commons.pipe.agent.task.subtask.PipeSubtask;
import org.apache.iotdb.commons.utils.RetryUtils;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.exception.PipeConsensusRetryWithIncreasingIntervalException;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.rpc.TSStatusCode;

Expand Down Expand Up @@ -95,7 +95,8 @@ public void handle(

if (RetryUtils.needRetryForConsensus(status.getCode())) {
LOGGER.info("IoTConsensusV2: will retry with increasing interval. status: {}", status);
throw new PipeConsensusRetryWithIncreasingIntervalException(exceptionMessage);
throw new PipeConsensusRetryWithIncreasingIntervalException(
exceptionMessage, Integer.MAX_VALUE);
}

switch (status.getCode()) {
Expand Down
Loading