diff --git a/cos-fleet-manager-api/src/openapi/specs/connector_mgmt-private.yaml b/cos-fleet-manager-api/src/openapi/specs/connector_mgmt-private.yaml index 1f7461da..eb4c0f0c 100644 --- a/cos-fleet-manager-api/src/openapi/specs/connector_mgmt-private.yaml +++ b/cos-fleet-manager-api/src/openapi/specs/connector_mgmt-private.yaml @@ -405,6 +405,174 @@ paths: schema: $ref: 'connector_mgmt.yaml#/components/schemas/Error' + '/api/connector_mgmt/v2alpha1/agent/kafka_connector_clusters/{connector_cluster_id}/processors/deployments': + parameters: + - name: connector_cluster_id + description: The id of the connector cluster + schema: + type: string + in: path + required: true + get: + tags: + - Connector Clusters Agent + security: + - Bearer: [ ] + operationId: getClusterAssignedProcessorDeployments + summary: Returns a list of processor deployments assigned to the cluster. + description: Returns a list of processor deployments assigned to the cluster. + parameters: + - $ref: 'connector_mgmt.yaml#/components/parameters/page' + - $ref: 'connector_mgmt.yaml#/components/parameters/size' + - in: query + name: gt_version + description: filters the processors to those with a version greater than the given value + schema: + type: integer + format: int64 + - in: query + name: watch + description: watch for changes to the resources and return them as a stream of watch events. Specify gt_version to specify the starting point. + schema: + type: string + + responses: + '200': + content: + application/json: + schema: + $ref: '#/components/schemas/ProcessorDeploymentList' + description: A list of processors + '401': + content: + application/json: + schema: + $ref: 'connector_mgmt.yaml#/components/schemas/Error' + examples: + 401Example: + $ref: 'connector_mgmt.yaml#/components/examples/401Example' + description: Auth token is invalid + '500': + content: + application/json: + schema: + $ref: 'connector_mgmt.yaml#/components/schemas/Error' + examples: + 500Example: + $ref: 'connector_mgmt.yaml#/components/examples/500Example' + description: Unexpected error occurred + + '/api/connector_mgmt/v2alpha1/agent/kafka_connector_clusters/{connector_cluster_id}/processors/deployments/{processor_deployment_id}': + parameters: + - name: connector_cluster_id + description: The id of the connector cluster + schema: + type: string + in: path + required: true + - name: processor_deployment_id + description: The id of the processor deployment + schema: + type: string + in: path + required: true + get: + tags: + - Connector Clusters Agent + security: + - Bearer: [ ] + operationId: getClusterAssignedProcessorDeploymentById + summary: Returns a processor deployment assigned to the cluster. + description: Returns a processor deployment assigned to the cluster. + responses: + '200': + content: + application/json: + schema: + $ref: '#/components/schemas/ProcessorDeployment' + description: A processor deployment + '401': + content: + application/json: + schema: + $ref: 'connector_mgmt.yaml#/components/schemas/Error' + examples: + 401Example: + $ref: 'connector_mgmt.yaml#/components/examples/401Example' + description: Auth token is invalid + '410': + content: + application/json: + schema: + $ref: 'connector_mgmt.yaml#/components/schemas/Error' + description: deployment has been deleted + '500': + content: + application/json: + schema: + $ref: 'connector_mgmt.yaml#/components/schemas/Error' + examples: + 500Example: + $ref: 'connector_mgmt.yaml#/components/examples/500Example' + description: Unexpected error occurred + + '/api/connector_mgmt/v2alpha1/agent/kafka_connector_clusters/{connector_cluster_id}/processors/deployments/{processor_deployment_id}/status': + parameters: + - name: connector_cluster_id + description: The id of the connector cluster + schema: + type: string + in: path + required: true + - name: processor_deployment_id + description: The id of the processor deployment + schema: + type: string + in: path + required: true + put: + tags: + - Connector Clusters Agent + operationId: updateProcessorDeploymentStatus + summary: update the processor deployment status + description: update the processor deployment status + security: + - Bearer: [ ] + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/ProcessorDeploymentStatus' + required: true + responses: + '200': + description: Processor deployment status is updated + '400': + content: + application/json: + schema: + $ref: 'connector_mgmt.yaml#/components/schemas/Error' + examples: + 400InvalidIdExample: + $ref: '#/components/examples/400InvalidIdExample' + description: id value is not valid + '404': + content: + application/json: + schema: + $ref: 'connector_mgmt.yaml#/components/schemas/Error' + examples: + 404Example: + $ref: 'connector_mgmt.yaml#/components/examples/404Example' + # This is deliberate to hide the endpoints for unauthorised users + description: Auth token is not valid. + '410': + description: deployment has been deleted + content: + application/json: + schema: + $ref: 'connector_mgmt.yaml#/components/schemas/Error' + components: schemas: @@ -664,6 +832,102 @@ components: items: $ref: '#/components/schemas/MetaV1Condition' + # + # Processors + # + ProcessorDeployment: + description: Holds the deployment configuration of a processor + allOf: + - $ref: 'connector_mgmt.yaml#/components/schemas/ObjectReference' + - type: object + properties: + metadata: + type: object + required: + - created_at + - updated_at + - resource_version + - resolved_secrets + properties: + created_at: + format: date-time + type: string + updated_at: + format: date-time + type: string + resource_version: + type: integer + format: int64 + resolved_secrets: + type: boolean + annotations: + type: object + additionalProperties: + type: "string" + spec: + $ref: '#/components/schemas/ProcessorDeploymentSpec' + status: + $ref: '#/components/schemas/ProcessorDeploymentStatus' + + ProcessorDeploymentSpec: + description: Holds the deployment specification of a processor + type: object + properties: + processor_id: + type: string + namespace_id: + type: string + processor_type_id: + type: string + desired_state: + $ref: 'connector_mgmt.yaml#/components/schemas/ProcessorDesiredState' + shard_metadata: + type: object + processor_resource_version: + type: integer + format: int64 + kafka: + $ref: 'connector_mgmt.yaml#/components/schemas/KafkaConnectionSettings' + service_account: + $ref: 'connector_mgmt.yaml#/components/schemas/ServiceAccount' + definition: + type: object + operator_id: + description: an optional operator id that the processor should be run under. + type: string + + ProcessorDeploymentStatus: + description: The status of processor deployment + type: object + properties: + phase: + $ref: 'connector_mgmt.yaml#/components/schemas/ProcessorState' + resource_version: + type: integer + format: int64 + operators: + type: object + properties: + assigned: + $ref: '#/components/schemas/ConnectorOperator' + available: + $ref: '#/components/schemas/ConnectorOperator' + conditions: + type: array + items: + $ref: '#/components/schemas/MetaV1Condition' + + ProcessorDeploymentList: + allOf: + - $ref: 'connector_mgmt.yaml#/components/schemas/List' + - type: object + properties: + items: + type: array + items: + allOf: + - $ref: '#/components/schemas/ProcessorDeployment' + securitySchemes: Bearer: scheme: bearer diff --git a/cos-fleet-manager-api/src/openapi/specs/connector_mgmt.yaml b/cos-fleet-manager-api/src/openapi/specs/connector_mgmt.yaml index b6ad3a23..6925a76c 100644 --- a/cos-fleet-manager-api/src/openapi/specs/connector_mgmt.yaml +++ b/cos-fleet-manager-api/src/openapi/specs/connector_mgmt.yaml @@ -452,6 +452,15 @@ paths: 404Example: $ref: "#/components/examples/404Example" description: No matching resource exists + "409": + content: + application/json: + schema: + $ref: "#/components/schemas/Error" + examples: + 404Example: + $ref: "#/components/examples/409Example" + description: An attempt was made to modify an immutable field "410": content: application/json: @@ -1009,6 +1018,391 @@ paths: $ref: "#/components/examples/500Example" description: An unexpected error occurred creating the connector namespace + /api/connector_mgmt/v2alpha1/processors: + get: + description: Returns a list of processors + operationId: listProcessors + parameters: + - description: Page index + examples: + page: + value: "1" + explode: true + in: query + name: page + required: false + schema: + type: string + style: form + - description: Number of items in each page + examples: + size: + value: "100" + explode: true + in: query + name: size + required: false + schema: + type: string + style: form + - description: |- + Specifies the order by criteria. The syntax of this parameter is + similar to the syntax of the `order by` clause of an SQL statement. + Each query can be ordered by any of the underlying resource fields supported in the search parameter. + For example, to return all Processors ordered by their name, use the following syntax: + + ```sql + name asc + ``` + + If the parameter isn't provided, or if the value is empty, then + the results are ordered by name. + examples: + orderBy: + value: name asc + explode: true + in: query + name: orderBy + required: false + schema: + type: string + style: form + - description: | + Search criteria. + + The syntax of this parameter is similar to the syntax of the `where` clause of a + SQL statement. + + Allowed fields in the search depend on the resource type: + + * Cluster: id, created_at, updated_at, owner, organisation_id, name, state, client_id + * Namespace: id, created_at, updated_at, name, cluster_id, owner, expiration, tenant_user_id, tenant_organisation_id, state + * [manstis] Need to add more + + Allowed operators are `<>`, `=`, `LIKE`, or `ILIKE`. + Allowed conjunctive operators are `AND` and `OR`. However, you can use a maximum of 10 conjunctions in a search query. + + Examples: + + To return a Processor with the name `processor1`, use the following syntax: + + ``` + name = processor1 + ``` + + To return a Processor with a name that starts with `proc`, use the following syntax: + + ``` + name like proc%25 + ``` + + To return a Processor with a name containing `roc` matching any character case combination, use the following syntax: + + ``` + name ilike %25roc%25 + ``` + + If the parameter isn't provided, or if the value is empty, then all the resources + that the user has permission to see are returned. + + Note. If the query is invalid, an error is returned. + examples: + search: + value: name = processor1 + explode: true + in: query + name: search + required: false + schema: + type: string + style: form + responses: + "200": + content: + application/json: + schema: + $ref: '#/components/schemas/ProcessorList' + description: A list of processors + "401": + content: + application/json: + examples: + "401Example": + $ref: '#/components/examples/401Example' + schema: + $ref: '#/components/schemas/Error' + description: Auth token is invalid + "500": + content: + application/json: + examples: + "500Example": + $ref: '#/components/examples/500Example' + schema: + $ref: '#/components/schemas/Error' + description: Unexpected error occurred + security: + - Bearer: [ ] + summary: Returns a list of processors + tags: + - Processors + post: + description: Create a new processor + operationId: createProcessor + parameters: + - description: Perform the action in an asynchronous manner + explode: true + in: query + name: async + required: true + schema: + type: boolean + style: form + requestBody: + content: + application/json: + examples: + ProcessorCreateExample: + $ref: '#/components/examples/ProcessorCreateExample' + schema: + $ref: '#/components/schemas/ProcessorRequest' + description: Processor data + required: true + responses: + "202": + content: + application/json: + schema: + $ref: '#/components/schemas/Processor' + description: Accepted + "400": + content: + application/json: + examples: + "400CreationExample": + $ref: '#/components/examples/400CreationExample' + schema: + $ref: '#/components/schemas/Error' + description: Validation errors occurred + "401": + content: + application/json: + examples: + "401Example": + $ref: '#/components/examples/401Example' + schema: + $ref: '#/components/schemas/Error' + description: Auth token is invalid + "404": + content: + application/json: + examples: + "404Example": + $ref: '#/components/examples/404Example' + schema: + $ref: '#/components/schemas/Error' + description: The requested resource doesn't exist + "500": + content: + application/json: + examples: + "500Example": + $ref: '#/components/examples/500Example' + schema: + $ref: '#/components/schemas/Error' + description: An unexpected error occurred creating the processor + security: + - Bearer: [ ] + summary: Create a new processor + tags: + - Processors + /api/connector_mgmt/v2alpha1/processors/{id}: + delete: + description: Delete a processor + operationId: deleteProcessor + parameters: + - description: The ID of record + explode: false + in: path + name: id + required: true + schema: + type: string + style: simple + responses: + "204": + content: + application/json: + schema: + $ref: '#/components/schemas/Error' + description: Deleted + "401": + content: + application/json: + examples: + "401Example": + $ref: '#/components/examples/401Example' + schema: + $ref: '#/components/schemas/Error' + description: Auth token is invalid + "404": + content: + application/json: + examples: + "404DeleteExample": + $ref: '#/components/examples/404DeleteExample' + schema: + $ref: '#/components/schemas/Error' + description: No processor with the specified ID exists + "500": + content: + application/json: + examples: + "500DeleteExample": + $ref: '#/components/examples/500DeleteExample' + schema: + $ref: '#/components/schemas/Error' + description: Unexpected error occurred + security: + - Bearer: [ ] + summary: Delete a processor + tags: + - Processors + get: + description: Get a processor + operationId: getProcessor + parameters: + - description: The ID of record + explode: false + in: path + name: id + required: true + schema: + type: string + style: simple + responses: + "200": + content: + application/json: + schema: + $ref: '#/components/schemas/Processor' + description: The processor matching the request + "401": + content: + application/json: + examples: + "401Example": + $ref: '#/components/examples/401Example' + schema: + $ref: '#/components/schemas/Error' + description: Auth token is invalid + "404": + content: + application/json: + examples: + "404Example": + $ref: '#/components/examples/404Example' + schema: + $ref: '#/components/schemas/Error' + description: No matching processor exists + "410": + content: + application/json: + examples: + "404Example": + $ref: '#/components/examples/410Example' + schema: + $ref: '#/components/schemas/Error' + description: The requested resource doesn't exist anymore + "500": + content: + application/json: + examples: + "500Example": + $ref: '#/components/examples/500Example' + schema: + $ref: '#/components/schemas/Error' + description: Unexpected error occurred + security: + - Bearer: [ ] + summary: Get a processor + tags: + - Processors + patch: + description: Patch a processor + operationId: patchProcessor + parameters: + - description: The ID of record + explode: false + in: path + name: id + required: true + schema: + type: string + style: simple + requestBody: + content: + application/merge-patch+json: + schema: + type: object + application/json-patch+json: + schema: + description: A JSON Patch, RFC 6902 - https://tools.ietf.org/html/rfc6902 + type: object + application/json: + schema: + $ref: '#/components/schemas/ProcessorRequest' + description: Data to patch the processor with + required: true + responses: + "202": + content: + application/json: + schema: + $ref: '#/components/schemas/Processor' + description: The processor matching the request + "401": + content: + application/json: + examples: + "401Example": + $ref: '#/components/examples/401Example' + schema: + $ref: '#/components/schemas/Error' + description: Auth token is invalid + "404": + content: + application/json: + examples: + "404Example": + $ref: '#/components/examples/404Example' + schema: + $ref: '#/components/schemas/Error' + description: No matching resource exists + "410": + content: + application/json: + examples: + "404Example": + $ref: '#/components/examples/410Example' + schema: + $ref: '#/components/schemas/Error' + description: The requested resource doesn't exist anymore + "500": + content: + application/json: + examples: + "500Example": + $ref: '#/components/examples/500Example' + schema: + $ref: '#/components/schemas/Error' + description: Unexpected error occurred + security: + - Bearer: [ ] + summary: Patch a processor + tags: + - Processors + components: schemas: @@ -1563,6 +1957,120 @@ components: - tenant - status + # + # Processors + # + Processor: + allOf: + - $ref: "#/components/schemas/ObjectReference" + - $ref: "#/components/schemas/ProcessorMeta" + - $ref: "#/components/schemas/ProcessorConfiguration" + - $ref: "#/components/schemas/ProcessorStatus" + + ProcessorList: + allOf: + - $ref: '#/components/schemas/List' + - $ref: '#/components/schemas/ProcessorList_allOf' + + ProcessorList_allOf: + properties: + items: + items: + $ref: '#/components/schemas/Processor' + type: array + + ProcessorRequest: + allOf: + - $ref: '#/components/schemas/ProcessorRequestMeta' + - $ref: '#/components/schemas/ProcessorConfiguration' + + ProcessorMeta: + allOf: + - $ref: "#/components/schemas/ObjectMeta" + - $ref: "#/components/schemas/ProcessorRequestMeta" + - type: object + properties: + resource_version: + type: integer + format: int64 + + ProcessorRequestMeta: + type: object + required: + - name + - namespace_id + - processor_type_id + - desired_state + properties: + name: + type: string + namespace_id: + type: string + processor_type_id: + type: string + channel: + $ref: "#/components/schemas/Channel" + desired_state: + $ref: "#/components/schemas/ProcessorDesiredState" + annotations: + $ref: "#/components/schemas/ProcessorResourceAnnotations" + + ProcessorConfiguration: + required: + - kafka + - service_account + - processor + properties: + kafka: + $ref: "#/components/schemas/KafkaConnectionSettings" + service_account: + $ref: '#/components/schemas/ServiceAccount' + schema_registry: + $ref: "#/components/schemas/SchemaRegistryConnectionSettings" + processor: + # processor specific properties + # data shape + # processors + # error handling + type: object + + ProcessorResourceAnnotations: + description: Name-value string annotations for resource + type: object + additionalProperties: + type: "string" + + ProcessorDesiredState: + type: string + enum: + - ready + - stopped + - deleted + + ProcessorState: + type: string + enum: + - preparing + - prepared + - provisioning + - ready + - updating + - stopped + - failed + - deprovisioning + - deleting + - deleted + + ProcessorStatus: + properties: + status: + type: object + properties: + state: + $ref: "#/components/schemas/ProcessorState" + error: + type: string + MemoryQuota: description: Memory quota for limits or requests type: string @@ -1837,6 +2345,22 @@ components: name: "MyEvalNamespace" annotations: "cos.bf2.org/profile": "evaluation-profile" + ProcessorCreateExample: + value: + name: "MyProcessor" + namespace_id: "my-namespace-id" + desired_state: "ready" + kafka: + id: "kafka-id" + url: "kafka-url" + service_account: + client_id: "SA-121212" + client_secret: "secret" + processor: + flows: "MyDefinition" + error_handling: + dead_letter_queue: + topic: "dlq" 400CreationExample: value: id: "103" diff --git a/cos-fleetshard-api/model/src/main/java/org/bf2/cos/fleetshard/api/ManagedProcessor.java b/cos-fleetshard-api/model/src/main/java/org/bf2/cos/fleetshard/api/ManagedProcessor.java index a04737f2..8b70d1c5 100644 --- a/cos-fleetshard-api/model/src/main/java/org/bf2/cos/fleetshard/api/ManagedProcessor.java +++ b/cos-fleetshard-api/model/src/main/java/org/bf2/cos/fleetshard/api/ManagedProcessor.java @@ -7,7 +7,15 @@ import io.fabric8.kubernetes.model.annotation.Group; import io.fabric8.kubernetes.model.annotation.ShortNames; import io.fabric8.kubernetes.model.annotation.Version; +import io.sundr.builder.annotations.Buildable; +import io.sundr.builder.annotations.BuildableReference; +import lombok.EqualsAndHashCode; +import lombok.ToString; +@ToString(callSuper = true) +@EqualsAndHashCode(callSuper = true) +@Buildable(builderPackage = "io.fabric8.kubernetes.api.builder", refs = @BuildableReference(CustomResource.class), + editableEnabled = false) @Version(ManagedConnector.VERSION) @Group(ManagedConnector.GROUP) @ShortNames("mpsr") diff --git a/cos-fleetshard-support/src/main/java/org/bf2/cos/fleetshard/support/resources/Processors.java b/cos-fleetshard-support/src/main/java/org/bf2/cos/fleetshard/support/resources/Processors.java new file mode 100644 index 00000000..9998d425 --- /dev/null +++ b/cos-fleetshard-support/src/main/java/org/bf2/cos/fleetshard/support/resources/Processors.java @@ -0,0 +1,15 @@ +package org.bf2.cos.fleetshard.support.resources; + +public final class Processors { + + private Processors() { + } + + public static String generateProcessorId(String id) { + if (id.startsWith(Resources.PROCESSOR_PREFIX)) { + return id; + } + + return Resources.PROCESSOR_PREFIX + id; + } +} diff --git a/cos-fleetshard-support/src/main/java/org/bf2/cos/fleetshard/support/resources/Resources.java b/cos-fleetshard-support/src/main/java/org/bf2/cos/fleetshard/support/resources/Resources.java index b47b0db8..35c77d40 100644 --- a/cos-fleetshard-support/src/main/java/org/bf2/cos/fleetshard/support/resources/Resources.java +++ b/cos-fleetshard-support/src/main/java/org/bf2/cos/fleetshard/support/resources/Resources.java @@ -29,6 +29,7 @@ public final class Resources { public static final String LABEL_NAMESPACE_ID = "cos.bf2.org/namespace.id"; public static final String LABEL_DEPLOYMENT_ID = "cos.bf2.org/deployment.id"; public static final String LABEL_CONNECTOR_ID = "cos.bf2.org/connector.id"; + public static final String LABEL_PROCESSOR_ID = "cos.bf2.org/processor.id"; public static final String LABEL_CONNECTOR_TYPE_ID = "cos.bf2.org/connector.type.id"; public static final String LABEL_CONNECTOR_OPERATOR = "cos.bf2.org/connector.operator"; public static final String LABEL_DEPLOYMENT_RESOURCE_VERSION = "cos.bf2.org/deployment.resource.version"; @@ -54,6 +55,8 @@ public final class Resources { public static final String CONNECTOR_SECRET_DEPLOYMENT_SUFFIX = "-deploy"; public static final String CONNECTOR_CONFIGMAP_SUFFIX = "-configmap"; + public static final String PROCESSOR_PREFIX = "mpsr-"; + public static final String LABEL_KCP_TARGET_CLUSTER_ID = "kcp.dev/cluster"; public static final String LABEL_KUBERNETES_NAME = "app.kubernetes.io/name"; diff --git a/cos-fleetshard-support/src/main/java/org/bf2/cos/fleetshard/support/resources/Secrets.java b/cos-fleetshard-support/src/main/java/org/bf2/cos/fleetshard/support/resources/Secrets.java index 8ad704ad..5ab8dff4 100644 --- a/cos-fleetshard-support/src/main/java/org/bf2/cos/fleetshard/support/resources/Secrets.java +++ b/cos-fleetshard-support/src/main/java/org/bf2/cos/fleetshard/support/resources/Secrets.java @@ -17,6 +17,7 @@ public final class Secrets { public static final String SECRET_ENTRY_CONNECTOR = "connector"; + public static final String SECRET_ENTRY_PROCESSOR = "processor"; public static final String SECRET_ENTRY_SERVICE_ACCOUNT = "serviceAccount"; public static final String SECRET_ENTRY_META = "meta"; @@ -129,6 +130,19 @@ public static String generateConnectorSecretId(String id) { return answer; } + public static String generateProcessorSecretId(String id) { + String answer = id; + + if (!answer.startsWith(Resources.PROCESSOR_PREFIX)) { + answer = Resources.PROCESSOR_PREFIX + answer; + } + if (!answer.endsWith(Resources.CONNECTOR_SECRET_DEPLOYMENT_SUFFIX)) { + answer += Resources.CONNECTOR_SECRET_DEPLOYMENT_SUFFIX; + } + + return answer; + } + public static String generateSecretId(String id) { String answer = id; diff --git a/cos-fleetshard-sync-it/src/main/java/org/bf2/cos/fleetshard/sync/it/SyncResource.java b/cos-fleetshard-sync-it/src/main/java/org/bf2/cos/fleetshard/sync/it/SyncResource.java index 3b37d3e1..daf65de8 100644 --- a/cos-fleetshard-sync-it/src/main/java/org/bf2/cos/fleetshard/sync/it/SyncResource.java +++ b/cos-fleetshard-sync-it/src/main/java/org/bf2/cos/fleetshard/sync/it/SyncResource.java @@ -10,6 +10,7 @@ import org.bf2.cos.fleetshard.sync.housekeeping.reapers.AddonReaper; import org.bf2.cos.fleetshard.sync.resources.ConnectorDeploymentProvisioner; import org.bf2.cos.fleetshard.sync.resources.ConnectorNamespaceProvisioner; +import org.bf2.cos.fleetshard.sync.resources.ProcessorDeploymentProvisioner; import org.bf2.cos.fleetshard.sync.resources.ResourcePoll; @ApplicationScoped @@ -20,6 +21,8 @@ public class SyncResource { @Inject ConnectorDeploymentProvisioner deploymentProvisioner; @Inject + ProcessorDeploymentProvisioner processorDeploymentProvisioner; + @Inject AddonReaper addonReaper; @Inject ResourcePoll resourceSync; @@ -38,12 +41,20 @@ public void pollConnectors(Long gv) { deploymentProvisioner.poll(gv); } + @Path("/provisioner/processors") + @POST + @Consumes(MediaType.TEXT_PLAIN) + public void pollProcessors(Long gv) { + processorDeploymentProvisioner.poll(gv); + } + @Path("/provisioner/all") @POST @Consumes(MediaType.TEXT_PLAIN) public void poll() { namespaceProvisioner.poll(0); deploymentProvisioner.poll(0); + processorDeploymentProvisioner.poll(0); } @Path("/provisioner/sync") diff --git a/cos-fleetshard-sync-it/src/test/java/org/bf2/cos/fleetshard/sync/it/ProcessorProvisionerTest.java b/cos-fleetshard-sync-it/src/test/java/org/bf2/cos/fleetshard/sync/it/ProcessorProvisionerTest.java new file mode 100644 index 00000000..c9e69176 --- /dev/null +++ b/cos-fleetshard-sync-it/src/test/java/org/bf2/cos/fleetshard/sync/it/ProcessorProvisionerTest.java @@ -0,0 +1,262 @@ +package org.bf2.cos.fleetshard.sync.it; + +import java.util.List; +import java.util.Map; +import java.util.Objects; + +import javax.ws.rs.core.MediaType; + +import com.fasterxml.jackson.databind.JsonNode; +import com.github.tomakehurst.wiremock.client.WireMock; +import com.github.tomakehurst.wiremock.http.ContentTypeHeader; +import com.github.tomakehurst.wiremock.http.RequestMethod; +import io.fabric8.kubernetes.api.model.Namespace; +import io.fabric8.kubernetes.api.model.Secret; +import io.quarkus.test.junit.QuarkusTest; +import io.quarkus.test.junit.TestProfile; +import org.bf2.cos.fleet.manager.model.KafkaConnectionSettings; +import org.bf2.cos.fleet.manager.model.ProcessorDesiredState; +import org.bf2.cos.fleet.manager.model.ServiceAccount; +import org.bf2.cos.fleetshard.api.ManagedProcessor; +import org.bf2.cos.fleetshard.support.resources.Namespaces; +import org.bf2.cos.fleetshard.support.resources.Resources; +import org.bf2.cos.fleetshard.support.resources.Secrets; +import org.bf2.cos.fleetshard.sync.it.support.FleetManagerMockServer; +import org.bf2.cos.fleetshard.sync.it.support.FleetManagerTestInstance; +import org.bf2.cos.fleetshard.sync.it.support.SyncTestProfile; +import org.bf2.cos.fleetshard.sync.it.support.SyncTestSupport; +import org.eclipse.microprofile.config.ConfigProvider; +import org.eclipse.microprofile.config.inject.ConfigProperty; +import org.junit.jupiter.api.Test; + +import static com.github.tomakehurst.wiremock.client.WireMock.equalTo; +import static io.restassured.RestAssured.given; +import static javax.ws.rs.core.MediaType.APPLICATION_JSON; +import static net.javacrumbs.jsonunit.assertj.JsonAssertions.assertThatJson; +import static org.assertj.core.api.Assertions.assertThat; +import static org.bf2.cos.fleetshard.support.resources.Resources.uid; +import static org.bf2.cos.fleetshard.support.resources.Secrets.SECRET_ENTRY_SERVICE_ACCOUNT; +import static org.bf2.cos.fleetshard.support.resources.Secrets.toBase64; + +@QuarkusTest +@TestProfile(ProcessorProvisionerTest.Profile.class) +public class ProcessorProvisionerTest extends SyncTestSupport { + + public static final String DEPLOYMENT_ID = uid(); + public static final String KAFKA_URL = "kafka.acme.com:2181"; + public static final String KAFKA_CLIENT_ID = uid(); + public static final String KAFKA_CLIENT_SECRET = toBase64(uid()); + + @FleetManagerTestInstance + FleetManagerMockServer server; + + @ConfigProperty(name = "cos.cluster.id") + String clusterId; + + @Test + void processorIsProvisioned() { + { + given() + .contentType(MediaType.TEXT_PLAIN) + .body(0L) + .post("/test/provisioner/namespaces"); + + Namespace ns1 = until( + () -> fleetShardClient.getNamespace(clusterId), + Objects::nonNull); + } + + { + // + // Deployment v1 + // + + given() + .contentType(MediaType.TEXT_PLAIN) + .accept(MediaType.TEXT_PLAIN) + .body(0L) + .post("/test/provisioner/processors"); + + Secret s1 = until( + () -> fleetShardClient.getProcessorSecret(clusterId, DEPLOYMENT_ID), + item -> Objects.equals( + "1", + item.getMetadata().getLabels().get(Resources.LABEL_DEPLOYMENT_RESOURCE_VERSION))); + + ManagedProcessor mp = until( + () -> fleetShardClient.getProcessor(clusterId, DEPLOYMENT_ID), + item -> { + return item.getSpec().getDeploymentResourceVersion() == 1L + && item.getSpec().getSecret() != null; + }); + + assertThat(s1).satisfies(item -> { + assertThat(item.getMetadata().getName()) + .isEqualTo(Secrets.generateProcessorSecretId(mp.getSpec().getDeploymentId())); + + assertThatJson(Secrets.extract(item, SECRET_ENTRY_SERVICE_ACCOUNT)) + .isObject() + .containsEntry("client_id", KAFKA_CLIENT_ID) + .containsEntry("client_secret", KAFKA_CLIENT_SECRET); + }); + + assertThat(mp.getMetadata().getName()).startsWith(Resources.PROCESSOR_PREFIX); + assertThat(mp.getSpec().getKafka().getUrl()).isEqualTo(KAFKA_URL); + assertThat(mp.getSpec().getSecret()).isEqualTo(s1.getMetadata().getName()); + } + { + // + // Deployment v2 + // + + given() + .contentType(MediaType.TEXT_PLAIN) + .accept(MediaType.TEXT_PLAIN) + .body(1L) + .post("/test/provisioner/processors"); + + Secret s1 = until( + () -> fleetShardClient.getProcessorSecret(clusterId, DEPLOYMENT_ID), + item -> Objects.equals( + "2", + item.getMetadata().getLabels().get(Resources.LABEL_DEPLOYMENT_RESOURCE_VERSION))); + + ManagedProcessor mc = until( + () -> fleetShardClient.getProcessor(clusterId, DEPLOYMENT_ID), + item -> { + return item.getSpec().getDeploymentResourceVersion() == 2L + && item.getSpec().getSecret() != null; + }); + + assertThat(s1).satisfies(item -> { + assertThat(item.getMetadata().getName()) + .isEqualTo(Secrets.generateProcessorSecretId(mc.getSpec().getDeploymentId())); + + assertThatJson(Secrets.extract(item, SECRET_ENTRY_SERVICE_ACCOUNT)) + .isObject() + .containsEntry("client_id", KAFKA_CLIENT_ID) + .containsEntry("client_secret", KAFKA_CLIENT_SECRET); + }); + + assertThat(mc.getMetadata().getName()).startsWith(Resources.PROCESSOR_PREFIX); + assertThat(mc.getSpec().getKafka().getUrl()).isEqualTo(KAFKA_URL); + assertThat(mc.getSpec().getSecret()).isEqualTo(s1.getMetadata().getName()); + } + } + + public static class Profile extends SyncTestProfile { + + @Override + public Map getConfigOverrides() { + return Map.of( + "cos.cluster.id", getId(), + "test.namespace", Namespaces.generateNamespaceId(getId()), + "cos.namespace", Namespaces.generateNamespaceId(getId()), + "cos.resources.poll-interval", "disabled", + "cos.resources.resync-interval", "disabled", + "cos.resources.update-interval", "disabled", + "cos.metrics.recorder.tags.annotations[0]", "my.cos.bf2.org/processor-group", + "cos.metrics.recorder.tags.labels[0]", "cos.bf2.org/organization-id", + "cos.metrics.recorder.tags.labels[1]", "cos.bf2.org/pricing-tier"); + } + + @Override + public List testResources() { + return List.of( + new TestResourceEntry(FleetManagerTestResource.class)); + } + } + + public static class FleetManagerTestResource extends org.bf2.cos.fleetshard.sync.it.support.ControlPlaneTestResource { + + @Override + protected void configure(FleetManagerMockServer server) { + final String clusterId = ConfigProvider.getConfig().getValue("cos.cluster.id", String.class); + + server.stubMatching( + RequestMethod.GET, + "/api/connector_mgmt/v1/agent/kafka_connector_clusters/.*/namespaces", + resp -> { + JsonNode body = namespaceList( + namespace(clusterId, clusterId)); + + resp.withHeader(ContentTypeHeader.KEY, APPLICATION_JSON) + .withJsonBody(body); + }); + + server.stubMatching( + RequestMethod.PUT, + "/api/connector_mgmt/v1/agent/kafka_connector_clusters/.*/deployments/.*/status", + () -> WireMock.ok()); + + server.stubMatching( + RequestMethod.GET, + "/api/connector_mgmt/v1/agent/kafka_connector_clusters/.*/deployments", + req -> req.withQueryParam("gt_version", equalTo("0")), + resp -> { + JsonNode body = processorDeploymentList( + processorDeployment(DEPLOYMENT_ID, 1L, + depl -> { + depl.getMetadata().annotations(Map.of()); + }, + spec -> { + spec.namespaceId(clusterId); + spec.processorId("processor-1"); + spec.processorTypeId("processor-type-1"); + spec.processorResourceVersion(1L); + spec.kafka( + new KafkaConnectionSettings() + .url(KAFKA_URL)); + spec.serviceAccount( + new ServiceAccount() + .clientId(KAFKA_CLIENT_ID) + .clientSecret(KAFKA_CLIENT_SECRET)); + spec.shardMetadata(node(n -> { + n.withArray("operators").addObject() + .put("type", "camel-connector-operator") + .put("version", "[1.0.0,2.0.0)"); + })); + + spec.desiredState(ProcessorDesiredState.READY); + })); + + resp.withHeader(ContentTypeHeader.KEY, APPLICATION_JSON) + .withJsonBody(body); + }); + + server.stubMatching( + RequestMethod.GET, + "/api/connector_mgmt/v1/agent/kafka_connector_clusters/.*/deployments", + req -> req.withQueryParam("gt_version", equalTo("1")), + resp -> { + JsonNode body = processorDeploymentList( + processorDeployment(DEPLOYMENT_ID, 2L, + depl -> { + depl.getMetadata().annotations(Map.of()); + }, + spec -> { + spec.namespaceId(clusterId); + spec.processorId("processor-1"); + spec.processorTypeId("processor-type-1"); + spec.processorResourceVersion(1L); + spec.kafka( + new KafkaConnectionSettings() + .url(KAFKA_URL)); + spec.serviceAccount( + new ServiceAccount() + .clientId(KAFKA_CLIENT_ID) + .clientSecret(KAFKA_CLIENT_SECRET)); + spec.shardMetadata(node(n -> { + n.withArray("operators").addObject() + .put("type", "camel-connector-operator") + .put("version", "[1.0.0,2.0.0)"); + })); + spec.desiredState(ProcessorDesiredState.READY); + })); + + resp.withHeader(ContentTypeHeader.KEY, APPLICATION_JSON) + .withJsonBody(body); + }); + } + } +} diff --git a/cos-fleetshard-sync-it/src/test/java/org/bf2/cos/fleetshard/sync/it/support/SyncTestSupport.java b/cos-fleetshard-sync-it/src/test/java/org/bf2/cos/fleetshard/sync/it/support/SyncTestSupport.java index 2fe22694..9ff5b9e4 100644 --- a/cos-fleetshard-sync-it/src/test/java/org/bf2/cos/fleetshard/sync/it/support/SyncTestSupport.java +++ b/cos-fleetshard-sync-it/src/test/java/org/bf2/cos/fleetshard/sync/it/support/SyncTestSupport.java @@ -24,6 +24,9 @@ import org.bf2.cos.fleet.manager.model.ConnectorNamespaceStatus; import org.bf2.cos.fleet.manager.model.ConnectorNamespaceTenant; import org.bf2.cos.fleet.manager.model.ConnectorNamespaceTenantKind; +import org.bf2.cos.fleet.manager.model.ProcessorDeployment; +import org.bf2.cos.fleet.manager.model.ProcessorDeploymentList; +import org.bf2.cos.fleet.manager.model.ProcessorDeploymentSpec; import org.bf2.cos.fleetshard.sync.FleetShardSyncConfig; import org.bf2.cos.fleetshard.sync.client.FleetShardClient; @@ -60,6 +63,19 @@ public static ObjectNode deploymentList(ConnectorDeployment... deployments) { return Serialization.jsonMapper().convertValue(items, ObjectNode.class); } + public static ObjectNode processorDeploymentList(ProcessorDeployment... deployments) { + var items = new ProcessorDeploymentList(); + items.page(1); + items.size(deployments.length); + items.total(deployments.length); + + for (ProcessorDeployment deployment : deployments) { + items.addItemsItem(deployment); + } + + return Serialization.jsonMapper().convertValue(items, ObjectNode.class); + } + public static ObjectNode namespaceList(ConnectorNamespaceDeployment... namespaces) { var items = new ConnectorNamespaceDeploymentList(); items.page(1); @@ -136,6 +152,24 @@ public static ConnectorDeployment deployment( return answer; } + public static ProcessorDeployment processorDeployment( + String name, + long revision, + Consumer deploymentConsumer, + Consumer deploymentSpecConsumer) { + + ProcessorDeployment answer = new ProcessorDeployment() + .kind("ProcessorDeployment") + .id(name) + .metadata(new ConnectorDeploymentAllOfMetadata().resourceVersion(revision)) + .spec(new ProcessorDeploymentSpec()); + + deploymentConsumer.accept(answer); + deploymentSpecConsumer.accept(answer.getSpec()); + + return answer; + } + public static JsonNode node(Consumer consumer) { ObjectNode answer = Serialization.jsonMapper().createObjectNode(); consumer.accept(answer); diff --git a/cos-fleetshard-sync/src/main/java/org/bf2/cos/fleetshard/sync/FleetShardSync.java b/cos-fleetshard-sync/src/main/java/org/bf2/cos/fleetshard/sync/FleetShardSync.java index 664bf920..0779860c 100644 --- a/cos-fleetshard-sync/src/main/java/org/bf2/cos/fleetshard/sync/FleetShardSync.java +++ b/cos-fleetshard-sync/src/main/java/org/bf2/cos/fleetshard/sync/FleetShardSync.java @@ -11,6 +11,7 @@ import org.bf2.cos.fleetshard.sync.housekeeping.MetricsHousekeeper; import org.bf2.cos.fleetshard.sync.resources.ConnectorClusterStatusSync; import org.bf2.cos.fleetshard.sync.resources.ConnectorStatusSync; +import org.bf2.cos.fleetshard.sync.resources.ProcessorStatusSync; import org.bf2.cos.fleetshard.sync.resources.ResourcePoll; @ApplicationScoped @@ -26,6 +27,8 @@ public class FleetShardSync implements Service { @Inject ConnectorClusterStatusSync clusterStatusSync; @Inject + ProcessorStatusSync processorStatusSync; + @Inject Housekeeper housekeeping; @Inject MetricsHousekeeper metricsHousekeeping; @@ -55,12 +58,14 @@ public void startResourcesSync() throws Exception { resourceSync.start(); connectorStatusSync.start(); clusterStatusSync.start(); + processorStatusSync.start(); } public void stopResourcesSync() throws Exception { Resources.closeQuietly(resourceSync); Resources.closeQuietly(connectorStatusSync); Resources.closeQuietly(clusterStatusSync); + Resources.closeQuietly(processorStatusSync); } } diff --git a/cos-fleetshard-sync/src/main/java/org/bf2/cos/fleetshard/sync/FleetShardSyncConfig.java b/cos-fleetshard-sync/src/main/java/org/bf2/cos/fleetshard/sync/FleetShardSyncConfig.java index fe5c1425..c415844f 100644 --- a/cos-fleetshard-sync/src/main/java/org/bf2/cos/fleetshard/sync/FleetShardSyncConfig.java +++ b/cos-fleetshard-sync/src/main/java/org/bf2/cos/fleetshard/sync/FleetShardSyncConfig.java @@ -7,6 +7,7 @@ import org.bf2.cos.fleetshard.api.ManagedConnector; import org.bf2.cos.fleetshard.api.ManagedConnectorCluster; +import org.bf2.cos.fleetshard.api.ManagedProcessor; import org.bf2.cos.fleetshard.support.DurationConverter; import org.bf2.cos.fleetshard.support.metrics.MetricsRecorderConfig; import org.bf2.cos.fleetshard.sync.resources.ConnectorNamespaceProvisioner; @@ -42,6 +43,13 @@ public interface FleetShardSyncConfig { */ Connectors connectors(); + /** + * Configuration options for processors. + * + * @return {@link Processors} + */ + Processors processors(); + /** * Configuration options for resources. * @@ -239,6 +247,22 @@ interface Connectors { Map annotations(); } + interface Processors { + /** + * An optional map of additional labels to be added to the generated {@link }. + * + * @return the additional labels + */ + Map labels(); + + /** + * An optional map of additional annotations to be added to the generated {@link ManagedProcessor}. + * + * @return the additional annotations + */ + Map annotations(); + } + interface Metrics { /** * The base name for metrics created by the operator. diff --git a/cos-fleetshard-sync/src/main/java/org/bf2/cos/fleetshard/sync/client/FleetManagerClient.java b/cos-fleetshard-sync/src/main/java/org/bf2/cos/fleetshard/sync/client/FleetManagerClient.java index 6085980f..c9db434d 100644 --- a/cos-fleetshard-sync/src/main/java/org/bf2/cos/fleetshard/sync/client/FleetManagerClient.java +++ b/cos-fleetshard-sync/src/main/java/org/bf2/cos/fleetshard/sync/client/FleetManagerClient.java @@ -18,6 +18,9 @@ import org.bf2.cos.fleet.manager.model.ConnectorNamespaceDeployment; import org.bf2.cos.fleet.manager.model.ConnectorNamespaceDeploymentList; import org.bf2.cos.fleet.manager.model.ConnectorNamespaceDeploymentStatus; +import org.bf2.cos.fleet.manager.model.ProcessorDeployment; +import org.bf2.cos.fleet.manager.model.ProcessorDeploymentList; +import org.bf2.cos.fleet.manager.model.ProcessorDeploymentStatus; import org.bf2.cos.fleetshard.api.ManagedConnector; import org.bf2.cos.fleetshard.sync.FleetShardSyncConfig; import org.eclipse.microprofile.rest.client.RestClientBuilder; @@ -77,9 +80,9 @@ public void getNamespaces(long gv, Consumer> consumer) { + public void getConnectorDeployments(long gv, Consumer> consumer) { RestClientHelper.run(() -> { - LOGGER.debug("polling deployment with gv: {}", gv); + LOGGER.debug("polling connector deployments with gv: {}", gv); final AtomicInteger counter = new AtomicInteger(); final List items = new ArrayList<>(); @@ -109,6 +112,38 @@ public void getDeployments(long gv, Consumer> co }); } + public void getProcessorDeployments(long gv, Consumer> consumer) { + RestClientHelper.run(() -> { + LOGGER.debug("polling processor deployments with gv: {}", gv); + + final AtomicInteger counter = new AtomicInteger(); + final List items = new ArrayList<>(); + + for (int i = 1; i < Integer.MAX_VALUE; i++) { + ProcessorDeploymentList list = controlPlane.getProcessorDeployments( + config.cluster().id(), + Integer.toString(i), + null, + gv); + + if (list == null || list.getItems() == null || list.getItems().isEmpty()) { + LOGGER.info("No processors for cluster {}", config.cluster().id()); + break; + } + + items.clear(); + items.addAll(list.getItems()); + items.sort(Comparator.comparingLong(d -> d.getMetadata().getResourceVersion())); + + consumer.accept(items); + + if (counter.addAndGet(items.size()) >= list.getTotal()) { + break; + } + } + }); + } + public void updateConnectorStatus(ManagedConnector connector, ConnectorDeploymentStatus status) { updateConnectorStatus( connector.getSpec().getClusterId(), @@ -130,6 +165,20 @@ public void updateConnectorStatus(String clusterId, String deploymentId, Connect }); } + public void updateProcessorStatus(String clusterId, String deploymentId, ProcessorDeploymentStatus status) { + RestClientHelper.run(() -> { + LOGGER.info("Update processor status: cluster_id={}, deployment_id={}, status={}", + clusterId, + deploymentId, + Serialization.asJson(status)); + + controlPlane.updateProcessorDeploymentStatus( + clusterId, + deploymentId, + status); + }); + } + public void updateNamespaceStatus(String clusterId, String namespaceId, ConnectorNamespaceDeploymentStatus status) { RestClientHelper.run(() -> { LOGGER.info("Update namespace status: cluster_id={}, namespace_id={}, status={}", @@ -155,4 +204,5 @@ public void updateClusterStatus(ConnectorClusterStatus status) { status); }); } + } diff --git a/cos-fleetshard-sync/src/main/java/org/bf2/cos/fleetshard/sync/client/FleetManagerClientApi.java b/cos-fleetshard-sync/src/main/java/org/bf2/cos/fleetshard/sync/client/FleetManagerClientApi.java index 5736d807..7cc6886f 100644 --- a/cos-fleetshard-sync/src/main/java/org/bf2/cos/fleetshard/sync/client/FleetManagerClientApi.java +++ b/cos-fleetshard-sync/src/main/java/org/bf2/cos/fleetshard/sync/client/FleetManagerClientApi.java @@ -30,6 +30,8 @@ import org.bf2.cos.fleet.manager.model.ConnectorNamespaceDeployment; import org.bf2.cos.fleet.manager.model.ConnectorNamespaceDeploymentList; import org.bf2.cos.fleet.manager.model.ConnectorNamespaceDeploymentStatus; +import org.bf2.cos.fleet.manager.model.ProcessorDeploymentList; +import org.bf2.cos.fleet.manager.model.ProcessorDeploymentStatus; public interface FleetManagerClientApi { @@ -57,6 +59,32 @@ ConnectorDeploymentList getConnectorDeployments( @QueryParam("gt_version") Long gtVersion) throws ApiException, ProcessingException; + /** + * Returns a list of processor deployments assigned to the cluster. + */ + @GET + @Path("/kafka_connector_clusters/{connector_cluster_id}/processors/deployments") + @Produces(MediaType.APPLICATION_JSON) + ProcessorDeploymentList getProcessorDeployments( + @PathParam("connector_cluster_id") String connectorClusterId, + @QueryParam("page") String page, + @QueryParam("size") String size, + @QueryParam("gt_version") Long gtVersion) + throws ApiException, ProcessingException; + + /** + * Update the status of a connector deployment + */ + @PUT + @Path("/kafka_connector_clusters/{connector_cluster_id}/processors/deployments/{deployment_id}/status") + @Consumes(MediaType.APPLICATION_JSON) + @Produces(MediaType.APPLICATION_JSON) + void updateProcessorDeploymentStatus( + @PathParam("connector_cluster_id") String connectorClusterId, + @PathParam("deployment_id") String deploymentId, + ProcessorDeploymentStatus processorDeploymentStatus) + throws ApiException, ProcessingException; + /** * Returns a connector namespace assigned to the cluster. */ diff --git a/cos-fleetshard-sync/src/main/java/org/bf2/cos/fleetshard/sync/client/FleetShardClient.java b/cos-fleetshard-sync/src/main/java/org/bf2/cos/fleetshard/sync/client/FleetShardClient.java index d312cf30..d8015526 100644 --- a/cos-fleetshard-sync/src/main/java/org/bf2/cos/fleetshard/sync/client/FleetShardClient.java +++ b/cos-fleetshard-sync/src/main/java/org/bf2/cos/fleetshard/sync/client/FleetShardClient.java @@ -9,15 +9,18 @@ import javax.inject.Inject; import org.bf2.cos.fleet.manager.model.ConnectorDeployment; +import org.bf2.cos.fleet.manager.model.ProcessorDeployment; import org.bf2.cos.fleetshard.api.ManagedConnector; import org.bf2.cos.fleetshard.api.ManagedConnectorCluster; import org.bf2.cos.fleetshard.api.ManagedConnectorClusterBuilder; import org.bf2.cos.fleetshard.api.ManagedConnectorClusterSpecBuilder; import org.bf2.cos.fleetshard.api.ManagedConnectorOperator; +import org.bf2.cos.fleetshard.api.ManagedProcessor; import org.bf2.cos.fleetshard.support.Service; import org.bf2.cos.fleetshard.support.resources.Clusters; import org.bf2.cos.fleetshard.support.resources.Connectors; import org.bf2.cos.fleetshard.support.resources.NamespacedName; +import org.bf2.cos.fleetshard.support.resources.Processors; import org.bf2.cos.fleetshard.support.resources.Resources; import org.bf2.cos.fleetshard.support.resources.Secrets; import org.bf2.cos.fleetshard.support.watch.Informers; @@ -46,6 +49,7 @@ public class FleetShardClient implements Service { private volatile SharedIndexInformer connectorsInformer; private volatile SharedIndexInformer operatorsInformer; private volatile SharedIndexInformer namespaceInformers; + private volatile SharedIndexInformer processorsInformer; @SuppressWarnings("PMD.DoNotTerminateVM") @Override @@ -62,6 +66,10 @@ public void start() throws Exception { .inAnyNamespace() .withLabel(Resources.LABEL_CLUSTER_ID, getClusterId()) .inform(); + processorsInformer = kubernetesClient.resources(ManagedProcessor.class) + .inAnyNamespace() + .withLabel(Resources.LABEL_CLUSTER_ID, getClusterId()) + .inform(); operatorsInformer.stopped().whenComplete((unused, throwable) -> { if (throwable != null) { @@ -81,6 +89,12 @@ public void start() throws Exception { System.exit(-1); } }); + processorsInformer.stopped().whenComplete((unused, throwable) -> { + if (throwable != null) { + LOGGER.warn("Processor informer has stopped working, exiting", throwable); + System.exit(-1); + } + }); } @Override @@ -88,6 +102,7 @@ public void stop() throws Exception { Resources.closeQuietly(operatorsInformer); Resources.closeQuietly(namespaceInformers); Resources.closeQuietly(connectorsInformer); + Resources.closeQuietly(processorsInformer); } public String getClusterId() { @@ -105,6 +120,13 @@ public long getMaxDeploymentResourceRevision() { .orElse(0); } + public long getProcessorMaxDeploymentResourceRevision() { + return this.processorsInformer.getIndexer().list().stream() + .mapToLong(c -> c.getSpec().getDeploymentResourceVersion()) + .max() + .orElse(0); + } + public long getMaxNamespaceResourceRevision() { return this.namespaceInformers.getIndexer().list().stream() .mapToLong(c -> { @@ -194,6 +216,12 @@ public Optional getSecret(ConnectorDeployment deployment) { deployment.getId()); } + public Optional getSecret(ProcessorDeployment deployment) { + return getSecret( + deployment.getSpec().getNamespaceId(), + deployment.getId()); + } + public Optional getSecret(NamespacedName id) { return Optional.ofNullable( kubernetesClient.secrets() @@ -210,6 +238,14 @@ public Optional getSecret(String namespaceId, String deploymentId) { .get()); } + public Optional getProcessorSecret(String namespaceId, String deploymentId) { + return Optional.ofNullable( + kubernetesClient.secrets() + .inNamespace(generateNamespaceId(namespaceId)) + .withName(Secrets.generateProcessorSecretId(deploymentId)) + .get()); + } + // ************************************* // // Connectors @@ -298,6 +334,94 @@ public String generateConnectorId(String namespaceId) { return Connectors.generateConnectorId(namespaceId); } + // ************************************* + // + // Processors + // + // ************************************* + + public Boolean deleteProcessor(ManagedProcessor managedProcessor) { + return !kubernetesClient.resources(ManagedProcessor.class) + .inNamespace(managedProcessor.getMetadata().getNamespace()) + .withName(managedProcessor.getMetadata().getName()) + .withPropagationPolicy(DeletionPropagation.FOREGROUND) + .delete().isEmpty(); + } + + public Optional getProcessor(NamespacedName id) { + if (processorsInformer == null) { + throw new IllegalStateException("Informer must be started before adding handlers"); + } + + final String key = Cache.namespaceKeyFunc(id.getNamespace(), id.getName()); + final ManagedProcessor val = processorsInformer.getIndexer().getByKey(key); + + return Optional.ofNullable(val); + } + + public Optional getProcessor(ProcessorDeployment deployment) { + return getProcessor( + deployment.getSpec().getNamespaceId(), + deployment.getId()); + } + + public Optional getProcessor(String namespaceId, String deploymentId) { + if (processorsInformer == null) { + throw new IllegalStateException("Informer must be started before adding handlers"); + } + + final String key = Cache.namespaceKeyFunc(generateNamespaceId(namespaceId), generateProcessorId(deploymentId)); + final ManagedProcessor val = processorsInformer.getIndexer().getByKey(key); + + return Optional.ofNullable(val); + } + + public List getAllProcessors() { + if (processorsInformer == null) { + throw new IllegalStateException("Informer must be started before adding handlers"); + } + + return processorsInformer.getIndexer().list(); + } + + public List getProcessors(String namespace) { + if (processorsInformer == null) { + throw new IllegalStateException("Informer must be started before adding handlers"); + } + + return processorsInformer.getIndexer().byIndex(Cache.NAMESPACE_INDEX, namespace); + } + + public List getProcessors(Namespace namespace) { + return getProcessors(namespace.getMetadata().getName()); + } + + public void watchProcessors(Consumer handler) { + if (processorsInformer == null) { + throw new IllegalStateException("Informer must be started before adding handlers"); + } + + processorsInformer.addEventHandler(Informers.wrap(handler)); + } + + public void watchProcessors(ResourceEventHandler handler) { + if (processorsInformer == null) { + throw new IllegalStateException("Informer must be started before adding handlers"); + } + + processorsInformer.addEventHandler(handler); + } + + public ManagedProcessor createProcessor(ManagedProcessor processor) { + return kubernetesClient.resource(processor) + .inNamespace(processor.getMetadata().getNamespace()) + .createOrReplace(); + } + + public String generateProcessorId(String namespaceId) { + return Processors.generateProcessorId(namespaceId); + } + // ************************************* // // Operators diff --git a/cos-fleetshard-sync/src/main/java/org/bf2/cos/fleetshard/sync/housekeeping/reapers/NamespacesReaper.java b/cos-fleetshard-sync/src/main/java/org/bf2/cos/fleetshard/sync/housekeeping/reapers/NamespacesReaper.java index 17c39911..2d4f7129 100644 --- a/cos-fleetshard-sync/src/main/java/org/bf2/cos/fleetshard/sync/housekeeping/reapers/NamespacesReaper.java +++ b/cos-fleetshard-sync/src/main/java/org/bf2/cos/fleetshard/sync/housekeeping/reapers/NamespacesReaper.java @@ -7,6 +7,7 @@ import javax.enterprise.context.ApplicationScoped; import org.bf2.cos.fleetshard.api.ManagedConnector; +import org.bf2.cos.fleetshard.api.ManagedProcessor; import org.bf2.cos.fleetshard.support.Service; import org.bf2.cos.fleetshard.support.resources.Namespaces; import org.bf2.cos.fleetshard.support.resources.Resources; @@ -73,14 +74,16 @@ private void doRun() { private void delete(Namespace ns) { Collection connectors = fleetShardClient.getConnectors(ns); + Collection processors = fleetShardClient.getProcessors(ns); try { - LOGGER.info("Deleting namespace: {} (id: {}, state: {}, expiration: {}, connectors {}", + LOGGER.info("Deleting namespace: {} (id: {}, state: {}, expiration: {}, connectors {}, processors {}", ns.getMetadata().getName(), Resources.getLabel(ns, Resources.LABEL_NAMESPACE_ID), Resources.getLabel(ns, Resources.LABEL_NAMESPACE_STATE), Resources.getAnnotation(ns, Resources.ANNOTATION_NAMESPACE_EXPIRATION), - connectors.size()); + connectors.size(), + processors.size()); fleetShardClient.deleteNamespace(ns); diff --git a/cos-fleetshard-sync/src/main/java/org/bf2/cos/fleetshard/sync/resources/ConnectorDeploymentProvisioner.java b/cos-fleetshard-sync/src/main/java/org/bf2/cos/fleetshard/sync/resources/ConnectorDeploymentProvisioner.java index 3ddc0a28..0103007a 100644 --- a/cos-fleetshard-sync/src/main/java/org/bf2/cos/fleetshard/sync/resources/ConnectorDeploymentProvisioner.java +++ b/cos-fleetshard-sync/src/main/java/org/bf2/cos/fleetshard/sync/resources/ConnectorDeploymentProvisioner.java @@ -69,7 +69,7 @@ public class ConnectorDeploymentProvisioner { MetricsRecorder recorder; public void poll(long revision) { - fleetManager.getDeployments( + fleetManager.getConnectorDeployments( revision, this::provisionConnectors); } diff --git a/cos-fleetshard-sync/src/main/java/org/bf2/cos/fleetshard/sync/resources/ProcessorDeploymentProvisioner.java b/cos-fleetshard-sync/src/main/java/org/bf2/cos/fleetshard/sync/resources/ProcessorDeploymentProvisioner.java new file mode 100644 index 00000000..011bc779 --- /dev/null +++ b/cos-fleetshard-sync/src/main/java/org/bf2/cos/fleetshard/sync/resources/ProcessorDeploymentProvisioner.java @@ -0,0 +1,359 @@ +package org.bf2.cos.fleetshard.sync.resources; + +import java.util.Collection; +import java.util.List; +import java.util.stream.Collectors; + +import javax.enterprise.context.ApplicationScoped; +import javax.inject.Inject; + +import org.bf2.cos.fleet.manager.model.KafkaConnectionSettings; +import org.bf2.cos.fleet.manager.model.MetaV1Condition; +import org.bf2.cos.fleet.manager.model.ProcessorDeployment; +import org.bf2.cos.fleet.manager.model.ProcessorDeploymentStatus; +import org.bf2.cos.fleetshard.api.Conditions; +import org.bf2.cos.fleetshard.api.KafkaSpec; +import org.bf2.cos.fleetshard.api.ManagedProcessor; +import org.bf2.cos.fleetshard.api.Operator; +import org.bf2.cos.fleetshard.api.OperatorSelector; +import org.bf2.cos.fleetshard.support.OperatorSelectorUtil; +import org.bf2.cos.fleetshard.support.client.EventClient; +import org.bf2.cos.fleetshard.support.metrics.MetricsRecorder; +import org.bf2.cos.fleetshard.support.resources.Processors; +import org.bf2.cos.fleetshard.support.resources.Resources; +import org.bf2.cos.fleetshard.support.resources.Secrets; +import org.bf2.cos.fleetshard.sync.FleetShardSyncConfig; +import org.bf2.cos.fleetshard.sync.client.FleetManagerClient; +import org.bf2.cos.fleetshard.sync.client.FleetShardClient; +import org.bf2.cos.fleetshard.sync.metrics.MetricsID; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.databind.node.ArrayNode; + +import io.fabric8.kubernetes.api.model.HasMetadata; +import io.fabric8.kubernetes.api.model.ObjectMeta; +import io.fabric8.kubernetes.api.model.Secret; + +import static org.bf2.cos.fleetshard.support.resources.Resources.LABEL_CLUSTER_ID; +import static org.bf2.cos.fleetshard.support.resources.Resources.LABEL_DEPLOYMENT_ID; +import static org.bf2.cos.fleetshard.support.resources.Resources.LABEL_DEPLOYMENT_RESOURCE_VERSION; +import static org.bf2.cos.fleetshard.support.resources.Resources.LABEL_OPERATOR_ASSIGNED; +import static org.bf2.cos.fleetshard.support.resources.Resources.LABEL_OPERATOR_TYPE; +import static org.bf2.cos.fleetshard.support.resources.Resources.LABEL_PROCESSOR_ID; +import static org.bf2.cos.fleetshard.support.resources.Resources.LABEL_UOW; +import static org.bf2.cos.fleetshard.support.resources.Resources.uid; + +@ApplicationScoped +public class ProcessorDeploymentProvisioner { + + public static final String METRICS_SUFFIX = "deployment.provision"; + + private static final Logger LOGGER = LoggerFactory.getLogger(ProcessorDeploymentProvisioner.class); + + @Inject + FleetShardClient fleetShard; + @Inject + FleetManagerClient fleetManager; + @Inject + FleetShardSyncConfig config; + @Inject + EventClient eventClient; + + @Inject + @MetricsID(METRICS_SUFFIX) + MetricsRecorder recorder; + + public void poll(long revision) { + fleetManager.getProcessorDeployments( + revision, + this::provisionProcessor); + } + + private void provisionProcessor(Collection deployments) { + for (ProcessorDeployment deployment : deployments) { + this.recorder.record( + () -> provision(deployment), + e -> { + LOGGER.error("Failure while trying to provision processor deployment: id={}, revision={}", + deployment.getId(), + deployment.getMetadata().getResourceVersion(), + e); + + try { + MetaV1Condition condition = new MetaV1Condition(); + condition.setType(Conditions.TYPE_READY); + condition.setStatus(Conditions.STATUS_FALSE); + condition.setReason(Conditions.FAILED_TO_CREATE_OR_UPDATE_RESOURCE_REASON); + condition.setMessage(e.getMessage()); + + ProcessorDeploymentStatus status = new ProcessorDeploymentStatus(); + status.setResourceVersion(deployment.getMetadata().getResourceVersion()); + status.addConditionsItem(condition); + + fleetManager.updateProcessorStatus( + fleetShard.getClusterId(), + deployment.getId(), + status); + } catch (Exception ex) { + LOGGER.warn("Error wile reporting failure to the control plane", e); + } + + fleetShard.getConnectorCluster().ifPresent(cc -> { + eventClient.broadcastWarning( + "FailedToCreateOrUpdateResource", + String.format("Unable to create or update processor deployment %s, revision: %s, reason: %s", + deployment.getId(), + deployment.getMetadata().getResourceVersion(), + e.getMessage()), + cc); + }); + }); + } + } + + public void provision(ProcessorDeployment deployment) { + final String uow = uid(); + + LOGGER.info("Got cluster_id: {}, namespace_d: {}, processor_id: {}, deployment_id: {}, resource_version: {}, uow: {}", + fleetShard.getClusterId(), + deployment.getSpec().getNamespaceId(), + deployment.getSpec().getProcessorId(), + deployment.getId(), + deployment.getMetadata().getResourceVersion(), + uow); + + ManagedProcessor processor = createProcessor(uow, deployment, null); + final Secret secret = createProcessorSecret(uow, deployment, processor); + + LOGGER.info("CreateOrReplace - uow: {}, processor: {}/{}, secret: {}/{}", + uow, + processor.getMetadata().getNamespace(), + processor.getMetadata().getName(), + secret.getMetadata().getNamespace(), + secret.getMetadata().getName()); + } + + private ManagedProcessor createProcessor(String uow, ProcessorDeployment deployment, HasMetadata owner) { + + ManagedProcessor processor = fleetShard.getProcessor(deployment).orElseGet(() -> { + LOGGER.info( + "Processor not found (cluster_id: {}, namespace_id: {}, processor_id: {}, deployment_id: {}, resource_version: {}), creating a new one", + fleetShard.getClusterId(), + deployment.getSpec().getNamespaceId(), + deployment.getSpec().getProcessorId(), + deployment.getId(), + deployment.getMetadata().getResourceVersion()); + + ManagedProcessor answer = new ManagedProcessor(); + answer.setMetadata(new ObjectMeta()); + answer.getMetadata().setNamespace(fleetShard.generateNamespaceId(deployment.getSpec().getNamespaceId())); + answer.getMetadata().setName(Processors.generateProcessorId(deployment.getId())); + + Resources.setLabels( + answer, + LABEL_CLUSTER_ID, fleetShard.getClusterId(), + LABEL_PROCESSOR_ID, deployment.getSpec().getProcessorId(), + LABEL_DEPLOYMENT_ID, deployment.getId()); + + answer.getSpec().setClusterId(fleetShard.getClusterId()); + answer.getSpec().setProcessorId(deployment.getSpec().getProcessorId()); + answer.getSpec().setDeploymentId(deployment.getId()); + + return answer; + }); + + // TODO: change APIs to include a single operator + // move operator one level up + // include full operator info in ProcessorDeployment APIs + ArrayNode operatorsMeta = deployment.getSpec().getShardMetadata().withArray("operators"); + if (operatorsMeta.size() != 1) { + throw new IllegalArgumentException("Multiple selectors are not yet supported"); + } + + OperatorSelector operatorSelector = new OperatorSelector( + deployment.getSpec().getOperatorId(), + operatorsMeta.get(0).requiredAt("/type").asText(), + operatorsMeta.get(0).requiredAt("/version").asText()); + + if (operatorSelector.getId() == null) { + final OperatorSelector currentSelector = processor.getSpec().getOperatorSelector(); + + // don't select a new operator if previously set. + if (currentSelector != null && currentSelector.getId() != null) { + operatorSelector.setId(currentSelector.getId()); + } else { + Collection operators = fleetShard.getOperators() + .stream() + .map(mco -> new Operator( + mco.getMetadata().getName(), + mco.getSpec().getType(), + mco.getSpec().getVersion())) + .collect(Collectors.toList()); + + OperatorSelectorUtil.assign(operatorSelector, operators) + .map(Operator::getId) + .ifPresentOrElse( + operatorSelector::setId, + () -> { + eventClient.broadcastWarning( + "NoAssignableOperator", + String.format("Unable to find a supported operator for deployment_id: %s", deployment.getId()), + processor); + }); + } + } + if (operatorSelector.getId() != null) { + Resources.setLabel( + processor, + LABEL_OPERATOR_ASSIGNED, + operatorSelector.getId()); + } + if (operatorSelector.getType() != null) { + Resources.setLabel( + processor, + LABEL_OPERATOR_TYPE, + operatorSelector.getType()); + } + + if (config != null && config.processors() != null) { + config.processors().labels().forEach((k, v) -> { + Resources.setLabel(processor, k, v); + }); + config.processors().annotations().forEach((k, v) -> { + Resources.setAnnotation(processor, k, v); + }); + } + + Resources.setOwnerReferences( + processor, + owner); + + // add resource version to label + Resources.setLabel( + processor, + LABEL_DEPLOYMENT_RESOURCE_VERSION, + "" + deployment.getMetadata().getResourceVersion()); + + // add uow + Resources.setLabel( + processor, + LABEL_UOW, + uow); + + processor.getSpec().setDeploymentResourceVersion(deployment.getMetadata().getResourceVersion()); + processor.getSpec().setDesiredState(deployment.getSpec().getDesiredState().getValue()); + processor.getSpec().setProcessorTypeId(deployment.getSpec().getProcessorTypeId()); + + KafkaConnectionSettings kafkaConnectionSettings = deployment.getSpec().getKafka(); + if (kafkaConnectionSettings != null) { + processor.getSpec().setKafka(new KafkaSpec( + kafkaConnectionSettings.getId(), + kafkaConnectionSettings.getUrl())); + } + + processor.getSpec().setOperatorSelector(operatorSelector); + processor.getSpec().setSecret(Secrets.generateProcessorSecretId(deployment.getId())); + + copyMetadata(deployment, processor); + + LOGGER.info("Provisioning processor namespace: {}, name: {}, revision: {}", + processor.getMetadata().getNamespace(), + processor.getMetadata().getName(), + processor.getSpec().getDeploymentResourceVersion()); + + try { + return fleetShard.createProcessor(processor); + } catch (Exception e) { + LOGGER.warn("", e); + throw e; + } + } + + private Secret createProcessorSecret(String uow, ProcessorDeployment deployment, ManagedProcessor owner) { + Secret secret = fleetShard.getSecret(deployment) + .orElseGet(() -> { + LOGGER.info( + "Secret not found (cluster_id: {}, namespace_id: {}, processor_id: {}, deployment_id: {}, resource_version: {}), creating a new one", + fleetShard.getClusterId(), + deployment.getSpec().getNamespaceId(), + deployment.getSpec().getProcessorId(), + deployment.getId(), + deployment.getMetadata().getResourceVersion()); + + Secret answer = new Secret(); + answer.setMetadata(new ObjectMeta()); + answer.getMetadata().setNamespace(fleetShard.generateNamespaceId(deployment.getSpec().getNamespaceId())); + answer.getMetadata().setName(Secrets.generateProcessorSecretId(deployment.getId())); + + Resources.setLabels( + answer, + LABEL_CLUSTER_ID, fleetShard.getClusterId(), + LABEL_PROCESSOR_ID, deployment.getSpec().getProcessorId(), + LABEL_DEPLOYMENT_ID, deployment.getId(), + LABEL_DEPLOYMENT_RESOURCE_VERSION, "" + deployment.getMetadata().getResourceVersion()); + + return answer; + }); + + Resources.setOwnerReferences( + secret, + owner); + + // add resource version to label + Resources.setLabel( + secret, + LABEL_DEPLOYMENT_RESOURCE_VERSION, + "" + deployment.getMetadata().getResourceVersion()); + + // add uow + Resources.setLabel( + secret, + LABEL_UOW, + uow); + + // copy operator type + Resources.setLabel( + secret, + LABEL_OPERATOR_TYPE, + owner.getMetadata().getLabels().get(LABEL_OPERATOR_TYPE)); + + Secrets.set(secret, Secrets.SECRET_ENTRY_SERVICE_ACCOUNT, deployment.getSpec().getServiceAccount()); + Secrets.set(secret, Secrets.SECRET_ENTRY_META, deployment.getSpec().getShardMetadata()); + + copyMetadata(deployment, secret); + + try { + return fleetShard.createSecret(secret); + } catch (Exception e) { + LOGGER.warn("", e); + throw e; + } + } + + // TODO remove duplication here + // Used for the billing model so it might be not necessary as well + private void copyMetadata(ProcessorDeployment deployment, HasMetadata target) { + if (deployment.getMetadata() != null && deployment.getMetadata().getAnnotations() != null) { + config.metrics().recorder().tags().labels() + .stream() + .flatMap(List::stream) + .forEach(key -> { + String val = deployment.getMetadata().getAnnotations().get(key); + if (val != null) { + Resources.setLabel(target, key, val); + } + }); + + config.metrics().recorder().tags().annotations() + .stream() + .flatMap(List::stream) + .forEach(key -> { + String val = deployment.getMetadata().getAnnotations().get(key); + if (val != null) { + Resources.setAnnotation(target, key, val); + } + }); + } + } +} diff --git a/cos-fleetshard-sync/src/main/java/org/bf2/cos/fleetshard/sync/resources/ProcessorStatusExtractor.java b/cos-fleetshard-sync/src/main/java/org/bf2/cos/fleetshard/sync/resources/ProcessorStatusExtractor.java new file mode 100644 index 00000000..cbe71ee5 --- /dev/null +++ b/cos-fleetshard-sync/src/main/java/org/bf2/cos/fleetshard/sync/resources/ProcessorStatusExtractor.java @@ -0,0 +1,91 @@ +package org.bf2.cos.fleetshard.sync.resources; + +import org.bf2.cos.fleet.manager.model.ConnectorDeploymentStatusOperators; +import org.bf2.cos.fleet.manager.model.ConnectorOperator; +import org.bf2.cos.fleet.manager.model.MetaV1Condition; +import org.bf2.cos.fleet.manager.model.ProcessorDeploymentStatus; +import org.bf2.cos.fleet.manager.model.ProcessorState; +import org.bf2.cos.fleetshard.api.Conditions; +import org.bf2.cos.fleetshard.api.ManagedProcessor; +import org.bf2.cos.fleetshard.api.Operator; + +import io.fabric8.kubernetes.api.model.Condition; + +import static org.bf2.cos.fleetshard.api.ManagedConnector.DESIRED_STATE_DELETED; +import static org.bf2.cos.fleetshard.api.ManagedConnector.DESIRED_STATE_STOPPED; +import static org.bf2.cos.fleetshard.api.ManagedConnector.DESIRED_STATE_UNASSIGNED; + +public class ProcessorStatusExtractor { + public static ProcessorDeploymentStatus extract(ManagedProcessor processor) { + ProcessorDeploymentStatus status = new ProcessorDeploymentStatus(); + + status.setResourceVersion(processor.getSpec().getDeploymentResourceVersion()); + + if (processor.getSpec().getOperatorSelector() == null || processor.getSpec().getOperatorSelector().getId() == null) { + status.setPhase(ProcessorState.FAILED); + status.addConditionsItem(new MetaV1Condition() + .type(Conditions.TYPE_READY) + .status(Conditions.STATUS_FALSE) + .message("No assignable operator") + .reason(Conditions.NO_ASSIGNABLE_OPERATOR_REASON) + .lastTransitionTime(Conditions.now())); + + return status; + } + + if (processor.getStatus() != null && processor.getStatus().getProcessorStatus() != null) { + status.setOperators( + new ConnectorDeploymentStatusOperators() + .assigned( + toConnectorOperator(processor.getStatus().getProcessorStatus().getAssignedOperator())) + .available( + toConnectorOperator(processor.getStatus().getProcessorStatus().getAvailableOperator()))); + + if (processor.getStatus().getProcessorStatus() != null) { + if (processor.getStatus().getProcessorStatus().getPhase() != null) { + status.setPhase(ProcessorState.fromValue( + processor.getStatus().getProcessorStatus().getPhase())); + } + if (processor.getStatus().getProcessorStatus().getConditions() != null) { + for (var cond : processor.getStatus().getProcessorStatus().getConditions()) { + status.addConditionsItem(toMetaV1Condition(cond)); + } + } + } + } + + if (status.getPhase() == null) { + status.setPhase(ProcessorState.PROVISIONING); + + if (DESIRED_STATE_DELETED.equals(processor.getSpec().getDesiredState())) { + status.setPhase(ProcessorState.DEPROVISIONING); + } else if (DESIRED_STATE_STOPPED.equals(processor.getSpec().getDesiredState())) { + status.setPhase(ProcessorState.DEPROVISIONING); + } else if (DESIRED_STATE_UNASSIGNED.equals(processor.getSpec().getDesiredState())) { + status.setPhase(ProcessorState.DEPROVISIONING); + } + } + + return status; + } + + public static ConnectorOperator toConnectorOperator(Operator operator) { + if (operator == null) { + return null; + } + + return new ConnectorOperator() + .id(operator.getId()) + .type(operator.getType()) + .version(operator.getVersion()); + } + + public static MetaV1Condition toMetaV1Condition(Condition condition) { + return new MetaV1Condition() + .type(condition.getType()) + .status(condition.getStatus()) + .message(condition.getMessage()) + .reason(condition.getReason()) + .lastTransitionTime(condition.getLastTransitionTime()); + } +} diff --git a/cos-fleetshard-sync/src/main/java/org/bf2/cos/fleetshard/sync/resources/ProcessorStatusSync.java b/cos-fleetshard-sync/src/main/java/org/bf2/cos/fleetshard/sync/resources/ProcessorStatusSync.java new file mode 100644 index 00000000..75b43a93 --- /dev/null +++ b/cos-fleetshard-sync/src/main/java/org/bf2/cos/fleetshard/sync/resources/ProcessorStatusSync.java @@ -0,0 +1,147 @@ +package org.bf2.cos.fleetshard.sync.resources; + +import java.time.Duration; +import java.time.Instant; +import java.time.temporal.Temporal; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import javax.enterprise.context.ApplicationScoped; +import javax.inject.Inject; + +import org.bf2.cos.fleetshard.api.ManagedProcessor; +import org.bf2.cos.fleetshard.support.Service; +import org.bf2.cos.fleetshard.support.metrics.StaticMetricsRecorder; +import org.bf2.cos.fleetshard.support.resources.NamespacedName; +import org.bf2.cos.fleetshard.sync.FleetShardSyncConfig; +import org.bf2.cos.fleetshard.sync.FleetShardSyncScheduler; +import org.bf2.cos.fleetshard.sync.client.FleetShardClient; +import org.bf2.cos.fleetshard.sync.metrics.MetricsID; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.fabric8.kubernetes.client.informers.ResourceEventHandler; +import io.micrometer.core.instrument.Counter; + +@ApplicationScoped +public class ProcessorStatusSync implements Service { + private static final Logger LOGGER = LoggerFactory.getLogger(ProcessorStatusSync.class); + + public static final String JOB_ID = "cos.processors.status.sync"; + public static final String METRICS_SYNC = "processors.status.sync"; + public static final String METRICS_UPDATE = "processors.status.update"; + + @Inject + ProcessorStatusUpdater updater; + @Inject + FleetShardClient connectorClient; + @Inject + FleetShardSyncConfig config; + @Inject + FleetShardSyncScheduler scheduler; + + @Inject + @MetricsID(METRICS_SYNC) + StaticMetricsRecorder syncRecorder; + @Inject + @MetricsID(METRICS_SYNC + ".total") + Counter syncTotalRecorder; + + @Inject + @MetricsID(METRICS_UPDATE) + StaticMetricsRecorder updateRecorder; + @Inject + @MetricsID(METRICS_UPDATE + ".total") + Counter updateTotalRecorder; + + private volatile Instant lastResync; + private volatile Instant lastUpdate; + + private final ConcurrentMap processors = new ConcurrentHashMap<>(); + + @Override + public void start() throws Exception { + LOGGER.info("Starting processor status sync"); + + connectorClient.watchProcessors(new ResourceEventHandler<>() { + @Override + public void onAdd(ManagedProcessor connector) { + processors.put(NamespacedName.of(connector), Instant.now()); + } + + @Override + public void onUpdate(ManagedProcessor ignored, ManagedProcessor connector) { + processors.put(NamespacedName.of(connector), Instant.now()); + } + + @Override + public void onDelete(ManagedProcessor connector, boolean deletedFinalStateUnknown) { + processors.remove(NamespacedName.of(connector)); + } + }); + + scheduler.schedule( + JOB_ID, + ConnectorStatusSyncJob.class, + config.resources().updateInterval()); + } + + @Override + public void stop() { + scheduler.shutdownQuietly(JOB_ID); + } + + public void run() { + final Duration resyncInterval = config.resources().resyncInterval(); + final Instant now = Instant.now(); + final boolean resync = lastResync == null || greater(lastResync, now, resyncInterval); + + if (resync) { + syncRecorder.record(this::sync); + lastResync = now; + } else { + updateRecorder.record(this::update); + } + + lastUpdate = now; + } + + private void sync() { + int count = 0; + + try { + for (ManagedProcessor processor : connectorClient.getAllProcessors()) { + updater.update(processor); + + count++; + } + } finally { + if (count > 0) { + syncTotalRecorder.increment(count); + } + } + } + + private void update() { + int count = 0; + + try { + for (Map.Entry entry : processors.entrySet()) { + if (entry.getValue().isAfter(lastUpdate)) { + connectorClient.getProcessor(entry.getKey()).ifPresent(updater::update); + + count++; + } + } + } finally { + if (count > 0) { + updateTotalRecorder.increment(count); + } + } + } + + private static boolean greater(Temporal startInclusive, Temporal endExclusive, Duration interval) { + return Duration.between(startInclusive, endExclusive).compareTo(interval) >= 0; + } +} diff --git a/cos-fleetshard-sync/src/main/java/org/bf2/cos/fleetshard/sync/resources/ProcessorStatusSyncJob.java b/cos-fleetshard-sync/src/main/java/org/bf2/cos/fleetshard/sync/resources/ProcessorStatusSyncJob.java new file mode 100644 index 00000000..25687827 --- /dev/null +++ b/cos-fleetshard-sync/src/main/java/org/bf2/cos/fleetshard/sync/resources/ProcessorStatusSyncJob.java @@ -0,0 +1,18 @@ +package org.bf2.cos.fleetshard.sync.resources; + +import javax.inject.Inject; + +import org.quartz.DisallowConcurrentExecution; +import org.quartz.Job; +import org.quartz.JobExecutionContext; + +@DisallowConcurrentExecution +public class ProcessorStatusSyncJob implements Job { + @Inject + ProcessorStatusSync sync; + + @Override + public void execute(JobExecutionContext context) { + sync.run(); + } +} diff --git a/cos-fleetshard-sync/src/main/java/org/bf2/cos/fleetshard/sync/resources/ProcessorStatusUpdater.java b/cos-fleetshard-sync/src/main/java/org/bf2/cos/fleetshard/sync/resources/ProcessorStatusUpdater.java new file mode 100644 index 00000000..4f1ceebd --- /dev/null +++ b/cos-fleetshard-sync/src/main/java/org/bf2/cos/fleetshard/sync/resources/ProcessorStatusUpdater.java @@ -0,0 +1,48 @@ +package org.bf2.cos.fleetshard.sync.resources; + +import javax.enterprise.context.ApplicationScoped; +import javax.inject.Inject; + +import org.bf2.cos.fleet.manager.model.ProcessorDeploymentStatus; +import org.bf2.cos.fleetshard.api.ManagedProcessor; +import org.bf2.cos.fleetshard.sync.client.FleetManagerClient; +import org.bf2.cos.fleetshard.sync.client.FleetManagerClientException; +import org.bf2.cos.fleetshard.sync.client.FleetShardClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@ApplicationScoped +public class ProcessorStatusUpdater { + + private static final Logger LOGGER = LoggerFactory.getLogger(ProcessorStatusUpdater.class); + @Inject + FleetManagerClient fleetManagerClient; + @Inject + FleetShardClient processorClient; + + public void update(ManagedProcessor processor) { + LOGGER.debug("Update processor status (name: {}, phase: {})", + processor.getMetadata().getName(), + processor.getStatus().getPhase()); + + try { + ProcessorDeploymentStatus processorDeploymentStatus = ProcessorStatusExtractor.extract(processor); + + fleetManagerClient.updateProcessorStatus(processor.getSpec().getClusterId(), + processor.getSpec().getDeploymentId(), + processorDeploymentStatus); + + } catch (FleetManagerClientException e) { + if (e.getStatusCode() == 410) { + LOGGER.info("Processor " + processor.getMetadata().getName() + " does not exists anymore, deleting it"); + if (processorClient.deleteProcessor(processor)) { + LOGGER.info("Processor " + processor.getMetadata().getName() + " deleted"); + } + } else { + LOGGER.warn("Error updating status of processor " + processor.getMetadata().getName(), e); + } + } catch (Exception e) { + LOGGER.warn("Error updating status of processor " + processor.getMetadata().getName(), e); + } + } +} diff --git a/cos-fleetshard-sync/src/main/java/org/bf2/cos/fleetshard/sync/resources/ResourcePoll.java b/cos-fleetshard-sync/src/main/java/org/bf2/cos/fleetshard/sync/resources/ResourcePoll.java index d843bbd6..fa5719bb 100644 --- a/cos-fleetshard-sync/src/main/java/org/bf2/cos/fleetshard/sync/resources/ResourcePoll.java +++ b/cos-fleetshard-sync/src/main/java/org/bf2/cos/fleetshard/sync/resources/ResourcePoll.java @@ -31,6 +31,8 @@ public class ResourcePoll implements Service { @Inject ConnectorDeploymentProvisioner connectorsProvisioner; @Inject + ProcessorDeploymentProvisioner processorDeploymentProvisioner; + @Inject ConnectorNamespaceProvisioner namespaceProvisioner; @Inject @@ -76,6 +78,7 @@ public void run() { private void sync() { namespaceProvisioner.poll(BEGINNING); connectorsProvisioner.poll(BEGINNING); + processorDeploymentProvisioner.poll(BEGINNING); } private void poll() { @@ -83,5 +86,7 @@ private void poll() { connectorClient.getMaxNamespaceResourceRevision()); connectorsProvisioner.poll( connectorClient.getMaxDeploymentResourceRevision()); + processorDeploymentProvisioner.poll( + connectorClient.getProcessorMaxDeploymentResourceRevision()); } } diff --git a/cos-fleetshard-sync/src/test/java/org/bf2/cos/fleetshard/sync/connector/ConnectorTestSupport.java b/cos-fleetshard-sync/src/test/java/org/bf2/cos/fleetshard/sync/connector/ConnectorTestSupport.java index 5b1977e7..e0581dbb 100644 --- a/cos-fleetshard-sync/src/test/java/org/bf2/cos/fleetshard/sync/connector/ConnectorTestSupport.java +++ b/cos-fleetshard-sync/src/test/java/org/bf2/cos/fleetshard/sync/connector/ConnectorTestSupport.java @@ -86,8 +86,8 @@ public static ManagedConnectorCluster createCluster() { .build(); } - public static ConnectorDeployment createDeployment(long deploymentRevision) { - return createDeployment( + public static ConnectorDeployment createConnectorDeployment(long deploymentRevision) { + return createConnectorDeployment( deploymentRevision, () -> { ObjectNode answer = Serialization.jsonMapper().createObjectNode(); @@ -110,13 +110,14 @@ public static ConnectorDeployment createDeployment(long deploymentRevision) { }); } - public static ConnectorDeployment createDeployment(long deploymentRevision, Consumer customizer) { - ConnectorDeployment answer = createDeployment(deploymentRevision); + public static ConnectorDeployment createConnectorDeployment(long deploymentRevision, + Consumer customizer) { + ConnectorDeployment answer = createConnectorDeployment(deploymentRevision); customizer.accept(answer); return answer; } - public static ConnectorDeployment createDeployment( + public static ConnectorDeployment createConnectorDeployment( long deploymentRevision, Supplier connectorSpec, Supplier connectorMeta) { diff --git a/cos-fleetshard-sync/src/test/java/org/bf2/cos/fleetshard/sync/processor/ProcessorTestSupport.java b/cos-fleetshard-sync/src/test/java/org/bf2/cos/fleetshard/sync/processor/ProcessorTestSupport.java new file mode 100644 index 00000000..091b1b95 --- /dev/null +++ b/cos-fleetshard-sync/src/test/java/org/bf2/cos/fleetshard/sync/processor/ProcessorTestSupport.java @@ -0,0 +1,275 @@ +package org.bf2.cos.fleetshard.sync.processor; + +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.UUID; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +import org.bf2.cos.fleet.manager.model.ConnectorDeploymentAllOfMetadata; +import org.bf2.cos.fleet.manager.model.KafkaConnectionSettings; +import org.bf2.cos.fleet.manager.model.ProcessorDeployment; +import org.bf2.cos.fleet.manager.model.ProcessorDeploymentSpec; +import org.bf2.cos.fleet.manager.model.ProcessorDesiredState; +import org.bf2.cos.fleet.manager.model.ServiceAccount; +import org.bf2.cos.fleetshard.api.ManagedConnectorCluster; +import org.bf2.cos.fleetshard.api.ManagedConnectorClusterBuilder; +import org.bf2.cos.fleetshard.api.ManagedConnectorClusterSpecBuilder; +import org.bf2.cos.fleetshard.api.ManagedProcessor; +import org.bf2.cos.fleetshard.support.resources.Clusters; +import org.bf2.cos.fleetshard.support.resources.Processors; +import org.bf2.cos.fleetshard.support.resources.Resources; +import org.bf2.cos.fleetshard.support.resources.Secrets; +import org.bf2.cos.fleetshard.sync.FleetShardSyncConfig; +import org.bf2.cos.fleetshard.sync.client.FleetManagerClient; +import org.bf2.cos.fleetshard.sync.client.FleetShardClient; +import org.mockito.Mockito; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.JsonNode; + +import io.fabric8.kubernetes.api.model.ObjectMetaBuilder; +import io.fabric8.kubernetes.api.model.Secret; +import io.fabric8.kubernetes.client.utils.Serialization; + +import static org.bf2.cos.fleetshard.support.resources.Resources.uid; +import static org.bf2.cos.fleetshard.support.resources.Secrets.toBase64; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.when; + +public final class ProcessorTestSupport { + + private ProcessorTestSupport() { + } + + public static Optional lookupProcessor( + Collection processors, + String clusterId, + ProcessorDeployment deployment) { + + return processors.stream().filter( + entry -> { + return Objects.equals(Processors.generateProcessorId(deployment.getId()), entry.getMetadata().getName()); + }).findFirst(); + } + + public static Optional lookupSecret( + Collection secrets, + String clusterId, + ProcessorDeployment deployment) { + + return secrets.stream().filter( + entry -> { + return Objects.equals(Secrets.generateProcessorSecretId(deployment.getId()), entry.getMetadata().getName()); + }).findFirst(); + } + + public static ManagedConnectorCluster createCluster() { + final String clusterId = uid(); + + return new ManagedConnectorClusterBuilder() + .withMetadata(new ObjectMetaBuilder() + .withName(Clusters.CONNECTOR_CLUSTER_PREFIX + "-" + clusterId) + .addToLabels(Resources.LABEL_CLUSTER_ID, clusterId) + .build()) + .withSpec(new ManagedConnectorClusterSpecBuilder() + .withClusterId(clusterId) + .build()) + .build(); + } + + public static ProcessorDeployment createProcessorDeployment(long deploymentRevision) { + return createProcessorDeployment( + deploymentRevision, + () -> { + CamelConnectorMeta answer = new CamelConnectorMeta( + "sink", + "quay.io/mcs_dev/aws-s3-sink:0.0.1", + CamelConnectorOperator.of( + "camel-connector-operator", + "[1.0.0,2.0.0)"), + Map.of( + "kafka", "managed-kafka-source")); + + return Serialization.jsonMapper().convertValue(answer, JsonNode.class); + }); + } + + public static ProcessorDeployment createProcessorDeployment(long deploymentRevision, + Consumer customizer) { + ProcessorDeployment answer = createProcessorDeployment(deploymentRevision); + customizer.accept(answer); + return answer; + } + + public static ProcessorDeployment createProcessorDeployment( + long deploymentRevision, + Supplier connectorMeta) { + + final String namespaceId = "nid"; + final String deploymentId = "did"; + final String processorId = "pid"; + + return new ProcessorDeployment() + .kind("ProcessorDeployment") + .id(deploymentId) + .metadata(new ConnectorDeploymentAllOfMetadata() + .resourceVersion(deploymentRevision)) + .spec(new ProcessorDeploymentSpec() + .namespaceId(namespaceId) + .processorId(processorId) + .processorResourceVersion(1L) + .kafka(new KafkaConnectionSettings() + .url("kafka.acme.com:2181")) + .serviceAccount(new ServiceAccount() + .clientId(UUID.randomUUID().toString()) + .clientSecret(toBase64(UUID.randomUUID().toString()))) + .shardMetadata(connectorMeta.get()) + .desiredState(ProcessorDesiredState.READY)); + } + + public static FleetShardClient fleetShard( + String clusterId, + Collection processors, + Collection secrets) { + + Map allProcessors = processors.stream() + .collect(Collectors.toMap(e -> e.getMetadata().getName(), Function.identity())); + Map allSecrets = secrets.stream() + .collect(Collectors.toMap(e -> e.getMetadata().getName(), Function.identity())); + + FleetShardClient answer = Mockito.mock(FleetShardClient.class); + + when(answer.getClusterId()) + .thenAnswer(invocation -> clusterId); + + when(answer.getProcessor(any(ProcessorDeployment.class))) + .thenAnswer(invocation -> { + return lookupProcessor(allProcessors.values(), clusterId, invocation.getArgument(0)); + }); + + when(answer.getSecret(any(ProcessorDeployment.class))) + .thenAnswer(invocation -> { + return lookupSecret(allSecrets.values(), clusterId, invocation.getArgument(0)); + }); + + when(answer.createProcessor(any(ManagedProcessor.class))) + .thenAnswer(invocation -> { + var arg = invocation.getArgument(0, ManagedProcessor.class); + allProcessors.put(arg.getMetadata().getName(), arg); + return arg; + }); + when(answer.createSecret(any(Secret.class))) + .thenAnswer(invocation -> { + var arg = invocation.getArgument(0, Secret.class); + allSecrets.put(arg.getMetadata().getName(), arg); + return arg; + }); + + when(answer.getOrCreateManagedConnectorCluster()) + .thenAnswer(invocation -> { + return new ManagedConnectorClusterBuilder() + .withMetadata(new ObjectMetaBuilder() + .withName(Clusters.CONNECTOR_CLUSTER_PREFIX + "-" + clusterId) + .addToLabels(Resources.LABEL_CLUSTER_ID, clusterId) + .build()) + .withSpec(new ManagedConnectorClusterSpecBuilder() + .withClusterId(clusterId) + .build()) + .build(); + }); + + return answer; + } + + public static FleetManagerClient fleetManagerClient() { + FleetManagerClient answer = Mockito.mock(FleetManagerClient.class); + return answer; + } + + public static FleetShardSyncConfig config() { + FleetShardSyncConfig answer = Mockito.mock(FleetShardSyncConfig.class); + when(answer.processors()).thenAnswer(invocation -> { + var processors = Mockito.mock(FleetShardSyncConfig.Processors.class); + when(processors.annotations()).thenReturn(Collections.emptyMap()); + when(processors.labels()).thenReturn(Collections.emptyMap()); + return processors; + }); + when(answer.imagePullSecretsName()).thenAnswer(invocation -> { + return "foo"; + }); + when(answer.namespace()).thenAnswer(invocation -> { + return "bar"; + }); + when(answer.metrics()).thenAnswer(invocation -> { + var metrics = Mockito.mock(FleetShardSyncConfig.Metrics.class); + when(metrics.baseName()).thenReturn("base"); + return metrics; + }); + when(answer.quota()).thenAnswer(invocation -> { + return Mockito.mock(FleetShardSyncConfig.Quota.class); + }); + when(answer.quota().defaultLimits()).thenAnswer(invocation -> { + return Mockito.mock(FleetShardSyncConfig.DefaultLimits.class); + }); + when(answer.quota().defaultRequest()).thenAnswer(invocation -> { + return Mockito.mock(FleetShardSyncConfig.DefaultRequest.class); + }); + + return answer; + } + + public static class CamelConnectorMeta { + + @JsonProperty("connector_type") + String connectorType; + + @JsonProperty("connector_image") + String connectorImage; + + @JsonProperty("operators") + @JsonInclude(JsonInclude.Include.NON_EMPTY) + List operators; + + @JsonProperty("kamelets") + @JsonInclude(JsonInclude.Include.NON_EMPTY) + Map kamelets; + + public CamelConnectorMeta( + String connectorType, + String connectorImage, + CamelConnectorOperator operator, + Map kamelets) { + + this.connectorType = connectorType; + this.connectorImage = connectorImage; + this.operators = List.of(operator); + this.kamelets = kamelets; + } + } + + public static class CamelConnectorOperator { + + @JsonProperty("type") + String type; + + @JsonProperty("version") + String version; + + public CamelConnectorOperator(String type, String version) { + this.type = type; + this.version = version; + } + + public static CamelConnectorOperator of(String type, String version) { + return new CamelConnectorOperator(type, version); + } + } +} diff --git a/cos-fleetshard-sync/src/test/java/org/bf2/cos/fleetshard/sync/resources/ConnectorProvisionerTest.java b/cos-fleetshard-sync/src/test/java/org/bf2/cos/fleetshard/sync/resources/ConnectorProvisionerTest.java index 9f2df8ad..db9bbd08 100644 --- a/cos-fleetshard-sync/src/test/java/org/bf2/cos/fleetshard/sync/resources/ConnectorProvisionerTest.java +++ b/cos-fleetshard-sync/src/test/java/org/bf2/cos/fleetshard/sync/resources/ConnectorProvisionerTest.java @@ -30,7 +30,7 @@ import static org.bf2.cos.fleetshard.support.resources.Resources.LABEL_DEPLOYMENT_ID; import static org.bf2.cos.fleetshard.support.resources.Resources.LABEL_DEPLOYMENT_RESOURCE_VERSION; import static org.bf2.cos.fleetshard.support.resources.Resources.LABEL_UOW; -import static org.bf2.cos.fleetshard.sync.connector.ConnectorTestSupport.createDeployment; +import static org.bf2.cos.fleetshard.sync.connector.ConnectorTestSupport.createConnectorDeployment; import static org.mockito.Mockito.verify; public class ConnectorProvisionerTest { @@ -41,7 +41,7 @@ void createResources() { // // Given that no resources associated to the provided deployment exist // - final ConnectorDeployment deployment = createDeployment(0); + final ConnectorDeployment deployment = ConnectorTestSupport.createConnectorDeployment(0); final List connectors = List.of(); final List secrets = List.of(); @@ -132,7 +132,7 @@ void updateResources() { // // Given that the resources associated to the provided deployment exist // - final ConnectorDeployment oldDeployment = createDeployment(0); + final ConnectorDeployment oldDeployment = ConnectorTestSupport.createConnectorDeployment(0); final List connectors = List.of( new ManagedConnectorBuilder() @@ -168,7 +168,7 @@ void updateResources() { // // When deployment is updated // - final ConnectorDeployment newDeployment = createDeployment(0, d -> { + final ConnectorDeployment newDeployment = ConnectorTestSupport.createConnectorDeployment(0, d -> { d.getSpec().getKafka().setUrl("my-kafka.acme.com:218"); ((ObjectNode) d.getSpec().getConnectorSpec()).withObject("/connector").put("foo", "connector-baz"); ((ObjectNode) d.getSpec().getShardMetadata()).put("connector_image", "quay.io/mcs_dev/aws-s3-sink:0.1.0"); @@ -251,7 +251,7 @@ void updateAndCreateResources() { // // Given that the resources associated to the provided deployment exist // - final ConnectorDeployment oldDeployment = createDeployment(0); + final ConnectorDeployment oldDeployment = ConnectorTestSupport.createConnectorDeployment(0); final List connectors = List.of( new ManagedConnectorBuilder() @@ -287,7 +287,7 @@ void updateAndCreateResources() { // // When a change to the deployment happen that ends up with a new resource version // - final ConnectorDeployment newDeployment = createDeployment(0, d -> { + final ConnectorDeployment newDeployment = ConnectorTestSupport.createConnectorDeployment(0, d -> { d.getMetadata().setResourceVersion(1L); d.getSpec().getKafka().setUrl("my-kafka.acme.com:218"); ((ObjectNode) d.getSpec().getConnectorSpec()).withObject("/connector").put("foo", "connector-baz"); diff --git a/cos-fleetshard-sync/src/test/java/org/bf2/cos/fleetshard/sync/resources/ProcessorProvisionerTest.java b/cos-fleetshard-sync/src/test/java/org/bf2/cos/fleetshard/sync/resources/ProcessorProvisionerTest.java new file mode 100644 index 00000000..657e6347 --- /dev/null +++ b/cos-fleetshard-sync/src/test/java/org/bf2/cos/fleetshard/sync/resources/ProcessorProvisionerTest.java @@ -0,0 +1,293 @@ +package org.bf2.cos.fleetshard.sync.resources; + +import java.util.List; +import java.util.UUID; + +import org.bf2.cos.fleet.manager.model.ProcessorDeployment; +import org.bf2.cos.fleet.manager.model.ServiceAccount; +import org.bf2.cos.fleetshard.api.ManagedProcessor; +import org.bf2.cos.fleetshard.api.ManagedProcessorBuilder; +import org.bf2.cos.fleetshard.api.ManagedProcessorSpec; +import org.bf2.cos.fleetshard.support.client.EventClient; +import org.bf2.cos.fleetshard.support.metrics.MetricsRecorder; +import org.bf2.cos.fleetshard.support.resources.Processors; +import org.bf2.cos.fleetshard.support.resources.Secrets; +import org.bf2.cos.fleetshard.sync.processor.ProcessorTestSupport; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; + +import io.fabric8.kubernetes.api.model.ObjectMetaBuilder; +import io.fabric8.kubernetes.api.model.Secret; +import io.fabric8.kubernetes.api.model.SecretBuilder; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.bf2.cos.fleetshard.support.resources.Resources.LABEL_CLUSTER_ID; +import static org.bf2.cos.fleetshard.support.resources.Resources.LABEL_DEPLOYMENT_ID; +import static org.bf2.cos.fleetshard.support.resources.Resources.LABEL_DEPLOYMENT_RESOURCE_VERSION; +import static org.bf2.cos.fleetshard.support.resources.Resources.LABEL_PROCESSOR_ID; +import static org.bf2.cos.fleetshard.support.resources.Resources.LABEL_UOW; +import static org.bf2.cos.fleetshard.sync.processor.ProcessorTestSupport.createProcessorDeployment; +import static org.mockito.Mockito.verify; + +public class ProcessorProvisionerTest { + private static final String CLUSTER_ID = UUID.randomUUID().toString(); + + @Test + void createResources() { + // + // Given that no resources associated to the provided deployment exist + // + final ProcessorDeployment deployment = createProcessorDeployment(0); + + final List processors = List.of(); + final List secrets = List.of(); + + final ArgumentCaptor sc = ArgumentCaptor.forClass(Secret.class); + final ArgumentCaptor mcc = ArgumentCaptor.forClass(ManagedProcessor.class); + + final ProcessorDeploymentProvisioner provisioner = new ProcessorDeploymentProvisioner(); + provisioner.config = ProcessorTestSupport.config(); + + provisioner.fleetShard = ProcessorTestSupport.fleetShard(CLUSTER_ID, processors, secrets); + provisioner.fleetManager = ProcessorTestSupport.fleetManagerClient(); + provisioner.eventClient = Mockito.mock(EventClient.class); + provisioner.recorder = Mockito.mock(MetricsRecorder.class); + + // + // When deployment is applied + // + provisioner.provision(deployment); + + verify(provisioner.fleetShard).createSecret(sc.capture()); + verify(provisioner.fleetShard).createProcessor(mcc.capture()); + + // + // Then resources must be created according to the deployment + // + assertThat(sc.getValue()).satisfies(val -> { + assertThat(val.getMetadata().getName()) + .isEqualTo(Secrets.generateProcessorSecretId(deployment.getId())); + + assertThat(val.getMetadata().getLabels()) + .containsEntry(LABEL_CLUSTER_ID, CLUSTER_ID) + .containsEntry(LABEL_PROCESSOR_ID, deployment.getSpec().getProcessorId()) + .containsEntry(LABEL_DEPLOYMENT_ID, deployment.getId()) + .containsEntry(LABEL_DEPLOYMENT_RESOURCE_VERSION, "" + deployment.getMetadata().getResourceVersion()) + .containsKey(LABEL_UOW); + + }); + + assertThat(mcc.getValue()).satisfies(val -> { + assertThat(val.getMetadata().getName()) + .isEqualTo(Processors.generateProcessorId(deployment.getId())); + + assertThat(val.getMetadata().getLabels()) + .containsEntry(LABEL_CLUSTER_ID, CLUSTER_ID) + .containsEntry(LABEL_PROCESSOR_ID, deployment.getSpec().getProcessorId()) + .containsEntry(LABEL_DEPLOYMENT_ID, deployment.getId()) + .containsKey(LABEL_UOW); + + assertThat(val.getSpec()).satisfies(d -> { + assertThat(d.getSecret()).isEqualTo(sc.getValue().getMetadata().getName()); + assertThat(d.getKafka().getUrl()) + .isNotEmpty() + .isEqualTo(deployment.getSpec().getKafka().getUrl()); + }); + }); + } + + @Test + void updateResources() { + // + // Given that the resources associated to the provided deployment exist + // + final ProcessorDeployment oldDeployment = ProcessorTestSupport.createProcessorDeployment(0); + + final List processors = List.of( + new ManagedProcessorBuilder() + .withMetadata(new ObjectMetaBuilder() + .withName(Processors.generateProcessorId(oldDeployment.getId())) + .addToLabels(LABEL_CLUSTER_ID, CLUSTER_ID) + .addToLabels(LABEL_PROCESSOR_ID, oldDeployment.getSpec().getProcessorId()) + .addToLabels(LABEL_DEPLOYMENT_ID, oldDeployment.getId()) + .build()) + .withSpec(new ManagedProcessorSpec()) + .build()); + final List secrets = List.of( + new SecretBuilder() + .withMetadata(new ObjectMetaBuilder() + .withName(Secrets.generateProcessorSecretId(oldDeployment.getId())) + .addToLabels(LABEL_CLUSTER_ID, CLUSTER_ID) + .addToLabels(LABEL_PROCESSOR_ID, oldDeployment.getSpec().getProcessorId()) + .addToLabels(LABEL_DEPLOYMENT_ID, oldDeployment.getId()) + .addToLabels(LABEL_DEPLOYMENT_RESOURCE_VERSION, "" + oldDeployment.getMetadata().getResourceVersion()) + .build()) + .build()); + + final ProcessorDeploymentProvisioner provisioner = new ProcessorDeploymentProvisioner(); + provisioner.config = ProcessorTestSupport.config(); + provisioner.fleetShard = ProcessorTestSupport.fleetShard(CLUSTER_ID, processors, secrets); + provisioner.fleetManager = ProcessorTestSupport.fleetManagerClient(); + provisioner.eventClient = Mockito.mock(EventClient.class); + provisioner.recorder = Mockito.mock(MetricsRecorder.class); + + final ArgumentCaptor sc = ArgumentCaptor.forClass(Secret.class); + final ArgumentCaptor mcc = ArgumentCaptor.forClass(ManagedProcessor.class); + + // + // When deployment is updated + // + final ProcessorDeployment newDeployment = ProcessorTestSupport.createProcessorDeployment(0, d -> { + d.getSpec().getKafka().setUrl("my-kafka.acme.com:218"); + // TODO what is this in processsors? + // ((ObjectNode) d.getSpec()).withObject("/connector").put("foo", "connector-baz"); + }); + + provisioner.provision(newDeployment); + + verify(provisioner.fleetShard).createSecret(sc.capture()); + verify(provisioner.fleetShard).createProcessor(mcc.capture()); + + // + // Then the existing resources must be updated to reflect the changes made to the + // deployment. This scenario could happen when a resource on the connector cluster + // is amended outside the control of fleet manager (i.e. with kubectl) and in such + // case, the expected behavior is that the resource is re-set to the configuration + // from the fleet manager. + // + assertThat(sc.getValue()).satisfies(val -> { + assertThat(val.getMetadata().getName()) + .isEqualTo(Secrets.generateProcessorSecretId(oldDeployment.getId())); + + assertThat(val.getMetadata().getLabels()) + .containsEntry(LABEL_CLUSTER_ID, CLUSTER_ID) + .containsEntry(LABEL_PROCESSOR_ID, newDeployment.getSpec().getProcessorId()) + .containsEntry(LABEL_DEPLOYMENT_ID, newDeployment.getId()) + .containsEntry(LABEL_DEPLOYMENT_RESOURCE_VERSION, "" + newDeployment.getMetadata().getResourceVersion()) + .containsKey(LABEL_UOW); + + var serviceAccountNode = Secrets.extract(val, Secrets.SECRET_ENTRY_SERVICE_ACCOUNT, ServiceAccount.class); + assertThat(serviceAccountNode.getClientSecret()) + .isEqualTo(newDeployment.getSpec().getServiceAccount().getClientSecret()); + assertThat(serviceAccountNode.getClientId()) + .isEqualTo(newDeployment.getSpec().getServiceAccount().getClientId()); + }); + + assertThat(mcc.getValue()).satisfies(val -> { + assertThat(val.getMetadata().getName()) + .isEqualTo(Processors.generateProcessorId(oldDeployment.getId())); + + assertThat(val.getMetadata().getLabels()) + .containsEntry(LABEL_CLUSTER_ID, CLUSTER_ID) + .containsEntry(LABEL_PROCESSOR_ID, oldDeployment.getSpec().getProcessorId()) + .containsEntry(LABEL_DEPLOYMENT_ID, oldDeployment.getId()) + .containsKey(LABEL_UOW); + + assertThat(val.getSpec()).satisfies(d -> { + assertThat(d.getDeploymentResourceVersion()).isEqualTo(oldDeployment.getMetadata().getResourceVersion()); + assertThat(d.getDeploymentResourceVersion()).isEqualTo(newDeployment.getMetadata().getResourceVersion()); + assertThat(d.getSecret()).isEqualTo(sc.getValue().getMetadata().getName()); + assertThat(d.getKafka().getUrl()) + .isNotEmpty() + .isEqualTo(newDeployment.getSpec().getKafka().getUrl()); + }); + }); + } + + @Test + void updateAndCreateResources() { + // + // Given that the resources associated to the provided deployment exist + // + final ProcessorDeployment oldDeployment = ProcessorTestSupport.createProcessorDeployment(0); + + final List processors = List.of( + new ManagedProcessorBuilder() + .withMetadata(new ObjectMetaBuilder() + .withName(Processors.generateProcessorId(oldDeployment.getId())) + .addToLabels(LABEL_CLUSTER_ID, CLUSTER_ID) + .addToLabels(LABEL_PROCESSOR_ID, oldDeployment.getSpec().getProcessorId()) + .addToLabels(LABEL_DEPLOYMENT_ID, oldDeployment.getId()) + .build()) + .withSpec(new ManagedProcessorSpec()) + .build()); + final List secrets = List.of( + new SecretBuilder() + .withMetadata(new ObjectMetaBuilder() + .withName(Secrets.generateProcessorSecretId(oldDeployment.getId())) + .addToLabels(LABEL_CLUSTER_ID, CLUSTER_ID) + .addToLabels(LABEL_PROCESSOR_ID, oldDeployment.getSpec().getProcessorId()) + .addToLabels(LABEL_DEPLOYMENT_ID, oldDeployment.getId()) + .addToLabels(LABEL_DEPLOYMENT_RESOURCE_VERSION, "" + oldDeployment.getMetadata().getResourceVersion()) + .build()) + .build()); + + final ProcessorDeploymentProvisioner provisioner = new ProcessorDeploymentProvisioner(); + provisioner.config = ProcessorTestSupport.config(); + provisioner.fleetShard = ProcessorTestSupport.fleetShard(CLUSTER_ID, processors, secrets); + provisioner.fleetManager = ProcessorTestSupport.fleetManagerClient(); + provisioner.eventClient = Mockito.mock(EventClient.class); + provisioner.recorder = Mockito.mock(MetricsRecorder.class); + + final ArgumentCaptor sc = ArgumentCaptor.forClass(Secret.class); + final ArgumentCaptor processorArgumentCaptor = ArgumentCaptor.forClass(ManagedProcessor.class); + + // + // When a change to the deployment happen that ends up with a new resource version + // + final ProcessorDeployment newDeployment = ProcessorTestSupport.createProcessorDeployment(0, d -> { + d.getMetadata().setResourceVersion(1L); + d.getSpec().getKafka().setUrl("my-kafka.acme.com:218"); + // TODO what is this in processors? + // ((ObjectNode) d.getSpec()).withObject("/connector").put("foo", "connector-baz"); + }); + + provisioner.provision(newDeployment); + + verify(provisioner.fleetShard).createSecret(sc.capture()); + verify(provisioner.fleetShard).createProcessor(processorArgumentCaptor.capture()); + + // + // Then the managed connector resource is expected to be updated to reflect the + // changes made to the deployment + // + assertThat(sc.getValue()).satisfies(val -> { + assertThat(val.getMetadata().getName()) + .isEqualTo(Secrets.generateProcessorSecretId(oldDeployment.getId())); + + assertThat(val.getMetadata().getLabels()) + .containsEntry(LABEL_CLUSTER_ID, CLUSTER_ID) + .containsEntry(LABEL_PROCESSOR_ID, newDeployment.getSpec().getProcessorId()) + .containsEntry(LABEL_DEPLOYMENT_ID, newDeployment.getId()) + .containsEntry(LABEL_DEPLOYMENT_RESOURCE_VERSION, "" + newDeployment.getMetadata().getResourceVersion()) + .containsKey(LABEL_UOW); + + var serviceAccountNode = Secrets.extract(val, Secrets.SECRET_ENTRY_SERVICE_ACCOUNT, ServiceAccount.class); + assertThat(serviceAccountNode.getClientSecret()) + .isEqualTo(newDeployment.getSpec().getServiceAccount().getClientSecret()); + assertThat(serviceAccountNode.getClientId()) + .isEqualTo(newDeployment.getSpec().getServiceAccount().getClientId()); + }); + + assertThat(processorArgumentCaptor.getValue()).satisfies(val -> { + assertThat(val.getMetadata().getName()) + .isEqualTo(Processors.generateProcessorId(oldDeployment.getId())); + + assertThat(val.getMetadata().getLabels()) + .containsEntry(LABEL_CLUSTER_ID, CLUSTER_ID) + .containsEntry(LABEL_PROCESSOR_ID, oldDeployment.getSpec().getProcessorId()) + .containsEntry(LABEL_DEPLOYMENT_ID, oldDeployment.getId()) + .containsKey(LABEL_UOW); + + assertThat(val.getSpec()).satisfies(d -> { + assertThat(d.getDeploymentResourceVersion()).isEqualTo(newDeployment.getMetadata().getResourceVersion()); + assertThat(d.getDeploymentResourceVersion()).isNotEqualTo(oldDeployment.getMetadata().getResourceVersion()); + assertThat(d.getSecret()).isEqualTo(sc.getValue().getMetadata().getName()); + assertThat(d.getKafka().getUrl()) + .isNotEmpty() + .isEqualTo(newDeployment.getSpec().getKafka().getUrl()); + }); + }); + } +} diff --git a/etc/kubernetes/manifests/base/apps/cos-fleetshard-sync/crds/kustomization.yaml b/etc/kubernetes/manifests/base/apps/cos-fleetshard-sync/crds/kustomization.yaml index 7807ef32..2f86155e 100644 --- a/etc/kubernetes/manifests/base/apps/cos-fleetshard-sync/crds/kustomization.yaml +++ b/etc/kubernetes/manifests/base/apps/cos-fleetshard-sync/crds/kustomization.yaml @@ -3,3 +3,4 @@ resources: - ./managedconnectorclusters.cos.bf2.org-v1.yml - ./managedconnectoroperators.cos.bf2.org-v1.yml - ./managedconnectors.cos.bf2.org-v1.yml + - ./managedprocessors.cos.bf2.org-v1.yml