Skip to content

Commit

Permalink
Switch to faye/websocket client for proxy support
Browse files Browse the repository at this point in the history
The interface should be exactly the same but the error thrown on SSL
verification failure is now WebsocketError instead of an OpenSSL error.
This error will also now be thrown from any of the methods that execute
a job if it occurs before authentication has completed.  Previously
those methods would throw a RuntimeException due to authentication not
completing within the 5 second timeout due to an error.
  • Loading branch information
Ben Keith committed Jan 10, 2020
1 parent 07ad915 commit 4792490
Show file tree
Hide file tree
Showing 9 changed files with 106 additions and 59 deletions.
40 changes: 19 additions & 21 deletions Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -2,49 +2,51 @@ PATH
remote: .
specs:
signalfx (2.1.0)
activesupport (>= 3.2)
activesupport (>= 3.2, < 6)
faye-websocket (~> 0.10.7)
i18n (= 1.1.0)
protobuf (>= 3.5.1)
rest-client (~> 2.0)
websocket-client-simple (~> 0.3.0)
thor (= 0.20.0)

GEM
remote: https://rubygems.org/
specs:
activesupport (5.2.1)
activesupport (5.2.4.1)
concurrent-ruby (~> 1.0, >= 1.0.2)
i18n (>= 0.7, < 2)
minitest (~> 5.1)
tzinfo (~> 1.1)
addressable (2.5.2)
public_suffix (>= 2.0.2, < 4.0)
coderay (1.1.1)
concurrent-ruby (1.0.5)
concurrent-ruby (1.1.5)
crack (0.4.3)
safe_yaml (~> 1.0.0)
daemons (1.2.4)
diff-lcs (1.3)
docile (1.3.1)
domain_name (0.5.20180417)
domain_name (0.5.20190701)
unf (>= 0.0.5, < 1.0.0)
event_emitter (0.2.6)
eventmachine (1.2.5)
faye-websocket (0.10.7)
faye-websocket (0.10.9)
eventmachine (>= 0.12.0)
websocket-driver (>= 0.5.1)
hashdiff (0.3.7)
http-accept (1.7.0)
http-cookie (1.0.3)
domain_name (~> 0.5)
i18n (1.1.0)
concurrent-ruby (~> 1.0)
json (2.1.0)
method_source (0.8.2)
middleware (0.1.0)
mime-types (3.2.2)
mime-types (3.3.1)
mime-types-data (~> 3.2015)
mime-types-data (3.2018.0812)
minitest (5.11.3)
mime-types-data (3.2019.1009)
minitest (5.13.0)
netrc (0.11.0)
protobuf (3.8.4)
protobuf (3.10.3)
activesupport (>= 3.2)
middleware
thor
Expand All @@ -56,7 +58,8 @@ GEM
public_suffix (3.0.2)
rack (2.0.3)
rake (10.4.2)
rest-client (2.0.2)
rest-client (2.1.0)
http-accept (>= 1.7.0, < 2.0)
http-cookie (>= 1.0.2, < 2.0)
mime-types (>= 1.16, < 4.0)
netrc (~> 0.8)
Expand Down Expand Up @@ -86,29 +89,24 @@ GEM
rack (>= 1, < 3)
thor (0.20.0)
thread_safe (0.3.6)
tzinfo (1.2.5)
tzinfo (1.2.6)
thread_safe (~> 0.1)
unf (0.1.4)
unf_ext
unf_ext (0.0.7.5)
unf_ext (0.0.7.6)
webmock (2.3.2)
addressable (>= 2.3.6)
crack (>= 0.3.2)
hashdiff
websocket (1.2.8)
websocket-client-simple (0.3.0)
event_emitter
websocket
websocket-driver (0.6.5)
websocket-driver (0.7.1)
websocket-extensions (>= 0.1.0)
websocket-extensions (0.1.2)
websocket-extensions (0.1.4)

PLATFORMS
ruby

DEPENDENCIES
bundler (~> 1.17.3)
faye-websocket (~> 0.10.7)
pry
rake (~> 10.0)
rspec (~> 3.3)
Expand Down
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,11 @@ actually make it to SignalFx).
To send data through a HTTP proxy, set the environment variable `http_proxy`
with the proxy URL.

The SignalFlow client by default will use the proxy set in the `http_proxy`
envvar by default. To send SignalFlow websocket data through a separate proxy,
set the `proxy_url` keyword arg on the `client.signalflow` call.


### Sending multi-dimensional data

Reporting dimensions for the data is also optional, and can be
Expand Down
7 changes: 5 additions & 2 deletions lib/signalfx/signal_fx_client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -171,8 +171,11 @@ def send_event(event_type, event_category: EVENT_CATEGORIES[:USER_DEFINED],
#
# @return [SignalFlowClient] a newly instantiated client, configured with the
# api token and endpoints from this class
def signalflow
SignalFlowClient.new(@api_token, @stream_endpoint)
def signalflow(proxy_url: nil, debug: false)
if ENV["http_proxy"] and proxy_url == nil
proxy_url = ENV["http_proxy"]
end
SignalFlowClient.new(@api_token, @stream_endpoint, proxy_url: proxy_url, debug: debug)
end

protected
Expand Down
4 changes: 2 additions & 2 deletions lib/signalfx/signalflow/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
# our API reference for SignalFlow}. Hash keys will be symbols instead of
# strings.
class SignalFlowClient
def initialize(api_token, stream_endpoint)
@transport = SignalFlowWebsocketTransport.new(api_token, stream_endpoint)
def initialize(api_token, stream_endpoint, proxy_url = nil)
@transport = SignalFlowWebsocketTransport.new(api_token, stream_endpoint, proxy_url)
end

# Start a computation and attach to its output. If using WebSockets (the
Expand Down
95 changes: 67 additions & 28 deletions lib/signalfx/signalflow/websocket.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,18 @@

require 'json'
require 'thread'
require 'websocket-client-simple'
require 'faye/websocket'

require_relative './binary'
require_relative './channel'
require_relative './computation'

class WebsocketError < StandardError
def initialize(ws_err)
super ws_err.message
end
end


# A WebSocket transport for SignalFlow. This should not be used directly by
# end-users.
Expand All @@ -17,14 +23,17 @@ class SignalFlowWebsocketTransport
# A lower bound on the amount of time to wait for a computation to start
COMPUTATION_START_TIMEOUT_SECONDS = 30

def initialize(api_token, stream_endpoint, logger: Logger.new(STDOUT, progname: "signalfx"))
def initialize(api_token, stream_endpoint, proxy_url: nil, logger: Logger.new(STDOUT, progname: "signalfx"), debug: false)
@api_token = api_token
@stream_endpoint = stream_endpoint
@logger = logger
@compress = true
@proxy_url = proxy_url
@debug = debug

@lock = Mutex.new
@close_reason = nil
@last_error = nil
reinit
end

Expand Down Expand Up @@ -188,6 +197,9 @@ def send_msg(msg)
# The socket will be closed by the server if auth isn't successful
# within 5 seconds so no point in waiting longer
if Time.now - start_time > 5 || @close_reason
if @last_error
raise WebsocketError.new(@last_error)
end
raise "Could not authenticate to SignalFlow WebSocket: #{@close_reason}"
end
sleep 0.1
Expand All @@ -200,29 +212,43 @@ def send_msg(msg)
private :send_msg

def on_close(msg)
@close_reason = "(#{msg.code}, #{msg.data})"
if @debug
@logger.info("Websocket on_close: #{msg}")
end

@close_reason = "(#{msg.code}, #{msg.reason})"
@chan_callbacks.keys.each do |channel_name|
invoke_callback_for_channel({ :event => "CONNECTION_CLOSED" }, channel_name)
end

reinit
end

def on_error(e)
@logger.error("ERROR #{e.inspect}")
@last_error = e
end


def on_message(m)
begin
return if m.type == :ping
if m.type == :close
on_close(m)
return
end
if @debug
@logger.info("Websocket on_message: #{m}")
end

message_received(m.data, m.type == :text)
is_text = m.data.kind_of?(String)

begin
message_received(m.data, is_text)
rescue Exception => e
@logger.error("Error processing SignalFlow message: #{e.backtrace.first}: #{e.message} (#{e.class})")
end
end

def on_open
if @debug
@logger.info("Websocket on_open")
end

@ws.send({
:type => "authenticate",
:token => @api_token,
Expand All @@ -233,26 +259,38 @@ def on_open
# reactor.
def startup_client
this = self
WebSocket::Client::Simple.connect("#{@stream_endpoint}/v2/signalflow/connect",
# Verification is disabled by default so this is essential
{verify_mode: OpenSSL::SSL::VERIFY_PEER}) do |ws|
@ws = ws
ws.on :error do |e|
@logger.error("ERROR #{e.inspect}")
end

ws.on :close do |e|
this.on_close(e)
end
options = {
:tls => {
:verify_peer => true,
}
}
if @proxy_url
options[:proxy] = {
:origin => @proxy_url,
}
end
Thread.new {
EM.run {
@ws = Faye::WebSocket::Client.new("#{@stream_endpoint}/v2/signalflow/connect", [], options)
@ws.on :error do |e|
this.on_error(e)
end

ws.on :message do |m|
this.on_message(m)
end
@ws.on :close do |e|
this.on_close(e)
EM.stop_event_loop
end

ws.on :open do
this.on_open
end
end
@ws.on :message do |m|
this.on_message(m)
end

@ws.on :open do
this.on_open
end
}
}
end
private :startup_client

Expand Down Expand Up @@ -294,7 +332,8 @@ def parse_message(raw_msg, is_text)
if is_text
JSON.parse(raw_msg, {:symbolize_names => true})
else
BinaryMessageParser.parse(raw_msg)
# Convert the byte array to a string
BinaryMessageParser.parse(raw_msg.pack("c*"))
end
end
private :parse_message
Expand Down
8 changes: 5 additions & 3 deletions signalfx.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,16 @@ Gem::Specification.new do |spec|
spec.add_development_dependency "webmock", "~> 2.3.1"
spec.add_development_dependency "thin", "~> 1.7"
spec.add_development_dependency "pry"
spec.add_development_dependency "faye-websocket", "~> 0.10.7"

# protobuf enforces this check but builds with a newer Ruby version so it's not enabled.
# Incorporating here to allow 2.2.0-1 users to successfully build and install signalfx.
active_support_max_version = "< 5" if Gem::Version.new(RUBY_VERSION) < Gem::Version.new("2.2.2")
active_support_max_version = Gem::Version.new(RUBY_VERSION) < Gem::Version.new("2.2.2") ? "<5" : "<6"
spec.add_dependency "activesupport", '>= 3.2', active_support_max_version

spec.add_dependency "protobuf", ">= 3.5.1"
spec.add_dependency "rest-client", "~> 2.0"
spec.add_dependency 'websocket-client-simple', "~> 0.3.0"
spec.add_dependency "faye-websocket", "~> 0.10.7"
spec.add_dependency "i18n", "= 1.1.0"
spec.add_dependency "thor", "= 0.20.0"

end
2 changes: 1 addition & 1 deletion spec/fake_signalflow/ssl.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ class SelfSignedCertificate
def initialize
# This randomly fails sometimes, probably due to lack of system entropy
begin
@key = OpenSSL::PKey::RSA.new(1024)
@key = OpenSSL::PKey::RSA.new(4096)
rescue OpenSSL::PKey::RSAError
sleep 0.1
retry
Expand Down
2 changes: 1 addition & 1 deletion spec/fake_signalflow/util.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def start_fake(host, port, use_ssl: false)

# Kill it with INT to give it a chance to cleanup, if that matters
[->(){
Process.kill("INT", server_pid)
Process.kill("TERM", server_pid)
}, reader]
end

Expand Down
2 changes: 1 addition & 1 deletion spec/signalflow_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ def wait_for_messages(count=1, timeout=10, &block)
client = SignalFxClient.new 'GOOD_TOKEN', :stream_endpoint => "wss://#{host}:#{ssl_port}"
sf = client.signalflow()

expect{ sf.execute("data('cpu.utilization').publish()") }.to raise_error(OpenSSL::SSL::SSLError)
expect {sf.execute("data('cpu.utilization').publish()") }.to raise_error(WebsocketError)
end
end

Expand Down

0 comments on commit 4792490

Please sign in to comment.