Skip to content

Commit

Permalink
feat: add --maximum-image-age option (#50)
Browse files Browse the repository at this point in the history
  • Loading branch information
Michael-Sinz authored Jan 27, 2021
1 parent 3221f4a commit 30b78ba
Showing 1 changed file with 181 additions and 105 deletions.
286 changes: 181 additions & 105 deletions vmss-prototype/vmss-prototype
Original file line number Diff line number Diff line change
Expand Up @@ -293,17 +293,6 @@ def get_sig_image_def(vmss_name):
return 'kamino-{0}-prototype'.format(vmss_name)


def convert_os_patch_datetime(patch_time):
"""
Convert a rebooter OS patched applied time string
to a datetime object
We depend on the format to be parsed as '%Y-%m-%dT%H:%M:%S+00:00'
:param patch_time: The rebooter generated time string
:return: Datetime object (or exception if failed to parse)
"""
return datetime.datetime.strptime(patch_time, '%Y-%m-%dT%H:%M:%S+00:00')


def lookup_one_value(value_dict, value_arg):
"""
Look up a value from a nested dictionary given the dotted string as the key
Expand Down Expand Up @@ -522,6 +511,31 @@ def vmss_build_sig_image(subscription, resource_group, sig_name, image_definitio
return output


def mktime_from_kubernetes(kube_time_string):
"""
Convert a kubernetes RFC time string into python time value
:param kube_time_string: The time string from a kubernetes element
:return: A Python time value (floating point value)
"""
return time.mktime(time.strptime(kube_time_string, '%Y-%m-%dT%H:%M:%SZ'))


def format_into_version(date_time):
"""
Convert a datetime into a version string
Note that the format of the string is yyyy.mm.dd
and is required to be in that kind of format for
the SIG to work correctly (defined by Azure)
It does not need to be year.month.day but it must
be a digits.digits.digits specifically and ordered
via the version ordering constraint.
:param date_time: The datetime object to convert
:return: The version string as we use within the SIG
"""
# Format for version numbers from timestamps
return date_time.strftime('%Y.%m.%d')


def vmss_prototype_update(sub_args):
"""
Update (create if not existing) the prototype based images for the
Expand Down Expand Up @@ -560,9 +574,6 @@ def vmss_prototype_update(sub_args):
# Pod types we will skip during drain/delete
pod_types_to_skip = ['DaemonSet', 'Node']

# Format for version numbers from timestamps
version_format = '%Y.%m.%d'

# The time format we use in snapshot tags
snapshot_time_format = '%Y-%m-%d %H:%M:%S.%f'

Expand All @@ -575,7 +586,7 @@ def vmss_prototype_update(sub_args):
snapshot_name = 'snapshot_{0}'.format(vmss)

# Create our version number from the managed image
version = datetime.datetime.now().strftime(version_format)
version = format_into_version(datetime.datetime.now())

# The VMSS specific image definition name
image_definition = get_sig_image_def(vmss)
Expand Down Expand Up @@ -640,7 +651,7 @@ def vmss_prototype_update(sub_args):
# We have a snapshot - we need to now check if it is too old
snapshot = json.loads(output)
snapshot_date = lookup_one_value(snapshot, 'tags.BuiltAt')
snapshot_version = datetime.datetime.strptime(snapshot_date, snapshot_time_format).strftime(version_format)
snapshot_version = format_into_version(datetime.datetime.strptime(snapshot_date, snapshot_time_format))

if snapshot_version != version:
# Clean up the snapshot as it is too old - we will need to
Expand Down Expand Up @@ -877,6 +888,97 @@ def vmss_prototype_update(sub_args):
logging.warning('Unable to determine capacity for VMSS {0}, will not add back 1 instance'.format(vmss))


def get_candidate_nodes(vmss, nodes, node_ignore_names, node_ignore_annotations, minimum_ready_time, pending_reboot_annotation, last_patch_annotation, latest_image):
"""
Returns an ordered list of candidates, with the longest healthy first.
Candidates are nodes names for the "prototype" node.
This may be an empty list which would mean that there are no valid candidates.
:param vmss: The name of the VMSS we are operating on
:param nodes: The Nodes items from kubernetes
:param node_ignore_names: The list of node names to ignore
:param node_ignore_annotations: The list of node annotations that, if existing, will cause the node to be ignored
:param minimum_ready_time: Minimum ready time before a node is considered
:param pending_reboot_annotation: The annotation on a node that signals it is pending a reboot
:param latest_patch_annotation: The annotation on a node that holds the date/time of the last OS patch update
:latest_image: The version string of the current latest image (or None, if OS patch should not be considered)
:return: An list of those nodes that passed the qualifying criteria
"""
candidates = []
for node in nodes:
metadata = node.get('metadata', {})
# If not a node in target VMSS, continue...
node_name = metadata.get('name', 'unknown')
if vmss not in node_name:
logging.debug('IGNORED: Node {0} not part of vmss {1}'.format(node_name, vmss))
continue

if node_name == os.getenv('NODE_ID', None):
logging.debug('IGNORED: Node {0} is the same as the node we are running on'.format(node_name))
continue

if node_name in node_ignore_names:
logging.debug('IGNORED: Node {0} is in the set of specific notes to ignore'.format(node_name))
continue

if node.get('spec', {}).get('unschedulable', False):
logging.debug('IGNORED: Node {0} as it is marked unschedulable'.format(node_name))
continue

# Filtering out masters normally is not needed unless they are, for
# some reason, within a VMSS. We still don't want to make fresh images
# here as masters are a bit special.
if metadata.get('labels', {}).get('kubernetes.io/role', '') == 'master':
logging.debug('IGNORED: Node {0} is a control plane node'.format(node_name))
continue

annotations = metadata.get('annotations', {})

ignore = [annotation for annotation in node_ignore_annotations if annotation in annotations]
if ignore:
logging.debug('IGNORED: Node {0} is annotated as needing to be ignored: {1}'.format(node_name, ignore))
continue

if pending_reboot_annotation in annotations:
logging.debug('IGNORED: Node {0} is annotated as pending reboot: {1}'.format(node_name, pending_reboot_annotation))
continue

if latest_image:
last_patch = annotations.get(last_patch_annotation, None)
if not last_patch:
logging.debug('IGNORED: Node {0} does not have a last patch annotation: {1}'.format(node_name, last_patch_annotation))
continue

last_patch_date_match = re.search(r'\d\d\d\d-\d\d-\d\d', last_patch)
if not last_patch_date_match:
logging.debug('IGNORED: Node {0} last patch annotation does not have a valid format: {1}={2}'.format(node_name, last_patch_annotation, last_patch))
continue

patch_version = last_patch_date_match.group(0).replace('-', '.')
if patch_version <= latest_image:
logging.debug('IGNORED: Node {0} latest patch of {1} not newer that latest image version {2}'.format(node_name, patch_version, latest_image))
continue

ready_time = None
for condition in node.get('status', {}).get('conditions', []):
if condition.get('type', '') == 'Ready':
if condition.get('status', '') == 'True':
ready_time = int(mktime_from_kubernetes(condition['lastHeartbeatTime']) - mktime_from_kubernetes(condition['lastTransitionTime']))
break
if not ready_time:
logging.debug('IGNORED: Node {0} is not ready')
continue

if ready_time < minimum_ready_time:
logging.debug('IGNORED: Node {0} has been ready for only {1}s and the minimum is {2}s'.format(node_name, ready_time, minimum_ready_time))
continue

logging.debug('CANDIDATE: Node {0} ready for {1}s'.format(node_name, ready_time))
candidates.append({'node': node_name, 'ready': ready_time})

candidates.sort(key=lambda data: data['ready'], reverse=True)
return [candidate['node'] for candidate in candidates]


def vmss_prototype_auto_update(sub_args):
"""
Auto update the VMSS (or set of VMSS) in the cluster if needed and ready
Expand Down Expand Up @@ -921,80 +1023,37 @@ def vmss_prototype_auto_update(sub_args):
latest_image = version['name']
logging.debug('Latest image for VMSS {0} is {1}'.format(vmss, latest_image))

potential_nodes = []
for node in nodes:
metadata = node.get('metadata', {})
# If not a node in target VMSS, continue...
node_name = metadata.get('name', 'unknown')
if vmss not in node_name:
logging.debug('IGNORED: Node {0} not part of vmss {1}'.format(node_name, vmss))
continue

if node_name == os.getenv('NODE_ID', None):
logging.debug('IGNORED: Node {0} is the same as the node we are running on'.format(node_name))
continue

if node_name in node_ignore_names:
logging.debug('IGNORED: Node {0} is in the set of specific notes to ignore'.format(node_name))
continue

if node.get('spec', {}).get('unschedulable', False):
logging.debug('IGNORED: Node {0} as it is marked unschedulable'.format(node_name))
continue

annotations = metadata.get('annotations', {})

ignore = [annotation for annotation in node_ignore_annotations if annotation in annotations]
if ignore:
logging.debug('IGNORED: Node {0} is annotated as needing to be ignored: {1}'.format(node_name, ignore))
continue

if sub_args.pending_reboot_annotation in annotations:
logging.debug('IGNORED: Node {0} is annotated as pending reboot: {1}'.format(node_name, sub_args.pending_reboot_annotation))
continue

last_patch = annotations.get(sub_args.last_patch_annotation, None)
if not last_patch:
logging.debug('IGNORED: Node {0} does not have a last patch annotation: {1}'.format(node_name, sub_args.last_patch_annotation))
continue

last_patch_date_match = re.search(r'\d\d\d\d-\d\d-\d\d', last_patch)
if not last_patch_date_match:
logging.debug('IGNORED: Node {0} last patch annotation does not have a valid format: {1}={2}'.format(node_name, sub_args.last_patch_annotation, last_patch))
continue

patch_version = last_patch_date_match.group(0).replace('-', '.')
if patch_version <= latest_image:
logging.debug('IGNORED: Node {0} latest patch of {1} not newer that latest image version {2}'.format(node_name, patch_version, latest_image))
continue

ready_time = None
for condition in node.get('status', {}).get('conditions', []):
if condition.get('type', '') == 'Ready':
if condition.get('status', '') == 'True':
ready_time = int(time.mktime(time.strptime(condition['lastHeartbeatTime'], '%Y-%m-%dT%H:%M:%SZ')) - time.mktime(time.strptime(condition['lastTransitionTime'], '%Y-%m-%dT%H:%M:%SZ')))
break
if not ready_time:
logging.debug('IGNORED: Node {0} is not ready')
continue

if ready_time < sub_args.minimum_ready_time:
logging.debug('IGNORED: Node {0} has been ready for only {1}s and the minimum is {2}s'.format(node_name, ready_time, sub_args.minimum_ready_time))
continue

potential_nodes.append({
'node': node_name,
'ready': ready_time
})

# Sort our list by ready time (earliest ready == longest running)
potential_nodes.sort(key=lambda data: data['ready'], reverse=True)
# Get a list of candidate nodes with newer OS patches than the current
# image version
potential_nodes = get_candidate_nodes(vmss, nodes,
node_ignore_names,
node_ignore_annotations,
sub_args.minimum_ready_time,
sub_args.pending_reboot_annotation,
sub_args.last_patch_annotation,
latest_image)

if len(potential_nodes) < sub_args.minimum_candidates and sub_args.maximum_image_age > 0:
if latest_image < format_into_version(datetime.datetime.now() - datetime.timedelta(days=sub_args.maximum_image_age)):
# We have an image older than maximum_image_age and we did not
# find OS patches that would satisfy the constraint so
# lets try again without looking for OS patches.
logging.info('Image maximum age reached - looking for any suitible node')
potential_nodes = get_candidate_nodes(vmss, nodes,
node_ignore_names,
node_ignore_annotations,
sub_args.minimum_ready_time,
sub_args.pending_reboot_annotation,
sub_args.last_patch_annotation,
None)

if len(potential_nodes) < sub_args.minimum_candidates:
logging.info('IGNORED: VMSS {0}: Found {1} candidates - minimum candidates is {2}'.format(vmss, len(potential_nodes), sub_args.minimum_candidates))
continue

candidate_node = potential_nodes[0]['node']
# We could try multiple different instances but the first one one the
# list is usually just fine. (Longest "healthy" state)
candidate_node = potential_nodes[0]
logging.info('VMSS {0}: Picked candiate node {1} from {2} candidates'.format(vmss, candidate_node, len(potential_nodes)))
cmd = [
os.path.realpath(__file__),
Expand Down Expand Up @@ -1110,11 +1169,11 @@ def in_cluster(sub_args, func):
])

login_cmd = ['az', 'login',
'--username', username,
'--password', password,
'--tenant', tenant,
'--service-principal'
]
'--username', username,
'--password', password,
'--tenant', tenant,
'--service-principal'
]
if username == "msi":
login_cmd = ['az', 'login', '--identity']
masq = [username, password, tenant]
Expand All @@ -1134,6 +1193,22 @@ def in_cluster(sub_args, func):
return func(sub_args)


def value_minimum_check(v, minimum, name='Value'):
"""
Ensure that the v is an int of at least minimum value
:param v: The input to be validated
:param minimum: The minimum legal value
:param name: The name to call the "value" in case of error
"""
if isinstance(v, str):
if not v.isdigit():
raise argparse.ArgumentTypeError('{0} must be a number that is at least {0}'.format(name, minimum))
v = int(v)
if v < minimum:
raise argparse.ArgumentTypeError("{0} must be at least {1}".format(name, minimum))
return v


def VmssName(v):
"""
VMSS Name validation type for argparser
Expand Down Expand Up @@ -1176,13 +1251,7 @@ def NewUpdatedNodes(v):
:param v: The proposed new-updated-nodes value
:return: The validated new-updated-nodes
"""
if isinstance(v, str):
if not v.isdigit():
raise argparse.ArgumentTypeError("new-updated-nodes must be a positive int: eg 1")
v = int(v)
if v < 0:
raise argparse.ArgumentTypeError("new-updated-nodes must be a positive int: eg 1")
return v
return value_minimum_check(v, 0, 'New updated nodes')


def SubscriptionId(v):
Expand All @@ -1202,10 +1271,7 @@ def HistorySize(v):
:param v: The proposed history size
:return: The history size
"""
v = int(v)
if v < 2:
raise argparse.ArgumentTypeError("Value must not be at least 2")
return v
return value_minimum_check(v, 2, 'History size')


def MinimumCandidates(v):
Expand All @@ -1214,10 +1280,18 @@ def MinimumCandidates(v):
:param v: The proposed minimum
:return: The minimum
"""
v = int(v)
if v < 1:
raise argparse.ArgumentTypeError("Value must not be at least 1")
return v
return value_minimum_check(v, 1, 'Minimum candidates')


def MaximumImageAge(v):
"""
Ensure that we have an int value that is the maximum number of days
an image can be before we create a fresh one even without an OS Patch
Zero means "no maximum age" - must not be negative.
:param v: The proposed maximum
:return: The maximum
"""
return value_minimum_check(v, 0, 'Maximum image age')


def MinimumReadyTime(v):
Expand Down Expand Up @@ -1326,6 +1400,8 @@ def register_commands(subparser):
help='Minimum time a node must in ready state before it is considered a candidate (default %(default)s)')
parser.add_argument('--minimum-candidates', default=1, type=MinimumCandidates,
help='Minimum number of acceptable candidates (default: %(default)s)')
parser.add_argument('--maximum-image-age', default=0, type=MaximumImageAge,
help='Maximum number of days old a prototype image should be before a replacement is made even without OS patches (0==no maximum) (default: %(default)s)')
parser.add_argument('--ignore-nodes', action='append', nargs='+', required=False, type=NodeName,
help='The name of nodes that should be ignored as potential prototype candidates')
parser.add_argument('--dry-run', default=False, action='store_true',
Expand Down

0 comments on commit 30b78ba

Please sign in to comment.