NOTE #1: Any value but null will switch BinaryLogClient into a GTID mode (this will also set binlogFilename
* to "" (provided it's null) forcing MySQL to send events starting from the oldest known binlog (keep in mind
* that connection will fail if gtid_purged is anything but empty (unless
@@ -314,17 +343,30 @@ public String getGtidSet() {
* @see #getGtidSet()
* @see #setGtidSetFallbackToPurged(boolean)
*/
- public void setGtidSet(String gtidSet) {
- if (gtidSet != null && this.binlogFilename == null) {
+ public void setGtidSet(String gtidStr) {
+ if ( gtidStr == null )
+ return;
+
+ this.gtidEnabled = true;
+
+ if (this.binlogFilename == null) {
this.binlogFilename = "";
}
+
synchronized (gtidSetAccessLock) {
- this.gtidSet = gtidSet != null ? new GtidSet(gtidSet) : null;
+ if ( !gtidStr.equals("") ) {
+ if ( MariadbGtidSet.isMariaGtidSet(gtidStr) ) {
+ this.gtidSet = new MariadbGtidSet(gtidStr);
+ } else {
+ this.gtidSet = new GtidSet(gtidStr);
+ }
+ }
}
}
/**
* @see #setGtidSetFallbackToPurged(boolean)
+ * @return whether gtid_purged is used as a fallback
*/
public boolean isGtidSetFallbackToPurged() {
return gtidSetFallbackToPurged;
@@ -340,6 +382,7 @@ public void setGtidSetFallbackToPurged(boolean gtidSetFallbackToPurged) {
/**
* @see #setUseBinlogFilenamePositionInGtidMode(boolean)
+ * @return value of useBinlogFilenamePostionInGtidMode
*/
public boolean isUseBinlogFilenamePositionInGtidMode() {
return useBinlogFilenamePositionInGtidMode;
@@ -483,13 +526,29 @@ public void setThreadFactory(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
}
+
+ /**
+ * @return true/false depending on whether we've connected to MariaDB. NULL if not connected.
+ */
+ public Boolean getMariaDB() {
+ return isMariaDB;
+ }
+
+ public boolean isUseSendAnnotateRowsEvent() {
+ return useSendAnnotateRowsEvent;
+ }
+
+ public void setUseSendAnnotateRowsEvent(boolean useSendAnnotateRowsEvent) {
+ this.useSendAnnotateRowsEvent = useSendAnnotateRowsEvent;
+ }
/**
* Connect to the replication stream. Note that this method blocks until disconnected.
* @throws AuthenticationException if authentication fails
* @throws ServerException if MySQL server responds with an error
* @throws IOException if anything goes wrong while trying to connect
+ * @throws IllegalStateException if binary log client is already connected
*/
- public void connect() throws IOException {
+ public void connect() throws IOException, IllegalStateException {
if (!connectLock.tryLock()) {
throw new IllegalStateException("BinaryLogClient is already connected");
}
@@ -512,14 +571,16 @@ public void connect() throws IOException {
". Please make sure it's running.", e);
}
GreetingPacket greetingPacket = receiveGreeting();
- authenticate(greetingPacket);
+
+ detectMariaDB(greetingPacket);
+ tryUpgradeToSSL(greetingPacket);
+
+ new Authenticator(greetingPacket, channel, schema, username, password).authenticate();
+ channel.authenticationComplete();
+
connectionId = greetingPacket.getThreadId();
if ("".equals(binlogFilename)) {
- synchronized (gtidSetAccessLock) {
- if (gtidSet != null && "".equals(gtidSet.toString()) && gtidSetFallbackToPurged) {
- gtidSet = new GtidSet(fetchGtidPurged());
- }
- }
+ setupGtidSet();
}
if (binlogFilename == null) {
fetchBinlogFilenameAndPosition();
@@ -530,13 +591,7 @@ public void connect() throws IOException {
}
binlogPosition = 4;
}
- ChecksumType checksumType = fetchBinlogChecksum();
- if (checksumType != ChecksumType.NONE) {
- confirmSupportOfChecksum(checksumType);
- }
- if (heartbeatInterval > 0) {
- enableHeartbeat();
- }
+ setupConnection();
gtid = null;
tx = false;
requestBinaryLogStream();
@@ -573,9 +628,8 @@ public void connect() throws IOException {
}
ensureEventDataDeserializer(EventType.ROTATE, RotateEventDataDeserializer.class);
synchronized (gtidSetAccessLock) {
- if (gtidSet != null) {
- ensureEventDataDeserializer(EventType.GTID, GtidEventDataDeserializer.class);
- ensureEventDataDeserializer(EventType.QUERY, QueryEventDataDeserializer.class);
+ if (this.gtidEnabled) {
+ ensureGtidEventDataDeserializer();
}
}
listenForEventPackets();
@@ -589,6 +643,27 @@ public void connect() throws IOException {
}
}
+ private void detectMariaDB(GreetingPacket packet) {
+ String serverVersion = packet.getServerVersion();
+ if ( serverVersion == null )
+ return;
+
+ this.isMariaDB = serverVersion.toLowerCase().contains("mariadb");
+ }
+ /**
+ * Apply additional options for connection before requesting binlog stream.
+ */
+ protected void setupConnection() throws IOException {
+ ChecksumType checksumType = fetchBinlogChecksum();
+ if (checksumType != ChecksumType.NONE) {
+ confirmSupportOfChecksum(checksumType);
+ }
+ setMasterServerId();
+ if (heartbeatInterval > 0) {
+ enableHeartbeat();
+ }
+ }
+
private PacketChannel openChannel() throws IOException {
Socket socket = socketFactory != null ? socketFactory.createSocket() : new Socket();
socket.connect(new InetSocketAddress(hostname, port), (int) connectTimeout);
@@ -634,33 +709,75 @@ public Object call() throws Exception {
};
}
- private GreetingPacket receiveGreeting() throws IOException {
- byte[] initialHandshakePacket = channel.read();
- if (initialHandshakePacket[0] == (byte) 0xFF /* error */) {
- byte[] bytes = Arrays.copyOfRange(initialHandshakePacket, 1, initialHandshakePacket.length);
+ protected void checkError(byte[] packet) throws IOException {
+ if (packet[0] == (byte) 0xFF /* error */) {
+ byte[] bytes = Arrays.copyOfRange(packet, 1, packet.length);
ErrorPacket errorPacket = new ErrorPacket(bytes);
throw new ServerException(errorPacket.getErrorMessage(), errorPacket.getErrorCode(),
- errorPacket.getSqlState());
+ errorPacket.getSqlState());
}
+ }
+
+ private GreetingPacket receiveGreeting() throws IOException {
+ byte[] initialHandshakePacket = channel.read();
+ checkError(initialHandshakePacket);
+
return new GreetingPacket(initialHandshakePacket);
}
+ private boolean tryUpgradeToSSL(GreetingPacket greetingPacket) throws IOException {
+ int collation = greetingPacket.getServerCollation();
+
+ if (sslMode != SSLMode.DISABLED) {
+ boolean serverSupportsSSL = (greetingPacket.getServerCapabilities() & ClientCapabilities.SSL) != 0;
+ if (!serverSupportsSSL && (sslMode == SSLMode.REQUIRED || sslMode == SSLMode.VERIFY_CA ||
+ sslMode == SSLMode.VERIFY_IDENTITY)) {
+ throw new IOException("MySQL server does not support SSL");
+ }
+ if (serverSupportsSSL) {
+ SSLRequestCommand sslRequestCommand = new SSLRequestCommand();
+ sslRequestCommand.setCollation(collation);
+ channel.write(sslRequestCommand);
+ SSLSocketFactory sslSocketFactory =
+ this.sslSocketFactory != null ?
+ this.sslSocketFactory :
+ sslMode == SSLMode.REQUIRED || sslMode == SSLMode.PREFERRED ?
+ DEFAULT_REQUIRED_SSL_MODE_SOCKET_FACTORY :
+ DEFAULT_VERIFY_CA_SSL_MODE_SOCKET_FACTORY;
+ channel.upgradeToSSL(sslSocketFactory, null);
+ logger.info("SSL enabled");
+ return true;
+ }
+ }
+ return false;
+ }
+
private void enableHeartbeat() throws IOException {
channel.write(new QueryCommand("set @master_heartbeat_period=" + heartbeatInterval * 1000000));
byte[] statementResult = channel.read();
- if (statementResult[0] == (byte) 0xFF /* error */) {
- byte[] bytes = Arrays.copyOfRange(statementResult, 1, statementResult.length);
- ErrorPacket errorPacket = new ErrorPacket(bytes);
- throw new ServerException(errorPacket.getErrorMessage(), errorPacket.getErrorCode(),
- errorPacket.getSqlState());
+ checkError(statementResult);
+ }
+
+ private void setMasterServerId() throws IOException {
+ channel.write(new QueryCommand("select @@server_id"));
+ ResultSetRowPacket[] resultSet = readResultSet();
+ if (resultSet.length >= 0) {
+ this.masterServerId = Long.parseLong(resultSet[0].getValue(0));
}
}
- private void requestBinaryLogStream() throws IOException {
+ protected void requestBinaryLogStream() throws IOException {
long serverId = blocking ? this.serverId : 0; // http://bugs.mysql.com/bug.php?id=71178
+ if ( this.isMariaDB )
+ requestBinaryLogStreamMaria(serverId);
+ else
+ requestBinaryLogStreamMysql(serverId);
+ }
+
+ private void requestBinaryLogStreamMysql(long serverId) throws IOException {
Command dumpBinaryLogCommand;
synchronized (gtidSetAccessLock) {
- if (gtidSet != null) {
+ if (this.gtidEnabled) {
dumpBinaryLogCommand = new DumpBinaryLogGtidCommand(serverId,
useBinlogFilenamePositionInGtidMode ? binlogFilename : "",
useBinlogFilenamePositionInGtidMode ? binlogPosition : 4,
@@ -672,7 +789,33 @@ private void requestBinaryLogStream() throws IOException {
channel.write(dumpBinaryLogCommand);
}
- private void ensureEventDataDeserializer(EventType eventType,
+ protected void requestBinaryLogStreamMaria(long serverId) throws IOException {
+ Command dumpBinaryLogCommand;
+
+ /*
+ https://jira.mariadb.org/browse/MDEV-225
+ */
+ channel.write(new QueryCommand("SET @mariadb_slave_capability=4"));
+ checkError(channel.read());
+
+ synchronized (gtidSetAccessLock) {
+ if (this.gtidEnabled) {
+ logger.info(gtidSet.toString());
+ channel.write(new QueryCommand("SET @slave_connect_state = '" + gtidSet.toString() + "'"));
+ checkError(channel.read());
+ channel.write(new QueryCommand("SET @slave_gtid_strict_mode = 0"));
+ checkError(channel.read());
+ channel.write(new QueryCommand("SET @slave_gtid_ignore_duplicates = 0"));
+ checkError(channel.read());
+ dumpBinaryLogCommand = new DumpBinaryLogCommand(serverId, "", 0L, isUseSendAnnotateRowsEvent());
+ } else {
+ dumpBinaryLogCommand = new DumpBinaryLogCommand(serverId, binlogFilename, binlogPosition, isUseSendAnnotateRowsEvent());
+ }
+ }
+ channel.write(dumpBinaryLogCommand);
+ }
+
+ protected void ensureEventDataDeserializer(EventType eventType,
Class extends EventDataDeserializer> eventDataDeserializerClass) {
EventDataDeserializer eventDataDeserializer = eventDeserializer.getEventDataDeserializer(eventType);
if (eventDataDeserializer.getClass() != eventDataDeserializerClass &&
@@ -689,77 +832,12 @@ private void ensureEventDataDeserializer(EventType eventType,
}
}
- private void authenticate(GreetingPacket greetingPacket) throws IOException {
- int collation = greetingPacket.getServerCollation();
- int packetNumber = 1;
-
- boolean usingSSLSocket = false;
- if (sslMode != SSLMode.DISABLED) {
- boolean serverSupportsSSL = (greetingPacket.getServerCapabilities() & ClientCapabilities.SSL) != 0;
- if (!serverSupportsSSL && (sslMode == SSLMode.REQUIRED || sslMode == SSLMode.VERIFY_CA ||
- sslMode == SSLMode.VERIFY_IDENTITY)) {
- throw new IOException("MySQL server does not support SSL");
- }
- if (serverSupportsSSL) {
- SSLRequestCommand sslRequestCommand = new SSLRequestCommand();
- sslRequestCommand.setCollation(collation);
- channel.write(sslRequestCommand, packetNumber++);
- SSLSocketFactory sslSocketFactory =
- this.sslSocketFactory != null ?
- this.sslSocketFactory :
- sslMode == SSLMode.REQUIRED || sslMode == SSLMode.PREFERRED ?
- DEFAULT_REQUIRED_SSL_MODE_SOCKET_FACTORY :
- DEFAULT_VERIFY_CA_SSL_MODE_SOCKET_FACTORY;
- channel.upgradeToSSL(sslSocketFactory,
- sslMode == SSLMode.VERIFY_IDENTITY ? new TLSHostnameVerifier() : null);
- usingSSLSocket = true;
- }
- }
- AuthenticateCommand authenticateCommand = new AuthenticateCommand(schema, username, password,
- greetingPacket.getScramble());
- authenticateCommand.setCollation(collation);
- channel.write(authenticateCommand, packetNumber);
- byte[] authenticationResult = channel.read();
- if (authenticationResult[0] != (byte) 0x00 /* ok */) {
- if (authenticationResult[0] == (byte) 0xFF /* error */) {
- byte[] bytes = Arrays.copyOfRange(authenticationResult, 1, authenticationResult.length);
- ErrorPacket errorPacket = new ErrorPacket(bytes);
- throw new AuthenticationException(errorPacket.getErrorMessage(), errorPacket.getErrorCode(),
- errorPacket.getSqlState());
- } else if (authenticationResult[0] == (byte) 0xFE) {
- switchAuthentication(authenticationResult, usingSSLSocket);
- } else {
- throw new AuthenticationException("Unexpected authentication result (" + authenticationResult[0] + ")");
- }
- }
- }
-
- private void switchAuthentication(byte[] authenticationResult, boolean usingSSLSocket) throws IOException {
- /*
- Azure-MySQL likes to tell us to switch authentication methods, even though
- we haven't advertised that we support any. It uses this for some-odd
- reason to send the real password scramble.
- */
- ByteArrayInputStream buffer = new ByteArrayInputStream(authenticationResult);
- buffer.read(1);
-
- String authName = buffer.readZeroTerminatedString();
- if ("mysql_native_password".equals(authName)) {
- String scramble = buffer.readZeroTerminatedString();
-
- Command switchCommand = new AuthenticateNativePasswordCommand(scramble, password);
- channel.write(switchCommand, (usingSSLSocket ? 4 : 3));
- byte[] authResult = channel.read();
-
- if (authResult[0] != (byte) 0x00) {
- byte[] bytes = Arrays.copyOfRange(authResult, 1, authResult.length);
- ErrorPacket errorPacket = new ErrorPacket(bytes);
- throw new AuthenticationException(errorPacket.getErrorMessage(), errorPacket.getErrorCode(),
- errorPacket.getSqlState());
- }
- } else {
- throw new AuthenticationException("Unsupported authentication type: " + authName);
- }
+ protected void ensureGtidEventDataDeserializer() {
+ ensureEventDataDeserializer(EventType.GTID, GtidEventDataDeserializer.class);
+ ensureEventDataDeserializer(EventType.QUERY, QueryEventDataDeserializer.class);
+ ensureEventDataDeserializer(EventType.ANNOTATE_ROWS, AnnotateRowsEventDataDeserializer.class);
+ ensureEventDataDeserializer(EventType.MARIADB_GTID, MariadbGtidEventDataDeserializer.class);
+ ensureEventDataDeserializer(EventType.MARIADB_GTID_LIST, MariadbGtidListEventDataDeserializer.class);
}
private void spawnKeepAliveThread() {
@@ -783,6 +861,7 @@ public void run() {
// expected in case of disconnect
}
if (threadExecutor.isShutdown()) {
+ logger.info("threadExecutor is shut down, terminating keepalive thread");
return;
}
boolean connectionLost = false;
@@ -796,17 +875,13 @@ public void run() {
}
}
if (connectionLost) {
- if (logger.isLoggable(Level.INFO)) {
- logger.info("Trying to restore lost connection to " + hostname + ":" + port);
- }
+ logger.info("Keepalive: Trying to restore lost connection to " + hostname + ":" + port);
try {
terminateConnect();
connect(connectTimeout);
} catch (Exception ce) {
- if (logger.isLoggable(Level.WARNING)) {
- logger.warning("Failed to restore connection to " + hostname + ":" + port +
- ". Next attempt in " + keepAliveInterval + "ms");
- }
+ logger.warning("keepalive: Failed to restore connection to " + hostname + ":" + port +
+ ". Next attempt in " + keepAliveInterval + "ms");
}
}
}
@@ -861,6 +936,9 @@ public void run() {
} catch (IOException e) {
exceptionReference.set(e);
countDownLatch.countDown(); // making sure we don't end up waiting whole "timeout"
+ } catch (Exception e) {
+ exceptionReference.set(new IOException(e)); // method is asynchronous, catch all exceptions so that they are not lost
+ countDownLatch.countDown(); // making sure we don't end up waiting whole "timeout"
}
}
};
@@ -902,6 +980,30 @@ private String fetchGtidPurged() throws IOException {
return "";
}
+ protected void setupGtidSet() throws IOException{
+ if (!this.gtidEnabled)
+ return;
+
+ synchronized (gtidSetAccessLock) {
+ if ( this.isMariaDB ) {
+ if ( gtidSet == null ) {
+ gtidSet = new MariadbGtidSet("");
+ } else if ( !(gtidSet instanceof MariadbGtidSet) ) {
+ throw new RuntimeException("Connected to MariaDB but given a mysql GTID set!");
+ }
+ } else {
+ if ( gtidSet == null && gtidSetFallbackToPurged ) {
+ gtidSet = new GtidSet(fetchGtidPurged());
+ } else if ( gtidSet == null ){
+ gtidSet = new GtidSet("");
+ } else if ( gtidSet instanceof MariadbGtidSet ) {
+ throw new RuntimeException("Connected to Mysql but given a MariaDB GTID set!");
+ }
+ }
+ }
+
+ }
+
private void fetchBinlogFilenameAndPosition() throws IOException {
ResultSetRowPacket[] resultSet;
channel.write(new QueryCommand("show master status"));
@@ -914,7 +1016,7 @@ private void fetchBinlogFilenameAndPosition() throws IOException {
binlogPosition = Long.parseLong(resultSetRow.getValue(1));
}
- private ChecksumType fetchBinlogChecksum() throws IOException {
+ protected ChecksumType fetchBinlogChecksum() throws IOException {
channel.write(new QueryCommand("show global variables like 'binlog_checksum'"));
ResultSetRowPacket[] resultSet = readResultSet();
if (resultSet.length == 0) {
@@ -926,20 +1028,16 @@ private ChecksumType fetchBinlogChecksum() throws IOException {
private void confirmSupportOfChecksum(ChecksumType checksumType) throws IOException {
channel.write(new QueryCommand("set @master_binlog_checksum= @@global.binlog_checksum"));
byte[] statementResult = channel.read();
- if (statementResult[0] == (byte) 0xFF /* error */) {
- byte[] bytes = Arrays.copyOfRange(statementResult, 1, statementResult.length);
- ErrorPacket errorPacket = new ErrorPacket(bytes);
- throw new ServerException(errorPacket.getErrorMessage(), errorPacket.getErrorCode(),
- errorPacket.getSqlState());
- }
+ checkError(statementResult);
eventDeserializer.setChecksumType(checksumType);
}
- private void listenForEventPackets() throws IOException {
+ protected void listenForEventPackets() throws IOException {
+ abortRequest = false;
ByteArrayInputStream inputStream = channel.getInputStream();
boolean completeShutdown = false;
try {
- while (inputStream.peek() != -1) {
+ while (!abortRequest && inputStream.peek() != -1) {
int packetLength = inputStream.readInteger(3);
inputStream.skip(1); // 1 byte for sequence
int marker = inputStream.read();
@@ -986,6 +1084,7 @@ private void listenForEventPackets() throws IOException {
}
}
} finally {
+ abortRequest = false;
if (isConnected()) {
if (completeShutdown) {
disconnect(); // initiate complete shutdown sequence (which includes keep alive thread)
@@ -1027,7 +1126,7 @@ private void updateClientBinlogFilenameAndPosition(Event event) {
}
}
- private void updateGtidSet(Event event) {
+ protected void updateGtidSet(Event event) {
synchronized (gtidSetAccessLock) {
if (gtidSet == null) {
return;
@@ -1049,21 +1148,43 @@ private void updateGtidSet(Event event) {
if (sql == null) {
break;
}
- if ("BEGIN".equals(sql)) {
- tx = true;
- } else
- if ("COMMIT".equals(sql) || "ROLLBACK".equals(sql)) {
- commitGtid();
- tx = false;
- } else
- if (!tx) {
- // auto-commit query, likely DDL
- commitGtid();
+ commitGtid(sql);
+ break;
+ case ANNOTATE_ROWS:
+ AnnotateRowsEventData annotateRowsEventData = (AnnotateRowsEventData) EventDeserializer.EventDataWrapper.internal(event.getData());
+ sql = annotateRowsEventData.getRowsQuery();
+ if (sql == null) {
+ break;
}
+ commitGtid(sql);
+ break;
+ case MARIADB_GTID:
+ MariadbGtidEventData mariadbGtidEventData = (MariadbGtidEventData) EventDeserializer.EventDataWrapper.internal(event.getData());
+ mariadbGtidEventData.setServerId(eventHeader.getServerId());
+ gtid = mariadbGtidEventData.toString();
+ break;
+ case MARIADB_GTID_LIST:
+ MariadbGtidListEventData mariadbGtidListEventData = (MariadbGtidListEventData) EventDeserializer.EventDataWrapper.internal(event.getData());
+ gtid = mariadbGtidListEventData.getMariaGTIDSet().toString();
+ break;
default:
}
}
+ protected void commitGtid(String sql) {
+ if ("BEGIN".equals(sql)) {
+ tx = true;
+ } else
+ if ("COMMIT".equals(sql) || "ROLLBACK".equals(sql)) {
+ commitGtid();
+ tx = false;
+ } else
+ if (!tx) {
+ // auto-commit query, likely DDL
+ commitGtid();
+ }
+ }
+
private void commitGtid() {
if (gtid != null) {
synchronized (gtidSetAccessLock) {
@@ -1072,17 +1193,14 @@ private void commitGtid() {
}
}
- private ResultSetRowPacket[] readResultSet() throws IOException {
- List If an invalid or proprietary ordinal is passed, EventType.UNKNOWN is returned.
+ */
+ public static EventType forId(int eventId) {
+ for (EventType type : EventType.values()) {
+ if (type.eventId == eventId) return type;
+ }
+
+ return EventType.UNKNOWN;
+ }
public static boolean isRowMutation(EventType eventType) {
return EventType.isWrite(eventType) ||
- EventType.isUpdate(eventType) ||
- EventType.isDelete(eventType);
+ EventType.isUpdate(eventType) ||
+ EventType.isDelete(eventType);
}
public static boolean isWrite(EventType eventType) {
return eventType == PRE_GA_WRITE_ROWS ||
- eventType == WRITE_ROWS ||
- eventType == EXT_WRITE_ROWS;
+ eventType == WRITE_ROWS ||
+ eventType == EXT_WRITE_ROWS;
}
public static boolean isUpdate(EventType eventType) {
return eventType == PRE_GA_UPDATE_ROWS ||
- eventType == UPDATE_ROWS ||
- eventType == EXT_UPDATE_ROWS;
+ eventType == UPDATE_ROWS ||
+ eventType == EXT_UPDATE_ROWS;
}
public static boolean isDelete(EventType eventType) {
return eventType == PRE_GA_DELETE_ROWS ||
- eventType == DELETE_ROWS ||
- eventType == EXT_DELETE_ROWS;
+ eventType == DELETE_ROWS ||
+ eventType == EXT_DELETE_ROWS;
}
+ public static EventType byEventNumber(int num) {
+ for (EventType type : EventType.values()) {
+ if (type.eventId == num) {
+ return type;
+ }
+ }
+ return null;
+ }
}
diff --git a/src/main/java/com/github/shyiko/mysql/binlog/event/LRUCache.java b/src/main/java/com/github/shyiko/mysql/binlog/event/LRUCache.java
new file mode 100644
index 00000000..784cec0e
--- /dev/null
+++ b/src/main/java/com/github/shyiko/mysql/binlog/event/LRUCache.java
@@ -0,0 +1,19 @@
+package com.github.shyiko.mysql.binlog.event;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+public class LRUCache This option is going to be enabled by default starting from mysql-binlog-connector-java@1.0.0.
*/
- CHAR_AND_BINARY_AS_BYTE_ARRAY
+ CHAR_AND_BINARY_AS_BYTE_ARRAY,
+ /**
+ * Return TINY/SHORT/INT24/LONG/LONGLONG values as byte[]|s (instead of int|s).
+ */
+ INTEGER_AS_BYTE_ARRAY
}
/**
diff --git a/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/EventHeaderV4Deserializer.java b/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/EventHeaderV4Deserializer.java
index 2d8bcdd8..27fa617b 100644
--- a/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/EventHeaderV4Deserializer.java
+++ b/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/EventHeaderV4Deserializer.java
@@ -26,25 +26,15 @@
*/
public class EventHeaderV4Deserializer implements EventHeaderDeserializer
- *
*
* Our check() methods throw exceptions if the name is
+ * invalid, whereas javax.net.ssl.HostnameVerifier just returns true/false.
+ *
+ * We provide the HostnameVerifier.DEFAULT, HostnameVerifier.STRICT, and
+ * HostnameVerifier.ALLOW_ALL implementations. We also provide the more
+ * specialized HostnameVerifier.DEFAULT_AND_LOCALHOST, as well as
+ * HostnameVerifier.STRICT_IE6. But feel free to define your own
+ * implementations!
+ *
+ * Inspired by Sebastian Hauer's original StrictSSLProtocolSocketFactory in the
+ * HttpClient "contrib" repository.
+ *
+ * @author Julius Davies
+ * @author Sebastian Hauer
+ * @since 8-Dec-2006
+ */
+public interface HostnameChecker extends javax.net.ssl.HostnameVerifier {
+
+ boolean verify(String host, SSLSession session);
+
+ void check(String host, SSLSocket ssl) throws IOException;
+
+ void check(String host, X509Certificate cert) throws SSLException;
+
+ void check(String host, String[] cns, String[] subjectAlts)
+ throws SSLException;
+
+ void check(String[] hosts, SSLSocket ssl) throws IOException;
+
+ void check(String[] hosts, X509Certificate cert) throws SSLException;
+
+
+ /**
+ * Checks to see if the supplied hostname matches any of the supplied CNs
+ * or "DNS" Subject-Alts. Most implementations only look at the first CN,
+ * and ignore any additional CNs. Most implementations do look at all of
+ * the "DNS" Subject-Alts. The CNs or Subject-Alts may contain wildcards
+ * according to RFC 2818.
+ *
+ * @param cns CN fields, in order, as extracted from the X.509
+ * certificate.
+ * @param subjectAlts Subject-Alt fields of type 2 ("DNS"), as extracted
+ * from the X.509 certificate.
+ * @param hosts The array of hostnames to verify.
+ * @throws SSLException If verification failed.
+ */
+ void check(String[] hosts, String[] cns, String[] subjectAlts)
+ throws SSLException;
+
+
+ /**
+ * The DEFAULT HostnameVerifier works the same way as Curl and Firefox.
+ *
+ * The hostname must match either the first CN, or any of the subject-alts.
+ * A wildcard can occur in the CN, and in any of the subject-alts.
+ *
+ * The only difference between DEFAULT and STRICT is that a wildcard (such
+ * as "*.foo.com") with DEFAULT matches all subdomains, including
+ * "a.b.foo.com".
+ */
+ public final static HostnameChecker DEFAULT =
+ new AbstractChecker() {
+ public final void check(final String[] hosts, final String[] cns,
+ final String[] subjectAlts)
+ throws SSLException {
+ check(hosts, cns, subjectAlts, false, false);
+ }
+
+ public final String toString() { return "DEFAULT"; }
+ };
+
+
+ /**
+ * The DEFAULT_AND_LOCALHOST HostnameVerifier works like the DEFAULT
+ * one with one additional relaxation: a host of "localhost",
+ * "localhost.localdomain", "127.0.0.1", "::1" will always pass, no matter
+ * what is in the server's certificate.
+ */
+ public final static HostnameChecker DEFAULT_AND_LOCALHOST =
+ new AbstractChecker() {
+ public final void check(final String[] hosts, final String[] cns,
+ final String[] subjectAlts)
+ throws SSLException {
+ if (isLocalhost(hosts[0])) {
+ return;
+ }
+ check(hosts, cns, subjectAlts, false, false);
+ }
+
+ public final String toString() { return "DEFAULT_AND_LOCALHOST"; }
+ };
+
+ /**
+ * The STRICT HostnameVerifier works the same way as java.net.URL in Sun
+ * Java 1.4, Sun Java 5, Sun Java 6. It's also pretty close to IE6.
+ * This implementation appears to be compliant with RFC 2818 for dealing
+ * with wildcards.
+ *
+ * The hostname must match either the first CN, or any of the subject-alts.
+ * A wildcard can occur in the CN, and in any of the subject-alts. The
+ * one divergence from IE6 is how we only check the first CN. IE6 allows
+ * a match against any of the CNs present. We decided to follow in
+ * Sun Java 1.4's footsteps and only check the first CN.
+ *
+ * A wildcard such as "*.foo.com" matches only subdomains in the same
+ * level, for example "a.foo.com". It does not match deeper subdomains
+ * such as "a.b.foo.com".
+ */
+ public final static HostnameChecker STRICT =
+ new AbstractChecker() {
+ public final void check(final String[] host, final String[] cns,
+ final String[] subjectAlts)
+ throws SSLException {
+ check(host, cns, subjectAlts, false, true);
+ }
+
+ public final String toString() { return "STRICT"; }
+ };
+
+ /**
+ * The STRICT_IE6 HostnameVerifier works just like the STRICT one with one
+ * minor variation: the hostname can match against any of the CN's in the
+ * server's certificate, not just the first one. This behaviour is
+ * identical to IE6's behaviour.
+ */
+ public final static HostnameChecker STRICT_IE6 =
+ new AbstractChecker() {
+ public final void check(final String[] host, final String[] cns,
+ final String[] subjectAlts)
+ throws SSLException {
+ check(host, cns, subjectAlts, true, true);
+ }
+
+ public final String toString() { return "STRICT_IE6"; }
+ };
+
+ /**
+ * The ALLOW_ALL HostnameVerifier essentially turns hostname verification
+ * off. This implementation is a no-op, and never throws the SSLException.
+ */
+ public final static HostnameChecker ALLOW_ALL =
+ new AbstractChecker() {
+ public final void check(final String[] host, final String[] cns,
+ final String[] subjectAlts) {
+ // Allow everything - so never blowup.
+ }
+
+ public final String toString() { return "ALLOW_ALL"; }
+ };
+
+ abstract class AbstractChecker implements HostnameChecker {
+ private final Logger logger = Logger.getLogger(getClass().getName());
+
+ public static String[] getCNs(X509Certificate cert) {
+ try {
+ final String subjectPrincipal = cert.getSubjectX500Principal().getName(X500Principal.RFC2253);
+ final LinkedList
+ * Note: Java doesn't appear able to extract international characters
+ * from the SubjectAlts. It can only extract international characters
+ * from the CN field.
+ *
+ * (Or maybe the version of OpenSSL I'm using to test isn't storing the
+ * international characters correctly in the SubjectAlts?).
+ *
+ * @param cert X509Certificate
+ * @return Array of SubjectALT DNS names stored in the certificate.
+ */
+ public static String[] getDNSSubjectAlts(X509Certificate cert) {
+ LinkedList subjectAltList = new LinkedList();
+ Collection c = null;
+ try {
+ c = cert.getSubjectAlternativeNames();
+ }
+ catch (Exception cpe) { }
+ if (c != null) {
+ Iterator it = c.iterator();
+ while (it.hasNext()) {
+ List list = (List) it.next();
+ int type = ((Integer) list.get(0)).intValue();
+ // If type is 2, then we've got a dNSName
+ if (type == 2) {
+ String s = (String) list.get(1);
+ subjectAltList.add(s);
+ }
+ }
+ }
+ if (!subjectAltList.isEmpty()) {
+ String[] subjectAlts = new String[subjectAltList.size()];
+ subjectAltList.toArray(subjectAlts);
+ return subjectAlts;
+ } else {
+ return null;
+ }
+ }
+
+ /**
+ * This contains a list of 2nd-level domains that aren't allowed to
+ * have wildcards when combined with country-codes.
+ * For example: [*.co.uk].
+ *
+ * The [*.co.uk] problem is an interesting one. Should we just hope
+ * that CA's would never foolishly allow such a certificate to happen?
+ * Looks like we're the only implementation guarding against this.
+ * Firefox, Curl, Sun Java 1.4, 5, 6 don't bother with this check.
+ */
+ private final static String[] BAD_COUNTRY_2LDS =
+ {"ac", "co", "com", "ed", "edu", "go", "gouv", "gov", "info",
+ "lg", "ne", "net", "or", "org"};
+
+ private final static String[] LOCALHOSTS = {"::1", "127.0.0.1",
+ "localhost",
+ "localhost.localdomain"};
+
+
+ static {
+ // Just in case developer forgot to manually sort the array. :-)
+ Arrays.sort(BAD_COUNTRY_2LDS);
+ Arrays.sort(LOCALHOSTS);
+ }
+
+ protected AbstractChecker() {}
+
+ /**
+ * The javax.net.ssl.HostnameVerifier contract.
+ *
+ * @param host 'hostname' we used to create our socket
+ * @param session SSLSession with the remote server
+ * @return true if the host matched the one in the certificate.
+ */
+ public boolean verify(String host, SSLSession session) {
+ try {
+ Certificate[] certs = session.getPeerCertificates();
+ X509Certificate x509 = (X509Certificate) certs[0];
+ check(new String[]{host}, x509);
+ return true;
+ }
+ catch (SSLException e) {
+ return false;
+ }
+ }
+
+ public void check(String host, SSLSocket ssl) throws IOException {
+ check(new String[]{host}, ssl);
+ }
+
+ public void check(String host, X509Certificate cert)
+ throws SSLException {
+ check(new String[]{host}, cert);
+ }
+
+ public void check(String host, String[] cns, String[] subjectAlts)
+ throws SSLException {
+ check(new String[]{host}, cns, subjectAlts);
+ }
+
+ public void check(String host[], SSLSocket ssl)
+ throws IOException {
+ if (host == null) {
+ throw new NullPointerException("host to verify is null");
+ }
+
+ SSLSession session = ssl.getSession();
+ if (session == null) {
+ // In our experience this only happens under IBM 1.4.x when
+ // spurious (unrelated) certificates show up in the server'
+ // chain. Hopefully this will unearth the real problem:
+ InputStream in = ssl.getInputStream();
+ in.available();
+ /*
+ If you're looking at the 2 lines of code above because
+ you're running into a problem, you probably have two
+ options:
+
+ #1. Clean up the certificate chain that your server
+ is presenting (e.g. edit "/etc/apache2/server.crt"
+ or wherever it is your server's certificate chain
+ is defined).
+
+ OR
+
+ #2. Upgrade to an IBM 1.5.x or greater JVM, or switch
+ to a non-IBM JVM.
+ */
+
+ // If ssl.getInputStream().available() didn't cause an
+ // exception, maybe at least now the session is available?
+ session = ssl.getSession();
+ if (session == null) {
+ // If it's still null, probably a startHandshake() will
+ // unearth the real problem.
+ ssl.startHandshake();
+
+ // Okay, if we still haven't managed to cause an exception,
+ // might as well go for the NPE. Or maybe we're okay now?
+ session = ssl.getSession();
+ }
+ }
+ Certificate[] certs;
+ try {
+ certs = session.getPeerCertificates();
+ } catch (SSLPeerUnverifiedException spue) {
+ InputStream in = ssl.getInputStream();
+ in.available();
+ // Didn't trigger anything interesting? Okay, just throw
+ // original.
+ throw spue;
+ }
+ X509Certificate x509 = (X509Certificate) certs[0];
+ check(host, x509);
+ }
+
+ private String commaJoin(String [] input) {
+ if ( input == null ) return "";
+ return String.join(",", Arrays.asList(input));
+ }
+
+ public void check(String[] host, X509Certificate cert)
+ throws SSLException {
+ String[] cns = AbstractChecker.getCNs(cert);
+ String[] subjectAlts = AbstractChecker.getDNSSubjectAlts(cert);
+ logger.log(Level.INFO,
+ "attempting to verify SSL identity '" + commaJoin(host) + "' " +
+ "against cns: [" + commaJoin(cns) + "], " +
+ "subject-alts: [" + commaJoin(subjectAlts) + "]");
+ check(host, cns, subjectAlts);
+ }
+
+ public void check(final String[] hosts, final String[] cns,
+ final String[] subjectAlts, final boolean ie6,
+ final boolean strictWithSubDomains)
+ throws SSLException {
+ // Build up lists of allowed hosts For logging/debugging purposes.
+ StringBuffer buf = new StringBuffer(32);
+ buf.append('<');
+ for (int i = 0; i < hosts.length; i++) {
+ String h = hosts[i];
+ h = h != null ? h.trim().toLowerCase() : "";
+ hosts[i] = h;
+ if (i > 0) {
+ buf.append('/');
+ }
+ buf.append(h);
+ }
+ buf.append('>');
+ String hostnames = buf.toString();
+ // Build the list of names we're going to check. Our DEFAULT and
+ // STRICT implementations of the HostnameVerifier only use the
+ // first CN provided. All other CNs are ignored.
+ // (Firefox, wget, curl, Sun Java 1.4, 5, 6 all work this way).
+ TreeSet names = new TreeSet();
+ if (cns != null && cns.length > 0 && cns[0] != null) {
+ names.add(cns[0]);
+ if (ie6) {
+ for (int i = 1; i < cns.length; i++) {
+ names.add(cns[i]);
+ }
+ }
+ }
+ if (subjectAlts != null) {
+ for (int i = 0; i < subjectAlts.length; i++) {
+ if (subjectAlts[i] != null) {
+ names.add(subjectAlts[i]);
+ }
+ }
+ }
+ if (names.isEmpty()) {
+ String msg = "Certificate for " + hosts[0] + " doesn't contain CN or DNS subjectAlt";
+ throw new SSLException(msg);
+ }
+
+ // StringBuffer for building the error message.
+ buf = new StringBuffer();
+
+ boolean match = false;
+ out:
+ for (Iterator it = names.iterator(); it.hasNext();) {
+ // Don't trim the CN, though!
+ String cn = (String) it.next();
+ cn = cn.toLowerCase();
+ // Store CN in StringBuffer in case we need to report an error.
+ buf.append(" <");
+ buf.append(cn);
+ buf.append('>');
+ if (it.hasNext()) {
+ buf.append(" OR");
+ }
+
+ // The CN better have at least two dots if it wants wildcard
+ // action. It also can't be [*.co.uk] or [*.co.jp] or
+ // [*.org.uk], etc...
+ boolean doWildcard = cn.startsWith("*.") &&
+ cn.lastIndexOf('.') >= 0 &&
+ !isIP4Address(cn) &&
+ acceptableCountryWildcard(cn);
+
+ for (int i = 0; i < hosts.length; i++) {
+ final String hostName = hosts[i].trim().toLowerCase();
+ if (doWildcard) {
+ match = hostName.endsWith(cn.substring(1));
+ if (match && strictWithSubDomains) {
+ // If we're in strict mode, then [*.foo.com] is not
+ // allowed to match [a.b.foo.com]
+ match = countDots(hostName) == countDots(cn);
+ }
+ } else {
+ match = hostName.equals(cn);
+ }
+ if (match) {
+ break out;
+ }
+ }
+ }
+ if (!match) {
+ throw new SSLException("hostname in certificate didn't match: " + hostnames + " !=" + buf);
+ }
+ }
+
+ public static boolean isIP4Address(final String cn) {
+ boolean isIP4 = true;
+ String tld = cn;
+ int x = cn.lastIndexOf('.');
+ // We only bother analyzing the characters after the final dot
+ // in the name.
+ if (x >= 0 && x + 1 < cn.length()) {
+ tld = cn.substring(x + 1);
+ }
+ for (int i = 0; i < tld.length(); i++) {
+ if (!Character.isDigit(tld.charAt(0))) {
+ isIP4 = false;
+ break;
+ }
+ }
+ return isIP4;
+ }
+
+ public static boolean acceptableCountryWildcard(final String cn) {
+ int cnLen = cn.length();
+ if (cnLen >= 7 && cnLen <= 9) {
+ // Look for the '.' in the 3rd-last position:
+ if (cn.charAt(cnLen - 3) == '.') {
+ // Trim off the [*.] and the [.XX].
+ String s = cn.substring(2, cnLen - 3);
+ // And test against the sorted array of bad 2lds:
+ int x = Arrays.binarySearch(BAD_COUNTRY_2LDS, s);
+ return x < 0;
+ }
+ }
+ return true;
+ }
+
+ public static boolean isLocalhost(String host) {
+ host = host != null ? host.trim().toLowerCase() : "";
+ if (host.startsWith("::1")) {
+ int x = host.lastIndexOf('%');
+ if (x >= 0) {
+ host = host.substring(0, x);
+ }
+ }
+ int x = Arrays.binarySearch(LOCALHOSTS, host);
+ return x >= 0;
+ }
+
+ /**
+ * Counts the number of dots "." in a string.
+ *
+ * @param s string to count dots from
+ * @return number of dots
+ */
+ public static int countDots(final String s) {
+ int count = 0;
+ for (int i = 0; i < s.length(); i++) {
+ if (s.charAt(i) == '.') {
+ count++;
+ }
+ }
+ return count;
+ }
+ }
+
+}
diff --git a/src/main/java/com/github/shyiko/mysql/binlog/network/SSLMode.java b/src/main/java/com/github/shyiko/mysql/binlog/network/SSLMode.java
index a5ce7f42..041dd6ea 100644
--- a/src/main/java/com/github/shyiko/mysql/binlog/network/SSLMode.java
+++ b/src/main/java/com/github/shyiko/mysql/binlog/network/SSLMode.java
@@ -16,8 +16,7 @@
package com.github.shyiko.mysql.binlog.network;
/**
- * @see * ssl-mode for the original documentation.
* @author Stanley Shyiko
*/
public enum SSLMode {
diff --git a/src/main/java/com/github/shyiko/mysql/binlog/network/ServerException.java b/src/main/java/com/github/shyiko/mysql/binlog/network/ServerException.java
index cfd026d2..03ce3b72 100644
--- a/src/main/java/com/github/shyiko/mysql/binlog/network/ServerException.java
+++ b/src/main/java/com/github/shyiko/mysql/binlog/network/ServerException.java
@@ -33,6 +33,7 @@ public ServerException(String message, int errorCode, String sqlState) {
/**
* @see ErrorCode
+ * @return error code
*/
public int getErrorCode() {
return errorCode;
diff --git a/src/main/java/com/github/shyiko/mysql/binlog/network/TLSHostnameVerifier.java b/src/main/java/com/github/shyiko/mysql/binlog/network/TLSHostnameVerifier.java
deleted file mode 100644
index 28b106d6..00000000
--- a/src/main/java/com/github/shyiko/mysql/binlog/network/TLSHostnameVerifier.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Copyright 2016 Stanley Shyiko
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.github.shyiko.mysql.binlog.network;
-
-import sun.security.util.HostnameChecker;
-
-import javax.net.ssl.HostnameVerifier;
-import javax.net.ssl.SSLPeerUnverifiedException;
-import javax.net.ssl.SSLSession;
-import java.security.cert.Certificate;
-import java.security.cert.CertificateException;
-import java.security.cert.X509Certificate;
-
-/**
- * @author Stanley Shyiko
- */
-public class TLSHostnameVerifier implements HostnameVerifier {
-
- public boolean verify(String hostname, SSLSession session) {
- HostnameChecker checker = HostnameChecker.getInstance(HostnameChecker.TYPE_TLS);
- try {
- Certificate[] peerCertificates = session.getPeerCertificates();
- if (peerCertificates.length > 0 && peerCertificates[0] instanceof X509Certificate) {
- X509Certificate peerCertificate = (X509Certificate) peerCertificates[0];
- try {
- checker.match(hostname, peerCertificate);
- return true;
- } catch (CertificateException ignored) {
- }
- }
- } catch (SSLPeerUnverifiedException ignored) {
- }
- return false;
- }
-
-}
diff --git a/src/main/java/com/github/shyiko/mysql/binlog/network/protocol/PacketChannel.java b/src/main/java/com/github/shyiko/mysql/binlog/network/protocol/PacketChannel.java
index fbbe950f..c64e7034 100644
--- a/src/main/java/com/github/shyiko/mysql/binlog/network/protocol/PacketChannel.java
+++ b/src/main/java/com/github/shyiko/mysql/binlog/network/protocol/PacketChannel.java
@@ -32,7 +32,9 @@
* @author Stanley Shyiko
*/
public class PacketChannel implements Channel {
-
+ private int packetNumber = 0;
+ private boolean authenticationComplete;
+ private boolean isSSL = false;
private Socket socket;
private ByteArrayInputStream inputStream;
private ByteArrayOutputStream outputStream;
@@ -55,36 +57,41 @@ public ByteArrayOutputStream getOutputStream() {
return outputStream;
}
+ public void authenticationComplete() {
+ authenticationComplete = true;
+ }
+
public byte[] read() throws IOException {
int length = inputStream.readInteger(3);
- inputStream.skip(1); //sequence
+ int sequence = inputStream.read(); // sequence
+ if ( sequence != packetNumber++ ) {
+ throw new IOException("unexpected sequence #" + sequence);
+ }
return inputStream.read(length);
}
- public void write(Command command, int packetNumber) throws IOException {
+ public void write(Command command) throws IOException {
byte[] body = command.toByteArray();
ByteArrayOutputStream buffer = new ByteArrayOutputStream();
buffer.writeInteger(body.length, 3); // packet length
- buffer.writeInteger(packetNumber, 1);
+
+ // see https://dev.mysql.com/doc/dev/mysql-server/8.0.11/page_protocol_basic_packets.html#sect_protocol_basic_packets_sequence_id
+ // we only have to maintain a sequence number in the authentication phase.
+ // what the point is, I do not know
+ if ( authenticationComplete ) {
+ packetNumber = 0;
+ }
+
+ buffer.writeInteger(packetNumber++, 1);
+
buffer.write(body, 0, body.length);
+ buffer.flush();
outputStream.write(buffer.toByteArray());
// though it has no effect in case of default (underlying) output stream (SocketOutputStream),
// it may be necessary in case of non-default one
outputStream.flush();
}
- /**
- * @deprecated use {@link #write(Command, int)} instead
- */
- @Deprecated
- public void writeBuffered(Command command, int packetNumber) throws IOException {
- write(command, packetNumber);
- }
-
- public void write(Command command) throws IOException {
- write(command, 0);
- }
-
public void upgradeToSSL(SSLSocketFactory sslSocketFactory, HostnameVerifier hostnameVerifier) throws IOException {
SSLSocket sslSocket = sslSocketFactory.createSocket(this.socket);
sslSocket.startHandshake();
@@ -96,6 +103,11 @@ public void upgradeToSSL(SSLSocketFactory sslSocketFactory, HostnameVerifier hos
throw new IdentityVerificationException("\"" + sslSocket.getInetAddress().getHostName() +
"\" identity was not confirmed");
}
+ isSSL = true;
+ }
+
+ public boolean isSSL() {
+ return isSSL;
}
@Override
diff --git a/src/main/java/com/github/shyiko/mysql/binlog/network/protocol/command/AuthenticateNativePasswordCommand.java b/src/main/java/com/github/shyiko/mysql/binlog/network/protocol/command/AuthenticateNativePasswordCommand.java
index f98eced0..25711af0 100644
--- a/src/main/java/com/github/shyiko/mysql/binlog/network/protocol/command/AuthenticateNativePasswordCommand.java
+++ b/src/main/java/com/github/shyiko/mysql/binlog/network/protocol/command/AuthenticateNativePasswordCommand.java
@@ -29,6 +29,6 @@ public AuthenticateNativePasswordCommand(String scramble, String password) {
}
@Override
public byte[] toByteArray() throws IOException {
- return AuthenticateCommand.passwordCompatibleWithMySQL411(password, scramble);
+ return AuthenticateSecurityPasswordCommand.passwordCompatibleWithMySQL411(password, scramble);
}
}
diff --git a/src/main/java/com/github/shyiko/mysql/binlog/network/protocol/command/AuthenticateSHA2Command.java b/src/main/java/com/github/shyiko/mysql/binlog/network/protocol/command/AuthenticateSHA2Command.java
new file mode 100644
index 00000000..b776df58
--- /dev/null
+++ b/src/main/java/com/github/shyiko/mysql/binlog/network/protocol/command/AuthenticateSHA2Command.java
@@ -0,0 +1,142 @@
+/*
+ * Copyright 2018 dingxiaobo
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.github.shyiko.mysql.binlog.network.protocol.command;
+
+import com.github.shyiko.mysql.binlog.io.ByteArrayOutputStream;
+import com.github.shyiko.mysql.binlog.network.ClientCapabilities;
+
+import java.io.IOException;
+import java.security.DigestException;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+
+/**
+ * @author dingxiaobo
+ */
+public class AuthenticateSHA2Command implements Command {
+
+ private String schema;
+ private String username;
+ private String password;
+ private String scramble;
+ private int clientCapabilities;
+ private int collation;
+ private boolean rawPassword = false;
+
+ public AuthenticateSHA2Command(String schema, String username, String password, String scramble, int collation) {
+ this.schema = schema;
+ this.username = username;
+ this.password = password;
+ this.scramble = scramble;
+ this.collation = collation;
+ }
+
+ public AuthenticateSHA2Command(String scramble, String password) {
+ this.rawPassword = true;
+ this.password = password;
+ this.scramble = scramble;
+ }
+
+ public void setClientCapabilities(int clientCapabilities) {
+ this.clientCapabilities = clientCapabilities;
+ }
+
+ @Override
+ public byte[] toByteArray() throws IOException {
+ ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+
+ if ( rawPassword ) {
+ byte[] passwordSHA1 = encodePassword();
+ buffer.write(passwordSHA1);
+ return buffer.toByteArray();
+ }
+
+ int clientCapabilities = this.clientCapabilities;
+ if (clientCapabilities == 0) {
+ clientCapabilities |= ClientCapabilities.LONG_FLAG;
+ clientCapabilities |= ClientCapabilities.PROTOCOL_41;
+ clientCapabilities |= ClientCapabilities.SECURE_CONNECTION;
+ clientCapabilities |= ClientCapabilities.PLUGIN_AUTH;
+ clientCapabilities |= ClientCapabilities.PLUGIN_AUTH_LENENC_CLIENT_DATA;
+
+ if (schema != null) {
+ clientCapabilities |= ClientCapabilities.CONNECT_WITH_DB;
+ }
+ }
+ buffer.writeInteger(clientCapabilities, 4);
+ buffer.writeInteger(0, 4); // maximum packet length
+ buffer.writeInteger(collation, 1);
+ for (int i = 0; i < 23; i++) {
+ buffer.write(0);
+ }
+ buffer.writeZeroTerminatedString(username);
+ byte[] passwordSHA1 = encodePassword();
+ buffer.writeInteger(passwordSHA1.length, 1);
+ buffer.write(passwordSHA1);
+ if (schema != null) {
+ buffer.writeZeroTerminatedString(schema);
+ }
+ buffer.writeZeroTerminatedString("caching_sha2_password");
+
+ return buffer.toByteArray();
+ }
+
+ private byte[] encodePassword() {
+ if (password == null || "".equals(password)) {
+ return new byte[0];
+ }
+ // caching_sha2_password
+ /*
+ * Server does it in 4 steps (see sql/auth/sha2_password_common.cc Generate_scramble::scramble method):
+ *
+ * SHA2(src) => digest_stage1
+ * SHA2(digest_stage1) => digest_stage2
+ * SHA2(digest_stage2, m_rnd) => scramble_stage1
+ * XOR(digest_stage1, scramble_stage1) => scramble
+ */
+ MessageDigest md;
+ try {
+ md = MessageDigest.getInstance("SHA-256");
+
+ int CACHING_SHA2_DIGEST_LENGTH = 32;
+ byte[] dig1 = new byte[CACHING_SHA2_DIGEST_LENGTH];
+ byte[] dig2 = new byte[CACHING_SHA2_DIGEST_LENGTH];
+ byte[] scramble1 = new byte[CACHING_SHA2_DIGEST_LENGTH];
+
+ // SHA2(src) => digest_stage1
+ md.update(password.getBytes(), 0, password.getBytes().length);
+ md.digest(dig1, 0, CACHING_SHA2_DIGEST_LENGTH);
+ md.reset();
+
+ // SHA2(digest_stage1) => digest_stage2
+ md.update(dig1, 0, dig1.length);
+ md.digest(dig2, 0, CACHING_SHA2_DIGEST_LENGTH);
+ md.reset();
+
+ // SHA2(digest_stage2, m_rnd) => scramble_stage1
+ md.update(dig2, 0, dig1.length);
+ md.update(scramble.getBytes(), 0, scramble.getBytes().length);
+ md.digest(scramble1, 0, CACHING_SHA2_DIGEST_LENGTH);
+
+ // XOR(digest_stage1, scramble_stage1) => scramble
+ return CommandUtils.xor(dig1, scramble1);
+ } catch (NoSuchAlgorithmException ex) {
+ throw new RuntimeException(ex);
+ } catch (DigestException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git a/src/main/java/com/github/shyiko/mysql/binlog/network/protocol/command/AuthenticateSHA2RSAPasswordCommand.java b/src/main/java/com/github/shyiko/mysql/binlog/network/protocol/command/AuthenticateSHA2RSAPasswordCommand.java
new file mode 100644
index 00000000..e8de8d4f
--- /dev/null
+++ b/src/main/java/com/github/shyiko/mysql/binlog/network/protocol/command/AuthenticateSHA2RSAPasswordCommand.java
@@ -0,0 +1,63 @@
+package com.github.shyiko.mysql.binlog.network.protocol.command;
+
+import com.github.shyiko.mysql.binlog.network.AuthenticationException;
+import com.github.shyiko.mysql.binlog.io.ByteArrayOutputStream;
+
+import javax.crypto.Cipher;
+import java.io.IOException;
+import java.security.KeyFactory;
+import java.security.interfaces.RSAPublicKey;
+import java.security.spec.X509EncodedKeySpec;
+import java.util.Base64;
+
+public class AuthenticateSHA2RSAPasswordCommand implements Command {
+ private static final String RSA_METHOD = "RSA/ECB/OAEPWithSHA-1AndMGF1Padding";
+ private final String rsaKey;
+ private final String password;
+ private final String scramble;
+
+ public AuthenticateSHA2RSAPasswordCommand(String rsaKey, String password, String scramble) {
+ this.rsaKey = rsaKey;
+ this.password = password;
+ this.scramble = scramble;
+ }
+
+ @Override
+ public byte[] toByteArray() throws IOException {
+ RSAPublicKey key = decodeKey(rsaKey);
+
+ ByteArrayOutputStream passBuffer = new ByteArrayOutputStream();
+ passBuffer.writeZeroTerminatedString(password);
+
+ byte[] xorBuffer = CommandUtils.xor(passBuffer.toByteArray(), scramble.getBytes());
+ return encrypt(xorBuffer, key, RSA_METHOD);
+ }
+
+ private RSAPublicKey decodeKey(String key) throws AuthenticationException {
+ int beginIndex = key.indexOf("\n") + 1;
+ int endIndex = key.indexOf("-----END PUBLIC KEY-----");
+ String innerKey = key.substring(beginIndex, endIndex).replaceAll("\\n", "");
+
+ Base64.Decoder decoder = Base64.getDecoder();
+ byte[] certificateData = decoder.decode(innerKey.getBytes());
+
+ X509EncodedKeySpec spec = new X509EncodedKeySpec(certificateData);
+ try {
+ KeyFactory kf = KeyFactory.getInstance("RSA");
+ return (RSAPublicKey) kf.generatePublic(spec);
+ } catch (Exception e) {
+ throw new AuthenticationException("Unable to decode public key: " + key);
+ }
+ }
+
+ private byte[] encrypt(byte[] source, RSAPublicKey key, String transformation) throws AuthenticationException {
+ try {
+ Cipher cipher = Cipher.getInstance(transformation);
+ cipher.init(Cipher.ENCRYPT_MODE, key);
+ return cipher.doFinal(source);
+ } catch (Exception e) {
+ throw new AuthenticationException("couldn't encrypt password: " + e.getMessage());
+ }
+ }
+
+}
diff --git a/src/main/java/com/github/shyiko/mysql/binlog/network/protocol/command/AuthenticateCommand.java b/src/main/java/com/github/shyiko/mysql/binlog/network/protocol/command/AuthenticateSecurityPasswordCommand.java
similarity index 78%
rename from src/main/java/com/github/shyiko/mysql/binlog/network/protocol/command/AuthenticateCommand.java
rename to src/main/java/com/github/shyiko/mysql/binlog/network/protocol/command/AuthenticateSecurityPasswordCommand.java
index a045fe24..744e85fe 100644
--- a/src/main/java/com/github/shyiko/mysql/binlog/network/protocol/command/AuthenticateCommand.java
+++ b/src/main/java/com/github/shyiko/mysql/binlog/network/protocol/command/AuthenticateSecurityPasswordCommand.java
@@ -25,7 +25,7 @@
/**
* @author Stanley Shyiko
*/
-public class AuthenticateCommand implements Command {
+public class AuthenticateSecurityPasswordCommand implements Command {
private String schema;
private String username;
@@ -34,11 +34,12 @@ public class AuthenticateCommand implements Command {
private int clientCapabilities;
private int collation;
- public AuthenticateCommand(String schema, String username, String password, String salt) {
+ public AuthenticateSecurityPasswordCommand(String schema, String username, String password, String salt, int collation) {
this.schema = schema;
this.username = username;
this.password = password;
this.salt = salt;
+ this.collation = collation;
}
public void setClientCapabilities(int clientCapabilities) {
@@ -55,7 +56,10 @@ public byte[] toByteArray() throws IOException {
int clientCapabilities = this.clientCapabilities;
if (clientCapabilities == 0) {
clientCapabilities = ClientCapabilities.LONG_FLAG |
- ClientCapabilities.PROTOCOL_41 | ClientCapabilities.SECURE_CONNECTION;
+ ClientCapabilities.PROTOCOL_41 |
+ ClientCapabilities.SECURE_CONNECTION |
+ ClientCapabilities.PLUGIN_AUTH;
+
if (schema != null) {
clientCapabilities |= ClientCapabilities.CONNECT_WITH_DB;
}
@@ -67,19 +71,26 @@ public byte[] toByteArray() throws IOException {
buffer.write(0);
}
buffer.writeZeroTerminatedString(username);
- byte[] passwordSHA1 = "".equals(password) ? new byte[0] : passwordCompatibleWithMySQL411(password, salt);
+ byte[] passwordSHA1 = passwordCompatibleWithMySQL411(password, salt);
buffer.writeInteger(passwordSHA1.length, 1);
buffer.write(passwordSHA1);
if (schema != null) {
buffer.writeZeroTerminatedString(schema);
}
+ buffer.writeZeroTerminatedString("mysql_native_password");
return buffer.toByteArray();
}
/**
* see mysql/sql/password.c scramble(...)
+ * @param password the password
+ * @param salt salt received from server
+ * @return hashed password
*/
public static byte[] passwordCompatibleWithMySQL411(String password, String salt) {
+ if ( "".equals(password) || password == null )
+ return new byte[0];
+
MessageDigest sha;
try {
sha = MessageDigest.getInstance("SHA-1");
@@ -87,7 +98,7 @@ public static byte[] passwordCompatibleWithMySQL411(String password, String salt
throw new RuntimeException(e);
}
byte[] passwordHash = sha.digest(password.getBytes());
- return xor(passwordHash, sha.digest(union(salt.getBytes(), sha.digest(passwordHash))));
+ return CommandUtils.xor(passwordHash, sha.digest(union(salt.getBytes(), sha.digest(passwordHash))));
}
private static byte[] union(byte[] a, byte[] b) {
@@ -96,13 +107,4 @@ private static byte[] union(byte[] a, byte[] b) {
System.arraycopy(b, 0, r, a.length, b.length);
return r;
}
-
- private static byte[] xor(byte[] a, byte[] b) {
- byte[] r = new byte[a.length];
- for (int i = 0; i < r.length; i++) {
- r[i] = (byte) (a[i] ^ b[i]);
- }
- return r;
- }
-
}
diff --git a/src/main/java/com/github/shyiko/mysql/binlog/network/protocol/command/ByteArrayCommand.java b/src/main/java/com/github/shyiko/mysql/binlog/network/protocol/command/ByteArrayCommand.java
new file mode 100644
index 00000000..94ee6998
--- /dev/null
+++ b/src/main/java/com/github/shyiko/mysql/binlog/network/protocol/command/ByteArrayCommand.java
@@ -0,0 +1,15 @@
+package com.github.shyiko.mysql.binlog.network.protocol.command;
+
+import java.io.IOException;
+
+public class ByteArrayCommand implements Command {
+ private final byte[] command;
+
+ public ByteArrayCommand(byte[] command) {
+ this.command = command;
+ }
+ @Override
+ public byte[] toByteArray() throws IOException {
+ return command;
+ }
+}
diff --git a/src/main/java/com/github/shyiko/mysql/binlog/network/protocol/command/CommandUtils.java b/src/main/java/com/github/shyiko/mysql/binlog/network/protocol/command/CommandUtils.java
new file mode 100644
index 00000000..68c09095
--- /dev/null
+++ b/src/main/java/com/github/shyiko/mysql/binlog/network/protocol/command/CommandUtils.java
@@ -0,0 +1,12 @@
+package com.github.shyiko.mysql.binlog.network.protocol.command;
+
+public class CommandUtils {
+ public static byte[] xor(byte[] input, byte[] against) {
+ byte[] to = new byte[input.length];
+
+ for( int i = 0; i < input.length; i++ ) {
+ to[i] = (byte) (input[i] ^ against[i % against.length]);
+ }
+ return to;
+ }
+}
diff --git a/src/main/java/com/github/shyiko/mysql/binlog/network/protocol/command/DumpBinaryLogCommand.java b/src/main/java/com/github/shyiko/mysql/binlog/network/protocol/command/DumpBinaryLogCommand.java
index 36216c76..b7436aab 100644
--- a/src/main/java/com/github/shyiko/mysql/binlog/network/protocol/command/DumpBinaryLogCommand.java
+++ b/src/main/java/com/github/shyiko/mysql/binlog/network/protocol/command/DumpBinaryLogCommand.java
@@ -24,9 +24,11 @@
*/
public class DumpBinaryLogCommand implements Command {
+ public static final int BINLOG_SEND_ANNOTATE_ROWS_EVENT = 2;
private long serverId;
private String binlogFilename;
private long binlogPosition;
+ private boolean sendAnnotateRowsEvent;
public DumpBinaryLogCommand(long serverId, String binlogFilename, long binlogPosition) {
this.serverId = serverId;
@@ -34,12 +36,21 @@ public DumpBinaryLogCommand(long serverId, String binlogFilename, long binlogPos
this.binlogPosition = binlogPosition;
}
+ public DumpBinaryLogCommand(long serverId, String binlogFilename, long binlogPosition, boolean sendAnnotateRowsEvent) {
+ this(serverId, binlogFilename, binlogPosition);
+ this.sendAnnotateRowsEvent = sendAnnotateRowsEvent;
+ }
+
@Override
public byte[] toByteArray() throws IOException {
ByteArrayOutputStream buffer = new ByteArrayOutputStream();
buffer.writeInteger(CommandType.BINLOG_DUMP.ordinal(), 1);
buffer.writeLong(this.binlogPosition, 4);
- buffer.writeInteger(0, 2); // flag
+ int binlogFlags = 0;
+ if (sendAnnotateRowsEvent) {
+ binlogFlags |= BINLOG_SEND_ANNOTATE_ROWS_EVENT;
+ }
+ buffer.writeInteger(binlogFlags, 2); // flag
buffer.writeLong(this.serverId, 4);
buffer.writeString(this.binlogFilename);
return buffer.toByteArray();
diff --git a/src/main/java/com/github/shyiko/mysql/binlog/network/protocol/command/SSLRequestCommand.java b/src/main/java/com/github/shyiko/mysql/binlog/network/protocol/command/SSLRequestCommand.java
index ea748104..959cf5f9 100644
--- a/src/main/java/com/github/shyiko/mysql/binlog/network/protocol/command/SSLRequestCommand.java
+++ b/src/main/java/com/github/shyiko/mysql/binlog/network/protocol/command/SSLRequestCommand.java
@@ -42,7 +42,9 @@ public byte[] toByteArray() throws IOException {
int clientCapabilities = this.clientCapabilities;
if (clientCapabilities == 0) {
clientCapabilities = ClientCapabilities.LONG_FLAG |
- ClientCapabilities.PROTOCOL_41 | ClientCapabilities.SECURE_CONNECTION;
+ ClientCapabilities.PROTOCOL_41 |
+ ClientCapabilities.SECURE_CONNECTION |
+ ClientCapabilities.PLUGIN_AUTH;
}
clientCapabilities |= ClientCapabilities.SSL;
buffer.writeInteger(clientCapabilities, 4);
diff --git a/src/test/java/com/github/shyiko/mysql/binlog/AbstractIntegrationTest.java b/src/test/java/com/github/shyiko/mysql/binlog/AbstractIntegrationTest.java
new file mode 100644
index 00000000..802052f6
--- /dev/null
+++ b/src/test/java/com/github/shyiko/mysql/binlog/AbstractIntegrationTest.java
@@ -0,0 +1,69 @@
+package com.github.shyiko.mysql.binlog;
+
+import com.github.shyiko.mysql.binlog.event.EventType;
+import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer;
+import org.testng.annotations.BeforeClass;
+
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.TimeZone;
+
+public abstract class AbstractIntegrationTest {
+ protected MySQLConnection master;
+ protected MySQLConnection slave;
+ protected BinaryLogClient client;
+ protected CountDownEventListener eventListener;
+ protected MysqlVersion mysqlVersion;
+
+ protected MysqlOnetimeServerOptions getOptions() {
+ MysqlOnetimeServerOptions options = new MysqlOnetimeServerOptions();
+ options.fullRowMetaData = true;
+ return options;
+ }
+
+ @BeforeClass
+ public void setUp() throws Exception {
+ TimeZone.setDefault(TimeZone.getTimeZone("GMT"));
+ mysqlVersion = MysqlOnetimeServer.getVersion();
+ MysqlOnetimeServer masterServer = new MysqlOnetimeServer(getOptions());
+ MysqlOnetimeServer slaveServer = new MysqlOnetimeServer(getOptions());
+
+ masterServer.boot();
+ slaveServer.boot();
+ slaveServer.setupSlave(masterServer.getPort());
+
+ master = new MySQLConnection("127.0.0.1", masterServer.getPort(), "root", "");
+ slave = new MySQLConnection("127.0.0.1", slaveServer.getPort(), "root", "");
+
+ client = new BinaryLogClient(slave.hostname, slave.port, slave.username, slave.password);
+ EventDeserializer eventDeserializer = new EventDeserializer();
+ eventDeserializer.setCompatibilityMode(EventDeserializer.CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY,
+ EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG);
+ client.setEventDeserializer(eventDeserializer);
+ client.setServerId(client.getServerId() - 1); // avoid clashes between BinaryLogClient instances
+ client.setKeepAlive(false);
+ client.registerEventListener(new TraceEventListener());
+ client.registerEventListener(eventListener = new CountDownEventListener());
+ client.registerLifecycleListener(new TraceLifecycleListener());
+ client.connect(BinaryLogClientIntegrationTest.DEFAULT_TIMEOUT);
+ master.execute(new BinaryLogClientIntegrationTest.Callback
+ * string<EOF> The SQL statement (not null-terminated)
+ *
+ *
+ * @author Winger
+ */
+public class AnnotateRowsEventDataDeserializer implements EventDataDeserializer
+ * uint8 GTID sequence
+ * uint4 Replication Domain ID
+ * uint1 Flags
+ *
+ * if flag & FL_GROUP_COMMIT_ID
+ * uint8 commit_id
+ * else
+ * uint6 0
+ *
+ *
+ * @author Winger
+ * @see GTID_EVENT for the original doc
+ */
+public class MariadbGtidEventDataDeserializer implements EventDataDeserializer
+ * uint4 Number of GTIDs
+ * GTID[0]
+ * uint4 Replication Domain ID
+ * uint4 Server_ID
+ * uint8 GTID sequence ...
+ * GTID[n]
+ *
+ *
+ * @author Winger
+ * @see GTID_EVENT for the original doc
+ */
+public class MariadbGtidListEventDataDeserializer implements EventDataDeserializerGrammar
* The grammar of the binary representation of JSON objects are defined in the MySQL codebase in the
* json_binary.h file:
- *
* doc ::= type value
* type ::=
@@ -164,11 +162,21 @@ public class JsonBinary {
* @throws IOException if there is a problem reading or processing the binary representation
*/
public static String parseAsString(byte[] bytes) throws IOException {
+ /* check for mariaDB-format JSON strings inside columns marked JSON */
+ if ( isJSONString(bytes) ) {
+ return new String(bytes);
+ }
JsonStringFormatter handler = new JsonStringFormatter();
parse(bytes, handler);
return handler.getString();
}
+ private static boolean isJSONString(byte[] bytes) {
+ if (bytes[0] > 0x0f)
+ return true;
+ else
+ return false;
+ }
/**
* Parse the MySQL binary representation of a {@code JSON} value and call the supplied {@link JsonFormatter}
* for the various components of the value.
@@ -189,6 +197,7 @@ public JsonBinary(byte[] bytes) {
public JsonBinary(ByteArrayInputStream contents) {
this.reader = contents;
+ this.reader.mark(Integer.MAX_VALUE);
}
public String getString() {
@@ -262,8 +271,6 @@ protected void parse(ValueType type, JsonFormatter formatter) throws IOException
* json_binary.h file:
*
Grammar
*
- * Grammar
- *
*
* value ::=
* object |
@@ -319,16 +326,21 @@ protected void parse(ValueType type, JsonFormatter formatter) throws IOException
*/
protected void parseObject(boolean small, JsonFormatter formatter)
throws IOException {
+ // this is terrible, but without a decent seekable InputStream the other way seemed like
+ // a full-on rewrite
+ int objectOffset = this.reader.getPosition();
+
// Read the header ...
int numElements = readUnsignedIndex(Integer.MAX_VALUE, small, "number of elements in");
int numBytes = readUnsignedIndex(Integer.MAX_VALUE, small, "size of");
int valueSize = small ? 2 : 4;
// Read each key-entry, consisting of the offset and length of each key ...
- int[] keyLengths = new int[numElements];
+ KeyEntry[] keys = new KeyEntry[numElements];
for (int i = 0; i != numElements; ++i) {
- readUnsignedIndex(numBytes, small, "key offset in"); // unused
- keyLengths[i] = readUInt16();
+ keys[i] = new KeyEntry(
+ readUnsignedIndex(numBytes, small, "key offset in"),
+ readUInt16());
}
// Read each key value value-entry
@@ -373,9 +385,14 @@ protected void parseObject(boolean small, JsonFormatter formatter)
}
// Read each key ...
- String[] keys = new String[numElements];
for (int i = 0; i != numElements; ++i) {
- keys[i] = reader.readString(keyLengths[i]);
+ final int skipBytes = keys[i].index + objectOffset - reader.getPosition();
+ // Skip to a start of a field name if the current position does not point to it
+ // This can happen for MySQL 8
+ if (skipBytes != 0) {
+ reader.fastSkip(skipBytes);
+ }
+ keys[i].name = reader.readString(keys[i].length);
}
// Now parse the values ...
@@ -384,7 +401,7 @@ protected void parseObject(boolean small, JsonFormatter formatter)
if (i != 0) {
formatter.nextEntry();
}
- formatter.name(keys[i]);
+ formatter.name(keys[i].name);
ValueEntry entry = entries[i];
if (entry.resolved) {
Object value = entry.value;
@@ -397,6 +414,8 @@ protected void parseObject(boolean small, JsonFormatter formatter)
}
} else {
// Parse the value ...
+ this.reader.reset();
+ this.reader.fastSkip(objectOffset + entry.index);
parse(entry.type, formatter);
}
}
@@ -463,6 +482,8 @@ protected void parseObject(boolean small, JsonFormatter formatter)
// checkstyle, please ignore MethodLength for the next line
protected void parseArray(boolean small, JsonFormatter formatter)
throws IOException {
+ int arrayOffset = this.reader.getPosition();
+
// Read the header ...
int numElements = readUnsignedIndex(Integer.MAX_VALUE, small, "number of elements in");
int numBytes = readUnsignedIndex(Integer.MAX_VALUE, small, "size of");
@@ -527,6 +548,9 @@ protected void parseArray(boolean small, JsonFormatter formatter)
}
} else {
// Parse the value ...
+ this.reader.reset();
+ this.reader.fastSkip(arrayOffset + entry.index);
+
parse(entry.type, formatter);
}
}
@@ -650,7 +674,6 @@ protected void parseString(JsonFormatter formatter) throws IOException {
* See the
* MySQL source code for the logic used in this method.
- *
Grammar
*
*
@@ -946,6 +969,7 @@ protected BigInteger readUInt64() throws IOException {
* to 16383, and so on...
*
* @return the integer value
+ * @throws IOException if we don't encounter an end-of-int marker
*/
protected int readVariableInt() throws IOException {
int length = 0;
@@ -988,6 +1012,26 @@ protected static String asHex(int value) {
return Integer.toHexString(value);
}
+ /**
+ * Class used internally to hold key entry information.
+ */
+ protected static final class KeyEntry {
+
+ protected final int index;
+ protected final int length;
+ protected String name;
+
+ public KeyEntry(int index, int length) {
+ this.index = index;
+ this.length = length;
+ }
+
+ public KeyEntry setKey(String key) {
+ this.name = key;
+ return this;
+ }
+ }
+
/**
* Class used internally to hold value entry information.
*/
diff --git a/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/json/JsonStringFormatter.java b/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/json/JsonStringFormatter.java
index 5ae92c85..58a3b841 100644
--- a/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/json/JsonStringFormatter.java
+++ b/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/json/JsonStringFormatter.java
@@ -19,6 +19,7 @@
import java.math.BigDecimal;
import java.math.BigInteger;
+import java.util.Base64;
/**
* A {@link JsonFormatter} implementation that creates a JSON string representation.
@@ -197,7 +198,7 @@ public void valueTimestamp(long secondsPastEpoch, int microSeconds) {
@Override
public void valueOpaque(ColumnType type, byte[] value) {
sb.append('"');
- sb.append(javax.xml.bind.DatatypeConverter.printBase64Binary(value));
+ sb.append(Base64.getEncoder().encodeToString(value));
sb.append('"');
}
diff --git a/src/main/java/com/github/shyiko/mysql/binlog/io/BufferedSocketInputStream.java b/src/main/java/com/github/shyiko/mysql/binlog/io/BufferedSocketInputStream.java
index 69f00e1a..5095637c 100644
--- a/src/main/java/com/github/shyiko/mysql/binlog/io/BufferedSocketInputStream.java
+++ b/src/main/java/com/github/shyiko/mysql/binlog/io/BufferedSocketInputStream.java
@@ -60,6 +60,10 @@ public int read(byte[] b, int off, int len) throws IOException {
}
offset = 0;
limit = in.read(buffer, 0, buffer.length);
+
+ if (limit == -1) {
+ return -1;
+ }
}
int bytesRemainingInBuffer = Math.min(len, limit - offset);
System.arraycopy(buffer, offset, b, off, bytesRemainingInBuffer);
diff --git a/src/main/java/com/github/shyiko/mysql/binlog/io/ByteArrayInputStream.java b/src/main/java/com/github/shyiko/mysql/binlog/io/ByteArrayInputStream.java
index 350b8709..66dbe13a 100644
--- a/src/main/java/com/github/shyiko/mysql/binlog/io/ByteArrayInputStream.java
+++ b/src/main/java/com/github/shyiko/mysql/binlog/io/ByteArrayInputStream.java
@@ -27,10 +27,13 @@ public class ByteArrayInputStream extends InputStream {
private InputStream inputStream;
private Integer peek;
+ private Integer pos, markPosition;
private int blockLength = -1;
+ private int initialBlockLength = -1;
public ByteArrayInputStream(InputStream inputStream) {
this.inputStream = inputStream;
+ this.pos = 0;
}
public ByteArrayInputStream(byte[] bytes) {
@@ -39,6 +42,9 @@ public ByteArrayInputStream(byte[] bytes) {
/**
* Read int written in little-endian format.
+ * @param length length of the integer to read
+ * @throws IOException in case of EOF
+ * @return the integer from the binlog
*/
public int readInteger(int length) throws IOException {
int result = 0;
@@ -50,6 +56,9 @@ public int readInteger(int length) throws IOException {
/**
* Read long written in little-endian format.
+ * @param length length of the long to read
+ * @throws IOException in case of EOF
+ * @return the long from the binlog
*/
public long readLong(int length) throws IOException {
long result = 0;
@@ -61,6 +70,9 @@ public long readLong(int length) throws IOException {
/**
* Read fixed length string.
+ * @param length length of string to read
+ * @throws IOException in case of EOF
+ * @return string
*/
public String readString(int length) throws IOException {
return new String(read(length));
@@ -68,6 +80,8 @@ public String readString(int length) throws IOException {
/**
* Read variable-length string. Preceding packed integer indicates the length of the string.
+ * @throws IOException in case of EOF
+ * @return string
*/
public String readLengthEncodedString() throws IOException {
return readString(readPackedInteger());
@@ -75,6 +89,8 @@ public String readLengthEncodedString() throws IOException {
/**
* Read variable-length string. End is indicated by 0x00 byte.
+ * @throws IOException in case of EOF
+ * @return string
*/
public String readZeroTerminatedString() throws IOException {
ByteArrayOutputStream s = new ByteArrayOutputStream();
@@ -95,7 +111,10 @@ public void fill(byte[] bytes, int offset, int length) throws IOException {
while (remaining != 0) {
int read = read(bytes, offset + length - remaining, remaining);
if (read == -1) {
- throw new EOFException();
+ throw new EOFException(
+ String.format("Failed to read remaining %d of %d bytes from position %d. Block length: %d. Initial block length: %d.",
+ remaining, length, pos, blockLength, initialBlockLength)
+ );
}
remaining -= read;
}
@@ -126,6 +145,8 @@ private byte[] reverse(byte[] bytes) {
/**
* @see #readPackedNumber()
+ * @throws IOException in case of malformed number, eof, null, or long
+ * @return integer
*/
public int readPackedInteger() throws IOException {
Number number = readPackedNumber();
@@ -139,12 +160,14 @@ public int readPackedInteger() throws IOException {
}
/**
- * Format (first-byte-based):
- * 0-250 - The first byte is the number (in the range 0-250). No additional bytes are used.
- * 251 - SQL NULL value
- * 252 - Two more bytes are used. The number is in the range 251-0xffff.
- * 253 - Three more bytes are used. The number is in the range 0xffff-0xffffff.
+ * Format (first-byte-based):
+ * 0-250 - The first byte is the number (in the range 0-250). No additional bytes are used.
+ * 251 - SQL NULL value
+ * 252 - Two more bytes are used. The number is in the range 251-0xffff.
+ * 253 - Three more bytes are used. The number is in the range 0xffff-0xffffff.
* 254 - Eight more bytes are used. The number is in the range 0xffffff-0xffffffffffffffff.
+ * @throws IOException in case of malformed number or EOF
+ * @return long or null
*/
public Number readPackedNumber() throws IOException {
int b = this.read();
@@ -187,8 +210,9 @@ public int read() throws IOException {
peek = null;
}
if (result == -1) {
- throw new EOFException();
+ throw new EOFException(String.format("Failed to read next byte from position %d", this.pos));
}
+ this.pos += 1;
return result;
}
@@ -202,6 +226,50 @@ private int readWithinBlockBoundaries() throws IOException {
return inputStream.read();
}
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ if (b == null) {
+ throw new NullPointerException();
+ } else if (off < 0 || len < 0 || len > b.length - off) {
+ throw new IndexOutOfBoundsException();
+ } else if (len == 0) {
+ return 0;
+ }
+
+ if (peek != null) {
+ b[off] = (byte)(int)peek;
+ off += 1;
+ len -= 1;
+ }
+
+ int read = readWithinBlockBoundaries(b, off, len);
+
+ if (read > 0) {
+ this.pos += read;
+ }
+
+ if (peek != null) {
+ peek = null;
+ read = read <= 0 ? 1 : read + 1;
+ }
+
+ return read;
+ }
+
+ private int readWithinBlockBoundaries(byte[] b, int off, int len) throws IOException {
+ if (blockLength == -1) {
+ return inputStream.read(b, off, len);
+ } else if (blockLength == 0) {
+ return -1;
+ }
+
+ int read = inputStream.read(b, off, Math.min(len, blockLength));
+ if (read > 0) {
+ blockLength -= read;
+ }
+ return read;
+ }
+
@Override
public void close() throws IOException {
inputStream.close();
@@ -209,6 +277,7 @@ public void close() throws IOException {
public void enterBlock(int length) {
this.blockLength = length < -1 ? -1 : length;
+ this.initialBlockLength = length;
}
public void skipToTheEndOfTheBlock() throws IOException {
@@ -218,4 +287,46 @@ public void skipToTheEndOfTheBlock() throws IOException {
}
}
+ public int getPosition() {
+ return pos;
+ }
+
+ @Override
+ public synchronized void mark(int readlimit) {
+ markPosition = pos;
+ inputStream.mark(readlimit);
+ }
+
+ @Override
+ public boolean markSupported() {
+ return inputStream.markSupported();
+ }
+
+ @Override
+ public synchronized void reset() throws IOException {
+ pos = markPosition;
+ inputStream.reset();
+ }
+
+ /**
+ * This method implements fast-forward skipping in the stream.
+ * It can be used if and only if the underlying stream is fully available till its end.
+ * In other cases the regular {@link #skip(long)} method must be used.
+ *
+ * @param n - number of bytes to skip
+ * @return number of bytes skipped
+ * @throws IOException
+ */
+ public synchronized long fastSkip(long n) throws IOException {
+ long skipOf = n;
+ if (blockLength != -1) {
+ skipOf = Math.min(blockLength, skipOf);
+ blockLength -= skipOf;
+ if (blockLength == 0) {
+ blockLength = -1;
+ }
+ }
+ pos += (int) skipOf;
+ return inputStream.skip(skipOf);
+ }
}
diff --git a/src/main/java/com/github/shyiko/mysql/binlog/io/ByteArrayOutputStream.java b/src/main/java/com/github/shyiko/mysql/binlog/io/ByteArrayOutputStream.java
index 91e4ca44..17840c6f 100644
--- a/src/main/java/com/github/shyiko/mysql/binlog/io/ByteArrayOutputStream.java
+++ b/src/main/java/com/github/shyiko/mysql/binlog/io/ByteArrayOutputStream.java
@@ -35,6 +35,9 @@ public ByteArrayOutputStream(OutputStream outputStream) {
/**
* Write int in little-endian format.
+ * @throws IOException on underlying stream error
+ * @param value integer to write
+ * @param length length in bytes of the integer
*/
public void writeInteger(int value, int length) throws IOException {
for (int i = 0; i < length; i++) {
@@ -44,6 +47,9 @@ public void writeInteger(int value, int length) throws IOException {
/**
* Write long in little-endian format.
+ * @throws IOException on underlying stream error
+ * @param value long to write
+ * @param length length in bytes of the long
*/
public void writeLong(long value, int length) throws IOException {
for (int i = 0; i < length; i++) {
@@ -57,9 +63,13 @@ public void writeString(String value) throws IOException {
/**
* @see ByteArrayInputStream#readZeroTerminatedString()
+ * @param value string to write
+ * @throws IOException on underlying stream error
*/
public void writeZeroTerminatedString(String value) throws IOException {
- write(value.getBytes());
+ if ( value != null )
+ write(value.getBytes());
+
write(0);
}
@@ -68,6 +78,11 @@ public void write(int b) throws IOException {
outputStream.write(b);
}
+ @Override
+ public void write(byte[] bytes) throws IOException {
+ outputStream.write(bytes);
+ }
+
public byte[] toByteArray() {
// todo: whole approach feels wrong
if (outputStream instanceof java.io.ByteArrayOutputStream) {
diff --git a/src/main/java/com/github/shyiko/mysql/binlog/jmx/BinaryLogClientStatistics.java b/src/main/java/com/github/shyiko/mysql/binlog/jmx/BinaryLogClientStatistics.java
index b1910901..dd13946f 100644
--- a/src/main/java/com/github/shyiko/mysql/binlog/jmx/BinaryLogClientStatistics.java
+++ b/src/main/java/com/github/shyiko/mysql/binlog/jmx/BinaryLogClientStatistics.java
@@ -18,6 +18,7 @@
import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.event.Event;
import com.github.shyiko.mysql.binlog.event.EventHeader;
+import com.github.shyiko.mysql.binlog.event.EventType;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@@ -65,6 +66,9 @@ public long getSecondsBehindMaster() {
if (timestamp == 0 || eventHeader == null) {
return -1;
}
+ if (eventHeader.getEventType() == EventType.HEARTBEAT && eventHeader.getTimestamp() == 0) {
+ return 0;
+ }
return (timestamp - eventHeader.getTimestamp()) / 1000;
}
diff --git a/src/main/java/com/github/shyiko/mysql/binlog/network/Authenticator.java b/src/main/java/com/github/shyiko/mysql/binlog/network/Authenticator.java
new file mode 100644
index 00000000..0faf012d
--- /dev/null
+++ b/src/main/java/com/github/shyiko/mysql/binlog/network/Authenticator.java
@@ -0,0 +1,179 @@
+package com.github.shyiko.mysql.binlog.network;
+
+import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream;
+import com.github.shyiko.mysql.binlog.io.ByteArrayOutputStream;
+import com.github.shyiko.mysql.binlog.network.protocol.ErrorPacket;
+import com.github.shyiko.mysql.binlog.network.protocol.GreetingPacket;
+import com.github.shyiko.mysql.binlog.network.protocol.PacketChannel;
+import com.github.shyiko.mysql.binlog.network.protocol.command.AuthenticateNativePasswordCommand;
+import com.github.shyiko.mysql.binlog.network.protocol.command.AuthenticateSHA2Command;
+import com.github.shyiko.mysql.binlog.network.protocol.command.AuthenticateSHA2RSAPasswordCommand;
+import com.github.shyiko.mysql.binlog.network.protocol.command.AuthenticateSecurityPasswordCommand;
+import com.github.shyiko.mysql.binlog.network.protocol.command.ByteArrayCommand;
+import com.github.shyiko.mysql.binlog.network.protocol.command.Command;
+import com.github.shyiko.mysql.binlog.network.protocol.command.SSLRequestCommand;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+public class Authenticator {
+ private enum AuthMethod {
+ NATIVE,
+ CACHING_SHA2
+ };
+
+ private final GreetingPacket greetingPacket;
+ private String scramble;
+ private final PacketChannel channel;
+ private final String schema;
+ private final String username;
+ private final String password;
+
+ private final Logger logger = Logger.getLogger(getClass().getName());
+
+ private final String SHA2_PASSWORD = "caching_sha2_password";
+ private final String MYSQL_NATIVE = "mysql_native_password";
+
+ private AuthMethod authMethod = AuthMethod.NATIVE;
+
+ public Authenticator(
+ GreetingPacket greetingPacket,
+ PacketChannel channel,
+ String schema,
+ String username,
+ String password
+ ) {
+ this.greetingPacket = greetingPacket;
+ this.scramble = greetingPacket.getScramble();
+ this.channel = channel;
+ this.schema = schema;
+ this.username = username;
+ this.password = password;
+ }
+
+ public void authenticate() throws IOException {
+ logger.log(Level.FINE, "Begin auth for " + username);
+ int collation = greetingPacket.getServerCollation();
+
+ Command authenticateCommand;
+ if ( SHA2_PASSWORD.equals(greetingPacket.getPluginProvidedData()) ) {
+ authMethod = AuthMethod.CACHING_SHA2;
+ authenticateCommand = new AuthenticateSHA2Command(schema, username, password, scramble, collation);
+ } else {
+ authMethod = AuthMethod.NATIVE;
+ authenticateCommand = new AuthenticateSecurityPasswordCommand(schema, username, password, scramble, collation);
+ }
+
+ channel.write(authenticateCommand);
+ readResult();
+ logger.log(Level.FINE, "Auth complete " + username);
+ }
+
+ private void readResult() throws IOException {
+ byte[] authenticationResult = channel.read();
+ switch(authenticationResult[0]) {
+ case (byte) 0x00:
+ // success
+ return;
+ case (byte) 0xFF:
+ // error
+ byte[] bytes = Arrays.copyOfRange(authenticationResult, 1, authenticationResult.length);
+ ErrorPacket errorPacket = new ErrorPacket(bytes);
+ throw new AuthenticationException(errorPacket.getErrorMessage(), errorPacket.getErrorCode(),
+ errorPacket.getSqlState());
+ case (byte) 0xFE:
+ switchAuthentication(authenticationResult);
+ return;
+ default:
+ if ( authMethod == AuthMethod.NATIVE )
+ throw new AuthenticationException("Unexpected authentication result (" + authenticationResult[0] + ")");
+ else
+ processCachingSHA2Result(authenticationResult);
+ }
+ }
+
+ private void processCachingSHA2Result(byte[] authenticationResult) throws IOException {
+ if (authenticationResult.length < 2)
+ throw new AuthenticationException("caching_sha2_password response too short!");
+
+ ByteArrayInputStream stream = new ByteArrayInputStream(authenticationResult);
+ stream.readPackedInteger(); // throw away length, always 1
+
+ switch(stream.read()) {
+ case 0x03:
+ logger.log(Level.FINE, "cached sha2 auth successful");
+ // successful fast authentication
+ readResult();
+ return;
+ case 0x04:
+ logger.log(Level.FINE, "cached sha2 auth not successful, moving to full auth path");
+ continueCachingSHA2Authentication();
+ }
+ }
+
+ private void continueCachingSHA2Authentication() throws IOException {
+ ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+ if ( channel.isSSL() ) {
+ // over SSL we simply send the password in cleartext.
+
+ buffer.writeZeroTerminatedString(password);
+
+ Command c = new ByteArrayCommand(buffer.toByteArray());
+ channel.write(c);
+ readResult();
+ } else {
+ // try to download an RSA key
+ buffer.write(0x02);
+ channel.write(new ByteArrayCommand(buffer.toByteArray()));
+
+ ByteArrayInputStream stream = new ByteArrayInputStream(channel.read());
+ int result = stream.read();
+ switch(result) {
+ case 0x01:
+ byte[] rsaKey = new byte[stream.available()];
+ stream.read(rsaKey);
+
+ logger.log(Level.FINE, "received RSA key: " + rsaKey);
+ Command c = new AuthenticateSHA2RSAPasswordCommand(new String(rsaKey), password, scramble);
+ channel.write(c);
+
+ readResult();
+ return;
+ default:
+ throw new AuthenticationException("Unkown response fetching RSA key in caching_sha2_pasword auth: " + result);
+ }
+ }
+ }
+
+ private void switchAuthentication(byte[] authenticationResult) throws IOException {
+ /*
+ Azure-MySQL likes to tell us to switch authentication methods, even though
+ we haven't advertised that we support any. It uses this for some-odd
+ reason to send the real password scramble.
+ */
+ ByteArrayInputStream buffer = new ByteArrayInputStream(authenticationResult);
+ buffer.read(1);
+
+ String authName = buffer.readZeroTerminatedString();
+ if (MYSQL_NATIVE.equals(authName)) {
+ authMethod = AuthMethod.NATIVE;
+
+ this.scramble = buffer.readZeroTerminatedString();
+
+ Command switchCommand = new AuthenticateNativePasswordCommand(scramble, password);
+ channel.write(switchCommand);
+ } else if ( SHA2_PASSWORD.equals(authName) ) {
+ authMethod = AuthMethod.CACHING_SHA2;
+
+ this.scramble = buffer.readZeroTerminatedString();
+ Command authCommand = new AuthenticateSHA2Command(scramble, password);
+ channel.write(authCommand);
+ } else {
+ throw new AuthenticationException("unsupported authentication method: " + authName);
+ }
+
+ readResult();
+ }
+}
diff --git a/src/main/java/com/github/shyiko/mysql/binlog/network/ClientCapabilities.java b/src/main/java/com/github/shyiko/mysql/binlog/network/ClientCapabilities.java
index c744d5a9..d81e2e1a 100644
--- a/src/main/java/com/github/shyiko/mysql/binlog/network/ClientCapabilities.java
+++ b/src/main/java/com/github/shyiko/mysql/binlog/network/ClientCapabilities.java
@@ -42,6 +42,7 @@ public final class ClientCapabilities {
public static final int MULTI_RESULTS = 1 << 17; /* enable/disable multi-results */
public static final int PS_MULTI_RESULTS = 1 << 18; /* multi-results in ps-protocol */
public static final int PLUGIN_AUTH = 1 << 19; /* client supports plugin authentication */
+ public static final int PLUGIN_AUTH_LENENC_CLIENT_DATA = 1 << 21;
public static final int SSL_VERIFY_SERVER_CERT = 1 << 30;
public static final int REMEMBER_OPTIONS = 1 << 31;
diff --git a/src/main/java/com/github/shyiko/mysql/binlog/network/DefaultSSLSocketFactory.java b/src/main/java/com/github/shyiko/mysql/binlog/network/DefaultSSLSocketFactory.java
index 0fabaa10..388e95d2 100644
--- a/src/main/java/com/github/shyiko/mysql/binlog/network/DefaultSSLSocketFactory.java
+++ b/src/main/java/com/github/shyiko/mysql/binlog/network/DefaultSSLSocketFactory.java
@@ -30,11 +30,11 @@ public class DefaultSSLSocketFactory implements SSLSocketFactory {
private final String protocol;
public DefaultSSLSocketFactory() {
- this("TLSv1");
+ this("TLSv1.2");
}
/**
- * @param protocol TLSv1, TLSv1.1 or TLSv1.2 (the last two require JDK 7+)
+ * @param protocol TLSv1, TLSv1.1 or TLSv1.2. Since JDK 11.0.11, TLSv1 and TLSv1.1 are no longer supported.
*/
public DefaultSSLSocketFactory(String protocol) {
this.protocol = protocol;
diff --git a/src/main/java/com/github/shyiko/mysql/binlog/network/ErrorCode.java b/src/main/java/com/github/shyiko/mysql/binlog/network/ErrorCode.java
index e7e46752..2cd4484e 100644
--- a/src/main/java/com/github/shyiko/mysql/binlog/network/ErrorCode.java
+++ b/src/main/java/com/github/shyiko/mysql/binlog/network/ErrorCode.java
@@ -2178,7 +2178,7 @@ public final class ErrorCode {
public static final int ER_TOO_BIG_PRECISION = 1426;
/**
- * For float(M,D), double(M,D) or decimal(M,D), M must be >= D (column '%-.192s').
+ * For float(M,D), double(M,D) or decimal(M,D), M must be >= D (column '%-.192s').
*/
public static final int ER_M_BIGGER_THAN_D = 1427;
@@ -3146,7 +3146,7 @@ public final class ErrorCode {
public static final int WARN_NO_MASTER_INFO = 1617;
/**
- * <%-.64s> option ignored
+ * %-.64s option ignored
*/
public static final int WARN_OPTION_IGNORED = 1618;
@@ -3988,56 +3988,56 @@ public final class ErrorCode {
public static final int ER_CANT_DO_IMPLICIT_COMMIT_IN_TRX_WHEN_GTID_NEXT_IS_SET = 1778;
/**
- * @@GLOBAL.GTID_MODE = ON or UPGRADE_STEP_2 requires @@GLOBAL.ENFORCE_GTID_CONSISTENCY = 1.
+ @@GLOBAL.GTID_MODE = ON or UPGRADE_STEP_2 requires @@GLOBAL.ENFORCE_GTID_CONSISTENCY = 1.
*/
public static final int ER_GTID_MODE_2_OR_3_REQUIRES_ENFORCE_GTID_CONSISTENCY_ON = 1779;
/**
- * @@GLOBAL.GTID_MODE = ON or UPGRADE_STEP_1 or UPGRADE_STEP_2 requires --log-bin and --log-slave-updates.
+ * @@GLOBAL.GTID_MODE = ON or UPGRADE_STEP_1 or UPGRADE_STEP_2 requires --log-bin and --log-slave-updates.
*/
public static final int ER_GTID_MODE_REQUIRES_BINLOG = 1780;
/**
- * @@SESSION.GTID_NEXT cannot be set to UUID:NUMBER when @@GLOBAL.GTID_MODE = OFF.
+ * @@SESSION.GTID_NEXT cannot be set to UUID:NUMBER when @@GLOBAL.GTID_MODE = OFF.
*/
public static final int ER_CANT_SET_GTID_NEXT_TO_GTID_WHEN_GTID_MODE_IS_OFF = 1781;
/**
- * @@SESSION.GTID_NEXT cannot be set to ANONYMOUS when @@GLOBAL.GTID_MODE = ON.
+ * @@SESSION.GTID_NEXT cannot be set to ANONYMOUS when @@GLOBAL.GTID_MODE = ON.
*/
public static final int ER_CANT_SET_GTID_NEXT_TO_ANONYMOUS_WHEN_GTID_MODE_IS_ON = 1782;
/**
- * @@SESSION.GTID_NEXT_LIST cannot be set to a non-NULL value when @@GLOBAL.GTID_MODE = OFF.
+ * @@SESSION.GTID_NEXT_LIST cannot be set to a non-NULL value when @@GLOBAL.GTID_MODE = OFF.
*/
public static final int ER_CANT_SET_GTID_NEXT_LIST_TO_NON_NULL_WHEN_GTID_MODE_IS_OFF = 1783;
/**
- * Found a Gtid_log_event or Previous_gtids_log_event when @@GLOBAL.GTID_MODE = OFF.
+ * Found a Gtid_log_event or Previous_gtids_log_event when @@GLOBAL.GTID_MODE = OFF.
*/
public static final int ER_FOUND_GTID_EVENT_WHEN_GTID_MODE_IS_OFF = 1784;
/**
- * When @@GLOBAL.ENFORCE_GTID_CONSISTENCY = 1, updates to non-transactional tables can only be done in either
+ * When @@GLOBAL.ENFORCE_GTID_CONSISTENCY = 1, updates to non-transactional tables can only be done in either
* autocommitted statements or single-statement transactions, and never in the same statement as updates to
* transactional tables.
*/
public static final int ER_GTID_UNSAFE_NON_TRANSACTIONAL_TABLE = 1785;
/**
- * CREATE TABLE ... SELECT is forbidden when @@GLOBAL.ENFORCE_GTID_CONSISTENCY = 1.
+ * CREATE TABLE ... SELECT is forbidden when @@GLOBAL.ENFORCE_GTID_CONSISTENCY = 1.
*/
public static final int ER_GTID_UNSAFE_CREATE_SELECT = 1786;
/**
- * When @@GLOBAL.ENFORCE_GTID_CONSISTENCY = 1, the statements CREATE TEMPORARY TABLE and DROP TEMPORARY TABLE can
+ * When @@GLOBAL.ENFORCE_GTID_CONSISTENCY = 1, the statements CREATE TEMPORARY TABLE and DROP TEMPORARY TABLE can
* be executed in a non-transactional context only, and require that AUTOCOMMIT = 1.
*/
public static final int ER_GTID_UNSAFE_CREATE_DROP_TEMPORARY_TABLE_IN_TRANSACTION = 1787;
/**
- * The value of @@GLOBAL.GTID_MODE can only change one step at a time: OFF <-> UPGRADE_STEP_1 <-> UPGRADE_STEP_2
- * <-> ON. Also note that this value must be stepped up or down simultaneously on all servers; see the Manual for
+ * The value of @@GLOBAL.GTID_MODE can only change one step at a time: OFF <-> UPGRADE_STEP_1 <-> UPGRADE_STEP_2
+ * <-> ON. Also note that this value must be stepped up or down simultaneously on all servers; see the Manual for
* instructions.
*/
public static final int ER_GTID_MODE_CAN_ONLY_CHANGE_ONE_STEP_AT_A_TIME = 1788;
@@ -4049,7 +4049,7 @@ public final class ErrorCode {
public static final int ER_MASTER_HAS_PURGED_REQUIRED_GTIDS = 1789;
/**
- * @@SESSION.GTID_NEXT cannot be changed by a client that owns a GTID. The client owns %s. Ownership is released
+ * @@SESSION.GTID_NEXT cannot be changed by a client that owns a GTID. The client owns %s. Ownership is released
* on COMMIT or ROLLBACK.
*/
public static final int ER_CANT_SET_GTID_NEXT_WHEN_OWNING_GTID = 1790;
@@ -4291,8 +4291,8 @@ public final class ErrorCode {
public static final int ER_READ_ONLY_MODE = 1836;
/**
- * When @@SESSION.GTID_NEXT is set to a GTID, you must explicitly set it to a different value after a COMMIT or
- * ROLLBACK. Please check GTID_NEXT variable manual page for detailed explanation. Current @@SESSION.GTID_NEXT is
+ * When @@SESSION.GTID_NEXT is set to a GTID, you must explicitly set it to a different value after a COMMIT or
+ * ROLLBACK. Please check GTID_NEXT variable manual page for detailed explanation. Current @@SESSION.GTID_NEXT is
* '%s'.
*/
public static final int ER_GTID_NEXT_TYPE_UNDEFINED_GROUP = 1837;
@@ -4303,27 +4303,27 @@ public final class ErrorCode {
public static final int ER_VARIABLE_NOT_SETTABLE_IN_SP = 1838;
/**
- * @@GLOBAL.GTID_PURGED can only be set when @@GLOBAL.GTID_MODE = ON.
+ * @@GLOBAL.GTID_PURGED can only be set when @@GLOBAL.GTID_MODE = ON.
*/
public static final int ER_CANT_SET_GTID_PURGED_WHEN_GTID_MODE_IS_OFF = 1839;
/**
- * @@GLOBAL.GTID_PURGED can only be set when @@GLOBAL.GTID_EXECUTED is empty.
+ * @@GLOBAL.GTID_PURGED can only be set when @@GLOBAL.GTID_EXECUTED is empty.
*/
public static final int ER_CANT_SET_GTID_PURGED_WHEN_GTID_EXECUTED_IS_NOT_EMPTY = 1840;
/**
- * @@GLOBAL.GTID_PURGED can only be set when there are no ongoing transactions (not even in other clients).
+ * @@GLOBAL.GTID_PURGED can only be set when there are no ongoing transactions (not even in other clients).
*/
public static final int ER_CANT_SET_GTID_PURGED_WHEN_OWNED_GTIDS_IS_NOT_EMPTY = 1841;
/**
- * @@GLOBAL.GTID_PURGED was changed from '%s' to '%s'.
+ * @@GLOBAL.GTID_PURGED was changed from '%s' to '%s'.
*/
public static final int ER_GTID_PURGED_WAS_CHANGED = 1842;
/**
- * @@GLOBAL.GTID_EXECUTED was changed from '%s' to '%s'.
+ * @@GLOBAL.GTID_EXECUTED was changed from '%s' to '%s'.
*/
public static final int ER_GTID_EXECUTED_WAS_CHANGED = 1843;
@@ -4518,7 +4518,7 @@ public final class ErrorCode {
public static final int ER_OLD_TEMPORALS_UPGRADED = 1880;
/**
- * Operation not allowed when innodb_forced_recovery > 0.
+ * Operation not allowed when innodb_forced_recovery > 0.
*/
public static final int ER_INNODB_FORCED_RECOVERY = 1881;
diff --git a/src/main/java/com/github/shyiko/mysql/binlog/network/HostnameChecker.java b/src/main/java/com/github/shyiko/mysql/binlog/network/HostnameChecker.java
new file mode 100644
index 00000000..059a5f33
--- /dev/null
+++ b/src/main/java/com/github/shyiko/mysql/binlog/network/HostnameChecker.java
@@ -0,0 +1,581 @@
+/*
+ * $HeadURL: file:///opt/dev/not-yet-commons-ssl-SVN-repo/tags/commons-ssl-0.3.17/src/java/org/apache/commons/ssl/HostnameVerifier.java $
+ * $Revision: 121 $
+ * $Date: 2007-11-13 21:26:57 -0800 (Tue, 13 Nov 2007) $
+ *
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ *