diff --git a/Cargo.lock b/Cargo.lock index 15a5691..3f5af5d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -479,15 +479,6 @@ dependencies = [ "clap_derive", ] -[[package]] -name = "clap-num" -version = "1.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "822c4000301ac390e65995c62207501e3ef800a1fc441df913a5e8e4dc374816" -dependencies = [ - "num-traits", -] - [[package]] name = "clap_builder" version = "4.5.53" @@ -827,29 +818,6 @@ version = "1.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719" -[[package]] -name = "env_filter" -version = "0.1.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1bf3c259d255ca70051b30e2e95b5446cdb8949ac4cd22c0d7fd634d89f568e2" -dependencies = [ - "log", - "regex", -] - -[[package]] -name = "env_logger" -version = "0.11.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "13c863f0904021b108aa8b2f55046443e6b1ebde8fd4a15c399893aae4fa069f" -dependencies = [ - "anstream", - "anstyle", - "env_filter", - "jiff", - "log", -] - [[package]] name = "equivalent" version = "1.0.2" @@ -1582,30 +1550,6 @@ version = "1.0.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4a5f13b858c8d314ee3e8f639011f7ccefe71f97f96e50151fb991f267928e2c" -[[package]] -name = "jiff" -version = "0.2.16" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "49cce2b81f2098e7e3efc35bc2e0a6b7abec9d34128283d7a26fa8f32a6dbb35" -dependencies = [ - "jiff-static", - "log", - "portable-atomic", - "portable-atomic-util", - "serde_core", -] - -[[package]] -name = "jiff-static" -version = "0.2.16" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "980af8b43c3ad5d8d349ace167ec8170839f753a42d233ba19e08afe1850fa69" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.111", -] - [[package]] name = "jni" version = "0.21.1" @@ -2397,15 +2341,6 @@ version = "1.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f84267b20a16ea918e43c6a88433c2d54fa145c92a811b5b047ccbe153674483" -[[package]] -name = "portable-atomic-util" -version = "0.2.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d8a2f0d8d040d7848a709caf78912debcc3f33ee4b3cac47d73d1e1069e83507" -dependencies = [ - "portable-atomic", -] - [[package]] name = "potential_utf" version = "0.1.4" @@ -3565,6 +3500,27 @@ dependencies = [ "test-case-core", ] +[[package]] +name = "test-log" +version = "0.2.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "37d53ac171c92a39e4769491c4b4dde7022c60042254b5fc044ae409d34a24d4" +dependencies = [ + "test-log-macros", + "tracing-subscriber", +] + +[[package]] +name = "test-log-macros" +version = "0.2.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be35209fd0781c5401458ab66e4f98accf63553e8fae7425503e92fdd319783b" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.111", +] + [[package]] name = "testcontainers" version = "0.25.2" @@ -4108,16 +4064,15 @@ name = "up-subscription" version = "0.4.0" dependencies = [ "async-trait", - "env_logger", - "log", "mockall 0.14.0", "pickledb", "protobuf", "serde", "test-case", + "test-log", "tokio", + "tracing", "up-rust", - "uriparse", ] [[package]] @@ -4125,10 +4080,10 @@ name = "up-subscription-cli" version = "0.4.0" dependencies = [ "clap", - "clap-num", - "log", "serde_json", "tokio", + "tracing", + "tracing-subscriber", "up-rust", "up-subscription", "up-transport-mqtt5", diff --git a/Cargo.toml b/Cargo.toml index dfcbce6..b612812 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,12 +30,11 @@ version = "0.4.0" [workspace.dependencies] async-trait = { version = "0.1" } -env_logger = { version = "0.11" } -log = { version = "0.4" } mockall = { version = "0.14" } protobuf = { version = "3.7.2" } test-case = { version = "3.3" } tokio = { version = "1", features = ["full"] } +tracing = { version = "0.1", default-features = false, features = ["log", "std"] } up-rust = { version = "0.9.0", features = ["usubscription"] } up-subscription = { path = "./up-subscription" } diff --git a/up-subscription-cli/Cargo.toml b/up-subscription-cli/Cargo.toml index 7a8ca86..c3eb17f 100644 --- a/up-subscription-cli/Cargo.toml +++ b/up-subscription-cli/Cargo.toml @@ -28,18 +28,26 @@ license = false eula = false [features] -default = ["local"] -local = ["up-rust/util"] +default = [] mqtt5 = ["dep:up-transport-mqtt5"] zenoh = ["dep:up-transport-zenoh", "dep:serde_json"] [dependencies] -clap = { version = "4.5", features = ["derive", "env"] } -clap-num = { version = "1.2" } -log = { workspace = true } +clap = { version = "4.5.53", default-features = false, features = [ + "std", + "derive", + "env", + "color", + "help", + "usage", + "error-context", + "suggestions", +] } serde_json = { version = "1.0", optional = true } tokio = { workspace = true } -up-rust = { workspace = true } +tracing = { workspace = true } +tracing-subscriber = { version = "0.3", features = ["env-filter", "fmt"] } +up-rust = { workspace = true, features = ["usubscription", "util" ] } up-subscription = { workspace = true } up-transport-mqtt5 = { version = "0.4.0", optional = true } up-transport-zenoh = { version = "0.9.0", optional = true } diff --git a/up-subscription-cli/src/main.rs b/up-subscription-cli/src/main.rs index ace2ef0..befc8f8 100644 --- a/up-subscription-cli/src/main.rs +++ b/up-subscription-cli/src/main.rs @@ -11,13 +11,11 @@ * SPDX-License-Identifier: Apache-2.0 ********************************************************************************/ -use clap::Parser; -use clap_num::number_range; -use log::*; use std::sync::Arc; -use tokio::signal; -use up_rust::{LocalUriProvider, UTransport}; +use clap::Parser; +use tokio::signal; +use tracing::error; use up_subscription::{ConfigurationError, USubscriptionConfiguration, USubscriptionService}; #[cfg(feature = "mqtt5")] @@ -27,176 +25,110 @@ use up_transport_mqtt5::Mqtt5TransportOptions; use crate::transport::zenoh::ZenohArgs; mod transport; -#[cfg(feature = "local")] use transport::get_local_transport; #[cfg(feature = "mqtt5")] use transport::get_mqtt5_transport; #[cfg(feature = "zenoh")] use transport::get_zenoh_transport; -fn between_1_and_1024(s: &str) -> Result { - number_range(s, 1, 1024) -} - -#[derive(Debug)] -pub enum StartupError { - ConfigurationError(String), -} - -impl StartupError { - pub fn configuration_error(message: T) -> StartupError - where - T: Into, - { - Self::ConfigurationError(message.into()) - } -} - -impl std::fmt::Display for StartupError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - Self::ConfigurationError(e) => f.write_fmt(format_args!("Configuration error: {e}")), - } - } -} - -impl std::error::Error for StartupError {} - -#[derive(clap::ValueEnum, Clone, Default, Debug)] +#[derive(clap::Subcommand)] enum Transport { - #[default] - None, - #[cfg(feature = "local")] + /// Use in-memory local uProtocol transport Local, - #[cfg(feature = "mqtt5")] - Mqtt5, #[cfg(feature = "zenoh")] - Zenoh, + /// Use Zenoh as uProtocol transport + Zenoh { + #[command(flatten)] + options: ZenohArgs, + }, + #[cfg(feature = "mqtt5")] + /// Use MQTT 5 as uProtocol transport + Mqtt5 { + #[command(flatten)] + options: Mqtt5TransportOptions, + }, } // All our args #[derive(Parser)] -#[command(version, about = "Rust implementation of Eclipse uProtocol USubscription service.", long_about = None)] +#[command(version, about = "Rust implementation of Eclipse uProtocol Subscription service.", long_about = None)] pub(crate) struct Args { - /// Authority name for usubscription service - #[arg(short, long, env)] + /// Authority name for uSubscription service + #[arg(short, long, env, value_name = "NAME", value_parser=clap::builder::NonEmptyStringValueParser::new())] authority: String, - /// The transport implementation to use - #[arg(short, long, env)] - transport: Transport, + /// Number of subscription commands that can buffered - minimum 1, maximum 1024 + #[arg(short, long, env, value_name="SIZE", value_parser=clap::value_parser!(u16).range(1..=1024), default_value_t = up_subscription::DEFAULT_COMMAND_BUFFER_SIZE)] + subscription_buffer: u16, - /// Buffer size of subscription command channel - minimum 1, maximum 1024, defaults to 1024 - #[arg(short, long, env, value_parser=between_1_and_1024)] - subscription_buffer: Option, + /// Number of notifications that can buffered - minimum 1, maximum 1024 + #[arg(short, long, env, value_name="SIZE", value_parser=clap::value_parser!(u16).range(1..=1024), default_value_t = up_subscription::DEFAULT_COMMAND_BUFFER_SIZE)] + notification_buffer: u16, - /// Buffer size of notification command channel - minimum 1, maximum 1024, defaults to 1024 - #[arg(short, long, env, value_parser=between_1_and_1024)] - notification_buffer: Option, - - /// Enable or disable persistency, default is true (enable) - #[arg(short, long, env, default_value_t = true)] + /// Disable persistency of subscription and notification data + #[arg(long="no-persistency", env="NO_PERSISTENCY", default_value_t = true, action=clap::ArgAction::SetFalse)] persistency: bool, /// Filesystem location for storing persistent data, default is current working directory - #[arg(long, env)] + #[arg(long, env, value_name = "PATH")] storage_path: Option, - /// Increase verbosity of output, default is false (reduced verbosity) + /// Increase verbosity of output #[arg(short, long, env, default_value_t = false)] verbose: bool, - #[cfg(feature = "mqtt5")] - #[command(flatten)] - mqtt_args: Mqtt5TransportOptions, - - #[cfg(feature = "zenoh")] - #[command(flatten)] - zenoh_args: ZenohArgs, + #[command(subcommand)] + transport: Transport, } -#[tokio::main] -async fn main() { - let args = Args::parse(); - - // Setup logging, get configuration - unsafe { - // accept unsafe for now, as at this point this program definiely is not multi-threaded - std::env::set_var("RUST_LOG", "info"); - #[cfg(feature = "zenoh")] - std::env::set_var("RUST_LOG", "info,zenoh=warn"); +impl TryFrom<&Args> for USubscriptionConfiguration { + type Error = ConfigurationError; + + fn try_from(value: &Args) -> Result { + USubscriptionConfiguration::create( + value.authority.clone(), + Some(value.notification_buffer), + Some(value.subscription_buffer), + value.persistency, + value.storage_path.clone(), + ) + } +} - if args.verbose { - std::env::set_var("RUST_LOG", "trace"); - #[cfg(feature = "zenoh")] - std::env::set_var("RUST_LOG", "trace,zenoh=info"); - } +fn init_logging(args: &Args) { + if args.verbose { + tracing_subscriber::fmt() + .with_env_filter("trace,zenoh=info") + .init(); + } else { + tracing_subscriber::fmt() + .with_env_filter("info,zenoh=warn") + .init(); } - up_subscription::init_once(); +} - let _config = match config_from_args(&args) { - Err(e) => { - panic!("Configuration error: {e}") - } - Ok(config) => Arc::new(config), - }; +#[tokio::main] +async fn main() -> Result<(), Box> { + // Setup logging, get configuration + let args = Args::parse(); + init_logging(&args); + let config = USubscriptionConfiguration::try_from(&args).map(Arc::new)?; // Deal with transport module that we're to use - let transport: Option> = match args.transport { - #[cfg(feature = "local")] - Transport::Local => Some( - get_local_transport() - .await - .inspect_err(|e| panic!("Error setting up local transport: {}", e.get_message())) - .unwrap(), - ), + let transport = match args.transport { + Transport::Local => get_local_transport().await, #[cfg(feature = "mqtt5")] - Transport::Mqtt5 => Some( - get_mqtt5_transport(_config.clone(), args.mqtt_args) - .await - .inspect_err(|e| panic!("Error setting up MQTT5 transport: {}", e.get_message())) - .unwrap(), - ), + Transport::Mqtt5 { options } => get_mqtt5_transport(&args.authority, options).await, #[cfg(feature = "zenoh")] - Transport::Zenoh => Some( - get_zenoh_transport(_config.clone(), args.zenoh_args) - .await - .inspect_err(|e| panic!("Error setting up Zenoh transport: {}", e.get_message())) - .unwrap(), - ), - Transport::None => None, - }; - - if transport.is_none() { - panic!("No valid transport protocol"); - } + Transport::Zenoh { options } => get_zenoh_transport(&args.authority, options).await, + }?; // Set up and run USubscription service - let mut ustop = USubscriptionService::run(_config.clone(), transport.as_ref().unwrap().clone()) + let mut ustop = USubscriptionService::run(config.clone(), transport.clone()) .await - .expect("Error starting usubscription service"); - - info!( - "Usubscription service running and listeners up on {}", - _config.get_source_uri() - ); + .inspect_err(|e| error!("Error starting uSubscription service: {}", e))?; signal::ctrl_c().await.expect("failed to listen for event"); - info!("Stopping usubscription service"); ustop.stop().await; -} - -fn config_from_args(args: &Args) -> Result { - let authority: &str = args.authority.trim(); - if authority.is_empty() { - return Err(ConfigurationError::new("Authority name empty or missing")); - } - - USubscriptionConfiguration::create( - authority.to_string(), - args.notification_buffer, - args.subscription_buffer, - args.persistency, - args.storage_path.clone(), - ) + Ok(()) } diff --git a/up-subscription-cli/src/transport/local.rs b/up-subscription-cli/src/transport/local.rs index 8dd6798..c794381 100644 --- a/up-subscription-cli/src/transport/local.rs +++ b/up-subscription-cli/src/transport/local.rs @@ -13,8 +13,11 @@ use std::sync::Arc; -use up_rust::{local_transport::LocalTransport, UStatus, UTransport}; +use tracing::info; +use up_rust::{local_transport::LocalTransport, UTransport}; -pub(crate) async fn get_local_transport() -> Result, UStatus> { +pub(crate) async fn get_local_transport() -> Result, Box> +{ + info!("Using in-memory local uProtocol transport"); Ok(Arc::new(LocalTransport::default())) } diff --git a/up-subscription-cli/src/transport/mod.rs b/up-subscription-cli/src/transport/mod.rs index 1c9c62f..70851de 100644 --- a/up-subscription-cli/src/transport/mod.rs +++ b/up-subscription-cli/src/transport/mod.rs @@ -11,9 +11,7 @@ * SPDX-License-Identifier: Apache-2.0 ********************************************************************************/ -#[cfg(feature = "local")] pub(crate) mod local; -#[cfg(feature = "local")] pub(crate) use local::get_local_transport; #[cfg(feature = "mqtt5")] diff --git a/up-subscription-cli/src/transport/mqtt5.rs b/up-subscription-cli/src/transport/mqtt5.rs index 0430db7..3b45654 100644 --- a/up-subscription-cli/src/transport/mqtt5.rs +++ b/up-subscription-cli/src/transport/mqtt5.rs @@ -13,16 +13,16 @@ use std::sync::Arc; -use up_rust::{LocalUriProvider, UStatus, UTransport}; +use tracing::info; +use up_rust::UTransport; use up_transport_mqtt5::{Mqtt5Transport, Mqtt5TransportOptions}; pub(crate) async fn get_mqtt5_transport( - uri_provider: Arc, + authority_name: &str, mqtt5_args: Mqtt5TransportOptions, -) -> Result, UStatus> { - Ok( - Mqtt5Transport::new(mqtt5_args, uri_provider.get_authority()) - .await - .map(Arc::new)?, - ) +) -> Result, Box> { + info!("Using MQTT 5 uProtocol transport"); + Ok(Mqtt5Transport::new(mqtt5_args, authority_name) + .await + .map(Arc::new)?) } diff --git a/up-subscription-cli/src/transport/zenoh.rs b/up-subscription-cli/src/transport/zenoh.rs index 210dc6a..c183193 100644 --- a/up-subscription-cli/src/transport/zenoh.rs +++ b/up-subscription-cli/src/transport/zenoh.rs @@ -11,11 +11,12 @@ * SPDX-License-Identifier: Apache-2.0 ********************************************************************************/ -use serde_json::json; use std::sync::Arc; -use up_transport_zenoh::{zenoh_config, UPTransportZenoh}; -use up_rust::{LocalUriProvider, UStatus, UTransport}; +use serde_json::json; +use tracing::info; +use up_rust::UTransport; +use up_transport_zenoh::{zenoh_config, UPTransportZenoh}; #[derive(clap::ValueEnum, Clone, Copy, PartialEq, Eq, Hash, Debug)] pub(crate) enum WhatAmIType { @@ -54,6 +55,7 @@ pub(crate) struct ZenohArgs { } pub fn get_zenoh_config(args: ZenohArgs) -> zenoh_config::Config { + info!("Using Zenoh uProtocol transport"); // Load the config from file path let mut zenoh_cfg = match &args.config { Some(path) => zenoh_config::Config::from_file(path).unwrap(), @@ -92,19 +94,19 @@ pub fn get_zenoh_config(args: ZenohArgs) -> zenoh_config::Config { } pub(crate) async fn get_zenoh_transport( - uri_provider: Arc, + authority_name: &str, zenoh_args: ZenohArgs, -) -> Result, UStatus> { +) -> Result, Box> { UPTransportZenoh::try_init_log_from_env(); - let zenoh_builder = UPTransportZenoh::builder(uri_provider.get_authority()) - .map_err(|e| UStatus::fail(e.get_message()))?; + let zenoh_builder = UPTransportZenoh::builder(authority_name) + .map_err(|e| Box::::from(e.get_message()))?; let transport_zenoh: Arc = zenoh_builder .with_config(get_zenoh_config(zenoh_args)) .build() .await - .map_err(|e| UStatus::fail(e.get_message())) + .map_err(|e| Box::::from(e.get_message())) .map(Arc::new)?; Ok(transport_zenoh) diff --git a/up-subscription/Cargo.toml b/up-subscription/Cargo.toml index 67dd87b..6e9e64a 100644 --- a/up-subscription/Cargo.toml +++ b/up-subscription/Cargo.toml @@ -23,18 +23,17 @@ version.workspace = true [dependencies] async-trait = { workspace = true } -env_logger = { workspace = true } -log = { workspace = true } pickledb = { version = "0.5.1" } protobuf = { workspace = true } serde = { version = "1.0" } tokio = { workspace = true } +tracing = { workspace = true } up-rust = { workspace = true } -uriparse = { version = "0.6" } [dev-dependencies] mockall = { workspace = true } test-case = { workspace = true } +test-log ={ version = "0.2", default-features = false, features = ["trace"] } up-rust = { workspace = true, features = ["test-util"] } [package.metadata.dist] diff --git a/up-subscription/src/common/helpers.rs b/up-subscription/src/common/helpers.rs index 347b304..e663a7e 100644 --- a/up-subscription/src/common/helpers.rs +++ b/up-subscription/src/common/helpers.rs @@ -11,23 +11,16 @@ * SPDX-License-Identifier: Apache-2.0 ********************************************************************************/ -use log::*; use std::future::Future; -use std::sync::Once; use std::time::{SystemTime, UNIX_EPOCH}; -use tokio::{task, time::Duration}; +use tokio::{task, time::Duration}; +use tracing::error; use up_rust::{ communication::{ServiceInvocationError, UPayload}, UAttributes, UUri, }; -static INIT: Once = Once::new(); - -pub fn init_once() { - INIT.call_once(env_logger::init); -} - type SpawnResult = std::result::Result>; pub(crate) fn spawn_and_log_error(fut: F) -> task::JoinHandle<()> diff --git a/up-subscription/src/configuration.rs b/up-subscription/src/configuration.rs index c05a7c3..e04ce94 100644 --- a/up-subscription/src/configuration.rs +++ b/up-subscription/src/configuration.rs @@ -11,8 +11,7 @@ * SPDX-License-Identifier: Apache-2.0 ********************************************************************************/ -use std::path::{Path, PathBuf}; -use uriparse::Authority; +use std::path::PathBuf; use up_rust::{ core::usubscription::{USUBSCRIPTION_TYPE_ID, USUBSCRIPTION_VERSION_MAJOR}, @@ -20,7 +19,7 @@ use up_rust::{ }; /// Default subscription and notification command channel buffer size -pub(crate) const DEFAULT_COMMAND_BUFFER_SIZE: usize = 1024; +pub const DEFAULT_COMMAND_BUFFER_SIZE: u16 = 1024; #[derive(Debug)] pub struct ConfigurationError(String); @@ -42,11 +41,36 @@ impl std::fmt::Display for ConfigurationError { impl std::error::Error for ConfigurationError {} +// only accept persistency path if it points to an existing directory; if None set to cwd +fn get_storage_path(path: Option) -> Result { + match path { + None => Ok(std::env::current_dir().map_err(|e| { + ConfigurationError::new(format!("Error retrieving current working directory: {e}")) + })?), + Some(p) => { + let pb = PathBuf::from(&p); + if !pb.exists() { + return Err(ConfigurationError::new(format!( + "Persistency storage path '{}' does not exist", + p + ))); + } + if !pb.is_dir() { + return Err(ConfigurationError::new(format!( + "Persistency storage path '{}' is not a directory", + p + ))); + } + Ok(pb) + } + } +} + #[derive(Clone, Debug)] pub struct USubscriptionConfiguration { pub authority_name: String, - pub subscription_command_buffer: usize, - pub notification_command_buffer: usize, + pub subscription_command_buffer: u16, + pub notification_command_buffer: u16, pub persistency_enabled: bool, pub persistency_path: PathBuf, } @@ -71,30 +95,13 @@ impl USubscriptionConfiguration { /// Returns a ConfigurationError in case an invalid Authority string is provided; this is determined via the uriparse crate Authority::try_from() method. pub fn create( authority_name: String, - subscription_command_buffer: Option, - notification_command_buffer: Option, + subscription_command_buffer: Option, + notification_command_buffer: Option, persistency_enabled: bool, persistency_path: Option, ) -> Result { - if let Err(e) = Authority::try_from(authority_name.as_bytes()) { - return Err(ConfigurationError::new(format!( - "Invalid authority name: {e}" - ))); - } - - // only accept persistency path if it points to an existing directory; if None set to cwd - let persistency_path = if let Some(path_string) = persistency_path { - let p = Path::new(&path_string); - p.try_exists().unwrap_or_else(|_| { - panic!("Persistency storage path does not exist {path_string}") - }); - if !p.is_dir() { - panic!("Persistency storage path is not a directory {path_string}"); - } - p.to_path_buf() - } else { - std::env::current_dir().expect("Error retrieving current working directory") - }; + let authority_name = UUri::verify_authority(authority_name.as_str()) + .map_err(|e| ConfigurationError::new(format!("Invalid authority name: {e}")))?; Ok(USubscriptionConfiguration { authority_name, @@ -105,7 +112,7 @@ impl USubscriptionConfiguration { .unwrap_or(DEFAULT_COMMAND_BUFFER_SIZE) .clamp(1, DEFAULT_COMMAND_BUFFER_SIZE), persistency_enabled, - persistency_path, + persistency_path: get_storage_path(persistency_path)?, }) } } diff --git a/up-subscription/src/handlers/fetch_subscribers.rs b/up-subscription/src/handlers/fetch_subscribers.rs index 035732d..766fc87 100644 --- a/up-subscription/src/handlers/fetch_subscribers.rs +++ b/up-subscription/src/handlers/fetch_subscribers.rs @@ -12,9 +12,8 @@ ********************************************************************************/ use async_trait::async_trait; -use log::*; use tokio::{sync::mpsc::Sender, sync::oneshot}; - +use tracing::error; use up_rust::{ communication::{RequestHandler, ServiceInvocationError, UPayload}, core::usubscription::{ @@ -112,13 +111,11 @@ mod tests { use up_rust::UUri; - use crate::{helpers, tests::test_lib}; + use crate::tests::test_lib; // [utest->dsn~usubscription-fetch-subscribers-protobuf~1] - #[tokio::test] + #[test_log::test(tokio::test)] async fn test_subscribe_success() { - helpers::init_once(); - // create request and other required object(s) let fetch_subscribers_request = FetchSubscribersRequest { topic: Some(test_lib::helpers::local_topic1_uri()).into(), @@ -163,10 +160,8 @@ mod tests { } } - #[tokio::test] + #[test_log::test(tokio::test)] async fn test_wrong_resource_id() { - helpers::init_once(); - // create request and other required object(s) let subscribe_request = test_lib::helpers::subscription_request(test_lib::helpers::local_topic1_uri(), None); @@ -191,10 +186,8 @@ mod tests { assert!(result.is_err_and(|err| matches!(err, ServiceInvocationError::InvalidArgument(_)))); } - #[tokio::test] + #[test_log::test(tokio::test)] async fn test_no_source_uri() { - helpers::init_once(); - // create request and other required object(s) let subscribe_request = test_lib::helpers::subscription_request(test_lib::helpers::local_topic1_uri(), None); @@ -216,11 +209,8 @@ mod tests { assert!(result.is_err_and(|err| matches!(err, ServiceInvocationError::InvalidArgument(_)))); } - #[tokio::test] + #[test_log::test(tokio::test)] async fn test_no_request_payload() { - helpers::init_once(); - - // create request and other required object(s) let message_attributes = UAttributes { source: Some(test_lib::helpers::subscriber_uri1()).into(), ..Default::default() @@ -238,10 +228,8 @@ mod tests { assert!(result.is_err_and(|err| matches!(err, ServiceInvocationError::InvalidArgument(_)))); } - #[tokio::test] + #[test_log::test(tokio::test)] async fn test_wrong_request_payload_type() { - helpers::init_once(); - // create request and other required object(s) let subscribe_request = test_lib::helpers::unsubscribe_request(test_lib::helpers::local_topic1_uri()); @@ -290,10 +278,8 @@ mod tests { resource_id: 0x0000_FFFF, ..Default::default() }; "Wildcard resource id in topic UUri")] - #[tokio::test] + #[test_log::test(tokio::test)] async fn test_invalid_topic_uri(topic: UUri) { - helpers::init_once(); - // create request and other required object(s) let subscribe_request = test_lib::helpers::subscription_request(topic, None); let request_payload = UPayload::try_from_protobuf(subscribe_request.clone()).unwrap(); diff --git a/up-subscription/src/handlers/fetch_subscriptions.rs b/up-subscription/src/handlers/fetch_subscriptions.rs index 028e0c8..cdc6d2b 100644 --- a/up-subscription/src/handlers/fetch_subscriptions.rs +++ b/up-subscription/src/handlers/fetch_subscriptions.rs @@ -12,9 +12,8 @@ ********************************************************************************/ use async_trait::async_trait; -use log::*; use tokio::{sync::mpsc::Sender, sync::oneshot}; - +use tracing::error; use up_rust::{ communication::{RequestHandler, ServiceInvocationError, UPayload}, core::usubscription::{ @@ -153,13 +152,11 @@ mod tests { use up_rust::UUri; - use crate::{helpers, tests::test_lib}; + use crate::tests::test_lib; // [utest->dsn~usubscription-fetch-subscriptions-protobuf~1] - #[tokio::test] + #[test_log::test(tokio::test)] async fn test_fetch_subscriptions_success() { - helpers::init_once(); - // create request and other required object(s) let fetch_subscriptions_request = FetchSubscriptionsRequest { request: Some(up_rust::core::usubscription::Request::Subscriber( @@ -217,10 +214,8 @@ mod tests { } } - #[tokio::test] + #[test_log::test(tokio::test)] async fn test_wrong_resource_id() { - helpers::init_once(); - // create request and other required object(s) let subscribe_request = test_lib::helpers::subscription_request(test_lib::helpers::local_topic1_uri(), None); @@ -245,10 +240,8 @@ mod tests { assert!(result.is_err_and(|err| matches!(err, ServiceInvocationError::InvalidArgument(_)))); } - #[tokio::test] + #[test_log::test(tokio::test)] async fn test_no_source_uri() { - helpers::init_once(); - // create request and other required object(s) let subscribe_request = test_lib::helpers::subscription_request(test_lib::helpers::local_topic1_uri(), None); @@ -270,10 +263,8 @@ mod tests { assert!(result.is_err_and(|err| matches!(err, ServiceInvocationError::InvalidArgument(_)))); } - #[tokio::test] + #[test_log::test(tokio::test)] async fn test_no_request_payload() { - helpers::init_once(); - // create request and other required object(s) let message_attributes = UAttributes { source: Some(test_lib::helpers::subscriber_uri1()).into(), @@ -292,10 +283,8 @@ mod tests { assert!(result.is_err_and(|err| matches!(err, ServiceInvocationError::InvalidArgument(_)))); } - #[tokio::test] + #[test_log::test(tokio::test)] async fn test_wrong_request_payload_type() { - helpers::init_once(); - // create request and other required object(s) let subscribe_request = test_lib::helpers::unsubscribe_request(test_lib::helpers::local_topic1_uri()); @@ -344,10 +333,8 @@ mod tests { resource_id: 0x0000_FFFF, ..Default::default() }; "Wildcard resource id in subscriber UUri")] - #[tokio::test] + #[test_log::test(tokio::test)] async fn test_invalid_subscriber_uri(subscriber: UUri) { - helpers::init_once(); - // create request and other required object(s) let fetch_subscriptions_request = FetchSubscriptionsRequest { request: Some(up_rust::core::usubscription::Request::Subscriber( @@ -403,10 +390,8 @@ mod tests { resource_id: 0x0000_FFFF, ..Default::default() }; "Wildcard resource id in topic UUri")] - #[tokio::test] + #[test_log::test(tokio::test)] async fn test_invalid_topic_uri(topic: UUri) { - helpers::init_once(); - // create request and other required object(s) let fetch_subscriptions_request = FetchSubscriptionsRequest { request: Some(up_rust::core::usubscription::Request::Topic(topic)), diff --git a/up-subscription/src/handlers/register_for_notifications.rs b/up-subscription/src/handlers/register_for_notifications.rs index fa86afe..c9fd904 100644 --- a/up-subscription/src/handlers/register_for_notifications.rs +++ b/up-subscription/src/handlers/register_for_notifications.rs @@ -12,9 +12,8 @@ ********************************************************************************/ use async_trait::async_trait; -use log::*; use tokio::sync::mpsc::Sender; - +use tracing::error; use up_rust::{ communication::{RequestHandler, ServiceInvocationError, UPayload}, core::usubscription::{ @@ -93,13 +92,11 @@ mod tests { use up_rust::UUri; - use crate::{helpers, tests::test_lib}; + use crate::tests::test_lib; // [utest->dsn~usubscription-register-notifications-protobuf~1] - #[tokio::test] + #[test_log::test(tokio::test)] async fn test_register_notification_success() { - helpers::init_once(); - // create request and other required object(s) let notification_request = NotificationsRequest { topic: Some(test_lib::helpers::local_topic1_uri()).into(), @@ -142,10 +139,8 @@ mod tests { } } - #[tokio::test] + #[test_log::test(tokio::test)] async fn test_wrong_resource_id() { - helpers::init_once(); - // create request and other required object(s) let notification_request = NotificationsRequest { topic: Some(test_lib::helpers::local_topic1_uri()).into(), @@ -172,10 +167,8 @@ mod tests { assert!(result.is_err_and(|err| matches!(err, ServiceInvocationError::InvalidArgument(_)))); } - #[tokio::test] + #[test_log::test(tokio::test)] async fn test_no_source_uri() { - helpers::init_once(); - // create request and other required object(s) let notification_request = NotificationsRequest { topic: Some(test_lib::helpers::local_topic1_uri()).into(), @@ -198,10 +191,8 @@ mod tests { assert!(result.is_err_and(|err| matches!(err, ServiceInvocationError::InvalidArgument(_)))); } - #[tokio::test] + #[test_log::test(tokio::test)] async fn test_no_request_payload() { - helpers::init_once(); - // create request and other required object(s) let message_attributes = UAttributes { source: Some(test_lib::helpers::subscriber_uri1()).into(), @@ -223,10 +214,8 @@ mod tests { assert!(result.is_err_and(|err| matches!(err, ServiceInvocationError::InvalidArgument(_)))); } - #[tokio::test] + #[test_log::test(tokio::test)] async fn test_wrong_request_payload_type() { - helpers::init_once(); - // create request and other required object(s) let subscribe_request = test_lib::helpers::unsubscribe_request(test_lib::helpers::local_topic1_uri()); @@ -274,10 +263,8 @@ mod tests { resource_id: 0x0000_FFFF, ..Default::default() }; "Wildcard resource id in topic UUri")] - #[tokio::test] + #[test_log::test(tokio::test)] async fn test_invalid_topic_uri(topic: UUri) { - helpers::init_once(); - // create request and other required object(s) let notification_request = NotificationsRequest { topic: Some(topic).into(), diff --git a/up-subscription/src/handlers/reset.rs b/up-subscription/src/handlers/reset.rs index 636526d..a1d3115 100644 --- a/up-subscription/src/handlers/reset.rs +++ b/up-subscription/src/handlers/reset.rs @@ -12,8 +12,8 @@ ********************************************************************************/ use async_trait::async_trait; -use log::*; use tokio::{sync::mpsc::Sender, sync::oneshot}; +use tracing::error; use up_rust::{ communication::{RequestHandler, ServiceInvocationError, UPayload}, @@ -91,13 +91,11 @@ mod tests { use up_rust::UUri; - use crate::{helpers, tests::test_lib}; + use crate::tests::test_lib; // [utest->dsn~usubscription-reset-protobuf~1] - #[tokio::test] + #[test_log::test(tokio::test)] async fn test_reset_success() { - helpers::init_once(); - // create request and other required object(s) let mut source_uri = test_lib::helpers::subscriber_uri1(); source_uri.resource_id = up_rust::core::usubscription::USUBSCRIPTION_TYPE_ID; @@ -133,10 +131,8 @@ mod tests { } } - #[tokio::test] + #[test_log::test(tokio::test)] async fn test_wrong_resource_id() { - helpers::init_once(); - // create request and other required object(s) let request_payload = UPayload::try_from_protobuf(ResetRequest::default()).unwrap(); let message_attributes = UAttributes { @@ -164,10 +160,8 @@ mod tests { } } - #[tokio::test] + #[test_log::test(tokio::test)] async fn test_no_source_uri() { - helpers::init_once(); - // create request and other required object(s) let request_payload = UPayload::try_from_protobuf(ResetRequest::default()).unwrap(); @@ -191,10 +185,8 @@ mod tests { } } - #[tokio::test] + #[test_log::test(tokio::test)] async fn test_no_request_payload() { - helpers::init_once(); - // create request and other required object(s) let message_attributes = UAttributes { source: Some(test_lib::helpers::subscriber_uri1()).into(), @@ -217,10 +209,8 @@ mod tests { } } - #[tokio::test] + #[test_log::test(tokio::test)] async fn test_wrong_request_payload_type() { - helpers::init_once(); - // create request and other required object(s) let unsubscribe_request = test_lib::helpers::unsubscribe_request(test_lib::helpers::local_topic1_uri()); @@ -251,10 +241,8 @@ mod tests { } // [utest->req~usubscription-reset-only-usubscription~1] - #[tokio::test] + #[test_log::test(tokio::test)] async fn test_wrong_source_entitytype() { - helpers::init_once(); - // create request and other required object(s) let bad_source = UUri::try_from("up://local/1000/1/F").expect("Error during test case setup"); diff --git a/up-subscription/src/handlers/subscribe.rs b/up-subscription/src/handlers/subscribe.rs index 9a8e0a4..e026e0f 100644 --- a/up-subscription/src/handlers/subscribe.rs +++ b/up-subscription/src/handlers/subscribe.rs @@ -11,11 +11,11 @@ * SPDX-License-Identifier: Apache-2.0 ********************************************************************************/ -use async_trait::async_trait; -use log::*; use std::time::{SystemTime, UNIX_EPOCH}; -use tokio::{sync::mpsc::Sender, sync::oneshot}; +use async_trait::async_trait; +use tokio::{sync::mpsc::Sender, sync::oneshot}; +use tracing::error; use up_rust::{ communication::{RequestHandler, ServiceInvocationError, UPayload}, core::usubscription::{ @@ -129,13 +129,11 @@ mod tests { use up_rust::{core::usubscription::State, UUri}; - use crate::{helpers, tests::test_lib}; + use crate::tests::test_lib; // [utest->dsn~usubscription-subscribe-protobuf~1] - #[tokio::test] + #[test_log::test(tokio::test)] async fn test_subscribe_success() { - helpers::init_once(); - // create request and other required object(s) let subscribe_request = test_lib::helpers::subscription_request(test_lib::helpers::local_topic1_uri(), None); @@ -195,10 +193,8 @@ mod tests { } } - #[tokio::test] + #[test_log::test(tokio::test)] async fn test_wrong_resource_id() { - helpers::init_once(); - // create request and other required object(s) let subscribe_request = test_lib::helpers::subscription_request(test_lib::helpers::local_topic1_uri(), None); @@ -224,10 +220,8 @@ mod tests { assert!(result.is_err_and(|err| matches!(err, ServiceInvocationError::InvalidArgument(_)))); } - #[tokio::test] + #[test_log::test(tokio::test)] async fn test_no_source_uri() { - helpers::init_once(); - // create request and other required object(s) let subscribe_request = test_lib::helpers::subscription_request(test_lib::helpers::local_topic1_uri(), None); @@ -249,10 +243,8 @@ mod tests { assert!(result.is_err_and(|err| matches!(err, ServiceInvocationError::InvalidArgument(_)))); } - #[tokio::test] + #[test_log::test(tokio::test)] async fn test_no_request_payload() { - helpers::init_once(); - // create request and other required object(s) let message_attributes = UAttributes { source: Some(test_lib::helpers::subscriber_uri1()).into(), @@ -271,10 +263,8 @@ mod tests { assert!(result.is_err_and(|err| matches!(err, ServiceInvocationError::InvalidArgument(_)))); } - #[tokio::test] + #[test_log::test(tokio::test)] async fn test_wrong_request_payload_type() { - helpers::init_once(); - // create request and other required object(s) let subscribe_request = test_lib::helpers::unsubscribe_request(test_lib::helpers::local_topic1_uri()); @@ -300,10 +290,8 @@ mod tests { assert!(result.is_err_and(|err| matches!(err, ServiceInvocationError::InvalidArgument(_)))); } - #[tokio::test] + #[test_log::test(tokio::test)] async fn test_future_subscription() { - helpers::init_once(); - let future_secs = SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap() @@ -376,10 +364,8 @@ mod tests { } } - #[tokio::test] + #[test_log::test(tokio::test)] async fn test_expired_subscription() { - helpers::init_once(); - // create request and other required object(s) let subscribe_request = test_lib::helpers::subscription_request( test_lib::helpers::local_topic1_uri(), @@ -412,10 +398,8 @@ mod tests { #[test_case("up://*/100000/1/8AC7"; "Wildcard authority in topic UUri")] #[test_case("up://local/FFFF0000/1/8AC7"; "Wildcard entity id in topic UUri")] #[test_case("up://local/100000/1/FFFF"; "Wildcard resource id in topic UUri")] - #[tokio::test] + #[test_log::test(tokio::test)] async fn test_invalid_topic_uri(topic: &str) { - helpers::init_once(); - // create request and other required object(s) let topic = UUri::from_str(topic).expect("Test parameter UUri failed to parse"); let subscribe_request = test_lib::helpers::subscription_request(topic, None); diff --git a/up-subscription/src/handlers/unregister_for_notifications.rs b/up-subscription/src/handlers/unregister_for_notifications.rs index cd683af..2969e29 100644 --- a/up-subscription/src/handlers/unregister_for_notifications.rs +++ b/up-subscription/src/handlers/unregister_for_notifications.rs @@ -12,9 +12,8 @@ ********************************************************************************/ use async_trait::async_trait; -use log::*; use tokio::sync::mpsc::Sender; - +use tracing::error; use up_rust::{ communication::{RequestHandler, ServiceInvocationError, UPayload}, core::usubscription::{ @@ -99,13 +98,11 @@ mod tests { use up_rust::UUri; - use crate::{helpers, tests::test_lib}; + use crate::tests::test_lib; // [utest->dsn~usubscription-unregister-notifications-protobuf~1] - #[tokio::test] + #[test_log::test(tokio::test)] async fn test_unregister_notification_success() { - helpers::init_once(); - // create request and other required object(s) let notification_request = NotificationsRequest { topic: Some(test_lib::helpers::local_topic1_uri()).into(), @@ -148,10 +145,8 @@ mod tests { } } - #[tokio::test] + #[test_log::test(tokio::test)] async fn test_wrong_resource_id() { - helpers::init_once(); - // create request and other required object(s) let notification_request = NotificationsRequest { topic: Some(test_lib::helpers::local_topic1_uri()).into(), @@ -178,10 +173,8 @@ mod tests { assert!(result.is_err_and(|err| matches!(err, ServiceInvocationError::InvalidArgument(_)))); } - #[tokio::test] + #[test_log::test(tokio::test)] async fn test_no_source_uri() { - helpers::init_once(); - // create request and other required object(s) let notification_request = NotificationsRequest { topic: Some(test_lib::helpers::local_topic1_uri()).into(), @@ -204,10 +197,8 @@ mod tests { assert!(result.is_err_and(|err| matches!(err, ServiceInvocationError::InvalidArgument(_)))); } - #[tokio::test] + #[test_log::test(tokio::test)] async fn test_no_request_payload() { - helpers::init_once(); - // create request and other required object(s) let message_attributes = UAttributes { source: Some(test_lib::helpers::subscriber_uri1()).into(), @@ -229,10 +220,8 @@ mod tests { assert!(result.is_err_and(|err| matches!(err, ServiceInvocationError::InvalidArgument(_)))); } - #[tokio::test] + #[test_log::test(tokio::test)] async fn test_wrong_request_payload_type() { - helpers::init_once(); - // create request and other required object(s) let subscribe_request = test_lib::helpers::unsubscribe_request(test_lib::helpers::local_topic1_uri()); @@ -262,10 +251,8 @@ mod tests { #[test_case("up://*/100000/1/8AC7"; "Wildcard authority in topic UUri")] #[test_case("up://local/FFFF0000/1/8AC7"; "Wildcard entity id in topic UUri")] #[test_case("up://local/100000/1/FFFF"; "Wildcard resource id in topic UUri")] - #[tokio::test] + #[test_log::test(tokio::test)] async fn test_invalid_topic_uri(topic: &str) { - helpers::init_once(); - // create request and other required object(s) let topic = UUri::from_str(topic).expect("Test parameter UUri failed to parse"); // create request and other required object(s) diff --git a/up-subscription/src/handlers/unsubscribe.rs b/up-subscription/src/handlers/unsubscribe.rs index bdbdb70..558454c 100644 --- a/up-subscription/src/handlers/unsubscribe.rs +++ b/up-subscription/src/handlers/unsubscribe.rs @@ -12,9 +12,8 @@ ********************************************************************************/ use async_trait::async_trait; -use log::*; use tokio::{sync::mpsc::Sender, sync::oneshot}; - +use tracing::error; use up_rust::{ communication::{RequestHandler, ServiceInvocationError, UPayload}, core::usubscription::{ @@ -102,13 +101,11 @@ mod tests { use up_rust::{core::usubscription::State, UUri}; - use crate::{helpers, tests::test_lib}; + use crate::tests::test_lib; // [utest->dsn~usubscription-unsubscribe-protobuf~1] - #[tokio::test] + #[test_log::test(tokio::test)] async fn test_unsubscribe_success() { - helpers::init_once(); - // create request and other required object(s) let unsubscribe_request = test_lib::helpers::unsubscribe_request(test_lib::helpers::local_topic1_uri()); @@ -162,10 +159,8 @@ mod tests { } } - #[tokio::test] + #[test_log::test(tokio::test)] async fn test_wrong_resource_id() { - helpers::init_once(); - // create request and other required object(s) let subscribe_request = test_lib::helpers::unsubscribe_request(test_lib::helpers::local_topic1_uri()); @@ -191,10 +186,8 @@ mod tests { assert!(result.is_err_and(|err| matches!(err, ServiceInvocationError::InvalidArgument(_)))); } - #[tokio::test] + #[test_log::test(tokio::test)] async fn test_no_source_uri() { - helpers::init_once(); - // create request and other required object(s) let subscribe_request = test_lib::helpers::unsubscribe_request(test_lib::helpers::local_topic1_uri()); @@ -216,10 +209,8 @@ mod tests { assert!(result.is_err_and(|err| matches!(err, ServiceInvocationError::InvalidArgument(_)))); } - #[tokio::test] + #[test_log::test(tokio::test)] async fn test_no_request_payload() { - helpers::init_once(); - // create request and other required object(s) let message_attributes = UAttributes { source: Some(test_lib::helpers::subscriber_uri1()).into(), @@ -238,10 +229,8 @@ mod tests { assert!(result.is_err_and(|err| matches!(err, ServiceInvocationError::InvalidArgument(_)))); } - #[tokio::test] + #[test_log::test(tokio::test)] async fn test_wrong_request_payload_type() { - helpers::init_once(); - // create request and other required object(s) let subscribe_request = test_lib::helpers::subscription_request(test_lib::helpers::local_topic1_uri(), None); @@ -272,10 +261,8 @@ mod tests { #[test_case("up://*/100000/1/8AC7"; "Wildcard authority in topic UUri")] #[test_case("up://local/FFFF0000/1/8AC7"; "Wildcard entity id in topic UUri")] #[test_case("up://local/100000/1/FFFF"; "Wildcard resource id in topic UUri")] - #[tokio::test] + #[test_log::test(tokio::test)] async fn test_invalid_topic_uri(topic: &str) { - helpers::init_once(); - // create request and other required object(s) let topic = UUri::from_str(topic).expect("Test parameter UUri failed to parse"); let subscribe_request = test_lib::helpers::subscription_request(topic, None); diff --git a/up-subscription/src/lib.rs b/up-subscription/src/lib.rs index 9ef4eb8..bfd4564 100644 --- a/up-subscription/src/lib.rs +++ b/up-subscription/src/lib.rs @@ -38,7 +38,9 @@ For a batteries-included approach to running up-subscription-rust, the `up-subsc mod usubscription; pub use usubscription::*; mod configuration; -pub use configuration::{ConfigurationError, USubscriptionConfiguration}; +pub use configuration::{ + ConfigurationError, USubscriptionConfiguration, DEFAULT_COMMAND_BUFFER_SIZE, +}; // actors implementing the backend management logic for tracking subscriptions etc mod notification_manager; @@ -62,7 +64,6 @@ pub(crate) mod handlers { mod common { pub(crate) mod helpers; } -pub use common::helpers::init_once; pub(crate) use common::*; #[cfg(test)] diff --git a/up-subscription/src/notification_manager.rs b/up-subscription/src/notification_manager.rs index 596cd10..50a3fcb 100644 --- a/up-subscription/src/notification_manager.rs +++ b/up-subscription/src/notification_manager.rs @@ -11,10 +11,10 @@ * SPDX-License-Identifier: Apache-2.0 ********************************************************************************/ -use log::*; use std::sync::Arc; -use tokio::sync::{mpsc::Receiver, mpsc::Sender, oneshot, Notify}; +use tokio::sync::{mpsc::Receiver, mpsc::Sender, oneshot, Notify}; +use tracing::{debug, error, warn}; use up_rust::{ core::usubscription::{ usubscription_uri, SubscriberInfo, SubscriptionStatus, Update, @@ -24,7 +24,6 @@ use up_rust::{ }; use crate::{ - helpers, persistency::{self, PersistencyError}, usubscription::{SubscriberUUri, TopicUUri, INCLUDE_SCHEMA}, USubscriptionConfiguration, @@ -119,8 +118,6 @@ pub(crate) async fn notification_engine( mut events: Receiver, shutdown: Arc, ) { - helpers::init_once(); - // keep track of which subscriber wants to be notified on which topic let mut notifications = persistency::NotificationStore::new(&configuration); diff --git a/up-subscription/src/persistency.rs b/up-subscription/src/persistency.rs index 84d83fb..3fb7f89 100644 --- a/up-subscription/src/persistency.rs +++ b/up-subscription/src/persistency.rs @@ -773,10 +773,8 @@ mod tests { #[test_case(TopicState::SUBSCRIBE_PENDING, &[1,0,0,0]; "State SUBSCRIBE_PENDING")] #[test_case(TopicState::SUBSCRIBED, &[2,0,0,0]; "State SUBSCRIBED")] #[test_case(TopicState::UNSUBSCRIBE_PENDING, &[3,0,0,0]; "State UNSUBSCRIBE_PENDING")] - #[tokio::test] + #[test_log::test(tokio::test)] async fn test_serialize_deserialize_topic_state(state: TopicState, bytes: &[u8]) { - helpers::init_once(); - // One way... let serialized_bytes = serialize_topic_state(&state); assert!(serialized_bytes.is_ok()); @@ -792,10 +790,8 @@ mod tests { assert_eq!(reconstructed_state, state); } - #[tokio::test] + #[test_log::test(tokio::test)] async fn test_validate_and_append_filename_success() { - helpers::init_once(); - let mut expected = PathBuf::from("."); expected.push("newfile"); @@ -809,10 +805,8 @@ mod tests { #[test_case( "\\"; "Relative path 3")] #[test_case( ""; "Empty filename")] #[test_case( "/dummy/newfile"; "Random path and name")] - #[tokio::test] + #[test_log::test(tokio::test)] async fn test_validate_and_append_filename_relative_paths(file: &str) { - helpers::init_once(); - let r = validate_and_append_filename(&PathBuf::from("."), file); assert!(r.is_err()); } diff --git a/up-subscription/src/subscription_manager.rs b/up-subscription/src/subscription_manager.rs index 812f63f..d30143f 100644 --- a/up-subscription/src/subscription_manager.rs +++ b/up-subscription/src/subscription_manager.rs @@ -11,15 +11,15 @@ * SPDX-License-Identifier: Apache-2.0 ********************************************************************************/ -use log::*; #[cfg(test)] use std::collections::HashMap; use std::{sync::Arc, time::Duration}; + use tokio::{ sync::{mpsc, mpsc::Receiver, mpsc::Sender, oneshot, Notify}, time::sleep, }; - +use tracing::{debug, error, warn}; use up_rust::{ communication::{CallOptions, InMemoryRpcClient, RpcClient}, core::usubscription::{ @@ -147,8 +147,6 @@ pub(crate) async fn handle_message( notification_sender: Sender, shutdown: Arc, ) { - helpers::init_once(); - // track subscribers for topics - if you're in this list, you have SUBSCRIBED, otherwise you're considered UNSUBSCRIBED // [impl->req~usubscription-subscribe-persistency~1] let mut subscriptions = persistency::SubscriptionsStore::new(&configuration); @@ -833,10 +831,8 @@ mod tests { } // [utest->req~usubscription-subscribe-expiration~1] - #[tokio::test] + #[test_log::test(tokio::test)] async fn test_schedule_future_unsubscribe() { - helpers::init_once(); - let (internal_cmd_sender, mut internal_cmd_receiver) = mpsc::channel::(INTERNAL_COMMAND_BUFFER_SIZE); @@ -871,10 +867,8 @@ mod tests { } // [utest->req~usubscription-subscribe-expiration~1] - #[tokio::test] + #[test_log::test(tokio::test)] async fn test_schedule_past_unsubscribe() { - helpers::init_once(); - let (internal_cmd_sender, mut internal_cmd_receiver) = mpsc::channel::(INTERNAL_COMMAND_BUFFER_SIZE); @@ -908,10 +902,8 @@ mod tests { } } - #[tokio::test] + #[test_log::test(tokio::test)] async fn test_remote_subscribe() { - helpers::init_once(); - let expected_topic = test_lib::helpers::remote_topic1_uri(); // build request @@ -961,9 +953,8 @@ mod tests { }; } - #[tokio::test] + #[test_log::test(tokio::test)] async fn test_remote_unsubscribe() { - helpers::init_once(); let expected_topic = test_lib::helpers::remote_topic1_uri(); // build request diff --git a/up-subscription/src/tests/notification_manager_tests.rs b/up-subscription/src/tests/notification_manager_tests.rs index a0bb832..4599432 100644 --- a/up-subscription/src/tests/notification_manager_tests.rs +++ b/up-subscription/src/tests/notification_manager_tests.rs @@ -56,7 +56,7 @@ mod tests { fn new(config: Arc, expected_message: Vec) -> Self { let shutdown_notification = Arc::new(Notify::new()); let (command_sender, command_receiver) = - mpsc::channel::(DEFAULT_COMMAND_BUFFER_SIZE); + mpsc::channel::(DEFAULT_COMMAND_BUFFER_SIZE.into()); let transport_mock = test_lib::mocks::utransport_mock_for_notification_manager(expected_message); @@ -143,9 +143,8 @@ mod tests { } } - #[tokio::test] + #[test_log::test(tokio::test)] async fn test_add_notifyee() { - helpers::init_once(); let command_sender = CommandSender::new(Arc::new(CommandSender::get_config()), vec![]); let expected_subscriber = test_lib::helpers::subscriber_info1().uri.unwrap(); @@ -166,9 +165,8 @@ mod tests { assert!(notification_topics.contains(&(expected_subscriber, expected_topic))); } - #[tokio::test] + #[test_log::test(tokio::test)] async fn test_remove_notifyee() { - helpers::init_once(); let command_sender = CommandSender::new(Arc::new(CommandSender::get_config()), vec![]); // prepare things @@ -201,10 +199,8 @@ mod tests { // [utest->dsn~usubscription-change-notification-type~1] // [utest->dsn~usubscription-change-notification-topic~1] - #[tokio::test] + #[test_log::test(tokio::test)] async fn test_state_change() { - helpers::init_once(); - // prepare things // this is the status&topic&subscriber that the notification is about let changing_status = SubscriptionStatus { @@ -247,10 +243,8 @@ mod tests { } // [utest->req~usubscription-register-notifications~1] - #[tokio::test] + #[test_log::test(tokio::test)] async fn test_state_change_direct_notification() { - helpers::init_once(); - // prepare things // this is the status&topic&subscriber that the notification is about let changing_status = SubscriptionStatus { @@ -312,10 +306,8 @@ mod tests { } // [utest->req~usubscription-unregister-notifications~1] - #[tokio::test] + #[test_log::test(tokio::test)] async fn test_unregister_direct_notification() { - helpers::init_once(); - // prepare things // this is the status&topic&subscriber that the notification is about let changing_status = SubscriptionStatus { diff --git a/up-subscription/src/tests/persistency_tests.rs b/up-subscription/src/tests/persistency_tests.rs index 8d9dbf6..bcd7f1b 100644 --- a/up-subscription/src/tests/persistency_tests.rs +++ b/up-subscription/src/tests/persistency_tests.rs @@ -16,7 +16,7 @@ mod tests { use std::time::{SystemTime, UNIX_EPOCH}; use tokio::time::{sleep, Duration}; - use crate::{helpers, persistency, test_lib, USubscriptionConfiguration}; + use crate::{persistency, test_lib, USubscriptionConfiguration}; fn get_configuration() -> USubscriptionConfiguration { USubscriptionConfiguration::create( @@ -29,10 +29,8 @@ mod tests { .unwrap() } - #[tokio::test] + #[test_log::test(tokio::test)] async fn test_get_and_prune_expiring_subscriptions() { - helpers::init_once(); - let mut subscriptions = persistency::SubscriptionsStore::new(&get_configuration()); // Prepare subscription persistency with two subscriptions, one with and one without expiry timestamp @@ -88,10 +86,8 @@ mod tests { } // [utest->req~usubscription-reset~1] - #[tokio::test] + #[test_log::test(tokio::test)] async fn test_reset_subscriptions() { - helpers::init_once(); - let mut subscriptions = persistency::SubscriptionsStore::new(&get_configuration()); let _ = subscriptions.add_subscription( @@ -118,10 +114,8 @@ mod tests { } // [utest->req~usubscription-reset~1] - #[tokio::test] + #[test_log::test(tokio::test)] async fn test_reset_remote_subscriptions() { - helpers::init_once(); - let mut remote_topics = persistency::RemoteTopicsStore::new(&get_configuration()); let _ = remote_topics.add_topic_or_get_state(&test_lib::helpers::local_topic1_uri()); @@ -140,10 +134,8 @@ mod tests { } // [utest->req~usubscription-reset~1] - #[tokio::test] + #[test_log::test(tokio::test)] async fn test_reset_notifications() { - helpers::init_once(); - let mut notifications = persistency::NotificationStore::new(&get_configuration()); notifications @@ -172,10 +164,8 @@ mod tests { assert_eq!(data.unwrap().len(), 0); } - #[tokio::test] + #[test_log::test(tokio::test)] async fn test_one_subscriber_multiple_topics() { - helpers::init_once(); - let mut notifications = persistency::NotificationStore::new(&get_configuration()); let _ = notifications.add_notifyee( @@ -193,10 +183,8 @@ mod tests { assert_eq!(data.unwrap().len(), 2); } - #[tokio::test] + #[test_log::test(tokio::test)] async fn test_multiple_subscribers_one_topic() { - helpers::init_once(); - let mut notifications = persistency::NotificationStore::new(&get_configuration()); let _ = notifications.add_notifyee( diff --git a/up-subscription/src/tests/subscription_manager_tests.rs b/up-subscription/src/tests/subscription_manager_tests.rs index 1fd09e0..ddac0b7 100644 --- a/up-subscription/src/tests/subscription_manager_tests.rs +++ b/up-subscription/src/tests/subscription_manager_tests.rs @@ -14,14 +14,16 @@ // [utest->dsn~usubscription-state-machine~1] #[cfg(test)] mod tests { - use protobuf::MessageFull; use std::collections::HashMap; use std::error::Error; use std::sync::Arc; use std::time::{SystemTime, UNIX_EPOCH}; use std::vec; + + use protobuf::MessageFull; use test_case::test_case; use tokio::sync::{mpsc, mpsc::Sender, oneshot, Notify}; + use tracing::debug; use up_rust::{ core::usubscription::{ @@ -66,9 +68,9 @@ mod tests { let transport_mock = MockTransport::default(); let shutdown_notification = Arc::new(Notify::new()); let (command_sender, command_receiver) = - mpsc::channel::(DEFAULT_COMMAND_BUFFER_SIZE); + mpsc::channel::(DEFAULT_COMMAND_BUFFER_SIZE.into()); let (notification_sender, mut notification_receiver) = - mpsc::channel::(config.notification_command_buffer); + mpsc::channel::(config.notification_command_buffer.into()); // Spawn notification receiver task let shutdown_notification_cloned = shutdown_notification.clone(); @@ -77,7 +79,7 @@ mod tests { tokio::select! { Some(event) = notification_receiver.recv() => { if let NotificationEvent::StateChange { subscriber, topic, status, respond_to } = event { - println!( + debug!( "Change Notification received: {} - {} - {}", subscriber.unwrap().to_uri(true), topic.to_uri(true), @@ -133,9 +135,9 @@ mod tests { let transport_mock = MockTransport::default(); let shutdown_notification = Arc::new(Notify::new()); let (command_sender, command_receiver) = - mpsc::channel::(DEFAULT_COMMAND_BUFFER_SIZE); + mpsc::channel::(DEFAULT_COMMAND_BUFFER_SIZE.into()); let (notification_sender, mut notification_receiver) = - mpsc::channel::(config.notification_command_buffer); + mpsc::channel::(config.notification_command_buffer.into()); // Spawn notification receiver task let shutdown_notification_cloned = shutdown_notification.clone(); @@ -149,7 +151,7 @@ mod tests { NotificationEvent::StateChange { .. } => { if let Some(pos) = expected_notifications.iter().position(|e| e == &event) { if let NotificationEvent::StateChange { subscriber, status, topic, respond_to } = event { - println!( + debug!( "Change Notification received: {} - {} - {}", subscriber.unwrap_or_default().to_uri(true), topic.to_uri(true), @@ -166,23 +168,23 @@ mod tests { } }, NotificationEvent::SetNotificationTopics { notification_topics_replacement, respond_to } => { - println!("Notification Manager SetNotificationTopic command received"); + debug!("Notification Manager SetNotificationTopic command received"); notification_topics = notification_topics_replacement; let _ = respond_to.send(()); }, NotificationEvent::GetNotificationTopics { respond_to } => { - println!("Notification Manager GetNotificationTopic command received"); + debug!("Notification Manager GetNotificationTopic command received"); let _ = respond_to.send(notification_topics.clone()); }, NotificationEvent::Reset { respond_to } => { - println!("Notification Manager Reset command received"); + debug!("Notification Manager Reset command received"); let _ = respond_to.send(Ok(())); } _ => panic!("Received unexpected notification event: {event:?}") } }, _ = shutdown_notification_cloned.notified() => { - println!("Shutting down notification reception loop"); + debug!("Shutting down notification reception loop"); break; }, }; @@ -232,7 +234,7 @@ mod tests { let shutdown_notification = Arc::new(Notify::new()); let (command_sender, command_receiver) = - mpsc::channel::(DEFAULT_COMMAND_BUFFER_SIZE); + mpsc::channel::(DEFAULT_COMMAND_BUFFER_SIZE.into()); let mock_transport = Arc::new( test_lib::mocks::utransport_mock_for_rpc(vec![( @@ -242,7 +244,7 @@ mod tests { .await, ); let (notification_sender, _) = - mpsc::channel::(config.notification_command_buffer); + mpsc::channel::(config.notification_command_buffer.into()); let shutdown_notification_cloned = shutdown_notification.clone(); helpers::spawn_and_log_error(async move { @@ -424,9 +426,8 @@ mod tests { (test_lib::helpers::local_topic1_uri(), test_lib::helpers::subscriber_uri2()), (test_lib::helpers::local_topic2_uri(), test_lib::helpers::subscriber_uri2()) ]; "Multiple susbcriber-topic combinations")] - #[tokio::test] + #[test_log::test(tokio::test)] async fn test_subscribe(topic_subscribers: Vec<(TopicUUri, SubscriberUUri)>) { - helpers::init_once(); let command_sender = CommandSender::new(); // Prepare things @@ -457,9 +458,8 @@ mod tests { // [utest->req~usubscription-subscribe-expiration~1] // [utest->req~usubscription-subscribe-no-expiration~1] - #[tokio::test] + #[test_log::test(tokio::test)] async fn test_subscribe_with_expiry() { - helpers::init_once(); let command_sender = CommandSender::new(); let now = SystemTime::now() @@ -525,10 +525,8 @@ mod tests { // [utest->req~usubscription-subscribe-remote-response~1] #[test_case(test_lib::helpers::remote_topic1_uri(), State::SUBSCRIBE_PENDING; "Remote topic, remote state SUBSCRIBED_PENDING")] #[test_case(test_lib::helpers::remote_topic1_uri(), State::SUBSCRIBED; "Remote topic, remote state SUBSCRIBED")] - #[tokio::test] + #[test_log::test(tokio::test)] async fn test_remote_subscribe(remote_topic: TopicUUri, remote_state: State) { - helpers::init_once(); - // Prepare things let remote_subscription_request = SubscriptionRequest { topic: Some(remote_topic.clone()).into(), @@ -590,10 +588,8 @@ mod tests { // [utest->req~usubscription-subscribe-remote~1] // [utest->req~usubscription-unsubscribe-last-remote~1] - #[tokio::test] + #[test_log::test(tokio::test)] async fn test_repeated_remote_subscribe() { - helpers::init_once(); - // Prepare things let remote_topic = test_lib::helpers::remote_topic1_uri(); let remote_subscription_request = SubscriptionRequest { @@ -653,9 +649,8 @@ mod tests { // All subscribers for a topic unsubscribe // [utest->req~usubscription-unsubscribe~1] - #[tokio::test] + #[test_log::test(tokio::test)] async fn test_final_unsubscribe() { - helpers::init_once(); let command_sender = CommandSender::new(); // Prepare things @@ -694,9 +689,8 @@ mod tests { } // Only some subscribers of a topic unsubscribe - #[tokio::test] + #[test_log::test(tokio::test)] async fn test_partial_unsubscribe() { - helpers::init_once(); let command_sender = CommandSender::new(); // Prepare things @@ -748,9 +742,8 @@ mod tests { // All subscribers for a remote topic unsubscribe // [utest->req~usubscription-unsubscribe-last-remote~1] - #[tokio::test] + #[test_log::test(tokio::test)] async fn test_final_remote_unsubscribe() { - helpers::init_once(); let remote_topic = test_lib::helpers::remote_topic1_uri(); // Prepare things @@ -828,9 +821,8 @@ mod tests { // Some subscribers for a remote topic unsubscribe, but at least one subscriber is left // [utest->req~usubscription-unsubscribe-last-remote~1] // [utest->req~usubscription-unsubscribe-remote-unsubscribed~1] - #[tokio::test] + #[test_log::test(tokio::test)] async fn test_partial_remote_unsubscribe() { - helpers::init_once(); let remote_topic = test_lib::helpers::remote_topic1_uri(); // Prepare things - we're not expecting any remote-unsubscribe action in this case @@ -895,10 +887,8 @@ mod tests { // [utest->req~usubscription-subscribe-notifications~1] // [utest->dsn~usubscription-change-notification-update~1] - #[tokio::test] + #[test_log::test(tokio::test)] async fn test_local_subscribe_notification() { - helpers::init_once(); - // Prepare things let topic = test_lib::helpers::local_topic1_uri(); let subscriber = test_lib::helpers::subscriber_uri1(); @@ -926,10 +916,8 @@ mod tests { } // [utest->dsn~usubscription-change-notification-update~1] - #[tokio::test] + #[test_log::test(tokio::test)] async fn test_local_unsubscribe_notification() { - helpers::init_once(); - // Prepare things // Prepare things #[allow(clippy::mutable_key_type)] @@ -971,9 +959,9 @@ mod tests { } // TODO: Let's see if this is actually covered by a requirement - otherwise it should go - // #[tokio::test] + // #[test_log::test(tokio::test)] // async fn test_remote_subscribe_notification() { - // helpers::init_once(); + // ; // // Prepare things // let topic = test_lib::helpers::remote_topic1_uri(); @@ -1011,10 +999,8 @@ mod tests { // } // [utest->dsn~usubscription-change-notification-update~1] - #[tokio::test] + #[test_log::test(tokio::test)] async fn test_state_change_notification() { - helpers::init_once(); - // Prepare things let topic = test_lib::helpers::remote_topic1_uri(); let subscriber = test_lib::helpers::subscriber_uri1(); @@ -1065,9 +1051,8 @@ mod tests { } // [utest->req~usubscription-fetch-subscribers~1] - #[tokio::test] + #[test_log::test(tokio::test)] async fn test_fetch_subscribers() { - helpers::init_once(); let command_sender = CommandSender::new(); // set starting state @@ -1113,9 +1098,8 @@ mod tests { } // [utest->req~usubscription-fetch-subscriptions-by-subscriber~1] - #[tokio::test] + #[test_log::test(tokio::test)] async fn test_fetch_subscriptions_by_subscriber() { - helpers::init_once(); let command_sender = CommandSender::new(); // set starting state @@ -1174,9 +1158,8 @@ mod tests { } // [utest->req~usubscription-fetch-subscriptions-by-topic~1] - #[tokio::test] + #[test_log::test(tokio::test)] async fn test_fetch_subscriptions_by_topic() { - helpers::init_once(); let command_sender = CommandSender::new(); // set starting state @@ -1233,10 +1216,8 @@ mod tests { } // [utest->req~usubscription-reset~1] - #[tokio::test] + #[test_log::test(tokio::test)] async fn test_reset_notifications() { - helpers::init_once(); - // Prepare things let topic = test_lib::helpers::remote_topic1_uri(); let subscriber = test_lib::helpers::subscriber_uri1(); diff --git a/up-subscription/src/usubscription.rs b/up-subscription/src/usubscription.rs index 028f909..2cd1a8f 100644 --- a/up-subscription/src/usubscription.rs +++ b/up-subscription/src/usubscription.rs @@ -20,6 +20,7 @@ use tokio::{ task::JoinHandle, }; +use tracing::info; use up_rust::{ communication::{InMemoryRpcServer, RpcServer}, core::usubscription::{ @@ -27,7 +28,7 @@ use up_rust::{ RESOURCE_ID_REGISTER_FOR_NOTIFICATIONS, RESOURCE_ID_RESET, RESOURCE_ID_SUBSCRIBE, RESOURCE_ID_UNREGISTER_FOR_NOTIFICATIONS, RESOURCE_ID_UNSUBSCRIBE, }, - UCode, UStatus, UTransport, UUri, + LocalUriProvider, UCode, UStatus, UTransport, UUri, }; use crate::{ @@ -80,6 +81,7 @@ pub struct USubscriptionStopper { impl USubscriptionStopper { pub async fn stop(&mut self) { + info!("Stopping uSubscription service"); self.shutdown_notification.notify_waiters(); self.subscription_joiner @@ -111,8 +113,6 @@ impl USubscriptionService { config: Arc, transport: Arc, ) -> Result { - helpers::init_once(); - let server = Arc::new(InMemoryRpcServer::new(transport.clone(), config.clone())); let shutdown_notification = Arc::new(Notify::new()); @@ -121,7 +121,7 @@ impl USubscriptionService { let transport_cloned = transport.clone(); let shutdown_notification_cloned = shutdown_notification.clone(); let (notification_sender, notification_receiver) = - mpsc::channel::(config.notification_command_buffer); + mpsc::channel::(config.notification_command_buffer.into()); let notification_joiner = helpers::spawn_and_log_error(async move { notification_manager::notification_engine( config_cloned, @@ -138,7 +138,7 @@ impl USubscriptionService { let transport_cloned = transport.clone(); let shutdown_notification_cloned = shutdown_notification.clone(); let (subscription_sender, subscription_receiver) = - mpsc::channel::(config.subscription_command_buffer); + mpsc::channel::(config.subscription_command_buffer.into()); let notification_sender_cloned = notification_sender.clone(); let subscription_joiner = helpers::spawn_and_log_error(async move { subscription_manager::handle_message( @@ -154,6 +154,11 @@ impl USubscriptionService { register_handlers(server, subscription_sender, notification_sender).await?; + info!( + "uSubscription service is up and running, listening on {}", + config.get_source_uri().to_uri(true) + ); + Ok(USubscriptionStopper { subscription_joiner: Some(subscription_joiner), notification_joiner: Some(notification_joiner),