Skip to content

Commit

Permalink
Implementing SignalFlow v2 client
Browse files Browse the repository at this point in the history
This uses WebSockets only for now, although it should be relatively easy
to add the SSE HTTP interface.
  • Loading branch information
Ben Keith committed Aug 29, 2017
1 parent 6bcf049 commit 96e91a6
Show file tree
Hide file tree
Showing 17 changed files with 1,162 additions and 16 deletions.
1 change: 1 addition & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
sudo: required
language: ruby
dist: trusty

rvm:
- 2.2.3
55 changes: 42 additions & 13 deletions Gemfile.lock
Original file line number Diff line number Diff line change
@@ -1,44 +1,58 @@
PATH
remote: .
specs:
signalfx (1.0.2)
signalfx (1.1.0)
protobuf (~> 3.5.1, >= 3.5.1)
rest-client (~> 2.0)
websocket-client-simple (~> 0.3.0)

GEM
remote: https://rubygems.org/
specs:
activesupport (5.0.0.1)
activesupport (5.1.3)
concurrent-ruby (~> 1.0, >= 1.0.2)
i18n (~> 0.7)
minitest (~> 5.1)
tzinfo (~> 1.1)
addressable (2.4.0)
concurrent-ruby (1.0.2)
coderay (1.1.1)
concurrent-ruby (1.0.5)
crack (0.4.3)
safe_yaml (~> 1.0.0)
daemons (1.2.4)
diff-lcs (1.2.5)
docile (1.1.5)
domain_name (0.5.20160826)
domain_name (0.5.20170404)
unf (>= 0.0.5, < 1.0.0)
event_emitter (0.2.6)
eventmachine (1.2.5)
faye-websocket (0.10.7)
eventmachine (>= 0.12.0)
websocket-driver (>= 0.5.1)
hashdiff (0.3.0)
http-cookie (1.0.2)
http-cookie (1.0.3)
domain_name (~> 0.5)
i18n (0.7.0)
i18n (0.8.6)
json (1.8.3)
method_source (0.8.2)
middleware (0.1.0)
mime-types (3.1)
mime-types-data (~> 3.2015)
mime-types-data (3.2016.0521)
minitest (5.9.0)
minitest (5.10.3)
netrc (0.11.0)
protobuf (3.5.5)
activesupport (>= 3.2)
middleware
thor
thread_safe
pry (0.10.4)
coderay (~> 1.1.0)
method_source (~> 0.8.1)
slop (~> 3.4)
rack (2.0.3)
rake (10.4.2)
rest-client (2.0.0)
rest-client (2.0.2)
http-cookie (>= 1.0.2, < 2.0)
mime-types (>= 1.16, < 4.0)
netrc (~> 0.8)
Expand All @@ -61,28 +75,43 @@ GEM
json (~> 1.8)
simplecov-html (~> 0.10.0)
simplecov-html (0.10.0)
thor (0.19.1)
thread_safe (0.3.5)
tzinfo (1.2.2)
slop (3.6.0)
thin (1.7.2)
daemons (~> 1.0, >= 1.0.9)
eventmachine (~> 1.0, >= 1.0.4)
rack (>= 1, < 3)
thor (0.20.0)
thread_safe (0.3.6)
tzinfo (1.2.3)
thread_safe (~> 0.1)
unf (0.1.4)
unf_ext
unf_ext (0.0.7.2)
unf_ext (0.0.7.4)
webmock (2.1.0)
addressable (>= 2.3.6)
crack (>= 0.3.2)
hashdiff
websocket (1.2.4)
websocket-client-simple (0.3.0)
event_emitter
websocket
websocket-driver (0.6.5)
websocket-extensions (>= 0.1.0)
websocket-extensions (0.1.2)

PLATFORMS
ruby

DEPENDENCIES
bundler (~> 1.10)
faye-websocket (~> 0.10.7)
pry
rake (~> 10.0)
rspec (~> 3.3)
signalfx!
simplecov
thin (~> 1.7)
webmock (~> 2.1)

BUNDLED WITH
1.13.1
1.15.4
38 changes: 38 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,44 @@ client.send_event(
See `examples/generic_usecase.rb` for a complete code example for
sending events.

### SignalFlow

You can run SignalFlow computations as well. This library supports all of the
functionality described in our [API docs for
SignalFlow](https://developers.signalfx.com/reference#signalflowconnect). Right
now, the only supported transport mechanism is WebSockets.

To create a new SignalFlow client instance from an existing SignalFx client:

```ruby
sf = client.signalflow()
```

To execute and attach to a SignalFlow computation, do the
following:

```ruby
sf.execute("data('cpu.utilization').mean(by='host').publish()").each_message do |msg, detach|
case msg[:type]
when "data"
process_datapoints(msg.timestamp, msg.data)
end

if done_processing
detach.call
end
end
```

For the full API see [the RubyDocs for the SignalFlow
client](http://www.rubydoc.info/github/signalfx/signalfx-ruby/master/SignalFlowClient/)
(the `sf` var above).

The messages passed into the `each_message*` blocks will be decoded forms of
what is described in [our API reference for
SignalFlow](https://developers.signalfx.com/v2/reference#information-messages-specification).
Hash keys will be symbols instead of strings.

## License

Apache Software License v2. Copyright © 2015-2016
Expand Down
2 changes: 2 additions & 0 deletions lib/signalfx/conf.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
module RbConfig
# Default Parameters
DEFAULT_INGEST_ENDPOINT = 'https://ingest.signalfx.com'
DEFAULT_API_ENDPOINT = 'https://api.signalfx.com'
DEFAULT_STREAM_ENDPOINT = 'wss://stream.signalfx.com'
DEFAULT_BATCH_SIZE = 300 # Will wait for this many requests before posting
DEFAULT_TIMEOUT = 1

Expand Down
14 changes: 14 additions & 0 deletions lib/signalfx/signal_fx_client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

require_relative './version'
require_relative './conf'
require_relative './signalflow/client'

require 'net/http'
require 'uri'
Expand Down Expand Up @@ -30,12 +31,16 @@ class SignalFxClient
def initialize(api_token,
enable_aws_unique_id: false,
ingest_endpoint: RbConfig::DEFAULT_INGEST_ENDPOINT,
api_endpoint: RbConfig::DEFAULT_API_ENDPOINT,
stream_endpoint: RbConfig::DEFAULT_STREAM_ENDPOINT,
timeout: RbConfig::DEFAULT_TIMEOUT,
batch_size: RbConfig::DEFAULT_BATCH_SIZE,
user_agents: [])

@api_token = api_token
@ingest_endpoint = ingest_endpoint
@api_endpoint = api_endpoint
@stream_endpoint = stream_endpoint
@timeout = timeout
@batch_size = batch_size
@user_agents = user_agents
Expand Down Expand Up @@ -161,6 +166,15 @@ def send_event(event_type, event_category: EVENT_CATEGORIES[:USER_DEFINED],
post(build_event(data), @ingest_endpoint, EVENT_ENDPOINT_SUFFIX)
end

# Create a new SignalFlow client. A single client can execute multiple
# computations that will be multiplexed over the same WebSocket connection.
#
# @return [SignalFlowClient] a newly instantiated client, configured with the
# api token and endpoints from this class
def signalflow
SignalFlowClient.new(@api_token, @api_endpoint, @stream_endpoint)
end

protected

def get_queue
Expand Down
60 changes: 60 additions & 0 deletions lib/signalfx/signalflow/binary.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
require 'json'
require 'zlib'

# Converts binary websocket messages into a hash
module BinaryMessageParser
# data should be a raw string
def parse(data)
# See https://developers.signalfx.com/v2/reference#section-binary-encoding-of-websocket-messages
# Second var is message type but we don't care about it since it is a
# rather opaque integer that can be many things when compression is on.
version, _, flags, _, channel, payload = data.unpack("CCb8CZ16a*")
compressed = flags[0] == "1"
is_json = flags[1] == "1"

if version != 1
raise "Unsupported SignalFlow version #{version}"
end

if compressed
payload = Zlib::Inflate.new(16+Zlib::MAX_WBITS).inflate(payload)
end

message = is_json ?
JSON.parse(payload, {:symbolize_names => true}) :
parse_binary_payload(payload)

message.merge({:channel => channel})
end
module_function :parse

def parse_binary_payload(payload)
# See https://developers.signalfx.com/v2/reference#section-binary-encoding-used-for-the-websocket
timestamp, element_count, tuples_raw = payload.unpack("Q>L>a*")
tuple_hashes = (0..element_count-1).map do |i|
type, tsid, value_raw = tuples_raw[i*17..i*17+16].unpack("CQ>a8")

value = case type
when 1 # long
value_raw.unpack("q>")
when 2 # double
value_raw.unpack("G")
when 3 # int (32 bit)
value_raw.unpack("l>")
end

{
:timeseries_id => tsid,
:value => value[0],
}
end

{
:type => "data",
:timestampMs => timestamp,
:timestamp => Time.at(timestamp/1000),
:data => tuple_hashes,
}
end
module_function :parse_binary_payload
end
Loading

0 comments on commit 96e91a6

Please sign in to comment.