diff --git a/docs/Changelog.md b/docs/Changelog.md
index e982b21cd5a..773d43d03e3 100644
--- a/docs/Changelog.md
+++ b/docs/Changelog.md
@@ -47,6 +47,8 @@
 some types of unallowed expressions.
 * [#6880](https://github.com/TouK/nussknacker/pull/6880) Performance optimization of generating Avro messages with unions -
   shorter message in logs
+* [#6886](https://github.com/TouK/nussknacker/pull/6886) Fix for "Illegal table name:$nuCatalog" error when using Apache Iceberg catalog.
+  Internal Nussknacker catalog is now named `_nu_catalog`
 
 ## 1.17
 
diff --git a/engine/flink/components/base-tests/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory b/engine/flink/components/base-tests/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory
index 670172264d6..8ea4b3044fe 100644
--- a/engine/flink/components/base-tests/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory
+++ b/engine/flink/components/base-tests/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -1 +1 @@
-pl.touk.nussknacker.engine.flink.table.definition.MockableCatalogFactory
+pl.touk.nussknacker.engine.flink.table.definition.StubbedCatalogFactory
diff --git a/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/table/definition/MockableCatalogFactory.scala b/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/table/definition/MockableCatalogFactory.scala
deleted file mode 100644
index 91d7b897d44..00000000000
--- a/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/table/definition/MockableCatalogFactory.scala
+++ /dev/null
@@ -1,30 +0,0 @@
-package pl.touk.nussknacker.engine.flink.table.definition
-
-import org.apache.flink.table.catalog.{Catalog, GenericInMemoryCatalog}
-import org.apache.flink.table.factories.CatalogFactory
-
-class MockableCatalogFactory extends CatalogFactory {
-
-  override def factoryIdentifier(): String = MockableCatalogFactory.catalogName
-
-  override def createCatalog(context: CatalogFactory.Context): Catalog = MockableCatalogFactory.catalog
-
-}
-
-// Warning: this implementation can't be used by concurrent threads
-object MockableCatalogFactory {
-
-  private val catalogName = "mockable"
-
-  @volatile
-  var catalog: GenericInMemoryCatalog = createCatalog
-
-  private def createCatalog = {
-    new GenericInMemoryCatalog(catalogName)
-  }
-
-  def resetCatalog(): Unit = {
-    catalog = createCatalog
-  }
-
-}
diff --git a/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/table/definition/StubbedCatalogFactory.scala b/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/table/definition/StubbedCatalogFactory.scala
new file mode 100644
index 00000000000..3e0d92510fe
--- /dev/null
+++ b/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/table/definition/StubbedCatalogFactory.scala
@@ -0,0 +1,44 @@
+package pl.touk.nussknacker.engine.flink.table.definition
+
+import org.apache.flink.table.api.{DataTypes, Schema}
+import org.apache.flink.table.catalog.{Catalog, CatalogTable, GenericInMemoryCatalog, ObjectPath}
+import org.apache.flink.table.factories.CatalogFactory
+
+import scala.jdk.CollectionConverters._
+
+class StubbedCatalogFactory extends CatalogFactory {
+
+  override def factoryIdentifier(): String = StubbedCatalogFactory.catalogName
+
+  override def createCatalog(context: CatalogFactory.Context): Catalog = StubbedCatalogFactory.catalog
+
+}
+
+object StubbedCatalogFactory {
+
+  val catalogName = "stubbed"
+
+  val sampleBoundedTablePath: ObjectPath = ObjectPath.fromString("default.sample_bounded_table")
+
+  val sampleBoundedTableNumberOfRows: Int = 10
+
+  val sampleColumnName = "fooColumn"
+
+  private val catalog: GenericInMemoryCatalog = populateCatalog(new GenericInMemoryCatalog(catalogName))
+
+  private def populateCatalog(inMemoryCatalog: GenericInMemoryCatalog): GenericInMemoryCatalog = {
+    val sampleBoundedTable = CatalogTable.of(
+      Schema.newBuilder().column(sampleColumnName, DataTypes.STRING()).build(),
+      null,
+      List.empty[String].asJava,
+      Map(
+        "connector" -> "datagen",
+        // to make it bounded
+        "number-of-rows" -> sampleBoundedTableNumberOfRows.toString
+      ).asJava
+    )
+    inMemoryCatalog.createTable(sampleBoundedTablePath, sampleBoundedTable, false)
+    inMemoryCatalog
+  }
+
+}
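For context on how the stub above is wired in: the `META-INF/services` entry registers `StubbedCatalogFactory` with Flink's factory SPI, so any Table API environment can materialize the shared in-memory catalog purely from a `type` option. A minimal sketch of that resolution path (not part of this PR; the catalog name `stubbed_catalog` is arbitrary):

```scala
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

val tableEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode())
// 'type' = 'stubbed' is matched against factoryIdentifier() via the SPI entry above
tableEnv.executeSql("CREATE CATALOG stubbed_catalog WITH ('type' = 'stubbed')")
tableEnv.executeSql("USE CATALOG stubbed_catalog")
// the pre-populated GenericInMemoryCatalog should already expose the bounded datagen table
tableEnv.executeSql("SHOW TABLES").print() // expected: sample_bounded_table
```

Note the design change versus `MockableCatalogFactory`: the catalog is immutable and pre-populated, so tests no longer need `resetCatalog()` and the old "can't be used by concurrent threads" caveat disappears.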
diff --git a/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/table/definition/TablesDefinitionDiscoveryTest.scala b/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/table/definition/TablesDefinitionDiscoveryTest.scala
index 5095302c62b..b0f4870198b 100644
--- a/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/table/definition/TablesDefinitionDiscoveryTest.scala
+++ b/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/table/definition/TablesDefinitionDiscoveryTest.scala
@@ -107,23 +107,19 @@ class TablesDefinitionDiscoveryTest
   }
 
   test("use catalog configuration in data definition") {
-    val catalogConfiguration = Configuration.fromMap(Map("type" -> "mockable").asJava)
-    val catalogTable = CatalogTable.of(
-      Schema.newBuilder().column("fooColumn", DataTypes.STRING()).build(),
-      null,
-      List.empty[String].asJava,
-      Map.empty[String, String].asJava
-    )
-    MockableCatalogFactory.resetCatalog()
-    MockableCatalogFactory.catalog.createTable(ObjectPath.fromString("default.fooTable"), catalogTable, false)
-    val flinkDataDefinition = FlinkDataDefinition.create(None, Some(catalogConfiguration)).validValue
+    val catalogConfiguration = Configuration.fromMap(Map("type" -> StubbedCatalogFactory.catalogName).asJava)
+    val flinkDataDefinition  = FlinkDataDefinition.create(None, Some(catalogConfiguration)).validValue
 
     val discovery = TablesDefinitionDiscovery.prepareDiscovery(flinkDataDefinition).validValue
 
     val tableDefinition = discovery.listTables.loneElement
-    tableDefinition.tableId.toString shouldBe s"`${FlinkDataDefinition.internalCatalogName}`.`default`.`fooTable`"
-    tableDefinition.schema shouldBe ResolvedSchema.of(Column.physical("fooColumn", DataTypes.STRING()))
+    tableDefinition.tableId.toString shouldBe s"`${FlinkDataDefinition.internalCatalogName}`." +
+      s"`${StubbedCatalogFactory.sampleBoundedTablePath.getDatabaseName}`." +
+      s"`${StubbedCatalogFactory.sampleBoundedTablePath.getObjectName}`"
+    tableDefinition.schema shouldBe ResolvedSchema.of(
+      Column.physical(StubbedCatalogFactory.sampleColumnName, DataTypes.STRING())
+    )
   }
 
 }
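Spelling out what the concatenated assertion resolves to (a worked illustration using this PR's constants, not additional test code):

```scala
import org.apache.flink.table.catalog.ObjectPath

val path = ObjectPath.fromString("default.sample_bounded_table")
path.getDatabaseName // "default"
path.getObjectName   // "sample_bounded_table"
// with internalCatalogName = "_nu_catalog", the expected id is therefore:
// `_nu_catalog`.`default`.`sample_bounded_table`
```

The backticks are Flink's identifier escaping; the catalog segment is Nussknacker's internal name because `FlinkDataDefinition` registers the configured catalog under it.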
+ + s"`${StubbedCatalogFactory.sampleBoundedTablePath.getObjectName}`" + tableDefinition.schema shouldBe ResolvedSchema.of( + Column.physical(StubbedCatalogFactory.sampleColumnName, DataTypes.STRING()) + ) } } diff --git a/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/table/source/TableSourceTest.scala b/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/table/source/TableSourceTest.scala index 8a9339808e3..36e2b17f08f 100644 --- a/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/table/source/TableSourceTest.scala +++ b/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/table/source/TableSourceTest.scala @@ -6,10 +6,12 @@ import org.apache.flink.types.Row import org.scalatest.LoneElement import org.scalatest.funsuite.AnyFunSuite import org.scalatest.matchers.should.Matchers -import pl.touk.nussknacker.engine.api.component.ComponentDefinition +import pl.touk.nussknacker.engine.api.NodeId +import pl.touk.nussknacker.engine.api.component.{ComponentDefinition, NodesDeploymentData, SqlFilteringExpression} import pl.touk.nussknacker.engine.api.process.ProcessObjectDependencies import pl.touk.nussknacker.engine.build.ScenarioBuilder import pl.touk.nussknacker.engine.flink.table.FlinkTableComponentProvider +import pl.touk.nussknacker.engine.flink.table.definition.{FlinkDataDefinition, StubbedCatalogFactory} import pl.touk.nussknacker.engine.flink.test.FlinkSpec import pl.touk.nussknacker.engine.flink.util.test.FlinkTestScenarioRunner import pl.touk.nussknacker.engine.process.FlinkJobConfig.ExecutionMode @@ -65,7 +67,7 @@ class TableSourceTest .withExtraComponents(tableComponents) .build() - test("be possible to user table declared inside a database other than the default one") { + test("be possible to use table declared inside a database other than the default one") { val scenario = ScenarioBuilder .streaming("test") .source("start", "table", "Table" -> s"'`default_catalog`.`testdb`.`tablewithqualifiedname`'".spel) @@ -76,4 +78,69 @@ class TableSourceTest result.successes.loneElement } + test("be possible to use nodes deployment data") { + val scenario = ScenarioBuilder + .streaming("test") + .source("start", "table", "Table" -> s"'`default_catalog`.`testdb`.`tablewithqualifiedname`'".spel) + .emptySink(s"end", TestScenarioRunner.testResultSink, "value" -> "#input".spel) + + val result = runner + .runWithoutData[Row]( + scenario, + nodesData = NodesDeploymentData(Map(NodeId("start") -> SqlFilteringExpression("true = true"))) + ) + .validValue + result.errors shouldBe empty + result.successes.loneElement + } + + test("be possible combine nodes deployment data with catalogs configuration") { + val configWithCatalogConfiguration = ConfigFactory.parseString( + s"""catalogConfiguration { + | type: ${StubbedCatalogFactory.catalogName} + |}""".stripMargin + ) + + val tableComponentsBasedOnCatalogConfiguration: List[ComponentDefinition] = + new FlinkTableComponentProvider().create( + configWithCatalogConfiguration, + ProcessObjectDependencies.withConfig(configWithCatalogConfiguration) + ) + + val runnerWithCatalogConfiguration: FlinkTestScenarioRunner = TestScenarioRunner + .flinkBased(ConfigFactory.empty(), flinkMiniCluster) + .withExecutionMode(ExecutionMode.Batch) + .withExtraComponents(tableComponentsBasedOnCatalogConfiguration) + .build() + + val scenario = ScenarioBuilder + .streaming("test") + .source( + "start", + "table", + "Table" -> 
(s"'`${FlinkDataDefinition.internalCatalogName}`." + + s"`${StubbedCatalogFactory.sampleBoundedTablePath.getDatabaseName}`." + + s"`${StubbedCatalogFactory.sampleBoundedTablePath.getObjectName}`'").spel + ) + .emptySink(s"end", TestScenarioRunner.testResultSink, "value" -> "#input".spel) + + val resultWithoutFiltering = runnerWithCatalogConfiguration + .runWithoutData[Row]( + scenario, + nodesData = NodesDeploymentData(Map(NodeId("start") -> SqlFilteringExpression("true = true"))) + ) + .validValue + resultWithoutFiltering.errors shouldBe empty + resultWithoutFiltering.successes should have size StubbedCatalogFactory.sampleBoundedTableNumberOfRows + + val resultWithFiltering = runnerWithCatalogConfiguration + .runWithoutData[Row]( + scenario, + nodesData = NodesDeploymentData(Map(NodeId("start") -> SqlFilteringExpression("true = false"))) + ) + .validValue + resultWithFiltering.errors shouldBe empty + resultWithFiltering.successes shouldBe empty + } + } diff --git a/engine/flink/components/table/src/main/scala/pl/touk/nussknacker/engine/flink/table/definition/FlinkDataDefinition.scala b/engine/flink/components/table/src/main/scala/pl/touk/nussknacker/engine/flink/table/definition/FlinkDataDefinition.scala index a97c5ea019b..f3240009f50 100644 --- a/engine/flink/components/table/src/main/scala/pl/touk/nussknacker/engine/flink/table/definition/FlinkDataDefinition.scala +++ b/engine/flink/components/table/src/main/scala/pl/touk/nussknacker/engine/flink/table/definition/FlinkDataDefinition.scala @@ -42,7 +42,9 @@ class FlinkDataDefinition private ( object FlinkDataDefinition { - private[definition] val internalCatalogName = "$nuCatalog" + // We can't user dollar ($) character in this name as some catalogs such as Apache Iceberg use it internally + // to split object paths + private[table] val internalCatalogName = "_nu_catalog" def create( sqlStatements: Option[List[String]], diff --git a/utils/flink-components-testkit/src/main/scala/pl/touk/nussknacker/engine/flink/util/test/FlinkTestScenarioRunner.scala b/utils/flink-components-testkit/src/main/scala/pl/touk/nussknacker/engine/flink/util/test/FlinkTestScenarioRunner.scala index ee2b902de08..f4b5b62c856 100644 --- a/utils/flink-components-testkit/src/main/scala/pl/touk/nussknacker/engine/flink/util/test/FlinkTestScenarioRunner.scala +++ b/utils/flink-components-testkit/src/main/scala/pl/touk/nussknacker/engine/flink/util/test/FlinkTestScenarioRunner.scala @@ -4,7 +4,7 @@ import com.typesafe.config.{Config, ConfigValueFactory} import org.apache.flink.api.connector.source.Boundedness import pl.touk.nussknacker.defaultmodel.DefaultConfigCreator import pl.touk.nussknacker.engine.api.ProcessVersion -import pl.touk.nussknacker.engine.api.component.ComponentDefinition +import pl.touk.nussknacker.engine.api.component.{ComponentDefinition, NodesDeploymentData} import pl.touk.nussknacker.engine.api.process.{ComponentUseCase, SourceFactory} import pl.touk.nussknacker.engine.api.typed.typing.{Typed, TypingResult, Unknown} import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess @@ -74,17 +74,23 @@ class FlinkTestScenarioRunner( ) extends ClassBasedTestScenarioRunner { override def runWithData[I: ClassTag, R](scenario: CanonicalProcess, data: List[I]): RunnerListResult[R] = { - runWithTestSourceComponent(scenario, testDataSourceComponent(data, Typed.typedClass[I], None)) + runWithTestSourceComponent( + scenario, + NodesDeploymentData.empty, + testDataSourceComponent(data, Typed.typedClass[I], None) + ) } def runWithData[I: ClassTag, R]( scenario: 
diff --git a/utils/flink-components-testkit/src/main/scala/pl/touk/nussknacker/engine/flink/util/test/FlinkTestScenarioRunner.scala b/utils/flink-components-testkit/src/main/scala/pl/touk/nussknacker/engine/flink/util/test/FlinkTestScenarioRunner.scala
index ee2b902de08..f4b5b62c856 100644
--- a/utils/flink-components-testkit/src/main/scala/pl/touk/nussknacker/engine/flink/util/test/FlinkTestScenarioRunner.scala
+++ b/utils/flink-components-testkit/src/main/scala/pl/touk/nussknacker/engine/flink/util/test/FlinkTestScenarioRunner.scala
@@ -4,7 +4,7 @@ import com.typesafe.config.{Config, ConfigValueFactory}
 import org.apache.flink.api.connector.source.Boundedness
 import pl.touk.nussknacker.defaultmodel.DefaultConfigCreator
 import pl.touk.nussknacker.engine.api.ProcessVersion
-import pl.touk.nussknacker.engine.api.component.ComponentDefinition
+import pl.touk.nussknacker.engine.api.component.{ComponentDefinition, NodesDeploymentData}
 import pl.touk.nussknacker.engine.api.process.{ComponentUseCase, SourceFactory}
 import pl.touk.nussknacker.engine.api.typed.typing.{Typed, TypingResult, Unknown}
 import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess
@@ -74,17 +74,23 @@
 ) extends ClassBasedTestScenarioRunner {
 
   override def runWithData[I: ClassTag, R](scenario: CanonicalProcess, data: List[I]): RunnerListResult[R] = {
-    runWithTestSourceComponent(scenario, testDataSourceComponent(data, Typed.typedClass[I], None))
+    runWithTestSourceComponent(
+      scenario,
+      NodesDeploymentData.empty,
+      testDataSourceComponent(data, Typed.typedClass[I], None)
+    )
   }
 
   def runWithData[I: ClassTag, R](
       scenario: CanonicalProcess,
       data: List[I],
       boundedness: Boundedness = Boundedness.CONTINUOUS_UNBOUNDED,
-      timestampAssigner: Option[TimestampWatermarkHandler[I]] = None
+      timestampAssigner: Option[TimestampWatermarkHandler[I]] = None,
+      nodesData: NodesDeploymentData = NodesDeploymentData.empty
   ): RunnerListResult[R] = {
     runWithTestSourceComponent(
       scenario,
+      nodesData,
       testDataSourceComponent(data, Typed.typedClass[I], timestampAssigner, boundedness)
     )
   }
@@ -94,16 +100,19 @@
       data: List[I],
       inputType: TypingResult,
       boundedness: Boundedness = Boundedness.CONTINUOUS_UNBOUNDED,
-      timestampAssigner: Option[TimestampWatermarkHandler[I]] = None
+      timestampAssigner: Option[TimestampWatermarkHandler[I]] = None,
+      nodesData: NodesDeploymentData = NodesDeploymentData.empty
   ): RunnerListResult[R] = {
     runWithTestSourceComponent(
       scenario,
+      nodesData,
       testDataSourceComponent(data, inputType, timestampAssigner, boundedness)
     )
   }
 
   private def runWithTestSourceComponent[I: ClassTag, R](
       scenario: CanonicalProcess,
+      nodesData: NodesDeploymentData = NodesDeploymentData.empty,
       testDataSourceComponent: ComponentDefinition
   ): RunnerListResult[R] = {
     val testComponents = testDataSourceComponent :: noopSourceComponent :: Nil
@@ -111,32 +120,39 @@
     Using.resource(
       TestExtensionsHolder
         .registerTestExtensions(components ++ testComponents, testResultSinkComponentCreator :: Nil, globalVariables)
     ) { testComponentHolder =>
-      run[R](scenario, testComponentHolder)
+      run[R](scenario, nodesData, testComponentHolder)
     }
   }
 
   /**
    * Can be used to test Flink bounded sources - we wait for the scenario to finish.
    */
-  def runWithoutData[R](scenario: CanonicalProcess): RunnerListResult[R] = {
+  def runWithoutData[R](
+      scenario: CanonicalProcess,
+      nodesData: NodesDeploymentData = NodesDeploymentData.empty
+  ): RunnerListResult[R] = {
     val testComponents = noopSourceComponent :: Nil
     Using.resource(
       TestExtensionsHolder
         .registerTestExtensions(components ++ testComponents, testResultSinkComponentCreator :: Nil, globalVariables)
     ) { testComponentHolder =>
-      run[R](scenario, testComponentHolder)
+      run[R](scenario, nodesData, testComponentHolder)
     }
   }
 
   /**
    * Can be used to test Flink based sinks.
    */
-  def runWithDataIgnoringResults[I: ClassTag](scenario: CanonicalProcess, data: List[I]): RunnerResultUnit = {
+  def runWithDataIgnoringResults[I: ClassTag](
+      scenario: CanonicalProcess,
+      data: List[I],
+      nodesData: NodesDeploymentData = NodesDeploymentData.empty
+  ): RunnerResultUnit = {
     val testComponents = testDataSourceComponent(data, Typed.typedClass[I], None) :: noopSourceComponent :: Nil
     Using.resource(
       TestExtensionsHolder.registerTestExtensions(components ++ testComponents, List.empty, globalVariables)
     ) { testComponentHolder =>
-      run[AnyRef](scenario, testComponentHolder).map { case RunListResult(errors, _) =>
+      run[AnyRef](scenario, nodesData, testComponentHolder).map { case RunListResult(errors, _) =>
         RunUnitResult(errors)
       }
     }
@@ -144,6 +160,7 @@
   private def run[OUTPUT](
       scenario: CanonicalProcess,
+      nodesData: NodesDeploymentData,
       testExtensionsHolder: TestExtensionsHolder
   ): RunnerListResult[OUTPUT] = {
     val modelData = LocalModelData(
@@ -186,7 +203,7 @@
       env,
       scenario,
       ProcessVersion.empty,
-      DeploymentData.empty,
+      DeploymentData.empty.copy(nodesData = nodesData),
       testScenarioCollectorHandler.resultCollector
     )
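Because every new `nodesData` parameter defaults to `NodesDeploymentData.empty`, existing callers of the testkit compile and behave exactly as before; tests opt in explicitly. A usage sketch mirroring `TableSourceTest` above (names taken from this PR):

```scala
// Unchanged call sites: deployment data stays empty
runner.runWithoutData[Row](scenario)

// Opt-in: attach a SQL filtering expression to the "start" source node
runner.runWithoutData[Row](
  scenario,
  nodesData = NodesDeploymentData(Map(NodeId("start") -> SqlFilteringExpression("true = true")))
)
```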