forked from aws-cloudformation/cfn-lint
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Elb.py
179 lines (158 loc) · 8.52 KB
/
Elb.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
"""
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
SPDX-License-Identifier: MIT-0
"""
import six
from cfnlint.rules import CloudFormationLintRule
from cfnlint.rules import RuleMatch
class Elb(CloudFormationLintRule):
"""Check if Elb Resource Properties"""
id = 'E2503'
shortdesc = 'Resource ELB Properties'
description = 'See if Elb Resource Properties are set correctly \
HTTPS has certificate HTTP has no certificate'
source_url = 'https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-ec2-elb-listener.html'
tags = ['properties', 'elb']
def __init__(self):
""" Init """
super(Elb, self).__init__()
self.resource_property_types = ['AWS::ElasticLoadBalancingV2::LoadBalancer']
def check_protocol_value(self, value, path, **kwargs):
"""
Check Protocol Value
"""
matches = []
if isinstance(value, six.string_types):
if value.upper() not in kwargs['accepted_protocols']:
message = 'Protocol must be {0} is invalid at {1}'
matches.append(RuleMatch(path, message.format(
(', '.join(kwargs['accepted_protocols'])), ('/'.join(map(str, path))))))
elif value.upper() in kwargs['certificate_protocols']:
if not kwargs['certificates']:
message = 'Certificates should be specified when using HTTPS for {0}'
matches.append(RuleMatch(path, message.format(('/'.join(map(str, path))))))
return matches
def get_loadbalancer_type(self, properties):
""" Check if type is application """
elb_type = properties.get('Type', 'application')
if isinstance(elb_type, six.string_types):
if elb_type == 'application':
return 'application'
return 'network'
return None
def check_alb_subnets(self, properties, path, scenario):
""" Validate at least two subnets with ALBs"""
matches = []
if self.get_loadbalancer_type(properties) == 'application':
subnets = properties.get('Subnets')
if isinstance(subnets, list):
if len(subnets) < 2:
if scenario:
message = 'You must specify at least two Subnets for load balancers with type "application" {0}'
scenario_text = ' and '.join(
['when condition "%s" is %s' % (k, v) for (k, v) in scenario.items()])
matches.append(RuleMatch(path, message.format(scenario_text)))
else:
matches.append(RuleMatch(
path[:] + ['Subnets'], 'You must specify at least two Subnets for load balancers with type "application"'))
subnet_mappings = properties.get('SubnetMappings')
if isinstance(subnet_mappings, list):
if len(subnet_mappings) < 2:
if scenario:
message = 'You must specify at least two SubnetMappings for load balancers with type "application" {0}'
scenario_text = ' and '.join(
['when condition "%s" is %s' % (k, v) for (k, v) in scenario.items()])
matches.append(RuleMatch(path, message.format(scenario_text)))
else:
matches.append(RuleMatch(
path[:] + ['SubnetMappings'], 'You must specify at least two SubnetMappings for load balancers with type "application"'))
return matches
def check_loadbalancer_allowed_attributes(self, properties, path, scenario):
""" Validate loadbalancer attributes per loadbalancer type"""
matches = []
allowed_attributes = {
'all': [
'access_logs.s3.enabled',
'access_logs.s3.bucket',
'access_logs.s3.prefix',
'deletion_protection.enabled'
],
'application': [
'idle_timeout.timeout_seconds',
'routing.http.desync_mitigation_mode',
'routing.http.drop_invalid_header_fields.enabled',
'routing.http.x_amzn_tls_version_and_cipher_suite.enabled',
'routing.http.xff_client_port.enabled',
'routing.http2.enabled',
'waf.fail_open.enabled'
],
'network': [
'load_balancing.cross_zone.enabled'
]
}
loadbalancer_attributes = properties.get('LoadBalancerAttributes')
if isinstance(loadbalancer_attributes, list):
for item in loadbalancer_attributes:
key = item.get('Key')
value = item.get('Value')
if isinstance(key, six.string_types) and isinstance(value, (six.string_types, bool, six.integer_types)):
loadbalancer = self.get_loadbalancer_type(properties)
if loadbalancer:
if key not in allowed_attributes['all'] and key not in allowed_attributes[loadbalancer]:
if scenario:
scenario_text = ' and '.join(
['when condition "%s" is %s' % (k, v) for (k, v) in scenario.items()])
message = 'Attribute "{0}" not allowed for load balancers with type "{1}" {2}'
matches.append(RuleMatch(path, message.format(
key, loadbalancer, scenario_text)))
else:
message = 'Attribute "{0}" not allowed for load balancers with type "{1}"'
matches.append(RuleMatch(path, message.format(key, loadbalancer)))
return matches
def match(self, cfn):
"""Check ELB Resource Parameters"""
matches = []
results = cfn.get_resource_properties(['AWS::ElasticLoadBalancingV2::Listener'])
for result in results:
matches.extend(
cfn.check_value(
result['Value'], 'Protocol', result['Path'],
check_value=self.check_protocol_value,
accepted_protocols=['GENEVE', 'HTTP', 'HTTPS', 'TCP', 'TCP_UDP', 'TLS', 'UDP'],
certificate_protocols=['HTTPS', 'TLS'],
certificates=result['Value'].get('Certificates')))
results = cfn.get_resource_properties(
['AWS::ElasticLoadBalancing::LoadBalancer', 'Listeners'])
for result in results:
if isinstance(result['Value'], list):
for index, listener in enumerate(result['Value']):
matches.extend(
cfn.check_value(
listener, 'Protocol', result['Path'] + [index],
check_value=self.check_protocol_value,
accepted_protocols=['HTTP', 'HTTPS', 'TCP', 'SSL'],
certificate_protocols=['HTTPS', 'SSL'],
certificates=listener.get('SSLCertificateId')))
return matches
def match_resource_properties(self, resource_properties, _, path, cfn):
""" Check Load Balancers """
matches = []
# Play out conditions to determine the relationship between property values
scenarios = cfn.get_object_without_nested_conditions(resource_properties, path)
for scenario in scenarios:
properties = scenario.get('Object')
if self.get_loadbalancer_type(properties) == 'network':
if properties.get('SecurityGroups'):
if scenario.get('Scenario'):
scenario_text = ' and '.join(['when condition "%s" is %s' % (
k, v) for (k, v) in scenario.get('Scenario').items()])
message = 'Security groups are not supported for load balancers with type "network" {0}'
matches.append(RuleMatch(path, message.format(scenario_text)))
else:
path = path + ['SecurityGroups']
matches.append(
RuleMatch(path, 'Security groups are not supported for load balancers with type "network"'))
matches.extend(self.check_alb_subnets(properties, path, scenario.get('Scenario')))
matches.extend(self.check_loadbalancer_allowed_attributes(
properties, path, scenario.get('Scenario')))
return matches