Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/time sequencer python #156

Open
wants to merge 5 commits into
base: rolling
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 123 additions & 2 deletions src/message_filters/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,17 @@

"""Message Filter Objects."""

from functools import reduce
import itertools
import threading
# import builtin_interfaces
from bisect import insort_right
from functools import reduce

import rclpy
from builtin_interfaces.msg import Time as TimeMsg
from rclpy.clock import ROSClock
from rclpy.duration import Duration
from rclpy.logging import LoggingSeverity
from rclpy.node import Node
from rclpy.time import Time


Expand Down Expand Up @@ -316,3 +318,122 @@ def add(self, msg, my_queue, my_queue_index=None):
del q[t.nanoseconds]
break # fast finish after the synchronization
self.lock.release()


class TimeSequencer(SimpleFilter):
"""
Sequences messages based on the timestamp of their header.

At construction, the TimeSequencer takes a duration 'delay' which specifies
how long to queue up messages to provide a time sequencing over them.
As messages arrive, they are sorted according to their timestamps.
A callback for a message is never invoked until the messages' timestamp is
out of date by at least the delay. However, for all messages which are out of
date by at least delay, their callbacks are invoked in temporal order.
If a message arrives from a time prior to a message which has already had its
callback invoked, it is thrown away.
"""

def __init__(
self,
input_filter: SimpleFilter,
delay: Duration | float,
update_rate: Duration | float,
queue_size: int,
node: Node,
msg_stamp_attr: str = "header.stamp",
):
"""
Construct a TimeSequencer filter for a subscriber.

Args:
----
input_filter (SimpleFilter): The input filter to connect to.
Typically a Subscriber.
delay (Duration | float): The delay (in seconds) to wait for
messages to arrive before dispatching them.
update_rate (Duration | float): The rate at which to check for
messages that are ready to be dispatched.
queue_size (int): The maximum number of messages to store.
node (Node): The node to create the timer on.
msg_stamp_attr (str, optional): The attribute to use for retrieving
the timestamp from the message. Should point to a
builtin_interfaces.msg.Time field. Defaults to "header.stamp".

"""
super().__init__()
if not isinstance(delay, Duration):
delay = Duration(seconds=delay)
if not isinstance(update_rate, Duration):
update_rate = Duration(seconds=update_rate)
self.delay = delay
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: add types to member variables

self.update_rate = update_rate
self.queue_size = queue_size
self.lock = threading.Lock()
self.messages = []
self.last_time = Time()
self.node = node
self.msg_stamp_attrs = msg_stamp_attr.split(".")
self.update_timer = self.node.create_timer(
self.update_rate.nanoseconds / 1e9, self.dispatch
)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this spins a timer up, I think it would make sense to also have a corresponding shutdown function to clean up resources:

def shutdown(self):
        self.update_timer.cancel()
        self.node.destroy_timer(self.update_timer)

self.incoming_connection = None
if input_filter is not None:
self.connectInput(input_filter)

def get_msg_stamp_attr(self, msg):
obj = msg
for attr in self.msg_stamp_attrs:
if not hasattr(obj, attr):
return None
obj = getattr(obj, attr)
return obj

def connectInput(self, input_filter: SimpleFilter):
if self.incoming_connection is not None:
raise RuntimeError("Already connected to an input filter.")
self.incoming_connection = input_filter.registerCallback(self.add)

def add(self, msg):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also underscore here for internal function

with self.lock:
stamp = self.get_stamp(msg)
if stamp is None:
return
if stamp.nanoseconds < self.last_time.nanoseconds:
return
# Insert msg into messages in sorted order
insort_right(self.messages, (stamp, msg))
# If queue_size is exceeded, remove the earliest message
if self.queue_size != 0 and len(self.messages) > self.queue_size:
del self.messages[0]
Comment on lines +408 to +409
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I'm reading this correctly, I think that it should be documented that a queue_size == 0 means that there is effectively an unlimited queue.


def get_stamp(self, msg):
stamp = self.get_msg_stamp_attr(msg)
if stamp is not None:
if not isinstance(stamp, TimeMsg):
raise TypeError(
f"Expected {TimeMsg}, got {type(stamp)} in msg attribute "
f"{'.'.join(self.msg_stamp_attrs)}"
)
stamp = Time.from_msg(stamp)
return stamp
else:
self.node.get_logger().warn(
"Cannot use message without timestamp; discarding message."
)
return None

def dispatch(self):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider prefixing this with a single _ to mark that it's for internal use only.

to_call = []
with self.lock:
while self.messages:
stamp, msg = self.messages[0]
if stamp + self.delay <= self.node.get_clock().now():
self.last_time = stamp
# Remove message from messages
self.messages.pop(0)
to_call.append(msg)
else:
break
for msg in to_call:
self.signalMessage(msg)
Loading