fix: limit rate by recordPerSec
yuxiqian committed Aug 15, 2024
1 parent a2f6dad commit 75f786f
Showing 7 changed files with 182 additions and 182 deletions.
README.md (3 changes: 1 addition & 2 deletions)
@@ -13,8 +13,7 @@ source:
   rejected.types: BinaryType,VarBinaryType # Exclude some data types if downstream could not handle them
   schema.evolve: true # Generate schema evolution events, too
   max.column.count: 50 # limit maximum column count
-  batch.count: 17 # how many data records should source emits in each batch
-  sleep.time: 1000 # sleep duration (in ms) between two consecutive batches
+  records.per.second: 17 # maximum records emitted each second
 ```
 ## example
src/main/scala/factory/PhakerDataFactory.scala (6 changes: 2 additions & 4 deletions)
@@ -26,8 +26,7 @@ class PhakerDataFactory extends DataSourceFactory {
       conf.get(REJECTED_TYPES).split(',').toSet,
       conf.get(SCHEMA_EVOLVE),
       conf.get(MAX_COLUMN_COUNT),
-      conf.get(BATCH_COUNT),
-      conf.get(SLEEP_TIME)
+      conf.get(RECORDS_PER_SECOND)
     )
   }

@@ -42,8 +41,7 @@ class PhakerDataFactory extends DataSourceFactory {
       REJECTED_TYPES,
       SCHEMA_EVOLVE,
       MAX_COLUMN_COUNT,
-      BATCH_COUNT,
-      SLEEP_TIME
+      RECORDS_PER_SECOND
     ).asJava
   }
 }
src/main/scala/source/PhakerDataSource.scala (16 changes: 8 additions & 8 deletions)
@@ -9,18 +9,18 @@ class PhakerDataSource(
     rejectedTypes: Set[String],
     schemaEvolve: Boolean,
     maxColumnCount: Int,
-    batchCount: Int,
-    sleepTime: Int
+    recordsPerSecond: Int
 ) extends DataSource {
   override def getEventSourceProvider: EventSourceProvider = {
     FlinkSourceFunctionProvider.of(
       new PhakerSourceFunction(
-        tableId,
-        rejectedTypes,
-        schemaEvolve,
-        maxColumnCount,
-        batchCount,
-        sleepTime
+        new PhakerSourceGenerator(
+          tableId,
+          rejectedTypes,
+          schemaEvolve,
+          maxColumnCount
+        ),
+        recordsPerSecond
       )
     )
   }
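Note on this file: the source no longer drives its own emit-and-sleep loop. Record generation moves into the new PhakerSourceGenerator, and pacing is delegated to the wrapped source function via recordsPerSecond. A minimal composition sketch (class names are from this commit; the TableId and option values are illustrative, not defaults):

```scala
// Sketch: how the refactored pieces compose. Values are illustrative;
// TableId.tableId is Flink CDC's factory method for table identifiers.
import org.apache.flink.cdc.common.event.TableId

val source = new PhakerSourceFunction(
  new PhakerSourceGenerator(
    TableId.tableId("default_namespace", "default_schema", "phaker_table"),
    rejectedTypes = Set("BinaryType", "VarBinaryType"),
    schemaEvolve = true,
    maxColumnCount = 50
  ),
  rowsPerSecond = 17L
)
```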
src/main/scala/source/PhakerDataSourceOptions.scala (14 changes: 4 additions & 10 deletions)
@@ -34,17 +34,11 @@ object PhakerDataSourceOptions {
"Max added columns count. No schema evolution events will be generated if this limit has exceeded. Defaults to 50."
)

val BATCH_COUNT: ConfigOption[lang.Integer] = ConfigOptions
.key("batch.count")
val RECORDS_PER_SECOND: ConfigOption[lang.Integer] = ConfigOptions
.key("records.per.second")
.intType()
.defaultValue(17)
.withDescription("Data records to be generated per batch. Defaults to 17.")

val SLEEP_TIME: ConfigOption[lang.Integer] = ConfigOptions
.key("sleep.time")
.intType()
.defaultValue(1000)
.defaultValue(60)
.withDescription(
"Sleep time for a while during each batch (in milliseconds). Defaults to 1000."
"Data records to be generated each second. Defaults to 60."
)
}
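Because the old and new definitions interleave in the hunk above, here is the post-commit shape of the option, reassembled for readability:

```scala
// RECORDS_PER_SECOND after this commit, reassembled from the hunk above.
val RECORDS_PER_SECOND: ConfigOption[lang.Integer] = ConfigOptions
  .key("records.per.second")
  .intType()
  .defaultValue(60)
  .withDescription(
    "Data records to be generated each second. Defaults to 60."
  )
```

Note that the default is 60, while the README example above uses 17.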
src/main/scala/source/PhakerSourceFunction.scala (153 changes: 9 additions & 144 deletions)
@@ -1,149 +1,14 @@
 package io.github.yuxiqian.phaker
 package source
 
-import source.PhakerDatabase.{colCount, idCount}
-
-import org.apache.flink.cdc.common.event._
-import org.apache.flink.cdc.common.schema.Column
-import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator
-import org.apache.flink.streaming.api.functions.source.SourceFunction
-
-import java.util
+import org.apache.flink.cdc.common.event.Event
+import org.apache.flink.streaming.api.functions.source.datagen.{DataGenerator, DataGeneratorSource}
 
 class PhakerSourceFunction(
-    tableId: TableId,
-    rejectedTypes: Set[String],
-    schemaEvolve: Boolean,
-    maxColumnCount: Int,
-    batchCount: Int,
-    sleepTime: Int
-) extends SourceFunction[Event] {
-
-  private type Context = SourceFunction.SourceContext[Event]
-
-  private var isRunning = true
-
-  override def run(ctx: Context): Unit = {
-
-    ctx.collect(
-      new CreateTableEvent(
-        tableId,
-        PhakerDatabase.genSchema
-      )
-    )
-
-    while (isRunning) {
-      PhakerDatabase.synchronized {
-        println("Emitting insert events...")
-        emitInsertEvents(ctx, batchCount)
-        emitSchemaEvolutionEvents(ctx)
-
-        println("Emitting update events...")
-        emitUpdateEvents(ctx, batchCount)
-        emitSchemaEvolutionEvents(ctx)
-
-        println("Emitting delete events...")
-        emitDeleteEvents(ctx, batchCount)
-        emitSchemaEvolutionEvents(ctx)
-      }
-      Thread.sleep(sleepTime)
-    }
-  }
-
-  private def emitInsertEvents(ctx: Context, count: Int): Unit = {
-    for (_ <- 0 until count) {
-      val insertedData = genRecord()
-      ctx.collect(
-        DataChangeEvent.insertEvent(tableId, insertedData)
-      )
-    }
-  }
-
-  private def emitUpdateEvents(ctx: Context, count: Int): Unit = {
-    for (_ <- 0 until count) {
-      val updateBeforeData = genRecord()
-      ctx.collect(
-        DataChangeEvent.insertEvent(tableId, updateBeforeData)
-      )
-
-      idCount.synchronized {
-        idCount -= 1
-      }
-
-      val updateAfterData = genRecord()
-      ctx.collect(
-        DataChangeEvent.updateEvent(tableId, updateBeforeData, updateAfterData)
-      )
-    }
-  }
-
-  private def emitDeleteEvents(ctx: Context, count: Int): Unit = {
-    for (_ <- 0 until count) {
-      val deleteBeforeData = genRecord()
-      ctx.collect(
-        DataChangeEvent.insertEvent(tableId, deleteBeforeData)
-      )
-
-      idCount.synchronized {
-        idCount -= 1
-      }
-
-      ctx.collect(
-        DataChangeEvent.deleteEvent(tableId, deleteBeforeData)
-      )
-    }
-  }
-
-  private def genRecord() = {
-    val generator = new BinaryRecordDataGenerator(
-      PhakerDatabase.columnList.map(_._2)
-    )
-    val rowData = PhakerDatabase.columnList
-      .map(col => PhakeDataGenerator.randomData(col._1, col._2))
-
-    println(s"Generated data record: ${rowData.mkString("Array(", ", ", ")")}")
-    generator.generate(
-      rowData
-    )
-  }
-
-  private def emitSchemaEvolutionEvents(ctx: Context): Unit = {
-
-    if (!schemaEvolve) { return }
-    if (colCount > maxColumnCount) {
-      return
-    }
-
-    println("Emitting schema change events...")
-
-    val addedColumnName = colCount.synchronized {
-      colCount += 1
-      s"column$colCount"
-    }
-    val addedColumnType = PhakeDataGenerator.randomType(rejectedTypes)
-
-    PhakerDatabase.columnList.synchronized {
-
-      PhakerDatabase.columnList :+= (addedColumnName, addedColumnType)
-      ctx.collect(
-        new AddColumnEvent(
-          tableId,
-          util.Arrays.asList(
-            new AddColumnEvent.ColumnWithPosition(
-              Column.physicalColumn(
-                addedColumnName,
-                addedColumnType
-              )
-            )
-          )
-        )
-      )
-    }
-
-    println(s"Done, new schema: ${PhakerDatabase.genSchema}")
-  }
-
-  override def cancel(): Unit = {
-    isRunning = false
-  }
-}
+    generator: DataGenerator[Event],
+    rowsPerSecond: Long
+) extends DataGeneratorSource[Event](
+      generator,
+      rowsPerSecond,
+      null
+    ) {}
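PhakerSourceFunction is now a thin wrapper over Flink's DataGeneratorSource, which enforces the rate; passing null as the third constructor argument (the row cap) leaves the source unbounded, so it runs until cancelled. Roughly, the base class paces each subtask to its share of rowsPerSecond by sleeping between emissions; a simplified sketch of that pacing idea (not Flink's actual implementation):

```scala
// Simplified illustration of per-second pacing, not Flink's actual code:
// emit up to `recordsPerSecond` items, then sleep out the rest of the second.
def paceLoop[T](recordsPerSecond: Long, next: () => T, emit: T => Unit): Unit = {
  while (true) {
    val windowStart = System.currentTimeMillis()
    var emitted = 0L
    while (emitted < recordsPerSecond) {
      emit(next())
      emitted += 1
    }
    val elapsed = System.currentTimeMillis() - windowStart
    if (elapsed < 1000L) Thread.sleep(1000L - elapsed)
  }
}
```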
src/main/scala/source/PhakerSourceGenerator.scala (141 changes: 141 additions & 0 deletions, new file)
@@ -0,0 +1,141 @@
package io.github.yuxiqian.phaker
package source

import source.PhakerDatabase.{colCount, idCount}

import org.apache.flink.cdc.common.event._
import org.apache.flink.cdc.common.schema.Column
import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator
import org.apache.flink.streaming.api.functions.source.datagen.RandomGenerator

import java.util

class PhakerSourceGenerator(
    tableId: TableId,
    rejectedTypes: Set[String],
    schemaEvolve: Boolean,
    maxColumnCount: Int
) extends RandomGenerator[Event] {

  private val cachedEvents: util.List[Event] = {
    val cache = new util.ArrayList[Event]
    cache.add(
      new CreateTableEvent(
        tableId,
        PhakerDatabase.genSchema
      )
    )
    cache
  }

  override def next(): Event = {
    cachedEvents.synchronized {
      if (cachedEvents.isEmpty) {
        pushEvents()
      }
      cachedEvents.remove(0)
    }
  }

  private def pushEvents(): Unit = {
    PhakerDatabase.synchronized {
      println("Emitting insert events...")

      {
        val insertedData = genRecord()
        cachedEvents.add(
          DataChangeEvent.insertEvent(tableId, insertedData)
        )
      }

      {
        val updateBeforeData = genRecord()
        cachedEvents.add(
          DataChangeEvent.insertEvent(tableId, updateBeforeData)
        )

        idCount.synchronized {
          idCount -= 1
        }

        val updateAfterData = genRecord()
        cachedEvents.add(
          DataChangeEvent.updateEvent(
            tableId,
            updateBeforeData,
            updateAfterData
          )
        )
      }

      {
        val deleteBeforeData = genRecord()
        cachedEvents.add(
          DataChangeEvent.insertEvent(tableId, deleteBeforeData)
        )

        idCount.synchronized {
          idCount -= 1
        }

        cachedEvents.add(
          DataChangeEvent.deleteEvent(tableId, deleteBeforeData)
        )
      }

      {
        emitSchemaEvolutionEvents().foreach(
          cachedEvents.add
        )
      }
    }
  }

  private def genRecord() = {
    val generator = new BinaryRecordDataGenerator(
      PhakerDatabase.columnList.map(_._2)
    )
    val rowData = PhakerDatabase.columnList
      .map(col => PhakeDataGenerator.randomData(col._1, col._2))

    println(s"Generated data record: ${rowData.mkString("Array(", ", ", ")")}")
    generator.generate(
      rowData
    )
  }

  private def emitSchemaEvolutionEvents(): Option[Event] = {

    if (!schemaEvolve) { return None }
    if (colCount > maxColumnCount) {
      return None
    }

    println("Emitting schema change events...")

    val addedColumnName = colCount.synchronized {
      colCount += 1
      s"column$colCount"
    }
    val addedColumnType = PhakeDataGenerator.randomType(rejectedTypes)

    PhakerDatabase.columnList.synchronized {
      PhakerDatabase.columnList :+= (addedColumnName, addedColumnType)
      println(s"Done, new schema: ${PhakerDatabase.genSchema}")
      Some(
        new AddColumnEvent(
          tableId,
          util.Arrays.asList(
            new AddColumnEvent.ColumnWithPosition(
              Column.physicalColumn(
                addedColumnName,
                addedColumnType
              )
            )
          )
        )
      )
    }
  }
}
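Behavior note: each pushEvents() call queues one insert, an insert/update pair, an insert/delete pair, and at most one schema-change event (five or six events in total), and next() drains the queue one event at a time, so the records.per.second limit applies to individual events rather than to whole batches. A quick way to inspect the stream (a sketch; it assumes next() needs no runtime context, since this generator does not appear to rely on RandomGenerator's open() being called first):

```scala
// Sketch: drain the initial CreateTableEvent plus one generated batch.
// Assumes next() can be called outside a running Flink job (see caveat above).
import org.apache.flink.cdc.common.event.TableId

val gen = new PhakerSourceGenerator(
  TableId.tableId("default_namespace", "default_schema", "phaker_table"),
  rejectedTypes = Set.empty,
  schemaEvolve = true,
  maxColumnCount = 50
)
(1 to 7).foreach(_ => println(gen.next()))
```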