From 5d05198cb4985d3e1b509f70967d9cd9dca374c9 Mon Sep 17 00:00:00 2001 From: Sandeep Date: Fri, 22 May 2020 19:05:32 +0530 Subject: [PATCH 01/19] fix tokio dependency. Signed-off-by: Sandeep --- benches/benchmark.rs | 2 +- controller-client/src/cli.rs | 2 +- controller-client/src/lib.rs | 51 ++++--- controller-client/src/main.rs | 2 +- controller-client/src/test.rs | 4 +- integration_test/src/controller_tests.rs | 2 +- integration_test/src/disconnection_tests.rs | 4 +- .../src/event_stream_writer_tests.rs | 2 +- integration_test/src/tablemap_tests.rs | 2 +- integration_test/src/wirecommand_tests.rs | 130 +++++++++++------- src/client_factory.rs | 4 +- 11 files changed, 114 insertions(+), 91 deletions(-) diff --git a/benches/benchmark.rs b/benches/benchmark.rs index 51d157f4b..a6a5c5f1b 100644 --- a/benches/benchmark.rs +++ b/benches/benchmark.rs @@ -196,7 +196,7 @@ fn mock_connection_no_block(c: &mut Criterion) { async fn set_up(config: ClientConfig) -> EventStreamWriter { let scope_name = Scope::new("testWriterPerf".into()); let stream_name = Stream::new("testWriterPerf".into()); - let client_factory = ClientFactory::new(config.clone()); + let client_factory = ClientFactory::new(config.clone()).await; let controller_client = client_factory.get_controller_client(); create_scope_stream(controller_client, &scope_name, &stream_name, 1).await; let scoped_stream = ScopedStream { diff --git a/controller-client/src/cli.rs b/controller-client/src/cli.rs index 8d07b08c8..39e08deb2 100644 --- a/controller-client/src/cli.rs +++ b/controller-client/src/cli.rs @@ -78,7 +78,7 @@ fn main() { .build() .expect("creating config"); // create a controller client. - let controller_client = ControllerClientImpl::new(config); + let controller_client = rt.block_on(ControllerClientImpl::new(config)); match opt.cmd { Command::CreateScope { scope_name } => { let scope_result = rt.block_on(controller_client.create_scope(&Scope::new(scope_name))); diff --git a/controller-client/src/lib.rs b/controller-client/src/lib.rs index 4d1656d20..c6cf04180 100644 --- a/controller-client/src/lib.rs +++ b/controller-client/src/lib.rs @@ -229,7 +229,7 @@ pub struct ControllerClientImpl { channel: RwLock>, } -fn get_channel(config: &ClientConfig) -> Channel { +async fn get_channel(config: &ClientConfig) -> Channel { const HTTP_PREFIX: &str = "http://"; // Placeholder to add authentication headers. @@ -243,8 +243,7 @@ fn get_channel(config: &ClientConfig) -> Channel { let iterable_endpoints = (0..config.max_controller_connections).map(|_a| Channel::builder(uri_result.clone())); - - Channel::balance_list(iterable_endpoints) + async { Channel::balance_list(iterable_endpoints) }.await } #[allow(unused_variables)] @@ -275,7 +274,7 @@ impl ControllerClient for ControllerClientImpl { error_msg: "Operation failed".into(), }), }, - Err(status) => Err(self.map_grpc_error(operation_name, status)), + Err(status) => Err(self.map_grpc_error(operation_name, status).await), } } @@ -306,7 +305,7 @@ impl ControllerClient for ControllerClientImpl { error_msg: "Operation failed".into(), }), }, - Err(status) => Err(self.map_grpc_error(operation_name, status)), + Err(status) => Err(self.map_grpc_error(operation_name, status).await), } } @@ -336,7 +335,7 @@ impl ControllerClient for ControllerClientImpl { error_msg: "Operation failed".into(), }), }, - Err(status) => Err(self.map_grpc_error(operation_name, status)), + Err(status) => Err(self.map_grpc_error(operation_name, status).await), } } @@ -365,7 +364,7 @@ impl ControllerClient for ControllerClientImpl { error_msg: "Operation failed".into(), }), }, - Err(status) => Err(self.map_grpc_error(operation_name, status)), + Err(status) => Err(self.map_grpc_error(operation_name, status).await), } } @@ -394,7 +393,7 @@ impl ControllerClient for ControllerClientImpl { error_msg: "Operation failed".into(), }), }, - Err(status) => Err(self.map_grpc_error(operation_name, status)), + Err(status) => Err(self.map_grpc_error(operation_name, status).await), } } @@ -423,7 +422,7 @@ impl ControllerClient for ControllerClientImpl { error_msg: "Operation failed".into(), }), }, - Err(status) => Err(self.map_grpc_error(operation_name, status)), + Err(status) => Err(self.map_grpc_error(operation_name, status).await), } } @@ -453,7 +452,7 @@ impl ControllerClient for ControllerClientImpl { error_msg: "Operation failed".into(), }), }, - Err(status) => Err(self.map_grpc_error(operation_name, status)), + Err(status) => Err(self.map_grpc_error(operation_name, status).await), } } @@ -466,7 +465,7 @@ impl ControllerClient for ControllerClientImpl { let operation_name = "getCurrentSegments"; match op_status { Ok(segment_ranges) => Ok(StreamSegments::from(segment_ranges.into_inner())), - Err(status) => Err(self.map_grpc_error(operation_name, status)), + Err(status) => Err(self.map_grpc_error(operation_name, status).await), } } @@ -483,7 +482,7 @@ impl ControllerClient for ControllerClientImpl { let operation_name = "createTransaction"; match op_status { Ok(create_txn_response) => Ok(TxnSegments::from(create_txn_response.into_inner())), - Err(status) => Err(self.map_grpc_error(operation_name, status)), + Err(status) => Err(self.map_grpc_error(operation_name, status).await), } } @@ -535,7 +534,7 @@ impl ControllerClient for ControllerClientImpl { error_msg: "Operation failed".into(), }), }, - Err(status) => Err(self.map_grpc_error(operation_name, status)), + Err(status) => Err(self.map_grpc_error(operation_name, status).await), } } @@ -577,7 +576,7 @@ impl ControllerClient for ControllerClientImpl { error_msg: "Operation failed".into(), }), }, - Err(status) => Err(self.map_grpc_error(operation_name, status)), + Err(status) => Err(self.map_grpc_error(operation_name, status).await), } } @@ -613,7 +612,7 @@ impl ControllerClient for ControllerClientImpl { error_msg: "Operation failed".into(), }), }, - Err(status) => Err(self.map_grpc_error(operation_name, status)), + Err(status) => Err(self.map_grpc_error(operation_name, status).await), } } @@ -647,7 +646,7 @@ impl ControllerClient for ControllerClientImpl { error_msg: "Operation failed".into(), }), }, - Err(status) => Err(self.map_grpc_error(operation_name, status)), + Err(status) => Err(self.map_grpc_error(operation_name, status).await), } } @@ -659,7 +658,7 @@ impl ControllerClient for ControllerClientImpl { let operation_name = "get_endpoint"; match op_status { Ok(response) => Ok(response.into_inner()), - Err(status) => Err(self.map_grpc_error(operation_name, status)), + Err(status) => Err(self.map_grpc_error(operation_name, status).await), } .map(PravegaNodeUri::from) } @@ -685,7 +684,7 @@ impl ControllerClient for ControllerClientImpl { let operation_name = "get_successors_segment"; match op_status { Ok(response) => Ok(response.into_inner()), - Err(status) => Err(self.map_grpc_error(operation_name, status)), + Err(status) => Err(self.map_grpc_error(operation_name, status).await), } .map(StreamSegmentsWithPredecessors::from) } @@ -734,7 +733,7 @@ impl ControllerClient for ControllerClientImpl { }), } } - Err(status) => Err(self.map_grpc_error(operation_name, status)), + Err(status) => Err(self.map_grpc_error(operation_name, status).await), } } @@ -767,7 +766,7 @@ impl ControllerClient for ControllerClientImpl { error_msg: "Operation failed".into(), }), }, - Err(status) => Err(self.map_grpc_error(operation_name, status)), + Err(status) => Err(self.map_grpc_error(operation_name, status).await), } } } @@ -778,9 +777,9 @@ impl ControllerClientImpl { /// The requests will be load balanced across multiple connections and every connection supports /// multiplexing of requests. /// - pub fn new(config: ClientConfig) -> Self { + pub async fn new(config: ClientConfig) -> Self { // actual connection is established lazily. - let ch = get_channel(&config); + let ch = get_channel(&config).await; ControllerClientImpl { config, channel: RwLock::new(ControllerServiceClient::new(ch)), @@ -791,8 +790,8 @@ impl ControllerClientImpl { /// reset method needs to be invoked in the case of ConnectionError. /// This logic can be removed once https://github.com/tower-rs/tower/issues/383 is fixed. /// - pub fn reset(&self) { - let ch = get_channel(&self.config); + pub async fn reset(&self) { + let ch = get_channel(&self.config).await; let mut x = self.channel.write().unwrap(); *x = ControllerServiceClient::new(ch); } @@ -808,7 +807,7 @@ impl ControllerClientImpl { } // Method used to translate grpc errors to ControllerError. - fn map_grpc_error(&self, operation_name: &str, status: Status) -> ControllerError { + async fn map_grpc_error(&self, operation_name: &str, status: Status) -> ControllerError { match status.code() { Code::InvalidArgument | Code::NotFound @@ -822,7 +821,7 @@ impl ControllerClientImpl { error_msg: status.to_string(), }, Code::Unknown => { - self.reset(); + self.reset().await; ControllerError::ConnectionError { can_retry: true, error_msg: status.to_string(), diff --git a/controller-client/src/main.rs b/controller-client/src/main.rs index fb5237848..59257e0b7 100644 --- a/controller-client/src/main.rs +++ b/controller-client/src/main.rs @@ -24,7 +24,7 @@ async fn main() -> std::result::Result<(), Box> .build() .expect("creating config"); // start Pravega standalone before invoking this function. - let controller_client = ControllerClientImpl::new(config); + let controller_client = ControllerClientImpl::new(config).await; let scope_name = Scope::new("testScope123".into()); let stream_name = Stream::new("testStream".into()); diff --git a/controller-client/src/test.rs b/controller-client/src/test.rs index 9fe509c91..4d4e7484c 100644 --- a/controller-client/src/test.rs +++ b/controller-client/src/test.rs @@ -19,7 +19,7 @@ async fn test_create_scope_error() { .build() .expect("build client config"); - let client = ControllerClientImpl::new(config); + let client = ControllerClientImpl::new(config).await; let request = Scope::new("testScope124".into()); let create_scope_result = client.create_scope(&request).await; @@ -40,7 +40,7 @@ async fn test_create_stream_error() { .controller_uri("127.0.0.1:9090".parse::().unwrap()) .build() .expect("build client config"); - let client = ControllerClientImpl::new(config); + let client = ControllerClientImpl::new(config).await; let request = StreamConfiguration { scoped_stream: ScopedStream { diff --git a/integration_test/src/controller_tests.rs b/integration_test/src/controller_tests.rs index 9e545b438..f156234e6 100644 --- a/integration_test/src/controller_tests.rs +++ b/integration_test/src/controller_tests.rs @@ -19,7 +19,7 @@ pub async fn test_controller_apis() { .controller_uri(TEST_CONTROLLER_URI) .build() .expect("creating config"); - let client_factory = ClientFactory::new(config.clone()); + let client_factory = ClientFactory::new(config.clone()).await; let controller = client_factory.get_controller_client(); let scope_name = Scope::new("testScope123".into()); diff --git a/integration_test/src/disconnection_tests.rs b/integration_test/src/disconnection_tests.rs index 42544266b..46cdf2762 100644 --- a/integration_test/src/disconnection_tests.rs +++ b/integration_test/src/disconnection_tests.rs @@ -86,7 +86,7 @@ async fn test_retry_while_start_pravega() { .controller_uri(controller_uri) .build() .expect("build client config"); - let controller_client = ControllerClientImpl::new(config); + let controller_client = ControllerClientImpl::new(config).await; let scope_name = Scope::new("retryScope".into()); @@ -143,7 +143,7 @@ async fn test_retry_with_unexpected_reply() { .build() .expect("build client config"); - let controller_client = ControllerClientImpl::new(config); + let controller_client = ControllerClientImpl::new(config).await; //Get the endpoint. let segment_name = ScopedSegment { diff --git a/integration_test/src/event_stream_writer_tests.rs b/integration_test/src/event_stream_writer_tests.rs index f17da29d5..38bc5043d 100644 --- a/integration_test/src/event_stream_writer_tests.rs +++ b/integration_test/src/event_stream_writer_tests.rs @@ -36,7 +36,7 @@ pub async fn test_event_stream_writer() { .controller_uri(TEST_CONTROLLER_URI) .build() .expect("creating config"); - let client_factory = ClientFactory::new(config.clone()); + let client_factory = ClientFactory::new(config.clone()).await; let controller_client = client_factory.get_controller_client(); create_scope_stream(controller_client, &scope_name, &stream_name, 1).await; diff --git a/integration_test/src/tablemap_tests.rs b/integration_test/src/tablemap_tests.rs index ae63a1da4..502c4dcec 100644 --- a/integration_test/src/tablemap_tests.rs +++ b/integration_test/src/tablemap_tests.rs @@ -27,7 +27,7 @@ pub async fn test_tablemap() { .build() .expect("creating config"); - let client_factory = ClientFactory::new(config.clone()); + let client_factory = ClientFactory::new(config.clone()).await; test_single_key_operations(&client_factory).await; test_multiple_key_operations(&client_factory).await; test_multiple_key_remove_operations(&client_factory).await; diff --git a/integration_test/src/wirecommand_tests.rs b/integration_test/src/wirecommand_tests.rs index 56950ce01..0ac01c310 100644 --- a/integration_test/src/wirecommand_tests.rs +++ b/integration_test/src/wirecommand_tests.rs @@ -39,61 +39,85 @@ lazy_static! { let manager = SegmentConnectionManager::new(cf, CONFIG.max_connections_in_pool); ConnectionPool::new(manager) }; - static ref CONTROLLER_CLIENT: ControllerClientImpl = { ControllerClientImpl::new(CONFIG.clone()) }; } pub async fn wirecommand_test_wrapper() { - let timeout_second = time::Duration::from_secs(30); + let controller: ControllerClientImpl = ControllerClientImpl::new(CONFIG.clone()).await; - timeout(timeout_second, test_hello()).await.unwrap(); + let timeout_second = time::Duration::from_secs(30); - timeout(timeout_second, test_keep_alive()).await.unwrap(); + timeout(timeout_second, test_hello(&controller)).await.unwrap(); - timeout(timeout_second, test_setup_append()).await.unwrap(); + timeout(timeout_second, test_keep_alive(&controller)) + .await + .unwrap(); - timeout(timeout_second, test_create_segment()).await.unwrap(); + timeout(timeout_second, test_setup_append(&controller)) + .await + .unwrap(); - timeout(timeout_second, test_update_and_get_segment_attribute()) + timeout(timeout_second, test_create_segment(&controller)) .await .unwrap(); - timeout(timeout_second, test_get_stream_segment_info()) + timeout(timeout_second, test_update_and_get_segment_attribute(&controller)) .await .unwrap(); - timeout(timeout_second, test_seal_segment()).await.unwrap(); + timeout(timeout_second, test_get_stream_segment_info(&controller)) + .await + .unwrap(); - timeout(timeout_second, test_delete_segment()).await.unwrap(); + timeout(timeout_second, test_seal_segment(&controller)) + .await + .unwrap(); - timeout(timeout_second, test_conditional_append_and_read_segment()) + timeout(timeout_second, test_delete_segment(&controller)) .await .unwrap(); - timeout(timeout_second, test_update_segment_policy()) + timeout( + timeout_second, + test_conditional_append_and_read_segment(&controller), + ) + .await + .unwrap(); + + timeout(timeout_second, test_update_segment_policy(&controller)) .await .unwrap(); - timeout(timeout_second, test_merge_segment()).await.unwrap(); + timeout(timeout_second, test_merge_segment(&controller)) + .await + .unwrap(); - timeout(timeout_second, test_truncate_segment()).await.unwrap(); + timeout(timeout_second, test_truncate_segment(&controller)) + .await + .unwrap(); - timeout(timeout_second, test_update_table_entries()) + timeout(timeout_second, test_update_table_entries(&controller)) .await .unwrap(); - timeout(timeout_second, test_read_table_key()).await.unwrap(); + timeout(timeout_second, test_read_table_key(&controller)) + .await + .unwrap(); - timeout(timeout_second, test_read_table()).await.unwrap(); + timeout(timeout_second, test_read_table(&controller)) + .await + .unwrap(); - timeout(timeout_second, test_read_table_entries()).await.unwrap(); + timeout(timeout_second, test_read_table_entries(&controller)) + .await + .unwrap(); } -async fn test_hello() { +async fn test_hello(controller_client: &ControllerClientImpl) { let scope_name = Scope::new("testScope".into()); let stream_name = Stream::new("testStream".into()); // Create scope and stream - CONTROLLER_CLIENT + controller_client .create_scope(&scope_name) .await .expect("create scope"); @@ -114,7 +138,7 @@ async fn test_hello() { retention_param: 0, }, }; - CONTROLLER_CLIENT + controller_client .create_stream(&request) .await .expect("create stream"); @@ -124,7 +148,7 @@ async fn test_hello() { stream: stream_name.clone(), segment: Segment { number: 0 }, }; - let endpoint = CONTROLLER_CLIENT + let endpoint = controller_client .get_endpoint_for_segment(&segment_name) .await .expect("get endpoint for segment") @@ -150,7 +174,7 @@ async fn test_hello() { } // KeepAlive would not send back reply. -async fn test_keep_alive() { +async fn test_keep_alive(controller_client: &ControllerClientImpl) { let scope_name = Scope::new("testScope".into()); let stream_name = Stream::new("testStream".into()); let segment_name = ScopedSegment { @@ -159,7 +183,7 @@ async fn test_keep_alive() { segment: Segment { number: 0 }, }; - let endpoint = CONTROLLER_CLIENT + let endpoint = controller_client .get_endpoint_for_segment(&segment_name) .await .expect("get endpoint for segment") @@ -175,7 +199,7 @@ async fn test_keep_alive() { client_connection.write(&request).await.expect("send request"); } -async fn test_setup_append() { +async fn test_setup_append(controller_client: &ControllerClientImpl) { let scope_name = Scope::new("testScope".into()); let stream_name = Stream::new("testStream".into()); let segment_name = ScopedSegment { @@ -184,7 +208,7 @@ async fn test_setup_append() { segment: Segment { number: 0 }, }; - let endpoint = CONTROLLER_CLIENT + let endpoint = controller_client .get_endpoint_for_segment(&segment_name) .await .expect("get endpoint for segment") @@ -238,7 +262,7 @@ async fn test_setup_append() { .map_or_else(|e| panic!("failed to get reply: {}", e), |r| assert_eq!(reply, r)); } -async fn test_create_segment() { +async fn test_create_segment(controller_client: &ControllerClientImpl) { let scope_name = Scope::new("testScope".into()); let stream_name = Stream::new("testStream".into()); let segment_name = ScopedSegment { @@ -247,7 +271,7 @@ async fn test_create_segment() { segment: Segment { number: 1 }, }; - let endpoint = CONTROLLER_CLIENT + let endpoint = controller_client .get_endpoint_for_segment(&segment_name) .await .expect("get endpoint for segment") @@ -290,7 +314,7 @@ async fn test_create_segment() { ); } -async fn test_seal_segment() { +async fn test_seal_segment(controller_client: &ControllerClientImpl) { let scope_name = Scope::new("testScope".into()); let stream_name = Stream::new("testStream".into()); let segment_name = ScopedSegment { @@ -299,7 +323,7 @@ async fn test_seal_segment() { segment: Segment { number: 1 }, }; - let endpoint = CONTROLLER_CLIENT + let endpoint = controller_client .get_endpoint_for_segment(&segment_name) .await .expect("get endpoint for segment") @@ -325,7 +349,7 @@ async fn test_seal_segment() { .map_or_else(|e| panic!("failed to get reply: {}", e), |r| assert_eq!(reply, r)); } -async fn test_update_and_get_segment_attribute() { +async fn test_update_and_get_segment_attribute(controller_client: &ControllerClientImpl) { let scope_name = Scope::new("testScope".into()); let stream_name = Stream::new("testStream".into()); let segment_name = ScopedSegment { @@ -334,7 +358,7 @@ async fn test_update_and_get_segment_attribute() { segment: Segment { number: 0 }, }; - let endpoint = CONTROLLER_CLIENT + let endpoint = controller_client .get_endpoint_for_segment(&segment_name) .await .expect("get endpoint for segment") @@ -379,7 +403,7 @@ async fn test_update_and_get_segment_attribute() { .map_or_else(|e| panic!("failed to get reply: {}", e), |r| assert_eq!(reply, r)); } -async fn test_get_stream_segment_info() { +async fn test_get_stream_segment_info(controller_client: &ControllerClientImpl) { let scope_name = Scope::new("testScope".into()); let stream_name = Stream::new("testStream".into()); let stream = ScopedStream { @@ -388,7 +412,7 @@ async fn test_get_stream_segment_info() { }; //seal this stream. - CONTROLLER_CLIENT.seal_stream(&stream).await.expect("seal stream"); + controller_client.seal_stream(&stream).await.expect("seal stream"); let segment_name = ScopedSegment { scope: scope_name.clone(), @@ -396,7 +420,7 @@ async fn test_get_stream_segment_info() { segment: Segment { number: 0 }, }; - let endpoint = CONTROLLER_CLIENT + let endpoint = controller_client .get_endpoint_for_segment(&segment_name) .await .expect("get endpoint for segment") @@ -422,7 +446,7 @@ async fn test_get_stream_segment_info() { } } -async fn test_delete_segment() { +async fn test_delete_segment(controller_client: &ControllerClientImpl) { let scope_name = Scope::new("testScope".into()); let stream_name = Stream::new("testStream".into()); let segment_name = ScopedSegment { @@ -431,7 +455,7 @@ async fn test_delete_segment() { segment: Segment { number: 0 }, }; - let endpoint = CONTROLLER_CLIENT + let endpoint = controller_client .get_endpoint_for_segment(&segment_name) .await .expect("get endpoint for segment") @@ -455,7 +479,7 @@ async fn test_delete_segment() { .map_or_else(|e| panic!("failed to get reply: {}", e), |r| assert_eq!(reply, r)); } -async fn test_conditional_append_and_read_segment() { +async fn test_conditional_append_and_read_segment(controller_client: &ControllerClientImpl) { // create a segment. let scope_name = Scope::new("scope".into()); let stream_name = Stream::new("stream".into()); @@ -466,7 +490,7 @@ async fn test_conditional_append_and_read_segment() { segment: Segment { number: 0 }, }; - let endpoint = CONTROLLER_CLIENT + let endpoint = controller_client .get_endpoint_for_segment(&segment_name) .await .expect("get endpoint for segment") @@ -541,7 +565,7 @@ async fn test_conditional_append_and_read_segment() { .map_or_else(|e| panic!("failed to get reply: {}", e), |r| assert_eq!(reply, r)); } -async fn test_update_segment_policy() { +async fn test_update_segment_policy(controller_client: &ControllerClientImpl) { let scope_name = Scope::new("scope".into()); let stream_name = Stream::new("stream".into()); @@ -551,7 +575,7 @@ async fn test_update_segment_policy() { segment: Segment { number: 0 }, }; - let endpoint = CONTROLLER_CLIENT + let endpoint = controller_client .get_endpoint_for_segment(&segment_name) .await .expect("get endpoint for segment") @@ -578,7 +602,7 @@ async fn test_update_segment_policy() { .map_or_else(|e| panic!("failed to get reply: {}", e), |r| assert_eq!(reply, r)); } -async fn test_merge_segment() { +async fn test_merge_segment(controller_client: &ControllerClientImpl) { let scope_name = Scope::new("scope".into()); let stream_name = Stream::new("stream".into()); @@ -588,7 +612,7 @@ async fn test_merge_segment() { segment: Segment { number: 1 }, }; - let endpoint = CONTROLLER_CLIENT + let endpoint = controller_client .get_endpoint_for_segment(&segment_name) .await .expect("get endpoint for segment") @@ -668,7 +692,7 @@ async fn test_merge_segment() { .map_or_else(|e| panic!("failed to get reply: {}", e), |r| assert_eq!(reply, r)); } -async fn test_truncate_segment() { +async fn test_truncate_segment(controller_client: &ControllerClientImpl) { let scope_name = Scope::new("scope".into()); let stream_name = Stream::new("stream".into()); @@ -678,7 +702,7 @@ async fn test_truncate_segment() { segment: Segment { number: 0 }, }; - let endpoint = CONTROLLER_CLIENT + let endpoint = controller_client .get_endpoint_for_segment(&segment_name) .await .expect("get endpoint for segment") @@ -705,7 +729,7 @@ async fn test_truncate_segment() { .map_or_else(|e| panic!("failed to get reply: {}", e), |r| assert_eq!(reply, r)); } -async fn test_update_table_entries() { +async fn test_update_table_entries(controller_client: &ControllerClientImpl) { let scope_name = Scope::new("scope".into()); let stream_name = Stream::new("stream".into()); // create a new segment. @@ -715,7 +739,7 @@ async fn test_update_table_entries() { segment: Segment { number: 2 }, }; - let endpoint = CONTROLLER_CLIENT + let endpoint = controller_client .get_endpoint_for_segment(&segment_name) .await .expect("get endpoint for segment") @@ -811,7 +835,7 @@ async fn test_update_table_entries() { .map_or_else(|e| panic!("failed to get reply: {}", e), |r| assert_eq!(reply, r)); } -async fn test_read_table_key() { +async fn test_read_table_key(controller_client: &ControllerClientImpl) { let scope_name = Scope::new("scope".into()); let stream_name = Stream::new("stream".into()); let segment_name = ScopedSegment { @@ -820,7 +844,7 @@ async fn test_read_table_key() { segment: Segment { number: 2 }, }; - let endpoint = CONTROLLER_CLIENT + let endpoint = controller_client .get_endpoint_for_segment(&segment_name) .await .expect("get endpoint for segment") @@ -851,7 +875,7 @@ async fn test_read_table_key() { } } -async fn test_read_table() { +async fn test_read_table(controller_client: &ControllerClientImpl) { let scope_name = Scope::new("scope".into()); let stream_name = Stream::new("stream".into()); let segment_name = ScopedSegment { @@ -860,7 +884,7 @@ async fn test_read_table() { segment: Segment { number: 2 }, }; - let endpoint = CONTROLLER_CLIENT + let endpoint = controller_client .get_endpoint_for_segment(&segment_name) .await .expect("get endpoint for segment") @@ -903,7 +927,7 @@ async fn test_read_table() { .map_or_else(|e| panic!("failed to get reply: {}", e), |r| assert_eq!(reply, r)); } -async fn test_read_table_entries() { +async fn test_read_table_entries(controller_client: &ControllerClientImpl) { let scope_name = Scope::new("scope".into()); let stream_name = Stream::new("stream".into()); let segment_name = ScopedSegment { @@ -912,7 +936,7 @@ async fn test_read_table_entries() { segment: Segment { number: 2 }, }; - let endpoint = CONTROLLER_CLIENT + let endpoint = controller_client .get_endpoint_for_segment(&segment_name) .await .expect("get endpoint for segment") diff --git a/src/client_factory.rs b/src/client_factory.rs index 3fdb8dd6e..dfb47b9fd 100644 --- a/src/client_factory.rs +++ b/src/client_factory.rs @@ -32,14 +32,14 @@ pub struct ClientFactoryInternal { } impl ClientFactory { - pub fn new(config: ClientConfig) -> ClientFactory { + pub async fn new(config: ClientConfig) -> ClientFactory { let _ = setup_logger(); //Ignore failure let cf = ConnectionFactory::create(config.connection_type); let pool = ConnectionPool::new(SegmentConnectionManager::new(cf, config.max_connections_in_pool)); let controller = if config.mock { Box::new(MockController::new(config.controller_uri)) as Box } else { - Box::new(ControllerClientImpl::new(config)) as Box + Box::new(ControllerClientImpl::new(config).await) as Box }; ClientFactory(Arc::new(ClientFactoryInternal { connection_pool: pool, From 1faeaa21106c2681af8ac00207577fa328c4975c Mon Sep 17 00:00:00 2001 From: Sandeep Date: Fri, 22 May 2020 20:23:01 +0530 Subject: [PATCH 02/19] pyo3 requires nightly. Signed-off-by: Sandeep --- rust-toolchain | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust-toolchain b/rust-toolchain index b07d35aa3..c1a4c6bb4 100644 --- a/rust-toolchain +++ b/rust-toolchain @@ -1,2 +1,2 @@ -stable +nightly From ab9b055a5010e4c66caaa4c4d7859ae097c33384 Mon Sep 17 00:00:00 2001 From: Sandeep Date: Fri, 22 May 2020 20:47:22 +0530 Subject: [PATCH 03/19] Add pyo3 dependency. Signed-off-by: Sandeep --- Cargo.toml | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 687fba448..4d9febae2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,7 +4,7 @@ version = "0.1.0" edition = "2018" categories = ["Network programming"] keywords = ["streaming", "client", "pravega"] -readme = "Readme.md" +readme = "README.md" repository = "https://github.com/pravega/pravega-client-rust" license = "Apache-2.0" description = "A Rust client for Pravega. (Pravega.io)" @@ -52,6 +52,14 @@ ordered-float = { version= "1.0.2", features = ["serde"]} criterion = "0.3" byteorder = "1.3" +[lib] +name = "pravega_client_rust" +crate-type = ["cdylib"] + +[dependencies.pyo3] +version = "0.10.1" +features = ["extension-module"] + [[bin]] name = "server-cli" path = "src/cli.rs" From daa2c59b46bc770ba1d9fad3f99b49a4495e6a6a Mon Sep 17 00:00:00 2001 From: Sandeep Date: Fri, 22 May 2020 20:54:21 +0530 Subject: [PATCH 04/19] ensure rlib is also generated for the cli. Signed-off-by: Sandeep --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 4d9febae2..a8a1b87d1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -54,7 +54,7 @@ byteorder = "1.3" [lib] name = "pravega_client_rust" -crate-type = ["cdylib"] +crate-type = ["cdylib", "rlib"] [dependencies.pyo3] version = "0.10.1" From f13212540a547d6f667690f5198d47d3f779bf87 Mon Sep 17 00:00:00 2001 From: Sandeep Date: Sun, 24 May 2020 12:20:18 +0530 Subject: [PATCH 05/19] Fix cdylib issue . Signed-off-by: Sandeep --- Cargo.toml | 10 +----- bindings/Cargo.toml | 17 +++++++++ bindings/pyproject.toml | 3 ++ bindings/src/lib.rs | 9 +++++ bindings/src/python_binding/mod.rs | 27 ++++++++++++++ bindings/src/python_binding/stream_manager.rs | 35 +++++++++++++++++++ bindings/tox.ini | 12 +++++++ 7 files changed, 104 insertions(+), 9 deletions(-) create mode 100644 bindings/Cargo.toml create mode 100644 bindings/pyproject.toml create mode 100644 bindings/src/lib.rs create mode 100644 bindings/src/python_binding/mod.rs create mode 100644 bindings/src/python_binding/stream_manager.rs create mode 100644 bindings/tox.ini diff --git a/Cargo.toml b/Cargo.toml index a8a1b87d1..9d3119806 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,7 +15,7 @@ authors = ["Tom Kaitchuck ", "Wenqi Mou "] +edition = "2018" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[lib] +#name = "pravega_client_rust" +#crate-type = ["rlib","cdylib",] +crate-type = ["cdylib"] +[dependencies.pyo3] +version = "0.10.1" +features = ["extension-module"] + +[dependencies] diff --git a/bindings/pyproject.toml b/bindings/pyproject.toml new file mode 100644 index 000000000..90cb176e8 --- /dev/null +++ b/bindings/pyproject.toml @@ -0,0 +1,3 @@ +[build-system] +requires = ["maturin"] +build-backend = "maturin" diff --git a/bindings/src/lib.rs b/bindings/src/lib.rs new file mode 100644 index 000000000..9f607025c --- /dev/null +++ b/bindings/src/lib.rs @@ -0,0 +1,9 @@ +mod python_binding; + +#[cfg(test)] +mod tests { + #[test] + fn it_works() { + assert_eq!(2 + 2, 4); + } +} diff --git a/bindings/src/python_binding/mod.rs b/bindings/src/python_binding/mod.rs new file mode 100644 index 000000000..5127e5f84 --- /dev/null +++ b/bindings/src/python_binding/mod.rs @@ -0,0 +1,27 @@ +// +// Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// + +use pyo3::prelude::*; +use pyo3::wrap_pyfunction; +// mod stream_manager; +// use stream_manager::StreamManager; +/// Formats the sum of two numbers as string. +#[pyfunction] +fn sum_as_string(a: usize, b: usize) -> PyResult { + Ok((a + b).to_string()) +} + +#[pymodule] +/// A Python module implemented in Rust. +fn bindings(_py: Python, m: &PyModule) -> PyResult<()> { + m.add_wrapped(wrap_pyfunction!(sum_as_string))?; + //m.add_class::()?; + Ok(()) +} diff --git a/bindings/src/python_binding/stream_manager.rs b/bindings/src/python_binding/stream_manager.rs new file mode 100644 index 000000000..2b3b54044 --- /dev/null +++ b/bindings/src/python_binding/stream_manager.rs @@ -0,0 +1,35 @@ +// +// Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// + +use pyo3::prelude::*; + +#[pyclass] +pub(crate) struct StreamManager { + num: i32, + debug: bool, + controller_ip: String, +} + +#[pymethods] +impl StreamManager { + #[new] + fn new(num: i32) -> Self { + StreamManager { + num, + debug: false, + controller_ip: "localhost:9090".to_string(), + } + } + + pub fn create_scope(&self, scope: &str) -> PyResult { + println!("creating scope {:?}", scope); + Ok(self.num as usize) + } +} diff --git a/bindings/tox.ini b/bindings/tox.ini new file mode 100644 index 000000000..4439063d0 --- /dev/null +++ b/bindings/tox.ini @@ -0,0 +1,12 @@ +[tox] +envlist = py36, py37 +requires = tox-pyo3 + +[testenv] +pyo3 = True +deps = + pytest +commands = + pytest -vvvv + +skip_install = True From 603299bf77c3c6377b2afdb82d06896dfa98ca4a Mon Sep 17 00:00:00 2001 From: Sandeep Date: Mon, 25 May 2020 12:53:08 +0530 Subject: [PATCH 06/19] First version of the python apis. Signed-off-by: Sandeep --- bindings/Cargo.toml | 14 +++- bindings/src/python_binding/mod.rs | 8 +- bindings/src/python_binding/stream_manager.rs | 78 +++++++++++++++++-- 3 files changed, 86 insertions(+), 14 deletions(-) diff --git a/bindings/Cargo.toml b/bindings/Cargo.toml index 1b7ab6af4..b1974c8e5 100644 --- a/bindings/Cargo.toml +++ b/bindings/Cargo.toml @@ -7,11 +7,21 @@ edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [lib] -#name = "pravega_client_rust" -#crate-type = ["rlib","cdylib",] +name = "pravega_client" crate-type = ["cdylib"] [dependencies.pyo3] version = "0.10.1" features = ["extension-module"] [dependencies] +log = "0.4" +pravega-client-rust = { path = "../" } +pravega-wire-protocol = { path = "../wire_protocol"} +pravega-controller-client = { path = "../controller-client"} +pravega-rust-client-shared = { path = "../shared"} +pravega-rust-client-retry = {path = "../retry"} +pravega-connection-pool = {path= "../connection_pool"} +tokio = { version = "0.2.13", features = ["full"] } +lazy_static = "1.4.0" +uuid = {version = "0.8", features = ["v4"]} +futures = "0.3.5" diff --git a/bindings/src/python_binding/mod.rs b/bindings/src/python_binding/mod.rs index 5127e5f84..d7f4e426a 100644 --- a/bindings/src/python_binding/mod.rs +++ b/bindings/src/python_binding/mod.rs @@ -10,8 +10,8 @@ use pyo3::prelude::*; use pyo3::wrap_pyfunction; -// mod stream_manager; -// use stream_manager::StreamManager; +mod stream_manager; +use stream_manager::StreamManager; /// Formats the sum of two numbers as string. #[pyfunction] fn sum_as_string(a: usize, b: usize) -> PyResult { @@ -20,8 +20,8 @@ fn sum_as_string(a: usize, b: usize) -> PyResult { #[pymodule] /// A Python module implemented in Rust. -fn bindings(_py: Python, m: &PyModule) -> PyResult<()> { +fn pravega_client(_py: Python, m: &PyModule) -> PyResult<()> { m.add_wrapped(wrap_pyfunction!(sum_as_string))?; - //m.add_class::()?; + m.add_class::()?; Ok(()) } diff --git a/bindings/src/python_binding/stream_manager.rs b/bindings/src/python_binding/stream_manager.rs index 2b3b54044..929ac80f2 100644 --- a/bindings/src/python_binding/stream_manager.rs +++ b/bindings/src/python_binding/stream_manager.rs @@ -8,28 +8,90 @@ // http://www.apache.org/licenses/LICENSE-2.0 // +use pravega_client_rust::client_factory::ClientFactory; +use pravega_rust_client_shared::*; +use pravega_wire_protocol::client_config::{ClientConfigBuilder, TEST_CONTROLLER_URI}; +use pyo3::exceptions; use pyo3::prelude::*; +use pyo3::PyResult; +use std::net::SocketAddr; +use tokio::runtime::Runtime; #[pyclass] pub(crate) struct StreamManager { - num: i32, - debug: bool, controller_ip: String, + rt: Runtime, + cf: ClientFactory, } #[pymethods] impl StreamManager { #[new] - fn new(num: i32) -> Self { + fn new(controller_uri: String) -> Self { + let runtime = tokio::runtime::Runtime::new().expect("create runtime"); + let handle = runtime.handle().clone(); + let config = ClientConfigBuilder::default() + .controller_uri( + controller_uri + .parse::() + .expect("Parsing controller ip"), + ) + .build() + .expect("creating config"); + let client_factory = handle.block_on(ClientFactory::new(config.clone())); + StreamManager { - num, - debug: false, - controller_ip: "localhost:9090".to_string(), + controller_ip: controller_uri, + rt: runtime, + cf: client_factory, } } - pub fn create_scope(&self, scope: &str) -> PyResult { + pub fn create_scope(&self, scope: &str) -> PyResult { + let handle = self.rt.handle().clone(); println!("creating scope {:?}", scope); - Ok(self.num as usize) + + let controller = self.cf.get_controller_client(); + let scope_name = Scope::new(scope.to_string()); + // let stream_name = Stream::new("testStream".into()); + + let scope_result = handle.block_on(controller.create_scope(&scope_name)); + println!("Scope Creation {:?}", scope_result); + match scope_result { + Ok(t) => Ok(t), + Err(e) => Err(exceptions::ValueError::py_err(format!("{:?}", e))), + } + } + + pub fn create_stream(&self, scope: &str, stream: &str, initial_segments: i32) -> PyResult { + let handle = self.rt.handle().clone(); + println!( + "creating stream {:?} under scope {:?} with segment count {:?}", + stream, scope, initial_segments + ); + let stream_cfg = StreamConfiguration { + scoped_stream: ScopedStream { + scope: Scope::new(scope.to_string()), + stream: Stream::new(stream.to_string()), + }, + scaling: Scaling { + scale_type: ScaleType::FixedNumSegments, + target_rate: 0, + scale_factor: 0, + min_num_segments: initial_segments, + }, + retention: Retention { + retention_type: RetentionType::None, + retention_param: 0, + }, + }; + let controller = self.cf.get_controller_client(); + + let stream_result = handle.block_on(controller.create_stream(&stream_cfg)); + println!("Stream creation result {:?}", stream_result); + match stream_result { + Ok(t) => Ok(t), + Err(e) => Err(exceptions::ValueError::py_err(format!("{:?}", e))), + } } } From 40bd4eb88a57ad16abfc33ca3d9a80509261c934 Mon Sep 17 00:00:00 2001 From: Sandeep Date: Tue, 26 May 2020 13:10:44 +0530 Subject: [PATCH 07/19] First version of event writer. Improve StreamManager apis. Signed-off-by: Sandeep --- bindings/Cargo.toml | 11 +- bindings/Readme.md | 3 + bindings/src/lib.rs | 2 + bindings/src/python_binding/mod.rs | 4 + bindings/src/python_binding/stream_manager.rs | 117 ++++++++++++++++-- bindings/src/python_binding/stream_writer.rs | 61 +++++++++ 6 files changed, 184 insertions(+), 14 deletions(-) create mode 100644 bindings/Readme.md create mode 100644 bindings/src/python_binding/stream_writer.rs diff --git a/bindings/Cargo.toml b/bindings/Cargo.toml index b1974c8e5..a9fe6ec35 100644 --- a/bindings/Cargo.toml +++ b/bindings/Cargo.toml @@ -1,14 +1,22 @@ [package] name = "bindings" version = "0.1.0" -authors = ["Sandeep "] edition = "2018" +categories = ["Network programming"] +keywords = ["streaming", "client", "pravega"] +readme = "Readme.md" +repository = "https://github.com/pravega/pravega-client-rust" +license = "Apache-2.0" +description = "An internal library used by the Rust client for Pravega to generated language bindings for Python and WASM." +authors = ["Tom Kaitchuck ", "Wenqi Mou ", + "Sandeep Shridhar ", "Wenxiao Zhang "] # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [lib] name = "pravega_client" crate-type = ["cdylib"] + [dependencies.pyo3] version = "0.10.1" features = ["extension-module"] @@ -25,3 +33,4 @@ tokio = { version = "0.2.13", features = ["full"] } lazy_static = "1.4.0" uuid = {version = "0.8", features = ["v4"]} futures = "0.3.5" +derive-new = "0.5" diff --git a/bindings/Readme.md b/bindings/Readme.md new file mode 100644 index 000000000..b9ba89bb9 --- /dev/null +++ b/bindings/Readme.md @@ -0,0 +1,3 @@ +# Language Bindings + +This provides a way to generate multiple language bindings to interact with Pravega. \ No newline at end of file diff --git a/bindings/src/lib.rs b/bindings/src/lib.rs index 9f607025c..58f3472ab 100644 --- a/bindings/src/lib.rs +++ b/bindings/src/lib.rs @@ -1,4 +1,6 @@ mod python_binding; +#[macro_use] +extern crate derive_new; #[cfg(test)] mod tests { diff --git a/bindings/src/python_binding/mod.rs b/bindings/src/python_binding/mod.rs index d7f4e426a..f012563e9 100644 --- a/bindings/src/python_binding/mod.rs +++ b/bindings/src/python_binding/mod.rs @@ -11,7 +11,10 @@ use pyo3::prelude::*; use pyo3::wrap_pyfunction; mod stream_manager; +mod stream_writer; use stream_manager::StreamManager; +use stream_writer::StreamWriter; + /// Formats the sum of two numbers as string. #[pyfunction] fn sum_as_string(a: usize, b: usize) -> PyResult { @@ -23,5 +26,6 @@ fn sum_as_string(a: usize, b: usize) -> PyResult { fn pravega_client(_py: Python, m: &PyModule) -> PyResult<()> { m.add_wrapped(wrap_pyfunction!(sum_as_string))?; m.add_class::()?; + m.add_class::()?; Ok(()) } diff --git a/bindings/src/python_binding/stream_manager.rs b/bindings/src/python_binding/stream_manager.rs index 929ac80f2..e201ca0cd 100644 --- a/bindings/src/python_binding/stream_manager.rs +++ b/bindings/src/python_binding/stream_manager.rs @@ -8,9 +8,10 @@ // http://www.apache.org/licenses/LICENSE-2.0 // +use crate::python_binding::stream_writer::StreamWriter; use pravega_client_rust::client_factory::ClientFactory; use pravega_rust_client_shared::*; -use pravega_wire_protocol::client_config::{ClientConfigBuilder, TEST_CONTROLLER_URI}; +use pravega_wire_protocol::client_config::{ClientConfig, ClientConfigBuilder}; use pyo3::exceptions; use pyo3::prelude::*; use pyo3::PyResult; @@ -19,9 +20,10 @@ use tokio::runtime::Runtime; #[pyclass] pub(crate) struct StreamManager { - controller_ip: String, + _controller_ip: String, rt: Runtime, cf: ClientFactory, + config: ClientConfig, } #[pymethods] @@ -41,38 +43,68 @@ impl StreamManager { let client_factory = handle.block_on(ClientFactory::new(config.clone())); StreamManager { - controller_ip: controller_uri, + _controller_ip: controller_uri, rt: runtime, cf: client_factory, + config, } } - pub fn create_scope(&self, scope: &str) -> PyResult { + /// + /// Create a Scope in Pravega. + /// + pub fn create_scope(&self, scope_name: &str) -> PyResult { let handle = self.rt.handle().clone(); - println!("creating scope {:?}", scope); + println!("creating scope {:?}", scope_name); let controller = self.cf.get_controller_client(); - let scope_name = Scope::new(scope.to_string()); - // let stream_name = Stream::new("testStream".into()); + let scope_name = Scope::new(scope_name.to_string()); let scope_result = handle.block_on(controller.create_scope(&scope_name)); - println!("Scope Creation {:?}", scope_result); + println!("Scope creation status {:?}", scope_result); match scope_result { Ok(t) => Ok(t), Err(e) => Err(exceptions::ValueError::py_err(format!("{:?}", e))), } } - pub fn create_stream(&self, scope: &str, stream: &str, initial_segments: i32) -> PyResult { + /// + /// Delete a Scope in Pravega. + /// + pub fn delete_scope(&self, scope_name: &str) -> PyResult { + let handle = self.rt.handle().clone(); + println!("Delete scope {:?}", scope_name); + + let controller = self.cf.get_controller_client(); + let scope_name = Scope::new(scope_name.to_string()); + // let stream_name = Stream::new("testStream".into()); + + let scope_result = handle.block_on(controller.delete_scope(&scope_name)); + println!("Scope deletion status {:?}", scope_result); + match scope_result { + Ok(t) => Ok(t), + Err(e) => Err(exceptions::ValueError::py_err(format!("{:?}", e))), + } + } + + /// + /// Create a Stream in Pravega. + /// + pub fn create_stream( + &self, + scope_name: &str, + stream_name: &str, + initial_segments: i32, + ) -> PyResult { let handle = self.rt.handle().clone(); println!( "creating stream {:?} under scope {:?} with segment count {:?}", - stream, scope, initial_segments + stream_name, scope_name, initial_segments ); let stream_cfg = StreamConfiguration { scoped_stream: ScopedStream { - scope: Scope::new(scope.to_string()), - stream: Stream::new(stream.to_string()), + scope: Scope::new(scope_name.to_string()), + stream: Stream::new(stream_name.to_string()), }, scaling: Scaling { scale_type: ScaleType::FixedNumSegments, @@ -88,10 +120,69 @@ impl StreamManager { let controller = self.cf.get_controller_client(); let stream_result = handle.block_on(controller.create_stream(&stream_cfg)); - println!("Stream creation result {:?}", stream_result); + println!("Stream creation status {:?}", stream_result); match stream_result { Ok(t) => Ok(t), Err(e) => Err(exceptions::ValueError::py_err(format!("{:?}", e))), } } + + /// + /// Create a Stream in Pravega. + /// + pub fn seal_stream(&self, scope_name: &str, stream_name: &str) -> PyResult { + let handle = self.rt.handle().clone(); + println!("Sealing stream {:?} under scope {:?} ", stream_name, scope_name); + let scoped_stream = ScopedStream { + scope: Scope::new(scope_name.to_string()), + stream: Stream::new(stream_name.to_string()), + }; + + let controller = self.cf.get_controller_client(); + + let stream_result = handle.block_on(controller.seal_stream(&scoped_stream)); + println!("Sealing stream status {:?}", stream_result); + match stream_result { + Ok(t) => Ok(t), + Err(e) => Err(exceptions::ValueError::py_err(format!("{:?}", e))), + } + } + + /// + /// Delete a Stream in Pravega. + /// + pub fn delete_stream(&self, scope_name: &str, stream_name: &str) -> PyResult { + let handle = self.rt.handle().clone(); + println!("Deleting stream {:?} under scope {:?} ", stream_name, scope_name); + let scoped_stream = ScopedStream { + scope: Scope::new(scope_name.to_string()), + stream: Stream::new(stream_name.to_string()), + }; + + let controller = self.cf.get_controller_client(); + let stream_result = handle.block_on(controller.delete_stream(&scoped_stream)); + println!("Deleting stream status {:?}", stream_result); + match stream_result { + Ok(t) => Ok(t), + Err(e) => Err(exceptions::ValueError::py_err(format!("{:?}", e))), + } + } + + /// + /// Create a Writer for a given Stream. + /// + pub fn create_writer(&self, scope_name: &str, stream_name: &str) -> PyResult { + let scoped_stream = ScopedStream { + scope: Scope::new(scope_name.to_string()), + stream: Stream::new(stream_name.to_string()), + }; + let stream_writer = self.rt.handle().clone().block_on(async { + StreamWriter::new( + self.cf + .create_event_stream_writer(scoped_stream, self.config.clone()), + self.rt.handle().clone(), + ) + }); + Ok(stream_writer) + } } diff --git a/bindings/src/python_binding/stream_writer.rs b/bindings/src/python_binding/stream_writer.rs new file mode 100644 index 000000000..e6d43562e --- /dev/null +++ b/bindings/src/python_binding/stream_writer.rs @@ -0,0 +1,61 @@ +// +// Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// + +use pravega_client_rust::error::EventStreamWriterError; +use pravega_client_rust::event_stream_writer::EventStreamWriter; +use pyo3::exceptions; +use pyo3::prelude::*; +use pyo3::PyResult; +use tokio::runtime::Handle; + +#[pyclass] +#[derive(new)] // this ensures the python object cannot be created without the using StreamManager. +pub(crate) struct StreamWriter { + writer: EventStreamWriter, + handle: Handle, +} + +#[pymethods] +impl StreamWriter { + /// + /// Write an event to the Stream. The operation blocks until the write operations is completed. + /// + pub fn write_event(&mut self, event: String) -> PyResult<()> { + println!("Writing a single event"); + let result = self.handle.block_on(self.writer.write_event(event.into_bytes())); + let result_oneshot: Result<(), EventStreamWriterError> = + self.handle.block_on(result).expect("Write failed"); + + match result_oneshot { + Ok(t) => Ok(t), + Err(e) => Err(exceptions::ValueError::py_err(format!("{:?}", e))), + } + } + + /// + /// Write an event to the Stream given a routing key. + /// + pub fn write_event_by_routing_key(&mut self, event: String, routing_key: String) -> PyResult<()> { + println!("Writing a single event for a given routing key"); + let result = self.handle.block_on( + self.writer + .write_event_by_routing_key(routing_key, event.into_bytes()), + ); + let result_oneshot: Result<(), EventStreamWriterError> = self + .handle + .block_on(result) + .expect("Write for specified routing key failed"); + + match result_oneshot { + Ok(t) => Ok(t), + Err(e) => Err(exceptions::ValueError::py_err(format!("{:?}", e))), + } + } +} From ad60eccff84a1427ddcae751a5aed782b18c7fb0 Mon Sep 17 00:00:00 2001 From: Sandeep Date: Tue, 26 May 2020 16:53:08 +0530 Subject: [PATCH 08/19] Add test python file. Signed-off-by: Sandeep --- bindings/src/lib.rs | 18 ++++++++------- bindings/src/pravega_client_test.py | 36 +++++++++++++++++++++++++++++ bindings/tox.ini | 4 ++-- 3 files changed, 48 insertions(+), 10 deletions(-) create mode 100644 bindings/src/pravega_client_test.py diff --git a/bindings/src/lib.rs b/bindings/src/lib.rs index 58f3472ab..502f25391 100644 --- a/bindings/src/lib.rs +++ b/bindings/src/lib.rs @@ -1,11 +1,13 @@ +// +// Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// + mod python_binding; #[macro_use] extern crate derive_new; - -#[cfg(test)] -mod tests { - #[test] - fn it_works() { - assert_eq!(2 + 2, 4); - } -} diff --git a/bindings/src/pravega_client_test.py b/bindings/src/pravega_client_test.py new file mode 100644 index 000000000..d5b27a723 --- /dev/null +++ b/bindings/src/pravega_client_test.py @@ -0,0 +1,36 @@ +# +# Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# + +import unittest +import pravega_client; + +class PravegaTest(unittest.TestCase): + def test_writeEvent(self): + print("Creating a Stream Manager, ensure pravega is running") + stream_manager=pravega_client.StreamManager("127.0.0.1:9090") + + print("Creating a scope") + scope_result=stream_manager.create_scope("testScope") + self.assertEqual(True, scope_result, "Scope creation status") + + print("Creating a stream") + stream_result=stream_manager.create_stream("testScope", "testStream", 1) + self.assertEqual(True, stream_result, "Stream creation status") + + print("Creating a writer for Stream") + w1=stream_manager.create_writer("testScope","testStream") + + print("Write events") + w1.write_event("test event1") + w1.write_event("test event2") + + +if __name__ == '__main__': + unittest.main() diff --git a/bindings/tox.ini b/bindings/tox.ini index 4439063d0..db9408d19 100644 --- a/bindings/tox.ini +++ b/bindings/tox.ini @@ -1,5 +1,5 @@ [tox] -envlist = py36, py37 +envlist = py36 requires = tox-pyo3 [testenv] @@ -9,4 +9,4 @@ deps = commands = pytest -vvvv -skip_install = True +skip_install = True \ No newline at end of file From c07dc0a2bc3502e8333c7cc8f7f4622a74f83642 Mon Sep 17 00:00:00 2001 From: Sandeep Date: Tue, 26 May 2020 16:55:43 +0530 Subject: [PATCH 09/19] Update git ignore for python files Signed-off-by: Sandeep --- .gitignore | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.gitignore b/.gitignore index 855e9bdfb..b1f2f77ae 100644 --- a/.gitignore +++ b/.gitignore @@ -19,3 +19,6 @@ pravega-*.tgz .vscode /shared/target/ .gradle +*.log + +__pycache__/ From d21570da3ff7521668d5af86c8d1df956c988db3 Mon Sep 17 00:00:00 2001 From: Sandeep Date: Mon, 1 Jun 2020 19:39:32 +0530 Subject: [PATCH 10/19] Implement a Python __repr__ function Signed-off-by: Sandeep --- bindings/src/python_binding/stream_manager.rs | 26 ++++++++++++++++--- 1 file changed, 23 insertions(+), 3 deletions(-) diff --git a/bindings/src/python_binding/stream_manager.rs b/bindings/src/python_binding/stream_manager.rs index e201ca0cd..78e769886 100644 --- a/bindings/src/python_binding/stream_manager.rs +++ b/bindings/src/python_binding/stream_manager.rs @@ -12,15 +12,15 @@ use crate::python_binding::stream_writer::StreamWriter; use pravega_client_rust::client_factory::ClientFactory; use pravega_rust_client_shared::*; use pravega_wire_protocol::client_config::{ClientConfig, ClientConfigBuilder}; -use pyo3::exceptions; use pyo3::prelude::*; use pyo3::PyResult; +use pyo3::{exceptions, PyObjectProtocol}; use std::net::SocketAddr; use tokio::runtime::Runtime; #[pyclass] pub(crate) struct StreamManager { - _controller_ip: String, + controller_ip: String, rt: Runtime, cf: ClientFactory, config: ClientConfig, @@ -43,7 +43,7 @@ impl StreamManager { let client_factory = handle.block_on(ClientFactory::new(config.clone())); StreamManager { - _controller_ip: controller_uri, + controller_ip: controller_uri, rt: runtime, cf: client_factory, config, @@ -185,4 +185,24 @@ impl StreamManager { }); Ok(stream_writer) } + + /// Returns the facet string representation. + fn to_str(&self) -> String { + format!( + "Controller ip: {:?} ClientConfig: {:?}", + self.controller_ip, self.config + ) + } +} + +/// +/// Refer https://docs.python.org/3/reference/datamodel.html#basic-customization +/// This function will be called by the repr() built-in function to compute the “official” string +/// representation of an Python object. +/// +#[pyproto] +impl PyObjectProtocol for StreamManager { + fn __repr__(&self) -> PyResult { + Ok(format!("StreamManager({})", self.to_str())) + } } From 130f12e0dc7831e3e4f5b87d81476429e24d1c16 Mon Sep 17 00:00:00 2001 From: Sandeep Date: Mon, 1 Jun 2020 22:03:40 +0530 Subject: [PATCH 11/19] Improve POC. Signed-off-by: Sandeep --- bindings/Cargo.toml | 7 +++++ bindings/src/lib.rs | 15 ++++++++- bindings/src/python_binding/mod.rs | 31 ------------------- .../{python_binding => }/stream_manager.rs | 2 +- .../src/{python_binding => }/stream_writer.rs | 0 rust-toolchain | 1 + 6 files changed, 23 insertions(+), 33 deletions(-) delete mode 100644 bindings/src/python_binding/mod.rs rename bindings/src/{python_binding => }/stream_manager.rs (99%) rename bindings/src/{python_binding => }/stream_writer.rs (100%) diff --git a/bindings/Cargo.toml b/bindings/Cargo.toml index a9fe6ec35..6e687f684 100644 --- a/bindings/Cargo.toml +++ b/bindings/Cargo.toml @@ -17,6 +17,11 @@ authors = ["Tom Kaitchuck ", "Wenqi Mou PyResult<()> { + m.add_class::()?; + m.add_class::()?; + Ok(()) +} diff --git a/bindings/src/python_binding/mod.rs b/bindings/src/python_binding/mod.rs deleted file mode 100644 index f012563e9..000000000 --- a/bindings/src/python_binding/mod.rs +++ /dev/null @@ -1,31 +0,0 @@ -// -// Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// - -use pyo3::prelude::*; -use pyo3::wrap_pyfunction; -mod stream_manager; -mod stream_writer; -use stream_manager::StreamManager; -use stream_writer::StreamWriter; - -/// Formats the sum of two numbers as string. -#[pyfunction] -fn sum_as_string(a: usize, b: usize) -> PyResult { - Ok((a + b).to_string()) -} - -#[pymodule] -/// A Python module implemented in Rust. -fn pravega_client(_py: Python, m: &PyModule) -> PyResult<()> { - m.add_wrapped(wrap_pyfunction!(sum_as_string))?; - m.add_class::()?; - m.add_class::()?; - Ok(()) -} diff --git a/bindings/src/python_binding/stream_manager.rs b/bindings/src/stream_manager.rs similarity index 99% rename from bindings/src/python_binding/stream_manager.rs rename to bindings/src/stream_manager.rs index 78e769886..78f04ff5e 100644 --- a/bindings/src/python_binding/stream_manager.rs +++ b/bindings/src/stream_manager.rs @@ -8,7 +8,7 @@ // http://www.apache.org/licenses/LICENSE-2.0 // -use crate::python_binding::stream_writer::StreamWriter; +use crate::stream_writer::StreamWriter; use pravega_client_rust::client_factory::ClientFactory; use pravega_rust_client_shared::*; use pravega_wire_protocol::client_config::{ClientConfig, ClientConfigBuilder}; diff --git a/bindings/src/python_binding/stream_writer.rs b/bindings/src/stream_writer.rs similarity index 100% rename from bindings/src/python_binding/stream_writer.rs rename to bindings/src/stream_writer.rs diff --git a/rust-toolchain b/rust-toolchain index c1a4c6bb4..0dca6923c 100644 --- a/rust-toolchain +++ b/rust-toolchain @@ -1,2 +1,3 @@ +# Change to stable once issue https://github.com/PyO3/pyo3/issues/5 is resolved. nightly From 0b0af9ed3932246ea781048c849456bc5f0345f0 Mon Sep 17 00:00:00 2001 From: Sandeep Date: Mon, 1 Jun 2020 22:03:56 +0530 Subject: [PATCH 12/19] Ensure github flows uses nightly. Signed-off-by: Sandeep --- .github/workflows/cibuild.yml | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/.github/workflows/cibuild.yml b/.github/workflows/cibuild.yml index 0a381d8e9..3fd038deb 100644 --- a/.github/workflows/cibuild.yml +++ b/.github/workflows/cibuild.yml @@ -42,10 +42,10 @@ jobs: path: target key: ${{ runner.os }}-cargo-build-target-${{ hashFiles('**/Cargo.toml') }} - - name: Install stable toolchain + - name: Install nightly toolchain uses: actions-rs/toolchain@v1 with: - toolchain: stable + toolchain: nightly override: true components: rustfmt, clippy @@ -85,10 +85,10 @@ jobs: path: target key: ${{ runner.os }}-cargo-build-target-${{ hashFiles('**/Cargo.toml') }} - - name: Install stable toolchain + - name: Install nightly toolchain uses: actions-rs/toolchain@v1 with: - toolchain: stable + toolchain: nightly override: true components: rustfmt, clippy @@ -131,10 +131,10 @@ jobs: path: target key: ${{ runner.os }}-cargo-build-target-${{ hashFiles('**/Cargo.toml') }} - - name: Install stable toolchain + - name: Install nightly toolchain uses: actions-rs/toolchain@v1 with: - toolchain: stable + toolchain: nightly override: true components: rustfmt, clippy @@ -174,10 +174,10 @@ jobs: path: target key: ${{ runner.os }}-cargo-build-target-${{ hashFiles('**/Cargo.toml') }} - - name: Install stable toolchain + - name: Install nightly toolchain uses: actions-rs/toolchain@v1 with: - toolchain: stable + toolchain: nightly override: true components: rustfmt, clippy From 998f8aad866ed85e0e0bb8561bc65e8a7ea68715 Mon Sep 17 00:00:00 2001 From: Sandeep Date: Mon, 1 Jun 2020 22:40:07 +0530 Subject: [PATCH 13/19] Enable Byte array as a parameter for Python apis. Signed-off-by: Sandeep --- bindings/src/stream_writer.rs | 35 +++++++++++++++++++++++++++-------- 1 file changed, 27 insertions(+), 8 deletions(-) diff --git a/bindings/src/stream_writer.rs b/bindings/src/stream_writer.rs index e6d43562e..6d18ab928 100644 --- a/bindings/src/stream_writer.rs +++ b/bindings/src/stream_writer.rs @@ -25,11 +25,31 @@ pub(crate) struct StreamWriter { #[pymethods] impl StreamWriter { /// - /// Write an event to the Stream. The operation blocks until the write operations is completed. + /// Write an event as a String into to the Pravega Stream. The operation blocks until the write operations is completed. /// pub fn write_event(&mut self, event: String) -> PyResult<()> { + self.write_event_bytes(event.into_bytes()) // + } + + /// + /// Write an event into the Pravega Stream for the given routing key. + /// + pub fn write_event_by_routing_key(&mut self, event: String, routing_key: String) -> PyResult<()> { + self.write_event_by_routing_key_bytes(event.into_bytes(), routing_key) + } + + /// + /// Write an event to Pravega Stream. The operation blocks until the write operations is completed. + /// Python can also be used to convert a given object into bytes. + /// + /// E.g: + /// >>> e="test" + /// >>> b=e.encode("utf-8") // Python api to convert an object to byte array. + /// >>> w1.write_event_bytes(b) + /// + pub fn write_event_bytes(&mut self, event: Vec) -> PyResult<()> { println!("Writing a single event"); - let result = self.handle.block_on(self.writer.write_event(event.into_bytes())); + let result = self.handle.block_on(self.writer.write_event(event)); let result_oneshot: Result<(), EventStreamWriterError> = self.handle.block_on(result).expect("Write failed"); @@ -40,14 +60,13 @@ impl StreamWriter { } /// - /// Write an event to the Stream given a routing key. + /// Write an event to the Pravega Stream given a routing key. /// - pub fn write_event_by_routing_key(&mut self, event: String, routing_key: String) -> PyResult<()> { + pub fn write_event_by_routing_key_bytes(&mut self, event: Vec, routing_key: String) -> PyResult<()> { println!("Writing a single event for a given routing key"); - let result = self.handle.block_on( - self.writer - .write_event_by_routing_key(routing_key, event.into_bytes()), - ); + let result = self + .handle + .block_on(self.writer.write_event_by_routing_key(routing_key, event)); let result_oneshot: Result<(), EventStreamWriterError> = self .handle .block_on(result) From d31f3cd5bc7889e85174063b974ad7d97f86c245 Mon Sep 17 00:00:00 2001 From: Sandeep Date: Mon, 1 Jun 2020 23:03:06 +0530 Subject: [PATCH 14/19] Cargo test fails for bindings. Signed-off-by: Sandeep --- bindings/Cargo.toml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/bindings/Cargo.toml b/bindings/Cargo.toml index 6e687f684..5d9842e10 100644 --- a/bindings/Cargo.toml +++ b/bindings/Cargo.toml @@ -16,12 +16,14 @@ authors = ["Tom Kaitchuck ", "Wenqi Mou Date: Tue, 2 Jun 2020 18:00:38 +0530 Subject: [PATCH 15/19] CR Changes. Signed-off-by: Sandeep --- .github/workflows/cibuild.yml | 3 +-- bindings/Cargo.toml | 19 +++++++++---------- bindings/src/lib.rs | 17 +++++++++++++---- bindings/src/stream_manager.rs | 25 ++++++++++++++++--------- bindings/src/stream_writer.rs | 18 ++++++++++++------ 5 files changed, 51 insertions(+), 31 deletions(-) diff --git a/.github/workflows/cibuild.yml b/.github/workflows/cibuild.yml index 3fd038deb..5f8f48669 100644 --- a/.github/workflows/cibuild.yml +++ b/.github/workflows/cibuild.yml @@ -91,12 +91,11 @@ jobs: toolchain: nightly override: true components: rustfmt, clippy - - name: Run cargo test uses: actions-rs/cargo@v1 with: command: test - args: --workspace + args: -p pravega-rust-client-channel -p pravega-controller-client -p pravega-rust-client-integration-test -p pravega-rust-client-retry -p pravega-rust-client-shared -p pravega-wire-protocol -p pravega-client-rust - name: Run code cov run: | diff --git a/bindings/Cargo.toml b/bindings/Cargo.toml index 5d9842e10..16f499b80 100644 --- a/bindings/Cargo.toml +++ b/bindings/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "bindings" +name = "pravega-rust-client-bindings" version = "0.1.0" edition = "2018" categories = ["Network programming"] @@ -16,17 +16,13 @@ authors = ["Tom Kaitchuck ", "Wenqi Mou PyResult<()> { @@ -24,3 +31,5 @@ fn pravega_client(_py: Python, m: &PyModule) -> PyResult<()> { m.add_class::()?; Ok(()) } + +fn main() {} diff --git a/bindings/src/stream_manager.rs b/bindings/src/stream_manager.rs index 78f04ff5e..2f6e860c7 100644 --- a/bindings/src/stream_manager.rs +++ b/bindings/src/stream_manager.rs @@ -8,16 +8,21 @@ // http://www.apache.org/licenses/LICENSE-2.0 // -use crate::stream_writer::StreamWriter; -use pravega_client_rust::client_factory::ClientFactory; -use pravega_rust_client_shared::*; -use pravega_wire_protocol::client_config::{ClientConfig, ClientConfigBuilder}; -use pyo3::prelude::*; -use pyo3::PyResult; -use pyo3::{exceptions, PyObjectProtocol}; -use std::net::SocketAddr; -use tokio::runtime::Runtime; +cfg_if! { + if #[cfg(feature = "python_binding")] { + use crate::stream_writer::StreamWriter; + use pravega_client_rust::client_factory::ClientFactory; + use pravega_rust_client_shared::*; + use pravega_wire_protocol::client_config::{ClientConfig, ClientConfigBuilder}; + use pyo3::prelude::*; + use pyo3::PyResult; + use pyo3::{exceptions, PyObjectProtocol}; + use std::net::SocketAddr; + use tokio::runtime::Runtime; + } +} +#[cfg(feature = "python_binding")] #[pyclass] pub(crate) struct StreamManager { controller_ip: String, @@ -26,6 +31,7 @@ pub(crate) struct StreamManager { config: ClientConfig, } +#[cfg(feature = "python_binding")] #[pymethods] impl StreamManager { #[new] @@ -200,6 +206,7 @@ impl StreamManager { /// This function will be called by the repr() built-in function to compute the “official” string /// representation of an Python object. /// +#[cfg(feature = "python_binding")] #[pyproto] impl PyObjectProtocol for StreamManager { fn __repr__(&self) -> PyResult { diff --git a/bindings/src/stream_writer.rs b/bindings/src/stream_writer.rs index 6d18ab928..e7ee1df86 100644 --- a/bindings/src/stream_writer.rs +++ b/bindings/src/stream_writer.rs @@ -8,13 +8,18 @@ // http://www.apache.org/licenses/LICENSE-2.0 // -use pravega_client_rust::error::EventStreamWriterError; -use pravega_client_rust::event_stream_writer::EventStreamWriter; -use pyo3::exceptions; -use pyo3::prelude::*; -use pyo3::PyResult; -use tokio::runtime::Handle; +cfg_if! { + if #[cfg(feature = "python_binding")] { + use pravega_client_rust::error::EventStreamWriterError; + use pravega_client_rust::event_stream_writer::EventStreamWriter; + use pyo3::exceptions; + use pyo3::prelude::*; + use pyo3::PyResult; + use tokio::runtime::Handle; + } +} +#[cfg(feature = "python_binding")] #[pyclass] #[derive(new)] // this ensures the python object cannot be created without the using StreamManager. pub(crate) struct StreamWriter { @@ -22,6 +27,7 @@ pub(crate) struct StreamWriter { handle: Handle, } +#[cfg(feature = "python_binding")] #[pymethods] impl StreamWriter { /// From 224a61723841a17b70ede0fe5bfd50dc4290b061 Mon Sep 17 00:00:00 2001 From: Sandeep Date: Tue, 2 Jun 2020 18:01:36 +0530 Subject: [PATCH 16/19] remove redundant main Signed-off-by: Sandeep --- bindings/src/lib.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/bindings/src/lib.rs b/bindings/src/lib.rs index d1e2173dc..6669d6a48 100644 --- a/bindings/src/lib.rs +++ b/bindings/src/lib.rs @@ -31,5 +31,3 @@ fn pravega_client(_py: Python, m: &PyModule) -> PyResult<()> { m.add_class::()?; Ok(()) } - -fn main() {} From 9398fb8d7b591b949bc3b37c02fa301f5eeed60c Mon Sep 17 00:00:00 2001 From: Sandeep Date: Tue, 2 Jun 2020 18:13:03 +0530 Subject: [PATCH 17/19] fix clippy. Signed-off-by: Sandeep --- bindings/src/stream_writer.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/bindings/src/stream_writer.rs b/bindings/src/stream_writer.rs index e7ee1df86..9c585969f 100644 --- a/bindings/src/stream_writer.rs +++ b/bindings/src/stream_writer.rs @@ -60,7 +60,7 @@ impl StreamWriter { self.handle.block_on(result).expect("Write failed"); match result_oneshot { - Ok(t) => Ok(t), + Ok(_t) => Ok(()), Err(e) => Err(exceptions::ValueError::py_err(format!("{:?}", e))), } } @@ -79,7 +79,7 @@ impl StreamWriter { .expect("Write for specified routing key failed"); match result_oneshot { - Ok(t) => Ok(t), + Ok(_t) => Ok(()), Err(e) => Err(exceptions::ValueError::py_err(format!("{:?}", e))), } } From 8c3a68080246b478a3dbd93895b977a937ffe4f7 Mon Sep 17 00:00:00 2001 From: Sandeep Date: Tue, 2 Jun 2020 18:46:54 +0530 Subject: [PATCH 18/19] Fix clippy errors. Signed-off-by: Sandeep --- connection_pool/src/lib.rs | 3 +-- controller-client/src/lib.rs | 1 + controller-client/src/model_helper.rs | 2 +- retry/src/lib.rs | 3 +-- rust-toolchain | 3 +-- shared/src/lib.rs | 3 +-- src/lib.rs | 3 +-- wire_protocol/src/lib.rs | 3 +-- 8 files changed, 8 insertions(+), 13 deletions(-) diff --git a/connection_pool/src/lib.rs b/connection_pool/src/lib.rs index 9cf6ec2b1..b78e6b244 100644 --- a/connection_pool/src/lib.rs +++ b/connection_pool/src/lib.rs @@ -21,8 +21,7 @@ clippy::cargo_common_metadata, clippy::mutex_integer, clippy::needless_borrow, - clippy::option_unwrap_used, - clippy::result_unwrap_used, + clippy::unwrap_used, clippy::similar_names )] #![allow(clippy::multiple_crate_versions)] diff --git a/controller-client/src/lib.rs b/controller-client/src/lib.rs index c6cf04180..37938008e 100644 --- a/controller-client/src/lib.rs +++ b/controller-client/src/lib.rs @@ -25,6 +25,7 @@ )] #![allow(clippy::multiple_crate_versions)] #![allow(dead_code)] +#![allow(clippy::similar_names)] use std::result::Result as StdResult; use std::time::{Duration, Instant}; diff --git a/controller-client/src/model_helper.rs b/controller-client/src/model_helper.rs index 709d2ba18..1ce2d762e 100644 --- a/controller-client/src/model_helper.rs +++ b/controller-client/src/model_helper.rs @@ -121,7 +121,7 @@ impl Into for pravega_rust_client_shared::StreamCu fn into(self) -> crate::controller::StreamCut { crate::controller::StreamCut { stream_info: Some(self.scoped_stream.into()), - cut: self.segment_offset_map.to_owned(), // create a clone + cut: self.segment_offset_map, } } } diff --git a/retry/src/lib.rs b/retry/src/lib.rs index 410bc9364..cf0f2ed0e 100644 --- a/retry/src/lib.rs +++ b/retry/src/lib.rs @@ -35,8 +35,7 @@ clippy::cargo_common_metadata, clippy::mutex_integer, clippy::needless_borrow, - clippy::option_unwrap_used, - clippy::result_unwrap_used, + clippy::unwrap_used, clippy::similar_names )] #![allow(clippy::multiple_crate_versions)] diff --git a/rust-toolchain b/rust-toolchain index 0dca6923c..21b07a553 100644 --- a/rust-toolchain +++ b/rust-toolchain @@ -1,3 +1,2 @@ -# Change to stable once issue https://github.com/PyO3/pyo3/issues/5 is resolved. nightly - +## Change to stable once issue https://github.com/PyO3/pyo3/issues/5 is resolved, tracked by https://github.com/PyO3/pyo3/issues/210 diff --git a/shared/src/lib.rs b/shared/src/lib.rs index 246302195..6293bc081 100644 --- a/shared/src/lib.rs +++ b/shared/src/lib.rs @@ -20,8 +20,7 @@ clippy::cargo_common_metadata, clippy::mutex_integer, clippy::needless_borrow, - clippy::option_unwrap_used, - clippy::result_unwrap_used, + clippy::unwrap_used, clippy::similar_names )] #![allow(clippy::multiple_crate_versions)] diff --git a/src/lib.rs b/src/lib.rs index 38cafb282..92d715ccd 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -22,8 +22,7 @@ clippy::cargo_common_metadata, clippy::mutex_integer, clippy::needless_borrow, - clippy::option_unwrap_used, - clippy::result_unwrap_used, + clippy::unwrap_used, clippy::similar_names )] #![allow(clippy::multiple_crate_versions)] diff --git a/wire_protocol/src/lib.rs b/wire_protocol/src/lib.rs index 564bc6762..f7eb3552f 100644 --- a/wire_protocol/src/lib.rs +++ b/wire_protocol/src/lib.rs @@ -21,8 +21,7 @@ clippy::cargo_common_metadata, clippy::mutex_integer, clippy::needless_borrow, - clippy::option_unwrap_used, - clippy::result_unwrap_used, + clippy::unwrap_used, clippy::similar_names )] #![allow(clippy::multiple_crate_versions)] From 259e4ab22debf992cf7da61842f10a3b889cd916 Mon Sep 17 00:00:00 2001 From: Sandeep Date: Wed, 3 Jun 2020 09:18:25 +0530 Subject: [PATCH 19/19] Remove comment. Signed-off-by: Sandeep --- bindings/src/stream_manager.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/bindings/src/stream_manager.rs b/bindings/src/stream_manager.rs index 2f6e860c7..c21ac4c6c 100644 --- a/bindings/src/stream_manager.rs +++ b/bindings/src/stream_manager.rs @@ -83,7 +83,6 @@ impl StreamManager { let controller = self.cf.get_controller_client(); let scope_name = Scope::new(scope_name.to_string()); - // let stream_name = Stream::new("testStream".into()); let scope_result = handle.block_on(controller.delete_scope(&scope_name)); println!("Scope deletion status {:?}", scope_result);