From b4e4338cbb340369c9716aab27d8bdd9a4001e99 Mon Sep 17 00:00:00 2001 From: Luca Molteni Date: Tue, 11 Apr 2023 13:53:09 +0200 Subject: [PATCH] Processor updater --- .../sync/it/ProcessorProvisionerTest.java | 14 +- .../sync/client/FleetShardClient.java | 8 + .../resources/ProcessorStatusUpdater.java | 145 ++++-------------- 3 files changed, 41 insertions(+), 126 deletions(-) diff --git a/cos-fleetshard-sync-it/src/test/java/org/bf2/cos/fleetshard/sync/it/ProcessorProvisionerTest.java b/cos-fleetshard-sync-it/src/test/java/org/bf2/cos/fleetshard/sync/it/ProcessorProvisionerTest.java index 73b51388..abde1fa8 100644 --- a/cos-fleetshard-sync-it/src/test/java/org/bf2/cos/fleetshard/sync/it/ProcessorProvisionerTest.java +++ b/cos-fleetshard-sync-it/src/test/java/org/bf2/cos/fleetshard/sync/it/ProcessorProvisionerTest.java @@ -86,7 +86,7 @@ void processorIsProvisioned() { "1", item.getMetadata().getLabels().get(Resources.LABEL_DEPLOYMENT_RESOURCE_VERSION))); - ManagedProcessor mc = until( + ManagedProcessor mp = until( () -> fleetShardClient.getProcessor(clusterId, DEPLOYMENT_ID), item -> { return item.getSpec().getDeploymentResourceVersion() == 1L @@ -95,7 +95,7 @@ void processorIsProvisioned() { assertThat(s1).satisfies(item -> { assertThat(item.getMetadata().getName()) - .isEqualTo(Secrets.generateProcessorSecretId(mc.getSpec().getDeploymentId())); + .isEqualTo(Secrets.generateProcessorSecretId(mp.getSpec().getDeploymentId())); assertThatJson(Secrets.extract(item, SECRET_ENTRY_SERVICE_ACCOUNT)) .isObject() @@ -125,14 +125,14 @@ void processorIsProvisioned() { .containsEntry("my.cos.bf2.org/processor-group", "baz"); }); - assertThat(mc.getMetadata().getName()).startsWith(Resources.PROCESSOR_PREFIX); - assertThat(mc.getSpec().getKafka().getUrl()).isEqualTo(KAFKA_URL); - assertThat(mc.getSpec().getSecret()).isEqualTo(s1.getMetadata().getName()); + assertThat(mp.getMetadata().getName()).startsWith(Resources.PROCESSOR_PREFIX); + assertThat(mp.getSpec().getKafka().getUrl()).isEqualTo(KAFKA_URL); + assertThat(mp.getSpec().getSecret()).isEqualTo(s1.getMetadata().getName()); - assertThat(mc.getMetadata().getLabels()) + assertThat(mp.getMetadata().getLabels()) .containsEntry("cos.bf2.org/organization-id", "20000000") .containsEntry("cos.bf2.org/pricing-tier", "essential"); - assertThat(mc.getMetadata().getAnnotations()) + assertThat(mp.getMetadata().getAnnotations()) .containsEntry("my.cos.bf2.org/processor-group", "baz"); } diff --git a/cos-fleetshard-sync/src/main/java/org/bf2/cos/fleetshard/sync/client/FleetShardClient.java b/cos-fleetshard-sync/src/main/java/org/bf2/cos/fleetshard/sync/client/FleetShardClient.java index 883beece..2205267a 100644 --- a/cos-fleetshard-sync/src/main/java/org/bf2/cos/fleetshard/sync/client/FleetShardClient.java +++ b/cos-fleetshard-sync/src/main/java/org/bf2/cos/fleetshard/sync/client/FleetShardClient.java @@ -260,6 +260,14 @@ public Boolean deleteConnector(ManagedConnector managedConnector) { .delete().isEmpty(); } + public Boolean deleteProcessor(ManagedProcessor managedProcessor) { + return !kubernetesClient.resources(ManagedProcessor.class) + .inNamespace(managedProcessor.getMetadata().getNamespace()) + .withName(managedProcessor.getMetadata().getName()) + .withPropagationPolicy(DeletionPropagation.FOREGROUND) + .delete().isEmpty(); + } + public Optional getProcessor(NamespacedName id) { if (processorsInformer == null) { throw new IllegalStateException("Informer must be started before adding handlers"); diff --git a/cos-fleetshard-sync/src/main/java/org/bf2/cos/fleetshard/sync/resources/ProcessorStatusUpdater.java b/cos-fleetshard-sync/src/main/java/org/bf2/cos/fleetshard/sync/resources/ProcessorStatusUpdater.java index bd843d82..0cba6051 100644 --- a/cos-fleetshard-sync/src/main/java/org/bf2/cos/fleetshard/sync/resources/ProcessorStatusUpdater.java +++ b/cos-fleetshard-sync/src/main/java/org/bf2/cos/fleetshard/sync/resources/ProcessorStatusUpdater.java @@ -1,19 +1,10 @@ package org.bf2.cos.fleetshard.sync.resources; -import java.time.Instant; -import java.util.List; -import java.util.concurrent.atomic.AtomicInteger; - import javax.enterprise.context.ApplicationScoped; import javax.inject.Inject; -import io.micrometer.core.instrument.Counter; -import io.micrometer.core.instrument.Gauge; -import io.micrometer.core.instrument.MeterRegistry; -import io.micrometer.core.instrument.Tag; import org.bf2.cos.fleet.manager.model.ProcessorDeploymentStatus; import org.bf2.cos.fleetshard.api.ManagedProcessor; -import org.bf2.cos.fleetshard.support.metrics.MetricsSupport; import org.bf2.cos.fleetshard.sync.FleetShardSyncConfig; import org.bf2.cos.fleetshard.sync.client.FleetManagerClient; import org.bf2.cos.fleetshard.sync.client.FleetManagerClientException; @@ -23,126 +14,42 @@ @ApplicationScoped public class ProcessorStatusUpdater { - private static final Logger LOGGER = LoggerFactory.getLogger(ProcessorStatusUpdater.class); - - public static final String CONNECTOR_STATE = "processor.state"; - public static final String CONNECTOR_STATE_COUNT = "processor.state.count"; - - static final int CONNECTOR_STATE_READY = 1; - static final int CONNECTOR_STATE_FAILED = 2; - static final int CONNECTOR_STATE_DELETED = 3; - static final int CONNECTOR_STATE_STOPPED = 4; - static final int CONNECTOR_STATE_IN_PROCESS = 5; + private static final Logger LOGGER = LoggerFactory.getLogger(ProcessorStatusUpdater.class); @Inject FleetManagerClient fleetManagerClient; @Inject FleetShardClient connectorClient; @Inject - MeterRegistry registry; - @Inject FleetShardSyncConfig config; public void update(ManagedProcessor processor) { LOGGER.debug("Update processor status (name: {}, phase: {})", - processor.getMetadata().getName(), - processor.getStatus().getPhase()); - - // try { - // ProcessorDeploymentStatus ProcessorDeploymentStatus = Process.extract(processor); - - // fleetManagerClient.updateConnectorStatus(processor, ProcessorDeploymentStatus); - - // LOGGER.debug("Updating Connector status metrics (Connector_id: {}, state: {})", - // processor.get(), ProcessorDeploymentStatus.getPhase()); - // - // switch (ProcessorDeploymentStatus.getPhase()) { - // case READY: - // measure(processor, ProcessorDeploymentStatus, CONNECTOR_STATE_READY); - // break; - // case FAILED: - // measure(processor, ProcessorDeploymentStatus, CONNECTOR_STATE_FAILED); - // break; - // case DELETED: - // measure(processor, ProcessorDeploymentStatus, CONNECTOR_STATE_DELETED); - // break; - // case STOPPED: - // measure(processor, ProcessorDeploymentStatus, CONNECTOR_STATE_STOPPED); - // break; - // default: - // measure(processor, ProcessorDeploymentStatus, CONNECTOR_STATE_IN_PROCESS); - // break; - // } - - // } catch (FleetManagerClientException e) { - // if (e.getStatusCode() == 410) { - // LOGGER.info("Connector " + processor.getMetadata().getName() + " does not exists anymore, deleting it"); - // if (connectorClient.deleteConnector(processor)) { - // LOGGER.info("Connector " + processor.getMetadata().getName() + " deleted"); - // } - // } else { - // LOGGER.warn("Error updating status of connector " + processor.getMetadata().getName(), e); - // } - // } catch (Exception e) { - // LOGGER.warn("Error updating status of connector " + processor.getMetadata().getName(), e); - // } - } - - /* - * Expose a Gauge metric "cos_fleetshard_sync_connector_state" which reveals the current connector state. - * Metric value of 1 implies that the connector is in Ready state. Similarly, 2 -> Failed, 3 -> Deleted, - * 4 -> Stopped, 5 -> In Process - * Also exposing a Counter metrics "cos_fleetshard_sync_connector_state_count_total" which reveals each - * state count for the connector - */ - private void measure(ManagedProcessor processor, ProcessorDeploymentStatus ProcessorDeploymentStatus, int connectorState) { - - List tags = MetricsSupport.tags(config.metrics().recorder(), processor); - tags.add(Tag.of("cos.processor.id", processor.getSpec().getProcessorId())); - tags.add(Tag.of("cos.deployment.id", processor.getSpec().getDeploymentId())); - tags.add(Tag.of("cos.namespace", processor.getMetadata().getNamespace())); - - // String connectorResourceVersion = String.valueOf(processor.getSpec().getDeployment().getConnectorResourceVersion()); - - Gauge gauge = registry.find(config.metrics().baseName() + "." + CONNECTOR_STATE).tags(tags).gauge(); - - if (gauge != null) { - registry.remove(gauge); - } - - Gauge.builder(config.metrics().baseName() + "." + CONNECTOR_STATE, () -> new AtomicInteger(connectorState)) - .tags(tags) - .register(registry); - - // Adding deletion timestamp for metric housekeeping - if (CONNECTOR_STATE_DELETED == connectorState) { - LOGGER.info("Adding current timestamp to the deleted connector"); - tags.add(Tag.of("deletion_timestamp", Instant.now().toString())); - } - - Counter.builder(config.metrics().baseName() + "." + CONNECTOR_STATE_COUNT) - .tags(tags) - .tag("cos.connector.state", ProcessorDeploymentStatus.getPhase().getValue()) - // .tag("cos.connector.resourceversion", connectorResourceVersion) - .register(registry) - .increment(); - - if (CONNECTOR_STATE_FAILED == connectorState) { - Counter counter = registry.find(config.metrics().baseName() + "." + CONNECTOR_STATE_COUNT) - .tags(tags).tag("cos.connector.state", "ready") - .tagKeys("cos.connector.resourceversion").counter(); - - if (counter != null && counter.count() != 0) { - - // Exposing a new state "failed_but_ready" when a connector has already started but now failing - Counter.builder(config.metrics().baseName() + "." + CONNECTOR_STATE_COUNT) - .tags(tags) - .tag("cos.connector.state", "failed_but_ready") - // .tag("cos.connector.resourceversion", connectorResourceVersion) - .register(registry) - .increment(); + processor.getMetadata().getName(), + processor.getStatus().getPhase()); + + fleetManagerClient.getProcessorDeployments(Long.parseLong(processor.getSpec().getDeploymentId()), processorDeployments -> { + // TODO remove .get() + + try { + ProcessorDeploymentStatus status = processorDeployments.stream().findFirst().get().getStatus(); + + fleetManagerClient.updateProcessorStatus(processor.getSpec().getClusterId(), processor.getSpec().getDeploymentId(), status); + + LOGGER.debug("Updating Processor status metrics (Processor_id: {}, state: {})", + processor.getSpec().getProcessorId(), status.getPhase()); + } catch (FleetManagerClientException e) { + if (e.getStatusCode() == 410) { + LOGGER.info("Processor " + processor.getMetadata().getName() + " does not exists anymore, deleting it"); + if (connectorClient.deleteProcessor(processor)) { + LOGGER.info("Processor " + processor.getMetadata().getName() + " deleted"); + } + } else { + LOGGER.warn("Error updating status of processor " + processor.getMetadata().getName(), e); + } + } catch (Exception e) { + LOGGER.warn("Error updating status of processor " + processor.getMetadata().getName(), e); } - } - + }); } }