Skip to content

Commit

Permalink
fix store operation time metrics and other minor linter complaints
Browse files Browse the repository at this point in the history
  • Loading branch information
sduchesneau committed Apr 12, 2024
1 parent a18b046 commit edf62b4
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 26 deletions.
1 change: 1 addition & 0 deletions docs/release-notes/change-log.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

* fix a possible panic() when an request is interrupted during the file loading phase of a squashing operation.
* fix a rare possibility of stalling if only some fullkv stores caches were deleted, but further segments were still present.
* fix stats counters for store operations time

## v1.5.3

Expand Down
67 changes: 44 additions & 23 deletions wasm/call.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,148 +117,169 @@ func (c *Call) ReachedLogsMaxByteCount() bool {
}

func (c *Call) DoSet(ord uint64, key string, value []byte) {
defer c.stats.RecordModuleWasmStoreWrite(c.ModuleName, c.outputStore.SizeBytes(), time.Since(time.Now()))
now := time.Now()
c.validateSimple("set", pbsubstreams.Module_KindStore_UPDATE_POLICY_SET, key)
c.outputStore.SetBytes(ord, key, value)
c.stats.RecordModuleWasmStoreWrite(c.ModuleName, c.outputStore.SizeBytes(), time.Since(now))
}
func (c *Call) DoSetIfNotExists(ord uint64, key string, value []byte) {
defer c.stats.RecordModuleWasmStoreWrite(c.ModuleName, c.outputStore.SizeBytes(), time.Since(time.Now()))
now := time.Now()
c.validateSimple("set_if_not_exists", pbsubstreams.Module_KindStore_UPDATE_POLICY_SET_IF_NOT_EXISTS, key)
c.outputStore.SetBytesIfNotExists(ord, key, value)
c.stats.RecordModuleWasmStoreWrite(c.ModuleName, c.outputStore.SizeBytes(), time.Since(now))
}
func (c *Call) DoAppend(ord uint64, key string, value []byte) {
defer c.stats.RecordModuleWasmStoreWrite(c.ModuleName, c.outputStore.SizeBytes(), time.Since(time.Now()))
now := time.Now()
c.validateSimple("append", pbsubstreams.Module_KindStore_UPDATE_POLICY_APPEND, key)
if err := c.outputStore.Append(ord, key, value); err != nil {
c.ReturnError(fmt.Errorf("appending to store: %w", err))
}
c.stats.RecordModuleWasmStoreWrite(c.ModuleName, c.outputStore.SizeBytes(), time.Since(now))
}
func (c *Call) DoDeletePrefix(ord uint64, prefix string) {
defer c.stats.RecordModuleWasmStoreDeletePrefix(c.ModuleName, c.outputStore.SizeBytes(), time.Since(time.Now()))
now := time.Now()
c.traceStateWrites("delete_prefix", prefix)
c.outputStore.DeletePrefix(ord, prefix)
c.stats.RecordModuleWasmStoreDeletePrefix(c.ModuleName, c.outputStore.SizeBytes(), time.Since(now))
}
func (c *Call) DoAddBigInt(ord uint64, key string, value string) {
defer c.stats.RecordModuleWasmStoreWrite(c.ModuleName, c.outputStore.SizeBytes(), time.Since(time.Now()))
now := time.Now()
c.validateWithValueType("add_bigint", pbsubstreams.Module_KindStore_UPDATE_POLICY_ADD, "bigint", key)

toAdd, _ := new(big.Int).SetString(value, 10)
c.outputStore.SumBigInt(ord, key, toAdd)
c.stats.RecordModuleWasmStoreWrite(c.ModuleName, c.outputStore.SizeBytes(), time.Since(now))
}
func (c *Call) DoAddBigDecimal(ord uint64, key string, value string) {
defer c.stats.RecordModuleWasmStoreWrite(c.ModuleName, c.outputStore.SizeBytes(), time.Since(time.Now()))
now := time.Now()
c.validateWithTwoValueTypes("add_bigdecimal", pbsubstreams.Module_KindStore_UPDATE_POLICY_ADD, "bigdecimal", "bigfloat", key)

toAdd, err := decimal.NewFromString(string(value))
if err != nil {
c.ReturnError(fmt.Errorf("parsing bigdecimal: %w", err))
}
c.outputStore.SumBigDecimal(ord, key, toAdd.Truncate(34))
c.stats.RecordModuleWasmStoreWrite(c.ModuleName, c.outputStore.SizeBytes(), time.Since(now))
}
func (c *Call) DoAddInt64(ord uint64, key string, value int64) {
defer c.stats.RecordModuleWasmStoreWrite(c.ModuleName, c.outputStore.SizeBytes(), time.Since(time.Now()))
now := time.Now()
c.validateWithValueType("add_int64", pbsubstreams.Module_KindStore_UPDATE_POLICY_ADD, "int64", key)
c.outputStore.SumInt64(ord, key, value)
c.stats.RecordModuleWasmStoreWrite(c.ModuleName, c.outputStore.SizeBytes(), time.Since(now))
}
func (c *Call) DoAddFloat64(ord uint64, key string, value float64) {
defer c.stats.RecordModuleWasmStoreWrite(c.ModuleName, c.outputStore.SizeBytes(), time.Since(time.Now()))
now := time.Now()
c.validateWithValueType("add_float64", pbsubstreams.Module_KindStore_UPDATE_POLICY_ADD, "float64", key)
c.outputStore.SumFloat64(ord, key, value)
c.stats.RecordModuleWasmStoreWrite(c.ModuleName, c.outputStore.SizeBytes(), time.Since(now))
}
func (c *Call) DoSetMinInt64(ord uint64, key string, value int64) {
defer c.stats.RecordModuleWasmStoreWrite(c.ModuleName, c.outputStore.SizeBytes(), time.Since(time.Now()))
now := time.Now()
c.validateWithValueType("set_min_int64", pbsubstreams.Module_KindStore_UPDATE_POLICY_MIN, "int64", key)
c.outputStore.SetMinInt64(ord, key, value)
c.stats.RecordModuleWasmStoreWrite(c.ModuleName, c.outputStore.SizeBytes(), time.Since(now))
}
func (c *Call) DoSetMinBigInt(ord uint64, key string, value string) {
defer c.stats.RecordModuleWasmStoreWrite(c.ModuleName, c.outputStore.SizeBytes(), time.Since(time.Now()))
now := time.Now()
c.validateWithValueType("set_min_bigint", pbsubstreams.Module_KindStore_UPDATE_POLICY_MIN, "bigint", key)
toSet, _ := new(big.Int).SetString(value, 10)
c.outputStore.SetMinBigInt(ord, key, toSet)
c.stats.RecordModuleWasmStoreWrite(c.ModuleName, c.outputStore.SizeBytes(), time.Since(now))
}
func (c *Call) DoSetMinFloat64(ord uint64, key string, value float64) {
defer c.stats.RecordModuleWasmStoreWrite(c.ModuleName, c.outputStore.SizeBytes(), time.Since(time.Now()))
now := time.Now()
c.validateWithValueType("set_min_float64", pbsubstreams.Module_KindStore_UPDATE_POLICY_MIN, "float64", key)
c.outputStore.SetMinFloat64(ord, key, value)
c.stats.RecordModuleWasmStoreWrite(c.ModuleName, c.outputStore.SizeBytes(), time.Since(now))
}
func (c *Call) DoSetMinBigDecimal(ord uint64, key string, value string) {
defer c.stats.RecordModuleWasmStoreWrite(c.ModuleName, c.outputStore.SizeBytes(), time.Since(time.Now()))
now := time.Now()
c.validateWithTwoValueTypes("set_min_bigdecimal", pbsubstreams.Module_KindStore_UPDATE_POLICY_MIN, "bigdecimal", "bigfloat", key)
toAdd, err := decimal.NewFromString(value)
if err != nil {
c.ReturnError(fmt.Errorf("parsing bigdecimal: %w", err))
}
c.outputStore.SetMinBigDecimal(ord, key, toAdd.Truncate(34))
c.stats.RecordModuleWasmStoreWrite(c.ModuleName, c.outputStore.SizeBytes(), time.Since(now))
}
func (c *Call) DoSetMaxInt64(ord uint64, key string, value int64) {
defer c.stats.RecordModuleWasmStoreWrite(c.ModuleName, c.outputStore.SizeBytes(), time.Since(time.Now()))
now := time.Now()
c.validateWithValueType("set_max_int64", pbsubstreams.Module_KindStore_UPDATE_POLICY_MAX, "int64", key)
c.outputStore.SetMaxInt64(ord, key, value)
c.stats.RecordModuleWasmStoreWrite(c.ModuleName, c.outputStore.SizeBytes(), time.Since(now))
}
func (c *Call) DoSetMaxBigInt(ord uint64, key string, value string) {
defer c.stats.RecordModuleWasmStoreWrite(c.ModuleName, c.outputStore.SizeBytes(), time.Since(time.Now()))
now := time.Now()
c.validateWithValueType("set_max_bigint", pbsubstreams.Module_KindStore_UPDATE_POLICY_MAX, "bigint", key)
toSet, _ := new(big.Int).SetString(value, 10)
c.outputStore.SetMaxBigInt(ord, key, toSet)

c.stats.RecordModuleWasmStoreWrite(c.ModuleName, c.outputStore.SizeBytes(), time.Since(now))
}
func (c *Call) DoSetMaxFloat64(ord uint64, key string, value float64) {
defer c.stats.RecordModuleWasmStoreWrite(c.ModuleName, c.outputStore.SizeBytes(), time.Since(time.Now()))
now := time.Now()
c.validateWithValueType("set_max_float64", pbsubstreams.Module_KindStore_UPDATE_POLICY_MAX, "float64", key)
c.outputStore.SetMaxFloat64(ord, key, value)
c.stats.RecordModuleWasmStoreWrite(c.ModuleName, c.outputStore.SizeBytes(), time.Since(now))
}
func (c *Call) DoSetMaxBigDecimal(ord uint64, key string, value string) {
defer c.stats.RecordModuleWasmStoreWrite(c.ModuleName, c.outputStore.SizeBytes(), time.Since(time.Now()))
now := time.Now()
c.validateWithTwoValueTypes("set_max_bigdecimal", pbsubstreams.Module_KindStore_UPDATE_POLICY_MAX, "bigdecimal", "bigfloat", key)
toAdd, err := decimal.NewFromString(value)
if err != nil {
c.ReturnError(fmt.Errorf("parsing bigdecimal: %w", err))
}
c.outputStore.SetMaxBigDecimal(ord, key, toAdd.Truncate(34))
c.stats.RecordModuleWasmStoreWrite(c.ModuleName, c.outputStore.SizeBytes(), time.Since(now))
}

func (c *Call) DoGetAt(storeIndex int, ord uint64, key string) (value []byte, found bool) {
defer c.stats.RecordModuleWasmStoreRead(c.ModuleName, time.Since(time.Now()))
now := time.Now()
defer func() { c.stats.RecordModuleWasmStoreRead(c.ModuleName, time.Since(now)) }()
c.validateStoreIndex(storeIndex, "get_at")
readStore := c.inputStores[storeIndex]
c.traceStateReads("get_at", storeIndex, found, key)
return readStore.GetAt(ord, key)
}

func (c *Call) DoHasAt(storeIndex int, ord uint64, key string) (found bool) {
defer c.stats.RecordModuleWasmStoreRead(c.ModuleName, time.Since(time.Now()))
now := time.Now()
defer func() { c.stats.RecordModuleWasmStoreRead(c.ModuleName, time.Since(now)) }()
c.validateStoreIndex(storeIndex, "has_at")
readStore := c.inputStores[storeIndex]
c.traceStateReads("has_at", storeIndex, found, key)
return readStore.HasAt(ord, key)
}

func (c *Call) DoGetFirst(storeIndex int, key string) (value []byte, found bool) {
defer c.stats.RecordModuleWasmStoreRead(c.ModuleName, time.Since(time.Now()))
now := time.Now()
defer func() { c.stats.RecordModuleWasmStoreRead(c.ModuleName, time.Since(now)) }()
c.validateStoreIndex(storeIndex, "get_first")
readStore := c.inputStores[storeIndex]
c.traceStateReads("get_first", storeIndex, found, key)
return readStore.GetFirst(key)
}

func (c *Call) DoHasFirst(storeIndex int, key string) (found bool) {
defer c.stats.RecordModuleWasmStoreRead(c.ModuleName, time.Since(time.Now()))
now := time.Now()
defer func() { c.stats.RecordModuleWasmStoreRead(c.ModuleName, time.Since(now)) }()
c.validateStoreIndex(storeIndex, "has_first")
readStore := c.inputStores[storeIndex]
c.traceStateReads("has_first", storeIndex, found, key)
return readStore.HasFirst(key)
}

func (c *Call) DoGetLast(storeIndex int, key string) (value []byte, found bool) {
defer c.stats.RecordModuleWasmStoreRead(c.ModuleName, time.Since(time.Now()))
now := time.Now()
defer func() { c.stats.RecordModuleWasmStoreRead(c.ModuleName, time.Since(now)) }()
c.validateStoreIndex(storeIndex, "get_last")
readStore := c.inputStores[storeIndex]
c.traceStateReads("get_last", storeIndex, found, key)
return readStore.GetLast(key)
}

func (c *Call) DoHasLast(storeIndex int, key string) (found bool) {
defer c.stats.RecordModuleWasmStoreRead(c.ModuleName, time.Since(time.Now()))
now := time.Now()
defer func() { c.stats.RecordModuleWasmStoreRead(c.ModuleName, time.Since(now)) }()
c.validateStoreIndex(storeIndex, "has_last")
readStore := c.inputStores[storeIndex]
c.traceStateReads("has_last", storeIndex, found, key)
Expand Down
8 changes: 6 additions & 2 deletions wasm/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,14 @@ import (
"context"
)

type callCtxType string

const callCtx = callCtxType("call")

func WithContext(ctx context.Context, call *Call) context.Context {
return context.WithValue(ctx, "call", call)
return context.WithValue(ctx, callCtx, call)
}

func FromContext(ctx context.Context) *Call {
return ctx.Value("call").(*Call)
return ctx.Value(callCtx).(*Call)
}
2 changes: 1 addition & 1 deletion wasm/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@ import (
"github.com/streamingfast/logging"
)

var zlog, tracer = logging.PackageLogger("wasm-runtime", "github.com/streamingfast/substreams/wasm")
var zlog, _ = logging.PackageLogger("wasm-runtime", "github.com/streamingfast/substreams/wasm")

0 comments on commit edf62b4

Please sign in to comment.