Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 24 additions & 16 deletions benchmark/benchmark.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ type (
serverFunc func(context.Context, *TimedMsg)
)

func runQCBenchmark(opts Options, cfg *BenchmarkConfiguration, quorum int, f qcFunc) (*Result, error) {
func runQCBenchmark(opts Options, cfg *gorums.Configuration, quorum int, f qcFunc) (*Result, error) {
rpcCfg := BenchmarkConfigurationRpc(cfg)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
msg := Echo_builder{Payload: make([]byte, opts.Payload)}.Build()
Expand All @@ -63,7 +64,7 @@ func runQCBenchmark(opts Options, cfg *BenchmarkConfiguration, quorum int, f qcF
}

if opts.Remote {
err := qf(cfg.StartBenchmark(ctx, &StartRequest{}), quorum)
err := qf(rpcCfg.StartBenchmark(ctx, &StartRequest{}), quorum)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -93,7 +94,7 @@ func runQCBenchmark(opts Options, cfg *BenchmarkConfiguration, quorum int, f qcF

result := s.GetResult()
if opts.Remote {
memStats, err := stopBenchmarkQF(cfg.StopBenchmark(ctx, &StopRequest{}), cfg.Size())
memStats, err := stopBenchmarkQF(rpcCfg.StopBenchmark(ctx, &StopRequest{}), cfg.Size())
if err != nil {
return nil, err
}
Expand All @@ -103,7 +104,8 @@ func runQCBenchmark(opts Options, cfg *BenchmarkConfiguration, quorum int, f qcF
return result, nil
}

func runAsyncQCBenchmark(opts Options, cfg *BenchmarkConfiguration, quorum int) (*Result, error) {
func runAsyncQCBenchmark(opts Options, cfg *gorums.Configuration, quorum int) (*Result, error) {
rpcCfg := BenchmarkConfigurationRpc(cfg)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
msg := Echo_builder{Payload: make([]byte, opts.Payload)}.Build()
Expand All @@ -116,7 +118,7 @@ func runAsyncQCBenchmark(opts Options, cfg *BenchmarkConfiguration, quorum int)
var warmupFunc func() error
warmupFunc = func() error {
for ; !time.Now().After(warmupEnd) && atomic.LoadUint64(&async) < uint64(opts.MaxAsync); atomic.AddUint64(&async, 1) {
responses := cfg.AsyncQuorumCall(ctx, msg)
responses := rpcCfg.AsyncQuorumCall(ctx, msg)
g.Go(func() error {
err := qf(responses, quorum)
if err != nil {
Expand All @@ -139,7 +141,7 @@ func runAsyncQCBenchmark(opts Options, cfg *BenchmarkConfiguration, quorum int)
}

if opts.Remote {
err := qf(cfg.StartBenchmark(ctx, &StartRequest{}), cfg.Size())
err := qf(rpcCfg.StartBenchmark(ctx, &StartRequest{}), cfg.Size())
if err != nil {
return nil, err
}
Expand All @@ -150,7 +152,7 @@ func runAsyncQCBenchmark(opts Options, cfg *BenchmarkConfiguration, quorum int)
benchmarkFunc = func() error {
for ; !time.Now().After(endTime) && atomic.LoadUint64(&async) < uint64(opts.MaxAsync); atomic.AddUint64(&async, 1) {
start := time.Now()
responses := cfg.AsyncQuorumCall(ctx, msg)
responses := rpcCfg.AsyncQuorumCall(ctx, msg)
g.Go(func() error {
err := qf(responses, quorum)
if err != nil {
Expand All @@ -177,7 +179,7 @@ func runAsyncQCBenchmark(opts Options, cfg *BenchmarkConfiguration, quorum int)

result := s.GetResult()
if opts.Remote {
memStats, err := stopBenchmarkQF(cfg.StopBenchmark(ctx, &StopRequest{}), cfg.Size())
memStats, err := stopBenchmarkQF(BenchmarkConfigurationRpc(cfg).StopBenchmark(ctx, &StopRequest{}), cfg.Size())
if err != nil {
return nil, err
}
Expand All @@ -187,7 +189,8 @@ func runAsyncQCBenchmark(opts Options, cfg *BenchmarkConfiguration, quorum int)
return result, nil
}

func runServerBenchmark(opts Options, cfg *BenchmarkConfiguration, f serverFunc) (*Result, error) {
func runServerBenchmark(opts Options, cfg *gorums.Configuration, f serverFunc) (*Result, error) {
rpcCfg := BenchmarkConfigurationRpc(cfg)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
payload := make([]byte, opts.Payload)
Expand All @@ -211,7 +214,7 @@ func runServerBenchmark(opts Options, cfg *BenchmarkConfiguration, f serverFunc)
return nil, err
}

err = qf(cfg.StartServerBenchmark(ctx, &StartRequest{}), cfg.Size())
err = qf(rpcCfg.StartServerBenchmark(ctx, &StartRequest{}), cfg.Size())
if err != nil {
return nil, err
}
Expand All @@ -227,7 +230,7 @@ func runServerBenchmark(opts Options, cfg *BenchmarkConfiguration, f serverFunc)
}
runtime.ReadMemStats(&end)

resp, err := stopServerBenchmarkQF(cfg.StopServerBenchmark(ctx, &StopRequest{}), cfg.Size())
resp, err := stopServerBenchmarkQF(rpcCfg.StopServerBenchmark(ctx, &StopRequest{}), cfg.Size())
if err != nil {
return nil, err
}
Expand All @@ -241,12 +244,15 @@ func runServerBenchmark(opts Options, cfg *BenchmarkConfiguration, f serverFunc)
}

// GetBenchmarks returns a list of Benchmarks that can be performed on the configuration
func GetBenchmarks(cfg *BenchmarkConfiguration, quorum int) []Bench {
func GetBenchmarks(cfg *gorums.Configuration, quorum int) []Bench {
rpcCfg := BenchmarkConfigurationRpc(cfg)
m := []Bench{
{
Name: "QuorumCall",
Description: "NodeStream based quorum call implementation with FIFO ordering",
runBench: func(opts Options) (*Result, error) { return runQCBenchmark(opts, cfg, quorum, cfg.QuorumCall) },
runBench: func(opts Options) (*Result, error) {
return runQCBenchmark(opts, cfg, quorum, rpcCfg.QuorumCall)
},
},
{
Name: "AsyncQuorumCall",
Expand All @@ -256,21 +262,23 @@ func GetBenchmarks(cfg *BenchmarkConfiguration, quorum int) []Bench {
{
Name: "SlowServer",
Description: "Quorum Call with a 10s processing time on the server",
runBench: func(opts Options) (*Result, error) { return runQCBenchmark(opts, cfg, quorum, cfg.SlowServer) },
runBench: func(opts Options) (*Result, error) {
return runQCBenchmark(opts, cfg, quorum, rpcCfg.SlowServer)
},
},
{
Name: "Multicast",
Description: "NodeStream based multicast implementation (servers measure latency and throughput)",
runBench: func(opts Options) (*Result, error) {
return runServerBenchmark(opts, cfg, func(ctx context.Context, msg *TimedMsg) { cfg.Multicast(ctx, msg) })
return runServerBenchmark(opts, cfg, func(ctx context.Context, msg *TimedMsg) { rpcCfg.Multicast(ctx, msg) })
},
},
}
return m
}

// RunBenchmarks runs all the benchmarks that match the given regex with the given options
func RunBenchmarks(benchRegex *regexp.Regexp, options Options, cfg *BenchmarkConfiguration, quorum int) ([]*Result, error) {
func RunBenchmarks(benchRegex *regexp.Regexp, options Options, cfg *gorums.Configuration, quorum int) ([]*Result, error) {
benchmarks := GetBenchmarks(cfg, quorum)
var results []*Result
for _, b := range benchmarks {
Expand Down
130 changes: 35 additions & 95 deletions benchmark/benchmark_gorums.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading