From 75f786f21f803a9d391a15ca31d5c251c4228053 Mon Sep 17 00:00:00 2001
From: yuxiqian <34335406+yuxiqian@users.noreply.github.com>
Date: Thu, 15 Aug 2024 16:43:46 +0800
Subject: [PATCH] fix: limit rate by recordPerSec

---
 README.md                                     |   3 +-
 .../scala/factory/PhakerDataFactory.scala     |   6 +-
 src/main/scala/source/PhakerDataSource.scala  |  16 +-
 .../source/PhakerDataSourceOptions.scala      |  14 +-
 .../scala/source/PhakerSourceFunction.scala   | 153 ++----------------
 .../scala/source/PhakerSourceGenerator.scala  | 141 ++++++++++++++++
 src/test/scala/PhakerTest.scala               |  31 ++--
 7 files changed, 182 insertions(+), 182 deletions(-)
 create mode 100644 src/main/scala/source/PhakerSourceGenerator.scala

diff --git a/README.md b/README.md
index 7a64fd0..a632876 100644
--- a/README.md
+++ b/README.md
@@ -13,8 +13,7 @@ source:
   rejected.types: BinaryType,VarBinaryType # Exclude some data types if downstream could not handle them
   schema.evolve: true # Generate schema evolution events, too
   max.column.count: 50 # limit maximum column count
-  batch.count: 17 # how many data records should source emits in each batch
-  sleep.time: 1000 # sleep duration (in ms) between two consecutive batches
+  records.per.second: 17 # maximum records emitted each second
 ```
 
 ## example
diff --git a/src/main/scala/factory/PhakerDataFactory.scala b/src/main/scala/factory/PhakerDataFactory.scala
index 53eadbd..df07862 100644
--- a/src/main/scala/factory/PhakerDataFactory.scala
+++ b/src/main/scala/factory/PhakerDataFactory.scala
@@ -26,8 +26,7 @@ class PhakerDataFactory extends DataSourceFactory {
       conf.get(REJECTED_TYPES).split(',').toSet,
       conf.get(SCHEMA_EVOLVE),
       conf.get(MAX_COLUMN_COUNT),
-      conf.get(BATCH_COUNT),
-      conf.get(SLEEP_TIME)
+      conf.get(RECORDS_PER_SECOND)
     )
   }
 
@@ -42,8 +41,7 @@ class PhakerDataFactory extends DataSourceFactory {
       REJECTED_TYPES,
       SCHEMA_EVOLVE,
       MAX_COLUMN_COUNT,
-      BATCH_COUNT,
-      SLEEP_TIME
+      RECORDS_PER_SECOND
     ).asJava
   }
 }
diff --git a/src/main/scala/source/PhakerDataSource.scala b/src/main/scala/source/PhakerDataSource.scala
index 26f86f2..8489c1c 100644
--- a/src/main/scala/source/PhakerDataSource.scala
+++ b/src/main/scala/source/PhakerDataSource.scala
@@ -9,18 +9,18 @@ class PhakerDataSource(
     rejectedTypes: Set[String],
     schemaEvolve: Boolean,
     maxColumnCount: Int,
-    batchCount: Int,
-    sleepTime: Int
+    recordsPerSecond: Int
 ) extends DataSource {
   override def getEventSourceProvider: EventSourceProvider = {
     FlinkSourceFunctionProvider.of(
       new PhakerSourceFunction(
-        tableId,
-        rejectedTypes,
-        schemaEvolve,
-        maxColumnCount,
-        batchCount,
-        sleepTime
+        new PhakerSourceGenerator(
+          tableId,
+          rejectedTypes,
+          schemaEvolve,
+          maxColumnCount
+        ),
+        recordsPerSecond
       )
     )
   }
diff --git a/src/main/scala/source/PhakerDataSourceOptions.scala b/src/main/scala/source/PhakerDataSourceOptions.scala
index 3e381a2..c577bca 100644
--- a/src/main/scala/source/PhakerDataSourceOptions.scala
+++ b/src/main/scala/source/PhakerDataSourceOptions.scala
@@ -34,17 +34,11 @@ object PhakerDataSourceOptions {
       "Max added columns count. No schema evolution events will be generated if this limit has exceeded. Defaults to 50."
     )
 
-  val BATCH_COUNT: ConfigOption[lang.Integer] = ConfigOptions
-    .key("batch.count")
+  val RECORDS_PER_SECOND: ConfigOption[lang.Integer] = ConfigOptions
+    .key("records.per.second")
     .intType()
-    .defaultValue(17)
-    .withDescription("Data records to be generated per batch. Defaults to 17.")
-
-  val SLEEP_TIME: ConfigOption[lang.Integer] = ConfigOptions
-    .key("sleep.time")
-    .intType()
-    .defaultValue(1000)
+    .defaultValue(60)
     .withDescription(
-      "Sleep time for a while during each batch (in milliseconds). Defaults to 1000."
+      "Data records to be generated each second. Defaults to 60."
     )
 }
diff --git a/src/main/scala/source/PhakerSourceFunction.scala b/src/main/scala/source/PhakerSourceFunction.scala
index 1d7b222..a70f54f 100644
--- a/src/main/scala/source/PhakerSourceFunction.scala
+++ b/src/main/scala/source/PhakerSourceFunction.scala
@@ -1,149 +1,14 @@
 package io.github.yuxiqian.phaker
 package source
 
-import source.PhakerDatabase.{colCount, idCount}
-
-import org.apache.flink.cdc.common.event._
-import org.apache.flink.cdc.common.schema.Column
-import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator
-import org.apache.flink.streaming.api.functions.source.SourceFunction
-
-import java.util
+import org.apache.flink.cdc.common.event.Event
+import org.apache.flink.streaming.api.functions.source.datagen.{DataGenerator, DataGeneratorSource}
 
 class PhakerSourceFunction(
-    tableId: TableId,
-    rejectedTypes: Set[String],
-    schemaEvolve: Boolean,
-    maxColumnCount: Int,
-    batchCount: Int,
-    sleepTime: Int
-) extends SourceFunction[Event] {
-
-  private type Context = SourceFunction.SourceContext[Event]
-
-  private var isRunning = true
-
-  override def run(ctx: Context): Unit = {
-
-    ctx.collect(
-      new CreateTableEvent(
-        tableId,
-        PhakerDatabase.genSchema
-      )
-    )
-
-    while (isRunning) {
-      PhakerDatabase.synchronized {
-        println("Emitting insert events...")
-        emitInsertEvents(ctx, batchCount)
-        emitSchemaEvolutionEvents(ctx)
-
-        println("Emitting update events...")
-        emitUpdateEvents(ctx, batchCount)
-        emitSchemaEvolutionEvents(ctx)
-
-        println("Emitting delete events...")
-        emitDeleteEvents(ctx, batchCount)
-        emitSchemaEvolutionEvents(ctx)
-      }
-      Thread.sleep(sleepTime)
-    }
-  }
-
-  private def emitInsertEvents(ctx: Context, count: Int): Unit = {
-    for (_ <- 0 until count) {
-      val insertedData = genRecord()
-      ctx.collect(
-        DataChangeEvent.insertEvent(tableId, insertedData)
-      )
-    }
-  }
-
-  private def emitUpdateEvents(ctx: Context, count: Int): Unit = {
-    for (_ <- 0 until count) {
-      val updateBeforeData = genRecord()
-      ctx.collect(
-        DataChangeEvent.insertEvent(tableId, updateBeforeData)
-      )
-
-      idCount.synchronized {
-        idCount -= 1
-      }
-
-      val updateAfterData = genRecord()
-      ctx.collect(
-        DataChangeEvent.updateEvent(tableId, updateBeforeData, updateAfterData)
-      )
-    }
-  }
-
-  private def emitDeleteEvents(ctx: Context, count: Int): Unit = {
-    for (_ <- 0 until count) {
-      val deleteBeforeData = genRecord()
-      ctx.collect(
-        DataChangeEvent.insertEvent(tableId, deleteBeforeData)
-      )
-
-      idCount.synchronized {
-        idCount -= 1
-      }
-
-      ctx.collect(
-        DataChangeEvent.deleteEvent(tableId, deleteBeforeData)
-      )
-    }
-  }
-
-  private def genRecord() = {
-    val generator = new BinaryRecordDataGenerator(
-      PhakerDatabase.columnList.map(_._2)
-    )
-    val rowData = PhakerDatabase.columnList
-      .map(col => PhakeDataGenerator.randomData(col._1, col._2))
-
-    println(s"Generated data record: ${rowData.mkString("Array(", ", ", ")")}")
-    generator.generate(
-      rowData
-    )
-  }
-
-  private def emitSchemaEvolutionEvents(ctx: Context): Unit = {
-
-    if (!schemaEvolve) { return }
-    if (colCount > maxColumnCount) {
-      return
-    }
-
-    println("Emitting schema change events...")
-
-    val addedColumnName = colCount.synchronized {
-      colCount += 1
-      s"column$colCount"
-    }
-    val addedColumnType = PhakeDataGenerator.randomType(rejectedTypes)
-
-    PhakerDatabase.columnList.synchronized {
-
-      PhakerDatabase.columnList :+= (addedColumnName, addedColumnType)
-      ctx.collect(
-        new AddColumnEvent(
-          tableId,
-          util.Arrays.asList(
-            new AddColumnEvent.ColumnWithPosition(
-              Column.physicalColumn(
-                addedColumnName,
-                addedColumnType
-              )
-            )
-          )
-        )
-      )
-    }
-
-    println(s"Done, new schema: ${PhakerDatabase.genSchema}")
-  }
-
-  override def cancel(): Unit = {
-    isRunning = false
-  }
-}
+    generator: DataGenerator[Event],
+    rowsPerSecond: Long
+) extends DataGeneratorSource[Event](
+      generator,
+      rowsPerSecond,
+      null
+    ) {}
diff --git a/src/main/scala/source/PhakerSourceGenerator.scala b/src/main/scala/source/PhakerSourceGenerator.scala
new file mode 100644
index 0000000..84f3ef2
--- /dev/null
+++ b/src/main/scala/source/PhakerSourceGenerator.scala
@@ -0,0 +1,141 @@
+package io.github.yuxiqian.phaker
+package source
+
+import source.PhakerDatabase.{colCount, idCount}
+
+import org.apache.flink.cdc.common.event._
+import org.apache.flink.cdc.common.schema.Column
+import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator
+import org.apache.flink.streaming.api.functions.source.datagen.RandomGenerator
+
+import java.util
+
+class PhakerSourceGenerator(
+    tableId: TableId,
+    rejectedTypes: Set[String],
+    schemaEvolve: Boolean,
+    maxColumnCount: Int
+) extends RandomGenerator[Event] {
+
+  private val cachedEvents: util.List[Event] = {
+    val cache = new util.ArrayList[Event]
+    cache.add(
+      new CreateTableEvent(
+        tableId,
+        PhakerDatabase.genSchema
+      )
+    )
+    cache
+  }
+
+  override def next(): Event = {
+    cachedEvents.synchronized {
+      if (cachedEvents.isEmpty) {
+        pushEvents()
+      }
+      cachedEvents.remove(0)
+    }
+  }
+
+  private def pushEvents(): Unit = {
+    PhakerDatabase.synchronized {
+      println("Emitting insert events...")
+
+      {
+        val insertedData = genRecord()
+        cachedEvents.add(
+          DataChangeEvent.insertEvent(tableId, insertedData)
+        )
+      }
+
+      {
+        val updateBeforeData = genRecord()
+        cachedEvents.add(
+          DataChangeEvent.insertEvent(tableId, updateBeforeData)
+        )
+
+        idCount.synchronized {
+          idCount -= 1
+        }
+
+        val updateAfterData = genRecord()
+        cachedEvents.add(
+          DataChangeEvent.updateEvent(
+            tableId,
+            updateBeforeData,
+            updateAfterData
+          )
+        )
+      }
+
+      {
+
+        val deleteBeforeData = genRecord()
+        cachedEvents.add(
+          DataChangeEvent.insertEvent(tableId, deleteBeforeData)
+        )
+
+        idCount.synchronized {
+          idCount -= 1
+        }
+
+        cachedEvents.add(
+          DataChangeEvent.deleteEvent(tableId, deleteBeforeData)
+        )
+      }
+
+      {
+        emitSchemaEvolutionEvents().foreach(
+          cachedEvents.add
+        )
+      }
+    }
+  }
+
+  private def genRecord() = {
+    val generator = new BinaryRecordDataGenerator(
+      PhakerDatabase.columnList.map(_._2)
+    )
+    val rowData = PhakerDatabase.columnList
+      .map(col => PhakeDataGenerator.randomData(col._1, col._2))
+
+    println(s"Generated data record: ${rowData.mkString("Array(", ", ", ")")}")
+    generator.generate(
+      rowData
+    )
+  }
+
+  private def emitSchemaEvolutionEvents(): Option[Event] = {
+
+    if (!schemaEvolve) { return None }
+    if (colCount > maxColumnCount) {
+      return None
+    }
+
+    println("Emitting schema change events...")
+
+    val addedColumnName = colCount.synchronized {
+      colCount += 1
+      s"column$colCount"
+    }
+    val addedColumnType = PhakeDataGenerator.randomType(rejectedTypes)
+
+    PhakerDatabase.columnList.synchronized {
+      PhakerDatabase.columnList :+= (addedColumnName, addedColumnType)
+      println(s"Done, new schema: ${PhakerDatabase.genSchema}")
+      Some(
+        new AddColumnEvent(
+          tableId,
+          util.Arrays.asList(
+            new AddColumnEvent.ColumnWithPosition(
+              Column.physicalColumn(
+                addedColumnName,
+                addedColumnType
+              )
+            )
+          )
+        )
+      )
+    }
+  }
+}
diff --git a/src/test/scala/PhakerTest.scala b/src/test/scala/PhakerTest.scala
index ac9e976..5f9f3a5 100644
--- a/src/test/scala/PhakerTest.scala
+++ b/src/test/scala/PhakerTest.scala
@@ -1,10 +1,12 @@
 package io.github.yuxiqian.phaker
 
 import factory.PhakerDataFactory
-import source.PhakerSourceFunction
+import source.{PhakerSourceFunction, PhakerSourceGenerator}
 
+import org.apache.flink.cdc.common.configuration.Configuration
 import org.apache.flink.cdc.common.event.TableId
-import org.apache.flink.cdc.composer.definition.{SinkDef, SourceDef}
+import org.apache.flink.cdc.common.pipeline.PipelineOptions
+import org.apache.flink.cdc.composer.definition.{PipelineDef, SinkDef, SourceDef}
 import org.apache.flink.cdc.composer.flink.FlinkPipelineComposer
 import org.apache.flink.cdc.connectors.values.factory.ValuesDataFactory
 import org.apache.flink.cdc.connectors.values.sink.{ValuesDataSink, ValuesDataSinkOptions}
@@ -13,21 +15,20 @@ import org.scalatest.funsuite.AnyFunSuite
 
 class PhakerTest extends AnyFunSuite {
 
-  import org.apache.flink.cdc.common.configuration.Configuration
-  import org.apache.flink.cdc.common.pipeline.PipelineOptions
-  import org.apache.flink.cdc.composer.definition.PipelineDef
-
   import java.util.Collections
 
   test("Phaker source test") {
+
     val source = new PhakerSourceFunction(
-      TableId.tableId("default_namespace", "default_schema", "default_table"),
-      Set("IntType", "FloatType", "DoubleType"),
-      true,
-      17,
-      17,
+      new PhakerSourceGenerator(
+        TableId.tableId("default_namespace", "default_schema", "default_table"),
+        Set("IntType", "FloatType", "DoubleType"),
+        true,
+        17
+      ),
       1000
     )
+
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     env.addSource(source).print().setParallelism(1)
     env.execute("Let's Test Phaker Source...")
@@ -43,11 +44,13 @@ class PhakerTest extends AnyFunSuite {
     // Setup value source
     val sourceConfig = new Configuration
     sourceConfig
-      .set(PhakerDataSourceOptions.TABLE_ID, "default_namespace.default_schema.default_table")
+      .set(
+        PhakerDataSourceOptions.TABLE_ID,
+        "default_namespace.default_schema.default_table"
+      )
       .set(PhakerDataSourceOptions.REJECTED_TYPES, "BinaryType,VarBinaryType")
-      .set[java.lang.Integer](PhakerDataSourceOptions.BATCH_COUNT, 1)
+      .set[java.lang.Integer](PhakerDataSourceOptions.RECORDS_PER_SECOND, 1)
       .set[java.lang.Integer](PhakerDataSourceOptions.MAX_COLUMN_COUNT, 50)
-      .set[java.lang.Integer](PhakerDataSourceOptions.SLEEP_TIME, 1000)
 
     val sourceDef = new SourceDef(PhakerDataFactory.IDENTIFIER, "Value Source", sourceConfig)