Skip to content

Commit

Permalink
fix: non-schema evolve mode
Browse files Browse the repository at this point in the history
  • Loading branch information
yuxiqian committed Aug 16, 2024
1 parent a44b8e6 commit 722ce2d
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 34 deletions.
12 changes: 5 additions & 7 deletions src/main/scala/source/PhakerDataSource.scala
Original file line number Diff line number Diff line change
Expand Up
@@ -15,13 +15,11 @@
class PhakerDataSource(
override def getEventSourceProvider: EventSourceProvider = {
FlinkSourceFunctionProvider.of(
new PhakerSourceFunction(
new PhakerSourceGenerator(
tableId,
rejectedTypes,
schemaEvolve,
generateNonNullColumns,
maxColumnCount
),
tableId,
rejectedTypes,
schemaEvolve,
generateNonNullColumns,
maxColumnCount,
recordsPerSecond
)
)
Expand Down
18 changes: 14 additions & 4 deletions src/main/scala/source/PhakerSourceFunction.scala
Original file line number Diff line number Diff line change
@@ -1,14 +1,24 @@
package io.github.yuxiqian.phaker
package source

import org.apache.flink.cdc.common.event.Event
import org.apache.flink.streaming.api.functions.source.datagen.{DataGenerator, DataGeneratorSource}
import org.apache.flink.cdc.common.event.{Event, TableId}
import org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource

class PhakerSourceFunction(
generator: DataGenerator[Event],
tableId: TableId,
rejectedTypes: Set[String],
schemaEvolve: Boolean,
generateNonNullColumns: Boolean,
maxColumnCount: Int,
rowsPerSecond: Long
) extends DataGeneratorSource[Event](
generator,
new PhakerSourceGenerator(
tableId,
rejectedTypes,
schemaEvolve,
generateNonNullColumns,
maxColumnCount
),
rowsPerSecond,
null
) {}
39 changes: 26 additions & 13 deletions src/main/scala/source/PhakerSourceGenerator.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ package source

import source.PhakerDatabase.{colCount, idCount}

import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.cdc.common.event._
import org.apache.flink.cdc.common.schema.Column
import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator
import org.apache.flink.runtime.state.FunctionInitializationContext
import org.apache.flink.streaming.api.functions.source.datagen.RandomGenerator

import java.util
Expand All
@@ -18,22 +20,31 @@
class PhakerSourceGenerator(
maxColumnCount: Int
) extends RandomGenerator[Event] {

private val cachedEvents: util.List[Event] = {
private val cachedEvents = new util.ArrayList[Event]();

override def open(
name: String,
context: FunctionInitializationContext,
runtimeContext: RuntimeContext
): Unit = {
super.open(name, context, runtimeContext)
if (!schemaEvolve) {
PhakerDatabase.columnList ++=
PhakeDataGenerator
.possibleChoices(rejectedTypes)
.zipWithIndex
.map(t => (s"column${t._2}_${t._1.getClass.getSimpleName}", t._1))
PhakerDatabase.columnList.synchronized {
PhakerDatabase.columnList ++=
PhakeDataGenerator
.possibleChoices(rejectedTypes)
.zipWithIndex
.map(t =>
(s"column${t._2 + 1}_${t._1.getClass.getSimpleName}", t._1)
)
}
}
val cache = new util.ArrayList[Event]
cache.add(
cachedEvents.add(
new CreateTableEvent(
tableId,
PhakerDatabase.genSchema
)
)
cache
}

override def next(): Event = {
Expand All
@@ -47,8 +58,6 @@

private def pushEvents(): Unit = {
PhakerDatabase.synchronized {
println("Emitting insert events...")

{
val insertedData = genRecord()
cachedEvents.add(
Expand Down
Expand Up
@@ -102,12 +111,16 @@

private def genRecord() = {
val generator = new BinaryRecordDataGenerator(
PhakerDatabase.columnList.map(_._2)
PhakerDatabase.columnList.synchronized {
PhakerDatabase.columnList.map(_._2)
}
)
val rowData = PhakerDatabase.columnList
.map(col => PhakeDataGenerator.randomData(col._1, col._2))

println(s"Generated data record: ${rowData.mkString("Array(", ", ", ")")}")
println(
s"Generated data record (${rowData.length}): ${rowData.mkString("Array(", ", ", ")")}"
)
generator.generate(
rowData
)
Expand Down
18 changes: 8 additions & 10 deletions src/test/scala/PhakerTest.scala
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package io.github.yuxiqian.phaker

import factory.PhakerDataFactory
import source.{PhakerSourceFunction, PhakerSourceGenerator}
import source.PhakerSourceFunction

import org.apache.flink.cdc.common.configuration.Configuration
import org.apache.flink.cdc.common.event.TableId
Expand All
@@ -20,18 +20,16 @@
class PhakerTest extends AnyFunSuite {
test("Phaker source test") {

val source = new PhakerSourceFunction(
new PhakerSourceGenerator(
TableId.tableId("default_namespace", "default_schema", "default_table"),
Set("IntType", "FloatType", "DoubleType"),
true,
true,
17
),
TableId.tableId("default_namespace", "default_schema", "default_table"),
Set("IntType", "FloatType", "DoubleType"),
false,
true,
17,
1
)

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.addSource(source).print().setParallelism(1)
env.addSource(source).setParallelism(1).print().setParallelism(1)
env.execute("Let's Test Phaker Source...")
}

Expand All
@@ -52,7 +50,7 @@
.set(PhakerDataSourceOptions.REJECTED_TYPES, "BinaryType,VarBinaryType")
.set[java.lang.Integer](PhakerDataSourceOptions.RECORDS_PER_SECOND, 1)
.set[java.lang.Boolean](PhakerDataSourceOptions.NON_NULL_COLUMNS, true)
.set[java.lang.Boolean](PhakerDataSourceOptions.SCHEMA_EVOLVE, true)
.set[java.lang.Boolean](PhakerDataSourceOptions.SCHEMA_EVOLVE, false)
.set[java.lang.Integer](PhakerDataSourceOptions.MAX_COLUMN_COUNT, 50)

val sourceDef =
Expand Down

0 comments on commit 722ce2d

Please sign in to comment.