Skip to content
This repository has been archived by the owner on Nov 22, 2024. It is now read-only.

Commit

Permalink
Fix: Always getting current objects on watch restart. (#881)
Browse files Browse the repository at this point in the history
  • Loading branch information
RayRoestenburg authored Nov 18, 2020
1 parent f550eb3 commit 50f22f2
Showing 1 changed file with 30 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ import akka.actor._
import akka.pattern._
import akka.stream._
import akka.stream.scaladsl._
import org.slf4j.LoggerFactory
import play.api.libs.json.Format

import skuber._
import skuber.api.client._
import skuber.json.format._
Expand All @@ -39,6 +39,8 @@ import cloudflow.operator.event._
import cloudflow.operator.flow._

object Operator {
lazy val log = LoggerFactory.getLogger("Operator")

val ProtocolVersion = "4"
val ProtocolVersionKey = "protocol-version"
val ProtocolVersionConfigMapName = "cloudflow-protocol-version"
Expand Down Expand Up @@ -216,8 +218,7 @@ object Operator {
rd: ResourceDefinition[O],
lc: LoggingContext,
ec: ExecutionContext,
ct: ClassTag[O]): Source[WatchEvent[O], NotUsed] = {

ct: ClassTag[O]): Source[WatchEvent[O], NotUsed] =
/* =================================================
* Workaround for an issue found on OpenShift:
* After 10-15 minutes, K8s API server responds with 410 Gone status to a watch request, which skuber does not expect while processing the watch response stream.
Expand All @@ -235,27 +236,28 @@ object Operator {
* On failing watches this code becomes a polling loop of listing resources which are turned into events.
* Events that have already been processed are discarded in AppEvents.fromWatchEvent.
* ==================================================*/
system.log.info(s"Getting current events for ${classTag[O].runtimeClass.getName}")

val eventsResult = getCurrentEvents[O](client, options, 0, 3.seconds)

RestartSource.withBackoff(
minBackoff = 3.seconds,
maxBackoff = 30.seconds,
randomFactor = 0.2, // adds 20% "noise" to vary the intervals slightly
maxRestarts = -1 // a negative value means the number of restarts is not capped
RestartSettings(
minBackoff = 3.seconds,
maxBackoff = 30.seconds,
randomFactor = 0.2 // adds 20% "noise" to vary the intervals slightly
)
) { () =>
system.log.info(s"Starting watch for ${classTag[O].runtimeClass.getName}")
log.info(s"Starting watch for ${classTag[O].runtimeClass.getName}")
Source
.future(eventsResult)
.future(getCurrentEvents[O](client, options, 0, 3.seconds))
.mapConcat(identity _)
.concat(
client
.watchWithOptions[O](options = options, bufsize = MaxObjectBufSize)
.map { o =>
log.debug(s"""WatchEvent for ${classTag[O].runtimeClass.getName}, object uid: ${o._object.metadata.uid}""")
o
}
.mapMaterializedValue(_ => NotUsed)
)
}
}

private def getCurrentEvents[O <: ObjectResource](
client: KubernetesClient,
Expand All @@ -266,31 +268,37 @@ object Operator {
rd: ResourceDefinition[O],
lc: LoggingContext,
system: ActorSystem,
ec: ExecutionContext): Future[List[WatchEvent[O]]] =
ec: ExecutionContext,
ct: ClassTag[O]): Future[List[WatchEvent[O]]] = {
log.info(s"Getting current events for ${classTag[O].runtimeClass.getName}")

(for {
namespaces <- client.getNamespaceNames
lists <- Future.sequence(namespaces.map(ns => client.usingNamespace(ns).listWithOptions[ListResource[O]](options)))
watchEvents = lists.flatMap(_.items.map(item => WatchEvent(EventType.ADDED, item)))
items = lists.flatMap(_.items)
watchEvents = items.map(item => WatchEvent(EventType.ADDED, item))
itemUids = items.map(_.metadata.uid)
_ = log.debug(s"""Current ${classTag[O].runtimeClass.getName} objects: ${itemUids.mkString(",")}""")
} yield watchEvents).recoverWith {
case NonFatal(e) =>
system.log.warning(s"Could not get current events, attempt number $attempt", e)
log.warn(s"Could not get current events, attempt number $attempt", e)
after(delay, system.scheduler)(getCurrentEvents(client, options, attempt + 1, delay))
}

}
/**
 * Runs `graph` with `StreamAttributes` applied and reacts to the completion of the
 * `Future` it materializes.
 *
 * In both outcomes `exitWithFailure` is registered as an actor-system termination
 * callback and the actor system is then terminated.
 *
 * @param graph the runnable graph to run; it materializes a `Future`
 * @param unexpectedCompletionMsg message logged as a warning when the `Future` succeeds
 * @param errorMsg message logged as an error, together with the cause, when the `Future` fails
 */
private def runStream(
graph: RunnableGraph[Future[_]],
unexpectedCompletionMsg: String,
errorMsg: String
)(implicit system: ActorSystem, mat: Materializer, ec: ExecutionContext) =
graph.withAttributes(StreamAttributes).run.onComplete {
graph.withAttributes(StreamAttributes).run().onComplete {
// Successful completion is only warned about, but it still leads to the same shutdown path as a failure.
case Success(_) =>
system.log.warning(unexpectedCompletionMsg)
system.registerOnTermination(exitWithFailure)
log.warn(unexpectedCompletionMsg)
system.registerOnTermination(exitWithFailure())
system.terminate()

// Failure: log the cause with errorMsg, then shut down through the same path.
case Failure(t) =>
system.log.error(t, errorMsg)
system.registerOnTermination(exitWithFailure)
log.error(errorMsg, t)
system.registerOnTermination(exitWithFailure())
system.terminate()
}

Expand Down

0 comments on commit 50f22f2

Please sign in to comment.