File tree Expand file tree Collapse file tree 2 files changed +22
-27
lines changed
Expand file tree Collapse file tree 2 files changed +22
-27
lines changed Original file line number Diff line number Diff line change @@ -61,3 +61,23 @@ func (s *subscriptionMap) Delete(subscriptionID string) {
6161 defer s .Unlock ()
6262 delete (s .map_ , subscriptionID )
6363}
64+
65+ func (s * subscriptionMap ) GetOrHandleCompletion (id string , subType string ) (* subscription , error ) {
66+ s .Lock ()
67+ defer s .Unlock ()
68+ sub , ok := s .map_ [id ]
69+ if ! ok {
70+ return nil , fmt .Errorf ("received message for unknown subscription ID '%s'" , id )
71+ }
72+ if sub .hasBeenUnsubscribed {
73+ return nil , nil
74+ }
75+ if subType == webSocketTypeComplete {
76+ sub .hasBeenUnsubscribed = true
77+ s .map_ [id ] = sub
78+ reflect .ValueOf (sub .interfaceChan ).Close ()
79+ return nil , nil
80+ }
81+
82+ return & sub , nil
83+ }
Original file line number Diff line number Diff line change 66 "encoding/json"
77 "fmt"
88 "net/http"
9- "reflect"
109 "strings"
1110 "sync"
1211 "time"
@@ -130,26 +129,6 @@ func (w *webSocketClient) listenWebSocket() {
130129 }
131130}
132131
133- func (w * webSocketClient ) getSubAndHandleCompletion (wsMsg webSocketReceiveMessage ) (* subscription , error ) {
134- w .subscriptions .Lock ()
135- defer w .subscriptions .Unlock ()
136- sub , success := w .subscriptions .map_ [wsMsg .ID ]
137- if ! success {
138- return nil , fmt .Errorf ("received message for unknown subscription ID '%s'" , wsMsg .ID )
139- }
140- if sub .hasBeenUnsubscribed {
141- return nil , nil
142- }
143- if wsMsg .Type == webSocketTypeComplete {
144- sub .hasBeenUnsubscribed = true
145- w .subscriptions .map_ [wsMsg .ID ] = sub
146- reflect .ValueOf (sub .interfaceChan ).Close ()
147- return nil , nil
148- }
149-
150- return & sub , nil
151- }
152-
153132func (w * webSocketClient ) forwardWebSocketData (message []byte ) error {
154133 var wsMsg webSocketReceiveMessage
155134 err := json .Unmarshal (message , & wsMsg )
@@ -159,14 +138,10 @@ func (w *webSocketClient) forwardWebSocketData(message []byte) error {
159138 if wsMsg .ID == "" { // e.g. keep-alive messages
160139 return nil
161140 }
162-
163- sub , err := w .getSubAndHandleCompletion (wsMsg )
164- if err != nil {
141+ sub , err := w .subscriptions .GetOrHandleCompletion (wsMsg .ID , wsMsg .Type )
142+ if err != nil || sub == nil {
165143 return err
166144 }
167- if sub == nil {
168- return nil
169- }
170145
171146 return sub .forwardDataFunc (sub .interfaceChan , wsMsg .Payload )
172147}
You can’t perform that action at this time.
0 commit comments