Skip to content

Commit afd7c70

Browse files
committed
Inline Put, Push, Ack, Confirm
Signed-off-by: dorjesinpo <[email protected]>
1 parent c9a1fae commit afd7c70

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

57 files changed

+1871
-1651
lines changed

src/groups/mqb/mqba/mqba_clientsession.cpp

Lines changed: 27 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -351,16 +351,16 @@ struct BuildAckOverflowFunctor {
351351
// -------------------------
352352

353353
ClientSessionState::ClientSessionState(
354-
bslma::ManagedPtr<bmqst::StatContext>& clientStatContext,
355-
BlobSpPool* blobSpPool,
356-
bdlbb::BlobBufferFactory* bufferFactory,
357-
bmqp::EncodingType::Enum encodingType,
358-
bslma::Allocator* allocator)
354+
const bsl::shared_ptr<bmqst::StatContext>& clientStatContext,
355+
BlobSpPool* blobSpPool,
356+
bdlbb::BlobBufferFactory* bufferFactory,
357+
bmqp::EncodingType::Enum encodingType,
358+
bslma::Allocator* allocator)
359359
: d_allocator_p(allocator)
360360
, d_channelBufferQueue(allocator)
361361
, d_unackedMessageInfos(d_allocator_p)
362362
, d_dispatcherClientData()
363-
, d_statContext_mp(clientStatContext)
363+
, d_statContext_sp(clientStatContext)
364364
, d_bufferFactory_p(bufferFactory)
365365
, d_blobSpPool_p(blobSpPool)
366366
, d_schemaEventBuilder(blobSpPool, encodingType, allocator)
@@ -668,7 +668,7 @@ void ClientSession::sendAck(bmqt::AckResult::Enum status,
668668
queueStats = invalidQueueStats();
669669
}
670670
else {
671-
queueStats = subQueueCiter->value().d_stats.get();
671+
queueStats = subQueueCiter->value().d_stats_sp.get();
672672
}
673673
}
674674

@@ -1667,23 +1667,10 @@ bool ClientSession::validateMessage(mqbi::QueueHandle** queueHandle,
16671667
return false; // RETURN
16681668
}
16691669

1670-
StreamsMap::iterator subQueueIt =
1671-
queueIt->second.d_subQueueInfosMap.findBySubIdSafe(queueId.subId());
1672-
if (BSLS_PERFORMANCEHINT_PREDICT_UNLIKELY(
1673-
subQueueIt == queueIt->second.d_subQueueInfosMap.end())) {
1674-
BSLS_PERFORMANCEHINT_UNLIKELY_HINT;
1675-
1676-
if (eventType == bmqp::EventType::e_CONFIRM) {
1677-
// Update invalid queue stats
1678-
invalidQueueStats()->onEvent(
1679-
mqbstat::QueueStatsClient::EventType::e_CONFIRM,
1680-
1);
1681-
}
1682-
1683-
*errorStream << "for an unknown subQueueId";
1684-
1685-
return false; // RETURN
1686-
}
1670+
// Do not lookup 'queueId.subId()'.
1671+
// 'QueueHandle::confirmMessageDispatched' does the check.
1672+
// Note, that it does not update stats (on "bmq://invalid/queue").
1673+
// It does log warnings.
16871674

16881675
*queueHandle = queueIt->second.d_handle_p;
16891676
BSLS_ASSERT_SAFE(queueHandle);
@@ -1729,13 +1716,6 @@ bool ClientSession::validateMessage(mqbi::QueueHandle** queueHandle,
17291716
return false; // RETURN
17301717
}
17311718

1732-
if (eventType == bmqp::EventType::e_CONFIRM) {
1733-
// Update stats for the queue (or subStream of the queue)
1734-
subQueueIt->value().d_stats->onEvent(
1735-
mqbstat::QueueStatsClient::EventType::e_CONFIRM,
1736-
1);
1737-
}
1738-
17391719
return true;
17401720
}
17411721

@@ -1877,7 +1857,7 @@ void ClientSession::onPushEvent(const mqbi::DispatcherPushEvent& event)
18771857
blob->length());
18781858
}
18791859
else {
1880-
subQueueCiter->value().d_stats->onEvent(
1860+
subQueueCiter->value().onEvent(
18811861
mqbstat::QueueStatsClient::EventType::e_PUSH,
18821862
blob->length());
18831863
}
@@ -2028,9 +2008,8 @@ void ClientSession::onPutEvent(const mqbi::DispatcherPutEvent& event)
20282008
BSLS_ASSERT_SAFE(queueStatePtr && subQueueInfoPtr);
20292009
BSLS_ASSERT_SAFE(queueStatePtr->d_handle_p);
20302010

2031-
subQueueInfoPtr->d_stats->onEvent(
2032-
mqbstat::QueueStatsClient::EventType::e_PUT,
2033-
appDataSp->length());
2011+
subQueueInfoPtr->onEvent(mqbstat::QueueStatsClient::EventType::e_PUT,
2012+
appDataSp->length());
20342013

20352014
const bool isAtMostOnce =
20362015
queueStatePtr->d_handle_p->queue()->isAtMostOnce();
@@ -2197,7 +2176,7 @@ mqbstat::QueueStatsClient* ClientSession::invalidQueueStats()
21972176
d_state.d_invalidQueueStats.makeValue();
21982177
d_state.d_invalidQueueStats.value().initialize(
21992178
"bmq://invalid/queue",
2200-
d_state.d_statContext_mp.get(),
2179+
d_state.d_statContext_sp.get(),
22012180
d_state.d_allocator_p);
22022181
// TBD: The queue uri should be '** INVALID QUEUE **', but that can
22032182
// only be done once the stats UI panel has been updated to
@@ -2400,17 +2379,17 @@ bool ClientSession::validatePutMessage(QueueState** queueState,
24002379

24012380
// CREATORS
24022381
ClientSession::ClientSession(
2403-
const bsl::shared_ptr<bmqio::Channel>& channel,
2404-
const bmqp_ctrlmsg::NegotiationMessage& negotiationMessage,
2405-
const bsl::string& sessionDescription,
2406-
mqbi::Dispatcher* dispatcher,
2407-
mqbblp::ClusterCatalog* clusterCatalog,
2408-
mqbi::DomainFactory* domainFactory,
2409-
bslma::ManagedPtr<bmqst::StatContext>& clientStatContext,
2410-
ClientSessionState::BlobSpPool* blobSpPool,
2411-
bdlbb::BlobBufferFactory* bufferFactory,
2412-
bdlmt::EventScheduler* scheduler,
2413-
bslma::Allocator* allocator)
2382+
const bsl::shared_ptr<bmqio::Channel>& channel,
2383+
const bmqp_ctrlmsg::NegotiationMessage& negotiationMessage,
2384+
const bsl::string& sessionDescription,
2385+
mqbi::Dispatcher* dispatcher,
2386+
mqbblp::ClusterCatalog* clusterCatalog,
2387+
mqbi::DomainFactory* domainFactory,
2388+
const bsl::shared_ptr<bmqst::StatContext>& clientStatContext,
2389+
ClientSessionState::BlobSpPool* blobSpPool,
2390+
bdlbb::BlobBufferFactory* bufferFactory,
2391+
bdlmt::EventScheduler* scheduler,
2392+
bslma::Allocator* allocator)
24142393
: d_self(this) // use default allocator
24152394
, d_operationState(e_RUNNING)
24162395
, d_isDisconnecting(false)
@@ -2431,7 +2410,7 @@ ClientSession::ClientSession(
24312410
allocator)
24322411
, d_queueSessionManager(this,
24332412
*d_clientIdentity_p,
2434-
d_state.d_statContext_mp.get(),
2413+
d_state.d_statContext_sp,
24352414
domainFactory,
24362415
allocator)
24372416
, d_clusterCatalog_p(clusterCatalog)

src/groups/mqb/mqba/mqba_clientsession.h

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -149,8 +149,6 @@ struct ClientSessionState {
149149
typedef bsl::pair<UnackedMessageInfoMap::iterator, bool>
150150
UnackedMessageInfoMapInsertRc;
151151

152-
typedef bslma::ManagedPtr<bmqst::StatContext> StatContextMp;
153-
154152
public:
155153
// PUBLIC DATA
156154

@@ -173,7 +171,7 @@ struct ClientSessionState {
173171

174172
/// Stat context dedicated to this domain, to use as the parent stat
175173
/// context for any queue in this domain.
176-
StatContextMp d_statContext_mp;
174+
const bsl::shared_ptr<bmqst::StatContext> d_statContext_sp;
177175

178176
/// Blob buffer factory to use.
179177
///
@@ -225,11 +223,11 @@ struct ClientSessionState {
225223
/// builder will use. Memory allocations are performed using the
226224
/// specified `allocator`.
227225
ClientSessionState(
228-
bslma::ManagedPtr<bmqst::StatContext>& clientStatContext,
229-
BlobSpPool* blobSpPool,
230-
bdlbb::BlobBufferFactory* bufferFactory,
231-
bmqp::EncodingType::Enum encodingType,
232-
bslma::Allocator* allocator);
226+
const bsl::shared_ptr<bmqst::StatContext>& clientStatContext,
227+
BlobSpPool* blobSpPool,
228+
bdlbb::BlobBufferFactory* bufferFactory,
229+
bmqp::EncodingType::Enum encodingType,
230+
bslma::Allocator* allocator);
233231
};
234232

235233
// ===================
@@ -577,11 +575,11 @@ class ClientSession : public mqbnet::Session,
577575
mqbi::Dispatcher* dispatcher,
578576
mqbblp::ClusterCatalog* clusterCatalog,
579577
mqbi::DomainFactory* domainFactory,
580-
bslma::ManagedPtr<bmqst::StatContext>& clientStatContext,
581-
ClientSessionState::BlobSpPool* blobSpPool,
582-
bdlbb::BlobBufferFactory* bufferFactory,
583-
bdlmt::EventScheduler* scheduler,
584-
bslma::Allocator* allocator);
578+
const bsl::shared_ptr<bmqst::StatContext>& clientStatContext,
579+
ClientSessionState::BlobSpPool* blobSpPool,
580+
bdlbb::BlobBufferFactory* bufferFactory,
581+
bdlmt::EventScheduler* scheduler,
582+
bslma::Allocator* allocator);
585583

586584
/// Destructor
587585
~ClientSession() BSLS_KEYWORD_OVERRIDE;

src/groups/mqb/mqba/mqba_clientsession.t.cpp

Lines changed: 24 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -585,7 +585,7 @@ class MyMockDomain : public mqbmock::Domain {
585585
/// calls the specified `callback` with a new queue handle created
586586
/// using the specified `handleParameters`. The specified `uri` and
587587
/// `clientContext` are ignored.
588-
void openQueue(BSLA_UNUSED const bmqt::Uri& uri,
588+
void openQueue(const bmqt::Uri& uri,
589589
const bsl::shared_ptr<mqbi::QueueHandleRequesterContext>&
590590
clientContext,
591591
const bmqp_ctrlmsg::QueueHandleParameters& handleParameters,
@@ -605,8 +605,15 @@ class MyMockDomain : public mqbmock::Domain {
605605
handleParameters,
606606
d_allocator_p);
607607

608-
OpenQueueConfirmationCookie confirmationCookie;
609-
confirmationCookie.createInplace(d_allocator_p, d_queueHandle.get());
608+
mqbi::OpenQueueConfirmationCookieSp confirmationCookie;
609+
confirmationCookie.createInplace(d_allocator_p);
610+
confirmationCookie->d_handle = d_queueHandle.get();
611+
612+
confirmationCookie->d_stats_sp.createInplace(d_allocator_p);
613+
confirmationCookie->d_stats_sp->initialize(
614+
uri,
615+
clientContext->statContext().get(),
616+
d_allocator_p);
610617

611618
bmqp_ctrlmsg::Status status(d_allocator_p);
612619
status.category() = bmqp_ctrlmsg::StatusCategory::E_SUCCESS;
@@ -648,18 +655,18 @@ class TestBench {
648655

649656
public:
650657
// DATA
651-
bdlbb::PooledBlobBufferFactory d_bufferFactory;
652-
BlobSpPool d_blobSpPool;
658+
bdlbb::PooledBlobBufferFactory d_bufferFactory;
659+
BlobSpPool d_blobSpPool;
653660
bsl::shared_ptr<bmqio::TestChannel> d_channel;
654-
mqbmock::Cluster d_cluster;
655-
mqbmock::Dispatcher d_mockDispatcher;
656-
MyMockDomain d_domain;
657-
mqbmock::DomainFactory d_mockDomainFactory;
658-
bslma::ManagedPtr<bmqst::StatContext> d_clientStatContext_mp;
659-
bdlmt::EventScheduler d_scheduler;
660-
TestClock d_testClock;
661-
mqba::ClientSession d_cs;
662-
bslma::Allocator* d_allocator_p;
661+
mqbmock::Cluster d_cluster;
662+
mqbmock::Dispatcher d_mockDispatcher;
663+
MyMockDomain d_domain;
664+
mqbmock::DomainFactory d_mockDomainFactory;
665+
const bsl::shared_ptr<bmqst::StatContext> d_clientStatContext_sp;
666+
bdlmt::EventScheduler d_scheduler;
667+
TestClock d_testClock;
668+
mqba::ClientSession d_cs;
669+
bslma::Allocator* d_allocator_p;
663670

664671
static const int k_PAYLOAD_LENGTH = 36;
665672

@@ -682,9 +689,8 @@ class TestBench {
682689
, d_mockDispatcher(allocator)
683690
, d_domain(&d_mockDispatcher, &d_cluster, atMostOnce, allocator)
684691
, d_mockDomainFactory(d_domain, allocator)
685-
, d_clientStatContext_mp(
686-
mqbstat::QueueStatsUtil::initializeStatContextClients(10, allocator)
687-
.managedPtr())
692+
, d_clientStatContext_sp(
693+
mqbstat::QueueStatsUtil::initializeStatContextClients(10, allocator))
688694
, d_scheduler(bsls::SystemClockType::e_MONOTONIC, allocator)
689695
, d_testClock(d_scheduler)
690696
, d_cs(d_channel,
@@ -693,7 +699,7 @@ class TestBench {
693699
setInDispatcherThread(&d_mockDispatcher),
694700
0, // ClusterCatalog
695701
&d_mockDomainFactory,
696-
d_clientStatContext_mp,
702+
d_clientStatContext_sp,
697703
&d_blobSpPool,
698704
&d_bufferFactory,
699705
&d_scheduler,

0 commit comments

Comments
 (0)