Skip to content

Commit

Permalink
Initial support for differing persistent storage engines
Browse files Browse the repository at this point in the history
Refactor lib.storage to implement a polymorphic interface for
collection initialization and persistence. Initial backends
include ngx.shared and memcached, which both implement the same
pattern with respect to collection serialization and expiry handling.
  • Loading branch information
p0pr0ck5 committed Jun 27, 2016
1 parent 0fd044f commit ab779eb
Show file tree
Hide file tree
Showing 14 changed files with 1,311 additions and 47 deletions.
2 changes: 2 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ env:
- V_OPENRESTY=1.9.15.1 DATE=20160617 TEST=unit
- V_OPENRESTY=1.9.15.1 DATE=20160617 TEST=acceptance
- V_OPENRESTY=1.9.15.1 DATE=20160617 TEST=regression
services:
- memcached
install:
- cpanm -v --notest Test::Nginx
before_script:
Expand Down
3 changes: 3 additions & 0 deletions lib/opts.lua
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ _M.defaults = {
_res_body_mime_types = { ["text/plain"] = true, ["text/html"] = true },
_res_tid_header = false,
_score_threshold = 5,
_storage_backend = 'dict',
_storage_memcached_host = '127.0.0.1',
_storage_memcached_port = 11211,
_storage_zone = nil,
}

Expand Down
64 changes: 26 additions & 38 deletions lib/storage.lua
Original file line number Diff line number Diff line change
Expand Up @@ -6,41 +6,19 @@ local cjson = require("cjson")
local logger = require("lib.log")
local util = require("lib.util")

local _valid_backends = { dict = true, memcached = true }

function _M.initialize(waf, storage, col)
if (not waf._storage_zone) then
return
local backend = waf._storage_backend
if (not util.table_has_key(backend, _valid_backends)) then
logger.fatal_fail(backend .. " is not a valid persistent storage backend")
end

local altered, serialized, shm
shm = ngx.shared[waf._storage_zone]
serialized = shm:get(col)
altered = false
local backend_m = require("lib.storage." .. backend)

if (not serialized) then
logger.log(waf, "Initializing an empty collection for " .. col)
storage[col] = {}
else
local data = cjson.decode(serialized)

-- because we're serializing out the contents of the collection
-- we need to roll our own expire handling. lua_shared_dict's
-- internal expiry can't act on individual collection elements
for key in pairs(data) do
if (not key:find("__", 1, true) and data["__expire_" .. key]) then
logger.log(waf, "checking " .. key)
if (data["__expire_" .. key] < ngx.time()) then
logger.log(waf, "Removing expired key: " .. key)
data["__expire_" .. key] = nil
data[key] = nil
altered = true
end
end
end
logger.log(waf, "Initializing storage type " .. backend)

storage[col] = data
end

storage[col]["__altered"] = altered
backend_m.initialize(waf, storage, col)
end

function _M.set_var(waf, ctx, element, value)
Expand Down Expand Up @@ -85,28 +63,38 @@ function _M.delete_var(waf, ctx, element)
if (storage[col][key]) then
storage[col][key] = nil
storage[col]["__altered"] = true

-- redis cant expire specific keys in a hash so we track them for hdel when persisting
if (waf._storage_backend == 'redis') then
waf._storage_redis_delkey_n = waf._storage_redis_delkey_n + 1
waf._storage_redis_delkey[waf._storage_redis_delkey_n] = key
end
else
logger.log(waf, key .. " was not found in " .. col)
end
end

function _M.persist(waf, storage)
if (not waf._storage_zone) then
return
local backend = waf._storage_backend
if (not util.table_has_key(backend, _valid_backends)) then
logger.fatal_fail(backend .. " is not a valid persistent storage backend")
end

local backend_m = require("lib.storage." .. backend)

if (not util.table_has_key(backend, _valid_backends)) then
logger.fatal_fail(backend .. " is not a valid persistent storage backend")
end

local shm = ngx.shared[waf._storage_zone]
logger.log(waf, 'Persisting storage type ' .. backend)

for col in pairs(storage) do
if (col ~= 'TX') then
logger.log(waf, 'Examining ' .. col)

if (storage[col]["__altered"]) then
local serialized = cjson.encode(storage[col])

logger.log(waf, 'Persisting value: ' .. tostring(serialized))

shm:set(col, serialized)
storage[col]["__altered"] = nil -- dont need to persist this flag
backend_m.persist(waf, col, storage[col])
else
logger.log(waf, "Not persisting a collection that wasn't altered")
end
Expand Down
57 changes: 57 additions & 0 deletions lib/storage/dict.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
local _M = {}

_M.version = "0.7.2"

local cjson = require("cjson")
local logger = require("lib.log")

function _M.initialize(waf, storage, col)
if (not waf._storage_zone) then
return
end

local altered, serialized, shm
shm = ngx.shared[waf._storage_zone]
serialized = shm:get(col)
altered = false

if (not serialized) then
logger.log(waf, "Initializing an empty collection for " .. col)
storage[col] = {}
else
local data = cjson.decode(serialized)

-- because we're serializing out the contents of the collection
-- we need to roll our own expire handling. lua_shared_dict's
-- internal expiry can't act on individual collection elements
for key in pairs(data) do
if (not key:find("__", 1, true) and data["__expire_" .. key]) then
logger.log(waf, "checking " .. key)
if (data["__expire_" .. key] < ngx.time()) then
logger.log(waf, "Removing expired key: " .. key)
data["__expire_" .. key] = nil
data[key] = nil
altered = true
end
end
end

storage[col] = data
end

storage[col]["__altered"] = altered
end

function _M.persist(waf, col, data)
if (not waf._storage_zone) then
return
end

local shm = ngx.shared[waf._storage_zone]
local serialized = cjson.encode(data)
logger.log(waf, 'Persisting value: ' .. tostring(serialized))
shm:set(col, serialized)
end


return _M
87 changes: 87 additions & 0 deletions lib/storage/memcached.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
local _M = {}

_M.version = "0.7.2"

local cjson = require("cjson")
local logger = require("lib.log")
local memcached_m = require("resty.memcached")

function _M.initialize(waf, storage, col)
local memcached = memcached_m:new()
local host = waf._storage_memcached_host
local port = waf._storage_memcached_port

local ok, err = memcached:connect(host, port)
if (not ok) then
logger.log(waf, "Error in connecting to memcached: " .. err)
storage[col] = {}
return
end

local serialized, flags, err = memcached:get(col)
if (err) then
logger.log(waf, "Error retrieving " .. col .. ": " .. err)
storage[col] = {}
return
end

local ok, err = memcached:set_keepalive(10000, 100)
if (not ok) then
logger.log(waf, "Error setting memcached keepalive: " .. err)
end

local altered = false

if (not serialized) then
logger.log(waf, "Initializing an empty collection for " .. col)
storage[col] = {}
else
local data = cjson.decode(serialized)

-- because we're serializing out the contents of the collection
-- we need to roll our own expire handling
for key in pairs(data) do
if (not key:find("__", 1, true) and data["__expire_" .. key]) then
logger.log(waf, "checking " .. key)
if (data["__expire_" .. key] < ngx.time()) then
logger.log(waf, "Removing expired key: " .. key)
data["__expire_" .. key] = nil
data[key] = nil
altered = true
end
end
end

storage[col] = data
end

storage[col]["__altered"] = altered
end

function _M.persist(waf, col, data)
local serialized = cjson.encode(data)
logger.log(waf, 'Persisting value: ' .. tostring(serialized))

local memcached = memcached_m:new()
local host = waf._storage_memcached_host
local port = waf._storage_memcached_port

local ok, err = memcached:connect(host, port)
if (not ok) then
logger.log(waf, "Error in connecting to memcached: " .. err)
return
end

local ok, err = memcached:set(col, serialized)
if (not ok) then
logger.log(waf, "Error persisting storage data: " .. err)
end

local ok, err = memcached:set_keepalive(10000, 100)
if (not ok) then
logger.log(waf, "Error setting memcached keepalive: " .. err)
end
end


return _M
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use Test::Nginx::Socket::Lua;

repeat_each(3);
plan tests => repeat_each() * 5 * blocks() - 6;
plan tests => repeat_each() * 5 * blocks() - 3;

no_shuffle();
run_tests();
Expand All @@ -14,6 +14,7 @@ __DATA__
init_by_lua '
local lua_resty_waf = require "waf"
lua_resty_waf.default_option("storage_zone", "store")
lua_resty_waf.default_option("storage_backend", "dict")
lua_resty_waf.default_option("debug", true)
';
--- config
Expand All @@ -34,6 +35,7 @@ __DATA__
GET /t
--- error_code: 200
--- error_log
Initializing storage type dict
Initializing an empty collection for FOO
--- no_error_log
[error]
Expand All @@ -44,6 +46,7 @@ Initializing an empty collection for FOO
init_by_lua '
local lua_resty_waf = require "waf"
lua_resty_waf.default_option("storage_zone", "store")
lua_resty_waf.default_option("storage_backend", "dict")
lua_resty_waf.default_option("debug", true)
';
--- config
Expand Down Expand Up @@ -91,6 +94,7 @@ Removing expired key:
init_by_lua '
local lua_resty_waf = require "waf"
lua_resty_waf.default_option("storage_zone", "store")
lua_resty_waf.default_option("storage_backend", "dict")
lua_resty_waf.default_option("debug", true)
';
--- config
Expand Down Expand Up @@ -139,6 +143,7 @@ Initializing an empty collection for FOO
init_by_lua '
local lua_resty_waf = require "waf"
lua_resty_waf.default_option("storage_zone", "store")
lua_resty_waf.default_option("storage_backend", "dict")
lua_resty_waf.default_option("debug", true)
';
--- config
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use Test::Nginx::Socket::Lua;

repeat_each(3);
plan tests => repeat_each() * 5 * blocks();
plan tests => repeat_each() * 5 * blocks() + 3;

no_shuffle();
run_tests();
Expand All @@ -14,6 +14,7 @@ __DATA__
init_by_lua '
local lua_resty_waf = require "waf"
lua_resty_waf.default_option("storage_zone", "store")
lua_resty_waf.default_option("storage_backend", "dict")
lua_resty_waf.default_option("debug", true)
';
--- config
Expand Down Expand Up @@ -42,8 +43,9 @@ __DATA__
GET /t
--- error_code: 200
--- error_log
Persisting storage type dict
Examining FOO
Persisting value: {"
Persisting value: {"COUNT":1}
--- no_error_log
[error]
Not persisting a collection that wasn't altered
Expand All @@ -54,6 +56,7 @@ Not persisting a collection that wasn't altered
init_by_lua '
local lua_resty_waf = require "waf"
lua_resty_waf.default_option("storage_zone", "store")
lua_resty_waf.default_option("storage_backend", "dict")
lua_resty_waf.default_option("debug", true)
';
--- config
Expand Down Expand Up @@ -91,6 +94,7 @@ Persisting value: {"
init_by_lua '
local lua_resty_waf = require "waf"
lua_resty_waf.default_option("storage_zone", "store")
lua_resty_waf.default_option("storage_backend", "dict")
lua_resty_waf.default_option("debug", true)
';
--- config
Expand All @@ -100,7 +104,7 @@ Persisting value: {"
local waf = lua_resty_waf:new()
local ctx = { storage = {}, col_lookup = { FOO = "FOO" } }
local var = require("cjson").encode({ COUNT = 5, __expire_COUNT = ngx.time() - 10 })
local var = require("cjson").encode({ COUNT = 5, __expire_COUNT = ngx.time() - 10, BAR = 1 })
local shm = ngx.shared[waf._storage_zone]
shm:set("FOO", var)
Expand All @@ -117,7 +121,7 @@ GET /t
--- error_code: 200
--- error_log
Examining FOO
Persisting value: {"
Persisting value: {"BAR":1}
--- no_error_log
[error]
Not persisting a collection that wasn't altered
Expand All @@ -136,13 +140,10 @@ Not persisting a collection that wasn't altered
local lua_resty_waf = require "waf"
local waf = lua_resty_waf:new()
local ctx = { storage = {}, col_lookup = { TX = "TX" } }
local ctx = { storage = { TX = {} }, col_lookup = { TX = "TX" } }
local var = require("cjson").encode({ COUNT = 5 })
local shm = ngx.shared[waf._storage_zone]
shm:set("TX", var)
local storage = require "lib.storage"
storage.initialize(waf, ctx.storage, "TX")
local element = { col = "TX", key = "COUNT", value = 1 }
storage.set_var(waf, ctx, element, element.value)
Expand Down
Loading

0 comments on commit ab779eb

Please sign in to comment.