discoverDataCollectionSchemas(JdbcSourceConfig sourceConfig);
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/DebeziumSourceFunction.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/DebeziumSourceFunction.java
index bdc985b715d..57c16359495 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/DebeziumSourceFunction.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/DebeziumSourceFunction.java
@@ -17,6 +17,13 @@
package org.apache.flink.cdc.debezium;
+import io.debezium.document.DocumentReader;
+import io.debezium.document.DocumentWriter;
+import io.debezium.embedded.Connect;
+import io.debezium.engine.DebeziumEngine;
+import io.debezium.engine.spi.OffsetCommitPolicy;
+import io.debezium.heartbeat.Heartbeat;
+import org.apache.commons.collections.map.LinkedMap;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
@@ -27,50 +34,28 @@
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.cdc.common.annotation.PublicEvolving;
import org.apache.flink.cdc.common.annotation.VisibleForTesting;
-import org.apache.flink.cdc.debezium.internal.DebeziumChangeConsumer;
-import org.apache.flink.cdc.debezium.internal.DebeziumChangeFetcher;
-import org.apache.flink.cdc.debezium.internal.DebeziumOffset;
-import org.apache.flink.cdc.debezium.internal.DebeziumOffsetSerializer;
-import org.apache.flink.cdc.debezium.internal.FlinkDatabaseHistory;
-import org.apache.flink.cdc.debezium.internal.FlinkDatabaseSchemaHistory;
-import org.apache.flink.cdc.debezium.internal.FlinkOffsetBackingStore;
-import org.apache.flink.cdc.debezium.internal.Handover;
-import org.apache.flink.cdc.debezium.internal.SchemaRecord;
+import org.apache.flink.cdc.debezium.internal.*;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.groups.OperatorMetricGroup;
import org.apache.flink.runtime.metrics.MetricNames;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.shaded.guava31.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
-
-import org.apache.flink.shaded.guava31.com.google.common.util.concurrent.ThreadFactoryBuilder;
-
-import io.debezium.document.DocumentReader;
-import io.debezium.document.DocumentWriter;
-import io.debezium.embedded.Connect;
-import io.debezium.engine.DebeziumEngine;
-import io.debezium.engine.spi.OffsetCommitPolicy;
-import io.debezium.heartbeat.Heartbeat;
-import org.apache.commons.collections.map.LinkedMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
-
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Properties;
import java.util.UUID;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
import static org.apache.flink.cdc.debezium.internal.Handover.ClosedException.isGentlyClosedException;
import static org.apache.flink.cdc.debezium.utils.DatabaseHistoryUtil.registerHistory;
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/io/debezium/connector/oracle/logminer/LogMinerAdapter.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/io/debezium/connector/oracle/logminer/LogMinerAdapter.java
index 481051e8a58..b24c0da9b9d 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/io/debezium/connector/oracle/logminer/LogMinerAdapter.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/io/debezium/connector/oracle/logminer/LogMinerAdapter.java
@@ -63,6 +63,7 @@
*
* Line 356: Replace < condition with <= to be able to catch ongoing transactions during snapshot
* if current SCN points to START/INSERT/DELETE/UPDATE event.
+ * 如果当前 SCN 指向 START/INSERT/DELETE/UPDATE 事件,则将 < 条件替换为 <=,以便能够在快照期间捕获正在进行的事务。
*/
public class LogMinerAdapter extends AbstractStreamingAdapter {
@@ -98,6 +99,18 @@ public OffsetContext.Loader getOffsetContextLoader() {
return new LogMinerOracleOffsetContextLoader(connectorConfig);
}
+ /**
+ * 返回从数据库日志(此处为 Oracle 的 redo log,经由 LogMiner 读取)发出变更事件的变更事件源
+ * @param connection
+ * @param dispatcher
+ * @param errorHandler
+ * @param clock
+ * @param schema
+ * @param taskContext
+ * @param jdbcConfig
+ * @param streamingMetrics
+ * @return
+ */
@Override
public StreamingChangeEventSource getSource(
OracleConnection connection,
@@ -119,6 +132,14 @@ public StreamingChangeEventSource getSourc
streamingMetrics);
}
+ /**
+ * 获取快照的偏移量
+ * @param ctx the relational snapshot context, should never be {@code null}
+ * @param connectorConfig the connector configuration, should never be {@code null}
+ * @param connection the database connection, should never be {@code null}
+ * @return
+ * @throws SQLException
+ */
@Override
public OracleOffsetContext determineSnapshotOffset(
RelationalSnapshotContext ctx,
@@ -161,6 +182,13 @@ public OracleOffsetContext determineSnapshotOffset(
}
}
+ /**
+ * 获取当前的 SCN(System Change Number)。SCN 是标记数据库在某一时间点已提交版本的时间戳。
+ * @param latestTableDdlScn
+ * @param connection
+ * @return
+ * @throws SQLException
+ */
private Optional getCurrentScn(Scn latestTableDdlScn, OracleConnection connection)
throws SQLException {
final String query = "SELECT CURRENT_SCN FROM V$DATABASE";
@@ -175,6 +203,15 @@ private Optional getCurrentScn(Scn latestTableDdlScn, OracleConnection conn
return Optional.ofNullable(currentScn);
}
+ /**
+ * 获取等待的事务
+ * @param latestTableDdlScn
+ * @param connection
+ * @param transactions
+ * @param transactionTableName
+ * @return
+ * @throws SQLException
+ */
private Optional getPendingTransactions(
Scn latestTableDdlScn,
OracleConnection connection,
@@ -285,6 +322,12 @@ private OracleOffsetContext determineSnapshotOffset(
.build();
}
+ /**
+ * 将logs添加到session中
+ * @param logs
+ * @param connection
+ * @throws SQLException
+ */
private void addLogsToSession(List logs, OracleConnection connection)
throws SQLException {
for (LogFile logFile : logs) {
@@ -294,6 +337,11 @@ private void addLogsToSession(List logs, OracleConnection connection)
}
}
+ /**
+ * 启动一个 LogMiner 挖掘会话
+ * @param connection
+ * @throws SQLException
+ */
private void startSession(OracleConnection connection) throws SQLException {
// We explicitly use the ONLINE data dictionary mode here.
// Since we are only concerned about non-SQL columns, it is safe to always use this mode
@@ -319,6 +367,9 @@ private void stopSession(OracleConnection connection) throws SQLException {
}
}
+ /**
+ *
+ */
private Scn getOldestScnAvailableInLogs(
OracleConnectorConfig config, OracleConnection connection) throws SQLException {
final Duration archiveLogRetention = config.getLogMiningArchiveLogRetention();
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/io/debezium/connector/oracle/logminer/LogMinerStreamingChangeEventSource.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/io/debezium/connector/oracle/logminer/LogMinerStreamingChangeEventSource.java
index 5993cadee71..ea784e87878 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/io/debezium/connector/oracle/logminer/LogMinerStreamingChangeEventSource.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/io/debezium/connector/oracle/logminer/LogMinerStreamingChangeEventSource.java
@@ -69,6 +69,7 @@
*
* Diff: Make createProcessor method as protected to produce a LogMinerEventProcessor with
* enhanced processRow method to distinguish whether is bounded.
+ *
*/
public class LogMinerStreamingChangeEventSource
implements StreamingChangeEventSource {
@@ -131,7 +132,7 @@ public LogMinerStreamingChangeEventSource(
/**
* This is the loop to get changes from LogMiner.
- *
+ * 从logminer获取日志数据
* @param context change event source context
*/
@Override
@@ -144,7 +145,7 @@ public void execute(
return;
}
try {
- // We explicitly expect auto-commit to be disabled
+ // We explicitly expect auto-commit to be disabled 取消数据库事务的自动提交
jdbcConnection.setAutoCommit(false);
startScn = offsetContext.getScn();
@@ -617,7 +618,7 @@ private void pauseBetweenMiningSessions() throws InterruptedException {
/**
* Sets the NLS parameters for the mining session.
- *
+ * 设置 NLS 参数
* @param connection database connection, should not be {@code null}
* @throws SQLException if a database exception occurred
*/
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/OracleSource.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/OracleSource.java
index cb56a5165f0..5db229b4884 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/OracleSource.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/OracleSource.java
@@ -31,16 +31,18 @@
/**
* A builder to build a SourceFunction which can read snapshot and continue to consume log miner.
+ *
*/
public class OracleSource {
+
private static final String DATABASE_SERVER_NAME = "oracle_logminer";
public static Builder builder() {
return new Builder<>();
}
- /** Builder class of {@link OracleSource}. */
+ /** Builder class of {@link OracleSource}. 构建类 */
public static class Builder {
private Integer port = 1521; // default 1521 port
@@ -136,6 +138,7 @@ public Builder startupOptions(StartupOptions startupOptions) {
return this;
}
+ // 构建读取 Oracle 日志数据的源(SourceFunction)
public DebeziumSourceFunction build() {
Properties props = new Properties();
props.setProperty("connector.class", OracleConnector.class.getCanonicalName());
@@ -195,6 +198,7 @@ public DebeziumSourceFunction build() {
}
return new DebeziumSourceFunction<>(
+ //对于传入的数据库配置参数进行校验
deserializer, props, null, new OracleValidator(props));
}
}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/OracleValidator.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/OracleValidator.java
index 05c3bee1b4b..4ddbc8250e7 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/OracleValidator.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/OracleValidator.java
@@ -30,7 +30,7 @@
import java.util.List;
import java.util.Properties;
-/** Validates the version of the database connecting to. */
+/** Validates the version of the database connecting to.验证Oracle数据库的版本 */
public class OracleValidator implements Validator {
private static final long serialVersionUID = 1L;
@@ -43,6 +43,7 @@ public OracleValidator(Properties properties) {
this.properties = properties;
}
+ //判断数据库是否满足处理数据的版本要求
@Override
public void validate() {
try (Connection connection = openConnection(properties)) {
@@ -62,11 +63,13 @@ public void validate() {
}
}
+ //建立数据库连接
public static Connection openConnection(Properties properties) throws SQLException {
DriverManager.registerDriver(new oracle.jdbc.OracleDriver());
String url = OracleJdbcUrlUtils.getConnectionUrlWithSid(properties);
String userName = properties.getProperty("database.user");
String userpwd = properties.getProperty("database.password");
+ System.out.println("========数据库连接===");
return DriverManager.getConnection(url, userName, userpwd);
}
}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/table/OracleTableSource.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/table/OracleTableSource.java
index 5903ce41e9f..d3230d8a50f 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/table/OracleTableSource.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/table/OracleTableSource.java
@@ -53,6 +53,7 @@
/**
* A {@link DynamicTableSource} that describes how to create a Oracle redo log from a logical
* description.
+ * 描述如何从逻辑描述构建 Oracle redo log 数据源
*/
public class OracleTableSource implements ScanTableSource, SupportsReadingMetadata {
@@ -91,6 +92,33 @@ public class OracleTableSource implements ScanTableSource, SupportsReadingMetada
/** Metadata that is appended at the end of a physical source row. */
protected List metadataKeys;
+ /**
+ * 构造函数
+ * @param physicalSchema
+ * @param url
+ * @param port
+ * @param hostname
+ * @param database
+ * @param tableName
+ * @param schemaName
+ * @param username
+ * @param password
+ * @param dbzProperties
+ * @param startupOptions
+ * @param enableParallelRead
+ * @param splitSize
+ * @param splitMetaGroupSize
+ * @param fetchSize
+ * @param connectTimeout
+ * @param connectMaxRetries
+ * @param connectionPoolSize
+ * @param distributionFactorUpper
+ * @param distributionFactorLower
+ * @param chunkKeyColumn
+ * @param closeIdleReaders
+ * @param skipSnapshotBackfill
+ * @param scanNewlyAddedTableEnabled
+ */
public OracleTableSource(
ResolvedSchema physicalSchema,
@Nullable String url,
@@ -144,6 +172,9 @@ public OracleTableSource(
this.scanNewlyAddedTableEnabled = scanNewlyAddedTableEnabled;
}
+ /**
+ * 获取日志变更模式
+ */
@Override
public ChangelogMode getChangelogMode() {
return ChangelogMode.all();
diff --git a/pom.xml b/pom.xml
index a3b6b0c4b84..f1658875f16 100644
--- a/pom.xml
+++ b/pom.xml
@@ -686,6 +686,16 @@ limitations under the License.
${maven.compiler.target}
+
+
+ org.apache.maven.plugins
+ maven-gpg-plugin
+ 1.6
+
+ true
+
+
+