4 changes: 4 additions & 0 deletions CubeProxy/lua/init_worker_phase.lua
@@ -3,6 +3,10 @@ local function monitor_cache_usage()
ngx.shared.local_cache:set("cache_free_space", cache_free_space)
end

-- Seed the random number generator for each worker to ensure
-- that cache TTL jitter (math.random) works correctly.
math.randomseed(ngx.now() * 1000 + ngx.worker.id())

local worker_id = ngx.worker.id()
-- Only worker 0 performs these timed tasks
-- Even if the worker PID changes, the worker ID stays the same
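
Without an explicit per-worker seed, forked nginx workers can start math.random from the same default state and emit identical sequences, which would make any TTL jitter line up across workers. A minimal sketch of how the seeded generator might be used for jittered cache TTLs follows; the jittered_ttl helper, the +/-10% spread, and the 30-second base TTL are illustrative assumptions, not part of this change.

-- Illustrative sketch only; jittered_ttl and its parameters are assumptions.
local function jittered_ttl(base_ttl)
    -- spread expirations by +/-10% so cached keys do not all expire at the same moment
    return base_ttl + (math.random() * 0.2 - 0.1) * base_ttl
end

ngx.shared.local_cache:set("some_key", "some_value", jittered_ttl(30))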
95 changes: 53 additions & 42 deletions CubeProxy/lua/rewrite_phase.lua
@@ -79,60 +79,71 @@ local function get_backend_address(ins_id, container_port)
local host_ip = cache:get(cache_backend_ip_key)
local host_port = cache:get(cache_backend_port_key)
if host_ip and host_port then
cache:set(cache_backend_ip_key, host_ip, timeout)
cache:set(cache_backend_port_key, host_port, timeout)
return host_ip, host_port
end

local metadata, err = load_sandbox_proxy_metadata(ins_id)
if err then
ngx.log(ngx.ERR, "LEVEL_ERROR||", err)
ngx.var.cube_retcode = "310500"
ngx.exit(500)
end
local fn = function()
-- 2nd check: the cache may have been populated while we waited for the lock
local h_ip = cache:get(cache_backend_ip_key)
local h_port = cache:get(cache_backend_port_key)
if h_ip and h_port then
return {ip = h_ip, port = h_port}, nil
end

local metadata_map = {}
for i = 1, #metadata, 2 do
local k = metadata[i]
local v = metadata[i + 1]
metadata_map[k] = v
cache:set(ins_id .. ":" .. k, v, timeout)
end
local metadata, err = load_sandbox_proxy_metadata(ins_id)
if err then
return nil, {code = "310500", level = ngx.ERR, msg = err}
end

local target_host_ip = metadata_map["HostIP"]
local target_sandbox_ip = metadata_map["SandboxIP"]
if utils:is_null(target_host_ip) then
ngx.log(ngx.ERR, "LEVEL_WARN||",
string.format("request %s using instance %s misses HostIP", ngx.var.http_x_cube_request_id, ins_id))
ngx.var.cube_retcode = "310507"
ngx.exit(500)
local metadata_map = {}
for i = 1, #metadata, 2 do
local k = metadata[i]
local v = metadata[i + 1]
metadata_map[k] = v
cache:set(ins_id .. ":" .. k, v, timeout)
end

local target_host_ip = metadata_map["HostIP"]
local target_sandbox_ip = metadata_map["SandboxIP"]
if utils:is_null(target_host_ip) then
return nil, {code = "310507", level = ngx.WARN, msg = string.format("request %s using instance %s misses HostIP", ngx.var.http_x_cube_request_id, ins_id)}
end

if not utils:is_null(caller_host_ip) and caller_host_ip == target_host_ip then
if utils:is_null(target_sandbox_ip) then
return nil, {code = "310507", level = ngx.ERR, msg = string.format("request %s instance %s on local host %s misses SandboxIP", ngx.var.http_x_cube_request_id, ins_id, caller_host_ip)}
end
h_ip = target_sandbox_ip
h_port = container_port
else
h_ip = target_host_ip
h_port = metadata_map[container_port]
if utils:is_null(h_port) then
return nil, {code = "310507", level = ngx.ERR, msg = string.format("request %s instance %s misses host port mapping for container_port %s", ngx.var.http_x_cube_request_id, ins_id, container_port)}
end
end

cache:set(cache_backend_ip_key, h_ip, timeout)
cache:set(cache_backend_port_key, h_port, timeout)
return {ip = h_ip, port = h_port}, nil
end

if not utils:is_null(caller_host_ip) and caller_host_ip == target_host_ip then
if utils:is_null(target_sandbox_ip) then
ngx.log(ngx.ERR, "LEVEL_ERROR||",
string.format("request %s instance %s on local host %s misses SandboxIP",
ngx.var.http_x_cube_request_id, ins_id, caller_host_ip))
ngx.var.cube_retcode = "310507"
local res, err, shared = utils:singleflight_do("local_cache_locks", "meta:" .. ins_id, fn)

if not res then
if type(err) == "table" and err.code then
local level_str = (err.level == ngx.WARN) and "LEVEL_WARN||" or "LEVEL_ERROR||"
ngx.log(err.level or ngx.ERR, level_str, err.msg)
ngx.var.cube_retcode = err.code
ngx.exit(500)
end
host_ip = target_sandbox_ip
host_port = container_port
else
host_ip = target_host_ip
host_port = metadata_map[container_port]
if utils:is_null(host_port) then
ngx.log(ngx.ERR, "LEVEL_ERROR||",
string.format("request %s instance %s misses host port mapping for container_port %s",
ngx.var.http_x_cube_request_id, ins_id, container_port))
ngx.var.cube_retcode = "310507"
else
ngx.log(ngx.ERR, "LEVEL_ERROR||", tostring(err))
ngx.var.cube_retcode = "310500"
ngx.exit(500)
end
end

cache:set(cache_backend_ip_key, host_ip, timeout)
cache:set(cache_backend_port_key, host_port, timeout)
return host_ip, host_port
return res.ip, res.port
end

-- resolve sandbox id and container port from Host: <port>-<sandbox_id>.<domain>
100 changes: 72 additions & 28 deletions CubeProxy/lua/utils.lua
@@ -43,49 +43,93 @@ function _M.is_null(self, str)
end

--[[
1 arg:
- backend_ip: the backend to check
- check_local: whether check the local cache first
Mimics golang.org/x/sync/singleflight Do() interface.
Since OpenResty workers are separate processes, we use resty.lock for cross-worker synchronization.
The fn() provided MUST implement a cache check (double-checked locking) as its first step.
Returns: val, err, shared (boolean indicating if we waited for the lock)
]]
function _M.singleflight_do(self, lock_dict_name, key, fn)
local resty_lock = require "resty.lock"
local lock, err = resty_lock:new(lock_dict_name, {
exptime = 10,
timeout = 5
})
if not lock then
ngx.log(ngx.ERR, "LEVEL_ERROR||", "failed to create singleflight lock: ", err)
return nil, err, false
end

local elapsed, err = lock:lock(key)
if not elapsed then
ngx.log(ngx.ERR, "LEVEL_ERROR||", "failed to acquire singleflight lock: ", err)
return nil, err, false
end

local shared = (elapsed > 0)
local ok, res, func_err = xpcall(fn, debug.traceback)

local unlock_ok, unlock_err = lock:unlock()
if not unlock_ok and unlock_err ~= "unlocked" then
ngx.log(ngx.ERR, "LEVEL_ERROR||", "singleflight unlock failed: ", unlock_err)
end

if not ok then
ngx.log(ngx.ERR, "LEVEL_ERROR||", "singleflight fn error: ", res)
return nil, res, shared
end

return res, func_err, shared
end
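
For reference, the calling pattern the docstring above requires looks roughly like the sketch below. The key name, TTL, and do_expensive_work helper are illustrative assumptions; get_backend_address in rewrite_phase.lua is the real call site in this PR. The cache check at the top of fn is what lets a caller that merely waited on the lock (shared == true) reuse the value written by the caller that held it.

-- Sketch of the expected calling pattern (not part of this PR).
-- "expensive_key", do_expensive_work() and the 60s TTL are assumptions; utils is
-- the module above, loaded the same way rewrite_phase.lua loads it.
local cache = ngx.shared.local_cache

local fn = function()
    -- mandatory 2nd check: a caller that waited on the lock reuses the cached result
    local cached = cache:get("expensive_key")
    if cached ~= nil then
        return cached, nil
    end
    local computed, err = do_expensive_work()  -- assumed slow operation (RPC, Redis, ...)
    if err then
        return nil, err
    end
    cache:set("expensive_key", computed, 60)
    return computed, nil
end

local val, err, shared = utils:singleflight_do("local_cache_locks", "expensive_key", fn)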

--[[
2 args:
- backend_ip: the backend IP to check
- check_remote: if true, query Redis on cache miss; if false, return false immediately
2 return values:
- true if redis mark the backend is faulty, false otherwise
- true if the backend is marked faulty, false otherwise
- error: any error that occurred while executing the function
--]]
function _M.is_faulty_backend(self, backend_ip, check_remote)
local cache = ngx.shared.faulty_backend

-- 1st check: fast path, no lock needed
local value = cache:get(backend_ip)
if not value or value ~= "true" then
return false, nil
if value ~= nil then
return value == "true", nil
end

-- network I/O is disabled in header_filter phase; on a cache miss, skip Redis
-- and treat the backend as healthy rather than guessing
if check_remote == false then
return false, nil
end

local redis = require "redis_iresty"
local red = redis:new({
redis_ip = ngx.var.redis_ip,
redis_port = ngx.var.redis_port,
redis_pd = ngx.var.redis_pd,
redis_index = ngx.var.redis_index
})
local key = "faulty_backend_set"
local err
value, err = red:smembers(key)
if err then
return false, err
end

if not value then
return false, nil
end

for _, v in ipairs(value) do
if v == backend_ip then
return true, nil
local fn = function()
-- 2nd check: another concurrent request may have populated the cache while we waited
local val = cache:get(backend_ip)
if val ~= nil then
return val == "true", nil
end

-- cache is still empty — we are the single flight, query Redis
local redis = require "redis_iresty"
local red = redis:new({
redis_ip = ngx.var.redis_ip,
redis_port = ngx.var.redis_port,
redis_pd = ngx.var.redis_pd,
redis_index = ngx.var.redis_index
})
local res, err = red:sismember("faulty_backend_set", backend_ip)
if err then
return false, err
end

local is_faulty = (res == 1)
cache:set(backend_ip, is_faulty and "true" or "false", 5)
return is_faulty, nil
end

return false, nil
local res, err, shared = self:singleflight_do("faulty_backend_locks", "faulty:" .. backend_ip, fn)
return res, err
end

return _M
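
With both outcomes now cached as the strings "true"/"false" for 5 seconds, the fast path answers repeat checks without touching Redis, and the Redis lookup itself is collapsed to one in-flight query per backend via singleflight_do. Hypothetical call sites (backend_ip and the surrounding phases are assumptions) might look like:

-- rewrite/access phase: cosocket I/O is available, so a cache miss falls through to Redis
local faulty, err = utils:is_faulty_backend(backend_ip, true)
if err then
    ngx.log(ngx.WARN, "LEVEL_WARN||", "faulty backend check failed: ", tostring(err))
end

-- header_filter phase: cosocket I/O is disabled there, so only the local cache is consulted
local faulty_cached = utils:is_faulty_backend(backend_ip, false)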
2 changes: 2 additions & 0 deletions CubeProxy/nginx.conf
@@ -83,7 +83,9 @@ http {
}

lua_shared_dict local_cache 500m;
lua_shared_dict local_cache_locks 5m;
lua_shared_dict faulty_backend 100m;
lua_shared_dict faulty_backend_locks 1m;
init_worker_by_lua_file lua/init_worker_phase.lua;

server {