From bb8dd2d229d6901eecf266ac1b984793519f9575 Mon Sep 17 00:00:00 2001 From: Leon Date: Wed, 20 Aug 2025 15:46:50 +0800 Subject: [PATCH] init --- .../transformer_component_workload_ops.go | 20 ++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/controllers/apps/component/transformer_component_workload_ops.go b/controllers/apps/component/transformer_component_workload_ops.go index 547552a2abb..00c7be0dbff 100644 --- a/controllers/apps/component/transformer_component_workload_ops.go +++ b/controllers/apps/component/transformer_component_workload_ops.go @@ -264,6 +264,9 @@ func (r *componentWorkloadOps) buildDataReplicationTask() error { // replicas to be created newReplicas := r.desiredCompPodNameSet.Difference(r.runningItsPodNameSet).UnsortedList() + if len(newReplicas) == 0 { + return nil + } // replicas in provisioning that the data has not been loaded provisioningReplicas, err := component.GetReplicasStatusFunc(r.protoITS, func(s component.ReplicaStatus) bool { @@ -273,17 +276,13 @@ func (r *componentWorkloadOps) buildDataReplicationTask() error { return err } - replicas := append(slices.Clone(newReplicas), provisioningReplicas...) - if len(replicas) == 0 { - return nil - } - // the source replica - source, err := r.sourceReplica(r.synthesizeComp.LifecycleActions.DataDump) + source, err := r.sourceReplica(r.synthesizeComp.LifecycleActions.DataDump, provisioningReplicas) if err != nil { return err } + replicas := append(slices.Clone(newReplicas), provisioningReplicas...) parameters, err := component.NewReplicaTask(r.synthesizeComp.FullCompName, r.synthesizeComp.Generation, source, replicas) if err != nil { return err @@ -298,16 +297,23 @@ func (r *componentWorkloadOps) buildDataReplicationTask() error { return createOrUpdateEnvConfigMap(transCtx, r.dag, parameters) } -func (r *componentWorkloadOps) sourceReplica(dataDump *appsv1.Action) (*corev1.Pod, error) { +func (r *componentWorkloadOps) sourceReplica(dataDump *appsv1.Action, provisioningReplicas []string) (*corev1.Pod, error) { pods, err := component.ListOwnedPods(r.transCtx.Context, r.cli, r.synthesizeComp.Namespace, r.synthesizeComp.ClusterName, r.synthesizeComp.Name) if err != nil { return nil, err } + if len(provisioningReplicas) > 0 { + // exclude provisioning replicas + pods = slices.DeleteFunc(pods, func(pod *corev1.Pod) bool { + return slices.Contains(provisioningReplicas, pod.Name) + }) + } if len(pods) > 0 { if len(dataDump.Exec.TargetPodSelector) == 0 { dataDump.Exec.TargetPodSelector = appsv1.AnyReplica } + // TODO: idempotence for provisioning replicas pods, err = lifecycle.SelectTargetPods(pods, nil, dataDump) if err != nil { return nil, err