Skip to content

Commit 1e354c3

Browse files
committed
Initial commit.
0 parents  commit 1e354c3

File tree

8 files changed

+900
-0
lines changed

8 files changed

+900
-0
lines changed

LICENSE

+22
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
The MIT License (MIT)
2+
3+
Copyright (c) 2015 djoyahoy
4+
5+
Permission is hereby granted, free of charge, to any person obtaining a copy
6+
of this software and associated documentation files (the "Software"), to deal
7+
in the Software without restriction, including without limitation the rights
8+
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9+
copies of the Software, and to permit persons to whom the Software is
10+
furnished to do so, subject to the following conditions:
11+
12+
The above copyright notice and this permission notice shall be included in all
13+
copies or substantial portions of the Software.
14+
15+
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16+
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17+
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18+
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19+
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20+
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21+
SOFTWARE.
22+

README.md

+6
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
Go Stomp 1.2 Client
2+
===================
3+
TODO
4+
----
5+
1. Write documentation.
6+
2. Write tests.

client.go

+356
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,356 @@
1+
package stomp
2+
3+
import (
4+
"crypto/tls"
5+
"fmt"
6+
"io"
7+
"io/ioutil"
8+
"sync"
9+
"time"
10+
)
11+
12+
type receipts struct {
13+
closed chan struct{}
14+
orders map[string]chan struct{}
15+
lock *sync.Mutex
16+
}
17+
18+
func newReceipts() *receipts {
19+
return &receipts{
20+
closed: make(chan struct{}),
21+
orders: make(map[string]chan struct{}),
22+
lock: new(sync.Mutex),
23+
}
24+
}
25+
26+
func (r *receipts) Mark(id string) chan struct{} {
27+
r.lock.Lock()
28+
defer r.lock.Unlock()
29+
ch := make(chan struct{})
30+
r.orders[id] = ch
31+
return ch
32+
}
33+
34+
func (r *receipts) Clear(id string) {
35+
r.lock.Lock()
36+
defer r.lock.Unlock()
37+
ch, ok := r.orders[id]
38+
if ok {
39+
close(ch)
40+
delete(r.orders, id)
41+
}
42+
}
43+
44+
type receiptFunc func(rid string) error
45+
46+
func doWithReceipt(r *receipts, f receiptFunc) (err error) {
47+
id, err := newUUID()
48+
if err != nil {
49+
return err
50+
}
51+
52+
ch := r.Mark(id)
53+
defer func() {
54+
if err != nil {
55+
r.Clear(id)
56+
}
57+
}()
58+
59+
err = f(id)
60+
if err != nil {
61+
return err
62+
}
63+
64+
select {
65+
case <-ch:
66+
case <-r.closed:
67+
return fmt.Errorf("stomp: channel closed")
68+
}
69+
70+
return nil
71+
}
72+
73+
type Client struct {
74+
transport *Transport
75+
receipts *receipts
76+
MsgCh chan *Frame
77+
ErrCh chan *Frame
78+
}
79+
80+
func Connect(addr string, conf *Config, tr *TransportConfig) (*Client, error) {
81+
if conf == nil {
82+
conf = DefaultConfig
83+
}
84+
85+
if tr == nil {
86+
tr = DefaultTransportConfig
87+
}
88+
89+
// Create an underlying tcp connection. Use TLS if requested.
90+
conn, err := tr.Dial("tcp", addr)
91+
if err != nil {
92+
return nil, err
93+
}
94+
95+
if tr.TLSConfig != nil {
96+
tlsConn := tls.Client(conn, tr.TLSConfig)
97+
98+
errc := make(chan error, 2)
99+
var timer *time.Timer
100+
if d := tr.TLSHandshakeTimeout; d != 0 {
101+
timer = time.AfterFunc(d, func() {
102+
errc <- fmt.Errorf("stomp: tls handshake timed out")
103+
})
104+
}
105+
106+
go func() {
107+
err := tlsConn.Handshake()
108+
if timer != nil {
109+
timer.Stop()
110+
}
111+
errc <- err
112+
}()
113+
114+
if err := <-errc; err != nil {
115+
conn.Close()
116+
return nil, err
117+
}
118+
119+
conn = tlsConn
120+
}
121+
122+
req := NewFrame("CONNECT", nil)
123+
req.Headers["accept-version"] = Version
124+
if conf.Host != "" {
125+
req.Headers["host"] = conf.Host
126+
} else {
127+
req.Headers["host"] = "/"
128+
}
129+
if conf.Login != "" {
130+
req.Headers["login"] = conf.Login
131+
}
132+
if conf.Passcode != "" {
133+
req.Headers["passcode"] = conf.Passcode
134+
}
135+
req.Headers["heart-beat"] = conf.Heartbeat.toString()
136+
137+
err = NewEncoder(conn).Encode(req)
138+
if err != nil {
139+
return nil, err
140+
}
141+
142+
var resp Frame
143+
err = NewDecoder(conn).Decode(&resp)
144+
if err != nil {
145+
conn.Close()
146+
return nil, err
147+
}
148+
149+
if resp.Command != "CONNECTED" {
150+
defer conn.Close()
151+
152+
ct, ok := resp.Headers["content-type"]
153+
if !ok {
154+
return nil, fmt.Errorf("stomp: server response has no content-type")
155+
}
156+
if ct != "text/plain" {
157+
return nil, fmt.Errorf("stomp: server response has bad content-type %s", ct)
158+
}
159+
160+
buf, err := ioutil.ReadAll(resp.Body)
161+
if err != nil {
162+
return nil, err
163+
}
164+
return nil, fmt.Errorf("stomp: %s", string(buf))
165+
}
166+
167+
// Generate a heartbeat object based on the client and server requests.
168+
hb := Heartbeat{}
169+
v, ok := resp.Headers["heart-beat"]
170+
if ok {
171+
s, r := 0, 0
172+
fmt.Sscanf(v, "%d,%d", &s, &r)
173+
send := time.Millisecond * time.Duration(s)
174+
recv := time.Millisecond * time.Duration(r)
175+
if conf.Heartbeat.Send != 0 && recv != 0 {
176+
hb.Send = maxDuration(conf.Heartbeat.Send, recv)
177+
}
178+
if conf.Heartbeat.Recv != 0 && send != 0 {
179+
hb.Recv = maxDuration(conf.Heartbeat.Recv, send)
180+
}
181+
}
182+
183+
c := &Client{
184+
transport: NewTransport(conn),
185+
receipts: newReceipts(),
186+
MsgCh: make(chan *Frame),
187+
ErrCh: make(chan *Frame, 1),
188+
}
189+
go c.write(hb.Send)
190+
go c.read(hb.Recv)
191+
192+
return c, nil
193+
}
194+
195+
func (c *Client) write(d time.Duration) {
196+
if d <= 0 {
197+
return
198+
}
199+
for _ = range time.Tick(d) {
200+
err := c.transport.Heartbeat()
201+
if err != nil {
202+
return
203+
}
204+
}
205+
}
206+
207+
func (c *Client) read(d time.Duration) {
208+
loop:
209+
for {
210+
f, err := c.transport.Recv(d)
211+
if err != nil {
212+
break loop
213+
}
214+
215+
switch f.Command {
216+
case "HEARTBEAT":
217+
case "RECEIPT":
218+
id, ok := f.Headers["receipt-id"]
219+
if !ok {
220+
panic("stomp: received a receipt frame without an ID")
221+
}
222+
c.receipts.Clear(id)
223+
case "MESSAGE":
224+
c.MsgCh <- f
225+
case "ERROR":
226+
c.ErrCh <- f
227+
break loop
228+
default:
229+
panic(fmt.Sprintf("stomp: received unkown frame %s", f.Command))
230+
}
231+
}
232+
close(c.receipts.closed)
233+
close(c.MsgCh)
234+
}
235+
236+
func (c *Client) Disconnect() (err error) {
237+
defer c.transport.Close()
238+
239+
id, err := newUUID()
240+
if err != nil {
241+
return err
242+
}
243+
244+
ch := c.receipts.Mark(id)
245+
defer func() {
246+
if err != nil {
247+
c.receipts.Clear(id)
248+
}
249+
}()
250+
251+
err = c.transport.Disconnect(id)
252+
if err != nil {
253+
return err
254+
}
255+
256+
select {
257+
case <-ch:
258+
case <-c.receipts.closed:
259+
}
260+
261+
return nil
262+
}
263+
264+
func (c *Client) Send(dest string, hdrs *map[string]string, bodyType string, body io.Reader, receipt bool) error {
265+
if receipt {
266+
return doWithReceipt(c.receipts, func(rid string) error {
267+
return c.transport.Send(dest, hdrs, bodyType, body, &rid)
268+
})
269+
}
270+
return c.transport.Send(dest, hdrs, bodyType, body, nil)
271+
}
272+
273+
func (c *Client) Ack(id string, receipt bool) error {
274+
if receipt {
275+
return doWithReceipt(c.receipts, func(rid string) error {
276+
return c.transport.Ack(id, &rid)
277+
})
278+
}
279+
return c.transport.Ack(id, nil)
280+
}
281+
282+
func (c *Client) Nack(id string, receipt bool) error {
283+
if receipt {
284+
return doWithReceipt(c.receipts, func(rid string) error {
285+
return c.transport.Nack(id, &rid)
286+
})
287+
}
288+
return c.transport.Nack(id, nil)
289+
}
290+
291+
// AckMode defines a subscription ack mode.
292+
type AckMode string
293+
294+
const (
295+
// AutoMode defines STOMP 'auto' mode.
296+
AutoMode AckMode = "auto"
297+
298+
// ClientMode defines STOMP 'client' mode.
299+
ClientMode = "client"
300+
301+
// ClientIndividualMode defines STOMP 'client-individual' mode.
302+
ClientIndividualMode = "client-individual"
303+
)
304+
305+
func (c *Client) Subscribe(dest string, mode AckMode, receipt bool) (id string, err error) {
306+
id, err = newUUID()
307+
if err != nil {
308+
return "", err
309+
}
310+
311+
if receipt {
312+
err = doWithReceipt(c.receipts, func(rid string) error {
313+
return c.transport.Subscribe(id, dest, mode, &rid)
314+
})
315+
} else {
316+
err = c.transport.Subscribe(id, dest, mode, nil)
317+
}
318+
319+
return id, err
320+
}
321+
322+
func (c *Client) Unsubscribe(id string, receipt bool) (err error) {
323+
if receipt {
324+
return doWithReceipt(c.receipts, func(rid string) error {
325+
return c.transport.Unsubscribe(id, &rid)
326+
})
327+
}
328+
return c.transport.Unsubscribe(id, nil)
329+
}
330+
331+
func (c *Client) Begin(receipt bool) (tx *Tx, err error) {
332+
tid, err := newUUID()
333+
if err != nil {
334+
return nil, err
335+
}
336+
337+
if receipt {
338+
err = doWithReceipt(c.receipts, func(rid string) error {
339+
return c.transport.TxBegin(tid, &rid)
340+
})
341+
} else {
342+
err = c.transport.TxBegin(tid, nil)
343+
}
344+
345+
if err != nil {
346+
return nil, err
347+
}
348+
349+
tx = &Tx{
350+
tid: tid,
351+
done: false,
352+
receipts: c.receipts,
353+
transport: c.transport,
354+
}
355+
return tx, nil
356+
}

0 commit comments

Comments
 (0)