Implement HappyEyeballs for NettyConnectionPool (#1996) #3876
Conversation
```scala
connectToAddress(
  addresses.head,
  channelFactory,
  eventLoopGroup,
  location,
  initializer,
  connectionTimeout,
  localAddress,
).onExit {
  case e: Exit.Success[JChannel] => successful.update(channels => channels :+ e.value)
  case _: Exit.Failure[_]        => lastFailed.offer(()).unit
},
```
Maybe put all of this in a function and reuse it to make the code a bit more readable:

```scala
connect0 = connectToAddress(
  _,
  channelFactory,
  eventLoopGroup,
  location,
  initializer,
  connectionTimeout,
  localAddress,
).onExit {
  case e: Exit.Success[JChannel] => successful.update(channels => channels :+ e.value)
  case _: Exit.Failure[_]        => lastFailed.offer(()).unit
}
```

```scala
case ch: Some[JChannel] => ZIO.succeed(ch.value)
case None               => ZIO.fail(new RuntimeException("All connection attempts failed"))
}
_ <- ZIO.foreachDiscard(channels.tail)(ch => ZIO.ignore(ch.close()))
```
There's no guarantee that the channel we selected via the `collectFirst` will be at the head of the list (since the first one might fail the `c.isOpen` condition).
```scala
  case _: Exit.Failure[_] => lastFailed.offer(()).unit
},
addresses.tail.zipWithIndex.map { case (address, index) =>
  ZIO.sleep(HappyEyeballsDelay * index.toDouble).raceFirst(lastFailed.take).ignore *>
```
I'm not sure if the usage of a `Queue` here is intended or if it's a bug.
With a `Queue`, only one of the fibers will be "interrupted" via `raceFirst(lastFailed.take)`, because only a single fiber can poll an item from the queue. Is this a bug? If this is the intended behaviour, why do we want to "interrupt" only a single connection attempt?
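For context, the difference in consumption semantics can be shown in a minimal sketch (not from the PR): a ZIO `Queue` hands each offered item to exactly one `take`, so a single failure signal can only ever unblock one waiting fiber, whereas a `Hub` would broadcast it to every subscriber.

```scala
import zio._

// Sketch: two fibers block on `take` from the same Queue. A single `offer`
// wakes up exactly ONE of them; the other stays blocked. A Hub subscription
// would instead deliver the item to both.
val queueDemo: ZIO[Any, Nothing, Unit] =
  for {
    q  <- Queue.unbounded[Unit]
    f1 <- q.take.fork // fiber 1 waits on the queue
    f2 <- q.take.fork // fiber 2 waits on the same queue
    _  <- q.offer(()) // unblocks only one of f1/f2
  } yield ()
```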
```scala
).onExit {
  case e: Exit.Success[JChannel] => successful.update(channels => channels :+ e.value)
  case _: Exit.Failure[_]        => lastFailed.offer(()).unit
},
```
Don't we need `raceFirst(lastFailed.take).ignore` here as well?
```scala
  connectionTimeout,
  localAddress,
).onExit {
  case e: Exit.Success[JChannel] => successful.update(channels => channels :+ e.value)
```
Previously I described how there's an issue here, but perhaps my comment wasn't clear enough. What I meant was that since the release of the connections is managed by the Scope, you might end up in a case where the connection is acquired but not released until the Scope is closed, even if that connection lost the race.
Hope this reproducer helps to understand the issue:
```scala
import zio.*

final class Foo(val i: Int) {
  def acquired() = println(s"acquired: $i")
  def released() = println(s"released: $i")
}

object Main extends ZIOAppDefault {
  val sleepRandom =
    ZIO.randomWith(_.nextIntBounded(100).flatMap(i => ZIO.sleep(i.millis)))

  def resource(i: Int) = {
    val res = ZIO.acquireRelease(ZIO.succeed {
      val foo = new Foo(i)
      foo.acquired()
      foo
    })(f => ZIO.succeed(f.released()))
    (res.map(_.i) <* sleepRandom <* ZIO.debug(s"initialized: $i")).onInterrupt(ZIO.debug(s"interrupted: $i"))
  }

  def run = {
    val rs = (1 to 4).map(resource).toList
    val f =
      for {
        won <- ZIO.raceAll(rs.head, rs.tail)
        _   <- ZIO.sleep(10.millis)
        _   <- ZIO.debug(s"doing some stuff with resource: $won")
        _   <- ZIO.sleep(5.seconds)
        _   <- ZIO.debug("Finished work")
      } yield ()
    ZIO.scoped(f)
  }
}
```

This prints the following, which is really problematic, because `released: 2` should be printed before the "Finished work" message:

```
acquired: 2
acquired: 1
initialized: 1
interrupted: 2
doing some stuff with resource: 1
Finished work
released: 1
released: 2
```
Pull request overview
This PR implements the Happy Eyeballs algorithm (RFC 8305) for NettyConnectionPool to improve connection reliability and performance by racing IPv6 and IPv4 connection attempts with staggered delays.
Changes:
- Replaced sequential connection attempts with the Happy Eyeballs algorithm that races multiple address families
- Added address sorting logic to prioritize IPv6 while alternating between address families
- Refactored connection logic to support both managed and unmanaged channel creation
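In simplified form, the staggered race at the heart of the algorithm can be sketched as follows (illustrative only; `attemptConnect`, the address type, and the helper name stand in for the PR's actual code, and 250 ms is the delay RFC 8305 recommends):

```scala
import zio._

// Sketch of the Happy Eyeballs race (RFC 8305): try the first address
// immediately, each subsequent address after an additional staggered delay,
// and let the first attempt to succeed win the race.
def happyEyeballs[A](
  addresses: ::[String],              // non-empty list of resolved addresses
  attemptConnect: String => Task[A],  // hypothetical connect helper
  delay: Duration = 250.millis,       // RFC 8305 "Connection Attempt Delay"
): Task[A] =
  ZIO.raceAll(
    attemptConnect(addresses.head),
    addresses.tail.zipWithIndex.map { case (address, index) =>
      // tail address n starts after (n + 1) * delay
      ZIO.sleep(delay * (index + 1).toDouble) *> attemptConnect(address)
    },
  )
```

The PR additionally short-circuits the sleeps when an earlier attempt fails, so the next attempt starts immediately rather than waiting out its full delay.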
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| NettyConnectionPool.scala | Implements Happy Eyeballs connection algorithm with address sorting, concurrent connection racing, and proper channel lifecycle management |
| NettyConnectionPoolSpec.scala | Adds comprehensive test suite for address sorting logic covering various IPv4/IPv6 combinations |
```scala
_ <- ZIO.raceAll(
  attemptConnect(addresses.head),
  addresses.tail.zipWithIndex.map { case (address, index) =>
    ZIO.sleep(HappyEyeballsDelay * (index + 1).toDouble).raceFirst(failedSub.take).ignore *>
```
The delay calculation uses `(index + 1).toDouble`, so the first tail address waits 250ms (index 0 becomes 1), the second 500ms, etc. This should likely be `index.toDouble` so the first tail address waits 0ms (starting immediately after the head), the second waits 250ms, and so on. The current implementation introduces an unnecessary extra delay.
```diff
- ZIO.sleep(HappyEyeballsDelay * (index + 1).toDouble).raceFirst(failedSub.take).ignore *>
+ ZIO.sleep(HappyEyeballsDelay * index.toDouble).raceFirst(failedSub.take).ignore *>
```
```scala
_ <- ZIO.raceAll(
  attemptConnect(addresses.head),
  addresses.tail.zipWithIndex.map { case (address, index) =>
    ZIO.sleep(HappyEyeballsDelay * (index + 1).toDouble).raceFirst(failedSub.take).ignore *>
      attemptConnect(address)
  },
)
```
The `raceAll` terminates as soon as the first connection succeeds, but ongoing connection attempts continue running in the background. These lingering fibers could establish connections that are then immediately closed at lines 284-286, creating unnecessary network overhead. Consider interrupting pending connection attempts when the first one succeeds by using fiber-based racing with explicit interruption.
```diff
- _ <- ZIO.raceAll(
-   attemptConnect(addresses.head),
-   addresses.tail.zipWithIndex.map { case (address, index) =>
-     ZIO.sleep(HappyEyeballsDelay * (index + 1).toDouble).raceFirst(failedSub.take).ignore *>
-       attemptConnect(address)
-   },
- )
+ // Build all connection attempt effects (first immediately, others with Happy Eyeballs delays)
+ attempts = attemptConnect(addresses.head) ::
+   addresses.tail.zipWithIndex.map { case (address, index) =>
+     ZIO
+       .sleep(HappyEyeballsDelay * (index + 1).toDouble)
+       .raceFirst(failedSub.take)
+       .ignore *>
+       attemptConnect(address)
+   }
+ // Fork each attempt into its own fiber so we can explicitly interrupt the losers
+ fibers <- ZIO.foreach(attempts)(_.fork)
+ // Race on joining the fibers; once one completes, interrupt all others
+ _ <- ZIO
+   .raceAll(
+     fibers.head.join,
+     fibers.tail.map(_.join),
+   )
+   .onExit(_ => ZIO.foreachDiscard(fibers)(_.interrupt.ignore))
```
```scala
channel <- channels.find(_.isOpen) match {
  case Some(ch) => ZIO.succeed(ch)
  case None     => ZIO.fail(new RuntimeException("All connection attempts failed"))
```
The error message 'All connection attempts failed' doesn't provide useful debugging information. Consider including details such as the number of addresses attempted and the target host to help diagnose connection failures.
```diff
- case None => ZIO.fail(new RuntimeException("All connection attempts failed"))
+ case None =>
+   val attemptedCount = addresses.size
+   val targetDesc =
+     addresses.headOption
+       .map(addr => s"${addr.getHostName}/${addr.getHostAddress}")
+       .getOrElse("unknown target")
+   ZIO.fail(
+     new RuntimeException(
+       s"All connection attempts failed for $targetDesc (attempted $attemptedCount address(es))",
+     ),
+   )
```
```scala
)(implicit trace: Trace): ZIO[Scope, Throwable, JChannel] = {

if (resolvedHosts.isEmpty) {
  ZIO.fail(new RuntimeException("No addresses to connect to"))
```
The error message 'No addresses to connect to' lacks context. Consider including the hostname or location that failed to resolve to help with debugging DNS resolution issues.
```diff
- ZIO.fail(new RuntimeException("No addresses to connect to"))
+ ZIO.fail(new RuntimeException(s"No addresses to connect to for location: $location"))
```
```scala
}

_ <- ZIO.foreachDiscard(channels.filterNot(_ eq channel)) { ch =>
  ZIO.succeed(ch.close()).ignore
```
```diff
- ZIO.succeed(ch.close()).ignore
+ ZIO.ignore(ch.close())
```
Done — applied ZIO.ignore(ch.close()) as suggested.
```scala
)

for {
  failedHub <- Hub.dropping[Unit](requestedCapacity = 1)
```
I might have missed the previous response for this, but isn't a Promise better suited for this? Since we only care about fulfilling it once I think it's a better implementation for this, wdut?
Good call — switched to `Promise.make[Nothing, Unit]`. The one-shot semantics are actually what we want here: per RFC 8305 Section 5, a failure should trigger the next staggered attempt to start immediately, but should NOT cancel or affect other running attempts. `Promise.succeed(()).unit` is idempotent, so subsequent failures are no-ops. This also let me remove the `ZIO.scoped` wrapper that was only needed for `Hub.subscribe`.
```scala
NettyFutureExecutor
  .executed(channelFuture)
  .onInterrupt(ZIO.succeed(channelFuture.cancel(true)) *> ZIO.succeed(ch.close()).ignore)
```
```diff
- .onInterrupt(ZIO.succeed(channelFuture.cancel(true)) *> ZIO.succeed(ch.close()).ignore)
+ .onInterrupt(ZIO.ignore { channelFuture.cancel(true); ch.close() })
```
Done — applied ZIO.ignore { channelFuture.cancel(true); ch.close() } as suggested.
```scala
_ <- ZIO.raceAll(
  attemptConnect(addresses.head),
  addresses.tail.zipWithIndex.map { case (address, index) =>
    ZIO.sleep(HappyEyeballsDelay * (index + 1).toDouble).raceFirst(failedSub.take).ignore *>
```
Is this the right behaviour according to the spec? In the case that a single connection attempt fails, should all the other ones be cancelled?
Good question — per RFC 8305 Section 5: "Starting a new connection attempt does not affect previous attempts" and "Once one of the connection attempts succeeds, all other connections attempts that have not yet succeeded SHOULD be canceled." So failures should NOT cancel other attempts. The previous `Hub`-based approach was incorrect because a `Hub` broadcasts to ALL subscribers simultaneously, meaning a single failure would unblock ALL waiting delays at once. Switched to `Promise`, which has one-shot semantics — only the first failure unblocks the next waiting delay; subsequent failures are no-ops. The `raceAll` handles cancelling the losers on success.
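The one-shot behaviour of `Promise` mentioned above can be seen in a minimal sketch (illustrative, not from the PR):

```scala
import zio._

// Sketch: completing a ZIO Promise is idempotent. The first `succeed` wins
// and returns true; any later completion attempt is a no-op returning false.
val promiseDemo: ZIO[Any, Nothing, (Boolean, Boolean)] =
  for {
    p      <- Promise.make[Nothing, Unit]
    first  <- p.succeed(()) // true: this completion takes effect
    second <- p.succeed(()) // false: already completed, no-op
  } yield (first, second)
```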
```scala
  initializer: ChannelInitializer[JChannel],
  connectionTimeout: Option[Duration],
  localAddress: Option[InetSocketAddress],
)(implicit trace: Trace): ZIO[Scope, Throwable, JChannel] = {
```
I feel like we need to control interruption more closely here (using `uninterruptibleMask`) and allow it only during `NettyFutureExecutor.executed(channelFuture)`. Otherwise we're risking interruption creeping in between effect evaluations and leaking connections.
Good point — added `ZIO.uninterruptibleMask { restore => ... }` wrapping the entire `connectToAddressUnmanaged` body, with `restore()` applied only to `NettyFutureExecutor.executed(channelFuture)`. This follows the same pattern used in `ZioNettyConnectionPool.get` and ensures the channel creation + `onInterrupt` handler registration happen in an uninterruptible region, preventing leaks.
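The masking pattern being discussed can be sketched roughly like this (a sketch under assumptions: `openChannel`, `awaitConnected`, and `close` are hypothetical stand-ins for the PR's Netty helpers):

```scala
import zio._

// Sketch of the uninterruptibleMask pattern: channel creation and handler
// registration run uninterruptibly; only the (potentially long) wait for the
// connect future is restored to interruptibility. An interrupt therefore
// cannot land between "channel created" and "cleanup registered" and leak
// the connection.
def connectUnmanaged[Chan](
  openChannel: Task[Chan],             // hypothetical: creates the channel
  awaitConnected: Chan => Task[Unit],  // hypothetical: wraps the Netty future
  close: Chan => UIO[Unit],            // hypothetical: closes the channel
): Task[Chan] =
  ZIO.uninterruptibleMask { restore =>
    for {
      ch <- openChannel                 // uninterruptible: cannot leak here
      _  <- restore(awaitConnected(ch)) // interruption allowed only while waiting
              .onInterrupt(close(ch))   // interrupted attempt closes its channel
    } yield ch
  }
```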
Co-authored-by: kyri-petrou <67301607+kyri-petrou@users.noreply.github.com>
- Extract connect0 helper for code readability
- Fix channels.tail bug: filter by identity, not position
- Use Hub instead of Queue for broadcast failure notification
- Fix resource leak: manually manage connections in race, close losers immediately
- Add connectToAddressUnmanaged for race scenarios
- Fix delay calculation for staggered attempts
- Add comprehensive tests for sortAddresses
…revent connection leaks
fixes #1996
/claim #1996