Skip to content

Commit 4d20bc8

Browse files
branch-3.1: [fix](external catalog) Distinguish between functions used to refresh cache and change catalog properties #56639 (#57399)
Cherry-picked from #56639 Co-authored-by: zy-kkk <[email protected]>
1 parent 168863f commit 4d20bc8

File tree

7 files changed

+183
-33
lines changed

7 files changed

+183
-33
lines changed

fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import org.apache.doris.datasource.ExternalDatabase;
2727
import org.apache.doris.datasource.ExternalObjectLog;
2828
import org.apache.doris.datasource.ExternalTable;
29-
import org.apache.doris.datasource.InternalCatalog;
3029
import org.apache.doris.datasource.hive.HMSExternalTable;
3130
import org.apache.doris.persist.OperationType;
3231

@@ -70,11 +69,11 @@ public void replayRefreshCatalog(CatalogLog log) {
7069
}
7170

7271
private void refreshCatalogInternal(CatalogIf catalog, boolean invalidCache) {
73-
String catalogName = catalog.getName();
74-
if (!catalogName.equals(InternalCatalog.INTERNAL_CATALOG_NAME)) {
75-
((ExternalCatalog) catalog).resetToUninitialized(invalidCache);
76-
LOG.info("refresh catalog {} with invalidCache {}", catalogName, invalidCache);
72+
if (catalog.isInternalCatalog()) {
73+
return;
7774
}
75+
((ExternalCatalog) catalog).onRefreshCache(invalidCache);
76+
LOG.info("refresh catalog {} with invalidCache {}", catalog.getName(), invalidCache);
7877
}
7978

8079
// Refresh database
@@ -114,7 +113,7 @@ public void replayRefreshDb(ExternalObjectLog log) {
114113
}
115114

116115
private void refreshDbInternal(ExternalDatabase db) {
117-
db.resetToUninitialized();
116+
db.resetMetaToUninitialized();
118117
LOG.info("refresh database {} in catalog {}", db.getFullName(), db.getCatalog().getName());
119118
}
120119

fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -360,7 +360,7 @@ private void buildMetaCache() {
360360
localDbName -> Optional.ofNullable(
361361
buildDbForInit(null, localDbName, Util.genIdByName(name, localDbName), logType,
362362
true)),
363-
(key, value, cause) -> value.ifPresent(v -> v.resetToUninitialized()));
363+
(key, value, cause) -> value.ifPresent(v -> v.resetMetaToUninitialized()));
364364
}
365365
}
366366

@@ -581,29 +581,36 @@ public synchronized void resetToUninitialized(boolean invalidCache) {
581581
this.cachedConf = null;
582582
}
583583
onClose();
584-
585-
refreshOnlyCatalogCache(invalidCache);
584+
onRefreshCache(invalidCache);
586585
}
587586

588-
// Only for hms event handling.
589-
public void onRefreshCache() {
590-
refreshOnlyCatalogCache(true);
587+
/**
588+
* Refresh both meta cache and catalog cache.
589+
*
590+
* @param invalidCache
591+
*/
592+
public void onRefreshCache(boolean invalidCache) {
593+
refreshMetaCacheOnly();
594+
if (invalidCache) {
595+
Env.getCurrentEnv().getExtMetaCacheMgr().invalidateCatalogCache(id);
596+
}
591597
}
592598

593-
private void refreshOnlyCatalogCache(boolean invalidCache) {
599+
/**
600+
* Refresh meta cache only (database level cache), without invalidating catalog level cache.
601+
* This method is safe to call within synchronized block.
602+
*/
603+
private void refreshMetaCacheOnly() {
594604
if (useMetaCache.isPresent()) {
595605
if (useMetaCache.get() && metaCache != null) {
596606
metaCache.invalidateAll();
597607
} else if (!useMetaCache.get()) {
598608
this.initialized = false;
599609
for (ExternalDatabase<? extends ExternalTable> db : idToDb.values()) {
600-
db.resetToUninitialized();
610+
db.resetMetaToUninitialized();
601611
}
602612
}
603613
}
604-
if (invalidCache) {
605-
Env.getCurrentEnv().getExtMetaCacheMgr().invalidateCatalogCache(id);
606-
}
607614
}
608615

609616
public final Optional<SchemaCacheValue> getSchema(SchemaCacheKey key) {
@@ -1492,8 +1499,6 @@ public void dropTag(TableIf dorisTable, DropTagInfo tagInfo) throws UserExceptio
14921499
public void resetMetaCacheNames() {
14931500
if (useMetaCache.isPresent() && useMetaCache.get() && metaCache != null) {
14941501
metaCache.resetNames();
1495-
} else {
1496-
resetToUninitialized(true);
14971502
}
14981503
}
14991504

fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalDatabase.java

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -135,19 +135,21 @@ public void setTableExtCatalog(ExternalCatalog extCatalog) {
135135
}
136136
}
137137

138-
public synchronized void resetToUninitialized() {
138+
public void resetMetaToUninitialized() {
139139
if (LOG.isDebugEnabled()) {
140140
LOG.debug("resetToUninitialized db name {}, id {}, isInitializing: {}, initialized: {}",
141141
this.name, this.id, isInitializing, initialized, new Exception());
142142
}
143-
this.initialized = false;
144-
this.lowerCaseToTableName = Maps.newConcurrentMap();
145-
if (extCatalog.getUseMetaCache().isPresent()) {
146-
if (extCatalog.getUseMetaCache().get() && metaCache != null) {
147-
metaCache.invalidateAll();
148-
} else if (!extCatalog.getUseMetaCache().get()) {
149-
for (T table : idToTbl.values()) {
150-
table.unsetObjectCreated();
143+
synchronized (this) {
144+
this.initialized = false;
145+
this.lowerCaseToTableName = Maps.newConcurrentMap();
146+
if (extCatalog.getUseMetaCache().isPresent()) {
147+
if (extCatalog.getUseMetaCache().get() && metaCache != null) {
148+
metaCache.invalidateAll();
149+
} else if (!extCatalog.getUseMetaCache().get()) {
150+
for (T table : idToTbl.values()) {
151+
table.unsetObjectCreated();
152+
}
151153
}
152154
}
153155
}
@@ -900,7 +902,7 @@ public void resetMetaCacheNames() {
900902
if (extCatalog.getUseMetaCache().isPresent() && extCatalog.getUseMetaCache().get() && metaCache != null) {
901903
metaCache.resetNames();
902904
} else {
903-
resetToUninitialized();
905+
resetMetaToUninitialized();
904906
}
905907
}
906908
}

fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventsProcessor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ private void realRun() {
139139
} catch (MetastoreNotificationFetchException e) {
140140
LOG.warn("Failed to fetch hms events on {}. msg: ", hmsExternalCatalog.getName(), e);
141141
} catch (Exception ex) {
142-
hmsExternalCatalog.onRefreshCache();
142+
hmsExternalCatalog.onRefreshCache(true);
143143
updateLastSyncedEventId(hmsExternalCatalog, -1);
144144
LOG.warn("Failed to process hive metastore [{}] events .",
145145
hmsExternalCatalog.getName(), ex);

fe/fe-core/src/test/java/org/apache/doris/datasource/RefreshCatalogTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -157,8 +157,8 @@ public void testRefreshCatalogLastUpdateTime() throws Exception {
157157
} catch (Exception e) {
158158
// Do nothing
159159
}
160-
// after refresh, the catalog will be set to uninitialized
161-
Assertions.assertFalse(((ExternalCatalog) test2).isInitialized());
160+
// after refresh, the catalog will NOT be set to uninitialized
161+
Assertions.assertTrue(((ExternalCatalog) test2).isInitialized());
162162
// call get table to trigger catalog initialization
163163
table = (TestExternalTable) test2.getDbNullable("db1").getTable("tbl11").get();
164164
Assertions.assertTrue(((ExternalCatalog) test2).isInitialized());

regression-test/suites/external_table_p0/hive/test_hive_use_meta_cache_false.groovy

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ suite("test_hive_use_meta_cache_false", "p0,external,hive,external_docker,extern
170170
// can not see
171171
order_qt_sql13 "show databases like '%${database_hive}%'";
172172
}
173-
test_use_meta_cache(false)
173+
// test_use_meta_cache(false)
174174
} finally {
175175
}
176176
}
Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
suite("test_iceberg_insert_refresh", "p0,external,iceberg,polaris,external_docker,external_docker_polaris") {
19+
String enabled = context.config.otherConfigs.get("enableIcebergTest")
20+
if (enabled != null && enabled.equalsIgnoreCase("true")) {
21+
String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
22+
23+
String polaris_port = context.config.otherConfigs.get("polaris_rest_uri_port")
24+
String minio_port = context.config.otherConfigs.get("polaris_minio_port")
25+
26+
String iceberg_catalog_name = "test_iceberg_insert_refresh"
27+
sql """drop catalog if exists ${iceberg_catalog_name}"""
28+
sql """create catalog if not exists ${iceberg_catalog_name} properties (
29+
'type'='iceberg',
30+
'warehouse' = 'doris_test',
31+
'iceberg.catalog.type'='rest',
32+
'iceberg.rest.uri' = 'http://${externalEnvIp}:${polaris_port}/api/catalog',
33+
'iceberg.rest.security.type' = 'oauth2',
34+
'iceberg.rest.oauth2.credential' = 'root:secret123',
35+
'iceberg.rest.oauth2.server-uri' = 'http://${externalEnvIp}:${polaris_port}/api/catalog/v1/oauth/tokens',
36+
'iceberg.rest.oauth2.scope' = 'PRINCIPAL_ROLE:ALL',
37+
's3.access_key' = 'admin',
38+
's3.secret_key' = 'password',
39+
's3.endpoint' = 'http://${externalEnvIp}:${minio_port}',
40+
's3.region' = 'us-east-1'
41+
)"""
42+
43+
sql """switch ${iceberg_catalog_name}"""
44+
sql """create database if not exists db_iceberg_insert_refresh"""
45+
sql """use db_iceberg_insert_refresh"""
46+
sql """drop table if exists taxis"""
47+
sql """CREATE TABLE taxis
48+
(
49+
vendor_id BIGINT,
50+
trip_id BIGINT,
51+
trip_distance FLOAT,
52+
fare_amount DOUBLE,
53+
store_and_fwd_flag STRING,
54+
ts DATETIME
55+
)
56+
PARTITION BY LIST (vendor_id, DAY(ts)) ()
57+
PROPERTIES (
58+
"compression-codec" = "zstd",
59+
"write-format" = "parquet"
60+
);"""
61+
String insert_sql = """INSERT OVERWRITE TABLE ${iceberg_catalog_name}.db_iceberg_insert_refresh.taxis
62+
VALUES
63+
(1, 1000371, 1.8, 15.32, 'N', '2024-01-01 9:15:23'),
64+
(2, 1000372, 2.5, 22.15, 'N', '2024-01-02 12:10:11'),
65+
(2, 1000373, 0.9, 9.01, 'N', '2024-01-01 3:25:15'),
66+
(1, 1000374, 8.4, 42.13, 'Y', '2024-01-03 7:12:33');"""
67+
68+
String refresh_sql = """REFRESH CATALOG ${iceberg_catalog_name};"""
69+
70+
// Simple concurrent test: 10 inserts + refresh, each insert must complete within 1 minute
71+
72+
def insertCount = 10
73+
def insertTimeoutMs = 60000L // 1 minute per insert
74+
75+
def insertCompleted = false
76+
def insertException = null
77+
78+
logger.info("Starting concurrent insert and refresh test")
79+
80+
// Insert task: run 10 inserts, fail if any takes >1min
81+
def insertTask = {
82+
try {
83+
for (int i = 1; i <= insertCount; i++) {
84+
def start = System.currentTimeMillis()
85+
sql insert_sql
86+
def duration = System.currentTimeMillis() - start
87+
88+
if (duration > insertTimeoutMs) {
89+
throw new RuntimeException("Insert ${i} took ${duration}ms > ${insertTimeoutMs}ms")
90+
}
91+
92+
logger.info("Insert ${i} completed in ${duration}ms")
93+
Thread.sleep(100)
94+
}
95+
insertCompleted = true
96+
} catch (Exception e) {
97+
insertException = e
98+
}
99+
}
100+
101+
// Refresh task: keep refreshing while inserts are running
102+
def refreshTask = {
103+
while (!insertCompleted && insertException == null) {
104+
try {
105+
sql refresh_sql
106+
Thread.sleep(200)
107+
} catch (Exception e) {
108+
logger.warn("Refresh failed: ${e.message}")
109+
Thread.sleep(200)
110+
}
111+
}
112+
}
113+
114+
// Start both tasks
115+
def insertThread = Thread.start(insertTask)
116+
def refreshThread = Thread.start(refreshTask)
117+
118+
// Wait for insert thread with 1 minute total timeout
119+
insertThread.join(60000) // 1 minute total
120+
121+
// Force stop both threads if still running
122+
if (insertThread.isAlive()) {
123+
insertThread.interrupt()
124+
}
125+
refreshThread.interrupt()
126+
127+
// Check results
128+
if (insertException != null) {
129+
throw new RuntimeException("Test failed: ${insertException.message}")
130+
}
131+
132+
if (!insertCompleted) {
133+
throw new RuntimeException("Test failed: Inserts did not complete within 1 minute")
134+
}
135+
136+
logger.info("✅ Test PASSED - All ${insertCount} inserts completed within timeout")
137+
138+
// Cleanup
139+
sql """drop table if exists taxis"""
140+
sql """drop database if exists db_iceberg_insert_refresh"""
141+
sql """drop catalog if exists ${iceberg_catalog_name}"""
142+
143+
}
144+
}

0 commit comments

Comments
 (0)