Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Added environment specific labels to client library when running in Cloud Run Jobs #877

Merged
merged 10 commits into from
Apr 17, 2024
72 changes: 70 additions & 2 deletions google/cloud/logging_v2/handlers/_monitored_resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import functools
import logging
import os

from google.cloud.logging_v2.resource import Resource
Expand Down Expand Up @@ -67,6 +69,20 @@
_PROJECT_NAME = "project/project-id"
"""Attribute in metadata server when in GKE environment."""

_GAE_RESOURCE_TYPE = "gae_app"
"""Resource type for App Engine environment."""

_CLOUD_RUN_JOB_RESOURCE_TYPE = "cloud_run_job"
"""Resource type for Cloud Run Jobs."""

_GAE_TRACE_ID_LABEL = "appengine.googleapis.com/trace_id"
"""Extra trace label to be added on App Engine environments"""

_CLOUD_RUN_JOBS_EXECUTION_NAME_LABEL = "run.googleapis.com/execution_name"
_CLOUD_RUN_JOBS_TASK_INDEX_LABEL = "run.googleapis.com/task_index"
_CLOUD_RUN_JOBS_TASK_ATTEMPT_LABEL = "run.googleapis.com/task_attempt"
"""Extra labels for Cloud Run environments to be recognized by Cloud Run Jobs web UI."""


def _create_functions_resource():
"""Create a standardized Cloud Functions resource.
Expand Down Expand Up @@ -159,7 +175,7 @@ def _create_cloud_run_job_resource():
region = retrieve_metadata_server(_REGION_ID)
project = retrieve_metadata_server(_PROJECT_NAME)
resource = Resource(
type="cloud_run_job",
type=_CLOUD_RUN_JOB_RESOURCE_TYPE,
labels={
"project_id": project if project else "",
"job_name": os.environ.get(_CLOUD_RUN_JOB_ID, ""),
Expand All @@ -177,7 +193,7 @@ def _create_app_engine_resource():
zone = retrieve_metadata_server(_ZONE_ID)
project = retrieve_metadata_server(_PROJECT_NAME)
resource = Resource(
type="gae_app",
type=_GAE_RESOURCE_TYPE,
labels={
"project_id": project if project else "",
"module_id": os.environ.get(_GAE_SERVICE_ENV, ""),
Expand Down Expand Up @@ -233,3 +249,55 @@ def detect_resource(project=""):
else:
# use generic global resource
return _create_global_resource(project)


@functools.lru_cache(maxsize=None)
def _get_environmental_labels(resource_type):
"""Builds a dictionary of labels to be inserted into a LogRecord of the given resource type.
This function should only build a dict of items that are consistent across multiple logging statements
of the same resource type, such as environment variables.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: maybe add a comment about cache?

Returns:
dict:
A dict representation of labels and the values of those labels
"""
labels = {}

def set_item(key, environ_var):
val = os.environ.get(environ_var, "")
if val:
labels[key] = val

if resource_type == _CLOUD_RUN_JOB_RESOURCE_TYPE:
set_item(
_CLOUD_RUN_JOBS_EXECUTION_NAME_LABEL,
_CLOUD_RUN_EXECUTION_ID,
)
set_item(_CLOUD_RUN_JOBS_TASK_INDEX_LABEL, _CLOUD_RUN_TASK_INDEX)
set_item(_CLOUD_RUN_JOBS_TASK_ATTEMPT_LABEL, _CLOUD_RUN_TASK_ATTEMPT)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: this works, but I find the indirection a bit harder to read than it probably needs to be. Maybe something like:

var_map =  ((_CLOUD_RUN_EXECUTION_ID, _CLOUD_RUN_JOBS_EXECUTION_NAME_LABEL), (_CLOUD_RUN_TASK_INDEX, _CLOUD_RUN_JOBS_TASK_INDEX_LABEL), (_CLOUD_RUN_TASK_ATTEMPT, _CLOUD_RUN_JOBS_TASK_ATTEMPT_LABEL))

for envvar, label_name in var_map:
  value = os.environ.get(envvar, None):
  if value:
    labels[label_name] = value
  


return labels


def add_resource_labels(resource: Resource, record: logging.LogRecord):
"""Returns additional labels to be appended on to a LogRecord object based on the
local environment. Defaults to an empty dictionary if none apply. This is only to be
used for CloudLoggingHandler, as the structured logging daemon already does this.

Args:
resource (google.cloud.logging.Resource): Resource based on the environment
record (logging.LogRecord): A LogRecord object representing a log record
Returns:
Dict[str, str]: New labels to append to the labels of the LogRecord
"""
if not resource:
return None

# Get environmental labels from the resource type
labels = _get_environmental_labels(resource.type)

# Add labels from log record
if resource.type == _GAE_RESOURCE_TYPE and record._trace is not None:
labels[_GAE_TRACE_ID_LABEL] = record._trace

return labels
16 changes: 6 additions & 10 deletions google/cloud/logging_v2/handlers/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@
import logging

from google.cloud.logging_v2.handlers.transports import BackgroundThreadTransport
from google.cloud.logging_v2.handlers._monitored_resources import detect_resource
from google.cloud.logging_v2.handlers._monitored_resources import (
detect_resource,
add_resource_labels,
)
from google.cloud.logging_v2.handlers._helpers import get_request_data

DEFAULT_LOGGER_NAME = "python"
Expand All @@ -40,12 +43,6 @@
"""These environments require us to remove extra handlers on setup"""
_CLEAR_HANDLER_RESOURCE_TYPES = ("gae_app", "cloud_function")

"""Extra trace label to be added on App Engine environments"""
_GAE_TRACE_ID_LABEL = "appengine.googleapis.com/trace_id"

"""Resource name for App Engine environments"""
_GAE_RESOURCE_TYPE = "gae_app"


class CloudLoggingFilter(logging.Filter):
"""Python standard ``logging`` Filter class to add Cloud Logging
Expand Down Expand Up @@ -206,9 +203,8 @@ def emit(self, record):
labels = record._labels
message = _format_and_parse_message(record, self)

if resource.type == _GAE_RESOURCE_TYPE and record._trace is not None:
# add GAE-specific label
labels = {_GAE_TRACE_ID_LABEL: record._trace, **(labels or {})}
labels = {**add_resource_labels(resource, record), **(labels or {})} or None

# send off request
self.transport.send(
record,
Expand Down
61 changes: 47 additions & 14 deletions tests/unit/handlers/test__monitored_resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,34 +12,25 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import pytest
import unittest

import logging
import mock
import os
import functools

from google.cloud.logging_v2.handlers._monitored_resources import (
_create_functions_resource,
)
from google.cloud.logging_v2.handlers._monitored_resources import (
_create_app_engine_resource,
)
from google.cloud.logging_v2.handlers._monitored_resources import (
_create_functions_resource,
_create_kubernetes_resource,
)
from google.cloud.logging_v2.handlers._monitored_resources import (
_create_cloud_run_service_resource,
)
from google.cloud.logging_v2.handlers._monitored_resources import (
_create_cloud_run_job_resource,
)
from google.cloud.logging_v2.handlers._monitored_resources import (
_create_compute_resource,
)
from google.cloud.logging_v2.handlers._monitored_resources import (
_create_global_resource,
detect_resource,
add_resource_labels,
)
from google.cloud.logging_v2.handlers._monitored_resources import detect_resource
from google.cloud.logging_v2.handlers import _monitored_resources
from google.cloud.logging_v2.resource import Resource

Expand Down Expand Up @@ -353,3 +344,45 @@ def test_detect_partial_data(self):
# project id not returned from metadata serve
# should be empty string
self.assertEqual(resource.labels["project_id"], "")


@pytest.mark.parametrize(
"resource_type,os_environ,record_attrs,expected_labels",
[
(
_monitored_resources._GAE_RESOURCE_TYPE,
{},
{"_trace": "trace_id"},
{_monitored_resources._GAE_TRACE_ID_LABEL: "trace_id"},
),
(
_monitored_resources._CLOUD_RUN_JOB_RESOURCE_TYPE,
{
_monitored_resources._CLOUD_RUN_EXECUTION_ID: "test_job_12345",
_monitored_resources._CLOUD_RUN_TASK_INDEX: "1",
_monitored_resources._CLOUD_RUN_TASK_ATTEMPT: "12",
},
{},
{
_monitored_resources._CLOUD_RUN_JOBS_EXECUTION_NAME_LABEL: "test_job_12345",
_monitored_resources._CLOUD_RUN_JOBS_TASK_INDEX_LABEL: "1",
_monitored_resources._CLOUD_RUN_JOBS_TASK_ATTEMPT_LABEL: "12",
},
),
("global", {}, {}, {}),
],
)
def test_add_resource_labels(resource_type, os_environ, record_attrs, expected_labels):
os.environ.clear()
record = logging.LogRecord("logname", None, None, None, "test", None, None)

resource = Resource(type=resource_type, labels={})

for attr, val in record_attrs.items():
setattr(record, attr, val)

os.environ.update(os_environ)

labels = add_resource_labels(resource, record)

assert expected_labels == labels