Skip to content

Commit

Permalink
feat: support for Cassandra 3.x
Browse files Browse the repository at this point in the history
very sketchy
  • Loading branch information
thibaultcha committed Sep 1, 2016
1 parent 6f01ace commit baf0210
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 8 deletions.
31 changes: 26 additions & 5 deletions src/cassandra.lua
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ function Host:new(address, options)
host = host,
port = port,
address = address,
protocol_version = DEFAULT_PROTOCOL_VERSION,
protocol_version = options.protocol_options.version or DEFAULT_PROTOCOL_VERSION,
options = options,
reconnection_policy = options.policies.reconnection
}
Expand Down Expand Up @@ -259,15 +259,13 @@ function Host:connect()

if ready then
log.debug("Host at "..self.address.." is ready with protocol v"..self.protocol_version)

if self.options.keyspace ~= nil then
local _, err = change_keyspace(self, self.options.keyspace)
if err then
log.err("Could not set keyspace. "..err)
return false, err
end
end

self.connected = true
return true
end
Expand Down Expand Up @@ -1004,9 +1002,11 @@ function Cassandra.spawn_session(options)
end

local SELECT_PEERS_QUERY = "SELECT peer,data_center,rack,rpc_address,release_version FROM system.peers"
local SELECT_LOCAL_QUERY = "SELECT release_version FROM system.local"

-- Retrieve cluster informations from a connected contact_point
function Cassandra.refresh_hosts(options)
function Cassandra.refresh_hosts(ops)
local options = opts.parse_cluster(ops)
local addresses = {}

local lock, lock_err, elapsed = lock_mutex(options.shm, "refresh_hosts")
Expand All @@ -1029,21 +1029,38 @@ function Cassandra.refresh_hosts(options)
end

local peers_query = Requests.QueryRequest(SELECT_PEERS_QUERY)
local local_query = Requests.QueryRequest(SELECT_LOCAL_QUERY)
local hosts = {}

local local_host = {
unhealthy_at = 0,
reconnection_delay = 0
}
hosts[coordinator.address] = local_host

local rows, err = coordinator:send(local_query)
if not rows then
return nil, err
end

local release_version = rows[1].release_version
local ok, cache_err = cache.set_cluster_info(options.shm, release_version)
if not ok then
return nil, cache_err
end

log.info("Local info retrieved")

local rows, err = coordinator:send(peers_query)
if err then
return nil, err
end

for _, row in ipairs(rows) do
for i, row in ipairs(rows) do
if i > 1 and release_version ~= row.release_version then
return nil, Errors.DriverError("nodes have different release versions ("..release_version.." vs "..row.release_version..")")
end

local address = options.policies.address_resolution(row["rpc_address"], options.protocol_options.default_port)
log.info("Adding host "..address)
hosts[address] = {
Expand Down Expand Up @@ -1129,6 +1146,10 @@ function Cassandra.spawn_cluster(options)
return true
end

function Cassandra.get_cluster_info(options)
return cache.get_cluster_info(options.shm)
end

--- Type serializer shorthands.
-- When binding parameters to a query from `execute`, some
-- types cannot be infered automatically and will require manual
Expand Down
26 changes: 25 additions & 1 deletion src/cassandra/cache.lua
Original file line number Diff line number Diff line change
Expand Up @@ -202,12 +202,36 @@ local function get_prepared_query_id(options, query)
return value, nil, prepared_key
end

local function set_cluster_info(shm, release_version)
local dict = get_dict(shm)
local ok, err = dict:safe_set("cluster:release_version", release_version)
if not ok then
return false, Errors.SharedDictError("Cannot store cluster info in shm "..shm..": "..err, shm)
end
return true
end

local function get_cluster_info(shm)
local dict = get_dict(shm)
local release_version, err = dict:get("cluster:release_version")
if err then
return nil, Errors.SharedDictError("Cannot retrieve cluster info in shm "..shm..": "..err, shm)
elseif not release_version then
return nil, Errors.DriverError("No cluster info in shm "..shm)
end
return {
release_version = release_version
}
end

return {
get_dict = get_dict,
get_host = get_host,
set_host = set_host,
set_hosts = set_hosts,
get_hosts = get_hosts,
set_prepared_query_id = set_prepared_query_id,
get_prepared_query_id = get_prepared_query_id
get_prepared_query_id = get_prepared_query_id,
set_cluster_info = set_cluster_info,
get_cluster_info = get_cluster_info
}
3 changes: 1 addition & 2 deletions src/cassandra/options.lua
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ local DEFAULTS = {
retry_on_timeout = true
},
protocol_options = {
version = 3,
default_port = 9042,
max_schema_consensus_wait = 10000
},
Expand Down Expand Up @@ -173,8 +174,6 @@ local function parse_cluster(options)
return nil, "contact_points must contain at least one contact point"
end

options.keyspace = nil -- it makes no sense to use keyspace in this context

return options
end

Expand Down

0 comments on commit baf0210

Please sign in to comment.