Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions tests/antithesis/test-template/robustness/traffic/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,6 @@ func runTraffic(ctx context.Context, lg *zap.Logger, tf traffic.Traffic, hosts [
lg.Fatal("Failed empty database at start check", zap.Error(err))
}
maxRevisionChan := make(chan int64, 1)
watchConfig := client.WatchConfig{
RequestProgress: true,
}
g := errgroup.Group{}
startTime := time.Since(baseTime)
g.Go(func() error {
Expand All @@ -120,7 +117,6 @@ func runTraffic(ctx context.Context, lg *zap.Logger, tf traffic.Traffic, hosts [
Lg: lg,
Endpoints: hosts,
MaxRevisionChan: maxRevisionChan,
Cfg: watchConfig,
ClientSet: watchSet,
})
return err
Expand Down
15 changes: 5 additions & 10 deletions tests/robustness/client/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"errors"
"fmt"
"time"

"go.uber.org/zap"
"golang.org/x/sync/errgroup"
Expand All @@ -27,7 +28,6 @@ type CollectClusterWatchEventsParam struct {
Lg *zap.Logger
Endpoints []string
MaxRevisionChan <-chan int64
Cfg WatchConfig
ClientSet *ClientSet
}

Expand All @@ -43,7 +43,7 @@ func CollectClusterWatchEvents(ctx context.Context, param CollectClusterWatchEve
return err
}
defer c.Close()
return watchUntilRevision(ctx, param.Lg, c, memberMaxRevisionChan, param.Cfg)
return watchUntilRevision(ctx, param.Lg, c, memberMaxRevisionChan)
})
}
g.Go(func() error {
Expand All @@ -57,12 +57,8 @@ func CollectClusterWatchEvents(ctx context.Context, param CollectClusterWatchEve
return g.Wait()
}

type WatchConfig struct {
RequestProgress bool
}

// watchUntilRevision watches all changes until context is canceled, it has observed the revision provided via maxRevisionChan or maxRevisionChan was closed.
func watchUntilRevision(ctx context.Context, lg *zap.Logger, c *RecordingClient, maxRevisionChan <-chan int64, cfg WatchConfig) error {
func watchUntilRevision(ctx context.Context, lg *zap.Logger, c *RecordingClient, maxRevisionChan <-chan int64) error {
var maxRevision int64
var lastRevision int64 = 1
var closing bool
Expand Down Expand Up @@ -96,14 +92,13 @@ resetWatch:
cancel()
}
}
case <-time.After(100 * time.Millisecond):
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: may consider making 100 as a const.

c.RequestProgress(ctx)
case resp, ok := <-watch:
if !ok {
lg.Info("Watch channel closed")
continue resetWatch
}
if cfg.RequestProgress {
c.RequestProgress(ctx)
}

if resp.Err() != nil {
if resp.Canceled {
Expand Down
1 change: 0 additions & 1 deletion tests/robustness/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,6 @@ func runScenario(ctx context.Context, t *testing.T, s scenarios.TestScenario, lg
Lg: lg,
Endpoints: endpoints,
MaxRevisionChan: maxRevisionChan,
Cfg: s.Watch,
ClientSet: watchSet,
})
return err
Expand Down
9 changes: 0 additions & 9 deletions tests/robustness/scenarios/scenarios.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"go.etcd.io/etcd/client/pkg/v3/fileutil"
"go.etcd.io/etcd/server/v3/etcdserver"
"go.etcd.io/etcd/tests/v3/framework/e2e"
"go.etcd.io/etcd/tests/v3/robustness/client"
"go.etcd.io/etcd/tests/v3/robustness/failpoint"
"go.etcd.io/etcd/tests/v3/robustness/options"
"go.etcd.io/etcd/tests/v3/robustness/random"
Expand Down Expand Up @@ -84,7 +83,6 @@ type TestScenario struct {
Cluster e2e.EtcdProcessClusterConfig
Traffic traffic.Traffic
Profile traffic.Profile
Watch client.WatchConfig
}

func Exploratory(_ *testing.T) []TestScenario {
Expand Down Expand Up @@ -168,7 +166,6 @@ func Exploratory(_ *testing.T) []TestScenario {
Cluster: lazyfsCluster,
Traffic: s.Traffic,
Profile: profileWithoutCompaction,
Watch: s.Watch,
})
}
}
Expand Down Expand Up @@ -222,9 +219,6 @@ func Regression(t *testing.T) []TestScenario {
})
scenarios = append(scenarios, TestScenario{
Name: "Issue15220",
Watch: client.WatchConfig{
RequestProgress: true,
},
Profile: traffic.Profile{
KeyValue: &traffic.KeyValueMedium,
Watch: &traffic.WatchDefault,
Expand Down Expand Up @@ -311,9 +305,6 @@ func Regression(t *testing.T) []TestScenario {
scenarios = append(scenarios, TestScenario{
Name: "Issue20221",
Failpoint: failpoint.BlackholeUntilSnapshot,
Watch: client.WatchConfig{
RequestProgress: true,
},
Profile: traffic.Profile{
KeyValue: &traffic.KeyValueHigh,
Watch: &traffic.Watch{
Expand Down
Loading