Skip to content

Commit

Permalink
move metrics code ahead of config check
Browse files Browse the repository at this point in the history
  • Loading branch information
patrickmann committed Nov 13, 2024
1 parent db26a23 commit f12ea58
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 80 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,30 +16,23 @@
*/
package org.graylog.failure;

import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.AbstractExecutionThreadService;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import org.graylog2.Configuration;
import org.graylog2.plugin.Message;
import org.graylog2.shared.bindings.providers.ObjectMapperProvider;
import org.graylog2.shared.messageq.MessageQueueAcknowledger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static com.codahale.metrics.MetricRegistry.name;

/**
* A service consuming and processing failure batches submitted via {@link FailureSubmissionQueue}.
* The processing is done in a dedicated thread, the lifecycle of this service is managed
Expand All @@ -61,26 +54,19 @@ public class FailureHandlingService extends AbstractExecutionThreadService {
private final Configuration configuration;
private final MessageQueueAcknowledger acknowledger;
private Thread executionThread;
private final MetricRegistry metricRegistry;
private final ObjectMapper objectMapper;
private final Meter dummyMeter = new Meter();

@Inject
public FailureHandlingService(
@Named("fallbackFailureHandler") FailureHandler fallbackFailureHandler,
Set<FailureHandler> failureHandlers,
FailureSubmissionQueue failureSubmissionQueue,
Configuration configuration,
MessageQueueAcknowledger acknowledger,
MetricRegistry metricRegistry,
ObjectMapperProvider objectMapperProvider) {
MessageQueueAcknowledger acknowledger) {
this.fallbackFailureHandlerAsList = Lists.newArrayList(fallbackFailureHandler);
this.failureHandlers = failureHandlers;
this.failureSubmissionQueue = failureSubmissionQueue;
this.configuration = configuration;
this.acknowledger = acknowledger;
this.metricRegistry = metricRegistry;
this.objectMapper = objectMapperProvider.get();
}

@Override
Expand Down Expand Up @@ -142,8 +128,6 @@ protected void run() throws Exception {
}

private void handle(FailureBatch failureBatch) {
failureBatch.getFailures().forEach(this::updateMetrics);

suitableHandlers(failureBatch)
.forEach(handler -> {
try {
Expand All @@ -165,27 +149,6 @@ private void handle(FailureBatch failureBatch) {
}
}

private void updateMetrics(Failure failure) {
if (failure == null || failure.failedMessage() == null) {
return;
}

final Map<String, Object> searchObject = failure.failedMessage().toElasticSearchObject(objectMapper, dummyMeter);
final Object inputId = searchObject.get(Message.FIELD_GL2_SOURCE_INPUT);
if (inputId != null) {
switch (failure.failureType()) {
case INDEXING -> {
final String indexingFailureMetricName = name("org.graylog2", inputId.toString(), "failures.indexing");
metricRegistry.meter(indexingFailureMetricName).mark();
}
case PROCESSING -> {
final String processingFailureMetricName = name("org.graylog2", inputId.toString(), "failures.processing");
metricRegistry.meter(processingFailureMetricName).mark();
}
}
}
}

private List<FailureHandler> suitableHandlers(FailureBatch failureBatch) {
final List<FailureHandler> suitableHandlers = suitableHandlers(failureHandlers, failureBatch)
.filter(FailureHandler::isEnabled)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,29 @@
*/
package org.graylog.failure;

import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import org.apache.commons.lang3.StringUtils;
import org.graylog2.indexer.messages.Indexable;
import org.graylog2.indexer.messages.IndexingError;
import org.graylog2.plugin.Message;
import org.graylog2.plugin.Tools;
import org.graylog2.shared.bindings.providers.ObjectMapperProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import jakarta.inject.Inject;
import jakarta.inject.Singleton;

import java.util.Collection;
import java.util.List;
import java.util.Locale;
import java.util.stream.Collectors;
import java.util.Map;

import static com.codahale.metrics.MetricRegistry.name;
import static org.graylog2.indexer.messages.IndexingError.Type.MappingError;
import static org.graylog2.plugin.Message.FIELD_GL2_SOURCE_INPUT;

/**
* A supplementary service layer, which is aimed to simplify failure
Expand All @@ -47,12 +53,20 @@ public class FailureSubmissionService {
private final FailureSubmissionQueue failureSubmissionQueue;
private final FailureHandlingConfiguration failureHandlingConfiguration;

private final MetricRegistry metricRegistry;
private final ObjectMapper objectMapper;
private final Meter dummyMeter = new Meter();

@Inject
public FailureSubmissionService(
FailureSubmissionQueue failureSubmissionQueue,
FailureHandlingConfiguration failureHandlingConfiguration) {
FailureHandlingConfiguration failureHandlingConfiguration,
MetricRegistry metricRegistry,
ObjectMapperProvider objectMapperProvider) {
this.failureSubmissionQueue = failureSubmissionQueue;
this.failureHandlingConfiguration = failureHandlingConfiguration;
this.metricRegistry = metricRegistry;
this.objectMapper = objectMapperProvider.get();
}

/**
Expand Down Expand Up @@ -91,6 +105,8 @@ private boolean submitProcessingErrorsInternal(Message message, List<Message.Pro
return true;
}

updateProcessingFailureMetric(message);

if (!message.supportsFailureHandling()) {
logger.warn("Submitted a message with processing errors, which doesn't support failure handling!");
return true;
Expand Down Expand Up @@ -147,6 +163,7 @@ public void submitIndexingErrors(Collection<IndexingError> indexingErrors) {
final FailureBatch fb = FailureBatch.indexingFailureBatch(
indexingErrors.stream()
.filter(ie -> {
updateIndexingFailureMetric(ie.message());
if (!ie.message().supportsFailureHandling()) {
logger.warn("Submitted a message with indexing errors, which doesn't support failure handling!");
return false;
Expand All @@ -155,7 +172,7 @@ public void submitIndexingErrors(Collection<IndexingError> indexingErrors) {
}
})
.map(this::fromIndexingError)
.collect(Collectors.toList()));
.toList());

if (fb.size() > 0) {
failureSubmissionQueue.submitBlocking(fb);
Expand All @@ -181,4 +198,21 @@ private IndexingFailure fromIndexingError(IndexingError indexingError) {
indexingError.index()
);
}

private void updateProcessingFailureMetric(Message message) {
Object inputId = message.getField(FIELD_GL2_SOURCE_INPUT);
if (inputId != null) {
final String indexingFailureMetricName = name("org.graylog2", inputId.toString(), "failures.processing");
metricRegistry.meter(indexingFailureMetricName).mark();
}
}

private void updateIndexingFailureMetric(Indexable message) {
final Map<String, Object> searchObject = message.toElasticSearchObject(objectMapper, dummyMeter);
Object inputId = searchObject.get(FIELD_GL2_SOURCE_INPUT);
if (inputId != null) {
final String indexingFailureMetricName = name("org.graylog2", inputId.toString(), "failures.indexing");
metricRegistry.meter(indexingFailureMetricName).mark();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.graylog2.plugin.Message;
import org.graylog2.plugin.MessageFactory;
import org.graylog2.plugin.Tools;
import org.graylog2.shared.bindings.providers.ObjectMapperProvider;
import org.graylog2.shared.messageq.MessageQueueAcknowledger;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -54,7 +53,6 @@ public class FailureHandlingServiceTest {
private final MessageQueueAcknowledger acknowledger = mock(MessageQueueAcknowledger.class);
private final MetricRegistry metricRegistry = new MetricRegistry();
private MessageFactory messageFactory;
private final ObjectMapperProvider objectMapperProvider = mock(ObjectMapperProvider.class);

private FailureSubmissionQueue failureSubmissionQueue;

Expand All @@ -75,7 +73,7 @@ public void run_whenNoSuitableCustomHandlerProvided_thenSuitableFallbackOneIsUse
final FailureHandler fallbackIndexingFailureHandler = enabledFailureHandler(indexingFailureBatch);

final FailureHandlingService underTest = new FailureHandlingService(fallbackIndexingFailureHandler,
Set.of(customFailureHandler), failureSubmissionQueue, configuration, acknowledger, metricRegistry, objectMapperProvider);
Set.of(customFailureHandler), failureSubmissionQueue, configuration, acknowledger);

underTest.startAsync();
underTest.awaitRunning();
Expand Down Expand Up @@ -107,7 +105,7 @@ public void run_whenNoSuitableCustomHandlerAndNoSuitableFallbackHandlerFound_the
final FailureHandler fallbackFailureHandler = enabledFailureHandler();

final FailureHandlingService underTest = new FailureHandlingService(fallbackFailureHandler,
ImmutableSet.of(customFailureHandler), failureSubmissionQueue, configuration, acknowledger, metricRegistry, objectMapperProvider);
ImmutableSet.of(customFailureHandler), failureSubmissionQueue, configuration, acknowledger);

underTest.startAsync();
underTest.awaitRunning();
Expand Down Expand Up @@ -140,7 +138,7 @@ public void run_whenCustomHandlersProvided_thenFallbackHandlerIgnored() throws E
final FailureHandler fallbackIndexingFailureHandler = enabledFailureHandler(indexingFailureBatch);

final FailureHandlingService underTest = new FailureHandlingService(fallbackIndexingFailureHandler,
ImmutableSet.of(customIndexingFailureHandler1, customIndexingFailureHandler2), failureSubmissionQueue, configuration, acknowledger, metricRegistry, objectMapperProvider);
ImmutableSet.of(customIndexingFailureHandler1, customIndexingFailureHandler2), failureSubmissionQueue, configuration, acknowledger);

underTest.startAsync();
underTest.awaitRunning();
Expand Down Expand Up @@ -169,7 +167,7 @@ public void run_serviceNotInterruptedUponHandlerException() throws Exception {
doThrow(new RuntimeException()).when(fallbackIndexingFailureHandler).handle(indexingFailureBatch2);

final FailureHandlingService underTest = new FailureHandlingService(fallbackIndexingFailureHandler,
ImmutableSet.of(), failureSubmissionQueue, configuration, acknowledger, metricRegistry, objectMapperProvider);
ImmutableSet.of(), failureSubmissionQueue, configuration, acknowledger);

underTest.startAsync();
underTest.awaitRunning();
Expand Down Expand Up @@ -199,7 +197,7 @@ public void run_acknowledgesFlaggedProcessingErrorsOnlyOnce() throws Interrupted

final FailureHandlingService underTest = new FailureHandlingService(fallbackFailureHandler,
ImmutableSet.of(customFailureHandler1, customFailureHandler2),
failureSubmissionQueue, configuration, acknowledger, metricRegistry, objectMapperProvider);
failureSubmissionQueue, configuration, acknowledger);

// when
underTest.startAsync();
Expand All @@ -224,7 +222,7 @@ public void run_doesNotAcknowledgeIndexingErrors() throws InterruptedException {
final FailureHandler customFailureHandler = enabledFailureHandler(indexingFailureBatch);

final FailureHandlingService underTest = new FailureHandlingService(fallbackFailureHandler,
ImmutableSet.of(customFailureHandler), failureSubmissionQueue, configuration, acknowledger, metricRegistry, objectMapperProvider);
ImmutableSet.of(customFailureHandler), failureSubmissionQueue, configuration, acknowledger);

// when
underTest.startAsync();
Expand All @@ -248,7 +246,7 @@ public void shutDown_uponShutdownAllRemainingFailuresAreHandled() throws Excepti

final FailureSubmissionQueue failureSubmissionQueue = mock(FailureSubmissionQueue.class);
final FailureHandlingService underTest = new FailureHandlingService(fallbackFailureHandler,
ImmutableSet.of(), failureSubmissionQueue, configuration, acknowledger, metricRegistry, objectMapperProvider);
ImmutableSet.of(), failureSubmissionQueue, configuration, acknowledger);

when(configuration.getFailureHandlingShutdownAwait()).thenReturn(com.github.joschi.jadconfig.util.Duration.milliseconds(300));
when(failureSubmissionQueue.consumeBlockingWithTimeout(300L))
Expand Down
Loading

0 comments on commit f12ea58

Please sign in to comment.