diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 00000000..f18f925b --- /dev/null +++ b/.gitmodules @@ -0,0 +1,3 @@ +[submodule "libs/onvif-rs"] + path = libs/onvif-rs + url = https://github.com/lumeohq/onvif-rs diff --git a/Cargo.toml b/Cargo.toml index 1b90127b..800d0d76 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -31,6 +31,7 @@ regex = "1.10.4" #TODO: Investigate rweb to use openapi spec for free # https://github.com/kdy1/rweb +async-trait = "0.1.41" actix-files = "0.6.5" actix-web = { version = "4.6.0", features = ["rustls"] } actix-web-validator = "5.0.1" @@ -66,6 +67,10 @@ tracing-tracy = { version = "0.10.5", features = ["ondemand", "broadcast"] } # N # Reference: https://github.com/tokio-rs/tracing/issues/2441 tracing-appender = { git = "https://github.com/joaoantoniocardoso/tracing", branch = "tracing-appender-0.2.2-with-filename-suffix" } +# onvif library does not work using git dependency +onvif = { path = "libs/onvif-rs/onvif" } +onvif-schema = { path = "libs/onvif-rs/schema", package = "schema", default-features = true, features = ["analytics", "devicemgmt", "event", "media", "ptz"] } + anyhow = "1" tokio = { version = "1.37", features = ["full"] } enum_dispatch = "0.3.13" diff --git a/libs/onvif-rs b/libs/onvif-rs new file mode 160000 index 00000000..1b1e6fd8 --- /dev/null +++ b/libs/onvif-rs @@ -0,0 +1 @@ +Subproject commit 1b1e6fd8b54ceed0eff1c7d48b59f571da2b86d0 diff --git a/src/lib/controls/mod.rs b/src/lib/controls/mod.rs index dd198c6d..cd3ad155 100644 --- a/src/lib/controls/mod.rs +++ b/src/lib/controls/mod.rs @@ -1 +1,2 @@ -pub mod types; \ No newline at end of file +pub mod onvif; +pub mod types; diff --git a/src/lib/controls/onvif/client.rs b/src/lib/controls/onvif/client.rs new file mode 100644 index 00000000..582a6957 --- /dev/null +++ b/src/lib/controls/onvif/client.rs @@ -0,0 +1,149 @@ +use onvif::soap; +use onvif_schema::{devicemgmt::GetDeviceInformationResponse, transport}; + +use anyhow::{anyhow, Result}; +use tracing::*; + +pub struct Clients { + devicemgmt: soap::client::Client, + event: Option, + deviceio: Option, + media: Option, + media2: Option, + imaging: Option, + ptz: Option, + analytics: Option, +} + +pub struct Auth { + pub credentials: Option, + pub url: Box, +} + +impl Clients { + #[instrument(level = "debug", skip(auth))] + pub async fn try_new(auth: &Auth) -> Result { + let creds = &auth.credentials; + let devicemgmt_uri = url::Url::parse(&auth.url)?; + let base_uri = &devicemgmt_uri.origin().ascii_serialization(); + + let mut this = Self { + devicemgmt: soap::client::ClientBuilder::new(&devicemgmt_uri) + .credentials(creds.clone()) + .build(), + imaging: None, + ptz: None, + event: None, + deviceio: None, + media: None, + media2: None, + analytics: None, + }; + + let services = + onvif_schema::devicemgmt::get_services(&this.devicemgmt, &Default::default()).await?; + + for service in &services.service { + let service_url = url::Url::parse(&service.x_addr).map_err(anyhow::Error::msg)?; + if !service_url.as_str().starts_with(base_uri) { + return Err(anyhow!( + "Service URI {service_url:?} is not within base URI {base_uri:?}" + )); + } + let svc = Some( + soap::client::ClientBuilder::new(&service_url) + .credentials(creds.clone()) + .build(), + ); + match service.namespace.as_str() { + "http://www.onvif.org/ver10/device/wsdl" => { + if service_url != devicemgmt_uri { + warn!( + "advertised device mgmt uri {service_url} not expected {devicemgmt_uri}" + ); + } + } + "http://www.onvif.org/ver10/events/wsdl" => this.event = svc, + "http://www.onvif.org/ver10/deviceIO/wsdl" => this.deviceio = svc, + "http://www.onvif.org/ver10/media/wsdl" => this.media = svc, + "http://www.onvif.org/ver20/media/wsdl" => this.media2 = svc, + "http://www.onvif.org/ver20/imaging/wsdl" => this.imaging = svc, + "http://www.onvif.org/ver20/ptz/wsdl" => this.ptz = svc, + "http://www.onvif.org/ver20/analytics/wsdl" => this.analytics = svc, + _ => debug!("unknown service: {:?}", service), + } + } + + Ok(this) + } + + #[instrument(level = "debug", skip(self))] + pub async fn get_device_information( + &self, + ) -> Result { + onvif_schema::devicemgmt::get_device_information(&self.devicemgmt, &Default::default()) + .await + } + + pub async fn get_stream_uris(&self) -> Result, transport::Error> { + let mut urls: Vec = vec![]; + let media_client = self + .media + .as_ref() + .ok_or_else(|| transport::Error::Other("Client media is not available".into()))?; + let profiles = onvif_schema::media::get_profiles(media_client, &Default::default()).await?; + debug!("get_profiles response: {:#?}", &profiles); + let requests: Vec<_> = profiles + .profiles + .iter() + .map( + |p: &onvif_schema::onvif::Profile| onvif_schema::media::GetStreamUri { + profile_token: onvif_schema::onvif::ReferenceToken(p.token.0.clone()), + stream_setup: onvif_schema::onvif::StreamSetup { + stream: onvif_schema::onvif::StreamType::RtpUnicast, + transport: onvif_schema::onvif::Transport { + protocol: onvif_schema::onvif::TransportProtocol::Rtsp, + tunnel: vec![], + }, + }, + }, + ) + .collect(); + + let responses = futures::future::try_join_all( + requests + .iter() + .map(|r| onvif_schema::media::get_stream_uri(media_client, r)), + ) + .await?; + for (p, resp) in profiles.profiles.iter().zip(responses.iter()) { + debug!("token={} name={}", &p.token.0, &p.name.0); + debug!(" {}", &resp.media_uri.uri); + match url::Url::parse(&resp.media_uri.uri) { + Ok(address) => urls.push(address), + Err(error) => { + error!( + "Failed to parse stream url: {}, reason: {error:?}", + &resp.media_uri.uri + ) + } + } + if let Some(ref v) = p.video_encoder_configuration { + debug!( + " {:?}, {}x{}", + v.encoding, v.resolution.width, v.resolution.height + ); + if let Some(ref r) = v.rate_control { + debug!(" {} fps, {} kbps", r.frame_rate_limit, r.bitrate_limit); + } + } + if let Some(ref a) = p.audio_encoder_configuration { + debug!( + " audio: {:?}, {} kbps, {} kHz", + a.encoding, a.bitrate, a.sample_rate + ); + } + } + Ok(urls) + } +} diff --git a/src/lib/controls/onvif/manager.rs b/src/lib/controls/onvif/manager.rs new file mode 100644 index 00000000..cb7cda0f --- /dev/null +++ b/src/lib/controls/onvif/manager.rs @@ -0,0 +1,144 @@ +use std::sync::{Arc, Mutex}; + +use anyhow::Result; +use tracing::*; + +use crate::stream::types::CaptureConfiguration; +use crate::stream::{manager as stream_manager, types::StreamInformation}; +use crate::video::types::VideoSourceType; +use crate::video::video_source_redirect::{VideoSourceRedirect, VideoSourceRedirectType}; +use crate::video_stream::types::VideoAndStreamInformation; + +use super::client::*; + +lazy_static! { + static ref MANAGER: Arc> = Default::default(); +} + +#[derive(Debug)] +pub struct Manager { + _process: tokio::task::JoinHandle>, +} + +impl Drop for Manager { + fn drop(&mut self) { + self._process.abort(); + } +} + +impl Default for Manager { + #[instrument(level = "trace")] + fn default() -> Self { + Self { + _process: tokio::spawn(async move { Manager::discover_loop().await }), + } + } +} + +impl Manager { + // Construct our manager, should be done inside main + #[instrument(level = "debug")] + pub fn init() { + MANAGER.as_ref(); + } + + #[instrument(level = "debug")] + async fn discover_loop() -> Result<()> { + use futures::stream::StreamExt; + use std::net::{IpAddr, Ipv4Addr}; + + loop { + debug!("Discovering onvif..."); + + const MAX_CONCURRENT_JUMPERS: usize = 100; + + onvif::discovery::DiscoveryBuilder::default() + .listen_address(IpAddr::V4(Ipv4Addr::UNSPECIFIED)) + .duration(tokio::time::Duration::from_secs(5)) + .run() + .await? + .for_each_concurrent(MAX_CONCURRENT_JUMPERS, |device| async move { + debug!("Device found: {device:#?}"); + + //TODO: We should add support to auth later + let credentials = None; + let clients = match Clients::try_new(&Auth { + credentials: credentials.clone(), + url: device.urls.first().unwrap().to_string().into(), + }) + .await + { + Ok(clients) => clients, + Err(error) => { + error!("Failed creating clients: {error:#?}"); + return; + } + }; + + match clients.get_stream_uris().await { + Ok(stream_uris) => { + let mut url = stream_uris[0].clone(); + + let name = if let Ok(device) = &clients.get_device_information().await { + format!("{} - {} - {}", device.model, device.serial_number, url) + } else { + if let Some(name) = device.name { + format!("{name} - {url}") + } else { + format!("{url}") + } + }; + + if let Some(credentials) = credentials { + if url.set_username(&credentials.username).is_err() { + error!("Failed setting username for {url}"); + } + if url.set_password(Some(&credentials.password)).is_err() { + error!("Failed setting password for {url}"); + } + } + let video_source_redirect = VideoSourceRedirect { + name: name.clone(), + source: VideoSourceRedirectType::Redirect( + stream_uris[0].to_string(), + ), + }; + + let video_and_stream = VideoAndStreamInformation { + name: name.clone(), + stream_information: StreamInformation { + endpoints: vec![url], + configuration: CaptureConfiguration::Redirect( + Default::default(), + ), + extended_configuration: None, + }, + video_source: VideoSourceType::Redirect(video_source_redirect), + }; + + if let Ok(streams) = stream_manager::streams().await { + for stream in streams { + if let Err(error) = + video_and_stream.conflicts_with(&stream.video_and_stream) + { + debug!("Stream {name} is already registered: {error}"); + return; + } + } + } + + if let Err(error) = + stream_manager::add_stream_and_start(video_and_stream).await + { + error!("Failed adding stream: {error:#?}"); + } + } + Err(error) => { + error!("Failed getting stream uris: {error:#?}"); + } + } + }) + .await; + } + } +} diff --git a/src/lib/controls/onvif/mod.rs b/src/lib/controls/onvif/mod.rs new file mode 100644 index 00000000..af2485e8 --- /dev/null +++ b/src/lib/controls/onvif/mod.rs @@ -0,0 +1,2 @@ +mod client; +pub mod manager; diff --git a/src/main.rs b/src/main.rs index 794c847c..d14be36f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,4 +1,4 @@ -use mavlink_camera_manager::{cli, helper, logger, mavlink, server, settings, stream}; +use mavlink_camera_manager::{cli, controls, helper, logger, mavlink, server, settings, stream}; use tracing::*; @@ -11,6 +11,8 @@ async fn main() -> Result<(), std::io::Error> { // Settings should start before everybody else to ensure that the CLI are stored settings::manager::init(Some(&cli::manager::settings_file())); + controls::onvif::manager::Manager::init(); + mavlink::manager::Manager::init(); stream::manager::init();