Skip to content

Commit

Permalink
configureStream conditionally (#301)
Browse files Browse the repository at this point in the history
Signed-off-by: dorjesinpo <[email protected]>
  • Loading branch information
dorjesinpo authored May 29, 2024
1 parent 0636f8f commit 6b10ab5
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 16 deletions.
30 changes: 17 additions & 13 deletions src/groups/bmq/bmqimp/bmqimp_brokersession.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -184,27 +184,25 @@ BSLA_MAYBE_UNUSED
bool isConfigure(const bmqp_ctrlmsg::ControlMessage& request,
const bmqp_ctrlmsg::ControlMessage& response)
{
return bmqscm::Version::versionAsInt() == bmqp::Protocol::k_DEV_VERSION
? request.choice().isConfigureStreamValue() &&
response.choice().isConfigureStreamResponseValue()
: request.choice().isConfigureQueueStreamValue() &&
response.choice().isConfigureQueueStreamResponseValue();
return request.choice().isConfigureStreamValue()
? response.choice().isConfigureStreamResponseValue()
: request.choice().isConfigureQueueStreamValue()
? response.choice().isConfigureQueueStreamResponseValue()
: false;
}

BSLA_MAYBE_UNUSED
bool isConfigure(const bmqp_ctrlmsg::ControlMessage& request)
{
return bmqscm::Version::versionAsInt() == bmqp::Protocol::k_DEV_VERSION
? request.choice().isConfigureStreamValue()
: request.choice().isConfigureQueueStreamValue();
return request.choice().isConfigureStreamValue() ||
request.choice().isConfigureQueueStreamValue();
}

BSLA_MAYBE_UNUSED
bool isConfigureResponse(const bmqp_ctrlmsg::ControlMessage& request)
bool isConfigureResponse(const bmqp_ctrlmsg::ControlMessage& response)
{
return bmqscm::Version::versionAsInt() == bmqp::Protocol::k_DEV_VERSION
? request.choice().isConfigureStreamResponseValue()
: request.choice().isConfigureQueueStreamResponseValue();
return response.choice().isConfigureStreamResponseValue() ||
response.choice().isConfigureQueueStreamResponseValue();
}

void makeDeconfigure(bmqp_ctrlmsg::ControlMessage* request)
Expand Down Expand Up @@ -407,6 +405,11 @@ void BrokerSession::SessionFsm::setStarted(

// Post on the semaphore (to wake-up a sync 'start', if any)
d_session.d_startSemaphore.post();

// Temporary safety switch to control configure request.
d_session.d_channel_sp->properties().load(
&d_session.d_doConfigureStream,
NegotiatedChannelFactory::k_CHANNEL_PROPERTY_CONFIGURE_STREAM);
}

bmqt::GenericResult::Enum
Expand Down Expand Up @@ -5198,7 +5201,7 @@ BrokerSession::createConfigureQueueContext(const bsl::shared_ptr<Queue>& queue,
}
context->setGroupId(grId);

if (bmqscm::Version::versionAsInt() == bmqp::Protocol::k_DEV_VERSION) {
if (d_doConfigureStream) {
// Make ConfigureStream request
bmqp_ctrlmsg::ConfigureStream& configureStream =
context->request().choice().makeConfigureStream();
Expand Down Expand Up @@ -5640,6 +5643,7 @@ BrokerSession::BrokerSession(
, d_nextRequestGroupId(k_NON_BUFFERED_REQUEST_GROUP_ID)
, d_queueRetransmissionTimeoutMap(allocator)
, d_nextInternalSubscriptionId(bmqp::Protocol::k_DEFAULT_SUBSCRIPTION_ID)
, d_doConfigureStream(0)
{
// PRECONDITIONS
BSLS_ASSERT_SAFE(d_scheduler_p->clockType() ==
Expand Down
3 changes: 3 additions & 0 deletions src/groups/bmq/bmqimp/bmqimp_brokersession.h
Original file line number Diff line number Diff line change
Expand Up @@ -855,6 +855,9 @@ class BrokerSession BSLS_CPP11_FINAL {
unsigned int d_nextInternalSubscriptionId;
// Assists generating unique ids for Configure requests.

int d_doConfigureStream;
// Temporary safety switch to control configure request.

private:
// NOT IMPLEMENTED
BrokerSession(const BrokerSession&);
Expand Down
5 changes: 2 additions & 3 deletions src/groups/bmq/bmqimp/bmqimp_brokersession.t.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -276,9 +276,8 @@ bool waitRealTime(bslmt::TimedSemaphore* sem)

bool isConfigure(const bmqp_ctrlmsg::ControlMessage& request)
{
return bmqscm::Version::versionAsInt() == bmqp::Protocol::k_DEV_VERSION
? request.choice().isConfigureStreamValue()
: request.choice().isConfigureQueueStreamValue();
return request.choice().isConfigureStreamValue() ||
request.choice().isConfigureQueueStreamValue();
}

void makeResponse(bmqp_ctrlmsg::ControlMessage* response,
Expand Down
11 changes: 11 additions & 0 deletions src/groups/bmq/bmqimp/bmqimp_negotiatedchannelfactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@ NegotiatedChannelFactoryConfig::NegotiatedChannelFactoryConfig(
const char* NegotiatedChannelFactory::k_CHANNEL_PROPERTY_MPS_EX =
"broker.response.mps.ex";

/// Temporary safety switch to control configure request.
const char* NegotiatedChannelFactory::k_CHANNEL_PROPERTY_CONFIGURE_STREAM =
"broker.response.confgiure_stream";

// PRIVATE ACCESSORS
void NegotiatedChannelFactory::baseResultCallback(
const ResultCallback& userCb,
Expand Down Expand Up @@ -322,6 +326,13 @@ void NegotiatedChannelFactory::onBrokerNegotiationResponse(
channel->properties().set(k_CHANNEL_PROPERTY_MPS_EX, 1);
}

if (bmqp::ProtocolUtil::hasFeature(
bmqp::SubscriptionsFeatures::k_FIELD_NAME,
bmqp::SubscriptionsFeatures::k_CONFIGURE_STREAM,
response.brokerResponse().brokerIdentity().features())) {
channel->properties().set(k_CHANNEL_PROPERTY_CONFIGURE_STREAM, 1);
}

cb(mwcio::ChannelFactoryEvent::e_CHANNEL_UP, mwcio::Status(), channel);
}

Expand Down
3 changes: 3 additions & 0 deletions src/groups/bmq/bmqimp/bmqimp_negotiatedchannelfactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,9 @@ class NegotiatedChannelFactory : public mwcio::ChannelFactory {
/// Temporary; shall remove after 2nd roll out of "new style" brokers.
static const char* k_CHANNEL_PROPERTY_MPS_EX;

/// Temporary safety switch to control configure request.
static const char* k_CHANNEL_PROPERTY_CONFIGURE_STREAM;

private:
// PRIVATE DATA
Config d_config;
Expand Down

0 comments on commit 6b10ab5

Please sign in to comment.