Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions api/v1alpha1/minicluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,18 @@ type MiniClusterSpec struct {
// +optional
Interactive bool `json:"interactive"`

// Specify the name of the cluster service
// +kubebuilder:default="flux-service"
// +default="flux-service"
// +optional
ServiceName string `json:"serviceName,omitempty"`

// Specify the name of the job selector.
// You would want to do this if you intend to connect miniclusters
// Multiple jobs can be selected under a single services
// +optional
JobSelector string `json:"jobSelector"`

// Flux options for the broker, shared across cluster
// +optional
Flux FluxSpec `json:"flux"`
Expand Down Expand Up @@ -318,6 +330,16 @@ type FluxSpec struct {
// +optional
Wrap string `json:"wrap,omitempty"`

// Connect to this job in the same namespace (akin to BootServer but within cluster)
// +optional
Connection string `json:"connection,omitempty"`

// Additional number of nodes to allow from external boot-server
// This currently only allows local MiniCluster but could be
// extended to any general URI
// +optional
ConnectionSize int `json:"connectionSize,omitempty"`

// Single user executable to provide to flux start
// +kubebuilder:default="5s"
// +default="5s"
Expand Down Expand Up @@ -585,6 +607,17 @@ func (f *MiniCluster) Validate() bool {
f.Spec.Flux.InstallRoot = "/usr"
}

// If connected host or size is defined, both must be defined!
if f.Spec.Flux.Connection != "" && f.Spec.Flux.ConnectionSize <= 0 {
fmt.Printf("😥️ A Connection is defined by no nodes. Please define the size.\n")
return false
}
// Inverse of that...
if f.Spec.Flux.Connection == "" && f.Spec.Flux.ConnectionSize > 0 {
fmt.Printf("😥️ A Connection size is defined, but no MiniCluster name. Please define the flux->connection.\n")
return false
}

// If the MaxSize isn't set, ensure it's equal to the size
if f.Spec.MaxSize == 0 {
f.Spec.MaxSize = f.Spec.Size
Expand Down
19 changes: 19 additions & 0 deletions api/v1alpha1/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,15 @@
"type": "string",
"default": "5s"
},
"connection": {
"description": "Connect to this job in the same namespace (akin to BootServer but within cluster)",
"type": "string"
},
"connectionSize": {
"description": "Additional number of nodes to allow from external boot-server This currently only allows local MiniCluster but could be extended to any general URI",
"type": "integer",
"format": "int32"
},
"installRoot": {
"description": "Install root location",
"type": "string",
Expand Down Expand Up @@ -480,6 +489,11 @@
"default": ""
}
},
"jobSelector": {
"description": "Specify the name of the job selector. You would want to do this if you intend to connect miniclusters Multiple jobs can be selected under a single services",
"type": "string",
"default": ""
},
"logging": {
"description": "Logging modes determine the output you see in the job log",
"default": {},
Expand All @@ -495,6 +509,11 @@
"default": {},
"$ref": "#/definitions/PodSpec"
},
"serviceName": {
"description": "Specify the name of the cluster service",
"type": "string",
"default": "flux-service"
},
"services": {
"description": "Services are one or more service containers to bring up alongside the MiniCluster.",
"type": "array",
Expand Down
30 changes: 30 additions & 0 deletions api/v1alpha1/zz_generated.openapi.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 18 additions & 0 deletions chart/templates/minicluster-crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,15 @@ spec:
default: 5s
description: Single user executable to provide to flux start
type: string
connection:
description: Connect to this job in the same namespace (akin to
BootServer but within cluster)
type: string
connectionSize:
description: Additional number of nodes to allow from external boot-server
This currently only allows local MiniCluster but could be extended
to any general URI
type: integer
installRoot:
default: /usr
description: Install root location
Expand Down Expand Up @@ -302,6 +311,11 @@ spec:
type: string
description: Labels for the job
type: object
jobSelector:
description: Specify the name of the job selector. You would want to
do this if you intend to connect miniclusters Multiple jobs can be
selected under a single services
type: string
logging:
description: Logging modes determine the output you see in the job log
properties:
Expand Down Expand Up @@ -362,6 +376,10 @@ spec:
description: Service account name for the pod
type: string
type: object
serviceName:
default: flux-service
description: Specify the name of the cluster service
type: string
services:
description: Services are one or more service containers to bring up
alongside the MiniCluster.
Expand Down
18 changes: 18 additions & 0 deletions config/crd/bases/flux-framework.org_miniclusters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,15 @@ spec:
default: 5s
description: Single user executable to provide to flux start
type: string
connection:
description: Connect to this job in the same namespace (akin to
BootServer but within cluster)
type: string
connectionSize:
description: Additional number of nodes to allow from external
boot-server This currently only allows local MiniCluster but
could be extended to any general URI
type: integer
installRoot:
default: /usr
description: Install root location
Expand Down Expand Up @@ -304,6 +313,11 @@ spec:
type: string
description: Labels for the job
type: object
jobSelector:
description: Specify the name of the job selector. You would want
to do this if you intend to connect miniclusters Multiple jobs can
be selected under a single services
type: string
logging:
description: Logging modes determine the output you see in the job
log
Expand Down Expand Up @@ -365,6 +379,10 @@ spec:
description: Service account name for the pod
type: string
type: object
serviceName:
default: flux-service
description: Specify the name of the cluster service
type: string
services:
description: Services are one or more service containers to bring
up alongside the MiniCluster.
Expand Down
4 changes: 2 additions & 2 deletions controllers/flux/extra.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,12 +247,12 @@ func (r *MiniClusterReconciler) createMiniClusterIngress(
}
err := ctrl.SetControllerReference(cluster, ingress, r.Scheme)
if err != nil {
r.log.Error(err, "🔴 Create ingress", "Service", restfulServiceName)
r.log.Error(err, "🔴 Create ingress", "Service", cluster.Spec.ServiceName)
return err
}
err = r.Client.Create(ctx, ingress)
if err != nil {
r.log.Error(err, "🔴 Create ingress", "Service", restfulServiceName)
r.log.Error(err, "🔴 Create ingress", "Service", cluster.Spec.ServiceName)
return err
}
return nil
Expand Down
10 changes: 7 additions & 3 deletions controllers/flux/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,13 @@ func (r *MiniClusterReconciler) newMiniClusterJob(
setAsFQDN := false

// We add the selector for the horizontal auto scaler, if active
// We can't use the job-name selector, as this would include the
// external sidecar service!
// We can't use the job-name or job-group selector, as this
// would include the external sidecar service!
podLabels["hpa-selector"] = cluster.Name
podLabels["job-group"] = cluster.Name
if cluster.Spec.JobSelector != "" {
podLabels["job-group"] = cluster.Spec.JobSelector
}

// This is an indexed-job
job := &batchv1.Job{
Expand Down Expand Up @@ -61,7 +65,7 @@ func (r *MiniClusterReconciler) newMiniClusterJob(
},
Spec: corev1.PodSpec{
// matches the service
Subdomain: restfulServiceName,
Subdomain: cluster.Spec.ServiceName,
SetHostnameAsFQDN: &setAsFQDN,
Volumes: getVolumes(cluster),
RestartPolicy: corev1.RestartPolicyOnFailure,
Expand Down
44 changes: 36 additions & 8 deletions controllers/flux/minicluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,15 @@ func (r *MiniClusterReconciler) ensureMiniCluster(
}
}

// Create headless service for the MiniCluster
selector := map[string]string{"job-name": cluster.Name}
result, err = r.exposeServices(ctx, cluster, restfulServiceName, selector)
// Create headless service for the MiniCluster. But...
// If we are expecting to connect to another one, use the service name
selector := map[string]string{"job-group": cluster.Name}
if cluster.Spec.JobSelector != "" {
selector["job-group"] = cluster.Spec.JobSelector
}
r.log.Info("MiniCluster", "ServiceSelector", selector["job-group"])

result, err = r.exposeServices(ctx, cluster, cluster.Spec.ServiceName, selector)
if err != nil {
return result, err
}
Expand Down Expand Up @@ -377,19 +383,32 @@ func (r *MiniClusterReconciler) getConfigMap(
}

// generateHostlist for a specific size given the cluster namespace and a size
func generateHostlist(cluster *api.MiniCluster, size int) string {
func generateHostlist(name string, size int) string {

// The hosts are generated through the max size, so the cluster can expand
return fmt.Sprintf("%s-[%s]", cluster.Name, generateRange(size))
return fmt.Sprintf("%s-[%s]", name, generateRange(size))
}

// generateFluxConfig creates the broker.toml file used to boostrap flux
func generateFluxConfig(cluster *api.MiniCluster) string {

// The hosts are generated through the max size, so the cluster can expand
fqdn := fmt.Sprintf("%s.%s.svc.cluster.local", restfulServiceName, cluster.Namespace)
fqdn := fmt.Sprintf("%s.%s.svc.cluster.local", cluster.Spec.ServiceName, cluster.Namespace)
hosts := fmt.Sprintf("[%s]", generateRange(int(cluster.Spec.MaxSize)))
fluxConfig := fmt.Sprintf(brokerConfigTemplate, cluster.FluxInstallRoot(), fqdn, cluster.Name, hosts)

// If we are connecting another cluster, we need to register the hostnames here
connectedHosts := ""
if cluster.Spec.Flux.Connection != "" {
connectedRange := fmt.Sprintf("[%s]", generateRange(int(cluster.Spec.Flux.ConnectionSize)))
connectedHosts = fmt.Sprintf(",%s-%s", cluster.Spec.Flux.Connection, connectedRange)
}

// TODO: clean this up and make into template
fluxConfig := fmt.Sprintf(brokerConfigTemplate,
cluster.FluxInstallRoot(),
fqdn, cluster.Name, hosts,
connectedHosts)

fluxConfig += "\n" + brokerArchiveSection
return fluxConfig
}
Expand All @@ -416,7 +435,14 @@ func generateWaitScript(cluster *api.MiniCluster, containerIndex int) (string, e
mainHost := fmt.Sprintf("%s-0", cluster.Name)

// The resources size must also match the max size in the cluster
hosts := generateHostlist(cluster, int(cluster.Spec.MaxSize))
hosts := generateHostlist(cluster.Name, int(cluster.Spec.MaxSize))

// And if we are adding external connected hosts (from a different MiniCluster)
// We need to account for them too.
if cluster.Spec.Flux.Connection != "" {
connectedHosts := generateHostlist(cluster.Spec.Flux.Connection, int(cluster.Spec.Flux.ConnectionSize))
hosts = fmt.Sprintf("%s,%s", hosts, connectedHosts)
}

// Ensure our requested users each each have a password
for i, user := range cluster.Spec.Users {
Expand Down Expand Up @@ -445,9 +471,11 @@ func generateWaitScript(cluster *api.MiniCluster, containerIndex int) (string, e

// The token uuid is the same across images
wt := WaitTemplate{
TotalSize: cluster.Spec.MaxSize + int32(cluster.Spec.Flux.ConnectionSize),
FluxUser: getFluxUser(cluster.Spec.FluxRestful.Username),
FluxToken: getRandomToken(cluster.Spec.FluxRestful.Token),
MainHost: mainHost,
Namespace: cluster.Namespace,
Hosts: hosts,
Cores: cores,
Container: container,
Expand Down
9 changes: 6 additions & 3 deletions controllers/flux/pods.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,11 @@ func (r *MiniClusterReconciler) newServicePod(
podLabels := r.getPodLabels(cluster)
podServiceName := cluster.Name + "-services"

// service selector?
podLabels["job-name"] = cluster.Name
// service selector
podLabels["job-group"] = cluster.Name
if cluster.Spec.JobSelector != "" {
podLabels["job-group"] = cluster.Spec.JobSelector
}

// This is an indexed-job
pod := &corev1.Pod{
Expand All @@ -122,7 +125,7 @@ func (r *MiniClusterReconciler) newServicePod(
},
Spec: corev1.PodSpec{
// This is the headless service name
Subdomain: restfulServiceName,
Subdomain: cluster.Spec.ServiceName,
Hostname: podServiceName,
SetHostnameAsFQDN: &setAsFQDN,
RestartPolicy: corev1.RestartPolicyOnFailure,
Expand Down
2 changes: 1 addition & 1 deletion controllers/flux/scale.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
)

// addScaleSelector populates the fields the horizontal auto scaler needs.
// Meaning: job-name is used to select pods to check. The size variable
// Meaning: hpa-selector is used to select pods to check. The size variable
// is updated later.
func (r *MiniClusterReconciler) addScaleSelector(
ctx context.Context,
Expand Down
3 changes: 1 addition & 2 deletions controllers/flux/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@ import (
)

var (
restfulServiceName = "flux-service"
servicePort = 5000
servicePort = 5000
)

// exposeService will expose services - one for the port 5000 forward, and the other for job networking (headless)
Expand Down
Loading