Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Potential performance improvements helping Akka Http #1

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions akka-actor/src/main/scala/akka/io/SelectionHandler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -106,13 +106,13 @@ private[io] object SelectionHandler {

private class ChannelRegistryImpl(executionContext: ExecutionContext, log: LoggingAdapter) extends ChannelRegistry {
private[this] val selector = SelectorProvider.provider.openSelector
private[this] val wakeUp = new AtomicBoolean(false)
//private[this] val wakeUp = new AtomicBoolean(false)

final val OP_READ_AND_WRITE = OP_READ | OP_WRITE // compile-time constant

private[this] val select = new Task {
def tryRun(): Unit = {
if (selector.select() > 0) { // This assumes select return value == selectedKeys.size
if (selector.select(1) > 0) { // This assumes select return value == selectedKeys.size
val keys = selector.selectedKeys
val iterator = keys.iterator()
while (iterator.hasNext) {
Expand Down Expand Up @@ -140,7 +140,7 @@ private[io] object SelectionHandler {
}
keys.clear() // we need to remove the selected keys from the set, otherwise they remain selected
}
wakeUp.set(false)
//wakeUp.set(false)
}

override def run(): Unit =
Expand Down Expand Up @@ -208,8 +208,8 @@ private[io] object SelectionHandler {

private def execute(task: Task): Unit = {
executionContext.execute(task)
if (wakeUp.compareAndSet(false, true)) // if possible avoid syscall and trade off with LOCK CMPXCHG
selector.wakeup()
//if (wakeUp.compareAndSet(false, true)) // if possible avoid syscall and trade off with LOCK CMPXCHG
//selector.wakeup()
}

// FIXME: Add possibility to signal failure of task to someone
Expand Down
26 changes: 15 additions & 11 deletions akka-actor/src/main/scala/akka/io/TcpConnection.scala
Original file line number Diff line number Diff line change
Expand Up @@ -373,18 +373,22 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha
create(write)
}

def PendingBufferWrite(commander: ActorRef, data: ByteString, ack: Event, tail: WriteCommand): PendingBufferWrite = {
val buffer = bufferPool.acquire()
try {
val copied = data.copyToBuffer(buffer)
buffer.flip()
new PendingBufferWrite(commander, data.drop(copied), ack, buffer, tail)
} catch {
case NonFatal(e) ⇒
bufferPool.release(buffer)
throw e
def PendingBufferWrite(commander: ActorRef, data: ByteString, ack: Event, tail: WriteCommand): PendingBufferWrite =
data match {
case b: ByteString.ByteString1CByteBuffer ⇒
new PendingBufferWrite(commander, ByteString.empty, ack, data.asByteBuffer, tail)
case _ ⇒
val buffer = bufferPool.acquire()
try {
val copied = data.copyToBuffer(buffer)
buffer.flip()
new PendingBufferWrite(commander, data.drop(copied), ack, buffer, tail)
} catch {
case NonFatal(e) ⇒
bufferPool.release(buffer)
throw e
}
}
}

class PendingBufferWrite(
val commander: ActorRef,
Expand Down
20 changes: 19 additions & 1 deletion akka-actor/src/main/scala/akka/util/ByteString.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import scala.annotation.{ tailrec, varargs }
import scala.collection.IndexedSeqOptimized
import scala.collection.mutable.{ Builder, WrappedArray }
import scala.collection.immutable
import scala.collection.immutable.{ IndexedSeq, VectorBuilder, VectorIterator }
import scala.collection.immutable.{ IndexedSeq, Iterable, VectorBuilder, VectorIterator }
import scala.collection.generic.CanBuildFrom
import scala.reflect.ClassTag
import java.nio.charset.{ Charset, StandardCharsets }
Expand Down Expand Up @@ -116,6 +116,24 @@ object ByteString {
}
}

final class ByteString1CByteBuffer(byteBuffer: ByteBuffer) extends CompactByteString {
def length: Int = byteBuffer.limit()
def apply(idx: Int): Byte = byteBuffer.get(idx)

private[akka] def byteStringCompanion: Companion = convertToRegular.byteStringCompanion
private[akka] def writeToOutputStream(os: ObjectOutputStream): Unit = convertToRegular.writeToOutputStream(os)

def ++(that: ByteString): ByteString = convertToRegular ++ that

def asByteBuffer: ByteBuffer = byteBuffer
def asByteBuffers: Iterable[ByteBuffer] = byteBuffer :: Nil

def convertToRegular: ByteString = ByteString.fromByteBuffer(byteBuffer)

def decodeString(charset: String): String = convertToRegular.decodeString(charset)
def decodeString(charset: Charset): String = convertToRegular.decodeString(charset)
}

/**
* A compact (unsliced) and unfragmented ByteString, implementation of ByteString1C.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -480,14 +480,25 @@ import scala.util.control.NonFatal

private var enqueueToShortCircuit: (Any) ⇒ Unit = _

lazy val interpreter: GraphInterpreter = new GraphInterpreter(mat, log, logics, connections,
(logic, event, handler) ⇒ {
val asyncInput = AsyncInput(this, logic, event, handler)
val currentInterpreter = GraphInterpreter.currentInterpreterOrNull
if (currentInterpreter == null || (currentInterpreter.context ne self))
self ! asyncInput
else enqueueToShortCircuit(asyncInput)
}, settings.fuzzingMode, self)
lazy val interpreter: GraphInterpreter =
if (connections.size == 31)
new GraphInterpreter31Impl(mat, log, logics, connections,
(logic, event, handler) ⇒ {
val asyncInput = AsyncInput(this, logic, event, handler)
val currentInterpreter = GraphInterpreter.currentInterpreterOrNull
if (currentInterpreter == null || (currentInterpreter.context ne self))
self ! asyncInput
else enqueueToShortCircuit(asyncInput)
}, settings.fuzzingMode, self)
else
new GraphInterpreterImpl(mat, log, logics, connections,
(logic, event, handler) ⇒ {
val asyncInput = AsyncInput(this, logic, event, handler)
val currentInterpreter = GraphInterpreter.currentInterpreterOrNull
if (currentInterpreter == null || (currentInterpreter.context ne self))
self ! asyncInput
else enqueueToShortCircuit(asyncInput)
}, settings.fuzzingMode, self)

// TODO: really needed?
private var subscribesPending = 0
Expand Down
Loading