diff --git a/src/couch_mrview/src/couch_mrview.erl b/src/couch_mrview/src/couch_mrview.erl index a50fcd6700f..d126e30cc79 100644 --- a/src/couch_mrview/src/couch_mrview.erl +++ b/src/couch_mrview/src/couch_mrview.erl @@ -273,7 +273,8 @@ query_all_docs(Db, Args0, Callback, Acc) -> all_docs_fold(Db, Args2, Callback, Acc1). query_view(Db, DDoc, VName) -> - query_view(Db, DDoc, VName, #mrargs{}). + Args = #mrargs{extra = [{view_row_map, true}]}, + query_view(Db, DDoc, VName, Args). query_view(Db, DDoc, VName, Args) when is_list(Args) -> query_view(Db, DDoc, VName, to_mrargs(Args), fun default_cb/2, []); @@ -325,7 +326,7 @@ get_view_info(Db, DDoc, VName) -> Db, DDoc, VName, - #mrargs{} + #mrargs{extra = [{view_row_map, true}]} ), %% get the total number of rows @@ -763,7 +764,7 @@ to_mrargs(KeyList) -> Index = lookup_index(couch_util:to_existing_atom(Key)), setelement(Index, Acc, Value) end, - #mrargs{}, + #mrargs{extra = [{view_row_map, true}]}, KeyList ). diff --git a/src/couch_mrview/src/couch_mrview_http.erl b/src/couch_mrview/src/couch_mrview_http.erl index 522884eda0c..23d8c738eef 100644 --- a/src/couch_mrview/src/couch_mrview_http.erl +++ b/src/couch_mrview/src/couch_mrview_http.erl @@ -472,7 +472,7 @@ row_to_json(Id0, Row) -> parse_params(#httpd{} = Req, Keys) -> parse_params(chttpd:qs(Req), Keys); parse_params(Props, Keys) -> - Args = #mrargs{}, + Args = #mrargs{extra = [{view_row_map, true}]}, parse_params(Props, Keys, Args). parse_params(Props, Keys, Args) -> @@ -511,13 +511,16 @@ parse_body_and_query(Req, Keys) -> #mrargs{ keys = Keys, group = undefined, - group_level = undefined + group_level = undefined, + extra = [{view_row_map, true}] }, [keep_group_level] ). parse_body_and_query(Req, {Props}, Keys) -> - Args = #mrargs{keys = Keys, group = undefined, group_level = undefined}, + Args = #mrargs{ + keys = Keys, group = undefined, group_level = undefined, extra = [{view_row_map, true}] + }, BodyArgs0 = parse_params(Props, Keys, Args, [decoded]), BodyArgs1 = case is_view(Req) of diff --git a/src/couch_replicator/src/couch_replicator_fabric.erl b/src/couch_replicator/src/couch_replicator_fabric.erl index 6e5ebfc25a1..9a069fac99c 100644 --- a/src/couch_replicator/src/couch_replicator_fabric.erl +++ b/src/couch_replicator/src/couch_replicator_fabric.erl @@ -81,6 +81,24 @@ docs_int(DbName, Workers, QueryArgs, Callback, Acc0) -> {ok, Resp} end. +handle_row(Row0, {Worker, From} = Source, State) -> + #collector{query_args = Args, counters = Counters0, rows = Rows0} = State, + Id = fabric_view_row:get_id(Row0), + Doc = fabric_view_row:get_doc(Row0), + case maybe_fetch_and_filter_doc(Id, Doc, State) of + {[_ | _]} = NewDoc -> + Row1 = fabric_view_row:set_doc(Row0, NewDoc), + Row = fabric_view_row:set_worker(Row1, Source), + Dir = Args#mrargs.direction, + Rows = merge_row(Dir, Row, Rows0), + Counters1 = fabric_dict:update_counter(Worker, 1, Counters0), + State1 = State#collector{rows = Rows, counters = Counters1}, + fabric_view:maybe_send_row(State1); + skip -> + rexi:stream_ack(From), + {ok, State} + end. 
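%% Editorial aside (not part of the patch): the couch_mrview.erl and
%% couch_mrview_http.erl hunks above seed #mrargs.extra with
%% {view_row_map, true} wherever default arguments are built, so downstream
%% fabric workers may stream map-shaped view rows. A minimal sketch of the
%% same opt-in, assuming the stock couch_mrview.hrl record definition; the
%% module and function names are illustrative only.
-module(view_row_opt_sketch).
-export([ensure_view_row_map/1]).

-include_lib("couch_mrview/include/couch_mrview.hrl").

%% Add the opt-in flag unless the caller already set it in extra.
ensure_view_row_map(#mrargs{extra = Extra} = Args) ->
    case couch_util:get_value(view_row_map, Extra) of
        undefined -> Args#mrargs{extra = [{view_row_map, true} | Extra]};
        _ -> Args
    end.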
+ handle_message({rexi_DOWN, _, {_, NodeRef}, _}, _, State) -> fabric_view:check_down_shards(State, NodeRef); handle_message({rexi_EXIT, Reason}, Worker, State) -> @@ -120,32 +138,31 @@ handle_message({meta, Meta0}, {Worker, From}, State) -> user_acc = Acc }} end; -handle_message(#view_row{id = Id, doc = Doc} = Row0, {Worker, From}, State) -> - #collector{query_args = Args, counters = Counters0, rows = Rows0} = State, - case maybe_fetch_and_filter_doc(Id, Doc, State) of - {[_ | _]} = NewDoc -> - Row = Row0#view_row{doc = NewDoc}, - Dir = Args#mrargs.direction, - Rows = merge_row(Dir, Row#view_row{worker = {Worker, From}}, Rows0), - Counters1 = fabric_dict:update_counter(Worker, 1, Counters0), - State1 = State#collector{rows = Rows, counters = Counters1}, - fabric_view:maybe_send_row(State1); - skip -> - rexi:stream_ack(From), - {ok, State} - end; +handle_message(#view_row{} = Row, {_, _} = Source, State) -> + handle_row(Row, Source, State); +handle_message({view_row, #{}} = Row, {_, _} = Source, State) -> + handle_row(Row, Source, State); handle_message(complete, Worker, State) -> Counters = fabric_dict:update_counter(Worker, 1, State#collector.counters), fabric_view:maybe_send_row(State#collector{counters = Counters}). -merge_row(fwd, Row, Rows) -> - lists:keymerge(#view_row.id, [Row], Rows); -merge_row(rev, Row, Rows) -> - lists:rkeymerge(#view_row.id, [Row], Rows). +merge_row(Dir, Row, Rows) -> + lists:merge( + fun(RowA, RowB) -> + IdA = fabric_view_row:get_id(RowA), + IdB = fabric_view_row:get_id(RowB), + case Dir of + fwd -> IdA < IdB; + rev -> IdA > IdB + end + end, + [Row], + Rows + ). maybe_fetch_and_filter_doc(Id, undecided, State) -> - #collector{db_name = DbName, query_args = #mrargs{extra = Extra}} = State, - FilterStates = proplists:get_value(filter_states, Extra), + #collector{db_name = DbName, query_args = #mrargs{extra = Options}} = State, + FilterStates = couch_util:get_value(filter_states, Options), case couch_replicator:active_doc(DbName, Id) of {ok, {Props} = DocInfo} -> DocState = couch_util:get_value(state, Props), @@ -156,3 +173,278 @@ maybe_fetch_and_filter_doc(Id, undecided, State) -> end; maybe_fetch_and_filter_doc(_Id, Doc, _State) -> Doc. + +-ifdef(TEST). + +-include_lib("couch/include/couch_eunit.hrl"). + +handle_message_test_() -> + { + foreach, + fun() -> + meck:new(foo, [non_strict]), + meck:new(fabric_view) + end, + fun(_) -> meck:unload() end, + [ + ?TDEF_FE(t_handle_message_rexi_down), + ?TDEF_FE(t_handle_message_rexi_exit), + ?TDEF_FE(t_handle_message_meta_zero), + ?TDEF_FE(t_handle_message_meta), + ?TDEF_FE(t_handle_message_row_skip), + ?TDEF_FE(t_handle_message_row), + ?TDEF_FE(t_handle_message_complete) + ] + }. + +t_handle_message_rexi_down(_) -> + Message = {rexi_DOWN, undefined, {undefined, node}, undefined}, + meck:expect(fabric_view, check_down_shards, [state, node], meck:val(fabric_view_result)), + ?assertEqual(fabric_view_result, handle_message(Message, source, state)). + +t_handle_message_rexi_exit(_) -> + Message = {rexi_EXIT, reason}, + meck:expect( + fabric_view, handle_worker_exit, [state, source, reason], meck:val(fabric_view_result) + ), + ?assertEqual(fabric_view_result, handle_message(Message, source, state)). 
+ +t_handle_message_meta_zero(_) -> + Meta = [{total, 3}, {offset, 2}, {update_seq, 1}], + Worker = {worker1, from}, + Counters1 = [{worker1, 0}, {worker2, 0}], + Counters2 = [{worker1, 1}, {worker2, 0}], + State1 = #collector{counters = Counters1, total_rows = 0, update_seq = nil, offset = 0}, + State2 = #collector{counters = Counters2, total_rows = 3, update_seq = nil, offset = 2}, + meck:expect(rexi, stream_ack, [from], meck:val(ok)), + ?assertEqual({ok, State2}, handle_message({meta, Meta}, Worker, State1)). + +t_handle_message_meta(_) -> + Meta1 = [{total, 10}, {offset, 2}], + Meta2 = [{total, 10}, {offset, 5}], + Meta3 = [{total, 10}, {offset, 5}], + Worker = {worker1, from}, + Counters1 = [{worker1, 0}, {worker2, 3}, {worker3, 5}], + Counters2 = [{worker1, 0}, {worker2, 2}, {worker3, 4}], + State1 = #collector{ + counters = Counters1, + total_rows = 0, + update_seq = [], + offset = 0, + skip = 3, + callback = fun foo:bar/2, + user_acc = accumulator1 + }, + State2 = #collector{ + counters = Counters1, + total_rows = 0, + update_seq = nil, + offset = 0, + skip = 3, + callback = fun foo:bar/2, + user_acc = accumulator2 + }, + State3 = #collector{ + counters = Counters2, + total_rows = 10, + update_seq = [], + offset = 5, + skip = 3, + callback = fun foo:bar/2, + user_acc = updated_accumulator1 + }, + State4 = #collector{ + counters = Counters2, + total_rows = 10, + update_seq = nil, + offset = 5, + skip = 3, + callback = fun foo:bar/2, + user_acc = updated_accumulator2 + }, + meck:expect( + foo, + bar, + [ + {[{meta, Meta2}, accumulator1], meck:val({go1, updated_accumulator1})}, + {[{meta, Meta3}, accumulator2], meck:val({go2, updated_accumulator2})} + ] + ), + meck:expect(rexi, stream_ack, [from], meck:val(ok)), + ?assertEqual({go1, State3}, handle_message({meta, Meta1}, Worker, State1)), + ?assertEqual({go2, State4}, handle_message({meta, Meta1}, Worker, State2)). + +t_handle_message_row_skip(_) -> + State = #collector{db_name = db, query_args = #mrargs{}}, + Worker = {worker, from}, + Row1 = #view_row{id = id, doc = undecided}, + Row2 = {view_row, #{id => id, doc => undecided}}, + meck:expect(couch_replicator, active_doc, [db, id], meck:val({error, not_found})), + meck:expect(rexi, stream_ack, [from], meck:val(ok)), + ?assertEqual({ok, State}, handle_message(Row1, Worker, State)), + ?assertEqual({ok, State}, handle_message(Row2, Worker, State)). 
+ +t_handle_message_row(_) -> + QueryArgs = #mrargs{direction = fwd, extra = [{filter_states, []}]}, + Counters1 = [{worker, 4}], + Counters2 = [{worker, 5}], + Worker = {worker, from}, + Doc = {[{<<"_id">>, doc_id}, {<<"_rev">>, doc_rev}]}, + Row1 = #view_row{id = id, doc = Doc}, + Row11 = #view_row{id = id, doc = Doc, worker = Worker}, + Row2 = {view_row, #{id => id, doc => undecided}}, + Row21 = {view_row, #{id => id, doc => Doc, worker => Worker}}, + Rows1 = [], + Rows2 = [], + Rows3 = [Row11], + Rows4 = [Row21], + State1 = #collector{db_name = db, query_args = QueryArgs, counters = Counters1, rows = Rows1}, + State2 = #collector{db_name = db, query_args = QueryArgs, counters = Counters1, rows = Rows2}, + State3 = #collector{db_name = db, query_args = QueryArgs, counters = Counters2, rows = Rows3}, + State4 = #collector{db_name = db, query_args = QueryArgs, counters = Counters2, rows = Rows4}, + meck:expect(couch_replicator, active_doc, [db, id], meck:val({ok, Doc})), + meck:expect(fabric_view, maybe_send_row, [ + {[State3], meck:val(next_row1)}, {[State4], meck:val(next_row2)} + ]), + ?assertEqual(next_row1, handle_message(Row1, Worker, State1)), + ?assertEqual(next_row2, handle_message(Row2, Worker, State2)). + +t_handle_message_complete(_) -> + Worker = worker, + Counters1 = [{Worker, 6}], + Counters2 = [{Worker, 7}], + State1 = #collector{counters = Counters1}, + State2 = #collector{counters = Counters2}, + meck:expect(fabric_view, maybe_send_row, [State2], meck:val(maybe_row)), + ?assertEqual(maybe_row, handle_message(complete, Worker, State1)). + +merge_row_test_() -> + { + foreach, + fun() -> ok end, + fun(_) -> ok end, + [ + ?TDEF_FE(t_merge_row_record_fwd), + ?TDEF_FE(t_merge_row_record_rev), + ?TDEF_FE(t_merge_row_map_fwd), + ?TDEF_FE(t_merge_row_map_rev), + ?TDEF_FE(t_merge_row_mixed_fwd), + ?TDEF_FE(t_merge_row_mixed_rev) + ] + }. + +t_merge_row_record_fwd(_) -> + RowX1 = #view_row{id = 4}, + Row1 = #view_row{id = 1}, + Row2 = #view_row{id = 3}, + Row3 = #view_row{id = 5}, + Row4 = #view_row{id = 7}, + Rows = [Row1, Row2, Row3, Row4], + Expected1 = [Row1, Row2, RowX1, Row3, Row4], + ?assertEqual(Expected1, merge_row(fwd, RowX1, Rows)), + RowX2 = #view_row{id = 0}, + Expected2 = [RowX2, Row1, Row2, Row3, Row4], + ?assertEqual(Expected2, merge_row(fwd, RowX2, Rows)), + RowX3 = #view_row{id = 8}, + Expected3 = [Row1, Row2, Row3, Row4, RowX3], + ?assertEqual(Expected3, merge_row(fwd, RowX3, Rows)), + RowX4 = #view_row{id = 5}, + Expected4 = [Row1, Row2, RowX4, Row3, Row4], + ?assertEqual(Expected4, merge_row(fwd, RowX4, Rows)). + +t_merge_row_record_rev(_) -> + RowX1 = #view_row{id = 5}, + Row1 = #view_row{id = 2}, + Row2 = #view_row{id = 4}, + Row3 = #view_row{id = 6}, + Row4 = #view_row{id = 8}, + Rows = [Row4, Row3, Row2, Row1], + Expected1 = [Row4, Row3, RowX1, Row2, Row1], + ?assertEqual(Expected1, merge_row(rev, RowX1, Rows)), + RowX2 = #view_row{id = 1}, + Expected2 = [Row4, Row3, Row2, Row1, RowX2], + ?assertEqual(Expected2, merge_row(rev, RowX2, Rows)), + RowX3 = #view_row{id = 9}, + Expected3 = [RowX3, Row4, Row3, Row2, Row1], + ?assertEqual(Expected3, merge_row(rev, RowX3, Rows)), + RowX4 = #view_row{id = 6}, + Expected4 = [Row4, Row3, RowX4, Row2, Row1], + ?assertEqual(Expected4, merge_row(rev, RowX4, Rows)). 
+ +t_merge_row_map_fwd(_) -> + RowX1 = {view_row, #{id => 4}}, + Row1 = {view_row, #{id => 1}}, + Row2 = {view_row, #{id => 3}}, + Row3 = {view_row, #{id => 5}}, + Row4 = {view_row, #{id => 7}}, + Rows = [Row1, Row2, Row3, Row4], + Expected1 = [Row1, Row2, RowX1, Row3, Row4], + ?assertEqual(Expected1, merge_row(fwd, RowX1, Rows)), + RowX2 = {view_row, #{id => 0}}, + Expected2 = [RowX2, Row1, Row2, Row3, Row4], + ?assertEqual(Expected2, merge_row(fwd, RowX2, Rows)), + RowX3 = {view_row, #{id => 8}}, + Expected3 = [Row1, Row2, Row3, Row4, RowX3], + ?assertEqual(Expected3, merge_row(fwd, RowX3, Rows)), + RowX4 = {view_row, #{id => 5}}, + Expected4 = [Row1, Row2, RowX4, Row3, Row4], + ?assertEqual(Expected4, merge_row(fwd, RowX4, Rows)). + +t_merge_row_map_rev(_) -> + RowX1 = {view_row, #{id => 5}}, + Row1 = {view_row, #{id => 2}}, + Row2 = {view_row, #{id => 4}}, + Row3 = {view_row, #{id => 6}}, + Row4 = {view_row, #{id => 8}}, + Rows = [Row4, Row3, Row2, Row1], + Expected1 = [Row4, Row3, RowX1, Row2, Row1], + ?assertEqual(Expected1, merge_row(rev, RowX1, Rows)), + RowX2 = {view_row, #{id => 1}}, + Expected2 = [Row4, Row3, Row2, Row1, RowX2], + ?assertEqual(Expected2, merge_row(rev, RowX2, Rows)), + RowX3 = {view_row, #{id => 9}}, + Expected3 = [RowX3, Row4, Row3, Row2, Row1], + ?assertEqual(Expected3, merge_row(rev, RowX3, Rows)), + RowX4 = {view_row, #{id => 6}}, + Expected4 = [Row4, Row3, RowX4, Row2, Row1], + ?assertEqual(Expected4, merge_row(rev, RowX4, Rows)). + +t_merge_row_mixed_fwd(_) -> + RowX1 = #view_row{id = 4}, + Row1 = {view_row, #{id => 1}}, + Row2 = {view_row, #{id => 3}}, + Row3 = #view_row{id = 5}, + Row4 = {view_row, #{id => 7}}, + Rows = [Row1, Row2, Row3, Row4], + Expected1 = [Row1, Row2, RowX1, Row3, Row4], + ?assertEqual(Expected1, merge_row(fwd, RowX1, Rows)), + RowX2 = {view_row, #{id => 0}}, + Expected2 = [RowX2, Row1, Row2, Row3, Row4], + ?assertEqual(Expected2, merge_row(fwd, RowX2, Rows)), + RowX3 = {view_row, #{id => 8}}, + Expected3 = [Row1, Row2, Row3, Row4, RowX3], + ?assertEqual(Expected3, merge_row(fwd, RowX3, Rows)), + RowX4 = {view_row, #{id => 5}}, + Expected4 = [Row1, Row2, Row3, RowX4, Row4], + ?assertEqual(Expected4, merge_row(fwd, RowX4, Rows)). + +t_merge_row_mixed_rev(_) -> + RowX1 = {view_row, #{id => 5}}, + Row1 = #view_row{id = 2}, + Row2 = #view_row{id = 4}, + Row3 = {view_row, #{id => 6}}, + Row4 = #view_row{id = 8}, + Rows = [Row4, Row3, Row2, Row1], + Expected1 = [Row4, Row3, RowX1, Row2, Row1], + ?assertEqual(Expected1, merge_row(rev, RowX1, Rows)), + RowX2 = #view_row{id = 1}, + Expected2 = [Row4, Row3, Row2, Row1, RowX2], + ?assertEqual(Expected2, merge_row(rev, RowX2, Rows)), + RowX3 = #view_row{id = 9}, + Expected3 = [RowX3, Row4, Row3, Row2, Row1], + ?assertEqual(Expected3, merge_row(rev, RowX3, Rows)), + RowX4 = #view_row{id = 6}, + Expected4 = [Row4, Row3, RowX4, Row2, Row1], + ?assertEqual(Expected4, merge_row(rev, RowX4, Rows)). + +-endif. 
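%% Editorial aside (not part of the patch): merge_row/3 above replaces
%% lists:keymerge/3 with lists:merge/3 plus an ordering fun, because the row
%% id no longer sits at a fixed record position once map rows are allowed.
%% A minimal standalone sketch of the same idea over {Id, Payload} tuples
%% instead of view rows; module and function names are illustrative only.
-module(merge_by_id_sketch).
-export([merge_row/3]).

merge_row(Dir, Row, Rows) ->
    lists:merge(
        fun({IdA, _}, {IdB, _}) ->
            case Dir of
                fwd -> IdA < IdB;
                rev -> IdA > IdB
            end
        end,
        [Row],
        Rows
    ).

%% e.g. merge_row(fwd, {4, a}, [{1, x}, {3, y}, {5, z}]) slots the new row
%% between ids 3 and 5, mirroring the t_merge_row_* expectations above.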
diff --git a/src/couch_replicator/src/couch_replicator_fabric_rpc.erl b/src/couch_replicator/src/couch_replicator_fabric_rpc.erl index daeb86e6058..f6b9b408099 100644 --- a/src/couch_replicator/src/couch_replicator_fabric_rpc.erl +++ b/src/couch_replicator/src/couch_replicator_fabric_rpc.erl @@ -23,29 +23,26 @@ docs(DbName, Options, Args0) -> set_io_priority(DbName, Options), #mrargs{skip = Skip, limit = Limit, extra = Extra} = Args0, - FilterStates = proplists:get_value(filter_states, Extra), Args = Args0#mrargs{skip = 0, limit = Skip + Limit}, HealthThreshold = couch_replicator_scheduler:health_threshold(), {ok, Db} = couch_db:open_int(DbName, Options), - Acc = {DbName, FilterStates, HealthThreshold}, + Acc = {DbName, HealthThreshold, Extra}, couch_mrview:query_all_docs(Db, Args, fun docs_cb/2, Acc). docs_cb({meta, Meta}, Acc) -> ok = rexi:stream2({meta, Meta}), {ok, Acc}; -docs_cb({row, Row}, {DbName, States, HealthThreshold} = Acc) -> - Id = couch_util:get_value(id, Row), - Doc = couch_util:get_value(doc, Row), - ViewRow = #view_row{ - id = Id, - key = couch_util:get_value(key, Row), - value = couch_util:get_value(value, Row) - }, +docs_cb({row, Props}, {DbName, HealthThreshold, Options} = Acc) -> + States = couch_util:get_value(filter_states, Options), + Id = couch_util:get_value(id, Props), + Doc = couch_util:get_value(doc, Props), case rep_doc_state(DbName, Id, Doc, States, HealthThreshold) of skip -> ok; Other -> - ok = rexi:stream2(ViewRow#view_row{doc = Other}) + ViewRow0 = fabric_view_row:from_props(Props, Options), + ViewRow = fabric_view_row:set_doc(ViewRow0, Other), + ok = rexi:stream2(ViewRow) end, {ok, Acc}; docs_cb(complete, Acc) -> @@ -95,3 +92,112 @@ rep_doc_state(Shard, Id, {[_ | _]} = Doc, States, HealthThreshold) -> get_doc_state({Props}) -> couch_util:get_value(state, Props). + +-ifdef(TEST). + +-include_lib("couch/include/couch_eunit.hrl"). + +docs_test_() -> + { + foreach, + fun() -> ok end, + fun(_) -> ok end, + [ + ?TDEF_FE(t_docs) + ] + }. + +t_docs(_) -> + Options1 = [], + Options2 = [{io_priority, priority}], + QueryArgs1 = #mrargs{skip = 3, limit = 7, extra = extra}, + QueryArgs2 = #mrargs{skip = 0, limit = 10, extra = extra}, + Accumulator = {db_name, health_threshold, extra}, + meck:expect(couch_replicator_scheduler, health_threshold, [], meck:val(health_threshold)), + meck:expect(couch_db, open_int, [db_name, '_'], meck:val({ok, db})), + meck:expect( + couch_mrview, + query_all_docs, + [db, QueryArgs2, '_', Accumulator], + meck:val(all_docs) + ), + ?assertEqual(all_docs, docs(db_name, Options1, QueryArgs1)), + IoPrio1 = get(io_priority), + ?assertEqual({interactive, db_name}, IoPrio1), + ?assertEqual(all_docs, docs(db_name, Options2, QueryArgs1)), + IoPrio2 = get(io_priority), + ?assertEqual(priority, IoPrio2). + +docs_cb_test_() -> + { + foreach, + fun() -> + meck:new(mem3), + meck:new(rexi) + end, + fun(_) -> meck:unload() end, + [ + ?TDEF_FE(t_docs_cb_meta), + ?TDEF_FE(t_docs_cb_row_skip), + ?TDEF_FE(t_docs_cb_row), + ?TDEF_FE(t_docs_cb_complete) + ] + }. + +t_docs_cb_meta(_) -> + Meta = {meta, meta}, + meck:expect(rexi, stream2, [Meta], meck:val(ok)), + ?assertEqual({ok, accumulator}, docs_cb(Meta, accumulator)). 
+ +t_docs_cb_row_skip(_) -> + Accumulator1 = {db_name, health_threshold, []}, + Accumulator2 = {db_name, health_threshold, [{view_row_map, true}]}, + Row = {row, [{id, <<"_design/ddoc">>}, {doc, doc}]}, + meck:reset(rexi), + meck:expect(rexi, stream2, ['_'], undefined), + ?assertEqual({ok, Accumulator1}, docs_cb(Row, Accumulator1)), + ?assertNot(meck:called(rexi, stream2, '_')), + meck:reset(rexi), + meck:expect(rexi, stream2, ['_'], undefined), + ?assertEqual({ok, Accumulator2}, docs_cb(Row, Accumulator2)), + ?assertNot(meck:called(rexi, stream2, '_')). + +t_docs_cb_row(_) -> + Accumulator1 = {db_name, health_threshold, [{filter_states, []}]}, + Accumulator2 = {db_name, health_threshold, [{filter_states, []}, {view_row_map, true}]}, + Doc = {[{<<"_id">>, id}, {<<"_rev">>, rev}]}, + DocInfo1 = {[{state, other}]}, + DocInfo2 = {[{state, null}]}, + EtsInfo = {[{state, other}]}, + Row = {row, [{id, id}, {doc, Doc}]}, + ViewRow1 = #view_row{id = id, doc = DocInfo1}, + ViewRow2 = {view_row, #{id => id, doc => EtsInfo}}, + ViewRow3 = {view_row, #{id => id, doc => undecided}}, + meck:expect(mem3, dbname, [db_name], meck:val(mem3_db_name)), + meck:expect(couch_replicator, info_from_doc, [mem3_db_name, Doc], meck:val(DocInfo1)), + meck:expect(rexi, stream2, [ViewRow1], meck:val(ok)), + ?assertEqual({ok, Accumulator1}, docs_cb(Row, Accumulator1)), + meck:expect(couch_replicator, info_from_doc, [mem3_db_name, Doc], meck:val(DocInfo2)), + meck:expect( + couch_replicator_doc_processor, + doc_lookup, + [db_name, id, health_threshold], + meck:val({ok, EtsInfo}) + ), + meck:expect(rexi, stream2, [ViewRow2], meck:val(ok)), + ?assertEqual({ok, Accumulator2}, docs_cb(Row, Accumulator2)), + meck:expect(couch_replicator, info_from_doc, [mem3_db_name, Doc], meck:val(DocInfo2)), + meck:expect( + couch_replicator_doc_processor, + doc_lookup, + [db_name, id, health_threshold], + meck:val({error, not_found}) + ), + meck:expect(rexi, stream2, [ViewRow3], meck:val(ok)), + ?assertEqual({ok, Accumulator2}, docs_cb(Row, Accumulator2)). + +t_docs_cb_complete(_) -> + meck:expect(rexi, stream_last, [complete], meck:val(ok)), + ?assertEqual({ok, accumulator}, docs_cb(complete, accumulator)). + +-endif. 
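%% Editorial aside (not part of the patch): couch_replicator_fabric_rpc:docs_cb/2
%% now builds rows through fabric_view_row:from_props/2 and set_doc/2. That
%% module is introduced elsewhere in this change set and is not shown here; the
%% sketch below only captures the behaviour the tests above rely on (a
%% {view_row, Map} tuple when {view_row_map, true} is in the options, a classic
%% #view_row{} record otherwise). The module name is illustrative only.
-module(view_row_shape_sketch).
-export([from_props/2, set_doc/2]).

-include_lib("fabric/include/fabric.hrl").

from_props(Props, Options) ->
    case couch_util:get_value(view_row_map, Options, false) of
        true ->
            Keys = [id, key, value, doc, worker],
            Map = maps:from_list([{K, V} || {K, V} <- Props, lists:member(K, Keys)]),
            {view_row, Map};
        false ->
            #view_row{
                id = couch_util:get_value(id, Props),
                key = couch_util:get_value(key, Props),
                value = couch_util:get_value(value, Props),
                doc = couch_util:get_value(doc, Props)
            }
    end.

set_doc({view_row, #{} = Map}, Doc) -> {view_row, Map#{doc => Doc}};
set_doc(#view_row{} = Row, Doc) -> Row#view_row{doc = Doc}.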
diff --git a/src/couch_replicator/src/couch_replicator_httpd.erl b/src/couch_replicator/src/couch_replicator_httpd.erl index 77c78efe2ab..56f1cdeab37 100644 --- a/src/couch_replicator/src/couch_replicator_httpd.erl +++ b/src/couch_replicator/src/couch_replicator_httpd.erl @@ -135,11 +135,12 @@ handle_scheduler_docs(Db, Req) when is_binary(Db) -> VArgs0 = couch_mrview_http:parse_params(Req, undefined), StatesQs = chttpd:qs_value(Req, "states"), States = couch_replicator_httpd_util:parse_replication_state_filter(StatesQs), + Extra = VArgs0#mrargs.extra, VArgs1 = VArgs0#mrargs{ view_type = map, include_docs = true, reduce = false, - extra = [{filter_states, States}] + extra = [{filter_states, States} | Extra] }, VArgs2 = couch_mrview_util:validate_args(VArgs1), Opts = [{user_ctx, Req#httpd.user_ctx}], diff --git a/src/couch_replicator/test/eunit/couch_replicator_scheduler_docs_tests.erl b/src/couch_replicator/test/eunit/couch_replicator_scheduler_docs_tests.erl index 76450a6924d..e04518b6722 100644 --- a/src/couch_replicator/test/eunit/couch_replicator_scheduler_docs_tests.erl +++ b/src/couch_replicator/test/eunit/couch_replicator_scheduler_docs_tests.erl @@ -131,17 +131,22 @@ t_scheduler_docs_total_rows({_Ctx, {RepDb, Source, Target}}) -> Body = test_util:wait( fun() -> case req(get, SchedulerDocsUrl) of - {200, #{<<"docs">> := [_ | _]} = Decoded} -> Decoded; - {_, #{}} -> wait + {200, #{<<"docs">> := [_ | _]} = Decoded} -> + Decoded; + {_, #{<<"error">> := Error, <<"reason">> := Reason}} -> + ?debugVal(Error, 100), + ?debugVal(binary_to_list(Reason), 100); + {_, #{}} -> + wait end end, 14000, 1000 ), + ?assertNotEqual(Body, timeout), Docs = maps:get(<<"docs">>, Body), TotalRows = maps:get(<<"total_rows">>, Body), - ?assertEqual(TotalRows, length(Docs)), - ok. + ?assertEqual(TotalRows, length(Docs)). t_local_docs_can_be_written({_Ctx, {RepDb, _, _}}) -> DocUrl1 = rep_doc_url(RepDb, <<"_local/doc1">>), diff --git a/src/fabric/include/fabric.hrl b/src/fabric/include/fabric.hrl index 0c6ac5df83a..19eb6751c46 100644 --- a/src/fabric/include/fabric.hrl +++ b/src/fabric/include/fabric.hrl @@ -42,3 +42,17 @@ -record(view_row, {key, id, value, doc, worker}). -record(change, {key, id, value, deleted=false, doc, worker}). + +-type row_property_key() :: id | key | value | doc | worker. +-type row_properties() :: [{row_property_key(), any()}]. + +-type view_row_map() :: + #{ + id => term(), + key => term(), + value => term(), + doc => term(), + worker => term() + }. + +-type view_row() :: #view_row{} | {view_row, view_row_map()}. diff --git a/src/fabric/src/fabric.erl b/src/fabric/src/fabric.erl index c36c8f4df03..aa578ee8330 100644 --- a/src/fabric/src/fabric.erl +++ b/src/fabric/src/fabric.erl @@ -442,7 +442,8 @@ changes(DbName, Callback, Acc0, Options) -> %% @equiv query_view(DbName, DesignName, ViewName, #mrargs{}) query_view(DbName, DesignName, ViewName) -> - query_view(DbName, DesignName, ViewName, #mrargs{}). + QueryArgs = #mrargs{extra = [{view_row_map, true}]}, + query_view(DbName, DesignName, ViewName, QueryArgs). %% @equiv query_view(DbName, DesignName, %% ViewName, fun default_callback/2, [], QueryArgs) @@ -545,10 +546,11 @@ end_changes() -> %% @doc retrieve all the design docs from a database -spec design_docs(dbname()) -> {ok, [json_obj()]} | {error, Reason :: term()}. 
design_docs(DbName) -> + Extra0 = [{view_row_map, true}], Extra = case get(io_priority) of - undefined -> []; - Else -> [{io_priority, Else}] + undefined -> Extra0; + Else -> [{io_priority, Else} | Extra0] end, QueryArgs0 = #mrargs{ include_docs = true, diff --git a/src/fabric/src/fabric_rpc.erl b/src/fabric/src/fabric_rpc.erl index dab69ef4d2c..6209045f7a4 100644 --- a/src/fabric/src/fabric_rpc.erl +++ b/src/fabric/src/fabric_rpc.erl @@ -187,7 +187,8 @@ reduce_view(DbName, DDoc, ViewName, Args0, DbOptions) -> Args = fabric_util:upgrade_mrargs(Args0), {ok, Db} = get_or_create_db(DbName, DbOptions), VAcc0 = #vacc{db = Db}, - couch_mrview:query_view(Db, DDoc, ViewName, Args, fun reduce_cb/2, VAcc0). + Callback = fun(Msg, Acc) -> reduce_cb(Msg, Acc, Args#mrargs.extra) end, + couch_mrview:query_view(Db, DDoc, ViewName, Args, Callback, VAcc0). create_db(DbName) -> create_db(DbName, []). @@ -491,14 +492,9 @@ view_cb({meta, Meta}, Acc) -> % Map function starting ok = rexi:stream2({meta, Meta}), {ok, Acc}; -view_cb({row, Row}, Acc) -> +view_cb({row, Props}, #mrargs{extra = Options} = Acc) -> % Adding another row - ViewRow = #view_row{ - id = couch_util:get_value(id, Row), - key = couch_util:get_value(key, Row), - value = couch_util:get_value(value, Row), - doc = couch_util:get_value(doc, Row) - }, + ViewRow = fabric_view_row:from_props(Props, Options), ok = rexi:stream2(ViewRow), {ok, Acc}; view_cb(complete, Acc) -> @@ -510,24 +506,22 @@ view_cb(ok, ddoc_updated) -> view_cb(ok, insufficient_storage) -> rexi:reply({ok, insufficient_storage}). -reduce_cb({meta, Meta}, Acc) -> +reduce_cb({meta, Meta}, Acc, _Options) -> % Map function starting ok = rexi:stream2({meta, Meta}), {ok, Acc}; -reduce_cb({row, Row}, Acc) -> +reduce_cb({row, Props}, Acc, Options) -> % Adding another row - ok = rexi:stream2(#view_row{ - key = couch_util:get_value(key, Row), - value = couch_util:get_value(value, Row) - }), + ViewRow = fabric_view_row:from_props(Props, Options), + ok = rexi:stream2(ViewRow), {ok, Acc}; -reduce_cb(complete, Acc) -> +reduce_cb(complete, Acc, _Options) -> % Finish view output ok = rexi:stream_last(complete), {ok, Acc}; -reduce_cb(ok, ddoc_updated) -> +reduce_cb(ok, ddoc_updated, _Options) -> rexi:reply({ok, ddoc_updated}); -reduce_cb(ok, insufficient_storage) -> +reduce_cb(ok, insufficient_storage, _Options) -> rexi:reply({ok, insufficient_storage}). changes_enumerator(#full_doc_info{} = FDI, Acc) -> @@ -712,7 +706,7 @@ uuid(Db) -> binary:part(Uuid, {0, Prefix}). -ifdef(TEST). --include_lib("eunit/include/eunit.hrl"). +-include_lib("couch/include/couch_eunit.hrl"). maybe_filtered_json_doc_no_filter_test() -> Body = {[{<<"a">>, 1}]}, @@ -729,4 +723,94 @@ maybe_filtered_json_doc_with_filter_test() -> {JDocProps} = maybe_filtered_json_doc(Doc, [], Filter), ?assertEqual(JDocProps, [{<<"a">>, 1}]). +view_cb_test_() -> + { + foreach, + fun() -> meck:new(rexi) end, + fun(_) -> meck:unload() end, + [ + ?TDEF_FE(t_view_cb_meta), + ?TDEF_FE(t_view_cb_row_record), + ?TDEF_FE(t_view_cb_row_map), + ?TDEF_FE(t_view_cb_complete), + ?TDEF_FE(t_view_cb_ddoc_updated), + ?TDEF_FE(t_view_cb_insufficient_storage) + ] + }. + +t_view_cb_meta(_) -> + meck:expect(rexi, stream2, [{meta, meta}], meck:val(ok)), + ?assertEqual({ok, acc}, view_cb({meta, meta}, acc)). 
+ +t_view_cb_row_record(_) -> + Acc = #mrargs{}, + Props = [{id, id}, {key, key}, {value, value}], + ViewRow = #view_row{id = id, key = key, value = value}, + meck:expect(rexi, stream2, [ViewRow], meck:val(ok)), + ?assertEqual({ok, Acc}, view_cb({row, Props}, Acc)). + +t_view_cb_row_map(_) -> + Acc = #mrargs{extra = [{view_row_map, true}]}, + Props = [{id, id}, {key, key}, {value, value}], + ViewRow = {view_row, #{id => id, key => key, value => value}}, + meck:expect(rexi, stream2, [ViewRow], meck:val(ok)), + ?assertEqual({ok, Acc}, view_cb({row, Props}, Acc)). + +t_view_cb_complete(_) -> + meck:expect(rexi, stream_last, [complete], meck:val(ok)), + ?assertEqual({ok, acc}, view_cb(complete, acc)). + +t_view_cb_ddoc_updated(_) -> + meck:expect(rexi, reply, [{ok, ddoc_updated}], meck:val(ok)), + ?assertEqual(ok, view_cb(ok, ddoc_updated)). + +t_view_cb_insufficient_storage(_) -> + meck:expect(rexi, reply, [{ok, insufficient_storage}], meck:val(ok)), + ?assertEqual(ok, view_cb(ok, insufficient_storage)). + +reduce_cb_test_() -> + { + foreach, + fun() -> meck:new(rexi) end, + fun(_) -> meck:unload() end, + [ + ?TDEF_FE(t_reduce_cb_meta), + ?TDEF_FE(t_reduce_cb_row_record), + ?TDEF_FE(t_reduce_cb_row_map), + ?TDEF_FE(t_reduce_cb_complete), + ?TDEF_FE(t_reduce_cb_ddoc_updated), + ?TDEF_FE(t_reduce_cb_insufficient_storage) + ] + }. + +t_reduce_cb_meta(_) -> + meck:expect(rexi, stream2, [{meta, meta}], meck:val(ok)), + ?assertEqual({ok, acc}, reduce_cb({meta, meta}, acc, options)). + +t_reduce_cb_row_record(_) -> + Options = [], + Props = [{id, id}, {key, key}, {value, value}], + ViewRow = #view_row{id = id, key = key, value = value}, + meck:expect(rexi, stream2, [ViewRow], meck:val(ok)), + ?assertEqual({ok, acc}, reduce_cb({row, Props}, acc, Options)). + +t_reduce_cb_row_map(_) -> + Options = [{view_row_map, true}], + Props = [{id, id}, {key, key}, {value, value}], + ViewRow = {view_row, #{id => id, key => key, value => value}}, + meck:expect(rexi, stream2, [ViewRow], meck:val(ok)), + ?assertEqual({ok, acc}, reduce_cb({row, Props}, acc, Options)). + +t_reduce_cb_complete(_) -> + meck:expect(rexi, stream_last, [complete], meck:val(ok)), + ?assertEqual({ok, acc}, reduce_cb(complete, acc, options)). + +t_reduce_cb_ddoc_updated(_) -> + meck:expect(rexi, reply, [{ok, ddoc_updated}], meck:val(ok)), + ?assertEqual(ok, reduce_cb(ok, ddoc_updated, options)). + +t_reduce_cb_insufficient_storage(_) -> + meck:expect(rexi, reply, [{ok, insufficient_storage}], meck:val(ok)), + ?assertEqual(ok, reduce_cb(ok, insufficient_storage, options)). + -endif. 
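%% Editorial aside (not part of the patch): reduce_cb gained a third Options
%% argument, but couch_mrview:query_view/6 still expects a 2-arity callback,
%% so fabric_rpc:reduce_view/5 captures Args#mrargs.extra in a closure. A
%% minimal sketch of that adapter pattern, with illustrative names only.
-module(reduce_cb_adapter_sketch).
-export([make_callback/1]).

%% The query API calls Callback(Msg, Acc); the per-query options are captured
%% here so the 3-arity handler can inspect them.
make_callback(Options) ->
    fun(Msg, Acc) -> handle(Msg, Acc, Options) end.

%% Illustrative 3-arity handler: only the row clause looks at Options.
handle({row, Props}, Acc, Options) ->
    Row =
        case couch_util:get_value(view_row_map, Options, false) of
            true -> {view_row, maps:from_list(Props)};
            false -> {row, Props}
        end,
    {ok, [Row | Acc]};
handle(complete, Acc, _Options) ->
    {ok, Acc}.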
diff --git a/src/fabric/src/fabric_view.erl b/src/fabric/src/fabric_view.erl index 87f464b365d..f84e9e301ff 100644 --- a/src/fabric/src/fabric_view.erl +++ b/src/fabric/src/fabric_view.erl @@ -15,7 +15,6 @@ -export([ remove_overlapping_shards/2, maybe_send_row/1, - transform_row/1, keydict/1, extract_view/4, get_shards/2, @@ -164,7 +163,7 @@ maybe_send_row(State) -> {Row0, NewState} -> Row1 = possibly_embed_doc(NewState, Row0), Row2 = detach_partition(Row1), - Row3 = transform_row(Row2), + Row3 = fabric_view_row:transform(Row2), case Callback(Row3, AccIn) of {stop, Acc} -> {stop, NewState#collector{user_acc = Acc, limit = Limit - 1}}; @@ -182,31 +181,23 @@ maybe_send_row(State) -> %% if include_docs=true is used when keys and %% the values contain "_id" then use the "_id"s %% to retrieve documents and embed in result -possibly_embed_doc( - _State, - #view_row{id = reduced} = Row -) -> - Row; -possibly_embed_doc( - _State, - #view_row{value = undefined} = Row -) -> - Row; possibly_embed_doc( #collector{db_name = DbName, query_args = Args}, - #view_row{key = _Key, id = _Id, value = Value, doc = _Doc} = Row + Row ) -> + IsReduced = fabric_view_row:get_id(Row) == reduced, #mrargs{include_docs = IncludeDocs} = Args, - case IncludeDocs andalso is_tuple(Value) of + Value = fabric_view_row:get_value(Row), + case (not IsReduced) andalso IncludeDocs andalso is_tuple(Value) of true -> {Props} = Value, - Rev0 = couch_util:get_value(<<"_rev">>, Props), case couch_util:get_value(<<"_id">>, Props) of null -> - Row#view_row{doc = null}; + fabric_view_row:set_doc(Row, null); undefined -> Row; IncId -> + Rev0 = couch_util:get_value(<<"_rev">>, Props), % use separate process to call fabric:open_doc % to not interfere with current call {Pid, Ref} = spawn_monitor(fun() -> @@ -215,21 +206,25 @@ possibly_embed_doc( undefined -> case fabric:open_doc(DbName, IncId, []) of {ok, NewDoc} -> - Row#view_row{doc = couch_doc:to_json_obj(NewDoc, [])}; + fabric_view_row:set_doc( + Row, couch_doc:to_json_obj(NewDoc, []) + ); {not_found, _} -> - Row#view_row{doc = null}; + fabric_view_row:set_doc(Row, null); Else -> - Row#view_row{doc = {error, Else}} + fabric_view_row:set_doc(Row, {error, Else}) end; Rev0 -> Rev = couch_doc:parse_rev(Rev0), case fabric:open_revs(DbName, IncId, [Rev], []) of {ok, [{ok, NewDoc}]} -> - Row#view_row{doc = couch_doc:to_json_obj(NewDoc, [])}; + fabric_view_row:set_doc( + Row, couch_doc:to_json_obj(NewDoc, []) + ); {ok, [{{not_found, _}, Rev}]} -> - Row#view_row{doc = null}; + fabric_view_row:set_doc(Row, null); Else -> - Row#view_row{doc = {error, Else}} + fabric_view_row:set_doc(Row, {error, Else}) end end ) @@ -239,14 +234,15 @@ possibly_embed_doc( Resp end end; - _ -> + false -> Row end. -detach_partition(#view_row{key = {p, _Partition, Key}} = Row) -> - Row#view_row{key = Key}; -detach_partition(#view_row{} = Row) -> - Row. +detach_partition(Row) -> + case fabric_view_row:get_key(Row) of + {p, _Partition, Key} -> fabric_view_row:set_key(Row, Key); + _Key -> Row + end. 
keydict(undefined) -> undefined; @@ -262,20 +258,21 @@ keydict(Keys) -> get_next_row(#collector{rows = []}) -> throw(complete); -get_next_row(#collector{reducer = RedSrc} = St) when RedSrc =/= undefined -> +get_next_row(#collector{reducer = RedSrc} = State0) when RedSrc =/= undefined -> #collector{ - query_args = #mrargs{direction = Dir}, + query_args = #mrargs{direction = Dir, extra = Options}, keys = Keys, - rows = RowDict, + rows = RowDict0, lang = Lang, counters = Counters0, collation = Collation - } = St, - {Key, RestKeys} = find_next_key(Keys, Dir, Collation, RowDict), - case reduce_row_dict_take(Key, RowDict, Collation) of - {Records, NewRowDict} -> + } = State0, + {Key, RestKeys} = find_next_key(Keys, Dir, Collation, RowDict0), + case reduce_row_dict_take(Key, RowDict0, Collation) of + {Records, RowDict} -> Counters = lists:foldl( - fun(#view_row{worker = {Worker, From}}, CntrsAcc) -> + fun(Row, CntrsAcc) -> + {Worker, From} = fabric_view_row:get_worker(Row), case From of {Pid, _} when is_pid(Pid) -> gen_server:reply(From, ok); @@ -287,17 +284,20 @@ get_next_row(#collector{reducer = RedSrc} = St) when RedSrc =/= undefined -> Counters0, Records ), - Wrapped = [[V] || #view_row{value = V} <- Records], + Wrapped = [[fabric_view_row:get_value(R)] || R <- Records], {ok, [Reduced]} = couch_query_servers:rereduce(Lang, [RedSrc], Wrapped), {ok, Finalized} = couch_query_servers:finalize(RedSrc, Reduced), - NewSt = St#collector{keys = RestKeys, rows = NewRowDict, counters = Counters}, - {#view_row{key = Key, id = reduced, value = Finalized}, NewSt}; + State = State0#collector{keys = RestKeys, rows = RowDict, counters = Counters}, + ViewRow = fabric_view_row:from_props( + [{key, Key}, {id, reduced}, {value, Finalized}], Options + ), + {ViewRow, State}; error -> - get_next_row(St#collector{keys = RestKeys}) + get_next_row(State0#collector{keys = RestKeys}) end; get_next_row(State) -> #collector{rows = [Row | Rest], counters = Counters0} = State, - {Worker, From} = Row#view_row.worker, + {Worker, From} = fabric_view_row:get_worker(Row), rexi:stream_ack(From), Counters1 = fabric_dict:update_counter(Worker, -1, Counters0), {Row, State#collector{rows = Rest, counters = Counters1}}. @@ -338,19 +338,6 @@ find_next_key([], _, _, _) -> find_next_key([Key | Rest], _, _, _) -> {Key, Rest}. -transform_row(#view_row{value = {[{reduce_overflow_error, Msg}]}}) -> - {row, [{key, null}, {id, error}, {value, reduce_overflow_error}, {reason, Msg}]}; -transform_row(#view_row{key = Key, id = reduced, value = Value}) -> - {row, [{key, Key}, {value, Value}]}; -transform_row(#view_row{key = Key, id = undefined}) -> - {row, [{key, Key}, {id, error}, {value, not_found}]}; -transform_row(#view_row{key = Key, id = Id, value = Value, doc = undefined}) -> - {row, [{id, Id}, {key, Key}, {value, Value}]}; -transform_row(#view_row{key = Key, id = _Id, value = _Value, doc = {error, Reason}}) -> - {row, [{id, error}, {key, Key}, {value, Reason}]}; -transform_row(#view_row{key = Key, id = Id, value = Value, doc = Doc}) -> - {row, [{id, Id}, {key, Key}, {value, Value}, {doc, Doc}]}. - compare(fwd, <<"raw">>, A, B) -> A < B; compare(rev, <<"raw">>, A, B) -> B < A; compare(fwd, _, A, B) -> couch_ejson_compare:less_json(A, B); @@ -575,4 +562,300 @@ mk_shard(Name, Range) -> BName = list_to_binary(Name), #shard{name = BName, node = Node, range = Range}. 
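%% Editorial aside (not part of the patch): transform_row/1 above is removed in
%% favour of fabric_view_row:transform/1, which must keep producing the same
%% external {row, Props} shapes for both row representations. A sketch of the
%% map-shaped half, derived from the removed record clauses (the
%% reduce_overflow_error clause is omitted for brevity); the actual module is
%% defined elsewhere in this change set.
-module(transform_sketch).
-export([transform/1]).

transform({view_row, #{} = Map}) ->
    Key = maps:get(key, Map, undefined),
    Id = maps:get(id, Map, undefined),
    Value = maps:get(value, Map, undefined),
    Doc = maps:get(doc, Map, undefined),
    case {Id, Doc} of
        {reduced, _} -> {row, [{key, Key}, {value, Value}]};
        {undefined, _} -> {row, [{key, Key}, {id, error}, {value, not_found}]};
        {_, undefined} -> {row, [{id, Id}, {key, Key}, {value, Value}]};
        {_, {error, Reason}} -> {row, [{id, error}, {key, Key}, {value, Reason}]};
        {_, _} -> {row, [{id, Id}, {key, Key}, {value, Value}, {doc, Doc}]}
    end.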
+possibly_embed_doc_test_() -> + { + foreach, + fun() -> meck:new(fabric) end, + fun(_) -> meck:unload() end, + [ + ?TDEF_FE(t_possibly_embed_doc_reduced), + ?TDEF_FE(t_possibly_embed_doc_no_docs), + ?TDEF_FE(t_possible_embed_doc_no_props), + ?TDEF_FE(t_possible_embed_doc_id_null), + ?TDEF_FE(t_possible_embed_doc_id_undefined), + ?TDEF_FE(t_possible_embed_doc_no_rev), + ?TDEF_FE(t_possible_embed_doc_no_rev_doc_not_found), + ?TDEF_FE(t_possible_embed_doc_no_rev_doc_error), + ?TDEF_FE(t_possible_embed_doc), + ?TDEF_FE(t_possible_embed_doc_not_found), + ?TDEF_FE(t_possible_embed_doc_error) + ] + }. + +t_possibly_embed_doc_reduced(_) -> + State = #collector{query_args = #mrargs{}}, + Row1 = #view_row{id = reduced}, + Row2 = {view_row, #{id => reduced}}, + ?assertEqual(Row1, possibly_embed_doc(State, Row1)), + ?assertEqual(Row2, possibly_embed_doc(State, Row2)). + +t_possibly_embed_doc_no_docs(_) -> + QueryArgs = #mrargs{include_docs = false}, + State = #collector{query_args = QueryArgs}, + Row1 = #view_row{id = id}, + Row2 = {view_row, #{id => id}}, + ?assertEqual(Row1, possibly_embed_doc(State, Row1)), + ?assertEqual(Row2, possibly_embed_doc(State, Row2)). + +t_possible_embed_doc_no_props(_) -> + QueryArgs = #mrargs{include_docs = true}, + State = #collector{query_args = QueryArgs}, + Row1 = #view_row{id = id}, + Row2 = {view_row, #{id => id}}, + ?assertEqual(Row1, possibly_embed_doc(State, Row1)), + ?assertEqual(Row2, possibly_embed_doc(State, Row2)). + +t_possible_embed_doc_id_null(_) -> + QueryArgs = #mrargs{include_docs = true}, + State = #collector{query_args = QueryArgs}, + Value = {[{<<"_id">>, null}]}, + Row1 = #view_row{id = id, value = Value}, + Row2 = {view_row, #{id => id, value => Value}}, + Row3 = #view_row{id = id, value = Value, doc = null}, + Row4 = {view_row, #{id => id, value => Value, doc => null}}, + ?assertEqual(Row3, possibly_embed_doc(State, Row1)), + ?assertEqual(Row4, possibly_embed_doc(State, Row2)). + +t_possible_embed_doc_id_undefined(_) -> + QueryArgs = #mrargs{include_docs = true}, + State = #collector{query_args = QueryArgs}, + Value = {[{<<"_id">>, undefined}]}, + Row1 = #view_row{id = id, value = Value}, + Row2 = {view_row, #{id => id, value => Value}}, + ?assertEqual(Row1, possibly_embed_doc(State, Row1)), + ?assertEqual(Row2, possibly_embed_doc(State, Row2)). + +t_possible_embed_doc_no_rev(_) -> + DbName = <<"db">>, + Id = <<"id">>, + DocId = <<"doc_id">>, + QueryArgs = #mrargs{include_docs = true}, + State = #collector{db_name = DbName, query_args = QueryArgs}, + Value = {[{<<"_id">>, Id}]}, + NewDoc = #doc{id = DocId}, + EmbeddedDoc = {[{<<"_id">>, DocId}]}, + Row1 = #view_row{id = id, value = Value}, + Row2 = {view_row, #{id => id, value => Value}}, + Row3 = #view_row{id = id, value = Value, doc = EmbeddedDoc}, + Row4 = {view_row, #{id => id, value => Value, doc => EmbeddedDoc}}, + meck:expect(fabric, open_doc, [DbName, Id, []], meck:val({ok, NewDoc})), + ?assertEqual(Row3, possibly_embed_doc(State, Row1)), + ?assertEqual(Row4, possibly_embed_doc(State, Row2)). 
+ +t_possible_embed_doc_no_rev_doc_not_found(_) -> + DbName = <<"db">>, + Id = <<"id">>, + QueryArgs = #mrargs{include_docs = true}, + State = #collector{db_name = DbName, query_args = QueryArgs}, + Value = {[{<<"_id">>, Id}]}, + Row1 = #view_row{id = id, value = Value}, + Row2 = {view_row, #{id => id, value => Value}}, + Row3 = #view_row{id = id, value = Value, doc = null}, + Row4 = {view_row, #{id => id, value => Value, doc => null}}, + meck:expect(fabric, open_doc, [DbName, Id, []], meck:val({not_found, undefined})), + ?assertEqual(Row3, possibly_embed_doc(State, Row1)), + ?assertEqual(Row4, possibly_embed_doc(State, Row2)). + +t_possible_embed_doc_no_rev_doc_error(_) -> + DbName = <<"db">>, + Id = <<"id">>, + QueryArgs = #mrargs{include_docs = true}, + State = #collector{db_name = DbName, query_args = QueryArgs}, + Value = {[{<<"_id">>, Id}]}, + Row1 = #view_row{id = id, value = Value}, + Row2 = {view_row, #{id => id, value => Value}}, + Row3 = #view_row{id = id, value = Value, doc = {error, fabric_error}}, + Row4 = {view_row, #{id => id, value => Value, doc => {error, fabric_error}}}, + meck:expect(fabric, open_doc, [DbName, Id, []], meck:val(fabric_error)), + ?assertEqual(Row3, possibly_embed_doc(State, Row1)), + ?assertEqual(Row4, possibly_embed_doc(State, Row2)). + +t_possible_embed_doc(_) -> + DbName = <<"db">>, + Id = <<"id">>, + Rev = <<"1-foo">>, + ParsedRev = {1, <<"foo">>}, + DocId = <<"doc_id">>, + QueryArgs = #mrargs{include_docs = true}, + State = #collector{db_name = DbName, query_args = QueryArgs}, + Value = {[{<<"_id">>, Id}, {<<"_rev">>, Rev}]}, + NewDoc = #doc{id = DocId}, + EmbeddedDoc = {[{<<"_id">>, DocId}]}, + Row1 = #view_row{id = id, value = Value}, + Row2 = {view_row, #{id => id, value => Value}}, + Row3 = #view_row{id = id, value = Value, doc = EmbeddedDoc}, + Row4 = {view_row, #{id => id, value => Value, doc => EmbeddedDoc}}, + meck:expect(fabric, open_revs, [DbName, Id, [ParsedRev], []], meck:val({ok, [{ok, NewDoc}]})), + ?assertEqual(Row3, possibly_embed_doc(State, Row1)), + ?assertEqual(Row4, possibly_embed_doc(State, Row2)). + +t_possible_embed_doc_not_found(_) -> + DbName = <<"db">>, + Id = <<"id">>, + Rev = <<"1-foo">>, + ParsedRev = {1, <<"foo">>}, + QueryArgs = #mrargs{include_docs = true}, + State = #collector{db_name = DbName, query_args = QueryArgs}, + Value = {[{<<"_id">>, Id}, {<<"_rev">>, Rev}]}, + Row1 = #view_row{id = id, value = Value}, + Row2 = {view_row, #{id => id, value => Value}}, + Row3 = #view_row{id = id, value = Value, doc = null}, + Row4 = {view_row, #{id => id, value => Value, doc => null}}, + meck:expect( + fabric, + open_revs, + [DbName, Id, [ParsedRev], []], + meck:val({ok, [{{not_found, undefined}, ParsedRev}]}) + ), + ?assertEqual(Row3, possibly_embed_doc(State, Row1)), + ?assertEqual(Row4, possibly_embed_doc(State, Row2)). 
+ +t_possible_embed_doc_error(_) -> + DbName = <<"db">>, + Id = <<"id">>, + Rev = <<"1-foo">>, + ParsedRev = {1, <<"foo">>}, + QueryArgs = #mrargs{include_docs = true}, + State = #collector{db_name = DbName, query_args = QueryArgs}, + Value = {[{<<"_id">>, Id}, {<<"_rev">>, Rev}]}, + Row1 = #view_row{id = id, value = Value}, + Row2 = {view_row, #{id => id, value => Value}}, + Row3 = #view_row{id = id, value = Value, doc = {error, fabric_error}}, + Row4 = {view_row, #{id => id, value => Value, doc => {error, fabric_error}}}, + meck:expect(fabric, open_revs, [DbName, Id, [ParsedRev], []], meck:val(fabric_error)), + ?assertEqual(Row3, possibly_embed_doc(State, Row1)), + ?assertEqual(Row4, possibly_embed_doc(State, Row2)). + +detach_partition_test_() -> + { + foreach, + fun() -> ok end, + fun(_) -> ok end, + [ + ?TDEF_FE(t_detach_partition_partition_record), + ?TDEF_FE(t_detach_partition_no_partition_record), + ?TDEF_FE(t_detach_partition_partition_map), + ?TDEF_FE(t_detach_partition_no_partition_map) + ] + }. + +t_detach_partition_partition_record(_) -> + ViewRow1 = #view_row{key = {p, partition, key}}, + ViewRow2 = #view_row{key = key}, + ?assertEqual(ViewRow2, detach_partition(ViewRow1)). + +t_detach_partition_no_partition_record(_) -> + ViewRow = #view_row{key = key}, + ?assertEqual(ViewRow, detach_partition(ViewRow)). + +t_detach_partition_partition_map(_) -> + ViewRow1 = {view_row, #{key => {p, partition, key}}}, + ViewRow2 = {view_row, #{key => key}}, + ?assertEqual(ViewRow2, detach_partition(ViewRow1)). + +t_detach_partition_no_partition_map(_) -> + ViewRow = {view_row, #{key => key}}, + ?assertEqual(ViewRow, detach_partition(ViewRow)). + +get_next_row_test_() -> + { + foreach, + fun() -> ok end, + fun(_) -> ok end, + [ + ?TDEF_FE(t_get_next_row_end), + ?TDEF_FE(t_get_next_row_map), + ?TDEF_FE(t_get_next_row_reduce) + ] + }. + +t_get_next_row_end(_) -> + State = #collector{rows = []}, + ?assertThrow(complete, get_next_row(State)). + +t_get_next_row_map(_) -> + Rest = [row2, row3], + Counters1 = [{worker, 8}], + Counters2 = [{worker, 7}], + Row1 = #view_row{worker = {worker, from}}, + Row2 = {view_row, #{worker => {worker, from}}}, + State1 = #collector{rows = [Row1 | Rest], counters = Counters1}, + State2 = #collector{rows = [Row2 | Rest], counters = Counters1}, + State3 = #collector{rows = Rest, counters = Counters2}, + meck:expect(rexi, stream_ack, [from], meck:val(ok)), + ?assertEqual({Row1, State3}, get_next_row(State1)), + ?assertEqual({Row2, State3}, get_next_row(State2)). 
+ +t_get_next_row_reduce(_) -> + QueryArgs1 = #mrargs{direction = fwd, extra = []}, + QueryArgs2 = #mrargs{direction = fwd, extra = [{view_row_map, true}]}, + KeysRest = [key2, key3], + Key = key1, + Keys = [key0, Key | KeysRest], + W1From = list_to_pid("<0.4.1>"), + W2From = list_to_pid("<0.4.2>"), + ViewRows1 = [ + #view_row{value = value1, worker = {worker1, W1From}}, + #view_row{value = value2, worker = {worker2, W2From}}, + #view_row{value = value3, worker = {worker2, W2From}} + ], + ViewRows2 = [ + {view_row, #{value => value1, worker => {worker1, W1From}}}, + {view_row, #{value => value2, worker => {worker2, W2From}}}, + {view_row, #{value => value3, worker => {worker2, W2From}}} + ], + Values = [[value1], [value2], [value3]], + RowDict1 = dict:from_list([{key1, ViewRows1}, {key2, undefined}, {key3, undefined}]), + RowDict2 = dict:from_list([{key1, ViewRows2}, {key2, undefined}, {key3, undefined}]), + RowDict3 = dict:from_list([{key2, undefined}, {key3, undefined}]), + Language = <<"language">>, + Collation = <<"raw">>, + Counters1 = [{worker1, 3}, {worker2, 5}], + Counters2 = [{worker1, 2}, {worker2, 3}], + State1 = #collector{ + query_args = QueryArgs1, + keys = Keys, + rows = RowDict1, + lang = Language, + counters = Counters1, + collation = Collation, + reducer = reducer + }, + State2 = #collector{ + query_args = QueryArgs2, + keys = Keys, + rows = RowDict2, + lang = Language, + counters = Counters1, + collation = Collation, + reducer = reducer + }, + State3 = #collector{ + query_args = QueryArgs1, + keys = KeysRest, + rows = RowDict3, + lang = Language, + collation = Collation, + counters = Counters2, + reducer = reducer + }, + State4 = #collector{ + query_args = QueryArgs2, + keys = KeysRest, + rows = RowDict3, + lang = Language, + collation = Collation, + counters = Counters2, + reducer = reducer + }, + Row1 = #view_row{key = Key, id = reduced, value = finalized}, + Row2 = {view_row, #{key => Key, id => reduced, value => finalized}}, + meck:expect(rexi, stream_ack, ['_'], meck:val(ok)), + meck:expect( + couch_query_servers, rereduce, [Language, [reducer], Values], meck:val({ok, [reduced]}) + ), + meck:expect(couch_query_servers, finalize, [reducer, reduced], meck:val({ok, finalized})), + ?assertEqual({Row1, State3}, get_next_row(State1)), + ?assertEqual({Row2, State4}, get_next_row(State2)). + -endif. diff --git a/src/fabric/src/fabric_view_all_docs.erl b/src/fabric/src/fabric_view_all_docs.erl index a6786bff788..decc49cf4ca 100644 --- a/src/fabric/src/fabric_view_all_docs.erl +++ b/src/fabric/src/fabric_view_all_docs.erl @@ -14,7 +14,7 @@ -export([go/5]). % exported for spawn --export([open_doc/4]). +-export([open_doc/5]). -include_lib("fabric/include/fabric.hrl"). -include_lib("mem3/include/mem3.hrl"). @@ -70,7 +70,7 @@ go(DbName, Options, QueryArgs, Callback, Acc0) -> _ -> DocOptions0 end, SpawnFun = fun(Key) -> - spawn_monitor(?MODULE, open_doc, [DbName, Options ++ DocOptions1, Key, IncludeDocs]) + spawn_monitor(?MODULE, open_doc, [DbName, Options ++ DocOptions1, Key, IncludeDocs, Extra]) end, MaxJobs = all_docs_concurrency(), %% namespace can be _set_ to `undefined`, so we want simulate enum here @@ -187,6 +187,15 @@ shards(Db, Args) -> end, fabric_view:get_shards(Db, NewArgs). 
+handle_row(Row0, {Worker, _} = Source, State) -> + #collector{query_args = Args, counters = Counters0, rows = Rows0} = State, + Dir = Args#mrargs.direction, + Row = fabric_view_row:set_worker(Row0, Source), + Rows = merge_row(Dir, Row, Rows0), + Counters1 = fabric_dict:update_counter(Worker, 1, Counters0), + State1 = State#collector{rows = Rows, counters = Counters1}, + fabric_view:maybe_send_row(State1). + handle_message({rexi_DOWN, _, {_, NodeRef}, _}, _, State) -> fabric_view:check_down_shards(State, NodeRef); handle_message({rexi_EXIT, Reason}, Worker, State) -> @@ -257,13 +266,10 @@ handle_message({meta, Meta0}, {Worker, From}, State) -> update_seq = UpdateSeq0 }} end; -handle_message(#view_row{} = Row, {Worker, From}, State) -> - #collector{query_args = Args, counters = Counters0, rows = Rows0} = State, - Dir = Args#mrargs.direction, - Rows = merge_row(Dir, Row#view_row{worker = {Worker, From}}, Rows0), - Counters1 = fabric_dict:update_counter(Worker, 1, Counters0), - State1 = State#collector{rows = Rows, counters = Counters1}, - fabric_view:maybe_send_row(State1); +handle_message(#view_row{} = Row, {_, _} = Source, State) -> + handle_row(Row, Source, State); +handle_message({view_row, #{}} = Row, {_, _} = Source, State) -> + handle_row(Row, Source, State); handle_message(complete, Worker, State) -> Counters = fabric_dict:update_counter(Worker, 1, State#collector.counters), fabric_view:maybe_send_row(State#collector{counters = Counters}); @@ -273,10 +279,19 @@ handle_message({execution_stats, _} = Msg, {_, From}, St) -> rexi:stream_ack(From), {Go, St#collector{user_acc = Acc}}. -merge_row(fwd, Row, Rows) -> - lists:keymerge(#view_row.id, [Row], Rows); -merge_row(rev, Row, Rows) -> - lists:rkeymerge(#view_row.id, [Row], Rows). +merge_row(Dir, Row, Rows) -> + lists:merge( + fun(RowA, RowB) -> + IdA = fabric_view_row:get_id(RowA), + IdB = fabric_view_row:get_id(RowB), + case Dir of + fwd -> IdA < IdB; + rev -> IdA > IdB + end + end, + [Row], + Rows + ). all_docs_concurrency() -> Value = config:get("fabric", "all_docs_concurrency", "10"), @@ -297,19 +312,24 @@ doc_receive_loop(Keys, Pids, SpawnFun, MaxJobs, Callback, AccIn) -> _ -> {{value, {Pid, Ref}}, RestPids} = queue:out(Pids), Timeout = fabric_util:all_docs_timeout(), + Receive = fun(Row) -> + case Callback(fabric_view_row:transform(Row), AccIn) of + {ok, Acc} -> + doc_receive_loop( + Keys, RestPids, SpawnFun, MaxJobs, Callback, Acc + ); + {stop, Acc} -> + cancel_read_pids(RestPids), + {ok, Acc} + end + end, receive {'DOWN', Ref, process, Pid, Row} -> case Row of #view_row{} -> - case Callback(fabric_view:transform_row(Row), AccIn) of - {ok, Acc} -> - doc_receive_loop( - Keys, RestPids, SpawnFun, MaxJobs, Callback, Acc - ); - {stop, Acc} -> - cancel_read_pids(RestPids), - {ok, Acc} - end; + Receive(Row); + {view_row, #{}} -> + Receive(Row); Error -> cancel_read_pids(RestPids), Callback({error, Error}, AccIn) @@ -319,9 +339,11 @@ doc_receive_loop(Keys, Pids, SpawnFun, MaxJobs, Callback, AccIn) -> end end. -open_doc(DbName, Options, Id, IncludeDocs) -> - try open_doc_int(DbName, Options, Id, IncludeDocs) of +open_doc(DbName, Options, Id, IncludeDocs, Extra) -> + try open_doc_int(DbName, Options, Id, IncludeDocs, Extra) of #view_row{} = Row -> + exit(Row); + {view_row, #{}} = Row -> exit(Row) catch Type:Reason:Stack -> @@ -331,25 +353,25 @@ open_doc(DbName, Options, Id, IncludeDocs) -> exit({Id, Reason}) end. 
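%% Editorial aside (not part of the patch): open_doc/5 above still hands its
%% result back through the process exit reason, which doc_receive_loop/6 picks
%% up from the corresponding 'DOWN' message; only the row shape it exits with
%% changed. A minimal standalone sketch of that spawn_monitor/exit pattern,
%% with illustrative names.
-module(exit_result_sketch).
-export([fetch/1]).

fetch(Id) ->
    {_Pid, Ref} = spawn_monitor(fun() -> exit({row, Id}) end),
    receive
        {'DOWN', Ref, process, _FromPid, Result} -> Result
    after 5000 ->
        timeout
    end.

%% e.g. fetch(<<"doc1">>) returns {row, <<"doc1">>}.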
-open_doc_int(DbName, Options, Id, IncludeDocs) -> +open_doc_int(DbName, Options, Id, IncludeDocs, Extra) -> Row = case fabric:open_doc(DbName, Id, [deleted | Options]) of {not_found, missing} -> Doc = undefined, - #view_row{key = Id}; + fabric_view_row:from_props([{key, Id}], Extra); {ok, #doc{deleted = true, revs = Revs}} -> Doc = null, {RevPos, [RevId | _]} = Revs, Value = {[{rev, couch_doc:rev_to_str({RevPos, RevId})}, {deleted, true}]}, - #view_row{key = Id, id = Id, value = Value}; + fabric_view_row:from_props([{key, Id}, {id, Id}, {value, Value}], Extra); {ok, #doc{revs = Revs} = Doc0} -> Doc = couch_doc:to_json_obj(Doc0, Options), {RevPos, [RevId | _]} = Revs, Value = {[{rev, couch_doc:rev_to_str({RevPos, RevId})}]}, - #view_row{key = Id, id = Id, value = Value} + fabric_view_row:from_props([{key, Id}, {id, Id}, {value, Value}], Extra) end, if - IncludeDocs -> Row#view_row{doc = Doc}; + IncludeDocs -> fabric_view_row:set_doc(Row, Doc); true -> Row end. @@ -385,3 +407,476 @@ filter_keys_by_namespace(Keys, Namespace) when Namespace =:= <<"_local">> -> ); filter_keys_by_namespace(Keys, _Namespace) -> Keys. + +-ifdef(TEST). + +-include_lib("couch/include/couch_eunit.hrl"). + +handle_message_test_() -> + { + foreach, + fun() -> + meck:new(foo, [non_strict]), + meck:new(fabric_view) + end, + fun(_) -> meck:unload() end, + [ + ?TDEF_FE(t_handle_message_rexi_down), + ?TDEF_FE(t_handle_message_rexi_exit), + ?TDEF_FE(t_handle_message_meta_zero), + ?TDEF_FE(t_handle_message_meta), + ?TDEF_FE(t_handle_message_row), + ?TDEF_FE(t_handle_message_complete), + ?TDEF_FE(t_handle_message_execution_stats) + ] + }. + +t_handle_message_rexi_down(_) -> + Message = {rexi_DOWN, undefined, {undefined, node}, undefined}, + meck:expect(fabric_view, check_down_shards, [state, node], meck:val(fabric_view_result)), + ?assertEqual(fabric_view_result, handle_message(Message, source, state)). + +t_handle_message_rexi_exit(_) -> + Message = {rexi_EXIT, reason}, + meck:expect( + fabric_view, handle_worker_exit, [state, source, reason], meck:val(fabric_view_result) + ), + ?assertEqual(fabric_view_result, handle_message(Message, source, state)). + +t_handle_message_meta_zero(_) -> + Meta1 = [{total, 3}, {offset, 2}, {update_seq, 1}], + Meta2 = [{total, null}, {offset, null}, {update_seq, null}], + Worker = {worker1, from}, + Counters1 = [{worker1, 0}, {worker2, 0}], + Counters2 = [{worker1, 1}, {worker2, 0}], + State1 = #collector{counters = Counters1, total_rows = 0, update_seq = nil, offset = 0}, + State2 = #collector{counters = Counters2, total_rows = 3, update_seq = nil, offset = 2}, + State3 = #collector{counters = Counters1, total_rows = null, update_seq = null, offset = null}, + State4 = #collector{counters = Counters2, total_rows = null, update_seq = null, offset = null}, + meck:expect(rexi, stream_ack, [from], meck:val(ok)), + ?assertEqual({ok, State2}, handle_message({meta, Meta1}, Worker, State1)), + ?assertEqual({ok, State4}, handle_message({meta, Meta2}, Worker, State3)). 
+ +t_handle_message_meta(_) -> + Meta1 = [{total, 10}, {offset, 2}, {update_seq, seq}], + Meta2 = [{total, 10}, {offset, 5}, {update_seq, packed_seq}], + Meta3 = [{total, 10}, {offset, 5}], + Meta4 = [{total, null}, {offset, null}, {update_seq, null}], + Worker = {worker1, from}, + Counters1 = [{worker1, 0}, {worker2, 3}, {worker3, 5}], + Counters2 = [{worker1, 0}, {worker2, 2}, {worker3, 4}], + State1 = #collector{ + counters = Counters1, + total_rows = 0, + update_seq = [], + offset = 0, + skip = 3, + callback = fun foo:bar/2, + user_acc = accumulator1 + }, + State2 = #collector{ + counters = Counters1, + total_rows = 0, + update_seq = nil, + offset = 0, + skip = 3, + callback = fun foo:bar/2, + user_acc = accumulator2 + }, + State3 = #collector{ + counters = Counters1, + total_rows = 0, + update_seq = [], + offset = 0, + skip = 3, + callback = fun foo:bar/2, + user_acc = accumulator3 + }, + State4 = #collector{ + counters = Counters2, + total_rows = 10, + update_seq = [], + offset = 5, + skip = 3, + callback = fun foo:bar/2, + user_acc = updated_accumulator1 + }, + State5 = #collector{ + counters = Counters2, + total_rows = 10, + update_seq = nil, + offset = 5, + skip = 3, + callback = fun foo:bar/2, + user_acc = updated_accumulator2 + }, + State6 = #collector{ + counters = Counters2, + total_rows = null, + update_seq = [], + offset = null, + skip = 3, + callback = fun foo:bar/2, + user_acc = updated_accumulator3 + }, + meck:expect(fabric_view_changes, pack_seqs, [[{worker1, seq}]], meck:val(packed_seq)), + meck:expect(rexi, stream_ack, [from], meck:val(ok)), + meck:expect(foo, bar, [{meta, Meta2}, accumulator1], meck:val({go1, updated_accumulator1})), + ?assertEqual({go1, State4}, handle_message({meta, Meta1}, Worker, State1)), + meck:expect(foo, bar, [{meta, Meta3}, accumulator2], meck:val({go2, updated_accumulator2})), + ?assertEqual({go2, State5}, handle_message({meta, Meta1}, Worker, State2)), + meck:expect(foo, bar, [{meta, Meta4}, accumulator3], meck:val({go3, updated_accumulator3})), + ?assertEqual({go3, State6}, handle_message({meta, Meta4}, Worker, State3)). + +t_handle_message_row(_) -> + Worker = {worker, from}, + QueryArgs1 = #mrargs{direction = fwd}, + QueryArgs2 = #mrargs{direction = rev}, + Counters1 = [{worker, 1}], + Counters2 = [{worker, 2}], + Row1 = #view_row{id = id2, key = key2, doc = doc2}, + Row2 = {view_row, #{id => id2, key => key2, doc => doc2}}, + Rows11 = #view_row{id = id1, key = key1, doc = doc1}, + Rows12 = #view_row{id = id2, key = key2, doc = doc2, worker = Worker}, + Rows13 = #view_row{id = id3, key = key3, doc = doc3}, + Rows21 = {view_row, #{id => id1, key => key1}}, + Rows22 = {view_row, #{id => id2, key => key2, doc => doc2, worker => Worker}}, + Rows23 = {view_row, #{id => id3, key => key3}}, + Rows1 = [Rows11, Rows13], + Rows2 = [Rows23, Rows21], + Rows3 = [Rows11, Rows12, Rows13], + Rows4 = [Rows23, Rows22, Rows21], + State1 = #collector{query_args = QueryArgs1, counters = Counters1, rows = Rows1}, + State2 = #collector{query_args = QueryArgs2, counters = Counters1, rows = Rows2}, + State3 = #collector{query_args = QueryArgs1, counters = Counters2, rows = Rows3}, + State4 = #collector{query_args = QueryArgs2, counters = Counters2, rows = Rows4}, + meck:expect(fabric_view, maybe_send_row, [State3], meck:val(send_row1)), + ?assertEqual(send_row1, handle_message(Row1, Worker, State1)), + meck:expect(fabric_view, maybe_send_row, [State4], meck:val(send_row2)), + ?assertEqual(send_row2, handle_message(Row2, Worker, State2)). 
+ +t_handle_message_complete(_) -> + Worker = worker, + Counters1 = [{Worker, 6}], + Counters2 = [{Worker, 7}], + State1 = #collector{counters = Counters1}, + State2 = #collector{counters = Counters2}, + meck:expect(fabric_view, maybe_send_row, [State2], meck:val(maybe_row)), + ?assertEqual(maybe_row, handle_message(complete, Worker, State1)). + +t_handle_message_execution_stats(_) -> + Message = {execution_stats, stats}, + Source = {worker, from}, + meck:expect(foo, bar, [Message, accumulator], meck:val({go, updated_accumulator})), + meck:expect(rexi, stream_ack, [from], meck:val(ok)), + State1 = #collector{callback = fun foo:bar/2, user_acc = accumulator}, + State2 = #collector{callback = fun foo:bar/2, user_acc = updated_accumulator}, + ?assertEqual({go, State2}, handle_message(Message, Source, State1)). + +open_doc_test_() -> + { + foreach, + fun() -> meck:new(fabric) end, + fun(_) -> meck:unload() end, + [ + ?TDEF_FE(t_open_doc_not_found), + ?TDEF_FE(t_open_doc_deleted), + ?TDEF_FE(t_open_doc), + ?TDEF_FE(t_open_doc_error) + ] + }. + +t_open_doc_not_found(_) -> + Extra1 = [], + Extra2 = [{view_row_map, true}], + Options1 = [], + Options2 = [deleted], + Row1 = #view_row{key = id, doc = undefined}, + Row2 = {view_row, #{key => id}}, + meck:expect(fabric, open_doc, [db, id, Options2], meck:val({not_found, missing})), + {_, Ref1} = spawn_monitor(?MODULE, open_doc, [db, Options1, id, true, Extra1]), + receive + {'DOWN', Ref1, _, _, Result1} -> + ?assertEqual(Row1, Result1) + end, + {_, Ref2} = spawn_monitor(?MODULE, open_doc, [db, Options1, id, false, Extra2]), + receive + {'DOWN', Ref2, _, _, Result2} -> + ?assertEqual(Row2, Result2) + end. + +t_open_doc_deleted(_) -> + Extra1 = [], + Extra2 = [{view_row_map, true}], + Options1 = [], + Options2 = [deleted], + Revs = {1, [<<"foo">>]}, + Doc = #doc{deleted = true, revs = Revs}, + Value = {[{rev, <<"1-foo">>}, {deleted, true}]}, + Row1 = #view_row{key = id, id = id, value = Value, doc = null}, + Row2 = {view_row, #{key => id, id => id, value => Value, doc => null}}, + meck:expect(fabric, open_doc, [db, id, Options2], meck:val({ok, Doc})), + {_, Ref1} = spawn_monitor(?MODULE, open_doc, [db, Options1, id, true, Extra1]), + receive + {'DOWN', Ref1, _, _, Result1} -> + ?assertEqual(Row1, Result1) + end, + {_, Ref2} = spawn_monitor(?MODULE, open_doc, [db, Options1, id, true, Extra2]), + receive + {'DOWN', Ref2, _, _, Result2} -> + ?assertEqual(Row2, Result2) + end. + +t_open_doc(_) -> + Extra1 = [], + Extra2 = [{view_row_map, true}], + Options1 = [], + Options2 = [deleted], + Revs = {1, [<<"foo">>]}, + Doc = #doc{revs = Revs, id = <<"bar">>}, + DocJson = {[{<<"_id">>, <<"bar">>}, {<<"_rev">>, <<"1-foo">>}]}, + Value = {[{rev, <<"1-foo">>}]}, + Row1 = #view_row{key = id, id = id, value = Value, doc = DocJson}, + Row2 = {view_row, #{key => id, id => id, value => Value, doc => DocJson}}, + meck:expect(fabric, open_doc, [db, id, Options2], meck:val({ok, Doc})), + {_, Ref1} = spawn_monitor(?MODULE, open_doc, [db, Options1, id, true, Extra1]), + receive + {'DOWN', Ref1, _, _, Result1} -> + ?assertEqual(Row1, Result1) + end, + {_, Ref2} = spawn_monitor(?MODULE, open_doc, [db, Options1, id, true, Extra2]), + receive + {'DOWN', Ref2, _, _, Result2} -> + ?assertEqual(Row2, Result2) + end. 
+ +t_open_doc_error(_) -> + Extra = [], + Options1 = [], + Options2 = [deleted], + Exception = {id, reason}, + meck:expect(fabric, open_doc, [db, id, Options2], meck:raise(error, reason)), + meck:expect( + couch_log, + error, + ["_all_docs open error: ~s ~s :: ~w ~w", [db, id, {error, reason}, '_']], + meck:val(ok) + ), + {_, Ref} = spawn_monitor(?MODULE, open_doc, [db, Options1, id, true, Extra]), + receive + {'DOWN', Ref, _, _, Result} -> + ?assertEqual(Exception, Result) + end. + +doc_receive_loop_test_() -> + { + foreach, + fun() -> + meck:new(foo, [non_strict]), + meck:new(fabric_util) + end, + fun(_) -> meck:unload() end, + [ + ?TDEF_FE(t_doc_receive_loop_empty), + ?TDEF_FE(t_doc_receive_loop), + ?TDEF_FE(t_doc_receive_loop_error), + ?TDEF_FE(t_doc_receive_loop_timeout) + ] + }. + +t_doc_receive_loop_empty(_) -> + Keys = [], + Pids = queue:new(), + ?assertEqual( + {ok, accumulator}, + doc_receive_loop(Keys, Pids, undefined, undefined, undefined, accumulator) + ). + +t_doc_receive_loop(_) -> + Keys = [key1, key2, key3], + Pids = queue:from_list([]), + MaxJobs = 4, + Props1 = {row, [{id, id1}, {key, key1}, {value, value1}]}, + Props2 = {row, [{id, id2}, {key, key2}, {value, value2}]}, + Row1 = #view_row{id = id1, key = key1, value = value1}, + Row2 = {view_row, #{id => id2, key => key2, value => value2}}, + meck:expect( + foo, + spawned, + [ + {[key1], meck:raise(exit, Row1)}, + {[key2], meck:raise(exit, Row2)} + ] + ), + meck:expect(foo, spawn, fun(K) -> spawn_monitor(foo, spawned, [K]) end), + meck:expect( + foo, + callback, + [ + {[Props1, accumulator1], meck:val({ok, accumulator2})}, + {[Props2, accumulator2], meck:val({stop, accumulator3})} + ] + ), + meck:expect(fabric_util, all_docs_timeout, [], meck:val(1000)), + ?assertEqual( + {ok, accumulator3}, + doc_receive_loop(Keys, Pids, fun foo:spawn/1, MaxJobs, fun foo:callback/2, accumulator1) + ). + +t_doc_receive_loop_error(_) -> + Keys = [key1, key2, key3], + Pids = queue:from_list([]), + MaxJobs = 3, + meck:expect(foo, spawned, [key1], meck:raise(exit, error)), + meck:expect(foo, spawn, fun(K) -> spawn_monitor(foo, spawned, [K]) end), + meck:expect(foo, callback, [{error, error}, accumulator1], meck:val({ok, accumulator2})), + meck:expect(fabric_util, all_docs_timeout, [], meck:val(1000)), + ?assertEqual( + {ok, accumulator2}, + doc_receive_loop(Keys, Pids, fun foo:spawn/1, MaxJobs, fun foo:callback/2, accumulator1) + ). + +t_doc_receive_loop_timeout(_) -> + Keys = [key1, key2, key3], + Pids = queue:from_list([]), + MaxJobs = 3, + meck:expect(foo, spawned, fun(key1) -> timer:sleep(infinity) end), + meck:expect(foo, spawn, fun(K) -> spawn_monitor(foo, spawned, [K]) end), + meck:expect(foo, callback, ['_', '_'], undefined), + meck:expect(fabric_util, all_docs_timeout, [], meck:val(1)), + ?assertEqual( + timeout, + doc_receive_loop(Keys, Pids, fun foo:spawn/1, MaxJobs, fun foo:callback/2, accumulator1) + ), + ?assertNot(meck:called(foo, callback, '_')). + +merge_row_test_() -> + { + foreach, + fun() -> ok end, + fun(_) -> ok end, + [ + ?TDEF_FE(t_merge_row_record_fwd), + ?TDEF_FE(t_merge_row_record_rev), + ?TDEF_FE(t_merge_row_map_fwd), + ?TDEF_FE(t_merge_row_map_rev), + ?TDEF_FE(t_merge_row_mixed_fwd), + ?TDEF_FE(t_merge_row_mixed_rev) + ] + }. 
+ +t_merge_row_record_fwd(_) -> + RowX1 = #view_row{id = 4}, + Row1 = #view_row{id = 1}, + Row2 = #view_row{id = 3}, + Row3 = #view_row{id = 5}, + Row4 = #view_row{id = 7}, + Rows = [Row1, Row2, Row3, Row4], + Expected1 = [Row1, Row2, RowX1, Row3, Row4], + ?assertEqual(Expected1, merge_row(fwd, RowX1, Rows)), + RowX2 = #view_row{id = 0}, + Expected2 = [RowX2, Row1, Row2, Row3, Row4], + ?assertEqual(Expected2, merge_row(fwd, RowX2, Rows)), + RowX3 = #view_row{id = 8}, + Expected3 = [Row1, Row2, Row3, Row4, RowX3], + ?assertEqual(Expected3, merge_row(fwd, RowX3, Rows)), + RowX4 = #view_row{id = 5}, + Expected4 = [Row1, Row2, RowX4, Row3, Row4], + ?assertEqual(Expected4, merge_row(fwd, RowX4, Rows)). + +t_merge_row_record_rev(_) -> + RowX1 = #view_row{id = 5}, + Row1 = #view_row{id = 2}, + Row2 = #view_row{id = 4}, + Row3 = #view_row{id = 6}, + Row4 = #view_row{id = 8}, + Rows = [Row4, Row3, Row2, Row1], + Expected1 = [Row4, Row3, RowX1, Row2, Row1], + ?assertEqual(Expected1, merge_row(rev, RowX1, Rows)), + RowX2 = #view_row{id = 1}, + Expected2 = [Row4, Row3, Row2, Row1, RowX2], + ?assertEqual(Expected2, merge_row(rev, RowX2, Rows)), + RowX3 = #view_row{id = 9}, + Expected3 = [RowX3, Row4, Row3, Row2, Row1], + ?assertEqual(Expected3, merge_row(rev, RowX3, Rows)), + RowX4 = #view_row{id = 6}, + Expected4 = [Row4, Row3, RowX4, Row2, Row1], + ?assertEqual(Expected4, merge_row(rev, RowX4, Rows)). + +t_merge_row_map_fwd(_) -> + RowX1 = {view_row, #{id => 4}}, + Row1 = {view_row, #{id => 1}}, + Row2 = {view_row, #{id => 3}}, + Row3 = {view_row, #{id => 5}}, + Row4 = {view_row, #{id => 7}}, + Rows = [Row1, Row2, Row3, Row4], + Expected1 = [Row1, Row2, RowX1, Row3, Row4], + ?assertEqual(Expected1, merge_row(fwd, RowX1, Rows)), + RowX2 = {view_row, #{id => 0}}, + Expected2 = [RowX2, Row1, Row2, Row3, Row4], + ?assertEqual(Expected2, merge_row(fwd, RowX2, Rows)), + RowX3 = {view_row, #{id => 8}}, + Expected3 = [Row1, Row2, Row3, Row4, RowX3], + ?assertEqual(Expected3, merge_row(fwd, RowX3, Rows)), + RowX4 = {view_row, #{id => 5}}, + Expected4 = [Row1, Row2, RowX4, Row3, Row4], + ?assertEqual(Expected4, merge_row(fwd, RowX4, Rows)). + +t_merge_row_map_rev(_) -> + RowX1 = {view_row, #{id => 5}}, + Row1 = {view_row, #{id => 2}}, + Row2 = {view_row, #{id => 4}}, + Row3 = {view_row, #{id => 6}}, + Row4 = {view_row, #{id => 8}}, + Rows = [Row4, Row3, Row2, Row1], + Expected1 = [Row4, Row3, RowX1, Row2, Row1], + ?assertEqual(Expected1, merge_row(rev, RowX1, Rows)), + RowX2 = {view_row, #{id => 1}}, + Expected2 = [Row4, Row3, Row2, Row1, RowX2], + ?assertEqual(Expected2, merge_row(rev, RowX2, Rows)), + RowX3 = {view_row, #{id => 9}}, + Expected3 = [RowX3, Row4, Row3, Row2, Row1], + ?assertEqual(Expected3, merge_row(rev, RowX3, Rows)), + RowX4 = {view_row, #{id => 6}}, + Expected4 = [Row4, Row3, RowX4, Row2, Row1], + ?assertEqual(Expected4, merge_row(rev, RowX4, Rows)). 
+ +t_merge_row_mixed_fwd(_) -> + RowX1 = #view_row{id = 4}, + Row1 = {view_row, #{id => 1}}, + Row2 = {view_row, #{id => 3}}, + Row3 = #view_row{id = 5}, + Row4 = {view_row, #{id => 7}}, + Rows = [Row1, Row2, Row3, Row4], + Expected1 = [Row1, Row2, RowX1, Row3, Row4], + ?assertEqual(Expected1, merge_row(fwd, RowX1, Rows)), + RowX2 = {view_row, #{id => 0}}, + Expected2 = [RowX2, Row1, Row2, Row3, Row4], + ?assertEqual(Expected2, merge_row(fwd, RowX2, Rows)), + RowX3 = {view_row, #{id => 8}}, + Expected3 = [Row1, Row2, Row3, Row4, RowX3], + ?assertEqual(Expected3, merge_row(fwd, RowX3, Rows)), + RowX4 = {view_row, #{id => 5}}, + Expected4 = [Row1, Row2, Row3, RowX4, Row4], + ?assertEqual(Expected4, merge_row(fwd, RowX4, Rows)). + +t_merge_row_mixed_rev(_) -> + RowX1 = {view_row, #{id => 5}}, + Row1 = #view_row{id = 2}, + Row2 = #view_row{id = 4}, + Row3 = {view_row, #{id => 6}}, + Row4 = #view_row{id = 8}, + Rows = [Row4, Row3, Row2, Row1], + Expected1 = [Row4, Row3, RowX1, Row2, Row1], + ?assertEqual(Expected1, merge_row(rev, RowX1, Rows)), + RowX2 = #view_row{id = 1}, + Expected2 = [Row4, Row3, Row2, Row1, RowX2], + ?assertEqual(Expected2, merge_row(rev, RowX2, Rows)), + RowX3 = #view_row{id = 9}, + Expected3 = [RowX3, Row4, Row3, Row2, Row1], + ?assertEqual(Expected3, merge_row(rev, RowX3, Rows)), + RowX4 = #view_row{id = 6}, + Expected4 = [Row4, Row3, RowX4, Row2, Row1], + ?assertEqual(Expected4, merge_row(rev, RowX4, Rows)). + +-endif. diff --git a/src/fabric/src/fabric_view_map.erl b/src/fabric/src/fabric_view_map.erl index 43922d5d59e..62077ac9f03 100644 --- a/src/fabric/src/fabric_view_map.erl +++ b/src/fabric/src/fabric_view_map.erl @@ -121,6 +121,37 @@ go(DbName, Workers, {map, View, _}, Args, Callback, Acc0) -> {ok, Resp} end. +handle_stop(State) -> + #collector{callback = Callback} = State, + {_, Acc} = Callback(complete, State#collector.user_acc), + {stop, State#collector{user_acc = Acc}}. + +handle_non_sorted(Row, {_, From}, State) -> + #collector{callback = Callback, user_acc = AccIn, limit = Limit} = State, + {Go, Acc} = Callback(fabric_view_row:transform(Row), AccIn), + rexi:stream_ack(From), + {Go, State#collector{user_acc = Acc, limit = Limit - 1}}. + +handle_sorted(Row0, {Worker, _} = Source, State) -> + #collector{ + query_args = #mrargs{direction = Dir}, + counters = Counters0, + rows = Rows0, + keys = KeyDict0, + collation = Collation + } = State, + Row = fabric_view_row:set_worker(Row0, Source), + {Rows, KeyDict} = merge_row( + Dir, + Collation, + KeyDict0, + Row, + Rows0 + ), + Counters1 = fabric_dict:update_counter(Worker, 1, Counters0), + State1 = State#collector{rows = Rows, counters = Counters1, keys = KeyDict}, + fabric_view:maybe_send_row(State1). 
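The handle_message/3 clauses below accept a view row either as the legacy #view_row{} record or as the new {view_row, #{}} map and route both through the helpers above. A minimal sketch (not part of the patch; the module name is made up) of how fabric_view_row:transform/1, called from handle_non_sorted/3, normalises both shapes into the same {row, Props} term handed to the user callback:

-module(transform_example).
%% Illustrative only; this module is not part of the patch.
-include_lib("fabric/include/fabric.hrl").
-export([demo/0]).

demo() ->
    %% Both wire formats normalise to the same {row, Props} term that the
    %% user callback receives from handle_non_sorted/3.
    Expected = {row, [{id, <<"a">>}, {key, 1}, {value, 2}]},
    Expected = fabric_view_row:transform(#view_row{id = <<"a">>, key = 1, value = 2}),
    Expected = fabric_view_row:transform({view_row, #{id => <<"a">>, key => 1, value => 2}}),
    ok.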
+ handle_message({rexi_DOWN, _, {_, NodeRef}, _}, _, State) -> fabric_view:check_down_shards(State, NodeRef); handle_message({rexi_EXIT, Reason}, Worker, State) -> @@ -176,40 +207,25 @@ handle_message({meta, Meta0}, {Worker, From}, State) -> }} end; handle_message(#view_row{}, {_, _}, #collector{sorted = false, limit = 0} = State) -> - #collector{callback = Callback} = State, - {_, Acc} = Callback(complete, State#collector.user_acc), - {stop, State#collector{user_acc = Acc}}; -handle_message(#view_row{} = Row, {_, From}, #collector{sorted = false} = St) -> - #collector{callback = Callback, user_acc = AccIn, limit = Limit} = St, - {Go, Acc} = Callback(fabric_view:transform_row(Row), AccIn), - rexi:stream_ack(From), - {Go, St#collector{user_acc = Acc, limit = Limit - 1}}; -handle_message(#view_row{} = Row, {Worker, From}, State) -> - #collector{ - query_args = #mrargs{direction = Dir}, - counters = Counters0, - rows = Rows0, - keys = KeyDict0, - collation = Collation - } = State, - {Rows, KeyDict} = merge_row( - Dir, - Collation, - KeyDict0, - Row#view_row{worker = {Worker, From}}, - Rows0 - ), - Counters1 = fabric_dict:update_counter(Worker, 1, Counters0), - State1 = State#collector{rows = Rows, counters = Counters1, keys = KeyDict}, - fabric_view:maybe_send_row(State1); + handle_stop(State); +handle_message(#view_row{} = Row, {_, _} = Source, #collector{sorted = false} = State) -> + handle_non_sorted(Row, Source, State); +handle_message(#view_row{} = Row, {_, _} = Source, State) -> + handle_sorted(Row, Source, State); +handle_message({view_row, #{}}, {_, _}, #collector{sorted = false, limit = 0} = State) -> + handle_stop(State); +handle_message({view_row, #{}} = Row, {_, _} = Source, #collector{sorted = false} = State) -> + handle_non_sorted(Row, Source, State); +handle_message({view_row, #{}} = Row, {_, _} = Source, State) -> + handle_sorted(Row, Source, State); handle_message(complete, Worker, State) -> Counters = fabric_dict:update_counter(Worker, 1, State#collector.counters), fabric_view:maybe_send_row(State#collector{counters = Counters}); -handle_message({execution_stats, _} = Msg, {_, From}, St) -> - #collector{callback = Callback, user_acc = AccIn} = St, +handle_message({execution_stats, _} = Msg, {_, From}, State) -> + #collector{callback = Callback, user_acc = AccIn} = State, {Go, Acc} = Callback(Msg, AccIn), rexi:stream_ack(From), - {Go, St#collector{user_acc = Acc}}; + {Go, State#collector{user_acc = Acc}}; handle_message(ddoc_updated, _Worker, State) -> {stop, State}; handle_message(insufficient_storage, _Worker, State) -> @@ -217,7 +233,11 @@ handle_message(insufficient_storage, _Worker, State) -> merge_row(Dir, Collation, undefined, Row, Rows0) -> Rows1 = lists:merge( - fun(#view_row{key = KeyA, id = IdA}, #view_row{key = KeyB, id = IdB}) -> + fun(RowA, RowB) -> + KeyA = fabric_view_row:get_key(RowA), + KeyB = fabric_view_row:get_key(RowB), + IdA = fabric_view_row:get_id(RowA), + IdB = fabric_view_row:get_id(RowB), compare(Dir, Collation, {KeyA, IdA}, {KeyB, IdB}) end, [Row], @@ -240,12 +260,17 @@ merge_row(Dir, Collation, KeyDict0, Row, Rows0) -> _ -> fun couch_ejson_compare:less/2 end, - case maybe_update_keydict(Row#view_row.key, KeyDict0, CmpFun) of + Key = fabric_view_row:get_key(Row), + case maybe_update_keydict(Key, KeyDict0, CmpFun) of undefined -> {Rows0, KeyDict0}; KeyDict1 -> Rows1 = lists:merge( - fun(#view_row{key = A, id = IdA}, #view_row{key = B, id = IdB}) -> + fun(RowA, RowB) -> + A = fabric_view_row:get_key(RowA), + B = fabric_view_row:get_key(RowB), + IdA 
= fabric_view_row:get_id(RowA), + IdB = fabric_view_row:get_id(RowB), case {Dir, CmpFun(A, B)} of {fwd, 0} -> IdA < IdB; @@ -296,3 +321,298 @@ key_index(KeyA, [{KeyB, Value} | KVs], CmpFun) -> 0 -> Value; _ -> key_index(KeyA, KVs, CmpFun) end. + +-ifdef(TEST). + +-include_lib("couch/include/couch_eunit.hrl"). + +handle_message_test_() -> + { + foreach, + fun() -> + meck:new(foo, [non_strict]), + meck:new(fabric_view) + end, + fun(_) -> meck:unload() end, + [ + ?TDEF_FE(t_handle_message_rexi_down), + ?TDEF_FE(t_handle_message_rexi_exit), + ?TDEF_FE(t_handle_message_meta_zero), + ?TDEF_FE(t_handle_message_meta), + ?TDEF_FE(t_handle_message_limit), + ?TDEF_FE(t_handle_message_non_sorted), + ?TDEF_FE(t_handle_message_sorted), + ?TDEF_FE(t_handle_message_complete), + ?TDEF_FE(t_handle_message_execution_stats), + ?TDEF_FE(t_handle_message_ddoc_updated), + ?TDEF_FE(t_handle_message_insufficient_storage) + ] + }. + +t_handle_message_rexi_down(_) -> + Message = {rexi_DOWN, undefined, {undefined, node}, undefined}, + meck:expect(fabric_view, check_down_shards, [state, node], meck:val(fabric_view_result)), + ?assertEqual(fabric_view_result, handle_message(Message, source, state)). + +t_handle_message_rexi_exit(_) -> + Message = {rexi_EXIT, reason}, + meck:expect( + fabric_view, handle_worker_exit, [state, source, reason], meck:val(fabric_view_result) + ), + ?assertEqual(fabric_view_result, handle_message(Message, source, state)). + +t_handle_message_meta_zero(_) -> + Meta = [{total, 3}, {offset, 2}, {update_seq, 1}], + Worker = {worker1, from}, + Counters1 = [{worker1, 0}, {worker2, 0}], + Counters2 = [{worker1, 1}, {worker2, 0}], + State1 = #collector{counters = Counters1, total_rows = 0, update_seq = nil, offset = 0}, + State2 = #collector{counters = Counters2, total_rows = 3, update_seq = nil, offset = 2}, + meck:expect(rexi, stream_ack, [from], meck:val(ok)), + ?assertEqual({ok, State2}, handle_message({meta, Meta}, Worker, State1)). + +t_handle_message_meta(_) -> + Meta1 = [{total, 10}, {offset, 2}, {update_seq, seq}], + Meta2 = [{total, 10}, {offset, 5}, {update_seq, packed_seq}], + Meta3 = [{total, 10}, {offset, 5}], + Worker = {worker1, from}, + Counters1 = [{worker1, 0}, {worker2, 3}, {worker3, 5}], + Counters2 = [{worker1, 0}, {worker2, 2}, {worker3, 4}], + State1 = #collector{ + counters = Counters1, + total_rows = 0, + update_seq = [], + offset = 0, + skip = 3, + callback = fun foo:bar/2, + user_acc = accumulator1 + }, + State2 = #collector{ + counters = Counters1, + total_rows = 0, + update_seq = nil, + offset = 0, + skip = 3, + callback = fun foo:bar/2, + user_acc = accumulator2 + }, + State3 = #collector{ + counters = Counters2, + total_rows = 10, + update_seq = [], + offset = 5, + skip = 3, + callback = fun foo:bar/2, + user_acc = updated_accumulator1 + }, + State4 = #collector{ + counters = Counters2, + total_rows = 10, + update_seq = nil, + offset = 5, + skip = 3, + callback = fun foo:bar/2, + user_acc = updated_accumulator2 + }, + meck:expect( + foo, + bar, + [ + {[{meta, Meta2}, accumulator1], meck:val({go1, updated_accumulator1})}, + {[{meta, Meta3}, accumulator2], meck:val({go2, updated_accumulator2})} + ] + ), + meck:expect(fabric_view_changes, pack_seqs, [[{worker1, seq}]], meck:val(packed_seq)), + meck:expect(rexi, stream_ack, [from], meck:val(ok)), + ?assertEqual({go1, State3}, handle_message({meta, Meta1}, Worker, State1)), + ?assertEqual({go2, State4}, handle_message({meta, Meta1}, Worker, State2)). 
+ +t_handle_message_limit(_) -> + State1 = #collector{ + sorted = false, limit = 0, callback = fun foo:bar/2, user_acc = accumulator + }, + State2 = #collector{ + sorted = false, limit = 0, callback = fun foo:bar/2, user_acc = updated_accumulator + }, + Worker = {worker, from}, + Row1 = #view_row{}, + Row2 = {view_row, #{}}, + meck:expect(foo, bar, [complete, accumulator], meck:val({go, updated_accumulator})), + meck:expect(rexi, stream_ack, [from], undefined), + ?assertEqual({stop, State2}, handle_message(Row1, Worker, State1)), + ?assertEqual({stop, State2}, handle_message(Row2, Worker, State1)), + ?assertNot(meck:called(rexi, stream_ack, '_')). + +t_handle_message_non_sorted(_) -> + State1 = #collector{ + sorted = false, limit = 10, callback = fun foo:bar/2, user_acc = accumulator + }, + State2 = #collector{ + sorted = false, limit = 9, callback = fun foo:bar/2, user_acc = updated_accumulator + }, + Worker = {worker, from}, + Row1 = #view_row{id = id, key = key, doc = doc}, + Row2 = {view_row, #{id => id, key => key, doc => doc}}, + Props = {row, [{id, id}, {key, key}, {value, undefined}, {doc, doc}]}, + meck:expect(foo, bar, [Props, accumulator], meck:val({go, updated_accumulator})), + meck:expect(rexi, stream_ack, [from], meck:val(ok)), + ?assertEqual({go, State2}, handle_message(Row1, Worker, State1)), + ?assertEqual({go, State2}, handle_message(Row2, Worker, State1)). + +t_handle_message_sorted(_) -> + QueryArgs = #mrargs{direction = fwd}, + Counters1 = [{worker, 1}], + Counters2 = [{worker, 2}], + Worker = {worker, from}, + Row1 = #view_row{id = id2, key = key2, doc = doc2}, + Row2 = {view_row, #{id => id2, key => key2, doc => doc2}}, + Props = {row, [{id, id2}, {key, key2}, {value, undefined}, {doc, doc2}]}, + Rows11 = #view_row{id = id1, key = key1, doc = doc1}, + Rows12 = #view_row{id = id2, key = key2, doc = doc2, worker = Worker}, + Rows13 = #view_row{id = id3, key = key3, doc = doc3}, + Rows21 = {view_row, #{id => id1, key => key1}}, + Rows22 = {view_row, #{id => id2, key => key2, doc => doc2, worker => Worker}}, + Rows23 = {view_row, #{id => id3, key => key3}}, + Rows1 = [Rows11, Rows13], + Rows2 = [Rows21, Rows23], + Rows3 = [Rows11, Rows12, Rows13], + Rows4 = [Rows21, Rows22, Rows23], + State1 = #collector{ + sorted = true, + limit = 10, + callback = fun foo:bar/2, + user_acc = accumulator, + query_args = QueryArgs, + counters = Counters1, + rows = Rows1, + collation = <<"raw">> + }, + State2 = #collector{ + sorted = true, + limit = 10, + callback = fun foo:bar/2, + user_acc = accumulator, + query_args = QueryArgs, + counters = Counters1, + rows = Rows2, + collation = <<"raw">> + }, + State3 = #collector{ + sorted = true, + limit = 10, + callback = fun foo:bar/2, + user_acc = accumulator, + query_args = QueryArgs, + counters = Counters2, + rows = Rows3, + collation = <<"raw">> + }, + State4 = #collector{ + sorted = true, + limit = 10, + callback = fun foo:bar/2, + user_acc = accumulator, + query_args = QueryArgs, + counters = Counters2, + rows = Rows4, + collation = <<"raw">> + }, + meck:expect(foo, bar, [Props, accumulator], meck:val({go, updated_accumulator})), + meck:expect( + fabric_view, + maybe_send_row, + [ + {[State3], meck:val(next_row1)}, + {[State4], meck:val(next_row2)} + ] + ), + ?assertEqual(next_row1, handle_message(Row1, Worker, State1)), + ?assertEqual(next_row2, handle_message(Row2, Worker, State2)). 
+ +t_handle_message_complete(_) -> + Worker = worker, + Counters1 = [{Worker, 6}], + Counters2 = [{Worker, 7}], + State1 = #collector{counters = Counters1}, + State2 = #collector{counters = Counters2}, + meck:expect(fabric_view, maybe_send_row, [State2], meck:val(maybe_row)), + ?assertEqual(maybe_row, handle_message(complete, Worker, State1)). + +t_handle_message_execution_stats(_) -> + Message = {execution_stats, stats}, + Source = {worker, from}, + meck:expect(foo, bar, [Message, accumulator], meck:val({go, updated_accumulator})), + meck:expect(rexi, stream_ack, [from], meck:val(ok)), + State1 = #collector{callback = fun foo:bar/2, user_acc = accumulator}, + State2 = #collector{callback = fun foo:bar/2, user_acc = updated_accumulator}, + ?assertEqual({go, State2}, handle_message(Message, Source, State1)). + +t_handle_message_ddoc_updated(_) -> + ?assertEqual({stop, state}, handle_message(ddoc_updated, source, state)). + +t_handle_message_insufficient_storage(_) -> + ?assertEqual({stop, state}, handle_message(insufficient_storage, source, state)). + +merge_row_test_() -> + { + foreach, + fun() -> ok end, + fun(_) -> ok end, + [ + ?TDEF_FE(t_merge_row_no_keys), + ?TDEF_FE(t_merge_row_raw), + ?TDEF_FE(t_merge_row) + ] + }. + +t_merge_row_no_keys(_) -> + Row1 = #view_row{id = id2, key = <<"key2">>}, + Rows11 = #view_row{id = id1, key = <<"key1">>}, + Rows13 = #view_row{id = id3, key = <<"key3">>}, + Rows1 = [Rows11, Rows13], + Rows3 = [Rows11, Row1, Rows13], + Row2 = {view_row, #{id => id2, key => <<"key2">>}}, + Rows21 = {view_row, #{id => id1, key => <<"key1">>}}, + Rows23 = {view_row, #{id => id3, key => <<"key3">>}}, + Rows2 = [Rows23, Rows21], + Rows4 = [Rows23, Row2, Rows21], + ?assertEqual({Rows3, undefined}, merge_row(fwd, <<"raw">>, undefined, Row1, Rows1)), + ?assertEqual({Rows3, undefined}, merge_row(fwd, <<"collation">>, undefined, Row1, Rows1)), + ?assertEqual({Rows4, undefined}, merge_row(rev, <<"raw">>, undefined, Row2, Rows2)), + ?assertEqual({Rows4, undefined}, merge_row(rev, <<"collation">>, undefined, Row2, Rows2)). + +t_merge_row_raw(_) -> + Keys1 = dict:from_list([{key1, id1}, {key2, id2}, {key3, id3}]), + Keys2 = dict:from_list([{key1, id1}, {key2, id2}, {key3, id3}]), + Row1 = #view_row{id = id22, key = key2}, + Rows11 = #view_row{id = id1, key = key1}, + Rows13 = #view_row{id = id3, key = key3}, + Rows1 = [Rows11, Rows13], + Rows3 = [Rows11, Row1, Rows13], + Row2 = {view_row, #{id => id22, key => key2}}, + Rows21 = {view_row, #{id => id1, key => key1}}, + Rows23 = {view_row, #{id => id3, key => key3}}, + Rows2 = [Rows23, Rows21], + Rows4 = [Row2, Rows23, Rows21], + ?assertEqual({Rows3, Keys2}, merge_row(fwd, <<"raw">>, Keys1, Row1, Rows1)), + ?assertEqual({Rows4, Keys2}, merge_row(rev, <<"raw">>, Keys1, Row2, Rows2)). 
+ +t_merge_row(_) -> + Row1 = #view_row{id = id2, key = <<"key2">>}, + Rows11 = #view_row{id = id1, key = <<"key1">>}, + Rows13 = #view_row{id = id2, key = <<"key2">>}, + Rows1 = [Rows11, Rows13], + Rows3 = [Rows11, Row1, Rows13], + Row2 = {view_row, #{id => id2, key => <<"key2">>}}, + Rows21 = {view_row, #{id => id1, key => <<"key1">>}}, + Rows23 = {view_row, #{id => id2, key => <<"key2">>}}, + Rows2 = [Rows23, Rows21], + Rows4 = [Rows23, Rows21, Row2], + Keys1 = dict:from_list([{<<"key1">>, id1}, {<<"key2">>, id2}]), + Keys2 = dict:from_list([]), + ?assertEqual({Rows3, Keys1}, merge_row(fwd, <<"collation">>, Keys1, Row1, Rows1)), + ?assertEqual({Rows4, Keys1}, merge_row(rev, <<"collation">>, Keys1, Row2, Rows2)), + ?assertEqual({Rows1, Keys2}, merge_row(fwd, <<"collation">>, Keys2, Row1, Rows1)), + ?assertEqual({Rows4, Keys1}, merge_row(fwd, <<"raw">>, Keys1, Row2, Rows2)). + +-endif. diff --git a/src/fabric/src/fabric_view_reduce.erl b/src/fabric/src/fabric_view_reduce.erl index d4d17d5e1ee..f32cd0ce5e1 100644 --- a/src/fabric/src/fabric_view_reduce.erl +++ b/src/fabric/src/fabric_view_reduce.erl @@ -123,6 +123,16 @@ go2(DbName, Workers, {red, {_, Lang, View}, _} = VInfo, Args, Callback, Acc0) -> end end. +handle_row(Row0, {Worker, _} = Source, State) -> + #collector{counters = Counters0, rows = Rows0} = State, + true = fabric_dict:is_key(Worker, Counters0), + Row = fabric_view_row:set_worker(Row0, Source), + Key = fabric_view_row:get_key(Row), + Rows = dict:append(Key, Row, Rows0), + C1 = fabric_dict:update_counter(Worker, 1, Counters0), + State1 = State#collector{rows = Rows, counters = C1}, + fabric_view:maybe_send_row(State1). + handle_message({rexi_DOWN, _, {_, NodeRef}, _}, _, State) -> fabric_view:check_down_shards(State, NodeRef); handle_message({rexi_EXIT, Reason}, Worker, State) -> @@ -165,13 +175,10 @@ handle_message({meta, Meta0}, {Worker, From}, State) -> user_acc = Acc }} end; -handle_message(#view_row{key = Key} = Row, {Worker, From}, State) -> - #collector{counters = Counters0, rows = Rows0} = State, - true = fabric_dict:is_key(Worker, Counters0), - Rows = dict:append(Key, Row#view_row{worker = {Worker, From}}, Rows0), - C1 = fabric_dict:update_counter(Worker, 1, Counters0), - State1 = State#collector{rows = Rows, counters = C1}, - fabric_view:maybe_send_row(State1); +handle_message(#view_row{} = Row, {_, _} = Source, State) -> + handle_row(Row, Source, State); +handle_message({view_row, #{}} = Row, {_, _} = Source, State) -> + handle_row(Row, Source, State); handle_message(complete, Worker, #collector{counters = Counters0} = State) -> true = fabric_dict:is_key(Worker, Counters0), C1 = fabric_dict:update_counter(Worker, 1, Counters0), @@ -183,3 +190,132 @@ handle_message(insufficient_storage, _Worker, State) -> os_proc_needed(<<"_", _/binary>>) -> false; os_proc_needed(_) -> true. + +-ifdef(TEST). + +-include_lib("couch/include/couch_eunit.hrl"). + +handle_message_test_() -> + { + foreach, + fun() -> + meck:new(foo, [non_strict]), + meck:new(fabric_view) + end, + fun(_) -> meck:unload() end, + [ + ?TDEF_FE(t_handle_message_rexi_down), + ?TDEF_FE(t_handle_message_rexi_exit), + ?TDEF_FE(t_handle_message_meta_zero), + ?TDEF_FE(t_handle_message_meta), + ?TDEF_FE(t_handle_message_row), + ?TDEF_FE(t_handle_message_complete), + ?TDEF_FE(t_handle_message_ddoc_updated), + ?TDEF_FE(t_handle_message_insufficient_storage) + ] + }. 
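In the reduce coordinator, handle_row/3 above buckets incoming rows per key with dict:append/3, reading the key through fabric_view_row:get_key/1 so record and map rows end up in the same bucket. A minimal sketch of that accumulation step (not part of the patch; the module name is made up):

-module(reduce_bucket_example).
%% Illustrative only; this module is not part of the patch.
-include_lib("fabric/include/fabric.hrl").
-export([demo/0]).

demo() ->
    %% handle_row/3 groups reduce rows per key; record and map rows share
    %% a bucket because the key is read through fabric_view_row:get_key/1.
    R1 = #view_row{key = k, value = 1},
    R2 = {view_row, #{key => k, value => 2}},
    Rows0 = dict:new(),
    Rows1 = dict:append(fabric_view_row:get_key(R1), R1, Rows0),
    Rows2 = dict:append(fabric_view_row:get_key(R2), R2, Rows1),
    {ok, [R1, R2]} = dict:find(k, Rows2),
    ok.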
+ +t_handle_message_rexi_down(_) -> + Message = {rexi_DOWN, undefined, {undefined, node}, undefined}, + meck:expect(fabric_view, check_down_shards, [state, node], meck:val(fabric_view_result)), + ?assertEqual(fabric_view_result, handle_message(Message, source, state)). + +t_handle_message_rexi_exit(_) -> + Message = {rexi_EXIT, reason}, + meck:expect( + fabric_view, handle_worker_exit, [state, source, reason], meck:val(fabric_view_result) + ), + ?assertEqual(fabric_view_result, handle_message(Message, source, state)). + +t_handle_message_meta_zero(_) -> + Meta = [{total, 3}, {offset, 2}, {update_seq, 1}], + Worker = {worker1, from}, + Counters1 = [{worker1, 0}, {worker2, 0}], + Counters2 = [{worker1, 1}, {worker2, 0}], + State1 = #collector{counters = Counters1, update_seq = nil}, + State2 = #collector{counters = Counters2, update_seq = nil}, + meck:expect(rexi, stream_ack, [from], meck:val(ok)), + ?assertEqual({ok, State2}, handle_message({meta, Meta}, Worker, State1)). + +t_handle_message_meta(_) -> + Meta1 = [{update_seq, seq}], + Meta2 = [{update_seq, packed_seq}], + Meta3 = [], + Worker = {worker1, from}, + Counters1 = [{worker1, 0}, {worker2, 3}, {worker3, 5}], + Counters2 = [{worker1, 0}, {worker2, 2}, {worker3, 4}], + State1 = #collector{ + counters = Counters1, + update_seq = [], + callback = fun foo:bar/2, + user_acc = accumulator1 + }, + State2 = #collector{ + counters = Counters1, + update_seq = nil, + callback = fun foo:bar/2, + user_acc = accumulator2 + }, + State3 = #collector{ + counters = Counters2, + update_seq = [], + callback = fun foo:bar/2, + user_acc = updated_accumulator1 + }, + State4 = #collector{ + counters = Counters2, + update_seq = nil, + callback = fun foo:bar/2, + user_acc = updated_accumulator2 + }, + meck:expect( + foo, + bar, + [ + {[{meta, Meta2}, accumulator1], meck:val({go1, updated_accumulator1})}, + {[{meta, Meta3}, accumulator2], meck:val({go2, updated_accumulator2})} + ] + ), + meck:expect(fabric_view_changes, pack_seqs, [[{worker1, seq}]], meck:val(packed_seq)), + meck:expect(rexi, stream_ack, [from], meck:val(ok)), + ?assertEqual({go1, State3}, handle_message({meta, Meta1}, Worker, State1)), + ?assertEqual({go2, State4}, handle_message({meta, Meta1}, Worker, State2)). + +t_handle_message_row(_) -> + Worker = {worker, from}, + Counters1 = [{worker, 3}], + Counters2 = [{worker, 4}], + Row1 = #view_row{key = key1}, + Row11 = #view_row{key = key1, worker = Worker}, + Rows1 = dict:from_list([{key1, []}, {key2, []}, {key3, []}]), + Rows3 = dict:from_list([{key1, [Row11]}, {key2, []}, {key3, []}]), + Row2 = {view_row, #{key => key1}}, + Row21 = {view_row, #{key => key1, worker => Worker}}, + Rows2 = dict:from_list([{key1, []}, {key2, []}, {key3, []}]), + Rows4 = dict:from_list([{key1, [Row21]}, {key2, []}, {key3, []}]), + State1 = #collector{counters = Counters1, rows = Rows1}, + State2 = #collector{counters = Counters1, rows = Rows2}, + State3 = #collector{counters = Counters2, rows = Rows3}, + State4 = #collector{counters = Counters2, rows = Rows4}, + meck:expect(fabric_view, maybe_send_row, [ + {[State3], meck:val(maybe_row1)}, {[State4], meck:val(maybe_row2)} + ]), + ?assertEqual(maybe_row1, handle_message(Row1, Worker, State1)), + ?assertEqual(maybe_row2, handle_message(Row2, Worker, State2)). 
+ +t_handle_message_complete(_) -> + Worker = worker, + Counters1 = [{Worker, 6}], + Counters2 = [{Worker, 7}], + State1 = #collector{counters = Counters1}, + State2 = #collector{counters = Counters2}, + meck:expect(fabric_view, maybe_send_row, [State2], meck:val(maybe_row)), + ?assertEqual(maybe_row, handle_message(complete, Worker, State1)). + +t_handle_message_ddoc_updated(_) -> + ?assertEqual({stop, state}, handle_message(ddoc_updated, source, state)). + +t_handle_message_insufficient_storage(_) -> + ?assertEqual({stop, state}, handle_message(insufficient_storage, source, state)). + +-endif. diff --git a/src/fabric/src/fabric_view_row.erl b/src/fabric/src/fabric_view_row.erl new file mode 100644 index 00000000000..8a313a11d3f --- /dev/null +++ b/src/fabric/src/fabric_view_row.erl @@ -0,0 +1,355 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(fabric_view_row). +-export([ + from_props/2, + get_id/1, + get_key/1, + get_value/1, + get_doc/1, + get_worker/1, + set_key/2, + set_doc/2, + set_worker/2, + transform/1 +]). + +-include_lib("fabric/include/fabric.hrl"). + +from_props(Props, Options) -> + case couch_util:get_value(view_row_map, Options, false) of + true -> + Row = maps:from_list(Props), + {view_row, Row}; + false -> + #view_row{ + key = couch_util:get_value(key, Props), + id = couch_util:get_value(id, Props), + value = couch_util:get_value(value, Props), + doc = couch_util:get_value(doc, Props), + worker = couch_util:get_value(worker, Props) + } + end. + +get_id(#view_row{id = Id}) -> + Id; +get_id({view_row, #{id := Id}}) -> + Id; +get_id({view_row, #{}}) -> + undefined. + +get_key(#view_row{key = Key}) -> + Key; +get_key({view_row, #{key := Key}}) -> + Key; +get_key({view_row, #{}}) -> + undefined. + +set_key(#view_row{} = Row, Key) -> + Row#view_row{key = Key}; +set_key({view_row, #{} = Row}, Key) -> + {view_row, Row#{key => Key}}. + +get_value(#view_row{value = Value}) -> + Value; +get_value({view_row, #{value := Value}}) -> + Value; +get_value({view_row, #{}}) -> + undefined. + +get_doc(#view_row{doc = Doc}) -> + Doc; +get_doc({view_row, #{doc := Doc}}) -> + Doc; +get_doc({view_row, #{}}) -> + undefined. + +set_doc(#view_row{} = Row, Doc) -> + Row#view_row{doc = Doc}; +set_doc({view_row, #{} = Row}, Doc) -> + {view_row, Row#{doc => Doc}}. + +get_worker(#view_row{worker = Worker}) -> + Worker; +get_worker({view_row, #{worker := Worker}}) -> + Worker; +get_worker({view_row, #{}}) -> + undefined. + +set_worker(#view_row{} = Row, Worker) -> + Row#view_row{worker = Worker}; +set_worker({view_row, #{} = Row}, Worker) -> + {view_row, Row#{worker => Worker}}. 
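A minimal usage sketch of the constructor and accessors added above (not part of the patch; the module name is made up): from_props/2 picks the representation from the view_row_map option, and the getters and setters hide the difference from callers:

-module(fabric_view_row_usage).
%% Illustrative only; this module is not part of the patch.
-include_lib("fabric/include/fabric.hrl").
-export([demo/0]).

demo() ->
    Props = [{id, <<"a">>}, {key, 1}, {value, 2}],
    %% Without the option (or with {view_row_map, false}) callers keep
    %% getting the legacy #view_row{} record.
    #view_row{id = <<"a">>, value = 2} = fabric_view_row:from_props(Props, []),
    %% With {view_row_map, true} the same properties become a tagged map.
    Row = fabric_view_row:from_props(Props, [{view_row_map, true}]),
    {view_row, #{id := <<"a">>}} = Row,
    2 = fabric_view_row:get_value(Row),
    undefined = fabric_view_row:get_doc(Row),
    {view_row, #{doc := {[]}}} = fabric_view_row:set_doc(Row, {[]}),
    ok.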
+ +transform(#view_row{value = {[{reduce_overflow_error, Msg}]}}) -> + {row, [{key, null}, {id, error}, {value, reduce_overflow_error}, {reason, Msg}]}; +transform(#view_row{key = Key, id = reduced, value = Value}) -> + {row, [{key, Key}, {value, Value}]}; +transform(#view_row{key = Key, id = undefined}) -> + {row, [{key, Key}, {id, error}, {value, not_found}]}; +transform(#view_row{key = Key, id = Id, value = Value, doc = undefined}) -> + {row, [{id, Id}, {key, Key}, {value, Value}]}; +transform(#view_row{key = Key, doc = {error, Reason}}) -> + {row, [{id, error}, {key, Key}, {value, Reason}]}; +transform(#view_row{key = Key, id = Id, value = Value, doc = Doc}) -> + {row, [{id, Id}, {key, Key}, {value, Value}, {doc, Doc}]}; +transform({view_row, #{} = Row0}) -> + Id = maps:get(id, Row0, undefined), + Key = maps:get(key, Row0, undefined), + Value = maps:get(value, Row0, undefined), + Doc = maps:get(doc, Row0, undefined), + Worker = maps:get(worker, Row0, undefined), + Row = #view_row{id = Id, key = Key, value = Value, doc = Doc, worker = Worker}, + transform(Row). + +-ifdef(TEST). +-include_lib("couch/include/couch_eunit.hrl"). + +from_props_test_() -> + { + foreach, + fun() -> ok end, + fun(_) -> ok end, + [ + ?TDEF_FE(t_from_props_record), + ?TDEF_FE(t_from_props_map_empty), + ?TDEF_FE(t_from_props_map) + ] + }. + +t_from_props_record(_) -> + Options1 = [], + Options2 = [{view_row_map, false}], + Props = [{id, id}, {key, key}, {value, value}], + ViewRow = #view_row{id = id, key = key, value = value, doc = undefined, worker = undefined}, + ?assertEqual(ViewRow, from_props(Props, Options1)), + ?assertEqual(ViewRow, from_props(Props, Options2)). + +t_from_props_map_empty(_) -> + Options = [{view_row_map, true}], + Props = [], + ViewRow = {view_row, #{}}, + ?assertEqual(ViewRow, from_props(Props, Options)). + +t_from_props_map(_) -> + Options = [{view_row_map, true}], + Props = [{id, id}, {key, key}, {doc, doc}], + ViewRow = {view_row, #{id => id, key => key, doc => doc}}, + ?assertEqual(ViewRow, from_props(Props, Options)). + +getter_test_() -> + { + foreach, + fun() -> ok end, + fun(_) -> ok end, + [ + ?TDEF_FE(t_get_id_record), + ?TDEF_FE(t_get_key_record), + ?TDEF_FE(t_get_value_record), + ?TDEF_FE(t_get_doc_record), + ?TDEF_FE(t_get_worker_record), + ?TDEF_FE(t_get_id_map), + ?TDEF_FE(t_get_key_map), + ?TDEF_FE(t_get_value_map), + ?TDEF_FE(t_get_doc_map), + ?TDEF_FE(t_get_worker_map) + ] + }. + +t_get_id_record(_) -> + ViewRow1 = #view_row{}, + ?assertEqual(undefined, get_id(ViewRow1)), + ViewRow2 = #view_row{id = id}, + ?assertEqual(id, get_id(ViewRow2)). + +t_get_id_map(_) -> + ViewRow1 = {view_row, #{}}, + ?assertEqual(undefined, get_id(ViewRow1)), + ViewRow2 = {view_row, #{id => id}}, + ?assertEqual(id, get_id(ViewRow2)). + +t_get_key_record(_) -> + ViewRow1 = #view_row{}, + ?assertEqual(undefined, get_key(ViewRow1)), + ViewRow2 = #view_row{key = key}, + ?assertEqual(key, get_key(ViewRow2)). + +t_get_key_map(_) -> + ViewRow1 = {view_row, #{}}, + ?assertEqual(undefined, get_key(ViewRow1)), + ViewRow2 = {view_row, #{key => key}}, + ?assertEqual(key, get_key(ViewRow2)). + +t_get_value_record(_) -> + ViewRow1 = #view_row{}, + ?assertEqual(undefined, get_value(ViewRow1)), + ViewRow2 = #view_row{value = value}, + ?assertEqual(value, get_value(ViewRow2)). + +t_get_value_map(_) -> + ViewRow1 = {view_row, #{}}, + ?assertEqual(undefined, get_value(ViewRow1)), + ViewRow2 = {view_row, #{value => value}}, + ?assertEqual(value, get_value(ViewRow2)). 
+ +t_get_doc_record(_) -> + ViewRow1 = #view_row{}, + ?assertEqual(undefined, get_doc(ViewRow1)), + ViewRow2 = #view_row{doc = doc}, + ?assertEqual(doc, get_doc(ViewRow2)). + +t_get_doc_map(_) -> + ViewRow1 = {view_row, #{}}, + ?assertEqual(undefined, get_doc(ViewRow1)), + ViewRow2 = {view_row, #{doc => doc}}, + ?assertEqual(doc, get_doc(ViewRow2)). + +t_get_worker_record(_) -> + ViewRow1 = #view_row{}, + ?assertEqual(undefined, get_worker(ViewRow1)), + ViewRow2 = #view_row{worker = worker}, + ?assertEqual(worker, get_worker(ViewRow2)). + +t_get_worker_map(_) -> + ViewRow1 = {view_row, #{}}, + ?assertEqual(undefined, get_worker(ViewRow1)), + ViewRow2 = {view_row, #{worker => worker}}, + ?assertEqual(worker, get_worker(ViewRow2)). + +setter_test_() -> + { + foreach, + fun() -> ok end, + fun(_) -> ok end, + [ + ?TDEF_FE(t_set_key_record), + ?TDEF_FE(t_set_doc_record), + ?TDEF_FE(t_set_worker_record), + ?TDEF_FE(t_set_key_map), + ?TDEF_FE(t_set_doc_map), + ?TDEF_FE(t_set_worker_map) + ] + }. + +t_set_key_record(_) -> + ViewRow1 = #view_row{key = key}, + ViewRow2 = #view_row{key = updated_key}, + ?assertEqual(ViewRow2, set_key(ViewRow1, updated_key)). + +t_set_key_map(_) -> + ViewRow1 = {view_row, #{key => key}}, + ViewRow2 = {view_row, #{key => updated_key}}, + ?assertEqual(ViewRow2, set_key(ViewRow1, updated_key)). + +t_set_doc_record(_) -> + ViewRow1 = #view_row{doc = doc}, + ViewRow2 = #view_row{doc = updated_doc}, + ?assertEqual(ViewRow2, set_doc(ViewRow1, updated_doc)). + +t_set_doc_map(_) -> + ViewRow1 = {view_row, #{doc => doc}}, + ViewRow2 = {view_row, #{doc => updated_doc}}, + ?assertEqual(ViewRow2, set_doc(ViewRow1, updated_doc)). + +t_set_worker_record(_) -> + ViewRow1 = #view_row{worker = worker}, + ViewRow2 = #view_row{worker = updated_worker}, + ?assertEqual(ViewRow2, set_worker(ViewRow1, updated_worker)). + +t_set_worker_map(_) -> + ViewRow1 = {view_row, #{worker => worker}}, + ViewRow2 = {view_row, #{worker => updated_worker}}, + ?assertEqual(ViewRow2, set_worker(ViewRow1, updated_worker)). + +transform_test_() -> + { + foreach, + fun() -> ok end, + fun(_) -> ok end, + [ + ?TDEF_FE(t_transform_record_reduce_overflow_error), + ?TDEF_FE(t_transform_record_reduced), + ?TDEF_FE(t_transform_record_id_undefined), + ?TDEF_FE(t_transform_record_doc_undefined), + ?TDEF_FE(t_transform_record_doc_error), + ?TDEF_FE(t_transform_record), + ?TDEF_FE(t_transform_map_reduce_overflow_error), + ?TDEF_FE(t_transform_map_reduced), + ?TDEF_FE(t_transform_map_id_undefined), + ?TDEF_FE(t_transform_map_doc_undefined), + ?TDEF_FE(t_transform_map_doc_error), + ?TDEF_FE(t_transform_map) + ] + }. + +t_transform_record_reduce_overflow_error(_) -> + ViewRow = #view_row{value = {[{reduce_overflow_error, reason}]}}, + Props = {row, [{key, null}, {id, error}, {value, reduce_overflow_error}, {reason, reason}]}, + ?assertEqual(Props, transform(ViewRow)). + +t_transform_map_reduce_overflow_error(_) -> + ViewRow = {view_row, #{value => {[{reduce_overflow_error, reason}]}}}, + Props = {row, [{key, null}, {id, error}, {value, reduce_overflow_error}, {reason, reason}]}, + ?assertEqual(Props, transform(ViewRow)). + +t_transform_record_reduced(_) -> + ViewRow = #view_row{key = key, id = reduced, value = value}, + Props = {row, [{key, key}, {value, value}]}, + ?assertEqual(Props, transform(ViewRow)). + +t_transform_map_reduced(_) -> + ViewRow = {view_row, #{key => key, id => reduced, value => value}}, + Props = {row, [{key, key}, {value, value}]}, + ?assertEqual(Props, transform(ViewRow)). 
+ +t_transform_record_id_undefined(_) -> + ViewRow = #view_row{key = key, id = undefined}, + Props = {row, [{key, key}, {id, error}, {value, not_found}]}, + ?assertEqual(Props, transform(ViewRow)). + +t_transform_map_id_undefined(_) -> + ViewRow = {view_row, #{key => key, id => undefined}}, + Props = {row, [{key, key}, {id, error}, {value, not_found}]}, + ?assertEqual(Props, transform(ViewRow)). + +t_transform_record_doc_undefined(_) -> + ViewRow = #view_row{key = key, id = id, value = value, doc = undefined}, + Props = {row, [{id, id}, {key, key}, {value, value}]}, + ?assertEqual(Props, transform(ViewRow)). + +t_transform_map_doc_undefined(_) -> + ViewRow = {view_row, #{key => key, id => id, value => value, doc => undefined}}, + Props = {row, [{id, id}, {key, key}, {value, value}]}, + ?assertEqual(Props, transform(ViewRow)). + +t_transform_record_doc_error(_) -> + ViewRow = #view_row{key = key, id = id, doc = {error, reason}}, + Props = {row, [{id, error}, {key, key}, {value, reason}]}, + ?assertEqual(Props, transform(ViewRow)). + +t_transform_map_doc_error(_) -> + ViewRow = {view_row, #{key => key, id => id, doc => {error, reason}}}, + Props = {row, [{id, error}, {key, key}, {value, reason}]}, + ?assertEqual(Props, transform(ViewRow)). + +t_transform_record(_) -> + ViewRow = #view_row{key = key, id = id, value = value, doc = doc}, + Props = {row, [{id, id}, {key, key}, {value, value}, {doc, doc}]}, + ?assertEqual(Props, transform(ViewRow)). + +t_transform_map(_) -> + ViewRow = {view_row, #{key => key, id => id, value => value, doc => doc}}, + Props = {row, [{id, id}, {key, key}, {value, value}, {doc, doc}]}, + ?assertEqual(Props, transform(ViewRow)). + +-endif. diff --git a/src/fabric/test/eunit/fabric_tests.erl b/src/fabric/test/eunit/fabric_tests.erl index 76400a9febb..da9a7efde95 100644 --- a/src/fabric/test/eunit/fabric_tests.erl +++ b/src/fabric/test/eunit/fabric_tests.erl @@ -14,6 +14,7 @@ -include_lib("couch/include/couch_db.hrl"). -include_lib("couch/include/couch_eunit.hrl"). +-include_lib("couch_mrview/include/couch_mrview.hrl"). cleanup_index_files_test_() -> { @@ -238,3 +239,67 @@ delete_ddoc(DbName, DDocId) -> {ok, DDoc0} = fabric:open_doc(DbName, DDocId, [?ADMIN_CTX]), DDoc = DDoc0#doc{deleted = true, body = {[]}}, fabric:update_doc(DbName, DDoc, [?ADMIN_CTX]). + +design_docs_test_() -> + { + foreach, + fun() -> ok end, + fun(_) -> meck:unload() end, + [ + ?TDEF_FE(t_design_docs_configuration), + ?TDEF_FE(t_design_docs_configuration_io_priority) + ] + }. + +t_design_docs_configuration(_) -> + DbName = <<"db">>, + AdminCtx = [?ADMIN_CTX], + QueryArgs = + #mrargs{ + include_docs = true, + extra = [{namespace, <<"_design">>}, {view_row_map, true}] + }, + meck:expect( + fabric, all_docs, [DbName, AdminCtx, '_', [], QueryArgs], meck:val(all_docs_result) + ), + ?assertEqual(all_docs_result, fabric:design_docs(DbName)). + +t_design_docs_configuration_io_priority(_) -> + DbName = <<"db">>, + AdminCtx = [?ADMIN_CTX], + QueryArgs = + #mrargs{ + include_docs = true, + extra = [{namespace, <<"_design">>}, {io_priority, io_priority}, {view_row_map, true}] + }, + meck:expect( + fabric, all_docs, [DbName, AdminCtx, '_', [], QueryArgs], meck:val(all_docs_result) + ), + put(io_priority, io_priority), + ?assertEqual(all_docs_result, fabric:design_docs(DbName)). + +query_view_test_() -> + { + foreach, + fun setup/0, + fun teardown/1, + [ + ?TDEF_FE(t_query_view_configuration) + ] + }. 
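The design_docs tests above pin down the call shape fabric:design_docs/1 is expected to make against fabric:all_docs/5. A hedged sketch (not part of the patch; the module name is made up and the callback shown is only a plausible accumulator) of how a caller that builds its own #mrargs{} opts into map-formatted rows the same way:

-module(all_docs_row_map_example).
%% Illustrative only; this module is not part of the patch. It mirrors the
%% fabric:all_docs/5 call shape asserted by t_design_docs_configuration above.
-include_lib("couch_mrview/include/couch_mrview.hrl").
-export([demo/1]).

demo(DbName) ->
    %% Adding {view_row_map, true} to the extra options requests view rows
    %% as tagged maps, as fabric:design_docs/1 now does by default.
    Args = #mrargs{include_docs = true, extra = [{view_row_map, true}]},
    %% Assumed callback: accumulate every message; {ok, Acc} keeps streaming.
    Callback = fun(Msg, Acc) -> {ok, [Msg | Acc]} end,
    fabric:all_docs(DbName, [], Callback, [], Args).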
+ +t_query_view_configuration({_Ctx, DbName}) -> + DDocName = <<"foo">>, + ViewName = <<"bar">>, + QueryArgs = + #mrargs{ + view_type = map, + start_key_docid = <<>>, + end_key_docid = <<255>>, + extra = [{view_row_map, true}] + }, + Options = [], + Accumulator = [], + Parameters = [DbName, Options, '_', ViewName, QueryArgs, '_', Accumulator, '_'], + meck:expect(fabric_view_map, go, Parameters, meck:val(fabric_view_map_results)), + ?assertEqual(fabric_view_map_results, fabric:query_view(DbName, DDocName, ViewName)). diff --git a/src/mango/src/mango.hrl b/src/mango/src/mango.hrl index 07212bc6640..6109bb08103 100644 --- a/src/mango/src/mango.hrl +++ b/src/mango/src/mango.hrl @@ -53,9 +53,6 @@ keys_examined => non_neg_integer() }. --type row_property_key() :: id | key | value | doc. --type row_properties() :: [{row_property_key(), any()}]. - -type reason() :: needs_text_search | field_mismatch | sort_order_mismatch diff --git a/src/mango/src/mango_cursor_view.erl b/src/mango/src/mango_cursor_view.erl index ac8714aa750..1f394308381 100644 --- a/src/mango/src/mango_cursor_view.erl +++ b/src/mango/src/mango_cursor_view.erl @@ -202,10 +202,12 @@ base_args(#cursor{index = Idx, selector = Selector, fields = Fields} = Cursor) - {ignore_partition_query_limit, true}, - % Request execution statistics in a map. The purpose of this option is - % to maintain interoperability on version upgrades. - % TODO remove this option in a later version. - {execution_stats_map, true} + % The purpose of the following options is to maintain + % interoperability on version upgrades: + % - Return execution statistics in a map + {execution_stats_map, true}, + % - Return view rows in a map + {view_row_map, true} ] }. @@ -350,14 +352,10 @@ view_cb({meta, Meta}, Acc) -> set_mango_msg_timestamp(), ok = rexi:stream2({meta, Meta}), {ok, Acc}; -view_cb({row, Row}, #mrargs{extra = Options} = Acc) -> +view_cb({row, Props}, #mrargs{extra = Options} = Acc) -> mango_execution_stats:shard_incr_keys_examined(), couch_stats:increment_counter([mango, keys_examined]), - ViewRow = #view_row{ - id = couch_util:get_value(id, Row), - key = couch_util:get_value(key, Row), - doc = couch_util:get_value(doc, Row) - }, + ViewRow = fabric_view_row:from_props(Props, Options), % This supports receiving our "arguments" either as just the `selector` % or in the new record in `callback_args`. This is to support mid-upgrade % clusters where the non-upgraded coordinator nodes will send the older style. @@ -383,14 +381,15 @@ view_cb({row, Row}, #mrargs{extra = Options} = Acc) -> % However, this oddness is confined to being visible in this module. case match_and_extract_doc(Doc, Selector, Fields) of {match, FinalDoc} -> - FinalViewRow = ViewRow#view_row{doc = FinalDoc}, - ok = rexi:stream2(FinalViewRow), + ViewRow1 = fabric_view_row:set_doc(ViewRow, FinalDoc), + ok = rexi:stream2(ViewRow1), set_mango_msg_timestamp(); {no_match, undefined} -> maybe_send_mango_ping() end end, - case {ViewRow#view_row.doc, CoveringIndex} of + ViewRowDoc = fabric_view_row:get_doc(ViewRow), + case {ViewRowDoc, CoveringIndex} of {null, _} -> maybe_send_mango_ping(); {undefined, Index = #idx{}} -> @@ -440,14 +439,15 @@ match_and_extract_doc(Doc, Selector, Fields) -> {no_match, undefined} end. --spec derive_doc_from_index(#idx{}, #view_row{}) -> term(). -derive_doc_from_index(Index, #view_row{id = DocId, key = KeyData}) -> +-spec derive_doc_from_index(#idx{}, view_row()) -> term(). 
+derive_doc_from_index(Index, Row) -> Keys = - case KeyData of + case fabric_view_row:get_key(Row) of {p, _Partition, KeyValues} -> KeyValues; KeyValues -> KeyValues end, Columns = mango_idx:columns(Index), + DocId = fabric_view_row:get_id(Row), lists:foldr( fun({Column, Key}, Doc) -> mango_doc:set_field(Doc, Column, Key) end, mango_doc:set_field({[]}, <<"_id">>, DocId), @@ -747,7 +747,8 @@ base_opts_test() -> covering_index => undefined }}, {ignore_partition_query_limit, true}, - {execution_stats_map, true} + {execution_stats_map, true}, + {view_row_map, true} ], MRArgs = #mrargs{ @@ -869,8 +870,10 @@ derive_doc_from_index_test() -> }, DocId = doc_id, Keys = [key1, key2], - ViewRow = #view_row{id = DocId, key = Keys}, + ViewRowOld = #view_row{id = DocId, key = Keys}, + ViewRow = {view_row, #{id => DocId, key => Keys}}, Doc = {[{<<"_id">>, DocId}, {<<"field2">>, key2}, {<<"field1">>, key1}]}, + ?assertEqual(Doc, derive_doc_from_index(Index, ViewRowOld)), ?assertEqual(Doc, derive_doc_from_index(Index, ViewRow)). derive_doc_from_index_partitioned_test() -> @@ -881,8 +884,10 @@ derive_doc_from_index_partitioned_test() -> }, DocId = doc_id, Keys = [key1, key2], - ViewRow = #view_row{id = DocId, key = {p, partition, Keys}}, + ViewRowOld = #view_row{id = DocId, key = {p, partition, Keys}}, + ViewRow = {view_row, #{id => DocId, key => {p, partition, Keys}}}, Doc = {[{<<"_id">>, DocId}, {<<"field2">>, key2}, {<<"field1">>, key1}]}, + ?assertEqual(Doc, derive_doc_from_index(Index, ViewRowOld)), ?assertEqual(Doc, derive_doc_from_index(Index, ViewRow)). composite_indexes_test() -> @@ -1087,7 +1092,8 @@ t_execute_ok_all_docs(_) -> covering_index => undefined }}, {ignore_partition_query_limit, true}, - {execution_stats_map, true} + {execution_stats_map, true}, + {view_row_map, true} ], Args = #mrargs{ @@ -1173,7 +1179,8 @@ t_execute_ok_query_view(_) -> covering_index => undefined }}, {ignore_partition_query_limit, true}, - {execution_stats_map, true} + {execution_stats_map, true}, + {view_row_map, true} ], Args = #mrargs{ @@ -1271,7 +1278,8 @@ t_execute_ok_all_docs_with_execution_stats(_) -> covering_index => undefined }}, {ignore_partition_query_limit, true}, - {execution_stats_map, true} + {execution_stats_map, true}, + {view_row_map, true} ], Args = #mrargs{ @@ -1360,7 +1368,9 @@ view_cb_test_() -> ?TDEF_FE(t_view_cb_row_missing_doc_triggers_quorum_fetch), ?TDEF_FE(t_view_cb_row_matching_covered_doc), ?TDEF_FE(t_view_cb_row_non_matching_covered_doc), - ?TDEF_FE(t_view_cb_row_backwards_compatible), + ?TDEF_FE(t_view_cb_row_backwards_compatible_callback_args), + ?TDEF_FE(t_view_cb_row_backwards_compatible_view_row_standard), + ?TDEF_FE(t_view_cb_row_backwards_compatible_view_row_quorum_fetch), ?TDEF_FE(t_view_cb_complete_shard_stats_v1), ?TDEF_FE(t_view_cb_complete_shard_stats_v2), ?TDEF_FE(t_view_cb_ok) @@ -1374,8 +1384,8 @@ t_view_cb_meta(_) -> t_view_cb_row_matching_regular_doc(_) -> Row = [{id, id}, {key, key}, {doc, doc}], - Result = #view_row{id = id, key = key, doc = doc}, - meck:expect(rexi, stream2, [Result], meck:val(ok)), + ViewRow = {view_row, #{id => id, key => key, doc => doc}}, + meck:expect(rexi, stream2, [ViewRow], meck:val(ok)), Accumulator = #mrargs{ extra = [ @@ -1383,7 +1393,8 @@ t_view_cb_row_matching_regular_doc(_) -> selector => {[]}, fields => all_fields, covering_index => undefined - }} + }}, + {view_row_map, true} ] }, mango_execution_stats:shard_init(), @@ -1401,7 +1412,8 @@ t_view_cb_row_non_matching_regular_doc(_) -> selector => {[{<<"field">>, {[{<<"$exists">>, 
true}]}}]}, fields => all_fields, covering_index => undefined - }} + }}, + {view_row_map, true} ] }, mango_execution_stats:shard_init(), @@ -1419,7 +1431,8 @@ t_view_cb_row_null_doc(_) -> selector => {[]}, fields => all_fields, covering_index => undefined - }} + }}, + {view_row_map, true} ] }, mango_execution_stats:shard_init(), @@ -1429,7 +1442,7 @@ t_view_cb_row_null_doc(_) -> t_view_cb_row_missing_doc_triggers_quorum_fetch(_) -> Row = [{id, id}, {key, key}, {doc, undefined}], - ViewRow = #view_row{id = id, key = key, doc = undefined}, + ViewRow = {view_row, #{id => id, key => key}}, meck:expect(rexi, stream2, [ViewRow], meck:val(ok)), Accumulator = #mrargs{ @@ -1438,7 +1451,8 @@ t_view_cb_row_missing_doc_triggers_quorum_fetch(_) -> selector => {[]}, fields => all_fields, covering_index => undefined - }} + }}, + {view_row_map, true} ] }, mango_execution_stats:shard_init(), @@ -1449,14 +1463,14 @@ t_view_cb_row_matching_covered_doc(_) -> Keys = [key1, key2], Row = [{id, id}, {key, Keys}, {doc, undefined}], Doc = {[{<<"field1">>, key1}, {<<"field2">>, key2}]}, - Result = #view_row{id = id, key = Keys, doc = Doc}, + ViewRow = {view_row, #{id => id, key => Keys, doc => Doc}}, Fields = [<<"field1">>, <<"field2">>], Index = #idx{ type = <<"json">>, def = {[{<<"fields">>, {[{<<"field1">>, undefined}, {<<"field2">>, undefined}]}}]} }, - meck:expect(rexi, stream2, [Result], meck:val(ok)), + meck:expect(rexi, stream2, [ViewRow], meck:val(ok)), Accumulator = #mrargs{ extra = [ @@ -1464,7 +1478,8 @@ t_view_cb_row_matching_covered_doc(_) -> selector => {[]}, fields => Fields, covering_index => Index - }} + }}, + {view_row_map, true} ] }, mango_execution_stats:shard_init(), @@ -1487,7 +1502,8 @@ t_view_cb_row_non_matching_covered_doc(_) -> selector => {[{<<"field">>, {[{<<"$exists">>, true}]}}]}, fields => Fields, covering_index => Index - }} + }}, + {view_row_map, true} ] }, mango_execution_stats:shard_init(), @@ -1495,7 +1511,7 @@ t_view_cb_row_non_matching_covered_doc(_) -> ?assertEqual({ok, Accumulator}, view_cb({row, Row}, Accumulator)), ?assertNot(meck:called(rexi, stream2, '_')). -t_view_cb_row_backwards_compatible(_) -> +t_view_cb_row_backwards_compatible_callback_args(_) -> Row = [{id, id}, {key, key}, {doc, null}], meck:expect(rexi, stream2, ['_'], undefined), Accumulator = #mrargs{extra = [{selector, {[]}}]}, @@ -1504,6 +1520,24 @@ t_view_cb_row_backwards_compatible(_) -> ?assertEqual({ok, Accumulator}, view_cb({row, Row}, Accumulator)), ?assertNot(meck:called(rexi, stream2, '_')). +t_view_cb_row_backwards_compatible_view_row_standard(_) -> + Row = [{id, id}, {key, key}, {doc, doc}], + ViewRow = #view_row{id = id, key = key, doc = doc}, + meck:expect(rexi, stream2, [ViewRow], meck:val(ok)), + Accumulator = #mrargs{extra = [{selector, {[]}}]}, + mango_execution_stats:shard_init(), + ?assertEqual({ok, Accumulator}, view_cb({row, Row}, Accumulator)), + ?assert(meck:called(rexi, stream2, '_')). + +t_view_cb_row_backwards_compatible_view_row_quorum_fetch(_) -> + Row = [{id, id}, {key, key}, {doc, undefined}], + ViewRow = #view_row{id = id, key = key, doc = undefined}, + meck:expect(rexi, stream2, [ViewRow], meck:val(ok)), + Accumulator = #mrargs{extra = [{selector, {[]}}]}, + mango_execution_stats:shard_init(), + ?assertEqual({ok, Accumulator}, view_cb({row, Row}, Accumulator)), + ?assert(meck:called(rexi, stream2, '_')). 
+ t_view_cb_complete_shard_stats_v1(_) -> meck:expect(rexi, stream2, [{execution_stats, {docs_examined, '_'}}], meck:val(ok)), meck:expect(rexi, stream_last, [complete], meck:val(ok)),