diff --git a/lib/invoker/dns_cache.rb b/lib/invoker/dns_cache.rb index 1ca0069..685e36e 100644 --- a/lib/invoker/dns_cache.rb +++ b/lib/invoker/dns_cache.rb @@ -2,22 +2,84 @@ module Invoker class DNSCache attr_accessor :dns_data + # INI: + # [app] + # port = 8000 + # location = www2.app/blah + # + # [chat] + # port = 8001 + # location www.app/chat + # + # [api] + # port = 8002 + # location = www.app/api + # + # dns_data: + # { + # 'api' => [ + # ['', 8002] + # ], + # 'app' => [ + # ['', 8000] + # ], + # 'chat' => [ + # ['', 8001] + # ], + # 'www.app' => [ + # ['', 8000], + # ['/api, 8002], + # ['/chat, 8001] + # ], + # 'www2.app' => [ + # ['/blah', 8000] + # ] + # } def initialize(config) - self.dns_data = {} + self.dns_data = Hash.new {|h,k| h[k] = []} @dns_mutex = Mutex.new Invoker.config.processes.each do |process| if process.port - dns_data[process.label] = { 'port' => process.port } + add(process.label, process.port) + if process.location + process.location.split(' ').each do |loc| + add(loc, process.port) + end + end end end end - def [](process_name) - @dns_mutex.synchronize { dns_data[process_name] } + def find_process(host, path) + @dns_mutex.synchronize { + until dns_data.include?(host) || host.nil? + host = host.split('.', 2)[1] + end + + dns_data[host].reverse_each {|prefix, port| + if path_matches_prefix?(path, prefix) + return {:port => port} + end + } + + nil + } + end + + def add(location, port) + @dns_mutex.synchronize { + host, path_prefix = split_host_path(location) + (dns_data[host] << [path_prefix.to_s, port]).sort_by! &:length + } + end + + private + def split_host_path(label) + label.split(%r{(?=/)}, 2) end - def add(name, port) - @dns_mutex.synchronize { dns_data[name] = { 'port' => port } } + def path_matches_prefix?(path, prefix) + path.start_with?(prefix) && [nil, "/", "?"].member?(path[prefix.length]) end end end diff --git a/lib/invoker/ipc/dns_check_command.rb b/lib/invoker/ipc/dns_check_command.rb index 48876e1..f1ff697 100644 --- a/lib/invoker/ipc/dns_check_command.rb +++ b/lib/invoker/ipc/dns_check_command.rb @@ -2,12 +2,9 @@ module Invoker module IPC class DnsCheckCommand < BaseCommand def run_command(message_object) - process_detail = Invoker.dns_cache[message_object.process_name] + process_detail = Invoker.dns_cache.find_process(message_object.host, message_object.path) - dns_check_response = Invoker::IPC::Message::DnsCheckResponse.new( - process_name: message_object.process_name, - port: process_detail ? process_detail['port'] : nil - ) + dns_check_response = Invoker::IPC::Message::DnsCheckResponse.new(process_detail || {}) send_data(dns_check_response) true end diff --git a/lib/invoker/ipc/message.rb b/lib/invoker/ipc/message.rb index f4a22f2..e48a8d8 100644 --- a/lib/invoker/ipc/message.rb +++ b/lib/invoker/ipc/message.rb @@ -146,12 +146,12 @@ class Remove < Base class DnsCheck < Base include Serialization - message_attributes :process_name + message_attributes :host, :path end class DnsCheckResponse < Base include Serialization - message_attributes :process_name, :port + message_attributes :port end class Ping < Base diff --git a/lib/invoker/parsers/config.rb b/lib/invoker/parsers/config.rb index f8dc67b..a65f70c 100644 --- a/lib/invoker/parsers/config.rb +++ b/lib/invoker/parsers/config.rb @@ -94,6 +94,7 @@ def make_pconfig(section) cmd: section["command"] } pconfig['port'] = section['port'] if section['port'] + pconfig['location'] = section['location'] if section['location'] pconfig['disable_autorun'] = section['disable_autorun'] if section['disable_autorun'] OpenStruct.new(pconfig) diff --git a/lib/invoker/power/balancer.rb b/lib/invoker/power/balancer.rb index 91f76d1..71088b3 100644 --- a/lib/invoker/power/balancer.rb +++ b/lib/invoker/power/balancer.rb @@ -22,8 +22,8 @@ def post_init class Balancer attr_accessor :connection, :http_parser, :session, :protocol - DEV_MATCH_REGEX = /([\w-]+)\.dev(\:\d+)?$/ - XIP_IO_MATCH_REGEX = /([\w-]+)\.\d+\.\d+\.\d+\.\d+\.xip\.io(\:\d+)?$/ + DEV_MATCH_REGEX = /([\w.-]+)\.dev(\:\d+)?$/ + XIP_IO_MATCH_REGEX = /([\w.-]+)\.\d+\.\d+\.\d+\.\d+\.xip\.io(\:\d+)?$/ def self.run(options = {}) start_http_proxy(InvokerHttpProxy, 'http', options) @@ -43,29 +43,47 @@ def initialize(connection, protocol) @connection = connection @protocol = protocol @http_parser = HttpParser.new(protocol) + @response_http_parser = HttpParser.new(protocol) @session = nil @buffer = [] end def install_callbacks + http_parser.on_url { |url| url_received(url) } http_parser.on_headers_complete { |headers| headers_received(headers) } http_parser.on_message_complete { |full_message| complete_message_received(full_message) } + + @response_http_parser.on_message_complete { response_complete } + connection.on_data { |data| upstream_data(data) } connection.on_response { |backend, data| backend_data(backend, data) } connection.on_finish { |backend, name| frontend_disconnect(backend, name) } end + def response_complete + # disconnect from backend after each response, since subsequent + # requests from persistent HTTP connections might be proxied to a stale + # backend + EventMachine.next_tick do # still needs chance to send data + connection.unbind_backend(@session) + @session = nil + @response_http_parser.reset + http_parser.reset + end + end + def complete_message_received(full_message) connection.relay_to_servers(full_message) http_parser.reset end + def url_received(url) + @path = url + end + def headers_received(headers) - if @session - return - end @session = UUID.generate() - dns_check_response = select_backend_config(headers['Host']) + dns_check_response = select_backend_config(headers['Host'], @path) if dns_check_response && dns_check_response.port connection.server(session, host: '0.0.0.0', port: dns_check_response.port) else @@ -88,6 +106,7 @@ def append_for_http_parsing(data) end def backend_data(backend, data) + @response_http_parser << data @backend_data = true data end @@ -107,11 +126,11 @@ def extract_host_from_domain(host) private - def select_backend_config(host) - matching_string = extract_host_from_domain(host) + def select_backend_config(domain, path) + matching_string = extract_host_from_domain(domain) return nil unless matching_string - if selected_app = matching_string[1] - dns_check(process_name: selected_app) + if host = matching_string[1] + dns_check(host: host, path: path) else nil end diff --git a/lib/invoker/power/http_parser.rb b/lib/invoker/power/http_parser.rb index 430e47a..af1f0e5 100644 --- a/lib/invoker/power/http_parser.rb +++ b/lib/invoker/power/http_parser.rb @@ -8,6 +8,7 @@ def initialize(protocol) @parser = HTTP::Parser.new @header = {} initialize_message_content + parser.on_url { |url| url_received(url) } parser.on_headers_complete { complete_headers_received } parser.on_header_field { |field_name| @last_key = field_name } parser.on_header_value { |field_value| header_value_received(field_value) } @@ -15,6 +16,10 @@ def initialize(protocol) parser.on_message_complete { complete_message_received } end + def on_url(&block) + @on_url_callback = block + end + # define a callback for invoking when complete header is parsed def on_headers_complete(&block) @on_headers_complete_callback = block @@ -63,6 +68,12 @@ def complete_headers_received @on_headers_complete_callback.call(@header) end end + + def url_received(url) + if @on_url_callback + @on_url_callback.call(url) + end + end end end end diff --git a/spec/invoker/config_spec.rb b/spec/invoker/config_spec.rb index d915741..ae227d7 100644 --- a/spec/invoker/config_spec.rb +++ b/spec/invoker/config_spec.rb @@ -183,7 +183,7 @@ expect(dns_cache.dns_data).to_not be_empty expect(dns_cache.dns_data['web']).to_not be_empty - expect(dns_cache.dns_data['web']['port']).to eql 9001 + expect(dns_cache.dns_data['web'].first[1]).to eql 9001 ensure File.delete("/tmp/Procfile") end diff --git a/spec/invoker/ipc/dns_check_command_spec.rb b/spec/invoker/ipc/dns_check_command_spec.rb index ca0955c..9f72c34 100644 --- a/spec/invoker/ipc/dns_check_command_spec.rb +++ b/spec/invoker/ipc/dns_check_command_spec.rb @@ -5,9 +5,9 @@ let(:client) { Invoker::IPC::ClientHandler.new(client_socket) } describe "dns check for valid process" do - let(:message_object) { MM::DnsCheck.new(process_name: 'lolbro') } + let(:message_object) { MM::DnsCheck.new(host: 'lolbro', path: '/') } it "should response with dns check response" do - invoker_dns_cache.expects(:[]).returns('port' => 9000) + invoker_dns_cache.expects(:find_process).returns('process_name' => 'lolbro', 'port' => 9000) client_socket.string = message_object.encoded_message client.read_and_execute @@ -18,9 +18,9 @@ end describe "dns check for invalid process" do - let(:message_object) { MM::DnsCheck.new(process_name: 'foo') } + let(:message_object) { MM::DnsCheck.new(host: 'foo', path: '/') } it "should response with dns check response" do - invoker_dns_cache.expects(:[]).returns('port' => nil) + invoker_dns_cache.expects(:find_process).returns(nil) client_socket.string = message_object.encoded_message client.read_and_execute diff --git a/spec/invoker/power/balancer_spec.rb b/spec/invoker/power/balancer_spec.rb index b4a41b8..aa80ef0 100644 --- a/spec/invoker/power/balancer_spec.rb +++ b/spec/invoker/power/balancer_spec.rb @@ -27,7 +27,7 @@ expect(match).to_not be_nil matching_string = match[1] - expect(matching_string).to eq("bar") + expect(matching_string).to eq("emacs.bar") end it "should match hello-world.dev" do @@ -61,7 +61,7 @@ expect(match).to_not be_nil matching_string = match[1] - expect(matching_string).to eq("bar") + expect(matching_string).to eq("emacs.bar") end it "should match hello-world.10.0.0.1.xip.io" do diff --git a/spec/invoker/power/http_parser_spec.rb b/spec/invoker/power/http_parser_spec.rb index 7157256..17d4df8 100644 --- a/spec/invoker/power/http_parser_spec.rb +++ b/spec/invoker/power/http_parser_spec.rb @@ -5,6 +5,16 @@ describe "complete message received" do before { parser.reset } + it "should call url received with url" do + @header, @path = nil + parser.on_headers_complete { |header| @header = header } + parser.on_url { |url| @path = url } + parser << "GET /blah HTTP/1.1\r\n" + parser << "Host: localhost\r\n" + + expect(@path).to eql "/blah" + end + it "should call header received with full header" do @header = nil parser.on_headers_complete { |header| @header = header }