Skip to content

Commit

Permalink
feat: add optional timeout param to pull and acknowledge
Browse files Browse the repository at this point in the history
  • Loading branch information
Heiko Seeberger committed Jan 2, 2022
1 parent 8ab8298 commit 842240e
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 18 deletions.
4 changes: 2 additions & 2 deletions examples/transform_versioned.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ async fn run() -> Result<(), Error> {
)?;

let msg_envelopes: Vec<Result<MessageEnvelope<Message>, Error>> = pub_sub_client
.pull_with_transform(SUBSCRIPTION, 42, transform)
.pull_with_transform(SUBSCRIPTION, 42, Some(Duration::from_secs(45)), transform)
.await?;

for msg_envelope in msg_envelopes {
Expand All @@ -40,7 +40,7 @@ async fn run() -> Result<(), Error> {
m.id, m.message, m.delivery_attempt
);
match pub_sub_client
.acknowledge(SUBSCRIPTION, vec![&m.ack_id])
.acknowledge(SUBSCRIPTION, vec![&m.ack_id], Some(Duration::from_secs(10)))
.await
{
Ok(_) => println!("Successfully acknowledged"),
Expand Down
39 changes: 27 additions & 12 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,22 +167,26 @@ impl PubSubClient {
&self,
subscription_id: &str,
max_messages: u32,
timeout: Option<Duration>,
) -> Result<Vec<Result<MessageEnvelope<M>, Error>>, Error> {
self.pull_with_transform(subscription_id, max_messages, |_, value| Ok(value))
self.pull_with_transform(subscription_id, max_messages, timeout, |_, value| Ok(value))
.await
}

pub async fn pull_with_transform<M, T>(
&self,
subscription_id: &str,
max_messages: u32,
timeout: Option<Duration>,
transform: T,
) -> Result<Vec<Result<MessageEnvelope<M>, Error>>, Error>
where
M: DeserializeOwned,
T: Fn(&ReceivedMessage, Value) -> Result<Value, Error>,
{
let received_messages = self.pull_raw(subscription_id, max_messages).await?;
let received_messages = self
.pull_raw(subscription_id, max_messages, timeout)
.await?;
let messages = deserialize(received_messages, transform);
Ok(messages)
}
Expand All @@ -191,13 +195,14 @@ impl PubSubClient {
&self,
subscription_id: &str,
max_messages: u32,
timeout: Option<Duration>,
) -> Result<Vec<ReceivedMessage>, Error> {
let url = format!(
"{}/v1/projects/{}/subscriptions/{}:pull",
self.base_url, self.project_id, subscription_id
);
let request = PullRequest { max_messages };
let response = self.send_request(&url, &request).await?;
let response = self.send_request(&url, &request, timeout).await?;

if !response.status().is_success() {
return Err(Error::unexpected_http_status_code_from_response(response).await);
Expand All @@ -217,13 +222,14 @@ impl PubSubClient {
&self,
subscription_id: &str,
ack_ids: Vec<&str>,
timeout: Option<Duration>,
) -> Result<(), Error> {
let url = format!(
"{}/v1/projects/{}/subscriptions/{}:acknowledge",
self.base_url, self.project_id, subscription_id
);
let request = AcknowledgeRequest { ack_ids };
let response = self.send_request(&url, &request).await?;
let response = self.send_request(&url, &request, timeout).await?;

if !response.status().is_success() {
return Err(Error::unexpected_http_status_code_from_response(response).await);
Expand All @@ -232,17 +238,26 @@ impl PubSubClient {
Ok(())
}

async fn send_request<R: Serialize>(&self, url: &str, request: &R) -> Result<Response, Error> {
async fn send_request<R: Serialize>(
&self,
url: &str,
request: &R,
timeout: Option<Duration>,
) -> Result<Response, Error> {
let token = self
.token_fetcher
.fetch_token()
.await
.map_err(|source| Error::TokenFetch { source })?;
let response = self

let request = self
.reqwest_client
.post(url)
.bearer_auth(token.access_token())
.json(request)
.json(request);
let request = timeout.into_iter().fold(request, |r, t| r.timeout(t));

let response = request
.send()
.await
.map_err(|source| Error::HttpServiceCommunication { source })?;
Expand Down Expand Up @@ -300,13 +315,13 @@ mod tests {
}

#[test]
fn new_ok() {
fn test_new_ok() {
let result = PubSubClient::new("tests/valid_key.json", Duration::from_secs(30));
assert!(result.is_ok());
}

#[test]
fn new_err_non_existent_key() {
fn test_new_err_non_existent_key() {
let result = PubSubClient::new("non_existent", Duration::from_secs(30));
assert!(result.is_err());
match result.unwrap_err() {
Expand All @@ -322,7 +337,7 @@ mod tests {
}

#[test]
fn new_err_invalid_key() {
fn test_new_err_invalid_key() {
let result = PubSubClient::new("Cargo.toml", Duration::from_secs(30));
assert!(result.is_err());
match result.unwrap_err() {
Expand All @@ -338,7 +353,7 @@ mod tests {
}

#[test]
fn new_err_invalid_private_key() {
fn test_new_err_invalid_private_key() {
let result = PubSubClient::new("tests/invalid_key.json", Duration::from_secs(30));
assert!(result.is_err());
match result.unwrap_err() {
Expand All @@ -351,7 +366,7 @@ mod tests {
}

#[test]
fn messages_from_pull_response_ok() {
fn test_deserialize_ok() {
let received_messages = vec![
ReceivedMessage {
ack_id: "ack_id".to_string(),
Expand Down
18 changes: 14 additions & 4 deletions tests/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,9 @@ async fn test() {
let pub_sub_client = pub_sub_client.unwrap();

// Pull
let response = pub_sub_client.pull::<Message>(SUBSCRIPTION_ID, 42).await;
let response = pub_sub_client
.pull::<Message>(SUBSCRIPTION_ID, 42, Some(Duration::from_secs(45)))
.await;
assert!(response.is_ok());
let response = response.unwrap();
assert_eq!(response.len(), 3);
Expand Down Expand Up @@ -114,11 +116,15 @@ async fn test() {

// Acknowledge
let ack_ids = vec![ack_id_1, ack_id_2];
let response = pub_sub_client.acknowledge(SUBSCRIPTION_ID, ack_ids).await;
let response = pub_sub_client
.acknowledge(SUBSCRIPTION_ID, ack_ids, Some(Duration::from_secs(10)))
.await;
assert!(response.is_ok());

// Pull again
let response = pub_sub_client.pull::<Message>(SUBSCRIPTION_ID, 42).await;
let response = pub_sub_client
.pull::<Message>(SUBSCRIPTION_ID, 42, Some(Duration::from_secs(45)))
.await;
assert!(response.is_ok());
let response = response.unwrap();
assert_eq!(response.len(), 1);
Expand All @@ -128,7 +134,11 @@ async fn test() {

// Acknowledge with invalid ACK ID
let response = pub_sub_client
.acknowledge(SUBSCRIPTION_ID, vec!["invalid"])
.acknowledge(
SUBSCRIPTION_ID,
vec!["invalid"],
Some(Duration::from_secs(10)),
)
.await;
assert!(response.is_err());
}

0 comments on commit 842240e

Please sign in to comment.