@@ -257,6 +257,7 @@ def compute_covidcast_meta(self, table_name='covidcast', use_index=True):
257257 n_threads = max (1 , cpu_count ()* 9 // 10 ) # aka number of concurrent db connections, which [sh|c]ould be ~<= 90% of the #cores available to SQL server
258258 # NOTE: this may present a small problem if this job runs on different hardware than the db,
259259 # but we should not run into that issue in prod.
260+ logger .info (f"using { n_threads } workers" )
260261
261262 srcsigs = Queue () # multi-consumer threadsafe!
262263
@@ -305,7 +306,8 @@ def compute_covidcast_meta(self, table_name='covidcast', use_index=True):
305306 meta_lock = threading .Lock ()
306307
307308 def worker ():
308- logger .info ("starting thread: " + threading .current_thread ().name )
309+ name = threading .current_thread ().name
310+ logger .info ("starting thread" , thread = name )
309311 # set up new db connection for thread
310312 worker_dbc = Database ()
311313 worker_dbc .connect (connector_impl = self ._connector_impl )
@@ -319,8 +321,9 @@ def worker():
319321 dict (zip (w_cursor .column_names , x )) for x in w_cursor
320322 ))
321323 srcsigs .task_done ()
324+ logger .info ("completed pair" , thread = name , pair = f"({ source } , { signal } )" )
322325 except Empty :
323- logger .info ("no jobs left, thread terminating: " + threading . current_thread (). name )
326+ logger .info ("no jobs left, thread terminating" , thread = name )
324327 finally :
325328 worker_dbc .disconnect (False ) # cleanup
326329
@@ -334,7 +337,7 @@ def worker():
334337 logger .info ("jobs complete" )
335338 for t in threads :
336339 t .join ()
337- logger .error ( " threads terminated" )
340+ logger .info ( "all threads terminated" )
338341
339342 # sort the metadata because threaded workers dgaf
340343 sorting_fields = "data_source signal time_type geo_type" .split ()
0 commit comments