Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Develop minimum rate stream #11

Closed
wants to merge 13 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions include/web_video_server/image_streamer.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ class ImageStreamer
ImageStreamer(const async_web_server_cpp::HttpRequest &request, async_web_server_cpp::HttpConnectionPtr connection,
image_transport::ImageTransport it);

virtual ~ImageStreamer();

void start();

bool isInactive();
Expand All @@ -24,9 +26,13 @@ class ImageStreamer
{
return topic_;
}
;

/**
* Restreams the last received image frame if older than max_age.
*/
virtual void restreamFrame(double max_age);
protected:
virtual void sendImage(const cv::Mat &, const ros::Time &time) = 0;
virtual void sendImage(const cv::Mat &, const ros::WallTime &time) = 0;

virtual void initialize(const cv::Mat &);

Expand All @@ -38,6 +44,10 @@ class ImageStreamer
int output_width_;
int output_height_;
bool invert_;
ros::WallTime last_frame;
cv::Mat output_size_image;
boost::mutex send_mutex_;

private:
image_transport::ImageTransport it_;
bool initialized_;
Expand Down
7 changes: 4 additions & 3 deletions include/web_video_server/jpeg_streamers.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ class MjpegStreamer : public ImageStreamer
public:
MjpegStreamer(const async_web_server_cpp::HttpRequest &request, async_web_server_cpp::HttpConnectionPtr connection,
image_transport::ImageTransport it);

~MjpegStreamer();
protected:
virtual void sendImage(const cv::Mat &, const ros::Time &time);
virtual void sendImage(const cv::Mat &, const ros::WallTime &time);

private:
int quality_;
Expand All @@ -38,8 +38,9 @@ class JpegSnapshotStreamer : public ImageStreamer
JpegSnapshotStreamer(const async_web_server_cpp::HttpRequest &request,
async_web_server_cpp::HttpConnectionPtr connection, image_transport::ImageTransport it);

~JpegSnapshotStreamer();
protected:
virtual void sendImage(const cv::Mat &, const ros::Time &time);
virtual void sendImage(const cv::Mat &, const ros::WallTime &time);

private:
int quality_;
Expand Down
5 changes: 3 additions & 2 deletions include/web_video_server/libav_streamer.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,10 @@ class LibavStreamer : public ImageStreamer

~LibavStreamer();

static void SetupAVLibrary();
protected:
virtual void initializeEncoder();
virtual void sendImage(const cv::Mat&, const ros::Time& time);
virtual void sendImage(const cv::Mat&, const ros::WallTime& time);
virtual void initialize(const cv::Mat&);
AVOutputFormat* output_format_;
AVFormatContext* format_context_;
Expand All @@ -44,7 +45,7 @@ class LibavStreamer : public ImageStreamer
AVPicture* picture_;
AVPicture* tmp_picture_;
struct SwsContext* sws_context_;
ros::Time first_image_timestamp_;
ros::WallTime first_image_timestamp_;
boost::mutex encode_mutex_;

std::string format_name_;
Expand Down
2 changes: 2 additions & 0 deletions include/web_video_server/web_video_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,14 @@ class WebVideoServer
async_web_server_cpp::HttpConnectionPtr connection, const char* begin, const char* end);

private:
void restreamFrames( double max_age );
void cleanup_inactive_streams();

ros::NodeHandle nh_;
image_transport::ImageTransport image_transport_;
ros::Timer cleanup_timer_;
int ros_threads_;
double publish_rate_;
int port_;
std::string address_;
boost::shared_ptr<async_web_server_cpp::HttpServer> server_;
Expand Down
44 changes: 41 additions & 3 deletions src/image_streamer.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#include "web_video_server/image_streamer.h"
#include <cv_bridge/cv_bridge.h>

#include <iostream>
namespace web_video_server
{

Expand All @@ -14,6 +14,10 @@ ImageStreamer::ImageStreamer(const async_web_server_cpp::HttpRequest &request,
invert_ = request.has_query_param("invert");
}

ImageStreamer::~ImageStreamer()
{
}

void ImageStreamer::start()
{
image_sub_ = it_.subscribe(topic_, 1, &ImageStreamer::imageCallback, this);
Expand All @@ -23,6 +27,38 @@ void ImageStreamer::initialize(const cv::Mat &)
{
}

void ImageStreamer::restreamFrame(double max_age)
{
if (inactive_ || !initialized_ )
return;
try {
if ( last_frame + ros::WallDuration(max_age) < ros::WallTime::now() ) {
boost::mutex::scoped_lock lock(send_mutex_);
sendImage(output_size_image, ros::WallTime::now() ); // don't update last_frame, it may remain an old value.
}
}
catch (boost::system::system_error &e)
{
// happens when client disconnects
ROS_DEBUG("system_error exception: %s", e.what());
inactive_ = true;
return;
}
catch (std::exception &e)
{
ROS_ERROR_THROTTLE(30, "exception: %s", e.what());
inactive_ = true;
return;
}
catch (...)
{
ROS_ERROR_THROTTLE(30, "exception");
inactive_ = true;
return;
}

}

void ImageStreamer::imageCallback(const sensor_msgs::ImageConstPtr &msg)
{
if (inactive_)
Expand Down Expand Up @@ -66,7 +102,7 @@ void ImageStreamer::imageCallback(const sensor_msgs::ImageConstPtr &msg)
cv::flip(img, img, true);
}

cv::Mat output_size_image;
boost::mutex::scoped_lock lock(send_mutex_); // protects output_size_image
if (output_width_ != input_width || output_height_ != input_height)
{
cv::Mat img_resized;
Expand All @@ -84,7 +120,9 @@ void ImageStreamer::imageCallback(const sensor_msgs::ImageConstPtr &msg)
initialize(output_size_image);
initialized_ = true;
}
sendImage(output_size_image, msg->header.stamp);

last_frame = ros::WallTime::now();
sendImage(output_size_image, last_frame );

}
catch (cv_bridge::Exception &e)
Expand Down
19 changes: 17 additions & 2 deletions src/jpeg_streamers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,13 @@ MjpegStreamer::MjpegStreamer(const async_web_server_cpp::HttpRequest &request,
connection->write("--boundarydonotcross \r\n");
}

void MjpegStreamer::sendImage(const cv::Mat &img, const ros::Time &time)
MjpegStreamer::~MjpegStreamer()
{
this->inactive_ = true;
boost::mutex::scoped_lock lock(send_mutex_); // protects sendImage.
}

void MjpegStreamer::sendImage(const cv::Mat &img, const ros::WallTime &time)
{
std::vector<int> encode_params;
encode_params.push_back(CV_IMWRITE_JPEG_QUALITY);
Expand All @@ -35,6 +41,7 @@ void MjpegStreamer::sendImage(const cv::Mat &img, const ros::Time &time)
headers->push_back(async_web_server_cpp::HttpHeader("X-Timestamp", stamp));
headers->push_back(
async_web_server_cpp::HttpHeader("Content-Length", boost::lexical_cast<std::string>(encoded_buffer.size())));
headers->push_back(async_web_server_cpp::HttpHeader("Access-Control-Allow-Origin", "*"));
connection_->write(async_web_server_cpp::HttpReply::to_buffers(*headers), headers);
connection_->write_and_clear(encoded_buffer);
connection_->write("\r\n--boundarydonotcross \r\n");
Expand Down Expand Up @@ -64,7 +71,15 @@ JpegSnapshotStreamer::JpegSnapshotStreamer(const async_web_server_cpp::HttpReque
quality_ = request.get_query_param_value_or_default<int>("quality", 95);
}

void JpegSnapshotStreamer::sendImage(const cv::Mat &img, const ros::Time &time)
JpegSnapshotStreamer::~JpegSnapshotStreamer()
{
this->inactive_ = true;
boost::mutex::scoped_lock lock(send_mutex_); // protects sendImage.
}



void JpegSnapshotStreamer::sendImage(const cv::Mat &img, const ros::WallTime &time)
{
std::vector<int> encode_params;
encode_params.push_back(CV_IMWRITE_JPEG_QUALITY);
Expand Down
19 changes: 13 additions & 6 deletions src/libav_streamer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,16 +54,22 @@ LibavStreamer::LibavStreamer(const async_web_server_cpp::HttpRequest &request,
{

bitrate_ = request.get_query_param_value_or_default<int>("bitrate", 100000);
qmin_ = request.get_query_param_value_or_default<int>("qmin", 10);
qmax_ = request.get_query_param_value_or_default<int>("qmax", 42);
gop_ = request.get_query_param_value_or_default<int>("gop", 250);
qmin_ = request.get_query_param_value_or_default<int>("qmin", 1);
qmax_ = request.get_query_param_value_or_default<int>("qmax", 40);
gop_ = request.get_query_param_value_or_default<int>("gop", 64);

}

void LibavStreamer::SetupAVLibrary()
{
av_lockmgr_register(&ffmpeg_boost_mutex_lock_manager);
av_register_all();
}

LibavStreamer::~LibavStreamer()
{
this->inactive_ = true;
boost::mutex::scoped_lock lock(send_mutex_); // protects all structures below when send is still in progress.
if (codec_context_)
avcodec_close(codec_context_);
if (frame_)
Expand Down Expand Up @@ -136,6 +142,7 @@ void LibavStreamer::initialize(const cv::Mat &img)
codec_context_ = video_stream_->codec;

// Set options
// calls av_opt_set_defaults internally:
avcodec_get_context_defaults3(codec_context_, codec_);

codec_context_->codec_id = output_format_->video_codec;
Expand All @@ -152,7 +159,7 @@ void LibavStreamer::initialize(const cv::Mat &img)
codec_context_->time_base.den = 1;
codec_context_->gop_size = gop_;
codec_context_->pix_fmt = PIX_FMT_YUV420P;
codec_context_->max_b_frames = 0;
codec_context_->max_b_frames = 1;

// Quality settings
codec_context_->qmin = qmin_;
Expand Down Expand Up @@ -230,7 +237,7 @@ void LibavStreamer::initializeEncoder()
{
}

void LibavStreamer::sendImage(const cv::Mat &img, const ros::Time &time)
void LibavStreamer::sendImage(const cv::Mat &img, const ros::WallTime &time)
{
boost::mutex::scoped_lock lock(encode_mutex_);
if (first_image_timestamp_.isZero())
Expand Down Expand Up @@ -286,7 +293,7 @@ void LibavStreamer::sendImage(const cv::Mat &img, const ros::Time &time)

double seconds = (time - first_image_timestamp_).toSec();
// Encode video at 1/0.95 to minimize delay
pkt.pts = (int64_t)(seconds / av_q2d(video_stream_->time_base) * 0.95);
pkt.pts = (int64_t)(seconds / av_q2d(video_stream_->time_base) * 1.0);
if (pkt.pts <= 0)
pkt.pts = 1;
pkt.dts = AV_NOPTS_VALUE;
Expand Down
9 changes: 4 additions & 5 deletions src/vp8_streamer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,16 +56,14 @@ void Vp8Streamer::initializeEncoder()
av_opt_map["quality"] = quality_;
av_opt_map["deadline"] = "1";
av_opt_map["auto-alt-ref"] = "0";
av_opt_map["lag-in-frames"] = "1";
av_opt_map["rc_lookahead"] = "1";
av_opt_map["drop_frame"] = "1";
av_opt_map["lag-in-frames"] = "1"; // was: rc_lookahead
av_opt_map["error-resilient"] = "1";

for (AvOptMap::iterator itr = av_opt_map.begin(); itr != av_opt_map.end(); ++itr)
{
av_opt_set(codec_context_->priv_data, itr->first.c_str(), itr->second.c_str(), 0);
}

#if 0
// Buffering settings
int bufsize = 10;
codec_context_->rc_buffer_size = bufsize;
Expand All @@ -74,7 +72,8 @@ void Vp8Streamer::initializeEncoder()
av_opt_set_int(codec_context_->priv_data, "buf-initial", bufsize, 0);
av_opt_set_int(codec_context_->priv_data, "buf-optimal", bufsize, 0);
codec_context_->rc_buffer_aggressivity = 0.5;
codec_context_->frame_skip_threshold = 10;
codec_context_->frame_skip_threshold = 2;
#endif
}

Vp8StreamerType::Vp8StreamerType() :
Expand Down
51 changes: 43 additions & 8 deletions src/web_video_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,10 @@ WebVideoServer::WebVideoServer(ros::NodeHandle &nh, ros::NodeHandle &private_nh)
private_nh.param("server_threads", server_threads, 1);

private_nh.param("ros_threads", ros_threads_, 2);
private_nh.param("publish_rate", publish_rate_, -1.0);

// required single-threaded, once-only init:
LibavStreamer::SetupAVLibrary();
stream_types_["mjpeg"] = boost::shared_ptr<ImageStreamerType>(new MjpegStreamerType());
stream_types_["vp8"] = boost::shared_ptr<ImageStreamerType>(new Vp8StreamerType());

Expand All @@ -66,25 +69,57 @@ WebVideoServer::~WebVideoServer()
void WebVideoServer::spin()
{
server_->run();

ROS_INFO_STREAM("Waiting For connections on " << address_ << ":" << port_);
ros::MultiThreadedSpinner spinner(ros_threads_);
spinner.spin();

ros::AsyncSpinner spinner(ros_threads_);
spinner.start();

if ( publish_rate_ > 0 ) {
ros::WallRate r(publish_rate_);

while( ros::ok() ) {
this->restreamFrames( 1.0 / publish_rate_ );
r.sleep();
}
} else {
ros::waitForShutdown();
}

server_->stop();
}

void WebVideoServer::restreamFrames( double max_age )
{
boost::mutex::scoped_lock lock(subscriber_mutex_);

typedef std::vector<boost::shared_ptr<ImageStreamer> >::iterator itr_type;

for (itr_type itr = image_subscribers_.begin(); itr < image_subscribers_.end(); ++itr)
{
(*itr)->restreamFrame( max_age );
}
}


void WebVideoServer::cleanup_inactive_streams()
{
boost::mutex::scoped_lock lock(subscriber_mutex_, boost::try_to_lock);
boost::mutex::scoped_lock lock(subscriber_mutex_);
if (lock)
{
typedef std::vector<boost::shared_ptr<ImageStreamer> >::iterator itr_type;
itr_type new_end = std::remove_if(image_subscribers_.begin(), image_subscribers_.end(),
boost::bind(&ImageStreamer::isInactive, _1));
for (itr_type itr = new_end; itr < image_subscribers_.end(); ++itr)
itr_type itr = image_subscribers_.begin();
while ( itr != image_subscribers_.end() )
{
ROS_INFO_STREAM("Removed Stream: " << (*itr)->getTopic());
if ( (*itr)->isInactive() ) {
ROS_INFO_STREAM("Removing Stream: " << (*itr)->getTopic() << " (streams left: "<< (image_subscribers_.end() - itr ) -1 << ")");
image_subscribers_.erase( itr );
itr = image_subscribers_.begin();
}
else {
++itr;
}
}
image_subscribers_.erase(new_end, image_subscribers_.end());
}
}

Expand Down