Skip to content

Support Zookeeperless NiFi #775

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 19 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ All notable changes to this project will be documented in this file.
- Use `--file-log-max-files` (or `FILE_LOG_MAX_FILES`) to limit the number of log files kept.
- Use `--file-log-rotation-period` (or `FILE_LOG_ROTATION_PERIOD`) to configure the frequency of rotation.
- Use `--console-log-format` (or `CONSOLE_LOG_FORMAT`) to set the format to `plain` (default) or `json`.
- NiFi 2.x now supports storing cluster state in Kuberntes instead of ZooKeeper ([#775]).

### Changed

Expand All @@ -37,6 +38,7 @@ All notable changes to this project will be documented in this file.
[#771]: https://github.com/stackabletech/nifi-operator/pull/771
[#772]: https://github.com/stackabletech/nifi-operator/pull/772
[#774]: https://github.com/stackabletech/nifi-operator/pull/774
[#775]: https://github.com/stackabletech/nifi-operator/pull/775
[#776]: https://github.com/stackabletech/nifi-operator/pull/776
[#782]: https://github.com/stackabletech/nifi-operator/pull/782
[#787]: https://github.com/stackabletech/nifi-operator/pull/787
Expand Down
10 changes: 8 additions & 2 deletions deploy/helm/nifi-operator/crds/crds.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ spec:
description: A NiFi cluster stacklet. This resource is managed by the Stackable operator for Apache NiFi. Find more information on how to use it and the resources that the operator generates in the [operator documentation](https://docs.stackable.tech/home/nightly/nifi/).
properties:
clusterConfig:
anyOf:
- required:
- zookeeperConfigMapName
- {}
description: Settings that affect all roles and role groups. The settings in the `clusterConfig` are cluster wide settings that do not need to be configurable at role or role group level.
properties:
authentication:
Expand Down Expand Up @@ -158,12 +162,14 @@ spec:
nullable: true
type: string
zookeeperConfigMapName:
description: NiFi requires a ZooKeeper cluster connection to run. Provide the name of the ZooKeeper [discovery ConfigMap](https://docs.stackable.tech/home/nightly/concepts/service_discovery) here. When using the [Stackable operator for Apache ZooKeeper](https://docs.stackable.tech/home/nightly/zookeeper/) to deploy a ZooKeeper cluster, this will simply be the name of your ZookeeperCluster resource.
description: |-
NiFi can either use ZooKeeper or Kubernetes for managing its cluster state. To use ZooKeeper, provide the name of the ZooKeeper [discovery ConfigMap](https://docs.stackable.tech/home/nightly/concepts/service_discovery) here. When using the [Stackable operator for Apache ZooKeeper](https://docs.stackable.tech/home/nightly/zookeeper/) to deploy a ZooKeeper cluster, this will simply be the name of your ZookeeperCluster resource.

The Kubernetes provider will be used if this field is unset. Kubernetes is only supported for NiFi 2.x and newer, NiFi 1.x requires ZooKeeper.
type: string
required:
- authentication
- sensitiveProperties
- zookeeperConfigMapName
type: object
clusterOperation:
default:
Expand Down
27 changes: 26 additions & 1 deletion deploy/helm/nifi-operator/templates/roles.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -133,9 +133,11 @@ rules:
- apiGroups:
- ""
resources:
- configmaps
- secrets
- serviceaccounts
# This is redundant with the rule for specifically about configmaps
# (due to clustering), but we read them for other purposes too
- configmaps
verbs:
- get
- apiGroups:
Expand All @@ -144,6 +146,29 @@ rules:
- events
verbs:
- create
# Required for Kubernetes-managed clustering, see https://nifi.apache.org/nifi-docs/administration-guide.html#kubernetes-clustering
- apiGroups:
- coordination.k8s.io
resources:
- leases
verbs:
- create
- get
- update
# undocumented but required
- patch
# Required for Kubernetes cluster state provider, see https://nifi.apache.org/nifi-docs/administration-guide.html#kubernetes-configmap-cluster-state-provider
- apiGroups:
- ""
resources:
- configmaps
verbs:
- create
- delete
- get
- list
- patch
- update
{{ if .Capabilities.APIVersions.Has "security.openshift.io/v1" }}
- apiGroups:
- security.openshift.io
Expand Down
2 changes: 1 addition & 1 deletion docs/modules/nifi/pages/index.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ Every role group is accessible through it's own Service, and there is a Service

== Dependencies

Apache NiFi depends on Apache ZooKeeper which you can run in Kubernetes with the xref:zookeeper:index.adoc[].
Apache NiFi 1.x depends on Apache ZooKeeper which you can run in Kubernetes with the xref:zookeeper:index.adoc[].

== [[demos]]Demos

Expand Down
41 changes: 41 additions & 0 deletions docs/modules/nifi/pages/usage_guide/clustering.adoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
= Clustering
:description: Apache NiFi requires a backend for cluster management, and supports either Kubernetes or Apache ZooKeeper.
:page-aliases: usage_guide/zookeeper-connection.adoc

Apache NiFi requires{empty}footnote:[Apache NiFi also supports a single-node mode with no cluster backend, but this is not supported by the Stackable Operator for Apache NiFi. The Stackable Operator does require a cluster backend.] an external backend for state management and leader election.

Currently, the Stackable Operator for Apache NiFi supports the following backends:

- xref:#backend-kubernetes[]
- xref:#backend-zookeeper[]

CAUTION: The cluster backend of an existing cluster should never be changed. Otherwise data loss may occur, both due to losing NiFi processor state, and due to potential split-brain scenarios during the migration.

[#backend-kubernetes]
== Kubernetes

NOTE: The Kubernetes provider is only supported by Apache NiFi 2.0 or newer. When using NiFi 1.x, use the xref:#backend-zookeeper[] backend instead.

The Kubernetes backend is used by default (unless the xref:#backend-zookeeper[] backend is configured), and stores all state in Kubernetes objects, in the same namespace as the `NifiCluster` object.

It takes no configuration.

[#backend-zookeeper]
== Apache ZooKeeper

NiFi can also be configured to store its state in Apache ZooKeeper.

NiFi in cluster mode requires an Apache ZooKeeper ensemble for state management and leader election purposes, the Stackable operator for Apache NiFi does not support single node deployments without ZooKeeper, hence this is a required setting.

This is enabled by setting the `spec.clusterConfig.zookeeperConfigMapName` to a xref:concepts:service-discovery.adoc[discovery ConfigMap]:

[source,yaml]
----
spec:
clusterConfig:
zookeeperConfigMapName: simple-nifi-znode
----

The ConfigMap needs to contain two keys: `ZOOKEEPER_HOSTS` containing the value being the ZooKeeper connection string, and `ZOOKEEPER_CHROOT` containing the ZooKeeper chroot.

The xref:zookeeper:index.adoc[Stackable operator for Apache ZooKeeper] automatically creates this ConfigMap for every ZookeeperZnode object.
2 changes: 1 addition & 1 deletion docs/modules/nifi/pages/usage_guide/index.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ spec:
replicas: 3
----

<1> The xref:usage_guide/zookeeper-connection.adoc[ZooKeeper instance] to use.
<1> The xref:usage_guide/clustering.adoc#backend-zookeeper[ZooKeeper instance] to use.
<2> How users should xref:usage_guide/security.adoc[authenticate] themselves.
<3> xref:usage_guide/extra-volumes.adoc[Extra volumes] with files that can be referenced in custom workflows.
<4> xref:usage_guide/resource-configuration.adoc[CPU and memory configuration] can be set per role group.
Expand Down
14 changes: 0 additions & 14 deletions docs/modules/nifi/pages/usage_guide/zookeeper-connection.adoc

This file was deleted.

2 changes: 1 addition & 1 deletion docs/modules/nifi/partials/nav.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
** xref:nifi:getting_started/first_steps.adoc[]
* xref:nifi:usage_guide/index.adoc[]
** xref:nifi:usage_guide/listenerclass.adoc[]
** xref:nifi:usage_guide/zookeeper-connection.adoc[]
** xref:nifi:usage_guide/clustering.adoc[]
** xref:nifi:usage_guide/extra-volumes.adoc[]
** xref:nifi:usage_guide/security.adoc[]
** xref:nifi:usage_guide/resource-configuration.adoc[]
Expand Down
115 changes: 81 additions & 34 deletions rust/operator-binary/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::{

use jvm::build_merged_jvm_config;
use product_config::{ProductConfigManager, types::PropertyNameKind};
use snafu::{ResultExt, Snafu};
use snafu::{ResultExt, Snafu, ensure};
use stackable_operator::{
commons::resources::Resources,
memory::MemoryQuantity,
Expand All @@ -20,7 +20,7 @@ use strum::{Display, EnumIter};
use crate::{
crd::{
HTTPS_PORT, NifiConfig, NifiConfigFragment, NifiRole, NifiStorageConfig, PROTOCOL_PORT,
v1alpha1,
v1alpha1::{self, NifiClusteringBackend},
},
operations::graceful_shutdown::graceful_shutdown_config_properties,
security::{
Expand Down Expand Up @@ -96,6 +96,11 @@ pub enum Error {

#[snafu(display("failed to generate OIDC config"))]
GenerateOidcConfig { source: oidc::Error },

#[snafu(display(
"NiFi 1.x requires ZooKeeper (hint: upgrade to NiFi 2.x or set .spec.clusterConfig.zookeeperConfigMapName)"
))]
Nifi1RequiresZookeeper,
}

/// Create the NiFi bootstrap.conf
Expand Down Expand Up @@ -143,13 +148,15 @@ pub fn build_nifi_properties(
overrides: BTreeMap<String, String>,
product_version: &str,
) -> Result<String, Error> {
// TODO: Remove once we dropped support for all NiFi 1.x versions
let is_nifi_1 = product_version.starts_with("1.");

let mut properties = BTreeMap::new();
// Core Properties
// According to https://cwiki.apache.org/confluence/display/NIFI/Migration+Guidance#MigrationGuidance-Migratingto2.0.0-M1
// The nifi.flow.configuration.file property in nifi.properties must be changed to reference
// "flow.json.gz" instead of "flow.xml.gz"
// TODO: Remove once we dropped support for all 1.x.x versions
let flow_file_name = if product_version.starts_with("1.") {
let flow_file_name = if is_nifi_1 {
"flow.xml.gz"
} else {
"flow.json.gz"
Expand Down Expand Up @@ -250,7 +257,10 @@ pub fn build_nifi_properties(
// The ID of the cluster-wide state provider. This will be ignored if NiFi is not clustered but must be populated if running in a cluster.
properties.insert(
"nifi.state.management.provider.cluster".to_string(),
"zk-provider".to_string(),
match spec.cluster_config.clustering_backend {
v1alpha1::NifiClusteringBackend::ZooKeeper { .. } => "zk-provider".to_string(),
v1alpha1::NifiClusteringBackend::Kubernetes { .. } => "kubernetes-provider".to_string(),
},
);
// Specifies whether or not this instance of NiFi should run an embedded ZooKeeper server
properties.insert(
Expand Down Expand Up @@ -559,47 +569,84 @@ pub fn build_nifi_properties(
"".to_string(),
);

// zookeeper properties, used for cluster management
// this will be replaced via a container command script
properties.insert(
"nifi.zookeeper.connect.string".to_string(),
"${env:ZOOKEEPER_HOSTS}".to_string(),
);

// this will be replaced via a container command script
properties.insert(
"nifi.zookeeper.root.node".to_string(),
"${env:ZOOKEEPER_CHROOT}".to_string(),
);
match spec.cluster_config.clustering_backend {
v1alpha1::NifiClusteringBackend::ZooKeeper { .. } => {
properties.insert(
"nifi.cluster.leader.election.implementation".to_string(),
"CuratorLeaderElectionManager".to_string(),
);

// this will be replaced via a container command script
properties.insert(
"nifi.zookeeper.connect.string".to_string(),
"${env:ZOOKEEPER_HOSTS}".to_string(),
);

// this will be replaced via a container command script
properties.insert(
"nifi.zookeeper.root.node".to_string(),
"${env:ZOOKEEPER_CHROOT}".to_string(),
);
}

v1alpha1::NifiClusteringBackend::Kubernetes {} => {
ensure!(!is_nifi_1, Nifi1RequiresZookeeperSnafu);

properties.insert(
"nifi.cluster.leader.election.implementation".to_string(),
"KubernetesLeaderElectionManager".to_string(),
);

// this will be replaced via a container command script
properties.insert(
"nifi.cluster.leader.election.kubernetes.lease.prefix".to_string(),
"${env:STACKLET_NAME}".to_string(),
);
}
}

// override with config overrides
properties.extend(overrides);

Ok(format_properties(properties))
}

pub fn build_state_management_xml() -> String {
pub fn build_state_management_xml(clustering_backend: &NifiClusteringBackend) -> String {
// Inert providers are ignored by NiFi itself, but templating still fails if they refer to invalid environment variables,
// so only include the actually used provider.
let cluster_provider = match clustering_backend {
NifiClusteringBackend::ZooKeeper { .. } => {
r#"<cluster-provider>
<id>zk-provider</id>
<class>org.apache.nifi.controller.state.providers.zookeeper.ZooKeeperStateProvider</class>
<property name="Connect String">${env:ZOOKEEPER_HOSTS}</property>
<property name="Root Node">${env:ZOOKEEPER_CHROOT}</property>
<property name="Session Timeout">10 seconds</property>
<property name="Access Control">Open</property>
</cluster-provider>"#
}
NifiClusteringBackend::Kubernetes {} => {
r#"<cluster-provider>
<id>kubernetes-provider</id>
<class>org.apache.nifi.kubernetes.state.provider.KubernetesConfigMapStateProvider</class>
<property name="ConfigMap Name Prefix">${env:STACKLET_NAME}</property>
</cluster-provider>"#
}
};
format!(
"<?xml version=\"1.0\" encoding=\"UTF-8\" standalone=\"yes\"?>
r#"<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<stateManagement>
<local-provider>
<id>local-provider</id>
<id>local-provider</id>
<class>org.apache.nifi.controller.state.providers.local.WriteAheadLocalStateProvider</class>
<property name=\"Directory\">{}</property>
<property name=\"Always Sync\">false</property>
<property name=\"Partitions\">16</property>
<property name=\"Checkpoint Interval\">2 mins</property>
<property name="Directory">{local_state_path}</property>
<property name="Always Sync">false</property>
<property name="Partitions">16</property>
<property name="Checkpoint Interval">2 mins</property>
</local-provider>
<cluster-provider>
<id>zk-provider</id>
<class>org.apache.nifi.controller.state.providers.zookeeper.ZooKeeperStateProvider</class>
<property name=\"Connect String\">${{env:ZOOKEEPER_HOSTS}}</property>
<property name=\"Root Node\">${{env:ZOOKEEPER_CHROOT}}</property>
<property name=\"Session Timeout\">10 seconds</property>
<property name=\"Access Control\">Open</property>
</cluster-provider>
</stateManagement>",
&NifiRepository::State.mount_path(),
{cluster_provider}
</stateManagement>"#,
local_state_path = NifiRepository::State.mount_path(),
)
}

Expand Down
Loading