[NU-1872] Bump Flink to 1.20 #7137

Draft
wants to merge 3 commits into base: staging
4 changes: 2 additions & 2 deletions build.sbt
@@ -280,8 +280,8 @@ lazy val commonSettings =
// Note: when updating, check versions in 'flink*V' below, because some libraries must be fixed at versions provided
// by Flink, or jobs may fail at runtime when Flink is run with 'classloader.resolve-order: parent-first'.
// You can find versions provided by Flink in its lib/flink-dist-*.jar/META-INF/DEPENDENCIES file.
-val flinkV = "1.19.1"
-val flinkConnectorKafkaV = "3.2.0-1.19"
+val flinkV = "1.20.0"
+val flinkConnectorKafkaV = "3.3.0-1.20"
val flinkCommonsCompressV = "1.26.0"
val flinkCommonsLang3V = "3.12.0"
val flinkCommonsTextV = "1.10.0"
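The comment above is the reason the commons libraries are pinned: they must stay at exactly the versions bundled with the Flink distribution. Below is a minimal sketch of how these version vals are typically consumed in dependency declarations; the sequence name and exact module list are illustrative, not the project's actual build.sbt entries.

// Illustrative only: keep transitive commons libraries at the versions shipped in
// Flink's lib/flink-dist-*.jar, so jobs survive 'classloader.resolve-order: parent-first'.
lazy val exampleFlinkDeps = Seq(
  "org.apache.flink"   % "flink-streaming-java"  % flinkV % Provided,
  "org.apache.flink"   % "flink-connector-kafka" % flinkConnectorKafkaV,
  "org.apache.commons" % "commons-compress"      % flinkCommonsCompressV,
  "org.apache.commons" % "commons-lang3"         % flinkCommonsLang3V,
  "org.apache.commons" % "commons-text"          % flinkCommonsTextV
)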
@@ -1,5 +1,6 @@
package pl.touk.nussknacker.engine.flink.api.process

import com.github.ghik.silencer.silent
import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSink}
@@ -66,5 +67,6 @@ trait BasicFlinkSink extends FlinkSink with ExplicitUidInOperatorsSupport {
helper: FlinkLazyParameterFunctionHelper
): FlatMapFunction[Context, ValueWithContext[Value]]

@silent("deprecated")
def toFlinkFunction(flinkNodeContext: FlinkCustomNodeContext): SinkFunction[Value]
}
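Most of this PR adds @silent("deprecated") because the SinkFunction-based sink API (and the legacy Kafka fetcher internals) are deprecated as of Flink 1.20, while the modules still build with warnings escalated to errors. A self-contained sketch of the pattern follows; the class name is illustrative and assumes the ghik silencer annotation is on the classpath, as it is in these modules.

import com.github.ghik.silencer.silent
import org.apache.flink.streaming.api.functions.sink.SinkFunction

// The "deprecated" argument is a message filter: only deprecation warnings emitted
// inside the annotated method are suppressed; other warnings still surface.
class ExampleNoopSink {
  @silent("deprecated")
  def toFlinkFunction: SinkFunction[String] = new SinkFunction[String] {
    override def invoke(value: String, context: SinkFunction.Context): Unit = ()
  }
}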
@@ -1,5 +1,6 @@
package pl.touk.nussknacker.engine.flink.util.sink

import com.github.ghik.silencer.silent
import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.streaming.api.functions.sink.{DiscardingSink, SinkFunction}
import pl.touk.nussknacker.engine.api.{Context, ValueWithContext}
@@ -20,6 +21,8 @@ trait EmptySink extends BasicFlinkSink {
): FlatMapFunction[Context, ValueWithContext[AnyRef]] =
(_, _) => {}

@silent("deprecated")
override def toFlinkFunction(flinkNodeContext: FlinkCustomNodeContext): SinkFunction[AnyRef] =
new DiscardingSink[AnyRef]

}
@@ -1,5 +1,6 @@
package pl.touk.nussknacker.engine.flink.util.sink

import com.github.ghik.silencer.silent
import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import pl.touk.nussknacker.engine.api.process.SinkFactory
@@ -16,6 +17,7 @@ object SingleValueSinkFactory {
final val SingleValueParamName = "Value"
}

@silent("deprecated")
class SingleValueSinkFactory[T <: AnyRef](sink: => SinkFunction[T]) extends SinkFactory with Serializable {

@MethodToInvoke
@@ -1,5 +1,6 @@
package pl.touk.nussknacker.engine.flink.table.definition

import com.github.ghik.silencer.silent
import org.apache.flink.table.api.{DataTypes, Schema}
import org.apache.flink.table.catalog.{Catalog, CatalogTable, GenericInMemoryCatalog, ObjectPath}
import org.apache.flink.table.factories.CatalogFactory
@@ -26,6 +27,7 @@ object StubbedCatalogFactory {

private val catalog: GenericInMemoryCatalog = populateCatalog(new GenericInMemoryCatalog(catalogName))

@silent("deprecated")
private def populateCatalog(inMemoryCatalog: GenericInMemoryCatalog): GenericInMemoryCatalog = {
val sampleBoundedTable = CatalogTable.of(
Schema.newBuilder().column(sampleColumnName, DataTypes.STRING()).build(),
@@ -1,5 +1,6 @@
package pl.touk.nussknacker.engine.flink.table.sink

import com.github.ghik.silencer.silent
import org.apache.flink.api.common.functions.{RichFlatMapFunction, RuntimeContext}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.ResultTypeQueryable
@@ -38,6 +39,7 @@ class TableSink(
)
}

@silent("deprecated")
override def registerSink(
dataStream: DataStream[ValueWithContext[Value]],
flinkNodeContext: FlinkCustomNodeContext
@@ -1,12 +1,14 @@
package pl.touk.nussknacker.engine.process.registrar

import com.github.ghik.silencer.silent
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import pl.touk.nussknacker.engine.api.component.{ComponentType, NodeComponentInfo}
import pl.touk.nussknacker.engine.api.ValueWithContext
import pl.touk.nussknacker.engine.process.ExceptionHandlerFunction
import pl.touk.nussknacker.engine.process.compiler.FlinkProcessCompilerData
import pl.touk.nussknacker.engine.testmode.SinkInvocationCollector

@silent("deprecated")
private[registrar] class CollectingSinkFunction[T](
val compilerDataForClassloader: ClassLoader => FlinkProcessCompilerData,
collectingSink: SinkInvocationCollector,
@@ -1,5 +1,6 @@
package pl.touk.nussknacker.engine.process.helpers

import com.github.ghik.silencer.silent
import com.typesafe.config.Config
import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.streaming.api.functions.sink.SinkFunction
@@ -135,6 +136,7 @@ case object SinkAccessingNodeContext extends EmptySink with Serializable {

def nodeId: String = _nodeId

@silent("deprecated")
override def toFlinkFunction(flinkNodeContext: FlinkCustomNodeContext): SinkFunction[AnyRef] = {
_nodeId = flinkNodeContext.nodeId
super.toFlinkFunction(flinkNodeContext)
@@ -46,6 +46,7 @@ import scala.jdk.CollectionConverters._

object DelayedFlinkKafkaConsumer {

@silent("deprecated")
def apply[T](
topics: NonEmptyList[PreparedKafkaTopic[TopicName.ForSource]],
schema: KafkaDeserializationSchema[T],
@@ -88,6 +89,7 @@
}
}

@silent("deprecated")
type ExtractTimestampForDelay[T] = (KafkaTopicPartitionState[T, TopicPartition], T, Long) => Long
}

@@ -154,6 +156,7 @@ object DelayedKafkaFetcher {
private val maxSleepTime = time.Duration.of(30, ChronoUnit.SECONDS).toMillis
}

@silent("deprecated")
class DelayedKafkaFetcher[T](
sourceContext: SourceFunction.SourceContext[T],
assignedPartitionsWithInitialOffsets: util.Map[KafkaTopicPartition, lang.Long],
@@ -188,6 +191,7 @@ class DelayedKafkaFetcher[T](
with LazyLogging {
import DelayedKafkaFetcher._

@silent("deprecated")
override def emitRecordsWithTimestamps(
records: util.Queue[T],
partitionState: KafkaTopicPartitionState[T, TopicPartition],
@@ -1,5 +1,6 @@
package pl.touk.nussknacker.engine.kafka.serialization

import com.github.ghik.silencer.silent
import com.typesafe.scalalogging.LazyLogging
import org.apache.flink.api.common.serialization.DeserializationSchema
import org.apache.flink.api.common.typeinfo.TypeInformation
@@ -23,6 +24,7 @@ object FlinkSerializationSchemaConversions extends LazyLogging {
): FlinkDeserializationSchemaWrapper[T] =
new FlinkDeserializationSchemaWrapper[T](deserializationSchema)

@silent("deprecated")
class FlinkDeserializationSchemaWrapper[T](deserializationSchema: serialization.KafkaDeserializationSchema[T])
extends kafka.KafkaDeserializationSchema[T] {

@@ -73,6 +75,7 @@ object FlinkSerializationSchemaConversions extends LazyLogging {

}

@silent("deprecated")
def wrapToFlinkSerializationSchema[T](
serializationSchema: serialization.KafkaSerializationSchema[T]
): kafka.KafkaSerializationSchema[T] = new kafka.KafkaSerializationSchema[T] {
@@ -1,5 +1,6 @@
package pl.touk.nussknacker.engine.kafka.serialization

import com.github.ghik.silencer.silent
import org.apache.flink.api.common.serialization.DeserializationSchema
import org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper
import org.apache.kafka.clients.consumer.ConsumerRecord
@@ -8,6 +9,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord
// and for Flink's compatibility reasons we don't want to invoke deserialize(ConsumerRecord<byte[], byte[]> message, Collector<T> out)
// because it wasn't available in previous Flink versions
@SerialVersionUID(7952681455584002102L)
@silent("deprecated")
class NKKafkaDeserializationSchemaWrapper[T](deserializationSchema: DeserializationSchema[T])
extends KafkaDeserializationSchemaWrapper[T](deserializationSchema) {

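The comment above explains why this wrapper avoids the Collector-based deserialize overload for compatibility with older Flink versions. The body of NKKafkaDeserializationSchemaWrapper is collapsed in this view; the sketch below is an assumption about what such a wrapper typically looks like, with an illustrative class name, not the file's actual contents.

import org.apache.flink.api.common.serialization.DeserializationSchema
import org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper
import org.apache.kafka.clients.consumer.ConsumerRecord

// Assumed shape of the collapsed body: delegate straight to the single-record
// deserialize of the wrapped DeserializationSchema instead of the Collector overload.
class ExampleKafkaDeserializationSchemaWrapper[T](deserializationSchema: DeserializationSchema[T])
    extends KafkaDeserializationSchemaWrapper[T](deserializationSchema) {

  override def deserialize(record: ConsumerRecord[Array[Byte], Array[Byte]]): T =
    deserializationSchema.deserialize(record.value())
}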
@@ -1,5 +1,6 @@
package pl.touk.nussknacker.engine.kafka.sink.flink

import com.github.ghik.silencer.silent
import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import pl.touk.nussknacker.engine.api.process.TopicName
@@ -31,6 +32,7 @@ class FlinkKafkaSink(
): FlatMapFunction[Context, ValueWithContext[AnyRef]] =
helper.lazyMapFunction(value)

@silent("deprecated")
override def toFlinkFunction(flinkNodeContext: FlinkCustomNodeContext): SinkFunction[AnyRef] =
PartitionByKeyFlinkKafkaProducer(kafkaConfig, topic.prepared, serializationSchema, clientId)

@@ -3,6 +3,7 @@ package pl.touk.nussknacker.engine.management.sample
import com.cronutils.model.CronType
import com.cronutils.model.definition.CronDefinitionBuilder
import com.cronutils.parser.CronParser
import com.github.ghik.silencer.silent
import io.circe.parser.decode
import io.circe.{Decoder, Encoder}
import org.apache.flink.api.common.serialization.{DeserializationSchema, SimpleStringSchema}
@@ -58,6 +59,7 @@ class DevProcessConfigCreator extends ProcessConfigCreator {
private def all[T](value: T): WithCategories[T] =
WithCategories(value, "Category1", "Category2", "DevelopmentTests", "Periodic")

@silent("deprecated")
override def sinkFactories(
modelDependencies: ProcessObjectDependencies
): Map[String, WithCategories[SinkFactory]] = {
@@ -0,0 +1,113 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.util;

import org.apache.flink.api.common.JobID;

import org.slf4j.MDC;

import java.util.Collections;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;

import static org.apache.flink.util.Preconditions.checkArgument;

/** Utility class to manage common Flink attributes in {@link MDC} (only {@link JobID} ATM). */
// TODO: temporary patch for poc
public class MdcUtils {

public static final String JOB_ID = "flink-job-id";

/**
* Replace MDC contents with the provided one and return a closeable object that can be used to
* restore the original MDC.
*
* @param context to put into MDC
*/
public static MdcCloseable withContext(Map<String, String> context) {
final Map<String, String> orig = MDC.getCopyOfContextMap();
final Map<String, String> safeOrig = orig != null ? orig : Collections.emptyMap();
MDC.setContextMap(context);
return () -> MDC.setContextMap(safeOrig);
}

/** {@link AutoCloseable } that restores the {@link MDC} contents on close. */
public interface MdcCloseable extends AutoCloseable {
@Override
void close();
}

/**
* Wrap the given {@link Runnable} so that the given data is added to {@link MDC} before its
* execution and removed afterward.
*/
public static Runnable wrapRunnable(Map<String, String> contextData, Runnable command) {
return () -> {
try (MdcCloseable ctx = withContext(contextData)) {
command.run();
}
};
}

/**
* Wrap the given {@link Callable} so that the given data is added to {@link MDC} before its
* execution and removed afterward.
*/
public static <T> Callable<T> wrapCallable(
Map<String, String> contextData, Callable<T> command) {
return () -> {
try (MdcCloseable ctx = withContext(contextData)) {
return command.call();
}
};
}

/**
* Wrap the given {@link Executor} so that the given {@link JobID} is added before it executes
* any submitted commands and removed afterward.
*/
public static Executor scopeToJob(JobID jobID, Executor executor) {
checkArgument(!(executor instanceof MdcAwareExecutor));
return new MdcAwareExecutor<>(executor, asContextData(jobID));
}

/**
* Wrap the given {@link ExecutorService} so that the given {@link JobID} is added before it
* executes any submitted commands and removed afterward.
*/
public static ExecutorService scopeToJob(JobID jobID, ExecutorService delegate) {
checkArgument(!(delegate instanceof MdcAwareExecutorService));
return new MdcAwareExecutorService<>(delegate, asContextData(jobID));
}

/**
* Wrap the given {@link ScheduledExecutorService} so that the given {@link JobID} is added
* before it executes any submitted commands and removed afterward.
*/
public static ScheduledExecutorService scopeToJob(JobID jobID, ScheduledExecutorService ses) {
checkArgument(!(ses instanceof MdcAwareScheduledExecutorService));
return new MdcAwareScheduledExecutorService(ses, asContextData(jobID));
}

public static Map<String, String> asContextData(JobID jobID) {
return Collections.singletonMap(JOB_ID, jobID.toHexString());
}
}
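MdcUtils is copied into org.apache.flink.util as a temporary patch (see the TODO above); the MdcAware* wrapper classes returned by its factory methods are presumably added in companion files not shown in this view. A short, hedged usage sketch of the executor scoping follows; the job id, executor, and object name are made up for illustration.

import java.util.concurrent.Executors
import org.apache.flink.api.common.JobID
import org.apache.flink.util.MdcUtils

object MdcUtilsUsageExample extends App {
  val jobId    = new JobID()                         // illustrative random job id
  val executor = Executors.newSingleThreadExecutor()

  // Every task submitted through the wrapped executor runs with
  // MDC("flink-job-id" -> jobId.toHexString) set, and the previous MDC restored afterwards.
  val scoped = MdcUtils.scopeToJob(jobId, executor)
  scoped.execute(() => println(org.slf4j.MDC.get(MdcUtils.JOB_ID)))

  executor.shutdown()
}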
@@ -1,5 +1,6 @@
package pl.touk.nussknacker.engine.schemedkafka.sink.flink

import com.github.ghik.silencer.silent
import com.typesafe.scalalogging.LazyLogging
import io.confluent.kafka.schemaregistry.ParsedSchema
import org.apache.flink.api.common.functions.{RichMapFunction, RuntimeContext}
@@ -55,6 +56,7 @@ class FlinkKafkaUniversalSink(
ds.flatMap(new KeyedValueMapper(flinkNodeContext.lazyParameterHelper, key, value), typeInfo)
}

@silent("deprecated")
private def toFlinkFunction: SinkFunction[KeyedValue[AnyRef, AnyRef]] = {
PartitionByKeyFlinkKafkaProducer(kafkaConfig, preparedTopic.prepared, serializationSchema, clientId)
}
@@ -542,6 +542,7 @@ object SampleNodes {

override def canBeEnding: Boolean = true

@silent("deprecated")
@MethodToInvoke(returnType = classOf[String])
def execute(@ParamName("param") @Nullable param: LazyParameter[String]) =
FlinkCustomStreamTransformation((start: DataStream[Context], context: FlinkCustomNodeContext) => {
@@ -570,6 +571,7 @@
): FlatMapFunction[Context, ValueWithContext[String]] =
(ctx, collector) => collector.collect(ValueWithContext(serializableValue, ctx))

@silent("deprecated")
override def toFlinkFunction(flinkCustomNodeContext: FlinkCustomNodeContext): SinkFunction[String] =
new SinkFunction[String] {
override def invoke(value: String, context: SinkFunction.Context): Unit = resultsHolder.add(value)
@@ -1,5 +1,6 @@
package pl.touk.nussknacker.engine.process.helpers

import com.github.ghik.silencer.silent
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import pl.touk.nussknacker.engine.api.process.SinkFactory
import pl.touk.nussknacker.engine.flink.util.sink.SingleValueSinkFactory
@@ -12,6 +13,7 @@ object SinkForType {

}

@silent("deprecated")
class SinkForTypeFunction[T <: AnyRef](resultsHolder: => TestResultsHolder[T]) extends SinkFunction[T] {

override def invoke(value: T, context: SinkFunction.Context): Unit = {
4 changes: 2 additions & 2 deletions examples/installation/docker-compose.yml
@@ -210,7 +210,7 @@ services:
build:
context: ./flink/
args:
FLINK_VERSION: "1.19.1-scala_2.12-java11"
FLINK_VERSION: "1.20.0-scala_2.12-java11"
restart: unless-stopped
command: jobmanager
environment:
@@ -226,7 +226,7 @@
build:
context: ./flink/
args:
FLINK_VERSION: "1.19.1-scala_2.12-java11"
FLINK_VERSION: "1.20.0-scala_2.12-java11"
restart: unless-stopped
command: taskmanager
environment: