From 50f22f2bf1c52e601b5731e976f945b196e34762 Mon Sep 17 00:00:00 2001 From: Raymond Roestenburg Date: Wed, 18 Nov 2020 14:56:35 +0100 Subject: [PATCH] Fix: Always getting current objects on watch restart. (#881) --- .../scala/cloudflow/operator/Operator.scala | 52 +++++++++++-------- 1 file changed, 30 insertions(+), 22 deletions(-) diff --git a/core/cloudflow-operator/src/main/scala/cloudflow/operator/Operator.scala b/core/cloudflow-operator/src/main/scala/cloudflow/operator/Operator.scala index 9d292580a..f4c050dd9 100644 --- a/core/cloudflow-operator/src/main/scala/cloudflow/operator/Operator.scala +++ b/core/cloudflow-operator/src/main/scala/cloudflow/operator/Operator.scala @@ -27,8 +27,8 @@ import akka.actor._ import akka.pattern._ import akka.stream._ import akka.stream.scaladsl._ +import org.slf4j.LoggerFactory import play.api.libs.json.Format - import skuber._ import skuber.api.client._ import skuber.json.format._ @@ -39,6 +39,8 @@ import cloudflow.operator.event._ import cloudflow.operator.flow._ object Operator { + lazy val log = LoggerFactory.getLogger("Operator") + val ProtocolVersion = "4" val ProtocolVersionKey = "protocol-version" val ProtocolVersionConfigMapName = "cloudflow-protocol-version" @@ -216,8 +218,7 @@ object Operator { rd: ResourceDefinition[O], lc: LoggingContext, ec: ExecutionContext, - ct: ClassTag[O]): Source[WatchEvent[O], NotUsed] = { - + ct: ClassTag[O]): Source[WatchEvent[O], NotUsed] = /* ================================================= * Workaround for issue found on openshift: * After 10-15 minutes, K8s API server responds with 410 Gone status to a watch request, which skuber does not expect while processing the watch response stream. @@ -235,27 +236,28 @@ object Operator { * On failing watches this code becomes a polling loop of listing resources which are turned into events. * Events that have already been processed are discarded in AppEvents.fromWatchEvent. * ==================================================*/ - system.log.info(s"Getting current events for ${classTag[O].runtimeClass.getName}") - - val eventsResult = getCurrentEvents[O](client, options, 0, 3.seconds) RestartSource.withBackoff( - minBackoff = 3.seconds, - maxBackoff = 30.seconds, - randomFactor = 0.2, // adds 20% "noise" to vary the intervals slightly - maxRestarts = -1 // limits the amount of restarts to 20 + RestartSettings( + minBackoff = 3.seconds, + maxBackoff = 30.seconds, + randomFactor = 0.2 // adds 20% "noise" to vary the intervals slightly + ) ) { () => - system.log.info(s"Starting watch for ${classTag[O].runtimeClass.getName}") + log.info(s"Starting watch for ${classTag[O].runtimeClass.getName}") Source - .future(eventsResult) + .future(getCurrentEvents[O](client, options, 0, 3.seconds)) .mapConcat(identity _) .concat( client .watchWithOptions[O](options = options, bufsize = MaxObjectBufSize) + .map { o => + log.debug(s"""WatchEvent for ${classTag[O].runtimeClass.getName}, object uid: ${o._object.metadata.uid}""") + o + } .mapMaterializedValue(_ => NotUsed) ) } - } private def getCurrentEvents[O <: ObjectResource]( client: KubernetesClient, @@ -266,31 +268,37 @@ object Operator { rd: ResourceDefinition[O], lc: LoggingContext, system: ActorSystem, - ec: ExecutionContext): Future[List[WatchEvent[O]]] = + ec: ExecutionContext, + ct: ClassTag[O]): Future[List[WatchEvent[O]]] = { + log.info(s"Getting current events for ${classTag[O].runtimeClass.getName}") + (for { namespaces <- client.getNamespaceNames lists <- Future.sequence(namespaces.map(ns => client.usingNamespace(ns).listWithOptions[ListResource[O]](options))) - watchEvents = lists.flatMap(_.items.map(item => WatchEvent(EventType.ADDED, item))) + items = lists.flatMap(_.items) + watchEvents = items.map(item => WatchEvent(EventType.ADDED, item)) + itemUids = items.map(_.metadata.uid) + _ = log.debug(s"""Current ${classTag[O].runtimeClass.getName} objects: ${itemUids.mkString(",")}""") } yield watchEvents).recoverWith { case NonFatal(e) => - system.log.warning(s"Could not get current events, attempt number $attempt", e) + log.warn(s"Could not get current events, attempt number $attempt", e) after(delay, system.scheduler)(getCurrentEvents(client, options, attempt + 1, delay)) } - + } private def runStream( graph: RunnableGraph[Future[_]], unexpectedCompletionMsg: String, errorMsg: String )(implicit system: ActorSystem, mat: Materializer, ec: ExecutionContext) = - graph.withAttributes(StreamAttributes).run.onComplete { + graph.withAttributes(StreamAttributes).run().onComplete { case Success(_) => - system.log.warning(unexpectedCompletionMsg) - system.registerOnTermination(exitWithFailure) + log.warn(unexpectedCompletionMsg) + system.registerOnTermination(exitWithFailure()) system.terminate() case Failure(t) => - system.log.error(t, errorMsg) - system.registerOnTermination(exitWithFailure) + log.error(errorMsg, t) + system.registerOnTermination(exitWithFailure()) system.terminate() }