Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 28 additions & 7 deletions bin/benchmark
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ dalli_url = ENV['BENCH_CACHE_URL'] || "127.0.0.1:11211"
if dalli_url.include?('unix')
ENV['BENCH_CACHE_URL'].gsub('unix://','')
end
bench_target = ENV['BENCH_TARGET'] || 'set'
bench_target = ENV['BENCH_TARGET'] || 'all'
bench_time = (ENV['BENCH_TIME'] || 10).to_i
bench_warmup = (ENV['BENCH_WARMUP'] || 3).to_i
bench_payload_size = (ENV['BENCH_PAYLOAD_SIZE'] || 700_000).to_i
Expand Down Expand Up @@ -67,6 +67,14 @@ sock.write(TERMINATOR)
sock.flush
sock.readline # clear the buffer

raise 'dalli client mismatch' if payload != client.get('key')

sock.write("mg sock_key v\r\n")
sock.readline
sock_value = sock.read(payload.bytesize)
sock.read(TERMINATOR.bytesize)
raise 'sock mismatch' if payload != sock_value

# ensure we have basic data for the benchmarks and get calls
payload_smaller = 'B' * 50_000
pairs = {}
Expand Down Expand Up @@ -151,17 +159,22 @@ end
if %w[all get].include?(bench_target)
Benchmark.ips do |x|
x.config(warmup: bench_warmup, time: bench_time, suite: suite)
x.report('get dalli') { client.get('key') }
x.report('get dalli') do
result = client.get('key')
raise 'mismatch' unless result == payload
end
# NOTE: while this is the fastest it is not thread safe and is blocking vs IO sharing friendly
x.report('get sock') do
sock.write("get sock_key\r\n")
sock.write("mg sock_key v\r\n")
sock.readline
sock.read(payload.bytesize)
result = sock.read(payload.bytesize)
sock.read(TERMINATOR.bytesize)
raise 'mismatch' unless result == payload
end
# NOTE: This shows that when adding thread safety & non-blocking IO we are slower for single process/thread use case
x.report('get sock non-blocking') do
@lock.synchronize do
sock.write("get sock_key\r\n")
sock.write("mg sock_key v\r\n")
sock.readline
count = payload.bytesize
value = String.new(capacity: count + 1)
Expand All @@ -177,6 +190,8 @@ if %w[all get].include?(bench_target)
end
break if value.bytesize == count
end
sock.read(TERMINATOR.bytesize)
raise 'mismatch' unless value == payload
end
end
x.compare!
Expand All @@ -186,8 +201,14 @@ end
if %w[all get_multi].include?(bench_target)
Benchmark.ips do |x|
x.config(warmup: bench_warmup, time: bench_time, suite: suite)
x.report('get 100 keys') { client.get_multi(pairs.keys) }
x.report('get 100 keys raw sock') { sock_get_multi(sock, pairs) }
x.report('get 100 keys') do
result = client.get_multi(pairs.keys)
raise 'mismatch' unless result == pairs
end
x.report('get 100 keys raw sock') do
result = sock_get_multi(sock, pairs)
raise 'mismatch' unless result == pairs
end
x.compare!
end
end
Expand Down
18 changes: 18 additions & 0 deletions bin/profile
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
#
# run with:
# RUBY_YJIT_ENABLE=1 bundle exec bin/profile
# view profiles locally with (add to gemfile for easy access):
# bundle exec profile-viewer client_get_multi_allocations.json
require 'bundler/inline'
require 'json'

Expand Down Expand Up @@ -118,6 +120,22 @@ def sock_set_multi(sock, pairs)
sock.gets(TERMINATOR) # clear the buffer
end

def allocations
x = GC.stat(:total_allocated_objects)
yield
GC.stat(:total_allocated_objects) - x
end

if bench_target == 'get_multi_allocations'
start_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
puts "allocations:"
puts allocations { client.get_multi(pairs.keys) }
Vernier.profile(out: 'client_get_multi_allocations.json', allocation_interval: 1) do
client.get_multi(pairs.keys) while Process.clock_gettime(Process::CLOCK_MONOTONIC) - start_time < bench_time
end
exit 0
end

if %w[all get].include?(bench_target)
Vernier.profile(out: 'client_get_profile.json') do
start_time = Time.now
Expand Down
2 changes: 1 addition & 1 deletion lib/dalli/protocol/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -150,10 +150,10 @@ def quiet?
alias multi? quiet?

# NOTE: Additional public methods should be overridden in Dalli::Threadsafe
ALLOWED_QUIET_OPS = %i[add replace set delete incr decr append prepend flush noop].freeze

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not understanding the reason for making this public (at least as part of this PR)?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rubocop update complained about private constant.


private

ALLOWED_QUIET_OPS = %i[add replace set delete incr decr append prepend flush noop].freeze
def verify_allowed_quiet!(opkey)
return if ALLOWED_QUIET_OPS.include?(opkey)

Expand Down
12 changes: 9 additions & 3 deletions lib/dalli/protocol/connection_manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ module Protocol
# Manages the socket connection to the server, including ensuring liveness
# and retries.
##
class ConnectionManager
class ConnectionManager # rubocop:disable Metrics/ClassLength
DEFAULTS = {
# seconds between trying to contact a remote server
down_retry_delay: 30,
Expand Down Expand Up @@ -160,14 +160,20 @@ def read_line
error_on_request!(e)
end

def read_byte
@sock.readbyte
rescue SystemCallError, *TIMEOUT_ERRORS, EOFError => e
error_on_request!(e)
end

def read(count)
@sock.read(count)
rescue SystemCallError, *TIMEOUT_ERRORS, EOFError => e
error_on_request!(e)
end

def read_exact(count)
@sock.read(count)
def read_to_outstring(count, outstring)
@sock.read(count, outstring)
rescue SystemCallError, *TIMEOUT_ERRORS, EOFError => e
error_on_request!(e)
end
Expand Down
56 changes: 41 additions & 15 deletions lib/dalli/protocol/meta.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,15 @@ module Protocol
# protocol. Contains logic for managing connection state to the server (retries, etc),
# formatting requests to the server, and unpacking responses.
##
class Meta < Base
class Meta < Base # rubocop:disable Metrics/ClassLength
TERMINATOR = "\r\n"
META_NOOP = "mn\r\n"
META_NOOP_RESP = 'MN'

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where's this used?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah these got refactored away, I can remove them

META_VALUE_RESP = 'VA'

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same question as above

META_GET_REQ_NO_FLAGS = "v k q\r\n"
META_GET_REQ = "v f k q\r\n"
M_CONSTANT = 'M'.ord
V_CONSTANT = 'V'.ord
SUPPORTS_CAPACITY = Gem::Version.new(RUBY_VERSION) >= Gem::Version.new('3.4.0')

def response_processor
Expand Down Expand Up @@ -57,27 +64,46 @@ def read_multi_req(keys)
# Pre-allocate the results hash with expected size
results = SUPPORTS_CAPACITY ? Hash.new(nil, capacity: keys.size) : {}
optimized_for_raw = @value_marshaller.raw_by_default
key_index = optimized_for_raw ? 2 : 3
terminator_buffer = String.new(TERMINATOR, capacity: TERMINATOR.size)

post_get_req = optimized_for_raw ? "v k q\r\n" : "v f k q\r\n"
post_get_req = optimized_for_raw ? META_GET_REQ_NO_FLAGS : META_GET_REQ
keys.each do |key|
@connection_manager.write("mg #{key} #{post_get_req}")
end
@connection_manager.write("mn\r\n")
@connection_manager.write(META_NOOP)
@connection_manager.flush

terminator_length = TERMINATOR.length
while (line = @connection_manager.readline)
break if line == TERMINATOR || line[0, 2] == 'MN'
next unless line[0, 3] == 'VA '

# VA value_length flags key
tokens = line.split
value = @connection_manager.read_exact(tokens[1].to_i)
bitflags = optimized_for_raw ? 0 : @response_processor.bitflags_from_tokens(tokens)
@connection_manager.read_exact(terminator_length) # read the terminator
results[tokens[key_index].byteslice(1..-1)] =
@value_marshaller.retrieve(value, bitflags)
while (byte = @connection_manager.read_byte)
if byte == M_CONSTANT
@connection_manager.readline
break
end
unless byte == V_CONSTANT
@connection_manager.readline
next
end

@connection_manager.read_byte # skip 'A'
@connection_manager.read_byte # skip terminator
line = @connection_manager.readline
# VA value_length kNAME\r\n
# if rindex and linex are equal split out flags
right_seperator_index = line.rindex(' ')
left_seperator_index = line.index(' ')
bitflags = if right_seperator_index == left_seperator_index
0
else
line.byteslice(left_seperator_index + 2, right_seperator_index - left_seperator_index - 1).to_i
end

# +2 on index skips the space and 'k', then - 4 for the ' k' and "\r\n"
key = line.byteslice(right_seperator_index + 2, (line.length - right_seperator_index - 4))

value = @connection_manager.read(line.to_i)
@connection_manager.read_to_outstring(terminator_length, terminator_buffer)
results[key] =
bitflags.zero? ? value : @value_marshaller.retrieve(value, bitflags)
end
results
end
Expand Down
3 changes: 2 additions & 1 deletion lib/dalli/protocol/meta/response_processor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ class ResponseProcessor
def initialize(io_source, value_marshaller)
@io_source = io_source
@value_marshaller = value_marshaller
@terminator_buffer = String.new(TERMINATOR, capacity: TERMINATOR.size)
end

def meta_get_with_value(cache_nils: false, skip_flags: false)
Expand Down Expand Up @@ -249,7 +250,7 @@ def next_line_to_tokens

def read_data(data_size)
resp_data = @io_source.read(data_size)
@io_source.read(TERMINATOR.bytesize)
@io_source.read_to_outstring(TERMINATOR.bytesize, @terminator_buffer)
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this removes an additional allocation for all the non multi calls

resp_data
end
end
Expand Down
Loading
Loading