Prevent duplicate _id events from reaching the replay queue (#729)
* Skip duplicate events when sending to replay queue

* Version conflict global id

* Fix formatter

* Fix formatter v2

* Fix integration test [test_replay] to not use duplicated ids

* Fix linter

* Fix linter v2

* Fix integration tests

* Add changelog and update version
emilioalvap authored Jul 10, 2024
1 parent 606c4c2 commit cce7939
Showing 5 changed files with 116 additions and 107 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,7 @@
### v1.16.0 - 2024/07/09
##### Features
* Prevent duplicate _id events from reaching the replay queue [729](https://github.com/elastic/elastic-serverless-forwarder/pull/729).

### v1.15.0 - 2024/05/29
##### Features
* Enable multiple outputs for each input [725](https://github.com/elastic/elastic-serverless-forwarder/pull/725).
2 changes: 1 addition & 1 deletion share/version.py
@@ -2,4 +2,4 @@
# or more contributor license agreements. Licensed under the Elastic License 2.0;
# you may not use this file except in compliance with the Elastic License 2.0.

version = "1.15.0"
version = "1.16.0"
6 changes: 6 additions & 0 deletions shippers/es.py
@@ -19,6 +19,7 @@

_EVENT_BUFFERED = "_EVENT_BUFFERED"
_EVENT_SENT = "_EVENT_SENT"
_VERSION_CONFLICT = 409


class JSONSerializer(Serializer):
@@ -166,6 +167,11 @@ def _handle_outcome(self, errors: tuple[int, Union[int, list[Any]]]) -> None:
shared_logger.warning(
"elasticsearch shipper", extra={"error": error["create"]["error"], "_id": error["create"]["_id"]}
)

if "status" in error["create"] and error["create"]["status"] == _VERSION_CONFLICT:
# Skip duplicate events on replay queue
continue

shared_logger.debug("elasticsearch shipper", extra={"action": action_failed[0]})
if self._replay_handler is not None:
self._replay_handler(self._output_destination, self._replay_args, action_failed[0])
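For illustration, here is a minimal, self-contained sketch of the decision the new check encodes. The should_replay helper and the sample failure items are hypothetical (only the 409 comparison mirrors the diff above); real failures come from the Elasticsearch bulk helpers, which report one dict per failed action.

from typing import Any

_VERSION_CONFLICT = 409  # HTTP status returned when op_type "create" hits an existing _id


def should_replay(error_item: dict[str, Any]) -> bool:
    """Decide whether a failed bulk action belongs in the replay queue.

    A 409 (version conflict) means a document with the same _id is already
    indexed, so replaying the event would only produce a duplicate.
    """
    create_info = error_item.get("create", {})
    return create_info.get("status") != _VERSION_CONFLICT


# Sample items shaped like the per-action failures handled in the diff above.
failed_items = [
    {"create": {"_id": "aaa-000000000000", "status": 409,
                "error": {"type": "version_conflict_engine_exception"}}},
    {"create": {"_id": "bbb-000000000000", "status": 400,
                "error": {"type": "mapper_parsing_exception"}}},
]

# Only the non-conflict failure would be handed to the replay handler.
to_replay = [item for item in failed_items if should_replay(item)]
assert [item["create"]["_id"] for item in to_replay] == ["bbb-000000000000"]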
205 changes: 99 additions & 106 deletions tests/handlers/aws/test_integrations.py
@@ -956,42 +956,47 @@ def test_replay(self) -> None:
hash_kinesis_record = get_hex_prefix(f"stream-{kinesis_stream_name}-PartitionKey-{sequence_number}")
prefix_kinesis = f"{int(float(event_timestamps_kinesis_records[0]) * 1000)}-{hash_kinesis_record}"

# Create an expected id for s3-sqs so that es.send will fail
self.elasticsearch.index(
index="logs-generic-default",
op_type="create",
id=f"{prefix_s3_first}-000000000000",
document={"@timestamp": datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.%fZ")},
)

# Create an expected id for sqs so that es.send will fail
self.elasticsearch.index(
index="logs-generic-default",
op_type="create",
id=f"{prefix_sqs}-000000000000",
document={"@timestamp": datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.%fZ")},
)
# Create pipeline to reject documents
processors = {
"processors": [
{
"fail": {
"message": "test_replay_fail_pipeline_s3",
"if": f'ctx["_id"] == "{prefix_s3_first}-000000000000"',
}
},
{
"fail": {
"message": "test_replay_fail_pipeline_sqs",
"if": f'ctx["_id"] == "{prefix_sqs}-000000000000"',
}
},
{
"fail": {
"message": "test_replay_fail_pipeline_cloudwatch",
"if": f'ctx["_id"] == "{prefix_cloudwatch_logs}-000000000000"',
}
},
{
"fail": {
"message": "test_replay_fail_pipeline_kinesis",
"if": f'ctx["_id"] == "{prefix_kinesis}-000000000000"',
}
},
]
}

# Create an expected id for cloudwatch-logs so that es.send will fail
self.elasticsearch.index(
index="logs-generic-default",
op_type="create",
id=f"{prefix_cloudwatch_logs}-000000000000",
document={"@timestamp": datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.%fZ")},
)
self.elasticsearch.put_pipeline(id="test_replay_fail_pipeline", body=processors)

# Create an expected id for kinesis-data-stream so that es.send will fail
self.elasticsearch.index(
index="logs-generic-default",
op_type="create",
id=f"{prefix_kinesis}-000000000000",
document={"@timestamp": datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.%fZ")},
self.elasticsearch.create_data_stream(name="logs-generic-default")
self.elasticsearch.put_settings(
index="logs-generic-default", body={"index.default_pipeline": "test_replay_fail_pipeline"}
)

self.elasticsearch.refresh(index="logs-generic-default")

res = self.elasticsearch.search(index="logs-generic-default")
assert res["hits"]["total"] == {"value": 4, "relation": "eq"}
assert res["hits"]["total"] == {"value": 0, "relation": "eq"}

ctx = ContextMock(remaining_time_in_millis=_OVER_COMPLETION_GRACE_PERIOD_2m)

@@ -1002,20 +1007,6 @@ def test_replay(self) -> None:
self.elasticsearch.refresh(index="logs-generic-default")
res = self.elasticsearch.search(
index="logs-generic-default",
query={
"bool": {
"must_not": {
"ids": {
"values": [
f"{prefix_s3_first}-000000000000",
f"{prefix_sqs}-000000000000",
f"{prefix_cloudwatch_logs}-000000000000",
f"{prefix_kinesis}-000000000000",
]
}
}
}
},
sort="_seq_no",
)

@@ -1045,20 +1036,6 @@ def test_replay(self) -> None:
self.elasticsearch.refresh(index="logs-generic-default")
res = self.elasticsearch.search(
index="logs-generic-default",
query={
"bool": {
"must_not": {
"ids": {
"values": [
f"{prefix_s3_first}-000000000000",
f"{prefix_sqs}-000000000000",
f"{prefix_cloudwatch_logs}-000000000000",
f"{prefix_kinesis}-000000000000",
]
}
}
}
},
sort="_seq_no",
)

@@ -1084,20 +1061,6 @@ def test_replay(self) -> None:
self.elasticsearch.refresh(index="logs-generic-default")
res = self.elasticsearch.search(
index="logs-generic-default",
query={
"bool": {
"must_not": {
"ids": {
"values": [
f"{prefix_s3_first}-000000000000",
f"{prefix_sqs}-000000000000",
f"{prefix_cloudwatch_logs}-000000000000",
f"{prefix_kinesis}-000000000000",
]
}
}
}
},
sort="_seq_no",
)

@@ -1127,20 +1090,6 @@ def test_replay(self) -> None:
self.elasticsearch.refresh(index="logs-generic-default")
res = self.elasticsearch.search(
index="logs-generic-default",
query={
"bool": {
"must_not": {
"ids": {
"values": [
f"{prefix_s3_first}-000000000000",
f"{prefix_sqs}-000000000000",
f"{prefix_cloudwatch_logs}-000000000000",
f"{prefix_kinesis}-000000000000",
]
}
}
}
},
sort="_seq_no",
)

@@ -1170,28 +1119,10 @@ def test_replay(self) -> None:

self.elasticsearch.refresh(index="logs-generic-default")

# Remove the expected id for s3-sqs so that it can be replayed
self.elasticsearch.delete_by_query(
index="logs-generic-default", body={"query": {"ids": {"values": [f"{prefix_s3_first}-000000000000"]}}}
)

# Remove the expected id for sqs so that it can be replayed
self.elasticsearch.delete_by_query(
index="logs-generic-default", body={"query": {"ids": {"values": [f"{prefix_sqs}-000000000000"]}}}
)

# Remove the expected id for cloudwatch logs so that it can be replayed
self.elasticsearch.delete_by_query(
index="logs-generic-default",
body={"query": {"ids": {"values": [f"{prefix_cloudwatch_logs}-000000000000"]}}},
)

# Remove the expected id for kinesis data stream so that it can be replayed
self.elasticsearch.delete_by_query(
index="logs-generic-default",
body={"query": {"ids": {"values": [f"{prefix_kinesis}-000000000000"]}}},
)
# Remove pipeline processors
processors = {"processors": []}

self.elasticsearch.put_pipeline(id="test_replay_fail_pipeline", body=processors)
self.elasticsearch.refresh(index="logs-generic-default")

# let's update the config file so that logstash won't fail anymore
@@ -4202,3 +4133,65 @@ def test_ls_wrong_auth_creds(self) -> None:
assert second_body["event_payload"]["cloud"]["region"] == "us-east-1"
assert second_body["event_payload"]["cloud"]["account"]["id"] == "000000000000"
assert second_body["event_payload"]["tags"] == ["forwarded", "tag1", "tag2", "tag3"]

def test_es_version_conflict_exception(self) -> None:
assert isinstance(self.elasticsearch, ElasticsearchContainer)
assert isinstance(self.localstack, LocalStackContainer)

sqs_queue_name = _time_based_id(suffix="source-sqs")
sqs_queue = _sqs_create_queue(self.sqs_client, sqs_queue_name, self.localstack.get_url())

sqs_queue_arn = sqs_queue["QueueArn"]
sqs_queue_url = sqs_queue["QueueUrl"]

config_yaml: str = f"""
inputs:
- type: sqs
id: "{sqs_queue_arn}"
tags: {self.default_tags}
outputs:
- type: "elasticsearch"
args:
elasticsearch_url: "{self.elasticsearch.get_url()}"
ssl_assert_fingerprint: {self.elasticsearch.ssl_assert_fingerprint}
username: "{self.secret_arn}:username"
password: "{self.secret_arn}:password"
"""

config_file_path = "config.yaml"
config_bucket_name = _time_based_id(suffix="config-bucket")
_s3_upload_content_to_bucket(
client=self.s3_client,
content=config_yaml.encode("utf-8"),
content_type="text/plain",
bucket_name=config_bucket_name,
key=config_file_path,
)

os.environ["S3_CONFIG_FILE"] = f"s3://{config_bucket_name}/{config_file_path}"

fixtures = [
_load_file_fixture("cloudwatch-log-1.json"),
]

_sqs_send_messages(self.sqs_client, sqs_queue_url, "".join(fixtures))

event, _ = _sqs_get_messages(self.sqs_client, sqs_queue_url, sqs_queue_arn)

ctx = ContextMock(remaining_time_in_millis=_OVER_COMPLETION_GRACE_PERIOD_2m)
first_call = handler(event, ctx) # type:ignore

assert first_call == "completed"

# Index event a second time to trigger version conflict
second_call = handler(event, ctx) # type:ignore

assert second_call == "completed"

self.elasticsearch.refresh(index="logs-generic-default")

assert self.elasticsearch.count(index="logs-generic-default")["count"] == 1

# Test no duplicate events end in the replay queue
events, _ = _sqs_get_messages(self.sqs_client, os.environ["SQS_REPLAY_URL"], self.sqs_replay_queue_arn)
assert len(events["Records"]) == 0
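For reference, a hypothetical stand-alone snippet showing how the 409 arises in the first place when an explicit _id is combined with op_type "create" (the client URL, credentials, and document id below are placeholders, not values taken from this repository):

from elasticsearch import ConflictError, Elasticsearch

# Placeholder connection details; adjust to your own cluster.
es = Elasticsearch("https://localhost:9200", basic_auth=("elastic", "changeme"), verify_certs=False)

doc_id = "example-prefix-000000000000"
es.index(index="logs-generic-default", op_type="create", id=doc_id,
         document={"message": "first write succeeds"})

try:
    # A second "create" with the same _id is rejected by Elasticsearch with
    # HTTP 409, the status the shipper now treats as "already ingested".
    es.index(index="logs-generic-default", op_type="create", id=doc_id,
             document={"message": "duplicate write"})
except ConflictError:
    print("duplicate _id rejected with a version conflict (409)")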
6 changes: 6 additions & 0 deletions tests/testcontainers/es.py
@@ -209,3 +209,9 @@ def index(self, **kwargs: Any) -> dict[str, Any]:
self._index_indices.add(kwargs["index"])

return self.es_client.index(**kwargs)

def create_data_stream(self, **kwargs: Any) -> dict[str, Any]:
if "name" in kwargs:
self._index_indices.add(kwargs["name"])

return self.es_client.indices.create_data_stream(**kwargs)
