Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Batch processing #32

Open
wants to merge 30 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 28 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
7070a16
Basic functionality without S3
berkeli Oct 12, 2022
e7b64bd
add image type verification, refactor
berkeli Oct 12, 2022
f2360a3
add terraform for aws infra
berkeli Oct 12, 2022
b861b03
add s3 uploading
berkeli Oct 12, 2022
134bcb3
refactor
berkeli Oct 12, 2022
73d9115
refactoring
berkeli Oct 12, 2022
c9e803f
extract csv headers to var
berkeli Oct 12, 2022
d215c6f
refactor tests
berkeli Oct 14, 2022
613f3f9
refactor
berkeli Oct 16, 2022
d9af147
start refactor
berkeli Oct 17, 2022
6ae30a8
Implementation functional
berkeli Oct 18, 2022
dba6a6a
add large input
berkeli Oct 18, 2022
8c8c8d7
add tests
berkeli Oct 21, 2022
c428908
profiling / refactoring
berkeli Oct 24, 2022
0697255
refactor
berkeli Nov 3, 2022
91fbffe
use pipeline struct
berkeli Nov 3, 2022
119418f
add tests
berkeli Nov 7, 2022
8c886e0
refactor
berkeli Nov 7, 2022
9dfe75b
use local server for download tests
berkeli Nov 7, 2022
ea35bb4
update output file format
berkeli Nov 15, 2022
37a6107
add exponential backoff for downloads
berkeli Nov 15, 2022
6063ea7
add exponential backoff to s3 upload
berkeli Nov 15, 2022
64740c8
prevent duplicates from being processed using md5
berkeli Nov 16, 2022
abdafd5
refactor pipeline
berkeli Nov 22, 2022
f53550c
fix tests
berkeli Nov 22, 2022
cb290d8
refactor
berkeli Nov 22, 2022
49efb05
Merge branch 'main' into batch-processing-3
berkeli Nov 22, 2022
88254a6
remove bind outputs for test
berkeli Nov 22, 2022
60dad20
refactor
berkeli Nov 23, 2022
48c52d7
refactor
berkeli Nov 28, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
.DS_Store
!*/assets
buggy-app/volumes
.DS_Store


2 changes: 2 additions & 0 deletions batch-processing/.gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
outputs/
*tfstate*
*.terraform*
7 changes: 4 additions & 3 deletions batch-processing/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -58,20 +58,21 @@ RUN go build -o /out
##
FROM base as test

ENTRYPOINT [ "go", "test", "-v" ]
ENTRYPOINT [ "go", "test", "-cover", "-v" ]

##
## DEVELOP
##
FROM base as develop

COPY .bash_history /root/.bash_history
# COPY .bash_history /root/.bash_history
ENTRYPOINT [ "/bin/bash" ]
# go run -race . --input /inputs/large-input.csv --output /outputs/output.csv --output-failed /outputs/failed.csv & sleep 2 && sh profile.sh

##
## RUN
##
FROM base as run

WORKDIR /
ENTRYPOINT /out --input /inputs/gradient.jpg --output /outputs/gradient_bw.jpg
ENTRYPOINT /out --input /inputs/large-input.csv --output /outputs/output.csv --output-failed /outputs/failed.csv
13 changes: 9 additions & 4 deletions batch-processing/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,28 @@ SHELL=/bin/bash
.PHONY: run test develop

outputs:
mkdir -p outputs
mkdir -p tmp/outputs

run: outputs
docker build --target run -t run .
docker run \
--mount type=bind,source="$$(echo $$HOME)/.aws",target=/root/.aws \
-e AWS_ROLE_ARN="arn:aws:iam::297880250375:role/GoCourseLambdaUserReadWriteS3"\
-e AWS_REGION=eu-west-2 -e AWS_PROFILE=cyfplus -e S3_BUCKET="batch-processing-berkeli"\
--mount type=bind,source="$$(pwd)/outputs",target=/outputs \
--rm run

test: outputs
docker build --target test -t test .
docker run \
--rm test
--rm test \

develop: outputs
docker build --target develop -t develop .
docker run -it \
--mount type=bind,source="$$(pwd)",target=/app \
--mount type=bind,source="/tmp",target=/tmp \
--mount type=bind,source="$$(pwd)/outputs",target=/outputs \
--mount type=bind,source="$$(echo $$HOME)/.aws",target=/root/.aws \
-e AWS_ROLE_ARN="arn:aws:iam::297880250375:role/GoCourseLambdaUserReadWriteS3"\
-e AWS_REGION=eu-west-2 -e AWS_PROFILE=cyfplus -e S3_BUCKET="batch-processing-berkeli"\
--mount type=bind,source="$$(pwd)/outputs",target=/app/outputs \
--rm develop
9 changes: 8 additions & 1 deletion batch-processing/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,12 @@ module github.com/CodeYourFuture/immersive-go-course/batch-processing
go 1.19

require (
gopkg.in/gographics/imagick.v2 v2.6.2 // indirect
github.com/aws/aws-sdk-go v1.44.114
github.com/stretchr/testify v1.8.0
gopkg.in/gographics/imagick.v2 v2.6.2
github.com/cenkalti/backoff/v4 v4.1.3
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
34 changes: 34 additions & 0 deletions batch-processing/go.sum
Original file line number Diff line number Diff line change
@@ -1,2 +1,36 @@
github.com/aws/aws-sdk-go v1.44.114 h1:plIkWc/RsHr3DXBj4MEw9sEW4CcL/e2ryokc+CKyq1I=
github.com/aws/aws-sdk-go v1.44.114/go.mod h1:y4AeaBuwd2Lk+GepC1E9v0qOiTws0MIWAX4oIKwKHZo=
github.com/cenkalti/backoff/v4 v4.1.3 h1:cFAlzYUlVYDysBEH2T5hyJZMh3+5+WCBvSnK6Q8UtC4=
github.com/cenkalti/backoff/v4 v4.1.3/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg=
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8=
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd h1:O7DYs+zxREGLKzKoMQrtrEacpb0ZVXA5rIwylE2Xchk=
golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/gographics/imagick.v2 v2.6.2 h1:8ILTJzDKQKSYSfav+9GZs9H8zOOR2UtZVTWkUdFoiZ8=
gopkg.in/gographics/imagick.v2 v2.6.2/go.mod h1:/QVPLV/iKdNttRKthmDkeeGg+vdHurVEPc8zkU0XgBk=
gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
188 changes: 188 additions & 0 deletions batch-processing/helpers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
package main

import (
"crypto/md5"
"encoding/csv"
"encoding/hex"
"fmt"
"image"
_ "image/gif"
_ "image/jpeg"
_ "image/png"
"io"
"log"
"net/http"
"os"
"strings"
"time"

"errors"

"github.com/aws/aws-sdk-go/service/s3"
"github.com/cenkalti/backoff/v4"
)

// Error message texts used by the helpers in this file.
const (
// InvalidCSVFormat is the error text OpenCSVFile returns when the first header column is not "url" (case-insensitive).
InvalidCSVFormat = "csv file must have a header row with 'url' as the first column"
// CouldNotFetchImage is a format string taking the HTTP status code; DownloadFileFromUrl uses it for non-2xx responses.
CouldNotFetchImage = "received status %d when trying to download image"
// EmptyCSV is the error text OpenCSVFile returns when reading the header row hits end of file.
EmptyCSV = "provided CSV file appears to be empty"
)

/**
 * Download the image from the URL into the given file and verify that it is a supported image.
 * The response body is streamed: every byte read is written to the file and fed to an md5 hash.
 * @param: {string} URL - the URL of the image to download
 * @param: {*os.File} file - destination the response body is written to, starting at its current offset
 * @return: {[]byte} hash - md5 sum of the complete response body
 * @return: {string} ext - the file extension (format) of the image as reported by image.Decode
 * @return: {error} err - any error that occurred (request failure, non-2xx status, undecodable or unsupported image)
 */
func DownloadFileFromUrl(URL string, file *os.File) ([]byte, string, error) {
	response, err := http.Get(URL)
	if err != nil {
		return nil, "", err
	}
	defer response.Body.Close()

	if response.StatusCode < http.StatusOK || response.StatusCode >= http.StatusMultipleChoices {
		return nil, "", fmt.Errorf(CouldNotFetchImage, response.StatusCode)
	}

	hasher := md5.New()

	// Whatever is read from the body is mirrored into both the file and the hash.
	body := io.TeeReader(io.TeeReader(response.Body, file), hasher)

	_, format, err := image.Decode(body)
	if err != nil {
		return nil, "", err
	}

	// image.Decode stops reading as soon as it has one complete image (e.g. the
	// first frame of a GIF), so the rest of the body must be drained through the
	// tee; otherwise the file on disk and the hash would cover only a prefix.
	if _, err := io.Copy(io.Discard, body); err != nil {
		return nil, "", err
	}

	SupportedImageTypes := []string{"jpeg", "png", "gif"}

	if !contains(SupportedImageTypes, format) {
		return nil, "", fmt.Errorf("unsupported image type, only the following are supported: %s", SupportedImageTypes)
	}

	return hasher.Sum(nil), format, nil
}

/**
 * Download the image from the URL with exponential backoff
 * @param: {string} url - the URL of the image to download
 * @param: {uint64} maxRetries - maximum number of retries after the first failed attempt
 * @param: {*os.File} file - destination file the image is written to, starting at its current offset
 * @return: {string} hash - the hex-encoded md5 hash of the image
 * @return: {string} ext - the file extension (format) of the image
 * @return: {error} err - the error of the last attempt if all attempts failed
 */
func DownloadWithBackoff(url string, maxRetries uint64, file *os.File) (string, string, error) {
	var format string
	var hash []byte

	// Remember where this download starts so a retry can discard whatever a
	// failed attempt already wrote. If the file cannot report an offset
	// (not seekable), retries fall back to writing without a reset.
	start, seekErr := file.Seek(0, io.SeekCurrent)
	rewindable := seekErr == nil
	retrying := false

	operation := func() error {
		if retrying && rewindable {
			// Drop the partial data of the previous attempt; otherwise the new
			// download would be appended behind it and corrupt the file.
			if err := file.Truncate(start); err != nil {
				return fmt.Errorf("resetting output file before retry: %w", err)
			}
			if _, err := file.Seek(start, io.SeekStart); err != nil {
				return fmt.Errorf("resetting output file before retry: %w", err)
			}
		}
		retrying = true

		var err error
		hash, format, err = DownloadFileFromUrl(url, file)
		return err
	}

	notify := func(err error, t time.Duration) {
		log.Printf("Error downloading file from %s: %s. Retrying in %s)\n", url, err, t)
	}

	b := backoff.WithMaxRetries(backoff.NewExponentialBackOff(), maxRetries)

	if err := backoff.RetryNotify(operation, b, notify); err != nil {
		return "", "", err
	}

	return hex.EncodeToString(hash), format, nil
}

/**
 * Report whether a string occurs in a slice of strings
 * @param: {[]string} arr - the strings to look through
 * @param: {string} val - the string to look for (compared with ==)
 * @return: {bool} - true if at least one element equals val, false otherwise (including for an empty or nil slice)
 */
func contains(arr []string, val string) bool {
	found := false
	for idx := 0; idx < len(arr) && !found; idx++ {
		found = arr[idx] == val
	}

	return found
}

/**
 * Function opens the csv file, validates the header row and returns the reader
 * positioned at the first data row. Every row must have exactly one field.
 * On any error the opened file is closed before returning.
 * @param: {string} filename - the path of the CSV file
 * @return: {csv.Reader} csvReader - the CSV reader
 * @return: {error} err - any error that occurred: the os.Open error, EmptyCSV if there is
 *          no header row, InvalidCSVFormat if the header is not "url", or the wrapped CSV
 *          parse error (usable with errors.Is / errors.As)
 */
func OpenCSVFile(filename string) (*csv.Reader, error) {
	file, err := os.Open(filename)
	if err != nil {
		return nil, err
	}

	csvReader := csv.NewReader(file)
	csvReader.FieldsPerRecord = 1

	header, err := csvReader.Read()
	if err != nil {
		// The reader is not handed out, so nothing else can close the file.
		// A close error on a read-only handle carries no useful information.
		_ = file.Close()

		if errors.Is(err, io.EOF) {
			return nil, errors.New(EmptyCSV)
		}

		return nil, fmt.Errorf("error reading CSV: %w", err)
	}

	// FieldsPerRecord = 1 guarantees header has exactly one element here.
	if !strings.EqualFold(header[0], "url") {
		_ = file.Close()
		return nil, errors.New(InvalidCSVFormat)
	}

	return csvReader, nil
}

/**
 * Upload a file to S3 with exponential backoff
 * @param: {*os.File} file - the file to upload; its content is read starting at its current offset
 * @param: {string} key - the S3 object key to store the file under
 * @param: {*AWSConfig} a - provides PutObject and the target bucket (a.s3bucket)
 * @param: {uint64} maxRetries - maximum number of retries after the first failed attempt
 * @return: {error} err - the error of the last attempt if all attempts failed, nil on success
 */
func UploadToS3WithBackoff(file *os.File, key string, a *AWSConfig, maxRetries uint64) error {
	// Every PutObject call treats the file's current offset as the start of the
	// body. A failed attempt may leave the offset anywhere, so remember the
	// starting position and restore it before each retry; otherwise a retry
	// could upload a truncated or empty object.
	start, seekErr := file.Seek(0, io.SeekCurrent)
	rewindable := seekErr == nil
	retrying := false

	operation := func() error {
		if retrying && rewindable {
			if _, err := file.Seek(start, io.SeekStart); err != nil {
				return err
			}
		}
		retrying = true

		_, err := a.PutObject(&s3.PutObjectInput{
			Bucket: &a.s3bucket,
			Key:    &key,
			Body:   file,
		})

		return err
	}

	notify := func(err error, t time.Duration) {
		log.Printf("Error uploading file to S3: %s. Retrying in %s)\n", err, t)
	}

	b := backoff.WithMaxRetries(backoff.NewExponentialBackOff(), maxRetries)

	return backoff.RetryNotify(operation, b, notify)
}

/**
 * Build the path "/outputs/<key>.<ext>"
 * NOTE(review): despite the name the path is under /outputs - presumably where the downloaded original is stored; confirm with callers
 * @param: {string} key - the file name without extension
 * @param: {string} ext - the file extension without the leading dot
 * @return: {string} - the resulting path
 */
func InputPath(key, ext string) string {
	const layout = "/outputs/%s.%s"

	path := fmt.Sprintf(layout, key, ext)

	return path
}

/**
 * Build the path "/outputs/<key>-converted.<ext>"
 * @param: {string} key - the file name without extension
 * @param: {string} ext - the file extension without the leading dot
 * @return: {string} - the resulting path
 */
func OutputPath(key, ext string) string {
	const layout = "/outputs/%s-converted.%s"

	path := fmt.Sprintf(layout, key, ext)

	return path
}
Loading