@@ -58,7 +58,6 @@ func NewBESPipe(buildId, invocationId string) (BESPipeInterceptor, error) {
5858
5959 besBuildId : buildId ,
6060 besInvocationId : invocationId ,
61- streamCancels : make (map [besproxy.BESProxy ]context.CancelFunc ),
6261 wg : & sync.WaitGroup {},
6362 }, nil
6463}
@@ -72,7 +71,6 @@ type besPipe struct {
7271 besBuildId string
7372 besInvocationId string
7473 besProxies []besproxy.BESProxy
75- streamCancels map [besproxy.BESProxy ]context.CancelFunc
7674
7775 wg * sync.WaitGroup
7876}
@@ -92,14 +90,10 @@ func (bb *besPipe) RegisterBesProxy(ctx context.Context, p besproxy.BESProxy) {
9290
9391 bb .sendInitialLifecycleEvents (ctx , p )
9492
95- streamCtx , streamCancel := context .WithCancel (context .Background ())
96- bb .streamCancels [p ] = streamCancel
97-
98- err := p .PublishBuildToolEventStream (streamCtx , grpc .WaitForReady (false ))
93+ err := p .PublishBuildToolEventStream (ctx , grpc .WaitForReady (false ))
9994 if err != nil {
10095 // If we fail to create the build event stream to a proxy then print out an error but don't fail the GRPC call
10196 fmt .Fprintf (os .Stderr , "Error creating build event stream to %v: %s\n " , p .Host (), err .Error ())
102- streamCancel ()
10397 return
10498 }
10599
@@ -191,8 +185,6 @@ func (bb *besPipe) ServeWait(ctx context.Context) error {
191185 if err := p .CloseSend (); err != nil {
192186 fmt .Fprintf (os .Stderr , "Error closing build event stream to %v: %s\n " , p .Host (), err .Error ())
193187 }
194-
195- bb .streamCancels [p ]()
196188 }
197189 }()
198190 return nil
0 commit comments