Skip to content
This repository was archived by the owner on Nov 11, 2022. It is now read-only.

Commit c41d154

Browse files
jkffdavorbonaci
authored andcommitted
Continues unifying ReaderIterator and Source.Reader
* Flattens the ReaderIterator hierarchy a bit: removes Abstract{Bounded,}ReaderIterator, instead merges them all into LegacyReaderIterator directly. * Converts InMemoryReaderIterator, PubsubReaderIterator, UngroupedWindmillReaderIterator, ConcatIterator, and a couple of anonymous classes to the new interface. File-based and shuffle readers remain. * Extracts a common base class from Pubsub and UngroupedWindmill readers. * Converts most call sites that could use Source.Reader instead of ReaderIterator to use either Source.Reader, or the concrete class of the reader, to ease future conversion. ----Release Notes---- [] ------------- Created by MOE: https://github.com/google/moe MOE_MIGRATED_REVID=112728940
1 parent baa8e2f commit c41d154

30 files changed

+641
-361
lines changed

sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/AvroByteReader.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
2222
import com.google.cloud.dataflow.sdk.util.CoderUtils;
2323
import com.google.cloud.dataflow.sdk.util.WindowedValue;
24-
import com.google.cloud.dataflow.sdk.util.common.worker.AbstractBoundedReaderIterator;
2524
import com.google.cloud.dataflow.sdk.util.common.worker.NativeReader;
2625

2726
import org.apache.avro.Schema;
@@ -51,11 +50,11 @@ public AvroByteReader(String filename, @Nullable Long startPosition, @Nullable L
5150
}
5251

5352
@Override
54-
public NativeReaderIterator<T> iterator() throws IOException {
53+
public AvroByteFileIterator iterator() throws IOException {
5554
return new AvroByteFileIterator();
5655
}
5756

58-
class AvroByteFileIterator extends AbstractBoundedReaderIterator<T> {
57+
class AvroByteFileIterator extends LegacyReaderIterator<T> {
5958
private final LegacyReaderIterator<WindowedValue<ByteBuffer>> avroFileIterator;
6059

6160
public AvroByteFileIterator() throws IOException {

sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/AvroReader.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import com.google.cloud.dataflow.sdk.io.OffsetBasedSource;
2727
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
2828
import com.google.cloud.dataflow.sdk.util.WindowedValue;
29-
import com.google.cloud.dataflow.sdk.util.common.worker.AbstractBoundedReaderIterator;
3029
import com.google.cloud.dataflow.sdk.util.common.worker.NativeReader;
3130

3231
import org.apache.avro.generic.GenericRecord;
@@ -78,7 +77,7 @@ public AvroReader(String filename, @Nullable Long startPosition, @Nullable Long
7877
}
7978

8079
@Override
81-
public LegacyReaderIterator<WindowedValue<T>> iterator() throws IOException {
80+
public AvroFileIterator iterator() throws IOException {
8281
Long endPosition = this.endPosition;
8382
Long startPosition = this.startPosition;
8483
if (endPosition == null) {
@@ -99,7 +98,7 @@ public LegacyReaderIterator<WindowedValue<T>> iterator() throws IOException {
9998
return new AvroFileIterator((AvroSource.AvroReader<T>) reader);
10099
}
101100

102-
class AvroFileIterator extends AbstractBoundedReaderIterator<WindowedValue<T>> {
101+
class AvroFileIterator extends LegacyReaderIterator<WindowedValue<T>> {
103102
final AvroSource.AvroReader<T> reader;
104103
boolean hasStarted = false;
105104
long blockOffset = -1;

sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/BigQueryReader.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import com.google.cloud.dataflow.sdk.util.BigQueryTableRowIterator;
2626
import com.google.cloud.dataflow.sdk.util.Transport;
2727
import com.google.cloud.dataflow.sdk.util.WindowedValue;
28-
import com.google.cloud.dataflow.sdk.util.common.worker.AbstractBoundedReaderIterator;
2928
import com.google.cloud.dataflow.sdk.util.common.worker.NativeReader;
3029
import com.google.common.annotations.VisibleForTesting;
3130

@@ -113,8 +112,7 @@ public BigQueryReaderIterator iterator() throws IOException {
113112
* A ReaderIterator that yields TableRow objects for each row of a BigQuery table.
114113
*/
115114
@VisibleForTesting
116-
static class BigQueryReaderIterator
117-
extends AbstractBoundedReaderIterator<WindowedValue<TableRow>> {
115+
static class BigQueryReaderIterator extends LegacyReaderIterator<WindowedValue<TableRow>> {
118116
private BigQueryTableRowIterator rowIterator;
119117

120118
public BigQueryReaderIterator(TableReference tableRef, Bigquery bigQueryClient) {

sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/ConcatReader.java

Lines changed: 38 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,8 @@
3030
import com.google.cloud.dataflow.sdk.runners.worker.SourceTranslationUtils.DataflowReaderProgress;
3131
import com.google.cloud.dataflow.sdk.util.ExecutionContext;
3232
import com.google.cloud.dataflow.sdk.util.common.CounterSet;
33-
import com.google.cloud.dataflow.sdk.util.common.worker.AbstractBoundedReaderIterator;
3433
import com.google.cloud.dataflow.sdk.util.common.worker.NativeReader;
34+
import com.google.common.annotations.VisibleForTesting;
3535
import com.google.common.base.Preconditions;
3636

3737
import org.slf4j.Logger;
@@ -42,6 +42,8 @@
4242
import java.util.List;
4343
import java.util.NoSuchElementException;
4444

45+
import javax.annotation.Nullable;
46+
4547
/**
4648
* A {@link NativeReader} that reads elements from a given set of encoded {@link Source}s. Creates
4749
* {@link NativeReader}s for sources lazily, i.e. only when elements from the particular
@@ -96,7 +98,7 @@ public Iterator<Source> getSources() {
9698
}
9799

98100
@Override
99-
public LegacyReaderIterator<T> iterator() throws IOException {
101+
public ConcatIterator<T> iterator() throws IOException {
100102
return new ConcatIterator<T>(
101103
registry,
102104
options,
@@ -106,16 +108,16 @@ public LegacyReaderIterator<T> iterator() throws IOException {
106108
sources);
107109
}
108110

109-
private static class ConcatIterator<T> extends AbstractBoundedReaderIterator<T> {
111+
@VisibleForTesting
112+
static class ConcatIterator<T> extends NativeReaderIterator<T> {
110113
private int currentIteratorIndex = -1;
111-
private LegacyReaderIterator<T> currentIterator = null;
114+
@Nullable private NativeReaderIterator<T> currentIterator = null;
112115
private final List<Source> sources;
113116
private final PipelineOptions options;
114117
private final ExecutionContext executionContext;
115118
private final CounterSet.AddCounterMutator addCounterMutator;
116119
private final String operationName;
117120
private final OffsetRangeTracker rangeTracker;
118-
private boolean isAtFirstRecordInCurrentSource = true;
119121
private final ReaderFactory.Registry registry;
120122

121123
public ConcatIterator(
@@ -135,16 +137,29 @@ public ConcatIterator(
135137
}
136138

137139
@Override
138-
protected boolean hasNextImpl() throws IOException {
139-
for (;;) {
140-
if (currentIterator != null && currentIterator.hasNext()) {
141-
break;
142-
}
140+
public boolean start() throws IOException {
141+
return advance();
142+
}
143143

144+
@Override
145+
public boolean advance() throws IOException {
146+
while (true) {
147+
// Invariant: we call currentIterator.start() immediately when opening an iterator
148+
// (below). So if currentIterator != null, then start() has already been called on it.
149+
if (currentIterator != null && currentIterator.advance()) {
150+
// Happy case: current iterator has a next record.
151+
return true;
152+
}
153+
// Now current iterator is either non-existent or exhausted.
154+
// Close it, and try opening a new one.
144155
if (currentIterator != null) {
145156
currentIterator.close();
157+
currentIterator = null;
146158
}
147159

160+
if (!rangeTracker.tryReturnRecordAt(true, currentIteratorIndex + 1)) {
161+
return false;
162+
}
148163
currentIteratorIndex++;
149164
if (currentIteratorIndex == sources.size()) {
150165
// All sources were read.
@@ -158,20 +173,26 @@ protected boolean hasNextImpl() throws IOException {
158173
(NativeReader<T>)
159174
registry.create(
160175
currentSource, options, executionContext, addCounterMutator, operationName);
161-
currentIterator = (LegacyReaderIterator) currentReader.iterator();
162-
isAtFirstRecordInCurrentSource = true;
176+
currentIterator = currentReader.iterator();
163177
} catch (Exception e) {
164178
throw new IOException("Failed to create a reader for source: " + currentSource, e);
165179
}
180+
if (!currentIterator.start()) {
181+
currentIterator.close();
182+
currentIterator = null;
183+
continue;
184+
}
185+
// Happy case: newly opened iterator has a first record.
186+
return true;
166187
}
167-
168-
return rangeTracker.tryReturnRecordAt(isAtFirstRecordInCurrentSource, currentIteratorIndex);
169188
}
170189

171190
@Override
172-
protected T nextImpl() throws IOException, NoSuchElementException {
173-
isAtFirstRecordInCurrentSource = false;
174-
return currentIterator.next();
191+
public T getCurrent() throws NoSuchElementException {
192+
if (currentIterator == null) {
193+
throw new NoSuchElementException();
194+
}
195+
return currentIterator.getCurrent();
175196
}
176197

177198
@Override

sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/GroupingShuffleReader.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@
4040
import com.google.cloud.dataflow.sdk.util.common.ElementByteSizeObservableIterator;
4141
import com.google.cloud.dataflow.sdk.util.common.Reiterable;
4242
import com.google.cloud.dataflow.sdk.util.common.Reiterator;
43-
import com.google.cloud.dataflow.sdk.util.common.worker.AbstractBoundedReaderIterator;
4443
import com.google.cloud.dataflow.sdk.util.common.worker.BatchingShuffleEntryReader;
4544
import com.google.cloud.dataflow.sdk.util.common.worker.GroupingShuffleEntryIterator;
4645
import com.google.cloud.dataflow.sdk.util.common.worker.KeyGroupedShuffleEntries;
@@ -193,7 +192,7 @@ final GroupingShuffleReaderIterator<K, V> iterator(ShuffleEntryReader reader) {
193192
*/
194193
@VisibleForTesting
195194
static final class GroupingShuffleReaderIterator<K, V>
196-
extends AbstractBoundedReaderIterator<WindowedValue<KV<K, Reiterable<V>>>> {
195+
extends LegacyReaderIterator<WindowedValue<KV<K, Reiterable<V>>>> {
197196
// The enclosing GroupingShuffleReader.
198197
private final GroupingShuffleReader<K, V> parentReader;
199198

sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/InMemoryReader.java

Lines changed: 51 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -20,22 +20,25 @@
2020
import static com.google.cloud.dataflow.sdk.runners.worker.SourceTranslationUtils.cloudPositionToReaderPosition;
2121
import static com.google.cloud.dataflow.sdk.runners.worker.SourceTranslationUtils.cloudProgressToReaderProgress;
2222
import static com.google.cloud.dataflow.sdk.runners.worker.SourceTranslationUtils.splitRequestToApproximateSplitRequest;
23-
import static java.lang.Math.min;
23+
import static com.google.common.base.MoreObjects.firstNonNull;
24+
import static com.google.common.base.Preconditions.checkArgument;
2425

2526
import com.google.api.services.dataflow.model.ApproximateReportedProgress;
2627
import com.google.cloud.dataflow.sdk.coders.Coder;
28+
import com.google.cloud.dataflow.sdk.coders.CoderException;
2729
import com.google.cloud.dataflow.sdk.io.range.OffsetRangeTracker;
2830
import com.google.cloud.dataflow.sdk.util.CoderUtils;
2931
import com.google.cloud.dataflow.sdk.util.StringUtils;
30-
import com.google.cloud.dataflow.sdk.util.common.worker.AbstractBoundedReaderIterator;
3132
import com.google.cloud.dataflow.sdk.util.common.worker.NativeReader;
3233
import com.google.common.annotations.VisibleForTesting;
34+
import com.google.common.base.Preconditions;
3335

3436
import org.slf4j.Logger;
3537
import org.slf4j.LoggerFactory;
3638

3739
import java.io.IOException;
3840
import java.util.List;
41+
import java.util.NoSuchElementException;
3942

4043
import javax.annotation.Nullable;
4144

@@ -52,26 +55,20 @@ public class InMemoryReader<T> extends NativeReader<T> {
5255
final int endIndex;
5356
final Coder<T> coder;
5457

55-
public InMemoryReader(List<String> encodedElements, @Nullable Long startIndex,
56-
@Nullable Long endIndex, Coder<T> coder) {
58+
public InMemoryReader(
59+
List<String> encodedElements,
60+
@Nullable Integer startIndex,
61+
@Nullable Integer endIndex,
62+
Coder<T> coder) {
63+
checkNotNull(encodedElements);
5764
this.encodedElements = encodedElements;
5865
int maxIndex = encodedElements.size();
59-
if (startIndex == null) {
60-
this.startIndex = 0;
61-
} else {
62-
if (startIndex < 0) {
63-
throw new IllegalArgumentException("start index should be >= 0");
64-
}
65-
this.startIndex = (int) min(startIndex, maxIndex);
66-
}
67-
if (endIndex == null) {
68-
this.endIndex = maxIndex;
69-
} else {
70-
if (endIndex < this.startIndex) {
71-
throw new IllegalArgumentException("end index should be >= start index");
72-
}
73-
this.endIndex = (int) min(endIndex, maxIndex);
74-
}
66+
this.startIndex = Math.min(maxIndex, firstNonNull(startIndex, 0));
67+
this.endIndex = Math.min(maxIndex, firstNonNull(endIndex, maxIndex));
68+
checkArgument(this.startIndex >= 0, "negative start index: " + startIndex);
69+
checkArgument(
70+
this.endIndex >= this.startIndex,
71+
"end index before start: [" + this.startIndex + ", " + this.endIndex + ")");
7572
this.coder = coder;
7673
}
7774

@@ -88,24 +85,45 @@ public double getTotalParallelism() {
8885
/**
8986
* A ReaderIterator that yields an in-memory list of elements.
9087
*/
91-
class InMemoryReaderIterator extends AbstractBoundedReaderIterator<T> {
88+
class InMemoryReaderIterator extends NativeReaderIterator<T> {
9289
@VisibleForTesting
9390
OffsetRangeTracker tracker;
94-
private int nextIndex;
91+
@Nullable private Integer lastReturnedIndex;
92+
private T current;
9593

9694
public InMemoryReaderIterator() {
9795
this.tracker = new OffsetRangeTracker(startIndex, endIndex);
98-
this.nextIndex = startIndex;
96+
this.lastReturnedIndex = null;
9997
}
10098

10199
@Override
102-
protected boolean hasNextImpl() {
103-
return tracker.tryReturnRecordAt(true, nextIndex);
100+
public boolean start() throws IOException {
101+
Preconditions.checkState(lastReturnedIndex == null, "Already started");
102+
if (!tracker.tryReturnRecordAt(true, startIndex)) {
103+
return false;
104+
}
105+
current = decode(encodedElements.get(startIndex));
106+
lastReturnedIndex = startIndex;
107+
return true;
104108
}
105109

106110
@Override
107-
protected T nextImpl() throws IOException {
108-
String encodedElementString = encodedElements.get(nextIndex++);
111+
public boolean advance() throws IOException {
112+
Preconditions.checkNotNull(lastReturnedIndex, "Not started");
113+
if (!tracker.tryReturnRecordAt(true, (long) lastReturnedIndex + 1)) {
114+
return false;
115+
}
116+
++lastReturnedIndex;
117+
current = decode(encodedElements.get(lastReturnedIndex));
118+
return true;
119+
}
120+
121+
@Override
122+
public T getCurrent() throws NoSuchElementException {
123+
return current;
124+
}
125+
126+
private T decode(String encodedElementString) throws CoderException {
109127
// TODO: Replace with the real encoding used by the
110128
// front end, when we know what it is.
111129
byte[] encodedElement = StringUtils.jsonStringToByteArray(encodedElementString);
@@ -115,12 +133,15 @@ protected T nextImpl() throws IOException {
115133

116134
@Override
117135
public Progress getProgress() {
136+
if (lastReturnedIndex == null) {
137+
return null;
138+
}
118139
// Currently we assume that only a record index position is reported as
119140
// current progress. An implementer can override this method to update
120141
// other metrics, e.g. completion percentage or remaining time.
121142
com.google.api.services.dataflow.model.Position currentPosition =
122143
new com.google.api.services.dataflow.model.Position();
123-
currentPosition.setRecordIndex((long) nextIndex);
144+
currentPosition.setRecordIndex((long) lastReturnedIndex);
124145

125146
ApproximateReportedProgress progress = new ApproximateReportedProgress();
126147
progress.setPosition(currentPosition);
@@ -130,7 +151,8 @@ public Progress getProgress() {
130151

131152
@Override
132153
public double getRemainingParallelism() {
133-
return tracker.getStopPosition() - nextIndex;
154+
// Use the starting index if no elements have yet been returned.
155+
return tracker.getStopPosition() - firstNonNull(lastReturnedIndex, startIndex);
134156
}
135157

136158
@Override

sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/InMemoryReaderFactory.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616

1717
package com.google.cloud.dataflow.sdk.runners.worker;
1818

19-
import static com.google.cloud.dataflow.sdk.util.Structs.getLong;
19+
import static com.google.cloud.dataflow.sdk.util.Structs.getInt;
2020
import static com.google.cloud.dataflow.sdk.util.Structs.getStrings;
2121

2222
import com.google.cloud.dataflow.sdk.coders.Coder;
@@ -51,7 +51,8 @@ public NativeReader<?> create(
5151
<T> InMemoryReader<T> create(CloudObject spec, Coder<T> coder) throws Exception {
5252
return new InMemoryReader<>(
5353
getStrings(spec, PropertyNames.ELEMENTS, Collections.<String>emptyList()),
54-
getLong(spec, PropertyNames.START_INDEX, null),
55-
getLong(spec, PropertyNames.END_INDEX, null), coder);
54+
getInt(spec, PropertyNames.START_INDEX, null),
55+
getInt(spec, PropertyNames.END_INDEX, null),
56+
coder);
5657
}
5758
}

0 commit comments

Comments
 (0)