diff --git a/kingpin/actors/aws/s3.py b/kingpin/actors/aws/s3.py index 6739e2fb..69958b48 100644 --- a/kingpin/actors/aws/s3.py +++ b/kingpin/actors/aws/s3.py @@ -378,6 +378,47 @@ class LifecycleConfig(SchemaCompareBase): } +class NotificationConfiguration(SchemaCompareBase): + + """Provides JSON-Schema based validation of the supplied Notification Config. + + .. code-block:: json + + { + "queue_configurations": [ + { + "queue_arn": "ARN of the SQS queue", + "events": ["s3:ObjectCreated:*"], + } + ] + } + """ + + SCHEMA = { + 'type': ['object', 'null'], + 'required': ['queue_configurations'], + 'properties': { + 'queue_configurations': { + 'type': ['array'], + 'items': { + 'type': 'object', + 'additionalProperties': False, + 'required': ['queue_arn', 'events'], + 'properties': { + 'queue_arn': { + 'type': 'string' + }, + 'events': { + 'type': 'array', + 'items': {'type': 'string'} + } + } + } + } + } + } + + class TaggingConfig(SchemaCompareBase): """Provides JSON-Schema based validation of the supplied tagging config. @@ -423,6 +464,7 @@ class Bucket(base.EnsurableAWSBaseActor): * Enable or Suspend Bucket Versioning. Note: It is impossible to actually _disable_ bucket versioning -- once it is enabled, you can only suspend it, or re-enable it. + * Enable Event Notification. (limited to SQS for now) **Note about Buckets with Files** @@ -506,6 +548,19 @@ class Bucket(base.EnsurableAWSBaseActor): (bool, None): Whether or not to enable Versioning on the bucket. If "None", then we don't manage versioning either way. Default: None + :notification_configuration: + (:py:class:`NotificationConfiguration`, None) + + If a dictionary is supplised, then it must conform to + :py:class:`NotificationConfiguration`, type and include mapping + queuearn & events + + If an empty dictionary is supplied, then Kingpin will explicitly remove + any Notification Configuration from the bucket. + + Finally, If None is supplies, Kingoin will ignore the checks entire on + this portion of the bucket configuration + **Examples** .. code-block:: json @@ -534,6 +589,17 @@ class Bucket(base.EnsurableAWSBaseActor): {"key": "my_key", "value": "billing-grp-1"}, ], "versioning": true, + "notification_configuration": { + "queue_configurations": [ + { + "queue_arn": "arn:aws:sqs:us-east-1:1234567:some_sqs", + "events": [ + "s3:ObjectCreated:*", + "s3:ObjectRemoved*" + ] + } + ] + } } } @@ -570,6 +636,7 @@ class Bucket(base.EnsurableAWSBaseActor): 'versioning': ((bool, None), None, ('Desired state of versioning on the bucket: ' 'true/false')), + 'notification_configuration': (NotificationConfiguration, None, '') } unmanaged_options = ['name', 'region'] @@ -595,6 +662,14 @@ def __init__(self, *args, **kwargs): if self.access_block is not None: self.access_block = self._snake_to_camel(self.access_block) + # If the NotificationConfiguration is anything but None, we parse + # it and pre-build the rules. + self.notification_configuration = \ + self.option('notification_configuration') + if self.notification_configuration is not None: + self.notification_configuration = \ + self._snake_to_camel(self.notification_configuration) + # Start out assuming the bucket doesn't exist. The _precache() method # will populate this with True if the bucket does exist. self._bucket_exists = False @@ -1133,3 +1208,56 @@ def _set_tags(self): self.s3_conn.put_bucket_tagging, Bucket=self.option('name'), Tagging={'TagSet': tagset}) + + @gen.coroutine + def _get_notification_configuration(self): + if self.notification_configuration is None: + raise gen.Return(None) + + if not self._bucket_exists: + raise gen.Return(None) + + raw = yield self.api_call( + self.s3_conn.get_bucket_notification_configuration, + Bucket=self.option('name')) + + existing_configurations = {} + for configuration in ['TopicConfigurations', + 'QueueConfigurations', + 'LambdaFunctionConfigurations']: + if configuration in raw: + existing_configurations[configuration] = raw[configuration] + raise gen.Return(existing_configurations) + + @gen.coroutine + def _compare_notification_configuration(self): + new = self.notification_configuration + if new is None: + self.log.debug('No Notification Configuration') + raise gen.Return(True) + + exist = yield self._get_notification_configuration() + diff = utils.diff_dicts(exist, new) + + if not diff: + self.log.debug('Notification Configurations match') + raise gen.Return(True) + + self.log.info('Bucket Notification Configuration differs:') + for line in diff.split('\n'): + self.log.info('Diff: %s' % line) + + raise gen.Return(False) + + @gen.coroutine + @dry('Would have added notification configurations') + def _set_notification_configuration(self): + if self.notification_configuration is None: + self.log.debug('No Notification Configurations') + raise gen.Return(None) + + self.log.info('Updating Bucket Notification Configuration') + yield self.api_call( + self.s3_conn.put_bucket_notification_configuration, + Bucket=self.option('name'), + NotificationConfiguration=self.notification_configuration) diff --git a/kingpin/actors/aws/test/test_s3.py b/kingpin/actors/aws/test/test_s3.py index 9029c40b..dd23dd84 100644 --- a/kingpin/actors/aws/test/test_s3.py +++ b/kingpin/actors/aws/test/test_s3.py @@ -52,6 +52,9 @@ def setUp(self): }, 'versioning': False, 'tags': [], + 'notification_configuration': { + 'queue_configurations': [] + } }) self.actor.s3_conn = mock.MagicMock() @@ -643,3 +646,193 @@ def test_set_tags(self): Tagging={'TagSet': [ {'Key': 'tag1', 'Value': 'v1'} ]})]) + + @testing.gen_test + def test_snake_to_camelcase_for_notification_configuration(self): + notification_configuration_snake_case = { + "queue_configurations": [{ + "queue_arn": + "arn:aws:sqs:us-east-1:1234567:test_sqs", + "events": ["s3:ObjectCreated:*"] + }] + } + + notification_configuration_camel_case = \ + self.actor._snake_to_camel(notification_configuration_snake_case) + + self.assertEqual(notification_configuration_camel_case, + {"QueueConfigurations": [{ + "QueueArn": + "arn:aws:sqs:us-east-1:1234567:test_sqs", + "Events": ["s3:ObjectCreated:*"] + }]}) + + @testing.gen_test + def test_set_notification_configurations_none(self): + self.actor.notification_configuration = None + yield self.actor._set_notification_configuration() + self.assertFalse( + self.actor + .s3_conn + .put_bucket_notification_configuration.called + ) + + @testing.gen_test + def test_set_notification_configurations_with_valid_configs(self): + self.actor.notification_configuration = { + "QueueConfigurations": [{ + "QueueArn": "arn:aws:sqs:us-east-1:1234567:test_sqs", + "Events": ["s3:ObjectCreated:*"] + }] + } + yield self.actor._set_notification_configuration() + self.actor\ + .s3_conn\ + .put_bucket_notification_configuration\ + .assert_has_calls([ + mock.call( + Bucket='test', + NotificationConfiguration={ + "QueueConfigurations": [{ + "QueueArn": + "arn:aws:sqs:us-east-1:1234567:test_sqs", + "Events": ["s3:ObjectCreated:*"] + }] + } + ) + ]) + + @testing.gen_test + def test_set_notification_configurations_with_multiple_configs(self): + self.actor.notification_configuration = { + "QueueConfigurations": [ + { + "QueueArn": "arn:aws:sqs:us-east-1:1:test1_sqs", + "Events": ["s3:ObjectCreated:*"] + }, + { + "QueueArn": "arn:aws:sqs:us-east-1:1:test2_sqs", + "Events": ["s3:ObjectCreated:*", "s3:ObjectRemoved:*"] + } + ] + } + yield self.actor._set_notification_configuration() + self.actor \ + .s3_conn \ + .put_bucket_notification_configuration \ + .assert_has_calls([mock.call( + Bucket='test', + NotificationConfiguration={ + "QueueConfigurations": [ + { + "QueueArn": "arn:aws:sqs:us-east-1:1:test1_sqs", + "Events": ["s3:ObjectCreated:*"] + }, + { + "QueueArn": "arn:aws:sqs:us-east-1:1:test2_sqs", + "Events": ["s3:ObjectCreated:*", + "s3:ObjectRemoved:*"] + } + ] + } + )]) + + @testing.gen_test + def test_set_notification_configurations_no_configs(self): + self.actor.notification_configuration = { + "QueueConfigurations": [] + } + yield self.actor._set_notification_configuration() + self.actor.s3_conn\ + .put_bucket_notification_configuration\ + .assert_has_calls([]) + + @testing.gen_test + def test_set_notification_configurations_empty_queue_configs(self): + self.actor.notification_configuration = {} + yield self.actor._set_notification_configuration() + self.actor\ + .s3_conn\ + .put_bucket_notification_configuration\ + .assert_has_calls([]) + + @testing.gen_test + def test_get_notification_configuration_with_existing_queueconfigs(self): + self.actor._bucket_exists = True + self.actor.s3_conn.get_bucket_notification_configuration\ + .return_value = { + "QueueConfigurations": [{ + "QueueArn": "arn:aws:sqs", + "Events": ["s3:ObjectCreated:*"] + }] + } + ret = yield self.actor._get_notification_configuration() + self.assertEqual(type(ret), dict) + self.assertEqual(ret, + {'QueueConfigurations': [ + { + "QueueArn": "arn:aws:sqs", + "Events": ["s3:ObjectCreated:*"] + } + ]}) + + @testing.gen_test + def test_get_notification_configuration_no_bucket(self): + self.actor._bucket_exists = False + ret = yield self.actor._get_notification_configuration() + self.assertEqual(ret, None) + + @testing.gen_test + def test_get_notification_configuration_with_no_config(self): + self.actor._bucket_exists = True + self.actor.notification_configuration = None + ret = yield self.actor._get_notification_configuration() + self.assertEqual(ret, None) + + @testing.gen_test + def test_compare_notification_configuration_with_new_config(self): + self.actor.notification_configuration = { + "QueueConfigurations": [{ + "QueueArn": + "arn:aws:sqs:us-east-1:1234567:test_sqs", + "Events": ["s3:ObjectCreated:*"] + }] + } + self.actor\ + .s3_conn\ + .get_bucket_notification_configuration\ + .return_value = {} + ret = yield self.actor._compare_notification_configuration() + self.assertFalse(ret) + + @testing.gen_test + def test_compare_notification_configuration_with_no_updated_config(self): + self.actor._bucket_exists = True + self.actor.notification_configuration = { + "QueueConfigurations": [{ + "QueueArn": + "arn:aws:sqs:us-east-1:1234567:test_sqs", + "Events": ["s3:ObjectCreated:*"] + }] + } + self.actor\ + .s3_conn\ + .get_bucket_notification_configuration\ + .return_value = { + "QueueConfigurations": [{ + "QueueArn": "arn:aws:sqs:us-east-1:1234567:test_sqs", + "Events": ["s3:ObjectCreated:*"] + }] + } + ret = yield self.actor._compare_notification_configuration() + self.assertTrue(ret) + + @testing.gen_test + def test_compare_notification_configuration_with_no_config(self): + self.actor.notification_configuration = None + self.actor\ + .s3_conn\ + .get_bucket_notification_configuration\ + .return_value = {} + ret = yield self.actor._compare_notification_configuration() + self.assertTrue(ret)