From 1fc0449faf5a067047167be9f4eee02ee4d12049 Mon Sep 17 00:00:00 2001
From: Hang Ruan
Date: Fri, 17 Jan 2025 11:09:58 +0800
Subject: [PATCH 1/3] [hotfix][build] Add release-3.3 docs

---
 docs/config.toml | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/docs/config.toml b/docs/config.toml
index e3f39d30d17..40ebbf0c357 100644
--- a/docs/config.toml
+++ b/docs/config.toml
@@ -14,7 +14,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.

-baseURL = '//nightlies.apache.org/flink/flink-cdc-docs-master'
+baseURL = '//nightlies.apache.org/flink/flink-cdc-docs-release-3.3'
 languageCode = 'en-us'
 title = 'Apache Flink CDC'
 enableGitInfo = false
@@ -24,7 +24,7 @@ pygmentsUseClasses = true
 [params]
   # Flag whether this is a stable version or not.
   # Used for the quickstart page.
-  IsStable = false
+  IsStable = true

   # Flag to indicate whether an outdated warning should be shown.
   ShowOutDatedWarning = false
@@ -34,14 +34,14 @@ pygmentsUseClasses = true
   # where we change the version for the complete docs when forking of a release
   # branch etc.
   # The full version string as referenced in Maven (e.g. 1.2.1)
-  Version = "3.3-SNAPSHOT"
+  Version = "3.3.0"

   # For stable releases, leave the bugfix version out (e.g. 1.2). For snapshot
   # release this should be the same as the regular version
-  VersionTitle = "3.3-SNAPSHOT"
+  VersionTitle = "3.3"

   # The branch for this version of Apache Flink CDC
-  Branch = "master"
+  Branch = "release-3.3"

   # The GitHub repository for Apache Flink CDC
   Repo = "//github.com/apache/flink-cdc"
@@ -54,7 +54,7 @@ pygmentsUseClasses = true
   # of the menu
   MenuLinks = [
     ["Project Homepage", "//flink.apache.org"],
-    ["JavaDocs", "//nightlies.apache.org/flink/flink-cdc-docs-master/api/java/"],
+    ["JavaDocs", "//nightlies.apache.org/flink/flink-cdc-docs-release-3.3/api/java/"],
   ]

   PreviousDocs = [

From cb9eba616a38ea6a4d3e886eca78d8275e548b43 Mon Sep 17 00:00:00 2001
From: Hang Ruan
Date: Wed, 22 Jan 2025 17:53:14 +0800
Subject: [PATCH 2/3] [hotfix][docs] Fix dead links in Doris (#3878)

---
 docs/content.zh/docs/connectors/pipeline-connectors/doris.md | 4 ++--
 docs/content/docs/connectors/pipeline-connectors/doris.md | 4 ++--
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/docs/content.zh/docs/connectors/pipeline-connectors/doris.md b/docs/content.zh/docs/connectors/pipeline-connectors/doris.md
index 307abc13817..4c442d814ee 100644
--- a/docs/content.zh/docs/connectors/pipeline-connectors/doris.md
+++ b/docs/content.zh/docs/connectors/pipeline-connectors/doris.md
@@ -169,7 +169,7 @@ pipeline:
       String
       StreamLoad的参数。 For example: sink.properties.strict_mode: true.
-      查看更多关于 StreamLoad 的属性
+      查看更多关于 StreamLoad 的属性
@@ -179,7 +179,7 @@
       String
       创建表的Properties配置。 For example: table.create.properties.replication_num: 1.
-      查看更多关于 Doris Table 的属性
+      查看更多关于 Doris Table 的属性

diff --git a/docs/content/docs/connectors/pipeline-connectors/doris.md b/docs/content/docs/connectors/pipeline-connectors/doris.md
index cee412c16e8..ffa769da46f 100644
--- a/docs/content/docs/connectors/pipeline-connectors/doris.md
+++ b/docs/content/docs/connectors/pipeline-connectors/doris.md
@@ -169,7 +169,7 @@ pipeline:
       String
       Parameters of StreamLoad. For example: sink.properties.strict_mode: true.
-      See more about StreamLoad Properties
+      See more about StreamLoad Properties
@@ -179,7 +179,7 @@
       String
       Create the Properties configuration of the table. For example: table.create.properties.replication_num: 1.
-      See more about Doris Table Properties
+      See more about Doris Table Properties

From 8664b9793b56e716e96b176e7c8264fa6008404f Mon Sep 17 00:00:00 2001
From: hashmapybx <15868861416@163.com>
Date: Thu, 23 Jan 2025 15:45:33 +0800
Subject: [PATCH 3/3] Fix mvn package error by skipping the maven-gpg-plugin

---
 .../base/dialect/JdbcDataSourceDialect.java | 4 +-
 .../cdc/debezium/DebeziumSourceFunction.java | 35 ++++---------
 .../oracle/logminer/LogMinerAdapter.java | 51 +++++++++++++++++++
 .../LogMinerStreamingChangeEventSource.java | 7 +--
 .../cdc/connectors/oracle/OracleSource.java | 6 ++-
 .../connectors/oracle/OracleValidator.java | 5 +-
 .../oracle/table/OracleTableSource.java | 31 +++++++++++
 pom.xml | 10 ++++
 8 files changed, 117 insertions(+), 32 deletions(-)

diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/dialect/JdbcDataSourceDialect.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/dialect/JdbcDataSourceDialect.java
index 136e73ddab4..4e83e74dff7 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/dialect/JdbcDataSourceDialect.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/dialect/JdbcDataSourceDialect.java
@@ -37,11 +37,11 @@
 @Experimental
 public interface JdbcDataSourceDialect extends DataSourceDialect<JdbcSourceConfig> {

-    /** Discovers the list of table to capture. */
+    /** Discovers the list of tables to capture. */
     @Override
     List<TableId> discoverDataCollections(JdbcSourceConfig sourceConfig);

-    /** Discovers the captured tables' schema by {@link SourceConfig}. */
+    /** Discovers the captured tables' schema information by {@link SourceConfig}. */
     @Override
     Map<TableId, TableChanges.TableChange> discoverDataCollectionSchemas(JdbcSourceConfig sourceConfig);

diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/DebeziumSourceFunction.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/DebeziumSourceFunction.java
index bdc985b715d..57c16359495 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/DebeziumSourceFunction.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/DebeziumSourceFunction.java
@@ -17,6 +17,13 @@

 package org.apache.flink.cdc.debezium;

+import io.debezium.document.DocumentReader;
+import io.debezium.document.DocumentWriter;
+import io.debezium.embedded.Connect;
+import io.debezium.engine.DebeziumEngine;
+import io.debezium.engine.spi.OffsetCommitPolicy;
+import io.debezium.heartbeat.Heartbeat;
+import org.apache.commons.collections.map.LinkedMap;
 import org.apache.flink.api.common.state.CheckpointListener;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
@@ -27,50 +34,28 @@
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.cdc.common.annotation.PublicEvolving;
 import org.apache.flink.cdc.common.annotation.VisibleForTesting;
-import org.apache.flink.cdc.debezium.internal.DebeziumChangeConsumer;
-import org.apache.flink.cdc.debezium.internal.DebeziumChangeFetcher;
-import org.apache.flink.cdc.debezium.internal.DebeziumOffset;
-import org.apache.flink.cdc.debezium.internal.DebeziumOffsetSerializer;
-import org.apache.flink.cdc.debezium.internal.FlinkDatabaseHistory;
-import org.apache.flink.cdc.debezium.internal.FlinkDatabaseSchemaHistory;
-import org.apache.flink.cdc.debezium.internal.FlinkOffsetBackingStore;
-import org.apache.flink.cdc.debezium.internal.Handover;
-import org.apache.flink.cdc.debezium.internal.SchemaRecord;
+import org.apache.flink.cdc.debezium.internal.*;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.groups.OperatorMetricGroup;
 import org.apache.flink.runtime.metrics.MetricNames;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.shaded.guava31.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkRuntimeException;
-
-import org.apache.flink.shaded.guava31.com.google.common.util.concurrent.ThreadFactoryBuilder;
-
-import io.debezium.document.DocumentReader;
-import io.debezium.document.DocumentWriter;
-import io.debezium.embedded.Connect;
-import io.debezium.engine.DebeziumEngine;
-import io.debezium.engine.spi.OffsetCommitPolicy;
-import io.debezium.heartbeat.Heartbeat;
-import org.apache.commons.collections.map.LinkedMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

 import javax.annotation.Nullable;
-
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.Collection;
 import java.util.Properties;
 import java.util.UUID;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;

 import static org.apache.flink.cdc.debezium.internal.Handover.ClosedException.isGentlyClosedException;
 import static org.apache.flink.cdc.debezium.utils.DatabaseHistoryUtil.registerHistory;

diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/io/debezium/connector/oracle/logminer/LogMinerAdapter.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/io/debezium/connector/oracle/logminer/LogMinerAdapter.java
index 481051e8a58..b24c0da9b9d 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/io/debezium/connector/oracle/logminer/LogMinerAdapter.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/io/debezium/connector/oracle/logminer/LogMinerAdapter.java
@@ -63,6 +63,7 @@
 *
 *
 * Line 356: Replace < condition with <= to be able to catch ongoing transactions during snapshot
 * if current SCN points to START/INSERT/DELETE/UPDATE event.
+ * If the current SCN points to a START/INSERT/DELETE/UPDATE event, replacing the < condition
+ * with <= makes it possible to capture transactions that are still in progress during the snapshot.
 */
 public class LogMinerAdapter extends AbstractStreamingAdapter {
@@ -98,6 +99,18 @@ public OffsetContext.Loader<OracleOffsetContext> getOffsetContextLoader() {
         return new LogMinerOracleOffsetContextLoader(connectorConfig);
     }

+    /**
+     * A change event source that emits events from the database log (for example, MySQL's binlog or a similar log).
+     * @param connection
+     * @param dispatcher
+     * @param errorHandler
+     * @param clock
+     * @param schema
+     * @param taskContext
+     * @param jdbcConfig
+     * @param streamingMetrics
+     * @return
+     */
     @Override
     public StreamingChangeEventSource<OraclePartition, OracleOffsetContext> getSource(
             OracleConnection connection,
@@ -119,6 +132,14 @@ public StreamingChangeEventSource<OraclePartition, OracleOffsetContext> getSourc
                 streamingMetrics);
     }

+    /**
+     * Determines the offset for the snapshot.
+     * @param ctx the relational snapshot context, should never be {@code null}
+     * @param connectorConfig the connector configuration, should never be {@code null}
+     * @param connection the database connection, should never be {@code null}
+     * @return
+     * @throws SQLException
+     */
     @Override
     public OracleOffsetContext determineSnapshotOffset(
             RelationalSnapshotContext<OraclePartition, OracleOffsetContext> ctx,
@@ -161,6 +182,13 @@ public OracleOffsetContext determineSnapshotOffset(
         }
     }

+    /**
+     * Gets the current SCN (System Change Number), the marker that identifies the committed
+     * version of the database at a given point in time.
+     * @param latestTableDdlScn
+     * @param connection
+     * @return
+     * @throws SQLException
+     */
     private Optional<Scn> getCurrentScn(Scn latestTableDdlScn, OracleConnection connection)
             throws SQLException {
         final String query = "SELECT CURRENT_SCN FROM V$DATABASE";
@@ -175,6 +203,15 @@ private Optional<Scn> getCurrentScn(Scn latestTableDdlScn, OracleConnection conn
         return Optional.ofNullable(currentScn);
     }

+    /**
+     * Gets the pending transactions.
+     * @param latestTableDdlScn
+     * @param connection
+     * @param transactions
+     * @param transactionTableName
+     * @return
+     * @throws SQLException
+     */
     private Optional<Scn> getPendingTransactions(
             Scn latestTableDdlScn,
             OracleConnection connection,
@@ -285,6 +322,12 @@ private OracleOffsetContext determineSnapshotOffset(
                 .build();
     }

+    /**
+     * Adds the logs to the mining session.
+     * @param logs
+     * @param connection
+     * @throws SQLException
+     */
     private void addLogsToSession(List<LogFile> logs, OracleConnection connection)
             throws SQLException {
         for (LogFile logFile : logs) {
@@ -294,6 +337,11 @@ private void addLogsToSession(List<LogFile> logs, OracleConnection connection)
         }
     }

+    /**
+     * Starts a mining session.
+     * @param connection
+     * @throws SQLException
+     */
     private void startSession(OracleConnection connection) throws SQLException {
         // We explicitly use the ONLINE data dictionary mode here.
         // Since we are only concerned about non-SQL columns, it is safe to always use this mode
@@ -319,6 +367,9 @@ private void stopSession(OracleConnection connection) throws SQLException {
         }
     }

+    /**
+     * Gets the oldest SCN that is still available in the logs.
+     */
     private Scn getOldestScnAvailableInLogs(
             OracleConnectorConfig config, OracleConnection connection) throws SQLException {
         final Duration archiveLogRetention = config.getLogMiningArchiveLogRetention();

diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/io/debezium/connector/oracle/logminer/LogMinerStreamingChangeEventSource.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/io/debezium/connector/oracle/logminer/LogMinerStreamingChangeEventSource.java
index 5993cadee71..ea784e87878 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/io/debezium/connector/oracle/logminer/LogMinerStreamingChangeEventSource.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/io/debezium/connector/oracle/logminer/LogMinerStreamingChangeEventSource.java
@@ -69,6 +69,7 @@
 *
 *
 * Diff: Make createProcessor method protected to produce a LogMinerEventProcessor with
 * enhanced processRow method to distinguish whether it is bounded.
+ *
 */
 public class LogMinerStreamingChangeEventSource implements StreamingChangeEventSource<OraclePartition, OracleOffsetContext> {
@@ -131,7 +132,7 @@ public LogMinerStreamingChangeEventSource(

     /**
      * This is the loop to get changes from LogMiner.
-     *
+     * It continuously fetches redo log changes from the LogMiner session.
      * @param context change event source context
      */
     @Override
     public void execute(
@@ -144,7 +145,7 @@ public void execute(
             return;
         }
         try {
-            // We explicitly expect auto-commit to be disabled
+            // We explicitly expect auto-commit to be disabled, so turn it off on the JDBC connection
             jdbcConnection.setAutoCommit(false);

             startScn = offsetContext.getScn();
@@ -617,7 +618,7 @@ private void pauseBetweenMiningSessions() throws InterruptedException {

     /**
      * Sets the NLS parameters for the mining session.
-     *
+     * (NLS: Oracle's National Language Support settings.)
      * @param connection database connection, should not be {@code null}
      * @throws SQLException if a database exception occurred
      */

diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/OracleSource.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/OracleSource.java
index cb56a5165f0..5db229b4884 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/OracleSource.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/OracleSource.java
@@ -31,16 +31,18 @@

 /**
  * A builder to build a SourceFunction which can read the snapshot and then continue to consume the redo log via LogMiner.
+ *
  */
 public class OracleSource {
+
     private static final String DATABASE_SERVER_NAME = "oracle_logminer";

     public static <T> Builder<T> builder() {
         return new Builder<>();
     }

-    /** Builder class of {@link OracleSource}. */
+    /** Builder class of {@link OracleSource}, used to construct the source step by step. */
     public static class Builder<T> {

         private Integer port = 1521; // default 1521 port
@@ -136,6 +138,7 @@ public Builder<T> startupOptions(StartupOptions startupOptions) {
             return this;
         }

+        // Builds the source function that processes the Oracle redo log data
         public DebeziumSourceFunction<T> build() {
             Properties props = new Properties();
             props.setProperty("connector.class", OracleConnector.class.getCanonicalName());
@@ -195,6 +198,7 @@ public DebeziumSourceFunction<T> build() {
             }

             return new DebeziumSourceFunction<>(
+                    // validate the given database configuration properties
                     deserializer, props, null, new OracleValidator(props));
         }
     }

diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/OracleValidator.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/OracleValidator.java
index 05c3bee1b4b..4ddbc8250e7 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/OracleValidator.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/OracleValidator.java
@@ -30,7 +30,7 @@
 import java.util.List;
 import java.util.Properties;

-/** Validates the version of the database connecting to. */
+/** Validates the version of the Oracle database being connected to. */
 public class OracleValidator implements Validator {

     private static final long serialVersionUID = 1L;
@@ -43,6 +43,7 @@ public OracleValidator(Properties properties) {
         this.properties = properties;
     }

+    // Checks whether the database version meets the connector's requirements
     @Override
     public void validate() {
         try (Connection connection = openConnection(properties)) {
@@ -62,11 +63,13 @@ public void validate() {
         }
     }

+    // Opens the database connection
     public static Connection openConnection(Properties properties) throws SQLException {
         DriverManager.registerDriver(new oracle.jdbc.OracleDriver());
         String url = OracleJdbcUrlUtils.getConnectionUrlWithSid(properties);
         String userName = properties.getProperty("database.user");
         String userpwd = properties.getProperty("database.password");
+        System.out.println("======== opening database connection ========");
         return DriverManager.getConnection(url, userName, userpwd);
     }
 }

diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/table/OracleTableSource.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/table/OracleTableSource.java
index 5903ce41e9f..d3230d8a50f 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/table/OracleTableSource.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/table/OracleTableSource.java
@@ -53,6 +53,7 @@

 /**
  * A {@link DynamicTableSource} that describes how to create an Oracle redo log source from a logical
  * description.
+ * Describes how the redo log source is built from the logical description.
  */
 public class OracleTableSource implements ScanTableSource, SupportsReadingMetadata {
@@ -91,6 +92,33 @@ public class OracleTableSource implements ScanTableSource, SupportsReadingMetada

     /** Metadata that is appended at the end of a physical source row. */
     protected List<String> metadataKeys;

+    /**
+     * Constructor.
+     * @param physicalSchema
+     * @param url
+     * @param port
+     * @param hostname
+     * @param database
+     * @param tableName
+     * @param schemaName
+     * @param username
+     * @param password
+     * @param dbzProperties
+     * @param startupOptions
+     * @param enableParallelRead
+     * @param splitSize
+     * @param splitMetaGroupSize
+     * @param fetchSize
+     * @param connectTimeout
+     * @param connectMaxRetries
+     * @param connectionPoolSize
+     * @param distributionFactorUpper
+     * @param distributionFactorLower
+     * @param chunkKeyColumn
+     * @param closeIdleReaders
+     * @param skipSnapshotBackfill
+     * @param scanNewlyAddedTableEnabled
+     */
     public OracleTableSource(
             ResolvedSchema physicalSchema,
             @Nullable String url,
@@ -144,6 +172,9 @@ public OracleTableSource(
         this.scanNewlyAddedTableEnabled = scanNewlyAddedTableEnabled;
     }

+    /**
+     * Gets the changelog mode.
+     */
     @Override
     public ChangelogMode getChangelogMode() {
         return ChangelogMode.all();
     }

diff --git a/pom.xml b/pom.xml
index a3b6b0c4b84..f1658875f16 100644
--- a/pom.xml
+++ b/pom.xml
@@ -686,6 +686,16 @@ limitations under the License.
                     ${maven.compiler.target}
+
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-gpg-plugin</artifactId>
+                <version>1.6</version>
+                <configuration>
+                    <skip>true</skip>
+                </configuration>
+            </plugin>
+
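As a usage reference for the OracleSource builder that PATCH 3/3 annotates, here is a minimal sketch of how the legacy SourceFunction-based Oracle CDC source is typically wired up. It is illustrative only: it assumes flink-connector-oracle-cdc is on the classpath, and the hostname, credentials, database, schema, and table names are placeholders, not values from the patches above.

import org.apache.flink.cdc.connectors.oracle.OracleSource;
import org.apache.flink.cdc.debezium.DebeziumSourceFunction;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class OracleSourceExample {
    public static void main(String[] args) throws Exception {
        // Build the source; the OracleValidator shown above checks the database
        // version before the Debezium engine starts reading the redo log.
        DebeziumSourceFunction<String> source =
                OracleSource.<String>builder()
                        .hostname("localhost")              // placeholder host
                        .port(1521)                         // default Oracle listener port
                        .database("ORCLCDB")                // placeholder database
                        .schemaList("FLINKUSER")            // placeholder schema
                        .tableList("FLINKUSER.PRODUCTS")    // placeholder table
                        .username("flinkuser")              // placeholder credentials
                        .password("flinkpw")
                        .deserializer(new JsonDebeziumDeserializationSchema())
                        .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Emit each change record as a JSON string.
        env.addSource(source).print();
        env.execute("Oracle CDC example");
    }
}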