Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion apps/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func (r ReceiverOTLP) Pipelines(ctx context.Context) ([]otel.ReceiverPipeline, e
ExporterTypes: map[string]otel.ExporterType{
"metrics": receiverPipelineType,
"traces": otel.OTel,
"logs": otel.OTel,
"logs": otel.Logs,
},
Receiver: otel.Component{
Type: "otlp",
Expand Down
7 changes: 5 additions & 2 deletions cmd/ops_agent_uap_plugin/service_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ const (
LogsDirectory = "log/google-cloud-ops-agent"
FluentBitStateDiectory = "state/fluent-bit"
FluentBitRuntimeDirectory = "run/google-cloud-ops-agent-fluent-bit"
OtelStateDiectory = "state/opentelemetry-collector"
OtelRuntimeDirectory = "run/google-cloud-ops-agent-opentelemetry-collector"
DefaultPluginStateDirectory = "/var/lib/google-guest-agent/agent_state/plugins/ops-agent-plugin"
)
Expand Down Expand Up @@ -209,7 +210,8 @@ func generateSubagentConfigs(ctx context.Context, runCommand RunCommandFunc, plu
"-service", "otel",
"-in", OpsAgentConfigLocationLinux,
"-out", path.Join(pluginStateDirectory, OtelRuntimeDirectory),
"-logs", path.Join(pluginStateDirectory, LogsDirectory))
"-logs", path.Join(pluginStateDirectory, LogsDirectory),
"-state", path.Join(pluginStateDirectory, OtelStateDiectory))

if output, err := runCommand(otelConfigGenerationCmd); err != nil {
return fmt.Errorf("failed to generate Otel config:\ncommand output: %s\ncommand error: %s", output, err)
Expand All @@ -220,7 +222,8 @@ func generateSubagentConfigs(ctx context.Context, runCommand RunCommandFunc, plu
"-service", "fluentbit",
"-in", OpsAgentConfigLocationLinux,
"-out", path.Join(pluginStateDirectory, FluentBitRuntimeDirectory),
"-logs", path.Join(pluginStateDirectory, LogsDirectory), "-state", path.Join(pluginStateDirectory, FluentBitStateDiectory))
"-logs", path.Join(pluginStateDirectory, LogsDirectory),
"-state", path.Join(pluginStateDirectory, FluentBitStateDiectory))

if output, err := runCommand(fluentBitConfigGenerationCmd); err != nil {
return fmt.Errorf("failed to generate Fluntbit config:\ncommand output: %s\ncommand error: %s", output, err)
Expand Down
4 changes: 4 additions & 0 deletions cmd/ops_agent_windows/main_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,10 @@ func initServices() error {
if err := os.MkdirAll(fluentbitStoragePath, 0644); err != nil {
return err
}
otelStoragePath := filepath.Join(os.Getenv("PROGRAMDATA"), dataDirectory, `run\file_storage`)
if err := os.MkdirAll(otelStoragePath, 0644); err != nil {
return err
}
logDirectory := filepath.Join(os.Getenv("PROGRAMDATA"), dataDirectory, "log")
if err := os.MkdirAll(logDirectory, 0644); err != nil {
return err
Expand Down
76 changes: 54 additions & 22 deletions confgenerator/confgenerator.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,27 +34,42 @@ import (
"github.com/GoogleCloudPlatform/ops-agent/internal/platform"
)

func googleCloudExporter(userAgent string, instrumentationLabels bool, serviceResourceLabels bool) otel.Component {
return otel.Component{
Type: "googlecloud",
Config: map[string]interface{}{
"user_agent": userAgent,
"metric": map[string]interface{}{
// Receivers are responsible for sending fully-qualified metric names.
// NB: If a receiver fails to send a full URL, OT will add the prefix `workload.googleapis.com/{metric_name}`.
// TODO(b/197129428): Write a test to make sure this doesn't happen.
"prefix": "",
// OT calls CreateMetricDescriptor by default. Skip because we want
// descriptors to be created implicitly with new time series.
"skip_create_descriptor": true,
// Omit instrumentation labels, which break agent metrics.
"instrumentation_library_labels": instrumentationLabels,
// Omit service labels, which break agent metrics.
"service_resource_labels": serviceResourceLabels,
"resource_filters": []map[string]interface{}{},
},
// googleCloudExporter builds the "googlecloud" exporter component.
//
// userAgent is passed through as the exporter's user_agent setting.
// instrumentationLabels and serviceResourceLabels populate the metric settings
// instrumentation_library_labels and service_resource_labels respectively.
// When logBuffering is true, the exporter additionally receives a log gRPC
// pool size, a sending queue that references the file storage extension, and
// a "60s" timeout.
func googleCloudExporter(userAgent string, instrumentationLabels, serviceResourceLabels, logBuffering bool) otel.Component {
	metricSettings := map[string]any{
		// Receivers are responsible for sending fully-qualified metric names.
		// NB: If a receiver fails to send a full URL, OT will add the prefix `workload.googleapis.com/{metric_name}`.
		// TODO(b/197129428): Write a test to make sure this doesn't happen.
		"prefix": "",
		// OT calls CreateMetricDescriptor by default. Skip because we want
		// descriptors to be created implicitly with new time series.
		"skip_create_descriptor": true,
		// Caller-controlled; callers pass false to omit instrumentation
		// labels, which break agent metrics.
		"instrumentation_library_labels": instrumentationLabels,
		// Caller-controlled; callers pass false to omit service labels,
		// which break agent metrics.
		"service_resource_labels": serviceResourceLabels,
		"resource_filters":        []map[string]any{},
	}
	cfg := map[string]any{
		"user_agent": userAgent,
		"metric":     metricSettings,
	}

	// Without log buffering the base configuration is all that is needed.
	if !logBuffering {
		return otel.Component{Type: "googlecloud", Config: cfg}
	}

	cfg["log"] = map[string]any{
		"grpc_pool_size": 20,
	}
	// The queue names the file storage extension as its storage backend.
	cfg["sending_queue"] = map[string]any{
		"enabled":       true,
		"num_consumers": 40,
		"storage":       FileStorageExtensionID(),
		"sizer":         "bytes",
		"queue_size":    50000000, // 50M
	}
	cfg["timeout"] = "60s"

	return otel.Component{Type: "googlecloud", Config: cfg}
}

func ConvertPrometheusExporterToOtlpExporter(receiver otel.ReceiverPipeline, ctx context.Context) otel.ReceiverPipeline {
Expand Down Expand Up @@ -122,7 +137,17 @@ func (uc *UnifiedConfig) getOTelLogLevel() string {
return logLevel
}

func (uc *UnifiedConfig) GenerateOtelConfig(ctx context.Context, outDir string) (string, error) {
// FileStorageExtensionID returns the ID of the file_storage extension, the
// value that receivers and exporters use for their "storage" setting.
func FileStorageExtensionID() string {
	const extensionID = "file_storage"
	return extensionID
}

// FileStorageExtensionDirectoryPath returns the directory used by the
// file_storage extension: the "file_storage" subdirectory of stateDir, joined
// with slash-separated path semantics.
func FileStorageExtensionDirectoryPath(stateDir string) string {
	const subdirectory = "file_storage"
	return path.Join(stateDir, subdirectory)
}

func (uc *UnifiedConfig) GenerateOtelConfig(ctx context.Context, outDir, stateDir string) (string, error) {
p := platform.FromContext(ctx)
userAgent, _ := p.UserAgent("Google-Cloud-Ops-Agent-Metrics")
metricVersionLabel, _ := p.VersionLabel("google-cloud-ops-agent-metrics")
Expand All @@ -148,15 +173,22 @@ func (uc *UnifiedConfig) GenerateOtelConfig(ctx context.Context, outDir string)
if expOtlpExporter {
extensions["googleclientauth"] = map[string]interface{}{}
}
if uc.Logging.Service.OTelLogging {
extensions["file_storage"] = map[string]interface{}{
"directory": FileStorageExtensionDirectoryPath(stateDir),
"create_directory": true,
}
}

otelConfig, err := otel.ModularConfig{
LogLevel: uc.getOTelLogLevel(),
ReceiverPipelines: receiverPipelines,
Pipelines: pipelines,
Extensions: extensions,
Exporters: map[otel.ExporterType]otel.Component{
otel.System: googleCloudExporter(userAgent, false, false),
otel.OTel: googleCloudExporter(userAgent, true, true),
otel.System: googleCloudExporter(userAgent, false, false, false),
otel.OTel: googleCloudExporter(userAgent, true, true, false),
otel.Logs: googleCloudExporter(userAgent, true, true, true),
otel.GMP: googleManagedPrometheusExporter(userAgent),
otel.OTLP: otlpExporter(userAgent),
},
Expand Down
2 changes: 1 addition & 1 deletion confgenerator/confgenerator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ func generateConfigs(pc platformConfig, testDir string) (got map[string]string,
}

// Otel configs
otelGeneratedConfig, err := mergedUc.GenerateOtelConfig(ctx, "")
otelGeneratedConfig, err := mergedUc.GenerateOtelConfig(ctx, "", "")
if err != nil {
return
}
Expand Down
2 changes: 1 addition & 1 deletion confgenerator/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -1179,7 +1179,7 @@ func (uc *UnifiedConfig) OTelLoggingSupported(ctx context.Context) bool {
ucLoggingCopy.Logging.Service = &LoggingService{}
}
ucLoggingCopy.Logging.Service.OTelLogging = true
_, err = ucLoggingCopy.GenerateOtelConfig(ctx, "")
_, err = ucLoggingCopy.GenerateOtelConfig(ctx, "", "")
return err == nil
}

Expand Down
2 changes: 1 addition & 1 deletion confgenerator/files.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (uc *UnifiedConfig) GenerateFilesFromConfig(ctx context.Context, service, l
}
}
case "otel":
otelConfig, err := uc.GenerateOtelConfig(ctx, outDir)
otelConfig, err := uc.GenerateOtelConfig(ctx, outDir, stateDir)
if err != nil {
return fmt.Errorf("can't parse configuration: %w", err)
}
Expand Down
14 changes: 9 additions & 5 deletions confgenerator/logging_receivers.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,9 @@ func (r LoggingReceiverFilesMixin) Pipelines(ctx context.Context) ([]otel.Receiv
"preserve_leading_whitespaces": true,
"preserve_trailing_whitespaces": true,
}
if !r.TransformationTest {
receiver_config["storage"] = FileStorageExtensionID()
}
if i := r.WildcardRefreshInterval; i != nil {
receiver_config["poll_interval"] = i.String()
}
Expand Down Expand Up @@ -241,7 +244,7 @@ func (r LoggingReceiverFilesMixin) Pipelines(ctx context.Context) ([]otel.Receiv
"logs": nil,
},
ExporterTypes: map[string]otel.ExporterType{
"logs": otel.OTel,
"logs": otel.Logs,
},
}}, nil
}
Expand Down Expand Up @@ -366,7 +369,7 @@ func (r LoggingReceiverSyslog) Pipelines(ctx context.Context) ([]otel.ReceiverPi
},

ExporterTypes: map[string]otel.ExporterType{
"logs": otel.OTel,
"logs": otel.Logs,
},
}}, nil
}
Expand Down Expand Up @@ -606,7 +609,7 @@ func (r LoggingReceiverWindowsEventLog) Pipelines(ctx context.Context) ([]otel.R
"start_at": "beginning",
"poll_interval": "1s",
"ignore_channel_errors": true,
// TODO: Configure storage
"storage": FileStorageExtensionID(),
}

var p []otel.Component
Expand All @@ -633,7 +636,7 @@ func (r LoggingReceiverWindowsEventLog) Pipelines(ctx context.Context) ([]otel.R
"logs": p,
},
ExporterTypes: map[string]otel.ExporterType{
"logs": otel.OTel,
"logs": otel.Logs,
},
})
}
Expand Down Expand Up @@ -966,6 +969,7 @@ func (r LoggingReceiverSystemd) Pipelines(ctx context.Context) ([]otel.ReceiverP
receiver_config := map[string]any{
"start_at": "beginning",
"priority": "debug",
"storage": FileStorageExtensionID(),
}

modify_fields_processors, err := LoggingProcessorModifyFields{
Expand Down Expand Up @@ -1011,7 +1015,7 @@ func (r LoggingReceiverSystemd) Pipelines(ctx context.Context) ([]otel.ReceiverP
},

ExporterTypes: map[string]otel.ExporterType{
"logs": otel.OTel,
"logs": otel.Logs,
},
}}, nil
}
Expand Down
22 changes: 16 additions & 6 deletions confgenerator/otel/modular.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ const (
// another exporter type.
OTel ExporterType = iota
System
Logs
GMP
OTLP
)
Expand All @@ -51,6 +52,8 @@ func (t ExporterType) Name() string {
return ""
} else if t == OTel {
return "otel"
} else if t == Logs {
return "logs"
} else if t == OTLP {
return "otlp"
} else {
Expand Down Expand Up @@ -221,6 +224,15 @@ func (c ModularConfig) Generate(ctx context.Context, expOtlpExporter bool) (stri
SetIfMissing: resourceDetectionProcessors[SetIfMissing].name("_global_1"),
}

exporterTypeProcessors := map[ExporterType]Component{
Logs: BatchProcessor(),
OTLP: CopyHostIDToInstanceID(), // b/459468648
}
exporterTypeProcessorNames := map[ExporterType]string{
Logs: exporterTypeProcessors[Logs].name("_global_2"),
OTLP: exporterTypeProcessors[OTLP].name("_global_3"),
}

for prefix, pipeline := range c.Pipelines {
// Receiver pipelines need to be instantiated once, since they might have more than one type.
// We do this work more than once if it's in more than one pipeline, but it should just overwrite the same names.
Expand Down Expand Up @@ -251,14 +263,12 @@ func (c ModularConfig) Generate(ctx context.Context, expOtlpExporter bool) (stri
if name, ok := resourceDetectionProcessorNames[rdm]; ok {
processorNames = append(processorNames, name)
processors[name] = resourceDetectionProcessors[rdm].Config
// b/459468648
if expOtlpExporter {
copyProcessor := CopyHostIDToInstanceID()
processorNames = append(processorNames, copyProcessor.name("_global_0"))
processors[copyProcessor.name("_global_0")] = copyProcessor.Config
}
}
exporterType := receiverPipeline.ExporterTypes[pipeline.Type]
if name, ok := exporterTypeProcessorNames[exporterType]; ok {
processorNames = append(processorNames, name)
processors[name] = exporterTypeProcessors[exporterType].Config
}
if _, ok := exporterNames[exporterType]; !ok {
exporter := c.Exporters[exporterType]
name := exporter.name(exporterType.Name())
Expand Down
11 changes: 11 additions & 0 deletions confgenerator/otel/processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,17 @@ func MetricsRemoveServiceAttributes() Component {
}
}

// BatchProcessor returns a "batch" processor component whose batch size and
// maximum batch size are both 1000 and whose timeout is "200s".
//
// NOTE(review): 200s is a long flush interval for a batch processor; confirm
// that seconds (rather than milliseconds) is intended.
func BatchProcessor() Component {
	const batchSize = 1000
	settings := map[string]any{
		"send_batch_size":     batchSize,
		"send_batch_max_size": batchSize,
		"timeout":             "200s",
	}
	return Component{Type: "batch", Config: settings}
}

func CopyHostIDToInstanceID() Component {
return Component{
Type: "transform",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,23 @@ exporters:
service_resource_labels: false
skip_create_descriptor: true
user_agent: Google-Cloud-Ops-Agent-Metrics/latest (BuildDistro=build_distro;Platform=linux;ShortName=linux_platform;ShortVersion=linux_platform_version)
googlecloud/logs:
log:
grpc_pool_size: 20
metric:
instrumentation_library_labels: true
prefix: ""
resource_filters: []
service_resource_labels: true
skip_create_descriptor: true
sending_queue:
enabled: true
num_consumers: 40
queue_size: 50000000
sizer: bytes
storage: file_storage
timeout: 60s
user_agent: Google-Cloud-Ops-Agent-Metrics/latest (BuildDistro=build_distro;Platform=linux;ShortName=linux_platform;ShortVersion=linux_platform_version)
googlecloud/otel:
metric:
instrumentation_library_labels: true
Expand All @@ -23,6 +40,10 @@ processors:
agentmetrics/hostmetrics_0:
blank_label_metrics:
- system.cpu.utilization
batch/_global_2:
send_batch_max_size: 1000
send_batch_size: 1000
timeout: 200s
cumulativetodelta/loggingmetrics_4:
include:
match_type: strict
Expand Down Expand Up @@ -732,10 +753,11 @@ service:
pipelines:
logs/logs_otlp_otlp:
exporters:
- googlecloud/otel
- googlecloud/logs
processors:
- transform/otlp_0
- resourcedetection/_global_1
- batch/_global_2
receivers:
- otlp/otlp
metrics/default__pipeline_hostmetrics:
Expand Down
Loading