@@ -93,41 +93,38 @@ trait WatchableTests[F[_], Resource <: { def metadata: Option[ObjectMeta] }]
9393 })
9494 def processEvent (
9595 received : Ref [F , Map [String , Set [EventType ]]],
96- signal : SignallingRef [F , Boolean ]
97- ): Pipe [F , Either [String , WatchEvent [Resource ]], Unit ] =
98- _.flatMap {
96+ signal : SignallingRef [F , Boolean ],
97+ event : Either [String , WatchEvent [Resource ]]
98+ ): F [Unit ] =
99+ event match {
99100 case Right (we) if isExpectedResource(we) =>
100- Stream .eval {
101- for {
102- _ <- received.update(events =>
103- we.`object`.metadata.flatMap(_.namespace) match {
104- case Some (namespace) =>
105- val updated = events.get(namespace) match {
106- case Some (namespaceEvents) => namespaceEvents + we.`type`
107- case _ => Set (we.`type`)
108- }
109- events.updated(namespace, updated)
110- case _ =>
111- val crdNamespace = " customresourcedefinition"
112- events.updated(crdNamespace, events.getOrElse(crdNamespace, Set .empty) + we.`type`)
113- }
114- )
115- allReceived <- received.get.map(_ == expected)
116- _ <- F .whenA(allReceived)(signal.set(true ))
117- } yield ()
118- }
119- case _ => Stream .eval(F .unit)
101+ for {
102+ _ <- received.update(events =>
103+ we.`object`.metadata.flatMap(_.namespace) match {
104+ case Some (namespace) =>
105+ val updated = events.get(namespace) match {
106+ case Some (namespaceEvents) => namespaceEvents + we.`type`
107+ case _ => Set (we.`type`)
108+ }
109+ events.updated(namespace, updated)
110+ case _ =>
111+ val crdNamespace = " customresourcedefinition"
112+ events.updated(crdNamespace, events.getOrElse(crdNamespace, Set .empty) + we.`type`)
113+ }
114+ )
115+ allReceived <- received.get.map(_ == expected)
116+ _ <- F .whenA(allReceived)(signal.set(true ))
117+ } yield ()
118+ case _ => F .unit
120119 }
121120
122121 val watchEvents = for {
123122 signal <- SignallingRef [F , Boolean ](false )
124123 receivedEvents <- Ref .of(Map .empty[String , Set [EventType ]])
125124 watchStream = watchingNamespace
126- .map(watchApi)
127- .getOrElse(api)
125+ .fold(api)(watchApi)
128126 .watch(resourceVersion = resourceVersion)
129- .through(processEvent(receivedEvents, signal))
130- .evalMap(_ => receivedEvents.get)
127+ .evalTap(processEvent(receivedEvents, signal, _))
131128 .interruptWhen(signal)
132129 _ <- watchStream.interruptAfter(60 .seconds).compile.drain
133130 events <- receivedEvents.get
0 commit comments