diff --git a/CubeProxy/lua/init_worker_phase.lua b/CubeProxy/lua/init_worker_phase.lua
index 83392f6e..74fa581f 100644
--- a/CubeProxy/lua/init_worker_phase.lua
+++ b/CubeProxy/lua/init_worker_phase.lua
@@ -3,6 +3,10 @@ local function monitor_cache_usage()
     ngx.shared.local_cache:set("cache_free_space", cache_free_space)
 end
 
+-- Seed the random number generator for each worker to ensure
+-- that cache TTL jitter (math.random) works correctly.
+math.randomseed(ngx.now() * 1000 + ngx.worker.id())
+
 local worker_id = ngx.worker.id()
 -- Only worker 0 performs these timed tasks
 -- Even if worker PID is changed, worker ID still keep same
diff --git a/CubeProxy/lua/rewrite_phase.lua b/CubeProxy/lua/rewrite_phase.lua
index 15c26d14..75e25038 100644
--- a/CubeProxy/lua/rewrite_phase.lua
+++ b/CubeProxy/lua/rewrite_phase.lua
@@ -79,60 +79,71 @@ local function get_backend_address(ins_id, container_port)
     local host_ip = cache:get(cache_backend_ip_key)
     local host_port = cache:get(cache_backend_port_key)
     if host_ip and host_port then
-        cache:set(cache_backend_ip_key, host_ip, timeout)
-        cache:set(cache_backend_port_key, host_port, timeout)
         return host_ip, host_port
     end
 
-    local metadata, err = load_sandbox_proxy_metadata(ins_id)
-    if err then
-        ngx.log(ngx.ERR, "LEVEL_ERROR||", err)
-        ngx.var.cube_retcode = "310500"
-        ngx.exit(500)
-    end
+    local fn = function()
+        -- 2nd check
+        local h_ip = cache:get(cache_backend_ip_key)
+        local h_port = cache:get(cache_backend_port_key)
+        if h_ip and h_port then
+            return {ip = h_ip, port = h_port}, nil
+        end
 
-    local metadata_map = {}
-    for i = 1, #metadata, 2 do
-        local k = metadata[i]
-        local v = metadata[i + 1]
-        metadata_map[k] = v
-        cache:set(ins_id .. ":" .. k, v, timeout)
-    end
+        local metadata, err = load_sandbox_proxy_metadata(ins_id)
+        if err then
+            return nil, {code = "310500", level = ngx.ERR, msg = err}
+        end
 
-    local target_host_ip = metadata_map["HostIP"]
-    local target_sandbox_ip = metadata_map["SandboxIP"]
-    if utils:is_null(target_host_ip) then
-        ngx.log(ngx.ERR, "LEVEL_WARN||",
-            string.format("request %s using instance %s misses HostIP", ngx.var.http_x_cube_request_id, ins_id))
-        ngx.var.cube_retcode = "310507"
-        ngx.exit(500)
+        local metadata_map = {}
+        for i = 1, #metadata, 2 do
+            local k = metadata[i]
+            local v = metadata[i + 1]
+            metadata_map[k] = v
+            cache:set(ins_id .. ":" .. k, v, timeout)
+        end
+
+        local target_host_ip = metadata_map["HostIP"]
+        local target_sandbox_ip = metadata_map["SandboxIP"]
+        if utils:is_null(target_host_ip) then
+            return nil, {code = "310507", level = ngx.WARN, msg = string.format("request %s using instance %s misses HostIP", ngx.var.http_x_cube_request_id, ins_id)}
+        end
+
+        if not utils:is_null(caller_host_ip) and caller_host_ip == target_host_ip then
+            if utils:is_null(target_sandbox_ip) then
+                return nil, {code = "310507", level = ngx.ERR, msg = string.format("request %s instance %s on local host %s misses SandboxIP", ngx.var.http_x_cube_request_id, ins_id, caller_host_ip)}
+            end
+            h_ip = target_sandbox_ip
+            h_port = container_port
+        else
+            h_ip = target_host_ip
+            h_port = metadata_map[container_port]
+            if utils:is_null(h_port) then
+                return nil, {code = "310507", level = ngx.ERR, msg = string.format("request %s instance %s misses host port mapping for container_port %s", ngx.var.http_x_cube_request_id, ins_id, container_port)}
+            end
+        end
+
+        cache:set(cache_backend_ip_key, h_ip, timeout)
+        cache:set(cache_backend_port_key, h_port, timeout)
+        return {ip = h_ip, port = h_port}, nil
+    end
 
-    if not utils:is_null(caller_host_ip) and caller_host_ip == target_host_ip then
-        if utils:is_null(target_sandbox_ip) then
-            ngx.log(ngx.ERR, "LEVEL_ERROR||",
-                string.format("request %s instance %s on local host %s misses SandboxIP",
-                    ngx.var.http_x_cube_request_id, ins_id, caller_host_ip))
-            ngx.var.cube_retcode = "310507"
+    local res, err, shared = utils:singleflight_do("local_cache_locks", "meta:" .. ins_id, fn)
+
+    if not res then
+        if type(err) == "table" and err.code then
+            local level_str = (err.level == ngx.WARN) and "LEVEL_WARN||" or "LEVEL_ERROR||"
+            ngx.log(err.level or ngx.ERR, level_str, err.msg)
+            ngx.var.cube_retcode = err.code
             ngx.exit(500)
-        end
-        host_ip = target_sandbox_ip
-        host_port = container_port
-    else
-        host_ip = target_host_ip
-        host_port = metadata_map[container_port]
-        if utils:is_null(host_port) then
-            ngx.log(ngx.ERR, "LEVEL_ERROR||",
-                string.format("request %s instance %s misses host port mapping for container_port %s",
-                    ngx.var.http_x_cube_request_id, ins_id, container_port))
-            ngx.var.cube_retcode = "310507"
+        else
+            ngx.log(ngx.ERR, "LEVEL_ERROR||", tostring(err))
+            ngx.var.cube_retcode = "310500"
             ngx.exit(500)
         end
     end
 
-    cache:set(cache_backend_ip_key, host_ip, timeout)
-    cache:set(cache_backend_port_key, host_port, timeout)
-    return host_ip, host_port
+    return res.ip, res.port
 end
 
 -- resolve sandbox id and container port from Host: <ins_id>-<container_port>.<domain>
diff --git a/CubeProxy/lua/utils.lua b/CubeProxy/lua/utils.lua
index d862b213..76a30f77 100644
--- a/CubeProxy/lua/utils.lua
+++ b/CubeProxy/lua/utils.lua
@@ -43,49 +43,93 @@ function _M.is_null(self, str)
 end
 
 --[[
-    1 arg:
-    - backend_ip: the backend to check
-    - check_local: whether check the local cache first
+    Mimics golang.org/x/sync/singleflight Do() interface.
+    Since OpenResty workers are separate processes, we use resty.lock for cross-worker synchronization.
+    The fn() provided MUST implement a cache check (double-checked locking) as its first step.
+    Returns: val, err, shared (boolean indicating if we waited for the lock)
+]]
+function _M.singleflight_do(self, lock_dict_name, key, fn)
+    local resty_lock = require "resty.lock"
+    local lock, err = resty_lock:new(lock_dict_name, {
+        exptime = 10,
+        timeout = 5
+    })
+    if not lock then
+        ngx.log(ngx.ERR, "LEVEL_ERROR||", "failed to create singleflight lock: ", err)
+        return nil, err, false
+    end
+
+    local elapsed, err = lock:lock(key)
+    if not elapsed then
+        ngx.log(ngx.ERR, "LEVEL_ERROR||", "failed to acquire singleflight lock: ", err)
+        return nil, err, false
+    end
+
+    local shared = (elapsed > 0)
+    local ok, res, func_err = xpcall(fn, debug.traceback)
+
+    local unlock_ok, unlock_err = lock:unlock()
+    if not unlock_ok and unlock_err ~= "unlocked" then
+        ngx.log(ngx.ERR, "LEVEL_ERROR||", "singleflight unlock failed: ", unlock_err)
+    end
+
+    if not ok then
+        ngx.log(ngx.ERR, "LEVEL_ERROR||", "singleflight fn error: ", res)
+        return nil, res, shared
+    end
+
+    return res, func_err, shared
+end
+
+--[[
+    2 args:
+    - backend_ip: the backend IP to check
+    - check_remote: if true, query Redis on cache miss; if false, return false immediately
     2 return values:
-    - true if redis mark the backend is faulty, false otherwise
+    - true if the backend is marked faulty, false otherwise
     - error: any error that occurred during executing the function
 --]]
 function _M.is_faulty_backend(self, backend_ip, check_remote)
     local cache = ngx.shared.faulty_backend
+
+    -- 1st check: fast path, no lock needed
     local value = cache:get(backend_ip)
-    if not value or value ~= "true" then
-        return false, nil
+    if value ~= nil then
+        return value == "true", nil
     end
 
+    -- network I/O is disabled in header_filter phase, return directly
     if check_remote == false then
-        return true, nil
-    end
-
-    local redis = require "redis_iresty"
-    local red = redis:new({
-        redis_ip = ngx.var.redis_ip,
-        redis_port = ngx.var.redis_port,
-        redis_pd = ngx.var.redis_pd,
-        redis_index = ngx.var.redis_index
-    })
-    local key = "faulty_backend_set"
-    local err
-    value, err = red:smembers(key)
-    if err then
-        return false, err
-    end
-
-    if not value then
         return false, nil
     end
 
-    for _, v in ipairs(value) do
-        if v == backend_ip then
-            return true, nil
+    local fn = function()
+        -- 2nd check: another concurrent request may have populated the cache while we waited
+        local val = cache:get(backend_ip)
+        if val ~= nil then
+            return val == "true", nil
         end
+
+        -- cache is still empty: we are the single flight, query Redis
+        local redis = require "redis_iresty"
+        local red = redis:new({
+            redis_ip = ngx.var.redis_ip,
+            redis_port = ngx.var.redis_port,
+            redis_pd = ngx.var.redis_pd,
+            redis_index = ngx.var.redis_index
+        })
+        local res, err = red:sismember("faulty_backend_set", backend_ip)
+        if err then
+            return false, err
+        end
+
+        local is_faulty = (res == 1)
+        cache:set(backend_ip, is_faulty and "true" or "false", 5)
+        return is_faulty, nil
     end
 
-    return false, nil
+    local res, err, shared = self:singleflight_do("faulty_backend_locks", "faulty:" .. backend_ip, fn)
+    return res, err
 end
 
 return _M
diff --git a/CubeProxy/nginx.conf b/CubeProxy/nginx.conf
index e0fa4353..b2068fad 100644
--- a/CubeProxy/nginx.conf
+++ b/CubeProxy/nginx.conf
@@ -83,7 +83,9 @@ http {
     }
 
     lua_shared_dict local_cache 500m;
+    lua_shared_dict local_cache_locks 5m;
     lua_shared_dict faulty_backend 100m;
+    lua_shared_dict faulty_backend_locks 1m;
 
     init_worker_by_lua_file lua/init_worker_phase.lua;
     server {
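
Note on the init_worker_phase.lua hunk: nginx workers are forked from one master process, so without an explicit per-worker math.randomseed they can draw identical math.random sequences, and "jittered" cache TTLs then expire in lockstep across workers. A minimal sketch of the jitter pattern this seed supports; the key, base TTL, and jitter range below are illustrative only, not taken from this diff:

    -- hypothetical caller in a request phase; assumes the existing
    -- `lua_shared_dict local_cache 500m;` and the per-worker seed above
    local base_ttl = 60
    local jitter = math.random(0, 10)  -- per-worker spread in seconds
    -- staggered expiry keeps workers from refreshing the same entry at once
    ngx.shared.local_cache:set("some:key", "some-value", base_ttl + jitter)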
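
For reviewers, a minimal caller sketch of the new utils.singleflight_do, following the double-checked-locking contract its doc comment requires (fn must re-check the cache as its first step). The demo_cache/demo_locks dict names, the key, and the loader body are hypothetical; the dicts would need matching lua_shared_dict entries in nginx.conf:

    local utils = require "utils"
    local cache = ngx.shared.demo_cache

    local function get_config(key)
        -- 1st check: lock-free fast path
        local val = cache:get(key)
        if val then
            return val, nil
        end

        local fn = function()
            -- 2nd check: another request may have filled the cache while
            -- this one waited on the resty.lock inside singleflight_do
            local v = cache:get(key)
            if v then
                return v, nil
            end
            -- the expensive load (Redis, upstream HTTP, ...) goes here
            v = "loaded-value"
            cache:set(key, v, 30)
            return v, nil
        end

        -- shared == true means this request waited while another did the load
        local res, err, shared = utils:singleflight_do("demo_locks", "cfg:" .. key, fn)
        return res, err
    end

Design note: resty.lock only serializes callers, so every waiter still runs fn once it acquires the lock; the mandatory second check inside fn is what turns that serialization into single-flight behavior.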