Skip to content

Commit b3bd1b0

Browse files
committed
cover more edge cases and rename some fns
1 parent 271f1c1 commit b3bd1b0

File tree

5 files changed

+48
-41
lines changed

5 files changed

+48
-41
lines changed

eip_operator/src/controller/eip.rs

+12-4
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use std::collections::HashMap;
44
use kube::api::{Api, PatchParams};
55
use kube::{Client, ResourceExt};
66
use kube_runtime::controller::Action;
7-
use tracing::{info, instrument};
7+
use tracing::{info, instrument, warn};
88

99
use eip_operator_shared::Error;
1010

@@ -108,18 +108,26 @@ impl k8s_controller::Context for Context {
108108
let data = resource.clone().data(serde_json::json!({
109109
"metadata": {
110110
"labels":{
111-
"eip.materialize.cloud/refresh": rand::thread_rng().gen::<u64>(),
111+
"eip.materialize.cloud/refresh": format!("{}",rand::thread_rng().gen::<u64>()),
112112

113113
}
114114
}
115115
}));
116-
resource_api
116+
if let Err(err) = resource_api
117117
.patch_metadata(
118118
&resource.name_unchecked(),
119119
&PatchParams::default(),
120120
&kube::core::params::Patch::Merge(serde_json::json!(data)),
121121
)
122-
.await?;
122+
.await
123+
{
124+
warn!(
125+
"Failed to patch resource {} refresh label for {}: err {:?}",
126+
resource.name_unchecked(),
127+
name,
128+
err
129+
);
130+
};
123131
}
124132
}
125133

eip_operator/src/controller/node.rs

+19-20
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,10 @@ use std::time::Duration;
22

33
use k8s_openapi::api::core::v1::Node;
44
use kube::api::{Api, ListParams};
5+
use kube::error::ErrorResponse;
56
use kube::{Client, ResourceExt};
67
use kube_runtime::controller::Action;
7-
use tracing::{event, info, instrument, Level};
8+
use tracing::{event, info, instrument, warn, Level};
89

910
use eip_operator_shared::Error;
1011

@@ -88,7 +89,9 @@ impl k8s_controller::Context for Context {
8889
if eip_description.network_interface_id != Some(eni_id.to_owned())
8990
|| eip_description.private_ip_address != Some(node_ip.to_owned())
9091
{
91-
match crate::eip::set_status_attached(&eip_api, &eip, &eni_id, node_ip, &name).await {
92+
match crate::eip::set_status_should_attach(&eip_api, &eip, &eni_id, node_ip, &name)
93+
.await
94+
{
9295
Ok(_) => {
9396
info!("Found matching Eip, attaching it");
9497
let association_id = crate::aws::associate_eip(
@@ -103,16 +106,14 @@ impl k8s_controller::Context for Context {
103106
crate::eip::set_status_association_id(&eip_api, &eip_name, &association_id)
104107
.await?;
105108
}
106-
Err(err)
107-
if err
108-
.to_string()
109-
.contains("Operation cannot be fulfilled on eips.materialize.cloud") =>
110-
{
111-
info!(
109+
Err(Error::Kube {
110+
source: kube::Error::Api(ErrorResponse { reason, .. }),
111+
}) if reason == "Conflict" => {
112+
warn!(
112113
"Node {} failed to claim eip {}, rescheduling to try another",
113114
name, eip_name
114115
);
115-
return Ok(Some(Action::requeue(Duration::from_secs(1))));
116+
return Ok(Some(Action::requeue(Duration::from_secs(3))));
116117
}
117118
Err(e) => return Err(e),
118119
};
@@ -132,17 +133,15 @@ impl k8s_controller::Context for Context {
132133
);
133134
let node_labels = node.labels();
134135
let matched_eips = eip_api.list(&ListParams::default()).await?.items;
135-
let eip = matched_eips
136-
.into_iter()
137-
.filter(|eip| eip.attached())
138-
.find(|eip| {
139-
eip.matches_node(node_labels)
140-
&& eip
141-
.status
142-
.as_ref()
143-
.is_some_and(|s| s.resource_id == Some(node.name_unchecked().clone()))
144-
});
145-
if let Some(eip) = eip {
136+
// find all eips that match (there should be one, but let's not lean on that)
137+
let eips = matched_eips.into_iter().filter(|eip| {
138+
eip.matches_node(node_labels)
139+
&& eip
140+
.status
141+
.as_ref()
142+
.is_some_and(|s| s.resource_id == Some(node.name_unchecked().clone()))
143+
});
144+
for eip in eips {
146145
let allocation_id = eip.allocation_id().ok_or(Error::MissingAllocationId)?;
147146
let addresses = crate::aws::describe_address(&self.ec2_client, allocation_id)
148147
.await?

eip_operator/src/controller/pod.rs

+11-5
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
1+
use std::time::Duration;
2+
13
use k8s_openapi::api::core::v1::{Node, Pod};
24
use kube::api::{Api, ListParams, Patch, PatchParams};
35
use kube::{Client, ResourceExt};
46
use kube_runtime::controller::Action;
57
use serde::Deserialize;
6-
use tracing::{event, instrument, Level};
8+
use tracing::{event, instrument, warn, Level};
79

810
use eip_operator_shared::Error;
911

@@ -44,9 +46,14 @@ impl k8s_controller::Context for Context {
4446
crate::eip::create_for_pod(&eip_api, &name).await?;
4547
}
4648

49+
let node_name = match pod.node_name() {
50+
Some(node_name) => node_name,
51+
None => {
52+
warn!("Pod {} is not yet scheduled", name);
53+
return Ok(Some(Action::requeue(Duration::from_secs(3))));
54+
}
55+
};
4756
let pod_ip = pod.ip().ok_or(Error::MissingPodIp)?;
48-
let node_name = pod.node_name().ok_or(Error::MissingNodeName)?;
49-
5057
let node = node_api.get(node_name).await?;
5158

5259
let provider_id = node.provider_id().ok_or(Error::MissingProviderId)?;
@@ -84,8 +91,7 @@ impl k8s_controller::Context for Context {
8491
.ok_or(Error::MissingAddresses)?
8592
.swap_remove(0);
8693
let public_ip = eip_description.public_ip.ok_or(Error::MissingPublicIp)?;
87-
// having multiple EIPs
88-
crate::eip::set_status_attached(&eip_api, &eip, &eni_id, pod_ip, &name).await?;
94+
crate::eip::set_status_should_attach(&eip_api, &eip, &eni_id, pod_ip, &name).await?;
8995
if eip_description.network_interface_id != Some(eni_id.to_owned())
9096
|| eip_description.private_ip_address != Some(pod_ip.to_owned())
9197
{

eip_operator/src/eip.rs

+5-11
Original file line numberDiff line numberDiff line change
@@ -141,12 +141,6 @@ pub mod v2 {
141141
Api::<Self>::namespaced(k8s_client, namespace.unwrap_or("default"))
142142
}
143143

144-
pub fn attached(&self) -> bool {
145-
self.status
146-
.as_ref()
147-
.map_or(false, |status| status.private_ip_address.is_some())
148-
}
149-
150144
pub fn matches_pod(&self, pod_name: &str) -> bool {
151145
match self.spec.selector {
152146
EipSelector::Pod {
@@ -388,7 +382,7 @@ pub(crate) async fn set_status_association_id(
388382
name: &str,
389383
association_id: &str,
390384
) -> Result<Eip, kube::Error> {
391-
event!(Level::INFO, "Updating status for created EIP.");
385+
event!(Level::INFO, "Updating status for assocaited EIP.");
392386
let patch = serde_json::json!({
393387
"apiVersion": Eip::version(),
394388
"kind": "Eip",
@@ -400,21 +394,21 @@ pub(crate) async fn set_status_association_id(
400394
let params = PatchParams::default();
401395
let result = api.patch_status(name, &params, &patch).await;
402396
if result.is_ok() {
403-
event!(Level::INFO, "Done updating status for created EIP.");
397+
event!(Level::INFO, "Done updating status for assocaited EIP.");
404398
}
405399
result
406400
}
407401

408402
/// Sets the eni and privateIpAddress fields in the Eip status.
409403
#[instrument(skip(api), err)]
410-
pub(crate) async fn set_status_attached(
404+
pub(crate) async fn set_status_should_attach(
411405
api: &Api<Eip>,
412406
eip: &Eip,
413407
eni: &str,
414408
private_ip_address: &str,
415409
resource_id: &str,
416410
) -> Result<Eip, Error> {
417-
event!(Level::INFO, "Updating status for attached EIP.");
411+
event!(Level::INFO, "Updating status for attaching EIP.");
418412
let mut eip = eip.clone();
419413
let status = eip.status.as_mut().ok_or(Error::MissingEipStatus)?;
420414
status.eni = Some(eni.to_owned());
@@ -428,7 +422,7 @@ pub(crate) async fn set_status_attached(
428422
)
429423
.await?;
430424
// let result = api.patch_status(name, &params, &patch).await;
431-
event!(Level::INFO, "Done updating status for attached EIP.");
425+
event!(Level::INFO, "Done updating status for attaching EIP.");
432426
Ok(result)
433427
}
434428

eip_operator_shared/src/lib.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ pub enum Error {
5050
#[from]
5151
source: kube_runtime::wait::Error,
5252
},
53-
#[error("No EIP found with that podName.")]
53+
#[error("No EIP found with that podName `{0}`.")]
5454
NoEipResourceWithThatPodName(String),
5555
#[error("No EIP found with that node selector.")]
5656
NoEipResourceWithThatNodeSelector,

0 commit comments

Comments
 (0)