Skip to content

Commit

Permalink
Merge pull request #49 from aviaIguazio/tsdb
Browse files Browse the repository at this point in the history
Tsdb
  • Loading branch information
sasile authored Jul 19, 2018
2 parents 5908dd6 + 936af72 commit 076167a
Show file tree
Hide file tree
Showing 10 changed files with 545 additions and 10 deletions.
29 changes: 29 additions & 0 deletions examples/csv_2_tsdb_example.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
title = "Workload example"

[global]
Duration = "160s"
server="192.168.206.22"
port="44001"
TLSMode=false
[global.StatusCodesAcceptance]
200 = 100.0 # upto 100% return status of 200
204 = 100.0
205 = 100.0

[workloads]
[workloads.load_to_t1]
name="load_to_t1"
Generator="csv2tsdb"
container="1"
Target="t1/"
workers=1
payload="examples/payloads/order-book-sample.csv"
schema="examples/schemas/tsdb_schema_example.json"








56 changes: 56 additions & 0 deletions examples/schemas/tsdb_schema_example.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
{
"SETTINGS": {
"Format": "csv",
"Separator": "|",
"TSDBTime": "Time_Sequence",
"TSDBName" : "Broker_ID",
"TSDBValue" : "Total_Orders",
"TSDBLables" : "Status_Flag,Active_Status"
},
"COLUMNS": [
{
"Index": 2,
"Type": "S",
"Name": "Time_Sequence",
"Nullable": true
},
{
"Index": 3,
"Type": "N",
"Name": "Exchange_Transaction_ID",
"Nullable": true
},
{
"Index": 4,
"Type": "S",
"Name": "Status_Flag",
"Nullable": true
},
{
"Index": 10,
"Type": "N",
"Name": "Total_Orders",
"Nullable": true
},
{
"Index": 18,
"Type": "S",
"Name": "Active_Status",
"Nullable": true
},
{
"Index": 20,
"Type": "N",
"Name": "Total_New_Orders",
"Nullable": true
},
{
"Index": 34,
"Type": "S",
"Name": "Broker_ID",
"Nullable": true
}


]
}
1 change: 0 additions & 1 deletion http_blaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import (
"sync"
"sync/atomic"
"time"
//"github.com/v3io/http_blaster/httpblaster/histogram"
"sort"
)

Expand Down
5 changes: 5 additions & 0 deletions httpblaster/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ func (self *Executor) load_request_generator() (chan *request_generators.Request
release_req = false
}
break


case request_generators.LINE2STREAM:
req_gen = &request_generators.Line2StreamGenerator{}
break
Expand Down Expand Up @@ -115,6 +117,9 @@ func (self *Executor) load_request_generator() (chan *request_generators.Request
case request_generators.STREAM_GET:
req_gen = &request_generators.StreamGetGenerator{}
ch_response = make(chan *request_generators.Response)
case request_generators.CSV2TSDB:
req_gen = &request_generators.Csv2TSDB{}
break
default:
panic(fmt.Sprintf("unknown request generator %s", self.Workload.Generator))
}
Expand Down
8 changes: 8 additions & 0 deletions httpblaster/http_blaster_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,11 @@
package httpblaster
/*
import (
"testing"
"io/ioutil"
"path"
)
import (
"bytes"
Expand Down Expand Up @@ -204,3 +211,4 @@ func Test_PUT_Multi_Worker(t *testing.T) {
}
t.Logf("total number of requests %d", uint64(count)*wl.req_count)
}
*/
93 changes: 86 additions & 7 deletions httpblaster/igz_data/emd_schema_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@ import (
"github.com/nu7hatch/gouuid"
"github.com/v3io/http_blaster/httpblaster/config"
"io/ioutil"
"regexp"
//"strconv"
"regexp"
"strings"
)

Expand All @@ -24,6 +23,11 @@ type SchemaSettings struct {
KeyFields string
KeyFormat string
UpdateFields string
TSDBName string
TSDBTime string
TSDBValue string
TSDBLables string

}

type SchemaValue struct {
Expand All @@ -47,11 +51,21 @@ type EmdSchemaParser struct {
update_fields_indexs []int
updateMode string
updateExpression string
tsdb_name string
tsdb_name_index int
tsdb_time string
tsdb_time_index int
tsdb_value string
tsdb_value_index int
tsdb_attributes string
tsdb_attributes_map map[string]int

}

func (self *EmdSchemaParser) LoadSchema(file_path, update_mode, update_expression string) error {

self.values_map = make(map[int]SchemaValue)
self.tsdb_attributes_map = make(map[string]int)
plan, _ := ioutil.ReadFile(file_path)
err := json.Unmarshal(plan, &self.JsonSchema)
if err != nil {
Expand All @@ -64,11 +78,19 @@ func (self *EmdSchemaParser) LoadSchema(file_path, update_mode, update_expressio
self.schema_key_fields = settings.KeyFields
self.updateMode = update_mode
self.updateExpression = update_expression
self.tsdb_time =settings.TSDBTime
self.tsdb_name =settings.TSDBName
self.tsdb_value =settings.TSDBValue
self.tsdb_attributes = settings.TSDBLables


for _, v := range columns {
self.values_map[v.Index] = v
}
self.GetKeyIndexes()
self.MapTSDBLablesIndexes()
self.GetTSDBNameIndex()
self.GetTSDBValueIndex()
if len(self.updateExpression) > 0 {
self.GetUpdateExpressionIndexes()
}
Expand Down Expand Up @@ -101,6 +123,32 @@ func (self *EmdSchemaParser) GetKeyIndexes() {
}
}

func (self *EmdSchemaParser) GetTSDBNameIndex() {
for _, v := range self.values_map {
if v.Name == self.tsdb_name {
self.tsdb_name_index = v.Index
}
}
}

func (self *EmdSchemaParser) GetTSDBValueIndex() {
for _, v := range self.values_map {
if v.Name == self.tsdb_value {
self.tsdb_value_index = v.Index
}
}
}

func (self *EmdSchemaParser) MapTSDBLablesIndexes() {
attributes := strings.Split(self.tsdb_attributes, ",")
for _, att := range attributes {
for _, v := range self.values_map {
if v.Name == att {
self.tsdb_attributes_map[att] = v.Index }
}
}
}

func (self *EmdSchemaParser) GetFieldsIndexes(fields, delimiter string) []int {
keys := strings.Split(fields, delimiter)
indexArray := make([]int, 1)
Expand All @@ -123,7 +171,8 @@ func (self *EmdSchemaParser) KeyFromCSVRecord(vals []string) string {
}
//when 1 key, return the key
if len(self.schema_key_indexs) == 1 {
return vals[0]
//fix bug of returning always key in position 0
return vals[self.schema_key_indexs[0]]
}
//when more the one key, generate formatted key
var keys []interface{}
Expand All @@ -134,19 +183,49 @@ func (self *EmdSchemaParser) KeyFromCSVRecord(vals []string) string {
return key
}

func (self *EmdSchemaParser) nameIndexFromCSVRecord(vals []string) string {
//when no keys, generate random
if len(self.schema_key_indexs) == 0 {
u, _ := uuid.NewV4()
return u.String()
}
//when 1 key, return the key
if len(self.schema_key_indexs) == 1 {
//fix bug of returning always key in position 0
return vals[self.schema_key_indexs[0]]
}
//when more the one key, generate formatted key
var keys []interface{}
for _, i := range self.schema_key_indexs {
keys = append(keys, vals[i])
}
key := fmt.Sprintf(self.schema_key_format, keys...)
return key
}



func (self *EmdSchemaParser) EmdFromCSVRecord(vals []string) string {
emd_item := NewEmdItem()
emd_item.InsertKey("key", T_STRING, self.KeyFromCSVRecord(vals))
for i, v := range vals {
err, igz_type, value := ConvertValue(self.values_map[i].Type, v)
if err != nil {
panic(fmt.Sprintf("conversion error %d %v %v", i, v, self.values_map[i]))
if val , ok :=self.values_map[i] ; ok {
err, igz_type, value := ConvertValue(val.Type, v)
if err != nil {
panic(fmt.Sprintf("conversion error %d %v %v", i, v, self.values_map[i]))
}
emd_item.InsertItemAttr(self.values_map[i].Name, igz_type, value)
}
emd_item.InsertItemAttr(self.values_map[i].Name, igz_type, value)
}
return string(emd_item.ToJsonString())
}

func (self *EmdSchemaParser) TSDBFromCSVRecord(vals []string) string {
tsdb_item := NewTSDBItem()
tsdb_item.GenerateStruct(vals,self)
return string(tsdb_item.ToJsonString())
}

func (self *EmdSchemaParser) EmdUpdateFromCSVRecord(vals []string) string {
emd_update := NewEmdItemUpdate()
//emd_update.InsertKey("key", T_STRING, self.KeyFromCSVRecord(vals))
Expand Down
Loading

0 comments on commit 076167a

Please sign in to comment.