diff --git a/fill/fill.go b/fill/fill.go index 92008dc..e6d7c8e 100644 --- a/fill/fill.go +++ b/fill/fill.go @@ -54,7 +54,7 @@ func fillArchive(srcWsp, dstWsp *whisper.Whisper, start, stop int) error { } tsStart += ts.Step() } - dstWsp.UpdateMany(points) + dstWsp.UpdateManyWithRetention(points, v.MaxRetention()) stop = fromTime if start >= stop { diff --git a/fill/fill_test.go b/fill/fill_test.go index 40dc6a2..f811a9a 100644 --- a/fill/fill_test.go +++ b/fill/fill_test.go @@ -113,7 +113,9 @@ func validateWhisper(path string, ts []*whisper.TimeSeriesPoint) error { } defer wsp.Close() - wspData, err := wsp.Fetch(0, int(time.Now().Unix())) + now := int(time.Now().Unix()) + fromTime := now - wsp.Retentions()[0].MaxRetention() + wspData, err := wsp.Fetch(fromTime, now) if err != nil { return err } @@ -151,8 +153,11 @@ func fetchFromFile(path string) ([]*whisper.TimeSeriesPoint, error) { } defer wsp.Close() + now := int(time.Now().Unix()) + fromTime := now - wsp.Retentions()[0].MaxRetention() + // Parse and fetch data from it - ts, err := wsp.Fetch(0, int(time.Now().Unix())) + ts, err := wsp.Fetch(fromTime, now) if err != nil { return tsp, err } @@ -164,7 +169,7 @@ func simulateFill(a, b []*whisper.TimeSeriesPoint) []*whisper.TimeSeriesPoint { // Assume that we are simulating the fill operation on WSP DBs created // with the above functions. // This is a shallow copy operation. - dataMerged := make([]*whisper.TimeSeriesPoint, 30) + dataMerged := make([]*whisper.TimeSeriesPoint, len(b)) // copy everything in b over to our return value copy(dataMerged, b) gapstart := -1 @@ -240,6 +245,155 @@ func TestFill(t *testing.T) { fmt.Println() } +func whisperCreateDataMany(path string, ts []*whisper.TimeSeriesPoint) error { + os.Remove(path) // Don't care if it fails + retentions, err := whisper.ParseRetentionDefs("1m:30m,5m:60m,15m:150m") + if err != nil { + return err + } + wsp, err := whisper.Create(path, retentions, whisper.Sum, 0) + if err != nil { + return err + } + defer wsp.Close() + + // Iterate through the slice so we can support null values + wsp.UpdateMany(ts) + + return nil +} + +func whisperCreateNullsManyArchives(path string) ([]*whisper.TimeSeriesPoint, error) { + values := []float64{ + math.NaN(), + math.NaN(), + math.NaN(), + 0.0, + 7.0, + 1.0, + 7.0, + 2.0, + 9.0, + 9.0, + 9.0, + 4.0, + 3.0, + 2.0, + 7.0, + 5.0, + 1.0, + 9.0, + 4.0, + 4.0, + 1.0, + 5.0, + 5.0, + 8.0, + 4.0, + 2.0, + 6.0, + 0.0, + math.NaN(), + math.NaN(), + } + + now := int(time.Now().Unix()) + ts := make([]*whisper.TimeSeriesPoint, 0, len(values)) + for _, v := range values { + ts = append(ts, &whisper.TimeSeriesPoint{Value: v, Time: now}) + now -= 60 + } + + os.Remove(path) // Don't care if it fails + retentions, err := whisper.ParseRetentionDefs("1m:30m,5m:60m,15m:150m") + if err != nil { + return ts, err + } + wsp, err := whisper.Create(path, retentions, whisper.Sum, 0) + if err != nil { + return ts, err + } + defer wsp.Close() + + for _, point := range ts { + _ = wsp.Update(point.Value, point.Time) + } + + return ts, nil +} + +func TestTwoArchives(t *testing.T) { + dataC, err := whisperCreateNullsManyArchives("c1.wsp") + if err != nil { + t.Fatal(err) + } + + // Create an identical set of test data + err = whisperCreateDataMany("c2.wsp", dataC) + if err != nil { + t.Fatal(err) + } + + err = whisperCreateDataMany("d1.wsp", dataC) + if err != nil { + t.Fatal(err) + } + + err = whisperCreateDataMany("d2.wsp", dataC) + if err != nil { + t.Fatal(err) + } + + // whisper-fill.py needs to be in the PATH somewhere + log.Println("Running whisper-fill.py...") + c := exec.Command("whisper-fill.py", "c1.wsp", "d1.wsp") + + reference_err := c.Run() + pythonFill, err := fetchFromFile("d1.wsp") + + // Run my version + err = Files("c2.wsp", "d2.wsp", int(time.Now().Unix())) + if err != nil { + t.Error(err) + } + goFill, err := fetchFromFile("d2.wsp") + if err != nil { + t.Error(err) + } + + // Compare to what we think our version should be + simuFill := simulateFill(dataC, dataC) + + err = validateWhisper("d2.wsp", simuFill) + if err != nil { + t.Error(err) + } + + // Validate the reference if whisper-fill.py was found + if reference_err == nil { + err = validateWhisper("d1.wsp", simuFill) + if err != nil { + t.Error(err) + } + } + + if len(goFill) != len(pythonFill) { + t.Fatalf("length mismatch, python=%v, go=%v, expected=%v", len(goFill), len(pythonFill)) + } + + // Now try to print out a table of C, D, Python, Go, Simu + fmt.Printf("C \tD \tPython\tGo \tSimu\n") + fmt.Printf("======\t======\t======\t======\t======\n") + for i := 0; i < len(goFill); i++ { + if reference_err != nil { + fmt.Printf("%6.1f\t%6.1f\t%6.1f\t%6.1f\t%6.1f\n", dataC[i].Value, dataC[i].Value, math.NaN(), goFill[i].Value, simuFill[i].Value) + } else { + fmt.Printf("%6.1f\t%6.1f\t%6.1f\t%6.1f\t%6.1f\n", dataC[i].Value, dataC[i].Value, pythonFill[i].Value, goFill[i].Value, simuFill[i].Value) + } + } + fmt.Println() +} + func TestReference(t *testing.T) { // Create our random test data dataA, err := whisperCreate("a1.wsp") diff --git a/whisper/whisper.go b/whisper/whisper.go index 42b5568..e1c42f5 100644 --- a/whisper/whisper.go +++ b/whisper/whisper.go @@ -369,7 +369,9 @@ func (whisper *Whisper) Update(value float64, timestamp int) (err error) { return nil } -func (whisper *Whisper) UpdateMany(points []*TimeSeriesPoint) { +// UpdateManyWithRetention updates only archive with specified retention. +// retention = -1 means "update all possible archives" +func (whisper *Whisper) UpdateManyWithRetention(points []*TimeSeriesPoint, retention int) { // sort the points, newest first sort.Sort(timeSeriesPointsNewestFirst{points}) @@ -377,6 +379,9 @@ func (whisper *Whisper) UpdateMany(points []*TimeSeriesPoint) { var currentPoints []*TimeSeriesPoint for _, archive := range whisper.archives { + if retention != -1 && retention != archive.MaxRetention() { + continue + } currentPoints, points = extractPoints(points, now, archive.MaxRetention()) if len(currentPoints) == 0 { continue @@ -393,6 +398,10 @@ func (whisper *Whisper) UpdateMany(points []*TimeSeriesPoint) { } } +func (whisper *Whisper) UpdateMany(points []*TimeSeriesPoint) { + whisper.UpdateManyWithRetention(points, -1) +} + func (whisper *Whisper) archiveUpdateMany(archive *archiveInfo, points []*TimeSeriesPoint) { alignedPoints := alignPoints(archive, points) intervals, packedBlocks := packSequences(archive, alignedPoints)