14
14
15
15
-export ([go /1 ]).
16
16
17
- -include_lib (" fabric/include/fabric.hrl" ).
18
17
-include_lib (" mem3/include/mem3.hrl" ).
19
18
20
19
-record (pacc , {
@@ -30,13 +29,13 @@ go(DbName) ->
30
29
Fun = fun handle_message /3 ,
31
30
Acc0 = # pacc {
32
31
counters = fabric_dict :init (Workers , nil ),
33
- replies = [] ,
32
+ replies = sets : new ([{ version , 2 }]) ,
34
33
ring_opts = [{any , Shards }]
35
34
},
36
35
try
37
36
case fabric_util :recv (Workers , # shard .ref , Fun , Acc0 ) of
38
37
{ok , Res } ->
39
- {ok , Res };
38
+ {ok , sets : to_list ( Res ) };
40
39
{timeout , {WorkersDict , _ }} ->
41
40
DefunctWorkers = fabric_util :remove_done_workers (WorkersDict , nil ),
42
41
fabric_util :log_timeout (DefunctWorkers , " get_purged_infos" ),
@@ -67,13 +66,96 @@ handle_message({rexi_EXIT, Reason}, Shard, #pacc{} = Acc) ->
67
66
end ;
68
67
handle_message ({ok , Info }, # shard {} = Shard , # pacc {} = Acc ) ->
69
68
# pacc {counters = Counters , replies = Replies } = Acc ,
70
- Replies1 = [Info | Replies ],
69
+ InfoSet = sets :from_list (Info , [{version , 2 }]),
70
+ Replies1 = sets :union (InfoSet , Replies ),
71
71
Counters1 = fabric_dict :erase (Shard , Counters ),
72
72
case fabric_dict :size (Counters1 ) =:= 0 of
73
73
true ->
74
- {stop , lists : flatten ( Replies1 ) };
74
+ {stop , Replies1 };
75
75
false ->
76
76
{ok , Acc # pacc {counters = Counters1 , replies = Replies1 }}
77
77
end ;
78
78
handle_message (_ , _ , # pacc {} = Acc ) ->
79
79
{ok , Acc }.
80
+
81
+ -ifdef (TEST ).
82
+ -include_lib (" couch/include/couch_eunit.hrl" ).
83
+
84
+ make_shards () ->
85
+ S1 = # shard {node = n1 , range = [1 , 2 ], ref = make_ref ()},
86
+ S2 = # shard {node = n2 , range = [1 , 2 ], ref = make_ref ()},
87
+ S3 = # shard {node = n3 , range = [1 , 2 ], ref = make_ref ()},
88
+ S4 = # shard {node = n1 , range = [3 , 4 ], ref = make_ref ()},
89
+ S5 = # shard {node = n2 , range = [3 , 4 ], ref = make_ref ()},
90
+ [S1 , S2 , S3 , S4 , S5 ].
91
+
92
+ init_acc (Shards ) ->
93
+ # pacc {
94
+ counters = fabric_dict :init (Shards , nil ),
95
+ replies = sets :new ([{version , 2 }]),
96
+ ring_opts = [{any , Shards }]
97
+ }.
98
+
99
+ first_result_ok_test () ->
100
+ Shards = [S1 | _ ] = make_shards (),
101
+ Acc0 = init_acc (Shards ),
102
+ DocA = <<" a" >>,
103
+ RevsA = [<<" 1" >>, <<" 2" >>],
104
+
105
+ {ok , Acc1 } = handle_message ({ok , [{DocA , RevsA }]}, S1 , Acc0 ),
106
+ ? assertEqual ([{DocA , RevsA }], sets :to_list (Acc1 # pacc .replies )).
107
+
108
+ result_duplicate_test () ->
109
+ Shards = [S1 , S2 | _ ] = make_shards (),
110
+ Acc0 = init_acc (Shards ),
111
+ DocA = <<" a" >>,
112
+ RevsA1 = [<<" 1" >>, <<" 2" >>],
113
+
114
+ {ok , Acc1 } = handle_message ({ok , [{DocA , RevsA1 }]}, S1 , Acc0 ),
115
+ ? assertEqual ([{DocA , RevsA1 }], sets :to_list (Acc1 # pacc .replies )),
116
+
117
+ {ok , Acc2 } = handle_message ({ok , [{DocA , RevsA1 }]}, S2 , Acc1 ),
118
+ ? assertEqual ([{DocA , RevsA1 }], sets :to_list (Acc2 # pacc .replies )).
119
+
120
+ result_union_existing_test () ->
121
+ Shards = [S1 , S2 | _ ] = make_shards (),
122
+ Acc0 = init_acc (Shards ),
123
+ DocA = <<" a" >>,
124
+ RevsA1 = [<<" 1" >>, <<" 2" >>],
125
+ RevsA2 = [<<" 3" >>],
126
+
127
+ {ok , Acc1 } = handle_message ({ok , [{DocA , RevsA1 }]}, S1 , Acc0 ),
128
+ ? assertEqual ([{DocA , RevsA1 }], sets :to_list (Acc1 # pacc .replies )),
129
+
130
+ {ok , Acc2 } = handle_message ({ok , [{DocA , RevsA2 }]}, S2 , Acc1 ),
131
+ Res2 = lists :sort (sets :to_list (Acc2 # pacc .replies )),
132
+ ? assertEqual ([{DocA , RevsA1 }, {DocA , RevsA2 }], Res2 ).
133
+
134
+ res_finalize_test () ->
135
+ Shards = [S1 , S2 , S3 , S4 , S5 ] = make_shards (),
136
+ Acc0 = init_acc (Shards ),
137
+ DocA = <<" a" >>,
138
+ RevsA1 = [<<" 1" >>, <<" 2" >>],
139
+ RevsA2 = [<<" 3" >>],
140
+ DocB = <<" b" >>,
141
+ RevsB = [<<" 5" >>, <<" 6" >>],
142
+
143
+ {ok , Acc1 } = handle_message ({ok , [{DocA , RevsA1 }]}, S1 , Acc0 ),
144
+ ? assertEqual ([{DocA , RevsA1 }], sets :to_list (Acc1 # pacc .replies )),
145
+
146
+ {ok , Acc2 } = handle_message ({ok , [{DocA , RevsA1 }]}, S2 , Acc1 ),
147
+ ? assertEqual ([{DocA , RevsA1 }], sets :to_list (Acc2 # pacc .replies )),
148
+
149
+ {ok , Acc3 } = handle_message ({ok , [{DocA , RevsA2 }]}, S3 , Acc2 ),
150
+ Res3 = lists :sort (sets :to_list (Acc3 # pacc .replies )),
151
+ ? assertEqual ([{DocA , RevsA1 }, {DocA , RevsA2 }], Res3 ),
152
+
153
+ {ok , Acc4 } = handle_message ({ok , [{DocB , RevsB }]}, S4 , Acc3 ),
154
+ Res4 = lists :sort (sets :to_list (Acc4 # pacc .replies )),
155
+ ? assertEqual ([{DocA , RevsA1 }, {DocA , RevsA2 }, {DocB , RevsB }], Res4 ),
156
+
157
+ {stop , Acc5 } = handle_message ({ok , [{DocB , RevsB }]}, S5 , Acc4 ),
158
+ Res5 = lists :sort (sets :to_list (Acc5 )),
159
+ ? assertEqual ([{DocA , RevsA1 }, {DocA , RevsA2 }, {DocB , RevsB }], Res5 ).
160
+
161
+ -endif .
0 commit comments