Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add blocking client #52

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,13 @@ jobs:
strategy:
max-parallel: 1
matrix:
runtime: [Tokio, async-std]
runtime: [Tokio, async-std, blocking]
environment: [development, staging]
include:
- runtime: async-std
flags: --no-default-features --features async-std,default-tls
- runtime: blocking
flags: --no-default-features --features blocking,default-tls --tests
- environment: development
url: TESTING_DEV_API_URL
token: TESTING_DEV_TOKEN
Expand Down
33 changes: 22 additions & 11 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ include = ["src/**/*.rs", "README.md", "LICENSE-APACHE", "LICENSE-MIT"]
resolver = "2"

[dependencies]
reqwest = { version = "0.11", default-features = false, features = ["json", "stream", "gzip", "blocking"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
chrono = { version = "0.4", features = ["serde"] }
Expand All @@ -25,19 +24,26 @@ bytes = "1"
flate2 = "1"
http = "0.2"
backoff = { version = "0.4", features = ["futures"] }
futures = "0.3"
tokio = { version = "1", optional = true, features = ["rt", "sync"] }
async-std = { version = "1", optional = true, features = ["tokio1"] }
url = "2"
tracing = { version = "0.1" }
tokio-stream = "0.1"
bitflags = "2"
maybe-async = "0.2.7"

# sync packages
ureq = { version = "2.7.1", optional = true, features = ["json"] }

# async packages
futures = { version = "0.3", optional = true }
async-trait = { version = "0.1", optional = true }
tokio = { version = "1", optional = true, features = ["rt", "sync"] }
tokio-stream = { version = "0.1", optional = true }
async-std = { version = "1", optional = true, features = ["tokio1"] }
reqwest = { version = "0.11", optional = true, default-features = false, features = ["json", "stream", "gzip", "blocking"] }

[dev-dependencies]
tokio = { version = "1", features = ["full"] }
async-std = { version = "1", features = ["attributes"] }
serde_test = "1"
test-context = "0.1"
async-trait = "0.1"
futures-util = "0.3"
httpmock = "0.6"
Expand All @@ -46,8 +52,13 @@ tracing-subscriber = { version = "0.3", features = ["ansi", "env-filter"] }

[features]
default = ["tokio", "default-tls"]
tokio = ["backoff/tokio", "dep:tokio"]
async-std = ["backoff/async-std", "dep:async-std"]
default-tls = ["reqwest/default-tls"]
native-tls = ["reqwest/native-tls"]
rustls-tls = ["reqwest/rustls-tls"]
tokio = ["backoff/tokio", "dep:tokio", "futures", "async-trait", "tokio-stream", "reqwest"]
async-std = ["backoff/async-std", "dep:async-std", "futures", "async-trait", "tokio-stream", "reqwest"]
default-tls = ["reqwest/default-tls", "ureq/tls"]
native-tls = ["reqwest/native-tls", "ureq/native-tls"]
rustls-tls = ["reqwest/rustls-tls", "ureq/rustls"]
blocking = ["ureq", "maybe-async/is_sync"]

[package.metadata.docs.rs]
all-features = true
rustdoc-args = ["--cfg", "docsrs"]
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@ that can be enabled or disabled:
- **native-tls**: Enables TLS functionality provided by `native-tls`.
- **rustls-tls**: Enables TLS functionality provided by `rustls`.
- **tokio** _(enabled by default)_: Enables the usage with the `tokio` runtime.
- **async-std** : Enables the usage with the `async-std` runtime.
- **async-std**: Enables the usage with the `async-std` runtime.
- **blocking**: Provides a sync client.

## License

Expand Down
61 changes: 37 additions & 24 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,17 @@
use async_std::task::spawn_blocking;
use bytes::Bytes;
use flate2::{write::GzEncoder, Compression};
#[cfg(not(feature = "blocking"))]
use futures::Stream;
use reqwest::header;
use http::header;
use maybe_async::{async_impl, maybe_async};
use serde::Serialize;
use std::{
env, fmt::Debug as FmtDebug, io::Write, result::Result as StdResult,
time::Duration as StdDuration,
};
#[cfg(not(feature = "blocking"))]
use std::time::Duration as StdDuration;
use std::{env, fmt::Debug as FmtDebug, io::Write};
#[cfg(feature = "tokio")]
use tokio::task::spawn_blocking;
#[cfg(not(feature = "blocking"))]
use tokio_stream::StreamExt;
use tracing::instrument;

Expand All @@ -21,7 +23,7 @@ use crate::{
LegacyQueryResult, Query, QueryOptions, QueryParams, QueryResult,
},
error::{Error, Result},
http::{self, HeaderMap},
http::{Client as HttpClient, HeaderMap},
is_personal_token, users,
};

Expand Down Expand Up @@ -53,7 +55,7 @@ static API_URL: &str = "https://api.axiom.co";
/// ```
#[derive(Debug, Clone)]
pub struct Client {
http_client: http::Client,
http_client: HttpClient,

url: String,
pub datasets: datasets::Client,
Expand All @@ -78,12 +80,13 @@ impl Client {
}

/// Get client version.
pub async fn version(&self) -> String {
pub fn version(&self) -> String {
env!("CARGO_PKG_VERSION").to_string()
}

/// Executes the given query specified using the Axiom Processing Language (APL).
/// To learn more about APL, see the APL documentation at https://www.axiom.co/docs/apl/introduction.
#[maybe_async]
#[instrument(skip(self, opts))]
pub async fn query<S, O>(&self, apl: S, opts: O) -> Result<QueryResult>
where
Expand Down Expand Up @@ -122,20 +125,17 @@ impl Client {
let res = self.http_client.post(path, &req).await?;

let saved_query_id = res
.headers()
.get("X-Axiom-History-Query-Id")
.map(|s| s.to_str())
.transpose()
.map_err(|_e| Error::InvalidQueryId)?
.get_header("X-Axiom-History-Query-Id")
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not a thing to be solved in this diff, but here's a Rustacean thought: How I'd like this to be a type that represents the id there.
Something like an

enum AxiomId {
  Nano(str),
  Fallback(uuid::Uuid)
} 

impl From<AxiomId> for String {} 

impl TryFrom<String> for AxiomId{}

I'm not a fan of passing strings around like it's Python 2 in 2010 :)

.map(|s| s.to_string());

let mut result = res.json::<QueryResult>().await?;
let mut result: QueryResult = res.json::<QueryResult>().await?;
result.saved_query_id = saved_query_id;

Ok(result)
}

/// Execute the given query on the dataset identified by its id.
#[maybe_async]
#[instrument(skip(self, opts))]
#[deprecated(
since = "0.6.0",
Expand All @@ -162,13 +162,9 @@ impl Client {
let res = self.http_client.post(path, &query).await?;

let saved_query_id = res
.headers()
.get("X-Axiom-History-Query-Id")
.map(|s| s.to_str())
.transpose()
.map_err(|_e| Error::InvalidQueryId)?
.get_header("X-Axiom-History-Query-Id")
.map(|s| s.to_string());
let mut result = res.json::<LegacyQueryResult>().await?;
let mut result: LegacyQueryResult = res.json::<LegacyQueryResult>().await?;
result.saved_query_id = saved_query_id;

Ok(result)
Expand All @@ -177,6 +173,7 @@ impl Client {
/// Ingest events into the dataset identified by its id.
/// Restrictions for field names (JSON object keys) can be reviewed here:
/// <https://www.axiom.co/docs/usage/field-restrictions>.
#[maybe_async]
#[instrument(skip(self, events))]
pub async fn ingest<N, I, E>(&self, dataset_name: N, events: I) -> Result<IngestStatus>
where
Expand All @@ -189,13 +186,24 @@ impl Client {
.map(|event| serde_json::to_vec(&event).map_err(Error::Serialize))
.collect();
let json_payload = json_lines?.join(&b"\n"[..]);

#[cfg(not(feature = "blocking"))]
let payload = spawn_blocking(move || {
let mut gzip_payload = GzEncoder::new(Vec::new(), Compression::default());
gzip_payload.write_all(&json_payload)?;
gzip_payload.finish()
})
.await;
#[cfg(feature = "tokio")]
#[cfg(feature = "blocking")]
let payload = {
let mut gzip_payload = GzEncoder::new(Vec::new(), Compression::default());
gzip_payload
.write_all(&json_payload)
.map_err(Error::Encoding)?;
gzip_payload.finish()
};

#[cfg(all(feature = "tokio", not(feature = "blocking")))]
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm. I'd done this before, and I always considered it a code smell.
From the docs :

There are rare cases where features may be mutually incompatible with one another. This should be avoided if at all possible, because it requires coordinating all uses of the package in the dependency graph to cooperate to avoid enabling them together.

I'd like to see how the maybe_async approach works in practice to form a more informed opinion though. Initially, I think I'd find it weird that the function signatures changed under me by flipping a feature flag.
I'd also wonder about testing - typically tests make the most sense when run with --all-features, so this wonuld leave some paths untested.
Again, I'd like to see this in practice :)

let payload = payload.map_err(Error::JoinError)?;
let payload = payload.map_err(Error::Encoding)?;

Expand All @@ -211,6 +219,7 @@ impl Client {
/// Ingest data into the dataset identified by its id.
/// Restrictions for field names (JSON object keys) can be reviewed here:
/// <https://www.axiom.co/docs/usage/field-restrictions>.
#[maybe_async]
#[instrument(skip(self, payload))]
pub async fn ingest_bytes<N, P>(
&self,
Expand Down Expand Up @@ -243,6 +252,8 @@ impl Client {
/// with a backoff.
/// Restrictions for field names (JSON object keys) can be reviewed here:
/// <https://www.axiom.co/docs/usage/field-restrictions>.
#[async_impl]
#[cfg_attr(docsrs, doc(cfg(any(feature = "tokio", feature = "async-std"))))]
#[instrument(skip(self, stream))]
pub async fn ingest_stream<N, S, E>(&self, dataset_name: N, stream: S) -> Result<IngestStatus>
where
Expand All @@ -261,6 +272,8 @@ impl Client {
}

/// Like [`Client::ingest_stream`], but takes a stream that contains results.
#[async_impl]
#[cfg_attr(docsrs, doc(cfg(any(feature = "tokio", feature = "async-std"))))]
#[instrument(skip(self, stream))]
pub async fn try_ingest_stream<N, S, I, E>(
&self,
Expand All @@ -269,15 +282,15 @@ impl Client {
) -> Result<IngestStatus>
where
N: Into<String> + FmtDebug,
S: Stream<Item = StdResult<I, E>> + Send + Sync + 'static,
S: Stream<Item = std::result::Result<I, E>> + Send + Sync + 'static,
I: Serialize,
E: std::error::Error + Send + Sync + 'static,
{
let dataset_name = dataset_name.into();
let mut chunks = Box::pin(stream.chunks_timeout(1000, StdDuration::from_secs(1)));
let mut ingest_status = IngestStatus::default();
while let Some(events) = chunks.next().await {
let events: StdResult<Vec<I>, E> = events.into_iter().collect();
let events: std::result::Result<Vec<I>, E> = events.into_iter().collect();
match events {
Ok(events) => {
let new_ingest_status = self.ingest(dataset_name.clone(), events).await?;
Expand Down Expand Up @@ -367,7 +380,7 @@ impl Builder {
return Err(Error::MissingOrgId);
}

let http_client = http::Client::new(url.clone(), token, org_id)?;
let http_client = HttpClient::new(url.clone(), token, org_id)?;

Ok(Client {
http_client: http_client.clone(),
Expand Down
8 changes: 8 additions & 0 deletions src/datasets/client.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use maybe_async::maybe_async;
use std::{
convert::{TryFrom, TryInto},
fmt::Debug as FmtDebug,
Expand Down Expand Up @@ -27,6 +28,7 @@ impl Client {
}

/// Create a dataset with the given name and description.
#[maybe_async]
#[instrument(skip(self))]
pub async fn create<N, D>(&self, dataset_name: N, description: D) -> Result<Dataset>
where
Expand All @@ -45,6 +47,7 @@ impl Client {
}

/// Delete the dataset with the given ID.
#[maybe_async]
#[instrument(skip(self))]
pub async fn delete<N>(&self, dataset_name: N) -> Result<()>
where
Expand All @@ -56,6 +59,7 @@ impl Client {
}

/// Get a dataset by its id.
#[maybe_async]
#[instrument(skip(self))]
pub async fn get<N>(&self, dataset_name: N) -> Result<Dataset>
where
Expand All @@ -69,6 +73,7 @@ impl Client {
}

/// Retrieve the information of the dataset identified by its id.
#[maybe_async]
#[instrument(skip(self))]
#[deprecated(
since = "0.8.0",
Expand All @@ -86,6 +91,7 @@ impl Client {
}

/// List all available datasets.
#[maybe_async]
#[instrument(skip(self))]
pub async fn list(&self) -> Result<Vec<Dataset>> {
self.http_client.get("/v1/datasets").await?.json().await
Expand All @@ -96,6 +102,7 @@ impl Client {
/// Older ones will be deleted from the dataset.
/// The duration can either be a [`std::time::Duration`] or a
/// [`chrono::Duration`].
#[maybe_async]
#[instrument(skip(self))]
#[allow(deprecated)]
pub async fn trim<N, D>(&self, dataset_name: N, duration: D) -> Result<TrimResult>
Expand All @@ -113,6 +120,7 @@ impl Client {
}

/// Update a dataset.
#[maybe_async]
#[instrument(skip(self))]
pub async fn update<N, D>(&self, dataset_name: N, new_description: D) -> Result<Dataset>
where
Expand Down
29 changes: 26 additions & 3 deletions src/error.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! Error type definitions.

use serde::Deserialize;
use std::fmt;
use std::{fmt, io};
use thiserror::Error;

use crate::limits::Limits;
Expand All @@ -21,12 +21,21 @@ pub enum Error {
InvalidToken,
#[error("Invalid Org ID (make sure there are no invalid characters)")]
InvalidOrgId,
#[cfg(not(feature = "blocking"))]
#[error("Failed to setup HTTP client: {0}")]
HttpClientSetup(reqwest::Error),
#[cfg(not(feature = "blocking"))]
#[error("Failed to deserialize response: {0}")]
Deserialize(reqwest::Error),
#[cfg(feature = "blocking")]
#[error("Failed to deserialize response: {0}")]
Deserialize(io::Error),
#[cfg(not(feature = "blocking"))]
#[error("Http error: {0}")]
Http(reqwest::Error),
#[cfg(feature = "blocking")]
#[error("Http error: {0}")]
Http(ureq::Error),
#[error(transparent)]
Axiom(AxiomError),
#[error("Query ID contains invisible characters (this is a server error)")]
Expand All @@ -36,10 +45,10 @@ pub enum Error {
#[error(transparent)]
Serialize(#[from] serde_json::Error),
#[error("Failed to encode payload: {0}")]
Encoding(std::io::Error),
Encoding(io::Error),
#[error("Duration is out of range (can't be larger than i64::MAX milliseconds)")]
DurationOutOfRange,
#[cfg(feature = "tokio")]
#[cfg(all(feature = "tokio", not(feature = "blocking")))]
#[error("Failed to join thread: {0}")]
JoinError(tokio::task::JoinError),
#[error("Rate limit exceeded for the {scope} scope: {limits}")]
Expand All @@ -60,6 +69,7 @@ pub enum Error {

/// This is the manual implementation. We don't really care if the error is
/// permanent or transient at this stage so we just return Error::Http.
#[cfg(not(feature = "blocking"))]
impl From<backoff::Error<reqwest::Error>> for Error {
fn from(err: backoff::Error<reqwest::Error>) -> Self {
match err {
Expand All @@ -72,6 +82,19 @@ impl From<backoff::Error<reqwest::Error>> for Error {
}
}

#[cfg(feature = "blocking")]
impl From<backoff::Error<ureq::Error>> for Error {
fn from(err: backoff::Error<ureq::Error>) -> Self {
match err {
backoff::Error::Permanent(err) => Error::Http(err),
backoff::Error::Transient {
err,
retry_after: _,
} => Error::Http(err),
}
}
}

/// An error returned by the Axiom API.
#[derive(Deserialize, Debug)]
pub struct AxiomError {
Expand Down
Loading