@@ -35,14 +35,13 @@ import io.vertx.ext.eventbus.bridge.tcp.impl.protocol.FrameParser
35
35
import io.vertx.kotlin.coroutines.await
36
36
import kotlinx.coroutines.runBlocking
37
37
import spp.cli.PlatformCLI
38
- import spp.protocol.SourceMarkerServices
38
+ import spp.protocol.ProtocolMarshaller.deserializeLiveInstrumentRemoved
39
+ import spp.protocol.SourceServices.Provide.toLiveInstrumentSubscriberAddress
39
40
import spp.protocol.extend.TCPServiceFrameParser
40
- import spp.protocol.instrument.LiveInstrumentEvent
41
- import spp.protocol.instrument.LiveInstrumentEventType
42
- import spp.protocol.instrument.breakpoint.event.LiveBreakpointHit
43
- import spp.protocol.instrument.breakpoint.event.LiveBreakpointRemoved
44
- import spp.protocol.instrument.log.event.LiveLogHit
45
- import spp.protocol.instrument.log.event.LiveLogRemoved
41
+ import spp.protocol.instrument.event.LiveBreakpointHit
42
+ import spp.protocol.instrument.event.LiveInstrumentEvent
43
+ import spp.protocol.instrument.event.LiveInstrumentEventType
44
+ import spp.protocol.instrument.event.LiveLogHit
46
45
47
46
class SubscribeEvents : CliktCommand (
48
47
help = " Listens for and outputs live events. Subscribes to all events by default"
@@ -89,7 +88,7 @@ class SubscribeEvents : CliktCommand(
89
88
).await()
90
89
socket!! .handler(FrameParser (TCPServiceFrameParser (vertx, socket)))
91
90
92
- vertx.eventBus().consumer<JsonObject >(" local. " + SourceMarkerServices . Provide . LIVE_INSTRUMENT_SUBSCRIBER ) {
91
+ vertx.eventBus().consumer<JsonObject >(toLiveInstrumentSubscriberAddress( PlatformCLI .developer.id) ) {
93
92
val liveEvent = Json .decodeValue(it.body().toString(), LiveInstrumentEvent ::class .java)
94
93
95
94
// todo: impl filter on platform
@@ -107,15 +106,9 @@ class SubscribeEvents : CliktCommand(
107
106
return @consumer
108
107
}
109
108
}
110
- LiveInstrumentEventType .BREAKPOINT_REMOVED -> {
111
- val breakpointRemoved = Json .decodeValue(liveEvent.data, LiveBreakpointRemoved ::class .java)
112
- if (breakpointRemoved.breakpointId !in instrumentIds) {
113
- return @consumer
114
- }
115
- }
116
- LiveInstrumentEventType .LOG_REMOVED -> {
117
- val logRemoved = Json .decodeValue(liveEvent.data, LiveLogRemoved ::class .java)
118
- if (logRemoved.logId !in instrumentIds) {
109
+ LiveInstrumentEventType .BREAKPOINT_REMOVED , LiveInstrumentEventType .LOG_REMOVED -> {
110
+ val logRemoved = deserializeLiveInstrumentRemoved(JsonObject (liveEvent.data))
111
+ if (logRemoved.liveInstrument.id !in instrumentIds) {
119
112
return @consumer
120
113
}
121
114
}
@@ -164,9 +157,9 @@ class SubscribeEvents : CliktCommand(
164
157
// register listener
165
158
FrameHelper .sendFrame(
166
159
BridgeEventType .REGISTER .name.lowercase(),
167
- SourceMarkerServices . Provide . LIVE_INSTRUMENT_SUBSCRIBER ,
168
- JsonObject (),
169
- socket
160
+ toLiveInstrumentSubscriberAddress( PlatformCLI .developer.id), null ,
161
+ JsonObject (). apply { PlatformCLI .developer.accessToken?. let { put( " auth-token " , it) } } ,
162
+ null , null , socket
170
163
)
171
164
println (" Listening for events..." )
172
165
}
0 commit comments