Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 60 additions & 11 deletions lib/dalli/server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,15 @@ class Server

ALLOWED_MULTI_OPS = %i[set setq delete deleteq add addq replace replaceq].freeze

# Maximum number of unread quiet-mode write ACKs that will be drained via
# noop. When more than this many are pending, the connection is closed
# instead of being drained.
#
# Each ACK read is bounded by socket_timeout (default 450 ms), so draining
# more than MAX_SAFE_DRAIN_COUNT ACKs could hold ring.lock for longer than
# MAX_SAFE_DRAIN_COUNT * socket_timeout seconds — long enough to exceed a
# Rack timeout. At or below the threshold the drain is fast (typically
# < 5 ms total) and avoids a TCP reconnect on every read that follows a
# write.
MAX_SAFE_DRAIN_COUNT = 10

def initialize(attribs, options = {})
@hostname, @port, @weight, @socket_type = parse_hostname(attribs)
@fail_count = 0
Expand All @@ -57,6 +66,7 @@ def initialize(attribs, options = {})
@pid = nil
@inprogress = nil
@pending_multi_response = nil
@pending_write_count = 0
end

def name
Expand All @@ -73,10 +83,23 @@ def request(op, *args)
raise Dalli::NetworkError, "#{name} is down: #{@error} #{@msg}. If you are sure it is running, ensure memcached version is > 1.4." unless alive?
@inprogress = true
begin
# if we have exited a multi block, flush any responses that might still be pending
# if we have exited a multi block, discard any pending write ACKs
if @pending_multi_response && (!multi? || !ALLOWED_MULTI_OPS.include?(op))
noop
@pending_multi_response = false
if @pending_write_count > MAX_SAFE_DRAIN_COUNT
# Too many pending ACKs to drain safely. Draining blocks ring.lock
# for up to pending_write_count * socket_timeout seconds — long
# enough to exceed a Rack timeout. Close instead: the write ACKs
# are fire-and-forget from quiet mode and were never intended to be
# read, so discarding them is semantically correct.
@pending_write_count = 0
close
raise Dalli::NetworkError, "#{name} is down: #{@error} #{@msg}. If you are sure it is running, ensure memcached version is > 1.4." unless alive?
@inprogress = true
else
@pending_write_count = 0
noop
end
end
result = send(op, *args)
@inprogress = false
Expand Down Expand Up @@ -128,6 +151,7 @@ def close
@sock = nil
@pid = nil
@inprogress = false
@pending_write_count = 0
end

def lock!
Expand All @@ -153,8 +177,17 @@ def compressor
def multi_response_start(keys)
verify_state
if @pending_multi_response
noop
@pending_multi_response = false
if @pending_write_count > MAX_SAFE_DRAIN_COUNT
# See #request for the rationale: close when the pending count is large
# enough that the noop drain would block ring.lock for too long.
@pending_write_count = 0
close
raise Dalli::NetworkError, "#{name} is down: #{@error} #{@msg}. If you are sure it is running, ensure memcached version is > 1.4." unless alive?
else
@pending_write_count = 0
noop
end
end
@inprogress = true
send_multiget(keys)
Expand Down Expand Up @@ -318,8 +351,12 @@ def set(key, value, ttl, cas, options)
guard_max_value(key, value) do
req = [REQUEST, OPCODES[multi? ? :setq : :set], key.bytesize, 8, 0, 0, value.bytesize + key.bytesize + 8, 0, cas, flags, ttl, key, value].pack(FORMAT[:set])
write(req)
@pending_multi_response ||= multi?
cas_response unless multi?
if multi?
@pending_multi_response = true
@pending_write_count += 1
else
cas_response
end
end
end

Expand All @@ -330,8 +367,12 @@ def add(key, value, ttl, options)
guard_max_value(key, value) do
req = [REQUEST, OPCODES[multi? ? :addq : :add], key.bytesize, 8, 0, 0, value.bytesize + key.bytesize + 8, 0, 0, flags, ttl, key, value].pack(FORMAT[:add])
write(req)
@pending_multi_response ||= multi?
cas_response unless multi?
if multi?
@pending_multi_response = true
@pending_write_count += 1
else
cas_response
end
end
end

Expand All @@ -342,16 +383,24 @@ def replace(key, value, ttl, cas, options)
guard_max_value(key, value) do
req = [REQUEST, OPCODES[multi? ? :replaceq : :replace], key.bytesize, 8, 0, 0, value.bytesize + key.bytesize + 8, 0, cas, flags, ttl, key, value].pack(FORMAT[:replace])
write(req)
@pending_multi_response ||= multi?
cas_response unless multi?
if multi?
@pending_multi_response = true
@pending_write_count += 1
else
cas_response
end
end
end

# Sends a delete request for +key+.
#
# Inside a multi block (+multi?+ is truthy) the quiet opcode (:deleteq) is
# written and no response is read; instead the server is flagged as having a
# pending multi response and the pending write counter is incremented so a
# later request can decide how to deal with the unread replies.
#
# Outside a multi block the regular :delete opcode is written and exactly one
# response is read via +generic_response+, whose value is returned.
#
# @param key [String] key to delete
# @param cas [Integer] CAS value packed into the request header
# @return [Object] the result of +generic_response+ outside multi mode; the
#   updated pending write count inside multi mode
def delete(key, cas)
  req = [REQUEST, OPCODES[multi? ? :deleteq : :delete], key.bytesize, 0, 0, 0, key.bytesize, 0, cas, key].pack(FORMAT[:delete])
  write(req)
  if multi?
    # Quiet delete: nothing is read now, so record that a reply may be pending.
    @pending_multi_response = true
    @pending_write_count += 1
  else
    # Read the single reply that belongs to this request — and only that one.
    generic_response
  end
end

def flush(ttl)
Expand Down
96 changes: 95 additions & 1 deletion test/test_server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -136,18 +136,43 @@
end
end

it 'flushes pending_multi_response before writing' do
# With exactly MAX_SAFE_DRAIN_COUNT pending writes, multi_response_start is
# expected to call noop once, clear both pending markers, and keep using the
# same socket.
it 'drains pending_multi_response with noop when write count is at or below threshold' do
  memcached_persistent do |dc, _port|
    server = dc.send(:ring).servers.first
    assert server.alive?

    # Remember the socket so reuse can be checked afterwards.
    sock_before = server.sock
    {
      :@pending_multi_response => true,
      :@pending_write_count => Dalli::Server::MAX_SAFE_DRAIN_COUNT
    }.each { |ivar, value| server.instance_variable_set(ivar, value) }

    server.expects(:noop).once
    server.multi_response_start(['somekey'])

    assert_equal false, server.instance_variable_get(:@pending_multi_response)
    assert_equal 0, server.instance_variable_get(:@pending_write_count)
    assert_equal sock_before, server.sock, "Socket should be reused when write count is within threshold"

    server.multi_response_abort
  end
end

it 'closes and reconnects when pending write count exceeds threshold' do
memcached_persistent do |dc, port|
ring = dc.send(:ring)
s = ring.servers.first
assert s.alive?

original_sock = s.sock
s.instance_variable_set(:@pending_multi_response, true)
s.instance_variable_set(:@pending_write_count, Dalli::Server::MAX_SAFE_DRAIN_COUNT + 1)

s.multi_response_start(['somekey'])

assert_equal false, s.instance_variable_get(:@pending_multi_response)
assert_equal 0, s.instance_variable_get(:@pending_write_count)
refute_nil s.sock, "Socket should be open after reconnect"
refute_equal original_sock, s.sock, "Socket should be a new connection when write count exceeds threshold"

s.multi_response_abort
end
Expand Down Expand Up @@ -180,6 +205,35 @@
end
end

# Checks the bookkeeping of @pending_write_count: it grows by one per :set
# issued while Thread.current[:dalli_multi] is set, and goes back to zero
# after the next :get request.
describe 'pending_write_count' do
  it 'increments on quiet set and resets after drain' do
    memcached_persistent do |dc|
      server = dc.send(:ring).servers.first
      assert server.alive?

      # Switch the thread into multi mode only for the three sets, restoring
      # whatever value was there before even if a set raises.
      saved_multi = Thread.current[:dalli_multi]
      begin
        Thread.current[:dalli_multi] = true
        %w[k1 k2 k3].each { |key| server.request(:set, key, 'v', 100, 0, {}) }
      ensure
        Thread.current[:dalli_multi] = saved_multi
      end

      assert_equal 3, server.instance_variable_get(:@pending_write_count)
      assert_equal true, server.instance_variable_get(:@pending_multi_response)

      # A subsequent read (below threshold) drains with noop, resets count
      server.request(:get, 'k1')

      assert_equal 0, server.instance_variable_get(:@pending_write_count)
      assert_equal false, server.instance_variable_get(:@pending_multi_response)
    end
  end
end

describe 'request error handling' do
it 'closes socket on Timeout::Error and re-raises' do
memcached_persistent do |dc|
Expand All @@ -198,6 +252,46 @@
end
end

# With exactly MAX_SAFE_DRAIN_COUNT pending writes, a :get through #request is
# expected to call noop once, clear both pending markers, keep the same
# socket, and still return the GET result.
it 'drains pending_multi_response with noop when write count is at or below threshold' do
  memcached_persistent do |dc|
    server = dc.send(:ring).servers.first
    assert server.alive?

    # Remember the socket so reuse can be checked afterwards.
    sock_before = server.sock
    {
      :@pending_multi_response => true,
      :@pending_write_count => Dalli::Server::MAX_SAFE_DRAIN_COUNT
    }.each { |ivar, value| server.instance_variable_set(ivar, value) }

    server.expects(:noop).once
    fetched = server.request(:get, 'somekey')

    assert_equal false, server.instance_variable_get(:@pending_multi_response)
    assert_equal 0, server.instance_variable_get(:@pending_write_count)
    assert_equal sock_before, server.sock, "Socket should be reused when write count is within threshold"
    assert_nil fetched, "GET on non-existent key should return nil"
  end
end

# With one more pending write than MAX_SAFE_DRAIN_COUNT, a :get through
# #request is expected to clear both pending markers and end up on a
# different, open socket while still returning the GET result.
it 'closes and reconnects when pending write count exceeds threshold before a non-multi op' do
  memcached_persistent do |dc|
    server = dc.send(:ring).servers.first
    assert server.alive?

    # Remember the socket so the reconnect can be detected afterwards.
    sock_before = server.sock
    {
      :@pending_multi_response => true,
      :@pending_write_count => Dalli::Server::MAX_SAFE_DRAIN_COUNT + 1
    }.each { |ivar, value| server.instance_variable_set(ivar, value) }

    fetched = server.request(:get, 'somekey')

    assert_equal false, server.instance_variable_get(:@pending_multi_response)
    assert_equal 0, server.instance_variable_get(:@pending_write_count)
    refute_nil server.sock, "Socket should be open after reconnect"
    refute_equal sock_before, server.sock, "Socket should be a new connection when write count exceeds threshold"
    assert_nil fetched, "GET on non-existent key should return nil"
  end
end

it 'returns false on MarshalError' do
memcached_persistent do |dc|
ring = dc.send(:ring)
Expand Down
Loading