Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file.
Empty file.
41 changes: 41 additions & 0 deletions dataflows-cloud/helsinki-transit/connector/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# Connector for Helsinki Transit Dataflow

Use the mqtt-source connector to read the Helsinki Transit live data feed.

### Prerequisites

* Check out the connector configuration file [mqtt-helsinki.yaml](mqtt-helsinki.yaml) to get context on what we are doing.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • Login into your InfinyOn Cloud account (add link)



* Load the Jaq SmartModule to your cluster:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Jolt -> Jaq


```bash
fluvio hub smartmodule download infinyon/[email protected]
```

### Start the mqtt connector

Check out the connector configuration file [mqtt-helsinki.yaml](mqtt-helsinki.yaml) for context.

Start the cloud connector:

```bash
fluvio cloud connector create --config mqtt-helsinki.yaml
```

This connector streams live vehicle position events from the Helsinki transit MQTT feed into the `helsinki` topic. Use fluvio to see the events as they arrive:

```bash
fluvio consume helsinki
```

Use `<Ctrl-C>` to exit.


### Clean-up

Delete connector:

```bash
fluvio cloud connector delete helsinki-mqtt
```
24 changes: 24 additions & 0 deletions dataflows-cloud/helsinki-transit/connector/mqtt-helsiniki.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
apiVersion: 0.1.0
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo: the file name mqtt-helsiniki.yaml should be mqtt-helsinki.yaml

# Connector identity: `name` is what `fluvio cloud connector delete` refers to,
# `topic` is the Fluvio topic the connector writes to (read with `fluvio consume helsinki`).
meta:
version: 0.2.9
name: helsinki-mqtt
type: mqtt-source
topic: helsinki
# MQTT side: broker URL and the topic filter to subscribe to.
# The trailing `#` in the filter is the MQTT multi-level wildcard.
mqtt:
url: "mqtt://mqtt.hsl.fi"
topic: "/hfp/v2/journey/ongoing/vp/bus/#"
# Payloads are emitted as JSON.
payload_output_type: json


Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

extra space (remove)

# Reshapes every record before it reaches the topic: the filter below builds a
# flat object from fields found under `.payload.VP`.
# NOTE(review): the `uses` value looks mangled by e-mail obfuscation in this view;
# presumably it should read <group>/<name>@<version> — confirm against the repository.
transforms:
- uses: infinyon/[email protected]
with:
filter: |
{
vehicle: .payload.VP.veh,
tst: .payload.VP.tst,
speed: .payload.VP.spd,
lat: .payload.VP.lat,
long: .payload.VP.long,
route: .payload.VP.route
}
197 changes: 197 additions & 0 deletions dataflows-cloud/helsinki-transit/dataflow.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
# Dataflow definition for the Helsinki transit statistics pipeline.
apiVersion: 0.5.0
meta:
name: helsinki-stats
version: 0.1.0
namespace: my-org

# Dataflow-wide settings: records are JSON-encoded.
# NOTE(review): `position: End` with `value: 0` presumably makes consumers start
# at the end of each topic (new records only) — confirm against the dataflow docs.
config:
converter: json
consumer:
default_starting_offset:
value: 0
position: End

# Record types used by the topics and services of this dataflow.
types:
# Raw record as produced by the connector; speed, lat and long may be absent.
event:
type: object
properties:
vehicle:
type: i32
speed:
type: f64
optional: true
lat:
type: f64
optional: true
long:
type: f64
optional: true
route:
type: string
# Timestamp as text; `clean_events` parses it into `vehicle-position.ts`.
tst:
type: string

# Cleaned record: same fields as `event`, but none are optional and the
# timestamp is numeric.
vehicle-position:
type: object
properties:
vehicle:
type: i32
speed:
type: f64
lat:
type: f64
long:
type: f64
route:
type: string
# Filled from `timestamp_millis()` in `clean_events`.
ts:
type: i64

# One per-vehicle entry of the windowed output.
average-speed:
type: object
properties:
vehicle:
type: i32
route:
type: string
speed:
type: f64

# Value type of the `average-speed` topic: a list of `average-speed` entries.
average-speed-list:
type: list
items:
type: average-speed


# Topics referenced by the services; each declares the type of its record value.
topics:
# Input topic; `name` maps this id to the `helsinki` topic.
events:
name: helsinki
schema:
value:
type: event

# Written by `clean-events`, read by `generate-vehicle-stats`.
vehicle-position:
schema:
value:
type: vehicle-position

# Output of `generate-vehicle-stats`.
average-speed:
schema:
value:
type: average-speed-list

services:
clean-events:
sources:
- type: topic
id: events

transforms:
- operator: filter
run: |
/// Keeps an event only when `lat`, `long` and `speed` are all present.
fn remove_incomplete_events(event: Event) -> Result<bool> {
    let has_position = event.lat.is_some() && event.long.is_some();
    Ok(has_position && event.speed.is_some())
}

- operator: map
dependencies:
- name: chrono
version: "0.4.38"
run: |
/// Converts a raw `Event` into a `VehiclePosition`, turning the textual `tst`
/// timestamp into epoch milliseconds (the parsed value is treated as UTC).
///
/// Returns an error when `tst` does not match the
/// `YYYY-MM-DDTHH:MM:SS[.fraction]Z` layout.
fn clean_events(event: Event) -> Result<VehiclePosition> {
    use chrono::naive::NaiveDateTime;

    // `%.f` consumes the dot and reads the digits as a decimal fraction of a
    // second. A bare `%f` reads the digits as a nanosecond count, so ".246"
    // would become 246 ns and the milliseconds would vanish in
    // `timestamp_millis()`.
    let no_timezone = NaiveDateTime::parse_from_str(&event.tst, "%Y-%m-%dT%H:%M:%S%.fZ")?;
    let ts = no_timezone.and_utc().timestamp_millis();

    // Absent optional readings fall back to 0.0.
    Ok(VehiclePosition {
        vehicle: event.vehicle,
        route: event.route,
        speed: event.speed.unwrap_or(0.0),
        lat: event.lat.unwrap_or(0.0),
        long: event.long.unwrap_or(0.0),
        ts,
    })
}
sinks:
- type: topic
id: vehicle-position

generate-vehicle-stats:
sources:
- type: topic
id: vehicle-position

# State kept by this service: one entry per i32 key (the vehicle id returned by
# `assign_key`), holding that vehicle's route and speed.
states:
vehicle-stat:
type: keyed-state
properties:
key:
type: i32
value:
type: arrow-row
properties:
route:
type: string
speed:
type: f64

# Records are grouped into tumbling windows of 5 seconds.
window:
tumbling:
duration: 5s

assign-timestamp:
run: |
/// Uses the record's own `ts` field as its timestamp; the incoming
/// `_event_time` argument is ignored.
fn assign_timestamp(vp: VehiclePosition, _event_time: i64) -> Result<i64> {
    let record_ts = vp.ts;
    Ok(record_ts)
}

partition:
assign-key:
run: |
/// Partitions records by vehicle: the key is the record's `vehicle` id.
fn assign_key(vp: VehiclePosition) -> Result<i32> {
    let vehicle_id = vp.vehicle;
    Ok(vehicle_id)
}

update-state:
run: |
/// Folds one vehicle position into the `vehicle_stat` state: stores the route
/// and replaces the stored speed with the midpoint of the stored and new speed.
///
/// NOTE(review): the midpoint blend weights recent samples more heavily and is
/// not an arithmetic mean over the window — confirm that this is intended.
fn compute_average_speed(vp: VehiclePosition) -> Result<()> {
    let mut stat = vehicle_stat();
    stat.route = vp.route.clone();

    let blended = (stat.speed + vp.speed) / 2.0;
    stat.speed = blended;

    stat.update()?;
    println!("update speed for vehicle {}", vp.vehicle);
    Ok(())
}

flush:
run: |
/// Reads every row of the `vehicle_stat` state and returns them as a list of
/// `AverageSpeed` entries.
///
/// Fails if the query, a column lookup, a row read, or parsing the key into
/// the vehicle id fails.
fn collect_vehicle_stats() -> Result<AverageSpeedList> {
    let stats = sql("select * from vehicle_stat")?;
    let key_col = stats.key()?;
    let speed_col = stats.col("speed")?;
    let route_col = stats.col("route")?;

    let mut averages = Vec::new();

    let cursor = stats.rows()?;
    while cursor.next() {
        // The key is read as text and parsed into the vehicle id below.
        let vehicle_key = cursor.str(&key_col)?;
        let route = cursor.str(&route_col)?;
        let speed = cursor.f64(&speed_col)?;

        let entry = AverageSpeed {
            vehicle: vehicle_key.parse()?,
            route,
            speed,
        };
        averages.push(entry);
    }

    Ok(averages)
}

sinks:
- type: topic
id: average-speed