Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public class JdbcSinkOptions extends JdbcCommonOptions {
.withDescription("auto commit");

public static final Option<Integer> MAX_RETRIES =
Options.key("max_retries").intType().defaultValue(0).withDescription("max_retired");
Options.key("max_retries").intType().defaultValue(0).withDescription("max_retries");

public static final Option<String> XA_DATA_SOURCE_CLASS_NAME =
Options.key("xa_data_source_class_name")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

import java.sql.Connection;
import java.sql.SQLException;
Expand All @@ -35,6 +36,7 @@
import java.util.function.Function;

@RequiredArgsConstructor
@Slf4j
public class BufferReducedBatchStatementExecutor
implements JdbcBatchStatementExecutor<SeaTunnelRow> {
@NonNull private final JdbcBatchStatementExecutor<SeaTunnelRow> upsertExecutor;
Expand Down Expand Up @@ -100,6 +102,8 @@ public void closeStatements() throws SQLException {
if (!buffer.isEmpty()) {
executeBatch();
}
} catch (JdbcConnectorException e) {
log.error("Failed to execute remaining batch", e);
} finally {
upsertExecutor.closeStatements();
deleteExecutor.closeStatements();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@
package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor;

import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException;

import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

import java.sql.Connection;
import java.sql.SQLException;
Expand All @@ -29,6 +31,7 @@
import java.util.function.Function;

@RequiredArgsConstructor
@Slf4j
public class BufferedBatchStatementExecutor implements JdbcBatchStatementExecutor<SeaTunnelRow> {
@NonNull private final JdbcBatchStatementExecutor<SeaTunnelRow> statementExecutor;
@NonNull private final Function<SeaTunnelRow, SeaTunnelRow> valueTransform;
Expand Down Expand Up @@ -61,6 +64,8 @@ public void closeStatements() throws SQLException {
if (!buffer.isEmpty()) {
executeBatch();
}
} catch (JdbcConnectorException e) {
log.error("Failed to execute remaining batch", e);
} finally {
statementExecutor.closeStatements();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

import javax.annotation.Nullable;

Expand All @@ -36,6 +37,7 @@
import java.util.function.Function;

@RequiredArgsConstructor
@Slf4j
public class InsertOrUpdateBatchStatementExecutor
implements JdbcBatchStatementExecutor<SeaTunnelRow> {
private final StatementFactory existStmtFactory;
Expand Down Expand Up @@ -121,6 +123,8 @@ public void closeStatements() throws SQLException {
if (!submitted) {
executeBatch();
}
} catch (JdbcConnectorException e) {
log.error("Failed to execute remaining batch", e);
} finally {
for (PreparedStatement statement :
Arrays.asList(existStatement, insertStatement, updateStatement)) {
Expand Down