Skip to content

Commit b20ad4a

Browse files
committed
WIP
Signed-off-by: dorjesinpo <[email protected]>
1 parent 3ec7eb4 commit b20ad4a

19 files changed

+693
-175
lines changed

src/groups/bmq/bmqimp/bmqimp_queuemanager.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -385,7 +385,7 @@ int QueueManager::onPushEvent(QueueManager::EventInfos* eventInfos,
385385

386386
if (mps.streamIn(appData, input.isExtended()) == 0) {
387387
// Learn new schema.
388-
schema = mps.makeSchema(d_allocator_p);
388+
schema = mps.getSchema(d_allocator_p);
389389
schema_p = &schema;
390390
}
391391
}

src/groups/bmq/bmqp/bmqp_eventutil.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -393,7 +393,7 @@ Flattener::packMesage(const Protocol::SubQueueInfosArray& subQInfo)
393393

394394
if (mps.streamIn(d_appData, input.isExtended()) == 0) {
395395
// Learn new schema.
396-
schema = mps.makeSchema(d_allocator_p);
396+
schema = mps.getSchema(d_allocator_p);
397397
schema_p = &schema;
398398
}
399399
}

src/groups/bmq/bmqp/bmqp_messageproperties.h

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -471,7 +471,12 @@ class MessageProperties {
471471
// accessing the returned reference after this object changes its
472472
// state.
473473

474-
SchemaPtr makeSchema(bslma::Allocator* allocator);
474+
SchemaPtr getSchema(bslma::Allocator* allocator);
475+
476+
/// Look up an ordinal for the specified `name`, load it into the specified
477+
/// `index`, and return `true` if this object has a valid schema. Return
478+
/// `false` otherwise.
479+
bool loadIndex(int* index, const bsl::string& name) const;
475480

476481
/// Return a blob having the BlazingMQ wire protocol representation of
477482
/// this instance. The specified `info` controls MessagePropertyHeader
@@ -878,7 +883,7 @@ MessageProperties::setPropertyAsBinary(const bsl::string& name,
878883
// ACCESSORS
879884

880885
inline MessageProperties::SchemaPtr
881-
MessageProperties::makeSchema(bslma::Allocator* allocator)
886+
MessageProperties::getSchema(bslma::Allocator* allocator)
882887
{
883888
if (!d_schema) {
884889
d_schema.load(new (*allocator)

src/groups/bmq/bmqp/bmqp_schemalearner.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -396,7 +396,7 @@ int SchemaLearner::read(Context& context,
396396
if (rc == 0) {
397397
if (!schemaHandle->d_schema_sp) {
398398
// Learn new schema.
399-
schemaHandle->d_schema_sp = mps->makeSchema(d_allocator_p);
399+
schemaHandle->d_schema_sp = mps->getSchema(d_allocator_p);
400400
}
401401
}
402402
else {

src/groups/bmq/bmqp/bmqp_schemalearner.t.cpp

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ static void test2_readingTest()
112112
theLearner.multiplex(clientSession, input),
113113
blob));
114114

115-
bmqp::MessageProperties::SchemaPtr schema1 = out.makeSchema(
115+
bmqp::MessageProperties::SchemaPtr schema1 = out.getSchema(
116116
bmqtst::TestHelperUtil::allocator());
117117

118118
BMQTST_ASSERT_EQ(
@@ -123,7 +123,7 @@ static void test2_readingTest()
123123
blob));
124124

125125
BMQTST_ASSERT_EQ(schema1,
126-
out.makeSchema(bmqtst::TestHelperUtil::allocator()));
126+
out.getSchema(bmqtst::TestHelperUtil::allocator()));
127127
// subsequent call returns the same Schema
128128

129129
int start = bsl::rand() % num;
@@ -146,7 +146,7 @@ static void test2_readingTest()
146146
blob));
147147

148148
bmqp::MessageProperties::SchemaPtr schema2;
149-
schema2 = out.makeSchema(bmqtst::TestHelperUtil::allocator());
149+
schema2 = out.getSchema(bmqtst::TestHelperUtil::allocator());
150150
BMQTST_ASSERT_NE(schema1, schema2);
151151
// ...unless the input is recycled
152152

@@ -196,17 +196,17 @@ static void test3_observingTest()
196196
BMQTST_ASSERT_EQ(0, theLearner.read(server, &out1, input, blob));
197197
BMQTST_ASSERT_EQ(0, theLearner.read(server, &out2, input, blob));
198198

199-
bmqp::MessageProperties::SchemaPtr schema1 = out1.makeSchema(
199+
bmqp::MessageProperties::SchemaPtr schema1 = out1.getSchema(
200200
bmqtst::TestHelperUtil::allocator());
201201

202202
BMQTST_ASSERT_EQ(schema1,
203-
out2.makeSchema(bmqtst::TestHelperUtil::allocator()));
203+
out2.getSchema(bmqtst::TestHelperUtil::allocator()));
204204
// subsequent call returns the same Schema
205205

206206
BMQTST_ASSERT_EQ(0, theLearner.read(server, &out2, input, blob));
207207

208208
BMQTST_ASSERT_EQ(schema1,
209-
out2.makeSchema(bmqtst::TestHelperUtil::allocator()));
209+
out2.getSchema(bmqtst::TestHelperUtil::allocator()));
210210
// subsequent call returns the same Schema
211211

212212
bmqp::MessagePropertiesInfo recycledInput(true, 1, true);
@@ -216,7 +216,7 @@ static void test3_observingTest()
216216
BMQTST_ASSERT_EQ(0, theLearner.read(server, &out2, input, blob));
217217

218218
BMQTST_ASSERT_NE(schema1,
219-
out2.makeSchema(bmqtst::TestHelperUtil::allocator()));
219+
out2.getSchema(bmqtst::TestHelperUtil::allocator()));
220220
// ...unless the input is recycled
221221
}
222222

src/groups/mqb/mqbblp/mqbblp_localqueue.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -657,7 +657,8 @@ void LocalQueue::loadInternals(mqbcmd::LocalQueue* out) const
657657
BSLS_ASSERT_SAFE(d_state_p->queue()->dispatcher()->inDispatcherThread(
658658
d_state_p->queue()));
659659

660-
d_queueEngine_mp->loadInternals(&out->queueEngine());
660+
d_queueEngine_mp->loadInternals(&out->queueEngine(),
661+
bsl::numeric_limits<unsigned int>::max());
661662
}
662663

663664
mqbi::Domain* LocalQueue::domain() const

src/groups/mqb/mqbblp/mqbblp_queueengineutil.cpp

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -860,8 +860,10 @@ QueueEngineUtil_AppState::QueueEngineUtil_AppState(
860860
const bsl::string& appId,
861861
const mqbu::StorageKey& appKey,
862862
bslma::Allocator* allocator)
863-
: d_routing_sp(new(*allocator) Routers::AppContext(queueContext, allocator),
864-
allocator)
863+
: d_routing_sp(
864+
new(*allocator)
865+
Routers::AppContext(queueContext, upstreamSubQueueId, allocator),
866+
allocator)
865867
, d_redeliveryList(allocator)
866868
, d_putAsideList(allocator)
867869
, d_priorityCount(0)
@@ -1374,6 +1376,14 @@ Routers::Result QueueEngineUtil_AppState::selectConsumer(
13741376
<< currentMessage->guid();
13751377
}
13761378
}
1379+
else if (result == Routers::e_SUCCESS &&
1380+
d_routing_sp->d_queue.d_preader->numRuns() % 10000 == 0) {
1381+
BALL_LOG_INFO << "[THROTTLED] Queue '" << d_queue_p->description()
1382+
<< "', appId = '" << appId() << "' cache hits "
1383+
<< d_routing_sp->root().hits() << " iterations "
1384+
<< d_routing_sp->root().iterations() << " runs "
1385+
<< d_routing_sp->d_queue.d_preader->numRuns();
1386+
}
13771387

13781388
return result;
13791389
}
@@ -1397,9 +1407,9 @@ int QueueEngineUtil_AppState::setSubscription(
13971407
return 0;
13981408
}
13991409

1400-
bool QueueEngineUtil_AppState::evaluateAppSubcription()
1410+
bool QueueEngineUtil_AppState::evaluateAppSubcription(unsigned int run)
14011411
{
1402-
return d_appSubscription.evaluate();
1412+
return d_appSubscription.evaluate(run);
14031413
}
14041414

14051415
void QueueEngineUtil_AppState::authorize(const mqbu::StorageKey& appKey,

src/groups/mqb/mqbblp/mqbblp_queueengineutil.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -499,7 +499,7 @@ struct QueueEngineUtil_AppState {
499499
int setSubscription(const mqbconfm::Expression& value);
500500

501501
/// Evaluate the application subscription
502-
bool evaluateAppSubcription();
502+
bool evaluateAppSubcription(unsigned int run);
503503

504504
/// Change the state to authorized, thus enabling delivery
505505
void authorize(const mqbu::StorageKey& appKey, unsigned int appOrdinal);

src/groups/mqb/mqbblp/mqbblp_relayqueueengine.cpp

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -380,7 +380,7 @@ void RelayQueueEngine::onHandleConfiguredDispatched(
380380
BALL_LOGTHROTTLE_INFO_BLOCK(k_MAX_INSTANT_MESSAGES, k_NS_PER_MESSAGE)
381381
{
382382
mqbcmd::QueueEngine outqe(d_allocator_p);
383-
loadInternals(&outqe);
383+
loadInternals(&outqe, 10);
384384

385385
BALL_LOG_OUTPUT_STREAM << "[THROTTLED] For queue ["
386386
<< d_queueState_p->uri()
@@ -1244,7 +1244,8 @@ void RelayQueueEngine::configureHandle(
12441244
App_State* app = findApp(upstreamSubQueueId);
12451245
BSLS_ASSERT_SAFE(app);
12461246

1247-
context->initializeRouting(d_queueState_p->routingContext());
1247+
context->initializeRouting(d_queueState_p->routingContext(),
1248+
app->upstreamSubQueueId());
12481249

12491250
configureApp(*app, handle, streamParameters, context);
12501251
}
@@ -1675,7 +1676,8 @@ mqbi::StorageResult::Enum RelayQueueEngine::evaluateAppSubscriptions(
16751676
return mqbi::StorageResult::e_INVALID_OPERATION;
16761677
}
16771678

1678-
void RelayQueueEngine::loadInternals(mqbcmd::QueueEngine* out) const
1679+
void RelayQueueEngine::loadInternals(mqbcmd::QueueEngine* out,
1680+
unsigned int max) const
16791681
{
16801682
// executed by the *QUEUE DISPATCHER* thread
16811683

@@ -1724,8 +1726,8 @@ void RelayQueueEngine::loadInternals(mqbcmd::QueueEngine* out) const
17241726
it->second->loadInternals(&appState);
17251727
}
17261728

1727-
d_queueState_p->routingContext().loadInternals(
1728-
&relayQueueEngine.routing());
1729+
d_queueState_p->routingContext().loadInternals(&relayQueueEngine.routing(),
1730+
max);
17291731
}
17301732

17311733
void RelayQueueEngine::registerStorage(const bsl::string& appId,

src/groups/mqb/mqbblp/mqbblp_relayqueueengine.h

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,8 @@ class RelayQueueEngine BSLS_KEYWORD_FINAL : public mqbi::QueueEngine {
203203

204204
void reset();
205205

206-
void initializeRouting(Routers::QueueRoutingContext& queueContext);
206+
void initializeRouting(Routers::QueueRoutingContext& queueContext,
207+
unsigned int upstreamSubQueueId);
207208
};
208209

209210
private:
@@ -554,7 +555,8 @@ class RelayQueueEngine BSLS_KEYWORD_FINAL : public mqbi::QueueEngine {
554555

555556
/// Load into the specified `out` object the internal information about
556557
/// this queue engine and associated queue handles.
557-
void loadInternals(mqbcmd::QueueEngine* out) const BSLS_KEYWORD_OVERRIDE;
558+
void loadInternals(mqbcmd::QueueEngine* out,
559+
unsigned int max) const BSLS_KEYWORD_OVERRIDE;
558560

559561
/// Load upstream subQueue id into the specified `subQueueId` given the
560562
/// specified upstream `subscriptionId`.
@@ -659,10 +661,13 @@ inline void RelayQueueEngine::ConfigureContext::resetCallback()
659661
}
660662

661663
inline void RelayQueueEngine::ConfigureContext::initializeRouting(
662-
Routers::QueueRoutingContext& queueContext)
664+
Routers::QueueRoutingContext& queueContext,
665+
unsigned int upstreamSubQueueId)
663666
{
664667
d_routing_sp.reset(new (*d_allocator_p)
665-
Routers::AppContext(queueContext, d_allocator_p),
668+
Routers::AppContext(queueContext,
669+
upstreamSubQueueId,
670+
d_allocator_p),
666671
d_allocator_p);
667672
}
668673

0 commit comments

Comments
 (0)