Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dinesh.gurumurthy/otel 2150 2 #33082

Draft
wants to merge 6 commits into
base: dinesh.gurumurthy/OTEL-2150
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 1 addition & 6 deletions comp/otelcol/collector/impl/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"github.com/DataDog/datadog-agent/comp/core/config"
corelog "github.com/DataDog/datadog-agent/comp/core/log/def"
tagger "github.com/DataDog/datadog-agent/comp/core/tagger/def"
"github.com/DataDog/datadog-agent/comp/core/workloadmeta/collectors/util"
compdef "github.com/DataDog/datadog-agent/comp/def"
collectorcontrib "github.com/DataDog/datadog-agent/comp/otelcol/collector-contrib/def"
collector "github.com/DataDog/datadog-agent/comp/otelcol/collector/def"
Expand Down Expand Up @@ -124,17 +123,13 @@ func newConfigProviderSettings(uris []string, converter confmap.Converter, enhan
}
}

func generateID(group, resource, namespace, name string) string {
return string(util.GenerateKubeMetadataEntityID(group, resource, namespace, name))
}

func addFactories(reqs Requires, factories otelcol.Factories) {
if v, ok := reqs.LogsAgent.Get(); ok {
factories.Exporters[datadogexporter.Type] = datadogexporter.NewFactory(reqs.TraceAgent, reqs.Serializer, v, reqs.SourceProvider, reqs.StatsdClientWrapper)
} else {
factories.Exporters[datadogexporter.Type] = datadogexporter.NewFactory(reqs.TraceAgent, reqs.Serializer, nil, reqs.SourceProvider, reqs.StatsdClientWrapper)
}
factories.Processors[infraattributesprocessor.Type] = infraattributesprocessor.NewFactory(reqs.Tagger, generateID)
factories.Processors[infraattributesprocessor.Type] = infraattributesprocessor.NewFactoryForAgent(reqs.Tagger)
factories.Connectors[component.MustNewType("datadog")] = datadogconnector.NewFactory()
factories.Extensions[ddextension.Type] = ddextension.NewFactoryForAgent(&factories, newConfigProviderSettings(reqs.URIs, reqs.Converter, false))
}
Expand Down
2 changes: 1 addition & 1 deletion comp/otelcol/ddflareextension/impl/configstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import (
// this is only used for config unmarshalling.
func addFactories(factories otelcol.Factories) {
factories.Exporters[datadogexporter.Type] = datadogexporter.NewFactory(nil, nil, nil, nil, nil)
factories.Processors[infraattributesprocessor.Type] = infraattributesprocessor.NewFactory(nil, nil)
factories.Processors[infraattributesprocessor.Type] = infraattributesprocessor.NewFactoryForAgent(nil, nil)
factories.Connectors[component.MustNewType("datadog")] = datadogconnector.NewFactory()
factories.Extensions[Type] = NewFactoryForAgent(nil, otelcol.ConfigProviderSettings{})
}
Expand Down
8 changes: 1 addition & 7 deletions comp/otelcol/otlp/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import (
"github.com/DataDog/datadog-agent/comp/core/config"
tagger "github.com/DataDog/datadog-agent/comp/core/tagger/def"
"github.com/DataDog/datadog-agent/comp/core/tagger/types"
"github.com/DataDog/datadog-agent/comp/core/workloadmeta/collectors/util"
"github.com/DataDog/datadog-agent/comp/otelcol/otlp/components/exporter/logsagentexporter"
"github.com/DataDog/datadog-agent/comp/otelcol/otlp/components/exporter/serializerexporter"
"github.com/DataDog/datadog-agent/comp/otelcol/otlp/components/processor/infraattributesprocessor"
Expand Down Expand Up @@ -93,11 +92,6 @@ func (t *tagEnricher) Enrich(_ context.Context, extraTags []string, dimensions *
return enrichedTags
}

func generateID(group, resource, namespace, name string) string {

return string(util.GenerateKubeMetadataEntityID(group, resource, namespace, name))
}

func getComponents(s serializer.MetricSerializer, logsAgentChannel chan *message.Message, tagger tagger.Component) (
otelcol.Factories,
error,
Expand Down Expand Up @@ -133,7 +127,7 @@ func getComponents(s serializer.MetricSerializer, logsAgentChannel chan *message

processorFactories := []processor.Factory{batchprocessor.NewFactory()}
if tagger != nil {
processorFactories = append(processorFactories, infraattributesprocessor.NewFactory(tagger, generateID))
processorFactories = append(processorFactories, infraattributesprocessor.NewFactoryForAgent(tagger))
}
processors, err := processor.MakeFactoryMap(processorFactories...)
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,14 @@ var unifiedServiceTagMap = map[string][]string{
tags.Version: {conventions.AttributeServiceVersion},
}

// GenerateKubeMetadataEntityID is a function that generates an entity ID for a Kubernetes resource.
type GenerateKubeMetadataEntityID func(group, resource, namespace, name string) string

// processInfraTags collects entities/tags from resourceAttributes and adds infra tags to resourceAttributes
func processInfraTags(
logger *zap.Logger,
tagger taggerClient,
cardinality types.TagCardinality,
generateID GenerateKubeMetadataEntityID,
resourceAttributes pcommon.Map,
) {
entityIDs := entityIDsFromAttributes(resourceAttributes, generateID)
entityIDs := entityIDsFromAttributes(resourceAttributes)
tagMap := make(map[string]string)

// Get all unique tags from resource attributes and global tags
Expand Down Expand Up @@ -91,7 +87,7 @@ func processInfraTags(
// TODO: Replace OriginIDFromAttributes in opentelemetry-mapping-go with this method
// entityIDsFromAttributes gets the entity IDs from resource attributes.
// If not found, an empty string slice is returned.
func entityIDsFromAttributes(attrs pcommon.Map, generateID GenerateKubeMetadataEntityID) []types.EntityID {
func entityIDsFromAttributes(attrs pcommon.Map) []types.EntityID {
entityIDs := make([]types.EntityID, 0, 8)
// Prefixes come from pkg/util/kubernetes/kubelet and pkg/util/containers.
if containerID, ok := attrs.Get(conventions.AttributeContainerID); ok {
Expand All @@ -113,11 +109,11 @@ func entityIDsFromAttributes(attrs pcommon.Map, generateID GenerateKubeMetadataE
}
}
if namespace, ok := attrs.Get(conventions.AttributeK8SNamespaceName); ok {
entityIDs = append(entityIDs, types.NewEntityID(types.KubernetesMetadata, generateID("", "namespaces", "", namespace.AsString())))
entityIDs = append(entityIDs, types.NewEntityID(types.KubernetesMetadata, fmt.Sprintf("/namespaces//%s", namespace.AsString())))
}

if nodeName, ok := attrs.Get(conventions.AttributeK8SNodeName); ok {
entityIDs = append(entityIDs, types.NewEntityID(types.KubernetesMetadata, generateID("", "nodes", "", nodeName.AsString())))
entityIDs = append(entityIDs, types.NewEntityID(types.KubernetesMetadata, fmt.Sprintf("/nodes//%s", nodeName.AsString())))
}
if podUID, ok := attrs.Get(conventions.AttributeK8SPodUID); ok {
entityIDs = append(entityIDs, types.NewEntityID(types.KubernetesPodUID, podUID.AsString()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,7 @@ func TestLoadingConfigStrictLogs(t *testing.T) {
for _, tt := range tests {
t.Run(tt.id.String(), func(t *testing.T) {
tc := newTestTaggerClient()
gc := newTestGenerateIDClient().generateID
f := NewFactory(tc, gc)
f := NewFactoryForAgent(tc)
cfg := f.CreateDefaultConfig()

sub, err := cm.Sub(tt.id.String())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,23 @@ package infraattributesprocessor

import (
"context"
"fmt"
"strings"
"sync"

"github.com/DataDog/datadog-agent/comp/core/tagger/types"
"github.com/DataDog/datadog-agent/comp/core/config"
log "github.com/DataDog/datadog-agent/comp/core/log/def"
"go.uber.org/fx"

logfx "github.com/DataDog/datadog-agent/comp/core/log/fx"
tagger "github.com/DataDog/datadog-agent/comp/core/tagger/def"
remoteTaggerfx "github.com/DataDog/datadog-agent/comp/core/tagger/fx-remote"
taggerTypes "github.com/DataDog/datadog-agent/comp/core/tagger/types"
"github.com/DataDog/datadog-agent/comp/core/telemetry/telemetryimpl"
"github.com/DataDog/datadog-agent/pkg/api/security"
pkgconfigmodel "github.com/DataDog/datadog-agent/pkg/config/model"
pkgconfigsetup "github.com/DataDog/datadog-agent/pkg/config/setup"
"github.com/DataDog/datadog-agent/pkg/util/fxutil"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/processor"
Expand All @@ -18,18 +32,66 @@ import (

var processorCapabilities = consumer.Capabilities{MutatesData: true}

// TODO: Remove tagger and generateID as depenendencies to enable future import of
// infraattributesprocessor by external go packages like ocb
type factory struct {
tagger taggerClient
generateID GenerateKubeMetadataEntityID
tagger taggerClient
mu sync.Mutex
}

func (f *factory) initializeTaggerClient() error {
// Ensure that the tagger is initialized only once.
f.mu.Lock()
defer f.mu.Unlock()
if f.tagger != nil {
return nil
}
var client taggerClient
app := fx.New(
fx.Provide(func() config.Component {
pkgconfig := pkgconfigsetup.Datadog()
if pkgconfig == nil {
pkgconfig = pkgconfigmodel.NewConfig("DD", "DD", strings.NewReplacer("_", "."))
pkgconfigsetup.InitConfig(pkgconfig)
}
return pkgconfig
}),
fx.Provide(func(_ config.Component) log.Params {
return log.ForDaemon("otelcol", "log_file", pkgconfigsetup.DefaultOTelAgentLogFile)
}),
logfx.Module(),
telemetryimpl.Module(),
fxutil.FxAgentBase(),
remoteTaggerfx.Module(tagger.RemoteParams{
RemoteTarget: func(c config.Component) (string, error) {
return fmt.Sprintf(":%v", c.GetInt("cmd_port")), nil
},
RemoteTokenFetcher: func(c config.Component) func() (string, error) {
return func() (string, error) {
return security.FetchAuthToken(c)
}
},
RemoteFilter: taggerTypes.NewMatchAllFilter(),
}),
fx.Provide(func(t tagger.Component) taggerClient {
return t
}),
fx.Populate(&client),
)
if err := app.Err(); err != nil {
return err
}
f.tagger = client
return nil
}

// NewFactory returns a new factory for the InfraAttributes processor.
func NewFactory(tagger taggerClient, generateID GenerateKubeMetadataEntityID) processor.Factory {
func NewFactory() processor.Factory {
return NewFactoryForAgent(nil)
}

// NewFactoryForAgent returns a new factory for the InfraAttributes processor.
func NewFactoryForAgent(tagger taggerClient) processor.Factory {
f := &factory{
tagger: tagger,
generateID: generateID,
tagger: tagger,
}

return processor.NewFactory(
Expand All @@ -43,7 +105,7 @@ func NewFactory(tagger taggerClient, generateID GenerateKubeMetadataEntityID) pr

func (f *factory) createDefaultConfig() component.Config {
return &Config{
Cardinality: types.LowCardinality,
Cardinality: taggerTypes.LowCardinality,
}
}

Expand All @@ -53,7 +115,13 @@ func (f *factory) createMetricsProcessor(
cfg component.Config,
nextConsumer consumer.Metrics,
) (processor.Metrics, error) {
iap, err := newInfraAttributesMetricProcessor(set, cfg.(*Config), f.tagger, f.generateID)
if f.tagger == nil {
err := f.initializeTaggerClient()
if err != nil {
return nil, err
}
}
iap, err := newInfraAttributesMetricProcessor(set, cfg.(*Config), f.tagger)
if err != nil {
return nil, err
}
Expand All @@ -72,7 +140,13 @@ func (f *factory) createLogsProcessor(
cfg component.Config,
nextConsumer consumer.Logs,
) (processor.Logs, error) {
iap, err := newInfraAttributesLogsProcessor(set, cfg.(*Config), f.tagger, f.generateID)
if f.tagger == nil {
err := f.initializeTaggerClient()
if err != nil {
return nil, err
}
}
iap, err := newInfraAttributesLogsProcessor(set, cfg.(*Config), f.tagger)
if err != nil {
return nil, err
}
Expand All @@ -91,7 +165,13 @@ func (f *factory) createTracesProcessor(
cfg component.Config,
nextConsumer consumer.Traces,
) (processor.Traces, error) {
iap, err := newInfraAttributesSpanProcessor(set, cfg.(*Config), f.tagger, f.generateID)
if f.tagger == nil {
err := f.initializeTaggerClient()
if err != nil {
return nil, err
}
}
iap, err := newInfraAttributesSpanProcessor(set, cfg.(*Config), f.tagger)
if err != nil {
return nil, err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,15 @@ import (

func TestType(t *testing.T) {
tc := newTestTaggerClient()
gc := newTestGenerateIDClient().generateID
factory := NewFactory(tc, gc)
factory := NewFactoryForAgent(tc)
pType := factory.Type()

assert.Equal(t, pType, Type)
}

func TestCreateDefaultConfig(t *testing.T) {
tc := newTestTaggerClient()
gc := newTestGenerateIDClient().generateID
factory := NewFactory(tc, gc)
factory := NewFactoryForAgent(tc)
cfg := factory.CreateDefaultConfig()
assert.NoError(t, componenttest.CheckConfigStruct(cfg))
}
Expand All @@ -54,11 +52,10 @@ func TestCreateProcessors(t *testing.T) {
cm, err := confmaptest.LoadConf(filepath.Join("testdata", tt.configName))
require.NoError(t, err)
tc := newTestTaggerClient()
gc := newTestGenerateIDClient().generateID

for k := range cm.ToStringMap() {
// Check if all processor variations that are defined in test config can be actually created
factory := NewFactory(tc, gc)
factory := NewFactoryForAgent(tc)
cfg := factory.CreateDefaultConfig()

sub, err := cm.Sub(k)
Expand Down Expand Up @@ -94,3 +91,10 @@ func TestCreateProcessors(t *testing.T) {
})
}
}

func TestInitializeTagger(t *testing.T) {
f := &factory{}
err := f.initializeTaggerClient()
assert.NoError(t, err)
assert.NotNil(t, f.tagger)
}
Loading
Loading