Merge pull request #39 from newrelic/LOGGING-4864_consider_timeouts_to_retry_sending

[LOGGING_4864] Consider timeout errors and add logging debug statements
Matt Whelan authored Apr 14, 2021
2 parents 517636b + 1ba299a commit f34187c
Showing 4 changed files with 179 additions and 25 deletions.
4 changes: 3 additions & 1 deletion serverless.yml
@@ -16,6 +16,8 @@ provider:
LICENSE_KEY: ${env:LICENSE_KEY}
# Determines if logs are forwarded to New Relic Logging
LOGGING_ENABLED: ${env:LOGGING_ENABLED, "False"}
# A boolean to determine if you want to output debug messages in the CloudWatch console
DEBUG_LOGGING_ENABLED: ${env:DEBUG_LOGGING_ENABLED, "False"}

custom:
pythonRequirements:
@@ -32,7 +34,7 @@ package:

functions:
newrelic-log-ingestion:
description: Send log data from CloudWatch Logs and S3 to New Relic Infrastructure (Cloud Integrations) and New Relic Logging.
description: Send log data from CloudWatch Logs to New Relic Infrastructure (Cloud Integrations) and New Relic Logging.
handler: src/function.lambda_handler
name: newrelic-log-ingestion

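The new environment variable feeds the _debug_logging_enabled() helper that function.py consults below; the helper's definition falls outside the hunks shown here, so this is only a plausible sketch of the env-var check, not the commit's literal code:

    import os

    def _debug_logging_enabled():
        # Hypothetical reconstruction: treat DEBUG_LOGGING_ENABLED as a
        # boolean flag, defaulting to off (matching the "False" default above).
        return os.getenv("DEBUG_LOGGING_ENABLED", "false").lower() == "true"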
107 changes: 85 additions & 22 deletions src/function.py
@@ -37,8 +37,10 @@
import datetime
import gzip
import json
import logging
import os
import re
import time

from base64 import b64decode
from enum import Enum
@@ -47,6 +49,8 @@
import aiohttp
import asyncio

logger = logging.getLogger()

try:
import newrelic.agent
except ImportError:
@@ -58,8 +62,10 @@
# Retrying configuration.
# Increasing these numbers will make the function run longer in case of
# communication failures, and that will increase the cost.
# Decreasing these number could increase the probility of data loss.
# Decreasing these numbers could increase the probability of data loss.

# Upon receiving the following error codes, the request will be retried up to MAX_RETRIES times
RETRYABLE_ERROR_CODES = [408, 429]
# Maximum number of retries
MAX_RETRIES = 3
# Initial backoff (in seconds) between retries
@@ -68,6 +74,14 @@
BACKOFF_MULTIPLIER = 2
# Max length in bytes of the payload
MAX_PAYLOAD_SIZE = 1000 * 1024
# Individual request timeout in seconds (non-configurable)
INDIVIDUAL_REQUEST_TIMEOUT_DURATION = 3
INDIVIDUAL_REQUEST_TIMEOUT = aiohttp.ClientTimeout(
total=INDIVIDUAL_REQUEST_TIMEOUT_DURATION
)
# Session max processing time (non-configurable).
# Reserves a time buffer for logs to be formatted before being sent.
SESSION_MAX_PROCESSING_TIME = 1

LAMBDA_LOG_GROUP_PREFIX = "/aws/lambda"
VPC_LOG_GROUP_PREFIX = "/aws/vpc/flow-logs"
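Taken together, the defaults above mean each payload is attempted immediately and then retried after 1 s and 2 s backoffs, with every attempt capped at 3 s. A standalone sketch of the schedule these constants produce (assuming the default values shown):

    MAX_RETRIES = 3
    INITIAL_BACKOFF = 1
    BACKOFF_MULTIPLIER = 2

    backoff = INITIAL_BACKOFF
    for attempt in range(MAX_RETRIES):
        if attempt == 0:
            print("attempt 0: sent immediately")
        else:
            print(f"attempt {attempt}: sent after a {backoff} s backoff")
            backoff *= BACKOFF_MULTIPLIER
    # attempt 0: sent immediately
    # attempt 1: sent after a 1 s backoff
    # attempt 2: sent after a 2 s backoff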
@@ -104,7 +118,7 @@ class EntryType(Enum):
r"(?P<request_id>[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12})"
)

LOGGING_LAMBDA_VERSION = "1.0.2"
LOGGING_LAMBDA_VERSION = "1.0.3"
LOGGING_PLUGIN_METADATA = {"type": "lambda", "version": LOGGING_LAMBDA_VERSION}


@@ -125,14 +139,16 @@ def _format_error(e, text):

while retries < MAX_RETRIES:
if retries > 0:
print("Retrying in {} seconds".format(backoff))
logger.info("Retrying in {} seconds".format(backoff))
await asyncio.sleep(backoff)
backoff *= BACKOFF_MULTIPLIER

retries += 1

try:
resp = await session.post(url, data=data, headers=headers)
resp = await session.post(
url, data=data, headers=headers, timeout=INDIVIDUAL_REQUEST_TIMEOUT
)
resp.raise_for_status()
return resp.status, resp.url
except aiohttp.ClientResponseError as e:
@@ -144,12 +160,16 @@ def _format_error(e, text):
raise BadRequestException(
_format_error(e, "Review the region endpoint")
)
elif e.status == 429:
print("There was a {} error. Reason: {}".format(e.status, e.message))
elif e.status in RETRYABLE_ERROR_CODES:
logger.warning(f"There was a {e.status} error. Reason: {e.message}")
# Now retry the request
continue
elif 400 <= e.status < 500:
raise BadRequestException(e)
except asyncio.TimeoutError:
logger.warning(f"Timeout on {url} at attempt {retries}/{MAX_RETRIES}")
# Now retry the request
continue

raise MaxRetriesException()
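In short, the loop now retries on the retryable status codes and on per-request timeouts, fails fast on other 4xx responses, and raises MaxRetriesException once all attempts are spent. A condensed summary of that decision (hypothetical helper, not the literal branch structure):

    def _should_retry(status=None, timed_out=False):
        # Timeouts and RETRYABLE_ERROR_CODES (408, 429) are retried with
        # backoff; other 4xx responses become BadRequestException upstream.
        return timed_out or status in (408, 429)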

@@ -169,6 +189,20 @@ def _filter_log_lines(log_entry):
return ret


def _calculate_session_timeout():
# Request 0
total = INDIVIDUAL_REQUEST_TIMEOUT_DURATION

# Requests 1 to N-1
backoff = INITIAL_BACKOFF
for retry in range(MAX_RETRIES - 1):
total += backoff + INDIVIDUAL_REQUEST_TIMEOUT_DURATION
backoff *= BACKOFF_MULTIPLIER

# Finally, add maximum worst-case-scenario expected processing time
return total + SESSION_MAX_PROCESSING_TIME
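With the defaults above, this budget works out to 13 seconds, which the new test_session_duration_properly_calculated test below pins down. Spelling out the arithmetic:

    # request 0:                  3 s
    # 1 s backoff + request 1:    1 + 3 = 4 s
    # 2 s backoff + request 2:    2 + 3 = 5 s
    # processing-time buffer:     1 s
    assert 3 + (1 + 3) + (2 + 3) + 1 == 13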


async def _send_log_entry(log_entry, context):
"""
This function sends the log entry to New Relic Infrastructure's ingest
@@ -184,7 +218,11 @@ async def _send_log_entry(log_entry, context):
"log_stream_name": context.log_stream_name,
}

async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=3)) as session:
session_timeout = _calculate_session_timeout()

async with aiohttp.ClientSession(
timeout=aiohttp.ClientTimeout(total=session_timeout)
) as session:
# Both Infrastructure and Logging require a "LICENSE_KEY" environment variable.
# In order to send data to the Infrastructure Pipeline, the customer doesn't need
# to do anything. To disable it, they'll set "INFRA_ENABLED" to "false".
@@ -219,7 +257,13 @@ async def _send_log_entry(log_entry, context):
requests.append(
_send_payload(_get_logging_request_creator(payload), session)
)
return await asyncio.gather(*requests)

logger.debug("Sending data to New Relic.....")
ini = time.perf_counter()
result = await asyncio.gather(*requests)
elapsed_millis = (time.perf_counter() - ini) * 1000
logger.debug(f"Time elapsed to send to New Relic: {elapsed_millis:0.2f}ms")
return result


async def _send_payload(request_creator, session, retry=False):
@@ -229,13 +273,19 @@
session, req.get_full_url(), req.data, req.headers
)
except MaxRetriesException as e:
print("Retry limit reached. Failed to send log entry.")
logger.error("Retry limit reached. Failed to send log entry.")
if retry:
raise e
except BadRequestException as e:
print(e)
logger.error(e)
except asyncio.TimeoutError as e:
logger.error("Session timed out. Failed to send log entry.")
raise e
except Exception as e:
logger.error(f"Error occurred: {e}")
raise e
else:
print("Log entry sent. Response code: {}. url: {}".format(status, url))
logger.info("Log entry sent. Response code: {}. url: {}".format(status, url))
return status


@@ -408,6 +458,18 @@ def create_request():
return create_request


def _set_console_logging_level():
"""
Determines whether or not debug logging should be enabled based on the env var.
Defaults to false.
"""
if _debug_logging_enabled():
logger.setLevel(logging.DEBUG)
logger.debug("Enabled debug mode")
else:
logger.setLevel(logging.INFO)
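A quick way to exercise this outside Lambda (a hypothetical snippet; inside the Lambda runtime a root handler is already installed, so basicConfig is only needed locally):

    import logging
    import os

    logging.basicConfig()  # attach a handler when running locally
    os.environ["DEBUG_LOGGING_ENABLED"] = "true"
    _set_console_logging_level()
    logging.getLogger().debug("now emitted")  # passes the DEBUG level filter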


def _get_logging_endpoint(ingest_url=None):
"""
Service URL is determined by the license key's region.
@@ -506,25 +568,26 @@ def lambda_handler(event, context):
function's configuration.
"""

_set_console_logging_level()

# CloudWatch Log entries are compressed and encoded in Base64
event_data = b64decode(event["awslogs"]["data"])
log_entry_str = gzip.decompress(event_data).decode("utf-8")
log_entry = json.loads(log_entry_str)

# output additional helpful info if debug logging is enabled
# not enabled by default since parsing into json might be slow
if _debug_logging_enabled():
# calling '[0]' without a safety check looks sketchy, but Cloudwatch is never going
# to send us a log without at least one event
print(
"logGroup: {}, logStream: {}, timestamp: {}".format(
log_entry["logGroup"],
log_entry["logStream"],
datetime.datetime.fromtimestamp(
log_entry["logEvents"][0]["timestamp"] / 1000.0
),
)
# calling '[0]' without a safety check looks sketchy, but Cloudwatch is never going
# to send us a log without at least one event
logger.debug(
"logGroup: {}, logStream: {}, timestamp: {}".format(
log_entry["logGroup"],
log_entry["logStream"],
datetime.datetime.fromtimestamp(
log_entry["logEvents"][0]["timestamp"] / 1000.0
),
)
)

asyncio.run(_send_log_entry(log_entry, context))
# This makes it possible to chain this CW log consumer with others using a success destination
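The decode path above inverts how CloudWatch Logs packages subscription events. For reference, a minimal sketch of building such an event by hand, roughly what the aws_log_events test helper must do (field values are illustrative):

    import base64
    import gzip
    import json

    def make_cloudwatch_event(log_events):
        # CloudWatch Logs delivers gzip-compressed, base64-encoded JSON
        # under event["awslogs"]["data"].
        payload = {
            "logGroup": "/aws/lambda/example",        # illustrative
            "logStream": "2021/04/14/[$LATEST]abcd",  # illustrative
            "logEvents": log_events,
        }
        blob = gzip.compress(json.dumps(payload).encode("utf-8"))
        return {"awslogs": {"data": base64.b64encode(blob).decode("utf-8")}}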
6 changes: 6 additions & 0 deletions template.yaml
@@ -43,6 +43,10 @@ Parameters:
Type: String
Description: IAM Role PermissionsBoundary (optional)
Default: ''
DebugLoggingEnabled:
Type: String
Description: A boolean to determine if you want to output debug messages in the CloudWatch console
Default: "false"

Conditions:
NoRole: !Equals ['', !Ref FunctionRole]
@@ -83,6 +87,7 @@ Resources:
LICENSE_KEY: !Ref NRLicenseKey
LOGGING_ENABLED: !Ref NRLoggingEnabled
INFRA_ENABLED: !Ref NRInfraLogging
DEBUG_LOGGING_ENABLED: !Ref DebugLoggingEnabled
NewRelicLogIngestionFunction:
Type: AWS::Serverless::Function
Condition: NoRole
@@ -103,6 +108,7 @@
LICENSE_KEY: !Ref NRLicenseKey
LOGGING_ENABLED: !Ref NRLoggingEnabled
INFRA_ENABLED: !Ref NRInfraLogging
DEBUG_LOGGING_ENABLED: !Ref DebugLoggingEnabled
LambdaInvokePermissionNoCap:
Type: AWS::Lambda::Permission
Condition: NoCap
87 changes: 85 additions & 2 deletions test/log_ingestion_test.py
@@ -12,7 +12,7 @@
from test.aws_log_events import AwsLogEvents

import asyncio
from asynctest import CoroutineMock
from asynctest import CoroutineMock, MagicMock

US_URL = "https://log-api.newrelic.com/log/v1"
EU_URL = "https://log-api.eu.newrelic.com/log/v1"
@@ -60,6 +60,22 @@ def mock_aio_post():
yield mocked_aio_post


class AsyncContextManagerMock(MagicMock):
async def __aenter__(self):
return self.aenter

async def __aexit__(self, *args):
pass


@pytest.fixture
def mock_aio_session():
with patch(
"aiohttp.ClientSession", new=AsyncContextManagerMock()
) as mocked_aio_session:
yield mocked_aio_session


@patch.dict(
os.environ,
{"INFRA_ENABLED": infra_enabled, "LOGGING_ENABLED": logging_enabled},
@@ -441,6 +457,69 @@ def test_when_first_two_calls_fail_code_should_retry(mock_aio_post):
assert mock_aio_post.call_count == 3


def test_session_duration_properly_calculated():
# Mock function configuration
function.MAX_RETRIES = 3
function.INITIAL_BACKOFF = 1
function.BACKOFF_MULTIPLIER = 2
function.INDIVIDUAL_REQUEST_TIMEOUT_DURATION = 3
function.SESSION_MAX_PROCESSING_TIME = 1

"""
Diagram of performed calls:
- Call 0: 3s
- Backoff 0: 1s (initial)
- Call 1: 3s
- Backoff 1: 1 * 2s (initial * multiplier)
- Call 2: 3s
- session_max_processing_time: 1s
TOTAL: 13s
"""
expected_max_session_time = 13

assert function._calculate_session_timeout() == expected_max_session_time


def test_when_session_timeouts_exception_should_be_raised(mock_aio_session):
expected_message = "timeout_in_session"
mock_aio_session.side_effect = asyncio.TimeoutError(expected_message)
event = aws_log_events.create_aws_event(["Test Message 1"])

with pytest.raises(asyncio.TimeoutError) as excinfo:
function.lambda_handler(event, context)
pytest.fail("TimeoutError should have been raised by the ClientSession")

assert expected_message == str(excinfo.value)


def test_when_exception_is_thrown_it_should_be_raised(mock_aio_session):
expected_message = "unexpected_exception_in_session"
mock_aio_session.side_effect = IOError(expected_message)
event = aws_log_events.create_aws_event(["Test Message 1"])

with pytest.raises(IOError) as excinfo:
function.lambda_handler(event, context)
pytest.fail(
"An unexpected exception should have been raised by the ClientSession"
)

assert expected_message == str(excinfo.value)


def test_when_first_call_timeouts_code_should_retry(mock_aio_post):
# First two calls time out, then the third succeeds
mock_aio_post.side_effect = [
aio_post_timeout(),
aio_post_timeout(),
aio_post_response(),
]
event = aws_log_events.create_aws_event(["Test Message 1"])

function.lambda_handler(event, context)

assert mock_aio_post.call_count == 3


def test_logs_have_logstream_and_loggroup(mock_aio_post):
mock_aio_post.return_value = aio_post_response()
message = "Test Message 1"
@@ -506,10 +585,14 @@ def test_lambda_request_ids_are_extracted(mock_aio_post):
assert messages[4]["attributes"]["aws"]["lambda_request_id"] == expected_request_id2


async def aio_post_response():
def aio_post_response():
return MockHttpResponse("", 202)


def aio_post_timeout():
return asyncio.TimeoutError()


def urlopen_error_response():
return MockHttpResponse("", 429)

