@@ -47,7 +47,7 @@ func (p *PubSub) handleNewStream(s network.Stream) {
4747 p .inboundStreamsMx .Lock ()
4848 other , dup := p .inboundStreams [peer ]
4949 if dup {
50- log . Debugf ("duplicate inbound stream from %s ; resetting other stream" , peer )
50+ p . logger . Debug ("duplicate inbound stream from; resetting other stream" , "peer " , peer )
5151 other .Reset ()
5252 }
5353 p .inboundStreams [peer ] = s
@@ -63,12 +63,16 @@ func (p *PubSub) handleNewStream(s network.Stream) {
6363
6464 r := msgio .NewVarintReaderSize (s , p .maxMessageSize )
6565 for {
66+ // Peek at the message length to know when we should mark the start time
67+ // for measuring how long it took to receive a message.
68+ _ , _ = r .NextMsgLen ()
69+ start := time .Now ()
6670 msgbytes , err := r .ReadMsg ()
6771 if err != nil {
6872 r .ReleaseMsg (msgbytes )
6973 if err != io .EOF {
7074 s .Reset ()
71- log . Debugf ("error reading rpc from %s: %s " , s .Conn ().RemotePeer (), err )
75+ p . rpcLogger . Debug ("error reading rpc" , " from" , s .Conn ().RemotePeer (), "err" , err )
7276 } else {
7377 // Just be nice. They probably won't read this
7478 // but it doesn't hurt to send it.
@@ -86,10 +90,14 @@ func (p *PubSub) handleNewStream(s network.Stream) {
8690 r .ReleaseMsg (msgbytes )
8791 if err != nil {
8892 s .Reset ()
89- log .Warnf ("bogus rpc from %s: %s" , s .Conn ().RemotePeer (), err )
93+
94+ p .rpcLogger .Warn ("bogus rpc from" , "peer" , s .Conn ().RemotePeer (), "err" , err )
9095 return
9196 }
9297
98+ timeToReceive := time .Since (start )
99+ p .rpcLogger .Debug ("received" , "peer" , s .Conn ().RemotePeer (), "duration_s" , timeToReceive .Seconds (), "rpc" , rpc )
100+
93101 rpc .from = peer
94102 select {
95103 case p .incoming <- rpc :
@@ -117,7 +125,7 @@ func (p *PubSub) notifyPeerDead(pid peer.ID) {
117125func (p * PubSub ) handleNewPeer (ctx context.Context , pid peer.ID , outgoing * rpcQueue ) {
118126 s , err := p .host .NewStream (p .ctx , pid , p .rt .Protocols ()... )
119127 if err != nil {
120- log . Debug ("opening new stream to peer: " , err , pid )
128+ p . logger . Debug ("error opening new stream to peer" , "err" , err , "peer" , pid )
121129
122130 select {
123131 case p .newPeerError <- pid :
@@ -149,7 +157,7 @@ func (p *PubSub) handlePeerDead(s network.Stream) {
149157
150158 _ , err := s .Read ([]byte {0 })
151159 if err == nil {
152- log . Debugf ("unexpected message from %s " , pid )
160+ p . logger . Debug ("unexpected message from peer" , "peer " , pid )
153161 }
154162
155163 s .Reset ()
@@ -170,21 +178,26 @@ func (p *PubSub) handleSendingMessages(ctx context.Context, s network.Stream, ou
170178 }
171179
172180 _ , err = s .Write (buf )
173- return err
181+ if err != nil {
182+ p .rpcLogger .Debug ("failed to send message" , "peer" , s .Conn ().RemotePeer (), "rpc" , rpc , "err" , err )
183+ return err
184+ }
185+ p .rpcLogger .Debug ("sent" , "peer" , s .Conn ().RemotePeer (), "rpc" , rpc )
186+ return nil
174187 }
175188
176189 defer s .Close ()
177190 for ctx .Err () == nil {
178191 rpc , err := outgoing .Pop (ctx )
179192 if err != nil {
180- log . Debugf ( " popping message from the queue to send to %s: %s" , s .Conn ().RemotePeer (), err )
193+ p . logger . Debug ( "error popping message from the queue to send to peer" , "peer" , s .Conn ().RemotePeer (), "err" , err )
181194 return
182195 }
183196
184197 err = writeRpc (rpc )
185198 if err != nil {
186199 s .Reset ()
187- log . Debugf ( " writing message to %s: %s" , s .Conn ().RemotePeer (), err )
200+ p . logger . Debug ( "error writing message to peer" , "peer" , s .Conn ().RemotePeer (), "err" , err )
188201 return
189202 }
190203 }
0 commit comments