Skip to content
Permalink

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also . Learn more about diff comparisons here.
base repository: Pure-Peace/peace
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: v0.12.0
Choose a base ref
...
head repository: Pure-Peace/peace
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: main
Choose a head ref

There isn’t anything to compare.

v0.12.0 and main are entirely different commit histories.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -80,7 +80,9 @@ prost-types = "0.11"

# async programming
tokio = "1"
tokio-stream = "0.1"
hyper = "0.14"
h2 = "0.3"
futures = "0.3"
futures-util = "0.3"
async-trait = "0.1"
3 changes: 0 additions & 3 deletions bin/events/src/app.rs
Original file line number Diff line number Diff line change
@@ -20,9 +20,6 @@ pub struct EventsConfig {

#[command(flatten)]
pub frame_cfg: RpcFrameConfig,

#[arg(long, short = 'P')]
pub ed25519_private_key_path: Option<String>,
}

#[derive(Clone)]
45 changes: 37 additions & 8 deletions bin/events/src/rpc.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use core_events::DynEventsService;
use core_events::{
DynEventsService, EventsError, SubscriptionWithOutputStream,
};
use pb_base::ExecSuccess;
use pb_events::*;
use std::pin::Pin;
@@ -17,16 +19,43 @@ impl EventsRpcImpl {

#[tonic::async_trait]
impl events_rpc_server::EventsRpc for EventsRpcImpl {
type ConnectServerStream =
type CreateSubscriptionStream =
Pin<Box<dyn Stream<Item = Result<Event, Status>> + Send>>;

async fn connect_server(
async fn create_subscription(
&self,
request: Request<ConnectRequest>,
) -> Result<Response<Self::ConnectServerStream>, Status> {
/* let output_stream = ReceiverStream::new(rx); */
todo!();
request: Request<CreateSubscriptionRequest>,
) -> Result<Response<Self::CreateSubscriptionStream>, Status> {
let CreateSubscriptionRequest { subscriber_key } = request.into_inner();

/* Ok(Response::new(Box::pin(output_stream) as Self::ConnectServerStream)) */
let SubscriptionWithOutputStream { stream, .. } = self
.events_service
.create_subscription(subscriber_key, 1, 1)
.await?;

Ok(Response::new(Box::pin(stream) as Self::CreateSubscriptionStream))
}

async fn remove_subscription(
&self,
request: Request<RemoveSubscriptionRequest>,
) -> Result<Response<ExecSuccess>, Status> {
let RemoveSubscriptionRequest { subscriber_key } = request.into_inner();

self.events_service.remove_subscription(&subscriber_key).await?;

Ok(Response::new(ExecSuccess::default()))
}

async fn publish(
&self,
request: Request<PublishRequest>,
) -> Result<Response<ExecSuccess>, Status> {
let PublishRequest { subscriber_key, event } = request.into_inner();
let event = event.ok_or(EventsError::InvalidArgument)?;

self.events_service.publish(&subscriber_key, event).await?;

Ok(Response::new(ExecSuccess::default()))
}
}
1 change: 1 addition & 0 deletions core/pb/modules/events/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#![allow(dead_code)]
#![allow(non_snake_case)]
#![allow(unused_imports)]

mod peace {
use pb_base as base;
19 changes: 17 additions & 2 deletions core/pb/proto/peace/services/events.proto
Original file line number Diff line number Diff line change
@@ -1,14 +1,29 @@
syntax = "proto3";

import "peace/base.proto";

package peace.services.events;

service EventsRPC {
rpc ConnectServer (ConnectRequest) returns (stream Event);
rpc CreateSubscription (CreateSubscriptionRequest) returns (stream Event);
rpc RemoveSubscription (RemoveSubscriptionRequest) returns (peace.base.ExecSuccess);
rpc Publish (PublishRequest) returns (peace.base.ExecSuccess);
}

message Event {
string topic = 1;
string content = 2;
}

message ConnectRequest {}
message CreateSubscriptionRequest {
string subscriber_key = 1;
}

message RemoveSubscriptionRequest {
string subscriber_key = 1;
}

message PublishRequest {
string subscriber_key = 1;
Event event = 2;
}
2 changes: 2 additions & 0 deletions core/services/events/Cargo.toml
Original file line number Diff line number Diff line change
@@ -12,6 +12,7 @@ default = []

[dependencies]
tokio = { workspace = true }
tokio-stream = { workspace = true }
tonic = { workspace = true }
async-trait = { workspace = true }
anyhow = { workspace = true }
@@ -28,6 +29,7 @@ peace_cfg = { workspace = true }
pb_base = { workspace = true }
pb_events = { workspace = true }

tools = { workspace = true }

infra_services = { workspace = true }

6 changes: 6 additions & 0 deletions core/services/events/src/error.rs
Original file line number Diff line number Diff line change
@@ -3,6 +3,12 @@ use tonic::Status;

#[derive(thiserror::Error, Debug, Serialize, Deserialize, RpcError)]
pub enum EventsError {
#[error("subscription key not exists")]
SubscriptionNotExists,
#[error("failed to send event: {0}")]
SendEventError(String),
#[error("invalid argument")]
InvalidArgument,
#[error("TonicError: {0}")]
TonicError(String),
}
202 changes: 192 additions & 10 deletions core/services/events/src/events.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,153 @@
use crate::*;
use async_trait::async_trait;
use infra_services::{FromRpcClient, IntoService, RpcClient};
use std::sync::Arc;
use tonic::transport::Channel;
use infra_services::IntoService;
use pb_base::ExecSuccess;
use pb_events::Event;
use std::{collections::HashMap, hash::Hash, ops::Deref, sync::Arc};
use tokio::{
sync::{
mpsc::{self, Sender},
RwLock,
},
task::JoinHandle,
};
use tokio_stream::wrappers::ReceiverStream;
use tonic::Status;
use tools::atomic::{AtomicOperation, Usize};

#[derive(Clone, Default)]
pub struct EventsServiceImpl {}
#[derive(Debug)]
pub struct Subscription<T> {
pub tx: Sender<T>,
pub task: JoinHandle<()>,
}

impl EventsServiceImpl {
#[inline]
pub fn new() -> Self {
Self {}
#[derive(Debug)]
pub struct SubscriptionWithOutputStream<T> {
pub subscription: Arc<Subscription<T>>,
pub stream: ReceiverStream<Result<T, Status>>,
}

#[derive(Debug)]
pub struct SubscriptionStoreInner<K, T> {
pub indexes: RwLock<HashMap<K, Arc<Subscription<T>>>>,
pub len: Usize,
}

pub struct EventsServiceImpl {
pub store: Arc<SubscriptionStore<String, Event>>,
}

#[derive(Debug, Clone)]
pub struct SubscriptionStore<K, T> {
pub inner: Arc<SubscriptionStoreInner<K, T>>,
}

impl<K, T> Deref for SubscriptionStore<K, T> {
type Target = Arc<SubscriptionStoreInner<K, T>>;

fn deref(&self) -> &Self::Target {
&self.inner
}
}

impl EventsService for EventsServiceImpl {}
impl<K, T> Deref for SubscriptionStoreInner<K, T> {
type Target = RwLock<HashMap<K, Arc<Subscription<T>>>>;

fn deref(&self) -> &Self::Target {
&self.indexes
}
}

impl<K, T> Default for SubscriptionStoreInner<K, T> {
fn default() -> Self {
Self { indexes: Default::default(), len: Default::default() }
}
}

impl<K, T> SubscriptionStoreInner<K, T>
where
K: Eq + Hash,
{
pub async fn add(
&self,
subscriber_key: K,
subscription: Arc<Subscription<T>>,
) -> Option<Arc<Subscription<T>>> {
let removed = self.write().await.insert(subscriber_key, subscription);
if removed.is_none() {
self.len.add(1);
}

removed
}

pub async fn remove(
&self,
subscriber_key: &K,
) -> Option<Arc<Subscription<T>>> {
let removed = self.write().await.remove(subscriber_key);
if removed.is_some() {
self.len.sub(1);
}

removed
}
}

impl<K, T> Default for SubscriptionStore<K, T> {
fn default() -> Self {
Self { inner: Default::default() }
}
}

impl<K, T> SubscriptionStore<K, T>
where
K: Eq + Hash + Clone + Send + Sync + 'static,
T: Send + 'static,
{
pub async fn create(
&self,
subscriber_key: K,
buffer_server: usize,
buffer_client: usize,
) -> SubscriptionWithOutputStream<T> {
let (server_tx, mut server_rx) = mpsc::channel::<T>(buffer_server);
let (client_tx, client_rx) =
mpsc::channel::<Result<T, Status>>(buffer_client);

let handle = {
let inner_cloned = self.inner.clone();
let subscriber_key_cloned = subscriber_key.clone();

tokio::spawn(async move {
let inner = inner_cloned;
let subscriber_key = subscriber_key_cloned;

while let Some(t) = server_rx.recv().await {
match client_tx.send(Ok(t)).await {
Ok(_) => {},
Err(_) => {
// client stream dropped
break;
},
}
}
println!("stream ended");
inner.remove(&subscriber_key).await;
})
};

let subscription =
Arc::new(Subscription { tx: server_tx, task: handle });

self.inner.add(subscriber_key, subscription.clone()).await;

SubscriptionWithOutputStream {
subscription,
stream: ReceiverStream::new(client_rx),
}
}
}

impl IntoService<DynEventsService> for EventsServiceImpl {
#[inline]
@@ -23,6 +156,55 @@ impl IntoService<DynEventsService> for EventsServiceImpl {
}
}

impl EventsServiceImpl {
#[inline]
pub fn new() -> Self {
Self { store: Arc::default() }
}
}

#[async_trait]
impl EventsService for EventsServiceImpl {
async fn create_subscription(
&self,
key: String,
buffer_server: usize,
buffer_client: usize,
) -> Result<SubscriptionWithOutputStream<Event>, EventsError> {
Ok(self.store.create(key, buffer_server, buffer_client).await)
}

async fn remove_subscription(
&self,
subscriber_key: &String,
) -> Result<Option<Arc<Subscription<Event>>>, EventsError> {
Ok(self.store.remove(subscriber_key).await)
}

async fn publish(
&self,
subscriber_key: &String,
event: Event,
) -> Result<ExecSuccess, EventsError> {
let subscription = {
self.store
.read()
.await
.get(subscriber_key)
.cloned()
.ok_or(EventsError::SubscriptionNotExists)?
};

subscription
.tx
.send(event)
.await
.map_err(|err| EventsError::SendEventError(err.to_string()))?;

Ok(ExecSuccess::default())
}
}

/* #[derive(Debug, Clone)]
pub struct EventsServiceRemote(EventsRpcClient<Channel>);
1 change: 1 addition & 0 deletions core/services/events/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#[allow(unused_imports)]
#[macro_use]
extern crate peace_logs;

Loading