Skip to content
This repository was archived by the owner on Apr 17, 2019. It is now read-only.

Commit 7663aa5

Browse files
committed
MovedBatchPtr
Signed-off-by: Mikhail Boldyrev <[email protected]>
1 parent 7bd6317 commit 7663aa5

File tree

19 files changed

+189
-108
lines changed

19 files changed

+189
-108
lines changed

irohad/multi_sig_transactions/impl/mst_processor.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@ namespace iroha {
1919
return this->onStateUpdateImpl();
2020
}
2121

22-
rxcpp::observable<DataType> MstProcessor::onPreparedBatches() const {
22+
rxcpp::observable<std::shared_ptr<MovedBatchPtr>>
23+
MstProcessor::onPreparedBatches() const {
2324
return this->onPreparedBatchesImpl();
2425
}
2526

irohad/multi_sig_transactions/impl/mst_processor_impl.cpp

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ namespace iroha {
3535
auto FairMstProcessor::propagateBatchImpl(const iroha::DataType &batch)
3636
-> decltype(propagateBatch(batch)) {
3737
auto state_update = storage_->updateOwnState(batch);
38-
completedBatchesNotify(*state_update.completed_state_);
38+
completedBatchesNotify(state_update.completed_state_);
3939
updatedBatchesNotify(*state_update.updated_state_);
4040
expiredBatchesNotify(
4141
storage_->extractExpiredTransactions(time_provider_->getCurrentTime()));
@@ -57,11 +57,13 @@ namespace iroha {
5757
}
5858

5959
// TODO [IR-1687] Akvinikym 10.09.18: three methods below should be one
60-
void FairMstProcessor::completedBatchesNotify(ConstRefState state) const {
61-
if (not state.isEmpty()) {
62-
state.iterateBatches([this](const auto &batch) {
63-
batches_subject_.get_subscriber().on_next(batch);
64-
});
60+
void FairMstProcessor::completedBatchesNotify(
61+
std::vector<std::shared_ptr<MovedBatchPtr>> completed) const {
62+
if (not completed.empty()) {
63+
std::for_each(
64+
completed.begin(), completed.end(), [this](const auto &batch) {
65+
batches_subject_.get_subscriber().on_next(batch);
66+
});
6567
}
6668
}
6769

@@ -100,7 +102,7 @@ namespace iroha {
100102
state_update.updated_state_->transactionsQuantity());
101103

102104
// completed batches
103-
completedBatchesNotify(*state_update.completed_state_);
105+
completedBatchesNotify(state_update.completed_state_);
104106

105107
// expired batches
106108
expiredBatchesNotify(storage_->getDiffState(from, current_time));

irohad/multi_sig_transactions/impl/propagation_to_pcs.cpp

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,12 +47,13 @@ MstToPcsPropagation::~MstToPcsPropagation() {
4747
propagation_available_subscription_.unsubscribe();
4848
}
4949

50-
void MstToPcsPropagation::notifyCompletedBatch(BatchPtr batch) {
51-
if (not pcs_->propagate_batch(batch)) {
52-
if (not pending_batches_.insert(batch)) {
50+
void MstToPcsPropagation::notifyCompletedBatch(
51+
std::shared_ptr<MovedBatchPtr> moved_batch) {
52+
if (not pcs_->propagate_batch(moved_batch->get())) {
53+
if (not pending_batches_.insert(std::move(moved_batch))) {
5354
log_->critical(
5455
"Dropped a completed MST batch because no place left in storage: {}",
55-
batch);
56+
moved_batch->get());
5657
assert(false);
5758
}
5859
}

irohad/multi_sig_transactions/mst_processor.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ namespace iroha {
4747
* Observable emit batches which are prepared for further processing in
4848
* system
4949
*/
50-
rxcpp::observable<DataType> onPreparedBatches() const;
50+
rxcpp::observable<std::shared_ptr<MovedBatchPtr>> onPreparedBatches() const;
5151

5252
/**
5353
* Observable emit expired by time transactions

irohad/multi_sig_transactions/mst_processor_impl.hpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,8 @@ namespace iroha {
7575
* signatures and ready to move forward
7676
* @param state with those batches
7777
*/
78-
void completedBatchesNotify(ConstRefState state) const;
78+
void completedBatchesNotify(
79+
std::vector<std::shared_ptr<MovedBatchPtr>> completed) const;
7980

8081
/**
8182
* Notify subscribers when some of the batches received new signatures, but
@@ -102,7 +103,7 @@ namespace iroha {
102103
rxcpp::subjects::subject<std::shared_ptr<MstState>> state_subject_;
103104

104105
/// use for share completed batches
105-
rxcpp::subjects::subject<DataType> batches_subject_;
106+
rxcpp::subjects::subject<std::shared_ptr<MovedBatchPtr>> batches_subject_;
106107

107108
/// use for share expired batches
108109
rxcpp::subjects::subject<DataType> expired_subject_;

irohad/multi_sig_transactions/mst_types.hpp

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -38,20 +38,6 @@ namespace iroha {
3838
using ConstRefState = ConstRefT<MstState>;
3939

4040
using DataType = BatchPtr;
41-
42-
/**
43-
* Contains result of updating local state:
44-
* - state with completed batches
45-
* - state with updated (still not enough signatures) batches
46-
*/
47-
struct StateUpdateResult {
48-
StateUpdateResult(std::shared_ptr<MstState> completed_state,
49-
std::shared_ptr<MstState> updated_state)
50-
: completed_state_{std::move(completed_state)},
51-
updated_state_{std::move(updated_state)} {}
52-
std::shared_ptr<MstState> completed_state_;
53-
std::shared_ptr<MstState> updated_state_;
54-
};
5541
} // namespace iroha
5642

5743
#endif // IROHA_MST_TYPES_HPP

irohad/multi_sig_transactions/propagation_to_pcs.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ namespace iroha {
2626

2727
virtual ~MstToPcsPropagation();
2828

29-
void notifyCompletedBatch(BatchPtr batch);
29+
void notifyCompletedBatch(std::shared_ptr<MovedBatchPtr> batch);
3030

3131
size_t pendingBatchesQuantity() const;
3232

irohad/multi_sig_transactions/state/impl/mst_state.cpp

Lines changed: 10 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -79,21 +79,13 @@ namespace iroha {
7979
}
8080

8181
StateUpdateResult MstState::operator+=(const DataType &rhs) {
82-
auto state_update = StateUpdateResult{
83-
std::make_shared<MstState>(empty(
84-
completer_, std::make_shared<iroha::StorageLimitDummy>(), log_)),
85-
std::make_shared<MstState>(empty(
86-
completer_, std::make_shared<iroha::StorageLimitDummy>(), log_))};
82+
auto state_update = StateUpdateResult{completer_, log_};
8783
insertOne(state_update, rhs);
8884
return state_update;
8985
}
9086

9187
StateUpdateResult MstState::operator+=(const MstState &rhs) {
92-
auto state_update = StateUpdateResult{
93-
std::make_shared<MstState>(empty(
94-
completer_, std::make_shared<iroha::StorageLimitDummy>(), log_)),
95-
std::make_shared<MstState>(empty(
96-
completer_, std::make_shared<iroha::StorageLimitDummy>(), log_))};
88+
auto state_update = StateUpdateResult{completer_, log_};
9789
return rhs.batches_.access([this, &state_update](const auto &storage) {
9890
for (auto &&rhs_tx : storage.batches.right | boost::adaptors::map_keys) {
9991
this->insertOne(state_update, rhs_tx);
@@ -209,8 +201,9 @@ namespace iroha {
209201
void MstState::insertOne(StateUpdateResult &state_update,
210202
const DataType &rhs_batch) {
211203
log_->info("batch: {}", *rhs_batch);
212-
batches_.extract([this, &state_update, &rhs_batch](
213-
auto &storage) -> std::vector<BatchPtr> {
204+
auto completed_batches = batches_.move([this, &state_update, &rhs_batch](
205+
auto &storage)
206+
-> std::vector<BatchPtr> {
214207
auto corresponding = storage.batches.right.find(rhs_batch);
215208
if (corresponding == storage.batches.right.end()) {
216209
// when state does not contain transaction
@@ -234,17 +227,19 @@ namespace iroha {
234227
// state already has completed transaction,
235228
// remove from state and return it
236229
storage.batches.right.erase(found);
237-
state_update.completed_state_->rawInsert(found);
238230
return {found};
239231
}
240232

241-
// if batch still isn't completed, return it, if new signatures were
242-
// inserted
233+
// if batch still isn't completed, return it, if new signatures
234+
// were inserted
243235
if (inserted_new_signatures) {
244236
state_update.updated_state_->rawInsert(found);
245237
}
246238
return {};
247239
});
240+
for (auto &batch : completed_batches) {
241+
state_update.completed_state_.emplace_back(std::move(batch));
242+
}
248243
}
249244

250245
bool MstState::rawInsert(const DataType &rhs_batch) {

irohad/multi_sig_transactions/state/mst_state.hpp

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,8 @@ namespace iroha {
8787

8888
using CompleterType = std::shared_ptr<const Completer>;
8989

90+
struct StateUpdateResult;
91+
9092
class MstState {
9193
public:
9294
// -----------------------------| public api |------------------------------
@@ -245,6 +247,22 @@ namespace iroha {
245247
logger::LoggerPtr log_;
246248
};
247249

250+
/**
251+
* Contains result of updating local state:
252+
* - state with completed batches
253+
* - state with updated (still not enough signatures) batches
254+
*/
255+
struct StateUpdateResult {
256+
StateUpdateResult(std::shared_ptr<const Completer> completer,
257+
logger::LoggerPtr log)
258+
: updated_state_(std::make_shared<MstState>(
259+
MstState::empty(std::move(completer),
260+
std::make_shared<iroha::StorageLimitDummy>(),
261+
std::move(log)))) {}
262+
std::vector<std::shared_ptr<MovedBatchPtr>> completed_state_;
263+
std::shared_ptr<MstState> updated_state_;
264+
};
265+
248266
} // namespace iroha
249267

250268
#endif // IROHA_MST_STATE_HPP

irohad/multi_sig_transactions/storage/storage_limit.hpp

Lines changed: 80 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010
#include <memory>
1111
#include <type_traits>
1212

13+
#include <boost/core/noncopyable.hpp>
14+
#include <boost/optional.hpp>
1315
#include "interfaces/iroha_internal/transaction_batch.hpp"
1416
#include "multi_sig_transactions/mst_types.hpp"
1517

@@ -81,6 +83,39 @@ namespace iroha {
8183
std::atomic<size_t> txs_quantity_{0};
8284
};
8385

86+
/// RAII batch wrapper for transfers between limited storages
87+
class MovedBatchPtr : public boost::noncopyable {
88+
public:
89+
~MovedBatchPtr() {
90+
if (not is_extracted_.test_and_set()) {
91+
limit_->remove(batch_);
92+
}
93+
}
94+
95+
BatchPtr get() const {
96+
return batch_;
97+
}
98+
99+
BatchPtr extract() {
100+
if (not is_extracted_.test_and_set()) {
101+
limit_->remove(batch_);
102+
}
103+
return batch_;
104+
}
105+
106+
protected:
107+
template <typename T>
108+
friend class LimitedStorage;
109+
110+
MovedBatchPtr(BatchPtr batch, std::shared_ptr<StorageLimit> limit)
111+
: batch_(std::move(batch)), limit_(std::move(limit)) {}
112+
113+
private:
114+
std::atomic_flag is_extracted_ = ATOMIC_FLAG_INIT;
115+
BatchPtr batch_;
116+
std::shared_ptr<StorageLimit> limit_;
117+
};
118+
84119
template <class StorageImpl>
85120
class LimitedStorage {
86121
static_assert(
@@ -108,9 +143,7 @@ namespace iroha {
108143
if (not limit_->addIfAllowed(batch)) {
109144
return false;
110145
}
111-
112-
txs_quantity_ += batch->transactions().size();
113-
++batches_quantity_;
146+
updateCountersOnInsertedBatch(batch);
114147
return storage_->insert(std::move(batch));
115148
}
116149

@@ -120,11 +153,7 @@ namespace iroha {
120153
auto extracted = extractor(static_cast<StorageImpl &>(*storage_));
121154
for (const auto &batch : extracted) {
122155
limit_->remove(batch);
123-
const size_t extracted_txs = batch->transactions().size();
124-
assert(txs_quantity_ >= extracted_txs);
125-
txs_quantity_ -= extracted_txs;
126-
assert(batches_quantity_ > 0);
127-
--batches_quantity_;
156+
updateCountersOnRemovedBatch(batch);
128157
}
129158
return extracted;
130159
}
@@ -135,7 +164,50 @@ namespace iroha {
135164
return func(static_cast<const StorageImpl &>(*storage_));
136165
}
137166

167+
template <typename Lambda>
168+
std::vector<std::shared_ptr<MovedBatchPtr>> move(Lambda extractor) {
169+
auto moved_batches = extractor(static_cast<StorageImpl &>(*storage_));
170+
std::vector<std::shared_ptr<MovedBatchPtr>> wrapped_moved_batches;
171+
//wrapped_moved_batches.reserve(moved_batches.size());
172+
std::transform(moved_batches.begin(),
173+
moved_batches.end(),
174+
std::back_inserter(wrapped_moved_batches),
175+
[this](auto &&moved_batch) {
176+
this->updateCountersOnRemovedBatch(moved_batch);
177+
return std::shared_ptr<MovedBatchPtr>(
178+
new MovedBatchPtr(std::move(moved_batch), limit_));
179+
});
180+
return wrapped_moved_batches;
181+
}
182+
183+
bool insert(std::shared_ptr<MovedBatchPtr> moved) {
184+
if (moved->limit_ == limit_) {
185+
moved->is_extracted_.test_and_set();
186+
return insertUnsafe(moved->batch_);
187+
} else {
188+
return insert(moved->extract());
189+
}
190+
}
191+
138192
private:
193+
bool insertUnsafe(BatchPtr batch) {
194+
updateCountersOnInsertedBatch(batch);
195+
return storage_->insert(std::move(batch));
196+
}
197+
198+
void updateCountersOnInsertedBatch(const BatchPtr &batch) {
199+
txs_quantity_ += batch->transactions().size();
200+
++batches_quantity_;
201+
}
202+
203+
void updateCountersOnRemovedBatch(const BatchPtr &batch) {
204+
const size_t extracted_txs = batch->transactions().size();
205+
assert(txs_quantity_ >= extracted_txs);
206+
txs_quantity_ -= extracted_txs;
207+
assert(batches_quantity_ > 0);
208+
--batches_quantity_;
209+
}
210+
139211
const std::shared_ptr<StorageLimit> limit_;
140212
const std::shared_ptr<LimitableStorage> storage_;
141213
size_t txs_quantity_{0};

0 commit comments

Comments
 (0)