From 722ce2d6ebcf836beada9281a452e75f08ac343f Mon Sep 17 00:00:00 2001 From: yuxiqian <34335406+yuxiqian@users.noreply.github.com> Date: Fri, 16 Aug 2024 15:52:31 +0800 Subject: [PATCH] fix: non-schema evolve mode --- src/main/scala/source/PhakerDataSource.scala | 12 +++--- .../scala/source/PhakerSourceFunction.scala | 18 +++++++-- .../scala/source/PhakerSourceGenerator.scala | 39 ++++++++++++------- src/test/scala/PhakerTest.scala | 18 ++++----- 4 files changed, 53 insertions(+), 34 deletions(-) diff --git a/src/main/scala/source/PhakerDataSource.scala b/src/main/scala/source/PhakerDataSource.scala index 938b628..461084c 100644 --- a/src/main/scala/source/PhakerDataSource.scala +++ b/src/main/scala/source/PhakerDataSource.scala @@ -15,13 +15,11 @@ class PhakerDataSource( override def getEventSourceProvider: EventSourceProvider = { FlinkSourceFunctionProvider.of( new PhakerSourceFunction( - new PhakerSourceGenerator( - tableId, - rejectedTypes, - schemaEvolve, - generateNonNullColumns, - maxColumnCount - ), + tableId, + rejectedTypes, + schemaEvolve, + generateNonNullColumns, + maxColumnCount, recordsPerSecond ) ) diff --git a/src/main/scala/source/PhakerSourceFunction.scala b/src/main/scala/source/PhakerSourceFunction.scala index a70f54f..c6482cb 100644 --- a/src/main/scala/source/PhakerSourceFunction.scala +++ b/src/main/scala/source/PhakerSourceFunction.scala @@ -1,14 +1,24 @@ package io.github.yuxiqian.phaker package source -import org.apache.flink.cdc.common.event.Event -import org.apache.flink.streaming.api.functions.source.datagen.{DataGenerator, DataGeneratorSource} +import org.apache.flink.cdc.common.event.{Event, TableId} +import org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource class PhakerSourceFunction( - generator: DataGenerator[Event], + tableId: TableId, + rejectedTypes: Set[String], + schemaEvolve: Boolean, + generateNonNullColumns: Boolean, + maxColumnCount: Int, rowsPerSecond: Long ) extends DataGeneratorSource[Event]( - generator, + new PhakerSourceGenerator( + tableId, + rejectedTypes, + schemaEvolve, + generateNonNullColumns, + maxColumnCount + ), rowsPerSecond, null ) {} diff --git a/src/main/scala/source/PhakerSourceGenerator.scala b/src/main/scala/source/PhakerSourceGenerator.scala index 228fcab..d10cec9 100644 --- a/src/main/scala/source/PhakerSourceGenerator.scala +++ b/src/main/scala/source/PhakerSourceGenerator.scala @@ -3,9 +3,11 @@ package source import source.PhakerDatabase.{colCount, idCount} +import org.apache.flink.api.common.functions.RuntimeContext import org.apache.flink.cdc.common.event._ import org.apache.flink.cdc.common.schema.Column import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator +import org.apache.flink.runtime.state.FunctionInitializationContext import org.apache.flink.streaming.api.functions.source.datagen.RandomGenerator import java.util @@ -18,22 +20,31 @@ class PhakerSourceGenerator( maxColumnCount: Int ) extends RandomGenerator[Event] { - private val cachedEvents: util.List[Event] = { + private val cachedEvents = new util.ArrayList[Event](); + + override def open( + name: String, + context: FunctionInitializationContext, + runtimeContext: RuntimeContext + ): Unit = { + super.open(name, context, runtimeContext) if (!schemaEvolve) { - PhakerDatabase.columnList ++= - PhakeDataGenerator - .possibleChoices(rejectedTypes) - .zipWithIndex - .map(t => (s"column${t._2}_${t._1.getClass.getSimpleName}", t._1)) + PhakerDatabase.columnList.synchronized { + PhakerDatabase.columnList ++= + PhakeDataGenerator + .possibleChoices(rejectedTypes) + .zipWithIndex + .map(t => + (s"column${t._2 + 1}_${t._1.getClass.getSimpleName}", t._1) + ) + } } - val cache = new util.ArrayList[Event] - cache.add( + cachedEvents.add( new CreateTableEvent( tableId, PhakerDatabase.genSchema ) ) - cache } override def next(): Event = { @@ -47,8 +58,6 @@ class PhakerSourceGenerator( private def pushEvents(): Unit = { PhakerDatabase.synchronized { - println("Emitting insert events...") - { val insertedData = genRecord() cachedEvents.add( @@ -102,12 +111,16 @@ class PhakerSourceGenerator( private def genRecord() = { val generator = new BinaryRecordDataGenerator( - PhakerDatabase.columnList.map(_._2) + PhakerDatabase.columnList.synchronized { + PhakerDatabase.columnList.map(_._2) + } ) val rowData = PhakerDatabase.columnList .map(col => PhakeDataGenerator.randomData(col._1, col._2)) - println(s"Generated data record: ${rowData.mkString("Array(", ", ", ")")}") + println( + s"Generated data record (${rowData.length}): ${rowData.mkString("Array(", ", ", ")")}" + ) generator.generate( rowData ) diff --git a/src/test/scala/PhakerTest.scala b/src/test/scala/PhakerTest.scala index e77907e..205f941 100644 --- a/src/test/scala/PhakerTest.scala +++ b/src/test/scala/PhakerTest.scala @@ -1,7 +1,7 @@ package io.github.yuxiqian.phaker import factory.PhakerDataFactory -import source.{PhakerSourceFunction, PhakerSourceGenerator} +import source.PhakerSourceFunction import org.apache.flink.cdc.common.configuration.Configuration import org.apache.flink.cdc.common.event.TableId @@ -20,18 +20,16 @@ class PhakerTest extends AnyFunSuite { test("Phaker source test") { val source = new PhakerSourceFunction( - new PhakerSourceGenerator( - TableId.tableId("default_namespace", "default_schema", "default_table"), - Set("IntType", "FloatType", "DoubleType"), - true, - true, - 17 - ), + TableId.tableId("default_namespace", "default_schema", "default_table"), + Set("IntType", "FloatType", "DoubleType"), + false, + true, + 17, 1 ) val env = StreamExecutionEnvironment.getExecutionEnvironment - env.addSource(source).print().setParallelism(1) + env.addSource(source).setParallelism(1).print().setParallelism(1) env.execute("Let's Test Phaker Source...") } @@ -52,7 +50,7 @@ class PhakerTest extends AnyFunSuite { .set(PhakerDataSourceOptions.REJECTED_TYPES, "BinaryType,VarBinaryType") .set[java.lang.Integer](PhakerDataSourceOptions.RECORDS_PER_SECOND, 1) .set[java.lang.Boolean](PhakerDataSourceOptions.NON_NULL_COLUMNS, true) - .set[java.lang.Boolean](PhakerDataSourceOptions.SCHEMA_EVOLVE, true) + .set[java.lang.Boolean](PhakerDataSourceOptions.SCHEMA_EVOLVE, false) .set[java.lang.Integer](PhakerDataSourceOptions.MAX_COLUMN_COUNT, 50) val sourceDef =