This repository has been archived by the owner on Nov 22, 2024. It is now read-only.

Refactored error handling (#897)
* Refactored error handling

Co-authored-by: franciscolopezsancho <[email protected]>
Co-authored-by: Raymond Roestenburg <[email protected]>
3 people authored Nov 26, 2020
1 parent 7aee74c commit cab3889
Showing 12 changed files with 192 additions and 49 deletions.
@@ -137,7 +137,13 @@ final class AkkaStreamletContextImpl(
KafkaControls.add(c)
NotUsed
}
.map(record => inlet.codec.decode(record.value))
.map(record =>
inlet.codec.decode(record.value) match {
case Success(value) => Some(value)
case Failure(t) => inlet.errorHandler(record.value, t)
}
)
.collect { case Some(v) => v }
.via(handleTermination)
}

@@ -193,7 +199,13 @@ final class AkkaStreamletContextImpl(
KafkaControls.add(c)
NotUsed
}
.map(record => inlet.codec.decode(record.value))
.map(record =>
inlet.codec.decode(record.value) match {
case Success(value) => Some(value)
case Failure(t) => inlet.errorHandler(record.value, t)
}
)
.collect { case Some(v) => v }
.via(handleTermination)
.asSource
}(system.dispatcher)
@@ -297,9 +309,13 @@ final class AkkaStreamletContextImpl(
NotUsed
}
.via(handleTermination)
.map { record =>
inlet.codec.decode(record.value)
}
.map(record =>
inlet.codec.decode(record.value) match {
case Success(value) => Some(value)
case Failure(t) => inlet.errorHandler(record.value, t)
}
)
.collect { case Some(v) => v }
}

def shardedPlainSource[T, M, E](inlet: CodecInlet[T],
@@ -350,9 +366,13 @@
NotUsed
}
.via(handleTermination)
.map { record =>
inlet.codec.decode(record.value)
}
.map(record =>
inlet.codec.decode(record.value) match {
case Success(value) => Some(value)
case Failure(t) => inlet.errorHandler(record.value, t)
}
)
.collect { case Some(v) => v }
}(system.dispatcher)
}
}
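
All four hunks above apply the same idiom: decode into a Try, pass failures to inlet.errorHandler, and keep only the records for which a value comes back as Some. Below is a minimal, self-contained Akka Streams sketch of that idiom; the decode and errorHandler functions are hypothetical stand-ins for the codec and inlet, not Cloudflow APIs.

import akka.actor.ActorSystem
import akka.stream.scaladsl.{ Sink, Source }
import scala.util.{ Failure, Success, Try }

object DecodeOrSkipExample extends App {
  implicit val system: ActorSystem = ActorSystem("decode-or-skip")

  // Stand-ins for inlet.codec.decode and inlet.errorHandler (log-and-skip).
  def decode(bytes: Array[Byte]): Try[Int] = Try(new String(bytes, "UTF-8").trim.toInt)
  def errorHandler(bytes: Array[Byte], t: Throwable): Option[Int] = None

  Source(List("1".getBytes, "oops".getBytes, "3".getBytes))
    .map(bytes =>
      decode(bytes) match {
        case Success(value) => Some(value)
        case Failure(t)     => errorHandler(bytes, t) // None means: skip this record
      }
    )
    .collect { case Some(v) => v } // emits 1 and 3; the undecodable record is dropped
    .runWith(Sink.foreach(println))
    .onComplete(_ => system.terminate())(system.dispatcher)
}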
@@ -20,11 +20,15 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.datastream.DataStreamSink
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka._

import com.typesafe.config._
import cloudflow.streamlets._
import java.{ util => ju }

import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.util.Collector

import scala.util._

/**
* An implementation of `FlinkStreamletContext`
*/
@@ -64,7 +68,19 @@ class FlinkStreamletContextImpl(
// also this setting is honored only when checkpointing is on - otherwise the property in Kafka
// "enable.auto.commit" is considered
consumer.setCommitOffsetsOnCheckpoints(true)
env.addSource(consumer).map(inlet.codec.decode(_))
env
.addSource(consumer)
.flatMap(new FlatMapFunction[Array[Byte], In]() {
override def flatMap(value: Array[Byte], out: Collector[In]): Unit =
inlet.codec.decode(value) match {
case Success(v) => out.collect(v)
case Failure(t) =>
inlet.errorHandler(value, t) match {
case Some(r) => out.collect(r)
case _ =>
}
}
})
}

/**
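
On the Flink side the same decode-or-handle logic runs inside a FlatMapFunction, which collects decoded values and emits a handler result only when the handler returns Some. Below is a standalone sketch of that pattern, independent of Kafka and Cloudflow; the string-to-int decode is a hypothetical stand-in and a log-and-skip handler (always None) is assumed.

import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

import scala.util.{ Failure, Success, Try }

object FlinkDecodeOrSkip {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env
      .fromElements("1".getBytes, "oops".getBytes, "3".getBytes)
      .flatMap(new FlatMapFunction[Array[Byte], Int]() {
        override def flatMap(value: Array[Byte], out: Collector[Int]): Unit =
          // Stand-in for inlet.codec.decode; failures are simply skipped here.
          Try(new String(value, "UTF-8").trim.toInt) match {
            case Success(v) => out.collect(v)
            case Failure(_) => () // a handler returning Some(r) would call out.collect(r)
          }
      })
      .print() // prints 1 and 3

    env.execute("decode-or-skip")
  }
}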
@@ -26,6 +26,7 @@ import cloudflow.spark.sql.SQLImplicits._
import cloudflow.streamlets._

import scala.reflect.runtime.universe._
import scala.util.{ Failure, Success }

class SparkStreamletContextImpl(
private[cloudflow] override val streamletDefinition: StreamletDefinition,
@@ -52,17 +53,29 @@
.load()

val rawDataset = src.select($"value").as[Array[Byte]]

rawDataset.map(inPort.codec.decode(_))
rawDataset
.map(raw =>
inPort.codec.decode(raw) match {
case Success(v) => v
case Failure(t) =>
inPort.errorHandler(raw, t) match {
case Some(r) => r
case _ => null.asInstanceOf[In]
}
}
)
.filter(validateNotNull[In](_))
}

def kafkaConsumerMap(topic: Topic) = topic.kafkaConsumerProperties.map {
private def kafkaConsumerMap(topic: Topic) = topic.kafkaConsumerProperties.map {
case (key, value) => s"kafka.$key" -> value
}
def kafkaProducerMap(topic: Topic) = topic.kafkaProducerProperties.map {
private def kafkaProducerMap(topic: Topic) = topic.kafkaProducerProperties.map {
case (key, value) => s"kafka.$key" -> value
}

private def validateNotNull[T](message: T): Boolean = message != null

def writeStream[Out](stream: Dataset[Out], outPort: CodecOutlet[Out], outputMode: OutputMode, optionalTrigger: Option[Trigger] = None)(
implicit encoder: Encoder[Out],
typeTag: TypeTag[Out]
@@ -17,13 +17,11 @@
package cloudflow.streamlets

import scala.annotation._
import scala.util.control._
import scala.util.Try

@implicitNotFound(msg = "Cannot find Codec typeclass for ${T}")
trait Codec[T] extends Serializable {
def encode(value: T): Array[Byte]
// TODO, docs that it throws DecodeException
def decode(value: Array[Byte]): T
def decode(value: Array[Byte]): Try[T]
}

case class DecodeException(msg: String, cause: Throwable) extends RuntimeException(msg, cause) with NoStackTrace
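
With decode now returning Try[T], codec implementations wrap their parsing and report problems as a Failure instead of throwing. The following minimal implementation against the new signature is purely illustrative; this StringCodec is not part of the commit.

import cloudflow.streamlets.{ Codec, DecodeException }
import scala.util.{ Failure, Try }

class StringCodec extends Codec[String] {
  def encode(value: String): Array[Byte] = value.getBytes("UTF-8")

  // Report any parsing error as Failure(DecodeException) rather than throwing.
  def decode(value: Array[Byte]): Try[String] =
    Try(new String(value, "UTF-8")).recoverWith {
      case t => Failure(DecodeException("Could not decode.", t))
    }
}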
@@ -16,6 +16,8 @@

package cloudflow.streamlets

import org.slf4j.LoggerFactory

/**
* A named port handle to read or write data according to a schema.
*/
@@ -48,6 +50,20 @@ trait Inlet extends StreamletPort
*/
trait Outlet extends StreamletPort

object CodecInlet {

val logger = LoggerFactory.getLogger(this.getClass)

/**
* A default error handler. It logs bad messages and skips them.
*/
def logAndSkip[T](message: Array[Byte], cause: Throwable): Option[T] = {
logger.error("Data decoding error, skipping message", cause)
None
}

}

/**
* A handle to read and deserialize data into elements of type `T`.
*/
@@ -76,6 +92,17 @@ trait CodecInlet[T] extends Inlet {
* If no unique group Id is set (which is the default), streamlet instances will each receive part of the data (on this inlet).
*/
def withUniqueGroupId: CodecInlet[T]

/**
* Sets the error handler invoked on data unmarshalling errors for this inlet.
* If no error handler is specified, the default logs the error and skips the record.
*/
def withErrorHandler(f: (Array[Byte], Throwable) => Option[T]): CodecInlet[T]

/**
* The handler invoked on data unmarshalling errors.
*/
val errorHandler: (Array[Byte], Throwable) => Option[T]
}

/**
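
Because a handler is just a function of shape (Array[Byte], Throwable) => Option[T], alternatives to logAndSkip are easy to supply, for example one that logs and substitutes a default value instead of dropping the record. The FallbackHandler object below, and the inlet and record type in the usage comment, are illustrative only.

import org.slf4j.LoggerFactory

object FallbackHandler {
  private val logger = LoggerFactory.getLogger(getClass)

  // Log the failure and emit a caller-supplied default instead of skipping the record.
  def logAndDefault[T](default: T): (Array[Byte], Throwable) => Option[T] =
    (bytes, cause) => {
      logger.error(s"Could not decode record of ${bytes.length} bytes, using default", cause)
      Some(default)
    }
}

// Usage (hypothetical inlet and Avro record type):
//   AvroInlet[Metric]("metrics-in").withErrorHandler(FallbackHandler.logAndDefault(new Metric()))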
@@ -30,9 +30,9 @@ class AvroCodec[T <: SpecificRecordBase](avroSchema: Schema) extends Codec[T] {
val recordInjection: Injection[T, Array[Byte]] = SpecificAvroCodecs.toBinary(avroSchema)
val avroSerde = new AvroSerde(recordInjection)

def encode(value: T): Array[Byte] = avroSerde.encode(value)
def decode(bytes: Array[Byte]): T = avroSerde.decode(bytes)
def schema: Schema = avroSchema
def encode(value: T): Array[Byte] = avroSerde.encode(value)
def decode(bytes: Array[Byte]): Try[T] = avroSerde.decode(bytes)
def schema: Schema = avroSchema
}

private[avro] class AvroSerde[T <: SpecificRecordBase](injection: Injection[T, Array[Byte]]) extends Serializable {
@@ -41,9 +41,5 @@ private[avro] class AvroSerde[T <: SpecificRecordBase](injection: Injection[T, A
def encode(value: T): Array[Byte] = injection(value)

// TODO fix up the exception, maybe pass through input
def decode(bytes: Array[Byte]): T =
Try(inverted(bytes).get).recoverWith {
case t =>
Failure(DecodeException("Could not decode.", t))
}.get
def decode(bytes: Array[Byte]): Try[T] = inverted(bytes)
}
@@ -20,15 +20,18 @@ import cloudflow.streamlets._
import org.apache.avro.specific.SpecificRecordBase

import scala.reflect.ClassTag
import scala.reflect._

import AvroUtil._

case class AvroInlet[T <: SpecificRecordBase: ClassTag](name: String, hasUniqueGroupId: Boolean = false) extends CodecInlet[T] {
def codec = new AvroCodec[T](makeSchema)
def schemaDefinition = createSchemaDefinition(makeSchema)
def schemaAsString = makeSchema.toString(false)
def withUniqueGroupId: AvroInlet[T] = copy(hasUniqueGroupId = true)
case class AvroInlet[T <: SpecificRecordBase: ClassTag](
name: String,
hasUniqueGroupId: Boolean = false,
errorHandler: (Array[Byte], Throwable) => Option[T] = CodecInlet.logAndSkip[T](_: Array[Byte], _: Throwable)
) extends CodecInlet[T] {
def codec = new AvroCodec[T](makeSchema)
def schemaDefinition = createSchemaDefinition(makeSchema)
def schemaAsString = makeSchema.toString(false)
def withUniqueGroupId: AvroInlet[T] = copy(hasUniqueGroupId = true)
override def withErrorHandler(handler: (Array[Byte], Throwable) => Option[T]): CodecInlet[T] = copy(errorHandler = handler)
}

object AvroInlet {
@@ -17,11 +17,12 @@
package cloudflow.streamlets.proto

import scalapb.{ GeneratedMessage, GeneratedMessageCompanion }

import cloudflow.streamlets._

import scala.util.Try

class ProtoCodec[T <: GeneratedMessage: GeneratedMessageCompanion] extends Codec[T] {
val cmp = implicitly[GeneratedMessageCompanion[T]]
def encode(value: T): Array[Byte] = cmp.toByteArray(value)
def decode(bytes: Array[Byte]): T = cmp.parseFrom(bytes)
val cmp = implicitly[GeneratedMessageCompanion[T]]
def encode(value: T): Array[Byte] = cmp.toByteArray(value)
def decode(bytes: Array[Byte]): Try[T] = Try { cmp.parseFrom(bytes) }
}
@@ -17,14 +17,17 @@
package cloudflow.streamlets.proto

import cloudflow.streamlets._

import scalapb.{ GeneratedMessage, GeneratedMessageCompanion }

final case class ProtoInlet[T <: GeneratedMessage: GeneratedMessageCompanion](name: String, hasUniqueGroupId: Boolean = false)
extends CodecInlet[T] {
val cmp = implicitly[GeneratedMessageCompanion[T]]
val codec = new ProtoCodec[T]
def schemaAsString = cmp.scalaDescriptor.asProto.toProtoString
def schemaDefinition = ProtoUtil.createSchemaDefinition(cmp.scalaDescriptor)
def withUniqueGroupId: ProtoInlet[T] = if (hasUniqueGroupId) this else copy(hasUniqueGroupId = true)
final case class ProtoInlet[T <: GeneratedMessage: GeneratedMessageCompanion](
name: String,
hasUniqueGroupId: Boolean = false,
errorHandler: (Array[Byte], Throwable) => Option[T] = CodecInlet.logAndSkip[T](_: Array[Byte], _: Throwable)
) extends CodecInlet[T] {
val cmp = implicitly[GeneratedMessageCompanion[T]]
val codec = new ProtoCodec[T]
def schemaAsString = cmp.scalaDescriptor.asProto.toProtoString
def schemaDefinition = ProtoUtil.createSchemaDefinition(cmp.scalaDescriptor)
def withUniqueGroupId: ProtoInlet[T] = if (hasUniqueGroupId) this else copy(hasUniqueGroupId = true)
override def withErrorHandler(handler: (Array[Byte], Throwable) => Option[T]): CodecInlet[T] = copy(errorHandler = handler)
}
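
From a streamlet author's point of view, the new errorHandler parameter is normally supplied through withErrorHandler rather than the constructor. A usage sketch follows, assuming Sensor is a hypothetical ScalaPB-generated message type whose companion provides the required GeneratedMessageCompanion.

import cloudflow.streamlets.CodecInlet
import cloudflow.streamlets.proto.ProtoInlet

object SensorInlets {
  // Skip undecodable records after noting how large they were (Sensor is hypothetical).
  val in: CodecInlet[Sensor] =
    ProtoInlet[Sensor]("sensor-in")
      .withErrorHandler { (bytes, cause) =>
        println(s"Dropping ${bytes.length} undecodable bytes: ${cause.getMessage}")
        None
      }
}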
@@ -19,10 +19,13 @@ package cloudflow.streamlets.proto.javadsl
import cloudflow.streamlets._
import com.google.protobuf.GeneratedMessageV3

import scala.util.Try

class ProtoCodec[T <: GeneratedMessageV3](clazz: Class[T]) extends Codec[T] {
val parser = clazz.getMethod("parseFrom", classOf[Array[Byte]])

def encode(value: T): Array[Byte] = value.toByteArray
def decode(bytes: Array[Byte]): T =
def decode(bytes: Array[Byte]): Try[T] = Try {
parser.invoke(clazz, bytes).asInstanceOf[T]
}
}
@@ -17,17 +17,34 @@
package cloudflow.streamlets.proto.javadsl

import cloudflow.streamlets._
import cloudflow.streamlets.avro.AvroInlet
import com.google.protobuf.Descriptors.Descriptor
import com.google.protobuf.{ GeneratedMessageV3, TextFormat }

final case class ProtoInlet[T <: GeneratedMessageV3](name: String, clazz: Class[T], hasUniqueGroupId: Boolean = false)
extends CodecInlet[T] {
import scala.reflect.ClassTag

final case class ProtoInlet[T <: GeneratedMessageV3](
name: String,
clazz: Class[T],
hasUniqueGroupId: Boolean = false,
errorHandler: (Array[Byte], Throwable) => Option[T] = CodecInlet.logAndSkip[T](_: Array[Byte], _: Throwable)
) extends CodecInlet[T] {
// We know we can do this because of 'GeneratedMessageV3'
val descriptor = clazz.getMethod("getDescriptor").invoke(null).asInstanceOf[Descriptor]

val codec = new ProtoCodec[T](clazz)
def schemaAsString = TextFormat.printToUnicodeString(descriptor.toProto)
def schemaDefinition = ProtoUtil.createSchemaDefinition(descriptor)

def withUniqueGroupId: ProtoInlet[T] = if (hasUniqueGroupId) this else copy(hasUniqueGroupId = true)
def withUniqueGroupId: ProtoInlet[T] = if (hasUniqueGroupId) this else copy(hasUniqueGroupId = true)
override def withErrorHandler(handler: (Array[Byte], Throwable) => Option[T]): CodecInlet[T] = copy(errorHandler = handler)
}

object ProtoInlet {
// Java API
def create[T <: GeneratedMessageV3](name: String, clazz: Class[T]): ProtoInlet[T] =
ProtoInlet[T](name, clazz)

def create[T <: GeneratedMessageV3](name: String, clazz: Class[T], hasUniqueGroupId: Boolean): ProtoInlet[T] =
ProtoInlet[T](name, clazz, hasUniqueGroupId)
}