diff --git a/CHANGELOG.md b/CHANGELOG.md
index 0497f782..ea28a017 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -7,6 +7,7 @@ All notable changes to this project will be documented in this file.
 ### Added
 
 - Add rolling upgrade support for upgrades between NiFi 2 versions ([#771]).
+- Add listener support for NiFi ([#784]).
 - Adds new telemetry CLI arguments and environment variables ([#782]).
   - Use `--file-log-max-files` (or `FILE_LOG_MAX_FILES`) to limit the number of log files kept.
   - Use `--file-log-rotation-period` (or `FILE_LOG_ROTATION_PERIOD`) to configure the frequency of rotation.
@@ -40,6 +41,7 @@ All notable changes to this project will be documented in this file.
 [#776]: https://github.com/stackabletech/nifi-operator/pull/776
 [#782]: https://github.com/stackabletech/nifi-operator/pull/782
+[#784]: https://github.com/stackabletech/nifi-operator/pull/784
 [#787]: https://github.com/stackabletech/nifi-operator/pull/787
 [#789]: https://github.com/stackabletech/nifi-operator/pull/789
 
 ## [25.3.0] - 2025-03-21
diff --git a/deploy/helm/nifi-operator/crds/crds.yaml b/deploy/helm/nifi-operator/crds/crds.yaml
index 7511f55a..cef05d1d 100644
--- a/deploy/helm/nifi-operator/crds/crds.yaml
+++ b/deploy/helm/nifi-operator/crds/crds.yaml
@@ -94,20 +94,6 @@ spec:
               description: Allow all proxy hosts by turning off host header validation. See
               type: boolean
             type: object
-          listenerClass:
-            default: cluster-internal
-            description: |-
-              This field controls which type of Service the Operator creates for this NifiCluster:
-
-              * cluster-internal: Use a ClusterIP service
-
-              * external-unstable: Use a NodePort service
-
-              This is a temporary solution with the goal to keep yaml manifests forward compatible. In the future, this setting will control which [ListenerClass](https://docs.stackable.tech/home/nightly/listener-operator/listenerclass.html) will be used to expose the service, and ListenerClass names will stay the same, allowing for a non-breaking change.
-            enum:
-              - cluster-internal
-              - external-unstable
-            type: string
           sensitiveProperties:
             description: These settings configure the encryption of sensitive properties in NiFi processors. NiFi supports encrypting sensitive properties in processors as they are written to disk. You can configure the encryption algorithm and the key to use. You can also let the operator generate an encryption key for you.
             properties:
@@ -274,6 +260,14 @@ spec:
                 description: Time period Pods have to gracefully shut down, e.g. `30m`, `1h` or `2d`. Consult the operator documentation for details.
                 nullable: true
                 type: string
+              listenerClass:
+                description: This field controls which [ListenerClass](https://docs.stackable.tech/home/nightly/listener-operator/listenerclass.html) is used to expose the webserver.
+                enum:
+                  - cluster-internal
+                  - external-unstable
+                  - external-stable
+                nullable: true
+                type: string
               logging:
                 default:
                   containers: {}
@@ -761,6 +755,14 @@ spec:
                 description: Time period Pods have to gracefully shut down, e.g. `30m`, `1h` or `2d`. Consult the operator documentation for details.
                 nullable: true
                 type: string
+              listenerClass:
+                description: This field controls which [ListenerClass](https://docs.stackable.tech/home/nightly/listener-operator/listenerclass.html) is used to expose the webserver.
+                enum:
+                  - cluster-internal
+                  - external-unstable
+                  - external-stable
+                nullable: true
+                type: string
               logging:
                 default:
                   containers: {}
diff --git a/deploy/helm/nifi-operator/templates/roles.yaml b/deploy/helm/nifi-operator/templates/roles.yaml
index 53b90c67..69b4c9c9 100644
--- a/deploy/helm/nifi-operator/templates/roles.yaml
+++ b/deploy/helm/nifi-operator/templates/roles.yaml
@@ -90,6 +90,12 @@ rules:
     verbs:
       - create
       - patch
+  - apiGroups:
+      - listeners.stackable.tech
+    resources:
+      - listeners
+    verbs:
+      - get
   - apiGroups:
       - {{ include "operator.name" . }}.stackable.tech
     resources:
diff --git a/docs/modules/nifi/pages/usage_guide/listenerclass.adoc b/docs/modules/nifi/pages/usage_guide/listenerclass.adoc
index 8ff77c87..55dcbbb1 100644
--- a/docs/modules/nifi/pages/usage_guide/listenerclass.adoc
+++ b/docs/modules/nifi/pages/usage_guide/listenerclass.adoc
@@ -1,19 +1,30 @@
 = Service exposition with ListenerClasses
-:description: Configure Apache NiFi service exposure with cluster-internal or external-unstable listener classes.
+:description: Configure Apache NiFi service exposure with the cluster-internal, external-unstable or external-stable listener classes.
 
-Apache NiFi offers a web UI and an API.
-The Operator deploys a service called `<name>` (where `<name>` is the name of the NifiCluster) through which NiFi can be reached.
-
-This service can have either the `cluster-internal` or `external-unstable` type.
-`external-stable` is not supported for NiFi at the moment.
-Read more about the types in the xref:concepts:service-exposition.adoc[service exposition] documentation at platform level.
-
-This is how the listener class is configured:
+The operator deploys a xref:listener-operator:listener.adoc[Listener] for each node Pod.
+By default the Listener is only accessible from within the Kubernetes cluster; this can be changed by setting `.spec.nodes.config.listenerClass`:
 
 [source,yaml]
 ----
 spec:
-  clusterConfig:
-    listenerClass: cluster-internal # <1>
+  nodes:
+    config:
+      listenerClass: external-unstable # <1>
 ----
-<1> The default `cluster-internal` setting.
+<1> Specify one of `cluster-internal` (the default), `external-unstable` or `external-stable`.
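+
+The listener class can also be set per role group, overriding the role-level setting (the role group name `external` below is just an example):
+
+[source,yaml]
+----
+spec:
+  nodes:
+    config:
+      listenerClass: cluster-internal # <1>
+    roleGroups:
+      external:
+        config:
+          listenerClass: external-unstable # <2>
+----
+<1> The role-level default.
+<2> Overrides the role-level setting for the `external` role group only.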
diff --git a/rust/operator-binary/src/controller.rs b/rust/operator-binary/src/controller.rs index cca71af6..4d2baa79 100644 --- a/rust/operator-binary/src/controller.rs +++ b/rust/operator-binary/src/controller.rs @@ -20,8 +20,11 @@ use stackable_operator::{ configmap::ConfigMapBuilder, meta::ObjectMetaBuilder, pod::{ - PodBuilder, container::ContainerBuilder, resources::ResourceRequirementsBuilder, - security::PodSecurityContextBuilder, volume::SecretFormat, + PodBuilder, + container::ContainerBuilder, + resources::ResourceRequirementsBuilder, + security::PodSecurityContextBuilder, + volume::{ListenerOperatorVolumeSourceBuilderError, SecretFormat}, }, }, client::Client, @@ -37,15 +40,14 @@ use stackable_operator::{ apps::v1::{StatefulSet, StatefulSetSpec, StatefulSetUpdateStrategy}, core::v1::{ ConfigMap, ConfigMapKeySelector, ConfigMapVolumeSource, EmptyDirVolumeSource, - EnvVar, EnvVarSource, Node, ObjectFieldSelector, Probe, SecretVolumeSource, - Service, ServicePort, ServiceSpec, TCPSocketAction, Volume, + EnvVar, EnvVarSource, ObjectFieldSelector, Probe, SecretVolumeSource, Service, + ServicePort, ServiceSpec, TCPSocketAction, Volume, }, }, apimachinery::pkg::{apis::meta::v1::LabelSelector, util::intstr::IntOrString}, }, kube::{ Resource, ResourceExt, - api::ListParams, core::{DeserializeGuard, error_boundary}, runtime::{controller::Action, reflector::ObjectRef}, }, @@ -81,11 +83,11 @@ use crate::{ build_nifi_properties, build_state_management_xml, validated_product_config, }, crd::{ - APP_NAME, BALANCE_PORT, BALANCE_PORT_NAME, Container, CurrentlySupportedListenerClasses, - HTTPS_PORT, HTTPS_PORT_NAME, METRICS_PORT, METRICS_PORT_NAME, NifiConfig, + APP_NAME, BALANCE_PORT, BALANCE_PORT_NAME, Container, HTTPS_PORT, HTTPS_PORT_NAME, + LISTENER_VOLUME_DIR, LISTENER_VOLUME_NAME, METRICS_PORT, METRICS_PORT_NAME, NifiConfig, NifiConfigFragment, NifiRole, NifiStatus, PROTOCOL_PORT, PROTOCOL_PORT_NAME, - STACKABLE_LOG_CONFIG_DIR, STACKABLE_LOG_DIR, authentication::AuthenticationClassResolved, - v1alpha1, + STACKABLE_LOG_CONFIG_DIR, STACKABLE_LOG_DIR, SupportedListenerClasses, + authentication::AuthenticationClassResolved, v1alpha1, }, operations::{ graceful_shutdown::add_graceful_shutdown_config, @@ -353,6 +355,11 @@ pub enum Error { #[snafu(display("Failed to determine the state of the version upgrade procedure"))] ClusterVersionUpdateState { source: upgrade::Error }, + + #[snafu(display("failed to build listener volume"))] + BuildListenerVolume { + source: ListenerOperatorVolumeSourceBuilderError, + }, } type Result = std::result::Result; @@ -375,11 +382,6 @@ pub async fn reconcile_nifi( .context(InvalidNifiClusterSnafu)?; let client = &ctx.client; - let namespace = &nifi - .metadata - .namespace - .clone() - .with_context(|| ObjectHasNoNamespaceSnafu {})?; let resolved_product_image: ResolvedProductImage = nifi .spec @@ -441,20 +443,6 @@ pub async fn reconcile_nifi( .map(Cow::Borrowed) .unwrap_or_default(); - let node_role_service = build_node_role_service(nifi, &resolved_product_image)?; - cluster_resources - .add(client, node_role_service) - .await - .context(ApplyRoleServiceSnafu)?; - - // This is read back to obtain the hosts that we later need to fill in the proxy_hosts variable - let updated_role_service = client - .get::(&nifi.name_any(), namespace) - .await - .with_context(|_| MissingServiceSnafu { - obj_ref: ObjectRef::new(&nifi.name_any()).within(namespace), - })?; - let nifi_authentication_config = NifiAuthenticationConfig::try_from( AuthenticationClassResolved::from(nifi, 
client) .await @@ -511,7 +499,7 @@ pub async fn reconcile_nifi( // Since we cannot predict which of the addresses a user might decide to use we will simply // add all of them to the setting for now. // For more information see - let proxy_hosts = get_proxy_hosts(client, nifi, &updated_role_service).await?; + let proxy_hosts = get_proxy_hosts(client, nifi, &merged_config).await?; let rg_configmap = build_node_rolegroup_config_map( nifi, @@ -648,52 +636,6 @@ pub async fn reconcile_nifi( Ok(Action::await_change()) } -/// The node-role service is the primary endpoint that should be used by clients that do not -/// perform internal load balancing including targets outside of the cluster. -pub fn build_node_role_service( - nifi: &v1alpha1::NifiCluster, - resolved_product_image: &ResolvedProductImage, -) -> Result { - let role_name = NifiRole::Node.to_string(); - - let role_svc_name = nifi.node_role_service_name(); - Ok(Service { - metadata: ObjectMetaBuilder::new() - .name_and_namespace(nifi) - .name(&role_svc_name) - .ownerreference_from_resource(nifi, None, Some(true)) - .context(ObjectMissingMetadataForOwnerRefSnafu)? - .with_recommended_labels(build_recommended_labels( - nifi, - &resolved_product_image.app_version_label, - &role_name, - "global", - )) - .context(MetadataBuildSnafu)? - .build(), - spec: Some(ServiceSpec { - type_: Some(nifi.spec.cluster_config.listener_class.k8s_service_type()), - ports: Some(vec![ServicePort { - name: Some(HTTPS_PORT_NAME.to_string()), - port: HTTPS_PORT.into(), - protocol: Some("TCP".to_string()), - ..ServicePort::default() - }]), - selector: Some( - Labels::role_selector(nifi, APP_NAME, &role_name) - .context(LabelBuildSnafu)? - .into(), - ), - external_traffic_policy: match nifi.spec.cluster_config.listener_class { - CurrentlySupportedListenerClasses::ClusterInternal => None, - CurrentlySupportedListenerClasses::ExternalUnstable => Some("Local".to_string()), - }, - ..ServiceSpec::default() - }), - status: None, - }) -} - /// The rolegroup [`ConfigMap`] configures the rolegroup based on the configuration given by the administrator #[allow(clippy::too_many_arguments)] async fn build_node_rolegroup_config_map( @@ -870,7 +812,7 @@ async fn build_node_rolegroup_statefulset( role: &Role, rolegroup_config: &HashMap>, merged_config: &NifiConfig, - nifi_auth_config: &NifiAuthenticationConfig, + nifi_authentication_config: &NifiAuthenticationConfig, rolling_update_supported: bool, replicas: Option, sa_name: &str, @@ -922,7 +864,7 @@ async fn build_node_rolegroup_statefulset( &nifi.spec.cluster_config.zookeeper_config_map_name, )); - if let NifiAuthenticationConfig::Oidc { oidc, .. } = nifi_auth_config { + if let NifiAuthenticationConfig::Oidc { oidc, .. 
} = nifi_authentication_config { env_vars.extend(AuthenticationProvider::client_credentials_env_var_mounts( oidc.client_credentials_secret_ref.clone(), )); @@ -973,7 +915,22 @@ async fn build_node_rolegroup_statefulset( ]); // This commands needs to go first, as they might set env variables needed by the templating - prepare_args.extend_from_slice(nifi_auth_config.get_additional_container_args().as_slice()); + prepare_args.extend_from_slice( + nifi_authentication_config + .get_additional_container_args() + .as_slice(), + ); + + if merged_config.listener_class == SupportedListenerClasses::ExternalUnstable { + prepare_args.extend(vec![ + "export LISTENER_DEFAULT_ADDRESS=$(cat /stackable/listener/default-address/address)" + .to_string(), + ]); + prepare_args.extend(vec![ + "export LISTENER_DEFAULT_PORT_HTTPS=$(cat /stackable/listener/default-address/ports/https)" + .to_string(), + ]); + } prepare_args.extend(vec![ "echo Templating config files".to_string(), @@ -1038,6 +995,8 @@ async fn build_node_rolegroup_statefulset( .context(AddVolumeMountSnafu)? .add_volume_mount(TRUSTSTORE_VOLUME_NAME, STACKABLE_SERVER_TLS_DIR) .context(AddVolumeMountSnafu)? + .add_volume_mount(LISTENER_VOLUME_NAME, LISTENER_VOLUME_DIR) + .context(AddVolumeMountSnafu)? .resources( ResourceRequirementsBuilder::new() .with_cpu_request("500m") @@ -1114,6 +1073,8 @@ async fn build_node_rolegroup_statefulset( .context(AddVolumeMountSnafu)? .add_volume_mount(TRUSTSTORE_VOLUME_NAME, STACKABLE_SERVER_TLS_DIR) .context(AddVolumeMountSnafu)? + .add_volume_mount(LISTENER_VOLUME_NAME, LISTENER_VOLUME_DIR) + .context(AddVolumeMountSnafu)? .add_container_port(HTTPS_PORT_NAME, HTTPS_PORT.into()) .add_container_port(PROTOCOL_PORT_NAME, PROTOCOL_PORT.into()) .add_container_port(BALANCE_PORT_NAME, BALANCE_PORT.into()) @@ -1140,6 +1101,28 @@ async fn build_node_rolegroup_statefulset( .resources(merged_config.resources.clone().into()); let mut pod_builder = PodBuilder::new(); + + let recommended_object_labels = build_recommended_labels( + nifi, + &resolved_product_image.app_version_label, + &rolegroup_ref.role, + &rolegroup_ref.role_group, + ); + let recommended_labels = + Labels::recommended(recommended_object_labels.clone()).context(LabelBuildSnafu)?; + + let listener_class = &merged_config.listener_class; + // all listeners will use ephemeral volumes as they can/should + // be removed when the pods are *terminated* (ephemeral PVCs will + // survive re-starts) + pod_builder + .add_listener_volume_by_listener_class( + LISTENER_VOLUME_NAME, + &listener_class.to_string(), + &recommended_labels, + ) + .context(AddVolumeSnafu)?; + add_graceful_shutdown_config(merged_config, &mut pod_builder).context(GracefulShutdownSnafu)?; // Add user configured extra volumes if any are specified @@ -1222,7 +1205,7 @@ async fn build_node_rolegroup_statefulset( } } - nifi_auth_config + nifi_authentication_config .add_volumes_and_mounts(&mut pod_builder, vec![ &mut container_prepare, container_nifi, @@ -1346,12 +1329,7 @@ async fn build_node_rolegroup_statefulset( .name(rolegroup_ref.object_name()) .ownerreference_from_resource(nifi, None, Some(true)) .context(ObjectMissingMetadataForOwnerRefSnafu)? - .with_recommended_labels(build_recommended_labels( - nifi, - &resolved_product_image.app_version_label, - &rolegroup_ref.role, - &rolegroup_ref.role_group, - )) + .with_recommended_labels(recommended_object_labels) .context(MetadataBuildSnafu)? 
.build(), spec: Some(StatefulSetSpec { @@ -1408,25 +1386,6 @@ async fn build_node_rolegroup_statefulset( }) } -fn external_node_port(nifi_service: &Service) -> Result { - let external_ports = nifi_service - .spec - .as_ref() - .with_context(|| ObjectHasNoSpecSnafu {})? - .ports - .as_ref() - .with_context(|| ExternalPortSnafu {})? - .iter() - .filter(|p| p.name == Some(HTTPS_PORT_NAME.to_string())) - .collect::>(); - - let port = external_ports - .first() - .with_context(|| ExternalPortSnafu {})?; - - port.node_port.with_context(|| ExternalPortSnafu {}) -} - /// Used for the `ZOOKEEPER_HOSTS` and `ZOOKEEPER_CHROOT` env vars. fn zookeeper_env_var(name: &str, configmap_name: &str) -> EnvVar { EnvVar { @@ -1446,7 +1405,7 @@ fn zookeeper_env_var(name: &str, configmap_name: &str) -> EnvVar { async fn get_proxy_hosts( client: &Client, nifi: &v1alpha1::NifiCluster, - nifi_service: &Service, + merged_config: &NifiConfig, ) -> Result { let host_header_check = nifi.spec.cluster_config.host_header_check.clone(); @@ -1478,32 +1437,10 @@ async fn get_proxy_hosts( proxy_hosts_set.extend(host_header_check.additional_allowed_hosts); - // In case NodePort is used add them as well - if nifi.spec.cluster_config.listener_class - == CurrentlySupportedListenerClasses::ExternalUnstable - { - let external_port = external_node_port(nifi_service)?; - - let cluster_nodes = client - .list::(&(), &ListParams::default()) - .await - .with_context(|_| MissingNodesSnafu { - obj_ref: ObjectRef::from_obj(nifi), - })?; - - // We need the addresses of all nodes to add these to the NiFi proxy setting - // Since there is no real convention about how to label these addresses we will simply - // take all published addresses for now to be on the safe side. - proxy_hosts_set.extend( - cluster_nodes - .into_iter() - .flat_map(|node| { - node.status - .unwrap_or_default() - .addresses - .unwrap_or_default() - }) - .map(|node_address| format!("{}:{external_port}", node_address.address)), + // If NodePort is used inject the address and port from the listener volume in the prepare container + if merged_config.listener_class == SupportedListenerClasses::ExternalUnstable { + proxy_hosts_set.insert( + "${env:LISTENER_DEFAULT_ADDRESS}:${env:LISTENER_DEFAULT_PORT_HTTPS}".to_string(), ); } diff --git a/rust/operator-binary/src/crd/mod.rs b/rust/operator-binary/src/crd/mod.rs index c3227e77..270b5902 100644 --- a/rust/operator-binary/src/crd/mod.rs +++ b/rust/operator-binary/src/crd/mod.rs @@ -1,8 +1,9 @@ pub mod affinity; pub mod authentication; pub mod tls; +pub mod utils; -use std::collections::BTreeMap; +use std::collections::{BTreeMap, HashMap}; use affinity::get_affinity; use serde::{Deserialize, Serialize}; @@ -20,7 +21,7 @@ use stackable_operator::{ }, config::{ fragment::{self, Fragment, ValidationError}, - merge::Merge, + merge::{Atomic, Merge}, }, k8s_openapi::{ api::core::v1::{PodTemplateSpec, Volume}, @@ -42,6 +43,7 @@ use stackable_operator::{ }; use strum::Display; use tls::NifiTls; +use utils::{PodRef, get_listener_podrefs}; pub const APP_NAME: &str = "nifi"; @@ -54,6 +56,8 @@ pub const BALANCE_PORT: u16 = 6243; pub const METRICS_PORT_NAME: &str = "metrics"; pub const METRICS_PORT: u16 = 8081; +pub const LISTENER_VOLUME_NAME: &str = "listener"; +pub const LISTENER_VOLUME_DIR: &str = "/stackable/listener"; pub const STACKABLE_LOG_DIR: &str = "/stackable/log"; pub const STACKABLE_LOG_CONFIG_DIR: &str = "/stackable/log_config"; @@ -74,6 +78,12 @@ pub enum Error { #[snafu(display("fragment validation failure"))] 
     FragmentValidationFailure { source: ValidationError },
+
+    #[snafu(display("object has no nodes defined"))]
+    NoNodesDefined,
+
+    #[snafu(display("listener podrefs could not be resolved"))]
+    ListenerPodRef { source: utils::Error },
 }
 
 #[versioned(version(name = "v1alpha1"))]
@@ -152,18 +162,6 @@ pub mod versioned {
         #[schemars(schema_with = "raw_object_list_schema")]
         pub extra_volumes: Vec<Volume>,
 
-        /// This field controls which type of Service the Operator creates for this NifiCluster:
-        ///
-        /// * cluster-internal: Use a ClusterIP service
-        ///
-        /// * external-unstable: Use a NodePort service
-        ///
-        /// This is a temporary solution with the goal to keep yaml manifests forward compatible.
-        /// In the future, this setting will control which [ListenerClass](DOCS_BASE_URL_PLACEHOLDER/listener-operator/listenerclass.html)
-        /// will be used to expose the service, and ListenerClass names will stay the same, allowing for a non-breaking change.
-        #[serde(default)]
-        pub listener_class: CurrentlySupportedListenerClasses,
-
         // Docs are on the struct
         #[serde(default)]
         pub create_reporting_task_job: CreateReportingTaskJob,
@@ -236,6 +234,11 @@ impl v1alpha1::NifiCluster {
                     namespace: ns.clone(),
                     role_group_service_name: rolegroup_ref.object_name(),
                     pod_name: format!("{}-{}", rolegroup_ref.object_name(), i),
+                    ports: HashMap::from([
+                        (HTTPS_PORT_NAME.to_owned(), HTTPS_PORT),
+                        (METRICS_PORT_NAME.to_owned(), METRICS_PORT),
+                    ]),
+                    fqdn_override: None,
                 })
             }))
     }
@@ -270,6 +273,103 @@ impl v1alpha1::NifiCluster {
         tracing::debug!("Merged config: {:?}", conf_rolegroup);
         fragment::validate(conf_rolegroup).context(FragmentValidationFailureSnafu)
     }
+
+    pub fn merged_listener_class(
+        &self,
+        rolegroup_name: &String,
+    ) -> Result<Option<SupportedListenerClasses>, Error> {
+        let listener_class_default = Some(SupportedListenerClasses::ClusterInternal);
+        let role = self.spec.nodes.as_ref().context(NoNodesDefinedSnafu)?;
+
+        let mut listener_class_role = role.config.config.listener_class.to_owned();
+        let mut listener_class_rolegroup = role
+            .role_groups
+            .get(rolegroup_name)
+            .map(|rg| rg.config.config.listener_class.clone())
+            .unwrap_or_default();
+        listener_class_role.merge(&listener_class_default);
+        listener_class_rolegroup.merge(&listener_class_role);
+        tracing::debug!("Merged listener-class: {:?}", listener_class_rolegroup);
+        Ok(listener_class_rolegroup)
+    }
+
+    pub fn rolegroup_ref(
+        &self,
+        role_name: impl Into<String>,
+        group_name: impl Into<String>,
+    ) -> RoleGroupRef<v1alpha1::NifiCluster> {
+        RoleGroupRef {
+            cluster: ObjectRef::from_obj(self),
+            role: role_name.into(),
+            role_group: group_name.into(),
+        }
+    }
+
+    pub fn rolegroup_ref_and_replicas(&self) -> Vec<(RoleGroupRef<v1alpha1::NifiCluster>, u16)> {
+        self.spec
+            .nodes
+            .iter()
+            .flat_map(|role| &role.role_groups)
+            // Order rolegroups consistently, to avoid spurious downstream rewrites
+            .collect::<BTreeMap<_, _>>()
+            .into_iter()
+            .filter(|(rolegroup_name, _)| self.resolved_listener_class_discoverable(rolegroup_name))
+            .map(|(rolegroup_name, role_group)| {
+                (
+                    self.rolegroup_ref(NifiRole::Node.to_string(), rolegroup_name),
+                    role_group.replicas.unwrap_or_default(),
+                )
+            })
+            .collect()
+    }
+
+    fn resolved_listener_class_discoverable(&self, rolegroup_name: &&String) -> bool {
+        if let Ok(Some(listener_class)) = self.merged_listener_class(rolegroup_name) {
+            listener_class.discoverable()
+        } else {
+            // merged_listener_class returns an error if one of the roles was not found:
+            // all roles are mandatory for NiFi to work, but a missing role will by
+            // definition not have a listener class
+            false
+        }
+    }
+
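+    /// Builds one [`PodRef`] per expected pod, based on the role group
+    /// replica counts. Only role groups whose merged listener class is
+    /// discoverable (i.e. reachable from outside the cluster) are included.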
+    pub fn pod_refs(&self) -> Result<Vec<PodRef>, Error> {
+        let ns = self.metadata.namespace.clone().context(NoNamespaceSnafu)?;
+        let rolegroup_ref_and_replicas = self.rolegroup_ref_and_replicas();
+
+        Ok(rolegroup_ref_and_replicas
+            .iter()
+            .flat_map(|(rolegroup_ref, replicas)| {
+                let ns = ns.clone();
+                (0..*replicas).map(move |i| PodRef {
+                    namespace: ns.clone(),
+                    role_group_service_name: rolegroup_ref.object_name(),
+                    pod_name: format!("{}-{}", rolegroup_ref.object_name(), i),
+                    ports: HashMap::from([
+                        (HTTPS_PORT_NAME.to_owned(), HTTPS_PORT),
+                        (METRICS_PORT_NAME.to_owned(), METRICS_PORT),
+                    ]),
+                    fqdn_override: None,
+                })
+            })
+            .collect())
+    }
+
+    pub async fn listener_refs(
+        &self,
+        client: &stackable_operator::client::Client,
+    ) -> Result<Vec<PodRef>, Error> {
+        let pod_refs = self.pod_refs()?;
+
+        tracing::debug!("Pod references: {:#?}", pod_refs);
+        get_listener_podrefs(client, pod_refs, LISTENER_VOLUME_NAME)
+            .await
+            .context(ListenerPodRefSnafu)
+    }
 }
 
 #[derive(Clone, Debug, Deserialize, JsonSchema, PartialEq, Serialize)]
@@ -297,22 +394,31 @@ pub fn default_allow_all() -> bool {
     true
 }
 
-// TODO: Temporary solution until listener-operator is finished
 #[derive(Clone, Debug, Default, Display, Deserialize, Eq, JsonSchema, PartialEq, Serialize)]
 #[serde(rename_all = "PascalCase")]
-pub enum CurrentlySupportedListenerClasses {
+pub enum SupportedListenerClasses {
     #[default]
     #[serde(rename = "cluster-internal")]
+    #[strum(serialize = "cluster-internal")]
     ClusterInternal,
+
     #[serde(rename = "external-unstable")]
+    #[strum(serialize = "external-unstable")]
     ExternalUnstable,
+
+    #[serde(rename = "external-stable")]
+    #[strum(serialize = "external-stable")]
+    ExternalStable,
 }
 
-impl CurrentlySupportedListenerClasses {
-    pub fn k8s_service_type(&self) -> String {
+impl Atomic for SupportedListenerClasses {}
+
+impl SupportedListenerClasses {
+    pub fn discoverable(&self) -> bool {
         match self {
-            CurrentlySupportedListenerClasses::ClusterInternal => "ClusterIP".to_string(),
-            CurrentlySupportedListenerClasses::ExternalUnstable => "NodePort".to_string(),
+            SupportedListenerClasses::ClusterInternal => false,
+            SupportedListenerClasses::ExternalUnstable => true,
+            SupportedListenerClasses::ExternalStable => true,
         }
     }
 }
@@ -504,6 +630,12 @@ pub struct NifiConfig {
     /// Please note that this can be shortened by the `maxCertificateLifetime` setting on the SecretClass issuing the TLS certificate.
     #[fragment_attrs(serde(default))]
     pub requested_secret_lifetime: Option<Duration>,
+
+    /// This field controls which [ListenerClass](DOCS_BASE_URL_PLACEHOLDER/listener-operator/listenerclass.html) is used to expose the webserver.
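+    ///
+    /// Defaults to `cluster-internal`; the setting can be given at role level and overridden per role group.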
+    #[serde(default)]
+    pub listener_class: SupportedListenerClasses,
 }
 
 impl NifiConfig {
@@ -553,6 +683,7 @@ impl NifiConfig {
             affinity: get_affinity(cluster_name, role),
             graceful_shutdown_timeout: Some(DEFAULT_NODE_GRACEFUL_SHUTDOWN_TIMEOUT),
             requested_secret_lifetime: Some(Self::DEFAULT_NODE_SECRET_LIFETIME),
+            listener_class: Some(SupportedListenerClasses::ClusterInternal),
         }
     }
 }
@@ -632,24 +763,3 @@ pub struct NifiStorageConfig {
     #[fragment_attrs(serde(default))]
     pub state_repo: PvcConfig,
 }
-
-/// Reference to a single `Pod` that is a component of a [`NifiCluster`]
-/// Used for service discovery.
-// TODO: this should move to operator-rs
-pub struct PodRef {
-    pub namespace: String,
-    pub role_group_service_name: String,
-    pub pod_name: String,
-}
-
-impl PodRef {
-    pub fn fqdn(&self, cluster_info: &KubernetesClusterInfo) -> String {
-        format!(
-            "{pod_name}.{service_name}.{namespace}.svc.{cluster_domain}",
-            pod_name = self.pod_name,
-            service_name = self.role_group_service_name,
-            namespace = self.namespace,
-            cluster_domain = cluster_info.cluster_domain
-        )
-    }
-}
diff --git a/rust/operator-binary/src/crd/utils.rs b/rust/operator-binary/src/crd/utils.rs
new file mode 100644
index 00000000..a54cd0bf
--- /dev/null
+++ b/rust/operator-binary/src/crd/utils.rs
@@ -0,0 +1,108 @@
+use std::{borrow::Cow, collections::HashMap, num::TryFromIntError};
+
+use futures::future::try_join_all;
+use snafu::{OptionExt, ResultExt, Snafu};
+use stackable_operator::{
+    commons::listener::Listener, k8s_openapi::api::core::v1::Pod,
+    kube::runtime::reflector::ObjectRef, utils::cluster_info::KubernetesClusterInfo,
+};
+
+#[derive(Snafu, Debug)]
+pub enum Error {
+    #[snafu(display("unable to get {listener} (for {pod})"))]
+    GetPodListener {
+        source: stackable_operator::client::Error,
+        listener: ObjectRef<Listener>,
+        pod: ObjectRef<Pod>,
+    },
+
+    #[snafu(display("{listener} (for {pod}) has no address"))]
+    PodListenerHasNoAddress {
+        listener: ObjectRef<Listener>,
+        pod: ObjectRef<Pod>,
+    },
+
+    #[snafu(display("port {port} ({port_name:?}) is out of bounds, must be within {range:?}", range = 0..=u16::MAX))]
+    PortOutOfBounds {
+        source: TryFromIntError,
+        port_name: String,
+        port: i32,
+    },
+}
+
+/// Reference to a single `Pod` that is a component of the product cluster
+///
+/// Used for service discovery.
+#[derive(Debug)]
+pub struct PodRef {
+    pub namespace: String,
+    pub role_group_service_name: String,
+    pub pod_name: String,
+    pub fqdn_override: Option<String>,
+    pub ports: HashMap<String, u16>,
+}
+
+impl PodRef {
+    pub fn fqdn(&self, cluster_info: &KubernetesClusterInfo) -> Cow<str> {
+        self.fqdn_override.as_deref().map_or_else(
+            || {
+                Cow::Owned(format!(
+                    "{pod_name}.{role_group_service_name}.{namespace}.svc.{cluster_domain}",
+                    pod_name = self.pod_name,
+                    role_group_service_name = self.role_group_service_name,
+                    namespace = self.namespace,
+                    cluster_domain = cluster_info.cluster_domain,
+                ))
+            },
+            Cow::Borrowed,
+        )
+    }
+}
+
+pub async fn get_listener_podrefs(
+    client: &stackable_operator::client::Client,
+    pod_refs: Vec<PodRef>,
+    listener_prefix: &str,
+) -> Result<Vec<PodRef>, Error> {
+    try_join_all(pod_refs.into_iter().map(|pod_ref| async {
+        // N.B. use the naming convention for ephemeral listener volumes as we
+        // have defined all listeners to be so.
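+        // The Listener object for an ephemeral listener volume is named after
+        // the pod and the volume: "<pod name>-<volume name>". With the volume
+        // name "listener", pod "test-nifi-node-default-0" gets the Listener
+        // "test-nifi-node-default-0-listener".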
+        let listener_name = format!("{}-{listener_prefix}", pod_ref.pod_name);
+        let listener_ref = || ObjectRef::<Listener>::new(&listener_name).within(&pod_ref.namespace);
+        let pod_obj_ref = || ObjectRef::<Pod>::new(&pod_ref.pod_name).within(&pod_ref.namespace);
+        let listener = client
+            .get::<Listener>(&listener_name, &pod_ref.namespace)
+            .await
+            .context(GetPodListenerSnafu {
+                listener: listener_ref(),
+                pod: pod_obj_ref(),
+            })?;
+        let listener_address = listener
+            .status
+            .and_then(|s| s.ingress_addresses?.into_iter().next())
+            .context(PodListenerHasNoAddressSnafu {
+                listener: listener_ref(),
+                pod: pod_obj_ref(),
+            })?;
+        Ok(PodRef {
+            fqdn_override: Some(listener_address.address),
+            ports: listener_address
+                .ports
+                .into_iter()
+                .map(|(port_name, port)| {
+                    let port = u16::try_from(port).context(PortOutOfBoundsSnafu {
+                        port_name: &port_name,
+                        port,
+                    })?;
+                    Ok((port_name, port))
+                })
+                .collect::<Result<HashMap<_, _>, _>>()?,
+            ..pod_ref
+        })
+    }))
+    .await
+}
diff --git a/tests/templates/kuttl/external-access/00-patch-ns.yaml.j2 b/tests/templates/kuttl/external-access/00-patch-ns.yaml.j2
new file mode 100644
index 00000000..67185acf
--- /dev/null
+++ b/tests/templates/kuttl/external-access/00-patch-ns.yaml.j2
@@ -0,0 +1,9 @@
+{% if test_scenario['values']['openshift'] == 'true' %}
+# see https://github.com/stackabletech/issues/issues/566
+---
+apiVersion: kuttl.dev/v1beta1
+kind: TestStep
+commands:
+  - script: kubectl patch namespace $NAMESPACE -p '{"metadata":{"labels":{"pod-security.kubernetes.io/enforce":"privileged"}}}'
+    timeout: 120
+{% endif %}
diff --git a/tests/templates/kuttl/external-access/00-range-limit.yaml b/tests/templates/kuttl/external-access/00-range-limit.yaml
new file mode 100644
index 00000000..8fd02210
--- /dev/null
+++ b/tests/templates/kuttl/external-access/00-range-limit.yaml
@@ -0,0 +1,11 @@
+---
+apiVersion: v1
+kind: LimitRange
+metadata:
+  name: limit-request-ratio
+spec:
+  limits:
+    - type: "Container"
+      maxLimitRequestRatio:
+        cpu: 5
+        memory: 1
diff --git a/tests/templates/kuttl/external-access/00-rbac.yaml.j2 b/tests/templates/kuttl/external-access/00-rbac.yaml.j2
new file mode 100644
index 00000000..7ee61d23
--- /dev/null
+++ b/tests/templates/kuttl/external-access/00-rbac.yaml.j2
@@ -0,0 +1,29 @@
+---
+kind: Role
+apiVersion: rbac.authorization.k8s.io/v1
+metadata:
+  name: test-role
+rules:
+{% if test_scenario['values']['openshift'] == "true" %}
+  - apiGroups: ["security.openshift.io"]
+    resources: ["securitycontextconstraints"]
+    resourceNames: ["privileged"]
+    verbs: ["use"]
+{% endif %}
+---
+apiVersion: v1
+kind: ServiceAccount
+metadata:
+  name: test-sa
+---
+kind: RoleBinding
+apiVersion: rbac.authorization.k8s.io/v1
+metadata:
+  name: test-rb
+subjects:
+  - kind: ServiceAccount
+    name: test-sa
+roleRef:
+  kind: Role
+  name: test-role
+  apiGroup: rbac.authorization.k8s.io
diff --git a/tests/templates/kuttl/external-access/10-assert.yaml.j2 b/tests/templates/kuttl/external-access/10-assert.yaml.j2
new file mode 100644
index 00000000..50b1d4c3
--- /dev/null
+++ b/tests/templates/kuttl/external-access/10-assert.yaml.j2
@@ -0,0 +1,10 @@
+---
+apiVersion: kuttl.dev/v1beta1
+kind: TestAssert
+{% if lookup('env', 'VECTOR_AGGREGATOR') %}
+---
+apiVersion: v1
+kind: ConfigMap
+metadata:
+  name: vector-aggregator-discovery
+{% endif %}
diff --git a/tests/templates/kuttl/external-access/10-install-vector-aggregator-discovery-configmap.yaml.j2 b/tests/templates/kuttl/external-access/10-install-vector-aggregator-discovery-configmap.yaml.j2
new file mode 100644
index 00000000..2d6a0df5
--- /dev/null
+++ b/tests/templates/kuttl/external-access/10-install-vector-aggregator-discovery-configmap.yaml.j2
@@ -0,0 +1,9 @@
+{% if lookup('env', 'VECTOR_AGGREGATOR') %}
+---
+apiVersion: v1
+kind: ConfigMap
+metadata:
+  name: vector-aggregator-discovery
+data:
+  ADDRESS: {{ lookup('env', 'VECTOR_AGGREGATOR') }}
+{% endif %}
diff --git a/tests/templates/kuttl/external-access/20-assert.yaml b/tests/templates/kuttl/external-access/20-assert.yaml
new file mode 100644
index 00000000..e0766c49
--- /dev/null
+++ b/tests/templates/kuttl/external-access/20-assert.yaml
@@ -0,0 +1,12 @@
+---
+apiVersion: kuttl.dev/v1beta1
+kind: TestAssert
+timeout: 600
+---
+apiVersion: apps/v1
+kind: StatefulSet
+metadata:
+  name: test-zk-server-default
+status:
+  readyReplicas: 1
+  replicas: 1
diff --git a/tests/templates/kuttl/external-access/20-install-zk.yaml.j2 b/tests/templates/kuttl/external-access/20-install-zk.yaml.j2
new file mode 100644
index 00000000..2f1558cc
--- /dev/null
+++ b/tests/templates/kuttl/external-access/20-install-zk.yaml.j2
@@ -0,0 +1,29 @@
+---
+apiVersion: zookeeper.stackable.tech/v1alpha1
+kind: ZookeeperCluster
+metadata:
+  name: test-zk
+spec:
+  image:
+    productVersion: "{{ test_scenario['values']['zookeeper-latest'] }}"
+    pullPolicy: IfNotPresent
+  clusterConfig:
+    listenerClass: cluster-internal
+{% if lookup('env', 'VECTOR_AGGREGATOR') %}
+    vectorAggregatorConfigMapName: vector-aggregator-discovery
+{% endif %}
+  servers:
+    config:
+      logging:
+        enableVectorAgent: {{ lookup('env', 'VECTOR_AGGREGATOR') | length > 0 }}
+    roleGroups:
+      default:
+        replicas: 1
+---
+apiVersion: zookeeper.stackable.tech/v1alpha1
+kind: ZookeeperZnode
+metadata:
+  name: test-nifi-znode
+spec:
+  clusterRef:
+    name: test-zk
diff --git a/tests/templates/kuttl/external-access/30-assert.yaml b/tests/templates/kuttl/external-access/30-assert.yaml
new file mode 100644
index 00000000..32f2b7cc
--- /dev/null
+++ b/tests/templates/kuttl/external-access/30-assert.yaml
@@ -0,0 +1,25 @@
+---
+apiVersion: kuttl.dev/v1beta1
+kind: TestAssert
+timeout: 1200
+---
+apiVersion: apps/v1
+kind: StatefulSet
+metadata:
+  name: test-nifi-node-default
+spec:
+  template:
+    spec:
+      terminationGracePeriodSeconds: 300
+status:
+  readyReplicas: 2
+  replicas: 2
+---
+apiVersion: policy/v1
+kind: PodDisruptionBudget
+metadata:
+  name: test-nifi-node
+status:
+  expectedPods: 4
+  currentHealthy: 4
+  disruptionsAllowed: 1
diff --git a/tests/templates/kuttl/external-access/30-install-nifi.yaml.j2 b/tests/templates/kuttl/external-access/30-install-nifi.yaml.j2
new file mode 100644
index 00000000..5838a12f
--- /dev/null
+++ b/tests/templates/kuttl/external-access/30-install-nifi.yaml.j2
@@ -0,0 +1,94 @@
+---
+apiVersion: authentication.stackable.tech/v1alpha1
+kind: AuthenticationClass
+metadata:
+  name: simple-nifi-users
+spec:
+  provider:
+    static:
+      userCredentialsSecret:
+        name: simple-nifi-admin-credentials
+---
+apiVersion: v1
+kind: Secret
+metadata:
+  name: simple-nifi-admin-credentials
+stringData:
+  admin: >
+    passwordWithSpecialCharacter\@<&>"'
+---
+apiVersion: v1
+kind: Secret
+metadata:
+  name: nifi-sensitive-property-key
+stringData:
+  nifiSensitivePropsKey: mYsUp3rS3cr3tk3y
+---
+apiVersion: nifi.stackable.tech/v1alpha1
+kind: NifiCluster
+metadata:
+  name: test-nifi
+spec:
+  image:
+{% if test_scenario['values']['nifi'].find(",") > 0 %}
+    custom: "{{ test_scenario['values']['nifi'].split(',')[1] }}"
+    productVersion: "{{ test_scenario['values']['nifi'].split(',')[0] }}"
+{% else %}
+    custom: null
+    productVersion: "{{ test_scenario['values']['nifi'] }}"
+{% endif %}
+    pullPolicy: IfNotPresent
+  clusterConfig:
+    zookeeperConfigMapName: test-nifi-znode
+    authentication:
+      - authenticationClass: simple-nifi-users
+    hostHeaderCheck:
+      allowAll: false
+    sensitiveProperties:
+      keySecret: nifi-sensitive-property-key
+{% if lookup('env', 'VECTOR_AGGREGATOR') %}
+    vectorAggregatorConfigMapName: vector-aggregator-discovery
+{% endif %}
+  nodes:
+    envOverrides:
+      COMMON_VAR: role-value # overridden by role group below
+      ROLE_VAR: role-value # only defined here at role level
+    configOverrides:
+      "nifi.properties":
+        "nifi.diagnostics.on.shutdown.enabled": "true"
+        "nifi.diagnostics.on.shutdown.verbose": "false"
+    config:
+      logging:
+        enableVectorAgent: {{ lookup('env', 'VECTOR_AGGREGATOR') | length > 0 }}
+    roleGroups:
+      default:
+        replicas: 2
+        envOverrides:
+          COMMON_VAR: group-value # overrides role value
+          GROUP_VAR: group-value # only defined here at group level
+        configOverrides:
+          "nifi.properties":
+            "nifi.diagnostics.on.shutdown.enabled": "false"
+            "nifi.diagnostics.on.shutdown.max.filecount": "20"
+      external-unstable:
+        replicas: 1
+        config:
+          listenerClass: external-unstable
+        envOverrides:
+          COMMON_VAR: group-value # overrides role value
+          GROUP_VAR: group-value # only defined here at group level
+        configOverrides:
+          "nifi.properties":
+            "nifi.diagnostics.on.shutdown.enabled": "false"
+            "nifi.diagnostics.on.shutdown.max.filecount": "20"
+      cluster-internal:
+        replicas: 1
+        config:
+          listenerClass: cluster-internal
+        envOverrides:
+          COMMON_VAR: group-value # overrides role value
+          GROUP_VAR: group-value # only defined here at group level
+        configOverrides:
+          "nifi.properties":
+            "nifi.diagnostics.on.shutdown.enabled": "false"
+            "nifi.diagnostics.on.shutdown.max.filecount": "20"
diff --git a/tests/test-definition.yaml b/tests/test-definition.yaml
index d83e0751..b253b67e 100644
--- a/tests/test-definition.yaml
+++ b/tests/test-definition.yaml
@@ -90,6 +90,11 @@ tests:
       - zookeeper-latest
       - oidc-use-tls
       - openshift
+  - name: external-access
+    dimensions:
+      - nifi
+      - zookeeper-latest
+      - openshift
 suites:
   - name: nightly
     patch:
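For reference, the reconciliation above resolves external addresses by reading the `Listener` objects that the listener-operator creates for the ephemeral listener volumes. A rough sketch of such an object as consumed by `get_listener_podrefs` (names, address and port values are illustrative only, not taken from this change):

[source,yaml]
----
apiVersion: listeners.stackable.tech/v1alpha1
kind: Listener
metadata:
  name: test-nifi-node-default-0-listener # <pod name>-<listener volume name>
status:
  ingressAddresses: # read by get_listener_podrefs
    - address: 172.18.0.3 # becomes the PodRef fqdn_override
      ports:
        https: 31443 # becomes the "https" entry in PodRef.ports
----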