Skip to content

Commit

Permalink
Merge pull request #24 from thibaultCha/fix/nil-prepared-query-id
Browse files Browse the repository at this point in the history
fix(prepared) correct check for consensus timeout
  • Loading branch information
thibaultcha committed Jan 16, 2016
2 parents b95d1f6 + ddcdb1f commit 555c0d0
Show file tree
Hide file tree
Showing 6 changed files with 23 additions and 18 deletions.
2 changes: 1 addition & 1 deletion .ci/run_tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
set -e

if [ "$OPENRESTY_TESTS" != "yes" ]; then
busted -v --coverage -o gtest
make lint
busted -v --coverage -o gtest --repeat 3
luacov-coveralls -i cassandra
else
prove -l t
Expand Down
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ env:
- LUA=lua5.3
- LUA=luajit-2.0
- LUA=luajit-2.1
- OPENRESTY_TESTS: yes
- OPENRESTY_TESTS: "yes"
LUA: luajit-2.1
before_install:
- bash .ci/setup_cassandra.sh
Expand Down
6 changes: 3 additions & 3 deletions spec/integration/cassandra_spec.lua
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ describe("spawn_session()", function()
describe("execute()", function()
after_each(function()
-- drop keyspace in case tests failed
session:execute("DROP KEYSPACE resty_cassandra_spec")
utils.drop_keyspace(session, "resty_cassandra_spec")
end)
it("should require argument #1 to be a string", function()
assert.has_error(function()
Expand Down Expand Up @@ -200,7 +200,7 @@ describe("spawn_session()", function()
assert.falsy(err)
assert.is_table(res)

local res, err = session:execute("DROP KEYSPACE resty_cassandra_spec")
res, err = session:execute("DROP KEYSPACE resty_cassandra_spec")
assert.falsy(err)
assert.is_table(res)
assert.equal(0, #res)
Expand All @@ -221,7 +221,7 @@ describe("spawn_session()", function()
assert.falsy(err)

_, err = session:execute [[
CREATE TABLE resty_cassandra_spec.fixture_table(
CREATE TABLE IF NOT EXISTS resty_cassandra_spec.fixture_table(
id uuid PRIMARY KEY,
value varchar
)
Expand Down
6 changes: 1 addition & 5 deletions spec/spec_utils.lua
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,7 @@ function _M.create_keyspace(session, keyspace)
end

function _M.drop_keyspace(session, keyspace)
local res, err = session:execute("DROP KEYSPACE "..keyspace)
if err then
error(err)
end
return res
session:execute("DROP KEYSPACE "..keyspace)
end

local delta = 0.0000001
Expand Down
23 changes: 16 additions & 7 deletions src/cassandra.lua
Original file line number Diff line number Diff line change
Expand Up @@ -575,15 +575,20 @@ end
function RequestHandler:wait_for_schema_consensus()
log.info("Waiting for schema consensus")

local match, err
local match, t_diff, err
local start = time_utils.get_time()

repeat
time_utils.wait(0.5)
match, err = check_schema_consensus(self)
until match or err ~= nil or (time_utils.get_time() - start) < self.options.protocol_options.max_schema_consensus_wait
t_diff = time_utils.get_time() - start
until match or err ~= nil or t_diff >= self.options.protocol_options.max_schema_consensus_wait

return err
if err ~= nil then
return err
elseif not match then
log.err("Waiting for schema consensus timed out. "..t_diff.." > "..self.options.protocol_options.max_schema_consensus_wait)
end
end

function RequestHandler:send_on_next_coordinator(request)
Expand Down Expand Up @@ -757,7 +762,7 @@ local function prepare_query(request_handler, query)
local prepared_key_lock = prepared_key.."_lock"
local lock, lock_err, elapsed = lock_mutex(request_handler.options.prepared_shm, prepared_key_lock)
if lock_err then
return nil, lock_err
return nil, "Could not create lock for prepare request: "..lock_err
end

if elapsed and elapsed == 0 then
Expand All @@ -767,26 +772,30 @@ local function prepare_query(request_handler, query)
local res, err = request_handler:send(prepare_request)
if err then
return nil, err
elseif res.query_id == nil then
return nil, "Could not retrieve query id from prepare request"
end
query_id = res.query_id
local ok, cache_err = cache.set_prepared_query_id(request_handler.options, query, query_id)
if not ok then
return nil, cache_err
return nil, "Could not insert query id in cache for prepared query: "..cache_err
end
log.info("Query prepared for host "..request_handler.coordinator.address)
else
-- once the lock is resolved, all other workers can retry to get the query, and should
-- instantly succeed. We then skip the preparation part.
query_id, cache_err = cache.get_prepared_query_id(request_handler.options, query)
if cache_err then
return nil, cache_err
return nil, "Could not get query id from cache for prepared query: "..cache_err
elseif query_id == nil then
return nil, "No query id found in cache for prepared query"
end
end

-- UNLOCK MUTEX
lock_err = unlock_mutex(lock)
if lock_err then
return nil, "Error unlocking mutex for query preparation: "..lock_err
return nil, "Error unlocking mutex for query for prepare request: "..lock_err
end
end

Expand Down
2 changes: 1 addition & 1 deletion src/cassandra/options.lua
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ local DEFAULTS = {
},
protocol_options = {
default_port = 9042,
max_schema_consensus_wait = 5000
max_schema_consensus_wait = 10000
},
socket_options = {
connect_timeout = 1000, -- ms
Expand Down

0 comments on commit 555c0d0

Please sign in to comment.