From d4612d9db09a99b878eeecb9e710ed5a170cb63f Mon Sep 17 00:00:00 2001 From: David Lee Date: Fri, 27 Feb 2015 02:43:00 -0800 Subject: [PATCH 1/4] Support subdomain and paths in process labels --- lib/invoker/dns_cache.rb | 58 ++++++++++++++++++++-- lib/invoker/ipc/dns_check_command.rb | 7 +-- lib/invoker/ipc/message.rb | 2 +- lib/invoker/power/balancer.rb | 19 ++++--- lib/invoker/power/http_parser.rb | 11 ++++ spec/invoker/config_spec.rb | 2 +- spec/invoker/ipc/dns_check_command_spec.rb | 8 +-- spec/invoker/power/balancer_spec.rb | 4 +- spec/invoker/power/http_parser_spec.rb | 10 ++++ 9 files changed, 96 insertions(+), 25 deletions(-) diff --git a/lib/invoker/dns_cache.rb b/lib/invoker/dns_cache.rb index 1ca0069..e213dc7 100644 --- a/lib/invoker/dns_cache.rb +++ b/lib/invoker/dns_cache.rb @@ -2,22 +2,70 @@ module Invoker class DNSCache attr_accessor :dns_data + # INI: + # [www.app] + # port = 8000 + # + # [www.app/chat] + # port = 8001 + # + # [app] + # port = 8002 + # + # [app/api] + # port = 8003 + # + # dns_data: + # { + # 'www.app' => [ + # ['/', 8000], + # ['/chat', 8001] + # ], + # 'app' => [ + # ['/', 8002], + # ['/api, 8003] + # ] + # } def initialize(config) - self.dns_data = {} + self.dns_data = Hash.new {|h,k| h[k] = []} @dns_mutex = Mutex.new Invoker.config.processes.each do |process| if process.port - dns_data[process.label] = { 'port' => process.port } + add(process.label, process.port) end end end - def [](process_name) - @dns_mutex.synchronize { dns_data[process_name] } + def find_process(host, path) + @dns_mutex.synchronize { + until dns_data.include?(host) || host.nil? + host = host.split('.', 2)[1] + end + + dns_data[host].reverse_each {|prefix, label, port| + if path_matches_prefix?(path, prefix) + return {:process_name => label, :port => port} + end + } + + nil + } end def add(name, port) - @dns_mutex.synchronize { dns_data[name] = { 'port' => port } } + @dns_mutex.synchronize { + host, path_prefix = split_host_path(name) + (dns_data[host] << [path_prefix.to_s, name, port]).sort! + } + end + + private + def split_host_path(label) + label.split(%r{(?=/)}, 2) + end + + def path_matches_prefix?(path, prefix) + path == prefix || path.start_with?(prefix + '/') end end end diff --git a/lib/invoker/ipc/dns_check_command.rb b/lib/invoker/ipc/dns_check_command.rb index 48876e1..f1ff697 100644 --- a/lib/invoker/ipc/dns_check_command.rb +++ b/lib/invoker/ipc/dns_check_command.rb @@ -2,12 +2,9 @@ module Invoker module IPC class DnsCheckCommand < BaseCommand def run_command(message_object) - process_detail = Invoker.dns_cache[message_object.process_name] + process_detail = Invoker.dns_cache.find_process(message_object.host, message_object.path) - dns_check_response = Invoker::IPC::Message::DnsCheckResponse.new( - process_name: message_object.process_name, - port: process_detail ? process_detail['port'] : nil - ) + dns_check_response = Invoker::IPC::Message::DnsCheckResponse.new(process_detail || {}) send_data(dns_check_response) true end diff --git a/lib/invoker/ipc/message.rb b/lib/invoker/ipc/message.rb index f4a22f2..2d7d6b0 100644 --- a/lib/invoker/ipc/message.rb +++ b/lib/invoker/ipc/message.rb @@ -146,7 +146,7 @@ class Remove < Base class DnsCheck < Base include Serialization - message_attributes :process_name + message_attributes :host, :path end class DnsCheckResponse < Base diff --git a/lib/invoker/power/balancer.rb b/lib/invoker/power/balancer.rb index 91f76d1..77cdc5b 100644 --- a/lib/invoker/power/balancer.rb +++ b/lib/invoker/power/balancer.rb @@ -22,8 +22,8 @@ def post_init class Balancer attr_accessor :connection, :http_parser, :session, :protocol - DEV_MATCH_REGEX = /([\w-]+)\.dev(\:\d+)?$/ - XIP_IO_MATCH_REGEX = /([\w-]+)\.\d+\.\d+\.\d+\.\d+\.xip\.io(\:\d+)?$/ + DEV_MATCH_REGEX = /([\w.-]+)\.dev(\:\d+)?$/ + XIP_IO_MATCH_REGEX = /([\w.-]+)\.\d+\.\d+\.\d+\.\d+\.xip\.io(\:\d+)?$/ def self.run(options = {}) start_http_proxy(InvokerHttpProxy, 'http', options) @@ -48,6 +48,7 @@ def initialize(connection, protocol) end def install_callbacks + http_parser.on_url { |url| url_received(url) } http_parser.on_headers_complete { |headers| headers_received(headers) } http_parser.on_message_complete { |full_message| complete_message_received(full_message) } connection.on_data { |data| upstream_data(data) } @@ -60,12 +61,16 @@ def complete_message_received(full_message) http_parser.reset end + def url_received(url) + @path = url + end + def headers_received(headers) if @session return end @session = UUID.generate() - dns_check_response = select_backend_config(headers['Host']) + dns_check_response = select_backend_config(headers['Host'], @path) if dns_check_response && dns_check_response.port connection.server(session, host: '0.0.0.0', port: dns_check_response.port) else @@ -107,11 +112,11 @@ def extract_host_from_domain(host) private - def select_backend_config(host) - matching_string = extract_host_from_domain(host) + def select_backend_config(domain, path) + matching_string = extract_host_from_domain(domain) return nil unless matching_string - if selected_app = matching_string[1] - dns_check(process_name: selected_app) + if host = matching_string[1] + dns_check(host: host, path: path) else nil end diff --git a/lib/invoker/power/http_parser.rb b/lib/invoker/power/http_parser.rb index 430e47a..af1f0e5 100644 --- a/lib/invoker/power/http_parser.rb +++ b/lib/invoker/power/http_parser.rb @@ -8,6 +8,7 @@ def initialize(protocol) @parser = HTTP::Parser.new @header = {} initialize_message_content + parser.on_url { |url| url_received(url) } parser.on_headers_complete { complete_headers_received } parser.on_header_field { |field_name| @last_key = field_name } parser.on_header_value { |field_value| header_value_received(field_value) } @@ -15,6 +16,10 @@ def initialize(protocol) parser.on_message_complete { complete_message_received } end + def on_url(&block) + @on_url_callback = block + end + # define a callback for invoking when complete header is parsed def on_headers_complete(&block) @on_headers_complete_callback = block @@ -63,6 +68,12 @@ def complete_headers_received @on_headers_complete_callback.call(@header) end end + + def url_received(url) + if @on_url_callback + @on_url_callback.call(url) + end + end end end end diff --git a/spec/invoker/config_spec.rb b/spec/invoker/config_spec.rb index d915741..941f12c 100644 --- a/spec/invoker/config_spec.rb +++ b/spec/invoker/config_spec.rb @@ -183,7 +183,7 @@ expect(dns_cache.dns_data).to_not be_empty expect(dns_cache.dns_data['web']).to_not be_empty - expect(dns_cache.dns_data['web']['port']).to eql 9001 + expect(dns_cache.dns_data['web'].first[2]).to eql 9001 ensure File.delete("/tmp/Procfile") end diff --git a/spec/invoker/ipc/dns_check_command_spec.rb b/spec/invoker/ipc/dns_check_command_spec.rb index ca0955c..9f72c34 100644 --- a/spec/invoker/ipc/dns_check_command_spec.rb +++ b/spec/invoker/ipc/dns_check_command_spec.rb @@ -5,9 +5,9 @@ let(:client) { Invoker::IPC::ClientHandler.new(client_socket) } describe "dns check for valid process" do - let(:message_object) { MM::DnsCheck.new(process_name: 'lolbro') } + let(:message_object) { MM::DnsCheck.new(host: 'lolbro', path: '/') } it "should response with dns check response" do - invoker_dns_cache.expects(:[]).returns('port' => 9000) + invoker_dns_cache.expects(:find_process).returns('process_name' => 'lolbro', 'port' => 9000) client_socket.string = message_object.encoded_message client.read_and_execute @@ -18,9 +18,9 @@ end describe "dns check for invalid process" do - let(:message_object) { MM::DnsCheck.new(process_name: 'foo') } + let(:message_object) { MM::DnsCheck.new(host: 'foo', path: '/') } it "should response with dns check response" do - invoker_dns_cache.expects(:[]).returns('port' => nil) + invoker_dns_cache.expects(:find_process).returns(nil) client_socket.string = message_object.encoded_message client.read_and_execute diff --git a/spec/invoker/power/balancer_spec.rb b/spec/invoker/power/balancer_spec.rb index b4a41b8..aa80ef0 100644 --- a/spec/invoker/power/balancer_spec.rb +++ b/spec/invoker/power/balancer_spec.rb @@ -27,7 +27,7 @@ expect(match).to_not be_nil matching_string = match[1] - expect(matching_string).to eq("bar") + expect(matching_string).to eq("emacs.bar") end it "should match hello-world.dev" do @@ -61,7 +61,7 @@ expect(match).to_not be_nil matching_string = match[1] - expect(matching_string).to eq("bar") + expect(matching_string).to eq("emacs.bar") end it "should match hello-world.10.0.0.1.xip.io" do diff --git a/spec/invoker/power/http_parser_spec.rb b/spec/invoker/power/http_parser_spec.rb index 7157256..17d4df8 100644 --- a/spec/invoker/power/http_parser_spec.rb +++ b/spec/invoker/power/http_parser_spec.rb @@ -5,6 +5,16 @@ describe "complete message received" do before { parser.reset } + it "should call url received with url" do + @header, @path = nil + parser.on_headers_complete { |header| @header = header } + parser.on_url { |url| @path = url } + parser << "GET /blah HTTP/1.1\r\n" + parser << "Host: localhost\r\n" + + expect(@path).to eql "/blah" + end + it "should call header received with full header" do @header = nil parser.on_headers_complete { |header| @header = header } From 507288089e810779d23e4e13d895d1899b531ecf Mon Sep 17 00:00:00 2001 From: David Lee Date: Mon, 2 Mar 2015 00:37:43 -0800 Subject: [PATCH 2/4] Add support for 'location' k --- lib/invoker/dns_cache.rb | 46 +++++++++++++++++++++++------------ lib/invoker/ipc/message.rb | 2 +- lib/invoker/parsers/config.rb | 1 + spec/invoker/config_spec.rb | 2 +- 4 files changed, 33 insertions(+), 18 deletions(-) diff --git a/lib/invoker/dns_cache.rb b/lib/invoker/dns_cache.rb index e213dc7..d8406c5 100644 --- a/lib/invoker/dns_cache.rb +++ b/lib/invoker/dns_cache.rb @@ -3,27 +3,36 @@ class DNSCache attr_accessor :dns_data # INI: - # [www.app] + # [app] # port = 8000 + # location = www2.app/blah # - # [www.app/chat] + # [chat] # port = 8001 + # location www.app/chat # - # [app] + # [api] # port = 8002 - # - # [app/api] - # port = 8003 + # location = www.app/api # # dns_data: # { - # 'www.app' => [ - # ['/', 8000], - # ['/chat', 8001] + # 'api' => [ + # ['', 8002] # ], # 'app' => [ - # ['/', 8002], - # ['/api, 8003] + # ['', 8000] + # ], + # 'chat' => [ + # ['', 8001] + # ], + # 'www.app' => [ + # ['', 8000], + # ['/api, 8002], + # ['/chat, 8001] + # ], + # 'www2.app' => [ + # ['/blah', 8000] # ] # } def initialize(config) @@ -32,6 +41,11 @@ def initialize(config) Invoker.config.processes.each do |process| if process.port add(process.label, process.port) + if process.location + process.location.split(' ').each do |loc| + add(loc, process.port) + end + end end end end @@ -42,9 +56,9 @@ def find_process(host, path) host = host.split('.', 2)[1] end - dns_data[host].reverse_each {|prefix, label, port| + dns_data[host].reverse_each {|prefix, port| if path_matches_prefix?(path, prefix) - return {:process_name => label, :port => port} + return {:port => port} end } @@ -52,10 +66,10 @@ def find_process(host, path) } end - def add(name, port) + def add(location, port) @dns_mutex.synchronize { - host, path_prefix = split_host_path(name) - (dns_data[host] << [path_prefix.to_s, name, port]).sort! + host, path_prefix = split_host_path(location) + (dns_data[host] << [path_prefix.to_s, port]).sort_by! &:length } end diff --git a/lib/invoker/ipc/message.rb b/lib/invoker/ipc/message.rb index 2d7d6b0..e48a8d8 100644 --- a/lib/invoker/ipc/message.rb +++ b/lib/invoker/ipc/message.rb @@ -151,7 +151,7 @@ class DnsCheck < Base class DnsCheckResponse < Base include Serialization - message_attributes :process_name, :port + message_attributes :port end class Ping < Base diff --git a/lib/invoker/parsers/config.rb b/lib/invoker/parsers/config.rb index f8dc67b..a65f70c 100644 --- a/lib/invoker/parsers/config.rb +++ b/lib/invoker/parsers/config.rb @@ -94,6 +94,7 @@ def make_pconfig(section) cmd: section["command"] } pconfig['port'] = section['port'] if section['port'] + pconfig['location'] = section['location'] if section['location'] pconfig['disable_autorun'] = section['disable_autorun'] if section['disable_autorun'] OpenStruct.new(pconfig) diff --git a/spec/invoker/config_spec.rb b/spec/invoker/config_spec.rb index 941f12c..ae227d7 100644 --- a/spec/invoker/config_spec.rb +++ b/spec/invoker/config_spec.rb @@ -183,7 +183,7 @@ expect(dns_cache.dns_data).to_not be_empty expect(dns_cache.dns_data['web']).to_not be_empty - expect(dns_cache.dns_data['web'].first[2]).to eql 9001 + expect(dns_cache.dns_data['web'].first[1]).to eql 9001 ensure File.delete("/tmp/Procfile") end From 7f9e9fc4c0609b37057d1c7c098dc08880da5b94 Mon Sep 17 00:00:00 2001 From: David Lee Date: Mon, 2 Mar 2015 05:04:00 -0800 Subject: [PATCH 3/4] Handle paths with queries more robustly --- lib/invoker/dns_cache.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/invoker/dns_cache.rb b/lib/invoker/dns_cache.rb index d8406c5..685e36e 100644 --- a/lib/invoker/dns_cache.rb +++ b/lib/invoker/dns_cache.rb @@ -79,7 +79,7 @@ def split_host_path(label) end def path_matches_prefix?(path, prefix) - path == prefix || path.start_with?(prefix + '/') + path.start_with?(prefix) && [nil, "/", "?"].member?(path[prefix.length]) end end end From d84a348678e6f9dc76871fd32750d3dc0837c34e Mon Sep 17 00:00:00 2001 From: David Lee Date: Mon, 2 Mar 2015 05:04:57 -0800 Subject: [PATCH 4/4] Disconnect from backend after each response This is to avoid issues where subsequent requests from a persistent HTTP connection would be directed to a stale proxy regardless of the location. Only a problem now because we're taking paths into consideration for proxy routing, whereas before, backends could be reused since persistent HTTP connections were guaranteed to end up with the same backend anyway. --- lib/invoker/power/balancer.rb | 20 +++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/lib/invoker/power/balancer.rb b/lib/invoker/power/balancer.rb index 77cdc5b..71088b3 100644 --- a/lib/invoker/power/balancer.rb +++ b/lib/invoker/power/balancer.rb @@ -43,6 +43,7 @@ def initialize(connection, protocol) @connection = connection @protocol = protocol @http_parser = HttpParser.new(protocol) + @response_http_parser = HttpParser.new(protocol) @session = nil @buffer = [] end @@ -51,11 +52,26 @@ def install_callbacks http_parser.on_url { |url| url_received(url) } http_parser.on_headers_complete { |headers| headers_received(headers) } http_parser.on_message_complete { |full_message| complete_message_received(full_message) } + + @response_http_parser.on_message_complete { response_complete } + connection.on_data { |data| upstream_data(data) } connection.on_response { |backend, data| backend_data(backend, data) } connection.on_finish { |backend, name| frontend_disconnect(backend, name) } end + def response_complete + # disconnect from backend after each response, since subsequent + # requests from persistent HTTP connections might be proxied to a stale + # backend + EventMachine.next_tick do # still needs chance to send data + connection.unbind_backend(@session) + @session = nil + @response_http_parser.reset + http_parser.reset + end + end + def complete_message_received(full_message) connection.relay_to_servers(full_message) http_parser.reset @@ -66,9 +82,6 @@ def url_received(url) end def headers_received(headers) - if @session - return - end @session = UUID.generate() dns_check_response = select_backend_config(headers['Host'], @path) if dns_check_response && dns_check_response.port @@ -93,6 +106,7 @@ def append_for_http_parsing(data) end def backend_data(backend, data) + @response_http_parser << data @backend_data = true data end