@@ -20,25 +20,73 @@ import (
2020 "golang.org/x/time/rate"
2121)
2222
23- func benchmarkRoutine (radixClient Client , ruedisClient rueidis.Client , useRuedis , useCSC , enableMultiExec bool , datapointsChan chan datapoint , continueOnError bool , cmdS [][]string , commandsCDF []float32 , keyspacelen , datasize , number_samples uint64 , loop bool , debug_level int , wg * sync.WaitGroup , keyplace , dataplace []int , readOnly []bool , useLimiter bool , rateLimiter * rate.Limiter , waitReplicas , waitReplicasMs int , cacheOptions * rueidis.CacheOptions ) {
23+ func benchmarkRoutine (radixClient Client , ruedisClient rueidis.Client , useRuedis , useCSC , enableMultiExec bool , datapointsChan chan datapoint , continueOnError bool , cmdS [][]string , commandsCDF []float32 , keyspacelen , datasize , number_samples uint64 , loop bool , debug_level int , wg * sync.WaitGroup , keyplace , dataplace []int , readOnly []bool , useLimiter bool , rateLimiter * rate.Limiter , waitReplicas , waitReplicasMs , pipelineSize int , cacheOptions * rueidis.CacheOptions ) {
2424
2525 defer wg .Done ()
26- for i := 0 ; uint64 (i ) < number_samples || loop ; i ++ {
27- cmdPos := sample (commandsCDF )
28- kplace := keyplace [cmdPos ]
29- dplace := dataplace [cmdPos ]
30- isReadOnly := readOnly [cmdPos ]
31- cmds := cmdS [cmdPos ]
32- newCmdS , key := keyBuildLogic (kplace , dplace , datasize , keyspacelen , cmds , charset )
33- if useLimiter {
34- r := rateLimiter .ReserveN (time .Now (), int (1 ))
35- time .Sleep (r .Delay ())
26+
27+ // Create per-goroutine random number generator to avoid lock contention
28+ rng := rand .New (rand .NewSource (time .Now ().UnixNano ()))
29+
30+ // Create per-goroutine data cache to avoid mutex contention
31+ dataCache := make (map [int ]string )
32+
33+ // Pipeline support for radix client only
34+ if ! useRuedis && pipelineSize > 1 {
35+ // Pipeline mode for radix client
36+ pipelineCommands := make ([][]string , 0 , pipelineSize )
37+ pipelineKeys := make ([]string , 0 , pipelineSize )
38+
39+ for i := 0 ; uint64 (i ) < number_samples || loop ; i ++ {
40+ cmdPos := sample (commandsCDF , rng )
41+ kplace := keyplace [cmdPos ]
42+ dplace := dataplace [cmdPos ]
43+ cmds := cmdS [cmdPos ]
44+ newCmdS , key := keyBuildLogic (kplace , dplace , datasize , keyspacelen , cmds , charset , rng , dataCache )
45+
46+ // Collect commands for pipeline
47+ pipelineCommands = append (pipelineCommands , newCmdS )
48+ pipelineKeys = append (pipelineKeys , key )
49+
50+ // When we have enough commands or reached the end, send the pipeline
51+ if len (pipelineCommands ) == pipelineSize || (uint64 (i + 1 ) >= number_samples && ! loop ) {
52+ if useLimiter {
53+ r := rateLimiter .ReserveN (time .Now (), len (pipelineCommands ))
54+ time .Sleep (r .Delay ())
55+ }
56+ sendCmdLogicRadixPipeline (radixClient , pipelineCommands , pipelineKeys , enableMultiExec , datapointsChan , continueOnError , debug_level , waitReplicas , waitReplicasMs )
57+
58+ // Reset pipeline
59+ pipelineCommands = pipelineCommands [:0 ]
60+ pipelineKeys = pipelineKeys [:0 ]
61+ }
3662 }
37- if useRuedis {
38- sendCmdLogicRuedis (ruedisClient , newCmdS , enableMultiExec , datapointsChan , continueOnError , debug_level , useCSC , isReadOnly , cacheOptions , waitReplicas , waitReplicasMs )
39- } else {
40- sendCmdLogicRadix (radixClient , newCmdS , enableMultiExec , key , datapointsChan , continueOnError , debug_level , waitReplicas , waitReplicasMs )
4163
64+ // Send any remaining commands in the pipeline
65+ if len (pipelineCommands ) > 0 {
66+ if useLimiter {
67+ r := rateLimiter .ReserveN (time .Now (), len (pipelineCommands ))
68+ time .Sleep (r .Delay ())
69+ }
70+ sendCmdLogicRadixPipeline (radixClient , pipelineCommands , pipelineKeys , enableMultiExec , datapointsChan , continueOnError , debug_level , waitReplicas , waitReplicasMs )
71+ }
72+ } else {
73+ // Original single command mode
74+ for i := 0 ; uint64 (i ) < number_samples || loop ; i ++ {
75+ cmdPos := sample (commandsCDF , rng )
76+ kplace := keyplace [cmdPos ]
77+ dplace := dataplace [cmdPos ]
78+ isReadOnly := readOnly [cmdPos ]
79+ cmds := cmdS [cmdPos ]
80+ newCmdS , key := keyBuildLogic (kplace , dplace , datasize , keyspacelen , cmds , charset , rng , dataCache )
81+ if useLimiter {
82+ r := rateLimiter .ReserveN (time .Now (), int (1 ))
83+ time .Sleep (r .Delay ())
84+ }
85+ if useRuedis {
86+ sendCmdLogicRuedis (ruedisClient , newCmdS , enableMultiExec , datapointsChan , continueOnError , debug_level , useCSC , isReadOnly , cacheOptions , waitReplicas , waitReplicasMs )
87+ } else {
88+ sendCmdLogicRadix (radixClient , newCmdS , enableMultiExec , key , datapointsChan , continueOnError , debug_level , waitReplicas , waitReplicasMs )
89+ }
4290 }
4391 }
4492}
@@ -166,6 +214,61 @@ func sendCmdLogicRadix(conn Client, newCmdS []string, enableMultiExec bool, key
166214 datapointsChan <- datapoint {! (err != nil ), duration .Microseconds (), cacheHit }
167215}
168216
217+ func sendCmdLogicRadixPipeline (conn Client , cmdsList [][]string , keys []string , enableMultiExec bool , datapointsChan chan datapoint , continueOnError bool , debug_level int , waitReplicas , waitReplicasMs int ) {
218+ ctx := context .Background ()
219+ cacheHit := false
220+ var err error
221+ startT := time .Now ()
222+
223+ if enableMultiExec {
224+ // For MULTI/EXEC, we need to handle each command individually
225+ // This is not ideal for pipelining, but maintains compatibility
226+ for i , newCmdS := range cmdsList {
227+ key := keys [i ]
228+ sendCmdLogicRadix (conn , newCmdS , enableMultiExec , key , datapointsChan , continueOnError , debug_level , waitReplicas , waitReplicasMs )
229+ }
230+ return
231+ }
232+
233+ // Create pipeline
234+ p := radix .NewPipeline ()
235+
236+ // Add all commands to pipeline
237+ for _ , newCmdS := range cmdsList {
238+ cmd := radix .Cmd (nil , newCmdS [0 ], newCmdS [1 :]... )
239+ p .Append (cmd )
240+ }
241+
242+ // Add WAIT commands if needed
243+ if waitReplicas > 0 {
244+ for range cmdsList {
245+ p .Append (radix .Cmd (nil , "WAIT" , fmt .Sprintf ("%d" , waitReplicas ), fmt .Sprintf ("%d" , waitReplicasMs )))
246+ }
247+ }
248+
249+ // Execute pipeline
250+ err = conn .Do (ctx , p )
251+ endT := time .Now ()
252+
253+ if err != nil {
254+ if continueOnError {
255+ if debug_level > 0 {
256+ log .Println (fmt .Sprintf ("Received an error with the following pipeline commands: %v, error: %v" , cmdsList , err ))
257+ }
258+ } else {
259+ log .Fatalf ("Received an error with the following pipeline commands: %v, error: %v" , cmdsList , err )
260+ }
261+ }
262+
263+ // Calculate duration and send datapoints for each command in the pipeline
264+ duration := endT .Sub (startT )
265+ //avgDurationPerCmd := duration.Microseconds() / int64(len(cmdsList))
266+
267+ for range cmdsList {
268+ datapointsChan <- datapoint {! (err != nil ), duration .Microseconds (), cacheHit }
269+ }
270+ }
271+
169272func onInvalidations (messages []rueidis.RedisMessage ) {
170273 if messages != nil {
171274 cscInvalidationMutex .Lock ()
@@ -292,6 +395,7 @@ func main() {
292395 continueonerror := flag .Bool ("continue-on-error" , false , "Output verbose info" )
293396 resp := flag .String ("resp" , "" , "redis command response protocol (2 - RESP 2, 3 - RESP 3). If empty will not enforce it." )
294397 nameserver := flag .String ("nameserver" , "" , "the IP address of the DNS name server. The IP address can be an IPv4 or an IPv6 address. If empty will use the default host namserver." )
398+ pipelineSize := flag .Int ("P" , 1 , "Pipeline <numreq> requests. Default 1 (no pipeline)." )
295399 flag .Var (& benchmarkCommands , "cmd" , "Specify a query to send in quotes. Each command that you specify is run with its ratio. For example:-cmd=\" SET __key__ __value__\" -cmd-ratio=1" )
296400 flag .Var (& benchmarkCommandsRatios , "cmd-ratio" , "The query ratio vs other queries used in the same benchmark. Each command that you specify is run with its ratio. For example: -cmd=\" SET __key__ __value__\" -cmd-ratio=0.8 -cmd=\" GET __key__\" -cmd-ratio=0.2" )
297401
@@ -429,14 +533,19 @@ func main() {
429533 }
430534 fmt .Printf ("Using random seed: %d\n " , * seed )
431535 rand .Seed (* seed )
536+ mainRng := rand .New (rand .NewSource (* seed ))
432537 var cluster * radix.Cluster
433538 var radixStandalone radix.Client
434539 var ruedisClient rueidis.Client
435540 var err error = nil
436541 datapointsChan := make (chan datapoint , * numberRequests )
542+
543+ // For radix client with pipelining, create shared connection pools
544+ var sharedRadixPools = make (map [string ]radix.Client )
545+
437546 for clientId := 1 ; uint64 (clientId ) <= * clients ; clientId ++ {
438547 wg .Add (1 )
439- connectionStr := fmt .Sprintf ("%s:%d" , ips [rand .Int63n (int64 (len (ips )))], * port )
548+ connectionStr := fmt .Sprintf ("%s:%d" , ips [mainRng .Int63n (int64 (len (ips )))], * port )
440549 if * verbose {
441550 fmt .Printf ("Using connection string %s for client %d\n " , connectionStr , clientId )
442551 }
@@ -490,15 +599,21 @@ func main() {
490599 if err != nil {
491600 panic (err )
492601 }
493- go benchmarkRoutine (radixStandalone , ruedisClient , * useRuedis , * cscEnabled , * multi , datapointsChan , * continueonerror , cmds , cdf , * keyspacelen , * datasize , samplesPerClient , * loop , int (* debug ), & wg , cmdKeyplaceHolderPos , cmdDataplaceHolderPos , cmdReadOnly , useRateLimiter , rateLimiter , * waitReplicas , * waitReplicasMs , & cacheOptions )
602+ go benchmarkRoutine (radixStandalone , ruedisClient , * useRuedis , * cscEnabled , * multi , datapointsChan , * continueonerror , cmds , cdf , * keyspacelen , * datasize , samplesPerClient , * loop , int (* debug ), & wg , cmdKeyplaceHolderPos , cmdDataplaceHolderPos , cmdReadOnly , useRateLimiter , rateLimiter , * waitReplicas , * waitReplicasMs , * pipelineSize , & cacheOptions )
494603 } else {
495- // legacy radix code
604+ // legacy radix code with shared connection pools for better pipeline performance
496605 if * clusterMode {
497606 cluster = getOSSClusterConn (connectionStr , opts , 1 )
498- go benchmarkRoutine (cluster , ruedisClient , * useRuedis , * cscEnabled , * multi , datapointsChan , * continueonerror , cmds , cdf , * keyspacelen , * datasize , samplesPerClient , * loop , int (* debug ), & wg , cmdKeyplaceHolderPos , cmdDataplaceHolderPos , cmdReadOnly , useRateLimiter , rateLimiter , * waitReplicas , * waitReplicasMs , nil )
607+ go benchmarkRoutine (cluster , ruedisClient , * useRuedis , * cscEnabled , * multi , datapointsChan , * continueonerror , cmds , cdf , * keyspacelen , * datasize , samplesPerClient , * loop , int (* debug ), & wg , cmdKeyplaceHolderPos , cmdDataplaceHolderPos , cmdReadOnly , useRateLimiter , rateLimiter , * waitReplicas , * waitReplicasMs , * pipelineSize , nil )
499608 } else {
500- radixStandalone = getStandaloneConn (connectionStr , opts , 1 )
501- go benchmarkRoutine (radixStandalone , ruedisClient , * useRuedis , * cscEnabled , * multi , datapointsChan , * continueonerror , cmds , cdf , * keyspacelen , * datasize , samplesPerClient , * loop , int (* debug ), & wg , cmdKeyplaceHolderPos , cmdDataplaceHolderPos , cmdReadOnly , useRateLimiter , rateLimiter , * waitReplicas , * waitReplicasMs , nil )
609+ // Use shared connection pool for better pipeline performance
610+ if sharedRadixPools [connectionStr ] == nil {
611+ // Calculate optimal pool size based on pipeline size and clients
612+ poolSize := int (* clients )
613+ sharedRadixPools [connectionStr ] = getStandaloneConn (connectionStr , opts , uint64 (poolSize ))
614+ }
615+ radixStandalone = sharedRadixPools [connectionStr ]
616+ go benchmarkRoutine (radixStandalone , ruedisClient , * useRuedis , * cscEnabled , * multi , datapointsChan , * continueonerror , cmds , cdf , * keyspacelen , * datasize , samplesPerClient , * loop , int (* debug ), & wg , cmdKeyplaceHolderPos , cmdDataplaceHolderPos , cmdReadOnly , useRateLimiter , rateLimiter , * waitReplicas , * waitReplicasMs , * pipelineSize , nil )
502617 }
503618 }
504619
0 commit comments