Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RD-11321-otel-distros-python-and-node-filter-out-empty-sqs-polls #481

Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
a960fc4
fix: pip install failed on mac
sagivoululumigo Sep 6, 2023
a063aa7
feat: do not export empty sqs polling spans
sagivoululumigo Sep 7, 2023
7b8613f
chore: auto format code
sagivoululumigo Sep 7, 2023
8aaa755
fix: pip install failed on mac
sagivoululumigo Sep 6, 2023
3ab36de
feat: do not export empty sqs polling spans
sagivoululumigo Sep 7, 2023
d69b6f5
chore: auto format code
sagivoululumigo Sep 7, 2023
24dbb03
test: bool env var retrival edge cases
sagivoululumigo Sep 7, 2023
d48e913
test: new span processor and empty sqs span skipping
sagivoululumigo Sep 7, 2023
40c68e8
chore: auto format code
sagivoululumigo Sep 7, 2023
220c47f
resolve merge conflict
sagivoululumigo Sep 7, 2023
ff1a6a1
fix: remove debugging prints
sagivoululumigo Sep 7, 2023
05f11d5
refactor: rename NO_EXPORT attribute to SKIP_EXPORT
sagivoululumigo Sep 10, 2023
efda5e9
fix: wrong attribute name used
sagivoululumigo Sep 10, 2023
88b4406
test: integration test empty sqs polling span skipping
sagivoululumigo Sep 10, 2023
73887b3
chore: fix linting errors
sagivoululumigo Sep 10, 2023
be82dde
fix: change skipping empty sqs poll log line to info
sagivoululumigo Sep 10, 2023
7b96321
fix: edge case env var with None value
sagivoululumigo Sep 10, 2023
65b9113
chore: auto format code
sagivoululumigo Sep 10, 2023
eeb8ce6
fix: change env var name to have lumigo prefix
sagivoululumigo Sep 10, 2023
fbdb95c
docs: update README.md with filter empty sqs messages feature
sagivoululumigo Sep 10, 2023
302dfb0
docs: detailed possible values for LUMIGO_AUTO_FILTER_EMPTY_SQS
sagivoululumigo Sep 10, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ The `lumigo_opentelemetry` package additionally supports the following configura
* We support more granular masking using the following parameters. If not given, the above configuration is the fallback: `LUMIGO_SECRET_MASKING_REGEX_HTTP_REQUEST_BODIES`, `LUMIGO_SECRET_MASKING_REGEX_HTTP_REQUEST_HEADERS`, `LUMIGO_SECRET_MASKING_REGEX_HTTP_RESPONSE_BODIES`, `LUMIGO_SECRET_MASKING_REGEX_HTTP_RESPONSE_HEADERS`, `LUMIGO_SECRET_MASKING_REGEX_HTTP_QUERY_PARAMS`, `LUMIGO_SECRET_MASKING_REGEX_ENVIRONMENT`.
* `LUMIGO_SWITCH_OFF=true`: This option disables the Lumigo OpenTelemetry distro entirely; no instrumentation will be injected, no tracing data will be collected.
* `LUMIGO_REPORT_DEPENDENCIES=false`: This option disables the built-in dependency reporting to Lumigo SaaS. For more information, refer to the [Automated dependency reporting](#automated-dependency-reporting) section.
* `LUMIGO_AUTO_FILTER_EMPTY_SQS`: This option enables the automatic filtering of empty SQS messages from being sent to Lumigo SaaS.
sagivoululumigo marked this conversation as resolved.
Show resolved Hide resolved

### Execution Tags

Expand Down Expand Up @@ -340,6 +341,15 @@ for message in response.get("Messages", []):

Without the scope provided by the iterator over `response["Messages"]`, `span_1` would be without a parent span, and that would result in a separate invocation and a separate transaction in Lumigo.

### Filtering out empty SQS messages

A common pattern in SQS-based applications is to continuously poll an SQS queue for messages,
and to process them as they arrive.
In order not to clutter the Lumigo Dashboard with empty SQS polling messages, the default behavior is to filter them
out from being sent to Lumigo.

You can change this behavior by setting the environment variable `LUMIGO_AUTO_FILTER_EMPTY_SQS` to `false`.

## Testing

We use [nox](https://pypi.org/project/nox/) for setting up and running our tests.
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ pre-commit==2.20.0
psutil==5.9.1
pytest==7.1.1
pytest-cov==3.0.0
pyyaml==6.0
pyyaml==6.0.1
requests==2.27.1
types-attrs==19.1.0
types-boto==2.49.17
Expand Down
5 changes: 3 additions & 2 deletions src/lumigo_opentelemetry/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,8 @@ def init() -> Dict[str, Any]:
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.trace import SpanLimits, TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor

from lumigo_opentelemetry.resources.span_processor import LumigoSpanProcessor

DEFAULT_LUMIGO_ENDPOINT = (
"https://ga-otlp.lumigo-tracer-edge.golumigo.com/v1/traces"
Expand Down Expand Up @@ -125,7 +126,7 @@ def init() -> Dict[str, Any]:

if lumigo_token:
tracer_provider.add_span_processor(
BatchSpanProcessor(
LumigoSpanProcessor(
OTLPSpanExporter(
endpoint=lumigo_endpoint,
headers={"Authorization": f"LumigoToken {lumigo_token}"},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@
extract_region_from_arn,
get_resource_fullname,
)
from lumigo_opentelemetry.resources.span_processor import set_span_skip_export
from lumigo_opentelemetry import logger
from lumigo_opentelemetry.libs.general_utils import get_boolean_env_var
from lumigo_opentelemetry.libs.environment_variables import AUTO_FILTER_EMPTY_SQS


class AwsParser:
Expand Down Expand Up @@ -133,6 +137,22 @@ def parse_request(
}
span.set_attributes(attributes)

@staticmethod
def _should_skip_empty_sqs_polling_response(
operation_name: str, result: Dict[Any, Any]
) -> bool:
"""
checks the sqs response & returns true if the request receive messages from SQS but no messages were returned
"""

no_messages = not result or not result.get("Messages", None)
sqs_poll = operation_name == "ReceiveMessage"
return (
sqs_poll
and no_messages
and get_boolean_env_var(AUTO_FILTER_EMPTY_SQS, True)
)

@staticmethod
def parse_response(
span: Span, service_name: str, operation_name: str, result: Dict[Any, Any]
Expand All @@ -145,6 +165,14 @@ def parse_response(
{"lumigoData": json.dumps({"trigger": trigger_details})}
)

# Filter out sqs polls with empty response
if SqsParser._should_skip_empty_sqs_polling_response(operation_name, result):
logger.info(
"Not tracing empty SQS polling requests "
f"(override by setting the {AUTO_FILTER_EMPTY_SQS} env var to false)"
)
set_span_skip_export(span)


class LambdaParser(AwsParser):
@staticmethod
Expand Down
1 change: 1 addition & 0 deletions src/lumigo_opentelemetry/libs/environment_variables.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
AUTO_FILTER_EMPTY_SQS = "LUMIGO_AUTO_FILTER_EMPTY_SQS"
27 changes: 27 additions & 0 deletions src/lumigo_opentelemetry/libs/general_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,3 +70,30 @@ def get_max_size() -> int:
os.environ.get(OTEL_ATTRIBUTE_VALUE_LENGTH_LIMIT, DEFAULT_MAX_ENTRY_SIZE),
)
)


def get_boolean_env_var(env_var_name: str, default: bool = False) -> bool:
"""
This function return the boolean value of the given environment variable.
If this values doesn't exist, return default.

@param env_var_name: The env var to get (case-sensitive)
@param default: Default value if env var is not set
@return: The boolean value of the env var
"""

env_var_value = os.environ.get(env_var_name, str(default))
env_var_value = (
env_var_value.lower() if isinstance(env_var_value, str) else env_var_value
)

is_truth_value = env_var_value == "true"
is_false_value = env_var_value == "false"
if not is_truth_value and not is_false_value:
logger.debug(
f'Invalid boolean value for env var "{env_var_name}", '
f'defaulting to value "{str(default).lower()}"'
)
return default

return is_truth_value
40 changes: 40 additions & 0 deletions src/lumigo_opentelemetry/resources/span_processor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
from opentelemetry.trace import Span
from opentelemetry.sdk.trace import ReadableSpan
from opentelemetry.sdk.trace.export import BatchSpanProcessor

from lumigo_opentelemetry import logger


# A span attributes that if is set to True, the span will not be exported
SKIP_EXPORT_SPAN_ATTRIBUTE = "SKIP_EXPORT"
sagivoululumigo marked this conversation as resolved.
Show resolved Hide resolved


class LumigoSpanProcessor(BatchSpanProcessor):
def on_end(self, span: ReadableSpan) -> None:
if should_skip_exporting_span(span):
logger.debug("Not exporting span because it has NO_EXPORT=True attribute")
return

super().on_end(span)


def should_skip_exporting_span(span: ReadableSpan) -> bool:
"""
Given a span, returns an answer if the span should be exported or not.
@param span: A readable span to check
@return: True if the span should not be exported, False otherwise
"""
return span.attributes.get(SKIP_EXPORT_SPAN_ATTRIBUTE, False) is True


def set_span_skip_export(span: Span, skip_export: bool = True) -> None:
"""
marks the span as a span not intended for export (for example in spans that create a lot of noise and customers
do not want to trace)

@param span: The span to mark (The span is altered in place)
@param skip_export: Should the span be exported or not (default is True, the span will not be exported)
@return:
"""

span.set_attributes({SKIP_EXPORT_SPAN_ATTRIBUTE: skip_export})
5 changes: 5 additions & 0 deletions src/test/integration/boto3-sqs/app/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@ def run():
client = boto3.client("sqs", region_name="eu-central-1")
queue = client.create_queue(QueueName="test")

# Simulate polling an empty sqs queue
client.receive_message(QueueUrl=queue["QueueUrl"], MaxNumberOfMessages=1)
client.receive_message(QueueUrl=queue["QueueUrl"], MaxNumberOfMessages=1)
client.receive_message(QueueUrl=queue["QueueUrl"], MaxNumberOfMessages=1)

client.send_message(
QueueUrl=queue["QueueUrl"],
MessageBody="Message_1",
Expand Down
27 changes: 26 additions & 1 deletion src/test/integration/boto3-sqs/tests/test_boto3_sqs.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,13 @@
class TestBoto3SqsSpans(unittest.TestCase):
def test_boto3_instrumentation(self):
spans_container = SpansContainer.parse_spans_from_file()
self.assertEqual(9, len(spans_container.spans))
self.assertEqual(12, len(spans_container.spans))

[
create_queue_span,
empty_sqs_poll_1_span,
empty_sqs_poll_2_span,
empty_sqs_poll_3_span,
send_message_1_span,
send_message_2_span,
receive_message_1_span,
Expand All @@ -21,6 +24,28 @@ def test_boto3_instrumentation(self):
after_iterator_break_span,
] = spans_container.spans

# Make sure that all the empty polling spans are marked as skipped
for span in [
empty_sqs_poll_1_span,
empty_sqs_poll_2_span,
empty_sqs_poll_3_span,
]:
self.assertIsNotNone(span.get("attributes"))
self.assertEqual(span["attributes"].get("SKIP_EXPORT"), True)

# Make sure that other spans are not marked as skipped
for span in [
create_queue_span,
send_message_1_span,
send_message_2_span,
receive_message_1_span,
receive_message_2_span,
consume_message_2_span,
consume_message_1_span,
receive_message_2_span,
]:
self.assertNotEqual(span.get("attributes", {}).get("SKIP_EXPORT"), True)

self.assertEqual(create_queue_span["name"], "SQS.CreateQueue")
self.assertIsNone(create_queue_span["parent_id"])

Expand Down
108 changes: 108 additions & 0 deletions src/test/unit/instrumentations/botocore/test_parsers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
import logging

import pytest
from unittest.mock import Mock, patch

from lumigo_opentelemetry.instrumentations.botocore.parsers import SqsParser


EMPTY_SQS_RESULT_1 = {}
EMPTY_SQS_RESULT_2 = {"Messages": []}
NON_EMPTY_SQS_RESULT = {"Messages": [{"MessageId": "1234", "Body": "test"}]}


@pytest.mark.parametrize(
"env_var_value, operation, result, should_skip",
[
# Check that empty sqs polls are skipped
("true", "ReceiveMessage", EMPTY_SQS_RESULT_1, True),
("true", "ReceiveMessage", EMPTY_SQS_RESULT_2, True),
# Check that non-empty polls are not skipped
("true", "ReceiveMessage", NON_EMPTY_SQS_RESULT, False),
# Check that other operations are not skipped
("true", "DeleteMessage", EMPTY_SQS_RESULT_1, False),
("true", "DeleteMessageBatch", EMPTY_SQS_RESULT_1, False),
("true", "SendMessage", EMPTY_SQS_RESULT_1, False),
("true", "SendMessageBatch", EMPTY_SQS_RESULT_1, False),
("true", "UnknownOperation", EMPTY_SQS_RESULT_1, False),
("true", None, EMPTY_SQS_RESULT_1, False),
# Check that empty sqs polls are not skipped if the env var is set to false
("false", "ReceiveMessage", EMPTY_SQS_RESULT_1, False),
("false", "ReceiveMessage", EMPTY_SQS_RESULT_2, False),
# Check that non-empty polls are not skipped if the env var is set to false
("false", "ReceiveMessage", NON_EMPTY_SQS_RESULT, False),
# Check that the default behavior is to skip empty sqs polls
(None, "ReceiveMessage", EMPTY_SQS_RESULT_1, True),
(None, "ReceiveMessage", EMPTY_SQS_RESULT_2, True),
("UnsupportedEnvVarValue", "ReceiveMessage", EMPTY_SQS_RESULT_2, True),
],
)
def test_sqs_skip_sqs_response(
env_var_value, operation, result, should_skip, monkeypatch
):
if env_var_value is not None:
monkeypatch.setenv("LUMIGO_AUTO_FILTER_EMPTY_SQS", env_var_value)

assert (
SqsParser._should_skip_empty_sqs_polling_response(operation, result)
== should_skip
)


@patch(
"lumigo_opentelemetry.instrumentations.botocore.parsers.SqsParser._should_skip_empty_sqs_polling_response"
)
def test_parse_sqs_response_skipping_empty_polls_outputs_log(should_skip_mock, caplog):
should_skip_mock.return_value = True
span = Mock(set_attribute=Mock())
service_name = "sqs"
operation_name = "ReceiveMessage"
result = {
"ResponseMetadata": {
"RequestId": "54bf0dab-cfab-5fa5-b284-50d83403c94c",
"HTTPStatusCode": 200,
"HTTPHeaders": {
"x-amzn-requestid": "54bf0dab-cfab-5fa5-b284-50d83403c94c",
"date": "Thu, 07 Sep 2023 16:25:12 GMT",
"content-type": "text/xml",
"content-length": "240",
"connection": "keep-alive",
},
"RetryAttempts": 0,
}
}

with caplog.at_level(logging.INFO):
SqsParser.parse_response(span, service_name, operation_name, result)

assert "not tracing empty sqs polling requests" in caplog.text.lower()


@patch(
"lumigo_opentelemetry.instrumentations.botocore.parsers.SqsParser._should_skip_empty_sqs_polling_response"
)
def test_parse_sqs_response_not_skipping_polls_no_output_log(should_skip_mock, caplog):
should_skip_mock.return_value = False
span = Mock(set_attribute=Mock())
service_name = "sqs"
operation_name = "ReceiveMessage"
result = {
"ResponseMetadata": {
"RequestId": "54bf0dab-cfab-5fa5-b284-50d83403c94c",
"HTTPStatusCode": 200,
"HTTPHeaders": {
"x-amzn-requestid": "54bf0dab-cfab-5fa5-b284-50d83403c94c",
"date": "Thu, 07 Sep 2023 16:25:12 GMT",
"content-type": "text/xml",
"content-length": "240",
"connection": "keep-alive",
},
"RetryAttempts": 0,
}
}

SqsParser.parse_response(span, service_name, operation_name, result)

assert "not tracing empty sqs polling requests" not in caplog.text.lower()

# Make sure that there is an info log
31 changes: 30 additions & 1 deletion src/test/unit/libs/test_general_utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
from lumigo_opentelemetry.libs.general_utils import get_max_size
import pytest

from lumigo_opentelemetry.libs.general_utils import get_max_size, get_boolean_env_var


def test_get_max_size_otel_span_attr_limit_is_set(monkeypatch):
Expand All @@ -19,3 +21,30 @@ def test_get_max_size_both_env_vars_are_set(monkeypatch):

def test_get_max_size_get_default_value():
assert get_max_size() == 2048


@pytest.mark.parametrize(
"env_var_value,default,expected_result",
[
# Normal truth values
("true", False, True),
("True", False, True),
("TRUE", False, True),
# Normal false values
("false", True, False),
("False", True, False),
("FALSE", True, False),
# Invalid values, use the default
("RandomValue", False, False),
("RandomValue", True, True),
# Empty values, use the default
(None, False, False),
(None, True, True),
],
)
def test_get_boolean_env_var(env_var_value, default, expected_result, monkeypatch):
"""Try getting a boolean value from env vars, and check all the different options work"""

if env_var_value is not None:
monkeypatch.setenv("TEST_VAR", env_var_value)
assert get_boolean_env_var("TEST_VAR", default=default) is expected_result
Loading