From 4d9fb3dc94ca37908ed74d96737b9cd02c41dc14 Mon Sep 17 00:00:00 2001 From: yuxiqian <34335406+yuxiqian@users.noreply.github.com> Date: Wed, 14 Aug 2024 20:24:17 +0800 Subject: [PATCH] add: allow rejecting types & minor cleanup --- README.md | 13 ++++--- .../scala/factory/PhakerDataFactory.scala | 17 ++++++--- .../scala/source/PhakeDataGenerator.scala | 4 +-- src/main/scala/source/PhakerDataSource.scala | 10 +++--- .../source/PhakerDataSourceOptions.scala | 29 +++++++-------- src/main/scala/source/PhakerDatabase.scala | 3 +- .../scala/source/PhakerMetadataAccessor.scala | 22 +++++------- .../scala/source/PhakerSourceFunction.scala | 36 ++++++++++--------- src/test/scala/PhakerTest.scala | 6 ++-- 9 files changed, 70 insertions(+), 70 deletions(-) diff --git a/README.md b/README.md index 5958984..7a64fd0 100644 --- a/README.md +++ b/README.md @@ -9,13 +9,12 @@ ```yaml source: type: phaker - namespace.name: default_namespace - schema.name: default_schema - table.name: table_name - schema.evolve: true # Generate schema evolution events, too - max.column.count: 50 # limit maximum column count - batch.count: 17 # how many data records should source emits in each batch - sleep.time: 1000 # sleep duration (in ms) between two consecutive batches + table.id: default_namespace.default_schema.table_name # Fully qualified Table ID + rejected.types: BinaryType,VarBinaryType # Exclude some data types if downstream could not handle them + schema.evolve: true # Generate schema evolution events, too + max.column.count: 50 # limit maximum column count + batch.count: 17 # how many data records the source should emit in each batch + sleep.time: 1000 # sleep duration (in ms) between two consecutive batches ``` ## example diff --git a/src/main/scala/factory/PhakerDataFactory.scala b/src/main/scala/factory/PhakerDataFactory.scala index 7e625a4..53eadbd 100644 --- a/src/main/scala/factory/PhakerDataFactory.scala +++ b/src/main/scala/factory/PhakerDataFactory.scala @@ -5,6 +5,7 @@ import 
source.PhakerDataSource import source.PhakerDataSourceOptions._ import org.apache.flink.cdc.common.configuration.ConfigOption +import org.apache.flink.cdc.common.event.TableId import org.apache.flink.cdc.common.factories.{DataSourceFactory, Factory} import org.apache.flink.cdc.common.source.DataSource @@ -18,11 +19,11 @@ object PhakerDataFactory { class PhakerDataFactory extends DataSourceFactory { override def createDataSource(context: Factory.Context): DataSource = { + val conf = context.getFactoryConfiguration new PhakerDataSource( - conf.get(NAMESPACE_NAME), - conf.get(SCHEMA_NAME), - conf.get(TABLE_NAME), + TableId.parse(conf.get(TABLE_ID)), + conf.get(REJECTED_TYPES).split(',').toSet, conf.get(SCHEMA_EVOLVE), conf.get(MAX_COLUMN_COUNT), conf.get(BATCH_COUNT), @@ -33,10 +34,16 @@ class PhakerDataFactory extends DataSourceFactory { override def identifier(): String = PhakerDataFactory.IDENTIFIER override def requiredOptions(): util.Set[ConfigOption[_]] = { - Set[ConfigOption[_]](NAMESPACE_NAME, SCHEMA_NAME, TABLE_NAME).asJava + Set[ConfigOption[_]](TABLE_ID).asJava } override def optionalOptions(): util.Set[ConfigOption[_]] = { - Set[ConfigOption[_]](SCHEMA_EVOLVE, MAX_COLUMN_COUNT, BATCH_COUNT, SLEEP_TIME).asJava + Set[ConfigOption[_]]( + REJECTED_TYPES, + SCHEMA_EVOLVE, + MAX_COLUMN_COUNT, + BATCH_COUNT, + SLEEP_TIME + ).asJava } } diff --git a/src/main/scala/source/PhakeDataGenerator.scala b/src/main/scala/source/PhakeDataGenerator.scala index bb7f333..4980830 100644 --- a/src/main/scala/source/PhakeDataGenerator.scala +++ b/src/main/scala/source/PhakeDataGenerator.scala @@ -11,7 +11,7 @@ import java.time.{Instant, ZonedDateTime} import scala.util.Random object PhakeDataGenerator { - def randomType: DataType = { + def randomType(rejectedTypes: Set[String]): DataType = { val choices = List( DataTypes.BINARY(17 + Random.nextInt(100)), DataTypes.VARBINARY(17 + Random.nextInt(100)), @@ -29,7 +29,7 @@ object PhakeDataGenerator { 
DataTypes.TIMESTAMP(Random.nextInt(10)), DataTypes.TIMESTAMP_TZ(Random.nextInt(10)), DataTypes.TIMESTAMP_LTZ(Random.nextInt(10)) - ) + ).filterNot(t => rejectedTypes.contains(t.getClass.getSimpleName)) choices(Random.nextInt(choices.length)) } diff --git a/src/main/scala/source/PhakerDataSource.scala b/src/main/scala/source/PhakerDataSource.scala index 0612ff7..26f86f2 100644 --- a/src/main/scala/source/PhakerDataSource.scala +++ b/src/main/scala/source/PhakerDataSource.scala @@ -5,9 +5,8 @@ import org.apache.flink.cdc.common.event.TableId import org.apache.flink.cdc.common.source.{DataSource, EventSourceProvider, FlinkSourceFunctionProvider, MetadataAccessor} class PhakerDataSource( - namespaceName: String, - schemaName: String, - tableName: String, + tableId: TableId, + rejectedTypes: Set[String], schemaEvolve: Boolean, maxColumnCount: Int, batchCount: Int, @@ -16,7 +15,8 @@ class PhakerDataSource( override def getEventSourceProvider: EventSourceProvider = { FlinkSourceFunctionProvider.of( new PhakerSourceFunction( - TableId.tableId(namespaceName, schemaName, tableName), + tableId, + rejectedTypes, schemaEvolve, maxColumnCount, batchCount, @@ -26,6 +26,6 @@ class PhakerDataSource( } override def getMetadataAccessor: MetadataAccessor = { - new PhakerMetadataAccessor(namespaceName, schemaName, tableName) + new PhakerMetadataAccessor(tableId) } } diff --git a/src/main/scala/source/PhakerDataSourceOptions.scala b/src/main/scala/source/PhakerDataSourceOptions.scala index bfccb2d..3e381a2 100644 --- a/src/main/scala/source/PhakerDataSourceOptions.scala +++ b/src/main/scala/source/PhakerDataSourceOptions.scala @@ -6,23 +6,17 @@ import org.apache.flink.cdc.common.configuration.{ConfigOption, ConfigOptions} import java.lang object PhakerDataSourceOptions { - val NAMESPACE_NAME: ConfigOption[lang.String] = ConfigOptions - .key("namespace.name") + val TABLE_ID: ConfigOption[lang.String] = ConfigOptions + .key("table.id") .stringType() .noDefaultValue() - 
.withDescription("Namespace name of the simulated table.") + .withDescription("Table ID of the simulated table.") - val SCHEMA_NAME: ConfigOption[lang.String] = ConfigOptions - .key("schema.name") + val REJECTED_TYPES: ConfigOption[lang.String] = ConfigOptions + .key("rejected.types") .stringType() - .noDefaultValue() - .withDescription("Schema name of the simulated table.") - - val TABLE_NAME: ConfigOption[lang.String] = ConfigOptions - .key("table.name") - .stringType() - .noDefaultValue() - .withDescription("Name of the simulated table.") + .defaultValue("") + .withDescription("Unwanted data types (e.g. BinaryType, VarBinaryType). Separated with commas.") val SCHEMA_EVOLVE: ConfigOption[lang.Boolean] = ConfigOptions .key("schema.evolve") @@ -36,8 +30,9 @@ .key("max.column.count") .intType() .defaultValue(50) - .withDescription("Max added columns count. No schema evolution events will be generated if this limit has exceeded. Defaults to 50.") - + .withDescription( + "Max added columns count. No schema evolution events will be generated if this limit has been exceeded. Defaults to 50." + ) val BATCH_COUNT: ConfigOption[lang.Integer] = ConfigOptions .key("batch.count") @@ -49,5 +44,7 @@ .key("sleep.time") .intType() .defaultValue(1000) - .withDescription("Sleep time for a while during each batch (in milliseconds). Defaults to 1000.") + .withDescription( + "Sleep duration between two consecutive batches (in milliseconds). Defaults to 1000." 
+ ) } diff --git a/src/main/scala/source/PhakerDatabase.scala b/src/main/scala/source/PhakerDatabase.scala index 089b2ea..901b475 100644 --- a/src/main/scala/source/PhakerDatabase.scala +++ b/src/main/scala/source/PhakerDatabase.scala @@ -4,6 +4,7 @@ package source import org.apache.flink.cdc.common.schema.{Column, Schema} import org.apache.flink.cdc.common.types.{DataType, DataTypes} +import java.util object PhakerDatabase { val primaryKey: String = "id" var columnList: Array[(String, DataType)] = Array( @@ -14,7 +15,7 @@ object PhakerDatabase { def genSchema: Schema = { PhakerDatabase.synchronized { - import java.util + Schema .newBuilder() .setColumns( diff --git a/src/main/scala/source/PhakerMetadataAccessor.scala b/src/main/scala/source/PhakerMetadataAccessor.scala index c7e8992..2049a48 100644 --- a/src/main/scala/source/PhakerMetadataAccessor.scala +++ b/src/main/scala/source/PhakerMetadataAccessor.scala @@ -9,30 +9,24 @@ import java.util import scala.collection.JavaConverters._ class PhakerMetadataAccessor( - namespaceName: String, - schemaName: String, - tableName: String + tableId: TableId ) extends MetadataAccessor { - def apply( - namespaceName: String, - schemaName: String, - tableName: String - ): PhakerMetadataAccessor = - new PhakerMetadataAccessor(namespaceName, schemaName, tableName) - - override def listNamespaces(): util.List[String] = List(namespaceName).asJava + override def listNamespaces(): util.List[String] = List( + tableId.getNamespace + ).asJava override def listSchemas(namespace: String): util.List[String] = { - if (namespace == namespaceName) List(schemaName) else List.empty[String] + if (namespace == tableId.getNamespace) List(tableId.getSchemaName) + else List.empty[String] }.asJava override def listTables( namespace: String, schema: String ): util.List[TableId] = { - if (namespace == namespaceName && schema == schemaName) - List(TableId.tableId(namespaceName, schemaName, tableName)) + if (namespace == tableId.getNamespace && schema 
== tableId.getSchemaName) + List(tableId) else List.empty[TableId] }.asJava diff --git a/src/main/scala/source/PhakerSourceFunction.scala b/src/main/scala/source/PhakerSourceFunction.scala index 212f208..7375178 100644 --- a/src/main/scala/source/PhakerSourceFunction.scala +++ b/src/main/scala/source/PhakerSourceFunction.scala @@ -1,15 +1,18 @@ package io.github.yuxiqian.phaker package source -import source.PhakerDatabase.idCount +import source.PhakerDatabase.{colCount, idCount} import org.apache.flink.cdc.common.event._ import org.apache.flink.cdc.common.schema.Column import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator import org.apache.flink.streaming.api.functions.source.SourceFunction +import java.util + class PhakerSourceFunction( tableId: TableId, + rejectedTypes: Set[String], schemaEvolve: Boolean, maxColumnCount: Int, batchCount: Int, @@ -54,19 +57,6 @@ class PhakerSourceFunction( } } - private def genRecord() = { - val generator = new BinaryRecordDataGenerator( - PhakerDatabase.columnList.map(_._2).toArray - ) - val rowData = PhakerDatabase.columnList - .map(col => PhakeDataGenerator.randomData(col._1, col._2)) - - println(s"Generated data record: ${rowData.mkString("Array(", ", ", ")")}") - generator.generate( - rowData - ) - } - private def emitUpdateEvents(ctx: Context, count: Int): Unit = { for (_ <- 0 until count) { val updateBeforeData = genRecord() @@ -85,6 +75,19 @@ class PhakerSourceFunction( } } + private def genRecord() = { + val generator = new BinaryRecordDataGenerator( + PhakerDatabase.columnList.map(_._2).toArray + ) + val rowData = PhakerDatabase.columnList + .map(col => PhakeDataGenerator.randomData(col._1, col._2)) + + println(s"Generated data record: ${rowData.mkString("Array(", ", ", ")")}") + generator.generate( + rowData + ) + } + private def emitDeleteEvents(ctx: Context, count: Int): Unit = { for (_ <- 0 until count) { val deleteBeforeData = genRecord() @@ -103,7 +106,6 @@ class PhakerSourceFunction( } private 
def emitSchemaEvolutionEvents(ctx: Context): Unit = { - import source.PhakerDatabase.colCount if (!schemaEvolve) { return } if (colCount > maxColumnCount) { @@ -116,10 +118,10 @@ class PhakerSourceFunction( colCount += 1 s"column$colCount" } - val addedColumnType = PhakeDataGenerator.randomType + val addedColumnType = PhakeDataGenerator.randomType(rejectedTypes) PhakerDatabase.columnList.synchronized { - import java.util + PhakerDatabase.columnList :+= (addedColumnName, addedColumnType) ctx.collect( new AddColumnEvent( diff --git a/src/test/scala/PhakerTest.scala b/src/test/scala/PhakerTest.scala index c16b7ed..ac9e976 100644 --- a/src/test/scala/PhakerTest.scala +++ b/src/test/scala/PhakerTest.scala @@ -22,6 +22,7 @@ class PhakerTest extends AnyFunSuite { test("Phaker source test") { val source = new PhakerSourceFunction( TableId.tableId("default_namespace", "default_schema", "default_table"), + Set("IntType", "FloatType", "DoubleType"), true, 17, 17, @@ -42,9 +43,8 @@ class PhakerTest extends AnyFunSuite { // Setup value source val sourceConfig = new Configuration sourceConfig - .set(PhakerDataSourceOptions.NAMESPACE_NAME, "default_namespace") - .set(PhakerDataSourceOptions.SCHEMA_NAME, "default_schema") - .set(PhakerDataSourceOptions.TABLE_NAME, "default_table") + .set(PhakerDataSourceOptions.TABLE_ID, "default_namespace.default_schema.default_table") + .set(PhakerDataSourceOptions.REJECTED_TYPES, "BinaryType,VarBinaryType") .set[java.lang.Integer](PhakerDataSourceOptions.BATCH_COUNT, 1) .set[java.lang.Integer](PhakerDataSourceOptions.MAX_COLUMN_COUNT, 50) .set[java.lang.Integer](PhakerDataSourceOptions.SLEEP_TIME, 1000)