Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
2594945
Implement `LoggingReceiverFluentForward` in Otel Logging.
franciscovalentecastro Nov 14, 2025
4f57eec
Update otelopscol with `fluentforward` otel receiver.
franciscovalentecastro Nov 14, 2025
9eaa386
Add `otel_logging` to `TestFluentForwardLog`.
franciscovalentecastro Nov 14, 2025
3409ae0
Add `logging-otel-receiver_forward` confgenerator test.
franciscovalentecastro Nov 14, 2025
a793c03
Move `labels` (otel `attributes`) to `jsonPayload`.
franciscovalentecastro Nov 14, 2025
261d690
Use only `CopyFrom` for testing.
franciscovalentecastro Nov 14, 2025
9a026b2
Update goldens.
franciscovalentecastro Nov 14, 2025
d69544e
Try using `CustomConvertFunc`.
franciscovalentecastro Nov 15, 2025
2b7f19a
Update goldens
franciscovalentecastro Nov 15, 2025
d0762be
Set empty body if missing.
franciscovalentecastro Nov 15, 2025
0241dab
Update goldens.
franciscovalentecastro Nov 15, 2025
760999c
Handle all cases with transform processor.
franciscovalentecastro Nov 15, 2025
f681257
Update goldens.
franciscovalentecastro Nov 15, 2025
1598b3f
Create `otelFluentForwardSetLogNameComponents`.
franciscovalentecastro Nov 15, 2025
a682e1a
Update goldens.
franciscovalentecastro Nov 15, 2025
455986a
Update `Concat` implementation.
franciscovalentecastro Nov 17, 2025
7289836
Update goldens.
franciscovalentecastro Nov 17, 2025
d79faa6
Update to `otelFluentForwardSetLogNameComponents`.
franciscovalentecastro Nov 17, 2025
5853f1d
Add comments and update `ModifyFields` use.
franciscovalentecastro Nov 18, 2025
0cf01ec
Update goldens.
franciscovalentecastro Nov 18, 2025
44bba8b
Add some tests.
franciscovalentecastro Nov 18, 2025
a07a642
Use `transform_processor` instead of `modify_fields` to append `fluen…
franciscovalentecastro Nov 18, 2025
a935c90
Update goldens.
franciscovalentecastro Nov 18, 2025
44afff2
Fix otelopscol commit.
franciscovalentecastro Dec 9, 2025
a5f8c6d
Merge branch 'master' into fcovalente-fluent-forward
franciscovalentecastro Dec 9, 2025
e088e15
Simplify test.
franciscovalentecastro Dec 9, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion confgenerator/confgenerator.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,10 +300,17 @@ func (p PipelineInstance) OTelComponents(ctx context.Context) (map[string]otel.R
}

if processors, ok := receiverPipeline.Processors["logs"]; ok {
receiverPipeline.Processors["logs"] = append(
pipelineProcessors := append(
processors,
otelSetLogNameComponents(ctx, p.RID)...,
)
if p.Receiver.Type() == "fluent_forward" {
pipelineProcessors = append(
pipelineProcessors,
otelFluentForwardSetLogNameComponents()...,
)
}
receiverPipeline.Processors["logs"] = pipelineProcessors
}

outR[receiverPipelineName] = receiverPipeline
Expand Down
17 changes: 17 additions & 0 deletions confgenerator/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/GoogleCloudPlatform/ops-agent/confgenerator/fluentbit"
"github.com/GoogleCloudPlatform/ops-agent/confgenerator/otel"
"github.com/GoogleCloudPlatform/ops-agent/confgenerator/otel/ottl"
"github.com/GoogleCloudPlatform/ops-agent/internal/platform"
)

Expand Down Expand Up @@ -77,6 +78,22 @@ func otelSetLogNameComponents(ctx context.Context, logName string) []otel.Compon
return components
}

func otelFluentForwardSetLogNameComponents() []otel.Component {
bodyFluentTag := ottl.LValue{"body", "fluent.tag"}
logName := ottl.LValue{"attributes", "gcp.log_name"}

return []otel.Component{
otel.Transform(
"log", "log",
ottl.NewStatements(
logName.SetIf(ottl.Concat([]ottl.Value{logName, bodyFluentTag}, "."),
ottl.And(logName.IsPresent(), bodyFluentTag.IsPresent())),
bodyFluentTag.DeleteIf(bodyFluentTag.IsPresent()),
),
),
}
}

// stackdriverOutputComponent generates a component that outputs logs matching the regex `match` using `userAgent`.
func stackdriverOutputComponent(ctx context.Context, match, userAgent, storageLimitSize, compress string) fluentbit.Component {
config := map[string]string{
Expand Down
44 changes: 44 additions & 0 deletions confgenerator/logging_receivers.go
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,50 @@ func (r LoggingReceiverFluentForward) Components(ctx context.Context, tag string
}}
}

func (r LoggingReceiverFluentForward) Pipelines(ctx context.Context) ([]otel.ReceiverPipeline, error) {
body := ottl.LValue{"body"}
bodyMessage := ottl.LValue{"body", "message"}
attributes := ottl.LValue{"attributes"}
cacheBodyString := ottl.LValue{"cache", "body_string"}
cacheBodyMap := ottl.LValue{"cache", "body_map"}

processors := []otel.Component{
otel.Transform(
"log", "log",
// Transformations required to convert "fluentforwardreceiver" output to the expected ops agent "fluent_forward" LogEntry format.
// In summary, this moves all resulting "fluentforwardreceiver" fields into "body" (jsonPayload).
// https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/release/v0.136.x/receiver/fluentforwardreceiver/conversion.go#L171
ottl.NewStatements(
// "fluentforwardreceiver" sets "log" and "message" as "body". All other fields are set as "attributes".
cacheBodyString.SetIf(body, body.IsString()),
cacheBodyMap.SetIf(body, body.IsMap()),
// Merge "cache['body_string']", "cache['body_map']" and "attributes" into "body" (jsonPayload).
body.Set(ottl.RValue("{}")),
bodyMessage.SetIf(cacheBodyString, cacheBodyString.IsPresent()),
body.MergeMapsIf(cacheBodyMap, "upsert", cacheBodyMap.IsPresent()),
body.MergeMapsIf(attributes, "upsert", attributes.IsPresent()),
attributes.Set(ottl.RValue("{}")),
),
),
}

return []otel.ReceiverPipeline{{
Receiver: otel.Component{
Type: "fluentforward",
Config: map[string]any{
"endpoint": fmt.Sprintf("%s:%d", r.ListenHost, r.ListenPort),
},
},
Processors: map[string][]otel.Component{
"logs": processors,
},

ExporterTypes: map[string]otel.ExporterType{
"logs": otel.OTel,
},
}}, nil
}

func init() {
LoggingReceiverTypes.RegisterType(func() LoggingReceiver { return &LoggingReceiverFluentForward{} })
}
Expand Down
8 changes: 8 additions & 0 deletions confgenerator/otel/ottl/ottl.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,14 @@ func ExtractPatternsRubyRegex(a Value, pattern string, omitEmptyValues bool) Val
return valuef(`ExtractPatternsRubyRegex(%s, %q, %v)`, a, pattern, omitEmptyValues)
}

func Concat(values []Value, delimiter string) Value {
stringValues := []string{}
for _, v := range values {
stringValues = append(stringValues, v.String())
}
return valuef(`Concat([%s], "%s")`, strings.Join(stringValues, ","), delimiter)
}

func ConvertCase(a Value, toCase string) Value {
return valuef(`ConvertCase(%s, %q)`, a, toCase)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
otel_logging
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@

function process(tag, timestamp, record)
local v = "ops-agent";
(function(value)
if record["logging.googleapis.com/labels"] == nil
then
record["logging.googleapis.com/labels"] = {}
end
record["logging.googleapis.com/labels"]["agent.googleapis.com/health/agentKind"] = value
end)(v)
local v = "latest";
(function(value)
if record["logging.googleapis.com/labels"] == nil
then
record["logging.googleapis.com/labels"] = {}
end
record["logging.googleapis.com/labels"]["agent.googleapis.com/health/agentVersion"] = value
end)(v)
local v = "v1";
(function(value)
if record["logging.googleapis.com/labels"] == nil
then
record["logging.googleapis.com/labels"] = {}
end
record["logging.googleapis.com/labels"]["agent.googleapis.com/health/schemaVersion"] = value
end)(v)
return 2, timestamp, record
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@

function process(tag, timestamp, record)
local __field_0 = (function()
return record["severity"]
end)();
(function(value)
record["severity"] = value
end)(nil);
local v = __field_0;
if v == "debug" then v = "DEBUG"
elseif v == "error" then v = "ERROR"
elseif v == "info" then v = "INFO"
elseif v == "warn" then v = "WARNING"
end
(function(value)
record["logging.googleapis.com/severity"] = value
end)(v)
return 2, timestamp, record
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@

function shallow_merge(record, parsedRecord)
-- If no exiting record exists
if (record == nil) then
return parsedRecord
end

for k, v in pairs(parsedRecord) do
record[k] = v
end

return record
end

function merge(record, parsedRecord)
-- If no exiting record exists
if record == nil then
return parsedRecord
end

-- Potentially overwrite or merge the original records.
for k, v in pairs(parsedRecord) do
-- If there is no conflict
if k == "logging.googleapis.com/logName" then
-- Ignore the parsed payload since the logName is controlled
-- by the OpsAgent.
elseif k == "logging.googleapis.com/labels" then
-- LogEntry.labels are basically a map[string]string and so only require a
-- shallow merge (one level deep merge).
record[k] = shallow_merge(record[k], v)
else
record[k] = v
end
end

return record
end

function parser_merge_record(tag, timestamp, record)
originalPayload = record["logging.googleapis.com/__tmp"]
if originalPayload == nil then
return 0, timestamp, record
end

-- Remove original payload
record["logging.googleapis.com/__tmp"] = nil
record = merge(originalPayload, record)
return 2, timestamp, record
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@

function parser_nest(tag, timestamp, record)
local nestedRecord = {}
local parseKey = "message"
for k, v in pairs(record) do
if k ~= parseKey then
nestedRecord[k] = v
end
end

local result = {}
result[parseKey] = record[parseKey]
result["logging.googleapis.com/__tmp"] = nestedRecord

return 2, timestamp, result
end

Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"resourceMetrics":[{"resource":{"attributes":[{"key":"k","value":{"stringValue":"v"}}]},"scopeMetrics":[{"scope":{},"metrics":[{"name":"agent.googleapis.com/agent/ops_agent/enabled_receivers","gauge":{"dataPoints":[{"attributes":[{"key":"telemetry_type","value":{"stringValue":"metrics"}},{"key":"receiver_type","value":{"stringValue":"hostmetrics"}}],"asInt":"1"},{"attributes":[{"key":"telemetry_type","value":{"stringValue":"logs"}},{"key":"receiver_type","value":{"stringValue":"files"}}],"asInt":"1"},{"attributes":[{"key":"telemetry_type","value":{"stringValue":"logs"}},{"key":"receiver_type","value":{"stringValue":"fluent_forward"}}],"asInt":"1"}]}}]}]}]}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"resourceMetrics":[{"resource":{"attributes":[{"key":"k","value":{"stringValue":"v"}}]},"scopeMetrics":[{"scope":{},"metrics":[{"name":"agent.googleapis.com/agent/internal/ops/feature_tracking","gauge":{"dataPoints":[{"attributes":[{"key":"module","value":{"stringValue":"logging"}},{"key":"feature","value":{"stringValue":"service:pipelines"}},{"key":"key","value":{"stringValue":"default_pipeline_overridden"}},{"key":"value","value":{"stringValue":"false"}}],"asInt":"1"},{"attributes":[{"key":"module","value":{"stringValue":"metrics"}},{"key":"feature","value":{"stringValue":"service:pipelines"}},{"key":"key","value":{"stringValue":"default_pipeline_overridden"}},{"key":"value","value":{"stringValue":"false"}}],"asInt":"1"},{"attributes":[{"key":"module","value":{"stringValue":"global"}},{"key":"feature","value":{"stringValue":"default:self_log"}},{"key":"key","value":{"stringValue":"default_self_log_file_collection"}},{"key":"value","value":{"stringValue":"true"}}],"asInt":"1"},{"attributes":[{"key":"module","value":{"stringValue":"logging"}},{"key":"feature","value":{"stringValue":"service:otel_logging"}},{"key":"key","value":{"stringValue":"otel_logging_supported_config"}},{"key":"value","value":{"stringValue":"true"}}],"asInt":"1"},{"attributes":[{"key":"module","value":{"stringValue":"logging"}},{"key":"feature","value":{"stringValue":"receivers:fluent_forward"}},{"key":"key","value":{"stringValue":"[0].enabled"}},{"key":"value","value":{"stringValue":"true"}}],"asInt":"1"}]}}]}]}]}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
- module: logging
feature: service:pipelines
key: default_pipeline_overridden
value: "false"
- module: metrics
feature: service:pipelines
key: default_pipeline_overridden
value: "false"
- module: global
feature: default:self_log
key: default_self_log_file_collection
value: "true"
- module: logging
feature: service:otel_logging
key: otel_logging_supported_config
value: "true"
- module: logging
feature: receivers:fluent_forward
key: "[0].enabled"
value: "true"
Loading
Loading