diff --git a/src/couch_index/src/couch_index_util.erl b/src/couch_index/src/couch_index_util.erl index db8aad470e1..9a16d06d67b 100644 --- a/src/couch_index/src/couch_index_util.erl +++ b/src/couch_index/src/couch_index_util.erl @@ -14,6 +14,7 @@ -export([root_dir/0, index_dir/2, index_file/3]). -export([load_doc/3, sort_lib/1, hexsig/1]). +-export([get_purge_checkpoints/2, cleanup_purges/3]). -include_lib("couch/include/couch_db.hrl"). @@ -72,3 +73,49 @@ sort_lib([{LName, LCode} | Rest], LAcc) -> hexsig(Sig) -> couch_util:to_hex(Sig). + +% Helper function for indexes to get their purge checkpoints as signatures. +% +get_purge_checkpoints(DbName, Type) when is_binary(DbName), is_binary(Type) -> + couch_util:with_db(DbName, fun(Db) -> get_purge_checkpoints(Db, Type) end); +get_purge_checkpoints(Db, Type) when is_binary(Type) -> + Prefix = <<?LOCAL_DOC_PREFIX, "purge-", Type/binary, "-">>, + PrefixSize = byte_size(Prefix), + FoldFun = fun(#doc{id = Id}, Acc) -> + case Id of + <<Prefix:PrefixSize/binary, Sig/binary>> -> {ok, Acc#{Sig => Id}}; + _ -> {stop, Acc} + end + end, + Opts = [{start_key, Prefix}], + {ok, Signatures = #{}} = couch_db:fold_local_docs(Db, FoldFun, #{}, Opts), + Signatures. + +% Helper functions for indexes to clean their purge checkpoints. +% +cleanup_purges(DbName, #{} = Sigs, #{} = Checkpoints) when is_binary(DbName) -> + couch_util:with_db(DbName, fun(Db) -> + cleanup_purges(Db, Sigs, Checkpoints) + end); +cleanup_purges(Db, #{} = Sigs, #{} = CheckpointsMap) -> + InactiveMap = maps:without(maps:keys(Sigs), CheckpointsMap), + InactiveCheckpoints = maps:values(InactiveMap), + DeleteFun = fun(DocId) -> delete_checkpoint(Db, DocId) end, + lists:foreach(DeleteFun, InactiveCheckpoints). 
+ +delete_checkpoint(Db, DocId) -> + DbName = couch_db:name(Db), + LogMsg = "~p : deleting inactive purge checkpoint ~s : ~s", + couch_log:debug(LogMsg, [?MODULE, DbName, DocId]), + try couch_db:open_doc(Db, DocId, []) of + {ok, Doc = #doc{}} -> + Deleted = Doc#doc{deleted = true, body = {[]}}, + couch_db:update_doc(Db, Deleted, [?ADMIN_CTX]); + {not_found, _} -> + ok + catch + Tag:Error -> + ErrLog = "~p : error deleting checkpoint ~s : ~s error: ~p:~p", + couch_log:error(ErrLog, [?MODULE, DbName, DocId, Tag, Error]), + ok + end. diff --git a/src/couch_mrview/src/couch_mrview_cleanup.erl b/src/couch_mrview/src/couch_mrview_cleanup.erl index 5b5afbdce07..e8a2833a7ca 100644 --- a/src/couch_mrview/src/couch_mrview_cleanup.erl +++ b/src/couch_mrview/src/couch_mrview_cleanup.erl @@ -14,12 +14,9 @@ -export([ run/1, - cleanup_purges/3, - cleanup_indices/2 + cleanup/2 ]). --include_lib("couch/include/couch_db.hrl"). - run(Db) -> Indices = couch_mrview_util:get_index_files(Db), Checkpoints = couch_mrview_util:get_purge_checkpoints(Db), @@ -28,15 +25,26 @@ run(Db) -> ok = cleanup_purges(Db1, Sigs, Checkpoints), ok = cleanup_indices(Sigs, Indices). -cleanup_purges(DbName, Sigs, Checkpoints) when is_binary(DbName) -> - couch_util:with_db(DbName, fun(Db) -> - cleanup_purges(Db, Sigs, Checkpoints) - end); -cleanup_purges(Db, #{} = Sigs, #{} = CheckpointsMap) -> - InactiveMap = maps:without(maps:keys(Sigs), CheckpointsMap), - InactiveCheckpoints = maps:values(InactiveMap), - DeleteFun = fun(DocId) -> delete_checkpoint(Db, DocId) end, - lists:foreach(DeleteFun, InactiveCheckpoints). +% erpc endpoint for fabric_index_cleanup:cleanup_indexes/2 +% +cleanup(Dbs, #{} = Sigs) -> + try + lists:foreach( + fun(Db) -> + Indices = couch_mrview_util:get_index_files(Db), + Checkpoints = couch_mrview_util:get_purge_checkpoints(Db), + ok = cleanup_purges(Db, Sigs, Checkpoints), + ok = cleanup_indices(Sigs, Indices) + end, + Dbs + ) + catch + error:database_does_not_exist -> + ok + end. 
+ +cleanup_purges(Db, Sigs, Checkpoints) -> + couch_index_util:cleanup_purges(Db, Sigs, Checkpoints). cleanup_indices(#{} = Sigs, #{} = IndexMap) -> Fun = fun(_, Files) -> lists:foreach(fun delete_file/1, Files) end, @@ -54,20 +62,3 @@ delete_file(File) -> couch_log:error(ErrLog, [?MODULE, File, Tag, Error]), ok end. - -delete_checkpoint(Db, DocId) -> - DbName = couch_db:name(Db), - LogMsg = "~p : deleting inactive purge checkpoint ~s : ~s", - couch_log:debug(LogMsg, [?MODULE, DbName, DocId]), - try couch_db:open_doc(Db, DocId, []) of - {ok, Doc = #doc{}} -> - Deleted = Doc#doc{deleted = true, body = {[]}}, - couch_db:update_doc(Db, Deleted, [?ADMIN_CTX]); - {not_found, _} -> - ok - catch - Tag:Error -> - ErrLog = "~p : error deleting checkpoint ~s : ~s error: ~p:~p", - couch_log:error(ErrLog, [?MODULE, DbName, DocId, Tag, Error]), - ok - end. diff --git a/src/couch_mrview/src/couch_mrview_util.erl b/src/couch_mrview/src/couch_mrview_util.erl index 7149fc1a544..5405e8db826 100644 --- a/src/couch_mrview/src/couch_mrview_util.erl +++ b/src/couch_mrview/src/couch_mrview_util.erl @@ -16,6 +16,7 @@ -export([get_local_purge_doc_id/1, get_value_from_options/2]). -export([verify_view_filename/1, get_signature_from_filename/1]). -export([get_signatures/1, get_purge_checkpoints/1, get_index_files/1]). +-export([get_signatures_from_ddocs/2]). -export([ddoc_to_mrst/2, init_state/4, reset_index/3]). -export([make_header/1]). -export([index_file/2, compaction_file/2, open_file/1]). @@ -94,40 +95,35 @@ get_signatures(DbName) when is_binary(DbName) -> couch_util:with_db(DbName, fun get_signatures/1); get_signatures(Db) -> DbName = couch_db:name(Db), - % get_design_docs/1 returns ejson for clustered shards, and - % #full_doc_info{}'s for other cases. {ok, DDocs} = couch_db:get_design_docs(Db), + % get_design_docs/1 returns ejson for clustered shards, and + % #full_doc_info{}'s for other cases. 
Both are transformed to #doc{} records FoldFun = fun ({[_ | _]} = EJsonDoc, Acc) -> Doc = couch_doc:from_json_obj(EJsonDoc), - {ok, Mrst} = ddoc_to_mrst(DbName, Doc), - Sig = couch_util:to_hex_bin(Mrst#mrst.sig), - Acc#{Sig => true}; + [Doc | Acc]; (#full_doc_info{} = FDI, Acc) -> {ok, Doc} = couch_db:open_doc_int(Db, FDI, [ejson_body]), - {ok, Mrst} = ddoc_to_mrst(DbName, Doc), - Sig = couch_util:to_hex_bin(Mrst#mrst.sig), - Acc#{Sig => true} + [Doc | Acc] + end, + DDocs1 = lists:foldl(FoldFun, [], DDocs), + get_signatures_from_ddocs(DbName, DDocs1). + +% From a list of design #doc{} records returns signatures map: #{Sig => true} +% +get_signatures_from_ddocs(DbName, DDocs) when is_list(DDocs) -> + FoldFun = fun(#doc{} = Doc, Acc) -> + {ok, Mrst} = ddoc_to_mrst(DbName, Doc), + Sig = couch_util:to_hex_bin(Mrst#mrst.sig), + Acc#{Sig => true} + end, + lists:foldl(FoldFun, #{}, DDocs). % Returns a map of `Sig => DocId` elements for all the purge view % checkpoint docs. Sig is a hex-encoded binary. % -get_purge_checkpoints(DbName) when is_binary(DbName) -> - couch_util:with_db(DbName, fun get_purge_checkpoints/1); get_purge_checkpoints(Db) -> - FoldFun = fun(#doc{id = Id}, Acc) -> - case Id of - <<?LOCAL_DOC_PREFIX, "purge-", "mrview-", Sig/binary>> -> - {ok, Acc#{Sig => Id}}; - _ -> - {stop, Acc} - end - end, - Opts = [{start_key, <<?LOCAL_DOC_PREFIX, "purge-", "mrview-">>}], - {ok, Signatures = #{}} = couch_db:fold_local_docs(Db, FoldFun, #{}, Opts), - Signatures. + couch_index_util:get_purge_checkpoints(Db, <<"mrview">>). % Returns a map of `Sig => [FilePath, ...]` elements. Sig is a hex-encoded % binary and FilePaths are lists as they intended to be passed to couch_file diff --git a/src/dreyfus/src/clouseau_rpc.erl b/src/dreyfus/src/clouseau_rpc.erl index 921746a7a6a..c036a5a9af0 100644 --- a/src/dreyfus/src/clouseau_rpc.erl +++ b/src/dreyfus/src/clouseau_rpc.erl @@ -262,10 +262,17 @@ rename(DbName) -> %% and an analyzer represented in a Javascript function in a design document. 
%% `Sig` is used to check if an index description is changed, %% and the index needs to be reconstructed. --spec cleanup(DbName :: string_as_binary(_), ActiveSigs :: [sig()]) -> +-spec cleanup(DbName :: string_as_binary(_), SigList :: list() | SigMap :: #{sig() => true}) -> ok. -cleanup(DbName, ActiveSigs) -> +% Compatibility clause to help when running search index cleanup during +% a mixed cluster state. Remove after version 3.6 +% +cleanup(DbName, SigList) when is_list(SigList) -> + SigMap = #{Sig => true || Sig <- SigList}, + cleanup(DbName, SigMap); +cleanup(DbName, #{} = SigMap) -> + ActiveSigs = maps:keys(SigMap), gen_server:cast({cleanup, clouseau()}, {cleanup, DbName, ActiveSigs}). %% a binary with value <<"tokens">> diff --git a/src/dreyfus/src/dreyfus_fabric_cleanup.erl b/src/dreyfus/src/dreyfus_fabric_cleanup.erl index 86960812d40..0488211be91 100644 --- a/src/dreyfus/src/dreyfus_fabric_cleanup.erl +++ b/src/dreyfus/src/dreyfus_fabric_cleanup.erl @@ -14,99 +14,50 @@ -module(dreyfus_fabric_cleanup). --include("dreyfus.hrl"). --include_lib("mem3/include/mem3.hrl"). --include_lib("couch/include/couch_db.hrl"). - --export([go/1]). +-export([go/1, go_local/3]). go(DbName) -> - DesignDocs = - case fabric:design_docs(DbName) of - {ok, DDocs} when is_list(DDocs) -> - DDocs; - Else -> - couch_log:debug("Invalid design docs: ~p~n", [Else]), - [] - end, - ActiveSigs = lists:usort( - lists:flatmap( - fun active_sigs/1, - [couch_doc:from_json_obj(DD) || DD <- DesignDocs] - ) - ), - cleanup_local_purge_doc(DbName, ActiveSigs), - clouseau_rpc:cleanup(DbName, ActiveSigs), - ok. 
+ case fabric_util:get_design_doc_records(DbName) of + {ok, DDocs} when is_list(DDocs) -> + Sigs = dreyfus_util:get_signatures_from_ddocs(DbName, DDocs), + Shards = mem3:shards(DbName), + ByNode = maps:groups_from_list(fun mem3:node/1, fun mem3:name/1, Shards), + Fun = fun(Node, Dbs, Acc) -> + erpc:send_request(Node, ?MODULE, go_local, [DbName, Dbs, Sigs], Node, Acc) + end, + Reqs = maps:fold(Fun, erpc:reqids_new(), ByNode), + recv(DbName, Reqs, fabric_util:abs_request_timeout()); + Error -> + couch_log:error("~p : error fetching ddocs db:~p ~p", [?MODULE, DbName, Error]), + Error + end. -active_sigs(#doc{body = {Fields}} = Doc) -> +% erpc endpoint for go/1 and fabric_index_cleanup:cleanup_indexes/2 +% +go_local(DbName, Dbs, #{} = Sigs) -> try - {RawIndexes} = couch_util:get_value(<<"indexes">>, Fields, {[]}), - {IndexNames, _} = lists:unzip(RawIndexes), - [ - begin - {ok, Index} = dreyfus_index:design_doc_to_index(Doc, IndexName), - Index#index.sig - end - || IndexName <- IndexNames - ] + lists:foreach( + fun(Db) -> + Checkpoints = dreyfus_util:get_purge_checkpoints(Db), + ok = couch_index_util:cleanup_purges(Db, Sigs, Checkpoints) + end, + Dbs + ), + clouseau_rpc:cleanup(DbName, Sigs), + ok catch - error:{badmatch, _Error} -> - [] + error:database_does_not_exist -> + ok end. 
-cleanup_local_purge_doc(DbName, ActiveSigs) -> - {ok, BaseDir} = clouseau_rpc:get_root_dir(), - DbNamePattern = <<DbName/binary, ".*">>, - Pattern0 = filename:join([BaseDir, "shards", "*", DbNamePattern, "*"]), - Pattern = binary_to_list(iolist_to_binary(Pattern0)), - DirListStrs = filelib:wildcard(Pattern), - DirList = [iolist_to_binary(DL) || DL <- DirListStrs], - LocalShards = mem3:local_shards(DbName), - ActiveDirs = lists:foldl( - fun(LS, AccOuter) -> - lists:foldl( - fun(Sig, AccInner) -> - DirName = filename:join([BaseDir, LS#shard.name, Sig]), - [DirName | AccInner] - end, - AccOuter, - ActiveSigs - ) - end, - [], - LocalShards - ), - - DeadDirs = DirList -- ActiveDirs, - lists:foreach( - fun(IdxDir) -> - Sig = dreyfus_util:get_signature_from_idxdir(IdxDir), - case Sig of - undefined -> - ok; - _ -> - DocId = dreyfus_util:get_local_purge_doc_id(Sig), - LocalShards = mem3:local_shards(DbName), - lists:foreach( - fun(LS) -> - ShardDbName = LS#shard.name, - {ok, ShardDb} = couch_db:open_int(ShardDbName, []), - case couch_db:open_doc(ShardDb, DocId, []) of - {ok, LocalPurgeDoc} -> - couch_db:update_doc( - ShardDb, - LocalPurgeDoc#doc{deleted = true}, - [?ADMIN_CTX] - ); - {not_found, _} -> - ok - end, - couch_db:close(ShardDb) - end, - LocalShards - ) - end - end, - DeadDirs - ). +recv(DbName, Reqs, Timeout) -> + case erpc:receive_response(Reqs, Timeout, true) of + {ok, _Label, Reqs1} -> + recv(DbName, Reqs1, Timeout); + {Error, Label, Reqs1} -> + ErrMsg = "~p : error cleaning dreyfus indexes db:~p req:~p error:~p", + couch_log:error(ErrMsg, [?MODULE, DbName, Label, Error]), + recv(DbName, Reqs1, Timeout); + no_request -> + ok + end. diff --git a/src/dreyfus/src/dreyfus_index.erl b/src/dreyfus/src/dreyfus_index.erl index c97a837d51c..5295a0065fb 100644 --- a/src/dreyfus/src/dreyfus_index.erl +++ b/src/dreyfus/src/dreyfus_index.erl @@ -22,13 +22,13 @@ % public api. 
-export([ start_link/2, - design_doc_to_index/2, + design_doc_to_index/3, await/2, search/2, info/1, group1/2, group2/2, - design_doc_to_indexes/1 + design_doc_to_indexes/2 ]). % gen_server api. @@ -87,14 +87,14 @@ to_index_pid(Pid) -> false -> Pid end. -design_doc_to_indexes(#doc{body = {Fields}} = Doc) -> +design_doc_to_indexes(DbName, #doc{body = {Fields}} = Doc) -> RawIndexes = couch_util:get_value(<<"indexes">>, Fields, {[]}), case RawIndexes of {IndexList} when is_list(IndexList) -> {IndexNames, _} = lists:unzip(IndexList), lists:flatmap( fun(IndexName) -> - case (catch design_doc_to_index(Doc, IndexName)) of + case (catch design_doc_to_index(DbName, Doc, IndexName)) of {ok, #index{} = Index} -> [Index]; _ -> [] end @@ -301,7 +301,7 @@ open_index(DbName, #index{analyzer = Analyzer, sig = Sig}) -> Error end. -design_doc_to_index(#doc{id = Id, body = {Fields}}, IndexName) -> +design_doc_to_index(DbName, #doc{id = Id, body = {Fields}}, IndexName) -> Language = couch_util:get_value(<<"language">>, Fields, <<"javascript">>), {RawIndexes} = couch_util:get_value(<<"indexes">>, Fields, {[]}), InvalidDDocError = @@ -323,6 +323,7 @@ design_doc_to_index(#doc{id = Id, body = {Fields}}, IndexName) -> ) ), {ok, #index{ + dbname = DbName, analyzer = Analyzer, ddoc_id = Id, def = Def, diff --git a/src/dreyfus/src/dreyfus_rpc.erl b/src/dreyfus/src/dreyfus_rpc.erl index ffa75fb614b..3aed82d76ce 100644 --- a/src/dreyfus/src/dreyfus_rpc.erl +++ b/src/dreyfus/src/dreyfus_rpc.erl @@ -46,7 +46,7 @@ call(Fun, DbName, DDoc, IndexName, QueryArgs0) -> stale = Stale } = QueryArgs, {_LastSeq, MinSeq} = calculate_seqs(Db, Stale), - case dreyfus_index:design_doc_to_index(DDoc, IndexName) of + case dreyfus_index:design_doc_to_index(DbName, DDoc, IndexName) of {ok, Index} -> try rexi:reply(index_call(Fun, DbName, Index, QueryArgs, MinSeq)) @@ -81,7 +81,7 @@ info(DbName, DDoc, IndexName) -> info_int(DbName, DDoc, IndexName) -> erlang:put(io_priority, {search, DbName}), 
check_interactive_mode(), - case dreyfus_index:design_doc_to_index(DDoc, IndexName) of + case dreyfus_index:design_doc_to_index(DbName, DDoc, IndexName) of {ok, Index} -> case dreyfus_index_manager:get_index(DbName, Index) of {ok, Pid} -> @@ -102,7 +102,7 @@ info_int(DbName, DDoc, IndexName) -> disk_size(DbName, DDoc, IndexName) -> erlang:put(io_priority, {search, DbName}), check_interactive_mode(), - case dreyfus_index:design_doc_to_index(DDoc, IndexName) of + case dreyfus_index:design_doc_to_index(DbName, DDoc, IndexName) of {ok, Index} -> Result = dreyfus_index_manager:get_disk_size(DbName, Index), rexi:reply(Result); diff --git a/src/dreyfus/src/dreyfus_util.erl b/src/dreyfus/src/dreyfus_util.erl index a8afc36b605..b8806c0893e 100644 --- a/src/dreyfus/src/dreyfus_util.erl +++ b/src/dreyfus/src/dreyfus_util.erl @@ -25,9 +25,11 @@ ensure_local_purge_docs/2, get_value_from_options/2, get_local_purge_doc_id/1, + get_purge_checkpoints/1, get_local_purge_doc_body/4, maybe_create_local_purge_doc/2, maybe_create_local_purge_doc/3, + get_signatures_from_ddocs/2, get_signature_from_idxdir/1, verify_index_exists/2 ]). @@ -305,7 +307,7 @@ ensure_local_purge_docs(DbName, DDocs) -> undefined -> false; _ -> - try dreyfus_index:design_doc_to_indexes(DDoc) of + try dreyfus_index:design_doc_to_indexes(DbName, DDoc) of SIndexes -> ensure_local_purge_doc(Db, SIndexes) catch _:_ -> @@ -360,6 +362,32 @@ maybe_create_local_purge_doc(Db, IndexPid, Index) -> get_local_purge_doc_id(Sig) -> ?l2b(?LOCAL_DOC_PREFIX ++ "purge-" ++ "dreyfus-" ++ Sig). +% Returns a map of `Sig => DocId` elements for all the purge view +% checkpoint docs. Sig is a hex-encoded binary. +% +get_purge_checkpoints(Db) -> + couch_index_util:get_purge_checkpoints(Db, <<"dreyfus">>). + +get_signatures_from_ddocs(DbName, DesignDocs) -> + SigList = lists:flatmap(fun(Doc) -> active_sigs(DbName, Doc) end, DesignDocs), + #{Sig => true || Sig <- SigList}. 
+ +active_sigs(DbName, #doc{body = {Fields}} = Doc) -> + try + {RawIndexes} = couch_util:get_value(<<"indexes">>, Fields, {[]}), + {IndexNames, _} = lists:unzip(RawIndexes), + [ + begin + {ok, Index} = dreyfus_index:design_doc_to_index(DbName, Doc, IndexName), + Index#index.sig + end + || IndexName <- IndexNames + ] + catch + error:{badmatch, _Error} -> + [] + end. + get_signature_from_idxdir(IdxDir) -> IdxDirList = filename:split(IdxDir), Sig = lists:last(IdxDirList), @@ -415,7 +443,7 @@ verify_index_exists(DbName, Props) -> case couch_db:get_design_doc(Db, DDocId) of {ok, #doc{} = DDoc} -> {ok, IdxState} = dreyfus_index:design_doc_to_index( - DDoc, IndexName + DbName, DDoc, IndexName ), IdxState#index.sig == Sig; {not_found, _} -> diff --git a/src/dreyfus/test/eunit/dreyfus_purge_test.erl b/src/dreyfus/test/eunit/dreyfus_purge_test.erl index 17bd5cd892f..a7c0068e01c 100644 --- a/src/dreyfus/test/eunit/dreyfus_purge_test.erl +++ b/src/dreyfus/test/eunit/dreyfus_purge_test.erl @@ -1102,17 +1102,17 @@ get_sigs(DbName) -> {ok, DesignDocs} = fabric:design_docs(DbName), lists:usort( lists:flatmap( - fun active_sigs/1, + fun(Doc) -> active_sigs(DbName, Doc) end, [couch_doc:from_json_obj(DD) || DD <- DesignDocs] ) ). 
-active_sigs(#doc{body = {Fields}} = Doc) -> +active_sigs(DbName, #doc{body = {Fields}} = Doc) -> {RawIndexes} = couch_util:get_value(<<"indexes">>, Fields, {[]}), {IndexNames, _} = lists:unzip(RawIndexes), [ begin - {ok, Index} = dreyfus_index:design_doc_to_index(Doc, IndexName), + {ok, Index} = dreyfus_index:design_doc_to_index(DbName, Doc, IndexName), Index#index.sig end || IndexName <- IndexNames diff --git a/src/fabric/src/fabric.erl b/src/fabric/src/fabric.erl index 0d567ac474a..3a6ce7ebd48 100644 --- a/src/fabric/src/fabric.erl +++ b/src/fabric/src/fabric.erl @@ -66,9 +66,10 @@ -export([ design_docs/1, reset_validation_funs/1, - cleanup_index_files/0, - cleanup_index_files/1, + cleanup_index_files_all_nodes/0, cleanup_index_files_all_nodes/1, + cleanup_index_files_this_node/0, + cleanup_index_files_this_node/1, dbname/1, db_uuids/1 ]). @@ -634,54 +635,17 @@ reset_validation_funs(DbName) -> || #shard{node = Node, name = Name} <- mem3:shards(DbName) ]. -%% @doc clean up index files for all Dbs --spec cleanup_index_files() -> [ok]. -cleanup_index_files() -> - {ok, Dbs} = fabric:all_dbs(), - [cleanup_index_files(Db) || Db <- Dbs]. +cleanup_index_files_this_node() -> + fabric_index_cleanup:cleanup_this_node(). -%% @doc clean up index files for a specific db --spec cleanup_index_files(dbname()) -> ok. -cleanup_index_files(DbName) -> - try - ShardNames = [mem3:name(S) || S <- mem3:local_shards(dbname(DbName))], - cleanup_local_indices_and_purge_checkpoints(ShardNames) - catch - error:database_does_not_exist -> - ok - end. +cleanup_index_files_this_node(Db) -> + fabric_index_cleanup:cleanup_this_node(dbname(Db)). 
-cleanup_local_indices_and_purge_checkpoints([]) -> - ok; -cleanup_local_indices_and_purge_checkpoints([_ | _] = Dbs) -> - AllIndices = lists:map(fun couch_mrview_util:get_index_files/1, Dbs), - AllPurges = lists:map(fun couch_mrview_util:get_purge_checkpoints/1, Dbs), - Sigs = couch_mrview_util:get_signatures(hd(Dbs)), - ok = cleanup_purges(Sigs, AllPurges, Dbs), - ok = cleanup_indices(Sigs, AllIndices). - -cleanup_purges(Sigs, AllPurges, Dbs) -> - Fun = fun(DbPurges, Db) -> - couch_mrview_cleanup:cleanup_purges(Db, Sigs, DbPurges) - end, - lists:zipwith(Fun, AllPurges, Dbs), - ok. +cleanup_index_files_all_nodes() -> + fabric_index_cleanup:cleanup_all_nodes(). -cleanup_indices(Sigs, AllIndices) -> - Fun = fun(DbIndices) -> - couch_mrview_cleanup:cleanup_indices(Sigs, DbIndices) - end, - lists:foreach(Fun, AllIndices). - -%% @doc clean up index files for a specific db on all nodes --spec cleanup_index_files_all_nodes(dbname()) -> [reference()]. -cleanup_index_files_all_nodes(DbName) -> - lists:foreach( - fun(Node) -> - rexi:cast(Node, {?MODULE, cleanup_index_files, [DbName]}) - end, - mem3:nodes() - ). +cleanup_index_files_all_nodes(Db) -> + fabric_index_cleanup:cleanup_all_nodes(dbname(Db)). %% some simple type validation and transcoding dbname(DbName) when is_list(DbName) -> diff --git a/src/fabric/src/fabric_index_cleanup.erl b/src/fabric/src/fabric_index_cleanup.erl new file mode 100644 index 00000000000..13759ba1d1e --- /dev/null +++ b/src/fabric/src/fabric_index_cleanup.erl @@ -0,0 +1,81 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(fabric_index_cleanup). + +-export([ + cleanup_all_nodes/0, + cleanup_all_nodes/1, + cleanup_this_node/0, + cleanup_this_node/1 +]). + +cleanup_all_nodes() -> + Fun = fun(DbName, _) -> cleanup_all_nodes(DbName) end, + mem3:fold_dbs(Fun, nil), + ok. + +cleanup_all_nodes(DbName) -> + cleanup_indexes(DbName, mem3_util:live_nodes()). + +cleanup_this_node() -> + Fun = fun(DbName, _) -> + case mem3:local_shards(DbName) of + [_ | _] -> cleanup_this_node(DbName); + [] -> ok + end + end, + mem3:fold_dbs(Fun, nil), + ok. + +cleanup_this_node(DbName) -> + cleanup_indexes(DbName, [config:node_name()]). + +cleanup_indexes(DbName, Nodes) -> + try fabric_util:get_design_doc_records(DbName) of + {ok, DDocs} when is_list(DDocs) -> + VSigs = couch_mrview_util:get_signatures_from_ddocs(DbName, DDocs), + DSigs = dreyfus_util:get_signatures_from_ddocs(DbName, DDocs), + NSigs = nouveau_util:get_signatures_from_ddocs(DbName, DDocs), + Shards = [S || S <- mem3:shards(DbName), lists:member(mem3:node(S), Nodes)], + ByNode = maps:groups_from_list(fun mem3:node/1, fun mem3:name/1, Shards), + Fun = fun(Node, Dbs, Acc) -> + Acc1 = send(Node, couch_mrview_cleanup, cleanup, [Dbs, VSigs], Acc), + Acc2 = send(Node, dreyfus_fabric_cleanup, go_local, [DbName, Dbs, DSigs], Acc1), + Acc3 = send(Node, nouveau_fabric_cleanup, go_local, [DbName, Dbs, NSigs], Acc2), + Acc3 + end, + Reqs = maps:fold(Fun, erpc:reqids_new(), ByNode), + recv(DbName, Reqs, fabric_util:abs_request_timeout()); + Error -> + couch_log:error("~p : error fetching ddocs db:~p ~p", [?MODULE, DbName, Error]), + Error + catch + error:database_does_not_exist -> + ok + end. + +send(Node, M, F, A, Reqs) -> + Label = {Node, M, F}, + erpc:send_request(Node, M, F, A, Label, Reqs). 
+ +recv(DbName, Reqs, Timeout) -> + case erpc:receive_response(Reqs, Timeout, true) of + {ok, _Label, Reqs1} -> + recv(DbName, Reqs1, Timeout); + {Error, Label, Reqs1} -> + ErrMsg = "~p : error cleaning indexes db:~p req:~p error:~p", + couch_log:error(ErrMsg, [?MODULE, DbName, Label, Error]), + recv(DbName, Reqs1, Timeout); + no_request -> + ok + end. diff --git a/src/fabric/src/fabric_util.erl b/src/fabric/src/fabric_util.erl index 88d9839df68..b10f20a637f 100644 --- a/src/fabric/src/fabric_util.erl +++ b/src/fabric/src/fabric_util.erl @@ -26,6 +26,7 @@ doc_id_and_rev/1 ]). -export([request_timeout/0, attachments_timeout/0, all_docs_timeout/0, view_timeout/1, timeout/2]). +-export([abs_request_timeout/0]). -export([log_timeout/2, remove_done_workers/2]). -export([is_users_db/1, is_replicator_db/1]). -export([open_cluster_db/1, open_cluster_db/2]). @@ -35,6 +36,7 @@ -export([worker_ranges/1]). -export([get_uuid_prefix_len/0]). -export([isolate/1, isolate/2]). +-export([get_design_doc_records/1]). -compile({inline, [{doc_id_and_rev, 1}]}). @@ -107,6 +109,15 @@ log_timeout(Workers, EndPoint) -> Workers ). +% Return {abs, MonotonicMSec}. This is a format used by erpc to +% provide an absolute time limit for a collection of requests +% See https://www.erlang.org/doc/apps/kernel/erpc.html#t:timeout_time/0 +% +abs_request_timeout() -> + Timeout = fabric_util:request_timeout(), + NowMSec = erlang:monotonic_time(millisecond), + {abs, NowMSec + Timeout}. + remove_done_workers(Workers, WaitingIndicator) -> [W || {W, WI} <- fabric_dict:to_list(Workers), WI == WaitingIndicator]. @@ -391,6 +402,19 @@ worker_ranges(Workers) -> get_uuid_prefix_len() -> config:get_integer("fabric", "uuid_prefix_len", 7). +% Get design #doc{} records. Run in an isolated process. 
This is often used +% when computing signatures of various indexes +% +get_design_doc_records(DbName) -> + fabric_util:isolate(fun() -> + case fabric:design_docs(DbName) of + {ok, DDocs} when is_list(DDocs) -> + Fun = fun({[_ | _]} = Doc) -> couch_doc:from_json_obj(Doc) end, + {ok, lists:map(Fun, DDocs)}; + Else -> + Else + end + end). % If we issue multiple fabric calls from the same process we have to isolate % them so in case of error they don't pollute the processes dictionary or the % mailbox diff --git a/src/fabric/test/eunit/fabric_tests.erl b/src/fabric/test/eunit/fabric_tests.erl index 2788af19ecc..77327f4458a 100644 --- a/src/fabric/test/eunit/fabric_tests.erl +++ b/src/fabric/test/eunit/fabric_tests.erl @@ -47,17 +47,20 @@ teardown({Ctx, DbName}) -> test_util:stop_couch(Ctx). t_cleanup_index_files(_) -> - CheckFun = fun(Res) -> Res =:= ok end, - ?assert(lists:all(CheckFun, fabric:cleanup_index_files())). + ?assertEqual(ok, fabric:cleanup_index_files_this_node()), + ?assertEqual(ok, fabric:cleanup_index_files_all_nodes()). t_cleanup_index_files_with_existing_db({_, DbName}) -> - ?assertEqual(ok, fabric:cleanup_index_files(DbName)). + ?assertEqual(ok, fabric:cleanup_index_files_this_node(DbName)), + ?assertEqual(ok, fabric:cleanup_index_files_all_nodes(DbName)), + ?assertEqual(ok, fabric:cleanup_index_files_this_node(<<"non_existent">>)), + ?assertEqual(ok, fabric:cleanup_index_files_all_nodes(<<"non_existent">>)). t_cleanup_index_files_with_view_data({_, DbName}) -> Sigs = sigs(DbName), Indices = indices(DbName), Purges = purges(DbName), - ok = fabric:cleanup_index_files(DbName), + ok = fabric:cleanup_index_files_all_nodes(DbName), % We haven't inadvertently removed any active index bits ?assertEqual(Sigs, sigs(DbName)), ?assertEqual(Indices, indices(DbName)), @@ -65,7 +68,7 @@ t_cleanup_index_files_with_view_data({_, DbName}) -> t_cleanup_index_files_with_deleted_db(_) -> SomeDb = ?tempdb(), - ?assertEqual(ok, fabric:cleanup_index_files(SomeDb)). 
+ ?assertEqual(ok, fabric:cleanup_index_files_all_nodes(SomeDb)). t_cleanup_index_file_after_ddoc_update({_, DbName}) -> ?assertEqual( @@ -84,7 +87,7 @@ t_cleanup_index_file_after_ddoc_update({_, DbName}) -> ), update_ddoc(DbName, <<"_design/foo">>, <<"bar1">>), - ok = fabric:cleanup_index_files(DbName), + ok = fabric:cleanup_index_files_all_nodes(DbName), {ok, _} = fabric:query_view(DbName, <<"foo">>, <<"bar1">>), % One 4bc stays, da8 should gone and 9e3 is added @@ -120,7 +123,7 @@ t_cleanup_index_file_after_ddoc_delete({_, DbName}) -> ), delete_ddoc(DbName, <<"_design/foo">>), - ok = fabric:cleanup_index_files(DbName), + ok = fabric:cleanup_index_files_all_nodes(DbName), % 4bc stays the same, da8 should be gone ?assertEqual( @@ -137,13 +140,13 @@ t_cleanup_index_file_after_ddoc_delete({_, DbName}) -> ), delete_ddoc(DbName, <<"_design/boo">>), - ok = fabric:cleanup_index_files(DbName), + ok = fabric:cleanup_index_files_all_nodes(DbName), ?assertEqual([], indices(DbName)), ?assertEqual([], purges(DbName)), % cleaning a db with all deleted indices should still work - ok = fabric:cleanup_index_files(DbName), + ok = fabric:cleanup_index_files_all_nodes(DbName), ?assertEqual([], indices(DbName)), ?assertEqual([], purges(DbName)). 
diff --git a/src/ken/src/ken_server.erl b/src/ken/src/ken_server.erl index b73088e6cca..acafb41c54d 100644 --- a/src/ken/src/ken_server.erl +++ b/src/ken/src/ken_server.erl @@ -341,7 +341,7 @@ update_ddoc_indexes(Name, #doc{} = Doc, State) -> search_updated(Name, Doc, Seq, State) -> case should_update(Doc, <<"indexes">>) of true -> - try dreyfus_index:design_doc_to_indexes(Doc) of + try dreyfus_index:design_doc_to_indexes(Name, Doc) of SIndexes -> update_ddoc_search_indexes(Name, SIndexes, Seq, State) catch _:_ -> diff --git a/src/mem3/src/mem3_reshard_index.erl b/src/mem3/src/mem3_reshard_index.erl index 41e225d2216..0cd6f2cf31e 100644 --- a/src/mem3/src/mem3_reshard_index.erl +++ b/src/mem3/src/mem3_reshard_index.erl @@ -101,7 +101,7 @@ nouveau_indices(DbName, Doc) -> dreyfus_indices(DbName, Doc) -> try - Indices = dreyfus_index:design_doc_to_indexes(Doc), + Indices = dreyfus_index:design_doc_to_indexes(DbName, Doc), [{?DREYFUS, DbName, Index} || Index <- Indices] catch Tag:Err -> diff --git a/src/mem3/test/eunit/mem3_reshard_test.erl b/src/mem3/test/eunit/mem3_reshard_test.erl index a72f07d6e34..9fed4e29546 100644 --- a/src/mem3/test/eunit/mem3_reshard_test.erl +++ b/src/mem3/test/eunit/mem3_reshard_test.erl @@ -371,7 +371,7 @@ indices_can_be_built_with_errors(#{db1 := Db}) -> end)}. mock_dreyfus_indices() -> - meck:expect(dreyfus_index, design_doc_to_indexes, fun(Doc) -> + meck:expect(dreyfus_index, design_doc_to_indexes, fun(_, Doc) -> #doc{body = {BodyProps}} = Doc, case couch_util:get_value(<<"indexes">>, BodyProps) of undefined -> diff --git a/src/nouveau/src/nouveau_fabric_cleanup.erl b/src/nouveau/src/nouveau_fabric_cleanup.erl index 07167e8c788..75c2190b8d4 100644 --- a/src/nouveau/src/nouveau_fabric_cleanup.erl +++ b/src/nouveau/src/nouveau_fabric_cleanup.erl @@ -14,40 +14,52 @@ -module(nouveau_fabric_cleanup). --include_lib("couch/include/couch_db.hrl"). - --include("nouveau.hrl"). --include_lib("mem3/include/mem3.hrl"). - --export([go/1]). 
+-export([go/1, go_local/3]). go(DbName) -> - DesignDocs = - case fabric:design_docs(DbName) of - {ok, DDocs} when is_list(DDocs) -> - DDocs; - Else -> - couch_log:debug("Invalid design docs: ~p~n", [Else]), - [] - end, - ActiveSigs = - lists:usort( - lists:flatmap( - fun(Doc) -> active_sigs(DbName, Doc) end, - [couch_doc:from_json_obj(DD) || DD <- DesignDocs] - ) - ), - Shards = mem3:shards(DbName), - lists:foreach( - fun(Shard) -> - Path = - <<"shards/", (mem3_util:range_to_hex(Shard#shard.range))/binary, "/", DbName/binary, - ".*/*">>, - rexi:cast(Shard#shard.node, {nouveau_rpc, cleanup, [Path, ActiveSigs]}) - end, - Shards - ). + case fabric_util:get_design_doc_records(DbName) of + {ok, DDocs} when is_list(DDocs) -> + Sigs = nouveau_util:get_signatures_from_ddocs(DbName, DDocs), + Shards = mem3:shards(DbName), + ByNode = maps:groups_from_list(fun mem3:node/1, fun mem3:name/1, Shards), + Fun = fun(Node, Dbs, Acc) -> + erpc:send_request(Node, ?MODULE, go_local, [DbName, Dbs, Sigs], Node, Acc) + end, + Reqs = maps:fold(Fun, erpc:reqids_new(), ByNode), + recv(DbName, Reqs, fabric_util:abs_request_timeout()); + Error -> + couch_log:error("~p : error fetching ddocs db:~p ~p", [?MODULE, DbName, Error]), + Error + end. + +% erpc endpoint for go/1 and fabric_index_cleanup:cleanup_indexes/2 +% +go_local(DbName, Dbs, Sigs) -> + try + lists:foreach( + fun(Db) -> + Sz = byte_size(DbName), + <<"shards/", Range:17/binary, "/", DbName:Sz/binary, ".", _/binary>> = Db, + Checkpoints = nouveau_util:get_purge_checkpoints(Db), + ok = couch_index_util:cleanup_purges(Db, Sigs, Checkpoints), + Path = <<"shards/", Range/binary, "/", DbName/binary, ".*/*">>, + nouveau_api:delete_path(nouveau_util:index_name(Path), maps:keys(Sigs)) + end, + Dbs + ) + catch + error:database_does_not_exist -> + ok + end. -active_sigs(DbName, #doc{} = Doc) -> - Indexes = nouveau_util:design_doc_to_indexes(DbName, Doc), - lists:map(fun(Index) -> Index#index.sig end, Indexes). 
+recv(DbName, Reqs, Timeout) -> + case erpc:receive_response(Reqs, Timeout, true) of + {ok, _Label, Reqs1} -> + recv(DbName, Reqs1, Timeout); + {Error, Label, Reqs1} -> + ErrMsg = "~p : error cleaning nouveau indexes db:~p node: ~p error:~p", + couch_log:error(ErrMsg, [?MODULE, DbName, Label, Error]), + recv(DbName, Reqs1, Timeout); + no_request -> + ok + end. diff --git a/src/nouveau/src/nouveau_rpc.erl b/src/nouveau/src/nouveau_rpc.erl index f7ab5a4332a..2037c7e7efd 100644 --- a/src/nouveau/src/nouveau_rpc.erl +++ b/src/nouveau/src/nouveau_rpc.erl @@ -17,8 +17,7 @@ -export([ search/3, - info/2, - cleanup/2 + info/2 ]). -include("nouveau.hrl"). @@ -88,7 +87,3 @@ info(DbName, #index{} = Index0) -> {error, Reason} -> rexi:reply({error, Reason}) end. - -cleanup(Path, Exclusions) -> - nouveau_api:delete_path(nouveau_util:index_name(Path), Exclusions), - rexi:reply(ok). diff --git a/src/nouveau/src/nouveau_util.erl b/src/nouveau/src/nouveau_util.erl index 0dfcb1e1e0b..3df43f2ffcf 100644 --- a/src/nouveau/src/nouveau_util.erl +++ b/src/nouveau/src/nouveau_util.erl @@ -27,6 +27,8 @@ maybe_create_local_purge_doc/2, get_local_purge_doc_id/1, get_local_purge_doc_body/3, + get_purge_checkpoints/1, + get_signatures_from_ddocs/2, nouveau_url/0 ]). @@ -174,6 +176,9 @@ maybe_create_local_purge_doc(Db, Index) -> get_local_purge_doc_id(Sig) -> iolist_to_binary([?LOCAL_DOC_PREFIX, "purge-", "nouveau-", Sig]). +get_purge_checkpoints(Db) -> + couch_index_util:get_purge_checkpoints(Db, <<"nouveau">>). + get_local_purge_doc_body(LocalDocId, PurgeSeq, Index) -> #index{ name = IdxName, @@ -193,5 +198,13 @@ get_local_purge_doc_body(LocalDocId, PurgeSeq, Index) -> ]}, couch_doc:from_json_obj(JsonList). +get_signatures_from_ddocs(DbName, DesignDocs) -> + SigList = lists:flatmap(fun(Doc) -> active_sigs(DbName, Doc) end, DesignDocs), + #{Sig => true || Sig <- SigList}. 
+ +active_sigs(DbName, #doc{} = Doc) -> + Indexes = nouveau_util:design_doc_to_indexes(DbName, Doc), + lists:map(fun(Index) -> Index#index.sig end, Indexes). + nouveau_url() -> config:get("nouveau", "url", "http://127.0.0.1:5987"). diff --git a/src/smoosh/src/smoosh_channel.erl b/src/smoosh/src/smoosh_channel.erl index 74e62594469..eabb751adbc 100644 --- a/src/smoosh/src/smoosh_channel.erl +++ b/src/smoosh/src/smoosh_channel.erl @@ -451,7 +451,7 @@ re_enqueue(Obj) -> cleanup_index_files(DbName) -> case should_clean_up_indices() of - true -> fabric:cleanup_index_files(DbName); + true -> fabric:cleanup_index_files_this_node(DbName); false -> ok end. diff --git a/src/smoosh/test/smoosh_tests.erl b/src/smoosh/test/smoosh_tests.erl index 96caa28f033..5170248753d 100644 --- a/src/smoosh/test/smoosh_tests.erl +++ b/src/smoosh/test/smoosh_tests.erl @@ -155,7 +155,7 @@ t_index_cleanup_happens_by_default(DbName) -> get_channel_pid("index_cleanup") ! unpause, {ok, _} = fabric:query_view(DbName, <<"foo">>, <<"bar">>), % View cleanup should have been invoked - meck:wait(fabric, cleanup_index_files, [DbName], 4000), + meck:wait(fabric, cleanup_index_files_this_node, [DbName], 4000), wait_view_compacted(DbName, <<"foo">>). t_index_cleanup_can_be_disabled(DbName) ->