73 changes: 34 additions & 39 deletions apps/rabbitmq.go
@@ -18,45 +18,41 @@ import (
"context"

"github.com/GoogleCloudPlatform/ops-agent/confgenerator"
"github.com/GoogleCloudPlatform/ops-agent/confgenerator/fluentbit"
"github.com/GoogleCloudPlatform/ops-agent/confgenerator/otel"
"github.com/GoogleCloudPlatform/ops-agent/internal/secret"
)

type LoggingProcessorRabbitmq struct {
confgenerator.ConfigComponent `yaml:",inline"`
}
type LoggingProcessorMacroRabbitmq struct{}

func (*LoggingProcessorRabbitmq) Type() string {
func (LoggingProcessorMacroRabbitmq) Type() string {
return "rabbitmq"
}

func (p *LoggingProcessorRabbitmq) Components(ctx context.Context, tag, uid string) []fluentbit.Component {
c := confgenerator.LoggingProcessorParseRegexComplex{
Parsers: []confgenerator.RegexParser{
{
// Sample log line:
// 2022-01-31 18:01:20.441571+00:00 [erro] <0.692.0> ** Connection attempt from node 'rabbit_ctl_17@keith-testing-rabbitmq' rejected. Invalid challenge reply. **
Regex: `^(?<timestamp>\d+-\d+-\d+\s+\d+:\d+:\d+[.,]\d+\+\d+:\d+) \[(?<severity>\w+)\] \<(?<process_id>\d+\.\d+\.\d+)\> (?<message>.*)$`,
Parser: confgenerator.ParserShared{
TimeKey: "timestamp",
TimeFormat: "%Y-%m-%d %H:%M:%S.%L%z",
func (p LoggingProcessorMacroRabbitmq) Expand(ctx context.Context) []confgenerator.InternalLoggingProcessor {
return []confgenerator.InternalLoggingProcessor{
confgenerator.LoggingProcessorParseRegexComplex{
Parsers: []confgenerator.RegexParser{
{
// Sample log line:
// 2022-01-31 18:01:20.441571+00:00 [erro] <0.692.0> ** Connection attempt from node 'rabbit_ctl_17@keith-testing-rabbitmq' rejected. Invalid challenge reply. **
Regex: `^(?<timestamp>\d+-\d+-\d+\s+\d+:\d+:\d+[.,]\d+\+\d+:\d+) \[(?<severity>\w+)\] \<(?<process_id>\d+\.\d+\.\d+)\> (?<message>.*)$`,
Parser: confgenerator.ParserShared{
TimeKey: "timestamp",
TimeFormat: "%Y-%m-%d %H:%M:%S.%L%z",
},
},
},
{
// Sample log line:
// 2023-02-01 12:45:14.705 [info] <0.801.0> Successfully set user tags for user 'admin' to [administrator]
Regex: `^(?<timestamp>\d+-\d+-\d+\s+\d+:\d+:\d+[.,]\d+\d+\d+) \[(?<severity>\w+)\] \<(?<process_id>\d+\.\d+\.\d+)\> (?<message>.*)$`,
Parser: confgenerator.ParserShared{
TimeKey: "timestamp",
TimeFormat: "%Y-%m-%d %H:%M:%S.%L",
{
// Sample log line:
// 2023-02-01 12:45:14.705 [info] <0.801.0> Successfully set user tags for user 'admin' to [administrator]
Regex: `^(?<timestamp>\d+-\d+-\d+\s+\d+:\d+:\d+[.,]\d+\d+\d+) \[(?<severity>\w+)\] \<(?<process_id>\d+\.\d+\.\d+)\> (?<message>.*)$`,
Parser: confgenerator.ParserShared{
TimeKey: "timestamp",
TimeFormat: "%Y-%m-%d %H:%M:%S.%L",
},
},
},
},
}.Components(ctx, tag, uid)

// severities documented here: https://www.rabbitmq.com/logging.html#log-levels
c = append(c,
// severities documented here: https://www.rabbitmq.com/logging.html#log-levels
confgenerator.LoggingProcessorModifyFields{
Fields: map[string]*confgenerator.ModifyField{
"severity": {
@@ -72,18 +68,16 @@ func (p *LoggingProcessorRabbitmq) Components(ctx context.Context, tag, uid string) []fluentbit.Component {
},
InstrumentationSourceLabel: instrumentationSourceValue(p.Type()),
},
}.Components(ctx, tag, uid)...,
)

return c
},
}
}

type LoggingReceiverRabbitmq struct {
LoggingProcessorRabbitmq `yaml:",inline"`
ReceiverMixin confgenerator.LoggingReceiverFilesMixin `yaml:",inline" validate:"structonly"`
type LoggingReceiverMacroRabbitmq struct {
LoggingProcessorMacroRabbitmq `yaml:",inline"`
ReceiverMixin confgenerator.LoggingReceiverFilesMixin `yaml:",inline" validate:"structonly"`
}

func (r LoggingReceiverRabbitmq) Components(ctx context.Context, tag string) []fluentbit.Component {
func (r LoggingReceiverMacroRabbitmq) Expand(ctx context.Context) (confgenerator.InternalLoggingReceiver, []confgenerator.InternalLoggingProcessor) {
if len(r.ReceiverMixin.IncludePaths) == 0 {
r.ReceiverMixin.IncludePaths = []string{
"/var/log/rabbitmq/rabbit*.log",
@@ -108,13 +102,14 @@ func (r LoggingReceiverRabbitmq) Components(ctx context.Context, tag string) []fluentbit.Component {
Regex: `^(?!\d+-\d+-\d+ \d+:\d+:\d+\.\d+\+\d+:\d+)`,
Contributor:

Please move the r.ReceiverMixin.MultilineRules to a standalone confgenerator.LoggingProcessorParseMultilineRegex within the Expand method of the LoggingProcessorMacro.

The template for this update is elasticsearch.go in master after #2025.

Collaborator (author):

I moved this into the Expand method and it seemed to not parse the multiline entries correctly. You can see this in the diff of the golden files on my most recent commit.
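
For reference, a rough sketch of what the suggested refactor could look like, modeled on the multiline-regex pattern other apps follow (elasticsearch.go on master after #2025); the LoggingProcessorParseMultilineRegex and MultilineRule field names (Rules, StateName, NextState) are assumptions based on that pattern, not code from this PR:

// Hypothetical sketch only; field and type names for the multiline regex
// processor are assumed from other ops-agent apps, not taken from this diff.
func (p LoggingProcessorMacroRabbitmq) Expand(ctx context.Context) []confgenerator.InternalLoggingProcessor {
    return []confgenerator.InternalLoggingProcessor{
        confgenerator.LoggingProcessorParseMultilineRegex{
            LoggingProcessorParseRegexComplex: confgenerator.LoggingProcessorParseRegexComplex{
                // Parsers: the same two RabbitMQ regex parsers shown in the diff above.
            },
            Rules: []confgenerator.MultilineRule{
                // A new record starts on a timestamped line...
                {StateName: "start_state", NextState: "cont", Regex: `\d+-\d+-\d+ \d+:\d+:\d+[.,]\d+`},
                // ...and any line that does not look like a timestamp continues it.
                {StateName: "cont", NextState: "cont", Regex: `^(?!\d+-\d+-\d+ \d+:\d+:\d+\.\d+\+\d+:\d+)`},
            },
        },
        // The severity-mapping LoggingProcessorModifyFields from the diff above would follow here.
    }
}

As noted in the reply above, the author tried this approach and the multiline entries stopped parsing correctly, so the rules stayed on the receiver mixin in this PR.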

},
}
c := r.ReceiverMixin.Components(ctx, tag)
c = append(c, r.LoggingProcessorRabbitmq.Components(ctx, tag, "rabbitmq")...)
return c

return &r.ReceiverMixin, r.LoggingProcessorMacroRabbitmq.Expand(ctx)
}

func init() {
confgenerator.LoggingReceiverTypes.RegisterType(func() confgenerator.LoggingReceiver { return &LoggingReceiverRabbitmq{} })
confgenerator.RegisterLoggingReceiverMacro(func() LoggingReceiverMacroRabbitmq {
return LoggingReceiverMacroRabbitmq{}
})
}

type MetricsReceiverRabbitmq struct {
8 changes: 4 additions & 4 deletions confgenerator/testdata/feature/golden.csv
@@ -93,10 +93,6 @@ App,Field,Override,
*apps.LoggingReceiverPostgresql,confgenerator.LoggingReceiverFilesMixin.BufferInMemory,
*apps.LoggingReceiverPostgresql,confgenerator.LoggingReceiverFilesMixin.RecordLogFilePath,
*apps.LoggingReceiverPostgresql,confgenerator.LoggingReceiverFilesMixin.WildcardRefreshInterval,
*apps.LoggingReceiverRabbitmq,apps.LoggingProcessorRabbitmq.confgenerator.ConfigComponent.Type,
*apps.LoggingReceiverRabbitmq,confgenerator.LoggingReceiverFilesMixin.BufferInMemory,
*apps.LoggingReceiverRabbitmq,confgenerator.LoggingReceiverFilesMixin.RecordLogFilePath,
*apps.LoggingReceiverRabbitmq,confgenerator.LoggingReceiverFilesMixin.WildcardRefreshInterval,
*apps.LoggingReceiverSapHanaTrace,apps.LoggingProcessorSapHanaTrace.confgenerator.ConfigComponent.Type,
*apps.LoggingReceiverSapHanaTrace,confgenerator.LoggingReceiverFilesMixin.BufferInMemory,
*apps.LoggingReceiverSapHanaTrace,confgenerator.LoggingReceiverFilesMixin.RecordLogFilePath,
@@ -251,3 +247,7 @@ App,Field,Override,
*confgenerator.loggingReceiverMacroAdapter[github.com/GoogleCloudPlatform/ops-agent/apps.LoggingReceiverMacroElasticsearchJson],apps.LoggingReceiverMacroElasticsearchJson.confgenerator.LoggingReceiverFilesMixin.RecordLogFilePath,
*confgenerator.loggingReceiverMacroAdapter[github.com/GoogleCloudPlatform/ops-agent/apps.LoggingReceiverMacroElasticsearchJson],apps.LoggingReceiverMacroElasticsearchJson.confgenerator.LoggingReceiverFilesMixin.WildcardRefreshInterval,
*confgenerator.loggingReceiverMacroAdapter[github.com/GoogleCloudPlatform/ops-agent/apps.LoggingReceiverMacroElasticsearchJson],confgenerator.ConfigComponent.Type,
*confgenerator.loggingReceiverMacroAdapter[github.com/GoogleCloudPlatform/ops-agent/apps.LoggingReceiverMacroRabbitmq],apps.LoggingReceiverMacroRabbitmq.confgenerator.LoggingReceiverFilesMixin.BufferInMemory,
*confgenerator.loggingReceiverMacroAdapter[github.com/GoogleCloudPlatform/ops-agent/apps.LoggingReceiverMacroRabbitmq],apps.LoggingReceiverMacroRabbitmq.confgenerator.LoggingReceiverFilesMixin.RecordLogFilePath,
*confgenerator.loggingReceiverMacroAdapter[github.com/GoogleCloudPlatform/ops-agent/apps.LoggingReceiverMacroRabbitmq],apps.LoggingReceiverMacroRabbitmq.confgenerator.LoggingReceiverFilesMixin.WildcardRefreshInterval,
*confgenerator.loggingReceiverMacroAdapter[github.com/GoogleCloudPlatform/ops-agent/apps.LoggingReceiverMacroRabbitmq],confgenerator.ConfigComponent.Type,
@@ -0,0 +1 @@
- type: rabbitmq
19 changes: 19 additions & 0 deletions transformation_test/testdata/logging_processor-rabbitmq/input.log
@@ -0,0 +1,19 @@
2025-08-18 01:01:20.441571+00:00 [erro] <0.692.0> ** Connection attempt from node 'rabbit_ctl_17@keith-testing-rabbitmq' rejected. Invalid challenge reply. **
2025-08-18 01:05:10.000123+02:00 [info] <0.801.0> Successfully set user tags for user 'admin' to [administrator]
2025-08-18 01:06:33.987654-07:00 [warning] <0.550.0> Queue 'task_queue' is nearing memory threshold.
2025-08-18 01:45:14.705 [info] <0.801.0> Successfully set user tags for user 'guest' to []
2025-08-18 01:46:00.123 [debug] <0.200.0> Accepting AMQP connection <0.200.0> from 127.0.0.1:5672
2025-08-18 02:15:45.987+00:00 [error] <0.310.0> Login failed for user 'guest': invalid credentials
2025-08-18 02:16:01.001+00:00 [warning] <0.312.0> User 'guest' attempted to access vhost '/' but was denied
2025-08-18 02:22:33.444+00:00 [noti] <0.400.0> Node rabbit@node1 down: lost connection
2025-08-18 02:23:00.999+00:00 [info] <0.401.0> Node rabbit@node1 recovered and rejoined cluster
2025-08-18 02:55:55.123+00:00 [warning] <0.500.0> Memory high watermark reached (used: 2048MB, limit: 2000MB)
2025-08-18 03:56:01.456+00:00 [erro] <0.501.0> Disk free space too low. Free bytes: 500000, limit: 1000000
2025-08-18 04:06:06.789+00:00 [erro] <0.600.0> AMQP connection <0.600.0> (10.0.0.1:5672 -> 10.0.0.2:5672): connection closed unexpectedly
2025-08-18 04:06:10.101+00:00 [debug] <0.601.0> Re-establishing connection to peer rabbit@node2
2025-08-18 04:07:07.777+00:00 [noti] <123.456.789> {shutdown,{connection_closed, "Broken pipe"}}
2025-08-18 04:07:08.888 [erro] <0.999.0> Unexpected exception: {'EXIT',{{badmatch,{error,enoent}},[{rabbit_misc,read_config,1,[]}]}}
2025-08-18 05:07:43.557042+00:00 [erro] <0.130.0>
BOOT FAILED
===========
ERROR: could not bind to distribution port 25672, it is in use by another node: rabbit@keith-testing-rabbitmq
Contributor:

Please add more log line samples at the end of input.log so the last multiline log entry is parsed completely.

Nit: add a newline at the end of the file.

Collaborator (author), @dyl10s, Aug 18, 2025:

Made this change, but the parsing still does not seem to be working correctly. No changes are made after running make transformation_test_update.

Contributor:

I see 🤔! Let me try to debug why the multiline parsing is not working.

Contributor:

I added two sample log lines at the end of input.log and the multiline parsing worked (see the sample output at the end of this comment). My suggestion is to add more sample lines at the bottom of input.log.

Why does this work?

Fluent-bit multiline rules use a start_state and a cont state. The cont state in rabbitmq (^(?!\d+-\d+-\d+ \d+:\d+:\d+\.\d+\+\d+:\d+)) keeps matching lines that do not look like the start_state (i.e. do not begin with the timestamp format). This means that for a multiline log entry to be parsed completely, it needs a follow-up log line that matches start_state, so fluent-bit knows the previous entry has ended.

input.log

...
2025-08-18 05:07:43.557042+00:00 [erro] <0.130.0>
BOOT FAILED
===========
ERROR: could not bind to distribution port 25672, it is in use by another node: rabbit@keith-testing-rabbitmq
2025-08-19 04:07:07.777+00:00 [noti] <123.456.789> {shutdown,{connection_closed, "Broken pipe"}}
2025-08-19 04:07:07.777+00:00 [noti] <123.456.789> {shutdown,{connection_closed, "Broken pipe"}}

output_fluentbit.yaml

...
    severity: 0.0
    timestamp: 2025-08-18T04:07:07.777000000Z
  - jsonPayload:
      message: |-
        2025-08-18 05:07:43.557042+00:00 [erro] <0.130.0>
        BOOT FAILED
        ===========
        ERROR: could not bind to distribution port 25672, it is in use by another node: rabbit@keith-testing-rabbitmq
    labels:
      compute.googleapis.com/resource_name: hostname
      logging.googleapis.com/instrumentation_source: agent.googleapis.com/rabbitmq
    logName: projects/my-project/logs/transformation_test
    timestamp: now
  - jsonPayload:
      message: "{shutdown,{connection_closed, \"Broken pipe\"}}"
      process_id: 123.456.789
      severity: noti
    labels:
      compute.googleapis.com/resource_name: hostname
      logging.googleapis.com/instrumentation_source: agent.googleapis.com/rabbitmq
    logName: projects/my-project/logs/transformation_test
    severity: 0.0
    timestamp: now
  partialSuccess: true
  resource:
    labels: {}
    type: gce_instance
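
To make that grouping behaviour concrete, here is a small standalone Go sketch. It is an illustration of the state machine described above, not fluent-bit or ops-agent code; the only thing borrowed from this PR is the timestamp pattern used as start_state.

// Illustration only: simulates the start_state/cont grouping described above.
package main

import (
    "fmt"
    "regexp"
)

func main() {
    // start_state: a line that begins with the RabbitMQ timestamp format.
    startState := regexp.MustCompile(`^\d+-\d+-\d+ \d+:\d+:\d+\.\d+\+\d+:\d+`)

    lines := []string{
        `2025-08-18 05:07:43.557042+00:00 [erro] <0.130.0>`,
        `BOOT FAILED`,
        `===========`,
        `ERROR: could not bind to distribution port 25672, it is in use by another node: rabbit@keith-testing-rabbitmq`,
        `2025-08-19 04:07:07.777+00:00 [noti] <123.456.789> {shutdown,{connection_closed, "Broken pipe"}}`,
    }

    var current string
    for _, line := range lines {
        if startState.MatchString(line) {
            // A new start_state match closes out the previous record.
            if current != "" {
                fmt.Printf("emitted record:\n%s\n\n", current)
            }
            current = line
        } else {
            // cont state: non-timestamped lines are appended to the current record.
            current += "\n" + line
        }
    }
    // Without a trailing start_state line (or a flush on timeout), this last
    // record would still be buffered, which is the behaviour discussed here.
    fmt.Printf("buffered at end of input:\n%s\n", current)
}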

Member, @quentinmit, Aug 18, 2025:

This feels like a fluent-bit bug. The flush_timeout setting is supposed to cause the final log entry to be emitted after the timeout expires even if there isn't another line. I'm guessing that the exit_on_eof setting is causing fluent-bit to exit before it has a chance to flush.

Collaborator (author):

I added the additional log lines here, but it does seem like a bug that could get fixed. This can work as a temporary solution though.

Contributor:

Sounds good! I created b/439825446 to track this bug.

@@ -0,0 +1,151 @@
- entries:
- jsonPayload:
message: "** Connection attempt from node 'rabbit_ctl_17@keith-testing-rabbitmq' rejected. Invalid challenge reply. **"
process_id: 0.692.0
severity: erro
labels:
logging.googleapis.com/instrumentation_source: agent.googleapis.com/rabbitmq
logName: projects/my-project/logs/transformation_test
timestamp: 2025-08-18T01:01:20.441571000Z
- jsonPayload:
message: Successfully set user tags for user 'admin' to [administrator]
process_id: 0.801.0
severity: info
labels:
logging.googleapis.com/instrumentation_source: agent.googleapis.com/rabbitmq
logName: projects/my-project/logs/transformation_test
severity: 200.0
timestamp: 2025-08-17T23:05:10.000123000Z
- jsonPayload:
message: 2025-08-18 01:06:33.987654-07:00 [warning] <0.550.0> Queue 'task_queue' is nearing memory threshold.
labels:
logging.googleapis.com/instrumentation_source: agent.googleapis.com/rabbitmq
logName: projects/my-project/logs/transformation_test
timestamp: now
- jsonPayload:
message: Successfully set user tags for user 'guest' to []
process_id: 0.801.0
severity: info
labels:
logging.googleapis.com/instrumentation_source: agent.googleapis.com/rabbitmq
logName: projects/my-project/logs/transformation_test
severity: 200.0
timestamp: 2025-08-18T01:45:14.705000000Z
- jsonPayload:
message: Accepting AMQP connection <0.200.0> from 127.0.0.1:5672
process_id: 0.200.0
severity: debug
labels:
logging.googleapis.com/instrumentation_source: agent.googleapis.com/rabbitmq
logName: projects/my-project/logs/transformation_test
severity: 100.0
timestamp: 2025-08-18T01:46:00.123000000Z
- jsonPayload:
message: "Login failed for user 'guest': invalid credentials"
process_id: 0.310.0
severity: error
labels:
logging.googleapis.com/instrumentation_source: agent.googleapis.com/rabbitmq
logName: projects/my-project/logs/transformation_test
severity: 500.0
timestamp: 2025-08-18T02:15:45.987000000Z
- jsonPayload:
message: User 'guest' attempted to access vhost '/' but was denied
process_id: 0.312.0
severity: warning
labels:
logging.googleapis.com/instrumentation_source: agent.googleapis.com/rabbitmq
logName: projects/my-project/logs/transformation_test
severity: 400.0
timestamp: 2025-08-18T02:16:01.001000000Z
- jsonPayload:
message: "Node rabbit@node1 down: lost connection"
process_id: 0.400.0
severity: noti
labels:
logging.googleapis.com/instrumentation_source: agent.googleapis.com/rabbitmq
logName: projects/my-project/logs/transformation_test
severity: 0.0
timestamp: 2025-08-18T02:22:33.444000000Z
- jsonPayload:
message: Node rabbit@node1 recovered and rejoined cluster
process_id: 0.401.0
severity: info
labels:
logging.googleapis.com/instrumentation_source: agent.googleapis.com/rabbitmq
logName: projects/my-project/logs/transformation_test
severity: 200.0
timestamp: 2025-08-18T02:23:00.999000000Z
- jsonPayload:
message: "Memory high watermark reached (used: 2048MB, limit: 2000MB)"
process_id: 0.500.0
severity: warning
labels:
logging.googleapis.com/instrumentation_source: agent.googleapis.com/rabbitmq
logName: projects/my-project/logs/transformation_test
severity: 400.0
timestamp: 2025-08-18T02:55:55.123000000Z
- jsonPayload:
message: "Disk free space too low. Free bytes: 500000, limit: 1000000"
process_id: 0.501.0
severity: erro
labels:
logging.googleapis.com/instrumentation_source: agent.googleapis.com/rabbitmq
logName: projects/my-project/logs/transformation_test
timestamp: 2025-08-18T03:56:01.456000000Z
- jsonPayload:
message: "AMQP connection <0.600.0> (10.0.0.1:5672 -> 10.0.0.2:5672): connection closed unexpectedly"
process_id: 0.600.0
severity: erro
labels:
logging.googleapis.com/instrumentation_source: agent.googleapis.com/rabbitmq
logName: projects/my-project/logs/transformation_test
timestamp: 2025-08-18T04:06:06.789000000Z
- jsonPayload:
message: Re-establishing connection to peer rabbit@node2
process_id: 0.601.0
severity: debug
labels:
logging.googleapis.com/instrumentation_source: agent.googleapis.com/rabbitmq
logName: projects/my-project/logs/transformation_test
severity: 100.0
timestamp: 2025-08-18T04:06:10.101000000Z
- jsonPayload:
message: "{shutdown,{connection_closed, \"Broken pipe\"}}"
process_id: 123.456.789
severity: noti
labels:
logging.googleapis.com/instrumentation_source: agent.googleapis.com/rabbitmq
logName: projects/my-project/logs/transformation_test
severity: 0.0
timestamp: 2025-08-18T04:07:07.777000000Z
- jsonPayload:
message: "Unexpected exception: {'EXIT',{{badmatch,{error,enoent}},[{rabbit_misc,read_config,1,[]}]}}"
process_id: 0.999.0
severity: erro
labels:
logging.googleapis.com/instrumentation_source: agent.googleapis.com/rabbitmq
logName: projects/my-project/logs/transformation_test
timestamp: 2025-08-18T04:07:08.888000000Z
- jsonPayload:
message: 2025-08-18 05:07:43.557042+00:00 [erro] <0.130.0>
labels:
logging.googleapis.com/instrumentation_source: agent.googleapis.com/rabbitmq
logName: projects/my-project/logs/transformation_test
timestamp: now
- jsonPayload:
message: BOOT FAILED
labels:
logging.googleapis.com/instrumentation_source: agent.googleapis.com/rabbitmq
logName: projects/my-project/logs/transformation_test
timestamp: now
- jsonPayload:
message: ===========
labels:
logging.googleapis.com/instrumentation_source: agent.googleapis.com/rabbitmq
logName: projects/my-project/logs/transformation_test
timestamp: now
partialSuccess: true
resource:
labels: {}
type: gce_instance
@@ -0,0 +1 @@
- config_error: "failed generating OTel processor: &confgenerator.loggingProcessorMacroAdapter[github.com/GoogleCloudPlatform/ops-agent/apps.LoggingProcessorMacroRabbitmq]{ConfigComponent:confgenerator.ConfigComponent{Type:\"rabbitmq\"}, ProcessorMacro:apps.LoggingProcessorMacroRabbitmq{}}, err: unimplemented"
2 changes: 1 addition & 1 deletion transformation_test/transformation_test.go
@@ -594,5 +594,5 @@ func init() {
// The processors registered here are only meant to be used in transformation tests.
confgenerator.LoggingProcessorTypes.RegisterType(func() confgenerator.LoggingProcessor { return &confgenerator.LoggingProcessorWindowsEventLogV1{} })
confgenerator.RegisterLoggingProcessorMacro[apps.LoggingProcessorMacroElasticsearchJson]()

confgenerator.RegisterLoggingProcessorMacro[apps.LoggingProcessorMacroRabbitmq]()
}