-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathpublisher.go
157 lines (126 loc) · 3.87 KB
/
publisher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
package pubsub
import (
"context"
"time"
"github.com/gofrs/uuid"
"github.com/gorilla/websocket"
"github.com/MicahParks/websocket-pubsub/clients"
)
// publisher holds all the required information for a publisher to the pubsub system.
//
// A publisher is built by newPublisher, which also starts the goroutines that use these fields.
type publisher struct {
// cancel cancels ctx. It is invoked through Cancel.
cancel context.CancelFunc
// conn is the publisher's websocket. readPump reads messages from it, writePump pings it, and it is closed after
// ctx is done.
conn *websocket.Conn
// ctx is the cancellable context derived from the context given to newPublisher. The pump goroutines stop when it
// is done.
ctx context.Context
// events optionally receives an EventTypePublisherLeft event when the publisher shuts down. It is nil when the
// options did not supply an events channel.
events chan<- Event
// done is closed by writePump when it returns, which lets the shutdown goroutine know pinging has stopped before
// the websocket is closed.
done chan struct{}
// name is the name given to newPublisher followed by "-" and a generated UUID. It is reported as the ClientID of
// emitted events.
name string
// pongDeadline is taken from the options. It is passed to sendPing and to clients.CloseWebSocket, and it is scaled
// by pingRatio to obtain the ping interval.
pongDeadline time.Duration
// publish is the channel that prepared messages read from conn are sent on.
publish chan<- *websocket.PreparedMessage
// topic is reported as the SubscriptionTopic of emitted events.
topic string
}
// newPublisher builds a publisher around conn and starts the goroutines that service it.
//
// The publisher's context is derived from ctx, so the publisher shuts down when ctx is done or when Cancel is called.
// The client name is the given name followed by "-" and a freshly generated UUID. Messages read from conn are
// forwarded on publish. The variadic options are flattened into one value that supplies the events channel, the pong
// deadline, and the close deadline.
//
// Three goroutines are started before the function returns: a shutdown goroutine that closes the websocket once the
// context is done, the read pump, and the write pump.
func newPublisher(ctx context.Context, name string, conn *websocket.Conn, publish chan<- *websocket.PreparedMessage, topic string, options ...directClientOptions) (pub *publisher) {
	// Derive a context that can be cancelled independently of the parent.
	childCtx, stop := context.WithCancel(ctx)

	// Collapse the variadic options into a single value.
	opts := flattenDirectClientOptions(options)

	// The client ID is the caller's name made unique with a UUID suffix.
	clientID := name + "-" + uuid.NewV4().String()

	// Assemble the publisher.
	pub = &publisher{
		name:         clientID,
		topic:        topic,
		conn:         conn,
		publish:      publish,
		events:       opts.events,
		pongDeadline: *opts.pongDeadline,
		ctx:          childCtx,
		cancel:       stop,
		done:         make(chan struct{}),
	}

	// announceExit reports the given event, giving up once the close deadline has elapsed.
	announceExit := func(left Event) {
		select {
		case pub.events <- left:
		case <-time.After(*opts.closeDeadline):
		}
	}

	// shutdown waits for the context to be done and then tears the websocket down.
	shutdown := func() {
		// When an events channel is configured, the departure event is built now and reported as the very last step.
		if pub.events != nil {
			defer announceExit(Event{
				ClientID:          pub.name,
				SubscriptionTopic: pub.topic,
				Type:              EventTypePublisherLeft,
			})
		}

		// Block until the publisher's context is done.
		<-childCtx.Done()

		// Give the write pump up to the close deadline to signal that it has exited.
		select {
		case <-pub.done:
		case <-time.After(*opts.closeDeadline):
		}

		// Close the websocket; a failure here is deliberately ignored because the publisher is going away regardless.
		_ = clients.CloseWebSocket(pub.conn, pub.pongDeadline)
	}

	// Start the shutdown goroutine followed by the two pumps.
	go shutdown()
	go pub.readPump()
	go pub.writePump()

	return pub
}
// Cancel implements the canceller interface.
//
// It calls the publisher's stored cancel function, which cancels the publisher's context. The pump goroutines and the
// shutdown goroutine started by newPublisher all react to that context being done. It is called from more than one
// goroutine in this file (readPump and writePump); a context.CancelFunc may be called repeatedly and concurrently.
func (p *publisher) Cancel() {
p.cancel()
}
// readPump is the only goroutine that reads from the publisher's websocket. Every message it reads is converted into
// a prepared message and forwarded on the publish channel.
//
// The goroutine ends when a read fails, when a message cannot be prepared, or when the publisher's context is done
// while a message is waiting to be forwarded. In every case the publisher is cancelled on the way out.
//
// NOTE(review): ping and pong frames are not handled explicitly here; presumably they are processed by handlers on
// the connection while ReadMessage runs. Confirm where those handlers are configured.
func (p *publisher) readPump() {
	// Whichever way this goroutine ends, make sure the publisher is shut down.
	defer p.Cancel()

	for {
		// Block until the publisher sends something or the connection fails.
		kind, payload, readErr := p.conn.ReadMessage()
		if readErr != nil {
			return
		}

		// Prepare the message once so it can be handed to the subscription cheaply.
		outbound, prepErr := websocket.NewPreparedMessage(kind, payload)
		if prepErr != nil {
			return
		}

		// Forward the message unless the publisher is shut down first.
		select {
		case <-p.ctx.Done():
			return
		case p.publish <- outbound:
		}
	}
}
// writePump is the only goroutine that writes to the publisher's websocket, and the only thing it writes is pings.
//
// Pings are sent at an interval of the pong deadline scaled by pingRatio. The goroutine ends when a ping fails, in
// which case the publisher is cancelled, or when the publisher's context is done. On exit the done channel is closed
// so the shutdown goroutine knows no ping is in flight before it closes the websocket.
func (p *publisher) writePump() {
	// Signal on the way out that this goroutine will not touch the websocket again.
	defer close(p.done)

	// Ping on a fixed schedule derived from the pong deadline.
	interval := time.Duration(float64(p.pongDeadline) * pingRatio)
	heartbeat := time.NewTicker(interval)
	defer heartbeat.Stop()

	for {
		select {
		case <-p.ctx.Done():
			// The publisher has been shut down; nothing more to do.
			return

		case <-heartbeat.C:
			// A publisher that cannot be pinged is treated as gone, so cancel it.
			if pingErr := sendPing(p.conn, p.pongDeadline); pingErr != nil {
				p.Cancel()
				return
			}
		}
	}
}