Skip to content
This repository has been archived by the owner on Jun 21, 2023. It is now read-only.

build with cabal and ghc 8.8.4 #4

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,6 @@ cabal.project.local

# datasets
*.csv

# profiling artifacts
*.ps
178 changes: 178 additions & 0 deletions app/LoadBalancer.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE FlexibleContexts #-}

module Main where

import Control.Monad (replicateM, forM_, forever)
import Control.Monad.Loops (iterateM_)
import Control.Monad.Random.Class (getRandomR)
import Data.Coerce (coerce)
import Data.List (elemIndex, foldl1')
import Data.Maybe (fromMaybe)
import Data.Random.Source.PureMT (newPureMT)
import Deli (Channel, Deli, JobTiming(..))
import Deli.Printer (printResults)
import System.Random
import qualified Data.PQueue.Min as PQueue
import qualified Deli
import qualified Deli.Random

createWorker
:: Deli JobTiming (Channel JobTiming)
createWorker = do
workerChannel <- Deli.newChannel Nothing
Deli.fork $ forever $ do
job <- Deli.readChannel workerChannel
Deli.runJob job
return workerChannel

roundRobinWorkers
:: Int
-> Channel JobTiming
-> Deli JobTiming ()
roundRobinWorkers num jobChannel = do
chans :: [Channel JobTiming] <- replicateM num createWorker
-- create an infinite list of all channels, repeated,
-- then for each one, read from main queue, and write
-- to the worker's queue
let roundRobinList = cycle chans
forM_ roundRobinList $ \worker -> do
job <- Deli.readChannel jobChannel
Deli.writeChannel worker job

randomWorkers
:: Int
-> Channel JobTiming
-> Deli JobTiming ()
randomWorkers num jobChannel = do
chans :: [Channel JobTiming] <- replicateM num createWorker
forever $ do
randomWorkerIndex <- getRandomR (0, length chans - 1)
let workerQueue = chans !! randomWorkerIndex
job <- Deli.readChannel jobChannel
Deli.writeChannel workerQueue job

leastConn
:: Int
-> Channel JobTiming
-> Deli JobTiming ()
leastConn num jobChannel = do
chans :: [Channel JobTiming] <- replicateM num createWorker
forever $ do
job <- Deli.readChannel jobChannel

conns <- mapM Deli.channelLength chans
let minIndex = fromMaybe 0 $ elemIndex (foldl1' min conns) conns

Deli.writeChannel (chans !! minIndex) job

twoRandomChoices
:: Int
-> Channel JobTiming
-> Deli JobTiming ()
twoRandomChoices num jobChannel = do
chans :: [Channel JobTiming] <- replicateM num createWorker
forever $ do
job <- Deli.readChannel jobChannel

randomWorkerIndexA <- getRandomR (0, length chans - 1)
randomWorkerIndexB <- getRandomR (0, length chans - 1)

aLength <- Deli.channelLength (chans !! randomWorkerIndexA)
bLength <- Deli.channelLength (chans !! randomWorkerIndexB)

if aLength < bLength
then Deli.writeChannel (chans !! randomWorkerIndexA) job
else Deli.writeChannel (chans !! randomWorkerIndexB) job

data PriorityChannel = PriorityChannel
{ _pduration :: !Deli.Duration
, _pchannel :: !(Deli.Channel JobTiming)
} deriving (Eq, Ord, Show)

lwlDispatcher
:: Deli.Channel JobTiming
-> PQueue.MinQueue PriorityChannel
-> Deli JobTiming ()
lwlDispatcher !readChan !queue = do
now <- Deli.now
iterateM_ (dispatch readChan) (queue, now)

dispatch
:: Deli.Channel JobTiming
-> (PQueue.MinQueue PriorityChannel, Deli.Time)
-> Deli JobTiming (PQueue.MinQueue PriorityChannel, Deli.Time)
dispatch readChan (queue, prevTime) = do
job <- Deli.readChannel readChan
newTime <- Deli.now

durationMultiplier <- fromRational . toRational <$> getRandomR (0.7, 1.3 :: Float)


let mFun lastTime nowTime (PriorityChannel d c) =
PriorityChannel (max 0 (d - coerce (nowTime - lastTime))) c
!adjustedQueue = PQueue.map (mFun prevTime newTime) queue
(PriorityChannel shortestPrevDuration shortestQueue, deletedMin) = PQueue.deleteFindMin adjustedQueue

approxJobDuration = durationMultiplier * _jobDuration job
newPriorityChannel = PriorityChannel (shortestPrevDuration + approxJobDuration) shortestQueue
!addedBack = PQueue.insert newPriorityChannel deletedMin

Deli.writeChannel shortestQueue job
return (addedBack, newTime)

leastWorkLeft
:: Int
-> Channel JobTiming
-> Deli JobTiming ()
leastWorkLeft num jobChannel = do
chans :: [Channel JobTiming] <- replicateM num createWorker
let workQueue :: PQueue.MinQueue PriorityChannel
startingTimes = take num [0.00001, 0.00002..]
queueList = [PriorityChannel d c | (d, c) <- zip startingTimes chans]
workQueue = PQueue.fromAscList queueList
lwlDispatcher jobChannel workQueue

loadBalancerExample :: IO ()
loadBalancerExample = do
simulationGen <- newStdGen
inputGen <- newPureMT
let arrivals = Deli.Random.arrivalTimePoissonDistribution 1500
serviceTimes = Deli.Random.durationExponentialDistribution 0.025
numTests = 1000 * 1000 * 1
jobsA = take numTests $ Deli.Random.distributionToJobs arrivals serviceTimes inputGen
jobsB = take numTests $ Deli.Random.distributionToJobs arrivals serviceTimes inputGen
jobsC = take numTests $ Deli.Random.distributionToJobs arrivals serviceTimes inputGen
jobsD = take numTests $ Deli.Random.distributionToJobs arrivals serviceTimes inputGen
jobsE = take numTests $ Deli.Random.distributionToJobs arrivals serviceTimes inputGen
roundRobinRes = Deli.simulate simulationGen jobsA (roundRobinWorkers 48)
randomRes = Deli.simulate simulationGen jobsB (randomWorkers 48)
leastWorkLeftRes = Deli.simulate simulationGen jobsC (leastWorkLeft 48)
twoRandomChoicesRes = Deli.simulate simulationGen jobsD (twoRandomChoices 48)
leastConnRes = Deli.simulate simulationGen jobsE (leastConn 48)

putStrLn "## Round Robin ##"
printResults roundRobinRes
newline
putStrLn "## Random ##"
printResults randomRes
newline
putStrLn "## LeastWorkLeft ##"
printResults leastWorkLeftRes
newline
putStrLn "## TwoRandomChoices ##"
printResults twoRandomChoicesRes
newline
putStrLn "## LeastConn ##"
printResults leastConnRes
newline

where newline = putStrLn "\n"

main :: IO ()
main = do
loadBalancerExample
newline

where newline = putStrLn "\n"
113 changes: 113 additions & 0 deletions app/Performance.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
import qualified Control.Monad.Concurrent as C
import Control.Monad (forever, replicateM_)
import Deli (Channel, Deli, JobTiming(..))
import Deli.Printer (printResults)
import System.Random
import qualified Deli

singleQueue
:: Channel JobTiming
-> Deli JobTiming ()
singleQueue queue =
forever $ do
job <- Deli.readChannel queue
Deli.runJob job

singleQueueExample :: IO ()
singleQueueExample = do
gen <- newStdGen
let durations = repeat 0.5
count = 1000 * 100
times = [0,1..(count - 1)]
jobs = zipWith JobTiming times durations
res = Deli.simulate gen jobs singleQueue
printResults res

chainedQueues
:: Channel JobTiming
-> Deli JobTiming ()
chainedQueues queue = do
middleChan <- Deli.newChannel Nothing
Deli.fork $ forever $ do
job <- Deli.readChannel middleChan
Deli.runJob job
forever $ do
job <- Deli.readChannel queue
Deli.writeChannel middleChan job

chainedQueueExample :: IO ()
chainedQueueExample = do
gen <- newStdGen
let durations = repeat 0.5
count = 1000 * 100
times = [0,1..(count - 1)]
jobs = zipWith JobTiming times durations
res = Deli.simulate gen jobs chainedQueues
printResults res

oneThread
:: Channel JobTiming
-> Deli JobTiming ()
oneThread queue = do
middleChan <- Deli.newChannel (Just 1)
forever $ do
jobA <- Deli.readChannel queue
Deli.writeChannel middleChan jobA
jobB <- Deli.readChannel middleChan
Deli.runJob jobB

oneThreadExample :: IO ()
oneThreadExample = do
gen <- newStdGen
let durations = repeat 0.5
count = 1000 * 1000
times = [0,1..(count - 1)]
jobs = zipWith JobTiming times durations
res = Deli.simulate gen jobs oneThread
printResults res

concurrentSingleExample
:: IO ()
concurrentSingleExample =
C.runConcurrentT $ do
chan <- C.newChannel (Just 1)
C.fork $ forever $
C.readChannel chan >> return ()
replicateM_ (1000 * 100 * 10) $ do
C.writeChannel chan True

concurrentChainedExample
:: IO ()
concurrentChainedExample =
C.runConcurrentT $ do
chanOne <- C.newChannel (Just 1)
chanTwo <- C.newChannel (Just 1)
C.fork $ forever $ do
val <- C.readChannel chanOne
C.writeChannel chanTwo val
C.fork $ forever $
C.readChannel chanTwo >> return ()
replicateM_ (1000 * 100 * 10) $ do
C.writeChannel chanOne True

main :: IO ()
main = do
newline
putStrLn "## singleQueueExample ##"
singleQueueExample
newline

newline
putStrLn "## chainedQueueExample ##"
chainedQueueExample
newline

newline
putStrLn "## oneThreadExample ##"
oneThreadExample
newline

concurrentSingleExample
concurrentChainedExample

where newline = putStrLn "\n"
40 changes: 33 additions & 7 deletions deli.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@ library
, Deli
, Deli.Printer
, Deli.Random
build-depends: base >= 4.7 && < 5
build-depends: base
, MonadRandom
, bytestring
, containers
, dlist
, lens
, monad-loops
, mtl
, pqueue
, random
Expand All @@ -33,8 +34,9 @@ library
, tdigest
, time
, transformers
, strict
default-language: Haskell2010
ghc-options: -Wall
ghc-options: -Wall -O2

executable tutorial
hs-source-dirs: app
Expand All @@ -56,14 +58,38 @@ executable tutorial
default-language: Haskell2010
ghc-options: -threaded -rtsopts -with-rtsopts=-N -O1

test-suite deli-test
type: exitcode-stdio-1.0
hs-source-dirs: test
main-is: Spec.hs
executable performance
hs-source-dirs: app
main-is: Performance.hs
build-depends: base
, deli
, random
ghc-options: -rtsopts -O2
default-language: Haskell2010

executable load-balancer
hs-source-dirs: app
main-is: LoadBalancer.hs
ghc-options: -O1
build-depends: base
, MonadRandom
, bytestring
, containers
, deli
ghc-options: -threaded -rtsopts -with-rtsopts=-N
, deepseq
, lens
, monad-loops
, mtl
, parallel
, pqueue
, random
, random-fu
, random-source
, tdigest
, time
default-language: Haskell2010
ghc-options: -Wall -O2


source-repository head
type: git
Expand Down
Loading