11package com .virjar .spider .proxy .ha .handlers ;
22
3- import com .virjar .spider .proxy .ha .auth .AuthInfo ;
4- import com .virjar .spider .proxy .ha .auth .AuthenticatorManager ;
3+ import com .google .common .collect .Lists ;
54import com .virjar .spider .proxy .ha .core .HaProxyMapping ;
6- import com .virjar .spider .proxy .ha .core .Source ;
75import com .virjar .spider .proxy .ha .handlers .upstream .HttpUpstreamHandShaker ;
86import com .virjar .spider .proxy .ha .handlers .upstream .HttpsUpstreamHandShaker ;
97import com .virjar .spider .proxy .ha .handlers .upstream .UpstreamHandShaker ;
10- import com .virjar .spider .proxy .ha .utils .ClientAuthUtils ;
118import com .virjar .spider .proxy .ha .utils .HttpNettyUtils ;
129import com .virjar .spider .proxy .ha .utils .NettyUtils ;
1310import io .netty .channel .Channel ;
11+ import io .netty .channel .ChannelDuplexHandler ;
1412import io .netty .channel .ChannelFutureListener ;
1513import io .netty .channel .ChannelHandlerContext ;
1614import io .netty .channel .ChannelPipeline ;
17- import io .netty .channel .SimpleChannelInboundHandler ;
1815import io .netty .handler .codec .http .FullHttpResponse ;
1916import io .netty .handler .codec .http .HttpHeaders ;
2017import io .netty .handler .codec .http .HttpMethod ;
18+ import io .netty .handler .codec .http .HttpObject ;
2119import io .netty .handler .codec .http .HttpRequest ;
2220import io .netty .handler .codec .http .HttpRequestDecoder ;
2321import io .netty .handler .codec .http .HttpRequestEncoder ;
2624import io .netty .handler .codec .http .HttpResponseEncoder ;
2725import io .netty .handler .codec .http .HttpResponseStatus ;
2826import io .netty .handler .codec .http .HttpVersion ;
29- import io .netty .util .ReferenceCounted ;
27+ import io .netty .util .ReferenceCountUtil ;
28+ import io .netty .util .concurrent .EventExecutor ;
3029import lombok .extern .slf4j .Slf4j ;
3130
31+ import java .util .LinkedList ;
3232import java .util .regex .Pattern ;
3333
3434@ Slf4j
35- public class HttpServerHandler extends SimpleChannelInboundHandler < HttpRequest > {
35+ public class HttpServerHandler extends ChannelDuplexHandler {
3636 private ChannelHandlerContext ctx ;
3737 private HttpRequest httpRequest ;
3838 private HaProxyMapping haProxyMapping ;
@@ -42,63 +42,66 @@ public class HttpServerHandler extends SimpleChannelInboundHandler<HttpRequest>
4242 private static final int MAX_HEADER_SIZE_DEFAULT = 8192 * 2 ;
4343 private static final int MAX_CHUNK_SIZE_DEFAULT = 8192 * 2 ;
4444
45+ private LinkedList <HttpObject > httpObjects = Lists .newLinkedList ();
46+
4547 @ Override
46- protected void channelRead0 (ChannelHandlerContext ctx , HttpRequest httpRequest ) throws Exception {
48+ public void channelRead (ChannelHandlerContext ctx , Object msg ) throws Exception {
4749 this .ctx = ctx ;
48- log .info ("Received raw request from ip:{} : request:{} " , ctx .channel ().remoteAddress (), httpRequest );
49- if (httpRequest .getDecoderResult ().isFailure ()) {
50- log .warn ("Could not parse request from client. Decoder result: {}" ,
51- httpRequest .getDecoderResult ().toString ());
52- FullHttpResponse response = HttpNettyUtils
53- .createFullHttpResponse (HttpVersion .HTTP_1_1 , HttpResponseStatus .BAD_REQUEST ,
54- "Unable to parse HTTP request" );
55- HttpHeaders .setKeepAlive (response , false );
56- HttpNettyUtils .respondWithShortCircuitResponse (ctx .channel (), response );
57- return ;
58- }
5950
60- this .httpRequest = httpRequest ;
61- if (isRequestToOriginServer ()) {
62- // 不是代理请求,而是直连到代理服务中
63- // 此时认为他是想操作服务器
64- // 比如feedback代理质量、发送指令切换上游代理ip映射、获取当前代理绑定上游ip、获取当前代理的真实出口ip等
65- handleApiHttpRequest ();
51+ if (msg instanceof HttpRequest ) {
52+ httpRequest = (HttpRequest ) msg ;
53+ httpObjects .addLast (httpRequest );
54+ } else if (msg instanceof HttpObject ) {
55+ httpObjects .addLast ((HttpObject ) msg );
56+ return ;
57+ } else {
58+ log .warn ("can not handle message: {}" , msg );
59+ ReferenceCountUtil .release (msg );
60+ ctx .close ();
6661 return ;
6762 }
6863
69- // 创建到 后端代理资源的链接
70- haProxyMapping = HaProxyMapping .get (ctx .channel ());
64+ log .info ("Received raw request from ip:{} local:{}: request:{} " , ctx .channel ().remoteAddress (), ctx .channel ().localAddress (), httpRequest );
7165
72- if (!authClientPermission (haProxyMapping .getSource (), ctx .channel (), httpRequest )) {
73- FullHttpResponse response = HttpNettyUtils
74- .createFullHttpResponse (HttpVersion .HTTP_1_1 , HttpResponseStatus .BAD_REQUEST , "auth failed!" );
66+ if (httpRequest .getDecoderResult ().isFailure ()) {
67+ log .warn ("Could not parse request from client. Decoder result: {}" , httpRequest .getDecoderResult ().toString ());
68+ FullHttpResponse response = HttpNettyUtils .createFullHttpResponse (HttpVersion .HTTP_1_1 ,
69+ HttpResponseStatus .BAD_REQUEST ,
70+ "Unable to parse HTTP request" );
7571 HttpHeaders .setKeepAlive (response , false );
7672 HttpNettyUtils .respondWithShortCircuitResponse (ctx .channel (), response );
73+ ctx .close ();
7774 return ;
7875 }
7976
8077 isHttps = HttpNettyUtils .isCONNECT (httpRequest );
81-
8278 if (!isHttps ) {
8379 // http代理模式,需要暂停读,否则客户端可能一直发数据过来
8480 ctx .channel ().config ().setAutoRead (false );
8581 }
8682
87- haProxyMapping .borrowConnect (value -> {
88- if (value == null ) {
89- log .warn ("connect to upstream proxy server failed:{} " , haProxyMapping .resourceKey ());
90- HttpNettyUtils .writeBadRequest (ctx .channel (), httpRequest );
91- return ;
92- }
93- onUpstreamConnectionEstablish (value );
94- });
95- }
9683
97- private boolean authClientPermission (Source source , Channel channel , HttpRequest httpRequest ) {
98- String proxyAuthContents = httpRequest .headers ().get (HttpHeaders .Names .PROXY_AUTHORIZATION );
99- String ip = ClientAuthUtils .getClientIp (channel , httpRequest );
100- AuthInfo authInfo = AuthInfo .builder ().authToken (proxyAuthContents ).ip (ip ).build ();
101- return AuthenticatorManager .getAuthenticator (source ).authenticate (authInfo );
84+ if (isRequestToOriginServer ()) {
85+ // 不是代理请求,而是直连到代理服务中
86+ // 此时认为他是想操作服务器
87+ // 比如feedback代理质量、发送指令切换上游代理ip映射、获取当前代理绑定上游ip、获取当前代理的真实出口ip等
88+ handleApiHttpRequest ();
89+ return ;
90+ }
91+
92+ // 创建到 后端代理资源的链接
93+ haProxyMapping = HaProxyMapping .get (ctx .channel ());
94+
95+ haProxyMapping .borrowConnect (
96+ value -> {
97+ if (value == null ) {
98+ log .warn ("connect to upstream proxy server failed:{} " , haProxyMapping .resourceKey ());
99+ HttpNettyUtils .writeBadRequest (ctx .channel (), httpRequest );
100+ return ;
101+ }
102+ onUpstreamConnectionEstablish (value );
103+ }
104+ );
102105 }
103106
104107 private void onUpstreamConnectionEstablish (Channel upstreamChannel ) {
@@ -124,62 +127,87 @@ public void onHandSharkSuccess() {
124127
125128 UpstreamHandShaker <HttpRequest > handShaker ;
126129 if (isHttps ) {
127- handShaker = new HttpsUpstreamHandShaker (upstreamChannel , haProxyMapping .getSource (), callback ,
128- httpRequest );
130+ handShaker = new HttpsUpstreamHandShaker (upstreamChannel , haProxyMapping .getSource (), callback , httpRequest );
129131 } else {
130132 handShaker = new HttpUpstreamHandShaker (upstreamChannel , haProxyMapping .getSource (), callback , httpRequest );
131133 }
132134 handShaker .doHandShark ();
133135 }
134136
135137 private void onHttpsHandSharkFinish (Channel upstreamChannel ) {
136- HttpResponse response = HttpNettyUtils .createFullHttpResponse (HttpVersion .HTTP_1_1 , CONNECTION_ESTABLISHED );
138+ HttpResponse response = HttpNettyUtils .createFullHttpResponse (HttpVersion .HTTP_1_1 ,
139+ CONNECTION_ESTABLISHED );
137140 response .headers ().set (HttpHeaders .Names .CONNECTION , HttpHeaders .Values .KEEP_ALIVE );
138141 HttpNettyUtils .addVia (response , "virjar-spider-ha-proxy" );
139- ctx .channel ().writeAndFlush (response ).addListener ((ChannelFutureListener ) channelFuture -> {
140- if (!channelFuture .isSuccess ()) {
141- //TODO 未来这里可以复用??
142- upstreamChannel .close ();
142+ ctx .channel ().writeAndFlush (response )
143+ .addListener ((ChannelFutureListener ) channelFuture -> {
144+ if (!channelFuture .isSuccess ()) {
145+ //TODO 未来这里可以复用??
146+ upstreamChannel .close ();
147+ return ;
148+ }
149+ ChannelPipeline pipeline = ctx .pipeline ();
150+ pipeline .remove (HttpResponseEncoder .class );
151+ pipeline .remove (HttpRequestDecoder .class );
152+ pipeline .remove (HttpServerHandler .class );
153+
154+ pipeline .addLast (new RelayHandler (upstreamChannel ));
155+ upstreamChannel .pipeline ().addLast (new RelayHandler (channelFuture .channel ()));
156+ });
157+ }
158+
159+ private void appendHttpContent (Channel outboundChannel ) {
160+ EventExecutor executor = ctx .executor ();
161+ if (!executor .inEventLoop ()) {
162+ executor .execute (() -> appendHttpContent (outboundChannel ));
163+ return ;
164+ }
165+ HttpObject httpObject = httpObjects .pollFirst ();
166+ if (httpObject == null ) {
167+ return ;
168+ }
169+
170+ if (!outboundChannel .isActive ()) {
171+ log .warn ("outbound socket has been closed" );
172+ ctx .close ();
173+ return ;
174+ }
175+ outboundChannel .writeAndFlush (httpObject ).addListener (future -> {
176+ if (!future .isSuccess ()) {
177+ ctx .close ();
178+ log .warn ("outbound socket has been closed when append http object" );
143179 return ;
144180 }
145- ChannelPipeline pipeline = ctx .pipeline ();
146- pipeline .remove (HttpResponseEncoder .class );
147- pipeline .remove (HttpRequestDecoder .class );
148- pipeline .remove (HttpServerHandler .class );
149181
150- pipeline .addLast (new RelayHandler (upstreamChannel ));
151- upstreamChannel .pipeline ().addLast (new RelayHandler (channelFuture .channel ()));
182+ if (!httpObjects .isEmpty ()) {
183+ appendHttpContent (outboundChannel );
184+ return ;
185+ }
186+ log .info ("http message write finish ,start http tuning" );
187+ ctx .channel ().pipeline ().addLast (new RelayHandler (outboundChannel ));
188+ outboundChannel .pipeline ().addLast (new RelayHandler (ctx .channel ()));
189+ ctx .pipeline ().remove (HttpServerHandler .class );
190+ ctx .channel ().config ().setAutoRead (true );
152191 });
153192 }
154193
155194 private void onHttpHandSharkFinish (Channel upstreamChannel ) {
156195 ChannelPipeline pipeline = upstreamChannel .pipeline ();
157196 pipeline .addLast (new HttpRequestEncoder ());
158- pipeline .addLast (new HttpResponseDecoder (MAX_INITIAL_LINE_LENGTH_DEFAULT , MAX_HEADER_SIZE_DEFAULT ,
197+ pipeline .addLast (new HttpResponseDecoder (MAX_INITIAL_LINE_LENGTH_DEFAULT ,
198+ MAX_HEADER_SIZE_DEFAULT ,
159199 MAX_CHUNK_SIZE_DEFAULT ));
160200
161- pipeline .addLast (new RelayHandler (ctx .channel ()));
162-
163- ctx .pipeline ().addLast (new RelayHandler (upstreamChannel ));
164- ctx .pipeline ().remove (HttpServerHandler .class );
165-
166- if (httpRequest instanceof ReferenceCounted ) {
167- ((ReferenceCounted ) httpRequest ).retain ();
168- }
169- upstreamChannel .writeAndFlush (httpRequest );
201+ appendHttpContent (upstreamChannel );
170202 }
171203
172- private void handleApiHttpRequest () {
173204
174- //PortalManager.handleRequest(httpRequest, ctx.channel());
205+ private void handleApiHttpRequest () {
175206 // 暂时都返回502,后续再处理真实业务逻辑
176207 HttpNettyUtils .writeBadRequest (ctx .channel (), httpRequest );
177- // if (httpRequest instanceof ReferenceCounted) {
178- // ((ReferenceCounted) httpRequest).retain();
179- // }
180-
181208 }
182209
210+
183211 private boolean isRequestToOriginServer () {
184212 if (httpRequest .getMethod () == HttpMethod .CONNECT ) {
185213 return false ;
@@ -192,6 +220,6 @@ private boolean isRequestToOriginServer() {
192220
193221 private static final Pattern HTTP_SCHEME = Pattern .compile ("^http://.*" , Pattern .CASE_INSENSITIVE );
194222
195- private static final HttpResponseStatus CONNECTION_ESTABLISHED = new HttpResponseStatus (200 ,
196- "Connection established" );
223+ private static final HttpResponseStatus CONNECTION_ESTABLISHED = new HttpResponseStatus (
224+ 200 , "Connection established" );
197225}
0 commit comments