diff --git a/CHANGELOG.md b/CHANGELOG.md index 0870176c..317e75ab 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,10 +9,16 @@ All notable changes to this project will be documented in this file. - Various documentation of the CRD ([#537]). - Document support for Apache Iceberg extensions ([#556]). - Helm: support labels in values.yaml ([#560]). +- Support for NiFi `1.25.0` ([#571]). + +### Changed + +- A service for a single NiFi node is created for the reporting task to avoid JWT issues ([#571]). [#537]: https://github.com/stackabletech/nifi-operator/pull/537 [#556]: https://github.com/stackabletech/nifi-operator/pull/556 [#560]: https://github.com/stackabletech/nifi-operator/pull/560 +[#571]: https://github.com/stackabletech/nifi-operator/pull/571 ## [23.11.0] - 2023-11-24 diff --git a/docs/modules/nifi/examples/getting_started/getting_started.sh b/docs/modules/nifi/examples/getting_started/getting_started.sh index 3b56ea48..ec92556d 100755 --- a/docs/modules/nifi/examples/getting_started/getting_started.sh +++ b/docs/modules/nifi/examples/getting_started/getting_started.sh @@ -143,7 +143,7 @@ metadata: name: simple-nifi spec: image: - productVersion: 1.23.2 + productVersion: 1.25.0 clusterConfig: authentication: - authenticationClass: simple-nifi-users diff --git a/docs/modules/nifi/examples/getting_started/getting_started.sh.j2 b/docs/modules/nifi/examples/getting_started/getting_started.sh.j2 index 889a6abf..b4a5d298 100755 --- a/docs/modules/nifi/examples/getting_started/getting_started.sh.j2 +++ b/docs/modules/nifi/examples/getting_started/getting_started.sh.j2 @@ -143,7 +143,7 @@ metadata: name: simple-nifi spec: image: - productVersion: 1.23.2 + productVersion: 1.25.0 clusterConfig: authentication: - authenticationClass: simple-nifi-users diff --git a/docs/modules/nifi/pages/usage_guide/custom_processors.adoc b/docs/modules/nifi/pages/usage_guide/custom_processors.adoc index b51065dd..4afe2005 100644 --- a/docs/modules/nifi/pages/usage_guide/custom_processors.adoc +++ b/docs/modules/nifi/pages/usage_guide/custom_processors.adoc @@ -18,7 +18,7 @@ A simple Dockerfile would look like show in the following listing. [source,Dockerfile] ---- -FROM docker.stackable.tech/stackable/nifi:1.23.2-stackable0.0.0-dev +FROM docker.stackable.tech/stackable/nifi:1.25.0-stackable0.0.0-dev COPY /path/to/your/nar.file /stackable/nifi/lib/ ---- @@ -28,8 +28,8 @@ You then need to make this image available to your Kubernetes cluster and specif ---- spec: image: - productVersion: 1.23.2 - custom: "docker.company.org/nifi:1.23.2-customprocessor" + productVersion: 1.25.0 + custom: "docker.company.org/nifi:1.25.0-customprocessor" ---- == Using the Official Image @@ -96,7 +96,7 @@ metadata: name: simple-nifi spec: image: - productVersion: 1.23.2 + productVersion: 1.25.0 clusterConfig: authentication: - authenticationClass: simple-nifi-admin-user diff --git a/docs/modules/nifi/pages/usage_guide/index.adoc b/docs/modules/nifi/pages/usage_guide/index.adoc index 3bdb966b..932dc1a5 100644 --- a/docs/modules/nifi/pages/usage_guide/index.adoc +++ b/docs/modules/nifi/pages/usage_guide/index.adoc @@ -13,7 +13,7 @@ metadata: name: simple-nifi spec: image: - productVersion: 1.23.2 + productVersion: 1.25.0 clusterConfig: zookeeperConfigMapName: simple-nifi-znode # <1> authentication: # <2> diff --git a/docs/modules/nifi/pages/usage_guide/updating.adoc b/docs/modules/nifi/pages/usage_guide/updating.adoc index 9ce78c0e..c21697a4 100644 --- a/docs/modules/nifi/pages/usage_guide/updating.adoc +++ b/docs/modules/nifi/pages/usage_guide/updating.adoc @@ -1,7 +1,7 @@ = Updating NiFi Updating (or downgrading for that matter) the deployed version of NiFi is as simple as changing the version stated in the CRD. -Continuing the example above, to change the deployed version from `1.23.2` to `1.21.0` you'd simply deploy the following CRD. +Continuing the example above, to change the deployed version from `1.25.0` to `1.21.0` you'd simply deploy the following CRD. [source,yaml] ---- diff --git a/docs/modules/nifi/partials/supported-versions.adoc b/docs/modules/nifi/partials/supported-versions.adoc index 9e1a7d6d..803308bf 100644 --- a/docs/modules/nifi/partials/supported-versions.adoc +++ b/docs/modules/nifi/partials/supported-versions.adoc @@ -2,5 +2,6 @@ // This is a separate file, since it is used by both the direct NiFi-Operator documentation, and the overarching // Stackable Platform documentation. -- 1.23.2 -- 1.21.0 (deprecated) +- 1.25.0 +- 1.23.2 (deprecated) +- 1.21.0 (LTS) diff --git a/examples/simple-nifi-cluster.yaml b/examples/simple-nifi-cluster.yaml index 3980f430..85644c49 100644 --- a/examples/simple-nifi-cluster.yaml +++ b/examples/simple-nifi-cluster.yaml @@ -7,7 +7,7 @@ metadata: name: simple-zk spec: image: - productVersion: 3.8.3 + productVersion: 3.9.1 servers: roleGroups: default: @@ -47,7 +47,7 @@ metadata: name: simple-nifi spec: image: - productVersion: 1.23.2 + productVersion: 1.25.0 clusterConfig: authentication: - authenticationClass: simple-nifi-admin-user diff --git a/rust/operator-binary/src/config.rs b/rust/operator-binary/src/config.rs index 6dbf1434..cc506185 100644 --- a/rust/operator-binary/src/config.rs +++ b/rust/operator-binary/src/config.rs @@ -21,8 +21,8 @@ use stackable_operator::{ use strum::{Display, EnumIter}; use crate::{ - authentication::{STACKABLE_SERVER_TLS_DIR, STACKABLE_TLS_STORE_PASSWORD}, operations::graceful_shutdown::graceful_shutdown_config_properties, + security::authentication::{STACKABLE_SERVER_TLS_DIR, STACKABLE_TLS_STORE_PASSWORD}, }; pub const NIFI_CONFIG_DIRECTORY: &str = "/stackable/nifi/conf"; diff --git a/rust/operator-binary/src/controller.rs b/rust/operator-binary/src/controller.rs index 96b7532b..50b23909 100644 --- a/rust/operator-binary/src/controller.rs +++ b/rust/operator-binary/src/controller.rs @@ -12,7 +12,6 @@ use product_config::{ writer::{to_java_properties_string, PropertiesWriterError}, ProductConfigManager, }; -use rand::{distributions::Alphanumeric, Rng}; use snafu::{OptionExt, ResultExt, Snafu}; use stackable_nifi_crd::{ authentication::resolve_authentication_classes, Container, CurrentlySupportedListenerClasses, @@ -25,7 +24,6 @@ use stackable_operator::{ builder::{ resources::ResourceRequirementsBuilder, ConfigMapBuilder, ContainerBuilder, ObjectMetaBuilder, PodBuilder, PodSecurityContextBuilder, SecretFormat, - SecretOperatorVolumeSourceBuilder, VolumeBuilder, }, client::Client, cluster_resources::{ClusterResourceApplyStrategy, ClusterResources}, @@ -34,10 +32,9 @@ use stackable_operator::{ k8s_openapi::{ api::{ apps::v1::{StatefulSet, StatefulSetSpec, StatefulSetUpdateStrategy}, - batch::v1::{Job, JobSpec}, core::v1::{ ConfigMap, ConfigMapKeySelector, ConfigMapVolumeSource, EmptyDirVolumeSource, - EnvVar, EnvVarSource, Node, ObjectFieldSelector, Probe, Secret, SecretVolumeSource, + EnvVar, EnvVarSource, Node, ObjectFieldSelector, Probe, SecretVolumeSource, Service, ServicePort, ServiceSpec, TCPSocketAction, Volume, }, }, @@ -71,11 +68,6 @@ use strum::{EnumDiscriminants, IntoStaticStr}; use tracing::Instrument; use crate::{ - authentication::{ - NifiAuthenticationConfig, AUTHORIZERS_XML_FILE_NAME, - LOGIN_IDENTITY_PROVIDERS_XML_FILE_NAME, STACKABLE_ADMIN_USER_NAME, - STACKABLE_SERVER_TLS_DIR, STACKABLE_TLS_STORE_PASSWORD, - }, config::{ self, build_bootstrap_conf, build_nifi_properties, build_state_management_xml, validated_product_config, NifiRepository, JVM_SECURITY_PROPERTIES_FILE, @@ -83,18 +75,22 @@ use crate::{ }, operations::{graceful_shutdown::add_graceful_shutdown_config, pdb::add_pdbs}, product_logging::{extend_role_group_config_map, resolve_vector_aggregator_address}, + reporting_task::{self, build_reporting_task, build_reporting_task_service_name}, + security::{ + authentication::{ + NifiAuthenticationConfig, AUTHORIZERS_XML_FILE_NAME, + LOGIN_IDENTITY_PROVIDERS_XML_FILE_NAME, STACKABLE_SERVER_TLS_DIR, + STACKABLE_TLS_STORE_PASSWORD, + }, + build_tls_volume, check_or_generate_sensitive_key, + tls::{KEYSTORE_NIFI_CONTAINER_MOUNT, KEYSTORE_VOLUME_NAME, TRUSTSTORE_VOLUME_NAME}, + }, OPERATOR_NAME, }; pub const NIFI_CONTROLLER_NAME: &str = "nificluster"; pub const NIFI_UID: i64 = 1000; -const KEYSTORE_VOLUME_NAME: &str = "keystore"; -const KEYSTORE_NIFI_CONTAINER_MOUNT: &str = "/stackable/keystore"; -const KEYSTORE_REPORTING_TASK_MOUNT: &str = "/stackable/cert"; - -const TRUSTSTORE_VOLUME_NAME: &str = "truststore"; - const DOCKER_IMAGE_BASE_NAME: &str = "nifi"; pub struct Ctx { @@ -135,18 +131,6 @@ pub enum Error { source: stackable_operator::error::Error, }, - #[snafu(display("failed to check sensitive property key secret"))] - SensitiveKeySecret { - source: stackable_operator::error::Error, - }, - - #[snafu(display( - "sensitive key secret [{}/{}] is missing, but auto generation is disabled", - name, - namespace - ))] - SensitiveKeySecretMissing { name: String, namespace: String }, - #[snafu(display("failed to apply Service for {}", rolegroup))] ApplyRoleGroupService { source: stackable_operator::error::Error, @@ -174,6 +158,11 @@ pub enum Error { rolegroup: RoleGroupRef, }, + #[snafu(display("failed to apply create ReportingTask service"))] + ApplyCreateReportingTaskService { + source: stackable_operator::error::Error, + }, + #[snafu(display("failed to apply create ReportingTask job"))] ApplyCreateReportingTaskJob { source: stackable_operator::error::Error, @@ -269,7 +258,7 @@ pub enum Error { #[snafu(display("Invalid NiFi Authentication Configuration"))] InvalidNifiAuthenticationConfig { - source: crate::authentication::Error, + source: crate::security::authentication::Error, }, #[snafu(display("Failed to resolve NiFi Authentication Configuration"))] @@ -303,14 +292,17 @@ pub enum Error { source: stackable_operator::kvp::LabelError, }, - #[snafu(display("failed to build TLS certificate SecretClass Volume"))] - TlsCertSecretClassVolumeBuild { - source: stackable_operator::builder::SecretOperatorVolumeSourceBuilderError, - }, - #[snafu(display("failed to add Authentication Volumes and VolumeMounts"))] AddAuthVolumes { - source: crate::authentication::Error, + source: crate::security::authentication::Error, + }, + + #[snafu(display("security failure"))] + Security { source: crate::security::Error }, + + #[snafu(display("reporting task failure"))] + ReportingTask { + source: crate::reporting_task::Error, }, } @@ -344,7 +336,9 @@ pub async fn reconcile_nifi(nifi: Arc, ctx: Arc) -> Result, ctx: Arc) -> Result Result { - let nifi_name = nifi.name_any(); - let nifi_namespace: &str = &nifi.namespace().context(ObjectHasNoNamespaceSnafu)?; - let product_version = &resolved_product_image.product_version; - let nifi_connect_url = - format!("https://{nifi_name}.{nifi_namespace}.svc.cluster.local:{HTTPS_PORT}/nifi-api",); - - let (admin_username_file, admin_password_file) = - nifi_auth_config.get_user_and_password_file_paths(); - - let user_name_command = if admin_username_file.is_empty() { - // In case of the username being simple (e.g admin for SingleUser) just use it as is - format!("-u {STACKABLE_ADMIN_USER_NAME}") - } else { - // If the username is a bind dn (e.g. cn=integrationtest,ou=users,dc=example,dc=org) we have to extract the cn/dn/uid (in this case integrationtest) - format!( - "-u \"$(cat {admin_username_file} | grep -oP '((cn|dn|uid)=\\K[^,]+|.*)' | head -n 1)\"" - ) - }; - - let args = vec![ - "/stackable/python/create_nifi_reporting_task.py".to_string(), - format!("-n {nifi_connect_url}"), - user_name_command, - format!("-p \"$(cat {admin_password_file})\""), - format!("-v {product_version}"), - format!("-m {METRICS_PORT}"), - format!("-c {KEYSTORE_REPORTING_TASK_MOUNT}/ca.crt"), - ]; - let mut cb = ContainerBuilder::new("create-reporting-task").with_context(|_| { - IllegalContainerNameSnafu { - container_name: "create-reporting-task".to_string(), - } - })?; - cb.image_from_product_image(resolved_product_image) - .command(vec!["sh".to_string(), "-c".to_string()]) - .args(vec![args.join(" ")]) - // The VolumeMount for the secret operator key store certificates - .add_volume_mount(KEYSTORE_VOLUME_NAME, KEYSTORE_REPORTING_TASK_MOUNT) - .resources( - ResourceRequirementsBuilder::new() - .with_cpu_request("100m") - .with_cpu_limit("400m") - .with_memory_request("512Mi") - .with_memory_limit("512Mi") - .build(), - ); - - let job_name = format!( - "{}-create-reporting-task-{}", - nifi.name_any(), - product_version.replace('.', "-") - ); - - let mut pb = PodBuilder::new(); - nifi_auth_config - .add_volumes_and_mounts(&mut pb, vec![&mut cb]) - .context(AddAuthVolumesSnafu)?; - - let pod = pb - .metadata( - ObjectMetaBuilder::new() - .name(job_name.clone()) - .namespace_opt(nifi.namespace()) - .build(), - ) - .image_pull_secrets_from_product_image(resolved_product_image) - .restart_policy("OnFailure") - .service_account_name(sa_name) - .security_context( - PodSecurityContextBuilder::new() - .run_as_user(NIFI_UID) - .run_as_group(0) - .fs_group(1000) - .build(), - ) - .add_container(cb.build()) - .add_volume(build_keystore_volume( - KEYSTORE_VOLUME_NAME, - &nifi_name, - SecretFormat::TlsPem, - )?) - .build_template(); - - let job = Job { - metadata: ObjectMetaBuilder::new() - .name(job_name) - .namespace_opt(nifi.namespace()) - .ownerreference_from_resource(nifi, None, Some(true)) - .context(ObjectMissingMetadataForOwnerRefSnafu)? - .build(), - spec: Some(JobSpec { - backoff_limit: Some(100), - ttl_seconds_after_finished: Some(120), - template: pod, - ..JobSpec::default() - }), - ..Job::default() - }; - - Ok(job) -} - -async fn check_or_generate_sensitive_key( - client: &Client, - nifi: &NifiCluster, -) -> Result { - let sensitive_config = &nifi.spec.cluster_config.sensitive_properties; - let namespace: &str = &nifi.namespace().context(ObjectHasNoNamespaceSnafu)?; - - match client - .get_opt::(&sensitive_config.key_secret, namespace) - .await - .with_context(|_| SensitiveKeySecretSnafu {})? - { - Some(_) => Ok(false), - None => { - if !sensitive_config.auto_generate { - return Err(Error::SensitiveKeySecretMissing { - name: sensitive_config.key_secret.clone(), - namespace: namespace.to_string(), - }); - } - tracing::info!("No existing sensitive properties key found, generating new one"); - let password: String = rand::thread_rng() - .sample_iter(&Alphanumeric) - .take(15) - .map(char::from) - .collect(); - - let mut secret_data = BTreeMap::new(); - secret_data.insert("nifiSensitivePropsKey".to_string(), password); - - let new_secret = Secret { - metadata: ObjectMetaBuilder::new() - .namespace(namespace) - .name(&sensitive_config.key_secret.to_string()) - .build(), - string_data: Some(secret_data), - ..Secret::default() - }; - client - .create(&new_secret) - .await - .with_context(|_| SensitiveKeySecretSnafu {})?; - Ok(true) - } - } -} - fn external_node_port(nifi_service: &Service) -> Result { let external_ports = nifi_service .spec @@ -1496,30 +1330,6 @@ fn external_node_port(nifi_service: &Service) -> Result { port.node_port.with_context(|| ExternalPortSnafu {}) } -fn build_keystore_volume( - volume_name: &str, - nifi_name: &str, - secret_format: SecretFormat, -) -> Result { - let mut secret_volume_source_builder = SecretOperatorVolumeSourceBuilder::new("tls"); - - if secret_format == SecretFormat::TlsPkcs12 { - secret_volume_source_builder.with_tls_pkcs12_password(STACKABLE_TLS_STORE_PASSWORD); - } - - Ok(VolumeBuilder::new(volume_name) - .ephemeral( - secret_volume_source_builder - .with_node_scope() - .with_pod_scope() - .with_service_scope(nifi_name) - .with_format(secret_format) - .build() - .context(TlsCertSecretClassVolumeBuildSnafu)?, - ) - .build()) -} - /// Used for the `ZOOKEEPER_HOSTS` and `ZOOKEEPER_CHROOT` env vars. fn zookeeper_env_var(name: &str, configmap_name: &str) -> EnvVar { EnvVar { @@ -1544,9 +1354,12 @@ async fn get_proxy_hosts( let node_role_service_fqdn = nifi .node_role_service_fqdn() .context(NoRoleServiceFqdnSnafu)?; + let reporting_task_service_name = + reporting_task::build_reporting_task_fqdn_service_name(nifi).context(ReportingTaskSnafu)?; let mut proxy_setting = vec![ node_role_service_fqdn.clone(), format!("{node_role_service_fqdn}:{HTTPS_PORT}"), + format!("{reporting_task_service_name}:{HTTPS_PORT}"), ]; // In case NodePort is used add them as well @@ -1585,7 +1398,7 @@ pub fn error_policy(_obj: Arc, _error: &Error, _ctx: Arc) -> A Action::requeue(*Duration::from_secs(10)) } -fn build_recommended_labels<'a>( +pub fn build_recommended_labels<'a>( owner: &'a NifiCluster, app_version: &'a str, role: &'a str, diff --git a/rust/operator-binary/src/main.rs b/rust/operator-binary/src/main.rs index 691d0c47..daf37ab5 100644 --- a/rust/operator-binary/src/main.rs +++ b/rust/operator-binary/src/main.rs @@ -1,8 +1,9 @@ -mod authentication; mod config; mod controller; mod operations; mod product_logging; +mod reporting_task; +mod security; use clap::{crate_description, crate_version, Parser}; use futures::stream::StreamExt; diff --git a/rust/operator-binary/src/reporting_task/mod.rs b/rust/operator-binary/src/reporting_task/mod.rs new file mode 100644 index 00000000..78b681bc --- /dev/null +++ b/rust/operator-binary/src/reporting_task/mod.rs @@ -0,0 +1,353 @@ +//! The NiFi Reporting Task for Prometheus metrics is created via the NiFi Rest API. +//! +//! This module contains methods to create all required resources (Job, Service) and helper methods +//! to create the Prometheus Reporting Task. +//! +//! Before NiFi 1.25.0 there was only the actual Kubernetes Job required to create and run the Reporting Task Job. +//! +//! Due to changes in the JWT validation in 1.25.0, the issuer refers to the FQDN of the Pod that was created, e.g.: +//! { +//! "sub": "admin", +//! "iss": "test-nifi-node-default-0.test-nifi-node-default.default.svc.cluster.local:8443", +//! } +//! which was different in e.g. 1.23.2 +//! { +//! "sub": "admin", +//! "iss": "SingleUserLoginIdentityProvider", +//! } +//! This caused problems when using the generated JWT against a different node (due to randomness of the service). +//! +//! "An error occurred while attempting to decode the Jwt: Signed JWT rejected: Another algorithm expected, or no matching key(s) found" +//! +//! Therefore, since the support of NiFi 1.25.0, an additional service for the Reporting Task Job containing a +//! random but deterministic NiFi node to ensure the communication with a single node. +//! +use std::collections::BTreeMap; + +use snafu::{OptionExt, ResultExt, Snafu}; +use stackable_nifi_crd::{ + NifiCluster, NifiRole, APP_NAME, HTTPS_PORT, HTTPS_PORT_NAME, METRICS_PORT, +}; +use stackable_operator::{ + builder::{ + resources::ResourceRequirementsBuilder, ContainerBuilder, ObjectMetaBuilder, PodBuilder, + PodSecurityContextBuilder, SecretFormat, + }, + commons::product_image_selection::ResolvedProductImage, + k8s_openapi::api::{ + batch::v1::{Job, JobSpec}, + core::v1::{Service, ServicePort, ServiceSpec}, + }, + kube::ResourceExt, + kvp::Labels, +}; + +use crate::security::{ + authentication::{NifiAuthenticationConfig, STACKABLE_ADMIN_USER_NAME}, + build_tls_volume, +}; + +use super::controller::{build_recommended_labels, NIFI_UID}; + +const REPORTING_TASK_CERT_VOLUME_NAME: &str = "tls"; +const REPORTING_TASK_CERT_VOLUME_MOUNT: &str = "/stackable/cert"; +const REPORTING_TASK_CONTAINER_NAME: &str = "reporting-task"; + +#[derive(Snafu, Debug)] +pub enum Error { + #[snafu(display("object defines no name"))] + ObjectHasNoName, + + #[snafu(display("object defines no namespace"))] + ObjectHasNoNamespace, + + #[snafu(display("failed to build metadata"))] + MetadataBuild { + source: stackable_operator::builder::ObjectMetaBuilderError, + }, + + #[snafu(display("illegal container name: [{container_name}]"))] + IllegalContainerName { + source: stackable_operator::error::Error, + container_name: String, + }, + + #[snafu(display("object is missing metadata to build owner reference"))] + ObjectMissingMetadataForOwnerRef { + source: stackable_operator::error::Error, + }, + + #[snafu(display("failed to add Authentication Volumes and VolumeMounts"))] + AddAuthVolumes { + source: crate::security::authentication::Error, + }, + + #[snafu(display("failed to build labels"))] + LabelBuild { + source: stackable_operator::kvp::LabelError, + }, + + #[snafu(display("failed to build secret volume"))] + SecretVolumeBuildFailure { source: crate::security::Error }, + + #[snafu(display("failed to create reporting task service, no role groups defined"))] + FailedBuildReportingTaskService, +} + +type Result = std::result::Result; + +/// Build required resources to create the reporting task in NiFi. +/// This will return +/// * a Job that creates and runs the reporting task via the NiFi Rest API. +/// * a Service that contains of one single NiFi node. +/// +/// The Service is required in order to communicate only with one designated NiFi node. +/// This is necessary as the generated JWT was changed in 1.25.0 and corrected the issuer +/// from SingleUserLoginIdentityProvider to the FQDN of the pod. +/// The NiFi role service will randomly delegate to different NiFi nodes which will +/// then fail requests to other nodes. +pub fn build_reporting_task( + nifi: &NifiCluster, + resolved_product_image: &ResolvedProductImage, + nifi_auth_config: &NifiAuthenticationConfig, + sa_name: &str, +) -> Result<(Job, Service)> { + Ok(( + build_reporting_task_job(nifi, resolved_product_image, nifi_auth_config, sa_name)?, + build_reporting_task_service(nifi, resolved_product_image)?, + )) +} + +/// Return the name of the reporting task. +pub fn build_reporting_task_service_name(nifi_cluster_name: &str) -> String { + format!("{nifi_cluster_name}-{REPORTING_TASK_CONTAINER_NAME}") +} + +/// Return the FQDN (with namespace, domain) of the reporting task. +pub fn build_reporting_task_fqdn_service_name(nifi: &NifiCluster) -> Result { + let nifi_cluster_name = nifi.name_any(); + let nifi_namespace: &str = &nifi.namespace().context(ObjectHasNoNamespaceSnafu)?; + let reporting_task_service_name = build_reporting_task_service_name(&nifi_cluster_name); + + Ok(format!( + "{reporting_task_service_name}.{nifi_namespace}.svc.cluster.local" + )) +} + +/// Return the name of the first pod belonging to the first role group that contains more than 0 replicas. +/// If no replicas are set in any rolegroup (e.g. HPA, see ) +/// return the first rolegroup just in case. +/// This is required to only select a single node in the Reporting Task Service. +fn get_reporting_task_service_selector_pod(nifi: &NifiCluster) -> Result { + let cluster_name = nifi.name_any(); + let node_name = NifiRole::Node.to_string(); + + // sort the rolegroups to avoid random sorting and therefore unnecessary reconciles + let sorted_role_groups = nifi + .spec + .nodes + .iter() + .flat_map(|role| &role.role_groups) + .collect::>(); + + let mut selector_role_group = None; + for (role_group_name, role_group) in sorted_role_groups { + // just pick the first rolegroup in case no replicas are set + if selector_role_group.is_none() { + selector_role_group = Some(role_group_name); + } + + if let Some(replicas) = role_group.replicas { + if replicas > 0 { + selector_role_group = Some(role_group_name); + break; + } + } + } + + Ok(format!( + "{cluster_name}-{node_name}-{role_group_name}-0", + role_group_name = selector_role_group.context(FailedBuildReportingTaskServiceSnafu)? + )) +} + +/// Build the internal Reporting Task Service in order to communicate with a single NiFi node. +fn build_reporting_task_service( + nifi: &NifiCluster, + resolved_product_image: &ResolvedProductImage, +) -> Result { + let nifi_cluster_name = nifi.name_any(); + let role_name = NifiRole::Node.to_string(); + let mut selector: BTreeMap = Labels::role_selector(nifi, APP_NAME, &role_name) + .context(LabelBuildSnafu)? + .into(); + + let service_selector_pod = get_reporting_task_service_selector_pod(nifi)?; + selector.insert( + "statefulset.kubernetes.io/pod-name".to_string(), + service_selector_pod, + ); + + Ok(Service { + metadata: ObjectMetaBuilder::new() + .name_and_namespace(nifi) + .name(build_reporting_task_service_name(&nifi_cluster_name)) + .ownerreference_from_resource(nifi, None, Some(true)) + .context(ObjectMissingMetadataForOwnerRefSnafu)? + .with_recommended_labels(build_recommended_labels( + nifi, + &resolved_product_image.app_version_label, + &role_name, + "global", + )) + .context(MetadataBuildSnafu)? + .build(), + spec: Some(ServiceSpec { + ports: Some(vec![ServicePort { + name: Some(HTTPS_PORT_NAME.to_string()), + port: HTTPS_PORT.into(), + protocol: Some("TCP".to_string()), + ..ServicePort::default() + }]), + selector: Some(selector), + ..ServiceSpec::default() + }), + status: None, + }) +} + +/// Build the [`Job`](`stackable_operator::k8s_openapi::api::batch::v1::Job`) that creates a +/// NiFi `ReportingTask` in order to enable JVM and NiFi metrics. +/// +/// The Job is run via the [`tools`](https://github.com/stackabletech/docker-images/tree/main/tools) +/// docker image and more specifically the `create_nifi_reporting_task.py` Python script. +/// +/// This script uses the [`nipyapi`](https://nipyapi.readthedocs.io/en/latest/readme.html) +/// library to authenticate and run the required REST calls to the NiFi REST API. +/// +/// In order to authenticate we need the `username` and `password` from the +/// [`NifiAuthenticationConfig`](`crate::security::authentication::NifiAuthenticationConfig`) +/// as well as a public certificate provided by the Stackable +/// [`secret-operator`](https://github.com/stackabletech/secret-operator) +/// +fn build_reporting_task_job( + nifi: &NifiCluster, + resolved_product_image: &ResolvedProductImage, + nifi_auth_config: &NifiAuthenticationConfig, + sa_name: &str, +) -> Result { + let reporting_task_fqdn_service_name = build_reporting_task_fqdn_service_name(nifi)?; + let product_version = &resolved_product_image.product_version; + let nifi_connect_url = + format!("https://{reporting_task_fqdn_service_name}:{HTTPS_PORT}/nifi-api",); + + let (admin_username_file, admin_password_file) = + nifi_auth_config.get_user_and_password_file_paths(); + + let user_name_command = if admin_username_file.is_empty() { + // In case of the username being simple (e.g admin for SingleUser) just use it as is + format!("-u {STACKABLE_ADMIN_USER_NAME}") + } else { + // If the username is a bind dn (e.g. cn=integrationtest,ou=users,dc=example,dc=org) we have to extract the cn/dn/uid (in this case integrationtest) + format!( + "-u \"$(cat {admin_username_file} | grep -oP '((cn|dn|uid)=\\K[^,]+|.*)' | head -n 1)\"" + ) + }; + + let args = vec![ + "/stackable/python/create_nifi_reporting_task.py".to_string(), + format!("-n {nifi_connect_url}"), + user_name_command, + format!("-p \"$(cat {admin_password_file})\""), + format!("-v {product_version}"), + format!("-m {METRICS_PORT}"), + format!("-c {REPORTING_TASK_CERT_VOLUME_MOUNT}/ca.crt"), + ]; + let mut cb = ContainerBuilder::new(REPORTING_TASK_CONTAINER_NAME).with_context(|_| { + IllegalContainerNameSnafu { + container_name: REPORTING_TASK_CONTAINER_NAME.to_string(), + } + })?; + cb.image_from_product_image(resolved_product_image) + .command(vec!["sh".to_string(), "-c".to_string()]) + .args(vec![args.join(" ")]) + // The VolumeMount for the secret operator key store certificates + .add_volume_mount( + REPORTING_TASK_CERT_VOLUME_NAME, + REPORTING_TASK_CERT_VOLUME_MOUNT, + ) + .resources( + ResourceRequirementsBuilder::new() + .with_cpu_request("100m") + .with_cpu_limit("400m") + .with_memory_request("512Mi") + .with_memory_limit("512Mi") + .build(), + ); + + let job_name = format!( + "{}-create-reporting-task-{}", + nifi.name_any(), + product_version.replace('.', "-") + ); + + let mut pb = PodBuilder::new(); + nifi_auth_config + .add_volumes_and_mounts(&mut pb, vec![&mut cb]) + .context(AddAuthVolumesSnafu)?; + + let pod = pb + .metadata( + ObjectMetaBuilder::new() + .name_and_namespace(nifi) + .name(job_name.clone()) + .ownerreference_from_resource(nifi, None, Some(true)) + .context(ObjectMissingMetadataForOwnerRefSnafu)? + .build(), + ) + .image_pull_secrets_from_product_image(resolved_product_image) + .restart_policy("OnFailure") + .service_account_name(sa_name) + .security_context( + PodSecurityContextBuilder::new() + .run_as_user(NIFI_UID) + .run_as_group(0) + .fs_group(1000) + .build(), + ) + .add_container(cb.build()) + .add_volume( + build_tls_volume( + REPORTING_TASK_CERT_VOLUME_NAME, + vec![], + SecretFormat::TlsPem, + ) + .context(SecretVolumeBuildFailureSnafu)?, + ) + .build_template(); + + let job = Job { + metadata: ObjectMetaBuilder::new() + .name_and_namespace(nifi) + .name(job_name) + .ownerreference_from_resource(nifi, None, Some(true)) + .context(ObjectMissingMetadataForOwnerRefSnafu)? + .with_recommended_labels(build_recommended_labels( + nifi, + &resolved_product_image.app_version_label, + "global", + "global", + )) + .context(MetadataBuildSnafu)? + .build(), + spec: Some(JobSpec { + backoff_limit: Some(100), + ttl_seconds_after_finished: Some(120), + template: pod, + ..JobSpec::default() + }), + ..Job::default() + }; + + Ok(job) +} diff --git a/rust/operator-binary/src/authentication.rs b/rust/operator-binary/src/security/authentication.rs similarity index 100% rename from rust/operator-binary/src/authentication.rs rename to rust/operator-binary/src/security/authentication.rs diff --git a/rust/operator-binary/src/security/mod.rs b/rust/operator-binary/src/security/mod.rs new file mode 100644 index 00000000..059a9e82 --- /dev/null +++ b/rust/operator-binary/src/security/mod.rs @@ -0,0 +1,33 @@ +use snafu::{ResultExt, Snafu}; +use stackable_nifi_crd::NifiCluster; +use stackable_operator::client::Client; +use stackable_operator::{builder::SecretFormat, k8s_openapi::api::core::v1::Volume}; + +pub mod authentication; +pub mod sensitive_key; +pub mod tls; + +type Result = std::result::Result; + +#[derive(Snafu, Debug)] +pub enum Error { + #[snafu(display("tls failure"))] + Tls { source: tls::Error }, + + #[snafu(display("sensistive key failure"))] + SensitiveKey { source: sensitive_key::Error }, +} + +pub async fn check_or_generate_sensitive_key(client: &Client, nifi: &NifiCluster) -> Result { + sensitive_key::check_or_generate_sensitive_key(client, nifi) + .await + .context(SensitiveKeySnafu) +} + +pub fn build_tls_volume( + volume_name: &str, + service_scopes: Vec<&str>, + secret_format: SecretFormat, +) -> Result { + tls::build_tls_volume(volume_name, service_scopes, secret_format).context(TlsSnafu) +} diff --git a/rust/operator-binary/src/security/sensitive_key.rs b/rust/operator-binary/src/security/sensitive_key.rs new file mode 100644 index 00000000..dea2a94f --- /dev/null +++ b/rust/operator-binary/src/security/sensitive_key.rs @@ -0,0 +1,76 @@ +use std::collections::BTreeMap; + +use rand::{distributions::Alphanumeric, Rng}; +use snafu::{OptionExt, ResultExt, Snafu}; +use stackable_nifi_crd::NifiCluster; +use stackable_operator::{ + builder::ObjectMetaBuilder, client::Client, k8s_openapi::api::core::v1::Secret, + kube::ResourceExt, +}; + +type Result = std::result::Result; + +#[derive(Snafu, Debug)] +pub enum Error { + #[snafu(display("object defines no namespace"))] + ObjectHasNoNamespace, + + #[snafu(display("failed to check sensitive property key secret"))] + SensitiveKeySecret { + source: stackable_operator::error::Error, + }, + + #[snafu(display( + "sensitive key secret [{}/{}] is missing, but auto generation is disabled", + name, + namespace + ))] + SensitiveKeySecretMissing { name: String, namespace: String }, +} + +pub(crate) async fn check_or_generate_sensitive_key( + client: &Client, + nifi: &NifiCluster, +) -> Result { + let sensitive_config = &nifi.spec.cluster_config.sensitive_properties; + let namespace: &str = &nifi.namespace().context(ObjectHasNoNamespaceSnafu)?; + + match client + .get_opt::(&sensitive_config.key_secret, namespace) + .await + .context(SensitiveKeySecretSnafu)? + { + Some(_) => Ok(false), + None => { + if !sensitive_config.auto_generate { + return Err(Error::SensitiveKeySecretMissing { + name: sensitive_config.key_secret.clone(), + namespace: namespace.to_string(), + }); + } + tracing::info!("No existing sensitive properties key found, generating new one"); + let password: String = rand::thread_rng() + .sample_iter(&Alphanumeric) + .take(15) + .map(char::from) + .collect(); + + let mut secret_data = BTreeMap::new(); + secret_data.insert("nifiSensitivePropsKey".to_string(), password); + + let new_secret = Secret { + metadata: ObjectMetaBuilder::new() + .namespace(namespace) + .name(&sensitive_config.key_secret.to_string()) + .build(), + string_data: Some(secret_data), + ..Secret::default() + }; + client + .create(&new_secret) + .await + .context(SensitiveKeySecretSnafu)?; + Ok(true) + } + } +} diff --git a/rust/operator-binary/src/security/tls.rs b/rust/operator-binary/src/security/tls.rs new file mode 100644 index 00000000..158b570f --- /dev/null +++ b/rust/operator-binary/src/security/tls.rs @@ -0,0 +1,49 @@ +use snafu::{ResultExt, Snafu}; +use stackable_operator::{ + builder::{SecretFormat, SecretOperatorVolumeSourceBuilder, VolumeBuilder}, + k8s_openapi::api::core::v1::Volume, +}; + +use crate::security::authentication::STACKABLE_TLS_STORE_PASSWORD; + +pub const KEYSTORE_VOLUME_NAME: &str = "keystore"; +pub const KEYSTORE_NIFI_CONTAINER_MOUNT: &str = "/stackable/keystore"; +pub const TRUSTSTORE_VOLUME_NAME: &str = "truststore"; + +type Result = std::result::Result; + +#[derive(Snafu, Debug)] +pub enum Error { + #[snafu(display("failed to build TLS certificate SecretClass Volume"))] + TlsCertSecretClassVolumeBuild { + source: stackable_operator::builder::SecretOperatorVolumeSourceBuilderError, + }, +} + +pub(crate) fn build_tls_volume( + volume_name: &str, + service_scopes: Vec<&str>, + secret_format: SecretFormat, +) -> Result { + // TODO: Make adaptable (https://github.com/stackabletech/nifi-operator/issues/499) + let mut secret_volume_source_builder = SecretOperatorVolumeSourceBuilder::new("tls"); + + if secret_format == SecretFormat::TlsPkcs12 { + secret_volume_source_builder.with_tls_pkcs12_password(STACKABLE_TLS_STORE_PASSWORD); + } + + for scope in service_scopes { + secret_volume_source_builder.with_service_scope(scope); + } + + Ok(VolumeBuilder::new(volume_name) + .ephemeral( + secret_volume_source_builder + .with_node_scope() + .with_pod_scope() + .with_format(secret_format) + .build() + .context(TlsCertSecretClassVolumeBuildSnafu)?, + ) + .build()) +} diff --git a/tests/templates/kuttl/logging/01-install-nifi-vector-aggregator.yaml b/tests/templates/kuttl/logging/01-install-nifi-vector-aggregator.yaml index 90f0f581..eeff53b4 100644 --- a/tests/templates/kuttl/logging/01-install-nifi-vector-aggregator.yaml +++ b/tests/templates/kuttl/logging/01-install-nifi-vector-aggregator.yaml @@ -5,7 +5,7 @@ commands: - script: >- helm install nifi-vector-aggregator vector --namespace $NAMESPACE - --version 0.26.0 + --version 0.30.0 --repo https://helm.vector.dev --values nifi-vector-aggregator-values.yaml --- diff --git a/tests/test-definition.yaml b/tests/test-definition.yaml index 946a1212..c24ec169 100644 --- a/tests/test-definition.yaml +++ b/tests/test-definition.yaml @@ -9,20 +9,21 @@ dimensions: - 3.8.3 - name: zookeeper-latest values: - - 3.8.3 + - 3.9.1 - name: nifi values: - 1.21.0 - 1.23.2 + - 1.25.0 - name: nifi_old values: - 1.21.0 - name: nifi_new values: - - 1.23.2 + - 1.25.0 - name: nifi-latest values: - - 1.23.2 + - 1.25.0 - name: ldap-use-tls values: - "false"