diff --git a/examples/csv_2_tsdb_example.toml b/examples/csv_2_tsdb_example.toml
new file mode 100644
index 0000000..a2b8dde
--- /dev/null
+++ b/examples/csv_2_tsdb_example.toml
@@ -0,0 +1,29 @@
+title = "Workload example"
+
+[global]
+    Duration = "160s"
+    server="192.168.206.22"
+    port="44001"
+    TLSMode=false
+    [global.StatusCodesAcceptance]
+        200 = 100.0 # up to 100% of responses may return status 200
+        204 = 100.0
+        205 = 100.0
+
+[workloads]
+    [workloads.load_to_t1]
+    name="load_to_t1"
+    Generator="csv2tsdb"
+    container="1"
+    Target="t1/"
+    workers=1
+    payload="examples/payloads/order-book-sample.csv"
+    schema="examples/schemas/tsdb_schema_example.json"
+
+
+
+
+
+
+
+
diff --git a/examples/schemas/tsdb_schema_example.json b/examples/schemas/tsdb_schema_example.json
new file mode 100644
index 0000000..4cfed31
--- /dev/null
+++ b/examples/schemas/tsdb_schema_example.json
@@ -0,0 +1,56 @@
+{
+  "SETTINGS": {
+    "Format": "csv",
+    "Separator": "|",
+    "TSDBTime": "Time_Sequence",
+    "TSDBName": "Broker_ID",
+    "TSDBValue": "Total_Orders",
+    "TSDBLables": "Status_Flag,Active_Status"
+  },
+  "COLUMNS": [
+    {
+      "Index": 2,
+      "Type": "S",
+      "Name": "Time_Sequence",
+      "Nullable": true
+    },
+    {
+      "Index": 3,
+      "Type": "N",
+      "Name": "Exchange_Transaction_ID",
+      "Nullable": true
+    },
+    {
+      "Index": 4,
+      "Type": "S",
+      "Name": "Status_Flag",
+      "Nullable": true
+    },
+    {
+      "Index": 10,
+      "Type": "N",
+      "Name": "Total_Orders",
+      "Nullable": true
+    },
+    {
+      "Index": 18,
+      "Type": "S",
+      "Name": "Active_Status",
+      "Nullable": true
+    },
+    {
+      "Index": 20,
+      "Type": "N",
+      "Name": "Total_New_Orders",
+      "Nullable": true
+    },
+    {
+      "Index": 34,
+      "Type": "S",
+      "Name": "Broker_ID",
+      "Nullable": true
+    }
+
+
+  ]
+}
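Illustration (not part of the diff): a minimal sketch of how the example schema above is consumed by the parser added further down in emd_schema_parser.go. The schema path comes from the example workload; the record values and indexes are invented for the illustration.

    package main

    import (
        "fmt"

        "github.com/v3io/http_blaster/httpblaster/igz_data"
    )

    func main() {
        parser := igz_data.EmdSchemaParser{}
        // update_mode and update_expression are not used by csv2tsdb, so they stay empty.
        if err := parser.LoadSchema("examples/schemas/tsdb_schema_example.json", "", ""); err != nil {
            panic(err)
        }

        // Synthetic record laid out per the COLUMNS indexes of the schema:
        // 2=Time_Sequence, 4=Status_Flag, 10=Total_Orders, 18=Active_Status, 34=Broker_ID.
        record := make([]string, 35)
        record[2] = "1529659800000"
        record[4] = "NEW"
        record[10] = "42"
        record[18] = "Y"
        record[34] = "BRK-7"

        // Prints one JSON document with Lset (__name__ plus the label columns),
        // Time and Value, as defined by the IgzTSDBItem type added below.
        fmt.Println(parser.TSDBFromCSVRecord(record))
    }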
"regexp" - //"strconv" + "regexp" "strings" ) @@ -24,6 +23,11 @@ type SchemaSettings struct { KeyFields string KeyFormat string UpdateFields string + TSDBName string + TSDBTime string + TSDBValue string + TSDBLables string + } type SchemaValue struct { @@ -47,11 +51,21 @@ type EmdSchemaParser struct { update_fields_indexs []int updateMode string updateExpression string + tsdb_name string + tsdb_name_index int + tsdb_time string + tsdb_time_index int + tsdb_value string + tsdb_value_index int + tsdb_attributes string + tsdb_attributes_map map[string]int + } func (self *EmdSchemaParser) LoadSchema(file_path, update_mode, update_expression string) error { self.values_map = make(map[int]SchemaValue) + self.tsdb_attributes_map = make(map[string]int) plan, _ := ioutil.ReadFile(file_path) err := json.Unmarshal(plan, &self.JsonSchema) if err != nil { @@ -64,11 +78,19 @@ func (self *EmdSchemaParser) LoadSchema(file_path, update_mode, update_expressio self.schema_key_fields = settings.KeyFields self.updateMode = update_mode self.updateExpression = update_expression + self.tsdb_time =settings.TSDBTime + self.tsdb_name =settings.TSDBName + self.tsdb_value =settings.TSDBValue + self.tsdb_attributes = settings.TSDBLables + for _, v := range columns { self.values_map[v.Index] = v } self.GetKeyIndexes() + self.MapTSDBLablesIndexes() + self.GetTSDBNameIndex() + self.GetTSDBValueIndex() if len(self.updateExpression) > 0 { self.GetUpdateExpressionIndexes() } @@ -101,6 +123,32 @@ func (self *EmdSchemaParser) GetKeyIndexes() { } } +func (self *EmdSchemaParser) GetTSDBNameIndex() { + for _, v := range self.values_map { + if v.Name == self.tsdb_name { + self.tsdb_name_index = v.Index + } + } + } + +func (self *EmdSchemaParser) GetTSDBValueIndex() { + for _, v := range self.values_map { + if v.Name == self.tsdb_value { + self.tsdb_value_index = v.Index + } + } +} + +func (self *EmdSchemaParser) MapTSDBLablesIndexes() { + attributes := strings.Split(self.tsdb_attributes, ",") + for _, att := range attributes { + for _, v := range self.values_map { + if v.Name == att { + self.tsdb_attributes_map[att] = v.Index } + } + } +} + func (self *EmdSchemaParser) GetFieldsIndexes(fields, delimiter string) []int { keys := strings.Split(fields, delimiter) indexArray := make([]int, 1) @@ -123,7 +171,8 @@ func (self *EmdSchemaParser) KeyFromCSVRecord(vals []string) string { } //when 1 key, return the key if len(self.schema_key_indexs) == 1 { - return vals[0] + //fix bug of returning always key in position 0 + return vals[self.schema_key_indexs[0]] } //when more the one key, generate formatted key var keys []interface{} @@ -134,19 +183,49 @@ func (self *EmdSchemaParser) KeyFromCSVRecord(vals []string) string { return key } +func (self *EmdSchemaParser) nameIndexFromCSVRecord(vals []string) string { + //when no keys, generate random + if len(self.schema_key_indexs) == 0 { + u, _ := uuid.NewV4() + return u.String() + } + //when 1 key, return the key + if len(self.schema_key_indexs) == 1 { + //fix bug of returning always key in position 0 + return vals[self.schema_key_indexs[0]] + } + //when more the one key, generate formatted key + var keys []interface{} + for _, i := range self.schema_key_indexs { + keys = append(keys, vals[i]) + } + key := fmt.Sprintf(self.schema_key_format, keys...) 
diff --git a/httpblaster/igz_data/igz_tsdb_item.go b/httpblaster/igz_data/igz_tsdb_item.go
new file mode 100644
index 0000000..e5df0c1
--- /dev/null
+++ b/httpblaster/igz_data/igz_tsdb_item.go
@@ -0,0 +1,168 @@
+package igz_data
+
+import (
+	"encoding/json"
+	"errors"
+	"fmt"
+	log "github.com/sirupsen/logrus"
+	"github.com/v3io/v3io-tsdb/pkg/utils"
+	"strconv"
+	"time"
+)
+
+type IgzTSDBItem struct {
+	Lset  utils.Labels
+	Time  string
+	Value float64
+}
+
+func (self *IgzTSDBItem) GenerateStruct(vals []string, parser *EmdSchemaParser) {
+	self.InsertTSDBName(vals, parser)
+	self.InsertTime(vals, parser)
+	self.InsertValue(vals[parser.tsdb_value_index])
+}
+
+func (self *IgzTSDBItem) ToJsonString() string {
+	body, _ := json.Marshal(self)
+	return string(body)
+}
+
+func (self *IgzTSDBItem) InsertTSDBName(vals []string, parser *EmdSchemaParser) error {
+	for i, v := range parser.values_map {
+		if v.Name == parser.tsdb_name {
+			parser.tsdb_name_index = i
+		}
+	}
+	input := vals[parser.tsdb_name_index]
+	self.Lset = utils.Labels{{Name: "__name__", Value: input}}
+	for key, val := range parser.tsdb_attributes_map {
+		label := utils.Label{Name: key, Value: vals[val]}
+		self.Lset = append(self.Lset, label)
+	}
+	return nil
+}
+
+func (self *IgzTSDBItem) InsertKey(key string, value_type IgzType, value interface{}) error {
+	strVal := value.(string)
+
+	_, err := time.Parse(time.RFC3339, strVal)
+	if err != nil {
+		//fall back to the current time when the value is not RFC3339
+		self.Time = strconv.FormatInt(time.Now().Unix(), 10)
+	} else {
+		self.Time = strVal
+	}
+
+	return nil
+}
+
+func (self *IgzTSDBItem) InsertTime(vals []string, parser *EmdSchemaParser) error {
+	for i, v := range parser.values_map {
+		if v.Name == parser.tsdb_time {
+			parser.tsdb_time_index = i
+		}
+	}
+	input := vals[parser.tsdb_time_index]
+	//add validation on time
+	self.Time = input
+	return nil
+}
+
+func (self *IgzTSDBItem) InsertValue(strVal string) {
+	f, err := strconv.ParseFloat(strVal, 64)
+	if err != nil {
+		panic(fmt.Sprintf("conversion error to float %v: %v", strVal, err))
+	}
+	self.Value = f
+}
+
+func NewTSDBItem() *IgzTSDBItem {
+	i := &IgzTSDBItem{}
+	return i
+}
+
+type IgzTSDBItemUpdate struct {
+	UpdateMode       string
+	UpdateExpression string
+}
+
+func (self *IgzTSDBItemUpdate) ToJsonString() string {
+	body, _ := json.Marshal(self)
+	return string(body)
+}
+
+type IgzTSDBItemQuery struct {
+	TableName       string
+	AttributesToGet string
+	Key             map[string]map[string]interface{}
+}
+
+func (self *IgzTSDBItemQuery) ToJsonString() string {
+	body, _ := json.Marshal(self)
+	return string(body)
+}
+
+func (self *IgzTSDBItemQuery) InsertKey(key string, value_type IgzType, value interface{}) error {
+	if _, ok := self.Key[key]; ok {
+		err := fmt.Sprintf("Key %s overrides existing key %v", key, self.Key)
+		log.Error(err)
+		return errors.New(err)
+	}
+	self.Key[key] = make(map[string]interface{})
+	self.Key[key][string(value_type)] = value
+	return nil
+}
+
+type IgzTSDBItemsQuery struct {
+	TableName        string
+	AttributesToGet  string
+	Limit            int
+	FilterExpression string
+	Segment          int
+	TotalSegment     int
+	Marker           string
+	StartingKey      map[string]map[string]interface{}
+	EndingKey        map[string]map[string]interface{}
+}
+
+func (self *IgzTSDBItemsQuery) ToJsonString() string {
+	body, _ := json.Marshal(self)
+	return string(body)
+}
+
+func (self *IgzTSDBItemsQuery) InsertStartingKey(key string, value_type IgzType, value interface{}) error {
+	if _, ok := self.StartingKey[key]; ok {
+		err := fmt.Sprintf("Key %s overrides existing key %v", key, self.StartingKey)
+		log.Error(err)
+		return errors.New(err)
+	}
+	self.StartingKey[key] = make(map[string]interface{})
+	self.StartingKey[key][string(value_type)] = value
+	return nil
+}
+
+func (self *IgzTSDBItemsQuery) InsertEndingKey(key string, value_type IgzType, value interface{}) error {
+	if _, ok := self.EndingKey[key]; ok {
+		err := fmt.Sprintf("Key %s overrides existing key %v", key, self.EndingKey)
+		log.Error(err)
+		return errors.New(err)
+	}
+	self.EndingKey[key] = make(map[string]interface{})
+	self.EndingKey[key][string(value_type)] = value
+	return nil
+}
+
+func (item IgzTSDBItem) ConvertToTSDBItem() *IgzTSDBItem {
+	returnItem := IgzTSDBItem{}
+	return &returnItem
+}
diff --git a/httpblaster/request_generators/common.go b/httpblaster/request_generators/common.go
index b564dc1..07de027 100644
--- a/httpblaster/request_generators/common.go
+++ b/httpblaster/request_generators/common.go
@@ -21,6 +21,7 @@ const (
 	RESTORE   = "restore"
 	LINE2HTTP = "line2http"
 	REPLAY    = "replay"
+	CSV2TSDB  = "csv2tsdb"
 )
 
 type RequestCommon struct {
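Illustration (not part of the diff): the same mapping can be expressed by building an IgzTSDBItem by hand, mirroring the Test_tsdb_to_json test further down. All label and value literals here are invented; __name__ carries the TSDBName column (Broker_ID in the example schema), the remaining labels come from TSDBLables, and Time/Value come from TSDBTime/TSDBValue.

    package main

    import (
        "fmt"

        "github.com/v3io/http_blaster/httpblaster/igz_data"
        "github.com/v3io/v3io-tsdb/pkg/utils"
    )

    func main() {
        item := igz_data.IgzTSDBItem{}
        item.Lset = utils.Labels{
            {Name: "__name__", Value: "BRK-7"},
            {Name: "Status_Flag", Value: "NEW"},
            {Name: "Active_Status", Value: "Y"},
        }
        item.Time = "1529659800000" // TSDBTime column, kept as a string
        item.Value = 42             // TSDBValue column, parsed as float64
        fmt.Println(item.ToJsonString())
    }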
diff --git a/httpblaster/request_generators/csv2tsdb.go b/httpblaster/request_generators/csv2tsdb.go
new file mode 100644
index 0000000..97641a3
--- /dev/null
+++ b/httpblaster/request_generators/csv2tsdb.go
@@ -0,0 +1,112 @@
+package request_generators
+
+import (
+	"encoding/csv"
+	log "github.com/sirupsen/logrus"
+	"github.com/v3io/http_blaster/httpblaster/config"
+	"github.com/v3io/http_blaster/httpblaster/igz_data"
+	"io"
+	"os"
+	"runtime"
+	"strings"
+	"sync"
+)
+
+type Csv2TSDB struct {
+	workload config.Workload
+	RequestCommon
+}
+
+func (self *Csv2TSDB) UseCommon(c RequestCommon) {
+
+}
+
+func (self *Csv2TSDB) generate_request(ch_records chan []string, ch_req chan *Request, host string,
+	wg *sync.WaitGroup) {
+	defer wg.Done()
+	parser := igz_data.EmdSchemaParser{}
+	var contentType string = "text/html"
+	e := parser.LoadSchema(self.workload.Schema, "", "")
+	if e != nil {
+		panic(e)
+	}
+
+	for r := range ch_records {
+		json_payload := parser.TSDBFromCSVRecord(r)
+		req := AcquireRequest()
+		self.PrepareRequest(contentType, self.workload.Header, "PUT",
+			self.base_uri, json_payload, host, req.Request)
+		ch_req <- req
+	}
+}
+
+func (self *Csv2TSDB) generate(ch_req chan *Request, payload string, host string) {
+	defer close(ch_req)
+	var ch_records chan []string = make(chan []string, 1000)
+	parser := igz_data.EmdSchemaParser{}
+	e := parser.LoadSchema(self.workload.Schema, "", "")
+	if e != nil {
+		panic(e)
+	}
+
+	wg := sync.WaitGroup{}
+	wg.Add(runtime.NumCPU())
+	for c := 0; c < runtime.NumCPU(); c++ {
+		go self.generate_request(ch_records, ch_req, host, &wg)
+	}
+
+	ch_files := self.FilesScan(self.workload.Payload)
+
+	for f := range ch_files {
+		fp, err := os.Open(f)
+		if err != nil {
+			panic(err)
+		}
+
+		r := csv.NewReader(fp)
+		r.Comma = parser.JsonSchema.Settings.Separator.Rune
+		var line_count = 0
+		for {
+			record, err := r.Read()
+			if err != nil {
+				if err == io.EOF {
+					break
+				}
+				panic(err)
+			}
+
+			if strings.HasPrefix(record[0], "#") {
+				log.Println("Skipping csv header ", strings.Join(record[:], ","))
+			} else {
+				ch_records <- record
+				line_count++
+				if line_count%1024 == 0 {
+					log.Printf("line: %d from file %s was submitted", line_count, f)
+				}
+			}
+		}
+		fp.Close()
+	}
+
+	close(ch_records)
+	wg.Wait()
+}
+
+func (self *Csv2TSDB) GenerateRequests(global config.Global, wl config.Workload, tls_mode bool, host string, ret_ch chan *Response, worker_qd int) chan *Request {
+	self.workload = wl
+	if self.workload.Header == nil {
+		self.workload.Header = make(map[string]string)
+	}
+	self.workload.Header["X-v3io-function"] = "PutItem"
+
+	self.SetBaseUri(tls_mode, host, self.workload.Container, self.workload.Target)
+
+	ch_req := make(chan *Request, worker_qd)
+
+	go self.generate(ch_req, self.workload.Payload, host)
+
+	return ch_req
+}
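Illustration (not part of the diff): a rough sketch of driving the new generator directly, outside the executor. The Workload values mirror the example TOML; the host string and queue depth are placeholders, and in the real tool the executor consumes the returned request channel.

    package main

    import (
        "github.com/v3io/http_blaster/httpblaster/config"
        "github.com/v3io/http_blaster/httpblaster/request_generators"
    )

    func main() {
        wl := config.Workload{
            Generator: request_generators.CSV2TSDB,
            Container: "1",
            Target:    "t1/",
            Payload:   "examples/payloads/order-book-sample.csv",
            Schema:    "examples/schemas/tsdb_schema_example.json",
        }

        gen := &request_generators.Csv2TSDB{}
        respCh := make(chan *request_generators.Response)
        // tls_mode=false, placeholder host "server:port", arbitrary queue depth.
        reqCh := gen.GenerateRequests(config.Global{}, wl, false, "192.168.206.22:44001", respCh, 128)

        for req := range reqCh {
            _ = req // normally handed to the executor's workers
        }
    }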
diff --git a/httpblaster/schema_parser_test.go b/httpblaster/schema_parser_test.go
index 83e2969..5594b92 100644
--- a/httpblaster/schema_parser_test.go
+++ b/httpblaster/schema_parser_test.go
@@ -7,11 +7,23 @@ import (
 	"log"
 	"os"
 	"testing"
+	//"go/parser"
+	"strings"
+
+	"encoding/json"
+	"fmt"
+
+	"github.com/v3io/v3io-tsdb/pkg/utils"
 )
 
 func Test_Schema_Parser(t *testing.T) {
+	//pwd, _ := os.Getwd()
 	p := igz_data.EmdSchemaParser{}
-	e := p.LoadSchema("../example/schema_example.json")
+	e := p.LoadSchema("../examples/schemas/schema_example.json", "", "")
 	if e != nil {
 		t.Error(e)
 	}
@@ -34,8 +46,74 @@ func Test_Schema_Parser(t *testing.T) {
 		}
 		log.Println(record)
 
-		j := p.EmdFromCSVRecord()
+		j := p.EmdFromCSVRecord(record)
 		log.Println(j)
 	}
 
 }
+
+func Test_tsdb_Schema_Parser(t *testing.T) {
+	//pwd, _ := os.Getwd()
+	p := igz_data.EmdSchemaParser{}
+	e := p.LoadSchema("../examples/schemas/tsdb_schema_example.json", "", "")
+	if e != nil {
+		t.Error(e)
+	}
+
+	f, err := os.Open("../examples/payloads/order-book-sample.csv")
+	if err != nil {
+		panic(err)
+	}
+	defer f.Close()
+	r := csv.NewReader(f)
+	r.Comma = p.JsonSchema.Settings.Separator.Rune
+	var line_count = 0
+	for {
+		record, err := r.Read()
+		if err != nil {
+			if err == io.EOF {
+				break
+			}
+			panic(err)
+		}
+
+		if strings.HasPrefix(record[0], "#") {
+			log.Println("Skipping csv header ", strings.Join(record[:], ","))
+		} else {
+			j := p.TSDBFromCSVRecord(record)
+			log.Println(j)
+			line_count++
+			if line_count%1024 == 0 {
+				log.Printf("line: %d from file %s was submitted", line_count, f.Name())
+			}
+		}
+	}
+}
+
+func Test_tsdb_to_json(t *testing.T) {
+	item := igz_data.IgzTSDBItem{}
+	item.Lset = utils.Labels{{Name: "__name__", Value: "name"}}
+	item.Time = "1529659800000"
+	item.Value = 1
+
+	item2 := igz_data.IgzTSDBItem{}
+	item2.Lset = utils.Labels{{Name: "__name__", Value: "name2"}}
+	item2.Time = "1529659900000"
+	item2.Value = 2
+
+	items := []igz_data.IgzTSDBItem{item, item2}
+	body, _ := json.Marshal(items)
+	fmt.Println(string(body))
+}
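Illustration (not part of the diff): a possible follow-up test, sketched under the same example schema and assuming the imports already present in schema_parser_test.go. The record values are invented; the check only asserts that the JSON emitted by TSDBFromCSVRecord round-trips back into an IgzTSDBItem.

    func Test_tsdb_round_trip(t *testing.T) {
        p := igz_data.EmdSchemaParser{}
        if e := p.LoadSchema("../examples/schemas/tsdb_schema_example.json", "", ""); e != nil {
            t.Fatal(e)
        }

        // Invented record laid out per the example schema's COLUMNS indexes.
        record := make([]string, 35)
        record[2] = "1529659800000" // Time_Sequence
        record[4] = "NEW"           // Status_Flag
        record[10] = "42"           // Total_Orders
        record[18] = "Y"            // Active_Status
        record[34] = "BRK-7"        // Broker_ID

        var item igz_data.IgzTSDBItem
        if err := json.Unmarshal([]byte(p.TSDBFromCSVRecord(record)), &item); err != nil {
            t.Fatal(err)
        }
        if item.Value != 42 || item.Time != "1529659800000" {
            t.Fatalf("unexpected item: %+v", item)
        }
    }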