Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dead letter index: align error field to ECS and do not forward retryable errors #793

Merged
merged 27 commits into from
Oct 8, 2024
Merged
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
30512d3
Move error field to error.message and error.type
kaiyan-sheng Sep 12, 2024
f08b49a
Move error parsing into a helper
zmoog Sep 17, 2024
089b19e
merge action and error into one dict
zmoog Sep 17, 2024
1080814
Cleanup
zmoog Sep 26, 2024
2ac4cdf
Fix http.response.status_code and clean up
zmoog Sep 26, 2024
feae60c
Set error.type with the exception type name as str
zmoog Sep 26, 2024
f37a3c6
Fix error.type
zmoog Sep 26, 2024
8c937b7
Test http.response.status_code
zmoog Sep 26, 2024
b3a85b2
Do not index connection errors and filter by type
zmoog Sep 30, 2024
cf2e156
Add es_dead_letter_forward_errors to config
zmoog Sep 30, 2024
e2f1abc
Fix private var name
zmoog Sep 30, 2024
0375d2f
Add logs to DLI
zmoog Sep 30, 2024
2ff886d
Logging
zmoog Sep 30, 2024
227d8a5
More logs
zmoog Sep 30, 2024
3109e90
Fix tests
zmoog Sep 30, 2024
afd9e40
Add test_es_dead_letter_index_with_included_action_error test
zmoog Sep 30, 2024
1fd2c03
Fix linter objections
zmoog Sep 30, 2024
6fdce15
Add _parse_error() tests
zmoog Sep 30, 2024
49d7b01
Fix test_es_dead_letter_index_with_included_action_error
zmoog Sep 30, 2024
82e663f
Clean up debug loggers
zmoog Oct 1, 2024
572e764
Add http.response.status_code check on parse_error
zmoog Oct 1, 2024
0e01d22
Update docs
zmoog Oct 2, 2024
6b83e5c
Clarify es_dead_letter_forward_errors docs
zmoog Oct 4, 2024
b1d4642
Exclude retryable errors from DLI
zmoog Oct 7, 2024
e485725
Add a non-retryable error test and fix linter
zmoog Oct 7, 2024
05f3290
Update retryable status codes list
zmoog Oct 7, 2024
f41a5f9
Update DLI docs and docstring
zmoog Oct 7, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 100 additions & 11 deletions shippers/es.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
# you may not use this file except in compliance with the Elastic License 2.0.

import datetime
import http
import uuid
from typing import Any, Dict, Optional, Union

Expand All @@ -21,7 +22,10 @@

_EVENT_BUFFERED = "_EVENT_BUFFERED"
_EVENT_SENT = "_EVENT_SENT"
_VERSION_CONFLICT = 409
# List of HTTP status codes that are considered retryable
_retryable_http_status_codes = [
http.HTTPStatus.TOO_MANY_REQUESTS,
]


class JSONSerializer(Serializer):
Expand Down Expand Up @@ -172,11 +176,13 @@ def _handle_outcome(self, actions: list[dict[str, Any]], errors: tuple[int, Unio
"elasticsearch shipper", extra={"error": error["create"]["error"], "_id": error["create"]["_id"]}
)

if "status" in error["create"] and error["create"]["status"] == _VERSION_CONFLICT:
if "status" in error["create"] and error["create"]["status"] == http.HTTPStatus.CONFLICT:
# Skip duplicate events on dead letter index and replay queue
continue

failed.append({"error": error["create"]["error"], "action": action_failed[0]})
failed_error = {"action": action_failed[0]} | self._parse_error(error["create"])

failed.append(failed_error)

if len(failed) > 0:
shared_logger.warning("elasticsearch shipper", extra={"success": success, "failed": len(failed)})
Expand All @@ -185,6 +191,52 @@ def _handle_outcome(self, actions: list[dict[str, Any]], errors: tuple[int, Unio

return failed

def _parse_error(self, error: dict[str, Any]) -> dict[str, Any]:
"""
Parses the error response from Elasticsearch and returns a
standardised error field.

The error field is a dictionary with the following keys:

- `message`: The error message
- `type`: The error type

If the error is not recognised, the `message` key is set
to "Unknown error".

It also sets the status code in the http field if it is present
as a number in the response.
"""
field: dict[str, Any] = {"error": {"message": "Unknown error", "type": "unknown"}}

if "status" in error and isinstance(error["status"], int):
# Collecting the HTTP response status code in the
# error field, if present, and the type is an integer.
#
# Sometimes the status code is a string, for example,
# when the connection to the server fails.
field["http"] = {"response": {"status_code": error["status"]}}

if "error" not in error:
return field

if isinstance(error["error"], str):
zmoog marked this conversation as resolved.
Show resolved Hide resolved
# Can happen with connection errors.
field["error"]["message"] = error["error"]
if "exception" in error:
# The exception field is usually an Exception object,
# so we convert it to a string.
field["error"]["type"] = str(type(error["exception"]))
elif isinstance(error["error"], dict):
# Can happen with status 5xx errors.
# In this case, we look for the "reason" and "type" fields.
if "reason" in error["error"]:
field["error"]["message"] = error["error"]["reason"]
if "type" in error["error"]:
field["error"]["type"] = error["error"]["type"]

return field

def set_event_id_generator(self, event_id_generator: EventIdGeneratorCallable) -> None:
    """Store the callable used to generate event ids on this shipper.

    NOTE(review): only the assignment is visible here — where
    `self._event_id_generator` is consumed is elsewhere in the class.
    """
    self._event_id_generator = event_id_generator

Expand Down Expand Up @@ -243,26 +295,56 @@ def flush(self) -> None:
return

def _send_dead_letter_index(self, actions: list[Any]) -> list[Any]:
    """
    Send the failed actions to the dead letter index (DLI).

    This function attempts to forward failed actions to the DLI, but may not do so
    for one of the following reasons:

    1. The action response does not have an HTTP status (e.g., the connection failed).
    2. The action's HTTP status code is retryable (see `_retryable_http_status_codes`).
    3. The action could not be encoded for indexing in the DLI.
    4. The action failed the indexing attempt in the DLI.

    Args:
        actions (list[Any]): A list of failed actions to be processed.

    Returns:
        list[Any]: A list of actions that were not indexed in the DLI.
    """
    non_indexed_actions: list[Any] = []
    encoded_actions = []

    for action in actions:
        if (
            "http" not in action  # no http status: connection error
            or action["http"]["response"]["status_code"] in _retryable_http_status_codes
        ):
            # We don't want to forward this action to
            # the dead letter index: retryable failures should be
            # retried, not parked in the DLI.
            #
            # Add the action to the list of non-indexed
            # actions and continue with the next one.
            non_indexed_actions.append(action)
            continue

        # Reshape event to dead letter index
        encoded = self._encode_dead_letter(action)
        if not encoded:
            shared_logger.error("cannot encode dead letter index event from payload", extra={"action": action})
            non_indexed_actions.append(action)
            # Skip this action: an empty encoding must not be bulk-indexed.
            continue

        encoded_actions.append(encoded)

    # If no action can be encoded, return original action list as failed
    if len(encoded_actions) == 0:
        return non_indexed_actions

    errors = es_bulk(self._es_client, encoded_actions, **self._bulk_kwargs)
    failed = self._handle_outcome(actions=encoded_actions, errors=errors)

    if not isinstance(failed, list) or len(failed) == 0:
        return non_indexed_actions

    for action in failed:
        event_payload = self._decode_dead_letter(action)

        if not event_payload:
            shared_logger.error("cannot decode dead letter index event from payload", extra={"action": action})
            continue

        non_indexed_actions.append(event_payload)

    return non_indexed_actions

def _encode_dead_letter(self, outcome: dict[str, Any]) -> dict[str, Any]:
    """
    Reshape a failed action outcome into a document for the dead letter index.

    Args:
        outcome: A failed action holding at least `action` and `error` keys.

    Returns:
        dict[str, Any]: The dead letter index document, or an empty dict
        when the outcome lacks the required `action`/`error` fields.
    """
    if "action" not in outcome or "error" not in outcome:
        return {}

    # Assign random id in case bulk() results in error, it can be matched to the original
    # action
    encoded = {
        # datetime.utcnow() is deprecated since Python 3.12;
        # now(timezone.utc) produces the same formatted timestamp.
        "@timestamp": datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
        "_id": str(uuid.uuid4()),
        "_index": self._es_dead_letter_index,
        "_op_type": "create",
        "message": json_dumper(outcome["action"]),
        "error": outcome["error"],
    }

    if "http" in outcome:
        # the `http.response.status_code` is not
        # always present in the error field.
        encoded["http"] = outcome["http"]

    return encoded

def _decode_dead_letter(self, dead_letter_outcome: dict[str, Any]) -> dict[str, Any]:
if "action" not in dead_letter_outcome or "message" not in dead_letter_outcome["action"]:
return {}
Expand Down
116 changes: 115 additions & 1 deletion tests/handlers/aws/test_integrations.py
Original file line number Diff line number Diff line change
Expand Up @@ -4292,10 +4292,11 @@ def test_es_dead_letter_index(self) -> None:
assert res["hits"]["total"] == {"value": 1, "relation": "eq"}

assert (
res["hits"]["hits"][0]["_source"]["error"]["reason"]
res["hits"]["hits"][0]["_source"]["error"]["message"]
== "test_es_non_indexable_dead_letter_index fail message"
)
assert res["hits"]["hits"][0]["_source"]["error"]["type"] == "fail_processor_exception"
assert res["hits"]["hits"][0]["_source"]["http"]["response"]["status_code"] == 500
dead_letter_message = json_parser(res["hits"]["hits"][0]["_source"]["message"])
assert dead_letter_message["log"]["offset"] == 0
assert dead_letter_message["log"]["file"]["path"] == sqs_queue_url_path
Expand Down Expand Up @@ -4419,3 +4420,116 @@ def test_es_non_indexable_dead_letter_index(self) -> None:
assert first_body["event_payload"]["cloud"]["region"] == "us-east-1"
assert first_body["event_payload"]["cloud"]["account"]["id"] == "000000000000"
assert first_body["event_payload"]["tags"] == ["forwarded", "generic", "tag1", "tag2", "tag3"]

def test_es_dead_letter_index_with_retryable_errors(self) -> None:
    """
    Test that retryable errors are not redirected to the dead letter index (DLI).

    The elasticsearch output is pointed at a non-routable address, so the
    indexing attempt fails with a connection error (a retryable error):
    the event must end up in the replay queue, and the DLI must stay empty.
    """
    assert isinstance(self.elasticsearch, ElasticsearchContainer)
    assert isinstance(self.localstack, LocalStackContainer)

    sqs_queue_name = _time_based_id(suffix="source-sqs")
    sqs_queue = _sqs_create_queue(self.sqs_client, sqs_queue_name, self.localstack.get_url())

    dead_letter_index_name = "logs-generic-default-dli"

    sqs_queue_arn = sqs_queue["QueueArn"]
    sqs_queue_url = sqs_queue["QueueUrl"]
    sqs_queue_url_path = sqs_queue["QueueUrlPath"]

    # NOTE(review): original indentation of this YAML literal was lost in
    # extraction — reconstructed to the conventional input/output nesting;
    # confirm against the sibling dead-letter tests in this file.
    config_yaml: str = f"""
        inputs:
          - type: sqs
            id: "{sqs_queue_arn}"
            tags: {self.default_tags}
            outputs:
              - type: "elasticsearch"
                args:
                  # This IP address is non-routable and
                  # will always result in a connection failure.
                  elasticsearch_url: "0.0.0.0:9200"
                  es_dead_letter_index: "{dead_letter_index_name}"
                  ssl_assert_fingerprint: {self.elasticsearch.ssl_assert_fingerprint}
                  username: "{self.secret_arn}:username"
                  password: "{self.secret_arn}:password"
    """

    config_file_path = "config.yaml"
    config_bucket_name = _time_based_id(suffix="config-bucket")
    _s3_upload_content_to_bucket(
        client=self.s3_client,
        content=config_yaml.encode("utf-8"),
        content_type="text/plain",
        bucket_name=config_bucket_name,
        key=config_file_path,
    )

    os.environ["S3_CONFIG_FILE"] = f"s3://{config_bucket_name}/{config_file_path}"

    fixtures = [
        _load_file_fixture("cloudwatch-log-1.json"),
    ]

    _sqs_send_messages(self.sqs_client, sqs_queue_url, "".join(fixtures))

    event, _ = _sqs_get_messages(self.sqs_client, sqs_queue_url, sqs_queue_arn)
    message_id = event["Records"][0]["messageId"]

    # Create pipeline to reject documents
    processors = {
        "processors": [
            {
                "fail": {
                    "message": "test_es_dead_letter_index_with_retryable_errors fail message",
                }
            },
        ]
    }

    self.elasticsearch.put_pipeline(
        id="test_es_dead_letter_index_with_retryable_errors_fail_pipeline",
        body=processors,
    )

    self.elasticsearch.create_data_stream(name="logs-generic-default")
    self.elasticsearch.put_settings(
        index="logs-generic-default",
        body={"index.default_pipeline": "test_es_dead_letter_index_with_retryable_errors_fail_pipeline"},
    )

    self.elasticsearch.refresh(index="logs-generic-default")

    self.elasticsearch.create_data_stream(name=dead_letter_index_name)

    ctx = ContextMock(remaining_time_in_millis=_OVER_COMPLETION_GRACE_PERIOD_2m)
    first_call = handler(event, ctx)  # type:ignore

    assert first_call == "completed"

    # Test document has been rejected from target index
    self.elasticsearch.refresh(index="logs-generic-default")

    assert self.elasticsearch.count(index="logs-generic-default")["count"] == 0

    # Test event does not go into the dead letter queue
    assert self.elasticsearch.exists(index=dead_letter_index_name) is True

    self.elasticsearch.refresh(index=dead_letter_index_name)

    assert self.elasticsearch.count(index=dead_letter_index_name)["count"] == 0

    # Test event has been redirected into the replay queue
    events, _ = _sqs_get_messages(self.sqs_client, os.environ["SQS_REPLAY_URL"], self.sqs_replay_queue_arn)
    assert len(events["Records"]) == 1

    first_body: dict[str, Any] = json_parser(events["Records"][0]["body"])

    # The replayed payload must carry the original event enriched with
    # log, sqs, cloud, and tags metadata.
    assert first_body["event_payload"]["message"] == fixtures[0].rstrip("\n")
    assert first_body["event_payload"]["log"]["offset"] == 0
    assert first_body["event_payload"]["log"]["file"]["path"] == sqs_queue_url_path
    assert first_body["event_payload"]["aws"]["sqs"]["name"] == sqs_queue_name
    assert first_body["event_payload"]["aws"]["sqs"]["message_id"] == message_id
    assert first_body["event_payload"]["cloud"]["provider"] == "aws"
    assert first_body["event_payload"]["cloud"]["region"] == "us-east-1"
    assert first_body["event_payload"]["cloud"]["account"]["id"] == "000000000000"
    assert first_body["event_payload"]["tags"] == ["forwarded", "generic", "tag1", "tag2", "tag3"]
47 changes: 47 additions & 0 deletions tests/shippers/test_es.py
Original file line number Diff line number Diff line change
Expand Up @@ -511,3 +511,50 @@ def test_dumps(self) -> None:
with self.subTest("dumps dict"):
dumped = json_serializer.dumps({"key": "value"})
assert '{"key":"value"}' == dumped


@pytest.mark.unit
class TestParseError(TestCase):

    def test_parse_error(self) -> None:
        """Check _parse_error() against the three response shapes it handles."""
        es_shipper = ElasticsearchShipper(
            elasticsearch_url="elasticsearch_url",
            username="username",
            password="password",
            tags=["tag1", "tag2", "tag3"],
        )

        with self.subTest("fail_processor_exception"):
            # Structured error: message/type come from the nested
            # "reason" and "type" fields; status is a plain int.
            response = {
                "status": 500,
                "error": {
                    "type": "fail_processor_exception",
                    "reason": "Fail message",
                },
            }
            parsed = es_shipper._parse_error(response)

            assert parsed["error"]["type"] == "fail_processor_exception"
            assert parsed["error"]["message"] == "Fail message"
            assert parsed["http"]["response"]["status_code"] == 500

        with self.subTest("connection_error"):
            # Connection error: the error is a plain string, the status
            # is not an int, and the type comes from the exception object.
            response = {
                "status": "N/A",
                "error": "whatever",
                "exception": elasticsearch.exceptions.ConnectionError("Connection error"),
            }
            parsed = es_shipper._parse_error(response)

            assert parsed["error"]["type"] == "<class 'elasticsearch.exceptions.ConnectionError'>"
            assert parsed["error"]["message"] == "whatever"
            assert "http" not in parsed

        with self.subTest("unknown_error"):
            # Empty response: everything falls back to the defaults.
            parsed = es_shipper._parse_error({})

            assert parsed["error"]["type"] == "unknown"
            assert parsed["error"]["message"] == "Unknown error"
            assert "http" not in parsed