diff --git a/pom.xml b/pom.xml index a3f78dcb..0a854a90 100644 --- a/pom.xml +++ b/pom.xml @@ -65,7 +65,7 @@ mysql mysql-connector-java - 8.0.15 + 8.0.18 test @@ -233,6 +233,8 @@ -Dvagrant.integration.box=supplement/vagrant/mysql-5.7.15-sandbox-prepackaged ./mvnw -P coverage,mysql-8-compat verify \ -Dvagrant.integration.box=supplement/vagrant/mysql-8.0.1-sandbox-prepackaged + ./mvnw -P coverage,mysql-8-compat verify \ + -Dvagrant.integration.box=supplement/vagrant/mysql-8.0.18-sandbox-prepackaged # submit coverage report to coveralls ./mvnw -P coverage coveralls:jacoco -DrepoToken=<coveralls.io> @@ -261,7 +263,7 @@ mysql mysql-connector-java - 8.0.7-dmr + 8.0.18 test diff --git a/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/json/JsonBinary.java b/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/json/JsonBinary.java index a6b5fc0e..63b7a1e8 100644 --- a/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/json/JsonBinary.java +++ b/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/json/JsonBinary.java @@ -15,15 +15,15 @@ */ package com.github.shyiko.mysql.binlog.event.deserialization.json; -import com.github.shyiko.mysql.binlog.event.deserialization.AbstractRowsEventDataDeserializer; -import com.github.shyiko.mysql.binlog.event.deserialization.ColumnType; -import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream; - import java.io.IOException; import java.math.BigDecimal; import java.math.BigInteger; import java.nio.charset.Charset; +import com.github.shyiko.mysql.binlog.event.deserialization.AbstractRowsEventDataDeserializer; +import com.github.shyiko.mysql.binlog.event.deserialization.ColumnType; +import com.github.shyiko.mysql.binlog.io.JsonByteArrayInputStream; + /** * Utility to parse the binary-encoded value of a MySQL {@code JSON} type, translating the encoded representation into * method calls on a supplied {@link JsonFormatter} implementation. @@ -181,14 +181,10 @@ public static void parse(byte[] bytes, JsonFormatter formatter) throws IOExcepti new JsonBinary(bytes).parse(formatter); } - private final ByteArrayInputStream reader; + private final JsonByteArrayInputStream reader; public JsonBinary(byte[] bytes) { - this(new ByteArrayInputStream(bytes)); - } - - public JsonBinary(ByteArrayInputStream contents) { - this.reader = contents; + this.reader = new JsonByteArrayInputStream(bytes); } public String getString() { @@ -202,10 +198,12 @@ public String getString() { } public void parse(JsonFormatter formatter) throws IOException { - parse(readValueType(), formatter); + parse(new ValueEntry(readValueType(), reader.getPosition(), 0), formatter); } - protected void parse(ValueType type, JsonFormatter formatter) throws IOException { + protected void parse(ValueEntry entry, JsonFormatter formatter) throws IOException { + ensureOffset(entry.absoluteOffset()); + ValueType type = entry.type; switch (type) { case SMALL_DOCUMENT: parseObject(true, formatter); @@ -255,6 +253,22 @@ protected void parse(ValueType type, JsonFormatter formatter) throws IOException } } + /** + * ensure parse position equals entry-offset + * @param ensureOffset parse offset + */ + private void ensureOffset(int ensureOffset) { + // ensure reader's position equals parse offset + int pos = reader.getPosition(); + if (pos != ensureOffset) { + if (ensureOffset < pos) { + String msg = String.format("wrong offset. pos:%d, ensureOffset:%d", pos, ensureOffset); + throw new RuntimeException(msg); + } + reader.skip(ensureOffset - pos); + } + } + /** * Parse a JSON object. *

@@ -319,6 +333,8 @@ protected void parse(ValueType type, JsonFormatter formatter) throws IOException */ protected void parseObject(boolean small, JsonFormatter formatter) throws IOException { + int objectPosition = reader.getPosition(); // object start position + // Read the header ... int numElements = readUnsignedIndex(Integer.MAX_VALUE, small, "number of elements in"); int numBytes = readUnsignedIndex(Integer.MAX_VALUE, small, "size of"); @@ -326,8 +342,9 @@ protected void parseObject(boolean small, JsonFormatter formatter) // Read each key-entry, consisting of the offset and length of each key ... int[] keyLengths = new int[numElements]; + int[] keyOffsets = new int[numElements]; for (int i = 0; i != numElements; ++i) { - readUnsignedIndex(numBytes, small, "key offset in"); // unused + keyOffsets[i] = readUnsignedIndex(numBytes, small, "key offset in"); keyLengths[i] = readUInt16(); } @@ -368,13 +385,14 @@ protected void parseObject(boolean small, JsonFormatter formatter) ", which is larger than the binary form of the JSON document (" + numBytes + " bytes)"); } - entries[i] = new ValueEntry(type, offset); + entries[i] = new ValueEntry(type, objectPosition, offset); } } // Read each key ... String[] keys = new String[numElements]; for (int i = 0; i != numElements; ++i) { + ensureOffset(objectPosition + keyOffsets[i]); keys[i] = reader.readString(keyLengths[i]); } @@ -397,7 +415,7 @@ protected void parseObject(boolean small, JsonFormatter formatter) } } else { // Parse the value ... - parse(entry.type, formatter); + parse(entry, formatter); } } formatter.endObject(); @@ -462,7 +480,9 @@ protected void parseObject(boolean small, JsonFormatter formatter) */ // checkstyle, please ignore MethodLength for the next line protected void parseArray(boolean small, JsonFormatter formatter) - throws IOException { + throws IOException { + int objectPosition = reader.getPosition(); // array object start position + // Read the header ... int numElements = readUnsignedIndex(Integer.MAX_VALUE, small, "number of elements in"); int numBytes = readUnsignedIndex(Integer.MAX_VALUE, small, "size of"); @@ -505,7 +525,7 @@ protected void parseArray(boolean small, JsonFormatter formatter) ", which is larger than the binary form of the JSON document (" + numBytes + " bytes)"); } - entries[i] = new ValueEntry(type, offset); + entries[i] = new ValueEntry(type, objectPosition, offset); } } @@ -527,7 +547,7 @@ protected void parseArray(boolean small, JsonFormatter formatter) } } else { // Parse the value ... - parse(entry.type, formatter); + parse(entry, formatter); } } formatter.endArray(); @@ -995,17 +1015,20 @@ protected static final class ValueEntry { protected final ValueType type; protected final int index; + protected final int objectIndex; protected Object value; protected boolean resolved; public ValueEntry(ValueType type) { this.type = type; this.index = 0; + this.objectIndex = 0; } - public ValueEntry(ValueType type, int index) { + public ValueEntry(ValueType type, int objectIndex, int index) { this.type = type; this.index = index; + this.objectIndex = objectIndex; } public ValueEntry setValue(Object value) { @@ -1013,5 +1036,9 @@ public ValueEntry setValue(Object value) { this.resolved = true; return this; } + + public int absoluteOffset() { + return this.objectIndex + this.index; + } } } diff --git a/src/main/java/com/github/shyiko/mysql/binlog/io/JsonByteArrayInputStream.java b/src/main/java/com/github/shyiko/mysql/binlog/io/JsonByteArrayInputStream.java new file mode 100644 index 00000000..b9dd55ea --- /dev/null +++ b/src/main/java/com/github/shyiko/mysql/binlog/io/JsonByteArrayInputStream.java @@ -0,0 +1,63 @@ +/* + * Copyright 2013 Stanley Shyiko + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.shyiko.mysql.binlog.io; + +import java.io.ByteArrayInputStream; +import java.io.EOFException; +import java.io.IOException; + +/** + * this byte array input stream is easy to parse json binary. + * @author Stanley Shyiko + */ +public class JsonByteArrayInputStream extends ByteArrayInputStream { + + public JsonByteArrayInputStream(byte[] bytes) { + super(bytes); + } + + /** + * Read fixed length string. + */ + public String readString(int length) throws IOException { + return new String(read(length)); + } + + public byte[] read(int length) throws IOException { + byte[] bytes = new byte[length]; + fill(bytes, 0, length); + return bytes; + } + + public void fill(byte[] bytes, int offset, int length) throws IOException { + int remaining = length; + while (remaining != 0) { + int read = read(bytes, offset + length - remaining, remaining); + if (read == -1) { + throw new EOFException(); + } + remaining -= read; + } + } + + /** + * return current cursor position + * @return current cursor + */ + public int getPosition() { + return super.pos; + } +} diff --git a/src/test/java/com/github/shyiko/mysql/binlog/event/deserialization/json/JsonBinaryValueIntegrationTest.java b/src/test/java/com/github/shyiko/mysql/binlog/event/deserialization/json/JsonBinaryValueIntegrationTest.java index cdf314f7..eedf3607 100644 --- a/src/test/java/com/github/shyiko/mysql/binlog/event/deserialization/json/JsonBinaryValueIntegrationTest.java +++ b/src/test/java/com/github/shyiko/mysql/binlog/event/deserialization/json/JsonBinaryValueIntegrationTest.java @@ -15,22 +15,8 @@ */ package com.github.shyiko.mysql.binlog.event.deserialization.json; -import com.github.shyiko.mysql.binlog.BinaryLogClient; -import com.github.shyiko.mysql.binlog.BinaryLogClientIntegrationTest; -import com.github.shyiko.mysql.binlog.CapturingEventListener; -import com.github.shyiko.mysql.binlog.CountDownEventListener; -import com.github.shyiko.mysql.binlog.TraceEventListener; -import com.github.shyiko.mysql.binlog.TraceLifecycleListener; -import com.github.shyiko.mysql.binlog.event.Event; -import com.github.shyiko.mysql.binlog.event.EventData; -import com.github.shyiko.mysql.binlog.event.EventType; -import com.github.shyiko.mysql.binlog.event.QueryEventData; -import com.github.shyiko.mysql.binlog.event.WriteRowsEventData; -import org.skyscreamer.jsonassert.JSONAssert; -import org.testng.annotations.AfterClass; -import org.testng.annotations.AfterMethod; -import org.testng.annotations.BeforeClass; -import org.testng.annotations.Test; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; import java.io.Serializable; import java.sql.SQLException; @@ -44,8 +30,24 @@ import java.util.logging.Level; import java.util.logging.Logger; -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertTrue; +import org.skyscreamer.jsonassert.JSONAssert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import com.github.shyiko.mysql.binlog.BinaryLogClient; +import com.github.shyiko.mysql.binlog.BinaryLogClientIntegrationTest; +import com.github.shyiko.mysql.binlog.CapturingEventListener; +import com.github.shyiko.mysql.binlog.CountDownEventListener; +import com.github.shyiko.mysql.binlog.TraceEventListener; +import com.github.shyiko.mysql.binlog.TraceLifecycleListener; +import com.github.shyiko.mysql.binlog.event.Event; +import com.github.shyiko.mysql.binlog.event.EventData; +import com.github.shyiko.mysql.binlog.event.EventType; +import com.github.shyiko.mysql.binlog.event.QueryEventData; +import com.github.shyiko.mysql.binlog.event.UpdateRowsEventData; +import com.github.shyiko.mysql.binlog.event.WriteRowsEventData; /** * @author Randall Hauch @@ -104,6 +106,72 @@ public void execute(Statement statement) throws SQLException { eventListener.reset(); } + @Test + public void testMysql8JsonSetPartialUpdateWithHoles() throws Exception { + CountDownEventListener eventListener = new CountDownEventListener(); + client.registerEventListener(eventListener); + CapturingEventListener capturingEventListener = new CapturingEventListener(); + client.registerEventListener(capturingEventListener); + String json = "{\"age\":22,\"addr\":{\"code\":100,\"detail\":{\"ab\":\"970785C8-C299\"}},\"name\":\"Alice\"}"; + master.execute("DROP TABLE IF EXISTS json_test", "create table json_test (j JSON)", + "INSERT INTO json_test VALUES ('" + json + "')", + "UPDATE json_test SET j = JSON_SET(j, '$.addr.detail.ab', '970785C8')"); + eventListener.waitFor(WriteRowsEventData.class, 1, DEFAULT_TIMEOUT); + eventListener.waitFor(UpdateRowsEventData.class, 1, DEFAULT_TIMEOUT); + List events = capturingEventListener.getEvents(WriteRowsEventData.class); + Serializable[] insertData = events.iterator().next().getRows().get(0); + assertEquals(JsonBinary.parseAsString((byte[]) insertData[0]), json); + + List updateEvents = capturingEventListener.getEvents(UpdateRowsEventData.class); + Serializable[] updateData = updateEvents.iterator().next().getRows().get(0).getValue(); + assertEquals(JsonBinary.parseAsString((byte[]) updateData[0]), json.replace("970785C8-C299", "970785C8")); + assertEquals(((byte[]) updateData[0]).length, ((byte[]) insertData[0]).length); + } + + @Test + public void testMysql8JsonRemovePartialUpdateWithHoles() throws Exception { + CountDownEventListener eventListener = new CountDownEventListener(); + client.registerEventListener(eventListener); + CapturingEventListener capturingEventListener = new CapturingEventListener(); + client.registerEventListener(capturingEventListener); + String json = "{\"age\":22,\"addr\":{\"code\":100,\"detail\":{\"ab\":\"970785C8-C299\"}},\"name\":\"Alice\"}"; + master.execute("DROP TABLE IF EXISTS json_test", "create table json_test (j JSON)", + "INSERT INTO json_test VALUES ('" + json + "')", + "UPDATE json_test SET j = JSON_REMOVE(j, '$.addr.detail.ab')"); + eventListener.waitFor(WriteRowsEventData.class, 1, DEFAULT_TIMEOUT); + eventListener.waitFor(UpdateRowsEventData.class, 1, DEFAULT_TIMEOUT); + List events = capturingEventListener.getEvents(WriteRowsEventData.class); + Serializable[] insertData = events.iterator().next().getRows().get(0); + assertEquals(JsonBinary.parseAsString((byte[]) insertData[0]), json); + + List updateEvents = capturingEventListener.getEvents(UpdateRowsEventData.class); + Serializable[] updateData = updateEvents.iterator().next().getRows().get(0).getValue(); + assertEquals(JsonBinary.parseAsString((byte[]) updateData[0]), json.replace("\"ab\":\"970785C8-C299\"", "")); + assertEquals(((byte[]) updateData[0]).length, ((byte[]) insertData[0]).length); + } + + @Test + public void testMysql8JsonReplacePartialUpdateWithHoles() throws Exception { + CountDownEventListener eventListener = new CountDownEventListener(); + client.registerEventListener(eventListener); + CapturingEventListener capturingEventListener = new CapturingEventListener(); + client.registerEventListener(capturingEventListener); + String json = "{\"age\":22,\"addr\":{\"code\":100,\"detail\":{\"ab\":\"970785C8-C299\"}},\"name\":\"Alice\"}"; + master.execute("DROP TABLE IF EXISTS json_test", "create table json_test (j JSON)", + "INSERT INTO json_test VALUES ('" + json + "')", + "UPDATE json_test SET j = JSON_REPLACE(j, '$.addr.detail.ab', '9707')"); + eventListener.waitFor(WriteRowsEventData.class, 1, DEFAULT_TIMEOUT); + eventListener.waitFor(UpdateRowsEventData.class, 1, DEFAULT_TIMEOUT); + List events = capturingEventListener.getEvents(WriteRowsEventData.class); + Serializable[] insertData = events.iterator().next().getRows().get(0); + assertEquals(JsonBinary.parseAsString((byte[]) insertData[0]), json); + + List updateEvents = capturingEventListener.getEvents(UpdateRowsEventData.class); + Serializable[] updateData = updateEvents.iterator().next().getRows().get(0).getValue(); + assertEquals(JsonBinary.parseAsString((byte[]) updateData[0]), json.replace("970785C8-C299", "9707")); + assertEquals(((byte[]) updateData[0]).length, ((byte[]) insertData[0]).length); + } + @Test public void testValueBoundariesAreHonored() throws Exception { CountDownEventListener eventListener = new CountDownEventListener(); @@ -433,8 +501,11 @@ public void afterEachTest() throws Exception { public void onEvent(Event event) { if (event.getHeader().getEventType() == EventType.QUERY) { EventData data = event.getData(); - if (data != null && ((QueryEventData) data).getSql().contains("_EOS_marker")) { - latch.countDown(); + if (data != null) { + String sql = ((QueryEventData) data).getSql().toLowerCase(); + if (sql.contains("_EOS_marker".toLowerCase())) { + latch.countDown(); + } } } } diff --git a/supplement/vagrant/mysql-8.0.18-sandbox-prepackaged/vagrantfile b/supplement/vagrant/mysql-8.0.18-sandbox-prepackaged/vagrantfile new file mode 100644 index 00000000..2cbfbc08 --- /dev/null +++ b/supplement/vagrant/mysql-8.0.18-sandbox-prepackaged/vagrantfile @@ -0,0 +1,9 @@ +Vagrant.configure("2") do |config| + config.vm.box = 'shyiko/mysql-sandbox-prepackaged' + config.vm.box_version = '8.0.18' + config.vm.network :forwarded_port, guest: 33061, host: 33061 + config.vm.network :forwarded_port, guest: 33062, host: 33062 + config.vm.provider "virtualbox" do |v| + v.memory = 1024 + end +end diff --git a/supplement/vagrant/mysql-8.0.18-sandbox/vagrantfile b/supplement/vagrant/mysql-8.0.18-sandbox/vagrantfile new file mode 100644 index 00000000..afdb2f99 --- /dev/null +++ b/supplement/vagrant/mysql-8.0.18-sandbox/vagrantfile @@ -0,0 +1,27 @@ +Vagrant.configure("2") do |config| + config.vm.box = 'deb/jessie-i386' + config.vm.provision :shell, :inline => %Q( + sed -i.bak -r 's/(us.)?(archive|security).ubuntu.com/old-releases.ubuntu.com/g' /etc/apt/sources.list + apt-get update && apt-get install -y make libaio1 libnuma1 libtinfo5 # lib* required by mysql + echo 'Downloading MySQL distribution ...' + wget --no-check-certificate --progress=dot:mega --content-disposition \ + https://cdn.mysql.com//Downloads/MySQL-8.0/mysql-8.0.18-linux-glibc2.12-i686.tar.xz \ + 2>&1 | grep --line-buffered -o '[0-9]*%' + tar -Jxf mysql-8.0.18-linux-glibc2.12-i686.tar.xz + tar -zcvf mysql-8.0.18-linux-glibc2.12-i686.tar.gz mysql-8.0.18-linux-glibc2.12-i686/ + rm -rf mysql-8.0.18-linux-glibc2.12-i686/ mysql-8.0.18-linux-glibc2.12-i686.tar.xz + wget -O - https://github.com/datacharmer/mysql-sandbox/releases/download/3.2.17/MySQL-Sandbox-3.2.17.tar.gz | tar xzv + (cd MySQL-Sandbox-3.2.17 && perl Makefile.PL && make && make install) + su -c "make_replication_sandbox ~/mysql-8.0.18-linux-glibc2.12-i686.tar.gz \ + --remote_access='%' --how_many_slaves=1 --sandbox_base_port=33061 \ + --master_options='-c binlog_format=ROW' \ + --slave_options='-c binlog_format=ROW -c log-slave-updates=TRUE'" vagrant + rm -f *.tar.xz + sed -i -e "s/exit\ 0/\\/home\\/vagrant\\/sandboxes\\/rsandbox_mysql-8_0_18\\/restart_all; exit 0/g" /etc/rc.local + ) + config.vm.network :forwarded_port, guest: 33061, host: 33061 + config.vm.network :forwarded_port, guest: 33062, host: 33062 + config.vm.provider "virtualbox" do |v| + v.memory = 1024 + end +end