forked from aws-cloudformation/cfn-lint
-
Notifications
You must be signed in to change notification settings - Fork 0
/
CacheClusterFailover.py
114 lines (103 loc) · 5.49 KB
/
CacheClusterFailover.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
"""
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
SPDX-License-Identifier: MIT-0
"""
from cfnlint.helpers import bool_compare
from cfnlint.rules import CloudFormationLintRule
from cfnlint.rules import RuleMatch
class CacheClusterFailover(CloudFormationLintRule):
    """Check automatic failover on a cache cluster.

    For every ``AWS::ElastiCache::ReplicationGroup`` whose
    ``CacheParameterGroupName`` is a ``Ref`` to a resource in the same
    template, the referenced resource's nested ``Properties`` map is
    inspected. When it sets ``cluster-enabled`` to ``'yes'`` the replication
    group is reported if:

    * ``AutomaticFailoverEnabled`` compares equal to ``False``; or
    * ``NumNodeGroups`` is unset/falsy and ``NumCacheClusters`` is a literal
      number that is not greater than one (a missing value counts as ``0``).
    """
    id = 'E3026'
    shortdesc = 'Check Elastic Cache Redis Cluster settings'
    description = 'Evaluate Redis Cluster groups to make sure automatic failover is ' \
                  'enabled when cluster mode is enabled'
    source_url = 'https://github.com/awslabs/cfn-python-lint'
    tags = ['resources', 'elasticcache']

    def __init__(self):
        """Register the rule for ElastiCache replication group properties."""
        super(CacheClusterFailover, self).__init__()
        self.resource_property_types.append('AWS::ElastiCache::ReplicationGroup')

    def is_cluster_enabled(self, properties):
        """Return True when ``properties`` is a dict with ``cluster-enabled`` == ``'yes'``."""
        if isinstance(properties, dict):
            return properties.get('cluster-enabled') == 'yes'
        return False

    @staticmethod
    def _to_number(value):
        """Return ``value`` as a number when it is a literal, otherwise ``None``.

        Template values are not always plain integers: they can be quoted
        numbers (strings) or intrinsic functions (dicts such as
        ``{'Ref': ...}``) whose value is not known while linting.
        """
        if isinstance(value, (int, float)):
            return value
        if isinstance(value, str):
            try:
                return int(value)
            except ValueError:
                return None
        return None

    @staticmethod
    def _build_match(pathmessage, requirement, scenario):
        """Build a ``RuleMatch`` for ``requirement`` located at ``pathmessage``.

        ``scenario`` is either ``None``/empty or a mapping of condition name to
        the value assumed for it; when present it is spelled out in the message.
        """
        location = '/'.join(map(str, pathmessage))
        if not scenario:
            message = '{0} at {1}'.format(requirement, location)
        else:
            # Each fragment already starts with "when", so the requirement
            # text must not add another one in front of it.
            scenario_text = ' and '.join(
                ['when condition "%s" is %s' % (k, v) for (k, v) in scenario.items()])
            message = '{0} {1} at {2}'.format(requirement, scenario_text, location)
        return RuleMatch(pathmessage, message)

    def _test_cluster_settings(self, properties, path, pg_properties, pg_path, cfn, scenario):
        """Test the replication group against one condition scenario.

        Args:
            properties: properties of the replication group.
            path: template path to ``properties``.
            pg_properties: nested ``Properties`` map of the referenced
                parameter group resource.
            pg_path: template path to the ``CacheParameterGroupName`` value.
            cfn: template object used to resolve conditions and scenarios.
            scenario: mapping of condition name to assumed value, or ``None``
                when no conditions are involved.

        Returns:
            A list of ``RuleMatch`` objects (possibly empty).
        """
        results = []
        pg_conditions = cfn.get_conditions_from_path(cfn.template, pg_path)
        # test to make sure that any condition that may apply to the path for the Ref
        # is not applicable
        if pg_conditions and scenario:
            for c_name, c_value in scenario.items():
                if c_name in pg_conditions and c_value not in pg_conditions.get(c_name):
                    return results

        if not self.is_cluster_enabled(cfn.get_value_from_scenario(pg_properties, scenario)):
            return results

        c_props = cfn.get_value_from_scenario(properties, scenario)

        automatic_failover = c_props.get('AutomaticFailoverEnabled')
        if bool_compare(automatic_failover, False):
            results.append(self._build_match(
                path[:] + ['AutomaticFailoverEnabled'],
                '"AutomaticFailoverEnabled" must be missing or True when setting up a cluster',
                scenario))

        num_node_groups = c_props.get('NumNodeGroups')
        if not num_node_groups:
            # only test cache nodes if num node groups aren't specified
            num_cache_nodes = self._to_number(c_props.get('NumCacheClusters', 0))
            # None means the value is not a literal number (for example an
            # intrinsic function), so it cannot be judged here; comparing it
            # with an int would raise TypeError on Python 3.
            if num_cache_nodes is not None and num_cache_nodes <= 1:
                results.append(self._build_match(
                    path[:] + ['NumCacheClusters'],
                    '"NumCacheClusters" must be greater than one when creating a cluster',
                    scenario))

        return results

    def test_cluster_settings(self, properties, path, pg_resource_name, pg_path, cfn):
        """Test cluster settings for the parameter group and Replication Group.

        Looks up the nested ``Properties`` map of the resource named
        ``pg_resource_name`` and runs the checks once per condition scenario
        reported by ``cfn``, or once with no scenario when there are none.

        Returns:
            A list of ``RuleMatch`` objects (possibly empty).
        """
        results = []
        pg_properties = cfn.template.get('Resources', {}).get(
            pg_resource_name, {}).get('Properties', {}).get('Properties', {})
        scenarios = cfn.get_conditions_scenarios_from_object([
            properties,
            pg_properties
        ])
        # With no scenarios the settings are still checked once, unconditionally.
        for scenario in scenarios or [None]:
            results.extend(
                self._test_cluster_settings(properties, path, pg_properties, pg_path, cfn, scenario))
        return results

    def match_resource_properties(self, properties, _, path, cfn):
        """Check CloudFormation Properties.

        Only ``CacheParameterGroupName`` values of the form ``{'Ref': name}``
        where ``name`` is a resource in the template are examined.

        Returns:
            A list of ``RuleMatch`` objects (possibly empty).
        """
        matches = []
        parameter_groups = properties.get_safe('CacheParameterGroupName', '', path)
        for parameter_group in parameter_groups:
            pg_value = parameter_group[0]
            pg_path = parameter_group[1]
            if not isinstance(pg_value, dict):
                continue
            pg_resource = pg_value.get('Ref')
            if pg_resource is not None and pg_resource in cfn.get_resources():
                matches.extend(
                    self.test_cluster_settings(
                        properties, path,
                        pg_resource, pg_path,
                        cfn
                    ))
        return matches