Skip to content

Commit 6679d83

Browse files
committed
output: add metrics for number of stored chunks in secondary
Signed-off-by: Shizuo Fujita <[email protected]>
1 parent a3f4c82 commit 6679d83

File tree

3 files changed

+15
-0
lines changed

3 files changed

+15
-0
lines changed

lib/fluent/plugin/output.rb

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,7 @@ def initialize
213213
@rollback_count_metrics = nil
214214
@flush_time_count_metrics = nil
215215
@slow_flush_count_metrics = nil
216+
@secondary_chunk_count_metrics = nil
216217
@enable_size_metrics = false
217218

218219
# How to process events is decided here at once, but it will be decided in delayed way on #configure & #start
@@ -281,6 +282,7 @@ def configure(conf)
281282
@rollback_count_metrics = metrics_create(namespace: "fluentd", subsystem: "output", name: "rollback_count", help_text: "Number of rollbacking operations")
282283
@flush_time_count_metrics = metrics_create(namespace: "fluentd", subsystem: "output", name: "flush_time_count", help_text: "Count of flush time")
283284
@slow_flush_count_metrics = metrics_create(namespace: "fluentd", subsystem: "output", name: "slow_flush_count", help_text: "Count of slow flush occurred time(s)")
285+
@secondary_chunk_count_metrics = metrics_create(namespace: "fluentd", subsystem: "output", name: "secondary_chunk_count", help_text: "Number of stored chunks in secondary")
284286

285287
if has_buffer_section
286288
unless implement?(:buffered) || implement?(:delayed_commit)
@@ -1117,6 +1119,7 @@ def commit_write(chunk_id, delayed: @delayed_commit, secondary: false)
11171119
if @retry # success to flush chunks in retries
11181120
if secondary
11191121
log.warn "retry succeeded by secondary.", chunk_id: dump_unique_id_hex(chunk_id)
1122+
@secondary_chunk_count_metrics.inc
11201123
else
11211124
log.warn "retry succeeded.", chunk_id: dump_unique_id_hex(chunk_id)
11221125
end
@@ -1594,6 +1597,7 @@ def statistics
15941597
'rollback_count' => @rollback_count_metrics.get,
15951598
'slow_flush_count' => @slow_flush_count_metrics.get,
15961599
'flush_time_count' => @flush_time_count_metrics.get,
1600+
'secondary_chunk_count' => @secondary_chunk_count_metrics.get,
15971601
}
15981602

15991603
if @buffer && @buffer.respond_to?(:statistics)

test/plugin/test_in_monitor_agent.rb

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,7 @@ def test_enable_input_metrics(with_config)
184184
"rollback_count" => Integer,
185185
"slow_flush_count" => Integer,
186186
"flush_time_count" => Integer,
187+
"secondary_chunk_count" => Integer,
187188
}
188189
output_info.merge!("config" => {"@id" => "test_out", "@type" => "test_out"}) if with_config
189190
error_label_info = {
@@ -206,6 +207,7 @@ def test_enable_input_metrics(with_config)
206207
"rollback_count" => Integer,
207208
"slow_flush_count" => Integer,
208209
"flush_time_count" => Integer,
210+
"secondary_chunk_count" => Integer,
209211
}
210212
error_label_info.merge!("config" => {"@id"=>"null", "@type" => "null"}) if with_config
211213
opts = {with_config: with_config}
@@ -323,6 +325,7 @@ def test_enable_input_metrics(with_config)
323325
"rollback_count" => Integer,
324326
"slow_flush_count" => Integer,
325327
"flush_time_count" => Integer,
328+
"secondary_chunk_count" => Integer,
326329
}
327330
output_info.merge!("config" => {"@id" => "test_out", "@type" => "test_out"}) if with_config
328331
error_label_info = {
@@ -345,6 +348,7 @@ def test_enable_input_metrics(with_config)
345348
"rollback_count" => Integer,
346349
"slow_flush_count" => Integer,
347350
"flush_time_count" => Integer,
351+
"secondary_chunk_count" => Integer,
348352
}
349353
error_label_info.merge!("config" => {"@id"=>"null", "@type" => "null"}) if with_config
350354
opts = {with_config: with_config}
@@ -419,6 +423,7 @@ def test_enable_input_metrics(with_config)
419423
"rollback_count" => Integer,
420424
"slow_flush_count" => Integer,
421425
"flush_time_count" => Integer,
426+
"secondary_chunk_count" => Integer,
422427
}
423428
expect_test_out_record = {
424429
"plugin_id" => "test_out",
@@ -433,6 +438,7 @@ def test_enable_input_metrics(with_config)
433438
"rollback_count" => Integer,
434439
"slow_flush_count" => Integer,
435440
"flush_time_count" => Integer,
441+
"secondary_chunk_count" => Integer,
436442
}
437443
assert_fuzzy_equal(expect_relabel_record, d.events[1][2])
438444
assert_fuzzy_equal(expect_test_out_record, d.events[3][2])
@@ -571,6 +577,7 @@ def get(uri, header = {})
571577
"rollback_count" => Integer,
572578
"slow_flush_count" => Integer,
573579
"flush_time_count" => Integer,
580+
"secondary_chunk_count" => Integer,
574581
}
575582
expected_null_response.merge!("config" => {"@id" => "null", "@type" => "null"}) if with_config
576583
expected_null_response.merge!("retry" => {}) if with_retry
@@ -685,6 +692,7 @@ def get(uri, header = {})
685692
"rollback_count" => Integer,
686693
"slow_flush_count" => Integer,
687694
"flush_time_count" => Integer,
695+
"secondary_chunk_count" => Integer,
688696
}
689697
response = JSON.parse(get("http://127.0.0.1:#{@port}/api/plugins.json?with_config=no&with_retry=no&with_ivars=id,num_errors").body)
690698
test_in_response = response["plugins"][0]
@@ -816,6 +824,7 @@ def write(chunk)
816824
"rollback_count" => Integer,
817825
'slow_flush_count' => Integer,
818826
'flush_time_count' => Integer,
827+
"secondary_chunk_count" => Integer,
819828
}
820829
output.emit_events('test.tag', Fluent::ArrayEventStream.new([[event_time, {"message" => "test failed flush 1"}]]))
821830
# flush few times to check steps

test/test_plugin_classes.rb

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,7 @@ def initialize
154154
@rollback_count_metrics = FluentTest::FluentTestCounterMetrics.new
155155
@flush_time_count_metrics = FluentTest::FluentTestCounterMetrics.new
156156
@slow_flush_count_metrics = FluentTest::FluentTestCounterMetrics.new
157+
@secondary_chunk_count_metrics = FluentTest::FluentTestCounterMetrics.new
157158
end
158159

159160
attr_reader :events
@@ -284,6 +285,7 @@ def initialize
284285
@rollback_count_metrics = FluentTest::FluentTestCounterMetrics.new
285286
@flush_time_count_metrics = FluentTest::FluentTestCounterMetrics.new
286287
@slow_flush_count_metrics = FluentTest::FluentTestCounterMetrics.new
288+
@secondary_chunk_count_metrics = FluentTest::FluentTestCounterMetrics.new
287289
end
288290

289291
def format(tag, time, record)

0 commit comments

Comments
 (0)