From baf0210b75ce49895c29244b2248030d798c5ae7 Mon Sep 17 00:00:00 2001 From: Thibault Charbonnier Date: Wed, 31 Aug 2016 16:50:21 -0700 Subject: [PATCH] feat: support for Cassandra 3.x very sketchy --- src/cassandra.lua | 31 ++++++++++++++++++++++++++----- src/cassandra/cache.lua | 26 +++++++++++++++++++++++++- src/cassandra/options.lua | 3 +-- 3 files changed, 52 insertions(+), 8 deletions(-) diff --git a/src/cassandra.lua b/src/cassandra.lua index 954c45e..ba3739e 100644 --- a/src/cassandra.lua +++ b/src/cassandra.lua @@ -98,7 +98,7 @@ function Host:new(address, options) host = host, port = port, address = address, - protocol_version = DEFAULT_PROTOCOL_VERSION, + protocol_version = options.protocol_options.version or DEFAULT_PROTOCOL_VERSION, options = options, reconnection_policy = options.policies.reconnection } @@ -259,7 +259,6 @@ function Host:connect() if ready then log.debug("Host at "..self.address.." is ready with protocol v"..self.protocol_version) - if self.options.keyspace ~= nil then local _, err = change_keyspace(self, self.options.keyspace) if err then @@ -267,7 +266,6 @@ function Host:connect() return false, err end end - self.connected = true return true end @@ -1004,9 +1002,11 @@ function Cassandra.spawn_session(options) end local SELECT_PEERS_QUERY = "SELECT peer,data_center,rack,rpc_address,release_version FROM system.peers" +local SELECT_LOCAL_QUERY = "SELECT release_version FROM system.local" -- Retrieve cluster informations from a connected contact_point -function Cassandra.refresh_hosts(options) +function Cassandra.refresh_hosts(ops) + local options = opts.parse_cluster(ops) local addresses = {} local lock, lock_err, elapsed = lock_mutex(options.shm, "refresh_hosts") @@ -1029,6 +1029,7 @@ function Cassandra.refresh_hosts(options) end local peers_query = Requests.QueryRequest(SELECT_PEERS_QUERY) + local local_query = Requests.QueryRequest(SELECT_LOCAL_QUERY) local hosts = {} local local_host = { @@ -1036,6 +1037,18 @@ function Cassandra.refresh_hosts(options) reconnection_delay = 0 } hosts[coordinator.address] = local_host + + local rows, err = coordinator:send(local_query) + if not rows then + return nil, err + end + + local release_version = rows[1].release_version + local ok, cache_err = cache.set_cluster_info(options.shm, release_version) + if not ok then + return nil, cache_err + end + log.info("Local info retrieved") local rows, err = coordinator:send(peers_query) @@ -1043,7 +1056,11 @@ function Cassandra.refresh_hosts(options) return nil, err end - for _, row in ipairs(rows) do + for i, row in ipairs(rows) do + if i > 1 and release_version ~= row.release_version then + return nil, Errors.DriverError("nodes have different release versions ("..release_version.." vs "..row.release_version..")") + end + local address = options.policies.address_resolution(row["rpc_address"], options.protocol_options.default_port) log.info("Adding host "..address) hosts[address] = { @@ -1129,6 +1146,10 @@ function Cassandra.spawn_cluster(options) return true end +function Cassandra.get_cluster_info(options) + return cache.get_cluster_info(options.shm) +end + --- Type serializer shorthands. -- When binding parameters to a query from `execute`, some -- types cannot be infered automatically and will require manual diff --git a/src/cassandra/cache.lua b/src/cassandra/cache.lua index 841ee21..f398d9b 100644 --- a/src/cassandra/cache.lua +++ b/src/cassandra/cache.lua @@ -202,6 +202,28 @@ local function get_prepared_query_id(options, query) return value, nil, prepared_key end +local function set_cluster_info(shm, release_version) + local dict = get_dict(shm) + local ok, err = dict:safe_set("cluster:release_version", release_version) + if not ok then + return false, Errors.SharedDictError("Cannot store cluster info in shm "..shm..": "..err, shm) + end + return true +end + +local function get_cluster_info(shm) + local dict = get_dict(shm) + local release_version, err = dict:get("cluster:release_version") + if err then + return nil, Errors.SharedDictError("Cannot retrieve cluster info in shm "..shm..": "..err, shm) + elseif not release_version then + return nil, Errors.DriverError("No cluster info in shm "..shm) + end + return { + release_version = release_version + } +end + return { get_dict = get_dict, get_host = get_host, @@ -209,5 +231,7 @@ return { set_hosts = set_hosts, get_hosts = get_hosts, set_prepared_query_id = set_prepared_query_id, - get_prepared_query_id = get_prepared_query_id + get_prepared_query_id = get_prepared_query_id, + set_cluster_info = set_cluster_info, + get_cluster_info = get_cluster_info } diff --git a/src/cassandra/options.lua b/src/cassandra/options.lua index f7aaca9..897e08c 100644 --- a/src/cassandra/options.lua +++ b/src/cassandra/options.lua @@ -27,6 +27,7 @@ local DEFAULTS = { retry_on_timeout = true }, protocol_options = { + version = 3, default_port = 9042, max_schema_consensus_wait = 10000 }, @@ -173,8 +174,6 @@ local function parse_cluster(options) return nil, "contact_points must contain at least one contact point" end - options.keyspace = nil -- it makes no sense to use keyspace in this context - return options end