diff --git a/lib/fluent/plugin/output.rb b/lib/fluent/plugin/output.rb index 8870d4782d..b864bf5ca5 100644 --- a/lib/fluent/plugin/output.rb +++ b/lib/fluent/plugin/output.rb @@ -186,6 +186,7 @@ def initialize @emit_records_metrics = nil @emit_size_metrics = nil @write_count_metrics = nil + @write_secondary_count_metrics = nil @rollback_count_metrics = nil @flush_time_count_metrics = nil @slow_flush_count_metrics = nil @@ -254,6 +255,7 @@ def configure(conf) @emit_records_metrics = metrics_create(namespace: "fluentd", subsystem: "output", name: "emit_records", help_text: "Number of emit records") @emit_size_metrics = metrics_create(namespace: "fluentd", subsystem: "output", name: "emit_size", help_text: "Total size of emit events") @write_count_metrics = metrics_create(namespace: "fluentd", subsystem: "output", name: "write_count", help_text: "Number of writing events") + @write_secondary_count_metrics = metrics_create(namespace: "fluentd", subsystem: "output", name: "write_secondary_count", help_text: "Number of writing events in secondary") @rollback_count_metrics = metrics_create(namespace: "fluentd", subsystem: "output", name: "rollback_count", help_text: "Number of rollbacking operations") @flush_time_count_metrics = metrics_create(namespace: "fluentd", subsystem: "output", name: "flush_time_count", help_text: "Count of flush time") @slow_flush_count_metrics = metrics_create(namespace: "fluentd", subsystem: "output", name: "slow_flush_count", help_text: "Count of slow flush occurred time(s)") @@ -1188,6 +1190,7 @@ def try_flush if output.delayed_commit log.trace "executing delayed write and commit", chunk: dump_unique_id_hex(chunk.unique_id) @write_count_metrics.inc + @write_secondary_count_metrics.inc if using_secondary @dequeued_chunks_mutex.synchronize do # delayed_commit_timeout for secondary is configured in of primary ( don't get ) @dequeued_chunks << DequeuedChunkInfo.new(chunk.unique_id, Time.now, self.delayed_commit_timeout) @@ -1200,6 +1203,7 @@ def try_flush dump_chunk_id = dump_unique_id_hex(chunk_id) log.trace "adding write count", instance: self.object_id @write_count_metrics.inc + @write_secondary_count_metrics.inc if using_secondary log.trace "executing sync write", chunk: dump_chunk_id output.write(chunk) @@ -1567,6 +1571,7 @@ def statistics 'retry_count' => @num_errors_metrics.get, 'emit_count' => @emit_count_metrics.get, 'write_count' => @write_count_metrics.get, + 'write_secondary_count' => @write_secondary_count_metrics.get, 'rollback_count' => @rollback_count_metrics.get, 'slow_flush_count' => @slow_flush_count_metrics.get, 'flush_time_count' => @flush_time_count_metrics.get, diff --git a/test/plugin/test_in_monitor_agent.rb b/test/plugin/test_in_monitor_agent.rb index b2b7e923e6..d5d694fe2c 100644 --- a/test/plugin/test_in_monitor_agent.rb +++ b/test/plugin/test_in_monitor_agent.rb @@ -181,6 +181,7 @@ def test_enable_input_metrics(with_config) "emit_records" => Integer, "emit_size" => Integer, "write_count" => Integer, + "write_secondary_count" => Integer, "rollback_count" => Integer, "slow_flush_count" => Integer, "flush_time_count" => Integer, @@ -203,6 +204,7 @@ def test_enable_input_metrics(with_config) "emit_records" => Integer, "emit_size" => Integer, "write_count" => Integer, + "write_secondary_count" => Integer, "rollback_count" => Integer, "slow_flush_count" => Integer, "flush_time_count" => Integer, @@ -320,6 +322,7 @@ def test_enable_input_metrics(with_config) "emit_records" => Integer, "emit_size" => Integer, "write_count" => Integer, + "write_secondary_count" => Integer, "rollback_count" => Integer, "slow_flush_count" => Integer, "flush_time_count" => Integer, @@ -342,6 +345,7 @@ def test_enable_input_metrics(with_config) "emit_records" => Integer, "emit_size" => Integer, "write_count" => Integer, + "write_secondary_count" => Integer, "rollback_count" => Integer, "slow_flush_count" => Integer, "flush_time_count" => Integer, @@ -416,6 +420,7 @@ def test_enable_input_metrics(with_config) "emit_records" => Integer, "emit_size" => Integer, "write_count" => Integer, + "write_secondary_count" => Integer, "rollback_count" => Integer, "slow_flush_count" => Integer, "flush_time_count" => Integer, @@ -430,6 +435,7 @@ def test_enable_input_metrics(with_config) "emit_records" => Integer, "emit_size" => Integer, "write_count" => Integer, + "write_secondary_count" => Integer, "rollback_count" => Integer, "slow_flush_count" => Integer, "flush_time_count" => Integer, @@ -568,6 +574,7 @@ def get(uri, header = {}) "emit_records" => Integer, "emit_size" => Integer, "write_count" => Integer, + "write_secondary_count" => Integer, "rollback_count" => Integer, "slow_flush_count" => Integer, "flush_time_count" => Integer, @@ -682,6 +689,7 @@ def get(uri, header = {}) "emit_records" => Integer, "emit_size" => Integer, "write_count" => Integer, + "write_secondary_count" => Integer, "rollback_count" => Integer, "slow_flush_count" => Integer, "flush_time_count" => Integer, @@ -813,6 +821,7 @@ def write(chunk) "emit_records" => Integer, "emit_size" => Integer, "write_count" => Integer, + "write_secondary_count" => Integer, "rollback_count" => Integer, 'slow_flush_count' => Integer, 'flush_time_count' => Integer, diff --git a/test/plugin/test_out_forward.rb b/test/plugin/test_out_forward.rb index 8784c881f9..35d068e6b0 100644 --- a/test/plugin/test_out_forward.rb +++ b/test/plugin/test_out_forward.rb @@ -1395,4 +1395,15 @@ def plugin_id_for_test? end end end + + test 'can use metrics plugins and fallback methods' do + @d = create_driver + + %w[healthy_nodes_count_metrics registered_nodes_count_metrics].each do |metric_name| + assert_true @d.instance.instance_variable_get(:"@#{metric_name}").is_a?(Fluent::Plugin::Metrics) + end + + assert_equal 0, @d.instance.healthy_nodes_count + assert_equal 0, @d.instance.registered_nodes_count + end end diff --git a/test/plugin/test_output.rb b/test/plugin/test_output.rb index 029bb144fc..db260b79d8 100644 --- a/test/plugin/test_output.rb +++ b/test/plugin/test_output.rb @@ -226,17 +226,21 @@ def waiting(seconds) test 'can use metrics plugins and fallback methods' do @i.configure(config_element()) - %w[num_errors_metrics emit_count_metrics emit_size_metrics emit_records_metrics - write_count_metrics rollback_count_metrics flush_time_count_metrics slow_flush_count_metrics].each do |metric_name| + %w[num_errors_metrics emit_count_metrics emit_size_metrics emit_records_metrics write_count_metrics + write_secondary_count_metrics rollback_count_metrics flush_time_count_metrics slow_flush_count_metrics].each do |metric_name| assert_true @i.instance_variable_get(:"@#{metric_name}").is_a?(Fluent::Plugin::Metrics) end assert_equal 0, @i.num_errors assert_equal 0, @i.emit_count + assert_equal 0, @i.emit_records assert_equal 0, @i.emit_size assert_equal 0, @i.emit_records assert_equal 0, @i.write_count + assert_equal 0, @i.write_secondary_count assert_equal 0, @i.rollback_count + assert_equal 0, @i.flush_time_count + assert_equal 0, @i.slow_flush_count end data(:new_api => :chunk, diff --git a/test/plugin/test_output_as_buffered_secondary.rb b/test/plugin/test_output_as_buffered_secondary.rb index c908daf74b..15ff633f71 100644 --- a/test/plugin/test_output_as_buffered_secondary.rb +++ b/test/plugin/test_output_as_buffered_secondary.rb @@ -308,6 +308,7 @@ def dummy_event_stream assert_equal 0, @i.write_count assert_equal 0, @i.num_errors + assert_equal 0, @i.write_secondary_count @i.enqueue_thread_wait @i.flush_thread_wakeup @@ -348,6 +349,8 @@ def dummy_event_stream assert{ @i.write_count > prev_write_count } assert{ @i.num_errors == prev_num_errors } + assert{ @i.write_secondary_count > 0 } + assert_nil @i.retry assert_equal [ 'test.tag.1', event_time('2016-04-13 18:33:00').to_i, {"name" => "moris", "age" => 36, "message" => "data1"} ], written[0] @@ -443,6 +446,8 @@ def dummy_event_stream assert{ @i.buffer.dequeued[chunks[0].unique_id].nil? } assert{ chunks.first.empty? } + assert{ @i.write_secondary_count > 0 } + assert_nil @i.retry logs = @i.log.out.logs @@ -737,6 +742,7 @@ def dummy_event_stream assert_equal 0, @i.write_count assert_equal 0, @i.num_errors + assert_equal 0, @i.write_secondary_count @i.enqueue_thread_wait @i.flush_thread_wakeup @@ -765,6 +771,8 @@ def dummy_event_stream prev_write_count = @i.write_count end + assert{ @i.write_secondary_count > 0 } + # retry_timeout == 60(sec), retry_secondary_threshold == 0.8 assert{ now >= first_failure + 60 * 0.8 } diff --git a/test/test_plugin_classes.rb b/test/test_plugin_classes.rb index a973cfba73..5cfa18ce11 100644 --- a/test/test_plugin_classes.rb +++ b/test/test_plugin_classes.rb @@ -151,6 +151,7 @@ def initialize @emit_records_metrics = FluentTest::FluentTestCounterMetrics.new @emit_size_metrics = FluentTest::FluentTestCounterMetrics.new @write_count_metrics = FluentTest::FluentTestCounterMetrics.new + @write_secondary_count_metrics = FluentTest::FluentTestCounterMetrics.new @rollback_count_metrics = FluentTest::FluentTestCounterMetrics.new @flush_time_count_metrics = FluentTest::FluentTestCounterMetrics.new @slow_flush_count_metrics = FluentTest::FluentTestCounterMetrics.new @@ -281,6 +282,7 @@ def initialize @emit_records_metrics = FluentTest::FluentTestCounterMetrics.new @emit_size_metrics = FluentTest::FluentTestCounterMetrics.new @write_count_metrics = FluentTest::FluentTestCounterMetrics.new + @write_secondary_count_metrics = FluentTest::FluentTestCounterMetrics.new @rollback_count_metrics = FluentTest::FluentTestCounterMetrics.new @flush_time_count_metrics = FluentTest::FluentTestCounterMetrics.new @slow_flush_count_metrics = FluentTest::FluentTestCounterMetrics.new