diff --git a/src/main/scala/factory/PhakerDataFactory.scala b/src/main/scala/factory/PhakerDataFactory.scala
index df07862..37f0695 100644
--- a/src/main/scala/factory/PhakerDataFactory.scala
+++ b/src/main/scala/factory/PhakerDataFactory.scala
@@ -25,6 +25,7 @@ class PhakerDataFactory extends DataSourceFactory {
       TableId.parse(conf.get(TABLE_ID)),
       conf.get(REJECTED_TYPES).split(',').toSet,
       conf.get(SCHEMA_EVOLVE),
+      conf.get(NON_NULL_COLUMNS),
       conf.get(MAX_COLUMN_COUNT),
       conf.get(RECORDS_PER_SECOND)
     )
@@ -40,6 +41,7 @@ class PhakerDataFactory extends DataSourceFactory {
     Set[ConfigOption[_]](
       REJECTED_TYPES,
       SCHEMA_EVOLVE,
+      NON_NULL_COLUMNS,
       MAX_COLUMN_COUNT,
       RECORDS_PER_SECOND
     ).asJava
diff --git a/src/main/scala/source/PhakeDataGenerator.scala b/src/main/scala/source/PhakeDataGenerator.scala
index 5e07dc0..eb9e25d 100644
--- a/src/main/scala/source/PhakeDataGenerator.scala
+++ b/src/main/scala/source/PhakeDataGenerator.scala
@@ -11,8 +11,9 @@ import java.time.{Instant, ZonedDateTime}
 import scala.util.Random
 
 object PhakeDataGenerator {
-  def randomType(rejectedTypes: Set[String]): DataType = {
-    val choices = List(
+
+  def possibleChoices(rejectedTypes: Set[String]): List[DataType] = {
+    List(
       DataTypes.BINARY(17 + Random.nextInt(100)),
       DataTypes.VARBINARY(17 + Random.nextInt(100)),
       DataTypes.BOOLEAN,
@@ -30,10 +31,25 @@ object PhakeDataGenerator {
       DataTypes.TIMESTAMP_TZ(Random.nextInt(10)),
       DataTypes.TIMESTAMP_LTZ(Random.nextInt(10))
     ).filterNot(t => rejectedTypes.contains(t.getClass.getSimpleName))
-    choices(Random.nextInt(choices.length))
+  }
+
+  def randomType(
+      rejectedTypes: Set[String],
+      generateNonNullColumns: Boolean
+  ): DataType = {
+    val choices = possibleChoices(rejectedTypes)
+    val resultType = choices(Random.nextInt(choices.length))
+    if (generateNonNullColumns && Random.nextBoolean()) {
+      resultType.notNull
+    } else {
+      resultType.nullable
+    }
   }
 
   def randomData(name: String, dataType: DataType): AnyRef = {
+    if (dataType.isNullable && Random.nextBoolean()) {
+      return null
+    }
     if (name == PhakerDatabase.primaryKey) {
       return idCount
         .synchronized {
diff --git a/src/main/scala/source/PhakerDataSource.scala b/src/main/scala/source/PhakerDataSource.scala
index 8489c1c..938b628 100644
--- a/src/main/scala/source/PhakerDataSource.scala
+++ b/src/main/scala/source/PhakerDataSource.scala
@@ -8,6 +8,7 @@ class PhakerDataSource(
     tableId: TableId,
     rejectedTypes: Set[String],
     schemaEvolve: Boolean,
+    generateNonNullColumns: Boolean,
     maxColumnCount: Int,
     recordsPerSecond: Int
 ) extends DataSource {
@@ -18,6 +19,7 @@ class PhakerDataSource(
         tableId,
         rejectedTypes,
         schemaEvolve,
+        generateNonNullColumns,
         maxColumnCount
       ),
       recordsPerSecond
diff --git a/src/main/scala/source/PhakerDataSourceOptions.scala b/src/main/scala/source/PhakerDataSourceOptions.scala
index c577bca..1aef3b6 100644
--- a/src/main/scala/source/PhakerDataSourceOptions.scala
+++ b/src/main/scala/source/PhakerDataSourceOptions.scala
@@ -26,6 +26,14 @@ object PhakerDataSourceOptions {
       "Whether generate schema evolution events occasionally. Defaults to true."
     )
 
+  val NON_NULL_COLUMNS: ConfigOption[lang.Boolean] = ConfigOptions
+    .key("non.null.columns")
+    .booleanType()
+    .defaultValue(false)
+    .withDescription(
+      "Whether to generate non-nullable columns. Defaults to false."
+    )
+
   val MAX_COLUMN_COUNT: ConfigOption[lang.Integer] = ConfigOptions
     .key("max.column.count")
     .intType()
diff --git a/src/main/scala/source/PhakerSourceGenerator.scala b/src/main/scala/source/PhakerSourceGenerator.scala
index 84f3ef2..228fcab 100644
--- a/src/main/scala/source/PhakerSourceGenerator.scala
+++ b/src/main/scala/source/PhakerSourceGenerator.scala
@@ -14,10 +14,18 @@ class PhakerSourceGenerator(
     tableId: TableId,
     rejectedTypes: Set[String],
     schemaEvolve: Boolean,
+    generateNonNullColumns: Boolean,
     maxColumnCount: Int
 ) extends RandomGenerator[Event] {
 
   private val cachedEvents: util.List[Event] = {
+    if (!schemaEvolve) {
+      PhakerDatabase.columnList ++=
+        PhakeDataGenerator
+          .possibleChoices(rejectedTypes)
+          .zipWithIndex
+          .map(t => (s"column${t._2}_${t._1.getClass.getSimpleName}", t._1))
+    }
     val cache = new util.ArrayList[Event]
     cache.add(
       new CreateTableEvent(
@@ -114,12 +122,13 @@ class PhakerSourceGenerator(
 
     println("Emitting schema change events...")
 
+    val addedColumnType =
+      PhakeDataGenerator.randomType(rejectedTypes, generateNonNullColumns)
+
     val addedColumnName = colCount.synchronized {
       colCount += 1
-      s"column$colCount"
+      s"column${colCount}_${addedColumnType.getClass.getSimpleName}"
     }
-    val addedColumnType = PhakeDataGenerator.randomType(rejectedTypes)
-
     PhakerDatabase.columnList.synchronized {
       PhakerDatabase.columnList :+= (addedColumnName, addedColumnType)
       println(s"Done, new schema: ${PhakerDatabase.genSchema}")
diff --git a/src/test/scala/PhakerTest.scala b/src/test/scala/PhakerTest.scala
index 5f9f3a5..e77907e 100644
--- a/src/test/scala/PhakerTest.scala
+++ b/src/test/scala/PhakerTest.scala
@@ -24,9 +24,10 @@ class PhakerTest extends AnyFunSuite {
       TableId.tableId("default_namespace", "default_schema", "default_table"),
       Set("IntType", "FloatType", "DoubleType"),
       true,
+      true,
       17
     ),
-    1000
+    1
   )
 
   val env = StreamExecutionEnvironment.getExecutionEnvironment
@@ -50,6 +51,8 @@ class PhakerTest extends AnyFunSuite {
     )
     .set(PhakerDataSourceOptions.REJECTED_TYPES, "BinaryType,VarBinaryType")
     .set[java.lang.Integer](PhakerDataSourceOptions.RECORDS_PER_SECOND, 1)
+    .set[java.lang.Boolean](PhakerDataSourceOptions.NON_NULL_COLUMNS, true)
+    .set[java.lang.Boolean](PhakerDataSourceOptions.SCHEMA_EVOLVE, true)
     .set[java.lang.Integer](PhakerDataSourceOptions.MAX_COLUMN_COUNT, 50)
 
   val sourceDef =
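
For reviewers, here is a minimal standalone sketch of the nullability behaviour this patch wires in. It uses plain `scala.util.Random` and a hypothetical `ColumnSpec` model rather than the actual Flink CDC `DataType` API, and the names (`NullabilitySketch`, `randomColumn`, `randomValue`) are illustrative only: with `non.null.columns` enabled, a freshly added column is marked non-nullable with 50% probability, and nullable columns then emit `null` for roughly half of the generated rows.

```scala
import scala.util.Random

// Simplified, hypothetical model of the patch's behaviour; the real code
// operates on Flink CDC DataType objects instead of this case class.
object NullabilitySketch {
  final case class ColumnSpec(name: String, nullable: Boolean)

  // Mirrors randomType: when the feature is enabled, flip a coin for non-nullability.
  def randomColumn(name: String, generateNonNullColumns: Boolean): ColumnSpec =
    ColumnSpec(name, nullable = !(generateNonNullColumns && Random.nextBoolean()))

  // Mirrors randomData: nullable columns yield null roughly half of the time.
  def randomValue(spec: ColumnSpec): Option[Int] =
    if (spec.nullable && Random.nextBoolean()) None
    else Some(Random.nextInt(100))

  def main(args: Array[String]): Unit = {
    val col = randomColumn("column1", generateNonNullColumns = true)
    println(s"$col -> ${randomValue(col)}")
  }
}
```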