Skip to content

Commit

Permalink
Let's compute the root_fields_to_add_to_expanded_event only once per … (
Browse files Browse the repository at this point in the history
#642)

* Let's compute the root_fields_to_add_to_expanded_event only once per events to expand

* docs and changelog

* Update share/expand_event_list_from_field.py

Co-authored-by: kaiyan-sheng <[email protected]>

* Example in docs

* lint

* Update docs/en/aws-elastic-serverless-forwarder-configuration.asciidoc

Co-authored-by: DeDe Morton <[email protected]>

* Update docs/en/aws-elastic-serverless-forwarder-configuration.asciidoc

Co-authored-by: DeDe Morton <[email protected]>

* Update docs/en/aws-elastic-serverless-forwarder-configuration.asciidoc

Co-authored-by: DeDe Morton <[email protected]>

---------

Co-authored-by: kaiyan-sheng <[email protected]>
Co-authored-by: DeDe Morton <[email protected]>
  • Loading branch information
3 people authored Mar 13, 2024
1 parent 7b0c0ef commit 407271d
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 19 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
### v1.13.1 - 2024/03/07
##### Features
* Add documentation and optimise performance for `root_fields_to_add_to_expanded_event` [#642](https://github.com/elastic/elastic-serverless-forwarder/pull/642)

### v1.13.0 - 2024/02/23
##### Features
* Go beyond 4096b limit on CF Parameter for event triggers on SAR deployment [#627](https://github.com/elastic/elastic-serverless-forwarder/pull/627)
Expand Down
42 changes: 41 additions & 1 deletion docs/en/aws-elastic-serverless-forwarder-configuration.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,8 @@ inputs:
- type: "s3-sqs"
id: "arn:aws:sqs:%REGION%:%ACCOUNT%:%QUEUENAME%"
expand_event_list_from_field: "Records"
# root_fields_to_add_to_expanded_event: "all"
# root_fields_to_add_to_expanded_event: ["owner", "logGroup", "logStream"]
outputs:
- type: "elasticsearch"
args:
Expand All @@ -288,10 +290,17 @@ inputs:

You can define `inputs.[].expand_event_list_from_field` as a string with the value of a key in the JSON that contains a list of elements that must be sent as events instead of the encompassing JSON.

To inject root fields from the JSON object into the expanded events, define `inputs.[].root_fields_to_add_to_expanded_event`. The config takes one of the following values:

* the literal string "**all**" to inject all root fields (except the field you are expanding events from). For example, `root_fields_to_add_to_expanded_event: "all"`
* a list of the root fields that you want to inject. For example, `root_fields_to_add_to_expanded_event: ["owner", "logGroup", "logStream"]`



NOTE: When <<aws-serverless-route-service-logs, routing service logs>>, any value set for the `expand_event_list_from_field` configuration parameter will be ignored, because this will be automatically handled by the Elastic Serverless Forwarder.

[discrete]
=== Example
=== Example without `root_fields_to_add_to_expanded_event`

With the following input:

Expand Down Expand Up @@ -319,6 +328,37 @@ If `expand_event_list_from_field` is set to `Records`, four events will be forwa
{"@timestamp": "2022-06-16T04:06:36.189Z", "message": "{\"key\": \"value #4\"}"}
----

[discrete]
=== Example with `root_fields_to_add_to_expanded_event`

With the following input:

[source, json]
----
{"Records":[{"key": "value #1"},{"key": "value #2"}], "field1": "value 1a", "field2": "value 2a"}
{"Records":[{"key": "value #3"},{"key": "value #4"}], "field1": "value 1b", "field2": "value 2b"}
----

If `expand_event_list_from_field` is set to `Records`, and `root_fields_to_add_to_expanded_event` to `all`, four events will be forwarded:

[source, json]
----
{"@timestamp": "2022-06-16T04:06:21.105Z", "message": "{\"key\": \"value #1\", \"field1\": \"value 1a\", \"field2\": \"value 2a\""}
{"@timestamp": "2022-06-16T04:06:27.204Z", "message": "{\"key\": \"value #2\", \"field1\": \"value 1a\", \"field2\": \"value 2a\""}
{"@timestamp": "2022-06-16T04:06:31.154Z", "message": "{\"key\": \"value #3\", \"field1\": \"value 1b\", \"field2\": \"value 2b\""}
{"@timestamp": "2022-06-16T04:06:36.189Z", "message": "{\"key\": \"value #4\", \"field1\": \"value 1b\", \"field2\": \"value 2b\""}
----

If `expand_event_list_from_field` is set to `Records`, and `root_fields_to_add_to_expanded_event` to `["field1"]`, four events will be forwarded:

[source, json]
----
{"@timestamp": "2022-06-16T04:06:21.105Z", "message": "{\"key\": \"value #1\", \"field1\": \"value 1a\""}
{"@timestamp": "2022-06-16T04:06:27.204Z", "message": "{\"key\": \"value #2\", \"field1\": \"value 1a\""}
{"@timestamp": "2022-06-16T04:06:31.154Z", "message": "{\"key\": \"value #3\", \"field1\": \"value 1b\""}
{"@timestamp": "2022-06-16T04:06:36.189Z", "message": "{\"key\": \"value #4\", \"field1\": \"value 1b\""}
----


[discrete]
[[aws-serverless-manage-multiline-messages]]
Expand Down
37 changes: 19 additions & 18 deletions share/expand_event_list_from_field.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,29 +42,30 @@ def _expand_event_list_from_field(
offset_skew = self._last_event_expanded_offset + 1
events_list = events_list[offset_skew:]

# Let's compute the root_fields_to_add_to_expanded_event only once per events to expand
root_fields_to_add_to_expanded_event: dict[str, Any] = {}
if self._root_fields_to_add_to_expanded_event == "all":
root_fields_to_add_to_expanded_event = deepcopy(json_object)
del root_fields_to_add_to_expanded_event[self._field_to_expand_event_list_from]
# we want to add only a list of root fields
elif isinstance(self._root_fields_to_add_to_expanded_event, list):
for root_field_to_add_to_expanded_event in self._root_fields_to_add_to_expanded_event:
if root_field_to_add_to_expanded_event in json_object:
root_fields_to_add_to_expanded_event[root_field_to_add_to_expanded_event] = json_object[
root_field_to_add_to_expanded_event
]
else:
shared_logger.debug(
f"`{root_field_to_add_to_expanded_event}` field specified in "
f"`root_fields_to_add_to_expanded_event` parameter is not present at root level"
f" to expanded event not present at root level"
)

for event_n, event in enumerate(events_list):
if self._root_fields_to_add_to_expanded_event:
root_fields_to_add_to_expanded_event: dict[str, Any] = {}
# we can and want to add the root fields only in case the event is a not empty json object
if isinstance(event, dict) and len(event) > 0:
# we want to add all the root fields
if self._root_fields_to_add_to_expanded_event == "all":
root_fields_to_add_to_expanded_event = deepcopy(json_object)
del root_fields_to_add_to_expanded_event[self._field_to_expand_event_list_from]
else:
# we want to add only a list of root fields
assert isinstance(self._root_fields_to_add_to_expanded_event, list)
for root_field_to_add_to_expanded_event in self._root_fields_to_add_to_expanded_event:
if root_field_to_add_to_expanded_event in json_object:
root_fields_to_add_to_expanded_event[root_field_to_add_to_expanded_event] = (
json_object[root_field_to_add_to_expanded_event]
)
else:
shared_logger.debug(
f"`{root_field_to_add_to_expanded_event}` field to be added"
f" to expanded event not present at root level"
)

event.update(root_fields_to_add_to_expanded_event)
else:
shared_logger.debug("root fields to be added on a non json object event")
Expand Down

0 comments on commit 407271d

Please sign in to comment.