Skip to content

Commit

Permalink
A couple of new unit-tests.
Browse files Browse the repository at this point in the history
These unit-test check that use of the same disp_binder with
fifo_t::cooperation for several coops leads to use separate event queue
for every cooperation.
  • Loading branch information
eao197 committed Oct 16, 2023
1 parent b4fb53b commit faca78c
Show file tree
Hide file tree
Showing 12 changed files with 437 additions and 0 deletions.
1 change: 1 addition & 0 deletions dev/test/so_5/disp/adv_thread_pool/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
add_subdirectory(chstate_in_safe)
add_subdirectory(cooperation_fifo)
add_subdirectory(cooperation_fifo_2)
add_subdirectory(individual_fifo)
add_subdirectory(simple)
add_subdirectory(subscr_in_safe)
Expand Down
1 change: 1 addition & 0 deletions dev/test/so_5/disp/adv_thread_pool/build_tests.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
required_prj( "test/so_5/disp/adv_thread_pool/chstate_in_safe/prj.ut.rb" )
required_prj( "test/so_5/disp/adv_thread_pool/subscr_in_safe/prj.ut.rb" )
required_prj( "test/so_5/disp/adv_thread_pool/cooperation_fifo/prj.ut.rb" )
required_prj( "test/so_5/disp/adv_thread_pool/cooperation_fifo_2/prj.ut.rb" )
required_prj( "test/so_5/disp/adv_thread_pool/individual_fifo/prj.ut.rb" )
required_prj( "test/so_5/disp/adv_thread_pool/unsafe_after_safe/prj.ut.rb" )
required_prj( "test/so_5/disp/adv_thread_pool/custom_work_thread/prj.ut.rb" )
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
set(UNITTEST _unit.test.disp.adv_thread_pool.cooperation_fifo_2)
include(${CMAKE_SOURCE_DIR}/cmake/unittest.cmake)
196 changes: 196 additions & 0 deletions dev/test/so_5/disp/adv_thread_pool/cooperation_fifo_2/main.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
/*
* A test for adv_thread_pool dispatcher when a single binder with
* cooperative_fifo is used for several coops.
*/

#include <iostream>
#include <set>
#include <vector>
#include <exception>
#include <stdexcept>
#include <cstdlib>
#include <thread>
#include <chrono>
#include <sstream>

#include <so_5/all.hpp>
#include <so_5/spinlocks.hpp>

#include <test/3rd_party/various_helpers/time_limited_execution.hpp>
#include <test/3rd_party/various_helpers/benchmark_helpers.hpp>

#include "../for_each_lock_factory.hpp"

namespace tp_disp = so_5::disp::adv_thread_pool;

typedef std::set< so_5::current_thread_id_t > thread_id_set_t;

class thread_id_collector_t
{
public :
void add_current_thread()
{
std::lock_guard< so_5::default_spinlock_t > l( m_lock );

m_set.insert( so_5::query_current_thread_id() );
}

std::size_t set_size() const
{
return m_set.size();
}

const thread_id_set_t &
query_set() const
{
return m_set;
}

private :
so_5::default_spinlock_t m_lock;
thread_id_set_t m_set;
};

struct msg_shutdown : public so_5::signal_t {};

class a_test_t : public so_5::agent_t
{
public:
a_test_t(
so_5::environment_t & env,
thread_id_collector_t & collector,
const so_5::mbox_t & shutdowner_mbox )
: so_5::agent_t( env )
, m_collector( collector )
, m_shutdowner_mbox( shutdowner_mbox )
{
}

void
so_evt_start() override
{
// Block the current thread for some time.
// Because of that so_evt_start for an agent from a different
// coop has to be started on a separate thread.
std::this_thread::sleep_for( std::chrono::milliseconds{ 250 } );

m_collector.add_current_thread();

so_5::send< msg_shutdown >( m_shutdowner_mbox );
}

private :
thread_id_collector_t & m_collector;
const so_5::mbox_t m_shutdowner_mbox;
};

class a_shutdowner_t : public so_5::agent_t
{
public :
a_shutdowner_t(
so_5::environment_t & env,
std::size_t working_agents )
: so_5::agent_t( env )
, m_working_agents( working_agents )
{}

void
so_define_agent() override
{
so_subscribe_self().event( [this](mhood_t< msg_shutdown >) {
--m_working_agents;
if( !m_working_agents )
so_environment().stop();
} );
}

private :
std::size_t m_working_agents;
};

const std::size_t cooperation_count = 4;

void
run_sobjectizer(
tp_disp::queue_traits::lock_factory_t factory,
thread_id_collector_t & collector )
{
duration_meter_t duration( "running of test cooperations" );

so_5::launch(
[&]( so_5::environment_t & env )
{
so_5::mbox_t shutdowner_mbox;
{
auto c = env.make_coop();
auto a = c->make_agent< a_shutdowner_t >( cooperation_count );
shutdowner_mbox = a->so_direct_mbox();
env.register_coop( std::move( c ) );
}

auto disp = tp_disp::make_dispatcher(
env,
"adv_thread_pool",
tp_disp::disp_params_t{}
.thread_count( cooperation_count )
.set_queue_params( tp_disp::queue_traits::queue_params_t{}
.lock_factory( factory ) ) );

auto params = tp_disp::bind_params_t{}
.fifo( tp_disp::fifo_t::cooperation );
auto the_same_binder = disp.binder( params );

for( std::size_t i = 0; i != cooperation_count; ++i )
{
auto c = env.make_coop( the_same_binder );
c->make_agent< a_test_t >( collector, shutdowner_mbox );
env.register_coop( std::move( c ) );
}
} );
}

void
analyze_results( const thread_id_collector_t & collector )
{
if( cooperation_count != collector.set_size() )
{
throw std::runtime_error{
"there is a set with size: " + std::to_string( collector.set_size() )
};
}
}

void
run_and_check(
tp_disp::queue_traits::lock_factory_t factory )
{
thread_id_collector_t collector;

run_sobjectizer( factory, collector );

analyze_results( collector );
}

int
main()
{
try
{
for_each_lock_factory( []( tp_disp::queue_traits::lock_factory_t factory ) {
run_with_time_limit(
[&]()
{
run_and_check( factory );
},
240 );
} );
}
catch( const std::exception & ex )
{
std::cerr << "Error: " << ex.what() << std::endl;
return 1;
}

return 0;
}

11 changes: 11 additions & 0 deletions dev/test/so_5/disp/adv_thread_pool/cooperation_fifo_2/prj.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
require 'mxx_ru/cpp'

MxxRu::Cpp::exe_target {

required_prj( "so_5/prj.rb" )

target( "_unit.test.disp.adv_thread_pool.cooperation_fifo_2" )

cpp_source( "main.cpp" )
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
require 'mxx_ru/binary_unittest'

Mxx_ru::setup_target(
Mxx_ru::Binary_unittest_target.new(
"test/so_5/disp/adv_thread_pool/cooperation_fifo_2/prj.ut.rb",
"test/so_5/disp/adv_thread_pool/cooperation_fifo_2/prj.rb" )
)
1 change: 1 addition & 0 deletions dev/test/so_5/disp/thread_pool/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
add_subdirectory(simple)
add_subdirectory(cooperation_fifo)
add_subdirectory(cooperation_fifo_2)
add_subdirectory(individual_fifo)
add_subdirectory(threshold)
add_subdirectory(custom_work_thread)
1 change: 1 addition & 0 deletions dev/test/so_5/disp/thread_pool/build_tests.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

required_prj( "#{path}/simple/prj.ut.rb" )
required_prj( "#{path}/cooperation_fifo/prj.ut.rb" )
required_prj( "#{path}/cooperation_fifo_2/prj.ut.rb" )
required_prj( "#{path}/individual_fifo/prj.ut.rb" )
required_prj( "#{path}/threshold/prj.ut.rb" )
required_prj( "#{path}/custom_work_thread/prj.ut.rb" )
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
set(UNITTEST _unit.test.disp.thread_pool.cooperation_fifo_2)
include(${CMAKE_SOURCE_DIR}/cmake/unittest.cmake)
Loading

0 comments on commit faca78c

Please sign in to comment.