Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions lib/fluent/plugin/output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ def initialize
@emit_records_metrics = nil
@emit_size_metrics = nil
@write_count_metrics = nil
@write_secondary_count_metrics = nil
@rollback_count_metrics = nil
@flush_time_count_metrics = nil
@slow_flush_count_metrics = nil
Expand Down Expand Up @@ -254,6 +255,7 @@ def configure(conf)
@emit_records_metrics = metrics_create(namespace: "fluentd", subsystem: "output", name: "emit_records", help_text: "Number of emit records")
@emit_size_metrics = metrics_create(namespace: "fluentd", subsystem: "output", name: "emit_size", help_text: "Total size of emit events")
@write_count_metrics = metrics_create(namespace: "fluentd", subsystem: "output", name: "write_count", help_text: "Number of writing events")
@write_secondary_count_metrics = metrics_create(namespace: "fluentd", subsystem: "output", name: "write_secondary_count", help_text: "Number of writing events in secondary")
@rollback_count_metrics = metrics_create(namespace: "fluentd", subsystem: "output", name: "rollback_count", help_text: "Number of rollbacking operations")
@flush_time_count_metrics = metrics_create(namespace: "fluentd", subsystem: "output", name: "flush_time_count", help_text: "Count of flush time")
@slow_flush_count_metrics = metrics_create(namespace: "fluentd", subsystem: "output", name: "slow_flush_count", help_text: "Count of slow flush occurred time(s)")
Expand Down Expand Up @@ -1188,6 +1190,7 @@ def try_flush
if output.delayed_commit
log.trace "executing delayed write and commit", chunk: dump_unique_id_hex(chunk.unique_id)
@write_count_metrics.inc
@write_secondary_count_metrics.inc if using_secondary
@dequeued_chunks_mutex.synchronize do
# delayed_commit_timeout for secondary is configured in <buffer> of primary (<secondary> don't get <buffer>)
@dequeued_chunks << DequeuedChunkInfo.new(chunk.unique_id, Time.now, self.delayed_commit_timeout)
Expand All @@ -1200,6 +1203,7 @@ def try_flush
dump_chunk_id = dump_unique_id_hex(chunk_id)
log.trace "adding write count", instance: self.object_id
@write_count_metrics.inc
@write_secondary_count_metrics.inc if using_secondary
log.trace "executing sync write", chunk: dump_chunk_id

output.write(chunk)
Expand Down Expand Up @@ -1567,6 +1571,7 @@ def statistics
'retry_count' => @num_errors_metrics.get,
'emit_count' => @emit_count_metrics.get,
'write_count' => @write_count_metrics.get,
'write_secondary_count' => @write_secondary_count_metrics.get,
'rollback_count' => @rollback_count_metrics.get,
'slow_flush_count' => @slow_flush_count_metrics.get,
'flush_time_count' => @flush_time_count_metrics.get,
Expand Down
9 changes: 9 additions & 0 deletions test/plugin/test_in_monitor_agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ def test_enable_input_metrics(with_config)
"emit_records" => Integer,
"emit_size" => Integer,
"write_count" => Integer,
"write_secondary_count" => Integer,
"rollback_count" => Integer,
"slow_flush_count" => Integer,
"flush_time_count" => Integer,
Expand All @@ -203,6 +204,7 @@ def test_enable_input_metrics(with_config)
"emit_records" => Integer,
"emit_size" => Integer,
"write_count" => Integer,
"write_secondary_count" => Integer,
"rollback_count" => Integer,
"slow_flush_count" => Integer,
"flush_time_count" => Integer,
Expand Down Expand Up @@ -320,6 +322,7 @@ def test_enable_input_metrics(with_config)
"emit_records" => Integer,
"emit_size" => Integer,
"write_count" => Integer,
"write_secondary_count" => Integer,
"rollback_count" => Integer,
"slow_flush_count" => Integer,
"flush_time_count" => Integer,
Expand All @@ -342,6 +345,7 @@ def test_enable_input_metrics(with_config)
"emit_records" => Integer,
"emit_size" => Integer,
"write_count" => Integer,
"write_secondary_count" => Integer,
"rollback_count" => Integer,
"slow_flush_count" => Integer,
"flush_time_count" => Integer,
Expand Down Expand Up @@ -416,6 +420,7 @@ def test_enable_input_metrics(with_config)
"emit_records" => Integer,
"emit_size" => Integer,
"write_count" => Integer,
"write_secondary_count" => Integer,
"rollback_count" => Integer,
"slow_flush_count" => Integer,
"flush_time_count" => Integer,
Expand All @@ -430,6 +435,7 @@ def test_enable_input_metrics(with_config)
"emit_records" => Integer,
"emit_size" => Integer,
"write_count" => Integer,
"write_secondary_count" => Integer,
"rollback_count" => Integer,
"slow_flush_count" => Integer,
"flush_time_count" => Integer,
Expand Down Expand Up @@ -568,6 +574,7 @@ def get(uri, header = {})
"emit_records" => Integer,
"emit_size" => Integer,
"write_count" => Integer,
"write_secondary_count" => Integer,
"rollback_count" => Integer,
"slow_flush_count" => Integer,
"flush_time_count" => Integer,
Expand Down Expand Up @@ -682,6 +689,7 @@ def get(uri, header = {})
"emit_records" => Integer,
"emit_size" => Integer,
"write_count" => Integer,
"write_secondary_count" => Integer,
"rollback_count" => Integer,
"slow_flush_count" => Integer,
"flush_time_count" => Integer,
Expand Down Expand Up @@ -813,6 +821,7 @@ def write(chunk)
"emit_records" => Integer,
"emit_size" => Integer,
"write_count" => Integer,
"write_secondary_count" => Integer,
"rollback_count" => Integer,
'slow_flush_count' => Integer,
'flush_time_count' => Integer,
Expand Down
11 changes: 11 additions & 0 deletions test/plugin/test_out_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -1395,4 +1395,15 @@ def plugin_id_for_test?
end
end
end

test 'can use metrics plugins and fallback methods' do
@d = create_driver

%w[healthy_nodes_count_metrics registered_nodes_count_metrics].each do |metric_name|
assert_true @d.instance.instance_variable_get(:"@#{metric_name}").is_a?(Fluent::Plugin::Metrics)
end

assert_equal 0, @d.instance.healthy_nodes_count
assert_equal 0, @d.instance.registered_nodes_count
end
end
8 changes: 6 additions & 2 deletions test/plugin/test_output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -226,17 +226,21 @@ def waiting(seconds)
test 'can use metrics plugins and fallback methods' do
@i.configure(config_element())

%w[num_errors_metrics emit_count_metrics emit_size_metrics emit_records_metrics
write_count_metrics rollback_count_metrics flush_time_count_metrics slow_flush_count_metrics].each do |metric_name|
%w[num_errors_metrics emit_count_metrics emit_size_metrics emit_records_metrics write_count_metrics
write_secondary_count_metrics rollback_count_metrics flush_time_count_metrics slow_flush_count_metrics].each do |metric_name|
assert_true @i.instance_variable_get(:"@#{metric_name}").is_a?(Fluent::Plugin::Metrics)
end

assert_equal 0, @i.num_errors
assert_equal 0, @i.emit_count
assert_equal 0, @i.emit_records
assert_equal 0, @i.emit_size
assert_equal 0, @i.emit_records
assert_equal 0, @i.write_count
assert_equal 0, @i.write_secondary_count
assert_equal 0, @i.rollback_count
assert_equal 0, @i.flush_time_count
assert_equal 0, @i.slow_flush_count
end

data(:new_api => :chunk,
Expand Down
8 changes: 8 additions & 0 deletions test/plugin/test_output_as_buffered_secondary.rb
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,7 @@ def dummy_event_stream

assert_equal 0, @i.write_count
assert_equal 0, @i.num_errors
assert_equal 0, @i.write_secondary_count

@i.enqueue_thread_wait
@i.flush_thread_wakeup
Expand Down Expand Up @@ -348,6 +349,8 @@ def dummy_event_stream
assert{ @i.write_count > prev_write_count }
assert{ @i.num_errors == prev_num_errors }

assert{ @i.write_secondary_count > 0 }

assert_nil @i.retry

assert_equal [ 'test.tag.1', event_time('2016-04-13 18:33:00').to_i, {"name" => "moris", "age" => 36, "message" => "data1"} ], written[0]
Expand Down Expand Up @@ -443,6 +446,8 @@ def dummy_event_stream
assert{ @i.buffer.dequeued[chunks[0].unique_id].nil? }
assert{ chunks.first.empty? }

assert{ @i.write_secondary_count > 0 }

assert_nil @i.retry

logs = @i.log.out.logs
Expand Down Expand Up @@ -737,6 +742,7 @@ def dummy_event_stream

assert_equal 0, @i.write_count
assert_equal 0, @i.num_errors
assert_equal 0, @i.write_secondary_count

@i.enqueue_thread_wait
@i.flush_thread_wakeup
Expand Down Expand Up @@ -765,6 +771,8 @@ def dummy_event_stream
prev_write_count = @i.write_count
end

assert{ @i.write_secondary_count > 0 }

# retry_timeout == 60(sec), retry_secondary_threshold == 0.8

assert{ now >= first_failure + 60 * 0.8 }
Expand Down
2 changes: 2 additions & 0 deletions test/test_plugin_classes.rb
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ def initialize
@emit_records_metrics = FluentTest::FluentTestCounterMetrics.new
@emit_size_metrics = FluentTest::FluentTestCounterMetrics.new
@write_count_metrics = FluentTest::FluentTestCounterMetrics.new
@write_secondary_count_metrics = FluentTest::FluentTestCounterMetrics.new
@rollback_count_metrics = FluentTest::FluentTestCounterMetrics.new
@flush_time_count_metrics = FluentTest::FluentTestCounterMetrics.new
@slow_flush_count_metrics = FluentTest::FluentTestCounterMetrics.new
Expand Down Expand Up @@ -281,6 +282,7 @@ def initialize
@emit_records_metrics = FluentTest::FluentTestCounterMetrics.new
@emit_size_metrics = FluentTest::FluentTestCounterMetrics.new
@write_count_metrics = FluentTest::FluentTestCounterMetrics.new
@write_secondary_count_metrics = FluentTest::FluentTestCounterMetrics.new
@rollback_count_metrics = FluentTest::FluentTestCounterMetrics.new
@flush_time_count_metrics = FluentTest::FluentTestCounterMetrics.new
@slow_flush_count_metrics = FluentTest::FluentTestCounterMetrics.new
Expand Down