@@ -19,29 +19,24 @@ package spp.probe.control
19
19
20
20
import io.vertx.core.AbstractVerticle
21
21
import io.vertx.core.eventbus.Message
22
- import io.vertx.core.json.Json
23
22
import io.vertx.core.json.JsonObject
24
23
import io.vertx.ext.bridge.BridgeEventType
25
24
import io.vertx.ext.eventbus.bridge.tcp.impl.protocol.FrameHelper
26
25
import org.apache.skywalking.apm.agent.core.context.util.ThrowableTransformer
27
26
import org.apache.skywalking.apm.agent.core.plugin.WitnessFinder
28
27
import spp.probe.ProbeConfiguration
29
28
import spp.probe.SourceProbe
30
- import spp.protocol.instrument.LiveInstrument
31
- import spp.protocol.instrument.breakpoint.LiveBreakpoint
32
- import spp.protocol.instrument.log.LiveLog
33
- import spp.protocol.instrument.meter.LiveMeter
34
- import spp.protocol.instrument.span.LiveSpan
35
- import spp.protocol.platform.PlatformAddress
36
- import spp.protocol.probe.ProbeAddress
37
- import spp.protocol.probe.command.LiveInstrumentCommand
38
- import spp.protocol.probe.command.LiveInstrumentCommand.CommandType
29
+ import spp.protocol.ProtocolMarshaller
30
+ import spp.protocol.instrument.*
31
+ import spp.protocol.instrument.command.CommandType
32
+ import spp.protocol.instrument.command.LiveInstrumentCommand
33
+ import spp.protocol.platform.ProbeAddress
34
+ import spp.protocol.platform.ProcessorAddress
39
35
import java.lang.instrument.Instrumentation
40
36
import java.lang.reflect.InvocationTargetException
41
37
import java.lang.reflect.Method
42
38
import java.util.*
43
39
import java.util.function.BiConsumer
44
- import kotlin.reflect.KClass
45
40
46
41
class LiveInstrumentRemote : AbstractVerticle () {
47
42
@@ -99,84 +94,66 @@ class LiveInstrumentRemote : AbstractVerticle() {
99
94
e.printStackTrace()
100
95
throw RuntimeException (e)
101
96
}
102
- vertx.eventBus()
103
- .localConsumer<JsonObject >(" local." + ProbeAddress .LIVE_BREAKPOINT_REMOTE .address + " :" + SourceProbe .PROBE_ID )
104
- .handler { handleInstrumentationRequest(LiveBreakpoint ::class , it) }
105
- vertx.eventBus()
106
- .localConsumer<JsonObject >(" local." + ProbeAddress .LIVE_LOG_REMOTE .address + " :" + SourceProbe .PROBE_ID )
107
- .handler { handleInstrumentationRequest(LiveLog ::class , it) }
108
- vertx.eventBus()
109
- .localConsumer<JsonObject >(" local." + ProbeAddress .LIVE_METER_REMOTE .address + " :" + SourceProbe .PROBE_ID )
110
- .handler { handleInstrumentationRequest(LiveMeter ::class , it) }
111
- vertx.eventBus()
112
- .localConsumer<JsonObject >(" local." + ProbeAddress .LIVE_SPAN_REMOTE .address + " :" + SourceProbe .PROBE_ID )
113
- .handler { handleInstrumentationRequest(LiveSpan ::class , it) }
97
+
98
+ vertx.eventBus() // global instrument remote
99
+ .localConsumer<JsonObject >(ProbeAddress .LIVE_INSTRUMENT_REMOTE )
100
+ .handler { handleInstrumentationRequest(it) }
101
+ vertx.eventBus() // probe specific instrument remote
102
+ .localConsumer<JsonObject >(ProbeAddress .LIVE_INSTRUMENT_REMOTE + " :" + SourceProbe .PROBE_ID )
103
+ .handler { handleInstrumentationRequest(it) }
114
104
}
115
105
116
- private fun handleInstrumentationRequest (clazz : KClass < out LiveInstrument >, it : Message <JsonObject >) {
106
+ private fun handleInstrumentationRequest (it : Message <JsonObject >) {
117
107
try {
118
- val command = Json .decodeValue (it.body().toString(), LiveInstrumentCommand :: class .java )
108
+ val command = ProtocolMarshaller .deserializeLiveInstrumentCommand (it.body())
119
109
when (command.commandType) {
120
- CommandType .ADD_LIVE_INSTRUMENT -> addInstrument(clazz, command)
110
+ CommandType .ADD_LIVE_INSTRUMENT -> addInstrument(command)
121
111
CommandType .REMOVE_LIVE_INSTRUMENT -> removeInstrument(command)
122
112
}
123
113
} catch (ex: InvocationTargetException ) {
124
114
if (ex.cause != null ) {
125
- publishCommandError(it, ex.cause!! , clazz )
115
+ publishCommandError(it, ex.cause!! )
126
116
} else {
127
- publishCommandError(it, ex.targetException, clazz )
117
+ publishCommandError(it, ex.targetException)
128
118
}
129
119
} catch (ex: Throwable ) {
130
- publishCommandError(it, ex, clazz )
120
+ publishCommandError(it, ex)
131
121
}
132
122
}
133
123
134
- private fun publishCommandError (it : Message <JsonObject >, ex : Throwable , clazz : KClass < out LiveInstrument > ) {
124
+ private fun publishCommandError (it : Message <JsonObject >, ex : Throwable ) {
135
125
val map: MutableMap <String , Any > = HashMap ()
136
126
map[" command" ] = it.body().toString()
137
127
map[" occurredAt" ] = System .currentTimeMillis()
138
128
map[" cause" ] = ThrowableTransformer .INSTANCE .convert2String(ex, 4000 )
139
129
140
- val address = when (clazz) {
141
- LiveBreakpoint ::class -> PlatformAddress .LIVE_BREAKPOINT_REMOVED .address
142
- LiveLog ::class -> PlatformAddress .LIVE_LOG_REMOVED .address
143
- LiveMeter ::class -> PlatformAddress .LIVE_METER_REMOVED .address
144
- LiveSpan ::class -> PlatformAddress .LIVE_SPAN_REMOVED .address
145
- else -> throw IllegalArgumentException (" Unknown instrument: $clazz " )
146
- }
147
130
FrameHelper .sendFrame(
148
- BridgeEventType .PUBLISH .name.lowercase(), address, JsonObject .mapFrom(map), SourceProbe .tcpSocket
131
+ BridgeEventType .PUBLISH .name.lowercase(), ProcessorAddress .LIVE_INSTRUMENT_REMOVED ,
132
+ JsonObject .mapFrom(map), SourceProbe .tcpSocket
149
133
)
150
134
}
151
135
152
- private fun addInstrument (clazz : KClass < out LiveInstrument >, command : LiveInstrumentCommand ) {
136
+ private fun addInstrument (command : LiveInstrumentCommand ) {
153
137
if (ProbeConfiguration .isNotQuite) println (" Adding instrument: $command " )
154
- val instrumentData = command.context.liveInstruments[0 ]
155
- applyInstrument!! .invoke(null , Json .decodeValue(instrumentData, clazz.java))
138
+ applyInstrument!! .invoke(null , command.instruments.first()) // todo: check for multiple
156
139
}
157
140
158
141
private fun removeInstrument (command : LiveInstrumentCommand ) {
159
- for (breakpointData in command.context.liveInstruments) {
160
- val breakpointObject = JsonObject (breakpointData)
161
- val breakpointId = breakpointObject.getString(" id" )
162
- val location = breakpointObject.getJsonObject(" location" )
163
- val source = location.getString(" source" )
164
- val line = location.getInteger(" line" )
165
- removeInstrument!! .invoke(null , source, line, breakpointId)
142
+ for (breakpoint in command.instruments) {
143
+ val breakpointId = breakpoint.id
144
+ val location = breakpoint.location
145
+ removeInstrument!! .invoke(null , location.source, location.line, breakpointId)
166
146
}
167
- for (locationData in command.context.locations) {
168
- val location = JsonObject (locationData)
169
- val source = location.getString(" source" )
170
- val line = location.getInteger(" line" )
171
- removeInstrument!! .invoke(null , source, line, null )
147
+ for (location in command.locations) {
148
+ removeInstrument!! .invoke(null , location.source, location.line, null )
172
149
}
173
150
}
174
151
175
152
companion object {
176
153
private val EVENT_CONSUMER = BiConsumer (fun (address : String? , json : String? ) {
177
154
if (ProbeConfiguration .isNotQuite) println (" Publishing event: $address , $json " )
178
155
FrameHelper .sendFrame(
179
- BridgeEventType .PUBLISH .name.lowercase(Locale .getDefault() ),
156
+ BridgeEventType .PUBLISH .name.lowercase(),
180
157
address,
181
158
JsonObject (json),
182
159
SourceProbe .tcpSocket
0 commit comments