diff --git a/lib/fluent/command/fluentd.rb b/lib/fluent/command/fluentd.rb index c5c4684089..b3dfa4145f 100644 --- a/lib/fluent/command/fluentd.rb +++ b/lib/fluent/command/fluentd.rb @@ -163,6 +163,10 @@ cmd_opts[:enable_size_metrics] = b } +op.on('--maxstdio NUMBER', "specify maxstdio number for increasing the number of opening files at once") {|s| + cmd_opts[:maxstdio] = s.to_i +} + op.on('-v', '--verbose', "increase verbose level (-v: debug, -vv: trace)", TrueClass) {|b| return unless b cur_level = cmd_opts.fetch(:log_level, default_opts[:log_level]) diff --git a/lib/fluent/root_agent.rb b/lib/fluent/root_agent.rb index 6cb60916f5..335cc6b60e 100644 --- a/lib/fluent/root_agent.rb +++ b/lib/fluent/root_agent.rb @@ -87,6 +87,7 @@ def initialize(log:, system_config: SystemConfig.new, start_in_parallel: false) @source_only_mode = SourceOnlyMode.new(system_config.with_source_only, start_in_parallel) @source_only_buffer_agent = nil @enable_input_metrics = system_config.enable_input_metrics || false + @maxstdio = system_config.maxstdio suppress_interval(system_config.emit_error_log_interval) unless system_config.emit_error_log_interval.nil? end diff --git a/lib/fluent/supervisor.rb b/lib/fluent/supervisor.rb index 34d9dadceb..654ac53dfc 100644 --- a/lib/fluent/supervisor.rb +++ b/lib/fluent/supervisor.rb @@ -641,6 +641,7 @@ def self.default_options conf_encoding: 'utf-8', disable_shared_socket: nil, config_file_type: :guess, + maxstdio: nil, } end @@ -681,6 +682,7 @@ def initialize(cl_opt) @log_path = opt[:log_path] @log_rotate_age = opt[:log_rotate_age] @log_rotate_size = opt[:log_rotate_size] + @maxstdio = opt[:maxstdio] @finished = false end @@ -754,6 +756,14 @@ def options } end + def setup_maxstdio + if Fluent.windows? + require "fluent/win32api" + Win32API._setmaxstdio(@maxstdio) + $log.debug "Current maxstdio is #{Win32API._getmaxstdio}" + end + end + def run_worker Process.setproctitle("worker:#{@system_config.process_name}") if @process_name @@ -766,6 +776,7 @@ def run_worker end install_main_process_signal_handlers + setup_maxstdio # This is the only log messsage for @standalone_worker $log.info "starting fluentd-#{Fluent::VERSION} without supervision", pid: Process.pid, ruby: RUBY_VERSION if @standalone_worker diff --git a/lib/fluent/system_config.rb b/lib/fluent/system_config.rb index 630a8ae28f..5fec28b112 100644 --- a/lib/fluent/system_config.rb +++ b/lib/fluent/system_config.rb @@ -30,7 +30,7 @@ class SystemConfig :file_permission, :dir_permission, :counter_server, :counter_client, :strict_config_value, :enable_msgpack_time_support, :disable_shared_socket, :metrics, :enable_input_metrics, :enable_size_metrics, :enable_jit, :source_only_buffer, - :config_include_dir + :config_include_dir, :maxstdio ] config_param :workers, :integer, default: 1 @@ -61,6 +61,7 @@ class SystemConfig v.to_i(8) end config_param :config_include_dir, default: Fluent::DEFAULT_CONFIG_INCLUDE_DIR + config_param :maxstdio, :integer, default: nil config_section :log, required: false, init: true, multi: false do config_param :path, :string, default: nil config_param :format, :enum, list: [:text, :json], default: :text diff --git a/lib/fluent/win32api.rb b/lib/fluent/win32api.rb index d7757a8370..41621d7660 100644 --- a/lib/fluent/win32api.rb +++ b/lib/fluent/win32api.rb @@ -34,5 +34,7 @@ module Win32API extern "intptr_t _get_osfhandle(int)" extern "BOOL GetFileInformationByHandle(HANDLE, void *)" extern "BOOL GetFileInformationByHandleEx(HANDLE, int, void *, DWORD)" + extern 'int _setmaxstdio(int)' + extern 'int _getmaxstdio(void)' end if Fluent.windows? end diff --git a/test/config/test_system_config.rb b/test/config/test_system_config.rb index 9a37a3d272..590c0f9166 100644 --- a/test/config/test_system_config.rb +++ b/test/config/test_system_config.rb @@ -80,6 +80,7 @@ def parse_text(text) assert_nil(sc.enable_msgpack_time_support) assert(!sc.enable_jit) assert_nil(sc.log.path) + assert_nil(sc.maxstdio) assert_equal(:text, sc.log.format) assert_equal('%Y-%m-%d %H:%M:%S %z', sc.log.time_format) end @@ -100,6 +101,7 @@ def parse_text(text) 'enable_input_metrics' => ['enable_input_metrics', true], 'enable_size_metrics' => ['enable_size_metrics', true], 'enable_jit' => ['enable_jit', true], + 'maxstdio' => ['maxstdio', 768], ) test "accepts parameters" do |(k, v)| conf = parse_text(<<-EOS) diff --git a/test/test_supervisor.rb b/test/test_supervisor.rb index fd89ca8ce4..fa44ea39d9 100644 --- a/test/test_supervisor.rb +++ b/test/test_supervisor.rb @@ -1160,6 +1160,32 @@ def test_stop_parallel_old_supervisor_after_delay end end + data("Default", {}) + data("Small", {maxstdio: 100}) + data("Large", {maxstdio: 3000}) + data("Very large", {maxstdio: 10000}) + def test_maxstdio(cl_opt) + # TODO assert + + omit "maxstdio is only for Windows" unless Fluent.windows? + + supervisor = Fluent::Supervisor.new(cl_opt) + supervisor.setup_maxstdio + + files = [] + begin + 10000.times do |i| + file = File.open(File.join(@tmp_dir, "#{i}.txt"), "w") + files.append(file) + end + ensure + puts files.length + files.each do |file| + file.close + end + end + end + def create_debug_dummy_logger dl_opts = {} dl_opts[:log_level] = ServerEngine::DaemonLogger::DEBUG