
output: add metrics for dropped oldest chunk count #4981

Merged (1 commit) on Jun 2, 2025

4 changes: 4 additions & 0 deletions lib/fluent/plugin/output.rb
@@ -190,6 +190,7 @@ def initialize
@rollback_count_metrics = nil
@flush_time_count_metrics = nil
@slow_flush_count_metrics = nil
@drop_oldest_chunk_count_metrics = nil
@enable_size_metrics = false

# How to process events is decided here at once, but it will be decided in delayed way on #configure & #start
@@ -259,6 +260,7 @@ def configure(conf)
@rollback_count_metrics = metrics_create(namespace: "fluentd", subsystem: "output", name: "rollback_count", help_text: "Number of rollbacking operations")
@flush_time_count_metrics = metrics_create(namespace: "fluentd", subsystem: "output", name: "flush_time_count", help_text: "Count of flush time")
@slow_flush_count_metrics = metrics_create(namespace: "fluentd", subsystem: "output", name: "slow_flush_count", help_text: "Count of slow flush occurred time(s)")
@drop_oldest_chunk_count_metrics = metrics_create(namespace: "fluentd", subsystem: "output", name: "drop_oldest_chunk_count", help_text: "Number of times old chunks were discarded with drop_oldest_chunk")

if has_buffer_section
unless implement?(:buffered) || implement?(:delayed_commit)
@@ -977,6 +979,7 @@ def write_guard(&block)
if oldest
log.warn "dropping oldest chunk to make space after buffer overflow", chunk_id: dump_unique_id_hex(oldest.unique_id)
@buffer.purge_chunk(oldest.unique_id)
@drop_oldest_chunk_count_metrics.inc
else
log.error "no queued chunks to be dropped for drop_oldest_chunk"
end
@@ -1575,6 +1578,7 @@ def statistics
'rollback_count' => @rollback_count_metrics.get,
'slow_flush_count' => @slow_flush_count_metrics.get,
'flush_time_count' => @flush_time_count_metrics.get,
'drop_oldest_chunk_count' => @drop_oldest_chunk_count_metrics.get,
}

if @buffer && @buffer.respond_to?(:statistics)
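
The output.rb hunks above follow the plugin's existing counter pattern: the metric is initialized to nil with the other counters, registered via metrics_create in #configure, incremented inside #write_guard whenever overflow_action drop_oldest_chunk actually purges a chunk, and exported by #statistics. As a minimal sketch (not part of this PR), the new field could be read through in_monitor_agent; the host and port 24220 below are the agent's defaults and are assumptions here.

    require "net/http"
    require "json"

    # Fetch the monitor agent's plugin list and report the new counter for any
    # buffered output plugin that exposes it.
    body = Net::HTTP.get(URI("http://127.0.0.1:24220/api/plugins.json"))
    JSON.parse(body)["plugins"].each do |plugin|
      count = plugin["drop_oldest_chunk_count"]
      next if count.nil? # only buffered outputs report this counter
      puts "#{plugin["plugin_id"]}: dropped #{count} oldest chunk(s)"
    end
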
10 changes: 10 additions & 0 deletions test/plugin/test_in_monitor_agent.rb
@@ -185,6 +185,7 @@ def test_enable_input_metrics(with_config)
"rollback_count" => Integer,
"slow_flush_count" => Integer,
"flush_time_count" => Integer,
"drop_oldest_chunk_count" => Integer,
}
output_info.merge!("config" => {"@id" => "test_out", "@type" => "test_out"}) if with_config
error_label_info = {
@@ -208,6 +209,7 @@ def test_enable_input_metrics(with_config)
"rollback_count" => Integer,
"slow_flush_count" => Integer,
"flush_time_count" => Integer,
"drop_oldest_chunk_count" => Integer,
}
error_label_info.merge!("config" => {"@id"=>"null", "@type" => "null"}) if with_config
opts = {with_config: with_config}
@@ -326,6 +328,7 @@ def test_enable_input_metrics(with_config)
"rollback_count" => Integer,
"slow_flush_count" => Integer,
"flush_time_count" => Integer,
"drop_oldest_chunk_count" => Integer,
}
output_info.merge!("config" => {"@id" => "test_out", "@type" => "test_out"}) if with_config
error_label_info = {
@@ -349,6 +352,7 @@ def test_enable_input_metrics(with_config)
"rollback_count" => Integer,
"slow_flush_count" => Integer,
"flush_time_count" => Integer,
"drop_oldest_chunk_count" => Integer,
}
error_label_info.merge!("config" => {"@id"=>"null", "@type" => "null"}) if with_config
opts = {with_config: with_config}
@@ -424,6 +428,7 @@ def test_enable_input_metrics(with_config)
"rollback_count" => Integer,
"slow_flush_count" => Integer,
"flush_time_count" => Integer,
"drop_oldest_chunk_count" => Integer,
}
expect_test_out_record = {
"plugin_id" => "test_out",
@@ -439,6 +444,7 @@ def test_enable_input_metrics(with_config)
"rollback_count" => Integer,
"slow_flush_count" => Integer,
"flush_time_count" => Integer,
"drop_oldest_chunk_count" => Integer,
}
assert_fuzzy_equal(expect_relabel_record, d.events[1][2])
assert_fuzzy_equal(expect_test_out_record, d.events[3][2])
@@ -578,6 +584,7 @@ def get(uri, header = {})
"rollback_count" => Integer,
"slow_flush_count" => Integer,
"flush_time_count" => Integer,
"drop_oldest_chunk_count" => Integer,
}
expected_null_response.merge!("config" => {"@id" => "null", "@type" => "null"}) if with_config
expected_null_response.merge!("retry" => {}) if with_retry
@@ -643,6 +650,7 @@ def get(uri, header = {})
"rollback_count" => Integer,
"slow_flush_count" => Integer,
"flush_time_count" => Integer,
"drop_oldest_chunk_count" => Integer,
}
expected_null_response.merge!("config" => {"@id" => "null", "@type" => "null"}) if with_config
expected_null_response.merge!("retry" => {}) if with_retry
@@ -693,6 +701,7 @@ def get(uri, header = {})
"rollback_count" => Integer,
"slow_flush_count" => Integer,
"flush_time_count" => Integer,
"drop_oldest_chunk_count" => Integer,
}
response = JSON.parse(get("http://127.0.0.1:#{@port}/api/plugins.json?with_config=no&with_retry=no&with_ivars=id,num_errors").body)
test_in_response = response["plugins"][0]
@@ -825,6 +834,7 @@ def write(chunk)
"rollback_count" => Integer,
'slow_flush_count' => Integer,
'flush_time_count' => Integer,
"drop_oldest_chunk_count" => Integer,
}
output.emit_events('test.tag', Fluent::ArrayEventStream.new([[event_time, {"message" => "test failed flush 1"}]]))
# flush few times to check steps
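
With the keys added above, every output-plugin record that in_monitor_agent emits or serves over HTTP now carries the new counter alongside the existing ones. An abridged, hypothetical record (values invented for illustration) could look like:

    {
      "plugin_id"               => "test_out",
      "rollback_count"          => 0,
      "slow_flush_count"        => 0,
      "flush_time_count"        => 12,
      "drop_oldest_chunk_count" => 3, # new in this PR
    }
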
4 changes: 3 additions & 1 deletion test/plugin/test_output.rb
@@ -227,7 +227,8 @@ def waiting(seconds)
@i.configure(config_element())

%w[num_errors_metrics emit_count_metrics emit_size_metrics emit_records_metrics write_count_metrics
write_secondary_count_metrics rollback_count_metrics flush_time_count_metrics slow_flush_count_metrics].each do |metric_name|
write_secondary_count_metrics rollback_count_metrics flush_time_count_metrics slow_flush_count_metrics
drop_oldest_chunk_count_metrics].each do |metric_name|
assert_true @i.instance_variable_get(:"@#{metric_name}").is_a?(Fluent::Plugin::Metrics)
end

Expand All @@ -241,6 +242,7 @@ def waiting(seconds)
assert_equal 0, @i.rollback_count
assert_equal 0, @i.flush_time_count
assert_equal 0, @i.slow_flush_count
assert_equal 0, @i.drop_oldest_chunk_count
end

data(:new_api => :chunk,
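
The unit test above reads @i.drop_oldest_chunk_count directly. The existing counters in Fluent::Plugin::Output expose readers that simply delegate to their metrics objects (rollback_count, slow_flush_count, and so on), so a matching reader along the lines of the sketch below is assumed; it does not appear in the hunks shown here.

    # Assumed reader, mirroring the existing counter accessors in
    # Fluent::Plugin::Output; not part of the diff above.
    def drop_oldest_chunk_count
      @drop_oldest_chunk_count_metrics.get
    end
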
1 change: 1 addition & 0 deletions test/plugin/test_output_as_buffered_overflow.rb
@@ -217,6 +217,7 @@ def waiting(seconds)
logs = @i.log.out.logs
assert{ logs.any?{|line| line.include?("failed to write data into buffer by buffer overflow") } }
assert{ logs.any?{|line| line.include?("dropping oldest chunk to make space after buffer overflow") } }
assert{ @i.drop_oldest_chunk_count > 0 }
end

test '#emit_events raises OverflowError if all buffer spaces are used by staged chunks' do
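
The overflow test now also checks that the counter moved after the oldest chunk was dropped. As a rough sketch (an assumption mirroring the style of these tests, not the exact fixture they use), the counter only increments under a buffer configured with overflow_action drop_oldest_chunk, for example:

    # Tiny buffer that overflows quickly; on overflow the oldest queued chunk
    # is purged and drop_oldest_chunk_count is incremented.
    buffer_conf = config_element('buffer', 'tag', {
      'chunk_limit_size' => 1024,
      'total_limit_size' => 4096,
      'overflow_action' => 'drop_oldest_chunk',
    })
    @i.configure(config_element('ROOT', '', {}, [buffer_conf]))
    # After enough #emit_events calls to exceed total_limit_size:
    #   assert { @i.drop_oldest_chunk_count > 0 }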