Skip to content

Support Zookeeperless NiFi #775

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 26 commits into from
May 27, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
61f89d3
Resync Cargo.nix
nightkr Apr 8, 2025
fd6ffad
Add Kubernetes state provider
nightkr Apr 8, 2025
914181f
Use raw strings to clean up escaping...
nightkr Apr 8, 2025
aaf6c2b
Prefix the Kubernetes state configmaps by the actual stacklet name, r…
nightkr Apr 8, 2025
e4609dc
Make clustering mode configurable, add Kubernetes mode
nightkr Apr 9, 2025
7daf23e
Regenerate CRD
nightkr Apr 9, 2025
82a9bf6
Rewrite zookeeperConfigMapName docs to emphasize that it is optional …
nightkr Apr 9, 2025
0ff1c22
Update docs to mention Kubernetes backend
nightkr Apr 9, 2025
1aa50cf
Rename clustering_mode to clustering_backend
nightkr Apr 9, 2025
f1a3368
Make the nifi prepare script more idempotent
nightkr Apr 10, 2025
db206b8
Only include the used cluster state provider in xml config
nightkr May 6, 2025
ef63fa9
Add test
nightkr May 6, 2025
4f63520
Explicitly fail when trying to use NiFi 1.x in zookeeperless mode
nightkr May 6, 2025
212bdff
Minor docs rewording
nightkr May 8, 2025
1eb1b66
Merge branch 'main' into feature/zkless
nightkr May 8, 2025
a86fff8
Fix a compile error that I missed
nightkr May 8, 2025
fc3b295
refmt with the right rustfmt version
nightkr May 8, 2025
69bb4cc
refmt python
nightkr May 8, 2025
cc48070
Changelog
nightkr May 13, 2025
2776378
Update CHANGELOG.md
nightkr May 26, 2025
8824341
Merge branch 'main' into feature/zkless
maltesander May 27, 2025
a7a1bb6
split smoke test into v1 and v2
maltesander May 27, 2025
52b67d2
newline
maltesander May 27, 2025
af379f3
rename dimensions
maltesander May 27, 2025
d02c82d
remove trailing spaces
maltesander May 27, 2025
c6e3b28
Formatting
nightkr May 27, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ All notable changes to this project will be documented in this file.
- Use `--file-log-max-files` (or `FILE_LOG_MAX_FILES`) to limit the number of log files kept.
- Use `--file-log-rotation-period` (or `FILE_LOG_ROTATION_PERIOD`) to configure the frequency of rotation.
- Use `--console-log-format` (or `CONSOLE_LOG_FORMAT`) to set the format to `plain` (default) or `json`.
- NiFi 2.x now supports storing cluster state in Kubernetes instead of ZooKeeper ([#775]).
- Add test for Apache Iceberg integration ([#785]).

### Changed
Expand Down Expand Up @@ -38,6 +39,7 @@ All notable changes to this project will be documented in this file.
[#771]: https://github.com/stackabletech/nifi-operator/pull/771
[#772]: https://github.com/stackabletech/nifi-operator/pull/772
[#774]: https://github.com/stackabletech/nifi-operator/pull/774
[#775]: https://github.com/stackabletech/nifi-operator/pull/775
[#776]: https://github.com/stackabletech/nifi-operator/pull/776
[#782]: https://github.com/stackabletech/nifi-operator/pull/782
[#785]: https://github.com/stackabletech/nifi-operator/pull/785
Expand Down
10 changes: 8 additions & 2 deletions deploy/helm/nifi-operator/crds/crds.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ spec:
description: A NiFi cluster stacklet. This resource is managed by the Stackable operator for Apache NiFi. Find more information on how to use it and the resources that the operator generates in the [operator documentation](https://docs.stackable.tech/home/nightly/nifi/).
properties:
clusterConfig:
anyOf:
- required:
- zookeeperConfigMapName
- {}
description: Settings that affect all roles and role groups. The settings in the `clusterConfig` are cluster wide settings that do not need to be configurable at role or role group level.
properties:
authentication:
Expand Down Expand Up @@ -194,12 +198,14 @@ spec:
nullable: true
type: string
zookeeperConfigMapName:
description: NiFi requires a ZooKeeper cluster connection to run. Provide the name of the ZooKeeper [discovery ConfigMap](https://docs.stackable.tech/home/nightly/concepts/service_discovery) here. When using the [Stackable operator for Apache ZooKeeper](https://docs.stackable.tech/home/nightly/zookeeper/) to deploy a ZooKeeper cluster, this will simply be the name of your ZookeeperCluster resource.
description: |-
NiFi can either use ZooKeeper or Kubernetes for managing its cluster state. To use ZooKeeper, provide the name of the ZooKeeper [discovery ConfigMap](https://docs.stackable.tech/home/nightly/concepts/service_discovery) here. When using the [Stackable operator for Apache ZooKeeper](https://docs.stackable.tech/home/nightly/zookeeper/) to deploy a ZooKeeper cluster, this will simply be the name of your ZookeeperCluster resource.

The Kubernetes provider will be used if this field is unset. Kubernetes is only supported for NiFi 2.x and newer, NiFi 1.x requires ZooKeeper.
type: string
required:
- authentication
- sensitiveProperties
- zookeeperConfigMapName
type: object
clusterOperation:
default:
Expand Down
27 changes: 26 additions & 1 deletion deploy/helm/nifi-operator/templates/roles.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -133,9 +133,11 @@ rules:
- apiGroups:
- ""
resources:
- configmaps
- secrets
- serviceaccounts
# This is redundant with the rule for specifically about configmaps
# (due to clustering), but we read them for other purposes too
- configmaps
verbs:
- get
- apiGroups:
Expand All @@ -144,6 +146,29 @@ rules:
- events
verbs:
- create
# Required for Kubernetes-managed clustering, see https://nifi.apache.org/nifi-docs/administration-guide.html#kubernetes-clustering
- apiGroups:
- coordination.k8s.io
resources:
- leases
verbs:
- create
- get
- update
# undocumented but required
- patch
# Required for Kubernetes cluster state provider, see https://nifi.apache.org/nifi-docs/administration-guide.html#kubernetes-configmap-cluster-state-provider
- apiGroups:
- ""
resources:
- configmaps
verbs:
- create
- delete
- get
- list
- patch
- update
{{ if .Capabilities.APIVersions.Has "security.openshift.io/v1" }}
- apiGroups:
- security.openshift.io
Expand Down
2 changes: 1 addition & 1 deletion docs/modules/nifi/pages/index.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ Every role group is accessible through it's own Service, and there is a Service

== Dependencies

Apache NiFi depends on Apache ZooKeeper which you can run in Kubernetes with the xref:zookeeper:index.adoc[].
Apache NiFi 1.x depends on Apache ZooKeeper which you can run in Kubernetes with the xref:zookeeper:index.adoc[].

== [[demos]]Demos

Expand Down
41 changes: 41 additions & 0 deletions docs/modules/nifi/pages/usage_guide/clustering.adoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
= Clustering
:description: Apache NiFi requires a backend for cluster management, and supports either Kubernetes or Apache ZooKeeper.
:page-aliases: usage_guide/zookeeper-connection.adoc

Apache NiFi requires{empty}footnote:[Apache NiFi also supports a single-node mode with no cluster backend, but this is not supported by the Stackable Operator for Apache NiFi. The Stackable Operator does require a cluster backend.] an external backend for state management and leader election.

Currently, the Stackable Operator for Apache NiFi supports the following backends:

- xref:#backend-kubernetes[]
- xref:#backend-zookeeper[]

CAUTION: The cluster backend of an existing cluster should never be changed. Otherwise data loss may occur, both due to losing NiFi processor state, and due to potential split-brain scenarios during the migration.

[#backend-kubernetes]
== Kubernetes

NOTE: The Kubernetes provider is only supported by Apache NiFi 2.0 or newer. When using NiFi 1.x, use the xref:#backend-zookeeper[] backend instead.

The Kubernetes backend is used by default (unless the xref:#backend-zookeeper[] backend is configured), and stores all state in Kubernetes objects, in the same namespace as the `NifiCluster` object.

It takes no configuration.

[#backend-zookeeper]
== Apache ZooKeeper

NiFi can also be configured to store its state in Apache ZooKeeper.

NiFi in cluster mode requires an Apache ZooKeeper ensemble for state management and leader election purposes, the Stackable operator for Apache NiFi does not support single node deployments without ZooKeeper, hence this is a required setting.

This is enabled by setting the `spec.clusterConfig.zookeeperConfigMapName` to a xref:concepts:service-discovery.adoc[discovery ConfigMap]:

[source,yaml]
----
spec:
clusterConfig:
zookeeperConfigMapName: simple-nifi-znode
----

The ConfigMap needs to contain two keys: `ZOOKEEPER_HOSTS` containing the value being the ZooKeeper connection string, and `ZOOKEEPER_CHROOT` containing the ZooKeeper chroot.

The xref:zookeeper:index.adoc[Stackable operator for Apache ZooKeeper] automatically creates this ConfigMap for every ZookeeperZnode object.
2 changes: 1 addition & 1 deletion docs/modules/nifi/pages/usage_guide/index.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ spec:
replicas: 3
----

<1> The xref:usage_guide/zookeeper-connection.adoc[ZooKeeper instance] to use.
<1> The xref:usage_guide/clustering.adoc#backend-zookeeper[ZooKeeper instance] to use.
<2> How users should xref:usage_guide/security.adoc[authenticate] themselves.
<3> xref:usage_guide/extra-volumes.adoc[Extra volumes] with files that can be referenced in custom workflows.
<4> xref:usage_guide/resource-configuration.adoc[CPU and memory configuration] can be set per role group.
Expand Down
14 changes: 0 additions & 14 deletions docs/modules/nifi/pages/usage_guide/zookeeper-connection.adoc

This file was deleted.

2 changes: 1 addition & 1 deletion docs/modules/nifi/partials/nav.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
** xref:nifi:getting_started/first_steps.adoc[]
* xref:nifi:usage_guide/index.adoc[]
** xref:nifi:usage_guide/listenerclass.adoc[]
** xref:nifi:usage_guide/zookeeper-connection.adoc[]
** xref:nifi:usage_guide/clustering.adoc[]
** xref:nifi:usage_guide/extra-volumes.adoc[]
** xref:nifi:usage_guide/security.adoc[]
** xref:nifi:usage_guide/resource-configuration.adoc[]
Expand Down
115 changes: 81 additions & 34 deletions rust/operator-binary/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::{

use jvm::build_merged_jvm_config;
use product_config::{ProductConfigManager, types::PropertyNameKind};
use snafu::{ResultExt, Snafu};
use snafu::{ResultExt, Snafu, ensure};
use stackable_operator::{
commons::resources::Resources,
memory::MemoryQuantity,
Expand All @@ -20,7 +20,7 @@ use strum::{Display, EnumIter};
use crate::{
crd::{
HTTPS_PORT, NifiConfig, NifiConfigFragment, NifiRole, NifiStorageConfig, PROTOCOL_PORT,
v1alpha1,
v1alpha1::{self, NifiClusteringBackend},
},
operations::graceful_shutdown::graceful_shutdown_config_properties,
security::{
Expand Down Expand Up @@ -96,6 +96,11 @@ pub enum Error {

#[snafu(display("failed to generate OIDC config"))]
GenerateOidcConfig { source: oidc::Error },

#[snafu(display(
"NiFi 1.x requires ZooKeeper (hint: upgrade to NiFi 2.x or set .spec.clusterConfig.zookeeperConfigMapName)"
))]
Nifi1RequiresZookeeper,
}

/// Create the NiFi bootstrap.conf
Expand Down Expand Up @@ -143,13 +148,15 @@ pub fn build_nifi_properties(
overrides: BTreeMap<String, String>,
product_version: &str,
) -> Result<String, Error> {
// TODO: Remove once we dropped support for all NiFi 1.x versions
let is_nifi_1 = product_version.starts_with("1.");

let mut properties = BTreeMap::new();
// Core Properties
// According to https://cwiki.apache.org/confluence/display/NIFI/Migration+Guidance#MigrationGuidance-Migratingto2.0.0-M1
// The nifi.flow.configuration.file property in nifi.properties must be changed to reference
// "flow.json.gz" instead of "flow.xml.gz"
// TODO: Remove once we dropped support for all 1.x.x versions
let flow_file_name = if product_version.starts_with("1.") {
let flow_file_name = if is_nifi_1 {
"flow.xml.gz"
} else {
"flow.json.gz"
Expand Down Expand Up @@ -250,7 +257,10 @@ pub fn build_nifi_properties(
// The ID of the cluster-wide state provider. This will be ignored if NiFi is not clustered but must be populated if running in a cluster.
properties.insert(
"nifi.state.management.provider.cluster".to_string(),
"zk-provider".to_string(),
match spec.cluster_config.clustering_backend {
v1alpha1::NifiClusteringBackend::ZooKeeper { .. } => "zk-provider".to_string(),
v1alpha1::NifiClusteringBackend::Kubernetes { .. } => "kubernetes-provider".to_string(),
},
);
// Specifies whether or not this instance of NiFi should run an embedded ZooKeeper server
properties.insert(
Expand Down Expand Up @@ -559,47 +569,84 @@ pub fn build_nifi_properties(
"".to_string(),
);

// zookeeper properties, used for cluster management
// this will be replaced via a container command script
properties.insert(
"nifi.zookeeper.connect.string".to_string(),
"${env:ZOOKEEPER_HOSTS}".to_string(),
);

// this will be replaced via a container command script
properties.insert(
"nifi.zookeeper.root.node".to_string(),
"${env:ZOOKEEPER_CHROOT}".to_string(),
);
match spec.cluster_config.clustering_backend {
v1alpha1::NifiClusteringBackend::ZooKeeper { .. } => {
properties.insert(
"nifi.cluster.leader.election.implementation".to_string(),
"CuratorLeaderElectionManager".to_string(),
);

// this will be replaced via a container command script
properties.insert(
"nifi.zookeeper.connect.string".to_string(),
"${env:ZOOKEEPER_HOSTS}".to_string(),
);

// this will be replaced via a container command script
properties.insert(
"nifi.zookeeper.root.node".to_string(),
"${env:ZOOKEEPER_CHROOT}".to_string(),
);
}

v1alpha1::NifiClusteringBackend::Kubernetes {} => {
ensure!(!is_nifi_1, Nifi1RequiresZookeeperSnafu);

properties.insert(
"nifi.cluster.leader.election.implementation".to_string(),
"KubernetesLeaderElectionManager".to_string(),
);

// this will be replaced via a container command script
properties.insert(
"nifi.cluster.leader.election.kubernetes.lease.prefix".to_string(),
"${env:STACKLET_NAME}".to_string(),
);
}
}

// override with config overrides
properties.extend(overrides);

Ok(format_properties(properties))
}

pub fn build_state_management_xml() -> String {
pub fn build_state_management_xml(clustering_backend: &NifiClusteringBackend) -> String {
// Inert providers are ignored by NiFi itself, but templating still fails if they refer to invalid environment variables,
// so only include the actually used provider.
let cluster_provider = match clustering_backend {
NifiClusteringBackend::ZooKeeper { .. } => {
r#"<cluster-provider>
<id>zk-provider</id>
<class>org.apache.nifi.controller.state.providers.zookeeper.ZooKeeperStateProvider</class>
<property name="Connect String">${env:ZOOKEEPER_HOSTS}</property>
<property name="Root Node">${env:ZOOKEEPER_CHROOT}</property>
<property name="Session Timeout">10 seconds</property>
<property name="Access Control">Open</property>
</cluster-provider>"#
}
NifiClusteringBackend::Kubernetes {} => {
r#"<cluster-provider>
<id>kubernetes-provider</id>
<class>org.apache.nifi.kubernetes.state.provider.KubernetesConfigMapStateProvider</class>
<property name="ConfigMap Name Prefix">${env:STACKLET_NAME}</property>
</cluster-provider>"#
}
};
format!(
"<?xml version=\"1.0\" encoding=\"UTF-8\" standalone=\"yes\"?>
r#"<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<stateManagement>
<local-provider>
<id>local-provider</id>
<id>local-provider</id>
<class>org.apache.nifi.controller.state.providers.local.WriteAheadLocalStateProvider</class>
<property name=\"Directory\">{}</property>
<property name=\"Always Sync\">false</property>
<property name=\"Partitions\">16</property>
<property name=\"Checkpoint Interval\">2 mins</property>
<property name="Directory">{local_state_path}</property>
<property name="Always Sync">false</property>
<property name="Partitions">16</property>
<property name="Checkpoint Interval">2 mins</property>
</local-provider>
<cluster-provider>
<id>zk-provider</id>
<class>org.apache.nifi.controller.state.providers.zookeeper.ZooKeeperStateProvider</class>
<property name=\"Connect String\">${{env:ZOOKEEPER_HOSTS}}</property>
<property name=\"Root Node\">${{env:ZOOKEEPER_CHROOT}}</property>
<property name=\"Session Timeout\">10 seconds</property>
<property name=\"Access Control\">Open</property>
</cluster-provider>
</stateManagement>",
&NifiRepository::State.mount_path(),
{cluster_provider}
</stateManagement>"#,
local_state_path = NifiRepository::State.mount_path(),
)
}

Expand Down
Loading
Loading