Skip to content

Commit ac15ccf

Browse files
committed
Reduce YAML load
Before:
* Load on each resource
* Load/Dump on each resource if necessary
After:
* Load on each resource_uri_prefix
* Load/Dump on each resource if necessary
1 parent fff7919 commit ac15ccf

File tree

4 files changed

+91
-17
lines changed

4 files changed

+91
-17
lines changed

exe/triglav-agent-hdfs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ Triglav::Agent::Configuration.configure do |config|
66
# config.cli_class = Triglav::Agent::Hdfs::CLI
77
# config.setting_class = Triglav::Agent::Hdfs::Setting
88
# config.worker_module = Triglav::Agent::Hdfs::Worker
9-
# config.processor_class = Triglav::Agent::Hdfs::Processor
9+
config.processor_class = Triglav::Agent::Hdfs::Processor
1010
config.monitor_class = Triglav::Agent::Hdfs::Monitor
1111
config.connection_class = Triglav::Agent::Hdfs::Connection
1212
end

lib/triglav/agent/hdfs.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,3 +9,4 @@ module Hdfs
99
require 'triglav/agent/hdfs/connection'
1010
require 'triglav/agent/hdfs/version'
1111
require 'triglav/agent/hdfs/monitor'
12+
require 'triglav/agent/hdfs/processor'

lib/triglav/agent/hdfs/monitor.rb

Lines changed: 14 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -16,50 +16,48 @@ class Monitor < Base::Monitor
1616
# unit: 'daily', 'hourly', or 'singular'
1717
# timezone: '+09:00'
1818
# span_in_days: 32
19-
def initialize(connection, resource_uri_prefix, resource)
19+
# @param [Hash] last_modification_times for a resource
20+
def initialize(connection, resource_uri_prefix, resource, last_modification_times)
2021
@connection = connection
2122
@resource_uri_prefix = resource_uri_prefix
2223
@resource = resource
2324
@status = Triglav::Agent::Status.new(resource_uri_prefix, resource.uri)
24-
@last_modification_times = get_last_modification_times
25+
@last_modification_times = get_last_modification_times(last_modification_times)
2526
end
2627

2728
def process
2829
unless resource_valid?
2930
$logger.warn { "Broken resource: #{resource.to_s}" }
30-
return nil
31+
return [nil, nil]
3132
end
32-
$logger.debug { "Start process #{resource.uri}" }
33+
started = Time.now
34+
$logger.debug { "Start Monitor#process #{resource.uri}" }
3335

3436
events, new_last_modification_times = get_events
3537

36-
$logger.debug { "Finish process #{resource.uri}" }
38+
elapsed = Time.now - started
39+
$logger.debug { "Finish Monitor#process #{resource.uri} elapsed:#{elapsed.to_f}" }
3740

38-
return nil if events.nil? || events.empty?
39-
yield(events) if block_given? # send_message
40-
update_status_file(new_last_modification_times)
41-
true
41+
return [nil, nil] if events.nil? or events.empty?
42+
[events, new_last_modification_times]
4243
end
4344

4445
private
4546

4647
def get_events
4748
new_last_modification_times = get_new_last_modification_times
4849
latest_files = select_latest_files(new_last_modification_times)
50+
new_last_modification_times[:max] = new_last_modification_times.values.max
4951
events = build_events(latest_files)
5052
[events, new_last_modification_times]
5153
rescue => e
5254
$logger.warn { "#{e.class} #{e.message} #{e.backtrace.join("\n ")}" }
5355
nil
5456
end
5557

56-
def update_status_file(last_modification_times)
57-
last_modification_times[:max] = last_modification_times.values.max
58-
@status.set(last_modification_times)
59-
end
60-
61-
def get_last_modification_times
62-
last_modification_times = @status.get || {}
58+
def get_last_modification_times(last_modification_times)
59+
last_modification_times ||= {}
60+
# ToDo: want to remove accessing Status in Monitor class
6361
max_last_modification_time = last_modification_times[:max] || @status.getsetnx([:max], $setting.debug? ? 0 : get_current_time)
6462
removes = last_modification_times.keys - paths.keys
6563
appends = paths.keys - last_modification_times.keys
lib/triglav/agent/hdfs/processor.rb

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
require 'triglav/agent/base/processor'
2+
3+
module Triglav::Agent
4+
module Hdfs
5+
# Processor for HDFS resources.
#
# The stored statuses are read once per resource_uri_prefix (in
# before_process) and each Monitor is handed only the entry for its own
# resource, so the status is not loaded again for every resource.
class Processor < Base::Processor
  # Runs a Monitor for every resource, sends the events it produced, and
  # stores the new statuses of that resource.
  #
  # @return [Integer] number of resources whose iteration finished without raising
  # @raise [TooManyError] raised inside the worker block once the number of
  #   consecutive failures exceeds self.class.max_consecuitive_error_count
  def process
    before_process
    success_count = 0
    consecutive_error_count = 0
    Parallel.each(resources, parallel_opts) do |resource|
      raise Parallel::Break if stopped?
      events = nil
      new_resource_statuses = nil
      begin
        @connection_pool.with do |connection|
          resource_statuses = get_resource_statuses(resource)
          monitor = monitor_class.new(
            connection, resource_uri_prefix, resource, resource_statuses
          )
          # Monitor#process returns [events, new statuses], or [nil, nil]
          # when there is nothing to report.
          events, new_resource_statuses = monitor.process
        end
        if events
          $logger.info { "send_messages:#{events.map(&:to_hash).to_json}" }
          @api_client_pool.with { |api_client| api_client.send_messages(events) }
        end
        # Statuses are stored only after the events were sent, so a failed
        # send does not advance the stored status.
        @mutex.synchronize do
          set_resource_statuses(new_resource_statuses, resource) if new_resource_statuses
          success_count += 1
          consecutive_error_count = 0
        end
      rescue => e
        log_error(e)
        $logger.info { "failed_events:#{events.map(&:to_hash).to_json}" } if events
        @mutex.synchronize do
          raise TooManyError if (consecutive_error_count += 1) > self.class.max_consecuitive_error_count
        end
      end
    end
    success_count
  ensure
    after_process
  end

  private

  # Loads the statuses stored under resource_uri_prefix and records the
  # start time used by after_process.
  def before_process
    super
    started = Time.now
    # Status#get can be nil (presumably when nothing has been stored yet);
    # fall back to an empty Hash so the per-resource lookup yields nil
    # instead of raising NoMethodError for every resource.
    @resource_uri_prefix_statuses = Triglav::Agent::Status.new(resource_uri_prefix).get || {}
    elapsed = Time.now - started
    $logger.info { "Read status #{resource_uri_prefix} #{elapsed.to_f}sec" }
    @started = Time.now
    $logger.info { "Start Processor#process #{resource_uri_prefix}" }
  end

  # Logs the elapsed time of the run. Called from the ensure clause of
  # #process, so it also runs when before_process itself raised.
  def after_process
    super
    # @started is unset when before_process failed early; skip the timing
    # log rather than raise TypeError and mask the original exception.
    return unless @started
    elapsed = Time.now - @started
    $logger.info { "Finish Processor#process #{resource_uri_prefix} elapsed:#{elapsed.to_f}" }
  end

  # @param resource [#uri] resource being monitored
  # @return [Hash, nil] stored statuses for the resource, nil when none exist
  def get_resource_statuses(resource)
    @resource_uri_prefix_statuses[resource.uri.to_sym]
  end

  # Stores the statuses of a single resource and logs how long it took.
  #
  # @param resource_statuses [Hash] new statuses returned by the Monitor
  # @param resource [#uri] resource the statuses belong to
  def set_resource_statuses(resource_statuses, resource)
    started = Time.now
    resource_status = Triglav::Agent::Status.new(resource_uri_prefix, resource.uri)
    resource_status.set(resource_statuses)
    elapsed = Time.now - started
    $logger.info { "Store status resource:#{resource.uri} #{elapsed.to_f}sec" }
  end
end
74+
end
75+
end

0 commit comments

Comments
 (0)