From afd342dc5e2086f5c4032d6f026135e93beca4ff Mon Sep 17 00:00:00 2001 From: Johannes Rudolph Date: Thu, 30 Mar 2017 16:16:10 +0200 Subject: [PATCH 1/6] add flag to disable chasing --- .../scala/akka/stream/impl/fusing/GraphInterpreter.scala | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala index 18bf0346076..dde4883c381 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala @@ -197,7 +197,10 @@ import scala.util.control.NonFatal import GraphInterpreter._ - private[this] val ChaseLimit = if (fuzzingMode) 0 else 16 + private val enableChasing = true + + protected val ChaseLimit = + if (!enableChasing || fuzzingMode) 0 else 16 /** * INTERNAL API From 47a380199e7e141273cd72bb3dc06125bf15d5c7 Mon Sep 17 00:00:00 2001 From: Johannes Rudolph Date: Thu, 30 Mar 2017 16:13:44 +0200 Subject: [PATCH 2/6] disable pull chasing --- .../main/scala/akka/stream/impl/fusing/GraphInterpreter.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala index dde4883c381..4e204b97df4 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala @@ -197,7 +197,7 @@ import scala.util.control.NonFatal import GraphInterpreter._ - private val enableChasing = true + private val enableChasing = false protected val ChaseLimit = if (!enableChasing || fuzzingMode) 0 else 16 From 3f4c77c5f9b6258479c4b9660e1301f1157b4e57 Mon Sep 17 00:00:00 2001 From: Johannes Rudolph Date: Tue, 28 Mar 2017 13:41:23 +0200 Subject: [PATCH 3/6] refactor GraphInterpreter to allow specialization (by codegen) to remove megamorphic call sites Created special-cased 31-connection version specialized for HTTP --- .../impl/fusing/ActorGraphInterpreter.scala | 27 +- .../stream/impl/fusing/GraphInterpreter.scala | 605 ++++++++++++------ 2 files changed, 444 insertions(+), 188 deletions(-) diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala index cfb812754c6..e31d28981f8 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala @@ -480,14 +480,25 @@ import scala.util.control.NonFatal private var enqueueToShortCircuit: (Any) ⇒ Unit = _ - lazy val interpreter: GraphInterpreter = new GraphInterpreter(mat, log, logics, connections, - (logic, event, handler) ⇒ { - val asyncInput = AsyncInput(this, logic, event, handler) - val currentInterpreter = GraphInterpreter.currentInterpreterOrNull - if (currentInterpreter == null || (currentInterpreter.context ne self)) - self ! 
asyncInput - else enqueueToShortCircuit(asyncInput) - }, settings.fuzzingMode, self) + lazy val interpreter: GraphInterpreter = + if (connections.size == 31) + new GraphInterpreter31Impl(mat, log, logics, connections, + (logic, event, handler) ⇒ { + val asyncInput = AsyncInput(this, logic, event, handler) + val currentInterpreter = GraphInterpreter.currentInterpreterOrNull + if (currentInterpreter == null || (currentInterpreter.context ne self)) + self ! asyncInput + else enqueueToShortCircuit(asyncInput) + }, settings.fuzzingMode, self) + else + new GraphInterpreterImpl(mat, log, logics, connections, + (logic, event, handler) ⇒ { + val asyncInput = AsyncInput(this, logic, event, handler) + val currentInterpreter = GraphInterpreter.currentInterpreterOrNull + if (currentInterpreter == null || (currentInterpreter.context ne self)) + self ! asyncInput + else enqueueToShortCircuit(asyncInput) + }, settings.fuzzingMode, self) // TODO: really needed? private var subscribesPending = 0 diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala index 4e204b97df4..e02a5cdc841 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala @@ -11,6 +11,7 @@ import java.util.concurrent.ThreadLocalRandom import akka.annotation.InternalApi +import scala.annotation.switch import scala.util.control.NonFatal /** @@ -86,7 +87,7 @@ import scala.util.control.NonFatal else s"Connection($id, $portState, $slot, $inHandler, $outHandler)" } - private val _currentInterpreter = new ThreadLocal[Array[AnyRef]] { + private[fusing] val _currentInterpreter = new ThreadLocal[Array[AnyRef]] { /* * Using an Object-array avoids holding on to the GraphInterpreter class * when this accidentally leaks onto threads that are not stopped when this @@ -186,14 +187,14 @@ import scala.util.control.NonFatal * be modeled, or even dissolved (if preempted and a "stealing" external event is injected; for example the non-cycle * edge of a balance is pulled, dissolving the original cycle). */ -@InternalApi private[akka] final class GraphInterpreter( - val materializer: Materializer, - val log: LoggingAdapter, - val logics: Array[GraphStageLogic], // Array of stage logics - val connections: Array[GraphInterpreter.Connection], - val onAsyncInput: (GraphStageLogic, Any, (Any) ⇒ Unit) ⇒ Unit, - val fuzzingMode: Boolean, - val context: ActorRef) { +@InternalApi private[akka] abstract class GraphInterpreter( + final val materializer: Materializer, + final val log: LoggingAdapter, + final val logics: Array[GraphStageLogic], // Array of stage logics + final val connections: Array[GraphInterpreter.Connection], + final val onAsyncInput: (GraphStageLogic, Any, (Any) ⇒ Unit) ⇒ Unit, + final val fuzzingMode: Boolean, + final val context: ActorRef) { import GraphInterpreter._ @@ -209,28 +210,28 @@ import scala.util.control.NonFatal // The number of currently running stages. Once this counter reaches zero, the interpreter is considered to be // completed - private[this] var runningStages = logics.length + protected var runningStages = logics.length // Counts how many active connections a stage has. Once it reaches zero, the stage is automatically stopped. 
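  // (Editor's note: PATCH 3 widens these members from private[this] to protected so that
  // the GraphInterpreterImpl / GraphInterpreter31Impl subclasses added below, and any
  // future code-generated specializations, can reach the interpreter's internal state.)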
- private[this] val shutdownCounter = Array.tabulate(logics.length) { i ⇒ + protected val shutdownCounter = Array.tabulate(logics.length) { i ⇒ logics(i).handlers.length } - private[this] var _subFusingMaterializer: Materializer = _ + protected var _subFusingMaterializer: Materializer = _ def subFusingMaterializer: Materializer = _subFusingMaterializer // An event queue implemented as a circular buffer // FIXME: This calculates the maximum size ever needed, but most assemblies can run on a smaller queue - private[this] val eventQueue = new Array[Connection](1 << (32 - Integer.numberOfLeadingZeros(connections.length - 1))) - private[this] val mask = eventQueue.length - 1 - private[this] var queueHead: Int = 0 - private[this] var queueTail: Int = 0 + protected val eventQueue = new Array[Connection](1 << (32 - Integer.numberOfLeadingZeros(connections.length - 1))) + protected val mask = eventQueue.length - 1 + protected var queueHead: Int = 0 + protected var queueTail: Int = 0 - private[this] var chaseCounter = 0 // the first events in preStart blocks should be not chased - private[this] var chasedPush: Connection = NoEvent - private[this] var chasedPull: Connection = NoEvent + protected var chaseCounter = 0 // the first events in preStart blocks should be not chased + protected var chasedPush: Connection = NoEvent + protected var chasedPull: Connection = NoEvent - private def queueStatus: String = { + protected def queueStatus: String = { val contents = (queueHead until queueTail).map(idx ⇒ { val conn = eventQueue(idx & mask) conn @@ -314,127 +315,46 @@ import scala.util.control.NonFatal } // Debug name for a connections input part - private def inOwnerName(connection: Connection): String = connection.inOwner.toString + protected def inOwnerName(connection: Connection): String = connection.inOwner.toString // Debug name for a connections output part - private def outOwnerName(connection: Connection): String = connection.outOwner.toString + protected def outOwnerName(connection: Connection): String = connection.outOwner.toString // Debug name for a connections input part - private def inLogicName(connection: Connection): String = logics(connection.inOwner.stageId).toString + protected def inLogicName(connection: Connection): String = logics(connection.inOwner.stageId).toString // Debug name for a connections output part - private def outLogicName(connection: Connection): String = logics(connection.outOwner.stageId).toString + protected def outLogicName(connection: Connection): String = logics(connection.outOwner.stageId).toString - private def shutdownCounters: String = + protected def shutdownCounters: String = shutdownCounter.map(x ⇒ if (x >= KeepGoingFlag) s"${x & KeepGoingMask}(KeepGoing)" else x.toString).mkString(",") - /** - * Executes pending events until the given limit is met. If there were remaining events, isSuspended will return - * true. 
- */ - def execute(eventLimit: Int): Int = { - if (Debug) println(s"$Name ---------------- EXECUTE $queueStatus (running=$runningStages, shutdown=$shutdownCounters)") - val currentInterpreterHolder = _currentInterpreter.get() - val previousInterpreter = currentInterpreterHolder(0) - currentInterpreterHolder(0) = this - var eventsRemaining = eventLimit - try { - while (eventsRemaining > 0 && queueTail != queueHead) { - val connection = dequeue() - eventsRemaining -= 1 - chaseCounter = math.min(ChaseLimit, eventsRemaining) - - def reportStageError(e: Throwable): Unit = { - if (activeStage == null) throw e - else { - log.error(e, "Error in stage [{}]: {}", activeStage.originalStage.getOrElse(activeStage), e.getMessage) - activeStage.failStage(e) - - // Abort chasing - chaseCounter = 0 - if (chasedPush ne NoEvent) { - enqueue(chasedPush) - chasedPush = NoEvent - } - if (chasedPull ne NoEvent) { - enqueue(chasedPull) - chasedPull = NoEvent - } - } - } + protected def reportStageError(e: Throwable): Unit = { + if (activeStage == null) throw e + else { + log.error(e, "Error in stage [{}]: {}", activeStage.originalStage.getOrElse(activeStage), e.getMessage) + activeStage.failStage(e) - /* - * This is the "normal" event processing code which dequeues directly from the internal event queue. Since - * most execution paths tend to produce either a Push that will be propagated along a longer chain we take - * extra steps below to make this more efficient. - */ - try processEvent(connection) - catch { - case NonFatal(e) ⇒ reportStageError(e) - } - afterStageHasRun(activeStage) - - /* - * "Event chasing" optimization follows from here. This optimization works under the assumption that a Push or - * Pull is very likely immediately followed by another Push/Pull. The difference from the "normal" event - * dispatch is that chased events are never touching the event queue, they use a "streamlined" execution path - * instead. Looking at the scenario of a Push, the following events will happen. - * - "normal" dispatch executes an onPush event - * - stage eventually calls push() - * - code inside the push() method checks the validity of the call, and also if it can be safely ignored - * (because the target stage already completed we just have not been notified yet) - * - if the upper limit of ChaseLimit has not been reached, then the Connection is put into the chasedPush - * variable - * - the loop below immediately captures this push and dispatches it - * - * What is saved by this optimization is three steps: - * - no need to enqueue the Connection in the queue (array), it ends up in a simple variable, reducing - * pressure on array load-store - * - no need to dequeue the Connection from the queue, similar to above - * - no need to decode the event, we know it is a Push already - * - no need to check for validity of the event because we already checked at the push() call, and there - * can be no concurrent events interleaved unlike with the normal dispatch (think about a cancel() that is - * called in the target stage just before the onPush() arrives). This avoids unnecessary branching. 
- */ - - // Chasing PUSH events - while (chasedPush != NoEvent) { - val connection = chasedPush - chasedPush = NoEvent - try processPush(connection) - catch { - case NonFatal(e) ⇒ reportStageError(e) - } - afterStageHasRun(activeStage) - } - - // Chasing PULL events - while (chasedPull != NoEvent) { - val connection = chasedPull - chasedPull = NoEvent - try processPull(connection) - catch { - case NonFatal(e) ⇒ reportStageError(e) - } - afterStageHasRun(activeStage) - } - - if (chasedPush != NoEvent) { - enqueue(chasedPush) - chasedPush = NoEvent - } - - } - // Event *must* be enqueued while not in the execute loop (events enqueued from external, possibly async events) + // Abort chasing chaseCounter = 0 - } finally { - currentInterpreterHolder(0) = previousInterpreter + if (chasedPush ne NoEvent) { + enqueue(chasedPush) + chasedPush = NoEvent + } + if (chasedPull ne NoEvent) { + enqueue(chasedPull) + chasedPull = NoEvent + } } - if (Debug) println(s"$Name ---------------- $queueStatus (running=$runningStages, shutdown=$shutdownCounters)") - // TODO: deadlock detection - eventsRemaining } + /** + * Executes pending events until the given limit is met. If there were remaining events, isSuspended will return + * true. + */ + def execute(eventLimit: Int): Int + protected def processEvent(connection: Connection): Unit + def runAsyncInput(logic: GraphStageLogic, evt: Any, handler: (Any) ⇒ Unit): Unit = if (!isStageCompleted(logic)) { if (GraphInterpreter.Debug) println(s"$Name ASYNC $evt ($handler) [$logic]") @@ -451,65 +371,21 @@ import scala.util.control.NonFatal } finally currentInterpreterHolder(0) = previousInterpreter } - // Decodes and processes a single event for the given connection - private def processEvent(connection: Connection): Unit = { - - // this must be the state after returning without delivering any signals, to avoid double-finalization of some unlucky stage - // (this can happen if a stage completes voluntarily while connection close events are still queued) - activeStage = null - val code = connection.portState - - // Manual fast decoding, fast paths are PUSH and PULL - // PUSH - if ((code & (Pushing | InClosed | OutClosed)) == Pushing) { - processPush(connection) - - // PULL - } else if ((code & (Pulling | OutClosed | InClosed)) == Pulling) { - processPull(connection) - - // CANCEL - } else if ((code & (OutClosed | InClosed)) == InClosed) { - activeStage = connection.outOwner - if (Debug) println(s"$Name CANCEL ${inOwnerName(connection)} -> ${outOwnerName(connection)} (${connection.outHandler}) [${outLogicName(connection)}]") - connection.portState |= OutClosed - completeConnection(connection.outOwner.stageId) - connection.outHandler.onDownstreamFinish() - } else if ((code & (OutClosed | InClosed)) == OutClosed) { - // COMPLETIONS - - if ((code & Pushing) == 0) { - // Normal completion (no push pending) - if (Debug) println(s"$Name COMPLETE ${outOwnerName(connection)} -> ${inOwnerName(connection)} (${connection.inHandler}) [${inLogicName(connection)}]") - connection.portState |= InClosed - activeStage = connection.inOwner - completeConnection(connection.inOwner.stageId) - if ((connection.portState & InFailed) == 0) connection.inHandler.onUpstreamFinish() - else connection.inHandler.onUpstreamFailure(connection.slot.asInstanceOf[Failed].ex) - } else { - // Push is pending, first process push, then re-enqueue closing event - processPush(connection) - enqueue(connection) - } - - } - } - - private def processPush(connection: Connection): Unit = { + protected def 
processPush(connection: Connection): Unit = { if (Debug) println(s"$Name PUSH ${outOwnerName(connection)} -> ${inOwnerName(connection)}, ${connection.slot} (${connection.inHandler}) [${inLogicName(connection)}]") activeStage = connection.inOwner connection.portState ^= PushEndFlip connection.inHandler.onPush() } - private def processPull(connection: Connection): Unit = { + protected def processPull(connection: Connection): Unit = { if (Debug) println(s"$Name PULL ${inOwnerName(connection)} -> ${outOwnerName(connection)} (${connection.outHandler}) [${outLogicName(connection)}]") activeStage = connection.outOwner connection.portState ^= PullEndFlip connection.outHandler.onPull() } - private def dequeue(): Connection = { + protected def dequeue(): Connection = { val idx = queueHead & mask if (fuzzingMode) { val swapWith = (ThreadLocalRandom.current.nextInt(queueTail - queueHead) + queueHead) & mask @@ -540,7 +416,7 @@ import scala.util.control.NonFatal // Register that a connection in which the given stage participated has been completed and therefore the stage // itself might stop, too. - private def completeConnection(stageId: Int): Unit = { + protected def completeConnection(stageId: Int): Unit = { val activeConnections = shutdownCounter(stageId) if (activeConnections > 0) shutdownCounter(stageId) = activeConnections - 1 } @@ -559,21 +435,21 @@ import scala.util.control.NonFatal } } - private[stream] def chasePush(connection: Connection): Unit = { + protected[stream] def chasePush(connection: Connection): Unit = { if (chaseCounter > 0 && chasedPush == NoEvent) { chaseCounter -= 1 chasedPush = connection } else enqueue(connection) } - private[stream] def chasePull(connection: Connection): Unit = { + protected[stream] def chasePull(connection: Connection): Unit = { if (chaseCounter > 0 && chasedPull == NoEvent) { chaseCounter -= 1 chasedPull = connection } else enqueue(connection) } - private[stream] def complete(connection: Connection): Unit = { + protected[stream] def complete(connection: Connection): Unit = { val currentState = connection.portState if (Debug) println(s"$Name complete($connection) [$currentState]") connection.portState = currentState | OutClosed @@ -587,7 +463,7 @@ import scala.util.control.NonFatal if ((currentState & OutClosed) == 0) completeConnection(connection.outOwner.stageId) } - private[stream] def fail(connection: Connection, ex: Throwable): Unit = { + protected[stream] def fail(connection: Connection, ex: Throwable): Unit = { val currentState = connection.portState if (Debug) println(s"$Name fail($connection, $ex) [$currentState]") connection.portState = currentState | OutClosed @@ -605,7 +481,7 @@ import scala.util.control.NonFatal if ((currentState & OutClosed) == 0) completeConnection(connection.outOwner.stageId) } - private[stream] def cancel(connection: Connection): Unit = { + protected[stream] def cancel(connection: Connection): Unit = { val currentState = connection.portState if (Debug) println(s"$Name cancel($connection) [$currentState]") connection.portState = currentState | InClosed @@ -669,3 +545,372 @@ import scala.util.control.NonFatal } } } + +private[akka] final class GraphInterpreterImpl( + materializer: Materializer, + log: LoggingAdapter, + logics: Array[GraphStageLogic], // Array of stage logics + connections: Array[GraphInterpreter.Connection], + onAsyncInput: (GraphStageLogic, Any, (Any) ⇒ Unit) ⇒ Unit, + fuzzingMode: Boolean, + context: ActorRef) extends GraphInterpreter(materializer, log, logics, connections, onAsyncInput, fuzzingMode, 
context) { + import GraphInterpreter._ + + println(s"GraphInterpreter created with ${connections.size} conns") + + def execute(eventLimit: Int): Int = { + if (Debug) println(s"$Name ---------------- EXECUTE $queueStatus (running=$runningStages, shutdown=$shutdownCounters)") + val currentInterpreterHolder = _currentInterpreter.get() + val previousInterpreter = currentInterpreterHolder(0) + currentInterpreterHolder(0) = this + var eventsRemaining = eventLimit + try { + while (eventsRemaining > 0 && queueTail != queueHead) { + val connection = dequeue() + eventsRemaining -= 1 + chaseCounter = math.min(ChaseLimit, eventsRemaining) + + /* + * This is the "normal" event processing code which dequeues directly from the internal event queue. Since + * most execution paths tend to produce either a Push that will be propagated along a longer chain we take + * extra steps below to make this more efficient. + */ + try processEvent(connection) + catch { + case NonFatal(e) ⇒ reportStageError(e) + } + afterStageHasRun(activeStage) + + /* + * "Event chasing" optimization follows from here. This optimization works under the assumption that a Push or + * Pull is very likely immediately followed by another Push/Pull. The difference from the "normal" event + * dispatch is that chased events are never touching the event queue, they use a "streamlined" execution path + * instead. Looking at the scenario of a Push, the following events will happen. + * - "normal" dispatch executes an onPush event + * - stage eventually calls push() + * - code inside the push() method checks the validity of the call, and also if it can be safely ignored + * (because the target stage already completed we just have not been notified yet) + * - if the upper limit of ChaseLimit has not been reached, then the Connection is put into the chasedPush + * variable + * - the loop below immediately captures this push and dispatches it + * + * What is saved by this optimization is three steps: + * - no need to enqueue the Connection in the queue (array), it ends up in a simple variable, reducing + * pressure on array load-store + * - no need to dequeue the Connection from the queue, similar to above + * - no need to decode the event, we know it is a Push already + * - no need to check for validity of the event because we already checked at the push() call, and there + * can be no concurrent events interleaved unlike with the normal dispatch (think about a cancel() that is + * called in the target stage just before the onPush() arrives). This avoids unnecessary branching. 
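         *
         * (Editor's sketch, assuming a simple chain A -> B -> C and a non-exhausted ChaseLimit:
         * dequeuing the A -> B connection runs B's onPush(); inside it B calls push(out), which
         * parks the B -> C connection in chasedPush instead of enqueueing it; the loop below
         * picks that connection up and runs C's onPush() directly, so the whole propagation
         * never touches the event queue again.)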
+ */ + + // Chasing PUSH events + while (chasedPush != NoEvent) { + val connection = chasedPush + chasedPush = NoEvent + try processPush(connection) + catch { + case NonFatal(e) ⇒ reportStageError(e) + } + afterStageHasRun(activeStage) + } + + // Chasing PULL events + while (chasedPull != NoEvent) { + val connection = chasedPull + chasedPull = NoEvent + try processPull(connection) + catch { + case NonFatal(e) ⇒ reportStageError(e) + } + afterStageHasRun(activeStage) + } + + if (chasedPush != NoEvent) { + enqueue(chasedPush) + chasedPush = NoEvent + } + + } + // Event *must* be enqueued while not in the execute loop (events enqueued from external, possibly async events) + chaseCounter = 0 + } finally { + currentInterpreterHolder(0) = previousInterpreter + } + if (Debug) println(s"$Name ---------------- $queueStatus (running=$runningStages, shutdown=$shutdownCounters)") + // TODO: deadlock detection + eventsRemaining + } + + // Decodes and processes a single event for the given connection + protected final def processEvent(connection: Connection): Unit = { + + // this must be the state after returning without delivering any signals, to avoid double-finalization of some unlucky stage + // (this can happen if a stage completes voluntarily while connection close events are still queued) + activeStage = null + val code = connection.portState + + // Manual fast decoding, fast paths are PUSH and PULL + // PUSH + if ((code & (Pushing | InClosed | OutClosed)) == Pushing) { + processPush(connection) + + // PULL + } else if ((code & (Pulling | OutClosed | InClosed)) == Pulling) { + processPull(connection) + + // CANCEL + } else if ((code & (OutClosed | InClosed)) == InClosed) { + activeStage = connection.outOwner + if (Debug) println(s"$Name CANCEL ${inOwnerName(connection)} -> ${outOwnerName(connection)} (${connection.outHandler}) [${outLogicName(connection)}]") + connection.portState |= OutClosed + completeConnection(connection.outOwner.stageId) + connection.outHandler.onDownstreamFinish() + } else if ((code & (OutClosed | InClosed)) == OutClosed) { + // COMPLETIONS + + if ((code & Pushing) == 0) { + // Normal completion (no push pending) + if (Debug) println(s"$Name COMPLETE ${outOwnerName(connection)} -> ${inOwnerName(connection)} (${connection.inHandler}) [${inLogicName(connection)}]") + connection.portState |= InClosed + activeStage = connection.inOwner + completeConnection(connection.inOwner.stageId) + if ((connection.portState & InFailed) == 0) connection.inHandler.onUpstreamFinish() + else connection.inHandler.onUpstreamFailure(connection.slot.asInstanceOf[Failed].ex) + } else { + // Push is pending, first process push, then re-enqueue closing event + processPush(connection) + enqueue(connection) + } + + } + } + + protected final override def processPush(connection: Connection): Unit = { + if (Debug) println(s"$Name PUSH ${outOwnerName(connection)} -> ${inOwnerName(connection)}, ${connection.slot} (${connection.inHandler}) [${inLogicName(connection)}]") + activeStage = connection.inOwner + connection.portState ^= PushEndFlip + connection.inHandler.onPush() + } +} + +private[akka] final class GraphInterpreter31Impl( + materializer: Materializer, + log: LoggingAdapter, + logics: Array[GraphStageLogic], // Array of stage logics + connections: Array[GraphInterpreter.Connection], + onAsyncInput: (GraphStageLogic, Any, (Any) ⇒ Unit) ⇒ Unit, + fuzzingMode: Boolean, + context: ActorRef) extends GraphInterpreter(materializer, log, logics, connections, onAsyncInput, fuzzingMode, context) { + import 
GraphInterpreter._ + + println(s"Created special cased HTTP graph interpreter") + + final override def execute(eventLimit: Int): Int = { + if (Debug) println(s"$Name ---------------- EXECUTE $queueStatus (running=$runningStages, shutdown=$shutdownCounters)") + val currentInterpreterHolder = _currentInterpreter.get() + val previousInterpreter = currentInterpreterHolder(0) + currentInterpreterHolder(0) = this + var eventsRemaining = eventLimit + try { + while (eventsRemaining > 0 && queueTail != queueHead) { + val connection = dequeue() + eventsRemaining -= 1 + chaseCounter = math.min(ChaseLimit, eventsRemaining) + + /* + * This is the "normal" event processing code which dequeues directly from the internal event queue. Since + * most execution paths tend to produce either a Push that will be propagated along a longer chain we take + * extra steps below to make this more efficient. + */ + try processEvent(connection) + catch { + case NonFatal(e) ⇒ reportStageError(e) + } + afterStageHasRun(activeStage) + + /* + * "Event chasing" optimization follows from here. This optimization works under the assumption that a Push or + * Pull is very likely immediately followed by another Push/Pull. The difference from the "normal" event + * dispatch is that chased events are never touching the event queue, they use a "streamlined" execution path + * instead. Looking at the scenario of a Push, the following events will happen. + * - "normal" dispatch executes an onPush event + * - stage eventually calls push() + * - code inside the push() method checks the validity of the call, and also if it can be safely ignored + * (because the target stage already completed we just have not been notified yet) + * - if the upper limit of ChaseLimit has not been reached, then the Connection is put into the chasedPush + * variable + * - the loop below immediately captures this push and dispatches it + * + * What is saved by this optimization is three steps: + * - no need to enqueue the Connection in the queue (array), it ends up in a simple variable, reducing + * pressure on array load-store + * - no need to dequeue the Connection from the queue, similar to above + * - no need to decode the event, we know it is a Push already + * - no need to check for validity of the event because we already checked at the push() call, and there + * can be no concurrent events interleaved unlike with the normal dispatch (think about a cancel() that is + * called in the target stage just before the onPush() arrives). This avoids unnecessary branching. 
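         *
         * (Editor's note: in this specialized subclass the chased events additionally flow
         * through the connection-id @switch in processPush/processPull below, so each case's
         * inHandler.onPush() / outHandler.onPull() call site only ever observes the handler
         * types of that single connection, keeping each site monomorphic where a single
         * shared call site would be megamorphic.)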
+ */ + + // Chasing PUSH events + while (chasedPush != NoEvent) { + val connection = chasedPush + chasedPush = NoEvent + try processPush(connection) + catch { + case NonFatal(e) ⇒ reportStageError(e) + } + afterStageHasRun(activeStage) + } + + // Chasing PULL events + while (chasedPull != NoEvent) { + val connection = chasedPull + chasedPull = NoEvent + try processPull(connection) + catch { + case NonFatal(e) ⇒ reportStageError(e) + } + afterStageHasRun(activeStage) + } + + if (chasedPush != NoEvent) { + enqueue(chasedPush) + chasedPush = NoEvent + } + + } + // Event *must* be enqueued while not in the execute loop (events enqueued from external, possibly async events) + chaseCounter = 0 + } finally { + currentInterpreterHolder(0) = previousInterpreter + } + if (Debug) println(s"$Name ---------------- $queueStatus (running=$runningStages, shutdown=$shutdownCounters)") + // TODO: deadlock detection + eventsRemaining + } + + // Decodes and processes a single event for the given connection + protected final def processEvent(connection: Connection): Unit = { + + // this must be the state after returning without delivering any signals, to avoid double-finalization of some unlucky stage + // (this can happen if a stage completes voluntarily while connection close events are still queued) + activeStage = null + val code = connection.portState + + // Manual fast decoding, fast paths are PUSH and PULL + // PUSH + if ((code & (Pushing | InClosed | OutClosed)) == Pushing) { + processPush(connection) + + // PULL + } else if ((code & (Pulling | OutClosed | InClosed)) == Pulling) { + processPull(connection) + + // CANCEL + } else if ((code & (OutClosed | InClosed)) == InClosed) { + activeStage = connection.outOwner + if (Debug) println(s"$Name CANCEL ${inOwnerName(connection)} -> ${outOwnerName(connection)} (${connection.outHandler}) [${outLogicName(connection)}]") + connection.portState |= OutClosed + completeConnection(connection.outOwner.stageId) + connection.outHandler.onDownstreamFinish() + } else if ((code & (OutClosed | InClosed)) == OutClosed) { + // COMPLETIONS + + if ((code & Pushing) == 0) { + // Normal completion (no push pending) + if (Debug) println(s"$Name COMPLETE ${outOwnerName(connection)} -> ${inOwnerName(connection)} (${connection.inHandler}) [${inLogicName(connection)}]") + connection.portState |= InClosed + activeStage = connection.inOwner + completeConnection(connection.inOwner.stageId) + if ((connection.portState & InFailed) == 0) connection.inHandler.onUpstreamFinish() + else connection.inHandler.onUpstreamFailure(connection.slot.asInstanceOf[Failed].ex) + } else { + // Push is pending, first process push, then re-enqueue closing event + processPush(connection) + enqueue(connection) + } + + } + } + + protected final override def processPush(connection: Connection): Unit = { + if (Debug) println(s"$Name PUSH ${outOwnerName(connection)} -> ${inOwnerName(connection)}, ${connection.slot} (${connection.inHandler}) [${inLogicName(connection)}]") + activeStage = connection.inOwner + connection.portState ^= PushEndFlip + + (connection.id: @switch) match { + case 0 ⇒ connection.inHandler.onPush() + case 1 ⇒ connection.inHandler.onPush() + case 2 ⇒ connection.inHandler.onPush() + case 3 ⇒ connection.inHandler.onPush() + case 4 ⇒ connection.inHandler.onPush() + case 5 ⇒ connection.inHandler.onPush() + case 6 ⇒ connection.inHandler.onPush() + case 7 ⇒ connection.inHandler.onPush() + case 8 ⇒ connection.inHandler.onPush() + case 9 ⇒ connection.inHandler.onPush() + case 10 ⇒ 
connection.inHandler.onPush()
      case 11 ⇒ connection.inHandler.onPush()
      case 12 ⇒ connection.inHandler.onPush()
      case 13 ⇒ connection.inHandler.onPush()
      case 14 ⇒ connection.inHandler.onPush()
      case 15 ⇒ connection.inHandler.onPush()
      case 16 ⇒ connection.inHandler.onPush()
      case 17 ⇒ connection.inHandler.onPush()
      case 18 ⇒ connection.inHandler.onPush()
      case 19 ⇒ connection.inHandler.onPush()
      case 20 ⇒ connection.inHandler.onPush()
      case 21 ⇒ connection.inHandler.onPush()
      case 22 ⇒ connection.inHandler.onPush()
      case 23 ⇒ connection.inHandler.onPush()
      case 24 ⇒ connection.inHandler.onPush()
      case 25 ⇒ connection.inHandler.onPush()
      case 26 ⇒ connection.inHandler.onPush()
      case 27 ⇒ connection.inHandler.onPush()
      case 28 ⇒ connection.inHandler.onPush()
      case 29 ⇒ connection.inHandler.onPush()
      case 30 ⇒ connection.inHandler.onPush()
    }
  }

  protected final override def processPull(connection: Connection): Unit = {
    if (Debug) println(s"$Name PULL ${inOwnerName(connection)} -> ${outOwnerName(connection)} (${connection.outHandler}) [${outLogicName(connection)}]")
    activeStage = connection.outOwner
    connection.portState ^= PullEndFlip

    (connection.id: @switch) match {
      case 0 ⇒ connection.outHandler.onPull()
      case 1 ⇒ connection.outHandler.onPull()
      case 2 ⇒ connection.outHandler.onPull()
      case 3 ⇒ connection.outHandler.onPull()
      case 4 ⇒ connection.outHandler.onPull()
      case 5 ⇒ connection.outHandler.onPull()
      case 6 ⇒ connection.outHandler.onPull()
      case 7 ⇒ connection.outHandler.onPull()
      case 8 ⇒ connection.outHandler.onPull()
      case 9 ⇒ connection.outHandler.onPull()
      case 10 ⇒ connection.outHandler.onPull()
      case 11 ⇒ connection.outHandler.onPull()
      case 12 ⇒ connection.outHandler.onPull()
      case 13 ⇒ connection.outHandler.onPull()
      case 14 ⇒ connection.outHandler.onPull()
      case 15 ⇒ connection.outHandler.onPull()
      case 16 ⇒ connection.outHandler.onPull()
      case 17 ⇒ connection.outHandler.onPull()
      case 18 ⇒ connection.outHandler.onPull()
      case 19 ⇒ connection.outHandler.onPull()
      case 20 ⇒ connection.outHandler.onPull()
      case 21 ⇒ connection.outHandler.onPull()
      case 22 ⇒ connection.outHandler.onPull()
      case 23 ⇒ connection.outHandler.onPull()
      case 24 ⇒ connection.outHandler.onPull()
      case 25 ⇒ connection.outHandler.onPull()
      case 26 ⇒ connection.outHandler.onPull()
      case 27 ⇒ connection.outHandler.onPull()
      case 28 ⇒ connection.outHandler.onPull()
      case 29 ⇒ connection.outHandler.onPull()
      case 30 ⇒ connection.outHandler.onPull()
    }
  }
}
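[Editor's note on PATCH 3: the motivation for the 31-connection special case is that a single
shared `connection.inHandler.onPush()` call site services every stage in a fused graph, so the
JIT sees many receiver classes there and the site goes megamorphic, defeating inlining.
Splitting dispatch over a per-connection-id @switch gives each connection its own bytecode
call site. A minimal, self-contained sketch of the idea; all names below are illustrative,
not from the patch:

  import scala.annotation.switch

  trait Handler { def onPush(): Unit }
  final class MapHandler extends Handler { def onPush(): Unit = () }
  final class FilterHandler extends Handler { def onPush(): Unit = () }

  object DispatchSketch {
    val handlers: Array[Handler] = Array(new MapHandler, new FilterHandler)

    // One shared call site: over time it observes every Handler subclass in the
    // graph, so the JVM inline cache degrades to a megamorphic virtual call.
    def shared(id: Int): Unit = handlers(id).onPush()

    // One call site per id: each site only ever observes the handler installed
    // for that connection, so the JIT can keep it monomorphic and inline it.
    def split(id: Int): Unit = (id: @switch) match {
      case 0 ⇒ handlers(0).onPush()
      case 1 ⇒ handlers(1).onPush()
      case _ ⇒ () // default for safety; the patch's own switch covers exactly ids 0-30
    }
  }
]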
From ae8ba7cc26af664c4325fcdd5e1c24fb58d4ab18 Mon Sep 17 00:00:00 2001
From: Johannes Rudolph 
Date: Thu, 30 Mar 2017 17:02:01 +0200
Subject: [PATCH 4/6] specialized GraphInterpreter improvements

---
 .../stream/impl/fusing/GraphInterpreter.scala | 67 ++-----------------
 1 file changed, 7 insertions(+), 60 deletions(-)

diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala
index e02a5cdc841..41efa1045a0 100644
--- a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala
+++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala
@@ -36,6 +36,10 @@ import scala.util.control.NonFatal
   final val OutClosed = 32
   final val InFailed = 64
 
+  final val PushingInClosedOutClosed = Pushing | InClosed | OutClosed
+  final val PullingInClosedOutClosed = Pulling | OutClosed | InClosed
+  final val OutClosedInClosed = OutClosed | InClosed
+
   final val PullStartFlip = 3 // 0011
   final val PullEndFlip = 10 // 1010
   final val PushStartFlip = 12 //1100
@@ -716,69 +720,12 @@ private[akka] final class GraphInterpreter31Impl(
       while (eventsRemaining > 0 && queueTail != queueHead) {
         val connection = dequeue()
         eventsRemaining -= 1
-        chaseCounter = math.min(ChaseLimit, eventsRemaining)
-
-        /*
-         * This is the "normal" event processing code which dequeues directly from the internal event queue. Since
-         * most execution paths tend to produce either a Push that will be propagated along a longer chain we take
-         * extra steps below to make this more efficient.
-         */
         try processEvent(connection)
         catch {
           case NonFatal(e) ⇒ reportStageError(e)
         }
         afterStageHasRun(activeStage)
-
-        /*
-         * "Event chasing" optimization follows from here. This optimization works under the assumption that a Push or
-         * Pull is very likely immediately followed by another Push/Pull. The difference from the "normal" event
-         * dispatch is that chased events are never touching the event queue, they use a "streamlined" execution path
-         * instead. Looking at the scenario of a Push, the following events will happen.
-         * - "normal" dispatch executes an onPush event
-         * - stage eventually calls push()
-         * - code inside the push() method checks the validity of the call, and also if it can be safely ignored
-         *   (because the target stage already completed we just have not been notified yet)
-         * - if the upper limit of ChaseLimit has not been reached, then the Connection is put into the chasedPush
-         *   variable
-         * - the loop below immediately captures this push and dispatches it
-         *
-         * What is saved by this optimization is three steps:
-         * - no need to enqueue the Connection in the queue (array), it ends up in a simple variable, reducing
-         *   pressure on array load-store
-         * - no need to dequeue the Connection from the queue, similar to above
-         * - no need to decode the event, we know it is a Push already
-         * - no need to check for validity of the event because we already checked at the push() call, and there
-         *   can be no concurrent events interleaved unlike with the normal dispatch (think about a cancel() that is
-         *   called in the target stage just before the onPush() arrives). This avoids unnecessary branching.
-         */
-
-        // Chasing PUSH events
-        while (chasedPush != NoEvent) {
-          val connection = chasedPush
-          chasedPush = NoEvent
-          try processPush(connection)
-          catch {
-            case NonFatal(e) ⇒ reportStageError(e)
-          }
-          afterStageHasRun(activeStage)
-        }
-
-        // Chasing PULL events
-        while (chasedPull != NoEvent) {
-          val connection = chasedPull
-          chasedPull = NoEvent
-          try processPull(connection)
-          catch {
-            case NonFatal(e) ⇒ reportStageError(e)
-          }
-          afterStageHasRun(activeStage)
-        }
-
-        if (chasedPush != NoEvent) {
-          enqueue(chasedPush)
-          chasedPush = NoEvent
-        }
-
       }
       // Event *must* be enqueued while not in the execute loop (events enqueued from external, possibly async events)
       chaseCounter = 0
@@ -800,15 +747,15 @@ private[akka] final class GraphInterpreter31Impl(
 
     // Manual fast decoding, fast paths are PUSH and PULL
     // PUSH
-    if ((code & (Pushing | InClosed | OutClosed)) == Pushing) {
+    if ((code & PushingInClosedOutClosed) == Pushing) {
       processPush(connection)
 
       // PULL
-    } else if ((code & (Pulling | OutClosed | InClosed)) == Pulling) {
+    } else if ((code & PullingInClosedOutClosed) == Pulling) {
      processPull(connection)
 
       // CANCEL
-    } else if ((code & (OutClosed | InClosed)) == InClosed) {
+    } else if ((code & OutClosedInClosed) == InClosed) {
       activeStage = connection.outOwner
       if (Debug) println(s"$Name CANCEL ${inOwnerName(connection)} -> ${outOwnerName(connection)} (${connection.outHandler}) [${outLogicName(connection)}]")
       connection.portState |= OutClosed

From 0e059d9d5a932ec8e78fa8476dfeb33677338a90 Mon Sep 17 00:00:00 2001
From: Johannes Rudolph 
Date: Tue, 28 Mar 2017 14:53:49 +0200
Subject: [PATCH 5/6] replace wakeup() in SelectionHandler with a short select
 timeout

---
 .../src/main/scala/akka/io/SelectionHandler.scala | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/akka-actor/src/main/scala/akka/io/SelectionHandler.scala b/akka-actor/src/main/scala/akka/io/SelectionHandler.scala
index 341d35c3ff7..7b8bc5a45d1 100644
--- a/akka-actor/src/main/scala/akka/io/SelectionHandler.scala
+++ b/akka-actor/src/main/scala/akka/io/SelectionHandler.scala
@@ -106,13 +106,13 @@ private[io] object SelectionHandler {
   private class ChannelRegistryImpl(executionContext: ExecutionContext, log: LoggingAdapter) extends ChannelRegistry {
     private[this] val selector = SelectorProvider.provider.openSelector
-    private[this] val wakeUp = new AtomicBoolean(false)
+    //private[this] val wakeUp = new AtomicBoolean(false)
 
     final val OP_READ_AND_WRITE = OP_READ | OP_WRITE // compile-time constant
 
     private[this] val select = new Task {
       def tryRun(): Unit = {
-        if (selector.select() > 0) { // This assumes select return value == selectedKeys.size
+        if (selector.select(1) > 0) { // This assumes select return value == selectedKeys.size
           val keys = selector.selectedKeys
           val iterator = keys.iterator()
           while (iterator.hasNext) {
@@ -140,7 +140,7 @@ private[io] object SelectionHandler {
           }
           keys.clear() // we need to remove the selected keys from the set, otherwise they remain selected
         }
-        wakeUp.set(false)
+        //wakeUp.set(false)
       }
 
       override def run(): Unit =
 
     private def execute(task: Task): Unit = {
       executionContext.execute(task)
-      if (wakeUp.compareAndSet(false, true)) // if possible avoid syscall and trade off with LOCK CMPXCHG
-        selector.wakeup()
+      //if (wakeUp.compareAndSet(false, true)) // if possible avoid syscall and trade off with LOCK CMPXCHG
+      //selector.wakeup()
     }
 
     // FIXME: Add possibility to signal failure of task to someone
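[Editor's note on PATCH 5: previously every task handed to the selector thread paid a CAS and,
when it won the race, a selector.wakeup() syscall to interrupt the indefinitely blocking
select(); now select(1) simply times out after at most one millisecond, so queued tasks are
picked up on the next loop iteration with no wakeup at all. The cost is up to ~1 ms extra task
latency and a steady once-per-millisecond poll on otherwise idle selectors. A self-contained
sketch of the two strategies, assuming a plain Runnable queue rather than akka's
Task/ExecutionContext machinery:

  import java.nio.channels.Selector
  import java.util.concurrent.ConcurrentLinkedQueue
  import java.util.concurrent.atomic.AtomicBoolean

  final class SelectorLoopSketch {
    private val selector = Selector.open()
    private val tasks = new ConcurrentLinkedQueue[Runnable]()
    private val wakeUp = new AtomicBoolean(false)

    // Before: block indefinitely; producers must interrupt the block.
    def submitWithWakeup(task: Runnable): Unit = {
      tasks.add(task)
      if (wakeUp.compareAndSet(false, true)) // avoid redundant wakeup syscalls
        selector.wakeup()                    // kicks select() out of its block
    }
    def loopWithWakeup(): Unit = while (true) {
      selector.select() // returns on I/O readiness or wakeup()
      wakeUp.set(false)
      runAll()
      // ... process selector.selectedKeys ...
    }

    // After: poll with a 1 ms timeout; producers only enqueue.
    def submitPolling(task: Runnable): Unit = tasks.add(task)
    def loopPolling(): Unit = while (true) {
      selector.select(1) // returns on I/O readiness or after at most 1 ms
      runAll()
      // ... process selector.selectedKeys ...
    }

    private def runAll(): Unit = {
      var t = tasks.poll()
      while (t != null) { t.run(); t = tasks.poll() }
    }
  }
]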
From 2c216764c5930ebf574fd38d9bfaa584ac16459c Mon Sep 17 00:00:00 2001
From: Johannes Rudolph 
Date: Thu, 30 Mar 2017 19:08:08 +0200
Subject: [PATCH 6/6] allow fast-path for ByteBuffer rendering in TCP (to save buffer copies)

---
 .../main/scala/akka/io/TcpConnection.scala    | 26 +++++++++++--------
 .../src/main/scala/akka/util/ByteString.scala | 20 +++++++++++++-
 2 files changed, 34 insertions(+), 12 deletions(-)

diff --git a/akka-actor/src/main/scala/akka/io/TcpConnection.scala b/akka-actor/src/main/scala/akka/io/TcpConnection.scala
index 80606d4d61e..24987d0a817 100644
--- a/akka-actor/src/main/scala/akka/io/TcpConnection.scala
+++ b/akka-actor/src/main/scala/akka/io/TcpConnection.scala
@@ -373,18 +373,22 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha
     create(write)
   }
 
-  def PendingBufferWrite(commander: ActorRef, data: ByteString, ack: Event, tail: WriteCommand): PendingBufferWrite = {
-    val buffer = bufferPool.acquire()
-    try {
-      val copied = data.copyToBuffer(buffer)
-      buffer.flip()
-      new PendingBufferWrite(commander, data.drop(copied), ack, buffer, tail)
-    } catch {
-      case NonFatal(e) ⇒
-        bufferPool.release(buffer)
-        throw e
+  def PendingBufferWrite(commander: ActorRef, data: ByteString, ack: Event, tail: WriteCommand): PendingBufferWrite =
+    data match {
+      case b: ByteString.ByteString1CByteBuffer ⇒
+        new PendingBufferWrite(commander, ByteString.empty, ack, b.asByteBuffer, tail)
+      case _ ⇒
+        val buffer = bufferPool.acquire()
+        try {
+          val copied = data.copyToBuffer(buffer)
+          buffer.flip()
+          new PendingBufferWrite(commander, data.drop(copied), ack, buffer, tail)
+        } catch {
+          case NonFatal(e) ⇒
+            bufferPool.release(buffer)
+            throw e
+        }
     }
-  }
 
   class PendingBufferWrite(
     val commander: ActorRef,

diff --git a/akka-actor/src/main/scala/akka/util/ByteString.scala b/akka-actor/src/main/scala/akka/util/ByteString.scala
index 325f52dc65e..760ad37d22b 100644
--- a/akka-actor/src/main/scala/akka/util/ByteString.scala
+++ b/akka-actor/src/main/scala/akka/util/ByteString.scala
@@ -12,7 +12,7 @@ import scala.annotation.{ tailrec, varargs }
 import scala.collection.IndexedSeqOptimized
 import scala.collection.mutable.{ Builder, WrappedArray }
 import scala.collection.immutable
-import scala.collection.immutable.{ IndexedSeq, VectorBuilder, VectorIterator }
+import scala.collection.immutable.{ IndexedSeq, Iterable, VectorBuilder, VectorIterator }
 import scala.collection.generic.CanBuildFrom
 import scala.reflect.ClassTag
 import java.nio.charset.{ Charset, StandardCharsets }
@@ -116,6 +116,24 @@ object ByteString {
     }
   }
 
+  final class ByteString1CByteBuffer(byteBuffer: ByteBuffer) extends CompactByteString {
+    def length: Int = byteBuffer.limit()
+    def apply(idx: Int): Byte = byteBuffer.get(idx)
+
+    private[akka] def byteStringCompanion: Companion = convertToRegular.byteStringCompanion
+    private[akka] def writeToOutputStream(os: ObjectOutputStream): Unit = convertToRegular.writeToOutputStream(os)
+
+    def ++(that: ByteString): ByteString = convertToRegular ++ that
+
+    def asByteBuffer: ByteBuffer = byteBuffer
+    def asByteBuffers: Iterable[ByteBuffer] = byteBuffer :: Nil
+
+    def convertToRegular: ByteString = ByteString.fromByteBuffer(byteBuffer)
+
+    def decodeString(charset: String): String = convertToRegular.decodeString(charset)
+    def decodeString(charset: Charset): String = convertToRegular.decodeString(charset)
+  }
+
   /**
    * A compact (unsliced) and unfragmented ByteString, implementation of ByteString1C.
   */
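[Editor's note on PATCH 6: the fast path only triggers when the ByteString handed to
TcpConnection is the new ByteBuffer-backed wrapper, so a renderer has to produce one
deliberately. A hypothetical usage sketch; the rendered content is illustrative:

  import java.nio.ByteBuffer
  import akka.util.ByteString

  object FastPathSketch {
    // Render straight into a ByteBuffer (e.g. an HTTP response header block).
    val rendered: ByteBuffer =
      ByteBuffer.wrap("HTTP/1.1 200 OK\r\n\r\n".getBytes("US-ASCII"))

    // Wrapping instead of copying: TcpConnection.PendingBufferWrite now matches on
    // ByteString1CByteBuffer and hands the backing buffer directly to the socket
    // write, skipping the bufferPool.acquire()/copyToBuffer round trip.
    val data: ByteString = new ByteString.ByteString1CByteBuffer(rendered)

    // Caveat: the buffer is shared, not copied, so the producer must not mutate
    // it (or its position/limit) after handing the ByteString off.
  }
]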