diff --git a/skyplane/api/pipeline.py b/skyplane/api/pipeline.py index 42cb0649f..9bdc2253f 100644 --- a/skyplane/api/pipeline.py +++ b/skyplane/api/pipeline.py @@ -1,27 +1,24 @@ -import json -import time -import os import threading -from collections import defaultdict, Counter -from datetime import datetime -from functools import partial from datetime import datetime -import nacl.secret -import nacl.utils import urllib3 from typing import TYPE_CHECKING, Dict, List, Optional from skyplane import compute -from skyplane.api.tracker import TransferProgressTracker, TransferHook +from skyplane.api.tracker import TransferProgressTracker from skyplane.api.transfer_job import CopyJob, SyncJob, TransferJob from skyplane.api.config import TransferConfig -from skyplane.planner.planner import MulticastDirectPlanner +from skyplane.planner.planner import ( + MulticastDirectPlanner, + UnicastDirectPlanner, + UnicastILPPlanner, + MulticastILPPlanner, + MulticastMDSTPlanner, +) from skyplane.planner.topology import TopologyPlanGateway from skyplane.utils import logger -from skyplane.utils.definitions import gateway_docker_image, tmp_log_dir -from skyplane.utils.fn import PathLike, do_parallel +from skyplane.utils.definitions import tmp_log_dir from skyplane.api.dataplane import Dataplane @@ -39,6 +36,7 @@ def __init__( transfer_config: TransferConfig, # cloud_regions: dict, max_instances: Optional[int] = 1, + num_connections: Optional[int] = 32, planning_algorithm: Optional[str] = "direct", debug: Optional[bool] = False, ): @@ -67,8 +65,18 @@ def __init__( # planner self.planning_algorithm = planning_algorithm + if self.planning_algorithm == "direct": - self.planner = MulticastDirectPlanner(self.max_instances, 64) + # TODO: should find some ways to merge direct / Ndirect + self.planner = UnicastDirectPlanner(self.max_instances, num_connections) + elif self.planning_algorithm == "Ndirect": + self.planner = MulticastDirectPlanner(self.max_instances, num_connections) + elif self.planning_algorithm == "MDST": + self.planner = MulticastMDSTPlanner(self.max_instances, num_connections) + elif self.planning_algorithm == "ILP": + self.planner = MulticastILPPlanner(self.max_instances, num_connections) + elif self.planning_algorithm == "UnicastILP": + self.planner = UnicastILPPlanner(self.max_instances, num_connections) else: raise ValueError(f"No such planning algorithm {planning_algorithm}") @@ -112,7 +120,7 @@ def start(self, debug=False, progress=False): # copy gateway logs if debug: dp.copy_gateway_logs() - except Exception as e: + except Exception: dp.copy_gateway_logs() dp.deprovision(spinner=True) return dp diff --git a/skyplane/api/usage.py b/skyplane/api/usage.py index 9d8fd7aec..5d9f57038 100644 --- a/skyplane/api/usage.py +++ b/skyplane/api/usage.py @@ -136,7 +136,7 @@ def log_exception( exception: Exception, args: Optional[Dict] = None, src_region_tag: Optional[str] = None, - dest_region_tag: Optional[str] = None, # TODO: fix this for mult-dest + dest_region_tags: Optional[List[str]] = None, # TODO: fix this for multi-dest session_start_timestamp_ms: Optional[int] = None, ): if cls.enabled(): @@ -146,7 +146,7 @@ def log_exception( error_dict=error_dict, arguments_dict=args, src_region_tag=src_region_tag, - dest_region_tag=dest_region_tag, + dest_region_tags=dest_region_tags, session_start_timestamp_ms=session_start_timestamp_ms, ) destination = client.write_usage_data(stats) @@ -158,7 +158,7 @@ def log_transfer( cls, transfer_stats: Optional[Dict], args: Optional[Dict] = None,
src_region_tag: Optional[str] = None, - dest_region_tags: Optional[str] = None, + dest_region_tags: Optional[List[str]] = None, session_start_timestamp_ms: Optional[int] = None, ): if cls.enabled(): @@ -250,7 +250,7 @@ def make_stat( arguments_dict: Optional[Dict] = None, transfer_stats: Optional[Dict] = None, src_region_tag: Optional[str] = None, - dest_region_tags: Optional[str] = None, + dest_region_tags: Optional[List[str]] = None, session_start_timestamp_ms: Optional[int] = None, ): if src_region_tag is None: @@ -261,7 +261,9 @@ def make_stat( dest_provider, dest_region = None, None else: # TODO: have usage stats view for multiple destinations - dest_provider, dest_region = dest_region_tags[0].split(":") + dest_region_tag = [dest_region_tag.split(":") for dest_region_tag in dest_region_tags] + dest_provider, dest_region = list(zip(*dest_region_tag)) + dest_provider, dest_region = ','.join(dest_provider), ','.join(dest_region) return UsageStatsToReport( skyplane_version=skyplane.__version__, @@ -284,7 +286,7 @@ def make_error( error_dict: Dict, arguments_dict: Optional[Dict] = None, src_region_tag: Optional[str] = None, - dest_region_tag: Optional[str] = None, + dest_region_tags: Optional[List[str]] = None, session_start_timestamp_ms: Optional[int] = None, ): if src_region_tag is None: @@ -292,10 +294,12 @@ def make_error( else: src_provider, src_region = src_region_tag.split(":") - if dest_region_tag is None: + if dest_region_tags is None: dest_provider, dest_region = None, None else: - dest_provider, dest_region = dest_region_tag.split(":") + dest_region_tag = [dest_region_tag.split(":") for dest_region_tag in dest_region_tags] + dest_provider, dest_region = list(zip(*dest_region_tag)) + dest_provider, dest_region = ','.join(dest_provider), ','.join(dest_region) return UsageStatsToReport( skyplane_version=skyplane.__version__, diff --git a/skyplane/cli/cli_transfer.py b/skyplane/cli/cli_transfer.py index 01e2d7676..18d04cc5f 100644 --- a/skyplane/cli/cli_transfer.py +++ b/skyplane/cli/cli_transfer.py @@ -21,12 +21,11 @@ from skyplane.api.usage import UsageClient from skyplane.config import SkyplaneConfig from skyplane.config_paths import cloud_config, config_path -from skyplane.obj_store.object_store_interface import ObjectStoreInterface, StorageInterface -from skyplane.obj_store.file_system_interface import FileSystemInterface +from skyplane.obj_store.object_store_interface import StorageInterface from skyplane.cli.impl.progress_bar import ProgressBarTransferHook from skyplane.utils import logger from skyplane.utils.definitions import GB, format_bytes -from skyplane.utils.path import parse_path +from skyplane.utils.path import parse_path, parse_multi_paths @dataclass @@ -50,8 +49,8 @@ def to_dict(self) -> Dict[str, Optional[Any]]: class SkyplaneCLI: - def __init__(self, src_region_tag: str, dst_region_tag: str, args: Dict[str, Any], skyplane_config: Optional[SkyplaneConfig] = None): - self.src_region_tag, self.dst_region_tag = src_region_tag, dst_region_tag + def __init__(self, src_region_tag: str, dst_region_tags: List[str], args: Dict[str, Any], skyplane_config: Optional[SkyplaneConfig] = None): + self.src_region_tag, self.dst_region_tags = src_region_tag, dst_region_tags self.args = args self.aws_config, self.azure_config, self.gcp_config, self.ibmcloud_config = self.to_api_config(skyplane_config or cloud_config) @@ -103,7 +102,8 @@ def to_api_config(self, config: SkyplaneConfig): return aws_config, azure_config, gcp_config, ibmcloud_config def make_transfer_config(self, config: 
SkyplaneConfig) -> TransferConfig: - intraregion = self.src_region_tag == self.dst_region_tag + # intraregion = self.src_region_tag == self.dst_region_tag + intraregion = self.src_region_tag return TransferConfig( autoterminate_minutes=config.get_flag("autoshutdown_minutes"), requester_pays=config.get_flag("requester_pays"), @@ -131,7 +131,7 @@ def check_config(self) -> bool: return True except skyplane.exceptions.BadConfigException as e: logger.exception(e) - UsageClient.log_exception("cli_check_config", e, self.args, self.src_region_tag, self.dst_region_tag) + UsageClient.log_exception("cli_check_config", e, self.args, self.src_region_tag, self.dst_region_tags) return False def transfer_cp_onprem(self, src: str, dst: str, recursive: bool) -> bool: @@ -144,7 +144,7 @@ def transfer_cp_onprem(self, src: str, dst: str, recursive: bool) -> bool: if rc == 0: print_stats_completed(request_time, None) transfer_stats = TransferStats(monitor_status="completed", total_runtime_s=request_time, throughput_gbits=0) - UsageClient.log_transfer(transfer_stats.to_dict(), self.args, self.src_region_tag, self.dst_region_tag) + UsageClient.log_transfer(transfer_stats.to_dict(), self.args, self.src_region_tag, self.dst_region_tags) return True else: typer.secho("Transfer not supported", fg="red") @@ -160,7 +160,7 @@ def transfer_sync_onprem(self, src: str, dst: str) -> bool: if rc == 0: print_stats_completed(request_time, None) transfer_stats = TransferStats(monitor_status="completed", total_runtime_s=request_time, throughput_gbits=0) - UsageClient.log_transfer(transfer_stats.to_dict(), self.args, self.src_region_tag, self.dst_region_tag) + UsageClient.log_transfer(transfer_stats.to_dict(), self.args, self.src_region_tag, self.dst_region_tags) return True else: typer.secho("Transfer not supported", fg="red") @@ -299,7 +299,7 @@ def force_deprovision(dp: skyplane.Dataplane): def run_transfer( src: str, - dst: str, + dst: List[str], recursive: bool, debug: bool, multipart: bool, @@ -315,9 +315,10 @@ def run_transfer( print_header() provider_src, bucket_src, path_src = parse_path(src) - provider_dst, bucket_dst, path_dst = parse_path(dst) + provider_dsts, bucket_dsts, path_dsts = parse_multi_paths(dst) src_region_tag = StorageInterface.create(f"{provider_src}:infer", bucket_src).region_tag() - dst_region_tag = StorageInterface.create(f"{provider_dst}:infer", bucket_dst).region_tag() + dst_region_tags = StorageInterface.create_region_tags(provider_dsts, bucket_dsts) + args = { "cmd": cmd, "recursive": True, @@ -330,7 +331,7 @@ def run_transfer( } # create CLI object - cli = SkyplaneCLI(src_region_tag=src_region_tag, dst_region_tag=dst_region_tag, args=args) + cli = SkyplaneCLI(src_region_tag=src_region_tag, dst_region_tags=dst_region_tags, args=args) if not cli.check_config(): typer.secho( f"Skyplane configuration file is not valid. 
Please reset your config by running `rm {config_path}` and then rerunning `skyplane init` to fix.", @@ -346,15 +347,15 @@ def run_transfer( pipeline.queue_sync(src, dst) # confirm transfer - if not cli.confirm_transfer(pipeline, src_region_tag, [dst_region_tag], 5, ask_to_confirm_transfer=not confirm): + if not cli.confirm_transfer(pipeline, src_region_tag, dst_region_tags, 5, ask_to_confirm_transfer=not confirm): return 1 # local->local transfers not supported (yet) - if provider_src == "local" and provider_dst == "local": + if provider_src == "local" and provider_dsts[0] == "local": raise NotImplementedError("Local->local transfers not supported (yet)") # fall back options: local->cloud, cloud->local, small cloud->cloud transfers - if provider_src == "local" or provider_dst == "local": + if provider_src == "local" or provider_dsts[0] == "local": if cli.args["cmd"] == "cp": return 0 if cli.transfer_cp_onprem(src, dst, recursive) else 1 else: @@ -388,20 +389,20 @@ def run_transfer( logger.fs.exception(e) console.print(f"[bright_black]{traceback.format_exc()}[/bright_black]") console.print(e) - UsageClient.log_exception("cli_cp", e, args, cli.src_region_tag, cli.dst_region_tag) + UsageClient.log_exception("cli_cp", e, args, cli.src_region_tag, cli.dst_region_tags) console.print("[bold red]Deprovisioning was interrupted! VMs may still be running which will incur charges.[/bold red]") console.print("[bold red]Please manually deprovision the VMs by running `skyplane deprovision`.[/bold red]") return 1 except skyplane.exceptions.SkyplaneException as e: console.print(f"[bright_black]{traceback.format_exc()}[/bright_black]") console.print(e.pretty_print_str()) - UsageClient.log_exception("cli_query_objstore", e, args, cli.src_region_tag, cli.dst_region_tag) + UsageClient.log_exception("cli_query_objstore", e, args, cli.src_region_tag, cli.dst_region_tags) force_deprovision(dp) except Exception as e: logger.fs.exception(e) console.print(f"[bright_black]{traceback.format_exc()}[/bright_black]") console.print(e) - UsageClient.log_exception("cli_query_objstore", e, args, cli.src_region_tag, cli.dst_region_tag) + UsageClient.log_exception("cli_query_objstore", e, args, cli.src_region_tag, cli.dst_region_tags) force_deprovision(dp) diff --git a/skyplane/obj_store/storage_interface.py b/skyplane/obj_store/storage_interface.py index 84d8e8a19..7c2c6f7d4 100644 --- a/skyplane/obj_store/storage_interface.py +++ b/skyplane/obj_store/storage_interface.py @@ -63,3 +63,14 @@ def create(region_tag: str, bucket: str): return POSIXInterface(bucket) else: raise ValueError(f"Invalid region_tag {region_tag} - could not create interface") + + @staticmethod + def create_region_tags(provider_dsts, bucket_dsts): + if isinstance(provider_dsts, str): + provider_dsts = [provider_dsts] + + dst_region_tags = [] + for provider_dst, bucket_dst in zip(provider_dsts, bucket_dsts): + tag = StorageInterface.create(f"{provider_dst}:infer", bucket_dst).region_tag() + dst_region_tags.append(tag) + return dst_region_tags \ No newline at end of file diff --git a/skyplane/planner/planner.py b/skyplane/planner/planner.py index e4044f23f..6d0c11bf1 100644 --- a/skyplane/planner/planner.py +++ b/skyplane/planner/planner.py @@ -1,9 +1,12 @@ +import functools from importlib.resources import path from typing import List, Optional, Tuple -from typing import List, Optional, Tuple - +import numpy as np +import collections +import pandas as pd +from skyplane.planner.solver_ilp import ThroughputSolverILP +from skyplane.planner.solver import
ThroughputProblem, BroadcastProblem, BroadcastSolution, GBIT_PER_GBYTE from skyplane import compute - from skyplane.planner.topology import TopologyPlan from skyplane.gateway.gateway_program import ( GatewayProgram, @@ -14,214 +17,641 @@ GatewayReceive, GatewaySend, ) - +import networkx as nx from skyplane.api.transfer_job import TransferJob +from pathlib import Path +from importlib.resources import files +from random import sample class Planner: - def plan(self) -> TopologyPlan: - raise NotImplementedError - - -class UnicastDirectPlanner(Planner): - # DO NOT USE THIS - broken for single-region transfers - def __init__(self, n_instances: int, n_connections: int): + def __init__(self, n_instances: int, n_connections: int, n_partitions: Optional[int] = 1): self.n_instances = n_instances self.n_connections = n_connections - super().__init__() + self.n_partitions = n_partitions + + def logical_plan(self) -> nx.DiGraph: + # create logical plan in nx.DiGraph format + raise NotImplementedError def plan(self, jobs: List[TransferJob]) -> TopologyPlan: - # make sure only single destination - for job in jobs: - assert len(job.dst_ifaces) == 1, f"DirectPlanner only support single destination jobs, got {len(job.dst_ifaces)}" + # create physical plan in TopologyPlan format + raise NotImplementedError + def verify_job_src_dsts(self, jobs: List[TransferJob], multicast=False) -> Tuple[str, List[str]]: src_region_tag = jobs[0].src_iface.region_tag() - dst_region_tag = jobs[0].dst_ifaces[0].region_tag() - # jobs must have same sources and destinations - for job in jobs[1:]: - assert job.src_iface.region_tag() == src_region_tag, "All jobs must have same source region" - assert job.dst_ifaces[0].region_tag() == dst_region_tag, "All jobs must have same destination region" - - plan = TopologyPlan(src_region_tag=src_region_tag, dest_region_tags=[dst_region_tag]) - # TODO: use VM limits to determine how many instances to create in each region - # TODO: support on-sided transfers but not requiring VMs to be created in source/destination regions - for i in range(self.n_instances): - plan.add_gateway(src_region_tag) - plan.add_gateway(dst_region_tag) - - # ids of gateways in dst region - dst_gateways = plan.get_region_gateways(dst_region_tag) - - src_program = GatewayProgram() - dst_program = GatewayProgram() - - for job in jobs: - src_bucket = job.src_iface.bucket() - dst_bucket = job.dst_ifaces[0].bucket() - - # give each job a different partition id, so we can read/write to different buckets - partition_id = jobs.index(job) - - # source region gateway program - obj_store_read = src_program.add_operator( - GatewayReadObjectStore(src_bucket, src_region_tag, self.n_connections), partition_id=partition_id + + if multicast: + # multicast checking + dst_region_tags = [iface.region_tag() for iface in jobs[0].dst_ifaces] + + # jobs must have same sources and destinations + for job in jobs[1:]: + assert job.src_iface.region_tag() == src_region_tag, "All jobs must have same source region" + assert [iface.region_tag() for iface in job.dst_ifaces] == dst_region_tags, "All jobs must have same destination set" + else: + # unicast checking + for job in jobs: + assert len(job.dst_ifaces) == 1, f"DirectPlanner only supports single destination jobs, got {len(job.dst_ifaces)}" + + # jobs must have same sources and destinations + dst_region_tag = jobs[0].dst_ifaces[0].region_tag() + for job in jobs[1:]: + assert job.src_iface.region_tag() == src_region_tag, "All jobs must have same source region" + assert
job.dst_ifaces[0].region_tag() == dst_region_tag, "All jobs must have same destination region" + dst_region_tags = [dst_region_tag] + + return src_region_tag, dst_region_tags + + @functools.lru_cache(maxsize=None) + def make_nx_graph(self, tp_grid_path: Optional[Path] = files("skyplane.data") / "throughput.csv") -> nx.DiGraph: + # create throughput / cost graph for all regions for planner + G = nx.DiGraph() + throughput = pd.read_csv(tp_grid_path) + for _, row in throughput.iterrows(): + if row["src_region"] == row["dst_region"]: + continue + G.add_edge(row["src_region"], row["dst_region"], cost=None, throughput=row["throughput_sent"] / 1e9) + + for edge in G.edges.data(): + if edge[-1]["cost"] is None: + edge[-1]["cost"] = compute.CloudProvider.get_transfer_cost(edge[0], edge[1]) + + assert all([edge[-1]["cost"] is not None for edge in G.edges.data()]) + return G + + def add_src_or_overlay_operator( + self, + solution_graph: nx.DiGraph, + gateway_program: GatewayProgram, + region: str, + partition_ids: List[int], + partition_offset: int, + plan: TopologyPlan, + bucket_info: Optional[Tuple[str, str]] = None, + dst_op: Optional[GatewayReceive] = None, + ) -> bool: + """ + :param solution_graph: nx.DiGraph of solution + :param gateway_program: GatewayProgram of region to add operator to + :param region: region to add operator to + :param partition_ids: list of partition ids to add operator to + :param partition_offset: offset of partition ids + :param plan: TopologyPlan of solution [for getting gateway ids] + :param bucket_info: tuple of (bucket_name, bucket_region) for object store + :param dst_op: if None, then this is either the source node or an overlay node; otherwise, this is the destination overlay node + """ + # partition_ids are set of ids that follow the same path from the out edges of the region + any_id = partition_ids[0] - partition_offset + next_regions = set([edge[1] for edge in solution_graph.out_edges(region, data=True) if str(any_id) in edge[-1]["partitions"]]) + + # if partition_ids does not have a next region, then we cannot add an operator + if len(next_regions) == 0: + print( + f"Region {region}, any id: {any_id}, partition ids: {partition_ids}, has no next region to forward data to: {solution_graph.out_edges(region, data=True)}" ) - mux_or = src_program.add_operator(GatewayMuxOr(), parent_handle=obj_store_read, partition_id=partition_id) - for i in range(self.n_instances): - src_program.add_operator( - GatewaySend(target_gateway_id=dst_gateways[i].gateway_id, region=src_region_tag, num_connections=self.n_connections), - parent_handle=mux_or, - partition_id=partition_id, + return + + # identify if this is a destination overlay node or not + if dst_op is None: + # source node or overlay node + # TODO: add generate data locally operator + if bucket_info is None: + receive_op = GatewayReceive() + else: + receive_op = GatewayReadObjectStore( + bucket_name=bucket_info[0], bucket_region=bucket_info[1], num_connections=self.n_connections ) - - # dst region gateway program - recv_op = dst_program.add_operator(GatewayReceive(), partition_id=partition_id) - dst_program.add_operator( - GatewayWriteObjectStore(dst_bucket, dst_region_tag, self.n_connections), parent_handle=recv_op, partition_id=partition_id + else: + # destination overlay node, dst_op is the parent node + receive_op = dst_op + + # find set of regions to send to for all partitions in partition_ids + g = solution_graph + region_to_id_map = {} + for next_region in next_regions: + region_to_id_map[next_region] = [] + for i in
range(solution_graph.nodes[next_region]["num_vms"]): + region_to_id_map[next_region].append(plan.get_region_gateways(next_region)[i].gateway_id) + + # use muxand or muxor for partition_id + operation = "MUX_AND" if len(next_regions) > 1 else "MUX_OR" + mux_op = GatewayMuxAnd() if len(next_regions) > 1 else GatewayMuxOr() + + # non-dst node: add receive_op into gateway program + if dst_op is None: + gateway_program.add_operator(op=receive_op, partition_id=tuple(partition_ids)) + + # MUX_AND: send this partition to multiple regions + if operation == "MUX_AND": + if dst_op is not None and dst_op.op_type == "mux_and": + mux_op = receive_op + else: # do not add any nested mux_and if dst_op parent is mux_and + gateway_program.add_operator(op=mux_op, parent_handle=receive_op.handle, partition_id=tuple(partition_ids)) + + for next_region, next_region_ids in region_to_id_map.items(): + send_ops = [ + GatewaySend(target_gateway_id=id, region=next_region, num_connections=self.n_connections) for id in next_region_ids + ] + + # if there is more than one region to forward data to, add MUX_OR + if len(next_region_ids) > 1: + mux_or_op = GatewayMuxOr() + gateway_program.add_operator(op=mux_or_op, parent_handle=mux_op.handle, partition_id=tuple(partition_ids)) + gateway_program.add_operators(ops=send_ops, parent_handle=mux_or_op.handle, partition_id=tuple(partition_ids)) + else: + # otherwise, the parent of send_op is mux_op ("MUX_AND") + assert len(send_ops) == 1 + gateway_program.add_operator(op=send_ops[0], parent_handle=mux_op.handle, partition_id=tuple(partition_ids)) + else: + # only send this partition to a single region + assert len(region_to_id_map) == 1 + next_region = list(region_to_id_map.keys())[0] + ids = [id for next_region_ids in region_to_id_map.values() for id in next_region_ids] + send_ops = [GatewaySend(target_gateway_id=id, region=next_region, num_connections=self.n_connections) for id in ids] + + # if num of gateways > 1, then connect to MUX_OR + if len(ids) > 1: + gateway_program.add_operator(op=mux_op, parent_handle=receive_op.handle, partition_id=tuple(partition_ids)) + gateway_program.add_operators(ops=send_ops, parent_handle=mux_op.handle) + else: + gateway_program.add_operators(ops=send_ops, parent_handle=receive_op.handle, partition_id=tuple(partition_ids)) + + return True + + def add_dst_operator( + self, + solution_graph, + gateway_program: GatewayProgram, + region: str, + partition_ids: List[int], + partition_offset: int, + plan: TopologyPlan, + obj_store: Tuple[str, str] = None, + ): + # operator that receives data + receive_op = GatewayReceive() + gateway_program.add_operator(receive_op, partition_id=tuple(partition_ids)) + + # operator that writes to the object store + write_op = GatewayWriteObjectStore(bucket_name=obj_store[0], bucket_region=obj_store[1], num_connections=self.n_connections) + + g = solution_graph + + # partition_ids are ids that follow the same path from the out edges of the region + any_id = partition_ids[0] - partition_offset + next_regions = set([edge[1] for edge in g.out_edges(region, data=True) if str(any_id) in edge[-1]["partitions"]]) + + # if no regions to forward data to, write to the object store + if len(next_regions) == 0: + gateway_program.add_operator(write_op, parent_handle=receive_op.handle, partition_id=tuple(partition_ids)) + + # otherwise, receive and write to the object store, then forward data to next regions + else: + mux_and_op = GatewayMuxAnd() + # receive and write + gateway_program.add_operator(mux_and_op, 
parent_handle=receive_op.handle, partition_id=tuple(partition_ids)) + gateway_program.add_operator(write_op, parent_handle=mux_and_op.handle, partition_id=tuple(partition_ids)) + + # forward: destination nodes are also forwarders + self.add_src_or_overlay_operator( + solution_graph, gateway_program, region, partition_ids, partition_offset, plan, dst_op=mux_and_op ) - # update cost per GB - plan.cost_per_gb += compute.CloudProvider.get_transfer_cost(src_region_tag, dst_region_tag) + def logical_plan_to_topology_plan(self, jobs: List[TransferJob], solution_graph: nx.DiGraph) -> TopologyPlan: + """ + Given a logical plan, construct a gateway program for each region in the logical plan for the given jobs. + """ + # get source and destination regions + src_ifaces, dst_ifaces = [job.src_iface for job in jobs], [job.dst_ifaces for job in jobs] + src_region_tag = src_ifaces[0].region_tag() + dst_region_tags = [dst_iface.region_tag() for dst_iface in dst_ifaces[0]] + + # map from the node to the gateway program + region_to_gateway_program = {region: GatewayProgram() for region in solution_graph.nodes} + + # construct TopologyPlan for all the regions in solution_graph + overlay_region_tags = [node for node in solution_graph.nodes if node != src_region_tag and node not in dst_region_tags] + plan = TopologyPlan(src_region_tag=src_region_tag, dest_region_tags=dst_region_tags, overlay_region_tags=overlay_region_tags) + for node in solution_graph.nodes: + plan.add_gateway(node) + + # iterate through all the jobs + for i in range(len(src_ifaces)): + src_bucket = src_ifaces[i].bucket() + dst_buckets = {iface.region_tag(): iface.bucket() for iface in dst_ifaces[i]} + + # iterate through all the regions in the solution graph + for node in solution_graph.nodes: + node_gateway_program = region_to_gateway_program[node] + partition_to_next_regions = {} + + # give each job a different partition offset i, so we can read/write to different buckets + for j in range(i, i + self.n_partitions): + partition_to_next_regions[j] = set( + [edge[1] for edge in solution_graph.out_edges(node, data=True) if str(j) in edge[-1]["partitions"]] + ) + keys_per_set = collections.defaultdict(list) + for key, value in partition_to_next_regions.items(): + keys_per_set[frozenset(value)].append(key) + + list_of_partitions = list(keys_per_set.values()) + + # source node: read from object store or generate random data, then forward data + for partitions in list_of_partitions: + if node == src_region_tag: + self.add_src_or_overlay_operator( + solution_graph, + node_gateway_program, + node, + partitions, + partition_offset=i, + plan=plan, + bucket_info=(src_bucket, node), + ) + + # dst receive data, write to object store, forward data if needed + elif node in dst_region_tags: + dst_bucket = dst_buckets[node] + self.add_dst_operator( + solution_graph, + node_gateway_program, + node, + partitions, + partition_offset=i, + plan=plan, + obj_store=(dst_bucket, node), + ) + + # overlay node only forward data + else: + self.add_src_or_overlay_operator( + solution_graph, node_gateway_program, node, partitions, partition_offset=i, plan=plan, bucket_info=None + ) + region_to_gateway_program[node] = node_gateway_program + assert len(region_to_gateway_program) > 0, f"Empty gateway program {node}" + + for node in solution_graph.nodes: + plan.set_gateway_program(node, region_to_gateway_program[node]) + + for edge
in solution_graph.edges.data(): + src_region, dst_region = edge[0], edge[1] + plan.cost_per_gb += compute.CloudProvider.get_transfer_cost(src_region, dst_region) * ( + len(edge[-1]["partitions"]) / self.n_partitions + ) return plan class MulticastDirectPlanner(Planner): - def __init__(self, n_instances: int, n_connections: int): - self.n_instances = n_instances - self.n_connections = n_connections - super().__init__() + def __init__(self, n_instances: int, n_connections: int, n_partitions: Optional[int] = 1): + super().__init__(n_instances, n_connections, n_partitions) + + def logical_plan(self, src_region: str, dst_regions: List[str]) -> nx.DiGraph: + graph = nx.DiGraph() + graph.add_node(src_region) + for dst_region in dst_regions: + graph.add_node(dst_region) + graph.add_edge(src_region, dst_region, partitions=[str(i) for i in range(self.n_partitions)]) + + for node in graph.nodes: + graph.nodes[node]["num_vms"] = self.n_instances + return graph def plan(self, jobs: List[TransferJob]) -> TopologyPlan: - src_region_tag = jobs[0].src_iface.region_tag() - dst_region_tags = [iface.region_tag() for iface in jobs[0].dst_ifaces] - # jobs must have same sources and destinations - for job in jobs[1:]: - assert job.src_iface.region_tag() == src_region_tag, "All jobs must have same source region" - assert [iface.region_tag() for iface in job.dst_ifaces] == dst_region_tags, "Add jobs must have same destination set" - - plan = TopologyPlan(src_region_tag=src_region_tag, dest_region_tags=dst_region_tags) - # TODO: use VM limits to determine how many instances to create in each region - # TODO: support on-sided transfers but not requiring VMs to be created in source/destination regions - for i in range(self.n_instances): - plan.add_gateway(src_region_tag) - for dst_region_tag in dst_region_tags: - plan.add_gateway(dst_region_tag) - - # initialize gateway programs per region - dst_program = {dst_region: GatewayProgram() for dst_region in dst_region_tags} - src_program = GatewayProgram() - - # iterate through all jobs - for job in jobs: - src_bucket = job.src_iface.bucket() - src_region_tag = job.src_iface.region_tag() - src_provider = src_region_tag.split(":")[0] - - # give each job a different partition id, so we can read/write to different buckets - partition_id = jobs.index(job) - - # source region gateway program - obj_store_read = src_program.add_operator( - GatewayReadObjectStore(src_bucket, src_region_tag, self.n_connections), partition_id=partition_id + src_region_tag, dst_region_tags = self.verify_job_src_dsts(jobs) + solution_graph = self.logical_plan(src_region_tag, dst_region_tags) + return self.logical_plan_to_topology_plan(jobs, solution_graph) + + +class MulticastILPPlanner(Planner): + def __init__( + self, + n_instances: int, + n_connections: int, + target_time: Optional[float] = 10000, + n_partitions: Optional[int] = 1, + aws_only: bool = False, + gcp_only: bool = False, + azure_only: bool = False, + ): + super().__init__(n_instances, n_connections, n_partitions) + self.target_time = target_time + self.aws_only = aws_only + self.gcp_only = gcp_only + self.azure_only = azure_only + self.G = super().make_nx_graph() + + def multicast_solution_to_nxgraph(self, solution: BroadcastSolution) -> nx.DiGraph: + """ + Convert ILP solution to logical plan in nx graph + """ + v_result = solution.var_instances_per_region + result = np.array(solution.var_edge_partitions) + result_g = nx.DiGraph() # solution nx graph + for i in range(result.shape[0]): + edge = solution.var_edges[i] + partitions = 
[str(partition_i) for partition_i in range(result.shape[1]) if result[i][partition_i] > 0.5] + + if len(partitions) == 0: + continue + + src_node, dst_node = edge[0], edge[1] + result_g.add_edge( + src_node, + dst_node, + partitions=partitions, + throughput=self.G[src_node][dst_node]["throughput"], + cost=self.G[src_node][dst_node]["cost"], ) - # send to all destination - mux_and = src_program.add_operator(GatewayMuxAnd(), parent_handle=obj_store_read, partition_id=partition_id) - dst_prefixes = job.dst_prefixes - for i in range(len(job.dst_ifaces)): - dst_iface = job.dst_ifaces[i] - dst_prefix = dst_prefixes[i] - dst_region_tag = dst_iface.region_tag() - dst_bucket = dst_iface.bucket() - dst_gateways = plan.get_region_gateways(dst_region_tag) - - # special case where destination is same region as source - if dst_region_tag == src_region_tag: - src_program.add_operator( - GatewayWriteObjectStore(dst_bucket, dst_region_tag, self.n_connections, key_prefix=dst_prefix), - parent_handle=mux_and, - partition_id=partition_id, - ) - continue - - # can send to any gateway in region - mux_or = src_program.add_operator(GatewayMuxOr(), parent_handle=mux_and, partition_id=partition_id) - for i in range(self.n_instances): - private_ip = False - if dst_gateways[i].provider == "gcp" and src_provider == "gcp": - # print("Using private IP for GCP to GCP transfer", src_region_tag, dst_region_tag) - private_ip = True - src_program.add_operator( - GatewaySend( - target_gateway_id=dst_gateways[i].gateway_id, - region=dst_region_tag, - num_connections=int(self.n_connections / len(dst_gateways)), - private_ip=private_ip, - ), - parent_handle=mux_or, - partition_id=partition_id, - ) - # each gateway also recieves data from source - recv_op = dst_program[dst_region_tag].add_operator(GatewayReceive(), partition_id=partition_id) - dst_program[dst_region_tag].add_operator( - GatewayWriteObjectStore(dst_bucket, dst_region_tag, self.n_connections, key_prefix=dst_prefix), - parent_handle=recv_op, - partition_id=partition_id, - ) + for i in range(len(v_result)): + num_vms = int(v_result[i]) + node = solution.var_nodes[i] + if node in result_g.nodes: + result_g.nodes[node]["num_vms"] = num_vms + + return result_g + + def logical_plan( + self, + src_region: str, + dst_regions: List[str], + gbyte_to_transfer: int = 1, + filter_node: bool = False, + filter_edge: bool = False, + solver=None, + solver_verbose: bool = False, + save_lp_path: Optional[str] = None, + ) -> nx.DiGraph: + import cvxpy as cp + + if solver is None: + solver = cp.GUROBI + + problem = BroadcastProblem( + src=src_region, + dsts=dst_regions, + gbyte_to_transfer=gbyte_to_transfer, + instance_limit=self.n_instances, + num_partitions=self.n_partitions, + required_time_budget=self.target_time, + ) + + g = self.G + + # node-approximation + if filter_node: + src_dst_li = [problem.src] + problem.dsts + sampled = [i for i in sample(list(self.G.nodes), 15) if i not in src_dst_li] + g = g.subgraph(src_dst_li + sampled).copy() + print(f"Filter node (only use): {src_dst_li + sampled}") + + cost = np.array([e[2] for e in g.edges(data="cost")]) + tp = np.array([e[2] for e in g.edges(data="throughput")]) + + edges = list(g.edges) + nodes = list(g.nodes) + num_edges, num_nodes = len(edges), len(nodes) + num_dest = len(problem.dsts) + print(f"Num edges: {num_edges}, num nodes: {num_nodes}, num dest: {num_dest}, runtime budget: {problem.required_time_budget}s") + + partition_size_gb = problem.gbyte_to_transfer / problem.num_partitions + partition_size_gbit = partition_size_gb * GBIT_PER_GBYTE +
print("Partition size (gbit): ", partition_size_gbit) + + # define variables + p = cp.Variable((num_edges, problem.num_partitions), boolean=True) # whether edge is carrying partition + n = cp.Variable((num_nodes), boolean=True) # whether node transfers partition + f = cp.Variable((num_nodes * problem.num_partitions, num_nodes + 1), integer=True) # enforce flow conservation + v = cp.Variable((num_nodes), integer=True) # number of VMs per region + + # define objective + egress_cost = cp.sum(cost @ p) * partition_size_gb + instance_cost = cp.sum(v) * (problem.cost_per_instance_hr / 3600) * problem.required_time_budget + tot_cost = egress_cost + instance_cost + obj = cp.Minimize(tot_cost) + + # define constants + constraints = [] + + # constraints on VM per region + for i in range(num_nodes): + constraints.append(v[i] <= problem.instance_limit) + constraints.append(v[i] >= 0) + + # constraints to enforce flow between source/dest nodes + for c in range(problem.num_partitions): + for i in range(num_nodes): + for j in range(num_nodes + 1): + if i != j: + if j != num_nodes: + edge = (nodes[i], nodes[j]) + + constraints.append(f[c * num_nodes + i][j] <= p[edges.index(edge)][c] * num_dest) + # p = 0 -> f <= 0 + # p = 1 -> f <= num_dest + constraints.append(f[c * num_nodes + i][j] >= (p[edges.index(edge)][c] - 1) * (num_dest + 1) + 1) + # p = 0 -> f >= -(num_dest) + # p = 1 -> f >= 1 + + constraints.append(f[c * num_nodes + i][j] == -f[c * num_nodes + j][i]) + + # capacity constraint for special node + else: + if nodes[i] in problem.dsts: # only connected to destination nodes + constraints.append(f[c * num_nodes + i][j] <= 1) + else: + constraints.append(f[c * num_nodes + i][j] <= 0) + else: + constraints.append(f[c * num_nodes + i][i] == 0) + + # flow conservation + if nodes[i] != problem.src and i != num_nodes + 1: + constraints.append(cp.sum(f[c * num_nodes + i]) == 0) + + # source must have outgoing flow + constraints.append(cp.sum(f[c * num_nodes + nodes.index(problem.src), :]) == num_dest) + + # special node (connected to all destinations) must recieve all flow + constraints.append(cp.sum(f[c * num_nodes : (c + 1) * num_nodes, -1]) == num_dest) + + # node contained if edge is contained + for edge in edges: + constraints.append(n[nodes.index(edge[0])] >= cp.max(p[edges.index(edge)])) + constraints.append(n[nodes.index(edge[1])] >= cp.max(p[edges.index(edge)])) + + # edge approximation + if filter_edge: + for edge in edges: + if edge[0] != problem.src and edge[1] not in problem.dsts: + # cannot be in graph + constraints.append(p[edges.index(edge)] == 0) + + # throughput constraint + for edge_i in range(num_edges): + node_i = nodes.index(edge[0]) + constraints.append(cp.sum(p[edge_i] * partition_size_gbit) <= problem.required_time_budget * tp[edge_i] * v[node_i]) + + # instance limits + for node in nodes: + region = node.split(":")[0] + if region == "aws": + ingress_limit_gbps, egress_limit_gbps = problem.aws_instance_throughput_limit + elif region == "gcp": + ingress_limit_gbps, egress_limit_gbps = problem.gcp_instance_throughput_limit + elif region == "azure": + ingress_limit_gbps, egress_limit_gbps = problem.azure_instance_throughput_limit + elif region == "cloudflare": # TODO: not supported yet in the tput / cost graph + ingress_limit_gbps, egress_limit_gbps = 1, 1 + + node_i = nodes.index(node) + # egress + i = np.zeros(num_edges) + for e in g.edges: + if e[0] == node: # edge goes to dest + i[edges.index(e)] = 1 + + constraints.append(cp.sum(i @ p) * partition_size_gbit <= 
problem.required_time_budget * egress_limit_gbps * v[node_i]) + + # ingress + i = np.zeros(num_edges) + for e in g.edges: + # edge goes to dest + if e[1] == node: + i[edges.index(e)] = 1 + constraints.append(cp.sum(i @ p) * partition_size_gbit <= problem.required_time_budget * ingress_limit_gbps * v[node_i]) + + print("Define problem done.") + + # solve + prob = cp.Problem(obj, constraints) + if solver == cp.GUROBI or solver == "gurobi": + solver_options = {} + solver_options["Threads"] = 1 + if save_lp_path: + solver_options["ResultFile"] = str(save_lp_path) + if not solver_verbose: + solver_options["OutputFlag"] = 0 + cost = prob.solve(verbose=solver_verbose, qcp=True, solver=cp.GUROBI, reoptimize=True, **solver_options) + elif solver == cp.CBC or solver == "cbc": + solver_options = {} + solver_options["maximumSeconds"] = 60 + solver_options["numberThreads"] = 1 + cost = prob.solve(verbose=solver_verbose, solver=cp.CBC, **solver_options) + else: + cost = prob.solve(solver=solver, verbose=solver_verbose) + + if prob.status == "optimal": + solution = BroadcastSolution( + problem=problem, + is_feasible=True, + var_edges=edges, + var_nodes=nodes, + var_edge_partitions=p.value, + var_node_transfer_partitions=n.value, + var_instances_per_region=v.value, + var_flow=f.value, + cost_egress=egress_cost.value, + cost_instance=instance_cost.value, + cost_total=tot_cost.value, + ) + else: + solution = BroadcastSolution(problem=problem, is_feasible=False, extra_data=dict(status=prob.status)) - # update cost per GB - plan.cost_per_gb += compute.CloudProvider.get_transfer_cost(src_region_tag, dst_region_tag) + return self.multicast_solution_to_nxgraph(solution) - # set gateway programs - plan.set_gateway_program(src_region_tag, src_program) - for dst_region_tag, program in dst_program.items(): - if dst_region_tag != src_region_tag: # don't overwrite - plan.set_gateway_program(dst_region_tag, program) - return plan + def plan(self, jobs: List[TransferJob]) -> TopologyPlan: + src_region_tag, dst_region_tags = self.verify_job_src_dsts(jobs, multicast=True) + solution_graph = self.logical_plan(src_region_tag, dst_region_tags) + return self.logical_plan_to_topology_plan(jobs, solution_graph) -class UnicastILPPlanner(Planner): - def __init__(self, n_instances: int, n_connections: int, required_throughput_gbits: float): - self.n_instances = n_instances - self.n_connections = n_connections - self.solver_required_throughput_gbits = required_throughput_gbits - super().__init__() +class MulticastMDSTPlanner(Planner): + def __init__(self, n_instances: int, n_connections: int, n_partitions: Optional[int] = 1): + super().__init__(n_instances, n_connections, n_partitions) + self.G = super().make_nx_graph() + + def logical_plan(self, src_region: str, dst_regions: List[str]) -> nx.DiGraph: + h = self.G.copy() + h.remove_edges_from(list(h.in_edges(src_region)) + list(nx.selfloop_edges(h))) + + DST_graph = nx.algorithms.tree.Edmonds(h.subgraph([src_region] + dst_regions)) + opt_DST = DST_graph.find_optimum(attr="cost", kind="min", preserve_attrs=True, style="arborescence") + + # Construct MDST graph + MDST_graph = nx.DiGraph() + for edge in list(opt_DST.edges()): + s, d = edge[0], edge[1] + MDST_graph.add_edge(s, d, partitions=[str(i) for i in list(range(self.n_partitions))]) + + for node in MDST_graph.nodes: + MDST_graph.nodes[node]["num_vms"] = self.n_instances + + return MDST_graph def plan(self, jobs: List[TransferJob]) -> TopologyPlan: - raise NotImplementedError("ILP solver not implemented yet") + src_region_tag,
dst_region_tags = self.verify_job_src_dsts(jobs, multicast=True) + solution_graph = self.logical_plan(src_region_tag, dst_region_tags) + return self.logical_plan_to_topology_plan(jobs, solution_graph) -class MulticastILPPlanner(Planner): - def __init__(self, n_instances: int, n_connections: int, required_throughput_gbits: float): - self.n_instances = n_instances - self.n_connections = n_connections - self.solver_required_throughput_gbits = required_throughput_gbits - super().__init__() +class UnicastDirectPlanner(Planner): + def __init__(self, n_instances: int, n_connections: int, n_partitions: Optional[int] = 1): + super().__init__(n_instances, n_connections, n_partitions) + + def logical_plan(self, src_region: str, dst_regions: List[str]) -> nx.DiGraph: + graph = nx.DiGraph() + graph.add_node(src_region) + for dst_region in dst_regions: + graph.add_node(dst_region) + graph.add_edge(src_region, dst_region, partitions=[str(i) for i in range(self.n_partitions)]) + + for node in graph.nodes: + graph.nodes[node]["num_vms"] = self.n_instances + + return graph def plan(self, jobs: List[TransferJob]) -> TopologyPlan: - raise NotImplementedError("ILP solver not implemented yet") + src_region_tag, dst_region_tag = self.verify_job_src_dsts(jobs) + solution_graph = self.logical_plan(src_region_tag, dst_region_tag) + return self.logical_plan_to_topology_plan(jobs, solution_graph) -class MulticastMDSTPlanner(Planner): - def __init__(self, n_instances: int, n_connections: int): - self.n_instances = n_instances - self.n_connections = n_connections - super().__init__() +class UnicastILPPlanner(Planner): + def __init__(self, n_instances: int, n_connections: int, required_throughput_gbits: float, n_partitions: Optional[int] = 1): + super().__init__(n_instances, n_connections, n_partitions) + self.solver_required_throughput_gbits = required_throughput_gbits + + def logical_plan(self, src_region: str, dst_regions: List[str]) -> nx.DiGraph: + problem = ThroughputProblem( + src=src_region, + dst=dst_regions, + required_throughput_gbits=self.solver_required_throughput_gbits, + gbyte_to_transfer=1, + instance_limit=self.n_instances, + ) + + with path("skyplane.data", "throughput.csv") as solver_throughput_grid: + tput = ThroughputSolverILP(solver_throughput_grid) + solution = tput.solve_min_cost(problem) + + if not solution.is_feasible: + raise RuntimeError("No feasible solution found") + + return tput.to_replication_topology(solution) def plan(self, jobs: List[TransferJob]) -> TopologyPlan: - raise NotImplementedError("MDST solver not implemented yet") + src_region_tag, dst_region_tag = self.verify_job_src_dsts(jobs) + solution_graph = self.logical_plan(src_region_tag, dst_region_tag) + return self.logical_plan_to_topology_plan(jobs, solution_graph) -class MulticastSteinerTreePlanner(Planner): - def __init__(self, n_instances: int, n_connections: int): - self.n_instances = n_instances - self.n_connections = n_connections - super().__init__() +class UnicastRONSolverPlanner(Planner): + def __init__(self, n_instances: int, n_connections: int, required_throughput_gbits: float, n_partitions: Optional[int] = 1): + super().__init__(n_instances, n_connections, n_partitions) + self.solver_required_throughput_gbits = required_throughput_gbits + + def logical_plan(self, src_region: str, dst_regions: List[str]) -> nx.DiGraph: + raise NotImplementedError("RON solver not implemented yet") def plan(self, jobs: List[TransferJob]) -> TopologyPlan: - raise NotImplementedError("Steiner tree solver not implemented yet") + raise 
NotImplementedError("RON solver not implemented yet") \ No newline at end of file diff --git a/skyplane/planner/solver.py b/skyplane/planner/solver.py index 5a18f1ed4..121332bf2 100644 --- a/skyplane/planner/solver.py +++ b/skyplane/planner/solver.py @@ -15,6 +15,86 @@ GBIT_PER_GBYTE = 8 +@dataclass +class BroadcastProblem: + src: str + dsts: List[str] + + gbyte_to_transfer: float + instance_limit: int # max # of vms per region + num_partitions: int + + required_time_budget: float = 10 # ILP specific, default to 10s + + const_throughput_grid_gbits: Optional[np.ndarray] = None # if not set, load from profiles + const_cost_per_gb_grid: Optional[np.ndarray] = None # if not set, load from profiles + + # provider bandwidth limits (ingress, egress) + aws_instance_throughput_limit: Tuple[float, float] = (10, 5) + gcp_instance_throughput_limit: Tuple[float, float] = (16, 7) # limited to 12.5 gbps due to CPU limit + azure_instance_throughput_limit: Tuple[float, float] = (16, 16) # limited to 12.5 gbps due to CPU limit + + # benchmarked_throughput_connections is the number of connections that the iperf3 throughput grid was run at, + # we assume throughput is linear up to this connection limit + benchmarked_throughput_connections = 64 + cost_per_instance_hr = 0.54 # based on m5.8xlarge spot + instance_cost_multiplier = 1.0 + # instance_provision_time_s = 0.0 + + def to_summary_dict(self): + """Simple summary of the problem""" + return { + "src": self.src, + "dsts": self.dsts, + "gbyte_to_transfer": self.gbyte_to_transfer, + "instance_limit": self.instance_limit, + "num_partitions": self.num_partitions, + "required_time_budget": self.required_time_budget, + "aws_instance_throughput_limit": self.aws_instance_throughput_limit, + "gcp_instance_throughput_limit": self.gcp_instance_throughput_limit, + "azure_instance_throughput_limit": self.azure_instance_throughput_limit, + "benchmarked_throughput_connections": self.benchmarked_throughput_connections, + "cost_per_instance_hr": self.cost_per_instance_hr, + "instance_cost_multiplier": self.instance_cost_multiplier + # "instance_provision_time_s": self.instance_provision_time_s, + } + + +@dataclass +class BroadcastSolution: + problem: BroadcastProblem + is_feasible: bool + extra_data: Optional[Dict] = None + + var_edges: Optional[List] = None # need to fix this, just for testing + var_nodes: Optional[List] = None # need to fix this, just for testing + + # solution variables + var_edge_partitions: Optional[np.ndarray] = None # each edge carries each partition or not + var_node_transfer_partitions: Optional[np.ndarray] = None # whether node transfers partition + var_instances_per_region: Optional[np.ndarray] = None # number of VMs per region + var_flow: Optional[np.ndarray] = None # enforce flow conservation, just used for checking + + # solution values + cost_egress: Optional[float] = None + cost_instance: Optional[float] = None + cost_total: Optional[float] = None + transfer_runtime_s: Optional[float] = None # NOTE: might not be able to calculate here + throughput_achieved_gbits: Optional[List[float]] = None # NOTE: might not be able to calculate here + + def to_summary_dict(self): + """Print simple summary of solution.""" + return { + "is_feasible": self.is_feasible, + "solution": { + "cost_egress": self.cost_egress, + "cost_instance": self.cost_instance, + "cost_total": self.cost_total, + "time_budget": self.problem.required_time_budget, + }, + } + + @dataclass class ThroughputProblem: src: str @@ -348,4 +428,4 @@ def to_replication_topology(self, 
solution: ThroughputSolution, scale_to_capacit ) obj_store_edges.add(("dst", e.dst_region, e.dst_instance_idx)) - return replication_topology, scale_factor + return replication_topology, scale_factor \ No newline at end of file diff --git a/skyplane/utils/path.py b/skyplane/utils/path.py index ea23e781b..c607601ee 100644 --- a/skyplane/utils/path.py +++ b/skyplane/utils/path.py @@ -1,7 +1,6 @@ import re from pathlib import Path from typing import Optional, Tuple - from skyplane.utils import logger @@ -62,3 +61,14 @@ def is_plausible_local_path(path_test: str): # path is subsitutute for bucket return "local", path, path + + +def parse_multi_paths(paths): + if isinstance(paths, str): + paths = [paths] + + parsed_paths = [] + for path in paths: + p = parse_path(path) + parsed_paths.append(p) + return list(zip(*parsed_paths)) \ No newline at end of file
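For reference, a minimal standalone sketch of the transpose-and-join pattern the new multi-destination helpers rely on (`parse_multi_paths` in `skyplane/utils/path.py` and the comma-joined provider/region strings built in `skyplane/api/usage.py`). The `parse_path` stand-in and the example URIs and region tags below are illustrative assumptions, not the Skyplane implementations:

```python
# Standalone sketch (stdlib only); mirrors the zip(*...) transpose used by
# parse_multi_paths and the comma-joined destination tags in usage.py.
from typing import List, Tuple


def parse_path(path: str) -> Tuple[str, str, str]:
    # Simplified stand-in for skyplane.utils.path.parse_path: treat the URI
    # scheme as the provider and the first path component as the bucket.
    provider, _, rest = path.partition("://")
    bucket, _, key = rest.partition("/")
    return provider, bucket, key


def parse_multi_paths(paths: List[str]) -> Tuple[Tuple[str, ...], ...]:
    # zip(*...) transposes [(provider, bucket, key), ...] into
    # (providers, buckets, keys), as the new helper in path.py does.
    return tuple(zip(*[parse_path(p) for p in paths]))


if __name__ == "__main__":
    providers, buckets, keys = parse_multi_paths(["s3://bucket-a/prefix", "gs://bucket-b/prefix"])
    print(providers)  # ('s3', 'gs')
    print(buckets)    # ('bucket-a', 'bucket-b')

    # usage.py reports multiple destinations as comma-joined provider/region strings:
    dest_region_tags = ["aws:us-east-1", "gcp:us-central1-a"]
    provs, regions = zip(*[tag.split(":") for tag in dest_region_tags])
    print(",".join(provs), ",".join(regions))  # aws,gcp us-east-1,us-central1-a
```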