Skip to content

Commit

Permalink
review fixes
Browse files Browse the repository at this point in the history
Signed-off-by: Evgeny Malygin <[email protected]>
  • Loading branch information
678098 committed May 20, 2024
1 parent 7407f7d commit e62456e
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 16 deletions.
1 change: 1 addition & 0 deletions src/groups/mqb/mqba/mqba_domainmanager.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
#include <bslma_usesbslmaallocator.h>
#include <bslmf_nestedtraitdeclaration.h>
#include <bslmt_mutex.h>
#include <bsls_atomic.h>
#include <bsls_cpp11.h>

namespace BloombergLP {
Expand Down
26 changes: 25 additions & 1 deletion src/groups/mqb/mqba/mqba_domainresolver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ const bsls::TimeInterval k_DIR_CHECK_TTL = bsls::TimeInterval(60.0);

void DomainResolver::updateTimestamps()
{
// executed by the thread that holds the 'd_mutex'

BSLMT_MUTEXASSERT_IS_LOCKED_SAFE(&d_mutex); // mutex LOCKED

const bsls::TimeInterval now = mwcsys::Time::nowRealtimeClock();

if (now <= d_timestampsValidUntil) {
Expand All @@ -85,6 +89,8 @@ void DomainResolver::updateTimestamps()
bool DomainResolver::cacheLookup(mqbconfm::DomainResolver* out,
const bslstl::StringRef& domainName)
{
// executed by the thread that holds the 'd_mutex'

BSLMT_MUTEXASSERT_IS_LOCKED_SAFE(&d_mutex); // mutex LOCKED

CacheMap::const_iterator it = d_cache.find(domainName);
Expand All @@ -98,7 +104,7 @@ bool DomainResolver::cacheLookup(mqbconfm::DomainResolver* out,
// member is different than the 'd_lastCfgDirTimestamp'.
//
// NOTE: the caller must call 'updateTimestamps()' to update the
// d_lastCfgDirTimestamp' prior to calling this method.
// 'd_lastCfgDirTimestamp' prior to calling this method.
if (it->second.d_cfgDirTimestamp != d_lastCfgDirTimestamp) {
// Stale entry, clear from the map
d_cache.erase(it);
Expand All @@ -113,6 +119,8 @@ int DomainResolver::getOrRead(bsl::ostream& errorDescription,
mqbconfm::DomainResolver* out,
const bslstl::StringRef& domainName)
{
// executed by *ANY* thread

bslmt::LockGuard<bslmt::Mutex> guard(&d_mutex); // mutex LOCKED

// Make sure we have the latest script timestamp
Expand Down Expand Up @@ -269,6 +277,8 @@ DomainResolver::~DomainResolver()

int DomainResolver::start(bsl::ostream& errorDescription)
{
// executed by *ANY* thread

// Verify that the config directory exists
const mqbcfg::AppConfig& brkrCfg = mqbcfg::BrokerConfig::get();
if (!bdls::FilesystemUtil::exists(brkrCfg.etcDir())) {
Expand All @@ -282,13 +292,16 @@ int DomainResolver::start(bsl::ostream& errorDescription)

void DomainResolver::stop()
{
// executed by *ANY* thread
// NOTHING
}

bmqp_ctrlmsg::Status
DomainResolver::getOrReadDomain(mqbconfm::DomainResolver* out,
const bslstl::StringRef& domainName)
{
// executed by *ANY* thread

mwcu::MemOutStream errorDescription;

int rc = getOrRead(errorDescription, out, domainName);
Expand All @@ -307,6 +320,8 @@ void DomainResolver::qualifyDomain(
const bslstl::StringRef& domainName,
const mqbi::DomainFactory::QualifiedDomainCb& callback)
{
// executed by *ANY* thread

mqbconfm::DomainResolver response;
bmqp_ctrlmsg::Status status = getOrReadDomain(&response, domainName);

Expand All @@ -316,6 +331,8 @@ void DomainResolver::qualifyDomain(
void DomainResolver::locateDomain(const bslstl::StringRef& domainName,
const LocateDomainCb& callback)
{
// executed by *ANY* thread

mqbconfm::DomainResolver response;
bmqp_ctrlmsg::Status status = getOrReadDomain(&response, domainName);

Expand All @@ -324,6 +341,8 @@ void DomainResolver::locateDomain(const bslstl::StringRef& domainName,

void DomainResolver::clearCache(const bslstl::StringRef& domainName)
{
// executed by *ANY* thread

bslmt::LockGuard<bslmt::Mutex> guard(&d_mutex); // mutex LOCKED

if (domainName.length() == 0) {
Expand All @@ -343,6 +362,11 @@ int DomainResolver::processCommand(
const mqbcmd::DomainResolverCommand& command,
mqbcmd::Error* error)
{
// executed by *ANY* thread

// PRECONDITIONS
BSLS_ASSERT_SAFE(error);

if (command.isClearCacheValue()) {
if (command.clearCache().isAllValue()) {
clearCache();
Expand Down
15 changes: 4 additions & 11 deletions src/integration-tests/test_admin_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,14 +125,13 @@ def test_admin_encoding(single_node: Cluster) -> None:
- Commands with incorrect encoding are handled with decode error.
- Commands without encoding return text output for backward compatibility.
"""
host, port = single_node.admin_endpoint

def is_compact(json_str: str) -> bool:
return " " not in json_str

# Start the admin client
admin = AdminClient()
admin.connect(host, port)
admin.connect(*single_node.admin_endpoint)

# Stage 1: encode as TEXT
cmds = [json.dumps({"help": {}, "encoding": "TEXT"}), "ENCODING TEXT HELP"]
Expand Down Expand Up @@ -220,11 +219,9 @@ def test_purge_breathing(single_node: Cluster) -> None:
proxy = next(proxies)
producer: Client = proxy.create_client("producer")

host, port = cluster.admin_endpoint

# Start the admin client
admin = AdminClient()
admin.connect(host, port)
admin.connect(*cluster.admin_endpoint)

# Stage 1: purge PRIORITY queue
for i in range(1, 6):
Expand Down Expand Up @@ -362,11 +359,9 @@ def test_purge_inactive(single_node: Cluster) -> None:
post_n_msgs(producer, task, posted_fanout)
producer.stop()

host, port = cluster.admin_endpoint

# Start the admin client.
admin = AdminClient()
admin.connect(host, port)
admin.connect(*cluster.admin_endpoint)

# Stage 2: PRIORITY purge

Expand Down Expand Up @@ -461,11 +456,9 @@ def test_commands_on_non_existing_domain(single_node: Cluster) -> None:
"""
cluster: Cluster = single_node

host, port = cluster.admin_endpoint

# Start the admin client
admin = AdminClient()
admin.connect(host, port)
admin.connect(*cluster.admin_endpoint)

# Stage 1: send commands to domains existing on disk but not yet loaded to the broker
# Note that we use different domains for each test case, because we want to check each
Expand Down
4 changes: 1 addition & 3 deletions src/integration-tests/test_reconfigure_domains.py
Original file line number Diff line number Diff line change
Expand Up @@ -412,10 +412,8 @@ def run_consumers(max_attempts: int) -> int:

# Stage 3: reconfigure maxDeliveryAttempts to enable poison pill detection.
# Use an admin session to validate that the setting change reached the broker.
host, port = cluster.admin_endpoint

admin = AdminClient()
admin.connect(host, port)
admin.connect(*cluster.admin_endpoint)

res = admin.send_admin(f"DOMAINS DOMAIN {tc.DOMAIN_PRIORITY} INFOS")
assert '"maxDeliveryAttempts" : 0' in res
Expand Down
2 changes: 1 addition & 1 deletion src/python/blazingmq/dev/it/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ def name(self) -> str:
return self.config.name

@property
def admin_endpoint(self) -> Tuple[Optional[str], Optional[int]]:
def admin_endpoint(self) -> Union[Tuple[str, int], Tuple[None, None]]:
"""
Return a tuple containing (host, port) of an admin endpoint of this cluster, if the
admin endpoint is not decided, return (None, None) tuple
Expand Down

0 comments on commit e62456e

Please sign in to comment.