Skip to content

Commit

Permalink
Fail reconfigure cmd on invalid auto subscription
Browse files Browse the repository at this point in the history
Signed-off-by: dorjesinpo <[email protected]>
  • Loading branch information
dorjesinpo committed May 21, 2024
1 parent 9acb58e commit ce81937
Show file tree
Hide file tree
Showing 4 changed files with 148 additions and 12 deletions.
5 changes: 2 additions & 3 deletions src/groups/mqb/mqba/mqba_application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -599,8 +599,7 @@ int Application::processCommand(const bslstl::StringRef& source,
mqbcfg::BrokerConfig::get(),
options);
if (rc != 0) {
cmdResult.makeError();
cmdResult.error().message() = brokerConfigOs.str();
cmdResult.makeError().message() = brokerConfigOs.str();
}
else {
cmdResult.makeBrokerConfig();
Expand Down Expand Up @@ -632,7 +631,7 @@ int Application::processCommand(const bslstl::StringRef& source,
default: BSLS_ASSERT_SAFE(false && "Unsupported encoding");
}

return 0;
return result.isErrorValue() ? -2 : 0;
}

int Application::processCommandCb(
Expand Down
12 changes: 9 additions & 3 deletions src/groups/mqb/mqba/mqba_domainmanager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -686,19 +686,25 @@ int DomainManager::processCommand(mqbcmd::DomainsResult* result,
}
}

DecodeAndUpsertValue unused;
DecodeAndUpsertValue configureResult;
d_configProvider_p->clearCache(domainSp->name());
d_configProvider_p->getDomainConfig(
domainSp->name(),
bdlf::BindUtil::bind(
&DomainManager::decodeAndUpsert,
this,
&unused,
&configureResult,
bdlf::PlaceHolders::_1, // configProviderStatus
bdlf::PlaceHolders::_2, // configProviderResult
domainSp->name(),
domainSp->cluster()->name()));
result->makeSuccess();

if (configureResult.isError()) {
result->makeError().message() = configureResult.error().d_details;
}
else {
result->makeSuccess();
}
return 0; // RETURN
}

Expand Down
50 changes: 44 additions & 6 deletions src/groups/mqb/mqbblp/mqbblp_domain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,14 +99,16 @@ void afterAppIdUnregisteredDispatched(mqbi::Queue* queue,
/// checks that the implied reconfiguration is also valid.
int validateConfig(bsl::ostream& errorDescription,
const bdlb::NullableValue<mqbconfm::Domain>& previousDefn,
const mqbconfm::Domain& newConfig)
const mqbconfm::Domain& newConfig,
bslma::Allocator* allocator)
{
enum RcEnum {
// Value for the various RC error categories
rc_SUCCESS = 0,
rc_NON_BLOOMBERG_CFG = -1,
rc_CHANGED_DOMAIN_MODE = -2,
rc_CHANGED_STORAGE_TYPE = -3
rc_CHANGED_STORAGE_TYPE = -3,
rc_INVALID_SUBSCRIPTION = -4
};

if (previousDefn.isNull()) {
Expand Down Expand Up @@ -136,7 +138,40 @@ int validateConfig(bsl::ostream& errorDescription,
return rc_CHANGED_STORAGE_TYPE; // RETURN
}

return 0;
// Validate newConfig.subscriptions()

bsl::size_t size = newConfig.subscriptions().size();
bool result = true;

for (bsl::size_t i = 0; i < size; ++i) {
const mqbconfm::Expression& expression =
newConfig.subscriptions()[i].expression();

if (mqbconfm::ExpressionVersion::E_VERSION_1 == expression.version()) {
if (expression.text().length()) {
bmqeval::CompilationContext context(allocator);

if (!bmqeval::SimpleEvaluator::validate(expression.text(),
context)) {
errorDescription
<< "Expression validation failed: [ expression: "
<< expression << ", rc: " << context.lastError()
<< ", reason: \"" << context.lastErrorMessage()
<< "\" ]";
result = false;
}
}
}
else {
if (expression.text().length()) {
errorDescription << "Invalid Expression: [ expression: "
<< expression;
result = false;
}
}
}

return result ? 0 : rc_INVALID_SUBSCRIPTION;
}

/// Given a definition `defn` for `domain`, ensures that the values provided
Expand Down Expand Up @@ -429,7 +464,10 @@ int Domain::configure(bsl::ostream& errorDescription,
}

// Validate config. Return early if the configuration is not valid.
if (int rc = validateConfig(errorDescription, d_config, finalConfig)) {
if (int rc = validateConfig(errorDescription,
d_config,
finalConfig,
d_allocator_p)) {
return (rc * 10 + rc_VALIDATION_FAILED); // RETURN
}

Expand Down Expand Up @@ -514,6 +552,7 @@ int Domain::configure(bsl::ostream& errorDescription,
<< " queues from "
"domain "
<< d_name;

QueueMap::iterator it = d_queues.begin();
for (; it != d_queues.end(); it++) {
bsl::function<int()> reconfigureQueueFn = bdlf::BindUtil::bind(
Expand All @@ -525,8 +564,7 @@ int Domain::configure(bsl::ostream& errorDescription,
d_dispatcher_p->execute(reconfigureQueueFn, cluster());
}
}
BALL_LOG_INFO << "Domain '" << d_name << "' successfully "
<< (isReconfigure ? "reconfigured" : "configured");
// 'wait==false', so the result of reconfiguration is not known

d_state = e_STARTED;
return rc_SUCCESS;
Expand Down
93 changes: 93 additions & 0 deletions src/integration-tests/test_auto_subscriptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -408,3 +408,96 @@ def test_auto_subscription_fanout_all_negative(self, cluster: Cluster):
assert len(self.consumer.list(block=True)) == 0
assert len(self.consumer_bar.list(block=True)) == 0
assert len(self.consumer_baz.list(block=True)) == 0

@tweak.domain.subscriptions(
[{"appId": "", "expression": {"version": "E_VERSION_1", "text": "invalid expression"}}]
[
{
"appId": "",
"expression": {"version": "E_VERSION_1", "text": "invalid expression"},
}
]
)
def test_configure_invalid(self, cluster: Cluster):
"""
Configure the priority queue to evaluate auto subscription negatively.
Make sure the queue does not get the message.
Make sure the same is the case after restarts.
"""

proxies = cluster.proxy_cycle()

# 1: Setup producers and consumers

next(proxies)
proxy = next(proxies)

consumer = proxy.create_client("consumer")

consumer.open(
tc.URI_PRIORITY_SC,
flags=["read"],
consumer_priority=1,
succeed=False,
)

cluster.config.domains[
tc.DOMAIN_PRIORITY_SC
].definition.parameters.subscriptions[0]["expression"]["text"] = "x==1"

cluster.reconfigure_domain(tc.DOMAIN_PRIORITY_SC, succeed=True)

consumer.open(
tc.URI_PRIORITY_SC,
flags=["read"],
consumer_priority=1,
succeed=True,
)

@tweak.domain.subscriptions(
[{"appId": "", "expression": {"version": "E_VERSION_1", "text": "x==1"}}]
)
def test_reconfigure_invalid(self, cluster: Cluster):
"""
Configure the priority queue to evaluate auto subscription negatively.
Make sure the queue does not get the message.
Make sure the same is the case after restarts.
"""

proxies = cluster.proxy_cycle()

# 1: Setup producers and consumers

next(proxies)
proxy = next(proxies)

consumer = proxy.create_client("consumer")

consumer.open(
tc.URI_PRIORITY_SC,
flags=["read"],
consumer_priority=1,
succeed=True,
)

consumer.close(
tc.URI_PRIORITY_SC,
succeed=True,
)

cluster.config.domains[
tc.DOMAIN_PRIORITY_SC
].definition.parameters.subscriptions[0]["expression"][
"text"
] = "invalid expression"

cluster.reconfigure_domain(tc.DOMAIN_PRIORITY_SC, succeed=None)
# TODO: why not succeed=False

# The validation fails and the domain is going to keep the old config
consumer.open(
tc.URI_PRIORITY_SC,
flags=["read"],
consumer_priority=1,
succeed=True,
)

0 comments on commit ce81937

Please sign in to comment.