From 484a96ebf74fc9f0f512cbba6f7dc1ca58320c18 Mon Sep 17 00:00:00 2001 From: Justin Bradfield Date: Fri, 15 Mar 2024 00:37:08 -0500 Subject: [PATCH 1/4] fixes bug where multiple nodes fight over an eip - introduces resource_ids to eip status - moves setting attachment status to be before an attempt at attaching - moves status update from patch to replace this should raise errors if the resource has changed allowing the resource_id to function somewhat as a lock - resource won't attempt to claim a resource with a resource id - fixes cargo deny errors --- Cargo.lock | 346 ++++++++++++++-------------- Cargo.toml | 2 +- deny.toml | 9 +- eip_operator/Cargo.toml | 18 +- eip_operator/src/aws.rs | 17 +- eip_operator/src/controller/eip.rs | 43 +++- eip_operator/src/controller/node.rs | 53 ++++- eip_operator/src/controller/pod.rs | 5 +- eip_operator/src/eip.rs | 94 ++++++-- eip_operator/src/main.rs | 5 +- eip_operator_shared/Cargo.toml | 28 ++- eip_operator_shared/src/lib.rs | 16 +- 12 files changed, 388 insertions(+), 248 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c2e547c..88a4931 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -85,177 +85,159 @@ checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" [[package]] name = "aws-config" -version = "0.55.3" +version = "0.101.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bcdcf0d683fe9c23d32cf5b53c9918ea0a500375a9fb20109802552658e576c9" +checksum = "84f9625b71b3ee4adbfbca369c6680d156e316ed86d2c7199a2a134563917414" dependencies = [ "aws-credential-types", "aws-http", + "aws-runtime", "aws-sdk-sts", "aws-smithy-async", - "aws-smithy-client", "aws-smithy-http", - "aws-smithy-http-tower", "aws-smithy-json", + "aws-smithy-runtime", + "aws-smithy-runtime-api", "aws-smithy-types", "aws-types", "bytes", - "fastrand", + "fastrand 2.0.1", "http", "hyper", "time", "tokio", - "tower", "tracing", ] [[package]] name = "aws-credential-types" -version = "0.55.3" +version = "0.101.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1fcdb2f7acbc076ff5ad05e7864bdb191ca70a6fd07668dc3a1a8bcd051de5ae" +checksum = "5924466398ac76ffd411d297b9d516dcebb0577f7344c0c15fd8e8e04d9c7895" dependencies = [ "aws-smithy-async", + "aws-smithy-runtime-api", "aws-smithy-types", - "fastrand", - "tokio", - "tracing", "zeroize", ] [[package]] -name = "aws-endpoint" -version = "0.55.3" +name = "aws-http" +version = "0.59.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8cce1c41a6cfaa726adee9ebb9a56fcd2bbfd8be49fd8a04c5e20fd968330b04" +checksum = "bb9a3aa335a105a00975c971f1dad403c3175f2a210d98f39345c6af53923912" dependencies = [ - "aws-smithy-http", + "aws-smithy-runtime-api", "aws-smithy-types", "aws-types", + "bytes", "http", - "regex", + "http-body", + "pin-project-lite", "tracing", ] [[package]] -name = "aws-http" -version = "0.55.3" +name = "aws-runtime" +version = "0.101.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aadbc44e7a8f3e71c8b374e03ecd972869eb91dd2bc89ed018954a52ba84bc44" +checksum = "b75844ecbdf3dc5e0f5ac5fd1088fb1623849990ea9445d2826258ce63be4de5" dependencies = [ "aws-credential-types", + "aws-http", + "aws-sigv4", + "aws-smithy-async", "aws-smithy-http", + "aws-smithy-runtime-api", "aws-smithy-types", "aws-types", - "bytes", + "fastrand 2.0.1", "http", - "http-body", - "lazy_static", "percent-encoding", - "pin-project-lite", "tracing", + "uuid", ] [[package]] name = "aws-sdk-ec2" -version = "0.28.0" +version = "0.38.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eab2493c5857725eeafe12ec66ba4ce6feb3355e3af6828d9ef28d6152972a27" +checksum = "0eadf3b2bbaed2435729f6aadc5355d06a8e00cf819309a67d736594b04880ba" dependencies = [ "aws-credential-types", - "aws-endpoint", "aws-http", - "aws-sig-auth", + "aws-runtime", "aws-smithy-async", - "aws-smithy-client", "aws-smithy-http", - "aws-smithy-http-tower", "aws-smithy-json", "aws-smithy-query", + "aws-smithy-runtime", + "aws-smithy-runtime-api", "aws-smithy-types", "aws-smithy-xml", "aws-types", - "bytes", - "fastrand", + "fastrand 2.0.1", "http", "regex", - "tokio-stream", - "tower", "tracing", ] [[package]] name = "aws-sdk-servicequotas" -version = "0.28.0" +version = "0.38.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "87bc114a6e96f15b8ca1a68621ef44ba4ded021e9587633efac3cfedf716929a" +checksum = "e9177a64ca9f65cf293e2f3223b3e87594a3271688c741a61ff03574c6a0fdf4" dependencies = [ "aws-credential-types", - "aws-endpoint", "aws-http", - "aws-sig-auth", + "aws-runtime", "aws-smithy-async", - "aws-smithy-client", "aws-smithy-http", - "aws-smithy-http-tower", "aws-smithy-json", + "aws-smithy-runtime", + "aws-smithy-runtime-api", "aws-smithy-types", "aws-types", "bytes", "http", "regex", - "tokio-stream", - "tower", "tracing", ] [[package]] name = "aws-sdk-sts" -version = "0.28.0" +version = "0.38.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "265fac131fbfc188e5c3d96652ea90ecc676a934e3174eaaee523c6cec040b3b" +checksum = "a4e3958c43a78f6c3822e62009a35802af5cc7c120fbe8e60b98565604569aae" dependencies = [ "aws-credential-types", - "aws-endpoint", "aws-http", - "aws-sig-auth", + "aws-runtime", "aws-smithy-async", - "aws-smithy-client", "aws-smithy-http", - "aws-smithy-http-tower", "aws-smithy-json", "aws-smithy-query", + "aws-smithy-runtime", + "aws-smithy-runtime-api", "aws-smithy-types", "aws-smithy-xml", "aws-types", - "bytes", "http", "regex", - "tower", - "tracing", -] - -[[package]] -name = "aws-sig-auth" -version = "0.55.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3b94acb10af0c879ecd5c7bdf51cda6679a0a4f4643ce630905a77673bfa3c61" -dependencies = [ - "aws-credential-types", - "aws-sigv4", - "aws-smithy-http", - "aws-types", - "http", "tracing", ] [[package]] name = "aws-sigv4" -version = "0.55.3" +version = "0.101.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9d2ce6f507be68e968a33485ced670111d1cbad161ddbbab1e313c03d37d8f4c" +checksum = "06130e3686db3c5ae2fc44b3516fffe6b4d4eccebe09bd8ccc4067f3c9c183fb" dependencies = [ + "aws-credential-types", "aws-smithy-http", + "aws-smithy-runtime-api", + "aws-smithy-types", + "bytes", "form_urlencoded", "hex", "hmac", @@ -270,127 +252,132 @@ dependencies = [ [[package]] name = "aws-smithy-async" -version = "0.55.3" +version = "0.101.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "13bda3996044c202d75b91afeb11a9afae9db9a721c6a7a427410018e286b880" +checksum = "d787b7e07925b450bed90d9d29ac8e57006c9c2ac907151d175ac0e376bfee0e" dependencies = [ "futures-util", "pin-project-lite", "tokio", - "tokio-stream", ] [[package]] -name = "aws-smithy-client" -version = "0.55.3" +name = "aws-smithy-http" +version = "0.59.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0a86aa6e21e86c4252ad6a0e3e74da9617295d8d6e374d552be7d3059c41cedd" +checksum = "96daaad925331c72449423574fdc72b54af780d5a23ace3c0a6ad0ccbf378715" dependencies = [ - "aws-smithy-async", - "aws-smithy-http", - "aws-smithy-http-tower", + "aws-smithy-runtime-api", "aws-smithy-types", "bytes", - "fastrand", + "bytes-utils", + "futures-core", "http", "http-body", - "hyper", - "hyper-tls", + "once_cell", + "percent-encoding", "pin-project-lite", - "tokio", - "tower", + "pin-utils", "tracing", ] [[package]] -name = "aws-smithy-http" -version = "0.55.3" +name = "aws-smithy-json" +version = "0.59.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ff985bee3fe21046dc501fadc1d04a1161977c55a0cbbccd9b111c18206aa64" +dependencies = [ + "aws-smithy-types", +] + +[[package]] +name = "aws-smithy-query" +version = "0.59.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb4006503693766d34717efc5f58325062845fce26a683a71b70f23156d72e67" +dependencies = [ + "aws-smithy-types", + "urlencoding", +] + +[[package]] +name = "aws-smithy-runtime" +version = "0.101.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2b3b693869133551f135e1f2c77cb0b8277d9e3e17feaf2213f735857c4f0d28" +checksum = "d28af854558601b4202a4273b9720aebe43d73e472143e6056f16e3bd90bc837" dependencies = [ + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-runtime-api", "aws-smithy-types", "bytes", - "bytes-utils", - "futures-core", + "fastrand 2.0.1", "http", "http-body", "hyper", "once_cell", - "percent-encoding", "pin-project-lite", "pin-utils", "tokio", - "tokio-util", "tracing", ] [[package]] -name = "aws-smithy-http-tower" -version = "0.55.3" +name = "aws-smithy-runtime-api" +version = "0.101.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3ae4f6c5798a247fac98a867698197d9ac22643596dc3777f0c76b91917616b9" +checksum = "e1c68e17e754b86da350b43add38294189121a880e9c3fb454f83ff7044f5257" dependencies = [ - "aws-smithy-http", + "aws-smithy-async", "aws-smithy-types", "bytes", "http", - "http-body", "pin-project-lite", - "tower", + "tokio", "tracing", ] -[[package]] -name = "aws-smithy-json" -version = "0.55.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "23f9f42fbfa96d095194a632fbac19f60077748eba536eb0b9fecc28659807f8" -dependencies = [ - "aws-smithy-types", -] - -[[package]] -name = "aws-smithy-query" -version = "0.55.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "98819eb0b04020a1c791903533b638534ae6c12e2aceda3e6e6fba015608d51d" -dependencies = [ - "aws-smithy-types", - "urlencoding", -] - [[package]] name = "aws-smithy-types" -version = "0.55.3" +version = "0.101.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "16a3d0bf4f324f4ef9793b86a1701d9700fbcdbd12a846da45eed104c634c6e8" +checksum = "d97b978d8a351ea5744206ecc643a1d3806628680e9f151b4d6b7a76fec1596f" dependencies = [ "base64-simd", + "bytes", + "bytes-utils", + "futures-core", + "http", + "http-body", "itoa", "num-integer", + "pin-project-lite", + "pin-utils", "ryu", + "serde", "time", + "tokio", + "tokio-util", ] [[package]] name = "aws-smithy-xml" -version = "0.55.3" +version = "0.59.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b1b9d12875731bd07e767be7baad95700c3137b56730ec9ddeedb52a5e5ca63b" +checksum = "97500a0d0884b9576e65076075f81d899cfbb84f7db5ca1dd317f0582204e528" dependencies = [ "xmlparser", ] [[package]] name = "aws-types" -version = "0.55.3" +version = "0.101.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6dd209616cc8d7bfb82f87811a5c655dc97537f592689b18743bddf5dc5c4829" +checksum = "61065f0c6070cb0f9aaddfa614605fb1049908481da71ba5b39b2ffca12f57e4" dependencies = [ "aws-credential-types", "aws-smithy-async", - "aws-smithy-client", - "aws-smithy-http", + "aws-smithy-runtime-api", "aws-smithy-types", "http", "rustc_version", @@ -722,9 +709,12 @@ name = "eip-operator-shared" version = "0.3.0" dependencies = [ "async-trait", + "aws-config", "aws-sdk-ec2", "aws-sdk-servicequotas", "aws-smithy-http", + "aws-smithy-runtime", + "aws-smithy-runtime-api", "futures", "hyper", "hyper-tls", @@ -733,6 +723,7 @@ dependencies = [ "native-tls", "opentelemetry", "opentelemetry-otlp", + "opentelemetry_sdk", "rand", "rtnetlink", "serde", @@ -788,6 +779,12 @@ dependencies = [ "instant", ] +[[package]] +name = "fastrand" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "25cbce373ec4653f1a01a31e8a5e5ec0c622dc27ff9c4e6606eefef5cbbed4a5" + [[package]] name = "fnv" version = "1.0.7" @@ -934,6 +931,12 @@ version = "0.27.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b6c80984affa11d98d1b88b66ac8853f143217b399d3c74116778ff8fdb4ed2e" +[[package]] +name = "glob" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" + [[package]] name = "h2" version = "0.3.24" @@ -1375,9 +1378,9 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" [[package]] name = "libc" -version = "0.2.147" +version = "0.2.153" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b4668fb0ea861c1df094127ac5f1da3409a82116a4ba74fca2e58ef927159bb3" +checksum = "9c198f91728a82281a64e1f4f9eeb25d82cb32a5de251c6bd1b5154d63a8e7bd" [[package]] name = "linked-hash-map" @@ -1454,9 +1457,9 @@ dependencies = [ [[package]] name = "mio" -version = "0.8.8" +version = "0.8.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "927a765cd3fc26206e66b296465fa9d3e5ab003e651c1b3c060e7956d96b19d2" +checksum = "a4a650543ca06a924e8b371db273b2756685faae30f8487da1b56505a8f78b0c" dependencies = [ "libc", "wasi", @@ -1668,26 +1671,32 @@ dependencies = [ [[package]] name = "opentelemetry" -version = "0.20.0" +version = "0.21.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9591d937bc0e6d2feb6f71a559540ab300ea49955229c347a517a28d27784c54" +checksum = "1e32339a5dc40459130b3bd269e9892439f55b33e772d2a9d402a789baaf4e8a" dependencies = [ - "opentelemetry_api", - "opentelemetry_sdk", + "futures-core", + "futures-sink", + "indexmap 2.2.1", + "js-sys", + "once_cell", + "pin-project-lite", + "thiserror", + "urlencoding", ] [[package]] name = "opentelemetry-otlp" -version = "0.13.0" +version = "0.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7e5e5a5c4135864099f3faafbe939eb4d7f9b80ebf68a8448da961b32a7c1275" +checksum = "f24cda83b20ed2433c68241f918d0f6fdec8b1d43b7a9590ab4420c5095ca930" dependencies = [ "async-trait", "futures-core", "http", + "opentelemetry", "opentelemetry-proto", "opentelemetry-semantic-conventions", - "opentelemetry_api", "opentelemetry_sdk", "prost", "thiserror", @@ -1697,11 +1706,11 @@ dependencies = [ [[package]] name = "opentelemetry-proto" -version = "0.3.0" +version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b1e3f814aa9f8c905d0ee4bde026afd3b2577a97c10e1699912e3e44f0c4cbeb" +checksum = "a2e155ce5cc812ea3d1dffbd1539aed653de4bf4882d60e6e04dcf0901d674e1" dependencies = [ - "opentelemetry_api", + "opentelemetry", "opentelemetry_sdk", "prost", "tonic", @@ -1709,47 +1718,30 @@ dependencies = [ [[package]] name = "opentelemetry-semantic-conventions" -version = "0.12.0" +version = "0.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "73c9f9340ad135068800e7f1b24e9e09ed9e7143f5bf8518ded3d3ec69789269" +checksum = "f5774f1ef1f982ef2a447f6ee04ec383981a3ab99c8e77a1a7b30182e65bbc84" dependencies = [ "opentelemetry", ] -[[package]] -name = "opentelemetry_api" -version = "0.20.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8a81f725323db1b1206ca3da8bb19874bbd3f57c3bcd59471bfb04525b265b9b" -dependencies = [ - "futures-channel", - "futures-util", - "indexmap 1.9.3", - "js-sys", - "once_cell", - "pin-project-lite", - "thiserror", - "urlencoding", -] - [[package]] name = "opentelemetry_sdk" -version = "0.20.0" +version = "0.21.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fa8e705a0612d48139799fcbaba0d4a90f06277153e43dd2bdc16c6f0edd8026" +checksum = "2f16aec8a98a457a52664d69e0091bac3a0abd18ead9b641cb00202ba4e0efe4" dependencies = [ "async-trait", "crossbeam-channel", "futures-channel", "futures-executor", "futures-util", + "glob", "once_cell", - "opentelemetry_api", - "ordered-float 3.9.2", + "opentelemetry", + "ordered-float 4.2.0", "percent-encoding", "rand", - "regex", - "serde_json", "thiserror", "tokio", "tokio-stream", @@ -1766,9 +1758,9 @@ dependencies = [ [[package]] name = "ordered-float" -version = "3.9.2" +version = "4.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f1e1c390732d15f1d48471625cd92d154e66db2c56645e29a9cd26f4699f72dc" +checksum = "a76df7075c7d4d01fdcb46c912dd17fba5b60c78ea480b475f2b6ab6f666584e" dependencies = [ "num-traits", ] @@ -2285,7 +2277,7 @@ checksum = "31c0432476357e58790aaa47a8efb0c5138f137343f3b5f23bd36a27e3b0a6d6" dependencies = [ "autocfg", "cfg-if", - "fastrand", + "fastrand 1.9.0", "redox_syscall", "rustix", "windows-sys", @@ -2551,27 +2543,31 @@ dependencies = [ [[package]] name = "tracing-log" -version = "0.1.3" +version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "78ddad33d2d10b1ed7eb9d1f518a5674713876e97e5bb9b7345a7984fbb4f922" +checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3" dependencies = [ - "lazy_static", "log", + "once_cell", "tracing-core", ] [[package]] name = "tracing-opentelemetry" -version = "0.20.0" +version = "0.22.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fc09e402904a5261e42cf27aea09ccb7d5318c6717a9eec3d8e2e65c56b18f19" +checksum = "c67ac25c5407e7b961fafc6f7e9aa5958fd297aada2d20fa2ae1737357e55596" dependencies = [ + "js-sys", "once_cell", "opentelemetry", + "opentelemetry_sdk", + "smallvec", "tracing", "tracing-core", "tracing-log", "tracing-subscriber", + "web-time", ] [[package]] @@ -2586,9 +2582,9 @@ dependencies = [ [[package]] name = "tracing-subscriber" -version = "0.3.17" +version = "0.3.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "30a651bc37f915e81f087d86e62a18eec5f79550c7faff886f7090b4ea757c77" +checksum = "ad0f048c97dbd9faa9b7df56362b8ebcaa52adb06b498c050d2f4e32f90a7a8b" dependencies = [ "matchers", "nu-ansi-term", @@ -2644,6 +2640,12 @@ version = "2.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e8db7427f936968176eaa7cdf81b7f98b980b18495ec28f1b5791ac3bfe3eea9" +[[package]] +name = "uuid" +version = "1.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f00cc9702ca12d3c81455259621e676d0f7251cec66a21e98fe2e9a37db93b2a" + [[package]] name = "valuable" version = "0.1.0" @@ -2737,6 +2739,16 @@ version = "0.2.87" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ca6ad05a4870b2bf5fe995117d3728437bd27d7cd5f06f13c17443ef369775a1" +[[package]] +name = "web-time" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aa30049b1c872b72c89866d458eae9f20380ab280ffd1b1e18df2d3e2d98cfe0" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + [[package]] name = "winapi" version = "0.3.9" diff --git a/Cargo.toml b/Cargo.toml index 29932ea..99f8b7a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,7 +8,7 @@ members = [ [workspace.package] edition = "2021" -rust-version = "1.74.0" +rust-version = "1.76.0" # Use this section only to change the source of dependencies that might diff --git a/deny.toml b/deny.toml index 4884e25..25d42ae 100644 --- a/deny.toml +++ b/deny.toml @@ -1,3 +1,4 @@ +[graph] targets = [ { triple = "aarch64-apple-darwin" }, { triple = "aarch64-unknown-linux-gnu" }, @@ -6,7 +7,7 @@ targets = [ ] [advisories] -vulnerability = "deny" +version = 2 [bans] multiple-versions = "deny" @@ -20,9 +21,9 @@ skip = [ { name = "hashbrown", version = "0.12.3" }, { name = "hashbrown", version = "0.14.0" }, { name = "nix", version = "0.26.4" }, - { name = "nix", version = "0.27.1" }, { name = "ordered-float", version = "2.10.0" }, - { name = "ordered-float", version = "3.4.0" }, + { name = "fastrand", version = "2.0.1" }, + { name = "regex-syntax", version = "0.6.29" }, ] # Use `tracing` instead. @@ -35,6 +36,7 @@ name = "env_logger" name = "rustls" [licenses] +version = 2 allow = [ "Apache-2.0", "BSD-2-Clause", @@ -42,7 +44,6 @@ allow = [ "MIT", "Unicode-DFS-2016", ] -copyleft = "deny" [[licenses.clarify]] name = "ring" diff --git a/eip_operator/Cargo.toml b/eip_operator/Cargo.toml index d30c4ad..0cfdb6e 100644 --- a/eip_operator/Cargo.toml +++ b/eip_operator/Cargo.toml @@ -7,20 +7,10 @@ license = "Apache-2.0" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -aws-config = { version = "0.55.1", default-features = false, features = [ - "native-tls", -] } -aws-sdk-ec2 = { version = "0.28", default-features = false, features = [ - "native-tls", - "rt-tokio", -] } -aws-sdk-servicequotas = { version = "0.28", default-features = false, features = [ - "native-tls", - "rt-tokio", -] } -aws-smithy-http = { version = "0.55", default-features = false, features = [ - "rt-tokio", -] } +aws-config = { version = "0.101", default-features = false} +aws-sdk-ec2 = { version = "0.38", default-features = false, features = [ "rt-tokio" ] } +aws-sdk-servicequotas = { version = "0.38", default-features = false, features = [ "rt-tokio" ] } +aws-smithy-http = { version = "0.59", default-features = false, features = [ "rt-tokio" ] } futures = { workspace = true } diff --git a/eip_operator/src/aws.rs b/eip_operator/src/aws.rs index 310d9a0..c24b31a 100644 --- a/eip_operator/src/aws.rs +++ b/eip_operator/src/aws.rs @@ -9,7 +9,7 @@ use aws_sdk_ec2::operation::disassociate_address::DisassociateAddressError; use aws_sdk_ec2::operation::release_address::{ReleaseAddressError, ReleaseAddressOutput}; use aws_sdk_ec2::types::{Address, DomainType, Filter, ResourceType, Tag, TagSpecification}; use aws_sdk_ec2::Client as Ec2Client; -use tracing::{debug, info, instrument}; +use tracing::{debug, error, info, instrument}; pub(crate) const LEGACY_CLUSTER_NAME_TAG: &str = "eip.aws.materialize.com/cluster_name"; @@ -150,7 +150,7 @@ pub(crate) async fn describe_addresses_with_tag_value( pub(crate) async fn disassociate_eip( ec2_client: &Ec2Client, association_id: &str, -) -> Result<(), SdkError> { +) -> Result<(), DisassociateAddressError> { match ec2_client .disassociate_address() .association_id(association_id) @@ -158,11 +158,16 @@ pub(crate) async fn disassociate_eip( .await { Ok(_) => Ok(()), - Err(e) if e.to_string().contains("InvalidAssociationID.NotFound") => { - info!(already_disassociated = true); - Ok(()) + Err(e) => { + let e = e.into_service_error(); + if e.meta().code() == Some("InvalidAssociationID.NotFound") { + info!("Association id {} already disassociated", association_id); + Ok(()) + } else { + error!("Error disassociating {} - {:?}", association_id, e); + Err(e) + } } - Err(e) => Err(e), } } diff --git a/eip_operator/src/controller/eip.rs b/eip_operator/src/controller/eip.rs index 0328f94..701e9c3 100644 --- a/eip_operator/src/controller/eip.rs +++ b/eip_operator/src/controller/eip.rs @@ -1,9 +1,10 @@ use std::collections::HashMap; +use std::time::{SystemTime, UNIX_EPOCH}; -use kube::api::Api; +use kube::api::{Api, PatchParams}; use kube::{Client, ResourceExt}; use kube_runtime::controller::Action; -use tracing::instrument; +use tracing::{info, instrument}; use eip_operator_shared::Error; @@ -42,7 +43,7 @@ impl k8s_controller::Context for Context { client: Client, eip: &Self::Resource, ) -> Result, Self::Error> { - let eip_api = Api::namespaced(client, &eip.namespace().unwrap()); + let eip_api = Api::namespaced(client.clone(), &eip.namespace().unwrap()); let uid = eip.metadata.uid.as_ref().ok_or(Error::MissingEipUid)?; let name = eip.metadata.name.as_ref().ok_or(Error::MissingEipName)?; @@ -87,6 +88,42 @@ impl k8s_controller::Context for Context { } }; crate::eip::set_status_created(&eip_api, name, &allocation_id, &public_ip).await?; + + if eip.status.as_ref().is_some_and(|s| s.resource_id.is_some()) { + // do nothing + } else { + let resource_api = eip.get_resource_api(&client); + let matched_resources = resource_api + .list(&eip.get_resource_list_params()) + .await? + .items; + info!( + "Eip apply for {} Found matched {} resources", + name, + matched_resources.len() + ); + for resource in matched_resources { + info!( + "Updating eip refresh label for {}", + resource.name_unchecked() + ); + let data = resource.clone().data(serde_json::json!({ + "metadata": { + "labels":{ + "eip.materialize.cloud/refresh": SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs().to_string() + } + } + })); + resource_api + .patch_metadata( + &resource.name_unchecked(), + &PatchParams::default(), + &kube::core::params::Patch::Merge(serde_json::json!(data)), + ) + .await?; + } + } + Ok(None) } diff --git a/eip_operator/src/controller/node.rs b/eip_operator/src/controller/node.rs index b409c78..ba2589b 100644 --- a/eip_operator/src/controller/node.rs +++ b/eip_operator/src/controller/node.rs @@ -1,8 +1,10 @@ +use std::time::Duration; + use k8s_openapi::api::core::v1::Node; use kube::api::{Api, ListParams}; use kube::Client; use kube_runtime::controller::Action; -use tracing::{event, instrument, Level}; +use tracing::{event, info, instrument, Level}; use eip_operator_shared::Error; @@ -51,11 +53,24 @@ impl k8s_controller::Context for Context { .rsplit_once('/') .ok_or(Error::MalformedProviderId)? .1; - let all_eips = eip_api.list(&ListParams::default()).await?.items; + let all_eips: Vec = eip_api + .list(&ListParams::default()) + .await? + .items + .into_iter() + .filter(|eip| eip.matches_node(node_labels)) + .collect(); + if all_eips.is_empty() { + return Err(Error::NoEipResourceWithThatNodeSelector); + } let eip = all_eips .into_iter() - .find(|eip| eip.matches_node(node_labels)) - .ok_or(Error::NoEipResourceWithThatNodeSelector)?; + .find(|eip| eip.status.as_ref().is_some_and(|s| s.resource_id.is_none())); + if eip.is_none() { + info!("No un-associated eips found for node {}", name); + return Ok(None); + } + let eip = eip.unwrap(); let eip_name = eip.name().ok_or(Error::MissingEipName)?; let allocation_id = eip.allocation_id().ok_or(Error::MissingAllocationId)?; let eip_description = crate::aws::describe_address(&self.ec2_client, allocation_id) @@ -71,10 +86,26 @@ impl k8s_controller::Context for Context { if eip_description.network_interface_id != Some(eni_id.to_owned()) || eip_description.private_ip_address != Some(node_ip.to_owned()) { - crate::aws::associate_eip(&self.ec2_client, allocation_id, &eni_id, node_ip).await?; + match crate::eip::set_status_attached(&eip_api, &eip, &eni_id, node_ip, name).await { + Ok(_) => { + info!("Found matching Eip, claiming it"); + crate::aws::associate_eip(&self.ec2_client, allocation_id, &eni_id, node_ip) + .await?; + } + Err(err) + if err + .to_string() + .contains("Operation cannot be fulfilled on eips.materialize.cloud") => + { + info!( + "Pod {} failed to claim eip {}, rescheduling to try another", + name, eip_name + ); + return Ok(Some(Action::requeue(Duration::from_secs(1)))); + } + Err(e) => return Err(e), + }; } - crate::eip::set_status_attached(&eip_api, eip_name, &eni_id, node_ip).await?; - Ok(None) } @@ -94,7 +125,13 @@ impl k8s_controller::Context for Context { let eip = all_eips .into_iter() .filter(|eip| eip.attached()) - .find(|eip| eip.matches_node(node_labels)); + .find(|eip| { + eip.matches_node(node_labels) + && eip + .status + .as_ref() + .is_some_and(|s| s.resource_id == Some(node.metadata.name.clone().unwrap())) + }); if let Some(eip) = eip { let allocation_id = eip.allocation_id().ok_or(Error::MissingAllocationId)?; let addresses = crate::aws::describe_address(&self.ec2_client, allocation_id) diff --git a/eip_operator/src/controller/pod.rs b/eip_operator/src/controller/pod.rs index d47e2e7..caef868 100644 --- a/eip_operator/src/controller/pod.rs +++ b/eip_operator/src/controller/pod.rs @@ -71,7 +71,6 @@ impl k8s_controller::Context for Context { .into_iter() .find(|eip| eip.matches_pod(name)) .ok_or_else(|| Error::NoEipResourceWithThatPodName(name.to_owned()))?; - let eip_name = eip.name().ok_or(Error::MissingEipName)?; let allocation_id = eip.allocation_id().ok_or(Error::MissingAllocationId)?; let eip_description = crate::aws::describe_address(&self.ec2_client, allocation_id) .await? @@ -82,10 +81,10 @@ impl k8s_controller::Context for Context { if eip_description.network_interface_id != Some(eni_id.to_owned()) || eip_description.private_ip_address != Some(pod_ip.to_owned()) { + crate::eip::set_status_attached(&eip_api, &eip, &eni_id, pod_ip, name).await?; crate::aws::associate_eip(&self.ec2_client, allocation_id, &eni_id, pod_ip).await?; + add_dns_target_annotation(&pod_api, name, &public_ip, allocation_id).await?; } - crate::eip::set_status_attached(&eip_api, eip_name, &eni_id, pod_ip).await?; - add_dns_target_annotation(&pod_api, name, &public_ip, allocation_id).await?; Ok(None) } diff --git a/eip_operator/src/eip.rs b/eip_operator/src/eip.rs index d7594c6..ce5392f 100644 --- a/eip_operator/src/eip.rs +++ b/eip_operator/src/eip.rs @@ -1,7 +1,7 @@ use k8s_openapi::apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomResourceDefinition; use kube::api::{Api, DeleteParams, ListParams, Patch, PatchParams, PostParams}; use kube::core::crd::merge_crds; -use kube::{Client, CustomResourceExt}; +use kube::{Client, CustomResourceExt, ResourceExt}; use kube_runtime::wait::{await_condition, conditions}; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; @@ -63,7 +63,10 @@ pub mod v1 { } pub mod v2 { - use kube::api::Api; + use k8s_openapi::api::core::v1::{Node, Pod}; + use kube::api::{Api, ListParams}; + use kube::core::{DynamicObject, GroupVersionKind}; + use kube::discovery::ApiResource; use kube::{Client, CustomResource, Resource}; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; @@ -121,7 +124,8 @@ pub mod v2 { printcolumn = r#"{"name": "PublicIP", "type": "string", "description": "Public IP address of the EIP.", "jsonPath": ".status.publicIpAddress"}"#, printcolumn = r#"{"name": "Selector", "type": "string", "description": "Selector for the pod or node to associate the EIP with.", "jsonPath": ".spec.selector", "priority": 1}"#, printcolumn = r#"{"name": "ENI", "type": "string", "description": "ID of the Elastic Network Interface of the pod.", "jsonPath": ".status.eni", "priority": 1}"#, - printcolumn = r#"{"name": "PrivateIP", "type": "string", "description": "Private IP address of the pod.", "jsonPath": ".status.privateIpAddress", "priority": 1}"# + printcolumn = r#"{"name": "PrivateIP", "type": "string", "description": "Private IP address of the pod.", "jsonPath": ".status.privateIpAddress", "priority": 1}"#, + printcolumn = r#"{"name": "ResourceId", "type": "string", "description": "ID of resource the EIP is attached to..", "jsonPath": ".status.resourceId", "priority": 1}"# )] pub struct EipSpec { pub selector: EipSelector, @@ -173,6 +177,51 @@ pub mod v2 { _ => false, } } + pub fn get_resource_api(&self, client: &Client) -> Api { + match self.spec.selector { + EipSelector::Pod { pod_name: _ } => { + let gvk = GroupVersionKind::gvk( + Pod::group(&()).as_ref(), + Pod::version(&()).as_ref(), + Pod::kind(&()).as_ref(), + ); + let api_resource = ApiResource::from_gvk(&gvk); + // Api::namespaced(client, &self.namespace().unwrap()) + Api::default_namespaced_with(client.clone(), &api_resource) + } + EipSelector::Node { selector: _ } => { + let gvk = GroupVersionKind::gvk( + Node::group(&()).as_ref(), + Node::version(&()).as_ref(), + Node::kind(&()).as_ref(), + ); + let api_resource = ApiResource::from_gvk(&gvk); + // Api::namespaced(client, &self.namespace().unwrap()) + Api::all_with(client.clone(), &api_resource) + // Api::all(client), + } + } + } + + pub fn get_resource_list_params(&self) -> ListParams { + match self.spec.selector { + EipSelector::Pod { ref pod_name } => ListParams { + field_selector: Some(format!("metadata.name={}", pod_name)), + ..Default::default() + }, + EipSelector::Node { ref selector } => { + let label_selector = selector + .iter() + .map(|(k, v)| format!("{}={}", k, v)) + .collect::>() + .join(","); + ListParams { + label_selector: Some(label_selector), + ..Default::default() + } + } + } + } pub fn allocation_id(&self) -> Option<&str> { self.status @@ -205,13 +254,14 @@ pub mod v2 { } /// The status fields for the Eip Kubernetes custom resource. -#[derive(Clone, Debug, Deserialize, Serialize, JsonSchema)] +#[derive(Clone, Default, Debug, Deserialize, Serialize, JsonSchema)] #[serde(rename_all = "camelCase")] pub struct EipStatus { pub allocation_id: Option, pub public_ip_address: Option, pub eni: Option, pub private_ip_address: Option, + pub resource_id: Option, } /// Registers the Eip custom resource with Kubernetes, @@ -335,26 +385,27 @@ pub(crate) async fn set_status_created( #[instrument(skip(api), err)] pub(crate) async fn set_status_attached( api: &Api, - name: &str, + eip: &Eip, eni: &str, private_ip_address: &str, -) -> Result { + resource_id: &str, +) -> Result { event!(Level::INFO, "Updating status for attached EIP."); - let patch = serde_json::json!({ - "apiVersion": Eip::version(), - "kind": "Eip", - "status": { - "eni": eni, - "privateIpAddress": private_ip_address, - } - }); - let patch = Patch::Merge(&patch); - let params = PatchParams::default(); - let result = api.patch_status(name, ¶ms, &patch).await; - if result.is_ok() { - event!(Level::INFO, "Done updating status for attached EIP."); - } - result + let mut eip = eip.clone(); + let status = eip.status.as_mut().ok_or(Error::MissingEipStatus)?; + status.eni = Some(eni.to_owned()); + status.private_ip_address = Some(private_ip_address.to_owned()); + status.resource_id = Some(resource_id.to_owned()); + let result = api + .replace_status( + &eip.name_unchecked(), + &PostParams::default(), + serde_json::to_vec(&eip.clone())?, + ) + .await?; + // let result = api.patch_status(name, ¶ms, &patch).await; + event!(Level::INFO, "Done updating status for attached EIP."); + Ok(result) } /// Unsets the eni and privateIpAddress fields in the Eip status. @@ -367,6 +418,7 @@ pub(crate) async fn set_status_detached(api: &Api, name: &str) -> Result, "privateIpAddress": None::, + "resourceId": None::, } }); let patch = Patch::Merge(&patch); diff --git a/eip_operator/src/main.rs b/eip_operator/src/main.rs index 50a23ee..22df5ea 100644 --- a/eip_operator/src/main.rs +++ b/eip_operator/src/main.rs @@ -53,7 +53,8 @@ async fn run() -> Result<(), Error> { let k8s_client = Client::try_default().await?; debug!("Getting ec2_client..."); - let mut config_loader = aws_config::from_env(); + let mut config_loader = eip_operator_shared::aws_config_loader_default(); + if let Ok(endpoint) = std::env::var("AWS_ENDPOINT_URL") { config_loader = config_loader.endpoint_url(endpoint); } @@ -253,7 +254,7 @@ async fn report_eip_quota_status( quota_client: &ServiceQuotaClient, ) -> Result<(), Error> { let addresses_result = ec2_client.describe_addresses().send().await?; - let allocated = addresses_result.addresses().unwrap_or_default().len(); + let allocated = addresses_result.addresses().len(); let quota_result = quota_client .get_service_quota() .service_code("ec2") diff --git a/eip_operator_shared/Cargo.toml b/eip_operator_shared/Cargo.toml index 4c8c758..ace288e 100644 --- a/eip_operator_shared/Cargo.toml +++ b/eip_operator_shared/Cargo.toml @@ -7,25 +7,23 @@ license = "Apache-2.0" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -aws-sdk-ec2 = { version = "0.28", default-features = false, features = [ - "native-tls", - "rt-tokio", -] } -aws-sdk-servicequotas = { version = "0.28", default-features = false, features = [ - "native-tls", - "rt-tokio", -] } -aws-smithy-http = { version = "0.55", default-features = false, features = [ - "rt-tokio", -] } +aws-config = { version = "0.101", default-features = false} +aws-sdk-ec2 = { version = "0.38", default-features = false, features = [ "rt-tokio" ] } +aws-sdk-servicequotas = { version = "0.38", default-features = false, features = [ "rt-tokio" ] } +aws-smithy-http = { version = "0.59", default-features = false, features = [ "rt-tokio" ] } +aws-smithy-runtime-api = "0.101" +aws-smithy-runtime = { version = "0.101", features = ["connector-hyper-0-14-x"] } +hyper-tls = { version = "0.5.0" } + + futures = "0.3" hyper = { version = "0.14.27", features = ["http2"] } -hyper-tls = { version = "0.5.0" } kube = { workspace = true } kube-runtime = { workspace = true } native-tls = { version = "0.2.11", features = ["alpn"] } -opentelemetry = { version = "0.20", features = ["rt-tokio", "trace"] } -opentelemetry-otlp = { version = "0.13" } +opentelemetry = { version = "0.21", features = ["trace"] } +opentelemetry_sdk = { version = "0.21", features = ["trace", "rt-tokio"] } +opentelemetry-otlp = { version = "0.14" } serde = "1" serde_json = "1" thiserror = "1" @@ -33,7 +31,7 @@ tokio-native-tls = { version = "0.3.1" } tokio = { workspace = true } tonic = { version = "0.9.2", features = ["transport"] } tracing = "0.1" -tracing-opentelemetry = "0.20" +tracing-opentelemetry = "0.22" tracing-subscriber = { version = "0.3", features = [ "registry", "env-filter", diff --git a/eip_operator_shared/src/lib.rs b/eip_operator_shared/src/lib.rs index 8260497..44e7366 100644 --- a/eip_operator_shared/src/lib.rs +++ b/eip_operator_shared/src/lib.rs @@ -4,6 +4,9 @@ use std::net::AddrParseError; use std::str::FromStr; use std::time::Duration; +use aws_config::{BehaviorVersion, ConfigLoader}; +use aws_smithy_runtime::client::http::hyper_014::HyperClientBuilder; + use aws_sdk_ec2::error::SdkError; use aws_sdk_ec2::operation::allocate_address::AllocateAddressError; use aws_sdk_ec2::operation::associate_address::AssociateAddressError; @@ -16,9 +19,9 @@ use aws_sdk_servicequotas::operation::get_service_quota::GetServiceQuotaError; use futures::Future; use hyper::client::HttpConnector; use hyper_tls::HttpsConnector; -use opentelemetry::sdk::trace::{Config, Sampler}; -use opentelemetry::sdk::Resource as OtelResource; use opentelemetry::KeyValue; +use opentelemetry_sdk::trace::{Config, Sampler}; +use opentelemetry_sdk::Resource as OtelResource; use tokio::time::error::Elapsed; use tonic::metadata::{MetadataKey, MetadataMap}; use tonic::transport::Endpoint; @@ -112,7 +115,7 @@ pub enum Error { #[error("AWS disassociate_address reported error: {source}")] AwsDisassociateAddress { #[from] - source: SdkError, + source: DisassociateAddressError, }, #[error("AWS release_address reported error: {source}")] AwsReleaseAddress { @@ -254,7 +257,7 @@ where )) .with_resource(otr), ) - .install_batch(opentelemetry::runtime::Tokio) + .install_batch(opentelemetry_sdk::runtime::Tokio) .unwrap(); let otel_layer = tracing_opentelemetry::layer() .with_tracer(tracer) @@ -276,3 +279,8 @@ where }; f().await } + +pub fn aws_config_loader_default() -> ConfigLoader { + aws_config::defaults(BehaviorVersion::latest()) + .http_client(HyperClientBuilder::new().build(HttpsConnector::new())) +} From 271f1c1296a5c6c21e606fdfda96aae528220feb Mon Sep 17 00:00:00 2001 From: Justin Bradfield Date: Sat, 23 Mar 2024 00:45:52 -0500 Subject: [PATCH 2/4] address pr comments 1 --- cilium_eip_no_masquerade_agent/src/main.rs | 8 +-- eip_operator/src/controller/eip.rs | 25 +++++---- eip_operator/src/controller/node.rs | 60 ++++++++++++---------- eip_operator/src/controller/pod.rs | 47 ++++++++++------- eip_operator/src/eip.rs | 49 +++++++++++++----- eip_operator/src/kube_ext.rs | 7 --- eip_operator/src/main.rs | 6 +-- eip_operator_shared/src/lib.rs | 8 +-- 8 files changed, 117 insertions(+), 93 deletions(-) diff --git a/cilium_eip_no_masquerade_agent/src/main.rs b/cilium_eip_no_masquerade_agent/src/main.rs index eab413b..aee258e 100644 --- a/cilium_eip_no_masquerade_agent/src/main.rs +++ b/cilium_eip_no_masquerade_agent/src/main.rs @@ -71,11 +71,7 @@ impl RuleManager { } async fn cleanup_legacy_per_pod_rules(&self, pod: &Pod) -> Result<(), Error> { - let pod_name = pod - .metadata - .name - .as_ref() - .ok_or(eip_operator_shared::Error::MissingPodName)?; + let pod_name = pod.name_unchecked(); // Assuming that if it doesn't have an IP during cleanup, that it never had one. if let Some(pod_ip_str) = &pod @@ -98,7 +94,7 @@ impl RuleManager { self.ip_rule_handle.del(rule).execute().await?; } } - self.remove_finalizer(pod, pod_name).await?; + self.remove_finalizer(pod, &pod_name).await?; Ok(()) } diff --git a/eip_operator/src/controller/eip.rs b/eip_operator/src/controller/eip.rs index 701e9c3..3d745c7 100644 --- a/eip_operator/src/controller/eip.rs +++ b/eip_operator/src/controller/eip.rs @@ -1,5 +1,5 @@ +use rand::Rng; use std::collections::HashMap; -use std::time::{SystemTime, UNIX_EPOCH}; use kube::api::{Api, PatchParams}; use kube::{Client, ResourceExt}; @@ -45,13 +45,13 @@ impl k8s_controller::Context for Context { ) -> Result, Self::Error> { let eip_api = Api::namespaced(client.clone(), &eip.namespace().unwrap()); - let uid = eip.metadata.uid.as_ref().ok_or(Error::MissingEipUid)?; - let name = eip.metadata.name.as_ref().ok_or(Error::MissingEipName)?; + let uid = eip.uid().ok_or(Error::MissingEipUid)?; + let name = eip.name_unchecked(); let selector = &eip.spec.selector; let addresses = crate::aws::describe_addresses_with_tag_value( &self.ec2_client, crate::aws::EIP_UID_TAG, - uid, + &uid, ) .await? .addresses @@ -60,8 +60,8 @@ impl k8s_controller::Context for Context { 0 => { let response = crate::aws::allocate_address( &self.ec2_client, - uid, - name, + &uid, + &name, selector, &self.cluster_name, &eip.namespace().unwrap(), @@ -87,11 +87,9 @@ impl k8s_controller::Context for Context { return Err(Error::MultipleEipsTaggedForPod); } }; - crate::eip::set_status_created(&eip_api, name, &allocation_id, &public_ip).await?; + crate::eip::set_status_created(&eip_api, &name, &allocation_id, &public_ip).await?; - if eip.status.as_ref().is_some_and(|s| s.resource_id.is_some()) { - // do nothing - } else { + if !eip.status.as_ref().is_some_and(|s| s.resource_id.is_some()) { let resource_api = eip.get_resource_api(&client); let matched_resources = resource_api .list(&eip.get_resource_list_params()) @@ -110,7 +108,8 @@ impl k8s_controller::Context for Context { let data = resource.clone().data(serde_json::json!({ "metadata": { "labels":{ - "eip.materialize.cloud/refresh": SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs().to_string() + "eip.materialize.cloud/refresh": rand::thread_rng().gen::(), + } } })); @@ -133,11 +132,11 @@ impl k8s_controller::Context for Context { _client: Client, eip: &Self::Resource, ) -> Result, Self::Error> { - let uid = eip.metadata.uid.as_ref().ok_or(Error::MissingEipUid)?; + let uid = eip.uid().ok_or(Error::MissingEipUid)?; let addresses = crate::aws::describe_addresses_with_tag_value( &self.ec2_client, crate::aws::EIP_UID_TAG, - uid, + &uid, ) .await? .addresses; diff --git a/eip_operator/src/controller/node.rs b/eip_operator/src/controller/node.rs index ba2589b..09fed3a 100644 --- a/eip_operator/src/controller/node.rs +++ b/eip_operator/src/controller/node.rs @@ -2,7 +2,7 @@ use std::time::Duration; use k8s_openapi::api::core::v1::Node; use kube::api::{Api, ListParams}; -use kube::Client; +use kube::{Client, ResourceExt}; use kube_runtime::controller::Action; use tracing::{event, info, instrument, Level}; @@ -38,7 +38,7 @@ impl k8s_controller::Context for Context { client: Client, node: &Self::Resource, ) -> Result, Self::Error> { - let name = node.metadata.name.as_ref().ok_or(Error::MissingNodeName)?; + let name = node.name_unchecked(); event!(Level::INFO, name = %name, "Applying node."); let eip_api = Api::::namespaced( @@ -47,31 +47,33 @@ impl k8s_controller::Context for Context { ); let node_ip = node.ip().ok_or(Error::MissingNodeIp)?; - let node_labels = node.labels().ok_or(Error::MissingNodeLabels)?; + let node_labels = node.labels(); let provider_id = node.provider_id().ok_or(Error::MissingProviderId)?; let instance_id = provider_id .rsplit_once('/') .ok_or(Error::MalformedProviderId)? .1; - let all_eips: Vec = eip_api + let matched_eips: Vec = eip_api .list(&ListParams::default()) .await? .items .into_iter() .filter(|eip| eip.matches_node(node_labels)) .collect(); - if all_eips.is_empty() { + if matched_eips.is_empty() { return Err(Error::NoEipResourceWithThatNodeSelector); } - let eip = all_eips - .into_iter() - .find(|eip| eip.status.as_ref().is_some_and(|s| s.resource_id.is_none())); - if eip.is_none() { - info!("No un-associated eips found for node {}", name); + let eip = matched_eips.into_iter().find(|eip| { + eip.status.as_ref().is_some_and(|s| { + s.resource_id.is_none() + || s.resource_id.as_ref().map(|r| r == &name).unwrap_or(false) + }) + }); + let Some(eip) = eip else { + info!("No un-associated eips found for node {}", &name); return Ok(None); - } - let eip = eip.unwrap(); - let eip_name = eip.name().ok_or(Error::MissingEipName)?; + }; + let eip_name = eip.name_unchecked(); let allocation_id = eip.allocation_id().ok_or(Error::MissingAllocationId)?; let eip_description = crate::aws::describe_address(&self.ec2_client, allocation_id) .await? @@ -86,10 +88,19 @@ impl k8s_controller::Context for Context { if eip_description.network_interface_id != Some(eni_id.to_owned()) || eip_description.private_ip_address != Some(node_ip.to_owned()) { - match crate::eip::set_status_attached(&eip_api, &eip, &eni_id, node_ip, name).await { + match crate::eip::set_status_attached(&eip_api, &eip, &eni_id, node_ip, &name).await { Ok(_) => { - info!("Found matching Eip, claiming it"); - crate::aws::associate_eip(&self.ec2_client, allocation_id, &eni_id, node_ip) + info!("Found matching Eip, attaching it"); + let association_id = crate::aws::associate_eip( + &self.ec2_client, + allocation_id, + &eni_id, + node_ip, + ) + .await? + .association_id + .ok_or(Error::MissingAssociationId)?; + crate::eip::set_status_association_id(&eip_api, &eip_name, &association_id) .await?; } Err(err) @@ -98,7 +109,7 @@ impl k8s_controller::Context for Context { .contains("Operation cannot be fulfilled on eips.materialize.cloud") => { info!( - "Pod {} failed to claim eip {}, rescheduling to try another", + "Node {} failed to claim eip {}, rescheduling to try another", name, eip_name ); return Ok(Some(Action::requeue(Duration::from_secs(1)))); @@ -119,10 +130,9 @@ impl k8s_controller::Context for Context { client.clone(), self.namespace.as_deref().unwrap_or("default"), ); - - let node_labels = node.labels().ok_or(Error::MissingNodeLabels)?; - let all_eips = eip_api.list(&ListParams::default()).await?.items; - let eip = all_eips + let node_labels = node.labels(); + let matched_eips = eip_api.list(&ListParams::default()).await?.items; + let eip = matched_eips .into_iter() .filter(|eip| eip.attached()) .find(|eip| { @@ -130,7 +140,7 @@ impl k8s_controller::Context for Context { && eip .status .as_ref() - .is_some_and(|s| s.resource_id == Some(node.metadata.name.clone().unwrap())) + .is_some_and(|s| s.resource_id == Some(node.name_unchecked().clone())) }); if let Some(eip) = eip { let allocation_id = eip.allocation_id().ok_or(Error::MissingAllocationId)?; @@ -143,11 +153,7 @@ impl k8s_controller::Context for Context { crate::aws::disassociate_eip(&self.ec2_client, &association_id).await?; } } - crate::eip::set_status_detached( - &eip_api, - eip.metadata.name.as_ref().ok_or(Error::MissingEipName)?, - ) - .await?; + crate::eip::set_status_detached(&eip_api, &eip.name_unchecked()).await?; } Ok(None) } diff --git a/eip_operator/src/controller/pod.rs b/eip_operator/src/controller/pod.rs index caef868..6beae04 100644 --- a/eip_operator/src/controller/pod.rs +++ b/eip_operator/src/controller/pod.rs @@ -33,7 +33,7 @@ impl k8s_controller::Context for Context { client: Client, pod: &Self::Resource, ) -> Result, Self::Error> { - let name = pod.metadata.name.as_ref().ok_or(Error::MissingPodName)?; + let name = pod.name_unchecked(); let eip_api = Api::::namespaced(client.clone(), &pod.namespace().unwrap()); let pod_api = Api::::namespaced(client.clone(), &pod.namespace().unwrap()); @@ -41,7 +41,7 @@ impl k8s_controller::Context for Context { if should_autocreate_eip(pod) { event!(Level::INFO, should_autocreate_eip = true); - crate::eip::create_for_pod(&eip_api, name).await?; + crate::eip::create_for_pod(&eip_api, &name).await?; } let pod_ip = pod.ip().ok_or(Error::MissingPodIp)?; @@ -65,12 +65,18 @@ impl k8s_controller::Context for Context { .ok_or(Error::NoInterfaceWithThatIp)? } }; - - let all_eips = eip_api.list(&ListParams::default()).await?.items; - let eip = all_eips + let eips = eip_api + .list(&ListParams::default()) + .await? + .items .into_iter() - .find(|eip| eip.matches_pod(name)) - .ok_or_else(|| Error::NoEipResourceWithThatPodName(name.to_owned()))?; + .filter(|eip| eip.matches_pod(&name)) + .collect::>(); + let eip = match eips.len() { + 0 => return Err(Error::NoEipResourceWithThatPodName(name.clone())), + 1 => eips[0].clone(), + _ => return Err(Error::MultipleEipsTaggedForPod), + }; let allocation_id = eip.allocation_id().ok_or(Error::MissingAllocationId)?; let eip_description = crate::aws::describe_address(&self.ec2_client, allocation_id) .await? @@ -78,13 +84,20 @@ impl k8s_controller::Context for Context { .ok_or(Error::MissingAddresses)? .swap_remove(0); let public_ip = eip_description.public_ip.ok_or(Error::MissingPublicIp)?; + // having multiple EIPs + crate::eip::set_status_attached(&eip_api, &eip, &eni_id, pod_ip, &name).await?; if eip_description.network_interface_id != Some(eni_id.to_owned()) || eip_description.private_ip_address != Some(pod_ip.to_owned()) { - crate::eip::set_status_attached(&eip_api, &eip, &eni_id, pod_ip, name).await?; - crate::aws::associate_eip(&self.ec2_client, allocation_id, &eni_id, pod_ip).await?; - add_dns_target_annotation(&pod_api, name, &public_ip, allocation_id).await?; + let association_id = + crate::aws::associate_eip(&self.ec2_client, allocation_id, &eni_id, pod_ip) + .await? + .association_id + .ok_or(Error::MissingAssociationId)?; + crate::eip::set_status_association_id(&eip_api, &eip.name_unchecked(), &association_id) + .await?; } + add_dns_target_annotation(&pod_api, &name, &public_ip, allocation_id).await?; Ok(None) } @@ -94,12 +107,12 @@ impl k8s_controller::Context for Context { client: Client, pod: &Self::Resource, ) -> Result, Self::Error> { - let name = pod.metadata.name.as_ref().ok_or(Error::MissingPodUid)?; + let name = pod.name_unchecked(); let eip_api = Api::::namespaced(client.clone(), &pod.namespace().unwrap()); - let all_eips = eip_api.list(&ListParams::default()).await?.items; - let eip = all_eips.into_iter().find(|eip| eip.matches_pod(name)); + let matched_eips = eip_api.list(&ListParams::default()).await?.items; + let eip = matched_eips.into_iter().find(|eip| eip.matches_pod(&name)); if let Some(eip) = eip { let allocation_id = eip.allocation_id().ok_or(Error::MissingAllocationId)?; let addresses = crate::aws::describe_address(&self.ec2_client, allocation_id) @@ -111,15 +124,11 @@ impl k8s_controller::Context for Context { crate::aws::disassociate_eip(&self.ec2_client, &association_id).await?; } } - crate::eip::set_status_detached( - &eip_api, - eip.metadata.name.as_ref().ok_or(Error::MissingEipName)?, - ) - .await?; + crate::eip::set_status_detached(&eip_api, &eip.name_unchecked()).await?; }; if should_autocreate_eip(pod) { event!(Level::INFO, should_autocreate_eip = true); - crate::eip::delete(&eip_api, name).await?; + crate::eip::delete(&eip_api, &name).await?; } Ok(None) } diff --git a/eip_operator/src/eip.rs b/eip_operator/src/eip.rs index ce5392f..1fb30a7 100644 --- a/eip_operator/src/eip.rs +++ b/eip_operator/src/eip.rs @@ -67,7 +67,7 @@ pub mod v2 { use kube::api::{Api, ListParams}; use kube::core::{DynamicObject, GroupVersionKind}; use kube::discovery::ApiResource; - use kube::{Client, CustomResource, Resource}; + use kube::{Client, CustomResource, Resource, ResourceExt}; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use std::collections::BTreeMap; @@ -125,7 +125,8 @@ pub mod v2 { printcolumn = r#"{"name": "Selector", "type": "string", "description": "Selector for the pod or node to associate the EIP with.", "jsonPath": ".spec.selector", "priority": 1}"#, printcolumn = r#"{"name": "ENI", "type": "string", "description": "ID of the Elastic Network Interface of the pod.", "jsonPath": ".status.eni", "priority": 1}"#, printcolumn = r#"{"name": "PrivateIP", "type": "string", "description": "Private IP address of the pod.", "jsonPath": ".status.privateIpAddress", "priority": 1}"#, - printcolumn = r#"{"name": "ResourceId", "type": "string", "description": "ID of resource the EIP is attached to..", "jsonPath": ".status.resourceId", "priority": 1}"# + printcolumn = r#"{"name": "ResourceId", "type": "string", "description": "ID of resource the EIP is attached to.", "jsonPath": ".status.resourceId", "priority": 1}"#, + printcolumn = r#"{"name": "AssociationId", "type": "string", "description": "ID of the association for the attachment.", "jsonPath": ".status.resourceId", "priority": 1}"# )] pub struct EipSpec { pub selector: EipSelector, @@ -140,10 +141,6 @@ pub mod v2 { Api::::namespaced(k8s_client, namespace.unwrap_or("default")) } - pub fn name(&self) -> Option<&str> { - self.metadata.name.as_deref() - } - pub fn attached(&self) -> bool { self.status .as_ref() @@ -186,8 +183,12 @@ pub mod v2 { Pod::kind(&()).as_ref(), ); let api_resource = ApiResource::from_gvk(&gvk); - // Api::namespaced(client, &self.namespace().unwrap()) - Api::default_namespaced_with(client.clone(), &api_resource) + Api::namespaced_with( + client.clone(), + // eips are namespaced + &self.namespace().unwrap_or("default".to_owned()), + &api_resource, + ) } EipSelector::Node { selector: _ } => { let gvk = GroupVersionKind::gvk( @@ -196,9 +197,7 @@ pub mod v2 { Node::kind(&()).as_ref(), ); let api_resource = ApiResource::from_gvk(&gvk); - // Api::namespaced(client, &self.namespace().unwrap()) Api::all_with(client.clone(), &api_resource) - // Api::all(client), } } } @@ -235,9 +234,9 @@ pub mod v2 { fn try_from(eip_v1: &super::v1::LaxEip) -> Result { if let Some(pod_name) = &eip_v1.spec.pod_name { - let name = eip_v1.metadata.name.as_ref().ok_or(Error::MissingEipName)?; + let name = eip_v1.name_unchecked(); let mut eip = Self::new( - name, + &name, EipSpec { selector: EipSelector::Pod { pod_name: pod_name.to_string(), @@ -262,6 +261,7 @@ pub struct EipStatus { pub eni: Option, pub private_ip_address: Option, pub resource_id: Option, + pub association_id: Option, } /// Registers the Eip custom resource with Kubernetes, @@ -381,6 +381,30 @@ pub(crate) async fn set_status_created( result } +/// Sets the associationId field in the Eip status. +#[instrument(skip(api), err)] +pub(crate) async fn set_status_association_id( + api: &Api, + name: &str, + association_id: &str, +) -> Result { + event!(Level::INFO, "Updating status for created EIP."); + let patch = serde_json::json!({ + "apiVersion": Eip::version(), + "kind": "Eip", + "status": { + "associationId": association_id, + } + }); + let patch = Patch::Merge(&patch); + let params = PatchParams::default(); + let result = api.patch_status(name, ¶ms, &patch).await; + if result.is_ok() { + event!(Level::INFO, "Done updating status for created EIP."); + } + result +} + /// Sets the eni and privateIpAddress fields in the Eip status. #[instrument(skip(api), err)] pub(crate) async fn set_status_attached( @@ -419,6 +443,7 @@ pub(crate) async fn set_status_detached(api: &Api, name: &str) -> Result, "privateIpAddress": None::, "resourceId": None::, + "associationId": None::, } }); let patch = Patch::Merge(&patch); diff --git a/eip_operator/src/kube_ext.rs b/eip_operator/src/kube_ext.rs index 34491d0..bdc733b 100644 --- a/eip_operator/src/kube_ext.rs +++ b/eip_operator/src/kube_ext.rs @@ -1,10 +1,7 @@ -use std::collections::BTreeMap; - use k8s_openapi::api::core::v1::{Node, Pod}; pub(crate) trait NodeExt { fn ip(&self) -> Option<&str>; - fn labels(&self) -> Option<&BTreeMap>; fn provider_id(&self) -> Option<&str>; } @@ -21,10 +18,6 @@ impl NodeExt for Node { }) } - fn labels(&self) -> Option<&BTreeMap> { - self.metadata.labels.as_ref() - } - fn provider_id(&self) -> Option<&str> { self.spec .as_ref() diff --git a/eip_operator/src/main.rs b/eip_operator/src/main.rs index 22df5ea..9b42844 100644 --- a/eip_operator/src/main.rs +++ b/eip_operator/src/main.rs @@ -10,7 +10,7 @@ use json_patch::{PatchOperation, RemoveOperation, TestOperation}; use k8s_controller::Controller; use k8s_openapi::api::core::v1::Pod; use kube::api::{Api, ListParams, Patch, PatchParams}; -use kube::{Client, Resource, ResourceExt}; +use kube::{Client, ResourceExt}; use tokio::task; use tracing::{debug, event, info, instrument, Level}; @@ -226,11 +226,11 @@ async fn cleanup_orphan_eips( .iter() .position(|s| s == LEGACY_POD_FINALIZER_NAME) { - let pod_name = pod.meta().name.as_ref().ok_or(Error::MissingPodName)?; + let pod_name = pod.name_unchecked(); let finalizer_path = format!("/metadata/finalizers/{}", position); pod_api .patch::( - pod_name, + &pod_name, &PatchParams::default(), &Patch::Json(json_patch::Patch(vec![ PatchOperation::Test(TestOperation { diff --git a/eip_operator_shared/src/lib.rs b/eip_operator_shared/src/lib.rs index 44e7366..2bcadfb 100644 --- a/eip_operator_shared/src/lib.rs +++ b/eip_operator_shared/src/lib.rs @@ -58,12 +58,6 @@ pub enum Error { MissingEipStatus, #[error("EIP does not have a UID in its metadata.")] MissingEipUid, - #[error("EIP does not have a name in its metadata.")] - MissingEipName, - #[error("Pod does not have a UID in its metadata.")] - MissingPodUid, - #[error("Pod does not have a name in its metadata.")] - MissingPodName, #[error("Pod does not have an IP address.")] MissingPodIp, #[error("Node does not have an IP address.")] @@ -80,6 +74,8 @@ pub enum Error { MultipleEipsTaggedForPod, #[error("allocation_id was None.")] MissingAllocationId, + #[error("aassociation_id was None.")] + MissingAssociationId, #[error("public_ip was None.")] MissingPublicIp, #[error("DescribeInstancesResult.reservations was None.")] From b3bd1b0e3d2b034eb16c09e677379d6b018e6a94 Mon Sep 17 00:00:00 2001 From: Justin Bradfield Date: Mon, 25 Mar 2024 13:16:48 -0500 Subject: [PATCH 3/4] cover more edge cases and rename some fns --- eip_operator/src/controller/eip.rs | 16 +++++++++--- eip_operator/src/controller/node.rs | 39 ++++++++++++++--------------- eip_operator/src/controller/pod.rs | 16 ++++++++---- eip_operator/src/eip.rs | 16 ++++-------- eip_operator_shared/src/lib.rs | 2 +- 5 files changed, 48 insertions(+), 41 deletions(-) diff --git a/eip_operator/src/controller/eip.rs b/eip_operator/src/controller/eip.rs index 3d745c7..af7894c 100644 --- a/eip_operator/src/controller/eip.rs +++ b/eip_operator/src/controller/eip.rs @@ -4,7 +4,7 @@ use std::collections::HashMap; use kube::api::{Api, PatchParams}; use kube::{Client, ResourceExt}; use kube_runtime::controller::Action; -use tracing::{info, instrument}; +use tracing::{info, instrument, warn}; use eip_operator_shared::Error; @@ -108,18 +108,26 @@ impl k8s_controller::Context for Context { let data = resource.clone().data(serde_json::json!({ "metadata": { "labels":{ - "eip.materialize.cloud/refresh": rand::thread_rng().gen::(), + "eip.materialize.cloud/refresh": format!("{}",rand::thread_rng().gen::()), } } })); - resource_api + if let Err(err) = resource_api .patch_metadata( &resource.name_unchecked(), &PatchParams::default(), &kube::core::params::Patch::Merge(serde_json::json!(data)), ) - .await?; + .await + { + warn!( + "Failed to patch resource {} refresh label for {}: err {:?}", + resource.name_unchecked(), + name, + err + ); + }; } } diff --git a/eip_operator/src/controller/node.rs b/eip_operator/src/controller/node.rs index 09fed3a..fcf23d1 100644 --- a/eip_operator/src/controller/node.rs +++ b/eip_operator/src/controller/node.rs @@ -2,9 +2,10 @@ use std::time::Duration; use k8s_openapi::api::core::v1::Node; use kube::api::{Api, ListParams}; +use kube::error::ErrorResponse; use kube::{Client, ResourceExt}; use kube_runtime::controller::Action; -use tracing::{event, info, instrument, Level}; +use tracing::{event, info, instrument, warn, Level}; use eip_operator_shared::Error; @@ -88,7 +89,9 @@ impl k8s_controller::Context for Context { if eip_description.network_interface_id != Some(eni_id.to_owned()) || eip_description.private_ip_address != Some(node_ip.to_owned()) { - match crate::eip::set_status_attached(&eip_api, &eip, &eni_id, node_ip, &name).await { + match crate::eip::set_status_should_attach(&eip_api, &eip, &eni_id, node_ip, &name) + .await + { Ok(_) => { info!("Found matching Eip, attaching it"); let association_id = crate::aws::associate_eip( @@ -103,16 +106,14 @@ impl k8s_controller::Context for Context { crate::eip::set_status_association_id(&eip_api, &eip_name, &association_id) .await?; } - Err(err) - if err - .to_string() - .contains("Operation cannot be fulfilled on eips.materialize.cloud") => - { - info!( + Err(Error::Kube { + source: kube::Error::Api(ErrorResponse { reason, .. }), + }) if reason == "Conflict" => { + warn!( "Node {} failed to claim eip {}, rescheduling to try another", name, eip_name ); - return Ok(Some(Action::requeue(Duration::from_secs(1)))); + return Ok(Some(Action::requeue(Duration::from_secs(3)))); } Err(e) => return Err(e), }; @@ -132,17 +133,15 @@ impl k8s_controller::Context for Context { ); let node_labels = node.labels(); let matched_eips = eip_api.list(&ListParams::default()).await?.items; - let eip = matched_eips - .into_iter() - .filter(|eip| eip.attached()) - .find(|eip| { - eip.matches_node(node_labels) - && eip - .status - .as_ref() - .is_some_and(|s| s.resource_id == Some(node.name_unchecked().clone())) - }); - if let Some(eip) = eip { + // find all eips that match (there should be one, but lets not lean on that) + let eips = matched_eips.into_iter().filter(|eip| { + eip.matches_node(node_labels) + && eip + .status + .as_ref() + .is_some_and(|s| s.resource_id == Some(node.name_unchecked().clone())) + }); + for eip in eips { let allocation_id = eip.allocation_id().ok_or(Error::MissingAllocationId)?; let addresses = crate::aws::describe_address(&self.ec2_client, allocation_id) .await? diff --git a/eip_operator/src/controller/pod.rs b/eip_operator/src/controller/pod.rs index 6beae04..c240893 100644 --- a/eip_operator/src/controller/pod.rs +++ b/eip_operator/src/controller/pod.rs @@ -1,9 +1,11 @@ +use std::time::Duration; + use k8s_openapi::api::core::v1::{Node, Pod}; use kube::api::{Api, ListParams, Patch, PatchParams}; use kube::{Client, ResourceExt}; use kube_runtime::controller::Action; use serde::Deserialize; -use tracing::{event, instrument, Level}; +use tracing::{event, instrument, warn, Level}; use eip_operator_shared::Error; @@ -44,9 +46,14 @@ impl k8s_controller::Context for Context { crate::eip::create_for_pod(&eip_api, &name).await?; } + let node_name = match pod.node_name() { + Some(node_name) => node_name, + None => { + warn!("Pod {} is not yet scheduled", name); + return Ok(Some(Action::requeue(Duration::from_secs(3)))); + } + }; let pod_ip = pod.ip().ok_or(Error::MissingPodIp)?; - let node_name = pod.node_name().ok_or(Error::MissingNodeName)?; - let node = node_api.get(node_name).await?; let provider_id = node.provider_id().ok_or(Error::MissingProviderId)?; @@ -84,8 +91,7 @@ impl k8s_controller::Context for Context { .ok_or(Error::MissingAddresses)? .swap_remove(0); let public_ip = eip_description.public_ip.ok_or(Error::MissingPublicIp)?; - // having multiple EIPs - crate::eip::set_status_attached(&eip_api, &eip, &eni_id, pod_ip, &name).await?; + crate::eip::set_status_should_attach(&eip_api, &eip, &eni_id, pod_ip, &name).await?; if eip_description.network_interface_id != Some(eni_id.to_owned()) || eip_description.private_ip_address != Some(pod_ip.to_owned()) { diff --git a/eip_operator/src/eip.rs b/eip_operator/src/eip.rs index 1fb30a7..7c5d443 100644 --- a/eip_operator/src/eip.rs +++ b/eip_operator/src/eip.rs @@ -141,12 +141,6 @@ pub mod v2 { Api::::namespaced(k8s_client, namespace.unwrap_or("default")) } - pub fn attached(&self) -> bool { - self.status - .as_ref() - .map_or(false, |status| status.private_ip_address.is_some()) - } - pub fn matches_pod(&self, pod_name: &str) -> bool { match self.spec.selector { EipSelector::Pod { @@ -388,7 +382,7 @@ pub(crate) async fn set_status_association_id( name: &str, association_id: &str, ) -> Result { - event!(Level::INFO, "Updating status for created EIP."); + event!(Level::INFO, "Updating status for assocaited EIP."); let patch = serde_json::json!({ "apiVersion": Eip::version(), "kind": "Eip", @@ -400,21 +394,21 @@ pub(crate) async fn set_status_association_id( let params = PatchParams::default(); let result = api.patch_status(name, ¶ms, &patch).await; if result.is_ok() { - event!(Level::INFO, "Done updating status for created EIP."); + event!(Level::INFO, "Done updating status for assocaited EIP."); } result } /// Sets the eni and privateIpAddress fields in the Eip status. #[instrument(skip(api), err)] -pub(crate) async fn set_status_attached( +pub(crate) async fn set_status_should_attach( api: &Api, eip: &Eip, eni: &str, private_ip_address: &str, resource_id: &str, ) -> Result { - event!(Level::INFO, "Updating status for attached EIP."); + event!(Level::INFO, "Updating status for attaching EIP."); let mut eip = eip.clone(); let status = eip.status.as_mut().ok_or(Error::MissingEipStatus)?; status.eni = Some(eni.to_owned()); @@ -428,7 +422,7 @@ pub(crate) async fn set_status_attached( ) .await?; // let result = api.patch_status(name, ¶ms, &patch).await; - event!(Level::INFO, "Done updating status for attached EIP."); + event!(Level::INFO, "Done updating status for attaching EIP."); Ok(result) } diff --git a/eip_operator_shared/src/lib.rs b/eip_operator_shared/src/lib.rs index 2bcadfb..90eafe5 100644 --- a/eip_operator_shared/src/lib.rs +++ b/eip_operator_shared/src/lib.rs @@ -50,7 +50,7 @@ pub enum Error { #[from] source: kube_runtime::wait::Error, }, - #[error("No EIP found with that podName.")] + #[error("No EIP found with that podName `{0}`.")] NoEipResourceWithThatPodName(String), #[error("No EIP found with that node selector.")] NoEipResourceWithThatNodeSelector, From ed7cbc9b762e7ea8be32f04c972847db0567c25c Mon Sep 17 00:00:00 2001 From: Justin Bradfield Date: Tue, 26 Mar 2024 22:04:59 -0500 Subject: [PATCH 4/4] small cleanups --- eip_operator/src/controller/node.rs | 6 +++--- eip_operator/src/controller/pod.rs | 4 ++-- eip_operator/src/eip.rs | 6 +++--- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/eip_operator/src/controller/node.rs b/eip_operator/src/controller/node.rs index fcf23d1..3de51cd 100644 --- a/eip_operator/src/controller/node.rs +++ b/eip_operator/src/controller/node.rs @@ -132,16 +132,16 @@ impl k8s_controller::Context for Context { self.namespace.as_deref().unwrap_or("default"), ); let node_labels = node.labels(); - let matched_eips = eip_api.list(&ListParams::default()).await?.items; + let all_eips = eip_api.list(&ListParams::default()).await?.items; // find all eips that match (there should be one, but lets not lean on that) - let eips = matched_eips.into_iter().filter(|eip| { + let matched_eips = all_eips.into_iter().filter(|eip| { eip.matches_node(node_labels) && eip .status .as_ref() .is_some_and(|s| s.resource_id == Some(node.name_unchecked().clone())) }); - for eip in eips { + for eip in matched_eips { let allocation_id = eip.allocation_id().ok_or(Error::MissingAllocationId)?; let addresses = crate::aws::describe_address(&self.ec2_client, allocation_id) .await? diff --git a/eip_operator/src/controller/pod.rs b/eip_operator/src/controller/pod.rs index c240893..0b4bb5a 100644 --- a/eip_operator/src/controller/pod.rs +++ b/eip_operator/src/controller/pod.rs @@ -117,8 +117,8 @@ impl k8s_controller::Context for Context { let eip_api = Api::::namespaced(client.clone(), &pod.namespace().unwrap()); - let matched_eips = eip_api.list(&ListParams::default()).await?.items; - let eip = matched_eips.into_iter().find(|eip| eip.matches_pod(&name)); + let all_eips = eip_api.list(&ListParams::default()).await?.items; + let eip = all_eips.into_iter().find(|eip| eip.matches_pod(&name)); if let Some(eip) = eip { let allocation_id = eip.allocation_id().ok_or(Error::MissingAllocationId)?; let addresses = crate::aws::describe_address(&self.ec2_client, allocation_id) diff --git a/eip_operator/src/eip.rs b/eip_operator/src/eip.rs index 7c5d443..ba92ff7 100644 --- a/eip_operator/src/eip.rs +++ b/eip_operator/src/eip.rs @@ -394,7 +394,7 @@ pub(crate) async fn set_status_association_id( let params = PatchParams::default(); let result = api.patch_status(name, ¶ms, &patch).await; if result.is_ok() { - event!(Level::INFO, "Done updating status for assocaited EIP."); + event!(Level::INFO, "Done updating status for associated EIP."); } result } @@ -408,7 +408,7 @@ pub(crate) async fn set_status_should_attach( private_ip_address: &str, resource_id: &str, ) -> Result { - event!(Level::INFO, "Updating status for attaching EIP."); + event!(Level::INFO, "Updating status before attaching EIP."); let mut eip = eip.clone(); let status = eip.status.as_mut().ok_or(Error::MissingEipStatus)?; status.eni = Some(eni.to_owned()); @@ -422,7 +422,7 @@ pub(crate) async fn set_status_should_attach( ) .await?; // let result = api.patch_status(name, ¶ms, &patch).await; - event!(Level::INFO, "Done updating status for attaching EIP."); + event!(Level::INFO, "Done updating status before attaching EIP."); Ok(result) }