From 9e20cfca98893df93c715ff292b73f336f1f7cf6 Mon Sep 17 00:00:00 2001 From: Italo Garcia Date: Thu, 13 Jun 2024 20:00:39 +0200 Subject: [PATCH] senders: Skips messages larger than 256K The GCP quotas page for logging v2 (https://cloud.google.com/logging/quotas) specifies that a LogEntry should be at most 256KB in size; that is not a hard limit, but an estimate. In any case we should check if any of our LogEntry objects are bigger than the quota allows and not send those logs. Instead we try to truncate the message to a smaller size and if that isn't enough we send a default log message. --- journalpump/senders/base.py | 2 + journalpump/senders/google_cloud_logging.py | 21 +++++++++ test/test_google_cloud_logging.py | 52 +++++++++++++++++++++ 3 files changed, 75 insertions(+) diff --git a/journalpump/senders/base.py b/journalpump/senders/base.py index 5a3b00c..5c8a89c 100644 --- a/journalpump/senders/base.py +++ b/journalpump/senders/base.py @@ -14,6 +14,8 @@ import re # type: ignore[no-redef] KAFKA_COMPRESSED_MESSAGE_OVERHEAD = 30 + +# GCP logging also relies on this MAX message size MAX_KAFKA_MESSAGE_SIZE = 1024**2 # 1 MiB MAX_ERROR_MESSAGES = 8 diff --git a/journalpump/senders/google_cloud_logging.py b/journalpump/senders/google_cloud_logging.py index 264cabf..e3ee399 100644 --- a/journalpump/senders/google_cloud_logging.py +++ b/journalpump/senders/google_cloud_logging.py @@ -23,6 +23,15 @@ class GoogleCloudLoggingSender(LogSender): 0: "EMERGENCY", } + # A bit on the safe side, not exactly 256KB but this + # is an approximation anyway + # according to https://cloud.google.com/logging/quotas + _LOG_ENTRY_QUOTA = 250 * 1024 + + # Somewhat arbitrary maximum message size chosen, this gives a 56K + # headroom for the other fields in the LogEntry + _MAX_MESSAGE_SIZE = 200 * 1024 + def __init__(self, *, config, googleapiclient_request_builder=None, **kwargs): super().__init__(config=config, max_send_interval=config.get("max_send_interval", 0.3), **kwargs) 
credentials = None @@ -62,6 +71,18 @@ def send_messages(self, *, messages, cursor): for message in messages: msg_str = message.decode("utf8") msg = json.loads(msg_str) + + # This might not measure exactly 256K but should be a good enough approximation to handle this error. + # We try truncating the message; if that isn't enough, a default message is sent instead. + if len(message) > GoogleCloudLoggingSender._LOG_ENTRY_QUOTA: + if "MESSAGE" in msg: + msg["MESSAGE"] = f'{msg["MESSAGE"][:GoogleCloudLoggingSender._MAX_MESSAGE_SIZE]}[MESSAGE TRUNCATED]' + messsage_size = len(json.dumps(msg)) + if messsage_size > GoogleCloudLoggingSender._LOG_ENTRY_QUOTA: + msg = { "MESSAGE": "Log entry cannot be logged because its size is greater than GCP logging quota of 256K" } + else: + msg = { "MESSAGE": "Log entry cannot be logged because its size is greater than GCP logging quota of 256K" } + timestamp = msg.pop("timestamp", None) journald_priority = msg.pop("PRIORITY", None) diff --git a/test/test_google_cloud_logging.py b/test/test_google_cloud_logging.py index 6b4e517..9a8f66d 100644 --- a/test/test_google_cloud_logging.py +++ b/test/test_google_cloud_logging.py @@ -2,10 +2,12 @@ from googleapiclient.http import RequestMockBuilder as GoogleApiClientRequestMockBuilder from httplib2 import Response as HttpLib2Response from journalpump.senders.google_cloud_logging import GoogleCloudLoggingSender +from string import printable from typing import Dict, List from unittest import mock import json +import random class TestGoogleCloudLoggingSender: @@ -144,3 +146,53 @@ def test_correct_timestamp(self): cursor=None, ) assert sender._sent_count == 1 # pylint: disable=protected-access + + def test_big_logentry_is_truncated(self): + """Check that a large log entry is still sent, with MESSAGE capped at _MAX_MESSAGE_SIZE""" + message_content = "A" * 257_00 + request_builder = self._generate_request_builder( + [{"jsonPayload": { "MESSAGE": message_content[:GoogleCloudLoggingSender._MAX_MESSAGE_SIZE] }}], + ) + + sender = 
GoogleCloudLoggingSender( + name="googlecloudlogging", + reader=mock.Mock(), + stats=mock.Mock(), + field_filter=None, + config=self.CONFIG, + googleapiclient_request_builder=request_builder, + ) + message = { "MESSAGE": message_content } + sender.send_messages(messages=[json.dumps(message).encode()], cursor=None) + assert sender._sent_count == 1 + + def test_big_logentry_sends_default(self): + """Check that a default message is sent when the log entry cannot be truncated to fit the quota""" + request_builder = self._generate_request_builder([ + { + "jsonPayload": { + "MESSAGE": "Log entry cannot be logged because its size is greater than GCP logging quota of 256K" + } + } + ]) + + sender = GoogleCloudLoggingSender( + name="googlecloudlogging", + reader=mock.Mock(), + stats=mock.Mock(), + field_filter=None, + config=self.CONFIG, + googleapiclient_request_builder=request_builder, + ) + message = { + "MESSAGE": "A" * 200_000, + "OTHER_FIELD": "B" * 200_000 + } + sender.send_messages(messages=[json.dumps(message).encode()], cursor=None) + assert sender._sent_count == 1 + + message = { + "OTHER_FIELD": "B" * 257_000 + } + sender.send_messages(messages=[json.dumps(message).encode()], cursor=None) + assert sender._sent_count == 2