diff --git a/lib/fluent/plugin/output.rb b/lib/fluent/plugin/output.rb index b864bf5ca5..696e3fe9c5 100644 --- a/lib/fluent/plugin/output.rb +++ b/lib/fluent/plugin/output.rb @@ -190,6 +190,7 @@ def initialize @rollback_count_metrics = nil @flush_time_count_metrics = nil @slow_flush_count_metrics = nil + @drop_oldest_chunk_count_metrics = nil @enable_size_metrics = false # How to process events is decided here at once, but it will be decided in delayed way on #configure & #start @@ -259,6 +260,7 @@ def configure(conf) @rollback_count_metrics = metrics_create(namespace: "fluentd", subsystem: "output", name: "rollback_count", help_text: "Number of rollbacking operations") @flush_time_count_metrics = metrics_create(namespace: "fluentd", subsystem: "output", name: "flush_time_count", help_text: "Count of flush time") @slow_flush_count_metrics = metrics_create(namespace: "fluentd", subsystem: "output", name: "slow_flush_count", help_text: "Count of slow flush occurred time(s)") + @drop_oldest_chunk_count_metrics = metrics_create(namespace: "fluentd", subsystem: "output", name: "drop_oldest_chunk_count", help_text: "Number of count that old chunk were discarded with drop_oldest_chunk") if has_buffer_section unless implement?(:buffered) || implement?(:delayed_commit) @@ -977,6 +979,7 @@ def write_guard(&block) if oldest log.warn "dropping oldest chunk to make space after buffer overflow", chunk_id: dump_unique_id_hex(oldest.unique_id) @buffer.purge_chunk(oldest.unique_id) + @drop_oldest_chunk_count_metrics.inc else log.error "no queued chunks to be dropped for drop_oldest_chunk" end @@ -1575,6 +1578,7 @@ def statistics 'rollback_count' => @rollback_count_metrics.get, 'slow_flush_count' => @slow_flush_count_metrics.get, 'flush_time_count' => @flush_time_count_metrics.get, + 'drop_oldest_chunk_count' => @drop_oldest_chunk_count_metrics.get, } if @buffer && @buffer.respond_to?(:statistics) diff --git a/test/plugin/test_in_monitor_agent.rb b/test/plugin/test_in_monitor_agent.rb index d5d694fe2c..39bb5ff178 100644 --- a/test/plugin/test_in_monitor_agent.rb +++ b/test/plugin/test_in_monitor_agent.rb @@ -185,6 +185,7 @@ def test_enable_input_metrics(with_config) "rollback_count" => Integer, "slow_flush_count" => Integer, "flush_time_count" => Integer, + "drop_oldest_chunk_count" => Integer, } output_info.merge!("config" => {"@id" => "test_out", "@type" => "test_out"}) if with_config error_label_info = { @@ -208,6 +209,7 @@ def test_enable_input_metrics(with_config) "rollback_count" => Integer, "slow_flush_count" => Integer, "flush_time_count" => Integer, + "drop_oldest_chunk_count" => Integer, } error_label_info.merge!("config" => {"@id"=>"null", "@type" => "null"}) if with_config opts = {with_config: with_config} @@ -326,6 +328,7 @@ def test_enable_input_metrics(with_config) "rollback_count" => Integer, "slow_flush_count" => Integer, "flush_time_count" => Integer, + "drop_oldest_chunk_count" => Integer, } output_info.merge!("config" => {"@id" => "test_out", "@type" => "test_out"}) if with_config error_label_info = { @@ -349,6 +352,7 @@ def test_enable_input_metrics(with_config) "rollback_count" => Integer, "slow_flush_count" => Integer, "flush_time_count" => Integer, + "drop_oldest_chunk_count" => Integer, } error_label_info.merge!("config" => {"@id"=>"null", "@type" => "null"}) if with_config opts = {with_config: with_config} @@ -424,6 +428,7 @@ def test_enable_input_metrics(with_config) "rollback_count" => Integer, "slow_flush_count" => Integer, "flush_time_count" => Integer, + "drop_oldest_chunk_count" => Integer, } expect_test_out_record = { "plugin_id" => "test_out", @@ -439,6 +444,7 @@ def test_enable_input_metrics(with_config) "rollback_count" => Integer, "slow_flush_count" => Integer, "flush_time_count" => Integer, + "drop_oldest_chunk_count" => Integer, } assert_fuzzy_equal(expect_relabel_record, d.events[1][2]) assert_fuzzy_equal(expect_test_out_record, d.events[3][2]) @@ -578,6 +584,7 @@ def get(uri, header = {}) "rollback_count" => Integer, "slow_flush_count" => Integer, "flush_time_count" => Integer, + "drop_oldest_chunk_count" => Integer, } expected_null_response.merge!("config" => {"@id" => "null", "@type" => "null"}) if with_config expected_null_response.merge!("retry" => {}) if with_retry @@ -643,6 +650,7 @@ def get(uri, header = {}) "rollback_count" => Integer, "slow_flush_count" => Integer, "flush_time_count" => Integer, + "drop_oldest_chunk_count" => Integer, } expected_null_response.merge!("config" => {"@id" => "null", "@type" => "null"}) if with_config expected_null_response.merge!("retry" => {}) if with_retry @@ -693,6 +701,7 @@ def get(uri, header = {}) "rollback_count" => Integer, "slow_flush_count" => Integer, "flush_time_count" => Integer, + "drop_oldest_chunk_count" => Integer, } response = JSON.parse(get("http://127.0.0.1:#{@port}/api/plugins.json?with_config=no&with_retry=no&with_ivars=id,num_errors").body) test_in_response = response["plugins"][0] @@ -825,6 +834,7 @@ def write(chunk) "rollback_count" => Integer, 'slow_flush_count' => Integer, 'flush_time_count' => Integer, + "drop_oldest_chunk_count" => Integer, } output.emit_events('test.tag', Fluent::ArrayEventStream.new([[event_time, {"message" => "test failed flush 1"}]])) # flush few times to check steps diff --git a/test/plugin/test_output.rb b/test/plugin/test_output.rb index db260b79d8..8fb1d188cd 100644 --- a/test/plugin/test_output.rb +++ b/test/plugin/test_output.rb @@ -227,7 +227,8 @@ def waiting(seconds) @i.configure(config_element()) %w[num_errors_metrics emit_count_metrics emit_size_metrics emit_records_metrics write_count_metrics - write_secondary_count_metrics rollback_count_metrics flush_time_count_metrics slow_flush_count_metrics].each do |metric_name| + write_secondary_count_metrics rollback_count_metrics flush_time_count_metrics slow_flush_count_metrics + drop_oldest_chunk_count_metrics].each do |metric_name| assert_true @i.instance_variable_get(:"@#{metric_name}").is_a?(Fluent::Plugin::Metrics) end @@ -241,6 +242,7 @@ def waiting(seconds) assert_equal 0, @i.rollback_count assert_equal 0, @i.flush_time_count assert_equal 0, @i.slow_flush_count + assert_equal 0, @i.drop_oldest_chunk_count end data(:new_api => :chunk, diff --git a/test/plugin/test_output_as_buffered_overflow.rb b/test/plugin/test_output_as_buffered_overflow.rb index f559bdbe42..668558fcbe 100644 --- a/test/plugin/test_output_as_buffered_overflow.rb +++ b/test/plugin/test_output_as_buffered_overflow.rb @@ -217,6 +217,7 @@ def waiting(seconds) logs = @i.log.out.logs assert{ logs.any?{|line| line.include?("failed to write data into buffer by buffer overflow") } } assert{ logs.any?{|line| line.include?("dropping oldest chunk to make space after buffer overflow") } } + assert{ @i.drop_oldest_chunk_count > 0 } end test '#emit_events raises OverflowError if all buffer spaces are used by staged chunks' do