Skip to content

Commit

Permalink
Fix BuildEventStreamer to atomically post Configuration events whil…
Browse files Browse the repository at this point in the history
…e recording the configuration posted.

In the Build Event Protocol, any number of events may reference a single
configuration. For example, every `TargetComplete` event will reference its
configuration and most builds will have many targets and only a small handful of
configurations.

To simplify BEP consumption we aim to report an event's Configuration before we
report the event itself. We also do not want to report a copy of the same
Configuration for every single event that references it. So we only post the
configuration the first time we see it referenced.

We need to combine "check if this is the first time we saw the configuration,
recording if it is" with "actually post this configuration" in a single atomic
unit. Otherwise, when two targets finish in separate threads and reference the
same configuration, a race occurs.

RELNOTES: n/a
PiperOrigin-RevId: 716809640
Change-Id: I559a1d85e06d82144f98f24681ee357704ef5f5c
  • Loading branch information
michaeledgar authored and copybara-github committed Jan 17, 2025
1 parent cf701eb commit 5653583
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -498,12 +498,10 @@ private void maybeReportConfiguration(@Nullable BuildEvent configuration) {
BuildEvent event = configuration == null ? NullConfiguration.INSTANCE : configuration;
BuildEventId id = event.getEventId();
synchronized (this) {
if (configurationsPosted.contains(id)) {
return;
if (configurationsPosted.add(id)) {
post(event);
}
configurationsPosted.add(id);
}
post(event);
}

@Subscribe
Expand Down
2 changes: 1 addition & 1 deletion src/test/java/com/google/devtools/build/lib/runtime/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ java_library(
"//src/main/java/com/google/devtools/build/lib/analysis:config/fragment_factory",
"//src/main/java/com/google/devtools/build/lib/analysis:config/fragment_options",
"//src/main/java/com/google/devtools/build/lib/analysis:config/fragment_registry",
"//src/main/java/com/google/devtools/build/lib/analysis:config/invalid_configuration_exception",
"//src/main/java/com/google/devtools/build/lib/analysis:configured_target",
"//src/main/java/com/google/devtools/build/lib/analysis:server_directories",
"//src/main/java/com/google/devtools/build/lib/analysis:test/test_configuration",
Expand Down Expand Up @@ -112,7 +113,6 @@ java_library(
"//src/test/java/com/google/devtools/build/lib/testutil:TestConstants",
"//src/test/java/com/google/devtools/build/lib/testutil:TestThread",
"//src/test/java/com/google/devtools/build/lib/testutil:TestUtils",
"//src/test/java/com/google/devtools/build/lib/vfs/util",
"//src/test/java/com/google/devtools/common/options:testutils",
"//third_party:flogger",
"//third_party:guava",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import static com.google.common.truth.Truth.assertThat;
import static com.google.common.truth.Truth.assertWithMessage;
import static com.google.common.truth.extensions.proto.ProtoTruth.assertThat;
import static com.google.devtools.build.lib.bugreport.BugReport.constructOomExitMessage;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.stream.Collectors.joining;
Expand Down Expand Up @@ -47,6 +48,7 @@
import com.google.devtools.build.lib.analysis.config.CoreOptions;
import com.google.devtools.build.lib.analysis.config.FragmentFactory;
import com.google.devtools.build.lib.analysis.config.FragmentRegistry;
import com.google.devtools.build.lib.analysis.config.InvalidConfigurationException;
import com.google.devtools.build.lib.buildeventservice.BuildEventServiceOptions.BesUploadMode;
import com.google.devtools.build.lib.buildeventstream.AnnounceBuildEventTransportsEvent;
import com.google.devtools.build.lib.buildeventstream.ArtifactGroupNamer;
Expand All @@ -60,6 +62,7 @@
import com.google.devtools.build.lib.buildeventstream.BuildEventStreamProtos.Aborted;
import com.google.devtools.build.lib.buildeventstream.BuildEventStreamProtos.Aborted.AbortReason;
import com.google.devtools.build.lib.buildeventstream.BuildEventStreamProtos.BuildEventId;
import com.google.devtools.build.lib.buildeventstream.BuildEventStreamProtos.BuildEventId.IdCase;
import com.google.devtools.build.lib.buildeventstream.BuildEventStreamProtos.BuildEventId.NamedSetOfFilesId;
import com.google.devtools.build.lib.buildeventstream.BuildEventTransport;
import com.google.devtools.build.lib.buildeventstream.BuildEventTransportClosedEvent;
Expand Down Expand Up @@ -89,6 +92,7 @@
import com.google.devtools.build.lib.vfs.Root;
import com.google.devtools.common.options.Options;
import com.google.devtools.common.options.OptionsBase;
import com.google.devtools.common.options.OptionsParsingException;
import com.google.devtools.common.options.OptionsParsingResult;
import com.google.testing.junit.testparameterinjector.TestParameter;
import com.google.testing.junit.testparameterinjector.TestParameterInjector;
Expand All @@ -100,7 +104,9 @@
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -781,32 +787,27 @@ public void testArtifactSetsPrecedeReportingEvent() throws InterruptedException

streamer.buildEvent(startEvent);
// Publish `numEvents` different events that all report the same NamedSet of artifacts on
// `numEvents` different threads. Use latches to ensure:
// `numEvents` different threads. Use a CyclicBarrier and latch to ensure:
//
// 1. all threads have started, before:
// 2. all threads send their event, before:
// 3. verifying the recorded events.
CountDownLatch readyToPublishLatch = new CountDownLatch(numEvents);
CountDownLatch startPublishingLatch = new CountDownLatch(1);
CyclicBarrier readyToPublishLatch = new CyclicBarrier(numEvents);
CountDownLatch donePublishingLatch = new CountDownLatch(numEvents);
for (int i = 0; i < numEvents; i++) {
int num = i;
BuildEvent reportingArtifacts = eventsToPost.get(i);
new Thread(
() -> {
try {
BuildEvent reportingArtifacts = eventsToPost.get(num);
readyToPublishLatch.countDown();
startPublishingLatch.await();
readyToPublishLatch.await();
streamer.buildEvent(reportingArtifacts);
} catch (InterruptedException e) {
} catch (InterruptedException | BrokenBarrierException e) {
throw new RuntimeException(e);
}
donePublishingLatch.countDown();
})
.start();
}
readyToPublishLatch.await();
startPublishingLatch.countDown();
donePublishingLatch.await();

assertThat(streamer.isClosed()).isFalse();
Expand Down Expand Up @@ -955,39 +956,10 @@ public void testConsumeAsPairs() {
@Test
public void testReportedConfigurations() throws Exception {
// Verify that configuration events are posted, but only once.
BuildOptions defaultBuildOptions = BuildOptions.of(ImmutableList.of(CoreOptions.class));
BuildEvent startEvent =
new GenericBuildEvent(
testId("Initial"), ImmutableSet.of(ProgressEvent.INITIAL_PROGRESS_UPDATE));
BuildConfigurationValue configuration =
BuildConfigurationValue.createForTesting(
defaultBuildOptions,
"some_mnemonic",
"workspace",
/* siblingRepositoryLayout= */ false,
new BlazeDirectories(
new ServerDirectories(outputBase, outputBase, outputBase),
rootDirectory,
/* defaultSystemJavabase= */ null,
"productName"),
new BuildConfigurationValue.GlobalStateProvider() {
@Override
public ActionEnvironment getActionEnvironment(BuildOptions buildOptions) {
return ActionEnvironment.EMPTY;
}

@Override
public FragmentRegistry getFragmentRegistry() {
return FragmentRegistry.create(
ImmutableList.of(), ImmutableList.of(), ImmutableList.of());
}

@Override
public ImmutableSet<String> getReservedActionMnemonics() {
return ImmutableSet.of();
}
},
new FragmentFactory());
BuildConfigurationValue configuration = makeTestingBuildConfigurationValue();
BuildEvent firstWithConfiguration =
new GenericConfigurationEvent(testId("first"), configuration.toBuildEvent());
BuildEvent secondWithConfiguration =
Expand All @@ -1009,6 +981,62 @@ public ImmutableSet<String> getReservedActionMnemonics() {
assertThat(allEventsSeen.get(6)).isEqualTo(secondWithConfiguration);
}

@Test
public void testReportedConfigurations_concurrent() throws Exception {
// Verify that configuration events are posted, but only once.
BuildEvent startEvent =
new GenericBuildEvent(
testId("Initial"), ImmutableSet.of(ProgressEvent.INITIAL_PROGRESS_UPDATE));
BuildConfigurationValue configuration = makeTestingBuildConfigurationValue();

int numEvents = 100;
List<BuildEvent> eventsToPost = new ArrayList<>();
for (int i = 0; i < numEvents; i++) {
eventsToPost.add(
new GenericConfigurationEvent(testId("has_config_" + i), configuration.toBuildEvent()));
}

streamer.buildEvent(startEvent);
// Publish `numEvents` different events that all report the same configuration on `numEvents`
// different threads. Use a CyclicBarrier and latch to ensure:
//
// 1. all threads have started, before:
// 2. all threads send their event, before:
// 3. verifying the recorded events.
CyclicBarrier readyToPublishLatch = new CyclicBarrier(numEvents);
CountDownLatch donePublishingLatch = new CountDownLatch(numEvents);
for (int i = 0; i < numEvents; i++) {
BuildEvent hasConfigEvent = eventsToPost.get(i);
new Thread(
() -> {
try {
readyToPublishLatch.await();
streamer.buildEvent(hasConfigEvent);
} catch (InterruptedException | BrokenBarrierException e) {
throw new RuntimeException(e);
}
donePublishingLatch.countDown();
})
.start();
}
donePublishingLatch.await();

assertThat(streamer.isClosed()).isFalse();

List<BuildEvent> allEventsSeen = transport.getEvents();

// Two events for each GenericConfigurationEvent: a progress event announcing it and the
// actual GenericConfigurationEvent itself.
assertThat(allEventsSeen).hasSize(3 + (numEvents * 2));
assertThat(allEventsSeen.get(0).getEventId()).isEqualTo(startEvent.getEventId());
assertThat(allEventsSeen.get(1).getEventId()).isEqualTo(ProgressEvent.INITIAL_PROGRESS_UPDATE);
assertThat(allEventsSeen.get(2)).isEqualTo(configuration.toBuildEvent());
for (int idx = 3; idx < allEventsSeen.size(); idx++) {
assertThat(allEventsSeen.get(idx).getEventId().getIdCase())
.isNotEqualTo(IdCase.CONFIGURATION);
}
}

@Test
public void testEarlyFlush() {
// Verify that the streamer can handle early calls to flush() and still correctly
Expand Down Expand Up @@ -1791,6 +1819,38 @@ private OptionsParsingResult createMockOptions() {
return options;
}

private BuildConfigurationValue makeTestingBuildConfigurationValue()
throws InvalidConfigurationException, OptionsParsingException {
return BuildConfigurationValue.createForTesting(
BuildOptions.of(ImmutableList.of(CoreOptions.class)),
"some_mnemonic",
"workspace",
/* siblingRepositoryLayout= */ false,
new BlazeDirectories(
new ServerDirectories(outputBase, outputBase, outputBase),
rootDirectory,
/* defaultSystemJavabase= */ null,
"productName"),
new BuildConfigurationValue.GlobalStateProvider() {
@Override
public ActionEnvironment getActionEnvironment(BuildOptions buildOptions) {
return ActionEnvironment.EMPTY;
}

@Override
public FragmentRegistry getFragmentRegistry() {
return FragmentRegistry.create(
ImmutableList.of(), ImmutableList.of(), ImmutableList.of());
}

@Override
public ImmutableSet<String> getReservedActionMnemonics() {
return ImmutableSet.of();
}
},
new FragmentFactory());
}

private static DetailedExitCode createGenericDetailedExitCode() {
return DetailedExitCode.of(
FailureDetail.newBuilder()
Expand Down

0 comments on commit 5653583

Please sign in to comment.