Skip to content

Commit 397ce0f

Browse files
committed
feat: add TTFS state metrics\n\nCloses #190
1 parent bd24903 commit 397ce0f

File tree

14 files changed

+270
-96
lines changed

14 files changed

+270
-96
lines changed

docs/docs/plugins/metrics.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,13 @@ Each snapshot contains:
2424
inter-arrival times, bursts, buffer occupancy, plugin overhead.
2525
- **Actions** – sent/delivered/undelivered counts, ops/sec, delivery latency quantiles, queue time, buffer metrics,
2626
plugin overhead.
27-
- **State** – transition counts, vetoed transitions, reducer latency quantiles, throughput.
27+
- **State** – transition counts, vetoed transitions, started-in-initial-state, time-to-first state, reducer latency quantiles, throughput.
2828
- **Subscriptions** – subscribe/unsubscribe events, current/peak subscribers, average/median lifetimes, sampled counts.
2929
- **Lifecycle** – start/stop counters, total uptime, current/average/median lifetimes, bootstrap latency.
3030
- **Exceptions** – total/handled counts, recovery latency (average/median).
3131
- **Meta** – schema version, window length, EMA alpha, generated-at timestamp, start time, store name/id, run id.
3232

33-
Total: 66+ numeric metrics per snapshot.
33+
Total: 67+ numeric metrics per snapshot.
3434

3535
## Usage guide
3636

metrics/src/commonMain/kotlin/pro/respawn/flowmvi/metrics/MetricsCollector.kt

Lines changed: 31 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,9 @@ internal class MetricsCollector<S : MVIState, I : MVIIntent, A : MVIAction>(
112112
private val stateDurations = P2QuantileEstimator(Q50.value, Q90.value, Q95.value, Q99.value)
113113
private val stateTransitions = atomic(0L)
114114
private val stateVetoed = atomic(0L)
115+
private val startedInInitialState = atomic(false)
116+
private val timeToFirstStateStart = atomic<TimeMark?>(null)
117+
private val timeToFirstState = atomic<Duration?>(null)
115118

116119
private val subscribersMedian = P2QuantileEstimator(Q50.value)
117120
private val lifetimeMedian = P2QuantileEstimator(Q50.value)
@@ -176,6 +179,10 @@ internal class MetricsCollector<S : MVIState, I : MVIIntent, A : MVIAction>(
176179

177180
private suspend fun PipelineContext<S, I, A>.recordStart(child: StorePlugin<S, I, A>) {
178181
lastConfig.value = config
182+
val shouldTrackTtfs = states.value == config.initial
183+
startedInInitialState.value = shouldTrackTtfs
184+
timeToFirstState.value = null
185+
timeToFirstStateStart.value = if (shouldTrackTtfs) timeSource.markNow() else null
179186
firstStartAt.compareAndSet(null, clock.now())
180187
startCount.incrementAndGet()
181188
val startedAt = timeSource.markNow()
@@ -186,6 +193,7 @@ internal class MetricsCollector<S : MVIState, I : MVIIntent, A : MVIAction>(
186193

187194
private fun ShutdownContext<S, I, A>.recordStop(child: StorePlugin<S, I, A>, e: Exception?) {
188195
stopCount.incrementAndGet()
196+
timeToFirstStateStart.value = null
189197
currentStart.getAndSet(null)?.let { startedAt ->
190198
val runDurationMillis = startedAt.elapsedNow()
191199
uptimeTotalMillis.addAndGet(runDurationMillis.inWholeMilliseconds)
@@ -295,6 +303,7 @@ internal class MetricsCollector<S : MVIState, I : MVIIntent, A : MVIAction>(
295303
stateTransitions.incrementAndGet()
296304
val vetoed = result == null || result === old
297305
if (vetoed) stateVetoed.incrementAndGet()
306+
recordTimeToFirstState(result, config.initial)
298307
send(Event.StateProcessed(duration))
299308
return result
300309
}
@@ -325,16 +334,15 @@ internal class MetricsCollector<S : MVIState, I : MVIIntent, A : MVIAction>(
325334
}
326335

327336
private fun send(event: Event) {
328-
currentRun.value?.channel.let {
329-
// todo: cleanup to self-contained class
330-
if (it == null && currentlyDebuggable) throw UnrecoverableException(
331-
message = "Metrics were attempted to be sent outside the store lifecycle, " +
332-
"which should be impossible. If you detected this, please report to the maintainers. " +
333-
"This will not crash with debuggable=false"
334-
)
335-
// on release, just drop, don't crash
336-
it?.trySend(event)
337-
}
337+
val channel = currentRun.value?.channel
338+
// todo: cleanup to self-contained class
339+
if (channel == null && currentlyDebuggable) throw UnrecoverableException(
340+
message = "Metrics were attempted to be sent outside the store lifecycle, " +
341+
"which should be impossible. If you detected this, please report to the maintainers. " +
342+
"This will not crash with debuggable=false"
343+
)
344+
// on release, just drop, don't crash
345+
channel?.trySend(event)
338346
}
339347

340348
private fun updateIntentOccupancy() {
@@ -489,6 +497,8 @@ internal class MetricsCollector<S : MVIState, I : MVIIntent, A : MVIAction>(
489497
state = StateMetrics(
490498
transitions = stateTransitions.value,
491499
transitionsVetoed = stateVetoed.value,
500+
startedInInitialState = startedInInitialState.value,
501+
timeToFirstState = timeToFirstState.value,
492502
updateAvg = statePerf.averageTimeMillis.toDurationOrZero(),
493503
updateP50 = stateDurations.getQuantile(Q50.value).toDurationOrZero(),
494504
updateP90 = stateDurations.getQuantile(Q90.value).toDurationOrZero(),
@@ -547,6 +557,9 @@ internal class MetricsCollector<S : MVIState, I : MVIIntent, A : MVIAction>(
547557
actionInFlight.value = 0
548558
stateTransitions.value = 0
549559
stateVetoed.value = 0
560+
startedInInitialState.value = false
561+
timeToFirstStateStart.value = null
562+
timeToFirstState.value = null
550563
subscribersState.value = SubscribersState(
551564
events = 0,
552565
current = 0,
@@ -596,6 +609,14 @@ internal class MetricsCollector<S : MVIState, I : MVIIntent, A : MVIAction>(
596609
}
597610

598611
private val currentlyDebuggable: Boolean get() = lastConfig.value?.debuggable == true
612+
613+
private fun recordTimeToFirstState(result: S?, initial: S) {
614+
if (result == null || result == initial) return
615+
val start = timeToFirstStateStart.value ?: return
616+
if (timeToFirstState.compareAndSet(null, start.elapsedNow())) {
617+
timeToFirstStateStart.value = null
618+
}
619+
}
599620
}
600621

601622
private data class SubscribersState(

metrics/src/commonMain/kotlin/pro/respawn/flowmvi/metrics/api/MetricSurface.kt

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,15 @@ public sealed interface MetricSurface {
1616
public companion object {
1717

1818
public val V1: MetricSurface = V1Surface
19+
public val V1_1: MetricSurface = V11Surface
1920

2021
/**
2122
* Resolve a surface instance for the requested [version], falling back to the latest supported one.
2223
*/
2324
public fun fromVersion(version: MetricsSchemaVersion): MetricSurface = when (version) {
2425
MetricsSchemaVersion.V1_0 -> V1
25-
else -> V1 // fallback until newer surfaces are introduced
26+
MetricsSchemaVersion.V1_1 -> V1_1
27+
else -> V1_1 // fallback until newer surfaces are introduced
2628
}
2729
}
2830
}
@@ -31,3 +33,8 @@ private data object V1Surface : MetricSurface {
3133

3234
override val version: MetricsSchemaVersion = MetricsSchemaVersion.V1_0
3335
}
36+
37+
private data object V11Surface : MetricSurface {
38+
39+
override val version: MetricsSchemaVersion = MetricsSchemaVersion.V1_1
40+
}

metrics/src/commonMain/kotlin/pro/respawn/flowmvi/metrics/api/MetricsSchemaVersion.kt

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,10 @@ public data class MetricsSchemaVersion(
3636
/** Initial released schema. */
3737
public val V1_0: MetricsSchemaVersion = MetricsSchemaVersion(1, 0)
3838

39+
/** Adds time-to-first-state metric. */
40+
public val V1_1: MetricsSchemaVersion = MetricsSchemaVersion(1, 1)
41+
3942
/** The version used by newly produced snapshots. */
40-
public val CURRENT: MetricsSchemaVersion = V1_0
43+
public val CURRENT: MetricsSchemaVersion = V1_1
4144
}
4245
}

metrics/src/commonMain/kotlin/pro/respawn/flowmvi/metrics/api/MetricsSnapshot.kt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,10 @@ public data class StateMetrics(
140140
val transitions: Long,
141141
/** State transitions vetoed or rolled back. */
142142
val transitionsVetoed: Long,
143+
/** Whether the current run started in the configured initial state. */
144+
val startedInInitialState: Boolean,
145+
/** Time to first non-initial state transition in the current run (if applicable). */
146+
val timeToFirstState: Duration?,
143147
/** Average reducer or state update duration. */
144148
val updateAvg: Duration,
145149
/** Median reducer or state update duration. */

metrics/src/commonMain/kotlin/pro/respawn/flowmvi/metrics/api/MetricsSnapshotExtensions.kt

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,5 +5,18 @@ package pro.respawn.flowmvi.metrics.api
55
*
66
* Currently no-op aside from updating [Meta.schemaVersion]; future versions can drop/rename fields here.
77
*/
8-
public fun MetricsSnapshot.downgradeTo(target: MetricsSchemaVersion): MetricsSnapshot =
9-
if (target == meta.schemaVersion) this else copy(meta = meta.copy(schemaVersion = target))
8+
public fun MetricsSnapshot.downgradeTo(target: MetricsSchemaVersion): MetricsSnapshot {
9+
if (target == meta.schemaVersion) return this
10+
val downgradedState = if (target < MetricsSchemaVersion.V1_1) {
11+
state.copy(
12+
startedInInitialState = false,
13+
timeToFirstState = null
14+
)
15+
} else {
16+
state
17+
}
18+
return copy(
19+
meta = meta.copy(schemaVersion = target),
20+
state = downgradedState
21+
)
22+
}

metrics/src/commonMain/kotlin/pro/respawn/flowmvi/metrics/openmetrics/OpenMetricsSink.kt

Lines changed: 76 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -377,46 +377,79 @@ private fun actionMetrics(snapshot: MetricsSnapshot, base: Map<String, String>,
377377

378378
private fun stateMetrics(snapshot: MetricsSnapshot, base: Map<String, String>, timestampMillis: Long?): List<Metric> {
379379
val state = snapshot.state
380-
return listOf(
381-
counter(
382-
name = "state_transitions_total",
383-
help = "State transitions",
384-
value = state.transitions,
385-
base = base,
386-
timestampMillis = timestampMillis
387-
),
388-
counter(
389-
name = "state_transitions_vetoed_total",
390-
help = "Vetoed state transitions",
391-
value = state.transitionsVetoed,
392-
base = base,
393-
timestampMillis = timestampMillis
394-
),
395-
gauge(
396-
name = "state_update_seconds_avg",
397-
help = "Average state update",
398-
value = state.updateAvg.seconds(),
399-
base = base,
400-
timestampMillis = timestampMillis,
401-
unit = SECONDS_UNIT
402-
),
403-
quantileGauge(
404-
name = "state_update_seconds",
405-
help = "State update quantiles",
406-
quantiles = STATE_QUANTILES,
407-
source = state,
408-
base = base,
409-
timestampMillis = timestampMillis,
410-
unit = SECONDS_UNIT
411-
),
412-
gauge(
413-
name = "state_ops_per_second",
414-
help = "State transition throughput",
415-
value = state.opsPerSecond,
416-
base = base,
417-
timestampMillis = timestampMillis
418-
),
419-
)
380+
return buildList {
381+
add(
382+
counter(
383+
name = "state_transitions_total",
384+
help = "State transitions",
385+
value = state.transitions,
386+
base = base,
387+
timestampMillis = timestampMillis
388+
)
389+
)
390+
add(
391+
counter(
392+
name = "state_transitions_vetoed_total",
393+
help = "Vetoed state transitions",
394+
value = state.transitionsVetoed,
395+
base = base,
396+
timestampMillis = timestampMillis
397+
)
398+
)
399+
if (snapshot.meta.schemaVersion >= MetricsSchemaVersion.V1_1) {
400+
add(
401+
gauge(
402+
name = "state_started_in_initial_state",
403+
help = "Whether run started in initial state",
404+
value = if (state.startedInInitialState) 1.0 else 0.0,
405+
base = base,
406+
timestampMillis = timestampMillis
407+
)
408+
)
409+
val _ = state.timeToFirstState?.let { duration ->
410+
add(
411+
gauge(
412+
name = "state_time_to_first_state_seconds",
413+
help = "Time to first non-initial state",
414+
value = duration.seconds(),
415+
base = base,
416+
timestampMillis = timestampMillis,
417+
unit = SECONDS_UNIT
418+
)
419+
)
420+
}
421+
}
422+
add(
423+
gauge(
424+
name = "state_update_seconds_avg",
425+
help = "Average state update",
426+
value = state.updateAvg.seconds(),
427+
base = base,
428+
timestampMillis = timestampMillis,
429+
unit = SECONDS_UNIT
430+
)
431+
)
432+
add(
433+
quantileGauge(
434+
name = "state_update_seconds",
435+
help = "State update quantiles",
436+
quantiles = STATE_QUANTILES,
437+
source = state,
438+
base = base,
439+
timestampMillis = timestampMillis,
440+
unit = SECONDS_UNIT
441+
)
442+
)
443+
add(
444+
gauge(
445+
name = "state_ops_per_second",
446+
help = "State transition throughput",
447+
value = state.opsPerSecond,
448+
base = base,
449+
timestampMillis = timestampMillis
450+
)
451+
)
452+
}
420453
}
421454

422455
private fun subscriptionMetrics(
@@ -641,9 +674,9 @@ private fun <T> quantileGauge(
641674

642675
private fun baseLabels(snapshot: MetricsSnapshot): Map<String, String> = buildMap {
643676
put("schema_version", snapshot.meta.schemaVersion.value)
644-
snapshot.meta.storeName?.let { put("store", it) }
645-
snapshot.meta.storeId?.let { put("store_id", it.toString()) }
646-
snapshot.meta.runId?.let { put("run_id", it) }
677+
val _ = snapshot.meta.storeName?.let { put("store", it) }
678+
val _ = snapshot.meta.storeId?.let { put("store_id", it.toString()) }
679+
val _ = snapshot.meta.runId?.let { put("run_id", it) }
647680
}
648681

649682
private fun Duration.seconds(): Double = when {

0 commit comments

Comments
 (0)