Skip to content

Commit

Permalink
feat: add the WatchState API (#582)
Browse files Browse the repository at this point in the history
This pull request introduces a new stream API named "WatchState." 
Currently, it only provides the current temporality, 
which will enable clients to check if the server has been paused.
  • Loading branch information
Rustin170506 authored Nov 5, 2024
1 parent 1f41b61 commit 7c1f9f2
Show file tree
Hide file tree
Showing 9 changed files with 408 additions and 48 deletions.
19 changes: 19 additions & 0 deletions console-api/proto/instrument.proto
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ service Instrument {
rpc WatchUpdates(InstrumentRequest) returns (stream Update) {}
// Produces a stream of updates describing the activity of a specific task.
rpc WatchTaskDetails(TaskDetailsRequest) returns (stream tasks.TaskDetails) {}
// Produces a stream of state of the aggregator.
rpc WatchState(StateRequest) returns (stream State) {}
// Registers that the console observer wants to pause the stream.
rpc Pause(PauseRequest) returns (PauseResponse) {}
// Registers that the console observer wants to resume the stream.
Expand Down Expand Up @@ -72,6 +74,23 @@ message Update {
common.RegisterMetadata new_metadata = 5;
}

// StateRequest requests the current state of the aggregator.
message StateRequest {
}

// State carries the current state of the aggregator.
message State {
Temporality temporality = 1;
}

// The time "state" of the aggregator.
enum Temporality {
// The aggregator is currently live.
LIVE = 0;
// The aggregator is currently paused.
PAUSED = 1;
}

// `PauseResponse` is the value returned after a pause request.
message PauseResponse {
}
Expand Down
125 changes: 125 additions & 0 deletions console-api/src/generated/rs.tokio.console.instrument.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,50 @@ pub struct Update {
#[prost(message, optional, tag = "5")]
pub new_metadata: ::core::option::Option<super::common::RegisterMetadata>,
}
/// StateRequest requests the current state of the aggregator.
#[derive(Clone, Copy, PartialEq, ::prost::Message)]
pub struct StateRequest {}
/// State carries the current state of the aggregator.
#[derive(Clone, Copy, PartialEq, ::prost::Message)]
pub struct State {
#[prost(enumeration = "Temporality", tag = "1")]
pub temporality: i32,
}
/// `PauseResponse` is the value returned after a pause request.
#[derive(Clone, Copy, PartialEq, ::prost::Message)]
pub struct PauseResponse {}
/// `ResumeResponse` is the value returned after a resume request.
#[derive(Clone, Copy, PartialEq, ::prost::Message)]
pub struct ResumeResponse {}
/// The time "state" of the aggregator.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)]
#[repr(i32)]
pub enum Temporality {
/// The aggregator is currently live.
Live = 0,
/// The aggregator is currently paused.
Paused = 1,
}
impl Temporality {
/// String value of the enum field names used in the ProtoBuf definition.
///
/// The values are not transformed in any way and thus are considered stable
/// (if the ProtoBuf definition does not change) and safe for programmatic use.
pub fn as_str_name(&self) -> &'static str {
match self {
Self::Live => "LIVE",
Self::Paused => "PAUSED",
}
}
/// Creates an enum from field names used in the ProtoBuf definition.
pub fn from_str_name(value: &str) -> ::core::option::Option<Self> {
match value {
"LIVE" => Some(Self::Live),
"PAUSED" => Some(Self::Paused),
_ => None,
}
}
}
/// Generated client implementations.
pub mod instrument_client {
#![allow(
Expand Down Expand Up @@ -208,6 +246,36 @@ pub mod instrument_client {
);
self.inner.server_streaming(req, path, codec).await
}
/// Produces a stream of state of the aggregator.
pub async fn watch_state(
&mut self,
request: impl tonic::IntoRequest<super::StateRequest>,
) -> std::result::Result<
tonic::Response<tonic::codec::Streaming<super::State>>,
tonic::Status,
> {
self.inner
.ready()
.await
.map_err(|e| {
tonic::Status::unknown(
format!("Service was not ready: {}", e.into()),
)
})?;
let codec = tonic::codec::ProstCodec::default();
let path = http::uri::PathAndQuery::from_static(
"/rs.tokio.console.instrument.Instrument/WatchState",
);
let mut req = request.into_request();
req.extensions_mut()
.insert(
GrpcMethod::new(
"rs.tokio.console.instrument.Instrument",
"WatchState",
),
);
self.inner.server_streaming(req, path, codec).await
}
/// Registers that the console observer wants to pause the stream.
pub async fn pause(
&mut self,
Expand Down Expand Up @@ -302,6 +370,17 @@ pub mod instrument_server {
tonic::Response<Self::WatchTaskDetailsStream>,
tonic::Status,
>;
/// Server streaming response type for the WatchState method.
type WatchStateStream: tonic::codegen::tokio_stream::Stream<
Item = std::result::Result<super::State, tonic::Status>,
>
+ std::marker::Send
+ 'static;
/// Produces a stream of state of the aggregator.
async fn watch_state(
&self,
request: tonic::Request<super::StateRequest>,
) -> std::result::Result<tonic::Response<Self::WatchStateStream>, tonic::Status>;
/// Registers that the console observer wants to pause the stream.
async fn pause(
&self,
Expand Down Expand Up @@ -482,6 +561,52 @@ pub mod instrument_server {
};
Box::pin(fut)
}
"/rs.tokio.console.instrument.Instrument/WatchState" => {
#[allow(non_camel_case_types)]
struct WatchStateSvc<T: Instrument>(pub Arc<T>);
impl<
T: Instrument,
> tonic::server::ServerStreamingService<super::StateRequest>
for WatchStateSvc<T> {
type Response = super::State;
type ResponseStream = T::WatchStateStream;
type Future = BoxFuture<
tonic::Response<Self::ResponseStream>,
tonic::Status,
>;
fn call(
&mut self,
request: tonic::Request<super::StateRequest>,
) -> Self::Future {
let inner = Arc::clone(&self.0);
let fut = async move {
<T as Instrument>::watch_state(&inner, request).await
};
Box::pin(fut)
}
}
let accept_compression_encodings = self.accept_compression_encodings;
let send_compression_encodings = self.send_compression_encodings;
let max_decoding_message_size = self.max_decoding_message_size;
let max_encoding_message_size = self.max_encoding_message_size;
let inner = self.inner.clone();
let fut = async move {
let method = WatchStateSvc(inner);
let codec = tonic::codec::ProstCodec::default();
let mut grpc = tonic::server::Grpc::new(codec)
.apply_compression_config(
accept_compression_encodings,
send_compression_encodings,
)
.apply_max_message_size_config(
max_decoding_message_size,
max_encoding_message_size,
);
let res = grpc.server_streaming(method, req).await;
Ok(res)
};
Box::pin(fut)
}
"/rs.tokio.console.instrument.Instrument/Pause" => {
#[allow(non_camel_case_types)]
struct PauseSvc<T: Instrument>(pub Arc<T>);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
/* eslint-disable */
// @ts-nocheck

import { InstrumentRequest, PauseRequest, PauseResponse, ResumeRequest, ResumeResponse, TaskDetailsRequest, Update } from "./instrument_pb.js";
import { InstrumentRequest, PauseRequest, PauseResponse, ResumeRequest, ResumeResponse, State, StateRequest, TaskDetailsRequest, Update } from "./instrument_pb.js";
import { MethodKind } from "@bufbuild/protobuf";
import { TaskDetails } from "./tasks_pb.js";

Expand Down Expand Up @@ -37,6 +37,17 @@ export const Instrument = {
O: TaskDetails,
kind: MethodKind.ServerStreaming,
},
/**
* Produces a stream of state of the aggregator.
*
* @generated from rpc rs.tokio.console.instrument.Instrument.WatchState
*/
watchState: {
name: "WatchState",
I: StateRequest,
O: State,
kind: MethodKind.ServerStreaming,
},
/**
* Registers that the console observer wants to pause the stream.
*
Expand Down
98 changes: 98 additions & 0 deletions console-subscriber/examples/grpc_web/app/src/gen/instrument_pb.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,32 @@ import { TaskUpdate } from "./tasks_pb.js";
import { ResourceUpdate } from "./resources_pb.js";
import { AsyncOpUpdate } from "./async_ops_pb.js";

/**
* The time "state" of the aggregator.
*
* @generated from enum rs.tokio.console.instrument.Temporality
*/
export enum Temporality {
/**
* The aggregator is currently live.
*
* @generated from enum value: LIVE = 0;
*/
LIVE = 0,

/**
* The aggregator is currently paused.
*
* @generated from enum value: PAUSED = 1;
*/
PAUSED = 1,
}
// Retrieve enum metadata with: proto3.getEnumType(Temporality)
proto3.util.setEnumType(Temporality, "rs.tokio.console.instrument.Temporality", [
{ no: 0, name: "LIVE" },
{ no: 1, name: "PAUSED" },
]);

/**
* InstrumentRequest requests the stream of updates
* to observe the async runtime state over time.
Expand Down Expand Up @@ -239,6 +265,78 @@ export class Update extends Message<Update> {
}
}

/**
* StateRequest requests the current state of the aggregator.
*
* @generated from message rs.tokio.console.instrument.StateRequest
*/
export class StateRequest extends Message<StateRequest> {
constructor(data?: PartialMessage<StateRequest>) {
super();
proto3.util.initPartial(data, this);
}

static readonly runtime: typeof proto3 = proto3;
static readonly typeName = "rs.tokio.console.instrument.StateRequest";
static readonly fields: FieldList = proto3.util.newFieldList(() => [
]);

static fromBinary(bytes: Uint8Array, options?: Partial<BinaryReadOptions>): StateRequest {
return new StateRequest().fromBinary(bytes, options);
}

static fromJson(jsonValue: JsonValue, options?: Partial<JsonReadOptions>): StateRequest {
return new StateRequest().fromJson(jsonValue, options);
}

static fromJsonString(jsonString: string, options?: Partial<JsonReadOptions>): StateRequest {
return new StateRequest().fromJsonString(jsonString, options);
}

static equals(a: StateRequest | PlainMessage<StateRequest> | undefined, b: StateRequest | PlainMessage<StateRequest> | undefined): boolean {
return proto3.util.equals(StateRequest, a, b);
}
}

/**
* State carries the current state of the aggregator.
*
* @generated from message rs.tokio.console.instrument.State
*/
export class State extends Message<State> {
/**
* @generated from field: rs.tokio.console.instrument.Temporality temporality = 1;
*/
temporality = Temporality.LIVE;

constructor(data?: PartialMessage<State>) {
super();
proto3.util.initPartial(data, this);
}

static readonly runtime: typeof proto3 = proto3;
static readonly typeName = "rs.tokio.console.instrument.State";
static readonly fields: FieldList = proto3.util.newFieldList(() => [
{ no: 1, name: "temporality", kind: "enum", T: proto3.getEnumType(Temporality) },
]);

static fromBinary(bytes: Uint8Array, options?: Partial<BinaryReadOptions>): State {
return new State().fromBinary(bytes, options);
}

static fromJson(jsonValue: JsonValue, options?: Partial<JsonReadOptions>): State {
return new State().fromJson(jsonValue, options);
}

static fromJsonString(jsonString: string, options?: Partial<JsonReadOptions>): State {
return new State().fromJsonString(jsonString, options);
}

static equals(a: State | PlainMessage<State> | undefined, b: State | PlainMessage<State> | undefined): boolean {
return proto3.util.equals(State, a, b);
}
}

/**
* `PauseResponse` is the value returned after a pause request.
*
Expand Down
Loading

0 comments on commit 7c1f9f2

Please sign in to comment.