@@ -13,13 +13,36 @@ def initialize source_database, destination_database, name, user_strategies
1313 @destination_database = destination_database
1414 @fields_missing_strategy = DataAnon ::Core ::FieldsMissingStrategy . new name
1515 @errors = DataAnon ::Core ::TableErrors . new ( @name )
16+ @bulk_process = defined? ( ::ActiveRecord ::Import )
1617 @primary_keys = [ ]
1718 end
1819
1920 def self . whitelist?
2021 false
2122 end
2223
24+ def bulk_process?
25+ @bulk_process
26+ end
27+
28+ def bulk_process flag
29+ @bulk_process = flag
30+ end
31+
32+ def collect_for_bulk_process record
33+ Thread . current [ :bulk_process_records ] << record
34+ end
35+
36+ def bulk_process_records
37+ if bulk_process?
38+ Thread . current [ :bulk_process_records ] = [ ]
39+ yield
40+ bulk_store Thread . current [ :bulk_process_records ]
41+ else
42+ yield
43+ end
44+ end
45+
2346 def process_fields &block
2447 self . instance_eval &block
2548 self
@@ -114,29 +137,35 @@ def process
114137 def process_table progress
115138 index = 0
116139
117- source_table_limited . each do |record |
118- index += 1
119- begin
120- process_record_if index , record
121- rescue => exception
122- @errors . log_error record , exception
140+ bulk_process_records do
141+ source_table_limited . each do |record |
142+ index += 1
143+ begin
144+ process_record_if index , record
145+ rescue => exception
146+ @errors . log_error record , exception
147+ end
148+ progress . show index
123149 end
124- progress . show index
125150 end
126151 end
127152
128153 def process_table_in_batches progress
129154 logger . info "Processing table #{ @name } records in batch size of #{ @batch_size } "
130155 index = 0
131156
132- source_table_limited . find_each ( :batch_size => @batch_size ) do |record |
133- index += 1
134- begin
135- process_record_if index , record
136- rescue => exception
137- @errors . log_error record , exception
157+ source_table_limited . find_in_batches ( :batch_size => @batch_size ) do |records |
158+ bulk_process_records do
159+ records . each do |record |
160+ index += 1
161+ begin
162+ process_record_if index , record
163+ rescue => exception
164+ @errors . log_error record , exception
165+ end
166+ progress . show index
167+ end
138168 end
139- progress . show index
140169 end
141170 end
142171
@@ -154,13 +183,15 @@ def process_table_in_threads progress
154183 end
155184
156185 thr = Thread . new {
157- records . each do |record |
158- begin
159- process_record_if index , record
160- index += 1
161- rescue => exception
162- puts exception . inspect
163- @errors . log_error record , exception
186+ bulk_process_records do
187+ records . each do |record |
188+ begin
189+ process_record_if index , record
190+ index += 1
191+ rescue => exception
192+ puts exception . inspect
193+ @errors . log_error record , exception
194+ end
164195 end
165196 end
166197 }
0 commit comments