Skip to content

Commit 040b538

Browse files
committed
feat: support for grow and shrink
This is the first fully working design (and example) for growing the cluster based on ensemble rules! Very awesome! Signed-off-by: vsoch <[email protected]>
1 parent 9e42739 commit 040b538

File tree

8 files changed

+213
-92
lines changed

8 files changed

+213
-92
lines changed

api/v1alpha1/ensemble_types.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -102,9 +102,9 @@ func (m *Member) Size() int32 {
102102
return 0
103103
}
104104

105-
//func (e *Ensemble) RequeueAfter() time.Duration {
106-
// return time.Duration(time.Duration(e.Spec.CheckSeconds) * time.Second)
107-
//}
105+
func (e *Ensemble) ServiceName() string {
106+
return fmt.Sprintf("%s-grpc", e.Name)
107+
}
108108

109109
// Validate ensures we have data that is needed, and sets defaults if needed
110110
func (e *Ensemble) Validate() error {

controllers/ensemble/api.go

Lines changed: 63 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,9 @@ import (
2020
)
2121

2222
// getDeploymentAddress gets the address of the deployment
23-
// Note that this assumes we have one container running (and one address)
24-
func (r *EnsembleReconciler) getServiceAddress(
23+
func (r *EnsembleReconciler) getDeploymentAddress(
2524
ctx context.Context,
2625
ensemble *api.Ensemble,
27-
name string,
2826
) (string, error) {
2927

3028
// The MiniCluster service is being provided by the index 0 pod, so we can find it here.
@@ -73,6 +71,44 @@ func (r *EnsembleReconciler) getServiceAddress(
7371
return ipAddress, nil
7472
}
7573

74+
// getServiceAddress gets the service ClusterIP serving the grpc endpoint
75+
func (r *EnsembleReconciler) getServiceAddress(
76+
ctx context.Context,
77+
ensemble *api.Ensemble,
78+
) (string, error) {
79+
80+
// The MiniCluster service is being provided by the index 0 pod, so we can find it here.
81+
clientset, err := kubernetes.NewForConfig(r.RESTConfig)
82+
if err != nil {
83+
return "", err
84+
}
85+
86+
// List all services with this name (just the one!)
87+
services, err := clientset.CoreV1().Services(ensemble.Namespace).List(
88+
ctx,
89+
metav1.ListOptions{
90+
FieldSelector: "metadata.name=" + ensemble.ServiceName(),
91+
},
92+
)
93+
if err != nil {
94+
return "", err
95+
}
96+
97+
// Get the ip address of the first (only for now) pod
98+
var ipAddress string
99+
for _, svc := range services.Items {
100+
ipAddress = svc.Spec.ClusterIP
101+
break
102+
}
103+
104+
// If we don't have an ip address yet, try again later
105+
if ipAddress == "" {
106+
fmt.Println(" No grpc services found")
107+
return "", fmt.Errorf("no grpc services found, not ready yet")
108+
}
109+
return ipAddress, nil
110+
}
111+
76112
func (r *EnsembleReconciler) createServiceAccount(
77113
ctx context.Context,
78114
ensemble *api.Ensemble,
@@ -146,10 +182,11 @@ func (r *EnsembleReconciler) createRole(
146182
{
147183
APIGroups: []string{"flux-framework.org"},
148184
Resources: []string{"miniclusters"},
149-
Verbs: []string{"get", "list", "create", "update", "delete"},
185+
Verbs: []string{"get", "list", "create", "update", "delete", "patch"},
150186
},
151187
},
152188
}
189+
153190
ctrl.SetControllerReference(ensemble, role, r.Scheme)
154191
err = r.Create(ctx, role)
155192
if err != nil {
@@ -212,37 +249,42 @@ func (r *EnsembleReconciler) createRoleBinding(
212249

213250
}
214251

252+
// createService creates the service for the gRPC endpoint
253+
// This is used to expose the port to the cluster
254+
// TODO stopped here - bring up interactive and debug grpc (it worked before)
215255
func (r *EnsembleReconciler) createService(
216256
ctx context.Context,
217257
ensemble *api.Ensemble,
218258
) (ctrl.Result, error) {
219259

220-
serviceName := fmt.Sprintf("%s-grpc", ensemble.Name)
221-
222260
// First see if we already have it!
223261
svc := &corev1.Service{}
224262
err := r.Get(
225263
ctx,
226264
types.NamespacedName{
227-
Name: serviceName,
265+
Name: ensemble.ServiceName(),
228266
Namespace: ensemble.Namespace,
229267
},
230268
svc,
231269
)
232270

233-
// Deployment labels to match for service
234-
appLabels := getDeploymentLabels(ensemble)
235-
port, err := strconv.Atoi(ensemble.Spec.Sidecar.Port)
236-
if err != nil {
237-
return ctrl.Result{}, err
238-
}
239-
240271
// If we haven't found it, create it
241272
if err != nil {
242273
if errors.IsNotFound(err) {
274+
275+
// Deployment labels to match for service
276+
appLabels := getDeploymentLabels(ensemble)
277+
port, err := strconv.Atoi(ensemble.Spec.Sidecar.Port)
278+
if err != nil {
279+
return ctrl.Result{}, err
280+
}
281+
243282
svc = &corev1.Service{
244-
TypeMeta: metav1.TypeMeta{},
245-
ObjectMeta: metav1.ObjectMeta{Name: serviceName, Namespace: ensemble.Namespace},
283+
TypeMeta: metav1.TypeMeta{},
284+
ObjectMeta: metav1.ObjectMeta{
285+
Name: ensemble.ServiceName(),
286+
Namespace: ensemble.Namespace,
287+
},
246288
Spec: corev1.ServiceSpec{
247289
Ports: []corev1.ServicePort{
248290
{
@@ -271,7 +313,7 @@ func (r *EnsembleReconciler) createService(
271313
}
272314
// We already have the service, no error
273315
// and continue to next thing.
274-
return ctrl.Result{}, nil
316+
return ctrl.Result{Requeue: true}, nil
275317
}
276318

277319
// ensureEnsembleService creates the deployment to run the ensemble service
@@ -389,6 +431,7 @@ func (r *EnsembleReconciler) newEnsembleDeployment(ensemble *api.Ensemble) (*app
389431
command := []string{
390432
"ensemble-server",
391433
"start",
434+
"--kubernetes",
392435
"--host", "0.0.0.0",
393436
"--port", ensemble.Spec.Sidecar.Port,
394437
"--workers", workers,
@@ -411,7 +454,9 @@ func (r *EnsembleReconciler) newEnsembleDeployment(ensemble *api.Ensemble) (*app
411454
Labels: appLabels,
412455
},
413456
Spec: corev1.PodSpec{
414-
Subdomain: ensemble.Name,
457+
458+
// This needs to match the service name
459+
Subdomain: ensemble.ServiceName(),
415460
ServiceAccountName: ensemble.Name,
416461
Containers: []corev1.Container{
417462
{

controllers/ensemble/configmap.go

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ var (
3939
// ensureEnsembleConfig gets the entrypoint config map, creating it if it is not found
4040
func (r *EnsembleReconciler) ensureEnsembleConfig(
4141
ctx context.Context,
42+
name string,
4243
ensemble *api.Ensemble,
4344
member *api.Member,
4445
) (ctrl.Result, error) {
@@ -49,7 +50,7 @@ func (r *EnsembleReconciler) ensureEnsembleConfig(
4950
err := r.Get(
5051
ctx,
5152
types.NamespacedName{
52-
Name: ensemble.Name,
53+
Name: name,
5354
Namespace: ensemble.Namespace,
5455
},
5556
existing,
@@ -61,7 +62,7 @@ func (r *EnsembleReconciler) ensureEnsembleConfig(
6162
if errors.IsNotFound(err) {
6263

6364
// Finally create the config map
64-
cm := r.createConfigMap(ensemble, member)
65+
cm := r.createConfigMap(ensemble, member, name)
6566
r.Log.Info("✨ Creating Ensemble YAML ✨")
6667
err = r.Create(ctx, cm)
6768
if err != nil {
@@ -80,15 +81,19 @@ func (r *EnsembleReconciler) ensureEnsembleConfig(
8081
}
8182

8283
// createConfigMap generates a config map with some kind of data
83-
func (r *EnsembleReconciler) createConfigMap(ensemble *api.Ensemble, member *api.Member) *corev1.ConfigMap {
84+
func (r *EnsembleReconciler) createConfigMap(
85+
ensemble *api.Ensemble,
86+
member *api.Member,
87+
name string,
88+
) *corev1.ConfigMap {
8489

8590
data := map[string]string{
8691
ensembleYamlName: member.Ensemble,
8792
}
8893
cm := &corev1.ConfigMap{
8994
TypeMeta: metav1.TypeMeta{},
9095
ObjectMeta: metav1.ObjectMeta{
91-
Name: ensemble.Name,
96+
Name: name,
9297
Namespace: ensemble.Namespace,
9398
},
9499
Data: data,

controllers/ensemble/ensemble_controller.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -122,15 +122,16 @@ func (r *EnsembleReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
122122
// This indicates the ensemble member is a MiniCluster
123123
if !reflect.DeepEqual(member.MiniCluster, minicluster.MiniClusterSpec{}) {
124124

125+
// Name is the ensemble name + index
126+
name := fmt.Sprintf("%s-%d", ensemble.Name, i)
127+
125128
// Create the config map volume (the ensemble.yaml)
126129
// for the MiniCluster to run as the entrypoint
127-
result, err := r.ensureEnsembleConfig(ctx, &ensemble, &member)
130+
result, err := r.ensureEnsembleConfig(ctx, name, &ensemble, &member)
128131
if err != nil {
129132
return result, err
130133
}
131134

132-
// Name is the index + ensemble name
133-
name := fmt.Sprintf("%s-%d", ensemble.Name, i)
134135
result, err = r.ensureMiniClusterEnsemble(ctx, name, &ensemble, &member)
135136
if err != nil {
136137
return result, err

controllers/ensemble/minicluster.go

Lines changed: 5 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ func (r *EnsembleReconciler) ensureMiniClusterEnsemble(
5252

5353
// We first need the address of the grpc service
5454
// if this fails, we try again - it might not be ready
55-
ipAddress, err := r.getServiceAddress(ctx, ensemble, name)
55+
ipAddress, err := r.getServiceAddress(ctx, ensemble)
5656
if err != nil {
5757
return ctrl.Result{Requeue: true}, err
5858
}
@@ -96,45 +96,6 @@ func (r *EnsembleReconciler) getExistingMiniCluster(
9696
return existing, err
9797
}
9898

99-
// updateMiniCluster size gets its current size from the status and updated
100-
// if it is valid
101-
func (r *EnsembleReconciler) updateMiniClusterSize(
102-
ctx context.Context,
103-
ensemble *api.Ensemble,
104-
scale int32,
105-
name string,
106-
) (ctrl.Result, error) {
107-
108-
mc, err := r.getExistingMiniCluster(ctx, name, ensemble)
109-
110-
// Check the size against what we have
111-
size := mc.Spec.Size
112-
113-
// We can only scale if we are left with at least one node
114-
// If we want to scale to 0, this should be a termination event
115-
newSize := size + scale
116-
if newSize < 1 {
117-
fmt.Printf(" Ignoring scaling event, new size %d is < 1\n", newSize)
118-
return ctrl.Result{}, err
119-
}
120-
if newSize <= mc.Spec.MaxSize {
121-
fmt.Printf(" Updating size from %d to %d\n", size, newSize)
122-
mc.Spec.Size = newSize
123-
124-
// TODO: this will trigger reconcile. Can we set the time?
125-
err = r.Update(ctx, mc)
126-
if err != nil {
127-
return ctrl.Result{}, err
128-
}
129-
130-
} else {
131-
fmt.Printf(" Ignoring scaling event %d to %d, outside allowed boundary\n", size, newSize)
132-
}
133-
134-
// Check again in the allotted time
135-
return ctrl.Result{}, err
136-
}
137-
13899
// newMiniCluster creates a new ensemble minicluster
139100
func (r *EnsembleReconciler) newMiniCluster(
140101
name string,
@@ -159,11 +120,11 @@ func (r *EnsembleReconciler) newMiniCluster(
159120
// Add the config map as a volume to the main container
160121
container := spec.Spec.Containers[0]
161122
volume := minicluster.ContainerVolume{
162-
ConfigMapName: ensemble.Name,
123+
ConfigMapName: name,
163124
Path: "/ensemble-entrypoint",
164125
Items: items,
165126
}
166-
container.Volumes = map[string]minicluster.ContainerVolume{ensemble.Name: volume}
127+
container.Volumes = map[string]minicluster.ContainerVolume{name: volume}
167128
container.RunFlux = true
168129
container.Launcher = true
169130

@@ -177,10 +138,10 @@ func (r *EnsembleReconciler) newMiniCluster(
177138
// Note that we aren't creating a headless service so that the different members are isolated.
178139
// Otherwise they would all be on the same service address, which might get ugly.
179140
ensembleYamlPath := filepath.Join(ensembleYamlDirName, ensembleYamlName)
180-
prefix := "ensemble run --executor minicluster --host"
141+
prefix := "ensemble run --kubernetes --executor minicluster --host"
181142
container.Command = fmt.Sprintf("%s %s --port %s --name %s %s",
182143
prefix, host,
183-
ensemble.Spec.Sidecar.Port, ensemble.Name,
144+
ensemble.Spec.Sidecar.Port, name,
184145
ensembleYamlPath,
185146
)
186147
spec.Spec.Containers[0] = container

docs/getting_started/custom-resource-definition.md

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -129,8 +129,7 @@ spec:
129129
#### Sidecar
130130
131131
The sidecar is where the gRPC service (deployment) runs alongside the members. You can customize options related
132-
to this deployment, although you likely don't need to. I find this useful for development (e.g., using a development container
133-
and asking to pull always). These are the options available:
132+
to this deployment, although you likely don't need to. I find this useful for development (e.g., using a development container and asking to pull always). These are the options available:
134133
135134
136135
```yaml
@@ -163,7 +162,6 @@ Members is a list of members to add to your ensemble. In the future this could s
163162
but for now we are focusing on Flux Operator MiniCluster, which has a nice setup to allow for a sidecar container
164163
to monitor the Flux queue, doing everything from submitting jobs to reporting status. This is a list, so you
165164
could have two MiniCluster types, for example, that have different resources. For each member, you can define the following:
166-
167165
##### Ensemble
168166
169167
The ensemble section is a text chunk that should coincide with the ensemble.yaml that is described by ensemble-python. It will create a config map that is mapped as a volume to run the ensemble.
@@ -178,3 +176,15 @@ start the MiniCluster in interactive mode.
178176
179177
Note that for sidecar images, we provide automated builds for two versions of each of rocky and ubuntu.
180178
You can find them [here](https://github.com/converged-computing/ensemble-operator/pkgs/container/ensemble-operator-api).
179+
180+
##### Branch
181+
182+
If you want to test a development branch of ensemble-python, you can specify it alongside your minicluster / ensemble.
183+
For example:
184+
185+
```yaml
186+
# Install ensemble python from this branch instead of pip (for development)
187+
- branch: add-support-minicluster-autoscale
188+
minicluster:
189+
...
190+
```

0 commit comments

Comments
 (0)