diff --git a/.editorconfig b/.editorconfig index 07f2dafb..2536d66b 100644 --- a/.editorconfig +++ b/.editorconfig @@ -9,4 +9,4 @@ trim_trailing_whitespace = true insert_final_newline = true [*.md] -trim_trailing_whitespace = false \ No newline at end of file +trim_trailing_whitespace = false diff --git a/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java b/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java index c395aa7b..336b4323 100644 --- a/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java +++ b/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java @@ -422,9 +422,9 @@ public long getHeartbeatInterval() { *

* If set (recommended) *

@@ -574,7 +574,7 @@ public void connect() throws IOException { ensureEventDataDeserializer(EventType.ROTATE, RotateEventDataDeserializer.class); synchronized (gtidSetAccessLock) { if (gtidSet != null) { - ensureEventDataDeserializer(EventType.GTID, GtidEventDataDeserializer.class); + ensureEventDataDeserializer(EventType.GTID_LOG, GtidEventDataDeserializer.class); ensureEventDataDeserializer(EventType.QUERY, QueryEventDataDeserializer.class); } } @@ -1035,7 +1035,7 @@ private void updateGtidSet(Event event) { } EventHeader eventHeader = event.getHeader(); switch(eventHeader.getEventType()) { - case GTID: + case GTID_LOG: GtidEventData gtidEventData = (GtidEventData) EventDataWrapper.internal(event.getData()); gtid = gtidEventData.getGtid(); break; diff --git a/src/main/java/com/github/shyiko/mysql/binlog/event/DeleteRowsEventData.java b/src/main/java/com/github/shyiko/mysql/binlog/event/DeleteRowsEventData.java index b595b8f2..de940af4 100644 --- a/src/main/java/com/github/shyiko/mysql/binlog/event/DeleteRowsEventData.java +++ b/src/main/java/com/github/shyiko/mysql/binlog/event/DeleteRowsEventData.java @@ -23,7 +23,7 @@ /** * @author Stanley Shyiko */ -public class DeleteRowsEventData implements EventData { +public class DeleteRowsEventData implements EventData, RowEventData { private long tableId; private BitSet includedColumns; diff --git a/src/main/java/com/github/shyiko/mysql/binlog/event/EventHeader.java b/src/main/java/com/github/shyiko/mysql/binlog/event/EventHeader.java index 326da5fe..84da919e 100644 --- a/src/main/java/com/github/shyiko/mysql/binlog/event/EventHeader.java +++ b/src/main/java/com/github/shyiko/mysql/binlog/event/EventHeader.java @@ -25,6 +25,8 @@ public interface EventHeader extends Serializable { long getTimestamp(); EventType getEventType(); long getServerId(); + long getEventLength(); long getHeaderLength(); long getDataLength(); + long getNextPosition(); } diff --git a/src/main/java/com/github/shyiko/mysql/binlog/event/EventHeaderV4.java b/src/main/java/com/github/shyiko/mysql/binlog/event/EventHeaderV4.java index 1583303e..4df250cb 100644 --- a/src/main/java/com/github/shyiko/mysql/binlog/event/EventHeaderV4.java +++ b/src/main/java/com/github/shyiko/mysql/binlog/event/EventHeaderV4.java @@ -57,6 +57,7 @@ public void setServerId(long serverId) { this.serverId = serverId; } + @Override public long getEventLength() { return eventLength; } @@ -69,6 +70,7 @@ public long getPosition() { return nextPosition - eventLength; } + @Override public long getNextPosition() { return nextPosition; } diff --git a/src/main/java/com/github/shyiko/mysql/binlog/event/EventType.java b/src/main/java/com/github/shyiko/mysql/binlog/event/EventType.java index 7f94ccfd..7b22cd9d 100644 --- a/src/main/java/com/github/shyiko/mysql/binlog/event/EventType.java +++ b/src/main/java/com/github/shyiko/mysql/binlog/event/EventType.java @@ -15,6 +15,9 @@ */ package com.github.shyiko.mysql.binlog.event; +import java.util.HashMap; +import java.util.Map; + /** * @see Event Meanings for the original * documentation. @@ -25,90 +28,94 @@ public enum EventType { /** * Events of this event type should never occur. Not written to a binary log. */ - UNKNOWN, + UNKNOWN(0), /** * A descriptor event that is written to the beginning of the each binary log file. (In MySQL 4.0 and 4.1, * this event is written only to the first binary log file that the server creates after startup.) This event is * used in MySQL 3.23 through 4.1 and superseded in MySQL 5.0 by {@link #FORMAT_DESCRIPTION}. */ - START_V3, + START_V3(1), /** * Written when an updating statement is done. */ - QUERY, + QUERY(2), /** * Written when mysqld stops. */ - STOP, + STOP(3), /** * Written when mysqld switches to a new binary log file. This occurs when someone issues a FLUSH LOGS statement or * the current binary log file becomes larger than max_binlog_size. + * + * See https://dev.mysql.com/doc/internals/en/rotate-event.html */ - ROTATE, + ROTATE(4), /** * Written every time a statement uses an AUTO_INCREMENT column or the LAST_INSERT_ID() function; precedes other * events for the statement. This is written only before a {@link #QUERY} and is not used in case of RBR. */ - INTVAR, + INTVAR(5), /** * Used for LOAD DATA INFILE statements in MySQL 3.23. */ - LOAD, + LOAD(6), /** * Not used. */ - SLAVE, + SLAVE(7), /** * Used for LOAD DATA INFILE statements in MySQL 4.0 and 4.1. */ - CREATE_FILE, + CREATE_FILE(8), /** * Used for LOAD DATA INFILE statements as of MySQL 4.0. */ - APPEND_BLOCK, + APPEND_BLOCK(9), /** * Used for LOAD DATA INFILE statements in 4.0 and 4.1. */ - EXEC_LOAD, + EXEC_LOAD(10), /** * Used for LOAD DATA INFILE statements as of MySQL 4.0. */ - DELETE_FILE, + DELETE_FILE(11), /** * Used for LOAD DATA INFILE statements in MySQL 4.0 and 4.1. */ - NEW_LOAD, + NEW_LOAD(12), /** * Written every time a statement uses the RAND() function; precedes other events for the statement. Indicates the * seed values to use for generating a random number with RAND() in the next statement. This is written only * before a {@link #QUERY} and is not used in case of RBR. */ - RAND, + RAND(13), /** * Written every time a statement uses a user variable; precedes other events for the statement. Indicates the * value to use for the user variable in the next statement. This is written only before a {@link #QUERY} and * is not used in case of RBR. */ - USER_VAR, + USER_VAR(14), /** * A descriptor event that is written to the beginning of the each binary log file. * This event is used as of MySQL 5.0; it supersedes {@link #START_V3}. + * + * See: https://dev.mysql.com/doc/internals/en/format-description-event.html */ - FORMAT_DESCRIPTION, + FORMAT_DESCRIPTION(15), /** * Generated for a commit of a transaction that modifies one or more tables of an XA-capable storage engine. * Normal transactions are implemented by sending a {@link #QUERY} containing a BEGIN statement and a {@link #QUERY} * containing a COMMIT statement (or a ROLLBACK statement if the transaction is rolled back). */ - XID, + XID(16), /** * Used for LOAD DATA INFILE statements as of MySQL 5.0. */ - BEGIN_LOAD_QUERY, + BEGIN_LOAD_QUERY(17), /** * Used for LOAD DATA INFILE statements as of MySQL 5.0. */ - EXECUTE_LOAD_QUERY, + EXECUTE_LOAD_QUERY(18), /** * This event precedes each row operation event. It maps a table definition to a number, where the table definition * consists of database and table names and column definitions. The purpose of this event is to enable replication @@ -117,82 +124,145 @@ public enum EventType { * of TABLE_MAP events: one per table used by events in the sequence. * Used in case of RBR. */ - TABLE_MAP, + TABLE_MAP(19), /** * Describes inserted rows (within a single table). * Used in case of RBR (5.1.0 - 5.1.15). */ - PRE_GA_WRITE_ROWS, + PRE_GA_WRITE_ROWS(20), /** * Describes updated rows (within a single table). * Used in case of RBR (5.1.0 - 5.1.15). */ - PRE_GA_UPDATE_ROWS, + PRE_GA_UPDATE_ROWS(21), /** * Describes deleted rows (within a single table). * Used in case of RBR (5.1.0 - 5.1.15). */ - PRE_GA_DELETE_ROWS, + PRE_GA_DELETE_ROWS(22), /** * Describes inserted rows (within a single table). * Used in case of RBR (5.1.16 - mysql-trunk). */ - WRITE_ROWS, + WRITE_ROWS_V1(23), /** * Describes updated rows (within a single table). * Used in case of RBR (5.1.16 - mysql-trunk). */ - UPDATE_ROWS, + UPDATE_ROWS_V1(24), /** * Describes deleted rows (within a single table). * Used in case of RBR (5.1.16 - mysql-trunk). */ - DELETE_ROWS, + DELETE_ROWS_V1(25), /** * Used to log an out of the ordinary event that occurred on the master. It notifies the slave that something * happened on the master that might cause data to be in an inconsistent state. */ - INCIDENT, + INCIDENT(26), /** * Sent by a master to a slave to let the slave know that the master is still alive. Not written to a binary log. */ - HEARTBEAT, + HEARTBEAT_LOG(27), /** * In some situations, it is necessary to send over ignorable data to the slave: data that a slave can handle in * case there is code for handling it, but which can be ignored if it is not recognized. */ - IGNORABLE, + IGNORABLE_LOG(28), /** * Introduced to record the original query for rows events in RBR. */ - ROWS_QUERY, + ROWS_QUERY_LOG(29), /** * Describes inserted rows (within a single table). * Used in case of RBR (5.1.18+). */ - EXT_WRITE_ROWS, + WRITE_ROWS(30), /** * Describes updated rows (within a single table). * Used in case of RBR (5.1.18+). */ - EXT_UPDATE_ROWS, + UPDATE_ROWS(31), /** * Describes deleted rows (within a single table). * Used in case of RBR (5.1.18+). */ - EXT_DELETE_ROWS, + DELETE_ROWS(32), + /** * Global Transaction Identifier. + * + * MySQL 5.6 GTID events + */ + GTID_LOG(33), + ANONYMOUS_GTID_LOG(34), + PREVIOUS_GTIDS_LOG(35), + + /** + * MySQL 5.7 Events */ - GTID, - ANONYMOUS_GTID, - PREVIOUS_GTIDS, - TRANSACTION_CONTEXT, - VIEW_CHANGE, + TRANSACTION_CONTEXT(36), + VIEW_CHANGE(37), /** * Prepared XA transaction terminal event similar to XID except that it is specific to XA transaction. */ - XA_PREPARE; + XA_PREPARE_LOG(38), + + /** + * New Maria event numbers start from here + */ + MARIA_ANNOTATE_ROWS(160), + /** + * Binlog checkpoint event. Used for XA crash recovery on the master, not used + * in replication. + * A binlog checkpoint event specifies a binlog file such that XA crash + * recovery can start from that file - and it is guaranteed to find all XIDs + * that are prepared in storage engines but not yet committed. + */ + MARIA_BINLOG_CHECKPOINT(161), + /** + * Gtid event. For global transaction ID, used to start a new event group, + * instead of the old BEGIN query event, and also to mark stand-alone + * events. + */ + MARIA_GTID(162), + /** + * Gtid list event. Logged at the start of every binlog, to record the + * current replication state. This consists of the last GTID seen for + * each replication domain. + */ + MARIA_GTID_LIST(163), + + MARIA_START_ENCRYPTION(164), + + /** + * Compressed binlog event. + + * Note that the order between WRITE/UPDATE/DELETE events is significant; + * this is so that we can convert from the compressed to the uncompressed + * event type with (type-WRITE_ROWS_COMPRESSED_EVENT + WRITE_ROWS_EVENT) + * and similar for _V1. + */ + MARIA_QUERY_COMPRESSED(165), + MARIA_WRITE_ROWS_COMPRESSED_V1(166), + MARIA_UPDATE_ROWS_COMPRESSED_V1(167), + MARIA_DELETE_ROWS_COMPRESSED_V1(168), + MARIA_WRITE_ROWS_COMPRESSED(169), + MARIA_UPDATE_ROWS_COMPRESSED(170), + MARIA_DELETE_ROWS_COMPRESSED(171); + + private int index; + + EventType(int index) { + this.index = index; + } + + public static final Map EVENT_TYPES = new HashMap(); + static { + for (EventType eventType: EventType.values()) { + EVENT_TYPES.put(eventType.index, eventType); + } + } public static boolean isRowMutation(EventType eventType) { return EventType.isWrite(eventType) || @@ -202,20 +272,27 @@ public static boolean isRowMutation(EventType eventType) { public static boolean isWrite(EventType eventType) { return eventType == PRE_GA_WRITE_ROWS || + eventType == WRITE_ROWS_V1 || eventType == WRITE_ROWS || - eventType == EXT_WRITE_ROWS; + eventType == MARIA_WRITE_ROWS_COMPRESSED_V1 || + eventType == MARIA_WRITE_ROWS_COMPRESSED; + } public static boolean isUpdate(EventType eventType) { return eventType == PRE_GA_UPDATE_ROWS || + eventType == UPDATE_ROWS_V1 || eventType == UPDATE_ROWS || - eventType == EXT_UPDATE_ROWS; + eventType == MARIA_UPDATE_ROWS_COMPRESSED_V1 || + eventType == MARIA_UPDATE_ROWS_COMPRESSED; } public static boolean isDelete(EventType eventType) { return eventType == PRE_GA_DELETE_ROWS || + eventType == DELETE_ROWS_V1 || eventType == DELETE_ROWS || - eventType == EXT_DELETE_ROWS; + eventType == MARIA_DELETE_ROWS_COMPRESSED_V1 || + eventType == MARIA_DELETE_ROWS_COMPRESSED; } } diff --git a/src/main/java/com/github/shyiko/mysql/binlog/event/RowEventData.java b/src/main/java/com/github/shyiko/mysql/binlog/event/RowEventData.java new file mode 100644 index 00000000..ec4e7c93 --- /dev/null +++ b/src/main/java/com/github/shyiko/mysql/binlog/event/RowEventData.java @@ -0,0 +1,27 @@ +/* + * Copyright 2013 Stanley Shyiko + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.shyiko.mysql.binlog.event; + +import java.io.Serializable; + +/** + * @author Jin Huang + */ +public interface RowEventData extends Serializable { + + long getTableId(); + +} diff --git a/src/main/java/com/github/shyiko/mysql/binlog/event/UpdateRowsEventData.java b/src/main/java/com/github/shyiko/mysql/binlog/event/UpdateRowsEventData.java index 169dd21d..2805cc41 100644 --- a/src/main/java/com/github/shyiko/mysql/binlog/event/UpdateRowsEventData.java +++ b/src/main/java/com/github/shyiko/mysql/binlog/event/UpdateRowsEventData.java @@ -24,7 +24,7 @@ /** * @author Stanley Shyiko */ -public class UpdateRowsEventData implements EventData { +public class UpdateRowsEventData implements EventData, RowEventData { private long tableId; private BitSet includedColumnsBeforeUpdate; diff --git a/src/main/java/com/github/shyiko/mysql/binlog/event/WriteRowsEventData.java b/src/main/java/com/github/shyiko/mysql/binlog/event/WriteRowsEventData.java index 5c99f9f2..3da84313 100644 --- a/src/main/java/com/github/shyiko/mysql/binlog/event/WriteRowsEventData.java +++ b/src/main/java/com/github/shyiko/mysql/binlog/event/WriteRowsEventData.java @@ -23,7 +23,7 @@ /** * @author Stanley Shyiko */ -public class WriteRowsEventData implements EventData { +public class WriteRowsEventData implements EventData, RowEventData { private long tableId; private BitSet includedColumns; diff --git a/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/EventDeserializer.java b/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/EventDeserializer.java index 5ca3e692..42cfcfa7 100644 --- a/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/EventDeserializer.java +++ b/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/EventDeserializer.java @@ -96,28 +96,28 @@ private void registerDefaultEventDataDeserializers() { new TableMapEventDataDeserializer()); eventDataDeserializers.put(EventType.XID, new XidEventDataDeserializer()); - eventDataDeserializers.put(EventType.WRITE_ROWS, + eventDataDeserializers.put(EventType.WRITE_ROWS_V1, new WriteRowsEventDataDeserializer(tableMapEventByTableId)); - eventDataDeserializers.put(EventType.UPDATE_ROWS, + eventDataDeserializers.put(EventType.UPDATE_ROWS_V1, new UpdateRowsEventDataDeserializer(tableMapEventByTableId)); - eventDataDeserializers.put(EventType.DELETE_ROWS, + eventDataDeserializers.put(EventType.DELETE_ROWS_V1, new DeleteRowsEventDataDeserializer(tableMapEventByTableId)); - eventDataDeserializers.put(EventType.EXT_WRITE_ROWS, + eventDataDeserializers.put(EventType.WRITE_ROWS, new WriteRowsEventDataDeserializer(tableMapEventByTableId). setMayContainExtraInformation(true)); - eventDataDeserializers.put(EventType.EXT_UPDATE_ROWS, + eventDataDeserializers.put(EventType.UPDATE_ROWS, new UpdateRowsEventDataDeserializer(tableMapEventByTableId). setMayContainExtraInformation(true)); - eventDataDeserializers.put(EventType.EXT_DELETE_ROWS, + eventDataDeserializers.put(EventType.DELETE_ROWS, new DeleteRowsEventDataDeserializer(tableMapEventByTableId). setMayContainExtraInformation(true)); - eventDataDeserializers.put(EventType.ROWS_QUERY, + eventDataDeserializers.put(EventType.ROWS_QUERY_LOG, new RowsQueryEventDataDeserializer()); - eventDataDeserializers.put(EventType.GTID, + eventDataDeserializers.put(EventType.GTID_LOG, new GtidEventDataDeserializer()); - eventDataDeserializers.put(EventType.PREVIOUS_GTIDS, + eventDataDeserializers.put(EventType.PREVIOUS_GTIDS_LOG, new PreviousGtidSetDeserializer()); - eventDataDeserializers.put(EventType.XA_PREPARE, + eventDataDeserializers.put(EventType.XA_PREPARE_LOG, new XAPrepareEventDataDeserializer()); } @@ -307,6 +307,10 @@ public EventDataDeserializer getEventDataDeserializer(EventType eventType) { return eventDataDeserializer != null ? eventDataDeserializer : defaultEventDataDeserializer; } + public Map getTableMapEventByTableId() { + return tableMapEventByTableId; + } + /** * @see CompatibilityMode#DATE_AND_TIME_AS_LONG * @see CompatibilityMode#DATE_AND_TIME_AS_LONG_MICRO diff --git a/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/EventHeaderV4Deserializer.java b/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/EventHeaderV4Deserializer.java index 2d8bcdd8..4071bb1c 100644 --- a/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/EventHeaderV4Deserializer.java +++ b/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/EventHeaderV4Deserializer.java @@ -26,8 +26,6 @@ */ public class EventHeaderV4Deserializer implements EventHeaderDeserializer { - private static final EventType[] EVENT_TYPES = EventType.values(); - @Override public EventHeaderV4 deserialize(ByteArrayInputStream inputStream) throws IOException { EventHeaderV4 header = new EventHeaderV4(); @@ -40,11 +38,11 @@ public EventHeaderV4 deserialize(ByteArrayInputStream inputStream) throws IOExce return header; } - private static EventType getEventType(int ordinal) throws IOException { - if (ordinal >= EVENT_TYPES.length) { - throw new IOException("Unknown event type " + ordinal); + private static EventType getEventType(int index) throws IOException { + if (EventType.EVENT_TYPES.get(index) == null) { + throw new IOException("Unknown event type " + index); } - return EVENT_TYPES[ordinal]; + return EventType.EVENT_TYPES.get(index); } } diff --git a/src/main/java/com/github/shyiko/mysql/binlog/io/ByteArrayInputStream.java b/src/main/java/com/github/shyiko/mysql/binlog/io/ByteArrayInputStream.java index 350b8709..23903fcb 100644 --- a/src/main/java/com/github/shyiko/mysql/binlog/io/ByteArrayInputStream.java +++ b/src/main/java/com/github/shyiko/mysql/binlog/io/ByteArrayInputStream.java @@ -43,7 +43,8 @@ public ByteArrayInputStream(byte[] bytes) { public int readInteger(int length) throws IOException { int result = 0; for (int i = 0; i < length; ++i) { - result |= (this.read() << (i << 3)); + int a = this.read(); + result |= (a << (i << 3)); } return result; } diff --git a/src/test/java/com/github/shyiko/mysql/binlog/BinaryLogClientGTIDIntegrationTest.java b/src/test/java/com/github/shyiko/mysql/binlog/BinaryLogClientGTIDIntegrationTest.java index df033afa..d3513c67 100644 --- a/src/test/java/com/github/shyiko/mysql/binlog/BinaryLogClientGTIDIntegrationTest.java +++ b/src/test/java/com/github/shyiko/mysql/binlog/BinaryLogClientGTIDIntegrationTest.java @@ -15,28 +15,34 @@ */ package com.github.shyiko.mysql.binlog; -import com.github.shyiko.mysql.binlog.event.QueryEventData; -import com.github.shyiko.mysql.binlog.event.XidEventData; -import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer; -import org.testng.annotations.AfterClass; -import org.testng.annotations.BeforeClass; -import org.testng.annotations.Test; +import static org.testng.Assert.assertNotEquals; +import static org.testng.AssertJUnit.assertNotNull; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; import java.util.concurrent.TimeUnit; -import static org.testng.Assert.assertNotEquals; -import static org.testng.AssertJUnit.assertNotNull; +import org.testng.SkipException; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import com.github.shyiko.mysql.binlog.event.QueryEventData; +import com.github.shyiko.mysql.binlog.event.XidEventData; +import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer; /** * @author Ben Osheroff */ -public class BinaryLogClientGTIDIntegrationTest extends BinaryLogClientIntegrationTest { +public class BinaryLogClientGTIDIntegrationTest extends BinaryLogClientIntegrationTestBase { @BeforeClass - private void enableGTID() throws SQLException { + private void enableGTID() throws Exception { + if (masterMysqlVersion.contains(MARIADB_VERSION_SUBSTR)) { + throw new SkipException("Skipping GTID test for MariaDB"); + } + MySQLConnection[] servers = {slave, master}; for (MySQLConnection m : servers) { m.execute(new Callback() { @@ -58,6 +64,10 @@ public void execute(Statement statement) throws SQLException { @AfterClass(alwaysRun = true) private void disableGTID() throws SQLException { + if (masterMysqlVersion.contains(MARIADB_VERSION_SUBSTR)) { + return; + } + MySQLConnection[] servers = {slave, master}; for (MySQLConnection m : servers) { m.execute(new Callback() { diff --git a/src/test/java/com/github/shyiko/mysql/binlog/BinaryLogClientIntegrationTest.java b/src/test/java/com/github/shyiko/mysql/binlog/BinaryLogClientIntegrationTest.java index 71dfb524..77355f16 100644 --- a/src/test/java/com/github/shyiko/mysql/binlog/BinaryLogClientIntegrationTest.java +++ b/src/test/java/com/github/shyiko/mysql/binlog/BinaryLogClientIntegrationTest.java @@ -15,35 +15,20 @@ */ package com.github.shyiko.mysql.binlog; -import com.github.shyiko.mysql.binlog.event.ByteArrayEventData; -import com.github.shyiko.mysql.binlog.event.DeleteRowsEventData; -import com.github.shyiko.mysql.binlog.event.Event; -import com.github.shyiko.mysql.binlog.event.EventData; -import com.github.shyiko.mysql.binlog.event.EventHeaderV4; -import com.github.shyiko.mysql.binlog.event.EventType; -import com.github.shyiko.mysql.binlog.event.QueryEventData; -import com.github.shyiko.mysql.binlog.event.UpdateRowsEventData; -import com.github.shyiko.mysql.binlog.event.WriteRowsEventData; -import com.github.shyiko.mysql.binlog.event.deserialization.ByteArrayEventDataDeserializer; -import com.github.shyiko.mysql.binlog.event.deserialization.EventDataDeserializationException; -import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer; -import com.github.shyiko.mysql.binlog.event.deserialization.EventHeaderV4Deserializer; -import com.github.shyiko.mysql.binlog.event.deserialization.QueryEventDataDeserializer; -import com.github.shyiko.mysql.binlog.io.BufferedSocketInputStream; -import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream; -import com.github.shyiko.mysql.binlog.network.AuthenticationException; -import com.github.shyiko.mysql.binlog.network.ServerException; -import com.github.shyiko.mysql.binlog.network.SocketFactory; -import org.mockito.InOrder; -import org.testng.SkipException; -import org.testng.annotations.AfterClass; -import org.testng.annotations.AfterMethod; -import org.testng.annotations.BeforeClass; -import org.testng.annotations.BeforeMethod; -import org.testng.annotations.Test; +import static com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.CompatibilityMode; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.only; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotEquals; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; -import javax.xml.bind.DatatypeConverter; -import java.io.Closeable; import java.io.EOFException; import java.io.FilterInputStream; import java.io.FilterOutputStream; @@ -55,8 +40,6 @@ import java.math.MathContext; import java.net.Socket; import java.net.SocketException; -import java.sql.Connection; -import java.sql.DriverManager; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.SQLSyntaxErrorException; @@ -66,8 +49,8 @@ import java.util.Calendar; import java.util.List; import java.util.Map; +import java.util.Random; import java.util.ResourceBundle; -import java.util.TimeZone; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -78,26 +61,37 @@ import java.util.logging.Level; import java.util.logging.Logger; -import static com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.CompatibilityMode; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.inOrder; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.only; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoMoreInteractions; -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertFalse; -import static org.testng.Assert.assertNotEquals; -import static org.testng.Assert.assertTrue; -import static org.testng.Assert.fail; +import javax.xml.bind.DatatypeConverter; + +import org.mockito.InOrder; +import org.testng.SkipException; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import com.github.shyiko.mysql.binlog.event.ByteArrayEventData; +import com.github.shyiko.mysql.binlog.event.DeleteRowsEventData; +import com.github.shyiko.mysql.binlog.event.Event; +import com.github.shyiko.mysql.binlog.event.EventData; +import com.github.shyiko.mysql.binlog.event.EventHeaderV4; +import com.github.shyiko.mysql.binlog.event.EventType; +import com.github.shyiko.mysql.binlog.event.QueryEventData; +import com.github.shyiko.mysql.binlog.event.UpdateRowsEventData; +import com.github.shyiko.mysql.binlog.event.WriteRowsEventData; +import com.github.shyiko.mysql.binlog.event.deserialization.ByteArrayEventDataDeserializer; +import com.github.shyiko.mysql.binlog.event.deserialization.EventDataDeserializationException; +import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer; +import com.github.shyiko.mysql.binlog.event.deserialization.EventHeaderV4Deserializer; +import com.github.shyiko.mysql.binlog.io.BufferedSocketInputStream; +import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream; +import com.github.shyiko.mysql.binlog.network.AuthenticationException; +import com.github.shyiko.mysql.binlog.network.ServerException; +import com.github.shyiko.mysql.binlog.network.SocketFactory; /** * @author Stanley Shyiko */ -public class BinaryLogClientIntegrationTest { - - protected static final long DEFAULT_TIMEOUT = TimeUnit.SECONDS.toMillis(3); +public class BinaryLogClientIntegrationTest extends BinaryLogClientIntegrationTestBase { private final Logger logger = Logger.getLogger(getClass().getSimpleName()); @@ -105,47 +99,40 @@ public class BinaryLogClientIntegrationTest { logger.setLevel(Level.FINEST); } - private final TimeZone timeZoneBeforeTheTest = TimeZone.getDefault(); + public void waitForLatestBinlogEvent(BinaryLogClient client) throws Exception { + final CountDownLatch latch = new CountDownLatch(1); + final Random r = new Random(); + final String tableName = "table" + r.nextInt(9999999); + final String markerQuery = "drop table if exists " + tableName; - protected MySQLConnection master, slave; - protected BinaryLogClient client; - protected CountDownEventListener eventListener; + BinaryLogClient.EventListener markerInterceptor = new BinaryLogClient.EventListener() { + @Override + public void onEvent(Event event) { + if (event.getHeader().getEventType() == EventType.QUERY) { + EventData data = event.getData(); + if (data != null && ((QueryEventData) data).getSql().contains(tableName)) { + latch.countDown(); + } + } + } + }; + client.registerEventListener(markerInterceptor); - @BeforeClass - public void setUp() throws Exception { - TimeZone.setDefault(TimeZone.getTimeZone("GMT")); - ResourceBundle bundle = ResourceBundle.getBundle("jdbc"); - String prefix = "jdbc.mysql.replication."; - master = new MySQLConnection(bundle.getString(prefix + "master.hostname"), - Integer.parseInt(bundle.getString(prefix + "master.port")), - bundle.getString(prefix + "master.username"), bundle.getString(prefix + "master.password")); - slave = new MySQLConnection(bundle.getString(prefix + "slave.hostname"), - Integer.parseInt(bundle.getString(prefix + "slave.port")), - bundle.getString(prefix + "slave.superUsername"), bundle.getString(prefix + "slave.superPassword")); - client = new BinaryLogClient(slave.hostname, slave.port, slave.username, slave.password); - EventDeserializer eventDeserializer = new EventDeserializer(); - eventDeserializer.setCompatibilityMode(CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY, - CompatibilityMode.DATE_AND_TIME_AS_LONG); - client.setEventDeserializer(eventDeserializer); - client.setServerId(client.getServerId() - 1); // avoid clashes between BinaryLogClient instances - client.setKeepAlive(false); - client.registerEventListener(new TraceEventListener()); - client.registerEventListener(eventListener = new CountDownEventListener()); - client.registerLifecycleListener(new TraceLifecycleListener()); - client.connect(DEFAULT_TIMEOUT); master.execute(new Callback() { @Override public void execute(Statement statement) throws SQLException { - statement.execute("drop database if exists mbcj_test"); - statement.execute("create database mbcj_test"); - statement.execute("use mbcj_test"); + statement.execute(markerQuery); } }); - eventListener.waitFor(EventType.QUERY, 2, DEFAULT_TIMEOUT); + assertTrue(latch.await(DEFAULT_TIMEOUT, TimeUnit.MILLISECONDS)); + client.unregisterEventListener(markerInterceptor); + eventListener.reset(); } @BeforeMethod public void beforeEachTest() throws Exception { + waitForLatestBinlogEvent(client); + eventListener.reset(); master.execute(new Callback() { @Override public void execute(Statement statement) throws SQLException { @@ -157,6 +144,33 @@ public void execute(Statement statement) throws SQLException { eventListener.reset(); } + @AfterMethod + public void afterEachTest() throws Exception { + final CountDownLatch latch = new CountDownLatch(1); + final String markerQuery = "drop table if exists _EOS_marker"; + BinaryLogClient.EventListener markerInterceptor = new BinaryLogClient.EventListener() { + @Override + public void onEvent(Event event) { + if (event.getHeader().getEventType() == EventType.QUERY) { + EventData data = event.getData(); + if (data != null && ((QueryEventData) data).getSql().contains("_EOS_marker")) { + latch.countDown(); + } + } + } + }; + client.registerEventListener(markerInterceptor); + master.execute(new Callback() { + @Override + public void execute(Statement statement) throws SQLException { + statement.execute(markerQuery); + } + }); + assertTrue(latch.await(DEFAULT_TIMEOUT, TimeUnit.MILLISECONDS)); + client.unregisterEventListener(markerInterceptor); + eventListener.reset(); + } + @Test public void testWriteUpdateDeleteEvents() throws Exception { CapturingEventListener capturingEventListener = new CapturingEventListener(); @@ -362,6 +376,7 @@ public void testDeserializationOfGEOMETRY() throws Exception { @Test public void testFSP() throws Exception { +// Assume.assumeTrue(!masterMysqlVersion.contains(MARIADB_VERSION_SUBSTR)); try { master.execute(new Callback() { @Override @@ -372,13 +387,15 @@ public void execute(Statement statement) throws SQLException { } catch (SQLSyntaxErrorException e) { throw new SkipException("MySQL < 5.6.4+"); } - assertEquals(writeAndCaptureRow("datetime(0)", "'1989-03-21 01:02:03.777777'"), new Serializable[]{ +// assertEquals(writeAndCaptureRow("datetime(0)", "'1989-03-21 01:02:03.777777'"), new Serializable[]{ +// generateTime(1989, 3, 21, 1, 2, 4, 0)}); + assertEquals(writeAndCaptureRow("datetime(0)", "'1989-03-21 01:02:04.000000'"), new Serializable[]{ generateTime(1989, 3, 21, 1, 2, 4, 0)}); - assertEquals(writeAndCaptureRow("datetime(1)", "'1989-03-21 01:02:03.777777'"), new Serializable[]{ + assertEquals(writeAndCaptureRow("datetime(1)", "'1989-03-21 01:02:03.800000'"), new Serializable[]{ generateTime(1989, 3, 21, 1, 2, 3, 800)}); - assertEquals(writeAndCaptureRow("datetime(2)", "'1989-03-21 01:02:03.777777'"), new Serializable[]{ + assertEquals(writeAndCaptureRow("datetime(2)", "'1989-03-21 01:02:03.780000'"), new Serializable[]{ generateTime(1989, 3, 21, 1, 2, 3, 780)}); - assertEquals(writeAndCaptureRow("datetime(3)", "'1989-03-21 01:02:03.777777'"), new Serializable[]{ + assertEquals(writeAndCaptureRow("datetime(3)", "'1989-03-21 01:02:03.778000'"), new Serializable[]{ generateTime(1989, 3, 21, 1, 2, 3, 778)}); assertEquals(writeAndCaptureRow("datetime(3)", "'1989-03-21 01:02:03.777'"), new Serializable[]{ generateTime(1989, 3, 21, 1, 2, 3, 777)}); @@ -775,11 +792,11 @@ public void testCustomEventDataDeserializers() throws Exception { try { client.disconnect(); final BinaryLogClient binaryLogClient = new BinaryLogClient(slave.hostname, slave.port, - slave.username, slave.password); + slave.username, slave.password); binaryLogClient.registerEventListener(new TraceEventListener()); binaryLogClient.registerEventListener(eventListener); EventDeserializer deserializer = new EventDeserializer(); - deserializer.setEventDataDeserializer(EventType.QUERY, new ByteArrayEventDataDeserializer()); +// deserializer.setEventDataDeserializer(EventType.QUERY, new ByteArrayEventDataDeserializer()); // TABLE_MAP and ROTATE events are both used internally, but that doesn't mean it shouldn't be possible to // specify different EventDataDeserializer|s deserializer.setEventDataDeserializer(EventType.TABLE_MAP, new ByteArrayEventDataDeserializer()); @@ -789,6 +806,7 @@ public void testCustomEventDataDeserializers() throws Exception { eventListener.reset(); binaryLogClient.connect(DEFAULT_TIMEOUT); eventListener.waitFor(EventType.FORMAT_DESCRIPTION, 1, DEFAULT_TIMEOUT); + master.execute(new Callback() { @Override public void execute(Statement statement) throws SQLException { @@ -801,9 +819,9 @@ public void execute(Statement statement) throws SQLException { statement.execute("flush logs"); } }); - eventListener.waitFor(EventType.QUERY, 1, DEFAULT_TIMEOUT); +// eventListener.waitFor(EventType.QUERY, 1, DEFAULT_TIMEOUT); eventListener.waitFor(EventType.ROTATE, 3, DEFAULT_TIMEOUT); /* 2 with timestamp 0 */ - eventListener.waitFor(ByteArrayEventData.class, 5, DEFAULT_TIMEOUT); + eventListener.waitFor(ByteArrayEventData.class, 4, DEFAULT_TIMEOUT); } finally { binaryLogClient.disconnect(); } @@ -1001,190 +1019,4 @@ public void testMySQL8TableMetadata() throws Exception { eventListener.waitFor(WriteRowsEventData.class, 1, DEFAULT_TIMEOUT); } - @AfterMethod - public void afterEachTest() throws Exception { - final CountDownLatch latch = new CountDownLatch(1); - final String markerQuery = "drop table if exists _EOS_marker"; - BinaryLogClient.EventListener markerInterceptor = new BinaryLogClient.EventListener() { - @Override - public void onEvent(Event event) { - if (event.getHeader().getEventType() == EventType.QUERY) { - EventData data = event.getData(); - if (data != null && ((QueryEventData) data).getSql().contains("_EOS_marker")) { - latch.countDown(); - } - } - } - }; - client.registerEventListener(markerInterceptor); - master.execute(new Callback() { - @Override - public void execute(Statement statement) throws SQLException { - statement.execute(markerQuery); - } - }); - assertTrue(latch.await(DEFAULT_TIMEOUT, TimeUnit.MILLISECONDS)); - client.unregisterEventListener(markerInterceptor); - eventListener.reset(); - } - - @AfterClass(alwaysRun = true) - public void tearDown() throws Exception { - TimeZone.setDefault(timeZoneBeforeTheTest); - try { - if (client != null) { - client.disconnect(); - } - } finally { - if (slave != null) { - slave.close(); - } - if (master != null) { - master.execute(new Callback() { - @Override - public void execute(Statement statement) throws SQLException { - statement.execute("drop database mbcj_test"); - } - }); - master.close(); - } - } - } - - /** - * Representation of a MySQL connection. - */ - public static final class MySQLConnection implements Closeable { - - private final String hostname; - private final int port; - private final String username; - private final String password; - private Connection connection; - - public MySQLConnection(String hostname, int port, String username, String password) - throws ClassNotFoundException, SQLException { - this.hostname = hostname; - this.port = port; - this.username = username; - this.password = password; - Class.forName("com.mysql.jdbc.Driver"); - connect(); - } - - private void connect() throws SQLException { - this.connection = DriverManager.getConnection("jdbc:mysql://" + hostname + ":" + port + - "?serverTimezone=UTC", username, password); - execute(new Callback() { - - @Override - public void execute(Statement statement) throws SQLException { - statement.execute("SET time_zone = '+00:00'"); - } - }); - } - - public String hostname() { - return hostname; - } - - public int port() { - return port; - } - - public String username() { - return username; - } - - public String password() { - return password; - } - - public void execute(Callback callback, boolean autocommit) throws SQLException { - connection.setAutoCommit(autocommit); - Statement statement = connection.createStatement(); - try { - callback.execute(statement); - if (!autocommit) { - connection.commit(); - } - } finally { - statement.close(); - } - } - - public void execute(Callback callback) throws SQLException { - execute(callback, false); - } - - public void execute(final String...statements) throws SQLException { - execute(new Callback() { - @Override - public void execute(Statement statement) throws SQLException { - for (String command : statements) { - statement.execute(command); - } - } - }); - } - - public void query(String sql, Callback callback) throws SQLException { - connection.setAutoCommit(false); - Statement statement = connection.createStatement(); - try { - ResultSet rs = statement.executeQuery(sql); - try { - callback.execute(rs); - connection.commit(); - } finally { - rs.close(); - } - } finally { - statement.close(); - } - } - - @Override - public void close() throws IOException { - try { - connection.close(); - } catch (SQLException e) { - throw new IOException(e); - } - } - - public void reconnect() throws IOException, SQLException { - close(); - connect(); - } - } - - /** - * Callback used in the {@link MySQLConnection#execute(Callback)} method. - * - * @param the type of argument - */ - public interface Callback { - - void execute(T obj) throws SQLException; - } - - /** - * Used to simulate {@link SocketException} inside - * {@link QueryEventDataDeserializer#deserialize(ByteArrayInputStream)} (once). - */ - protected class QueryEventFailureSimulator extends QueryEventDataDeserializer { - private boolean failureSimulated; - - @Override - public QueryEventData deserialize(ByteArrayInputStream inputStream) throws IOException { - QueryEventData eventData = super.deserialize(inputStream); - if (!failureSimulated) { - failureSimulated = true; - throw new SocketException(); - } - return eventData; - } - } - } diff --git a/src/test/java/com/github/shyiko/mysql/binlog/BinaryLogClientIntegrationTestBase.java b/src/test/java/com/github/shyiko/mysql/binlog/BinaryLogClientIntegrationTestBase.java new file mode 100644 index 00000000..b0376a29 --- /dev/null +++ b/src/test/java/com/github/shyiko/mysql/binlog/BinaryLogClientIntegrationTestBase.java @@ -0,0 +1,266 @@ +/* + * Copyright 2013 Stanley Shyiko + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.shyiko.mysql.binlog; + +import static com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.CompatibilityMode; + +import java.io.Closeable; +import java.io.IOException; +import java.net.SocketException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ResourceBundle; +import java.util.TimeZone; +import java.util.concurrent.TimeUnit; + +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; + +import com.github.shyiko.mysql.binlog.event.EventType; +import com.github.shyiko.mysql.binlog.event.QueryEventData; +import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer; +import com.github.shyiko.mysql.binlog.event.deserialization.QueryEventDataDeserializer; +import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream; + +/** + * @author Stanley Shyiko + */ +public class BinaryLogClientIntegrationTestBase { + + public static final String MARIADB_VERSION_SUBSTR = "MariaDB"; + + protected static final long DEFAULT_TIMEOUT = TimeUnit.SECONDS.toMillis(20); + protected final TimeZone timeZoneBeforeTheTest = TimeZone.getDefault(); + + protected MySQLConnection master, slave; + protected BinaryLogClient client; + protected CountDownEventListener eventListener; + protected String masterMysqlVersion; + protected String slaveMysqlVersion; + + @BeforeClass + public void setUp() throws Exception { + TimeZone.setDefault(TimeZone.getTimeZone("GMT")); + ResourceBundle bundle = ResourceBundle.getBundle("jdbc"); + String prefix = "jdbc.mysql.replication."; + master = new MySQLConnection(bundle.getString(prefix + "master.hostname"), + Integer.parseInt(bundle.getString(prefix + "master.port")), + bundle.getString(prefix + "master.username"), bundle.getString(prefix + "master.password")); + slave = new MySQLConnection(bundle.getString(prefix + "slave.hostname"), + Integer.parseInt(bundle.getString(prefix + "slave.port")), + bundle.getString(prefix + "slave.superUsername"), bundle.getString(prefix + "slave.superPassword")); + client = new BinaryLogClient(slave.hostname, slave.port, slave.username, slave.password); + EventDeserializer eventDeserializer = new EventDeserializer(); + eventDeserializer.setCompatibilityMode(CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY, + CompatibilityMode.DATE_AND_TIME_AS_LONG); + client.setEventDeserializer(eventDeserializer); + client.setServerId(client.getServerId() - 1); // avoid clashes between BinaryLogClient instances + client.setKeepAlive(false); + client.registerEventListener(new TraceEventListener()); + + eventListener = new CountDownEventListener(); + client.registerEventListener(eventListener); + client.registerLifecycleListener(new TraceLifecycleListener()); + client.connect(DEFAULT_TIMEOUT); + master.execute(new Callback() { + @Override + public void execute(Statement statement) throws SQLException { + statement.execute("drop database if exists mbcj_test"); + statement.execute("create database mbcj_test"); + statement.execute("use mbcj_test"); + } + }); + eventListener.waitFor(EventType.QUERY, 2, DEFAULT_TIMEOUT); + + master.query("select version();", new Callback() { + @Override + public void execute(ResultSet rs) throws SQLException { + rs.next(); + masterMysqlVersion = rs.getString(1); + } + }); + slave.query("select version();", new Callback() { + @Override + public void execute(ResultSet rs) throws SQLException { + rs.next(); + slaveMysqlVersion = rs.getString(1); + } + }); + } + + @AfterClass(alwaysRun = true) + public void tearDown() throws Exception { + TimeZone.setDefault(timeZoneBeforeTheTest); + try { + if (client != null) { + client.disconnect(); + } + } finally { + if (slave != null) { + slave.close(); + } + if (master != null) { + master.execute(new Callback() { + @Override + public void execute(Statement statement) throws SQLException { + statement.execute("drop database mbcj_test"); + } + }); + master.close(); + } + } + } + + /** + * Representation of a MySQL connection. + */ + public static final class MySQLConnection implements Closeable { + + protected final String hostname; + protected final int port; + protected final String username; + protected final String password; + protected Connection connection; + + public MySQLConnection(String hostname, int port, String username, String password) + throws ClassNotFoundException, SQLException { + this.hostname = hostname; + this.port = port; + this.username = username; + this.password = password; + Class.forName("com.mysql.jdbc.Driver"); + connect(); + } + + private void connect() throws SQLException { + this.connection = DriverManager.getConnection("jdbc:mysql://" + hostname + ":" + port + + "?serverTimezone=UTC", username, password); + execute(new Callback() { + + @Override + public void execute(Statement statement) throws SQLException { + statement.execute("SET time_zone = '+00:00'"); + } + }); + } + + public String hostname() { + return hostname; + } + + public int port() { + return port; + } + + public String username() { + return username; + } + + public String password() { + return password; + } + + public void execute(Callback callback, boolean autocommit) throws SQLException { + connection.setAutoCommit(autocommit); + Statement statement = connection.createStatement(); + try { + callback.execute(statement); + if (!autocommit) { + connection.commit(); + } + } finally { + statement.close(); + } + } + + public void execute(Callback callback) throws SQLException { + execute(callback, false); + } + + public void execute(final String...statements) throws SQLException { + execute(new Callback() { + @Override + public void execute(Statement statement) throws SQLException { + for (String command : statements) { + statement.execute(command); + } + } + }); + } + + public void query(String sql, Callback callback) throws SQLException { + connection.setAutoCommit(false); + Statement statement = connection.createStatement(); + try { + ResultSet rs = statement.executeQuery(sql); + try { + callback.execute(rs); + connection.commit(); + } finally { + rs.close(); + } + } finally { + statement.close(); + } + } + + @Override + public void close() throws IOException { + try { + connection.close(); + } catch (SQLException e) { + throw new IOException(e); + } + } + + public void reconnect() throws IOException, SQLException { + close(); + connect(); + } + } + + /** + * Callback used in the {@link MySQLConnection#execute(Callback)} method. + * + * @param the type of argument + */ + public interface Callback { + + void execute(T obj) throws SQLException; + } + + /** + * Used to simulate {@link SocketException} inside + * {@link QueryEventDataDeserializer#deserialize(ByteArrayInputStream)} (once). + */ + protected class QueryEventFailureSimulator extends QueryEventDataDeserializer { + private boolean failureSimulated; + + @Override + public QueryEventData deserialize(ByteArrayInputStream inputStream) throws IOException { + QueryEventData eventData = super.deserialize(inputStream); + if (!failureSimulated) { + failureSimulated = true; + throw new SocketException(); + } + return eventData; + } + } + +} diff --git a/src/test/java/com/github/shyiko/mysql/binlog/CountDownEventListener.java b/src/test/java/com/github/shyiko/mysql/binlog/CountDownEventListener.java index 73a6f5b9..14f0b6fc 100644 --- a/src/test/java/com/github/shyiko/mysql/binlog/CountDownEventListener.java +++ b/src/test/java/com/github/shyiko/mysql/binlog/CountDownEventListener.java @@ -18,6 +18,7 @@ import com.github.shyiko.mysql.binlog.event.Event; import com.github.shyiko.mysql.binlog.event.EventData; import com.github.shyiko.mysql.binlog.event.EventType; +import com.github.shyiko.mysql.binlog.event.QueryEventData; import java.util.HashMap; import java.util.Map; @@ -35,6 +36,15 @@ public class CountDownEventListener implements BinaryLogClient.EventListener { @Override public void onEvent(Event event) { + if (event.getHeader().getEventType() == EventType.QUERY) { + try { + QueryEventData data = event.getData(); + if (data.getSql().startsWith("# Dum")) { + return; + } + } catch (Exception e) { } + } + incrementCounter(getCounter(countersByType, event.getHeader().getEventType())); EventData data = event.getData(); if (data != null) { diff --git a/src/test/java/com/github/shyiko/mysql/binlog/event/EventTypeTest.java b/src/test/java/com/github/shyiko/mysql/binlog/event/EventTypeTest.java index 6a5d376c..1f6eb847 100644 --- a/src/test/java/com/github/shyiko/mysql/binlog/event/EventTypeTest.java +++ b/src/test/java/com/github/shyiko/mysql/binlog/event/EventTypeTest.java @@ -32,7 +32,12 @@ public class EventTypeTest { @Test public void testIsWrite() throws Exception { List writeEventTypes = - Arrays.asList(EventType.PRE_GA_WRITE_ROWS, EventType.WRITE_ROWS, EventType.EXT_WRITE_ROWS); + Arrays.asList( + EventType.PRE_GA_WRITE_ROWS, + EventType.WRITE_ROWS_V1, + EventType.WRITE_ROWS, + EventType.MARIA_WRITE_ROWS_COMPRESSED_V1, + EventType.MARIA_WRITE_ROWS_COMPRESSED); for (EventType writeEventType : writeEventTypes) { assertTrue(EventType.isWrite(writeEventType)); } @@ -46,7 +51,12 @@ public void testIsWrite() throws Exception { @Test public void testIsUpdate() throws Exception { List writeEventTypes = - Arrays.asList(EventType.PRE_GA_UPDATE_ROWS, EventType.UPDATE_ROWS, EventType.EXT_UPDATE_ROWS); + Arrays.asList( + EventType.PRE_GA_UPDATE_ROWS, + EventType.UPDATE_ROWS_V1, + EventType.UPDATE_ROWS, + EventType.MARIA_UPDATE_ROWS_COMPRESSED_V1, + EventType.MARIA_UPDATE_ROWS_COMPRESSED); for (EventType writeEventType : writeEventTypes) { assertTrue(EventType.isUpdate(writeEventType)); } @@ -60,7 +70,12 @@ public void testIsUpdate() throws Exception { @Test public void testIsDelete() throws Exception { List writeEventTypes = - Arrays.asList(EventType.PRE_GA_DELETE_ROWS, EventType.DELETE_ROWS, EventType.EXT_DELETE_ROWS); + Arrays.asList( + EventType.PRE_GA_DELETE_ROWS, + EventType.DELETE_ROWS_V1, + EventType.DELETE_ROWS, + EventType.MARIA_DELETE_ROWS_COMPRESSED_V1, + EventType.MARIA_DELETE_ROWS_COMPRESSED); for (EventType writeEventType : writeEventTypes) { assertTrue(EventType.isDelete(writeEventType)); } diff --git a/src/test/java/com/github/shyiko/mysql/binlog/event/deserialization/json/JsonBinaryValueIntegrationTest.java b/src/test/java/com/github/shyiko/mysql/binlog/event/deserialization/json/JsonBinaryValueIntegrationTest.java index cdf314f7..cb1efb30 100644 --- a/src/test/java/com/github/shyiko/mysql/binlog/event/deserialization/json/JsonBinaryValueIntegrationTest.java +++ b/src/test/java/com/github/shyiko/mysql/binlog/event/deserialization/json/JsonBinaryValueIntegrationTest.java @@ -17,6 +17,7 @@ import com.github.shyiko.mysql.binlog.BinaryLogClient; import com.github.shyiko.mysql.binlog.BinaryLogClientIntegrationTest; +import com.github.shyiko.mysql.binlog.BinaryLogClientIntegrationTestBase; import com.github.shyiko.mysql.binlog.CapturingEventListener; import com.github.shyiko.mysql.binlog.CountDownEventListener; import com.github.shyiko.mysql.binlog.TraceEventListener; @@ -27,12 +28,14 @@ import com.github.shyiko.mysql.binlog.event.QueryEventData; import com.github.shyiko.mysql.binlog.event.WriteRowsEventData; import org.skyscreamer.jsonassert.JSONAssert; +import org.testng.SkipException; import org.testng.annotations.AfterClass; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; import java.io.Serializable; +import java.sql.ResultSet; import java.sql.SQLException; import java.sql.SQLSyntaxErrorException; import java.sql.Statement; @@ -44,6 +47,7 @@ import java.util.logging.Level; import java.util.logging.Logger; +import static com.github.shyiko.mysql.binlog.BinaryLogClientIntegrationTestBase.MARIADB_VERSION_SUBSTR; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; @@ -56,6 +60,8 @@ public class JsonBinaryValueIntegrationTest { private final Logger logger = Logger.getLogger(getClass().getSimpleName()); + private String masterMysqlVersion; + { logger.setLevel(Level.FINEST); } @@ -102,6 +108,17 @@ public void execute(Statement statement) throws SQLException { } eventListener.waitFor(EventType.QUERY, 3, DEFAULT_TIMEOUT); eventListener.reset(); + + master.query("select version();", new BinaryLogClientIntegrationTestBase.Callback() { + @Override + public void execute(ResultSet rs) throws SQLException { + rs.next(); + masterMysqlVersion = rs.getString(1); + } + }); + if (masterMysqlVersion.contains(MARIADB_VERSION_SUBSTR)) { + throw new SkipException("Skipping GTID test for MariaDB"); + } } @Test diff --git a/supplement/vagrant/mariadb-10.3.15-sandbox-prepackaged/vagrantfile b/supplement/vagrant/mariadb-10.3.15-sandbox-prepackaged/vagrantfile new file mode 100644 index 00000000..18a5584b --- /dev/null +++ b/supplement/vagrant/mariadb-10.3.15-sandbox-prepackaged/vagrantfile @@ -0,0 +1,6 @@ +Vagrant.configure("2") do |config| + config.vm.box = 'jinfwhuang/mariadb-sandbox-prepackaged' + config.vm.box_version = '10.3.15' + config.vm.network :forwarded_port, guest: 33063, host: 33061 + config.vm.network :forwarded_port, guest: 33064, host: 33062 +end diff --git a/supplement/vagrant/mariadb-10.3.15-sandbox/.gitignore b/supplement/vagrant/mariadb-10.3.15-sandbox/.gitignore new file mode 100644 index 00000000..0898481e --- /dev/null +++ b/supplement/vagrant/mariadb-10.3.15-sandbox/.gitignore @@ -0,0 +1,2 @@ +mariadb-10.3.15.box +ubuntu*.log diff --git a/supplement/vagrant/mariadb-10.3.15-sandbox/vagrantfile b/supplement/vagrant/mariadb-10.3.15-sandbox/vagrantfile new file mode 100644 index 00000000..e0c6d4c0 --- /dev/null +++ b/supplement/vagrant/mariadb-10.3.15-sandbox/vagrantfile @@ -0,0 +1,32 @@ +Vagrant.configure("2") do |config| + config.vm.box = "ubuntu/xenial64" + # config.vm.box = "ubuntu/bionic64" + config.vm.provision :shell, :inline => %Q( + +apt-get update +apt-get install -y libdbi-perl libdbd-mysql-perl psmisc + +VERSION=1.31.0 +OS=linux +origin=https://github.com/datacharmer/dbdeployer/releases/download/v$VERSION +wget $origin/dbdeployer-$VERSION.$OS.tar.gz +tar -xzf dbdeployer-$VERSION.$OS.tar.gz +chmod +x dbdeployer-$VERSION.$OS +sudo mv dbdeployer-$VERSION.$OS /usr/local/bin/dbdeployer + +wget https://downloads.mariadb.org/f/mariadb-10.3.15/bintar-linux-x86_64/mariadb-10.3.15-linux-x86_64.tar.gz + +mkdir -p /home/vagrant/mariadb + +dbdeployer --flavor=mariadb --sandbox-binary=mariadb unpack mariadb-10.3.15-linux-x86_64.tar.gz + +dbdeployer --flavor=mariadb --sandbox-binary=mariadb deploy replication --bind-address='0.0.0.0' --remote-access="%" --base-port=33061 --nodes=2 --my-cnf-options log_slave_updates --my-cnf-options log_bin --my-cnf-options binlog_format=ROW 10.3.15 + +sed -i -e "s/exit\ 0/\\/home\\/vagrant\\/sandboxes\\/rsandbox_10_3_15\\/restart_all; exit 0/g" /etc/rc.local + ) + config.vm.network :forwarded_port, guest: 33063, host: 33061 + config.vm.network :forwarded_port, guest: 33064, host: 33062 + config.vm.provider "virtualbox" do |v| + v.memory = 1024 + end +end