From b72249c5d244c377691e3036b705d96f4a760c14 Mon Sep 17 00:00:00 2001 From: yuxiqian <34335406+yuxiqian@users.noreply.github.com> Date: Wed, 14 Aug 2024 18:52:49 +0800 Subject: [PATCH] add: phaker -> values test --- build.sbt | 4 ++- src/test/scala/PhakerTest.scala | 61 +++++++++++++++++++++++++++++++-- 2 files changed, 62 insertions(+), 3 deletions(-) diff --git a/build.sbt b/build.sbt index 15906bb..e878c24 100644 --- a/build.sbt +++ b/build.sbt @@ -13,5 +13,7 @@ libraryDependencies ++= Seq( "org.apache.flink" % "flink-cdc-runtime" % flinkCdcVersion, "org.scalatest" %% "scalatest" % "3.2.19" % Test, "org.apache.flink" % "flink-clients" % flinkVersion % Test, - "org.apache.flink" % "flink-streaming-java" % flinkVersion % Test + "org.apache.flink" % "flink-streaming-java" % flinkVersion % Test, + "org.apache.flink" % "flink-cdc-composer" % flinkCdcVersion % Test, + "org.apache.flink" % "flink-cdc-pipeline-connector-values" % flinkCdcVersion % Test ) diff --git a/src/test/scala/PhakerTest.scala b/src/test/scala/PhakerTest.scala index 868bc7a..c16b7ed 100644 --- a/src/test/scala/PhakerTest.scala +++ b/src/test/scala/PhakerTest.scala @@ -1,23 +1,80 @@ package io.github.yuxiqian.phaker +import factory.PhakerDataFactory import source.PhakerSourceFunction import org.apache.flink.cdc.common.event.TableId +import org.apache.flink.cdc.composer.definition.{SinkDef, SourceDef} +import org.apache.flink.cdc.composer.flink.FlinkPipelineComposer +import org.apache.flink.cdc.connectors.values.factory.ValuesDataFactory +import org.apache.flink.cdc.connectors.values.sink.{ValuesDataSink, ValuesDataSinkOptions} import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment import org.scalatest.funsuite.AnyFunSuite class PhakerTest extends AnyFunSuite { - test("Phaker test example") { + import org.apache.flink.cdc.common.configuration.Configuration + import org.apache.flink.cdc.common.pipeline.PipelineOptions + import org.apache.flink.cdc.composer.definition.PipelineDef 
+ + + import java.util.Collections + + test("Phaker source test") { val source = new PhakerSourceFunction( TableId.tableId("default_namespace", "default_schema", "default_table"), true, 17, - 1, + 17, 1000 ) val env = StreamExecutionEnvironment.getExecutionEnvironment env.addSource(source).print().setParallelism(1) env.execute("Let's Test Phaker Source...") } + + test("Phaker to Values test") { import source.PhakerDataSourceOptions + + import org.apache.flink.cdc.composer.definition.{RouteDef, TransformDef} + + val composer = FlinkPipelineComposer.ofMiniCluster + + // Set up the Phaker source + val sourceConfig = new Configuration + sourceConfig + .set(PhakerDataSourceOptions.NAMESPACE_NAME, "default_namespace") + .set(PhakerDataSourceOptions.SCHEMA_NAME, "default_schema") + .set(PhakerDataSourceOptions.TABLE_NAME, "default_table") + .set[java.lang.Integer](PhakerDataSourceOptions.BATCH_COUNT, 1) + .set[java.lang.Integer](PhakerDataSourceOptions.MAX_COLUMN_COUNT, 50) + .set[java.lang.Integer](PhakerDataSourceOptions.SLEEP_TIME, 1000) + + val sourceDef = + new SourceDef(PhakerDataFactory.IDENTIFIER, "Value Source", sourceConfig) + + // Set up the values sink + val sinkConfig = new Configuration + sinkConfig.set( + ValuesDataSinkOptions.SINK_API, + ValuesDataSink.SinkApi.SINK_V2 + ) + val sinkDef = + new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig) + + // Set up the pipeline + val pipelineConfig = new Configuration + pipelineConfig + .set[java.lang.Integer](PipelineOptions.PIPELINE_PARALLELISM, 1) + val pipelineDef = new PipelineDef( + sourceDef, + sinkDef, + Collections.emptyList[RouteDef], + Collections.emptyList[TransformDef], + pipelineConfig + ) + + // Execute the pipeline + val execution = composer.compose(pipelineDef) + execution.execute + } }