Skip to content

Commit 9465448

Browse files
committed
feat(query): Fetch query response in JSON format.
Allows row deserialization into a `T: Deserialize`, which eliminates the limitations of `Query::fetch`: * when the table schema is not known: `SELECT * from ?` * when the table schema is not specified: `DESCRIBE TABLE ?` * when we read less columns than we select
1 parent 2ecfa84 commit 9465448

File tree

3 files changed

+169
-0
lines changed

3 files changed

+169
-0
lines changed

src/query.rs

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ use std::fmt;
44
use url::Url;
55

66
use crate::headers::with_request_headers;
7+
#[cfg(feature = "watch")]
8+
use crate::watch;
79
use crate::{
810
cursor::RowBinaryCursor,
911
error::{Error, Result},
@@ -90,6 +92,60 @@ impl Query {
9092
Ok(RowCursor(RowBinaryCursor::new(response)))
9193
}
9294

95+
/// Executes the query, returning a [`RowJsonCursor`] to obtain results.
96+
#[cfg(feature = "watch")]
97+
pub fn json<T>(mut self) -> Result<watch::RowJsonCursor<T>> {
98+
self.sql.append(" FORMAT JSONEachRowWithProgress");
99+
100+
let response = self.do_execute(true)?;
101+
Ok(watch::RowJsonCursor::new(response))
102+
}
103+
104+
/// Executes the query and returns just a single row.
105+
///
106+
/// Note that `T` must be owned.
107+
#[cfg(feature = "watch")]
108+
pub async fn json_one<T>(self) -> Result<T>
109+
where
110+
T: for<'b> Deserialize<'b>,
111+
{
112+
match self.json()?.next().await {
113+
Ok(Some(row)) => Ok(row),
114+
Ok(None) => Err(Error::RowNotFound),
115+
Err(err) => Err(err),
116+
}
117+
}
118+
119+
/// Executes the query and returns at most one row.
120+
///
121+
/// Note that `T` must be owned.
122+
#[cfg(feature = "watch")]
123+
pub async fn json_optional<T>(self) -> Result<Option<T>>
124+
where
125+
T: for<'b> Deserialize<'b>,
126+
{
127+
self.json()?.next().await
128+
}
129+
130+
/// Executes the query and returns all the generated results,
131+
/// collected into a [`Vec`].
132+
///
133+
/// Note that `T` must be owned.
134+
#[cfg(feature = "watch")]
135+
pub async fn json_all<T>(self) -> Result<Vec<T>>
136+
where
137+
T: for<'b> Deserialize<'b>,
138+
{
139+
let mut result = Vec::new();
140+
let mut cursor = self.json::<T>()?;
141+
142+
while let Some(row) = cursor.next().await? {
143+
result.push(row);
144+
}
145+
146+
Ok(result)
147+
}
148+
93149
/// Executes the query and returns just a single row.
94150
///
95151
/// Note that `T` must be owned.

src/watch.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use sha1::{Digest, Sha1};
66
use crate::{
77
cursor::JsonCursor,
88
error::{Error, Result},
9+
response::Response,
910
row::Row,
1011
sql::{Bind, SqlBuilder},
1112
Client, Compression,
@@ -163,6 +164,23 @@ impl EventCursor {
163164
}
164165
}
165166

167+
/// A cursor that emits rows in JSON format.
168+
pub struct RowJsonCursor<T>(JsonCursor<T>);
169+
170+
impl<T> RowJsonCursor<T> {
171+
pub(crate) fn new(response: Response) -> Self {
172+
Self(JsonCursor::new(response))
173+
}
174+
175+
/// Emits the next row.
176+
pub async fn next<'a, 'b: 'a>(&'a mut self) -> Result<Option<T>>
177+
where
178+
T: Deserialize<'b>,
179+
{
180+
self.0.next().await
181+
}
182+
}
183+
166184
// === RowCursor ===
167185

168186
/// A cursor that emits `(Version, T)`.

tests/it/query.rs

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,101 @@ async fn overrides_client_options() {
215215
assert_eq!(value, override_value);
216216
}
217217

218+
#[cfg(feature = "watch")]
219+
#[tokio::test]
220+
async fn fetches_json_row() {
221+
let client = prepare_database!();
222+
223+
let value = client
224+
.query("SELECT 1,2,3")
225+
.json_one::<serde_json::Value>()
226+
.await
227+
.unwrap();
228+
229+
assert_eq!(value, serde_json::json!({ "1": 1, "2": 2, "3": 3}));
230+
231+
let value = client
232+
.query("SELECT (1,2,3) as data")
233+
.json_one::<serde_json::Value>()
234+
.await
235+
.unwrap();
236+
237+
assert_eq!(value, serde_json::json!({ "data": [1,2,3]}));
238+
}
239+
240+
#[cfg(feature = "watch")]
241+
#[tokio::test]
242+
async fn fetches_json_struct() {
243+
let client = prepare_database!();
244+
245+
#[derive(Debug, Deserialize, PartialEq)]
246+
struct Row {
247+
one: i8,
248+
two: String,
249+
three: f32,
250+
four: bool,
251+
}
252+
253+
let value = client
254+
.query("SELECT -1 as one, '2' as two, 3.0 as three, false as four")
255+
.json_one::<Row>()
256+
.await
257+
.unwrap();
258+
259+
assert_eq!(
260+
value,
261+
Row {
262+
one: -1,
263+
two: "2".to_owned(),
264+
three: 3.0,
265+
four: false,
266+
}
267+
);
268+
}
269+
270+
#[cfg(feature = "watch")]
271+
#[tokio::test]
272+
async fn describes_table() {
273+
let client = prepare_database!();
274+
275+
let columns = client
276+
.query("DESCRIBE TABLE system.users")
277+
.json_all::<serde_json::Value>()
278+
.await
279+
.unwrap();
280+
for c in &columns {
281+
println!("{c}");
282+
}
283+
let columns = columns
284+
.into_iter()
285+
.map(|row| {
286+
let column_name = row
287+
.as_object()
288+
.expect("JSONEachRow")
289+
.get("name")
290+
.expect("`system.users` must contain the `name` column");
291+
(column_name.as_str().unwrap().to_owned(), row)
292+
})
293+
.collect::<std::collections::HashMap<String, serde_json::Value>>();
294+
dbg!(&columns);
295+
296+
let name_column = columns
297+
.get("name")
298+
.expect("`system.users` must contain the `name` column");
299+
assert_eq!(
300+
name_column.as_object().unwrap().get("type").unwrap(),
301+
&serde_json::json!("String")
302+
);
303+
304+
let id_column = columns
305+
.get("id")
306+
.expect("`system.users` must contain the `id` column");
307+
assert_eq!(
308+
id_column.as_object().unwrap().get("type").unwrap(),
309+
&serde_json::json!("UUID")
310+
);
311+
}
312+
218313
#[tokio::test]
219314
async fn prints_query() {
220315
let client = prepare_database!();

0 commit comments

Comments
 (0)