129129% CA3 CD3
130130%
131131
132- -export ([open /1 , open /2 , get_fd /1 , get_state /1 ]).
132+ -export ([open /1 , open /2 , set_options /2 , get_fd /1 , get_state /1 ]).
133+ -export ([get_bb_chunk_size /1 ]).
133134-export ([add /2 , merge /1 , sort /1 , iter /1 , next /1 ]).
134135
135136
136137-record (ems , {
137138 fd ,
138139 root ,
139140 bb_chunk = 10 ,
140- chain_chunk = 100
141+ chain_chunk = 100 ,
142+ event_cb ,
143+ event_st
141144}).
142145
143146
@@ -156,7 +159,11 @@ set_options(Ems, [{root, Root} | Rest]) ->
156159set_options (Ems , [{chain_chunk , Count } | Rest ]) when is_integer (Count ) ->
157160 set_options (Ems # ems {chain_chunk = Count }, Rest );
158161set_options (Ems , [{back_bone_chunk , Count } | Rest ]) when is_integer (Count ) ->
159- set_options (Ems # ems {bb_chunk = Count }, Rest ).
162+ set_options (Ems # ems {bb_chunk = Count }, Rest );
163+ set_options (Ems , [{event_cb , EventCB } | Rest ]) when is_function (EventCB , 3 ) ->
164+ set_options (Ems # ems {event_cb = EventCB }, Rest );
165+ set_options (Ems , [{event_st , EventSt } | Rest ]) ->
166+ set_options (Ems # ems {event_st = EventSt }, Rest ).
160167
161168
162169get_fd (# ems {fd = Fd }) ->
@@ -167,6 +174,10 @@ get_state(#ems{root=Root}) ->
167174 Root .
168175
169176
177+ get_bb_chunk_size (# ems {bb_chunk = Size }) ->
178+ Size .
179+
180+
170181add (Ems , []) ->
171182 {ok , Ems };
172183add (Ems , KVs ) ->
@@ -224,51 +235,55 @@ decimate(#ems{root={_BB, nil}}=Ems) ->
224235 % We have less than bb_chunk backbone pointers so we're
225236 % good to start streaming KV's back to the client.
226237 Ems ;
227- decimate (# ems {root = { BB , NextBB }} = Ems ) ->
238+ decimate (# ems {} = Ems0 ) ->
228239 % To make sure we have a bounded amount of data in RAM
229240 % at any given point we first need to decimate the data
230241 % by performing the first couple iterations of a merge
231242 % sort writing the intermediate results back to disk.
232243
233244 % The first pass gives us a sort with pointers linked from
234245 % largest to smallest.
235- {RevBB , RevNextBB } = merge_back_bone (Ems , small , BB , NextBB ),
246+ {ok , Ems1 } = event_notify (Ems0 , {merge_start , forward }),
247+ {ok , Ems2 } = merge_back_bone (Ems1 , small ),
236248
237249 % We have to run a second pass so that links are pointed
238250 % back from smallest to largest.
239- {FwdBB , FwdNextBB } = merge_back_bone (Ems , big , RevBB , RevNextBB ),
251+ {ok , Ems3 } = event_notify (Ems2 , {merge_start , reverse }),
252+ {ok , Ems4 } = merge_back_bone (Ems3 , big ),
240253
241254 % Continue decimating until we have an acceptable bound on
242255 % the number of keys to use.
243- decimate (Ems # ems { root = { FwdBB , FwdNextBB }} ).
256+ decimate (Ems4 ).
244257
245258
246- merge_back_bone (Ems , Choose , BB , NextBB ) ->
247- BBPos = merge_chains (Ems , Choose , BB ),
248- merge_rest_back_bone (Ems , Choose , NextBB , {[BBPos ], nil }).
259+ merge_back_bone (# ems { root = { BB , NextBB }} = Ems0 , Choose ) ->
260+ { ok , Ems1 , BBPos } = merge_chains (Ems0 , Choose , BB ),
261+ merge_rest_back_bone (Ems1 , Choose , NextBB , {[BBPos ], nil }).
249262
250263
251- merge_rest_back_bone (_Ems , _Choose , nil , Acc ) ->
252- Acc ;
253- merge_rest_back_bone (Ems , Choose , BBPos , Acc ) ->
254- {ok , {BB , NextBB }} = couch_file :pread_term (Ems # ems .fd , BBPos ),
255- NewPos = merge_chains (Ems , Choose , BB ),
256- {NewBB , NewPrev } = append_item (Ems , Acc , NewPos , Ems # ems .bb_chunk ),
257- merge_rest_back_bone (Ems , Choose , NextBB , {NewBB , NewPrev }).
264+ merge_rest_back_bone (Ems , _Choose , nil , Acc ) ->
265+ { ok , Ems # ems { root = Acc }} ;
266+ merge_rest_back_bone (Ems0 , Choose , BBPos , Acc ) ->
267+ {ok , {BB , NextBB }} = couch_file :pread_term (Ems0 # ems .fd , BBPos ),
268+ { ok , Ems1 , NewPos } = merge_chains (Ems0 , Choose , BB ),
269+ {NewBB , NewPrev } = append_item (Ems1 , Acc , NewPos , Ems1 # ems .bb_chunk ),
270+ merge_rest_back_bone (Ems1 , Choose , NextBB , {NewBB , NewPrev }).
258271
259272
260- merge_chains (Ems , Choose , BB ) ->
261- Chains = init_chains (Ems , Choose , BB ),
262- merge_chains (Ems , Choose , Chains , {[], nil }).
273+ merge_chains (Ems0 , Choose , BB ) ->
274+ {ok , Ems1 } = event_notify (Ems0 , {merge , chain }),
275+ Chains = init_chains (Ems1 , Choose , BB ),
276+ merge_chains (Ems1 , Choose , Chains , {[], nil }).
263277
264278
265279merge_chains (Ems , _Choose , [], ChainAcc ) ->
266280 {ok , CPos , _ } = couch_file :append_term (Ems # ems .fd , ChainAcc ),
267- CPos ;
268- merge_chains (# ems {chain_chunk = CC }= Ems , Choose , Chains , Acc ) ->
269- {KV , RestChains } = choose_kv (Choose , Ems , Chains ),
270- {NewKVs , NewPrev } = append_item (Ems , Acc , KV , CC ),
271- merge_chains (Ems , Choose , RestChains , {NewKVs , NewPrev }).
281+ {ok , Ems , CPos };
282+ merge_chains (# ems {chain_chunk = CC }= Ems0 , Choose , Chains , Acc ) ->
283+ {KV , RestChains } = choose_kv (Choose , Ems0 , Chains ),
284+ {NewKVs , NewPrev } = append_item (Ems0 , Acc , KV , CC ),
285+ {ok , Ems1 } = event_notify (Ems0 , row_copy ),
286+ merge_chains (Ems1 , Choose , RestChains , {NewKVs , NewPrev }).
272287
273288
274289init_chains (Ems , Choose , BB ) ->
@@ -316,3 +331,9 @@ append_item(Ems, {List, Prev}, Pos, Size) when length(List) >= Size ->
316331append_item (_Ems , {List , Prev }, Pos , _Size ) ->
317332 {[Pos | List ], Prev }.
318333
334+
335+ event_notify (# ems {event_cb = undefined } = Ems , _ ) ->
336+ {ok , Ems };
337+ event_notify (# ems {event_cb = EventCB , event_st = EventSt }= Ems , Event ) ->
338+ NewSt = EventCB (Ems , Event , EventSt ),
339+ {ok , Ems # ems {event_st = NewSt }}.
0 commit comments