@@ -187,6 +187,49 @@ shards(Db, Args) ->
187
187
end ,
188
188
fabric_view :get_shards (Db , NewArgs ).
189
189
190
%% Fold one view row received from a worker into the collector state and
%% possibly stream it to the client via fabric_view:maybe_send_row/1.
%%
%% Row0 may arrive in either of two wire formats: the legacy #view_row{}
%% record or the newer map-based row (mixed-cluster scenarios produce both).
%% Source is the {Worker, From} pair the row came from; the Worker half is
%% also used to bump the per-worker row counter.
%% NOTE(review): assumes merge_row/3 and the #collector/#mrargs/#view_row
%% definitions from the enclosing app headers — not visible in this chunk.
handle_row(Row0, {Worker, _} = Source, State) ->
    #collector{query_args = Args, counters = Counters0, rows = Rows0} = State,
    #mrargs{extra = Options, direction = Dir} = Args,
    % Stamp the originating worker onto the row, in whichever format it is.
    Row1 =
        case Row0 of
            #view_row{} -> Row0#view_row{worker = Source};
            #{} -> Row0#{worker => Source}
        end,
    % It has to be ensured that rows of the same format are merged in case of
    % mixed-cluster scenarios. The execution_stats_rolling option picks the
    % canonical format: true -> maps, false -> #view_row{} records.
    Row =
        case couch_util:get_value(execution_stats_rolling, Options, false) of
            true ->
                % Canonical format is the map row; upgrade records to maps.
                case Row1 of
                    #view_row{} ->
                        #{
                            key => Row1#view_row.key,
                            id => Row1#view_row.id,
                            value => Row1#view_row.value,
                            doc => Row1#view_row.doc,
                            worker => Row1#view_row.worker,
                            % record rows carry no stats; start with none
                            stats => #{}
                        };
                    #{} ->
                        Row1
                end;
            false ->
                % Canonical format is the record row; downgrade maps.
                case Row1 of
                    #view_row{} ->
                        Row1;
                    #{} ->
                        % id and key are required; the rest default to null.
                        #{id := Id, key := Key} = Row1,
                        Value = maps:get(value, Row1, null),
                        Doc = maps:get(doc, Row1, null),
                        Worker = maps:get(worker, Row1, null),
                        #view_row{id = Id, key = Key, value = Value, doc = Doc, worker = Worker}
                end
        end,
    % Insert in sort order, count the row against its worker, and flush.
    Rows = merge_row(Dir, Row, Rows0),
    Counters1 = fabric_dict:update_counter(Worker, 1, Counters0),
    State1 = State#collector{rows = Rows, counters = Counters1},
    fabric_view:maybe_send_row(State1).
232
+
190
233
handle_message ({rexi_DOWN , _ , {_ , NodeRef }, _ }, _ , State ) ->
191
234
fabric_view :check_down_shards (State , NodeRef );
192
235
handle_message ({rexi_EXIT , Reason }, Worker , State ) ->
@@ -257,13 +300,10 @@ handle_message({meta, Meta0}, {Worker, From}, State) ->
257
300
update_seq = UpdateSeq0
258
301
}}
259
302
end ;
260
- handle_message (# view_row {} = Row , {Worker , From }, State ) ->
261
- # collector {query_args = Args , counters = Counters0 , rows = Rows0 } = State ,
262
- Dir = Args # mrargs .direction ,
263
- Rows = merge_row (Dir , Row # view_row {worker = {Worker , From }}, Rows0 ),
264
- Counters1 = fabric_dict :update_counter (Worker , 1 , Counters0 ),
265
- State1 = State # collector {rows = Rows , counters = Counters1 },
266
- fabric_view :maybe_send_row (State1 );
303
+ handle_message (# view_row {} = Row , {_ , _ } = Source , State ) ->
304
+ handle_row (Row , Source , State );
305
+ handle_message ({view_row , #{} = Row }, {_ , _ } = Source , State ) ->
306
+ handle_row (Row , Source , State );
267
307
handle_message (complete , Worker , State ) ->
268
308
Counters = fabric_dict :update_counter (Worker , 1 , State # collector .counters ),
269
309
fabric_view :maybe_send_row (State # collector {counters = Counters });
@@ -273,10 +313,24 @@ handle_message({execution_stats, _} = Msg, {_, From}, St) ->
273
313
rexi :stream_ack (From ),
274
314
{Go , St # collector {user_acc = Acc }}.
275
315
276
- merge_row (fwd , Row , Rows ) ->
316
%% Insert a map-format row into an already-sorted row list, ordering by the
%% value under Key. For fwd the list ascends and a new row lands before any
%% existing row with an equal key; rev is handled by reversing, inserting
%% forward, and reversing back.
insert_row(_Dir, _Key, NewRow, []) ->
    [NewRow];
insert_row(rev, Key, NewRow, Rows) ->
    lists:reverse(insert_row(fwd, Key, NewRow, lists:reverse(Rows)));
insert_row(fwd, Key, NewRow, [Head | Tail] = Rows) ->
    case maps:get(Key, NewRow) =< maps:get(Key, Head) of
        true -> [NewRow | Rows];
        false -> [Head | insert_row(fwd, Key, NewRow, Tail)]
    end.
327
+
328
%% Merge one row into the sorted buffer, dispatching on the row format.
%% Map rows (newer wire format) go through insert_row/4 keyed on id; legacy
%% #view_row{} records use the stdlib keymerge on the id field, in the
%% direction the query is being collated.
merge_row(Dir, #{} = Row, Rows) ->
    insert_row(Dir, id, Row, Rows);
merge_row(fwd, #view_row{} = Row, Rows) ->
    lists:keymerge(#view_row.id, [Row], Rows);
merge_row(rev, #view_row{} = Row, Rows) ->
    lists:rkeymerge(#view_row.id, [Row], Rows).
280
334
281
335
all_docs_concurrency () ->
282
336
Value = config :get (" fabric" , " all_docs_concurrency" , " 10" ),
@@ -385,3 +439,83 @@ filter_keys_by_namespace(Keys, Namespace) when Namespace =:= <<"_local">> ->
385
439
);
386
440
filter_keys_by_namespace (Keys , _Namespace ) ->
387
441
Keys .
442
+
443
+ -ifdef (TEST ).
444
+ -include_lib (" couch/include/couch_eunit.hrl" ).
445
+
446
%% Forward merge of #view_row{} records: new rows land in ascending id
%% order, and a duplicate id is placed before the existing equal row.
merge_row_record_fwd_test() ->
    [Row1, Row2, Row3, Row4] = Rows = [#view_row{id = Id} || Id <- [1, 3, 5, 7]],
    Check = fun(NewRow, Expected) ->
        ?assertEqual(Expected, merge_row(fwd, NewRow, Rows))
    end,
    Check(#view_row{id = 4}, [Row1, Row2, #view_row{id = 4}, Row3, Row4]),
    Check(#view_row{id = 0}, [#view_row{id = 0}, Row1, Row2, Row3, Row4]),
    Check(#view_row{id = 8}, [Row1, Row2, Row3, Row4, #view_row{id = 8}]),
    Check(#view_row{id = 5}, [Row1, Row2, #view_row{id = 5}, Row3, Row4]).
464
+
465
%% Reverse merge of #view_row{} records: the buffer descends by id, and a
%% duplicate id is placed before the existing equal row (descending order).
merge_row_record_rev_test() ->
    [Row1, Row2, Row3, Row4] = [#view_row{id = Id} || Id <- [2, 4, 6, 8]],
    Rows = [Row4, Row3, Row2, Row1],
    Check = fun(NewRow, Expected) ->
        ?assertEqual(Expected, merge_row(rev, NewRow, Rows))
    end,
    Check(#view_row{id = 5}, [Row4, Row3, #view_row{id = 5}, Row2, Row1]),
    Check(#view_row{id = 1}, [Row4, Row3, Row2, Row1, #view_row{id = 1}]),
    Check(#view_row{id = 9}, [#view_row{id = 9}, Row4, Row3, Row2, Row1]),
    Check(#view_row{id = 6}, [Row4, Row3, #view_row{id = 6}, Row2, Row1]).
483
+
484
%% Forward merge of map-format rows: mirrors the record case, exercising
%% the insert_row/4 path keyed on id.
merge_row_map_fwd_test() ->
    [Row1, Row2, Row3, Row4] = Rows = [#{id => Id} || Id <- [1, 3, 5, 7]],
    Check = fun(NewRow, Expected) ->
        ?assertEqual(Expected, merge_row(fwd, NewRow, Rows))
    end,
    Check(#{id => 4}, [Row1, Row2, #{id => 4}, Row3, Row4]),
    Check(#{id => 0}, [#{id => 0}, Row1, Row2, Row3, Row4]),
    Check(#{id => 8}, [Row1, Row2, Row3, Row4, #{id => 8}]),
    Check(#{id => 5}, [Row1, Row2, #{id => 5}, Row3, Row4]).
502
+
503
%% Reverse merge of map-format rows: the buffer descends by id, and a
%% duplicate id is placed before the existing equal row (descending order).
merge_row_map_rev_test() ->
    [Row1, Row2, Row3, Row4] = [#{id => Id} || Id <- [2, 4, 6, 8]],
    Rows = [Row4, Row3, Row2, Row1],
    Check = fun(NewRow, Expected) ->
        ?assertEqual(Expected, merge_row(rev, NewRow, Rows))
    end,
    Check(#{id => 5}, [Row4, Row3, #{id => 5}, Row2, Row1]),
    Check(#{id => 1}, [Row4, Row3, Row2, Row1, #{id => 1}]),
    Check(#{id => 9}, [#{id => 9}, Row4, Row3, Row2, Row1]),
    Check(#{id => 6}, [Row4, Row3, #{id => 6}, Row2, Row1]).
521
+ -endif .
0 commit comments