add: allow rejecting types & minor cleanup

yuxiqian committed Aug 14, 2024
1 parent b72249c commit 4d9fb3d
Showing 9 changed files with 70 additions and 70 deletions.
13 changes: 6 additions & 7 deletions README.md
@@ -9,13 +9,12 @@
 ```yaml
 source:
   type: phaker
-  namespace.name: default_namespace
-  schema.name: default_schema
-  table.name: table_name
-  schema.evolve: true # Generate schema evolution events, too
-  max.column.count: 50 # limit maximum column count
-  batch.count: 17 # how many data records should source emits in each batch
-  sleep.time: 1000 # sleep duration (in ms) between two consecutive batches
+  table.id: default_namespace.default_schema.table_name # Fully qualified Table ID
+  rejected.types: BinaryType,VarBinaryType # Exclude data types that downstream systems cannot handle
+  schema.evolve: true # Generate schema evolution events, too
+  max.column.count: 50 # limit the maximum column count
+  batch.count: 17 # how many data records the source emits in each batch
+  sleep.time: 1000 # sleep duration (in ms) between two consecutive batches
 ```
 ## example
17 changes: 12 additions & 5 deletions src/main/scala/factory/PhakerDataFactory.scala
@@ -5,6 +5,7 @@ import source.PhakerDataSource
 import source.PhakerDataSourceOptions._
 
 import org.apache.flink.cdc.common.configuration.ConfigOption
+import org.apache.flink.cdc.common.event.TableId
 import org.apache.flink.cdc.common.factories.{DataSourceFactory, Factory}
 import org.apache.flink.cdc.common.source.DataSource
@@ -18,11 +19,11 @@ object PhakerDataFactory
 class PhakerDataFactory extends DataSourceFactory {
 
   override def createDataSource(context: Factory.Context): DataSource = {
-
     val conf = context.getFactoryConfiguration
     new PhakerDataSource(
-      conf.get(NAMESPACE_NAME),
-      conf.get(SCHEMA_NAME),
-      conf.get(TABLE_NAME),
+      TableId.parse(conf.get(TABLE_ID)),
+      conf.get(REJECTED_TYPES).split(',').toSet,
       conf.get(SCHEMA_EVOLVE),
       conf.get(MAX_COLUMN_COUNT),
       conf.get(BATCH_COUNT),
@@ -33,10 +34,16 @@ class PhakerDataFactory extends DataSourceFactory {
   override def identifier(): String = PhakerDataFactory.IDENTIFIER
 
   override def requiredOptions(): util.Set[ConfigOption[_]] = {
-    Set[ConfigOption[_]](NAMESPACE_NAME, SCHEMA_NAME, TABLE_NAME).asJava
+    Set[ConfigOption[_]](TABLE_ID).asJava
   }
 
   override def optionalOptions(): util.Set[ConfigOption[_]] = {
-    Set[ConfigOption[_]](SCHEMA_EVOLVE, MAX_COLUMN_COUNT, BATCH_COUNT, SLEEP_TIME).asJava
+    Set[ConfigOption[_]](
+      REJECTED_TYPES,
+      SCHEMA_EVOLVE,
+      MAX_COLUMN_COUNT,
+      BATCH_COUNT,
+      SLEEP_TIME
+    ).asJava
   }
 }
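Note on the two new options: the factory parses `table.id` with `TableId.parse` and splits `rejected.types` on commas into a `Set`. Below is a minimal, dependency-free sketch of how those raw strings behave; `parseRejectedTypes` and its trimming are illustrative assumptions, not part of this commit. One quirk worth knowing: with the raw `split` used above, an empty `rejected.types` yields a set containing a single empty string, which matches no type name and therefore rejects nothing.

```scala
// Sketch only: mirrors the factory's option handling without Flink on the classpath.
object OptionParsingSketch extends App {
  // Same shape as the factory above: conf.get(REJECTED_TYPES).split(',').toSet
  def rawSplit(s: String): Set[String] = s.split(',').toSet

  // Hypothetical hardened variant that drops blanks and stray whitespace.
  def parseRejectedTypes(s: String): Set[String] =
    s.split(',').map(_.trim).filter(_.nonEmpty).toSet

  println(rawSplit(""))                         // prints Set() — it holds one empty string
  println(rawSplit("BinaryType,VarBinaryType")) // Set(BinaryType, VarBinaryType)
  println(parseRejectedTypes(" BinaryType , ")) // Set(BinaryType)
}
```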
4 changes: 2 additions & 2 deletions src/main/scala/source/PhakeDataGenerator.scala
@@ -11,7 +11,7 @@ import java.time.{Instant, ZonedDateTime}
 import scala.util.Random
 
 object PhakeDataGenerator {
-  def randomType: DataType = {
+  def randomType(rejectedTypes: Set[String]): DataType = {
     val choices = List(
       DataTypes.BINARY(17 + Random.nextInt(100)),
       DataTypes.VARBINARY(17 + Random.nextInt(100)),
@@ -29,7 +29,7 @@
       DataTypes.TIMESTAMP(Random.nextInt(10)),
       DataTypes.TIMESTAMP_TZ(Random.nextInt(10)),
       DataTypes.TIMESTAMP_LTZ(Random.nextInt(10))
-    )
+    ).filterNot(t => rejectedTypes.contains(t.getClass.getSimpleName))
     choices(Random.nextInt(choices.length))
   }
 
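Because `randomType` filters on `getClass.getSimpleName`, the strings passed via `rejected.types` must be the simple class names of the concrete `DataType` implementations (e.g. `BinaryType` for `DataTypes.BINARY(n)`). Here is a self-contained sketch of the same `filterNot` idea using stand-in types rather than Flink CDC's `DataTypes`; the stand-in names are illustrative only.

```scala
// Stand-in types, not Flink CDC's: they only demonstrate rejection by simple class name.
sealed trait FakeDataType
final case class BinaryType(length: Int) extends FakeDataType
final case class VarBinaryType(length: Int) extends FakeDataType
final case class IntType() extends FakeDataType

object TypeRejectionSketch extends App {
  val rejected = Set("BinaryType", "VarBinaryType")

  // Same shape as randomType's filter: drop candidates whose simple class name is rejected.
  val choices: List[FakeDataType] = List(BinaryType(17), VarBinaryType(42), IntType())
    .filterNot(t => rejected.contains(t.getClass.getSimpleName))

  println(choices) // List(IntType())
}
```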
10 changes: 5 additions & 5 deletions src/main/scala/source/PhakerDataSource.scala
@@ -5,9 +5,8 @@ import org.apache.flink.cdc.common.event.TableId
 import org.apache.flink.cdc.common.source.{DataSource, EventSourceProvider, FlinkSourceFunctionProvider, MetadataAccessor}
 
 class PhakerDataSource(
-    namespaceName: String,
-    schemaName: String,
-    tableName: String,
+    tableId: TableId,
+    rejectedTypes: Set[String],
     schemaEvolve: Boolean,
     maxColumnCount: Int,
     batchCount: Int,
@@ -16,7 +15,8 @@
   override def getEventSourceProvider: EventSourceProvider = {
     FlinkSourceFunctionProvider.of(
       new PhakerSourceFunction(
-        TableId.tableId(namespaceName, schemaName, tableName),
+        tableId,
+        rejectedTypes,
         schemaEvolve,
         maxColumnCount,
         batchCount,
@@ -26,6 +26,6 @@
   }
 
   override def getMetadataAccessor: MetadataAccessor = {
-    new PhakerMetadataAccessor(namespaceName, schemaName, tableName)
+    new PhakerMetadataAccessor(tableId)
   }
 }
29 changes: 13 additions & 16 deletions src/main/scala/source/PhakerDataSourceOptions.scala
@@ -6,23 +6,17 @@ import org.apache.flink.cdc.common.configuration.{ConfigOption, ConfigOptions}
 import java.lang
 
 object PhakerDataSourceOptions {
-  val NAMESPACE_NAME: ConfigOption[lang.String] = ConfigOptions
-    .key("namespace.name")
+  val TABLE_ID: ConfigOption[lang.String] = ConfigOptions
+    .key("table.id")
     .stringType()
     .noDefaultValue()
-    .withDescription("Namespace name of the simulated table.")
+    .withDescription("Table ID of the simulated table.")
 
-  val SCHEMA_NAME: ConfigOption[lang.String] = ConfigOptions
-    .key("schema.name")
+  val REJECTED_TYPES: ConfigOption[lang.String] = ConfigOptions
+    .key("rejected.types")
     .stringType()
-    .noDefaultValue()
-    .withDescription("Schema name of the simulated table.")
-
-  val TABLE_NAME: ConfigOption[lang.String] = ConfigOptions
-    .key("table.name")
-    .stringType()
-    .noDefaultValue()
-    .withDescription("Name of the simulated table.")
+    .defaultValue("")
+    .withDescription("Unwanted data types (e.g. BinaryType), separated by commas.")
 
   val SCHEMA_EVOLVE: ConfigOption[lang.Boolean] = ConfigOptions
     .key("schema.evolve")
@@ -36,8 +30,9 @@
.key("max.column.count")
.intType()
.defaultValue(50)
.withDescription("Max added columns count. No schema evolution events will be generated if this limit has exceeded. Defaults to 50.")

.withDescription(
"Max added columns count. No schema evolution events will be generated if this limit has exceeded. Defaults to 50."
)

val BATCH_COUNT: ConfigOption[lang.Integer] = ConfigOptions
.key("batch.count")
@@ -49,5 +44,7 @@
.key("sleep.time")
.intType()
.defaultValue(1000)
.withDescription("Sleep time for a while during each batch (in milliseconds). Defaults to 1000.")
.withDescription(
"Sleep time for a while during each batch (in milliseconds). Defaults to 1000."
)
}
3 changes: 2 additions & 1 deletion src/main/scala/source/PhakerDatabase.scala
@@ -4,6 +4,7 @@ package source
 import org.apache.flink.cdc.common.schema.{Column, Schema}
 import org.apache.flink.cdc.common.types.{DataType, DataTypes}
 
+import java.util
 object PhakerDatabase {
   val primaryKey: String = "id"
   var columnList: Array[(String, DataType)] = Array(
@@ -14,7 +15,7 @@

   def genSchema: Schema = {
     PhakerDatabase.synchronized {
-      import java.util
+
       Schema
         .newBuilder()
         .setColumns(
22 changes: 8 additions & 14 deletions src/main/scala/source/PhakerMetadataAccessor.scala
@@ -9,30 +9,24 @@ import java.util
 import scala.collection.JavaConverters._
 
 class PhakerMetadataAccessor(
-    namespaceName: String,
-    schemaName: String,
-    tableName: String
+    tableId: TableId
 ) extends MetadataAccessor {
 
-  def apply(
-      namespaceName: String,
-      schemaName: String,
-      tableName: String
-  ): PhakerMetadataAccessor =
-    new PhakerMetadataAccessor(namespaceName, schemaName, tableName)
-
-  override def listNamespaces(): util.List[String] = List(namespaceName).asJava
+  override def listNamespaces(): util.List[String] = List(
+    tableId.getNamespace
+  ).asJava
 
   override def listSchemas(namespace: String): util.List[String] = {
-    if (namespace == namespaceName) List(schemaName) else List.empty[String]
+    if (namespace == tableId.getNamespace) List(tableId.getSchemaName)
+    else List.empty[String]
   }.asJava
 
   override def listTables(
       namespace: String,
       schema: String
   ): util.List[TableId] = {
-    if (namespace == namespaceName && schema == schemaName)
-      List(TableId.tableId(namespaceName, schemaName, tableName))
+    if (namespace == tableId.getNamespace && schema == tableId.getSchemaName)
+      List(tableId)
     else List.empty[TableId]
   }.asJava

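`PhakerMetadataAccessor` now answers every metadata query from the single `tableId`. As a rough stand-in for how a fully qualified, three-part Table ID decomposes (Flink CDC's real `TableId.parse` also accepts shorter forms; `SimpleTableId` below is a hypothetical name handling only the full form):

```scala
// Hypothetical stand-in for a three-part table identifier, not the Flink CDC class.
final case class SimpleTableId(namespace: String, schemaName: String, tableName: String) {
  override def toString: String = s"$namespace.$schemaName.$tableName"
}

object SimpleTableId {
  // Only the fully qualified namespace.schema.table form is handled here.
  def parse(id: String): SimpleTableId = id.split('.') match {
    case Array(ns, schema, table) => SimpleTableId(ns, schema, table)
    case _ => throw new IllegalArgumentException(s"expected namespace.schema.table, got: $id")
  }
}

object TableIdSketch extends App {
  val id = SimpleTableId.parse("default_namespace.default_schema.default_table")
  println(id.namespace)  // default_namespace
  println(id.schemaName) // default_schema
  println(id.tableName)  // default_table
}
```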
36 changes: 19 additions & 17 deletions src/main/scala/source/PhakerSourceFunction.scala
@@ -1,15 +1,18 @@
 package io.github.yuxiqian.phaker
 package source
 
-import source.PhakerDatabase.idCount
+import source.PhakerDatabase.{colCount, idCount}
 
 import org.apache.flink.cdc.common.event._
 import org.apache.flink.cdc.common.schema.Column
 import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator
 import org.apache.flink.streaming.api.functions.source.SourceFunction
 
+import java.util
+
 class PhakerSourceFunction(
     tableId: TableId,
+    rejectedTypes: Set[String],
     schemaEvolve: Boolean,
     maxColumnCount: Int,
     batchCount: Int,
@@ -54,19 +57,6 @@
     }
   }
 
-  private def genRecord() = {
-    val generator = new BinaryRecordDataGenerator(
-      PhakerDatabase.columnList.map(_._2).toArray
-    )
-    val rowData = PhakerDatabase.columnList
-      .map(col => PhakeDataGenerator.randomData(col._1, col._2))
-
-    println(s"Generated data record: ${rowData.mkString("Array(", ", ", ")")}")
-    generator.generate(
-      rowData
-    )
-  }
-
   private def emitUpdateEvents(ctx: Context, count: Int): Unit = {
     for (_ <- 0 until count) {
       val updateBeforeData = genRecord()
@@ -85,6 +75,19 @@
     }
   }
 
+  private def genRecord() = {
+    val generator = new BinaryRecordDataGenerator(
+      PhakerDatabase.columnList.map(_._2).toArray
+    )
+    val rowData = PhakerDatabase.columnList
+      .map(col => PhakeDataGenerator.randomData(col._1, col._2))
+
+    println(s"Generated data record: ${rowData.mkString("Array(", ", ", ")")}")
+    generator.generate(
+      rowData
+    )
+  }
+
   private def emitDeleteEvents(ctx: Context, count: Int): Unit = {
     for (_ <- 0 until count) {
       val deleteBeforeData = genRecord()
@@ -103,7 +106,6 @@
   }
 
   private def emitSchemaEvolutionEvents(ctx: Context): Unit = {
-    import source.PhakerDatabase.colCount
 
     if (!schemaEvolve) { return }
     if (colCount > maxColumnCount) {
@@ -116,10 +118,10 @@
       colCount += 1
       s"column$colCount"
     }
-    val addedColumnType = PhakeDataGenerator.randomType
+    val addedColumnType = PhakeDataGenerator.randomType(rejectedTypes)
 
     PhakerDatabase.columnList.synchronized {
-      import java.util
+
       PhakerDatabase.columnList :+= (addedColumnName, addedColumnType)
       ctx.collect(
         new AddColumnEvent(
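Moving `genRecord` below `emitUpdateEvents` is pure reordering; the method body is unchanged. For readers skimming the hunks, here is a dependency-free sketch of the emit-loop shape that `emitUpdateEvents` follows; `Collector` and `Update` are stand-ins rather than the Flink types, and generating the before- and after-images independently is how this sketch models a purely random source.

```scala
import scala.util.Random

// Stand-ins for the collector and event types (names are assumptions).
trait Collector[T] { def collect(t: T): Unit }
final case class Update(before: String, after: String)

object EmitLoopSketch extends App {
  def genRecord(): String = s"record-${Random.nextInt(1000)}"

  // Each call emits `count` update events; in this sketch the before- and
  // after-images are generated independently of each other.
  def emitUpdateEvents(ctx: Collector[Update], count: Int): Unit =
    for (_ <- 0 until count) {
      val updateBeforeData = genRecord()
      val updateAfterData  = genRecord()
      ctx.collect(Update(updateBeforeData, updateAfterData))
    }

  val printer = new Collector[Update] { def collect(t: Update): Unit = println(t) }
  emitUpdateEvents(printer, count = 3)
}
```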
6 changes: 3 additions & 3 deletions src/test/scala/PhakerTest.scala
@@ -22,6 +22,7 @@ class PhakerTest extends AnyFunSuite
test("Phaker source test") {
val source = new PhakerSourceFunction(
TableId.tableId("default_namespace", "default_schema", "default_table"),
Set("IntType", "FloatType", "DoubleType"),
true,
17,
17,
@@ -42,9 +43,8 @@
     // Setup value source
     val sourceConfig = new Configuration
     sourceConfig
-      .set(PhakerDataSourceOptions.NAMESPACE_NAME, "default_namespace")
-      .set(PhakerDataSourceOptions.SCHEMA_NAME, "default_schema")
-      .set(PhakerDataSourceOptions.TABLE_NAME, "default_table")
+      .set(PhakerDataSourceOptions.TABLE_ID, "default_namespace.default_schema.default_table")
+      .set(PhakerDataSourceOptions.REJECTED_TYPES, "BinaryType,VarBinaryType")
       .set[java.lang.Integer](PhakerDataSourceOptions.BATCH_COUNT, 1)
       .set[java.lang.Integer](PhakerDataSourceOptions.MAX_COLUMN_COUNT, 50)
       .set[java.lang.Integer](PhakerDataSourceOptions.SLEEP_TIME, 1000)