Skip to content

Commit 7b49acb

Browse files
author
Michael Schiff
committed
Event carries binlog filename and offset. Simplifies logic for
EventListeners that care about checkpointing progress.
1 parent 6cdecfd commit 7b49acb

File tree

6 files changed

+30
-15
lines changed

6 files changed

+30
-15
lines changed

src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -889,7 +889,7 @@ private void listenForEventPackets() throws IOException {
889889
try {
890890
event = eventDeserializer.nextEvent(packetLength == MAX_PACKET_LENGTH ?
891891
new ByteArrayInputStream(readPacketSplitInChunks(inputStream, packetLength - 1)) :
892-
inputStream);
892+
inputStream, binlogFilename, binlogPosition);
893893
if (event == null) {
894894
throw new EOFException();
895895
}
@@ -1047,7 +1047,8 @@ public void unregisterEventListener(EventListener eventListener) {
10471047

10481048
private void notifyEventListeners(Event event) {
10491049
if (event.getData() instanceof EventDeserializer.EventDataWrapper) {
1050-
event = new Event(event.getHeader(), ((EventDeserializer.EventDataWrapper) event.getData()).getExternal());
1050+
event = new Event(event.getHeader(), ((EventDeserializer.EventDataWrapper) event.getData()).getExternal(),
1051+
binlogFilename, binlogPosition);
10511052
}
10521053
synchronized (eventListeners) {
10531054
for (EventListener eventListener : eventListeners) {

src/main/java/com/github/shyiko/mysql/binlog/BinaryLogFileReader.java

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,30 +34,39 @@
3434
*/
3535
public class BinaryLogFileReader implements Closeable {
3636

37-
public static final byte[] MAGIC_HEADER = new byte[]{(byte) 0xfe, (byte) 0x62, (byte) 0x69, (byte) 0x6e};
37+
public static final byte[] MAGIC_HEADER = new byte[] {(byte) 0xfe, (byte) 0x62, (byte) 0x69, (byte) 0x6e};
3838

3939
private final ByteArrayInputStream inputStream;
4040
private final EventDeserializer eventDeserializer;
41+
private final String filename;
42+
private long offset;
4143

4244
public BinaryLogFileReader(File file) throws IOException {
4345
this(file, new EventDeserializer());
4446
}
4547

4648
public BinaryLogFileReader(File file, EventDeserializer eventDeserializer) throws IOException {
47-
this(file != null ? new BufferedInputStream(new FileInputStream(file)) : null, eventDeserializer);
49+
this(file != null ? file.getName() : null,
50+
file != null ? new BufferedInputStream(new FileInputStream(file)) : null,
51+
eventDeserializer);
4852
}
4953

50-
public BinaryLogFileReader(InputStream inputStream) throws IOException {
51-
this(inputStream, new EventDeserializer());
54+
public BinaryLogFileReader(String filename, InputStream inputStream) throws IOException {
55+
this(filename, inputStream, new EventDeserializer());
5256
}
5357

54-
public BinaryLogFileReader(InputStream inputStream, EventDeserializer eventDeserializer) throws IOException {
58+
public BinaryLogFileReader(String filename, InputStream inputStream, EventDeserializer eventDeserializer)
59+
throws IOException {
60+
if (filename == null) {
61+
throw new IllegalArgumentException("File name cannot be NULL");
62+
}
5563
if (inputStream == null) {
5664
throw new IllegalArgumentException("Input stream cannot be NULL");
5765
}
5866
if (eventDeserializer == null) {
5967
throw new IllegalArgumentException("Event deserializer cannot be NULL");
6068
}
69+
this.filename = filename;
6170
this.inputStream = new ByteArrayInputStream(inputStream);
6271
try {
6372
byte[] magicHeader = this.inputStream.read(MAGIC_HEADER.length);
@@ -79,7 +88,7 @@ public BinaryLogFileReader(InputStream inputStream, EventDeserializer eventDeser
7988
* @return deserialized event or null in case of end-of-stream
8089
*/
8190
public Event readEvent() throws IOException {
82-
return eventDeserializer.nextEvent(inputStream);
91+
return eventDeserializer.nextEvent(inputStream, filename, offset++);
8392
}
8493

8594
@Override

src/main/java/com/github/shyiko/mysql/binlog/event/Event.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,14 @@ public class Event implements Serializable {
2424

2525
private EventHeader header;
2626
private EventData data;
27+
private String binlogFilename;
28+
private long binlogPosition;
2729

28-
public Event(EventHeader header, EventData data) {
30+
public Event(EventHeader header, EventData data, String binlogFilename, long binlogPosition) {
2931
this.header = header;
3032
this.data = data;
33+
this.binlogFilename = binlogFilename;
34+
this.binlogPosition = binlogPosition;
3135
}
3236

3337
@SuppressWarnings("unchecked")

src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/EventDeserializer.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -189,7 +189,8 @@ private void ensureCompatibility(EventDataDeserializer eventDataDeserializer) {
189189
/**
190190
* @return deserialized event or null in case of end-of-stream
191191
*/
192-
public Event nextEvent(ByteArrayInputStream inputStream) throws IOException {
192+
public Event nextEvent(ByteArrayInputStream inputStream, String binlogFilename, long binlogOffset)
193+
throws IOException {
193194
if (inputStream.peek() == -1) {
194195
return null;
195196
}
@@ -212,7 +213,7 @@ public Event nextEvent(ByteArrayInputStream inputStream) throws IOException {
212213
}
213214
tableMapEventByTableId.put(tableMapEvent.getTableId(), tableMapEvent);
214215
}
215-
return new Event(eventHeader, eventData);
216+
return new Event(eventHeader, eventData, binlogFilename, binlogOffset);
216217
}
217218

218219
private EventData deserializeEventData(ByteArrayInputStream inputStream, EventHeader eventHeader,

src/test/java/com/github/shyiko/mysql/binlog/BinaryLogFileReaderIntegrationTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,8 @@ public class BinaryLogFileReaderIntegrationTest {
4040

4141
@Test
4242
public void testNextEvent() throws Exception {
43-
BinaryLogFileReader reader = new BinaryLogFileReader(new GZIPInputStream(
44-
new FileInputStream("src/test/resources/mysql-bin.sakila.gz")));
43+
BinaryLogFileReader reader = new BinaryLogFileReader("mysql-bin.sakila",
44+
new GZIPInputStream(new FileInputStream("src/test/resources/mysql-bin.sakila.gz")));
4545
try {
4646
int numberOfEvents = 0;
4747
while ((reader.readEvent()) != null) {
@@ -68,7 +68,7 @@ public void testDeserializationSuppressionByEventType() throws Exception {
6868
EventDeserializer eventDeserializer = new EventDeserializer();
6969
eventDeserializer.setEventDataDeserializer(EventType.XID, new NullEventDataDeserializer());
7070
eventDeserializer.setEventDataDeserializer(EventType.QUERY, new ByteArrayEventDataDeserializer());
71-
BinaryLogFileReader reader = new BinaryLogFileReader(new GZIPInputStream(
71+
BinaryLogFileReader reader = new BinaryLogFileReader("mysql-bin.sakila.gz", new GZIPInputStream(
7272
new FileInputStream("src/test/resources/mysql-bin.sakila.gz")), eventDeserializer);
7373
try {
7474
boolean n = true, b = true;

src/test/java/com/github/shyiko/mysql/binlog/jmx/BinaryLogClientStatisticsTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,6 @@ private Event generateEvent(long timestamp, EventType type, long serverId, long
107107
header.setEventType(type);
108108
header.setServerId(serverId);
109109
header.setNextPosition(nextPosition);
110-
return new Event(header, null);
110+
return new Event(header, null, "filename", 4);
111111
}
112112
}

0 commit comments

Comments
 (0)