Skip to content

Commit b29c2e8

Browse files
authored
Merge branch 'master' into save-main-session-registries
2 parents f97ca55 + b65ec28 commit b29c2e8

File tree

19 files changed

+541
-542
lines changed

19 files changed

+541
-542
lines changed
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"comment": "Modify this file in a trivial way to cause this test suite to run.",
33
"pr": "36271",
4-
"modification": 34
4+
"modification": 35
55
}
66

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -264,11 +264,15 @@ public long add(WindowedValue<T> data) throws IOException {
264264
}
265265
}
266266

267-
Windmill.KeyedMessageBundle.Builder keyedOutput = productionMap.get(key);
268-
if (keyedOutput == null) {
269-
keyedOutput = Windmill.KeyedMessageBundle.newBuilder().setKey(key);
270-
productionMap.put(key, keyedOutput);
271-
}
267+
Windmill.KeyedMessageBundle.Builder keyedOutput =
268+
productionMap.computeIfAbsent(
269+
key,
270+
(k) -> {
271+
Windmill.KeyedMessageBundle.Builder builder =
272+
Windmill.KeyedMessageBundle.newBuilder();
273+
builder.setKey(k);
274+
return builder;
275+
});
272276

273277
try {
274278
messageBuilder

sdks/java/core/src/main/java/org/apache/beam/sdk/fn/data/WeightedList.java

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.util.List;
2121
import java.util.concurrent.atomic.AtomicLong;
2222
import org.apache.beam.sdk.util.Weighted;
23+
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.math.LongMath;
2324

2425
/** Facade for a {@link List<T>} that keeps track of weight, for cache limit reasons. */
2526
public class WeightedList<T> implements Weighted {
@@ -71,14 +72,6 @@ public void addAll(List<T> values, long weight) {
7172
}
7273

7374
public void accumulateWeight(long weight) {
74-
this.weight.accumulateAndGet(
75-
weight,
76-
(first, second) -> {
77-
try {
78-
return Math.addExact(first, second);
79-
} catch (ArithmeticException e) {
80-
return Long.MAX_VALUE;
81-
}
82-
});
75+
this.weight.accumulateAndGet(weight, LongMath::saturatedAdd);
8376
}
8477
}

sdks/java/core/src/test/java/org/apache/beam/sdk/fn/splittabledofn/RestrictionTrackersTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ private static class RestrictionTrackerWithProgress extends RestrictionTracker<O
102102
implements HasProgress {
103103
private boolean blockTryClaim;
104104
private boolean blockTrySplit;
105-
private boolean isBlocked;
105+
private volatile boolean isBlocked;
106106
public static final Progress REPORT_PROGRESS = Progress.from(2.0, 3.0);
107107

108108
public RestrictionTrackerWithProgress() {

sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/StateFetchingIterators.java

Lines changed: 53 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@
4949
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
5050
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Throwables;
5151
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.AbstractIterator;
52+
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
53+
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.math.LongMath;
5254

5355
/**
5456
* Adapters which convert a logical series of chunks using continuation tokens over the Beam Fn
@@ -249,15 +251,11 @@ static class BlocksPrefix<T> extends Blocks<T> implements Shrinkable<BlocksPrefi
249251

250252
@Override
251253
public long getWeight() {
252-
try {
253-
long sum = 8 + blocks.size() * 8L;
254-
for (Block<T> block : blocks) {
255-
sum = Math.addExact(sum, block.getWeight());
256-
}
257-
return sum;
258-
} catch (ArithmeticException e) {
259-
return Long.MAX_VALUE;
254+
long sum = 8 + blocks.size() * 8L;
255+
for (Block<T> block : blocks) {
256+
sum = LongMath.saturatedAdd(sum, block.getWeight());
260257
}
258+
return sum;
261259
}
262260

263261
BlocksPrefix(List<Block<T>> blocks) {
@@ -282,8 +280,7 @@ public List<Block<T>> getBlocks() {
282280

283281
@AutoValue
284282
abstract static class Block<T> implements Weighted {
285-
private static final Block<Void> EMPTY =
286-
fromValues(WeightedList.of(Collections.emptyList(), 0), null);
283+
private static final Block<Void> EMPTY = fromValues(ImmutableList.of(), 0, null);
287284

288285
@SuppressWarnings("unchecked") // Based upon as Collections.emptyList()
289286
public static <T> Block<T> emptyBlock() {
@@ -299,21 +296,37 @@ public static <T> Block<T> mutatedBlock(WeightedList<T> values) {
299296
}
300297

301298
public static <T> Block<T> fromValues(List<T> values, @Nullable ByteString nextToken) {
302-
return fromValues(WeightedList.of(values, Caches.weigh(values)), nextToken);
299+
if (values.isEmpty() && nextToken == null) {
300+
return emptyBlock();
301+
}
302+
ImmutableList<T> immutableValues = ImmutableList.copyOf(values);
303+
long listWeight = immutableValues.size() * Caches.REFERENCE_SIZE;
304+
for (T value : immutableValues) {
305+
listWeight = LongMath.saturatedAdd(listWeight, Caches.weigh(value));
306+
}
307+
return fromValues(immutableValues, listWeight, nextToken);
303308
}
304309

305310
public static <T> Block<T> fromValues(
306311
WeightedList<T> values, @Nullable ByteString nextToken) {
307-
long weight = values.getWeight() + 24;
312+
if (values.isEmpty() && nextToken == null) {
313+
return emptyBlock();
314+
}
315+
return fromValues(ImmutableList.copyOf(values.getBacking()), values.getWeight(), nextToken);
316+
}
317+
318+
private static <T> Block<T> fromValues(
319+
ImmutableList<T> values, long listWeight, @Nullable ByteString nextToken) {
320+
long weight = LongMath.saturatedAdd(listWeight, 24);
308321
if (nextToken != null) {
309322
if (nextToken.isEmpty()) {
310323
nextToken = ByteString.EMPTY;
311324
} else {
312-
weight += Caches.weigh(nextToken);
325+
weight = LongMath.saturatedAdd(weight, Caches.weigh(nextToken));
313326
}
314327
}
315328
return new AutoValue_StateFetchingIterators_CachingStateIterable_Block<>(
316-
values.getBacking(), nextToken, weight);
329+
values, nextToken, weight);
317330
}
318331

319332
abstract List<T> getValues();
@@ -372,10 +385,12 @@ public void remove(Set<Object> toRemoveStructuralValues) {
372385
totalSize += tBlock.getValues().size();
373386
}
374387

375-
WeightedList<T> allValues = WeightedList.of(new ArrayList<>(totalSize), 0L);
388+
ImmutableList.Builder<T> allValues = ImmutableList.builderWithExpectedSize(totalSize);
389+
long weight = 0;
390+
List<T> blockValuesToKeep = new ArrayList<>();
376391
for (Block<T> block : blocks) {
392+
blockValuesToKeep.clear();
377393
boolean valueRemovedFromBlock = false;
378-
List<T> blockValuesToKeep = new ArrayList<>();
379394
for (T value : block.getValues()) {
380395
if (!toRemoveStructuralValues.contains(valueCoder.structuralValue(value))) {
381396
blockValuesToKeep.add(value);
@@ -387,13 +402,19 @@ public void remove(Set<Object> toRemoveStructuralValues) {
387402
// If any value was removed from this block, need to estimate the weight again.
388403
// Otherwise, just reuse the block's weight.
389404
if (valueRemovedFromBlock) {
390-
allValues.addAll(blockValuesToKeep, Caches.weigh(block.getValues()));
405+
allValues.addAll(blockValuesToKeep);
406+
for (T value : blockValuesToKeep) {
407+
weight = LongMath.saturatedAdd(weight, Caches.weigh(value));
408+
}
391409
} else {
392-
allValues.addAll(block.getValues(), block.getWeight());
410+
allValues.addAll(block.getValues());
411+
weight = LongMath.saturatedAdd(weight, block.getWeight());
393412
}
394413
}
395414

396-
cache.put(IterableCacheKey.INSTANCE, new MutatedBlocks<>(Block.mutatedBlock(allValues)));
415+
cache.put(
416+
IterableCacheKey.INSTANCE,
417+
new MutatedBlocks<>(Block.fromValues(allValues.build(), weight, null)));
397418
}
398419

399420
/**
@@ -484,21 +505,24 @@ private void appendHelper(List<T> newValues, long newWeight) {
484505
for (Block<T> block : blocks) {
485506
totalSize += block.getValues().size();
486507
}
487-
WeightedList<T> allValues = WeightedList.of(new ArrayList<>(totalSize), 0L);
508+
ImmutableList.Builder<T> allValues = ImmutableList.builderWithExpectedSize(totalSize);
509+
long weight = 0;
488510
for (Block<T> block : blocks) {
489-
allValues.addAll(block.getValues(), block.getWeight());
511+
allValues.addAll(block.getValues());
512+
weight = LongMath.saturatedAdd(weight, block.getWeight());
490513
}
491514
if (newWeight < 0) {
492-
if (newValues.size() == 1) {
493-
// Optimize weighing of the common value state as single single-element bag state.
494-
newWeight = Caches.weigh(newValues.get(0));
495-
} else {
496-
newWeight = Caches.weigh(newValues);
515+
newWeight = 0;
516+
for (T value : newValues) {
517+
newWeight = LongMath.saturatedAdd(newWeight, Caches.weigh(value));
497518
}
498519
}
499-
allValues.addAll(newValues, newWeight);
520+
allValues.addAll(newValues);
521+
weight = LongMath.saturatedAdd(weight, newWeight);
500522

501-
cache.put(IterableCacheKey.INSTANCE, new MutatedBlocks<>(Block.mutatedBlock(allValues)));
523+
cache.put(
524+
IterableCacheKey.INSTANCE,
525+
new MutatedBlocks<>(Block.fromValues(allValues.build(), weight, null)));
502526
}
503527

504528
class CachingStateIterator implements PrefetchableIterator<T> {
@@ -580,8 +604,7 @@ public boolean hasNext() {
580604
return false;
581605
}
582606
// Release the block while we are loading the next one.
583-
currentBlock =
584-
Block.fromValues(WeightedList.of(Collections.emptyList(), 0L), ByteString.EMPTY);
607+
currentBlock = Block.emptyBlock();
585608

586609
@Nullable Blocks<T> existing = cache.peek(IterableCacheKey.INSTANCE);
587610
boolean isFirstBlock = ByteString.EMPTY.equals(nextToken);

sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/SpannerChangeStreamErrorTest.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@
7070
import org.joda.time.Duration;
7171
import org.junit.After;
7272
import org.junit.Before;
73+
import org.junit.Ignore;
7374
import org.junit.Rule;
7475
import org.junit.Test;
7576
import org.junit.rules.ExpectedException;
@@ -115,6 +116,7 @@ public void tearDown() throws NoSuchFieldException, IllegalAccessException {
115116
}
116117

117118
@Test
119+
@Ignore("https://github.com/apache/beam/issues/37002 Re-enable skipped tests.")
118120
// Error code UNAVAILABLE is retried repeatedly until the RPC times out.
119121
public void testUnavailableExceptionRetries() throws InterruptedException {
120122
DirectOptions options = PipelineOptionsFactory.as(DirectOptions.class);
@@ -155,6 +157,7 @@ public void testUnavailableExceptionRetries() throws InterruptedException {
155157
}
156158

157159
@Test
160+
@Ignore("https://github.com/apache/beam/issues/37002 Re-enable skipped tests.")
158161
// Error code ABORTED is retried repeatedly until it times out.
159162
public void testAbortedExceptionRetries() throws InterruptedException {
160163
mockSpannerService.setExecuteStreamingSqlExecutionTime(
@@ -218,6 +221,7 @@ public void testUnknownExceptionDoesNotRetry() {
218221
}
219222

220223
@Test
224+
@Ignore("https://github.com/apache/beam/issues/37002 Re-enable skipped tests.")
221225
// Error code RESOURCE_EXHAUSTED is retried repeatedly.
222226
public void testResourceExhaustedRetry() {
223227
mockSpannerService.setExecuteStreamingSqlExecutionTime(
@@ -281,6 +285,7 @@ public void testResourceExhaustedRetryWithDefaultSettings() {
281285
}
282286

283287
@Test
288+
@Ignore("https://github.com/apache/beam/issues/37002 Re-enable skipped tests.")
284289
public void testInvalidRecordReceived() {
285290
final Timestamp startTimestamp = Timestamp.ofTimeSecondsAndNanos(0, 1000);
286291
final Timestamp endTimestamp =

sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import static org.apache.beam.sdk.io.jms.CommonJms.toSerializableFunction;
2626
import static org.apache.beam.sdk.io.jms.JmsIO.Writer.JMS_IO_PRODUCER_METRIC_NAME;
2727
import static org.apache.beam.sdk.io.jms.JmsIO.Writer.PUBLICATION_RETRIES_METRIC_NAME;
28+
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
2829
import static org.hamcrest.CoreMatchers.allOf;
2930
import static org.hamcrest.MatcherAssert.assertThat;
3031
import static org.hamcrest.Matchers.contains;
@@ -86,6 +87,7 @@
8687
import org.apache.beam.sdk.coders.Coder;
8788
import org.apache.beam.sdk.coders.SerializableCoder;
8889
import org.apache.beam.sdk.coders.StringUtf8Coder;
90+
import org.apache.beam.sdk.io.UnboundedSource;
8991
import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark;
9092
import org.apache.beam.sdk.io.jms.JmsIO.UnboundedJmsReader;
9193
import org.apache.beam.sdk.metrics.MetricNameFilter;
@@ -541,6 +543,16 @@ public void testSplitForTopic() throws Exception {
541543
assertEquals(1, splits.size());
542544
}
543545

546+
private boolean advanceWithRetry(UnboundedSource.UnboundedReader reader) throws IOException {
547+
for (int attempt = 0; attempt < 10; attempt++) {
548+
if (reader.advance()) {
549+
return true;
550+
}
551+
sleepUninterruptibly(java.time.Duration.ofMillis(100));
552+
}
553+
return false;
554+
}
555+
544556
@Test
545557
public void testCheckpointMark() throws Exception {
546558
// we are using no prefetch here
@@ -558,7 +570,7 @@ public void testCheckpointMark() throws Exception {
558570

559571
// consume 3 messages (NB: start already consumed the first message)
560572
for (int i = 0; i < 3; i++) {
561-
assertTrue(String.format("Failed at %d-th message", i), reader.advance());
573+
assertTrue(String.format("Failed at %d-th message", i), advanceWithRetry(reader));
562574
}
563575

564576
// the messages are still pending in the queue (no ACK yet)
@@ -572,7 +584,7 @@ public void testCheckpointMark() throws Exception {
572584

573585
// we read the 6 pending messages
574586
for (int i = 0; i < 6; i++) {
575-
assertTrue(String.format("Failed at %d-th message", i), reader.advance());
587+
assertTrue(String.format("Failed at %d-th message", i), advanceWithRetry(reader));
576588
}
577589

578590
// still 6 pending messages as we didn't finalize the checkpoint
@@ -592,8 +604,8 @@ public void testCheckpointMarkAndFinalizeSeparately() throws Exception {
592604
assertTrue(reader.start());
593605

594606
// consume 2 message (NB: start already consumed the first message)
595-
assertTrue(reader.advance());
596-
assertTrue(reader.advance());
607+
assertTrue(advanceWithRetry(reader));
608+
assertTrue(advanceWithRetry(reader));
597609

598610
// get checkpoint mark after consumed 4 messages
599611
CheckpointMark mark = reader.getCheckpointMark();
@@ -724,7 +736,7 @@ public void testCheckpointMarkSafety() throws Exception {
724736

725737
// consume half the messages (NB: start already consumed the first message)
726738
for (int i = 0; i < (messagesToProcess / 2) - 1; i++) {
727-
assertTrue(reader.advance());
739+
assertTrue(advanceWithRetry(reader));
728740
}
729741

730742
// the messages are still pending in the queue (no ACK yet)
@@ -738,7 +750,7 @@ public void testCheckpointMarkSafety() throws Exception {
738750
() -> {
739751
try {
740752
for (int i = 0; i < messagesToProcess / 2; i++) {
741-
assertTrue(reader.advance());
753+
assertTrue(advanceWithRetry(reader));
742754
}
743755
} catch (IOException ex) {
744756
throw new RuntimeException(ex);
@@ -877,7 +889,7 @@ public void testDiscardCheckpointMark() throws Exception {
877889

878890
// consume 3 more messages (NB: start already consumed the first message)
879891
for (int i = 0; i < 3; i++) {
880-
assertTrue(reader.advance());
892+
assertTrue(advanceWithRetry(reader));
881893
}
882894

883895
// the messages are still pending in the queue (no ACK yet)
@@ -891,7 +903,7 @@ public void testDiscardCheckpointMark() throws Exception {
891903

892904
// we read the 6 pending messages
893905
for (int i = 0; i < 6; i++) {
894-
assertTrue(reader.advance());
906+
assertTrue(advanceWithRetry(reader));
895907
}
896908

897909
// still 6 pending messages as we didn't finalize the checkpoint

0 commit comments

Comments
 (0)