13 changes: 7 additions & 6 deletions test/rebalancer/rebalancer.result
@@ -149,7 +149,7 @@ test_run:switch('box_1_a')
vshard.storage.rebalancer_enable()
---
...
wait_rebalancer_state("Rebalance routes are sent", test_run)
wait_rebalancer_state("The following rebalancer routes were sent", test_run)
---
...
wait_rebalancer_state('The cluster is balanced ok', test_run)
@@ -239,7 +239,7 @@ cfg.rebalancer_disbalance_threshold = 0.01
vshard.storage.cfg(cfg, util.name_to_uuid.box_1_a)
---
...
wait_rebalancer_state('Rebalance routes are sent', test_run)
wait_rebalancer_state('The following rebalancer routes were sent', test_run)
---
...
wait_rebalancer_state('The cluster is balanced ok', test_run)
@@ -318,12 +318,13 @@ _bucket:update({150}, {{'=', 2, vshard.consts.BUCKET.RECEIVING}})
---
- [150, 'receiving']
...
wait_rebalancer_state("Some buckets are not active", test_run)
---
...
-- We should not check for a specific bucket status (e.g. receiving), because
-- in rare cases we can accidentally get the wrong one. For example, we may
-- wait for "receiving" but get "garbage" due to a previous rebalancer error.
wait_rebalancer_state('Error during downloading rebalancer states:.*' .. \
'REBALANCER_INVALID_STATE', test_run) \
Collaborator: The commit message still mentions BUCKET_INVALID_STATE.

_bucket:update({150}, {{'=', 2, vshard.consts.BUCKET.ACTIVE}})
---
- [150, 'active']
...
vshard.storage.sync()
---
10 changes: 7 additions & 3 deletions test/rebalancer/rebalancer.test.lua
@@ -78,7 +78,7 @@ util.map_bucket_protection(test_run, {REPLICASET_1}, true)

test_run:switch('box_1_a')
vshard.storage.rebalancer_enable()
wait_rebalancer_state("Rebalance routes are sent", test_run)
wait_rebalancer_state("The following rebalancer routes were sent", test_run)

wait_rebalancer_state('The cluster is balanced ok', test_run)
_bucket.index.status:count({vshard.consts.BUCKET.ACTIVE})
@@ -118,7 +118,7 @@ _bucket.index.status:count({vshard.consts.BUCKET.ACTIVE})
-- Return 1%.
cfg.rebalancer_disbalance_threshold = 0.01
vshard.storage.cfg(cfg, util.name_to_uuid.box_1_a)
wait_rebalancer_state('Rebalance routes are sent', test_run)
wait_rebalancer_state('The following rebalancer routes were sent', test_run)
wait_rebalancer_state('The cluster is balanced ok', test_run)
_bucket.index.status:count({vshard.consts.BUCKET.ACTIVE})
_bucket.index.status:min({vshard.consts.BUCKET.ACTIVE})
@@ -156,7 +156,11 @@ util.map_bucket_protection(test_run, {REPLICASET_1}, false)
test_run:switch('box_1_a')
vshard.storage.rebalancer_enable()
_bucket:update({150}, {{'=', 2, vshard.consts.BUCKET.RECEIVING}})
wait_rebalancer_state("Some buckets are not active", test_run)
-- We should not check for a specific bucket status (e.g. receiving), because
-- in rare cases we can accidentally get the wrong one. For example, we may
-- wait for "receiving" but get "garbage" due to a previous rebalancer error.
wait_rebalancer_state('Error during downloading rebalancer states:.*' .. \
'REBALANCER_INVALID_STATE', test_run) \
_bucket:update({150}, {{'=', 2, vshard.consts.BUCKET.ACTIVE}})
vshard.storage.sync()

4 changes: 2 additions & 2 deletions test/rebalancer/stress_add_remove_several_rs.result
@@ -175,7 +175,7 @@ add_replicaset()
vshard.storage.cfg(cfg, util.name_to_uuid.box_1_a)
---
...
wait_rebalancer_state('Rebalance routes are sent', test_run)
wait_rebalancer_state('The following rebalancer routes were sent', test_run)
---
...
-- Now, add a second replicaset.
@@ -422,7 +422,7 @@ remove_second_replicaset_first_stage()
vshard.storage.cfg(cfg, util.name_to_uuid.box_1_a)
---
...
wait_rebalancer_state('Rebalance routes are sent', test_run)
wait_rebalancer_state('The following rebalancer routes were sent', test_run)
---
...
-- Rebalancing has been started - now remove second replicaset.
4 changes: 2 additions & 2 deletions test/rebalancer/stress_add_remove_several_rs.test.lua
@@ -71,7 +71,7 @@ fiber.sleep(0.5)
test_run:switch('box_1_a')
add_replicaset()
vshard.storage.cfg(cfg, util.name_to_uuid.box_1_a)
wait_rebalancer_state('Rebalance routes are sent', test_run)
wait_rebalancer_state('The following rebalancer routes were sent', test_run)

-- Now, add a second replicaset.

@@ -153,7 +153,7 @@ fiber.sleep(0.5)
test_run:switch('box_1_a')
remove_second_replicaset_first_stage()
vshard.storage.cfg(cfg, util.name_to_uuid.box_1_a)
wait_rebalancer_state('Rebalance routes are sent', test_run)
wait_rebalancer_state('The following rebalancer routes were sent', test_run)
-- Rebalancing has been started - now remove second replicaset.
remove_replicaset_first_stage()
vshard.storage.cfg(cfg, util.name_to_uuid.box_1_a)
146 changes: 145 additions & 1 deletion test/storage-luatest/storage_1_1_1_test.lua
@@ -50,7 +50,18 @@ test_group.before_all(function(g)

vtest.cluster_new(g, global_cfg)
vtest.cluster_bootstrap(g, global_cfg)
vtest.cluster_rebalancer_disable(g)
Collaborator: Why did you enable the rebalancer in the first commit? It doesn't affect the rebalancer, no?

vtest.cluster_wait_vclock_all(g)

vtest.cluster_exec_each_master(g, function()
box.schema.create_space('test_space')
box.space.test_space:format({
{name = 'pk', type = 'unsigned'},
{name = 'bucket_id', type = 'unsigned'},
})
box.space.test_space:create_index('primary', {parts = {'pk'}})
box.space.test_space:create_index(
'bucket_id', {parts = {'bucket_id'}, unique = false})
end)
Collaborator (on lines +55 to +64): This is added in the first commit and is not used here. Let's either move it to the commit which needs these things, or remove it entirely.

end)

test_group.after_all(function(g)
@@ -101,3 +112,136 @@ test_group.test_manual_bucket_send_doubled_buckets = function(g)
ilt.assert_equals(box.space._bucket:get(bid), nil)
end, {bid})
end

local function start_partial_bucket_move(src_storage, dest_storage, bucket_id)
src_storage:exec(function(bucket_id, replicaset_id)
local res, err = ivshard.storage.bucket_send(bucket_id, replicaset_id)
t.assert_not(res)
t.assert(err)
t.helpers.retrying({}, function()
-- The bucket on src_storage must be in "sending" state. The
-- recovery service on src_storage should not erase this bucket.
t.assert_equals(box.space._bucket:select(bucket_id)[1].status,
Collaborator: When you select a single tuple by a unique index, better use :get().

'sending')
end)
Collaborator (on lines +121 to +126): Do you really need this in "retrying"? AFAIU, the bucket is guaranteed to be in the SENDING state right after your bucket_send() above fails, which means this loop could be just an assertion. Right?

end, {bucket_id, dest_storage:replicaset_uuid()})

dest_storage:exec(function(bucket_id)
t.helpers.retrying({}, function()
-- The recovery service on dest_storage should clear this bucket.
t.assert_equals(box.space._bucket:select(bucket_id), {})
end)
end, {bucket_id})
end
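
Following the two review comments above, a minimal sketch of the suggested rewrite: a single assertion that uses :get() instead of :select() and drops the retrying loop, under the reviewer's (unverified here) assumption that the bucket is already in the SENDING state once bucket_send() has returned its error.

-- Hedged sketch of the reviewers' suggestions, not part of the submitted diff.
src_storage:exec(function(bucket_id, replicaset_id)
    local res, err = ivshard.storage.bucket_send(bucket_id, replicaset_id)
    t.assert_not(res)
    t.assert(err)
    -- :get() returns the single tuple (or nil) for a unique-index lookup,
    -- so no retrying loop is needed if SENDING is guaranteed at this point.
    local bucket = box.space._bucket:get(bucket_id)
    t.assert(bucket ~= nil)
    t.assert_equals(bucket.status, 'sending')
end, {bucket_id, dest_storage:replicaset_uuid()})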

local function wait_for_bucket_is_transferred(src_storage, dest_storage,
bucket_id)
src_storage:exec(function(bucket_id)
t.helpers.retrying({}, function()
t.assert_equals(box.space._bucket:select(bucket_id), {})
end)
end, {bucket_id})
dest_storage:exec(function(bucket_id)
t.helpers.retrying({}, function()
t.assert_equals(box.space._bucket:select(bucket_id)[1].status,
'active')
end)
end, {bucket_id})
end

local function move_bucket(src_storage, dest_storage, bucket_id)
src_storage:exec(function(bucket_id, replicaset_id)
local res, err = ivshard.storage.bucket_send(bucket_id, replicaset_id)
Collaborator: Please, give it a long timeout. Otherwise this test will be flaky.

t.assert(res and not err, 'Error during transferring bucket')
end, {bucket_id, dest_storage:replicaset_uuid()})
wait_for_bucket_is_transferred(src_storage, dest_storage, bucket_id)
end
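
Per the reviewer's note about flakiness, a sketch of the same call with an explicit long timeout. The {timeout = <seconds>} option for bucket_send() is an assumption here (it is not shown in this diff), and 30 seconds is an arbitrary illustrative value.

-- Hedged sketch, not part of the submitted diff: pass a generous timeout so a
-- slow CI machine does not make the synchronous transfer fail spuriously.
src_storage:exec(function(bucket_id, replicaset_id)
    local res, err = ivshard.storage.bucket_send(
        bucket_id, replicaset_id, {timeout = 30})
    t.assert(res and not err, 'Error during transferring bucket')
end, {bucket_id, dest_storage:replicaset_uuid()})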

--
-- Reduce spam of "Finish bucket recovery step" logs and add logging of
-- recovered buckets in recovery service (gh-212).
--
test_group.test_no_logs_while_unsuccess_recovery = function(g)
g.replica_2_a:exec(function()
ivshard.storage.internal.errinj.ERRINJ_RECEIVE_PARTIALLY = true
rawset(_G, 'old_call', ivshard.storage._call)
ivshard.storage._call = function(service_name, ...)
if service_name == 'recovery_bucket_stat' then
return error('TimedOut')
end
return _G.old_call(service_name, ...)
end
end)
local hanged_bucket_id_1 = vtest.storage_first_bucket(g.replica_1_a)
start_partial_bucket_move(g.replica_1_a, g.replica_2_a, hanged_bucket_id_1)
local hanged_bucket_id_2 = vtest.storage_first_bucket(g.replica_1_a)
Collaborator: hanged -> hung.

start_partial_bucket_move(g.replica_1_a, g.replica_2_a, hanged_bucket_id_2)
t.helpers.retrying({}, function()
g.replica_1_a:exec(function() ivshard.storage.recovery_wakeup() end)
t.assert(g.replica_1_a:grep_log('Error during recovery of bucket'))
end)
t.assert_not(g.replica_1_a:grep_log('Finish bucket recovery step, 0'))
g.replica_2_a:exec(function()
ivshard.storage.internal.errinj.ERRINJ_RECEIVE_PARTIALLY = false
ivshard.storage._call = _G.old_call
end)
t.helpers.retrying({timeout = 60}, function()
g.replica_2_a:exec(function()
ivshard.storage.garbage_collector_wakeup()
ivshard.storage.recovery_wakeup()
end)
g.replica_1_a:exec(function() ivshard.storage.recovery_wakeup() end)
-- In some rare cases the recovery service can recover buckets one by one.
-- As a result we get multiple "Finish bucket recovery" and "Recovered
-- buckets" logs with different bucket ids and bucket counts. That is why we
-- grep for the general log messages, without bucket counts and bucket ids,
-- to avoid flakiness.
t.assert(g.replica_1_a:grep_log('Finish bucket recovery step'))
t.assert(g.replica_1_a:grep_log('Recovered buckets'))
end)
wait_for_bucket_is_transferred(g.replica_2_a, g.replica_1_a,
hanged_bucket_id_1)
wait_for_bucket_is_transferred(g.replica_2_a, g.replica_1_a,
hanged_bucket_id_2)
end

--
-- Add logging of routes in rebalancer service (gh-212).
--
test_group.test_rebalancer_routes_logging = function(g)
local moved_bucket_from_2 = vtest.storage_first_bucket(g.replica_2_a)
move_bucket(g.replica_2_a, g.replica_1_a, moved_bucket_from_2)
local moved_bucket_from_3 = vtest.storage_first_bucket(g.replica_3_a)
move_bucket(g.replica_3_a, g.replica_1_a, moved_bucket_from_3)
t.helpers.retrying({timeout = 60}, function()
g.replica_1_a:exec(function() ivshard.storage.rebalancer_wakeup() end)
Collaborator: If this is the test case for which you enabled the rebalancer in the first commit, then please 1) enable the rebalancer here, and 2) disable it at the end of the test case.

t.assert(g.replica_1_a:grep_log('Apply rebalancer routes with 1 ' ..
'workers'))
end)
local rebalancer_routes_msg = string.format(
"{\"%s\":{\"%s\":1}}", g.replica_1_a:replicaset_uuid(),
g.replica_2_a:replicaset_uuid())
t.helpers.retrying({}, function()
t.assert(g.replica_1_a:grep_log(rebalancer_routes_msg))
g.replica_1_a:exec(function() ivshard.storage.rebalancer_wakeup() end)
g.replica_1_a:grep_log('The cluster is balanced ok.')
end)
end
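
A sketch of the reviewer's suggestion above to scope the rebalancer to this test case. vtest.cluster_rebalancer_disable() appears earlier in this diff; the matching cluster_rebalancer_enable() helper is assumed here, not shown in the change.

-- Hedged sketch, not part of the submitted diff.
test_group.test_rebalancer_routes_logging = function(g)
    -- Enable the rebalancer only for this test case (assumed helper).
    vtest.cluster_rebalancer_enable(g)
    -- ... the test body shown above goes here ...
    -- Restore the disabled state before leaving the test.
    vtest.cluster_rebalancer_disable(g)
end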

--
-- Add replicaset.id into rebalancer_request_state errors (gh-212).
--
test_group.test_no_log_spam_when_buckets_no_active = function(g)
local moved_bucket = vtest.storage_first_bucket(g.replica_1_a)
move_bucket(g.replica_1_a, g.replica_2_a, moved_bucket)
vtest.storage_stop(g.replica_2_a)
local err_log = string.format('Error during downloading rebalancer ' ..
'states:.*"replicaset_id":"%s"',
g.replica_2_a:replicaset_uuid())
t.helpers.retrying({timeout = 60}, function()
g.replica_1_a:exec(function() ivshard.storage.rebalancer_wakeup() end)
t.assert(g.replica_1_a:grep_log(err_log))
end)
vtest.storage_start(g.replica_2_a, global_cfg)
move_bucket(g.replica_2_a, g.replica_1_a, moved_bucket)
end
16 changes: 13 additions & 3 deletions test/unit/rebalancer.result
@@ -290,9 +290,11 @@ build_routes(replicasets)
vshard.storage.internal.is_master = true
---
...
get_state = vshard.storage._rebalancer_request_state
---
...
get_state = function() \
local res, err = vshard.storage._rebalancer_request_state() \
if res == nil then err.trace = nil end \
return res, err \
end \
_bucket = box.schema.create_space('_bucket')
---
...
@@ -318,13 +320,21 @@ get_state()
---
- bucket_active_count: 2
bucket_pinned_count: 0
- null
...
_bucket:replace{1, consts.BUCKET.RECEIVING}
---
- [1, 'receiving']
...
get_state()
---
- null
- buckets_state: receiving
code: 42
replica_id: _
type: ShardingError
message: Replica _ has receiving buckets during rebalancing
name: REBALANCER_INVALID_STATE
...
vshard.storage.internal.is_master = false
---
6 changes: 5 additions & 1 deletion test/unit/rebalancer.test.lua
@@ -76,7 +76,11 @@ build_routes(replicasets)
-- Test rebalancer local state.
--
vshard.storage.internal.is_master = true
get_state = vshard.storage._rebalancer_request_state
get_state = function() \
local res, err = vshard.storage._rebalancer_request_state() \
if res == nil then err.trace = nil end \
return res, err \
end \
_bucket = box.schema.create_space('_bucket')
pk = _bucket:create_index('pk')
status = _bucket:create_index('status', {parts = {{2, 'string'}}, unique = false})
5 changes: 5 additions & 0 deletions vshard/error.lua
@@ -207,6 +207,11 @@ local error_message_template = {
msg = 'Mismatch server name: expected "%s", but got "%s"',
args = {'expected_name', 'actual_name'},
},
[42] = {
name = 'REBALANCER_INVALID_STATE',
msg = 'Replica %s has %s buckets during rebalancing',
args = {'replica_id', 'buckets_state'}
}
}
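
For context, a sketch of how the new error code might be raised elsewhere in vshard. The call site is not part of this diff, and lerror.vshard()/lerror.code are assumed from vshard's existing error helpers; the argument order follows the template's args list ('replica_id', then 'buckets_state').

-- Hedged sketch, not part of the submitted diff.
local lerror = require('vshard.error')
local replica_id = 'replica_1_a'  -- illustrative placeholder
local err = lerror.vshard(lerror.code.REBALANCER_INVALID_STATE,
                          replica_id, 'receiving')
-- err.message -> 'Replica replica_1_a has receiving buckets during rebalancing'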

--