Skip to content

Commit 38b722c

Browse files
committed
feat(query): Fetch query response in JSON format.
Allows row deserialization into a `T: Deserialize`, which eliminates the limitations of `Query::fetch`: * when the table schema is not known: `SELECT * from ?` * when the table schema is not specified: `DESCRIBE TABLE ?` * when we read less columns than we select
1 parent 238d6ba commit 38b722c

File tree

3 files changed

+169
-0
lines changed

3 files changed

+169
-0
lines changed

src/query.rs

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ use serde::{Deserialize, Serialize};
33
use std::fmt::Display;
44
use url::Url;
55

6+
#[cfg(feature = "watch")]
7+
use crate::watch;
68
use crate::{
79
error::{Error, Result},
810
headers::with_request_headers,
@@ -90,6 +92,60 @@ impl Query {
9092
Ok(RowCursor::new(response))
9193
}
9294

95+
/// Executes the query, returning a [`watch::RowJsonCursor`] to obtain results.
96+
#[cfg(feature = "watch")]
97+
pub fn fetch_json<T>(mut self) -> Result<watch::RowJsonCursor<T>> {
98+
self.sql.append(" FORMAT JSONEachRowWithProgress");
99+
100+
let response = self.do_execute(true)?;
101+
Ok(watch::RowJsonCursor::new(response))
102+
}
103+
104+
/// Executes the query and returns just a single row.
105+
///
106+
/// Note that `T` must be owned.
107+
#[cfg(feature = "watch")]
108+
pub async fn fetch_json_one<T>(self) -> Result<T>
109+
where
110+
T: for<'b> Deserialize<'b>,
111+
{
112+
match self.fetch_json()?.next().await {
113+
Ok(Some(row)) => Ok(row),
114+
Ok(None) => Err(Error::RowNotFound),
115+
Err(err) => Err(err),
116+
}
117+
}
118+
119+
/// Executes the query and returns at most one row.
120+
///
121+
/// Note that `T` must be owned.
122+
#[cfg(feature = "watch")]
123+
pub async fn fetch_json_optional<T>(self) -> Result<Option<T>>
124+
where
125+
T: for<'b> Deserialize<'b>,
126+
{
127+
self.fetch_json()?.next().await
128+
}
129+
130+
/// Executes the query and returns all the generated results,
131+
/// collected into a [`Vec`].
132+
///
133+
/// Note that `T` must be owned.
134+
#[cfg(feature = "watch")]
135+
pub async fn fetch_json_all<T>(self) -> Result<Vec<T>>
136+
where
137+
T: for<'b> Deserialize<'b>,
138+
{
139+
let mut result = Vec::new();
140+
let mut cursor = self.fetch_json::<T>()?;
141+
142+
while let Some(row) = cursor.next().await? {
143+
result.push(row);
144+
}
145+
146+
Ok(result)
147+
}
148+
93149
/// Executes the query and returns just a single row.
94150
///
95151
/// Note that `T` must be owned.

src/watch.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use sha1::{Digest, Sha1};
66
use crate::{
77
cursor::JsonCursor,
88
error::{Error, Result},
9+
response::Response,
910
row::Row,
1011
sql::{Bind, SqlBuilder},
1112
Client, Compression,
@@ -165,6 +166,23 @@ impl EventCursor {
165166
}
166167
}
167168

169+
/// A cursor that emits rows in JSON format.
170+
pub struct RowJsonCursor<T>(JsonCursor<T>);
171+
172+
impl<T> RowJsonCursor<T> {
173+
pub(crate) fn new(response: Response) -> Self {
174+
Self(JsonCursor::new(response))
175+
}
176+
177+
/// Emits the next row.
178+
pub async fn next<'a, 'b: 'a>(&'a mut self) -> Result<Option<T>>
179+
where
180+
T: Deserialize<'b>,
181+
{
182+
self.0.next().await
183+
}
184+
}
185+
168186
// === RowCursor ===
169187

170188
/// A cursor that emits `(Version, T)`.

tests/it/query.rs

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -263,3 +263,98 @@ async fn prints_query() {
263263
"SELECT ?fields FROM test WHERE a = ? AND b < ?"
264264
);
265265
}
266+
267+
#[cfg(feature = "watch")]
268+
#[tokio::test]
269+
async fn fetches_json_row() {
270+
let client = prepare_database!();
271+
272+
let value = client
273+
.query("SELECT 1,2,3")
274+
.fetch_json_one::<serde_json::Value>()
275+
.await
276+
.unwrap();
277+
278+
assert_eq!(value, serde_json::json!({ "1": 1, "2": 2, "3": 3}));
279+
280+
let value = client
281+
.query("SELECT (1,2,3) as data")
282+
.fetch_json_one::<serde_json::Value>()
283+
.await
284+
.unwrap();
285+
286+
assert_eq!(value, serde_json::json!({ "data": [1,2,3]}));
287+
}
288+
289+
#[cfg(feature = "watch")]
290+
#[tokio::test]
291+
async fn fetches_json_struct() {
292+
let client = prepare_database!();
293+
294+
#[derive(Debug, Deserialize, PartialEq)]
295+
struct Row {
296+
one: i8,
297+
two: String,
298+
three: f32,
299+
four: bool,
300+
}
301+
302+
let value = client
303+
.query("SELECT -1 as one, '2' as two, 3.0 as three, false as four")
304+
.fetch_json_one::<Row>()
305+
.await
306+
.unwrap();
307+
308+
assert_eq!(
309+
value,
310+
Row {
311+
one: -1,
312+
two: "2".to_owned(),
313+
three: 3.0,
314+
four: false,
315+
}
316+
);
317+
}
318+
319+
#[cfg(feature = "watch")]
320+
#[tokio::test]
321+
async fn describes_table() {
322+
let client = prepare_database!();
323+
324+
let columns = client
325+
.query("DESCRIBE TABLE system.users")
326+
.fetch_json_all::<serde_json::Value>()
327+
.await
328+
.unwrap();
329+
for c in &columns {
330+
println!("{c}");
331+
}
332+
let columns = columns
333+
.into_iter()
334+
.map(|row| {
335+
let column_name = row
336+
.as_object()
337+
.expect("JSONEachRow")
338+
.get("name")
339+
.expect("`system.users` must contain the `name` column");
340+
(column_name.as_str().unwrap().to_owned(), row)
341+
})
342+
.collect::<std::collections::HashMap<String, serde_json::Value>>();
343+
dbg!(&columns);
344+
345+
let name_column = columns
346+
.get("name")
347+
.expect("`system.users` must contain the `name` column");
348+
assert_eq!(
349+
name_column.as_object().unwrap().get("type").unwrap(),
350+
&serde_json::json!("String")
351+
);
352+
353+
let id_column = columns
354+
.get("id")
355+
.expect("`system.users` must contain the `id` column");
356+
assert_eq!(
357+
id_column.as_object().unwrap().get("type").unwrap(),
358+
&serde_json::json!("UUID")
359+
);
360+
}

0 commit comments

Comments
 (0)