diff --git a/backend/app/get_travel_time.py b/backend/app/get_travel_time.py
index 73fd06f..8321dfa 100644
--- a/backend/app/get_travel_time.py
+++ b/backend/app/get_travel_time.py
@@ -11,6 +11,10 @@ import json
 
 from app.getGitHash import getGitHash
 
+# this sets the Copy-on-Write option globally and removes a warning message
+# https://pandas.pydata.org/pandas-docs/stable/user_guide/copy_on_write.html#copy-on-write
+pandas.options.mode.copy_on_write = True
+
 # the way we currently do it
 def mean_daily_mean(obs):
     # group the observations by date
@@ -73,7 +77,7 @@ def get_travel_time(start_node, end_node, start_time, end_time, start_date, end_
         SELECT
             link_dir,
             dt::text,
-            tx,
+            EXTRACT('epoch' FROM tx)::bigint AS tx,
            mean::real AS speed_kmph
         FROM here.ta
         WHERE
@@ -143,16 +147,28 @@ def get_travel_time(start_node, end_node, start_time, end_time, start_date, end_
             'query_params': query_params
         }
     }
-    # rolling join of bins to data
-    link_speeds_df = pandas.merge_asof(
+    # pandas.merge() has no range (non-equi) join, so we start by CROSS joining
+    # the bins to the data, though this is very inefficient
+    link_speeds_df = pandas.merge(
         link_speeds_df,
-        pandas.DataFrame({'tx': bins,'bin':bins}),
-        on='tx',
-        direction='forward'
-    ).set_index('link_dir')
+        pandas.DataFrame({
+            'bin': [ bin['id'] for bin in bins ],
+            'bin_first_tx': [ bin['first'] for bin in bins ],
+            'bin_last_tx': [ bin['last'] for bin in bins ]
+        }),
+        how='cross'
+    )
+    # now we filter out the records that don't fall within their bins
+    link_speeds_df = link_speeds_df.query(
+        'tx >= bin_first_tx & tx <= bin_last_tx'
+    )
-    # drop column used only for binning
-    link_speeds_df.drop('tx',axis='columns',inplace=True)
-
+    # drop columns used only for binning
+    link_speeds_df.drop(
+        ['tx', 'bin_first_tx', 'bin_last_tx'],
+        axis='columns',
+        inplace=True
+    )
+    link_speeds_df = link_speeds_df.set_index('link_dir')
     # join previously queried link lengths
     link_speeds_df = link_speeds_df.join(links_df)
     # calculate link travel times from speed and length (in seconds)
@@ -229,24 +245,39 @@ def make_bins(links_df, link_speeds_df):
     """Create the smallest temporal bins possible while ensuring at least 80% of links, by length, have observations."""
     # start with empty list of bins, defined by their ends
-    bin_ends = list()
+    # TODO: define bin starts as well
+    bins = list()
     minimum_length = 0.8 * links_df['length'].sum()
     links_per_5mbin = {}
+    bin_counter = 1
     # iterate over 5-min time bins with data, in chronological order
     for tx in sorted(list(link_speeds_df.tx.unique())):
-        # add links one bin at a time
+        # get the data for this 5min bin
         bin5m = link_speeds_df[link_speeds_df['tx']==tx]
-        # get the distinct links in this 5-minute bin
+        # add the distinct links from this 5-minute bin
         links_per_5mbin[tx] = bin5m.link_dir.unique()
+        # in case data is very sparse, drop observations more than one hour
+        # prior to the current 5min bin - this window is moving!
+        keys_to_drop = []
+        for tx_key in links_per_5mbin.keys():
+            if tx - tx_key >= 3600: # seconds, i.e. 1 hour
+                keys_to_drop.append(tx_key)
+        for tx_key in keys_to_drop:
+            del links_per_5mbin[tx_key]
         # get all the links in all the 5m bins so far
         links = set(
             link for linklist in links_per_5mbin.values() for link in linklist
         )
         # measure the length of links in that set
         length_so_far = links_df.loc[list(links),'length'].sum()
-        # compare against length threshold
+        # compare against length threshold; if met, end the bin
         if length_so_far >= minimum_length:
-            bin_ends.append(tx)
+            bins.append({
+                'id': bin_counter,
+                'first': min(links_per_5mbin.keys()),
+                'last': tx # equivalently, max(links_per_5mbin.keys())
+            })
+            bin_counter += 1
             links_per_5mbin = {}
         else:
             pass
-    return bin_ends
+    return bins
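
For reference, a minimal runnable sketch (not part of the patch; the data values are made up) of the cross-join-and-filter pattern used in the hunk above. pandas.merge() only matches on equality keys, so assigning each observation to the bin whose first/last range contains its tx is done as a cross join followed by a row filter; memory use grows with rows times bins, hence the "very inefficient" comment.

import pandas

# hypothetical observations, tx already converted to epoch seconds
observations = pandas.DataFrame({
    'link_dir': ['linkA', 'linkB', 'linkC'],
    'tx': [0, 300, 900],
    'speed_kmph': [42.0, 37.5, 55.0]
})

# hypothetical output of make_bins(): two variable-width bins
bins = [
    {'id': 1, 'first': 0, 'last': 300},
    {'id': 2, 'first': 600, 'last': 900}
]

# cross join pairs every observation with every bin
paired = pandas.merge(
    observations,
    pandas.DataFrame({
        'bin': [b['id'] for b in bins],
        'bin_first_tx': [b['first'] for b in bins],
        'bin_last_tx': [b['last'] for b in bins]
    }),
    how='cross'
)

# keep only the pairings where the observation falls inside the bin's range
binned = paired.query('tx >= bin_first_tx & tx <= bin_last_tx')
print(binned[['link_dir', 'bin', 'speed_kmph']])
# linkA and linkB land in bin 1, linkC in bin 2

Because the bins produced by make_bins() never overlap, an alternative worth considering is labelling rows with pandas.cut over an IntervalIndex built from the bin boundaries, which avoids materializing the full cross product.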