This repository was archived by the owner on May 25, 2021. It is now read-only.

Commit ae00a5a
Improve compaction task status updates
Previously, the emsort-related operations did not update the compaction task status. For large databases this led to very long waits while the compaction task sat at 100%. This change adds progress reports to the steps that sort and copy document ids back into the database file.
1 parent: 21c8d37
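The batched counters introduced below funnel through couch_db_updater's pre-existing update_compact_task/1 helper, which this diff calls but does not show. As a rough sketch of what that helper does (reconstructed for illustration, not part of this commit): it folds a batch of completed changes into changes_done and recomputes the percentage via couch_task_status.

% Sketch of the pre-existing update_compact_task/1 helper; reconstructed
% for illustration, not part of this commit.
update_compact_task(NumChanges) ->
    [Changes0, Total] = couch_task_status:get([changes_done, total_changes]),
    Changes = Changes0 + NumChanges,
    % Guard against divide-by-zero when there are no changes to copy.
    Progress = case Total of
        0 -> 0;
        _ -> (Changes * 100) div Total
    end,
    couch_task_status:update([{changes_done, Changes}, {progress, Progress}]).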

2 files changed: +134 -34 lines changed


src/couch_db_updater.erl

Lines changed: 88 additions & 9 deletions
@@ -34,6 +34,8 @@
     infos
 }).

+-define(COMP_DOCID_BATCH_SIZE, 1000).
+
 init({DbName, Filepath, Fd, Options}) ->
     erlang:put(io_priority, {db_update, DbName}),
     case lists:member(create, Options) of
@@ -1108,10 +1110,10 @@ copy_docs(Db, #db{fd = DestFd} = NewDb, MixedInfos, Retry) ->
     NewDb#db{id_tree=IdEms, seq_tree=SeqTree}.


-copy_compact(Db, NewDb0, Retry) ->
+copy_compact(Db, NewDb0, Retry, TotalChanges) ->
     Compression = couch_compress:get_compression_method(),
     NewDb = NewDb0#db{compression=Compression},
-    TotalChanges = couch_db:count_changes_since(Db, NewDb#db.update_seq),
+
     BufferSize = list_to_integer(
         config:get("database_compaction", "doc_buffer_size", "524288")),
     CheckpointAfter = couch_util:to_integer(
@@ -1147,6 +1149,7 @@ copy_compact(Db, NewDb0, Retry) ->
     TaskProps0 = [
         {type, database_compaction},
         {database, Db#db.name},
+        {phase, seq_tree},
         {progress, 0},
         {changes_done, 0},
         {total_changes, TotalChanges}
@@ -1193,17 +1196,19 @@ start_copy_compact(#db{}=Db) ->
         open_compaction_files(Name, Header, Filepath, Options),
     erlang:monitor(process, MFd),

+    TotalChanges = couch_db:count_changes_since(Db, NewDb#db.update_seq),
+
     % This is a bit worrisome. init_db/4 will monitor the data fd
     % but it doesn't know about the meta fd. For now I'll maintain
     % that the data fd is the old normal fd and meta fd is special
     % and hope everything works out for the best.
     unlink(DFd),

     NewDb1 = copy_purge_info(Db, NewDb),
-    NewDb2 = copy_compact(Db, NewDb1, Retry),
-    NewDb3 = sort_meta_data(NewDb2),
+    NewDb2 = copy_compact(Db, NewDb1, Retry, TotalChanges),
+    NewDb3 = sort_meta_data(NewDb2, TotalChanges),
     NewDb4 = commit_compaction_data(NewDb3),
-    NewDb5 = copy_meta_data(NewDb4),
+    NewDb5 = copy_meta_data(NewDb4, TotalChanges),
     NewDb6 = sync_header(NewDb5, db_to_header(NewDb5, NewDb5#db.header)),
     close_db(NewDb6),
@@ -1323,12 +1328,84 @@ bind_id_tree(Db, Fd, State) ->
     Db#db{id_tree=IdBtree}.


-sort_meta_data(Db0) ->
-    {ok, Ems} = couch_emsort:merge(Db0#db.id_tree),
-    Db0#db{id_tree=Ems}.
+sort_meta_data(Db0, TotalChanges) ->
+    couch_task_status:update([
+        {phase, sort_ids_init},
+        {total_changes, TotalChanges},
+        {changes_done, 0},
+        {progress, 0}
+    ]),
+    Ems0 = Db0#db.id_tree,
+    Options = [
+        {event_cb, fun emsort_cb/3},
+        {event_st, {init, 0, 0}}
+    ],
+    Ems1 = couch_emsort:set_options(Ems0, Options),
+    {ok, Ems2} = couch_emsort:merge(Ems1),
+    Db0#db{id_tree=Ems2}.
+
+
+emsort_cb(_Ems, {merge, chain}, {init, Copied, Nodes}) ->
+    {init, Copied, Nodes + 1};
+emsort_cb(_Ems, row_copy, {init, Copied, Nodes})
+        when Copied >= ?COMP_DOCID_BATCH_SIZE ->
+    update_compact_task(Copied + 1),
+    {init, 0, Nodes};
+emsort_cb(_Ems, row_copy, {init, Copied, Nodes}) ->
+    {init, Copied + 1, Nodes};
+emsort_cb(Ems, {merge_start, reverse}, {init, Copied, Nodes}) ->
+    BBChunkSize = couch_emsort:get_bb_chunk_size(Ems),
+
+    % Subtract one because we already finished the first
+    % iteration when we were counting the number of nodes
+    % in the backbone.
+    Iters = calculate_sort_iters(Nodes, BBChunkSize, 0) - 1,
+
+    % Compaction retries mean we may have copied more than
+    % doc count rows. This accounts for that by using the
+    % number we've actually copied.
+    [PrevCopied] = couch_task_status:get([changes_done]),
+    TotalCopied = PrevCopied + Copied,
+
+    couch_task_status:update([
+        {phase, sort_ids},
+        {total_changes, Iters * TotalCopied},
+        {changes_done, 0},
+        {progress, 0}
+    ]),
+    0;
+
+emsort_cb(_Ems, row_copy, Copied)
+        when is_integer(Copied), Copied > ?COMP_DOCID_BATCH_SIZE ->
+    update_compact_task(Copied + 1),
+    0;
+
+emsort_cb(_Ems, row_copy, Copied) when is_integer(Copied) ->
+    Copied + 1;

+emsort_cb(_Ems, _Event, St) ->
+    St.

-copy_meta_data(#db{fd=Fd, header=Header}=Db) ->
+
+calculate_sort_iters(Nodes, BBChunk, Count) when Nodes < BBChunk ->
+    Count;
+calculate_sort_iters(Nodes0, BBChunk, Count) when BBChunk > 1 ->
+    Calc = fun(N0) ->
+        N1 = N0 div BBChunk,
+        N1 + if N1 rem BBChunk == 0 -> 0; true -> 1 end
+    end,
+    Nodes1 = Calc(Nodes0),
+    Nodes2 = Calc(Nodes1),
+    calculate_sort_iters(Nodes2, BBChunk, Count + 2).
+
+
+copy_meta_data(#db{fd=Fd, header=Header}=Db, TotalChanges) ->
+    couch_task_status:update([
+        {phase, copy_ids},
+        {changes_done, 0},
+        {total_changes, TotalChanges},
+        {progress, 0}
+    ]),
     Src = Db#db.id_tree,
     DstState = couch_db_header:id_tree_state(Header),
     {ok, IdTree0} = couch_btree:open(DstState, Fd, [
@@ -1348,6 +1425,7 @@ copy_meta_data(#db{fd=Fd, header=Header}=Db) ->
     {ok, SeqTree} = couch_btree:add_remove(
         Acc#merge_st.seq_tree, [], Acc#merge_st.rem_seqs
     ),
+    update_compact_task(length(Acc#merge_st.infos)),
     Db#db{id_tree=IdTree, seq_tree=SeqTree}.
@@ -1359,6 +1437,7 @@ merge_docids(Iter, #merge_st{infos=Infos}=Acc) when length(Infos) > 1000 ->
     } = Acc,
     {ok, IdTree1} = couch_btree:add(IdTree0, Infos),
     {ok, SeqTree1} = couch_btree:add_remove(SeqTree0, [], RemSeqs),
+    update_compact_task(length(Infos)),
     Acc1 = Acc#merge_st{
         id_tree=IdTree1,
         seq_tree=SeqTree1,
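
A note on the sort_ids total above: calculate_sort_iters/3 predicts how many merge passes decimate/1 will run, two per round, shrinking the backbone by roughly a factor of bb_chunk each pass. A worked trace with the default backbone chunk size of 10 and a hypothetical 1000-node backbone (numbers are illustrative, not from the commit):

%   calculate_sort_iters(1000, 10, 0): 1000 -> 100 -> 10 nodes, Count = 2
%   calculate_sort_iters(10, 10, 2):   10 -> 2 -> 0 nodes,      Count = 4
%   calculate_sort_iters(0, 10, 4):    0 < 10, so return 4
% Four merge passes in total; emsort_cb then subtracts one because the
% first pass already ran while the backbone nodes were being counted.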

src/couch_emsort.erl

Lines changed: 46 additions & 25 deletions
@@ -129,15 +129,18 @@
 % CA3 CD3
 %

--export([open/1, open/2, get_fd/1, get_state/1]).
+-export([open/1, open/2, set_options/2, get_fd/1, get_state/1]).
+-export([get_bb_chunk_size/1]).
 -export([add/2, merge/1, sort/1, iter/1, next/1]).


 -record(ems, {
     fd,
     root,
     bb_chunk = 10,
-    chain_chunk = 100
+    chain_chunk = 100,
+    event_cb,
+    event_st
 }).

@@ -156,7 +159,11 @@ set_options(Ems, [{root, Root} | Rest]) ->
 set_options(Ems, [{chain_chunk, Count} | Rest]) when is_integer(Count) ->
     set_options(Ems#ems{chain_chunk=Count}, Rest);
 set_options(Ems, [{back_bone_chunk, Count} | Rest]) when is_integer(Count) ->
-    set_options(Ems#ems{bb_chunk=Count}, Rest).
+    set_options(Ems#ems{bb_chunk=Count}, Rest);
+set_options(Ems, [{event_cb, EventCB} | Rest]) when is_function(EventCB, 3) ->
+    set_options(Ems#ems{event_cb=EventCB}, Rest);
+set_options(Ems, [{event_st, EventSt} | Rest]) ->
+    set_options(Ems#ems{event_st=EventSt}, Rest).


 get_fd(#ems{fd=Fd}) ->
@@ -167,6 +174,10 @@ get_state(#ems{root=Root}) ->
     Root.


+get_bb_chunk_size(#ems{bb_chunk = Size}) ->
+    Size.
+
+
 add(Ems, []) ->
     {ok, Ems};
 add(Ems, KVs) ->
@@ -224,51 +235,55 @@ decimate(#ems{root={_BB, nil}}=Ems) ->
     % We have less than bb_chunk backbone pointers so we're
     % good to start streaming KV's back to the client.
     Ems;
-decimate(#ems{root={BB, NextBB}}=Ems) ->
+decimate(#ems{}=Ems0) ->
     % To make sure we have a bounded amount of data in RAM
     % at any given point we first need to decimate the data
     % by performing the first couple iterations of a merge
     % sort writing the intermediate results back to disk.

     % The first pass gives us a sort with pointers linked from
     % largest to smallest.
-    {RevBB, RevNextBB} = merge_back_bone(Ems, small, BB, NextBB),
+    {ok, Ems1} = event_notify(Ems0, {merge_start, forward}),
+    {ok, Ems2} = merge_back_bone(Ems1, small),

     % We have to run a second pass so that links are pointed
     % back from smallest to largest.
-    {FwdBB, FwdNextBB} = merge_back_bone(Ems, big, RevBB, RevNextBB),
+    {ok, Ems3} = event_notify(Ems2, {merge_start, reverse}),
+    {ok, Ems4} = merge_back_bone(Ems3, big),

     % Continue decimating until we have an acceptable bound on
     % the number of keys to use.
-    decimate(Ems#ems{root={FwdBB, FwdNextBB}}).
+    decimate(Ems4).


-merge_back_bone(Ems, Choose, BB, NextBB) ->
-    BBPos = merge_chains(Ems, Choose, BB),
-    merge_rest_back_bone(Ems, Choose, NextBB, {[BBPos], nil}).
+merge_back_bone(#ems{root={BB, NextBB}}=Ems0, Choose) ->
+    {ok, Ems1, BBPos} = merge_chains(Ems0, Choose, BB),
+    merge_rest_back_bone(Ems1, Choose, NextBB, {[BBPos], nil}).


-merge_rest_back_bone(_Ems, _Choose, nil, Acc) ->
-    Acc;
-merge_rest_back_bone(Ems, Choose, BBPos, Acc) ->
-    {ok, {BB, NextBB}} = couch_file:pread_term(Ems#ems.fd, BBPos),
-    NewPos = merge_chains(Ems, Choose, BB),
-    {NewBB, NewPrev} = append_item(Ems, Acc, NewPos, Ems#ems.bb_chunk),
-    merge_rest_back_bone(Ems, Choose, NextBB, {NewBB, NewPrev}).
+merge_rest_back_bone(Ems, _Choose, nil, Acc) ->
+    {ok, Ems#ems{root=Acc}};
+merge_rest_back_bone(Ems0, Choose, BBPos, Acc) ->
+    {ok, {BB, NextBB}} = couch_file:pread_term(Ems0#ems.fd, BBPos),
+    {ok, Ems1, NewPos} = merge_chains(Ems0, Choose, BB),
+    {NewBB, NewPrev} = append_item(Ems1, Acc, NewPos, Ems1#ems.bb_chunk),
+    merge_rest_back_bone(Ems1, Choose, NextBB, {NewBB, NewPrev}).


-merge_chains(Ems, Choose, BB) ->
-    Chains = init_chains(Ems, Choose, BB),
-    merge_chains(Ems, Choose, Chains, {[], nil}).
+merge_chains(Ems0, Choose, BB) ->
+    {ok, Ems1} = event_notify(Ems0, {merge, chain}),
+    Chains = init_chains(Ems1, Choose, BB),
+    merge_chains(Ems1, Choose, Chains, {[], nil}).


 merge_chains(Ems, _Choose, [], ChainAcc) ->
     {ok, CPos, _} = couch_file:append_term(Ems#ems.fd, ChainAcc),
-    CPos;
-merge_chains(#ems{chain_chunk=CC}=Ems, Choose, Chains, Acc) ->
-    {KV, RestChains} = choose_kv(Choose, Ems, Chains),
-    {NewKVs, NewPrev} = append_item(Ems, Acc, KV, CC),
-    merge_chains(Ems, Choose, RestChains, {NewKVs, NewPrev}).
+    {ok, Ems, CPos};
+merge_chains(#ems{chain_chunk=CC}=Ems0, Choose, Chains, Acc) ->
+    {KV, RestChains} = choose_kv(Choose, Ems0, Chains),
+    {NewKVs, NewPrev} = append_item(Ems0, Acc, KV, CC),
+    {ok, Ems1} = event_notify(Ems0, row_copy),
+    merge_chains(Ems1, Choose, RestChains, {NewKVs, NewPrev}).


 init_chains(Ems, Choose, BB) ->
@@ -316,3 +331,9 @@ append_item(Ems, {List, Prev}, Pos, Size) when length(List) >= Size ->
 append_item(_Ems, {List, Prev}, Pos, _Size) ->
     {[Pos | List], Prev}.

+
+event_notify(#ems{event_cb = undefined} = Ems, _) ->
+    {ok, Ems};
+event_notify(#ems{event_cb=EventCB, event_st=EventSt}=Ems, Event) ->
+    NewSt = EventCB(Ems, Event, EventSt),
+    {ok, Ems#ems{event_st=NewSt}}.
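
The event hook is generic, so it can be exercised outside of compaction. The callback receives the #ems{} handle, an event term, and the current event_st, and its return value becomes the new event_st. A minimal sketch, assuming Fd is a handle from couch_file:open/2 and KVs is any list of key/value pairs (trace_emsort is a hypothetical name, not part of this commit):

% Log every emsort event with a running count. Events fire only while
% decimate/1 runs merge passes, so very small data sets may produce
% no events at all.
trace_emsort(Fd, KVs) ->
    {ok, Ems0} = couch_emsort:open(Fd),
    Ems1 = couch_emsort:set_options(Ems0, [
        {event_cb, fun(_Ems, Event, Count) ->
            io:format("emsort event ~p (#~b)~n", [Event, Count]),
            Count + 1
        end},
        {event_st, 0}
    ]),
    {ok, Ems2} = couch_emsort:add(Ems1, KVs),
    {ok, _Ems3} = couch_emsort:merge(Ems2).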
