bump Flink
mslabek committed Nov 8, 2024
1 parent 03566ab commit 809e286
Showing 18 changed files with 42 additions and 6 deletions.
8 changes: 4 additions & 4 deletions build.sbt
@@ -280,8 +280,8 @@ lazy val commonSettings =
// Note: when updating check versions in 'flink*V' below, because some libraries must be fixed at versions provided
// by Flink, or jobs may fail at runtime when Flink is run with 'classloader.resolve-order: parent-first'.
// You can find versions provided by Flink in its lib/flink-dist-*.jar/META-INF/DEPENDENCIES file.
-val flinkV = "1.19.1"
-val flinkConnectorKafkaV = "3.2.0-1.19"
+val flinkV = "1.20.0"
+val flinkConnectorKafkaV = "3.3.0-1.20"
val flinkCommonsCompressV = "1.26.0"
val flinkCommonsLang3V = "3.12.0"
val flinkCommonsTextV = "1.10.0"
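
As an aside on the comment above: with 'classloader.resolve-order: parent-first', classes from the jars in Flink's own lib/ directory shadow whatever the job bundles, so the build must compile against exactly the versions the distribution ships. A minimal sbt sketch of how such pinning can be expressed; the 'dependencyOverrides' usage and module list below are illustrative, not lifted from this build:

// Hedged sketch: force transitive dependencies to the versions bundled in
// flink-dist, so parent-first classloading cannot mix incompatible classes.
// The module list is an example; the pinned set in this repo is the flink*V vals.
lazy val flinkProvidedPins = Seq(
  "org.apache.commons" % "commons-compress" % flinkCommonsCompressV,
  "org.apache.commons" % "commons-lang3"    % flinkCommonsLang3V,
  "org.apache.commons" % "commons-text"     % flinkCommonsTextV
)

lazy val someFlinkModule = (project in file("engine/flink/example"))
  .settings(dependencyOverrides ++= flinkProvidedPins)
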
@@ -297,7 +297,7 @@ val scalaCheckVshort = scalaCheckV.take(4).replace(".", "-")
val scalaTestPlusV =
"3.2.18.0" // has to match scalatest and scalacheck versions, see https://github.com/scalatest/scalatestplus-scalacheck/releases
// note: Logback 1.3 requires Slf4j 2.x, but Flink has Slf4j 1.7 on its classpath
-val logbackV = "1.2.13"
+val logbackV = "1.5.12"
// this is used in cloud, official JsonEncoder uses different field layout
val logbackJsonV = "0.1.5"
val betterFilesV = "3.9.2"
@@ -309,7 +309,7 @@ val jacksonV = "2.17.2"
val catsV = "2.12.0"
val catsEffectV = "3.5.4"
val everitSchemaV = "1.14.4"
-val slf4jV = "1.7.36"
+val slf4jV = "2.0.16"
val scalaLoggingV = "3.9.5"
val scalaCompatV = "1.0.2"
val ficusV = "1.4.7"
@@ -1,5 +1,6 @@
package pl.touk.nussknacker.engine.flink.api.process

+import com.github.ghik.silencer.silent
import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSink}
@@ -66,5 +67,6 @@ trait BasicFlinkSink extends FlinkSink with ExplicitUidInOperatorsSupport {
helper: FlinkLazyParameterFunctionHelper
): FlatMapFunction[Context, ValueWithContext[Value]]

+  @silent("deprecated")
def toFlinkFunction(flinkNodeContext: FlinkCustomNodeContext): SinkFunction[Value]
}
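
The hunk above is the template for most of this commit's additions: judging by where the annotations land, the upgrade to Flink 1.20 turns the SinkFunction-based APIs used throughout these files into deprecated ones, and the project silences each usage locally rather than migrating sinks in the same change. A minimal sketch of the pattern (LogSink is a made-up example, not code from this repository; the annotation only takes effect when the silencer compiler plugin is enabled):

import com.github.ghik.silencer.silent
import org.apache.flink.streaming.api.functions.sink.SinkFunction

// @silent("deprecated") suppresses only warnings whose message matches the
// given regex, and only inside the annotated definition, so newly introduced
// deprecated usages elsewhere still surface as warnings.
@silent("deprecated")
class LogSink extends SinkFunction[String] {
  override def invoke(value: String, context: SinkFunction.Context): Unit =
    println(value)
}
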
@@ -1,5 +1,6 @@
package pl.touk.nussknacker.engine.flink.util.sink

+import com.github.ghik.silencer.silent
import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.streaming.api.functions.sink.{DiscardingSink, SinkFunction}
import pl.touk.nussknacker.engine.api.{Context, ValueWithContext}
@@ -20,6 +21,8 @@ trait EmptySink extends BasicFlinkSink {
): FlatMapFunction[Context, ValueWithContext[AnyRef]] =
(_, _) => {}

+  @silent("deprecated")
override def toFlinkFunction(flinkNodeContext: FlinkCustomNodeContext): SinkFunction[AnyRef] =
new DiscardingSink[AnyRef]

}
@@ -1,5 +1,6 @@
package pl.touk.nussknacker.engine.flink.util.sink

+import com.github.ghik.silencer.silent
import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import pl.touk.nussknacker.engine.api.process.SinkFactory
@@ -16,6 +17,7 @@ object SingleValueSinkFactory {
final val SingleValueParamName = "Value"
}

+@silent("deprecated")
class SingleValueSinkFactory[T <: AnyRef](sink: => SinkFunction[T]) extends SinkFactory with Serializable {

@MethodToInvoke
@@ -1,5 +1,6 @@
package pl.touk.nussknacker.engine.flink.table.definition

+import com.github.ghik.silencer.silent
import org.apache.flink.table.api.{DataTypes, Schema}
import org.apache.flink.table.catalog.{Catalog, CatalogTable, GenericInMemoryCatalog, ObjectPath}
import org.apache.flink.table.factories.CatalogFactory
@@ -26,6 +27,7 @@ object StubbedCatalogFactory {

private val catalog: GenericInMemoryCatalog = populateCatalog(new GenericInMemoryCatalog(catalogName))

+  @silent("deprecated")
private def populateCatalog(inMemoryCatalog: GenericInMemoryCatalog): GenericInMemoryCatalog = {
val sampleBoundedTable = CatalogTable.of(
Schema.newBuilder().column(sampleColumnName, DataTypes.STRING()).build(),
@@ -1,5 +1,6 @@
package pl.touk.nussknacker.engine.flink.table.sink

+import com.github.ghik.silencer.silent
import org.apache.flink.api.common.functions.{RichFlatMapFunction, RuntimeContext}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.ResultTypeQueryable
@@ -38,6 +39,7 @@ class TableSink(
)
}

+  @silent("deprecated")
override def registerSink(
dataStream: DataStream[ValueWithContext[Value]],
flinkNodeContext: FlinkCustomNodeContext
@@ -1,12 +1,14 @@
package pl.touk.nussknacker.engine.process.registrar

+import com.github.ghik.silencer.silent
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import pl.touk.nussknacker.engine.api.component.{ComponentType, NodeComponentInfo}
import pl.touk.nussknacker.engine.api.ValueWithContext
import pl.touk.nussknacker.engine.process.ExceptionHandlerFunction
import pl.touk.nussknacker.engine.process.compiler.FlinkProcessCompilerData
import pl.touk.nussknacker.engine.testmode.SinkInvocationCollector

+@silent("deprecated")
private[registrar] class CollectingSinkFunction[T](
val compilerDataForClassloader: ClassLoader => FlinkProcessCompilerData,
collectingSink: SinkInvocationCollector,
@@ -1,5 +1,6 @@
package pl.touk.nussknacker.engine.process.helpers

+import com.github.ghik.silencer.silent
import com.typesafe.config.Config
import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.streaming.api.functions.sink.SinkFunction
@@ -135,6 +136,7 @@ case object SinkAccessingNodeContext extends EmptySink with Serializable {

def nodeId: String = _nodeId

+  @silent("deprecated")
override def toFlinkFunction(flinkNodeContext: FlinkCustomNodeContext): SinkFunction[AnyRef] = {
_nodeId = flinkNodeContext.nodeId
super.toFlinkFunction(flinkNodeContext)
@@ -46,6 +46,7 @@ import scala.jdk.CollectionConverters._

object DelayedFlinkKafkaConsumer {

+  @silent("deprecated")
def apply[T](
topics: NonEmptyList[PreparedKafkaTopic[TopicName.ForSource]],
schema: KafkaDeserializationSchema[T],
@@ -88,6 +89,7 @@ object DelayedFlinkKafkaConsumer {
}
}

+  @silent("deprecated")
type ExtractTimestampForDelay[T] = (KafkaTopicPartitionState[T, TopicPartition], T, Long) => Long
}

@@ -154,6 +156,7 @@ object DelayedKafkaFetcher {
private val maxSleepTime = time.Duration.of(30, ChronoUnit.SECONDS).toMillis
}

+@silent("deprecated")
class DelayedKafkaFetcher[T](
sourceContext: SourceFunction.SourceContext[T],
assignedPartitionsWithInitialOffsets: util.Map[KafkaTopicPartition, lang.Long],
@@ -188,6 +191,7 @@ class DelayedKafkaFetcher[T](
with LazyLogging {
import DelayedKafkaFetcher._

+  @silent("deprecated")
override def emitRecordsWithTimestamps(
records: util.Queue[T],
partitionState: KafkaTopicPartitionState[T, TopicPartition],
@@ -1,5 +1,6 @@
package pl.touk.nussknacker.engine.kafka.serialization

+import com.github.ghik.silencer.silent
import com.typesafe.scalalogging.LazyLogging
import org.apache.flink.api.common.serialization.DeserializationSchema
import org.apache.flink.api.common.typeinfo.TypeInformation
@@ -23,6 +24,7 @@ object FlinkSerializationSchemaConversions extends LazyLogging {
): FlinkDeserializationSchemaWrapper[T] =
new FlinkDeserializationSchemaWrapper[T](deserializationSchema)

+  @silent("deprecated")
class FlinkDeserializationSchemaWrapper[T](deserializationSchema: serialization.KafkaDeserializationSchema[T])
extends kafka.KafkaDeserializationSchema[T] {

@@ -73,6 +75,7 @@ object FlinkSerializationSchemaConversions extends LazyLogging {

}

+  @silent("deprecated")
def wrapToFlinkSerializationSchema[T](
serializationSchema: serialization.KafkaSerializationSchema[T]
): kafka.KafkaSerializationSchema[T] = new kafka.KafkaSerializationSchema[T] {
@@ -1,5 +1,6 @@
package pl.touk.nussknacker.engine.kafka.serialization

+import com.github.ghik.silencer.silent
import org.apache.flink.api.common.serialization.DeserializationSchema
import org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper
import org.apache.kafka.clients.consumer.ConsumerRecord
@@ -8,6 +9,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord
// and for Flink's compatibility reasons we don't want to invoke deserialize(ConsumerRecord<byte[], byte[]> message, Collector<T> out)
// because it wasn't available in previous Flink versions
@SerialVersionUID(7952681455584002102L)
+@silent("deprecated")
class NKKafkaDeserializationSchemaWrapper[T](deserializationSchema: DeserializationSchema[T])
extends KafkaDeserializationSchemaWrapper[T](deserializationSchema) {

@@ -1,5 +1,6 @@
package pl.touk.nussknacker.engine.kafka.sink.flink

+import com.github.ghik.silencer.silent
import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import pl.touk.nussknacker.engine.api.process.TopicName
@@ -31,6 +32,7 @@ class FlinkKafkaSink(
): FlatMapFunction[Context, ValueWithContext[AnyRef]] =
helper.lazyMapFunction(value)

+  @silent("deprecated")
override def toFlinkFunction(flinkNodeContext: FlinkCustomNodeContext): SinkFunction[AnyRef] =
PartitionByKeyFlinkKafkaProducer(kafkaConfig, topic.prepared, serializationSchema, clientId)

@@ -3,6 +3,7 @@ package pl.touk.nussknacker.engine.management.sample
import com.cronutils.model.CronType
import com.cronutils.model.definition.CronDefinitionBuilder
import com.cronutils.parser.CronParser
+import com.github.ghik.silencer.silent
import io.circe.parser.decode
import io.circe.{Decoder, Encoder}
import org.apache.flink.api.common.serialization.{DeserializationSchema, SimpleStringSchema}
@@ -58,6 +59,7 @@ class DevProcessConfigCreator extends ProcessConfigCreator {
private def all[T](value: T): WithCategories[T] =
WithCategories(value, "Category1", "Category2", "DevelopmentTests", "Periodic")

+  @silent("deprecated")
override def sinkFactories(
modelDependencies: ProcessObjectDependencies
): Map[String, WithCategories[SinkFactory]] = {
@@ -1,5 +1,6 @@
package pl.touk.nussknacker.engine.schemedkafka.sink.flink

+import com.github.ghik.silencer.silent
import com.typesafe.scalalogging.LazyLogging
import io.confluent.kafka.schemaregistry.ParsedSchema
import org.apache.flink.api.common.functions.{RichMapFunction, RuntimeContext}
@@ -55,6 +56,7 @@ class FlinkKafkaUniversalSink(
ds.flatMap(new KeyedValueMapper(flinkNodeContext.lazyParameterHelper, key, value), typeInfo)
}

+  @silent("deprecated")
private def toFlinkFunction: SinkFunction[KeyedValue[AnyRef, AnyRef]] = {
PartitionByKeyFlinkKafkaProducer(kafkaConfig, preparedTopic.prepared, serializationSchema, clientId)
}
@@ -542,6 +542,7 @@ object SampleNodes {

override def canBeEnding: Boolean = true

+    @silent("deprecated")
@MethodToInvoke(returnType = classOf[String])
def execute(@ParamName("param") @Nullable param: LazyParameter[String]) =
FlinkCustomStreamTransformation((start: DataStream[Context], context: FlinkCustomNodeContext) => {
@@ -570,6 +571,7 @@ object SampleNodes {
): FlatMapFunction[Context, ValueWithContext[String]] =
(ctx, collector) => collector.collect(ValueWithContext(serializableValue, ctx))

+    @silent("deprecated")
override def toFlinkFunction(flinkCustomNodeContext: FlinkCustomNodeContext): SinkFunction[String] =
new SinkFunction[String] {
override def invoke(value: String, context: SinkFunction.Context): Unit = resultsHolder.add(value)
@@ -1,5 +1,6 @@
package pl.touk.nussknacker.engine.process.helpers

+import com.github.ghik.silencer.silent
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import pl.touk.nussknacker.engine.api.process.SinkFactory
import pl.touk.nussknacker.engine.flink.util.sink.SingleValueSinkFactory
@@ -12,6 +13,7 @@ object SinkForType {

}

+@silent("deprecated")
class SinkForTypeFunction[T <: AnyRef](resultsHolder: => TestResultsHolder[T]) extends SinkFunction[T] {

override def invoke(value: T, context: SinkFunction.Context): Unit = {
4 changes: 2 additions & 2 deletions examples/installation/docker-compose.yml
@@ -210,7 +210,7 @@ services:
build:
context: ./flink/
args:
-          FLINK_VERSION: "1.19.1-scala_2.12-java11"
+          FLINK_VERSION: "1.20.0-scala_2.12-java11"
restart: unless-stopped
command: jobmanager
environment:
@@ -226,7 +226,7 @@ services:
build:
context: ./flink/
args:
-          FLINK_VERSION: "1.19.1-scala_2.12-java11"
+          FLINK_VERSION: "1.20.0-scala_2.12-java11"
restart: unless-stopped
command: taskmanager
environment:
@@ -1,6 +1,7 @@
package pl.touk.nussknacker.engine.flink.util.test

import cats.data.NonEmptyList
+import com.github.ghik.silencer.silent
import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import pl.touk.nussknacker.engine.api._
@@ -49,6 +50,7 @@ object TestResultSinkFactory {
): FlatMapFunction[Context, ValueWithContext[Value]] =
helper.lazyMapFunction(value)

+  @silent("deprecated")
override def toFlinkFunction(flinkNodeContext: FlinkCustomNodeContext): SinkFunction[Value] =
new SinkFunction[Value] {
