Skip to content

Commit

Permalink
feat(clustering/rpc): support jsonrpc notification (#13948)
Browse files Browse the repository at this point in the history
  • Loading branch information
chronolaw authored Dec 18, 2024
1 parent 59118e2 commit be7e356
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 37 deletions.
19 changes: 16 additions & 3 deletions kong/clustering/rpc/concentrator.lua
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,14 @@ function _M:_event_loop(lconn)
"unknown requester for RPC")

local res, err = self.manager:_local_call(target_id, payload.method,
payload.params)
payload.params, not payload.id)

-- notification has no callback or id
if not payload.id then
ngx_log(ngx_DEBUG, "[rpc] notification has no response")
goto continue
end

if res then
-- call success
res, err = self:_enqueue_rpc_response(reply_to, {
Expand All @@ -180,6 +187,8 @@ function _M:_event_loop(lconn)
ngx_log(ngx_WARN, "[rpc] unable to enqueue RPC error: ", err)
end
end

::continue::
end
end
end
Expand Down Expand Up @@ -287,9 +296,13 @@ end
-- This way the manager code wouldn't tell the difference
-- between calls made over WebSocket or concentrator
function _M:call(node_id, method, params, callback)
local id = self:_get_next_id()
local id

self.interest[id] = callback
-- notification has no callback or id
if callback then
id = self:_get_next_id()
self.interest[id] = callback
end

return self:_enqueue_rpc_request(node_id, {
jsonrpc = jsonrpc.VERSION,
Expand Down
27 changes: 21 additions & 6 deletions kong/clustering/rpc/future.lua
Original file line number Diff line number Diff line change
Expand Up @@ -12,25 +12,36 @@ local STATE_SUCCEED = 3
local STATE_ERRORED = 4


function _M.new(node_id, socket, method, params)
function _M.new(node_id, socket, method, params, is_notification)
local self = {
method = method,
params = params,
sema = semaphore.new(),
socket = socket,
node_id = node_id,
id = nil,
result = nil,
error = nil,
state = STATE_NEW, -- STATE_*
is_notification = is_notification,
}

if not is_notification then
self.id = nil
self.result = nil
self.error = nil
self.state = STATE_NEW -- STATE_*
self.sema = semaphore.new()
end

return setmetatable(self, _MT)
end


-- start executing the future
function _M:start()
-- notification has no callback
if self.is_notification then
return self.socket:call(self.node_id,
self.method,
self.params)
end

assert(self.state == STATE_NEW)
self.state = STATE_IN_PROGRESS

Expand Down Expand Up @@ -60,6 +71,10 @@ end


function _M:wait(timeout)
if self.is_notification then
return nil, "the notification cannot be waited"
end

assert(self.state == STATE_IN_PROGRESS)

local res, err = self.sema:wait(timeout)
Expand Down
68 changes: 45 additions & 23 deletions kong/clustering/rpc/manager.lua
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ local CLUSTERING_PING_INTERVAL = constants.CLUSTERING_PING_INTERVAL
local parse_proxy_url = require("kong.clustering.utils").parse_proxy_url


local _log_prefix = "[rpc] "
local RPC_MATA_V1 = "kong.meta.v1"
local RPC_SNAPPY_FRAMED = "x-snappy-framed"

Expand Down Expand Up @@ -276,7 +277,7 @@ end
-- low level helper used internally by :call() and concentrator
-- this one does not consider forwarding using concentrator
-- when node does not exist
function _M:_local_call(node_id, method, params)
function _M:_local_call(node_id, method, params, is_notification)
if not self.client_capabilities[node_id] then
return nil, "node is not connected, node_id: " .. node_id
end
Expand All @@ -289,9 +290,14 @@ function _M:_local_call(node_id, method, params)

local s = next(self.clients[node_id]) -- TODO: better LB?

local fut = future.new(node_id, s, method, params)
local fut = future.new(node_id, s, method, params, is_notification)
assert(fut:start())

-- a notification does not wait for a response
if is_notification then
return true
end

local ok, err = fut:wait(5)
if err then
return nil, err
Expand All @@ -305,9 +311,7 @@ function _M:_local_call(node_id, method, params)
end


-- public interface, try call on node_id locally first,
-- if node is not connected, try concentrator next
function _M:call(node_id, method, ...)
function _M:_call_or_notify(is_notification, node_id, method, ...)
local cap = utils.parse_method_name(method)

local res, err = self:_find_node_and_check_capability(node_id, cap)
Expand All @@ -318,50 +322,68 @@ function _M:call(node_id, method, ...)
local params = {...}

ngx_log(ngx_DEBUG,
"[rpc] calling ", method,
_log_prefix,
is_notification and "notifying " or "calling ",
method,
"(node_id: ", node_id, ")",
" via ", res == "local" and "local" or "concentrator"
)

if res == "local" then
res, err = self:_local_call(node_id, method, params)
res, err = self:_local_call(node_id, method, params, is_notification)

if not res then
ngx_log(ngx_DEBUG, "[rpc] ", method, " failed, err: ", err)
ngx_log(ngx_DEBUG, _log_prefix, method, " failed, err: ", err)
return nil, err
end

ngx_log(ngx_DEBUG, "[rpc] ", method, " succeeded")
ngx_log(ngx_DEBUG, _log_prefix, method, " succeeded")

return res
end

assert(res == "concentrator")

-- try concentrator
local fut = future.new(node_id, self.concentrator, method, params)
local fut = future.new(node_id, self.concentrator, method, params, is_notification)
assert(fut:start())

if is_notification then
return true
end

local ok, err = fut:wait(5)

if err then
ngx_log(ngx_DEBUG, "[rpc] ", method, " failed, err: ", err)
ngx_log(ngx_DEBUG, _log_prefix, method, " failed, err: ", err)

return nil, err
end

if ok then
ngx_log(ngx_DEBUG, "[rpc] ", method, " succeeded")
ngx_log(ngx_DEBUG, _log_prefix, method, " succeeded")

return fut.result
end

ngx_log(ngx_DEBUG, "[rpc] ", method, " failed, err: ", fut.error.message)
ngx_log(ngx_DEBUG, _log_prefix, method, " failed, err: ", fut.error.message)

return nil, fut.error.message
end


-- public interface: performs an RPC call on node_id and returns its outcome
-- (the call result on success, or nil plus an error message on failure).
-- delegates to _call_or_notify, which tries node_id locally first and,
-- if the node is not connected, goes through the concentrator next
function _M:call(node_id, method, ...)
  -- a regular call expects a response, so it is not a notification
  local is_notification = false

  return self:_call_or_notify(is_notification, node_id, method, ...)
end


-- public interface: sends a JSON-RPC notification for method to node_id.
-- delegates to _call_or_notify with the notification flag set, so no
-- response is awaited: returns true once the notification has been started,
-- or nil plus an error message when it could not be sent
function _M:notify(node_id, method, ...)
  -- a notification carries no id and has no response to wait for
  local is_notification = true

  return self:_call_or_notify(is_notification, node_id, method, ...)
end


-- handle incoming client connections
function _M:handle_websocket()
local rpc_protocol = ngx_var.http_sec_websocket_protocol
Expand All @@ -379,15 +401,15 @@ function _M:handle_websocket()
end

if not meta_v1_supported then
ngx_log(ngx_ERR, "[rpc] unknown RPC protocol: " ..
ngx_log(ngx_ERR, _log_prefix, "unknown RPC protocol: " ..
tostring(rpc_protocol) ..
", doesn't know how to communicate with client")
return ngx_exit(ngx.HTTP_CLOSE)
end

local cert, err = validate_client_cert(self.conf, self.cluster_cert, ngx_var.ssl_client_raw_cert)
if not cert then
ngx_log(ngx_ERR, "[rpc] client's certificate failed validation: ", err)
ngx_log(ngx_ERR, _log_prefix, "client's certificate failed validation: ", err)
return ngx_exit(ngx.HTTP_CLOSE)
end

Expand All @@ -396,14 +418,14 @@ function _M:handle_websocket()

local wb, err = server:new(WS_OPTS)
if not wb then
ngx_log(ngx_ERR, "[rpc] unable to establish WebSocket connection with client: ", err)
ngx_log(ngx_ERR, _log_prefix, "unable to establish WebSocket connection with client: ", err)
return ngx_exit(ngx.HTTP_CLOSE)
end

-- if timeout (default is 5s) we will close the connection
local node_id, err = self:_handle_meta_call(wb)
if not node_id then
ngx_log(ngx_ERR, "[rpc] unable to handshake with client: ", err)
ngx_log(ngx_ERR, _log_prefix, "unable to handshake with client: ", err)
return ngx_exit(ngx.HTTP_CLOSE)
end

Expand All @@ -415,7 +437,7 @@ function _M:handle_websocket()
self:_remove_socket(s)

if not res then
ngx_log(ngx_ERR, "[rpc] RPC connection broken: ", err, " node_id: ", node_id)
ngx_log(ngx_ERR, _log_prefix, "RPC connection broken: ", err, " node_id: ", node_id)
return ngx_exit(ngx.ERROR)
end

Expand Down Expand Up @@ -488,7 +510,7 @@ function _M:connect(premature, node_id, host, path, cert, key)

local ok, err = c:connect(uri, opts)
if not ok then
ngx_log(ngx_ERR, "[rpc] unable to connect to peer: ", err)
ngx_log(ngx_ERR, _log_prefix, "unable to connect to peer: ", err)
goto err
end

Expand All @@ -497,7 +519,7 @@ function _M:connect(premature, node_id, host, path, cert, key)
-- FIXME: resp_headers should not be case sensitive

if not resp_headers or not resp_headers["sec_websocket_protocol"] then
ngx_log(ngx_ERR, "[rpc] peer did not provide sec_websocket_protocol, node_id: ", node_id)
ngx_log(ngx_ERR, _log_prefix, "peer did not provide sec_websocket_protocol, node_id: ", node_id)
c:send_close() -- can't do much if this fails
goto err
end
Expand All @@ -506,15 +528,15 @@ function _M:connect(premature, node_id, host, path, cert, key)
local meta_cap = resp_headers["sec_websocket_protocol"]

if meta_cap ~= RPC_MATA_V1 then
ngx_log(ngx_ERR, "[rpc] did not support protocol : ", meta_cap)
ngx_log(ngx_ERR, _log_prefix, "did not support protocol : ", meta_cap)
c:send_close() -- can't do much if this fails
goto err
end

-- if timeout (default is 5s) we will close the connection
local ok, err = self:_meta_call(c, meta_cap, node_id)
if not ok then
ngx_log(ngx_ERR, "[rpc] unable to handshake with server, node_id: ", node_id,
ngx_log(ngx_ERR, _log_prefix, "unable to handshake with server, node_id: ", node_id,
" err: ", err)
c:send_close() -- can't do much if this fails
goto err
Expand All @@ -529,7 +551,7 @@ function _M:connect(premature, node_id, host, path, cert, key)
self:_remove_socket(s)

if not ok then
ngx_log(ngx_ERR, "[rpc] connection to node_id: ", node_id, " broken, err: ",
ngx_log(ngx_ERR, _log_prefix, "connection to node_id: ", node_id, " broken, err: ",
err, ", reconnecting in ", reconnection_delay, " seconds")
end
end
Expand Down
25 changes: 20 additions & 5 deletions kong/clustering/rpc/socket.lua
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,11 @@ function _M._dispatch(premature, self, cb, payload)
if not res then
ngx_log(ngx_WARN, "[rpc] RPC callback failed: ", err)

-- notification has no response
if not payload.id then
return
end

res, err = self.outgoing:push(new_error(payload.id, jsonrpc.SERVER_ERROR,
err))
if not res then
Expand All @@ -77,6 +82,12 @@ function _M._dispatch(premature, self, cb, payload)
return
end

-- notification has no response
if not payload.id then
ngx_log(ngx_DEBUG, "[rpc] notification has no response")
return
end

-- success
res, err = self.outgoing:push({
jsonrpc = jsonrpc.VERSION,
Expand Down Expand Up @@ -151,7 +162,7 @@ function _M:start()
ngx_log(ngx_DEBUG, "[rpc] got RPC call: ", payload.method, " (id: ", payload.id, ")")

local dispatch_cb = self.manager.callbacks.callbacks[payload.method]
if not dispatch_cb then
if not dispatch_cb and payload.id then
local res, err = self.outgoing:push(new_error(payload.id, jsonrpc.METHOD_NOT_FOUND))
if not res then
return nil, "unable to send \"METHOD_NOT_FOUND\" error back to client: " .. err
Expand All @@ -162,9 +173,9 @@ function _M:start()

-- call dispatch
local res, err = kong.timer:named_at(string_format("JSON-RPC callback for node_id: %s, id: %d, method: %s",
self.node_id, payload.id, payload.method),
self.node_id, payload.id or 0, payload.method),
0, _M._dispatch, self, dispatch_cb, payload)
if not res then
if not res and payload.id then
local reso, erro = self.outgoing:push(new_error(payload.id, jsonrpc.INTERNAL_ERROR))
if not reso then
return nil, "unable to send \"INTERNAL_ERROR\" error back to client: " .. erro
Expand Down Expand Up @@ -271,9 +282,13 @@ end
function _M:call(node_id, method, params, callback)
assert(node_id == self.node_id)

local id = self:_get_next_id()
local id

self.interest[id] = callback
-- notification has no callback or id
if callback then
id = self:_get_next_id()
self.interest[id] = callback
end

return self.outgoing:push({
jsonrpc = jsonrpc.VERSION,
Expand Down

1 comment on commit be7e356

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bazel Build

Docker image available kong/kong:be7e3567e459d2df6c9252eb4332fca46887ab81
Artifacts available https://github.com/Kong/kong/actions/runs/12387451618

Please sign in to comment.