Skip to content

Commit

Permalink
add filesystem storage driver
Browse files Browse the repository at this point in the history
  • Loading branch information
aditsachde committed Jul 29, 2024
1 parent 092e5f3 commit 326f48c
Show file tree
Hide file tree
Showing 8 changed files with 122 additions and 37 deletions.
44 changes: 19 additions & 25 deletions integration/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
"itko.dev/internal/ctsubmit"
)

func setup(startSignal chan<- struct{}, configChan chan<- ctsubmit.GlobalConfig) {
func setup(partialConfig ctsubmit.GlobalConfig, startSignal chan<- struct{}, configChan chan<- ctsubmit.GlobalConfig) {
ctx := context.Background()

// Testcontainers is nice, but consul and minio run nativily on macos.
Expand All @@ -29,28 +29,25 @@ func setup(startSignal chan<- struct{}, configChan chan<- ctsubmit.GlobalConfig)
consulEndpoint, consulCleanup := consulSetup(ctx)
defer consulCleanup()

minioEndpoint, minioUsername, minioPassword, minioBucket, minioRegion, minioCleanup := minioSetup(ctx)
defer minioCleanup()

// Upload config to Consul
logName := "testlog"
config := partialConfig
config.Name = logName

var ctmonitortileurl string
ctmonitortiledir := config.RootDirectory
ctmonitormasksize := config.MaskSize

if config.RootDirectory == "" {
minioEndpoint, minioUsername, minioPassword, minioBucket, minioRegion, minioCleanup := minioSetup(ctx)
defer minioCleanup()

config := ctsubmit.GlobalConfig{
Name: logName,
KeyPath: "./testdata/ct-http-server.privkey.plaintext.pem",
LogID: "lrviNpCI/wLGL5VTfK25b8cOdbP0YA7tGoQak5jST9o=",
ListenAddress: "localhost:3030",
MaskSize: 5,

S3Bucket: minioBucket,
S3Region: minioRegion,
S3EndpointUrl: minioEndpoint,
S3StaticCredentialUserName: minioUsername,
S3StaticCredentialPassword: minioPassword,

NotAfterStart: "2020-01-01T00:00:00Z",
NotAfterLimit: "2030-01-01T00:00:00Z",
FlushMs: 50,
config.S3Bucket = minioBucket
config.S3Region = minioRegion
config.S3EndpointUrl = minioEndpoint
config.S3StaticCredentialUserName = minioUsername
config.S3StaticCredentialPassword = minioPassword

ctmonitortileurl = minioEndpoint + "/" + minioBucket + "/"
}

ctsetup.MainMain(ctx, consulEndpoint, logName, "./testdata/fake-ca.cert", "./testdata/ct-http-server.privkey.plaintext.pem", config)
Expand All @@ -67,11 +64,8 @@ func setup(startSignal chan<- struct{}, configChan chan<- ctsubmit.GlobalConfig)
log.Fatalf("failed to create listener: %s", err)
}

ctmonitortileurl := minioEndpoint + "/" + minioBucket + "/"
ctmonitormasksize := config.MaskSize

go ctsubmit.MainMain(ctx, submitListener, logName, consulEndpoint, startSignal)
go ctmonitor.MainMain(monitorListener, ctmonitortileurl, ctmonitormasksize, startSignal)
go ctmonitor.MainMain(monitorListener, ctmonitortiledir, ctmonitortileurl, ctmonitormasksize, startSignal)
proxy(config.ListenAddress, monitorListener.Addr().String(), submitListener.Addr().String())
}

Expand Down
15 changes: 13 additions & 2 deletions integration/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,17 @@ import (
"itko.dev/internal/ctsubmit"
)

var partialConfig ctsubmit.GlobalConfig = ctsubmit.GlobalConfig{
KeyPath: "./testdata/ct-http-server.privkey.plaintext.pem",
LogID: "lrviNpCI/wLGL5VTfK25b8cOdbP0YA7tGoQak5jST9o=",
ListenAddress: "localhost:3030",
MaskSize: 5,

NotAfterStart: "2020-01-01T00:00:00Z",
NotAfterLimit: "2030-01-01T00:00:00Z",
FlushMs: 50,
}

func TestCTIntegration(t *testing.T) {
// pprof endpoint
go func() {
Expand All @@ -24,7 +35,7 @@ func TestCTIntegration(t *testing.T) {
startSignal := make(chan struct{})
configChan := make(chan ctsubmit.GlobalConfig)

go setup(startSignal, configChan)
go setup(partialConfig, startSignal, configChan)
c := <-configChan
var config configpb.LogConfig

Expand Down Expand Up @@ -67,7 +78,7 @@ func TestCTHammer(t *testing.T) {
startSignal := make(chan struct{})
configChan := make(chan ctsubmit.GlobalConfig)

go setup(startSignal, configChan)
go setup(partialConfig, startSignal, configChan)
c := <-configChan
var config configpb.LogConfig

Expand Down
13 changes: 10 additions & 3 deletions internal/ctmonitor/logic.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,16 @@ import (
)

// TODO: Evaluate if the context is actually needed
func Start(ctx context.Context, tileStoreUrl string, maskSize int) (http.Handler, error) {
storage := &UrlStorage{urlPrefix: tileStoreUrl}
f := newFetch(storage, maskSize)
func Start(ctx context.Context, tileStoreDir string, tileStoreUrl string, maskSize int) (http.Handler, error) {
var f Fetch

if tileStoreDir != "" {
storage := &FsStorage{root: tileStoreDir}
f = newFetch(storage, maskSize)
} else {
storage := &UrlStorage{urlPrefix: tileStoreUrl}
f = newFetch(storage, maskSize)
}

// Wrap the HTTP handler function with OTel instrumentation
wGetSth := otelhttp.NewHandler(http.HandlerFunc(wrapper(f.get_sth)), "get-sth")
Expand Down
6 changes: 3 additions & 3 deletions internal/ctmonitor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@ import (

// This is seperated so we can run this in the integration test.
// Tests don't need to export Otel to Honeycomb.
func MainMain(listener net.Listener, storeAddress string, maskSize int, startSignal chan<- struct{}) {
if storeAddress == "" {
func MainMain(listener net.Listener, storeDirectory string, storeAddress string, maskSize int, startSignal chan<- struct{}) {
if storeDirectory == "" && storeAddress == "" {
log.Fatal("Must provide a tile storage backend address")
}

mux, err := Start(context.Background(), storeAddress, maskSize)
mux, err := Start(context.Background(), storeDirectory, storeAddress, maskSize)
if err != nil {
log.Fatalf("Failed to get log handler: %v", err)
}
Expand Down
19 changes: 19 additions & 0 deletions internal/ctmonitor/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"io"
"net/http"
"os"
)

type Storage interface {
Expand Down Expand Up @@ -39,3 +40,21 @@ func (f *UrlStorage) Get(ctx context.Context, key string) (data []byte, notfound
}
return body, false, nil
}

// ------------------------------------------------------------

type FsStorage struct {
root string
}

func (f *FsStorage) Get(ctx context.Context, key string) (data []byte, notfounderr bool, err error) {
// try and read the file using os.Readfile
data, err = os.ReadFile(f.root + key)
if err != nil {
if errors.Is(err, os.ErrNotExist) {
return nil, true, err
}
return nil, false, err
}
return data, false, nil
}
7 changes: 5 additions & 2 deletions internal/ctsubmit/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"encoding/binary"
"errors"
"fmt"
"os"

s3types "github.com/aws/aws-sdk-go-v2/service/s3/types"
"github.com/google/certificate-transparency-go/x509"
Expand Down Expand Up @@ -105,8 +106,9 @@ func (b *Bucket) PutRecordHashes(ctx context.Context, hashes []RecordHashUpload,
var err error
f[e.hashPath], err = b.S.Get(ctx, "int/hashes/"+e.hashPath)
if err != nil {
// TODO: move this logic into the storage interface
var notFound *s3types.NoSuchKey
if errors.As(err, &notFound) {
if errors.As(err, &notFound) || errors.Is(err, os.ErrNotExist) {
// If the file is not found, create a new one.
f[e.hashPath] = make([]byte, 0)
} else {
Expand Down Expand Up @@ -245,8 +247,9 @@ func (b *Bucket) PutDedupeEntries(ctx context.Context, hashes []DedupeUpload, ma
var err error
f[e.hashPath], err = b.S.Get(ctx, "int/dedupe/"+e.hashPath)
if err != nil {
// TODO: move this logic into the storage interface
var notFound *s3types.NoSuchKey
if errors.As(err, &notFound) {
if errors.As(err, &notFound) || errors.Is(err, os.ErrNotExist) {
// If the file is not found, create a new one.
f[e.hashPath] = make([]byte, 0)
} else {
Expand Down
18 changes: 16 additions & 2 deletions internal/ctsubmit/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ type GlobalConfig struct {
ListenAddress string `json:"listenAddress"`
MaskSize int `json:"maskSize"`

// If this is set, the log will write to the filesystem instead of S3
// This value is prefered over the S3 values
RootDirectory string `json:"rootDirectory"`

S3Bucket string `json:"s3Bucket"`
S3Region string `json:"s3Region"`
S3EndpointUrl string `json:"s3EndpointUrl"`
Expand Down Expand Up @@ -201,8 +205,18 @@ func LoadLog(ctx context.Context, kvpath, consulAddress string) (*Log, error) {

stageOneCommChan := make(chan UnsequencedEntryWithReturnPath, 200)
stageTwoCommChan := make(chan []LogEntryWithReturnPath, 2)
s3Storage := NewS3Storage(gc.S3Region, gc.S3Bucket, gc.S3EndpointUrl, gc.S3StaticCredentialUserName, gc.S3StaticCredentialPassword)
bucket := Bucket{S: &s3Storage}

var bucket Bucket

if gc.RootDirectory != "" {
log.Println("Using filesystem storage")
fsStorage := NewFsStorage(gc.RootDirectory)
bucket = Bucket{S: &fsStorage}
} else {
log.Println("Using S3 storage")
s3Storage := NewS3Storage(gc.S3Region, gc.S3Bucket, gc.S3EndpointUrl, gc.S3StaticCredentialUserName, gc.S3StaticCredentialPassword)
bucket = Bucket{S: &s3Storage}
}

// Get the latest STH
var sth ct.SignedTreeHead
Expand Down
37 changes: 37 additions & 0 deletions internal/ctsubmit/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"errors"
"io"
"net/http"
"os"

"github.com/aws/aws-sdk-go-v2/aws"
awshttp "github.com/aws/aws-sdk-go-v2/aws/transport/http"
Expand Down Expand Up @@ -83,3 +84,39 @@ func (b *S3Storage) Exists(ctx context.Context, key string) (bool, error) {
}
return true, nil
}

// ------------------------------------------------------------

type FsStorage struct {
root string
}

func NewFsStorage(rootDirectory string) FsStorage {
return FsStorage{
root: rootDirectory,
}
}

func (f *FsStorage) Get(ctx context.Context, key string) ([]byte, error) {
// try and read the file using os.Readfile
data, err := os.ReadFile(f.root + key)
if err != nil {
return nil, err
}
return data, nil
}

func (f *FsStorage) Set(ctx context.Context, key string, data []byte) error {
return os.WriteFile(f.root+key, data, 0644)
}

func (f *FsStorage) Exists(ctx context.Context, key string) (bool, error) {
_, err := os.Stat(f.root + key)
if err != nil {
if errors.Is(err, os.ErrNotExist) {
return false, nil
}
return false, err
}
return true, nil
}

0 comments on commit 326f48c

Please sign in to comment.