Skip to content

Commit

Permalink
Abstract the update code from the device_channel.
Browse files Browse the repository at this point in the history
  • Loading branch information
mobileoverlord authored and jjcarstens committed May 26, 2020
1 parent 18eae90 commit b4067a5
Show file tree
Hide file tree
Showing 7 changed files with 323 additions and 246 deletions.
22 changes: 4 additions & 18 deletions lib/nerves_hub_link.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,7 @@ defmodule NervesHubLink do
Checks if the device is connected to the NervesHub channel.
"""
@spec connected? :: boolean()
def connected?() do
device_channel_state()
|> Map.get(:connected?, false)
end
defdelegate connected?(), to: NervesHubLink.DeviceChannel

@doc """
Checks if the device has a socket connection with NervesHub
Expand All @@ -17,19 +14,8 @@ defmodule NervesHubLink do
as: :connected?

@doc """
Current status of the device channel
Current status of the update manager
"""
@spec status :: NervesHubLink.DeviceChannel.State.status()
def status() do
device_channel_state()
|> Map.get(:status, :unknown)
end

defp device_channel_state() do
GenServer.whereis(NervesHubLink.DeviceChannel)
|> case do
channel when is_pid(channel) -> GenServer.call(channel, :get_state)
_ -> %{}
end
end
@spec status :: NervesHubLink.UpdateManager.State.status()
defdelegate status(), to: NervesHubLink.UpdateManager
end
12 changes: 10 additions & 2 deletions lib/nerves_hub_link/application.ex
Original file line number Diff line number Diff line change
@@ -1,7 +1,14 @@
defmodule NervesHubLink.Application do
use Application

alias NervesHubLink.{DeviceChannel, Configurator, Connection, ConsoleChannel, Socket}
alias NervesHubLink.{
DeviceChannel,
Configurator,
Connection,
ConsoleChannel,
Socket,
UpdateManager
}

def start(_type, _args) do
config = Configurator.build()
Expand All @@ -10,7 +17,8 @@ defmodule NervesHubLink.Application do
[
Connection,
{PhoenixClient.Socket, {config.socket, [name: Socket]}},
{DeviceChannel, [socket: Socket, params: config.params]}
{DeviceChannel, [socket: Socket, params: config.params]},
UpdateManager
]
|> add_console_child(config)

Expand Down
4 changes: 3 additions & 1 deletion lib/nerves_hub_link/client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,9 @@ defmodule NervesHubLink.Client do
"""
@spec update_available(update_data()) :: update_response()
def update_available(data) do
case apply_wrap(mod(), :update_available, [data]) do
ret = apply_wrap(mod(), :update_available, [data])

case ret do
:apply ->
:apply

Expand Down
144 changes: 34 additions & 110 deletions lib/nerves_hub_link/device_channel.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,16 @@ defmodule NervesHubLink.DeviceChannel do
use GenServer
require Logger

alias NervesHubLink.{Client, HTTPFwupStream}
alias NervesHubLink.{Client, UpdateManager}
alias PhoenixClient.{Channel, Message}

@rejoin_after Application.get_env(:nerves_hub_link, :rejoin_after, 5_000)

defmodule State do
@type status ::
:idle
| :fwup_error
| :update_failed
| :update_rescheduled
| {:updating, integer()}
| :unknown

@type t :: %__MODULE__{
channel: pid(),
connected?: boolean(),
params: map(),
status: status(),
socket: pid(),
topic: String.t()
}
Expand All @@ -29,20 +20,43 @@ defmodule NervesHubLink.DeviceChannel do
topic: "device",
channel: nil,
params: %{},
status: :idle,
connected?: false
end

def start_link(opts) do
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
end

def send_update_progress(progress) do
GenServer.cast(__MODULE__, {:send_update_progress, progress})
end

def send_update_status(status) do
GenServer.cast(__MODULE__, {:send_update_status, status})
end

def connected?() do
GenServer.call(__MODULE__, :connected?)
end

def init(opts) do
send(self(), :join)
{:ok, struct(State, opts)}
end

def handle_call(:get_state, _from, state), do: {:reply, state, state}
def handle_call(:connected?, _from, %{connected?: connected?} = state) do
{:reply, connected?, state}
end

def handle_cast({:send_update_progress, progress}, state) do
Channel.push_async(state.channel, "fwup_progress", %{value: progress})
{:noreply, state}
end

def handle_cast({:send_update_status, status}, state) do
Channel.push_async(state.channel, "status_update", %{status: status})
{:noreply, state}
end

def handle_info(%Message{event: "reboot"}, state) do
Logger.warn("Reboot Request from NervesHubLink")
Expand All @@ -52,8 +66,9 @@ defmodule NervesHubLink.DeviceChannel do
{:noreply, state}
end

def handle_info(%Message{event: "update", payload: params}, state) do
{:noreply, maybe_update_firmware(params, state)}
def handle_info(%Message{event: "update", payload: update}, state) do
UpdateManager.apply_update(update)
{:noreply, state}
end

def handle_info(%Message{event: event, payload: payload}, state)
Expand All @@ -69,8 +84,8 @@ defmodule NervesHubLink.DeviceChannel do
case Channel.join(socket, topic, params) do
{:ok, reply, channel} ->
NervesHubLink.Connection.connected()
state = %{state | channel: channel, connected?: true}
{:noreply, maybe_update_firmware(reply, state)}
_ = handle_join_reply(reply)
{:noreply, %{state | channel: channel, connected?: true}}

_error ->
NervesHubLink.Connection.disconnected()
Expand All @@ -79,106 +94,15 @@ defmodule NervesHubLink.DeviceChannel do
end
end

def handle_info({:fwup, {:ok, _status, _info} = message}, state) do
Logger.info("[NervesHubLink] FWUP Finished")
_ = Client.handle_fwup_message(message)
Nerves.Runtime.reboot()
{:noreply, state}
end

def handle_info({:fwup, message}, state) do
state =
case message do
{:progress, percent} ->
Channel.push_async(state.channel, "fwup_progress", %{value: percent})
%{state | status: {:updating, percent}}

{:error, _, _message} ->
Channel.push_async(state.channel, "status_update", %{status: "fwup error"})
%{state | status: :fwup_error}

_ ->
state
end

_ = Client.handle_fwup_message(message)
{:noreply, state}
end

def handle_info({:http_error, error}, state) do
_ = Client.handle_error(error)
Channel.push_async(state.channel, "status_update", %{status: "update failed"})
{:noreply, %{state | status: :update_failed}}
end

def handle_info({:update_reschedule, response}, state) do
{:noreply, maybe_update_firmware(response, state)}
end

def handle_info({:DOWN, _, :process, _, :normal}, state) do
{:noreply, state}
end

def handle_info({:DOWN, _, :process, _, reason}, state) do
Logger.error("HTTP Streaming Error: #{inspect(reason)}")
_ = Client.handle_error(reason)
Channel.push_async(state.channel, "status_update", %{status: "update failed"})
{:noreply, %{state | status: :update_failed}}
end

def handle_info(_message, state) do
{:noreply, state}
end

def terminate(_reason, _state), do: NervesHubLink.Connection.disconnected()

defp maybe_update_firmware(_data, %{status: {:updating, _percent}} = state) do
# Received an update message from NervesHub, but we're already in progress.
# It could be because the deployment/device was edited making a duplicate
# update message or a new deployment was created. Either way, lets not
# interrupt FWUP and let the task finish. After update and reboot, the
# device will check-in and get an update message if it was actually new and
# required
state
end

defp maybe_update_firmware(%{"firmware_url" => url} = data, state) do
# Cancel an existing timer if it exists.
# This prevents rescheduled updates`
# from compounding.
state = maybe_cancel_timer(state, :update_reschedule_timer)

# possibly offload update decision to an external module.
# This will allow application developers
# to control exactly when an update is applied.
case Client.update_available(data) do
:apply ->
{:ok, http} = HTTPFwupStream.start(self())
spawn_monitor(HTTPFwupStream, :get, [http, url])
Logger.info("[NervesHubLink] Downloading firmware: #{url}")
%{state | status: {:updating, 0}}

:ignore ->
state

{:reschedule, ms} ->
timer = Process.send_after(self(), {:update_reschedule, data}, ms)
Logger.info("[NervesHubLink] rescheduling firmware update in #{ms} milliseconds")
state = Map.put(state, :update_reschedule_timer, timer)

%{state | status: :update_rescheduled}
end
defp handle_join_reply(%{"firmware_url" => _url} = update) do
UpdateManager.apply_update(update)
end

defp maybe_update_firmware(_, state), do: state

defp maybe_cancel_timer(state, key) do
timer = Map.get(state, key)

if timer && Process.read_timer(timer) do
Process.cancel_timer(timer)
end

Map.delete(state, key)
end
defp handle_join_reply(_), do: :noop
end
Loading

0 comments on commit b4067a5

Please sign in to comment.