Skip to content

Commit

Permalink
Merge changes I07f322b8,I9ab3c4be,I26ed486e into main
Browse files Browse the repository at this point in the history
* changes:
  tracing_service_impl: Emit lifecycle event when flush starts
  tracing_service_impl_unittest: Test slow to flush data sources
  tracing_service_impl_unittest: Test slow to start data sources
  • Loading branch information
Treehugger Robot authored and Gerrit Code Review committed Oct 11, 2024
2 parents 90886fd + 6e7483a commit 84aa746
Show file tree
Hide file tree
Showing 4 changed files with 166 additions and 5 deletions.
3 changes: 3 additions & 0 deletions protos/perfetto/trace/perfetto/tracing_service_event.proto
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ message TracingServiceEvent {
// sources have been recording events.
bool all_data_sources_started = 1;

// Emitted when a flush is started.
bool flush_started = 9;

// Emitted when all data sources have been flushed successfully or with an
// error (including timeouts). This can generally happen many times over the
// course of the trace.
Expand Down
3 changes: 3 additions & 0 deletions protos/perfetto/trace/perfetto_trace.proto
Original file line number Diff line number Diff line change
Expand Up @@ -13442,6 +13442,9 @@ message TracingServiceEvent {
// sources have been recording events.
bool all_data_sources_started = 1;

// Emitted when a flush is started.
bool flush_started = 9;

// Emitted when all data sources have been flushed successfully or with an
// error (including timeouts). This can generally happen many times over the
// course of the trace.
Expand Down
15 changes: 13 additions & 2 deletions src/tracing/service/tracing_service_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1901,6 +1901,11 @@ void TracingServiceImpl::Flush(TracingSessionID tsid,
return;
}

SnapshotLifecyleEvent(
tracing_session,
protos::pbzero::TracingServiceEvent::kFlushStartedFieldNumber,
false /* snapshot_clocks */);

std::map<ProducerID, std::vector<DataSourceInstanceID>> data_source_instances;
for (const auto& [producer_id, ds_inst] :
tracing_session->data_source_instances) {
Expand Down Expand Up @@ -4021,6 +4026,9 @@ base::Status TracingServiceImpl::FlushAndCloneSession(
}
}

SnapshotLifecyleEvent(
session, protos::pbzero::TracingServiceEvent::kFlushStartedFieldNumber,
false /* snapshot_clocks */);
clone_op.pending_flush_cnt = bufs_groups.size();
for (const std::set<BufferID>& buf_group : bufs_groups) {
FlushDataSourceInstances(
Expand Down Expand Up @@ -4963,12 +4971,15 @@ TracingServiceImpl::TracingSession::TracingSession(
config(new_config),
snapshot_periodic_task(task_runner),
timed_stop_task(task_runner) {
// all_data_sources_flushed is special because we store up to 64 events of
// this type. Other events will go through the default case in
// all_data_sources_flushed (and flush_started) is special because we store up
// to 64 events of this type. Other events will go through the default case in
// SnapshotLifecycleEvent() where they will be given a max history of 1.
lifecycle_events.emplace_back(
protos::pbzero::TracingServiceEvent::kAllDataSourcesFlushedFieldNumber,
64 /* max_size */);
lifecycle_events.emplace_back(
protos::pbzero::TracingServiceEvent::kFlushStartedFieldNumber,
64 /* max_size */);
}

////////////////////////////////////////////////////////////////////////////////
Expand Down
150 changes: 147 additions & 3 deletions src/tracing/service/tracing_service_impl_unittest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4171,11 +4171,14 @@ TEST_F(TracingServiceImplTest, LifecycleMultipleFlushEventsQueued) {
ASSERT_TRUE(flush_request.WaitForReply());

auto packets = consumer->ReadBuffers();
uint32_t count = 0;
uint32_t flush_started_count = 0;
uint32_t flush_done_count = 0;
for (const auto& packet : packets) {
count += packet.service_event().all_data_sources_flushed();
flush_started_count += packet.service_event().flush_started();
flush_done_count += packet.service_event().all_data_sources_flushed();
}
ASSERT_EQ(count, 2u);
EXPECT_EQ(flush_started_count, 2u);
EXPECT_EQ(flush_done_count, 2u);

consumer->DisableTracing();
producer->WaitForDataSourceStop("data_source");
Expand Down Expand Up @@ -5809,4 +5812,145 @@ TEST_F(TracingServiceImplTest, DetachDurationTimeoutFreeBuffers) {
task_runner.RunUntilCheckpoint(on_attach_name);
}

TEST_F(TracingServiceImplTest, SlowStartingDataSources) {
std::unique_ptr<MockConsumer> consumer = CreateMockConsumer();
consumer->Connect(svc.get());

std::unique_ptr<MockProducer> producer = CreateMockProducer();
producer->Connect(svc.get(), "mock_producer");
producer->RegisterDataSource("data_source1", /*ack_stop=*/false,
/*ack_start=*/true);
producer->RegisterDataSource("data_source2", /*ack_stop=*/false,
/*ack_start=*/true);
producer->RegisterDataSource("data_source3", /*ack_stop=*/false,
/*ack_start=*/true);

TraceConfig trace_config;
trace_config.add_buffers()->set_size_kb(128);
trace_config.add_data_sources()->mutable_config()->set_name("data_source1");
trace_config.add_data_sources()->mutable_config()->set_name("data_source2");
trace_config.add_data_sources()->mutable_config()->set_name("data_source3");
consumer->EnableTracing(trace_config);

producer->WaitForTracingSetup();
producer->WaitForDataSourceSetup("data_source1");
producer->WaitForDataSourceSetup("data_source2");
producer->WaitForDataSourceSetup("data_source3");

producer->WaitForDataSourceStart("data_source1");
producer->WaitForDataSourceStart("data_source2");
producer->WaitForDataSourceStart("data_source3");

DataSourceInstanceID id1 = producer->GetDataSourceInstanceId("data_source1");
DataSourceInstanceID id3 = producer->GetDataSourceInstanceId("data_source3");

producer->endpoint()->NotifyDataSourceStarted(id1);
producer->endpoint()->NotifyDataSourceStarted(id3);

// This matches kAllDataSourceStartedTimeout.
AdvanceTimeAndRunUntilIdle(20000);

consumer->DisableTracing();
producer->WaitForDataSourceStop("data_source1");
producer->WaitForDataSourceStop("data_source2");
producer->WaitForDataSourceStop("data_source3");
consumer->WaitForTracingDisabled();

std::vector<protos::gen::TracePacket> packets = consumer->ReadBuffers();
EXPECT_THAT(
packets,
Contains(Property(
&protos::gen::TracePacket::service_event,
Property(
&protos::gen::TracingServiceEvent::slow_starting_data_sources,
Property(
&protos::gen::TracingServiceEvent::DataSources::data_source,
ElementsAre(
Property(&protos::gen::TracingServiceEvent::DataSources::
DataSource::data_source_name,
"data_source2")))))));
}

TEST_F(TracingServiceImplTest, FlushTimeoutEventsEmitted) {
std::unique_ptr<MockConsumer> consumer = CreateMockConsumer();
consumer->Connect(svc.get());

std::unique_ptr<MockProducer> producer = CreateMockProducer();
producer->Connect(svc.get(), "mock_producer1");
producer->RegisterDataSource("ds_1");

TraceConfig trace_config;
trace_config.add_buffers()->set_size_kb(1024); // Buf 0.
auto* ds_cfg = trace_config.add_data_sources()->mutable_config();
ds_cfg->set_name("ds_1");
ds_cfg->set_target_buffer(0);

consumer->EnableTracing(trace_config);
producer->WaitForTracingSetup();
producer->WaitForDataSourceSetup("ds_1");
producer->WaitForDataSourceStart("ds_1");

std::unique_ptr<TraceWriter> writer1 = producer->CreateTraceWriter("ds_1");

// Do not reply to Flush.
std::string producer_flush1_checkpoint_name = "producer_flush1_requested";
auto flush1_requested =
task_runner.CreateCheckpoint(producer_flush1_checkpoint_name);
EXPECT_CALL(*producer, Flush).WillOnce(Invoke(flush1_requested));
consumer->Flush(5000, FlushFlags(FlushFlags::Initiator::kTraced,
FlushFlags::Reason::kTraceStop));

task_runner.RunUntilCheckpoint(producer_flush1_checkpoint_name);

AdvanceTimeAndRunUntilIdle(5000);

// ReadBuffers returns a last_flush_slow_data_source event.
std::vector<protos::gen::TracePacket> packets = consumer->ReadBuffers();
EXPECT_THAT(
packets,
Contains(Property(
&protos::gen::TracePacket::service_event,
Property(
&protos::gen::TracingServiceEvent::last_flush_slow_data_sources,
Property(
&protos::gen::TracingServiceEvent::DataSources::data_source,
ElementsAre(
Property(&protos::gen::TracingServiceEvent::DataSources::
DataSource::data_source_name,
"ds_1")))))));

// Reply to Flush.
std::string producer_flush2_checkpoint_name = "producer_flush2_requested";
auto flush2_requested =
task_runner.CreateCheckpoint(producer_flush2_checkpoint_name);
FlushRequestID flush2_req_id;
EXPECT_CALL(*producer, Flush(_, _, _, _))
.WillOnce([&](FlushRequestID req_id, const DataSourceInstanceID*, size_t,
FlushFlags) {
flush2_req_id = req_id;
flush2_requested();
});
consumer->Flush(5000, FlushFlags(FlushFlags::Initiator::kTraced,
FlushFlags::Reason::kTraceStop));

task_runner.RunUntilCheckpoint(producer_flush2_checkpoint_name);

producer->endpoint()->NotifyFlushComplete(flush2_req_id);

AdvanceTimeAndRunUntilIdle(5000);

// ReadBuffers returns a last_flush_slow_data_source event.
packets = consumer->ReadBuffers();
EXPECT_THAT(
packets,
Not(Contains(Property(&protos::gen::TracePacket::service_event,
Property(&protos::gen::TracingServiceEvent::
has_last_flush_slow_data_sources,
Eq(true))))));

consumer->DisableTracing();
producer->WaitForDataSourceStop("ds_1");
consumer->WaitForTracingDisabled();
}

} // namespace perfetto

0 comments on commit 84aa746

Please sign in to comment.