- 
                Notifications
    You must be signed in to change notification settings 
- Fork 371
Feature: Add solidification stage to pipeline and only broadcast solid transactions #1646
base: dev
Are you sure you want to change the base?
Changes from 22 commits
53ea166
              be7c132
              6593388
              33f193f
              ff58495
              1a0ea43
              ad3a203
              2283fb0
              24e31a5
              03590dd
              06242eb
              1ec3794
              bc5897f
              62a2330
              9fc4cba
              ccca691
              f0c1955
              fa72acc
              c3b1417
              1538b36
              a34c5e8
              14e784d
              89ec550
              8801ef2
              26b9521
              16170cf
              7e34c91
              778c833
              0b61c89
              1a2d950
              864c2db
              4bfb138
              d623d1d
              48258b1
              1cde4bf
              6543981
              7b4d3c5
              5e0a974
              f86fe55
              de08907
              4086fce
              b16c752
              f5de8fe
              30f226e
              303648b
              7e36e1c
              eb53b2f
              0409d3e
              1b5294a
              ab6e628
              839d9cb
              031180b
              f61e6ff
              6e0a383
              4126903
              ff4a13c
              5bd3066
              10baada
              4327b10
              092cfa5
              cc220b0
              82359a7
              7adfdc5
              d3baeee
              ffea048
              0f29416
              0084b48
              4664000
              cc5e8b8
              d488a1d
              9ad4886
              9ab2a7d
              5663d30
              b2cb269
              31223b3
              ea3bbf8
              e4feaf6
              059e7e4
              7528b22
              File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
| @@ -1,8 +1,10 @@ | ||
| package com.iota.iri.network.pipeline; | ||
|  | ||
| import com.iota.iri.TransactionValidator; | ||
| import com.iota.iri.service.validation.TransactionSolidifier; | ||
| import com.iota.iri.service.validation.TransactionValidator; | ||
| import com.iota.iri.conf.NodeConfig; | ||
| import com.iota.iri.controllers.TipsViewModel; | ||
| import com.iota.iri.controllers.TransactionViewModel; | ||
| import com.iota.iri.crypto.batched.BatchedHasher; | ||
| import com.iota.iri.crypto.batched.BatchedHasherFactory; | ||
| import com.iota.iri.crypto.batched.HashRequest; | ||
|  | @@ -19,7 +21,10 @@ | |
| import com.iota.iri.utils.Converter; | ||
|  | ||
| import java.nio.ByteBuffer; | ||
| import java.util.Iterator; | ||
| import java.util.LinkedHashSet; | ||
| import java.util.List; | ||
| import java.util.Set; | ||
| import java.util.concurrent.ArrayBlockingQueue; | ||
| import java.util.concurrent.BlockingQueue; | ||
| import java.util.concurrent.ExecutorService; | ||
|  | @@ -62,12 +67,13 @@ public class TransactionProcessingPipelineImpl implements TransactionProcessingP | |
| private BroadcastStage broadcastStage; | ||
| private BatchedHasher batchedHasher; | ||
| private HashingStage hashingStage; | ||
| private TransactionSolidifier txSolidifier; | ||
|  | ||
| private BlockingQueue<ProcessingContext> preProcessStageQueue = new ArrayBlockingQueue<>(100); | ||
| private BlockingQueue<ProcessingContext> validationStageQueue = new ArrayBlockingQueue<>(100); | ||
| private BlockingQueue<ProcessingContext> receivedStageQueue = new ArrayBlockingQueue<>(100); | ||
| private BlockingQueue<ProcessingContext> broadcastStageQueue = new ArrayBlockingQueue<>(100); | ||
| private BlockingQueue<ProcessingContext> replyStageQueue = new ArrayBlockingQueue<>(100); | ||
| private BlockingQueue<ProcessingContext> broadcastStageQueue = new ArrayBlockingQueue<>(100); | ||
|  | ||
| /** | ||
| * Creates a {@link TransactionProcessingPipeline}. | ||
|  | @@ -82,9 +88,9 @@ public class TransactionProcessingPipelineImpl implements TransactionProcessingP | |
| * reply stage | ||
| */ | ||
| public TransactionProcessingPipelineImpl(NeighborRouter neighborRouter, NodeConfig config, | ||
| TransactionValidator txValidator, Tangle tangle, SnapshotProvider snapshotProvider, | ||
| TipsViewModel tipsViewModel, LatestMilestoneTracker latestMilestoneTracker, | ||
| TransactionRequester transactionRequester) { | ||
| TransactionValidator txValidator, Tangle tangle, SnapshotProvider snapshotProvider, | ||
| TipsViewModel tipsViewModel, LatestMilestoneTracker latestMilestoneTracker, | ||
| TransactionRequester transactionRequester, TransactionSolidifier txSolidifier) { | ||
|         
                  DyrellC marked this conversation as resolved.
              Outdated
          
            Show resolved
            Hide resolved | ||
| FIFOCache<Long, Hash> recentlySeenBytesCache = new FIFOCache<>(config.getCacheSizeBytes()); | ||
| this.preProcessStage = new PreProcessStage(recentlySeenBytesCache); | ||
| this.replyStage = new ReplyStage(neighborRouter, config, tangle, tipsViewModel, latestMilestoneTracker, | ||
|  | @@ -94,6 +100,7 @@ public TransactionProcessingPipelineImpl(NeighborRouter neighborRouter, NodeConf | |
| this.receivedStage = new ReceivedStage(tangle, txValidator, snapshotProvider, transactionRequester); | ||
| this.batchedHasher = BatchedHasherFactory.create(BatchedHasherFactory.Type.BCTCURL81, 20); | ||
| this.hashingStage = new HashingStage(batchedHasher); | ||
| this.txSolidifier = txSolidifier; | ||
| } | ||
|  | ||
| @Override | ||
|  | @@ -119,6 +126,7 @@ private void addStage(String name, BlockingQueue<ProcessingContext> queue, | |
| try { | ||
| while (!Thread.currentThread().isInterrupted()) { | ||
| ProcessingContext ctx = stage.process(queue.take()); | ||
|  | ||
| switch (ctx.getNextStage()) { | ||
| case REPLY: | ||
| replyStageQueue.put(ctx); | ||
|  | @@ -177,6 +185,7 @@ public BlockingQueue<ProcessingContext> getValidationStageQueue() { | |
| public void process(Neighbor neighbor, ByteBuffer data) { | ||
| try { | ||
| preProcessStageQueue.put(new ProcessingContext(new PreProcessPayload(neighbor, data))); | ||
| refillBroadcastQueue(); | ||
| } catch (InterruptedException e) { | ||
| e.printStackTrace(); | ||
| } | ||
|  | @@ -191,6 +200,23 @@ public void process(byte[] txTrits) { | |
| hashAndValidate(new ProcessingContext(payload)); | ||
| } | ||
|  | ||
| @Override | ||
| public void refillBroadcastQueue(){ | ||
| try{ | ||
| Iterator<TransactionViewModel> hashIterator = txSolidifier.getBroadcastQueue().iterator(); | ||
| Set<TransactionViewModel> toRemove = new LinkedHashSet<>(); | ||
| while(!Thread.currentThread().isInterrupted() && hashIterator.hasNext()){ | ||
| TransactionViewModel t = hashIterator.next(); | ||
| broadcastStageQueue.put(new ProcessingContext(new BroadcastPayload(null, t))); | ||
| toRemove.add(t); | ||
|          | ||
| hashIterator.remove(); | ||
| } | ||
| txSolidifier.clearBroadcastQueue(toRemove); | ||
| } catch(InterruptedException e){ | ||
| log.info(e.getMessage()); | ||
| } | ||
| } | ||
|  | ||
| /** | ||
| * Sets up the given hashing stage {@link ProcessingContext} so that up on success, it will submit further to the | ||
| * validation stage. | ||
|  | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Currently not sure why this has changed...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In line 138 you gossip the tx to neighbor
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was to bolster broadcast rate to neighbours to increase the rate that
transactionsToRequestare sent out. It was meant to increase solidification speed.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems a bit hacky...
I've been thinking about it and I think this should be the correct design:
You need to add a solidification stage and a solidification queue in
TransactionProcessingPipeline.The flow should be like this:
From the
ReceivedStage-> if solid (due to quickly solid) -> Broadcast-> else go to solidification stage -> Broadcast
Also remember to pass neighbor information along so we don't broadcast the solid tx to the neighbor that sent this to us..