diff --git a/spark_ec2.py b/spark_ec2.py index 28d72f43..adb96ce6 100644 --- a/spark_ec2.py +++ b/spark_ec2.py @@ -166,13 +166,11 @@ def setup_external_libs(libs): from boto.ec2.blockdevicemapping import BlockDeviceMapping, BlockDeviceType, EBSBlockDeviceType from boto import ec2 - class UsageError(Exception): pass -# Configure and parse our command-line arguments -def parse_args(): +def get_parser(): parser = OptionParser( prog="spark-ec2", version="%prog {v}".format(v=SPARK_EC2_VERSION), @@ -332,11 +330,16 @@ def parse_args(): "--instance-profile-name", default=None, help="IAM profile name to launch instances under") - (opts, args) = parser.parse_args() + return parser + +# Configure and parse our command-line arguments +def parse_args(): + parser = get_parser() + opts, args = parser.parse_args() if len(args) != 2: parser.print_help() sys.exit(1) - (action, cluster_name) = args + action, cluster_name = args # Boto config check # http://boto.cloudhackers.com/en/latest/boto_config_tut.html @@ -353,7 +356,7 @@ def parse_args(): print("ERROR: The environment variable AWS_SECRET_ACCESS_KEY must be set", file=stderr) sys.exit(1) - return (opts, action, cluster_name) + return opts, action, cluster_name # Get the EC2 security group of the given name, creating it if it doesn't exist @@ -453,20 +456,28 @@ def get_tachyon_version(spark_version): return SPARK_TACHYON_MAP.get(spark_version, "") -# Attempt to resolve an appropriate AMI given the architecture and region of the request. -def get_spark_ami(opts): - if opts.instance_type in EC2_INSTANCE_TYPES: - instance_type = EC2_INSTANCE_TYPES[opts.instance_type] +# Attempt to resolve an appropriate AMI given the architecture and region of +# the request. 
+def get_spark_ami(instance_type, region,
+                  spark_ec2_git_repo,
+                  spark_ec2_git_branch):
+    """Resolve an appropriate AMI for the given instance type and region."""
+    # instance_type is remapped below; keep the requested value for warnings.
+    requested_type = instance_type
+    if requested_type in EC2_INSTANCE_TYPES:
+        instance_type = EC2_INSTANCE_TYPES[requested_type]
     else:
         instance_type = "pvm"
-        print("Don't recognize %s, assuming type is pvm" % opts.instance_type, file=stderr)
+        print("Don't recognize %s, assuming type is pvm" % requested_type,
+              file=stderr)
 
     # URL prefix from which to fetch AMI information
     ami_prefix = "{r}/{b}/ami-list".format(
-        r=opts.spark_ec2_git_repo.replace("https://github.com", "https://raw.github.com", 1),
-        b=opts.spark_ec2_git_branch)
+        r=spark_ec2_git_repo.replace("https://github.com",
+                                     "https://raw.github.com", 1),
+        b=spark_ec2_git_branch)
 
-    ami_path = "%s/%s/%s" % (ami_prefix, opts.region, instance_type)
+    ami_path = "%s/%s/%s" % (ami_prefix, region, instance_type)
     reader = codecs.getreader("ascii")
     try:
         ami = reader(urlopen(ami_path)).read().strip()
@@ -484,11 +495,13 @@ def get_spark_ami(opts):
 # Fails if there already instances running in the cluster's groups. 
def launch_cluster(conn, opts, cluster_name): if opts.identity_file is None: - print("ERROR: Must provide an identity file (-i) for ssh connections.", file=stderr) + print("ERROR: Must provide an identity file (-i)", + " for ssh connections.", file=stderr) sys.exit(1) if opts.key_pair is None: - print("ERROR: Must provide a key pair name (-k) to use on instances.", file=stderr) + print("ERROR: Must provide a key pair name (-k) to use on instances.", + file=stderr) sys.exit(1) user_data_content = None @@ -497,26 +510,28 @@ def launch_cluster(conn, opts, cluster_name): user_data_content = user_data_file.read() print("Setting up security groups...") - master_group = get_or_make_group(conn, cluster_name + "-master", opts.vpc_id) - slave_group = get_or_make_group(conn, cluster_name + "-slaves", opts.vpc_id) + master_group = get_or_make_group(conn, cluster_name + "-master", + opts.vpc_id) + slave_group = get_or_make_group(conn, cluster_name + "-slaves", + opts.vpc_id) authorized_address = opts.authorized_address if master_group.rules == []: # Group was just now created if opts.vpc_id is None: master_group.authorize(src_group=master_group) master_group.authorize(src_group=slave_group) else: - master_group.authorize(ip_protocol='icmp', from_port=-1, to_port=-1, - src_group=master_group) - master_group.authorize(ip_protocol='tcp', from_port=0, to_port=65535, - src_group=master_group) - master_group.authorize(ip_protocol='udp', from_port=0, to_port=65535, - src_group=master_group) - master_group.authorize(ip_protocol='icmp', from_port=-1, to_port=-1, - src_group=slave_group) - master_group.authorize(ip_protocol='tcp', from_port=0, to_port=65535, - src_group=slave_group) - master_group.authorize(ip_protocol='udp', from_port=0, to_port=65535, - src_group=slave_group) + master_group.authorize(ip_protocol='icmp', from_port=-1, + to_port=-1, src_group=master_group) + master_group.authorize(ip_protocol='tcp', from_port=0, + to_port=65535, src_group=master_group) + 
master_group.authorize(ip_protocol='udp', from_port=0, + to_port=65535, src_group=master_group) + master_group.authorize(ip_protocol='icmp', from_port=-1, + to_port=-1, src_group=slave_group) + master_group.authorize(ip_protocol='tcp', from_port=0, + to_port=65535, src_group=slave_group) + master_group.authorize(ip_protocol='udp', from_port=0, + to_port=65535, src_group=slave_group) master_group.authorize('tcp', 22, 22, authorized_address) master_group.authorize('tcp', 8080, 8081, authorized_address) master_group.authorize('tcp', 18080, 18080, authorized_address) @@ -536,6 +551,9 @@ def launch_cluster(conn, opts, cluster_name): master_group.authorize('udp', 4242, 4242, authorized_address) # RM in YARN mode uses 8088 master_group.authorize('tcp', 8088, 8088, authorized_address) + # Jupyter notebook on port 8888 + master_group.authorize('tcp', 8888, 8888, authorized_address) + if opts.ganglia: master_group.authorize('tcp', 5080, 5080, authorized_address) if slave_group.rules == []: # Group was just now created @@ -543,18 +561,18 @@ def launch_cluster(conn, opts, cluster_name): slave_group.authorize(src_group=master_group) slave_group.authorize(src_group=slave_group) else: - slave_group.authorize(ip_protocol='icmp', from_port=-1, to_port=-1, - src_group=master_group) - slave_group.authorize(ip_protocol='tcp', from_port=0, to_port=65535, - src_group=master_group) - slave_group.authorize(ip_protocol='udp', from_port=0, to_port=65535, - src_group=master_group) - slave_group.authorize(ip_protocol='icmp', from_port=-1, to_port=-1, - src_group=slave_group) - slave_group.authorize(ip_protocol='tcp', from_port=0, to_port=65535, - src_group=slave_group) - slave_group.authorize(ip_protocol='udp', from_port=0, to_port=65535, - src_group=slave_group) + slave_group.authorize(ip_protocol='icmp', from_port=-1, + to_port=-1, src_group=master_group) + slave_group.authorize(ip_protocol='tcp', from_port=0, + to_port=65535, src_group=master_group) + 
slave_group.authorize(ip_protocol='udp', from_port=0, + to_port=65535, src_group=master_group) + slave_group.authorize(ip_protocol='icmp', from_port=-1, + to_port=-1, src_group=slave_group) + slave_group.authorize(ip_protocol='tcp', from_port=0, + to_port=65535, src_group=slave_group) + slave_group.authorize(ip_protocol='udp', from_port=0, + to_port=65535, src_group=slave_group) slave_group.authorize('tcp', 22, 22, authorized_address) slave_group.authorize('tcp', 8080, 8081, authorized_address) slave_group.authorize('tcp', 50060, 50060, authorized_address) @@ -563,8 +581,11 @@ def launch_cluster(conn, opts, cluster_name): slave_group.authorize('tcp', 60075, 60075, authorized_address) # Check if instances are already running in our groups - existing_masters, existing_slaves = get_existing_cluster(conn, opts, cluster_name, - die_on_error=False) + existing_masters, existing_slaves = get_existing_cluster( + conn, opts, + cluster_name, + die_on_error=False) + if existing_slaves or (existing_masters and not opts.use_existing_master): print("ERROR: There are already instances running in group %s or %s" % (master_group.name, slave_group.name), file=stderr) @@ -572,14 +593,18 @@ def launch_cluster(conn, opts, cluster_name): # Figure out Spark AMI if opts.ami is None: - opts.ami = get_spark_ami(opts) + opts.ami = get_spark_ami(opts.instance_type, + opts.region, + opts.spark_ec2_git_repo, + opts.spark_ec2_git_branch) # we use group ids to work around https://github.com/boto/boto/issues/350 additional_group_ids = [] if opts.additional_security_group: additional_group_ids = [sg.id for sg in conn.get_all_security_groups() - if opts.additional_security_group in (sg.name, sg.id)] + if opts.additional_security_group in (sg.name, + sg.id)] print("Launching instances...") try: @@ -599,7 +624,8 @@ def launch_cluster(conn, opts, cluster_name): device.delete_on_termination = True block_map["/dev/sd" + chr(ord('s') + i)] = device - # AWS ignores the AMI-specified block device mapping for 
M3 (see SPARK-3342). + # AWS ignores the AMI-specified block device mapping for M3 (see + # SPARK-3342). if opts.instance_type.startswith('m3.'): for i in range(get_num_disks(opts.instance_type)): dev = BlockDeviceType() @@ -650,7 +676,8 @@ def launch_cluster(conn, opts, cluster_name): active_instance_ids.append(id_to_req[i].instance_id) if len(active_instance_ids) == opts.slaves: print("All %d slaves granted" % opts.slaves) - reservations = conn.get_all_reservations(active_instance_ids) + reservations = conn.get_all_reservations( + active_instance_ids) slave_nodes = [] for r in reservations: slave_nodes += r.instances @@ -661,12 +688,14 @@ def launch_cluster(conn, opts, cluster_name): except: print("Canceling spot instance requests") conn.cancel_spot_instance_requests(my_req_ids) - # Log a warning if any of these requests actually launched instances: + # Log a warning if any of these requests actually launched + # instances: (master_nodes, slave_nodes) = get_existing_cluster( conn, opts, cluster_name, die_on_error=False) running = len(master_nodes) + len(slave_nodes) if running: - print(("WARNING: %d instances are still running" % running), file=stderr) + print(("WARNING: %d instances are still running" % running), + file=stderr) sys.exit(0) else: # Launch non-spot instances @@ -688,12 +717,13 @@ def launch_cluster(conn, opts, cluster_name): subnet_id=opts.subnet_id, placement_group=opts.placement_group, user_data=user_data_content, - instance_initiated_shutdown_behavior=opts.instance_initiated_shutdown_behavior, + instance_initiated_shutdown_behavior= + opts.instance_initiated_shutdown_behavior, instance_profile_name=opts.instance_profile_name) slave_nodes += slave_res.instances - print("Launched {s} slave{plural_s} in {z}, regid = {r}".format( + print("Launched {s} slave{plur_s} in {z}, regid = {r}".format( s=num_slaves_this_zone, - plural_s=('' if num_slaves_this_zone == 1 else 's'), + plur_s=('' if num_slaves_this_zone == 1 else 's'), z=zone, r=slave_res.id)) 
i += 1 @@ -722,7 +752,8 @@ def launch_cluster(conn, opts, cluster_name): subnet_id=opts.subnet_id, placement_group=opts.placement_group, user_data=user_data_content, - instance_initiated_shutdown_behavior=opts.instance_initiated_shutdown_behavior, + instance_initiated_shutdown_behavior= + opts.instance_initiated_shutdown_behavior, instance_profile_name=opts.instance_profile_name) master_nodes = master_res.instances @@ -735,46 +766,54 @@ def launch_cluster(conn, opts, cluster_name): # Give the instances descriptive names and set additional tags additional_tags = {} if opts.additional_tags.strip(): - additional_tags = dict( - map(str.strip, tag.split(':', 1)) for tag in opts.additional_tags.split(',') - ) + additional_tags = dict(map(str.strip, tag.split(':', 1)) for tag in + opts.additional_tags.split(',')) for master in master_nodes: - master.add_tags( - dict(additional_tags, Name='{cn}-master-{iid}'.format(cn=cluster_name, iid=master.id)) - ) + master.add_tags(dict(additional_tags, + Name='{cn}-master-{iid}'.format(cn=cluster_name, + iid=master.id))) for slave in slave_nodes: - slave.add_tags( - dict(additional_tags, Name='{cn}-slave-{iid}'.format(cn=cluster_name, iid=slave.id)) - ) + slave.add_tags(dict(additional_tags, + Name='{cn}-slave-{iid}'.format(cn=cluster_name, + iid=slave.id))) # Return all the instances return (master_nodes, slave_nodes) - def get_existing_cluster(conn, opts, cluster_name, die_on_error=True): """ Get the EC2 instances in an existing cluster if available. - Returns a tuple of lists of EC2 instance objects for the masters and slaves. + Returns a tuple of lists of EC2 instance objects for the masters and + slaves. """ print("Searching for existing cluster {c} in region {r}...".format( c=cluster_name, r=opts.region)) def get_instances(group_names): """ - Get all non-terminated instances that belong to any of the provided security groups. + Get all non-terminated instances that belong to any of the provided + security groups. 
EC2 reservation filters and instance states are documented here: http://docs.aws.amazon.com/cli/latest/reference/ec2/describe-instances.html#options """ reservations = conn.get_all_reservations( filters={"instance.group-name": group_names}) - instances = itertools.chain.from_iterable(r.instances for r in reservations) - return [i for i in instances if i.state not in ["shutting-down", "terminated"]] - - master_instances = get_instances([cluster_name + "-master"]) - slave_instances = get_instances([cluster_name + "-slaves"]) + instances = itertools.chain.from_iterable(r.instances for r in + reservations) + return [i for i in instances if i.state not in ["shutting-down", + "terminated"]] + master_instances = None + slave_instances = None + if isinstance(opts, Bunch): + master_instances = opts.get("master_nodes") + slave_instances = opts.get("slave_nodes") + + if master_instances is None and slave_instances is None: + master_instances = get_instances([cluster_name + "-master"]) + slave_instances = get_instances([cluster_name + "-slaves"]) if any((master_instances, slave_instances)): print("Found {m} master{plural_m}, {s} slave{plural_s}.".format( @@ -784,11 +823,13 @@ def get_instances(group_names): plural_s=('' if len(slave_instances) == 1 else 's'))) if not master_instances and die_on_error: - print("ERROR: Could not find a master for cluster {c} in region {r}.".format( - c=cluster_name, r=opts.region), file=sys.stderr) + print("ERROR: Could not find a master for ", + "cluster {c} in region {r}.".format(c=cluster_name, + r=opts.region), + file=sys.stderr) sys.exit(1) - return (master_instances, slave_instances) + return master_instances, slave_instances # Deploy configuration files and run setup scripts on a newly launched @@ -827,14 +868,13 @@ def setup_cluster(conn, master_nodes, slave_nodes, opts, deploy_ssh_key): # prevent ec2-variables.sh from being overwritten print("Cloning spark-ec2 scripts from {r}/tree/{b} on master...".format( r=opts.spark_ec2_git_repo, 
b=opts.spark_ec2_git_branch)) - ssh( - host=master, + ssh(host=master, opts=opts, - command="rm -rf spark-ec2" - + " && " - + "git clone {r} -b {b} spark-ec2".format(r=opts.spark_ec2_git_repo, - b=opts.spark_ec2_git_branch) - ) + command="rm -rf spark-ec2" + + " && " + + "git clone {r} -b {b} spark-ec2".format( + r=opts.spark_ec2_git_repo, + b=opts.spark_ec2_git_branch)) print("Deploying files to master...") deploy_files( @@ -843,16 +883,14 @@ def setup_cluster(conn, master_nodes, slave_nodes, opts, deploy_ssh_key): opts=opts, master_nodes=master_nodes, slave_nodes=slave_nodes, - modules=modules - ) + modules=modules) if opts.deploy_root_dir is not None: print("Deploying {s} to master...".format(s=opts.deploy_root_dir)) deploy_user_files( root_dir=opts.deploy_root_dir, opts=opts, - master_nodes=master_nodes - ) + master_nodes=master_nodes) print("Running setup on master...") setup_spark_cluster(master, opts) @@ -874,12 +912,14 @@ def is_ssh_available(host, opts, print_ssh_output=True): """ s = subprocess.Popen( ssh_command(opts) + ['-t', '-t', '-o', 'ConnectTimeout=3', - '%s@%s' % (opts.user, host), stringify_command('true')], + '%s@%s' % (opts.user, host), + stringify_command('true')], stdout=subprocess.PIPE, - stderr=subprocess.STDOUT # we pipe stderr through stdout to preserve output order + stderr=subprocess.STDOUT # we pipe stderr through stdout to preserve + # output order ) - cmd_output = s.communicate()[0] # [1] is stderr, which we redirected to stdout - + # [1] is stderr, which we redirected to stdout + cmd_output = s.communicate()[0] if s.returncode != 0 and print_ssh_output: # extra leading newline is for spacing in wait_for_cluster_state() print(textwrap.dedent("""\n @@ -887,11 +927,9 @@ def is_ssh_available(host, opts, print_ssh_output=True): Host: {h} SSH return code: {r} SSH output: {o} - """).format( - h=host, - r=s.returncode, - o=cmd_output.strip() - )) + """).format(h=host, + r=s.returncode, + o=cmd_output.strip())) return s.returncode == 0 @@ 
-912,15 +950,15 @@ def wait_for_cluster_state(conn, opts, cluster_instances, cluster_state): """ Wait for all the instances in the cluster to reach a designated state. - cluster_instances: a list of boto.ec2.instance.Instance - cluster_state: a string representing the desired state of all the instances in the cluster - value can be 'ssh-ready' or a valid value from boto.ec2.instance.InstanceState such as - 'running', 'terminated', etc. - (would be nice to replace this with a proper enum: http://stackoverflow.com/a/1695250) + cluster_instances: a list of boto.ec2.instance.Instance cluster_state: a + string representing the desired state of all the instances in the cluster + value can be 'ssh-ready' or a valid value from + boto.ec2.instance.InstanceState such as 'running', 'terminated', etc. + (would be nice to replace this with a proper enum: + http://stackoverflow.com/a/1695250) """ sys.stdout.write( - "Waiting for cluster to enter '{s}' state.".format(s=cluster_state) - ) + "Waiting for cluster to enter '{s}' state.".format(s=cluster_state)) sys.stdout.flush() start_time = datetime.now() @@ -958,15 +996,16 @@ def wait_for_cluster_state(conn, opts, cluster_instances, cluster_state): end_time = datetime.now() print("Cluster is now in '{s}' state. Waited {t} seconds.".format( s=cluster_state, - t=(end_time - start_time).seconds - )) - + t=(end_time - start_time).seconds)) -# Get number of local disks available for a given EC2 instance type. def get_num_disks(instance_type): - # Source: http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/InstanceStorage.html - # Last Updated: 2015-06-19 - # For easy maintainability, please keep this manually-inputted dictionary sorted by key. + """ + Get number of local disks available for a given EC2 instance type. + Source: + http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/InstanceStorage.html + Last Updated: 2015-06-19 + For easy maintainability, please keep this manually-inputted dictionary sorted by key. 
+ """ disks_by_instance = { "c1.medium": 1, "c1.xlarge": 4, @@ -1026,19 +1065,22 @@ def get_num_disks(instance_type): if instance_type in disks_by_instance: return disks_by_instance[instance_type] else: - print("WARNING: Don't know number of disks on instance type %s; assuming 1" - % instance_type, file=stderr) + print("WARNING: Don't know number of disks on instance", + " type %s; assuming 1" % instance_type, file=stderr) return 1 -# Deploy the configuration file templates in a given local directory to -# a cluster, filling in any template parameters with information about the -# cluster (e.g. lists of masters and slaves). Files are only deployed to -# the first master instance in the cluster, and we expect the setup -# script to be run on that instance to copy them to other nodes. -# -# root_dir should be an absolute path to the directory with the files we want to deploy. def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, modules): + """ + Deploy the configuration file templates in a given local directory to + a cluster, filling in any template parameters with information about the + cluster (e.g. lists of masters and slaves). Files are only deployed to + the first master instance in the cluster, and we expect the setup + script to be run on that instance to copy them to other nodes. + + root_dir should be an absolute path to the directory with the files we + want to deploy. + """ active_master = get_dns_name(master_nodes[0], opts.private_ips) num_disks = get_num_disks(opts.instance_type) @@ -1055,7 +1097,8 @@ def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, modules): if "." 
in opts.spark_version: # Pre-built Spark deploy - spark_v = get_validate_spark_version(opts.spark_version, opts.spark_git_repo) + spark_v = get_validate_spark_version(opts.spark_version, + opts.spark_git_repo) tachyon_v = get_tachyon_version(spark_v) else: # Spark-only custom deploy @@ -1064,9 +1107,13 @@ def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, modules): print("Deploying Spark via git hash; Tachyon won't be set up") modules = filter(lambda x: x != "tachyon", modules) - master_addresses = [get_dns_name(i, opts.private_ips) for i in master_nodes] + master_addresses = [get_dns_name(i, opts.private_ips) for + i in master_nodes] slave_addresses = [get_dns_name(i, opts.private_ips) for i in slave_nodes] - worker_instances_str = "%d" % opts.worker_instances if opts.worker_instances else "" + worker_instances_str = "" + if opts.worker_instances: + worker_instances_str = "%d" % opts.worker_instances + template_vars = { "master_list": '\n'.join(master_addresses), "active_master": active_master, @@ -1108,7 +1155,8 @@ def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, modules): with open(local_file, "w") as dest: text = src.read() for key in template_vars: - text = text.replace("{{" + key + "}}", template_vars[key]) + text = text.replace("{{" + key + "}}", + template_vars[key]) dest.write(text) dest.close() # rsync the whole directory over to the master machine @@ -1123,13 +1171,16 @@ def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, modules): shutil.rmtree(tmp_dir) -# Deploy a given local directory to a cluster, WITHOUT parameter substitution. -# Note that unlike deploy_files, this works for binary files. -# Also, it is up to the user to add (or not) the trailing slash in root_dir. -# Files are only deployed to the first master instance in the cluster. -# -# root_dir should be an absolute path. 
def deploy_user_files(root_dir, opts, master_nodes): + """ + + Deploy a given local directory to a cluster, WITHOUT parameter + substitution. Note that unlike deploy_files, this works for binary files. + Also, it is up to the user to add (or not) the trailing slash in root_dir. + Files are only deployed to the first master instance in the cluster. + + root_dir should be an absolute path. + """ active_master = get_dns_name(master_nodes[0], opts.private_ips) command = [ 'rsync', '-rv', @@ -1174,11 +1225,13 @@ def ssh(host, opts, command): if e.returncode == 255: raise UsageError( "Failed to SSH to remote host {0}.\n" - "Please check that you have provided the correct --identity-file and " + "Please check that you have provided the correct " + + "--identity-file and " + "--key-pair parameters and try again.".format(host)) else: raise e - print("Error executing remote command, retrying after 30 seconds: {0}".format(e), + print("Error executing remote command, retrying after 30", + " seconds: {0}".format(e), file=stderr) time.sleep(30) tries = tries + 1 @@ -1201,14 +1254,16 @@ def _check_output(*popenargs, **kwargs): def ssh_read(host, opts, command): return _check_output( - ssh_command(opts) + ['%s@%s' % (opts.user, host), stringify_command(command)]) + ssh_command(opts) + ['%s@%s' % (opts.user, host), + stringify_command(command)]) def ssh_write(host, opts, command, arguments): tries = 0 while True: proc = subprocess.Popen( - ssh_command(opts) + ['%s@%s' % (opts.user, host), stringify_command(command)], + ssh_command(opts) + ['%s@%s' % (opts.user, host), + stringify_command(command)], stdin=subprocess.PIPE) proc.stdin.write(arguments) proc.stdin.close() @@ -1216,7 +1271,8 @@ def ssh_write(host, opts, command, arguments): if status == 0: break elif tries > 5: - raise RuntimeError("ssh_write failed with error %s" % proc.returncode) + raise RuntimeError("ssh_write failed with error %s" % + proc.returncode) else: print("Error {0} while executing remote command, retrying 
after 30 seconds".
                  format(status), file=stderr)
@@ -1252,15 +1308,14 @@ def get_ip_address(instance, private_ips=False):
 def get_dns_name(instance, private_ips=False):
     dns = instance.public_dns_name if not private_ips else \
         instance.private_ip_address
-    if not dns:
-        raise UsageError("Failed to determine hostname of {0}.\n"
-                         "Please check that you provided --private-ips if "
-                         "necessary".format(instance))
     return dns
 
 
-def real_main():
-    (opts, action, cluster_name) = parse_args()
+def real_main(action, cluster_name, opts=None, **kwargs):
+    force = kwargs.pop('force', False)
+    if opts is None:
+        opts = set_opts()
+    opts.update(**kwargs)
 
     # Input parameter validation
     get_validate_spark_version(opts.spark_version, opts.spark_git_repo)
@@ -1271,41 +1326,52 @@ def real_main():
         # See: https://docs.python.org/3.5/whatsnew/2.7.html
         warnings.warn(
             "This option is deprecated and has no effect. "
-            "spark-ec2 automatically waits as long as necessary for clusters to start up.",
-            DeprecationWarning
-        )
+            "spark-ec2 automatically waits as long as necessary for clusters "
+            "to start up.",
+            DeprecationWarning)
 
     if opts.identity_file is not None:
         if not os.path.exists(opts.identity_file):
-            print("ERROR: The identity file '{f}' doesn't exist.".format(f=opts.identity_file),
+            print("ERROR: The identity file '{f}' doesn't "
+                  "exist.".format(f=opts.identity_file),
                   file=stderr)
             sys.exit(1)
 
         file_mode = os.stat(opts.identity_file).st_mode
         if not (file_mode & S_IRUSR) or not oct(file_mode)[-2:] == '00':
-            print("ERROR: The identity file must be accessible only by you.", file=stderr)
-            print('You can fix this with: chmod 400 "{f}"'.format(f=opts.identity_file),
+            print("ERROR: The identity file must be accessible only by you.",
+                  file=stderr)
+            print('You can fix this with: chmod 400',
+                  '"{f}"'.format(f=opts.identity_file),
                   file=stderr)
             sys.exit(1)
 
     if opts.instance_type not in EC2_INSTANCE_TYPES:
-        print("Warning: Unrecognized EC2 instance type for instance-type: {t}".format(
-
t=opts.instance_type), file=stderr) + print("Warning: Unrecognized EC2 instance type for", + "instance-type: {t}".format(t=opts.instance_type), + file=stderr) if opts.master_instance_type != "": if opts.master_instance_type not in EC2_INSTANCE_TYPES: - print("Warning: Unrecognized EC2 instance type for master-instance-type: {t}".format( - t=opts.master_instance_type), file=stderr) - # Since we try instance types even if we can't resolve them, we check if they resolve first - # and, if they do, see if they resolve to the same virtualization type. + print("Warning: Unrecognized EC2 instance type for ", + "master-instance-type: {t}".format( + t=opts.master_instance_type), + file=stderr) + + # Since we try instance types even if we can't resolve them, we check + # if they resolve first and, if they do, see if they resolve to the + # same virtualization type. + if opts.instance_type in EC2_INSTANCE_TYPES and \ opts.master_instance_type in EC2_INSTANCE_TYPES: if EC2_INSTANCE_TYPES[opts.instance_type] != \ EC2_INSTANCE_TYPES[opts.master_instance_type]: - print("Error: spark-ec2 currently does not support having a master and slaves " - "with different AMI virtualization types.", file=stderr) + print("Error: spark-ec2 currently does not support having a ", + "master and slaves with different AMI virtualization ", + "types.", file=stderr) print("master instance virtualization type: {t}".format( - t=EC2_INSTANCE_TYPES[opts.master_instance_type]), file=stderr) + t=EC2_INSTANCE_TYPES[opts.master_instance_type]), + file=stderr) print("slave instance virtualization type: {t}".format( t=EC2_INSTANCE_TYPES[opts.instance_type]), file=stderr) sys.exit(1) @@ -1320,23 +1386,28 @@ def real_main(): opts.spark_ec2_git_repo.endswith(".git") or \ not opts.spark_ec2_git_repo.startswith("https://github.com") or \ not opts.spark_ec2_git_repo.endswith("spark-ec2"): - print("spark-ec2-git-repo must be a github repo and it must not have a trailing / or .git. 
" - "Furthermore, we currently only support forks named spark-ec2.", file=stderr) + print("spark-ec2-git-repo must be a github repo and it must not ", + "have a trailing / or .git. Furthermore, we currently only", + "support forks named spark-ec2.", file=stderr) sys.exit(1) if not (opts.deploy_root_dir is None or (os.path.isabs(opts.deploy_root_dir) and os.path.isdir(opts.deploy_root_dir) and os.path.exists(opts.deploy_root_dir))): - print("--deploy-root-dir must be an absolute path to a directory that exists " - "on the local file system", file=stderr) + print("--deploy-root-dir must be an absolute path to a directory ", + "that exists on the local file system", file=stderr) sys.exit(1) try: if opts.profile is None: conn = ec2.connect_to_region(opts.region) else: - conn = ec2.connect_to_region(opts.region, profile_name=opts.profile) + conn = ec2.connect_to_region(opts.region, + profile_name=opts.profile) + + opts.conn = conn + except Exception as e: print((e), file=stderr) sys.exit(1) @@ -1350,9 +1421,11 @@ def real_main(): print("ERROR: You have to start at least 1 slave", file=sys.stderr) sys.exit(1) if opts.resume: - (master_nodes, slave_nodes) = get_existing_cluster(conn, opts, cluster_name) + (master_nodes, slave_nodes) = get_existing_cluster(conn, opts, + cluster_name) else: - (master_nodes, slave_nodes) = launch_cluster(conn, opts, cluster_name) + (master_nodes, slave_nodes) = launch_cluster(conn, opts, + cluster_name) wait_for_cluster_state( conn=conn, opts=opts, @@ -1360,10 +1433,12 @@ def real_main(): cluster_state='ssh-ready' ) setup_cluster(conn, master_nodes, slave_nodes, opts, True) + opts.master_nodes = master_nodes + opts.slave_nodes = slave_nodes elif action == "destroy": (master_nodes, slave_nodes) = get_existing_cluster( - conn, opts, cluster_name, die_on_error=False) + conn, opts, cluster_name, die_on_error=False) if any(master_nodes + slave_nodes): print("The following instances will be terminated:") @@ -1371,8 +1446,13 @@ def real_main(): 
print("> %s" % get_dns_name(inst, opts.private_ips)) print("ALL DATA ON ALL NODES WILL BE LOST!!") - msg = "Are you sure you want to destroy the cluster {c}? (y/N) ".format(c=cluster_name) - response = raw_input(msg) + if force: + response = "y" + else: + msg = "Are you sure you want to destroy the cluster " + msg += "{c}? (y/N) ".format(c=cluster_name) + response = raw_input(msg) + if response == "y": print("Terminating master...") for inst in master_nodes: @@ -1383,7 +1463,8 @@ def real_main(): # Delete security groups as well if opts.delete_groups: - group_names = [cluster_name + "-master", cluster_name + "-slaves"] + group_names = [cluster_name + "-master", cluster_name + + "-slaves"] wait_for_cluster_state( conn=conn, opts=opts, @@ -1394,46 +1475,53 @@ def real_main(): attempt = 1 while attempt <= 3: print("Attempt %d" % attempt) - groups = [g for g in conn.get_all_security_groups() if g.name in group_names] + groups = [g for g in conn.get_all_security_groups() if + g.name in group_names] success = True - # Delete individual rules in all groups before deleting groups to - # remove dependencies between them + # Delete individual rules in all groups before deleting + # groups to remove dependencies between them: for group in groups: print("Deleting rules in security group " + group.name) for rule in group.rules: for grant in rule.grants: - success &= group.revoke(ip_protocol=rule.ip_protocol, - from_port=rule.from_port, - to_port=rule.to_port, - src_group=grant) - - # Sleep for AWS eventual-consistency to catch up, and for instances - # to terminate + rv = group.revoke(ip_protocol=rule.ip_protocol, + from_port=rule.from_port, + to_port=rule.to_port, + src_group=grant) + success &= rv + + # Sleep for AWS eventual-consistency to catch up, and for + # instances to terminate time.sleep(30) # Yes, it does have to be this long :-( for group in groups: try: - # It is needed to use group_id to make it work with VPC + # It is needed to use group_id to make it work + # 
with VPC conn.delete_security_group(group_id=group.id) print("Deleted security group %s" % group.name) except boto.exception.EC2ResponseError: success = False - print("Failed to delete security group %s" % group.name) + print("Failed to delete security group %s" % + group.name) - # Unfortunately, group.revoke() returns True even if a rule was not - # deleted, so this needs to be rerun if something fails + # Unfortunately, group.revoke() returns True even if a rule + # was not deleted, so this needs to be rerun if something + # fails if success: break - attempt += 1 if not success: - print("Failed to delete all security groups after 3 tries.") + print("Failed to delete all security groups after 3 ", + "tries.") print("Try re-running in a few minutes.") elif action == "login": - (master_nodes, slave_nodes) = get_existing_cluster(conn, opts, cluster_name) + (master_nodes, slave_nodes) = get_existing_cluster(conn, opts, + cluster_name) if not master_nodes[0].public_dns_name and not opts.private_ips: - print("Master has no public DNS name. Maybe you meant to specify --private-ips?") + print("Master has no public DNS name. 
", + "Maybe you meant to specify --private-ips?") else: master = get_dns_name(master_nodes[0], opts.private_ips) print("Logging into master " + master + "...") @@ -1441,13 +1529,17 @@ def real_main(): if opts.proxy_port is not None: proxy_opt = ['-D', opts.proxy_port] subprocess.check_call( - ssh_command(opts) + proxy_opt + ['-t', '-t', "%s@%s" % (opts.user, master)]) + ssh_command(opts) + proxy_opt + ['-t', '-t', "%s@%s" % + (opts.user, master)]) elif action == "reboot-slaves": - response = raw_input( - "Are you sure you want to reboot the cluster " + - cluster_name + " slaves?\n" + - "Reboot cluster slaves " + cluster_name + " (y/N): ") + if force: + response = "y" + else: + response = raw_input( + "Are you sure you want to reboot the cluster " + + cluster_name + " slaves?\n" + + "Reboot cluster slaves " + cluster_name + " (y/N): ") if response == "y": (master_nodes, slave_nodes) = get_existing_cluster( conn, opts, cluster_name, die_on_error=False) @@ -1458,20 +1550,25 @@ def real_main(): inst.reboot() elif action == "get-master": - (master_nodes, slave_nodes) = get_existing_cluster(conn, opts, cluster_name) + (master_nodes, slave_nodes) = get_existing_cluster(conn, opts, + cluster_name) if not master_nodes[0].public_dns_name and not opts.private_ips: - print("Master has no public DNS name. Maybe you meant to specify --private-ips?") + print("Master has no public DNS name. 
", + "Maybe you meant to specify --private-ips?") else: print(get_dns_name(master_nodes[0], opts.private_ips)) elif action == "stop": - response = raw_input( - "Are you sure you want to stop the cluster " + - cluster_name + "?\nDATA ON EPHEMERAL DISKS WILL BE LOST, " + - "BUT THE CLUSTER WILL KEEP USING SPACE ON\n" + - "AMAZON EBS IF IT IS EBS-BACKED!!\n" + - "All data on spot-instance slaves will be lost.\n" + - "Stop cluster " + cluster_name + " (y/N): ") + if force: + response = "y" + else: + response = raw_input( + "Are you sure you want to stop the cluster " + + cluster_name + "?\nDATA ON EPHEMERAL DISKS WILL BE LOST, " + + "BUT THE CLUSTER WILL KEEP USING SPACE ON\n" + + "AMAZON EBS IF IT IS EBS-BACKED!!\n" + + "All data on spot-instance slaves will be lost.\n" + + "Stop cluster " + cluster_name + " (y/N): ") if response == "y": (master_nodes, slave_nodes) = get_existing_cluster( conn, opts, cluster_name, die_on_error=False) @@ -1488,7 +1585,8 @@ def real_main(): inst.stop() elif action == "start": - (master_nodes, slave_nodes) = get_existing_cluster(conn, opts, cluster_name) + (master_nodes, slave_nodes) = get_existing_cluster(conn, opts, + cluster_name) print("Starting slaves...") for inst in slave_nodes: if inst.state not in ["shutting-down", "terminated"]: @@ -1501,8 +1599,7 @@ def real_main(): conn=conn, opts=opts, cluster_instances=(master_nodes + slave_nodes), - cluster_state='ssh-ready' - ) + cluster_state='ssh-ready') # Determine types of running instances existing_master_type = master_nodes[0].instance_type @@ -1515,15 +1612,115 @@ def real_main(): opts.instance_type = existing_slave_type setup_cluster(conn, master_nodes, slave_nodes, opts, False) + opts.master_nodes = master_nodes + opts.slave_nodes = slave_nodes else: print("Invalid action: %s" % action, file=stderr) sys.exit(1) +class Bunch(object): + def __init__(self, **params): + for k, v in params.items(): + self.__dict__[k] = v + + def __repr__(self): + my_list = [] + for k, v in 
self.__dict__.items(): + my_list.append("%s : %s" % (k, v)) + my_list.sort() + my_str = "\n".join(my_list) + return my_str + + def __setattr__(self, name, value): + self.__dict__[name] = value + + def __getitem__(self, name): + return self.__dict__[name] + + def update(self, **kwargs): + self.__dict__.update(**kwargs) + + def get(self, name, default=None): + self.__dict__.get(name, default) + + +def set_opts(**kwargs): + """ + Set the options + + Parameters + ---------- + kwargs : these will be used as options of the Cluster object + + """ + # Set the default values to the ones set in the argparser: + parser = get_parser() + opts = Bunch(**parser.defaults) + opts.update(**kwargs) + return opts + + +class Cluster(object): + def __init__(self, cluster_name, **kwargs): + """ + Initialize a Spark EC2 cluster + + Parameters + ---------- + cluster_name : str + The name of the cluster + + kwargs : optional inputs (see inputs to `spark-ec2` CLI) + + """ + self.opts = set_opts(**kwargs) + # Cast hadoop_major_version as a string to avoid cryptic errors + # when launching: + self.opts.hadoop_major_version = str(self.opts.hadoop_major_version) + self.cluster_name = cluster_name + + def launch(self): + real_main('launch', self.cluster_name, opts=self.opts) + self.master = self.opts.master_nodes[0] + self.slaves = self.opts.slave_nodes + self.spark_dns = "https://" + self.master.dns_name + ":8080" + self.ganglia_dns = "https://" + self.master.dns_name + ":5080/ganglia" + + def destroy(self, force=False): + real_main('destroy', self.cluster_name, opts=self.opts, force=force) + + def login(self): + real_main('login', self.cluster_name, opts=self.opts) + + def stop(self): + real_main('stop', self.cluster_name, opts=self.opts) + + def start(self): + real_main('start', self.cluster_name, opts=self.opts) + # If you started an already existing cluster, you might need to set + # these + if not hasattr(self, "master"): + self.master = self.opts.master_nodes[0] + self.slaves = 
self.opts.slave_nodes + self.spark_dns = "https://" + self.master.dns_name + ":8080" + self.ganglia_dns = "https://" + self.master.dns_name + ":5080/ganglia" + + def get_master(self): + real_main('get-master', self.cluster_name, opts=self.opts) + + def reboot_slaves(self): + real_main('reboot-slaves', self.cluster_name, opts=self.opts) + + def upload_credentials(self): + pass + + def main(): + opts, action, cluster_name = parse_args() try: - real_main() + real_main(action, cluster_name, opts) except UsageError as e: print("\nError:\n", e, file=stderr) sys.exit(1)