Skip to content

Commit

Permalink
init
Browse files Browse the repository at this point in the history
  • Loading branch information
Greyeye committed Jun 10, 2021
1 parent 08e29ba commit 96307d0
Show file tree
Hide file tree
Showing 7 changed files with 370 additions and 0 deletions.
12 changes: 12 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
module github.com/Greyeye/s3trigger-lambda-sample

go 1.16

require (
github.com/aws/aws-lambda-go v1.24.0
github.com/aws/aws-sdk-go-v2 v1.6.0
github.com/aws/aws-sdk-go-v2/config v1.3.0
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.2.3
github.com/aws/aws-sdk-go-v2/service/s3 v1.10.0
github.com/stretchr/testify v1.7.0
)
60 changes: 60 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/aws/aws-lambda-go v1.24.0 h1:bOMerM175hLqHLdF1Nonfv1NA20nTIatuC0HK8eMoYg=
github.com/aws/aws-lambda-go v1.24.0/go.mod h1:jJmlefzPfGnckuHdXX7/80O3BvUUi12XOkbv4w9SGLU=
github.com/aws/aws-sdk-go-v2 v1.6.0 h1:r20hdhm8wZmKkClREfacXrKfX0Y7/s0aOoeraFbf/sY=
github.com/aws/aws-sdk-go-v2 v1.6.0/go.mod h1:tI4KhsR5VkzlUa2DZAdwx7wCAYGwkZZ1H31PYrBFx1w=
github.com/aws/aws-sdk-go-v2/config v1.3.0 h1:0JAnp0WcsgKilFLiZEScUTKIvTKa2LkicadZADza+u0=
github.com/aws/aws-sdk-go-v2/config v1.3.0/go.mod h1:lOxzHWDt/k7MMidA/K8DgXL4+ynnZYsDq65Qhs/l3dg=
github.com/aws/aws-sdk-go-v2/credentials v1.2.1 h1:AqQ8PzWll1wegNUOfIKcbp/JspTbJl54gNonrO6VUsY=
github.com/aws/aws-sdk-go-v2/credentials v1.2.1/go.mod h1:Rfvim1eZTC9W5s8YJyYYtl1KMk6e8fHv+wMRQGO4Ru0=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.1.1 h1:w1ocBIhQkLgupEB3d0uOuBddqVYl0xpubz7HSTzWG8A=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.1.1/go.mod h1:GTXAhrxHQOj9N+J5tYVjwt+rpRyy/42qLjlgw9pz1a0=
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.2.3 h1:5ohEP3BSrq8HMJLgVkEuEDGHRYfqc/ewqp0w9RFHYwk=
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.2.3/go.mod h1:Xw5ywFlWuddXMxz/Pz4Qu4So7JuR7BpK6KdKx5dUhdg=
github.com/aws/aws-sdk-go-v2/internal/ini v1.0.0 h1:k7I9E6tyVWBo7H9ffpnxDWudtjau6Qt9rnOYgV+ciEQ=
github.com/aws/aws-sdk-go-v2/internal/ini v1.0.0/go.mod h1:g3XMXuxvqSMUjnsXXp/960152w0wFS4CXVYgQaSVOHE=
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.1.0 h1:XwqxIO9LtNXznBbEMNGumtLN60k4nVqDpVwVWx3XU/o=
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.1.0/go.mod h1:zdjOOy0ojUn3iNELo6ycIHSMCp4xUbycSHfb8PnbbyM=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.1.1 h1:l7pDLsmOGrnR8LT+3gIv8NlHpUhs7220E457KEC2UM0=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.1.1/go.mod h1:2+ehJPkdIdl46VCj67Emz/EH2hpebHZtaLdzqg+sWOI=
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.4.0 h1:VacTNowcxS2WG9cmHbBi7nYq34xFSud7OYSkezf2VyQ=
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.4.0/go.mod h1:IpjxfORBAFfkMM0VEx5gPPnEy6WV4Hk0F/+zb/SUWyw=
github.com/aws/aws-sdk-go-v2/service/s3 v1.10.0 h1:BPUiwgs2sTnu1pzBa2oblYzo0qXLfVPblb6QVqcZWkg=
github.com/aws/aws-sdk-go-v2/service/s3 v1.10.0/go.mod h1:azwgEajHWHcobFQRqwHcwLv+m/aip/uZnuqpFm1MSZ4=
github.com/aws/aws-sdk-go-v2/service/sso v1.2.1 h1:alpXc5UG7al7QnttHe/9hfvUfitV8r3w0onPpPkGzi0=
github.com/aws/aws-sdk-go-v2/service/sso v1.2.1/go.mod h1:VimPFPltQ/920i1X0Sb0VJBROLIHkDg2MNP10D46OGs=
github.com/aws/aws-sdk-go-v2/service/sts v1.4.1 h1:9Z00tExoaLutWVDmY6LyvIAcKjHetkbdmpRt4JN/FN0=
github.com/aws/aws-sdk-go-v2/service/sts v1.4.1/go.mod h1:G9osDWA52WQ38BDcj65VY1cNmcAQXAXTsE8IWH8j81w=
github.com/aws/smithy-go v1.4.0 h1:3rsQpgRe+OoQgJhEwGNpIkosl0fJLdmQqF4gSFRjg+4=
github.com/aws/smithy-go v1.4.0/go.mod h1:SObp3lf9smib00L/v3U2eAKG8FyQ7iLrJnQiAmR5n+E=
github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
github.com/cpuguy83/go-md2man/v2 v2.0.0/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/google/go-cmp v0.5.4 h1:L8R9j+yAqZuZjsqh/z+F1NCffTKKLShY6zXTItVIZ8M=
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg=
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8=
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
github.com/stretchr/objx v0.1.0 h1:4G4v2dO3VZwixGIRoQ5Lfboy6nUhCyYzaqnIAPPhYs4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/urfave/cli/v2 v2.2.0/go.mod h1:SE9GqnLQmjVa0iPEY0f1w3ygNIYcIJ0OKPMoW2caLfQ=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776 h1:tQIYjPdBoyREyB9XMu+nnTclpTYkz2zFM+lzLJFO4gQ=
gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
22 changes: 22 additions & 0 deletions lambda/lambda_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package main

import (
"context"
"github.com/aws/aws-lambda-go/events"
"log"
)

func (h *lambdaHandler) handler(ctx context.Context, S3events events.S3Event) error {

// S3 trigger may contain multiple events.
for _, record := range S3events.Records {
err := s3handler(ctx, h.awsClient, record, h.destinationBucket)
if err != nil {
log.Println("S3 Handling error")
return err
}

}
return nil

}
31 changes: 31 additions & 0 deletions lambda/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package main

import (
"context"
"github.com/Greyeye/s3trigger-lambda-sample/pkg/awsclient"
"github.com/aws/aws-lambda-go/lambda"
"log"
"os"
)

type lambdaHandler struct {
awsClient *awsclient.Clients
destinationBucket string
}

func main() {
ac, err := awsclient.New(context.Background())
if err != nil {
log.Fatal("aws sdk client config error ", err.Error())
return
}
if os.Getenv("destinationBucket") == "" {
log.Fatal("please check destinationBucket environment variables")
return
}
lh := &lambdaHandler{
awsClient: ac,
destinationBucket: os.Getenv("destinationBucket"),
}
lambda.Start(lh.handler)
}
89 changes: 89 additions & 0 deletions lambda/s3.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package main

import (
"context"
"fmt"
"github.com/Greyeye/s3trigger-lambda-sample/pkg/awsclient"
"github.com/aws/aws-lambda-go/events"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
"github.com/aws/aws-sdk-go-v2/service/s3"
"io"
"log"
"net/url"
"sync"
)

// FakeWriterAt returns the writer for a single threat writer.
// credit to
// https://dev.to/flowup/using-io-reader-io-writer-in-go-to-stream-data-3i7b
type FakeWriterAt struct {
w io.Writer
}

func (fw FakeWriterAt) WriteAt(p []byte, offset int64) (n int, err error) {
// ignore 'offset' because we forced sequential downloads
return fw.w.Write(p)
}

// s3handler is the handler function to start download/upload goroutine.
func s3handler(ctx context.Context, a *awsclient.Clients, record events.S3EventRecord, destinationBucket string) error {
s3rec := record.S3
fmt.Printf("[%s - %s] Bucket = %s, Key = %s \n", record.EventSource, record.EventTime, s3rec.Bucket.Name, s3rec.Object.Key)
pr, pw := io.Pipe()
wg := sync.WaitGroup{}
wg.Add(2)
// run downloader and uploader in tandem.
// it is connected via io.Pipe() and stream byte data between each other.
// eliminating need the step to download S3 object to local storage then upload.
go downloader(ctx, a, pw, &wg, s3rec)
go uploader(ctx, a, &wg, pr, destinationBucket, s3rec)
wg.Wait()

return nil
}

// downloader object from S3 and send to PipeWriter.
func downloader(ctx context.Context, a *awsclient.Clients, pw *io.PipeWriter, wg *sync.WaitGroup, s3rec events.S3Entity) {
defer func() {
wg.Done()
pw.Close()
}()
// submitted payload may escape email at sign `@` with urlescape, need to unescape it.
keyEscapaed, err := url.QueryUnescape(s3rec.Object.Key)
if err != nil {
log.Println("failed to parse URL key: ", err)
return
}
file := &s3.GetObjectInput{Bucket: aws.String(s3rec.Bucket.Name), Key: aws.String(keyEscapaed)}
downloader := manager.NewDownloader(a.S3client)
// limit concurrency to 1, otherwise downloaded stream will be out of orders.
downloader.Concurrency = 1
_, err = downloader.Download(ctx, FakeWriterAt{pw}, file)
if err != nil {
fmt.Println("Download failed:", err.Error())
}
}

// uploader receive io.pipe buffer to destination S3 bucket.
func uploader(ctx context.Context, a *awsclient.Clients, wg *sync.WaitGroup, pr *io.PipeReader, destinationBucket string, s3rec events.S3Entity) {
uploader := manager.NewUploader(a.S3client)
uploader.Concurrency = 1
defer func() {
wg.Done()
pr.Close()
}()
_, err := uploader.Upload(ctx, &s3.PutObjectInput{
Body: pr,
Bucket: aws.String(destinationBucket),
Key: aws.String(s3rec.Object.Key),
// ContentType: aws.String("text/csv"),
// ideally, content Type should be set, but events.S3Entity does not contain metadata.
// hardcode the content type, or build another parameters to setup contentType.
// default ContentType will be used without specifying the value. (eg application/octet-stream)
})
if err != nil {
fmt.Println("uploading failed: ", s3rec.Object.Key)
fmt.Println(err.Error())
}
}
122 changes: 122 additions & 0 deletions lambda/s3_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package main

import (
"bytes"
"context"
"github.com/Greyeye/s3trigger-lambda-sample/pkg/awsclient"
"github.com/Greyeye/s3trigger-lambda-sample/pkg/awsclient/mocks"
"github.com/aws/aws-lambda-go/events"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"io"
"io/ioutil"
"os"
"sync"
"testing"
)

type nopCloser struct {
io.Reader
}

func (nopCloser) Close() error { return nil }

func TestDownloader(t *testing.T) {
testFilename := "test.txt"
testWordList := "Hi there, this is a test file"
// prepare mock for GetObject
mockAWS := new(mocks.S3clientIface)
mockAWS.On("GetObject", mock.Anything, mock.Anything, mock.Anything).Return(&s3.GetObjectOutput{
ContentLength: 10,
Body: nopCloser{bytes.NewBufferString(testWordList)},
}, nil)

ac := &awsclient.Clients{}
// override s3Client with mock AWS interface
ac.S3client = mockAWS
pr, pw := io.Pipe()
wg := sync.WaitGroup{}

wg.Add(2)
// start the process with dummy S3Object metadata.
// Object's content is submitted as part of unit test. (see testWordList)
go downloader(context.TODO(), ac, pw, &wg,
events.S3Entity{
Bucket: events.S3Bucket{
Name: "s3buckettest",
},
Object: events.S3Object{
Key: "dummyfile.txt",
Size: 10,
},
})
// create temp file to store data.
f, err := os.Create(testFilename)
if err != nil {
t.Fatal(err)
}
// use io.copy to write data from downloader to the file.
go func() {
io.Copy(f, pr)
wg.Done()
}()
wg.Wait()

//open unzipped file and verify the content.
data, err := ioutil.ReadFile(testFilename)
assert.Nil(t, err)
assert.Equal(t, testWordList, string(data))
// remove file to clean up
os.Remove(testFilename)
}

func TestBufferUploader(t *testing.T) {
//testFilename := "test.txt"
testWordList := "Hi there, this is a test file"
// prepare mock for GetObject
mockAWS := new(mocks.S3clientIface)
mockAWS.On("PutObject", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(
&s3.PutObjectOutput{
ETag: aws.String("dummyTag"),
VersionId: aws.String("dummy version"),
}, nil)
mockAWS.On("GetObject", mock.Anything, mock.Anything, mock.Anything).Return(&s3.GetObjectOutput{
ContentLength: 10,
Body: nopCloser{bytes.NewBufferString(testWordList)},
}, nil)
//CreateMultipartUpload(context.Context, *s3.CreateMultipartUploadInput, ...func(*s3.Options)) (*s3.CreateMultipartUploadOutput
ac := &awsclient.Clients{}
// override s3Client with mock AWS interface
ac.S3client = mockAWS
pr, pw := io.Pipe()
wg := sync.WaitGroup{}

wg.Add(2)

go downloader(context.TODO(), ac, pw, &wg,
events.S3Entity{
Bucket: events.S3Bucket{
Name: "s3buckettest",
},
Object: events.S3Object{
Key: "dummyfile.txt",
Size: 10,
},
})

go uploader(context.TODO(), ac, &wg, pr, "destinationBucket", events.S3Entity{
Bucket: events.S3Bucket{
Name: "s3buckettest",
},
Object: events.S3Object{
Key: "dummyfile.txt",
Size: 5,
},
})

// use io.copy to write data from downloader to the file.
wg.Wait()

}
34 changes: 34 additions & 0 deletions pkg/awsclient/api_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package awsclient

import (
"context"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/s3"
)

type Clients struct {
S3client S3clientIface
}

// S3clientIface is interface to mock aws s3 services
type S3clientIface interface {
GetObject(context.Context, *s3.GetObjectInput, ...func(*s3.Options)) (*s3.GetObjectOutput, error)
PutObject(context.Context, *s3.PutObjectInput, ...func(*s3.Options)) (*s3.PutObjectOutput, error)
UploadPart(context.Context, *s3.UploadPartInput, ...func(*s3.Options)) (*s3.UploadPartOutput, error)
CreateMultipartUpload(context.Context, *s3.CreateMultipartUploadInput, ...func(*s3.Options)) (*s3.CreateMultipartUploadOutput, error)
CompleteMultipartUpload(context.Context, *s3.CompleteMultipartUploadInput, ...func(*s3.Options)) (*s3.CompleteMultipartUploadOutput, error)
AbortMultipartUpload(context.Context, *s3.AbortMultipartUploadInput, ...func(*s3.Options)) (*s3.AbortMultipartUploadOutput, error)
}

// New will setup the AWS client service with s3client interface
func New(ctx context.Context) (*Clients, error) {
cfg, err := config.LoadDefaultConfig(ctx)
if err != nil {
return nil, err
}
// init all client connections

return &Clients{
S3client: s3.NewFromConfig(cfg),
}, nil
}

0 comments on commit 96307d0

Please sign in to comment.