From 95d94d7b7aeb901a51a5378acd21b5c8eefb0c71 Mon Sep 17 00:00:00 2001 From: Italo Garcia Date: Thu, 13 Jun 2024 20:00:39 +0200 Subject: [PATCH] senders: Skips messages larger than 256K GCP quotas page for the logging v2(https://cloud.google.com/logging/quotas) specifies that a LogEntry should have at maximum 256KB in size, that is not a hard limit, but an estimation. In any case we should check if any of our LogEntry objects are bigger than the quota allows and not send those logs. Instead we try to truncate the message to a smaller size and if that isn't enough we send a default log message. --- journalpump/senders/base.py | 2 + journalpump/senders/google_cloud_logging.py | 23 +++++++++- test/test_google_cloud_logging.py | 47 +++++++++++++++++++++ 3 files changed, 71 insertions(+), 1 deletion(-) diff --git a/journalpump/senders/base.py b/journalpump/senders/base.py index 5a3b00c..5c8a89c 100644 --- a/journalpump/senders/base.py +++ b/journalpump/senders/base.py @@ -14,6 +14,8 @@ import re # type: ignore[no-redef] KAFKA_COMPRESSED_MESSAGE_OVERHEAD = 30 + +# GCP logging also relies on this MAX message size MAX_KAFKA_MESSAGE_SIZE = 1024**2 # 1 MiB MAX_ERROR_MESSAGES = 8 diff --git a/journalpump/senders/google_cloud_logging.py b/journalpump/senders/google_cloud_logging.py index 264cabf..2930dfa 100644 --- a/journalpump/senders/google_cloud_logging.py +++ b/journalpump/senders/google_cloud_logging.py @@ -23,6 +23,15 @@ class GoogleCloudLoggingSender(LogSender): 0: "EMERGENCY", } + # A bit on the safe side, not exactly 256KB but this + # is an approximation anyway + # according to https://cloud.google.com/logging/quotas + _LOG_ENTRY_QUOTA = 250 * 1024 + + # Somewhat arbitrary maximum message size chosen, this gives a 56K + # headroom for the other fields in the LogEntry + _MAX_MESSAGE_SIZE = 200 * 1024 + def __init__(self, *, config, googleapiclient_request_builder=None, **kwargs): super().__init__(config=config, max_send_interval=config.get("max_send_interval", 0.3), 
**kwargs) credentials = None @@ -62,6 +71,18 @@ def send_messages(self, *, messages, cursor): for message in messages: msg_str = message.decode("utf8") msg = json.loads(msg_str) + + # This might not measure exactly 256K but should be a good enough approximation to handle this error. + # We try truncating the message; if that isn't possible the entry is skipped. + if len(message) > self._LOG_ENTRY_QUOTA: + DEFAULT_MESSAGE = "Log entry can't be logged because its size is greater than GCP logging quota of 256K" + if "MESSAGE" in msg: + msg["MESSAGE"] = f'{msg["MESSAGE"][:self._MAX_MESSAGE_SIZE]}[MESSAGE TRUNCATED]' + message_size = len(json.dumps(msg, ensure_ascii=False).encode("utf-8")) + if message_size > self._LOG_ENTRY_QUOTA: + msg = {"MESSAGE": DEFAULT_MESSAGE} + else: + msg = {"MESSAGE": DEFAULT_MESSAGE} timestamp = msg.pop("timestamp", None) journald_priority = msg.pop("PRIORITY", None) @@ -75,7 +96,7 @@ def send_messages(self, *, messages, cursor): if timestamp is not None: entry["timestamp"] = timestamp[:26] + "Z" # assume timestamp to be UTC if journald_priority is not None: - severity = GoogleCloudLoggingSender._SEVERITY_MAPPING.get(journald_priority, "DEFAULT") + severity = self._SEVERITY_MAPPING.get(journald_priority, "DEFAULT") entry["severity"] = severity body["entries"].append(entry) diff --git a/test/test_google_cloud_logging.py b/test/test_google_cloud_logging.py index 6b4e517..23651e1 100644 --- a/test/test_google_cloud_logging.py +++ b/test/test_google_cloud_logging.py @@ -144,3 +144,50 @@ def test_correct_timestamp(self): cursor=None, ) assert sender._sent_count == 1 # pylint: disable=protected-access + + def test_big_logentry_is_truncated(self): + """Check that an oversized message is truncated and still sent""" + message_content = "A" * 257_000 + request_builder = self._generate_request_builder( + [{"jsonPayload": {"MESSAGE": message_content[: GoogleCloudLoggingSender._MAX_MESSAGE_SIZE] + "[MESSAGE TRUNCATED]"}}], + ) + + sender = GoogleCloudLoggingSender( 
name="googlecloudlogging", + reader=mock.Mock(), + stats=mock.Mock(), + field_filter=None, + config=self.CONFIG, + googleapiclient_request_builder=request_builder, + ) + message = {"MESSAGE": message_content} + sender.send_messages(messages=[json.dumps(message).encode()], cursor=None) + assert sender._sent_count == 1 + + def test_big_logentry_sends_default(self): + """Check that an entry still over the quota after truncation is replaced with the default message""" + request_builder = self._generate_request_builder( + [ + { + "jsonPayload": { + "MESSAGE": "Log entry can't be logged because its size is greater than GCP logging quota of 256K" + } + } + ] + ) + + sender = GoogleCloudLoggingSender( + name="googlecloudlogging", + reader=mock.Mock(), + stats=mock.Mock(), + field_filter=None, + config=self.CONFIG, + googleapiclient_request_builder=request_builder, + ) + message = {"MESSAGE": "A" * 200_000, "OTHER_FIELD": "B" * 200_000} + sender.send_messages(messages=[json.dumps(message).encode()], cursor=None) + assert sender._sent_count == 1 + + message = {"OTHER_FIELD": "B" * 257_000} + sender.send_messages(messages=[json.dumps(message).encode()], cursor=None) + assert sender._sent_count == 2