From 6571134750026978f8a48d695b4b63e9e7ea27a7 Mon Sep 17 00:00:00 2001 From: dorjesinpo <129227380+dorjesinpo@users.noreply.github.com> Date: Tue, 21 May 2024 18:55:19 -0400 Subject: [PATCH 1/4] Fail reconfigure cmd on invalid auto subscription Signed-off-by: dorjesinpo <129227380+dorjesinpo@users.noreply.github.com> --- src/groups/mqb/mqba/mqba_application.cpp | 5 +- src/groups/mqb/mqba/mqba_domainmanager.cpp | 12 ++- src/groups/mqb/mqbblp/mqbblp_domain.cpp | 50 ++++++++-- .../test_auto_subscriptions.py | 92 +++++++++++++++++++ 4 files changed, 147 insertions(+), 12 deletions(-) diff --git a/src/groups/mqb/mqba/mqba_application.cpp b/src/groups/mqb/mqba/mqba_application.cpp index d014249ad7..120867fa9c 100644 --- a/src/groups/mqb/mqba/mqba_application.cpp +++ b/src/groups/mqb/mqba/mqba_application.cpp @@ -599,8 +599,7 @@ int Application::processCommand(const bslstl::StringRef& source, mqbcfg::BrokerConfig::get(), options); if (rc != 0) { - cmdResult.makeError(); - cmdResult.error().message() = brokerConfigOs.str(); + cmdResult.makeError().message() = brokerConfigOs.str(); } else { cmdResult.makeBrokerConfig(); @@ -632,7 +631,7 @@ int Application::processCommand(const bslstl::StringRef& source, default: BSLS_ASSERT_SAFE(false && "Unsupported encoding"); } - return 0; + return result.isErrorValue() ? -2 : 0; } int Application::processCommandCb( diff --git a/src/groups/mqb/mqba/mqba_domainmanager.cpp b/src/groups/mqb/mqba/mqba_domainmanager.cpp index f59930cbb7..3299bf7d0c 100644 --- a/src/groups/mqb/mqba/mqba_domainmanager.cpp +++ b/src/groups/mqb/mqba/mqba_domainmanager.cpp @@ -686,19 +686,25 @@ int DomainManager::processCommand(mqbcmd::DomainsResult* result, } } - DecodeAndUpsertValue unused; + DecodeAndUpsertValue configureResult; d_configProvider_p->clearCache(domainSp->name()); d_configProvider_p->getDomainConfig( domainSp->name(), bdlf::BindUtil::bind( &DomainManager::decodeAndUpsert, this, - &unused, + &configureResult, bdlf::PlaceHolders::_1, // configProviderStatus bdlf::PlaceHolders::_2, // configProviderResult domainSp->name(), domainSp->cluster()->name())); - result->makeSuccess(); + + if (configureResult.isError()) { + result->makeError().message() = configureResult.error().d_details; + } + else { + result->makeSuccess(); + } return 0; // RETURN } diff --git a/src/groups/mqb/mqbblp/mqbblp_domain.cpp b/src/groups/mqb/mqbblp/mqbblp_domain.cpp index b4999ec0e3..3532de9a5e 100644 --- a/src/groups/mqb/mqbblp/mqbblp_domain.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_domain.cpp @@ -99,14 +99,16 @@ void afterAppIdUnregisteredDispatched(mqbi::Queue* queue, /// checks that the implied reconfiguration is also valid. int validateConfig(bsl::ostream& errorDescription, const bdlb::NullableValue& previousDefn, - const mqbconfm::Domain& newConfig) + const mqbconfm::Domain& newConfig, + bslma::Allocator* allocator) { enum RcEnum { // Value for the various RC error categories rc_SUCCESS = 0, rc_NON_BLOOMBERG_CFG = -1, rc_CHANGED_DOMAIN_MODE = -2, - rc_CHANGED_STORAGE_TYPE = -3 + rc_CHANGED_STORAGE_TYPE = -3, + rc_INVALID_SUBSCRIPTION = -4 }; if (previousDefn.isNull()) { @@ -136,7 +138,40 @@ int validateConfig(bsl::ostream& errorDescription, return rc_CHANGED_STORAGE_TYPE; // RETURN } - return 0; + // Validate newConfig.subscriptions() + + bsl::size_t size = newConfig.subscriptions().size(); + bool result = true; + + for (bsl::size_t i = 0; i < size; ++i) { + const mqbconfm::Expression& expression = + newConfig.subscriptions()[i].expression(); + + if (mqbconfm::ExpressionVersion::E_VERSION_1 == expression.version()) { + if (expression.text().length()) { + bmqeval::CompilationContext context(allocator); + + if (!bmqeval::SimpleEvaluator::validate(expression.text(), + context)) { + errorDescription + << "Expression validation failed: [ expression: " + << expression << ", rc: " << context.lastError() + << ", reason: \"" << context.lastErrorMessage() + << "\" ]"; + result = false; + } + } + } + else { + if (expression.text().length()) { + errorDescription << "Invalid Expression: [ expression: " + << expression; + result = false; + } + } + } + + return result ? 0 : rc_INVALID_SUBSCRIPTION; } /// Given a definition `defn` for `domain`, ensures that the values provided @@ -429,7 +464,10 @@ int Domain::configure(bsl::ostream& errorDescription, } // Validate config. Return early if the configuration is not valid. - if (int rc = validateConfig(errorDescription, d_config, finalConfig)) { + if (int rc = validateConfig(errorDescription, + d_config, + finalConfig, + d_allocator_p)) { return (rc * 10 + rc_VALIDATION_FAILED); // RETURN } @@ -514,6 +552,7 @@ int Domain::configure(bsl::ostream& errorDescription, << " queues from " "domain " << d_name; + QueueMap::iterator it = d_queues.begin(); for (; it != d_queues.end(); it++) { bsl::function reconfigureQueueFn = bdlf::BindUtil::bind( @@ -525,8 +564,7 @@ int Domain::configure(bsl::ostream& errorDescription, d_dispatcher_p->execute(reconfigureQueueFn, cluster()); } } - BALL_LOG_INFO << "Domain '" << d_name << "' successfully " - << (isReconfigure ? "reconfigured" : "configured"); + // 'wait==false', so the result of reconfiguration is not known d_state = e_STARTED; return rc_SUCCESS; diff --git a/src/integration-tests/test_auto_subscriptions.py b/src/integration-tests/test_auto_subscriptions.py index 97dcf44a99..5d31f680a0 100644 --- a/src/integration-tests/test_auto_subscriptions.py +++ b/src/integration-tests/test_auto_subscriptions.py @@ -408,3 +408,95 @@ def test_auto_subscription_fanout_all_negative(self, cluster: Cluster): assert len(self.consumer.list(block=True)) == 0 assert len(self.consumer_bar.list(block=True)) == 0 assert len(self.consumer_baz.list(block=True)) == 0 + + @tweak.domain.subscriptions( + [ + { + "appId": "", + "expression": {"version": "E_VERSION_1", "text": "invalid expression"}, + } + ] + ) + def test_configure_invalid(self, cluster: Cluster): + """ + Configure the priority queue to evaluate auto subscription negatively. + Make sure the queue does not get the message. + Make sure the same is the case after restarts. + """ + + proxies = cluster.proxy_cycle() + + # 1: Setup producers and consumers + + next(proxies) + proxy = next(proxies) + + consumer = proxy.create_client("consumer") + + consumer.open( + tc.URI_PRIORITY_SC, + flags=["read"], + consumer_priority=1, + succeed=False, + ) + + cluster.config.domains[ + tc.DOMAIN_PRIORITY_SC + ].definition.parameters.subscriptions[0]["expression"]["text"] = "x==1" + + cluster.reconfigure_domain(tc.DOMAIN_PRIORITY_SC, succeed=True) + + consumer.open( + tc.URI_PRIORITY_SC, + flags=["read"], + consumer_priority=1, + succeed=True, + ) + + @tweak.domain.subscriptions( + [{"appId": "", "expression": {"version": "E_VERSION_1", "text": "x==1"}}] + ) + def test_reconfigure_invalid(self, cluster: Cluster): + """ + Configure the priority queue to evaluate auto subscription negatively. + Make sure the queue does not get the message. + Make sure the same is the case after restarts. + """ + + proxies = cluster.proxy_cycle() + + # 1: Setup producers and consumers + + next(proxies) + proxy = next(proxies) + + consumer = proxy.create_client("consumer") + + consumer.open( + tc.URI_PRIORITY_SC, + flags=["read"], + consumer_priority=1, + succeed=True, + ) + + consumer.close( + tc.URI_PRIORITY_SC, + succeed=True, + ) + + cluster.config.domains[ + tc.DOMAIN_PRIORITY_SC + ].definition.parameters.subscriptions[0]["expression"][ + "text" + ] = "invalid expression" + + cluster.reconfigure_domain(tc.DOMAIN_PRIORITY_SC, succeed=None) + # TODO: why not succeed=False + + # The validation fails and the domain is going to keep the old config + consumer.open( + tc.URI_PRIORITY_SC, + flags=["read"], + consumer_priority=1, + succeed=True, + ) From 2eb05f817b6580e2b11764ec79ada8e6c711f7bc Mon Sep 17 00:00:00 2001 From: dorjesinpo <129227380+dorjesinpo@users.noreply.github.com> Date: Wed, 22 May 2024 10:32:23 -0400 Subject: [PATCH 2/4] Addressing review Signed-off-by: dorjesinpo <129227380+dorjesinpo@users.noreply.github.com> --- src/groups/mqb/mqba/mqba_domainmanager.cpp | 3 ++- src/groups/mqb/mqbblp/mqbblp_domain.cpp | 24 +++++++++---------- .../test_auto_subscriptions.py | 17 +++++++------ 3 files changed, 23 insertions(+), 21 deletions(-) diff --git a/src/groups/mqb/mqba/mqba_domainmanager.cpp b/src/groups/mqb/mqba/mqba_domainmanager.cpp index 3299bf7d0c..d02c8edaab 100644 --- a/src/groups/mqb/mqba/mqba_domainmanager.cpp +++ b/src/groups/mqb/mqba/mqba_domainmanager.cpp @@ -701,11 +701,12 @@ int DomainManager::processCommand(mqbcmd::DomainsResult* result, if (configureResult.isError()) { result->makeError().message() = configureResult.error().d_details; + return -1; // RETURN } else { result->makeSuccess(); + return 0; // RETURN } - return 0; // RETURN } mwcu::MemOutStream os; diff --git a/src/groups/mqb/mqbblp/mqbblp_domain.cpp b/src/groups/mqb/mqbblp/mqbblp_domain.cpp index 3532de9a5e..4e75ce8205 100644 --- a/src/groups/mqb/mqbblp/mqbblp_domain.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_domain.cpp @@ -140,8 +140,8 @@ int validateConfig(bsl::ostream& errorDescription, // Validate newConfig.subscriptions() - bsl::size_t size = newConfig.subscriptions().size(); - bool result = true; + bsl::size_t size = newConfig.subscriptions().size(); + bool allSubscriptionsAreValid = true; for (bsl::size_t i = 0; i < size; ++i) { const mqbconfm::Expression& expression = @@ -158,20 +158,18 @@ int validateConfig(bsl::ostream& errorDescription, << expression << ", rc: " << context.lastError() << ", reason: \"" << context.lastErrorMessage() << "\" ]"; - result = false; + allSubscriptionsAreValid = false; } } } else { - if (expression.text().length()) { - errorDescription << "Invalid Expression: [ expression: " - << expression; - result = false; - } + errorDescription + << "Unsupported version: [ expression: " << expression << " ]"; + allSubscriptionsAreValid = false; } } - return result ? 0 : rc_INVALID_SUBSCRIPTION; + return allSubscriptionsAreValid ? 0 : rc_INVALID_SUBSCRIPTION; } /// Given a definition `defn` for `domain`, ensures that the values provided @@ -464,10 +462,10 @@ int Domain::configure(bsl::ostream& errorDescription, } // Validate config. Return early if the configuration is not valid. - if (int rc = validateConfig(errorDescription, - d_config, - finalConfig, - d_allocator_p)) { + if (const int rc = validateConfig(errorDescription, + d_config, + finalConfig, + d_allocator_p)) { return (rc * 10 + rc_VALIDATION_FAILED); // RETURN } diff --git a/src/integration-tests/test_auto_subscriptions.py b/src/integration-tests/test_auto_subscriptions.py index 5d31f680a0..1949babee1 100644 --- a/src/integration-tests/test_auto_subscriptions.py +++ b/src/integration-tests/test_auto_subscriptions.py @@ -419,9 +419,10 @@ def test_auto_subscription_fanout_all_negative(self, cluster: Cluster): ) def test_configure_invalid(self, cluster: Cluster): """ - Configure the priority queue to evaluate auto subscription negatively. - Make sure the queue does not get the message. - Make sure the same is the case after restarts. + Configure the priority queue with invalid auto subscription. + Make sure a queue fails to open. + Reconfigure the priority queue with valid auto subscription. + Make sure a queue opens successfully. """ proxies = cluster.proxy_cycle() @@ -458,9 +459,11 @@ def test_configure_invalid(self, cluster: Cluster): ) def test_reconfigure_invalid(self, cluster: Cluster): """ - Configure the priority queue to evaluate auto subscription negatively. - Make sure the queue does not get the message. - Make sure the same is the case after restarts. + Configure the priority queue with valid auto subscription. + Make sure a queue opens successfully. + Reconfigure the priority queue with valid auto subscription. + Make sure the reconfigure command fails. + Make sure a queue opens successfully. """ proxies = cluster.proxy_cycle() @@ -491,7 +494,7 @@ def test_reconfigure_invalid(self, cluster: Cluster): ] = "invalid expression" cluster.reconfigure_domain(tc.DOMAIN_PRIORITY_SC, succeed=None) - # TODO: why not succeed=False + cluster.last_known_leader.capture("Error processing command") # The validation fails and the domain is going to keep the old config consumer.open( From 2ad0ab43e9ce317936bd0a609983d080185f68b6 Mon Sep 17 00:00:00 2001 From: dorjesinpo <129227380+dorjesinpo@users.noreply.github.com> Date: Wed, 22 May 2024 14:24:30 -0400 Subject: [PATCH 3/4] addressing review Signed-off-by: dorjesinpo <129227380+dorjesinpo@users.noreply.github.com> --- src/groups/mqb/mqbblp/mqbblp_domain.cpp | 53 +++++++++++++++---------- 1 file changed, 32 insertions(+), 21 deletions(-) diff --git a/src/groups/mqb/mqbblp/mqbblp_domain.cpp b/src/groups/mqb/mqbblp/mqbblp_domain.cpp index 4e75ce8205..74d0a7f4c5 100644 --- a/src/groups/mqb/mqbblp/mqbblp_domain.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_domain.cpp @@ -95,6 +95,34 @@ void afterAppIdUnregisteredDispatched(mqbi::Queue* queue, mqbi::Storage::AppIdKeyPair(appId, mqbu::StorageKey())); } +/// Validates an application subscription. +bool validdateSubscriptionExpression(bsl::ostream& errorDescription, + const mqbconfm::Expression& expression, + bslma::Allocator* allocator) +{ + if (mqbconfm::ExpressionVersion::E_VERSION_1 == expression.version()) { + if (!expression.text().empty()) { + bmqeval::CompilationContext context(allocator); + + if (!bmqeval::SimpleEvaluator::validate(expression.text(), + context)) { + errorDescription + << "Expression validation failed: [ expression: " + << expression << ", rc: " << context.lastError() + << ", reason: \"" << context.lastErrorMessage() << "\" ]"; + return false; // RETURN + } + } + } + else { + errorDescription << "Unsupported version: [ expression: " << expression + << " ]"; + return false; // RETURN + } + + return true; +} + /// Validates a domain configuration. If `previousDefn` is provided, also /// checks that the implied reconfiguration is also valid. int validateConfig(bsl::ostream& errorDescription, @@ -144,27 +172,10 @@ int validateConfig(bsl::ostream& errorDescription, bool allSubscriptionsAreValid = true; for (bsl::size_t i = 0; i < size; ++i) { - const mqbconfm::Expression& expression = - newConfig.subscriptions()[i].expression(); - - if (mqbconfm::ExpressionVersion::E_VERSION_1 == expression.version()) { - if (expression.text().length()) { - bmqeval::CompilationContext context(allocator); - - if (!bmqeval::SimpleEvaluator::validate(expression.text(), - context)) { - errorDescription - << "Expression validation failed: [ expression: " - << expression << ", rc: " << context.lastError() - << ", reason: \"" << context.lastErrorMessage() - << "\" ]"; - allSubscriptionsAreValid = false; - } - } - } - else { - errorDescription - << "Unsupported version: [ expression: " << expression << " ]"; + if (!validdateSubscriptionExpression( + errorDescription, + newConfig.subscriptions()[i].expression(), + allocator)) { allSubscriptionsAreValid = false; } } From 0c7479aa83ca61e7dccc82e3348c6a06d2942e02 Mon Sep 17 00:00:00 2001 From: dorjesinpo <129227380+dorjesinpo@users.noreply.github.com> Date: Fri, 24 May 2024 10:15:35 -0400 Subject: [PATCH 4/4] merging two tests Signed-off-by: dorjesinpo <129227380+dorjesinpo@users.noreply.github.com> --- .../test_auto_subscriptions.py | 39 ++++--------------- 1 file changed, 7 insertions(+), 32 deletions(-) diff --git a/src/integration-tests/test_auto_subscriptions.py b/src/integration-tests/test_auto_subscriptions.py index 1949babee1..da3d6d72e7 100644 --- a/src/integration-tests/test_auto_subscriptions.py +++ b/src/integration-tests/test_auto_subscriptions.py @@ -417,11 +417,14 @@ def test_auto_subscription_fanout_all_negative(self, cluster: Cluster): } ] ) - def test_configure_invalid(self, cluster: Cluster): + def test_invalid_configuration(self, cluster: Cluster): """ - Configure the priority queue with invalid auto subscription. + Configure priority domain with invalid auto subscription. Make sure a queue fails to open. - Reconfigure the priority queue with valid auto subscription. + Reconfigure the domain with valid auto subscription. + Make sure a queue opens successfully. + Reconfigure the domain with invalid auto subscription. + Make sure the reconfigure command fails. Make sure a queue opens successfully. """ @@ -454,34 +457,6 @@ def test_configure_invalid(self, cluster: Cluster): succeed=True, ) - @tweak.domain.subscriptions( - [{"appId": "", "expression": {"version": "E_VERSION_1", "text": "x==1"}}] - ) - def test_reconfigure_invalid(self, cluster: Cluster): - """ - Configure the priority queue with valid auto subscription. - Make sure a queue opens successfully. - Reconfigure the priority queue with valid auto subscription. - Make sure the reconfigure command fails. - Make sure a queue opens successfully. - """ - - proxies = cluster.proxy_cycle() - - # 1: Setup producers and consumers - - next(proxies) - proxy = next(proxies) - - consumer = proxy.create_client("consumer") - - consumer.open( - tc.URI_PRIORITY_SC, - flags=["read"], - consumer_priority=1, - succeed=True, - ) - consumer.close( tc.URI_PRIORITY_SC, succeed=True, @@ -494,7 +469,7 @@ def test_reconfigure_invalid(self, cluster: Cluster): ] = "invalid expression" cluster.reconfigure_domain(tc.DOMAIN_PRIORITY_SC, succeed=None) - cluster.last_known_leader.capture("Error processing command") + assert cluster.last_known_leader.capture("Error processing command") # The validation fails and the domain is going to keep the old config consumer.open(