Skip to content

Commit

Permalink
Support cleaning Publisher data based on DataID & ignore Publisher re…
Browse files Browse the repository at this point in the history
…quests from clients
  • Loading branch information
hui-cha committed Dec 16, 2024
1 parent 632e0d6 commit 6ca5ae6
Show file tree
Hide file tree
Showing 7 changed files with 957 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ public class ValueConstants {
DataInfo.toDataInfoId(
"session.blacklist.data", SESSION_PROVIDE_DATA_INSTANCE_ID, SESSION_PROVIDE_DATA_GROUP);

public static final String SESSION_DATAID_BLACKLIST_DATA_ID =
DataInfo.toDataInfoId(
"session.dataid.blacklist", SESSION_PROVIDE_DATA_INSTANCE_ID, SESSION_PROVIDE_DATA_GROUP);

public static final String CLIENT_OFF_ADDRESS_DATA_ID =
DataInfo.toDataInfoId(
"registry.client.off.list", SESSION_PROVIDE_DATA_INSTANCE_ID, SESSION_PROVIDE_DATA_GROUP);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
package com.alipay.sofa.registry.server.meta.resource;

import com.alipay.sofa.registry.common.model.Tuple;
import com.alipay.sofa.registry.common.model.console.PersistenceData;
import com.alipay.sofa.registry.common.model.console.PersistenceDataBuilder;
import com.alipay.sofa.registry.common.model.constants.ValueConstants;
import com.alipay.sofa.registry.common.model.metaserver.ProvideDataChangeEvent;
import com.alipay.sofa.registry.common.model.store.DataInfo;
import com.alipay.sofa.registry.core.model.Result;
import com.alipay.sofa.registry.jdbc.constant.TableEnum;
import com.alipay.sofa.registry.log.Logger;
import com.alipay.sofa.registry.log.LoggerFactory;
import com.alipay.sofa.registry.server.meta.provide.data.ProvideDataNotifier;
import com.alipay.sofa.registry.server.meta.provide.data.ProvideDataService;
import com.alipay.sofa.registry.store.api.DBResponse;
import com.alipay.sofa.registry.store.api.OperationStatus;
import com.alipay.sofa.registry.store.api.config.DefaultCommonConfig;
import com.alipay.sofa.registry.util.JsonUtils;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;

import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import java.util.HashSet;
import java.util.Set;

/**
* @author huicha
* @date 2024/12/13
*/
@Path("datainfoid/blacklist")
public class DataInfoIDBlacklistResource {
private static final Logger LOGGER = LoggerFactory.getLogger(DataInfoIDBlacklistResource.class);

@Autowired
private ProvideDataService provideDataService;

@Autowired
private ProvideDataNotifier provideDataNotifier;

@Autowired
private DefaultCommonConfig defaultCommonConfig;

@POST
@Path("add")
@Produces(MediaType.APPLICATION_JSON)
public Result addBlackList(String dataCenter, String dataId, String group, String instanceId) {
try {
return process(dataCenter, dataId, group, instanceId, Operation.ADD);
} catch (Throwable throwable) {
LOGGER.error("Save dataid black list exception", throwable);
return Result.failed("Save dataid black list exception");
}
}

@POST
@Path("delete")
@Produces(MediaType.APPLICATION_JSON)
public Result deleteBlackList(String dataCenter, String dataId, String group, String instanceId) {
try {
return process(dataCenter, dataId, group, instanceId, Operation.DELETE);
} catch (Throwable throwable) {
LOGGER.error("Delete dataid black list exception", throwable);
return Result.failed("Delete dataid black list exception");
}
}

private Result process(String dataCenter, String dataId, String group, String instanceId, Operation operation) {
// 1. 参数检查
// 1.1. 检查 DataCenter 是否就是当前 Meta 的所属 DataCenter
String clusterId = defaultCommonConfig.getClusterId(TableEnum.PROVIDE_DATA.getTableName(), ValueConstants.SESSION_DATAID_BLACKLIST_DATA_ID);
if (!StringUtils.equals(dataCenter, clusterId)) {
// 给定的机房不是当前机房,那么拒绝添加黑名单,直接返回
return Result.failed("Invalid data center");
}

// 1.2. 检查要处理的 DataId 以及 Group 是否符合规则
DataInfo dataInfo = new DataInfo(instanceId, dataId, group);
Tuple<Boolean, String> checkResult = this.checkDataInfoId(dataInfo);
if (!checkResult.o1) {
// 不符合规则,那么拒绝添加黑名单,直接返回
return Result.failed("Invalid dataid: " + checkResult.o2);
}

// 2. 查询出当前黑名单列表
DBResponse<PersistenceData> queryResponse =
this.provideDataService.queryProvideData(ValueConstants.SESSION_DATAID_BLACKLIST_DATA_ID);

// 3. 根据操作类型,添加 DataID 到列表中,或者删除列表中的 DataID,并保存
Tuple<PersistenceData, Long> tuple = this.createNewPersistenceData(queryResponse, dataInfo, operation);
PersistenceData newPersistenceData = tuple.o1;
Long oldVersion = tuple.o2;
if (!this.provideDataService.saveProvideData(newPersistenceData, oldVersion)) {
// 保存失败
return Result.failed("Save new black list fail");
}

// 4. 保存成功则通知 Session 黑名单变化了
ProvideDataChangeEvent provideDataChangeEvent =
new ProvideDataChangeEvent(ValueConstants.SESSION_DATAID_BLACKLIST_DATA_ID,
newPersistenceData.getVersion());
this.provideDataNotifier.notifyProvideDataChange(provideDataChangeEvent);

return Result.success();
}

private Tuple<PersistenceData, Long> createNewPersistenceData(DBResponse<PersistenceData> queryResponse, DataInfo dataInfo, Operation operation) {
OperationStatus operationStatus = queryResponse.getOperationStatus();
if (OperationStatus.SUCCESS.equals(operationStatus)) {
// 读取旧数据成功,其格式为 Json 字符串,解析出来
PersistenceData oldPersistenceData = queryResponse.getEntity();
String oldBlackListJson = oldPersistenceData.getData();
Set<String> oldDataIdBlackList = JsonUtils.read(oldBlackListJson, new TypeReference<Set<String>>() {});

// 添加或删除新的需要拉黑的数据
if (Operation.ADD.equals(operation)) {
oldDataIdBlackList.add(dataInfo.getDataInfoId());
} else {
oldDataIdBlackList.remove(dataInfo.getDataInfoId());
}

// 创建新数据,并返回新数据以及旧数据的版本号
PersistenceData newPersistenceData = PersistenceDataBuilder
.createPersistenceData(ValueConstants.SESSION_DATAID_BLACKLIST_DATA_ID,
JsonUtils.writeValueAsString(oldDataIdBlackList));
return new Tuple<>(newPersistenceData, oldPersistenceData.getVersion());
} else {
// 没有旧数据旧直接创建新的,旧数据的版本号设置为 0
Set<String> dataIdBlackList = new HashSet<>();
if (Operation.ADD.equals(operation)) {
dataIdBlackList.add(dataInfo.getDataInfoId());
}
PersistenceData newPersistenceData = PersistenceDataBuilder
.createPersistenceData(ValueConstants.SESSION_DATAID_BLACKLIST_DATA_ID,
JsonUtils.writeValueAsString(dataIdBlackList));
return new Tuple<>(newPersistenceData, 0L);
}
}

protected Tuple<Boolean, String> checkDataInfoId(DataInfo dataInfo) {
return new Tuple<>(true, "");
}

@VisibleForTesting
public DataInfoIDBlacklistResource setProvideDataService(ProvideDataService provideDataService) {
this.provideDataService = provideDataService;
return this;
}

@VisibleForTesting
public DataInfoIDBlacklistResource setProvideDataNotifier(ProvideDataNotifier provideDataNotifier) {
this.provideDataNotifier = provideDataNotifier;
return this;
}

@VisibleForTesting
public DataInfoIDBlacklistResource setDefaultCommonConfig(DefaultCommonConfig defaultCommonConfig) {
this.defaultCommonConfig = defaultCommonConfig;
return this;
}
}

enum Operation {
ADD,
DELETE
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package com.alipay.sofa.registry.server.meta.resource;

import com.alipay.sofa.registry.common.model.Node.NodeType;
import com.alipay.sofa.registry.common.model.console.PersistenceData;
import com.alipay.sofa.registry.common.model.constants.ValueConstants;
import com.alipay.sofa.registry.core.model.Result;
import com.alipay.sofa.registry.server.meta.AbstractMetaServerTestBase;
import com.alipay.sofa.registry.server.meta.provide.data.ProvideDataNotifier;
import com.alipay.sofa.registry.server.meta.provide.data.ProvideDataService;
import com.alipay.sofa.registry.store.api.DBResponse;
import com.alipay.sofa.registry.store.api.OperationStatus;
import com.alipay.sofa.registry.store.api.config.DefaultCommonConfig;
import com.alipay.sofa.registry.util.JsonUtils;
import com.fasterxml.jackson.core.type.TypeReference;
import org.junit.Assert;
import org.junit.Test;

import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;

/**
* @author huicha
* @date 2024/12/13
*/
public class DataInfoIDBlacklistResourceTest extends AbstractMetaServerTestBase {

private ProvideDataService createProvideDataService() {
return spy(new InMemoryProvideDataRepo());
}

private DataInfoIDBlacklistResource createDataIDBlacklistResource(ProvideDataService provideDataService) {
ProvideDataNotifier provideDataNotifier = mock(ProvideDataNotifier.class);
DefaultCommonConfig defaultCommonConfig = mock(DefaultCommonConfig.class);
when(defaultCommonConfig.getClusterId(anyString(), anyString())).thenReturn("DEFAULT_DATACENTER");

return new DataInfoIDBlacklistResource()
.setProvideDataNotifier(provideDataNotifier)
.setProvideDataService(provideDataService)
.setDefaultCommonConfig(defaultCommonConfig);
}

private DataInfoIDBlacklistResource createDataIDBlacklistResource(ProvideDataService provideDataService,
ProvideDataNotifier provideDataNotifier) {
DefaultCommonConfig defaultCommonConfig = mock(DefaultCommonConfig.class);
when(defaultCommonConfig.getClusterId(anyString(), anyString())).thenReturn("DEFAULT_DATACENTER");

return new DataInfoIDBlacklistResource()
.setProvideDataNotifier(provideDataNotifier)
.setProvideDataService(provideDataService)
.setDefaultCommonConfig(defaultCommonConfig);
}

@Test
public void testAddAndDelete() {
ProvideDataService provideDataService = createProvideDataService();
DataInfoIDBlacklistResource resource = this.createDataIDBlacklistResource(provideDataService);

String dataCenter = "DEFAULT_DATACENTER";
String dataIdOne = "dataid.black.list.test";
String group = "dataid-black-list-test-group";
String instanceId = "DEFAULT_INSTANCE_ID";

// 添加了两个数据
Result resultOne = resource.addBlackList(dataCenter, dataIdOne, group, instanceId);
Assert.assertTrue(resultOne.isSuccess());

String dataIdTwo = "dataid.black.list.test2";
Result resultTwo = resource.addBlackList(dataCenter, dataIdTwo, group, instanceId);
Assert.assertTrue(resultTwo.isSuccess());

// 因此这里的查询结果也应该是两条
DBResponse<PersistenceData> queryResult = provideDataService.queryProvideData(ValueConstants.SESSION_DATAID_BLACKLIST_DATA_ID);
Assert.assertEquals(OperationStatus.SUCCESS, queryResult.getOperationStatus());
Assert.assertNotNull(queryResult.getEntity());
PersistenceData persistenceData = queryResult.getEntity();
String dataJson = persistenceData.getData();
Set<String> data = JsonUtils.read(dataJson, new TypeReference<Set<String>>() {});
Assert.assertEquals(2, data.size());
Assert.assertTrue(data.contains(String.format("%s#@#%s#@#%s", dataIdOne, instanceId, group)));
Assert.assertTrue(data.contains(String.format("%s#@#%s#@#%s", dataIdTwo, instanceId, group)));

// 删除了第一条数据以及一条不存在的数据
Result deleteResultOne = resource.deleteBlackList(dataCenter, dataIdOne, group, instanceId);
Assert.assertTrue(deleteResultOne.isSuccess());

String notExistDataId = "not.exist";
Result deleteResultTwo = resource.deleteBlackList(dataCenter, notExistDataId, group, instanceId);
Assert.assertTrue(deleteResultTwo.isSuccess());

// 因此这里的查询结果应该是只有一条数据,且是第二条数据
DBResponse<PersistenceData> queryResultTwo = provideDataService.queryProvideData(ValueConstants.SESSION_DATAID_BLACKLIST_DATA_ID);
Assert.assertEquals(OperationStatus.SUCCESS, queryResultTwo.getOperationStatus());
Assert.assertNotNull(queryResultTwo.getEntity());
PersistenceData persistenceDataTwo = queryResultTwo.getEntity();
String dataJsonTwo = persistenceDataTwo.getData();
Set<String> dataTwo = JsonUtils.read(dataJsonTwo, new TypeReference<Set<String>>() {});
Assert.assertEquals(1, dataTwo.size());
Assert.assertTrue(data.contains(String.format("%s#@#%s#@#%s", dataIdTwo, instanceId, group)));
}

@Test
public void testNotify() {
String dataCenter = "DEFAULT_DATACENTER";
String dataId = "dataid.black.list.test";
String group = "dataid-black-list-test-group";
String instanceId = "DEFAULT_INSTANCE_ID";

AtomicInteger counter = new AtomicInteger(0);

ProvideDataService provideDataService = createProvideDataService();
DataInfoIDBlacklistResource resource = this.createDataIDBlacklistResource(provideDataService, event -> {
// 这个数据是提供给 Session 消费的,因此消费的节点类型有且只有 Session
Set<NodeType> nodeTypes = event.getNodeTypes();
Assert.assertEquals(1, nodeTypes.size());
Assert.assertTrue(nodeTypes.contains(NodeType.SESSION));

// 检查 DataInfoId 是否是预期的
String dataInfoId = event.getDataInfoId();
Assert.assertEquals(ValueConstants.SESSION_DATAID_BLACKLIST_DATA_ID, dataInfoId);

// 增加计数
counter.addAndGet(1);
});

Result result = resource.addBlackList(dataCenter, dataId, group, instanceId);
Assert.assertTrue(result.isSuccess());
Assert.assertEquals(1, counter.get());
}
}
Loading

0 comments on commit 6ca5ae6

Please sign in to comment.