From cab3889dea223eff98311c368b55a6d14ba84b86 Mon Sep 17 00:00:00 2001 From: Boris Lublinsky Date: Thu, 26 Nov 2020 11:29:51 -0600 Subject: [PATCH] Refactored error handling (#897) * Refactored error handling Co-authored-by: franciscolopezsancho Co-authored-by: Raymond Roestenburg --- .../akkastream/AkkaStreamletContextImpl.scala | 36 +++++++++++---- .../flink/FlinkStreamletContextImpl.scala | 20 +++++++- .../kafka/SparkStreamletContextImpl.scala | 21 +++++++-- .../scala/cloudflow/streamlets/Codec.scala | 6 +-- .../cloudflow/streamlets/StreamletPort.scala | 27 +++++++++++ .../cloudflow/streamlets/avro/AvroCodec.scala | 12 ++--- .../cloudflow/streamlets/avro/AvroInlet.scala | 17 ++++--- .../streamlets/proto/ProtoCodec.scala | 9 ++-- .../streamlets/proto/ProtoInlet.scala | 19 ++++---- .../streamlets/proto/javadsl/ProtoCodec.scala | 5 +- .../streamlets/proto/javadsl/ProtoInlet.scala | 23 ++++++++-- .../develop/pages/schema-first-approach.adoc | 46 +++++++++++++++++++ 12 files changed, 192 insertions(+), 49 deletions(-) diff --git a/core/cloudflow-akka/src/main/scala/cloudflow/akkastream/AkkaStreamletContextImpl.scala b/core/cloudflow-akka/src/main/scala/cloudflow/akkastream/AkkaStreamletContextImpl.scala index 2a638fdaa..10d84f3ae 100644 --- a/core/cloudflow-akka/src/main/scala/cloudflow/akkastream/AkkaStreamletContextImpl.scala +++ b/core/cloudflow-akka/src/main/scala/cloudflow/akkastream/AkkaStreamletContextImpl.scala @@ -137,7 +137,13 @@ final class AkkaStreamletContextImpl( KafkaControls.add(c) NotUsed } - .map(record => inlet.codec.decode(record.value)) + .map(record => + inlet.codec.decode(record.value) match { + case Success(value) => Some(value) + case Failure(t) => inlet.errorHandler(record.value, t) + } + ) + .collect { case Some(v) => v } .via(handleTermination) } @@ -193,7 +199,13 @@ final class AkkaStreamletContextImpl( KafkaControls.add(c) NotUsed } - .map(record => inlet.codec.decode(record.value)) + .map(record => + inlet.codec.decode(record.value) match { + case Success(value) => Some(value) + case Failure(t) => inlet.errorHandler(record.value, t) + } + ) + .collect { case Some(v) => v } .via(handleTermination) .asSource }(system.dispatcher) @@ -297,9 +309,13 @@ final class AkkaStreamletContextImpl( NotUsed } .via(handleTermination) - .map { record => - inlet.codec.decode(record.value) - } + .map(record => + inlet.codec.decode(record.value) match { + case Success(value) => Some(value) + case Failure(t) => inlet.errorHandler(record.value, t) + } + ) + .collect { case Some(v) => v } } def shardedPlainSource[T, M, E](inlet: CodecInlet[T], @@ -350,9 +366,13 @@ final class AkkaStreamletContextImpl( NotUsed } .via(handleTermination) - .map { record => - inlet.codec.decode(record.value) - } + .map(record => + inlet.codec.decode(record.value) match { + case Success(value) => Some(value) + case Failure(t) => inlet.errorHandler(record.value, t) + } + ) + .collect { case Some(v) => v } }(system.dispatcher) } } diff --git a/core/cloudflow-flink/src/main/scala/cloudflow/flink/FlinkStreamletContextImpl.scala b/core/cloudflow-flink/src/main/scala/cloudflow/flink/FlinkStreamletContextImpl.scala index b98e6dcd1..b16c1d4e3 100644 --- a/core/cloudflow-flink/src/main/scala/cloudflow/flink/FlinkStreamletContextImpl.scala +++ b/core/cloudflow-flink/src/main/scala/cloudflow/flink/FlinkStreamletContextImpl.scala @@ -20,11 +20,15 @@ import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.streaming.api.datastream.DataStreamSink import org.apache.flink.streaming.api.scala._ 
import org.apache.flink.streaming.connectors.kafka._ - import com.typesafe.config._ import cloudflow.streamlets._ import java.{ util => ju } +import org.apache.flink.api.common.functions.FlatMapFunction +import org.apache.flink.util.Collector + +import scala.util._ + /** * An implementation of `FlinkStreamletContext` */ @@ -64,7 +68,19 @@ class FlinkStreamletContextImpl( // also this setting is honored only when checkpointing is on - otherwise the property in Kafka // "enable.auto.commit" is considered consumer.setCommitOffsetsOnCheckpoints(true) - env.addSource(consumer).map(inlet.codec.decode(_)) + env + .addSource(consumer) + .flatMap(new FlatMapFunction[Array[Byte], In]() { + override def flatMap(value: Array[Byte], out: Collector[In]): Unit = + inlet.codec.decode(value) match { + case Success(v) => out.collect(v) + case Failure(t) => + inlet.errorHandler(value, t) match { + case Some(r) => out.collect(r) + case _ => + } + } + }) } /** diff --git a/core/cloudflow-spark/src/main/scala/cloudflow/spark/kafka/SparkStreamletContextImpl.scala b/core/cloudflow-spark/src/main/scala/cloudflow/spark/kafka/SparkStreamletContextImpl.scala index 05e26c16a..139c566c3 100644 --- a/core/cloudflow-spark/src/main/scala/cloudflow/spark/kafka/SparkStreamletContextImpl.scala +++ b/core/cloudflow-spark/src/main/scala/cloudflow/spark/kafka/SparkStreamletContextImpl.scala @@ -26,6 +26,7 @@ import cloudflow.spark.sql.SQLImplicits._ import cloudflow.streamlets._ import scala.reflect.runtime.universe._ +import scala.util.{ Failure, Success } class SparkStreamletContextImpl( private[cloudflow] override val streamletDefinition: StreamletDefinition, @@ -52,17 +53,29 @@ class SparkStreamletContextImpl( .load() val rawDataset = src.select($"value").as[Array[Byte]] - - rawDataset.map(inPort.codec.decode(_)) + rawDataset + .map(raw => + inPort.codec.decode(raw) match { + case Success(v) => v + case Failure(t) => + inPort.errorHandler(raw, t) match { + case Some(r) => r + case _ => null.asInstanceOf[In] + } + } + ) + .filter(validateNotNull[In](_)) } - def kafkaConsumerMap(topic: Topic) = topic.kafkaConsumerProperties.map { + private def kafkaConsumerMap(topic: Topic) = topic.kafkaConsumerProperties.map { case (key, value) => s"kafka.$key" -> value } - def kafkaProducerMap(topic: Topic) = topic.kafkaProducerProperties.map { + private def kafkaProducerMap(topic: Topic) = topic.kafkaProducerProperties.map { case (key, value) => s"kafka.$key" -> value } + private def validateNotNull[T](message: T): Boolean = message != null + def writeStream[Out](stream: Dataset[Out], outPort: CodecOutlet[Out], outputMode: OutputMode, optionalTrigger: Option[Trigger] = None)( implicit encoder: Encoder[Out], typeTag: TypeTag[Out] diff --git a/core/cloudflow-streamlets/src/main/scala/cloudflow/streamlets/Codec.scala b/core/cloudflow-streamlets/src/main/scala/cloudflow/streamlets/Codec.scala index ec918e0c9..1624b5369 100644 --- a/core/cloudflow-streamlets/src/main/scala/cloudflow/streamlets/Codec.scala +++ b/core/cloudflow-streamlets/src/main/scala/cloudflow/streamlets/Codec.scala @@ -17,13 +17,11 @@ package cloudflow.streamlets import scala.annotation._ -import scala.util.control._ +import scala.util.Try @implicitNotFound(msg = "Cannot find Codec typeclass for ${T}") trait Codec[T] extends Serializable { def encode(value: T): Array[Byte] // TODO, docs that it throws DecodeException - def decode(value: Array[Byte]): T + def decode(value: Array[Byte]): Try[T] } - -case class DecodeException(msg: String, cause: Throwable) extends 
RuntimeException(msg, cause) with NoStackTrace
diff --git a/core/cloudflow-streamlets/src/main/scala/cloudflow/streamlets/StreamletPort.scala b/core/cloudflow-streamlets/src/main/scala/cloudflow/streamlets/StreamletPort.scala
index d1e845c0f..e12d4c9ba 100644
--- a/core/cloudflow-streamlets/src/main/scala/cloudflow/streamlets/StreamletPort.scala
+++ b/core/cloudflow-streamlets/src/main/scala/cloudflow/streamlets/StreamletPort.scala
@@ -16,6 +16,8 @@
 
 package cloudflow.streamlets
 
+import org.slf4j.LoggerFactory
+
 /**
  * A named port handle handle to read or write data according to a schema.
  */
@@ -48,6 +50,20 @@ trait Inlet extends StreamletPort
  */
 trait Outlet extends StreamletPort
 
+object CodecInlet {
+
+  val logger = LoggerFactory.getLogger(this.getClass)
+
+  /**
+   * A default error handler that logs bad messages and skips them.
+   */
+  def logAndSkip[T](message: Array[Byte], cause: Throwable): Option[T] = {
+    logger.error("Data decoding error, skipping message", cause)
+    None
+  }
+
+}
+
 /**
  * A handle to read and deserialize data into elements of type `T`.
  */
@@ -76,6 +92,17 @@ trait CodecInlet[T] extends Inlet {
    * If no unique group Id is set (which is the default), streamlet instances will each receive part of the data (on this inlet).
    */
   def withUniqueGroupId: CodecInlet[T]
+
+  /**
+   * Sets the error handler to be used for data unmarshalling errors on this inlet.
+   * If no error handler is specified, the default is to log the error and skip the record.
+   */
+  def withErrorHandler(f: (Array[Byte], Throwable) => Option[T]): CodecInlet[T]
+
+  /**
+   * Handles data unmarshalling errors.
+   */
+  val errorHandler: (Array[Byte], Throwable) => Option[T]
 }
 
 /**
diff --git a/core/cloudflow-streamlets/src/main/scala/cloudflow/streamlets/avro/AvroCodec.scala b/core/cloudflow-streamlets/src/main/scala/cloudflow/streamlets/avro/AvroCodec.scala
index ef9ed3f79..a8cf038eb 100644
--- a/core/cloudflow-streamlets/src/main/scala/cloudflow/streamlets/avro/AvroCodec.scala
+++ b/core/cloudflow-streamlets/src/main/scala/cloudflow/streamlets/avro/AvroCodec.scala
@@ -30,9 +30,9 @@ class AvroCodec[T <: SpecificRecordBase](avroSchema: Schema) extends Codec[T] {
   val recordInjection: Injection[T, Array[Byte]] = SpecificAvroCodecs.toBinary(avroSchema)
   val avroSerde                                  = new AvroSerde(recordInjection)
 
-  def encode(value: T): Array[Byte] = avroSerde.encode(value)
-  def decode(bytes: Array[Byte]): T = avroSerde.decode(bytes)
-  def schema: Schema                = avroSchema
+  def encode(value: T): Array[Byte]      = avroSerde.encode(value)
+  def decode(bytes: Array[Byte]): Try[T] = avroSerde.decode(bytes)
+  def schema: Schema                     = avroSchema
 }
 
 private[avro] class AvroSerde[T <: SpecificRecordBase](injection: Injection[T, Array[Byte]]) extends Serializable {
@@ -41,9 +41,5 @@ private[avro] class AvroSerde[T <: SpecificRecordBase](injection: Injection[T, A
   def encode(value: T): Array[Byte] = injection(value)
 
   // TODO fix up the exception, maybe pas through input
-  def decode(bytes: Array[Byte]): T =
-    Try(inverted(bytes).get).recoverWith {
-      case t =>
-        Failure(DecodeException("Could not decode.", t))
-    }.get
+  def decode(bytes: Array[Byte]): Try[T] = inverted(bytes)
 }
diff --git a/core/cloudflow-streamlets/src/main/scala/cloudflow/streamlets/avro/AvroInlet.scala b/core/cloudflow-streamlets/src/main/scala/cloudflow/streamlets/avro/AvroInlet.scala
@@ -20,15 +20,18 @@ import cloudflow.streamlets._ import org.apache.avro.specific.SpecificRecordBase import scala.reflect.ClassTag -import scala.reflect._ - import AvroUtil._ -case class AvroInlet[T <: SpecificRecordBase: ClassTag](name: String, hasUniqueGroupId: Boolean = false) extends CodecInlet[T] { - def codec = new AvroCodec[T](makeSchema) - def schemaDefinition = createSchemaDefinition(makeSchema) - def schemaAsString = makeSchema.toString(false) - def withUniqueGroupId: AvroInlet[T] = copy(hasUniqueGroupId = true) +case class AvroInlet[T <: SpecificRecordBase: ClassTag]( + name: String, + hasUniqueGroupId: Boolean = false, + errorHandler: (Array[Byte], Throwable) => Option[T] = CodecInlet.logAndSkip[T](_: Array[Byte], _: Throwable) +) extends CodecInlet[T] { + def codec = new AvroCodec[T](makeSchema) + def schemaDefinition = createSchemaDefinition(makeSchema) + def schemaAsString = makeSchema.toString(false) + def withUniqueGroupId: AvroInlet[T] = copy(hasUniqueGroupId = true) + override def withErrorHandler(handler: (Array[Byte], Throwable) => Option[T]): CodecInlet[T] = copy(errorHandler = handler) } object AvroInlet { diff --git a/core/cloudflow-streamlets/src/main/scala/cloudflow/streamlets/proto/ProtoCodec.scala b/core/cloudflow-streamlets/src/main/scala/cloudflow/streamlets/proto/ProtoCodec.scala index 66688175b..834795a5e 100644 --- a/core/cloudflow-streamlets/src/main/scala/cloudflow/streamlets/proto/ProtoCodec.scala +++ b/core/cloudflow-streamlets/src/main/scala/cloudflow/streamlets/proto/ProtoCodec.scala @@ -17,11 +17,12 @@ package cloudflow.streamlets.proto import scalapb.{ GeneratedMessage, GeneratedMessageCompanion } - import cloudflow.streamlets._ +import scala.util.Try + class ProtoCodec[T <: GeneratedMessage: GeneratedMessageCompanion] extends Codec[T] { - val cmp = implicitly[GeneratedMessageCompanion[T]] - def encode(value: T): Array[Byte] = cmp.toByteArray(value) - def decode(bytes: Array[Byte]): T = cmp.parseFrom(bytes) + val cmp = implicitly[GeneratedMessageCompanion[T]] + def encode(value: T): Array[Byte] = cmp.toByteArray(value) + def decode(bytes: Array[Byte]): Try[T] = Try { cmp.parseFrom(bytes) } } diff --git a/core/cloudflow-streamlets/src/main/scala/cloudflow/streamlets/proto/ProtoInlet.scala b/core/cloudflow-streamlets/src/main/scala/cloudflow/streamlets/proto/ProtoInlet.scala index 40d215355..db9ddbd02 100644 --- a/core/cloudflow-streamlets/src/main/scala/cloudflow/streamlets/proto/ProtoInlet.scala +++ b/core/cloudflow-streamlets/src/main/scala/cloudflow/streamlets/proto/ProtoInlet.scala @@ -17,14 +17,17 @@ package cloudflow.streamlets.proto import cloudflow.streamlets._ - import scalapb.{ GeneratedMessage, GeneratedMessageCompanion } -final case class ProtoInlet[T <: GeneratedMessage: GeneratedMessageCompanion](name: String, hasUniqueGroupId: Boolean = false) - extends CodecInlet[T] { - val cmp = implicitly[GeneratedMessageCompanion[T]] - val codec = new ProtoCodec[T] - def schemaAsString = cmp.scalaDescriptor.asProto.toProtoString - def schemaDefinition = ProtoUtil.createSchemaDefinition(cmp.scalaDescriptor) - def withUniqueGroupId: ProtoInlet[T] = if (hasUniqueGroupId) this else copy(hasUniqueGroupId = true) +final case class ProtoInlet[T <: GeneratedMessage: GeneratedMessageCompanion]( + name: String, + hasUniqueGroupId: Boolean = false, + errorHandler: (Array[Byte], Throwable) => Option[T] = CodecInlet.logAndSkip[T](_: Array[Byte], _: Throwable) +) extends CodecInlet[T] { + val cmp = implicitly[GeneratedMessageCompanion[T]] + val codec = new 
ProtoCodec[T] + def schemaAsString = cmp.scalaDescriptor.asProto.toProtoString + def schemaDefinition = ProtoUtil.createSchemaDefinition(cmp.scalaDescriptor) + def withUniqueGroupId: ProtoInlet[T] = if (hasUniqueGroupId) this else copy(hasUniqueGroupId = true) + override def withErrorHandler(handler: (Array[Byte], Throwable) => Option[T]): CodecInlet[T] = copy(errorHandler = handler) } diff --git a/core/cloudflow-streamlets/src/main/scala/cloudflow/streamlets/proto/javadsl/ProtoCodec.scala b/core/cloudflow-streamlets/src/main/scala/cloudflow/streamlets/proto/javadsl/ProtoCodec.scala index f36921bb4..f336023cf 100644 --- a/core/cloudflow-streamlets/src/main/scala/cloudflow/streamlets/proto/javadsl/ProtoCodec.scala +++ b/core/cloudflow-streamlets/src/main/scala/cloudflow/streamlets/proto/javadsl/ProtoCodec.scala @@ -19,10 +19,13 @@ package cloudflow.streamlets.proto.javadsl import cloudflow.streamlets._ import com.google.protobuf.GeneratedMessageV3 +import scala.util.Try + class ProtoCodec[T <: GeneratedMessageV3](clazz: Class[T]) extends Codec[T] { val parser = clazz.getMethod("parseFrom", classOf[Array[Byte]]) def encode(value: T): Array[Byte] = value.toByteArray - def decode(bytes: Array[Byte]): T = + def decode(bytes: Array[Byte]): Try[T] = Try { parser.invoke(clazz, bytes).asInstanceOf[T] + } } diff --git a/core/cloudflow-streamlets/src/main/scala/cloudflow/streamlets/proto/javadsl/ProtoInlet.scala b/core/cloudflow-streamlets/src/main/scala/cloudflow/streamlets/proto/javadsl/ProtoInlet.scala index 8d66eae21..289e29402 100644 --- a/core/cloudflow-streamlets/src/main/scala/cloudflow/streamlets/proto/javadsl/ProtoInlet.scala +++ b/core/cloudflow-streamlets/src/main/scala/cloudflow/streamlets/proto/javadsl/ProtoInlet.scala @@ -17,11 +17,18 @@ package cloudflow.streamlets.proto.javadsl import cloudflow.streamlets._ +import cloudflow.streamlets.avro.AvroInlet import com.google.protobuf.Descriptors.Descriptor import com.google.protobuf.{ GeneratedMessageV3, TextFormat } -final case class ProtoInlet[T <: GeneratedMessageV3](name: String, clazz: Class[T], hasUniqueGroupId: Boolean = false) - extends CodecInlet[T] { +import scala.reflect.ClassTag + +final case class ProtoInlet[T <: GeneratedMessageV3]( + name: String, + clazz: Class[T], + hasUniqueGroupId: Boolean = false, + errorHandler: (Array[Byte], Throwable) => Option[T] = CodecInlet.logAndSkip[T](_: Array[Byte], _: Throwable) +) extends CodecInlet[T] { // We know we can do this because of 'GeneratedMessageV3' val descriptor = clazz.getMethod("getDescriptor").invoke(null).asInstanceOf[Descriptor] @@ -29,5 +36,15 @@ final case class ProtoInlet[T <: GeneratedMessageV3](name: String, clazz: Class[ def schemaAsString = TextFormat.printToUnicodeString(descriptor.toProto) def schemaDefinition = ProtoUtil.createSchemaDefinition(descriptor) - def withUniqueGroupId: ProtoInlet[T] = if (hasUniqueGroupId) this else copy(hasUniqueGroupId = true) + def withUniqueGroupId: ProtoInlet[T] = if (hasUniqueGroupId) this else copy(hasUniqueGroupId = true) + override def withErrorHandler(handler: (Array[Byte], Throwable) => Option[T]): CodecInlet[T] = copy(errorHandler = handler) +} + +object ProtoInlet { + // Java API + def create[T <: GeneratedMessageV3](name: String, clazz: Class[T]): ProtoInlet[T] = + ProtoInlet[T](name, clazz) + + def create[T <: GeneratedMessageV3](name: String, clazz: Class[T], hasUniqueGroupId: Boolean): ProtoInlet[T] = + ProtoInlet[T](name, clazz, hasUniqueGroupId) } diff --git 
a/docs/shared-content-source/docs/modules/develop/pages/schema-first-approach.adoc b/docs/shared-content-source/docs/modules/develop/pages/schema-first-approach.adoc
index 0d362626f..c09a518a9 100644
--- a/docs/shared-content-source/docs/modules/develop/pages/schema-first-approach.adoc
+++ b/docs/shared-content-source/docs/modules/develop/pages/schema-first-approach.adoc
@@ -65,6 +65,52 @@ Cloudflow ensures the following compile time guarantees through this type defini
 +
 For example, when implementing `createLogic` if you do `readStream(in)` where `in` is an inlet parameterized by a type other than `CallRecord`, the compiler will complain.
 
+== Dealing with "bad" messages
+
+Despite the data integrity guarantees that Cloudflow provides through schemas, there are still
+cases of "bad" messages arriving at an inlet. This can be the result of different schema versions,
+or of an inlet listening on a topic produced by non-Cloudflow applications.
+
+There are two distinct flavors of "bad" data:
+
+* Syntactically bad data - data that does not adhere to the inlet's schema and as a result cannot
+be unmarshalled properly.
+* Semantically bad data - data that is syntactically correct, but whose content does not
+adhere to the business requirements of the streamlet's processing.
+
+Dealing with semantically bad data has to be implemented directly in the streamlet's logic, while
+support for dealing with syntactically bad data is provided by Cloudflow.
+
+There are several possible strategies for dealing with syntactically bad data:
+
+* You can skip bad records.
+
+NOTE: If a bad record is skipped, its offset is not committed. This is acceptable if "bad" records
+occur rarely, because the next record's offset will be committed. If all records are
+"bad", the offset will never be committed.
+
+* You can replace bad messages with predefined ones.
+
+In addition, there are several possible approaches to reporting "bad" records, for example:
+
+* Logging
+* Writing to a dead-letter queue/topic
+* etc.
+
+By default, Cloudflow logs "bad" records and skips them. This behavior can be overridden (at the inlet level)
+by adding a custom error handler to the inlet:
+
+[source,scala]
+----
+val in = AvroInlet[CallRecord]("in").withErrorHandler(CustomHandler)
+----
+Here `CustomHandler` is any function adhering to the following interface:
+
+[source,scala]
+----
+errorHandler: (Array[Byte], Throwable) => Option[T]
+----
+
 == Outlets and the partitioning function
 
 Similar to inlets, the user can define an outlet as:
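As an illustration of the error-handler interface documented above, a custom handler might look like the following minimal sketch. It is not taken from this patch: the object name, log message, and the `handle` method are assumptions; a real handler could instead return `Some(defaultValue)` to substitute a predefined record, or forward the raw bytes to a dead-letter topic.

[source,scala]
----
import org.slf4j.LoggerFactory

// Hypothetical handler matching the (Array[Byte], Throwable) => Option[T] interface:
// it logs the undecodable payload and skips it by returning None.
object LogAndSkipHandler {
  private val logger = LoggerFactory.getLogger(getClass)

  def handle[T](bytes: Array[Byte], cause: Throwable): Option[T] = {
    // The payload size is logged instead of the raw bytes to keep log lines small.
    logger.error(s"Could not decode record of ${bytes.length} byte(s), skipping it", cause)
    None
  }
}

// Usage, mirroring the documentation example above:
// val in = AvroInlet[CallRecord]("in").withErrorHandler(LogAndSkipHandler.handle[CallRecord])
----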