Skip to content

Commit 8bd0bec

Browse files
authored
Fail with unwrapped exception when using CompletionStage #2054
2 parents 3fb9856 + 344b5d8 commit 8bd0bec

File tree

2 files changed

+78
-8
lines changed

2 files changed

+78
-8
lines changed

akka-http-tests/src/test/java/akka/http/javadsl/server/CompleteTest.java

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import java.util.concurrent.CompletableFuture;
1010
import java.util.concurrent.CompletionStage;
1111

12+
import akka.http.javadsl.model.StatusCodes;
1213
import org.junit.Test;
1314

1415
import akka.http.javadsl.marshallers.jackson.Jackson;
@@ -70,4 +71,64 @@ public void completeWithFuture() {
7071
.assertStatusCode(200)
7172
.assertEntity("42 + 23 = 65");
7273
}
74+
75+
76+
private ExceptionHandler customExceptionHandler() {
77+
return ExceptionHandler.newBuilder()
78+
.match(IllegalStateException.class, ex ->
79+
complete(StatusCodes.SERVICE_UNAVAILABLE, "Custom Error"))
80+
.build();
81+
}
82+
83+
private void checkRoute(Route route) {
84+
Route sealedRoute = route.seal(RejectionHandler.defaultHandler(), customExceptionHandler());
85+
runRoute(sealedRoute, HttpRequest.GET("/crash"))
86+
.assertStatusCode(StatusCodes.SERVICE_UNAVAILABLE)
87+
.assertEntity("Custom Error");
88+
}
89+
90+
@Test
91+
public void completeOKWithFutureStringFailing() {
92+
Route route = path("crash", () ->
93+
completeOKWithFutureString(CompletableFuture.supplyAsync(() -> {
94+
throw new IllegalStateException("Boom!");
95+
})));
96+
checkRoute(route);
97+
}
98+
99+
@Test
100+
public void completeWithFutureStatusFailing() {
101+
Route route = path("crash", () ->
102+
completeWithFutureStatus(CompletableFuture.supplyAsync(() -> {
103+
throw new IllegalStateException("Boom!");
104+
})));
105+
checkRoute(route);
106+
}
107+
108+
@Test
109+
public void completeWithFutureFailing() {
110+
Route route = path("crash", () ->
111+
completeWithFuture(CompletableFuture.supplyAsync(() -> {
112+
throw new IllegalStateException("Boom!");
113+
})));
114+
checkRoute(route);
115+
}
116+
117+
@Test
118+
public void completeOKWithFutureFailing() {
119+
Route route = path("crash", () ->
120+
completeOKWithFuture(CompletableFuture.supplyAsync(() -> {
121+
throw new IllegalStateException("Boom!");
122+
})));
123+
checkRoute(route);
124+
}
125+
126+
@Test
127+
public void completeOKWithFutureTFailing() {
128+
Route route = path("crash", () ->
129+
completeOKWithFuture(CompletableFuture.<Integer>supplyAsync(() -> {
130+
throw new IllegalStateException("Boom!");
131+
}), Jackson.marshaller()));
132+
checkRoute(route);
133+
}
73134
}

akka-http/src/main/scala/akka/http/javadsl/server/directives/RouteDirectives.scala

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
package akka.http.javadsl.server.directives
66

7-
import java.util.concurrent.CompletionStage
7+
import java.util.concurrent.{ CompletionException, CompletionStage }
88

99
import akka.dispatch.ExecutionContexts
1010
import akka.http.javadsl.marshalling.Marshaller
@@ -20,7 +20,7 @@ import akka.http.javadsl.model.RequestEntity
2020
import akka.http.javadsl.model.ResponseEntity
2121
import akka.http.javadsl.model.StatusCode
2222
import akka.http.javadsl.model.Uri
23-
import akka.http.javadsl.server.{ RoutingJavaMapping, Rejection, Route }
23+
import akka.http.javadsl.server.{ Rejection, Route, RoutingJavaMapping }
2424
import akka.http.scaladsl
2525
import akka.http.scaladsl.marshalling.Marshaller._
2626
import akka.http.scaladsl.marshalling.ToResponseMarshallable
@@ -239,47 +239,56 @@ abstract class RouteDirectives extends RespondWithDirectives {
239239
*/
240240
@CorrespondsTo("complete")
241241
def completeWithFuture(value: CompletionStage[HttpResponse]) = RouteAdapter {
242-
D.complete(value.asScala.fast.map(_.asScala))
242+
D.complete(value.asScala.fast.map(_.asScala).recover(unwrapCompletionException))
243243
}
244244

245245
/**
246246
* Completes the request by marshalling the given future value into an http response.
247247
*/
248248
@CorrespondsTo("complete")
249249
def completeOKWithFuture(value: CompletionStage[RequestEntity]) = RouteAdapter {
250-
D.complete(value.asScala.fast.map(_.asScala))
250+
D.complete(value.asScala.fast.map(_.asScala).recover(unwrapCompletionException))
251251
}
252252

253253
/**
254254
* Completes the request by marshalling the given future value into an http response.
255255
*/
256256
@CorrespondsTo("complete")
257257
def completeOKWithFutureString(value: CompletionStage[String]) = RouteAdapter {
258-
D.complete(value.asScala)
258+
D.complete(value.asScala.recover(unwrapCompletionException))
259259
}
260260

261261
/**
262262
* Completes the request using the given future status code.
263263
*/
264264
@CorrespondsTo("complete")
265265
def completeWithFutureStatus(status: CompletionStage[StatusCode]): Route = RouteAdapter {
266-
D.complete(status.asScala.fast.map(_.asScala))
266+
D.complete(status.asScala.fast.map(_.asScala).recover(unwrapCompletionException))
267267
}
268268

269269
/**
270270
* Completes the request with an `OK` status code by marshalling the given value into an http response.
271271
*/
272272
@CorrespondsTo("complete")
273273
def completeOKWithFuture[T](value: CompletionStage[T], marshaller: Marshaller[T, RequestEntity]) = RouteAdapter {
274-
D.complete(value.asScala.fast.map(v ToResponseMarshallable(v)(fromToEntityMarshaller()(marshaller))))
274+
D.complete(value.asScala.fast.map(v ToResponseMarshallable(v)(fromToEntityMarshaller()(marshaller))).recover(unwrapCompletionException))
275275
}
276276

277277
/**
278278
* Completes the request by marshalling the given value into an http response.
279279
*/
280280
@CorrespondsTo("complete")
281281
def completeWithFuture[T](value: CompletionStage[T], marshaller: Marshaller[T, HttpResponse]) = RouteAdapter {
282-
D.complete(value.asScala.fast.map(v ToResponseMarshallable(v)(marshaller)))
282+
D.complete(value.asScala.fast.map(v ToResponseMarshallable(v)(marshaller)).recover(unwrapCompletionException))
283+
}
284+
285+
// TODO: This might need to be raised as an issue to scala-java8-compat instead.
286+
// Right now, having this in Java:
287+
// CompletableFuture.supplyAsync(() -> { throw new IllegalArgumentException("always failing"); })
288+
// will in fact fail the future with CompletionException.
289+
private def unwrapCompletionException[T]: PartialFunction[Throwable, T] = {
290+
case x: CompletionException if x.getCause ne null
291+
throw x.getCause
283292
}
284293

285294
}

0 commit comments

Comments
 (0)