Skip to content

Commit 8a5be0a

Browse files
authored
Merge pull request #925 from muzarski/use_prepared_metadata_to_decode_rows
Use prepared statement result metadata to decode rows
2 parents 29f6744 + 58a9885 commit 8a5be0a

File tree

11 files changed

+224
-32
lines changed

11 files changed

+224
-32
lines changed

scylla-cql/benches/benchmark.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ fn make_query(contents: &str, values: SerializedValues) -> query::Query<'_> {
1414
consistency: scylla_cql::Consistency::LocalQuorum,
1515
serial_consistency: None,
1616
values: Cow::Owned(values),
17+
skip_metadata: false,
1718
page_size: None,
1819
paging_state: None,
1920
timestamp: None,

scylla-cql/src/frame/request/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,7 @@ mod tests {
150150
timestamp: None,
151151
page_size: Some(323),
152152
paging_state: Some(vec![2, 1, 3, 7].into()),
153+
skip_metadata: false,
153154
values: {
154155
let mut vals = SerializedValues::new();
155156
vals.add_value(&2137, &ColumnType::Int).unwrap();
@@ -177,6 +178,7 @@ mod tests {
177178
timestamp: Some(3423434),
178179
page_size: None,
179180
paging_state: None,
181+
skip_metadata: false,
180182
values: {
181183
let mut vals = SerializedValues::new();
182184
vals.add_value(&42, &ColumnType::Int).unwrap();
@@ -234,6 +236,7 @@ mod tests {
234236
timestamp: None,
235237
page_size: None,
236238
paging_state: None,
239+
skip_metadata: false,
237240
values: Cow::Borrowed(SerializedValues::EMPTY),
238241
};
239242
let query = Query {

scylla-cql/src/frame/request/query.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ pub struct QueryParameters<'a> {
6363
pub timestamp: Option<i64>,
6464
pub page_size: Option<i32>,
6565
pub paging_state: Option<Bytes>,
66+
pub skip_metadata: bool,
6667
pub values: Cow<'a, SerializedValues>,
6768
}
6869

@@ -74,6 +75,7 @@ impl Default for QueryParameters<'_> {
7475
timestamp: None,
7576
page_size: None,
7677
paging_state: None,
78+
skip_metadata: false,
7779
values: Cow::Borrowed(SerializedValues::EMPTY),
7880
}
7981
}
@@ -88,6 +90,10 @@ impl QueryParameters<'_> {
8890
flags |= FLAG_VALUES;
8991
}
9092

93+
if self.skip_metadata {
94+
flags |= FLAG_SKIP_METADATA;
95+
}
96+
9197
if self.page_size.is_some() {
9298
flags |= FLAG_PAGE_SIZE;
9399
}
@@ -143,6 +149,7 @@ impl<'q> QueryParameters<'q> {
143149
)));
144150
}
145151
let values_flag = (flags & FLAG_VALUES) != 0;
152+
let skip_metadata = (flags & FLAG_SKIP_METADATA) != 0;
146153
let page_size_flag = (flags & FLAG_PAGE_SIZE) != 0;
147154
let paging_state_flag = (flags & FLAG_WITH_PAGING_STATE) != 0;
148155
let serial_consistency_flag = (flags & FLAG_WITH_SERIAL_CONSISTENCY) != 0;
@@ -192,6 +199,7 @@ impl<'q> QueryParameters<'q> {
192199
timestamp,
193200
page_size,
194201
paging_state,
202+
skip_metadata,
195203
values,
196204
})
197205
}

scylla-cql/src/frame/response/mod.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,13 @@ pub mod event;
55
pub mod result;
66
pub mod supported;
77

8-
use crate::{errors::QueryError, frame::frame_errors::ParseError};
9-
10-
use crate::frame::protocol_features::ProtocolFeatures;
118
pub use error::Error;
129
pub use supported::Supported;
1310

14-
use super::TryFromPrimitiveError;
11+
use crate::frame::protocol_features::ProtocolFeatures;
12+
use crate::frame::response::result::ResultMetadata;
13+
use crate::frame::TryFromPrimitiveError;
14+
use crate::{errors::QueryError, frame::frame_errors::ParseError};
1515

1616
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord)]
1717
#[repr(u8)]
@@ -64,6 +64,7 @@ impl Response {
6464
features: &ProtocolFeatures,
6565
opcode: ResponseOpcode,
6666
buf: &mut &[u8],
67+
cached_metadata: Option<&ResultMetadata>,
6768
) -> Result<Response, ParseError> {
6869
let response = match opcode {
6970
ResponseOpcode::Error => Response::Error(Error::deserialize(features, buf)?),
@@ -72,7 +73,7 @@ impl Response {
7273
Response::Authenticate(authenticate::Authenticate::deserialize(buf)?)
7374
}
7475
ResponseOpcode::Supported => Response::Supported(Supported::deserialize(buf)?),
75-
ResponseOpcode::Result => Response::Result(result::deserialize(buf)?),
76+
ResponseOpcode::Result => Response::Result(result::deserialize(buf, cached_metadata)?),
7677
ResponseOpcode::Event => Response::Event(event::Event::deserialize(buf)?),
7778
ResponseOpcode::AuthChallenge => {
7879
Response::AuthChallenge(authenticate::AuthChallenge::deserialize(buf)?)

scylla-cql/src/frame/response/result.rs

Lines changed: 26 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -386,7 +386,7 @@ pub struct ColumnSpec {
386386
pub typ: ColumnType,
387387
}
388388

389-
#[derive(Debug, Default)]
389+
#[derive(Debug, Clone, Default)]
390390
pub struct ResultMetadata {
391391
col_count: usize,
392392
pub paging_state: Option<Bytes>,
@@ -886,17 +886,29 @@ pub fn deser_cql_value(typ: &ColumnType, buf: &mut &[u8]) -> StdResult<CqlValue,
886886
})
887887
}
888888

889-
fn deser_rows(buf: &mut &[u8]) -> StdResult<Rows, ParseError> {
890-
let metadata = deser_result_metadata(buf)?;
889+
fn deser_rows(
890+
buf: &mut &[u8],
891+
cached_metadata: Option<&ResultMetadata>,
892+
) -> StdResult<Rows, ParseError> {
893+
let server_metadata = deser_result_metadata(buf)?;
894+
895+
let metadata = match cached_metadata {
896+
Some(metadata) => metadata.clone(),
897+
None => {
898+
// No cached_metadata provided. Server is supposed to provide the result metadata.
899+
if server_metadata.col_count != server_metadata.col_specs.len() {
900+
return Err(ParseError::BadIncomingData(format!(
901+
"Bad result metadata provided in the response. Expected {} column specifications, received: {}",
902+
server_metadata.col_count,
903+
server_metadata.col_specs.len()
904+
)));
905+
}
906+
server_metadata
907+
}
908+
};
891909

892910
let original_size = buf.len();
893911

894-
// TODO: the protocol allows an optimization (which must be explicitly requested on query by
895-
// the driver) where the column metadata is not sent with the result.
896-
// Implement this optimization. We'll then need to take the column types by a parameter.
897-
// Beware of races; our column types may be outdated.
898-
assert!(metadata.col_count == metadata.col_specs.len());
899-
900912
let rows_count: usize = types::read_int(buf)?.try_into()?;
901913

902914
let mut rows = Vec::with_capacity(rows_count);
@@ -946,11 +958,14 @@ fn deser_schema_change(buf: &mut &[u8]) -> StdResult<SchemaChange, ParseError> {
946958
})
947959
}
948960

949-
pub fn deserialize(buf: &mut &[u8]) -> StdResult<Result, ParseError> {
961+
pub fn deserialize(
962+
buf: &mut &[u8],
963+
cached_metadata: Option<&ResultMetadata>,
964+
) -> StdResult<Result, ParseError> {
950965
use self::Result::*;
951966
Ok(match types::read_int(buf)? {
952967
0x0001 => Void,
953-
0x0002 => Rows(deser_rows(buf)?),
968+
0x0002 => Rows(deser_rows(buf, cached_metadata)?),
954969
0x0003 => SetKeyspace(deser_set_keyspace(buf)?),
955970
0x0004 => Prepared(deser_prepared(buf)?),
956971
0x0005 => SchemaChange(deser_schema_change(buf)?),

scylla/src/statement/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ pub(crate) struct StatementConfig {
1616

1717
pub(crate) is_idempotent: bool,
1818

19+
pub(crate) skip_result_metadata: bool,
1920
pub(crate) tracing: bool,
2021
pub(crate) timestamp: Option<i64>,
2122
pub(crate) request_timeout: Option<Duration>,

scylla/src/statement/prepared_statement.rs

Lines changed: 47 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use std::time::Duration;
1010
use thiserror::Error;
1111
use uuid::Uuid;
1212

13-
use scylla_cql::frame::response::result::ColumnSpec;
13+
use scylla_cql::frame::response::result::{ColumnSpec, PartitionKeyIndex, ResultMetadata};
1414

1515
use super::StatementConfig;
1616
use crate::frame::response::result::PreparedMetadata;
@@ -37,6 +37,7 @@ pub struct PreparedStatement {
3737
#[derive(Debug)]
3838
struct PreparedStatementSharedData {
3939
metadata: PreparedMetadata,
40+
result_metadata: ResultMetadata,
4041
statement: String,
4142
}
4243

@@ -59,6 +60,7 @@ impl PreparedStatement {
5960
id: Bytes,
6061
is_lwt: bool,
6162
metadata: PreparedMetadata,
63+
result_metadata: ResultMetadata,
6264
statement: String,
6365
page_size: Option<i32>,
6466
config: StatementConfig,
@@ -67,6 +69,7 @@ impl PreparedStatement {
6769
id,
6870
shared: Arc::new(PreparedStatementSharedData {
6971
metadata,
72+
result_metadata,
7073
statement,
7174
}),
7275
prepare_tracing_ids: Vec::new(),
@@ -270,6 +273,27 @@ impl PreparedStatement {
270273
self.config.tracing
271274
}
272275

276+
/// Make use of cached metadata to decode results
277+
/// of the statement's execution.
278+
///
279+
/// If true, the driver will request the server not to
280+
/// attach the result metadata in response to the statement execution.
281+
///
282+
/// The driver will cache the result metadata received from the server
283+
/// after statement preparation and will use it
284+
/// to deserialize the results of statement execution.
285+
///
286+
/// This option is false by default.
287+
pub fn set_use_cached_result_metadata(&mut self, use_cached_metadata: bool) {
288+
self.config.skip_result_metadata = use_cached_metadata;
289+
}
290+
291+
/// Gets the information whether the driver uses cached metadata
292+
/// to decode the results of the statement's execution.
293+
pub fn get_use_cached_result_metadata(&self) -> bool {
294+
self.config.skip_result_metadata
295+
}
296+
273297
/// Sets the default timestamp for this statement in microseconds.
274298
/// If not None, it will replace the server side assigned timestamp as default timestamp
275299
/// If a statement contains a `USING TIMESTAMP` clause, calling this method won't change
@@ -301,11 +325,31 @@ impl PreparedStatement {
301325
self.partitioner_name = partitioner_name;
302326
}
303327

304-
/// Access metadata about this prepared statement as returned by the database
305-
pub fn get_prepared_metadata(&self) -> &PreparedMetadata {
328+
/// Access metadata about the bind variables of this statement as returned by the database
329+
pub(crate) fn get_prepared_metadata(&self) -> &PreparedMetadata {
306330
&self.shared.metadata
307331
}
308332

333+
/// Access column specifications of the bind variables of this statement
334+
pub fn get_variable_col_specs(&self) -> &[ColumnSpec] {
335+
&self.shared.metadata.col_specs
336+
}
337+
338+
/// Access info about partition key indexes of the bind variables of this statement
339+
pub fn get_variable_pk_indexes(&self) -> &[PartitionKeyIndex] {
340+
&self.shared.metadata.pk_indexes
341+
}
342+
343+
/// Access metadata about the result of prepared statement returned by the database
344+
pub(crate) fn get_result_metadata(&self) -> &ResultMetadata {
345+
&self.shared.result_metadata
346+
}
347+
348+
/// Access column specifications of the result set returned after the execution of this statement
349+
pub fn get_result_set_col_specs(&self) -> &[ColumnSpec] {
350+
&self.shared.result_metadata.col_specs
351+
}
352+
309353
/// Get the name of the partitioner used for this statement.
310354
pub(crate) fn get_partitioner_name(&self) -> &PartitionerName {
311355
&self.partitioner_name

scylla/src/transport/caching_session.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use crate::{QueryResult, Session};
88
use bytes::Bytes;
99
use dashmap::DashMap;
1010
use futures::future::try_join_all;
11-
use scylla_cql::frame::response::result::PreparedMetadata;
11+
use scylla_cql::frame::response::result::{PreparedMetadata, ResultMetadata};
1212
use scylla_cql::types::serialize::batch::BatchValues;
1313
use scylla_cql::types::serialize::row::SerializeRow;
1414
use std::collections::hash_map::RandomState;
@@ -23,6 +23,7 @@ struct RawPreparedStatementData {
2323
id: Bytes,
2424
is_confirmed_lwt: bool,
2525
metadata: PreparedMetadata,
26+
result_metadata: ResultMetadata,
2627
partitioner_name: PartitionerName,
2728
}
2829

@@ -168,6 +169,7 @@ where
168169
raw.id.clone(),
169170
raw.is_confirmed_lwt,
170171
raw.metadata.clone(),
172+
raw.result_metadata.clone(),
171173
query.contents,
172174
page_size,
173175
query.config,
@@ -195,6 +197,7 @@ where
195197
id: prepared.get_id().clone(),
196198
is_confirmed_lwt: prepared.is_confirmed_lwt(),
197199
metadata: prepared.get_prepared_metadata().clone(),
200+
result_metadata: prepared.get_result_metadata().clone(),
198201
partitioner_name: prepared.get_partitioner_name().clone(),
199202
};
200203
self.cache.insert(query_contents, raw);

0 commit comments

Comments
 (0)