Skip to content

Commit 527a204

Browse files
Centralize the entry point for locally revoking a communicator
Signed-off-by: Matthew Whitlock <[email protected]>
1 parent a4bfac9 commit 527a204

File tree

5 files changed

+30
-17
lines changed

5 files changed

+30
-17
lines changed

ompi/communicator/communicator.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -728,6 +728,11 @@ OMPI_DECLSPEC bool ompi_comm_is_proc_active(ompi_communicator_t *comm, int peer_
728728
*/
729729
OMPI_DECLSPEC int ompi_comm_set_rank_failed(ompi_communicator_t *comm, int peer_id, bool remote);
730730

731+
/*
732+
* Locally revoke a communicator
733+
*/
734+
OMPI_DECLSPEC bool ompi_comm_revoke_local(ompi_communicator_t* comm, bool coll_only);
735+
731736
/*
732737
* Returns true if point-to-point communications with the target process
733738
* are supported (this means if the process is a valid peer, if the

ompi/communicator/ft/comm_ft.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -805,7 +805,7 @@ int ompi_comm_set_rank_failed(ompi_communicator_t *comm, int peer_id, bool remot
805805
opal_atomic_wmb(); /* non-locked update needs a memory barrier to propagate */
806806

807807
/* Disable collectives */
808-
MCA_PML_CALL(revoke_comm(comm, true));
808+
ompi_comm_revoke_local(comm, true);
809809

810810
if( NULL != ompi_rank_failure_cbfunc ) {
811811
(*ompi_rank_failure_cbfunc)(comm, peer_id, remote);

ompi/communicator/ft/comm_ft_revoke.c

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
#include "ompi/communicator/communicator.h"
1919
#include "ompi/mca/pml/pml.h"
2020

21-
static int ompi_comm_revoke_local(ompi_communicator_t* comm,
21+
static int ompi_comm_revoke_msg_cb(ompi_communicator_t* comm,
2222
ompi_comm_rbcast_message_t* msg);
2323

2424
static int comm_revoke_cb_type = -1;
@@ -27,7 +27,7 @@ int ompi_comm_revoke_init(void)
2727
{
2828
int ret;
2929

30-
ret = ompi_comm_rbcast_register_cb_type(ompi_comm_revoke_local);
30+
ret = ompi_comm_rbcast_register_cb_type(ompi_comm_revoke_msg_cb);
3131
if( 0 <= ret ) {
3232
comm_revoke_cb_type = ret;
3333
return OMPI_SUCCESS;
@@ -66,20 +66,22 @@ int ompi_comm_revoke_internal(ompi_communicator_t* comm)
6666
return ret;
6767
}
6868

69-
70-
/* internal code to revoke the communicator structure. Can be called from the
71-
* API or from receiving a revoke message */
72-
static int ompi_comm_revoke_local(ompi_communicator_t* comm, ompi_comm_rbcast_message_t* msg)
69+
/*
70+
* Internal code to locally revoke a comm and update all necessary state
71+
*/
72+
bool ompi_comm_revoke_local(ompi_communicator_t* comm, bool coll_only)
7373
{
74-
if( comm->comm_revoked ) {
74+
if( comm->comm_revoked || (coll_only && comm->coll_revoked) ) {
7575
OPAL_OUTPUT_VERBOSE((9, ompi_ftmpi_output_handle,
76-
"%s %s: comm %s:%d is already revoked, nothing to do",
77-
OMPI_NAME_PRINT(OMPI_PROC_MY_NAME), __func__, ompi_comm_print_cid(comm), comm->c_epoch));
76+
"%s %s: comm %s:%d is already %srevoked, nothing to do",
77+
OMPI_NAME_PRINT(OMPI_PROC_MY_NAME), __func__, ompi_comm_print_cid(comm), comm->c_epoch,
78+
coll_only ? "coll " : ""));
7879
return false;
7980
}
8081
OPAL_OUTPUT_VERBOSE((9, ompi_ftmpi_output_handle,
81-
"%s %s: comm %s:%d is marked revoked locally",
82-
OMPI_NAME_PRINT(OMPI_PROC_MY_NAME), __func__, ompi_comm_print_cid(comm), comm->c_epoch));
82+
"%s %s: comm %s:%d is marked %srevoked locally",
83+
OMPI_NAME_PRINT(OMPI_PROC_MY_NAME), __func__, ompi_comm_print_cid(comm), comm->c_epoch,
84+
coll_only ? "coll " : ""));
8385
/*
8486
* Locally revoke the communicator
8587
*
@@ -90,11 +92,17 @@ static int ompi_comm_revoke_local(ompi_communicator_t* comm, ompi_comm_rbcast_me
9092
*/
9193
comm->any_source_enabled = false;
9294
/* purge the communicator unexpected fragments and matching logic */
93-
MCA_PML_CALL(revoke_comm(comm, false));
95+
MCA_PML_CALL(revoke_comm(comm, coll_only));
9496
/* revoke any subcomms created by coll */
9597
comm->c_coll->coll_revoke_local(comm);
9698
/* Signal the point-to-point stack to recheck requests */
9799
wait_sync_global_wakeup(MPI_ERR_REVOKED);
98100
return true;
99101
}
100102

103+
/* internal code to revoke the communicator structure. Can be called from the
104+
* API or from receiving a revoke message */
105+
static int ompi_comm_revoke_msg_cb(ompi_communicator_t* comm, ompi_comm_rbcast_message_t* msg)
106+
{
107+
return ompi_comm_revoke_local(comm, false);
108+
}

ompi/mca/coll/han/coll_han_subcomms.c

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -421,18 +421,18 @@ int mca_coll_han_revoke_local(ompi_communicator_t *comm,
421421
mca_coll_han_module_t *han_module = (mca_coll_han_module_t*) module;
422422
for(int i = 0; i < NB_TOPO_LVL; i++){
423423
if(NULL == han_module->sub_comm[i]) continue;
424-
ompi_comm_revoke_internal(han_module->sub_comm[i]);
424+
ompi_comm_revoke_local(han_module->sub_comm[i], true);
425425
}
426426
if(han_module->cached_low_comms != NULL){
427427
for(int i = 0; i < COLL_HAN_LOW_MODULES; i++){
428428
if(NULL == han_module->cached_low_comms[i]) continue;
429-
ompi_comm_revoke_internal(han_module->cached_low_comms[i]);
429+
ompi_comm_revoke_local(han_module->cached_low_comms[i], true);
430430
}
431431
}
432432
if(han_module->cached_up_comms != NULL){
433433
for(int i = 0; i < COLL_HAN_LOW_MODULES; i++){
434434
if(NULL == han_module->cached_low_comms[i]) continue;
435-
ompi_comm_revoke_internal(han_module->cached_low_comms[i]);
435+
ompi_comm_revoke_local(han_module->cached_low_comms[i], true);
436436
}
437437
}
438438
return MPI_SUCCESS;

ompi/mca/pml/ob1/pml_ob1_recvfrag.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -346,7 +346,7 @@ int mca_pml_ob1_revoke_comm( struct ompi_communicator_t* ompi_comm, bool coll_on
346346

347347
/* For intercomm, also work with the local_comm */
348348
if( OMPI_COMM_IS_INTER(ompi_comm) ) {
349-
mca_pml_ob1_revoke_comm(ompi_comm->c_local_comm, coll_only);
349+
ompi_comm_revoke_local(ompi_comm->c_local_comm, coll_only);
350350
}
351351

352352
OBJ_CONSTRUCT(&nack_list, opal_list_t);

0 commit comments

Comments
 (0)