Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Port AllReduce of Gloo Library #375

Open
wants to merge 15 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .circleci/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@ RUN sudo python3 -m pip install -qq pre-commit cpplint
RUN sudo gem install mdl
RUN go get golang.org/x/lint/golint
RUN sudo cp $GOPATH/bin/* /usr/local/bin/

# install gocv
RUN go get -u -d gocv.io/x/gocv
RUN cd $GOPATH/src/gocv.io/x/gocv && make install

# install gloo
COPY install_gloo.sh /
RUN sudo /install_gloo.sh && sudo rm /install_gloo.sh
6 changes: 6 additions & 0 deletions .circleci/install_gloo.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
#!/bin/bash
# Build and install the Gloo collective-communications library from source.
# The Dockerfile execs this script directly, so the shebang is required.
set -e  # abort immediately if any step (clone/cmake/make) fails

git clone https://github.com/facebookincubator/gloo.git
cd gloo
mkdir -p build
cd build
cmake ..
make && make install
2 changes: 2 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ addons:
packages:
- opencv
- pkgconfig
- libuv # to use gloo in macOS, we install libuv
update: true

branches:
Expand All @@ -33,6 +34,7 @@ install:

script:
- export LD_LIBRARY_PATH=$TRAVIS_BUILD_DIR/cgotorch/libtorch/lib
- pushd $HOME && bash $TRAVIS_BUILD_DIR/.circleci/install_gloo.sh && popd
- go generate ./...
- go install ./...
- go test -coverprofile=coverage.txt -covermode=atomic -v -race ./...
6 changes: 4 additions & 2 deletions cgotorch/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,15 @@ objs := $(srcs:%.cc=%.o)
-O -c $< -o $@

libcgotorch.so: $(objs) ${LIBTORCH_DIR}
${CXX} -L libtorch/lib \
${CXX} -L libtorch/lib \
$(objs) \
-shared \
-o $@ ${INSTALL_NAME} \
-lc10d -lgloo \
-Wl,-rpath,libtorch/lib \
-Wl,-${LOAD} libtorch/lib/libc10.${LIB_SUFFIX} \
-lc10 -ltorch -ltorch_cpu
-lc10 -ltorch -ltorch_cpu \
${DEPS}

clean:
rm -rf *.so *.o
5 changes: 4 additions & 1 deletion cgotorch/build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ LOAD="force_load"
LIB_SUFFIX="so"
INSTALL_NAME=""
CUDA_FLAGS=""
DEPS=""

function build_linux_no_cuda() {
CXX="clang++"
Expand Down Expand Up @@ -67,6 +68,7 @@ elif [[ "$OS" == "darwin" ]]; then
LIB_SUFFIX="dylib"
INSTALL_NAME="-install_name @rpath/\$@"
LOAD="all_load"
DEPS=`pkg-config --cflags --libs libuv`
if [[ ! -d "$DIR/$LIBTORCH_DIR" ]]; then
curl -LsO https://download.pytorch.org/libtorch/cpu/libtorch-macos-1.6.0.zip
unzip -qq -o libtorch-macos-1.6.0.zip -d macos
Expand All @@ -84,5 +86,6 @@ make CXX="$CXX" \
GLIBCXX_USE_CXX11_ABI="$GLIBCXX_USE_CXX11_ABI" \
LOAD="$LOAD" \
CUDA_FLAGS="$CUDA_FLAGS" \
-f Makefile -j
DEPS="$DEPS" \
-f Makefile -j `getconf _NPROCESSORS_ONLN`
popd
1 change: 1 addition & 0 deletions cgotorch/cgotorch.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include "cgotorch/cuda.h"
#include "cgotorch/device.h"
#include "cgotorch/functional.h"
#include "cgotorch/gloo.h"
#include "cgotorch/init.h"
#include "cgotorch/memory.h"
#include "cgotorch/optim.h"
Expand Down
109 changes: 109 additions & 0 deletions cgotorch/gloo.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// Copyright 2020, GoTorch Authors

#include "cgotorch/gloo.h"

#include <c10d/FileStore.hpp>
#include <c10d/TCPStore.hpp>

#include <memory> // NOLINT
#include <string> // NOLINT
#include <vector> // NOLINT

const char *Gloo_NewFileStore(const char *path, int64_t num_workers,
Store *store) {
try {
*store = new std::shared_ptr<c10d::Store>(
new c10d::FileStore(std::string(path), num_workers));
return nullptr;
} catch (const std::exception &e) {
return exception_str(e.what());
}
}

const char *Gloo_NewTCPStore(const char *addr, int64_t port,
int64_t num_workers, int64_t is_server,
Store *store) {
try {
*store = new std::shared_ptr<c10d::Store>(
new c10d::TCPStore(std::string(addr), port, num_workers, is_server));
return nullptr;
} catch (const std::exception &e) {
return exception_str(e.what());
}
}

// Gloo_DeleteStore releases the store created by Gloo_NewFileStore or
// Gloo_NewTCPStore.  Returns nullptr on success, an error string otherwise.
const char *Gloo_DeleteStore(Store store) {
  try {
    // Delete the heap-allocated shared_ptr wrapper itself; its destructor
    // drops the reference to the underlying c10d::Store.  The previous
    // `store->reset()` released the c10d::Store but leaked the
    // std::shared_ptr object allocated with `new` in the constructors.
    delete store;
    return nullptr;
  } catch (const std::exception &e) {
    return exception_str(e.what());
  }
}

// Gloo_NewProcessGroupGloo creates a gloo process group of `size` workers in
// which this worker has the given `rank`, using the supplied rendezvous
// store and the default gloo device.  On success *pg owns the new group and
// nullptr is returned; on failure an error message string is returned.
const char *Gloo_NewProcessGroupGloo(Store store, int64_t rank, int64_t size,
                                     ProcessGroupGloo *pg) {
  try {
    auto options = c10d::ProcessGroupGloo::Options();
    options.devices.push_back(c10d::ProcessGroupGloo::createDefaultDevice());
    *pg = new c10d::ProcessGroupGloo(*store, rank, size, options);
    return nullptr;
  } catch (const std::exception &e) {
    return exception_str(e.what());
  }
}

// Gloo_DeleteProcessGroupGloo destroys a process group created by
// Gloo_NewProcessGroupGloo.  Returns nullptr on success, or an error
// message string if the destructor throws.
const char *Gloo_DeleteProcessGroupGloo(ProcessGroupGloo pg) {
  try {
    delete pg;
    return nullptr;
  } catch (const std::exception &e) {
    return exception_str(e.what());
  }
}

// Gloo_allreduce all-reduces `length` tensors in place across every worker
// in the process group, blocking until the collective completes.  Returns
// nullptr on success, or an error message string on failure.
const char *Gloo_allreduce(ProcessGroupGloo pg, Tensor *tensors,
                           int64_t length) {
  try {
    std::vector<torch::Tensor> ts;
    if (length > 0) {
      ts.reserve(length);  // single allocation instead of repeated growth
    }
    // Index with a signed counter: the original `ts.size() < length` compared
    // size_t against int64_t, which loops forever on a negative length.
    for (int64_t i = 0; i < length; ++i) {
      ts.push_back(*tensors[i]);
    }
    auto work = pg->allreduce(ts);
    work->wait();  // block until the collective finishes
    return nullptr;
  } catch (const std::exception &e) {
    return exception_str(e.what());
  }
}

// Gloo_allreduce_coalesced all-reduces `length` tensors after flattening and
// concatenating (coalescing) them, so all tensors must share device, layout,
// and dtype.  Blocks until the collective completes.  Returns nullptr on
// success, or an error message string on failure.
const char *Gloo_allreduce_coalesced(ProcessGroupGloo pg, Tensor *tensors,
                                     int64_t length) {
  try {
    std::vector<torch::Tensor> ts;
    if (length > 0) {
      ts.reserve(length);  // single allocation instead of repeated growth
    }
    // Index with a signed counter: the original `ts.size() < length` compared
    // size_t against int64_t, which loops forever on a negative length.
    for (int64_t i = 0; i < length; ++i) {
      ts.push_back(*tensors[i]);
    }
    auto work = pg->allreduce_coalesced(ts);
    work->wait();  // block until the collective finishes
    return nullptr;
  } catch (const std::exception &e) {
    return exception_str(e.what());
  }
}

// Gloo_broadcast broadcasts `length` tensors from the default root rank to
// every worker in the process group, blocking until the collective
// completes.  Returns nullptr on success, or an error message string.
const char *Gloo_broadcast(ProcessGroupGloo pg, Tensor *tensors,
                           int64_t length) {
  try {
    std::vector<torch::Tensor> ts;
    if (length > 0) {
      ts.reserve(length);  // single allocation instead of repeated growth
    }
    // Index with a signed counter: the original `ts.size() < length` compared
    // size_t against int64_t, which loops forever on a negative length.
    for (int64_t i = 0; i < length; ++i) {
      ts.push_back(*tensors[i]);
    }
    auto work = pg->broadcast(ts);
    work->wait();  // block until the collective finishes
    return nullptr;
  } catch (const std::exception &e) {
    return exception_str(e.what());
  }
}
35 changes: 35 additions & 0 deletions cgotorch/gloo.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/* Copyright 2020, GoTorch Authors */

/* C wrappers around c10d's gloo-backed rendezvous stores and process group,
 * callable from Go via cgo.  Every function returns nullptr on success or a
 * heap-allocated error-message string on failure. */

#pragma once

#include "cgotorch/torchdef.h"

#ifdef __cplusplus
extern "C" {
#endif
/* Creates a file-backed rendezvous store at `path` for num_workers workers. */
const char *Gloo_NewFileStore(const char *path, int64_t num_workers,
                              Store *store);

/* Creates a TCP rendezvous store at addr:port for num_workers workers;
 * is_server is non-zero on the rank that hosts the store. */
const char *Gloo_NewTCPStore(const char *addr, int64_t port,
                             int64_t num_workers, int64_t is_server,
                             Store *store);

/* Releases a store created by either constructor above. */
const char *Gloo_DeleteStore(Store store);

/* Creates a gloo process group of `size` workers with this worker's rank. */
const char *Gloo_NewProcessGroupGloo(Store store, int64_t rank, int64_t size,
                                     ProcessGroupGloo *pg);

/* Destroys a process group created by Gloo_NewProcessGroupGloo. */
const char *Gloo_DeleteProcessGroupGloo(ProcessGroupGloo pg);

/* All-reduces `length` tensors in place; blocks until completion. */
const char *Gloo_allreduce(ProcessGroupGloo pg, Tensor *tensors,
                           int64_t length);

/* Coalesced variant: tensors are flattened and concatenated first, so they
 * must share device, layout, and dtype. */
const char *Gloo_allreduce_coalesced(ProcessGroupGloo pg, Tensor *tensors,
                                     int64_t length);

/* Broadcasts `length` tensors from the root rank; blocks until completion. */
const char *Gloo_broadcast(ProcessGroupGloo pg, Tensor *tensors,
                           int64_t length);

#ifdef __cplusplus
}
#endif
11 changes: 9 additions & 2 deletions cgotorch/torchdef.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,29 @@

#ifdef __cplusplus
#include <torch/torch.h>
#include <c10d/ProcessGroupGloo.hpp>
#include <c10d/Store.hpp>

#include <vector>
#include <memory> // NOLINT
#include <vector> // NOLINT
extern "C" {
typedef at::Tensor *Tensor;
typedef torch::optim::Optimizer *Optimizer;
typedef torch::data::datasets::MNIST *MNIST;
typedef torch::data::transforms::Normalize<> *Normalize;
typedef torch::Device *Device;
typedef std::vector<char> *ByteBuffer; // NOLINT
typedef std::vector<char> *ByteBuffer; // NOLINT
typedef std::shared_ptr<c10d::Store> *Store; // NOLINT
typedef c10d::ProcessGroupGloo *ProcessGroupGloo;
#else
typedef void *Tensor;
typedef void *Optimizer;
typedef void *MNIST;
typedef void *Normalize;
typedef void *Device;
typedef void *ByteBuffer;
typedef void *Store;
typedef void *ProcessGroupGloo;
#endif
typedef void *CUDAStream;

Expand Down
87 changes: 87 additions & 0 deletions gloo.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package gotorch

// #cgo CFLAGS: -I ${SRCDIR}
// #cgo LDFLAGS: -L ${SRCDIR}/cgotorch -Wl,-rpath ${SRCDIR}/cgotorch -lcgotorch
// #cgo LDFLAGS: -L ${SRCDIR}/cgotorch/libtorch/lib -Wl,-rpath ${SRCDIR}/cgotorch/libtorch/lib -lc10 -lgloo -lc10d -ltorch -ltorch_cpu
// #include "cgotorch/cgotorch.h"
import "C"
import "unsafe"

// Store wraps the C-side rendezvous store handle (a pointer to a
// std::shared_ptr<c10d::Store>) used by workers to discover each other.
type Store struct {
	Store *C.Store
}

// NewFileStore creates a FileStore instance backed by the file at path and
// shared by `size` workers.  Panics (via MustNil) if the C call fails.
// NOTE(review): C.CString allocates with malloc and is never freed here, so
// each call leaks the path string — confirm and release it with C.free.
func NewFileStore(path string, size int64) Store {
	var t C.Store
	MustNil(unsafe.Pointer(C.Gloo_NewFileStore(C.CString(path), C.int64_t(size), &t)))
	return Store{&t}
}

// NewTCPStore creates a TCPStore instance at addr:port shared by `size`
// workers; isServer selects the rank that hosts the store.  Panics (via
// MustNil) if the C call fails.
// NOTE(review): C.CString allocates with malloc and is never freed here, so
// each call leaks the addr string — confirm and release it with C.free.
func NewTCPStore(addr string, port, size int64, isServer bool) Store {
	is := 0
	if isServer {
		is = 1
	}
	var t C.Store
	MustNil(unsafe.Pointer(C.Gloo_NewTCPStore(C.CString(addr), C.int64_t(port), C.int64_t(size), C.int64_t(is), &t)))
	return Store{&t}
}

// Close releases the underlying C-side store.
// NOTE(review): the receiver is a value, so `s.Store = nil` mutates only the
// local copy and has no effect on the caller's Store; a second Close on the
// caller's value would double-free — confirm whether a pointer receiver was
// intended.
func (s Store) Close() {
	MustNil(unsafe.Pointer(C.Gloo_DeleteStore(*s.Store)))
	s.Store = nil
}

// ProcessGroupGloo wraps the C-side handle to a c10d::ProcessGroupGloo,
// the gloo-backed group over which collectives (AllReduce, Broadcast) run.
type ProcessGroupGloo struct {
	PGG *C.ProcessGroupGloo
}

// NewProcessGroupGloo creates a ProcessGroupGloo of `size` workers, in which
// this worker has the given rank, using store s for rendezvous.  Panics (via
// MustNil) if the C call fails.
func NewProcessGroupGloo(s Store, rank, size int64) ProcessGroupGloo {
	var group C.ProcessGroupGloo
	MustNil(unsafe.Pointer(
		C.Gloo_NewProcessGroupGloo(*s.Store, C.int64_t(rank), C.int64_t(size), &group)))
	return ProcessGroupGloo{PGG: &group}
}

// Close destroys the underlying C-side process group.
// NOTE(review): the receiver is a value, so `pg.PGG = nil` mutates only the
// local copy and has no effect on the caller's ProcessGroupGloo — confirm
// whether a pointer receiver was intended.
func (pg ProcessGroupGloo) Close() {
	MustNil(unsafe.Pointer(C.Gloo_DeleteProcessGroupGloo(*pg.PGG)))
	pg.PGG = nil
}

// AllReduce all-reduces the given tensors in place across every worker in
// the group, blocking until the collective completes.  A nil or empty slice
// is a no-op.  todo(qijun) only support sum
func (pg ProcessGroupGloo) AllReduce(tensors []Tensor) {
	if len(tensors) == 0 {
		return // &CT[0] below would panic on an empty slice
	}
	CT := make([]C.Tensor, 0, len(tensors)) // pre-size: no append reallocs
	for _, t := range tensors {
		CT = append(CT, C.Tensor(*t.T))
	}
	p := (*C.Tensor)(unsafe.Pointer(&CT[0]))
	MustNil(unsafe.Pointer(C.Gloo_allreduce(*pg.PGG, p, C.int64_t(len(CT)))))
}

// AllReduceCoalesced all-reduces the given tensors after they are flattened
// and concatenated (coalesced), so input tensors must share device, layout,
// and type.  Blocks until the collective completes.  A nil or empty slice is
// a no-op.
func (pg ProcessGroupGloo) AllReduceCoalesced(tensors []Tensor) {
	if len(tensors) == 0 {
		return // &CT[0] below would panic on an empty slice
	}
	CT := make([]C.Tensor, 0, len(tensors)) // pre-size: no append reallocs
	for _, t := range tensors {
		CT = append(CT, C.Tensor(*t.T))
	}
	p := (*C.Tensor)(unsafe.Pointer(&CT[0]))
	MustNil(unsafe.Pointer(C.Gloo_allreduce_coalesced(*pg.PGG, p, C.int64_t(len(CT)))))
}

// Broadcast broadcasts the given tensors from the root rank to every worker
// in the group, blocking until the collective completes.  A nil or empty
// slice is a no-op.
func (pg ProcessGroupGloo) Broadcast(tensors []Tensor) {
	if len(tensors) == 0 {
		return // &CT[0] below would panic on an empty slice
	}
	CT := make([]C.Tensor, 0, len(tensors)) // pre-size: no append reallocs
	for _, t := range tensors {
		CT = append(CT, C.Tensor(*t.T))
	}
	p := (*C.Tensor)(unsafe.Pointer(&CT[0]))
	MustNil(unsafe.Pointer(C.Gloo_broadcast(*pg.PGG, p, C.int64_t(len(CT)))))
}
Loading