diff --git a/serverless.yml b/serverless.yml
index 28af037..c98361f 100644
--- a/serverless.yml
+++ b/serverless.yml
@@ -16,6 +16,8 @@ provider:
    LICENSE_KEY: ${env:LICENSE_KEY}
    # Determines if logs are forwarded to New Relic Logging
    LOGGING_ENABLED: ${env:LOGGING_ENABLED, "False"}
+    # A boolean to determine whether to output debug messages in the CloudWatch console
+    DEBUG_LOGGING_ENABLED: ${env:DEBUG_LOGGING_ENABLED, "False"}

custom:
  pythonRequirements:
@@ -32,7 +34,7 @@ package:

functions:
  newrelic-log-ingestion:
-    description: Send log data from CloudWatch Logs and S3 to New Relic Infrastructure (Cloud Integrations) and New Relic Logging.
+    description: Send log data from CloudWatch Logs to New Relic Infrastructure (Cloud Integrations) and New Relic Logging.
    handler: src/function.lambda_handler
    name: newrelic-log-ingestion
diff --git a/src/function.py b/src/function.py
index 9bbe9af..47467a1 100644
--- a/src/function.py
+++ b/src/function.py
@@ -37,8 +37,10 @@
import datetime
import gzip
import json
+import logging
import os
import re
+import time

from base64 import b64decode
from enum import Enum
@@ -47,6 +49,8 @@
import aiohttp
import asyncio

+logger = logging.getLogger()
+
try:
    import newrelic.agent
except ImportError:
@@ -58,8 +62,10 @@
# Retrying configuration.
# Increasing these numbers will make the function longer in case of
# communication failures and that will increase the cost.
-# Decreasing these number could increase the probility of data loss.
+# Decreasing these numbers could increase the probability of data loss.
+# Upon receiving the following error codes, the request will be retried up to MAX_RETRIES times
+RETRYABLE_ERROR_CODES = [408, 429]
# Maximum number of retries
MAX_RETRIES = 3
# Initial backoff (in seconds) between retries
@@ -68,6 +74,14 @@
BACKOFF_MULTIPLIER = 2
# Max length in bytes of the payload
MAX_PAYLOAD_SIZE = 1000 * 1024
+# Individual request timeout in seconds (non-configurable)
+INDIVIDUAL_REQUEST_TIMEOUT_DURATION = 3
+INDIVIDUAL_REQUEST_TIMEOUT = aiohttp.ClientTimeout(
+    total=INDIVIDUAL_REQUEST_TIMEOUT_DURATION
+)
+# Session max processing time (non-configurable).
+# Reserves a time buffer for logs to be formatted before being sent.
+SESSION_MAX_PROCESSING_TIME = 1

LAMBDA_LOG_GROUP_PREFIX = "/aws/lambda"
VPC_LOG_GROUP_PREFIX = "/aws/vpc/flow-logs"
@@ -104,7 +118,7 @@ class EntryType(Enum):
    r"(?P<request_id>[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12})"
)

-LOGGING_LAMBDA_VERSION = "1.0.2"
+LOGGING_LAMBDA_VERSION = "1.0.3"
LOGGING_PLUGIN_METADATA = {"type": "lambda", "version": LOGGING_LAMBDA_VERSION}
@@ -125,14 +139,16 @@ def _format_error(e, text):

    while retries < MAX_RETRIES:
        if retries > 0:
-            print("Retrying in {} seconds".format(backoff))
+            logger.info("Retrying in {} seconds".format(backoff))
            await asyncio.sleep(backoff)
            backoff *= BACKOFF_MULTIPLIER
        retries += 1

        try:
-            resp = await session.post(url, data=data, headers=headers)
+            resp = await session.post(
+                url, data=data, headers=headers, timeout=INDIVIDUAL_REQUEST_TIMEOUT
+            )
            resp.raise_for_status()
            return resp.status, resp.url
        except aiohttp.ClientResponseError as e:
@@ -144,12 +160,16 @@ def _format_error(e, text):
                raise BadRequestException(
                    _format_error(e, "Review the region endpoint")
                )
-            elif e.status == 429:
-                print("There was a {} error. Reason: {}".format(e.status, e.message))
+            elif e.status in RETRYABLE_ERROR_CODES:
+                logger.warning(f"There was a {e.status} error. Reason: {e.message}")
                # Now retry the request
                continue
            elif 400 <= e.status < 500:
                raise BadRequestException(e)
+        except asyncio.TimeoutError:
+            logger.warning(f"Timeout on {url} at attempt {retries}/{MAX_RETRIES}")
+            # Now retry the request
+            continue

    raise MaxRetriesException()
@@ -169,6 +189,20 @@ def _filter_log_lines(log_entry):
    return ret


+def _calculate_session_timeout():
+    # Request 0
+    total = INDIVIDUAL_REQUEST_TIMEOUT_DURATION
+
+    # Requests 1 to N-1
+    backoff = INITIAL_BACKOFF
+    for retry in range(MAX_RETRIES - 1):
+        total += backoff + INDIVIDUAL_REQUEST_TIMEOUT_DURATION
+        backoff *= BACKOFF_MULTIPLIER
+
+    # Finally, add the worst-case expected processing time
+    return total + SESSION_MAX_PROCESSING_TIME
+
+
async def _send_log_entry(log_entry, context):
    """
    This function sends the log entry to New Relic Infrastructure's ingest
@@ -184,7 +218,11 @@ async def _send_log_entry(log_entry, context):
        "log_stream_name": context.log_stream_name,
    }

-    async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=3)) as session:
+    session_timeout = _calculate_session_timeout()
+
+    async with aiohttp.ClientSession(
+        timeout=aiohttp.ClientTimeout(total=session_timeout)
+    ) as session:
        # Both Infrastructure and Logging require a "LICENSE_KEY" environment variable.
        # In order to send data to the Infrastructure Pipeline, the customer doesn't need
        # to do anything. To disable it, they'll set "INFRA_ENABLED" to "false".
@@ -219,7 +257,13 @@
            requests.append(
                _send_payload(_get_logging_request_creator(payload), session)
            )
-        return await asyncio.gather(*requests)
+
+        logger.debug("Sending data to New Relic.....")
+        ini = time.perf_counter()
+        result = await asyncio.gather(*requests)
+        elapsed_millis = (time.perf_counter() - ini) * 1000
+        logger.debug(f"Time elapsed to send to New Relic: {elapsed_millis:0.2f}ms")
+        return result


async def _send_payload(request_creator, session, retry=False):
@@ -229,13 +273,19 @@
            session, req.get_full_url(), req.data, req.headers
        )
    except MaxRetriesException as e:
-        print("Retry limit reached. Failed to send log entry.")
+        logger.error("Retry limit reached. Failed to send log entry.")
        if retry:
            raise e
    except BadRequestException as e:
-        print(e)
+        logger.error(e)
+    except asyncio.TimeoutError as e:
+        logger.error("Session timed out. Failed to send log entry.")
+        raise e
+    except Exception as e:
+        logger.error(f"Error occurred: {e}")
+        raise e
    else:
-        print("Log entry sent. Response code: {}. url: {}".format(status, url))
+        logger.info("Log entry sent. Response code: {}. url: {}".format(status, url))
        return status


@@ -408,6 +458,18 @@ def create_request():
    return create_request


+def _set_console_logging_level():
+    """
+    Determines whether debug logging should be enabled, based on the env var.
+    Defaults to false.
+    """
+    if _debug_logging_enabled():
+        logger.setLevel(logging.DEBUG)
+        logger.debug("Enabled debug mode")
+    else:
+        logger.setLevel(logging.INFO)
+
+
def _get_logging_endpoint(ingest_url=None):
    """
    Service URL is determined by the license key's region.
@@ -506,6 +568,8 @@ def lambda_handler(event, context):
    function's configuration.
""" + _set_console_logging_level() + # CloudWatch Log entries are compressed and encoded in Base64 event_data = b64decode(event["awslogs"]["data"]) log_entry_str = gzip.decompress(event_data).decode("utf-8") @@ -513,18 +577,17 @@ def lambda_handler(event, context): # output additional helpful info if debug logging is enabled # not enabled by default since parsing into json might be slow - if _debug_logging_enabled(): - # calling '[0]' without a safety check looks sketchy, but Cloudwatch is never going - # to send us a log without at least one event - print( - "logGroup: {}, logStream: {}, timestamp: {}".format( - log_entry["logGroup"], - log_entry["logStream"], - datetime.datetime.fromtimestamp( - log_entry["logEvents"][0]["timestamp"] / 1000.0 - ), - ) + # calling '[0]' without a safety check looks sketchy, but Cloudwatch is never going + # to send us a log without at least one event + logger.debug( + "logGroup: {}, logStream: {}, timestamp: {}".format( + log_entry["logGroup"], + log_entry["logStream"], + datetime.datetime.fromtimestamp( + log_entry["logEvents"][0]["timestamp"] / 1000.0 + ), ) + ) asyncio.run(_send_log_entry(log_entry, context)) # This makes it possible to chain this CW log consumer with others using a success destination diff --git a/template.yaml b/template.yaml index aee8bde..5535cad 100644 --- a/template.yaml +++ b/template.yaml @@ -43,6 +43,10 @@ Parameters: Type: String Description: IAM Role PermissionsBoundary (optional) Default: '' + DebugLoggingEnabled: + Type: String + Description: A boolean to determine if you want to output debug messages in the CloudWatch console + Default: "false" Conditions: NoRole: !Equals ['', !Ref FunctionRole] @@ -83,6 +87,7 @@ Resources: LICENSE_KEY: !Ref NRLicenseKey LOGGING_ENABLED: !Ref NRLoggingEnabled INFRA_ENABLED: !Ref NRInfraLogging + DEBUG_LOGGING_ENABLED: !Ref DebugLoggingEnabled NewRelicLogIngestionFunction: Type: AWS::Serverless::Function Condition: NoRole @@ -103,6 +108,7 @@ Resources: LICENSE_KEY: !Ref NRLicenseKey LOGGING_ENABLED: !Ref NRLoggingEnabled INFRA_ENABLED: !Ref NRInfraLogging + DEBUG_LOGGING_ENABLED: !Ref DebugLoggingEnabled LambdaInvokePermissionNoCap: Type: AWS::Lambda::Permission Condition: NoCap diff --git a/test/log_ingestion_test.py b/test/log_ingestion_test.py index 4615eef..28a55d7 100644 --- a/test/log_ingestion_test.py +++ b/test/log_ingestion_test.py @@ -12,7 +12,7 @@ from test.aws_log_events import AwsLogEvents import asyncio -from asynctest import CoroutineMock +from asynctest import CoroutineMock, MagicMock US_URL = "https://log-api.newrelic.com/log/v1" EU_URL = "https://log-api.eu.newrelic.com/log/v1" @@ -60,6 +60,22 @@ def mock_aio_post(): yield mocked_aio_post +class AsyncContextManagerMock(MagicMock): + async def __aenter__(self): + return self.aenter + + async def __aexit__(self, *args): + pass + + +@pytest.fixture +def mock_aio_session(): + with patch( + "aiohttp.ClientSession", new=AsyncContextManagerMock() + ) as mocked_aio_session: + yield mocked_aio_session + + @patch.dict( os.environ, {"INFRA_ENABLED": infra_enabled, "LOGGING_ENABLED": logging_enabled}, @@ -441,6 +457,69 @@ def test_when_first_two_calls_fail_code_should_retry(mock_aio_post): assert mock_aio_post.call_count == 3 +def test_session_duration_properly_calculated(): + # Mock function configuration + function.MAX_RETRIES = 3 + function.INITIAL_BACKOFF = 1 + function.BACKOFF_MULTIPLIER = 2 + function.INDIVIDUAL_REQUEST_TIMEOUT_DURATION = 3 + function.SESSION_MAX_PROCESSING_TIME = 1 + + """ + Diagram of performed calls: + - 
+    - Call 0: 3s
+    - Backoff 0: 1s (initial)
+    - Call 1: 3s
+    - Backoff 1: 1 * 2s (initial * multiplier)
+    - Call 2: 3s
+    - session_max_processing_time: 1s
+    TOTAL: 13s
+    """
+    expected_max_session_time = 13
+
+    assert function._calculate_session_timeout() == expected_max_session_time
+
+
+def test_when_session_times_out_exception_should_be_raised(mock_aio_session):
+    expected_message = "timeout_in_session"
+    mock_aio_session.side_effect = asyncio.TimeoutError(expected_message)
+    event = aws_log_events.create_aws_event(["Test Message 1"])
+
+    with pytest.raises(asyncio.TimeoutError) as excinfo:
+        function.lambda_handler(event, context)
+        pytest.fail("TimeoutError should have been raised by the ClientSession")
+
+    assert expected_message == str(excinfo.value)
+
+
+def test_when_exception_is_thrown_it_should_be_raised(mock_aio_session):
+    expected_message = "unexpected_exception_in_session"
+    mock_aio_session.side_effect = IOError(expected_message)
+    event = aws_log_events.create_aws_event(["Test Message 1"])
+
+    with pytest.raises(IOError) as excinfo:
+        function.lambda_handler(event, context)
+        pytest.fail(
+            "An unexpected exception should have been raised by the ClientSession"
+        )
+
+    assert expected_message == str(excinfo.value)
+
+
+def test_when_first_two_calls_timeout_code_should_retry(mock_aio_post):
+    # The first two calls time out, then the third succeeds
+    mock_aio_post.side_effect = [
+        aio_post_timeout(),
+        aio_post_timeout(),
+        aio_post_response(),
+    ]
+    event = aws_log_events.create_aws_event(["Test Message 1"])
+
+    function.lambda_handler(event, context)
+
+    assert mock_aio_post.call_count == 3
+
+
def test_logs_have_logstream_and_loggroup(mock_aio_post):
    mock_aio_post.return_value = aio_post_response()
    message = "Test Message 1"
@@ -506,10 +585,14 @@ def test_lambda_request_ids_are_extracted(mock_aio_post):
    assert messages[4]["attributes"]["aws"]["lambda_request_id"] == expected_request_id2


-async def aio_post_response():
+def aio_post_response():
    return MockHttpResponse("", 202)


+def aio_post_timeout():
+    return asyncio.TimeoutError()
+
+
def urlopen_error_response():
    return MockHttpResponse("", 429)
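
For reviewers who want to sanity-check the new worst-case session budget outside the Lambda, here is a minimal standalone sketch of the arithmetic that _calculate_session_timeout performs, assuming the constants shipped in this diff; the helper below is illustrative only and is not part of the change itself.

# Illustrative sketch only: mirrors the constants and loop added in src/function.py.
MAX_RETRIES = 3
INITIAL_BACKOFF = 1  # seconds before the first retry
BACKOFF_MULTIPLIER = 2
INDIVIDUAL_REQUEST_TIMEOUT_DURATION = 3  # seconds allowed per HTTP attempt
SESSION_MAX_PROCESSING_TIME = 1  # buffer for formatting payloads


def calculate_session_timeout():
    # Attempt 0 runs immediately, with no preceding backoff.
    total = INDIVIDUAL_REQUEST_TIMEOUT_DURATION
    # Each retry waits out a growing backoff before its own attempt.
    backoff = INITIAL_BACKOFF
    for _ in range(MAX_RETRIES - 1):
        total += backoff + INDIVIDUAL_REQUEST_TIMEOUT_DURATION
        backoff *= BACKOFF_MULTIPLIER
    # Reserve time for the payloads to be formatted before sending.
    return total + SESSION_MAX_PROCESSING_TIME


assert calculate_session_timeout() == 13  # 3 + (1 + 3) + (2 + 3) + 1

With three attempts of 3s each, backoffs of 1s and 2s between them, and a 1s formatting buffer, the session timeout comes out to 13s, matching expected_max_session_time in the new test.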