Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue 256: Enable stream tag API on the RUST client. #270

Merged
merged 13 commits into from
Jun 28, 2021
3 changes: 3 additions & 0 deletions controller-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ jsonwebtoken = "7"
serde = {version = "1.0", features = ["derive"] }
futures = "0.3"
tokio-rustls = "0.22.0"
num = "0.4"
num-derive = "0.3"
num-traits = "0.2"

[build-dependencies]
tonic-build = "0.4"
Expand Down
212 changes: 208 additions & 4 deletions controller-client/proto/Controller.proto
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
/**
* Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved.
* Copyright Pravega Authors.
shrids marked this conversation as resolved.
Show resolved Hide resolved
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
syntax = "proto3";
package io.pravega.controller.stream.api.grpc.v1;
Expand Down Expand Up @@ -40,11 +46,26 @@ service ControllerService {
rpc pingTransaction(PingTxnRequest) returns (PingTxnStatus);
rpc checkTransactionState(TxnRequest) returns (TxnState);
rpc createScope(ScopeInfo) returns (CreateScopeStatus);
rpc listScopes(ScopesRequest) returns (ScopesResponse);
rpc checkScopeExists(ScopeInfo) returns (ExistsResponse);
rpc checkStreamExists(StreamInfo) returns (ExistsResponse);
rpc listStreamsInScope(StreamsInScopeRequest) returns (StreamsInScopeResponse);
rpc deleteScope(ScopeInfo) returns (DeleteScopeStatus);
rpc getDelegationToken(StreamInfo) returns (DelegationToken);
rpc removeWriter(RemoveWriterRequest) returns (RemoveWriterResponse);
rpc noteTimestampFromWriter(TimestampFromWriter) returns (TimestampResponse);
rpc createKeyValueTable(KeyValueTableConfig) returns (CreateKeyValueTableStatus);
rpc getCurrentSegmentsKeyValueTable(KeyValueTableInfo) returns (SegmentRanges);
rpc listKeyValueTablesInScope(KVTablesInScopeRequest) returns (KVTablesInScopeResponse);
rpc deleteKeyValueTable(KeyValueTableInfo) returns (DeleteKVTableStatus);
rpc listSubscribers(StreamInfo) returns (SubscribersResponse);
rpc updateSubscriberStreamCut(SubscriberStreamCut) returns (UpdateSubscriberStatus);
rpc createReaderGroup(ReaderGroupConfiguration) returns (CreateReaderGroupResponse);
rpc getReaderGroupConfig(ReaderGroupInfo) returns (ReaderGroupConfigResponse);
rpc deleteReaderGroup(ReaderGroupInfo) returns (DeleteReaderGroupStatus);
rpc updateReaderGroup(ReaderGroupConfiguration) returns (UpdateReaderGroupResponse);
rpc getStreamConfiguration(StreamInfo) returns (StreamConfig);
rpc listStreamsInScopeForTag(StreamsInScopeWithTagRequest) returns (StreamsInScopeResponse);
}

message ServerRequest {
Expand All @@ -54,6 +75,119 @@ message ServerResponse {
repeated NodeUri nodeURI = 1;
}

message ReaderGroupConfiguration {
enum RetentionType {
NONE = 0;
MANUAL = 1;
AUTOMATIC = 2;
}
string scope = 1;
string readerGroupName = 2;
int64 groupRefreshTimeMillis = 3;
int64 automaticCheckpointIntervalMillis = 4;
int32 maxOutstandingCheckpointRequest = 5;
int32 retentionType = 6;
int64 generation = 7;
string readerGroupId = 8;
repeated StreamCut startingStreamCuts = 9;
repeated StreamCut endingStreamCuts = 10;
}

message ReaderGroupConfigResponse {
ReaderGroupConfiguration config = 1;
enum Status {
SUCCESS = 0;
FAILURE = 1;
RG_NOT_FOUND = 2;
}
Status status = 2;
}

message ReaderGroupInfo {
string scope = 1;
string readerGroup = 2;
string readerGroupId = 3;
int64 generation = 4;
}

message CreateReaderGroupResponse {
enum Status {
SUCCESS = 0;
FAILURE = 1;
SCOPE_NOT_FOUND = 2;
INVALID_RG_NAME = 3;
}
Status status = 1;
ReaderGroupConfiguration config = 2;
}

message DeleteReaderGroupStatus {
enum Status {
SUCCESS = 0;
FAILURE = 1;
RG_NOT_FOUND = 2;
}
Status status = 1;
}

message UpdateReaderGroupResponse {
enum Status {
SUCCESS = 0;
FAILURE = 1;
RG_NOT_FOUND = 2;
INVALID_CONFIG = 3;
}
Status status = 1;
int64 generation = 2;
}

message CreateKeyValueTableStatus {
enum Status {
SUCCESS = 0;
FAILURE = 1;
TABLE_EXISTS = 2;
SCOPE_NOT_FOUND = 3;
INVALID_TABLE_NAME = 4;
}
Status status = 1;
}

message KeyValueTableConfig {
string scope = 1;
string kvtName = 2;
int32 partitionCount = 3;
}

message KeyValueTableInfo {
string scope = 1;
string kvtName = 2;
}

message KVTablesInScopeRequest {
ScopeInfo scope = 1;
ContinuationToken continuationToken = 2;
}

message KVTablesInScopeResponse {
repeated KeyValueTableInfo kvtables = 1;
ContinuationToken continuationToken = 2;
enum Status {
SUCCESS = 0;
FAILURE = 1;
SCOPE_NOT_FOUND = 2;
}
Status status = 3;
}

message DeleteKVTableStatus {
enum Status {
SUCCESS = 0;
FAILURE = 1;
TABLE_NOT_FOUND = 2;
}
Status status = 1;
}

message CreateStreamStatus {
enum Status {
SUCCESS = 0;
Expand All @@ -75,6 +209,42 @@ message UpdateStreamStatus {
Status status = 1;
}

message UpdateSubscriberStatus {
enum Status {
SUCCESS = 0;
FAILURE = 1;
STREAM_NOT_FOUND = 2;
SUBSCRIBER_NOT_FOUND = 3;
STREAM_CUT_NOT_VALID = 4;
GENERATION_MISMATCH = 5;
}
Status status = 1;
}

message StreamSubscriberInfo {
string scope = 1;
string stream = 2;
string subscriber = 3;
int64 operationGeneration = 4;
}

message SubscriberStreamCut {
string subscriber = 1;
int64 generation = 2;
string readerGroupId = 3;
StreamCut streamCut = 4;
}

message SubscribersResponse {
repeated string subscribers = 1;
enum Status {
SUCCESS = 0;
FAILURE = 1;
STREAM_NOT_FOUND = 2;
}
Status status = 2;
}

message DeleteStreamStatus {
enum Status {
SUCCESS = 0;
Expand Down Expand Up @@ -117,10 +287,10 @@ message TxnStatus {

message PingTxnStatus {
enum Status {
reserved 3;
OK = 0;
LEASE_TOO_LARGE = 1;
MAX_EXECUTION_TIME_EXCEEDED = 2;
SCALE_GRACE_TIME_EXCEEDED = 3 [deprecated=true];
DISCONNECTED = 4;
COMMITTED = 5;
ABORTED = 6;
Expand All @@ -141,7 +311,7 @@ message TxnState {
State state = 1;
}

message ScopeInfo {
message ScopeInfo {
string scope = 1;
}

Expand All @@ -154,6 +324,12 @@ message ContinuationToken {
ContinuationToken continuationToken = 2;
}

message StreamsInScopeWithTagRequest {
ScopeInfo scope = 1;
string tag = 2;
ContinuationToken continuationToken =3;
}

message StreamsInScopeResponse {
repeated StreamInfo streams = 1;
ContinuationToken continuationToken = 2;
Expand All @@ -168,6 +344,15 @@ message ContinuationToken {
message StreamInfo {
string scope = 1;
string stream = 2;
enum AccessOperation {
UNSPECIFIED = 0;
NONE = 1;
ANY = 2;
READ = 3;
WRITE = 4;
READ_WRITE = 5;
}
AccessOperation accessOperation = 3;
}

message ScalingPolicy {
Expand All @@ -190,12 +375,18 @@ message RetentionPolicy {
}
RetentionPolicyType retentionType = 1;
int64 retentionParam = 2;
int64 retentionMax = 3;
}

message StreamConfig {
StreamInfo streamInfo = 1;
ScalingPolicy scalingPolicy = 2;
RetentionPolicy retentionPolicy = 3;
Tags tags = 4;
}

message Tags {
repeated string tag = 1;
}

message StreamCut {
Expand Down Expand Up @@ -240,7 +431,6 @@ message TxnId {
}

message CreateTxnRequest {
reserved 3;
StreamInfo streamInfo = 1;
int64 lease = 2;
}
Expand Down Expand Up @@ -373,3 +563,17 @@ message TimestampResponse {
}
Status result = 1;
}

message ScopesResponse {
repeated string scopes = 1;
ContinuationToken continuationToken = 2;
}

message ScopesRequest {
ContinuationToken continuationToken = 1;
}

message ExistsResponse {
bool exists = 1;
}

28 changes: 28 additions & 0 deletions controller-client/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ enum Command {
stream_name: String,
#[structopt(help = "Segment Count")]
segment_count: i32,
#[structopt(help = "tag", value_name = "Tag,", use_delimiter = true, min_values = 0)]
tags: Vec<String>,
},
/// Seal a Stream.
SealStream {
Expand All @@ -53,6 +55,13 @@ enum Command {
#[structopt(help = "Scope Name")]
scope_name: String,
},
/// List Streams for a tag, under a scope
ListStreamsForTag {
#[structopt(help = "Scope Name")]
scope_name: String,
#[structopt(help = "Tag Name")]
tag: String,
},
}

#[derive(StructOpt, Debug)]
Expand Down Expand Up @@ -93,6 +102,7 @@ fn main() {
scope_name,
stream_name,
segment_count,
tags,
} => {
let stream_cfg = StreamConfiguration {
scoped_stream: ScopedStream {
Expand All @@ -109,6 +119,7 @@ fn main() {
retention_type: RetentionType::None,
retention_param: 0,
},
tags: if tags.is_empty() { None } else { Some(tags) },
};
let result = rt.block_on(controller_client.create_stream(&stream_cfg));
println!("Stream creation status {:?}", result);
Expand Down Expand Up @@ -146,5 +157,22 @@ fn main() {
future::ready(())
}));
}
Command::ListStreamsForTag { scope_name, tag } => {
use futures::future;
use futures::stream::StreamExt;
use pravega_controller_client::paginator::list_streams_for_tag;

let scope = Scope::from(scope_name.clone());
let stream = list_streams_for_tag(scope, tag.clone(), &controller_client);
println!("Listing streams with tag {:?} under scope {:?}", tag, scope_name);
rt.block_on(stream.for_each(|stream| {
if stream.is_ok() {
println!("{:?}", stream.unwrap());
} else {
println!("Error while fetching data from Controller. Details: {:?}", stream);
}
future::ready(())
}));
}
}
}
Loading