Skip to content

Commit

Permalink
Support requeue delay msg
Browse files Browse the repository at this point in the history
  • Loading branch information
CrazyHZM committed Apr 8, 2024
1 parent 2ecb3f9 commit aaad42b
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 3 deletions.
2 changes: 1 addition & 1 deletion internal/version/binary.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"runtime"
)

const Binary = "0.3.7-HA.1.12.9"
const Binary = "0.3.7-HA.1.13.0"

var (
Commit = "unset"
Expand Down
4 changes: 2 additions & 2 deletions nsqd/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -2663,7 +2663,7 @@ func (c *Channel) processInFlightQueue(tnow int64) (bool, bool) {

if !c.IsConsumeDisabled() && !c.IsOrdered() &&
needPeekDelay && clientNum > 0 {
newAdded, cnt, err := c.peekAndReqDelayedMessages(tnow)
newAdded, cnt, err := c.PeekAndReqDelayedMessages(tnow)
if err == nil {
if newAdded > 0 && c.chLog.Level() >= levellogger.LOG_DEBUG {
c.chLog.LogDebugf("channel delayed waiting peeked %v added %v new : %v",
Expand Down Expand Up @@ -2709,7 +2709,7 @@ func (c *Channel) processInFlightQueue(tnow int64) (bool, bool) {
return dirty, checkFast
}

func (c *Channel) peekAndReqDelayedMessages(tnow int64) (int, int, error) {
func (c *Channel) PeekAndReqDelayedMessages(tnow int64) (int, int, error) {
if c.IsEphemeral() {
return 0, 0, nil
}
Expand Down
52 changes: 52 additions & 0 deletions nsqdserver/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ func newHTTPServer(ctx *context, tlsEnabled bool, tlsRequired bool) *httpServer
router.Handle("POST", "/channel/fixconfirmed", http_api.Decorate(s.doFixChannelConfirmed, log, http_api.V1))
router.Handle("POST", "/channel/finishmemdelayed", http_api.Decorate(s.doFinishMemDelayed, log, http_api.V1))
router.Handle("POST", "/channel/emptydelayed", http_api.Decorate(s.doEmptyChannelDelayed, log, http_api.V1))
router.Handle("POST", "/channel/requeuedelayed", http_api.Decorate(s.doRequeueChannelDelayed, log, http_api.V1))
router.Handle("POST", "/channel/setoffset", http_api.Decorate(s.doSetChannelOffset, log, http_api.V1))
router.Handle("POST", "/channel/setorder", http_api.Decorate(s.doSetChannelOrder, log, http_api.V1))
router.Handle("POST", "/channel/setclientlimit", http_api.Decorate(s.doSetChannelClientLimit, log, http_api.V1))
Expand Down Expand Up @@ -224,6 +225,30 @@ func (s *httpServer) getExistingTopicChannelFromQuery(req *http.Request) (url.Va
return reqParams, topic, channelName, err
}

func (s *httpServer) getTimeOffsetFromQuery(req *http.Request) (int, error) {
reqParams, err := url.ParseQuery(req.URL.RawQuery)

if err != nil {
nsqd.NsqLogger().LogErrorf("failed to parse request params - %s", err)
return 1, http_api.Err{400, "INVALID_REQUEST"}
}

offsetStr := reqParams.Get("offset")

if offsetStr == "" {
nsqd.NsqLogger().LogErrorf("The value of offset does not exist. Set the default value to 1")
return 1, nil
}
offset, err := strconv.Atoi(offsetStr)

if err != nil {
nsqd.NsqLogger().LogErrorf("offset invalid - %s", err)
return 1, http_api.Err{400, "INVALID_REQUEST"}
}

return offset, err
}

//TODO: will be refactored for further extension
func getTag(reqParams url.Values) string {
return reqParams.Get("tag")
Expand Down Expand Up @@ -718,6 +743,33 @@ func (s *httpServer) doEmptyChannelDelayed(w http.ResponseWriter, req *http.Requ
return nil, nil
}

func (s *httpServer) doRequeueChannelDelayed(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
_, topic, channelName, err := s.getExistingTopicChannelFromQuery(req)
if err != nil {
return nil, err
}

offset, err := s.getTimeOffsetFromQuery(req)

channel, err := topic.GetExistingChannel(channelName)
if err != nil {
return nil, http_api.Err{404, "CHANNEL_NOT_FOUND"}
}

if s.ctx.checkConsumeForMasterWrite(topic.GetTopicName(), topic.GetTopicPart()) {
_, _, err := channel.PeekAndReqDelayedMessages(time.Now().Add(time.Hour * time.Duration(offset)).UnixNano())
if err != nil {
nsqd.NsqLogger().Logf("failed to requeue the channel %v delayed data: %v, by client:%v",
channelName, err, req.RemoteAddr)
}
} else {
nsqd.NsqLogger().LogDebugf("should request to master: %v, from %v",
topic.GetFullName(), req.RemoteAddr)
return nil, http_api.Err{400, FailedOnNotLeader}
}
return nil, nil
}

func (s *httpServer) doFixChannelConfirmed(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
_, topic, channelName, err := s.getExistingTopicChannelFromQuery(req)
if err != nil {
Expand Down

0 comments on commit aaad42b

Please sign in to comment.