diff --git a/go.mod b/go.mod index ab5cf76d..039a7285 100644 --- a/go.mod +++ b/go.mod @@ -24,6 +24,7 @@ require ( github.com/golang/snappy v0.0.1 github.com/gorilla/sessions v1.1.3 github.com/hashicorp/golang-lru v0.5.3 + github.com/json-iterator/go v1.1.10 github.com/judwhite/go-svc v1.0.0 github.com/julienschmidt/httprouter v1.2.0 github.com/kr/pretty v0.2.0 // indirect @@ -35,6 +36,7 @@ require ( github.com/tidwall/gjson v1.1.3 github.com/tidwall/match v1.0.1 // indirect github.com/twinj/uuid v1.0.0 + github.com/valyala/fastjson v1.6.1 github.com/viki-org/dnscache v0.0.0-20130720023526-c70c1f23c5d8 github.com/wendal/errors v0.0.0-20181209125328-7f31f4b264ec // indirect github.com/youzan/go-nsq v1.7.2-HA diff --git a/go.sum b/go.sum index 597e0dba..f37ecc78 100644 --- a/go.sum +++ b/go.sum @@ -110,6 +110,7 @@ github.com/gorilla/sessions v1.1.3/go.mod h1:8KCfur6+4Mqcc6S0FEfKuN15Vl5MgXW92AE github.com/hashicorp/golang-lru v0.5.3 h1:YPkqC67at8FYaadspW/6uE0COsBxS2656RLEr8Bppgk= github.com/hashicorp/golang-lru v0.5.3/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= +github.com/json-iterator/go v1.1.10 h1:Kz6Cvnvv2wGdaG/V8yMvfkmNiXq9Ya2KUv4rouJJr68= github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= github.com/judwhite/go-svc v1.0.0 h1:W447kYhZsqC14hkfNG8XLy9wbYibeMW75g5DtAIpFGw= github.com/judwhite/go-svc v1.0.0/go.mod h1:EeMSAFO3mLgEQfcvnZ50JDG0O1uQlagpAbMS6talrXE= @@ -130,8 +131,10 @@ github.com/mattn/go-sqlite3 v1.10.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsO github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= +github.com/modern-go/reflect2 v1.0.1 h1:9f412s+6RmYXLWZSEzVVgPGK7C2PphHj5RJrvfx9AWI= github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= github.com/mreiferson/go-options v0.0.0-20161229190002-77551d20752b h1:xjKomx939vefURtocD1uaKvcvAp1dNYX05i0TIpnfVI= github.com/mreiferson/go-options v0.0.0-20161229190002-77551d20752b/go.mod h1:A0JOgZNsj9V+npbgxH0Ib75PvrHS6Ezri/4HdcTp/DI= @@ -182,6 +185,8 @@ github.com/tidwall/match v1.0.1 h1:PnKP62LPNxHKTwvHHZZzdOAOCtsJTjo6dZLCwpKm5xc= github.com/tidwall/match v1.0.1/go.mod h1:LujAq0jyVjBy028G1WhWfIzbpQfMO8bBZ6Tyb0+pL9E= github.com/twinj/uuid v1.0.0 h1:fzz7COZnDrXGTAOHGuUGYd6sG+JMq+AoE7+Jlu0przk= github.com/twinj/uuid v1.0.0/go.mod h1:mMgcE1RHFUFqe5AfiwlINXisXfDGro23fWdPUfOMjRY= +github.com/valyala/fastjson v1.6.1 h1:qJs/Kz/HebWzk8LmhOrSm7kdOyJBr1XB+zSkYtEEfQE= +github.com/valyala/fastjson v1.6.1/go.mod h1:CLCAqky6SMuOcxStkYQvblddUtoRxhYMGLrsQns1aXY= github.com/viki-org/dnscache v0.0.0-20130720023526-c70c1f23c5d8 h1:EVObHAr8DqpoJCVv6KYTle8FEImKhtkfcZetNqxDoJQ= github.com/viki-org/dnscache v0.0.0-20130720023526-c70c1f23c5d8/go.mod h1:dniwbG03GafCjFohMDmz6Zc6oCuiqgH6tGNyXTkHzXE= github.com/wendal/errors v0.0.0-20130201093226-f66c77a7882b/go.mod h1:Q12BUT7DqIlHRmgv3RskH+UCM/4eqVMgI0EMmlSpAXc= diff --git a/nsqd/channel.go b/nsqd/channel.go index f1dc6549..d3be11a4 100644 --- a/nsqd/channel.go +++ b/nsqd/channel.go @@ -15,7 +15,6 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/youzan/nsq/internal/protocol" - simpleJson "github.com/bitly/go-simplejson" "github.com/youzan/nsq/internal/ext" "github.com/youzan/nsq/internal/levellogger" "github.com/youzan/nsq/internal/quantile" @@ -2212,20 +2211,12 @@ exit: func (c *Channel) shouldSkipZanTest(msg *Message) bool { if c.IsZanTestSkipped() && msg.ExtVer == ext.JSON_HEADER_EXT_VER { //check if zan_test header contained in json header - extHeader, err := simpleJson.NewJson(msg.ExtBytes) + extHeader, err := NewJsonExt(msg.ExtBytes) if err != nil { return false } - if flag, exist := extHeader.CheckGet(ext.ZAN_TEST_KEY); exist { - tb, err := flag.Bool() - if err != nil { - ts, _ := flag.String() - if ts != "" { - tb, _ = strconv.ParseBool(ts) - } - } - return tb - } + zanTest, _ := extHeader.GetBoolOrStringBool(ext.ZAN_TEST_KEY) + return zanTest } return false } @@ -2237,12 +2228,10 @@ func parseTagIfAny(msg *Message) (string, error) { case ext.TAG_EXT_VER: msgTag = string(msg.ExtBytes) case ext.JSON_HEADER_EXT_VER: - var jsonExt *simpleJson.Json - jsonExt, err = simpleJson.NewJson(msg.ExtBytes) + var extHeader IJsonExt + extHeader, err = NewJsonExt(msg.ExtBytes) if err == nil { - if tagJson, exist := jsonExt.CheckGet(ext.CLIENT_DISPATCH_TAG_KEY); exist { - msgTag, err = tagJson.String() - } + msgTag, _ = extHeader.GetString(ext.CLIENT_DISPATCH_TAG_KEY) } else if len(msg.ExtBytes) == 0 { err = nil msgTag = "" diff --git a/nsqd/channel_test.go b/nsqd/channel_test.go index df8cec46..f6222c2f 100644 --- a/nsqd/channel_test.go +++ b/nsqd/channel_test.go @@ -347,9 +347,8 @@ func TestChannelReqTooMuchInDeferShouldNotContinueReadBackend(t *testing.T) { opts.MaxRdyCount = 100 opts.MaxConfirmWin = 10 opts.Logger = newTestLogger(t) - opts.MsgTimeout = 100 * time.Millisecond - // use large to delay the period scan - opts.QueueScanRefreshInterval = 10 * time.Second + opts.MsgTimeout = time.Second * 3 + opts.QueueScanRefreshInterval = time.Second / 10 opts.QueueScanInterval = time.Millisecond * 100 _, _, nsqd := mustStartNSQD(opts) defer os.RemoveAll(opts.DataPath) @@ -372,7 +371,7 @@ func TestChannelReqTooMuchInDeferShouldNotContinueReadBackend(t *testing.T) { lastDelay := time.Now() for time.Since(start) < time.Second*5 { select { - case <-time.After(time.Second): + case <-time.After(time.Second * 2): timeout++ case outputMsg, ok := <-channel.clientMsgChan: if !ok { @@ -393,7 +392,8 @@ func TestChannelReqTooMuchInDeferShouldNotContinueReadBackend(t *testing.T) { // requeue with different timeout to make sure the memory deferred cnt is high // since after timeout deferred cnt will be reset lastDelay = lastDelay.Add(time.Millisecond * 101) - delay := time.Since(lastDelay) + delay := lastDelay.Sub(now) + t.Logf("consume %v delay to %s, %s", outputMsg.ID, lastDelay, delay) channel.RequeueMessage(1, "", outputMsg.ID, delay, false) } } diff --git a/nsqd/message.go b/nsqd/message.go index b32badb2..2bfd0837 100644 --- a/nsqd/message.go +++ b/nsqd/message.go @@ -5,9 +5,12 @@ import ( "errors" "fmt" "io" + "strconv" "sync/atomic" "time" + jsoniter "github.com/json-iterator/go" + "github.com/valyala/fastjson" "github.com/youzan/nsq/internal/ext" ) @@ -511,3 +514,179 @@ func DecodeDelayedMessage(b []byte, isExt bool) (*Message, error) { msg.Body = b[pos:] return &msg, nil } + +var errJsonKeyNotFound = errors.New("json key not found") +var errJsonTypeNotMatch = errors.New("json value type not match") + +func IsNotFoundJsonKey(err error) bool { + return errJsonKeyNotFound == err +} + +type IJsonExt interface { + KeysCheck(func(string) bool) + GetString(string) (string, error) + GetBool(string) (bool, error) + GetBoolOrStringBool(string) (bool, error) +} + +func NewJsonExt(b []byte) (IJsonExt, error) { + return newJsonExtObjV2(b) + //return &JsonExtObj{ + // jsonExt: jsoniter.Get(b), + //}, nil +} + +type JsonExtObj struct { + jsonExt jsoniter.Any +} + +func (jeo *JsonExtObj) KeysCheck(ck func(key string) bool) { + if jeo.jsonExt == nil { + return + } + if jeo.jsonExt.LastError() != nil { + return + } + keys := jeo.jsonExt.Keys() + for _, k := range keys { + run := ck(k) + if !run { + break + } + } +} + +func (jeo *JsonExtObj) GetString(key string) (string, error) { + if jeo.jsonExt == nil { + return "", errJsonKeyNotFound + } + if jeo.jsonExt.LastError() != nil { + return "", jeo.jsonExt.LastError() + } + extHeader := jeo.jsonExt.Get(key) + if extHeader.LastError() != nil { + return "", errJsonKeyNotFound + } + if extHeader.ValueType() == jsoniter.StringValue { + return extHeader.ToString(), nil + } + return "", errJsonTypeNotMatch +} + +func (jeo *JsonExtObj) GetBool(key string) (bool, error) { + if jeo.jsonExt == nil { + return false, errJsonKeyNotFound + } + if jeo.jsonExt.LastError() != nil { + return false, jeo.jsonExt.LastError() + } + extHeader := jeo.jsonExt.Get(key) + if extHeader.LastError() != nil { + return false, errJsonKeyNotFound + } + if extHeader.ValueType() == jsoniter.BoolValue { + return extHeader.ToBool(), nil + } + return false, errJsonTypeNotMatch +} + +func (jeo *JsonExtObj) GetBoolOrStringBool(key string) (bool, error) { + if jeo.jsonExt == nil { + return false, errJsonKeyNotFound + } + if jeo.jsonExt.LastError() != nil { + return false, jeo.jsonExt.LastError() + } + extHeader := jeo.jsonExt.Get(key) + if extHeader.LastError() != nil { + return false, errJsonKeyNotFound + } + if extHeader.ValueType() == jsoniter.BoolValue { + return extHeader.ToBool(), nil + } + if extHeader.ValueType() == jsoniter.StringValue { + ts := extHeader.ToString() + tb := false + if ts != "" { + tb, _ = strconv.ParseBool(ts) + } + return tb, nil + } + return false, errJsonTypeNotMatch +} + +type jsonExtObjV2 struct { + jsonExt *fastjson.Value +} + +func newJsonExtObjV2(b []byte) (IJsonExt, error) { + v, err := fastjson.ParseBytes(b) + if err != nil { + return nil, err + } + return &jsonExtObjV2{ + jsonExt: v, + }, nil +} +func (jeo *jsonExtObjV2) KeysCheck(ck func(string) bool) { + if jeo.jsonExt == nil { + return + } + o, err := jeo.jsonExt.Object() + if err != nil { + return + } + o.Visit(func(k []byte, v *fastjson.Value) { + ck(string(k)) + }) +} + +func (jeo *jsonExtObjV2) GetString(key string) (string, error) { + if jeo.jsonExt == nil { + return "", errJsonKeyNotFound + } + extHeader := jeo.jsonExt.Get(key) + if extHeader == nil { + return "", errJsonKeyNotFound + } + if extHeader.Type() != fastjson.TypeString { + return "", errJsonTypeNotMatch + } + v, err := extHeader.StringBytes() + return string(v), err +} + +func (jeo *jsonExtObjV2) GetBool(key string) (bool, error) { + if jeo.jsonExt == nil { + return false, errJsonKeyNotFound + } + extHeader := jeo.jsonExt.Get(key) + if extHeader == nil { + return false, errJsonKeyNotFound + } + if extHeader.Type() != fastjson.TypeTrue && extHeader.Type() != fastjson.TypeFalse { + return false, errJsonTypeNotMatch + } + return extHeader.GetBool(), nil +} + +func (jeo *jsonExtObjV2) GetBoolOrStringBool(key string) (bool, error) { + if jeo.jsonExt == nil { + return false, errJsonKeyNotFound + } + extHeader := jeo.jsonExt.Get(key) + if extHeader == nil { + return false, errJsonKeyNotFound + } + + if extHeader.Type() == fastjson.TypeString { + ts, _ := extHeader.StringBytes() + tb := false + if len(ts) > 0 { + tb, _ = strconv.ParseBool(string(ts)) + } + return tb, nil + } + + return extHeader.GetBool(), nil +} diff --git a/nsqd/nsqd_test.go b/nsqd/nsqd_test.go index e885a640..d7f93f61 100644 --- a/nsqd/nsqd_test.go +++ b/nsqd/nsqd_test.go @@ -17,6 +17,7 @@ import ( "time" "github.com/bitly/go-simplejson" + jsoniter "github.com/json-iterator/go" "github.com/youzan/nsq/internal/http_api" "github.com/youzan/nsq/internal/levellogger" ) @@ -550,3 +551,31 @@ func TestLoadTopicChannel(t *testing.T) { t.Errorf("should closed this channel after reload") } } + +func BenchmarkJsonExtV1(b *testing.B) { + jb := []byte("{\"key\":\"true\", \"##key\":\"v1\", \"keybool\":true}") + b.ReportAllocs() + for i := 0; i < b.N; i++ { + extHeader := &JsonExtObj{ + jsonExt: jsoniter.Get(jb), + } + extHeader.GetBoolOrStringBool("key") + extHeader.GetBoolOrStringBool("keybool") + extHeader.GetBoolOrStringBool("keynoexsit") + extHeader.GetString("key") + extHeader.GetString("keynoexsit") + } +} + +func BenchmarkJsonExtV2(b *testing.B) { + jb := []byte("{\"key\":\"true\", \"##key\":\"v1\", \"keybool\":true}") + b.ReportAllocs() + for i := 0; i < b.N; i++ { + extHeader, _ := newJsonExtObjV2(jb) + extHeader.GetBoolOrStringBool("key") + extHeader.GetBoolOrStringBool("keybool") + extHeader.GetBoolOrStringBool("keynoexsit") + extHeader.GetString("key") + extHeader.GetString("keynoexsit") + } +} diff --git a/nsqd/stats.go b/nsqd/stats.go index 27dcdfdc..75c71c4c 100644 --- a/nsqd/stats.go +++ b/nsqd/stats.go @@ -23,25 +23,25 @@ const ( ) type TopicStats struct { - TopicName string `json:"topic_name"` - TopicFullName string `json:"topic_full_name"` - TopicPartition string `json:"topic_partition"` - Channels []ChannelStats `json:"channels"` - Depth int64 `json:"depth"` - BackendDepth int64 `json:"backend_depth"` - BackendStart int64 `json:"backend_start"` - MessageCount uint64 `json:"message_count"` - IsLeader bool `json:"is_leader"` - HourlyPubSize int64 `json:"hourly_pubsize"` - Clients []ClientPubStats `json:"client_pub_stats"` - MsgSizeStats []int64 `json:"msg_size_stats"` - MsgWriteLatencyStats []int64 `json:"msg_write_latency_stats"` - IsMultiOrdered bool `json:"is_multi_ordered"` - IsMultiPart bool `json:"is_multi_part"` - IsExt bool `json:"is_ext"` - IsChannelAutoCreateDisabled bool `json:"is_channel_auto_create_disabled"` - StatsdName string `json:"statsd_name"` - PubFailedCnt int64 `json:"pub_failed_cnt"` + TopicName string `json:"topic_name"` + TopicFullName string `json:"topic_full_name"` + TopicPartition string `json:"topic_partition"` + Channels []ChannelStats `json:"channels"` + Depth int64 `json:"depth"` + BackendDepth int64 `json:"backend_depth"` + BackendStart int64 `json:"backend_start"` + MessageCount uint64 `json:"message_count"` + IsLeader bool `json:"is_leader"` + HourlyPubSize int64 `json:"hourly_pubsize"` + Clients []ClientPubStats `json:"client_pub_stats"` + MsgSizeStats []int64 `json:"msg_size_stats"` + MsgWriteLatencyStats []int64 `json:"msg_write_latency_stats"` + IsMultiOrdered bool `json:"is_multi_ordered"` + IsMultiPart bool `json:"is_multi_part"` + IsExt bool `json:"is_ext"` + IsChannelAutoCreateDisabled bool `json:"is_channel_auto_create_disabled"` + StatsdName string `json:"statsd_name"` + PubFailedCnt int64 `json:"pub_failed_cnt"` E2eProcessingLatency *quantile.Result `json:"e2e_processing_latency"` } @@ -56,24 +56,24 @@ func NewTopicStats(t *Topic, channels []ChannelStats, filterClients bool) TopicS clients = t.detailStats.GetPubClientStats() } return TopicStats{ - TopicName: t.GetTopicName(), - TopicFullName: t.GetFullName(), - TopicPartition: strconv.Itoa(t.GetTopicPart()), - Channels: channels, - Depth: t.TotalDataSize(), - BackendDepth: t.TotalDataSize(), - BackendStart: t.GetQueueReadStart(), - MessageCount: t.TotalMessageCnt(), - IsLeader: !t.IsWriteDisabled(), - Clients: clients, - MsgSizeStats: t.detailStats.GetMsgSizeStats(), - MsgWriteLatencyStats: t.detailStats.GetMsgWriteLatencyStats(), - IsMultiOrdered: t.IsOrdered(), - IsMultiPart: t.GetDynamicInfo().MultiPart, - IsExt: t.IsExt(), - IsChannelAutoCreateDisabled: t.IsChannelAutoCreateDisabled(), - PubFailedCnt: t.PubFailed(), - StatsdName: statsdName, + TopicName: t.GetTopicName(), + TopicFullName: t.GetFullName(), + TopicPartition: strconv.Itoa(t.GetTopicPart()), + Channels: channels, + Depth: t.TotalDataSize(), + BackendDepth: t.TotalDataSize(), + BackendStart: t.GetQueueReadStart(), + MessageCount: t.TotalMessageCnt(), + IsLeader: !t.IsWriteDisabled(), + Clients: clients, + MsgSizeStats: t.detailStats.GetMsgSizeStats(), + MsgWriteLatencyStats: t.detailStats.GetMsgWriteLatencyStats(), + IsMultiOrdered: t.IsOrdered(), + IsMultiPart: t.GetDynamicInfo().MultiPart, + IsExt: t.IsExt(), + IsChannelAutoCreateDisabled: t.IsChannelAutoCreateDisabled(), + PubFailedCnt: t.PubFailed(), + StatsdName: statsdName, E2eProcessingLatency: t.AggregateChannelE2eProcessingLatency().Result(), } @@ -423,7 +423,7 @@ func (self *TopicMsgStatsInfo) UpdateMsgSizeStats(msgSize int64) { bucket = len(self.MsgSizeStats) - 1 } atomic.AddInt64(&self.MsgSizeStats[bucket], 1) - if msgSize >= 100 { + if msgSize >= 1024 { TopicWriteByteSize.With(prometheus.Labels{ "topic": self.topicName, "partition": self.topicPart, diff --git a/nsqdserver/protocol_v2.go b/nsqdserver/protocol_v2.go index f33366e3..481d6640 100644 --- a/nsqdserver/protocol_v2.go +++ b/nsqdserver/protocol_v2.go @@ -16,7 +16,6 @@ import ( "time" "unsafe" - simpleJson "github.com/bitly/go-simplejson" ps "github.com/prometheus/client_golang/prometheus" "github.com/youzan/nsq/consistence" "github.com/youzan/nsq/internal/ext" @@ -1626,7 +1625,7 @@ func (p *protocolV2) internalPubExtAndTrace(client *nsqd.ClientV2, params [][]by var needTraceRsp bool var realBody []byte var extContent ext.IExtContent - var jsonHeader *simpleJson.Json + var jsonHeader nsqd.IJsonExt extContent = ext.NewNoExt() if traceEnable && !pubExt { traceID = binary.BigEndian.Uint64(messageBody[:nsqd.MsgTraceIDLength]) @@ -1641,19 +1640,16 @@ func (p *protocolV2) internalPubExtAndTrace(client *nsqd.ClientV2, params [][]by fmt.Sprintf("invalid body size %d in ext json header content length", bodyLen)) } extJsonBytes := messageBody[nsqd.MsgJsonHeaderLength : nsqd.MsgJsonHeaderLength+extJsonLen] - //validate json header passin - jsonHeader, err = simpleJson.NewJson(extJsonBytes) + jsonHeader, err = nsqd.NewJsonExt(extJsonBytes) if err != nil { - return nil, protocol.NewClientErr(err, ext.E_INVALID_JSON_HEADER, "fail to parse json header") + return nil, protocol.NewClientErr(err, ext.E_INVALID_JSON_HEADER, "fail to parse json header:"+err.Error()) } - //parse traceID, if there is any - traceIDJson, existInJsonHeader := jsonHeader.CheckGet(ext.TRACE_ID_KEY) - if existInJsonHeader { - traceIDStr, err := traceIDJson.String() - if err != nil { - return nil, protocol.NewClientErr(err, "INVALID_TRACE_ID", "passin trace id should be string") - } + traceIDStr, jerr := jsonHeader.GetString(ext.TRACE_ID_KEY) + if jerr != nil && !nsqd.IsNotFoundJsonKey(jerr) { + return nil, protocol.NewClientErr(nil, "INVALID_TRACE_ID", "passin trace id should be string") + } + if jerr == nil { traceID, err = strconv.ParseUint(traceIDStr, 10, 0) if err != nil { return nil, protocol.NewClientErr(err, "INVALID_TRACE_ID", "invalid trace id") @@ -1672,14 +1668,16 @@ func (p *protocolV2) internalPubExtAndTrace(client *nsqd.ClientV2, params [][]by asyncAction = false } if !topic.IsExt() && extContent.ExtVersion() != ext.NO_EXT_VER { + notOK := true if p.ctx.getOpts().AllowExtCompatible { - filterIllegalZanTestHeader(topicName, jsonHeader) + canIgnoreExt := canIgnoreJsonHeader(topicName, jsonHeader) + if canIgnoreExt { + extContent = ext.NewNoExt() + nsqd.NsqLogger().Debugf("ext content ignored in topic: %v", topicName) + notOK = false + } } - canIgnoreExt := canIgnoreJsonHeader(topicName, jsonHeader) - if p.ctx.getOpts().AllowExtCompatible && canIgnoreExt { - extContent = ext.NewNoExt() - nsqd.NsqLogger().Debugf("ext content ignored in topic: %v", topicName) - } else { + if notOK { nsqd.NsqLogger().Infof("ext content not supported in topic: %v", topicName) return nil, protocol.NewClientErr(nil, ext.E_EXT_NOT_SUPPORT, fmt.Sprintf("ext content not supported in topic %v", topicName)) @@ -1910,18 +1908,22 @@ func readMPUBEXT(r io.Reader, tmp []byte, topic *nsqd.Topic, maxMessageSize int6 extJsonBytes = msgBody[nsqd.MsgJsonHeaderLength : nsqd.MsgJsonHeaderLength+extJsonLen] //parse trace id, if there is a json ext if extJsonLen > 0 { - jsonHeader, err := simpleJson.NewJson(extJsonBytes) - if err != nil { - return nil, buffers, protocol.NewClientErr(err, ext.E_INVALID_JSON_HEADER, "fail to parse json header") + if !topicExt && !allowExtCompatible { + nsqd.NsqLogger().Infof("ext content not supported in topic: %v", topicName) + return nil, buffers, protocol.NewClientErr(nil, ext.E_EXT_NOT_SUPPORT, + fmt.Sprintf("ext content not supported in topic %v", topicName)) + } + jsonHeader, jerr := nsqd.NewJsonExt(extJsonBytes) + if jerr != nil { + return nil, buffers, protocol.NewClientErr(jerr, ext.E_INVALID_JSON_HEADER, "fail to parse json header:"+jerr.Error()) } //parse traceID, if there is any - traceIDJson, existInJsonHeader := jsonHeader.CheckGet(ext.TRACE_ID_KEY) - if existInJsonHeader { - traceIDStr, err := traceIDJson.String() - if err != nil { - return nil, buffers, protocol.NewClientErr(err, "INVALID_TRACE_ID", "passin trace id should be string") - } + traceIDStr, jerr := jsonHeader.GetString(ext.TRACE_ID_KEY) + if jerr != nil && !nsqd.IsNotFoundJsonKey(jerr) { + return nil, buffers, protocol.NewClientErr(err, "INVALID_TRACE_ID", "passin trace id should be string") + } + if jerr == nil { traceID, err = strconv.ParseUint(traceIDStr, 10, 0) if err != nil { return nil, buffers, protocol.NewClientErr(err, "INVALID_TRACE_ID", "invalid trace id") @@ -1929,11 +1931,8 @@ func readMPUBEXT(r io.Reader, tmp []byte, topic *nsqd.Topic, maxMessageSize int6 } //check compatibility when topic does not support ext if !topicExt { - if allowExtCompatible { - filterIllegalZanTestHeader(topicName, jsonHeader) - } canIgnoreExt = canIgnoreJsonHeader(topicName, jsonHeader) - if allowExtCompatible && canIgnoreExt { + if canIgnoreExt { nsqd.NsqLogger().Debugf("ext content ignored in topic: %v", topicName) } else { nsqd.NsqLogger().Infof("ext content not supported in topic: %v", topicName) @@ -1962,40 +1961,25 @@ func readMPUBEXT(r io.Reader, tmp []byte, topic *nsqd.Topic, maxMessageSize int6 return messages, buffers, nil } -//remove any zan_test header in json ext if value != true(bool, string) -func filterIllegalZanTestHeader(topicName string, jsonHeader *simpleJson.Json) { - if jsonHeader != nil { - flag, exist := jsonHeader.CheckGet(ext.ZAN_TEST_KEY) - if exist { - tb, err := flag.Bool() - if err != nil { - ts, _ := flag.String() - if ts != "" { - tb, _ = strconv.ParseBool(ts) - } - } - if !tb { - jsonHeader.Del(ext.ZAN_TEST_KEY) - nsqd.NsqLogger().Debugf("illegal zan test header removed in topic: %v, %v", topicName, flag) - } - } - } -} - //return true when there are only preserved kv in json header, and false otherwise -func canIgnoreJsonHeader(topicName string, jsonHeader *simpleJson.Json) bool { +func canIgnoreJsonHeader(topicName string, jsonHeader nsqd.IJsonExt) bool { canIgnoreExt := true if jsonHeader != nil { // if only internal header, we can ignore - m, _ := jsonHeader.Map() - for k, _ := range m { + jsonHeader.KeysCheck(func(k string) bool { // for future, if any internal header can not be ignored, we should check here + if k == ext.ZAN_TEST_KEY { + nsqd.NsqLogger().Debugf("illegal zan test header ignored in topic: %v", topicName) + return true + } if !strings.HasPrefix(k, "##") { canIgnoreExt = false nsqd.NsqLogger().Debugf("custom ext content can not be ignored in topic: %v, %v", topicName, k) - break + // stop early + return false } - } + return true + }) } return canIgnoreExt } diff --git a/nsqdserver/protocol_v2_test.go b/nsqdserver/protocol_v2_test.go index 3ba5f786..a09998e2 100644 --- a/nsqdserver/protocol_v2_test.go +++ b/nsqdserver/protocol_v2_test.go @@ -1149,6 +1149,99 @@ func TestPubJsonHeaderIgnored(t *testing.T) { test.Equal(t, len(data) >= 2, true) test.Equal(t, data[:2], []byte("OK")) + jext.Custom[ext.ZAN_TEST_KEY] = "true" + cmd, _ = nsq.PublishWithJsonExt(topicName, "0", make([]byte, 5), jext.ToJson()) + cmd.WriteTo(conn) + resp, _ = nsq.ReadResponse(conn) + frameType, data, _ = nsq.UnpackResponse(resp) + t.Logf("frameType: %d, data: %s", frameType, data) + test.Equal(t, frameType, frameTypeResponse) + test.Equal(t, len(data) >= 2, true) + test.Equal(t, data[:2], []byte("OK")) + + jext.Custom["k1"] = "v1" + cmd, _ = nsq.PublishWithJsonExt(topicName, "0", make([]byte, 5), jext.ToJson()) + cmd.WriteTo(conn) + resp, _ = nsq.ReadResponse(conn) + frameType, data, _ = nsq.UnpackResponse(resp) + t.Logf("frameType: %d, data: %s", frameType, data) + test.Equal(t, frameType, frameTypeError) + test.Equal(t, true, strings.Contains(string(data), ext.E_EXT_NOT_SUPPORT)) +} + +func TestPubJsonHeaderNotCompatible(t *testing.T) { + topicName := "test_json_header_nocompatible" + strconv.Itoa(int(time.Now().Unix())) + + opts := nsqdNs.NewOptions() + opts.Logger = newTestLogger(t) + opts.AllowExtCompatible = false + opts.AllowSubExtCompatible = true + if testing.Verbose() { + opts.LogLevel = 4 + nsqdNs.SetLogger(opts.Logger) + } + tcpAddr, _, nsqd, nsqdServer := mustStartNSQD(opts) + defer os.RemoveAll(opts.DataPath) + defer nsqdServer.Exit() + topic := nsqd.GetTopicIgnPart(topicName) + topicDynConf := nsqdNs.TopicDynamicConf{ + AutoCommit: 1, + SyncEvery: 1, + Ext: false, + } + topic.SetDynamicInfo(topicDynConf, nil) + + topic.GetChannel("ch") + + conn, err := mustConnectNSQD(tcpAddr) + test.Equal(t, err, nil) + identify(t, conn, nil, frameTypeResponse) + var jext nsq.MsgExt + jext.TraceID = 1 + cmd, _ := nsq.PublishWithJsonExt(topicName, "0", make([]byte, 5), jext.ToJson()) + cmd.WriteTo(conn) + resp, _ := nsq.ReadResponse(conn) + frameType, data, _ := nsq.UnpackResponse(resp) + t.Logf("frameType: %d, data: %s", frameType, data) + test.Equal(t, frameType, frameTypeError) + test.Equal(t, true, strings.Contains(string(data), ext.E_EXT_NOT_SUPPORT)) + + jext.DispatchTag = "tag" + cmd, _ = nsq.PublishWithJsonExt(topicName, "0", make([]byte, 5), jext.ToJson()) + cmd.WriteTo(conn) + resp, _ = nsq.ReadResponse(conn) + frameType, data, _ = nsq.UnpackResponse(resp) + t.Logf("frameType: %d, data: %s", frameType, data) + test.Equal(t, frameType, frameTypeError) + test.Equal(t, true, strings.Contains(string(data), ext.E_EXT_NOT_SUPPORT)) + + jext.TraceID = 0 + jext.DispatchTag = "tag" + cmd, _ = nsq.PublishWithJsonExt(topicName, "0", make([]byte, 5), jext.ToJson()) + cmd.WriteTo(conn) + resp, _ = nsq.ReadResponse(conn) + frameType, data, _ = nsq.UnpackResponse(resp) + t.Logf("frameType: %d, data: %s", frameType, data) + test.Equal(t, frameType, frameTypeError) + test.Equal(t, true, strings.Contains(string(data), ext.E_EXT_NOT_SUPPORT)) + + cmd, _ = nsq.PublishWithJsonExt(topicName, "0", make([]byte, 5), []byte("{}")) + cmd.WriteTo(conn) + resp, _ = nsq.ReadResponse(conn) + frameType, data, _ = nsq.UnpackResponse(resp) + t.Logf("frameType: %d, data: %s", frameType, data) + test.Equal(t, frameType, frameTypeError) + test.Equal(t, true, strings.Contains(string(data), ext.E_EXT_NOT_SUPPORT)) + + jext.Custom[ext.ZAN_TEST_KEY] = "true" + cmd, _ = nsq.PublishWithJsonExt(topicName, "0", make([]byte, 5), jext.ToJson()) + cmd.WriteTo(conn) + resp, _ = nsq.ReadResponse(conn) + frameType, data, _ = nsq.UnpackResponse(resp) + t.Logf("frameType: %d, data: %s", frameType, data) + test.Equal(t, frameType, frameTypeError) + test.Equal(t, true, strings.Contains(string(data), ext.E_EXT_NOT_SUPPORT)) + jext.Custom["k1"] = "v1" cmd, _ = nsq.PublishWithJsonExt(topicName, "0", make([]byte, 5), jext.ToJson()) cmd.WriteTo(conn)