diff --git a/backend/app/get_travel_time.py b/backend/app/get_travel_time.py
index 070030b..8321dfa 100644
--- a/backend/app/get_travel_time.py
+++ b/backend/app/get_travel_time.py
@@ -11,6 +11,10 @@ import json
 
 from app.getGitHash import getGitHash
 
+# this configures a setting (globally?) and removes a warning message
+# https://pandas.pydata.org/pandas-docs/stable/user_guide/copy_on_write.html#copy-on-write
+pandas.options.mode.copy_on_write = True
+
 # the way we currently do it
 def mean_daily_mean(obs):
     # group the observations by date
@@ -73,7 +77,7 @@ def get_travel_time(start_node, end_node, start_time, end_time, start_date, end_
         SELECT
             link_dir,
             dt::text,
-            tx,
+            EXTRACT('epoch' FROM tx)::bigint AS tx,
             mean::real AS speed_kmph
         FROM here.ta
         WHERE
@@ -104,13 +108,6 @@ def get_travel_time(start_node, end_node, start_time, end_time, start_date, end_
 
     total_corridor_length = links_df['length'].sum()
 
-    links_df = pandas.DataFrame({
-        'link_dir': [l['link_dir'] for l in links],
-        'length': [l['length_m'] for l in links]
-    }).set_index('link_dir')
-
-    total_corridor_length = links_df['length'].sum()
-
     query_params = {
         "link_dir_list": [link['link_dir'] for link in links],
         "node_start": start_node,
@@ -132,7 +129,6 @@ def get_travel_time(start_node, end_node, start_time, end_time, start_date, end_
     )
     connection.close()
 
-
     # create custom binning
     bins = make_bins(links_df, link_speeds_df)
 
@@ -151,16 +147,28 @@ def get_travel_time(start_node, end_node, start_time, end_time, start_date, end_
                 'query_params': query_params
             }
         }
-    # rolling join of bins to data
-    link_speeds_df = pandas.merge_asof(
+    # Pandas is very bad at joining(?), so we start by CROSS joining the bins
+    # to the data, though this is very inefficient
+    link_speeds_df = pandas.merge(
         link_speeds_df,
-        pandas.DataFrame({'tx': bins,'bin':bins}),
-        on='tx',
-        direction='forward'
-    ).set_index('link_dir')
+        pandas.DataFrame({
+            'bin': [ bin['id'] for bin in bins ],
+            'bin_first_tx': [ bin['first'] for bin in bins ],
+            'bin_last_tx': [ bin['last'] for bin in bins ]
+        }),
+        how='cross'
+    )
+    # Now we filter out the records that don't align with their bins
+    link_speeds_df = link_speeds_df.query(
+        'tx >= bin_first_tx & tx <= bin_last_tx'
+    )
     # drop column used only for binning
-    link_speeds_df.drop('tx',axis='columns',inplace=True)
-
+    link_speeds_df.drop(
+        ['tx', 'bin_first_tx', 'bin_last_tx'],
+        axis='columns',
+        inplace=True
+    )
+    link_speeds_df = link_speeds_df.set_index('link_dir')
     # join previously queried link lengths
     link_speeds_df = link_speeds_df.join(links_df)
     # calculate link travel times from speed and length (in seconds)
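
For reference, the cross join plus range filter in the hunk above reproduces what merge_asof previously did, at the cost of materializing every (observation, bin) pair before filtering. The snippet below is a minimal, self-contained sketch of the same pattern with invented toy values; the link_dir names, tx timestamps, speeds, and bin bounds are all hypothetical:

    import pandas

    # toy observations: one row per (link_dir, tx) speed record,
    # with tx already converted to epoch seconds as in the SQL change above
    link_speeds_df = pandas.DataFrame({
        'link_dir': ['123F', '123T', '123F'],
        'tx': [0, 300, 3900],
        'speed_kmph': [40.0, 35.0, 50.0]
    })
    # toy bins in the shape make_bins now returns
    bins = [
        {'id': 1, 'first': 0, 'last': 600},
        {'id': 2, 'first': 3600, 'last': 4200}
    ]
    # cross join: every observation is paired with every bin...
    joined = pandas.merge(
        link_speeds_df,
        pandas.DataFrame({
            'bin': [b['id'] for b in bins],
            'bin_first_tx': [b['first'] for b in bins],
            'bin_last_tx': [b['last'] for b in bins]
        }),
        how='cross'
    )
    # ...then only the pairs where the observation falls inside its bin are kept
    joined = joined.query('tx >= bin_first_tx & tx <= bin_last_tx')
    # result: three rows, two assigned to bin 1 and one to bin 2
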
@@ -236,21 +244,40 @@ def get_travel_time(start_node, end_node, start_time, end_time, start_date, end_
 def make_bins(links_df, link_speeds_df):
     """Create the smallest temporal bins possible while ensuring
     at least 80% of links, by length, have observations."""
-    # start with an empty set of links
-    links = set()
-    bin_ends = list()
-    total_length = links_df['length'].sum()
-    minimum_length = 0.8 * total_length
-    for tx in link_speeds_df.tx.unique():
-        # add links one bin at a time
-        five_min_bin = link_speeds_df[link_speeds_df['tx']==tx]
-        links.update(five_min_bin.link_dir.unique())
-        # measure the length of links in the set
+    # start with empty list of bins, defined by their ends
+    # TODO: define bin starts as well
+    bins = list()
+    minimum_length = 0.8 * links_df['length'].sum()
+
+    links_per_5mbin = {}
+    bin_counter = 1
+    # iterate over 5-min time bins with data, in chronological order
+    for tx in sorted(list(link_speeds_df.tx.unique())):
+        # get the data for this 5min bin
+        bin5m = link_speeds_df[link_speeds_df['tx']==tx]
+        # add the distinct links from this 5-minute bin
+        links_per_5mbin[tx] = bin5m.link_dir.unique()
+        # in case data is very sparse, drop observations more than one hour
+        # prior to the current 5min bin - this window is moving!
+        keys_to_drop = []
+        for tx_key in links_per_5mbin.keys():
+            if tx - tx_key >= 3600: # seconds, i.e. 1 hour
+                keys_to_drop.append(tx_key)
+        for tx_key in keys_to_drop:
+            del links_per_5mbin[tx_key]
+        # get all the links in all the 5m bins so far
+        links = set( link for linklist in links_per_5mbin.values() for link in linklist )
+        # measure the length of links in that set
         length_so_far = links_df.loc[list(links),'length'].sum()
-        # define length threshold
+        # compare against length threshold; if met, end the bin
         if length_so_far >= minimum_length:
-            bin_ends.append(tx)
-            links = set() # reset
+            bins.append({
+                'id': bin_counter,
+                'first': min(links_per_5mbin.keys()),
+                'last': tx # equivalently, max(links_per_5mbin.keys())
+            })
+            bin_counter += 1
+            links_per_5mbin = {}
         else:
             pass
-    return bin_ends
+    return bins
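
To make the moving window concrete, here is a hypothetical trace of the logic above, assuming a corridor of three links A, B and C of 100 m each (so the 80% threshold is 240 m) and invented 5-minute bin timestamps:

    tx (s)   new links   window contents          covered   result
    0        {A}         {0: A}                   100 m     keep accumulating
    300      {B}         {0: A, 300: B}           200 m     keep accumulating
    600      {C}         {0: A, 300: B, 600: C}   300 m     emit {'id': 1, 'first': 0, 'last': 600}; reset
    900      {A, B}      {900: A, B}              200 m     keep accumulating
    4800     {C}         {4800: C}                100 m     900 dropped (an hour or more old); still below threshold

As the in-code comment notes, the one-hour window only matters when data are sparse; it keeps a single bin from silently spanning a long gap in observations.
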
diff --git a/frontend/package-lock.json b/frontend/package-lock.json
index aacab40..af6247c 100644
--- a/frontend/package-lock.json
+++ b/frontend/package-lock.json
@@ -12,6 +12,7 @@
         "@emotion/react": "^11.10.6",
         "@emotion/styled": "^11.10.6",
         "@mui/material": "5.x",
+        "d3-array": "^3.2.4",
         "express": "^4.18.2",
         "maplibre-gl": "4.7.x",
         "p-queue": "^8.0.1",
@@ -3731,6 +3732,18 @@
       "resolved": "https://registry.npmjs.org/csstype/-/csstype-3.1.3.tgz",
       "integrity": "sha512-M1uQkMl8rQK/szD0LNhtqxIPLpimGm8sOBwU7lLnCpSbTyY3yeU1Vc7l4KT5zT4s/yOxHH5O7tIuuLOCnLADRw=="
     },
+    "node_modules/d3-array": {
+      "version": "3.2.4",
+      "resolved": "https://registry.npmjs.org/d3-array/-/d3-array-3.2.4.tgz",
+      "integrity": "sha512-tdQAmyA18i4J7wprpYq8ClcxZy3SC31QMeByyCFyRt7BVHdREQZ5lpzoe5mFEYZUWe+oq8HBvk9JjpibyEV4Jg==",
+      "license": "ISC",
+      "dependencies": {
+        "internmap": "1 - 2"
+      },
+      "engines": {
+        "node": ">=12"
+      }
+    },
     "node_modules/data-view-buffer": {
       "version": "1.0.1",
       "resolved": "https://registry.npmjs.org/data-view-buffer/-/data-view-buffer-1.0.1.tgz",
@@ -5499,6 +5512,15 @@
         "node": ">= 0.4"
       }
     },
+    "node_modules/internmap": {
+      "version": "2.0.3",
+      "resolved": "https://registry.npmjs.org/internmap/-/internmap-2.0.3.tgz",
+      "integrity": "sha512-5Hh7Y1wQbvY5ooGgPbDaL5iYLAPzMTUrjMulskHLH6wnv/A+1q5rgEaiuqEjB+oxGXIVZs1FF+R/KPN3ZSQYYg==",
+      "license": "ISC",
+      "engines": {
+        "node": ">=12"
+      }
+    },
     "node_modules/interpret": {
       "version": "2.2.0",
       "resolved": "https://registry.npmjs.org/interpret/-/interpret-2.2.0.tgz",
diff --git a/frontend/package.json b/frontend/package.json
index 7ec08d3..ea8f708 100644
--- a/frontend/package.json
+++ b/frontend/package.json
@@ -44,6 +44,7 @@
     "@emotion/react": "^11.10.6",
     "@emotion/styled": "^11.10.6",
     "@mui/material": "5.x",
+    "d3-array": "^3.2.4",
     "express": "^4.18.2",
     "maplibre-gl": "4.7.x",
     "p-queue": "^8.0.1",
diff --git a/frontend/src/travelTimeQuery.js b/frontend/src/travelTimeQuery.js
index 9048ba2..d47eb10 100644
--- a/frontend/src/travelTimeQuery.js
+++ b/frontend/src/travelTimeQuery.js
@@ -1,4 +1,5 @@
 import { domain } from './domain.js'
+import { quantile } from 'd3-array'
 
 export class TravelTimeQuery {
     #corridor
@@ -80,15 +81,32 @@ export class TravelTimeQuery {
         record.set('hoursInRange', this.hoursInRange)
         record.set('mean_travel_time_minutes', this.#results?.travel_time?.minutes)
         record.set('mean_travel_time_seconds', this.#results?.travel_time?.seconds)
-        // minimum and maximum travel time observations
+        // other stats
+        record.set(
+            'num_pseudo_obs',
+            this.#results.observations.length
+        )
         record.set(
             'max_travel_time_seconds',
             Math.max(...this.#results.observations.map(o => o.seconds))
         )
+        record.set(
+            'median_travel_time_seconds',
+            quantile([...this.#results.observations.map(o => o.seconds)], 0.5)
+        )
+        record.set(
+            'p85_travel_time_seconds',
+            quantile([...this.#results.observations.map(o => o.seconds)], 0.85)
+        )
         record.set(
             'min_travel_time_seconds',
             Math.min(...this.#results.observations.map(o => o.seconds))
         )
+        record.set(
+            'corridor_length_meters',
+            this.#corridor.links.reduce((a,l)=>a+l.length_m, 0)
+        )
+
         // turning these off in the frontend until they're ready for production
         //record.set('moe_lower_p95', this.#results?.confidence?.intervals?.['p=0.95']?.lower?.seconds)
         //record.set('moe_upper_p95', this.#results?.confidence?.intervals?.['p=0.95']?.upper?.seconds)
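
For context on the two new percentile fields: d3-array's quantile appears to interpolate linearly between the sorted observations (the common R-7 definition; the d3-array docs are the authority here). A rough Python equivalent, handy for sanity-checking the frontend numbers against the backend's observation list, is sketched below; the seconds values are made up:

    def quantile(values, p):
        # linear interpolation between order statistics
        v = sorted(values)
        if not v:
            return None
        h = (len(v) - 1) * p      # fractional position in the sorted list
        lo = int(h)
        hi = min(lo + 1, len(v) - 1)
        return v[lo] + (v[hi] - v[lo]) * (h - lo)

    seconds = [42.0, 48.0, 51.5, 55.1, 60.2]   # hypothetical per-bin travel times
    median_tt = quantile(seconds, 0.5)    # 51.5
    p85_tt = quantile(seconds, 0.85)      # 57.14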