Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 35 additions & 2 deletions be/src/agent/task_worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ std::mutex s_task_signatures_mtx;
std::unordered_map<TTaskType::type, std::unordered_set<int64_t>> s_task_signatures;

std::atomic_ulong s_report_version(time(nullptr) * 100000);
std::atomic<int64_t> s_tablet_report_failure_start_time(0);

void increase_report_version() {
s_report_version.fetch_add(1, std::memory_order_relaxed);
Expand Down Expand Up @@ -1181,13 +1182,21 @@ void report_tablet_callback(StorageEngine& engine, const ClusterInfo* cluster_in
}
}

if (report_version < s_report_version) {
if (report_version < s_report_version ||
UNLIKELY(config::enable_debug_points &&
DebugPoints::instance()->is_enable(
"WorkPoolReportTablet.report_tablet_callback.skip"))) {
// TODO llj This can only reduce the possibility for report error, but can't avoid it.
// If FE create a tablet in FE meta and send CREATE task to this BE, the tablet may not be included in this
// report, and the report version has a small probability that it has not been updated in time. When FE
// receives this report, it is possible to delete the new tablet.
LOG(WARNING) << "report version " << report_version << " change to " << s_report_version;
DorisMetrics::instance()->report_all_tablets_requests_skip->increment(1);
int64_t expected = 0;
int64_t current_time = time(nullptr);
s_tablet_report_failure_start_time.compare_exchange_strong(expected, current_time);
DorisMetrics::instance()->tablet_report_continuous_failure_duration_s->set_value(
current_time - s_tablet_report_failure_start_time);
return;
}

Expand Down Expand Up @@ -1227,6 +1236,14 @@ void report_tablet_callback(StorageEngine& engine, const ClusterInfo* cluster_in
report_tablet_total << 1;
if (!succ) [[unlikely]] {
report_tablet_failed << 1;
int64_t expected = 0;
int64_t current_time = time(nullptr);
s_tablet_report_failure_start_time.compare_exchange_strong(expected, current_time);
DorisMetrics::instance()->tablet_report_continuous_failure_duration_s->set_value(
current_time - s_tablet_report_failure_start_time);
} else {
s_tablet_report_failure_start_time.store(0);
DorisMetrics::instance()->tablet_report_continuous_failure_duration_s->set_value(0);
}
}

Expand Down Expand Up @@ -1254,9 +1271,17 @@ void report_tablet_callback(CloudStorageEngine& engine, const ClusterInfo* clust
}
}

if (report_version < s_report_version) {
if (report_version < s_report_version ||
UNLIKELY(config::enable_debug_points &&
DebugPoints::instance()->is_enable(
"WorkPoolCloudReportTablet.report_tablet_callback.skip"))) {
LOG(WARNING) << "report version " << report_version << " change to " << s_report_version;
DorisMetrics::instance()->report_all_tablets_requests_skip->increment(1);
int64_t expected = 0;
int64_t current_time = time(nullptr);
s_tablet_report_failure_start_time.compare_exchange_strong(expected, current_time);
DorisMetrics::instance()->tablet_report_continuous_failure_duration_s->set_value(
current_time - s_tablet_report_failure_start_time);
return;
}

Expand All @@ -1267,6 +1292,14 @@ void report_tablet_callback(CloudStorageEngine& engine, const ClusterInfo* clust
report_tablet_total << 1;
if (!succ) [[unlikely]] {
report_tablet_failed << 1;
int64_t expected = 0;
int64_t current_time = time(nullptr);
s_tablet_report_failure_start_time.compare_exchange_strong(expected, current_time);
DorisMetrics::instance()->tablet_report_continuous_failure_duration_s->set_value(
current_time - s_tablet_report_failure_start_time);
} else {
s_tablet_report_failure_start_time.store(0);
DorisMetrics::instance()->tablet_report_continuous_failure_duration_s->set_value(0);
}
}

Expand Down
5 changes: 5 additions & 0 deletions be/src/agent/utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,10 @@ Status MasterServerClient::finish_task(const TFinishTaskRequest& request, TMaste
}

Status MasterServerClient::report(const TReportRequest& request, TMasterResult* result) {
#ifdef BE_TEST
result->status.__set_status_code(TStatusCode::OK);
return Status::OK();
#else
Status client_status;
FrontendServiceConnection client(_client_cache.get(), _cluster_info->master_fe_addr,
config::thrift_rpc_timeout_ms, &client_status);
Expand Down Expand Up @@ -173,6 +177,7 @@ Status MasterServerClient::report(const TReportRequest& request, TMasterResult*
}

return Status::OK();
#endif
}

Status MasterServerClient::confirm_unused_remote_files(
Expand Down
5 changes: 5 additions & 0 deletions be/src/util/doris_metrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,9 @@ DEFINE_GAUGE_CORE_METRIC_PROTOTYPE_2ARG(runtime_filter_consumer_timeout_num, Met
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(get_remote_tablet_slow_time_ms, MetricUnit::MILLISECONDS);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(get_remote_tablet_slow_cnt, MetricUnit::NOUNIT);

DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(tablet_report_continuous_failure_duration_s,
MetricUnit::SECONDS);

DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(ann_index_load_costs_ms, MetricUnit::MILLISECONDS);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(ann_index_load_cnt, MetricUnit::NOUNIT);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(ann_index_search_costs_ms, MetricUnit::MILLISECONDS);
Expand Down Expand Up @@ -419,6 +422,8 @@ DorisMetrics::DorisMetrics() : _metric_registry(_s_registry_name) {
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, get_remote_tablet_slow_time_ms);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, get_remote_tablet_slow_cnt);

INT_GAUGE_METRIC_REGISTER(_server_metric_entity, tablet_report_continuous_failure_duration_s);

INT_COUNTER_METRIC_REGISTER(_server_metric_entity, pipeline_task_queue_size);

INT_COUNTER_METRIC_REGISTER(_server_metric_entity, ann_index_load_costs_ms);
Expand Down
3 changes: 3 additions & 0 deletions be/src/util/doris_metrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,9 @@ class DorisMetrics {
IntGauge* tablet_cumulative_max_compaction_score = nullptr;
IntGauge* tablet_base_max_compaction_score = nullptr;

// tablet report
IntGauge* tablet_report_continuous_failure_duration_s = nullptr;

IntGauge* all_rowsets_num = nullptr;
IntGauge* all_segments_num = nullptr;

Expand Down
30 changes: 30 additions & 0 deletions be/test/agent/task_worker_pool_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,4 +132,34 @@ TEST(TaskWorkerPoolTest, ReportWorkerPool) {
EXPECT_EQ(count.load(), 3);
}

TEST(TaskWorkerPoolTest, ReportTabletCallbackWithDebugPoint) {
bool original_enable_debug_points = config::enable_debug_points;
config::enable_debug_points = true;

ExecEnv::GetInstance()->set_storage_engine(std::make_unique<StorageEngine>(EngineOptions {}));

ClusterInfo cluster_info;
cluster_info.master_fe_addr.__set_port(9030);

Defer defer {[] { ExecEnv::GetInstance()->set_storage_engine(nullptr); }};

{
// debug point is enabled
DebugPoints::instance()->add("WorkPoolReportTablet.report_tablet_callback.skip");
EXPECT_TRUE(DebugPoints::instance()->is_enable(
"WorkPoolReportTablet.report_tablet_callback.skip"));
report_tablet_callback(ExecEnv::GetInstance()->storage_engine().to_local(), &cluster_info);
}

{
// debug point is removed
DebugPoints::instance()->remove("WorkPoolReportTablet.report_tablet_callback.skip");
EXPECT_FALSE(DebugPoints::instance()->is_enable(
"WorkPoolReportTablet.report_tablet_callback.skip"));
report_tablet_callback(ExecEnv::GetInstance()->storage_engine().to_local(), &cluster_info);
}

config::enable_debug_points = original_enable_debug_points;
}

} // namespace doris
58 changes: 58 additions & 0 deletions be/test/cloud/cloud_task_worker_pool_test.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include <gtest/gtest.h>

#include "agent/task_worker_pool.h"
#include "cloud/cloud_storage_engine.h"
#include "olap/options.h"
#include "runtime/cluster_info.h"

namespace doris {

TEST(CloudTaskWorkerPoolTest, ReportTabletCallbackWithDebugPoint) {
bool original_enable_debug_points = config::enable_debug_points;
config::enable_debug_points = true;

ExecEnv::GetInstance()->set_storage_engine(
std::make_unique<CloudStorageEngine>(EngineOptions {}));

ClusterInfo cluster_info;
cluster_info.master_fe_addr.__set_port(9030);

Defer defer {[] { ExecEnv::GetInstance()->set_storage_engine(nullptr); }};

{
// debug point is enabled
DebugPoints::instance()->add("WorkPoolCloudReportTablet.report_tablet_callback.skip");
EXPECT_TRUE(DebugPoints::instance()->is_enable(
"WorkPoolCloudReportTablet.report_tablet_callback.skip"));
report_tablet_callback(ExecEnv::GetInstance()->storage_engine().to_cloud(), &cluster_info);
}

{
// debug point is removed
DebugPoints::instance()->remove("WorkPoolCloudReportTablet.report_tablet_callback.skip");
EXPECT_FALSE(DebugPoints::instance()->is_enable(
"WorkPoolCloudReportTablet.report_tablet_callback.skip"));
report_tablet_callback(ExecEnv::GetInstance()->storage_engine().to_cloud(), &cluster_info);
}

config::enable_debug_points = original_enable_debug_points;
}

} // namespace doris
1 change: 1 addition & 0 deletions be/test/util/doris_metrics_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ TEST_F(DorisMetricsTest, Normal) {
EXPECT_STREQ("16", metric->to_string().c_str());
}
{
DorisMetrics::instance()->report_all_tablets_requests_skip->set_value(0);
DorisMetrics::instance()->report_all_tablets_requests_skip->increment(1);
auto metric = server_entity->get_metric("report_all_tablets_requests_skip",
"engine_requests_total");
Expand Down
Loading