Skip to content

Commit

Permalink
Merge pull request #380 from ambiata/topic/better-recursive-upload
Browse files Browse the repository at this point in the history
Topic/better recursive upload
  • Loading branch information
erikd-ambiata authored Nov 10, 2017
2 parents c3c6997 + 437545f commit c0cb6eb
Show file tree
Hide file tree
Showing 8 changed files with 123 additions and 47 deletions.
10 changes: 5 additions & 5 deletions mismi-cli/main/s3.hs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ data UnitPrefix =
deriving (Eq, Show)

data Command =
Upload Recursive FilePath Address WriteMode
Upload Recursive FilePath Address WriteMode Int
| Download Recursive Address FilePath
| Copy Address Address WriteMode
| Concat [Address] Address WriteMode Int
Expand Down Expand Up @@ -126,10 +126,10 @@ run c = do
let
e' = configure (over serviceRetry (set retryAttempts 10 . set exponentBase 0.6) s3) e
orDie O.renderError . O.runAWS e' $ case c of
Upload NotRecursive s d m ->
Upload NotRecursive s d m _ ->
uploadWithModeOrFail m s d
Upload Recursive s d m ->
uploadRecursiveWithModeOrFail m s d
Upload Recursive s d m f ->
uploadRecursiveWithModeOrFail m s d f
Download NotRecursive s d ->
renderExit renderDownloadError . download s . optAppendFileName d $ key s
Download Recursive s d ->
Expand Down Expand Up @@ -301,7 +301,7 @@ commandP' :: Force -> Parser Command
commandP' f = XOA.subparser $
command' "upload"
"Upload a file to s3."
(Upload <$> recursive' <*> filepath' <*> address' <*> writeMode' f)
(Upload <$> recursive' <*> filepath' <*> address' <*> writeMode' f <*> fork')
<> command' "download"
"Download a file from s3."
(Download <$> recursive' <*> address' <*> filepath')
Expand Down
1 change: 1 addition & 0 deletions mismi-s3-core/mismi-s3-core.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ test-suite test
, ambiata-mismi-s3-core
, ambiata-p
, attoparsec
, filepath >= 1.3 && < 1.5
, text
, QuickCheck >= 2.7 && < 2.10
, quickcheck-instances == 0.3.*
32 changes: 22 additions & 10 deletions mismi-s3-core/test/Test/Mismi/S3/Core/Arbitrary.hs
Original file line number Diff line number Diff line change
Expand Up @@ -3,26 +3,29 @@
{-# OPTIONS_GHC -fno-warn-orphans #-}
module Test.Mismi.S3.Core.Arbitrary where

import Data.Text as T
import qualified Data.List as L
import qualified Data.Text as T

import Disorder.Corpus
import Disorder.Corpus (simpsons, southpark)

import Mismi.S3.Core.Data

import P

import Test.QuickCheck
import Test.QuickCheck (Arbitrary (..), Gen)
import qualified Test.QuickCheck as QC
import Test.QuickCheck.Instances ()

import System.FilePath (FilePath)

instance Arbitrary WriteMode where
arbitrary = elements [Fail, Overwrite]
arbitrary = QC.elements [Fail, Overwrite]

instance Arbitrary Bucket where
arbitrary = Bucket <$> elements southpark
arbitrary = Bucket <$> QC.elements southpark

instance Arbitrary Address where
arbitrary = frequency [
arbitrary = QC.frequency [
(9, Address <$> arbitrary <*> arbitrary)
, (1, flip Address (Key "") <$> arbitrary)
]
Expand All @@ -32,8 +35,17 @@ instance Arbitrary Key where
-- Unfortunately unicode characters aren't supported in the Haskell AWS library
-- https://github.com/ambiata/vee/issues/7
arbitrary =
let genPath = elements ["happy", "sad", ".", ":", "-"]
let genPath = QC.elements ["happy", "sad", ".", ":", "-"]
path = do
sep <- elements ["-", "=", "#", ""]
T.take 256 . T.intercalate "/" <$> listOf1 (T.intercalate sep <$> listOf1 genPath)
in (Key . append "tests/") <$> path
sep <- QC.elements ["-", "=", "#", ""]
T.take 256 . T.intercalate "/" <$> QC.listOf1 (T.intercalate sep <$> QC.listOf1 genPath)
in (Key . T.append "tests/") <$> path


-- | Generate @len@ (file name, size) pairs for tests.
--
-- Names are drawn from the 'simpsons' corpus and suffixed with their list
-- position so that every name in the result is distinct; sizes are chosen
-- between 1 and 1000000000 inclusive.
fileNameSizePairs :: Int -> Gen [(FilePath, Int64)]
fileNameSizePairs len = do
  baseNames <- QC.vectorOf len $ QC.elements simpsons
  sizes <- QC.vectorOf len $ QC.choose (1, 1000000000)
  pure
    [ (baseName <> show ix, size)
    | (baseName, ix, size) <- L.zip3 baseNames [(0::Int) ..] sizes
    ]
1 change: 1 addition & 0 deletions mismi-s3-core/test/mismi-s3-core-test.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ library
, ambiata-mismi-s3-core
, ambiata-p
, attoparsec
, filepath >= 1.3 && < 1.5
, text
, QuickCheck >= 2.7 && < 2.10
, quickcheck-instances == 0.3.*
Expand Down
2 changes: 2 additions & 0 deletions mismi-s3/mismi-s3.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ library
, filepath >= 1.3 && < 1.5
, http-client >= 0.4.18 && < 0.5
, http-types == 0.8.*
, lifted-async == 0.9.*
, mtl >= 2.1 && < 2.3
, process >= 1.2 && < 1.5
, resourcet == 1.1.*
Expand Down Expand Up @@ -89,6 +90,7 @@ test-suite test
, ambiata-p
, ambiata-x-eithert
, conduit
, containers == 0.5.*
, directory == 1.2.*
, exceptions
, filepath
Expand Down
90 changes: 62 additions & 28 deletions mismi-s3/src/Mismi/S3/Commands.hs
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,11 @@ module Mismi.S3.Commands (
, grantReadAccess
, hoistUploadError
, hoistDownloadError
, chunkFilesBySize
) where

import Control.Arrow ((***))

import Control.Concurrent.Async.Lifted (mapConcurrently_)
import Control.Exception (ioError)
import qualified Control.Exception as CE
import Control.Lens ((.~), (^.), to, view)
Expand Down Expand Up @@ -122,7 +123,7 @@ import System.IO.Error (IOError)
import System.Directory (createDirectoryIfMissing, doesFileExist, getDirectoryContents)
import System.FilePath (FilePath, (</>), takeDirectory)
import System.Posix.IO (OpenMode(..), openFd, closeFd, fdSeek, defaultFileFlags)
import System.Posix.Files (getFileStatus, isDirectory, isRegularFile)
import System.Posix.Files (fileSize, getFileStatus, isDirectory, isRegularFile)
import qualified "unix-bytestring" System.Posix.IO.ByteString as UBS

import System.Timeout.Lifted (timeout)
Expand Down Expand Up @@ -370,13 +371,13 @@ upload :: FilePath -> Address -> EitherT UploadError AWS ()
upload =
uploadWithMode Fail

uploadRecursive :: FilePath -> Address -> EitherT UploadError AWS ()
uploadRecursive :: FilePath -> Address -> Int -> EitherT UploadError AWS ()
uploadRecursive =
uploadRecursiveWithMode Fail

uploadRecursiveOrFail :: FilePath -> Address -> AWS ()
uploadRecursiveOrFail f a =
eitherT hoistUploadError pure $ uploadRecursive f a
uploadRecursiveOrFail :: FilePath -> Address -> Int -> AWS ()
uploadRecursiveOrFail f a i =
eitherT hoistUploadError pure $ uploadRecursive f a i

uploadOrFail :: FilePath -> Address -> AWS ()
uploadOrFail f a =
Expand All @@ -386,9 +387,9 @@ uploadWithModeOrFail :: WriteMode -> FilePath -> Address -> AWS ()
uploadWithModeOrFail w f a =
eitherT hoistUploadError pure $ uploadWithMode w f a

uploadRecursiveWithModeOrFail :: WriteMode -> FilePath -> Address -> AWS ()
uploadRecursiveWithModeOrFail w f a =
eitherT hoistUploadError pure $ uploadRecursiveWithMode w f a
uploadRecursiveWithModeOrFail :: WriteMode -> FilePath -> Address -> Int -> AWS ()
uploadRecursiveWithModeOrFail w f a i =
eitherT hoistUploadError pure $ uploadRecursiveWithMode w f a i

hoistUploadError :: UploadError -> AWS ()
hoistUploadError e =
Expand All @@ -410,8 +411,7 @@ uploadWithMode m f a = do
unlessM (liftIO $ doesFileExist f) . left $ UploadSourceMissing f
s <- liftIO $ withFile f ReadMode $ \h ->
hFileSize h
let chunk = 100 * 1024 * 1024
case s < chunk of
case s < bigChunkSize of
True ->
lift $ uploadSingle f a
False ->
Expand All @@ -422,21 +422,27 @@ uploadWithMode m f a = do
-- better default.
case s > 1024 * 1024 * 1024 of
True ->
multipartUpload f a s (2 * chunk) 20
multipartUpload f a s (2 * bigChunkSize) 20
False ->
multipartUpload f a s chunk 20
multipartUpload f a s bigChunkSize 20



-- | Size threshold of 100 MiB, in bytes.
--
-- Files smaller than this are uploaded with a single request; larger files
-- go through multipart upload, where this value (or twice it) is used as the
-- part size. It is also the size budget handed to 'chunkFilesBySize' by the
-- recursive upload.
bigChunkSize :: Integer
bigChunkSize = 100 * mebibyte
  where
    mebibyte = 1024 * 1024


-- | Upload a local file to the given address with a single @putObject@
-- request, with server-side encryption set on the request.
--
-- The request body is built by 'N.chunkedFile' using a 1 MiB chunk size.
-- No size or existence check is made here; callers decide when a file is
-- small enough for a single request.
uploadSingle :: FilePath -> Address -> AWS ()
uploadSingle file a = do
  body <- N.chunkedFile (ChunkSize oneMebibyte) file
  let request = f' A.putObject a body & A.poServerSideEncryption .~ pure sse
  void $ send request
  where
    oneMebibyte = 1024 * 1024

multipartUpload :: FilePath -> Address -> Integer -> Integer -> Int -> EitherT UploadError AWS ()
multipartUpload file a fileSize chunk fork = do
multipartUpload file a fSize chunk fork = do
e <- ask
mpu <- lift $ createMultipartUpload a

let chunks = calculateChunksCapped (fromInteger fileSize) (fromInteger chunk) 4096 -- max 4096 prts returned
let chunks = calculateChunksCapped (fromInteger fSize) (fromInteger chunk) 4096 -- max 4096 prts returned

r <- liftIO $
consume (forM_ chunks . writeQueue) fork $ multipartUploadWorker e mpu file a
Expand Down Expand Up @@ -474,22 +480,49 @@ multipartUploadWorker e mpu file a (o, c, i) =
pure $! Right $! PartResponse i m


uploadRecursiveWithMode :: WriteMode -> FilePath -> Address -> EitherT UploadError AWS ()
uploadRecursiveWithMode m src (Address buck ky) = do
uploadRecursiveWithMode :: WriteMode -> FilePath -> Address -> Int -> EitherT UploadError AWS ()
uploadRecursiveWithMode mode src (Address buck ky) fork = do
es <- tryIO $ getFileStatus src
case es of
Left _ -> left $ UploadSourceMissing src
Right st -> unless (isDirectory st) . left $ UploadSourceNotDirectory src
files <- liftIO $ listRecursivelyLocal src
let prefixLen = L.length (src </> "a") - 1
outputAddrs = fmap (\fp -> Address buck (ky // Key (T.pack $ L.drop prefixLen fp))) files
mapM_ (uncurry (uploadWithMode m)) $ L.zip files outputAddrs
files <- liftIO (listRecursivelyLocal src)
mapM_ uploadFiles $ chunkFilesBySize fork (fromIntegral bigChunkSize) files
where
uploadFiles :: [(FilePath, Int64)] -> EitherT UploadError AWS ()
uploadFiles [] = pure ()
uploadFiles [(f,s)]
| fromIntegral s < bigChunkSize = lift . uploadSingle f $ uploadAddress f
| otherwise = uploadWithMode mode f $ uploadAddress f
uploadFiles xs =
mapConcurrently_ (\ (f, _) -> lift . uploadSingle f $ uploadAddress f) xs


prefixLen = L.length (src </> "a") - 1

uploadAddress :: FilePath -> Address
uploadAddress fp = Address buck (ky // Key (T.pack $ L.drop prefixLen fp))

-- | Take a list of files and their sizes, and split it into a list of
-- chunks (sub lists) suitable for uploading together.
--
-- The files are sorted by ascending size and then grouped greedily so that
-- every chunk containing more than one file satisfies both:
--
--   * the total size of the files in the chunk is less than @maxSize@, and
--   * the number of files in the chunk is at most @maxCount@.
--
-- A file that does not fit into the current chunk closes that chunk and
-- starts a new one; it is never appended to the chunk it overflowed. A file
-- whose own size is @>= maxSize@ therefore always ends up in a chunk by
-- itself, which lets callers treat single-file chunks as the only place
-- where a large file can appear.
--
-- No empty chunks are produced: an empty input yields an empty result, and
-- every input file appears in exactly one chunk. Files within a chunk are
-- in descending size order (most recently added first).
chunkFilesBySize :: Int -> Int64 -> [(FilePath, Int64)] -> [[(FilePath, Int64)]]
chunkFilesBySize maxCount maxSize =
  takeFiles 0 [] . L.sortOn snd
  where
    -- Arguments: running total size of the current chunk, the current chunk
    -- (built in reverse), and the remaining files in ascending size order.
    takeFiles :: Int64 -> [(FilePath, Int64)] -> [(FilePath, Int64)] -> [[(FilePath, Int64)]]
    takeFiles _ [] [] = []
    takeFiles _ acc [] = [acc]
    takeFiles current acc ((x, s):xs)
      -- An empty chunk always accepts the next file, even an oversized one,
      -- so that every file is placed and the recursion makes progress.
      | L.null acc =
          takeFiles s [(x, s)] xs
      | current + s < maxSize && L.length acc < maxCount =
          takeFiles (current + s) ((x, s):acc) xs
      -- The file does not fit: emit the current chunk untouched and start a
      -- fresh chunk with this file.
      | otherwise =
          acc : takeFiles s [(x, s)] xs

-- | Like `listRecursively` but for the local filesystem.
listRecursivelyLocal :: MonadIO m => FilePath -> m [FilePath]
-- Also returns the size in bytes of each regular file found.
listRecursivelyLocal :: MonadIO m => FilePath -> m [(FilePath, Int64)]
listRecursivelyLocal topdir = do
entries <- liftIO $ listDirectory topdir
(dirs, files) <- liftIO . partitionDirsFiles $ fmap (topdir </>) entries
(dirs, files) <- liftIO . partitionDirsFilesWithSizes $ fmap (topdir </>) entries
others <- concatMapM listRecursivelyLocal dirs
pure $ files <> others

Expand All @@ -502,16 +535,17 @@ listDirectory path =
f filename =
filename /= "." && filename /= ".."

partitionDirsFiles :: MonadIO m => [FilePath] -> m ([FilePath], [FilePath])
partitionDirsFiles =
partitionDirsFilesWithSizes :: MonadIO m => [FilePath] -> m ([FilePath], [(FilePath, Int64)])
partitionDirsFilesWithSizes =
pworker ([], [])
where
pworker (dirs, files) [] = pure (dirs, files)
pworker (dirs, files) (x:xs) = do
xstat <- liftIO $ getFileStatus x
pworker
(if isDirectory xstat then x : dirs else dirs, if isRegularFile xstat then x : files else files)
xs
let xsize = fromIntegral $ fileSize xstat
newDirs = if isDirectory xstat then x : dirs else dirs
newFiles = if isRegularFile xstat then (x, xsize) : files else files
pworker (newDirs, newFiles) xs

write :: Address -> Text -> AWS WriteResult
write =
Expand Down
2 changes: 1 addition & 1 deletion mismi-s3/test/Test/IO/Mismi/S3/Commands.hs
Original file line number Diff line number Diff line change
Expand Up @@ -532,7 +532,7 @@ prop_upload_recursive = once . testAWS $ do

addr <- withKey (// Key "top") <$> newAddress

eitherT (fail . show) pure $ uploadRecursive tmpdir addr
eitherT (fail . show) pure $ uploadRecursive tmpdir addr 2

a <- read (withKey (// Key "a") addr)
c <- read (withKey (// Key "b/c") addr)
Expand Down
32 changes: 29 additions & 3 deletions mismi-s3/test/Test/Mismi/S3/Commands.hs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@ module Test.Mismi.S3.Commands where

import Control.Lens ((.~))

import Data.Time.Clock
import qualified Data.List as DL
import Data.Map.Strict (Map)
import qualified Data.Map.Strict as DM
import Data.Time.Clock (NominalDiffTime, addUTCTime, getCurrentTime)

import Disorder.Core.IO
import Disorder.Core.IO (testIO)

import Mismi.S3.Commands
import qualified Mismi.S3.Amazonka as A
Expand All @@ -17,9 +20,13 @@ import P

import System.IO

import Test.QuickCheck
import Test.Mismi.S3.Core.Arbitrary

import Test.QuickCheck (Positive (..), Property, (===), quickCheckAll)
import qualified Test.QuickCheck as QC
import Test.QuickCheck.Instances ()


prop_filter_old :: Positive NominalDiffTime -> Property
prop_filter_old (Positive i) = testIO $ do
n <- getCurrentTime
Expand All @@ -33,6 +40,25 @@ prop_filter_failure = testIO $ do
let r = filterOld n $ A.multipartUpload & A.muInitiated .~ Just n
pure $ r === False

-- | No chunk holding two or more files may have a combined size above the
-- requested maximum chunk size. Single-file chunks are exempt, because a
-- single file may legitimately be larger than the limit on its own.
prop_chunk_files_by_size :: Property
prop_chunk_files_by_size =
  QC.forAll (QC.choose (2, 10)) $ \ maxFilesPerChunk ->
  QC.forAll (QC.choose (10, 100)) $ \ fileCount ->
  QC.forAll (QC.choose (1000, 10000)) $ \ maxChunkSize ->
  QC.forAll (fileNameSizePairs fileCount) $ \ pairs ->
    let
      sizeByName = DM.fromList pairs
      chunkTotals =
        [ multiChunkSum sizeByName (DL.map fst chunk)
        | chunk <- chunkFilesBySize maxFilesPerChunk maxChunkSize pairs
        ]
      oversized = [ total | total <- chunkTotals, total > maxChunkSize ]
    in
      oversized === []
  where
    -- Combined size of a chunk's files, looked up by name. Chunks with
    -- fewer than two files count as 0 so they can never be reported.
    multiChunkSum :: Map FilePath Int64 -> [FilePath] -> Int64
    multiChunkSum sizes names =
      case names of
        (_ : _ : _) -> sum $ mapMaybe (`DM.lookup` sizes) names
        _ -> 0


-- Empty Template Haskell declaration splice. It must stay directly above
-- 'tests': it ends the current declaration group so that the @prop_*@
-- definitions above are visible to the '$quickCheckAll' splice below.
return []
-- | Run every @prop_*@ property defined in this module; the result is
-- 'True' only when all of them pass.
tests :: IO Bool
tests = $quickCheckAll

0 comments on commit c0cb6eb

Please sign in to comment.