From 006693f67b17258518d50478554d36eb12954f68 Mon Sep 17 00:00:00 2001 From: phoenixdong Date: Tue, 11 Jun 2024 12:01:54 +0800 Subject: [PATCH] add format check --- .github/workflows/format.yml | 32 +++++++ .pre-commit-config.yaml | 19 ++++ flagscale/auto_tuner/generate.py | 12 +-- flagscale/auto_tuner/prune/history.py | 75 +++++++++------ flagscale/auto_tuner/prune/pruner.py | 1 + flagscale/auto_tuner/record/recorder.py | 116 ++++++++++++++--------- flagscale/auto_tuner/search/algorithm.py | 12 ++- flagscale/auto_tuner/search/searcher.py | 6 +- flagscale/auto_tuner/tuner.py | 77 ++++++++------- flagscale/datasets/sft_dataset.py | 56 +++++++---- flagscale/logger.py | 11 ++- flagscale/patches_utils.py | 9 +- 12 files changed, 281 insertions(+), 145 deletions(-) create mode 100644 .github/workflows/format.yml create mode 100644 .pre-commit-config.yaml diff --git a/.github/workflows/format.yml b/.github/workflows/format.yml new file mode 100644 index 000000000..85d2d9875 --- /dev/null +++ b/.github/workflows/format.yml @@ -0,0 +1,32 @@ +name: format + +on: + pull_request: + branches: [ "main" ] + types: [opened, synchronize, reopened] + +jobs: + format: + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v2 + - name: Set up Python 3.10 + uses: actions/setup-python@v2 + with: + python-version: "3.10" + - name: Install dependencies + run: | + pip install black + - name: Run Black + run: >- + black --check --diff --include + flagscale/auto_tuner/*.py + flagscale/auto_tuner/prune/*.py + flagscale/auto_tuner/record/*.py + flagscale/auto_tuner/search/*.py + flagscale/launcher/*.py + flagscale/logger.py + flagscale/patches_utils.py + flagscale/datasets/sft_dataset.py + ./ \ No newline at end of file diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml new file mode 100644 index 000000000..2c77b8d4e --- /dev/null +++ b/.pre-commit-config.yaml @@ -0,0 +1,19 @@ +repos: + - repo: local + hooks: + - id: black + name: black + entry: black + language: system + types: [python] + files: | + (?x)^( + flagscale/auto_tuner/.*\.py| + flagscale/auto_tuner/prune/\..*\.py| + flagscale/auto_tuner/record/\..*\.py| + flagscale/auto_tuner/search/\..*\.py| + flagscale/launcher/\..*\.py| + flagscale/logger\.py| + flagscale/patches_utils\.py| + flagscale/datasets/sft_dataset\.py + )$ \ No newline at end of file diff --git a/flagscale/auto_tuner/generate.py b/flagscale/auto_tuner/generate.py index 18c0cd6b2..3baa32f44 100644 --- a/flagscale/auto_tuner/generate.py +++ b/flagscale/auto_tuner/generate.py @@ -16,8 +16,7 @@ def __init__(self, config): "tensor_model_parallel_size": "tensor_model_parallel_size", "sequence_parallel": "sequence_parallel", "pipeline_model_parallel_size": "pipeline_model_parallel_size", - "num_layers_per_virtual_pipeline_stage": - "num_layers_per_virtual_pipeline_stage", + "num_layers_per_virtual_pipeline_stage": "num_layers_per_virtual_pipeline_stage", "recompute_method": "recompute_method", "recompute_granularity": "recompute_granularity", "recompute_num_layers": "recompute_num_layers", @@ -81,14 +80,15 @@ def gen(self, strategy): # Set train_iters of each task if "control" in config.experiment.auto_tuner: config.train.model.train_iters = config.experiment.auto_tuner.control.get( - "train_iters", 5) + "train_iters", 5 + ) else: config.train.model.train_iters = 5 # log dir - config.experiment.exp_dir = os.path.join(config.experiment.exp_dir, - "auto_tuner", - f"task_{strategy['idx']}") + config.experiment.exp_dir = os.path.join( + config.experiment.exp_dir, "auto_tuner", 
f"task_{strategy['idx']}" + ) return config diff --git a/flagscale/auto_tuner/prune/history.py b/flagscale/auto_tuner/prune/history.py index 5359d7a5a..a0bed9ead 100644 --- a/flagscale/auto_tuner/prune/history.py +++ b/flagscale/auto_tuner/prune/history.py @@ -26,8 +26,7 @@ def prune_by_micro_batch_size(config, strategy, history=[]): if retrieval: for item in retrieval: # performance prune - if item["micro_batch_size"] > micro_batch_size and item[ - "performance"]: + if item["micro_batch_size"] > micro_batch_size and item["performance"]: logger.info( f"The strategy {strategy} has been pruned by micro_batch_size performance." ) @@ -36,8 +35,7 @@ def prune_by_micro_batch_size(config, strategy, history=[]): strategy["pruned"] = True return True # memory prune - if item["micro_batch_size"] < micro_batch_size and item[ - "max_mem"] == "OOM": + if item["micro_batch_size"] < micro_batch_size and item["max_mem"] == "OOM": logger.info( f"The strategy {strategy} has been pruned by micro_batch_size memory." ) @@ -91,10 +89,13 @@ def prune_by_recompute(config, strategy, history=[]): strategy["pruned"] = True return True - if (use_recompute and item["use_recompute"] - and recompute_method == "block" - and recompute_method == item["recompute_method"] - and item["performance"]): + if ( + use_recompute + and item["use_recompute"] + and recompute_method == "block" + and recompute_method == item["recompute_method"] + and item["performance"] + ): if recompute_num_layers > item["recompute_num_layers"]: logger.info( f"The strategy {strategy} has been pruned by block recompute_num_layers performance." @@ -104,10 +105,13 @@ def prune_by_recompute(config, strategy, history=[]): strategy["pruned"] = True return True - if (use_recompute and item["use_recompute"] - and recompute_method == "uniform" - and recompute_method == item["recompute_method"] - and item["performance"]): + if ( + use_recompute + and item["use_recompute"] + and recompute_method == "uniform" + and recompute_method == item["recompute_method"] + and item["performance"] + ): if recompute_num_layers > item["recompute_num_layers"]: logger.info( f"The strategy {strategy} has been pruned by uniform recompute_num_layers performance." @@ -117,8 +121,7 @@ def prune_by_recompute(config, strategy, history=[]): strategy["pruned"] = True return True # memory prune - if not use_recompute and item["use_recompute"] and item[ - "max_mem"] == "OOM": + if not use_recompute and item["use_recompute"] and item["max_mem"] == "OOM": logger.info( f"The strategy {strategy} has been pruned by use_recompute memory." ) @@ -127,11 +130,16 @@ def prune_by_recompute(config, strategy, history=[]): strategy["pruned"] = True return True - if (use_recompute and item["use_recompute"] - and recompute_method == "uniform" - and recompute_method == item["recompute_method"]): - if (recompute_num_layers > item["recompute_num_layers"] - and item["max_mem"] == "OOM"): + if ( + use_recompute + and item["use_recompute"] + and recompute_method == "uniform" + and recompute_method == item["recompute_method"] + ): + if ( + recompute_num_layers > item["recompute_num_layers"] + and item["max_mem"] == "OOM" + ): logger.info( f"The strategy {strategy} has been pruned by uniform recompute_num_layers memory." 
) @@ -140,11 +148,16 @@ def prune_by_recompute(config, strategy, history=[]): strategy["pruned"] = True return True - if (use_recompute and item["use_recompute"] - and recompute_method == "block" - and recompute_method == item["recompute_method"]): - if (recompute_num_layers < item["recompute_num_layers"] - and item["max_mem"] == "OOM"): + if ( + use_recompute + and item["use_recompute"] + and recompute_method == "block" + and recompute_method == item["recompute_method"] + ): + if ( + recompute_num_layers < item["recompute_num_layers"] + and item["max_mem"] == "OOM" + ): logger.info( f"The strategy {strategy} has been pruned by block recompute_num_layers memory." ) @@ -163,8 +176,11 @@ def prune_by_sequence_parallel(config, strategy, history=[]): if retrieval: for item in retrieval: # performance prune - if item["sequence_parallel"] and item[ - "performance"] and not sequence_parallel: + if ( + item["sequence_parallel"] + and item["performance"] + and not sequence_parallel + ): logger.info( f"The strategy {strategy} has been pruned by sequence_parallel performance." ) @@ -173,8 +189,11 @@ def prune_by_sequence_parallel(config, strategy, history=[]): strategy["pruned"] = True return True # memory prune - if item["sequence_parallel"] and item[ - "max_mem"] == "OOM" and not sequence_parallel: + if ( + item["sequence_parallel"] + and item["max_mem"] == "OOM" + and not sequence_parallel + ): logger.info( f"The strategy {strategy} has been pruned by sequence_parallel memory." ) diff --git a/flagscale/auto_tuner/prune/pruner.py b/flagscale/auto_tuner/prune/pruner.py index 7c332da7f..9f521283c 100644 --- a/flagscale/auto_tuner/prune/pruner.py +++ b/flagscale/auto_tuner/prune/pruner.py @@ -14,6 +14,7 @@ def prune(self, strategy, history=[]): if func(self.config, strategy, history): not_run = True break + history.append(strategy) if not_run: self.pruned_count += 1 diff --git a/flagscale/auto_tuner/record/recorder.py b/flagscale/auto_tuner/record/recorder.py index 3b9f216dd..97ac15b9c 100644 --- a/flagscale/auto_tuner/record/recorder.py +++ b/flagscale/auto_tuner/record/recorder.py @@ -15,16 +15,24 @@ def __init__(self, config): "history.csv", ) # Metric to grep in the last rank of last node log file - if "auto_tuner" in self.config and "performance" in self.config.experiment.auto_tuner: + if ( + "auto_tuner" in self.config + and "performance" in self.config.experiment.auto_tuner + ): self.metric = self.config.experiment.auto_tuner.performance.get( - "name", "elapsed time per iteration \(ms\):") + "name", "elapsed time per iteration \(ms\):" + ) else: self.metric = "elapsed time per iteration \(ms\):" # Sort order of performance, order just in [ascend, and descend], default ascend - if "auto_tuner" in self.config and "performance" in self.config.experiment.auto_tuner: + if ( + "auto_tuner" in self.config + and "performance" in self.config.experiment.auto_tuner + ): self.sorted_order = self.config.experiment.auto_tuner.performance.get( - "order", "ascend") + "order", "ascend" + ) else: self.sorted_order = "ascend" @@ -46,8 +54,7 @@ def record(self, task, strategy): # If task is stopped by autotuner, task may not be failed,just hang or too slow. 
elif self.cur_strategy.get("stopped_by_tuner", False): - performace = self.grep_performance(peformance_path, - self.metric) + performace = self.grep_performance(peformance_path, self.metric) strategy["performance"] = performace strategy["max_mem"] = self.grep_max_memory(host_path) strategy["error"] = None @@ -66,9 +73,11 @@ def record(self, task, strategy): strategy["error"] = None # Pass back to platform if need - if ("airs_switch" in self.config.experiment.auto_tuner.platform - and self.config.experiment.auto_tuner.platform.airs_switch - and strategy["performance"]): + if ( + "airs_switch" in self.config.experiment.auto_tuner.platform + and self.config.experiment.auto_tuner.platform.airs_switch + and strategy["performance"] + ): self.pass_back_to_platform(strategy) def pass_back_to_platform(self, strategy): @@ -76,8 +85,9 @@ def pass_back_to_platform(self, strategy): seq_len = int(self.config.train.model.seq_length) throughput = gbs * seq_len / (strategy["performance"] / 1000) day = round( - self.config.train.model.train_samples * seq_len / - (throughput * 60 * 60 * 24), + self.config.train.model.train_samples + * seq_len + / (throughput * 60 * 60 * 24), 2, ) command = [ @@ -85,8 +95,11 @@ def pass_back_to_platform(self, strategy): "-D", f"{strategy['data_parallel_size']}", "--distributed_optimizer", - (f"{strategy['use_distributed_optimizer']}" if - strategy["use_distributed_optimizer"] is not None else "False"), + ( + f"{strategy['use_distributed_optimizer']}" + if strategy["use_distributed_optimizer"] is not None + else "False" + ), "-E", f"{strategy['expert_model_parallel_size']}", "-C", @@ -96,22 +109,37 @@ def pass_back_to_platform(self, strategy): "-L", f"{strategy['pipeline_model_parallel_size']}", "-G", - (f"{strategy['recompute_granularity']}" - if strategy["recompute_granularity"] else "None"), + ( + f"{strategy['recompute_granularity']}" + if strategy["recompute_granularity"] + else "None" + ), "-R", - (f"{strategy['recompute_method']}" - if strategy["recompute_granularity"] else "None"), + ( + f"{strategy['recompute_method']}" + if strategy["recompute_granularity"] + else "None" + ), "-N", - (f"{strategy['recompute_num_layers']}" - if strategy["recompute_num_layers"] else "0"), + ( + f"{strategy['recompute_num_layers']}" + if strategy["recompute_num_layers"] + else "0" + ), "-S", - (f"{strategy['sequence_parallel']}" - if strategy["sequence_parallel"] is not None else "False"), + ( + f"{strategy['sequence_parallel']}" + if strategy["sequence_parallel"] is not None + else "False" + ), "-T", f"{strategy['tensor_model_parallel_size']}", "-V", - (f"{strategy['num_layers_per_virtual_pipeline_stage']}" - if strategy["num_layers_per_virtual_pipeline_stage"] else "0"), + ( + f"{strategy['num_layers_per_virtual_pipeline_stage']}" + if strategy["num_layers_per_virtual_pipeline_stage"] + else "0" + ), "--throughput", f"{int(throughput)}", "--day", @@ -156,8 +184,7 @@ def grep_max_memory(self, path, pattern="max reserved"): except: continue assert value is not None, "Can't grep the max memory" - self.logger.info( - f"task_{self.cur_strategy['idx']} max_memory: {max_memory}") + self.logger.info(f"task_{self.cur_strategy['idx']} max_memory: {max_memory}") return max_memory def get_performance_and_host_path(self, task): @@ -183,10 +210,10 @@ def get_performance_and_host_path(self, task): outputs = os.listdir(os.path.join(details, max_host)) assert len(outputs) == 1, f"the sub dir of {outputs} must be just one." 
new_outputs = os.listdir(os.path.join(details, max_host, outputs[0])) - assert len(new_outputs - ) == 1, f"the sub dir of {new_outputs} must be just one." - last_path = os.path.join(details, max_host, outputs[0], new_outputs[0], - "attempt_0") + assert len(new_outputs) == 1, f"the sub dir of {new_outputs} must be just one." + last_path = os.path.join( + details, max_host, outputs[0], new_outputs[0], "attempt_0" + ) last_dir = None last_dir_rank = 0 for item in os.listdir(last_path): @@ -202,9 +229,7 @@ def get_performance_and_host_path(self, task): raise ValueError("The log file does not exist.") return log_path, logs - def grep_performance(self, - path, - pattern="elapsed time per iteration \(ms\):"): + def grep_performance(self, path, pattern="elapsed time per iteration \(ms\):"): """Read the log file and return the performance.""" metric_pattern = pattern + r":* *(\d+(\.\d*)?)|(\d+(\.\d*)?) *" + pattern if not path or not os.path.exists(path): @@ -228,8 +253,7 @@ def grep_performance(self, continue assert value is not None, "Can't grep the performance" if not performance: - self.logger.info( - f"task_{self.cur_strategy['idx']} performance: {None}") + self.logger.info(f"task_{self.cur_strategy['idx']} performance: {None}") return None if len(performance) == 1: self.logger.info( @@ -238,8 +262,7 @@ def grep_performance(self, return round(performance[0], 3) else: average = sum(performance[1:]) / (len(performance) - 1) - self.logger.info( - f"task_{self.cur_strategy['idx']} performance: {average}") + self.logger.info(f"task_{self.cur_strategy['idx']} performance: {average}") return round(average, 3) def grep_error(self, path, pattern="Error"): @@ -267,8 +290,7 @@ def grep_error(self, path, pattern="Error"): else: errors_info.add(line) - self.logger.info( - f"task_{self.cur_strategy['idx']} error: {errors_info}") + self.logger.info(f"task_{self.cur_strategy['idx']} error: {errors_info}") return errors_info def sort(self, history): @@ -281,21 +303,20 @@ def sort(self, history): if self.sorted_order == "ascend": sorted_history = sorted( no_pruned_history, - key=lambda x: - (x["performance"] - if x["performance"] is not None else float("inf")), + key=lambda x: ( + x["performance"] if x["performance"] is not None else float("inf") + ), ) elif self.sorted_order == "descend": sorted_history = sorted( no_pruned_history, - key=lambda x: - (x["performance"] - if x["performance"] is not None else float("-inf")), + key=lambda x: ( + x["performance"] if x["performance"] is not None else float("-inf") + ), reverse=True, ) else: - raise ValueError( - f"The sorted order {self.sorted_order} is not supported.") + raise ValueError(f"The sorted order {self.sorted_order} is not supported.") assert sorted_history is not None return sorted_history @@ -309,4 +330,5 @@ def save(self, history): df = df.reindex(columns=cols) if "stopped_by_tuner" in df.columns: df = df.drop(columns=["stopped_by_tuner"]) - df.to_csv(self.path, index=False, escapechar='\\') + + df.to_csv(self.path, index=False, escapechar="\\") diff --git a/flagscale/auto_tuner/search/algorithm.py b/flagscale/auto_tuner/search/algorithm.py index b36df980f..a9c03dc3a 100644 --- a/flagscale/auto_tuner/search/algorithm.py +++ b/flagscale/auto_tuner/search/algorithm.py @@ -24,15 +24,19 @@ def __init__(self, strategies, config): def checkout(self, mode): if mode == "memory": from ..utils import sort_by_memory + if self.idx > 0 and self.idx < len(self.strategies): - self.strategies = self.strategies[:self.idx] + sorted( - self.strategies[self.idx:], 
key=sort_by_memory) + self.strategies = self.strategies[: self.idx] + sorted( + self.strategies[self.idx :], key=sort_by_memory + ) elif mode == "performance": from ..utils import sort_by_performance + if self.idx > 0 and self.idx < len(self.strategies): - self.strategies = self.strategies[:self.idx] + sorted( - self.strategies[self.idx:], key=sort_by_performance) + self.strategies = self.strategies[: self.idx] + sorted( + self.strategies[self.idx :], key=sort_by_performance + ) def search(self): """Return a task iteratively.""" diff --git a/flagscale/auto_tuner/search/searcher.py b/flagscale/auto_tuner/search/searcher.py index 0a4835643..dcbffba22 100644 --- a/flagscale/auto_tuner/search/searcher.py +++ b/flagscale/auto_tuner/search/searcher.py @@ -151,8 +151,10 @@ def build_space(self, config): # Set virtual pipeline parallel degree space["num_layers_per_virtual_pipeline_stage"] = ( [i for i in range(1, num_layers + 1)] - if "num_layers_per_virtual_pipeline_stage" not in config.experiment.auto_tuner.space - or config.experiment.auto_tuner.space.num_layers_per_virtual_pipeline_stage == "auto" + if "num_layers_per_virtual_pipeline_stage" + not in config.experiment.auto_tuner.space + or config.experiment.auto_tuner.space.num_layers_per_virtual_pipeline_stage + == "auto" else config.experiment.auto_tuner.space.num_layers_per_virtual_pipeline_stage ) self._sort( diff --git a/flagscale/auto_tuner/tuner.py b/flagscale/auto_tuner/tuner.py index abe20f53e..d8bf92f8c 100644 --- a/flagscale/auto_tuner/tuner.py +++ b/flagscale/auto_tuner/tuner.py @@ -33,7 +33,8 @@ def __init__(self, config: DictConfig): handler = logging.FileHandler(log_path, mode="w") handler.setLevel(logging.INFO) formatter = logging.Formatter( - "%(asctime)s - %(name)s - %(levelname)s - %(message)s") + "%(asctime)s - %(name)s - %(levelname)s - %(message)s" + ) handler.setFormatter(formatter) logger.addHandler(handler) self.logger = logger @@ -59,8 +60,7 @@ def __init__(self, config: DictConfig): # The interval of task monitoring if "control" not in self.config.experiment.auto_tuner: self.config.experiment.auto_tuner.control = {} - self.interval = self.config.experiment.auto_tuner.control.get( - "interval", 10) + self.interval = self.config.experiment.auto_tuner.control.get("interval", 10) # Set platform envs if "platform" not in self.config.experiment.auto_tuner: @@ -71,38 +71,42 @@ def __init__(self, config: DictConfig): self.config.experiment.auto_tuner.platform.airs_switch = True if os.environ.get("AIRS_SIZE", None): - self.config.experiment.auto_tuner.nnodes = int( - os.environ["AIRS_SIZE"]) + self.config.experiment.auto_tuner.nnodes = int(os.environ["AIRS_SIZE"]) # Set original config - self.orig_config.experiment.runner.nnodes = int( - os.environ["AIRS_SIZE"]) + self.orig_config.experiment.runner.nnodes = int(os.environ["AIRS_SIZE"]) # Set config - self.config.experiment.runner.nnodes = int( - os.environ["AIRS_SIZE"]) + self.config.experiment.runner.nnodes = int(os.environ["AIRS_SIZE"]) if os.environ.get("AIRS_ACCELERATOR_COUNT", None): self.config.experiment.auto_tuner.nproc_per_node = int( - os.environ["AIRS_ACCELERATOR_COUNT"]) + os.environ["AIRS_ACCELERATOR_COUNT"] + ) # Set original config self.orig_config.experiment.runner.nproc_per_node = int( - os.environ["AIRS_ACCELERATOR_COUNT"]) + os.environ["AIRS_ACCELERATOR_COUNT"] + ) # Set config self.config.experiment.runner.nproc_per_node = int( - os.environ["AIRS_ACCELERATOR_COUNT"]) + os.environ["AIRS_ACCELERATOR_COUNT"] + ) if os.environ.get("AIRS_FBMEM", None): - 
self.config.experiment.auto_tuner.memory = int( - os.environ["AIRS_FBMEM"]) + self.config.experiment.auto_tuner.memory = int(os.environ["AIRS_FBMEM"]) if os.environ.get("AIRS_HOSTFILE_PATH", None): # Set original config self.orig_config.experiment.runner.hostfile = os.environ[ - "AIRS_HOSTFILE_PATH"] + "AIRS_HOSTFILE_PATH" + ] # Set config self.config.experiment.runner.hostfile = os.environ[ - "AIRS_HOSTFILE_PATH"] + "AIRS_HOSTFILE_PATH" + ] - self.config.experiment.auto_tuner.cards = self.config.experiment.auto_tuner.nnodes * self.config.experiment.auto_tuner.nproc_per_node + self.config.experiment.auto_tuner.cards = ( + self.config.experiment.auto_tuner.nnodes + * self.config.experiment.auto_tuner.nproc_per_node + ) # Build core sub modules, such as Searcher, Pruner, Generator and Recorder self.searcher = Searcher(self.config) @@ -116,11 +120,11 @@ def __init__(self, config: DictConfig): # The max time per task, unit: second # NOTE: The task will be stopped if the time is reached or done. self.max_time_per_task = self.config.experiment.auto_tuner.control.get( - "max_time_per_task", 300) + "max_time_per_task", 300 + ) # The max time of auto tuner, if None, no limit. - self.max_time = self.config.experiment.auto_tuner.control.get( - "max_time", None) + self.max_time = self.config.experiment.auto_tuner.control.get("max_time", None) # The start time of each task, used to control each task when stop self.start_task_time = None @@ -158,9 +162,11 @@ def tune(self): self.logger.info(f"Record task_{self.idx}:") self.record() - if (self.cur_strategy["performance"] - and self.config.experiment.auto_tuner.platform.get( - "airs_switch", False) and not self.has_checkout): + if ( + self.cur_strategy["performance"] + and self.config.experiment.auto_tuner.platform.get("airs_switch", False) + and not self.has_checkout + ): self.checkout() # get best strategy @@ -173,7 +179,8 @@ def tune(self): self.logger.info(f"No strategy can run so far.") tuner_end_time = time.time() self.logger.info( - f"AutoTuner Ended in {tuner_end_time - tuner_start_time} seconds.") + f"AutoTuner Ended in {tuner_end_time - tuner_start_time} seconds." 
+ ) # Run the best task if self.config.experiment.auto_tuner.control.get("run_best", True): @@ -182,8 +189,7 @@ def tune(self): self.logger.info(f"Run best Strategy: {best_strategy}") else: raise ValueError(f"No strategy can run.") - best_task = self.generator.gen_best_task(best_strategy, - self.orig_config) + best_task = self.generator.gen_best_task(best_strategy, self.orig_config) best_task.action = "run" runner = SSHRunner(best_task) runner.run(monitor=True, interval=60) @@ -259,7 +265,8 @@ def monitor(self): try: status = self.runner._query_status() self.logger.info( - f"task_{self.cur_strategy['idx']} status: {status.name}") + f"task_{self.cur_strategy['idx']} status: {status.name}" + ) if status == JobStatus.COMPLETED_OR_IDLE: break if status == JobStatus.RUNNING: @@ -276,15 +283,18 @@ def monitor(self): end_time = time.time() # Add elapsed time - self.cur_strategy["elapsed_time"] = round( - end_time - self.task_start_time, 2) + self.cur_strategy["elapsed_time"] = round(end_time - self.task_start_time, 2) # Add start time readable_task_start_time = datetime.datetime.fromtimestamp( - self.task_start_time).strftime("%Y-%m-%d %H:%M:%S") + self.task_start_time + ).strftime("%Y-%m-%d %H:%M:%S") self.cur_strategy["start_time"] = readable_task_start_time - self.logger.info("task_{} monitor time: {:.2f}s".format( - self.cur_strategy["idx"], self.cur_strategy["elapsed_time"])) + self.logger.info( + "task_{} monitor time: {:.2f}s".format( + self.cur_strategy["idx"], self.cur_strategy["elapsed_time"] + ) + ) def record(self): """Record the task result to csv""" @@ -293,7 +303,6 @@ def record(self): def get_best(self): sorted_history = self.recorder.sort(self.history) - if sorted_history and sorted_history[0] and sorted_history[0][ - "performance"]: + if sorted_history and sorted_history[0] and sorted_history[0]["performance"]: return sorted_history[0] return None diff --git a/flagscale/datasets/sft_dataset.py b/flagscale/datasets/sft_dataset.py index 874d9209b..0aa99a8e1 100644 --- a/flagscale/datasets/sft_dataset.py +++ b/flagscale/datasets/sft_dataset.py @@ -9,12 +9,21 @@ import numpy import torch -from megatron.core.datasets.gpt_dataset import GPTDataset, GPTDatasetConfig, _get_ltor_masks_and_position_ids -from megatron.core.datasets.indexed_dataset import IndexedDataset, get_bin_path, get_idx_path +from megatron.core.datasets.gpt_dataset import ( + GPTDataset, + GPTDatasetConfig, + _get_ltor_masks_and_position_ids, +) +from megatron.core.datasets.indexed_dataset import ( + IndexedDataset, + get_bin_path, + get_idx_path, +) from megatron.core.datasets.utils import Split logger = logging.getLogger(__name__) + @dataclass class SFTDatasetConfig(GPTDatasetConfig): """Configuration object for Megatron Core SFT datasets""" @@ -50,35 +59,44 @@ def __init__( config: SFTDatasetConfig, ) -> None: self.config = config - self.apply_sft_dataset_separated_loss_mask_if_existed = config.apply_sft_dataset_separated_loss_mask_if_existed + self.apply_sft_dataset_separated_loss_mask_if_existed = ( + config.apply_sft_dataset_separated_loss_mask_if_existed + ) self.loss_mask_dataset = None super().__init__( - indexed_dataset, dataset_path, indexed_indices, num_samples, index_split, config + indexed_dataset, + dataset_path, + indexed_indices, + num_samples, + index_split, + config, ) self._build_loss_mask_dataset() def _build_loss_mask_dataset(self) -> None: """ - Load Loss Mask IndexedDataset + Load Loss Mask IndexedDataset """ path_prefix = None - base_prefix = '_text_document' - loss_mask_prefix = 
'_loss_mask_document' + base_prefix = "_text_document" + loss_mask_prefix = "_loss_mask_document" if self.dataset_path.endswith(base_prefix): - path_prefix = self.dataset_path[:-len(base_prefix)] + loss_mask_prefix + path_prefix = self.dataset_path[: -len(base_prefix)] + loss_mask_prefix if self.apply_sft_dataset_separated_loss_mask_if_existed and path_prefix: idx_path = get_idx_path(path_prefix) bin_path = get_bin_path(path_prefix) if os.path.exists(idx_path) and os.path.exists(bin_path): self.loss_mask_dataset = IndexedDataset( - path_prefix, multimodal=False, mmap=self.config.mmap_bin_files) + path_prefix, multimodal=False, mmap=self.config.mmap_bin_files + ) - print(f'> Used Dataset: aux_loss_mask ...') + print(f"> Used Dataset: aux_loss_mask ...") if self.loss_mask_dataset is not None: - assert len(self.dataset) == len(self.loss_mask_dataset), \ - f"Samples are not equal, ({len(self.dataset)} != {len(self.loss_mask_dataset)})" + assert len(self.dataset) == len( + self.loss_mask_dataset + ), f"Samples are not equal, ({len(self.dataset)} != {len(self.loss_mask_dataset)})" def __getitem__(self, idx: Optional[int]) -> Dict[str, torch.Tensor]: """Abstract method implementation @@ -139,11 +157,12 @@ def __getitem__(self, idx: Optional[int]) -> Dict[str, torch.Tensor]: # aux dataset aux_loss_mask, _ = self._query_document_sample_shuffle_indices_aux_dataset( - self.loss_mask_dataset, idx) + self.loss_mask_dataset, idx + ) if aux_loss_mask is not None: - if idx % 100 == 0: - print(f'> Used aux_loss_mask at current sample={idx} ...') - loss_mask = torch.from_numpy(aux_loss_mask).float()[1:].contiguous() + if idx % 100 == 0: + print(f"> Used aux_loss_mask at current sample={idx} ...") + loss_mask = torch.from_numpy(aux_loss_mask).float()[1:].contiguous() if self.config.create_attention_mask: return { @@ -210,11 +229,12 @@ def _query_document_sample_shuffle_indices_aux_dataset( offset = 0 if i > doc_index_beg else doc_index_beg_offset length = None if i < doc_index_end else doc_index_end_offset + 1 sample_parts.append( - aux_dataset.get(self.document_index[i], offset=offset, length=length) + aux_dataset.get( + self.document_index[i], offset=offset, length=length + ) ) return ( numpy.array(numpy.concatenate(sample_parts), dtype=numpy.int64), numpy.array(document_ids, dtype=numpy.int64), ) - diff --git a/flagscale/logger.py b/flagscale/logger.py index d25781f1a..183f9cebf 100644 --- a/flagscale/logger.py +++ b/flagscale/logger.py @@ -12,7 +12,9 @@ def __init__(self, name, level=logging.INFO): for handler in self.logger.handlers[:]: self.logger.removeHandler(handler) - formatter = logging.Formatter('[%(asctime)s %(name)s %(filename)s %(levelname)s] %(message)s') + formatter = logging.Formatter( + "[%(asctime)s %(name)s %(filename)s %(levelname)s] %(message)s" + ) stream_handler = logging.StreamHandler(sys.stdout) stream_handler.setFormatter(formatter) @@ -34,12 +36,15 @@ def critical(self, message): def debug(self, message): self.logger.debug(message) + GLOBAL_LOGGER = None + def get_logger(): global GLOBAL_LOGGER if GLOBAL_LOGGER is None: - GLOBAL_LOGGER = Logger('FlagScale') + GLOBAL_LOGGER = Logger("FlagScale") return GLOBAL_LOGGER -logger = get_logger() \ No newline at end of file + +logger = get_logger() diff --git a/flagscale/patches_utils.py b/flagscale/patches_utils.py index 8e86ce1fd..68d619dd5 100644 --- a/flagscale/patches_utils.py +++ b/flagscale/patches_utils.py @@ -1,16 +1,19 @@ -# this file is used for adding tools func to processing patches +# this file is used for adding tools func 
to processing patches + def add_patches_module(path: str, module_dict: dict): if len(module_dict) == 0: raise Exception(f"module dict is None") import sys + print(f"{path} is being instead, using module {module_dict}") for k in sys.modules: if k.startswith(path): for module_name, module_ in module_dict.items(): import re + class_pattern = re.compile("\w*\.w*") - if not re.match(class_pattern, module_name): + if not re.match(class_pattern, module_name): try: if getattr(sys.modules[k], module_name, None): setattr(sys.modules[k], module_name, module_) @@ -19,5 +22,5 @@ def add_patches_module(path: str, module_dict: dict): else: class_name, fuc_name = module_name.split(".") class_obj = getattr(sys.modules[k], class_name, None) - if class_obj and getattr(class_obj, fuc_name , None): + if class_obj and getattr(class_obj, fuc_name, None): setattr(class_obj, fuc_name, module_)
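
Local usage note (a minimal sketch, not part of the patch; it assumes black and pre-commit are installed from PyPI and mirrors the workflow's run steps above):

    pip install black pre-commit        # same formatter the CI job installs, plus the hook runner
    pre-commit run black --all-files    # runs the local "black" hook defined in .pre-commit-config.yaml
    black --check --diff flagscale/     # direct check; note this is broader than the CI file list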