-
Notifications
You must be signed in to change notification settings - Fork 371
Feature: Add solidification stage to pipeline and only broadcast solid transactions #1646
base: dev
Are you sure you want to change the base?
Changes from 9 commits
53ea166
be7c132
6593388
33f193f
ff58495
1a0ea43
ad3a203
2283fb0
24e31a5
03590dd
06242eb
1ec3794
bc5897f
62a2330
9fc4cba
ccca691
f0c1955
fa72acc
c3b1417
1538b36
a34c5e8
14e784d
89ec550
8801ef2
26b9521
16170cf
7e34c91
778c833
0b61c89
1a2d950
864c2db
4bfb138
d623d1d
48258b1
1cde4bf
6543981
7b4d3c5
5e0a974
f86fe55
de08907
4086fce
b16c752
f5de8fe
30f226e
303648b
7e36e1c
eb53b2f
0409d3e
1b5294a
ab6e628
839d9cb
031180b
f61e6ff
6e0a383
4126903
ff4a13c
5bd3066
10baada
4327b10
092cfa5
cc220b0
82359a7
7adfdc5
d3baeee
ffea048
0f29416
0084b48
4664000
cc5e8b8
d488a1d
9ad4886
9ab2a7d
5663d30
b2cb269
31223b3
ea3bbf8
e4feaf6
059e7e4
7528b22
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,46 @@ | ||
| package com.iota.iri.network.pipeline; | ||
|
|
||
| import java.util.concurrent.ArrayBlockingQueue; | ||
| import java.util.concurrent.BlockingQueue; | ||
|
|
||
| /** | ||
| * A queue for transactions intended to be submitted to the {@link BroadcastStage} | ||
| * for processing | ||
| */ | ||
| public class BroadcastQueue { | ||
|
|
||
| /** A blocking queue to store transactions for broadcasting */ | ||
| private BlockingQueue<ProcessingContext> broadcastStageQueue = new ArrayBlockingQueue<>(100); | ||
DyrellC marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| /** An object to be used for synchronizing calls */ | ||
| private final Object broadcastSync = new Object(); | ||
|
|
||
|
|
||
| /** | ||
| * Add transactions to the Broadcast Queue | ||
| * @param context Transaction context to be passed to the {@link BroadcastStage} | ||
| * @return True if added properly, False if not | ||
| */ | ||
| public boolean add(ProcessingContext context) { | ||
| synchronized (broadcastSync) { | ||
| try { | ||
| this.broadcastStageQueue.put(context); | ||
| return true; | ||
| } catch (Exception e) { | ||
| return false; | ||
| } | ||
| } | ||
|
|
||
| } | ||
|
|
||
| /** | ||
| * Getter for the current Broadcast Queue | ||
| * @return BlockingQueue of all transactions left to be broadcasted | ||
| */ | ||
| public BlockingQueue<ProcessingContext> get(){ | ||
| /** Call is synchronized to ensure all pending additions have completed before sending the state */ | ||
| synchronized (broadcastSync) { | ||
| return this.broadcastStageQueue; | ||
| } | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -66,8 +66,8 @@ public class TransactionProcessingPipelineImpl implements TransactionProcessingP | |
| private BlockingQueue<ProcessingContext> preProcessStageQueue = new ArrayBlockingQueue<>(100); | ||
| private BlockingQueue<ProcessingContext> validationStageQueue = new ArrayBlockingQueue<>(100); | ||
| private BlockingQueue<ProcessingContext> receivedStageQueue = new ArrayBlockingQueue<>(100); | ||
| private BlockingQueue<ProcessingContext> broadcastStageQueue = new ArrayBlockingQueue<>(100); | ||
| private BlockingQueue<ProcessingContext> replyStageQueue = new ArrayBlockingQueue<>(100); | ||
| private BroadcastQueue broadcastStageQueue; | ||
|
|
||
| /** | ||
| * Creates a {@link TransactionProcessingPipeline}. | ||
|
|
@@ -84,7 +84,7 @@ public class TransactionProcessingPipelineImpl implements TransactionProcessingP | |
| public TransactionProcessingPipelineImpl(NeighborRouter neighborRouter, NodeConfig config, | ||
| TransactionValidator txValidator, Tangle tangle, SnapshotProvider snapshotProvider, | ||
| TipsViewModel tipsViewModel, LatestMilestoneTracker latestMilestoneTracker, | ||
| TransactionRequester transactionRequester) { | ||
| TransactionRequester transactionRequester, BroadcastQueue broadcastStageQueue) { | ||
| FIFOCache<Long, Hash> recentlySeenBytesCache = new FIFOCache<>(config.getCacheSizeBytes()); | ||
| this.preProcessStage = new PreProcessStage(recentlySeenBytesCache); | ||
| this.replyStage = new ReplyStage(neighborRouter, config, tangle, tipsViewModel, latestMilestoneTracker, | ||
|
|
@@ -94,6 +94,7 @@ public TransactionProcessingPipelineImpl(NeighborRouter neighborRouter, NodeConf | |
| this.receivedStage = new ReceivedStage(tangle, txValidator, snapshotProvider, transactionRequester); | ||
| this.batchedHasher = BatchedHasherFactory.create(BatchedHasherFactory.Type.BCTCURL81, 20); | ||
| this.hashingStage = new HashingStage(batchedHasher); | ||
| this.broadcastStageQueue = broadcastStageQueue; | ||
| } | ||
|
|
||
| @Override | ||
|
|
@@ -103,7 +104,7 @@ public void start() { | |
| addStage("validation", validationStageQueue, validationStage); | ||
| addStage("reply", replyStageQueue, replyStage); | ||
| addStage("received", receivedStageQueue, receivedStage); | ||
| addStage("broadcast", broadcastStageQueue, broadcastStage); | ||
| addStage("broadcast", broadcastStageQueue.get(), broadcastStage); | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -118,7 +119,14 @@ private void addStage(String name, BlockingQueue<ProcessingContext> queue, | |
| stagesThreadPool.submit(new Thread(() -> { | ||
| try { | ||
| while (!Thread.currentThread().isInterrupted()) { | ||
| ProcessingContext ctx = stage.process(queue.take()); | ||
| ProcessingContext queueTake; | ||
| if(name.equals("broadcast")) { | ||
| queueTake = broadcastStageQueue.get().take(); | ||
| } else{ | ||
| queueTake = queue.take(); | ||
| } | ||
| ProcessingContext ctx = stage.process(queueTake); | ||
|
|
||
| switch (ctx.getNextStage()) { | ||
| case REPLY: | ||
| replyStageQueue.put(ctx); | ||
|
|
@@ -135,7 +143,7 @@ private void addStage(String name, BlockingQueue<ProcessingContext> queue, | |
| receivedStageQueue.put(payload.getRight()); | ||
| break; | ||
| case BROADCAST: | ||
| broadcastStageQueue.put(ctx); | ||
| broadcastStageQueue.add(ctx); | ||
|
||
| break; | ||
| case ABORT: | ||
| break; | ||
|
|
@@ -160,7 +168,7 @@ public BlockingQueue<ProcessingContext> getReceivedStageQueue() { | |
|
|
||
| @Override | ||
| public BlockingQueue<ProcessingContext> getBroadcastStageQueue() { | ||
| return broadcastStageQueue; | ||
| return broadcastStageQueue.get(); | ||
| } | ||
|
|
||
| @Override | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.