# Patch summary: Dalli::Server now counts quiet-mode writes in @pending_write_count; when a pending multi response has to be cleared it drains with noop if the count is at most MAX_SAFE_DRAIN_COUNT, otherwise it closes the connection and reconnects. test/test_server.rb covers both paths.
diff --git a/lib/dalli/server.rb b/lib/dalli/server.rb index 62508891..13880613 100644 --- a/lib/dalli/server.rb +++ b/lib/dalli/server.rb @@ -45,6 +45,15 @@ class Server ALLOWED_MULTI_OPS = %i[set setq delete deleteq add addq replace replaceq].freeze + # Maximum number of unread quiet-mode write ACKs to drain via noop before + # giving up and closing the connection instead. Each ACK read is bounded by + # socket_timeout (default 450 ms), so above this threshold the drain could + # hold ring.lock for more than MAX_SAFE_DRAIN_COUNT * socket_timeout seconds — long + # enough to exceed a Rack timeout. At or below the threshold the drain is fast + # (typically < 5 ms total) and avoids a TCP reconnect on every read that + # follows a write. + MAX_SAFE_DRAIN_COUNT = 10 + def initialize(attribs, options = {}) @hostname, @port, @weight, @socket_type = parse_hostname(attribs) @fail_count = 0 @@ -57,6 +66,7 @@ def initialize(attribs, options = {}) @pid = nil @inprogress = nil @pending_multi_response = nil + @pending_write_count = 0 end def name @@ -73,10 +83,23 @@ def request(op, *args) raise Dalli::NetworkError, "#{name} is down: #{@error} #{@msg}. If you are sure it is running, ensure memcached version is > 1.4." unless alive? @inprogress = true begin - # if we have exited a multi block, flush any responses that might still be pending + # if we have exited a multi block, drain or discard any pending write ACKs if @pending_multi_response && (!multi? || !ALLOWED_MULTI_OPS.include?(op)) - noop @pending_multi_response = false + if @pending_write_count > MAX_SAFE_DRAIN_COUNT + # Too many pending ACKs to drain safely. Draining blocks ring.lock + # for up to pending_write_count * socket_timeout seconds — long + # enough to exceed a Rack timeout. Close instead: the write ACKs + # are fire-and-forget from quiet mode and were never intended to be + # read, so discarding them is semantically correct. + @pending_write_count = 0 + close + raise Dalli::NetworkError, "#{name} is down: #{@error} #{@msg}. 
If you are sure it is running, ensure memcached version is > 1.4." unless alive? + @inprogress = true + else + @pending_write_count = 0 + noop + end end result = send(op, *args) @inprogress = false @@ -128,6 +151,7 @@ def close @sock = nil @pid = nil @inprogress = false + @pending_write_count = 0 end def lock! @@ -153,8 +177,17 @@ def compressor def multi_response_start(keys) verify_state if @pending_multi_response - noop @pending_multi_response = false + if @pending_write_count > MAX_SAFE_DRAIN_COUNT + # See #request for the rationale: close when the pending count is large + # enough that the noop drain would block ring.lock for too long. + @pending_write_count = 0 + close + raise Dalli::NetworkError, "#{name} is down: #{@error} #{@msg}. If you are sure it is running, ensure memcached version is > 1.4." unless alive? + else + @pending_write_count = 0 + noop + end end @inprogress = true send_multiget(keys) @@ -318,8 +351,12 @@ def set(key, value, ttl, cas, options) guard_max_value(key, value) do req = [REQUEST, OPCODES[multi? ? :setq : :set], key.bytesize, 8, 0, 0, value.bytesize + key.bytesize + 8, 0, cas, flags, ttl, key, value].pack(FORMAT[:set]) write(req) - @pending_multi_response ||= multi? - cas_response unless multi? + if multi? + @pending_multi_response = true + @pending_write_count += 1 + else + cas_response + end end end @@ -330,8 +367,12 @@ def add(key, value, ttl, options) guard_max_value(key, value) do req = [REQUEST, OPCODES[multi? ? :addq : :add], key.bytesize, 8, 0, 0, value.bytesize + key.bytesize + 8, 0, 0, flags, ttl, key, value].pack(FORMAT[:add]) write(req) - @pending_multi_response ||= multi? - cas_response unless multi? + if multi? + @pending_multi_response = true + @pending_write_count += 1 + else + cas_response + end end end @@ -342,16 +383,24 @@ def replace(key, value, ttl, cas, options) guard_max_value(key, value) do req = [REQUEST, OPCODES[multi? ? 
:replaceq : :replace], key.bytesize, 8, 0, 0, value.bytesize + key.bytesize + 8, 0, cas, flags, ttl, key, value].pack(FORMAT[:replace]) write(req) - @pending_multi_response ||= multi? - cas_response unless multi? + if multi? + @pending_multi_response = true + @pending_write_count += 1 + else + cas_response + end end end def delete(key, cas) req = [REQUEST, OPCODES[multi? ? :deleteq : :delete], key.bytesize, 0, 0, 0, key.bytesize, 0, cas, key].pack(FORMAT[:delete]) write(req) - @pending_multi_response ||= multi? - generic_response unless multi? + if multi? + @pending_multi_response = true + @pending_write_count += 1 + else + generic_response + end end def flush(ttl) diff --git a/test/test_server.rb b/test/test_server.rb index 90126d5a..e86ad7c3 100644 --- a/test/test_server.rb +++ b/test/test_server.rb @@ -136,18 +136,43 @@ end end - it 'flushes pending_multi_response before writing' do + it 'drains pending_multi_response with noop when write count is at or below threshold' do memcached_persistent do |dc, port| ring = dc.send(:ring) s = ring.servers.first assert s.alive? + original_sock = s.sock s.instance_variable_set(:@pending_multi_response, true) + s.instance_variable_set(:@pending_write_count, Dalli::Server::MAX_SAFE_DRAIN_COUNT) + s.expects(:noop).once + s.multi_response_start(['somekey']) + + assert_equal false, s.instance_variable_get(:@pending_multi_response) + assert_equal 0, s.instance_variable_get(:@pending_write_count) + assert_equal original_sock, s.sock, "Socket should be reused when write count is within threshold" + + s.multi_response_abort + end + end + + it 'closes and reconnects when pending write count exceeds threshold' do + memcached_persistent do |dc, port| + ring = dc.send(:ring) + s = ring.servers.first + assert s.alive? 
+ + original_sock = s.sock + s.instance_variable_set(:@pending_multi_response, true) + s.instance_variable_set(:@pending_write_count, Dalli::Server::MAX_SAFE_DRAIN_COUNT + 1) s.multi_response_start(['somekey']) assert_equal false, s.instance_variable_get(:@pending_multi_response) + assert_equal 0, s.instance_variable_get(:@pending_write_count) + refute_nil s.sock, "Socket should be open after reconnect" + refute_equal original_sock, s.sock, "Socket should be a new connection when write count exceeds threshold" s.multi_response_abort end @@ -180,6 +205,35 @@ end end + describe 'pending_write_count' do + it 'increments on quiet set and resets after drain' do + memcached_persistent do |dc| + ring = dc.send(:ring) + s = ring.servers.first + assert s.alive? + + original_dalli_multi = Thread.current[:dalli_multi] + begin + Thread.current[:dalli_multi] = true + s.request(:set, 'k1', 'v', 100, 0, {}) + s.request(:set, 'k2', 'v', 100, 0, {}) + s.request(:set, 'k3', 'v', 100, 0, {}) + ensure + Thread.current[:dalli_multi] = original_dalli_multi + end + + assert_equal 3, s.instance_variable_get(:@pending_write_count) + assert_equal true, s.instance_variable_get(:@pending_multi_response) + + # A subsequent read (below threshold) drains with noop, resets count + s.request(:get, 'k1') + + assert_equal 0, s.instance_variable_get(:@pending_write_count) + assert_equal false, s.instance_variable_get(:@pending_multi_response) + end + end + end + describe 'request error handling' do it 'closes socket on Timeout::Error and re-raises' do memcached_persistent do |dc| @@ -198,6 +252,46 @@ end end + it 'drains pending_multi_response with noop when write count is at or below threshold' do + memcached_persistent do |dc| + ring = dc.send(:ring) + s = ring.servers.first + assert s.alive? 
+ + original_sock = s.sock + s.instance_variable_set(:@pending_multi_response, true) + s.instance_variable_set(:@pending_write_count, Dalli::Server::MAX_SAFE_DRAIN_COUNT) + + s.expects(:noop).once + result = s.request(:get, 'somekey') + + assert_equal false, s.instance_variable_get(:@pending_multi_response) + assert_equal 0, s.instance_variable_get(:@pending_write_count) + assert_equal original_sock, s.sock, "Socket should be reused when write count is within threshold" + assert_nil result, "GET on non-existent key should return nil" + end + end + + it 'closes and reconnects when pending write count exceeds threshold before a non-multi op' do + memcached_persistent do |dc| + ring = dc.send(:ring) + s = ring.servers.first + assert s.alive? + + original_sock = s.sock + s.instance_variable_set(:@pending_multi_response, true) + s.instance_variable_set(:@pending_write_count, Dalli::Server::MAX_SAFE_DRAIN_COUNT + 1) + + result = s.request(:get, 'somekey') + + assert_equal false, s.instance_variable_get(:@pending_multi_response) + assert_equal 0, s.instance_variable_get(:@pending_write_count) + refute_nil s.sock, "Socket should be open after reconnect" + refute_equal original_sock, s.sock, "Socket should be a new connection when write count exceeds threshold" + assert_nil result, "GET on non-existent key should return nil" + end + end + it 'returns false on MarshalError' do memcached_persistent do |dc| ring = dc.send(:ring)