Skip to content

Commit 2746996

Browse files
authored
Make constructor methods for pub/sub/etc. take rcl_node_t mutex (#290)
The constructors are not public, so we're free to use internal types as arguments. This signature makes it possible, or at least easier, to write parameter services that are owned by the node: With this change, the Node does not need to be constructed before the parameter services are constructed, which would somewhat contradict the node owning these services.
1 parent 27342d5 commit 2746996

File tree

5 files changed

+49
-31
lines changed

5 files changed

+49
-31
lines changed

rclrs/src/client.rs

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ use rosidl_runtime_rs::Message;
99

1010
use crate::error::{RclReturnCode, ToResult};
1111
use crate::MessageCow;
12-
use crate::Node;
1312
use crate::{rcl_bindings::*, RclrsError};
1413

1514
// SAFETY: The functions accessing this type, including drop(), shouldn't care about the thread
@@ -56,8 +55,11 @@ type RequestId = i64;
5655

5756
/// Main class responsible for sending requests to a ROS service.
5857
///
59-
/// The only available way to instantiate clients is via [`Node::create_client`], this is to
60-
/// ensure that [`Node`]s can track all the clients that have been created.
58+
/// The only available way to instantiate clients is via [`Node::create_client`][1], this is to
59+
/// ensure that [`Node`][2]s can track all the clients that have been created.
60+
///
61+
/// [1]: crate::Node::create_client
62+
/// [2]: crate::Node
6163
pub struct Client<T>
6264
where
6365
T: rosidl_runtime_rs::Service,
@@ -72,7 +74,7 @@ where
7274
T: rosidl_runtime_rs::Service,
7375
{
7476
/// Creates a new client.
75-
pub(crate) fn new(node: &Node, topic: &str) -> Result<Self, RclrsError>
77+
pub(crate) fn new(rcl_node_mtx: Arc<Mutex<rcl_node_t>>, topic: &str) -> Result<Self, RclrsError>
7678
// This uses pub(crate) visibility to avoid instantiating this struct outside
7779
// [`Node::create_client`], see the struct's documentation for the rationale
7880
where
@@ -86,7 +88,6 @@ where
8688
err,
8789
s: topic.into(),
8890
})?;
89-
let rcl_node = { &mut *node.rcl_node_mtx.lock().unwrap() };
9091

9192
// SAFETY: No preconditions for this function.
9293
let client_options = unsafe { rcl_client_get_default_options() };
@@ -98,7 +99,7 @@ where
9899
// afterwards.
99100
rcl_client_init(
100101
&mut rcl_client,
101-
rcl_node,
102+
&*rcl_node_mtx.lock().unwrap(),
102103
type_support,
103104
topic_c_string.as_ptr(),
104105
&client_options,
@@ -108,7 +109,7 @@ where
108109

109110
let handle = Arc::new(ClientHandle {
110111
rcl_client_mtx: Mutex::new(rcl_client),
111-
rcl_node_mtx: node.rcl_node_mtx.clone(),
112+
rcl_node_mtx,
112113
in_use_by_wait_set: Arc::new(AtomicBool::new(false)),
113114
});
114115

rclrs/src/node.rs

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ impl Node {
184184
where
185185
T: rosidl_runtime_rs::Service,
186186
{
187-
let client = Arc::new(Client::<T>::new(self, topic)?);
187+
let client = Arc::new(Client::<T>::new(Arc::clone(&self.rcl_node_mtx), topic)?);
188188
self.clients
189189
.push(Arc::downgrade(&client) as Weak<dyn ClientBase>);
190190
Ok(client)
@@ -243,7 +243,7 @@ impl Node {
243243
where
244244
T: Message,
245245
{
246-
Publisher::<T>::new(self, topic, qos)
246+
Publisher::<T>::new(Arc::clone(&self.rcl_node_mtx), topic, qos)
247247
}
248248

249249
/// Creates a [`Service`][1].
@@ -259,7 +259,11 @@ impl Node {
259259
T: rosidl_runtime_rs::Service,
260260
F: Fn(&rmw_request_id_t, T::Request) -> T::Response + 'static + Send,
261261
{
262-
let service = Arc::new(Service::<T>::new(self, topic, callback)?);
262+
let service = Arc::new(Service::<T>::new(
263+
Arc::clone(&self.rcl_node_mtx),
264+
topic,
265+
callback,
266+
)?);
263267
self.services
264268
.push(Arc::downgrade(&service) as Weak<dyn ServiceBase>);
265269
Ok(service)
@@ -278,7 +282,12 @@ impl Node {
278282
where
279283
T: Message,
280284
{
281-
let subscription = Arc::new(Subscription::<T>::new(self, topic, qos, callback)?);
285+
let subscription = Arc::new(Subscription::<T>::new(
286+
Arc::clone(&self.rcl_node_mtx),
287+
topic,
288+
qos,
289+
callback,
290+
)?);
282291
self.subscriptions
283292
.push(Arc::downgrade(&subscription) as Weak<dyn SubscriptionBase>);
284293
Ok(subscription)

rclrs/src/publisher.rs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ use rosidl_runtime_rs::{Message, RmwMessage};
99
use crate::error::{RclrsError, ToResult};
1010
use crate::qos::QoSProfile;
1111
use crate::rcl_bindings::*;
12-
use crate::Node;
1312

1413
mod loaned_message;
1514
pub use loaned_message::*;
@@ -69,7 +68,11 @@ where
6968
/// Creates a new `Publisher`.
7069
///
7170
/// Node and namespace changes are always applied _before_ topic remapping.
72-
pub fn new(node: &Node, topic: &str, qos: QoSProfile) -> Result<Self, RclrsError>
71+
pub fn new(
72+
rcl_node_mtx: Arc<Mutex<rcl_node_t>>,
73+
topic: &str,
74+
qos: QoSProfile,
75+
) -> Result<Self, RclrsError>
7376
where
7477
T: Message,
7578
{
@@ -81,7 +84,6 @@ where
8184
err,
8285
s: topic.into(),
8386
})?;
84-
let rcl_node = &mut *node.rcl_node_mtx.lock().unwrap();
8587

8688
// SAFETY: No preconditions for this function.
8789
let mut publisher_options = unsafe { rcl_publisher_get_default_options() };
@@ -94,7 +96,7 @@ where
9496
// TODO: type support?
9597
rcl_publisher_init(
9698
&mut rcl_publisher,
97-
rcl_node,
99+
&*rcl_node_mtx.lock().unwrap(),
98100
type_support_ptr,
99101
topic_c_string.as_ptr(),
100102
&publisher_options,
@@ -104,7 +106,7 @@ where
104106

105107
Ok(Self {
106108
rcl_publisher_mtx: Mutex::new(rcl_publisher),
107-
rcl_node_mtx: Arc::clone(&node.rcl_node_mtx),
109+
rcl_node_mtx,
108110
type_support_ptr,
109111
message: PhantomData,
110112
})

rclrs/src/service.rs

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use std::sync::{Arc, Mutex, MutexGuard};
66
use rosidl_runtime_rs::Message;
77

88
use crate::error::{RclReturnCode, ToResult};
9-
use crate::{rcl_bindings::*, MessageCow, Node, RclrsError};
9+
use crate::{rcl_bindings::*, MessageCow, RclrsError};
1010

1111
// SAFETY: The functions accessing this type, including drop(), shouldn't care about the thread
1212
// they are running in. Therefore, this type can be safely sent to another thread.
@@ -51,8 +51,11 @@ type ServiceCallback<Request, Response> =
5151

5252
/// Main class responsible for responding to requests sent by ROS clients.
5353
///
54-
/// The only available way to instantiate services is via [`Node::create_service`], this is to
55-
/// ensure that [`Node`]s can track all the services that have been created.
54+
/// The only available way to instantiate services is via [`Node::create_service()`][1], this is to
55+
/// ensure that [`Node`][2]s can track all the services that have been created.
56+
///
57+
/// [1]: crate::Node::create_service
58+
/// [2]: crate::Node
5659
pub struct Service<T>
5760
where
5861
T: rosidl_runtime_rs::Service,
@@ -67,7 +70,11 @@ where
6770
T: rosidl_runtime_rs::Service,
6871
{
6972
/// Creates a new service.
70-
pub(crate) fn new<F>(node: &Node, topic: &str, callback: F) -> Result<Self, RclrsError>
73+
pub(crate) fn new<F>(
74+
rcl_node_mtx: Arc<Mutex<rcl_node_t>>,
75+
topic: &str,
76+
callback: F,
77+
) -> Result<Self, RclrsError>
7178
// This uses pub(crate) visibility to avoid instantiating this struct outside
7279
// [`Node::create_service`], see the struct's documentation for the rationale
7380
where
@@ -82,7 +89,6 @@ where
8289
err,
8390
s: topic.into(),
8491
})?;
85-
let rcl_node = &mut *node.rcl_node_mtx.lock().unwrap();
8692

8793
// SAFETY: No preconditions for this function.
8894
let service_options = unsafe { rcl_service_get_default_options() };
@@ -93,8 +99,8 @@ where
9399
// The topic name and the options are copied by this function, so they can be dropped
94100
// afterwards.
95101
rcl_service_init(
96-
&mut rcl_service as *mut _,
97-
rcl_node as *mut _,
102+
&mut rcl_service,
103+
&*rcl_node_mtx.lock().unwrap(),
98104
type_support,
99105
topic_c_string.as_ptr(),
100106
&service_options as *const _,
@@ -104,7 +110,7 @@ where
104110

105111
let handle = Arc::new(ServiceHandle {
106112
rcl_service_mtx: Mutex::new(rcl_service),
107-
rcl_node_mtx: node.rcl_node_mtx.clone(),
113+
rcl_node_mtx,
108114
in_use_by_wait_set: Arc::new(AtomicBool::new(false)),
109115
});
110116

rclrs/src/subscription.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ use rosidl_runtime_rs::{Message, RmwMessage};
88

99
use crate::error::{RclReturnCode, ToResult};
1010
use crate::qos::QoSProfile;
11-
use crate::Node;
1211
use crate::{rcl_bindings::*, RclrsError};
1312

1413
mod callback;
@@ -63,11 +62,13 @@ pub trait SubscriptionBase: Send + Sync {
6362
/// When a subscription is created, it may take some time to get "matched" with a corresponding
6463
/// publisher.
6564
///
66-
/// The only available way to instantiate subscriptions is via [`Node::create_subscription`], this
67-
/// is to ensure that [`Node`]s can track all the subscriptions that have been created.
65+
/// The only available way to instantiate subscriptions is via [`Node::create_subscription()`][3], this
66+
/// is to ensure that [`Node`][4]s can track all the subscriptions that have been created.
6867
///
6968
/// [1]: crate::spin_once
7069
/// [2]: crate::spin
70+
/// [3]: crate::Node::create_subscription
71+
/// [4]: crate::Node
7172
pub struct Subscription<T>
7273
where
7374
T: Message,
@@ -84,7 +85,7 @@ where
8485
{
8586
/// Creates a new subscription.
8687
pub(crate) fn new<Args>(
87-
node: &Node,
88+
rcl_node_mtx: Arc<Mutex<rcl_node_t>>,
8889
topic: &str,
8990
qos: QoSProfile,
9091
callback: impl SubscriptionCallback<T, Args>,
@@ -102,7 +103,6 @@ where
102103
err,
103104
s: topic.into(),
104105
})?;
105-
let rcl_node = &mut *node.rcl_node_mtx.lock().unwrap();
106106

107107
// SAFETY: No preconditions for this function.
108108
let mut subscription_options = unsafe { rcl_subscription_get_default_options() };
@@ -115,7 +115,7 @@ where
115115
// TODO: type support?
116116
rcl_subscription_init(
117117
&mut rcl_subscription,
118-
rcl_node,
118+
&*rcl_node_mtx.lock().unwrap(),
119119
type_support,
120120
topic_c_string.as_ptr(),
121121
&subscription_options,
@@ -125,7 +125,7 @@ where
125125

126126
let handle = Arc::new(SubscriptionHandle {
127127
rcl_subscription_mtx: Mutex::new(rcl_subscription),
128-
rcl_node_mtx: node.rcl_node_mtx.clone(),
128+
rcl_node_mtx,
129129
in_use_by_wait_set: Arc::new(AtomicBool::new(false)),
130130
});
131131

0 commit comments

Comments
 (0)