Skip to content

Commit

Permalink
style(lib): style clean for logging worker info (#66)
Browse files Browse the repository at this point in the history
* style(lib/worker): style clean for log notice info

* local func get_worker_name()

* msg fix

* fix mistake

* get_json

* luacheck: ignore
  • Loading branch information
chronolaw authored Aug 22, 2024
1 parent 2dcd1d7 commit 1405e7a
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 67 deletions.
101 changes: 40 additions & 61 deletions lualib/resty/events/broker.lua
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
local cjson = require "cjson.safe"
local codec = require "resty.events.codec"
local lrucache = require "resty.lrucache"
local queue = require "resty.events.queue"
Expand All @@ -9,7 +8,7 @@ local is_closed = server.is_closed
local setmetatable = setmetatable
local random = math.random

local ngx = ngx
local ngx = ngx -- luacheck: ignore
local log = ngx.log
local exit = ngx.exit
local exiting = ngx.worker.exiting
Expand All @@ -25,11 +24,26 @@ local wait = ngx.thread.wait

local decode = codec.decode

local cjson_encode = cjson.encode

local MAX_UNIQUE_EVENTS = 1024
local WEAK_KEYS_MT = { __mode = "k", }


local get_json
do
local cjson_encode = require("cjson.safe").encode

get_json = function(data)
return cjson_encode(decode(data))
end
end


local function get_worker_name(worker_id)
return worker_id == -1 and
"privileged agent" or "worker #" .. worker_id
end


local function terminating(self, worker_connection)
return not self._clients[worker_connection] or exiting()
end
Expand All @@ -49,13 +63,10 @@ local function broadcast_events(self, unique, data)
local worker_queue = queues[worker_id]
local ok, err = worker_queue:push(data)
if not ok then
if worker_id == -1 then
log(ERR, "failed to publish unique event to privileged agent: ", err,
". data is :", cjson_encode(decode(data)))
else
log(ERR, "failed to publish unique event to worker #", worker_id,
": ", err, ". data is :", cjson_encode(decode(data)))
end
log(ERR, "failed to publish unique event to ",
get_worker_name(worker_id),
": ", err,
". data is :", get_json(data))

else
n = n + 1
Expand All @@ -66,13 +77,10 @@ local function broadcast_events(self, unique, data)
local worker_queue = queues[worker_id]
local ok, err = worker_queue:push(data)
if not ok then
if worker_id == -1 then
log(ERR, "failed to publish event to privileged agent: ", err,
". data is :", cjson_encode(decode(data)))
else
log(ERR, "failed to publish event to worker #", worker_id,
": ", err, ". data is :", cjson_encode(decode(data)))
end
log(ERR, "failed to publish event to ",
get_worker_name(worker_id),
": ", err,
". data is :", get_json(data))

else
n = n + 1
Expand All @@ -98,23 +106,16 @@ local function read_thread(self, worker_connection)

if not data then
if not exiting() then
if worker_id == -1 then
log(ERR, "did not receive event from privileged agent")
else
log(ERR, "did not receive event from worker #", worker_id)
end
log(ERR, "did not receive event from ", get_worker_name(worker_id))
end
goto continue
end

local event_data, err = decode(data)
if not event_data then
if not exiting() then
if worker_id == -1 then
log(ERR, "failed to decode event data on privileged agent: ", err)
else
log(ERR, "failed to decode event data on worker #", worker_id, ": ", err)
end
log(ERR, "failed to decode event data on ",
get_worker_name(worker_id), ": ", err)
end
goto continue
end
Expand All @@ -123,11 +124,8 @@ local function read_thread(self, worker_connection)
local unique = event_data.spec.unique
if unique then
if self._uniques:get(unique) then
if worker_id == -1 then
log(DEBUG, "unique event is duplicate on privileged agent: ", unique)
else
log(DEBUG, "unique event is duplicate on worker #", worker_id, ": ", unique)
end
log(DEBUG, "unique event is duplicate on ",
get_worker_name(worker_id), ": ", unique)
goto continue
end

Expand Down Expand Up @@ -159,13 +157,9 @@ local function write_thread(self, worker_connection, worker_queue)
if err then
local ok, push_err = worker_queue:push_front(payload)
if not ok then
if worker_id == -1 then
log(ERR, "failed to retain event for privileged agent: ",
push_err, ". data is :", cjson_encode(decode(payload)))
else
log(ERR, "failed to retain event for worker #", worker_id, ": ",
push_err, ". data is :", cjson_encode(decode(payload)))
end
log(ERR, "failed to retain event for ",
get_worker_name(worker_id), ": ", push_err,
". data is :", get_json(payload))
end
return nil, "failed to send event: " .. err
end
Expand Down Expand Up @@ -263,13 +257,8 @@ function _M:run()
local read_thread_co = spawn(read_thread, self, worker_connection)
local write_thread_co = spawn(write_thread, self, worker_connection, queues[worker_id])

if worker_id == -1 then
log(NOTICE, "privileged agent connected to events broker (worker pid: ",
worker_pid, ")")
else
log(NOTICE, "worker #", worker_id, " connected to events broker (worker pid: ",
worker_pid, ")")
end
log(NOTICE, get_worker_name(worker_id),
" connected to events broker (worker pid: ", worker_pid, ")")

local ok, err, perr = wait(read_thread_co, write_thread_co)

Expand All @@ -282,24 +271,14 @@ function _M:run()
end

if not ok and not is_closed(err) then
if worker_id == -1 then
log(ERR, "event broker failed on worker privileged agent: ", err,
" (worker pid: ", worker_pid, ")")
else
log(ERR, "event broker failed on worker #", worker_id, ": ", err,
" (worker pid: ", worker_pid, ")")
end
log(ERR, "event broker failed on ", get_worker_name(worker_id),
": ", err, " (worker pid: ", worker_pid, ")")
return exit(ngx.ERROR)
end

if perr and not is_closed(perr) then
if worker_id == -1 then
log(ERR, "event broker failed on worker privileged agent: ", perr,
" (worker pid: ", worker_pid, ")")
else
log(ERR, "event broker failed on worker #", worker_id, ": ", perr,
" (worker pid: ", worker_pid, ")")
end
log(ERR, "event broker failed on ", get_worker_name(worker_id),
": ", perr, " (worker pid: ", worker_pid, ")")
return exit(ngx.ERROR)
end

Expand Down
16 changes: 10 additions & 6 deletions lualib/resty/events/worker.lua
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ local assert = assert
local setmetatable = setmetatable
local random = math.random

local ngx = ngx
local ngx = ngx -- luacheck: ignore
local log = ngx.log
local sleep = ngx.sleep
local exiting = ngx.worker.exiting
Expand Down Expand Up @@ -55,6 +55,13 @@ local PAYLOAD_T = {
local _M = {}
local _MT = { __index = _M, }


local function get_worker_name(worker_id)
return worker_id == -1 and
"privileged agent" or "worker #" .. worker_id
end


-- gen a random number [0.01, 0.05]
-- it means that delay will be 10ms~50ms
local function random_delay()
Expand Down Expand Up @@ -280,11 +287,8 @@ function _M:communicate()
local read_thread_co = spawn(read_thread, self, broker_connection)
local write_thread_co = spawn(write_thread, self, broker_connection)

if self._worker_id == -1 then
log(NOTICE, "privileged agent is ready to accept events from ", listening)
else
log(NOTICE, "worker #", self._worker_id, " is ready to accept events from ", listening)
end
log(NOTICE, get_worker_name(self._worker_id),
" is ready to accept events from ", listening)

local ok, err, perr = wait(read_thread_co, write_thread_co)

Expand Down

0 comments on commit 1405e7a

Please sign in to comment.