Skip to content

Commit

Permalink
[service/telemetry] Switch to a factory pattern (#10001)
Browse files Browse the repository at this point in the history
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue.
Ex. Adding a feature - Explain what this achieves.-->
#### Description

Switches `service/telemetry` to a factory pattern. To avoid adding a lot
of public API in one go:
1. the actual factory builder is in an internal package
2. I have not added the `CreateMeterProvider` method yet

There are two goals with this: one is to make progress on #4970, the
other is to allow initializing telemetry sooner:

<!-- Issue number if applicable -->
#### Link to tracking issue
Updates #4970. 

<!--Describe what testing was performed and which tests were added.-->
#### Testing

Updates existing tests to use `NewFactory`
  • Loading branch information
mx-psi authored May 21, 2024
1 parent 1159fda commit b1579a5
Show file tree
Hide file tree
Showing 9 changed files with 382 additions and 159 deletions.
25 changes: 25 additions & 0 deletions .chloggen/mx-psi_tel-factory.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: deprecation

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: service/telemetry

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Deprecate telemetry.New in favor of telemetry.NewFactory

# One or more tracking issues or pull requests related to the change
issues: [4970]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api]
32 changes: 5 additions & 27 deletions otelcol/unmarshaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,6 @@
package otelcol // import "go.opentelemetry.io/collector/otelcol"

import (
"time"

"go.uber.org/zap/zapcore"

"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/confmap"
"go.opentelemetry.io/collector/connector"
"go.opentelemetry.io/collector/exporter"
Expand All @@ -32,6 +27,10 @@ type configSettings struct {
// unmarshal the configSettings from a confmap.Conf.
// After the config is unmarshalled, `Validate()` must be called to validate.
func unmarshal(v *confmap.Conf, factories Factories) (*configSettings, error) {

telFactory := telemetry.NewFactory()
defaultTelConfig := *telFactory.CreateDefaultConfig().(*telemetry.Config)

// Unmarshal top level sections and validate.
cfg := &configSettings{
Receivers: configunmarshaler.NewConfigs(factories.Receivers),
Expand All @@ -41,28 +40,7 @@ func unmarshal(v *confmap.Conf, factories Factories) (*configSettings, error) {
Extensions: configunmarshaler.NewConfigs(factories.Extensions),
// TODO: Add a component.ServiceFactory to allow this to be defined by the Service.
Service: service.Config{
Telemetry: telemetry.Config{
Logs: telemetry.LogsConfig{
Level: zapcore.InfoLevel,
Development: false,
Encoding: "console",
Sampling: &telemetry.LogsSamplingConfig{
Enabled: true,
Tick: 10 * time.Second,
Initial: 10,
Thereafter: 100,
},
OutputPaths: []string{"stderr"},
ErrorOutputPaths: []string{"stderr"},
DisableCaller: false,
DisableStacktrace: false,
InitialFields: map[string]any(nil),
},
Metrics: telemetry.MetricsConfig{
Level: configtelemetry.LevelNormal,
Address: ":8888",
},
},
Telemetry: defaultTelConfig,
},
}

Expand Down
23 changes: 17 additions & 6 deletions service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,16 +89,27 @@ func New(ctx context.Context, set Settings, cfg Config) (*Service, error) {
},
collectorConf: set.CollectorConf,
}
tel, err := telemetry.New(ctx, telemetry.Settings{BuildInfo: set.BuildInfo, ZapOptions: set.LoggingOptions}, cfg.Telemetry)
if err != nil {
return nil, fmt.Errorf("failed to get logger: %w", err)
}

// Fetch data for internal telemetry like instance id and sdk version to provide for internal telemetry.
res := resource.New(set.BuildInfo, cfg.Telemetry.Resource)
pcommonRes := pdataFromSdk(res)

logger := tel.Logger()
telFactory := telemetry.NewFactory()
telset := telemetry.Settings{
BuildInfo: set.BuildInfo,
ZapOptions: set.LoggingOptions,
}

logger, err := telFactory.CreateLogger(ctx, telset, &cfg.Telemetry)
if err != nil {
return nil, fmt.Errorf("failed to create logger: %w", err)
}

tracerProvider, err := telFactory.CreateTracerProvider(ctx, telset, &cfg.Telemetry)
if err != nil {
return nil, fmt.Errorf("failed to create tracer provider: %w", err)
}

logger.Info("Setting up own telemetry...")
mp, err := newMeterProvider(
meterProviderSettings{
Expand All @@ -116,7 +127,7 @@ func New(ctx context.Context, set Settings, cfg Config) (*Service, error) {
srv.telemetrySettings = servicetelemetry.TelemetrySettings{
Logger: logger,
MeterProvider: mp,
TracerProvider: tel.TracerProvider(),
TracerProvider: tracerProvider,
MetricsLevel: cfg.Telemetry.Metrics.Level,
// Construct telemetry attributes from build info and config's resource attributes.
Resource: pcommonRes,
Expand Down
59 changes: 59 additions & 0 deletions service/telemetry/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package telemetry // import "go.opentelemetry.io/collector/service/telemetry"

import (
"context"
"time"

"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/service/telemetry/internal"
)

func createDefaultConfig() component.Config {
return &Config{
Logs: LogsConfig{
Level: zapcore.InfoLevel,
Development: false,
Encoding: "console",
Sampling: &LogsSamplingConfig{
Enabled: true,
Tick: 10 * time.Second,
Initial: 10,
Thereafter: 100,
},
OutputPaths: []string{"stderr"},
ErrorOutputPaths: []string{"stderr"},
DisableCaller: false,
DisableStacktrace: false,
InitialFields: map[string]any(nil),
},
Metrics: MetricsConfig{
Level: configtelemetry.LevelNormal,
Address: ":8888",
},
}
}

// Factory is a telemetry factory.
type Factory = internal.Factory

// NewFactory creates a new Factory.
func NewFactory() Factory {
return internal.NewFactory(createDefaultConfig,
internal.WithLogger(func(_ context.Context, set Settings, cfg component.Config) (*zap.Logger, error) {
c := *cfg.(*Config)
return newLogger(c.Logs, set.ZapOptions)
}),
internal.WithTracerProvider(func(ctx context.Context, _ Settings, cfg component.Config) (trace.TracerProvider, error) {
c := *cfg.(*Config)
return newTracerProvider(ctx, c)
}),
)
}
116 changes: 116 additions & 0 deletions service/telemetry/internal/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package internal // import "go.opentelemetry.io/collector/service/telemetry/internal"

import (
"context"

"go.opentelemetry.io/otel/trace"
tracenoop "go.opentelemetry.io/otel/trace/noop"
"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
)

// CreateSettings holds configuration for building Telemetry.
type CreateSettings struct {
BuildInfo component.BuildInfo
AsyncErrorChannel chan error
ZapOptions []zap.Option
}

// Factory is factory interface for telemetry.
// This interface cannot be directly implemented. Implementations must
// use the NewFactory to implement it.
type Factory interface {
// CreateDefaultConfig creates the default configuration for the telemetry.
// TODO: Should we just inherit from component.Factory?
CreateDefaultConfig() component.Config

// CreateLogger creates a logger.
CreateLogger(ctx context.Context, set CreateSettings, cfg component.Config) (*zap.Logger, error)

// CreateTracerProvider creates a TracerProvider.
CreateTracerProvider(ctx context.Context, set CreateSettings, cfg component.Config) (trace.TracerProvider, error)

// TODO: Add CreateMeterProvider.

// unexportedFactoryFunc is used to prevent external implementations of Factory.
unexportedFactoryFunc()
}

// FactoryOption apply changes to Factory.
type FactoryOption interface {
// applyTelemetryFactoryOption applies the option.
applyTelemetryFactoryOption(o *factory)
}

var _ FactoryOption = (*factoryOptionFunc)(nil)

// factoryOptionFunc is an FactoryOption created through a function.
type factoryOptionFunc func(*factory)

func (f factoryOptionFunc) applyTelemetryFactoryOption(o *factory) {
f(o)
}

var _ Factory = (*factory)(nil)

// factory is the implementation of Factory.
type factory struct {
createDefaultConfig component.CreateDefaultConfigFunc
CreateLoggerFunc
CreateTracerProviderFunc
}

func (f *factory) CreateDefaultConfig() component.Config {
return f.createDefaultConfig()
}

// CreateLoggerFunc is the equivalent of Factory.CreateLogger.
type CreateLoggerFunc func(context.Context, CreateSettings, component.Config) (*zap.Logger, error)

// WithLogger overrides the default no-op logger.
func WithLogger(createLogger CreateLoggerFunc) FactoryOption {
return factoryOptionFunc(func(o *factory) {
o.CreateLoggerFunc = createLogger
})
}

func (f *factory) CreateLogger(ctx context.Context, set CreateSettings, cfg component.Config) (*zap.Logger, error) {
if f.CreateLoggerFunc == nil {
return zap.NewNop(), nil
}
return f.CreateLoggerFunc(ctx, set, cfg)
}

// CreateTracerProviderFunc is the equivalent of Factory.CreateTracerProvider.
type CreateTracerProviderFunc func(context.Context, CreateSettings, component.Config) (trace.TracerProvider, error)

// WithTracerProvider overrides the default no-op tracer provider.
func WithTracerProvider(createTracerProvider CreateTracerProviderFunc) FactoryOption {
return factoryOptionFunc(func(o *factory) {
o.CreateTracerProviderFunc = createTracerProvider
})
}

func (f *factory) CreateTracerProvider(ctx context.Context, set CreateSettings, cfg component.Config) (trace.TracerProvider, error) {
if f.CreateTracerProviderFunc == nil {
return tracenoop.NewTracerProvider(), nil
}
return f.CreateTracerProviderFunc(ctx, set, cfg)
}

func (f *factory) unexportedFactoryFunc() {}

// NewFactory returns a new Factory.
func NewFactory(createDefaultConfig component.CreateDefaultConfigFunc, options ...FactoryOption) Factory {
f := &factory{
createDefaultConfig: createDefaultConfig,
}
for _, op := range options {
op.applyTelemetryFactoryOption(f)
}
return f
}
53 changes: 53 additions & 0 deletions service/telemetry/logger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package telemetry // import "go.opentelemetry.io/collector/service/telemetry"

import (
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)

func newLogger(cfg LogsConfig, options []zap.Option) (*zap.Logger, error) {
// Copied from NewProductionConfig.
zapCfg := &zap.Config{
Level: zap.NewAtomicLevelAt(cfg.Level),
Development: cfg.Development,
Encoding: cfg.Encoding,
EncoderConfig: zap.NewProductionEncoderConfig(),
OutputPaths: cfg.OutputPaths,
ErrorOutputPaths: cfg.ErrorOutputPaths,
DisableCaller: cfg.DisableCaller,
DisableStacktrace: cfg.DisableStacktrace,
InitialFields: cfg.InitialFields,
}

if zapCfg.Encoding == "console" {
// Human-readable timestamps for console format of logs.
zapCfg.EncoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder
}

logger, err := zapCfg.Build(options...)
if err != nil {
return nil, err
}
if cfg.Sampling != nil && cfg.Sampling.Enabled {
logger = newSampledLogger(logger, cfg.Sampling)
}

return logger, nil
}

func newSampledLogger(logger *zap.Logger, sc *LogsSamplingConfig) *zap.Logger {
// Create a logger that samples every Nth message after the first M messages every S seconds
// where N = sc.Thereafter, M = sc.Initial, S = sc.Tick.
opts := zap.WrapCore(func(core zapcore.Core) zapcore.Core {
return zapcore.NewSamplerWithOptions(
core,
sc.Tick,
sc.Initial,
sc.Thereafter,
)
})
return logger.WithOptions(opts)
}
Loading

0 comments on commit b1579a5

Please sign in to comment.