Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 41 additions & 31 deletions dbeam-core/src/main/java/com/spotify/dbeam/avro/JdbcAvroSchema.java
Original file line number Diff line number Diff line change
Expand Up @@ -163,42 +163,52 @@ private static SchemaBuilder.FieldAssembler<Schema> createAvroFields(
final String typeName = JDBCType.valueOf(columnType).getName();
final String columnClassName = meta.getColumnClassName(i);
final String columnTypeName = meta.getColumnTypeName(i);
SchemaBuilder.FieldBuilder<Schema> field =
builder
.name(normalizeForAvro(columnName))
.doc(String.format("From sqlType %d %s (%s)", columnType, typeName, columnClassName))
.prop("columnName", columnName)
.prop("sqlCode", String.valueOf(columnType))
.prop("typeName", typeName)
.prop("columnClassName", columnClassName);

if (columnTypeName != null) {
field = field.prop("columnTypeName", columnTypeName);
}
try {
SchemaBuilder.FieldBuilder<Schema> field =
builder
.name(normalizeForAvro(columnName))
.doc(String.format(
"From sqlType %d %s (%s)", columnType, typeName, columnClassName))
.prop("columnName", columnName)
.prop("sqlCode", String.valueOf(columnType))
.prop("typeName", typeName)
.prop("columnClassName", columnClassName);

final SchemaBuilder.BaseTypeBuilder<
SchemaBuilder.UnionAccumulator<SchemaBuilder.NullDefault<Schema>>>
fieldSchemaBuilder = field.type().unionOf().nullBuilder().endNull().and();
if (columnTypeName != null) {
field = field.prop("columnTypeName", columnTypeName);
}

Array arrayInstance =
resultSet.isFirst() && columnType == ARRAY
&& arrayMode.equals(ArrayHandlingMode.TypedMetaFromFirstRow)
? resultSet.getArray(i) : null;
final SchemaBuilder.BaseTypeBuilder<
SchemaBuilder.UnionAccumulator<SchemaBuilder.NullDefault<Schema>>>
fieldSchemaBuilder = field.type().unionOf().nullBuilder().endNull().and();

final SchemaBuilder.UnionAccumulator<SchemaBuilder.NullDefault<Schema>> schemaFieldAssembler =
buildAvroFieldType(
columnName,
columnType,
arrayInstance,
meta.getPrecision(i),
columnClassName,
columnTypeName,
useLogicalTypes,
arrayMode,
nullableArrayItems,
fieldSchemaBuilder);
Array arrayInstance =
resultSet.isFirst() && columnType == ARRAY
&& arrayMode.equals(ArrayHandlingMode.TypedMetaFromFirstRow)
? resultSet.getArray(i) : null;

final SchemaBuilder.UnionAccumulator<
SchemaBuilder.NullDefault<Schema>> schemaFieldAssembler =
buildAvroFieldType(
columnName,
columnType,
arrayInstance,
meta.getPrecision(i),
columnClassName,
columnTypeName,
useLogicalTypes,
arrayMode,
nullableArrayItems,
fieldSchemaBuilder);

schemaFieldAssembler.endUnion().nullDefault();
schemaFieldAssembler.endUnion().nullDefault();
} catch (Exception e) {
throw new RuntimeException(String.format(
"Failed to build Avro schema for sqlType %d %s [%s, %s]",
columnType, typeName, columnClassName, columnTypeName
), e);
}
Comment on lines +206 to +211
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't mind the idea. I think in the new version of dbeam though the exceptions won't be as bad, but wrapping the code and reporting which field had an issue could still be useful anyways. Maybe it would look cleaner with a new helper function such as convertField(), then in createAvroFields() in the loop, we call convertField() on each field and that is wrapped in try-catch block.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice idea! will do

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ugh, tried out the helper function and i think it was overall messier because there's like 5 variables we need to pass in per field, and extra 2 containing state from the outer loop. might be more trouble than it's worth :(

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok fair enough, I'm fine with just wrapping the whole code block in try-catch then

}
return builder;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,22 +127,24 @@ public void shouldThrowOnNonSupportedTypes() throws SQLException {
final ResultSet resultSet = buildMockResultSet(Types.STRUCT);
RuntimeException thrown = Assert.assertThrows(RuntimeException.class,
() -> createAvroSchemaForSingleField(resultSet, false));
Assert.assertEquals("STRUCT type is not supported", thrown.getMessage());
Assert.assertEquals("STRUCT type is not supported", thrown.getCause().getMessage());

final ResultSet resultSet2 = buildMockResultSet(Types.REF);
RuntimeException thrown2 = Assert.assertThrows(RuntimeException.class,
() -> createAvroSchemaForSingleField(resultSet2, false));
Assert.assertEquals("REF and REF_CURSOR type are not supported", thrown2.getMessage());
Assert.assertEquals(
"REF and REF_CURSOR type are not supported", thrown2.getCause().getMessage());

final ResultSet resultSet3 = buildMockResultSet(Types.REF_CURSOR);
RuntimeException thrown3 = Assert.assertThrows(RuntimeException.class,
() -> createAvroSchemaForSingleField(resultSet3, false));
Assert.assertEquals("REF and REF_CURSOR type are not supported", thrown3.getMessage());
Assert.assertEquals(
"REF and REF_CURSOR type are not supported", thrown3.getCause().getMessage());

final ResultSet resultSet4 = buildMockResultSet(Types.DATALINK);
RuntimeException thrown4 = Assert.assertThrows(RuntimeException.class,
() -> createAvroSchemaForSingleField(resultSet4, false));
Assert.assertEquals("DATALINK type is not supported", thrown4.getMessage());
Assert.assertEquals("DATALINK type is not supported", thrown4.getCause().getMessage());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ public void shouldThrowOnInvalidArrayColumnTypeName() throws SQLException {
() -> JdbcAvroSchema.createAvroSchema(resultSet, "ns", "conn_url",
Optional.empty(), "doc", true, arrayMode, nullableArrayItems));
Assert.assertEquals("columnName=array_field_text columnTypeName=text should start with '_'",
thrown.getMessage());
thrown.getCause().getMessage());
}

@Test
Expand All @@ -366,6 +366,6 @@ public void shouldThrowOnNotSupportedArrayColumnTypeName() throws SQLException {
Optional.empty(), "doc", true, arrayMode, nullableArrayItems));
Assert.assertEquals(
"columnName=array_field_text Postgres type 'not_supported' is not supported",
thrown.getMessage());
thrown.getCause().getMessage());
}
}