✨ [POC] Scale tests & Tracing #153

Open · wants to merge 4 commits into main
3 changes: 3 additions & 0 deletions bootstrap/kubeadm/config/manager/manager.yaml
@@ -25,6 +25,9 @@ spec:
- "--bootstrap-token-ttl=${KUBEADM_BOOTSTRAP_TOKEN_TTL:=15m}"
image: controller:latest
name: manager
env:
- name: GOMAXPROCS
value: "16"
ports:
- containerPort: 9440
name: healthz
2 changes: 2 additions & 0 deletions config/manager/manager.yaml
@@ -38,6 +38,8 @@ spec:
valueFrom:
fieldRef:
fieldPath: metadata.uid
- name: GOMAXPROCS
value: "16"
ports:
- containerPort: 9440
name: healthz
6 changes: 6 additions & 0 deletions controllers/alias.go
@@ -20,6 +20,7 @@ import (
"context"
"time"

oteltrace "go.opentelemetry.io/otel/trace"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
@@ -65,6 +66,7 @@ type MachineReconciler struct {
UnstructuredCachingClient client.Client
APIReader client.Reader
Tracker *remote.ClusterCacheTracker
TraceProvider oteltrace.TracerProvider

// WatchFilterValue is the label value used to filter events prior to reconciliation.
WatchFilterValue string
@@ -79,6 +81,7 @@ func (r *MachineReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manag
UnstructuredCachingClient: r.UnstructuredCachingClient,
APIReader: r.APIReader,
Tracker: r.Tracker,
TraceProvider: r.TraceProvider,
WatchFilterValue: r.WatchFilterValue,
NodeDrainClientTimeout: r.NodeDrainClientTimeout,
}).SetupWithManager(ctx, mgr, options)
@@ -150,6 +153,8 @@ type ClusterTopologyReconciler struct {

RuntimeClient runtimeclient.Client

TraceProvider oteltrace.TracerProvider

// WatchFilterValue is the label value used to filter events prior to reconciliation.
WatchFilterValue string

@@ -164,6 +169,7 @@ func (r *ClusterTopologyReconciler) SetupWithManager(ctx context.Context, mgr ct
APIReader: r.APIReader,
RuntimeClient: r.RuntimeClient,
UnstructuredCachingClient: r.UnstructuredCachingClient,
TraceProvider: r.TraceProvider,
WatchFilterValue: r.WatchFilterValue,
}).SetupWithManager(ctx, mgr, options)
}
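
Note on wiring: the new TraceProvider fields on MachineReconciler and ClusterTopologyReconciler have to be populated by the manager's main package, which is not shown in this diff. A minimal sketch of how a provider could be built with the OpenTelemetry SDK and an OTLP/gRPC exporter (the endpoint and service name are illustrative, not taken from this PR):

```go
package main

import (
	"context"

	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
	"go.opentelemetry.io/otel/sdk/resource"
	sdktrace "go.opentelemetry.io/otel/sdk/trace"
	oteltrace "go.opentelemetry.io/otel/trace"
)

// newTracerProvider builds a TracerProvider that batches spans to an OTLP/gRPC
// collector. Endpoint and service name are illustrative only.
func newTracerProvider(ctx context.Context, endpoint string) (oteltrace.TracerProvider, func(context.Context) error, error) {
	exporter, err := otlptracegrpc.New(ctx,
		otlptracegrpc.WithEndpoint(endpoint),
		otlptracegrpc.WithInsecure(),
	)
	if err != nil {
		return nil, nil, err
	}
	tp := sdktrace.NewTracerProvider(
		sdktrace.WithBatcher(exporter),
		sdktrace.WithResource(resource.NewSchemaless(
			attribute.String("service.name", "cluster-api-controller-manager"),
		)),
	)
	// tp.Shutdown flushes remaining spans; the caller should invoke it on exit.
	return tp, tp.Shutdown, nil
}
```

Whatever is constructed there would be assigned to the reconcilers' TraceProvider fields before SetupWithManager is called; the KubeadmControlPlane reconciler further down falls back to a no-op provider when the field is left nil.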
7 changes: 7 additions & 0 deletions controllers/external/util.go
@@ -28,10 +28,14 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"

clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
traceutil "sigs.k8s.io/cluster-api/internal/util/trace"
)

// Get uses the client and reference to get an external, unstructured object.
func Get(ctx context.Context, c client.Reader, ref *corev1.ObjectReference, namespace string) (*unstructured.Unstructured, error) {
ctx, span := traceutil.Start(ctx, "external.Get")
defer span.End()

if ref == nil {
return nil, errors.Errorf("cannot get object - object reference not set")
}
@@ -48,6 +52,9 @@ func Get(ctx context.Context, c client.Reader, ref *corev1.ObjectReference, name

// Delete uses the client and reference to delete an external, unstructured object.
func Delete(ctx context.Context, c client.Writer, ref *corev1.ObjectReference) error {
ctx, span := traceutil.Start(ctx, "external.Delete")
defer span.End()

obj := new(unstructured.Unstructured)
obj.SetAPIVersion(ref.APIVersion)
obj.SetKind(ref.Kind)
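
The traceutil package (sigs.k8s.io/cluster-api/internal/util/trace) called by Get and Delete above is introduced elsewhere in this PR and its implementation is not part of this section. A plausible minimal sketch of the Start helper, assuming it resolves a tracer from the globally registered provider, is:

```go
package trace

import (
	"context"

	"go.opentelemetry.io/otel"
	oteltrace "go.opentelemetry.io/otel/trace"
)

// Start opens a span with the given name and returns the derived context.
// Callers defer span.End(). Sketch only: the helper added by the PR may take
// an explicit TracerProvider instead of relying on the otel global.
func Start(ctx context.Context, name string, opts ...oteltrace.SpanStartOption) (context.Context, oteltrace.Span) {
	return otel.Tracer("sigs.k8s.io/cluster-api").Start(ctx, name, opts...)
}
```

The call sites only depend on it returning a derived context plus a span whose End is deferred, so any implementation with this signature would fit.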
12 changes: 12 additions & 0 deletions controllers/remote/cluster_cache_tracker.go
@@ -27,6 +27,7 @@ import (

"github.com/go-logr/logr"
"github.com/pkg/errors"
oteltrace "go.opentelemetry.io/otel/trace"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -97,6 +98,8 @@ type ClusterCacheTracker struct {
// This information will be used to detect if the controller is running on a workload cluster, so
// that we can then access the apiserver directly.
controllerPodMetadata *metav1.ObjectMeta

traceProvider oteltrace.TracerProvider
}

// ClusterCacheTrackerOptions defines options to configure
@@ -121,6 +124,8 @@ type ClusterCacheTrackerOptions struct {
// This is used to calculate the user agent string.
// If not set, it defaults to "cluster-cache-tracker".
ControllerName string

TraceProvider oteltrace.TracerProvider
}

func setDefaultOptions(opts *ClusterCacheTrackerOptions) {
Expand All @@ -135,6 +140,10 @@ func setDefaultOptions(opts *ClusterCacheTrackerOptions) {
&corev1.Secret{},
}
}

if opts.TraceProvider == nil {
opts.TraceProvider = oteltrace.NewNoopTracerProvider()
}
}

// NewClusterCacheTracker creates a new ClusterCacheTracker.
@@ -166,6 +175,7 @@ func NewClusterCacheTracker(manager ctrl.Manager, options ClusterCacheTrackerOpt
controllerPodMetadata: controllerPodMetadata,
log: *options.Log,
clientUncachedObjects: options.ClientUncachedObjects,
traceProvider: options.TraceProvider,
client: manager.GetClient(),
secretCachingClient: options.SecretCachingClient,
scheme: manager.GetScheme(),
@@ -294,6 +304,8 @@ func (t *ClusterCacheTracker) newClusterAccessor(ctx context.Context, cluster cl
if err != nil {
return nil, errors.Wrapf(err, "error fetching REST client config for remote cluster %q", cluster.String())
}
// FIXME: this seems to lead to problems with spans (random 10s spans in the trace)
// config.Wrap(tracing.WrapperFor(t.traceProvider)) //nolint:gocritic

// Create a client and a cache for the cluster.
c, uncachedClient, cache, err := t.createClient(ctx, config, cluster, indexes)
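
The tracker now accepts a TraceProvider option and defaults it to a no-op provider, while the FIXME above keeps per-request instrumentation of the remote REST config disabled because of the stray 10s spans it produced. A sketch of how a caller might pass the provider when creating the tracker (the surrounding function and variable names are illustrative, not part of this diff):

```go
package main

import (
	"github.com/pkg/errors"
	oteltrace "go.opentelemetry.io/otel/trace"
	ctrl "sigs.k8s.io/controller-runtime"

	"sigs.k8s.io/cluster-api/controllers/remote"
)

// newTracker shows how the new TraceProvider option could be threaded through.
func newTracker(mgr ctrl.Manager, tp oteltrace.TracerProvider) (*remote.ClusterCacheTracker, error) {
	tracker, err := remote.NewClusterCacheTracker(mgr, remote.ClusterCacheTrackerOptions{
		ControllerName: "cluster-api-controller-manager",
		TraceProvider:  tp, // left nil, setDefaultOptions falls back to a no-op provider
	})
	if err != nil {
		return nil, errors.Wrap(err, "unable to create cluster cache tracker")
	}
	return tracker, nil
}
```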
2 changes: 2 additions & 0 deletions controlplane/kubeadm/config/manager/manager.yaml
@@ -37,6 +37,8 @@ spec:
valueFrom:
fieldRef:
fieldPath: metadata.uid
- name: GOMAXPROCS
value: "16"
ports:
- containerPort: 9440
name: healthz
3 changes: 3 additions & 0 deletions controlplane/kubeadm/controllers/alias.go
@@ -20,6 +20,7 @@ import (
"context"
"time"

"go.opentelemetry.io/otel/trace"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
@@ -33,6 +34,7 @@ type KubeadmControlPlaneReconciler struct {
Client client.Client
SecretCachingClient client.Client
Tracker *remote.ClusterCacheTracker
TraceProvider trace.TracerProvider

EtcdDialTimeout time.Duration
EtcdCallTimeout time.Duration
@@ -47,6 +49,7 @@ func (r *KubeadmControlPlaneReconciler) SetupWithManager(ctx context.Context, mg
Client: r.Client,
SecretCachingClient: r.SecretCachingClient,
Tracker: r.Tracker,
TraceProvider: r.TraceProvider,
EtcdDialTimeout: r.EtcdDialTimeout,
EtcdCallTimeout: r.EtcdCallTimeout,
WatchFilterValue: r.WatchFilterValue,
16 changes: 16 additions & 0 deletions controlplane/kubeadm/internal/cluster.go
@@ -32,6 +32,7 @@ import (
clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
"sigs.k8s.io/cluster-api/controllers/remote"
expv1 "sigs.k8s.io/cluster-api/exp/api/v1beta1"
traceutil "sigs.k8s.io/cluster-api/internal/util/trace"
"sigs.k8s.io/cluster-api/util/collections"
"sigs.k8s.io/cluster-api/util/secret"
)
@@ -84,11 +85,17 @@ func (m *Management) List(ctx context.Context, list client.ObjectList, opts ...c
// GetMachinesForCluster returns a list of machines that can be filtered or not.
// If no filter is supplied then all machines associated with the target cluster are returned.
func (m *Management) GetMachinesForCluster(ctx context.Context, cluster *clusterv1.Cluster, filters ...collections.Func) (collections.Machines, error) {
ctx, span := traceutil.Start(ctx, "Management.GetMachinesForCluster")
defer span.End()

return collections.GetFilteredMachinesForCluster(ctx, m.Client, cluster, filters...)
}

// GetMachinePoolsForCluster returns a list of machine pools owned by the cluster.
func (m *Management) GetMachinePoolsForCluster(ctx context.Context, cluster *clusterv1.Cluster) (*expv1.MachinePoolList, error) {
ctx, span := traceutil.Start(ctx, "Management.GetMachinesForCluster")
defer span.End()

selectors := []client.ListOption{
client.InNamespace(cluster.GetNamespace()),
client.MatchingLabels{
@@ -103,6 +110,9 @@ func (m *Management) GetMachinePoolsForCluster(ctx context.Context, cluster *clu
// GetWorkloadCluster builds a cluster object.
// The cluster comes with an etcd client generator to connect to any etcd pod living on a managed machine.
func (m *Management) GetWorkloadCluster(ctx context.Context, clusterKey client.ObjectKey) (WorkloadCluster, error) {
ctx, span := traceutil.Start(ctx, "Management.GetWorkloadCluster")
defer span.End()

// TODO(chuckha): Inject this dependency.
// TODO(chuckha): memoize this function. The workload client only exists as long as a reconciliation loop.
restConfig, err := m.Tracker.GetRESTConfig(ctx, clusterKey)
@@ -178,6 +188,9 @@ func (m *Management) GetWorkloadCluster(ctx context.Context, clusterKey client.O
}

func (m *Management) getEtcdCAKeyPair(ctx context.Context, clusterKey client.ObjectKey) ([]byte, []byte, error) {
ctx, span := traceutil.Start(ctx, "Management.getEtcdCAKeyPair")
defer span.End()

etcdCASecret := &corev1.Secret{}
etcdCAObjectKey := client.ObjectKey{
Namespace: clusterKey.Namespace,
@@ -207,6 +220,9 @@ func (m *Management) getEtcdCAKeyPair(ctx context.Context, clusterKey client.Obj
}

func (m *Management) getAPIServerEtcdClientCert(ctx context.Context, clusterKey client.ObjectKey) (tls.Certificate, error) {
ctx, span := traceutil.Start(ctx, "Management.getAPIServerEtcdClientCert")
defer span.End()

apiServerEtcdClientCertificateSecret := &corev1.Secret{}
apiServerEtcdClientCertificateObjectKey := client.ObjectKey{
Namespace: clusterKey.Namespace,
7 changes: 7 additions & 0 deletions controlplane/kubeadm/internal/control_plane.go
@@ -30,6 +30,7 @@ import (
bootstrapv1 "sigs.k8s.io/cluster-api/bootstrap/kubeadm/api/v1beta1"
"sigs.k8s.io/cluster-api/controllers/external"
controlplanev1 "sigs.k8s.io/cluster-api/controlplane/kubeadm/api/v1beta1"
traceutil "sigs.k8s.io/cluster-api/internal/util/trace"
"sigs.k8s.io/cluster-api/util/collections"
"sigs.k8s.io/cluster-api/util/failuredomains"
"sigs.k8s.io/cluster-api/util/patch"
@@ -58,6 +59,9 @@ type ControlPlane struct {

// NewControlPlane returns an instantiated ControlPlane.
func NewControlPlane(ctx context.Context, managementCluster ManagementCluster, client client.Client, cluster *clusterv1.Cluster, kcp *controlplanev1.KubeadmControlPlane, ownedMachines collections.Machines) (*ControlPlane, error) {
ctx, span := traceutil.Start(ctx, "NewControlPlane")
defer span.End()

infraObjects, err := getInfraResources(ctx, client, ownedMachines)
if err != nil {
return nil, err
@@ -255,6 +259,9 @@ func (c *ControlPlane) HasUnhealthyMachine() bool {

// PatchMachines patches all the machines conditions.
func (c *ControlPlane) PatchMachines(ctx context.Context) error {
ctx, span := traceutil.Start(ctx, "ControlPlane.PatchMachines")
defer span.End()

errList := []error{}
for i := range c.Machines {
machine := c.Machines[i]
38 changes: 37 additions & 1 deletion controlplane/kubeadm/internal/controllers/controller.go
@@ -24,6 +24,7 @@ import (

"github.com/blang/semver"
"github.com/pkg/errors"
oteltrace "go.opentelemetry.io/otel/trace"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -47,6 +48,7 @@ import (
"sigs.k8s.io/cluster-api/feature"
"sigs.k8s.io/cluster-api/internal/contract"
"sigs.k8s.io/cluster-api/internal/util/ssa"
traceutil "sigs.k8s.io/cluster-api/internal/util/trace"
"sigs.k8s.io/cluster-api/util"
"sigs.k8s.io/cluster-api/util/annotations"
"sigs.k8s.io/cluster-api/util/collections"
@@ -76,6 +78,7 @@ type KubeadmControlPlaneReconciler struct {
controller controller.Controller
recorder record.EventRecorder
Tracker *remote.ClusterCacheTracker
TraceProvider oteltrace.TracerProvider

EtcdDialTimeout time.Duration
EtcdCallTimeout time.Duration
Expand All @@ -95,6 +98,10 @@ type KubeadmControlPlaneReconciler struct {
}

func (r *KubeadmControlPlaneReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, options controller.Options) error {
if r.TraceProvider == nil {
r.TraceProvider = oteltrace.NewNoopTracerProvider()
}
tr := traceutil.Reconciler(r, r.TraceProvider, "kubeadmcontrolplane", "KubeadmControlPlane")
c, err := ctrl.NewControllerManagedBy(mgr).
For(&controlplanev1.KubeadmControlPlane{}).
Owns(&clusterv1.Machine{}).
@@ -109,7 +116,7 @@ func (r *KubeadmControlPlaneReconciler) SetupWithManager(ctx context.Context, mg
predicates.ClusterUnpausedAndInfrastructureReady(ctrl.LoggerFrom(ctx)),
),
),
).Build(r)
).Build(tr)
if err != nil {
return errors.Wrap(err, "failed setting up with a controller manager")
}
@@ -301,6 +308,9 @@ func (r *KubeadmControlPlaneReconciler) initControlPlaneScope(ctx context.Contex
}

func patchKubeadmControlPlane(ctx context.Context, patchHelper *patch.Helper, kcp *controlplanev1.KubeadmControlPlane) error {
ctx, span := traceutil.Start(ctx, "patchKubeadmControlPlane")
defer span.End()

// Always update the readyCondition by summarizing the state of other conditions.
conditions.SetSummary(kcp,
conditions.WithConditions(
@@ -332,6 +342,9 @@ func patchKubeadmControlPlane(ctx context.Context, patchHelper *patch.Helper, kc

// reconcile handles KubeadmControlPlane reconciliation.
func (r *KubeadmControlPlaneReconciler) reconcile(ctx context.Context, controlPlane *internal.ControlPlane) (res ctrl.Result, reterr error) {
ctx, span := traceutil.Start(ctx, "kubeadmcontrolplane.Reconciler.reconcile")
defer span.End()

log := ctrl.LoggerFrom(ctx)
log.Info("Reconcile KubeadmControlPlane")

@@ -507,6 +520,9 @@ func (r *KubeadmControlPlaneReconciler) reconcileClusterCertificates(ctx context
// The implementation does not take non-control plane workloads into consideration. This may or may not change in the future.
// Please see https://github.com/kubernetes-sigs/cluster-api/issues/2064.
func (r *KubeadmControlPlaneReconciler) reconcileDelete(ctx context.Context, controlPlane *internal.ControlPlane) (ctrl.Result, error) {
ctx, span := traceutil.Start(ctx, "kubeadmcontrolplane.Reconciler.reconcileDelete")
defer span.End()

log := ctrl.LoggerFrom(ctx)
log.Info("Reconcile KubeadmControlPlane deletion")

@@ -594,6 +610,9 @@ func (r *KubeadmControlPlaneReconciler) ClusterToKubeadmControlPlane(_ context.C
// Otherwise, fields would be co-owned by our "old" "manager" and "capi-kubeadmcontrolplane" and then we would not be
// able to e.g. drop labels and annotations.
func (r *KubeadmControlPlaneReconciler) syncMachines(ctx context.Context, controlPlane *internal.ControlPlane) error {
ctx, span := traceutil.Start(ctx, "kubeadmcontrolplane.Reconciler.syncMachines")
defer span.End()

patchHelpers := map[string]*patch.Helper{}
for machineName := range controlPlane.Machines {
m := controlPlane.Machines[machineName]
@@ -677,6 +696,9 @@ func (r *KubeadmControlPlaneReconciler) syncMachines(ctx context.Context, contro
// reconcileControlPlaneConditions is responsible of reconciling conditions reporting the status of static pods and
// the status of the etcd cluster.
func (r *KubeadmControlPlaneReconciler) reconcileControlPlaneConditions(ctx context.Context, controlPlane *internal.ControlPlane) error {
ctx, span := traceutil.Start(ctx, "kubeadmcontrolplane.Reconciler.reconcileControlPlaneConditions")
defer span.End()

// If the cluster is not yet initialized, there is no way to connect to the workload cluster and fetch information
// for updating conditions. Return early.
if !controlPlane.KCP.Status.Initialized {
@@ -706,6 +728,9 @@ func (r *KubeadmControlPlaneReconciler) reconcileControlPlaneConditions(ctx cont
//
// NOTE: this func uses KCP conditions, it is required to call reconcileControlPlaneConditions before this.
func (r *KubeadmControlPlaneReconciler) reconcileEtcdMembers(ctx context.Context, controlPlane *internal.ControlPlane) error {
ctx, span := traceutil.Start(ctx, "kubeadmcontrolplane.Reconciler.reconcileEtcdMembers")
defer span.End()

log := ctrl.LoggerFrom(ctx)

// If etcd is not managed by KCP this is a no-op.
@@ -758,6 +783,9 @@ func (r *KubeadmControlPlaneReconciler) reconcileEtcdMembers(ctx context.Context
}

func (r *KubeadmControlPlaneReconciler) reconcileCertificateExpiries(ctx context.Context, controlPlane *internal.ControlPlane) error {
ctx, span := traceutil.Start(ctx, "kubeadmcontrolplane.Reconciler.reconcileCertificateExpiries")
defer span.End()

log := ctrl.LoggerFrom(ctx)

// Return if there are no KCP-owned control-plane machines.
@@ -828,6 +856,9 @@ func (r *KubeadmControlPlaneReconciler) reconcileCertificateExpiries(ctx context
}

func (r *KubeadmControlPlaneReconciler) adoptMachines(ctx context.Context, kcp *controlplanev1.KubeadmControlPlane, machines collections.Machines, cluster *clusterv1.Cluster) error {
ctx, span := traceutil.Start(ctx, "kubeadmcontrolplane.Reconciler.adoptMachines")
defer span.End()

// We do an uncached full quorum read against the KCP to avoid re-adopting Machines the garbage collector just intentionally orphaned
// See https://github.com/kubernetes/kubernetes/issues/42639
uncached := controlplanev1.KubeadmControlPlane{}
@@ -905,6 +936,9 @@ func (r *KubeadmControlPlaneReconciler) adoptMachines(ctx context.Context, kcp *
}

func (r *KubeadmControlPlaneReconciler) adoptOwnedSecrets(ctx context.Context, kcp *controlplanev1.KubeadmControlPlane, currentOwner *bootstrapv1.KubeadmConfig, clusterName string) error {
ctx, span := traceutil.Start(ctx, "kubeadmcontrolplane.Reconciler.adoptOwnedSecrets")
defer span.End()

secrets := corev1.SecretList{}
if err := r.Client.List(ctx, &secrets, client.InNamespace(kcp.Namespace), client.MatchingLabels{clusterv1.ClusterNameLabel: clusterName}); err != nil {
return errors.Wrap(err, "error finding secrets for adoption")
@@ -941,6 +975,8 @@ func (r *KubeadmControlPlaneReconciler) adoptOwnedSecrets(ctx context.Context, k

// ensureCertificatesOwnerRef ensures an ownerReference to the owner is added on the Secrets holding certificates.
func (r *KubeadmControlPlaneReconciler) ensureCertificatesOwnerRef(ctx context.Context, certificates secret.Certificates, owner metav1.OwnerReference) error {
ctx, span := traceutil.Start(ctx, "kubeadmcontrolplane.Reconciler.ensureCertificatesOwnerRef")
defer span.End()
for _, c := range certificates {
if c.Secret == nil {
continue
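
SetupWithManager now hands Build a reconciler wrapped by traceutil.Reconciler, so every Reconcile invocation runs inside its own root span. The wrapper's implementation is added elsewhere in the PR; inferring its signature from the call site above, a minimal sketch could look like this (package layout, tracer name, and span attributes are assumptions):

```go
package trace

import (
	"context"

	"go.opentelemetry.io/otel/attribute"
	oteltrace "go.opentelemetry.io/otel/trace"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

// reconciler decorates an existing reconcile.Reconciler so that every
// Reconcile call runs inside its own span.
type reconciler struct {
	inner      reconcile.Reconciler
	tracer     oteltrace.Tracer
	controller string
	kind       string
}

// Reconciler wraps r with tracing from the given provider. Sketch only; the
// helper added by the PR may differ in signature and attributes.
func Reconciler(r reconcile.Reconciler, tp oteltrace.TracerProvider, controller, kind string) reconcile.Reconciler {
	return &reconciler{
		inner:      r,
		tracer:     tp.Tracer("sigs.k8s.io/cluster-api"),
		controller: controller,
		kind:       kind,
	}
}

func (r *reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	ctx, span := r.tracer.Start(ctx, r.controller+".Reconcile",
		oteltrace.WithAttributes(
			attribute.String("controller", r.controller),
			attribute.String("kind", r.kind),
			attribute.String("namespace", req.Namespace),
			attribute.String("name", req.Name),
		),
	)
	defer span.End()

	res, err := r.inner.Reconcile(ctx, req)
	if err != nil {
		span.RecordError(err)
	}
	return res, err
}
```

Because the wrapper is what gets registered with the controller, all the spans started inside reconcile, reconcileDelete, and the helpers above become children of this per-request root span.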