Skip to content

Commit

Permalink
Processor updater
Browse files Browse the repository at this point in the history
  • Loading branch information
lucamolteni committed Apr 11, 2023
1 parent 138d5f7 commit b4e4338
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 126 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ void processorIsProvisioned() {
"1",
item.getMetadata().getLabels().get(Resources.LABEL_DEPLOYMENT_RESOURCE_VERSION)));

ManagedProcessor mc = until(
ManagedProcessor mp = until(
() -> fleetShardClient.getProcessor(clusterId, DEPLOYMENT_ID),
item -> {
return item.getSpec().getDeploymentResourceVersion() == 1L
Expand All @@ -95,7 +95,7 @@ void processorIsProvisioned() {

assertThat(s1).satisfies(item -> {
assertThat(item.getMetadata().getName())
.isEqualTo(Secrets.generateProcessorSecretId(mc.getSpec().getDeploymentId()));
.isEqualTo(Secrets.generateProcessorSecretId(mp.getSpec().getDeploymentId()));

assertThatJson(Secrets.extract(item, SECRET_ENTRY_SERVICE_ACCOUNT))
.isObject()
Expand Down Expand Up @@ -125,14 +125,14 @@ void processorIsProvisioned() {
.containsEntry("my.cos.bf2.org/processor-group", "baz");
});

assertThat(mc.getMetadata().getName()).startsWith(Resources.PROCESSOR_PREFIX);
assertThat(mc.getSpec().getKafka().getUrl()).isEqualTo(KAFKA_URL);
assertThat(mc.getSpec().getSecret()).isEqualTo(s1.getMetadata().getName());
assertThat(mp.getMetadata().getName()).startsWith(Resources.PROCESSOR_PREFIX);
assertThat(mp.getSpec().getKafka().getUrl()).isEqualTo(KAFKA_URL);
assertThat(mp.getSpec().getSecret()).isEqualTo(s1.getMetadata().getName());

assertThat(mc.getMetadata().getLabels())
assertThat(mp.getMetadata().getLabels())
.containsEntry("cos.bf2.org/organization-id", "20000000")
.containsEntry("cos.bf2.org/pricing-tier", "essential");
assertThat(mc.getMetadata().getAnnotations())
assertThat(mp.getMetadata().getAnnotations())
.containsEntry("my.cos.bf2.org/processor-group", "baz");

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,14 @@ public Boolean deleteConnector(ManagedConnector managedConnector) {
.delete().isEmpty();
}

public Boolean deleteProcessor(ManagedProcessor managedProcessor) {
return !kubernetesClient.resources(ManagedProcessor.class)
.inNamespace(managedProcessor.getMetadata().getNamespace())
.withName(managedProcessor.getMetadata().getName())
.withPropagationPolicy(DeletionPropagation.FOREGROUND)
.delete().isEmpty();
}

public Optional<ManagedProcessor> getProcessor(NamespacedName id) {
if (processorsInformer == null) {
throw new IllegalStateException("Informer must be started before adding handlers");
Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,10 @@
package org.bf2.cos.fleetshard.sync.resources;

import java.time.Instant;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tag;
import org.bf2.cos.fleet.manager.model.ProcessorDeploymentStatus;
import org.bf2.cos.fleetshard.api.ManagedProcessor;
import org.bf2.cos.fleetshard.support.metrics.MetricsSupport;
import org.bf2.cos.fleetshard.sync.FleetShardSyncConfig;
import org.bf2.cos.fleetshard.sync.client.FleetManagerClient;
import org.bf2.cos.fleetshard.sync.client.FleetManagerClientException;
Expand All @@ -23,126 +14,42 @@

@ApplicationScoped
public class ProcessorStatusUpdater {
private static final Logger LOGGER = LoggerFactory.getLogger(ProcessorStatusUpdater.class);

public static final String CONNECTOR_STATE = "processor.state";
public static final String CONNECTOR_STATE_COUNT = "processor.state.count";

static final int CONNECTOR_STATE_READY = 1;
static final int CONNECTOR_STATE_FAILED = 2;
static final int CONNECTOR_STATE_DELETED = 3;
static final int CONNECTOR_STATE_STOPPED = 4;
static final int CONNECTOR_STATE_IN_PROCESS = 5;

private static final Logger LOGGER = LoggerFactory.getLogger(ProcessorStatusUpdater.class);
@Inject
FleetManagerClient fleetManagerClient;
@Inject
FleetShardClient connectorClient;
@Inject
MeterRegistry registry;
@Inject
FleetShardSyncConfig config;

public void update(ManagedProcessor processor) {
LOGGER.debug("Update processor status (name: {}, phase: {})",
processor.getMetadata().getName(),
processor.getStatus().getPhase());

// try {
// ProcessorDeploymentStatus ProcessorDeploymentStatus = Process.extract(processor);

// fleetManagerClient.updateConnectorStatus(processor, ProcessorDeploymentStatus);

// LOGGER.debug("Updating Connector status metrics (Connector_id: {}, state: {})",
// processor.get(), ProcessorDeploymentStatus.getPhase());
//
// switch (ProcessorDeploymentStatus.getPhase()) {
// case READY:
// measure(processor, ProcessorDeploymentStatus, CONNECTOR_STATE_READY);
// break;
// case FAILED:
// measure(processor, ProcessorDeploymentStatus, CONNECTOR_STATE_FAILED);
// break;
// case DELETED:
// measure(processor, ProcessorDeploymentStatus, CONNECTOR_STATE_DELETED);
// break;
// case STOPPED:
// measure(processor, ProcessorDeploymentStatus, CONNECTOR_STATE_STOPPED);
// break;
// default:
// measure(processor, ProcessorDeploymentStatus, CONNECTOR_STATE_IN_PROCESS);
// break;
// }

// } catch (FleetManagerClientException e) {
// if (e.getStatusCode() == 410) {
// LOGGER.info("Connector " + processor.getMetadata().getName() + " does not exists anymore, deleting it");
// if (connectorClient.deleteConnector(processor)) {
// LOGGER.info("Connector " + processor.getMetadata().getName() + " deleted");
// }
// } else {
// LOGGER.warn("Error updating status of connector " + processor.getMetadata().getName(), e);
// }
// } catch (Exception e) {
// LOGGER.warn("Error updating status of connector " + processor.getMetadata().getName(), e);
// }
}

/*
* Expose a Gauge metric "cos_fleetshard_sync_connector_state" which reveals the current connector state.
* Metric value of 1 implies that the connector is in Ready state. Similarly, 2 -> Failed, 3 -> Deleted,
* 4 -> Stopped, 5 -> In Process
* Also exposing a Counter metrics "cos_fleetshard_sync_connector_state_count_total" which reveals each
* state count for the connector
*/
private void measure(ManagedProcessor processor, ProcessorDeploymentStatus ProcessorDeploymentStatus, int connectorState) {

List<Tag> tags = MetricsSupport.tags(config.metrics().recorder(), processor);
tags.add(Tag.of("cos.processor.id", processor.getSpec().getProcessorId()));
tags.add(Tag.of("cos.deployment.id", processor.getSpec().getDeploymentId()));
tags.add(Tag.of("cos.namespace", processor.getMetadata().getNamespace()));

// String connectorResourceVersion = String.valueOf(processor.getSpec().getDeployment().getConnectorResourceVersion());

Gauge gauge = registry.find(config.metrics().baseName() + "." + CONNECTOR_STATE).tags(tags).gauge();

if (gauge != null) {
registry.remove(gauge);
}

Gauge.builder(config.metrics().baseName() + "." + CONNECTOR_STATE, () -> new AtomicInteger(connectorState))
.tags(tags)
.register(registry);

// Adding deletion timestamp for metric housekeeping
if (CONNECTOR_STATE_DELETED == connectorState) {
LOGGER.info("Adding current timestamp to the deleted connector");
tags.add(Tag.of("deletion_timestamp", Instant.now().toString()));
}

Counter.builder(config.metrics().baseName() + "." + CONNECTOR_STATE_COUNT)
.tags(tags)
.tag("cos.connector.state", ProcessorDeploymentStatus.getPhase().getValue())
// .tag("cos.connector.resourceversion", connectorResourceVersion)
.register(registry)
.increment();

if (CONNECTOR_STATE_FAILED == connectorState) {
Counter counter = registry.find(config.metrics().baseName() + "." + CONNECTOR_STATE_COUNT)
.tags(tags).tag("cos.connector.state", "ready")
.tagKeys("cos.connector.resourceversion").counter();

if (counter != null && counter.count() != 0) {

// Exposing a new state "failed_but_ready" when a connector has already started but now failing
Counter.builder(config.metrics().baseName() + "." + CONNECTOR_STATE_COUNT)
.tags(tags)
.tag("cos.connector.state", "failed_but_ready")
// .tag("cos.connector.resourceversion", connectorResourceVersion)
.register(registry)
.increment();
processor.getMetadata().getName(),
processor.getStatus().getPhase());

fleetManagerClient.getProcessorDeployments(Long.parseLong(processor.getSpec().getDeploymentId()), processorDeployments -> {
// TODO remove .get()

try {
ProcessorDeploymentStatus status = processorDeployments.stream().findFirst().get().getStatus();

fleetManagerClient.updateProcessorStatus(processor.getSpec().getClusterId(), processor.getSpec().getDeploymentId(), status);

LOGGER.debug("Updating Processor status metrics (Processor_id: {}, state: {})",
processor.getSpec().getProcessorId(), status.getPhase());
} catch (FleetManagerClientException e) {
if (e.getStatusCode() == 410) {
LOGGER.info("Processor " + processor.getMetadata().getName() + " does not exists anymore, deleting it");
if (connectorClient.deleteProcessor(processor)) {
LOGGER.info("Processor " + processor.getMetadata().getName() + " deleted");
}
} else {
LOGGER.warn("Error updating status of processor " + processor.getMetadata().getName(), e);
}
} catch (Exception e) {
LOGGER.warn("Error updating status of processor " + processor.getMetadata().getName(), e);
}
}

});
}
}

0 comments on commit b4e4338

Please sign in to comment.