Skip to content

Commit b68ddbf

Browse files
authored
[Cosmos] Cross-Partition queries, with optional external Query Engine integration (#2577)
* begin work on query engine
* update non-query-engine tests
* implement query-via-query-engine
* add 'deque' to rust cspell dictionary
* remove raw_value and tracing features from dependencies
* update assets.json for cosmos
* address clippy lint
* rename Context::into_borrowed to Context::to_borrowed
* a few docs updates
* pr feedback
* re-enable tracing feature on azure_data_cosmos tests
1 parent 93f9bab commit b68ddbf

25 files changed

+1099
-184
lines changed

eng/dict/rust-custom.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ bindgen
22
cfg
33
cfgs
44
consts
5+
deque
56
impls
67
newtype
78
oneshot

sdk/core/azure_core/src/http/pager.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,18 @@ impl<T> PageStream<T> {
120120
stream: Box::pin(stream),
121121
}
122122
}
123+
124+
/// Creates a [`PageStream<T>`] from a raw stream of [`Result<T>`](typespec::Result<T>) values.
125+
///
126+
/// This constructor is used when you are implementing a completely custom stream and want to use it as a pager.
127+
pub fn from_stream<S>(stream: S) -> Self
128+
where
129+
S: Stream<Item = Result<T, Error>> + Send + 'static,
130+
{
131+
Self {
132+
stream: Box::pin(stream),
133+
}
134+
}
123135
}
124136

125137
impl<T> futures::Stream for PageStream<T> {

sdk/cosmos/assets.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"AssetsRepo": "Azure/azure-sdk-assets",
33
"AssetsRepoPrefixPath": "rust",
4-
"Tag": "rust/azure_data_cosmos_43a7f435b6",
4+
"Tag": "rust/azure_data_cosmos_ff23846344",
55
"TagPrefix": "rust/azure_data_cosmos"
66
}

sdk/cosmos/azure_data_cosmos/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ url.workspace = true
2525

2626
[dev-dependencies]
2727
azure_identity.workspace = true
28-
azure_core_test.workspace = true
28+
azure_core_test = { workspace = true, features = ["tracing"] }
2929
clap.workspace = true
3030
reqwest.workspace = true
3131
time.workspace = true
@@ -38,6 +38,7 @@ workspace = true
3838
[features]
3939
default = ["hmac_rust"]
4040
key_auth = [] # Enables support for key-based authentication (Primary Keys and Resource Tokens)
41+
preview_query_engine = [] # Enables support for the PREVIEW external query engine
4142
hmac_rust = ["azure_core/hmac_rust"]
4243
hmac_openssl = ["azure_core/hmac_openssl"]
4344

sdk/cosmos/azure_data_cosmos/examples/cosmos/query.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ enum Subcommands {
2525

2626
/// The partition key to use when querying the container. Currently this only supports a single string partition key.
2727
#[arg(long, short)]
28-
partition_key: String,
28+
partition_key: Option<String>,
2929
},
3030
Databases {
3131
/// The query to execute.
@@ -52,7 +52,11 @@ impl QueryCommand {
5252
let db_client = client.database_client(&database);
5353
let container_client = db_client.container_client(&container);
5454

55-
let pk = PartitionKey::from(&partition_key);
55+
let pk = match partition_key {
56+
Some(pk) => PartitionKey::from(pk),
57+
None => PartitionKey::EMPTY,
58+
};
59+
5660
let mut items =
5761
container_client.query_items::<serde_json::Value>(&query, pk, None)?;
5862

sdk/cosmos/azure_data_cosmos/src/clients/container_client.rs

Lines changed: 47 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -7,16 +7,22 @@ use crate::{
77
options::{QueryOptions, ReadContainerOptions},
88
pipeline::CosmosPipeline,
99
resource_context::{ResourceLink, ResourceType},
10-
DeleteContainerOptions, FeedPager, ItemOptions, PartitionKey, Query, QueryPartitionStrategy,
11-
ReplaceContainerOptions, ThroughputOptions,
10+
DeleteContainerOptions, FeedPager, ItemOptions, PartitionKey, Query, ReplaceContainerOptions,
11+
ThroughputOptions,
1212
};
1313

14-
use azure_core::http::{headers, request::Request, response::Response, Method};
14+
use azure_core::http::{
15+
headers,
16+
request::{options::ContentType, Request},
17+
response::Response,
18+
Method,
19+
};
1520
use serde::{de::DeserializeOwned, Serialize};
1621

1722
/// A client for working with a specific container in a Cosmos DB account.
1823
///
1924
/// You can get a `ContainerClient` by calling [`DatabaseClient::container_client()`](crate::clients::DatabaseClient::container_client()).
25+
#[derive(Clone)]
2026
pub struct ContainerClient {
2127
link: ResourceLink,
2228
items_link: ResourceLink,
@@ -109,6 +115,7 @@ impl ContainerClient {
109115
let options = options.unwrap_or_default();
110116
let url = self.pipeline.url(&self.link);
111117
let mut req = Request::new(url, Method::Put);
118+
req.insert_headers(&ContentType::APPLICATION_JSON)?;
112119
req.set_json(&properties)?;
113120
self.pipeline
114121
.send(options.method_options.context, &mut req, self.link.clone())
@@ -260,6 +267,7 @@ impl ContainerClient {
260267
req.insert_header(headers::PREFER, constants::PREFER_MINIMAL);
261268
}
262269
req.insert_headers(&partition_key.into())?;
270+
req.insert_headers(&ContentType::APPLICATION_JSON)?;
263271
req.set_json(&item)?;
264272
self.pipeline
265273
.send(
@@ -351,6 +359,7 @@ impl ContainerClient {
351359
req.insert_header(headers::PREFER, constants::PREFER_MINIMAL);
352360
}
353361
req.insert_headers(&partition_key.into())?;
362+
req.insert_headers(&ContentType::APPLICATION_JSON)?;
354363
req.set_json(&item)?;
355364
self.pipeline
356365
.send(options.method_options.context, &mut req, link)
@@ -440,6 +449,7 @@ impl ContainerClient {
440449
}
441450
req.insert_header(constants::IS_UPSERT, "true");
442451
req.insert_headers(&partition_key.into())?;
452+
req.insert_headers(&ContentType::APPLICATION_JSON)?;
443453
req.set_json(&item)?;
444454
self.pipeline
445455
.send(
@@ -453,7 +463,7 @@ impl ContainerClient {
453463
/// Reads a specific item from the container.
454464
///
455465
/// # Arguments
456-
/// * `partition_key` - The partition key of the item to read.
466+
/// * `partition_key` - The partition key of the item to read. See [`PartitionKey`] for more information on how to specify a partition key.
457467
/// * `item_id` - The id of the item to read.
458468
/// * `options` - Optional parameters for the request
459469
///
@@ -606,6 +616,7 @@ impl ContainerClient {
606616
req.insert_header(headers::PREFER, constants::PREFER_MINIMAL);
607617
}
608618
req.insert_headers(&partition_key.into())?;
619+
req.insert_headers(&ContentType::APPLICATION_JSON)?;
609620
req.set_json(&patch)?;
610621

611622
self.pipeline
@@ -625,12 +636,18 @@ impl ContainerClient {
625636
/// # Arguments
626637
///
627638
/// * `query` - The query to execute.
628-
/// * `partition_key_strategy` - The partition key to scope the query on.
639+
/// * `partition_key` - The partition key to scope the query on, or specify an empty key (`()`) to perform a cross-partition query.
629640
/// * `options` - Optional parameters for the request.
630641
///
642+
/// # Cross Partition Queries
643+
///
644+
/// Cross-partition queries are significantly limited in the current version of the Cosmos DB SDK.
645+
/// They are run on the gateway and limited to simple projections (`SELECT`) and filtering (`WHERE`).
646+
/// For more details, see [the Cosmos DB documentation page on cross-partition queries](https://learn.microsoft.com/en-us/rest/api/cosmos-db/querying-cosmosdb-resources-using-the-rest-api#queries-that-cannot-be-served-by-gateway).
647+
///
631648
/// # Examples
632649
///
633-
/// The `query` and `partition_key_strategy` parameters accept anything that can be transformed [`Into`] their relevant types.
650+
/// The `query` and `partition_key` parameters accept anything that can be transformed [`Into`] their relevant types.
634651
/// This allows simple queries without parameters to be expressed easily:
635652
///
636653
/// ```rust,no_run
@@ -666,23 +683,38 @@ impl ContainerClient {
666683
/// ```
667684
///
668685
/// See [`PartitionKey`](crate::PartitionKey) for more information on how to specify a partition key, and [`Query`] for more information on how to specify a query.
669-
pub fn query_items<T: DeserializeOwned + Send>(
686+
pub fn query_items<T: DeserializeOwned + 'static>(
670687
&self,
671688
query: impl Into<Query>,
672-
partition_key: impl Into<QueryPartitionStrategy>,
689+
partition_key: impl Into<PartitionKey>,
673690
options: Option<QueryOptions<'_>>,
674691
) -> azure_core::Result<FeedPager<T>> {
675-
let options = options.unwrap_or_default();
676-
let url = self.pipeline.url(&self.items_link);
677-
let mut base_request = Request::new(url, Method::Post);
678-
let QueryPartitionStrategy::SinglePartition(partition_key) = partition_key.into();
679-
base_request.insert_headers(&partition_key)?;
692+
#[cfg_attr(not(feature = "preview_query_engine"), allow(unused_mut))]
693+
let mut options = options.unwrap_or_default();
694+
let partition_key = partition_key.into();
695+
let query = query.into();
696+
697+
#[cfg(feature = "preview_query_engine")]
698+
if partition_key.is_empty() {
699+
if let Some(query_engine) = options.query_engine.take() {
700+
return crate::query::executor::QueryExecutor::new(
701+
self.pipeline.clone(),
702+
self.link.clone(),
703+
query,
704+
options,
705+
query_engine,
706+
)?
707+
.into_stream();
708+
}
709+
}
680710

711+
let url = self.pipeline.url(&self.items_link);
681712
self.pipeline.send_query_request(
682713
options.method_options.context,
683-
query.into(),
684-
base_request,
714+
query,
715+
url,
685716
self.items_link.clone(),
717+
|r| r.insert_headers(&partition_key),
686718
)
687719
}
688720
}

sdk/cosmos/azure_data_cosmos/src/clients/cosmos_client.rs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,11 @@ use crate::{
1010
};
1111
use azure_core::{
1212
credentials::TokenCredential,
13-
http::{request::Request, response::Response, Method, Url},
13+
http::{
14+
request::{options::ContentType, Request},
15+
response::Response,
16+
Method, Url,
17+
},
1418
};
1519
use serde::Serialize;
1620
use std::sync::Arc;
@@ -135,13 +139,13 @@ impl CosmosClient {
135139
) -> azure_core::Result<FeedPager<DatabaseProperties>> {
136140
let options = options.unwrap_or_default();
137141
let url = self.pipeline.url(&self.databases_link);
138-
let base_request = Request::new(url, Method::Post);
139142

140143
self.pipeline.send_query_request(
141144
options.method_options.context,
142145
query.into(),
143-
base_request,
146+
url,
144147
self.databases_link.clone(),
148+
|_| Ok(()),
145149
)
146150
}
147151

@@ -167,6 +171,7 @@ impl CosmosClient {
167171
let url = self.pipeline.url(&self.databases_link);
168172
let mut req = Request::new(url, Method::Post);
169173
req.insert_headers(&options.throughput)?;
174+
req.insert_headers(&ContentType::APPLICATION_JSON)?;
170175
req.set_json(&RequestBody { id })?;
171176

172177
self.pipeline

sdk/cosmos/azure_data_cosmos/src/clients/database_client.rs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,11 @@ use crate::{
1111
ThroughputOptions,
1212
};
1313

14-
use azure_core::http::{request::Request, response::Response, Method};
14+
use azure_core::http::{
15+
request::{options::ContentType, Request},
16+
response::Response,
17+
Method,
18+
};
1519

1620
/// A client for working with a specific database in a Cosmos DB account.
1721
///
@@ -110,13 +114,13 @@ impl DatabaseClient {
110114
) -> azure_core::Result<FeedPager<ContainerProperties>> {
111115
let options = options.unwrap_or_default();
112116
let url = self.pipeline.url(&self.containers_link);
113-
let base_request = Request::new(url, Method::Post);
114117

115118
self.pipeline.send_query_request(
116119
options.method_options.context,
117120
query.into(),
118-
base_request,
121+
url,
119122
self.containers_link.clone(),
123+
|_| Ok(()),
120124
)
121125
}
122126

@@ -136,6 +140,7 @@ impl DatabaseClient {
136140
let url = self.pipeline.url(&self.containers_link);
137141
let mut req = Request::new(url, Method::Post);
138142
req.insert_headers(&options.throughput)?;
143+
req.insert_headers(&ContentType::APPLICATION_JSON)?;
139144
req.set_json(&properties)?;
140145

141146
self.pipeline

sdk/cosmos/azure_data_cosmos/src/constants.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,14 @@ use azure_core::http::{
1313

1414
pub const QUERY: HeaderName = HeaderName::from_static("x-ms-documentdb-query");
1515
pub const PARTITION_KEY: HeaderName = HeaderName::from_static("x-ms-documentdb-partitionkey");
16+
pub const PARTITION_KEY_RANGE_ID: HeaderName =
17+
HeaderName::from_static("x-ms-documentdb-partitionkeyrangeid");
18+
pub const QUERY_ENABLE_CROSS_PARTITION: HeaderName =
19+
HeaderName::from_static("x-ms-documentdb-query-enablecrosspartition");
20+
pub const IS_QUERY_PLAN_REQUEST: HeaderName =
21+
HeaderName::from_static("x-ms-cosmos-is-query-plan-request");
22+
pub const SUPPORTED_QUERY_FEATURES: HeaderName =
23+
HeaderName::from_static("x-ms-cosmos-supported-query-features");
1624
pub const CONTINUATION: HeaderName = HeaderName::from_static("x-ms-continuation");
1725
pub const INDEX_METRICS: HeaderName = HeaderName::from_static("x-ms-cosmos-index-utilization");
1826
pub const QUERY_METRICS: HeaderName = HeaderName::from_static("x-ms-documentdb-query-metrics");

sdk/cosmos/azure_data_cosmos/src/feed.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ use crate::constants;
1111
/// Cosmos DB queries can be executed using non-HTTP transports, depending on the circumstances.
1212
/// They may also produce results that don't directly correlate to specific HTTP responses (as in the case of cross-partition queries).
1313
/// Because of this, Cosmos DB query responses use `FeedPage` to represent the results, rather than a more generic type like [`Response`](azure_core::http::Response).
14+
#[derive(Debug)]
1415
pub struct FeedPage<T> {
1516
/// The items in the response.
1617
items: Vec<T>,
@@ -24,6 +25,16 @@ pub struct FeedPage<T> {
2425
}
2526

2627
impl<T> FeedPage<T> {
28+
/// Creates a new `FeedPage` instance.
29+
#[cfg_attr(not(feature = "preview_query_engine"), allow(dead_code))]
30+
pub(crate) fn new(items: Vec<T>, continuation: Option<String>, headers: Headers) -> Self {
31+
Self {
32+
items,
33+
continuation,
34+
headers,
35+
}
36+
}
37+
2738
/// Gets the items in this page of results.
2839
pub fn items(&self) -> &[T] {
2940
&self.items

sdk/cosmos/azure_data_cosmos/src/lib.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ mod feed;
1616
mod options;
1717
mod partition_key;
1818
pub(crate) mod pipeline;
19-
mod query;
19+
pub mod query;
2020
pub(crate) mod resource_context;
2121
pub(crate) mod utils;
2222

@@ -27,6 +27,6 @@ pub use clients::CosmosClient;
2727

2828
pub use options::*;
2929
pub use partition_key::*;
30-
pub use query::*;
30+
pub use query::Query;
3131

3232
pub use feed::{FeedPage, FeedPager};

sdk/cosmos/azure_data_cosmos/src/options/mod.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,25 @@ pub struct QueryDatabasesOptions<'a> {
7070
#[derive(Clone, Default)]
7171
pub struct QueryOptions<'a> {
7272
pub method_options: ClientMethodOptions<'a>,
73+
74+
/// An external query engine to use for executing the query.
75+
///
76+
/// NOTE: This is an unstable feature and may change in the future.
77+
/// Specifically, the query engine may be built into the SDK in the future, and this option may be removed entirely.
78+
#[cfg(feature = "preview_query_engine")]
79+
pub query_engine: Option<crate::query::QueryEngineRef>,
80+
}
81+
82+
impl QueryOptions<'_> {
83+
pub fn into_owned(self) -> QueryOptions<'static> {
84+
QueryOptions {
85+
method_options: ClientMethodOptions {
86+
context: self.method_options.context.into_owned(),
87+
},
88+
#[cfg(feature = "preview_query_engine")]
89+
query_engine: self.query_engine,
90+
}
91+
}
7392
}
7493

7594
/// Options to be passed to [`ContainerClient::read()`](crate::clients::ContainerClient::read()).

0 commit comments

Comments
 (0)