From 89e61c71d1deb1efb545467418921e247656bbd9 Mon Sep 17 00:00:00 2001 From: qijun Date: Mon, 2 Nov 2020 17:53:57 +0800 Subject: [PATCH 1/3] add launch utility --- cmd/launch/main.go | 41 +++++++++++++++++++++++++++++++++++++++ example/allreduce/main.go | 33 +++++++++++++++++++++++++++++++ 2 files changed, 74 insertions(+) create mode 100644 cmd/launch/main.go create mode 100644 example/allreduce/main.go diff --git a/cmd/launch/main.go b/cmd/launch/main.go new file mode 100644 index 00000000..5e936a04 --- /dev/null +++ b/cmd/launch/main.go @@ -0,0 +1,41 @@ +package main + +import ( + "flag" + "fmt" + "os/exec" + "strings" +) + +var numNodes = flag.Int("numNodes", 1, "The number of nodes for distributed training") +var nodeRank = flag.Int("nodeRank", 0, "The rank of the node") +var nprocPerNode = flag.Int("nprocPerNode", 1, "The number of processes on each node") +var masterAddr = flag.String("masterAddr", "127.0.0.1", "The address of master node(rank 0)") +var masterPort = flag.Int("masterPort", 11111, "The port of master node") +var sharedFile = flag.String("sharedFile", "", "The shared file which could be access by all processes") +var trainingCmd = flag.String("trainingCmd", "", "The training command") + +func main() { + flag.Parse() + + commands := []string{} + size := (*numNodes) * (*nprocPerNode) + for i := 0; i < *nprocPerNode; i++ { + rank := (*nprocPerNode)*(*nodeRank) + i + cmd := fmt.Sprintf("%s -rank=%d -size=%d", *trainingCmd, rank, size) + if *masterAddr != "" { + cmd = fmt.Sprintf("%s -masterAddr=%s -masterPort=%d", cmd, *masterAddr, *masterPort) + } else if *sharedFile != "" { + cmd = fmt.Sprintf("%s -sharedFile=%s", cmd, *sharedFile) + } else { + panic("Must set value for masterAddr or sharedFile") + } + commands = append(commands, cmd) + } + + for _, cmd := range commands { + args := strings.Fields(cmd) + cmd := exec.Command(args[0], args[1:]...) + cmd.Start() + } +} diff --git a/example/allreduce/main.go b/example/allreduce/main.go new file mode 100644 index 00000000..49f284d5 --- /dev/null +++ b/example/allreduce/main.go @@ -0,0 +1,33 @@ +package main + +import ( + "flag" + "fmt" + "log" + "os" + + torch "github.com/wangkuiyi/gotorch" +) + +var masterAddr = flag.String("masterAddr", "127.0.0.1", "The address of master node(rank 0)") +var masterPort = flag.Int("masterPort", 11111, "The port of master node") +var rank = flag.Int("rank", 0, "The rank of the current process") +var size = flag.Int("size", 0, "The size of the processes") + +func main() { + flag.Parse() + + f, err := os.OpenFile(fmt.Sprintf("%d.log", *rank), os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666) + if err != nil { + log.Fatalf("error opening file: %v", err) + } + defer f.Close() + log.SetOutput(f) + + a := torch.NewTensor([][]float32{{1, 2}, {3, 4}}) + ts := torch.NewTCPStore(*masterAddr, int64(*masterPort), int64(*size), *rank == 0) + pg := torch.NewProcessGroupGloo(ts, int64(*rank), int64(*size)) + + pg.AllReduce([]torch.Tensor{a}) + log.Println(a) +} From 6769233996f7db6b77363d912b39a7074ec431e1 Mon Sep 17 00:00:00 2001 From: qijun Date: Tue, 3 Nov 2020 15:46:49 +0800 Subject: [PATCH 2/3] add mnist example --- example/allreduce/main.go | 46 ++++++++++++++++++++++++++++++++++----- nn/module.go | 19 ++++++++++++---- 2 files changed, 56 insertions(+), 9 deletions(-) diff --git a/example/allreduce/main.go b/example/allreduce/main.go index 49f284d5..6a966032 100644 --- a/example/allreduce/main.go +++ b/example/allreduce/main.go @@ -7,27 +7,63 @@ import ( "os" torch "github.com/wangkuiyi/gotorch" + F "github.com/wangkuiyi/gotorch/nn/functional" + "github.com/wangkuiyi/gotorch/vision/models" ) var masterAddr = flag.String("masterAddr", "127.0.0.1", "The address of master node(rank 0)") var masterPort = flag.Int("masterPort", 11111, "The port of master node") var rank = flag.Int("rank", 0, "The rank of the current process") -var size = flag.Int("size", 0, "The size of the processes") +var size = flag.Int("size", 1, "The size of the processes") + +func getGrads(params []torch.Tensor) (grads []torch.Tensor) { + for _, p := range params { + grads = append(grads, p.Grad()) + } + return +} func main() { flag.Parse() - f, err := os.OpenFile(fmt.Sprintf("%d.log", *rank), os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666) + f, err := os.OpenFile(fmt.Sprintf("%d.log", *rank), os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0666) if err != nil { log.Fatalf("error opening file: %v", err) } defer f.Close() log.SetOutput(f) - a := torch.NewTensor([][]float32{{1, 2}, {3, 4}}) ts := torch.NewTCPStore(*masterAddr, int64(*masterPort), int64(*size), *rank == 0) + defer ts.Close() pg := torch.NewProcessGroupGloo(ts, int64(*rank), int64(*size)) + defer pg.Close() + + net := models.MLP() + opt := torch.SGD(0.01, 0.5, 0, 0, false) + params := net.Parameters() + opt.AddParameters(params) + + log.Println(params[0].Index(0).Item()) + for _, p := range params { + pg.Broadcast([]torch.Tensor{p}) + } + log.Println(params[0].Index(0).Item()) - pg.AllReduce([]torch.Tensor{a}) - log.Println(a) + for i := 0; i < 10; i++ { + data := torch.Rand([]int64{16, 28, 28}, false) + label := torch.Ones([]int64{16}, false).CastTo(torch.Long) + + opt.ZeroGrad() + pred := net.Forward(data) + loss := F.NllLoss(pred, label, torch.Tensor{}, -100, "mean") + loss.Backward() + + grads := getGrads(params) + + log.Println(grads[0].Index(0).Item()) + pg.AllReduceCoalesced(grads) + log.Println(grads[0].Index(0).Item()) + + opt.Step() + } } diff --git a/nn/module.go b/nn/module.go index 5f21caf3..fd9e0031 100644 --- a/nn/module.go +++ b/nn/module.go @@ -4,6 +4,7 @@ import ( "fmt" "log" "reflect" + "sort" torch "github.com/wangkuiyi/gotorch" ) @@ -163,12 +164,21 @@ func (m *Module) NamedBuffers() map[string]torch.Tensor { return r } +func sortKeys(ts map[string]torch.Tensor) (keys []string) { + for k := range ts { + keys = append(keys, k) + } + sort.Strings(keys) + return +} + // Parameters returns trainable parameters (recursively) func (m *Module) Parameters() []torch.Tensor { result := make([]torch.Tensor, 0) n := m.NamedParameters() - for _, v := range n { - result = append(result, v) + keys := sortKeys(n) + for _, k := range keys { + result = append(result, n[k]) } return result } @@ -177,8 +187,9 @@ func (m *Module) Parameters() []torch.Tensor { func (m *Module) Buffers() []torch.Tensor { result := make([]torch.Tensor, 0) n := m.NamedBuffers() - for _, v := range n { - result = append(result, v) + keys := sortKeys(n) + for _, k := range keys { + result = append(result, n[k]) } return result } From 1170e5d49e97a492a6c3d5f0dfd7f9d2b2c9cd1d Mon Sep 17 00:00:00 2001 From: qijun Date: Tue, 3 Nov 2020 16:08:53 +0800 Subject: [PATCH 3/3] clean code --- example/allreduce/main.go | 15 --------------- 1 file changed, 15 deletions(-) diff --git a/example/allreduce/main.go b/example/allreduce/main.go index 6a966032..7f129ab9 100644 --- a/example/allreduce/main.go +++ b/example/allreduce/main.go @@ -2,9 +2,6 @@ package main import ( "flag" - "fmt" - "log" - "os" torch "github.com/wangkuiyi/gotorch" F "github.com/wangkuiyi/gotorch/nn/functional" @@ -26,13 +23,6 @@ func getGrads(params []torch.Tensor) (grads []torch.Tensor) { func main() { flag.Parse() - f, err := os.OpenFile(fmt.Sprintf("%d.log", *rank), os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0666) - if err != nil { - log.Fatalf("error opening file: %v", err) - } - defer f.Close() - log.SetOutput(f) - ts := torch.NewTCPStore(*masterAddr, int64(*masterPort), int64(*size), *rank == 0) defer ts.Close() pg := torch.NewProcessGroupGloo(ts, int64(*rank), int64(*size)) @@ -43,11 +33,9 @@ func main() { params := net.Parameters() opt.AddParameters(params) - log.Println(params[0].Index(0).Item()) for _, p := range params { pg.Broadcast([]torch.Tensor{p}) } - log.Println(params[0].Index(0).Item()) for i := 0; i < 10; i++ { data := torch.Rand([]int64{16, 28, 28}, false) @@ -59,10 +47,7 @@ func main() { loss.Backward() grads := getGrads(params) - - log.Println(grads[0].Index(0).Item()) pg.AllReduceCoalesced(grads) - log.Println(grads[0].Index(0).Item()) opt.Step() }