From 593ab1027ca15a721370b053b34be504311653d3 Mon Sep 17 00:00:00 2001 From: "xidong.rxd" Date: Mon, 23 Dec 2024 16:27:28 +0800 Subject: [PATCH] Add the capability to query the Publisher information of dataInfoId in the Session --- .../registry/common/model/PublisherUtils.java | 28 +++++- .../sessionserver/QueryPublisherRequest.java | 30 +++++++ .../model/sessionserver/SimplePublisher.java | 44 ++++++++++ .../bootstrap/SessionServerConfiguration.java | 7 ++ .../handler/QueryPublisherRequestHandler.java | 52 +++++++++++ .../resource/SessionDigestResource.java | 66 ++++++++++++-- .../QueryPublisherRequestHandlerTest.java | 87 +++++++++++++++++++ 7 files changed, 306 insertions(+), 8 deletions(-) create mode 100644 server/common/model/src/main/java/com/alipay/sofa/registry/common/model/sessionserver/QueryPublisherRequest.java create mode 100644 server/common/model/src/main/java/com/alipay/sofa/registry/common/model/sessionserver/SimplePublisher.java create mode 100644 server/server/session/src/main/java/com/alipay/sofa/registry/server/session/remoting/console/handler/QueryPublisherRequestHandler.java create mode 100644 server/server/session/src/test/java/com/alipay/sofa/registry/server/session/remoting/console/handler/QueryPublisherRequestHandlerTest.java diff --git a/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/PublisherUtils.java b/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/PublisherUtils.java index b7f8f1f28..9811a00a5 100644 --- a/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/PublisherUtils.java +++ b/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/PublisherUtils.java @@ -17,11 +17,18 @@ package com.alipay.sofa.registry.common.model; import com.alipay.sofa.registry.common.model.dataserver.DatumSummary; +import com.alipay.sofa.registry.common.model.sessionserver.SimplePublisher; import com.alipay.sofa.registry.common.model.slot.filter.SyncAcceptorRequest; import com.alipay.sofa.registry.common.model.slot.filter.SyncSlotAcceptorManager; import com.alipay.sofa.registry.common.model.store.Publisher; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import java.util.*; +import org.apache.commons.collections.CollectionUtils; + +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; /** * @author xiaojian.xj @@ -92,4 +99,23 @@ public static Publisher clonePublisher(Publisher publisher) { newPub.setSessionProcessId(publisher.getSessionProcessId()); return newPub; } + + public static SimplePublisher convert(Publisher publisher) { + return new SimplePublisher( + publisher.getClientId(), + publisher.getSourceAddress().buildAddressString(), + publisher.getAppName() + ); + } + + public static List convert(Collection publishers) { + if (CollectionUtils.isEmpty(publishers)) { + return Collections.emptyList(); + } + List ret = Lists.newArrayListWithCapacity(publishers.size()); + for (Publisher publisher : publishers) { + ret.add(convert(publisher)); + } + return ret; + } } diff --git a/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/sessionserver/QueryPublisherRequest.java b/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/sessionserver/QueryPublisherRequest.java new file mode 100644 index 000000000..00a183982 --- /dev/null +++ b/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/sessionserver/QueryPublisherRequest.java @@ -0,0 +1,30 @@ +package com.alipay.sofa.registry.common.model.sessionserver; + +import com.alipay.sofa.registry.util.StringFormatter; + +import java.io.Serializable; + +/** + * @author huicha + * @date 2024/12/23 + */ +public class QueryPublisherRequest implements Serializable { + + private static final long serialVersionUID = 5295572570779995725L; + + private final String dataInfoId; + + public QueryPublisherRequest(String dataInfoId) { + this.dataInfoId = dataInfoId; + } + + public String getDataInfoId() { + return dataInfoId; + } + + @Override + public String toString() { + return StringFormatter.format("QueryPublisherRequest={}}", dataInfoId); + } + +} diff --git a/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/sessionserver/SimplePublisher.java b/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/sessionserver/SimplePublisher.java new file mode 100644 index 000000000..9aaf006d8 --- /dev/null +++ b/server/common/model/src/main/java/com/alipay/sofa/registry/common/model/sessionserver/SimplePublisher.java @@ -0,0 +1,44 @@ +package com.alipay.sofa.registry.common.model.sessionserver; + +import com.alipay.sofa.registry.util.StringFormatter; + +import java.io.Serializable; + +/** + * @author huicha + * @date 2024/12/23 + */ +public final class SimplePublisher implements Serializable { + + private static final long serialVersionUID = 6861155219172594665L; + + private final String clientId; + + private final String sourceAddress; + + private final String appName; + + public SimplePublisher(String clientId, String sourceAddress, String appName) { + this.clientId = clientId; + this.sourceAddress = sourceAddress; + this.appName = appName; + } + + public String getClientId() { + return clientId; + } + + public String getSourceAddress() { + return sourceAddress; + } + + public String getAppName() { + return appName; + } + + @Override + public String toString() { + return StringFormatter.format( + "SimplePublisher{app={},clientId={},add={}}", appName, clientId, sourceAddress); + } +} diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/bootstrap/SessionServerConfiguration.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/bootstrap/SessionServerConfiguration.java index 78ab9849d..6b5ffbbc9 100644 --- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/bootstrap/SessionServerConfiguration.java +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/bootstrap/SessionServerConfiguration.java @@ -82,6 +82,7 @@ import com.alipay.sofa.registry.server.session.remoting.console.handler.FilterSubscriberIPsHandler; import com.alipay.sofa.registry.server.session.remoting.console.handler.GetClientManagerRequestHandler; import com.alipay.sofa.registry.server.session.remoting.console.handler.PubSubDataInfoIdRequestHandler; +import com.alipay.sofa.registry.server.session.remoting.console.handler.QueryPublisherRequestHandler; import com.alipay.sofa.registry.server.session.remoting.console.handler.QuerySubscriberRequestHandler; import com.alipay.sofa.registry.server.session.remoting.console.handler.StopPushRequestHandler; import com.alipay.sofa.registry.server.session.remoting.handler.AppRevisionSliceHandler; @@ -301,6 +302,7 @@ public Collection serverSyncHandlers() { public Collection consoleHandlers() { Collection list = new ArrayList<>(); list.add(querySubscriberRequestHandler()); + list.add(queryPublisherRequestHandler()); list.add(clientOffRequestHandler()); list.add(clientOnRequestHandler()); list.add(getClientManagerRequestHandler()); @@ -341,6 +343,11 @@ public AbstractServerHandler querySubscriberRequestHandler() { return new QuerySubscriberRequestHandler(); } + @Bean + public AbstractServerHandler queryPublisherRequestHandler() { + return new QueryPublisherRequestHandler(); + } + @Bean public AbstractServerHandler filterSubscriberIPsHandler() { return new FilterSubscriberIPsHandler(); diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/remoting/console/handler/QueryPublisherRequestHandler.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/remoting/console/handler/QueryPublisherRequestHandler.java new file mode 100644 index 000000000..d2ab37451 --- /dev/null +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/remoting/console/handler/QueryPublisherRequestHandler.java @@ -0,0 +1,52 @@ +package com.alipay.sofa.registry.server.session.remoting.console.handler; + +import com.alipay.sofa.registry.common.model.GenericResponse; +import com.alipay.sofa.registry.common.model.PublisherUtils; +import com.alipay.sofa.registry.common.model.sessionserver.QueryPublisherRequest; +import com.alipay.sofa.registry.common.model.store.Publisher; +import com.alipay.sofa.registry.remoting.Channel; +import com.alipay.sofa.registry.server.session.bootstrap.ExecutorManager; +import com.alipay.sofa.registry.server.session.store.DataStore; +import com.google.common.annotations.VisibleForTesting; +import org.springframework.beans.factory.annotation.Autowired; + +import java.util.Collection; + +/** + * @author huicha + * @date 2024/12/23 + */ +public class QueryPublisherRequestHandler extends AbstractConsoleHandler { + + @Autowired + protected DataStore sessionDataStore; + + @Override + public Object doHandle(Channel channel, QueryPublisherRequest request) { + Collection publishers = sessionDataStore.getDatas(request.getDataInfoId()); + return new GenericResponse().fillSucceed(PublisherUtils.convert(publishers)); + } + + @Override + public Object buildFailedResponse(String msg) { + return new GenericResponse().fillFailed(msg); + } + + @Override + public Class interest() { + return QueryPublisherRequest.class; + } + + @VisibleForTesting + public QueryPublisherRequestHandler setSessionDataStore(DataStore sessionDataStore) { + this.sessionDataStore = sessionDataStore; + return this; + } + + @VisibleForTesting + public QueryPublisherRequestHandler setExecutorManager(ExecutorManager executorManager) { + this.executorManager = executorManager; + return this; + } + +} diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/resource/SessionDigestResource.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/resource/SessionDigestResource.java index 445965e09..bc0264ef0 100644 --- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/resource/SessionDigestResource.java +++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/resource/SessionDigestResource.java @@ -16,15 +16,16 @@ */ package com.alipay.sofa.registry.server.session.resource; -import static com.alipay.sofa.registry.common.model.constants.ValueConstants.CONNECT_ID_SPLIT; - import com.alipay.sofa.registry.common.model.CommonResponse; import com.alipay.sofa.registry.common.model.ConnectId; import com.alipay.sofa.registry.common.model.GenericResponse; +import com.alipay.sofa.registry.common.model.PublisherUtils; import com.alipay.sofa.registry.common.model.Tuple; import com.alipay.sofa.registry.common.model.appmeta.InterfaceMapping; import com.alipay.sofa.registry.common.model.sessionserver.PubSubDataInfoIdRequest; import com.alipay.sofa.registry.common.model.sessionserver.PubSubDataInfoIdResp; +import com.alipay.sofa.registry.common.model.sessionserver.QueryPublisherRequest; +import com.alipay.sofa.registry.common.model.sessionserver.SimplePublisher; import com.alipay.sofa.registry.common.model.store.Publisher; import com.alipay.sofa.registry.common.model.store.StoreData; import com.alipay.sofa.registry.common.model.store.Subscriber; @@ -52,9 +53,9 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; -import java.util.*; -import java.util.Map.Entry; -import java.util.concurrent.ThreadPoolExecutor; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.util.CollectionUtils; + import javax.annotation.PostConstruct; import javax.annotation.Resource; import javax.ws.rs.GET; @@ -64,8 +65,20 @@ import javax.ws.rs.Produces; import javax.ws.rs.QueryParam; import javax.ws.rs.core.MediaType; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.util.CollectionUtils; +import java.util.AbstractMap; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.stream.Collectors; + +import static com.alipay.sofa.registry.common.model.constants.ValueConstants.CONNECT_ID_SPLIT; /** * @author shangyu.wh @@ -422,4 +435,43 @@ public List getDataServerList() { public List getMetaServerLeader() { return Lists.newArrayList(mataNodeService.getMetaServerLeader()); } + + @GET + @Path("/data/zone/queryPublisher") + @Produces(MediaType.APPLICATION_JSON) + public GenericResponse> queryZonePublisher(@QueryParam("dataInfoId") String dataInfoId) { + Collection publishers = this.sessionDataStore.getDatas(dataInfoId); + List allPublishers = publishers.stream().map(PublisherUtils::convert).collect(Collectors.toList()); + + List otherSessions = Sdks.getOtherConsoleServers(null, this.sessionServerConfig, this.metaNodeService); + if (!CollectionUtils.isEmpty(otherSessions)) { + Map respMap = Sdks.concurrentSdkSend( + pubSubQueryZoneExecutor, + otherSessions, + (URL url) -> { + final QueryPublisherRequest req = new QueryPublisherRequest(dataInfoId); + return (CommonResponse) sessionConsoleExchanger.request(new SimpleRequest(req, url)).getResult(); + }, + 5000 + ); + + for (Entry entry : respMap.entrySet()) { + CommonResponse response = entry.getValue(); + if (response instanceof GenericResponse) { + GenericResponse> genericResponse = (GenericResponse>) response; + if (genericResponse.isSuccess()) { + List subPublishers = genericResponse.getData(); + allPublishers.addAll(subPublishers); + } else { + LOGGER.error("url={} query publishers fail, response:{}.", + entry.getKey().getIpAddress(), entry.getValue()); + } + } else { + LOGGER.error("url={} query publishers fail, unexpect response type, response:{}.", + entry.getKey().getIpAddress(), entry.getValue()); + } + } + } + return new GenericResponse>().fillSucceed(allPublishers); + } } diff --git a/server/server/session/src/test/java/com/alipay/sofa/registry/server/session/remoting/console/handler/QueryPublisherRequestHandlerTest.java b/server/server/session/src/test/java/com/alipay/sofa/registry/server/session/remoting/console/handler/QueryPublisherRequestHandlerTest.java new file mode 100644 index 000000000..4b662462c --- /dev/null +++ b/server/server/session/src/test/java/com/alipay/sofa/registry/server/session/remoting/console/handler/QueryPublisherRequestHandlerTest.java @@ -0,0 +1,87 @@ +package com.alipay.sofa.registry.server.session.remoting.console.handler; + +import com.alipay.sofa.registry.common.model.CommonResponse; +import com.alipay.sofa.registry.common.model.GenericResponse; +import com.alipay.sofa.registry.common.model.Node; +import com.alipay.sofa.registry.common.model.sessionserver.QueryPublisherRequest; +import com.alipay.sofa.registry.common.model.sessionserver.SimplePublisher; +import com.alipay.sofa.registry.common.model.store.Publisher; +import com.alipay.sofa.registry.common.model.store.URL; +import com.alipay.sofa.registry.remoting.ChannelHandler; +import com.alipay.sofa.registry.server.session.TestUtils; +import com.alipay.sofa.registry.server.session.bootstrap.ExecutorManager; +import com.alipay.sofa.registry.server.session.store.DataStore; +import org.apache.commons.collections.CollectionUtils; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mockito; + +import java.util.ArrayList; +import java.util.List; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * @author huicha + * @date 2024/12/23 + */ +public class QueryPublisherRequestHandlerTest { + + + @Test + public void testHandle() { + String dataInfoId = "test-data-info-id"; + + List mockPublishers = new ArrayList<>(); + for (int index = 0; index < 3; index++) { + Publisher mockPublisher = new Publisher(); + mockPublisher.setDataInfoId(dataInfoId); + mockPublisher.setClientId("ClientId-" + index); + mockPublisher.setSourceAddress(URL.valueOf("127.0.0." + index + ":1234")); + mockPublisher.setAppName("App"); + mockPublishers.add(mockPublisher); + } + + DataStore dataStore = mock(DataStore.class); + when(dataStore.getDatas(Mockito.eq(dataInfoId))).thenReturn(mockPublishers); + + QueryPublisherRequestHandler handler = new QueryPublisherRequestHandler(); + handler + .setExecutorManager(new ExecutorManager(TestUtils.newSessionConfig("testDc"))) + .setSessionDataStore(dataStore); + + Assert.assertNotNull(handler.getExecutor()); + Assert.assertEquals(handler.interest(), QueryPublisherRequest.class); + Assert.assertEquals(handler.getConnectNodeType(), Node.NodeType.CONSOLE); + Assert.assertEquals(handler.getType(), ChannelHandler.HandlerType.PROCESSER); + Assert.assertEquals(handler.getInvokeType(), ChannelHandler.InvokeType.SYNC); + Assert.assertFalse(((CommonResponse) handler.buildFailedResponse("msg")).isSuccess()); + + QueryPublisherRequest notExistReq = new QueryPublisherRequest("not-exist"); + GenericResponse> notExistResp = (GenericResponse) handler.doHandle(null, notExistReq); + Assert.assertTrue(notExistResp.isSuccess()); + List notExistPublishers = notExistResp.getData(); + Assert.assertTrue(CollectionUtils.isEmpty(notExistPublishers)); + + QueryPublisherRequest existReq = new QueryPublisherRequest(dataInfoId); + GenericResponse> existResp = (GenericResponse) handler.doHandle(null, existReq); + Assert.assertTrue(existResp.isSuccess()); + List existPublishers = existResp.getData(); + Assert.assertFalse(CollectionUtils.isEmpty(existPublishers)); + Assert.assertEquals(3, existPublishers.size()); + + for (int index = 0; index < existPublishers.size(); index++) { + SimplePublisher existPublisher = existPublishers.get(index); + + String clientId = existPublisher.getClientId(); + String sourceAddr = existPublisher.getSourceAddress(); + String appName = existPublisher.getAppName(); + + Assert.assertEquals("ClientId-" + index, clientId); + Assert.assertEquals("127.0.0." + index + ":1234", sourceAddr); + Assert.assertEquals("App", appName); + } + } + +} \ No newline at end of file