Commit 7ca8217

Implement pending changes processing and enhance key data caching in MerkleTree
1 parent: 8438a61 · commit: 7ca8217

File tree: 2 files changed, +160 -39 lines

src/main/java/io/pwrlabs/database/rocksdb/MerkleTree.java

Lines changed: 116 additions & 39 deletions
@@ -6,20 +6,23 @@
 import io.pwrlabs.util.encoders.ByteArrayWrapper;
 import io.pwrlabs.util.encoders.Hex;
 import io.pwrlabs.util.files.FileUtils;
+import io.pwrlabs.utils.TriggerEvent;
 import lombok.Getter;
-import lombok.SneakyThrows;
 import org.json.JSONObject;
-import org.junit.platform.commons.logging.LoggerFactory;
 import org.rocksdb.*;
 
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.*;
+import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import static io.pwrlabs.newerror.NewError.errorIf;
@@ -82,7 +85,23 @@ public static List<MerkleTree> getAllOpenMerkleTrees() {
      */
     private final Map<ByteArrayWrapper, Node> nodesCache = new ConcurrentHashMap<>();
     private final Map<Integer /*level*/, byte[]> hangingNodes = new ConcurrentHashMap<>();
-    private final Map<ByteArrayWrapper /*Key*/, byte[] /*data*/> keyDataCache = new ConcurrentHashMap<>();
+    /**
+     * Stores key–value entries that have already been incorporated into the Merkle tree
+     * and have contributed to the current root hash.
+     * This cache is used to quickly retrieve recently committed values without having to
+     * read them from RocksDB or re-traverse the Merkle tree.
+     */
+    private final Map<ByteArrayWrapper /*Key*/, byte[] /*data*/> keyDataCommittedCache = new ConcurrentHashMap<>();
+
+    /**
+     * Stores key–value entries that have been written or updated but have not yet been
+     * incorporated into the Merkle tree, and therefore have not affected the current root hash.
+     * This acts as a staging area for pending state updates before they are committed.
+     */
+    private final Map<ByteArrayWrapper /*Key*/, byte[] /*data*/> keyDataPendingCache = new ConcurrentHashMap<>();
+
+    private final LinkedBlockingQueue<PendingChanges> pendingChangesQueue = new LinkedBlockingQueue<>();
+
 
     @Getter
     private int numLeaves = 0;
@@ -98,6 +117,8 @@ public static List<MerkleTree> getAllOpenMerkleTrees() {
      * Lock for reading/writing to the tree.
      */
     private final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+    private final TriggerEvent pendingChangesProcessedEvent = new TriggerEvent();
     //endregion
 
     //region ===================== Constructors =====================
@@ -129,6 +150,8 @@ public MerkleTree(String treeName) throws RocksDBException {
         } catch (Exception e) {
             // Ignore compaction errors
         }
+
+        initPendingChangesProcessor();
     }
 
     public MerkleTree(String treeName, boolean trackTimeOfOperations) throws RocksDBException {
@@ -146,8 +169,13 @@ public byte[] getRootHash() {
         errorIfClosed();
         getReadLock();
         try {
+            if (keyDataPendingCache.size() > 0) {
+                pendingChangesProcessedEvent.awaitEvent();
+            }
             if (rootHash == null) return null;
             else return Arrays.copyOf(rootHash, rootHash.length);
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
         } finally {
             releaseReadLock();
         }
@@ -230,18 +258,26 @@ public HashSet<Node> getAllNodes() throws RocksDBException {
      */
     public byte[] getData(byte[] key) {
         errorIfClosed();
-        byte[] data = keyDataCache.get(new ByteArrayWrapper(key));
+        byte[] data = keyDataPendingCache.get(new ByteArrayWrapper(key));
+        if (data == null) data = keyDataCommittedCache.get(new ByteArrayWrapper(key));
+        if (data != null) return data;
+
+        try {
+            return db.get(keyDataHandle, key);
+        } catch (RocksDBException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public byte[] getCommittedData(byte[] key) {
+        errorIfClosed();
+        byte[] data = keyDataCommittedCache.get(new ByteArrayWrapper(key));
         if (data != null) return data;
 
-        long startTime = System.currentTimeMillis();
         try {
             return db.get(keyDataHandle, key);
         } catch (RocksDBException e) {
             throw new RuntimeException(e);
-        } finally {
-            long endTime = System.currentTimeMillis();
-            if (trackTimeOfOperations.get() && endTime - startTime > 2)
-                System.out.println(treeName + " getData completed in " + (endTime - startTime) + " ms");
         }
     }
 
@@ -268,27 +304,9 @@ public void addOrUpdateData(byte[] key, byte[] data) throws RocksDBException {
 
         getWriteLock();
         try {
-            // Check if key already exists
-            byte[] existingData = getData(key);
-            byte[] oldLeafHash = existingData == null ? null : calculateLeafHash(key, existingData);
-
-            // Calculate hash from key and data
-            byte[] newLeafHash = calculateLeafHash(key, data);
-
-            if (oldLeafHash != null && Arrays.equals(oldLeafHash, newLeafHash)) return;
-
-            // Store key-data mapping
-            keyDataCache.put(new ByteArrayWrapper(key), data);
+            pendingChangesQueue.add(new PendingChanges(key, data));
+            keyDataPendingCache.put(new ByteArrayWrapper(key), data);
             hasUnsavedChanges.set(true);
-
-            if (oldLeafHash == null) {
-                // Key doesn't exist, add new leaf
-                addLeaf(new Node(newLeafHash));
-            } else {
-                // Key exists, update leaf
-                // First get the old leaf hash
-                updateLeaf(oldLeafHash, newLeafHash);
-            }
         } finally {
             releaseWriteLock();
             long endTime = System.currentTimeMillis();
@@ -305,7 +323,7 @@ public void revertUnsavedChanges() {
         try {
             nodesCache.clear();
             hangingNodes.clear();
-            keyDataCache.clear();
+            keyDataCommittedCache.clear();
 
             loadMetaData();
 
@@ -432,7 +450,7 @@ public void flushToDisk() throws RocksDBException {
             }
         }
 
-        for (Map.Entry<ByteArrayWrapper, byte[]> entry : keyDataCache.entrySet()) {
+        for (Map.Entry<ByteArrayWrapper, byte[]> entry : keyDataCommittedCache.entrySet()) {
             batch.put(keyDataHandle, entry.getKey().data(), entry.getValue());
         }
 
@@ -441,7 +459,7 @@ public void flushToDisk() throws RocksDBException {
             }
 
             nodesCache.clear();
-            keyDataCache.clear();
+            keyDataCommittedCache.clear();
             hasUnsavedChanges.set(false);
         }
     } finally {
@@ -619,7 +637,7 @@ public void update(MerkleTree sourceTree) throws RocksDBException, IOException {
         loadMetaData();
 
         nodesCache.clear();
-        keyDataCache.clear();
+        keyDataCommittedCache.clear();
         hangingNodes.clear();
         hasUnsavedChanges.set(false);
         treesCloned.incrementAndGet();
@@ -660,7 +678,7 @@ public void clear() throws RocksDBException {
 
         // reset your in-memory state
         nodesCache.clear();
-        keyDataCache.clear();
+        keyDataCommittedCache.clear();
         hangingNodes.clear();
         rootHash = null;
         numLeaves = depth = 0;
@@ -680,7 +698,7 @@ public JSONObject getRamInfo() {
         json.put("numLeaves", numLeaves);
         json.put("depth", depth);
         json.put("nodeCacheSize", nodesCache.size());
-        json.put("keyDataCacheSize", keyDataCache.size());
+        json.put("keyDataCacheSize", keyDataCommittedCache.size());
         json.put("hangingNodesCacheSize", hangingNodes.size());
         return json;
     }
@@ -737,7 +755,7 @@ private void initializeDb() throws RocksDBException {
                 .setMaxBackgroundJobs(1)
                 .setInfoLogLevel(InfoLogLevel.FATAL_LEVEL)
                 .setAllowMmapReads(true)
-                .setAllowMmapWrites(false); ; // Enable memory-mapped reads for better performance
+                .setAllowMmapWrites(false); // Enable memory-mapped reads for better performance
         // (omit setNoBlockCache or any “disable cache” flags)
 
         // 2) Table format: enable a 64 MB off-heap LRU cache
@@ -1001,15 +1020,15 @@ private void errorIfClosed() {
 
     private void copyCache(MerkleTree sourceTree) {
         nodesCache.clear();
-        keyDataCache.clear();
+        keyDataCommittedCache.clear();
         hangingNodes.clear();
 
         for (Map.Entry<ByteArrayWrapper, Node> entry : sourceTree.nodesCache.entrySet()) {
             nodesCache.put(entry.getKey(), new Node(entry.getValue()));
         }
 
-        for (Map.Entry<ByteArrayWrapper, byte[]> entry : sourceTree.keyDataCache.entrySet()) {
-            keyDataCache.put(entry.getKey(), Arrays.copyOf(entry.getValue(), entry.getValue().length));
+        for (Map.Entry<ByteArrayWrapper, byte[]> entry : sourceTree.keyDataCommittedCache.entrySet()) {
+            keyDataCommittedCache.put(entry.getKey(), Arrays.copyOf(entry.getValue(), entry.getValue().length));
         }
 
         for (Map.Entry<Integer, byte[]> entry : sourceTree.hangingNodes.entrySet()) {
@@ -1022,6 +1041,53 @@ private void copyCache(MerkleTree sourceTree) {
         hasUnsavedChanges.set(sourceTree.hasUnsavedChanges.get());
     }
 
+    private void initPendingChangesProcessor() {
+        Thread thread = new Thread(() -> {
+            while (!closed.get()) {
+                try {
+                    PendingChanges changes = pendingChangesQueue.take();
+
+                    byte[] key = changes.getKey();
+                    byte[] data = changes.getData();
+
+                    byte[] existingData = getCommittedData(key);
+                    byte[] oldLeafHash = existingData == null ? null : calculateLeafHash(key, existingData);
+
+                    // Calculate hash from key and data
+                    byte[] newLeafHash = calculateLeafHash(key, data);
+
+                    if (oldLeafHash != null && Arrays.equals(oldLeafHash, newLeafHash)) continue;
+
+                    if (oldLeafHash == null) {
+                        // Key doesn't exist, add new leaf
+                        addLeaf(new Node(newLeafHash));
+                    } else {
+                        // Key exists, update leaf
+                        // First get the old leaf hash
+                        updateLeaf(oldLeafHash, newLeafHash);
+                    }
+
+                    // Store key-data mapping
+                    hasUnsavedChanges.set(true);
+                    keyDataCommittedCache.put(new ByteArrayWrapper(key), data);
+                    keyDataPendingCache.compute(new ByteArrayWrapper(key), (k, v) -> {
+                        if (v == null || Arrays.equals(v, data)) return null;
+                        else return v;
+                    });
+
+                    pendingChangesProcessedEvent.triggerEvent();
+                } catch (Exception e) {
+                    logger.error("Error processing pending changes in MerkleTree: " + treeName, e);
+                    e.printStackTrace();
+                }
+            }
+        });
+
+        thread.setDaemon(true);
+        thread.setName("PendingChangesProcessor-" + treeName);
+        thread.start();
+    }
+
     //endregion
 
     //region ===================== Nested Classes =====================
@@ -1357,6 +1423,17 @@ public boolean equals(Object obj) {
             return true;
         }
     }
+
+    @Getter
+    public class PendingChanges {
+        private final byte[] key;
+        private final byte[] data;
+
+        public PendingChanges(byte[] key, byte[] data) {
+            this.key = key;
+            this.data = data;
+        }
+    }
     //endregion
 
     public static void main(String[] args) throws Exception {
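Taken together, the MerkleTree changes implement a producer/consumer write path: addOrUpdateData() now just enqueues a PendingChanges record and stages the value in keyDataPendingCache, a background daemon thread drains pendingChangesQueue and applies each change to the tree, and getRootHash() waits on pendingChangesProcessedEvent until staged writes have been absorbed. Below is a minimal, self-contained sketch of that pattern; the class and member names (PendingWriteSketch, Change, pending, committed) are hypothetical illustrations, with plain String keys and no Merkle hashing:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical reduction of the commit's write path: writers enqueue and
// return immediately; one daemon thread "commits" entries in the background.
public class PendingWriteSketch {
    private record Change(String key, String value) {}

    private final LinkedBlockingQueue<Change> queue = new LinkedBlockingQueue<>();
    private final Map<String, String> pending = new ConcurrentHashMap<>();
    private final Map<String, String> committed = new ConcurrentHashMap<>();

    public PendingWriteSketch() {
        Thread worker = new Thread(() -> {
            while (true) {
                try {
                    Change c = queue.take();           // blocks until a write arrives
                    committed.put(c.key(), c.value()); // "apply" the change
                    // Drop the pending entry only if no newer value replaced it,
                    // mirroring the compute() call in initPendingChangesProcessor().
                    pending.compute(c.key(), (k, v) ->
                            v == null || v.equals(c.value()) ? null : v);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        });
        worker.setDaemon(true);
        worker.start();
    }

    public void put(String key, String value) {
        queue.add(new Change(key, value)); // cheap for the caller
        pending.put(key, value);
    }

    public String get(String key) {
        String v = pending.get(key);       // newest write wins
        return v != null ? v : committed.get(key);
    }
}

The design trade-off is cheap writes on the caller's thread in exchange for readers that need a consistent root having to wait, which is why getRootHash() now awaits pendingChangesProcessedEvent whenever keyDataPendingCache is non-empty.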
src/main/java/io/pwrlabs/utils/TriggerEvent.java

Lines changed: 44 additions & 0 deletions
@@ -0,0 +1,44 @@
+package io.pwrlabs.utils;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * A one‑shot event barrier that automatically resets after each trigger.
+ *
+ * awaitEvent() will block until triggerEvent() is called.
+ * triggerEvent() releases all current waiters and then resets for the next round.
+ */
+public class TriggerEvent {
+    // Always points to the latch for the *current* cycle
+    private final AtomicReference<CountDownLatch> latchRef =
+            new AtomicReference<>(new CountDownLatch(1));
+
+    /**
+     * Blocks until triggerEvent() is called.
+     */
+    public void awaitEvent() throws InterruptedException {
+        latchRef.get().await();
+    }
+
+    /**
+     * Blocks until triggerEvent() is called, or the timeout elapses.
+     * @return true if released by triggerEvent(), false if timed out
+     */
+    public boolean awaitEvent(long timeout, TimeUnit unit) throws InterruptedException {
+        return latchRef.get().await(timeout, unit);
+    }
+
+    /**
+     * Releases all waiting threads, then immediately resets
+     * so that subsequent calls to awaitEvent() will block again.
+     */
+    public void triggerEvent() {
+        // swap in a brand‑new latch for the next cycle
+        CountDownLatch old = latchRef.getAndSet(new CountDownLatch(1));
+        // countDown the *old* latch, releasing everyone who was waiting on it
+        old.countDown();
+    }
+}
+
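For context, here is a minimal usage sketch of the new TriggerEvent class; TriggerEventDemo is a hypothetical example, not part of the commit. One thread parks in awaitEvent() until another calls triggerEvent(), and because triggerEvent() swaps in a fresh latch before counting down the old one, the very next awaitEvent() call blocks again:

import io.pwrlabs.utils.TriggerEvent;
import java.util.concurrent.TimeUnit;

public class TriggerEventDemo {
    public static void main(String[] args) throws InterruptedException {
        TriggerEvent event = new TriggerEvent();

        Thread waiter = new Thread(() -> {
            try {
                event.awaitEvent();            // parks until the trigger fires
                System.out.println("released");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        waiter.start();

        Thread.sleep(100);                     // give the waiter time to park
        event.triggerEvent();                  // releases the waiter, then resets
        waiter.join();

        // The barrier has already reset, so a timed wait now times out:
        boolean fired = event.awaitEvent(50, TimeUnit.MILLISECONDS);
        System.out.println("released by trigger? " + fired); // prints false
    }
}

getRootHash() relies on exactly this auto-reset behavior: the pending-changes processor fires the event after each applied change, and later readers park on the fresh latch for the next cycle.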
