diff --git a/Three.V8/GamePlayer.cpp b/Three.V8/GamePlayer.cpp index 2119ec8..bf90496 100644 --- a/Three.V8/GamePlayer.cpp +++ b/Three.V8/GamePlayer.cpp @@ -2,6 +2,7 @@ #include #include #include +#include #include "GamePlayer.h" #define ENABLE_MSAA 1 @@ -57,7 +58,7 @@ void GamePlayer::LoadScript(const char* dir, const char* filename) void GamePlayer::UnloadScript() { if (m_context != nullptr) - { + { v8::Isolate* isolate = m_v8vm.m_isolate; v8::HandleScope handle_scope(isolate); v8::Context::Scope context_scope(m_context->m_context.Get(isolate)); @@ -72,11 +73,8 @@ void GamePlayer::UnloadScript() void GamePlayer::Idle() { - if (m_context != nullptr) - { - m_context->CheckPendings(); - v8::platform::PumpMessageLoop(m_v8vm.m_platform.get(), m_v8vm.m_isolate); - } + AsyncCallbacks::CheckPendings(); + v8::platform::PumpMessageLoop(m_v8vm.m_platform.get(), m_v8vm.m_isolate); } void GamePlayer::Draw(int width, int height) diff --git a/Three.V8/binding.cpp b/Three.V8/binding.cpp index 0cdd767..7b9a4bd 100644 --- a/Three.V8/binding.cpp +++ b/Three.V8/binding.cpp @@ -583,81 +583,6 @@ void GameContext::remove_object(void* ptr) m_objects.erase(ptr); } -void GameContext::add_ws_client(WSClient* client) -{ - m_ws_clients.insert(client); -} - -void GameContext::remove_ws_client(WSClient* client) -{ - auto iter = m_ws_clients.find(client); - if (iter != m_ws_clients.end()) - { - m_ws_clients.erase(iter); - } -} - -void GameContext::add_opus_recorder(OpusRecorder* rec) -{ - m_opus_recorders.insert(rec); -} - -void GameContext::remove_opus_recorder(OpusRecorder* rec) -{ - auto iter = m_opus_recorders.find(rec); - if (iter != m_opus_recorders.end()) - { - m_opus_recorders.erase(iter); - } -} - - -void GameContext::add_avc_recorder(AVCRecorder* rec) -{ - m_avc_recorders.insert(rec); -} - -void GameContext::remove_avc_recorder(AVCRecorder* rec) -{ - auto iter = m_avc_recorders.find(rec); - if (iter != m_avc_recorders.end()) - { - m_avc_recorders.erase(iter); - } -} - -void GameContext::CheckPendings() -{ - m_http->CheckPendings(); - - { - auto iter = m_ws_clients.begin(); - while (iter != m_ws_clients.end()) - { - (*iter)->CheckPending(); - iter++; - } - } -#if THREE_MM - { - auto iter = m_opus_recorders.begin(); - while (iter != m_opus_recorders.end()) - { - (*iter)->CheckPending(); - iter++; - } - } - { - auto iter = m_avc_recorders.begin(); - while (iter != m_avc_recorders.end()) - { - (*iter)->CheckPending(); - iter++; - } - } -#endif -} - void GameContext::SetPrintCallbacks(void* ptr, PrintCallback print_callback, PrintCallback error_callback) { m_print_callback_data = ptr; diff --git a/Three.V8/binding.h b/Three.V8/binding.h index e1a1516..38d1127 100644 --- a/Three.V8/binding.h +++ b/Three.V8/binding.h @@ -50,9 +50,6 @@ struct GlobalDefinitions class GamePlayer; class HttpClient; -class WSClient; -class OpusRecorder; -class AVCRecorder; class UIManager; class GameContext { @@ -76,26 +73,12 @@ class GameContext void regiter_object(v8::Local obj, Dtor dtor); void remove_object(void* ptr); - void add_ws_client(WSClient* client); - void remove_ws_client(WSClient* client); - - void add_opus_recorder(OpusRecorder* rec); - void remove_opus_recorder(OpusRecorder* rec); - - void add_avc_recorder(AVCRecorder* rec); - void remove_avc_recorder(AVCRecorder* rec); - - void CheckPendings(); - typedef void (*PrintCallback)(void* ptr, const char* str); void SetPrintCallbacks(void* ptr, PrintCallback print_callback, PrintCallback error_callback); private: GamePlayer* m_gamePlayer; - std::unique_ptr m_http; - std::unordered_set m_ws_clients; - std::unordered_set m_opus_recorders; - std::unordered_set m_avc_recorders; + std::unique_ptr m_http; std::unique_ptr m_ui_manager; static GlobalDefinitions s_globals; void _create_context(); diff --git a/Three.V8/multimedia/AVCRecorder.hpp b/Three.V8/multimedia/AVCRecorder.hpp index 99b89c0..84d9a74 100644 --- a/Three.V8/multimedia/AVCRecorder.hpp +++ b/Three.V8/multimedia/AVCRecorder.hpp @@ -46,9 +46,9 @@ void WrapperAVCRecorder::dtor(void* ptr, GameContext* ctx) if (data != nullptr) { delete data; + self->SetCallback(nullptr, nullptr); } } - ctx->remove_avc_recorder(self); delete self; } @@ -66,7 +66,6 @@ void WrapperAVCRecorder::New(const v8::FunctionCallbackInfo& info) AVCRecorder* self = new AVCRecorder(id_device); info.This()->SetAlignedPointerInInternalField(0, self); lctx.ctx()->regiter_object(info.This(), dtor); - lctx.ctx()->add_avc_recorder(self); } diff --git a/Three.V8/multimedia/OpusRecorder.hpp b/Three.V8/multimedia/OpusRecorder.hpp index 836964e..6ecebad 100644 --- a/Three.V8/multimedia/OpusRecorder.hpp +++ b/Three.V8/multimedia/OpusRecorder.hpp @@ -46,9 +46,9 @@ void WrapperOpusRecorder::dtor(void* ptr, GameContext* ctx) if (data != nullptr) { delete data; + self->SetCallback(nullptr, nullptr); } - } - ctx->remove_opus_recorder(self); + } delete self; } @@ -65,8 +65,7 @@ void WrapperOpusRecorder::New(const v8::FunctionCallbackInfo& info) OpusRecorder* self = new OpusRecorder(id_device); info.This()->SetAlignedPointerInInternalField(0, self); - lctx.ctx()->regiter_object(info.This(), dtor); - lctx.ctx()->add_opus_recorder(self); + lctx.ctx()->regiter_object(info.This(), dtor); } diff --git a/Three.V8/network/WSClient.hpp b/Three.V8/network/WSClient.hpp index 293940c..246d3a1 100644 --- a/Three.V8/network/WSClient.hpp +++ b/Three.V8/network/WSClient.hpp @@ -53,6 +53,7 @@ void WrapperWSClient::dtor(void* ptr, GameContext* ctx) if (data != nullptr) { delete data; + self->SetOpenCallback(nullptr, nullptr); } } { @@ -60,9 +61,9 @@ void WrapperWSClient::dtor(void* ptr, GameContext* ctx) if (data != nullptr) { delete data; + self->SetMessageCallback(nullptr, nullptr); } - } - ctx->remove_ws_client(self); + } delete self; } @@ -74,8 +75,7 @@ void WrapperWSClient::New(const v8::FunctionCallbackInfo& info) WSClient* self = new WSClient(url.c_str()); info.This()->SetAlignedPointerInInternalField(0, self); - lctx.ctx()->regiter_object(info.This(), dtor); - lctx.ctx()->add_ws_client(self); + lctx.ctx()->regiter_object(info.This(), dtor); } void WrapperWSClient::Send(const v8::FunctionCallbackInfo& info) diff --git a/ThreeEngine/ThreeEngine.vcxproj b/ThreeEngine/ThreeEngine.vcxproj index 5502596..6ce1ea2 100644 --- a/ThreeEngine/ThreeEngine.vcxproj +++ b/ThreeEngine/ThreeEngine.vcxproj @@ -258,6 +258,7 @@ + @@ -408,6 +409,8 @@ + + diff --git a/ThreeEngine/ThreeEngine.vcxproj.filters b/ThreeEngine/ThreeEngine.vcxproj.filters index 2eb3de0..0659dbb 100644 --- a/ThreeEngine/ThreeEngine.vcxproj.filters +++ b/ThreeEngine/ThreeEngine.vcxproj.filters @@ -537,6 +537,9 @@ Source Files\renderers\routines + + Source Files\utils + @@ -992,6 +995,12 @@ Header Files\renderers\routines + + Header Files\utils + + + Header Files\utils + diff --git a/ThreeEngine/network/HttpClient.cpp b/ThreeEngine/network/HttpClient.cpp index bdf3b86..8ba18a2 100644 --- a/ThreeEngine/network/HttpClient.cpp +++ b/ThreeEngine/network/HttpClient.cpp @@ -1,12 +1,30 @@ #include using namespace boost::urls; +#include "utils/AsyncCallbacks.h" #include "HttpClient.h" #include #include #include "root_certificates.hpp" +class HttpClient::GetData : public Callable +{ +public: + std::string url; + GetCallback callback; + void* userData = nullptr; + std::thread* thread = nullptr; + GetResult result; + + void call() override + { + thread->join(); + delete thread; + callback(result, userData); + } +}; + HttpClient::HttpClient() : m_resolver(m_ioc) , m_ssl_ctx(ssl::context::tlsv12_client) @@ -17,48 +35,7 @@ HttpClient::HttpClient() HttpClient::~HttpClient() { - // Get - { - auto iter = m_pending_gets.begin(); - while (iter != m_pending_gets.end()) - { - PendingGet* get_data = *iter; - get_data->thread->join(); - delete get_data->thread; - delete get_data; - iter++; - } - } -} - - -void HttpClient::CheckPendings() -{ - // Get - { - std::vector remove_lst; - - auto iter = m_pending_gets.begin(); - while (iter != m_pending_gets.end()) - { - PendingGet* get_data = *iter; - if (get_data->finished) - { - get_data->thread->join(); - delete get_data->thread; - get_data->callback(get_data->result, get_data->userData); - remove_lst.push_back(get_data); - } - iter++; - } - - for (size_t i = 0; i < remove_lst.size(); i++) - { - delete remove_lst[i]; - m_pending_gets.erase(remove_lst[i]); - } - } } bool HttpClient::Get(const char* url, std::vector& data) @@ -170,20 +147,19 @@ bool HttpClient::Get(const char* url, std::vector& data) return false; } -void HttpClient::GetThread(HttpClient* self, PendingGet* get_data) +void HttpClient::GetThread(HttpClient* self, GetData* get_data) { get_data->result.result = self->Get(get_data->url.c_str(), get_data->result.data); - get_data->finished = true; + AsyncCallbacks::Add(get_data); } void HttpClient::GetAsync(const char* url, GetCallback callback, void* userData) { - PendingGet* get_data = new PendingGet; + GetData* get_data = new GetData; get_data->url = url; get_data->callback = callback; get_data->userData = userData; get_data->thread = new std::thread(GetThread, this, get_data); - m_pending_gets.insert(get_data); } bool HttpClient::GetHeaders(const char* url, std::unordered_map& headers) diff --git a/ThreeEngine/network/HttpClient.h b/ThreeEngine/network/HttpClient.h index 0e2f999..b363711 100644 --- a/ThreeEngine/network/HttpClient.h +++ b/ThreeEngine/network/HttpClient.h @@ -17,7 +17,6 @@ using tcp = net::ip::tcp; // from #include #include -#include #include // Get Async @@ -34,8 +33,6 @@ class HttpClient public: HttpClient(); ~HttpClient(); - - void CheckPendings(); bool Get(const char* url, std::vector& data); void GetAsync(const char* url, GetCallback callback, void* userData); @@ -48,20 +45,10 @@ class HttpClient tcp::resolver m_resolver; // ssl - ssl::context m_ssl_ctx; + ssl::context m_ssl_ctx; // Get Async - struct PendingGet - { - std::string url; - GetCallback callback; - void* userData = nullptr; - std::thread* thread = nullptr; - bool finished = false; - GetResult result; - - }; - std::unordered_set m_pending_gets; - static void GetThread(HttpClient* self, PendingGet* get_data); + class GetData; + static void GetThread(HttpClient* self, GetData* get_data); }; \ No newline at end of file diff --git a/ThreeEngine/network/WSClient.cpp b/ThreeEngine/network/WSClient.cpp index 2c226aa..af24474 100644 --- a/ThreeEngine/network/WSClient.cpp +++ b/ThreeEngine/network/WSClient.cpp @@ -1,4 +1,3 @@ -#include #include #include #include @@ -27,54 +26,75 @@ using namespace boost::urls; #include "root_certificates.hpp" -#include "utils/Semaphore.h" +#include "utils/ConcurrentQueue.h" +#include "utils/AsyncCallbacks.h" -template -class ConcurrentQueue +class WSClient::Impl { public: - ConcurrentQueue() + Impl() { + AsyncCallbacks::AddSubQueue(&async_queue); } - ~ConcurrentQueue() + virtual ~Impl() { + AsyncCallbacks::RemoveSubQueue(&async_queue); } - size_t Size() - { - return m_queue.size(); - } + virtual void Send(const void* data, size_t size, bool is_binary) = 0; + + OpenCallback open_callback = nullptr; + void* open_callback_data = nullptr; - T Front() + MessageCallback msg_callback = nullptr; + void* msg_callback_data = nullptr; + + struct Msg { - return m_queue.front(); - } + std::vector data; + bool is_binary; + }; - void Push(T packet) + class OpenCallable : public Callable { - m_mutex.lock(); - m_queue.push(packet); - m_mutex.unlock(); - m_semaphore_out.notify(); - } + public: + Impl* impl; + OpenCallable(Impl* impl) : impl(impl) + { + + } + + void call() override + { + if (impl->open_callback != nullptr) + { + impl->open_callback(impl->open_callback_data); + } + } + }; - T Pop() + class MessageCallable : public Callable { - m_semaphore_out.wait(); - m_mutex.lock(); - T ret = m_queue.front(); - m_queue.pop(); - m_mutex.unlock(); - return ret; - } + public: + Impl* impl; + Msg msg; + MessageCallable(Impl* impl, const Msg& msg) : impl(impl), msg(msg) + { -private: - std::queue m_queue; - std::mutex m_mutex; - Semaphore m_semaphore_out; -}; + } + void call() override + { + if (impl->msg_callback != nullptr) + { + impl->msg_callback(msg.data.data(), msg.data.size(), msg.is_binary, impl->msg_callback_data); + } + } + }; + + AsyncQueue async_queue; +}; class WSClientImpl : public WSClient::Impl { @@ -91,27 +111,7 @@ class WSClientImpl : public WSClient::Impl { m_running = false; m_thread->join(); - } - - void CheckPending() override - { - if (m_connected) - { - m_connected = false; - if (open_callback != nullptr) - { - open_callback(open_callback_data); - } - } - while (m_user_read_queue.Size() > 0) - { - Msg msg = m_user_read_queue.Pop(); - if (msg_callback != nullptr) - { - msg_callback(msg.data.data(), msg.data.size(), msg.is_binary, msg_callback_data); - } - } - } + } void Send(const void* data, size_t size, bool is_binary) override { @@ -129,19 +129,11 @@ class WSClientImpl : public WSClient::Impl beast::flat_buffer m_buffer; std::string m_host; - - struct Msg - { - std::vector data; - bool is_binary; - }; + std::queue m_write_queue; - - ConcurrentQueue m_user_read_queue; ConcurrentQueue m_user_write_queue; - bool m_running = true; - bool m_connected = false; + bool m_running = true; std::unique_ptr m_thread; static void on_create(WSClientImpl* self, std::string host, std::string port) @@ -213,7 +205,7 @@ class WSClientImpl : public WSClient::Impl std::placeholders::_1, std::placeholders::_2)); - m_connected = true; + async_queue.Add(new OpenCallable(this)); } void on_read(boost::system::error_code ec, @@ -225,7 +217,8 @@ class WSClientImpl : public WSClient::Impl msg.data.resize(m_buffer.data().size()); memcpy(msg.data.data(), m_buffer.data().data(), m_buffer.data().size()); msg.is_binary = m_ws.got_binary(); - m_user_read_queue.Push(msg); + + async_queue.Add(new MessageCallable(this, msg)); m_buffer.clear(); @@ -281,26 +274,6 @@ class WSClientImpl_SSL : public WSClient::Impl m_thread->join(); } - void CheckPending() override - { - if (m_connected) - { - m_connected = false; - if (open_callback != nullptr) - { - open_callback(open_callback_data); - } - } - while (m_user_read_queue.Size() > 0) - { - Msg msg = m_user_read_queue.Pop(); - if (msg_callback != nullptr) - { - msg_callback(msg.data.data(), msg.data.size(), msg.is_binary, msg_callback_data); - } - } - } - void Send(const void* data, size_t size, bool is_binary) override { std::vector msg_data(size); @@ -321,18 +294,10 @@ class WSClientImpl_SSL : public WSClient::Impl beast::flat_buffer m_buffer; std::string m_host; - struct Msg - { - std::vector data; - bool is_binary; - }; std::queue m_write_queue; - - ConcurrentQueue m_user_read_queue; ConcurrentQueue m_user_write_queue; - bool m_running = true; - bool m_connected = false; + bool m_running = true; std::unique_ptr m_thread; static void on_create(WSClientImpl_SSL* self, std::string host, std::string port) @@ -414,7 +379,7 @@ class WSClientImpl_SSL : public WSClient::Impl std::placeholders::_1, std::placeholders::_2)); - m_connected = true; + async_queue.Add(new OpenCallable(this)); } void on_read(boost::system::error_code ec, @@ -426,7 +391,8 @@ class WSClientImpl_SSL : public WSClient::Impl msg.data.resize(m_buffer.data().size()); memcpy(msg.data.data(), m_buffer.data().data(), m_buffer.data().size()); msg.is_binary = m_ws.got_binary(); - m_user_read_queue.Push(msg); + + async_queue.Add(new MessageCallable(this, msg)); m_buffer.clear(); @@ -498,11 +464,6 @@ WSClient::~WSClient() } -void WSClient::CheckPending() -{ - m_impl->CheckPending(); -} - void WSClient::Send(const void* data, size_t size, bool is_binary) { m_impl->Send(data, size, is_binary); diff --git a/ThreeEngine/network/WSClient.h b/ThreeEngine/network/WSClient.h index 5d4f18c..5361fcd 100644 --- a/ThreeEngine/network/WSClient.h +++ b/ThreeEngine/network/WSClient.h @@ -11,20 +11,8 @@ class WSClient WSClient(const char* url); ~WSClient(); - class Impl - { - public: - virtual void CheckPending() = 0; - virtual void Send(const void* data, size_t size, bool is_binary) = 0; - - OpenCallback open_callback = nullptr; - void* open_callback_data = nullptr; - - MessageCallback msg_callback = nullptr; - void* msg_callback_data = nullptr; - }; - - void CheckPending(); + class Impl; + void Send(const void* data, size_t size, bool is_binary); void SetOpenCallback(OpenCallback open_callback, void* open_callback_data); diff --git a/ThreeEngine/utils/AsyncCallbacks.cpp b/ThreeEngine/utils/AsyncCallbacks.cpp new file mode 100644 index 0000000..9a810d1 --- /dev/null +++ b/ThreeEngine/utils/AsyncCallbacks.cpp @@ -0,0 +1,73 @@ +#include "AsyncCallbacks.h" + +AsyncQueue::AsyncQueue() +{ + +} + +AsyncQueue::~AsyncQueue() +{ + CheckPendings(); +} + +void AsyncQueue::Add(Callable* callable) +{ + queue.Push(callable); +} + +void AsyncQueue::CheckPendings() +{ + while (queue.Size() > 0) + { + Callable* callable = queue.Pop(); + callable->call(); + delete callable; + } +} + +AsyncCallbacks::AsyncCallbacks() +{ + +} + +AsyncCallbacks::~AsyncCallbacks() +{ + +} + +AsyncCallbacks& AsyncCallbacks::_singleton() +{ + static AsyncCallbacks singleton; + return singleton; +} + +void AsyncCallbacks::Add(Callable* callable) +{ + AsyncCallbacks& singleton = _singleton(); + singleton.m_default_queue.Add(callable); +} + +void AsyncCallbacks::AddSubQueue(AsyncQueue* queue) +{ + AsyncCallbacks& singleton = _singleton(); + singleton.m_sub_queues.insert(queue); +} + +void AsyncCallbacks::RemoveSubQueue(AsyncQueue* queue) +{ + AsyncCallbacks& singleton = _singleton(); + singleton.m_sub_queues.erase(queue); +} + +void AsyncCallbacks::CheckPendings() +{ + AsyncCallbacks& singleton = _singleton(); + singleton.m_default_queue.CheckPendings(); + auto iter = singleton.m_sub_queues.begin(); + while (iter != singleton.m_sub_queues.end()) + { + (*iter)->CheckPendings(); + iter++; + } + +} \ No newline at end of file diff --git a/ThreeEngine/utils/AsyncCallbacks.h b/ThreeEngine/utils/AsyncCallbacks.h new file mode 100644 index 0000000..8b6abae --- /dev/null +++ b/ThreeEngine/utils/AsyncCallbacks.h @@ -0,0 +1,44 @@ +#pragma once + +#include +#include +#include "ConcurrentQueue.h" + +class Callable +{ +public: + Callable() {}; + virtual ~Callable() { } + virtual void call() = 0; +}; + +class AsyncQueue +{ +public: + AsyncQueue(); + ~AsyncQueue(); + void Add(Callable* callable); + void CheckPendings(); + +private: + ConcurrentQueue queue; +}; + +class AsyncCallbacks +{ +public: + static void Add(Callable* callable); + static void AddSubQueue(AsyncQueue* queue); + static void RemoveSubQueue(AsyncQueue* queue); + static void CheckPendings(); + +private: + AsyncQueue m_default_queue; + std::unordered_set m_sub_queues; + + AsyncCallbacks(); + ~AsyncCallbacks(); + + static AsyncCallbacks& _singleton(); + +}; diff --git a/ThreeEngine/utils/ConcurrentQueue.h b/ThreeEngine/utils/ConcurrentQueue.h new file mode 100644 index 0000000..60b48d2 --- /dev/null +++ b/ThreeEngine/utils/ConcurrentQueue.h @@ -0,0 +1,51 @@ +#pragma once + +#include +#include "utils/Semaphore.h" + +template +class ConcurrentQueue +{ +public: + ConcurrentQueue() + { + } + + ~ConcurrentQueue() + { + } + + size_t Size() + { + return m_queue.size(); + } + + T Front() + { + return m_queue.front(); + } + + void Push(T packet) + { + m_mutex.lock(); + m_queue.push(packet); + m_mutex.unlock(); + m_semaphore_out.notify(); + } + + T Pop() + { + m_semaphore_out.wait(); + m_mutex.lock(); + T ret = m_queue.front(); + m_queue.pop(); + m_mutex.unlock(); + return ret; + } + +private: + std::queue m_queue; + std::mutex m_mutex; + Semaphore m_semaphore_out; +}; + diff --git a/ThreeMM/AVCPlayer.cpp b/ThreeMM/AVCPlayer.cpp index 4a99ade..4be1f63 100644 --- a/ThreeMM/AVCPlayer.cpp +++ b/ThreeMM/AVCPlayer.cpp @@ -1,58 +1,9 @@ #include -#include #include "utils/Image.h" -#include "utils/Semaphore.h" +#include "utils/ConcurrentQueue.h" #include "renderers/GLUtils.h" #include "AVCPlayer.h" - -template -class ConcurrentQueue -{ -public: - ConcurrentQueue() - { - } - - ~ConcurrentQueue() - { - } - - size_t Size() - { - return m_queue.size(); - } - - T Front() - { - return m_queue.front(); - } - - void Push(T packet) - { - m_mutex.lock(); - m_queue.push(packet); - m_mutex.unlock(); - m_semaphore_out.notify(); - } - - T Pop() - { - m_semaphore_out.wait(); - m_mutex.lock(); - T ret = m_queue.front(); - m_queue.pop(); - m_mutex.unlock(); - return ret; - } - -private: - std::queue m_queue; - std::mutex m_mutex; - Semaphore m_semaphore_out; -}; - - extern "C" { #include #include diff --git a/ThreeMM/AVCRecorder.cpp b/ThreeMM/AVCRecorder.cpp index 6310dd6..223d06b 100644 --- a/ThreeMM/AVCRecorder.cpp +++ b/ThreeMM/AVCRecorder.cpp @@ -3,10 +3,9 @@ #include #include -#include #include "utils/Image.h" #include "utils/Utils.h" -#include "utils/Semaphore.h" +#include "utils/AsyncCallbacks.h" #include "AVCRecorder.h" #include "MMCamera.h" @@ -21,51 +20,6 @@ extern "C" #include } -template -class ConcurrentQueue -{ -public: - ConcurrentQueue() - { - } - - ~ConcurrentQueue() - { - } - - size_t Size() - { - return m_queue.size(); - } - - T Front() - { - return m_queue.front(); - } - - void Push(T packet) - { - m_mutex.lock(); - m_queue.push(packet); - m_mutex.unlock(); - m_semaphore_out.notify(); - } - - T Pop() - { - m_semaphore_out.wait(); - m_mutex.lock(); - T ret = m_queue.front(); - m_queue.pop(); - m_mutex.unlock(); - return ret; - } - -private: - std::queue m_queue; - std::mutex m_mutex; - Semaphore m_semaphore_out; -}; static const AVCodecID video_codec_id = AV_CODEC_ID_H264; static const AVPixelFormat pix_fmt = AV_PIX_FMT_NV12; @@ -214,10 +168,14 @@ class AVCRecorder::Internal m_start_time = time_micro_sec(); m_thread_encode = (std::unique_ptr)(new std::thread(thread_encode, this)); + + AsyncCallbacks::AddSubQueue(&m_async_queue); } ~Internal() { + AsyncCallbacks::RemoveSubQueue(&m_async_queue); + m_running = false; m_thread_encode->join(); @@ -230,7 +188,10 @@ class AVCRecorder::Internal avformat_close_input(&m_p_fmt_ctx); } - ConcurrentQueue> m_pkt_queue; + PacketCallback callback = nullptr; + void* callback_data = nullptr; + + private: int m_width_in, m_height_in; @@ -250,6 +211,29 @@ class AVCRecorder::Internal struct SwsContext* m_sws_ctx = nullptr; + class PacketCallable : public Callable + { + public: + Internal* self; + std::vector packet; + + PacketCallable(Internal* self, const std::vector& packet) + : self(self), packet(packet) + { + + } + + void call() override + { + if (self->callback != nullptr) + { + self->callback(packet.data(), packet.size(), self->callback_data); + } + } + }; + + AsyncQueue m_async_queue; + static void thread_encode(Internal* self) { while (self->m_running) @@ -295,7 +279,8 @@ class AVCRecorder::Internal std::vector packet(pkt.size+1); packet[0] = pkt.flags & 1; memcpy(packet.data() + 1, pkt.data, pkt.size); - self->m_pkt_queue.Push(packet); + + self->m_async_queue.Add(new PacketCallable(self, packet)); av_packet_unref(&pkt); } @@ -329,25 +314,11 @@ AVCRecorder::~AVCRecorder() void AVCRecorder::SetCallback(PacketCallback callback, void* callback_data) { - this->callback = callback; - this->callback_data = callback_data; + m_internal->callback = callback; + m_internal->callback_data = callback_data; } void* AVCRecorder::GetCallbackData() { - return callback_data; + return m_internal->callback_data; } - -void AVCRecorder::CheckPending() -{ - while (m_internal->m_pkt_queue.Size() > 0) - { - std::vector pkt = m_internal->m_pkt_queue.Pop(); - if (callback != nullptr) - { - callback(pkt.data(), pkt.size(), callback_data); - } - } -} - - diff --git a/ThreeMM/AVCRecorder.h b/ThreeMM/AVCRecorder.h index 6fe3bc5..cf72487 100644 --- a/ThreeMM/AVCRecorder.h +++ b/ThreeMM/AVCRecorder.h @@ -14,12 +14,8 @@ class AVCRecorder void SetCallback(PacketCallback callback, void* callback_data); void* GetCallbackData(); - void CheckPending(); - private: class Internal; std::unique_ptr m_internal; - PacketCallback callback = nullptr; - void* callback_data = nullptr; }; diff --git a/ThreeMM/OpusPlayer.cpp b/ThreeMM/OpusPlayer.cpp index 86fd535..a4cf327 100644 --- a/ThreeMM/OpusPlayer.cpp +++ b/ThreeMM/OpusPlayer.cpp @@ -1,55 +1,8 @@ #include -#include -#include "utils/Semaphore.h" #include "AudioBuffer.h" #include "AudioIO.h" #include "OpusPlayer.h" - -template -class ConcurrentQueue -{ -public: - ConcurrentQueue() - { - } - - ~ConcurrentQueue() - { - } - - size_t Size() - { - return m_queue.size(); - } - - T Front() - { - return m_queue.front(); - } - - void Push(T packet) - { - m_mutex.lock(); - m_queue.push(packet); - m_mutex.unlock(); - m_semaphore_out.notify(); - } - - T Pop() - { - m_semaphore_out.wait(); - m_mutex.lock(); - T ret = m_queue.front(); - m_queue.pop(); - m_mutex.unlock(); - return ret; - } - -private: - std::queue m_queue; - std::mutex m_mutex; - Semaphore m_semaphore_out; -}; +#include "utils/ConcurrentQueue.h" extern "C" { diff --git a/ThreeMM/OpusRecorder.cpp b/ThreeMM/OpusRecorder.cpp index 9a567d2..1fcd64b 100644 --- a/ThreeMM/OpusRecorder.cpp +++ b/ThreeMM/OpusRecorder.cpp @@ -1,57 +1,9 @@ #include #include -#include -#include "utils/Semaphore.h" #include "AudioBuffer.h" #include "AudioIO.h" #include "OpusRecorder.h" - - -template -class ConcurrentQueue -{ -public: - ConcurrentQueue() - { - } - - ~ConcurrentQueue() - { - } - - size_t Size() - { - return m_queue.size(); - } - - T Front() - { - return m_queue.front(); - } - - void Push(T packet) - { - m_mutex.lock(); - m_queue.push(packet); - m_mutex.unlock(); - m_semaphore_out.notify(); - } - - T Pop() - { - m_semaphore_out.wait(); - m_mutex.lock(); - T ret = m_queue.front(); - m_queue.pop(); - m_mutex.unlock(); - return ret; - } - -private: - std::queue m_queue; - std::mutex m_mutex; - Semaphore m_semaphore_out; -}; +#include "utils/AsyncCallbacks.h" class AudioRecorder { @@ -147,10 +99,14 @@ class OpusRecorder::Internal m_audio_recorder = (std::unique_ptr)(new AudioRecorder(id_audio_device)); m_thread_record = (std::unique_ptr)(new std::thread(thread_record, this)); + + AsyncCallbacks::AddSubQueue(&m_async_queue); } ~Internal() { + AsyncCallbacks::RemoveSubQueue(&m_async_queue); + m_recording = false; m_thread_record->join(); m_thread_record = nullptr; @@ -164,7 +120,8 @@ class OpusRecorder::Internal } - ConcurrentQueue> m_pkt_queue; + PacketCallback callback = nullptr; + void* callback_data = nullptr; private: AVCodecContext* m_audio_enc; @@ -181,6 +138,29 @@ class OpusRecorder::Internal bool m_recording = true; std::unique_ptr m_thread_record; + class PacketCallable : public Callable + { + public: + Internal* self; + std::vector packet; + + PacketCallable(Internal* self, const std::vector& packet) + : self(self), packet(packet) + { + + } + + void call() override + { + if (self->callback != nullptr) + { + self->callback(packet.data(), packet.size(), self->callback_data); + } + } + }; + + AsyncQueue m_async_queue; + void update_audio() { AVCodecContext* c = m_audio_enc; @@ -229,7 +209,8 @@ class OpusRecorder::Internal std::vector packet(pkt.size); memcpy(packet.data(), pkt.data, pkt.size); - m_pkt_queue.Push(packet); + + m_async_queue.Add(new PacketCallable(this, packet)); av_packet_unref(&pkt); } @@ -268,23 +249,11 @@ OpusRecorder::~OpusRecorder() void OpusRecorder::SetCallback(PacketCallback callback, void* callback_data) { - this->callback = callback; - this->callback_data = callback_data; + m_internal->callback = callback; + m_internal->callback_data = callback_data; } void* OpusRecorder::GetCallbackData() { - return callback_data; + return m_internal->callback_data; } - -void OpusRecorder::CheckPending() -{ - while (m_internal->m_pkt_queue.Size() > 0) - { - std::vector pkt = m_internal->m_pkt_queue.Pop(); - if (callback != nullptr) - { - callback(pkt.data(), pkt.size(), callback_data); - } - } -} \ No newline at end of file diff --git a/ThreeMM/OpusRecorder.h b/ThreeMM/OpusRecorder.h index e364934..0aab958 100644 --- a/ThreeMM/OpusRecorder.h +++ b/ThreeMM/OpusRecorder.h @@ -13,12 +13,8 @@ class OpusRecorder void SetCallback(PacketCallback callback, void* callback_data); void* GetCallbackData(); - void CheckPending(); - private: class Internal; std::unique_ptr m_internal; - PacketCallback callback = nullptr; - void* callback_data = nullptr; };