Skip to content

Commit 49650ce

Browse files
committed
fix: broken AVRO output when using the excludedColumns flag
1 parent 362c8f5 commit 49650ce

File tree

8 files changed

+128
-47
lines changed

8 files changed

+128
-47
lines changed

dbeam-core/src/main/java/com/spotify/dbeam/args/QueryBuilder.java

Lines changed: 46 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -125,23 +125,27 @@ public int hashCode() {
125125
private final List<String> whereConditions;
126126
private final Optional<String> limitStr;
127127
private final Optional<ImmutableSet<String>> excludedColumns;
128+
private final Optional<String> splitColumn;
128129

129130
private QueryBuilder(final QueryBase base) {
130131
this.base = base;
131132
this.limitStr = Optional.empty();
132133
this.whereConditions = ImmutableList.of();
133134
this.excludedColumns = Optional.empty();
135+
this.splitColumn = Optional.empty();
134136
}
135137

136138
private QueryBuilder(
137139
final QueryBase base,
138140
final List<String> whereConditions,
139141
final Optional<String> limitStr,
140-
final Optional<ImmutableSet<String>> excludedColumns) {
142+
final Optional<ImmutableSet<String>> excludedColumns,
143+
final Optional<String> splitColumn) {
141144
this.base = base;
142145
this.whereConditions = whereConditions;
143146
this.limitStr = limitStr;
144147
this.excludedColumns = excludedColumns;
148+
this.splitColumn = splitColumn;
145149
}
146150

147151
public static QueryBuilder fromTablename(final String tableName) {
@@ -162,20 +166,29 @@ public QueryBuilder withPartitionCondition(
162166
createSqlPartitionCondition(partitionColumn, startPointIncl, endPointExcl)))
163167
.collect(Collectors.toList()),
164168
this.limitStr,
165-
this.excludedColumns);
169+
this.excludedColumns,
170+
this.splitColumn);
171+
}
172+
173+
public QueryBuilder withSplitColumn(final Optional<String> splitColumn) {
174+
return new QueryBuilder(
175+
this.base, this.whereConditions, this.limitStr, this.excludedColumns, splitColumn);
166176
}
167177

168178
public QueryBuilder withExcludedColumns(final Optional<ImmutableSet<String>> excludedColumns) {
169179
if (excludedColumns.isPresent() && this.base instanceof UserQueryBase) {
170180
UserQueryBase userQueryBase = (UserQueryBase) this.base;
171-
String newSqlQuery = rebuildSelectClause(userQueryBase.userSqlQuery, excludedColumns.get());
181+
String newSqlQuery =
182+
rebuildSelectClause(userQueryBase.userSqlQuery, excludedColumns.get(), this.splitColumn);
172183
return new QueryBuilder(
173184
new UserQueryBase(newSqlQuery, userQueryBase.selectClause),
174185
this.whereConditions,
175186
this.limitStr,
176-
excludedColumns);
187+
excludedColumns,
188+
this.splitColumn);
177189
} else {
178-
return new QueryBuilder(this.base, this.whereConditions, this.limitStr, excludedColumns);
190+
return new QueryBuilder(
191+
this.base, this.whereConditions, this.limitStr, excludedColumns, this.splitColumn);
179192
}
180193
}
181194

@@ -200,7 +213,8 @@ public QueryBuilder withParallelizationCondition(
200213
partitionColumn, startPointIncl, endPoint, isEndPointExcl)))
201214
.collect(Collectors.toList()),
202215
this.limitStr,
203-
this.excludedColumns);
216+
this.excludedColumns,
217+
this.splitColumn);
204218
}
205219

206220
private static String createSqlSplitCondition(
@@ -235,7 +249,7 @@ private static String removeTrailingSymbols(String sqlQuery) {
235249
}
236250

237251
private static String rebuildSelectClause(
238-
String sqlQuery, ImmutableSet<String> excludedColumns) {
252+
String sqlQuery, ImmutableSet<String> excludedColumns, Optional<String> splitColumn) {
239253
String lowerCaseQuery = sqlQuery.toLowerCase();
240254
int selectIdx = lowerCaseQuery.indexOf("select");
241255
int fromIdx = lowerCaseQuery.indexOf("from");
@@ -247,12 +261,25 @@ private static String rebuildSelectClause(
247261

248262
String selectClause = sqlQuery.substring(selectIdx + "select".length(), fromIdx).trim();
249263
String[] columns = selectClause.split(",");
250-
List<String> newColumns =
264+
List<String> newColumns =
251265
Stream.of(columns)
252266
.map(String::trim)
253-
.filter(column -> !excludedColumns.contains(column))
267+
.filter(
268+
column -> {
269+
if (splitColumn.isPresent() && column.equals(splitColumn.get())) {
270+
return true;
271+
}
272+
return !excludedColumns.contains(column);
273+
})
254274
.collect(Collectors.toList());
255275

276+
if (splitColumn.isPresent()) {
277+
boolean exists = newColumns.stream().anyMatch(c -> c.equals(splitColumn.get()));
278+
if (!exists) {
279+
newColumns.add(splitColumn.get());
280+
}
281+
}
282+
256283
if (newColumns.isEmpty()) {
257284
return "SELECT * " + sqlQuery.substring(fromIdx);
258285
} else {
@@ -265,7 +292,8 @@ public QueryBuilder withLimit(long limit) {
265292
this.base,
266293
this.whereConditions,
267294
Optional.of(String.format(" LIMIT %d", limit)),
268-
this.excludedColumns);
295+
this.excludedColumns,
296+
this.splitColumn);
269297
}
270298

271299
@Override
@@ -294,9 +322,10 @@ public QueryBuilder resolveSelect(final Connection connection) throws SQLExcepti
294322
if (this.excludedColumns.isPresent()) {
295323
String queryToCheck = this.base.getBaseSql() + " AND 1=0";
296324
List<String> columns = getColumnsFromQuery(connection, queryToCheck);
297-
List<String> filteredColumns = columns.stream()
298-
.filter(c -> !this.excludedColumns.get().contains(c))
299-
.collect(Collectors.toList());
325+
List<String> filteredColumns =
326+
columns.stream()
327+
.filter(c -> !this.excludedColumns.get().contains(c))
328+
.collect(Collectors.toList());
300329

301330
if (filteredColumns.isEmpty()) {
302331
throw new SQLException("All columns excluded for query: " + queryToCheck);
@@ -307,7 +336,8 @@ public QueryBuilder resolveSelect(final Connection connection) throws SQLExcepti
307336
this.base.withSelect(selectClause),
308337
this.whereConditions,
309338
this.limitStr,
310-
this.excludedColumns);
339+
this.excludedColumns,
340+
this.splitColumn);
311341
}
312342
return this;
313343
}
@@ -352,6 +382,7 @@ public QueryBuilder generateQueryToGetLimitsOfSplitColumn(
352382
base.withSelect(selectMinMax),
353383
this.whereConditions,
354384
this.limitStr,
355-
this.excludedColumns);
385+
this.excludedColumns,
386+
this.splitColumn);
356387
}
357388
}

dbeam-core/src/main/java/com/spotify/dbeam/args/QueryBuilderArgs.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,9 @@ public String sqlQueryWithLimitOne() {
127127
*/
128128
public List<String> buildQueries(final Connection connection) throws SQLException {
129129
QueryBuilder queryBuilder = this.baseSqlQuery();
130+
if (this.splitColumn().isPresent()) {
131+
queryBuilder = queryBuilder.withSplitColumn(this.splitColumn());
132+
}
130133
if (this.excludedColumns().isPresent()) {
131134
queryBuilder = queryBuilder.withExcludedColumns(this.excludedColumns());
132135
queryBuilder = queryBuilder.resolveSelect(connection);

dbeam-core/src/main/java/com/spotify/dbeam/avro/JdbcAvroIO.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,7 @@ private static class JdbcAvroWriter extends FileBasedSink.Writer<Void, String> {
139139
private Connection connection;
140140
private JdbcAvroMetering metering;
141141
private CountingOutputStream countingOutputStream;
142+
private Schema schema;
142143

143144
JdbcAvroWriter(
144145
FileBasedSink.WriteOperation<Void, String> writeOperation,
@@ -160,7 +161,7 @@ protected void prepareWrite(final WritableByteChannel channel) throws Exception
160161
LOGGER.info("jdbcavroio : Preparing write...");
161162
connection = jdbcAvroArgs.jdbcConnectionConfiguration().createConnection();
162163
final Void destination = getDestination();
163-
final Schema schema = dynamicDestinations.getSchema(destination);
164+
this.schema = dynamicDestinations.getSchema(destination);
164165
dataFileWriter =
165166
new DataFileWriter<>(new GenericDatumWriter<GenericRecord>(schema))
166167
.setCodec(jdbcAvroArgs.getCodecFactory())
@@ -206,6 +207,7 @@ public void write(final String query) throws Exception {
206207
try (ResultSet resultSet = executeQuery(query)) {
207208
metering.startWriteMeter();
208209
final JdbcAvroRecordConverter converter = JdbcAvroRecordConverter.create(resultSet,
210+
this.schema,
209211
this.jdbcAvroArgs.arrayMode(), this.jdbcAvroArgs.nullableArrayItems());
210212
while (resultSet.next()) {
211213
dataFileWriter.appendEncoded(converter.convertResultSetIntoAvroBytes());

dbeam-core/src/main/java/com/spotify/dbeam/avro/JdbcAvroRecordConverter.java

Lines changed: 23 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -27,52 +27,60 @@
2727
import java.sql.ResultSetMetaData;
2828
import java.sql.SQLException;
2929
import java.util.UUID;
30+
import org.apache.avro.Schema;
3031
import org.apache.avro.io.BinaryEncoder;
3132
import org.apache.avro.io.EncoderFactory;
3233

3334
public class JdbcAvroRecordConverter {
3435

3536
private final JdbcAvroRecord.SqlFunction<ResultSet, Object>[] mappings;
36-
private final int columnCount;
3737
private final ResultSet resultSet;
3838
private final EncoderFactory encoderFactory = EncoderFactory.get();
3939
private final boolean nullableArrayItems;
40+
private final Schema schema;
4041

4142
public JdbcAvroRecordConverter(
4243
final JdbcAvroRecord.SqlFunction<ResultSet, Object>[] mappings,
43-
final int columnCount,
4444
final ResultSet resultSet,
45-
final boolean nullableArrayItems) {
45+
final boolean nullableArrayItems,
46+
final Schema schema) {
4647
this.mappings = mappings;
47-
this.columnCount = columnCount;
4848
this.resultSet = resultSet;
4949
this.nullableArrayItems = nullableArrayItems;
50+
this.schema = schema;
5051
}
5152

5253
public static JdbcAvroRecordConverter create(final ResultSet resultSet,
54+
final Schema schema,
5355
final String arrayMode,
5456
final boolean nullableArrayItems)
5557
throws SQLException {
5658
return new JdbcAvroRecordConverter(
57-
computeAllMappings(resultSet, arrayMode),
58-
resultSet.getMetaData().getColumnCount(),
59+
computeAllMappings(resultSet, schema, arrayMode),
5960
resultSet,
60-
nullableArrayItems);
61+
nullableArrayItems,
62+
schema);
6163
}
6264

6365
@SuppressWarnings("unchecked")
6466
static JdbcAvroRecord.SqlFunction<ResultSet, Object>[] computeAllMappings(
65-
final ResultSet resultSet, final String arrayMode)
67+
final ResultSet resultSet, final Schema schema, final String arrayMode)
6668
throws SQLException {
6769
final ResultSetMetaData meta = resultSet.getMetaData();
68-
final int columnCount = meta.getColumnCount();
70+
final int columnCount = schema.getFields().size();
6971

7072
final JdbcAvroRecord.SqlFunction<ResultSet, Object>[] mappings =
7173
(JdbcAvroRecord.SqlFunction<ResultSet, Object>[])
72-
new JdbcAvroRecord.SqlFunction<?, ?>[columnCount + 1];
74+
new JdbcAvroRecord.SqlFunction<?, ?>[columnCount];
7375

74-
for (int i = 1; i <= columnCount; i++) {
75-
mappings[i] = JdbcAvroRecord.computeMapping(meta, i, arrayMode);
76+
for (int i = 0; i < columnCount; i++) {
77+
Schema.Field field = schema.getFields().get(i);
78+
String columnName = field.getProp("columnName");
79+
if (columnName == null) {
80+
columnName = field.name();
81+
}
82+
int columnIndex = resultSet.findColumn(columnName);
83+
mappings[i] = JdbcAvroRecord.computeMapping(meta, columnIndex, arrayMode);
7684
}
7785
return mappings;
7886
}
@@ -100,16 +108,16 @@ byte[] getBufffer() {
100108
* @throws IOException in case binary encoding fails
101109
*/
102110
public ByteBuffer convertResultSetIntoAvroBytes() throws SQLException, IOException {
103-
final MyByteArrayOutputStream out = new MyByteArrayOutputStream(columnCount * 64);
111+
final MyByteArrayOutputStream out = new MyByteArrayOutputStream(mappings.length * 64);
104112
binaryEncoder = encoderFactory.directBinaryEncoder(out, binaryEncoder);
105-
for (int i = 1; i <= columnCount; i++) {
113+
for (int i = 0; i < mappings.length; i++) {
106114
final Object value = mappings[i].apply(resultSet);
107115
if (value == null || resultSet.wasNull()) {
108116
binaryEncoder.writeIndex(0);
109117
binaryEncoder.writeNull();
110118
} else {
111119
binaryEncoder.writeIndex(1);
112-
writeValue(value, resultSet.getMetaData().getColumnName(i), binaryEncoder);
120+
writeValue(value, schema.getFields().get(i).name(), binaryEncoder);
113121
}
114122
}
115123
binaryEncoder.flush();

dbeam-core/src/test/java/com/spotify/dbeam/TestHelper.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ public static void mockArrayColumn(ResultSetMetaData meta, ResultSet resultSet,
7474
throws SQLException {
7575
mockResultSetMeta(meta, columnIdx, Types.ARRAY, columnName, "java.sql.Array",
7676
columnTypeName);
77+
when(resultSet.findColumn(columnName)).thenReturn(columnIdx);
7778
Array res1;
7879
if (array1 == null) {
7980
res1 = null;

dbeam-core/src/test/java/com/spotify/dbeam/args/QueryBuilderArgsTest.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -306,6 +306,26 @@ public void shouldConfigureExcludedColumnsWithTable() throws IOException, SQLExc
306306
Assert.assertTrue(query.contains("PRICE"));
307307
}
308308

309+
@Test
310+
public void shouldCreateQueriesWithExplicitSelectAndExcludeColumnsAndMissingSplitColumn()
311+
throws IOException, SQLException {
312+
Path sqlPath =
313+
TestHelper.createTmpDirPath("jdbc-export-args-test").resolve("explicit_select_exclude.sql");
314+
Files.write(
315+
sqlPath,
316+
"SELECT COF_NAME FROM COFFEES".getBytes(StandardCharsets.UTF_8));
317+
318+
final QueryBuilderArgs actual =
319+
parseOptions(
320+
String.format(
321+
"--connectionUrl=jdbc:postgresql://some_db "
322+
+ "--sqlFile=%s --splitColumn=TOTAL --queryParallelism=5 "
323+
+ "--excludeColumns=COF_NAME",
324+
sqlPath.toString()));
325+
326+
actual.buildQueries(connection);
327+
}
328+
309329
private QueryBuilderArgs parseOptions(String cmdLineArgs) throws IOException {
310330
JdbcExportPipelineOptions opts = commandLineToOptions(cmdLineArgs);
311331
return JdbcExportArgsFactory.createQueryArgs(opts);

dbeam-core/src/test/java/com/spotify/dbeam/avro/JdbcAvroRecordTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -197,7 +197,8 @@ public void shouldEncodeResultSetToValidAvro()
197197
JdbcAvroSchema.createAvroSchema(
198198
rs, "dbeam_generated", "connection", Optional.empty(), "doc",
199199
false, arrayMode, false, Optional.empty());
200-
final JdbcAvroRecordConverter converter = JdbcAvroRecordConverter.create(rs, arrayMode, false);
200+
final JdbcAvroRecordConverter converter = JdbcAvroRecordConverter.create(rs, schema,
201+
arrayMode, false);
201202
final DataFileWriter<GenericRecord> dataFileWriter =
202203
new DataFileWriter<>(new GenericDatumWriter<>(schema));
203204
final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();

0 commit comments

Comments (0)