@@ -48,9 +48,11 @@ object Finagle {
4848 .make(allocate(svc)) { _ =>
4949 Async [F ].delay(())
5050 }
51- def mkService [F [_]: Async ](route : HttpApp [F ])(implicit ec : ExecutionContext ): Resource [F , Service [Req , Resp ]] =
52- Dispatcher .parallel[F ] map { dispatcher =>
53- (req : Req ) => toFuture(dispatcher, route.local(toHttp4sReq[F ]).flatMapF(fromHttp4sResponse[F ]).run(req))
51+ def mkService [F [_]: Async ](
52+ route : HttpApp [F ]
53+ )(implicit ec : ExecutionContext ): Resource [F , Service [Req , Resp ]] =
54+ Dispatcher .parallel[F ].map { dispatcher => (req : Req ) =>
55+ toFuture(dispatcher, route.local(toHttp4sReq[F ]).flatMapF(fromHttp4sResponse[F ]).run(req))
5456 }
5557
5658 private def allocate [F [_]: Async ](svc : Service [Req , Resp ]): F [Client [F ]] =
@@ -66,7 +68,9 @@ object Finagle {
6668 private def toHttp4sReq [F [_]](req : Req ): Request [F ] = {
6769 val method = H4Method .fromString(req.method.name).getOrElse(H4Method .GET )
6870 val uri = Uri .unsafeFromString(req.uri)
69- val headers = Headers (req.headerMap.toList.map { case (name, value) => Header .Raw (CIString (name), value) })
71+ val headers = Headers (req.headerMap.toList.map { case (name, value) =>
72+ Header .Raw (CIString (name), value)
73+ })
7074 val body = toStream[F ](req.content)
7175 val version = HttpVersion
7276 .fromVersion(req.version.major, req.version.minor)
@@ -79,14 +83,14 @@ object Finagle {
7983 val status = Status (resp.status.code)
8084 val headers = resp.headers.headers.map(h => (h.name.show, h.value))
8185 val finagleResp = Resp (status)
82- headers.foreach{ case (k, v) => finagleResp.headerMap.add(k,v) }
86+ headers.foreach { case (k, v) => finagleResp.headerMap.add(k, v) }
8387 val writeBody = if (resp.isChunked) {
8488 finagleResp.setChunked(true )
8589 Concurrent [F ].start(streamBody(resp.body, finagleResp.writer).compile.drain).void
8690 } else {
8791 resp
8892 .as[Array [Byte ]]
89- .map { Buf .ByteArray .Owned (_) }
93+ .map( Buf .ByteArray .Owned (_))
9094 .map(finagleResp.content = _)
9195 .void
9296 }
@@ -98,13 +102,13 @@ object Finagle {
98102 val version = Version (req.httpVersion.major, req.httpVersion.minor)
99103 val request = Req (version, method, req.uri.toString)
100104 req.uri.host.foreach(uri => request.headerMap.add(Fields .Host , uri.value))
101- req.uri.userInfo.foreach{ user =>
105+ req.uri.userInfo.foreach { user =>
102106 val repr = user.username ++ user.password.fold(" " )(" :" ++ _)
103107 val auth = " Basic " + Base64StringEncoder .encode(repr.getBytes(StandardCharsets .UTF_8 ))
104108 request.headerMap.add(Fields .Authorization , auth)
105109 }
106- req.headers.headers.foreach { h=>
107- request.headerMap.add(h.name.show, h.value)
110+ req.headers.headers.foreach { h =>
111+ request.headerMap.add(h.name.show, h.value)
108112 }
109113
110114 if (req.isChunked) {
@@ -113,7 +117,7 @@ object Finagle {
113117 Spawn [F ].start(streamBody(req.body, request.writer).compile.drain) *> Async [F ].delay(request)
114118 } else {
115119 req.as[Array [Byte ]].map { b =>
116- if (b.nonEmpty) {
120+ if (b.nonEmpty) {
117121 val content = Buf .ByteArray .Owned (b)
118122 request.content = content
119123 request.contentLength = content.length.longValue()
@@ -124,43 +128,48 @@ object Finagle {
124128
125129 private def streamBody [F [_]: Async ](
126130 body : Stream [F , Byte ],
127- writer : Writer [Buf ]): Stream [F , Unit ] = {
131+ writer : Writer [Buf ],
132+ ): Stream [F , Unit ] = {
128133 import com .twitter .finagle .http .Chunk
129134 body.chunks.map(a => Chunk .fromByteArray(a.toArray).content).evalMap { a =>
130135 toF(writer.write(a))
131- } ++ Stream .eval { toF(writer.close()) }
136+ } ++ Stream .eval( toF(writer.close()))
132137 }
133138
134139 private def toStream [F [_]](buf : Buf ): Stream [F , Byte ] =
135140 Stream .chunk[F , Byte ](Chunk .array(Buf .ByteArray .Owned .extract(buf)))
136141
137- private def toHttp4sResp [F [_]](resp : Resp ): Response [F ] = {
142+ private def toHttp4sResp [F [_]](resp : Resp ): Response [F ] =
138143 Status .fromInt(resp.status.code) match {
139144 case Right (status) =>
140145 Response [F ](
141146 status
142- ).withHeaders(Headers (resp.headerMap.toList.map { case (name, value) => Header .Raw (CIString (name), value) }))
147+ ).withHeaders(Headers (resp.headerMap.toList.map { case (name, value) =>
148+ Header .Raw (CIString (name), value)
149+ }))
143150 .withEntity(toStream[F ](resp.content))
144- case Left (parseFailure) => parseFailure.toHttpResponse(HttpVersion (resp.version.major, resp.version.minor))
151+ case Left (parseFailure) =>
152+ parseFailure.toHttpResponse(HttpVersion (resp.version.major, resp.version.minor))
145153 }
146- }
147154
148155 private def toF [F [_]: Async , A ](f : Future [A ]): F [A ] = Async [F ].async_ { cb =>
149- f.respond {
150- case Return (value) =>
151- cb(Right (value))
152- case Throw (exception) =>
153- cb(Left (exception))
154- }
155- ()
156+ f.respond {
157+ case Return (value) =>
158+ cb(Right (value))
159+ case Throw (exception) =>
160+ cb(Left (exception))
156161 }
162+ ()
163+ }
157164
158- private def toFuture [F [_], A ](dispatcher : Dispatcher [F ], f : F [A ])(implicit ec : ExecutionContext ): Future [A ] = {
159- val promise : Promise [A ] = Promise ()
160- dispatcher.unsafeToFuture(f).onComplete {
161- case Success (value) => promise.setValue(value)
162- case Failure (exception) => promise.setException(exception)
163- }
165+ private def toFuture [F [_], A ](dispatcher : Dispatcher [F ], f : F [A ])(implicit
166+ ec : ExecutionContext
167+ ): Future [A ] = {
168+ val promise : Promise [A ] = Promise ()
169+ dispatcher.unsafeToFuture(f).onComplete {
170+ case Success (value) => promise.setValue(value)
171+ case Failure (exception) => promise.setException(exception)
172+ }
164173 promise
165174 }
166175}
0 commit comments