forked from aws-cloudformation/cfn-lint
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathAttributeMismatch.py
100 lines (84 loc) · 3.79 KB
/
AttributeMismatch.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
"""
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
SPDX-License-Identifier: MIT-0
"""
import six
from cfnlint.decode.node import list_node
from cfnlint.rules import CloudFormationLintRule
from cfnlint.rules import RuleMatch
class AttributeMismatch(CloudFormationLintRule):
"""Check DynamoDB Attributes"""
id = 'E3039'
shortdesc = 'AttributeDefinitions / KeySchemas mismatch'
description = 'Verify the set of Attributes in AttributeDefinitions and KeySchemas match'
source_url = 'https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-dynamodb-table.html'
tags = ['resources', 'dynamodb']
def __init__(self):
"""Init"""
super(AttributeMismatch, self).__init__()
self.resource_property_types = ['AWS::DynamoDB::Table']
def _get_key_schema_attributes(self, key_schemas_sets):
""" Get Key Schema attributes """
keys = set()
for properties, _ in key_schemas_sets:
for key in properties:
attribute_name = key.get_safe('AttributeName', type_t=six.string_types)
if attribute_name:
keys.add(key.get('AttributeName'))
return keys
def _get_attribute_secondary(self, property_sets):
""" Get the key schemas from secondary indexes """
keys = set()
for properties, _ in property_sets:
for index in properties:
keys = keys.union(
self._get_key_schema_attributes(
index.get_safe('KeySchema', list_node([], None, None), [], list)
)
)
return keys
def check_property_set(self, property_set, path):
""" Check a property set """
matches = []
properties = property_set.get('Object')
keys = set()
attributes = set()
for attribute in properties.get('AttributeDefinitions', []):
attribute_name = attribute.get('AttributeName')
if isinstance(attribute_name, six.string_types):
attributes.add(attribute.get('AttributeName'))
else:
self.logger.info('attribute definitions is not using just strings')
return matches
keys = keys.union(
self._get_key_schema_attributes(
properties.get_safe('KeySchema', list_node([], None, None), [], list)
)
)
keys = keys.union(self._get_attribute_secondary(
properties.get_safe('GlobalSecondaryIndexes', list_node([], None, None), path, list
))) # pylint: disable=bad-continuation
keys = keys.union(self._get_attribute_secondary(
properties.get_safe('LocalSecondaryIndexes', list_node([], None, None), path, list
))) # pylint: disable=bad-continuation
if attributes != keys:
message = 'The set of Attributes in AttributeDefinitions: {0} and KeySchemas: {1} must match at {2}'
matches.append(RuleMatch(
path,
message.format(sorted(list(attributes)), sorted(
list(keys)), '/'.join(map(str, path)))
))
return matches
def check(self, properties, path, cfn):
"""Check itself"""
matches = []
property_sets = cfn.get_object_without_conditions(
properties, ['AttributeDefinitions', 'KeySchema', 'GlobalSecondaryIndexes', 'LocalSecondaryIndexes'])
for property_set in property_sets:
matches.extend(self.check_property_set(property_set, path))
return matches
def match_resource_properties(self, properties, _, path, cfn):
"""Match for sub properties"""
matches = []
matches.extend(self.check(properties, path, cfn))
return matches