Skip to content

fix parse JsonBinary type column blank holes error caused by MySql 8.0 #302

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.15</version>
<version>8.0.18</version>
<scope>test</scope>
</dependency>
</dependencies>
Expand Down Expand Up @@ -233,6 +233,8 @@
-Dvagrant.integration.box=supplement/vagrant/mysql-5.7.15-sandbox-prepackaged
./mvnw -P coverage,mysql-8-compat verify \
-Dvagrant.integration.box=supplement/vagrant/mysql-8.0.1-sandbox-prepackaged
./mvnw -P coverage,mysql-8-compat verify \
-Dvagrant.integration.box=supplement/vagrant/mysql-8.0.18-sandbox-prepackaged

# submit coverage report to coveralls
./mvnw -P coverage coveralls:jacoco -DrepoToken=&lt;coveralls.io&gt;
Expand Down Expand Up @@ -261,7 +263,7 @@
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.7-dmr</version>
<version>8.0.18</version>
<scope>test</scope>
</dependency>
</dependencies>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@
*/
package com.github.shyiko.mysql.binlog.event.deserialization.json;

import com.github.shyiko.mysql.binlog.event.deserialization.AbstractRowsEventDataDeserializer;
import com.github.shyiko.mysql.binlog.event.deserialization.ColumnType;
import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream;

import java.io.IOException;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.charset.Charset;

import com.github.shyiko.mysql.binlog.event.deserialization.AbstractRowsEventDataDeserializer;
import com.github.shyiko.mysql.binlog.event.deserialization.ColumnType;
import com.github.shyiko.mysql.binlog.io.JsonByteArrayInputStream;

/**
* Utility to parse the binary-encoded value of a MySQL {@code JSON} type, translating the encoded representation into
* method calls on a supplied {@link JsonFormatter} implementation.
Expand Down Expand Up @@ -181,14 +181,10 @@ public static void parse(byte[] bytes, JsonFormatter formatter) throws IOExcepti
new JsonBinary(bytes).parse(formatter);
}

private final ByteArrayInputStream reader;
private final JsonByteArrayInputStream reader;

public JsonBinary(byte[] bytes) {
this(new ByteArrayInputStream(bytes));
}

public JsonBinary(ByteArrayInputStream contents) {
this.reader = contents;
this.reader = new JsonByteArrayInputStream(bytes);
}

public String getString() {
Expand All @@ -202,10 +198,12 @@ public String getString() {
}

public void parse(JsonFormatter formatter) throws IOException {
parse(readValueType(), formatter);
parse(new ValueEntry(readValueType(), reader.getPosition(), 0), formatter);
}

protected void parse(ValueType type, JsonFormatter formatter) throws IOException {
protected void parse(ValueEntry entry, JsonFormatter formatter) throws IOException {
ensureOffset(entry.absoluteOffset());
ValueType type = entry.type;
switch (type) {
case SMALL_DOCUMENT:
parseObject(true, formatter);
Expand Down Expand Up @@ -255,6 +253,22 @@ protected void parse(ValueType type, JsonFormatter formatter) throws IOException
}
}

/**
* ensure parse position equals entry-offset
* @param ensureOffset parse offset
*/
private void ensureOffset(int ensureOffset) {
// ensure reader's position equals parse offset
int pos = reader.getPosition();
if (pos != ensureOffset) {
if (ensureOffset < pos) {
String msg = String.format("wrong offset. pos:%d, ensureOffset:%d", pos, ensureOffset);
throw new RuntimeException(msg);
}
reader.skip(ensureOffset - pos);
}
}

/**
* Parse a JSON object.
* <p>
Expand Down Expand Up @@ -319,15 +333,18 @@ protected void parse(ValueType type, JsonFormatter formatter) throws IOException
*/
protected void parseObject(boolean small, JsonFormatter formatter)
throws IOException {
int objectPosition = reader.getPosition(); // object start position

// Read the header ...
int numElements = readUnsignedIndex(Integer.MAX_VALUE, small, "number of elements in");
int numBytes = readUnsignedIndex(Integer.MAX_VALUE, small, "size of");
int valueSize = small ? 2 : 4;

// Read each key-entry, consisting of the offset and length of each key ...
int[] keyLengths = new int[numElements];
int[] keyOffsets = new int[numElements];
for (int i = 0; i != numElements; ++i) {
readUnsignedIndex(numBytes, small, "key offset in"); // unused
keyOffsets[i] = readUnsignedIndex(numBytes, small, "key offset in");
keyLengths[i] = readUInt16();
}

Expand Down Expand Up @@ -368,13 +385,14 @@ protected void parseObject(boolean small, JsonFormatter formatter)
", which is larger than the binary form of the JSON document (" +
numBytes + " bytes)");
}
entries[i] = new ValueEntry(type, offset);
entries[i] = new ValueEntry(type, objectPosition, offset);
}
}

// Read each key ...
String[] keys = new String[numElements];
for (int i = 0; i != numElements; ++i) {
ensureOffset(objectPosition + keyOffsets[i]);
keys[i] = reader.readString(keyLengths[i]);
}

Expand All @@ -397,7 +415,7 @@ protected void parseObject(boolean small, JsonFormatter formatter)
}
} else {
// Parse the value ...
parse(entry.type, formatter);
parse(entry, formatter);
}
}
formatter.endObject();
Expand Down Expand Up @@ -462,7 +480,9 @@ protected void parseObject(boolean small, JsonFormatter formatter)
*/
// checkstyle, please ignore MethodLength for the next line
protected void parseArray(boolean small, JsonFormatter formatter)
throws IOException {
throws IOException {
int objectPosition = reader.getPosition(); // array object start position

// Read the header ...
int numElements = readUnsignedIndex(Integer.MAX_VALUE, small, "number of elements in");
int numBytes = readUnsignedIndex(Integer.MAX_VALUE, small, "size of");
Expand Down Expand Up @@ -505,7 +525,7 @@ protected void parseArray(boolean small, JsonFormatter formatter)
", which is larger than the binary form of the JSON document (" +
numBytes + " bytes)");
}
entries[i] = new ValueEntry(type, offset);
entries[i] = new ValueEntry(type, objectPosition, offset);
}
}

Expand All @@ -527,7 +547,7 @@ protected void parseArray(boolean small, JsonFormatter formatter)
}
} else {
// Parse the value ...
parse(entry.type, formatter);
parse(entry, formatter);
}
}
formatter.endArray();
Expand Down Expand Up @@ -995,23 +1015,30 @@ protected static final class ValueEntry {

protected final ValueType type;
protected final int index;
protected final int objectIndex;
protected Object value;
protected boolean resolved;

public ValueEntry(ValueType type) {
this.type = type;
this.index = 0;
this.objectIndex = 0;
}

public ValueEntry(ValueType type, int index) {
public ValueEntry(ValueType type, int objectIndex, int index) {
this.type = type;
this.index = index;
this.objectIndex = objectIndex;
}

public ValueEntry setValue(Object value) {
this.value = value;
this.resolved = true;
return this;
}

public int absoluteOffset() {
return this.objectIndex + this.index;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright 2013 Stanley Shyiko
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.github.shyiko.mysql.binlog.io;

import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.IOException;

/**
* this byte array input stream is easy to parse json binary.
* @author <a href="mailto:[email protected]">Stanley Shyiko</a>
*/
public class JsonByteArrayInputStream extends ByteArrayInputStream {

public JsonByteArrayInputStream(byte[] bytes) {
super(bytes);
}

/**
* Read fixed length string.
*/
public String readString(int length) throws IOException {
return new String(read(length));
}

public byte[] read(int length) throws IOException {
byte[] bytes = new byte[length];
fill(bytes, 0, length);
return bytes;
}

public void fill(byte[] bytes, int offset, int length) throws IOException {
int remaining = length;
while (remaining != 0) {
int read = read(bytes, offset + length - remaining, remaining);
if (read == -1) {
throw new EOFException();
}
remaining -= read;
}
}

/**
* return current cursor position
* @return current cursor
*/
public int getPosition() {
return super.pos;
}
}
Loading