forked from apache/pulsar
[improve][pip] PIP-376: Make topic policies service pluggable
1 parent
a678e97
commit 21128e7
Showing 1 changed file with 138 additions and 0 deletions.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,138 @@ | ||
# PIP-376: Make topic policies service pluggable | ||
|
||
# Background knowledge | ||
|
||
## Topic policies service and system topics | ||
|
||
[PIP-39](https://github.com/apache/pulsar/wiki/PIP-39%3A-Namespace-Change-Events) introduces system topics and topic-level policies. However, the topic policies service (`TopicPoliciesService`) has only one implementation (`SystemTopicBasedTopicPoliciesService`), which depends on system topics. So the following configs are both required (though both are enabled by default now): | ||
|
||
```properties | ||
systemTopicEnabled=true | ||
topicLevelPoliciesEnabled=true | ||
``` | ||
|
||
However, if the Pulsar storage is switched to an S3-based solution (by modifying the `managedLedgerStorageClassName` config), using system topics to manage topic policies could lead to low performance (due to S3 write and read latency) and higher cost (due to redundant S3 API calls). | ||
|
||
## Badly designed TopicPoliciesService interface | ||
|
||
The `TopicPoliciesService` interface is a terrible abstraction because it was never designed for third-party implementations. | ||
|
||
1. Methods that should not be exposed | ||
|
||
`addOwnedNamespaceBundleAsync` and `removeOwnedNamespaceBundleAsync` are only used internally in `SystemTopicBasedTopicPoliciesService`. | ||
|
||
`getTopicPoliciesBypassCacheAsync` is only used in tests. This method just creates a reader to replay the `__change_events` topic and construct the topic policies map. | ||
|
||
2. Confusing and inconsistent `getTopicPolicies` family | ||
|
||
There are two overloads of `getTopicPolicies`: | ||
|
||
```java | ||
TopicPolicies getTopicPolicies(TopicName topicName, boolean isGlobal) throws TopicPoliciesCacheNotInitException; | ||
TopicPolicies getTopicPolicies(TopicName topicName) throws TopicPoliciesCacheNotInitException; | ||
``` | ||
|
||
The 2nd method is equivalent to `getTopicPolicies(topicName, false)`. | ||
|
||
The semantics of these two methods are not intuitive. First, they are not synchronous methods that block while waiting for a future. They just start an asynchronous policies initialization (creating a reader to replay the `__change_events` topic), and then try to get the policies from the cache. If the asynchronous policies initialization has not completed, they just throw `TopicPoliciesCacheNotInitException`. | ||
|
||
As you can see, these two methods are hard to use. Apart from tests, their only caller is the `getTopicPoliciesAsyncWithRetry` method, which uses a user-provided executor and backoff policy to call `getTopicPolicies` repeatedly until `TopicPoliciesCacheNotInitException` is no longer thrown: | ||
|
||
```java | ||
default CompletableFuture<Optional<TopicPolicies>> getTopicPoliciesAsyncWithRetry(TopicName topicName, | ||
final Backoff backoff, ScheduledExecutorService scheduledExecutorService, boolean isGlobal) { | ||
``` | ||
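The retry pattern described above can be sketched roughly as follows. This is a minimal illustration, not the Pulsar implementation: `RetrySketch`, `CacheNotInitException`, `CacheLookup`, the doubling delay, and the attempt limit are all hypothetical stand-ins, and `String` is used in place of `TopicPolicies`.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-ins only; none of these types are Pulsar classes.
class RetrySketch {
    /** Plays the role of TopicPoliciesCacheNotInitException. */
    static class CacheNotInitException extends Exception {}

    /** Plays the role of the throwing getTopicPolicies call. */
    interface CacheLookup {
        Optional<String> get() throws CacheNotInitException;
    }

    /** Retries the lookup on the executor, doubling the delay, until it stops throwing. */
    static CompletableFuture<Optional<String>> getWithRetry(
            CacheLookup lookup, ScheduledExecutorService executor, long delayMs, int attemptsLeft) {
        CompletableFuture<Optional<String>> result = new CompletableFuture<>();
        attempt(lookup, executor, delayMs, attemptsLeft, result);
        return result;
    }

    private static void attempt(CacheLookup lookup, ScheduledExecutorService executor,
                                long delayMs, int attemptsLeft,
                                CompletableFuture<Optional<String>> result) {
        try {
            result.complete(lookup.get());
        } catch (CacheNotInitException e) {
            if (attemptsLeft <= 1) {
                // Give up and surface the failure to the caller.
                result.completeExceptionally(e);
                return;
            }
            // Schedule the next attempt with a longer delay.
            executor.schedule(
                    () -> attempt(lookup, executor, delayMs * 2, attemptsLeft - 1, result),
                    delayMs, TimeUnit.MILLISECONDS);
        }
    }

    /** Demo: the simulated cache becomes readable on the third attempt. */
    static Optional<String> demo() {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger calls = new AtomicInteger();
        try {
            return getWithRetry(() -> {
                if (calls.incrementAndGet() < 3) {
                    throw new CacheNotInitException();
                }
                return Optional.of("policies");
            }, executor, 1, 5).join();
        } finally {
            executor.shutdown();
        }
    }
}
```

The sketch shows why this shape is awkward for implementers: the caller has to supply an executor and a backoff policy only to work around a lookup that throws while the cache is still loading.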
|
||
The `getTopicPolicies` overloads are only called in tests, while `getTopicPoliciesAsyncWithRetry` is used in the core. This is very confusing for users who want to implement their own topic policies service. They have to dig deep into Pulsar's source code to learn these details. | ||
https://github.com/apache/pulsar/pull/21231 adds two asynchronous overloads that are much friendlier to users: | ||
```java | ||
CompletableFuture<Optional<TopicPolicies>> getTopicPoliciesAsync(@Nonnull TopicName topicName, boolean isGlobal); | ||
CompletableFuture<Optional<TopicPolicies>> getTopicPoliciesAsync(@Nonnull TopicName topicName); | ||
``` | ||
Now we have **5** asynchronous get methods. What's worse, unlike `getTopicPolicies`, `getTopicPoliciesAsync(topic)` is not equivalent to `getTopicPoliciesAsync(topic, false)`. Instead: | ||
- `getTopicPoliciesAsync(topic)` will try getting local policies first, if absent, then try getting global policies | ||
- `getTopicPoliciesAsync(topic, true)` will try getting global policies | ||
- `getTopicPoliciesAsync(topic, false)` will try getting local policies | ||
|
||
It should be noted that the topic policies support global policies across clusters since [#12517](https://github.com/apache/pulsar/pull/12517). So there are local policies and global policies. | ||
|
||
Currently, | ||
- `getTopicPoliciesAsync(TopicName)` is used in `BrokerService#getTopicPoliciesBypassSystemTopic`, which is called when initializing the topic policies of `PersistentTopic` objects. So it uses the "local-first" semantics in case the global policies or local policies are deleted. | ||
- `getTopicPoliciesAsyncWithRetry` is used in `AdminResource#getTopicPoliciesAsyncWithRetry`, which is called for all topic policies admin APIs. Since these admin APIs all have an `isGlobal` field to indicate whether to get the global policies, it uses the "local only" or "global only" semantics. | ||
- The other methods are never called directly outside of tests. | ||
|
||
There is actually a 6th method, `getTopicPoliciesIfExists`, which just tries to get the local topic policies from the cache. This method is the clearest and simplest of them all. | ||
|
||
```java | ||
TopicPolicies getTopicPoliciesIfExists(TopicName topicName); | ||
``` | ||
|
||
# Motivation | ||
|
||
Make `TopicPoliciesService` pluggable so that users can customize topic policies service via another backend metadata store. | ||
|
||
# Goals | ||
|
||
## In Scope | ||
|
||
Redesign a clear and simple `TopicPoliciesService` interface for users to customize. | ||
|
||
# High Level Design | ||
|
||
Add a `topicPoliciesServiceClassName` config to specify the topic policies service class name. If the class name is not the default `SystemTopicBasedTopicPoliciesService`, `systemTopicEnabled` will not be required unless the implementation requires it. | ||
|
||
# Detailed Design | ||
|
||
## Design & Implementation Details | ||
|
||
1. Add a unified method to get topic policies. | ||
|
||
```java | ||
enum GetType { | ||
LOCAL_FIRST, // try getting the local topic policies, if not present, then get the global policies | ||
GLOBAL_ONLY, // only get the global policies | ||
LOCAL_ONLY, // only get the local policies | ||
} | ||
CompletableFuture<Optional<TopicPolicies>> getTopicPoliciesAsync(TopicName topicName, GetType type); | ||
``` | ||
|
||
`getTopicPoliciesAsyncWithRetry` will be replaced by `getTopicPoliciesAsync(topicName, LOCAL_ONLY)` or `getTopicPoliciesAsync(topicName, GLOBAL_ONLY)`. The other two original `getTopicPoliciesAsync` methods will be removed and replaced by `getTopicPoliciesAsync(topicName, LOCAL_FIRST)`. | ||
|
||
2. Move `addOwnedNamespaceBundleAsync` and `removeOwnedNamespaceBundleAsync` to private methods of `SystemTopicBasedTopicPoliciesService`. | ||
|
||
3. Add a `TestUtils` class in tests to include `getTopicPolicies` and `getTopicPoliciesBypassCacheAsync` methods. | ||
|
||
4. The generic parameter is removed from `TopicPolicyListener` because the value type should always be `TopicPolicies`. Also, mark this listener interface as `Stable`. | ||
|
||
5. Add a `PulsarService` parameter to the `start` method so that the implementation can provide a constructor with an empty parameter list and get the `PulsarService` instance from the `start` method. | ||
|
||
6. Add a `boolean` return value to `registerListener`. `PersistentTopic#initTopicPolicy` currently checks whether topic policies are enabled, while we can just use the return value to indicate whether the `TopicPoliciesService` instance is `TopicPoliciesService.DISABLED`. | ||
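The unified getter from item 1 can be illustrated with a minimal in-memory sketch. Only the `GetType` values and the shape of `getTopicPoliciesAsync` come from this proposal; `InMemoryPoliciesSketch`, its two maps, the `put` helpers, and the use of `String` in place of `TopicName` and `TopicPolicies` are assumptions made for illustration.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory backend; Strings stand in for TopicName and TopicPolicies.
class InMemoryPoliciesSketch {
    enum GetType { LOCAL_FIRST, GLOBAL_ONLY, LOCAL_ONLY }

    private final Map<String, String> local = new ConcurrentHashMap<>();
    private final Map<String, String> global = new ConcurrentHashMap<>();

    void putLocal(String topic, String policies) { local.put(topic, policies); }
    void putGlobal(String topic, String policies) { global.put(topic, policies); }

    /** A single entry point whose lookup semantics are selected by the GetType argument. */
    CompletableFuture<Optional<String>> getTopicPoliciesAsync(String topic, GetType type) {
        Optional<String> localValue = Optional.ofNullable(local.get(topic));
        Optional<String> globalValue = Optional.ofNullable(global.get(topic));
        Optional<String> result;
        switch (type) {
            case GLOBAL_ONLY:
                result = globalValue;
                break;
            case LOCAL_ONLY:
                result = localValue;
                break;
            default: // LOCAL_FIRST: fall back to global only when local is absent
                result = localValue.isPresent() ? localValue : globalValue;
                break;
        }
        return CompletableFuture.completedFuture(result);
    }
}
```

With this shape, each caller states the semantics it wants explicitly instead of relying on which overload it happens to call.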
|
||
Since the topic policies service is now decoupled from system topics, remove all `isSystemTopicAndTopicLevelPoliciesEnabled()` calls. | ||
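The role of the `boolean` returned by `registerListener` can be sketched as below. All names here (`ListenerSketch`, `PoliciesService`, `Listener`, `EnabledService`, and the simplified `initTopicPolicy`) are hypothetical stand-ins, not the actual Pulsar types or signatures; the sketch only shows how a caller could rely on the return value instead of a separate "is enabled" check.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical stand-ins; the real interfaces have different signatures.
class ListenerSketch {
    interface Listener {
        void onUpdate(String policies);
    }

    interface PoliciesService {
        /** Returns false when the service will never deliver updates. */
        boolean registerListener(String topic, Listener listener);

        /** No-op instance playing the role of a disabled service. */
        PoliciesService DISABLED = (topic, listener) -> false;
    }

    static class EnabledService implements PoliciesService {
        final List<Listener> listeners = new CopyOnWriteArrayList<>();

        @Override
        public boolean registerListener(String topic, Listener listener) {
            listeners.add(listener);
            return true;
        }
    }

    /** Simplified topic initialization: the return value tells whether policies are active. */
    static boolean initTopicPolicy(PoliciesService service, String topic) {
        return service.registerListener(topic, policies -> { });
    }
}
```

Under these assumptions, the caller needs no knowledge of system topics or config flags; a disabled service simply reports `false`.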
|
||
### Configuration | ||
|
||
New config `topicPoliciesServiceClassName` will be added. | ||
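How a class named by `topicPoliciesServiceClassName` might be instantiated and started can be sketched as follows. The proposal does not specify the loading mechanism, so the reflective `load` helper is an assumption, and `PluginLoadingSketch`, `Broker` (standing in for `PulsarService`), `PoliciesPlugin`, and `ExamplePlugin` are hypothetical names. The sketch only illustrates why a no-argument constructor combined with `start(PulsarService)` is convenient for plugins.

```java
// Hypothetical stand-ins; this is not how Pulsar is known to load the service.
class PluginLoadingSketch {
    /** Stand-in for PulsarService. */
    static class Broker {
        final String clusterName;

        Broker(String clusterName) {
            this.clusterName = clusterName;
        }
    }

    /** Stand-in for the part of TopicPoliciesService relevant here. */
    interface PoliciesPlugin {
        void start(Broker broker);
    }

    /** Example third-party implementation with a no-argument constructor. */
    public static class ExamplePlugin implements PoliciesPlugin {
        volatile String startedFor;

        public ExamplePlugin() {
        }

        @Override
        public void start(Broker broker) {
            // The broker instance arrives here, not through the constructor.
            startedFor = broker.clusterName;
        }
    }

    /** Instantiates the configured class by name, then hands it the broker. */
    static PoliciesPlugin load(String className, Broker broker) {
        try {
            PoliciesPlugin plugin = Class.forName(className)
                    .asSubclass(PoliciesPlugin.class)
                    .getDeclaredConstructor()
                    .newInstance();
            plugin.start(broker);
            return plugin;
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("Cannot load " + className, e);
        }
    }
}
```

Because the constructor takes no arguments, the loader needs nothing but the class name from the config; everything else is passed through `start`.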
|
||
# Backward & Forward Compatibility | ||
|
||
If the downstream application needs to call APIs from `TopicPoliciesService`, it should modify the code to use the new API. | ||
|
||
# Alternatives | ||
|
||
## Keep the `TopicPoliciesService` interface compatible. | ||
|
||
This interface was badly designed because it has only ever had one implementation. Keeping these methods would be a burden for developers who write a customized implementation. They would need to know where these confusing methods are called and handle them very carefully. | ||
|
||
# General Notes | ||
|
||
# Links | ||
|
||
<!-- | ||
Updated afterwards | ||
--> | ||
* Mailing List discussion thread: | ||
* Mailing List voting thread: |