Skip to content

Commit

Permalink
senders: Skips messages larger than 256K
Browse files Browse the repository at this point in the history
GCP quotas page for the logging v2(https://cloud.google.com/logging/quotas) specifies
that a LogEntry should have at maximum 256KB in size, that is not a hard limit, but
an estimation.
In anycase we should check if any of our LogEntry objects are bigger than the quota
allows and not send those logs. Instead we try to truncate the message to smaller size
and if that isn't enough we send a default log message.
  • Loading branch information
italomg committed Jun 18, 2024
1 parent 9803f9c commit 9e20cfc
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 0 deletions.
2 changes: 2 additions & 0 deletions journalpump/senders/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
import re # type: ignore[no-redef]

KAFKA_COMPRESSED_MESSAGE_OVERHEAD = 30

# GCP logging also relies on this MAX message size
MAX_KAFKA_MESSAGE_SIZE = 1024**2 # 1 MiB

MAX_ERROR_MESSAGES = 8
Expand Down
21 changes: 21 additions & 0 deletions journalpump/senders/google_cloud_logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,15 @@ class GoogleCloudLoggingSender(LogSender):
0: "EMERGENCY",
}

# A bit on the safe side, not exactly 256KB but this
# is an approximation anyway
# according to https://cloud.google.com/logging/quotas
_LOG_ENTRY_QUOTA = 250 * 1024

# Somewhat arbitrary maximum message size choosen, this gives a 56K
# headroom for the other fields in the LogEntry
_MAX_MESSAGE_SIZE = 200 * 1024

def __init__(self, *, config, googleapiclient_request_builder=None, **kwargs):
super().__init__(config=config, max_send_interval=config.get("max_send_interval", 0.3), **kwargs)
credentials = None
Expand Down Expand Up @@ -62,6 +71,18 @@ def send_messages(self, *, messages, cursor):
for message in messages:
msg_str = message.decode("utf8")
msg = json.loads(msg_str)

# This might not measure exactly 256K but should be a good enough approximation to handle this error.
# We try truncating the message if it isn't possible then it is skip.
if len(message) > GoogleCloudLoggingSender._LOG_ENTRY_QUOTA:
if "MESSAGE" in msg:
msg["MESSAGE"] = f'{msg["MESSAGE"][:GoogleCloudLoggingSender._MAX_MESSAGE_SIZE]}[MESSAGE TRUNCATED]'
messsage_size = len(json.dumps(msg))
if messsage_size > GoogleCloudLoggingSender._LOG_ENTRY_QUOTA:
msg = { "MESSAGE": "Log entry cannot be logged because its size is greater than GCP logging quota of 256K" }
else:
msg = { "MESSAGE": "Log entry cannot be logged because its size is greater than GCP logging quota of 256K" }

timestamp = msg.pop("timestamp", None)
journald_priority = msg.pop("PRIORITY", None)

Expand Down
52 changes: 52 additions & 0 deletions test/test_google_cloud_logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@
from googleapiclient.http import RequestMockBuilder as GoogleApiClientRequestMockBuilder
from httplib2 import Response as HttpLib2Response
from journalpump.senders.google_cloud_logging import GoogleCloudLoggingSender
from string import printable
from typing import Dict, List
from unittest import mock

import json
import random


class TestGoogleCloudLoggingSender:
Expand Down Expand Up @@ -144,3 +146,53 @@ def test_correct_timestamp(self):
cursor=None,
)
assert sender._sent_count == 1 # pylint: disable=protected-access

def test_big_logentry_is_truncated(self):
"""Check that message was not marked as sent if GoogleApi returns error"""
message_content = "A" * 257_00
request_builder = self._generate_request_builder(
[{"jsonPayload": { "MESSAGE": message_content[:GoogleCloudLoggingSender._MAX_MESSAGE_SIZE] }}],
)

sender = GoogleCloudLoggingSender(
name="googlecloudlogging",
reader=mock.Mock(),
stats=mock.Mock(),
field_filter=None,
config=self.CONFIG,
googleapiclient_request_builder=request_builder,
)
message = { "MESSAGE": message_content }
sender.send_messages(messages=[json.dumps(message).encode()], cursor=None)
assert sender._sent_count == 1

def test_big_logentry_sends_default(self):
"""Check that message was not marked as sent if GoogleApi returns error"""
request_builder = self._generate_request_builder([
{
"jsonPayload": {
"MESSAGE": "Log entry cannot be logged because its size is greater than GCP logging quota of 256K"
}
}
])

sender = GoogleCloudLoggingSender(
name="googlecloudlogging",
reader=mock.Mock(),
stats=mock.Mock(),
field_filter=None,
config=self.CONFIG,
googleapiclient_request_builder=request_builder,
)
message = {
"MESSAGE": "A" * 200_000,
"OTHER_FIELD": "B" * 200_000
}
sender.send_messages(messages=[json.dumps(message).encode()], cursor=None)
assert sender._sent_count == 1

message = {
"OTHER_FIELD": "B" * 257_000
}
sender.send_messages(messages=[json.dumps(message).encode()], cursor=None)
assert sender._sent_count == 2

0 comments on commit 9e20cfc

Please sign in to comment.