Skip to content

Commit

Permalink
Merge branch 'main' into do-not-record-last-confirm
Browse files Browse the repository at this point in the history
  • Loading branch information
bbpetukhov authored Oct 22, 2024
2 parents ca16e5f + 936a3ab commit c5dc9d2
Show file tree
Hide file tree
Showing 31 changed files with 835 additions and 869 deletions.
2 changes: 1 addition & 1 deletion docs/Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ source 'https://rubygems.org'

# gem "rails"

gem "jekyll", "~> 4.2.2"
gem "jekyll", "~> 4.3.3"
gem "jekyll-seo-tag"
gem "jekyll-github-metadata"
gem "webrick"
132 changes: 107 additions & 25 deletions docs/docs/features/subscriptions.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,46 +5,55 @@ parent: Features
nav_order: 3
---

# Subscriptions (aka Topic-based Routing)
# Subscriptions
{: .no_toc }

* toc
{:toc}

## Introduction

Subscriptions provide consumer applications a powerful mechanism to express
interest in receiving only those messages which satisfy criteria specified by
them. In the absence of subscriptions, a consumer attached to a queue can
receive any and all messages posted on the queue, and should be in a position
to process all of them. In other words, the queue is viewed as a logical
stream of homogeneous data. While this may work in some or most cases, there
are scenarios where this restriction prevents a more flexible or natural
arrangement of consumer applications. For example, some users may prefer one
set of consumers to handle messages of a certain type, and another set of
consumers to handle messages of a certain other type. This is where
subscriptions come in -- they enable consumer applications to "subscribe" to
messages of a certain type, thereby *logically* converting a queue into a
stream of heterogeneous data.
Subscriptions provide consumer applications a powerful mechanism to only
receive the messages from a queue that match a specified expression.
In essence, subscriptions allow the user to achieve topic-based message
filtering and/or message routing.

In the absence of subscriptions, a consumer attached to a queue can receive
and should be able to process any and all messages posted on the queue. In
other words, the queue is viewed as a logical stream of homogeneous data.
While this may work in some or most cases, there are scenarios where this
is limiting.

For example, a user may prefer one set of consumers to handle messages of a
certain type, and another set of consumers to handle messages of a certain
other type. Or, a user may have a queue of messages that should all be
processed by some consumer applications, but certain applications may only be
interested in a certain subset of messages and want to ignore messages of a
certain type. This is where subscriptions come in -- they enable consumer
applications to "subscribe" to messages of a certain type and enable users
to filter out messages for certain applications but not others, thereby
*logically* converting a queue into a stream of heterogeneous data.

Concretely speaking, producer applications can put any interesting message
attributes in the *message properties* section of the message (*message
properties* are a list of key/value pairs that a producer can associate with a
message), and consumers can specify filters using one or more *message
properties*. For example, if a message contains these three properties:
message), and consumers can request BlazingMQ to filter messages using one or
more of those *message properties*.

For example, if a message contains these three properties:

- `CustomerId = 1234`
- `DestinationId = "ABC"`
- `OrderType = EXPRESS`

A consumer can provide a filter ("subscription expression") like so when
attaching to a queue:
A consumer can provide a filter ("subscription expression") like so to "match"
the above message:

- `CustomerId == 1234 && OrderType == EXPRESS`

In this case, a message having three properties as shown above will be routed
to the consumer with above filter (note that if a property is not specified by
the consumer, it is considered to be a wildcard).
In this case, a message having the properties as shown above will be routed
to the consumer with the above filter (note that if a property is not specified
in the subscription expression, it is considered to be a wildcard).

Similarly, users can spin up any number of consumers, each with different
filters. Users have to ensure that every message can be processed by at least
Expand All @@ -58,6 +67,66 @@ reader is familiar with various routing strategies (aka 'queue modes') as well
as general BlazingMQ terminology like *PUT*, *PUSH*, *ACK*, *CONFIRM* messages,
etc.

### Subscription Types

BlazingMQ provides two types of subscriptions:

- Application Subscriptions (message filtering)
- Consumer Subscriptions (message routing)

Users can leverage either or both types of subscriptions to achieve the desired
behavior. The two types of subscriptions are described below.

#### Application Subscriptions

Application Subscriptions provide the ability to filter out messages from an
application's queue in the BlazingMQ broker.

When a message is produced to a queue, BlazingMQ will evaluate all Application
Subscriptions and _auto-confirm_ the message on behalf of an application if
the message does not match the application's subscription expression. Since
BlazingMQ only routes unconfirmed messages to consumers, consumers will only
receive messages that match the configured Application Subscription.

Application Subscriptions are specified in the domain's configuration:

* Application Subscriptions are configured and evaluated per-*AppId* for fanout
queues.
- Note the BlazingMQ broker will still store each message until it is
confirmed by all *AppIds*, either via auto-confirm or a consumer.
* Application Subscriptions are configured with an empty *AppId* (i.e.
`appId=""`) for priority and broadcast queues. Auto-confirms apply to all
consumers of these queues.

#### Consumer Subscriptions

Consumer subscriptions allow each consumer instance to express the messages
it is capable of processing when it attaches to the queue. This allows users
to define the subset of consumers that BlazingMQ can route any given message
to.

When a message is produced to a queue, BlazingMQ will evaluate all Application
Subscriptions (as described above), and then evaluate Consumer Subscriptions
to determine which consumers are capable of processing the message. Then, all
standard routing logic (i.e. consumer priorities, round-robin, respecting
`maxUnconfirmed*` configurations) is used to deliver the message to a consumer.

Notes:

- BlazingMQ will only route a message to a consumer if the message matches that
consumer's subscription. If a consumer has no subscription, BlazingMQ can route
any message to it.

- If there is no matching consumer subscription for a message, the message will
remain in the queue, unconfirmed, until a consumer configures a subscription
matching the message. The message will count against the configured
queue/domain quota limits until it is confirmed or expires due to TTL.

- Each consumer instance can specify a different subscription.

- Users have to ensure that every message can be processed by at least
one consumer.

### Background
{:.no_toc}

Expand Down Expand Up @@ -110,6 +179,11 @@ matching subscription(s). Here’s how subscriptions work at a high level:
- Producers add any ‘interesting’ attributes of the message in its *message
properties*.

- Users specify one or more Application Subscriptions in the domain configuration
for one or more *AppIds*. Each *AppId* can have one or more boolean expression
containing one or more message properties. If there is no subscription for an
*AppId*, the application will receive all messages.

- Consumers specify one or more boolean expressions when opening the
queue. Each expression can contain one or more message properties. As an
example, an expression can look like:
Expand Down Expand Up @@ -147,18 +221,25 @@ matching subscription(s). Here’s how subscriptions work at a high level:
- Existing APIs will continue to work and consumer applications which do not
use subscriptions will not need to make any changes.

- In the BlazingMQ back-end, upon the arrival of a new message, BlazingMQ
primary node will try to match the message with a subscription and route the
message to the consumer with that subscription. See *Implementation Details*
- In the BlazingMQ back-end, upon the arrival of a new message, the BlazingMQ
primary node will first check each Application Subscription. The message
will be auto-confirmed for each application that does not have a matching
subscription. If there is a matching Application Subscription, Blazing will
then try to match the message with a consumer's subscription and route the
message to the corresponding consumer instance. See *Implementation Details*
section below for more info.

- Multiple expressions can be provided when using Application and/or Consumer
Subscriptions. The BlazingMQ primary node will check if a message matches
each provided expression, resulting in an implicit "OR" between expressions.

### Implementation Details
{:.no_toc}

The *Design* section above gives a high level overview of the feature. There
are, however, some additional details which are worth specifying.

1. **Overlapping Subscriptions**: in case if consumers specify overlapping
1. **Overlapping Consumer Subscriptions**: if consumers specify overlapping
subscriptions (e.g., `CustomerId == 0` and `CustomerId >= 0` ), BlazingMQ
will not make any attempt to merge those subscriptions, and the two
subscriptions will be treated independently of each other. **NOTE**: While
Expand Down Expand Up @@ -235,6 +316,7 @@ manipulation, as a tiny subset of the C programming language.
- Spaces, tabs and line feeds are ignored
- The language has three types: integer, string, and boolean
- The final result of an expression must be a boolean
- Limited to 128 characters in length

### Identifiers
{:.no_toc}
Expand Down
11 changes: 8 additions & 3 deletions src/applications/bmqstoragetool/m_bmqstoragetool_filemanager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,10 @@ QueueMap FileManagerImpl::buildQueueMap(const bsl::string& cslFile,
cslFile.c_str());
bsl::string pattern(alloc);
bsl::string location(alloc);
BSLS_ASSERT(bdls::PathUtil::getBasename(&pattern, cslFile) == 0);
BSLS_ASSERT(bdls::PathUtil::getDirname(&location, cslFile) == 0);
int rc = bdls::PathUtil::getBasename(&pattern, cslFile);
BSLS_ASSERT(rc == 0);
rc = bdls::PathUtil::getDirname(&location, cslFile);
BSLS_ASSERT(rc == 0);
ledgerConfig.setLocation(location)
.setPattern(pattern)
.setMaxLogSize(fileSize)
Expand All @@ -145,7 +147,10 @@ QueueMap FileManagerImpl::buildQueueMap(const bsl::string& cslFile,

// Create and open the ledger
mqbsl::Ledger ledger(ledgerConfig, alloc);
BSLS_ASSERT(ledger.open(mqbsi::Ledger::e_READ_ONLY) == 0);
rc = ledger.open(mqbsi::Ledger::e_READ_ONLY);
BSLS_ASSERT(rc == 0);
(void)rc; // Compiler happiness

// Set guard to close the ledger
bdlb::ScopeExitAny guard(bdlf::BindUtil::bind(closeLedger, &ledger));

Expand Down
48 changes: 46 additions & 2 deletions src/groups/bmq/bmqeval/bmqeval_simpleevaluator.t.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,42 @@ static void test2_propertyNames()
{"aaa_111BBBaaa222__BBBaaa3_3_3BBB > 0", true},
{"B_a_1_B_a_2_B_a_3_B_a_4_B_a_5_B_ > 0", true},

// letters + dot
{"name. > 0", true},
{"NA.ME > 0", true},
{"Na.me. > 0", true},
{"name.Name > 0", true},
{"Name.Name. > 0", true},
{"n. > 0", true},
{"N. > 0", true},
{"aB.aB..aB...aB....aB..... > 0", true},
{"aaa.BBBaaa..BBBaaa...BBBaaa > 0", true},
{"B.a.B.a.B.a.B.a.B.a.B.a.B.a. > 0", true},

// letters + digits + dots
{"n1a2m3e4. > 0", true},
{"N1A2.M3E4 > 0", true},
{"N1a2.m3e4. > 0", true},
{"n1a2m3e4.N5a6m7e8 > 0", true},
{"N1a2m3e4.N5a6m7e8. > 0", true},
{"n0. > 0", true},
{"N.0 > 0", true},
{"aB1.aB..2a...B3aB4.... > 0", true},
{"aaa.111BBBaaa222..BBBaaa3.3.3BBB > 0", true},
{"B.a.1.B.a.2.B.a.3.B.a.4.B.a.5.B. > 0", true},

// letters + digits + dots + underscores
{"n1a2m3e4._ > 0", true},
{"N1A2.M3E4_ > 0", true},
{"N1a2_m3e4. > 0", true},
{"n1a2m3e4_N5a6m7e8. > 0", true},
{"N1a2m3e4.N5a6m7e8_ > 0", true},
{"n0_. > 0", true},
{"N_0. > 0", true},
{"aB1_aB._2a__.B3aB4..._ > 0", true},
{"aaa.111BBBaaa222__BBBaaa3.3_3BBB > 0", true},
{"B.a_1_B.a_2.B_a.3.B.a_4.B_a.5.B. > 0", true},

// readable examples
{"camelCase > 0", true},
{"snake_case > 0", true},
Expand All @@ -291,10 +327,11 @@ static void test2_propertyNames()
{"firmId > 0", true},
{"TheStandardAndPoor500 > 0", true},
{"SPX_IND > 0", true},
{"organization.repository > 0", true},

// all available characters
{"abcdefghijklmnopqrstuvwxyz_0123456789 > 0", true},
{"ABCDEFGHIJKLMNOPQRSTUVWXYZ_0123456789 > 0", true},
{"abcdefghijklmnopqrstuvwxyz_.0123456789 > 0", true},
{"ABCDEFGHIJKLMNOPQRSTUVWXYZ_.0123456789 > 0", true},

// negative examples
{"0name > 0", false},
Expand All @@ -307,6 +344,13 @@ static void test2_propertyNames()
{"_ > 0", false},
{"_11111111111111111111111111111 > 0", false},
{"22222222222222222222222222222_ > 0", false},
{".nameName > 0", false},
{"0.NameName > 0", false},
{".1n > 0", false},
{"1.N > 0", false},
{". > 0", false},
{".11111111111111111111111111111 > 0", false},
{"22222222222222222222222222222. > 0", false},
};
const TestParameters* testParametersEnd = testParameters +
sizeof(testParameters) /
Expand Down
2 changes: 1 addition & 1 deletion src/groups/bmq/bmqeval/bmqeval_simpleevaluatorscanner.l
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
return SimpleEvaluatorParser::make_FALSE();
}

[a-zA-Z][a-zA-Z0-9_]* {
[a-zA-Z][a-zA-Z0-9_.]* {
updatePosition();
return SimpleEvaluatorParser::make_PROPERTY(yytext);
}
Expand Down
11 changes: 8 additions & 3 deletions src/groups/mqb/mqbblp/mqbblp_cluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,10 @@ void Cluster::startDispatched(bsl::ostream* errorDescription, int* rc)
bslma::Allocator* storageManagerAllocator = d_allocators.get(
"StorageManager");

// Make a temporal pointer to the dispatcher to avoid a false-positive
// "uninitialized variable" warning
mqbi::Dispatcher* clusterDispatcher = dispatcher();

// Start the StorageManager
d_storageManager_mp.load(
isFSMWorkflow()
Expand All @@ -185,7 +189,7 @@ void Cluster::startDispatched(bsl::ostream* errorDescription, int* rc)
&d_clusterData,
d_state,
d_clusterData.domainFactory(),
dispatcher(),
clusterDispatcher,
k_PARTITION_FSM_WATCHDOG_TIMEOUT_DURATION,
bdlf::BindUtil::bind(&Cluster::onRecoveryStatus,
this,
Expand Down Expand Up @@ -218,7 +222,7 @@ void Cluster::startDispatched(bsl::ostream* errorDescription, int* rc)
bdlf::PlaceHolders::_2, // status
bdlf::PlaceHolders::_3), // primary leaseId
d_clusterData.domainFactory(),
dispatcher(),
clusterDispatcher,
&d_clusterData.miscWorkThreadPool(),
storageManagerAllocator)),
storageManagerAllocator);
Expand Down Expand Up @@ -3853,7 +3857,8 @@ void Cluster::getPartitionPrimaryNode(int* rc,
d_state.partitions();

// Check boundary conditions for partitionId
if (partitionId < 0 || partitionId >= partitions.size()) {
if (partitionId < 0 ||
static_cast<int>(partitions.size()) <= partitionId) {
errorDescription << "Invalid partition id: " << partitionId;
*rc = rc_ERROR;
return; // RETURN
Expand Down
3 changes: 2 additions & 1 deletion src/groups/mqb/mqbblp/mqbblp_clustercatalog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -376,13 +376,14 @@ ClusterCatalog::ClusterCatalog(mqbi::Dispatcher* dispatcher,
, d_reversedClusterConnections(d_allocator_p)
, d_clusters(d_allocator_p)
, d_statContexts(statContexts)
, d_resources(resources)
, d_adminCb()
, d_requestManager(bmqp::EventType::e_CONTROL,
resources.bufferFactory(),
resources.scheduler(),
false, // lateResponseMode
d_allocator_p)
, d_stopRequestsManager(&d_requestManager, d_allocator_p)
, d_resources(resources)
{
// PRECONDITIONS
BSLS_ASSERT_SAFE(d_resources.scheduler()->clockType() ==
Expand Down
Loading

0 comments on commit c5dc9d2

Please sign in to comment.