
Commit a56afee

Accept primary compute spec in /promote, promotion corner cases testing (#12574)
https://github.com/neondatabase/cloud/issues/19011

- Accept `ComputeSpec` in `/promote` instead of just passing safekeepers and LSN; update the API spec
- Add corner-case tests for promotion when promotion or prewarm fails (using failpoints)
- Print the root error for prewarm and promotion in the status handlers
1 parent 9e6ca29 commit a56afee

10 files changed, +242 -86 lines changed

compute_tools/src/compute_prewarm.rs

Lines changed: 22 additions & 12 deletions
@@ -90,6 +90,7 @@ impl ComputeNode {
     }
 
     /// If there is a prewarm request ongoing, return `false`, `true` otherwise.
+    /// Has a failpoint "compute-prewarm"
     pub fn prewarm_lfc(self: &Arc<Self>, from_endpoint: Option<String>) -> bool {
         {
             let state = &mut self.state.lock().unwrap().lfc_prewarm_state;
@@ -112,9 +113,8 @@ impl ComputeNode {
             Err(err) => {
                 crate::metrics::LFC_PREWARM_ERRORS.inc();
                 error!(%err, "could not prewarm LFC");
-
                 LfcPrewarmState::Failed {
-                    error: err.to_string(),
+                    error: format!("{err:#}"),
                 }
             }
         };
@@ -135,16 +135,20 @@ impl ComputeNode {
     async fn prewarm_impl(&self, from_endpoint: Option<String>) -> Result<bool> {
         let EndpointStoragePair { url, token } = self.endpoint_storage_pair(from_endpoint)?;
 
+        #[cfg(feature = "testing")]
+        fail::fail_point!("compute-prewarm", |_| {
+            bail!("prewarm configured to fail because of a failpoint")
+        });
+
         info!(%url, "requesting LFC state from endpoint storage");
         let request = Client::new().get(&url).bearer_auth(token);
         let res = request.send().await.context("querying endpoint storage")?;
-        let status = res.status();
-        match status {
+        match res.status() {
             StatusCode::OK => (),
             StatusCode::NOT_FOUND => {
                 return Ok(false);
             }
-            _ => bail!("{status} querying endpoint storage"),
+            status => bail!("{status} querying endpoint storage"),
         }
 
         let mut uncompressed = Vec::new();
@@ -205,24 +209,30 @@ impl ComputeNode {
             crate::metrics::LFC_OFFLOAD_ERRORS.inc();
             error!(%err, "could not offload LFC state to endpoint storage");
             self.state.lock().unwrap().lfc_offload_state = LfcOffloadState::Failed {
-                error: err.to_string(),
+                error: format!("{err:#}"),
             };
         }
 
     async fn offload_lfc_impl(&self) -> Result<()> {
         let EndpointStoragePair { url, token } = self.endpoint_storage_pair(None)?;
         info!(%url, "requesting LFC state from Postgres");
 
-        let mut compressed = Vec::new();
-        ComputeNode::get_maintenance_client(&self.tokio_conn_conf)
+        let row = ComputeNode::get_maintenance_client(&self.tokio_conn_conf)
            .await
            .context("connecting to postgres")?
            .query_one("select neon.get_local_cache_state()", &[])
            .await
-            .context("querying LFC state")?
-            .try_get::<usize, &[u8]>(0)
-            .context("deserializing LFC state")
-            .map(ZstdEncoder::new)?
+            .context("querying LFC state")?;
+        let state = row
+            .try_get::<usize, Option<&[u8]>>(0)
+            .context("deserializing LFC state")?;
+        let Some(state) = state else {
+            info!(%url, "empty LFC state, not exporting");
+            return Ok(());
+        };
+
+        let mut compressed = Vec::new();
+        ZstdEncoder::new(state)
            .read_to_end(&mut compressed)
            .await
            .context("compressing LFC state")?;

compute_tools/src/compute_promote.rs

Lines changed: 59 additions & 25 deletions
@@ -1,33 +1,35 @@
 use crate::compute::ComputeNode;
 use anyhow::{Context, Result, bail};
-use compute_api::{
-    responses::{LfcPrewarmState, PromoteState, SafekeepersLsn},
-    spec::ComputeMode,
-};
+use compute_api::responses::{LfcPrewarmState, PromoteConfig, PromoteState};
+use compute_api::spec::ComputeMode;
+use itertools::Itertools;
+use std::collections::HashMap;
 use std::{sync::Arc, time::Duration};
 use tokio::time::sleep;
+use tracing::info;
 use utils::lsn::Lsn;
 
 impl ComputeNode {
     /// Returns only when promote fails or succeeds. If a network error occurs
     /// and http client disconnects, this does not stop promotion, and subsequent
     /// calls block until promote finishes.
     /// Called by control plane on secondary after primary endpoint is terminated
-    pub async fn promote(self: &Arc<Self>, safekeepers_lsn: SafekeepersLsn) -> PromoteState {
+    /// Has a failpoint "compute-promotion"
+    pub async fn promote(self: &Arc<Self>, cfg: PromoteConfig) -> PromoteState {
         let cloned = self.clone();
+        let promote_fn = async move || {
+            let Err(err) = cloned.promote_impl(cfg).await else {
+                return PromoteState::Completed;
+            };
+            tracing::error!(%err, "promoting");
+            PromoteState::Failed {
+                error: format!("{err:#}"),
+            }
+        };
+
         let start_promotion = || {
             let (tx, rx) = tokio::sync::watch::channel(PromoteState::NotPromoted);
-            tokio::spawn(async move {
-                tx.send(match cloned.promote_impl(safekeepers_lsn).await {
-                    Ok(_) => PromoteState::Completed,
-                    Err(err) => {
-                        tracing::error!(%err, "promoting");
-                        PromoteState::Failed {
-                            error: err.to_string(),
-                        }
-                    }
-                })
-            });
+            tokio::spawn(async move { tx.send(promote_fn().await) });
             rx
         };
 
@@ -47,9 +49,7 @@ impl ComputeNode {
         task.borrow().clone()
     }
 
-    // Why do we have to supply safekeepers?
-    // For secondary we use primary_connection_conninfo so safekeepers field is empty
-    async fn promote_impl(&self, safekeepers_lsn: SafekeepersLsn) -> Result<()> {
+    async fn promote_impl(&self, mut cfg: PromoteConfig) -> Result<()> {
         {
             let state = self.state.lock().unwrap();
             let mode = &state.pspec.as_ref().unwrap().spec.mode;
@@ -73,7 +73,7 @@ impl ComputeNode {
             .await
             .context("connecting to postgres")?;
 
-        let primary_lsn = safekeepers_lsn.wal_flush_lsn;
+        let primary_lsn = cfg.wal_flush_lsn;
         let mut last_wal_replay_lsn: Lsn = Lsn::INVALID;
         const RETRIES: i32 = 20;
         for i in 0..=RETRIES {
@@ -86,7 +86,7 @@ impl ComputeNode {
             if last_wal_replay_lsn >= primary_lsn {
                 break;
             }
-            tracing::info!("Try {i}, replica lsn {last_wal_replay_lsn}, primary lsn {primary_lsn}");
+            info!("Try {i}, replica lsn {last_wal_replay_lsn}, primary lsn {primary_lsn}");
             sleep(Duration::from_secs(1)).await;
         }
         if last_wal_replay_lsn < primary_lsn {
@@ -96,7 +96,7 @@ impl ComputeNode {
         // using $1 doesn't work with ALTER SYSTEM SET
         let safekeepers_sql = format!(
             "ALTER SYSTEM SET neon.safekeepers='{}'",
-            safekeepers_lsn.safekeepers
+            cfg.spec.safekeeper_connstrings.join(",")
         );
         client
             .query(&safekeepers_sql, &[])
@@ -106,6 +106,12 @@ impl ComputeNode {
             .query("SELECT pg_reload_conf()", &[])
             .await
             .context("reloading postgres config")?;
+
+        #[cfg(feature = "testing")]
+        fail::fail_point!("compute-promotion", |_| {
+            bail!("promotion configured to fail because of a failpoint")
+        });
+
         let row = client
             .query_one("SELECT * FROM pg_promote()", &[])
             .await
@@ -125,8 +131,36 @@ impl ComputeNode {
             bail!("replica in read only mode after promotion");
         }
 
-        let mut state = self.state.lock().unwrap();
-        state.pspec.as_mut().unwrap().spec.mode = ComputeMode::Primary;
-        Ok(())
+        {
+            let mut state = self.state.lock().unwrap();
+            let spec = &mut state.pspec.as_mut().unwrap().spec;
+            spec.mode = ComputeMode::Primary;
+            let new_conf = cfg.spec.cluster.postgresql_conf.as_mut().unwrap();
+            let existing_conf = spec.cluster.postgresql_conf.as_ref().unwrap();
+            Self::merge_spec(new_conf, existing_conf);
+        }
+        info!("applied new spec, reconfiguring as primary");
+        self.reconfigure()
+    }
+
+    /// Merge old and new Postgres conf specs to apply on secondary.
+    /// Change new spec's port and safekeepers since they are supplied
+    /// differently
+    fn merge_spec(new_conf: &mut String, existing_conf: &str) {
+        let mut new_conf_set: HashMap<&str, &str> = new_conf
+            .split_terminator('\n')
+            .map(|e| e.split_once("=").expect("invalid item"))
+            .collect();
+        new_conf_set.remove("neon.safekeepers");
+
+        let existing_conf_set: HashMap<&str, &str> = existing_conf
+            .split_terminator('\n')
+            .map(|e| e.split_once("=").expect("invalid item"))
+            .collect();
+        new_conf_set.insert("port", existing_conf_set["port"]);
+        *new_conf = new_conf_set
+            .iter()
+            .map(|(k, v)| format!("{k}={v}"))
+            .join("\n");
    }
 }
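
For context on what merge_spec produces, here is a standalone sketch of the same merge using only the standard library (the conf strings below are made up; the real inputs are the cluster.postgresql_conf fields of the incoming and existing specs):

    use std::collections::HashMap;

    // Standalone re-implementation of the merge above, for illustration only.
    fn merge_conf(new_conf: &str, existing_conf: &str) -> String {
        let mut merged: HashMap<&str, &str> = new_conf
            .split_terminator('\n')
            .map(|e| e.split_once('=').expect("invalid item"))
            .collect();
        // Safekeepers were already applied via ALTER SYSTEM SET in promote_impl.
        merged.remove("neon.safekeepers");
        let existing: HashMap<&str, &str> = existing_conf
            .split_terminator('\n')
            .map(|e| e.split_once('=').expect("invalid item"))
            .collect();
        // Keep the secondary's own port instead of the primary's.
        merged.insert("port", existing["port"]);
        merged
            .iter()
            .map(|(k, v)| format!("{k}={v}"))
            .collect::<Vec<_>>()
            .join("\n")
    }

    fn main() {
        // Made-up conf lines purely for illustration.
        let new_conf = "port=55433\nneon.safekeepers=sk1:5454\nshared_buffers=128MB";
        let existing_conf = "port=55432\nshared_buffers=64MB";
        // Prints the primary's settings with the secondary's port and without
        // neon.safekeepers; line order is unspecified because of the HashMap.
        println!("{}", merge_conf(new_conf, existing_conf));
    }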

compute_tools/src/http/openapi_spec.yaml

Lines changed: 15 additions & 15 deletions
@@ -96,7 +96,7 @@ paths:
        content:
          application/json:
            schema:
-              $ref: "#/components/schemas/SafekeepersLsn"
+              $ref: "#/components/schemas/ComputeSchemaWithLsn"
      responses:
        200:
          description: Promote succeeded or wasn't started
@@ -297,14 +297,7 @@ paths:
        content:
          application/json:
            schema:
-              type: object
-              required:
-                - spec
-              properties:
-                spec:
-                  # XXX: I don't want to explain current spec in the OpenAPI format,
-                  # as it could be changed really soon. Consider doing it later.
-                  type: object
+              $ref: "#/components/schemas/ComputeSchema"
      responses:
        200:
          description: Compute configuration finished.
@@ -591,18 +584,25 @@ components:
      type: string
      example: "1.0.0"
 
-    SafekeepersLsn:
+    ComputeSchema:
      type: object
      required:
-        - safekeepers
+        - spec
+      properties:
+        spec:
+          type: object
+    ComputeSchemaWithLsn:
+      type: object
+      required:
+        - spec
        - wal_flush_lsn
      properties:
-        safekeepers:
-          description: Primary replica safekeepers
-          type: string
+        spec:
+          $ref: "#/components/schemas/ComputeState"
        wal_flush_lsn:
-          description: Primary last WAL flush LSN
          type: string
+          description: "last WAL flush LSN"
+          example: "0/028F10D8"
 
    LfcPrewarmState:
      type: object
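
With the schema change above, the /promote request body is now the full compute spec plus the primary's flush LSN. A hedged sketch of such a body, built with serde_json, is shown below; the field names come from PromoteConfig and the code in this commit, while the values and the heavily truncated spec contents are invented:

    // Illustrative only: a /promote payload shaped as promote_impl reads it.
    fn example_promote_body() -> serde_json::Value {
        serde_json::json!({
            // Full ComputeSpec of the terminated primary; only the fields
            // promote_impl touches are hinted at here.
            "spec": {
                "safekeeper_connstrings": ["sk-1:5454", "sk-2:5454"],
                "cluster": { "postgresql_conf": "shared_buffers=128MB" }
            },
            // Last WAL flush LSN of the primary, which the replica waits to replay.
            "wal_flush_lsn": "0/028F10D8"
        })
    }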
Lines changed: 5 additions & 5 deletions
@@ -1,14 +1,14 @@
 use crate::http::JsonResponse;
-use axum::Form;
+use axum::extract::Json;
 use http::StatusCode;
 
 pub(in crate::http) async fn promote(
     compute: axum::extract::State<std::sync::Arc<crate::compute::ComputeNode>>,
-    Form(safekeepers_lsn): Form<compute_api::responses::SafekeepersLsn>,
+    Json(cfg): Json<compute_api::responses::PromoteConfig>,
 ) -> axum::response::Response {
-    let state = compute.promote(safekeepers_lsn).await;
-    if let compute_api::responses::PromoteState::Failed { error } = state {
-        return JsonResponse::error(StatusCode::INTERNAL_SERVER_ERROR, error);
+    let state = compute.promote(cfg).await;
+    if let compute_api::responses::PromoteState::Failed { error: _ } = state {
+        return JsonResponse::create_response(StatusCode::INTERNAL_SERVER_ERROR, state);
     }
     JsonResponse::success(StatusCode::OK, state)
 }

control_plane/src/bin/neon_local.rs

Lines changed: 1 addition & 1 deletion
@@ -1517,7 +1517,7 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res
             let endpoint = cplane
                 .endpoints
                 .get(endpoint_id.as_str())
-                .ok_or_else(|| anyhow::anyhow!("endpoint {endpoint_id} not found"))?;
+                .ok_or_else(|| anyhow!("endpoint {endpoint_id} not found"))?;
 
             if !args.allow_multiple {
                 cplane.check_conflicting_endpoints(

libs/compute_api/src/responses.rs

Lines changed: 3 additions & 4 deletions
@@ -108,11 +108,10 @@ pub enum PromoteState {
     Failed { error: String },
 }
 
-#[derive(Deserialize, Serialize, Default, Debug, Clone)]
+#[derive(Deserialize, Default, Debug)]
 #[serde(rename_all = "snake_case")]
-/// Result of /safekeepers_lsn
-pub struct SafekeepersLsn {
-    pub safekeepers: String,
+pub struct PromoteConfig {
+    pub spec: ComputeSpec,
     pub wal_flush_lsn: utils::lsn::Lsn,
 }
 
test_runner/fixtures/endpoint/http.py

Lines changed: 7 additions & 6 deletions
@@ -87,9 +87,10 @@ def prewarm_lfc_wait(self):
         def prewarmed():
             json = self.prewarm_lfc_status()
             status, err = json["status"], json.get("error")
-            assert status == "completed", f"{status}, {err=}"
+            assert status in ["failed", "completed", "skipped"], f"{status}, {err=}"
 
         wait_until(prewarmed, timeout=60)
+        assert self.prewarm_lfc_status()["status"] != "failed"
 
     def offload_lfc_status(self) -> dict[str, str]:
         res = self.get(self.offload_url)
@@ -105,19 +106,19 @@ def offload_lfc_wait(self):
         def offloaded():
             json = self.offload_lfc_status()
             status, err = json["status"], json.get("error")
-            assert status == "completed", f"{status}, {err=}"
+            assert status in ["failed", "completed"], f"{status}, {err=}"
 
         wait_until(offloaded)
+        assert self.offload_lfc_status()["status"] != "failed"
 
-    def promote(self, safekeepers_lsn: dict[str, Any], disconnect: bool = False):
+    def promote(self, promote_spec: dict[str, Any], disconnect: bool = False):
         url = f"http://localhost:{self.external_port}/promote"
         if disconnect:
             try:  # send first request to start promote and disconnect
-                self.post(url, data=safekeepers_lsn, timeout=0.001)
+                self.post(url, json=promote_spec, timeout=0.001)
             except ReadTimeout:
                 pass  # wait on second request which returns on promotion finish
-        res = self.post(url, data=safekeepers_lsn)
-        res.raise_for_status()
+        res = self.post(url, json=promote_spec)
         json: dict[str, str] = res.json()
         return json
 
test_runner/fixtures/neon_fixtures.py

Lines changed: 8 additions & 3 deletions
@@ -4794,9 +4794,10 @@ def create(
                 m = re.search(r"=\s*(\S+)", line)
                 assert m is not None, f"malformed config line {line}"
                 size = m.group(1)
-                assert size_to_bytes(size) >= size_to_bytes("1MB"), (
-                    "LFC size cannot be set less than 1MB"
-                )
+                if size_to_bytes(size) > 0:
+                    assert size_to_bytes(size) >= size_to_bytes("1MB"), (
+                        "LFC size cannot be set less than 1MB"
+                    )
         lfc_path_escaped = str(lfc_path).replace("'", "''")
         config_lines = [
             f"neon.file_cache_path = '{lfc_path_escaped}'",
@@ -4951,6 +4952,10 @@ def respec(self, **kwargs: Any) -> None:
         log.debug(json.dumps(dict(data_dict, **kwargs)))
         json.dump(dict(data_dict, **kwargs), file, indent=4)
 
+    def get_compute_spec(self) -> dict[str, Any]:
+        out = json.loads((Path(self.endpoint_path()) / "config.json").read_text())["spec"]
+        return cast("dict[str, Any]", out)
+
     def respec_deep(self, **kwargs: Any) -> None:
         """
         Update the endpoint.json file taking into account nested keys.