Skip to content

Commit

Permalink
Decouple config models from internal models (#1161)
Browse files Browse the repository at this point in the history
  • Loading branch information
cristiangreco authored Oct 9, 2023
1 parent 07c6b66 commit c292431
Show file tree
Hide file tree
Showing 27 changed files with 576 additions and 475 deletions.
27 changes: 16 additions & 11 deletions cmd/yace/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,6 @@ var (
profilingEnabled bool

logger logging.Logger

cfg = config.ScrapeConf{}
)

func main() {
Expand Down Expand Up @@ -204,7 +202,8 @@ func NewYACEApp() *cli.App {
Action: func(c *cli.Context) error {
logger = logging.NewLogger(logFormat, debug, "version", version)
logger.Info("Parsing config")
if err := cfg.Load(configFile, logger); err != nil {
cfg := config.ScrapeConf{}
if _, err := cfg.Load(configFile, logger); err != nil {
logger.Error(err, "Couldn't read config file", "path", configFile)
os.Exit(1)
}
Expand Down Expand Up @@ -239,26 +238,29 @@ func startScraper(c *cli.Context) error {
}

logger.Info("Parsing config")
if err := cfg.Load(configFile, logger); err != nil {

cfg := config.ScrapeConf{}
jobsCfg, err := cfg.Load(configFile, logger)
if err != nil {
return fmt.Errorf("Couldn't read %s: %w", configFile, err)
}

featureFlags := c.StringSlice(enableFeatureFlag)
s := NewScraper(featureFlags)
var cache cachingFactory = v1.NewFactory(cfg, fips, logger)
var cache cachingFactory = v1.NewFactory(logger, jobsCfg, fips)
for _, featureFlag := range featureFlags {
if featureFlag == config.AwsSdkV2 {
var err error
// Can't override cache while also creating err
cache, err = v2.NewFactory(cfg, fips, logger)
cache, err = v2.NewFactory(logger, jobsCfg, fips)
if err != nil {
return fmt.Errorf("failed to construct aws sdk v2 client cache: %w", err)
}
}
}

ctx, cancelRunningScrape := context.WithCancel(context.Background())
go s.decoupled(ctx, logger, cache)
go s.decoupled(ctx, logger, jobsCfg, cache)

mux := http.NewServeMux()

Expand Down Expand Up @@ -291,20 +293,23 @@ func startScraper(c *cli.Context) error {
w.WriteHeader(http.StatusNotFound)
return
}

logger.Info("Parsing config")
if err := cfg.Load(configFile, logger); err != nil {
newCfg := config.ScrapeConf{}
newJobsCfg, err := newCfg.Load(configFile, logger)
if err != nil {
logger.Error(err, "Couldn't read config file", "path", configFile)
return
}

logger.Info("Reset clients cache")
cache = v1.NewFactory(cfg, fips, logger)
cache = v1.NewFactory(logger, newJobsCfg, fips)
for _, featureFlag := range featureFlags {
if featureFlag == config.AwsSdkV2 {
logger.Info("Using aws sdk v2")
var err error
// Can't override cache while also creating err
cache, err = v2.NewFactory(cfg, fips, logger)
cache, err = v2.NewFactory(logger, newJobsCfg, fips)
if err != nil {
logger.Error(err, "Failed to construct aws sdk v2 client cache", "path", configFile)
return
Expand All @@ -314,7 +319,7 @@ func startScraper(c *cli.Context) error {

cancelRunningScrape()
ctx, cancelRunningScrape = context.WithCancel(context.Background())
go s.decoupled(ctx, logger, cache)
go s.decoupled(ctx, logger, newJobsCfg, cache)
})

logger.Info("Yace startup completed", "version", version, "feature_flags", strings.Join(featureFlags, ","))
Expand Down
11 changes: 6 additions & 5 deletions cmd/yace/scraper.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
exporter "github.com/nerdswords/yet-another-cloudwatch-exporter/pkg"
"github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/clients"
"github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/logging"
"github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/model"
)

type scraper struct {
Expand Down Expand Up @@ -40,9 +41,9 @@ func (s *scraper) makeHandler() func(http.ResponseWriter, *http.Request) {
}
}

func (s *scraper) decoupled(ctx context.Context, logger logging.Logger, cache cachingFactory) {
func (s *scraper) decoupled(ctx context.Context, logger logging.Logger, jobsCfg model.JobsConfig, cache cachingFactory) {
logger.Debug("Starting scraping async")
s.scrape(ctx, logger, cache)
s.scrape(ctx, logger, jobsCfg, cache)

scrapingDuration := time.Duration(scrapingInterval) * time.Second
ticker := time.NewTicker(scrapingDuration)
Expand All @@ -54,12 +55,12 @@ func (s *scraper) decoupled(ctx context.Context, logger logging.Logger, cache ca
return
case <-ticker.C:
logger.Debug("Starting scraping async")
go s.scrape(ctx, logger, cache)
go s.scrape(ctx, logger, jobsCfg, cache)
}
}
}

func (s *scraper) scrape(ctx context.Context, logger logging.Logger, cache cachingFactory) {
func (s *scraper) scrape(ctx context.Context, logger logging.Logger, jobsCfg model.JobsConfig, cache cachingFactory) {
if !sem.TryAcquire(1) {
// This shouldn't happen under normal use, users should adjust their configuration when this occurs.
// Let them know by logging a warning.
Expand Down Expand Up @@ -98,7 +99,7 @@ func (s *scraper) scrape(ctx context.Context, logger logging.Logger, cache cachi
err := exporter.UpdateMetrics(
ctx,
logger,
cfg,
jobsCfg,
newRegistry,
cache,
options...,
Expand Down
9 changes: 4 additions & 5 deletions pkg/clients/cloudwatch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"time"

"github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/config"
"github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/logging"
"github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/model"
)
Expand All @@ -19,14 +18,14 @@ type Client interface {
// ListMetrics returns the list of metrics and dimensions for a given namespace
// and metric name. Results pagination is handled automatically: the caller can
// optionally pass a non-nil func in order to handle results pages.
ListMetrics(ctx context.Context, namespace string, metric *config.Metric, recentlyActiveOnly bool, fn func(page []*model.Metric)) ([]*model.Metric, error)
ListMetrics(ctx context.Context, namespace string, metric *model.MetricConfig, recentlyActiveOnly bool, fn func(page []*model.Metric)) ([]*model.Metric, error)

// GetMetricData returns the output of the GetMetricData CloudWatch API.
// Results pagination is handled automatically.
GetMetricData(ctx context.Context, logger logging.Logger, getMetricData []*model.CloudwatchData, namespace string, length int64, delay int64, configuredRoundingPeriod *int64) []MetricDataResult

// GetMetricStatistics returns the output of the GetMetricStatistics CloudWatch API.
GetMetricStatistics(ctx context.Context, logger logging.Logger, dimensions []*model.Dimension, namespace string, metric *config.Metric) []*model.Datapoint
GetMetricStatistics(ctx context.Context, logger logging.Logger, dimensions []*model.Dimension, namespace string, metric *model.MetricConfig) []*model.Datapoint
}

// ConcurrencyLimiter limits the concurrency when calling AWS CloudWatch APIs. The functions implemented
Expand Down Expand Up @@ -61,7 +60,7 @@ func NewLimitedConcurrencyClient(client Client, limiter ConcurrencyLimiter) Clie
}
}

func (c limitedConcurrencyClient) GetMetricStatistics(ctx context.Context, logger logging.Logger, dimensions []*model.Dimension, namespace string, metric *config.Metric) []*model.Datapoint {
func (c limitedConcurrencyClient) GetMetricStatistics(ctx context.Context, logger logging.Logger, dimensions []*model.Dimension, namespace string, metric *model.MetricConfig) []*model.Datapoint {
c.limiter.Acquire(getMetricStatisticsCall)
res := c.client.GetMetricStatistics(ctx, logger, dimensions, namespace, metric)
c.limiter.Release(getMetricStatisticsCall)
Expand All @@ -75,7 +74,7 @@ func (c limitedConcurrencyClient) GetMetricData(ctx context.Context, logger logg
return res
}

func (c limitedConcurrencyClient) ListMetrics(ctx context.Context, namespace string, metric *config.Metric, recentlyActiveOnly bool, fn func(page []*model.Metric)) ([]*model.Metric, error) {
func (c limitedConcurrencyClient) ListMetrics(ctx context.Context, namespace string, metric *model.MetricConfig, recentlyActiveOnly bool, fn func(page []*model.Metric)) ([]*model.Metric, error) {
c.limiter.Acquire(listMetricsCall)
res, err := c.client.ListMetrics(ctx, namespace, metric, recentlyActiveOnly, fn)
c.limiter.Release(listMetricsCall)
Expand Down
5 changes: 2 additions & 3 deletions pkg/clients/cloudwatch/v1/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"github.com/aws/aws-sdk-go/service/cloudwatch/cloudwatchiface"

cloudwatch_client "github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/clients/cloudwatch"
"github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/config"
"github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/logging"
"github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/model"
"github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/promutil"
Expand All @@ -26,7 +25,7 @@ func NewClient(logger logging.Logger, cloudwatchAPI cloudwatchiface.CloudWatchAP
}
}

func (c client) ListMetrics(ctx context.Context, namespace string, metric *config.Metric, recentlyActiveOnly bool, fn func(page []*model.Metric)) ([]*model.Metric, error) {
func (c client) ListMetrics(ctx context.Context, namespace string, metric *model.MetricConfig, recentlyActiveOnly bool, fn func(page []*model.Metric)) ([]*model.Metric, error) {
filter := &cloudwatch.ListMetricsInput{
MetricName: aws.String(metric.Name),
Namespace: aws.String(namespace),
Expand Down Expand Up @@ -129,7 +128,7 @@ func toMetricDataResult(resp cloudwatch.GetMetricDataOutput) []cloudwatch_client
return output
}

func (c client) GetMetricStatistics(ctx context.Context, logger logging.Logger, dimensions []*model.Dimension, namespace string, metric *config.Metric) []*model.Datapoint {
func (c client) GetMetricStatistics(ctx context.Context, logger logging.Logger, dimensions []*model.Dimension, namespace string, metric *model.MetricConfig) []*model.Datapoint {
filter := createGetMetricStatisticsInput(dimensions, &namespace, metric, logger)

if c.logger.IsDebugEnabled() {
Expand Down
3 changes: 1 addition & 2 deletions pkg/clients/cloudwatch/v1/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"github.com/aws/aws-sdk-go/service/cloudwatch"

cloudwatch_client "github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/clients/cloudwatch"
"github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/config"
"github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/logging"
"github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/model"
"github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/promutil"
Expand Down Expand Up @@ -71,7 +70,7 @@ func toCloudWatchDimensions(dimensions []*model.Dimension) []*cloudwatch.Dimensi
return cwDim
}

func createGetMetricStatisticsInput(dimensions []*model.Dimension, namespace *string, metric *config.Metric, logger logging.Logger) *cloudwatch.GetMetricStatisticsInput {
func createGetMetricStatisticsInput(dimensions []*model.Dimension, namespace *string, metric *model.MetricConfig, logger logging.Logger) *cloudwatch.GetMetricStatisticsInput {
period := metric.Period
length := metric.Length
delay := metric.Delay
Expand Down
5 changes: 2 additions & 3 deletions pkg/clients/cloudwatch/v2/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"github.com/aws/aws-sdk-go-v2/service/cloudwatch/types"

cloudwatch_client "github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/clients/cloudwatch"
"github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/config"
"github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/logging"
"github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/model"
"github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/promutil"
Expand All @@ -26,7 +25,7 @@ func NewClient(logger logging.Logger, cloudwatchAPI *cloudwatch.Client) cloudwat
}
}

func (c client) ListMetrics(ctx context.Context, namespace string, metric *config.Metric, recentlyActiveOnly bool, fn func(page []*model.Metric)) ([]*model.Metric, error) {
func (c client) ListMetrics(ctx context.Context, namespace string, metric *model.MetricConfig, recentlyActiveOnly bool, fn func(page []*model.Metric)) ([]*model.Metric, error) {
filter := &cloudwatch.ListMetricsInput{
MetricName: aws.String(metric.Name),
Namespace: aws.String(namespace),
Expand Down Expand Up @@ -133,7 +132,7 @@ func toMetricDataResult(resp cloudwatch.GetMetricDataOutput) []cloudwatch_client
return output
}

func (c client) GetMetricStatistics(ctx context.Context, logger logging.Logger, dimensions []*model.Dimension, namespace string, metric *config.Metric) []*model.Datapoint {
func (c client) GetMetricStatistics(ctx context.Context, logger logging.Logger, dimensions []*model.Dimension, namespace string, metric *model.MetricConfig) []*model.Datapoint {
filter := createGetMetricStatisticsInput(logger, dimensions, &namespace, metric)
if c.logger.IsDebugEnabled() {
c.logger.Debug("GetMetricStatistics", "input", filter)
Expand Down
3 changes: 1 addition & 2 deletions pkg/clients/cloudwatch/v2/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"github.com/aws/aws-sdk-go-v2/service/cloudwatch/types"

cloudwatch_client "github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/clients/cloudwatch"
"github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/config"
"github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/logging"
"github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/model"
"github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/promutil"
Expand Down Expand Up @@ -72,7 +71,7 @@ func toCloudWatchDimensions(dimensions []*model.Dimension) []types.Dimension {
return cwDim
}

func createGetMetricStatisticsInput(logger logging.Logger, dimensions []*model.Dimension, namespace *string, metric *config.Metric) *cloudwatch.GetMetricStatisticsInput {
func createGetMetricStatisticsInput(logger logging.Logger, dimensions []*model.Dimension, namespace *string, metric *model.MetricConfig) *cloudwatch.GetMetricStatisticsInput {
period := metric.Period
length := metric.Length
delay := metric.Delay
Expand Down
8 changes: 4 additions & 4 deletions pkg/clients/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@ import (
"github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/clients/account"
cloudwatch_client "github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/clients/cloudwatch"
"github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/clients/tagging"
"github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/config"
"github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/model"
)

// Factory is an interface to abstract away all logic required to produce the different
// YACE specific clients which wrap AWS clients
type Factory interface {
GetCloudwatchClient(region string, role config.Role, concurrency cloudwatch_client.ConcurrencyConfig) cloudwatch_client.Client
GetTaggingClient(region string, role config.Role, concurrencyLimit int) tagging.Client
GetAccountClient(region string, role config.Role) account.Client
GetCloudwatchClient(region string, role model.Role, concurrency cloudwatch_client.ConcurrencyConfig) cloudwatch_client.Client
GetTaggingClient(region string, role model.Role, concurrencyLimit int) tagging.Client
GetAccountClient(region string, role model.Role) account.Client
}
5 changes: 2 additions & 3 deletions pkg/clients/tagging/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,11 @@ import (
"context"
"errors"

"github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/config"
"github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/model"
)

type Client interface {
GetResources(ctx context.Context, job *config.Job, region string) ([]*model.TaggedResource, error)
GetResources(ctx context.Context, job model.DiscoveryJob, region string) ([]*model.TaggedResource, error)
}

var ErrExpectedToFindResources = errors.New("expected to discover resources but none were found")
Expand All @@ -26,7 +25,7 @@ func NewLimitedConcurrencyClient(client Client, maxConcurrency int) Client {
}
}

func (c limitedConcurrencyClient) GetResources(ctx context.Context, job *config.Job, region string) ([]*model.TaggedResource, error) {
func (c limitedConcurrencyClient) GetResources(ctx context.Context, job model.DiscoveryJob, region string) ([]*model.TaggedResource, error) {
c.sem <- struct{}{}
res, err := c.client.GetResources(ctx, job, region)
<-c.sem
Expand Down
2 changes: 1 addition & 1 deletion pkg/clients/tagging/v1/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func NewClient(
}
}

func (c client) GetResources(ctx context.Context, job *config.Job, region string) ([]*model.TaggedResource, error) {
func (c client) GetResources(ctx context.Context, job model.DiscoveryJob, region string) ([]*model.TaggedResource, error) {
svc := config.SupportedServices.GetService(job.Type)
var resources []*model.TaggedResource
shouldHaveDiscoveredResources := false
Expand Down
15 changes: 7 additions & 8 deletions pkg/clients/tagging/v1/filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,13 @@ import (
"github.com/aws/aws-sdk-go/service/storagegateway"
"github.com/grafana/regexp"

"github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/config"
"github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/model"
"github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/promutil"
)

type ServiceFilter struct {
// ResourceFunc can be used to fetch additional resources
ResourceFunc func(context.Context, client, *config.Job, string) ([]*model.TaggedResource, error)
ResourceFunc func(context.Context, client, model.DiscoveryJob, string) ([]*model.TaggedResource, error)

// FilterFunc can be used to the input resources or to drop based on some condition
FilterFunc func(context.Context, client, []*model.TaggedResource) ([]*model.TaggedResource, error)
Expand Down Expand Up @@ -84,7 +83,7 @@ var ServiceFilters = map[string]ServiceFilter{
},
},
"AWS/AutoScaling": {
ResourceFunc: func(ctx context.Context, client client, job *config.Job, region string) ([]*model.TaggedResource, error) {
ResourceFunc: func(ctx context.Context, client client, job model.DiscoveryJob, region string) ([]*model.TaggedResource, error) {
pageNum := 0
var resources []*model.TaggedResource
err := client.autoscalingAPI.DescribeAutoScalingGroupsPagesWithContext(ctx, &autoscaling.DescribeAutoScalingGroupsInput{},
Expand Down Expand Up @@ -171,7 +170,7 @@ var ServiceFilters = map[string]ServiceFilter{
},
},
"AWS/EC2Spot": {
ResourceFunc: func(ctx context.Context, client client, job *config.Job, region string) ([]*model.TaggedResource, error) {
ResourceFunc: func(ctx context.Context, client client, job model.DiscoveryJob, region string) ([]*model.TaggedResource, error) {
pageNum := 0
var resources []*model.TaggedResource
err := client.ec2API.DescribeSpotFleetRequestsPagesWithContext(ctx, &ec2.DescribeSpotFleetRequestsInput{},
Expand Down Expand Up @@ -204,7 +203,7 @@ var ServiceFilters = map[string]ServiceFilter{
},
},
"AWS/Prometheus": {
ResourceFunc: func(ctx context.Context, client client, job *config.Job, region string) ([]*model.TaggedResource, error) {
ResourceFunc: func(ctx context.Context, client client, job model.DiscoveryJob, region string) ([]*model.TaggedResource, error) {
pageNum := 0
var resources []*model.TaggedResource
err := client.prometheusSvcAPI.ListWorkspacesPagesWithContext(ctx, &prometheusservice.ListWorkspacesInput{},
Expand Down Expand Up @@ -237,7 +236,7 @@ var ServiceFilters = map[string]ServiceFilter{
},
},
"AWS/StorageGateway": {
ResourceFunc: func(ctx context.Context, client client, job *config.Job, region string) ([]*model.TaggedResource, error) {
ResourceFunc: func(ctx context.Context, client client, job model.DiscoveryJob, region string) ([]*model.TaggedResource, error) {
pageNum := 0
var resources []*model.TaggedResource
err := client.storageGatewayAPI.ListGatewaysPagesWithContext(ctx, &storagegateway.ListGatewaysInput{},
Expand Down Expand Up @@ -277,7 +276,7 @@ var ServiceFilters = map[string]ServiceFilter{
},
},
"AWS/TransitGateway": {
ResourceFunc: func(ctx context.Context, client client, job *config.Job, region string) ([]*model.TaggedResource, error) {
ResourceFunc: func(ctx context.Context, client client, job model.DiscoveryJob, region string) ([]*model.TaggedResource, error) {
pageNum := 0
var resources []*model.TaggedResource
err := client.ec2API.DescribeTransitGatewayAttachmentsPagesWithContext(ctx, &ec2.DescribeTransitGatewayAttachmentsInput{},
Expand Down Expand Up @@ -313,7 +312,7 @@ var ServiceFilters = map[string]ServiceFilter{
// Resource discovery only targets the protections, protections are global, so they will only be discoverable in us-east-1.
// Outside us-east-1 no resources are going to be found. We use the shield.ListProtections API to get the protections +
// protected resources to add to the tagged resources. This data is eventually usable for joining with metrics.
ResourceFunc: func(ctx context.Context, c client, job *config.Job, region string) ([]*model.TaggedResource, error) {
ResourceFunc: func(ctx context.Context, c client, job model.DiscoveryJob, region string) ([]*model.TaggedResource, error) {
var output []*model.TaggedResource
pageNum := 0
// Default page size is only 20 which can easily lead to throttling
Expand Down
2 changes: 1 addition & 1 deletion pkg/clients/tagging/v2/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func NewClient(
}
}

func (c client) GetResources(ctx context.Context, job *config.Job, region string) ([]*model.TaggedResource, error) {
func (c client) GetResources(ctx context.Context, job model.DiscoveryJob, region string) ([]*model.TaggedResource, error) {
svc := config.SupportedServices.GetService(job.Type)
var resources []*model.TaggedResource
shouldHaveDiscoveredResources := false
Expand Down
Loading

0 comments on commit c292431

Please sign in to comment.