Skip to content

Commit b4224de

Browse files
committed
add reauthn
Signed-off-by: Emelia Lei <[email protected]>
1 parent d0373f9 commit b4224de

File tree

3 files changed

+78
-6
lines changed

3 files changed

+78
-6
lines changed

src/groups/bmq/bmqimp/bmqimp_application.cpp

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -720,7 +720,21 @@ int Application::start(const bsls::TimeInterval& timeout)
720720
<< "::: START (SYNC) << [state: " << d_brokerSession.state()
721721
<< "] :::";
722722

723-
return d_brokerSession.start(timeout);
723+
int rc = d_initialConnectionChannelFactory.start();
724+
if (rc != 0) {
725+
BALL_LOG_ERROR << id()
726+
<< "Failed to start initialConnectionChannelFactory "
727+
<< "[rc: " << rc << "]";
728+
return rc; // RETURN
729+
}
730+
731+
rc = d_brokerSession.start(timeout);
732+
if (rc != 0) {
733+
d_initialConnectionChannelFactory.stop();
734+
return rc; // RETURN
735+
}
736+
737+
return rc;
724738
}
725739

726740
int Application::startAsync(const bsls::TimeInterval& timeout)
@@ -755,6 +769,9 @@ void Application::stop()
755769

756770
// Stop the brokerSession
757771
d_brokerSession.stop();
772+
773+
// Stop the channel factories
774+
d_initialConnectionChannelFactory.stop();
758775
}
759776

760777
void Application::stopAsync()

src/groups/bmq/bmqimp/bmqimp_initialconnectionchannelfactory.cpp

Lines changed: 39 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
#include <bmqp_event.h>
2222
#include <bmqp_protocol.h>
2323
#include <bmqp_schemaeventbuilder.h>
24+
#include <bmqsys_time.h>
2425

2526
#include <bmqio_channelutil.h>
2627
#include <bmqu_blob.h>
@@ -32,12 +33,14 @@
3233
#include <bdlf_bind.h>
3334
#include <bdlf_placeholder.h>
3435
#include <bdlma_localsequentialallocator.h>
36+
#include <bsl_algorithm.h>
3537
#include <bsl_iostream.h>
3638
#include <bsl_memory.h>
3739
#include <bsla_annotations.h>
3840
#include <bsla_unused.h>
3941
#include <bslma_default.h>
4042
#include <bsls_assert.h>
43+
#include <bsls_timeinterval.h>
4144

4245
namespace BloombergLP {
4346
namespace bmqimp {
@@ -277,7 +280,7 @@ void InitialConnectionChannelFactory::readPacketsCb(
277280
const ResultCallback& cb,
278281
const bmqio::Status& status,
279282
int* numNeeded,
280-
bdlbb::Blob* blob) const
283+
bdlbb::Blob* blob)
281284
{
282285
BALL_LOG_INFO << "At readPacketsCb";
283286

@@ -441,7 +444,7 @@ int InitialConnectionChannelFactory::decodeInitialConnectionMessage(
441444
void InitialConnectionChannelFactory::onBrokerAuthenticationResponse(
442445
const bmqp_ctrlmsg::AuthenticationMessage& response,
443446
const ResultCallback& cb,
444-
const bsl::shared_ptr<bmqio::Channel>& channel) const
447+
const bsl::shared_ptr<bmqio::Channel>& channel)
445448
{
446449
const bmqp_ctrlmsg::AuthenticateResponse& authenticateResponse =
447450
response.authenticateResponse();
@@ -461,7 +464,22 @@ void InitialConnectionChannelFactory::onBrokerAuthenticationResponse(
461464
// Authentication SUCCEEDED
462465
BALL_LOG_INFO << "Authentication with broker was successful: " << response;
463466

464-
// TODO: reauthenticate
467+
// Schedule recurring reauthentication events if lifetime is specified in
468+
// the response.
469+
if (authenticateResponse.lifetimeMs()) {
470+
int lifetimeMs = authenticateResponse.lifetimeMs().value();
471+
int intervalMs = bsl::min(lifetimeMs - k_REAUTHN_EARLY_BUFFER,
472+
lifetimeMs * k_REAUTHN_EARLY_RATIO);
473+
d_scheduler.scheduleRecurringEvent(
474+
&d_authnEventHandle,
475+
bsls::TimeInterval(intervalMs),
476+
bdlf::BindUtil::bind(
477+
bmqu::WeakMemFnUtil::weakMemFn(
478+
&InitialConnectionChannelFactory::authenticate,
479+
d_self.acquireWeak()),
480+
channel,
481+
cb));
482+
}
465483

466484
negotiate(channel, cb);
467485
}
@@ -549,6 +567,8 @@ InitialConnectionChannelFactory::InitialConnectionChannelFactory(
549567
const Config& config,
550568
bslma::Allocator* basicAllocator)
551569
: d_config(config, basicAllocator)
570+
, d_scheduler(bsls::SystemClockType::e_MONOTONIC, basicAllocator)
571+
, d_authnEventHandle()
552572
, d_self(this) // use default allocator
553573
{
554574
// NOTHING
@@ -561,6 +581,22 @@ InitialConnectionChannelFactory::~InitialConnectionChannelFactory()
561581
d_self.invalidate();
562582
}
563583

584+
int InitialConnectionChannelFactory::start()
585+
{
586+
int rc = d_scheduler.start();
587+
if (rc != 0) {
588+
BALL_LOG_ERROR << "Failed to start event scheduler [rc: " << rc << "]";
589+
}
590+
return rc;
591+
}
592+
593+
void InitialConnectionChannelFactory::stop()
594+
{
595+
// Cancel any scheduled reauthentication events
596+
d_scheduler.cancelEvent(&d_authnEventHandle);
597+
d_scheduler.stop();
598+
}
599+
564600
// MANIPULATORS
565601
void InitialConnectionChannelFactory::listen(
566602
BSLA_UNUSED bmqio::Status* status,

src/groups/bmq/bmqimp/bmqimp_initialconnectionchannelfactory.h

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141

4242
// BDE
4343
#include <bdlbb_blob.h>
44+
#include <bdlmt_eventscheduler.h>
4445
#include <bsl_functional.h>
4546
#include <bslma_allocator.h>
4647
#include <bslma_usesbslmaallocator.h>
@@ -104,6 +105,8 @@ class InitialConnectionChannelFactory : public bmqio::ChannelFactory {
104105
// TYPES
105106
typedef InitialConnectionChannelFactoryConfig Config;
106107

108+
typedef bdlmt::EventScheduler::RecurringEventHandle EventHandle;
109+
107110
// CONSTANTS
108111

109112
/// Name of a property set on the channel representing the broker's
@@ -118,13 +121,25 @@ class InitialConnectionChannelFactory : public bmqio::ChannelFactory {
118121

119122
static const char* k_CHANNEL_PROPERTY_MAX_MISSED_HEARTBEATS;
120123

124+
// Minimum buffer to subtract from lifetimeMs to avoid cutting too close
125+
const int k_REAUTHN_EARLY_BUFFER = 1000; // 1 second
126+
127+
// Proportion of lifetimeMs after which to initiate reauthentication.
128+
const int k_REAUTHN_EARLY_RATIO = 0.9;
129+
121130
private:
122131
// TYPES
123132
enum ACTION { AUTHENTICATION = 0, NEGOTIATION = 1 };
124133

125134
// PRIVATE DATA
126135
Config d_config;
127136

137+
// Used to schedule events for sending reauthentication requests.
138+
bdlmt::EventScheduler d_scheduler;
139+
140+
// Event handle for reauthentication events.
141+
EventHandle d_authnEventHandle;
142+
128143
// Used to make sure no callback is invoked on a destroyed object.
129144
mutable bmqu::SharedResource<InitialConnectionChannelFactory> d_self;
130145

@@ -167,7 +182,7 @@ class InitialConnectionChannelFactory : public bmqio::ChannelFactory {
167182
const ResultCallback& cb,
168183
const bmqio::Status& status,
169184
int* numNeeded,
170-
bdlbb::Blob* blob) const;
185+
bdlbb::Blob* blob);
171186

172187
int decodeInitialConnectionMessage(
173188
const bdlbb::Blob& packet,
@@ -179,7 +194,7 @@ class InitialConnectionChannelFactory : public bmqio::ChannelFactory {
179194
void onBrokerAuthenticationResponse(
180195
const bmqp_ctrlmsg::AuthenticationMessage& response,
181196
const ResultCallback& cb,
182-
const bsl::shared_ptr<bmqio::Channel>& channel) const;
197+
const bsl::shared_ptr<bmqio::Channel>& channel);
183198

184199
void onBrokerNegotiationResponse(
185200
const bmqp_ctrlmsg::NegotiationMessage& response,
@@ -196,6 +211,10 @@ class InitialConnectionChannelFactory : public bmqio::ChannelFactory {
196211

197212
public:
198213
// MANIPULATORS
214+
int start();
215+
216+
void stop();
217+
199218
void listen(bmqio::Status* status,
200219
bslma::ManagedPtr<OpHandle>* handle,
201220
const bmqio::ListenOptions& options,

0 commit comments

Comments
 (0)