Skip to content

Commit bf222b7

Browse files
authored
[Subtask]: Modify DefaultTableService to be compatible with master-slave mode. (#3927)
* [Subtask]: Use a new configuration item to control whether master & slave mode is enabled. #3845 * [Subtask]: add AmsAssignService to implement balanced bucket allocation in master-slave mode. #3921 * [Subtask]: add AmsAssignService to implement balanced bucket allocation in master-slave mode. #3921 * [Subtask]: Modify DefaultTableService to be compatible with master-slave mode #3923 * [Subtask]: Modify DefaultTableService to be compatible with master-slave mode #3923 * [Subtask]: Modify DefaultTableService to be compatible with master-slave mode #3923 * [Subtask]: Fix unit test failure issue #3923 * [Subtask]: Modify DefaultTableService to be compatible with master-slave mode #3923 * [Subtask]: Modify DefaultTableService to be compatible with master-slave mode #3923 * [Subtask]: Modify DefaultTableService to be compatible with master-slave mode #3923 * [Subtask]: Optimized based on CR feedback. #3923 * [Subtask]: Optimized based on CR feedback. #3923 * [Subtask]: Optimized based on CR feedback. #3923 * [Subtask]: Fixed a legacy bug that could cause unit tests to fail during compilation. #3923 * [Subtask]: Optimized based on CR feedback. #3923
1 parent a40a8c1 commit bf222b7

File tree

18 files changed

+581
-25
lines changed

18 files changed

+581
-25
lines changed

amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -307,6 +307,13 @@ public class AmoroManagementConf {
307307
.withDescription(
308308
"Interval for bucket assignment service to detect node changes and redistribute bucket IDs.");
309309

310+
public static final ConfigOption<Duration> HA_BUCKET_TABLE_SYNC_INTERVAL =
311+
ConfigOptions.key("ha.bucket-table-sync.interval")
312+
.durationType()
313+
.defaultValue(Duration.ofSeconds(60))
314+
.withDescription(
315+
"Interval for syncing tables assigned to bucket IDs in master-slave mode. Each node periodically loads tables from database based on its assigned bucket IDs.");
316+
310317
public static final ConfigOption<Integer> TABLE_SERVICE_THRIFT_BIND_PORT =
311318
ConfigOptions.key("thrift-server.table-service.bind-port")
312319
.intType()

amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -250,12 +250,17 @@ public void startOptimizingService() throws Exception {
250250

251251
DefaultTableRuntimeFactory defaultRuntimeFactory = new DefaultTableRuntimeFactory();
252252
defaultRuntimeFactory.initialize(processFactories);
253-
// In master-slave mode, create AmsAssignService for bucket assignment
253+
254+
BucketAssignStore bucketAssignStore = null;
254255
if (IS_MASTER_SLAVE_MODE && haContainer != null) {
256+
bucketAssignStore = BucketAssignStoreFactory.create(haContainer, serviceConfig);
257+
}
258+
259+
// In master-slave mode, create AmsAssignService for bucket assignment (shares BucketAssignStore
260+
// with DefaultTableService).
261+
if (IS_MASTER_SLAVE_MODE && haContainer != null && bucketAssignStore != null) {
255262
try {
256-
// Create and start AmsAssignService for bucket assignment
257-
// The factory will handle different HA types (ZK, database, etc.)
258-
amsAssignService = new AmsAssignService(haContainer, serviceConfig);
263+
amsAssignService = new AmsAssignService(haContainer, serviceConfig, bucketAssignStore);
259264
amsAssignService.start();
260265
LOG.info("AmsAssignService started for master-slave mode");
261266
} catch (UnsupportedOperationException e) {
@@ -267,7 +272,9 @@ public void startOptimizingService() throws Exception {
267272

268273
List<ActionCoordinator> actionCoordinators = defaultRuntimeFactory.supportedCoordinators();
269274

270-
tableService = new DefaultTableService(serviceConfig, catalogManager, defaultRuntimeFactory);
275+
tableService =
276+
new DefaultTableService(
277+
serviceConfig, catalogManager, defaultRuntimeFactory, haContainer, bucketAssignStore);
271278
processService = new ProcessService(tableService, actionCoordinators, executeEngineManager);
272279
optimizingService =
273280
new DefaultOptimizingService(serviceConfig, catalogManager, optimizerManager, tableService);

amoro-ams/src/main/java/org/apache/amoro/server/AmsAssignService.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,18 @@ boolean isRunning() {
6767
}
6868

6969
public AmsAssignService(HighAvailabilityContainer haContainer, Configurations serviceConfig) {
70+
this(haContainer, serviceConfig, null);
71+
}
72+
73+
/**
74+
* @param assignStore if non-null, used as the bucket assignment store; otherwise one is created
75+
* via {@link BucketAssignStoreFactory} (same instance can be shared with {@code
76+
* DefaultTableService}).
77+
*/
78+
public AmsAssignService(
79+
HighAvailabilityContainer haContainer,
80+
Configurations serviceConfig,
81+
BucketAssignStore assignStore) {
7082
this.haContainer = haContainer;
7183
this.serviceConfig = serviceConfig;
7284
this.bucketIdTotalCount =
@@ -75,7 +87,10 @@ public AmsAssignService(HighAvailabilityContainer haContainer, Configurations se
7587
serviceConfig.get(AmoroManagementConf.HA_NODE_OFFLINE_TIMEOUT).toMillis();
7688
this.assignIntervalSeconds =
7789
serviceConfig.get(AmoroManagementConf.HA_ASSIGN_INTERVAL).getSeconds();
78-
this.assignStore = BucketAssignStoreFactory.create(haContainer, serviceConfig);
90+
this.assignStore =
91+
assignStore != null
92+
? assignStore
93+
: BucketAssignStoreFactory.create(haContainer, serviceConfig);
7994
}
8095

8196
/**

amoro-ams/src/main/java/org/apache/amoro/server/ha/DataBaseHighAvailabilityContainer.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,11 @@ public boolean hasLeadership() {
175175
return isLeader.get();
176176
}
177177

178+
@Override
179+
public AmsServerInfo getTableServiceServerInfo() {
180+
return tableServiceServerInfo;
181+
}
182+
178183
/** Closes the heartbeat executor safely. */
179184
@Override
180185
public void close() {

amoro-ams/src/main/java/org/apache/amoro/server/ha/HighAvailabilityContainer.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,4 +68,11 @@ public interface HighAvailabilityContainer {
6868
* @return true if the current AMS node is the primary node, false otherwise
6969
*/
7070
boolean hasLeadership();
71+
72+
/**
73+
* Get current AMS node information.
74+
*
75+
* @return {@link AmsServerInfo}
76+
*/
77+
AmsServerInfo getTableServiceServerInfo();
7178
}

amoro-ams/src/main/java/org/apache/amoro/server/ha/NoopHighAvailabilityContainer.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,4 +61,9 @@ public List<AmsServerInfo> getAliveNodes() {
6161
public boolean hasLeadership() {
6262
return false;
6363
}
64+
65+
@Override
66+
public AmsServerInfo getTableServiceServerInfo() {
67+
return null;
68+
}
6469
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.amoro.server.persistence;
20+
21+
/** Simple class to hold bucketId and its table count. */
22+
public class BucketIdCount {
23+
private String bucketId;
24+
private Integer count;
25+
26+
public String getBucketId() {
27+
return bucketId;
28+
}
29+
30+
public void setBucketId(String bucketId) {
31+
this.bucketId = bucketId;
32+
}
33+
34+
public Integer getCount() {
35+
return count;
36+
}
37+
38+
public void setCount(Integer count) {
39+
this.count = count;
40+
}
41+
}

amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/TableRuntimeMapper.java

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package org.apache.amoro.server.persistence.mapper;
2020

21+
import org.apache.amoro.server.persistence.BucketIdCount;
2122
import org.apache.amoro.server.persistence.TableRuntimeMeta;
2223
import org.apache.amoro.server.persistence.TableRuntimeState;
2324
import org.apache.ibatis.annotations.Delete;
@@ -59,6 +60,17 @@ public interface TableRuntimeMapper {
5960
+ " WHERE table_id = #{tableId}")
6061
int updateRuntime(TableRuntimeMeta meta);
6162

63+
/**
64+
* Sets bucket_id only when it is still null. Avoids overwriting status/config/summary with stale
65+
* snapshots from a prior read.
66+
*/
67+
@Update(
68+
"UPDATE "
69+
+ TABLE_NAME
70+
+ " SET bucket_id = #{bucketId, jdbcType=VARCHAR} "
71+
+ "WHERE table_id = #{tableId} AND bucket_id IS NULL")
72+
int updateBucketIdIfNull(@Param("tableId") Long tableId, @Param("bucketId") String bucketId);
73+
6274
/* ---------- delete ---------- */
6375
@Delete("DELETE FROM " + TABLE_NAME + " WHERE table_id = #{tableId}")
6476
int deleteRuntime(@Param("tableId") Long tableId);
@@ -102,6 +114,29 @@ public interface TableRuntimeMapper {
102114
@ResultMap("tableRuntimeMeta")
103115
List<TableRuntimeMeta> selectAllRuntimes();
104116

117+
@Select(
118+
"<script>"
119+
+ "SELECT "
120+
+ SELECT_COLS
121+
+ "FROM "
122+
+ TABLE_NAME
123+
+ " WHERE bucket_id IN "
124+
+ "<foreach item='item' collection='bucketIds' open='(' separator=',' close=')'>"
125+
+ "#{item}"
126+
+ "</foreach>"
127+
+ "<if test='includeNullBucketId'> OR bucket_id IS NULL </if>"
128+
+ "</script>")
129+
/**
130+
* Select runtimes by bucket ids.
131+
*
132+
* @param includeNullBucketId false = only rows with bucket_id in list (master-slave); true = also
133+
* include bucket_id IS NULL (e.g. for non-master-slave compatibility)
134+
*/
135+
@ResultMap("tableRuntimeMeta")
136+
List<TableRuntimeMeta> selectRuntimesByBucketIds(
137+
@Param("bucketIds") List<String> bucketIds,
138+
@Param("includeNullBucketId") boolean includeNullBucketId);
139+
105140
@Select(
106141
"<script>"
107142
+ "<bind name=\"isMySQL\" value=\"_databaseId == 'mysql'\" />"
@@ -180,4 +215,19 @@ void saveState(
180215

181216
@Delete("DELETE FROM " + STATE_TABLE_NAME + " WHERE table_id = #{tableId}")
182217
void removeAllTableStates(@Param("tableId") long tableId);
218+
219+
/**
220+
* Count tables per bucketId. Returns a map where key is bucketId and value is the count of tables
221+
* for that bucketId. Only counts non-null and non-empty bucketIds.
222+
*/
223+
@Select(
224+
"SELECT bucket_id, COUNT(*) as table_count FROM "
225+
+ TABLE_NAME
226+
+ " WHERE bucket_id IS NOT NULL AND bucket_id != '' "
227+
+ "GROUP BY bucket_id")
228+
@Results({
229+
@Result(column = "bucket_id", property = "bucketId"),
230+
@Result(column = "table_count", property = "count")
231+
})
232+
List<BucketIdCount> countTablesByBucketId();
183233
}

0 commit comments

Comments
 (0)