diff --git a/kong/clustering/rpc/concentrator.lua b/kong/clustering/rpc/concentrator.lua index 68bb0bc3388..80d19cad769 100644 --- a/kong/clustering/rpc/concentrator.lua +++ b/kong/clustering/rpc/concentrator.lua @@ -154,7 +154,14 @@ function _M:_event_loop(lconn) "unknown requester for RPC") local res, err = self.manager:_local_call(target_id, payload.method, - payload.params) + payload.params, not payload.id) + + -- notification has no callback or id + if not payload.id then + ngx_log(ngx_DEBUG, "[rpc] notification has no response") + goto continue + end + if res then -- call success res, err = self:_enqueue_rpc_response(reply_to, { @@ -180,6 +187,8 @@ function _M:_event_loop(lconn) ngx_log(ngx_WARN, "[rpc] unable to enqueue RPC error: ", err) end end + + ::continue:: end end end @@ -287,9 +296,13 @@ end -- This way the manager code wouldn't tell the difference -- between calls made over WebSocket or concentrator function _M:call(node_id, method, params, callback) - local id = self:_get_next_id() + local id - self.interest[id] = callback + -- notification has no callback or id + if callback then + id = self:_get_next_id() + self.interest[id] = callback + end return self:_enqueue_rpc_request(node_id, { jsonrpc = jsonrpc.VERSION, diff --git a/kong/clustering/rpc/future.lua b/kong/clustering/rpc/future.lua index 68ed82720f0..ee91ed9e54f 100644 --- a/kong/clustering/rpc/future.lua +++ b/kong/clustering/rpc/future.lua @@ -12,25 +12,36 @@ local STATE_SUCCEED = 3 local STATE_ERRORED = 4 -function _M.new(node_id, socket, method, params) +function _M.new(node_id, socket, method, params, is_notification) local self = { method = method, params = params, - sema = semaphore.new(), socket = socket, node_id = node_id, - id = nil, - result = nil, - error = nil, - state = STATE_NEW, -- STATE_* + is_notification = is_notification, } + if not is_notification then + self.id = nil + self.result = nil + self.error = nil + self.state = STATE_NEW -- STATE_* + self.sema = semaphore.new() + end + return setmetatable(self, _MT) end -- start executing the future function _M:start() + -- notification has no callback + if self.is_notification then + return self.socket:call(self.node_id, + self.method, + self.params) + end + assert(self.state == STATE_NEW) self.state = STATE_IN_PROGRESS @@ -60,6 +71,10 @@ end function _M:wait(timeout) + if self.is_notification then + return nil, "the notification cannot be waited" + end + assert(self.state == STATE_IN_PROGRESS) local res, err = self.sema:wait(timeout) diff --git a/kong/clustering/rpc/manager.lua b/kong/clustering/rpc/manager.lua index 3d08963b468..ea5c4f5a282 100644 --- a/kong/clustering/rpc/manager.lua +++ b/kong/clustering/rpc/manager.lua @@ -33,6 +33,7 @@ local CLUSTERING_PING_INTERVAL = constants.CLUSTERING_PING_INTERVAL local parse_proxy_url = require("kong.clustering.utils").parse_proxy_url +local _log_prefix = "[rpc] " local RPC_MATA_V1 = "kong.meta.v1" local RPC_SNAPPY_FRAMED = "x-snappy-framed" @@ -276,7 +277,7 @@ end -- low level helper used internally by :call() and concentrator -- this one does not consider forwarding using concentrator -- when node does not exist -function _M:_local_call(node_id, method, params) +function _M:_local_call(node_id, method, params, is_notification) if not self.client_capabilities[node_id] then return nil, "node is not connected, node_id: " .. node_id end @@ -289,9 +290,14 @@ function _M:_local_call(node_id, method, params) local s = next(self.clients[node_id]) -- TODO: better LB? - local fut = future.new(node_id, s, method, params) + local fut = future.new(node_id, s, method, params, is_notification) assert(fut:start()) + -- notification need not to wait + if is_notification then + return true + end + local ok, err = fut:wait(5) if err then return nil, err @@ -305,9 +311,7 @@ function _M:_local_call(node_id, method, params) end --- public interface, try call on node_id locally first, --- if node is not connected, try concentrator next -function _M:call(node_id, method, ...) +function _M:_call_or_notify(is_notification, node_id, method, ...) local cap = utils.parse_method_name(method) local res, err = self:_find_node_and_check_capability(node_id, cap) @@ -318,20 +322,22 @@ function _M:call(node_id, method, ...) local params = {...} ngx_log(ngx_DEBUG, - "[rpc] calling ", method, + _log_prefix, + is_notification and "notifying " or "calling ", + method, "(node_id: ", node_id, ")", " via ", res == "local" and "local" or "concentrator" ) if res == "local" then - res, err = self:_local_call(node_id, method, params) + res, err = self:_local_call(node_id, method, params, is_notification) if not res then - ngx_log(ngx_DEBUG, "[rpc] ", method, " failed, err: ", err) + ngx_log(ngx_DEBUG, _log_prefix, method, " failed, err: ", err) return nil, err end - ngx_log(ngx_DEBUG, "[rpc] ", method, " succeeded") + ngx_log(ngx_DEBUG, _log_prefix, method, " succeeded") return res end @@ -339,29 +345,45 @@ function _M:call(node_id, method, ...) assert(res == "concentrator") -- try concentrator - local fut = future.new(node_id, self.concentrator, method, params) + local fut = future.new(node_id, self.concentrator, method, params, is_notification) assert(fut:start()) + if is_notification then + return true + end + local ok, err = fut:wait(5) if err then - ngx_log(ngx_DEBUG, "[rpc] ", method, " failed, err: ", err) + ngx_log(ngx_DEBUG, _log_prefix, method, " failed, err: ", err) return nil, err end if ok then - ngx_log(ngx_DEBUG, "[rpc] ", method, " succeeded") + ngx_log(ngx_DEBUG, _log_prefix, method, " succeeded") return fut.result end - ngx_log(ngx_DEBUG, "[rpc] ", method, " failed, err: ", fut.error.message) + ngx_log(ngx_DEBUG, _log_prefix, method, " failed, err: ", fut.error.message) return nil, fut.error.message end +-- public interface, try call on node_id locally first, +-- if node is not connected, try concentrator next +function _M:call(node_id, method, ...) + return self:_call_or_notify(false, node_id, method, ...) +end + + +function _M:notify(node_id, method, ...) + return self:_call_or_notify(true, node_id, method, ...) +end + + -- handle incoming client connections function _M:handle_websocket() local rpc_protocol = ngx_var.http_sec_websocket_protocol @@ -379,7 +401,7 @@ function _M:handle_websocket() end if not meta_v1_supported then - ngx_log(ngx_ERR, "[rpc] unknown RPC protocol: " .. + ngx_log(ngx_ERR, _log_prefix, "unknown RPC protocol: " .. tostring(rpc_protocol) .. ", doesn't know how to communicate with client") return ngx_exit(ngx.HTTP_CLOSE) @@ -387,7 +409,7 @@ function _M:handle_websocket() local cert, err = validate_client_cert(self.conf, self.cluster_cert, ngx_var.ssl_client_raw_cert) if not cert then - ngx_log(ngx_ERR, "[rpc] client's certificate failed validation: ", err) + ngx_log(ngx_ERR, _log_prefix, "client's certificate failed validation: ", err) return ngx_exit(ngx.HTTP_CLOSE) end @@ -396,14 +418,14 @@ function _M:handle_websocket() local wb, err = server:new(WS_OPTS) if not wb then - ngx_log(ngx_ERR, "[rpc] unable to establish WebSocket connection with client: ", err) + ngx_log(ngx_ERR, _log_prefix, "unable to establish WebSocket connection with client: ", err) return ngx_exit(ngx.HTTP_CLOSE) end -- if timeout (default is 5s) we will close the connection local node_id, err = self:_handle_meta_call(wb) if not node_id then - ngx_log(ngx_ERR, "[rpc] unable to handshake with client: ", err) + ngx_log(ngx_ERR, _log_prefix, "unable to handshake with client: ", err) return ngx_exit(ngx.HTTP_CLOSE) end @@ -415,7 +437,7 @@ function _M:handle_websocket() self:_remove_socket(s) if not res then - ngx_log(ngx_ERR, "[rpc] RPC connection broken: ", err, " node_id: ", node_id) + ngx_log(ngx_ERR, _log_prefix, "RPC connection broken: ", err, " node_id: ", node_id) return ngx_exit(ngx.ERROR) end @@ -488,7 +510,7 @@ function _M:connect(premature, node_id, host, path, cert, key) local ok, err = c:connect(uri, opts) if not ok then - ngx_log(ngx_ERR, "[rpc] unable to connect to peer: ", err) + ngx_log(ngx_ERR, _log_prefix, "unable to connect to peer: ", err) goto err end @@ -497,7 +519,7 @@ function _M:connect(premature, node_id, host, path, cert, key) -- FIXME: resp_headers should not be case sensitive if not resp_headers or not resp_headers["sec_websocket_protocol"] then - ngx_log(ngx_ERR, "[rpc] peer did not provide sec_websocket_protocol, node_id: ", node_id) + ngx_log(ngx_ERR, _log_prefix, "peer did not provide sec_websocket_protocol, node_id: ", node_id) c:send_close() -- can't do much if this fails goto err end @@ -506,7 +528,7 @@ function _M:connect(premature, node_id, host, path, cert, key) local meta_cap = resp_headers["sec_websocket_protocol"] if meta_cap ~= RPC_MATA_V1 then - ngx_log(ngx_ERR, "[rpc] did not support protocol : ", meta_cap) + ngx_log(ngx_ERR, _log_prefix, "did not support protocol : ", meta_cap) c:send_close() -- can't do much if this fails goto err end @@ -514,7 +536,7 @@ function _M:connect(premature, node_id, host, path, cert, key) -- if timeout (default is 5s) we will close the connection local ok, err = self:_meta_call(c, meta_cap, node_id) if not ok then - ngx_log(ngx_ERR, "[rpc] unable to handshake with server, node_id: ", node_id, + ngx_log(ngx_ERR, _log_prefix, "unable to handshake with server, node_id: ", node_id, " err: ", err) c:send_close() -- can't do much if this fails goto err @@ -529,7 +551,7 @@ function _M:connect(premature, node_id, host, path, cert, key) self:_remove_socket(s) if not ok then - ngx_log(ngx_ERR, "[rpc] connection to node_id: ", node_id, " broken, err: ", + ngx_log(ngx_ERR, _log_prefix, "connection to node_id: ", node_id, " broken, err: ", err, ", reconnecting in ", reconnection_delay, " seconds") end end diff --git a/kong/clustering/rpc/socket.lua b/kong/clustering/rpc/socket.lua index 045ca8c7557..2044acf170a 100644 --- a/kong/clustering/rpc/socket.lua +++ b/kong/clustering/rpc/socket.lua @@ -68,6 +68,11 @@ function _M._dispatch(premature, self, cb, payload) if not res then ngx_log(ngx_WARN, "[rpc] RPC callback failed: ", err) + -- notification has no response + if not payload.id then + return + end + res, err = self.outgoing:push(new_error(payload.id, jsonrpc.SERVER_ERROR, err)) if not res then @@ -77,6 +82,12 @@ function _M._dispatch(premature, self, cb, payload) return end + -- notification has no response + if not payload.id then + ngx_log(ngx_DEBUG, "[rpc] notification has no response") + return + end + -- success res, err = self.outgoing:push({ jsonrpc = jsonrpc.VERSION, @@ -151,7 +162,7 @@ function _M:start() ngx_log(ngx_DEBUG, "[rpc] got RPC call: ", payload.method, " (id: ", payload.id, ")") local dispatch_cb = self.manager.callbacks.callbacks[payload.method] - if not dispatch_cb then + if not dispatch_cb and payload.id then local res, err = self.outgoing:push(new_error(payload.id, jsonrpc.METHOD_NOT_FOUND)) if not res then return nil, "unable to send \"METHOD_NOT_FOUND\" error back to client: " .. err @@ -162,9 +173,9 @@ function _M:start() -- call dispatch local res, err = kong.timer:named_at(string_format("JSON-RPC callback for node_id: %s, id: %d, method: %s", - self.node_id, payload.id, payload.method), + self.node_id, payload.id or 0, payload.method), 0, _M._dispatch, self, dispatch_cb, payload) - if not res then + if not res and payload.id then local reso, erro = self.outgoing:push(new_error(payload.id, jsonrpc.INTERNAL_ERROR)) if not reso then return nil, "unable to send \"INTERNAL_ERROR\" error back to client: " .. erro @@ -271,9 +282,13 @@ end function _M:call(node_id, method, params, callback) assert(node_id == self.node_id) - local id = self:_get_next_id() + local id - self.interest[id] = callback + -- notification has no callback or id + if callback then + id = self:_get_next_id() + self.interest[id] = callback + end return self.outgoing:push({ jsonrpc = jsonrpc.VERSION,