Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fixes bug where multiple nodes fight over an eip #435

Merged
merged 4 commits into from
Mar 27, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
346 changes: 179 additions & 167 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ members = [

[workspace.package]
edition = "2021"
rust-version = "1.74.0"
rust-version = "1.76.0"


# Use this section only to change the source of dependencies that might
Expand Down
8 changes: 2 additions & 6 deletions cilium_eip_no_masquerade_agent/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,7 @@ impl RuleManager {
}

async fn cleanup_legacy_per_pod_rules(&self, pod: &Pod) -> Result<(), Error> {
let pod_name = pod
.metadata
.name
.as_ref()
.ok_or(eip_operator_shared::Error::MissingPodName)?;
let pod_name = pod.name_unchecked();

// Assuming that if it doesn't have an IP during cleanup, that it never had one.
if let Some(pod_ip_str) = &pod
Expand All @@ -98,7 +94,7 @@ impl RuleManager {
self.ip_rule_handle.del(rule).execute().await?;
}
}
self.remove_finalizer(pod, pod_name).await?;
self.remove_finalizer(pod, &pod_name).await?;
Ok(())
}

Expand Down
9 changes: 5 additions & 4 deletions deny.toml
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
[graph]
targets = [
{ triple = "aarch64-apple-darwin" },
{ triple = "aarch64-unknown-linux-gnu" },
Expand All @@ -6,7 +7,7 @@ targets = [
]

[advisories]
vulnerability = "deny"
version = 2

[bans]
multiple-versions = "deny"
Expand All @@ -20,9 +21,9 @@ skip = [
{ name = "hashbrown", version = "0.12.3" },
{ name = "hashbrown", version = "0.14.0" },
{ name = "nix", version = "0.26.4" },
{ name = "nix", version = "0.27.1" },
{ name = "ordered-float", version = "2.10.0" },
{ name = "ordered-float", version = "3.4.0" },
{ name = "fastrand", version = "2.0.1" },
{ name = "regex-syntax", version = "0.6.29" },
]

# Use `tracing` instead.
Expand All @@ -35,14 +36,14 @@ name = "env_logger"
name = "rustls"

[licenses]
version = 2
allow = [
"Apache-2.0",
"BSD-2-Clause",
"BSD-3-Clause",
"MIT",
"Unicode-DFS-2016",
]
copyleft = "deny"

[[licenses.clarify]]
name = "ring"
Expand Down
18 changes: 4 additions & 14 deletions eip_operator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,10 @@ license = "Apache-2.0"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
aws-config = { version = "0.55.1", default-features = false, features = [
"native-tls",
] }
aws-sdk-ec2 = { version = "0.28", default-features = false, features = [
"native-tls",
"rt-tokio",
] }
aws-sdk-servicequotas = { version = "0.28", default-features = false, features = [
"native-tls",
"rt-tokio",
] }
aws-smithy-http = { version = "0.55", default-features = false, features = [
"rt-tokio",
] }
aws-config = { version = "0.101", default-features = false}
aws-sdk-ec2 = { version = "0.38", default-features = false, features = [ "rt-tokio" ] }
aws-sdk-servicequotas = { version = "0.38", default-features = false, features = [ "rt-tokio" ] }
aws-smithy-http = { version = "0.59", default-features = false, features = [ "rt-tokio" ] }
futures = { workspace = true }


Expand Down
17 changes: 11 additions & 6 deletions eip_operator/src/aws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use aws_sdk_ec2::operation::disassociate_address::DisassociateAddressError;
use aws_sdk_ec2::operation::release_address::{ReleaseAddressError, ReleaseAddressOutput};
use aws_sdk_ec2::types::{Address, DomainType, Filter, ResourceType, Tag, TagSpecification};
use aws_sdk_ec2::Client as Ec2Client;
use tracing::{debug, info, instrument};
use tracing::{debug, error, info, instrument};

pub(crate) const LEGACY_CLUSTER_NAME_TAG: &str = "eip.aws.materialize.com/cluster_name";

Expand Down Expand Up @@ -150,19 +150,24 @@ pub(crate) async fn describe_addresses_with_tag_value(
pub(crate) async fn disassociate_eip(
ec2_client: &Ec2Client,
association_id: &str,
) -> Result<(), SdkError<DisassociateAddressError>> {
) -> Result<(), DisassociateAddressError> {
match ec2_client
.disassociate_address()
.association_id(association_id)
.send()
.await
{
Ok(_) => Ok(()),
Err(e) if e.to_string().contains("InvalidAssociationID.NotFound") => {
info!(already_disassociated = true);
Ok(())
Err(e) => {
let e = e.into_service_error();
if e.meta().code() == Some("InvalidAssociationID.NotFound") {
info!("Association id {} already disassociated", association_id);
Ok(())
} else {
error!("Error disassociating {} - {:?}", association_id, e);
Err(e)
}
}
Err(e) => Err(e),
}
}

Expand Down
66 changes: 55 additions & 11 deletions eip_operator/src/controller/eip.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use rand::Rng;
use std::collections::HashMap;

use kube::api::Api;
use kube::api::{Api, PatchParams};
use kube::{Client, ResourceExt};
use kube_runtime::controller::Action;
use tracing::instrument;
use tracing::{info, instrument, warn};

use eip_operator_shared::Error;

Expand Down Expand Up @@ -42,15 +43,15 @@ impl k8s_controller::Context for Context {
client: Client,
eip: &Self::Resource,
) -> Result<Option<Action>, Self::Error> {
let eip_api = Api::namespaced(client, &eip.namespace().unwrap());
let eip_api = Api::namespaced(client.clone(), &eip.namespace().unwrap());

let uid = eip.metadata.uid.as_ref().ok_or(Error::MissingEipUid)?;
let name = eip.metadata.name.as_ref().ok_or(Error::MissingEipName)?;
let uid = eip.uid().ok_or(Error::MissingEipUid)?;
let name = eip.name_unchecked();
let selector = &eip.spec.selector;
let addresses = crate::aws::describe_addresses_with_tag_value(
&self.ec2_client,
crate::aws::EIP_UID_TAG,
uid,
&uid,
)
.await?
.addresses
Expand All @@ -59,8 +60,8 @@ impl k8s_controller::Context for Context {
0 => {
let response = crate::aws::allocate_address(
&self.ec2_client,
uid,
name,
&uid,
&name,
selector,
&self.cluster_name,
&eip.namespace().unwrap(),
Expand All @@ -86,7 +87,50 @@ impl k8s_controller::Context for Context {
return Err(Error::MultipleEipsTaggedForPod);
}
};
crate::eip::set_status_created(&eip_api, name, &allocation_id, &public_ip).await?;
crate::eip::set_status_created(&eip_api, &name, &allocation_id, &public_ip).await?;

if !eip.status.as_ref().is_some_and(|s| s.resource_id.is_some()) {
let resource_api = eip.get_resource_api(&client);
let matched_resources = resource_api
.list(&eip.get_resource_list_params())
.await?
.items;
info!(
"Eip apply for {} Found matched {} resources",
name,
matched_resources.len()
);
for resource in matched_resources {
info!(
"Updating eip refresh label for {}",
resource.name_unchecked()
);
let data = resource.clone().data(serde_json::json!({
"metadata": {
"labels":{
"eip.materialize.cloud/refresh": format!("{}",rand::thread_rng().gen::<u64>()),

}
}
}));
if let Err(err) = resource_api
.patch_metadata(
&resource.name_unchecked(),
&PatchParams::default(),
&kube::core::params::Patch::Merge(serde_json::json!(data)),
)
.await
{
warn!(
"Failed to patch resource {} refresh label for {}: err {:?}",
resource.name_unchecked(),
name,
err
);
};
}
}

Ok(None)
}

Expand All @@ -96,11 +140,11 @@ impl k8s_controller::Context for Context {
_client: Client,
eip: &Self::Resource,
) -> Result<Option<Action>, Self::Error> {
let uid = eip.metadata.uid.as_ref().ok_or(Error::MissingEipUid)?;
let uid = eip.uid().ok_or(Error::MissingEipUid)?;
let addresses = crate::aws::describe_addresses_with_tag_value(
&self.ec2_client,
crate::aws::EIP_UID_TAG,
uid,
&uid,
)
.await?
.addresses;
Expand Down
92 changes: 67 additions & 25 deletions eip_operator/src/controller/node.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
use std::time::Duration;

use k8s_openapi::api::core::v1::Node;
use kube::api::{Api, ListParams};
use kube::Client;
use kube::error::ErrorResponse;
use kube::{Client, ResourceExt};
use kube_runtime::controller::Action;
use tracing::{event, instrument, Level};
use tracing::{event, info, instrument, warn, Level};

use eip_operator_shared::Error;

Expand Down Expand Up @@ -36,7 +39,7 @@ impl k8s_controller::Context for Context {
client: Client,
node: &Self::Resource,
) -> Result<Option<Action>, Self::Error> {
let name = node.metadata.name.as_ref().ok_or(Error::MissingNodeName)?;
let name = node.name_unchecked();
event!(Level::INFO, name = %name, "Applying node.");

let eip_api = Api::<Eip>::namespaced(
Expand All @@ -45,18 +48,33 @@ impl k8s_controller::Context for Context {
);

let node_ip = node.ip().ok_or(Error::MissingNodeIp)?;
let node_labels = node.labels().ok_or(Error::MissingNodeLabels)?;
let node_labels = node.labels();
let provider_id = node.provider_id().ok_or(Error::MissingProviderId)?;
let instance_id = provider_id
.rsplit_once('/')
.ok_or(Error::MalformedProviderId)?
.1;
let all_eips = eip_api.list(&ListParams::default()).await?.items;
let eip = all_eips
let matched_eips: Vec<Eip> = eip_api
.list(&ListParams::default())
.await?
.items
.into_iter()
.find(|eip| eip.matches_node(node_labels))
.ok_or(Error::NoEipResourceWithThatNodeSelector)?;
let eip_name = eip.name().ok_or(Error::MissingEipName)?;
.filter(|eip| eip.matches_node(node_labels))
.collect();
if matched_eips.is_empty() {
return Err(Error::NoEipResourceWithThatNodeSelector);
}
let eip = matched_eips.into_iter().find(|eip| {
eip.status.as_ref().is_some_and(|s| {
s.resource_id.is_none()
|| s.resource_id.as_ref().map(|r| r == &name).unwrap_or(false)
})
});
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How do we handle the migration from older versions of the EIP operator, which don't have resource_id's? Is this OK just because we don't have multiple nodes that match currently?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, there could be some re-alignment during this migration if there are multiple nodes matching a single eip, but it shouldn't be any more unstable than the existing implementation.

let Some(eip) = eip else {
info!("No un-associated eips found for node {}", &name);
return Ok(None);
};
let eip_name = eip.name_unchecked();
let allocation_id = eip.allocation_id().ok_or(Error::MissingAllocationId)?;
let eip_description = crate::aws::describe_address(&self.ec2_client, allocation_id)
.await?
Expand All @@ -71,10 +89,35 @@ impl k8s_controller::Context for Context {
if eip_description.network_interface_id != Some(eni_id.to_owned())
|| eip_description.private_ip_address != Some(node_ip.to_owned())
{
crate::aws::associate_eip(&self.ec2_client, allocation_id, &eni_id, node_ip).await?;
match crate::eip::set_status_should_attach(&eip_api, &eip, &eni_id, node_ip, &name)
.await
{
Ok(_) => {
info!("Found matching Eip, attaching it");
let association_id = crate::aws::associate_eip(
&self.ec2_client,
allocation_id,
&eni_id,
node_ip,
)
.await?
.association_id
.ok_or(Error::MissingAssociationId)?;
crate::eip::set_status_association_id(&eip_api, &eip_name, &association_id)
.await?;
}
Err(Error::Kube {
source: kube::Error::Api(ErrorResponse { reason, .. }),
}) if reason == "Conflict" => {
warn!(
"Node {} failed to claim eip {}, rescheduling to try another",
name, eip_name
);
return Ok(Some(Action::requeue(Duration::from_secs(3))));
}
Err(e) => return Err(e),
};
}
crate::eip::set_status_attached(&eip_api, eip_name, &eni_id, node_ip).await?;

Ok(None)
}

Expand All @@ -88,14 +131,17 @@ impl k8s_controller::Context for Context {
client.clone(),
self.namespace.as_deref().unwrap_or("default"),
);

let node_labels = node.labels().ok_or(Error::MissingNodeLabels)?;
let all_eips = eip_api.list(&ListParams::default()).await?.items;
let eip = all_eips
.into_iter()
.filter(|eip| eip.attached())
.find(|eip| eip.matches_node(node_labels));
if let Some(eip) = eip {
let node_labels = node.labels();
let matched_eips = eip_api.list(&ListParams::default()).await?.items;
// find all eips that match (there should be one, but lets not lean on that)
let eips = matched_eips.into_iter().filter(|eip| {
eip.matches_node(node_labels)
&& eip
.status
.as_ref()
.is_some_and(|s| s.resource_id == Some(node.name_unchecked().clone()))
});
for eip in eips {
let allocation_id = eip.allocation_id().ok_or(Error::MissingAllocationId)?;
let addresses = crate::aws::describe_address(&self.ec2_client, allocation_id)
.await?
Expand All @@ -106,11 +152,7 @@ impl k8s_controller::Context for Context {
crate::aws::disassociate_eip(&self.ec2_client, &association_id).await?;
}
}
crate::eip::set_status_detached(
&eip_api,
eip.metadata.name.as_ref().ok_or(Error::MissingEipName)?,
)
.await?;
crate::eip::set_status_detached(&eip_api, &eip.name_unchecked()).await?;
}
Ok(None)
}
Expand Down
Loading
Loading