Skip to content

Commit

Permalink
Merge pull request #11 from fivetran/handle-unknown-metadata-field-type
Browse files Browse the repository at this point in the history
feature(table_map_event_data_serializer): add support of ColumnVisibility metadata and handle unknown metadata field type
  • Loading branch information
zekail authored Oct 29, 2021
2 parents ded5e80 + f391a50 commit 2cbc106
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public class TableMapEventMetadata implements EventData {
private Map<Integer, Integer> primaryKeysWithPrefix;
private DefaultCharset enumAndSetDefaultCharset;
private List<Integer> enumAndSetColumnCharsets;
private BitSet visibility;

public BitSet getSignedness() {
return signedness;
Expand Down Expand Up @@ -125,6 +126,14 @@ public void setEnumAndSetColumnCharsets(List<Integer> enumAndSetColumnCharsets)
this.enumAndSetColumnCharsets = enumAndSetColumnCharsets;
}

public BitSet getVisibility() {
return visibility;
}

public void setVisibility(BitSet visibility) {
this.visibility = visibility;
}

@Override
public String toString() {
final StringBuilder sb = new StringBuilder();
Expand Down Expand Up @@ -163,6 +172,8 @@ public String toString() {
sb.append(", enumAndSetColumnCharsets=").append(enumAndSetColumnCharsets == null ? "null" : "");
appendList(sb, enumAndSetColumnCharsets);

sb.append(",visibility=").append(visibility);

sb.append('}');
return sb.toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public TableMapEventData deserialize(ByteArrayInputStream inputStream) throws IO
if (metadataLength > 0) {
metadata = metadataDeserializer.deserialize(
new ByteArrayInputStream(inputStream.read(metadataLength)),
eventData.getColumnTypes().length,
numericColumnCount(eventData.getColumnTypes())
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,17 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
* @author <a href="mailto:[email protected]">Ahmed Abdul Hamid</a>
*/
public class TableMapEventMetadataDeserializer {

public TableMapEventMetadata deserialize(ByteArrayInputStream inputStream, int nIntColumns) throws IOException {
private final Logger logger = Logger.getLogger(getClass().getName());

public TableMapEventMetadata deserialize(ByteArrayInputStream inputStream, int nColumns, int nNumericColumns) throws IOException {
int remainingBytes = inputStream.available();
if (remainingBytes <= 0) {
return null;
Expand All @@ -41,15 +45,27 @@ public TableMapEventMetadata deserialize(ByteArrayInputStream inputStream, int n
TableMapEventMetadata result = new TableMapEventMetadata();

for (; remainingBytes > 0; inputStream.enterBlock(remainingBytes)) {
MetadataFieldType fieldType = MetadataFieldType.byCode(inputStream.readInteger(1));
int code = inputStream.readInteger(1);

MetadataFieldType fieldType = MetadataFieldType.byCode(code);

if(fieldType == null)
throw new IOException("Unsupported table metadata field type " + code);

//for some reasons, the UNKNOWN_METADATA_FIELD_TYPE will mess up the stream
if(inputStream.available() == 0) {
logger.warning("Stream is empty so cannot read field length for field type: " + fieldType);
return result;
}

int fieldLength = inputStream.readPackedInteger();

remainingBytes = inputStream.available();
inputStream.enterBlock(fieldLength);

switch (fieldType) {
case SIGNEDNESS:
result.setSignedness(readSignedness(inputStream, nIntColumns));
result.setSignedness(readBooleanList(inputStream, nNumericColumns));
break;
case DEFAULT_CHARSET:
result.setDefaultCharset(readDefaultCharset(inputStream));
Expand Down Expand Up @@ -81,16 +97,22 @@ public TableMapEventMetadata deserialize(ByteArrayInputStream inputStream, int n
case ENUM_AND_SET_COLUMN_CHARSET:
result.setEnumAndSetColumnCharsets(readIntegers(inputStream));
break;
case VISIBILITY:
result.setVisibility(readBooleanList(inputStream, nColumns));
break;
case UNKNOWN_METADATA_FIELD_TYPE:
logger.warning("Received metadata field of unknown type");
break;
default:
inputStream.enterBlock(remainingBytes);
throw new IOException("Unsupported table metadata field type " + fieldType);
throw new IOException("Unsupported table metadata field type " + code);
}
remainingBytes -= fieldLength;
}
return result;
}

private static BitSet readSignedness(ByteArrayInputStream inputStream, int length) throws IOException {
private static BitSet readBooleanList(ByteArrayInputStream inputStream, int length) throws IOException {
BitSet result = new BitSet();
// according to MySQL internals the amount of storage required for N columns is INT((N+7)/8) bytes
byte[] bytes = inputStream.read((length + 7) >> 3);
Expand Down Expand Up @@ -162,7 +184,9 @@ private enum MetadataFieldType {
SIMPLE_PRIMARY_KEY(8), // The primary key without any prefix
PRIMARY_KEY_WITH_PREFIX(9), // The primary key with some prefix
ENUM_AND_SET_DEFAULT_CHARSET(10), // Charsets of ENUM and SET columns
ENUM_AND_SET_COLUMN_CHARSET(11); // Charsets of ENUM and SET columns
ENUM_AND_SET_COLUMN_CHARSET(11), // Charsets of ENUM and SET columns
VISIBILITY(12), // Column visibility (8.0.23 and newer)
UNKNOWN_METADATA_FIELD_TYPE(128); // Returned from MySQL 8.0 in some cases

private final int code;

Expand Down

0 comments on commit 2cbc106

Please sign in to comment.