Skip to content

Commit

Permalink
Various improvements:
Browse files Browse the repository at this point in the history
-Clippy suggestions (all levels enabled) - removing bad code, having const fns, adding must_use, etc
-Upgraded all packages to their latest versions, including azure
-Removing all unwrap()s, now they are replaced with something better, or except() for more info.
-Replaced lazy_static with once_cell
  • Loading branch information
AsafMah committed Jun 23, 2022
1 parent 0f81cc0 commit 9e45fff
Show file tree
Hide file tree
Showing 14 changed files with 292 additions and 296 deletions.
42 changes: 21 additions & 21 deletions azure-kusto-data/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,40 +12,40 @@ keywords = ["sdk", "azure", "kusto", "azure-data-explorer"]
categories = ["api-bindings"]

[dependencies]
arrow = { version = "13", optional = true }
azure_core = { git = "https://github.com/Azure/azure-sdk-for-rust", rev = "66db4b485ce56b68be148708d9c810960a50be51", features = [
arrow = { version = "15.0.0", optional = true }
azure_core = { git = "https://github.com/Azure/azure-sdk-for-rust", rev = "8586a66b20fba463c39156f0390e583ec305ab2d", features = [
"enable_reqwest",
"enable_reqwest_gzip",
] }
azure_identity = { git = "https://github.com/Azure/azure-sdk-for-rust", rev = "66db4b485ce56b68be148708d9c810960a50be51" }
async-trait = "0.1"
async-convert = "1"
bytes = "1"
futures = "0.3"
http = "0.2"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1"
serde_with = { version = "1.12.0", features = ["json"] }
thiserror = "1"
lazy_static = "1.4.0"
hashbrown = "0.12.0"
regex = "1.5.5"
time = { version = "0.3.9", features = [
azure_identity = { git = "https://github.com/Azure/azure-sdk-for-rust", rev = "8586a66b20fba463c39156f0390e583ec305ab2d" }
async-trait = "0.1.56"
async-convert = "1.0.0"
bytes = "1.1.0"
futures = "0.3.21"
http = "0.2.8"
serde = { version = "1.0.137", features = ["derive"] }
serde_json = "1.0.81"
serde_with = { version = "1.12.1", features = ["json"] }
thiserror = "1.0.31"
hashbrown = "0.12.1"
regex = "1.5.6"
time = { version = "0.3.11", features = [
"serde",
"parsing",
"formatting",
"macros",
"serde-well-known",
] }
derive_builder = "0.11.2"
once_cell = "1.12.0"

[dev-dependencies]
arrow = { version = "13", features = ["prettyprint"] }
dotenv = "*"
arrow = { version = "15.0.0", features = ["prettyprint"] }
dotenv = "0.15.0"
env_logger = "0.9"
tokio = { version = "1", features = ["macros"] }
chrono = "*"
oauth2 = "*"
tokio = { version = "1.19.2", features = ["macros"] }
chrono = "0.4.19"
oauth2 = "4.2.0"

[features]
default = ["arrow"]
Expand Down
4 changes: 2 additions & 2 deletions azure-kusto-data/examples/management.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,13 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
&client_secret,
);

let client = KustoClient::try_from(kcsb).unwrap();
let client = KustoClient::try_from(kcsb).expect("Failed to create Kusto client");

let response = client
.execute_command(database, query)
.into_future()
.await
.unwrap();
.expect("Failed to execute query");

println!("command response: {:?}", response);

Expand Down
4 changes: 2 additions & 2 deletions azure-kusto-data/examples/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,13 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
&client_secret,
);

let client = KustoClient::try_from(kcsb).unwrap();
let client = KustoClient::try_from(kcsb).expect("Failed to create Kusto client");

let response = client
.execute_query(database, query)
.into_future()
.await
.unwrap();
.expect("Failed to execute query");

for table in &response.tables {
match table {
Expand Down
71 changes: 25 additions & 46 deletions azure-kusto-data/src/arrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@ use azure_core::error::{ErrorKind, ResultExt};

use crate::error::Result;
use crate::models::ColumnType;
use crate::models::*;
use crate::models::{Column, DataTable};
use crate::types::{KustoDateTime, KustoDuration};

fn convert_array_string(values: Vec<serde_json::Value>) -> Result<ArrayRef> {
let strings: Vec<Option<String>> = serde_json::from_value(serde_json::Value::Array(values))?;
let strings: Vec<Option<&str>> = strings.iter().map(|opt| opt.as_deref()).collect();
let strings: Vec<Option<&str>> = strings.iter().map(Option::as_deref).collect();
Ok(Arc::new(StringArray::from(strings)))
}

Expand Down Expand Up @@ -85,42 +85,23 @@ fn convert_array_i64(values: Vec<serde_json::Value>) -> Result<ArrayRef> {
Ok(Arc::new(Int64Array::from(ints)))
}

pub fn convert_column(data: Vec<serde_json::Value>, column: Column) -> Result<(Field, ArrayRef)> {
pub fn convert_column(data: Vec<serde_json::Value>, column: &Column) -> Result<(Field, ArrayRef)> {
let column_name = &column.column_name;
match column.column_type {
ColumnType::String => convert_array_string(data).map(|data| {
(
Field::new(column.column_name.as_str(), DataType::Utf8, true),
data,
)
}),
ColumnType::Bool | ColumnType::Boolean => convert_array_bool(data).map(|data| {
(
Field::new(column.column_name.as_str(), DataType::Boolean, true),
data,
)
}),
ColumnType::Int => convert_array_i32(data).map(|data| {
(
Field::new(column.column_name.as_str(), DataType::Int32, true),
data,
)
}),
ColumnType::Long => convert_array_i64(data).map(|data| {
(
Field::new(column.column_name.as_str(), DataType::Int64, true),
data,
)
}),
ColumnType::Real => convert_array_float(data).map(|data| {
(
Field::new(column.column_name.as_str(), DataType::Float64, true),
data,
)
}),
ColumnType::String => convert_array_string(data)
.map(|data| (Field::new(column_name, DataType::Utf8, true), data)),
ColumnType::Bool | ColumnType::Boolean => convert_array_bool(data)
.map(|data| (Field::new(column_name, DataType::Boolean, true), data)),
ColumnType::Int => convert_array_i32(data)
.map(|data| (Field::new(column_name, DataType::Int32, true), data)),
ColumnType::Long => convert_array_i64(data)
.map(|data| (Field::new(column_name, DataType::Int64, true), data)),
ColumnType::Real => convert_array_float(data)
.map(|data| (Field::new(column_name, DataType::Float64, true), data)),
ColumnType::Datetime => convert_array_datetime(data).map(|data| {
(
Field::new(
column.column_name.as_str(),
column_name,
DataType::Timestamp(TimeUnit::Nanosecond, None),
true,
),
Expand All @@ -129,11 +110,7 @@ pub fn convert_column(data: Vec<serde_json::Value>, column: Column) -> Result<(F
}),
ColumnType::Timespan => convert_array_timespan(data).map(|data| {
(
Field::new(
column.column_name.as_str(),
DataType::Duration(TimeUnit::Nanosecond),
true,
),
Field::new(column_name, DataType::Duration(TimeUnit::Nanosecond), true),
data,
)
}),
Expand All @@ -158,7 +135,7 @@ pub fn convert_table(table: DataTable) -> Result<RecordBatch> {
buffer
.into_iter()
.zip(table.columns.into_iter())
.map(|(data, column)| convert_column(data, column))
.map(|(data, column)| convert_column(data, &column))
.try_for_each::<_, Result<()>>(|result| {
let (field, data) = result?;
fields.push(field);
Expand All @@ -173,6 +150,7 @@ pub fn convert_table(table: DataTable) -> Result<RecordBatch> {
#[cfg(test)]
mod tests {
use super::*;
use crate::models::TableKind;
use crate::operations::query::{KustoResponseDataSetV2, ResultTable};
use std::path::PathBuf;

Expand All @@ -188,7 +166,7 @@ mod tests {
column_name: "int_col".to_string(),
column_type: ColumnType::Int,
};
assert_eq!(c, ref_col)
assert_eq!(c, ref_col);
}

#[test]
Expand Down Expand Up @@ -218,23 +196,24 @@ mod tests {
}],
rows: vec![],
};
assert_eq!(t, ref_tbl)
assert_eq!(t, ref_tbl);
}

#[test]
fn read_data_types() {
let mut path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
path.push("tests/inputs/dataframe.json");

let data = std::fs::read_to_string(path).unwrap();
let tables: Vec<ResultTable> = serde_json::from_str(&data).unwrap();
let data = std::fs::read_to_string(path).expect("Failed to read file");
let tables: Vec<ResultTable> =
serde_json::from_str(&data).expect("Failed to deserialize result table");
let response = KustoResponseDataSetV2 { tables };
let record_batches = response
.into_record_batches()
.collect::<std::result::Result<Vec<_>, _>>()
.unwrap();
.expect("Failed to convert to record batches");

assert!(record_batches[0].num_columns() > 0);
assert!(record_batches[0].num_rows() > 0)
assert!(record_batches[0].num_rows() > 0);
}
}
9 changes: 3 additions & 6 deletions azure-kusto-data/src/authorization_policy.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use azure_core::headers::{HeaderValue, AUTHORIZATION};
use azure_core::{auth::TokenCredential, Context, Policy, PolicyResult, Request};
use http::header::AUTHORIZATION;
use http::HeaderValue;
use std::sync::Arc;

#[derive(Clone)]
Expand Down Expand Up @@ -44,11 +43,9 @@ impl Policy for AuthorizationPolicy {
);

let token = self.credential.get_token(&self.resource).await?;
let auth_header_value = format!("Bearer {}", token.token.secret().clone());
let auth_header_value = format!("Bearer {}", token.token.secret());

request
.headers_mut()
.insert(AUTHORIZATION, HeaderValue::from_str(&auth_header_value)?);
request.insert_header(AUTHORIZATION, HeaderValue::from(auth_header_value));

next[0].send(ctx, request, &next[1..]).await
}
Expand Down
34 changes: 10 additions & 24 deletions azure-kusto-data/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,17 @@ use crate::connection_string::{ConnectionString, ConnectionStringBuilder};
use crate::error::Result;
use crate::operations::query::{QueryRunner, QueryRunnerBuilder, V1QueryRunner, V2QueryRunner};
use azure_core::auth::TokenCredential;
use azure_core::prelude::*;
use azure_core::{ClientOptions, Context, Pipeline, Request};
use azure_identity::token_credentials::{

use azure_core::{ClientOptions, Context, Pipeline};
use azure_identity::{
AzureCliCredential, ClientSecretCredential, DefaultAzureCredential,
ImdsManagedIdentityCredential, TokenCredentialOptions,
};
use http::Uri;

use std::convert::TryFrom;
use std::fmt::Debug;
use std::sync::Arc;

const API_VERSION: &str = "2019-02-13";

/// Options for specifying how a Kusto client will behave
#[derive(Clone, Default)]
pub struct KustoClientOptions {
Expand All @@ -24,6 +22,7 @@ pub struct KustoClientOptions {

impl KustoClientOptions {
/// Create new options
#[must_use]
pub fn new() -> Self {
Self::default()
}
Expand Down Expand Up @@ -57,7 +56,7 @@ fn new_pipeline_from_options(

/// Kusto client for Rust.
/// The client is a wrapper around the Kusto REST API.
/// To read more about it, go to https://docs.microsoft.com/en-us/azure/kusto/api/rest/
/// To read more about it, go to [https://docs.microsoft.com/en-us/azure/kusto/api/rest/](https://docs.microsoft.com/en-us/azure/kusto/api/rest/)
///
/// The primary methods are:
/// `execute_query`: executes a KQL query against the Kusto service.
Expand Down Expand Up @@ -116,11 +115,11 @@ impl KustoClient {
.with_query(query.into())
.with_context(Context::new())
.build()
.unwrap()
.expect("Unexpected error when building query runner - please report this issue to the Kusto team")
}

/// Execute a KQL query.
/// To learn more about KQL go to https://docs.microsoft.com/en-us/azure/kusto/query/
/// To learn more about KQL go to [https://docs.microsoft.com/en-us/azure/kusto/query/](https://docs.microsoft.com/en-us/azure/kusto/query)
///
/// # Arguments
///
Expand All @@ -142,20 +141,7 @@ impl KustoClient {
V1QueryRunner(self.execute(database, query, QueryKind::Management))
}

pub(crate) fn prepare_request(&self, uri: Uri, http_method: http::Method) -> Request {
let mut request = Request::new(uri, http_method);
request.insert_headers(&Version::from(API_VERSION));
request.insert_headers(&Accept::from("application/json"));
request.insert_headers(&ContentType::new("application/json; charset=utf-8"));
request.insert_headers(&AcceptEncoding::from("gzip"));
request.insert_headers(&ClientVersion::from(format!(
"Kusto.Rust.Client:{}",
env!("CARGO_PKG_VERSION"),
)));
request
}

pub(crate) fn pipeline(&self) -> &Pipeline {
pub(crate) const fn pipeline(&self) -> &Pipeline {
&self.pipeline
}
}
Expand Down Expand Up @@ -183,7 +169,7 @@ impl<'a> TryFrom<ConnectionString<'a>> for KustoClient {
ConnectionString {
msi_auth: Some(true),
..
} => Arc::new(ImdsManagedIdentityCredential {}),
} => Arc::new(ImdsManagedIdentityCredential::default()),
ConnectionString {
az_cli: Some(true), ..
} => Arc::new(AzureCliCredential {}),
Expand Down
Loading

0 comments on commit 9e45fff

Please sign in to comment.