Skip to content

Commit

Permalink
Fail reconfigure cmd on invalid auto subscription (#298)
Browse files Browse the repository at this point in the history
* Fail reconfigure cmd on invalid auto subscription

Signed-off-by: dorjesinpo <[email protected]>

* Addressing review

Signed-off-by: dorjesinpo <[email protected]>

* addressing review

Signed-off-by: dorjesinpo <[email protected]>

* merging two tests

Signed-off-by: dorjesinpo <[email protected]>

---------

Signed-off-by: dorjesinpo <[email protected]>
  • Loading branch information
dorjesinpo authored May 28, 2024
1 parent 8f73dfc commit c5bfaac
Show file tree
Hide file tree
Showing 4 changed files with 136 additions and 13 deletions.
5 changes: 2 additions & 3 deletions src/groups/mqb/mqba/mqba_application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -599,8 +599,7 @@ int Application::processCommand(const bslstl::StringRef& source,
mqbcfg::BrokerConfig::get(),
options);
if (rc != 0) {
cmdResult.makeError();
cmdResult.error().message() = brokerConfigOs.str();
cmdResult.makeError().message() = brokerConfigOs.str();
}
else {
cmdResult.makeBrokerConfig();
Expand Down Expand Up @@ -632,7 +631,7 @@ int Application::processCommand(const bslstl::StringRef& source,
default: BSLS_ASSERT_SAFE(false && "Unsupported encoding");
}

return 0;
return result.isErrorValue() ? -2 : 0;
}

int Application::processCommandCb(
Expand Down
15 changes: 11 additions & 4 deletions src/groups/mqb/mqba/mqba_domainmanager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -686,20 +686,27 @@ int DomainManager::processCommand(mqbcmd::DomainsResult* result,
}
}

DecodeAndUpsertValue unused;
DecodeAndUpsertValue configureResult;
d_configProvider_p->clearCache(domainSp->name());
d_configProvider_p->getDomainConfig(
domainSp->name(),
bdlf::BindUtil::bind(
&DomainManager::decodeAndUpsert,
this,
&unused,
&configureResult,
bdlf::PlaceHolders::_1, // configProviderStatus
bdlf::PlaceHolders::_2, // configProviderResult
domainSp->name(),
domainSp->cluster()->name()));
result->makeSuccess();
return 0; // RETURN

if (configureResult.isError()) {
result->makeError().message() = configureResult.error().d_details;
return -1; // RETURN
}
else {
result->makeSuccess();
return 0; // RETURN
}
}

mwcu::MemOutStream os;
Expand Down
59 changes: 53 additions & 6 deletions src/groups/mqb/mqbblp/mqbblp_domain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,18 +95,48 @@ void afterAppIdUnregisteredDispatched(mqbi::Queue* queue,
mqbi::Storage::AppIdKeyPair(appId, mqbu::StorageKey()));
}

/// Validates an application subscription.
bool validdateSubscriptionExpression(bsl::ostream& errorDescription,
const mqbconfm::Expression& expression,
bslma::Allocator* allocator)
{
if (mqbconfm::ExpressionVersion::E_VERSION_1 == expression.version()) {
if (!expression.text().empty()) {
bmqeval::CompilationContext context(allocator);

if (!bmqeval::SimpleEvaluator::validate(expression.text(),
context)) {
errorDescription
<< "Expression validation failed: [ expression: "
<< expression << ", rc: " << context.lastError()
<< ", reason: \"" << context.lastErrorMessage() << "\" ]";
return false; // RETURN
}
}
}
else {
errorDescription << "Unsupported version: [ expression: " << expression
<< " ]";
return false; // RETURN
}

return true;
}

/// Validates a domain configuration. If `previousDefn` is provided, also
/// checks that the implied reconfiguration is also valid.
int validateConfig(bsl::ostream& errorDescription,
const bdlb::NullableValue<mqbconfm::Domain>& previousDefn,
const mqbconfm::Domain& newConfig)
const mqbconfm::Domain& newConfig,
bslma::Allocator* allocator)
{
enum RcEnum {
// Value for the various RC error categories
rc_SUCCESS = 0,
rc_NON_BLOOMBERG_CFG = -1,
rc_CHANGED_DOMAIN_MODE = -2,
rc_CHANGED_STORAGE_TYPE = -3
rc_CHANGED_STORAGE_TYPE = -3,
rc_INVALID_SUBSCRIPTION = -4
};

if (previousDefn.isNull()) {
Expand Down Expand Up @@ -136,7 +166,21 @@ int validateConfig(bsl::ostream& errorDescription,
return rc_CHANGED_STORAGE_TYPE; // RETURN
}

return 0;
// Validate newConfig.subscriptions()

bsl::size_t size = newConfig.subscriptions().size();
bool allSubscriptionsAreValid = true;

for (bsl::size_t i = 0; i < size; ++i) {
if (!validdateSubscriptionExpression(
errorDescription,
newConfig.subscriptions()[i].expression(),
allocator)) {
allSubscriptionsAreValid = false;
}
}

return allSubscriptionsAreValid ? 0 : rc_INVALID_SUBSCRIPTION;
}

/// Given a definition `defn` for `domain`, ensures that the values provided
Expand Down Expand Up @@ -429,7 +473,10 @@ int Domain::configure(bsl::ostream& errorDescription,
}

// Validate config. Return early if the configuration is not valid.
if (int rc = validateConfig(errorDescription, d_config, finalConfig)) {
if (const int rc = validateConfig(errorDescription,
d_config,
finalConfig,
d_allocator_p)) {
return (rc * 10 + rc_VALIDATION_FAILED); // RETURN
}

Expand Down Expand Up @@ -514,6 +561,7 @@ int Domain::configure(bsl::ostream& errorDescription,
<< " queues from "
"domain "
<< d_name;

QueueMap::iterator it = d_queues.begin();
for (; it != d_queues.end(); it++) {
bsl::function<int()> reconfigureQueueFn = bdlf::BindUtil::bind(
Expand All @@ -525,8 +573,7 @@ int Domain::configure(bsl::ostream& errorDescription,
d_dispatcher_p->execute(reconfigureQueueFn, cluster());
}
}
BALL_LOG_INFO << "Domain '" << d_name << "' successfully "
<< (isReconfigure ? "reconfigured" : "configured");
// 'wait==false', so the result of reconfiguration is not known

d_state = e_STARTED;
return rc_SUCCESS;
Expand Down
70 changes: 70 additions & 0 deletions src/integration-tests/test_auto_subscriptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -408,3 +408,73 @@ def test_auto_subscription_fanout_all_negative(self, cluster: Cluster):
assert len(self.consumer.list(block=True)) == 0
assert len(self.consumer_bar.list(block=True)) == 0
assert len(self.consumer_baz.list(block=True)) == 0

@tweak.domain.subscriptions(
[
{
"appId": "",
"expression": {"version": "E_VERSION_1", "text": "invalid expression"},
}
]
)
def test_invalid_configuration(self, cluster: Cluster):
"""
Configure priority domain with invalid auto subscription.
Make sure a queue fails to open.
Reconfigure the domain with valid auto subscription.
Make sure a queue opens successfully.
Reconfigure the domain with invalid auto subscription.
Make sure the reconfigure command fails.
Make sure a queue opens successfully.
"""

proxies = cluster.proxy_cycle()

# 1: Setup producers and consumers

next(proxies)
proxy = next(proxies)

consumer = proxy.create_client("consumer")

consumer.open(
tc.URI_PRIORITY_SC,
flags=["read"],
consumer_priority=1,
succeed=False,
)

cluster.config.domains[
tc.DOMAIN_PRIORITY_SC
].definition.parameters.subscriptions[0]["expression"]["text"] = "x==1"

cluster.reconfigure_domain(tc.DOMAIN_PRIORITY_SC, succeed=True)

consumer.open(
tc.URI_PRIORITY_SC,
flags=["read"],
consumer_priority=1,
succeed=True,
)

consumer.close(
tc.URI_PRIORITY_SC,
succeed=True,
)

cluster.config.domains[
tc.DOMAIN_PRIORITY_SC
].definition.parameters.subscriptions[0]["expression"][
"text"
] = "invalid expression"

cluster.reconfigure_domain(tc.DOMAIN_PRIORITY_SC, succeed=None)
assert cluster.last_known_leader.capture("Error processing command")

# The validation fails and the domain is going to keep the old config
consumer.open(
tc.URI_PRIORITY_SC,
flags=["read"],
consumer_priority=1,
succeed=True,
)

0 comments on commit c5bfaac

Please sign in to comment.