Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 26 additions & 29 deletions apps/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,38 +149,35 @@ func init() {
confgenerator.MetricsReceiverTypes.RegisterType(func() confgenerator.MetricsReceiver { return &MetricsReceiverElasticsearch{} })
}

type LoggingProcessorElasticsearchJson struct {
confgenerator.ConfigComponent `yaml:",inline"`
type LoggingProcessorMacroElasticsearchJson struct {
}

func (LoggingProcessorElasticsearchJson) Type() string {
func (LoggingProcessorMacroElasticsearchJson) Type() string {
return "elasticsearch_json"
}

func (p LoggingProcessorElasticsearchJson) Components(ctx context.Context, tag, uid string) []fluentbit.Component {
c := []fluentbit.Component{}

func (p LoggingProcessorMacroElasticsearchJson) Expand(ctx context.Context) []confgenerator.InternalLoggingProcessor {
// sample log line:
// {"type": "server", "timestamp": "2022-01-17T18:31:47,365Z", "level": "INFO", "component": "o.e.n.Node", "cluster.name": "elasticsearch", "node.name": "ubuntu-jammy", "message": "initialized" }
// Logs are formatted based on configuration (log4j);
// See https://artifacts.elastic.co/javadoc/org/elasticsearch/elasticsearch/7.16.2/org/elasticsearch/common/logging/ESJsonLayout.html
// for general layout, and https://www.elastic.co/guide/en/elasticsearch/reference/current/logging.html for general configuration of logging

jsonParser := &confgenerator.LoggingProcessorParseJson{
ParserShared: confgenerator.ParserShared{
TimeKey: "timestamp",
TimeFormat: "%Y-%m-%dT%H:%M:%S,%L%z",
processors := []confgenerator.InternalLoggingProcessor{
confgenerator.LoggingProcessorParseJson{
ParserShared: confgenerator.ParserShared{
TimeKey: "timestamp",
TimeFormat: "%Y-%m-%dT%H:%M:%S,%L%z",
},
},
p.severityParser(),
}

c = append(c, jsonParser.Components(ctx, tag, uid)...)
c = append(c, p.severityParser(ctx, tag, uid)...)
c = append(c, p.nestingProcessors(ctx, tag, uid)...)
processors = append(processors, p.nestingProcessors()...)

return c
return processors
}

func (p LoggingProcessorElasticsearchJson) severityParser(ctx context.Context, tag, uid string) []fluentbit.Component {
func (p LoggingProcessorMacroElasticsearchJson) severityParser() confgenerator.InternalLoggingProcessor {
return confgenerator.LoggingProcessorModifyFields{
Fields: map[string]*confgenerator.ModifyField{
"severity": {
Expand All @@ -199,10 +196,10 @@ func (p LoggingProcessorElasticsearchJson) severityParser(ctx context.Context, t
},
InstrumentationSourceLabel: instrumentationSourceValue(p.Type()),
},
}.Components(ctx, tag, uid)
}
}

func (p LoggingProcessorElasticsearchJson) nestingProcessors(ctx context.Context, tag, uid string) []fluentbit.Component {
func (p LoggingProcessorMacroElasticsearchJson) nestingProcessors() []confgenerator.InternalLoggingProcessor {
// The majority of these prefixes come from here:
// https://www.elastic.co/guide/en/elasticsearch/reference/7.16/audit-event-types.html#audit-event-attributes
// Non-audit logs are formatted using the layout documented here, giving the "cluster" prefix:
Expand All @@ -223,17 +220,17 @@ func (p LoggingProcessorElasticsearchJson) nestingProcessors(ctx context.Context
"cluster",
}

c := make([]fluentbit.Component, 0, len(prefixes))
processors := make([]confgenerator.InternalLoggingProcessor, 0, len(prefixes))
for _, prefix := range prefixes {
nestProcessor := confgenerator.LoggingProcessorNestWildcard{
Wildcard: fmt.Sprintf("%s.*", prefix),
NestUnder: prefix,
RemovePrefix: fmt.Sprintf("%s.", prefix),
}
c = append(c, nestProcessor.Components(ctx, tag, uid)...)
processors = append(processors, nestProcessor)
}

return c
return processors
}

type LoggingProcessorElasticsearchGC struct {
Expand Down Expand Up @@ -271,12 +268,12 @@ func (p LoggingProcessorElasticsearchGC) Components(ctx context.Context, tag, ui
return c
}

type LoggingReceiverElasticsearchJson struct {
LoggingProcessorElasticsearchJson `yaml:",inline"`
ReceiverMixin confgenerator.LoggingReceiverFilesMixin `yaml:",inline"`
type LoggingReceiverMacroElasticsearchJson struct {
LoggingProcessorMacroElasticsearchJson `yaml:",inline"`
ReceiverMixin confgenerator.LoggingReceiverFilesMixin `yaml:",inline" validate:"structonly"`
}

func (r LoggingReceiverElasticsearchJson) Components(ctx context.Context, tag string) []fluentbit.Component {
func (r LoggingReceiverMacroElasticsearchJson) Expand(ctx context.Context) (confgenerator.InternalLoggingReceiver, []confgenerator.InternalLoggingProcessor) {
if len(r.ReceiverMixin.IncludePaths) == 0 {
// Default JSON logs for Elasticsearch
r.ReceiverMixin.IncludePaths = []string{
Expand All @@ -296,7 +293,6 @@ func (r LoggingReceiverElasticsearchJson) Components(ctx context.Context, tag st
// -- snip --
// "at org.elasticsearch.bootstrap.Elasticsearch.init(Elasticsearch.java:166) ~[elasticsearch-7.16.2.jar:7.16.2]",
// "... 6 more"] }

r.ReceiverMixin.MultilineRules = []confgenerator.MultilineRule{
{
StateName: "start_state",
Expand All @@ -310,8 +306,7 @@ func (r LoggingReceiverElasticsearchJson) Components(ctx context.Context, tag st
},
}

c := r.ReceiverMixin.Components(ctx, tag)
return append(c, r.LoggingProcessorElasticsearchJson.Components(ctx, tag, "elasticsearch_json")...)
return &r.ReceiverMixin, r.LoggingProcessorMacroElasticsearchJson.Expand(ctx)
}

type LoggingReceiverElasticsearchGC struct {
Expand All @@ -332,6 +327,8 @@ func (r LoggingReceiverElasticsearchGC) Components(ctx context.Context, tag stri
}

func init() {
confgenerator.LoggingReceiverTypes.RegisterType(func() confgenerator.LoggingReceiver { return &LoggingReceiverElasticsearchJson{} })
confgenerator.RegisterLoggingReceiverMacro(func() LoggingReceiverMacroElasticsearchJson {
return LoggingReceiverMacroElasticsearchJson{}
})
confgenerator.LoggingReceiverTypes.RegisterType(func() confgenerator.LoggingReceiver { return &LoggingReceiverElasticsearchGC{} })
}
8 changes: 4 additions & 4 deletions confgenerator/testdata/feature/golden.csv
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,6 @@ App,Field,Override,
*apps.LoggingReceiverElasticsearchGC,confgenerator.LoggingReceiverFilesMixin.BufferInMemory,
*apps.LoggingReceiverElasticsearchGC,confgenerator.LoggingReceiverFilesMixin.RecordLogFilePath,
*apps.LoggingReceiverElasticsearchGC,confgenerator.LoggingReceiverFilesMixin.WildcardRefreshInterval,
*apps.LoggingReceiverElasticsearchJson,apps.LoggingProcessorElasticsearchJson.confgenerator.ConfigComponent.Type,
*apps.LoggingReceiverElasticsearchJson,confgenerator.LoggingReceiverFilesMixin.BufferInMemory,
*apps.LoggingReceiverElasticsearchJson,confgenerator.LoggingReceiverFilesMixin.RecordLogFilePath,
*apps.LoggingReceiverElasticsearchJson,confgenerator.LoggingReceiverFilesMixin.WildcardRefreshInterval,
*apps.LoggingReceiverHadoop,apps.LoggingProcessorHadoop.confgenerator.ConfigComponent.Type,
*apps.LoggingReceiverHadoop,confgenerator.LoggingReceiverFilesMixin.BufferInMemory,
*apps.LoggingReceiverHadoop,confgenerator.LoggingReceiverFilesMixin.RecordLogFilePath,
Expand Down Expand Up @@ -259,3 +255,7 @@ App,Field,Override,
*confgenerator.loggingReceiverMacroAdapter[*github.com/GoogleCloudPlatform/ops-agent/confgenerator.loggingFilesProcessorMacroAdapter[github.com/GoogleCloudPlatform/ops-agent/apps.LoggingProcessorMacroRedis]],confgenerator.ConfigComponent.Type,
*confgenerator.loggingReceiverMacroAdapter[*github.com/GoogleCloudPlatform/ops-agent/confgenerator.loggingFilesProcessorMacroAdapter[github.com/GoogleCloudPlatform/ops-agent/apps.LoggingProcessorMacroVarnish]],ReceiverMacro,
*confgenerator.loggingReceiverMacroAdapter[*github.com/GoogleCloudPlatform/ops-agent/confgenerator.loggingFilesProcessorMacroAdapter[github.com/GoogleCloudPlatform/ops-agent/apps.LoggingProcessorMacroVarnish]],confgenerator.ConfigComponent.Type,
*confgenerator.loggingReceiverMacroAdapter[github.com/GoogleCloudPlatform/ops-agent/apps.LoggingReceiverMacroElasticsearchJson],apps.LoggingReceiverMacroElasticsearchJson.confgenerator.LoggingReceiverFilesMixin.BufferInMemory,
*confgenerator.loggingReceiverMacroAdapter[github.com/GoogleCloudPlatform/ops-agent/apps.LoggingReceiverMacroElasticsearchJson],apps.LoggingReceiverMacroElasticsearchJson.confgenerator.LoggingReceiverFilesMixin.RecordLogFilePath,
*confgenerator.loggingReceiverMacroAdapter[github.com/GoogleCloudPlatform/ops-agent/apps.LoggingReceiverMacroElasticsearchJson],apps.LoggingReceiverMacroElasticsearchJson.confgenerator.LoggingReceiverFilesMixin.WildcardRefreshInterval,
*confgenerator.loggingReceiverMacroAdapter[github.com/GoogleCloudPlatform/ops-agent/apps.LoggingReceiverMacroElasticsearchJson],confgenerator.ConfigComponent.Type,
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
- type: elasticsearch_json
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{"type": "server", "timestamp": "2024-06-10T12:34:56,789Z", "level": "INFO", "component": "o.e.n.Node", "cluster.name": "elasticsearch", "node.name": "node-1", "message": "starting ..."}
{"type": "server", "timestamp": "2024-06-10T12:35:00,321Z", "level": "WARN", "component": "o.e.d.z.ZenDiscovery", "node.name": "node-1", "message": "failed to connect to master", "user.run_by.name": "admin", "event.action": "connect"}
{"type": "server", "timestamp": "2024-06-10T12:35:10,123Z", "level": "ERROR", "component": "o.e.b.Bootstrap", "node.name": "node-1", "message": "Exception", "error.type": "java.lang.IllegalStateException", "error.message": "failed to obtain node locks"}
{"type": "server", "timestamp": "2024-06-10T12:35:15,456Z", "level": "DEBUG", "component": "o.e.n.Node", "node.name": "node-1", "message": "stopping ..."}
{"type": "server", "timestamp": "2024-06-10T12:35:16,789Z", "level": "TRACE", "component": "o.e.n.Node", "node.name": "node-1", "message": "stopped"}
{"type": "server", "timestamp": "2024-06-10T12:35:17,123Z", "level": "DEPRECATION", "component": "o.e.n.Node", "node.name": "node-1", "message": "deprecated setting used"}
{"type": "server", "timestamp": "2024-06-10T12:35:18,456Z", "level": "CRITICAL", "component": "o.e.n.Node", "node.name": "node-1", "message": "critical error occurred"}
{"type": "server", "timestamp": "2024-06-10T12:35:19,456Z", "level": "FATAL", "component": "o.e.n.Node", "node.name": "node-1", "message": "fatal error occurred"}
{"type": "server", "timestamp": "2024-06-10T12:35:20,456Z", "level": "UNKNOWN", "component": "o.e.n.Node", "node.name": "node-1", "message": "unknown log level"}
{"type": "server", "timestamp": "2024-06-10T12:35:21,456Z", "level": "INFO", "message": "missing component and node name"}
{"type": "server", "timestamp": "2024-06-10T12:35:22,456Z", "level": "INFO", "component": "o.e.n.Node", "cluster.name": "elasticsearch", "message": "missing node name"}
{"type": "server", "timestamp": "2024-06-10T12:35:23,456Z", "level": "INFO", "component": "o.e.n.Node", "node.name": "node-1", "message": "missing cluster name"}
{"type": "server", "timestamp": "2024-06-10T12:35:24,456Z", "level": "INFO", "component": "o.e.n.Node", "node.name": "node-1", "message": "nested fields", "user.run_by.name": "testuser", "authentication.token.id": "abc123"}
{"type": "server", "timestamp": "2024-06-10T12:35:25,456Z", "level": "ERROR", "component": "o.e.b.ElasticsearchUncaughtExceptionHandler", "node.name": "node-1", "message": "uncaught exception in thread [main]", "stacktrace": ["org.elasticsearch.bootstrap.StartupException: java.lang.IllegalArgumentException: unknown setting [invalid.key]", "at org.elasticsearch.bootstrap.Elasticsearch.init(Elasticsearch.java:166)", "... 6 more"]}
{"type": "server", "timestamp": "2024-06-10T12:35:26,456Z", "level": "INFO", "component": "o.e.n.Node", "node.name": "node-1", "message": "invalid timestamp", "timestamp": "not-a-timestamp"}
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
- entries:
Copy link
Contributor

@franciscovalentecastro franciscovalentecastro Jul 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: This is a general comment that i mentioned also in #1978 (comment). Another way of verifying that the refactor is correct is that the output_fluentbit.yaml should be the same before and after the update to elasticsearch.go.

- jsonPayload:
cluster:
name: elasticsearch
component: o.e.n.Node
level: INFO
message: starting ...
node:
name: node-1
type: server
labels:
logging.googleapis.com/instrumentation_source: agent.googleapis.com/elasticsearch_json
logName: projects/my-project/logs/transformation_test
severity: 200.0
timestamp: 2024-06-10T12:34:56.789000000Z
- jsonPayload:
component: o.e.d.z.ZenDiscovery
event:
action: connect
level: WARN
message: failed to connect to master
node:
name: node-1
type: server
user:
run_by:
name: admin
labels:
logging.googleapis.com/instrumentation_source: agent.googleapis.com/elasticsearch_json
logName: projects/my-project/logs/transformation_test
severity: 400.0
timestamp: 2024-06-10T12:35:00.321000000Z
- jsonPayload:
component: o.e.b.Bootstrap
error.message: failed to obtain node locks
error.type: java.lang.IllegalStateException
level: ERROR
message: Exception
node:
name: node-1
type: server
labels:
logging.googleapis.com/instrumentation_source: agent.googleapis.com/elasticsearch_json
logName: projects/my-project/logs/transformation_test
severity: 500.0
timestamp: 2024-06-10T12:35:10.123000000Z
- jsonPayload:
component: o.e.n.Node
level: DEBUG
message: stopping ...
node:
name: node-1
type: server
labels:
logging.googleapis.com/instrumentation_source: agent.googleapis.com/elasticsearch_json
logName: projects/my-project/logs/transformation_test
severity: 100.0
timestamp: 2024-06-10T12:35:15.456000000Z
- jsonPayload:
component: o.e.n.Node
level: TRACE
message: stopped
node:
name: node-1
type: server
labels:
logging.googleapis.com/instrumentation_source: agent.googleapis.com/elasticsearch_json
logName: projects/my-project/logs/transformation_test
severity: 100.0
timestamp: 2024-06-10T12:35:16.789000000Z
- jsonPayload:
component: o.e.n.Node
level: DEPRECATION
message: deprecated setting used
node:
name: node-1
type: server
labels:
logging.googleapis.com/instrumentation_source: agent.googleapis.com/elasticsearch_json
logName: projects/my-project/logs/transformation_test
severity: 400.0
timestamp: 2024-06-10T12:35:17.123000000Z
- jsonPayload:
component: o.e.n.Node
level: CRITICAL
message: critical error occurred
node:
name: node-1
type: server
labels:
logging.googleapis.com/instrumentation_source: agent.googleapis.com/elasticsearch_json
logName: projects/my-project/logs/transformation_test
severity: 500.0
timestamp: 2024-06-10T12:35:18.456000000Z
- jsonPayload:
component: o.e.n.Node
level: FATAL
message: fatal error occurred
node:
name: node-1
type: server
labels:
logging.googleapis.com/instrumentation_source: agent.googleapis.com/elasticsearch_json
logName: projects/my-project/logs/transformation_test
severity: 600.0
timestamp: 2024-06-10T12:35:19.456000000Z
- jsonPayload:
component: o.e.n.Node
level: UNKNOWN
message: unknown log level
node:
name: node-1
type: server
labels:
logging.googleapis.com/instrumentation_source: agent.googleapis.com/elasticsearch_json
logName: projects/my-project/logs/transformation_test
timestamp: 2024-06-10T12:35:20.456000000Z
- jsonPayload:
level: INFO
message: missing component and node name
type: server
labels:
logging.googleapis.com/instrumentation_source: agent.googleapis.com/elasticsearch_json
logName: projects/my-project/logs/transformation_test
severity: 200.0
timestamp: 2024-06-10T12:35:21.456000000Z
- jsonPayload:
cluster:
name: elasticsearch
component: o.e.n.Node
level: INFO
message: missing node name
type: server
labels:
logging.googleapis.com/instrumentation_source: agent.googleapis.com/elasticsearch_json
logName: projects/my-project/logs/transformation_test
severity: 200.0
timestamp: 2024-06-10T12:35:22.456000000Z
- jsonPayload:
component: o.e.n.Node
level: INFO
message: missing cluster name
node:
name: node-1
type: server
labels:
logging.googleapis.com/instrumentation_source: agent.googleapis.com/elasticsearch_json
logName: projects/my-project/logs/transformation_test
severity: 200.0
timestamp: 2024-06-10T12:35:23.456000000Z
- jsonPayload:
authentication:
token:
id: abc123
component: o.e.n.Node
level: INFO
message: nested fields
node:
name: node-1
type: server
user:
run_by:
name: testuser
labels:
logging.googleapis.com/instrumentation_source: agent.googleapis.com/elasticsearch_json
logName: projects/my-project/logs/transformation_test
severity: 200.0
timestamp: 2024-06-10T12:35:24.456000000Z
- jsonPayload:
component: o.e.b.ElasticsearchUncaughtExceptionHandler
level: ERROR
message: uncaught exception in thread [main]
node:
name: node-1
stacktrace:
- "org.elasticsearch.bootstrap.StartupException: java.lang.IllegalArgumentException: unknown setting [invalid.key]"
- at org.elasticsearch.bootstrap.Elasticsearch.init(Elasticsearch.java:166)
- ... 6 more
type: server
labels:
logging.googleapis.com/instrumentation_source: agent.googleapis.com/elasticsearch_json
logName: projects/my-project/logs/transformation_test
severity: 500.0
timestamp: 2024-06-10T12:35:25.456000000Z
- jsonPayload:
component: o.e.n.Node
level: INFO
message: invalid timestamp
node:
name: node-1
timestamp: not-a-timestamp
type: server
labels:
logging.googleapis.com/instrumentation_source: agent.googleapis.com/elasticsearch_json
logName: projects/my-project/logs/transformation_test
severity: 200.0
timestamp: 2024-06-10T12:35:26.456000000Z
partialSuccess: true
resource:
labels: {}
type: gce_instance
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
- config_error: "failed generating OTel processor: &confgenerator.loggingProcessorMacroAdapter[github.com/GoogleCloudPlatform/ops-agent/apps.LoggingProcessorMacroElasticsearchJson]{ConfigComponent:confgenerator.ConfigComponent{Type:\"elasticsearch_json\"}, ProcessorMacro:apps.LoggingProcessorMacroElasticsearchJson{}}, err: unimplemented"
4 changes: 3 additions & 1 deletion transformation_test/transformation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (
"time"

logpb "cloud.google.com/go/logging/apiv2/loggingpb"
_ "github.com/GoogleCloudPlatform/ops-agent/apps"
"github.com/GoogleCloudPlatform/ops-agent/apps"
"github.com/GoogleCloudPlatform/ops-agent/confgenerator"
"github.com/GoogleCloudPlatform/ops-agent/confgenerator/fluentbit"
"github.com/GoogleCloudPlatform/ops-agent/confgenerator/otel"
Expand Down Expand Up @@ -593,4 +593,6 @@ func sanitizeStacktrace(t *testing.T, input string) string {
func init() {
// The processors registered here are only meant to be used in transformation tests.
confgenerator.LoggingProcessorTypes.RegisterType(func() confgenerator.LoggingProcessor { return &confgenerator.LoggingProcessorWindowsEventLogV1{} })
confgenerator.RegisterLoggingProcessorMacro[apps.LoggingProcessorMacroElasticsearchJson]()

}