Skip to content

Commit e4245de

Browse files
authored
Merge pull request #791 from etehtsea/enh-pg-out-example
feat(outbound-pg): Reuse connection during request lifecycle
2 parents 8a2a6d3 + ebf24c5 commit e4245de

File tree

6 files changed

+159
-37
lines changed

6 files changed

+159
-37
lines changed

crates/outbound-pg/src/lib.rs

+33-20
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
use anyhow::anyhow;
22
use outbound_pg::*;
3+
use std::collections::HashMap;
34
use tokio_postgres::{
4-
tls::NoTlsStream,
55
types::{ToSql, Type},
6-
Connection, NoTls, Row, Socket,
6+
Client, NoTls, Row,
77
};
88

99
pub use outbound_pg::add_to_linker;
@@ -16,8 +16,9 @@ use wit_bindgen_wasmtime::{async_trait, wasmtime::Linker};
1616
wit_bindgen_wasmtime::export!({paths: ["../../wit/ephemeral/outbound-pg.wit"], async: *});
1717

1818
/// A simple implementation to support outbound pg connection
19-
#[derive(Default, Clone)]
20-
pub struct OutboundPg;
19+
pub struct OutboundPg {
20+
pub connections: HashMap<String, Client>,
21+
}
2122

2223
impl HostComponent for OutboundPg {
2324
type State = Self;
@@ -33,7 +34,9 @@ impl HostComponent for OutboundPg {
3334
&self,
3435
_component: &spin_manifest::CoreComponent,
3536
) -> anyhow::Result<Self::State> {
36-
Ok(Self)
37+
let connections = std::collections::HashMap::new();
38+
39+
Ok(Self { connections })
3740
}
3841
}
3942

@@ -45,19 +48,16 @@ impl outbound_pg::OutboundPg for OutboundPg {
4548
statement: &str,
4649
params: Vec<ParameterValue<'_>>,
4750
) -> Result<u64, PgError> {
48-
let (client, connection) = tokio_postgres::connect(address, NoTls)
49-
.await
50-
.map_err(|e| PgError::ConnectionFailed(format!("{:?}", e)))?;
51-
52-
spawn(connection);
53-
5451
let params: Vec<&(dyn ToSql + Sync)> = params
5552
.iter()
5653
.map(to_sql_parameter)
5754
.collect::<anyhow::Result<Vec<_>>>()
5855
.map_err(|e| PgError::ValueConversionFailed(format!("{:?}", e)))?;
5956

60-
let nrow = client
57+
let nrow = self
58+
.get_client(address)
59+
.await
60+
.map_err(|e| PgError::ConnectionFailed(format!("{:?}", e)))?
6161
.execute(statement, params.as_slice())
6262
.await
6363
.map_err(|e| PgError::QueryFailed(format!("{:?}", e)))?;
@@ -71,19 +71,16 @@ impl outbound_pg::OutboundPg for OutboundPg {
7171
statement: &str,
7272
params: Vec<ParameterValue<'_>>,
7373
) -> Result<RowSet, PgError> {
74-
let (client, connection) = tokio_postgres::connect(address, NoTls)
75-
.await
76-
.map_err(|e| PgError::ConnectionFailed(format!("{:?}", e)))?;
77-
78-
spawn(connection);
79-
8074
let params: Vec<&(dyn ToSql + Sync)> = params
8175
.iter()
8276
.map(to_sql_parameter)
8377
.collect::<anyhow::Result<Vec<_>>>()
8478
.map_err(|e| PgError::BadParameter(format!("{:?}", e)))?;
8579

86-
let results = client
80+
let results = self
81+
.get_client(address)
82+
.await
83+
.map_err(|e| PgError::ConnectionFailed(format!("{:?}", e)))?
8784
.query(statement, params.as_slice())
8885
.await
8986
.map_err(|e| PgError::QueryFailed(format!("{:?}", e)))?;
@@ -246,10 +243,26 @@ fn convert_entry(row: &Row, index: usize) -> Result<DbValue, tokio_postgres::Err
246243
Ok(value)
247244
}
248245

249-
fn spawn(connection: Connection<Socket, NoTlsStream>) {
246+
impl OutboundPg {
247+
async fn get_client(&mut self, address: &str) -> anyhow::Result<&Client> {
248+
let client = match self.connections.entry(address.to_owned()) {
249+
std::collections::hash_map::Entry::Occupied(o) => o.into_mut(),
250+
std::collections::hash_map::Entry::Vacant(v) => v.insert(build_client(address).await?),
251+
};
252+
Ok(client)
253+
}
254+
}
255+
256+
async fn build_client(address: &str) -> anyhow::Result<Client> {
257+
tracing::log::debug!("Build new connection: {}", address);
258+
259+
let (client, connection) = tokio_postgres::connect(address, NoTls).await?;
260+
250261
tokio::spawn(async move {
251262
if let Err(e) = connection.await {
252263
tracing::warn!("Postgres connection error: {}", e);
253264
}
254265
});
266+
267+
Ok(client)
255268
}

crates/trigger/src/lib.rs

+3-1
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,9 @@ pub fn add_default_host_components<T: Default + Send + 'static>(
138138
builder.add_host_component(outbound_redis::OutboundRedis {
139139
connections: Arc::new(RwLock::new(HashMap::new())),
140140
})?;
141-
builder.add_host_component(outbound_pg::OutboundPg)?;
141+
builder.add_host_component(outbound_pg::OutboundPg {
142+
connections: HashMap::new(),
143+
})?;
142144
Ok(())
143145
}
144146

examples/rust-outbound-pg/README.md

+49
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
# Spin Outbound PostgreSQL example
2+
3+
This example shows how to access a PostgreSQL database from Spin component.
4+
5+
## Spin up
6+
7+
From example root:
8+
9+
```
10+
createdb spin_dev
11+
psql -d spin_dev -f db/testdata.sql
12+
RUST_LOG=spin=trace spin build --up
13+
```
14+
15+
Curl the read route:
16+
17+
```
18+
$ curl -i localhost:3000/read
19+
HTTP/1.1 200 OK
20+
content-length: 501
21+
date: Sun, 25 Sep 2022 15:45:02 GMT
22+
23+
Found 2 article(s) as follows:
24+
article: Article {
25+
id: 1,
26+
title: "My Life as a Goat",
27+
content: "I went to Nepal to live as a goat, and it was much better than being a butler.",
28+
authorname: "E. Blackadder",
29+
}
30+
article: Article {
31+
id: 2,
32+
title: "Magnificent Octopus",
33+
content: "Once upon a time there was a lovely little sausage.",
34+
authorname: "S. Baldrick",
35+
}
36+
37+
(Column info: id:DbDataType::Int32, title:DbDataType::Str, content:DbDataType::Str, authorname:DbDataType::Str)
38+
```
39+
40+
Curl the write route:
41+
42+
```
43+
$ curl -i localhost:3000/write
44+
HTTP/1.1 200 OK
45+
content-length: 9
46+
date: Sun, 25 Sep 2022 15:46:22 GMT
47+
48+
Count: 3
49+
```
+15-6
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,18 @@
11
CREATE TABLE articletest (
2-
id integer not null,
3-
title varchar(40) not null,
4-
content varchar(8000) not null,
5-
authorname varchar(40) not null
2+
id integer GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
3+
title varchar(40) NOT NULL,
4+
content text NOT NULL,
5+
authorname varchar(40) NOT NULL
66
);
77

8-
INSERT INTO articletest VALUES (1, 'My Life as a Goat', 'I went to Nepal to live as a goat, and it was much better than being a butler.', 'E. Blackadder');
9-
INSERT INTO articletest VALUES (2, 'Magnificent Octopus', 'Once upon a time there was a lovely little sausage.', 'S. Baldrick');
8+
INSERT INTO articletest (title, content, authorname) VALUES
9+
(
10+
'My Life as a Goat',
11+
'I went to Nepal to live as a goat, and it was much better than being a butler.',
12+
'E. Blackadder'
13+
),
14+
(
15+
'Magnificent Octopus',
16+
'Once upon a time there was a lovely little sausage.',
17+
'S. Baldrick'
18+
);

examples/rust-outbound-pg/spin.toml

+2-4
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,10 @@ trigger = { type = "http", base = "/" }
55
version = "0.1.0"
66

77
[[component]]
8-
environment = { DB_URL = "host=localhost user=postgres password=123 dbname=postgres" }
8+
environment = { DB_URL = "host=localhost user=postgres dbname=spin_dev" }
99
id = "outbound-pg"
1010
source = "target/wasm32-wasi/release/rust_outbound_pg.wasm"
1111
[component.trigger]
12-
#route = "/write"
13-
route = "/read"
12+
route = "/..."
1413
[component.build]
1514
command = "cargo build --target wasm32-wasi --release"
16-

examples/rust-outbound-pg/src/lib.rs

+57-6
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,21 @@ struct Article {
1818
}
1919

2020
#[http_component]
21+
fn process(req: Request) -> Result<Response> {
22+
match req.uri().path() {
23+
"/read" => read(req),
24+
"/write" => write(req),
25+
"/pg_backend_pid" => pg_backend_pid(req),
26+
_ => Ok(http::Response::builder()
27+
.status(404)
28+
.body(Some("Not found".into()))?),
29+
}
30+
}
31+
2132
fn read(_req: Request) -> Result<Response> {
2233
let address = std::env::var(DB_URL_ENV)?;
2334

24-
let sql = "select id, title, content, authorname from articletest";
35+
let sql = "SELECT id, title, content, authorname FROM articletest";
2536
let rowset = pg::query(&address, sql, &[])
2637
.map_err(|e| anyhow!("Error executing Postgres query: {:?}", e))?;
2738

@@ -64,18 +75,48 @@ fn read(_req: Request) -> Result<Response> {
6475
.status(200)
6576
.body(Some(response.into()))?)
6677
}
67-
/*
78+
6879
fn write(_req: Request) -> Result<Response> {
6980
let address = std::env::var(DB_URL_ENV)?;
7081

71-
let sql = "insert into articletest values ('aaa', 'bbb', 'ccc')";
72-
let nrow_executed = pg::execute(&address, sql, &vec![]).map_err(|_| anyhow!("Error execute pg command"))?;
82+
let sql = "INSERT INTO articletest (title, content, authorname) VALUES ('aaa', 'bbb', 'ccc')";
83+
let nrow_executed =
84+
pg::execute(&address, sql, &[]).map_err(|_| anyhow!("Error execute pg command"))?;
7385

7486
println!("nrow_executed: {}", nrow_executed);
7587

76-
Ok(http::Response::builder().status(200).body(None)?)
88+
let sql = "SELECT COUNT(id) FROM articletest";
89+
let rowset = pg::query(&address, sql, &[])
90+
.map_err(|e| anyhow!("Error executing Postgres query: {:?}", e))?;
91+
let row = &rowset.rows[0];
92+
let count = as_bigint(&row[0])?;
93+
let response = format!("Count: {}\n", count);
94+
95+
Ok(http::Response::builder()
96+
.status(200)
97+
.body(Some(response.into()))?)
98+
}
99+
100+
fn pg_backend_pid(_req: Request) -> Result<Response> {
101+
let address = std::env::var(DB_URL_ENV)?;
102+
let sql = "SELECT pg_backend_pid()";
103+
104+
let get_pid = || {
105+
let rowset = pg::query(&address, sql, &[])
106+
.map_err(|e| anyhow!("Error executing Postgres query: {:?}", e))?;
107+
108+
let row = &rowset.rows[0];
109+
as_int(&row[0])
110+
};
111+
112+
assert_eq!(get_pid()?, get_pid()?);
113+
114+
let response = format!("pg_backend_pid: {}\n", get_pid()?);
115+
116+
Ok(http::Response::builder()
117+
.status(200)
118+
.body(Some(response.into()))?)
77119
}
78-
*/
79120

80121
fn as_owned_string(value: &pg::DbValue) -> anyhow::Result<String> {
81122
match value {
@@ -94,6 +135,16 @@ fn as_int(value: &pg::DbValue) -> anyhow::Result<i32> {
94135
}
95136
}
96137

138+
fn as_bigint(value: &pg::DbValue) -> anyhow::Result<i64> {
139+
match value {
140+
pg::DbValue::Int64(n) => Ok(*n),
141+
_ => Err(anyhow!(
142+
"Expected integer from database but got {:?}",
143+
value
144+
)),
145+
}
146+
}
147+
97148
fn format_col(column: &pg::Column) -> String {
98149
format!("{}:{:?}", column.name, column.data_type)
99150
}

0 commit comments

Comments
 (0)