Skip to content

Commit 1ee45ff

Browse files
committed
Inline Put, Push, Ack, Confirm
Signed-off-by: dorjesinpo <[email protected]>
1 parent d9c94a3 commit 1ee45ff

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

57 files changed

+1871
-1649
lines changed

src/groups/mqb/mqba/mqba_clientsession.cpp

Lines changed: 27 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -356,16 +356,16 @@ struct BuildAckOverflowFunctor {
356356
// -------------------------
357357

358358
ClientSessionState::ClientSessionState(
359-
bslma::ManagedPtr<bmqst::StatContext>& clientStatContext,
360-
BlobSpPool* blobSpPool,
361-
bdlbb::BlobBufferFactory* bufferFactory,
362-
bmqp::EncodingType::Enum encodingType,
363-
bslma::Allocator* allocator)
359+
const bsl::shared_ptr<bmqst::StatContext>& clientStatContext,
360+
BlobSpPool* blobSpPool,
361+
bdlbb::BlobBufferFactory* bufferFactory,
362+
bmqp::EncodingType::Enum encodingType,
363+
bslma::Allocator* allocator)
364364
: d_allocator_p(allocator)
365365
, d_channelBufferQueue(allocator)
366366
, d_unackedMessageInfos(d_allocator_p)
367367
, d_dispatcherClientData()
368-
, d_statContext_mp(clientStatContext)
368+
, d_statContext_sp(clientStatContext)
369369
, d_bufferFactory_p(bufferFactory)
370370
, d_blobSpPool_p(blobSpPool)
371371
, d_schemaEventBuilder(blobSpPool, encodingType, allocator)
@@ -673,7 +673,7 @@ void ClientSession::sendAck(bmqt::AckResult::Enum status,
673673
queueStats = invalidQueueStats();
674674
}
675675
else {
676-
queueStats = subQueueCiter->value().d_stats.get();
676+
queueStats = subQueueCiter->value().d_stats_sp.get();
677677
}
678678
}
679679

@@ -1916,23 +1916,10 @@ bool ClientSession::validateMessage(mqbi::QueueHandle** queueHandle,
19161916
return false; // RETURN
19171917
}
19181918

1919-
StreamsMap::iterator subQueueIt =
1920-
queueIt->second.d_subQueueInfosMap.findBySubIdSafe(queueId.subId());
1921-
if (BSLS_PERFORMANCEHINT_PREDICT_UNLIKELY(
1922-
subQueueIt == queueIt->second.d_subQueueInfosMap.end())) {
1923-
BSLS_PERFORMANCEHINT_UNLIKELY_HINT;
1924-
1925-
if (eventType == bmqp::EventType::e_CONFIRM) {
1926-
// Update invalid queue stats
1927-
invalidQueueStats()->onEvent(
1928-
mqbstat::QueueStatsClient::EventType::e_CONFIRM,
1929-
1);
1930-
}
1931-
1932-
*errorStream << "for an unknown subQueueId";
1933-
1934-
return false; // RETURN
1935-
}
1919+
// Do not lookup 'queueId.subId()'.
1920+
// 'QueueHandle::confirmMessageDispatched' does the check.
1921+
// Note, that it does not update stats (on "bmq://invalid/queue").
1922+
// It does log warnings.
19361923

19371924
*queueHandle = queueIt->second.d_handle_p;
19381925
BSLS_ASSERT_SAFE(queueHandle);
@@ -1978,13 +1965,6 @@ bool ClientSession::validateMessage(mqbi::QueueHandle** queueHandle,
19781965
return false; // RETURN
19791966
}
19801967

1981-
if (eventType == bmqp::EventType::e_CONFIRM) {
1982-
// Update stats for the queue (or subStream of the queue)
1983-
subQueueIt->value().d_stats->onEvent(
1984-
mqbstat::QueueStatsClient::EventType::e_CONFIRM,
1985-
1);
1986-
}
1987-
19881968
return true;
19891969
}
19901970

@@ -2126,7 +2106,7 @@ void ClientSession::onPushEvent(const mqbi::DispatcherPushEvent& event)
21262106
blob->length());
21272107
}
21282108
else {
2129-
subQueueCiter->value().d_stats->onEvent(
2109+
subQueueCiter->value().onEvent(
21302110
mqbstat::QueueStatsClient::EventType::e_PUSH,
21312111
blob->length());
21322112
}
@@ -2277,9 +2257,8 @@ void ClientSession::onPutEvent(const mqbi::DispatcherPutEvent& event)
22772257
BSLS_ASSERT_SAFE(queueStatePtr && subQueueInfoPtr);
22782258
BSLS_ASSERT_SAFE(queueStatePtr->d_handle_p);
22792259

2280-
subQueueInfoPtr->d_stats->onEvent(
2281-
mqbstat::QueueStatsClient::EventType::e_PUT,
2282-
appDataSp->length());
2260+
subQueueInfoPtr->onEvent(mqbstat::QueueStatsClient::EventType::e_PUT,
2261+
appDataSp->length());
22832262

22842263
const bool isAtMostOnce =
22852264
queueStatePtr->d_handle_p->queue()->isAtMostOnce();
@@ -2446,7 +2425,7 @@ mqbstat::QueueStatsClient* ClientSession::invalidQueueStats()
24462425
d_state.d_invalidQueueStats.makeValue();
24472426
d_state.d_invalidQueueStats.value().initialize(
24482427
"bmq://invalid/queue",
2449-
d_state.d_statContext_mp.get(),
2428+
d_state.d_statContext_sp.get(),
24502429
d_state.d_allocator_p);
24512430
// TBD: The queue uri should be '** INVALID QUEUE **', but that can
24522431
// only be done once the stats UI panel has been updated to
@@ -2649,17 +2628,17 @@ bool ClientSession::validatePutMessage(QueueState** queueState,
26492628

26502629
// CREATORS
26512630
ClientSession::ClientSession(
2652-
const bsl::shared_ptr<bmqio::Channel>& channel,
2653-
const bmqp_ctrlmsg::NegotiationMessage& negotiationMessage,
2654-
const bsl::string& sessionDescription,
2655-
mqbi::Dispatcher* dispatcher,
2656-
mqbblp::ClusterCatalog* clusterCatalog,
2657-
mqbi::DomainFactory* domainFactory,
2658-
bslma::ManagedPtr<bmqst::StatContext>& clientStatContext,
2659-
ClientSessionState::BlobSpPool* blobSpPool,
2660-
bdlbb::BlobBufferFactory* bufferFactory,
2661-
bdlmt::EventScheduler* scheduler,
2662-
bslma::Allocator* allocator)
2631+
const bsl::shared_ptr<bmqio::Channel>& channel,
2632+
const bmqp_ctrlmsg::NegotiationMessage& negotiationMessage,
2633+
const bsl::string& sessionDescription,
2634+
mqbi::Dispatcher* dispatcher,
2635+
mqbblp::ClusterCatalog* clusterCatalog,
2636+
mqbi::DomainFactory* domainFactory,
2637+
const bsl::shared_ptr<bmqst::StatContext>& clientStatContext,
2638+
ClientSessionState::BlobSpPool* blobSpPool,
2639+
bdlbb::BlobBufferFactory* bufferFactory,
2640+
bdlmt::EventScheduler* scheduler,
2641+
bslma::Allocator* allocator)
26632642
: d_self(this) // use default allocator
26642643
, d_operationState(e_RUNNING)
26652644
, d_isDisconnecting(false)
@@ -2680,7 +2659,7 @@ ClientSession::ClientSession(
26802659
allocator)
26812660
, d_queueSessionManager(this,
26822661
*d_clientIdentity_p,
2683-
d_state.d_statContext_mp.get(),
2662+
d_state.d_statContext_sp,
26842663
domainFactory,
26852664
allocator)
26862665
, d_clusterCatalog_p(clusterCatalog)

src/groups/mqb/mqba/mqba_clientsession.h

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -149,8 +149,6 @@ struct ClientSessionState {
149149
typedef bsl::pair<UnackedMessageInfoMap::iterator, bool>
150150
UnackedMessageInfoMapInsertRc;
151151

152-
typedef bslma::ManagedPtr<bmqst::StatContext> StatContextMp;
153-
154152
public:
155153
// PUBLIC DATA
156154

@@ -173,7 +171,7 @@ struct ClientSessionState {
173171

174172
/// Stat context dedicated to this domain, to use as the parent stat
175173
/// context for any queue in this domain.
176-
StatContextMp d_statContext_mp;
174+
const bsl::shared_ptr<bmqst::StatContext> d_statContext_sp;
177175

178176
/// Blob buffer factory to use.
179177
///
@@ -225,11 +223,11 @@ struct ClientSessionState {
225223
/// builder will use. Memory allocations are performed using the
226224
/// specified `allocator`.
227225
ClientSessionState(
228-
bslma::ManagedPtr<bmqst::StatContext>& clientStatContext,
229-
BlobSpPool* blobSpPool,
230-
bdlbb::BlobBufferFactory* bufferFactory,
231-
bmqp::EncodingType::Enum encodingType,
232-
bslma::Allocator* allocator);
226+
const bsl::shared_ptr<bmqst::StatContext>& clientStatContext,
227+
BlobSpPool* blobSpPool,
228+
bdlbb::BlobBufferFactory* bufferFactory,
229+
bmqp::EncodingType::Enum encodingType,
230+
bslma::Allocator* allocator);
233231
};
234232

235233
// ===================
@@ -632,11 +630,11 @@ class ClientSession : public mqbnet::Session,
632630
mqbi::Dispatcher* dispatcher,
633631
mqbblp::ClusterCatalog* clusterCatalog,
634632
mqbi::DomainFactory* domainFactory,
635-
bslma::ManagedPtr<bmqst::StatContext>& clientStatContext,
636-
ClientSessionState::BlobSpPool* blobSpPool,
637-
bdlbb::BlobBufferFactory* bufferFactory,
638-
bdlmt::EventScheduler* scheduler,
639-
bslma::Allocator* allocator);
633+
const bsl::shared_ptr<bmqst::StatContext>& clientStatContext,
634+
ClientSessionState::BlobSpPool* blobSpPool,
635+
bdlbb::BlobBufferFactory* bufferFactory,
636+
bdlmt::EventScheduler* scheduler,
637+
bslma::Allocator* allocator);
640638

641639
/// Destructor
642640
~ClientSession() BSLS_KEYWORD_OVERRIDE;

src/groups/mqb/mqba/mqba_clientsession.t.cpp

Lines changed: 24 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -585,7 +585,7 @@ class MyMockDomain : public mqbmock::Domain {
585585
/// calls the specified `callback` with a new queue handle created
586586
/// using the specified `handleParameters`. The specified `uri` and
587587
/// `clientContext` are ignored.
588-
void openQueue(BSLA_UNUSED const bmqt::Uri& uri,
588+
void openQueue(const bmqt::Uri& uri,
589589
const bsl::shared_ptr<mqbi::QueueHandleRequesterContext>&
590590
clientContext,
591591
const bmqp_ctrlmsg::QueueHandleParameters& handleParameters,
@@ -605,8 +605,15 @@ class MyMockDomain : public mqbmock::Domain {
605605
handleParameters,
606606
d_allocator_p);
607607

608-
OpenQueueConfirmationCookie confirmationCookie;
609-
confirmationCookie.createInplace(d_allocator_p, d_queueHandle.get());
608+
mqbi::OpenQueueConfirmationCookieSp confirmationCookie;
609+
confirmationCookie.createInplace(d_allocator_p);
610+
confirmationCookie->d_handle = d_queueHandle.get();
611+
612+
confirmationCookie->d_stats_sp.createInplace(d_allocator_p);
613+
confirmationCookie->d_stats_sp->initialize(
614+
uri,
615+
clientContext->statContext().get(),
616+
d_allocator_p);
610617

611618
bmqp_ctrlmsg::Status status(d_allocator_p);
612619
status.category() = bmqp_ctrlmsg::StatusCategory::E_SUCCESS;
@@ -648,18 +655,18 @@ class TestBench {
648655

649656
public:
650657
// DATA
651-
bdlbb::PooledBlobBufferFactory d_bufferFactory;
652-
BlobSpPool d_blobSpPool;
658+
bdlbb::PooledBlobBufferFactory d_bufferFactory;
659+
BlobSpPool d_blobSpPool;
653660
bsl::shared_ptr<bmqio::TestChannel> d_channel;
654-
mqbmock::Cluster d_cluster;
655-
mqbmock::Dispatcher d_mockDispatcher;
656-
MyMockDomain d_domain;
657-
mqbmock::DomainFactory d_mockDomainFactory;
658-
bslma::ManagedPtr<bmqst::StatContext> d_clientStatContext_mp;
659-
bdlmt::EventScheduler d_scheduler;
660-
TestClock d_testClock;
661-
mqba::ClientSession d_cs;
662-
bslma::Allocator* d_allocator_p;
661+
mqbmock::Cluster d_cluster;
662+
mqbmock::Dispatcher d_mockDispatcher;
663+
MyMockDomain d_domain;
664+
mqbmock::DomainFactory d_mockDomainFactory;
665+
const bsl::shared_ptr<bmqst::StatContext> d_clientStatContext_sp;
666+
bdlmt::EventScheduler d_scheduler;
667+
TestClock d_testClock;
668+
mqba::ClientSession d_cs;
669+
bslma::Allocator* d_allocator_p;
663670

664671
static const int k_PAYLOAD_LENGTH = 36;
665672

@@ -682,9 +689,8 @@ class TestBench {
682689
, d_mockDispatcher(allocator)
683690
, d_domain(&d_mockDispatcher, &d_cluster, atMostOnce, allocator)
684691
, d_mockDomainFactory(d_domain, allocator)
685-
, d_clientStatContext_mp(
686-
mqbstat::QueueStatsUtil::initializeStatContextClients(10, allocator)
687-
.managedPtr())
692+
, d_clientStatContext_sp(
693+
mqbstat::QueueStatsUtil::initializeStatContextClients(10, allocator))
688694
, d_scheduler(bsls::SystemClockType::e_MONOTONIC, allocator)
689695
, d_testClock(d_scheduler)
690696
, d_cs(d_channel,
@@ -693,7 +699,7 @@ class TestBench {
693699
setInDispatcherThread(&d_mockDispatcher),
694700
0, // ClusterCatalog
695701
&d_mockDomainFactory,
696-
d_clientStatContext_mp,
702+
d_clientStatContext_sp,
697703
&d_blobSpPool,
698704
&d_bufferFactory,
699705
&d_scheduler,

0 commit comments

Comments
 (0)