diff --git a/src/couch_db_updater.erl b/src/couch_db_updater.erl
index 270fffe4..3f3c9198 100644
--- a/src/couch_db_updater.erl
+++ b/src/couch_db_updater.erl
@@ -34,6 +34,8 @@
     infos
 }).
 
+-define(COMP_DOCID_BATCH_SIZE, 1000).
+
 init({DbName, Filepath, Fd, Options}) ->
     erlang:put(io_priority, {db_update, DbName}),
     case lists:member(create, Options) of
@@ -1108,10 +1110,10 @@ copy_docs(Db, #db{fd = DestFd} = NewDb, MixedInfos, Retry) ->
     NewDb#db{id_tree=IdEms, seq_tree=SeqTree}.
 
 
-copy_compact(Db, NewDb0, Retry) ->
+copy_compact(Db, NewDb0, Retry, TotalChanges) ->
     Compression = couch_compress:get_compression_method(),
     NewDb = NewDb0#db{compression=Compression},
-    TotalChanges = couch_db:count_changes_since(Db, NewDb#db.update_seq),
+
     BufferSize = list_to_integer(
         config:get("database_compaction", "doc_buffer_size", "524288")),
     CheckpointAfter = couch_util:to_integer(
@@ -1147,6 +1149,7 @@ copy_compact(Db, NewDb0, Retry) ->
     TaskProps0 = [
         {type, database_compaction},
         {database, Db#db.name},
+        {phase, seq_tree},
         {progress, 0},
         {changes_done, 0},
         {total_changes, TotalChanges}
@@ -1193,6 +1196,8 @@ start_copy_compact(#db{}=Db) ->
         open_compaction_files(Name, Header, Filepath, Options),
     erlang:monitor(process, MFd),
 
+    TotalChanges = couch_db:count_changes_since(Db, NewDb#db.update_seq),
+
     % This is a bit worrisome. init_db/4 will monitor the data fd
     % but it doesn't know about the meta fd. For now I'll maintain
     % that the data fd is the old normal fd and meta fd is special
@@ -1200,10 +1205,10 @@ start_copy_compact(#db{}=Db) ->
     unlink(DFd),
 
     NewDb1 = copy_purge_info(Db, NewDb),
-    NewDb2 = copy_compact(Db, NewDb1, Retry),
-    NewDb3 = sort_meta_data(NewDb2),
+    NewDb2 = copy_compact(Db, NewDb1, Retry, TotalChanges),
+    NewDb3 = sort_meta_data(NewDb2, TotalChanges),
     NewDb4 = commit_compaction_data(NewDb3),
-    NewDb5 = copy_meta_data(NewDb4),
+    NewDb5 = copy_meta_data(NewDb4, TotalChanges),
     NewDb6 = sync_header(NewDb5, db_to_header(NewDb5, NewDb5#db.header)),
     close_db(NewDb6),
 
@@ -1323,12 +1328,84 @@ bind_id_tree(Db, Fd, State) ->
     Db#db{id_tree=IdBtree}.
 
 
-sort_meta_data(Db0) ->
-    {ok, Ems} = couch_emsort:merge(Db0#db.id_tree),
-    Db0#db{id_tree=Ems}.
+sort_meta_data(Db0, TotalChanges) ->
+    couch_task_status:update([
+        {phase, sort_ids_init},
+        {total_changes, TotalChanges},
+        {changes_done, 0},
+        {progress, 0}
+    ]),
+    Ems0 = Db0#db.id_tree,
+    Options = [
+        {event_cb, fun emsort_cb/3},
+        {event_st, {init, 0, 0}}
+    ],
+    Ems1 = couch_emsort:set_options(Ems0, Options),
+    {ok, Ems2} = couch_emsort:merge(Ems1),
+    Db0#db{id_tree=Ems2}.
+
+
+emsort_cb(_Ems, {merge, chain}, {init, Copied, Nodes}) ->
+    {init, Copied, Nodes + 1};
+emsort_cb(_Ems, row_copy, {init, Copied, Nodes})
+        when Copied >= ?COMP_DOCID_BATCH_SIZE ->
+    update_compact_task(Copied + 1),
+    {init, 0, Nodes};
+emsort_cb(_Ems, row_copy, {init, Copied, Nodes}) ->
+    {init, Copied + 1, Nodes};
+emsort_cb(Ems, {merge_start, reverse}, {init, Copied, Nodes}) ->
+    BBChunkSize = couch_emsort:get_bb_chunk_size(Ems),
+
+    % Subtract one because we already finished the first
+    % iteration when we were counting the number of nodes
+    % in the backbone.
+    Iters = calculate_sort_iters(Nodes, BBChunkSize, 0) - 1,
+
+    % Compaction retries mean we may have copied more rows
+    % than the doc count. Account for that by using the
+    % number of rows we actually copied.
+    [PrevCopied] = couch_task_status:get([changes_done]),
+    TotalCopied = PrevCopied + Copied,
+
+    couch_task_status:update([
+        {phase, sort_ids},
+        {total_changes, Iters * TotalCopied},
+        {changes_done, 0},
+        {progress, 0}
+    ]),
+    0;
+
+emsort_cb(_Ems, row_copy, Copied)
+        when is_integer(Copied), Copied > ?COMP_DOCID_BATCH_SIZE ->
+    update_compact_task(Copied + 1),
+    0;
+
+emsort_cb(_Ems, row_copy, Copied) when is_integer(Copied) ->
+    Copied + 1;
+emsort_cb(_Ems, _Event, St) ->
+    St.
 
 
-copy_meta_data(#db{fd=Fd, header=Header}=Db) ->
+
+calculate_sort_iters(Nodes, BBChunk, Count) when Nodes < BBChunk ->
+    Count;
+calculate_sort_iters(Nodes0, BBChunk, Count) when BBChunk > 1 ->
+    Calc = fun(N0) ->
+        N1 = N0 div BBChunk,
+        N1 + if N1 rem BBChunk == 0 -> 0; true -> 1 end
+    end,
+    Nodes1 = Calc(Nodes0),
+    Nodes2 = Calc(Nodes1),
+    calculate_sort_iters(Nodes2, BBChunk, Count + 2).
+
+
+copy_meta_data(#db{fd=Fd, header=Header}=Db, TotalChanges) ->
+    couch_task_status:update([
+        {phase, copy_ids},
+        {changes_done, 0},
+        {total_changes, TotalChanges},
+        {progress, 0}
+    ]),
     Src = Db#db.id_tree,
     DstState = couch_db_header:id_tree_state(Header),
     {ok, IdTree0} = couch_btree:open(DstState, Fd, [
@@ -1348,6 +1425,7 @@ copy_meta_data(#db{fd=Fd, header=Header}=Db) ->
     {ok, SeqTree} = couch_btree:add_remove(
         Acc#merge_st.seq_tree, [], Acc#merge_st.rem_seqs
     ),
+    update_compact_task(length(Acc#merge_st.infos)),
     Db#db{id_tree=IdTree, seq_tree=SeqTree}.
 
 
@@ -1359,6 +1437,7 @@ merge_docids(Iter, #merge_st{infos=Infos}=Acc) when length(Infos) > 1000 ->
     } = Acc,
     {ok, IdTree1} = couch_btree:add(IdTree0, Infos),
     {ok, SeqTree1} = couch_btree:add_remove(SeqTree0, [], RemSeqs),
+    update_compact_task(length(Infos)),
     Acc1 = Acc#merge_st{
         id_tree=IdTree1,
         seq_tree=SeqTree1,
diff --git a/src/couch_emsort.erl b/src/couch_emsort.erl
index 2a25a232..d7f1b2be 100644
--- a/src/couch_emsort.erl
+++ b/src/couch_emsort.erl
@@ -129,7 +129,8 @@
 %     CA3     CD3
 %
 
--export([open/1, open/2, get_fd/1, get_state/1]).
+-export([open/1, open/2, set_options/2, get_fd/1, get_state/1]).
+-export([get_bb_chunk_size/1]).
 -export([add/2, merge/1, sort/1, iter/1, next/1]).
 
 
@@ -137,7 +138,9 @@
     fd,
     root,
     bb_chunk = 10,
-    chain_chunk = 100
+    chain_chunk = 100,
+    event_cb,
+    event_st
 }).
 
 
@@ -156,7 +159,11 @@ set_options(Ems, [{root, Root} | Rest]) ->
 set_options(Ems, [{chain_chunk, Count} | Rest]) when is_integer(Count) ->
     set_options(Ems#ems{chain_chunk=Count}, Rest);
 set_options(Ems, [{back_bone_chunk, Count} | Rest]) when is_integer(Count) ->
-    set_options(Ems#ems{bb_chunk=Count}, Rest).
+    set_options(Ems#ems{bb_chunk=Count}, Rest);
+set_options(Ems, [{event_cb, EventCB} | Rest]) when is_function(EventCB, 3) ->
+    set_options(Ems#ems{event_cb=EventCB}, Rest);
+set_options(Ems, [{event_st, EventSt} | Rest]) ->
+    set_options(Ems#ems{event_st=EventSt}, Rest).
 
 
 get_fd(#ems{fd=Fd}) ->
@@ -167,6 +174,10 @@ get_state(#ems{root=Root}) ->
     Root.
 
 
+get_bb_chunk_size(#ems{bb_chunk = Size}) ->
+    Size.
+
+
 add(Ems, []) ->
     {ok, Ems};
 add(Ems, KVs) ->
@@ -224,7 +235,7 @@ decimate(#ems{root={_BB, nil}}=Ems) ->
     % We have fewer than bb_chunk backbone pointers so we're
     % good to start streaming KVs back to the client.
     Ems;
-decimate(#ems{root={BB, NextBB}}=Ems) ->
+decimate(#ems{}=Ems0) ->
     % To make sure we have a bounded amount of data in RAM
     % at any given point we first need to decimate the data
     % by performing the first couple iterations of a merge
@@ -232,43 +243,47 @@ decimate(#ems{root={BB, NextBB}}=Ems) ->
     % The first pass gives us a sort with pointers linked from
     % largest to smallest.
-    {RevBB, RevNextBB} = merge_back_bone(Ems, small, BB, NextBB),
+    {ok, Ems1} = event_notify(Ems0, {merge_start, forward}),
+    {ok, Ems2} = merge_back_bone(Ems1, small),
 
     % We have to run a second pass so that links are pointed
     % back from smallest to largest.
-    {FwdBB, FwdNextBB} = merge_back_bone(Ems, big, RevBB, RevNextBB),
+    {ok, Ems3} = event_notify(Ems2, {merge_start, reverse}),
+    {ok, Ems4} = merge_back_bone(Ems3, big),
 
     % Continue decimating until we have an acceptable bound on
     % the number of keys to use.
-    decimate(Ems#ems{root={FwdBB, FwdNextBB}}).
+    decimate(Ems4).
 
 
-merge_back_bone(Ems, Choose, BB, NextBB) ->
-    BBPos = merge_chains(Ems, Choose, BB),
-    merge_rest_back_bone(Ems, Choose, NextBB, {[BBPos], nil}).
+merge_back_bone(#ems{root={BB, NextBB}}=Ems0, Choose) ->
+    {ok, Ems1, BBPos} = merge_chains(Ems0, Choose, BB),
+    merge_rest_back_bone(Ems1, Choose, NextBB, {[BBPos], nil}).
 
 
-merge_rest_back_bone(_Ems, _Choose, nil, Acc) ->
-    Acc;
-merge_rest_back_bone(Ems, Choose, BBPos, Acc) ->
-    {ok, {BB, NextBB}} = couch_file:pread_term(Ems#ems.fd, BBPos),
-    NewPos = merge_chains(Ems, Choose, BB),
-    {NewBB, NewPrev} = append_item(Ems, Acc, NewPos, Ems#ems.bb_chunk),
-    merge_rest_back_bone(Ems, Choose, NextBB, {NewBB, NewPrev}).
+merge_rest_back_bone(Ems, _Choose, nil, Acc) ->
+    {ok, Ems#ems{root=Acc}};
+merge_rest_back_bone(Ems0, Choose, BBPos, Acc) ->
+    {ok, {BB, NextBB}} = couch_file:pread_term(Ems0#ems.fd, BBPos),
+    {ok, Ems1, NewPos} = merge_chains(Ems0, Choose, BB),
+    {NewBB, NewPrev} = append_item(Ems1, Acc, NewPos, Ems1#ems.bb_chunk),
+    merge_rest_back_bone(Ems1, Choose, NextBB, {NewBB, NewPrev}).
 
 
-merge_chains(Ems, Choose, BB) ->
-    Chains = init_chains(Ems, Choose, BB),
-    merge_chains(Ems, Choose, Chains, {[], nil}).
+merge_chains(Ems0, Choose, BB) ->
+    {ok, Ems1} = event_notify(Ems0, {merge, chain}),
+    Chains = init_chains(Ems1, Choose, BB),
+    merge_chains(Ems1, Choose, Chains, {[], nil}).
 
 
 merge_chains(Ems, _Choose, [], ChainAcc) ->
     {ok, CPos, _} = couch_file:append_term(Ems#ems.fd, ChainAcc),
-    CPos;
-merge_chains(#ems{chain_chunk=CC}=Ems, Choose, Chains, Acc) ->
-    {KV, RestChains} = choose_kv(Choose, Ems, Chains),
-    {NewKVs, NewPrev} = append_item(Ems, Acc, KV, CC),
-    merge_chains(Ems, Choose, RestChains, {NewKVs, NewPrev}).
+    {ok, Ems, CPos};
+merge_chains(#ems{chain_chunk=CC}=Ems0, Choose, Chains, Acc) ->
+    {KV, RestChains} = choose_kv(Choose, Ems0, Chains),
+    {NewKVs, NewPrev} = append_item(Ems0, Acc, KV, CC),
+    {ok, Ems1} = event_notify(Ems0, row_copy),
+    merge_chains(Ems1, Choose, RestChains, {NewKVs, NewPrev}).
 
 
 init_chains(Ems, Choose, BB) ->
@@ -316,3 +331,9 @@ append_item(Ems, {List, Prev}, Pos, Size) when length(List) >= Size ->
 append_item(_Ems, {List, Prev}, Pos, _Size) ->
     {[Pos | List], Prev}.
+
+
+event_notify(#ems{event_cb = undefined} = Ems, _) ->
+    {ok, Ems};
+event_notify(#ems{event_cb=EventCB, event_st=EventSt}=Ems, Event) ->
+    NewSt = EventCB(Ems, Event, EventSt),
+    {ok, Ems#ems{event_st=NewSt}}.
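
A note on the emsort_cb/3 callback added in couch_db_updater.erl: during the
initial forward merge it threads an {init, Copied, Nodes} tuple, counting
backbone nodes via {merge, chain} events and flushing the copied-row count to
couch_task_status once per ?COMP_DOCID_BATCH_SIZE rows. A minimal replay
sketch, assuming emsort_cb/3 were exported for testing; the fake_ems atom is a
stand-in, since none of the replayed clauses inspect the handle:

    %% Hypothetical helper: fold a synthetic event stream through the
    %% callback. Two backbone nodes are observed and three rows are
    %% copied, none of which reach the batch flush threshold.
    emsort_cb_replay() ->
        Events = [{merge, chain}, row_copy, row_copy,
                  {merge, chain}, row_copy],
        lists:foldl(
            fun(Ev, St) -> couch_db_updater:emsort_cb(fake_ems, Ev, St) end,
            {init, 0, 0},
            Events
        ).
    %% => {init, 3, 2}: three rows copied so far, two backbone nodes seen.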
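
The total reported for the sort_ids phase is Iters * TotalCopied, where
calculate_sort_iters/3 estimates how many merge passes decimation will take:
each round shrinks the backbone by roughly a factor of the backbone chunk size
and costs two passes (forward plus reverse), and the caller then subtracts the
pass that already ran while counting nodes. A quick eunit-style sketch, not
part of the patch, showing the counts the code as written produces for the
default bb_chunk of 10; it assumes calculate_sort_iters/3 is exported for
testing (e.g. under -ifdef(TEST)):

    -include_lib("eunit/include/eunit.hrl").

    calculate_sort_iters_test() ->
        %% Fewer nodes than one backbone chunk: nothing to decimate.
        ?assertEqual(0, couch_db_updater:calculate_sort_iters(9, 10, 0)),
        %% One decimation round, counted as two passes.
        ?assertEqual(2, couch_db_updater:calculate_sort_iters(10, 10, 0)),
        ?assertEqual(2, couch_db_updater:calculate_sort_iters(100, 10, 0)),
        %% A larger backbone needs a second round.
        ?assertEqual(4, couch_db_updater:calculate_sort_iters(900, 10, 0)).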
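
The event plumbing added to couch_emsort is deliberately generic: any arity-3
fun can observe a merge, and whatever it returns becomes the next event_st. A
minimal sketch of a consumer other than the compactor, assuming Ems0 is a
handle obtained from couch_emsort:open/2; trace_cb and merge_with_trace are
hypothetical names, not part of this patch:

    %% Forward every merge event to the calling process as a message.
    %% The pid threads through as the callback state, unchanged.
    trace_cb(_Ems, Event, Pid) ->
        Pid ! {emsort_event, Event},
        Pid.

    merge_with_trace(Ems0) ->
        Ems1 = couch_emsort:set_options(Ems0, [
            {event_cb, fun trace_cb/3},
            {event_st, self()}
        ]),
        couch_emsort:merge(Ems1).

Per the patch, the stream such a callback sees is {merge_start, forward} and
{merge_start, reverse} once per decimation round, {merge, chain} once per
backbone node merged, and row_copy once per key-value appended.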