Skip to content

Commit

Permalink
Refactor how messages are passed to Lua automations; adds some UI con…
Browse files Browse the repository at this point in the history
…trols as a WiP (#19)
  • Loading branch information
ddellacosta authored Jan 27, 2024
1 parent 07f2517 commit 8ead5ab
Show file tree
Hide file tree
Showing 2,094 changed files with 14,700 additions and 349 deletions.
2 changes: 1 addition & 1 deletion automation-service.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ common shared
, network-uri ^>= 2.6.4.1
, pretty-simple
, random ^>= 1.2.1.1
, retry ^>= 0.9.3.0
, retry ^>= 0.9.3.1
, safe ^>= 0.3.19
, scotty ^>= 0.12.1
, sqlite-simple ^>= 0.4.18.2
Expand Down
1 change: 1 addition & 0 deletions flake.nix
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
pkgs.purescript
pkgs.nodejs_18
pkgs.esbuild
pkgs.sass
zlib
]);
in
Expand Down
1 change: 0 additions & 1 deletion src/Service/Automation.hs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE TemplateHaskell #-}

module Service.Automation
Expand Down
29 changes: 21 additions & 8 deletions src/Service/AutomationName.hs
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,17 @@ module Service.AutomationName
)
where

import Control.Monad (guard)
import Control.Applicative ((<|>))
import Data.Aeson (FromJSON (..), ToJSON (..))
import qualified Data.Char as Char
import Data.Char (isDigit, isLower, isSpace)
import Data.Hashable (Hashable (..))
import Data.List (uncons)
import Data.Text (Text)
import qualified Data.Text as T
import GHC.Generics (Generic)
import Numeric.Natural (Natural)
import Safe (headMay)
import qualified Text.ParserCombinators.ReadP as RP
import Text.Read (readMaybe)

newtype Port = Port Natural
deriving (Generic, Eq, Ord, Show, Enum, Integral, Num, Real, Hashable, FromJSON)
Expand Down Expand Up @@ -46,11 +48,22 @@ parseAutomationName :: String -> Maybe AutomationName
parseAutomationName = \case
"Gold" -> Just Gold
"Null" -> Just Null
maybeLuaScript -> do
let filepath = filter (/= '"') maybeLuaScript
(firstChar, _remainder) <- uncons filepath
guard (Char.isLower firstChar) *>
(pure . LuaScript $ filepath)
httpOrLuaScript ->
headMay (RP.readP_to_S (parseHTTP <|> parseLuaScript) httpOrLuaScript) >>= fst

parseAutomationNameText :: Text -> Maybe AutomationName
parseAutomationNameText = parseAutomationName . T.unpack

parseHTTP :: RP.ReadP (Maybe AutomationName)
parseHTTP = do
_http <- RP.string "HTTP"
_space <- RP.string " "
port <- RP.munch1 isDigit
pure $ HTTP . Port <$> (readMaybe port :: Maybe Natural)

parseLuaScript :: RP.ReadP (Maybe AutomationName)
parseLuaScript = do
firstIsLower <- RP.satisfy isLower
scriptName <-
(firstIsLower:) <$> RP.manyTill RP.get (RP.satisfy isSpace *> pure () <|> RP.eof)
pure $ Just (LuaScript scriptName)
38 changes: 28 additions & 10 deletions src/Service/Automations/HTTP.hs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import Control.Lens (view)
import Control.Monad (forever)
import Control.Monad.IO.Unlift (MonadUnliftIO (..), liftIO)
import Control.Monad.Reader (MonadReader)
import Data.Aeson (decode)
import Data.Aeson (decode, encode)
import Data.ByteString.Lazy (ByteString)
import Data.Foldable (for_)
import Data.Text (Text)
Expand All @@ -21,12 +21,13 @@ import qualified Network.WebSockets as WS
import Service.App (Logger (..), debug)
import qualified Service.App.Logger as Logger
import qualified Service.Automation as Automation
import Service.Automation (Automation (..))
import Service.Automation (Automation (..), ClientMsg (..), Message (..))
import qualified Service.AutomationName as AutomationName
import Service.AutomationName (Port)
import Service.Env (Env, LogLevel (..), daemonBroadcast, devicesRawJSON, logger)
import qualified Service.MQTT.Messages.Daemon as Daemon
import UnliftIO.STM (TChan, TVar, atomically, readTChan, readTVarIO, writeTChan)
import UnliftIO.Async (concurrently_)
import UnliftIO.STM (TChan, TVar, atomically, dupTChan, readTChan, readTVarIO, writeTChan)
import Web.Scotty (file, get, middleware, raw, scottyApp, setHeader)

httpAutomation
Expand Down Expand Up @@ -93,13 +94,30 @@ mkRunAutomation port broadcastChan = do
logDebugMsg logger' "(TODO) Sending Group data to client"
-- WS.sendTextData conn =<< readTVarIO groups

-- set up process loop. For now just waits to receive msg
WS.withPingThread conn 30 (pure ()) $ forever $ do
received <- WS.receiveData conn
logDebugMsg logger' $ "Received " <> (T.pack $ show received)
for_ (decode received) $ atomically . writeTChan daemonBC

send _conn _logger' = atomically . readTChan $ broadcastChan
broadcastChanCopy <- atomically . dupTChan $ broadcastChan

WS.withPingThread conn 30 (pure ()) $ concurrently_
-- Wait for Daemon.Message values coming back from the
-- WebSocket connection.
(forever $ do
received <- WS.receiveData conn
logDebugMsg logger' $ "Received via WebSocket: " <> (T.pack $ show received)
for_ (decode received) $ atomically . writeTChan daemonBC
)
-- Coming from the other direction, wait for messages from the
-- broadcastChan that need to get passed to the WebSocket
-- connection. At the present time this means only messages
-- published to MQTT topics subscribed to by the client on the
-- other end of the WebSocket connection.
(forever $ do
msg <- atomically . readTChan $ broadcastChanCopy
logDebugMsg logger' $ "Received on broadcastChan: " <> (T.pack $ show msg)
case msg of
Client (AutomationName.HTTP msgPort) (ValueMsg v)
| msgPort == port -> WS.sendTextData conn $ encode v
| otherwise -> pure ()
_ -> pure ()
)

logDebugMsg :: (Logger logger) => logger -> Text -> IO ()
logDebugMsg logger' msg =
Expand Down
53 changes: 28 additions & 25 deletions src/Service/Automations/LuaScript.hs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import Network.MQTT.Topic (mkTopic, unTopic)
import qualified Service.App as App
import Service.App (Logger (..), debug)
import qualified Service.Automation as Automation
import Service.Automation (Automation (..))
import Service.Automation (Automation (..), ClientMsg (..), Message (..))
import qualified Service.AutomationName as AutomationName
import Service.Device (Device, DeviceId, toLuaDevice, topicSet)
import Service.Env (Env, LogLevel (Debug), config, daemonBroadcast, devices, groups, logger,
Expand All @@ -48,8 +48,7 @@ import qualified Service.TimeHelpers as TH
import System.Random (initStdGen, uniformR)
import UnliftIO.Concurrent (threadDelay)
import UnliftIO.Exception (handle, throwIO)
import UnliftIO.STM (TChan, TVar, atomically, dupTChan, newBroadcastTChan, readTChan, readTVar,
readTVarIO, writeTChan)
import UnliftIO.STM (TChan, TVar, atomically, readTChan, readTVar, readTVarIO, writeTChan)

luaAutomation
:: (Logger l, MQTTClient mc, MonadReader (Env l mc) m, MonadUnliftIO m)
Expand All @@ -68,21 +67,22 @@ mkCleanupAutomation
:: (Logger l, MQTTClient mc, MonadReader (Env l mc) m, MonadUnliftIO m)
=> FilePath
-> (TChan Automation.Message -> m ())
mkCleanupAutomation filepath = \_broadcastChan -> do
mkCleanupAutomation filepath = \broadcastChan -> do
debug $ "Starting Cleanup: LuaScript " <> T.pack filepath

logger' <- view logger
mqttClient' <- view mqttClient
daemonBroadcast' <- view daemonBroadcast
devices' <- view devices
groups' <- view groups
(logger', mqttClient', daemonBroadcast', devices', groups') <- (,,,,)
<$> view logger
<*> view mqttClient
<*> view daemonBroadcast
<*> view devices
<*> view groups

luaScriptPath' <- view $ config . luaScriptPath
luaState <- liftIO Lua.newstate

luaStatusString <- liftIO . Lua.unsafeRunWith luaState $ do
Lua.openlibs -- load the default Lua packages
loadAPI filepath logger' mqttClient' daemonBroadcast' devices' groups'
loadAPI filepath logger' mqttClient' daemonBroadcast' broadcastChan devices' groups'
loadScript luaScriptPath' filepath *> Lua.callTrace 0 0
callWhenExists "cleanup"

Expand All @@ -109,7 +109,7 @@ mkRunAutomation
:: (Logger l, MQTTClient mc, MonadReader (Env l mc) m, MonadUnliftIO m)
=> FilePath
-> (TChan Automation.Message -> m ())
mkRunAutomation filepath = \_broadcastChan -> do
mkRunAutomation filepath = \broadcastChan -> do
debug $ "Beginning run of LuaScript " <> T.pack filepath

logger' <- view logger
Expand All @@ -130,7 +130,7 @@ mkRunAutomation filepath = \_broadcastChan -> do
luaStatusString <- handle (\e -> pure . show $ (e :: Lua.Exception)) $
liftIO . Lua.unsafeRunWith luaState $ do
Lua.openlibs -- load the default Lua packages
loadAPI filepath logger' mqttClient' daemonBroadcast' devices' groups'
loadAPI filepath logger' mqttClient' daemonBroadcast' broadcastChan devices' groups'
-- TODO this needs error handling
loadScript luaScriptPath' filepath *> Lua.callTrace 0 0

Expand Down Expand Up @@ -176,10 +176,11 @@ loadAPI
-> logger
-> mqttClient
-> TChan Daemon.Message
-> TChan Automation.Message
-> TVar (HashMap DeviceId Device)
-> TVar (HashMap GroupId Group)
-> Lua.LuaE Lua.Exception ()
loadAPI filepath logger' mqttClient' daemonBroadcast' devices' groups' = do
loadAPI filepath logger' mqttClient' daemonBroadcast' broadcastChan devices' groups' = do
for_ functions $ \(fn, fnName) ->
pushDocumentedFunction fn *> Lua.setglobal fnName

Expand Down Expand Up @@ -264,9 +265,6 @@ loadAPI filepath logger' mqttClient' daemonBroadcast' devices' groups' = do
logDebugMsg :: DocumentedFunction Lua.Exception
logDebugMsg =
defun "logDebugMsg"
-- I'll be honest with you, I have no idea what types any of
-- these combinators are, other than having a bunch of LuaE in
-- there
### liftIO . logDebugMsg' filepath logger'
<#> parameter LM.peekText "string" "logString" "string to log"
=#> []
Expand Down Expand Up @@ -380,24 +378,29 @@ loadAPI filepath logger' mqttClient' daemonBroadcast' devices' groups' = do
subscribe =
defun "subscribe"
### (\topic -> do
listenerChan' <- atomically $ do
automationBroadcastChan <- newBroadcastTChan
listenerChan <- dupTChan $ automationBroadcastChan
writeTChan daemonBroadcast' $
Daemon.Subscribe thisAutoName (mkTopic topic) automationBroadcastChan
pure listenerChan
liftIO . mkListenerFn $ listenerChan'
atomically $ writeTChan daemonBroadcast' $
Daemon.Subscribe thisAutoName (mkTopic topic)
liftIO . mkListenerFn $ broadcastChan
)
<#> parameter LM.peekText "string" "topic" "topic to subscribe to"
=#> functionResult pushDocumentedFunction "function" "fn"

mkListenerFn :: TChan Value -> IO (DocumentedFunction Lua.Exception)
mkListenerFn :: TChan Automation.Message -> IO (DocumentedFunction Lua.Exception)
mkListenerFn listenerChan = do
fnName' <- liftIO UUID.nextRandom
let fnName = BS.pack . UUID.toString $ fnName'
pure $
defun (Lua.Name fnName)
### (atomically . readTChan $ listenerChan)
### (let
go = do
msg <- atomically . readTChan $ listenerChan
case msg of
(Client autoName (ValueMsg v))
| autoName == thisAutoName -> pure v
| otherwise -> go
_ -> go
in go
)
=#> functionResult LA.pushViaJSON "msg" "incoming data from subscribed topic"

timestampToCron :: DocumentedFunction Lua.Exception
Expand Down
Loading

0 comments on commit 8ead5ab

Please sign in to comment.