diff --git a/build.sbt b/build.sbt index fbaf1a8e2fa..5ffb4a97a78 100644 --- a/build.sbt +++ b/build.sbt @@ -280,8 +280,8 @@ lazy val commonSettings = // Note: when updating check versions in 'flink*V' below, because some libraries must be fixed at versions provided // by Flink, or jobs may fail in runtime when Flink is run with 'classloader.resolve-order: parent-first'. // You can find versions provided by Flink in it's lib/flink-dist-*.jar/META-INF/DEPENDENCIES file. -val flinkV = "1.19.1" -val flinkConnectorKafkaV = "3.2.0-1.19" +val flinkV = "1.20.0" +val flinkConnectorKafkaV = "3.3.0-1.20" val flinkCommonsCompressV = "1.26.0" val flinkCommonsLang3V = "3.12.0" val flinkCommonsTextV = "1.10.0" diff --git a/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/process/FlinkSink.scala b/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/process/FlinkSink.scala index be42b9cfbad..27da42b9735 100644 --- a/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/process/FlinkSink.scala +++ b/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/process/FlinkSink.scala @@ -1,5 +1,6 @@ package pl.touk.nussknacker.engine.flink.api.process +import com.github.ghik.silencer.silent import org.apache.flink.api.common.functions.FlatMapFunction import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSink} @@ -66,5 +67,6 @@ trait BasicFlinkSink extends FlinkSink with ExplicitUidInOperatorsSupport { helper: FlinkLazyParameterFunctionHelper ): FlatMapFunction[Context, ValueWithContext[Value]] + @silent("deprecated") def toFlinkFunction(flinkNodeContext: FlinkCustomNodeContext): SinkFunction[Value] } diff --git a/engine/flink/components-utils/src/main/scala/pl/touk/nussknacker/engine/flink/util/sink/EmptySink.scala 
b/engine/flink/components-utils/src/main/scala/pl/touk/nussknacker/engine/flink/util/sink/EmptySink.scala index 863b73a15ee..3925220c055 100644 --- a/engine/flink/components-utils/src/main/scala/pl/touk/nussknacker/engine/flink/util/sink/EmptySink.scala +++ b/engine/flink/components-utils/src/main/scala/pl/touk/nussknacker/engine/flink/util/sink/EmptySink.scala @@ -1,5 +1,6 @@ package pl.touk.nussknacker.engine.flink.util.sink +import com.github.ghik.silencer.silent import org.apache.flink.api.common.functions.FlatMapFunction import org.apache.flink.streaming.api.functions.sink.{DiscardingSink, SinkFunction} import pl.touk.nussknacker.engine.api.{Context, ValueWithContext} @@ -20,6 +21,8 @@ trait EmptySink extends BasicFlinkSink { ): FlatMapFunction[Context, ValueWithContext[AnyRef]] = (_, _) => {} + @silent("deprecated") override def toFlinkFunction(flinkNodeContext: FlinkCustomNodeContext): SinkFunction[AnyRef] = new DiscardingSink[AnyRef] + } diff --git a/engine/flink/components-utils/src/main/scala/pl/touk/nussknacker/engine/flink/util/sink/SingleValueSinkFactory.scala b/engine/flink/components-utils/src/main/scala/pl/touk/nussknacker/engine/flink/util/sink/SingleValueSinkFactory.scala index 70bf24ca041..c51142130d3 100644 --- a/engine/flink/components-utils/src/main/scala/pl/touk/nussknacker/engine/flink/util/sink/SingleValueSinkFactory.scala +++ b/engine/flink/components-utils/src/main/scala/pl/touk/nussknacker/engine/flink/util/sink/SingleValueSinkFactory.scala @@ -1,5 +1,6 @@ package pl.touk.nussknacker.engine.flink.util.sink +import com.github.ghik.silencer.silent import org.apache.flink.api.common.functions.FlatMapFunction import org.apache.flink.streaming.api.functions.sink.SinkFunction import pl.touk.nussknacker.engine.api.process.SinkFactory @@ -16,6 +17,7 @@ object SingleValueSinkFactory { final val SingleValueParamName = "Value" } +@silent("deprecated") class SingleValueSinkFactory[T <: AnyRef](sink: => SinkFunction[T]) extends SinkFactory with 
Serializable { @MethodToInvoke diff --git a/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/table/definition/StubbedCatalogFactory.scala b/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/table/definition/StubbedCatalogFactory.scala index 3e0d92510fe..6412100cc5d 100644 --- a/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/table/definition/StubbedCatalogFactory.scala +++ b/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/table/definition/StubbedCatalogFactory.scala @@ -1,5 +1,6 @@ package pl.touk.nussknacker.engine.flink.table.definition +import com.github.ghik.silencer.silent import org.apache.flink.table.api.{DataTypes, Schema} import org.apache.flink.table.catalog.{Catalog, CatalogTable, GenericInMemoryCatalog, ObjectPath} import org.apache.flink.table.factories.CatalogFactory @@ -26,6 +27,7 @@ object StubbedCatalogFactory { private val catalog: GenericInMemoryCatalog = populateCatalog(new GenericInMemoryCatalog(catalogName)) + @silent("deprecated") private def populateCatalog(inMemoryCatalog: GenericInMemoryCatalog): GenericInMemoryCatalog = { val sampleBoundedTable = CatalogTable.of( Schema.newBuilder().column(sampleColumnName, DataTypes.STRING()).build(), diff --git a/engine/flink/components/table/src/main/scala/pl/touk/nussknacker/engine/flink/table/sink/TableSink.scala b/engine/flink/components/table/src/main/scala/pl/touk/nussknacker/engine/flink/table/sink/TableSink.scala index 032a10dad64..0cbbee307ad 100644 --- a/engine/flink/components/table/src/main/scala/pl/touk/nussknacker/engine/flink/table/sink/TableSink.scala +++ b/engine/flink/components/table/src/main/scala/pl/touk/nussknacker/engine/flink/table/sink/TableSink.scala @@ -1,5 +1,6 @@ package pl.touk.nussknacker.engine.flink.table.sink +import com.github.ghik.silencer.silent import org.apache.flink.api.common.functions.{RichFlatMapFunction, RuntimeContext} 
import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.typeutils.ResultTypeQueryable @@ -38,6 +39,7 @@ class TableSink( ) } + @silent("deprecated") override def registerSink( dataStream: DataStream[ValueWithContext[Value]], flinkNodeContext: FlinkCustomNodeContext diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/registrar/CollectingSinkFunction.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/registrar/CollectingSinkFunction.scala index 3ceb0d135d1..79fd6724c97 100644 --- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/registrar/CollectingSinkFunction.scala +++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/registrar/CollectingSinkFunction.scala @@ -1,5 +1,6 @@ package pl.touk.nussknacker.engine.process.registrar +import com.github.ghik.silencer.silent import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction} import pl.touk.nussknacker.engine.api.component.{ComponentType, NodeComponentInfo} import pl.touk.nussknacker.engine.api.ValueWithContext @@ -7,6 +8,7 @@ import pl.touk.nussknacker.engine.process.ExceptionHandlerFunction import pl.touk.nussknacker.engine.process.compiler.FlinkProcessCompilerData import pl.touk.nussknacker.engine.testmode.SinkInvocationCollector +@silent("deprecated") private[registrar] class CollectingSinkFunction[T]( val compilerDataForClassloader: ClassLoader => FlinkProcessCompilerData, collectingSink: SinkInvocationCollector, diff --git a/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/helpers/ProcessTestHelpers.scala b/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/helpers/ProcessTestHelpers.scala index 5bd2dd1435d..27f674e6316 100644 --- a/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/helpers/ProcessTestHelpers.scala +++ 
b/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/helpers/ProcessTestHelpers.scala @@ -1,5 +1,6 @@ package pl.touk.nussknacker.engine.process.helpers +import com.github.ghik.silencer.silent import com.typesafe.config.Config import org.apache.flink.api.common.functions.FlatMapFunction import org.apache.flink.streaming.api.functions.sink.SinkFunction @@ -135,6 +136,7 @@ case object SinkAccessingNodeContext extends EmptySink with Serializable { def nodeId: String = _nodeId + @silent("deprecated") override def toFlinkFunction(flinkNodeContext: FlinkCustomNodeContext): SinkFunction[AnyRef] = { _nodeId = flinkNodeContext.nodeId super.toFlinkFunction(flinkNodeContext) diff --git a/engine/flink/kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/generic/DelayedFlinkKafkaConsumer.scala b/engine/flink/kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/generic/DelayedFlinkKafkaConsumer.scala index 263c9abe958..ba114a84b52 100644 --- a/engine/flink/kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/generic/DelayedFlinkKafkaConsumer.scala +++ b/engine/flink/kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/generic/DelayedFlinkKafkaConsumer.scala @@ -46,6 +46,7 @@ import scala.jdk.CollectionConverters._ object DelayedFlinkKafkaConsumer { + @silent("deprecated") def apply[T]( topics: NonEmptyList[PreparedKafkaTopic[TopicName.ForSource]], schema: KafkaDeserializationSchema[T], @@ -88,6 +89,7 @@ object DelayedFlinkKafkaConsumer { } } + @silent("deprecated") type ExtractTimestampForDelay[T] = (KafkaTopicPartitionState[T, TopicPartition], T, Long) => Long } @@ -154,6 +156,7 @@ object DelayedKafkaFetcher { private val maxSleepTime = time.Duration.of(30, ChronoUnit.SECONDS).toMillis } +@silent("deprecated") class DelayedKafkaFetcher[T]( sourceContext: SourceFunction.SourceContext[T], assignedPartitionsWithInitialOffsets: util.Map[KafkaTopicPartition, lang.Long], @@ -188,6 +191,7 @@ 
class DelayedKafkaFetcher[T]( with LazyLogging { import DelayedKafkaFetcher._ + @silent("deprecated") override def emitRecordsWithTimestamps( records: util.Queue[T], partitionState: KafkaTopicPartitionState[T, TopicPartition], diff --git a/engine/flink/kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/serialization/FlinkSerializationSchemaConversions.scala b/engine/flink/kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/serialization/FlinkSerializationSchemaConversions.scala index cf9c0935e71..cb8a8e9f661 100644 --- a/engine/flink/kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/serialization/FlinkSerializationSchemaConversions.scala +++ b/engine/flink/kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/serialization/FlinkSerializationSchemaConversions.scala @@ -1,5 +1,6 @@ package pl.touk.nussknacker.engine.kafka.serialization +import com.github.ghik.silencer.silent import com.typesafe.scalalogging.LazyLogging import org.apache.flink.api.common.serialization.DeserializationSchema import org.apache.flink.api.common.typeinfo.TypeInformation @@ -23,6 +24,7 @@ object FlinkSerializationSchemaConversions extends LazyLogging { ): FlinkDeserializationSchemaWrapper[T] = new FlinkDeserializationSchemaWrapper[T](deserializationSchema) + @silent("deprecated") class FlinkDeserializationSchemaWrapper[T](deserializationSchema: serialization.KafkaDeserializationSchema[T]) extends kafka.KafkaDeserializationSchema[T] { @@ -73,6 +75,7 @@ object FlinkSerializationSchemaConversions extends LazyLogging { } + @silent("deprecated") def wrapToFlinkSerializationSchema[T]( serializationSchema: serialization.KafkaSerializationSchema[T] ): kafka.KafkaSerializationSchema[T] = new kafka.KafkaSerializationSchema[T] { diff --git a/engine/flink/kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/serialization/NKKafkaDeserializationSchemaWrapper.scala 
b/engine/flink/kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/serialization/NKKafkaDeserializationSchemaWrapper.scala index 2157813bd77..31b4a1012ed 100644 --- a/engine/flink/kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/serialization/NKKafkaDeserializationSchemaWrapper.scala +++ b/engine/flink/kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/serialization/NKKafkaDeserializationSchemaWrapper.scala @@ -1,5 +1,6 @@ package pl.touk.nussknacker.engine.kafka.serialization +import com.github.ghik.silencer.silent import org.apache.flink.api.common.serialization.DeserializationSchema import org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper import org.apache.kafka.clients.consumer.ConsumerRecord @@ -8,6 +9,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord // and for Flink's compatibility reasons we don't want to invoke deserialize(ConsumerRecord message, Collector out) // because it wasn't available in previous Flink's versions @SerialVersionUID(7952681455584002102L) +@silent("deprecated") class NKKafkaDeserializationSchemaWrapper[T](deserializationSchema: DeserializationSchema[T]) extends KafkaDeserializationSchemaWrapper[T](deserializationSchema) { diff --git a/engine/flink/kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/sink/flink/FlinkKafkaSink.scala b/engine/flink/kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/sink/flink/FlinkKafkaSink.scala index 6b9b1ab00b0..5ebeced80de 100644 --- a/engine/flink/kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/sink/flink/FlinkKafkaSink.scala +++ b/engine/flink/kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/sink/flink/FlinkKafkaSink.scala @@ -1,5 +1,6 @@ package pl.touk.nussknacker.engine.kafka.sink.flink +import com.github.ghik.silencer.silent import org.apache.flink.api.common.functions.FlatMapFunction import 
org.apache.flink.streaming.api.functions.sink.SinkFunction import pl.touk.nussknacker.engine.api.process.TopicName @@ -31,6 +32,7 @@ class FlinkKafkaSink( ): FlatMapFunction[Context, ValueWithContext[AnyRef]] = helper.lazyMapFunction(value) + @silent("deprecated") override def toFlinkFunction(flinkNodeContext: FlinkCustomNodeContext): SinkFunction[AnyRef] = PartitionByKeyFlinkKafkaProducer(kafkaConfig, topic.prepared, serializationSchema, clientId) diff --git a/engine/flink/management/dev-model/src/main/scala/pl/touk/nussknacker/engine/management/sample/DevProcessConfigCreator.scala b/engine/flink/management/dev-model/src/main/scala/pl/touk/nussknacker/engine/management/sample/DevProcessConfigCreator.scala index 0ce3696e419..dd11028cd7a 100644 --- a/engine/flink/management/dev-model/src/main/scala/pl/touk/nussknacker/engine/management/sample/DevProcessConfigCreator.scala +++ b/engine/flink/management/dev-model/src/main/scala/pl/touk/nussknacker/engine/management/sample/DevProcessConfigCreator.scala @@ -3,6 +3,7 @@ package pl.touk.nussknacker.engine.management.sample import com.cronutils.model.CronType import com.cronutils.model.definition.CronDefinitionBuilder import com.cronutils.parser.CronParser +import com.github.ghik.silencer.silent import io.circe.parser.decode import io.circe.{Decoder, Encoder} import org.apache.flink.api.common.serialization.{DeserializationSchema, SimpleStringSchema} @@ -58,6 +59,7 @@ class DevProcessConfigCreator extends ProcessConfigCreator { private def all[T](value: T): WithCategories[T] = WithCategories(value, "Category1", "Category2", "DevelopmentTests", "Periodic") + @silent("deprecated") override def sinkFactories( modelDependencies: ProcessObjectDependencies ): Map[String, WithCategories[SinkFactory]] = { diff --git a/engine/flink/management/src/main/java/org/apache/flink/util/MdcUtils.java b/engine/flink/management/src/main/java/org/apache/flink/util/MdcUtils.java new file mode 100644 index 00000000000..cfef3dabb70 --- /dev/null 
+++ b/engine/flink/management/src/main/java/org/apache/flink/util/MdcUtils.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.util; +import org.apache.flink.api.common.JobID; + +import org.slf4j.MDC; + +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; + +import static org.apache.flink.util.Preconditions.checkArgument; + +/** Utility class to manage common Flink attributes in {@link MDC} (only {@link JobID} ATM). */ +// TODO: temporary patch for poc +public class MdcUtils { + + public static final String JOB_ID = "flink-job-id"; + + /** + * Replace MDC contents with the provided one and return a closeable object that can be used to + * restore the original MDC. + * + * @param context to put into MDC + */ + public static MdcCloseable withContext(Map<String, String> context) { + final Map<String, String> orig = MDC.getCopyOfContextMap(); + final Map<String, String> safeOrig = orig != null ?
orig : Collections.emptyMap(); + MDC.setContextMap(context); + return () -> MDC.setContextMap(safeOrig); + } + + /** {@link AutoCloseable } that restores the {@link MDC} contents on close. */ + public interface MdcCloseable extends AutoCloseable { + @Override + void close(); + } + + /** + * Wrap the given {@link Runnable} so that the given data is added to {@link MDC} before its + * execution and removed afterward. + */ + public static Runnable wrapRunnable(Map<String, String> contextData, Runnable command) { + return () -> { + try (MdcCloseable ctx = withContext(contextData)) { + command.run(); + } + }; + } + + /** + * Wrap the given {@link Callable} so that the given data is added to {@link MDC} before its + * execution and removed afterward. + */ + public static <T> Callable<T> wrapCallable( + Map<String, String> contextData, Callable<T> command) { + return () -> { + try (MdcCloseable ctx = withContext(contextData)) { + return command.call(); + } + }; + } + + /** + * Wrap the given {@link Executor} so that the given {@link JobID} is added before it executes + * any submitted commands and removed afterward. + */ + public static Executor scopeToJob(JobID jobID, Executor executor) { + checkArgument(!(executor instanceof MdcAwareExecutor)); + return new MdcAwareExecutor<>(executor, asContextData(jobID)); + } + + /** + * Wrap the given {@link ExecutorService} so that the given {@link JobID} is added before it + * executes any submitted commands and removed afterward. + */ + public static ExecutorService scopeToJob(JobID jobID, ExecutorService delegate) { + checkArgument(!(delegate instanceof MdcAwareExecutorService)); + return new MdcAwareExecutorService<>(delegate, asContextData(jobID)); + } + + /** + * Wrap the given {@link ScheduledExecutorService} so that the given {@link JobID} is added + * before it executes any submitted commands and removed afterward. 
+ */ + public static ScheduledExecutorService scopeToJob(JobID jobID, ScheduledExecutorService ses) { + checkArgument(!(ses instanceof MdcAwareScheduledExecutorService)); + return new MdcAwareScheduledExecutorService(ses, asContextData(jobID)); + } + + public static Map<String, String> asContextData(JobID jobID) { + return Collections.singletonMap(JOB_ID, jobID.toHexString()); + } +} diff --git a/engine/flink/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/sink/flink/FlinkKafkaUniversalSink.scala b/engine/flink/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/sink/flink/FlinkKafkaUniversalSink.scala index a7563c0ac54..318750b19aa 100644 --- a/engine/flink/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/sink/flink/FlinkKafkaUniversalSink.scala +++ b/engine/flink/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/sink/flink/FlinkKafkaUniversalSink.scala @@ -1,5 +1,6 @@ package pl.touk.nussknacker.engine.schemedkafka.sink.flink +import com.github.ghik.silencer.silent import com.typesafe.scalalogging.LazyLogging import io.confluent.kafka.schemaregistry.ParsedSchema import org.apache.flink.api.common.functions.{RichMapFunction, RuntimeContext} @@ -55,6 +56,7 @@ class FlinkKafkaUniversalSink( ) ds.flatMap(new KeyedValueMapper(flinkNodeContext.lazyParameterHelper, key, value), typeInfo) } + @silent("deprecated") private def toFlinkFunction: SinkFunction[KeyedValue[AnyRef, AnyRef]] = { PartitionByKeyFlinkKafkaProducer(kafkaConfig, preparedTopic.prepared, serializationSchema, clientId) } diff --git a/engine/flink/test-utils/src/main/scala/pl/touk/nussknacker/engine/process/helpers/SampleNodes.scala b/engine/flink/test-utils/src/main/scala/pl/touk/nussknacker/engine/process/helpers/SampleNodes.scala index 8a564ffe275..84f84e90423 100644 --- a/engine/flink/test-utils/src/main/scala/pl/touk/nussknacker/engine/process/helpers/SampleNodes.scala +++ 
b/engine/flink/test-utils/src/main/scala/pl/touk/nussknacker/engine/process/helpers/SampleNodes.scala @@ -542,6 +542,7 @@ object SampleNodes { override def canBeEnding: Boolean = true + @silent("deprecated") @MethodToInvoke(returnType = classOf[String]) def execute(@ParamName("param") @Nullable param: LazyParameter[String]) = FlinkCustomStreamTransformation((start: DataStream[Context], context: FlinkCustomNodeContext) => { @@ -570,6 +571,7 @@ object SampleNodes { ): FlatMapFunction[Context, ValueWithContext[String]] = (ctx, collector) => collector.collect(ValueWithContext(serializableValue, ctx)) + @silent("deprecated") override def toFlinkFunction(flinkCustomNodeContext: FlinkCustomNodeContext): SinkFunction[String] = new SinkFunction[String] { override def invoke(value: String, context: SinkFunction.Context): Unit = resultsHolder.add(value) diff --git a/engine/flink/test-utils/src/main/scala/pl/touk/nussknacker/engine/process/helpers/SinkForType.scala b/engine/flink/test-utils/src/main/scala/pl/touk/nussknacker/engine/process/helpers/SinkForType.scala index c4975d79845..2b1bf856578 100644 --- a/engine/flink/test-utils/src/main/scala/pl/touk/nussknacker/engine/process/helpers/SinkForType.scala +++ b/engine/flink/test-utils/src/main/scala/pl/touk/nussknacker/engine/process/helpers/SinkForType.scala @@ -1,5 +1,6 @@ package pl.touk.nussknacker.engine.process.helpers +import com.github.ghik.silencer.silent import org.apache.flink.streaming.api.functions.sink.SinkFunction import pl.touk.nussknacker.engine.api.process.SinkFactory import pl.touk.nussknacker.engine.flink.util.sink.SingleValueSinkFactory @@ -12,6 +13,7 @@ object SinkForType { } +@silent("deprecated") class SinkForTypeFunction[T <: AnyRef](resultsHolder: => TestResultsHolder[T]) extends SinkFunction[T] { override def invoke(value: T, context: SinkFunction.Context): Unit = { diff --git a/examples/installation/docker-compose.yml b/examples/installation/docker-compose.yml index a8e977f117a..e166c5de64d 100644 
--- a/examples/installation/docker-compose.yml +++ b/examples/installation/docker-compose.yml @@ -210,7 +210,7 @@ services: build: context: ./flink/ args: - FLINK_VERSION: "1.19.1-scala_2.12-java11" + FLINK_VERSION: "1.20.0-scala_2.12-java11" restart: unless-stopped command: jobmanager environment: @@ -226,7 +226,7 @@ services: build: context: ./flink/ args: - FLINK_VERSION: "1.19.1-scala_2.12-java11" + FLINK_VERSION: "1.20.0-scala_2.12-java11" restart: unless-stopped command: taskmanager environment: diff --git a/utils/flink-components-testkit/src/main/scala/pl/touk/nussknacker/engine/flink/util/test/TestResultSinkFactory.scala b/utils/flink-components-testkit/src/main/scala/pl/touk/nussknacker/engine/flink/util/test/TestResultSinkFactory.scala index 05b0ddaebfb..0f2e55932ec 100644 --- a/utils/flink-components-testkit/src/main/scala/pl/touk/nussknacker/engine/flink/util/test/TestResultSinkFactory.scala +++ b/utils/flink-components-testkit/src/main/scala/pl/touk/nussknacker/engine/flink/util/test/TestResultSinkFactory.scala @@ -1,6 +1,7 @@ package pl.touk.nussknacker.engine.flink.util.test import cats.data.NonEmptyList +import com.github.ghik.silencer.silent import org.apache.flink.api.common.functions.FlatMapFunction import org.apache.flink.streaming.api.functions.sink.SinkFunction import pl.touk.nussknacker.engine.api._ @@ -49,6 +50,7 @@ object TestResultSinkFactory { ): FlatMapFunction[Context, ValueWithContext[Value]] = helper.lazyMapFunction(value) + @silent("deprecated") override def toFlinkFunction(flinkNodeContext: FlinkCustomNodeContext): SinkFunction[Value] = new SinkFunction[Value] {