handle unresponsive nodes #474

Merged Sep 16, 2024 · 16 commits (changes shown from 6 commits)
8 changes: 4 additions & 4 deletions Cargo.lock


2 changes: 1 addition & 1 deletion Dockerfile
@@ -1,4 +1,4 @@
-FROM --platform=$BUILDPLATFORM rust:1.74.0-slim-bookworm AS chef
+FROM --platform=$BUILDPLATFORM rust:1.76.0-slim-bookworm AS chef
RUN cargo install --locked cargo-chef
ARG TARGETARCH
RUN echo -n "$TARGETARCH" | sed 's#amd64#x86_64#;s#arm64#aarch64#' > /cargo_arch
1 change: 1 addition & 0 deletions README.md
@@ -1,3 +1,4 @@
⚠️⚠️ **WARNING!** ⚠️⚠️ The Materialize K8s-eip-operator will soon be archived and no longer be under active development.
Contributor:

Probably enough to say that it "will soon be archived", or "will soon be archived and is no longer under active development."

Contributor Author:

Cleaned up the wording in the node improvements changes. I left in the warning portion, since I found the same practice used in another MZ public archive.

# k8s-eip-operator

Manage external connections to Kubernetes pods or nodes using AWS Elastic IPs (EIPs).
44 changes: 42 additions & 2 deletions eip_operator/src/controller/node.rs
@@ -54,6 +54,7 @@ impl k8s_controller::Context for Context {
.rsplit_once('/')
.ok_or(Error::MalformedProviderId)?
.1;

let matched_eips: Vec<Eip> = eip_api
.list(&ListParams::default())
.await?
@@ -64,6 +65,41 @@ impl k8s_controller::Context for Context {
if matched_eips.is_empty() {
return Err(Error::NoEipResourceWithThatNodeSelector);
}

// Check the existing Node for EIP association and ready status.
// Nodes that go into a "NotReady" or "Unknown" state should have their EIP
// disassociated to allow a new node to spawn and use the EIP.
let node_condition_ready_status = crate::node::get_ready_status_from_node(node)
.ok_or(Error::MissingNodeReadyCondition)?;
match node_condition_ready_status.as_str() {
// Remove the EIP from nodes with an Unknown or NotReady ready status.
// An Unknown ready status could mean the node is unresponsive or experienced a hardware failure.
// A NotReady ready status could mean the node is experiencing a network issue.
"Unknown" | "False" => {
// Skip disassociation if no EIP is associated with the node.
let node_eip = matched_eips.iter().find(|eip| {
eip.status.as_ref().is_some_and(|s| {
s.resource_id.is_some()
&& s.resource_id.as_ref().map(|r| r == &name).unwrap_or(false)
})
});
if let Some(eip) = node_eip {
let node_eip_name = eip.name_unchecked();
warn!(
"Node {} is in an unknown state, disassociating EIP {}",
&name.clone(),
&node_eip_name
);
crate::aws::disassociate_eip(&self.ec2_client, &node_eip_name).await?;
crate::eip::set_status_detached(&eip_api, &node_eip_name).await?;

return Ok(None);
}
}
// A Ready status of True falls through and continues with EIP node association below.
_ => {}
}

let eip = matched_eips.into_iter().find(|eip| {
eip.status.as_ref().is_some_and(|s| {
s.resource_id.is_none()
@@ -86,8 +122,12 @@ impl k8s_controller::Context for Context {

let eni_id = crate::aws::get_eni_from_private_ip(&instance_description, node_ip)
.ok_or(Error::NoInterfaceWithThatIp)?;
-if eip_description.network_interface_id != Some(eni_id.to_owned())
-|| eip_description.private_ip_address != Some(node_ip.to_owned())
+// Ensure only nodes marked with a Ready status of True are associated with an EIP.
+// We don't want to associate an EIP with a node that is not ready, only to remove it
+// in the next reconciliation loop if the node is still coming online.
+if (eip_description.network_interface_id != Some(eni_id.to_owned())
+|| eip_description.private_ip_address != Some(node_ip.to_owned()))
+&& node_condition_ready_status == "True"
{
match crate::eip::set_status_should_attach(&eip_api, &eip, &eni_id, node_ip, &name)
.await
123 changes: 123 additions & 0 deletions eip_operator/src/egress.rs
@@ -0,0 +1,123 @@
use crate::eip::v2::Eip;
use crate::Error;
use k8s_openapi::api::core::v1::Node;
use kube::api::{Api, ListParams, Patch, PatchParams};
use std::collections::{BTreeMap, BTreeSet};
use tracing::{info, instrument};

use crate::EGRESS_GATEWAY_NODE_SELECTOR_LABEL_KEY;
use crate::EGRESS_GATEWAY_NODE_SELECTOR_LABEL_VALUE;

/// Applies label to node specifying the status of the egress gateway node.
#[instrument(skip(api), err)]
async fn add_gateway_status_label(
api: &Api<Node>,
name: &str,
status: &str,
) -> Result<Node, kube::Error> {
info!(
"Adding gateway status label {} value {} to node {}",
crate::EGRESS_NODE_STATUS_LABEL,
status,
name
);
let patch = serde_json::json!({
"apiVersion": "v1",
"kind": "Node",
"metadata": {
"labels": {
// Ensure the status is lowercase to match conventions
crate::EGRESS_NODE_STATUS_LABEL: status.to_lowercase(),
}
}
});
let patch = Patch::Apply(&patch);
let params = PatchParams::apply(crate::FIELD_MANAGER);
api.patch(name, &params, &patch).await
}

/// Retrieve all egress nodes in the cluster.
async fn get_egress_nodes(api: &Api<Node>) -> Result<Vec<Node>, kube::Error> {
let params = ListParams::default().labels(
format!(
"{}={}",
EGRESS_GATEWAY_NODE_SELECTOR_LABEL_KEY, EGRESS_GATEWAY_NODE_SELECTOR_LABEL_VALUE
)
.as_str(),
);

api.list(&params).await.map(|node_list| node_list.items)
}
Contributor:

We probably don't need this if we're using the node selector on the eip.

Contributor Author:

Much cleaner, and it removed the need for some const labels in the egress improvements.
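A hypothetical sketch of that direction, assuming the Eip spec exposes its node selector as a plain label map (the actual field shape isn't shown in this diff):

// Hypothetical: derive the node ListParams from an Eip's node selector instead
// of the hardcoded workload label constants. The `selector` argument's shape is
// an assumption, not the actual Eip spec.
fn node_list_params_from_selector(selector: &BTreeMap<String, String>) -> ListParams {
    let labels = selector
        .iter()
        .map(|(key, value)| format!("{key}={value}"))
        .collect::<Vec<_>>()
        .join(",");
    ListParams::default().labels(&labels)
}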


/// Update state label on egress nodes.
/// Note: Egress traffic will be immediately dropped when the status label is changed away from "true".
#[instrument(skip(), err)]
pub(crate) async fn label_egress_nodes(
eip_api: &Api<Eip>,
node_api: &Api<Node>,
) -> Result<(), Error> {
info!("Updating egress node status labels.");
let node_list = get_egress_nodes(&Api::all(node_api.clone().into())).await?;
if node_list.is_empty() {
info!("No egress nodes found. Skipping egress cleanup.");
return Ok(());
}

let node_names_and_status: BTreeMap<String, String> =
crate::node::get_nodes_ready_status(node_list)?;
let (nodes_status_ready, nodes_status_unknown): (BTreeSet<String>, BTreeSet<String>) =
node_names_and_status.iter().fold(
(BTreeSet::new(), BTreeSet::new()),
|(mut ready, mut unknown), (name, status)| {
match status.as_str() {
"True" => {
ready.insert(name.clone());
}
"Unknown" => {
unknown.insert(name.clone());
}
// Ignore nodes in other states.
_ => {
info!("Ignoring node {} with status {}", name, status);
}
}
(ready, unknown)
},
);

// Ensure an egress node exists with an EIP and Ready state of `true`.
let eip_resource_ids: BTreeSet<String> = eip_api
.list(&ListParams::default())
.await?
.items
.into_iter()
.filter_map(|eip| eip.status.and_then(|s| s.resource_id))
.collect();
let matched_ready_nodes_with_eip: BTreeSet<String> = nodes_status_ready
.intersection(&eip_resource_ids)
.cloned()
.collect();

if matched_ready_nodes_with_eip.is_empty() {
info!("No ready egress nodes found with EIPs. Skipping egress labeling.");
return Ok(());
}

info!(
"Found ready egress nodes with EIPs: {:?}",
matched_ready_nodes_with_eip
);
// Label ready egress nodes; we only reach this point if at least one ready node has an EIP attached.
for node_name in nodes_status_ready {
add_gateway_status_label(node_api, &node_name, "ready").await?;
}
// Attempt cleanup of nodes in a ready state of `Unknown` if another node is ready with an EIP.
for node_name in nodes_status_unknown {
add_gateway_status_label(node_api, &node_name, "unknown").await?;
}

Ok(())
}
36 changes: 35 additions & 1 deletion eip_operator/src/main.rs
@@ -1,14 +1,16 @@
use std::collections::{HashMap, HashSet};
use std::pin::pin;
use std::time::Duration;

use aws_sdk_ec2::types::Filter;
use aws_sdk_ec2::Client as Ec2Client;
use aws_sdk_servicequotas::types::ServiceQuota;
use aws_sdk_servicequotas::Client as ServiceQuotaClient;
use futures::future::join_all;
use futures::TryStreamExt;
use json_patch::{PatchOperation, RemoveOperation, TestOperation};
use k8s_controller::Controller;
-use k8s_openapi::api::core::v1::Pod;
+use k8s_openapi::api::core::v1::{Node, Pod};
use kube::api::{Api, ListParams, Patch, PatchParams};
use kube::{Client, ResourceExt};
use tokio::task;
@@ -20,8 +22,10 @@ use eip::v2::Eip;

mod aws;
mod controller;
mod egress;
mod eip;
mod kube_ext;
mod node;

const LEGACY_MANAGE_EIP_LABEL: &str = "eip.aws.materialize.com/manage";
const LEGACY_POD_FINALIZER_NAME: &str = "eip.aws.materialize.com/disassociate";
@@ -30,6 +34,9 @@ const FIELD_MANAGER: &str = "eip.materialize.cloud";
const AUTOCREATE_EIP_LABEL: &str = "eip.materialize.cloud/autocreate_eip";
const EIP_ALLOCATION_ID_ANNOTATION: &str = "eip.materialize.cloud/allocation_id";
const EXTERNAL_DNS_TARGET_ANNOTATION: &str = "external-dns.alpha.kubernetes.io/target";
const EGRESS_GATEWAY_NODE_SELECTOR_LABEL_KEY: &str = "workload";
const EGRESS_GATEWAY_NODE_SELECTOR_LABEL_VALUE: &str = "materialize-egress";
const EGRESS_NODE_STATUS_LABEL: &str = "egress-gateway.materialize.cloud/status";
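// Illustrative note (not part of this diff): a node carrying the
// `workload=materialize-egress` label is treated as an egress gateway, and
// `label_egress_nodes` sets `egress-gateway.materialize.cloud/status` on it
// to "ready" or "unknown" based on the node's Ready condition.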

// See https://us-east-1.console.aws.amazon.com/servicequotas/home/services/ec2/quotas
// and filter in the UI for EC2 quotas like this, or use the CLI:
@@ -89,6 +96,9 @@ async fn run() -> Result<(), Error> {
None => Api::<Eip>::all(k8s_client.clone()),
};

debug!("Getting node api");
let node_api = Api::<Node>::all(k8s_client.clone());

debug!("Cleaning up any orphaned EIPs");
cleanup_orphan_eips(
&ec2_client,
@@ -149,6 +159,30 @@ async fn run() -> Result<(), Error> {
task::spawn(eip_controller.run())
});

tasks.push({
let eip_api = eip_api.clone();
let node_api = node_api.clone();
let watch_config =
kube_runtime::watcher::Config::default().labels(EGRESS_GATEWAY_NODE_SELECTOR_LABEL_KEY);

task::spawn(async move {
let mut watcher = pin!(kube_runtime::watcher(node_api.clone(), watch_config));
Contributor:

hm I'd think we'd want to watch Eip resources rather than Node, since we set the Eip's status each time the Eip is associated or disassociated. it's not clear to me that this fires if we associate an Eip right now

separately, this watcher is also self-triggering, where setting the egress gateway status will trigger another reconciliation. not a correctness issue if it converges, but could create some read/write amplification or hard-to-debug issues if it does anything unexpected

Contributor Author:

Good find! The watcher is now correctly configured on Eip resources.
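For reference, a minimal sketch of the Eip-based variant, reusing the same event handling as this diff (hedged: the actual follow-up commit may differ):

// Sketch: watch Eip resources instead of Nodes, so the loop fires when an
// Eip's status is set on associate/disassociate. Since we label Nodes rather
// than Eips, this also avoids the self-triggering concern noted above.
let mut eip_watcher = pin!(kube_runtime::watcher(
    eip_api.clone(),
    kube_runtime::watcher::Config::default()
));
while let Some(event) = eip_watcher.try_next().await.unwrap_or_else(|e| {
    event!(Level::ERROR, err = %e, "Error watching EIPs");
    None
}) {
    if let kube_runtime::watcher::Event::Applied(_)
    | kube_runtime::watcher::Event::Deleted(_) = event
    {
        if let Err(err) = crate::egress::label_egress_nodes(&eip_api, &node_api).await {
            event!(Level::ERROR, err = %err, "Node egress labeling reported an error");
        }
    }
}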


while let Some(node) = watcher.try_next().await.unwrap_or_else(|e| {
event!(Level::ERROR, err = %e, "Error watching nodes");
None
}) {
if let kube_runtime::watcher::Event::Applied(_)
| kube_runtime::watcher::Event::Deleted(_) = node
{
if let Err(err) = crate::egress::label_egress_nodes(&eip_api, &node_api).await {
event!(Level::ERROR, err = %err, "Node egress labeling reporting error");
}
}
}
})
});

join_all(tasks).await;

debug!("exiting");
33 changes: 33 additions & 0 deletions eip_operator/src/node.rs
@@ -0,0 +1,33 @@
use std::collections::BTreeMap;

use eip_operator_shared::Error;
use k8s_openapi::api::core::v1::Node;

/// Get Ready status from the node status field.
pub(crate) fn get_ready_status_from_node(node: &Node) -> Option<String> {
node.status
.as_ref()?
.conditions
.as_ref()?
.iter()
.find(|c| c.type_ == "Ready")
.map(|condition| condition.status.clone())
}

/// Retrieve node names and ready status given a list of nodes.
pub(crate) fn get_nodes_ready_status(
node_list: Vec<Node>,
) -> Result<BTreeMap<String, String>, Error> {
let mut node_ready_status_map = BTreeMap::new();

for node in node_list {
if let Some(ref node_name) = node.metadata.name {
let ready_status =
get_ready_status_from_node(&node).ok_or(Error::MissingNodeReadyCondition)?;

node_ready_status_map.insert(node_name.to_string(), ready_status);
}
}

Ok(node_ready_status_map)
}
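As a quick illustration of the helper's behavior, a minimal test sketch (the Node construction below is hypothetical scaffolding, not part of this PR):

#[cfg(test)]
mod tests {
    use super::*;
    use k8s_openapi::api::core::v1::{NodeCondition, NodeStatus};

    #[test]
    fn extracts_ready_condition_status() {
        // A Node whose only condition is Ready=True should yield Some("True").
        let node = Node {
            status: Some(NodeStatus {
                conditions: Some(vec![NodeCondition {
                    type_: "Ready".to_string(),
                    status: "True".to_string(),
                    ..Default::default()
                }]),
                ..Default::default()
            }),
            ..Default::default()
        };
        assert_eq!(get_ready_status_from_node(&node), Some("True".to_string()));
    }
}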
4 changes: 4 additions & 0 deletions eip_operator_shared/src/lib.rs
@@ -66,6 +66,10 @@ pub enum Error {
MissingNodeName,
#[error("Node does not have labels.")]
MissingNodeLabels,
#[error("Node does not have a status.")]
MissingNodeStatus,
#[error("Node does not have a ready condition.")]
MissingNodeReadyCondition,
#[error("Node does not have a provider_id in its spec.")]
MissingProviderId,
#[error("Node provider_id is not in expected format.")]