From 70633819b43523e9eccee1a2ab257be1e9995b0a Mon Sep 17 00:00:00 2001 From: Kevin Webb Date: Fri, 8 Feb 2019 13:26:30 -0500 Subject: [PATCH] move to server-based workflow --- .gitignore | 4 +- LICENSE | 2 +- README.md | 6 +- express.ts | 31 ----- process_status_changes.ts => mds_processor.ts | 35 +++-- package.json | 1 + server.ts | 38 ++++++ src/data/periodicity.ts | 12 +- src/data/sharedstreets.ts | 13 ++ src/metrics/generic_aggregator.ts | 88 ++++++++++++- src/metrics/status.ts | 122 +++++++++++------- tsconfig.json | 15 +++ yarn.lock | 12 +- 13 files changed, 269 insertions(+), 110 deletions(-) delete mode 100644 express.ts rename process_status_changes.ts => mds_processor.ts (67%) create mode 100644 server.ts create mode 100644 tsconfig.json diff --git a/.gitignore b/.gitignore index 8fce603..35a65a2 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,3 @@ -data/ +/data/ +node_modules/ + diff --git a/LICENSE b/LICENSE index 7ad3dc4..7f38100 100644 --- a/LICENSE +++ b/LICENSE @@ -1,6 +1,6 @@ MIT License -Copyright (c) 2018 SharedStreets +Copyright (c) 2018-2019 SharedStreets Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/README.md b/README.md index 6c369ad..e81988c 100644 --- a/README.md +++ b/README.md @@ -34,9 +34,7 @@ This project provides a pluggable architecture for adding new MDS provider data 1. ```yarn install ``` -2. Run ```process_status_changes.ts``` as a cron job. - -3. Run ```express.ts``` server api end point. +2. Run ```server.ts``` to run data collector and API end point. ### Technical Details @@ -56,4 +54,4 @@ The SharedStreets tools provide an additional layer of protection by encrypting **Encrypting state data:** most recent cached event is encrypted ![MDS events to device states](docs/images/event_process4.png) **Decrypting state data:** cached events can only be decrypted using data contained in a future event from the same vehicle -![MDS events to device states](docs/images/event_process5.png) \ No newline at end of file +![MDS events to device states](docs/images/event_process5.png) diff --git a/express.ts b/express.ts deleted file mode 100644 index 8f76467..0000000 --- a/express.ts +++ /dev/null @@ -1,31 +0,0 @@ -import * as fs from "fs"; -const h3 = require("h3-js"); -const express = require('express') -const app = express() - -var port = '8082'; - -app.get('/metric/availability/:week/:period', async (req, res) => -{ - if(req.params.week && req.params.period) { - - var content = fs.readFileSync('data/metrics/availability/' + req.params.week); - var data = JSON.parse(content.toString()); - - var polygons = {type:"FeatureCollection", features:[]}; - - - if(data[req.params.period]) { - var h3bins = data[req.params.period]; - for(var h3index of Object.keys(h3bins)) { - var h3Coords = h3.h3ToGeoBoundary(h3index, true) - var h3Feature = {type:"Feature", properties:{count:h3bins[h3index].fractionalCount}, geometry:{type:"Polygon", coordinates:[h3Coords]}}; - polygons.features.push(h3Feature); - } - } - - res.send(polygons); - } -}); - -app.listen(port, () => console.log(`app listening on port ${port}!`)); diff --git a/process_status_changes.ts b/mds_processor.ts similarity index 67% rename from process_status_changes.ts rename to mds_processor.ts index e2399e0..f31f676 100644 --- a/process_status_changes.ts +++ b/mds_processor.ts @@ -11,15 +11,19 @@ const h3 = require("h3-js"); async function getStatusChanges() { - const mdsProvider = new UberMDSProvider(); + const mdsProvider = new BirdMDSProvider(); // last two hours for testing... - var endTime = Math.round(Date.now() / 1000); - var startTime = endTime - (60 * 60 * 2); + var now = Math.round(Date.now() / 1000); + var startTime = now - (60 * 60 * (72 * 20)); + var endTime = startTime + (60 * 60 * 10) + - var mdsQuery = new MDSStatusChangeQuery(mdsProvider, startTime, endTime); var statusEventMap = new DiskBackedMDSStatusMap(); + var mdsQuery = new MDSStatusChangeQuery(mdsProvider, startTime, endTime); + + await mdsQuery.run(); // loads data pages for time range query var h3AvailabilityAggregator = new H3AvailabilityStatusAggregator(); @@ -27,18 +31,21 @@ async function getStatusChanges() { // event generator loop var statusEvents = statusEventMap.processStatusEvents(mdsQuery); for await(var event of statusEvents) { - if(event.error) { - console.log("out of order..."); - - // TODO QA logging for MDS data - } - else { - var statusMetric = new StatusMetric(event) - h3AvailabilityAggregator.addData(statusMetric); - } + + if(event.error) { + console.log("out of order..."); + + // TODO QA logging for MDS data + } + else { + var statusMetric = new StatusMetric(event) + h3AvailabilityAggregator.addData(statusMetric); + } + + h3AvailabilityAggregator.save(); } - h3AvailabilityAggregator.save(); + } getStatusChanges(); \ No newline at end of file diff --git a/package.json b/package.json index 0a36a9e..169aa3f 100644 --- a/package.json +++ b/package.json @@ -4,6 +4,7 @@ "@turf/invariant": "^6.1.2", "@types/levelup": "^3.1.0", "@types/node": "^10.12.0", + "cors": "^2.8.5", "express": "^4.16.4", "h3-js": "^3.2.0", "leveldown": "^4.0.1", diff --git a/server.ts b/server.ts new file mode 100644 index 0000000..fe1d8f2 --- /dev/null +++ b/server.ts @@ -0,0 +1,38 @@ +import * as fs from "fs"; +import { H3AvailabilityStatusAggregator } from "./src/metrics/status"; + +const { fork } = require('child_process'); +const { join } = require('path'); + +var cors = require('cors') +const express = require('express'); +const app = express(); + +app.use(cors()) + +// boot the mds data processor +const childProcess = fork('mds_processor.ts'); + +var port = '8082'; + +app.get('/metric/:metric/:week/:period', async (req, res) => +{ + if(req.params.week && req.params.period) { + var weeks = [req.params.week]; + var period = req.params.period; + + if(req.params.metric === 'h3_availability') { + + var availabilityStatus = new H3AvailabilityStatusAggregator(); + try { + var geoJson = availabilityStatus.getGeoJson(weeks,period); + res.send(geoJson); + } + catch(e) { + console.log(e); + } + } + } +}); + +app.listen(port, () => console.log(`app listening on port ${port}!`)); diff --git a/src/data/periodicity.ts b/src/data/periodicity.ts index e65d42e..cf5ea46 100644 --- a/src/data/periodicity.ts +++ b/src/data/periodicity.ts @@ -25,10 +25,18 @@ export class Week { day:number; toString():string { - return this.year + '-' + this.month + '-' + this.day; + const zeroPad = (n) => { + if ( n < 10 ) { + return ( '0' + n.toString () ); + } + return n; + }; + return this.year + '-' + zeroPad(this.month) + '-' + zeroPad(this.day); } isEqual(week):boolean { + + if(this.year === week.year && this.month === week.month && this.day === week.day) return true; else @@ -140,7 +148,7 @@ export function getPeriodsForTimeRange(startTime:number, endTime:number):Periodi // calculate the factional range for data that begins and ends within the same period var newPeriod = new PeriodicTimestamp(); Object.assign(newPeriod, startPeriod); - newPeriod.fraction = endPeriod.fraction - (1 - startPeriod.fraction); + newPeriod.fraction = startPeriod.fraction - (1 - endPeriod.fraction); periods.push(newPeriod); } } diff --git a/src/data/sharedstreets.ts b/src/data/sharedstreets.ts index 0f8d70f..810b254 100644 --- a/src/data/sharedstreets.ts +++ b/src/data/sharedstreets.ts @@ -14,7 +14,19 @@ const SHST_API_SEARCH_RADIUS = 50; // meters export class SharedStreetsLocationRef { referenceId:string; + referenceLength:number location:number; + + getBin(targetBinSize:number):number { + if(targetBinSize > 0) { + var numberOfBins = Math.floor(this.referenceLength / targetBinSize); + var averageBinLength = this.referenceLength / numberOfBins; + var bin = Math.floor(this.location / averageBinLength) + 1; + return bin; + } + else + return 1; + } } async function pointToShStLocationRef(point:Feature):Promise { @@ -37,6 +49,7 @@ async function pointToShStLocationRef(point:Feature):Promise 0) { locationRef.referenceId = data.features[0].properties.referenceId; + locationRef.referenceLength = data.features[0].properties.referenceLength; locationRef.location = data.features[0].properties.location; return locationRef; diff --git a/src/metrics/generic_aggregator.ts b/src/metrics/generic_aggregator.ts index 7215ce3..12157f4 100644 --- a/src/metrics/generic_aggregator.ts +++ b/src/metrics/generic_aggregator.ts @@ -1,7 +1,9 @@ -import { Week } from "../data/periodicity"; +import { Week, PeriodicTimestamp } from "../data/periodicity"; import { SharedStreetsLocationRef } from "../data/sharedstreets"; - +import * as fs from "fs"; +import { join } from 'path'; +import { FeatureCollection } from "@turf/helpers/lib/geojson"; const DEFAULT_DATA_DIRECTORY = './data/metrics/'; @@ -24,23 +26,97 @@ export abstract class GenericMetric { } +export class Count extends GenericMetric { + count:number = 0; + increment() { + this.count = this.count + 1; + } +} + +export class FractionalCount extends GenericMetric { + count:number = 0; + fractionalCount:number = 0; + + add(fractionalValue) { + this.count = this.count + 1; + this.fractionalCount = this.fractionalCount + fractionalValue; + } +} + +export class Sum extends GenericMetric { + count:number = 0; + sum:number = 0; + add(value) { + this.count = this.count + 1; + this.sum = this.sum + value; + } + + avg():number { + if(this.count > 0) + return this.sum / this.count; + else + return 0; + } +} + + export abstract class GenericPeriodicMetric { abstract getPeriodicCounts(metricLabel:string):PeriodicValue[]; } -export abstract class GenericMetricAggregator { - abstract metricGroupName:string; +export abstract class GenericMetricAggregator { + + data = {}; constructor(directory=DEFAULT_DATA_DIRECTORY) { + fs.mkdirSync(this.getPath(), {recursive:true}); + + for(var week of fs.readdirSync(this.getPath())) { + var content = fs.readFileSync(join(this.getPath(), week)); + this.data[week] = JSON.parse(content.toString()); + } } + abstract getMetricName():string; + getPath():string { - return DEFAULT_DATA_DIRECTORY + this.metricGroupName; + return DEFAULT_DATA_DIRECTORY + this.getMetricName(); + } + + abstract defaultValue():V + + getBin(period:PeriodicTimestamp, binIndex:string ):V { + var week = period.week.toString(); + if(!this.data[week]){ + var periodMap = {}; + this.data[week] = periodMap; + } + + if(!this.data[week][period.period]){ + var dataMap = {}; + this.data[week][period.period] = dataMap; + } + + if(!this.data[week][period.period][binIndex]){ + this.data[week][period.period][binIndex] = this.defaultValue(); + } + + return this.data[week][period.period][binIndex]; } abstract addData(data:T); - abstract save(data:T); + + save() { + + for(var week of Object.keys(this.data)){ + var weekObject = this.data[week]; + var jsonContent = JSON.stringify(weekObject); + fs.writeFileSync(join(this.getPath(), week), jsonContent); + } + + } } + diff --git a/src/metrics/status.ts b/src/metrics/status.ts index d200cdd..c69bee9 100644 --- a/src/metrics/status.ts +++ b/src/metrics/status.ts @@ -1,8 +1,12 @@ import { EventType, MDSStatusChange } from "../data/mds"; import { StatusChangeEvent, StatusEventError, StatusEventErrorType } from "../data/status_changes"; -import { GenericPeriodicMetric } from "./generic_aggregator"; +import { GenericPeriodicMetric, FractionalCount, GenericMetricAggregator, GenericMetric } from "./generic_aggregator"; import * as fs from "fs"; +import { Geometries } from "@turf/helpers/lib/geojson"; +import { FeatureCollection } from "@turf/helpers"; + +const h3 = require("h3-js"); const DEFAULT_DATA_DIRECTORY = './data/metrics/' @@ -16,12 +20,14 @@ export enum StatusMetricType { OTHER = "other" } -export class StatusMetric { + +export class StatusMetric extends GenericMetric { statusEvent:StatusChangeEvent; statusType:StatusMetricType; constructor(statusEvent:StatusChangeEvent) { + super() this.statusEvent = statusEvent; @@ -48,73 +54,91 @@ export class StatusMetric { } } -export class FractionalCount { - count:number = 0; - fractionalCount:number = 0; - - add(fractionalValue) { - this.count = this.count + 1; - this.fractionalCount = this.fractionalCount + fractionalValue +export class H3AvailabilityStatusAggregator extends GenericMetricAggregator { + + constructor() { + super() } -} -export class H3AvailabilityStatusAggregator { + getMetricName():string { + return "h3_availability"; + } - metricName = "availability"; - data = {}; + defaultValue():FractionalCount { + return new FractionalCount(); + } + + addData(data:StatusMetric) { - constructor() { + if(data.statusType === StatusMetricType.AVAILABLE) { - fs.mkdirSync(this.getPath(), {recursive:true}); + var h3status = data.statusEvent.getH3StatusChange(); + var h3index = h3status.initialH3Status.getH3Index(); - for(var dataFile in fs.readdirSync(this.getPath())) { - var parts = dataFile.split('/'); - var week = parts[parts.length -1]; - var content = fs.readFileSync(dataFile); - this.data[week] = JSON.parse(content.toString()); + for(var period of data.statusEvent.getPeriods()) { + this.getBin(period, h3index).add(period.fraction) + } } } - getPath():string { - return DEFAULT_DATA_DIRECTORY + this.metricName + '/'; - } + getGeoJson(weeks:string[], filterPeriod:string):{} { + var h3sum:Map = new Map(); + for(var week of Object.keys(this.data)) { + for(var period of Object.keys(this.data[week])) { + if(filterPeriod && filterPeriod !== period) + continue; + for(var h3index of Object.keys(this.data[week][period])) { + + if(!h3sum.has(h3index)) { + h3sum.set(h3index, new FractionalCount()); + } + + var fractionalCount:FractionalCount = this.data[week][period][h3index]; + h3sum.get(h3index).count += 1; + h3sum.get(h3index).fractionalCount += fractionalCount.fractionalCount; + } + } + } + var featureCollection = {type:"FeatureCollection", features:[]} + for(var h3index of h3sum.keys()) { + + var averageFractionalCount = h3sum.get(h3index).fractionalCount / h3sum.get(h3index).count; + var h3Coords = h3.h3ToGeoBoundary(h3index, true) + var h3Feature = {type:"Feature", properties:{averageFractionalCount:averageFractionalCount}, geometry:{type:"Polygon", coordinates:[h3Coords]}}; + featureCollection.features.push(h3Feature); - save() { - for(var week of Object.keys(this.data)){ - var weekObject = this.data[week]; - var jsonContent = JSON.stringify(weekObject); - fs.writeFileSync(this.getPath() + week, jsonContent); } + + return featureCollection; + + } +} + +class ShStAvailabilityStatusAggregator extends GenericMetricAggregator { + + + getMetricName():string { + return "shst_availability"; } + + defaultValue():FractionalCount { + return new FractionalCount(); + } + addData(data:StatusMetric) { - if(data.statusType === StatusMetricType.AVAILABLE) { - var h3status = data.statusEvent.getH3StatusChange(); - var h3index = h3status.initialH3Status.getH3Index(); - for(var period of data.statusEvent.getPeriods()) { - var week = period.week.toString(); - if(!this.data[week]){ - var periodMap = {}; - this.data[week] = periodMap; - } + if(data.statusType === StatusMetricType.AVAILABLE) { - if(!this.data[week][period.period]){ - var h3Map = {}; - this.data[week][period.period] = h3Map; - } + var shStstatus = data.statusEvent.getShStStatusChange(); - if(!this.data[week][period.period][h3index]){ - var fracionalCount = new FractionalCount(); - this.data[week][period.period][h3index] = fracionalCount; - } + // TODO implement (optional?) binned aggreagion -- this is just aggregating at segment level + var shStReferenceId = shStstatus.initialShStStatus.shstLocationRef.referenceId; - this.data[week][period.period][h3index].add(period.fraction); + for(var period of data.statusEvent.getPeriods()) { + this.getBin(period, shStReferenceId).add(period.fraction) } } } -} - -class ShStAvailabilityStatusAggregator { } \ No newline at end of file diff --git a/tsconfig.json b/tsconfig.json new file mode 100644 index 0000000..da5a4a4 --- /dev/null +++ b/tsconfig.json @@ -0,0 +1,15 @@ +{ + "compilerOptions": { + "module": "commonjs", + "target": "es5", + "lib": ["es2015","esnext","esnext.asynciterable"], + "downlevelIteration": true, + "noImplicitAny": false, + "strict": false, + "outDir": "./dist", + "sourceMap": true + }, + "include": [ + "src/**/*" + ] +} diff --git a/yarn.lock b/yarn.lock index 70379a3..64e5c8b 100644 --- a/yarn.lock +++ b/yarn.lock @@ -194,6 +194,14 @@ core-util-is@~1.0.0: resolved "https://registry.yarnpkg.com/core-util-is/-/core-util-is-1.0.2.tgz#b5fd54220aa2bc5ab57aab7140c940754503c1a7" integrity sha1-tf1UIgqivFq1eqtxQMlAdUUDwac= +cors@^2.8.5: + version "2.8.5" + resolved "https://registry.yarnpkg.com/cors/-/cors-2.8.5.tgz#eac11da51592dd86b9f06f6e7ac293b3df875d29" + integrity sha512-KIHbLJqu73RGr/hnbrO9uBeixNGuvSQjul/jdFvS/KFSIH1hWVd1ng7zOHx+YrEfInLG7q4n6GHQ9cDtxv/P6g== + dependencies: + object-assign "^4" + vary "^1" + debug@2.6.9: version "2.6.9" resolved "https://registry.yarnpkg.com/debug/-/debug-2.6.9.tgz#5d128515df134ff327e90a4c93f4e077a536341f" @@ -589,7 +597,7 @@ number-is-nan@^1.0.0: resolved "https://registry.yarnpkg.com/number-is-nan/-/number-is-nan-1.0.1.tgz#097b602b53422a522c1afb8790318336941a011d" integrity sha1-CXtgK1NCKlIsGvuHkDGDNpQaAR0= -object-assign@^4.1.0: +object-assign@^4, object-assign@^4.1.0: version "4.1.1" resolved "https://registry.yarnpkg.com/object-assign/-/object-assign-4.1.1.tgz#2109adc7965887cfc05cbbd442cac8bfbb360863" integrity sha1-IQmtx5ZYh8/AXLvUQsrIv7s2CGM= @@ -967,7 +975,7 @@ uuid@^3.3.2: version "3.3.2" resolved "https://registry.yarnpkg.com/uuid/-/uuid-3.3.2.tgz#1b4af4955eb3077c501c23872fc6513811587131" -vary@~1.1.2: +vary@^1, vary@~1.1.2: version "1.1.2" resolved "https://registry.yarnpkg.com/vary/-/vary-1.1.2.tgz#2299f02c6ded30d4a5961b0b9f74524a18f634fc" integrity sha1-IpnwLG3tMNSllhsLn3RSShj2NPw=