Skip to content

Commit

Permalink
senders: Skips messages larger than 256K
Browse files Browse the repository at this point in the history
The GCP quotas page for Logging v2 (https://cloud.google.com/logging/quotas) specifies
that a LogEntry should be at most 256KB in size; this is not a hard limit, but
an estimate.
In any case, we should check whether any of our LogEntry objects is bigger than the quota
allows and not send those logs as-is. Instead we try to truncate the message to a smaller size,
and if that isn't enough we send a default log message.
  • Loading branch information
italomg committed Jun 21, 2024
1 parent 9803f9c commit 95d94d7
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 1 deletion.
2 changes: 2 additions & 0 deletions journalpump/senders/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
import re # type: ignore[no-redef]

KAFKA_COMPRESSED_MESSAGE_OVERHEAD = 30

# GCP logging also relies on this MAX message size
MAX_KAFKA_MESSAGE_SIZE = 1024**2 # 1 MiB

MAX_ERROR_MESSAGES = 8
Expand Down
23 changes: 22 additions & 1 deletion journalpump/senders/google_cloud_logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,15 @@ class GoogleCloudLoggingSender(LogSender):
0: "EMERGENCY",
}

# A bit on the safe side, not exactly 256KB but this
# is an approximation anyway
# according to https://cloud.google.com/logging/quotas
_LOG_ENTRY_QUOTA = 250 * 1024

# Somewhat arbitrarily chosen maximum message size; this gives about 56K of
# headroom below the 256K limit for the other fields in the LogEntry
_MAX_MESSAGE_SIZE = 200 * 1024

def __init__(self, *, config, googleapiclient_request_builder=None, **kwargs):
super().__init__(config=config, max_send_interval=config.get("max_send_interval", 0.3), **kwargs)
credentials = None
Expand Down Expand Up @@ -62,6 +71,18 @@ def send_messages(self, *, messages, cursor):
for message in messages:
msg_str = message.decode("utf8")
msg = json.loads(msg_str)

# This might not measure exactly 256K but should be a good enough approximation to handle this error.
# We try truncating the message; if that isn't enough, the entry is replaced by a default message.
if len(message) > self._LOG_ENTRY_QUOTA:
DEFAULT_MESSAGE = "Log entry can't be logged because its size is greater than GCP logging quota of 256K"
if "MESSAGE" in msg:
msg["MESSAGE"] = f'{msg["MESSAGE"][:self._MAX_MESSAGE_SIZE]}[MESSAGE TRUNCATED]'
messsage_size = len(json.dumps(msg, ensure_ascii=False).encode("utf-8"))
if messsage_size > self._LOG_ENTRY_QUOTA:
msg = {"MESSAGE": DEFAULT_MESSAGE}
else:
msg = {"MESSAGE": DEFAULT_MESSAGE}
timestamp = msg.pop("timestamp", None)
journald_priority = msg.pop("PRIORITY", None)

Expand All @@ -75,7 +96,7 @@ def send_messages(self, *, messages, cursor):
if timestamp is not None:
entry["timestamp"] = timestamp[:26] + "Z" # assume timestamp to be UTC
if journald_priority is not None:
severity = GoogleCloudLoggingSender._SEVERITY_MAPPING.get(journald_priority, "DEFAULT")
severity = self._SEVERITY_MAPPING.get(journald_priority, "DEFAULT")
entry["severity"] = severity
body["entries"].append(entry)

Expand Down
47 changes: 47 additions & 0 deletions test/test_google_cloud_logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,3 +144,50 @@ def test_correct_timestamp(self):
cursor=None,
)
assert sender._sent_count == 1 # pylint: disable=protected-access

def test_big_logentry_is_truncated(self):
    """Check that an over-quota MESSAGE is truncated and the entry is still sent.

    The serialized entry has to exceed ``_LOG_ENTRY_QUOTA`` for the sender to
    take its truncation path, so the payload is 257_000 characters long.
    The sender is then expected to cut MESSAGE down to ``_MAX_MESSAGE_SIZE``
    characters and append the ``[MESSAGE TRUNCATED]`` marker.
    """
    # pylint: disable=protected-access
    message_content = "A" * 257_000
    truncated_content = message_content[: GoogleCloudLoggingSender._MAX_MESSAGE_SIZE]
    request_builder = self._generate_request_builder(
        [{"jsonPayload": {"MESSAGE": f"{truncated_content}[MESSAGE TRUNCATED]"}}],
    )

    sender = GoogleCloudLoggingSender(
        name="googlecloudlogging",
        reader=mock.Mock(),
        stats=mock.Mock(),
        field_filter=None,
        config=self.CONFIG,
        googleapiclient_request_builder=request_builder,
    )
    raw_message = json.dumps({"MESSAGE": message_content}).encode()
    # Guard: if the payload ever drops below the quota again, this test would
    # silently stop exercising the truncation branch.
    assert len(raw_message) > GoogleCloudLoggingSender._LOG_ENTRY_QUOTA
    sender.send_messages(messages=[raw_message], cursor=None)
    assert sender._sent_count == 1

def test_big_logentry_sends_default(self):
    """Check that an entry which cannot be shrunk is replaced by the default message.

    Two oversized payloads are sent one after the other:

    * one whose MESSAGE is within the truncation limit but whose other field
      keeps the serialized entry above the quota, and
    * one that has no MESSAGE field at all, so there is nothing to truncate.

    In both cases the sender is expected to emit only the default message,
    and the sent counter must advance by one per call.
    """
    fallback_entry = {
        "jsonPayload": {
            "MESSAGE": "Log entry can't be logged because its size is greater than GCP logging quota of 256K"
        }
    }
    request_builder = self._generate_request_builder([fallback_entry])

    sender = GoogleCloudLoggingSender(
        name="googlecloudlogging",
        reader=mock.Mock(),
        stats=mock.Mock(),
        field_filter=None,
        config=self.CONFIG,
        googleapiclient_request_builder=request_builder,
    )

    oversized_payloads = (
        {"MESSAGE": "A" * 200_000, "OTHER_FIELD": "B" * 200_000},
        {"OTHER_FIELD": "B" * 257_000},
    )
    for expected_count, payload in enumerate(oversized_payloads, start=1):
        sender.send_messages(messages=[json.dumps(payload).encode()], cursor=None)
        assert sender._sent_count == expected_count  # pylint: disable=protected-access

0 comments on commit 95d94d7

Please sign in to comment.