Skip to content

Commit

Permalink
refactor(lib): add common utils (#68)
Browse files Browse the repository at this point in the history
* refactor(lib): add common utils

* get_worker_name

* get_worker_id

* lint fix

* apply suggestions

* fix mistake
  • Loading branch information
chronolaw authored Sep 4, 2024
1 parent 8bd9846 commit 23160b6
Show file tree
Hide file tree
Showing 5 changed files with 87 additions and 40 deletions.
23 changes: 13 additions & 10 deletions lualib/resty/events/broker.lua
Original file line number Diff line number Diff line change
@@ -1,29 +1,38 @@
local codec = require "resty.events.codec"
local lrucache = require "resty.lrucache"
local queue = require "resty.events.queue"
local utils = require "resty.events.utils"
local server = require("resty.events.protocol").server
local is_timeout = server.is_timeout
local is_closed = server.is_closed


local is_timeout = utils.is_timeout
local is_closed = utils.is_closed
local get_worker_id = utils.get_worker_id
local get_worker_name = utils.get_worker_name


local setmetatable = setmetatable
local random = math.random


local ngx = ngx -- luacheck: ignore
local log = ngx.log
local exit = ngx.exit
local exiting = ngx.worker.exiting
local ngx_worker_id = ngx.worker.id
local worker_count = ngx.worker.count
local ERR = ngx.ERR
local DEBUG = ngx.DEBUG
local NOTICE = ngx.NOTICE


local spawn = ngx.thread.spawn
local kill = ngx.thread.kill
local wait = ngx.thread.wait


local decode = codec.decode


local MAX_UNIQUE_EVENTS = 1024
local WEAK_KEYS_MT = { __mode = "k", }

Expand All @@ -38,12 +47,6 @@ do
end


local function get_worker_name(worker_id)
return worker_id == -1 and
"privileged agent" or "worker #" .. worker_id
end


local function terminating(self, worker_connection)
return not self._clients[worker_connection] or exiting()
end
Expand Down Expand Up @@ -215,7 +218,7 @@ function _M:init()
end

function _M:run()
local broker_id = ngx_worker_id()
local broker_id = get_worker_id()
if broker_id ~= self._opts.broker_id then
log(ERR, "broker got connection from worker on non-broker worker #", broker_id)
return exit(444)
Expand Down
14 changes: 9 additions & 5 deletions lualib/resty/events/init.lua
Original file line number Diff line number Diff line change
@@ -1,22 +1,26 @@
local utils = require "resty.events.utils"
local events_broker = require "resty.events.broker"
local events_worker = require "resty.events.worker"

local disable_listening = require "resty.events.disable_listening"

local ngx = ngx
local ngx_worker_id = ngx.worker.id

local get_worker_id = utils.get_worker_id


local type = type
local setmetatable = setmetatable
local str_sub = string.sub

local worker_count = ngx.worker.count()

local worker_count = utils.get_worker_count()


local _M = {
_VERSION = "0.3.0",
}
local _MT = { __index = _M, }


local function check_options(opts)
assert(type(opts) == "table", "Expected a table, got " .. type(opts))

Expand Down Expand Up @@ -107,7 +111,7 @@ end
function _M:init_worker()
local opts = self.opts

local worker_id = ngx_worker_id() or -1
local worker_id = get_worker_id()

local is_broker = opts.broker_id == worker_id or
opts.testing == true
Expand Down
23 changes: 8 additions & 15 deletions lualib/resty/events/protocol.lua
Original file line number Diff line number Diff line change
@@ -1,41 +1,38 @@
local frame = require "resty.events.frame"
local codec = require "resty.events.codec"
local utils = require "resty.events.utils"


local _recv_frame = frame.recv
local _send_frame = frame.send
local encode = codec.encode
local decode = codec.decode

local ngx = ngx
local worker_id = ngx.worker.id

local ngx = ngx -- luacheck: ignore
local worker_pid = ngx.worker.pid
local tcp = ngx.socket.tcp
local req_sock = ngx.req.socket
local ngx_header = ngx.header
local send_headers = ngx.send_headers
local flush = ngx.flush
local subsystem = ngx.config.subsystem
local get_worker_id = utils.get_worker_id


local type = type
local str_sub = string.sub
local str_find = string.find
local setmetatable = setmetatable


-- for high traffic pressure
local DEFAULT_TIMEOUT = 5000 -- 5000ms
local WORKER_INFO = {
id = 0,
pid = 0,
}

local function is_timeout(err)
return err and str_sub(err, -7) == "timeout"
end

local function is_closed(err)
return err and (str_sub(err, -6) == "closed" or
str_sub(err, -11) == "broken pipe")
end

local function recv_frame(self)
local sock = self.sock
Expand All @@ -56,8 +53,6 @@ local function send_frame(self, payload)
end

local _Server = {
is_closed = is_closed,
is_timeout = is_timeout,
recv_frame = recv_frame,
send_frame = send_frame,
}
Expand Down Expand Up @@ -109,8 +104,6 @@ function _Server.new()
end

local _Client = {
is_closed = is_closed,
is_timeout = is_timeout,
recv_frame = recv_frame,
send_frame = send_frame,
}
Expand Down Expand Up @@ -171,7 +164,7 @@ function _Client:connect(addr)
end
end -- subsystem == "http"

WORKER_INFO.id = worker_id() or -1
WORKER_INFO.id = get_worker_id()
WORKER_INFO.pid = worker_pid()

local _, err = _send_frame(sock, encode(WORKER_INFO))
Expand Down
43 changes: 43 additions & 0 deletions lualib/resty/events/utils.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
local str_sub = string.sub


local ngx = ngx -- luacheck: ignore
local ngx_worker_id = ngx.worker.id
local ngx_worker_count = ngx.worker.count


local function is_timeout(err)
return err and str_sub(err, -7) == "timeout"
end


local function is_closed(err)
return err and (str_sub(err, -6) == "closed" or
str_sub(err, -11) == "broken pipe")
end


local function get_worker_id()
return ngx_worker_id() or -1 -- -1 represents priviledged worker
end


local function get_worker_name(worker_id)
return worker_id == -1 and -- -1 represents priviledged worker
"privileged agent" or "worker #" .. worker_id
end


local function get_worker_count()
return ngx_worker_count()
end


return {
is_timeout = is_timeout,
is_closed = is_closed,

get_worker_id = get_worker_id,
get_worker_name = get_worker_name,
get_worker_count = get_worker_count,
}
24 changes: 14 additions & 10 deletions lualib/resty/events/worker.lua
Original file line number Diff line number Diff line change
Expand Up @@ -2,38 +2,48 @@ local cjson = require "cjson.safe"
local codec = require "resty.events.codec"
local queue = require "resty.events.queue"
local callback = require "resty.events.callback"
local utils = require "resty.events.utils"


local frame_validate = require("resty.events.frame").validate
local client = require("resty.events.protocol").client
local is_timeout = client.is_timeout
local is_timeout = utils.is_timeout
local get_worker_id = utils.get_worker_id
local get_worker_name = utils.get_worker_name


local type = type
local assert = assert
local setmetatable = setmetatable
local random = math.random


local ngx = ngx -- luacheck: ignore
local log = ngx.log
local sleep = ngx.sleep
local exiting = ngx.worker.exiting
local ngx_worker_id = ngx.worker.id
local ERR = ngx.ERR
local DEBUG = ngx.DEBUG
local NOTICE = ngx.NOTICE


local spawn = ngx.thread.spawn
local kill = ngx.thread.kill
local wait = ngx.thread.wait


local timer_at = ngx.timer.at


local encode = codec.encode
local decode = codec.decode
local cjson_encode = cjson.encode


local EVENTS_COUNT_LIMIT = 100
local EVENTS_SLEEP_TIME = 0.05


local EMPTY_T = {}

local EVENT_T = {
Expand All @@ -56,12 +66,6 @@ local _M = {}
local _MT = { __index = _M, }


local function get_worker_name(worker_id)
return worker_id == -1 and
"privileged agent" or "worker #" .. worker_id
end


-- gen a random number [0.01, 0.05]
-- it means that delay will be 10ms~50ms
local function random_delay()
Expand Down Expand Up @@ -340,7 +344,7 @@ end
function _M:init()
assert(self._opts)

self._worker_id = ngx_worker_id() or -1
self._worker_id = get_worker_id()

start_timers(self)

Expand All @@ -354,7 +358,7 @@ local function post_event(self, source, event, data, spec)
EVENT_T.source = source
EVENT_T.event = event
EVENT_T.data = data
EVENT_T.wid = self._worker_id or ngx_worker_id() or -1
EVENT_T.wid = self._worker_id or get_worker_id()

-- encode event info
str, err = encode(EVENT_T)
Expand Down

0 comments on commit 23160b6

Please sign in to comment.