-
Notifications
You must be signed in to change notification settings - Fork 141
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fail reconfigure cmd on invalid auto subscription #298
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -686,19 +686,25 @@ int DomainManager::processCommand(mqbcmd::DomainsResult* result, | |
} | ||
} | ||
|
||
DecodeAndUpsertValue unused; | ||
DecodeAndUpsertValue configureResult; | ||
d_configProvider_p->clearCache(domainSp->name()); | ||
d_configProvider_p->getDomainConfig( | ||
domainSp->name(), | ||
bdlf::BindUtil::bind( | ||
&DomainManager::decodeAndUpsert, | ||
this, | ||
&unused, | ||
&configureResult, | ||
bdlf::PlaceHolders::_1, // configProviderStatus | ||
bdlf::PlaceHolders::_2, // configProviderResult | ||
domainSp->name(), | ||
domainSp->cluster()->name())); | ||
result->makeSuccess(); | ||
|
||
if (configureResult.isError()) { | ||
result->makeError().message() = configureResult.error().d_details; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we |
||
} | ||
else { | ||
result->makeSuccess(); | ||
} | ||
return 0; // RETURN | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -99,14 +99,16 @@ void afterAppIdUnregisteredDispatched(mqbi::Queue* queue, | |||||
/// checks that the implied reconfiguration is also valid. | ||||||
int validateConfig(bsl::ostream& errorDescription, | ||||||
const bdlb::NullableValue<mqbconfm::Domain>& previousDefn, | ||||||
const mqbconfm::Domain& newConfig) | ||||||
const mqbconfm::Domain& newConfig, | ||||||
bslma::Allocator* allocator) | ||||||
{ | ||||||
enum RcEnum { | ||||||
// Value for the various RC error categories | ||||||
rc_SUCCESS = 0, | ||||||
rc_NON_BLOOMBERG_CFG = -1, | ||||||
rc_CHANGED_DOMAIN_MODE = -2, | ||||||
rc_CHANGED_STORAGE_TYPE = -3 | ||||||
rc_CHANGED_STORAGE_TYPE = -3, | ||||||
rc_INVALID_SUBSCRIPTION = -4 | ||||||
}; | ||||||
|
||||||
if (previousDefn.isNull()) { | ||||||
|
@@ -136,7 +138,40 @@ int validateConfig(bsl::ostream& errorDescription, | |||||
return rc_CHANGED_STORAGE_TYPE; // RETURN | ||||||
} | ||||||
|
||||||
return 0; | ||||||
// Validate newConfig.subscriptions() | ||||||
|
||||||
bsl::size_t size = newConfig.subscriptions().size(); | ||||||
bool result = true; | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would rather rename the flag to make its meaning more obvious:
Suggested change
The problem with There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Well, let's be more implicit then, like |
||||||
|
||||||
for (bsl::size_t i = 0; i < size; ++i) { | ||||||
const mqbconfm::Expression& expression = | ||||||
newConfig.subscriptions()[i].expression(); | ||||||
|
||||||
if (mqbconfm::ExpressionVersion::E_VERSION_1 == expression.version()) { | ||||||
if (expression.text().length()) { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
I would prefer this for clarity and also in general for collection types it's usually much more efficient to check if a collection is empty than get its length. |
||||||
bmqeval::CompilationContext context(allocator); | ||||||
|
||||||
if (!bmqeval::SimpleEvaluator::validate(expression.text(), | ||||||
context)) { | ||||||
errorDescription | ||||||
<< "Expression validation failed: [ expression: " | ||||||
<< expression << ", rc: " << context.lastError() | ||||||
<< ", reason: \"" << context.lastErrorMessage() | ||||||
<< "\" ]"; | ||||||
result = false; | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we just There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No, I would prefer to validate and report everything |
||||||
} | ||||||
} | ||||||
} | ||||||
else { | ||||||
if (expression.text().length()) { | ||||||
errorDescription << "Invalid Expression: [ expression: " | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Would rather leave a more explicit comment, so our users can clearly understand why it fails here Something like There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. On the second thought, let's reject unsupported versions. |
||||||
<< expression; | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
result = false; | ||||||
} | ||||||
} | ||||||
} | ||||||
hallfox marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
|
||||||
return result ? 0 : rc_INVALID_SUBSCRIPTION; | ||||||
} | ||||||
|
||||||
/// Given a definition `defn` for `domain`, ensures that the values provided | ||||||
|
@@ -429,7 +464,10 @@ int Domain::configure(bsl::ostream& errorDescription, | |||||
} | ||||||
|
||||||
// Validate config. Return early if the configuration is not valid. | ||||||
if (int rc = validateConfig(errorDescription, d_config, finalConfig)) { | ||||||
if (int rc = validateConfig(errorDescription, | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
d_config, | ||||||
finalConfig, | ||||||
d_allocator_p)) { | ||||||
return (rc * 10 + rc_VALIDATION_FAILED); // RETURN | ||||||
} | ||||||
|
||||||
|
@@ -514,6 +552,7 @@ int Domain::configure(bsl::ostream& errorDescription, | |||||
<< " queues from " | ||||||
"domain " | ||||||
<< d_name; | ||||||
|
||||||
QueueMap::iterator it = d_queues.begin(); | ||||||
for (; it != d_queues.end(); it++) { | ||||||
bsl::function<int()> reconfigureQueueFn = bdlf::BindUtil::bind( | ||||||
|
@@ -525,8 +564,7 @@ int Domain::configure(bsl::ostream& errorDescription, | |||||
d_dispatcher_p->execute(reconfigureQueueFn, cluster()); | ||||||
} | ||||||
} | ||||||
BALL_LOG_INFO << "Domain '" << d_name << "' successfully " | ||||||
<< (isReconfigure ? "reconfigured" : "configured"); | ||||||
// 'wait==false', so the result of reconfiguration is not known | ||||||
|
||||||
d_state = e_STARTED; | ||||||
return rc_SUCCESS; | ||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -408,3 +408,95 @@ def test_auto_subscription_fanout_all_negative(self, cluster: Cluster): | |
assert len(self.consumer.list(block=True)) == 0 | ||
assert len(self.consumer_bar.list(block=True)) == 0 | ||
assert len(self.consumer_baz.list(block=True)) == 0 | ||
|
||
@tweak.domain.subscriptions( | ||
[ | ||
{ | ||
"appId": "", | ||
"expression": {"version": "E_VERSION_1", "text": "invalid expression"}, | ||
} | ||
] | ||
) | ||
def test_configure_invalid(self, cluster: Cluster): | ||
""" | ||
Configure the priority queue to evaluate auto subscription negatively. | ||
Make sure the queue does not get the message. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For these both tests, there are no messages posted. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not much of a value, I think. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Then we should probably remove mentions of messages from these tests' docs |
||
Make sure the same is the case after restarts. | ||
""" | ||
|
||
proxies = cluster.proxy_cycle() | ||
|
||
# 1: Setup producers and consumers | ||
|
||
next(proxies) | ||
proxy = next(proxies) | ||
|
||
consumer = proxy.create_client("consumer") | ||
|
||
consumer.open( | ||
tc.URI_PRIORITY_SC, | ||
flags=["read"], | ||
consumer_priority=1, | ||
succeed=False, | ||
) | ||
|
||
cluster.config.domains[ | ||
tc.DOMAIN_PRIORITY_SC | ||
].definition.parameters.subscriptions[0]["expression"]["text"] = "x==1" | ||
|
||
cluster.reconfigure_domain(tc.DOMAIN_PRIORITY_SC, succeed=True) | ||
|
||
consumer.open( | ||
tc.URI_PRIORITY_SC, | ||
flags=["read"], | ||
consumer_priority=1, | ||
succeed=True, | ||
) | ||
|
||
@tweak.domain.subscriptions( | ||
[{"appId": "", "expression": {"version": "E_VERSION_1", "text": "x==1"}}] | ||
) | ||
def test_reconfigure_invalid(self, cluster: Cluster): | ||
""" | ||
Configure the priority queue to evaluate auto subscription negatively. | ||
Make sure the queue does not get the message. | ||
Make sure the same is the case after restarts. | ||
""" | ||
|
||
proxies = cluster.proxy_cycle() | ||
|
||
# 1: Setup producers and consumers | ||
|
||
next(proxies) | ||
proxy = next(proxies) | ||
|
||
consumer = proxy.create_client("consumer") | ||
|
||
consumer.open( | ||
tc.URI_PRIORITY_SC, | ||
flags=["read"], | ||
consumer_priority=1, | ||
succeed=True, | ||
) | ||
|
||
consumer.close( | ||
tc.URI_PRIORITY_SC, | ||
succeed=True, | ||
) | ||
|
||
cluster.config.domains[ | ||
tc.DOMAIN_PRIORITY_SC | ||
].definition.parameters.subscriptions[0]["expression"][ | ||
"text" | ||
] = "invalid expression" | ||
|
||
cluster.reconfigure_domain(tc.DOMAIN_PRIORITY_SC, succeed=None) | ||
# TODO: why not succeed=False | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there a problem with There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, we want to make sure the command fails (so the whoever has issued the command can roll back)
|
||
|
||
# The validation fails and the domain is going to keep the old config | ||
consumer.open( | ||
tc.URI_PRIORITY_SC, | ||
flags=["read"], | ||
consumer_priority=1, | ||
succeed=True, | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No need to do anything, just my observation here
Note that docs for
getDomainConfig
are incorrect. They state that the method is asynchronous:In fact, this method is synchronous and thread safe, protected by mutex.
It means that you are safe to access
configureResult
just after thegetDomainConfig
call because you are guaranteed that the method was already executed.I will update the docs later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note that
Domain::configure
callsmqbi::Queue::configure
withfalse
as thewait
argument.Even if it was
true
, we call it from the cluster thread and do notsynchronize
. There are 3 threads involved here.The validation is synchronous, yes. The reconfiguration of queues is asynchronous.