Skip to content

Commit

Permalink
Merge pull request #13 from v3io/restore
Browse files Browse the repository at this point in the history
Restore
  • Loading branch information
sasile authored Jul 18, 2017
2 parents 353304d + 9269a18 commit ec51e49
Show file tree
Hide file tree
Showing 8 changed files with 328 additions and 79 deletions.
17 changes: 0 additions & 17 deletions example.results

This file was deleted.

10 changes: 0 additions & 10 deletions example.results.executors

This file was deleted.

43 changes: 0 additions & 43 deletions example.results.log

This file was deleted.

6 changes: 6 additions & 0 deletions httpblaster/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,12 @@ func (self *Executor) load_request_generator() (chan *fasthttp.Request, bool) {
case request_generators.JSON2KV:
req_gen = &request_generators.Json2KV{}
break
case request_generators.LINE2KV:
req_gen = &request_generators.Line2KvGenerator{}
break
case request_generators.RESTORE:
req_gen = &request_generators.RestoreGenerator{}
break
default:
panic(fmt.Sprintf("unknown request generator %s", self.Workload.Generator))
}
Expand Down
37 changes: 29 additions & 8 deletions httpblaster/request_generators/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,31 +13,52 @@ const (
LINE2STREAM = "line2stream"
CSV2KV = "csv2kv"
JSON2KV = "json2kv"
LINE2KV = "line2kv"
RESTORE = "restore"
)

type RequestCommon struct {
ch_files chan string
base_uri string
}

var (
contentType string = "application/json"
)

func (self *RequestCommon) PrepareRequest(content_type string,
header_args map[string]string,
method string, uri string,
body string, host string) *fasthttp.Request {
req := fasthttp.AcquireRequest()

header := fasthttp.RequestHeader{}
header.SetContentType(content_type)
req.Header.SetContentType(content_type)
req.Header.SetMethod(method)
req.Header.SetRequestURI(uri)
req.Header.SetHost(host)
for k, v := range header_args {
req.Header.Set(k, v)
}

header.SetMethod(method)
header.SetRequestURI(uri)
header.SetHost(host)
req.AppendBodyString(body)
return req
}

func (self *RequestCommon) PrepareRequestBytes(content_type string,
header_args map[string]string,
method string, uri string,
body []byte, host string) *fasthttp.Request {
req := fasthttp.AcquireRequest()

req.Header.SetContentType(content_type)
req.Header.SetMethod(method)
req.Header.SetRequestURI(uri)
req.Header.SetHost(host)
for k, v := range header_args {
header.Set(k, v)
req.Header.Set(k, v)
}
req.AppendBodyString(body)
header.CopyTo(&req.Header)

req.AppendBody(body)
return req
}

Expand Down
94 changes: 94 additions & 0 deletions httpblaster/request_generators/line2kv.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package request_generators

import (
"bufio"
"fmt"
//"github.com/nu7hatch/gouuid"
"github.com/v3io/http_blaster/httpblaster/config"
//"github.com/v3io/http_blaster/httpblaster/igz_data"
"github.com/valyala/fasthttp"
"io"
"log"
"os"
"runtime"
"strings"
"sync"
)

type Line2KvGenerator struct {
RequestCommon
workload config.Workload
}

func (self *Line2KvGenerator) UseCommon(c RequestCommon) {

}

func (self *Line2KvGenerator) generate_request(ch_records chan []string,
ch_req chan *fasthttp.Request,
host string, wg *sync.WaitGroup) {
defer wg.Done()
for r := range ch_records {
req := self.PrepareRequest(contentType, self.workload.Header, "PUT",
r[0], r[1], host)
//panic(fmt.Sprintf("%+v",r))
ch_req <- req
}
log.Println("generate_request Done")
}

func (self *Line2KvGenerator) generate(ch_req chan *fasthttp.Request, payload string, host string) {
defer close(ch_req)
var ch_records chan []string = make(chan []string)
wg := sync.WaitGroup{}
ch_files := self.FilesScan(self.workload.Payload)

wg.Add(runtime.NumCPU())
for c := 0; c < runtime.NumCPU(); c++ {
go self.generate_request(ch_records, ch_req, host, &wg)
}

for f := range ch_files {
if file, err := os.Open(f); err == nil {
reader := bufio.NewReader(file)
var i int = 0
for {
address, addr_err := reader.ReadString('\n')
payload, payload_err := reader.ReadString('\n')

if addr_err == nil && payload_err == nil {
ch_records <- []string{strings.TrimSpace(address), string(payload)}
i++
} else if addr_err == io.EOF || payload_err == io.EOF {
break
} else {
log.Fatal(err)
}
}

log.Println(fmt.Sprintf("Finish file scaning, generated %d records", i))
} else {
panic(err)
}
}
close(ch_records)
log.Println("Waiting for generators to finish")
wg.Wait()
log.Println("generators done")
}

func (self *Line2KvGenerator) GenerateRequests(wl config.Workload, tls_mode bool, host string) chan *fasthttp.Request {
self.workload = wl
if self.workload.Header == nil {
self.workload.Header = make(map[string]string)
}
self.workload.Header["X-v3io-function"] = "PutItem"

self.SetBaseUri(tls_mode, host, self.workload.Container, self.workload.Target)

ch_req := make(chan *fasthttp.Request, 1000)

go self.generate(ch_req, self.workload.Payload, host)

return ch_req
}
2 changes: 1 addition & 1 deletion httpblaster/request_generators/line2stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func (self *Line2StreamGenerator) generate_request(ch_records chan string,
for r := range ch_records {
sr := igz_data.NewStreamRecord("client", r, u.String(), 0, true)
r := igz_data.NewStreamRecords(sr)
req := self.PrepareRequest(contentType, self.workload.Header, self.workload.Type,
req := self.PrepareRequest(contentType, self.workload.Header, "PUT",
self.base_uri, r.ToJsonString(), host)
ch_req <- req
}
Expand Down
Loading

0 comments on commit ec51e49

Please sign in to comment.