Ucc ucx redis rivanna updates #681

Merged (23 commits) on Nov 25, 2023
Changes from all commits (23 commits):
5578219  New branch for merge to cylondata cylon (mstaylor, Mar 16, 2023)
8872f81  removes db flush and add python script (mstaylor, Mar 23, 2023)
8203316  adds support for conda environments with a local ucx (supports custom… (mstaylor, Sep 8, 2023)
3404452  removes cereal library (mstaylor, Sep 9, 2023)
5dabd61  updates aws dockerfile to include local ucx build (mstaylor, Sep 10, 2023)
a1ab85b  aws docker support - adds ucx local support (fixes issue with wrong b… (mstaylor, Sep 10, 2023)
0521735  aws docker support - adds ucx local support - partial - changes to ms… (mstaylor, Sep 12, 2023)
3bea691  aws docker support - adds ucx local support - partial - changes to ms… (mstaylor, Sep 12, 2023)
861c5a9  aws docker support - adds ucx local support - partial - boto3 test co… (mstaylor, Sep 12, 2023)
d1fa127  Merge branch 'main' into ucc-ucx-redis-local-ucx (arupcsedu, Sep 15, 2023)
8d4cfd3  initial commit - fixes github action build issues (mstaylor, Nov 12, 2023)
d27dab6  partial - merge with rivanna-ucc-ucx-redis (mstaylor, Nov 13, 2023)
44b1687  partial - merge with rivanna-ucc-ucx-redis (mstaylor, Nov 13, 2023)
e0a5acc  partial - merge with rivanna-ucc-ucx-redis (mstaylor, Nov 13, 2023)
ec3f697  partial - merge with rivanna-ucc-ucx-redis (mstaylor, Nov 13, 2023)
bfe75da  partial - merge with rivanna-ucc-ucx-redis (mstaylor, Nov 13, 2023)
1227723  partial - merge with rivanna-ucc-ucx-redis (mstaylor, Nov 13, 2023)
0d329a7  Merge remote-tracking branch 'originCylon/main' into ucc-ucx-redis-ri… (mstaylor, Nov 13, 2023)
55dfcc3  Merge remote-tracking branch 'fork/ucc-ucx-redis-rivanna-updates' int… (mstaylor, Nov 13, 2023)
f9b67aa  partial - removes macos workflow (mstaylor, Nov 16, 2023)
448acb3  Merge remote-tracking branch 'fork/ucc-ucx-redis-rivanna-updates' int… (mstaylor, Nov 16, 2023)
7989a39  partial - adds aws lambda dockerfile (mstaylor, Nov 19, 2023)
cdf192f  partial - updates macos action (mstaylor, Nov 21, 2023)
16 changes: 12 additions & 4 deletions .github/workflows/conda-cpp.yml
@@ -25,6 +25,7 @@ jobs:
- os: ubuntu-20.04
gcc: 9
ucc: "master"
ucx: "override-remote-address3"

steps:
- uses: actions/checkout@v2
@@ -39,7 +40,7 @@ jobs:
- uses: conda-incubator/setup-miniconda@v2
with:
activate-environment: cylon_dev
- environment-file: conda/environments/cylon.yml
+ environment-file: conda/environments/cylon_NoUCX.yml

- name: Activate conda
run: conda activate cylon_dev
@@ -51,17 +52,24 @@ jobs:
cmake .. -DBUILD_SHARED_LIBS=1 -DUSE_MPI=1 -DCMAKE_INSTALL_PREFIX=$HOME/gloo/install
make install

- name: Install UCX
run: |
git clone --single-branch -b ${{ matrix.ucx }} https://github.com/mstaylor/ucx.git $HOME/ucx
cd $HOME/ucx
./autogen.sh
./configure --prefix=$HOME/ucx/install --without-go
make install

- name: Install UCC
run: |
git clone --single-branch -b ${{ matrix.ucc }} https://github.com/openucx/ucc.git $HOME/ucc
cd $HOME/ucc
echo "conda ucx: $(conda list | grep ucx)"
./autogen.sh
- ./configure --prefix=$HOME/ucc/install --with-ucx=$CONDA/envs/cylon_dev
+ ./configure --prefix=$HOME/ucc/install --with-ucx=$HOME/ucx/install
make install

- name: Build cylon, pycylon and run cpp test
- run: python build.py -cmake-flags="-DCYLON_UCX=1 -DCYLON_GLOO=1 -DGLOO_INSTALL_PREFIX=$HOME/gloo/install -DCYLON_UCC=1 -DUCC_INSTALL_PREFIX=$HOME/ucc/install" -ipath="$HOME/cylon/install" --cpp --python --test
+ run: python build.py -cmake-flags="-DCYLON_UCX=1 -DCYLON_GLOO=1 -DGLOO_INSTALL_PREFIX=$HOME/gloo/install -DCYLON_UCC=1 -DUCC_INSTALL_PREFIX=$HOME/ucc/install -DUCX_INSTALL_PREFIX=$HOME/ucx/install" -ipath="$HOME/cylon/install" --cpp --python --test

- name: Run pytest
run: python build.py -ipath="$HOME/cylon/install" --pytest
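Note: the new "Install UCX" step builds a forked UCX from source before UCC is configured against it. To reproduce the CI build outside of GitHub Actions, here is a minimal Python sketch of the same sequence (assuming git and the autotools are on PATH; the branch name and URL come from the build matrix above):

import os
import subprocess

HOME = os.path.expanduser("~")
UCX_SRC = os.path.join(HOME, "ucx")

def run(cmd, cwd=None):
    # Echo each build step and fail fast on a non-zero exit code.
    print("+", " ".join(cmd))
    subprocess.run(cmd, cwd=cwd, check=True)

# Mirror the workflow's "Install UCX" step.
run(["git", "clone", "--single-branch", "-b", "override-remote-address3",
     "https://github.com/mstaylor/ucx.git", UCX_SRC])
run(["./autogen.sh"], cwd=UCX_SRC)
run(["./configure", f"--prefix={UCX_SRC}/install", "--without-go"], cwd=UCX_SRC)
run(["make", "install"], cwd=UCX_SRC)

UCC's configure then points --with-ucx at $HOME/ucx/install, exactly as in the step above.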
2 changes: 1 addition & 1 deletion .github/workflows/macos.yml
@@ -20,7 +20,7 @@ jobs:
fail-fast: false
matrix:
include:
- - os: macos-11
+ - os: macos-latest

steps:
- uses: actions/checkout@v2
2 changes: 1 addition & 1 deletion aws/scripts/Join_Weak_Scaling.ipynb
@@ -16,7 +16,7 @@
"cells": [
{
"cell_type": "code",
"execution_count": 19,
"execution_count": 8,
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/",
473 changes: 473 additions & 0 deletions aws/scripts/Join_Weak_Scaling_round2.ipynb

Large diffs are not rendered by default.

71 changes: 65 additions & 6 deletions aws/scripts/cylon_scaling.py
@@ -84,6 +84,8 @@ def cylon_join(data=None):
df1 = DataFrame(pd.DataFrame(data1).add_prefix("col"))
df2 = DataFrame(pd.DataFrame(data2).add_prefix("col"))

timing = {'scaling': [], 'world': [], 'rows': [], 'max_value': [], 'rank': [], 'avg_t':[], 'tot_l':[]}

for i in range(data['it']):
env.barrier()
StopWatch.start(f"join_{i}_{data['host']}_{data['rows']}_{data['it']}")
@@ -100,14 +102,28 @@ def cylon_join(data=None):
if env.rank == 0:
avg_t = sum_t / env.world_size
print("### ", data['scaling'], env.world_size, num_rows, max_val, i, avg_t, tot_l)
print("### ", data['scaling'], env.world_size, num_rows, max_val, i, avg_t, tot_l, file=open(data['output_summary_filename'], 'a'))
timing['scaling'].append(data['scaling'])
timing['world'].append(env.world_size)
timing['rows'].append(num_rows)
timing['max_value'].append(max_val)
timing['rank'].append(i)
timing['avg_t'].append(avg_t)
timing['tot_l'].append(tot_l)
StopWatch.stop(f"join_{i}_{data['host']}_{data['rows']}_{data['it']}")

StopWatch.stop(f"join_total_{data['host']}_{data['rows']}_{data['it']}")

if env.rank == 0:
StopWatch.benchmark(tag=str(data), filename=data['output_scaling_filename'])
upload_file(file_name=data['output_scaling_filename'], bucket=data['s3_bucket'], object_name=data['s3_stopwatch_object_name'])


if os.path.exists(data['output_summary_filename']):
pd.DataFrame(timing).to_csv(data['output_summary_filename'], mode='a', index=False, header=False)
else:
pd.DataFrame(timing).to_csv(data['output_summary_filename'], mode='w', index=False, header=True)


upload_file(file_name=data['output_summary_filename'], bucket=data['s3_bucket'],
object_name=data['s3_summary_object_name'])
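The summary logging above replaces per-iteration print-to-file calls with a timing dict that is flushed to CSV once per run, writing the header only when the file is first created. A standalone sketch of that append-or-create pattern (the file name and row values are illustrative):

import os
import pandas as pd

def append_summary(timing: dict, path: str) -> None:
    # First write creates the file with a header; subsequent runs append rows.
    df = pd.DataFrame(timing)
    if os.path.exists(path):
        df.to_csv(path, mode='a', index=False, header=False)
    else:
        df.to_csv(path, mode='w', index=False, header=True)

# Same columns the benchmark collects on rank 0:
timing = {'scaling': ['w'], 'world': [4], 'rows': [1_000_000],
          'max_value': [900_000], 'rank': [0], 'avg_t': [0.42], 'tot_l': [1.68]}
append_summary(timing, "summary.csv")

This keeps repeated benchmark runs appending to one CSV without duplicating the header row.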

@@ -151,6 +167,8 @@ def cylon_sort(data=None):
if env.rank == 0:
print("Task# ", data['task'])

timing = {'scaling': [], 'world': [], 'rows': [], 'max_value': [], 'rank': [], 'avg_t': [], 'tot_l': []}

for i in range(data['it']):
env.barrier()
StopWatch.start(f"sort_{i}_{data['host']}_{data['rows']}_{data['it']}")
@@ -166,8 +184,15 @@ def cylon_sort(data=None):
if env.rank == 0:
avg_t = sum_t / env.world_size
print("### ", data['scaling'], env.world_size, num_rows, max_val, i, avg_t, tot_l)
print("### ", data['scaling'], env.world_size, num_rows, max_val, i, avg_t, tot_l,
file=open(data['output_summary_filename'], 'a'))
timing['scaling'].append(data['scaling'])
timing['world'].append(env.world_size)
timing['rows'].append(num_rows)
timing['max_value'].append(max_val)
timing['rank'].append(i)
timing['avg_t'].append(avg_t)
timing['tot_l'].append(tot_l)
#print("### ", data['scaling'], env.world_size, num_rows, max_val, i, avg_t, tot_l,
# file=open(data['output_summary_filename'], 'a'))


StopWatch.stop(f"sort_{i}_{data['host']}_{data['rows']}_{data['it']}")
@@ -178,9 +203,14 @@ def cylon_sort(data=None):
StopWatch.benchmark(tag=str(data), filename=data['output_scaling_filename'])
upload_file(file_name=data['output_scaling_filename'], bucket=data['s3_bucket'],
object_name=data['s3_stopwatch_object_name'])

if os.path.exists(data['output_summary_filename']):
pd.DataFrame(timing).to_csv(data['output_summary_filename'], mode='a', index=False, header=False)
else:
pd.DataFrame(timing).to_csv(data['output_summary_filename'], mode='w', index=False, header=True)

upload_file(file_name=data['output_summary_filename'], bucket=data['s3_bucket'],
object_name=data['s3_summary_object_name'])
redis_context.clearDB()
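cylon_sort finishes by clearing the Redis instance that coordinates the run. redis_context is constructed elsewhere in the script; a plausible minimal sketch with redis-py, assuming clearDB simply flushes the current database (the class name and defaults here are assumptions):

import redis

class RedisContext:
    # Illustrative stand-in for the script's redis_context object.
    def __init__(self, host="127.0.0.1", port=6379):
        self.client = redis.Redis(host=host, port=port)

    def clearDB(self):
        # Drop every key in the current database so the next run starts clean.
        self.client.flushdb()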


def cylon_slice(data=None):
@@ -222,6 +252,7 @@ def cylon_slice(data=None):
if env.rank == 0:
print("Task# ", data['task'])

timing = {'scaling': [], 'world': [], 'rows': [], 'max_value': [], 'rank': [], 'avg_t': [], 'tot_l': []}
for i in range(data['it']):
env.barrier()
StopWatch.start(f"slice_{i}_{data['host']}_{data['rows']}_{data['it']}")
@@ -239,8 +270,15 @@ def cylon_slice(data=None):
if env.rank == 0:
avg_t = sum_t / env.world_size
print("### ", data['scaling'], env.world_size, num_rows, max_val, i, avg_t, tot_l)
print("### ", data['scaling'], env.world_size, num_rows, max_val, i, avg_t, tot_l,
file=open(data['output_summary_filename'], 'a'))
#print("### ", data['scaling'], env.world_size, num_rows, max_val, i, avg_t, tot_l,
# file=open(data['output_summary_filename'], 'a'))
timing['scaling'].append(data['scaling'])
timing['world'].append(env.world_size)
timing['rows'].append(num_rows)
timing['max_value'].append(max_val)
timing['rank'].append(i)
timing['avg_t'].append(avg_t)
timing['tot_l'].append(tot_l)
StopWatch.stop(f"slice_{i}_{data['host']}_{data['rows']}_{data['it']}")

StopWatch.stop(f"slice_total_{data['host']}_{data['rows']}_{data['it']}")
@@ -249,6 +287,12 @@ def cylon_slice(data=None):
StopWatch.benchmark(tag=str(data), filename=data['output_scaling_filename'])
upload_file(file_name=data['output_scaling_filename'], bucket=data['s3_bucket'],
object_name=data['s3_stopwatch_object_name'])

if os.path.exists(data['output_summary_filename']):
pd.DataFrame(timing).to_csv(data['output_summary_filename'], mode='a', index=False, header=False)
else:
pd.DataFrame(timing).to_csv(data['output_summary_filename'], mode='w', index=False, header=True)

upload_file(file_name=data['output_summary_filename'], bucket=data['s3_bucket'],
object_name=data['s3_summary_object_name'])
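Each benchmark uploads its StopWatch and summary files to S3 through upload_file. The helper is defined elsewhere in cylon_scaling.py; a sketch matching the call signature, following the standard boto3 upload pattern (the error handling shown is an assumption):

import boto3
from botocore.exceptions import ClientError

def upload_file(file_name, bucket, object_name=None):
    # Push a local file to S3, defaulting the object key to the file name.
    if object_name is None:
        object_name = file_name
    s3_client = boto3.client("s3")
    try:
        s3_client.upload_file(file_name, bucket, object_name)
    except ClientError as e:
        print(f"S3 upload failed: {e}")
        return False
    return True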

@@ -257,28 +301,43 @@ def cylon_slice(data=None):

if __name__ == "__main__":
parser = argparse.ArgumentParser(description="cylon scaling")

parser.add_argument('-n', dest='rows', type=int, **environ_or_required('ROWS'))

parser.add_argument('-i', dest='it', type=int, **environ_or_required('PARTITIONS')) #10

parser.add_argument('-u', dest='unique', type=float, **environ_or_required('UNIQUENESS'), help="unique factor") #0.9

parser.add_argument('-s', dest='scaling', type=str, **environ_or_required('SCALING'), choices=['s', 'w'],
help="s=strong w=weak") #w

parser.add_argument('-o', dest='operation', type=str, **environ_or_required('CYLON_OPERATION'), choices=['join', 'sort', 'slice'],
help="operation to run: join, sort, or slice")

parser.add_argument('-w', dest='world_size', type=int, help="world size", **environ_or_required('WORLD_SIZE'))

parser.add_argument("-r", dest='redis_host', type=str, help="redis address, default to 127.0.0.1",
**environ_or_required('REDIS_HOST')) #127.0.0.1

parser.add_argument("-p1", dest='redis_port', type=int, help="name of redis port", **environ_or_required('REDIS_PORT')) #6379

parser.add_argument('-f1', dest='output_scaling_filename', type=str, help="Output filename for scaling results",
**environ_or_required('OUTPUT_SCALING_FILENAME'))

parser.add_argument('-f2', dest='output_summary_filename', type=str, help="Output filename for scaling summary results",
**environ_or_required('OUTPUT_SUMMARY_FILENAME'))

parser.add_argument('-b', dest='s3_bucket', type=str, help="S3 Bucket Name", **environ_or_required('S3_BUCKET'))

parser.add_argument('-o1', dest='s3_stopwatch_object_name', type=str, help="S3 object name for StopWatch results", **environ_or_required('S3_STOPWATCH_OBJECT_NAME'))

parser.add_argument('-o2', dest='s3_summary_object_name', type=str, help="S3 object name for summary results",
**environ_or_required('S3_SUMMARY_OBJECT_NAME'))

args = vars(parser.parse_args())

args['host'] = "aws"

if args['operation'] == 'join':
print("executing cylon join operation")
cylon_join(args)
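Every flag above is declared through environ_or_required, so each option can be supplied either on the command line or via an environment variable (ROWS, PARTITIONS, REDIS_HOST, and so on). The helper is defined elsewhere in the script; the usual implementation of this pattern is a sketch like:

import os

def environ_or_required(key):
    # Use the environment variable as the default when it is set;
    # otherwise argparse requires the flag to be passed explicitly.
    if key in os.environ:
        return {'default': os.environ[key]}
    return {'required': True}

With that helper, setting ROWS, WORLD_SIZE, and the other variables in the environment lets the script run without spelling out every flag.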