Skip to content

Commit

Permalink
Merge branch 'optimize-exception' into 'master'
Browse files Browse the repository at this point in the history
feat: optimize the json ext parse

See merge request !29
  • Loading branch information
absolute8511 committed Nov 13, 2020
2 parents 8ddfdee + 06d6de7 commit 386719b
Show file tree
Hide file tree
Showing 9 changed files with 397 additions and 116 deletions.
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ require (
github.com/golang/snappy v0.0.1
github.com/gorilla/sessions v1.1.3
github.com/hashicorp/golang-lru v0.5.3
github.com/json-iterator/go v1.1.10
github.com/judwhite/go-svc v1.0.0
github.com/julienschmidt/httprouter v1.2.0
github.com/kr/pretty v0.2.0 // indirect
Expand All @@ -35,6 +36,7 @@ require (
github.com/tidwall/gjson v1.1.3
github.com/tidwall/match v1.0.1 // indirect
github.com/twinj/uuid v1.0.0
github.com/valyala/fastjson v1.6.1
github.com/viki-org/dnscache v0.0.0-20130720023526-c70c1f23c5d8
github.com/wendal/errors v0.0.0-20181209125328-7f31f4b264ec // indirect
github.com/youzan/go-nsq v1.7.2-HA
Expand Down
5 changes: 5 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ github.com/gorilla/sessions v1.1.3/go.mod h1:8KCfur6+4Mqcc6S0FEfKuN15Vl5MgXW92AE
github.com/hashicorp/golang-lru v0.5.3 h1:YPkqC67at8FYaadspW/6uE0COsBxS2656RLEr8Bppgk=
github.com/hashicorp/golang-lru v0.5.3/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
github.com/json-iterator/go v1.1.10 h1:Kz6Cvnvv2wGdaG/V8yMvfkmNiXq9Ya2KUv4rouJJr68=
github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/judwhite/go-svc v1.0.0 h1:W447kYhZsqC14hkfNG8XLy9wbYibeMW75g5DtAIpFGw=
github.com/judwhite/go-svc v1.0.0/go.mod h1:EeMSAFO3mLgEQfcvnZ50JDG0O1uQlagpAbMS6talrXE=
Expand All @@ -130,8 +131,10 @@ github.com/mattn/go-sqlite3 v1.10.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsO
github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/modern-go/reflect2 v1.0.1 h1:9f412s+6RmYXLWZSEzVVgPGK7C2PphHj5RJrvfx9AWI=
github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/mreiferson/go-options v0.0.0-20161229190002-77551d20752b h1:xjKomx939vefURtocD1uaKvcvAp1dNYX05i0TIpnfVI=
github.com/mreiferson/go-options v0.0.0-20161229190002-77551d20752b/go.mod h1:A0JOgZNsj9V+npbgxH0Ib75PvrHS6Ezri/4HdcTp/DI=
Expand Down Expand Up @@ -182,6 +185,8 @@ github.com/tidwall/match v1.0.1 h1:PnKP62LPNxHKTwvHHZZzdOAOCtsJTjo6dZLCwpKm5xc=
github.com/tidwall/match v1.0.1/go.mod h1:LujAq0jyVjBy028G1WhWfIzbpQfMO8bBZ6Tyb0+pL9E=
github.com/twinj/uuid v1.0.0 h1:fzz7COZnDrXGTAOHGuUGYd6sG+JMq+AoE7+Jlu0przk=
github.com/twinj/uuid v1.0.0/go.mod h1:mMgcE1RHFUFqe5AfiwlINXisXfDGro23fWdPUfOMjRY=
github.com/valyala/fastjson v1.6.1 h1:qJs/Kz/HebWzk8LmhOrSm7kdOyJBr1XB+zSkYtEEfQE=
github.com/valyala/fastjson v1.6.1/go.mod h1:CLCAqky6SMuOcxStkYQvblddUtoRxhYMGLrsQns1aXY=
github.com/viki-org/dnscache v0.0.0-20130720023526-c70c1f23c5d8 h1:EVObHAr8DqpoJCVv6KYTle8FEImKhtkfcZetNqxDoJQ=
github.com/viki-org/dnscache v0.0.0-20130720023526-c70c1f23c5d8/go.mod h1:dniwbG03GafCjFohMDmz6Zc6oCuiqgH6tGNyXTkHzXE=
github.com/wendal/errors v0.0.0-20130201093226-f66c77a7882b/go.mod h1:Q12BUT7DqIlHRmgv3RskH+UCM/4eqVMgI0EMmlSpAXc=
Expand Down
23 changes: 6 additions & 17 deletions nsqd/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/youzan/nsq/internal/protocol"

simpleJson "github.com/bitly/go-simplejson"
"github.com/youzan/nsq/internal/ext"
"github.com/youzan/nsq/internal/levellogger"
"github.com/youzan/nsq/internal/quantile"
Expand Down Expand Up @@ -2212,20 +2211,12 @@ exit:
func (c *Channel) shouldSkipZanTest(msg *Message) bool {
if c.IsZanTestSkipped() && msg.ExtVer == ext.JSON_HEADER_EXT_VER {
//check if zan_test header contained in json header
extHeader, err := simpleJson.NewJson(msg.ExtBytes)
extHeader, err := NewJsonExt(msg.ExtBytes)
if err != nil {
return false
}
if flag, exist := extHeader.CheckGet(ext.ZAN_TEST_KEY); exist {
tb, err := flag.Bool()
if err != nil {
ts, _ := flag.String()
if ts != "" {
tb, _ = strconv.ParseBool(ts)
}
}
return tb
}
zanTest, _ := extHeader.GetBoolOrStringBool(ext.ZAN_TEST_KEY)
return zanTest
}
return false
}
Expand All @@ -2237,12 +2228,10 @@ func parseTagIfAny(msg *Message) (string, error) {
case ext.TAG_EXT_VER:
msgTag = string(msg.ExtBytes)
case ext.JSON_HEADER_EXT_VER:
var jsonExt *simpleJson.Json
jsonExt, err = simpleJson.NewJson(msg.ExtBytes)
var extHeader IJsonExt
extHeader, err = NewJsonExt(msg.ExtBytes)
if err == nil {
if tagJson, exist := jsonExt.CheckGet(ext.CLIENT_DISPATCH_TAG_KEY); exist {
msgTag, err = tagJson.String()
}
msgTag, _ = extHeader.GetString(ext.CLIENT_DISPATCH_TAG_KEY)
} else if len(msg.ExtBytes) == 0 {
err = nil
msgTag = ""
Expand Down
10 changes: 5 additions & 5 deletions nsqd/channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,9 +347,8 @@ func TestChannelReqTooMuchInDeferShouldNotContinueReadBackend(t *testing.T) {
opts.MaxRdyCount = 100
opts.MaxConfirmWin = 10
opts.Logger = newTestLogger(t)
opts.MsgTimeout = 100 * time.Millisecond
// use large to delay the period scan
opts.QueueScanRefreshInterval = 10 * time.Second
opts.MsgTimeout = time.Second * 3
opts.QueueScanRefreshInterval = time.Second / 10
opts.QueueScanInterval = time.Millisecond * 100
_, _, nsqd := mustStartNSQD(opts)
defer os.RemoveAll(opts.DataPath)
Expand All @@ -372,7 +371,7 @@ func TestChannelReqTooMuchInDeferShouldNotContinueReadBackend(t *testing.T) {
lastDelay := time.Now()
for time.Since(start) < time.Second*5 {
select {
case <-time.After(time.Second):
case <-time.After(time.Second * 2):
timeout++
case outputMsg, ok := <-channel.clientMsgChan:
if !ok {
Expand All @@ -393,7 +392,8 @@ func TestChannelReqTooMuchInDeferShouldNotContinueReadBackend(t *testing.T) {
// requeue with different timeout to make sure the memory deferred cnt is high
// since after timeout deferred cnt will be reset
lastDelay = lastDelay.Add(time.Millisecond * 101)
delay := time.Since(lastDelay)
delay := lastDelay.Sub(now)
t.Logf("consume %v delay to %s, %s", outputMsg.ID, lastDelay, delay)
channel.RequeueMessage(1, "", outputMsg.ID, delay, false)
}
}
Expand Down
179 changes: 179 additions & 0 deletions nsqd/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,12 @@ import (
"errors"
"fmt"
"io"
"strconv"
"sync/atomic"
"time"

jsoniter "github.com/json-iterator/go"
"github.com/valyala/fastjson"
"github.com/youzan/nsq/internal/ext"
)

Expand Down Expand Up @@ -511,3 +514,179 @@ func DecodeDelayedMessage(b []byte, isExt bool) (*Message, error) {
msg.Body = b[pos:]
return &msg, nil
}

var errJsonKeyNotFound = errors.New("json key not found")
var errJsonTypeNotMatch = errors.New("json value type not match")

func IsNotFoundJsonKey(err error) bool {
return errJsonKeyNotFound == err
}

type IJsonExt interface {
KeysCheck(func(string) bool)
GetString(string) (string, error)
GetBool(string) (bool, error)
GetBoolOrStringBool(string) (bool, error)
}

func NewJsonExt(b []byte) (IJsonExt, error) {
return newJsonExtObjV2(b)
//return &JsonExtObj{
// jsonExt: jsoniter.Get(b),
//}, nil
}

type JsonExtObj struct {
jsonExt jsoniter.Any
}

func (jeo *JsonExtObj) KeysCheck(ck func(key string) bool) {
if jeo.jsonExt == nil {
return
}
if jeo.jsonExt.LastError() != nil {
return
}
keys := jeo.jsonExt.Keys()
for _, k := range keys {
run := ck(k)
if !run {
break
}
}
}

func (jeo *JsonExtObj) GetString(key string) (string, error) {
if jeo.jsonExt == nil {
return "", errJsonKeyNotFound
}
if jeo.jsonExt.LastError() != nil {
return "", jeo.jsonExt.LastError()
}
extHeader := jeo.jsonExt.Get(key)
if extHeader.LastError() != nil {
return "", errJsonKeyNotFound
}
if extHeader.ValueType() == jsoniter.StringValue {
return extHeader.ToString(), nil
}
return "", errJsonTypeNotMatch
}

func (jeo *JsonExtObj) GetBool(key string) (bool, error) {
if jeo.jsonExt == nil {
return false, errJsonKeyNotFound
}
if jeo.jsonExt.LastError() != nil {
return false, jeo.jsonExt.LastError()
}
extHeader := jeo.jsonExt.Get(key)
if extHeader.LastError() != nil {
return false, errJsonKeyNotFound
}
if extHeader.ValueType() == jsoniter.BoolValue {
return extHeader.ToBool(), nil
}
return false, errJsonTypeNotMatch
}

func (jeo *JsonExtObj) GetBoolOrStringBool(key string) (bool, error) {
if jeo.jsonExt == nil {
return false, errJsonKeyNotFound
}
if jeo.jsonExt.LastError() != nil {
return false, jeo.jsonExt.LastError()
}
extHeader := jeo.jsonExt.Get(key)
if extHeader.LastError() != nil {
return false, errJsonKeyNotFound
}
if extHeader.ValueType() == jsoniter.BoolValue {
return extHeader.ToBool(), nil
}
if extHeader.ValueType() == jsoniter.StringValue {
ts := extHeader.ToString()
tb := false
if ts != "" {
tb, _ = strconv.ParseBool(ts)
}
return tb, nil
}
return false, errJsonTypeNotMatch
}

type jsonExtObjV2 struct {
jsonExt *fastjson.Value
}

func newJsonExtObjV2(b []byte) (IJsonExt, error) {
v, err := fastjson.ParseBytes(b)
if err != nil {
return nil, err
}
return &jsonExtObjV2{
jsonExt: v,
}, nil
}
func (jeo *jsonExtObjV2) KeysCheck(ck func(string) bool) {
if jeo.jsonExt == nil {
return
}
o, err := jeo.jsonExt.Object()
if err != nil {
return
}
o.Visit(func(k []byte, v *fastjson.Value) {
ck(string(k))
})
}

func (jeo *jsonExtObjV2) GetString(key string) (string, error) {
if jeo.jsonExt == nil {
return "", errJsonKeyNotFound
}
extHeader := jeo.jsonExt.Get(key)
if extHeader == nil {
return "", errJsonKeyNotFound
}
if extHeader.Type() != fastjson.TypeString {
return "", errJsonTypeNotMatch
}
v, err := extHeader.StringBytes()
return string(v), err
}

func (jeo *jsonExtObjV2) GetBool(key string) (bool, error) {
if jeo.jsonExt == nil {
return false, errJsonKeyNotFound
}
extHeader := jeo.jsonExt.Get(key)
if extHeader == nil {
return false, errJsonKeyNotFound
}
if extHeader.Type() != fastjson.TypeTrue && extHeader.Type() != fastjson.TypeFalse {
return false, errJsonTypeNotMatch
}
return extHeader.GetBool(), nil
}

func (jeo *jsonExtObjV2) GetBoolOrStringBool(key string) (bool, error) {
if jeo.jsonExt == nil {
return false, errJsonKeyNotFound
}
extHeader := jeo.jsonExt.Get(key)
if extHeader == nil {
return false, errJsonKeyNotFound
}

if extHeader.Type() == fastjson.TypeString {
ts, _ := extHeader.StringBytes()
tb := false
if len(ts) > 0 {
tb, _ = strconv.ParseBool(string(ts))
}
return tb, nil
}

return extHeader.GetBool(), nil
}
29 changes: 29 additions & 0 deletions nsqd/nsqd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"time"

"github.com/bitly/go-simplejson"
jsoniter "github.com/json-iterator/go"
"github.com/youzan/nsq/internal/http_api"
"github.com/youzan/nsq/internal/levellogger"
)
Expand Down Expand Up @@ -550,3 +551,31 @@ func TestLoadTopicChannel(t *testing.T) {
t.Errorf("should closed this channel after reload")
}
}

func BenchmarkJsonExtV1(b *testing.B) {
jb := []byte("{\"key\":\"true\", \"##key\":\"v1\", \"keybool\":true}")
b.ReportAllocs()
for i := 0; i < b.N; i++ {
extHeader := &JsonExtObj{
jsonExt: jsoniter.Get(jb),
}
extHeader.GetBoolOrStringBool("key")
extHeader.GetBoolOrStringBool("keybool")
extHeader.GetBoolOrStringBool("keynoexsit")
extHeader.GetString("key")
extHeader.GetString("keynoexsit")
}
}

func BenchmarkJsonExtV2(b *testing.B) {
jb := []byte("{\"key\":\"true\", \"##key\":\"v1\", \"keybool\":true}")
b.ReportAllocs()
for i := 0; i < b.N; i++ {
extHeader, _ := newJsonExtObjV2(jb)
extHeader.GetBoolOrStringBool("key")
extHeader.GetBoolOrStringBool("keybool")
extHeader.GetBoolOrStringBool("keynoexsit")
extHeader.GetString("key")
extHeader.GetString("keynoexsit")
}
}
Loading

0 comments on commit 386719b

Please sign in to comment.