Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GIT-2006: address inaccuracy in pod startup phase latency detection #2096

Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 76 additions & 15 deletions clusterloader2/pkg/measurement/common/slos/pod_startup_latency.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,13 @@ type eventData struct {
}

type podStartupLatencyMeasurement struct {
selector *measurementutil.ObjectSelector
isRunning bool
stopCh chan struct{}
selector *measurementutil.ObjectSelector
isPodProcessorRunning bool
podProcessorStopCh chan struct{}

isEventProcessorRunning bool
eventProcessorStopCh chan struct{}

// This queue can potentially grow indefinitely, because we put all changes here.
// Usually this is not a recommended pattern, but we need it for measuring PodStartupLatency.
eventQueue *workqueue.Type
Expand Down Expand Up @@ -101,7 +105,15 @@ func (p *podStartupLatencyMeasurement) Execute(config *measurement.Config) ([]me
if err != nil {
return nil, err
}
return nil, p.start(config.ClusterFramework.GetClientSets().GetClient())
err = p.start(config.ClusterFramework.GetClientSets().GetClient())
if err != nil {
return nil, err
}
schedulerName, err := util.GetStringOrDefault(config.Params, "schedulerName", defaultSchedulerName)
if err != nil {
return nil, err
}
return nil, p.startEventProcessor(config.ClusterFramework.GetClientSets().GetClient(), schedulerName)
case "gather":
schedulerName, err := util.GetStringOrDefault(config.Params, "schedulerName", defaultSchedulerName)
if err != nil {
Expand All @@ -124,14 +136,61 @@ func (p *podStartupLatencyMeasurement) String() string {
return podStartupLatencyMeasurementName + ": " + p.selector.String()
}

// startEventProcessor starts an informer over Event objects in the selector's
// namespace that involve Pods and were emitted by schedulerName. For every
// newly added event whose pod already has a createPhase entry it records the
// schedulePhase timestamp in podStartupEntries.
//
// The call is a no-op when the event processor is already running. It returns
// the error reported by informer.StartAndSync.
func (p *podStartupLatencyMeasurement) startEventProcessor(c clientset.Interface, schedulerName string) error {
	if p.isEventProcessorRunning {
		klog.V(2).Infof("%s: pod event processor already running", p)
		return nil
	}

	klog.V(2).Infof("%s: starting pod event processor...", p)
	p.isEventProcessorRunning = true
	p.eventProcessorStopCh = make(chan struct{})

	// "involvedObject.kind" and "source" are fields of the Event object, not
	// labels, so the selector has to be sent as a field selector.
	selector := fields.Set{
		"involvedObject.kind": "Pod",
		"source":              schedulerName,
	}.AsSelector().String()

	eventInformer := informer.NewInformer(
		&cache.ListWatch{
			// Keep the options supplied by the caller of ListFunc/WatchFunc
			// and only add the selector, instead of replacing them wholesale.
			ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
				options.FieldSelector = selector
				return c.CoreV1().Events(p.selector.Namespace).List(context.TODO(), options)
			},
			WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
				options.FieldSelector = selector
				return c.CoreV1().Events(p.selector.Namespace).Watch(context.TODO(), options)
			},
		},
		func(oldObj, newObj interface{}) {
			// Only additions are of interest: skip updates (both objects set)
			// and deletions (no new object).
			if oldObj != nil || newObj == nil {
				return
			}
			// On an addition the event is carried by newObj; oldObj is nil
			// here, so asserting on it would panic.
			event, ok := newObj.(*corev1.Event)
			if !ok {
				klog.Warningf("%s: unexpected object type %T in pod event processor", p, newObj)
				return
			}
			key := createMetaNamespaceKey(event.InvolvedObject.Namespace, event.InvolvedObject.Name)
			if _, exists := p.podStartupEntries.Get(key, createPhase); !exists {
				return
			}
			// Prefer EventTime; fall back to FirstTimestamp when it is unset.
			scheduledAt := event.FirstTimestamp.Time
			if !event.EventTime.IsZero() {
				scheduledAt = event.EventTime.Time
			}
			p.podStartupEntries.Set(key, schedulePhase, scheduledAt)
		},
	)
	return informer.StartAndSync(eventInformer, p.eventProcessorStopCh, informerSyncTimeout)
}

func (p *podStartupLatencyMeasurement) start(c clientset.Interface) error {
if p.isRunning {
if p.isPodProcessorRunning {
klog.V(2).Infof("%s: pod startup latancy measurement already running", p)
return nil
}
klog.V(2).Infof("%s: starting pod startup latency measurement...", p)
p.isRunning = true
p.stopCh = make(chan struct{})
p.isPodProcessorRunning = true
p.podProcessorStopCh = make(chan struct{})
i := informer.NewInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
Expand All @@ -146,7 +205,7 @@ func (p *podStartupLatencyMeasurement) start(c clientset.Interface) error {
p.addEvent,
)
go p.processEvents()
return informer.StartAndSync(i, p.stopCh, informerSyncTimeout)
return informer.StartAndSync(i, p.podProcessorStopCh, informerSyncTimeout)
}

func (p *podStartupLatencyMeasurement) addEvent(_, obj interface{}) {
Expand Down Expand Up @@ -176,9 +235,11 @@ func (p *podStartupLatencyMeasurement) processNextWorkItem() bool {
}

func (p *podStartupLatencyMeasurement) stop() {
if p.isRunning {
p.isRunning = false
close(p.stopCh)
if p.isPodProcessorRunning {
p.isPodProcessorRunning = false
p.isEventProcessorRunning = false
close(p.podProcessorStopCh)
close(p.eventProcessorStopCh)
p.eventQueue.ShutDown()
}
}
Expand Down Expand Up @@ -224,15 +285,15 @@ type podStartupLatencyCheck struct {

func (p *podStartupLatencyMeasurement) gather(c clientset.Interface, identifier string, schedulerName string) ([]measurement.Summary, error) {
klog.V(2).Infof("%s: gathering pod startup latency measurement...", p)
if !p.isRunning {
if !p.isPodProcessorRunning {
return nil, fmt.Errorf("metric %s has not been started", podStartupLatencyMeasurementName)
}

p.stop()

if err := p.gatherScheduleTimes(c, schedulerName); err != nil {
return nil, err
}
// if err := p.gatherScheduleTimes(c, schedulerName); err != nil {
// return nil, err
// }

checks := []podStartupLatencyCheck{
{
Expand Down