Skip to content

Commit

Permalink
Fix cloud multi-nodes
Browse files Browse the repository at this point in the history
* Copy ssh key to allow connections from master to workers
* Use local ip for manager's ip such that workers can find it and connect to it
* Fix incompatibility between pandas and numpy 2.0.0
  • Loading branch information
satyaog committed Aug 15, 2024
1 parent 0f34dd2 commit 496128f
Show file tree
Hide file tree
Showing 11 changed files with 112 additions and 36 deletions.
18 changes: 13 additions & 5 deletions .github/workflows/cloud-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ jobs:

env:
MILABENCH_CONFIG: "config/standard.yaml"
MILABENCH_SYSTEM: "config/cloud-system.yaml"
MILABENCH_SYSTEM: "config/cloud-multinodes-system.yaml"
MILABENCH_BASE: "output"
MILABENCH_ARGS: ""
MILABENCH_DASH: "no"
Expand Down Expand Up @@ -98,15 +98,18 @@ jobs:
- name: install benchmarks
run: |
poetry run milabench install --variant ${{ matrix.arch }}
poetry run milabench install --variant ${{ matrix.arch }} \
--exclude llm-full-mp-gpus,llm-full-mp-nodes,llm-lora-ddp-gpus,llm-lora-ddp-nodes,llm-lora-mp-gpus,llm-lora-single
- name: prepare benchmarks
run: |
poetry run milabench prepare
poetry run milabench prepare \
--exclude llm-full-mp-gpus,llm-full-mp-nodes,llm-lora-ddp-gpus,llm-lora-ddp-nodes,llm-lora-mp-gpus,llm-lora-single
- name: run benchmarks
run: |
poetry run milabench run
poetry run milabench run \
--exclude llm-full-mp-gpus,llm-full-mp-nodes,llm-lora-ddp-gpus,llm-lora-ddp-nodes,llm-lora-mp-gpus,llm-lora-single
- name: Summary
run: |
Expand All @@ -118,6 +121,11 @@ jobs:
env:
GITHUB_TOKEN: ${{ github.token }}

- name: DEBUG state file
if: always()
run: |
cat /tmp/milabench/covalent_venv/lib/python*/site-packages/covalent_azure_plugin/infra/*.tfstate
- name: teardown cloud
if: always()
run: |
Expand All @@ -130,7 +138,7 @@ jobs:
--run-on ${{ matrix.run_on }} \
--all
- name: debug logs
- name: DEBUG logs
if: always()
run: |
cat ~/.cache/covalent/covalent_ui.log
4 changes: 4 additions & 0 deletions config/cloud-multinodes-system.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ system:
- name: manager
# Use 1.1.1.1 as an ip placeholder
ip: 1.1.1.1
port: 5000
# Use this node as the master node or not
main: true
# User to use in remote milabench operations
Expand All @@ -21,11 +22,14 @@ system:
username: ubuntu
size: Standard_NC24ads_A100_v4
location: eastus2
disk_size: 512
azure__a100_x2:
username: ubuntu
size: Standard_NC48ads_A100_v4
location: eastus2
disk_size: 512
azure__a10_x2:
username: ubuntu
size: Standard_NV72ads_A10_v5
location: eastus2
disk_size: 512
8 changes: 8 additions & 0 deletions config/cloud-system.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,19 @@ system:
username: ubuntu
size: Standard_NC24ads_A100_v4
location: eastus2
disk_size: 512
azure__a100_x2:
username: ubuntu
size: Standard_NC48ads_A100_v4
location: eastus2
disk_size: 512
azure__a10:
username: ubuntu
size: Standard_NV36ads_A10_v5
location: eastus2
disk_size: 512
azure__a10_x2:
username: ubuntu
size: Standard_NV72ads_A10_v5
location: eastus2
disk_size: 512
12 changes: 6 additions & 6 deletions config/examples/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,18 @@ _defaults:

test:
inherits: _defaults
group: test_remote
install_group: test_remote
definition: ../../benchmarks/_template
group: simple
install_group: test
definition: ../../benchmarks/_templates/simple
plan:
method: njobs
n: 1

testing:
inherits: _defaults
definition: ../../benchmarks/_template
group: test_remote_2
install_group: test_remote_2
definition: ../../benchmarks/_templates/stdout
group: stdout
install_group: test
plan:
method: njobs
n: 1
13 changes: 10 additions & 3 deletions milabench/commands/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -939,6 +939,13 @@ def _get_main_and_workers(self):
def _argv(self, **_) -> List:
manager, nodes = self._get_main_and_workers()

# Find local ip such that workers can connect to the port
for manager_ip in manager["ipaddrlist"]:
if ":" in manager_ip or manager_ip == "127.0.0.1":
continue
if all(str.isnumeric(n) for n in manager_ip.split(".")):
break

num_machines = max(1, len(nodes) + 1)

# Cant do that maybe this run is constrained
Expand Down Expand Up @@ -976,9 +983,9 @@ def _argv(self, **_) -> List:
f"--machine_rank={self.rank}",
f"--num_machines={num_machines}",
*deepspeed_argv,
f"--gradient_accumulation_steps={self.pack.config.get('gradient_accumulation_steps', 1)}",
f"--num_cpu_threads_per_process={cpu_per_process}",
f"--main_process_ip={manager['ip']}",
f"--gradient_accumulation_steps={self.pack.config['gradient_accumulation_steps']}",
f"--num_cpu_threads_per_process={self.pack.config['argv']['--cpus_per_gpu']}",
f"--main_process_ip={manager_ip}",
f"--main_process_port={manager['port']}",
f"--num_processes={nproc}",
*self.accelerate_argv,
Expand Down
1 change: 1 addition & 0 deletions milabench/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ def build_config(*config_files):
for layer in _config_layers(config_files):
all_configs = merge(all_configs, layer)

all_configs.setdefault("*", {})
all_configs["*"]["hash"] = compute_config_hash(all_configs)

all_configs = build_matrix_bench(all_configs)
Expand Down
5 changes: 0 additions & 5 deletions milabench/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,11 @@
import os
import sys

import yaml

from milabench.fs import XPath

from . import ROOT_FOLDER
from .commands import (
CmdCommand,
Command,
ListCommand,
SCPCommand,
SequenceCommand,
SSHCommand,
VoidCommand,
Expand Down
75 changes: 61 additions & 14 deletions milabench/scripts/covalent/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,33 +89,80 @@ def _popen(cmd, *args, _env=None, **kwargs):
return_code = 0
try:
if args.setup:
dispatch_id = ct.dispatch(
ct.lattice(executor.get_connection_attributes), disable_run=False
)()

result = ct.get_result(dispatch_id=dispatch_id, wait=True).result
result = ct.dispatch_sync(
ct.lattice(executor.get_connection_attributes)
)().result

assert result and result[0]

all_connection_attributes, _ = result
master_host:str = None
for hostname, connection_attributes in all_connection_attributes.items():
print(f"hostname::>{hostname}")
for attribute,value in connection_attributes.items():
if attribute == "hostname":
continue
print(f"{attribute}::>{value}")

if argv:
dispatch_id = ct.dispatch(
ct.lattice(
lambda:ct.electron(_popen, executor=executor)(argv)
),
disable_run=False
)()
master_host = master_host or hostname

if len(all_connection_attributes) > 1:
# Add master node to known host to avoid unknown host error
# The authenticity of host '[hostname] ([IP address])' can't be established.
new_host = subprocess.run(
["ssh-keyscan", master_host],
stdout=subprocess.PIPE,
check=True
).stdout.decode("utf8")
known_hosts = pathlib.Path("~/.ssh/known_hosts").expanduser()
with known_hosts.open("at") as _f:
_f.write(new_host)

# Add ssh file to master node to allow connections to worker
# nodes
ssh_key_file = all_connection_attributes[master_host]["ssh_key_file"]
fn = pathlib.Path(ssh_key_file)
result = ct.dispatch_sync(
ct.lattice(executor.cp_to_remote)
)(f".ssh/{fn.name.split('.')[0]}", str(fn))

assert result.status == ct.status.COMPLETED

result = ct.get_result(dispatch_id=dispatch_id, wait=True).result
if argv:
result = ct.dispatch_sync(
ct.lattice(executor.list_running_instances)
)().result

assert result

dispatch_ids = set()
for connection_attributes in result.get(
(executor.state_prefix, executor.state_id),
{"env": None}
).values():
kwargs = {
**_get_executor_kwargs(args),
**connection_attributes
}
del kwargs["env"]

_executor:ct.executor.BaseExecutor = executor_cls(**kwargs)

dispatch_ids.add(
ct.dispatch(
ct.lattice(
lambda:ct.electron(_popen, executor=_executor)(argv)
),
disable_run=False
)()
)

for dispatch_id in dispatch_ids:
result = ct.get_result(dispatch_id=dispatch_id, wait=True).result

_return_code, _, _ = result if result is not None else (1, "", "")
return_code = return_code or _return_code

return_code, _, _ = result if result is not None else (1, "", "")
finally:
if args.teardown:
result = executor.stop_cloud_instance().result
Expand Down
4 changes: 4 additions & 0 deletions milabench/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,10 @@ def _resolve_ip(ip):
if not offline:
# Resolve the IP
try:
# Workaround error with `gethostbyaddr` on azure DNS (like
# `inmako.eastus2.cloudapp.azure.com`). A proper fix might be a
# correct network config in terraform.
# socket.herror: [Errno 1] Unknown host
hostname, aliaslist, ipaddrlist = socket.gethostbyname_ex(ip)
lazy_raise = None

Expand Down
4 changes: 2 additions & 2 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ blessed = "^1.19.1"
pathspec = "^0.9.0"
cp-template = "^0.3.0"
pandas = ">=1.4.2"
numpy = ">=1.23.0,<2.0.0"
# Work around for compatibility issue between numpy 2.0.0 and pandas
# https://github.com/numpy/numpy/issues/26710
numpy = "^1.23.0"
pynvml = "^11.4.1"
tqdm = "^4.64.1"
pip-tools = "^7.4.1"
Expand Down

0 comments on commit 496128f

Please sign in to comment.