@@ -86,24 +86,23 @@ type ErrorCallback func(err string)
8686// it must be created using the NewServer function.
8787type Server struct {
8888 impl Discovery
89- outputChan chan * message
90- outputWaiter sync.WaitGroup
9189 userAgent string
9290 reqProtocolVersion int
9391 initialized bool
9492 started bool
9593 syncStarted bool
9694 cachedPorts map [string ]* Port
9795 cachedErr string
96+ output io.Writer
97+ outputMutex sync.Mutex
9898}
9999
100100// NewServer creates a new discovery server backed by the
101101// provided pluggable discovery implementation. To start the server
102102// use the Run method.
103103func NewServer (impl Discovery ) * Server {
104104 return & Server {
105- impl : impl ,
106- outputChan : make (chan * message ),
105+ impl : impl ,
107106 }
108107}
109108
@@ -113,20 +112,20 @@ func NewServer(impl Discovery) *Server {
113112// the input stream is closed. In case of IO error the error is
114113// returned.
115114func (d * Server ) Run (in io.Reader , out io.Writer ) error {
116- d .startOutputProcessor ( out )
115+ d .output = out
117116 reader := bufio .NewReader (in )
118117 for {
119118 fullCmd , err := reader .ReadString ('\n' )
120119 if err != nil {
121- d .outputChan <- messageError ("command_error" , err .Error ())
120+ d .send ( messageError ("command_error" , err .Error () ))
122121 return err
123122 }
124123 fullCmd = strings .TrimSpace (fullCmd )
125124 split := strings .Split (fullCmd , " " )
126125 cmd := strings .ToUpper (split [0 ])
127126
128127 if ! d .initialized && cmd != "HELLO" && cmd != "QUIT" {
129- d .outputChan <- messageError ("command_error" , fmt .Sprintf ("First command must be HELLO, but got '%s'" , cmd ))
128+ d .send ( messageError ("command_error" , fmt .Sprintf ("First command must be HELLO, but got '%s'" , cmd ) ))
130129 continue
131130 }
132131
@@ -142,61 +141,62 @@ func (d *Server) Run(in io.Reader, out io.Writer) error {
142141 case "STOP" :
143142 d .stop ()
144143 case "QUIT" :
145- d .quit ()
144+ d .impl .Quit ()
145+ d .send (messageOk ("quit" ))
146146 return nil
147147 default :
148- d .outputChan <- messageError ("command_error" , fmt .Sprintf ("Command %s not supported" , cmd ))
148+ d .send ( messageError ("command_error" , fmt .Sprintf ("Command %s not supported" , cmd ) ))
149149 }
150150 }
151151}
152152
153153func (d * Server ) hello (cmd string ) {
154154 if d .initialized {
155- d .outputChan <- messageError ("hello" , "HELLO already called" )
155+ d .send ( messageError ("hello" , "HELLO already called" ) )
156156 return
157157 }
158158 re := regexp .MustCompile (`^(\d+) "([^"]+)"$` )
159159 matches := re .FindStringSubmatch (cmd )
160160 if len (matches ) != 3 {
161- d .outputChan <- messageError ("hello" , "Invalid HELLO command" )
161+ d .send ( messageError ("hello" , "Invalid HELLO command" ) )
162162 return
163163 }
164164 d .userAgent = matches [2 ]
165165 v , err := strconv .ParseInt (matches [1 ], 10 , 64 )
166166 if err != nil {
167- d .outputChan <- messageError ("hello" , "Invalid protocol version: " + matches [2 ])
167+ d .send ( messageError ("hello" , "Invalid protocol version: " + matches [2 ]) )
168168 return
169169 }
170170 d .reqProtocolVersion = int (v )
171171 if err := d .impl .Hello (d .userAgent , 1 ); err != nil {
172- d .outputChan <- messageError ("hello" , err .Error ())
172+ d .send ( messageError ("hello" , err .Error () ))
173173 return
174174 }
175- d .outputChan <- & message {
175+ d .send ( & message {
176176 EventType : "hello" ,
177177 ProtocolVersion : 1 , // Protocol version 1 is the only supported for now...
178178 Message : "OK" ,
179- }
179+ })
180180 d .initialized = true
181181}
182182
183183func (d * Server ) start () {
184184 if d .started {
185- d .outputChan <- messageError ("start" , "Discovery already STARTed" )
185+ d .send ( messageError ("start" , "Discovery already STARTed" ) )
186186 return
187187 }
188188 if d .syncStarted {
189- d .outputChan <- messageError ("start" , "Discovery already START_SYNCed, cannot START" )
189+ d .send ( messageError ("start" , "Discovery already START_SYNCed, cannot START" ) )
190190 return
191191 }
192192 d .cachedPorts = map [string ]* Port {}
193193 d .cachedErr = ""
194194 if err := d .impl .StartSync (d .eventCallback , d .errorCallback ); err != nil {
195- d .outputChan <- messageError ("start" , "Cannot START: " + err .Error ())
195+ d .send ( messageError ("start" , "Cannot START: " + err .Error () ))
196196 return
197197 }
198198 d .started = true
199- d .outputChan <- messageOk ("start" )
199+ d .send ( messageOk ("start" ) )
200200}
201201
202202func (d * Server ) eventCallback (event string , port * Port ) {
@@ -215,99 +215,84 @@ func (d *Server) errorCallback(msg string) {
215215
216216func (d * Server ) list () {
217217 if ! d .started {
218- d .outputChan <- messageError ("list" , "Discovery not STARTed" )
218+ d .send ( messageError ("list" , "Discovery not STARTed" ) )
219219 return
220220 }
221221 if d .syncStarted {
222- d .outputChan <- messageError ("list" , "discovery already START_SYNCed, LIST not allowed" )
222+ d .send ( messageError ("list" , "discovery already START_SYNCed, LIST not allowed" ) )
223223 return
224224 }
225225 if d .cachedErr != "" {
226- d .outputChan <- messageError ("list" , d .cachedErr )
226+ d .send ( messageError ("list" , d .cachedErr ) )
227227 return
228228 }
229229 ports := []* Port {}
230230 for _ , port := range d .cachedPorts {
231231 ports = append (ports , port )
232232 }
233- d .outputChan <- & message {
233+ d .send ( & message {
234234 EventType : "list" ,
235235 Ports : & ports ,
236- }
236+ })
237237}
238238
239239func (d * Server ) startSync () {
240240 if d .syncStarted {
241- d .outputChan <- messageError ("start_sync" , "Discovery already START_SYNCed" )
241+ d .send ( messageError ("start_sync" , "Discovery already START_SYNCed" ) )
242242 return
243243 }
244244 if d .started {
245- d .outputChan <- messageError ("start_sync" , "Discovery already STARTed, cannot START_SYNC" )
245+ d .send ( messageError ("start_sync" , "Discovery already STARTed, cannot START_SYNC" ) )
246246 return
247247 }
248248 if err := d .impl .StartSync (d .syncEvent , d .errorEvent ); err != nil {
249- d .outputChan <- messageError ("start_sync" , "Cannot START_SYNC: " + err .Error ())
249+ d .send ( messageError ("start_sync" , "Cannot START_SYNC: " + err .Error () ))
250250 return
251251 }
252252 d .syncStarted = true
253- d .outputChan <- messageOk ("start_sync" )
253+ d .send ( messageOk ("start_sync" ) )
254254}
255255
256256func (d * Server ) stop () {
257257 if ! d .syncStarted && ! d .started {
258- d .outputChan <- messageError ("stop" , "Discovery already STOPped" )
258+ d .send ( messageError ("stop" , "Discovery already STOPped" ) )
259259 return
260260 }
261261 if err := d .impl .Stop (); err != nil {
262- d .outputChan <- messageError ("stop" , "Cannot STOP: " + err .Error ())
262+ d .send ( messageError ("stop" , "Cannot STOP: " + err .Error () ))
263263 return
264264 }
265265 d .started = false
266266 if d .syncStarted {
267267 d .syncStarted = false
268268 }
269- d .outputChan <- messageOk ("stop" )
269+ d .send ( messageOk ("stop" ) )
270270}
271271
272272func (d * Server ) syncEvent (event string , port * Port ) {
273- d .outputChan <- & message {
273+ d .send ( & message {
274274 EventType : event ,
275275 Port : port ,
276- }
277- }
278-
279- func (d * Server ) quit () {
280- d .impl .Quit ()
281- d .outputChan <- messageOk ("quit" )
282- close (d .outputChan )
283- // If we don't wait for all messages
284- // to be consumed by the output processor
285- // we risk not printing the "quit" message.
286- // This may cause issues to consumers of
287- // the discovery since they expect a message
288- // that is never sent.
289- d .outputWaiter .Wait ()
276+ })
290277}
291278
292279func (d * Server ) errorEvent (msg string ) {
293- d .outputChan <- messageError ("start_sync" , msg )
280+ d .send ( messageError ("start_sync" , msg ) )
294281}
295282
296- func (d * Server ) startOutputProcessor (outWriter io.Writer ) {
297- // Start go routine to serialize messages printing
298- d .outputWaiter .Add (1 )
299- go func () {
300- for msg := range d .outputChan {
301- data , err := json .MarshalIndent (msg , "" , " " )
302- if err != nil {
303- // We are certain that this will be marshalled correctly
304- // so we don't handle the error
305- data , _ = json .MarshalIndent (messageError ("command_error" , err .Error ()), "" , " " )
306- }
307- fmt .Fprintln (outWriter , string (data ))
308- }
309- // We finished consuming all messages, now
310- // we can exit for real
311- d .outputWaiter .Done ()
312- }()
283+ func (d * Server ) send (msg * message ) {
284+ data , err := json .MarshalIndent (msg , "" , " " )
285+ if err != nil {
286+ // We are certain that this will be marshalled correctly
287+ // so we don't handle the error
288+ data , _ = json .MarshalIndent (messageError ("command_error" , err .Error ()), "" , " " )
289+ }
290+ data = append (data , '\n' )
291+
292+ d .outputMutex .Lock ()
293+ defer d .outputMutex .Unlock ()
294+ n , err := d .output .Write (data )
295+ if n != len (data ) || err != nil {
296+ panic ("ERROR" )
297+ }
313298}
0 commit comments