Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feature] Set/get topic schema compatibility strategy #784

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ The following is an incomplete list of features that are not yet implemented:

## Different With Java Pulsar Admin

We move the subscription commands from the topics to the subscriptions in pulsarctl.
Subscription commands have moved from the `topics` category to the `subscriptions` category in `pulsarctl`.

| pulsar-admin | pulsarctl |
| ------------ | --------- |
Expand All @@ -173,6 +173,13 @@ We move the subscription commands from the topics to the subscriptions in pulsar
| bin/pulsar-admin topics reset-cursor | pulsarctl subscription seek |
| bin/pulsar-admin topics subscriptions | pulsarctl subscription list |

Topic schema compatibility commands have moved from the `topicPolicies` category to the `topic` category in `pulsarctl`.

| pulsar-admin | pulsarctl |
| ------------ | --------- |
| bin/pulsar-admin topicPolicies set-schema-compatibility-strategy | pulsarctl topic set-schema-compatibility-strategy |
| bin/pulsar-admin topicPolicies get-schema-compatibility-strategy | pulsarctl topic get-schema-compatibility-strategy |

## Contribute

Contributions are welcomed and greatly appreciated.
Expand Down
30 changes: 30 additions & 0 deletions pkg/pulsar/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package pulsar
import (
"fmt"
"strconv"
"strings"

"github.com/streamnative/pulsarctl/pkg/pulsar/common"
"github.com/streamnative/pulsarctl/pkg/pulsar/utils"
Expand Down Expand Up @@ -224,6 +225,15 @@ type Topics interface {

// SetInactiveTopicPolicies sets the inactive topic policies on a topic
SetInactiveTopicPolicies(topic utils.TopicName, data utils.InactiveTopicPolicies) error

// SetSchemaCompatibilityStrategy sets the strategy used to check the a new schema provided
// by a producer is compatible with the current schema before it is installed
SetSchemaCompatibilityStrategy(topic utils.TopicName,
strategy utils.SchemaCompatibilityStrategy) error
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, could you explain a little more? Are the values in each expected to differ? Or is a separate strategy required for another reason?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are different values here.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, I'll take a look


// GetSchemaCompatibilityStrategy returns the strategy used to check the a new schema provided
// by a producer is compatible with the current schema before it is installed
GetSchemaCompatibilityStrategy(topic utils.TopicName) (utils.SchemaCompatibilityStrategy, error)
}

type topics struct {
Expand Down Expand Up @@ -696,3 +706,23 @@ func (t *topics) SetInactiveTopicPolicies(topic utils.TopicName, data utils.Inac
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "inactiveTopicPolicies")
return t.pulsar.Client.Post(endpoint, data)
}

func (t *topics) SetSchemaCompatibilityStrategy(topic utils.TopicName,
strategy utils.SchemaCompatibilityStrategy) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.String(), "schemaCompatibilityStrategy")
return t.pulsar.Client.Put(endpoint, strategy.String())
}

func (t *topics) GetSchemaCompatibilityStrategy(topic utils.TopicName) (
utils.SchemaCompatibilityStrategy, error) {
endpoint := t.pulsar.endpoint(t.basePath, topic.String(), "schemaCompatibilityStrategy")
b, err := t.pulsar.Client.GetWithQueryParams(endpoint, nil, nil, false)
if err != nil {
return "", err
}
s, err := utils.ParseSchemaAutoUpdateCompatibilityStrategy(strings.ReplaceAll(string(b), "\"", ""))
if err != nil {
return "", err
}
return s, nil
}