diff --git a/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java b/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java index cb7f890f..cb9223d3 100644 --- a/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java +++ b/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java @@ -889,7 +889,7 @@ private void listenForEventPackets() throws IOException { try { event = eventDeserializer.nextEvent(packetLength == MAX_PACKET_LENGTH ? new ByteArrayInputStream(readPacketSplitInChunks(inputStream, packetLength - 1)) : - inputStream); + inputStream, binlogFilename, binlogPosition); if (event == null) { throw new EOFException(); } @@ -1047,7 +1047,8 @@ public void unregisterEventListener(EventListener eventListener) { private void notifyEventListeners(Event event) { if (event.getData() instanceof EventDeserializer.EventDataWrapper) { - event = new Event(event.getHeader(), ((EventDeserializer.EventDataWrapper) event.getData()).getExternal()); + event = new Event(event.getHeader(), ((EventDeserializer.EventDataWrapper) event.getData()).getExternal(), + binlogFilename, binlogPosition); } synchronized (eventListeners) { for (EventListener eventListener : eventListeners) { diff --git a/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogFileReader.java b/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogFileReader.java index 96b79fc6..e0265012 100644 --- a/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogFileReader.java +++ b/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogFileReader.java @@ -34,30 +34,39 @@ */ public class BinaryLogFileReader implements Closeable { - public static final byte[] MAGIC_HEADER = new byte[]{(byte) 0xfe, (byte) 0x62, (byte) 0x69, (byte) 0x6e}; + public static final byte[] MAGIC_HEADER = new byte[] {(byte) 0xfe, (byte) 0x62, (byte) 0x69, (byte) 0x6e}; private final ByteArrayInputStream inputStream; private final EventDeserializer eventDeserializer; + private final String filename; + private long offset; public BinaryLogFileReader(File file) throws IOException { this(file, new EventDeserializer()); } public BinaryLogFileReader(File file, EventDeserializer eventDeserializer) throws IOException { - this(file != null ? new BufferedInputStream(new FileInputStream(file)) : null, eventDeserializer); + this(file != null ? file.getName() : null, + file != null ? new BufferedInputStream(new FileInputStream(file)) : null, + eventDeserializer); } - public BinaryLogFileReader(InputStream inputStream) throws IOException { - this(inputStream, new EventDeserializer()); + public BinaryLogFileReader(String filename, InputStream inputStream) throws IOException { + this(filename, inputStream, new EventDeserializer()); } - public BinaryLogFileReader(InputStream inputStream, EventDeserializer eventDeserializer) throws IOException { + public BinaryLogFileReader(String filename, InputStream inputStream, EventDeserializer eventDeserializer) + throws IOException { + if (filename == null) { + throw new IllegalArgumentException("File name cannot be NULL"); + } if (inputStream == null) { throw new IllegalArgumentException("Input stream cannot be NULL"); } if (eventDeserializer == null) { throw new IllegalArgumentException("Event deserializer cannot be NULL"); } + this.filename = filename; this.inputStream = new ByteArrayInputStream(inputStream); try { byte[] magicHeader = this.inputStream.read(MAGIC_HEADER.length); @@ -79,7 +88,7 @@ public BinaryLogFileReader(InputStream inputStream, EventDeserializer eventDeser * @return deserialized event or null in case of end-of-stream */ public Event readEvent() throws IOException { - return eventDeserializer.nextEvent(inputStream); + return eventDeserializer.nextEvent(inputStream, filename, offset++); } @Override diff --git a/src/main/java/com/github/shyiko/mysql/binlog/event/Event.java b/src/main/java/com/github/shyiko/mysql/binlog/event/Event.java index 37e040d7..867597ce 100644 --- a/src/main/java/com/github/shyiko/mysql/binlog/event/Event.java +++ b/src/main/java/com/github/shyiko/mysql/binlog/event/Event.java @@ -24,10 +24,14 @@ public class Event implements Serializable { private EventHeader header; private EventData data; + private String binlogFilename; + private long binlogPosition; - public Event(EventHeader header, EventData data) { + public Event(EventHeader header, EventData data, String binlogFilename, long binlogPosition) { this.header = header; this.data = data; + this.binlogFilename = binlogFilename; + this.binlogPosition = binlogPosition; } @SuppressWarnings("unchecked") diff --git a/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/EventDeserializer.java b/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/EventDeserializer.java index 999145f5..cb8ef001 100644 --- a/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/EventDeserializer.java +++ b/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/EventDeserializer.java @@ -189,7 +189,8 @@ private void ensureCompatibility(EventDataDeserializer eventDataDeserializer) { /** * @return deserialized event or null in case of end-of-stream */ - public Event nextEvent(ByteArrayInputStream inputStream) throws IOException { + public Event nextEvent(ByteArrayInputStream inputStream, String binlogFilename, long binlogOffset) + throws IOException { if (inputStream.peek() == -1) { return null; } @@ -212,7 +213,7 @@ public Event nextEvent(ByteArrayInputStream inputStream) throws IOException { } tableMapEventByTableId.put(tableMapEvent.getTableId(), tableMapEvent); } - return new Event(eventHeader, eventData); + return new Event(eventHeader, eventData, binlogFilename, binlogOffset); } private EventData deserializeEventData(ByteArrayInputStream inputStream, EventHeader eventHeader, diff --git a/src/test/java/com/github/shyiko/mysql/binlog/BinaryLogFileReaderIntegrationTest.java b/src/test/java/com/github/shyiko/mysql/binlog/BinaryLogFileReaderIntegrationTest.java index 835aa34a..4a39b8f3 100644 --- a/src/test/java/com/github/shyiko/mysql/binlog/BinaryLogFileReaderIntegrationTest.java +++ b/src/test/java/com/github/shyiko/mysql/binlog/BinaryLogFileReaderIntegrationTest.java @@ -40,8 +40,8 @@ public class BinaryLogFileReaderIntegrationTest { @Test public void testNextEvent() throws Exception { - BinaryLogFileReader reader = new BinaryLogFileReader(new GZIPInputStream( - new FileInputStream("src/test/resources/mysql-bin.sakila.gz"))); + BinaryLogFileReader reader = new BinaryLogFileReader("mysql-bin.sakila", + new GZIPInputStream(new FileInputStream("src/test/resources/mysql-bin.sakila.gz"))); try { int numberOfEvents = 0; while ((reader.readEvent()) != null) { @@ -68,7 +68,7 @@ public void testDeserializationSuppressionByEventType() throws Exception { EventDeserializer eventDeserializer = new EventDeserializer(); eventDeserializer.setEventDataDeserializer(EventType.XID, new NullEventDataDeserializer()); eventDeserializer.setEventDataDeserializer(EventType.QUERY, new ByteArrayEventDataDeserializer()); - BinaryLogFileReader reader = new BinaryLogFileReader(new GZIPInputStream( + BinaryLogFileReader reader = new BinaryLogFileReader("mysql-bin.sakila.gz", new GZIPInputStream( new FileInputStream("src/test/resources/mysql-bin.sakila.gz")), eventDeserializer); try { boolean n = true, b = true; diff --git a/src/test/java/com/github/shyiko/mysql/binlog/jmx/BinaryLogClientStatisticsTest.java b/src/test/java/com/github/shyiko/mysql/binlog/jmx/BinaryLogClientStatisticsTest.java index c512386e..cfbf47dd 100644 --- a/src/test/java/com/github/shyiko/mysql/binlog/jmx/BinaryLogClientStatisticsTest.java +++ b/src/test/java/com/github/shyiko/mysql/binlog/jmx/BinaryLogClientStatisticsTest.java @@ -107,6 +107,6 @@ private Event generateEvent(long timestamp, EventType type, long serverId, long header.setEventType(type); header.setServerId(serverId); header.setNextPosition(nextPosition); - return new Event(header, null); + return new Event(header, null, "filename", 4); } }