|
65 | 65 | import org.apache.hadoop.hbase.Waiter;
|
66 | 66 | import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
|
67 | 67 | import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
|
| 68 | +import org.apache.hadoop.hbase.regionserver.wal.FSHLog; |
68 | 69 | import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
|
69 | 70 | import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
|
70 | 71 | import org.apache.hadoop.hbase.replication.ChainWALEntryFilter;
|
@@ -917,4 +918,52 @@ public void testCleanClosedWALs() throws Exception {
|
917 | 918 | assertEquals(0, logQueue.getMetrics().getUncleanlyClosedWALs());
|
918 | 919 | }
|
919 | 920 | }
|
| 921 | + |
| 922 | + /** |
| 923 | + * Tests that we handle EOFException properly if the wal has moved to oldWALs directory. |
| 924 | + * @throws Exception exception |
| 925 | + */ |
| 926 | + @Test |
| 927 | + public void testEOFExceptionInOldWALsDirectory() throws Exception { |
| 928 | + assertEquals(1, logQueue.getQueueSize(fakeWalGroupId)); |
| 929 | + FSHLog fsLog = (FSHLog)log; |
| 930 | + Path emptyLogFile = fsLog.getCurrentFileName(); |
| 931 | + log.rollWriter(true); |
| 932 | + // There will 2 logs in the queue. |
| 933 | + assertEquals(2, logQueue.getQueueSize(fakeWalGroupId)); |
| 934 | + |
| 935 | + Configuration localConf = new Configuration(conf); |
| 936 | + localConf.setInt("replication.source.maxretriesmultiplier", 1); |
| 937 | + localConf.setBoolean("replication.source.eof.autorecovery", true); |
| 938 | + |
| 939 | + try (WALEntryStream entryStream = |
| 940 | + new WALEntryStream(logQueue, fs, localConf, logQueue.getMetrics(), fakeWalGroupId)) { |
| 941 | + // Get the archived dir path for the first wal. |
| 942 | + Path archivePath = entryStream.getArchivedLog(emptyLogFile); |
| 943 | + // Make sure that the wal path is not the same as archived Dir path. |
| 944 | + assertNotEquals(emptyLogFile.toString(), archivePath.toString()); |
| 945 | + assertTrue(fs.exists(archivePath)); |
| 946 | + fs.truncate(archivePath, 0); |
| 947 | + // make sure the size of the wal file is 0. |
| 948 | + assertEquals(0, fs.getFileStatus(archivePath).getLen()); |
| 949 | + } |
| 950 | + |
| 951 | + ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class); |
| 952 | + ReplicationSource source = Mockito.mock(ReplicationSource.class); |
| 953 | + when(source.isPeerEnabled()).thenReturn(true); |
| 954 | + when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0)); |
| 955 | + |
| 956 | + // Start the reader thread. |
| 957 | + ReplicationSourceWALReaderThread readerThread = |
| 958 | + new ReplicationSourceWALReaderThread(mockSourceManager, getQueueInfo(), logQueue, 0, |
| 959 | + fs, localConf, getDummyFilter(), logQueue.getMetrics(), source, fakeWalGroupId); |
| 960 | + readerThread.start(); |
| 961 | + // Wait for the replication queue size to be 1. This means that we have handled |
| 962 | + // 0 length wal from oldWALs directory. |
| 963 | + Waiter.waitFor(conf, 10000, new Waiter.Predicate<Exception>() { |
| 964 | + @Override public boolean evaluate() { |
| 965 | + return logQueue.getQueueSize(fakeWalGroupId) == 1; |
| 966 | + } |
| 967 | + }); |
| 968 | + } |
920 | 969 | }
|
0 commit comments