Add a non-retryable error test and fix linter
zmoog committed Oct 7, 2024
1 parent b1d4642 commit e485725
Showing 3 changed files with 114 additions and 228 deletions.
share/config.py (2 changes: 1 addition & 1 deletion)
@@ -2,7 +2,7 @@
# or more contributor license agreements. Licensed under the Elastic License 2.0;
# you may not use this file except in compliance with the Elastic License 2.0.

-from typing import Any, Callable, List, Optional, Union
+from typing import Any, Callable, Optional, Union

import yaml

shippers/es.py (2 changes: 1 addition & 1 deletion)
@@ -4,7 +4,7 @@

import datetime
import uuid
-from typing import Any, Dict, List, Optional, Union
+from typing import Any, Dict, Optional, Union

import elasticapm # noqa: F401
from elasticsearch import Elasticsearch
tests/handlers/aws/test_integrations.py (338 changes: 112 additions & 226 deletions)
@@ -4421,229 +4421,115 @@ def test_es_non_indexable_dead_letter_index(self) -> None:
assert first_body["event_payload"]["cloud"]["account"]["id"] == "000000000000"
assert first_body["event_payload"]["tags"] == ["forwarded", "generic", "tag1", "tag2", "tag3"]

# def test_es_dead_letter_index_with_excluded_action_error(self) -> None:
# assert isinstance(self.elasticsearch, ElasticsearchContainer)
# assert isinstance(self.localstack, LocalStackContainer)
#
# sqs_queue_name = _time_based_id(suffix="source-sqs")
# sqs_queue = _sqs_create_queue(self.sqs_client, sqs_queue_name, self.localstack.get_url())
#
# dead_letter_index_name = "logs-generic-default-dli"
#
# sqs_queue_arn = sqs_queue["QueueArn"]
# sqs_queue_url = sqs_queue["QueueUrl"]
# sqs_queue_url_path = sqs_queue["QueueUrlPath"]
#
# config_yaml: str = f"""
# inputs:
# - type: sqs
# id: "{sqs_queue_arn}"
# tags: {self.default_tags}
# outputs:
# - type: "elasticsearch"
# args:
# elasticsearch_url: "{self.elasticsearch.get_url()}"
# es_dead_letter_index: "{dead_letter_index_name}"
# ssl_assert_fingerprint: {self.elasticsearch.ssl_assert_fingerprint}
# username: "{self.secret_arn}:username"
# password: "{self.secret_arn}:password"
# """
#
# config_file_path = "config.yaml"
# config_bucket_name = _time_based_id(suffix="config-bucket")
# _s3_upload_content_to_bucket(
# client=self.s3_client,
# content=config_yaml.encode("utf-8"),
# content_type="text/plain",
# bucket_name=config_bucket_name,
# key=config_file_path,
# )
#
# os.environ["S3_CONFIG_FILE"] = f"s3://{config_bucket_name}/{config_file_path}"
#
# fixtures = [
# _load_file_fixture("cloudwatch-log-1.json"),
# ]
#
# _sqs_send_messages(self.sqs_client, sqs_queue_url, "".join(fixtures))
#
# event, _ = _sqs_get_messages(self.sqs_client, sqs_queue_url, sqs_queue_arn)
# message_id = event["Records"][0]["messageId"]
#
# # Create pipeline to reject documents
# processors = {
# "processors": [
# {
# "fail": {
# "message": "test_es_dead_letter_index_with_excluded_action_error fail message",
# }
# },
# ]
# }
#
# self.elasticsearch.put_pipeline(
# id="test_es_dead_letter_index_with_excluded_action_error_fail_pipeline",
# body=processors,
# )
#
# self.elasticsearch.create_data_stream(name="logs-generic-default")
# self.elasticsearch.put_settings(
# index="logs-generic-default",
# body={"index.default_pipeline": "test_es_dead_letter_index_with_excluded_action_error_fail_pipeline"},
# )
#
# self.elasticsearch.refresh(index="logs-generic-default")
#
# self.elasticsearch.create_data_stream(name=dead_letter_index_name)
#
# ctx = ContextMock(remaining_time_in_millis=_OVER_COMPLETION_GRACE_PERIOD_2m)
# first_call = handler(event, ctx) # type:ignore
#
# assert first_call == "completed"
#
# # Test document has been rejected from target index
# self.elasticsearch.refresh(index="logs-generic-default")
#
# assert self.elasticsearch.count(index="logs-generic-default")["count"] == 0
#
# # Test event does not go into the dead letter queue
# assert self.elasticsearch.exists(index=dead_letter_index_name) is True
#
# self.elasticsearch.refresh(index=dead_letter_index_name)
#
# assert self.elasticsearch.count(index=dead_letter_index_name)["count"] == 0
#
# # Test event has been redirected into the replay queue
# events, _ = _sqs_get_messages(self.sqs_client, os.environ["SQS_REPLAY_URL"], self.sqs_replay_queue_arn)
# assert len(events["Records"]) == 1
#
# first_body: dict[str, Any] = json_parser(events["Records"][0]["body"])
#
# assert first_body["event_payload"]["message"] == fixtures[0].rstrip("\n")
# assert first_body["event_payload"]["log"]["offset"] == 0
# assert first_body["event_payload"]["log"]["file"]["path"] == sqs_queue_url_path
# assert first_body["event_payload"]["aws"]["sqs"]["name"] == sqs_queue_name
# assert first_body["event_payload"]["aws"]["sqs"]["message_id"] == message_id
# assert first_body["event_payload"]["cloud"]["provider"] == "aws"
# assert first_body["event_payload"]["cloud"]["region"] == "us-east-1"
# assert first_body["event_payload"]["cloud"]["account"]["id"] == "000000000000"
# assert first_body["event_payload"]["tags"] == ["forwarded", "generic", "tag1", "tag2", "tag3"]
#
# def test_es_dead_letter_index_with_included_action_error(self) -> None:
# assert isinstance(self.elasticsearch, ElasticsearchContainer)
# assert isinstance(self.localstack, LocalStackContainer)
#
# sqs_queue_name = _time_based_id(suffix="source-sqs")
# sqs_queue = _sqs_create_queue(self.sqs_client, sqs_queue_name, self.localstack.get_url())
#
# dead_letter_index_name = "logs-generic-default-dli"
#
# sqs_queue_arn = sqs_queue["QueueArn"]
# sqs_queue_url = sqs_queue["QueueUrl"]
# sqs_queue_url_path = sqs_queue["QueueUrlPath"]
#
# config_yaml: str = f"""
# inputs:
# - type: sqs
# id: "{sqs_queue_arn}"
# tags: {self.default_tags}
# outputs:
# - type: "elasticsearch"
# args:
# elasticsearch_url: "{self.elasticsearch.get_url()}"
# es_dead_letter_index: "{dead_letter_index_name}"
# es_dead_letter_forward_errors:
# - fail_processor_exception
# ssl_assert_fingerprint: {self.elasticsearch.ssl_assert_fingerprint}
# username: "{self.secret_arn}:username"
# password: "{self.secret_arn}:password"
# """
#
# config_file_path = "config.yaml"
# config_bucket_name = _time_based_id(suffix="config-bucket")
# _s3_upload_content_to_bucket(
# client=self.s3_client,
# content=config_yaml.encode("utf-8"),
# content_type="text/plain",
# bucket_name=config_bucket_name,
# key=config_file_path,
# )
#
# os.environ["S3_CONFIG_FILE"] = f"s3://{config_bucket_name}/{config_file_path}"
#
# fixtures = [
# _load_file_fixture("cloudwatch-log-1.json"),
# ]
#
# _sqs_send_messages(self.sqs_client, sqs_queue_url, "".join(fixtures))
#
# event, _ = _sqs_get_messages(self.sqs_client, sqs_queue_url, sqs_queue_arn)
# message_id = event["Records"][0]["messageId"]
#
# # Create pipeline to reject documents
# processors = {
# "processors": [
# {
# "fail": {
# "message": "test_es_dead_letter_index_with_included_action_error fail message",
# }
# },
# ]
# }
#
# self.elasticsearch.put_pipeline(
# id="test_es_dead_letter_index_with_included_action_error_fail_pipeline",
# body=processors,
# )
#
# self.elasticsearch.create_data_stream(name="logs-generic-default")
# self.elasticsearch.put_settings(
# index="logs-generic-default",
# body={"index.default_pipeline": "test_es_dead_letter_index_with_included_action_error_fail_pipeline"},
# )
#
# self.elasticsearch.refresh(index="logs-generic-default")
#
# self.elasticsearch.create_data_stream(name=dead_letter_index_name)
#
# ctx = ContextMock(remaining_time_in_millis=_OVER_COMPLETION_GRACE_PERIOD_2m)
# first_call = handler(event, ctx) # type:ignore
#
# assert first_call == "completed"
#
# # Test document has been rejected from target index
# self.elasticsearch.refresh(index="logs-generic-default")
#
# assert self.elasticsearch.count(index="logs-generic-default")["count"] == 0
#
# # Test document has been redirected to dli
# assert self.elasticsearch.exists(index=dead_letter_index_name) is True
#
# self.elasticsearch.refresh(index=dead_letter_index_name)
#
# assert self.elasticsearch.count(index=dead_letter_index_name)["count"] == 1
#
# res = self.elasticsearch.search(index=dead_letter_index_name, sort="_seq_no")
#
# assert res["hits"]["total"] == {"value": 1, "relation": "eq"}
#
# assert (
# res["hits"]["hits"][0]["_source"]["error"]["message"]
# == "test_es_dead_letter_index_with_included_action_error fail message"
# )
# assert res["hits"]["hits"][0]["_source"]["error"]["type"] == "fail_processor_exception"
# assert res["hits"]["hits"][0]["_source"]["http"]["response"]["status_code"] == 500
# dead_letter_message = json_parser(res["hits"]["hits"][0]["_source"]["message"])
# assert dead_letter_message["log"]["offset"] == 0
# assert dead_letter_message["log"]["file"]["path"] == sqs_queue_url_path
# assert dead_letter_message["aws"]["sqs"]["name"] == sqs_queue_name
# assert dead_letter_message["aws"]["sqs"]["message_id"] == message_id
# assert dead_letter_message["cloud"]["provider"] == "aws"
# assert dead_letter_message["cloud"]["region"] == "us-east-1"
# assert dead_letter_message["cloud"]["account"]["id"] == "000000000000"
# assert dead_letter_message["tags"] == ["forwarded", "generic", "tag1", "tag2", "tag3"]
#
# # Test event does not go into the replay queue
# events, _ = _sqs_get_messages(self.sqs_client, os.environ["SQS_REPLAY_URL"], self.sqs_replay_queue_arn)
#
# assert len(events["Records"]) == 0

def test_es_dead_letter_index_with_retryable_errors(self) -> None:
"""
Test that retryable errors are not redirected to the dead letter index (DLI).
"""
assert isinstance(self.elasticsearch, ElasticsearchContainer)
assert isinstance(self.localstack, LocalStackContainer)

sqs_queue_name = _time_based_id(suffix="source-sqs")
sqs_queue = _sqs_create_queue(self.sqs_client, sqs_queue_name, self.localstack.get_url())

dead_letter_index_name = "logs-generic-default-dli"

sqs_queue_arn = sqs_queue["QueueArn"]
sqs_queue_url = sqs_queue["QueueUrl"]
sqs_queue_url_path = sqs_queue["QueueUrlPath"]

config_yaml: str = f"""
inputs:
- type: sqs
id: "{sqs_queue_arn}"
tags: {self.default_tags}
outputs:
- type: "elasticsearch"
args:
# This IP address is non-routable and
# will always result in a connection failure.
elasticsearch_url: "0.0.0.0:9200"
es_dead_letter_index: "{dead_letter_index_name}"
ssl_assert_fingerprint: {self.elasticsearch.ssl_assert_fingerprint}
username: "{self.secret_arn}:username"
password: "{self.secret_arn}:password"
"""

config_file_path = "config.yaml"
config_bucket_name = _time_based_id(suffix="config-bucket")
_s3_upload_content_to_bucket(
client=self.s3_client,
content=config_yaml.encode("utf-8"),
content_type="text/plain",
bucket_name=config_bucket_name,
key=config_file_path,
)

os.environ["S3_CONFIG_FILE"] = f"s3://{config_bucket_name}/{config_file_path}"

fixtures = [
_load_file_fixture("cloudwatch-log-1.json"),
]

_sqs_send_messages(self.sqs_client, sqs_queue_url, "".join(fixtures))

event, _ = _sqs_get_messages(self.sqs_client, sqs_queue_url, sqs_queue_arn)
message_id = event["Records"][0]["messageId"]

# Create pipeline to reject documents
processors = {
"processors": [
{
"fail": {
"message": "test_es_dead_letter_index_with_retryable_errors fail message",
}
},
]
}

self.elasticsearch.put_pipeline(
id="test_es_dead_letter_index_with_retryable_errors_fail_pipeline",
body=processors,
)

self.elasticsearch.create_data_stream(name="logs-generic-default")
self.elasticsearch.put_settings(
index="logs-generic-default",
body={"index.default_pipeline": "test_es_dead_letter_index_with_retryable_errors_fail_pipeline"},
)

self.elasticsearch.refresh(index="logs-generic-default")

self.elasticsearch.create_data_stream(name=dead_letter_index_name)

ctx = ContextMock(remaining_time_in_millis=_OVER_COMPLETION_GRACE_PERIOD_2m)
first_call = handler(event, ctx) # type:ignore

assert first_call == "completed"

# Test document has been rejected from target index
self.elasticsearch.refresh(index="logs-generic-default")

assert self.elasticsearch.count(index="logs-generic-default")["count"] == 0

# Test event does not go into the dead letter index
assert self.elasticsearch.exists(index=dead_letter_index_name) is True

self.elasticsearch.refresh(index=dead_letter_index_name)

assert self.elasticsearch.count(index=dead_letter_index_name)["count"] == 0

# Test event has been redirected into the replay queue
events, _ = _sqs_get_messages(self.sqs_client, os.environ["SQS_REPLAY_URL"], self.sqs_replay_queue_arn)
assert len(events["Records"]) == 1

first_body: dict[str, Any] = json_parser(events["Records"][0]["body"])

assert first_body["event_payload"]["message"] == fixtures[0].rstrip("\n")
assert first_body["event_payload"]["log"]["offset"] == 0
assert first_body["event_payload"]["log"]["file"]["path"] == sqs_queue_url_path
assert first_body["event_payload"]["aws"]["sqs"]["name"] == sqs_queue_name
assert first_body["event_payload"]["aws"]["sqs"]["message_id"] == message_id
assert first_body["event_payload"]["cloud"]["provider"] == "aws"
assert first_body["event_payload"]["cloud"]["region"] == "us-east-1"
assert first_body["event_payload"]["cloud"]["account"]["id"] == "000000000000"
assert first_body["event_payload"]["tags"] == ["forwarded", "generic", "tag1", "tag2", "tag3"]
