From 61f89d3d20eb85181326f97c895503c7ddf16b22 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Natalie=20Klestrup=20R=C3=B6ijezon?= Date: Tue, 8 Apr 2025 14:34:31 +0200 Subject: [PATCH 01/18] Resync Cargo.nix --- Cargo.nix | 65 +++++++++++++++---------------------------------------- 1 file changed, 18 insertions(+), 47 deletions(-) diff --git a/Cargo.nix b/Cargo.nix index 301d71bc..5a7f9604 100644 --- a/Cargo.nix +++ b/Cargo.nix @@ -5878,7 +5878,7 @@ rec { } { name = "zerocopy"; - packageId = "zerocopy 0.8.21"; + packageId = "zerocopy 0.8.23"; usesDefaultFeatures = false; features = [ "simd" ]; } @@ -6223,13 +6223,10 @@ rec { }; "ring" = rec { crateName = "ring"; - version = "0.17.8"; + version = "0.17.14"; edition = "2021"; - links = "ring_core_0_17_8"; - sha256 = "03fwlb1ssrmfxdckvqv033pfmk01rhx9ynwi7r186dcfcp5s8zy1"; - authors = [ - "Brian Smith " - ]; + links = "ring_core_0_17_14_"; + sha256 = "1dw32gv19ccq4hsx3ribhpdzri1vnrlcfqb2vj41xn4l49n9ws54"; dependencies = [ { name = "cfg-if"; @@ -6244,14 +6241,13 @@ rec { name = "libc"; packageId = "libc"; usesDefaultFeatures = false; - target = { target, features }: ((("android" == target."os" or null) || ("linux" == target."os" or null)) && (("aarch64" == target."arch" or null) || ("arm" == target."arch" or null))); + target = { target, features }: ((("aarch64" == target."arch" or null) && ("little" == target."endian" or null)) && ("apple" == target."vendor" or null) && (("ios" == target."os" or null) || ("macos" == target."os" or null) || ("tvos" == target."os" or null) || ("visionos" == target."os" or null) || ("watchos" == target."os" or null))); } { - name = "spin"; - packageId = "spin"; + name = "libc"; + packageId = "libc"; usesDefaultFeatures = false; - target = { target, features }: (("aarch64" == target."arch" or null) || ("arm" == target."arch" or null) || ("x86" == target."arch" or null) || ("x86_64" == target."arch" or null)); - features = [ "once" ]; + target = { target, features }: (((("aarch64" == target."arch" or null) && ("little" == target."endian" or null)) || (("arm" == target."arch" or null) && ("little" == target."endian" or null))) && (("android" == target."os" or null) || ("linux" == target."os" or null))); } { name = "untrusted"; @@ -6260,7 +6256,7 @@ rec { { name = "windows-sys"; packageId = "windows-sys 0.52.0"; - target = { target, features }: (("aarch64" == target."arch" or null) && ("windows" == target."os" or null)); + target = { target, features }: ((("aarch64" == target."arch" or null) && ("little" == target."endian" or null)) && ("windows" == target."os" or null)); features = [ "Win32_Foundation" "Win32_System_Threading" ]; } ]; @@ -7443,31 +7439,6 @@ rec { }; resolvedDefaultFeatures = [ "all" ]; }; - "spin" = rec { - crateName = "spin"; - version = "0.9.8"; - edition = "2015"; - sha256 = "0rvam5r0p3a6qhc18scqpvpgb3ckzyqxpgdfyjnghh8ja7byi039"; - authors = [ - "Mathijs van de Nes " - "John Ericson " - "Joshua Barretto " - ]; - features = { - "barrier" = [ "mutex" ]; - "default" = [ "lock_api" "mutex" "spin_mutex" "rwlock" "once" "lazy" "barrier" ]; - "fair_mutex" = [ "mutex" ]; - "lazy" = [ "once" ]; - "lock_api" = [ "lock_api_crate" ]; - "lock_api_crate" = [ "dep:lock_api_crate" ]; - "portable-atomic" = [ "dep:portable-atomic" ]; - "portable_atomic" = [ "portable-atomic" ]; - "spin_mutex" = [ "mutex" ]; - "ticket_mutex" = [ "mutex" ]; - "use_ticket_mutex" = [ "mutex" "ticket_mutex" ]; - }; - resolvedDefaultFeatures = [ "once" ]; - }; "stable_deref_trait" = rec { crateName = "stable_deref_trait"; version = "1.2.0"; @@ 
-11467,11 +11438,11 @@ rec { }; resolvedDefaultFeatures = [ "byteorder" "default" "derive" "simd" "zerocopy-derive" ]; }; - "zerocopy 0.8.21" = rec { + "zerocopy 0.8.23" = rec { crateName = "zerocopy"; - version = "0.8.21"; + version = "0.8.23"; edition = "2021"; - sha256 = "0y4lz5l7a7h5rsy37jwmjrs3pc9i2jgwyigm257i6pfxn91i3w6w"; + sha256 = "1inbxgqhsxghawsss8x8517g30fpp8s3ll2ywy88ncm40m6l95zx"; authors = [ "Joshua Liebow-Feeser " "Jack Wrenn " @@ -11479,19 +11450,19 @@ rec { dependencies = [ { name = "zerocopy-derive"; - packageId = "zerocopy-derive 0.8.21"; + packageId = "zerocopy-derive 0.8.23"; optional = true; } { name = "zerocopy-derive"; - packageId = "zerocopy-derive 0.8.21"; + packageId = "zerocopy-derive 0.8.23"; target = { target, features }: false; } ]; devDependencies = [ { name = "zerocopy-derive"; - packageId = "zerocopy-derive 0.8.21"; + packageId = "zerocopy-derive 0.8.23"; } ]; features = { @@ -11529,11 +11500,11 @@ rec { ]; }; - "zerocopy-derive 0.8.21" = rec { + "zerocopy-derive 0.8.23" = rec { crateName = "zerocopy-derive"; - version = "0.8.21"; + version = "0.8.23"; edition = "2021"; - sha256 = "18kdbw6k5hgw81l8pn1rp67zg13hxsdj3vmbr6196aglyj386b3i"; + sha256 = "0m7iwisxz111sgkski722nyxv0rixbs0a9iylrcvhpfx1qfw0lk3"; procMacro = true; libName = "zerocopy_derive"; authors = [ From fd6ffadb45629b330dbe925eea046e3f6555d4fb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Natalie=20Klestrup=20R=C3=B6ijezon?= Date: Tue, 8 Apr 2025 14:35:22 +0200 Subject: [PATCH 02/18] Add Kubernetes state provider --- rust/operator-binary/src/config/mod.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/rust/operator-binary/src/config/mod.rs b/rust/operator-binary/src/config/mod.rs index 6be7bba3..bbe6cb93 100644 --- a/rust/operator-binary/src/config/mod.rs +++ b/rust/operator-binary/src/config/mod.rs @@ -597,6 +597,11 @@ pub fn build_state_management_xml() -> String { 10 seconds Open + + kubernetes-provider + org.apache.nifi.kubernetes.state.provider.KubernetesConfigMapStateProvider + simple-nifi + ", &NifiRepository::State.mount_path(), ) From 914181f0ca2ab3a10c78301f732f2baf6d883130 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Natalie=20Klestrup=20R=C3=B6ijezon?= Date: Tue, 8 Apr 2025 14:37:31 +0200 Subject: [PATCH 03/18] Use raw strings to clean up escaping... 
--- rust/operator-binary/src/config/mod.rs | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/rust/operator-binary/src/config/mod.rs b/rust/operator-binary/src/config/mod.rs index bbe6cb93..7aa4079a 100644 --- a/rust/operator-binary/src/config/mod.rs +++ b/rust/operator-binary/src/config/mod.rs @@ -579,31 +579,31 @@ pub fn build_nifi_properties( pub fn build_state_management_xml() -> String { format!( - " + r#" local-provider org.apache.nifi.controller.state.providers.local.WriteAheadLocalStateProvider - {} - false - 16 - 2 mins + {local_state_path} + false + 16 + 2 mins zk-provider org.apache.nifi.controller.state.providers.zookeeper.ZooKeeperStateProvider - ${{env:ZOOKEEPER_HOSTS}} - ${{env:ZOOKEEPER_CHROOT}} - 10 seconds - Open + ${{env:ZOOKEEPER_HOSTS}} + ${{env:ZOOKEEPER_CHROOT}} + 10 seconds + Open kubernetes-provider org.apache.nifi.kubernetes.state.provider.KubernetesConfigMapStateProvider - simple-nifi + simple-nifi - ", - &NifiRepository::State.mount_path(), + "#, + local_state_path = &NifiRepository::State.mount_path(), ) } From aaf6c2b323f0fd2d58391bcbcf3819aa85c1cd2c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Natalie=20Klestrup=20R=C3=B6ijezon?= Date: Tue, 8 Apr 2025 14:58:23 +0200 Subject: [PATCH 04/18] Prefix the Kubernetes state configmaps by the actual stacklet name, rather than hard-coding it --- rust/operator-binary/src/config/mod.rs | 2 +- rust/operator-binary/src/controller.rs | 53 +++++++++++++------------- 2 files changed, 27 insertions(+), 28 deletions(-) diff --git a/rust/operator-binary/src/config/mod.rs b/rust/operator-binary/src/config/mod.rs index 7aa4079a..961cd364 100644 --- a/rust/operator-binary/src/config/mod.rs +++ b/rust/operator-binary/src/config/mod.rs @@ -600,7 +600,7 @@ pub fn build_state_management_xml() -> String { kubernetes-provider org.apache.nifi.kubernetes.state.provider.KubernetesConfigMapStateProvider - simple-nifi + ${{env:STACKLET_NAME}} "#, local_state_path = &NifiRepository::State.mount_path(), diff --git a/rust/operator-binary/src/controller.rs b/rust/operator-binary/src/controller.rs index 648feb1c..70c56fbe 100644 --- a/rust/operator-binary/src/controller.rs +++ b/rust/operator-binary/src/controller.rs @@ -914,7 +914,7 @@ async fn build_node_rolegroup_statefulset( merged_config: &NifiConfig, nifi_auth_config: &NifiAuthenticationConfig, version_change_state: &VersionChangeState, - sa_name: &str, + service_account_name: &str, ) -> Result { tracing::debug!("Building statefulset"); let role_group = role.role_groups.get(&rolegroup_ref.role_group); @@ -950,18 +950,33 @@ async fn build_node_rolegroup_statefulset( env_vars.push(EnvVar { name: "CONTAINERDEBUG_LOG_DIRECTORY".to_string(), value: Some(format!("{STACKABLE_LOG_DIR}/containerdebug")), - value_from: None, + ..Default::default() }); - env_vars.push(zookeeper_env_var( - "ZOOKEEPER_HOSTS", - &nifi.spec.cluster_config.zookeeper_config_map_name, - )); + env_vars.push(EnvVar { + name: "STACKLET_NAME".to_string(), + value: Some(nifi.name_unchecked().to_string()), + ..Default::default() + }); - env_vars.push(zookeeper_env_var( - "ZOOKEEPER_CHROOT", - &nifi.spec.cluster_config.zookeeper_config_map_name, - )); + let zookeeper_env_var = |name: &str| EnvVar { + name: name.to_string(), + value_from: Some(EnvVarSource { + config_map_key_ref: Some(ConfigMapKeySelector { + name: nifi + .spec + .cluster_config + .zookeeper_config_map_name + .to_string(), + key: name.to_string(), + ..ConfigMapKeySelector::default() + }), + ..EnvVarSource::default() + }), + 
..EnvVar::default() + }; + env_vars.push(zookeeper_env_var("ZOOKEEPER_HOSTS")); + env_vars.push(zookeeper_env_var("ZOOKEEPER_CHROOT")); if let NifiAuthenticationConfig::Oidc { oidc, .. } = nifi_auth_config { env_vars.extend(AuthenticationProvider::client_credentials_env_var_mounts( @@ -1348,7 +1363,7 @@ async fn build_node_rolegroup_statefulset( ..Volume::default() }) .context(AddVolumeSnafu)? - .service_account_name(sa_name) + .service_account_name(service_account_name) .security_context( PodSecurityContextBuilder::new() .run_as_user(NIFI_UID) @@ -1460,22 +1475,6 @@ fn external_node_port(nifi_service: &Service) -> Result { port.node_port.with_context(|| ExternalPortSnafu {}) } -/// Used for the `ZOOKEEPER_HOSTS` and `ZOOKEEPER_CHROOT` env vars. -fn zookeeper_env_var(name: &str, configmap_name: &str) -> EnvVar { - EnvVar { - name: name.to_string(), - value_from: Some(EnvVarSource { - config_map_key_ref: Some(ConfigMapKeySelector { - name: configmap_name.to_string(), - key: name.to_string(), - ..ConfigMapKeySelector::default() - }), - ..EnvVarSource::default() - }), - ..EnvVar::default() - } -} - async fn get_proxy_hosts( client: &Client, nifi: &v1alpha1::NifiCluster, From e4609dc32fbad79f38704f6308edf7c80a432353 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Natalie=20Klestrup=20R=C3=B6ijezon?= Date: Wed, 9 Apr 2025 09:17:12 +0200 Subject: [PATCH 05/18] Make clustering mode configurable, add Kubernetes mode --- .../helm/nifi-operator/templates/roles.yaml | 27 +++++++++- rust/operator-binary/src/config/mod.rs | 50 ++++++++++++++----- rust/operator-binary/src/controller.rs | 39 ++++++++------- rust/operator-binary/src/crd/mod.rs | 22 ++++++-- 4 files changed, 101 insertions(+), 37 deletions(-) diff --git a/deploy/helm/nifi-operator/templates/roles.yaml b/deploy/helm/nifi-operator/templates/roles.yaml index 53b90c67..17127ac4 100644 --- a/deploy/helm/nifi-operator/templates/roles.yaml +++ b/deploy/helm/nifi-operator/templates/roles.yaml @@ -133,9 +133,11 @@ rules: - apiGroups: - "" resources: - - configmaps - secrets - serviceaccounts + # This is redundant with the rule for specifically about configmaps + # (due to clustering), but we read them for other purposes too + - configmaps verbs: - get - apiGroups: @@ -144,6 +146,29 @@ rules: - events verbs: - create + # Required for Kubernetes-managed clustering, see https://nifi.apache.org/nifi-docs/administration-guide.html#kubernetes-clustering + - apiGroups: + - coordination.k8s.io + resources: + - leases + verbs: + - create + - get + - update + # undocumented but required + - patch + # Required for Kubernetes cluster state provider, see https://nifi.apache.org/nifi-docs/administration-guide.html#kubernetes-configmap-cluster-state-provider + - apiGroups: + - "" + resources: + - configmaps + verbs: + - create + - delete + - get + - list + - patch + - update {{ if .Capabilities.APIVersions.Has "security.openshift.io/v1" }} - apiGroups: - security.openshift.io diff --git a/rust/operator-binary/src/config/mod.rs b/rust/operator-binary/src/config/mod.rs index 961cd364..bd452489 100644 --- a/rust/operator-binary/src/config/mod.rs +++ b/rust/operator-binary/src/config/mod.rs @@ -249,7 +249,10 @@ pub fn build_nifi_properties( // The ID of the cluster-wide state provider. This will be ignored if NiFi is not clustered but must be populated if running in a cluster. properties.insert( "nifi.state.management.provider.cluster".to_string(), - "zk-provider".to_string(), + match spec.cluster_config.clustering_mode { + v1alpha1::NifiClusteringMode::ZooKeeper { .. 
} => "zk-provider".to_string(), + v1alpha1::NifiClusteringMode::Kubernetes { .. } => "kubernetes-provider".to_string(), + }, ); // Specifies whether or not this instance of NiFi should run an embedded ZooKeeper server properties.insert( @@ -558,18 +561,39 @@ pub fn build_nifi_properties( "".to_string(), ); - // zookeeper properties, used for cluster management - // this will be replaced via a container command script - properties.insert( - "nifi.zookeeper.connect.string".to_string(), - "${env:ZOOKEEPER_HOSTS}".to_string(), - ); - - // this will be replaced via a container command script - properties.insert( - "nifi.zookeeper.root.node".to_string(), - "${env:ZOOKEEPER_CHROOT}".to_string(), - ); + match spec.cluster_config.clustering_mode { + v1alpha1::NifiClusteringMode::ZooKeeper { .. } => { + properties.insert( + "nifi.cluster.leader.election.implementation".to_string(), + "CuratorLeaderElectionManager".to_string(), + ); + + // this will be replaced via a container command script + properties.insert( + "nifi.zookeeper.connect.string".to_string(), + "${env:ZOOKEEPER_HOSTS}".to_string(), + ); + + // this will be replaced via a container command script + properties.insert( + "nifi.zookeeper.root.node".to_string(), + "${env:ZOOKEEPER_CHROOT}".to_string(), + ); + } + + v1alpha1::NifiClusteringMode::Kubernetes {} => { + properties.insert( + "nifi.cluster.leader.election.implementation".to_string(), + "KubernetesLeaderElectionManager".to_string(), + ); + + // this will be replaced via a container command script + properties.insert( + "nifi.cluster.leader.election.kubernetes.lease.prefix".to_string(), + "${env:STACKLET_NAME}".to_string(), + ); + } + } // override with config overrides properties.extend(overrides); diff --git a/rust/operator-binary/src/controller.rs b/rust/operator-binary/src/controller.rs index 70c56fbe..047cce4b 100644 --- a/rust/operator-binary/src/controller.rs +++ b/rust/operator-binary/src/controller.rs @@ -959,24 +959,27 @@ async fn build_node_rolegroup_statefulset( ..Default::default() }); - let zookeeper_env_var = |name: &str| EnvVar { - name: name.to_string(), - value_from: Some(EnvVarSource { - config_map_key_ref: Some(ConfigMapKeySelector { - name: nifi - .spec - .cluster_config - .zookeeper_config_map_name - .to_string(), - key: name.to_string(), - ..ConfigMapKeySelector::default() - }), - ..EnvVarSource::default() - }), - ..EnvVar::default() - }; - env_vars.push(zookeeper_env_var("ZOOKEEPER_HOSTS")); - env_vars.push(zookeeper_env_var("ZOOKEEPER_CHROOT")); + match &nifi.spec.cluster_config.clustering_mode { + v1alpha1::NifiClusteringMode::ZooKeeper { + zookeeper_config_map_name, + } => { + let zookeeper_env_var = |name: &str| EnvVar { + name: name.to_string(), + value_from: Some(EnvVarSource { + config_map_key_ref: Some(ConfigMapKeySelector { + name: zookeeper_config_map_name.to_string(), + key: name.to_string(), + ..ConfigMapKeySelector::default() + }), + ..EnvVarSource::default() + }), + ..EnvVar::default() + }; + env_vars.push(zookeeper_env_var("ZOOKEEPER_HOSTS")); + env_vars.push(zookeeper_env_var("ZOOKEEPER_CHROOT")); + } + v1alpha1::NifiClusteringMode::Kubernetes {} => {} + } if let NifiAuthenticationConfig::Oidc { oidc, .. 
} = nifi_auth_config { env_vars.extend(AuthenticationProvider::client_credentials_env_var_mounts( diff --git a/rust/operator-binary/src/crd/mod.rs b/rust/operator-binary/src/crd/mod.rs index 983189a6..c577e84d 100644 --- a/rust/operator-binary/src/crd/mod.rs +++ b/rust/operator-binary/src/crd/mod.rs @@ -138,11 +138,8 @@ pub mod versioned { #[serde(skip_serializing_if = "Option::is_none")] pub vector_aggregator_config_map_name: Option, - /// NiFi requires a ZooKeeper cluster connection to run. - /// Provide the name of the ZooKeeper [discovery ConfigMap](DOCS_BASE_URL_PLACEHOLDER/concepts/service_discovery) - /// here. When using the [Stackable operator for Apache ZooKeeper](DOCS_BASE_URL_PLACEHOLDER/zookeeper/) - /// to deploy a ZooKeeper cluster, this will simply be the name of your ZookeeperCluster resource. - pub zookeeper_config_map_name: String, + #[serde(flatten)] + pub clustering_mode: NifiClusteringMode, /// Extra volumes similar to `.spec.volumes` on a Pod to mount into every container, this can be useful to for /// example make client certificates, keytabs or similar things available to processors. These volumes will be @@ -168,6 +165,21 @@ pub mod versioned { #[serde(default)] pub create_reporting_task_job: CreateReportingTaskJob, } + + #[derive(Clone, Debug, Deserialize, JsonSchema, PartialEq, Serialize)] + #[serde(untagged)] + pub enum NifiClusteringMode { + #[serde(rename_all = "camelCase")] + ZooKeeper { + // TODO: reword to clarify that ZK is optional for 2.0+ + /// NiFi requires a ZooKeeper cluster connection to run. + /// Provide the name of the ZooKeeper [discovery ConfigMap](DOCS_BASE_URL_PLACEHOLDER/concepts/service_discovery) + /// here. When using the [Stackable operator for Apache ZooKeeper](DOCS_BASE_URL_PLACEHOLDER/zookeeper/) + /// to deploy a ZooKeeper cluster, this will simply be the name of your ZookeeperCluster resource. + zookeeper_config_map_name: String, + }, + Kubernetes {}, + } } impl HasStatusCondition for v1alpha1::NifiCluster { From 7daf23e486c5f02f436d32754092b0a80d2ff426 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Natalie=20Klestrup=20R=C3=B6ijezon?= Date: Wed, 9 Apr 2025 11:09:05 +0200 Subject: [PATCH 06/18] Regenerate CRD --- deploy/helm/nifi-operator/crds/crds.yaml | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/deploy/helm/nifi-operator/crds/crds.yaml b/deploy/helm/nifi-operator/crds/crds.yaml index 7511f55a..2e261b23 100644 --- a/deploy/helm/nifi-operator/crds/crds.yaml +++ b/deploy/helm/nifi-operator/crds/crds.yaml @@ -26,6 +26,10 @@ spec: description: A NiFi cluster stacklet. This resource is managed by the Stackable operator for Apache NiFi. Find more information on how to use it and the resources that the operator generates in the [operator documentation](https://docs.stackable.tech/home/nightly/nifi/). properties: clusterConfig: + anyOf: + - required: + - zookeeperConfigMapName + - {} description: Settings that affect all roles and role groups. The settings in the `clusterConfig` are cluster wide settings that do not need to be configurable at role or role group level. 
properties: authentication: @@ -163,7 +167,6 @@ spec: required: - authentication - sensitiveProperties - - zookeeperConfigMapName type: object clusterOperation: default: From 82a9bf65d56cdaee8a52b8248622037562354e0d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Natalie=20Klestrup=20R=C3=B6ijezon?= Date: Wed, 9 Apr 2025 11:17:41 +0200 Subject: [PATCH 07/18] Rewrite zookeeperConfigMapName docs to emphasize that it is optional for NiFi 2.x --- deploy/helm/nifi-operator/crds/crds.yaml | 5 ++++- rust/operator-binary/src/crd/mod.rs | 12 ++++++++---- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/deploy/helm/nifi-operator/crds/crds.yaml b/deploy/helm/nifi-operator/crds/crds.yaml index 2e261b23..4fb397ab 100644 --- a/deploy/helm/nifi-operator/crds/crds.yaml +++ b/deploy/helm/nifi-operator/crds/crds.yaml @@ -162,7 +162,10 @@ spec: nullable: true type: string zookeeperConfigMapName: - description: NiFi requires a ZooKeeper cluster connection to run. Provide the name of the ZooKeeper [discovery ConfigMap](https://docs.stackable.tech/home/nightly/concepts/service_discovery) here. When using the [Stackable operator for Apache ZooKeeper](https://docs.stackable.tech/home/nightly/zookeeper/) to deploy a ZooKeeper cluster, this will simply be the name of your ZookeeperCluster resource. + description: |- + NiFi can either use ZooKeeper or Kubernetes for managing its cluster state. To use ZooKeeper, provide the name of the ZooKeeper [discovery ConfigMap](https://docs.stackable.tech/home/nightly/concepts/service_discovery) here. When using the [Stackable operator for Apache ZooKeeper](https://docs.stackable.tech/home/nightly/zookeeper/) to deploy a ZooKeeper cluster, this will simply be the name of your ZookeeperCluster resource. + + The Kubernetes provider will be used if this field is unset. Kubernetes is only supported for NiFi 2.x and newer, NiFi 1.x requires ZooKeeper. type: string required: - authentication diff --git a/rust/operator-binary/src/crd/mod.rs b/rust/operator-binary/src/crd/mod.rs index c577e84d..b40286c4 100644 --- a/rust/operator-binary/src/crd/mod.rs +++ b/rust/operator-binary/src/crd/mod.rs @@ -166,16 +166,20 @@ pub mod versioned { pub create_reporting_task_job: CreateReportingTaskJob, } + // This is flattened in for backwards compatibility reasons, `zookeeper_config_map_name` already existed and used to be mandatory. + // For v1alpha2, consider migrating this to a tagged enum for consistency. #[derive(Clone, Debug, Deserialize, JsonSchema, PartialEq, Serialize)] #[serde(untagged)] pub enum NifiClusteringMode { #[serde(rename_all = "camelCase")] ZooKeeper { - // TODO: reword to clarify that ZK is optional for 2.0+ - /// NiFi requires a ZooKeeper cluster connection to run. - /// Provide the name of the ZooKeeper [discovery ConfigMap](DOCS_BASE_URL_PLACEHOLDER/concepts/service_discovery) - /// here. When using the [Stackable operator for Apache ZooKeeper](DOCS_BASE_URL_PLACEHOLDER/zookeeper/) + /// NiFi can either use ZooKeeper or Kubernetes for managing its cluster state. To use ZooKeeper, provide the name of the + /// ZooKeeper [discovery ConfigMap](DOCS_BASE_URL_PLACEHOLDER/concepts/service_discovery) here. + /// When using the [Stackable operator for Apache ZooKeeper](DOCS_BASE_URL_PLACEHOLDER/zookeeper/) /// to deploy a ZooKeeper cluster, this will simply be the name of your ZookeeperCluster resource. + /// + /// The Kubernetes provider will be used if this field is unset. Kubernetes is only supported for NiFi 2.x and newer, + /// NiFi 1.x requires ZooKeeper. 
zookeeper_config_map_name: String, }, Kubernetes {}, From 0ff1c222967f36c4bf7a2d846ac82189ec2c966f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Natalie=20Klestrup=20R=C3=B6ijezon?= Date: Wed, 9 Apr 2025 11:51:15 +0200 Subject: [PATCH 08/18] Update docs to mention Kubernetes backend --- docs/modules/nifi/pages/index.adoc | 2 +- .../nifi/pages/usage_guide/clustering.adoc | 41 +++++++++++++++++++ .../modules/nifi/pages/usage_guide/index.adoc | 2 +- .../usage_guide/zookeeper-connection.adoc | 14 ------- docs/modules/nifi/partials/nav.adoc | 2 +- 5 files changed, 44 insertions(+), 17 deletions(-) create mode 100644 docs/modules/nifi/pages/usage_guide/clustering.adoc delete mode 100644 docs/modules/nifi/pages/usage_guide/zookeeper-connection.adoc diff --git a/docs/modules/nifi/pages/index.adoc b/docs/modules/nifi/pages/index.adoc index 34e74dac..ead76339 100644 --- a/docs/modules/nifi/pages/index.adoc +++ b/docs/modules/nifi/pages/index.adoc @@ -35,7 +35,7 @@ Every role group is accessible through it's own Service, and there is a Service == Dependencies -Apache NiFi depends on Apache ZooKeeper which you can run in Kubernetes with the xref:zookeeper:index.adoc[]. +Apache NiFi 1.x depends on Apache ZooKeeper which you can run in Kubernetes with the xref:zookeeper:index.adoc[]. == [[demos]]Demos diff --git a/docs/modules/nifi/pages/usage_guide/clustering.adoc b/docs/modules/nifi/pages/usage_guide/clustering.adoc new file mode 100644 index 00000000..66ec4ac1 --- /dev/null +++ b/docs/modules/nifi/pages/usage_guide/clustering.adoc @@ -0,0 +1,41 @@ += Clustering +:description: Apache NiFi requires a backend for cluster management, and supports either Kubernetes or Apache ZooKeeper. +:page-aliases: usage_guide/zookeeper-connection.adoc + +Apache NiFi requires{empty}footnote:[Apache NiFi also supports a single-node mode with no cluster backend, but this is not supported by the Stackable Operator for Apache NiFi. The Stackable Operator does require a cluster backend.] an external backend for state management and leader election. + +Currently, the Stackable Operator for Apache NiFi supports the following backends: + +- xref:#backend-kubernetes[] +- xref:#backend-zookeeper[] + +CAUTION: The cluster backend of an existing cluster should never be changed. Otherwise data loss may occur, both due to losing NiFi processor state, and due to potential split-brain scenarios during the migration. + +[#backend-kubernetes] +== Kubernetes + +NOTE: The Kubernetes provider is only supported by Apache NiFi 2.0 or newer. When using NiFi 1.x, use the xref:#backend-zookeeper[] backend instead. + +The Kubernetes backend is used by default (unless the xref:#backend-zookeeper[] backend is configured), and stores all state in Kubernetes objects, in the same namespace as the `NifiCluster` object. + +It requires (nor supports) no configuration. + +[#backend-zookeeper] +== Apache ZooKeeper + +NiFi can also be configured to store its state in Apache ZooKeeper. + +When the ZooKeeper backend is used, NiFi relies on an Apache ZooKeeper ensemble for state management and leader election; the Stackable operator for Apache NiFi does not support single-node deployments without a cluster backend, so a ZooKeeper connection must be configured in this mode.
+ +This is enabled by setting the `spec.clusterConfig.zookeeperConfigMapName` to a xref:concepts:service-discovery.adoc[discovery ConfigMap]: + +[source,yaml] +---- +spec: + clusterConfig: + zookeeperConfigMapName: simple-nifi-znode +---- + +The ConfigMap needs to contain two keys: `ZOOKEEPER_HOSTS` containing the value being the ZooKeeper connection string, and `ZOOKEEPER_CHROOT` containing the ZooKeeper chroot. + +The xref:zookeeper:index.adoc[Stackable operator for Apache ZooKeeper] automatically creates this ConfigMap for every ZookeeperZnode object. diff --git a/docs/modules/nifi/pages/usage_guide/index.adoc b/docs/modules/nifi/pages/usage_guide/index.adoc index 79e803be..13dea386 100644 --- a/docs/modules/nifi/pages/usage_guide/index.adoc +++ b/docs/modules/nifi/pages/usage_guide/index.adoc @@ -43,7 +43,7 @@ spec: replicas: 3 ---- -<1> The xref:usage_guide/zookeeper-connection.adoc[ZooKeeper instance] to use. +<1> The xref:usage_guide/clustering.adoc#backend-zookeeper[ZooKeeper instance] to use. <2> How users should xref:usage_guide/security.adoc[authenticate] themselves. <3> xref:usage_guide/extra-volumes.adoc[Extra volumes] with files that can be referenced in custom workflows. <4> xref:usage_guide/resource-configuration.adoc[CPU and memory configuration] can be set per role group. diff --git a/docs/modules/nifi/pages/usage_guide/zookeeper-connection.adoc b/docs/modules/nifi/pages/usage_guide/zookeeper-connection.adoc deleted file mode 100644 index 489dc645..00000000 --- a/docs/modules/nifi/pages/usage_guide/zookeeper-connection.adoc +++ /dev/null @@ -1,14 +0,0 @@ -= Connecting NiFi to Apache ZooKeeper -:description: Connect NiFi to Apache ZooKeeper using the Stackable operator for cluster management, requiring a ZooKeeper ensemble for state management and leader election. - -NiFi in cluster mode requires an Apache ZooKeeper ensemble for state management and leader election purposes, the Stackable operator for Apache NiFi does not support single node deployments without ZooKeeper, hence this is a required setting. - -[source,yaml] ----- -spec: - clusterConfig: - zookeeperConfigMapName: simple-nifi-znode ----- - -Configuration happens via a xref:concepts:service-discovery.adoc[discovery ConfigMap], which needs to contain two keys called `ZOOKEEPER_HOSTS` with the value being the ZooKeeper connection string and `ZOOKEEPER_CHROOT` with the value being the ZooKeeper chroot. -When using the xref:zookeeper:index.adoc[Stackable operator for Apache ZooKeeper], the operator creates this ConfigMap for every ZNode automatically. 
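For reference, the discovery ConfigMap described above (named `simple-nifi-znode` in the example) might look roughly like this; the host, port, and chroot values below are made up, since in practice the Stackable operator for Apache ZooKeeper generates this ConfigMap for every ZookeeperZnode object:

[source,yaml]
----
apiVersion: v1
kind: ConfigMap
metadata:
  name: simple-nifi-znode
data:
  # Illustrative values only; the real entries are filled in by the ZooKeeper operator.
  ZOOKEEPER_HOSTS: simple-zk-server-default-0.simple-zk-server-default.default.svc.cluster.local:2282
  ZOOKEEPER_CHROOT: /znode-4e169890-d2eb-4d62-9515-e4786f0ac58e
----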
diff --git a/docs/modules/nifi/partials/nav.adoc b/docs/modules/nifi/partials/nav.adoc index 5c09e5ce..7da49df5 100644 --- a/docs/modules/nifi/partials/nav.adoc +++ b/docs/modules/nifi/partials/nav.adoc @@ -3,7 +3,7 @@ ** xref:nifi:getting_started/first_steps.adoc[] * xref:nifi:usage_guide/index.adoc[] ** xref:nifi:usage_guide/listenerclass.adoc[] -** xref:nifi:usage_guide/zookeeper-connection.adoc[] +** xref:nifi:usage_guide/clustering.adoc[] ** xref:nifi:usage_guide/extra-volumes.adoc[] ** xref:nifi:usage_guide/external_ports.adoc[] ** xref:nifi:usage_guide/security.adoc[] From 1aa50cf183ffeb67e82d6c25cb18f9628db27273 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Natalie=20Klestrup=20R=C3=B6ijezon?= Date: Wed, 9 Apr 2025 12:02:07 +0200 Subject: [PATCH 09/18] Rename clustering_mode to clustering_backend --- rust/operator-binary/src/config/mod.rs | 12 ++++++------ rust/operator-binary/src/controller.rs | 6 +++--- rust/operator-binary/src/crd/mod.rs | 4 ++-- 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/rust/operator-binary/src/config/mod.rs b/rust/operator-binary/src/config/mod.rs index bd452489..614c24b2 100644 --- a/rust/operator-binary/src/config/mod.rs +++ b/rust/operator-binary/src/config/mod.rs @@ -249,9 +249,9 @@ pub fn build_nifi_properties( // The ID of the cluster-wide state provider. This will be ignored if NiFi is not clustered but must be populated if running in a cluster. properties.insert( "nifi.state.management.provider.cluster".to_string(), - match spec.cluster_config.clustering_mode { - v1alpha1::NifiClusteringMode::ZooKeeper { .. } => "zk-provider".to_string(), - v1alpha1::NifiClusteringMode::Kubernetes { .. } => "kubernetes-provider".to_string(), + match spec.cluster_config.clustering_backend { + v1alpha1::NifiClusteringBackend::ZooKeeper { .. } => "zk-provider".to_string(), + v1alpha1::NifiClusteringBackend::Kubernetes { .. } => "kubernetes-provider".to_string(), }, ); // Specifies whether or not this instance of NiFi should run an embedded ZooKeeper server @@ -561,8 +561,8 @@ pub fn build_nifi_properties( "".to_string(), ); - match spec.cluster_config.clustering_mode { - v1alpha1::NifiClusteringMode::ZooKeeper { .. } => { + match spec.cluster_config.clustering_backend { + v1alpha1::NifiClusteringBackend::ZooKeeper { .. 
} => { properties.insert( "nifi.cluster.leader.election.implementation".to_string(), "CuratorLeaderElectionManager".to_string(), @@ -581,7 +581,7 @@ pub fn build_nifi_properties( ); } - v1alpha1::NifiClusteringMode::Kubernetes {} => { + v1alpha1::NifiClusteringBackend::Kubernetes {} => { properties.insert( "nifi.cluster.leader.election.implementation".to_string(), "KubernetesLeaderElectionManager".to_string(), diff --git a/rust/operator-binary/src/controller.rs b/rust/operator-binary/src/controller.rs index 047cce4b..b15066fb 100644 --- a/rust/operator-binary/src/controller.rs +++ b/rust/operator-binary/src/controller.rs @@ -959,8 +959,8 @@ async fn build_node_rolegroup_statefulset( ..Default::default() }); - match &nifi.spec.cluster_config.clustering_mode { - v1alpha1::NifiClusteringMode::ZooKeeper { + match &nifi.spec.cluster_config.clustering_backend { + v1alpha1::NifiClusteringBackend::ZooKeeper { zookeeper_config_map_name, } => { let zookeeper_env_var = |name: &str| EnvVar { @@ -978,7 +978,7 @@ async fn build_node_rolegroup_statefulset( env_vars.push(zookeeper_env_var("ZOOKEEPER_HOSTS")); env_vars.push(zookeeper_env_var("ZOOKEEPER_CHROOT")); } - v1alpha1::NifiClusteringMode::Kubernetes {} => {} + v1alpha1::NifiClusteringBackend::Kubernetes {} => {} } if let NifiAuthenticationConfig::Oidc { oidc, .. } = nifi_auth_config { diff --git a/rust/operator-binary/src/crd/mod.rs b/rust/operator-binary/src/crd/mod.rs index b40286c4..38f256d7 100644 --- a/rust/operator-binary/src/crd/mod.rs +++ b/rust/operator-binary/src/crd/mod.rs @@ -139,7 +139,7 @@ pub mod versioned { pub vector_aggregator_config_map_name: Option, #[serde(flatten)] - pub clustering_mode: NifiClusteringMode, + pub clustering_backend: NifiClusteringBackend, /// Extra volumes similar to `.spec.volumes` on a Pod to mount into every container, this can be useful to for /// example make client certificates, keytabs or similar things available to processors. These volumes will be @@ -170,7 +170,7 @@ pub mod versioned { // For v1alpha2, consider migrating this to a tagged enum for consistency. #[derive(Clone, Debug, Deserialize, JsonSchema, PartialEq, Serialize)] #[serde(untagged)] - pub enum NifiClusteringMode { + pub enum NifiClusteringBackend { #[serde(rename_all = "camelCase")] ZooKeeper { /// NiFi can either use ZooKeeper or Kubernetes for managing its cluster state. To use ZooKeeper, provide the name of the From f1a33680c02a14918058dcdb5ef3f41f45077e59 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Natalie=20Klestrup=20R=C3=B6ijezon?= Date: Thu, 10 Apr 2025 18:04:54 +0200 Subject: [PATCH 10/18] Make the nifi prepare script more idempotent Previously, these bits would error when re-executed, causing confusing errors when templating fails. 
--- rust/operator-binary/src/controller.rs | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/rust/operator-binary/src/controller.rs b/rust/operator-binary/src/controller.rs index b15066fb..6704b950 100644 --- a/rust/operator-binary/src/controller.rs +++ b/rust/operator-binary/src/controller.rs @@ -1024,11 +1024,17 @@ async fn build_node_rolegroup_statefulset( format!("echo Importing {KEYSTORE_NIFI_CONTAINER_MOUNT}/keystore.p12 to {STACKABLE_SERVER_TLS_DIR}/keystore.p12"), format!("cp {KEYSTORE_NIFI_CONTAINER_MOUNT}/keystore.p12 {STACKABLE_SERVER_TLS_DIR}/keystore.p12"), format!("echo Importing {KEYSTORE_NIFI_CONTAINER_MOUNT}/truststore.p12 to {STACKABLE_SERVER_TLS_DIR}/truststore.p12"), + // secret-operator currently encrypts keystores with RC2, which NiFi is unable to read: https://github.com/stackabletech/nifi-operator/pull/510 + // As a workaround, reencrypt the keystore with keytool. + // keytool crashes if the target truststore already exists (covering up the true error + // if the init container fails later on in the script), so delete it first. + format!("test ! -e {STACKABLE_SERVER_TLS_DIR}/truststore.p12 || rm {STACKABLE_SERVER_TLS_DIR}/truststore.p12"), format!("keytool -importkeystore -srckeystore {KEYSTORE_NIFI_CONTAINER_MOUNT}/truststore.p12 -destkeystore {STACKABLE_SERVER_TLS_DIR}/truststore.p12 -srcstorepass {STACKABLE_TLS_STORE_PASSWORD} -deststorepass {STACKABLE_TLS_STORE_PASSWORD}"), + "echo Replacing config directory".to_string(), "cp /conf/* /stackable/nifi/conf".to_string(), - "ln -sf /stackable/log_config/logback.xml /stackable/nifi/conf/logback.xml".to_string(), - format!("export NODE_ADDRESS=\"{node_address}\""), + "test -L /stackable/nifi/conf/logback.xml || ln -sf /stackable/log_config/logback.xml /stackable/nifi/conf/logback.xml".to_string(), + format!(r#"export NODE_ADDRESS="{node_address}""#), ]); // This commands needs to go first, as they might set env variables needed by the templating From db206b8e671f6f648c2ac5a838e812490e62799d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Natalie=20Klestrup=20R=C3=B6ijezon?= Date: Tue, 6 May 2025 13:28:24 +0200 Subject: [PATCH 11/18] Only include the used cluster state provider in xml config --- rust/operator-binary/src/config/mod.rs | 45 +++++++++++++++----------- rust/operator-binary/src/controller.rs | 5 ++- 2 files changed, 31 insertions(+), 19 deletions(-) diff --git a/rust/operator-binary/src/config/mod.rs b/rust/operator-binary/src/config/mod.rs index 614c24b2..b96ea50a 100644 --- a/rust/operator-binary/src/config/mod.rs +++ b/rust/operator-binary/src/config/mod.rs @@ -19,8 +19,8 @@ use strum::{Display, EnumIter}; use crate::{ crd::{ - v1alpha1, NifiConfig, NifiConfigFragment, NifiRole, NifiStorageConfig, HTTPS_PORT, - PROTOCOL_PORT, + v1alpha1::{self, NifiClusteringBackend}, + NifiConfig, NifiConfigFragment, NifiRole, NifiStorageConfig, HTTPS_PORT, PROTOCOL_PORT, }, operations::graceful_shutdown::graceful_shutdown_config_properties, security::{ @@ -601,33 +601,42 @@ pub fn build_nifi_properties( Ok(format_properties(properties)) } -pub fn build_state_management_xml() -> String { +pub fn build_state_management_xml(clustering_backend: &NifiClusteringBackend) -> String { + // Inert providers are ignored by NiFi itself, but templating still fails if they refer to invalid environment variables, + // so only include the actually used provider. + let cluster_provider = match clustering_backend { + NifiClusteringBackend::ZooKeeper { .. 
} => { + r#" + zk-provider + org.apache.nifi.controller.state.providers.zookeeper.ZooKeeperStateProvider + ${env:ZOOKEEPER_HOSTS} + ${env:ZOOKEEPER_CHROOT} + 10 seconds + Open + "# + } + NifiClusteringBackend::Kubernetes {} => { + r#" + kubernetes-provider + org.apache.nifi.kubernetes.state.provider.KubernetesConfigMapStateProvider + ${env:STACKLET_NAME} + "# + } + }; format!( r#" - local-provider + local-provider org.apache.nifi.controller.state.providers.local.WriteAheadLocalStateProvider {local_state_path} false 16 2 mins - - zk-provider - org.apache.nifi.controller.state.providers.zookeeper.ZooKeeperStateProvider - ${{env:ZOOKEEPER_HOSTS}} - ${{env:ZOOKEEPER_CHROOT}} - 10 seconds - Open - - - kubernetes-provider - org.apache.nifi.kubernetes.state.provider.KubernetesConfigMapStateProvider - ${{env:STACKLET_NAME}} - + {cluster_provider} "#, - local_state_path = &NifiRepository::State.mount_path(), + local_state_path = NifiRepository::State.mount_path(), ) } diff --git a/rust/operator-binary/src/controller.rs b/rust/operator-binary/src/controller.rs index 6704b950..331399c2 100644 --- a/rust/operator-binary/src/controller.rs +++ b/rust/operator-binary/src/controller.rs @@ -812,7 +812,10 @@ async fn build_node_rolegroup_config_map( rolegroup: rolegroup.clone(), })?, ) - .add_data(NIFI_STATE_MANAGEMENT_XML, build_state_management_xml()) + .add_data( + NIFI_STATE_MANAGEMENT_XML, + build_state_management_xml(&nifi.spec.cluster_config.clustering_backend), + ) .add_data( LOGIN_IDENTITY_PROVIDERS_XML_FILE_NAME, login_identity_provider_xml, From ef63fa96e521e6f672477e88661cf4bad3285950 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Natalie=20Klestrup=20R=C3=B6ijezon?= Date: Tue, 6 May 2025 13:46:15 +0200 Subject: [PATCH 12/18] Add test Copied from smoke since I'm not aware of a native way to exclude specific dimension combinations. 
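For context, the NifiCluster deployed by this test differs from the ZooKeeper-based smoke test mainly in that `spec.clusterConfig` carries no `zookeeperConfigMapName`, so the operator falls back to the Kubernetes clustering backend. A minimal sketch of such a stacklet is shown below; the product version is a placeholder (any NiFi 2.x should do, while NiFi 1.x still requires ZooKeeper):

[source,yaml]
----
apiVersion: nifi.stackable.tech/v1alpha1
kind: NifiCluster
metadata:
  name: test-nifi
spec:
  image:
    productVersion: "2.2.0"  # placeholder, must be NiFi 2.x for the Kubernetes backend
  clusterConfig:
    # No zookeeperConfigMapName here, so the Kubernetes backend is used.
    authentication:
      - authenticationClass: simple-nifi-users
    sensitiveProperties:
      keySecret: nifi-sensitive-property-key
  nodes:
    roleGroups:
      default:
        replicas: 2
----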
--- .../smoke-zookeeperless/00-patch-ns.yaml.j2 | 9 +++ .../smoke-zookeeperless/00-range-limit.yaml | 11 +++ .../smoke-zookeeperless/10-assert.yaml.j2 | 10 +++ ...tor-aggregator-discovery-configmap.yaml.j2 | 9 +++ .../kuttl/smoke-zookeeperless/30-assert.yaml | 25 +++++++ .../30-install-nifi.yaml.j2 | 73 ++++++++++++++++++ .../smoke-zookeeperless/31-assert.yaml.j2 | 6 ++ .../kuttl/smoke-zookeeperless/32-assert.yaml | 19 +++++ .../kuttl/smoke-zookeeperless/33-assert.yaml | 7 ++ .../kuttl/smoke-zookeeperless/40-assert.yaml | 12 +++ .../40-scale-up-nifi.yaml.j2 | 8 ++ .../kuttl/smoke-zookeeperless/50-assert.yaml | 12 +++ .../50-install-test-nifi.yaml | 28 +++++++ .../smoke-zookeeperless/60-assert.yaml.j2 | 9 +++ .../60-prepare-test-nifi.yaml | 7 ++ .../kuttl/smoke-zookeeperless/70-assert.yaml | 42 +++++++++++ .../70-enable-anonymous.yaml.j2 | 9 +++ .../kuttl/smoke-zookeeperless/cacert.pem | 20 +++++ .../kuttl/smoke-zookeeperless/test_nifi.py | 75 +++++++++++++++++++ .../smoke-zookeeperless/test_nifi_metrics.py | 44 +++++++++++ tests/test-definition.yaml | 6 ++ 21 files changed, 441 insertions(+) create mode 100644 tests/templates/kuttl/smoke-zookeeperless/00-patch-ns.yaml.j2 create mode 100644 tests/templates/kuttl/smoke-zookeeperless/00-range-limit.yaml create mode 100644 tests/templates/kuttl/smoke-zookeeperless/10-assert.yaml.j2 create mode 100644 tests/templates/kuttl/smoke-zookeeperless/10-install-vector-aggregator-discovery-configmap.yaml.j2 create mode 100644 tests/templates/kuttl/smoke-zookeeperless/30-assert.yaml create mode 100644 tests/templates/kuttl/smoke-zookeeperless/30-install-nifi.yaml.j2 create mode 100644 tests/templates/kuttl/smoke-zookeeperless/31-assert.yaml.j2 create mode 100644 tests/templates/kuttl/smoke-zookeeperless/32-assert.yaml create mode 100644 tests/templates/kuttl/smoke-zookeeperless/33-assert.yaml create mode 100644 tests/templates/kuttl/smoke-zookeeperless/40-assert.yaml create mode 100644 tests/templates/kuttl/smoke-zookeeperless/40-scale-up-nifi.yaml.j2 create mode 100644 tests/templates/kuttl/smoke-zookeeperless/50-assert.yaml create mode 100644 tests/templates/kuttl/smoke-zookeeperless/50-install-test-nifi.yaml create mode 100644 tests/templates/kuttl/smoke-zookeeperless/60-assert.yaml.j2 create mode 100644 tests/templates/kuttl/smoke-zookeeperless/60-prepare-test-nifi.yaml create mode 100644 tests/templates/kuttl/smoke-zookeeperless/70-assert.yaml create mode 100644 tests/templates/kuttl/smoke-zookeeperless/70-enable-anonymous.yaml.j2 create mode 100644 tests/templates/kuttl/smoke-zookeeperless/cacert.pem create mode 100755 tests/templates/kuttl/smoke-zookeeperless/test_nifi.py create mode 100755 tests/templates/kuttl/smoke-zookeeperless/test_nifi_metrics.py diff --git a/tests/templates/kuttl/smoke-zookeeperless/00-patch-ns.yaml.j2 b/tests/templates/kuttl/smoke-zookeeperless/00-patch-ns.yaml.j2 new file mode 100644 index 00000000..67185acf --- /dev/null +++ b/tests/templates/kuttl/smoke-zookeeperless/00-patch-ns.yaml.j2 @@ -0,0 +1,9 @@ +{% if test_scenario['values']['openshift'] == 'true' %} +# see https://github.com/stackabletech/issues/issues/566 +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestStep +commands: + - script: kubectl patch namespace $NAMESPACE -p '{"metadata":{"labels":{"pod-security.kubernetes.io/enforce":"privileged"}}}' + timeout: 120 +{% endif %} diff --git a/tests/templates/kuttl/smoke-zookeeperless/00-range-limit.yaml b/tests/templates/kuttl/smoke-zookeeperless/00-range-limit.yaml new file mode 100644 index 00000000..8fd02210 --- 
/dev/null +++ b/tests/templates/kuttl/smoke-zookeeperless/00-range-limit.yaml @@ -0,0 +1,11 @@ +--- +apiVersion: v1 +kind: LimitRange +metadata: + name: limit-request-ratio +spec: + limits: + - type: "Container" + maxLimitRequestRatio: + cpu: 5 + memory: 1 diff --git a/tests/templates/kuttl/smoke-zookeeperless/10-assert.yaml.j2 b/tests/templates/kuttl/smoke-zookeeperless/10-assert.yaml.j2 new file mode 100644 index 00000000..50b1d4c3 --- /dev/null +++ b/tests/templates/kuttl/smoke-zookeeperless/10-assert.yaml.j2 @@ -0,0 +1,10 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +{% if lookup('env', 'VECTOR_AGGREGATOR') %} +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: vector-aggregator-discovery +{% endif %} diff --git a/tests/templates/kuttl/smoke-zookeeperless/10-install-vector-aggregator-discovery-configmap.yaml.j2 b/tests/templates/kuttl/smoke-zookeeperless/10-install-vector-aggregator-discovery-configmap.yaml.j2 new file mode 100644 index 00000000..2d6a0df5 --- /dev/null +++ b/tests/templates/kuttl/smoke-zookeeperless/10-install-vector-aggregator-discovery-configmap.yaml.j2 @@ -0,0 +1,9 @@ +{% if lookup('env', 'VECTOR_AGGREGATOR') %} +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: vector-aggregator-discovery +data: + ADDRESS: {{ lookup('env', 'VECTOR_AGGREGATOR') }} +{% endif %} diff --git a/tests/templates/kuttl/smoke-zookeeperless/30-assert.yaml b/tests/templates/kuttl/smoke-zookeeperless/30-assert.yaml new file mode 100644 index 00000000..ae825d11 --- /dev/null +++ b/tests/templates/kuttl/smoke-zookeeperless/30-assert.yaml @@ -0,0 +1,25 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +timeout: 1200 +--- +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: test-nifi-node-default +spec: + template: + spec: + terminationGracePeriodSeconds: 300 +status: + readyReplicas: 2 + replicas: 2 +--- +apiVersion: policy/v1 +kind: PodDisruptionBudget +metadata: + name: test-nifi-node +status: + expectedPods: 2 + currentHealthy: 2 + disruptionsAllowed: 1 diff --git a/tests/templates/kuttl/smoke-zookeeperless/30-install-nifi.yaml.j2 b/tests/templates/kuttl/smoke-zookeeperless/30-install-nifi.yaml.j2 new file mode 100644 index 00000000..c6bb1e16 --- /dev/null +++ b/tests/templates/kuttl/smoke-zookeeperless/30-install-nifi.yaml.j2 @@ -0,0 +1,73 @@ +--- +apiVersion: authentication.stackable.tech/v1alpha1 +kind: AuthenticationClass +metadata: + name: simple-nifi-users +spec: + provider: + static: + userCredentialsSecret: + name: simple-nifi-admin-credentials +--- +apiVersion: v1 +kind: Secret +metadata: + name: simple-nifi-admin-credentials +stringData: + admin: > + passwordWithSpecialCharacter\@<&>"' +--- +apiVersion: v1 +kind: Secret +metadata: + name: nifi-sensitive-property-key +stringData: + nifiSensitivePropsKey: mYsUp3rS3cr3tk3y +--- +apiVersion: nifi.stackable.tech/v1alpha1 +kind: NifiCluster +metadata: + name: test-nifi +spec: + image: +{% if test_scenario['values']['nifi_new'].find(",") > 0 %} + custom: "{{ test_scenario['values']['nifi_new'].split(',')[1] }}" + productVersion: "{{ test_scenario['values']['nifi_new'].split(',')[0] }}" +{% else %} + productVersion: "{{ test_scenario['values']['nifi_new'] }}" +{% endif %} + pullPolicy: IfNotPresent + clusterConfig: + listenerClass: {{ test_scenario['values']['listener-class'] }} + authentication: + - authenticationClass: simple-nifi-users + hostHeaderCheck: + allowAll: false + additionalAllowedHosts: + - example.com:1234 + sensitiveProperties: + keySecret: nifi-sensitive-property-key +{% if lookup('env', 
'VECTOR_AGGREGATOR') %} + vectorAggregatorConfigMapName: vector-aggregator-discovery +{% endif %} + nodes: + envOverrides: + COMMON_VAR: role-value # overridden by role group below + ROLE_VAR: role-value # only defined here at role level + configOverrides: + "nifi.properties": + "nifi.diagnostics.on.shutdown.enabled": "true" + "nifi.diagnostics.on.shutdown.verbose": "false" + config: + logging: + enableVectorAgent: {{ lookup('env', 'VECTOR_AGGREGATOR') | length > 0 }} + roleGroups: + default: + replicas: 2 + envOverrides: + COMMON_VAR: group-value # overrides role value + GROUP_VAR: group-value # only defined here at group level + configOverrides: + "nifi.properties": + "nifi.diagnostics.on.shutdown.enabled": "false" + "nifi.diagnostics.on.shutdown.max.filecount": "20" diff --git a/tests/templates/kuttl/smoke-zookeeperless/31-assert.yaml.j2 b/tests/templates/kuttl/smoke-zookeeperless/31-assert.yaml.j2 new file mode 100644 index 00000000..06b1dc78 --- /dev/null +++ b/tests/templates/kuttl/smoke-zookeeperless/31-assert.yaml.j2 @@ -0,0 +1,6 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +timeout: 30 +commands: +- script: kubectl get cm -n $NAMESPACE test-nifi-node-default -o yaml | grep -- 'nifi.web.proxy.host=.*example.com:1234' | xargs test ! -z diff --git a/tests/templates/kuttl/smoke-zookeeperless/32-assert.yaml b/tests/templates/kuttl/smoke-zookeeperless/32-assert.yaml new file mode 100644 index 00000000..00da1613 --- /dev/null +++ b/tests/templates/kuttl/smoke-zookeeperless/32-assert.yaml @@ -0,0 +1,19 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +timeout: 600 +commands: + # + # Test envOverrides + # + - script: | + kubectl -n $NAMESPACE get sts test-nifi-node-default -o yaml | yq -e '.spec.template.spec.containers[] | select (.name == "nifi") | .env[] | select (.name == "COMMON_VAR" and .value == "group-value")' + kubectl -n $NAMESPACE get sts test-nifi-node-default -o yaml | yq -e '.spec.template.spec.containers[] | select (.name == "nifi") | .env[] | select (.name == "GROUP_VAR" and .value == "group-value")' + kubectl -n $NAMESPACE get sts test-nifi-node-default -o yaml | yq -e '.spec.template.spec.containers[] | select (.name == "nifi") | .env[] | select (.name == "ROLE_VAR" and .value == "role-value")' + # + # Test configOverrides + # + - script: | + kubectl -n $NAMESPACE get cm test-nifi-node-default -o yaml | yq -e '.data."nifi.properties"' | grep "nifi.diagnostics.on.shutdown.enabled=false" + kubectl -n $NAMESPACE get cm test-nifi-node-default -o yaml | yq -e '.data."nifi.properties"' | grep "nifi.diagnostics.on.shutdown.verbose=false" + kubectl -n $NAMESPACE get cm test-nifi-node-default -o yaml | yq -e '.data."nifi.properties"' | grep "nifi.diagnostics.on.shutdown.max.filecount=20" diff --git a/tests/templates/kuttl/smoke-zookeeperless/33-assert.yaml b/tests/templates/kuttl/smoke-zookeeperless/33-assert.yaml new file mode 100644 index 00000000..b2a98140 --- /dev/null +++ b/tests/templates/kuttl/smoke-zookeeperless/33-assert.yaml @@ -0,0 +1,7 @@ +--- +# This test checks if the containerdebug-state.json file is present and valid +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +timeout: 600 +commands: + - script: kubectl exec -n $NAMESPACE --container nifi test-nifi-node-default-0 -- cat /stackable/log/containerdebug-state.json | jq --exit-status '"valid JSON"' diff --git a/tests/templates/kuttl/smoke-zookeeperless/40-assert.yaml b/tests/templates/kuttl/smoke-zookeeperless/40-assert.yaml new file mode 100644 index 00000000..88f50b77 --- /dev/null +++ 
b/tests/templates/kuttl/smoke-zookeeperless/40-assert.yaml @@ -0,0 +1,12 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +timeout: 1200 +--- +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: test-nifi-node-default +status: + readyReplicas: 3 + replicas: 3 diff --git a/tests/templates/kuttl/smoke-zookeeperless/40-scale-up-nifi.yaml.j2 b/tests/templates/kuttl/smoke-zookeeperless/40-scale-up-nifi.yaml.j2 new file mode 100644 index 00000000..987b0745 --- /dev/null +++ b/tests/templates/kuttl/smoke-zookeeperless/40-scale-up-nifi.yaml.j2 @@ -0,0 +1,8 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestStep +commands: + - script: >- + kubectl --namespace $NAMESPACE + patch nificlusters.nifi.stackable.tech test-nifi + --type=merge --patch '{"spec":{"nodes": {"roleGroups": {"default": {"replicas": 3}}}}}' diff --git a/tests/templates/kuttl/smoke-zookeeperless/50-assert.yaml b/tests/templates/kuttl/smoke-zookeeperless/50-assert.yaml new file mode 100644 index 00000000..d511ff46 --- /dev/null +++ b/tests/templates/kuttl/smoke-zookeeperless/50-assert.yaml @@ -0,0 +1,12 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +timeout: 300 +--- +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: test-nifi +status: + readyReplicas: 1 + replicas: 1 diff --git a/tests/templates/kuttl/smoke-zookeeperless/50-install-test-nifi.yaml b/tests/templates/kuttl/smoke-zookeeperless/50-install-test-nifi.yaml new file mode 100644 index 00000000..3bc67dbc --- /dev/null +++ b/tests/templates/kuttl/smoke-zookeeperless/50-install-test-nifi.yaml @@ -0,0 +1,28 @@ +--- +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: test-nifi + labels: + app: test-nifi +spec: + replicas: 1 + selector: + matchLabels: + app: test-nifi + template: + metadata: + labels: + app: test-nifi + spec: + containers: + - name: test-nifi + image: oci.stackable.tech/sdp/testing-tools:0.2.0-stackable0.0.0-dev + command: ["sleep", "infinity"] + resources: + requests: + memory: "128Mi" + cpu: "100m" + limits: + memory: "128Mi" + cpu: "400m" diff --git a/tests/templates/kuttl/smoke-zookeeperless/60-assert.yaml.j2 b/tests/templates/kuttl/smoke-zookeeperless/60-assert.yaml.j2 new file mode 100644 index 00000000..b62dc7c6 --- /dev/null +++ b/tests/templates/kuttl/smoke-zookeeperless/60-assert.yaml.j2 @@ -0,0 +1,9 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +timeout: 300 +commands: + - script: kubectl exec -n $NAMESPACE test-nifi-0 -- python /tmp/test_nifi.py -u admin -p 'passwordWithSpecialCharacter\@<&>"'"'" -n $NAMESPACE -c 3 +{% if test_scenario['values']['nifi_new'].startswith('1.') %} + - script: kubectl exec -n $NAMESPACE test-nifi-0 -- python /tmp/test_nifi_metrics.py -n $NAMESPACE +{% endif %} diff --git a/tests/templates/kuttl/smoke-zookeeperless/60-prepare-test-nifi.yaml b/tests/templates/kuttl/smoke-zookeeperless/60-prepare-test-nifi.yaml new file mode 100644 index 00000000..c3ac9b79 --- /dev/null +++ b/tests/templates/kuttl/smoke-zookeeperless/60-prepare-test-nifi.yaml @@ -0,0 +1,7 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestStep +commands: + - script: kubectl cp -n $NAMESPACE ./test_nifi_metrics.py test-nifi-0:/tmp + - script: kubectl cp -n $NAMESPACE ./test_nifi.py test-nifi-0:/tmp + - script: kubectl cp -n $NAMESPACE ./cacert.pem test-nifi-0:/tmp diff --git a/tests/templates/kuttl/smoke-zookeeperless/70-assert.yaml b/tests/templates/kuttl/smoke-zookeeperless/70-assert.yaml new file mode 100644 index 00000000..29b0ff4d --- /dev/null +++ b/tests/templates/kuttl/smoke-zookeeperless/70-assert.yaml 
@@ -0,0 +1,42 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +timeout: 600 +--- +apiVersion: v1 +kind: Event +reason: Started +source: + component: kubelet +involvedObject: + apiVersion: v1 + kind: Pod + name: test-nifi-node-default-0 +--- +apiVersion: v1 +kind: Event +reason: Started +source: + component: kubelet +involvedObject: + apiVersion: v1 + kind: Pod + name: test-nifi-node-default-1 +--- +apiVersion: v1 +kind: Event +reason: Started +source: + component: kubelet +involvedObject: + apiVersion: v1 + kind: Pod + name: test-nifi-node-default-2 +--- +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: test-nifi-node-default +status: + readyReplicas: 3 + replicas: 3 diff --git a/tests/templates/kuttl/smoke-zookeeperless/70-enable-anonymous.yaml.j2 b/tests/templates/kuttl/smoke-zookeeperless/70-enable-anonymous.yaml.j2 new file mode 100644 index 00000000..f39ce021 --- /dev/null +++ b/tests/templates/kuttl/smoke-zookeeperless/70-enable-anonymous.yaml.j2 @@ -0,0 +1,9 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestStep +commands: + - script: >- + kubectl --namespace $NAMESPACE + patch nificlusters.nifi.stackable.tech test-nifi + --type=merge --patch '{"spec":{"config": {"authentication": {"allowAnonymousAccess": true}}}}' + - command: kubectl rollout restart statefulset test-nifi-node-default --namespace $NAMESPACE diff --git a/tests/templates/kuttl/smoke-zookeeperless/cacert.pem b/tests/templates/kuttl/smoke-zookeeperless/cacert.pem new file mode 100644 index 00000000..ebe73910 --- /dev/null +++ b/tests/templates/kuttl/smoke-zookeeperless/cacert.pem @@ -0,0 +1,20 @@ +-----BEGIN CERTIFICATE----- +MIIDVTCCAj2gAwIBAgIJAJ8/0entaUgnMA0GCSqGSIb3DQEBCwUAMCYxJDAiBgNV +BAMMG3NlY3JldC1vcGVyYXRvciBzZWxmLXNpZ25lZDAeFw0yMjAxMTIxNDU3NDVa +Fw0yNDAxMTIxNTAyNDVaMCYxJDAiBgNVBAMMG3NlY3JldC1vcGVyYXRvciBzZWxm +LXNpZ25lZDCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBALLKNGhq5gE+ +mL9zFCLqtc22CLk8BSbjesjUEhBK3kxDvFDa2ou5atH0eUFjtOSszay2oBrCTVWK +wZBsdUkL0HkW/wq9A8EUkQ8EownXnsxpI61CLNGLPpBZc+CRHhyWDD6BqwGvEHEv +W546mh6k49//7zCiYfTK9/LCKBCFdDV6Sb7mNJ8HbNUj54uwC6iOgH25OCRDh4Bt +zXoSrV9GLAm6AM25ZFo+ONOUBMtv7pavaR0CFMnAixl2NKV2wyLBYAYnJgdJFzGD +8mP6HwuR7e2g7PkcyC01EnX4iOIuuKHT/Xl9ynut4nHI7g6popotgashrQ5Jf8MS +Kf98O12LzSMCAwEAAaOBhTCBgjAPBgNVHRMBAf8EBTADAQH/MB0GA1UdDgQWBBRf +U9OxCBwCqYiUjWqY05sz3a6cmjBABgNVHSMEOTA3oSqkKDAmMSQwIgYDVQQDDBtz +ZWNyZXQtb3BlcmF0b3Igc2VsZi1zaWduZWSCCQCfP9Hp7WlIJzAOBgNVHQ8BAf8E +BAMCAYYwDQYJKoZIhvcNAQELBQADggEBAA8Flk1XOb1pH33Mbie5ronP2xw/xf6t +Ox3PBEZ+5/jSPdIwoSaRp9JoP0L9Rg68jzcl5QMa4pOYWe+C1q8aZP0tjfq1eJfO +UD5ik2DQgEuoF1ELgW1xoM38vkd8wgE711swDHK2zAsOudSzO4XZ4rQ6kaXXtoej +2kFhxDYcC+na90LdkJM0kAqrjxlFaP7WgUK+HA2iN00CFSOI9FVdppLtootbcb+y ++WfXxM7gA9Exg4f2vKGVx7UxB/k4AbPvogBQZvK8VoAQocAhWrw7o2rqAesAw6JD +WwQjM69TlEfbHYXtTfMbi01Wi5TtVhFCjyXK6KDsqSgU+9McExIy70k= +-----END CERTIFICATE----- diff --git a/tests/templates/kuttl/smoke-zookeeperless/test_nifi.py b/tests/templates/kuttl/smoke-zookeeperless/test_nifi.py new file mode 100755 index 00000000..b885c09f --- /dev/null +++ b/tests/templates/kuttl/smoke-zookeeperless/test_nifi.py @@ -0,0 +1,75 @@ +#!/usr/bin/env python +import requests +import json +import argparse +import urllib3 +from time import sleep + + +def get_token(nifi_host, username, password): + nifi_headers = { + 'content-type': 'application/x-www-form-urlencoded; charset=UTF-8', + } + data = {'username': username, 'password': password} + + # TODO: handle actual errors when connecting properly + nifi_url = nifi_host + '/nifi-api/access/token' + response = 
requests.post(nifi_url, headers=nifi_headers, data=data, verify=False) # , cert='./tmp/cacert.pem') + + if response.ok: + nifi_token = response.content.decode('utf-8') + return "Bearer " + nifi_token + else: + print(f"Failed to get token: {response.status_code}: {response.content}") + exit(-1) + + +if __name__ == '__main__': + # Construct an argument parser + all_args = argparse.ArgumentParser() + + # Add arguments to the parser + all_args.add_argument("-u", "--user", required=True, + help="Username to connect as") + all_args.add_argument("-p", "--password", required=True, + help="Password for the user") + all_args.add_argument("-n", "--namespace", required=True, + help="Namespace the test is running in") + all_args.add_argument("-c", "--count", required=True, + help="The expected number of Nodes") + args = vars(all_args.parse_args()) + + # disable warnings as we have specified non-verified https connections + urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) + + host = f"https://test-nifi-node-default-1.test-nifi-node-default.{args['namespace']}.svc.cluster.local:8443" + token = get_token(host, args['user'], args['password']) + headers = {'Authorization': token} + node_count = int(args['count']) + + x = 0 + while x < 15: + url = host + '/nifi-api/controller/cluster' + cluster = requests.get(url, headers=headers, verify=False) # , cert='/tmp/cacert.pem') + if cluster.status_code != 200: + print("Waiting for cluster...") + else: + cluster_data = json.loads(cluster.content.decode('utf-8')) + nodes = cluster_data['cluster']['nodes'] + if len(nodes) != node_count: + print(f"Cluster should have {node_count} nodes at this stage, but has: {len(nodes)}") + else: + connected = True + for node in nodes: + if node['status'] != "CONNECTED": + print(f"Node {node['nodeId']} is in state {node['status']} but should have been CONNECTED") + connected = False + if connected: + print("Test succeeded!") + exit(0) + print("Retrying...") + x = x + 1 + sleep(10) + + print("Test failed") + exit(-1) diff --git a/tests/templates/kuttl/smoke-zookeeperless/test_nifi_metrics.py b/tests/templates/kuttl/smoke-zookeeperless/test_nifi_metrics.py new file mode 100755 index 00000000..86076038 --- /dev/null +++ b/tests/templates/kuttl/smoke-zookeeperless/test_nifi_metrics.py @@ -0,0 +1,44 @@ +#!/usr/bin/env python +import argparse +import requests +import time +from requests.exceptions import ConnectionError + +if __name__ == '__main__': + # Construct an argument parser + all_args = argparse.ArgumentParser() + # Add arguments to the parser + all_args.add_argument("-m", "--metric", required=False, default="nifi_amount_bytes_read", + help="The name of a certain metric to check") + all_args.add_argument("-n", "--namespace", required=True, + help="The namespace the test is running in") + all_args.add_argument("-p", "--port", required=False, default="8081", + help="The port where metrics are exposed") + all_args.add_argument("-t", "--timeout", required=False, default="120", + help="The timeout in seconds to wait for the metrics port to be opened") + + args = vars(all_args.parse_args()) + metric_name = args["metric"] + namespace = args["namespace"] + port = args["port"] + timeout = int(args["timeout"]) + + url = f"http://test-nifi-node-default-0.test-nifi-node-default.{namespace}.svc.cluster.local:{port}/metrics" + + # wait for 'timeout' seconds + t_end = time.time() + timeout + while time.time() < t_end: + try: + response = requests.post(url) + response.raise_for_status() + if metric_name in response.text: + 
print("Test metrics succeeded!") + exit(0) + else: + print(f"Could not find metric [{metric_name}] in response:\n {response.text}") + time.sleep(timeout) + except ConnectionError: + # NewConnectionError is expected until metrics are available + time.sleep(10) + + exit(-1) diff --git a/tests/test-definition.yaml b/tests/test-definition.yaml index 0336c48a..c70246c9 100644 --- a/tests/test-definition.yaml +++ b/tests/test-definition.yaml @@ -64,6 +64,12 @@ tests: - zookeeper - listener-class - openshift + # Merge into smoke once we drop support for NiFi 1.x + - name: smoke-zookeeperless + dimensions: + - nifi_new + - listener-class + - openshift - name: resources dimensions: - nifi From 4f6352097b97b9d98184d995e5af254f9fc94fbc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Natalie=20Klestrup=20R=C3=B6ijezon?= Date: Tue, 6 May 2025 13:57:42 +0200 Subject: [PATCH 13/18] Explicitly fail when trying to use NiFi 1.x in zookeeperless mode --- rust/operator-binary/src/config/mod.rs | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/rust/operator-binary/src/config/mod.rs b/rust/operator-binary/src/config/mod.rs index b96ea50a..b4c40068 100644 --- a/rust/operator-binary/src/config/mod.rs +++ b/rust/operator-binary/src/config/mod.rs @@ -5,7 +5,7 @@ use std::{ use jvm::build_merged_jvm_config; use product_config::{types::PropertyNameKind, ProductConfigManager}; -use snafu::{ResultExt, Snafu}; +use snafu::{ensure, ResultExt, Snafu}; use stackable_operator::{ commons::resources::Resources, memory::MemoryQuantity, @@ -95,6 +95,11 @@ pub enum Error { #[snafu(display("failed to generate OIDC config"))] GenerateOidcConfig { source: oidc::Error }, + + #[snafu(display( + "NiFi 1.x requires ZooKeeper (hint: upgrade to NiFi 2.x or set .spec.clusterConfig.zookeeperConfigMapName)" + ))] + Nifi1RequiresZookeeper, } /// Create the NiFi bootstrap.conf @@ -142,13 +147,15 @@ pub fn build_nifi_properties( overrides: BTreeMap, product_version: &str, ) -> Result { + // TODO: Remove once we dropped support for all NiFi 1.x versions + let is_nifi_1 = product_version.starts_with("1."); + let mut properties = BTreeMap::new(); // Core Properties // According to https://cwiki.apache.org/confluence/display/NIFI/Migration+Guidance#MigrationGuidance-Migratingto2.0.0-M1 // The nifi.flow.configuration.file property in nifi.properties must be changed to reference // "flow.json.gz" instead of "flow.xml.gz" - // TODO: Remove once we dropped support for all 1.x.x versions - let flow_file_name = if product_version.starts_with("1.") { + let flow_file_name = if is_nifi_1 { "flow.xml.gz" } else { "flow.json.gz" @@ -582,6 +589,8 @@ pub fn build_nifi_properties( } v1alpha1::NifiClusteringBackend::Kubernetes {} => { + ensure!(!is_nifi_1, Nifi1RequiresZookeeperSnafu); + properties.insert( "nifi.cluster.leader.election.implementation".to_string(), "KubernetesLeaderElectionManager".to_string(), From 212bdffa81f54d88f7732ad7d2c6eedfb47d0708 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Natalie=20Klestrup=20R=C3=B6ijezon?= Date: Thu, 8 May 2025 15:41:58 +0200 Subject: [PATCH 14/18] Minor docs rewording --- docs/modules/nifi/pages/usage_guide/clustering.adoc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/modules/nifi/pages/usage_guide/clustering.adoc b/docs/modules/nifi/pages/usage_guide/clustering.adoc index 66ec4ac1..417c9f9b 100644 --- a/docs/modules/nifi/pages/usage_guide/clustering.adoc +++ b/docs/modules/nifi/pages/usage_guide/clustering.adoc @@ -18,7 +18,7 @@ NOTE: The Kubernetes provider is only 
supported by Apache NiFi 2.0 or newer. Whe The Kubernetes backend is used by default (unless the xref:#backend-zookeeper[] backend is configured), and stores all state in Kubernetes objects, in the same namespace as the `NifiCluster` object. -It requires (nor supports) no configuration. +It takes no configuration. [#backend-zookeeper] == Apache ZooKeeper From a86fff867bc9c533d36fd2827f9b0e1e5d2938b3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Natalie=20Klestrup=20R=C3=B6ijezon?= Date: Thu, 8 May 2025 16:29:41 +0200 Subject: [PATCH 15/18] Fix a compile error that I missed --- rust/operator-binary/src/main.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/rust/operator-binary/src/main.rs b/rust/operator-binary/src/main.rs index 6556e381..50c4501d 100644 --- a/rust/operator-binary/src/main.rs +++ b/rust/operator-binary/src/main.rs @@ -1,6 +1,7 @@ use std::sync::Arc; use clap::Parser; +use crd::v1alpha1::NifiClusteringBackend; use futures::stream::StreamExt; use stackable_operator::{ YamlSchema, @@ -182,5 +183,10 @@ fn references_config_map( return false; }; - nifi.spec.cluster_config.zookeeper_config_map_name == config_map.name_any() + match &nifi.spec.cluster_config.clustering_backend { + NifiClusteringBackend::ZooKeeper { + zookeeper_config_map_name, + } => *zookeeper_config_map_name == config_map.name_any(), + NifiClusteringBackend::Kubernetes {} => false, + } } From fc3b29579c991de33e691e9fe32612db825c4656 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Natalie=20Klestrup=20R=C3=B6ijezon?= Date: Thu, 8 May 2025 16:35:21 +0200 Subject: [PATCH 16/18] refmt with the right rustfmt version --- rust/operator-binary/src/config/mod.rs | 14 ++--- rust/operator-binary/src/controller.rs | 8 +-- rust/operator-binary/src/crd/affinity.rs | 65 +++++++++++------------- rust/operator-binary/src/main.rs | 11 ++-- 4 files changed, 43 insertions(+), 55 deletions(-) diff --git a/rust/operator-binary/src/config/mod.rs b/rust/operator-binary/src/config/mod.rs index ac213c49..477cd0f7 100644 --- a/rust/operator-binary/src/config/mod.rs +++ b/rust/operator-binary/src/config/mod.rs @@ -743,9 +743,7 @@ mod tests { "#; let bootstrap_conf = construct_bootstrap_conf(input); - assert_eq!( - bootstrap_conf, - indoc! {" + assert_eq!(bootstrap_conf, indoc! {" conf.dir=./conf graceful.shutdown.seconds=300 java=java @@ -764,8 +762,7 @@ mod tests { lib.dir=./lib preserve.environment=false run.as= - "} - ); + "}); } #[test] @@ -811,9 +808,7 @@ mod tests { "#; let bootstrap_conf = construct_bootstrap_conf(input); - assert_eq!( - bootstrap_conf, - indoc! {" + assert_eq!(bootstrap_conf, indoc! 
{" conf.dir=./conf graceful.shutdown.seconds=300 java=java @@ -834,8 +829,7 @@ mod tests { lib.dir=./lib preserve.environment=false run.as= - "} - ); + "}); } fn construct_bootstrap_conf(nifi_cluster: &str) -> String { diff --git a/rust/operator-binary/src/controller.rs b/rust/operator-binary/src/controller.rs index b1e06641..c9f56312 100644 --- a/rust/operator-binary/src/controller.rs +++ b/rust/operator-binary/src/controller.rs @@ -1250,10 +1250,10 @@ async fn build_node_rolegroup_statefulset( } nifi_auth_config - .add_volumes_and_mounts( - &mut pod_builder, - vec![&mut container_prepare, container_nifi], - ) + .add_volumes_and_mounts(&mut pod_builder, vec![ + &mut container_prepare, + container_nifi, + ]) .context(AddAuthVolumesSnafu)?; let metadata = ObjectMetaBuilder::new() diff --git a/rust/operator-binary/src/crd/affinity.rs b/rust/operator-binary/src/crd/affinity.rs index ac1e570e..bb1ed6f9 100644 --- a/rust/operator-binary/src/crd/affinity.rs +++ b/rust/operator-binary/src/crd/affinity.rs @@ -61,39 +61,36 @@ mod tests { serde_yaml::with::singleton_map_recursive::deserialize(deserializer).unwrap(); let merged_config = nifi.merged_config(&NifiRole::Node, "default").unwrap(); - assert_eq!( - merged_config.affinity, - StackableAffinity { - pod_affinity: None, - pod_anti_affinity: Some(PodAntiAffinity { - preferred_during_scheduling_ignored_during_execution: Some(vec![ - WeightedPodAffinityTerm { - pod_affinity_term: PodAffinityTerm { - label_selector: Some(LabelSelector { - match_expressions: None, - match_labels: Some(BTreeMap::from([ - ("app.kubernetes.io/name".to_string(), "nifi".to_string(),), - ( - "app.kubernetes.io/instance".to_string(), - "simple-nifi".to_string(), - ), - ( - "app.kubernetes.io/component".to_string(), - "node".to_string(), - ) - ])) - }), - topology_key: "kubernetes.io/hostname".to_string(), - ..Default::default() - }, - weight: 70 - } - ]), - required_during_scheduling_ignored_during_execution: None, - }), - node_affinity: None, - node_selector: None, - } - ); + assert_eq!(merged_config.affinity, StackableAffinity { + pod_affinity: None, + pod_anti_affinity: Some(PodAntiAffinity { + preferred_during_scheduling_ignored_during_execution: Some(vec![ + WeightedPodAffinityTerm { + pod_affinity_term: PodAffinityTerm { + label_selector: Some(LabelSelector { + match_expressions: None, + match_labels: Some(BTreeMap::from([ + ("app.kubernetes.io/name".to_string(), "nifi".to_string(),), + ( + "app.kubernetes.io/instance".to_string(), + "simple-nifi".to_string(), + ), + ( + "app.kubernetes.io/component".to_string(), + "node".to_string(), + ) + ])) + }), + topology_key: "kubernetes.io/hostname".to_string(), + ..Default::default() + }, + weight: 70 + } + ]), + required_during_scheduling_ignored_during_execution: None, + }), + node_affinity: None, + node_selector: None, + }); } } diff --git a/rust/operator-binary/src/main.rs b/rust/operator-binary/src/main.rs index 50c4501d..a3fec9f4 100644 --- a/rust/operator-binary/src/main.rs +++ b/rust/operator-binary/src/main.rs @@ -92,13 +92,10 @@ async fn main() -> anyhow::Result<()> { ) .await?; - let event_recorder = Arc::new(Recorder::new( - client.as_kube_client(), - Reporter { - controller: NIFI_FULL_CONTROLLER_NAME.to_string(), - instance: None, - }, - )); + let event_recorder = Arc::new(Recorder::new(client.as_kube_client(), Reporter { + controller: NIFI_FULL_CONTROLLER_NAME.to_string(), + instance: None, + })); let nifi_controller = Controller::new( watch_namespace.get_api::>(&client), From 
69bb4cca2eef20d0ff4b6a993b1a4f58fc7b306a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Natalie=20Klestrup=20R=C3=B6ijezon?= Date: Thu, 8 May 2025 16:43:09 +0200 Subject: [PATCH 17/18] refmt python --- .../kuttl/smoke-zookeeperless/test_nifi.py | 58 +++++++++++-------- .../smoke-zookeeperless/test_nifi_metrics.py | 38 ++++++++---- 2 files changed, 62 insertions(+), 34 deletions(-) diff --git a/tests/templates/kuttl/smoke-zookeeperless/test_nifi.py b/tests/templates/kuttl/smoke-zookeeperless/test_nifi.py index b885c09f..4280424a 100755 --- a/tests/templates/kuttl/smoke-zookeeperless/test_nifi.py +++ b/tests/templates/kuttl/smoke-zookeeperless/test_nifi.py @@ -8,61 +8,71 @@ def get_token(nifi_host, username, password): nifi_headers = { - 'content-type': 'application/x-www-form-urlencoded; charset=UTF-8', + "content-type": "application/x-www-form-urlencoded; charset=UTF-8", } - data = {'username': username, 'password': password} + data = {"username": username, "password": password} # TODO: handle actual errors when connecting properly - nifi_url = nifi_host + '/nifi-api/access/token' - response = requests.post(nifi_url, headers=nifi_headers, data=data, verify=False) # , cert='./tmp/cacert.pem') + nifi_url = nifi_host + "/nifi-api/access/token" + response = requests.post( + nifi_url, headers=nifi_headers, data=data, verify=False + ) # , cert='./tmp/cacert.pem') if response.ok: - nifi_token = response.content.decode('utf-8') + nifi_token = response.content.decode("utf-8") return "Bearer " + nifi_token else: print(f"Failed to get token: {response.status_code}: {response.content}") exit(-1) -if __name__ == '__main__': +if __name__ == "__main__": # Construct an argument parser all_args = argparse.ArgumentParser() # Add arguments to the parser - all_args.add_argument("-u", "--user", required=True, - help="Username to connect as") - all_args.add_argument("-p", "--password", required=True, - help="Password for the user") - all_args.add_argument("-n", "--namespace", required=True, - help="Namespace the test is running in") - all_args.add_argument("-c", "--count", required=True, - help="The expected number of Nodes") + all_args.add_argument("-u", "--user", required=True, help="Username to connect as") + all_args.add_argument( + "-p", "--password", required=True, help="Password for the user" + ) + all_args.add_argument( + "-n", "--namespace", required=True, help="Namespace the test is running in" + ) + all_args.add_argument( + "-c", "--count", required=True, help="The expected number of Nodes" + ) args = vars(all_args.parse_args()) # disable warnings as we have specified non-verified https connections urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) host = f"https://test-nifi-node-default-1.test-nifi-node-default.{args['namespace']}.svc.cluster.local:8443" - token = get_token(host, args['user'], args['password']) - headers = {'Authorization': token} - node_count = int(args['count']) + token = get_token(host, args["user"], args["password"]) + headers = {"Authorization": token} + node_count = int(args["count"]) x = 0 while x < 15: - url = host + '/nifi-api/controller/cluster' - cluster = requests.get(url, headers=headers, verify=False) # , cert='/tmp/cacert.pem') + url = host + "/nifi-api/controller/cluster" + cluster = requests.get( + url, headers=headers, verify=False + ) # , cert='/tmp/cacert.pem') if cluster.status_code != 200: print("Waiting for cluster...") else: - cluster_data = json.loads(cluster.content.decode('utf-8')) - nodes = cluster_data['cluster']['nodes'] + cluster_data = 
json.loads(cluster.content.decode("utf-8"))
+            nodes = cluster_data["cluster"]["nodes"]
             if len(nodes) != node_count:
-                print(f"Cluster should have {node_count} nodes at this stage, but has: {len(nodes)}")
+                print(
+                    f"Cluster should have {node_count} nodes at this stage, but has: {len(nodes)}"
+                )
             else:
                 connected = True
                 for node in nodes:
-                    if node['status'] != "CONNECTED":
-                        print(f"Node {node['nodeId']} is in state {node['status']} but should have been CONNECTED")
+                    if node["status"] != "CONNECTED":
+                        print(
+                            f"Node {node['nodeId']} is in state {node['status']} but should have been CONNECTED"
+                        )
                         connected = False
                 if connected:
                     print("Test succeeded!")
diff --git a/tests/templates/kuttl/smoke-zookeeperless/test_nifi_metrics.py b/tests/templates/kuttl/smoke-zookeeperless/test_nifi_metrics.py
index 86076038..fb75a747 100755
--- a/tests/templates/kuttl/smoke-zookeeperless/test_nifi_metrics.py
+++ b/tests/templates/kuttl/smoke-zookeeperless/test_nifi_metrics.py
@@ -4,18 +4,34 @@
 import time
 from requests.exceptions import ConnectionError
 
-if __name__ == '__main__':
+if __name__ == "__main__":
     # Construct an argument parser
     all_args = argparse.ArgumentParser()
     # Add arguments to the parser
-    all_args.add_argument("-m", "--metric", required=False, default="nifi_amount_bytes_read",
-                          help="The name of a certain metric to check")
-    all_args.add_argument("-n", "--namespace", required=True,
-                          help="The namespace the test is running in")
-    all_args.add_argument("-p", "--port", required=False, default="8081",
-                          help="The port where metrics are exposed")
-    all_args.add_argument("-t", "--timeout", required=False, default="120",
-                          help="The timeout in seconds to wait for the metrics port to be opened")
+    all_args.add_argument(
+        "-m",
+        "--metric",
+        required=False,
+        default="nifi_amount_bytes_read",
+        help="The name of a certain metric to check",
+    )
+    all_args.add_argument(
+        "-n", "--namespace", required=True, help="The namespace the test is running in"
+    )
+    all_args.add_argument(
+        "-p",
+        "--port",
+        required=False,
+        default="8081",
+        help="The port where metrics are exposed",
+    )
+    all_args.add_argument(
+        "-t",
+        "--timeout",
+        required=False,
+        default="120",
+        help="The timeout in seconds to wait for the metrics port to be opened",
+    )
 
     args = vars(all_args.parse_args())
     metric_name = args["metric"]
@@ -35,7 +51,9 @@
                 print("Test metrics succeeded!")
                 exit(0)
             else:
-                print(f"Could not find metric [{metric_name}] in response:\n {response.text}")
+                print(
+                    f"Could not find metric [{metric_name}] in response:\n {response.text}"
+                )
                 time.sleep(timeout)
         except ConnectionError:
             # NewConnectionError is expected until metrics are available

From cc48070109b6bc9d47651ab909bc584d9124ff0f Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Natalie=20Klestrup=20R=C3=B6ijezon?=
Date: Tue, 13 May 2025 14:21:56 +0200
Subject: [PATCH 18/18] Changelog

---
 CHANGELOG.md | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 0497f782..ff3b0129 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -11,6 +11,7 @@ All notable changes to this project will be documented in this file.
 - Use `--file-log-max-files` (or `FILE_LOG_MAX_FILES`) to limit the number of log files kept.
 - Use `--file-log-rotation-period` (or `FILE_LOG_ROTATION_PERIOD`) to configure the frequency of rotation.
 - Use `--console-log-format` (or `CONSOLE_LOG_FORMAT`) to set the format to `plain` (default) or `json`.
+- NiFi 2.x now supports storing cluster state in Kubernetes instead of ZooKeeper ([#775]).
### Changed @@ -37,6 +38,7 @@ All notable changes to this project will be documented in this file. [#771]: https://github.com/stackabletech/nifi-operator/pull/771 [#772]: https://github.com/stackabletech/nifi-operator/pull/772 [#774]: https://github.com/stackabletech/nifi-operator/pull/774 +[#775]: https://github.com/stackabletech/nifi-operator/pull/775 [#776]: https://github.com/stackabletech/nifi-operator/pull/776 [#782]: https://github.com/stackabletech/nifi-operator/pull/782 [#787]: https://github.com/stackabletech/nifi-operator/pull/787