From e21f5ac0f342eced6b1c7f58c822c522b1db94a5 Mon Sep 17 00:00:00 2001 From: Sagiv Oulu <140942608+sagivoululumigo@users.noreply.github.com> Date: Mon, 11 Sep 2023 08:19:34 +0300 Subject: [PATCH] RD-11321-otel-distros-python-and-node-filter-out-empty-sqs-polls (#481) * feat: do not export empty sqs polling spans * test: new span processor and empty sqs span skipping * test: integration test empty sqs polling span skipping * docs: update README.md with filter empty sqs messages feature * docs: detailed possible values for LUMIGO_AUTO_FILTER_EMPTY_SQS --- README.md | 14 +++ requirements.txt | 2 +- src/lumigo_opentelemetry/__init__.py | 5 +- .../botocore/parsers/__init__.py | 28 +++++ .../libs/environment_variables.py | 1 + .../libs/general_utils.py | 27 +++++ .../resources/span_processor.py | 40 +++++++ src/test/integration/boto3-sqs/app/main.py | 5 + .../boto3-sqs/tests/test_boto3_sqs.py | 27 ++++- .../instrumentations/botocore/test_parsers.py | 108 ++++++++++++++++++ src/test/unit/libs/test_general_utils.py | 31 ++++- .../unit/resources/test_span_processor.py | 73 ++++++++++++ 12 files changed, 356 insertions(+), 5 deletions(-) create mode 100644 src/lumigo_opentelemetry/libs/environment_variables.py create mode 100644 src/lumigo_opentelemetry/resources/span_processor.py create mode 100644 src/test/unit/instrumentations/botocore/test_parsers.py create mode 100644 src/test/unit/resources/test_span_processor.py diff --git a/README.md b/README.md index cca1d8ad..8c2ee330 100644 --- a/README.md +++ b/README.md @@ -98,6 +98,7 @@ The `lumigo_opentelemetry` package additionally supports the following configura * We support more granular masking using the following parameters. If not given, the above configuration is the fallback: `LUMIGO_SECRET_MASKING_REGEX_HTTP_REQUEST_BODIES`, `LUMIGO_SECRET_MASKING_REGEX_HTTP_REQUEST_HEADERS`, `LUMIGO_SECRET_MASKING_REGEX_HTTP_RESPONSE_BODIES`, `LUMIGO_SECRET_MASKING_REGEX_HTTP_RESPONSE_HEADERS`, `LUMIGO_SECRET_MASKING_REGEX_HTTP_QUERY_PARAMS`, `LUMIGO_SECRET_MASKING_REGEX_ENVIRONMENT`. * `LUMIGO_SWITCH_OFF=true`: This option disables the Lumigo OpenTelemetry distro entirely; no instrumentation will be injected, no tracing data will be collected. * `LUMIGO_REPORT_DEPENDENCIES=false`: This option disables the built-in dependency reporting to Lumigo SaaS. For more information, refer to the [Automated dependency reporting](#automated-dependency-reporting) section. +* `LUMIGO_AUTO_FILTER_EMPTY_SQS`: This option enables the automatic filtering of empty SQS messages from being sent to Lumigo SaaS. For more information, refer to the [Filtering out empty SQS messages](#filtering-out-empty-sqs-messages) section. ### Execution Tags @@ -340,6 +341,19 @@ for message in response.get("Messages", []): Without the scope provided by the iterator over `response["Messages"]`, `span_1` would be without a parent span, and that would result in a separate invocation and a separate transaction in Lumigo. +### Filtering out empty SQS messages + +A common pattern in SQS-based applications is to continuously poll an SQS queue for messages, +and to process them as they arrive. +In order not to clutter the Lumigo Dashboard with empty SQS polling messages, the default behavior is to filter them +out from being sent to Lumigo. + +You can change this behavior by setting the boolean environment variable `LUMIGO_AUTO_FILTER_EMPTY_SQS` to `false`. +The possible variations are: +* `LUMIGO_AUTO_FILTER_EMPTY_SQS=true` filter out empty SQS polling messages +* `LUMIGO_AUTO_FILTER_EMPTY_SQS=false` do not filter out empty SQS polling messages +* No environment variable set (default): filter out empty SQS polling messages + ## Testing We use [nox](https://pypi.org/project/nox/) for setting up and running our tests. diff --git a/requirements.txt b/requirements.txt index 93c24a3f..b11db86c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -12,7 +12,7 @@ pre-commit==2.20.0 psutil==5.9.1 pytest==7.1.1 pytest-cov==3.0.0 -pyyaml==6.0 +pyyaml==6.0.1 requests==2.27.1 types-attrs==19.1.0 types-boto==2.49.17 diff --git a/src/lumigo_opentelemetry/__init__.py b/src/lumigo_opentelemetry/__init__.py index ae77e231..5c52bb84 100644 --- a/src/lumigo_opentelemetry/__init__.py +++ b/src/lumigo_opentelemetry/__init__.py @@ -89,7 +89,8 @@ def init() -> Dict[str, Any]: from opentelemetry import trace from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter from opentelemetry.sdk.trace import SpanLimits, TracerProvider - from opentelemetry.sdk.trace.export import BatchSpanProcessor + + from lumigo_opentelemetry.resources.span_processor import LumigoSpanProcessor DEFAULT_LUMIGO_ENDPOINT = ( "https://ga-otlp.lumigo-tracer-edge.golumigo.com/v1/traces" @@ -125,7 +126,7 @@ def init() -> Dict[str, Any]: if lumigo_token: tracer_provider.add_span_processor( - BatchSpanProcessor( + LumigoSpanProcessor( OTLPSpanExporter( endpoint=lumigo_endpoint, headers={"Authorization": f"LumigoToken {lumigo_token}"}, diff --git a/src/lumigo_opentelemetry/instrumentations/botocore/parsers/__init__.py b/src/lumigo_opentelemetry/instrumentations/botocore/parsers/__init__.py index 9b3d8171..165d121e 100644 --- a/src/lumigo_opentelemetry/instrumentations/botocore/parsers/__init__.py +++ b/src/lumigo_opentelemetry/instrumentations/botocore/parsers/__init__.py @@ -12,6 +12,10 @@ extract_region_from_arn, get_resource_fullname, ) +from lumigo_opentelemetry.resources.span_processor import set_span_skip_export +from lumigo_opentelemetry import logger +from lumigo_opentelemetry.libs.general_utils import get_boolean_env_var +from lumigo_opentelemetry.libs.environment_variables import AUTO_FILTER_EMPTY_SQS class AwsParser: @@ -133,6 +137,22 @@ def parse_request( } span.set_attributes(attributes) + @staticmethod + def _should_skip_empty_sqs_polling_response( + operation_name: str, result: Dict[Any, Any] + ) -> bool: + """ + checks the sqs response & returns true if the request receive messages from SQS but no messages were returned + """ + + no_messages = not result or not result.get("Messages", None) + sqs_poll = operation_name == "ReceiveMessage" + return ( + sqs_poll + and no_messages + and get_boolean_env_var(AUTO_FILTER_EMPTY_SQS, True) + ) + @staticmethod def parse_response( span: Span, service_name: str, operation_name: str, result: Dict[Any, Any] @@ -145,6 +165,14 @@ def parse_response( {"lumigoData": json.dumps({"trigger": trigger_details})} ) + # Filter out sqs polls with empty response + if SqsParser._should_skip_empty_sqs_polling_response(operation_name, result): + logger.info( + "Not tracing empty SQS polling requests " + f"(override by setting the {AUTO_FILTER_EMPTY_SQS} env var to false)" + ) + set_span_skip_export(span) + class LambdaParser(AwsParser): @staticmethod diff --git a/src/lumigo_opentelemetry/libs/environment_variables.py b/src/lumigo_opentelemetry/libs/environment_variables.py new file mode 100644 index 00000000..d1613e81 --- /dev/null +++ b/src/lumigo_opentelemetry/libs/environment_variables.py @@ -0,0 +1 @@ +AUTO_FILTER_EMPTY_SQS = "LUMIGO_AUTO_FILTER_EMPTY_SQS" diff --git a/src/lumigo_opentelemetry/libs/general_utils.py b/src/lumigo_opentelemetry/libs/general_utils.py index 6cfa9f65..1ccf4bb8 100644 --- a/src/lumigo_opentelemetry/libs/general_utils.py +++ b/src/lumigo_opentelemetry/libs/general_utils.py @@ -70,3 +70,30 @@ def get_max_size() -> int: os.environ.get(OTEL_ATTRIBUTE_VALUE_LENGTH_LIMIT, DEFAULT_MAX_ENTRY_SIZE), ) ) + + +def get_boolean_env_var(env_var_name: str, default: bool = False) -> bool: + """ + This function return the boolean value of the given environment variable. + If this values doesn't exist, return default. + + @param env_var_name: The env var to get (case-sensitive) + @param default: Default value if env var is not set + @return: The boolean value of the env var + """ + + env_var_value = os.environ.get(env_var_name, str(default)) + env_var_value = ( + env_var_value.lower() if isinstance(env_var_value, str) else env_var_value + ) + + is_truth_value = env_var_value == "true" + is_false_value = env_var_value == "false" + if not is_truth_value and not is_false_value: + logger.debug( + f'Invalid boolean value for env var "{env_var_name}", ' + f'defaulting to value "{str(default).lower()}"' + ) + return default + + return is_truth_value diff --git a/src/lumigo_opentelemetry/resources/span_processor.py b/src/lumigo_opentelemetry/resources/span_processor.py new file mode 100644 index 00000000..72ad226e --- /dev/null +++ b/src/lumigo_opentelemetry/resources/span_processor.py @@ -0,0 +1,40 @@ +from opentelemetry.trace import Span +from opentelemetry.sdk.trace import ReadableSpan +from opentelemetry.sdk.trace.export import BatchSpanProcessor + +from lumigo_opentelemetry import logger + + +# A span attributes that if is set to True, the span will not be exported +SKIP_EXPORT_SPAN_ATTRIBUTE = "SKIP_EXPORT" + + +class LumigoSpanProcessor(BatchSpanProcessor): + def on_end(self, span: ReadableSpan) -> None: + if should_skip_exporting_span(span): + logger.debug("Not exporting span because it has NO_EXPORT=True attribute") + return + + super().on_end(span) + + +def should_skip_exporting_span(span: ReadableSpan) -> bool: + """ + Given a span, returns an answer if the span should be exported or not. + @param span: A readable span to check + @return: True if the span should not be exported, False otherwise + """ + return span.attributes.get(SKIP_EXPORT_SPAN_ATTRIBUTE, False) is True + + +def set_span_skip_export(span: Span, skip_export: bool = True) -> None: + """ + marks the span as a span not intended for export (for example in spans that create a lot of noise and customers + do not want to trace) + + @param span: The span to mark (The span is altered in place) + @param skip_export: Should the span be exported or not (default is True, the span will not be exported) + @return: + """ + + span.set_attributes({SKIP_EXPORT_SPAN_ATTRIBUTE: skip_export}) diff --git a/src/test/integration/boto3-sqs/app/main.py b/src/test/integration/boto3-sqs/app/main.py index 9291d4a4..aa270366 100644 --- a/src/test/integration/boto3-sqs/app/main.py +++ b/src/test/integration/boto3-sqs/app/main.py @@ -13,6 +13,11 @@ def run(): client = boto3.client("sqs", region_name="eu-central-1") queue = client.create_queue(QueueName="test") + # Simulate polling an empty sqs queue + client.receive_message(QueueUrl=queue["QueueUrl"], MaxNumberOfMessages=1) + client.receive_message(QueueUrl=queue["QueueUrl"], MaxNumberOfMessages=1) + client.receive_message(QueueUrl=queue["QueueUrl"], MaxNumberOfMessages=1) + client.send_message( QueueUrl=queue["QueueUrl"], MessageBody="Message_1", diff --git a/src/test/integration/boto3-sqs/tests/test_boto3_sqs.py b/src/test/integration/boto3-sqs/tests/test_boto3_sqs.py index 2a498fa2..b3305e61 100644 --- a/src/test/integration/boto3-sqs/tests/test_boto3_sqs.py +++ b/src/test/integration/boto3-sqs/tests/test_boto3_sqs.py @@ -5,10 +5,13 @@ class TestBoto3SqsSpans(unittest.TestCase): def test_boto3_instrumentation(self): spans_container = SpansContainer.parse_spans_from_file() - self.assertEqual(9, len(spans_container.spans)) + self.assertEqual(12, len(spans_container.spans)) [ create_queue_span, + empty_sqs_poll_1_span, + empty_sqs_poll_2_span, + empty_sqs_poll_3_span, send_message_1_span, send_message_2_span, receive_message_1_span, @@ -21,6 +24,28 @@ def test_boto3_instrumentation(self): after_iterator_break_span, ] = spans_container.spans + # Make sure that all the empty polling spans are marked as skipped + for span in [ + empty_sqs_poll_1_span, + empty_sqs_poll_2_span, + empty_sqs_poll_3_span, + ]: + self.assertIsNotNone(span.get("attributes")) + self.assertEqual(span["attributes"].get("SKIP_EXPORT"), True) + + # Make sure that other spans are not marked as skipped + for span in [ + create_queue_span, + send_message_1_span, + send_message_2_span, + receive_message_1_span, + receive_message_2_span, + consume_message_2_span, + consume_message_1_span, + receive_message_2_span, + ]: + self.assertNotEqual(span.get("attributes", {}).get("SKIP_EXPORT"), True) + self.assertEqual(create_queue_span["name"], "SQS.CreateQueue") self.assertIsNone(create_queue_span["parent_id"]) diff --git a/src/test/unit/instrumentations/botocore/test_parsers.py b/src/test/unit/instrumentations/botocore/test_parsers.py new file mode 100644 index 00000000..913e1d34 --- /dev/null +++ b/src/test/unit/instrumentations/botocore/test_parsers.py @@ -0,0 +1,108 @@ +import logging + +import pytest +from unittest.mock import Mock, patch + +from lumigo_opentelemetry.instrumentations.botocore.parsers import SqsParser + + +EMPTY_SQS_RESULT_1 = {} +EMPTY_SQS_RESULT_2 = {"Messages": []} +NON_EMPTY_SQS_RESULT = {"Messages": [{"MessageId": "1234", "Body": "test"}]} + + +@pytest.mark.parametrize( + "env_var_value, operation, result, should_skip", + [ + # Check that empty sqs polls are skipped + ("true", "ReceiveMessage", EMPTY_SQS_RESULT_1, True), + ("true", "ReceiveMessage", EMPTY_SQS_RESULT_2, True), + # Check that non-empty polls are not skipped + ("true", "ReceiveMessage", NON_EMPTY_SQS_RESULT, False), + # Check that other operations are not skipped + ("true", "DeleteMessage", EMPTY_SQS_RESULT_1, False), + ("true", "DeleteMessageBatch", EMPTY_SQS_RESULT_1, False), + ("true", "SendMessage", EMPTY_SQS_RESULT_1, False), + ("true", "SendMessageBatch", EMPTY_SQS_RESULT_1, False), + ("true", "UnknownOperation", EMPTY_SQS_RESULT_1, False), + ("true", None, EMPTY_SQS_RESULT_1, False), + # Check that empty sqs polls are not skipped if the env var is set to false + ("false", "ReceiveMessage", EMPTY_SQS_RESULT_1, False), + ("false", "ReceiveMessage", EMPTY_SQS_RESULT_2, False), + # Check that non-empty polls are not skipped if the env var is set to false + ("false", "ReceiveMessage", NON_EMPTY_SQS_RESULT, False), + # Check that the default behavior is to skip empty sqs polls + (None, "ReceiveMessage", EMPTY_SQS_RESULT_1, True), + (None, "ReceiveMessage", EMPTY_SQS_RESULT_2, True), + ("UnsupportedEnvVarValue", "ReceiveMessage", EMPTY_SQS_RESULT_2, True), + ], +) +def test_sqs_skip_sqs_response( + env_var_value, operation, result, should_skip, monkeypatch +): + if env_var_value is not None: + monkeypatch.setenv("LUMIGO_AUTO_FILTER_EMPTY_SQS", env_var_value) + + assert ( + SqsParser._should_skip_empty_sqs_polling_response(operation, result) + == should_skip + ) + + +@patch( + "lumigo_opentelemetry.instrumentations.botocore.parsers.SqsParser._should_skip_empty_sqs_polling_response" +) +def test_parse_sqs_response_skipping_empty_polls_outputs_log(should_skip_mock, caplog): + should_skip_mock.return_value = True + span = Mock(set_attribute=Mock()) + service_name = "sqs" + operation_name = "ReceiveMessage" + result = { + "ResponseMetadata": { + "RequestId": "54bf0dab-cfab-5fa5-b284-50d83403c94c", + "HTTPStatusCode": 200, + "HTTPHeaders": { + "x-amzn-requestid": "54bf0dab-cfab-5fa5-b284-50d83403c94c", + "date": "Thu, 07 Sep 2023 16:25:12 GMT", + "content-type": "text/xml", + "content-length": "240", + "connection": "keep-alive", + }, + "RetryAttempts": 0, + } + } + + with caplog.at_level(logging.INFO): + SqsParser.parse_response(span, service_name, operation_name, result) + + assert "not tracing empty sqs polling requests" in caplog.text.lower() + + +@patch( + "lumigo_opentelemetry.instrumentations.botocore.parsers.SqsParser._should_skip_empty_sqs_polling_response" +) +def test_parse_sqs_response_not_skipping_polls_no_output_log(should_skip_mock, caplog): + should_skip_mock.return_value = False + span = Mock(set_attribute=Mock()) + service_name = "sqs" + operation_name = "ReceiveMessage" + result = { + "ResponseMetadata": { + "RequestId": "54bf0dab-cfab-5fa5-b284-50d83403c94c", + "HTTPStatusCode": 200, + "HTTPHeaders": { + "x-amzn-requestid": "54bf0dab-cfab-5fa5-b284-50d83403c94c", + "date": "Thu, 07 Sep 2023 16:25:12 GMT", + "content-type": "text/xml", + "content-length": "240", + "connection": "keep-alive", + }, + "RetryAttempts": 0, + } + } + + SqsParser.parse_response(span, service_name, operation_name, result) + + assert "not tracing empty sqs polling requests" not in caplog.text.lower() + + # Make sure that there is an info log diff --git a/src/test/unit/libs/test_general_utils.py b/src/test/unit/libs/test_general_utils.py index db5ebc38..5aa5b93d 100644 --- a/src/test/unit/libs/test_general_utils.py +++ b/src/test/unit/libs/test_general_utils.py @@ -1,4 +1,6 @@ -from lumigo_opentelemetry.libs.general_utils import get_max_size +import pytest + +from lumigo_opentelemetry.libs.general_utils import get_max_size, get_boolean_env_var def test_get_max_size_otel_span_attr_limit_is_set(monkeypatch): @@ -19,3 +21,30 @@ def test_get_max_size_both_env_vars_are_set(monkeypatch): def test_get_max_size_get_default_value(): assert get_max_size() == 2048 + + +@pytest.mark.parametrize( + "env_var_value,default,expected_result", + [ + # Normal truth values + ("true", False, True), + ("True", False, True), + ("TRUE", False, True), + # Normal false values + ("false", True, False), + ("False", True, False), + ("FALSE", True, False), + # Invalid values, use the default + ("RandomValue", False, False), + ("RandomValue", True, True), + # Empty values, use the default + (None, False, False), + (None, True, True), + ], +) +def test_get_boolean_env_var(env_var_value, default, expected_result, monkeypatch): + """Try getting a boolean value from env vars, and check all the different options work""" + + if env_var_value is not None: + monkeypatch.setenv("TEST_VAR", env_var_value) + assert get_boolean_env_var("TEST_VAR", default=default) is expected_result diff --git a/src/test/unit/resources/test_span_processor.py b/src/test/unit/resources/test_span_processor.py new file mode 100644 index 00000000..9ea8c9f8 --- /dev/null +++ b/src/test/unit/resources/test_span_processor.py @@ -0,0 +1,73 @@ +from unittest.mock import Mock, patch + +import pytest + +from lumigo_opentelemetry.resources.span_processor import ( + set_span_skip_export, + should_skip_exporting_span, + LumigoSpanProcessor, +) + + +def test_set_span_no_export(): + """ + Given a span, check that the span is marked as not exported + """ + + for no_export in [True, False]: + span_mock = Mock(set_attribute=Mock()) + set_span_skip_export(span_mock, no_export) + (attributes,) = span_mock.set_attributes.call_args[0] + assert attributes == {"SKIP_EXPORT": no_export} + + # Check default value + span_mock = Mock(set_attribute=Mock()) + set_span_skip_export(span_mock) + (attributes,) = span_mock.set_attributes.call_args[0] + assert attributes == {"SKIP_EXPORT": True} + + +@pytest.mark.parametrize( + "attributes, should_export", + [ + # Default is to export + ({}, True), + # Use the value if it is set + ({"SKIP_EXPORT": False}, True), + ({"SKIP_EXPORT": True}, False), + ], +) +def test_should_not_export_span(attributes, should_export): + readable_span_mock = Mock(attributes=attributes) + + assert should_skip_exporting_span(readable_span_mock) is not should_export + + +@patch("lumigo_opentelemetry.resources.span_processor.should_skip_exporting_span") +@patch("opentelemetry.sdk.trace.export.BatchSpanProcessor.on_end") +def test_lumigo_span_processor_no_export_set( + mocked_super_on_end, mocked_should_not_export_span +): + processor = LumigoSpanProcessor(Mock()) + readable_span_mock = Mock() + mocked_should_not_export_span.return_value = True + processor.on_end(span=readable_span_mock) + + # Check if the parent of processor BatchSpanProcessor.on_end not called + mocked_super_on_end.assert_not_called() + + +@patch("lumigo_opentelemetry.resources.span_processor.should_skip_exporting_span") +@patch("opentelemetry.sdk.trace.export.BatchSpanProcessor.on_end") +def test_lumigo_span_processor_no_export_not_set( + mocked_super_on_end, mocked_should_not_export_span +): + processor = LumigoSpanProcessor(Mock()) + readable_span_mock = Mock() + + # should_not_export is False. i.e. the span should be exported + mocked_should_not_export_span.return_value = False + processor.on_end(span=readable_span_mock) + + # Check if the parent of processor BatchSpanProcessor.on_end was called + mocked_super_on_end.assert_called_once_with(readable_span_mock)