Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 91 additions & 1 deletion test/storage-luatest/storage_1_1_1_test.lua
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,18 @@ test_group.before_all(function(g)

vtest.cluster_new(g, global_cfg)
vtest.cluster_bootstrap(g, global_cfg)
vtest.cluster_rebalancer_disable(g)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did you enable the rebalancer in the first commit? It doesn't affect the rebalancer, no?

vtest.cluster_wait_vclock_all(g)

vtest.cluster_exec_each_master(g, function()
box.schema.create_space('test_space')
box.space.test_space:format({
{name = 'pk', type = 'unsigned'},
{name = 'bucket_id', type = 'unsigned'},
})
box.space.test_space:create_index('primary', {parts = {'pk'}})
box.space.test_space:create_index(
'bucket_id', {parts = {'bucket_id'}, unique = false})
end)
Comment on lines +55 to +64
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is added in the first commit, and is not used here. Let's either move it to the commit which needs these things, or remove it entirely.

end)

test_group.after_all(function(g)
Expand Down Expand Up @@ -101,3 +112,82 @@ test_group.test_manual_bucket_send_doubled_buckets = function(g)
ilt.assert_equals(box.space._bucket:get(bid), nil)
end, {bid})
end

--
-- Start sending bucket_id from src_storage to the replicaset of
-- dest_storage and expect the send to fail, leaving the move
-- unfinished. Afterwards wait until the bucket is in the 'sending'
-- state on src_storage and has no record in _bucket on dest_storage.
--
-- The helper does not arrange the failure itself; it only asserts that
-- bucket_send() returned a falsy result together with an error.
--
local function start_partial_bucket_move(src_storage, dest_storage, bucket_id)
src_storage:exec(function(bucket_id, replicaset_id)
-- The send must fail: no result, but an error object.
local res, err = ivshard.storage.bucket_send(bucket_id, replicaset_id)
t.assert_not(res)
t.assert(err)
t.helpers.retrying({}, function()
-- The bucket on src_storage must be in "sending" state. The
-- recovery service on src_storage should not erase this bucket.
t.assert_equals(box.space._bucket:select(bucket_id)[1].status,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When you select a single tuple by a unique index, better use :get().

'sending')
end)
Comment on lines +121 to +126
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you really need this in "retrying"? AFAIU, the bucket is guaranteed to be in SENDING state right after your bucket_send() above will fail. Which means this loop could be just an assertion. Right?

-- The destination is addressed by the UUID of its replicaset.
end, {bucket_id, dest_storage:replicaset_uuid()})

dest_storage:exec(function(bucket_id)
t.helpers.retrying({}, function()
-- The recovery service on dest_storage should clear this bucket.
t.assert_equals(box.space._bucket:select(bucket_id), {})
end)
end, {bucket_id})
end

--
-- Wait until the bucket is completely moved: src_storage has no record
-- of bucket_id in _bucket anymore, and dest_storage stores it in the
-- 'active' state. Both conditions are polled with t.helpers.retrying().
--
local function wait_for_bucket_is_transferred(src_storage, dest_storage,
                                              bucket_id)
    src_storage:exec(function(bucket_id)
        t.helpers.retrying({}, function()
            -- A single tuple is looked up by a unique key, so :get() is
            -- used instead of :select(); a missing bucket is nil.
            t.assert_equals(box.space._bucket:get(bucket_id), nil)
        end)
    end, {bucket_id})
    dest_storage:exec(function(bucket_id)
        t.helpers.retrying({}, function()
            local bucket = box.space._bucket:get(bucket_id)
            -- Check the presence explicitly so that a not yet received
            -- bucket fails with a readable message rather than with an
            -- attempt to index nil.
            t.assert(bucket, 'bucket is not received yet')
            t.assert_equals(bucket.status, 'active')
        end)
    end, {bucket_id})
end

--
-- Reduce spam of "Finish bucket recovery step" logs in recovery
-- service (gh-212).
--
-- While recovery keeps failing, the log of replica_1_a must not contain
-- a "Finish bucket recovery step, 0" line. Once the failures are
-- removed, it must report that 2 sending buckets were recovered.
--
test_group.test_no_logs_while_unsuccess_recovery = function(g)
g.replica_2_a:exec(function()
-- NOTE(review): ERRINJ_RECEIVE_PARTIALLY is defined outside this
-- file; presumably it makes the bucket receive stop midway - confirm.
ivshard.storage.internal.errinj.ERRINJ_RECEIVE_PARTIALLY = true
-- Keep the original _call in _G to restore it later in the test.
rawset(_G, 'old_call', ivshard.storage._call)
-- Make only the 'recovery_bucket_stat' service call raise an error,
-- all the other service calls are passed through unchanged.
ivshard.storage._call = function(service_name, ...)
if service_name == 'recovery_bucket_stat' then
return error('TimedOut')
end
return _G.old_call(service_name, ...)
end
end)
-- Leave two buckets of replica_1_a half-sent to replica_2_a.
local hanged_bucket_id_1 = vtest.storage_first_bucket(g.replica_1_a)
start_partial_bucket_move(g.replica_1_a, g.replica_2_a, hanged_bucket_id_1)
local hanged_bucket_id_2 = vtest.storage_first_bucket(g.replica_1_a)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hanged -> hung.

start_partial_bucket_move(g.replica_1_a, g.replica_2_a, hanged_bucket_id_2)
-- Keep waking the recovery up on replica_1_a until it logs an error.
t.helpers.retrying({}, function()
g.replica_1_a:exec(function() ivshard.storage.recovery_wakeup() end)
t.assert(g.replica_1_a:grep_log('Error during recovery of bucket'))
end)
-- A step which recovered nothing must not be reported as finished.
t.assert_not(g.replica_1_a:grep_log('Finish bucket recovery step, 0'))
-- Remove the injected failures from replica_2_a.
g.replica_2_a:exec(function()
ivshard.storage.internal.errinj.ERRINJ_RECEIVE_PARTIALLY = false
ivshard.storage._call = _G.old_call
end)
-- Wake up the services on both storages until replica_1_a reports
-- both sending buckets as recovered.
t.helpers.retrying({timeout = 60}, function()
g.replica_2_a:exec(function()
ivshard.storage.garbage_collector_wakeup()
ivshard.storage.recovery_wakeup()
end)
g.replica_1_a:exec(function() ivshard.storage.recovery_wakeup() end)
t.assert(g.replica_1_a:grep_log('Finish bucket recovery step, 2 ' ..
'sending buckets are recovered among'))
end)
-- Note the argument order: replica_2_a is passed as the source and
-- replica_1_a as the destination, so the buckets are expected to be
-- absent on replica_2_a and 'active' on replica_1_a.
wait_for_bucket_is_transferred(g.replica_2_a, g.replica_1_a,
hanged_bucket_id_1)
wait_for_bucket_is_transferred(g.replica_2_a, g.replica_1_a,
hanged_bucket_id_2)
end
2 changes: 1 addition & 1 deletion vshard/storage/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -1005,7 +1005,7 @@ local function recovery_step_by_type(type, limiter)
is_step_empty = false
::continue::
end
if not is_step_empty then
if recovered > 0 then
log.info('Finish bucket recovery step, %d %s buckets are recovered '..
'among %d', recovered, type, total)
end
Expand Down