diff --git a/TODO.org b/TODO.org index a63adeb3..0508d516 100644 --- a/TODO.org +++ b/TODO.org @@ -1,4 +1,4 @@ -* on deck +* completed ** DONE Add application codec to base pipeline @@ -10,15 +10,19 @@ ** DONE Add route applicator to http pipeline -** TODO Wrap Http/2 Request/Response objects +** DONE Wrap Http/2 Request/Response objects *** DONE Wrap Http2Headers *** DONE Http2ServerCodec -*** TODO Http2ClientCodec +*** DONE Http2ClientCodec + +** DONE Update proxy to use application request object + +* on deck -*** TODO Refactor Http1Headers and Http2Headers to use java8 interface default methods +** TODO Refactor Http1Headers and Http2Headers to use java8 interface default methods ** Authorization uses route pipeline @@ -34,13 +38,27 @@ * backlog -** Update proxy to use application request object +** ResponseBuilder that takes a Request to build a response for. + - pull in tracing and anything else the response would otherwise have to go look for + +** RequestBuilder that takes a Request + - used proxy logic + - propagate tracing ** connection pool that is optimized to worker loop allocation for a worker loop group of size N create at least N connections to the remote endpoint ** get rid of jetty helpers, replace them with MockWebServer +** setup errorprone + https://github.com/square/okhttp/blob/master/pom.xml#L148 + +** document how to write a netty handler + +*** it's ok to throw exceptions + +*** use ChannelFuture's cause() to catch exceptions + ** document immutable object testing concepts #+begin_src java public class ClassUnderTest { @@ -51,6 +69,130 @@ public class ClassUnderTest { } #+end_src +* in code + +#+begin_src shell :results output value drawer +git grep -n TODO src | sed -e 's/^/** /g' +#+end_src + +#+RESULTS: +:RESULTS: +** src/example/java/com/xjeffrose/xio/client/chicago/ChicagoNode.java:95: // TODO schedule a timeout to fail this write +** src/example/java/com/xjeffrose/xio/client/chicago/ChicagoNode.java:104: // TODO could maybe put a listener here to track successful writes +** src/example/java/com/xjeffrose/xio/client/chicago/XioChicagoClient.java:78: // TODO retry? +** src/main/java/com/xjeffrose/xio/SSL/X509CertificateGenerator.java:52: // TODO(JR): This is bad practice, we should fix this more elegantly +** src/main/java/com/xjeffrose/xio/SSL/X509CertificateGenerator.java:145: // TODO(JR): We should verify key after creation +** src/main/java/com/xjeffrose/xio/SSL/XioSecurityHandlerImpl.java:145: // TODO(JR): Fix this or only enable for certain service as this is insecure +** src/main/java/com/xjeffrose/xio/SSL/XioTrustManagerFactory.java:122: // TODO(CK): return our root certs here? +** src/main/java/com/xjeffrose/xio/application/Application.java:16: // TODO(CK): move this into ApplicationState +** src/main/java/com/xjeffrose/xio/application/Application.java:21: // TODO(CK): move this into ApplicationState +** src/main/java/com/xjeffrose/xio/application/ApplicationState.java:18: // TODO(CK): store ClientChannelConfiguration here as well +** src/main/java/com/xjeffrose/xio/bootstrap/XioServerBootstrap.java:43: // TODO(CK): refactor tests and remove this +** src/main/java/com/xjeffrose/xio/bootstrap/XioServerBootstrap.java:58: // TODO(CK): interrogate fragment for channel options +** src/main/java/com/xjeffrose/xio/client/ChannelConfiguration.java:16:// TODO(CK): this needs to move into the bootstrap package +** src/main/java/com/xjeffrose/xio/client/RequestMuxer.java:38:// TODO(CK): consider renaming this to something not including Request +** src/main/java/com/xjeffrose/xio/client/RequestMuxer.java:40: // TODO(CK): remove +** src/main/java/com/xjeffrose/xio/client/RequestMuxer.java:43: // TODO(CK): this isn't being used +** src/main/java/com/xjeffrose/xio/client/RequestMuxer.java:45: // TODO(CK): this should be a method +** src/main/java/com/xjeffrose/xio/client/RequestMuxer.java:95: // TODO(CK): fix this +** src/main/java/com/xjeffrose/xio/client/RequestMuxer.java:104: // TODO(CK): fix this +** src/main/java/com/xjeffrose/xio/client/RequestMuxer.java:147: // TODO(CK): fix this +** src/main/java/com/xjeffrose/xio/client/RequestMuxer.java:188: // TODO(CK): split out some of this complexity? +** src/main/java/com/xjeffrose/xio/client/XioClientBootstrap.java:80: // TODO(CK): This logic should be move outside of XioClientBootstrap to something HTTP related +** src/main/java/com/xjeffrose/xio/client/loadbalancer/Distributor.java:149: // TODO(CK): Not sure what to close +** src/main/java/com/xjeffrose/xio/client/loadbalancer/Node.java:83: // TODO(CK): This be passed in, we're not really taking advantage of pooling +** src/main/java/com/xjeffrose/xio/client/loadbalancer/Node.java:229: // TODO(CK): Not sure what to close +** src/main/java/com/xjeffrose/xio/client/loadbalancer/NodeHealthCheck.java:167: // TODO: close will happen after true ecv check is done +** src/main/java/com/xjeffrose/xio/core/ChannelStatistics.java:35: // TODO: Properly implement +** src/main/java/com/xjeffrose/xio/core/ShutdownUtil.java:39: // TODO: Find netty4 equivalent (may not be nessisary with shutdown gracefully) +** src/main/java/com/xjeffrose/xio/core/ShutdownUtil.java:46: // TODO : allow an option here to control if we need to drain connections and wait instead of +** src/main/java/com/xjeffrose/xio/core/ShutdownUtil.java:60: // TODO : make wait time configurable ? +** src/main/java/com/xjeffrose/xio/core/XioMessageLogger.java:6:// TODO(CK): Consider renaming this to either MessageLogger or XioLoggingHandler +** src/main/java/com/xjeffrose/xio/core/ZkClient.java:135: // TODO: I need to deal with the error better +** src/main/java/com/xjeffrose/xio/core/ZkClient.java:172: // TODO: I need to deal with the error better +** src/main/java/com/xjeffrose/xio/filter/IpFilter.java:8:// TODO(CK): emit user events when we filter +** src/main/java/com/xjeffrose/xio/http/DefaultFullResponse.java:9:// TODO(CK): Consolidate Full/Streaming Response Builder into a single builder +** src/main/java/com/xjeffrose/xio/http/EmptyHeaders.java:5:// TODO(CK): optimize emptiness with get calls that return null; +** src/main/java/com/xjeffrose/xio/http/Http1ClientCodec.java:41: // TODO(CK): Deal with null? +** src/main/java/com/xjeffrose/xio/http/Http1ClientCodec.java:61: // TODO(CK): throw an exception? +** src/main/java/com/xjeffrose/xio/http/Http1ClientCodec.java:107: // TODO(CK): TransferEncoding +** src/main/java/com/xjeffrose/xio/http/Http1ServerCodec.java:39: // TODO(CK): Deal with null? +** src/main/java/com/xjeffrose/xio/http/Http1ServerCodec.java:58: // TODO(CK): throw an exception? +** src/main/java/com/xjeffrose/xio/http/Http1ServerCodec.java:102: // TODO(CK): TransferEncoding +** src/main/java/com/xjeffrose/xio/http/Http2ClientCodec.java:30: // TODO(CK): Deal with null? +** src/main/java/com/xjeffrose/xio/http/Http2ClientCodec.java:62: // TODO(CK): throw an exception? +** src/main/java/com/xjeffrose/xio/http/Http2ClientCodec.java:105: int streamId = 0; // TODO(CK): need a no stream constant somewhere +** src/main/java/com/xjeffrose/xio/http/Http2FrameForwarder.java:12:// TODO(CK): break this out into client/server classes +** src/main/java/com/xjeffrose/xio/http/Http2FrameForwarder.java:84: // TODO(CK): We don't currently have a use case for these frames +** src/main/java/com/xjeffrose/xio/http/Http2FrameForwarder.java:90: // TODO(CK): We don't currently have a use case for these frames +** src/main/java/com/xjeffrose/xio/http/Http2FrameForwarder.java:95: // TODO(CK): We don't currently have a use case for these frames +** src/main/java/com/xjeffrose/xio/http/Http2FrameForwarder.java:109: // TODO(CK): We don't currently have a use case for these frames +** src/main/java/com/xjeffrose/xio/http/Http2FrameForwarder.java:114: // TODO(CK): We don't currently have a use case for these frames +** src/main/java/com/xjeffrose/xio/http/Http2FrameForwarder.java:125: // TODO(CK): We don't currently have a use case for these frames +** src/main/java/com/xjeffrose/xio/http/Http2FrameForwarder.java:132: // TODO(CK): We don't currently have a use case for these frames +** src/main/java/com/xjeffrose/xio/http/Http2FrameForwarder.java:138: // TODO(CK): We don't currently have a use case for these frames +** src/main/java/com/xjeffrose/xio/http/Http2FrameForwarder.java:144: // TODO(CK): We don't currently have a use case for these frames +** src/main/java/com/xjeffrose/xio/http/Http2Handler.java:15:// TODO(CK): break this out into client/server classes +** src/main/java/com/xjeffrose/xio/http/Http2Handler.java:16:// TODO(CK): Rename this to Http2ServerHandler +** src/main/java/com/xjeffrose/xio/http/Http2Handler.java:96: // TODO(CK): This should be broken out into Http2ClientHandler +** src/main/java/com/xjeffrose/xio/http/Http2HandlerBuilder.java:14:// TODO(CK): break this out into client/server classes +** src/main/java/com/xjeffrose/xio/http/Http2ProxyHandler.java:48: // TODO(CK): This is a little goofy we only want to call close once for each implementation +** src/main/java/com/xjeffrose/xio/http/Http2ProxyRoute.java:22:// TODO(CK): This class should be given a pool of clients to use. +** src/main/java/com/xjeffrose/xio/http/Http2ProxyRoute.java:39: // TODO(CK): Remove this hack after xio client is refactored +** src/main/java/com/xjeffrose/xio/http/Http2ProxyRoute.java:127: // TODO(CK): How do we trace over http2? +** src/main/java/com/xjeffrose/xio/http/Http2RouteProvider.java:5:// TODO(CK): Refactor this after we find a way to unify HTTP/1 and HTTP/2 +** src/main/java/com/xjeffrose/xio/http/Http2RouteProvider.java:8: // TODO(CK): ChannelHandlerContext should come first +** src/main/java/com/xjeffrose/xio/http/Http2ServerCodec.java:29: // TODO(CK): Deal with null? +** src/main/java/com/xjeffrose/xio/http/Http2ServerCodec.java:60: // TODO(CK): throw an exception? +** src/main/java/com/xjeffrose/xio/http/ProxyBackendHandler.java:39: // TODO(CK): move this into a logger class +** src/main/java/com/xjeffrose/xio/http/ProxyBackendHandler.java:59: // TODO(CK): this should really be some sort of notification to the frontend +** src/main/java/com/xjeffrose/xio/http/ProxyHandler.java:39: // TODO(CK): this handler should be notifying a connection pool on release +** src/main/java/com/xjeffrose/xio/http/ProxyHandler.java:54: // TODO(CK): this should really be requesting a client from a pool +** src/main/java/com/xjeffrose/xio/http/RawBackendHandler.java:26: // TODO(CK): move this into a logger class +** src/main/java/com/xjeffrose/xio/http/RawBackendHandler.java:45: // TODO(CK): this should really be some sort of notification to the frontend +** src/main/java/com/xjeffrose/xio/http/Request.java:14: // TODO(CK): move this here from StreamingData? +** src/main/java/com/xjeffrose/xio/http/ResponseBuilders.java:9: // TODO(CK): move this into the builder? +** src/main/java/com/xjeffrose/xio/http/internal/Http1Headers.java:19:// TODO(CK): Rename this to Http1HeadersWrapper +** src/main/java/com/xjeffrose/xio/http/internal/Http1Headers.java:594: TODO(CK): maybe move this to request/response +** src/main/java/com/xjeffrose/xio/http/internal/Http1Request.java:12:// TODO(CK): Rename this to StreamingHttp1Request +** src/main/java/com/xjeffrose/xio/http/internal/Http1Response.java:11:// TODO(CK): Rename this to StreamingHttp1Response +** src/main/java/com/xjeffrose/xio/mux/ConnectionPool.java:18: // TODO(CK): move to config +** src/main/java/com/xjeffrose/xio/mux/ConnectionPool.java:42: // TODO(CK): this error needs to get bubbled back up to the requestor +** src/main/java/com/xjeffrose/xio/mux/ConnectionPool.java:50: // TODO(CK): handle failures and retry +** src/main/java/com/xjeffrose/xio/mux/ConnectionPool.java:79: // TODO(CK): this error needs to get bubbled back up to the requestor +** src/main/java/com/xjeffrose/xio/mux/ConnectionPool.java:100: // TODO(CK): change this to a not and get rid of the else +** src/main/java/com/xjeffrose/xio/mux/ConnectionPool.java:113: // TODO(CK): this error needs to get bubbled back up to the requestor +** src/main/java/com/xjeffrose/xio/mux/Connector.java:44: // TODO(CK): get this from the constructor? +** src/main/java/com/xjeffrose/xio/mux/Connector.java:71: // TODO(CK): move all of these constants out into Config +** src/main/java/com/xjeffrose/xio/pipeline/XioBasePipeline.java:56: // TODO(CK): pull globalConnectionLimiter from state +** src/main/java/com/xjeffrose/xio/pipeline/XioBasePipeline.java:58: "globalConnectionLimiter", globalConnectionLimiter); // TODO(JR): Need to make this config +** src/main/java/com/xjeffrose/xio/pipeline/XioBasePipeline.java:67: appState.getZkClient(), true)); // TODO(JR): Need to make this config +** src/main/java/com/xjeffrose/xio/pipeline/XioBasePipeline.java:92: appState.getZkClient(), true)); // TODO(JR): Need to make this config +** src/main/java/com/xjeffrose/xio/pipeline/XioBasePipeline.java:96: appState.getZkClient(), true)); // TODO(JR): Need to make this config +** src/main/java/com/xjeffrose/xio/pipeline/XioBasePipeline.java:100: "xioResponseClassifier", new XioResponseClassifier(true)); // / TODO(JR): This is a maybe +** src/main/java/com/xjeffrose/xio/pipeline/XioServerPipeline.java:9:// TODO(CK): merge this with XioBasePipeline +** src/main/java/com/xjeffrose/xio/server/XioFirewall.java:40: // TODO(JR): ZK should populate this in the constructor? +** src/main/java/com/xjeffrose/xio/server/XioFirewall.java:63: // TODO(JR): Throw probably? +** src/main/java/com/xjeffrose/xio/server/XioServerInstrumentation.java:5:// TODO(CK): this can be folded into XioServerState +** src/main/resources/reference.conf:85: // TODO(CK): deprecate +** src/main/resources/reference.conf:87: // TODO(CK): deprecate +** src/test/java/com/xjeffrose/xio/SSL/HeldCertificate.java:111: // TODO(CK): Maybe throw to inform the user that they're doing something silly +** src/test/java/com/xjeffrose/xio/client/RequestMuxerUnitTest.java:65: // TODO(CK): Override connection pool request node instead of connector.connect +** src/test/java/com/xjeffrose/xio/client/RequestMuxerUnitTest.java:110: // TODO(CK): Refactor this into a helper class +** src/test/java/com/xjeffrose/xio/client/RequestMuxerUnitTest.java:119: // TODO(CK): Refactor this into a helper class +** src/test/java/com/xjeffrose/xio/client/RequestMuxerUnitTest.java:140: // TODO(CK): Refactor this into a functional test +** src/test/java/com/xjeffrose/xio/filter/IpFilterUnitTest.java:36: // TODO(CK): This is a bit kludgy, basically we create a new logger for every test +** src/test/java/com/xjeffrose/xio/http/internal/Http2HeadersWrapperUnitTest.java:124: // TODO(CK): netty bug? we can't call getTimeMillis() here +** src/test/java/com/xjeffrose/xio/http/internal/Http2HeadersWrapperUnitTest.java:516: // TODO(CK): netty bug? we can't call addTimeMillis() here +** src/test/java/com/xjeffrose/xio/http/internal/Http2HeadersWrapperUnitTest.java:523: // TODO(CK): netty bug? we can't call addTimeMillis() here +** src/test/java/com/xjeffrose/xio/http/internal/Http2HeadersWrapperUnitTest.java:531: // TODO(CK): netty bug? we can't call addTimeMillis() here +** src/test/java/com/xjeffrose/xio/http/internal/Http2HeadersWrapperUnitTest.java:539: // TODO(CK): netty bug? we can't call addTimeMillis() here +** src/test/java/com/xjeffrose/xio/http/internal/Http2HeadersWrapperUnitTest.java:727: // TODO(CK): netty bug? we can't call getTimeMillis() here +** src/test/java/com/xjeffrose/xio/pipeline/XioSslHttp1_1PipelineFunctionalTest.java:93: // TODO(CK): This is actually an integration test and a flaky one at that +** src/test/java/com/xjeffrose/xio/server/XioServerFunctionalTest.java:86: //TODO(JR): Figure out why \n seems to get chopped off +** src/test/java/com/xjeffrose/xio/tracing/HttpClientTracingHandlerIntegrationTest.java:7:// TODO(CK): These brave integration tests are flaky and stall out sometimes +:END: + * re-file ** todo.md @@ -126,3 +268,10 @@ public class ClassUnderTest { *** Persistent Connection Manager - startup configurable **** TODO Load from Typesafe Config + +* Local Variables + +# Local Variables: +# eval: (org-babel-do-load-languages 'org-babel-load-languages '((shell . t))) +# org-confirm-babel-evaluate: nil +# End: diff --git a/pom.xml b/pom.xml index 79011ef1..a2c7e13d 100644 --- a/pom.xml +++ b/pom.xml @@ -108,6 +108,14 @@ 1.7.21 + + + org.slf4j + jul-to-slf4j + 1.7.25 + test + + ch.qos.logback logback-classic @@ -324,6 +332,42 @@ + + org.apache.maven.plugins + maven-compiler-plugin + 3.5.1 + + + -XDignore.symbol.file + -Xlint:all + -Xlint:-deprecation + -Xlint:-unchecked + -Xlint:-rawtypes + -Xlint:-serial + + javac-with-errorprone + true + true + + 8 + 8 + + + + org.codehaus.plexus + plexus-compiler-javac-errorprone + 2.8 + + + + com.google.errorprone + error_prone_core + 2.1.1 + + + org.xolstice.maven.plugins protobuf-maven-plugin @@ -350,7 +394,15 @@ %1$tY-%1$tm-%1$td %1$tH:%1$tM:%1$tS %4$s %5$s%6$s%n + -Xbootclasspath/p:${bootclasspathPrefix} -Xms512m -Xmx512m + + + org.mortbay.jetty.alpn + alpn-boot + ${alpn.jdk8.version} + + + + + + alpn-when-jdk8_05 + + 1.8.0_05 + + + 8.1.0.v20141016 + + + + alpn-when-jdk8_11 + + 1.8.0_11 + + + 8.1.0.v20141016 + + + + alpn-when-jdk8_20 + + 1.8.0_20 + + + 8.1.0.v20141016 + + + + alpn-when-jdk8_25 + + 1.8.0_25 + + + 8.1.2.v20141202 + + + + alpn-when-jdk8_31 + + 1.8.0_31 + + + 8.1.3.v20150130 + + + + alpn-when-jdk8_40 + + 1.8.0_40 + + + 8.1.3.v20150130 + + + + alpn-when-jdk8_45 + + 1.8.0_45 + + + 8.1.3.v20150130 + + + + alpn-when-jdk8_51 + + 1.8.0_51 + + + 8.1.4.v20150727 + + + + alpn-when-jdk8_60 + + 1.8.0_60 + + + 8.1.5.v20150921 + + + + alpn-when-jdk8_65 + + 1.8.0_65 + + + 8.1.6.v20151105 + + + + alpn-when-jdk8_66 + + 1.8.0_66 + + + 8.1.6.v20151105 + + + + alpn-when-jdk8_71 + + 1.8.0_71 + + + 8.1.7.v20160121 + + + + alpn-when-jdk8_72 + + 1.8.0_72 + + + 8.1.7.v20160121 + + + + alpn-when-jdk8_73 + + 1.8.0_73 + + + 8.1.7.v20160121 + + + + alpn-when-jdk8_74 + + 1.8.0_74 + + + 8.1.7.v20160121 + + + + alpn-when-jdk8_77 + + 1.8.0_77 + + + 8.1.7.v20160121 + + + + alpn-when-jdk8_91 + + 1.8.0_91 + + + 8.1.7.v20160121 + + + + alpn-when-jdk8_92 + + 1.8.0_92 + + + 8.1.8.v20160420 + + + + alpn-when-jdk8_101 + + 1.8.0_101 + + + 8.1.8.v20160420 + + + + alpn-when-jdk8_102 + + 1.8.0_102 + + + 8.1.9.v20160720 + + + + alpn-when-jdk8_111 + + 1.8.0_111 + + + 8.1.9.v20160720 + + + + alpn-when-jdk8_112 + + 1.8.0_112 + + + 8.1.9.v20160720 + + + + alpn-when-jdk8_121 + + 1.8.0_121 + + + 8.1.11.v20170118 + + + + alpn-when-jdk8_131 + + 1.8.0_131 + + + 8.1.11.v20170118 + + + + alpn-when-jdk8_141 + + 1.8.0_141 + + + 8.1.11.v20170118 + + + + alpn-when-jdk8_144 + + 1.8.0_144 + + + 8.1.11.v20170118 + + + + alpn-when-jdk8_151 + + 1.8.0_151 + + + 8.1.11.v20170118 + + + + alpn-when-jdk8_152 + + 1.8.0_152 + + + 8.1.11.v20170118 + + diff --git a/src/main/java/com/xjeffrose/xio/application/Application.java b/src/main/java/com/xjeffrose/xio/application/Application.java index 1dcd3e14..74f51a2c 100644 --- a/src/main/java/com/xjeffrose/xio/application/Application.java +++ b/src/main/java/com/xjeffrose/xio/application/Application.java @@ -10,12 +10,15 @@ @Slf4j public class Application implements AutoCloseable { + // TOOD(CK): move this into ApplicationState @Getter private final ApplicationConfig config; + // TODO(CK): move this into ApplicationState private final Map servers; @Getter private final ApplicationState state; + // TODO(CK): move this into ApplicationState private final Configurator configurator; public Application( diff --git a/src/main/java/com/xjeffrose/xio/application/ApplicationConfig.java b/src/main/java/com/xjeffrose/xio/application/ApplicationConfig.java index 029a5b84..4ee87d42 100644 --- a/src/main/java/com/xjeffrose/xio/application/ApplicationConfig.java +++ b/src/main/java/com/xjeffrose/xio/application/ApplicationConfig.java @@ -12,8 +12,7 @@ @Slf4j public class ApplicationConfig { - private final Config config; - + @Getter private final Config config; @Getter private final String name; @Getter private final int bossThreads; @Getter private final String bossNameFormat; diff --git a/src/main/java/com/xjeffrose/xio/http/Client.java b/src/main/java/com/xjeffrose/xio/http/Client.java new file mode 100644 index 00000000..ec179c45 --- /dev/null +++ b/src/main/java/com/xjeffrose/xio/http/Client.java @@ -0,0 +1,104 @@ +package com.xjeffrose.xio.http; + +import com.xjeffrose.xio.SSL.SslContextFactory; +import com.xjeffrose.xio.client.ChannelConfiguration; +import com.xjeffrose.xio.client.ClientConfig; +import com.xjeffrose.xio.http.Http1ClientCodec; +import com.xjeffrose.xio.http.PipelineRequestHandler; +import com.xjeffrose.xio.http.RawBackendHandler; +import com.xjeffrose.xio.http.Request; +import com.xjeffrose.xio.http.Route; +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInitializer; +import io.netty.handler.codec.http.HttpClientCodec; +import io.netty.handler.ssl.SslContext; +import java.net.InetSocketAddress; +import java.util.ArrayDeque; +import java.util.Queue; +import io.netty.channel.ChannelHandler; +import com.xjeffrose.xio.core.XioMessageLogger; +import com.xjeffrose.xio.core.XioIdleDisconnectHandler; +import java.util.function.Supplier; +import java.util.concurrent.Future; +import io.netty.channel.ChannelFutureListener; + +public class Client { + + private final ClientState state; + private final Supplier appHandler; + Channel channel; + + public Client(ClientState state, Supplier appHandler) { + this.state = state; + this.appHandler = appHandler; + } + + private ChannelHandler buildHttp2Handler() { + return new Http2HandlerBuilder().server(false).build(); + } + + private ChannelFuture connect() { + Bootstrap b = new Bootstrap(); + b.channel(state.channelConfig.channel()); + b.group(state.channelConfig.workerGroup()); + b.handler( + new ChannelInitializer() { + public void initChannel(Channel channel) { + if (state.sslContext != null) { + channel + .pipeline() + .addLast( + "ssl handler", + state.sslContext.newHandler( + channel.alloc(), state.remote.getHostString(), state.remote.getPort())); + } + channel + .pipeline() + .addLast( + "negotiation handler", + new HttpClientNegotiationHandler(Client.this::buildHttp2Handler)) + .addLast("codec", CodecPlaceholderHandler.INSTANCE) + .addLast("application codec", ApplicationCodecPlaceholderHandler.INSTANCE) + .addLast("idle handler", new XioIdleDisconnectHandler(60, 60, 60)) + .addLast("message logging", new XioMessageLogger(Client.class, "objects")) + .addLast("request buffer", new RequestBuffer()) + .addLast("app handler", appHandler.get()); + // .addLast(new RawBackendHandler(ctx)); + } + }); + + return b.connect(state.remote); + } + + private void connected(ChannelFuture f) { + if (f.isDone() && f.isSuccess()) { + // log? + } else { + // log? + throw new RuntimeException(f.cause()); + } + } + + public ChannelFuture write(Request request) { + if (channel == null) { + ChannelFuture future = connect(); + ChannelFutureListener l = + f -> { + if (f.isDone() && f.isSuccess()) { + // log? + } else { + // log? + // fail the write future? + throw new RuntimeException(f.cause()); + } + }; + channel = future.channel(); + // connect().addListener(f -> connected((ChannelFuture) f)); + connect().addListener(l); + } + return channel.writeAndFlush(request); + } +} diff --git a/src/main/java/com/xjeffrose/xio/http/ClientState.java b/src/main/java/com/xjeffrose/xio/http/ClientState.java new file mode 100644 index 00000000..c7a21c37 --- /dev/null +++ b/src/main/java/com/xjeffrose/xio/http/ClientState.java @@ -0,0 +1,48 @@ +package com.xjeffrose.xio.http; + +import io.netty.channel.ChannelHandler; +import io.netty.handler.ssl.SslContext; +import java.net.InetSocketAddress; +import java.util.function.Supplier; +import com.xjeffrose.xio.client.ClientConfig; +import com.xjeffrose.xio.bootstrap.ClientChannelConfiguration; +import com.xjeffrose.xio.SSL.SslContextFactory; + +public class ClientState { + + public final ClientChannelConfiguration channelConfig; + public final ClientConfig config; + public final InetSocketAddress remote; + public final SslContext sslContext; + public final Supplier tracingHandler; + + private static SslContext sslContext(boolean enableTls, ClientConfig clientConfig) { + if (enableTls) { + return SslContextFactory.buildClientContext(clientConfig.getTls()); + } else { + return null; + } + } + + public ClientState( + ClientChannelConfiguration channelConfig, + ClientConfig config, + InetSocketAddress remote, + SslContext sslContext, + Supplier tracingHandler) { + this.channelConfig = channelConfig; + this.config = config; + this.remote = remote; + this.sslContext = sslContext; + this.tracingHandler = tracingHandler; + } + + public ClientState( + ClientChannelConfiguration channelConfig, + ClientConfig config, + InetSocketAddress remote, + boolean enableTls, + Supplier tracingHandler) { + this(channelConfig, config, remote, sslContext(enableTls, config), tracingHandler); + } +} diff --git a/src/main/java/com/xjeffrose/xio/http/DefaultFullRequest.java b/src/main/java/com/xjeffrose/xio/http/DefaultFullRequest.java index a8a51cc8..ec9ab90b 100644 --- a/src/main/java/com/xjeffrose/xio/http/DefaultFullRequest.java +++ b/src/main/java/com/xjeffrose/xio/http/DefaultFullRequest.java @@ -15,6 +15,11 @@ @ToString public abstract class DefaultFullRequest implements FullRequest { + @Override + public boolean startOfStream() { + return true; + } + public abstract ByteBuf body(); public abstract HttpMethod method(); @@ -23,6 +28,8 @@ public abstract class DefaultFullRequest implements FullRequest { public abstract Headers headers(); + public abstract int streamId(); + /** Not intended to be called. */ @Override public String version() { @@ -44,6 +51,8 @@ public abstract static class Builder { public abstract Builder headers(Headers headers); + public abstract Builder streamId(int streamId); + public abstract DefaultFullRequest build(); abstract Optional headers(); @@ -59,6 +68,6 @@ public Builder host(String host) { } static Builder builder() { - return new AutoValue_DefaultFullRequest.Builder(); + return new AutoValue_DefaultFullRequest.Builder().streamId(-1); } } diff --git a/src/main/java/com/xjeffrose/xio/http/DefaultStreamingRequest.java b/src/main/java/com/xjeffrose/xio/http/DefaultStreamingRequest.java index 1c89eaba..7ea80980 100644 --- a/src/main/java/com/xjeffrose/xio/http/DefaultStreamingRequest.java +++ b/src/main/java/com/xjeffrose/xio/http/DefaultStreamingRequest.java @@ -14,12 +14,19 @@ @ToString public abstract class DefaultStreamingRequest implements StreamingRequest { + @Override + public boolean startOfStream() { + return true; + } + public abstract HttpMethod method(); public abstract String path(); public abstract Headers headers(); + public abstract int streamId(); + /** Not intended to be called. */ @Override public String version() { @@ -39,6 +46,8 @@ public abstract static class Builder { public abstract Builder headers(Headers headers); + public abstract Builder streamId(int streamId); + public abstract DefaultStreamingRequest build(); abstract Optional headers(); @@ -54,6 +63,6 @@ public Builder host(String host) { } static Builder builder() { - return new AutoValue_DefaultStreamingRequest.Builder(); + return new AutoValue_DefaultStreamingRequest.Builder().streamId(-1); } } diff --git a/src/main/java/com/xjeffrose/xio/http/Headers.java b/src/main/java/com/xjeffrose/xio/http/Headers.java index d809bbaa..b094d077 100644 --- a/src/main/java/com/xjeffrose/xio/http/Headers.java +++ b/src/main/java/com/xjeffrose/xio/http/Headers.java @@ -4,17 +4,27 @@ import io.netty.handler.codec.http.DefaultHttpHeaders; import io.netty.handler.codec.http.HttpHeaders; import java.util.Map.Entry; +import io.netty.handler.codec.http2.DefaultHttp2Headers; +import io.netty.handler.codec.http2.Http2Headers; @UnstableApi public interface Headers extends io.netty.handler.codec.Headers, Iterable> { - default HttpHeaders http1Headers() { + default HttpHeaders http1Headers(boolean isTrailer, boolean isRequest) { HttpHeaders result = new DefaultHttpHeaders(); for (Entry entry : this) { result.add(entry.getKey(), entry.getValue()); } return result; } + + default Http2Headers http2Headers() { + Http2Headers result = new DefaultHttp2Headers(); + for (Entry entry : this) { + result.add(entry.getKey(), entry.getValue()); + } + return result; + } } diff --git a/src/main/java/com/xjeffrose/xio/http/Http1ClientCodec.java b/src/main/java/com/xjeffrose/xio/http/Http1ClientCodec.java index 5f372490..0e92b965 100644 --- a/src/main/java/com/xjeffrose/xio/http/Http1ClientCodec.java +++ b/src/main/java/com/xjeffrose/xio/http/Http1ClientCodec.java @@ -101,12 +101,15 @@ HttpRequest buildRequest(ChannelHandlerContext ctx, Request request) { full.method(), full.path(), content, - full.headers().http1Headers(), + full.headers().http1Headers(false, true), EmptyHttpHeaders.INSTANCE); } else { // TODO(CK): TransferEncoding return new DefaultHttpRequest( - HttpVersion.HTTP_1_1, request.method(), request.path(), request.headers().http1Headers()); + HttpVersion.HTTP_1_1, + request.method(), + request.path(), + request.headers().http1Headers(false, true)); } } @@ -114,7 +117,7 @@ HttpContent buildContent(ChannelHandlerContext ctx, StreamingData data) { if (data.endOfStream()) { LastHttpContent last = new DefaultLastHttpContent(data.content()); if (data.trailingHeaders() != null) { - last.trailingHeaders().add(data.trailingHeaders().http1Headers()); + last.trailingHeaders().add(data.trailingHeaders().http1Headers(true, true)); } // setChannelRequest(ctx, null); return last; diff --git a/src/main/java/com/xjeffrose/xio/http/Http1ServerCodec.java b/src/main/java/com/xjeffrose/xio/http/Http1ServerCodec.java index b1a3114b..e7714baa 100644 --- a/src/main/java/com/xjeffrose/xio/http/Http1ServerCodec.java +++ b/src/main/java/com/xjeffrose/xio/http/Http1ServerCodec.java @@ -96,7 +96,7 @@ HttpResponse buildResponse(ChannelHandlerContext ctx, Response response) { HttpVersion.HTTP_1_1, full.status(), content, - full.headers().http1Headers(), + full.headers().http1Headers(false, false), EmptyHttpHeaders.INSTANCE); } else { // TODO(CK): TransferEncoding @@ -106,7 +106,7 @@ HttpResponse buildResponse(ChannelHandlerContext ctx, Response response) { } return new DefaultHttpResponse( - HttpVersion.HTTP_1_1, response.status(), response.headers().http1Headers()); + HttpVersion.HTTP_1_1, response.status(), response.headers().http1Headers(false, false)); } } @@ -114,7 +114,7 @@ HttpContent buildContent(ChannelHandlerContext ctx, StreamingData data) { if (data.endOfStream()) { LastHttpContent last = new DefaultLastHttpContent(data.content()); if (data.trailingHeaders() != null) { - last.trailingHeaders().add(data.trailingHeaders().http1Headers()); + last.trailingHeaders().add(data.trailingHeaders().http1Headers(true, false)); } setChannelRequest(ctx, null); return last; diff --git a/src/main/java/com/xjeffrose/xio/http/Http2ClientCodec.java b/src/main/java/com/xjeffrose/xio/http/Http2ClientCodec.java index e60782aa..36e95db7 100644 --- a/src/main/java/com/xjeffrose/xio/http/Http2ClientCodec.java +++ b/src/main/java/com/xjeffrose/xio/http/Http2ClientCodec.java @@ -80,8 +80,7 @@ void writeRequest(ChannelHandlerContext ctx, Request request, ChannelPromise pro } */ - DefaultHttp2Headers headers = new DefaultHttp2Headers(); - headers.add(request.headers()); + Http2Headers headers = request.headers().http2Headers(); headers.authority(request.host()).method(request.method().asciiName()).path(request.path()); @@ -91,7 +90,7 @@ void writeRequest(ChannelHandlerContext ctx, Request request, ChannelPromise pro if (request.body().readableBytes() > 0) { PromiseCombiner combiner = new PromiseCombiner(); combiner.add(ctx.write(Http2Request.build(streamId, headers, false), ctx.newPromise())); - Http2DataFrame data = new DefaultHttp2DataFrame(request.body(), true, streamId); + Http2DataFrame data = new DefaultHttp2DataFrame(request.body(), true); combiner.add(ctx.write(Http2Request.build(streamId, data, true), ctx.newPromise())); combiner.finish(promise); } else { @@ -106,12 +105,10 @@ void writeContent(ChannelHandlerContext ctx, StreamingData data, ChannelPromise int streamId = 0; // TODO(CK): need a no stream constant somewhere boolean dataEos = data.endOfStream() && data.trailingHeaders().size() == 0; Http2Request request = - Http2Request.build( - streamId, new DefaultHttp2DataFrame(data.content(), dataEos, streamId), dataEos); + Http2Request.build(streamId, new DefaultHttp2DataFrame(data.content(), dataEos), dataEos); if (data.trailingHeaders().size() != 0) { - Http2Headers headers = new DefaultHttp2Headers(); - headers.add(data.trailingHeaders()); + Http2Headers headers = data.trailingHeaders().http2Headers(); Http2Request last = Http2Request.build(streamId, headers, true); PromiseCombiner combiner = new PromiseCombiner(); combiner.add(ctx.write(request, ctx.newPromise())); diff --git a/src/main/java/com/xjeffrose/xio/http/Http2FrameForwarder.java b/src/main/java/com/xjeffrose/xio/http/Http2FrameForwarder.java index 1ea862ca..618acd7e 100644 --- a/src/main/java/com/xjeffrose/xio/http/Http2FrameForwarder.java +++ b/src/main/java/com/xjeffrose/xio/http/Http2FrameForwarder.java @@ -9,6 +9,8 @@ import io.netty.handler.codec.http2.Http2Headers; import io.netty.handler.codec.http2.Http2Settings; +// TODO(CK): break this out into client/server classes + /** Forwards the frame with stream id to the next handler in the pipeline */ public class Http2FrameForwarder implements Http2FrameListener { @@ -96,7 +98,10 @@ public void onSettingsAckRead(ChannelHandlerContext ctx) throws Http2Exception { @Override public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) throws Http2Exception { - // TODO(CK): We don't currently have a use case for these frames + // h2 clients need to know that server settings have been received before they can write + if (!isServer) { + ctx.fireUserEventTriggered(RequestBuffer.WriteReady.INSTANCE); + } } @Override diff --git a/src/main/java/com/xjeffrose/xio/http/Http2Handler.java b/src/main/java/com/xjeffrose/xio/http/Http2Handler.java index 3fb91483..728b60d3 100644 --- a/src/main/java/com/xjeffrose/xio/http/Http2Handler.java +++ b/src/main/java/com/xjeffrose/xio/http/Http2Handler.java @@ -12,6 +12,7 @@ import io.netty.util.AttributeKey; import lombok.extern.slf4j.Slf4j; +// TODO(CK): break this out into client/server classes // TODO(CK): Rename this to Http2ServerHandler @Slf4j public class Http2Handler extends Http2ConnectionHandler { @@ -70,6 +71,7 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) // not an h2 frame, forward the write if (!(msg instanceof Http2DataFrame || msg instanceof Http2Headers + || msg instanceof Http2Request || msg instanceof Http2Response)) { ctx.write(msg, promise); return; @@ -92,6 +94,25 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) } // TODO(CK): This should be broken out into Http2ClientHandler + + if (msg instanceof Http2Request) { + Http2Request request = (Http2Request) msg; + + if (request.payload instanceof Http2Headers) { + Http2Headers headers = (Http2Headers) request.payload; + currentStreamId = connection().local().incrementAndGetNextStreamId(); + setCurrentStreamId(ctx, currentStreamId); + writeHeaders(ctx, headers, false, promise); + return; + } + + if (request.payload instanceof Http2DataFrame) { + Http2DataFrame data = (Http2DataFrame) request.payload; + writeData(ctx, data, promise); + return; + } + } + if (msg instanceof Http2Headers) { Http2Headers headers = (Http2Headers) msg; currentStreamId = connection().local().incrementAndGetNextStreamId(); diff --git a/src/main/java/com/xjeffrose/xio/http/Http2HandlerBuilder.java b/src/main/java/com/xjeffrose/xio/http/Http2HandlerBuilder.java index 1896a262..976f5665 100644 --- a/src/main/java/com/xjeffrose/xio/http/Http2HandlerBuilder.java +++ b/src/main/java/com/xjeffrose/xio/http/Http2HandlerBuilder.java @@ -11,6 +11,7 @@ import java.util.function.Function; import lombok.extern.slf4j.Slf4j; +// TODO(CK): break this out into client/server classes @Slf4j public class Http2HandlerBuilder extends AbstractHttp2ConnectionHandlerBuilder { diff --git a/src/main/java/com/xjeffrose/xio/http/Http2ServerCodec.java b/src/main/java/com/xjeffrose/xio/http/Http2ServerCodec.java index b0b78e42..610ce9c9 100644 --- a/src/main/java/com/xjeffrose/xio/http/Http2ServerCodec.java +++ b/src/main/java/com/xjeffrose/xio/http/Http2ServerCodec.java @@ -77,8 +77,7 @@ void writeResponse(ChannelHandlerContext ctx, Response response, ChannelPromise Request request = getChannelRequest(ctx); int streamId = request.streamId(); - DefaultHttp2Headers headers = new DefaultHttp2Headers(); - headers.add(response.headers()); + Http2Headers headers = response.headers().http2Headers(); headers.status(response.status().codeAsText()); @@ -87,7 +86,7 @@ void writeResponse(ChannelHandlerContext ctx, Response response, ChannelPromise if (response.body().readableBytes() > 0) { PromiseCombiner combiner = new PromiseCombiner(); combiner.add(ctx.write(Http2Response.build(streamId, headers, false), ctx.newPromise())); - Http2DataFrame data = new DefaultHttp2DataFrame(response.body(), true, streamId); + Http2DataFrame data = new DefaultHttp2DataFrame(response.body(), true); combiner.add(ctx.write(Http2Response.build(streamId, data, true), ctx.newPromise())); combiner.finish(promise); } else { @@ -107,12 +106,10 @@ void writeContent(ChannelHandlerContext ctx, StreamingData data, ChannelPromise boolean dataEos = data.endOfStream() && data.trailingHeaders().size() == 0; Http2Response response = - Http2Response.build( - streamId, new DefaultHttp2DataFrame(data.content(), dataEos, streamId), dataEos); + Http2Response.build(streamId, new DefaultHttp2DataFrame(data.content(), dataEos), dataEos); if (data.trailingHeaders().size() != 0) { - Http2Headers headers = new DefaultHttp2Headers(); - headers.add(data.trailingHeaders()); + Http2Headers headers = data.trailingHeaders().http2Headers(); Http2Response last = Http2Response.build(streamId, headers, true); PromiseCombiner combiner = new PromiseCombiner(); combiner.add(ctx.write(response, ctx.newPromise())); diff --git a/src/main/java/com/xjeffrose/xio/http/HttpClientNegotiationHandler.java b/src/main/java/com/xjeffrose/xio/http/HttpClientNegotiationHandler.java new file mode 100644 index 00000000..e8f6b579 --- /dev/null +++ b/src/main/java/com/xjeffrose/xio/http/HttpClientNegotiationHandler.java @@ -0,0 +1,46 @@ +package com.xjeffrose.xio.http; + +import static io.netty.handler.codec.http.HttpResponseStatus.*; +import static io.netty.handler.codec.http.HttpVersion.*; + +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.http.HttpServerCodec; +import io.netty.handler.ssl.ApplicationProtocolNames; +import io.netty.handler.ssl.ApplicationProtocolNegotiationHandler; +import java.util.function.Supplier; +import lombok.extern.slf4j.Slf4j; +import io.netty.handler.codec.http.HttpClientCodec; + +@Slf4j +public class HttpClientNegotiationHandler extends ApplicationProtocolNegotiationHandler { + + private final Supplier http2Handler; + + public HttpClientNegotiationHandler(Supplier http2Handler) { + super(ApplicationProtocolNames.HTTP_1_1); + this.http2Handler = http2Handler; + } + + private void replaceCodec(ChannelHandlerContext ctx, ChannelHandler handler) { + ctx.pipeline().replace(CodecPlaceholderHandler.class, "codec", handler); + } + + private void replaceApplicationCodec(ChannelHandlerContext ctx, ChannelHandler handler) { + ctx.pipeline().replace(ApplicationCodecPlaceholderHandler.class, "application codec", handler); + } + + @Override + protected void configurePipeline(ChannelHandlerContext ctx, String protocol) throws Exception { + if (protocol.equals(ApplicationProtocolNames.HTTP_1_1)) { + replaceCodec(ctx, new HttpClientCodec()); + replaceApplicationCodec(ctx, new Http1ClientCodec()); + ctx.fireUserEventTriggered(RequestBuffer.WriteReady.INSTANCE); + } else if (protocol.equals(ApplicationProtocolNames.HTTP_2)) { + replaceCodec(ctx, http2Handler.get()); + replaceApplicationCodec(ctx, new Http2ClientCodec()); + } else { + throw new RuntimeException("Unknown Application Protocol '" + protocol + "'"); + } + } +} diff --git a/src/main/java/com/xjeffrose/xio/http/HttpNegotiationHandler.java b/src/main/java/com/xjeffrose/xio/http/HttpNegotiationHandler.java index f1b79945..779be926 100644 --- a/src/main/java/com/xjeffrose/xio/http/HttpNegotiationHandler.java +++ b/src/main/java/com/xjeffrose/xio/http/HttpNegotiationHandler.java @@ -36,7 +36,7 @@ protected void configurePipeline(ChannelHandlerContext ctx, String protocol) thr replaceApplicationCodec(ctx, new Http1ServerCodec()); } else if (protocol.equals(ApplicationProtocolNames.HTTP_2)) { replaceCodec(ctx, http2Handler.get()); - // TODO(CK): replaceApplicationCodec(ctx, new Http2ServerCodec()); + replaceApplicationCodec(ctx, new Http2ServerCodec()); } else { throw new RuntimeException("Unknown Application Protocol '" + protocol + "'"); } diff --git a/src/main/java/com/xjeffrose/xio/http/ProxyBackendHandler.java b/src/main/java/com/xjeffrose/xio/http/ProxyBackendHandler.java new file mode 100644 index 00000000..6a1c7547 --- /dev/null +++ b/src/main/java/com/xjeffrose/xio/http/ProxyBackendHandler.java @@ -0,0 +1,70 @@ +package com.xjeffrose.xio.http; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import lombok.extern.slf4j.Slf4j; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelFuture; + +@Slf4j +public class ProxyBackendHandler extends ChannelInboundHandlerAdapter { + + private final ChannelHandlerContext frontend; + private boolean needFlush = false; + + public ProxyBackendHandler(ChannelHandlerContext frontend) { + this.frontend = frontend; + } + + @Override + public void handlerAdded(ChannelHandlerContext ctx) throws Exception { + if (ctx.pipeline().get(Http2Handler.class) != null) { + log.debug("handlerAdded: adding Http2StreamMapper"); + // we are an http2 pipeline + ctx.pipeline().addBefore("application codec", "stream mapper", new Http2StreamMapper()); + } + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + log.debug("RawBackendHandler[{}] channelRead: {}", this, msg); + frontend + .write(msg) + .addListener( + new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture f) { + if (f.cause() != null) { + // TODO(CK): move this into a logger class + log.error("Write Error!", f.cause()); + } + } + }); + needFlush = true; + } + + @Override + public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { + log.debug("RawBackendHandler[{}] channelReadComplete", this); + if (needFlush) { + frontend.flush(); + needFlush = false; + } + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + log.debug("RawBackendHandler[{}] channelInactive", this); + // TODO(CK): this should really be some sort of notification to the frontend + // that the backend closed. Keepalive/h2 will require the connection to stay open, we + // shouldn't be closing it. + frontend.close(); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + log.debug("RawBackendHandler[{}] exceptionCaught: {}", this, cause); + ctx.close(); + } +} diff --git a/src/main/java/com/xjeffrose/xio/http/ProxyHandler.java b/src/main/java/com/xjeffrose/xio/http/ProxyHandler.java new file mode 100644 index 00000000..1c84a0f4 --- /dev/null +++ b/src/main/java/com/xjeffrose/xio/http/ProxyHandler.java @@ -0,0 +1,89 @@ +package com.xjeffrose.xio.http; + +import com.xjeffrose.xio.bootstrap.ChannelConfiguration; +import com.xjeffrose.xio.client.ClientConfig; +import com.xjeffrose.xio.http.Http1ClientCodec; +import com.xjeffrose.xio.http.PipelineRequestHandler; +import com.xjeffrose.xio.http.RawBackendHandler; +import com.xjeffrose.xio.http.Request; +import com.xjeffrose.xio.http.Route; +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInitializer; +import io.netty.handler.codec.http.HttpClientCodec; +import io.netty.handler.ssl.SslContext; +import java.net.InetSocketAddress; +import java.util.ArrayDeque; +import java.util.Queue; +import io.netty.channel.ChannelHandler; +import com.xjeffrose.xio.core.XioMessageLogger; +import com.xjeffrose.xio.bootstrap.ClientChannelConfiguration; +import io.netty.util.AttributeKey; + +public class ProxyHandler implements PipelineRequestHandler { + + private static final AttributeKey CLIENT_KEY = + AttributeKey.newInstance("xio_proxy_client_key"); + + private static void setClient(ChannelHandlerContext ctx, Client client) { + ctx.channel().attr(CLIENT_KEY).set(client); + } + + private Client getClient(ChannelHandlerContext ctx) { + Client client = ctx.channel().attr(CLIENT_KEY).get(); + if (client == null) { + ClientState state = + new ClientState(channelConfig(ctx), clientConfig, remote, enableTls, () -> null); + // TODO(CK): this handler should be notifying a connection pool on release + client = new Client(state, () -> new ProxyBackendHandler(ctx)); + setClient(ctx, client); + } + return client; + } + + final ClientConfig clientConfig; + final InetSocketAddress remote; + final boolean enableTls; + + private ClientChannelConfiguration channelConfig(ChannelHandlerContext ctx) { + return ChannelConfiguration.clientConfig(ctx.channel().eventLoop()); + } + + // TODO(CK): this should really be requesting a client from a pool + public ProxyHandler(ClientConfig clientConfig, ProxyConfig config) { + this.clientConfig = clientConfig; + this.remote = config.address; + this.enableTls = config.needSSL; + } + + public void handle(ChannelHandlerContext ctx, Request request, Route route) { + + /* + Optional path = + route + .groups(request.path) + .entrySet() + .stream() + .filter(e -> e.getKey().equals("path")) + .map(e -> e.getValue()) + .findFirst(); + + payload.setUri(path.map(config.urlPath::concat).orElse(config.urlPath)); + + payload.headers().set("Host", config.hostHeader); + + XioRequest request = + HttpTracingState.hasSpan(ctx) + ? new XioRequest(payload, HttpTracingState.getSpan(ctx).context()) + : new XioRequest(payload, null); + */ + + // 1) map the incoming request path to the outgoing request path + // 2) set the outgoing request host + // 3) set the tracing span (if there is one) + + getClient(ctx).write(request); + } +} diff --git a/src/main/java/com/xjeffrose/xio/http/Request.java b/src/main/java/com/xjeffrose/xio/http/Request.java index 9986a354..a9c8b0f5 100644 --- a/src/main/java/com/xjeffrose/xio/http/Request.java +++ b/src/main/java/com/xjeffrose/xio/http/Request.java @@ -9,6 +9,11 @@ @UnstableApi public interface Request { + boolean startOfStream(); + + // TODO(CK): move this here from StreamingData? + // boolean endOfStream(); + HttpMethod method(); String path(); @@ -29,9 +34,7 @@ default String host(String defaultValue) { return result; } - default int streamId() { - return 0; - } + int streamId(); default boolean hasBody() { return false; diff --git a/src/main/java/com/xjeffrose/xio/http/RequestBuffer.java b/src/main/java/com/xjeffrose/xio/http/RequestBuffer.java new file mode 100644 index 00000000..81cdb3d1 --- /dev/null +++ b/src/main/java/com/xjeffrose/xio/http/RequestBuffer.java @@ -0,0 +1,92 @@ +package com.xjeffrose.xio.http; + +import com.xjeffrose.xio.client.ClientState; +import com.xjeffrose.xio.client.DefaultChannelInitializer; +import com.xjeffrose.xio.client.XioClient; +import com.xjeffrose.xio.client.XioClientBootstrap; +import com.xjeffrose.xio.server.Route; +import io.netty.channel.Channel; +import io.netty.channel.ChannelDuplexHandler; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPromise; +import io.netty.handler.codec.http2.Http2DataFrame; +import io.netty.handler.codec.http2.Http2Headers; +import io.netty.util.AttributeKey; +import io.netty.util.ReferenceCountUtil; +import java.util.ArrayList; +import java.util.List; +import lombok.extern.slf4j.Slf4j; +import java.util.function.BiFunction; + +@Slf4j +public class RequestBuffer extends ChannelDuplexHandler { + + public static class WriteReady { + public static final WriteReady INSTANCE = new WriteReady(); + } + + public static class ObjectAndPromise { + public final Object object; + public final ChannelPromise promise; + + ObjectAndPromise(Object object, ChannelPromise promise) { + this.object = object; + this.promise = promise; + } + + public ChannelFuture apply(BiFunction write) { + return write.apply(object, promise); + } + } + + List writeBuffer = new ArrayList<>(); + + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + if (!(evt instanceof WriteReady)) { + ctx.fireUserEventTriggered(evt); + return; + } + + // drain buffer + while (writeBuffer.size() > 0) { + ObjectAndPromise pair = writeBuffer.remove(0); + + /* + if (writeBuffer.isEmpty()) { + pair.apply(ctx::writeAndFlush); + } else { + pair.apply(ctx::write); + } + */ + + pair.apply(writeBuffer.isEmpty() ? ctx::writeAndFlush : ctx::write) + .addListener( + new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture channelFuture) { + if (channelFuture.isSuccess()) { + log.debug("write finished for {}", pair.object); + } else { + log.error("Write error: ", channelFuture.cause()); + } + } + }); + } + // unhook this handler + ctx.pipeline().remove(this); + } + + @Override + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) + throws Exception { + writeBuffer.add(new ObjectAndPromise(msg, promise)); + } + + @Override + public void flush(ChannelHandlerContext ctx) { + // no need to buffer the calls to flush, just ignore them + } +} diff --git a/src/main/java/com/xjeffrose/xio/http/StreamingRequestData.java b/src/main/java/com/xjeffrose/xio/http/StreamingRequestData.java index 2cfb8791..0acdec93 100644 --- a/src/main/java/com/xjeffrose/xio/http/StreamingRequestData.java +++ b/src/main/java/com/xjeffrose/xio/http/StreamingRequestData.java @@ -17,6 +17,11 @@ public StreamingRequestData(Request request, StreamingData data) { this.data = data; } + @Override + public boolean startOfStream() { + return false; + } + @Override public HttpMethod method() { return request.method(); @@ -37,6 +42,11 @@ public Headers headers() { return request.headers(); } + @Override + public int streamId() { + return request.streamId(); + } + @Override public boolean keepAlive() { return request.keepAlive(); diff --git a/src/main/java/com/xjeffrose/xio/http/internal/FullHttp1Request.java b/src/main/java/com/xjeffrose/xio/http/internal/FullHttp1Request.java index 0f9f5549..26ac534b 100644 --- a/src/main/java/com/xjeffrose/xio/http/internal/FullHttp1Request.java +++ b/src/main/java/com/xjeffrose/xio/http/internal/FullHttp1Request.java @@ -17,30 +17,47 @@ public FullHttp1Request(FullHttpRequest delegate) { headers = new Http1Headers(delegate.headers()); } + @Override + public boolean startOfStream() { + return true; + } + + @Override public HttpMethod method() { return delegate.method(); } + @Override public String path() { return delegate.uri(); } + @Override public String version() { return delegate.protocolVersion().text(); } + @Override public Headers headers() { return headers; } + @Override + public int streamId() { + return -1; + } + + @Override public boolean keepAlive() { return HttpUtil.isKeepAlive(delegate); } + @Override public boolean hasBody() { return delegate.content() != null && delegate.content().readableBytes() > 0; } + @Override public ByteBuf body() { return delegate.content(); } diff --git a/src/main/java/com/xjeffrose/xio/http/internal/FullHttp2Request.java b/src/main/java/com/xjeffrose/xio/http/internal/FullHttp2Request.java index 13019641..4a1b6a39 100644 --- a/src/main/java/com/xjeffrose/xio/http/internal/FullHttp2Request.java +++ b/src/main/java/com/xjeffrose/xio/http/internal/FullHttp2Request.java @@ -17,6 +17,11 @@ public FullHttp2Request(Http2Headers delegate, int streamId) { this.streamId = streamId; } + @Override + public boolean startOfStream() { + return true; + } + @Override public HttpMethod method() { return HttpMethod.valueOf(delegate.method().toString()); diff --git a/src/main/java/com/xjeffrose/xio/http/internal/FullHttp2Response.java b/src/main/java/com/xjeffrose/xio/http/internal/FullHttp2Response.java index e9e7340c..9fb0e0fb 100644 --- a/src/main/java/com/xjeffrose/xio/http/internal/FullHttp2Response.java +++ b/src/main/java/com/xjeffrose/xio/http/internal/FullHttp2Response.java @@ -6,6 +6,8 @@ import io.netty.buffer.Unpooled; import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.handler.codec.http2.Http2Headers; +import io.netty.handler.codec.http2.HttpConversionUtil; +import io.netty.handler.codec.http2.Http2Exception; /** Wrap an incoming Http2 Response, for use in a client. */ public class FullHttp2Response implements FullResponse { @@ -23,9 +25,9 @@ public FullHttp2Response(Http2Headers delegate, int streamId) { @Override public HttpResponseStatus status() { try { - return HttpResponseStatus.valueOf(Integer.parseInt(delegate.status().toString())); - } catch (Exception e) { - return null; + return HttpConversionUtil.parseStatus(delegate.status()); + } catch (Http2Exception e) { + throw new RuntimeException(e); } } diff --git a/src/main/java/com/xjeffrose/xio/http/internal/Http1Headers.java b/src/main/java/com/xjeffrose/xio/http/internal/Http1Headers.java index 929c84fb..adc7bd37 100644 --- a/src/main/java/com/xjeffrose/xio/http/internal/Http1Headers.java +++ b/src/main/java/com/xjeffrose/xio/http/internal/Http1Headers.java @@ -13,6 +13,8 @@ import java.util.List; import java.util.Map; import java.util.Set; +import io.netty.handler.codec.http2.Http2Headers; +import io.netty.handler.codec.http2.HttpConversionUtil; // TODO(CK): Rename this to Http1HeadersWrapper @@ -584,7 +586,39 @@ public boolean remove(CharSequence name) { } @Override - public HttpHeaders http1Headers() { + public HttpHeaders http1Headers(boolean isTrailer, boolean isRequest) { return delegate; } + + /* + TODO(CK): maybe move this to request/response + public static Http2Headers toHttp2Headers(HttpMessage in, boolean validateHeaders) { + HttpHeaders inHeaders = in.headers(); + final Http2Headers out = new DefaultHttp2Headers(validateHeaders, inHeaders.size()); + if (in instanceof HttpRequest) { + HttpRequest request = (HttpRequest) in; + URI requestTargetUri = URI.create(request.uri()); + out.path(toHttp2Path(requestTargetUri)); + out.method(request.method().asciiName()); + setHttp2Scheme(inHeaders, requestTargetUri, out); + + if (!isOriginForm(requestTargetUri) && !isAsteriskForm(requestTargetUri)) { + // Attempt to take from HOST header before taking from the request-line + String host = inHeaders.getAsString(HttpHeaderNames.HOST); + setHttp2Authority((host == null || host.isEmpty()) ? requestTargetUri.getAuthority() : host, out); + } + } else if (in instanceof HttpResponse) { + HttpResponse response = (HttpResponse) in; + out.status(response.status().codeAsText()); + } + + // Add the HTTP headers which have not been consumed above + toHttp2Headers(inHeaders, out); + return out; + } + */ + @Override + public Http2Headers http2Headers() { + return HttpConversionUtil.toHttp2Headers(delegate, true); + } } diff --git a/src/main/java/com/xjeffrose/xio/http/internal/Http1Request.java b/src/main/java/com/xjeffrose/xio/http/internal/Http1Request.java index c78625df..3c10eb15 100644 --- a/src/main/java/com/xjeffrose/xio/http/internal/Http1Request.java +++ b/src/main/java/com/xjeffrose/xio/http/internal/Http1Request.java @@ -23,26 +23,42 @@ public Http1Request(HttpRequest delegate) { headers = new Http1Headers(delegate.headers()); } + @Override + public boolean startOfStream() { + return true; + } + + @Override public HttpMethod method() { return delegate.method(); } + @Override public String path() { return delegate.uri(); } + @Override public String version() { return delegate.protocolVersion().text(); } + @Override public Headers headers() { return headers; } + @Override + public int streamId() { + return -1; + } + + @Override public boolean keepAlive() { return HttpUtil.isKeepAlive(delegate); } + @Override public ByteBuf body() { return Unpooled.EMPTY_BUFFER; } diff --git a/src/main/java/com/xjeffrose/xio/http/internal/Http2HeadersWrapper.java b/src/main/java/com/xjeffrose/xio/http/internal/Http2HeadersWrapper.java index 064659b2..69bf70a2 100644 --- a/src/main/java/com/xjeffrose/xio/http/internal/Http2HeadersWrapper.java +++ b/src/main/java/com/xjeffrose/xio/http/internal/Http2HeadersWrapper.java @@ -15,6 +15,10 @@ import java.util.List; import java.util.Map; import java.util.Set; +import io.netty.handler.codec.http.DefaultHttpHeaders; +import io.netty.handler.codec.http2.HttpConversionUtil; +import io.netty.handler.codec.http.HttpVersion; +import io.netty.handler.codec.http2.Http2Exception; public class Http2HeadersWrapper implements Headers { @@ -578,10 +582,15 @@ public int size() { return delegate.size(); } - // TODO(CK): Remove this in favor of using the iterator constructor? - @Override - public HttpHeaders http1Headers() { - return null; + public HttpHeaders http1Headers(boolean isTrailer, boolean isRequest) { + try { + HttpHeaders headers = new DefaultHttpHeaders(); + HttpConversionUtil.addHttp2ToHttpHeaders( + -1, delegate, headers, HttpVersion.HTTP_1_1, isTrailer, isRequest); + return headers; + } catch (Http2Exception e) { + throw new RuntimeException(e); + } } } diff --git a/src/main/java/com/xjeffrose/xio/http/internal/StreamingHttp2Request.java b/src/main/java/com/xjeffrose/xio/http/internal/StreamingHttp2Request.java index 2856f2a2..02e04e70 100644 --- a/src/main/java/com/xjeffrose/xio/http/internal/StreamingHttp2Request.java +++ b/src/main/java/com/xjeffrose/xio/http/internal/StreamingHttp2Request.java @@ -22,6 +22,11 @@ public StreamingHttp2Request(Http2Headers delegate, int streamId) { this.streamId = streamId; } + @Override + public boolean startOfStream() { + return true; + } + @Override public HttpMethod method() { return HttpMethod.valueOf(delegate.method().toString()); diff --git a/src/main/java/com/xjeffrose/xio/http/internal/StreamingHttp2Response.java b/src/main/java/com/xjeffrose/xio/http/internal/StreamingHttp2Response.java index ae8d639a..12940931 100644 --- a/src/main/java/com/xjeffrose/xio/http/internal/StreamingHttp2Response.java +++ b/src/main/java/com/xjeffrose/xio/http/internal/StreamingHttp2Response.java @@ -7,6 +7,8 @@ import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.handler.codec.http2.Http2Headers; import lombok.ToString; +import io.netty.handler.codec.http2.HttpConversionUtil; +import io.netty.handler.codec.http2.Http2Exception; /** Wrap an incoming Http2 Response, for use in a client. */ @ToString @@ -24,9 +26,9 @@ public StreamingHttp2Response(Http2Headers delegate, int streamId) { public HttpResponseStatus status() { try { - return HttpResponseStatus.valueOf(Integer.parseInt(delegate.status().toString())); - } catch (Exception e) { - return null; + return HttpConversionUtil.parseStatus(delegate.status()); + } catch (Http2Exception e) { + throw new RuntimeException(e); } } diff --git a/src/test/java/com/xjeffrose/xio/client/http/HttpClientFunctionalTest.java b/src/test/java/com/xjeffrose/xio/client/http/HttpClientFunctionalTest.java index bbeb34a2..ab8aa474 100644 --- a/src/test/java/com/xjeffrose/xio/client/http/HttpClientFunctionalTest.java +++ b/src/test/java/com/xjeffrose/xio/client/http/HttpClientFunctionalTest.java @@ -25,9 +25,16 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.junit.BeforeClass; +import com.xjeffrose.xio.fixtures.JulBridge; public class HttpClientFunctionalTest extends Assert { + @BeforeClass + public static void setupJul() { + JulBridge.initialize(); + } + MockWebServer server; static Logger disableJavaLogging() { diff --git a/src/test/java/com/xjeffrose/xio/fixtures/JulBridge.java b/src/test/java/com/xjeffrose/xio/fixtures/JulBridge.java new file mode 100644 index 00000000..2dcd23ce --- /dev/null +++ b/src/test/java/com/xjeffrose/xio/fixtures/JulBridge.java @@ -0,0 +1,49 @@ +package com.xjeffrose.xio.fixtures; + +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.Response; +import okhttp3.mockwebserver.Dispatcher; +import okhttp3.mockwebserver.MockResponse; +import okhttp3.mockwebserver.MockWebServer; +import okhttp3.mockwebserver.RecordedRequest; +import okhttp3.mockwebserver.SocketPolicy; +import com.xjeffrose.xio.fixtures.OkHttpUnsafe; +import com.xjeffrose.xio.application.Application; +import com.xjeffrose.xio.bootstrap.ApplicationBootstrap; +import com.xjeffrose.xio.pipeline.SmartHttpPipeline; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import java.util.concurrent.TimeUnit; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.Rule; +import org.junit.rules.TestName; +import com.xjeffrose.xio.application.ApplicationConfig; +import io.netty.channel.ChannelHandler; +import com.google.common.collect.ImmutableMap; +import com.typesafe.config.ConfigFactory; +import com.xjeffrose.xio.SSL.TlsConfig; +import com.typesafe.config.Config; +import okhttp3.RequestBody; +import okhttp3.MediaType; +import okhttp3.Protocol; +import java.util.Arrays; +import java.util.List; +import com.xjeffrose.xio.client.ClientConfig; +import org.slf4j.bridge.SLF4JBridgeHandler; +import org.junit.BeforeClass; + +public abstract class JulBridge { + private static int dummy = -1; + + public static void initialize() { + if (dummy == -1) { + SLF4JBridgeHandler.removeHandlersForRootLogger(); + SLF4JBridgeHandler.install(); + dummy = 0; + } + } +} diff --git a/src/test/java/com/xjeffrose/xio/fixtures/OkHttpUnsafe.java b/src/test/java/com/xjeffrose/xio/fixtures/OkHttpUnsafe.java index 3414e13c..37d91b99 100644 --- a/src/test/java/com/xjeffrose/xio/fixtures/OkHttpUnsafe.java +++ b/src/test/java/com/xjeffrose/xio/fixtures/OkHttpUnsafe.java @@ -16,6 +16,8 @@ import javax.net.ssl.X509TrustManager; import okhttp3.OkHttpClient; import okhttp3.mockwebserver.MockWebServer; +import java.util.Arrays; +import okhttp3.Protocol; public class OkHttpUnsafe { @@ -91,6 +93,7 @@ public boolean verify(String hostname, SSLSession session) { return true; } }) + .protocols(Arrays.asList(Protocol.HTTP_1_1)) .build(); return okHttpClient; diff --git a/src/test/java/com/xjeffrose/xio/http/GrpcFunctionalTest.java b/src/test/java/com/xjeffrose/xio/http/GrpcFunctionalTest.java index 141e88e3..5d6eb005 100644 --- a/src/test/java/com/xjeffrose/xio/http/GrpcFunctionalTest.java +++ b/src/test/java/com/xjeffrose/xio/http/GrpcFunctionalTest.java @@ -4,6 +4,7 @@ import com.xjeffrose.xio.SSL.SslContextFactory; import com.xjeffrose.xio.SSL.TlsConfig; import com.xjeffrose.xio.bootstrap.XioServerBootstrap; +import com.xjeffrose.xio.client.ClientConfig; import com.xjeffrose.xio.pipeline.SmartHttpPipeline; import com.xjeffrose.xio.pipeline.XioChannelHandlerFactory; import com.xjeffrose.xio.server.XioServer; @@ -43,7 +44,13 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.junit.Rule; +import org.junit.rules.TestName; +import com.google.common.collect.ImmutableMap; +import io.netty.channel.ChannelHandler; +import lombok.extern.slf4j.Slf4j; +@Slf4j public class GrpcFunctionalTest extends Assert { public static class HelloWorldClient { @@ -144,9 +151,12 @@ public void sayHello(HelloRequest req, StreamObserver responseObserv private EventLoopGroup group; + @Rule public TestName testName = new TestName(); + @Before public void setUp() { group = new NioEventLoopGroup(2); + log.debug("Test: " + testName.getMethodName()); } @After @@ -183,23 +193,33 @@ public void testFakeGrpcServer() throws Exception { Unpooled.copiedBuffer(ByteBufUtil.decodeHexDump("000000000d0a0b48656c6c6f20776f726c64")); final Http2DataFrame cannedData = new DefaultHttp2DataFrame(buf.retain(), false); - XioChannelHandlerFactory f = - () -> - new ChannelInboundHandlerAdapter() { - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - if (msg instanceof Http2Request) { - Http2Request request = (Http2Request) msg; - if (request.payload instanceof Http2DataFrame) { - ctx.write(Http2Response.build(request.streamId, cannedHeaders)); - ctx.write(Http2Response.build(request.streamId, cannedData, false)); - ctx.write(Http2Response.build(request.streamId, cannedTrailers, true)); - } - } - } - }; XioServerBootstrap bootstrap = - XioServerBootstrap.fromConfig("xio.testGrpcServer").addToPipeline(new SmartHttpPipeline(f)); + XioServerBootstrap.fromConfig("xio.testGrpcServer") + .addToPipeline( + new SmartHttpPipeline() { + @Override + public ChannelHandler getApplicationRouter() { + return new PipelineRouter( + ImmutableMap.of(), + new PipelineRequestHandler() { + @Override + public void handle( + ChannelHandlerContext ctx, Request request, Route route) { + if (request instanceof StreamingRequestData) { + StreamingRequestData streaming = (StreamingRequestData) request; + + if (streaming.endOfStream()) { + ctx.write(Http2Response.build(request.streamId(), cannedHeaders)); + ctx.write( + Http2Response.build(request.streamId(), cannedData, false)); + ctx.write( + Http2Response.build(request.streamId(), cannedTrailers, true)); + } + } + } + }); + } + }); XioServer xioServer = bootstrap.build(); HelloWorldClient client = HelloWorldClient.run(xioServer.getPort()); @@ -338,45 +358,22 @@ protected void initChannel(Channel ch) throws Exception { return ch; } - private static final AttributeKey TEST_CH_KEY = - AttributeKey.newInstance("xio_test_ch_key"); - @Test public void testGrpcProxyRequest() throws Exception { HelloWorldServer server = HelloWorldServer.run(); - final SslContext sslContext = - SslContextFactory.buildClientContext( - TlsConfig.fromConfig("xio.h2TestClient.settings.tls"), - InsecureTrustManagerFactory.INSTANCE); - - InetSocketAddress boundAddress = new InetSocketAddress("127.0.0.1", server.getPort()); - - XioChannelHandlerFactory f = - () -> - new ChannelDuplexHandler() { - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - Channel ch = ctx.channel().attr(TEST_CH_KEY).get(); - if (ch == null) { - ch = buildProxy(group, sslContext, ctx, boundAddress); - ctx.channel().attr(TEST_CH_KEY).set(ch); - } - - if (msg instanceof Http2Request) { - Http2Request request = (Http2Request) msg; - ch.writeAndFlush(request); - } - } - - @Override - public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) - throws Exception { - ctx.write(msg, promise); - } - }; + ClientConfig config = ClientConfig.fromConfig("xio.h2TestClient"); + ProxyConfig proxyConfig = ProxyConfig.parse("https://127.0.0.1:" + server.getPort() + "/"); XioServerBootstrap bootstrap = - XioServerBootstrap.fromConfig("xio.testGrpcServer").addToPipeline(new SmartHttpPipeline(f)); + XioServerBootstrap.fromConfig("xio.testGrpcServer") + .addToPipeline( + new SmartHttpPipeline() { + @Override + public ChannelHandler getApplicationRouter() { + return new PipelineRouter( + ImmutableMap.of(), new ProxyHandler(config, proxyConfig)); + } + }); XioServer xioServer = bootstrap.build(); HelloWorldClient client = HelloWorldClient.run(xioServer.getPort()); diff --git a/src/test/java/com/xjeffrose/xio/http/Http2ClientCodecUnitTest.java b/src/test/java/com/xjeffrose/xio/http/Http2ClientCodecUnitTest.java index 9484f0fc..41a8d235 100644 --- a/src/test/java/com/xjeffrose/xio/http/Http2ClientCodecUnitTest.java +++ b/src/test/java/com/xjeffrose/xio/http/Http2ClientCodecUnitTest.java @@ -247,11 +247,10 @@ public void testStreamingResponse() throws Exception { Http2Headers headers = new DefaultHttp2Headers().status("200"); Http2Response responseIn = Http2Response.build(1, headers, false); ByteBuf body1 = ByteBufUtil.writeUtf8(UnpooledByteBufAllocator.DEFAULT, "body1"); - Http2Response content = - Http2Response.build(1, new DefaultHttp2DataFrame(body1, false, 1), false); + Http2Response content = Http2Response.build(1, new DefaultHttp2DataFrame(body1, false), false); ByteBuf body2 = ByteBufUtil.writeUtf8(UnpooledByteBufAllocator.DEFAULT, "body2"); Http2Response lastContent = - Http2Response.build(1, new DefaultHttp2DataFrame(body2, true, 1), true); + Http2Response.build(1, new DefaultHttp2DataFrame(body2, true), true); channel.writeInbound(responseIn); channel.writeInbound(content); @@ -299,11 +298,10 @@ public void testStreamingResponseWithTrailingHeaders() throws Exception { Http2Headers headers = new DefaultHttp2Headers().status("200"); Http2Response responseIn = Http2Response.build(1, headers, false); ByteBuf body1 = ByteBufUtil.writeUtf8(UnpooledByteBufAllocator.DEFAULT, "body1"); - Http2Response content = - Http2Response.build(1, new DefaultHttp2DataFrame(body1, false, 1), false); + Http2Response content = Http2Response.build(1, new DefaultHttp2DataFrame(body1, false), false); ByteBuf body2 = ByteBufUtil.writeUtf8(UnpooledByteBufAllocator.DEFAULT, "body2"); Http2Response lastContent = - Http2Response.build(1, new DefaultHttp2DataFrame(body2, false, 1), false); + Http2Response.build(1, new DefaultHttp2DataFrame(body2, false), false); Http2Response trailers = Http2Response.build(1, new DefaultHttp2Headers().set("foo", "bar"), true); diff --git a/src/test/java/com/xjeffrose/xio/http/Http2ServerCodecUnitTest.java b/src/test/java/com/xjeffrose/xio/http/Http2ServerCodecUnitTest.java index 864271ec..b6f8160c 100644 --- a/src/test/java/com/xjeffrose/xio/http/Http2ServerCodecUnitTest.java +++ b/src/test/java/com/xjeffrose/xio/http/Http2ServerCodecUnitTest.java @@ -93,10 +93,9 @@ public void testStreamingRequest() throws Exception { Http2Headers headers = new DefaultHttp2Headers().method("POST").path("/"); Http2Request requestIn = Http2Request.build(1, headers, false); ByteBuf body1 = ByteBufUtil.writeUtf8(UnpooledByteBufAllocator.DEFAULT, "body1"); - Http2Request content = Http2Request.build(1, new DefaultHttp2DataFrame(body1, false, 1), false); + Http2Request content = Http2Request.build(1, new DefaultHttp2DataFrame(body1, false), false); ByteBuf body2 = ByteBufUtil.writeUtf8(UnpooledByteBufAllocator.DEFAULT, "body2"); - Http2Request lastContent = - Http2Request.build(1, new DefaultHttp2DataFrame(body2, true, 1), true); + Http2Request lastContent = Http2Request.build(1, new DefaultHttp2DataFrame(body2, true), true); channel.writeInbound(requestIn); channel.writeInbound(content); @@ -151,10 +150,9 @@ public void testStreamingRequestWithTrailingHeaders() { Http2Headers headers = new DefaultHttp2Headers().method("POST").path("/"); Http2Request requestIn = Http2Request.build(1, headers, false); ByteBuf body1 = ByteBufUtil.writeUtf8(UnpooledByteBufAllocator.DEFAULT, "body1"); - Http2Request content = Http2Request.build(1, new DefaultHttp2DataFrame(body1, false, 1), false); + Http2Request content = Http2Request.build(1, new DefaultHttp2DataFrame(body1, false), false); ByteBuf body2 = ByteBufUtil.writeUtf8(UnpooledByteBufAllocator.DEFAULT, "body2"); - Http2Request lastContent = - Http2Request.build(1, new DefaultHttp2DataFrame(body2, true, 1), false); + Http2Request lastContent = Http2Request.build(1, new DefaultHttp2DataFrame(body2, true), false); Http2Headers trailers = new DefaultHttp2Headers().set("foo", "bar"); Http2Request lastHeaders = Http2Request.build(1, trailers, true); diff --git a/src/test/java/com/xjeffrose/xio/http/ProxyWiringTest.java b/src/test/java/com/xjeffrose/xio/http/ProxyWiringTest.java index 3698a2bf..f9e509a4 100644 --- a/src/test/java/com/xjeffrose/xio/http/ProxyWiringTest.java +++ b/src/test/java/com/xjeffrose/xio/http/ProxyWiringTest.java @@ -23,8 +23,16 @@ import org.junit.AssumptionViolatedException; import org.junit.Before; import org.junit.Test; +import org.junit.BeforeClass; +import com.xjeffrose.xio.fixtures.JulBridge; public class ProxyWiringTest extends Assert { + + @BeforeClass + public static void setupJul() { + JulBridge.initialize(); + } + OkHttpClient client = OkHttpUnsafe.getUnsafeClient(); public MockWebServer server; diff --git a/src/test/java/com/xjeffrose/xio/http/ReverseProxyFunctionalTest.java b/src/test/java/com/xjeffrose/xio/http/ReverseProxyFunctionalTest.java new file mode 100644 index 00000000..41bfb06e --- /dev/null +++ b/src/test/java/com/xjeffrose/xio/http/ReverseProxyFunctionalTest.java @@ -0,0 +1,235 @@ +package com.xjeffrose.xio.http; + +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.Response; +import okhttp3.mockwebserver.Dispatcher; +import okhttp3.mockwebserver.MockResponse; +import okhttp3.mockwebserver.MockWebServer; +import okhttp3.mockwebserver.RecordedRequest; +import okhttp3.mockwebserver.SocketPolicy; +import com.xjeffrose.xio.fixtures.OkHttpUnsafe; +import com.xjeffrose.xio.application.Application; +import com.xjeffrose.xio.bootstrap.ApplicationBootstrap; +import com.xjeffrose.xio.pipeline.SmartHttpPipeline; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import java.util.concurrent.TimeUnit; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.Rule; +import org.junit.rules.TestName; +import com.xjeffrose.xio.application.ApplicationConfig; +import io.netty.channel.ChannelHandler; +import com.google.common.collect.ImmutableMap; +import com.typesafe.config.ConfigFactory; +import com.xjeffrose.xio.SSL.TlsConfig; +import com.typesafe.config.Config; +import okhttp3.RequestBody; +import okhttp3.MediaType; +import okhttp3.Protocol; +import java.util.Arrays; +import java.util.List; +import com.xjeffrose.xio.client.ClientConfig; +import com.xjeffrose.xio.fixtures.JulBridge; +import org.junit.BeforeClass; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class ReverseProxyFunctionalTest extends Assert { + + @BeforeClass + public static void setupJul() { + JulBridge.initialize(); + } + + OkHttpClient client; + Config config; + EventLoopGroup group; + ApplicationConfig appConfig; + Application reverseProxy; + MockWebServer server; + + static Application setupReverseProxy(ApplicationConfig appConfig, ProxyConfig proxyConfig) { + ClientConfig config = ClientConfig.fromConfig("clients.main", appConfig.getConfig()); + + return new ApplicationBootstrap(appConfig) + .addServer( + "main", + (bs) -> + bs.addToPipeline( + new SmartHttpPipeline() { + @Override + public ChannelHandler getApplicationRouter() { + return new PipelineRouter( + ImmutableMap.of(), new ProxyHandler(config, proxyConfig)); + } + })) + .build(); + } + + @Rule public TestName testName = new TestName(); + + @Before + public void setupCommon() { + config = ConfigFactory.load(); + log.debug("Test: " + testName.getMethodName()); + } + + void setupBack(boolean h2) throws Exception { + String back = h2 ? "h2" : "h1"; + + TlsConfig tlsConfig = + TlsConfig.fromConfig("xio." + back + "BackendServer.settings.tls", config); + List protocols; + if (h2) { + protocols = Arrays.asList(Protocol.HTTP_2, Protocol.HTTP_1_1); + } else { + protocols = Arrays.asList(Protocol.HTTP_1_1); + } + + server = OkHttpUnsafe.getSslMockWebServer(tlsConfig); + server.setProtocols(protocols); + server.start(); + } + + void setupFrontBack(boolean h2Front, boolean h2Back) throws Exception { + setupBack(h2Back); + int port = server.getPort(); + + String front = h2Front ? "h2" : "h1"; + appConfig = ApplicationConfig.fromConfig("xio." + front + "ReverseProxy", config); + ProxyConfig proxyConfig = ProxyConfig.parse("https://127.0.0.1:" + port + "/hello"); + reverseProxy = setupReverseProxy(appConfig, proxyConfig); + } + + void setupClient(boolean h2) throws Exception { + if (h2) { + client = + OkHttpUnsafe.getUnsafeClient() + .newBuilder() + .protocols(Arrays.asList(Protocol.HTTP_2, Protocol.HTTP_1_1)) + .build(); + } else { + client = + OkHttpUnsafe.getUnsafeClient() + .newBuilder() + .protocols(Arrays.asList(Protocol.HTTP_1_1)) + .build(); + } + } + + @After + public void tearDown() throws Exception { + client.connectionPool().evictAll(); + if (reverseProxy != null) { + reverseProxy.close(); + } + server.close(); + } + + int port() { + return reverseProxy.instrumentation("main").boundAddress().getPort(); + } + + String url(int port) { + StringBuilder path = + new StringBuilder("https://").append("127.0.0.1").append(":").append(port).append("/hello"); + return path.toString(); + } + + MockResponse buildResponse() { + return new MockResponse().setBody("hello, world").setSocketPolicy(SocketPolicy.KEEP_OPEN); + } + + void get(int port) throws Exception { + String url = url(port); + Request request = new Request.Builder().url(url).build(); + + server.enqueue(buildResponse()); + Response response = client.newCall(request).execute(); + + RecordedRequest servedRequest = server.takeRequest(); + assertEquals("/hello", servedRequest.getRequestUrl().encodedPath()); + } + + void post(int port) throws Exception { + String url = url(port); + MediaType mediaType = MediaType.parse("text/plain"); + RequestBody body = RequestBody.create(mediaType, "this is the post body"); + Request request = new Request.Builder().url(url).post(body).build(); + + server.enqueue(buildResponse()); + Response response = client.newCall(request).execute(); + + RecordedRequest servedRequest = server.takeRequest(); + assertEquals("/hello", servedRequest.getRequestUrl().encodedPath()); + assertEquals("this is the post body", servedRequest.getBody().readUtf8()); + } + + @Test + public void sanityCheckHttp1Get() throws Exception { + setupClient(false); + setupBack(false); + + get(server.getPort()); + } + + @Test + public void sanityCheckHttp1Post() throws Exception { + setupClient(false); + setupBack(false); + + post(server.getPort()); + } + + @Test + public void sanityCheckHttp2Get() throws Exception { + setupClient(true); + setupBack(true); + + get(server.getPort()); + } + + @Test + public void sanityCheckHttp2Post() throws Exception { + setupClient(true); + setupBack(true); + + post(server.getPort()); + } + + @Test + public void testHttp2toHttp1ServerGet() throws Exception { + setupClient(true); + setupFrontBack(true, false); + + get(port()); + } + + @Test + public void testHttp2toHttp1ServerPost() throws Exception { + setupClient(true); + setupFrontBack(true, false); + + post(port()); + } + + @Test + public void testHttp1toHttp2ServerGet() throws Exception { + setupClient(false); + setupFrontBack(false, true); + + get(port()); + } + + @Test + public void testHttp1toHttp2ServerPost() throws Exception { + setupClient(false); + setupFrontBack(false, true); + + post(port()); + } +} diff --git a/src/test/java/com/xjeffrose/xio/tracing/HttpClientTracingHandlerIntegrationTest.java b/src/test/java/com/xjeffrose/xio/tracing/HttpClientTracingHandlerIntegrationTest.java index 9ed1e528..5c8213f0 100644 --- a/src/test/java/com/xjeffrose/xio/tracing/HttpClientTracingHandlerIntegrationTest.java +++ b/src/test/java/com/xjeffrose/xio/tracing/HttpClientTracingHandlerIntegrationTest.java @@ -9,6 +9,11 @@ /* public class HttpClientTracingHandlerIntegrationTest extends ITHttpClient { + @BeforeClass + public static void setupJul() { + JulBridge.initialize(); + } + @Rule public TestWatcher testWatcher = new TestWatcher() { @Override diff --git a/src/test/java/com/xjeffrose/xio/tracing/HttpServerTracingHandlerIntegrationTest.java b/src/test/java/com/xjeffrose/xio/tracing/HttpServerTracingHandlerIntegrationTest.java index 67c33d9c..7e32ec5d 100644 --- a/src/test/java/com/xjeffrose/xio/tracing/HttpServerTracingHandlerIntegrationTest.java +++ b/src/test/java/com/xjeffrose/xio/tracing/HttpServerTracingHandlerIntegrationTest.java @@ -7,6 +7,11 @@ /* public class HttpServerTracingHandlerIntegrationTest extends ITHttpServer { + @BeforeClass + public static void setupJul() { + JulBridge.initialize(); + } + static Logger disableJavaLogging() { Logger logger = Logger.getLogger("okhttp3.mockwebserver.MockWebServer"); logger.setLevel(Level.WARNING); diff --git a/src/test/resources/application.conf b/src/test/resources/application.conf index aa611a98..4a35f718 100644 --- a/src/test/resources/application.conf +++ b/src/test/resources/application.conf @@ -138,4 +138,90 @@ xio { } } + h1ReverseProxy = ${xio.applicationTemplate} { + name = "test application" + settings { + zookeeper { + cluster = "" + } + } + servers { + main = ${xio.serverTemplate} { + name = "test server" + settings { + bindPort = 0 + tls { + alpn { + supportedProtocols = [ + "http/1.1" + ] + } + } + } + } + } + clients { + main = ${xio.clientTemplate} { + name = "test client" + } + } + } + + h2ReverseProxy = ${xio.applicationTemplate} { + name = "test application" + settings { + zookeeper { + cluster = "" + } + } + servers { + main = ${xio.serverTemplate} { + name = "test server" + settings { + bindPort = 0 + tls { + alpn { + supportedProtocols = [ + "h2" + ] + } + } + } + } + } + clients { + main = ${xio.clientTemplate} { + name = "test client" + } + } + } + + h1BackendServer = ${xio.serverTemplate} { + name = "testHttpsServer" + settings { + bindPort = 0 + tls { + alpn { + supportedProtocols = [ + "http/1.1" + ] + } + } + } + } + + h2BackendServer = ${xio.serverTemplate} { + name = "testHttpsServer" + settings { + bindPort = 0 + tls { + alpn { + supportedProtocols = [ + "h2" + ] + } + } + } + } + } diff --git a/src/test/resources/logback.groovy b/src/test/resources/logback.groovy index 0316d0e9..cafd1a52 100644 --- a/src/test/resources/logback.groovy +++ b/src/test/resources/logback.groovy @@ -23,8 +23,11 @@ appender("DEVNULL", FileAppender) { } } -logger("io.netty.channel.DefaultChannelPipeline", DEBUG) logger("com.xjeffrose.xio.config.ConfigReloader", OFF) +logger("com.xjeffrose.xio.SSL.XioTrustManagerFactory", OFF) +logger("com.xjeffrose.xio.core.NullZkClient", OFF) +logger("io.netty.channel.DefaultChannelPipeline", DEBUG) +logger("io.netty.util.internal.NativeLibraryLoader", ERROR) if (System.getProperty("DEBUG") != null) { root(DEBUG, ["CONSOLE"])