Skip to content

Commit

Permalink
fix behavior of setsum resulting in bad data (#550)
Browse files Browse the repository at this point in the history
* fix behavior of setsum resulting in bad data
* inform changelog
  • Loading branch information
sduchesneau authored Oct 21, 2024
1 parent 7199376 commit 05053e6
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 16 deletions.
3 changes: 3 additions & 0 deletions docs/release-notes/change-log.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

### Server

> **Note** All caches for stores using the updatePolicy `set_sum` (added in substreams v1.7.0) and modules that depend on them will need to be deleted, since they may contain bad data.
* Fix bad data in stores using `set_sum` policy: squashing of store segments incorrectly "summed" some values that should have been "set" if the last event for a key on this segment was a "sum"
* Fix panic in initialization (`metrics sender not set`)

## v1.10.7
Expand Down
28 changes: 20 additions & 8 deletions storage/store/merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,69 +132,81 @@ func TestStore_Merge(t *testing.T) {
{
name: "set_sum_int",
latest: newPartialStore(map[string][]byte{
"one": []byte("set:1"),
"two": []byte("sum:2"),
"one": []byte("set:1"),
"two": []byte("sum:2"),
"four": []byte("sum:2"),
}, pbsubstreams.Module_KindStore_UPDATE_POLICY_SET_SUM, manifest.OutputValueTypeInt64, nil),
prev: newStore(map[string][]byte{
"one": []byte("sum:1"),
"three": []byte("sum:3"),
"four": []byte("sum:2"),
}, pbsubstreams.Module_KindStore_UPDATE_POLICY_SET_SUM, manifest.OutputValueTypeInt64),
expectedError: false,
expectedKV: map[string][]byte{
"one": []byte("sum:1"),
"two": []byte("sum:2"),
"three": []byte("sum:3"),
"four": []byte("sum:4"),
},
},
{
name: "set_sum_float",
latest: newPartialStore(map[string][]byte{
"one": []byte("set:1.1"),
"two": []byte("sum:2.2"),
"one": []byte("set:1.1"),
"two": []byte("sum:2.2"),
"four": []byte("sum:2.2"),
}, pbsubstreams.Module_KindStore_UPDATE_POLICY_SET_SUM, manifest.OutputValueTypeFloat64, nil),
prev: newStore(map[string][]byte{
"one": []byte("sum:1.1"),
"three": []byte("sum:3.3"),
"four": []byte("sum:2.2"),
}, pbsubstreams.Module_KindStore_UPDATE_POLICY_SET_SUM, manifest.OutputValueTypeFloat64),
expectedError: false,
expectedKV: map[string][]byte{
"one": []byte("sum:1.1"),
"two": []byte("sum:2.2"),
"three": []byte("sum:3.3"),
"four": []byte("sum:4.4"),
},
},
{
name: "set_sum_big_int",
latest: newPartialStore(map[string][]byte{
"one": []byte("set:1"),
"two": []byte("sum:2"),
"one": []byte("set:1"),
"two": []byte("sum:2"),
"four": []byte("sum:2"),
}, pbsubstreams.Module_KindStore_UPDATE_POLICY_SET_SUM, manifest.OutputValueTypeBigInt, nil),
prev: newStore(map[string][]byte{
"one": []byte("sum:1"),
"three": []byte("sum:3"),
"four": []byte("sum:2"),
}, pbsubstreams.Module_KindStore_UPDATE_POLICY_SET_SUM, manifest.OutputValueTypeBigInt),
expectedError: false,
expectedKV: map[string][]byte{
"one": []byte("sum:1"),
"two": []byte("sum:2"),
"three": []byte("sum:3"),
"four": []byte("sum:4"),
},
},
{
name: "set_sum_big_decimal",
latest: newPartialStore(map[string][]byte{
"one": []byte("set:1.1"),
"two": []byte("sum:2.2"),
"one": []byte("set:1.1"),
"two": []byte("sum:2.2"),
"four": []byte("sum:2.2"),
}, pbsubstreams.Module_KindStore_UPDATE_POLICY_SET_SUM, manifest.OutputValueTypeBigDecimal, nil),
prev: newStore(map[string][]byte{
"one": []byte("sum:1.1"),
"three": []byte("sum:3.3"),
"four": []byte("sum:2.2"),
}, pbsubstreams.Module_KindStore_UPDATE_POLICY_SET_SUM, manifest.OutputValueTypeBigDecimal),
expectedError: false,
expectedKV: map[string][]byte{
"one": []byte("sum:1.1"),
"two": []byte("sum:2.2"),
"three": []byte("sum:3.3"),
"four": []byte("sum:4.4"),
},
},
{
Expand Down
12 changes: 8 additions & 4 deletions storage/store/store_setsum.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,10 @@ func (b *baseStore) setSumInt64(ord uint64, key string, value []byte) {
} else {
switch string(value[:4]) {
case "sum:":
prevPrefix := string(val[:4]) // if we had a 'set:' before, we keep it.
prev, _ := strconv.ParseInt(string(val[4:]), 10, 64)
next, _ := strconv.ParseInt(string(value[4:]), 10, 64)
data = []byte("sum:" + strconv.FormatInt(prev+next, 10))
data = []byte(prevPrefix + strconv.FormatInt(prev+next, 10))
case "set:":
data = value
default:
Expand All @@ -55,9 +56,10 @@ func (b *baseStore) setSumFloat64(ord uint64, key string, value []byte) {
} else {
switch string(value[:4]) {
case "sum:":
prevPrefix := string(val[:4]) // if we had a 'set:' before, we keep it.
prev, _ := strconv.ParseFloat(string(val[4:]), 64)
next, _ := strconv.ParseFloat(string(value[4:]), 64)
data = []byte("sum:" + strconv.FormatFloat(prev+next, 'g', 100, 64))
data = []byte(prevPrefix + strconv.FormatFloat(prev+next, 'g', 100, 64))
case "set:":
data = value
default:
Expand All @@ -84,9 +86,10 @@ func (b *baseStore) setSumBigInt(ord uint64, key string, value []byte) {
} else {
switch string(value[:4]) {
case "sum:":
prevPrefix := string(val[:4]) // if we had a 'set:' before, we keep it.
prev := valueToBigInt(val[4:])
next := valueToBigInt(value[4:])
data = []byte("sum:" + big.NewInt(0).Add(prev, next).String())
data = []byte(prevPrefix + big.NewInt(0).Add(prev, next).String())
case "set:":
data = value
default:
Expand All @@ -113,9 +116,10 @@ func (b *baseStore) setSumBigDecimal(ord uint64, key string, value []byte) {
} else {
switch string(value[:4]) {
case "sum:":
prevPrefix := string(val[:4]) // if we had a 'set:' before, we keep it.
prev := mustDecimalFromBytes(val[4:])
next := mustDecimalFromBytes(value[4:])
data = []byte("sum:" + prev.Add(next).String())
data = []byte(prevPrefix + prev.Add(next).String())
case "set:":
data = value
default:
Expand Down
8 changes: 4 additions & 4 deletions storage/store/store_setsum_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func TestStoreSetSumInt64(t *testing.T) {
existingValue: []byte("set:7"),
value: []byte("sum:3"),
expectedGetValue: []byte("10"),
expectedActualValue: []byte("sum:10"),
expectedActualValue: []byte("set:10"), // always keep a 'set:' prefix
},
}

Expand Down Expand Up @@ -150,7 +150,7 @@ func TestStoreSetSumFloat64(t *testing.T) {
existingValue: []byte("set:7.0"),
value: []byte("sum:3.0"),
expectedGetValue: []byte("10"),
expectedActualValue: []byte("sum:10"),
expectedActualValue: []byte("set:10"),
},
}

Expand Down Expand Up @@ -236,7 +236,7 @@ func TestStoreSetSumBigInt(t *testing.T) {
existingValue: []byte("set:7"),
value: []byte("sum:3"),
expectedGetValue: []byte("10"),
expectedActualValue: []byte("sum:10"),
expectedActualValue: []byte("set:10"),
},
}

Expand Down Expand Up @@ -322,7 +322,7 @@ func TestStoreSetSumBigDecimal(t *testing.T) {
existingValue: []byte("set:7.0"),
value: []byte("sum:3.0"),
expectedGetValue: []byte("10"),
expectedActualValue: []byte("sum:10"),
expectedActualValue: []byte("set:10"), // always keep a 'set:' prefix
},
}

Expand Down

0 comments on commit 05053e6

Please sign in to comment.