Skip to content

Commit

Permalink
Set subscription type according to related parameters (#592)
Browse files Browse the repository at this point in the history
* Set subscription type according to related parameters

* Generate charts

* Remove hard cord

* Pass retainOrdering&retainKeyOrdering to sink details
  • Loading branch information
jiangpengcheng committed Mar 3, 2023
1 parent b2b6b17 commit f2e72b3
Show file tree
Hide file tree
Showing 6 changed files with 25 additions and 4 deletions.
1 change: 1 addition & 0 deletions api/compute/v1alpha1/sink_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ type SinkSpec struct {
MaxMessageRetry int32 `json:"maxMessageRetry,omitempty"`
ProcessingGuarantee ProcessGuarantee `json:"processingGuarantee,omitempty"`
RetainOrdering bool `json:"retainOrdering,omitempty"`
RetainKeyOrdering bool `json:"retainKeyOrdering,omitempty"`
DeadLetterTopic string `json:"deadLetterTopic,omitempty"`

RuntimeFlags string `json:"runtimeFlags,omitempty"`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6433,6 +6433,8 @@ spec:
x-kubernetes-int-or-string: true
type: object
type: object
retainKeyOrdering:
type: boolean
retainOrdering:
type: boolean
runtimeFlags:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3167,6 +3167,8 @@ spec:
x-kubernetes-int-or-string: true
type: object
type: object
retainKeyOrdering:
type: boolean
retainOrdering:
type: boolean
runtimeFlags:
Expand Down
2 changes: 2 additions & 0 deletions config/crd/bases/compute.functionmesh.io_functionmeshes.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6434,6 +6434,8 @@ spec:
x-kubernetes-int-or-string: true
type: object
type: object
retainKeyOrdering:
type: boolean
retainOrdering:
type: boolean
runtimeFlags:
Expand Down
2 changes: 2 additions & 0 deletions config/crd/bases/compute.functionmesh.io_sinks.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3146,6 +3146,8 @@ spec:
x-kubernetes-int-or-string: true
type: object
type: object
retainKeyOrdering:
type: boolean
retainOrdering:
type: boolean
runtimeFlags:
Expand Down
20 changes: 16 additions & 4 deletions controllers/spec/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ import (
)

func convertFunctionDetails(function *v1alpha1.Function) *proto.FunctionDetails {
runtime := proto.FunctionDetails_JAVA
if function.Spec.Golang != nil {
runtime = proto.FunctionDetails_GO
} else if function.Spec.Python != nil {
runtime = proto.FunctionDetails_PYTHON
}
fd := &proto.FunctionDetails{
Tenant: function.Spec.Tenant,
Namespace: function.Spec.Namespace,
Expand All @@ -43,7 +49,7 @@ func convertFunctionDetails(function *v1alpha1.Function) *proto.FunctionDetails
LogTopic: function.Spec.LogTopic,
ProcessingGuarantees: convertProcessingGuarantee(function.Spec.ProcessingGuarantee),
UserConfig: getUserConfig(generateFunctionConfig(function)),
Runtime: proto.FunctionDetails_JAVA,
Runtime: runtime,
AutoAck: getBoolFromPtrOrDefault(function.Spec.AutoAck, true),
Parallelism: getInt32FromPtrOrDefault(function.Spec.Replicas, 1),
Source: generateFunctionInputSpec(function),
Expand Down Expand Up @@ -185,7 +191,7 @@ func generateFunctionInputSpec(function *v1alpha1.Function) *proto.SourceSpec {
ClassName: "",
Configs: "",
TypeClassName: function.Spec.Input.TypeClassName,
SubscriptionType: proto.SubscriptionType_SHARED,
SubscriptionType: getSubscriptionType(function.Spec.RetainOrdering, function.Spec.RetainKeyOrdering, function.Spec.ProcessingGuarantee),
InputSpecs: inputSpecs,
TimeoutMs: uint64(function.Spec.Timeout),
Builtin: "",
Expand Down Expand Up @@ -320,6 +326,8 @@ func convertSinkDetails(sink *v1alpha1.Sink) *proto.FunctionDetails {
RetryDetails: generateRetryDetails(sink.Spec.MaxMessageRetry, sink.Spec.DeadLetterTopic),
RuntimeFlags: sink.Spec.RuntimeFlags,
ComponentType: proto.FunctionDetails_SINK,
RetainOrdering: sink.Spec.RetainOrdering,
RetainKeyOrdering: sink.Spec.RetainKeyOrdering,
}

if sink.Spec.SecretsMap != nil {
Expand All @@ -334,7 +342,7 @@ func generateSinkInputSpec(sink *v1alpha1.Sink) *proto.SourceSpec {

return &proto.SourceSpec{
TypeClassName: sink.Spec.Input.TypeClassName,
SubscriptionType: getSubscriptionType(sink.Spec.RetainOrdering, sink.Spec.ProcessingGuarantee),
SubscriptionType: getSubscriptionType(sink.Spec.RetainOrdering, sink.Spec.RetainKeyOrdering, sink.Spec.ProcessingGuarantee),
InputSpecs: inputSpecs,
TimeoutMs: uint64(sink.Spec.Timeout),
SubscriptionName: sink.Spec.SubscriptionName,
Expand All @@ -344,11 +352,15 @@ func generateSinkInputSpec(sink *v1alpha1.Sink) *proto.SourceSpec {
}
}

func getSubscriptionType(retainOrdering bool, processingGuarantee v1alpha1.ProcessGuarantee) proto.SubscriptionType {
func getSubscriptionType(retainOrdering bool, retainKeyOrdering bool, processingGuarantee v1alpha1.ProcessGuarantee) proto.SubscriptionType {
if retainOrdering || processingGuarantee == v1alpha1.EffectivelyOnce {
return proto.SubscriptionType_FAILOVER
}

if retainKeyOrdering {
return proto.SubscriptionType_KEY_SHARED
}

return proto.SubscriptionType_SHARED
}

Expand Down

0 comments on commit f2e72b3

Please sign in to comment.