@@ -22,6 +22,7 @@ import io.vertx.kotlin.coroutines.CoroutineVerticle
22
22
import io.vertx.kotlin.coroutines.await
23
23
import io.vertx.kotlin.coroutines.dispatcher
24
24
import kotlinx.coroutines.launch
25
+ import org.slf4j.LoggerFactory
25
26
import spp.jetbrains.monitor.skywalking.SkywalkingClient
26
27
import spp.jetbrains.monitor.skywalking.SkywalkingClient.DurationStep
27
28
import spp.protocol.platform.general.Service
@@ -43,63 +44,58 @@ class ServiceBridge(
43
44
var activeServices: List <Service > = emptyList()
44
45
45
46
override suspend fun start () {
46
- vertx.setPeriodic(5000 ) { timerId ->
47
- launch(vertx.dispatcher()) {
48
- activeServices = skywalkingClient.run {
49
- getServices(getDuration(ZonedDateTime .now().minusMinutes(15 ), DurationStep .MINUTE ))
50
- }
51
-
52
- if (activeServices.isNotEmpty()) {
53
- vertx.cancelTimer(timerId)
54
- vertx.eventBus().publish(activeServicesUpdatedAddress, activeServices)
55
-
56
- if (initServiceName != null ) {
57
- currentService = activeServices.find { it.name == initServiceName }
47
+ if (! setCurrentService()) {
48
+ // periodically check for current service
49
+ log.info(" No current service set, starting periodic check for current service" )
50
+ vertx.setPeriodic(5000 ) { timerId ->
51
+ launch(vertx.dispatcher()) {
52
+ if (setCurrentService()) {
53
+ vertx.cancelTimer(timerId)
58
54
}
59
- if (currentService == null ) {
60
- currentService = activeServices[0 ]
61
- }
62
- vertx.eventBus().publish(currentServiceUpdatedAddress, currentService)
63
55
}
64
56
}
65
57
}
66
58
67
59
// async accessors
68
60
vertx.eventBus().localConsumer<Boolean >(getCurrentServiceAddress) { msg ->
69
- if (msg.body() && currentService == null ) {
70
- val consumer = currentServiceConsumer(vertx)
71
- if (currentService != null ) {
72
- consumer.unregister()
73
- msg.reply(currentService)
74
- } else {
75
- consumer.handler {
76
- msg.reply(it.body())
77
- consumer.unregister()
78
- }
79
- }
80
- } else {
81
- msg.reply(currentService)
61
+ if (currentService == null ) {
62
+ log.warn(" No current service set" )
82
63
}
64
+ msg.reply(currentService)
83
65
}
84
66
vertx.eventBus().localConsumer<Boolean >(getActiveServicesAddress) { msg ->
85
- if (msg.body() && activeServices.isEmpty()) {
86
- val consumer = activeServicesConsumer(vertx)
87
- if (activeServices.isNotEmpty()) {
88
- consumer.unregister()
89
- msg.reply(activeServices)
90
- } else {
91
- consumer.handler {
92
- msg.reply(it.body())
93
- consumer.unregister()
94
- }
95
- }
96
- } else {
97
- msg.reply(activeServices)
67
+ if (activeServices.isEmpty()) {
68
+ log.warn(" No active services set" )
98
69
}
70
+ msg.reply(activeServices)
99
71
}
100
72
}
101
73
74
+ private suspend fun setCurrentService (): Boolean {
75
+ activeServices = skywalkingClient.run {
76
+ getServices(getDuration(ZonedDateTime .now().minusMinutes(15 ), DurationStep .MINUTE ))
77
+ }
78
+
79
+ if (activeServices.isNotEmpty()) {
80
+ vertx.eventBus().publish(activeServicesUpdatedAddress, activeServices)
81
+
82
+ if (initServiceName != null ) {
83
+ currentService = activeServices.find { it.name == initServiceName }
84
+ currentService?.let { log.info(" Current service set to: {}" , it.name) }
85
+ }
86
+ if (currentService == null ) {
87
+ currentService = activeServices[0 ]
88
+ log.info(" Current service set to: {}" , currentService!! .name)
89
+ }
90
+ vertx.eventBus().publish(currentServiceUpdatedAddress, currentService)
91
+ return true
92
+ }
93
+ return false
94
+ }
95
+
102
96
companion object {
97
+ private val log = LoggerFactory .getLogger(ServiceBridge ::class .java)
98
+
103
99
private const val rootAddress = " monitor.skywalking.service"
104
100
private const val getCurrentServiceAddress = " $rootAddress .currentService"
105
101
private const val getActiveServicesAddress = " $rootAddress .activeServices"
@@ -114,7 +110,7 @@ class ServiceBridge(
114
110
return vertx.eventBus().localConsumer(activeServicesUpdatedAddress)
115
111
}
116
112
117
- suspend fun getCurrentService (vertx : Vertx ): Service {
113
+ suspend fun getCurrentService (vertx : Vertx ): Service ? {
118
114
return vertx.eventBus()
119
115
.request<Service >(getCurrentServiceAddress, true ).await().body()
120
116
}
0 commit comments