From 7cf79e983f84b16d6dfbdd5257fb2cd07e6c4803 Mon Sep 17 00:00:00 2001 From: Vladimir Smirnov Date: Mon, 14 Aug 2017 17:08:58 +0200 Subject: [PATCH 1/2] Add a test for #19 This is only adding a test for #19 --- fill/fill_test.go | 160 +++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 157 insertions(+), 3 deletions(-) diff --git a/fill/fill_test.go b/fill/fill_test.go index 40dc6a2..f811a9a 100644 --- a/fill/fill_test.go +++ b/fill/fill_test.go @@ -113,7 +113,9 @@ func validateWhisper(path string, ts []*whisper.TimeSeriesPoint) error { } defer wsp.Close() - wspData, err := wsp.Fetch(0, int(time.Now().Unix())) + now := int(time.Now().Unix()) + fromTime := now - wsp.Retentions()[0].MaxRetention() + wspData, err := wsp.Fetch(fromTime, now) if err != nil { return err } @@ -151,8 +153,11 @@ func fetchFromFile(path string) ([]*whisper.TimeSeriesPoint, error) { } defer wsp.Close() + now := int(time.Now().Unix()) + fromTime := now - wsp.Retentions()[0].MaxRetention() + // Parse and fetch data from it - ts, err := wsp.Fetch(0, int(time.Now().Unix())) + ts, err := wsp.Fetch(fromTime, now) if err != nil { return tsp, err } @@ -164,7 +169,7 @@ func simulateFill(a, b []*whisper.TimeSeriesPoint) []*whisper.TimeSeriesPoint { // Assume that we are simulating the fill operation on WSP DBs created // with the above functions. // This is a shallow copy operation. - dataMerged := make([]*whisper.TimeSeriesPoint, 30) + dataMerged := make([]*whisper.TimeSeriesPoint, len(b)) // copy everything in b over to our return value copy(dataMerged, b) gapstart := -1 @@ -240,6 +245,155 @@ func TestFill(t *testing.T) { fmt.Println() } +func whisperCreateDataMany(path string, ts []*whisper.TimeSeriesPoint) error { + os.Remove(path) // Don't care if it fails + retentions, err := whisper.ParseRetentionDefs("1m:30m,5m:60m,15m:150m") + if err != nil { + return err + } + wsp, err := whisper.Create(path, retentions, whisper.Sum, 0) + if err != nil { + return err + } + defer wsp.Close() + + // Iterate through the slice so we can support null values + wsp.UpdateMany(ts) + + return nil +} + +func whisperCreateNullsManyArchives(path string) ([]*whisper.TimeSeriesPoint, error) { + values := []float64{ + math.NaN(), + math.NaN(), + math.NaN(), + 0.0, + 7.0, + 1.0, + 7.0, + 2.0, + 9.0, + 9.0, + 9.0, + 4.0, + 3.0, + 2.0, + 7.0, + 5.0, + 1.0, + 9.0, + 4.0, + 4.0, + 1.0, + 5.0, + 5.0, + 8.0, + 4.0, + 2.0, + 6.0, + 0.0, + math.NaN(), + math.NaN(), + } + + now := int(time.Now().Unix()) + ts := make([]*whisper.TimeSeriesPoint, 0, len(values)) + for _, v := range values { + ts = append(ts, &whisper.TimeSeriesPoint{Value: v, Time: now}) + now -= 60 + } + + os.Remove(path) // Don't care if it fails + retentions, err := whisper.ParseRetentionDefs("1m:30m,5m:60m,15m:150m") + if err != nil { + return ts, err + } + wsp, err := whisper.Create(path, retentions, whisper.Sum, 0) + if err != nil { + return ts, err + } + defer wsp.Close() + + for _, point := range ts { + _ = wsp.Update(point.Value, point.Time) + } + + return ts, nil +} + +func TestTwoArchives(t *testing.T) { + dataC, err := whisperCreateNullsManyArchives("c1.wsp") + if err != nil { + t.Fatal(err) + } + + // Create an identical set of test data + err = whisperCreateDataMany("c2.wsp", dataC) + if err != nil { + t.Fatal(err) + } + + err = whisperCreateDataMany("d1.wsp", dataC) + if err != nil { + t.Fatal(err) + } + + err = whisperCreateDataMany("d2.wsp", dataC) + if err != nil { + t.Fatal(err) + } + + // whisper-fill.py needs to be in the PATH somewhere + log.Println("Running whisper-fill.py...") + c := exec.Command("whisper-fill.py", "c1.wsp", "d1.wsp") + + reference_err := c.Run() + pythonFill, err := fetchFromFile("d1.wsp") + + // Run my version + err = Files("c2.wsp", "d2.wsp", int(time.Now().Unix())) + if err != nil { + t.Error(err) + } + goFill, err := fetchFromFile("d2.wsp") + if err != nil { + t.Error(err) + } + + // Compare to what we think our version should be + simuFill := simulateFill(dataC, dataC) + + err = validateWhisper("d2.wsp", simuFill) + if err != nil { + t.Error(err) + } + + // Validate the reference if whisper-fill.py was found + if reference_err == nil { + err = validateWhisper("d1.wsp", simuFill) + if err != nil { + t.Error(err) + } + } + + if len(goFill) != len(pythonFill) { + t.Fatalf("length mismatch, python=%v, go=%v, expected=%v", len(goFill), len(pythonFill)) + } + + // Now try to print out a table of C, D, Python, Go, Simu + fmt.Printf("C \tD \tPython\tGo \tSimu\n") + fmt.Printf("======\t======\t======\t======\t======\n") + for i := 0; i < len(goFill); i++ { + if reference_err != nil { + fmt.Printf("%6.1f\t%6.1f\t%6.1f\t%6.1f\t%6.1f\n", dataC[i].Value, dataC[i].Value, math.NaN(), goFill[i].Value, simuFill[i].Value) + } else { + fmt.Printf("%6.1f\t%6.1f\t%6.1f\t%6.1f\t%6.1f\n", dataC[i].Value, dataC[i].Value, pythonFill[i].Value, goFill[i].Value, simuFill[i].Value) + } + } + fmt.Println() +} + func TestReference(t *testing.T) { // Create our random test data dataA, err := whisperCreate("a1.wsp") From 0e073c8a96447048743316bf732686e078109c09 Mon Sep 17 00:00:00 2001 From: Vladimir Smirnov Date: Tue, 15 Aug 2017 10:18:57 +0200 Subject: [PATCH 2/2] Add 'UpdateManyWithRetention' whisper function and migrate fill to use it Fill must read and write data to exactly same archive. For this purpose this commit intoroduce new function 'UpdateManyWithRetention' that also gets 'desired' archive's retention. Also migrate 'fillArchive' in fill package to use that function, instead of plain 'UpdateMany' Fixes #19 --- fill/fill.go | 2 +- whisper/whisper.go | 11 ++++++++++- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/fill/fill.go b/fill/fill.go index 92008dc..e6d7c8e 100644 --- a/fill/fill.go +++ b/fill/fill.go @@ -54,7 +54,7 @@ func fillArchive(srcWsp, dstWsp *whisper.Whisper, start, stop int) error { } tsStart += ts.Step() } - dstWsp.UpdateMany(points) + dstWsp.UpdateManyWithRetention(points, v.MaxRetention()) stop = fromTime if start >= stop { diff --git a/whisper/whisper.go b/whisper/whisper.go index 42b5568..e1c42f5 100644 --- a/whisper/whisper.go +++ b/whisper/whisper.go @@ -369,7 +369,9 @@ func (whisper *Whisper) Update(value float64, timestamp int) (err error) { return nil } -func (whisper *Whisper) UpdateMany(points []*TimeSeriesPoint) { +// UpdateManyWithRetention updates only archive with specified retention. +// retention = -1 means "update all possible archives" +func (whisper *Whisper) UpdateManyWithRetention(points []*TimeSeriesPoint, retention int) { // sort the points, newest first sort.Sort(timeSeriesPointsNewestFirst{points}) @@ -377,6 +379,9 @@ func (whisper *Whisper) UpdateMany(points []*TimeSeriesPoint) { var currentPoints []*TimeSeriesPoint for _, archive := range whisper.archives { + if retention != -1 && retention != archive.MaxRetention() { + continue + } currentPoints, points = extractPoints(points, now, archive.MaxRetention()) if len(currentPoints) == 0 { continue @@ -393,6 +398,10 @@ func (whisper *Whisper) UpdateMany(points []*TimeSeriesPoint) { } } +func (whisper *Whisper) UpdateMany(points []*TimeSeriesPoint) { + whisper.UpdateManyWithRetention(points, -1) +} + func (whisper *Whisper) archiveUpdateMany(archive *archiveInfo, points []*TimeSeriesPoint) { alignedPoints := alignPoints(archive, points) intervals, packedBlocks := packSequences(archive, alignedPoints)