Skip to content

Commit

Permalink
Start support for AWS IoT setup
Browse files Browse the repository at this point in the history
Initial support for an MQTT and message queue setup for device
connections as an alternative to websockets
  • Loading branch information
jjcarstens committed Nov 19, 2023
1 parent 6d99d21 commit 870613a
Show file tree
Hide file tree
Showing 9 changed files with 479 additions and 0 deletions.
35 changes: 35 additions & 0 deletions config/dev.exs
Original file line number Diff line number Diff line change
Expand Up @@ -93,3 +93,38 @@ else
end

config :sentry, environment_name: "developent"

broker_opts = [
name: NervesHub.AWSIoT.PintBroker,
rules: [{"nh/device_messages", &Broadway.test_message(:nerves_hub_iot_messages, &1.payload)}],
on_connect: fn client_id ->
payload = %{clientId: client_id, eventType: :connected}
Broadway.test_message(:nerves_hub_iot_messages, Jason.encode!(payload))
end,
on_disconnect: fn client_id ->
payload = %{
clientId: client_id,
eventType: :disconnected,
disconnectReason: "CONNECTION_LOST"
}

Broadway.test_message(:nerves_hub_iot_messages, Jason.encode!(payload))
end
]

config :nerves_hub, NervesHub.AWSIoT,
# Run a PintBroker for local process and/or device connections
local_broker: {PintBroker, broker_opts},
queues: [
[
name: :nerves_hub_iot_messages,
producer: [
module: {Broadway.DummyProducer, []}
# To test fetching from development queues registered with AWS, use the producer
# below. You may need to set AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY ¬
# module: {BroadwaySQS.Producer, queue_url: "nerves-hub-iot-messages"}
],
processors: [default: []],
batchers: [default: [batch_size: 10, batch_timeout: 2000]]
]
]
17 changes: 17 additions & 0 deletions config/release.exs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,23 @@ if nerves_hub_app in ["all", "device"] do
certfile: "/etc/ssl/#{host}.pem",
cacertfile: "/etc/ssl/ca.pem"
]

if System.get_env("AWS_IOT_ENABLED") in ["1", "true", "t"] do
config :nerves_hub, NervesHub.AWSIoT,
queues: [
[
# AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY must be set
name: :nerves_hub_iot_messages,
producer: [
module:
{BroadwaySQS.Producer,
queue_url: System.get_env("AWS_IOT_SQS_QUEUE", "nerves-hub-iot-messages")}
],
processors: [default: []],
batchers: [default: [batch_size: 10, batch_timeout: 2000]]
]
]
end
end

config :sentry,
Expand Down
33 changes: 33 additions & 0 deletions config/test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -82,3 +82,36 @@ config :opentelemetry, :processors,
config :sentry,
environment_name: :test,
included_environments: []

## AWS IoT
broker_opts = [
name: NervesHub.AWSIoT.PintBroker,
rules: [{"nh/device_messages", &Broadway.test_message(:nerves_hub_iot_messages, &1.payload)}],
on_connect: fn client_id ->
payload = %{clientId: client_id, eventType: :connected}
Broadway.test_message(:nerves_hub_iot_messages, Jason.encode!(payload))
end,
on_disconnect: fn client_id ->
payload = %{
clientId: client_id,
eventType: :disconnected,
disconnectReason: "CONNECTION_LOST"
}

Broadway.test_message(:nerves_hub_iot_messages, Jason.encode!(payload))
end
]

config :nerves_hub, NervesHub.AWSIoT,
# Use PintBroker for local device connections in tests
local_broker: {PintBroker, broker_opts},
queues: [
[
name: :nerves_hub_iot_messages,
producer: [
module: {Broadway.DummyProducer, []}
],
processors: [default: []],
batchers: [default: [batch_size: 10, batch_timeout: 2000]]
]
]
3 changes: 3 additions & 0 deletions lib/nerves_hub/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ defmodule NervesHub.Application do

defp endpoints(:test) do
[
NervesHub.AWSIoT,
NervesHub.Devices.Supervisor,
NervesHubWeb.DeviceEndpoint,
NervesHubWeb.Endpoint
Expand All @@ -65,6 +66,7 @@ defmodule NervesHub.Application do
case Application.get_env(:nerves_hub, :app) do
"all" ->
[
NervesHub.AWSIoT,
NervesHub.Deployments.Supervisor,
NervesHub.Devices.Supervisor,
NervesHubWeb.DeviceEndpoint,
Expand All @@ -74,6 +76,7 @@ defmodule NervesHub.Application do

"device" ->
[
NervesHub.AWSIoT,
NervesHub.Deployments.Supervisor,
NervesHub.Devices.Supervisor,
NervesHubWeb.DeviceEndpoint,
Expand Down
164 changes: 164 additions & 0 deletions lib/nerves_hub/aws_iot.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
defmodule NervesHub.AWSIoT do
@moduledoc """
Support for common AWS IOT infrastructure including MQTT and SQS
Requires `:queues` to be defined in the application config or
the supervisor is simply ignored
See docs.nerves-hub.org for a general overview of the architecture
"""
use Supervisor

@type opt :: {:queues, [keyword()]}
@spec start_link([opt]) :: Supervisor.on_start()
def start_link(opts) do
Supervisor.start_link(__MODULE__, opts, name: __MODULE__)
end

@impl Supervisor
def init(opts) do
opts =
Application.get_env(:nerves_hub, __MODULE__, [])
|> Keyword.merge(opts)

case opts[:queues] do
queues when is_list(queues) and length(queues) > 0 ->
children =
Enum.map(queues, &{__MODULE__.SQS, &1})
|> maybe_add_local_broker(opts)

Supervisor.init(children, strategy: :one_for_one)

_ ->
:ignore
end
end

defp maybe_add_local_broker(children, opts) do
if broker_spec = opts[:local_broker] do
[broker_spec | children]
else
children
end
end

if Application.compile_env(:nerves_hub, [__MODULE__, :local_broker], false) do
def publish(serial, event, payload) do
data = Jason.encode!(%{event: event, payload: payload})
PintBroker.publish(__MODULE__.PintBroker, "nh/#{serial}", data)
end
else
def publish(serial, event, payload) do
# TODO: Topic and data may change soon
# Stubbing out initial idea here for now
data = %{event: event, payload: payload}
topic = "/topics/nh/#{serial}"

ExAws.Operation.JSON.new(:iot_data, %{path: topic, data: data})
|> ExAws.request()
end
end

defmodule SQS do
@moduledoc """
Consumer for AWS SQS messages
This is the ingestion point of devices coming from the MQTT
broker. A message from a device must include the `"identifier"`
key either in the payload or pulled from the topic via the
AWS IoT rule that forwards to the queue.
The system must also be setup with a rule to forward [AWS Lifecycle
events](https://docs.aws.amazon.com/iot/latest/developerguide/life-cycle-events.html)
to a queue for tracking device online/offline presence
Right now, all configured queues are handled by this module.
In the future, we may want to separate handling for each
queue in it's own module.
"""
use Broadway

alias Broadway.Message
alias NervesHub.Devices
alias NervesHub.Devices.DeviceLink

require Logger

def start_link(opts), do: Broadway.start_link(__MODULE__, opts)

@impl Broadway
def handle_message(_processor, %{data: raw} = msg, _context) do
case Jason.decode(raw) do
{:ok, data} ->
Message.put_data(msg, data)
|> process_message()

_ ->
Message.failed(msg, :malformed)
end
end

@impl Broadway
def handle_batch(_batcher, messages, batch_info, _context) do
Logger.debug("[SQS] Handled #{inspect(batch_info.size)}")
messages
end

defp process_message(%{data: %{"eventType" => "connected"} = data} = msg) do
# TODO: Maybe use more info from the connection?
# Example payload of AWS lifecycle connected event
# principalIdentifier is a SHA256 fingerprint of the certificate that
# is Base16 encoded
# {
# "clientId": "186b5",
# "timestamp": 1573002230757,
# "eventType": "connected",
# "sessionIdentifier": "a4666d2a7d844ae4ac5d7b38c9cb7967",
# "principalIdentifier": "12345678901234567890123456789012",
# "ipAddress": "192.0.2.0",
# "versionNumber": 0
# }

with {:ok, device} <- Devices.get_by_identifier(data["clientId"]),
push_cb = &NervesHub.AWSIoT.publish(device.identifier, &1, &2),
# TODO: Adjust DeviceLink for this since we won't have
# metadata at this point
{:ok, _device} <- DeviceLink.connect(device, push_cb, %{}) do
msg
else
_ ->
Message.failed(msg, :unknown_device)
end
end

defp process_message(%{data: %{"eventType" => "disconnected"} = data} = msg) do
# TODO: Maybe use more of the disconnect data?
# Example payload of AWS lifecyle disconnect event
# {
# "clientId": "186b5",
# "timestamp": 1573002340451,
# "eventType": "disconnected",
# "sessionIdentifier": "a4666d2a7d844ae4ac5d7b38c9cb7967",
# "principalIdentifier": "12345678901234567890123456789012",
# "clientInitiatedDisconnect": true,
# "disconnectReason": "CLIENT_INITIATED_DISCONNECT",
# "versionNumber": 0
# }
with {:ok, device} <- Devices.get_by_identifier(data["clientId"]) do
# TODO: Update DeviceLink.disconect with reason and track?
Logger.debug(
"[AWS IoT] device #{data["clientId"]} disconnected: #{data["disconnectReason"]}"
)

DeviceLink.disconnect(device)
end

msg
end

defp process_message(msg) do
# TODO: Track unhandled msg
msg
end
end
end
2 changes: 2 additions & 0 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ defmodule NervesHub.MixProject do
{:base62, "~> 1.2"},
{:bcrypt_elixir, "~> 3.0"},
{:circular_buffer, "~> 0.4.1"},
{:broadway_sqs, "~> 0.7"},
{:comeonin, "~> 5.3"},
{:cowboy, "~> 2.0", override: true},
{:crontab, "~> 1.1"},
Expand Down Expand Up @@ -92,6 +93,7 @@ defmodule NervesHub.MixProject do
{:phoenix_pubsub, "~> 2.0"},
{:phoenix_swoosh, "~> 1.0"},
{:phoenix_view, "~> 2.0"},
{:pint_broker, "~> 1.0", only: [:dev, :test]},
{:plug, "~> 1.7"},
{:plug_cowboy, "~> 2.1"},
{:postgrex, "~> 0.14"},
Expand Down
Loading

0 comments on commit 870613a

Please sign in to comment.