Skip to content

Commit

Permalink
Merge pull request #286 from lukaszbudnik/loader-health-checks
Browse files Browse the repository at this point in the history
Loader health checks
  • Loading branch information
lukaszbudnik authored Sep 8, 2021
2 parents 22e908b + eb737f4 commit 8ef8cd2
Show file tree
Hide file tree
Showing 11 changed files with 240 additions and 7 deletions.
54 changes: 52 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,12 @@ curl http://localhost:8080/

Sample HTTP response:

```
{"release":"v2020.1.3","commitSha":"b56a2694fcdb523e0c3f3e79b2d7a1b61f28a91f","commitDate":"2020-10-12T13:22:57+02:00","apiVersions":["v1","v2"]}
```json
{
"release": "refs/tags/v2021.1.0",
"sha": "3ede93745e459e1214513b21ef76d94d09d10ae7",
"apiVersions": ["v2"]
}
```

## /v2 - GraphQL API
Expand Down Expand Up @@ -653,6 +657,52 @@ The following metrics are available:
- `migrator_gin_migrations_applied{type="tenant_migrations_total"}` - migrator total tenant migrations applied (for all tenants)
- `migrator_gin_migrations_applied{type="tenant_scripts_total"}` - migrator total tenant scripts applied (for all tenants)

# Health Checks

Health checks are available at `/health` endpoint. migrator implements [Eclipse MicroProfile Health 3.0 RC4](https://download.eclipse.org/microprofile/microprofile-health-3.0-RC4/microprofile-health-spec.html) spec.

A successful response returns HTTP 200 OK code:

```json
{
"status": "UP",
"checks": [
{
"name": "DB",
"status": "UP"
},
{
"name": "Loader",
"status": "UP"
}
]
}
```

In case one of the checks has DOWN status then the overall status is DOWN. Failed check has `data` field which provides more information on why its status is DOWN. Health check will also return HTTP 503 Service Unavailable code:

```json
{
"status": "DOWN",
"checks": [
{
"name": "DB",
"status": "DOWN",
"data": {
"details": "failed to connect to database: dial tcp 127.0.0.1:5432: connect: connection refused"
}
},
{
"name": "Loader",
"status": "DOWN",
"data": {
"details": "open /nosuchdir/migrations: no such file or directory"
}
}
]
}
```

# Tutorials

In this section I provide links to more in-depth migrator tutorials.
Expand Down
9 changes: 9 additions & 0 deletions coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,15 @@ func (c *coordinator) HealthCheck() types.HealthResponse {
response.Status = types.HealthStatusDown
}

// Loader check
err = c.loader.HealthCheck()
if err == nil {
checks = append(checks, types.HealthChecks{Name: "Loader", Status: types.HealthStatusUp})
} else {
checks = append(checks, types.HealthChecks{Name: "Loader", Status: types.HealthStatusDown, Data: &types.HealthData{Details: err.Error()}})
response.Status = types.HealthStatusDown
}

response.Checks = checks

return response
Expand Down
24 changes: 24 additions & 0 deletions coordinator/coordinator_mocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,26 @@ func (m *mockedDiskLoader) GetSourceMigrations() []types.Migration {
return []types.Migration{m1, m2, m3, m4, m5}
}

func (m *mockedDiskLoader) HealthCheck() error {
return nil
}

func newMockedDiskLoader(_ context.Context, _ *config.Config) loader.Loader {
return &mockedDiskLoader{}
}

type mockedDiskLoaderHealthCheckError struct {
mockedDiskLoader
}

func (m *mockedDiskLoaderHealthCheckError) HealthCheck() error {
return errors.New("trouble maker")
}

func newMockedDiskLoaderHealthCheckError(_ context.Context, _ *config.Config) loader.Loader {
return &mockedDiskLoaderHealthCheckError{}
}

type mockedNotifier struct {
returnError bool
}
Expand Down Expand Up @@ -64,6 +80,10 @@ func (m *mockedBrokenCheckSumDiskLoader) GetSourceMigrations() []types.Migration
return []types.Migration{m1}
}

func (m *mockedBrokenCheckSumDiskLoader) HealthCheck() error {
return nil
}

func newBrokenCheckSumMockedDiskLoader(_ context.Context, _ *config.Config) loader.Loader {
return new(mockedBrokenCheckSumDiskLoader)
}
Expand All @@ -77,6 +97,10 @@ func (m *mockedDifferentScriptCheckSumMockedDiskLoader) GetSourceMigrations() []
return []types.Migration{m1, m2}
}

func (m *mockedDifferentScriptCheckSumMockedDiskLoader) HealthCheck() error {
return nil
}

func newDifferentScriptCheckSumMockedDiskLoader(_ context.Context, _ *config.Config) loader.Loader {
return new(mockedDifferentScriptCheckSumMockedDiskLoader)
}
Expand Down
17 changes: 16 additions & 1 deletion coordinator/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,13 +367,15 @@ func TestCreateTenant(t *testing.T) {
assert.NotNil(t, results.Version)
}

func TestHealthCheckDBOK(t *testing.T) {
func TestHealthCheckDBAndLoaderOK(t *testing.T) {
coordinator := New(context.TODO(), nil, newNoopMetrics(), newMockedConnector, newMockedDiskLoader, newErrorMockedNotifier)
defer coordinator.Dispose()
healthResponse := coordinator.HealthCheck()
assert.Equal(t, types.HealthStatusUp, healthResponse.Status)
assert.Equal(t, "DB", healthResponse.Checks[0].Name)
assert.Equal(t, types.HealthStatusUp, healthResponse.Checks[0].Status)
assert.Equal(t, "Loader", healthResponse.Checks[1].Name)
assert.Equal(t, types.HealthStatusUp, healthResponse.Checks[1].Status)
}

func TestHealthCheckDBKO(t *testing.T) {
Expand All @@ -383,4 +385,17 @@ func TestHealthCheckDBKO(t *testing.T) {
assert.Equal(t, types.HealthStatusDown, healthResponse.Status)
assert.Equal(t, "DB", healthResponse.Checks[0].Name)
assert.Equal(t, types.HealthStatusDown, healthResponse.Checks[0].Status)
assert.Equal(t, "Loader", healthResponse.Checks[1].Name)
assert.Equal(t, types.HealthStatusUp, healthResponse.Checks[1].Status)
}

func TestHealthCheckLoaderKO(t *testing.T) {
coordinator := New(context.TODO(), nil, newNoopMetrics(), newMockedConnector, newMockedDiskLoaderHealthCheckError, newErrorMockedNotifier)
defer coordinator.Dispose()
healthResponse := coordinator.HealthCheck()
assert.Equal(t, types.HealthStatusDown, healthResponse.Status)
assert.Equal(t, "DB", healthResponse.Checks[0].Name)
assert.Equal(t, types.HealthStatusUp, healthResponse.Checks[0].Status)
assert.Equal(t, "Loader", healthResponse.Checks[1].Name)
assert.Equal(t, types.HealthStatusDown, healthResponse.Checks[1].Status)
}
34 changes: 30 additions & 4 deletions loader/azureblob_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,12 +163,38 @@ func (abl *azureBlobLoader) getAzureStorageCredentials() (azblob.Credential, err
return nil, err
}

err = azureServicePrincipalToken.Refresh()
if err != nil {
return nil, err
}
token := azureServicePrincipalToken.Token()

credential := azblob.NewTokenCredential(token.AccessToken, nil)
return credential, nil
}

func (abl *azureBlobLoader) HealthCheck() error {
credential, err := abl.getAzureStorageCredentials()
if err != nil {
return err
}

p := azblob.NewPipeline(credential, azblob.PipelineOptions{})

// check if optional prefixes are provided
baseLocation := strings.TrimRight(abl.config.BaseLocation, "/")
indx := common.FindNthIndex(baseLocation, '/', 4)

prefix := ""
if indx > -1 {
prefix = baseLocation[indx+1:]
baseLocation = baseLocation[:indx]
}

u, err := url.Parse(baseLocation)
if err != nil {
return err
}

containerURL := azblob.NewContainerURL(*u, p)

_, err = containerURL.ListBlobsFlatSegment(abl.ctx, azblob.Marker{}, azblob.ListBlobsSegmentOptions{Prefix: prefix, MaxResults: 1})

return err
}
40 changes: 40 additions & 0 deletions loader/azureblob_loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,3 +89,43 @@ func TestAzureGetSourceMigrationsWithOptionalPrefix(t *testing.T) {
assert.Contains(t, migrations[11].File, "prod/artefacts/migrations/tenants-scripts/b.sql")

}

func TestAzureHealthCheck(t *testing.T) {
accountName, accountKey := os.Getenv("AZURE_STORAGE_ACCOUNT"), os.Getenv("AZURE_STORAGE_ACCESS_KEY")

if len(accountName) == 0 || len(accountKey) == 0 {
t.Skip("skipping test AZURE_STORAGE_ACCOUNT or AZURE_STORAGE_ACCESS_KEY not set")
}

baseLocation := fmt.Sprintf("https://%v.blob.core.windows.net/myothercontainer/prod/artefacts/", accountName)

config := &config.Config{
BaseLocation: baseLocation,
SingleMigrations: []string{"migrations/config", "migrations/ref"},
TenantMigrations: []string{"migrations/tenants"},
SingleScripts: []string{"migrations/config-scripts"},
TenantScripts: []string{"migrations/tenants-scripts"},
}

loader := &azureBlobLoader{baseLoader{context.TODO(), config}}
err := loader.HealthCheck()
assert.Nil(t, err)
}

func TestAzureMsiCredentials(t *testing.T) {
// in CI/CD env the MSI credentials are not available
// this code just assures that if no shared key envs are present it will fallback to MSI
// unsetting one of the shared key envs will cause fallback to MSI
os.Unsetenv("AZURE_STORAGE_ACCESS_KEY")

config := &config.Config{
BaseLocation: "https://justtesting.blob.core.windows.net/myothercontainer/prod/artefacts/",
SingleMigrations: []string{"migrations/config", "migrations/ref"},
TenantMigrations: []string{"migrations/tenants"},
SingleScripts: []string{"migrations/config-scripts"},
TenantScripts: []string{"migrations/tenants-scripts"},
}

loader := &azureBlobLoader{baseLoader{context.TODO(), config}}
loader.getAzureStorageCredentials()
}
9 changes: 9 additions & 0 deletions loader/disk_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,15 @@ func (dl *diskLoader) GetSourceMigrations() []types.Migration {
return migrations
}

func (dl *diskLoader) HealthCheck() error {
absBaseDir, err := filepath.Abs(dl.config.BaseLocation)
if err != nil {
return err
}
_, err = ioutil.ReadDir(absBaseDir)
return err
}

func (dl *diskLoader) getDirs(baseDir string, migrationsDirs []string) []string {
var filteredDirs []string
for _, migrationsDir := range migrationsDirs {
Expand Down
9 changes: 9 additions & 0 deletions loader/disk_loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,3 +84,12 @@ func TestDiskGetDiskMigrations(t *testing.T) {
assert.Contains(t, migrations[10].File, "test/migrations/tenants-scripts/a.sql")
assert.Contains(t, migrations[11].File, "test/migrations/tenants-scripts/b.sql")
}

func TestDiskHealthCheck(t *testing.T) {
config := &config.Config{
BaseLocation: "/path/to/baseDir",
}
loader := New(context.TODO(), config)
err := loader.HealthCheck()
assert.NotNil(t, err)
}
1 change: 1 addition & 0 deletions loader/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
// Loader interface abstracts all loading operations performed by migrator
type Loader interface {
GetSourceMigrations() []types.Migration
HealthCheck() error
}

// Factory is a factory method for creating Loader instance
Expand Down
29 changes: 29 additions & 0 deletions loader/s3_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,35 @@ func (s3l *s3Loader) GetSourceMigrations() []types.Migration {
return s3l.doGetSourceMigrations(client)
}

func (s3l *s3Loader) HealthCheck() error {
sess, err := session.NewSession()
if err != nil {
return err
}
client := s3.New(sess)
return s3l.doHealthCheck(client)
}

func (s3l *s3Loader) doHealthCheck(client s3iface.S3API) error {
bucketWithPrefixes := strings.Split(strings.Replace(strings.TrimRight(s3l.config.BaseLocation, "/"), "s3://", "", 1), "/")

bucket := bucketWithPrefixes[0]
prefix := "/"
if len(bucketWithPrefixes) > 1 {
prefix = strings.Join(bucketWithPrefixes[1:], "/")
}

input := &s3.ListObjectsV2Input{
Bucket: aws.String(bucket),
Prefix: aws.String(prefix),
MaxKeys: aws.Int64(1),
}

_, err := client.ListObjectsV2(input)

return err
}

func (s3l *s3Loader) doGetSourceMigrations(client s3iface.S3API) []types.Migration {
migrations := []types.Migration{}

Expand Down
21 changes: 21 additions & 0 deletions loader/s3_loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ type mockS3Client struct {
s3iface.S3API
}

func (m *mockS3Client) ListObjectsV2(input *s3.ListObjectsV2Input) (*s3.ListObjectsV2Output, error) {
return &s3.ListObjectsV2Output{}, nil
}

func (m *mockS3Client) ListObjectsV2Pages(input *s3.ListObjectsV2Input, callback func(*s3.ListObjectsV2Output, bool) bool) error {

var contents []*s3.Object
Expand Down Expand Up @@ -140,3 +144,20 @@ func TestS3GetSourceMigrationsBucketWithPrefix(t *testing.T) {
assert.Contains(t, migrations[10].File, "application-x/prod/migrations/tenants-scripts/recreate-triggers.sql")
assert.Contains(t, migrations[11].File, "application-x/prod/migrations/tenants-scripts/run-reports.sql")
}

func TestS3HealthCheck(t *testing.T) {
mock := &mockS3Client{}

config := &config.Config{
BaseLocation: "s3://your-bucket-migrator",
SingleMigrations: []string{"migrations/config", "migrations/ref"},
TenantMigrations: []string{"migrations/tenants"},
SingleScripts: []string{"migrations/config-scripts"},
TenantScripts: []string{"migrations/tenants-scripts"},
}

loader := &s3Loader{baseLoader{context.TODO(), config}}
err := loader.doHealthCheck(mock)

assert.Nil(t, err)
}

0 comments on commit 8ef8cd2

Please sign in to comment.