Skip to content

Commit e2c9b82

Browse files
committed
fixes bug where multiple nodes fight over an eip
- introduces resource_ids to eip status - moves setting attachment status to be before an attempt at attaching - moves status update from patch to replace this should raise errors if the resource has changed allowing the resource_id to function somewhat as a lock - resource won't attempt to claim a resource with a resource id - fixes cargo deny errors
1 parent f9deed6 commit e2c9b82

File tree

11 files changed

+272
-240
lines changed

11 files changed

+272
-240
lines changed

Cargo.lock

+179-167
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ members = [
88

99
[workspace.package]
1010
edition = "2021"
11-
rust-version = "1.74.0"
11+
rust-version = "1.76.0"
1212

1313

1414
# Use this section only to change the source of dependencies that might

deny.toml

+5-4
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
[graph]
12
targets = [
23
{ triple = "aarch64-apple-darwin" },
34
{ triple = "aarch64-unknown-linux-gnu" },
@@ -6,7 +7,7 @@ targets = [
67
]
78

89
[advisories]
9-
vulnerability = "deny"
10+
version = 2
1011

1112
[bans]
1213
multiple-versions = "deny"
@@ -20,9 +21,9 @@ skip = [
2021
{ name = "hashbrown", version = "0.12.3" },
2122
{ name = "hashbrown", version = "0.14.0" },
2223
{ name = "nix", version = "0.26.4" },
23-
{ name = "nix", version = "0.27.1" },
2424
{ name = "ordered-float", version = "2.10.0" },
25-
{ name = "ordered-float", version = "3.4.0" },
25+
{ name = "fastrand", version = "2.0.1" },
26+
{ name = "regex-syntax", version = "0.6.29" },
2627
]
2728

2829
# Use `tracing` instead.
@@ -35,14 +36,14 @@ name = "env_logger"
3536
name = "rustls"
3637

3738
[licenses]
39+
version = 2
3840
allow = [
3941
"Apache-2.0",
4042
"BSD-2-Clause",
4143
"BSD-3-Clause",
4244
"MIT",
4345
"Unicode-DFS-2016",
4446
]
45-
copyleft = "deny"
4647

4748
[[licenses.clarify]]
4849
name = "ring"

eip_operator/Cargo.toml

+4-14
Original file line numberDiff line numberDiff line change
@@ -7,20 +7,10 @@ license = "Apache-2.0"
77
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
88

99
[dependencies]
10-
aws-config = { version = "0.55.1", default-features = false, features = [
11-
"native-tls",
12-
] }
13-
aws-sdk-ec2 = { version = "0.28", default-features = false, features = [
14-
"native-tls",
15-
"rt-tokio",
16-
] }
17-
aws-sdk-servicequotas = { version = "0.28", default-features = false, features = [
18-
"native-tls",
19-
"rt-tokio",
20-
] }
21-
aws-smithy-http = { version = "0.55", default-features = false, features = [
22-
"rt-tokio",
23-
] }
10+
aws-config = { version = "0.101", default-features = false}
11+
aws-sdk-ec2 = { version = "0.38", default-features = false, features = [ "rt-tokio" ] }
12+
aws-sdk-servicequotas = { version = "0.38", default-features = false, features = [ "rt-tokio" ] }
13+
aws-smithy-http = { version = "0.59", default-features = false, features = [ "rt-tokio" ] }
2414
futures = { workspace = true }
2515

2616

eip_operator/src/aws.rs

+11-6
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use aws_sdk_ec2::operation::disassociate_address::DisassociateAddressError;
99
use aws_sdk_ec2::operation::release_address::{ReleaseAddressError, ReleaseAddressOutput};
1010
use aws_sdk_ec2::types::{Address, DomainType, Filter, ResourceType, Tag, TagSpecification};
1111
use aws_sdk_ec2::Client as Ec2Client;
12-
use tracing::{debug, info, instrument};
12+
use tracing::{debug, error, info, instrument};
1313

1414
pub(crate) const LEGACY_CLUSTER_NAME_TAG: &str = "eip.aws.materialize.com/cluster_name";
1515

@@ -150,19 +150,24 @@ pub(crate) async fn describe_addresses_with_tag_value(
150150
pub(crate) async fn disassociate_eip(
151151
ec2_client: &Ec2Client,
152152
association_id: &str,
153-
) -> Result<(), SdkError<DisassociateAddressError>> {
153+
) -> Result<(), DisassociateAddressError> {
154154
match ec2_client
155155
.disassociate_address()
156156
.association_id(association_id)
157157
.send()
158158
.await
159159
{
160160
Ok(_) => Ok(()),
161-
Err(e) if e.to_string().contains("InvalidAssociationID.NotFound") => {
162-
info!(already_disassociated = true);
163-
Ok(())
161+
Err(e) => {
162+
let e = e.into_service_error();
163+
if e.meta().code() == Some("InvalidAssociationID.NotFound") {
164+
info!("Association id {} already disassociated", association_id);
165+
Ok(())
166+
} else {
167+
error!("Error disassociating {} - {:?}", association_id, e);
168+
Err(e)
169+
}
164170
}
165-
Err(e) => Err(e),
166171
}
167172
}
168173

eip_operator/src/controller/node.rs

+18-4
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use k8s_openapi::api::core::v1::Node;
22
use kube::api::{Api, ListParams};
33
use kube::Client;
44
use kube_runtime::controller::Action;
5-
use tracing::{event, instrument, Level};
5+
use tracing::{event, info, instrument, Level};
66

77
use eip_operator_shared::Error;
88

@@ -71,9 +71,17 @@ impl k8s_controller::Context for Context {
7171
if eip_description.network_interface_id != Some(eni_id.to_owned())
7272
|| eip_description.private_ip_address != Some(node_ip.to_owned())
7373
{
74-
crate::aws::associate_eip(&self.ec2_client, allocation_id, &eni_id, node_ip).await?;
74+
if eip.status.as_ref().is_some_and(|s| s.resource_id.is_none()) {
75+
crate::eip::set_status_attached(&eip_api, &eip, &eni_id, node_ip, name).await?;
76+
crate::aws::associate_eip(&self.ec2_client, allocation_id, &eni_id, node_ip)
77+
.await?;
78+
} else {
79+
info!(
80+
"Found matching Eip {} for pod {}, but it is in use",
81+
eip_name, name
82+
);
83+
}
7584
}
76-
crate::eip::set_status_attached(&eip_api, eip_name, &eni_id, node_ip).await?;
7785

7886
Ok(None)
7987
}
@@ -94,7 +102,13 @@ impl k8s_controller::Context for Context {
94102
let eip = all_eips
95103
.into_iter()
96104
.filter(|eip| eip.attached())
97-
.find(|eip| eip.matches_node(node_labels));
105+
.find(|eip| {
106+
eip.matches_node(node_labels)
107+
&& eip
108+
.status
109+
.as_ref()
110+
.is_some_and(|s| s.resource_id == Some(node.metadata.name.clone().unwrap()))
111+
});
98112
if let Some(eip) = eip {
99113
let allocation_id = eip.allocation_id().ok_or(Error::MissingAllocationId)?;
100114
let addresses = crate::aws::describe_address(&self.ec2_client, allocation_id)

eip_operator/src/controller/pod.rs

+2-3
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,6 @@ impl k8s_controller::Context for Context {
7171
.into_iter()
7272
.find(|eip| eip.matches_pod(name))
7373
.ok_or_else(|| Error::NoEipResourceWithThatPodName(name.to_owned()))?;
74-
let eip_name = eip.name().ok_or(Error::MissingEipName)?;
7574
let allocation_id = eip.allocation_id().ok_or(Error::MissingAllocationId)?;
7675
let eip_description = crate::aws::describe_address(&self.ec2_client, allocation_id)
7776
.await?
@@ -82,10 +81,10 @@ impl k8s_controller::Context for Context {
8281
if eip_description.network_interface_id != Some(eni_id.to_owned())
8382
|| eip_description.private_ip_address != Some(pod_ip.to_owned())
8483
{
84+
crate::eip::set_status_attached(&eip_api, &eip, &eni_id, pod_ip, name).await?;
8585
crate::aws::associate_eip(&self.ec2_client, allocation_id, &eni_id, pod_ip).await?;
86+
add_dns_target_annotation(&pod_api, name, &public_ip, allocation_id).await?;
8687
}
87-
crate::eip::set_status_attached(&eip_api, eip_name, &eni_id, pod_ip).await?;
88-
add_dns_target_annotation(&pod_api, name, &public_ip, allocation_id).await?;
8988
Ok(None)
9089
}
9190

eip_operator/src/eip.rs

+24-20
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use k8s_openapi::apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomResourceDefinition;
22
use kube::api::{Api, DeleteParams, ListParams, Patch, PatchParams, PostParams};
33
use kube::core::crd::merge_crds;
4-
use kube::{Client, CustomResourceExt};
4+
use kube::{Client, CustomResourceExt, ResourceExt};
55
use kube_runtime::wait::{await_condition, conditions};
66
use schemars::JsonSchema;
77
use serde::{Deserialize, Serialize};
@@ -121,7 +121,8 @@ pub mod v2 {
121121
printcolumn = r#"{"name": "PublicIP", "type": "string", "description": "Public IP address of the EIP.", "jsonPath": ".status.publicIpAddress"}"#,
122122
printcolumn = r#"{"name": "Selector", "type": "string", "description": "Selector for the pod or node to associate the EIP with.", "jsonPath": ".spec.selector", "priority": 1}"#,
123123
printcolumn = r#"{"name": "ENI", "type": "string", "description": "ID of the Elastic Network Interface of the pod.", "jsonPath": ".status.eni", "priority": 1}"#,
124-
printcolumn = r#"{"name": "PrivateIP", "type": "string", "description": "Private IP address of the pod.", "jsonPath": ".status.privateIpAddress", "priority": 1}"#
124+
printcolumn = r#"{"name": "PrivateIP", "type": "string", "description": "Private IP address of the pod.", "jsonPath": ".status.privateIpAddress", "priority": 1}"#,
125+
printcolumn = r#"{"name": "ResourceId", "type": "string", "description": "ID of resource the EIP is attached to..", "jsonPath": ".status.resourceId", "priority": 1}"#
125126
)]
126127
pub struct EipSpec {
127128
pub selector: EipSelector,
@@ -205,13 +206,14 @@ pub mod v2 {
205206
}
206207

207208
/// The status fields for the Eip Kubernetes custom resource.
208-
#[derive(Clone, Debug, Deserialize, Serialize, JsonSchema)]
209+
#[derive(Clone, Default, Debug, Deserialize, Serialize, JsonSchema)]
209210
#[serde(rename_all = "camelCase")]
210211
pub struct EipStatus {
211212
pub allocation_id: Option<String>,
212213
pub public_ip_address: Option<String>,
213214
pub eni: Option<String>,
214215
pub private_ip_address: Option<String>,
216+
pub resource_id: Option<String>,
215217
}
216218

217219
/// Registers the Eip custom resource with Kubernetes,
@@ -335,26 +337,27 @@ pub(crate) async fn set_status_created(
335337
#[instrument(skip(api), err)]
336338
pub(crate) async fn set_status_attached(
337339
api: &Api<Eip>,
338-
name: &str,
340+
eip: &Eip,
339341
eni: &str,
340342
private_ip_address: &str,
341-
) -> Result<Eip, kube::Error> {
343+
resource_id: &str,
344+
) -> Result<Eip, Error> {
342345
event!(Level::INFO, "Updating status for attached EIP.");
343-
let patch = serde_json::json!({
344-
"apiVersion": Eip::version(),
345-
"kind": "Eip",
346-
"status": {
347-
"eni": eni,
348-
"privateIpAddress": private_ip_address,
349-
}
350-
});
351-
let patch = Patch::Merge(&patch);
352-
let params = PatchParams::default();
353-
let result = api.patch_status(name, &params, &patch).await;
354-
if result.is_ok() {
355-
event!(Level::INFO, "Done updating status for attached EIP.");
356-
}
357-
result
346+
let mut eip = eip.clone();
347+
let status = eip.status.as_mut().ok_or(Error::MissingEipStatus)?;
348+
status.eni = Some(eni.to_owned());
349+
status.private_ip_address = Some(private_ip_address.to_owned());
350+
status.resource_id = Some(resource_id.to_owned());
351+
let result = api
352+
.replace_status(
353+
&eip.name_unchecked(),
354+
&PostParams::default(),
355+
serde_json::to_vec(&eip.clone())?,
356+
)
357+
.await?;
358+
// let result = api.patch_status(name, &params, &patch).await;
359+
event!(Level::INFO, "Done updating status for attached EIP.");
360+
Ok(result)
358361
}
359362

360363
/// Unsets the eni and privateIpAddress fields in the Eip status.
@@ -367,6 +370,7 @@ pub(crate) async fn set_status_detached(api: &Api<Eip>, name: &str) -> Result<Ei
367370
"status": {
368371
"eni": None::<String>,
369372
"privateIpAddress": None::<String>,
373+
"resourceId": None::<String>,
370374
}
371375
});
372376
let patch = Patch::Merge(&patch);

eip_operator/src/main.rs

+3-2
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,8 @@ async fn run() -> Result<(), Error> {
5353
let k8s_client = Client::try_default().await?;
5454

5555
debug!("Getting ec2_client...");
56-
let mut config_loader = aws_config::from_env();
56+
let mut config_loader = eip_operator_shared::aws_config_loader_default();
57+
5758
if let Ok(endpoint) = std::env::var("AWS_ENDPOINT_URL") {
5859
config_loader = config_loader.endpoint_url(endpoint);
5960
}
@@ -253,7 +254,7 @@ async fn report_eip_quota_status(
253254
quota_client: &ServiceQuotaClient,
254255
) -> Result<(), Error> {
255256
let addresses_result = ec2_client.describe_addresses().send().await?;
256-
let allocated = addresses_result.addresses().unwrap_or_default().len();
257+
let allocated = addresses_result.addresses().len();
257258
let quota_result = quota_client
258259
.get_service_quota()
259260
.service_code("ec2")

eip_operator_shared/Cargo.toml

+13-15
Original file line numberDiff line numberDiff line change
@@ -7,33 +7,31 @@ license = "Apache-2.0"
77
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
88

99
[dependencies]
10-
aws-sdk-ec2 = { version = "0.28", default-features = false, features = [
11-
"native-tls",
12-
"rt-tokio",
13-
] }
14-
aws-sdk-servicequotas = { version = "0.28", default-features = false, features = [
15-
"native-tls",
16-
"rt-tokio",
17-
] }
18-
aws-smithy-http = { version = "0.55", default-features = false, features = [
19-
"rt-tokio",
20-
] }
10+
aws-config = { version = "0.101", default-features = false}
11+
aws-sdk-ec2 = { version = "0.38", default-features = false, features = [ "rt-tokio" ] }
12+
aws-sdk-servicequotas = { version = "0.38", default-features = false, features = [ "rt-tokio" ] }
13+
aws-smithy-http = { version = "0.59", default-features = false, features = [ "rt-tokio" ] }
14+
aws-smithy-runtime-api = "0.101"
15+
aws-smithy-runtime = { version = "0.101", features = ["connector-hyper-0-14-x"] }
16+
hyper-tls = { version = "0.5.0" }
17+
18+
2119
futures = "0.3"
2220
hyper = { version = "0.14.27", features = ["http2"] }
23-
hyper-tls = { version = "0.5.0" }
2421
kube = { workspace = true }
2522
kube-runtime = { workspace = true }
2623
native-tls = { version = "0.2.11", features = ["alpn"] }
27-
opentelemetry = { version = "0.20", features = ["rt-tokio", "trace"] }
28-
opentelemetry-otlp = { version = "0.13" }
24+
opentelemetry = { version = "0.21", features = ["trace"] }
25+
opentelemetry_sdk = { version = "0.21", features = ["trace", "rt-tokio"] }
26+
opentelemetry-otlp = { version = "0.14" }
2927
serde = "1"
3028
serde_json = "1"
3129
thiserror = "1"
3230
tokio-native-tls = { version = "0.3.1" }
3331
tokio = { workspace = true }
3432
tonic = { version = "0.9.2", features = ["transport"] }
3533
tracing = "0.1"
36-
tracing-opentelemetry = "0.20"
34+
tracing-opentelemetry = "0.22"
3735
tracing-subscriber = { version = "0.3", features = [
3836
"registry",
3937
"env-filter",

eip_operator_shared/src/lib.rs

+12-4
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,9 @@ use std::net::AddrParseError;
44
use std::str::FromStr;
55
use std::time::Duration;
66

7+
use aws_config::{BehaviorVersion, ConfigLoader};
8+
use aws_smithy_runtime::client::http::hyper_014::HyperClientBuilder;
9+
710
use aws_sdk_ec2::error::SdkError;
811
use aws_sdk_ec2::operation::allocate_address::AllocateAddressError;
912
use aws_sdk_ec2::operation::associate_address::AssociateAddressError;
@@ -16,9 +19,9 @@ use aws_sdk_servicequotas::operation::get_service_quota::GetServiceQuotaError;
1619
use futures::Future;
1720
use hyper::client::HttpConnector;
1821
use hyper_tls::HttpsConnector;
19-
use opentelemetry::sdk::trace::{Config, Sampler};
20-
use opentelemetry::sdk::Resource as OtelResource;
2122
use opentelemetry::KeyValue;
23+
use opentelemetry_sdk::trace::{Config, Sampler};
24+
use opentelemetry_sdk::Resource as OtelResource;
2225
use tokio::time::error::Elapsed;
2326
use tonic::metadata::{MetadataKey, MetadataMap};
2427
use tonic::transport::Endpoint;
@@ -112,7 +115,7 @@ pub enum Error {
112115
#[error("AWS disassociate_address reported error: {source}")]
113116
AwsDisassociateAddress {
114117
#[from]
115-
source: SdkError<DisassociateAddressError>,
118+
source: DisassociateAddressError,
116119
},
117120
#[error("AWS release_address reported error: {source}")]
118121
AwsReleaseAddress {
@@ -254,7 +257,7 @@ where
254257
))
255258
.with_resource(otr),
256259
)
257-
.install_batch(opentelemetry::runtime::Tokio)
260+
.install_batch(opentelemetry_sdk::runtime::Tokio)
258261
.unwrap();
259262
let otel_layer = tracing_opentelemetry::layer()
260263
.with_tracer(tracer)
@@ -276,3 +279,8 @@ where
276279
};
277280
f().await
278281
}
282+
283+
pub fn aws_config_loader_default() -> ConfigLoader {
284+
aws_config::defaults(BehaviorVersion::latest())
285+
.http_client(HyperClientBuilder::new().build(HttpsConnector::new()))
286+
}

0 commit comments

Comments
 (0)