Improve logging of rebalancer and recovery #586
base: master
Changes from all commits: 271ba02, 93c964a, ab86c7f, dcfa98a
@@ -101,3 +101,141 @@ test_group.test_manual_bucket_send_doubled_buckets = function(g)
         ilt.assert_equals(box.space._bucket:get(bid), nil)
     end, {bid})
 end
+
+local rebalancer_recovery_group = t.group('rebalancer-recovery-logging')
+
+local function start_bucket_move(src_storage, dest_storage, bucket_id)
+    src_storage:exec(function(bucket_id, replicaset_id)
+        ivshard.storage.bucket_send(bucket_id, replicaset_id)
+    end, {bucket_id, dest_storage:replicaset_uuid()})
+    dest_storage:exec(function(bucket_id)
+        t.helpers.retrying({timeout = 10}, function()
+            t.assert(box.space._bucket:select(bucket_id))
+        end)
+    end, {bucket_id})
+end
+
+local function wait_for_bucket_is_transferred(src_storage, dest_storage,
+                                              bucket_id)
+    src_storage:exec(function(bucket_id)
+        t.helpers.retrying({}, function()
+            t.assert_equals(box.space._bucket:select(bucket_id), {})
+        end)
+    end, {bucket_id})
+    dest_storage:exec(function(bucket_id)
+        t.helpers.retrying({}, function()
+            t.assert_equals(box.space._bucket:select(bucket_id)[1].status,
+                            'active')
+        end)
+    end, {bucket_id})
+end
+
+rebalancer_recovery_group.before_all(function(g)
+    global_cfg = vtest.config_new(cfg_template)
+    vtest.cluster_new(g, global_cfg)
+    g.router = vtest.router_new(g, 'router', global_cfg)
+    vtest.cluster_bootstrap(g, global_cfg)
+    vtest.cluster_wait_vclock_all(g)
+
+    vtest.cluster_exec_each_master(g, function()
+        box.schema.create_space('test_space')
+        box.space.test_space:format({
+            {name = 'pk', type = 'unsigned'},
+            {name = 'bucket_id', type = 'unsigned'},
+        })
+        box.space.test_space:create_index('primary', {parts = {'pk'}})
+        box.space.test_space:create_index(
+            'bucket_id', {parts = {'bucket_id'}, unique = false})
+    end)
+end)
+
+rebalancer_recovery_group.after_all(function(g)
+    g.cluster:drop()
+end)
+
+--
+-- Improve logging of rebalancer and recovery (gh-212).
+--
+rebalancer_recovery_group.test_no_logs_while_unsuccess_recovery = function(g)
+    g.replica_2_a:exec(function()
+        ivshard.storage.internal.errinj.ERRINJ_RECEIVE_PARTIALLY = true
+        rawset(_G, 'old_call', ivshard.storage._call)
+        ivshard.storage._call = function(service_name, ...)
+            if service_name == 'recovery_bucket_stat' then
+                return error('TimedOut')
+            end
+            return _G.old_call(service_name, ...)
+        end
+    end)
+    local hanged_bucket_id_1 = vtest.storage_first_bucket(g.replica_1_a)
+    start_bucket_move(g.replica_1_a, g.replica_2_a, hanged_bucket_id_1)
+    local hanged_bucket_id_2 = vtest.storage_first_bucket(g.replica_1_a)
+    start_bucket_move(g.replica_1_a, g.replica_2_a, hanged_bucket_id_2)
+    t.helpers.retrying({}, function()
+        t.assert(g.replica_1_a:grep_log('Error during recovery of bucket 1'))
+    end)
+    t.assert_not(g.replica_1_a:grep_log('Finish bucket recovery step, 0'))
+    g.replica_2_a:exec(function()
+        ivshard.storage.internal.errinj.ERRINJ_RECEIVE_PARTIALLY = false
+        ivshard.storage._call = _G.old_call
+    end)
+    t.helpers.retrying({timeout = 60}, function()
+        g.replica_2_a:exec(function()
+            ivshard.storage.garbage_collector_wakeup()
+            ivshard.storage.recovery_wakeup()
+        end)
+        g.replica_1_a:exec(function() ivshard.storage.recovery_wakeup() end)
+        -- In some rare cases the recovery service recovers buckets one by
+        -- one. As a result we get multiple 'Finish bucket recovery' and
+        -- 'Recovered buckets' logs with different bucket ids and bucket
+        -- counts. That is why we grep for the generic messages without
+        -- bucket counts and ids to avoid flakiness.
+        t.assert(g.replica_1_a:grep_log('Finish bucket recovery step'))
+        t.assert(g.replica_1_a:grep_log('Recovered buckets'))
+    end)
+    wait_for_bucket_is_transferred(g.replica_2_a, g.replica_1_a,
+                                   hanged_bucket_id_1)
+    wait_for_bucket_is_transferred(g.replica_2_a, g.replica_1_a,
+                                   hanged_bucket_id_2)
+end

Review comment on the first retrying block above: "Nit: it's better to wakeup the ..."
+
+rebalancer_recovery_group.test_rebalancer_routes_logging = function(g)
+    local moved_bucket_from_2 = vtest.storage_first_bucket(g.replica_2_a)
+    start_bucket_move(g.replica_2_a, g.replica_1_a, moved_bucket_from_2)
+    local moved_bucket_from_3 = vtest.storage_first_bucket(g.replica_3_a)
+    start_bucket_move(g.replica_3_a, g.replica_1_a, moved_bucket_from_3)
+    t.helpers.retrying({timeout = 60}, function()
+        g.replica_1_a:exec(function()
+            ivshard.storage.rebalancer_wakeup()
+        end)
+        t.assert(g.replica_1_a:grep_log('Apply rebalancer routes with 1 ' ..
+                                        'workers'))
+    end)
+    local rebalancer_routes_msg = string.format(
+        "{\"%s\":{\"%s\":1,\"%s\":1}}", g.replica_1_a:replicaset_uuid(),
+        g.replica_3_a:replicaset_uuid(), g.replica_2_a:replicaset_uuid())
+    t.assert(g.replica_1_a:grep_log(rebalancer_routes_msg))
+    t.helpers.retrying({}, function()
+        t.assert(g.replica_1_a:grep_log('The cluster is balanced ok.'))
+    end)
+end
+
+rebalancer_recovery_group.test_no_log_spam_when_buckets_no_active = function(g)
+    local moved_bucket = vtest.storage_first_bucket(g.replica_2_a)
+    start_bucket_move(g.replica_1_a, g.replica_2_a, moved_bucket)
+    wait_for_bucket_is_transferred(g.replica_1_a, g.replica_2_a, moved_bucket)
+    vtest.storage_stop(g.replica_2_a)
+    local buckets_not_active = string.format('Some buckets in replicaset ' ..
+                                             '%s are not active',
+                                             g.replica_2_a:replicaset_uuid())
+    t.helpers.retrying({timeout = 60}, function()
+        g.replica_1_a:exec(function()
+            ivshard.storage.rebalancer_wakeup()
+        end)
+        t.assert(g.replica_1_a:grep_log(buckets_not_active))
+    end)
+    vtest.storage_start(g.replica_2_a, global_cfg)
+    start_bucket_move(g.replica_2_a, g.replica_1_a, moved_bucket)
+    wait_for_bucket_is_transferred(g.replica_2_a, g.replica_1_a, moved_bucket)
+end

@@ -5,6 +5,7 @@ local lmsgpack = require('msgpack')
 local netbox = require('net.box') -- for net.box:self()
 local trigger = require('internal.trigger')
 local ffi = require('ffi')
+local json_encode = require('json').encode
 local yaml_encode = require('yaml').encode
 local fiber_clock = lfiber.clock
 local fiber_yield = lfiber.yield
@@ -931,6 +932,7 @@ local function recovery_step_by_type(type)
     local recovered = 0
     local total = 0
     local start_format = 'Starting %s buckets recovery step'
+    local recovered_buckets = {SENT = {}, GARBAGE = {}, ACTIVE = {}}
     for _, bucket in _bucket.index.status:pairs(type) do
         lfiber.testcancel()
         total = total + 1
@@ -990,22 +992,26 @@ local function recovery_step_by_type(type)
         if recovery_local_bucket_is_sent(bucket, remote_bucket) then
             _bucket:update({bucket_id}, {{'=', 2, BSENT}})
             recovered = recovered + 1
+            table.insert(recovered_buckets['SENT'], bucket_id)
         elseif recovery_local_bucket_is_garbage(bucket, remote_bucket) then
             _bucket:update({bucket_id}, {{'=', 2, BGARBAGE}})
             recovered = recovered + 1
+            table.insert(recovered_buckets['GARBAGE'], bucket_id)
         elseif recovery_local_bucket_is_active(bucket, remote_bucket) then
             _bucket:replace({bucket_id, BACTIVE})
             recovered = recovered + 1
+            table.insert(recovered_buckets['ACTIVE'], bucket_id)
         elseif is_step_empty then
             log.info('Bucket %s is %s local and %s on replicaset %s, waiting',
                      bucket_id, bucket.status, remote_bucket.status, peer_id)
         end
         is_step_empty = false
         ::continue::
     end
-    if not is_step_empty then
+    if recovered > 0 then
         log.info('Finish bucket recovery step, %d %s buckets are recovered '..
-                 'among %d', recovered, type, total)
+                 'among %d. Recovered buckets: %s', recovered, type, total,
+                 json_encode(recovered_buckets))
     end
     return total, recovered
 end
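For a feel of the new message, here is a minimal standalone sketch of the format string above, using Tarantool's built-in json module; the bucket ids, counts, and step type are made up for illustration.

-- Standalone sketch; bucket ids, counts, and the step type are made up.
local json_encode = require('json').encode

local recovered_buckets = {SENT = {1, 2}, GARBAGE = {}, ACTIVE = {3}}
print(string.format('Finish bucket recovery step, %d %s buckets are ' ..
                    'recovered among %d. Recovered buckets: %s',
                    3, 'sending', 3, json_encode(recovered_buckets)))
-- Prints something like (JSON key order is not guaranteed; empty tables
-- encode as []):
-- Finish bucket recovery step, 3 sending buckets are recovered among 3.
--   Recovered buckets: {"SENT":[1,2],"GARBAGE":[],"ACTIVE":[3]}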
@@ -2794,7 +2800,7 @@ local function rebalancer_download_states()
            replicaset, 'vshard.storage.rebalancer_request_state', {},
            {timeout = consts.REBALANCER_GET_STATE_TIMEOUT})
        if state == nil then
-           return
+           return nil, replicaset.id
        end
        local bucket_count = state.bucket_active_count +
                             state.bucket_pinned_count

Review comment on the changed return: "This should be carefully rebased, now error is returned from the ... In that err (or any other there) we'll just set the ... (line 1653 in b4adaea). And we'll print the whole err, not just the replicaset's id."
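The concern hinges on pcall's return layout: the error object occupies the first result slot after the status, so with the swapped return order an error raised inside rebalancer_download_states() lands in total_bucket_active_count while replicasets stays nil. A minimal standalone sketch, with download_states as a stand-in for the real function:

-- Standalone sketch; download_states stands in for
-- rebalancer_download_states() with the new return order.
local function download_states(fail)
    if fail then
        error('TimedOut')
    end
    -- New order from the diff: active bucket count first, replicasets second.
    return 1500, {rs_1 = {}, rs_2 = {}}
end

-- Success: status is true, the count and the replicasets map follow.
local status, count, replicasets = pcall(download_states, false)
assert(status and count == 1500 and replicasets ~= nil)

-- Failure: the error object lands in `count`, `replicasets` is nil, so
-- logging `replicasets` in the error branch would print nil rather than
-- the error itself.
status, count, replicasets = pcall(download_states, true)
assert(status == false and replicasets == nil)
print(count) -- e.g. sketch.lua:5: TimedOut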
@@ -2809,7 +2815,7 @@ local function rebalancer_download_states()
     end
     local sum = total_bucket_active_count + total_bucket_locked_count
     if sum == M.total_bucket_count then
-        return replicasets, total_bucket_active_count
+        return total_bucket_active_count, replicasets
     else
         log.info('Total active bucket count is not equal to total. '..
                  'Possibly a boostrap is not finished yet. Expected %d, but '..
@@ -2833,18 +2839,19 @@ local function rebalancer_service_f(service)
         end
         service:set_activity('downloading states')
         lfiber.testcancel()
-        local status, replicasets, total_bucket_active_count =
+        local status, total_bucket_active_count, replicasets =
             pcall(rebalancer_download_states)
         if M.module_version ~= module_version then
             return
         end
-        if not status or replicasets == nil then
+        if not status or total_bucket_active_count == nil then
             if not status then
                 log.error(service:set_status_error(
                     'Error during downloading rebalancer states: %s',
                     replicasets))
             end
-            log.info('Some buckets are not active, retry rebalancing later')
+            log.info('Some buckets in replicaset %s are not active, retry ' ..
+                     'rebalancing later', replicasets)
             service:set_activity('idling')
             lfiber.testcancel()
             lfiber.sleep(consts.REBALANCER_WORK_INTERVAL)
@@ -2897,8 +2904,9 @@ local function rebalancer_service_f(service)
                 goto continue
             end
         end
-        log.info('Rebalance routes are sent. Schedule next wakeup after '..
-                 '%f seconds', consts.REBALANCER_WORK_INTERVAL)
+        log.info('The following rebalancer routes were sent: %s. ' ..
+                 'Schedule next wakeup after %f seconds', json_encode(routes),
+                 consts.REBALANCER_WORK_INTERVAL)
         service:set_activity('idling')
         lfiber.testcancel()
         lfiber.sleep(consts.REBALANCER_WORK_INTERVAL)
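For illustration, a minimal sketch of what json_encode(routes) produces in this log line. Judging by the test above, routes maps a source replicaset UUID to per-destination bucket counts; the UUIDs and the interval value are made-up stand-ins.

-- Standalone sketch; the UUIDs are placeholders, not real replicaset ids,
-- and 10 stands in for consts.REBALANCER_WORK_INTERVAL.
local json_encode = require('json').encode

local routes = {
    ['aaaaaaaa-0000-0000-0000-000000000001'] = {
        ['aaaaaaaa-0000-0000-0000-000000000002'] = 1,
        ['aaaaaaaa-0000-0000-0000-000000000003'] = 1,
    },
}
-- Mirrors the new log call; JSON key order is not guaranteed.
print(string.format('The following rebalancer routes were sent: %s. ' ..
                    'Schedule next wakeup after %f seconds',
                    json_encode(routes), 10))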