diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..4c42b2f --- /dev/null +++ b/Dockerfile @@ -0,0 +1,32 @@ +FROM golang:1.11 + +## Install Python +#RUN apt-get install python2.7 +## Install node.js +#RUN apt-get install nodejs +## Install CMake (optional, only needed for tests and building Binaryen) +#RUN apt-get install cmake +## Install Java (optional, only needed for Closure Compiler minification) +#RUN apt-get install default-jre +# +## Get the emsdk repo +#RUN git clone https://github.com/juj/emsdk.git +# +## Enter that directory +#WORKDIR ./emsdk +#RUN git pull +## Download and install the latest SDK tools. +#RUN ./emsdk install latest +## Make the "latest" SDK "active" for the current user. (writes ~/.emscripten file) +#RUN ./emsdk activate latest +## Activate PATH and other environment variables in the current terminal +#RUN source ./emsdk_env. + +RUN apt-get install python3 +RUN go get github.com/hpcloud/tail +RUN go get github.com/rsms/gotalk +WORKDIR $GOPATH/src/github.com/geekSiddharth/inout/ + +COPY . $GOPATH/src/github.com/geekSiddharth/inout/ + +CMD ["make", "wasm"] diff --git a/README.md b/README.md index 8718e18..093b2c3 100644 --- a/README.md +++ b/README.md @@ -1,60 +1,60 @@ -# Server - -## Data Structure -- Program -- Job -- Node -- Program List -- Nodes List -- Jobs Queue - -## Functions -- Program To Jobs -- Task Scheduler (Global): Job to Node matching -- Add Node -- Remove Node -- Receive Info From Node - - Receives any message - - forwards it to apt function -- Send Job To Node - - a function that keeps on checking Job array and it sends the scheduled tasks to the scheduled node and make necessary changes to the job -- Result Aggregator - - Runs for each program - - - -### Handlers at teh server side functions -- receive-job: (json) - - It should send the following: - - IsOkay string `json:"is_okay"` - - It should accept: (json) - - JobId string `json:"job_id"` - - ProgramId string `json:"program_id"` - - Wasm string `json:"wasm"` //path of the wasm - - Parameters string `json:"parameters"` // input parameter - -### Handlers at teh server side -- resource-available: returns a json with the following fields - - accepts (json) - - cores - - mem - - returns - - Okay -- resource-used: accepts a json with the fields same as above and send "Okay" -- job-complete: called after a task is done - - accepts (json) - - JobId string `json:"job_id"` - - ProgramId string `json:"program_id"` - - Parameters string `json:"parameters"` - - Result string `json:"result"` - - returns - - "Okay" - - -### Task config - - file_name: `_input` should contains the input parameter - - each line should contain the parameter - - the parameter is passed as it is to the receive-job handler on the browser -### Result Config - I will result of the input taken from `_input` to this file - - file_name: `_output`. - - Each row is like: \t \ No newline at end of file +# SadlyDistributed + + Making volunteer computing easy af. + Zero Setup for volunteers. Easy for developers. + + We compile **your distributed code in almost any language** to **WebAssembly** and then all of it is **executed in a cluster of browsers**. + Anyone who just opens our website in their browser will become share his or her computing power with us. + +## Installation +We <3 Docker. Hence, we have provided a Dockerfile that takes care of all the dependencies of our server code. + +#### Building Docker file +``` + docker build -t sadlyDistributed . +``` + +#### Running our Server(with Prime Number Example) +``` + docker run -it -p 8899:8899 sadlyDistributed +``` + +## Mini-Tutorial +**How can you modify your distributed code for our architecture?** + +We have an example program for finding if a number(it can be as large as you can think of) is prime or not. +Code: [/client/programs/1](/client/programs/1) + +In general distributed computer, same code is replicated on multiple machines and each machine execute the code over a different range of values(or parameters). +The output from multiple machines is combined to produce the final output. Your job, as developer, is to give us your pre-existing distributed code, +in almost any language. The code should take `input from stdin` and gives `output to stdout`. +After that, you need to define how the range of values should be *divided* and how the results from them should be *combined* to produce the final answer. +After you have defined this logic, we scale your code to all the browsers available to us. + +You code interface with our architecture by reading and writing from files. We have two files: +- `input`: You specify the parameters that your code will take in. Each parameters is present in a new line. Our prime number uses the following format: +``` +1 110000 +110000 9990000 +9990000 99900000 +99900000 99900000 + + +``` +Note that our distributed code, `bigprimes.go`, is written to understand these range. In other words, +`go run bigprimes.go 110000 9990000` will tell us weather our hardcoded number is divisible by anything between 110000 and 9990000. +Our architecture read this file continuously(as new input arrives or so called tail reading) and distributes the parameters over free nodes. + +- `output`: Our architecture writes the stdout(or output) received from various machine to this file. +Now it is upto you to utilize the info in this file for combining the result. Our prime number used the following format: +``` +1 110000 false +99900000 99900000 false +110000 9990000 false + +``` +The parameter is same as specified in the input file. And the output is what your code wrote to stdout when given those parameters. +Our architecture updates this file as it receives output from different browsers. + +In our prime number example, we use two file in two different language to show how **our approach is language-agnostic**. +We have used GoLang for the code that needs to be distributed. And we have used python for generating inputs and combining the output. Neat, right? \ No newline at end of file diff --git a/main.go b/main.go new file mode 100644 index 0000000..f349393 --- /dev/null +++ b/main.go @@ -0,0 +1,259 @@ +package main + +import ( + "fmt" + "github.com/geekSiddharth/inout/server/job" + "github.com/geekSiddharth/inout/server/node" + "github.com/geekSiddharth/inout/server/program" + "github.com/geekSiddharth/inout/server/server" + "github.com/hpcloud/tail" + "github.com/rsms/gotalk" + "log" + "net/http" + "os" + "strconv" + "strings" + "time" +) + +var ( + serverThis server.Server // stores most of the info of the server + resultChan = make(chan program.Result, 1000) // results from sockets are fed into this common resultChan + hanlderLimiter = make(chan int, 1) +) + +// struct for sending a job to the client +type SendJob struct { + JobId string `json:"job_id"` + ProgramId string `json:"program_id"` + Wasm string `json:"wasm"` //path of the wasm + Parameters string `json:"parameters"` // input parameter +} + +// pretty useless it is for time being +type NewProgram struct { + ProgramId string `json:"program_id"` +} + +// to know if the job was properly scheduled or not +type JobReceiveResponse struct { + IsOkay string `json:"is_okay"` +} + +// +func scheduler() { + for { + serverThis.RWSocks.RLock() + // go through all the sockets + for sockThis, node_ := range serverThis.Socks { + if node_.IsNew == true { + //fmt.Printf("node avail: %s \n", node_.Sock.Addr()) + // do error checking + serverThis.RWJobs.Lock() + for _, job_ := range serverThis.Jobs { + if job_.IsScheduled == false { + //schedule it + sendJob := SendJob{ + JobId: job_.Id, + ProgramId: job_.ProgramId, + Parameters: job_.Parameters, + Wasm: "/programs/" + job_.ProgramId + "/main.wasm", + } + + jobRecieveResponse := &JobReceiveResponse{} + err := sockThis.Request("receive-job", &sendJob, jobRecieveResponse) + if err != nil { + fmt.Println(err) + } else { + + if strings.Compare(jobRecieveResponse.IsOkay, "Okay") == 0 { + job_.IsScheduled = true + job_.ScheduledTime = time.Now() + job_.Sock = sockThis + + node_.IsNew = false + serverThis.Socks[sockThis] = node_ + } else { + fmt.Printf("Job Receive Response error: %s \n", jobRecieveResponse.IsOkay) + } + + } + break + } // if - not scheduled ends + } // Jobs loops end + serverThis.RWJobs.Unlock() + } else { + //fmt.Printf("node busy: %s \n", node_.Sock.Addr()) + } + } + //time.Sleep(1 * time.Second) + serverThis.RWSocks.RUnlock() + } + +} + +func handleJobComplete(s *gotalk.Sock, r program.Result) (string, error) { + hanlderLimiter <- 1 + fmt.Println("in job complete handler") + + // Making the socket free + serverThis.RWSocks.Lock() + node_ := serverThis.Socks[s] + node_.IsNew = true + serverThis.Socks[s] = node_ + serverThis.RWSocks.Unlock() + + // delete the job and write to a file + serverThis.RWJobs.Lock() + // why not write result to the job + // delete job + delete(serverThis.Jobs, r.JobId) + serverThis.RWJobs.Unlock() + + // write to file + resultChan <- r + + <-hanlderLimiter + + fmt.Printf("Job Completed: %+v\n", r) + return "Okay", nil +} + +// a common field to write output to the corresponding outfile: "./client/programs/"+r.ProgramId+"/output" +func resultChanFeeder() { + for r := range resultChan { + f, err := os.OpenFile("./client/programs/"+r.ProgramId+"/output", os.O_APPEND|os.O_WRONLY, 0600) + if err != nil { + fmt.Printf("UNABLE to write: %s \n", r.ProgramId) + } + + defer f.Close() + fmt.Fprintf(f, "%s\t%s\n", r.Parameters, r.Result) + } +} + +// Creates jobs given the program ID +// Assumes that parameters will be given in: "./client/programs/"+programID+"/input" +// ends this thread +// kills all the socks with this program ID. To be Implemented. +// new jobs are assigned id and saved to serverThis.Jobs map +func programJobCreator(programID string) { + + t, err := tail.TailFile("./client/programs/"+programID+"/input", + tail.Config{ + Follow: true, + ReOpen: true, + Pipe: true, + Poll: true, + }) + + if err != nil { + fmt.Println("Unable to open the file for program id: %d", programID) + log.Fatal(err) + return + } + for line := range t.Lines { + text := strings.Trim(line.Text, "\n\t\r") + + switch text { + case "": + // End creating news jobs + return + case "": + // jon through jobs and delete all the jobs of this kind + // TODO: KILLL MEEEEEE + return + default: + // create a job object + newJob := job.Job{ + ProgramId: programID, + Parameters: text, + CreationTime: time.Now(), + } + + // save the job object to the map + serverThis.RWJobs.Lock() + _id := strconv.FormatInt(int64(len(serverThis.Jobs)+1), 10) + newJob.Id = _id + serverThis.Jobs[_id] = newJob + serverThis.RWJobs.Unlock() + } + } + +} + +// Whenever a new client is added this function is called +func onAcceptConnection(sock *gotalk.Sock) { + fmt.Println("Accepted: ", sock.Addr()) + + // updates: server.socks map + serverThis.RWSocks.Lock() + serverThis.Socks[sock] = node.GetNode(sock) + serverThis.RWSocks.Unlock() + + // closer handler for the socks + sock.CloseHandler = func(s *gotalk.Sock, _ int) { + + serverThis.RWJobs.Lock() + for job_id, job_ := range serverThis.Jobs { + if job_.IsScheduled == true && job_.Sock == s { + job_.IsScheduled = false + job_.Sock = nil + // Note: don't change the creation time + serverThis.Jobs[job_id] = job_ + } + } + serverThis.RWJobs.Unlock() + + serverThis.RWSocks.Lock() + delete(serverThis.Socks, s) + serverThis.RWSocks.Unlock() + fmt.Println("Closed") + } +} + +func main() { + serverThis = server.Server{} + serverThis.Init() + + // starting scheduler in background + go scheduler() + + // writes result to a file + go resultChanFeeder() + + // TODO: Make it dynamic -> Program + go programJobCreator("1") + + // Handle Result + gotalk.Handle("job-complete", handleJobComplete) + + // RESOURCE STUFFS + gotalk.Handle("resource-available", + func(s *gotalk.Sock, r node.Resource) (string, error) { + serverThis.Socks[s].UpdateResourceAvailable(r) + return "Okay", nil + }) + + gotalk.Handle("resource-used", + func(s *gotalk.Sock, r node.Resource) (string, error) { + serverThis.Socks[s].UpdateResourceUsed(r) + return "Okay", nil + }) + + //// Handler for making a new program + //gotalk.Handle("new-program", + // func(s *gotalk.Sock, newProgram NewProgram) (string, error) { + // go programJobCreator(newProgram.ProgramId) + // return "Okay", nil + // }) + + webSocketHandler := gotalk.WebSocketHandler() + webSocketHandler.OnAccept = onAcceptConnection + http.Handle("/gotalk/", webSocketHandler) + http.Handle("/", http.FileServer(http.Dir("./client"))) + err := http.ListenAndServe("0.0.0.0:8899", nil) + if err != nil { + panic("ListenAndServe: " + err.Error()) + } +} diff --git a/old_design_doc.md b/old_design_doc.md new file mode 100644 index 0000000..8718e18 --- /dev/null +++ b/old_design_doc.md @@ -0,0 +1,60 @@ +# Server + +## Data Structure +- Program +- Job +- Node +- Program List +- Nodes List +- Jobs Queue + +## Functions +- Program To Jobs +- Task Scheduler (Global): Job to Node matching +- Add Node +- Remove Node +- Receive Info From Node + - Receives any message + - forwards it to apt function +- Send Job To Node + - a function that keeps on checking Job array and it sends the scheduled tasks to the scheduled node and make necessary changes to the job +- Result Aggregator + - Runs for each program + - + +### Handlers at teh server side functions +- receive-job: (json) + - It should send the following: + - IsOkay string `json:"is_okay"` + - It should accept: (json) + - JobId string `json:"job_id"` + - ProgramId string `json:"program_id"` + - Wasm string `json:"wasm"` //path of the wasm + - Parameters string `json:"parameters"` // input parameter + +### Handlers at teh server side +- resource-available: returns a json with the following fields + - accepts (json) + - cores + - mem + - returns + - Okay +- resource-used: accepts a json with the fields same as above and send "Okay" +- job-complete: called after a task is done + - accepts (json) + - JobId string `json:"job_id"` + - ProgramId string `json:"program_id"` + - Parameters string `json:"parameters"` + - Result string `json:"result"` + - returns + - "Okay" + + +### Task config + - file_name: `_input` should contains the input parameter + - each line should contain the parameter + - the parameter is passed as it is to the receive-job handler on the browser +### Result Config + I will result of the input taken from `_input` to this file + - file_name: `_output`. + - Each row is like: \t \ No newline at end of file