Skip to content

Commit 7e63969

Browse files
Include publish latency and avg. sub count per channel
1 parent 70bb893 commit 7e63969

File tree

1 file changed

+132
-39
lines changed

1 file changed

+132
-39
lines changed

subscriber.go

Lines changed: 132 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ type testResult struct {
5858
Addresses []string `json:"Addresses"`
5959
}
6060

61-
func publisherRoutine(clientName string, channels []string, mode string, measureRTT bool, verbose bool, dataSize int, ctx context.Context, wg *sync.WaitGroup, client *redis.Client, useLimiter bool, rateLimiter *rate.Limiter) {
61+
func publisherRoutine(clientName string, channels []string, mode string, measureRTT bool, verbose bool, dataSize int, ctx context.Context, wg *sync.WaitGroup, client *redis.Client, useLimiter bool, rateLimiter *rate.Limiter, publishLatencyChannel chan int64, subscriberCountChannel chan int64) {
6262
defer wg.Done()
6363

6464
if verbose {
@@ -112,15 +112,28 @@ func publisherRoutine(clientName string, channels []string, mode string, measure
112112
} else {
113113
msg = paddingPayload
114114
}
115+
116+
// Measure publish latency
117+
startPublish := time.Now().UnixNano()
118+
var subscriberCount int64
115119
var err error
116120
switch mode {
117121
case "spublish":
118-
err = client.SPublish(ctx, ch, msg).Err()
122+
subscriberCount, err = client.SPublish(ctx, ch, msg).Result()
119123
default:
120-
err = client.Publish(ctx, ch, msg).Err()
124+
subscriberCount, err = client.Publish(ctx, ch, msg).Result()
121125
}
126+
publishLatency := time.Now().UnixNano() - startPublish
127+
122128
if err != nil {
123129
log.Printf("Error publishing to channel %s: %v", ch, err)
130+
} else {
131+
// Send metrics to channels
132+
publishLatencyChannel <- publishLatency
133+
subscriberCountChannel <- subscriberCount
134+
if verbose {
135+
log.Printf("Published to %s: %d subscribers, latency: %d ns", ch, subscriberCount, publishLatency)
136+
}
124137
}
125138
atomic.AddUint64(&totalMessages, 1)
126139
}
@@ -414,7 +427,9 @@ func main() {
414427
}
415428
pprof.StartCPUProfile(f)
416429
}
417-
rttLatencyChannel := make(chan int64, 100000) // Channel for RTT measurements. buffer of 100K messages to process
430+
rttLatencyChannel := make(chan int64, 1000000) // Channel for RTT measurements. buffer of 1M messages to process
431+
publishLatencyChannel := make(chan int64, 1000000) // Channel for publish latency measurements
432+
subscriberCountChannel := make(chan int64, 1000000) // Channel for subscriber count tracking
418433
totalCreatedClients := 0
419434
if strings.Contains(*mode, "publish") {
420435
var requestRate = Inf
@@ -472,7 +487,7 @@ func main() {
472487
}
473488

474489
wg.Add(1)
475-
go publisherRoutine(publisherName, channels, *mode, *measureRTT, *verbose, *dataSize, ctx, &wg, client, useRateLimiter, rateLimiter)
490+
go publisherRoutine(publisherName, channels, *mode, *measureRTT, *verbose, *dataSize, ctx, &wg, client, useRateLimiter, rateLimiter, publishLatencyChannel, subscriberCountChannel)
476491
atomic.AddInt64(&totalPublishers, 1)
477492
atomic.AddUint64(&totalConnects, 1)
478493
}
@@ -548,7 +563,7 @@ func main() {
548563
w := new(tabwriter.Writer)
549564

550565
tick := time.NewTicker(time.Duration(*client_update_tick) * time.Second)
551-
closed, start_time, duration, totalMessages, messageRateTs, rttValues := updateCLI(tick, c, total_messages, w, *test_time, *measureRTT, *mode, rttLatencyChannel, *verbose)
566+
closed, start_time, duration, totalMessages, messageRateTs, rttValues, publishLatencyValues, subscriberCountValues := updateCLI(tick, c, total_messages, w, *test_time, *measureRTT, *mode, rttLatencyChannel, publishLatencyChannel, subscriberCountChannel, *verbose)
552567
messageRate := float64(totalMessages) / float64(duration.Seconds())
553568

554569
if *cpuprofile != "" {
@@ -558,22 +573,60 @@ func main() {
558573
fmt.Fprintf(w, "Mode: %s\n", *mode)
559574
fmt.Fprintf(w, "Total Duration: %f Seconds\n", duration.Seconds())
560575
fmt.Fprintf(w, "Message Rate: %f msg/sec\n", messageRate)
561-
if *measureRTT && (*mode != "publish" && *mode != "spublish") {
562-
hist := hdrhistogram.New(1, 10_000_000, 3) // 1us to 10s, 3 sig digits
563-
for _, rtt := range rttValues {
564-
_ = hist.RecordValue(rtt)
576+
577+
if strings.Contains(*mode, "publish") {
578+
// Publisher mode: show publish latency and subscriber count stats
579+
if len(publishLatencyValues) > 0 {
580+
hist := hdrhistogram.New(1, 10_000_000, 3) // 1ns to 10s, 3 sig digits
581+
for _, latency := range publishLatencyValues {
582+
_ = hist.RecordValue(latency)
583+
}
584+
avg := hist.Mean()
585+
p50 := hist.ValueAtQuantile(50.0)
586+
p95 := hist.ValueAtQuantile(95.0)
587+
p99 := hist.ValueAtQuantile(99.0)
588+
p999 := hist.ValueAtQuantile(99.9)
589+
fmt.Fprintf(w, "Avg Publish Latency %.3f ms\n", avg/1000000.0)
590+
fmt.Fprintf(w, "P50 Publish Latency %.3f ms\n", float64(p50)/1000000.0)
591+
fmt.Fprintf(w, "P95 Publish Latency %.3f ms\n", float64(p95)/1000000.0)
592+
fmt.Fprintf(w, "P99 Publish Latency %.3f ms\n", float64(p99)/1000000.0)
593+
fmt.Fprintf(w, "P999 Publish Latency %.3f ms\n", float64(p999)/1000000.0)
594+
}
595+
596+
if len(subscriberCountValues) > 0 {
597+
hist := hdrhistogram.New(0, 1_000_000, 3) // 0 to 1M subscribers, 3 sig digits
598+
for _, count := range subscriberCountValues {
599+
_ = hist.RecordValue(count)
600+
}
601+
avg := hist.Mean()
602+
p50 := hist.ValueAtQuantile(50.0)
603+
p95 := hist.ValueAtQuantile(95.0)
604+
p99 := hist.ValueAtQuantile(99.0)
605+
p999 := hist.ValueAtQuantile(99.9)
606+
fmt.Fprintf(w, "Avg Subscribers %.1f (per-node in cluster mode)\n", avg)
607+
fmt.Fprintf(w, "P50 Subscribers %d\n", p50)
608+
fmt.Fprintf(w, "P95 Subscribers %d\n", p95)
609+
fmt.Fprintf(w, "P99 Subscribers %d\n", p99)
610+
fmt.Fprintf(w, "P999 Subscribers %d\n", p999)
611+
}
612+
} else if *measureRTT {
613+
// Subscriber mode with RTT measurement
614+
if len(rttValues) > 0 {
615+
hist := hdrhistogram.New(1, 10_000_000, 3) // 1ns to 10s, 3 sig digits
616+
for _, rtt := range rttValues {
617+
_ = hist.RecordValue(rtt)
618+
}
619+
avg := hist.Mean()
620+
p50 := hist.ValueAtQuantile(50.0)
621+
p95 := hist.ValueAtQuantile(95.0)
622+
p99 := hist.ValueAtQuantile(99.0)
623+
p999 := hist.ValueAtQuantile(99.9)
624+
fmt.Fprintf(w, "Avg RTT %.3f ms\n", avg/1000000.0)
625+
fmt.Fprintf(w, "P50 RTT %.3f ms\n", float64(p50)/1000000.0)
626+
fmt.Fprintf(w, "P95 RTT %.3f ms\n", float64(p95)/1000000.0)
627+
fmt.Fprintf(w, "P99 RTT %.3f ms\n", float64(p99)/1000000.0)
628+
fmt.Fprintf(w, "P999 RTT %.3f ms\n", float64(p999)/1000000.0)
565629
}
566-
avg := hist.Mean()
567-
p50 := hist.ValueAtQuantile(50.0)
568-
p95 := hist.ValueAtQuantile(95.0)
569-
p99 := hist.ValueAtQuantile(99.0)
570-
p999 := hist.ValueAtQuantile(99.9)
571-
fmt.Fprintf(w, "Avg RTT %.3f ms\n", avg/1000000.0)
572-
fmt.Fprintf(w, "P50 RTT %.3f ms\n", float64(p50)/1000000.0)
573-
fmt.Fprintf(w, "P95 RTT %.3f ms\n", float64(p95)/1000000.0)
574-
fmt.Fprintf(w, "P99 RTT %.3f ms\n", float64(p99)/1000000.0)
575-
fmt.Fprintf(w, "P999 RTT %.3f ms\n", float64(p999)/1000000.0)
576-
} else {
577630
}
578631
fmt.Fprintf(w, "#################################################\n")
579632
fmt.Fprint(w, "\r\n")
@@ -656,8 +709,10 @@ func updateCLI(
656709
measureRTT bool,
657710
mode string,
658711
rttLatencyChannel chan int64,
712+
publishLatencyChannel chan int64,
713+
subscriberCountChannel chan int64,
659714
verbose bool,
660-
) (bool, time.Time, time.Duration, uint64, []float64, []int64) {
715+
) (bool, time.Time, time.Duration, uint64, []float64, []int64, []int64, []int64) {
661716

662717
start := time.Now()
663718
prevTime := time.Now()
@@ -666,27 +721,28 @@ func updateCLI(
666721
messageRateTs := []float64{}
667722
tickRttValues := []int64{}
668723
rttValues := []int64{}
724+
tickPublishLatencyValues := []int64{}
725+
publishLatencyValues := []int64{}
726+
tickSubscriberCountValues := []int64{}
727+
subscriberCountValues := []int64{}
669728

670729
w.Init(os.Stdout, 25, 0, 1, ' ', tabwriter.AlignRight)
671730

672731
// Header
673-
if measureRTT {
674-
fmt.Fprint(w, "Test Time\tTotal Messages\t Message Rate \tConnect Rate \t")
732+
fmt.Fprint(w, "Test Time\tTotal Messages\t Message Rate \tConnect Rate \t")
675733

676-
if strings.Contains(mode, "subscribe") {
677-
fmt.Fprint(w, "Active subscriptions\t")
678-
} else {
679-
fmt.Fprint(w, "Active publishers\t")
734+
if strings.Contains(mode, "subscribe") {
735+
fmt.Fprint(w, "Active subscriptions\t")
736+
if measureRTT {
737+
fmt.Fprint(w, "Avg RTT (ms)\t")
680738
}
681-
fmt.Fprint(w, "Avg RTT (ms)\t\n")
682739
} else {
683-
fmt.Fprint(w, "Test Time\tTotal Messages\t Message Rate \tConnect Rate \t")
684-
if strings.Contains(mode, "subscribe") {
685-
fmt.Fprint(w, "Active subscriptions\t\n")
686-
} else {
687-
fmt.Fprint(w, "Active publishers\t\n")
688-
}
740+
// Publisher mode
741+
fmt.Fprint(w, "Active publishers\t")
742+
fmt.Fprint(w, "Pub Latency (ms)\t")
743+
fmt.Fprint(w, "Avg Subs per channel per node\t")
689744
}
745+
fmt.Fprint(w, "\n")
690746
w.Flush()
691747

692748
// Main loop
@@ -696,6 +752,14 @@ func updateCLI(
696752
rttValues = append(rttValues, rtt)
697753
tickRttValues = append(tickRttValues, rtt)
698754

755+
case publishLatency := <-publishLatencyChannel:
756+
publishLatencyValues = append(publishLatencyValues, publishLatency)
757+
tickPublishLatencyValues = append(tickPublishLatencyValues, publishLatency)
758+
759+
case subscriberCount := <-subscriberCountChannel:
760+
subscriberCountValues = append(subscriberCountValues, subscriberCount)
761+
tickSubscriberCountValues = append(tickSubscriberCountValues, subscriberCount)
762+
699763
case <-tick.C:
700764
now := time.Now()
701765
took := now.Sub(prevTime)
@@ -725,7 +789,7 @@ func updateCLI(
725789
if verbose {
726790
fmt.Printf("[DEBUG] Test time reached! Stopping after %.2f seconds\n", elapsed.Seconds())
727791
}
728-
return true, start, time.Since(start), totalMessages, messageRateTs, rttValues
792+
return true, start, time.Since(start), totalMessages, messageRateTs, rttValues, publishLatencyValues, subscriberCountValues
729793
}
730794
}
731795

@@ -738,7 +802,36 @@ func updateCLI(
738802
fmt.Fprintf(w, "%d\t", atomic.LoadInt64(&totalPublishers))
739803
}
740804

741-
if measureRTT {
805+
// For publisher mode, show publish latency instead of RTT
806+
if strings.Contains(mode, "publish") {
807+
var avgPublishLatencyMs float64
808+
if len(tickPublishLatencyValues) > 0 {
809+
var total int64
810+
for _, v := range tickPublishLatencyValues {
811+
total += v
812+
}
813+
avgPublishLatencyMs = float64(total) / float64(len(tickPublishLatencyValues)) / 1000000.0
814+
tickPublishLatencyValues = tickPublishLatencyValues[:0]
815+
fmt.Fprintf(w, "%.3f\t", avgPublishLatencyMs)
816+
} else {
817+
fmt.Fprintf(w, "--\t")
818+
}
819+
820+
// Show average subscriber count
821+
var avgSubscriberCount float64
822+
if len(tickSubscriberCountValues) > 0 {
823+
var total int64
824+
for _, v := range tickSubscriberCountValues {
825+
total += v
826+
}
827+
avgSubscriberCount = float64(total) / float64(len(tickSubscriberCountValues))
828+
tickSubscriberCountValues = tickSubscriberCountValues[:0]
829+
fmt.Fprintf(w, "%.1f\t", avgSubscriberCount)
830+
} else {
831+
fmt.Fprintf(w, "--\t")
832+
}
833+
} else if measureRTT {
834+
// For subscriber mode with RTT measurement
742835
var avgRTTms float64
743836
if len(tickRttValues) > 0 {
744837
var total int64
@@ -757,12 +850,12 @@ func updateCLI(
757850
w.Flush()
758851

759852
if message_limit > 0 && totalMessages >= uint64(message_limit) {
760-
return true, start, time.Since(start), totalMessages, messageRateTs, rttValues
853+
return true, start, time.Since(start), totalMessages, messageRateTs, rttValues, publishLatencyValues, subscriberCountValues
761854
}
762855

763856
case <-c:
764857
fmt.Println("received Ctrl-c - shutting down")
765-
return true, start, time.Since(start), totalMessages, messageRateTs, rttValues
858+
return true, start, time.Since(start), totalMessages, messageRateTs, rttValues, publishLatencyValues, subscriberCountValues
766859
}
767860
}
768861
}

0 commit comments

Comments
 (0)