Skip to content

Commit

Permalink
Enhance the ability to forward requests from Meta to the main Meta
Browse files Browse the repository at this point in the history
  • Loading branch information
hui-cha committed Dec 24, 2024
1 parent 4071c28 commit 2cdb85e
Show file tree
Hide file tree
Showing 5 changed files with 346 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,51 +22,25 @@
import com.alipay.sofa.registry.remoting.bolt.exchange.BoltExchange;
import com.alipay.sofa.registry.remoting.exchange.Exchange;
import com.alipay.sofa.registry.remoting.jersey.exchange.JerseyExchange;
import com.alipay.sofa.registry.server.meta.bootstrap.config.MetaServerConfig;
import com.alipay.sofa.registry.server.meta.bootstrap.config.MetaServerConfigBean;
import com.alipay.sofa.registry.server.meta.bootstrap.config.MultiClusterMetaServerConfig;
import com.alipay.sofa.registry.server.meta.bootstrap.config.NodeConfig;
import com.alipay.sofa.registry.server.meta.bootstrap.config.NodeConfigBeanProperty;
import com.alipay.sofa.registry.server.meta.bootstrap.config.*;
import com.alipay.sofa.registry.server.meta.cleaner.AppRevisionCleaner;
import com.alipay.sofa.registry.server.meta.cleaner.InterfaceAppsIndexCleaner;
import com.alipay.sofa.registry.server.meta.lease.filter.DefaultForbiddenServerManager;
import com.alipay.sofa.registry.server.meta.lease.filter.RegistryForbiddenServerManager;
import com.alipay.sofa.registry.server.meta.provide.data.DefaultClientManagerService;
import com.alipay.sofa.registry.server.meta.provide.data.DefaultProvideDataService;
import com.alipay.sofa.registry.server.meta.provide.data.FetchStopPushService;
import com.alipay.sofa.registry.server.meta.provide.data.NodeOperatingService;
import com.alipay.sofa.registry.server.meta.provide.data.ProvideDataService;
import com.alipay.sofa.registry.server.meta.provide.data.*;
import com.alipay.sofa.registry.server.meta.remoting.DataNodeExchanger;
import com.alipay.sofa.registry.server.meta.remoting.MetaServerExchanger;
import com.alipay.sofa.registry.server.meta.remoting.SessionNodeExchanger;
import com.alipay.sofa.registry.server.meta.remoting.connection.DataConnectionManager;
import com.alipay.sofa.registry.server.meta.remoting.connection.MetaConnectionManager;
import com.alipay.sofa.registry.server.meta.remoting.connection.SessionConnectionManager;
import com.alipay.sofa.registry.server.meta.remoting.handler.FetchProvideDataRequestHandler;
import com.alipay.sofa.registry.server.meta.remoting.handler.FetchSystemPropertyRequestHandler;
import com.alipay.sofa.registry.server.meta.remoting.handler.GetSlotTableStatusRequestHandler;
import com.alipay.sofa.registry.server.meta.remoting.handler.HeartbeatRequestHandler;
import com.alipay.sofa.registry.server.meta.remoting.handler.RegistryForbiddenServerHandler;
import com.alipay.sofa.registry.server.meta.remoting.handler.*;
import com.alipay.sofa.registry.server.meta.remoting.meta.LocalMetaExchanger;
import com.alipay.sofa.registry.server.meta.remoting.meta.MetaServerRenewService;
import com.alipay.sofa.registry.server.meta.resource.BlacklistDataResource;
import com.alipay.sofa.registry.server.meta.resource.CircuitBreakerResources;
import com.alipay.sofa.registry.server.meta.resource.ClientManagerResource;
import com.alipay.sofa.registry.server.meta.resource.CompressResource;
import com.alipay.sofa.registry.server.meta.resource.DataInfoIDBlacklistResource;
import com.alipay.sofa.registry.server.meta.resource.HealthResource;
import com.alipay.sofa.registry.server.meta.resource.MetaCenterResource;
import com.alipay.sofa.registry.server.meta.resource.MetaDigestResource;
import com.alipay.sofa.registry.server.meta.resource.MetaLeaderResource;
import com.alipay.sofa.registry.server.meta.resource.ProvideDataResource;
import com.alipay.sofa.registry.server.meta.resource.RecoverConfigResource;
import com.alipay.sofa.registry.server.meta.resource.RegistryCoreOpsResource;
import com.alipay.sofa.registry.server.meta.resource.ShutdownSwitchResource;
import com.alipay.sofa.registry.server.meta.resource.SlotSyncResource;
import com.alipay.sofa.registry.server.meta.resource.SlotTableResource;
import com.alipay.sofa.registry.server.meta.resource.StopPushDataResource;
import com.alipay.sofa.registry.server.meta.resource.*;
import com.alipay.sofa.registry.server.meta.resource.filter.AuthRestFilter;
import com.alipay.sofa.registry.server.meta.resource.filter.LeaderAwareFilter;
import com.alipay.sofa.registry.server.meta.resource.filter.LeaderForwardFilter;
import com.alipay.sofa.registry.server.meta.slot.status.SlotTableStatusService;
import com.alipay.sofa.registry.server.shared.config.CommonConfig;
import com.alipay.sofa.registry.server.shared.remoting.AbstractServerHandler;
Expand All @@ -89,12 +63,7 @@

import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;

/**
* @author shangyu.wh
Expand Down Expand Up @@ -338,6 +307,11 @@ public LeaderAwareFilter leaderAwareFilter() {
return new LeaderAwareFilter();
}

@Bean
public LeaderForwardFilter leaderForwardFilter() {
return new LeaderForwardFilter();
}

@Bean
public AuthRestFilter authRestFilter() {
return new AuthRestFilter();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,15 @@
import com.alipay.sofa.registry.server.meta.provide.data.ProvideDataNotifier;
import com.alipay.sofa.registry.server.meta.provide.data.ProvideDataService;
import com.alipay.sofa.registry.server.meta.resource.filter.AuthRestController;
import com.alipay.sofa.registry.server.meta.resource.filter.LeaderAwareRestController;
import com.alipay.sofa.registry.server.meta.resource.filter.LeaderForwardRestController;
import com.alipay.sofa.registry.store.api.DBResponse;
import com.alipay.sofa.registry.store.api.OperationStatus;
import com.alipay.sofa.registry.util.JsonUtils;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.annotations.VisibleForTesting;
import org.springframework.beans.factory.annotation.Autowired;

import javax.ws.rs.FormParam;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.*;
import javax.ws.rs.core.MediaType;
import java.util.HashSet;
import java.util.Set;
Expand All @@ -33,8 +30,7 @@
* @date 2024/12/13
*/
@Path("datainfoid/blacklist")
@AuthRestController
@LeaderAwareRestController
@LeaderForwardRestController
public class DataInfoIDBlacklistResource {
private static final Logger LOGGER = LoggerFactory.getLogger(DataInfoIDBlacklistResource.class);

Expand All @@ -47,6 +43,7 @@ public class DataInfoIDBlacklistResource {
@POST
@Path("add")
@Produces(MediaType.APPLICATION_JSON)
@AuthRestController
public Result addBlackList(@FormParam("dataId") String dataId,
@FormParam("group") String group,
@FormParam("instanceId") String instanceId) {
Expand All @@ -61,6 +58,7 @@ public Result addBlackList(@FormParam("dataId") String dataId,
@POST
@Path("delete")
@Produces(MediaType.APPLICATION_JSON)
@AuthRestController
public Result deleteBlackList(@FormParam("dataId") String dataId,
@FormParam("group") String group,
@FormParam("instanceId") String instanceId) {
Expand All @@ -72,6 +70,28 @@ public Result deleteBlackList(@FormParam("dataId") String dataId,
}
}

@GET
@Path("query")
@Produces(MediaType.APPLICATION_JSON)
public Result queryBlackList() {
try {
DBResponse<PersistenceData> queryResponse =
this.provideDataService.queryProvideData(ValueConstants.SESSION_DATAID_BLACKLIST_DATA_ID);
OperationStatus operationStatus = queryResponse.getOperationStatus();
if (OperationStatus.SUCCESS.equals(operationStatus)) {
PersistenceData persistenceData = queryResponse.getEntity();
Result result = Result.success();
result.setMessage(persistenceData.getData());
return result;
} else {
return Result.success();
}
} catch (Throwable throwable) {
LOGGER.error("Query dataid black list exception", throwable);
return Result.failed("Query dataid black list exception");
}
}

private Result process(String dataId, String group, String instanceId, Operation operation) {
// 1. 参数检查
// 检查要处理的 DataId 以及 Group 是否符合规则
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
package com.alipay.sofa.registry.server.meta.resource.filter;

import com.alipay.sofa.registry.log.Logger;
import com.alipay.sofa.registry.log.LoggerFactory;
import com.alipay.sofa.registry.server.meta.MetaLeaderService;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.eclipse.jetty.http.HttpStatus;
import org.springframework.beans.factory.annotation.Autowired;

import javax.annotation.Priority;
import javax.ws.rs.Priorities;
import javax.ws.rs.container.ContainerRequestContext;
import javax.ws.rs.container.ContainerRequestFilter;
import javax.ws.rs.core.MultivaluedMap;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriInfo;
import javax.ws.rs.ext.Provider;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URI;
import java.net.URL;
import java.util.List;
import java.util.concurrent.TimeUnit;

/**
* @author huicha
* @date 2024/12/24
*/
@Provider
@LeaderForwardRestController
@Priority(Priorities.USER)
public class LeaderForwardFilter implements ContainerRequestFilter {

private Logger LOGGER = LoggerFactory.getLogger(LeaderForwardFilter.class, "[LeaderForwardFilter]");

@Autowired
private MetaLeaderService metaLeaderService;

@Override
public void filter(ContainerRequestContext requestContext) throws IOException {
if (metaLeaderService.amILeader()) {
return;
}

String leaderAddr = metaLeaderService.getLeader();
if (StringUtils.isBlank(leaderAddr)) {
Response response =
Response.status(Response.Status.INTERNAL_SERVER_ERROR)
.header("reason", "no leader found")
.build();
requestContext.abortWith(response);
return;
}

this.proxyRequestToMetaLeader(requestContext, leaderAddr);
}

/**
* 当前 Meta 不是 Leader,将请求转发到 Meta Leader 中
* 这里没考虑直接使用 Http Client 是因为使用到这个 Filter 的基本都是配置下发链路
* 属于旁路,不需要链接池、内存池等各种资源,因此就不想引入其他依赖了,只使用了 Java 原生的
* 方法,链接也是使用到的时候再主动去链
*/
private void proxyRequestToMetaLeader(ContainerRequestContext requestContext, String leaderAddr) {
HttpURLConnection connection = null;
try {
// 1. 获取请求信息
String method = requestContext.getMethod();
boolean hasEntity = requestContext.hasEntity();
MultivaluedMap<String, String> headers = requestContext.getHeaders();
UriInfo uriInfo = requestContext.getUriInfo();
URI requestURI = uriInfo.getAbsolutePath();
int requestPort = requestURI.getPort();
String requestPath = requestURI.getRawPath();

// 2. 拼接发送给 Meta Leader 的请求地址
String newRequestURLStr = String.format("http://%s:%d%s", leaderAddr, requestPort, requestPath);
URL newRequestURL = new URL(newRequestURLStr);

// 3. 打开链接,这里因为协议写死是 HTTP 协议,所以拿到的必然是 HttpURLConnection
connection = (HttpURLConnection) newRequestURL.openConnection();
connection.setRequestMethod(method);
if (hasEntity) {
connection.setDoOutput(true);
}
connection.setDoInput(true);

// 设置超时时间
connection.setConnectTimeout((int) TimeUnit.SECONDS.toMillis(1));
connection.setReadTimeout((int) TimeUnit.SECONDS.toMillis(3));

// 设置请求头
for (MultivaluedMap.Entry<String, List<String>> entry : headers.entrySet()) {
String headerKey = entry.getKey();
List<String> headerValues = entry.getValue();
if (CollectionUtils.isNotEmpty(headerValues)) {
connection.addRequestProperty(headerKey, headerValues.get(0));
}
}

connection.connect();

// 4. 发送请求,直接做一次拷贝
if (hasEntity) {
try (
OutputStream outputStream = connection.getOutputStream();
InputStream inputStream = requestContext.getEntityStream()
) {
IOUtils.copy(inputStream, outputStream);
}
}

// 5. 接收响应
int responseCode = connection.getResponseCode();
if (responseCode != HttpStatus.OK_200) {
LOGGER.error("Proxy request to meta leader fail, response code: {}, message: {}", responseCode, connection.getResponseMessage());
Response response =
Response.status(Response.Status.INTERNAL_SERVER_ERROR)
.header("reason", "proxy request to meta leader fail: " + connection.getResponseMessage())
.build();
requestContext.abortWith(response);
return;
}

// 读取数据
try (
InputStream inputStream = connection.getInputStream();
ByteArrayOutputStream outputStream = new ByteArrayOutputStream(1024);
) {
IOUtils.copy(inputStream, outputStream);
byte[] responseData = outputStream.toByteArray();
Response response = Response.ok(responseData).build();
requestContext.abortWith(response);
}
} catch (Throwable throwable) {
LOGGER.error("Proxy request to meta leader exception, meta leader address: {}", leaderAddr, throwable);
Response response =
Response.status(Response.Status.INTERNAL_SERVER_ERROR)
.header("reason", "proxy request to meta leader exception")
.build();
requestContext.abortWith(response);
} finally {
if (null != connection) {
try {
connection.disconnect();
} catch (Throwable throwable) {
// 吃掉异常
LOGGER.error("Disconnect connection to meta leader fail, meta leader address: {}", leaderAddr, throwable);
}
}
}
}

@VisibleForTesting
public LeaderForwardFilter setMetaLeaderService(MetaLeaderService metaLeaderService) {
this.metaLeaderService = metaLeaderService;
return this;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.alipay.sofa.registry.server.meta.resource.filter;

import javax.ws.rs.NameBinding;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
* @author huicha
* @date 2024/12/24
*/
@NameBinding
@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(value = RetentionPolicy.RUNTIME)
public @interface LeaderForwardRestController {}
Loading

0 comments on commit 2cdb85e

Please sign in to comment.