Skip to content

Commit

Permalink
Merge branch 'master' into input-diagnostics
Browse files Browse the repository at this point in the history
  • Loading branch information
gally47 authored Dec 13, 2024
2 parents 3b71be6 + e38c44d commit 4f9be57
Show file tree
Hide file tree
Showing 46 changed files with 630 additions and 538 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,12 @@
import org.graylog2.plugin.inputs.annotations.FactoryClass;
import org.graylog2.plugin.inputs.codecs.AbstractCodec;
import org.graylog2.plugin.inputs.codecs.Codec;
import org.graylog2.plugin.inputs.failure.InputProcessingException;
import org.graylog2.plugin.journal.RawMessage;
import org.joda.time.DateTime;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.Optional;

public class CloudTrailCodec extends AbstractCodec {
public static final String NAME = "AWSCloudTrail";
Expand All @@ -49,9 +50,8 @@ public CloudTrailCodec(@Assisted Configuration configuration, @AWSObjectMapper O
this.messageFactory = messageFactory;
}

@Nullable
@Override
public Message decode(@Nonnull RawMessage rawMessage) {
public Optional<Message> decodeSafe(@Nonnull RawMessage rawMessage) {
try {
final CloudTrailRecord record = objectMapper.readValue(rawMessage.getPayload(), CloudTrailRecord.class);
final String source = configuration.getString(Config.CK_OVERRIDE_SOURCE, "aws-cloudtrail");
Expand All @@ -61,9 +61,10 @@ public Message decode(@Nonnull RawMessage rawMessage) {
message.addField("full_message", record.getFullMessage());
message.addField(AWS.SOURCE_GROUP_IDENTIFIER, true);

return message;
return Optional.of(message);
} catch (Exception e) {
throw new RuntimeException("Could not deserialize CloudTrail record.", e);
throw InputProcessingException.create("Could not deserialize CloudTrail record.",
e, rawMessage, new String(rawMessage.getPayload(), charset));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

public class FlowLogMessage {
private static final Logger LOG = LoggerFactory.getLogger(FlowLogMessage.class);

Expand Down Expand Up @@ -74,13 +72,11 @@ private FlowLogMessage(DateTime timestamp,
this.logStatus = logStatus;
}

@Nullable
public static FlowLogMessage fromLogEvent(final KinesisLogEntry logEvent) {
final String[] parts = logEvent.message().split(" ");

if (parts.length != 14) {
LOG.warn("Received FlowLog message with not exactly 14 fields. Skipping. Message was: [{}]", logEvent.message());
return null;
throw new RuntimeException("Received FlowLog message with not exactly 14 fields. Skipping. Message was: [%s]".formatted(logEvent.message()));
}

return new FlowLogMessage(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.graylog.integrations.aws.codecs;

import com.google.inject.assistedinject.Assisted;
import jakarta.inject.Inject;
import org.graylog.integrations.aws.AWSMessageType;
import org.graylog2.plugin.Message;
import org.graylog2.plugin.configuration.Configuration;
Expand All @@ -28,17 +29,15 @@
import org.graylog2.plugin.inputs.annotations.FactoryClass;
import org.graylog2.plugin.inputs.codecs.AbstractCodec;
import org.graylog2.plugin.inputs.codecs.Codec;
import org.graylog2.plugin.inputs.failure.InputProcessingException;
import org.graylog2.plugin.journal.RawMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.regions.Region;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import jakarta.inject.Inject;

import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

public class AWSCodec extends AbstractCodec {
Expand All @@ -64,28 +63,21 @@ public AWSCodec(@Assisted Configuration configuration,
this.availableCodecs = availableCodecs;
}

@Nullable
@Override
public Message decode(@Nonnull RawMessage rawMessage) {
public Optional<Message> decodeSafe(@Nonnull RawMessage rawMessage) {

// Load the codec by message type.
final AWSMessageType awsMessageType = AWSMessageType.valueOf(configuration.getString(CK_AWS_MESSAGE_TYPE));
final Codec.Factory<? extends Codec> codecFactory = this.availableCodecs.get(awsMessageType.getCodecName());
if (codecFactory == null) {
LOG.error("A codec with name [{}] could not be found.", awsMessageType.getCodecName());
return null;
throw InputProcessingException.create("A codec with name [%s] could not be found.".formatted(awsMessageType.getCodecName()),
rawMessage, new String(rawMessage.getPayload(), charset));
}

final Codec codec = codecFactory.create(configuration);

// Parse the message with the specified codec.
final Message message = codec.decode(new RawMessage(rawMessage.getPayload()));
if (message == null) {
LOG.error("Failed to decode message for codec [{}].", codec.getName());
return null;
}

return message;
return codec.decodeSafe(new RawMessage(rawMessage.getPayload()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,14 @@
import org.graylog2.plugin.configuration.Configuration;
import org.graylog2.plugin.inputs.codecs.AbstractCodec;
import org.graylog2.plugin.inputs.codecs.CodecAggregator;
import org.graylog2.plugin.inputs.failure.InputProcessingException;
import org.graylog2.plugin.journal.RawMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.Optional;

public abstract class AbstractKinesisCodec extends AbstractCodec {

Expand All @@ -46,27 +47,24 @@ public abstract class AbstractKinesisCodec extends AbstractCodec {
this.objectMapper = objectMapper;
}

@Nullable
@Override
public Message decode(@Nonnull RawMessage rawMessage) {
public Optional<Message> decodeSafe(@Nonnull RawMessage rawMessage) {
try {
final KinesisLogEntry entry = objectMapper.readValue(rawMessage.getPayload(), KinesisLogEntry.class);

try {
return decodeLogData(entry);
} catch (Exception e) {
LOG.error("Couldn't decode log event <{}>", entry);

// Message will be dropped when returning null
return null;
throw InputProcessingException.create("Couldn't decode log event <%s>".formatted(entry),
e, rawMessage, new String(rawMessage.getPayload(), charset));
}
} catch (IOException e) {
throw new RuntimeException("Couldn't deserialize log data", e);
} catch (Exception e) {
throw InputProcessingException.create("Couldn't deserialize log data",
e, rawMessage, new String(rawMessage.getPayload(), charset));
}
}

@Nullable
protected abstract Message decodeLogData(@Nonnull final KinesisLogEntry event);
protected abstract Optional<Message> decodeLogData(@Nonnull final KinesisLogEntry event);

@Nonnull
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@
import org.joda.time.Seconds;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class KinesisCloudWatchFlowLogCodec extends AbstractKinesisCodec {
public static final String NAME = "FlowLog";
Expand Down Expand Up @@ -67,16 +67,11 @@ public KinesisCloudWatchFlowLogCodec(@Assisted Configuration configuration, Obje
this.noFlowLogPrefix = configuration.getBoolean(AWSCodec.CK_FLOW_LOG_PREFIX, AWSCodec.FLOW_LOG_PREFIX_DEFAULT);
}

@Nullable
@Override
public Message decodeLogData(@Nonnull final KinesisLogEntry logEvent) {
public Optional<Message> decodeLogData(@Nonnull final KinesisLogEntry logEvent) {
try {
final FlowLogMessage flowLogMessage = FlowLogMessage.fromLogEvent(logEvent);

if (flowLogMessage == null) {
return null;
}

final String source = configuration.getString(KinesisCloudWatchFlowLogCodec.Config.CK_OVERRIDE_SOURCE, SOURCE);
final Message result = messageFactory.createMessage(
buildSummary(flowLogMessage),
Expand All @@ -88,7 +83,7 @@ public Message decodeLogData(@Nonnull final KinesisLogEntry logEvent) {
result.addField(FIELD_LOG_STREAM, logEvent.logStream());
result.addField(SOURCE_GROUP_IDENTIFIER, true);

return result;
return Optional.of(result);
} catch (Exception e) {
throw new RuntimeException("Could not deserialize AWS FlowLog record.", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import org.graylog2.plugin.inputs.codecs.Codec;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.Optional;

public class KinesisRawLogCodec extends AbstractKinesisCodec {
public static final String NAME = "CloudWatchRawLog";
Expand All @@ -42,9 +42,8 @@ public KinesisRawLogCodec(@Assisted Configuration configuration, ObjectMapper ob
this.messageFactory = messageFactory;
}

@Nullable
@Override
public Message decodeLogData(@Nonnull final KinesisLogEntry logEvent) {
public Optional<Message> decodeLogData(@Nonnull final KinesisLogEntry logEvent) {
try {
final String source = configuration.getString(KinesisCloudWatchFlowLogCodec.Config.CK_OVERRIDE_SOURCE, SOURCE);
Message result = messageFactory.createMessage(
Expand All @@ -56,7 +55,7 @@ public Message decodeLogData(@Nonnull final KinesisLogEntry logEvent) {
result.addField(FIELD_LOG_GROUP, logEvent.logGroup());
result.addField(FIELD_LOG_STREAM, logEvent.logStream());

return result;
return Optional.of(result);
} catch (Exception e) {
throw new RuntimeException("Could not deserialize AWS FlowLog record.", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import com.github.rholder.retry.RetryerBuilder;
import com.github.rholder.retry.StopStrategies;
import com.google.common.base.Preconditions;
import jakarta.inject.Inject;
import jakarta.ws.rs.BadRequestException;
import org.apache.commons.collections.CollectionUtils;
import org.graylog.integrations.aws.AWSClientBuilderUtil;
import org.graylog.integrations.aws.AWSLogMessage;
Expand Down Expand Up @@ -69,10 +71,6 @@
import software.amazon.awssdk.services.kinesis.model.StreamDescription;
import software.amazon.awssdk.services.kinesis.model.StreamStatus;

import jakarta.inject.Inject;

import jakarta.ws.rs.BadRequestException;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
Expand Down Expand Up @@ -359,11 +357,9 @@ private KinesisHealthCheckResponse detectAndParseMessage(String logMessage, Date
throw new BadRequestException("Encoding the message to bytes failed.", e);
}

final Message fullyParsedMessage = codec.decode(new RawMessage(payload));
if (fullyParsedMessage == null) {
throw new BadRequestException(String.format(Locale.ROOT, "Message decoding failed. More information might be " +
"available by enabling Debug logging. message [%s]", logMessage));
}
final Message fullyParsedMessage = codec.decodeSafe(new RawMessage(payload)).orElseThrow(() ->
new BadRequestException(String.format(Locale.ROOT, "Message decoding failed. More information might be " +
"available by enabling Debug logging. message [%s]", logMessage)));

LOG.debug("Successfully parsed message type [{}] with codec [{}].", awsMessageType, awsMessageType.getCodecName());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.graylog2.plugin.inputs.annotations.FactoryClass;
import org.graylog2.plugin.inputs.codecs.Codec;
import org.graylog2.plugin.inputs.codecs.CodecAggregator;
import org.graylog2.plugin.inputs.failure.InputProcessingException;
import org.graylog2.plugin.journal.RawMessage;
import org.joda.time.DateTimeZone;
import org.slf4j.Logger;
Expand All @@ -37,6 +38,7 @@
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.nio.charset.StandardCharsets;
import java.util.Optional;

public class PaloAltoCodec implements Codec {

Expand Down Expand Up @@ -64,45 +66,42 @@ public PaloAltoCodec(@Assisted Configuration configuration, MessageFactory messa
configuration.getString(CK_TRAFFIC_TEMPLATE, PaloAltoTemplateDefaults.TRAFFIC_TEMPLATE));
}

@Nullable
@Override
public Message decode(@Nonnull RawMessage rawMessage) {
public Optional<Message> decodeSafe(@Nonnull RawMessage rawMessage) {
String s = new String(rawMessage.getPayload(), StandardCharsets.UTF_8);
LOG.trace("Received raw message: {}", s);

String timezoneID = configuration.getString(CK_TIMEZONE);
// previously existing PA inputs after updating will not have a Time Zone configured, default to UTC
DateTimeZone timezone = timezoneID != null ? DateTimeZone.forID(timezoneID) : DateTimeZone.UTC;
LOG.trace("Configured time zone: {}", timezone);
PaloAltoMessageBase p = parser.parse(s, timezone);

// Return when error occurs parsing syslog header.
if (p == null) {
return null;
}

Message message = messageFactory.createMessage(p.payload(), p.source(), p.timestamp());

switch (p.panType()) {
case "THREAT":
final PaloAltoTypeParser parserThreat = new PaloAltoTypeParser(templates.getThreatMessageTemplate());
message.addFields(parserThreat.parseFields(p.fields(), timezone));
break;
case "SYSTEM":
final PaloAltoTypeParser parserSystem = new PaloAltoTypeParser(templates.getSystemMessageTemplate());
message.addFields(parserSystem.parseFields(p.fields(), timezone));
break;
case "TRAFFIC":
final PaloAltoTypeParser parserTraffic = new PaloAltoTypeParser(templates.getTrafficMessageTemplate());
message.addFields(parserTraffic.parseFields(p.fields(), timezone));
break;
default:
LOG.error("Unsupported PAN type [{}]. Not adding any parsed fields.", p.panType());
try {
PaloAltoMessageBase p = parser.parse(s, timezone);
Message message = messageFactory.createMessage(p.payload(), p.source(), p.timestamp());

switch (p.panType()) {
case "THREAT":
final PaloAltoTypeParser parserThreat = new PaloAltoTypeParser(templates.getThreatMessageTemplate());
message.addFields(parserThreat.parseFields(p.fields(), timezone));
break;
case "SYSTEM":
final PaloAltoTypeParser parserSystem = new PaloAltoTypeParser(templates.getSystemMessageTemplate());
message.addFields(parserSystem.parseFields(p.fields(), timezone));
break;
case "TRAFFIC":
final PaloAltoTypeParser parserTraffic = new PaloAltoTypeParser(templates.getTrafficMessageTemplate());
message.addFields(parserTraffic.parseFields(p.fields(), timezone));
break;
default:
LOG.error("Unsupported PAN type [{}]. Not adding any parsed fields.", p.panType());
}

LOG.trace("Successfully processed [{}] message with [{}] fields.", p.panType(), message.getFieldCount());

return Optional.of(message);
} catch (Exception e) {
throw InputProcessingException.create("Could not decode PaloAlto9x message.", e, rawMessage, s);
}

LOG.trace("Successfully processed [{}] message with [{}] fields.", p.panType(), message.getFieldCount());

return message;
}

@Override
Expand Down
Loading

0 comments on commit 4f9be57

Please sign in to comment.