Skip to content

Commit a89d5c1

Browse files
committed
save
1 parent 1baeaef commit a89d5c1

File tree

7 files changed

+137
-2
lines changed

7 files changed

+137
-2
lines changed

be/src/agent/task_worker_pool.cpp

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@ std::mutex s_task_signatures_mtx;
111111
std::unordered_map<TTaskType::type, std::unordered_set<int64_t>> s_task_signatures;
112112

113113
std::atomic_ulong s_report_version(time(nullptr) * 100000);
114+
std::atomic<int64_t> s_tablet_report_failure_start_time(0);
114115

115116
void increase_report_version() {
116117
s_report_version.fetch_add(1, std::memory_order_relaxed);
@@ -1181,13 +1182,21 @@ void report_tablet_callback(StorageEngine& engine, const ClusterInfo* cluster_in
11811182
}
11821183
}
11831184

1184-
if (report_version < s_report_version) {
1185+
if (report_version < s_report_version ||
1186+
UNLIKELY(config::enable_debug_points &&
1187+
DebugPoints::instance()->is_enable(
1188+
"WorkPoolReportTablet.report_tablet_callback.skip"))) {
11851189
// TODO llj This can only reduce the possibility for report error, but can't avoid it.
11861190
// If FE create a tablet in FE meta and send CREATE task to this BE, the tablet may not be included in this
11871191
// report, and the report version has a small probability that it has not been updated in time. When FE
11881192
// receives this report, it is possible to delete the new tablet.
11891193
LOG(WARNING) << "report version " << report_version << " change to " << s_report_version;
11901194
DorisMetrics::instance()->report_all_tablets_requests_skip->increment(1);
1195+
int64_t expected = 0;
1196+
int64_t current_time = time(nullptr);
1197+
s_tablet_report_failure_start_time.compare_exchange_strong(expected, current_time);
1198+
DorisMetrics::instance()->tablet_report_continuous_failure_duration_s->set_value(
1199+
current_time - s_tablet_report_failure_start_time);
11911200
return;
11921201
}
11931202

@@ -1227,6 +1236,14 @@ void report_tablet_callback(StorageEngine& engine, const ClusterInfo* cluster_in
12271236
report_tablet_total << 1;
12281237
if (!succ) [[unlikely]] {
12291238
report_tablet_failed << 1;
1239+
int64_t expected = 0;
1240+
int64_t current_time = time(nullptr);
1241+
s_tablet_report_failure_start_time.compare_exchange_strong(expected, current_time);
1242+
DorisMetrics::instance()->tablet_report_continuous_failure_duration_s->set_value(
1243+
current_time - s_tablet_report_failure_start_time);
1244+
} else {
1245+
s_tablet_report_failure_start_time.store(0);
1246+
DorisMetrics::instance()->tablet_report_continuous_failure_duration_s->set_value(0);
12301247
}
12311248
}
12321249

@@ -1254,9 +1271,17 @@ void report_tablet_callback(CloudStorageEngine& engine, const ClusterInfo* clust
12541271
}
12551272
}
12561273

1257-
if (report_version < s_report_version) {
1274+
if (report_version < s_report_version ||
1275+
UNLIKELY(config::enable_debug_points &&
1276+
DebugPoints::instance()->is_enable(
1277+
"WorkPoolCloudReportTablet.report_tablet_callback.skip"))) {
12581278
LOG(WARNING) << "report version " << report_version << " change to " << s_report_version;
12591279
DorisMetrics::instance()->report_all_tablets_requests_skip->increment(1);
1280+
int64_t expected = 0;
1281+
int64_t current_time = time(nullptr);
1282+
s_tablet_report_failure_start_time.compare_exchange_strong(expected, current_time);
1283+
DorisMetrics::instance()->tablet_report_continuous_failure_duration_s->set_value(
1284+
current_time - s_tablet_report_failure_start_time);
12601285
return;
12611286
}
12621287

@@ -1267,6 +1292,14 @@ void report_tablet_callback(CloudStorageEngine& engine, const ClusterInfo* clust
12671292
report_tablet_total << 1;
12681293
if (!succ) [[unlikely]] {
12691294
report_tablet_failed << 1;
1295+
int64_t expected = 0;
1296+
int64_t current_time = time(nullptr);
1297+
s_tablet_report_failure_start_time.compare_exchange_strong(expected, current_time);
1298+
DorisMetrics::instance()->tablet_report_continuous_failure_duration_s->set_value(
1299+
current_time - s_tablet_report_failure_start_time);
1300+
} else {
1301+
s_tablet_report_failure_start_time.store(0);
1302+
DorisMetrics::instance()->tablet_report_continuous_failure_duration_s->set_value(0);
12701303
}
12711304
}
12721305

be/src/agent/utils.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,10 @@ Status MasterServerClient::finish_task(const TFinishTaskRequest& request, TMaste
119119
}
120120

121121
Status MasterServerClient::report(const TReportRequest& request, TMasterResult* result) {
122+
#ifdef BE_TEST
123+
result->status.__set_status_code(TStatusCode::OK);
124+
return Status::OK();
125+
#else
122126
Status client_status;
123127
FrontendServiceConnection client(_client_cache.get(), _cluster_info->master_fe_addr,
124128
config::thrift_rpc_timeout_ms, &client_status);
@@ -173,6 +177,7 @@ Status MasterServerClient::report(const TReportRequest& request, TMasterResult*
173177
}
174178

175179
return Status::OK();
180+
#endif
176181
}
177182

178183
Status MasterServerClient::confirm_unused_remote_files(

be/src/util/doris_metrics.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,9 @@ DEFINE_GAUGE_CORE_METRIC_PROTOTYPE_2ARG(runtime_filter_consumer_timeout_num, Met
250250
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(get_remote_tablet_slow_time_ms, MetricUnit::MILLISECONDS);
251251
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(get_remote_tablet_slow_cnt, MetricUnit::NOUNIT);
252252

253+
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(tablet_report_continuous_failure_duration_s,
254+
MetricUnit::SECONDS);
255+
253256
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(ann_index_load_costs_ms, MetricUnit::MILLISECONDS);
254257
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(ann_index_load_cnt, MetricUnit::NOUNIT);
255258
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(ann_index_search_costs_ms, MetricUnit::MILLISECONDS);
@@ -419,6 +422,8 @@ DorisMetrics::DorisMetrics() : _metric_registry(_s_registry_name) {
419422
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, get_remote_tablet_slow_time_ms);
420423
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, get_remote_tablet_slow_cnt);
421424

425+
INT_GAUGE_METRIC_REGISTER(_server_metric_entity, tablet_report_continuous_failure_duration_s);
426+
422427
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, pipeline_task_queue_size);
423428

424429
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, ann_index_load_costs_ms);

be/src/util/doris_metrics.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,9 @@ class DorisMetrics {
155155
IntGauge* tablet_cumulative_max_compaction_score = nullptr;
156156
IntGauge* tablet_base_max_compaction_score = nullptr;
157157

158+
// tablet report
159+
IntGauge* tablet_report_continuous_failure_duration_s = nullptr;
160+
158161
IntGauge* all_rowsets_num = nullptr;
159162
IntGauge* all_segments_num = nullptr;
160163

be/test/agent/task_worker_pool_test.cpp

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,4 +132,34 @@ TEST(TaskWorkerPoolTest, ReportWorkerPool) {
132132
EXPECT_EQ(count.load(), 3);
133133
}
134134

135+
// Smoke test for the local-engine report_tablet_callback: it is invoked once while the
// "WorkPoolReportTablet.report_tablet_callback.skip" debug point is registered and once after
// the debug point has been removed. Only the debug-point state is asserted; nothing about the
// callback's side effects (metrics, report contents) is checked here.
TEST(TaskWorkerPoolTest, ReportTabletCallbackWithDebugPoint) {
136+
// Save the current flag; it is put back by the plain assignment on the last line of the test.
bool original_enable_debug_points = config::enable_debug_points;
137+
config::enable_debug_points = true;
138+
139+
// Install a default-constructed StorageEngine so storage_engine().to_local() below has a target.
ExecEnv::GetInstance()->set_storage_engine(std::make_unique<StorageEngine>(EngineOptions {}));
140+
141+
ClusterInfo cluster_info;
142+
cluster_info.master_fe_addr.__set_port(9030);
143+
144+
// Clear the storage engine again when the test scope exits.
Defer defer {[] { ExecEnv::GetInstance()->set_storage_engine(nullptr); }};
145+
146+
{
147+
// debug point is enabled
148+
DebugPoints::instance()->add("WorkPoolReportTablet.report_tablet_callback.skip");
149+
EXPECT_TRUE(DebugPoints::instance()->is_enable(
150+
"WorkPoolReportTablet.report_tablet_callback.skip"));
151+
// First invocation, with the debug point active.
report_tablet_callback(ExecEnv::GetInstance()->storage_engine().to_local(), &cluster_info);
152+
}
153+
154+
{
155+
// debug point is removed
156+
DebugPoints::instance()->remove("WorkPoolReportTablet.report_tablet_callback.skip");
157+
EXPECT_FALSE(DebugPoints::instance()->is_enable(
158+
"WorkPoolReportTablet.report_tablet_callback.skip"));
159+
// Second invocation, with the debug point no longer registered.
report_tablet_callback(ExecEnv::GetInstance()->storage_engine().to_local(), &cluster_info);
160+
}
161+
162+
// NOTE(review): unlike the storage-engine cleanup above, this restore is not wrapped in a
// Defer, so it only runs if control reaches this line.
config::enable_debug_points = original_enable_debug_points;
163+
}
164+
135165
} // namespace doris
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
#include <gtest/gtest.h>
19+
20+
#include "agent/task_worker_pool.h"
21+
#include "cloud/cloud_storage_engine.h"
22+
#include "olap/options.h"
23+
#include "runtime/cluster_info.h"
24+
25+
namespace doris {
26+
27+
// Smoke test for the cloud-engine report_tablet_callback: it is invoked once while the
// "WorkPoolCloudReportTablet.report_tablet_callback.skip" debug point is registered and once
// after the debug point has been removed. Only the debug-point state is asserted; nothing about
// the callback's side effects (metrics, report contents) is checked here.
TEST(CloudTaskWorkerPoolTest, ReportTabletCallbackWithDebugPoint) {
28+
// Save the current flag; it is put back by the plain assignment on the last line of the test.
bool original_enable_debug_points = config::enable_debug_points;
29+
config::enable_debug_points = true;
30+
31+
// Install a default-constructed CloudStorageEngine so storage_engine().to_cloud() below has a
// target.
ExecEnv::GetInstance()->set_storage_engine(
32+
std::make_unique<CloudStorageEngine>(EngineOptions {}));
33+
34+
ClusterInfo cluster_info;
35+
cluster_info.master_fe_addr.__set_port(9030);
36+
37+
// Clear the storage engine again when the test scope exits.
Defer defer {[] { ExecEnv::GetInstance()->set_storage_engine(nullptr); }};
38+
39+
{
40+
// debug point is enabled
41+
DebugPoints::instance()->add("WorkPoolCloudReportTablet.report_tablet_callback.skip");
42+
EXPECT_TRUE(DebugPoints::instance()->is_enable(
43+
"WorkPoolCloudReportTablet.report_tablet_callback.skip"));
44+
// First invocation, with the debug point active.
report_tablet_callback(ExecEnv::GetInstance()->storage_engine().to_cloud(), &cluster_info);
45+
}
46+
47+
{
48+
// debug point is removed
49+
DebugPoints::instance()->remove("WorkPoolCloudReportTablet.report_tablet_callback.skip");
50+
EXPECT_FALSE(DebugPoints::instance()->is_enable(
51+
"WorkPoolCloudReportTablet.report_tablet_callback.skip"));
52+
// Second invocation, with the debug point no longer registered.
report_tablet_callback(ExecEnv::GetInstance()->storage_engine().to_cloud(), &cluster_info);
53+
}
54+
55+
// NOTE(review): unlike the storage-engine cleanup above, this restore is not wrapped in a
// Defer, so it only runs if control reaches this line.
config::enable_debug_points = original_enable_debug_points;
56+
}
57+
58+
} // namespace doris

be/test/util/doris_metrics_test.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@ TEST_F(DorisMetricsTest, Normal) {
108108
EXPECT_STREQ("16", metric->to_string().c_str());
109109
}
110110
{
111+
DorisMetrics::instance()->report_all_tablets_requests_skip->set_value(0);
111112
DorisMetrics::instance()->report_all_tablets_requests_skip->increment(1);
112113
auto metric = server_entity->get_metric("report_all_tablets_requests_skip",
113114
"engine_requests_total");

0 commit comments

Comments
 (0)