From f2e72b311b2f673f4bc2c285b6db21034cae15bd Mon Sep 17 00:00:00 2001 From: jiangpengcheng Date: Thu, 2 Mar 2023 01:35:53 +0800 Subject: [PATCH] Set subscription type according to related parameters (#592) * Set subscription type according to related parameters * Generate charts * Remove hard cord * Pass retainOrdering&retainKeyOrdering to sink details --- api/compute/v1alpha1/sink_types.go | 1 + ...ompute.functionmesh.io-functionmeshes.yaml | 2 ++ .../crd-compute.functionmesh.io-sinks.yaml | 2 ++ ...ompute.functionmesh.io_functionmeshes.yaml | 2 ++ .../bases/compute.functionmesh.io_sinks.yaml | 2 ++ controllers/spec/utils.go | 20 +++++++++++++++---- 6 files changed, 25 insertions(+), 4 deletions(-) diff --git a/api/compute/v1alpha1/sink_types.go b/api/compute/v1alpha1/sink_types.go index 8ce4a79ee..28d4df0b7 100644 --- a/api/compute/v1alpha1/sink_types.go +++ b/api/compute/v1alpha1/sink_types.go @@ -64,6 +64,7 @@ type SinkSpec struct { MaxMessageRetry int32 `json:"maxMessageRetry,omitempty"` ProcessingGuarantee ProcessGuarantee `json:"processingGuarantee,omitempty"` RetainOrdering bool `json:"retainOrdering,omitempty"` + RetainKeyOrdering bool `json:"retainKeyOrdering,omitempty"` DeadLetterTopic string `json:"deadLetterTopic,omitempty"` RuntimeFlags string `json:"runtimeFlags,omitempty"` diff --git a/charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-functionmeshes.yaml b/charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-functionmeshes.yaml index 5c7ef4778..ded8f38c1 100644 --- a/charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-functionmeshes.yaml +++ b/charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-functionmeshes.yaml @@ -6433,6 +6433,8 @@ spec: x-kubernetes-int-or-string: true type: object type: object + retainKeyOrdering: + type: boolean retainOrdering: type: boolean runtimeFlags: diff --git a/charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-sinks.yaml b/charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-sinks.yaml index e25bbb279..8ac390ddb 100644 --- a/charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-sinks.yaml +++ b/charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-sinks.yaml @@ -3167,6 +3167,8 @@ spec: x-kubernetes-int-or-string: true type: object type: object + retainKeyOrdering: + type: boolean retainOrdering: type: boolean runtimeFlags: diff --git a/config/crd/bases/compute.functionmesh.io_functionmeshes.yaml b/config/crd/bases/compute.functionmesh.io_functionmeshes.yaml index 8ce5ff112..ae74be8e8 100644 --- a/config/crd/bases/compute.functionmesh.io_functionmeshes.yaml +++ b/config/crd/bases/compute.functionmesh.io_functionmeshes.yaml @@ -6434,6 +6434,8 @@ spec: x-kubernetes-int-or-string: true type: object type: object + retainKeyOrdering: + type: boolean retainOrdering: type: boolean runtimeFlags: diff --git a/config/crd/bases/compute.functionmesh.io_sinks.yaml b/config/crd/bases/compute.functionmesh.io_sinks.yaml index 303c8d708..430292265 100644 --- a/config/crd/bases/compute.functionmesh.io_sinks.yaml +++ b/config/crd/bases/compute.functionmesh.io_sinks.yaml @@ -3146,6 +3146,8 @@ spec: x-kubernetes-int-or-string: true type: object type: object + retainKeyOrdering: + type: boolean retainOrdering: type: boolean runtimeFlags: diff --git a/controllers/spec/utils.go b/controllers/spec/utils.go index e5bbb76c8..9dc85f903 100644 --- a/controllers/spec/utils.go +++ b/controllers/spec/utils.go @@ -35,6 +35,12 @@ import ( ) func convertFunctionDetails(function *v1alpha1.Function) *proto.FunctionDetails { + runtime := proto.FunctionDetails_JAVA + if function.Spec.Golang != nil { + runtime = proto.FunctionDetails_GO + } else if function.Spec.Python != nil { + runtime = proto.FunctionDetails_PYTHON + } fd := &proto.FunctionDetails{ Tenant: function.Spec.Tenant, Namespace: function.Spec.Namespace, @@ -43,7 +49,7 @@ func convertFunctionDetails(function *v1alpha1.Function) *proto.FunctionDetails LogTopic: function.Spec.LogTopic, ProcessingGuarantees: convertProcessingGuarantee(function.Spec.ProcessingGuarantee), UserConfig: getUserConfig(generateFunctionConfig(function)), - Runtime: proto.FunctionDetails_JAVA, + Runtime: runtime, AutoAck: getBoolFromPtrOrDefault(function.Spec.AutoAck, true), Parallelism: getInt32FromPtrOrDefault(function.Spec.Replicas, 1), Source: generateFunctionInputSpec(function), @@ -185,7 +191,7 @@ func generateFunctionInputSpec(function *v1alpha1.Function) *proto.SourceSpec { ClassName: "", Configs: "", TypeClassName: function.Spec.Input.TypeClassName, - SubscriptionType: proto.SubscriptionType_SHARED, + SubscriptionType: getSubscriptionType(function.Spec.RetainOrdering, function.Spec.RetainKeyOrdering, function.Spec.ProcessingGuarantee), InputSpecs: inputSpecs, TimeoutMs: uint64(function.Spec.Timeout), Builtin: "", @@ -320,6 +326,8 @@ func convertSinkDetails(sink *v1alpha1.Sink) *proto.FunctionDetails { RetryDetails: generateRetryDetails(sink.Spec.MaxMessageRetry, sink.Spec.DeadLetterTopic), RuntimeFlags: sink.Spec.RuntimeFlags, ComponentType: proto.FunctionDetails_SINK, + RetainOrdering: sink.Spec.RetainOrdering, + RetainKeyOrdering: sink.Spec.RetainKeyOrdering, } if sink.Spec.SecretsMap != nil { @@ -334,7 +342,7 @@ func generateSinkInputSpec(sink *v1alpha1.Sink) *proto.SourceSpec { return &proto.SourceSpec{ TypeClassName: sink.Spec.Input.TypeClassName, - SubscriptionType: getSubscriptionType(sink.Spec.RetainOrdering, sink.Spec.ProcessingGuarantee), + SubscriptionType: getSubscriptionType(sink.Spec.RetainOrdering, sink.Spec.RetainKeyOrdering, sink.Spec.ProcessingGuarantee), InputSpecs: inputSpecs, TimeoutMs: uint64(sink.Spec.Timeout), SubscriptionName: sink.Spec.SubscriptionName, @@ -344,11 +352,15 @@ func generateSinkInputSpec(sink *v1alpha1.Sink) *proto.SourceSpec { } } -func getSubscriptionType(retainOrdering bool, processingGuarantee v1alpha1.ProcessGuarantee) proto.SubscriptionType { +func getSubscriptionType(retainOrdering bool, retainKeyOrdering bool, processingGuarantee v1alpha1.ProcessGuarantee) proto.SubscriptionType { if retainOrdering || processingGuarantee == v1alpha1.EffectivelyOnce { return proto.SubscriptionType_FAILOVER } + if retainKeyOrdering { + return proto.SubscriptionType_KEY_SHARED + } + return proto.SubscriptionType_SHARED }