diff --git a/docs/release-notes/change-log.md b/docs/release-notes/change-log.md index b894d2130..ac255b18a 100644 --- a/docs/release-notes/change-log.md +++ b/docs/release-notes/change-log.md @@ -13,6 +13,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), * fix a possible panic() when an request is interrupted during the file loading phase of a squashing operation. * fix a rare possibility of stalling if only some fullkv stores caches were deleted, but further segments were still present. +* fix stats counters for store operations time ## v1.5.3 diff --git a/wasm/call.go b/wasm/call.go index 4d96ae52c..ef631b133 100644 --- a/wasm/call.go +++ b/wasm/call.go @@ -117,36 +117,41 @@ func (c *Call) ReachedLogsMaxByteCount() bool { } func (c *Call) DoSet(ord uint64, key string, value []byte) { - defer c.stats.RecordModuleWasmStoreWrite(c.ModuleName, c.outputStore.SizeBytes(), time.Since(time.Now())) + now := time.Now() c.validateSimple("set", pbsubstreams.Module_KindStore_UPDATE_POLICY_SET, key) c.outputStore.SetBytes(ord, key, value) + c.stats.RecordModuleWasmStoreWrite(c.ModuleName, c.outputStore.SizeBytes(), time.Since(now)) } func (c *Call) DoSetIfNotExists(ord uint64, key string, value []byte) { - defer c.stats.RecordModuleWasmStoreWrite(c.ModuleName, c.outputStore.SizeBytes(), time.Since(time.Now())) + now := time.Now() c.validateSimple("set_if_not_exists", pbsubstreams.Module_KindStore_UPDATE_POLICY_SET_IF_NOT_EXISTS, key) c.outputStore.SetBytesIfNotExists(ord, key, value) + c.stats.RecordModuleWasmStoreWrite(c.ModuleName, c.outputStore.SizeBytes(), time.Since(now)) } func (c *Call) DoAppend(ord uint64, key string, value []byte) { - defer c.stats.RecordModuleWasmStoreWrite(c.ModuleName, c.outputStore.SizeBytes(), time.Since(time.Now())) + now := time.Now() c.validateSimple("append", pbsubstreams.Module_KindStore_UPDATE_POLICY_APPEND, key) if err := c.outputStore.Append(ord, key, value); err != nil { c.ReturnError(fmt.Errorf("appending to store: %w", err)) } + c.stats.RecordModuleWasmStoreWrite(c.ModuleName, c.outputStore.SizeBytes(), time.Since(now)) } func (c *Call) DoDeletePrefix(ord uint64, prefix string) { - defer c.stats.RecordModuleWasmStoreDeletePrefix(c.ModuleName, c.outputStore.SizeBytes(), time.Since(time.Now())) + now := time.Now() c.traceStateWrites("delete_prefix", prefix) c.outputStore.DeletePrefix(ord, prefix) + c.stats.RecordModuleWasmStoreDeletePrefix(c.ModuleName, c.outputStore.SizeBytes(), time.Since(now)) } func (c *Call) DoAddBigInt(ord uint64, key string, value string) { - defer c.stats.RecordModuleWasmStoreWrite(c.ModuleName, c.outputStore.SizeBytes(), time.Since(time.Now())) + now := time.Now() c.validateWithValueType("add_bigint", pbsubstreams.Module_KindStore_UPDATE_POLICY_ADD, "bigint", key) toAdd, _ := new(big.Int).SetString(value, 10) c.outputStore.SumBigInt(ord, key, toAdd) + c.stats.RecordModuleWasmStoreWrite(c.ModuleName, c.outputStore.SizeBytes(), time.Since(now)) } func (c *Call) DoAddBigDecimal(ord uint64, key string, value string) { - defer c.stats.RecordModuleWasmStoreWrite(c.ModuleName, c.outputStore.SizeBytes(), time.Since(time.Now())) + now := time.Now() c.validateWithTwoValueTypes("add_bigdecimal", pbsubstreams.Module_KindStore_UPDATE_POLICY_ADD, "bigdecimal", "bigfloat", key) toAdd, err := decimal.NewFromString(string(value)) @@ -154,71 +159,82 @@ func (c *Call) DoAddBigDecimal(ord uint64, key string, value string) { c.ReturnError(fmt.Errorf("parsing bigdecimal: %w", err)) } c.outputStore.SumBigDecimal(ord, key, toAdd.Truncate(34)) + c.stats.RecordModuleWasmStoreWrite(c.ModuleName, c.outputStore.SizeBytes(), time.Since(now)) } func (c *Call) DoAddInt64(ord uint64, key string, value int64) { - defer c.stats.RecordModuleWasmStoreWrite(c.ModuleName, c.outputStore.SizeBytes(), time.Since(time.Now())) + now := time.Now() c.validateWithValueType("add_int64", pbsubstreams.Module_KindStore_UPDATE_POLICY_ADD, "int64", key) c.outputStore.SumInt64(ord, key, value) + c.stats.RecordModuleWasmStoreWrite(c.ModuleName, c.outputStore.SizeBytes(), time.Since(now)) } func (c *Call) DoAddFloat64(ord uint64, key string, value float64) { - defer c.stats.RecordModuleWasmStoreWrite(c.ModuleName, c.outputStore.SizeBytes(), time.Since(time.Now())) + now := time.Now() c.validateWithValueType("add_float64", pbsubstreams.Module_KindStore_UPDATE_POLICY_ADD, "float64", key) c.outputStore.SumFloat64(ord, key, value) + c.stats.RecordModuleWasmStoreWrite(c.ModuleName, c.outputStore.SizeBytes(), time.Since(now)) } func (c *Call) DoSetMinInt64(ord uint64, key string, value int64) { - defer c.stats.RecordModuleWasmStoreWrite(c.ModuleName, c.outputStore.SizeBytes(), time.Since(time.Now())) + now := time.Now() c.validateWithValueType("set_min_int64", pbsubstreams.Module_KindStore_UPDATE_POLICY_MIN, "int64", key) c.outputStore.SetMinInt64(ord, key, value) + c.stats.RecordModuleWasmStoreWrite(c.ModuleName, c.outputStore.SizeBytes(), time.Since(now)) } func (c *Call) DoSetMinBigInt(ord uint64, key string, value string) { - defer c.stats.RecordModuleWasmStoreWrite(c.ModuleName, c.outputStore.SizeBytes(), time.Since(time.Now())) + now := time.Now() c.validateWithValueType("set_min_bigint", pbsubstreams.Module_KindStore_UPDATE_POLICY_MIN, "bigint", key) toSet, _ := new(big.Int).SetString(value, 10) c.outputStore.SetMinBigInt(ord, key, toSet) + c.stats.RecordModuleWasmStoreWrite(c.ModuleName, c.outputStore.SizeBytes(), time.Since(now)) } func (c *Call) DoSetMinFloat64(ord uint64, key string, value float64) { - defer c.stats.RecordModuleWasmStoreWrite(c.ModuleName, c.outputStore.SizeBytes(), time.Since(time.Now())) + now := time.Now() c.validateWithValueType("set_min_float64", pbsubstreams.Module_KindStore_UPDATE_POLICY_MIN, "float64", key) c.outputStore.SetMinFloat64(ord, key, value) + c.stats.RecordModuleWasmStoreWrite(c.ModuleName, c.outputStore.SizeBytes(), time.Since(now)) } func (c *Call) DoSetMinBigDecimal(ord uint64, key string, value string) { - defer c.stats.RecordModuleWasmStoreWrite(c.ModuleName, c.outputStore.SizeBytes(), time.Since(time.Now())) + now := time.Now() c.validateWithTwoValueTypes("set_min_bigdecimal", pbsubstreams.Module_KindStore_UPDATE_POLICY_MIN, "bigdecimal", "bigfloat", key) toAdd, err := decimal.NewFromString(value) if err != nil { c.ReturnError(fmt.Errorf("parsing bigdecimal: %w", err)) } c.outputStore.SetMinBigDecimal(ord, key, toAdd.Truncate(34)) + c.stats.RecordModuleWasmStoreWrite(c.ModuleName, c.outputStore.SizeBytes(), time.Since(now)) } func (c *Call) DoSetMaxInt64(ord uint64, key string, value int64) { - defer c.stats.RecordModuleWasmStoreWrite(c.ModuleName, c.outputStore.SizeBytes(), time.Since(time.Now())) + now := time.Now() c.validateWithValueType("set_max_int64", pbsubstreams.Module_KindStore_UPDATE_POLICY_MAX, "int64", key) c.outputStore.SetMaxInt64(ord, key, value) + c.stats.RecordModuleWasmStoreWrite(c.ModuleName, c.outputStore.SizeBytes(), time.Since(now)) } func (c *Call) DoSetMaxBigInt(ord uint64, key string, value string) { - defer c.stats.RecordModuleWasmStoreWrite(c.ModuleName, c.outputStore.SizeBytes(), time.Since(time.Now())) + now := time.Now() c.validateWithValueType("set_max_bigint", pbsubstreams.Module_KindStore_UPDATE_POLICY_MAX, "bigint", key) toSet, _ := new(big.Int).SetString(value, 10) c.outputStore.SetMaxBigInt(ord, key, toSet) - + c.stats.RecordModuleWasmStoreWrite(c.ModuleName, c.outputStore.SizeBytes(), time.Since(now)) } func (c *Call) DoSetMaxFloat64(ord uint64, key string, value float64) { - defer c.stats.RecordModuleWasmStoreWrite(c.ModuleName, c.outputStore.SizeBytes(), time.Since(time.Now())) + now := time.Now() c.validateWithValueType("set_max_float64", pbsubstreams.Module_KindStore_UPDATE_POLICY_MAX, "float64", key) c.outputStore.SetMaxFloat64(ord, key, value) + c.stats.RecordModuleWasmStoreWrite(c.ModuleName, c.outputStore.SizeBytes(), time.Since(now)) } func (c *Call) DoSetMaxBigDecimal(ord uint64, key string, value string) { - defer c.stats.RecordModuleWasmStoreWrite(c.ModuleName, c.outputStore.SizeBytes(), time.Since(time.Now())) + now := time.Now() c.validateWithTwoValueTypes("set_max_bigdecimal", pbsubstreams.Module_KindStore_UPDATE_POLICY_MAX, "bigdecimal", "bigfloat", key) toAdd, err := decimal.NewFromString(value) if err != nil { c.ReturnError(fmt.Errorf("parsing bigdecimal: %w", err)) } c.outputStore.SetMaxBigDecimal(ord, key, toAdd.Truncate(34)) + c.stats.RecordModuleWasmStoreWrite(c.ModuleName, c.outputStore.SizeBytes(), time.Since(now)) } func (c *Call) DoGetAt(storeIndex int, ord uint64, key string) (value []byte, found bool) { - defer c.stats.RecordModuleWasmStoreRead(c.ModuleName, time.Since(time.Now())) + now := time.Now() + defer func() { c.stats.RecordModuleWasmStoreRead(c.ModuleName, time.Since(now)) }() c.validateStoreIndex(storeIndex, "get_at") readStore := c.inputStores[storeIndex] c.traceStateReads("get_at", storeIndex, found, key) @@ -226,7 +242,8 @@ func (c *Call) DoGetAt(storeIndex int, ord uint64, key string) (value []byte, fo } func (c *Call) DoHasAt(storeIndex int, ord uint64, key string) (found bool) { - defer c.stats.RecordModuleWasmStoreRead(c.ModuleName, time.Since(time.Now())) + now := time.Now() + defer func() { c.stats.RecordModuleWasmStoreRead(c.ModuleName, time.Since(now)) }() c.validateStoreIndex(storeIndex, "has_at") readStore := c.inputStores[storeIndex] c.traceStateReads("has_at", storeIndex, found, key) @@ -234,7 +251,8 @@ func (c *Call) DoHasAt(storeIndex int, ord uint64, key string) (found bool) { } func (c *Call) DoGetFirst(storeIndex int, key string) (value []byte, found bool) { - defer c.stats.RecordModuleWasmStoreRead(c.ModuleName, time.Since(time.Now())) + now := time.Now() + defer func() { c.stats.RecordModuleWasmStoreRead(c.ModuleName, time.Since(now)) }() c.validateStoreIndex(storeIndex, "get_first") readStore := c.inputStores[storeIndex] c.traceStateReads("get_first", storeIndex, found, key) @@ -242,7 +260,8 @@ func (c *Call) DoGetFirst(storeIndex int, key string) (value []byte, found bool) } func (c *Call) DoHasFirst(storeIndex int, key string) (found bool) { - defer c.stats.RecordModuleWasmStoreRead(c.ModuleName, time.Since(time.Now())) + now := time.Now() + defer func() { c.stats.RecordModuleWasmStoreRead(c.ModuleName, time.Since(now)) }() c.validateStoreIndex(storeIndex, "has_first") readStore := c.inputStores[storeIndex] c.traceStateReads("has_first", storeIndex, found, key) @@ -250,7 +269,8 @@ func (c *Call) DoHasFirst(storeIndex int, key string) (found bool) { } func (c *Call) DoGetLast(storeIndex int, key string) (value []byte, found bool) { - defer c.stats.RecordModuleWasmStoreRead(c.ModuleName, time.Since(time.Now())) + now := time.Now() + defer func() { c.stats.RecordModuleWasmStoreRead(c.ModuleName, time.Since(now)) }() c.validateStoreIndex(storeIndex, "get_last") readStore := c.inputStores[storeIndex] c.traceStateReads("get_last", storeIndex, found, key) @@ -258,7 +278,8 @@ func (c *Call) DoGetLast(storeIndex int, key string) (value []byte, found bool) } func (c *Call) DoHasLast(storeIndex int, key string) (found bool) { - defer c.stats.RecordModuleWasmStoreRead(c.ModuleName, time.Since(time.Now())) + now := time.Now() + defer func() { c.stats.RecordModuleWasmStoreRead(c.ModuleName, time.Since(now)) }() c.validateStoreIndex(storeIndex, "has_last") readStore := c.inputStores[storeIndex] c.traceStateReads("has_last", storeIndex, found, key) diff --git a/wasm/context.go b/wasm/context.go index 5518149e9..06760c430 100644 --- a/wasm/context.go +++ b/wasm/context.go @@ -4,10 +4,14 @@ import ( "context" ) +type callCtxType string + +const callCtx = callCtxType("call") + func WithContext(ctx context.Context, call *Call) context.Context { - return context.WithValue(ctx, "call", call) + return context.WithValue(ctx, callCtx, call) } func FromContext(ctx context.Context) *Call { - return ctx.Value("call").(*Call) + return ctx.Value(callCtx).(*Call) } diff --git a/wasm/logging.go b/wasm/logging.go index ef6606d8f..22b38e205 100644 --- a/wasm/logging.go +++ b/wasm/logging.go @@ -4,4 +4,4 @@ import ( "github.com/streamingfast/logging" ) -var zlog, tracer = logging.PackageLogger("wasm-runtime", "github.com/streamingfast/substreams/wasm") +var zlog, _ = logging.PackageLogger("wasm-runtime", "github.com/streamingfast/substreams/wasm")