diff --git a/.cfignore b/.cfignore
index b135d7e..5ddf11d 100644
--- a/.cfignore
+++ b/.cfignore
@@ -6,6 +6,5 @@
 
 # App requirements
 !app.py
-!pipe-watchdog.py
 !config.yaml
 
diff --git a/.flake8 b/.flake8
new file mode 100644
index 0000000..c82aea1
--- /dev/null
+++ b/.flake8
@@ -0,0 +1,3 @@
+[flake8]
+exclude = venv*,__pycache__,.git
+max-line-length = 120
diff --git a/.travis.yml b/.travis.yml
new file mode 100644
index 0000000..eccdad7
--- /dev/null
+++ b/.travis.yml
@@ -0,0 +1,11 @@
+language: python
+sudo: false
+python:
+  - "3.6"
+dist: "trusty"
+install:
+  - pip install -r requirements-dev.txt
+script:
+  - make test-requirements test-flake8
+notifications:
+  email: false
diff --git a/Makefile b/Makefile
new file mode 100644
index 0000000..ca47475
--- /dev/null
+++ b/Makefile
@@ -0,0 +1,28 @@
+SHELL := /bin/bash
+VIRTUALENV_ROOT := $(shell [ -z $$VIRTUAL_ENV ] && echo $$(pwd)/venv || echo $$VIRTUAL_ENV)
+
+.PHONY: virtualenv
+virtualenv:
+	[ -z $$VIRTUAL_ENV ] && [ ! -d venv ] && python3 -m venv venv || true
+
+.PHONY: test-flake8
+test-flake8: virtualenv
+	${VIRTUALENV_ROOT}/bin/flake8 .
+
+.PHONY: freeze-requirements
+freeze-requirements:
+	rm -rf venv-freeze
+	python3 -m venv venv-freeze
+	$$(pwd)/venv-freeze/bin/pip install -r requirements-app.txt
+	echo '# This file is autogenerated. Do not edit it manually.' > requirements.txt
+	cat requirements-app.txt >> requirements.txt
+	echo '' >> requirements.txt
+	$$(pwd)/venv-freeze/bin/pip freeze -r <(sed '/^--/d' requirements-app.txt) | sed -n '/The following requirements were added by pip freeze/,$$p' >> requirements.txt
+	rm -rf venv-freeze
+
+.PHONY: test-requirements
+test-requirements:
+	@diff requirements-app.txt requirements.txt | grep '<' \
+	&& { echo "requirements.txt doesn't match requirements-app.txt."; \
+	echo "Run 'make freeze-requirements' to update."; exit 1; } \
+	|| { echo "requirements.txt is up to date"; exit 0; }
diff --git a/README.md b/README.md
index 6233de0..13c8a4c 100644
--- a/README.md
+++ b/README.md
@@ -3,9 +3,10 @@ Digital Marketplace Cloudwatch to Graphite
 
 ## Purpose
 
-Ships Cloudwatch metrics to Hosted Graphite using https://github.com/crccheck/cloudwatch-to-graphite.
+Ships Cloudwatch metrics to Hosted Graphite.
+Heavily based on the work of https://github.com/crccheck/cloudwatch-to-graphite and the `leadbutt` script. Particularly config parsing to define metrics.
 
-An example config file is also included (`config.yaml.example`) to help you define the correct structure for your metrics.
+An example config file is included (`config.yaml.example`) to help you define the correct structure for your metrics.
 
 
 ## Initial setup
diff --git a/app.py b/app.py
index ccbe74d..71addaf 100644
--- a/app.py
+++ b/app.py
@@ -1,10 +1,9 @@
 #!/usr/bin/env python
-import os
-import signal
-import subprocess
 import logging
-from datetime import datetime, timedelta
+import os
+from datetime import datetime, timedelta
 
+import boto3
 import requests
 import yaml
 from retrying import retry
@@ -13,58 +12,113 @@
 logger = logging.getLogger("app")
 
 
-def get_timestamp():
-    return (
-        (datetime.now() - timedelta(days=5)) - datetime(1970, 1, 1)
-    ).total_seconds()
+def get_config():
+    """Get the config file."""
+    with open('config.yaml') as fp:
+        config = yaml.safe_load(fp)
+    return config
 
 
 def send_to_hostedgraphite(metrics):
+    """Put request to hostedgraphite."""
     response = requests.put(
         "http://www.hostedgraphite.com/api/v1/sink",
         auth=(os.getenv("HOSTED_GRAPHITE_API_KEY"), ""),
         data=metrics
     )
-    if response.status_code >= 400:
-        logger.warn(f"Error sending metrics to hosted graphite - Status code {response.status_code}")
+    if not response.ok:
+        logger.warning(
+            "Error sending metrics to hosted graphite - {}: {}".format(response.status_code, response.reason)
+        )
     else:
-        logger.info(f"Metrics sent to hosted graphite - Status code {response.status_code}")
+        logger.info("Metrics sent to hosted graphite - Status code {}".format(response.status_code))
 
 
-def initialize_metrics():
-    initialized_metrics = []
-    timestamp = int(get_timestamp())
-    with open('config.yaml') as fp:
-        config = yaml.load(fp)
-    for metric in config['Metrics']:
-        stats = metric['Statistics'] if isinstance(metric['Statistics'], list) else [metric['Statistics']]
-        for stat in stats:
-            metric_name = (metric['Options']['Formatter']
-                           .replace("%(statistic)s", stat.lower()))
-            initialized_metrics.append("{} 0.0 {}".format(metric_name, timestamp))
-    send_to_hostedgraphite("\n".join(initialized_metrics))
+def format_config_metric_entry_for_hostedgraphite_base_metric(config_metric_entry, timestamp):
+    hostedgraphite_metric_name = config_metric_entry['Options']['Formatter'].replace(
+        "%(statistic)s",
+        config_metric_entry['Statistics'].lower()
+    )
+    hostedgraphite_base_metric = "{} 0.0 {}".format(hostedgraphite_metric_name, timestamp)
+    return hostedgraphite_base_metric
 
 
-@retry(wait_fixed=60000, retry_on_result=lambda res: res is None)
-def call_leadbutt():
+def create_hostedgraphite_base_metrics(config):
+    """Take each metric entry from the config and format it into a base metric to initialise on hostedgraphite.
     """
-    See https://github.com/crccheck/cloudwatch-to-graphite for more info on leadbutt
+    base_metrics = []
+    timestamp = int(((datetime.now() - timedelta(days=5)) - datetime(1970, 1, 1)).total_seconds())
+    for config_metric_entry in config['Metrics']:
+        hostedgraphite_base_metric = format_config_metric_entry_for_hostedgraphite_base_metric(
+            config_metric_entry,
+            timestamp
+        )
+        base_metrics.append(hostedgraphite_base_metric)
+    return base_metrics
+
+
+def format_cloudwatch_metric_datapoint_for_hostedgraphite(metric, result):
+    """Given a cloudwatch metric datapoint convert it into the format hostedgraphite expects."""
+    hostedgraphite_metric_name = (
+        metric['Options']['Formatter'] % {'statistic': metric['Statistics']}
+    ).lower()
+    return '{0} {1} {2}'.format(
+        hostedgraphite_metric_name,
+        result[metric['Statistics']],
+        result['Timestamp'].strftime('%s')
+    )
+
+
+def get_metric_from_cloudwatch(client, metric):
+    """Call the client once for the supplied metric."""
+    end_time = datetime.utcnow()
+    start_time = end_time - timedelta(seconds=600)
+    return client.get_metric_statistics(
+        Period=60,
+        StartTime=start_time,
+        EndTime=end_time,
+        MetricName=metric['MetricName'],
+        Namespace=metric['Namespace'],
+        Statistics=[metric['Statistics']],
+        Dimensions=[{'Name': k, 'Value': v} for k, v in metric['Dimensions'].items()],
+        Unit=metric.get('Unit', 'None'),
+    )
+
+
+def get_metrics_from_cloudwatch_and_format_for_hostedgraphite(config):
+    """Get metrics from config, call cloudwatch for metric entry and format metric datapoints to expected hostedgraphite
+    format.
+
+    For each metric in the config call the cloudwatch api to return the datapoints for that metric.
+    For each cloudwatch datapoint create one hostedgraphite metric entry for supply to hostedgraphite.
     """
-    result = subprocess.Popen("leadbutt", stdout=subprocess.PIPE)
-    metrics = result.communicate()[0].decode("utf-8")
-    send_to_hostedgraphite(metrics)
+    hostedgraphite_metrics = []
+
+    client = boto3.client('cloudwatch', region_name="eu-west-1")
+    for metric_entry in config['Metrics']:
+        cloudwatch_metric = get_metric_from_cloudwatch(client, metric_entry)
+        for cloudwatch_metric_datapoint in cloudwatch_metric['Datapoints']:
+            hostedgraphite_metric = format_cloudwatch_metric_datapoint_for_hostedgraphite(
+                metric_entry,
+                cloudwatch_metric_datapoint
+            )
+            hostedgraphite_metrics.append(hostedgraphite_metric)
+
+    return hostedgraphite_metrics
 
 
 if __name__ == '__main__':
-    logging.basicConfig(
-        level=logging.DEBUG,
-        format="%(asctime)s:%(name)s:%(levelname)s:%(message)s",
-    )
+    logging.basicConfig(level=logging.INFO, format="%(asctime)s:%(name)s:%(levelname)s:%(message)s")
+    config = get_config()
+
+    base_metrics = create_hostedgraphite_base_metrics(config)
+    send_to_hostedgraphite("\n".join(base_metrics))
 
-    # python by default installs a signal handler that largely ignores SIGPIPE, but we want to use it for our watchdog
-    # mechanism, so reinstate the *unix* default, which is a fatal handler
-    signal.signal(signal.SIGPIPE, signal.SIG_DFL)
+    @retry(wait_fixed=60000, retry_on_result=lambda res: res is None)
+    def sleep_and_send_retry():
+        """Wrapper to apply retry to get and send methods."""
+        hostedgraphite_metrics = get_metrics_from_cloudwatch_and_format_for_hostedgraphite(config)
+        send_to_hostedgraphite('\n'.join(hostedgraphite_metrics))
 
-    initialize_metrics()
-    call_leadbutt()
+    sleep_and_send_retry()
diff --git a/config.yaml.j2 b/config.yaml.j2
index b4df554..0e31961 100644
--- a/config.yaml.j2
+++ b/config.yaml.j2
@@ -1,8 +1,6 @@
 {% set apps = ["antivirus-api", "api", "admin-frontend", "briefs-frontend", "brief-responses-frontend", "buyer-frontend", "router", "search-api", "supplier-frontend", "user-frontend"] -%}
 {% set environments = ["preview", "staging", "production"] -%}
 {% set log_types = ["application", "nginx"] -%}
-Auth:
-  region: "eu-west-1"
 Metrics:
 {% for environment in environments -%}
 {% for app in apps -%}
@@ -10,7 +8,7 @@ Metrics:
     - Namespace: "DM-RequestTimeBuckets"
       MetricName: "{{ environment }}-{{ app }}-request-times-{{ bucket }}"
       Statistics: "Sum"
-      Dimensions: []
+      Dimensions: {}
       Options:
         Formatter: 'cloudwatch.request_time_buckets.{{ environment }}.{{ app }}.request_time_bucket_{{ bucket }}.%(statistic)s'
 {% endfor -%}
@@ -21,13 +19,13 @@ Metrics:
     - Namespace: "DM-500s"
       MetricName: "{{ environment }}{% if app != "nginx" %}-{{ app }}{% endif %}-nginx-500s"
       Statistics: "Sum"
-      Dimensions: []
+      Dimensions: {}
      Options:
        Formatter: 'cloudwatch.application_500s.{{ environment }}.{{ app }}.500s.%(statistic)s'
    - Namespace: "DM-APIClient-Retries"
      MetricName: "{{ environment }}-{{ app }}-apiclient-retries"
      Statistics: "Sum"
-      Dimensions: []
+      Dimensions: {}
      Options:
        Formatter: "cloudwatch.apiclient_retries.{{ environment }}.{{ app }}.retries.%(statistic)s"
 {% endfor -%}
@@ -36,7 +34,7 @@ Metrics:
    - Namespace: "DM-429s"
      MetricName: "{{ environment }}-router-nginx-429s"
      Statistics: "Sum"
-      Dimensions: []
+      Dimensions: {}
      Options:
        Formatter: "cloudwatch.router_429s.{{ environment }}.router.429s.%(statistic)s"
 {% endfor -%}
@@ -53,5 +51,3 @@ Metrics:
 {% endfor -%}
 {% endfor -%}
 {% endfor -%}
-Options:
-  Count: 10
diff --git a/manifest.yml.example b/manifest.yml.example
index 9962c78..5469dd7 100644
--- a/manifest.yml.example
+++ b/manifest.yml.example
@@ -4,7 +4,7 @@ applications:
   buildpack: python_buildpack
   no-route: true
   health-check-type: none
-  command: python app.py 2>&1 | python pipe-watchdog.py
+  command: python app.py
   instances: 1
   memory: 128M
   env:
diff --git a/pipe-watchdog.py b/pipe-watchdog.py
deleted file mode 100644
index 50a218a..0000000
--- a/pipe-watchdog.py
+++ /dev/null
@@ -1,41 +0,0 @@
-#!/usr/bin/env python
-import logging
-import os
-import select
-import sys
-
-
-TIMEOUT_SECONDS = 60 * 4
-BUFFER_SIZE_BYTES = 1<<12
-
-
-logger = logging.getLogger("pipe-watchdog")
-
-
-if __name__ == "__main__":
-    logging.basicConfig(
-        level=logging.WARNING,
-        format="%(asctime)s:%(name)s:%(levelname)s:%(message)s",
-    )
-
-    while True:
-        # this call to `select` will block (for up to TIMEOUT_SECONDS) until the file descriptor(s) specified have data
-        # ready to be acted upon - here we're only giving it file descriptor 0 (stdin) to monitor for readability
-        ready_for_read, _, _ = select.select((0,), (), (), TIMEOUT_SECONDS)
-
-        if not ready_for_read:
-            # select must have timed out
-            logger.error(f"Did not receive input for {TIMEOUT_SECONDS}s. Aborting.")
-            # an arbitrary exit code, but at least collisions should be unlikely making it unambiguous
-            sys.exit(131)
-            # previous processes in pipeline should receive SIGPIPE, hopefully aborting too.
-
-        # now that we know this shouldn't block we'll try and read as many bytes as we immediately can (up to
-        # BUFFER_SIZE_BYTES) from file descriptor 0 (stdin)
-        buf = os.read(0, BUFFER_SIZE_BYTES)
-        if not buf:
-            logger.info(f"Reached EOF. Exiting normally.")
-            sys.exit(0)
-
-        # forward our buffer of bytes back out to file descriptor 1 (stdout)
-        os.write(1, buf)
diff --git a/requirements-app.txt b/requirements-app.txt
new file mode 100644
index 0000000..3e55814
--- /dev/null
+++ b/requirements-app.txt
@@ -0,0 +1,4 @@
+retrying==1.3.3
+requests==2.17.3
+PyYAML==3.13
+boto3==1.9.6
diff --git a/requirements-dev.txt b/requirements-dev.txt
index 2659cc5..fd518ec 100644
--- a/requirements-dev.txt
+++ b/requirements-dev.txt
@@ -1,3 +1,4 @@
 -r requirements.txt
 
 Jinja2==2.10
+flake8==3.5.0
diff --git a/requirements.txt b/requirements.txt
index d0ac20f..3613971 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,3 +1,17 @@
+# This file is autogenerated. Do not edit it manually.
 retrying==1.3.3
-cloudwatch-to-graphite==0.9.2
-requests==2.17.3
\ No newline at end of file
+requests==2.17.3
+PyYAML==3.13
+boto3==1.9.6
+
+## The following requirements were added by pip freeze:
+botocore==1.12.7
+certifi==2018.8.24
+chardet==3.0.4
+docutils==0.14
+idna==2.5
+jmespath==0.9.3
+python-dateutil==2.7.3
+s3transfer==0.1.13
+six==1.11.0
+urllib3==1.21.1