diff --git a/protos/perfetto/trace/perfetto/tracing_service_event.proto b/protos/perfetto/trace/perfetto/tracing_service_event.proto index c7e3698762..dbc4df188c 100644 --- a/protos/perfetto/trace/perfetto/tracing_service_event.proto +++ b/protos/perfetto/trace/perfetto/tracing_service_event.proto @@ -46,6 +46,9 @@ message TracingServiceEvent { // sources have been recording events. bool all_data_sources_started = 1; + // Emitted when a flush is started. + bool flush_started = 9; + // Emitted when all data sources have been flushed successfully or with an // error (including timeouts). This can generally happen many times over the // course of the trace. diff --git a/protos/perfetto/trace/perfetto_trace.proto b/protos/perfetto/trace/perfetto_trace.proto index df04bec611..78b58a97b7 100644 --- a/protos/perfetto/trace/perfetto_trace.proto +++ b/protos/perfetto/trace/perfetto_trace.proto @@ -13442,6 +13442,9 @@ message TracingServiceEvent { // sources have been recording events. bool all_data_sources_started = 1; + // Emitted when a flush is started. + bool flush_started = 9; + // Emitted when all data sources have been flushed successfully or with an // error (including timeouts). This can generally happen many times over the // course of the trace. diff --git a/src/tracing/service/tracing_service_impl.cc b/src/tracing/service/tracing_service_impl.cc index eb1baf8129..40fa60e68d 100644 --- a/src/tracing/service/tracing_service_impl.cc +++ b/src/tracing/service/tracing_service_impl.cc @@ -1901,6 +1901,11 @@ void TracingServiceImpl::Flush(TracingSessionID tsid, return; } + SnapshotLifecyleEvent( + tracing_session, + protos::pbzero::TracingServiceEvent::kFlushStartedFieldNumber, + false /* snapshot_clocks */); + std::map> data_source_instances; for (const auto& [producer_id, ds_inst] : tracing_session->data_source_instances) { @@ -4021,6 +4026,9 @@ base::Status TracingServiceImpl::FlushAndCloneSession( } } + SnapshotLifecyleEvent( + session, protos::pbzero::TracingServiceEvent::kFlushStartedFieldNumber, + false /* snapshot_clocks */); clone_op.pending_flush_cnt = bufs_groups.size(); for (const std::set& buf_group : bufs_groups) { FlushDataSourceInstances( @@ -4963,12 +4971,15 @@ TracingServiceImpl::TracingSession::TracingSession( config(new_config), snapshot_periodic_task(task_runner), timed_stop_task(task_runner) { - // all_data_sources_flushed is special because we store up to 64 events of - // this type. Other events will go through the default case in + // all_data_sources_flushed (and flush_started) is special because we store up + // to 64 events of this type. Other events will go through the default case in // SnapshotLifecycleEvent() where they will be given a max history of 1. lifecycle_events.emplace_back( protos::pbzero::TracingServiceEvent::kAllDataSourcesFlushedFieldNumber, 64 /* max_size */); + lifecycle_events.emplace_back( + protos::pbzero::TracingServiceEvent::kFlushStartedFieldNumber, + 64 /* max_size */); } //////////////////////////////////////////////////////////////////////////////// diff --git a/src/tracing/service/tracing_service_impl_unittest.cc b/src/tracing/service/tracing_service_impl_unittest.cc index caf8b221f0..a69e650daf 100644 --- a/src/tracing/service/tracing_service_impl_unittest.cc +++ b/src/tracing/service/tracing_service_impl_unittest.cc @@ -4171,11 +4171,14 @@ TEST_F(TracingServiceImplTest, LifecycleMultipleFlushEventsQueued) { ASSERT_TRUE(flush_request.WaitForReply()); auto packets = consumer->ReadBuffers(); - uint32_t count = 0; + uint32_t flush_started_count = 0; + uint32_t flush_done_count = 0; for (const auto& packet : packets) { - count += packet.service_event().all_data_sources_flushed(); + flush_started_count += packet.service_event().flush_started(); + flush_done_count += packet.service_event().all_data_sources_flushed(); } - ASSERT_EQ(count, 2u); + EXPECT_EQ(flush_started_count, 2u); + EXPECT_EQ(flush_done_count, 2u); consumer->DisableTracing(); producer->WaitForDataSourceStop("data_source"); @@ -5809,4 +5812,145 @@ TEST_F(TracingServiceImplTest, DetachDurationTimeoutFreeBuffers) { task_runner.RunUntilCheckpoint(on_attach_name); } +TEST_F(TracingServiceImplTest, SlowStartingDataSources) { + std::unique_ptr consumer = CreateMockConsumer(); + consumer->Connect(svc.get()); + + std::unique_ptr producer = CreateMockProducer(); + producer->Connect(svc.get(), "mock_producer"); + producer->RegisterDataSource("data_source1", /*ack_stop=*/false, + /*ack_start=*/true); + producer->RegisterDataSource("data_source2", /*ack_stop=*/false, + /*ack_start=*/true); + producer->RegisterDataSource("data_source3", /*ack_stop=*/false, + /*ack_start=*/true); + + TraceConfig trace_config; + trace_config.add_buffers()->set_size_kb(128); + trace_config.add_data_sources()->mutable_config()->set_name("data_source1"); + trace_config.add_data_sources()->mutable_config()->set_name("data_source2"); + trace_config.add_data_sources()->mutable_config()->set_name("data_source3"); + consumer->EnableTracing(trace_config); + + producer->WaitForTracingSetup(); + producer->WaitForDataSourceSetup("data_source1"); + producer->WaitForDataSourceSetup("data_source2"); + producer->WaitForDataSourceSetup("data_source3"); + + producer->WaitForDataSourceStart("data_source1"); + producer->WaitForDataSourceStart("data_source2"); + producer->WaitForDataSourceStart("data_source3"); + + DataSourceInstanceID id1 = producer->GetDataSourceInstanceId("data_source1"); + DataSourceInstanceID id3 = producer->GetDataSourceInstanceId("data_source3"); + + producer->endpoint()->NotifyDataSourceStarted(id1); + producer->endpoint()->NotifyDataSourceStarted(id3); + + // This matches kAllDataSourceStartedTimeout. + AdvanceTimeAndRunUntilIdle(20000); + + consumer->DisableTracing(); + producer->WaitForDataSourceStop("data_source1"); + producer->WaitForDataSourceStop("data_source2"); + producer->WaitForDataSourceStop("data_source3"); + consumer->WaitForTracingDisabled(); + + std::vector packets = consumer->ReadBuffers(); + EXPECT_THAT( + packets, + Contains(Property( + &protos::gen::TracePacket::service_event, + Property( + &protos::gen::TracingServiceEvent::slow_starting_data_sources, + Property( + &protos::gen::TracingServiceEvent::DataSources::data_source, + ElementsAre( + Property(&protos::gen::TracingServiceEvent::DataSources:: + DataSource::data_source_name, + "data_source2"))))))); +} + +TEST_F(TracingServiceImplTest, FlushTimeoutEventsEmitted) { + std::unique_ptr consumer = CreateMockConsumer(); + consumer->Connect(svc.get()); + + std::unique_ptr producer = CreateMockProducer(); + producer->Connect(svc.get(), "mock_producer1"); + producer->RegisterDataSource("ds_1"); + + TraceConfig trace_config; + trace_config.add_buffers()->set_size_kb(1024); // Buf 0. + auto* ds_cfg = trace_config.add_data_sources()->mutable_config(); + ds_cfg->set_name("ds_1"); + ds_cfg->set_target_buffer(0); + + consumer->EnableTracing(trace_config); + producer->WaitForTracingSetup(); + producer->WaitForDataSourceSetup("ds_1"); + producer->WaitForDataSourceStart("ds_1"); + + std::unique_ptr writer1 = producer->CreateTraceWriter("ds_1"); + + // Do not reply to Flush. + std::string producer_flush1_checkpoint_name = "producer_flush1_requested"; + auto flush1_requested = + task_runner.CreateCheckpoint(producer_flush1_checkpoint_name); + EXPECT_CALL(*producer, Flush).WillOnce(Invoke(flush1_requested)); + consumer->Flush(5000, FlushFlags(FlushFlags::Initiator::kTraced, + FlushFlags::Reason::kTraceStop)); + + task_runner.RunUntilCheckpoint(producer_flush1_checkpoint_name); + + AdvanceTimeAndRunUntilIdle(5000); + + // ReadBuffers returns a last_flush_slow_data_source event. + std::vector packets = consumer->ReadBuffers(); + EXPECT_THAT( + packets, + Contains(Property( + &protos::gen::TracePacket::service_event, + Property( + &protos::gen::TracingServiceEvent::last_flush_slow_data_sources, + Property( + &protos::gen::TracingServiceEvent::DataSources::data_source, + ElementsAre( + Property(&protos::gen::TracingServiceEvent::DataSources:: + DataSource::data_source_name, + "ds_1"))))))); + + // Reply to Flush. + std::string producer_flush2_checkpoint_name = "producer_flush2_requested"; + auto flush2_requested = + task_runner.CreateCheckpoint(producer_flush2_checkpoint_name); + FlushRequestID flush2_req_id; + EXPECT_CALL(*producer, Flush(_, _, _, _)) + .WillOnce([&](FlushRequestID req_id, const DataSourceInstanceID*, size_t, + FlushFlags) { + flush2_req_id = req_id; + flush2_requested(); + }); + consumer->Flush(5000, FlushFlags(FlushFlags::Initiator::kTraced, + FlushFlags::Reason::kTraceStop)); + + task_runner.RunUntilCheckpoint(producer_flush2_checkpoint_name); + + producer->endpoint()->NotifyFlushComplete(flush2_req_id); + + AdvanceTimeAndRunUntilIdle(5000); + + // ReadBuffers returns a last_flush_slow_data_source event. + packets = consumer->ReadBuffers(); + EXPECT_THAT( + packets, + Not(Contains(Property(&protos::gen::TracePacket::service_event, + Property(&protos::gen::TracingServiceEvent:: + has_last_flush_slow_data_sources, + Eq(true)))))); + + consumer->DisableTracing(); + producer->WaitForDataSourceStop("ds_1"); + consumer->WaitForTracingDisabled(); +} + } // namespace perfetto