diff --git a/ompi/mpiext/continue/Makefile.am b/ompi/mpiext/continue/Makefile.am
new file mode 100644
index 00000000000..70cd9a7c85a
--- /dev/null
+++ b/ompi/mpiext/continue/Makefile.am
@@ -0,0 +1,22 @@
+# -*- shell-script -*-
+#
+# Copyright (c) 2021 The University of Tennessee and The University
+# of Tennessee Research Foundation. All rights
+# reserved.
+# $COPYRIGHT$
+#
+# Additional copyrights may follow
+#
+# $HEADER$
+#
+
+# This Makefile is not traversed during a normal "make all" in an OMPI
+# build. It *is* traversed during "make dist", however. So you can
+# put EXTRA_DIST targets in here.
+#
+# You can also use this as a convenience for building this MPI
+# extension (i.e., "make all" in this directory to invoke "make all"
+# in all the subdirectories).
+
+SUBDIRS = c
+
diff --git a/ompi/mpiext/continue/c/Makefile.am b/ompi/mpiext/continue/c/Makefile.am
new file mode 100644
index 00000000000..9e5b0cff253
--- /dev/null
+++ b/ompi/mpiext/continue/c/Makefile.am
@@ -0,0 +1,43 @@
+#
+# Copyright (c) 2021 The University of Tennessee and The University
+# of Tennessee Research Foundation. All rights
+# reserved.
+# $COPYRIGHT$
+#
+# Additional copyrights may follow
+#
+# $HEADER$
+#
+
+# OMPI_BUILD_MPI_PROFILING is enabled when we want our generated MPI_* symbols
+# to be replaced by PMPI_*.
+# In this directory, we need it to be 0
+
+AM_CPPFLAGS = -DOMPI_BUILD_MPI_PROFILING=0 -DOMPI_COMPILING_FORTRAN_WRAPPERS=0
+
+include $(top_srcdir)/Makefile.ompi-rules
+
+noinst_LTLIBRARIES = libmpiext_continue_c.la
+
+# This is where the top-level header file (that is included in
+# <mpi-ext.h>) must be installed.
+ompidir = $(ompiincludedir)/mpiext
+
+# This is the header file that is installed.
+ompi_HEADERS = mpiext_continue_c.h
+
+# Sources of the convenience library.  continuation.h is internal (it is
+# not installed) but is listed here so that "make dist" ships it.
+libmpiext_continue_c_la_SOURCES = \
+        continuation.h \
+        continuation.c \
+        continue.c \
+        continueall.c \
+        continue_init.c \
+        mpiext_continue_module.c
+
+#libmpiext_continue_c_la_LDFLAGS = -module -avoid-version
+
+# Help text looked up by opal_show_help() at run time.
+dist_ompidata_DATA = help-mpi-continue.txt
+
diff --git a/ompi/mpiext/continue/c/continuation.c b/ompi/mpiext/continue/c/continuation.c
new file mode 100644
index 00000000000..d4b1d5d5ff7
--- /dev/null
+++ b/ompi/mpiext/continue/c/continuation.c
@@ -0,0 +1,670 @@
+/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
+/*
+ * Copyright (c) 2020 High Performance Computing Center Stuttgart,
+ * University of Stuttgart. All rights reserved.
+ * Copyright (c) 2021 The University of Tennessee and The University
+ * of Tennessee Research Foundation. All rights
+ * reserved.
+ * $COPYRIGHT$
+ *
+ * Additional copyrights may follow
+ *
+ * $HEADER$
+ */
+
+
+#include "ompi_config.h"
+#include "opal/class/opal_fifo.h"
+#include "opal/class/opal_free_list.h"
+#include "opal/sys/atomic.h"
+#include "opal/util/show_help.h"
+#include "ompi/mpiext/continue/c/continuation.h"
+#include "ompi/request/request.h"
+
+
+/* Free lists backing continuation objects and per-request callback data */
+static opal_free_list_t ompi_continuation_freelist;
+static opal_free_list_t ompi_request_cont_data_freelist;
+
+/* Forward-decl */
+typedef struct ompi_cont_request_t ompi_cont_request_t;
+
+static int ompi_continue_request_free(ompi_request_t** cont_req);
+
+/**
+ * Continuation class containing the callback, callback data, status,
+ * and number of outstanding operation requests.
+ */
+OMPI_DECLSPEC OBJ_CLASS_DECLARATION(ompi_continuation_t);
+
+struct ompi_continuation_t {
+    opal_free_list_item_t       super;       /**< Base type */
+    struct ompi_cont_request_t *cont_req;    /**< The continuation request this continuation is registered with */
+    MPIX_Continue_cb_function  *cont_cb;     /**< The callback function to invoke */
+    void                       *cont_data;   /**< Continuation state provided by the user */
+    MPI_Status                 *cont_status; /**< user-provided pointers to status objects */
+    opal_atomic_int32_t         num_active;  /**< The number of active operation requests on this callback */
+};
+
+/* Convenience typedef */
+typedef struct ompi_continuation_t ompi_continuation_t;
+
+/**
+ * Constructor: put every field of a fresh continuation into a known,
+ * empty state (including the status pointer, so that no field is ever
+ * read while indeterminate).
+ */
+static void ompi_continuation_construct(ompi_continuation_t* cont)
+{
+    cont->cont_req    = NULL;
+    cont->cont_cb     = NULL;
+    cont->cont_data   = NULL;
+    cont->cont_status = NULL;
+    cont->num_active  = 0;
+}
+
+/**
+ * Destructor: a continuation must be idle (no request, callback, user data
+ * or outstanding operations) by the time it is destroyed.
+ */
+static void ompi_continuation_destruct(ompi_continuation_t* cont)
+{
+    assert(cont->cont_req == NULL);
+    assert(cont->cont_cb == NULL);
+    assert(cont->cont_data == NULL);
+    assert(cont->num_active == 0);
+}
+
+OBJ_CLASS_INSTANCE(
+    ompi_continuation_t,
+    opal_free_list_item_t,
+    ompi_continuation_construct,
+    ompi_continuation_destruct);
+
+
+/**
+ * Continuation request, derived from an OMPI request. Continuation requests
+ * keep track of registered continuations and complete once no active
+ * continuations are registered.
+ */
+OMPI_DECLSPEC OBJ_CLASS_DECLARATION(ompi_cont_request_t);
+struct ompi_cont_request_t {
+    ompi_request_t super;
+    opal_atomic_lock_t cont_lock; /**< Lock held while completing/restarting the cont request */
+    bool cont_enqueue_complete; /**< Whether to enqueue immediately complete requests */
+    opal_atomic_int32_t cont_num_active; /**< The number of active continuations registered with a continuation request */
+    uint32_t continue_max_poll; /**< max number of local continuations to execute at once */
+    opal_list_t *cont_complete_list; /**< List of complete continuations to be invoked during test (NULL unless poll-only) */
+    ompi_wait_sync_t *sync; /**< Sync object this continuation request is attached to */
+};
+
+/**
+ * Constructor: a new continuation request is a persistent, active request
+ * of type OMPI_REQUEST_CONT that starts out complete (there are no
+ * continuations registered yet). It has no local completion list, no sync
+ * object, and no limit (UINT32_MAX) on continuations executed per poll.
+ */
+static void ompi_cont_request_construct(ompi_cont_request_t* cont_req)
+{
+    OMPI_REQUEST_INIT(&cont_req->super, true);
+    cont_req->super.req_type = OMPI_REQUEST_CONT;
+    cont_req->super.req_complete = REQUEST_COMPLETED;
+    cont_req->super.req_state = OMPI_REQUEST_ACTIVE;
+    cont_req->super.req_persistent = true;
+    cont_req->super.req_free = &ompi_continue_request_free;
+    cont_req->super.req_status = ompi_status_empty; /* always returns MPI_SUCCESS */
+    opal_atomic_lock_init(&cont_req->cont_lock, false);
+    cont_req->cont_enqueue_complete = false;
+    cont_req->cont_num_active = 0;
+    cont_req->continue_max_poll = UINT32_MAX;
+    cont_req->cont_complete_list = NULL;
+    cont_req->sync = NULL;
+}
+
+/**
+ * Destructor: finalizes the base request and releases the local completion
+ * list, if one was allocated. No continuation may still be active.
+ */
+static void ompi_cont_request_destruct(ompi_cont_request_t* cont_req)
+{
+    OMPI_REQUEST_FINI(&cont_req->super);
+    assert(cont_req->cont_num_active == 0);
+    if (NULL != cont_req->cont_complete_list) {
+        OPAL_LIST_RELEASE(cont_req->cont_complete_list);
+        cont_req->cont_complete_list = NULL;
+    }
+}
+
+OBJ_CLASS_INSTANCE(
+    ompi_cont_request_t,
+    ompi_request_t,
+    ompi_cont_request_construct,
+    ompi_cont_request_destruct);
+
+/**
+ * Data block associated with requests
+ * The same structure is used for continuation requests and operation
+ * requests with attached continuations.
+ */
+OMPI_DECLSPEC OBJ_CLASS_DECLARATION(ompi_request_cont_data_t);
+
+struct ompi_request_cont_data_t {
+    opal_free_list_item_t super;
+    ompi_continuation_t  *cont_obj;    /**< The continuation the request is attached to */
+    ompi_status_public_t *cont_status; /**< The status object to set before invoking continuation */
+};
+
+/* Convenience typedef */
+typedef struct ompi_request_cont_data_t ompi_request_cont_data_t;
+
+OBJ_CLASS_INSTANCE(
+    ompi_request_cont_data_t,
+    opal_free_list_item_t,
+    NULL, NULL);
+
+/**
+ * List of continuations eligible for execution
+ */
+static opal_list_t continuation_list;
+
+/**
+ * Mutex to protect the continuation_list
+ */
+static opal_mutex_t request_cont_lock;
+
+/**
+ * Flag indicating whether the progress callback has been registered.
+ */
+static bool progress_callback_registered = false;
+
+/**
+ * A list that is constructed on first use (see
+ * ompi_continue_register_request_progress()).
+ */
+struct lazy_list_s {
+    opal_list_t list;
+    bool        is_initialized;
+};
+typedef struct lazy_list_s lazy_list_t;
+
+/**
+ * Per-thread list of poll-only continuation requests the thread has
+ * registered for progress.
+ */
+static opal_thread_local lazy_list_t thread_progress_list = { .is_initialized = false };
+
+/**
+ * Remove \c num_release active continuations from \c cont_req.
+ *
+ * When the count drops to zero the continuation request is marked complete
+ * (unless another thread completed or restarted it in the meantime) and, if
+ * a sync object is attached, its num_req_need_progress counter is
+ * decremented.
+ *
+ * @param cont_req     the continuation request
+ * @param num_release  number of continuations to release
+ * @param take_lock    whether to acquire cont_lock (only done if threads
+ *                     are in use)
+ */
+static inline
+void ompi_continue_cont_req_release(ompi_cont_request_t *cont_req,
+                                    int32_t num_release,
+                                    bool take_lock)
+{
+    int32_t num_active = opal_atomic_add_fetch_32(&cont_req->cont_num_active, -num_release);
+    assert(num_active >= 0);
+    if (0 == num_active) {
+        const bool using_threads = opal_using_threads();
+        if (take_lock && using_threads) {
+            opal_atomic_lock(&cont_req->cont_lock);
+        }
+        /* double check that no other thread has completed or restarted the request already */
+        if (0 == cont_req->cont_num_active && !REQUEST_COMPLETE(&cont_req->super)) {
+            opal_atomic_wmb();
+            /* signal that all continuations were found complete */
+            ompi_request_complete(&cont_req->super, true);
+        }
+        if (NULL != cont_req->sync) {
+            /* release the sync object */
+            OPAL_THREAD_ADD_FETCH32(&cont_req->sync->num_req_need_progress, -1);
+        }
+        if (take_lock && using_threads) {
+            opal_atomic_unlock(&cont_req->cont_lock);
+        }
+    }
+}
+
+/**
+ * Retire a continuation after its callback has run: account for it on the
+ * continuation request, drop the reference taken at creation and return the
+ * object to the free list.
+ */
+static inline
+void ompi_continue_cont_release(ompi_continuation_t *cont)
+{
+    ompi_cont_request_t *cont_req = cont->cont_req;
+    assert(OMPI_REQUEST_CONT == cont_req->super.req_type);
+
+    ompi_continue_cont_req_release(cont_req, 1, true);
+    OBJ_RELEASE(cont_req);
+
+    /* OPAL_ENABLE_DEBUG is always defined (to 0 or 1), so it has to be
+     * tested with #if: #ifdef would enable this block in every build. */
+#if OPAL_ENABLE_DEBUG
+    cont->cont_cb     = NULL;
+    cont->cont_data   = NULL;
+    cont->cont_status = NULL;
+    cont->cont_req    = NULL;
+#endif // OPAL_ENABLE_DEBUG
+    opal_free_list_return(&ompi_continuation_freelist, &cont->super);
+}
+
+/**
+ * Process a callback. Returns the callback object to the freelist.
+ */
+static inline
+void ompi_continue_cont_invoke(ompi_continuation_t *cont)
+{
+    ompi_cont_request_t *cont_req = cont->cont_req;
+    assert(NULL != cont_req);
+    assert(OMPI_REQUEST_CONT == cont_req->super.req_type);
+
+    /* read everything we need before the user callback runs */
+    MPIX_Continue_cb_function *fn = cont->cont_cb;
+    void *cont_data = cont->cont_data;
+    MPI_Status *statuses = cont->cont_status;
+    fn(statuses, cont_data);
+    ompi_continue_cont_release(cont);
+}
+
+/**
+ * Allow multiple threads to progress callbacks concurrently
+ * but protect from recursive progressing
+ */
+static opal_thread_local int in_progress = 0;
+
+/**
+ * Pop the first ready continuation from the local completion list of a
+ * poll-only continuation request. The request lock is only taken if
+ * threads are in use.
+ *
+ * @return the continuation, or NULL if the list is empty
+ */
+static inline
+ompi_continuation_t *ompi_continue_pop_local(ompi_cont_request_t *cont_req,
+                                             const bool using_threads)
+{
+    ompi_continuation_t *cb;
+    if (using_threads) {
+        opal_atomic_lock(&cont_req->cont_lock);
+        cb = (ompi_continuation_t *) opal_list_remove_first(cont_req->cont_complete_list);
+        opal_atomic_unlock(&cont_req->cont_lock);
+    } else {
+        cb = (ompi_continuation_t *) opal_list_remove_first(cont_req->cont_complete_list);
+    }
+    return cb;
+}
+
+/**
+ * Execute up to \c max ready continuations: first those of the poll-only
+ * requests registered by the calling thread, then those on the global list.
+ * Does nothing if the calling thread is already executing continuations.
+ *
+ * @return the number of continuations executed
+ */
+static
+int ompi_continue_progress_n(const uint32_t max)
+{
+
+    if (in_progress) return 0;
+
+    uint32_t completed = 0;
+    in_progress = 1;
+
+    const bool using_threads = opal_using_threads();
+
+    /* execute thread-local continuations first
+     * (e.g., from continuation requests the current thread is waiting on) */
+    lazy_list_t *tl_list = &thread_progress_list;
+    if (tl_list->is_initialized) {
+        ompi_cont_request_t *cont_req;
+        OPAL_LIST_FOREACH(cont_req, &tl_list->list, ompi_cont_request_t) {
+            if (opal_list_is_empty(cont_req->cont_complete_list)) continue;
+            while (max > completed) {
+                ompi_continuation_t *cb = ompi_continue_pop_local(cont_req, using_threads);
+                if (NULL == cb) break;
+
+                ompi_continue_cont_invoke(cb);
+                ++completed;
+            }
+            if (max <= completed) break;
+        }
+    }
+
+    if (!opal_list_is_empty(&continuation_list)) {
+        /* global progress */
+        while (max > completed) {
+            ompi_continuation_t *cb;
+            if (using_threads) {
+                opal_mutex_lock(&request_cont_lock);
+                cb = (ompi_continuation_t*)opal_list_remove_first(&continuation_list);
+                opal_mutex_unlock(&request_cont_lock);
+            } else {
+                cb = (ompi_continuation_t*)opal_list_remove_first(&continuation_list);
+            }
+            if (NULL == cb) break;
+            ompi_continue_cont_invoke(cb);
+            ++completed;
+        }
+    }
+
+    in_progress = 0;
+
+    return completed;
+}
+
+/** Progress callback registered with opal_progress: at most one continuation per call. */
+static int ompi_continue_progress_callback(void)
+{
+    return ompi_continue_progress_n(1);
+}
+
+/** Progress callback installed on a sync object: run all ready continuations. */
+static int ompi_continue_wait_progress_callback(void)
+{
+    return ompi_continue_progress_n(UINT32_MAX);
+}
+
+
+/**
+ * Progress the continuations of one continuation request, bounded by its
+ * continue_max_poll setting. For a poll-only request only its local
+ * completion list is drained; otherwise the generic progress is run.
+ *
+ * @return the number of continuations executed
+ */
+int ompi_continue_progress_request(ompi_request_t *req)
+{
+    if (in_progress) return 0;
+    ompi_cont_request_t *cont_req = (ompi_cont_request_t *)req;
+    if (NULL == cont_req->cont_complete_list) {
+        /* progress as many as allowed */
+        return ompi_continue_progress_n(cont_req->continue_max_poll);
+    }
+    if (opal_list_is_empty(cont_req->cont_complete_list)) {
+        return 0;
+    }
+
+    in_progress = 1;
+
+    const uint32_t max_poll = cont_req->continue_max_poll;
+
+    uint32_t completed = 0;
+    const bool using_threads = opal_using_threads();
+    while (max_poll > completed && !opal_list_is_empty(cont_req->cont_complete_list)) {
+        ompi_continuation_t *cb = ompi_continue_pop_local(cont_req, using_threads);
+        if (NULL == cb) break;
+
+        ompi_continue_cont_invoke(cb);
+        completed++;
+    }
+
+    in_progress = 0;
+
+    return completed;
+}
+
+
+/**
+ * Register the
continuation request so that it will be progressed even if
+ * it is poll-only and the thread is waiting on the provided sync object.
+ *
+ * Requests without a local completion list (i.e., not poll-only) are
+ * ignored. Otherwise the request is appended to the calling thread's
+ * progress list (which is constructed on first use) and, if \c sync is not
+ * NULL, the sync object is told that one more request needs progress and
+ * which callback to use for it.
+ *
+ * @param req   the continuation request
+ * @param sync  the sync object the thread is about to wait on (may be NULL)
+ * @return OMPI_SUCCESS
+ */
+int ompi_continue_register_request_progress(ompi_request_t *req, ompi_wait_sync_t *sync)
+{
+    ompi_cont_request_t *cont_req = (ompi_cont_request_t *)req;
+
+    if (NULL == cont_req->cont_complete_list) return OMPI_SUCCESS;
+
+    lazy_list_t *cont_req_list = &thread_progress_list;
+
+    /* check that the thread-local list is initialized */
+    if (!cont_req_list->is_initialized) {
+        OBJ_CONSTRUCT(&cont_req_list->list, opal_list_t);
+        cont_req_list->is_initialized = true;
+    }
+
+    /* add the continuation request to the thread-local list */
+    opal_list_append(&cont_req_list->list, &cont_req->super.super.super);
+
+    /* register with the sync object */
+    if (NULL != sync) {
+        sync->num_req_need_progress++;
+        sync->progress_cb = &ompi_continue_wait_progress_callback;
+    }
+    /* remember the sync object so that completing the request can release it */
+    cont_req->sync = sync;
+
+    return OMPI_SUCCESS;
+}
+
+/**
+ * Remove the poll-only continuation request from the thread's progress list after
+ * it has completed.
+ *
+ * The request forgets its sync object here: the sync object is owned by the
+ * waiting thread, so it must not be touched through the request once the
+ * wait is over.
+ *
+ * @param req  the continuation request
+ * @return OMPI_SUCCESS
+ */
+int ompi_continue_deregister_request_progress(ompi_request_t *req)
+{
+    ompi_cont_request_t *cont_req = (ompi_cont_request_t *)req;
+
+    if (NULL == cont_req->cont_complete_list) return OMPI_SUCCESS;
+
+    /* let the sync know we're done, it may suspend the thread now.
+     * The request lock is held so that a thread completing the request in
+     * ompi_continue_cont_req_release() never sees a stale sync pointer.
+     * NOTE(review): ompi_continue_cont_req_release() also decrements
+     * num_req_need_progress when the request completes while registered;
+     * confirm that the counter cannot be decremented twice. */
+    opal_atomic_lock(&cont_req->cont_lock);
+    if (NULL != cont_req->sync) {
+        cont_req->sync->num_req_need_progress--;
+        cont_req->sync = NULL;
+    }
+    opal_atomic_unlock(&cont_req->cont_lock);
+
+    /* remove the continuation request from the thread-local progress list */
+    opal_list_remove_item(&thread_progress_list.list, &req->super.super);
+
+    return OMPI_SUCCESS;
+}
+
+/**
+ * Set up the global state of the extension: the list of runnable
+ * continuations with its mutex, and the two free lists.
+ *
+ * @return OMPI_SUCCESS
+ */
+int ompi_continuation_init(void)
+{
+    OBJ_CONSTRUCT(&request_cont_lock, opal_mutex_t);
+    OBJ_CONSTRUCT(&continuation_list, opal_list_t);
+
+    OBJ_CONSTRUCT(&ompi_continuation_freelist, opal_free_list_t);
+    opal_free_list_init(&ompi_continuation_freelist,
+                        sizeof(ompi_continuation_t),
+                        opal_cache_line_size,
+                        OBJ_CLASS(ompi_continuation_t),
+                        0, opal_cache_line_size,
+                        0, -1 , 8, NULL, 0, NULL, NULL, NULL);
+
+    OBJ_CONSTRUCT(&ompi_request_cont_data_freelist, opal_free_list_t);
+    opal_free_list_init(&ompi_request_cont_data_freelist,
+                        sizeof(ompi_request_cont_data_t),
+                        opal_cache_line_size,
+                        OBJ_CLASS(ompi_request_cont_data_t),
+                        0, opal_cache_line_size,
+                        0, -1 , 8, NULL, 0, NULL, NULL, NULL);
+    return OMPI_SUCCESS;
+}
+
+/**
+ * Tear down the global state of the extension. Warns if continuations are
+ * still queued for execution.
+ *
+ * @return OMPI_SUCCESS
+ */
+int ompi_continuation_fini(void)
+{
+    if (progress_callback_registered) {
+        opal_progress_unregister(&ompi_continue_progress_callback);
+        /* allow the callback to be registered again after a re-init */
+        progress_callback_registered = false;
+    }
+
+    if (!opal_list_is_empty(&continuation_list)) {
+        opal_show_help("help-mpi-continue.txt", "continue:incomplete_shutdown", 1,
+                       opal_list_get_size(&continuation_list));
+    }
+    OBJ_DESTRUCT(&continuation_list);
+
+    OBJ_DESTRUCT(&request_cont_lock);
+    OBJ_DESTRUCT(&ompi_continuation_freelist);
+    OBJ_DESTRUCT(&ompi_request_cont_data_freelist);
+
+    return OMPI_SUCCESS;
+}
+
+/**
+ * Enqueue the continuation for later invocation.
+ *
+ * Continuations of a poll-only request go onto that request's local
+ * completion list; all others go onto the global list, and the global
+ * progress callback is registered the first time this happens.
+ */
+static void
+ompi_continue_enqueue_runnable(ompi_continuation_t *cont)
+{
+    ompi_cont_request_t *cont_req = cont->cont_req;
+    if (NULL != cont_req->cont_complete_list) {
+        opal_atomic_lock(&cont_req->cont_lock);
+        opal_list_append(cont_req->cont_complete_list, &cont->super.super);
+        opal_atomic_unlock(&cont_req->cont_lock);
+    } else {
+        OPAL_THREAD_LOCK(&request_cont_lock);
+        opal_list_append(&continuation_list, &cont->super.super);
+        if (OPAL_UNLIKELY(!progress_callback_registered)) {
+            /* TODO: Ideally, we want to ensure that the callback is called *after*
+             *       all the other progress callbacks are done so that any
+             *       completions have happened before we attempt to execute
+             *       callbacks. There doesn't seem to exist the infrastructure though.
+             */
+            opal_progress_register(&ompi_continue_progress_callback);
+            progress_callback_registered = true;
+        }
+        OPAL_THREAD_UNLOCK(&request_cont_lock);
+    }
+}
+
+/**
+ * Create and initialize a continuation object.
+ *
+ * The continuation request is retained for the lifetime of the
+ * continuation and (re)activated if this is its first active continuation.
+ *
+ * @return the continuation, or NULL if none could be allocated (in which
+ *         case \c cont_req is left untouched)
+ */
+static inline
+ompi_continuation_t *ompi_continue_cont_create(
+  int                         count,
+  ompi_cont_request_t        *cont_req,
+  MPIX_Continue_cb_function  *cont_cb,
+  void                       *cont_data,
+  MPI_Status                 *cont_status)
+{
+    ompi_continuation_t *cont;
+    cont = (ompi_continuation_t *)opal_free_list_get(&ompi_continuation_freelist);
+    if (OPAL_UNLIKELY(NULL == cont)) {
+        return NULL;
+    }
+    cont->cont_req    = cont_req;
+    cont->cont_cb     = cont_cb;
+    cont->cont_data   = cont_data;
+    cont->num_active  = count;
+    cont->cont_status = cont_status;
+
+    /* signal that the continuation request has a new continuation */
+    OBJ_RETAIN(cont_req);
+
+    int32_t num_active = opal_atomic_fetch_add_32(&cont_req->cont_num_active, 1);
+    if (num_active == 0) {
+        const bool using_threads = opal_using_threads();
+        if (using_threads) {
+            opal_atomic_lock(&cont_req->cont_lock);
+        }
+        if (0 != cont_req->cont_num_active && REQUEST_COMPLETE(&cont_req->super)) {
+            /* (re)activate the continuation request upon first registration */
+            cont_req->super.req_complete = REQUEST_PENDING;
+        }
+        if (using_threads) {
+            opal_atomic_unlock(&cont_req->cont_lock);
+        }
+    }
+
+    return cont;
+}
+
+/**
+ * Completion callback installed on every operation request that has a
+ * continuation attached.
+ *
+ * Copies out the status, detaches the callback data, inactivates
+ * (persistent) or frees (non-persistent) the request and only then
+ * accounts for the completion on the continuation. The continuation is
+ * published last because, once it is runnable, another thread may execute
+ * the user callback, which may in turn restart or free a persistent
+ * request.
+ *
+ * @return 1 if the request was freed, 0 otherwise
+ */
+static int request_completion_cb(ompi_request_t *request)
+{
+    assert(NULL != request->req_complete_cb_data);
+    int rc = 0;
+    ompi_request_cont_data_t *req_cont_data;
+    req_cont_data = (ompi_request_cont_data_t *)request->req_complete_cb_data;
+
+    ompi_continuation_t *cont = req_cont_data->cont_obj;
+    req_cont_data->cont_obj = NULL;
+
+    /* set the status object */
+    if (NULL != req_cont_data->cont_status) {
+        OMPI_COPY_STATUS(req_cont_data->cont_status, request->req_status, true);
+        req_cont_data->cont_status = NULL;
+    }
+
+    /* detach the callback data while the request is still valid:
+     * ompi_request_free() below may destroy it and redirects the pointer */
+    request->req_complete_cb_data = NULL;
+    opal_free_list_return(&ompi_request_cont_data_freelist, &req_cont_data->super);
+
+    /* inactivate / free the request */
+    if (request->req_persistent) {
+        if (OMPI_REQUEST_CONT != request->req_type) {
+            request->req_state = OMPI_REQUEST_INACTIVE;
+        }
+    } else {
+        /* release the request object and let the caller know */
+        ompi_request_free(&request);
+        rc = 1;
+    }
+
+    /* the request must not be touched past this point */
+    if (0 == OPAL_THREAD_ADD_FETCH32(&cont->num_active, -1)) {
+        /* the continuation is ready for execution */
+        ompi_continue_enqueue_runnable(cont);
+    }
+
+    return rc;
+}
+
+/**
+ * Attach a continuation to \c count operation requests.
+ *
+ * Null and empty requests count as already complete (their status is
+ * filled in right away). Non-persistent requests are taken over: the
+ * caller's handle is set to MPI_REQUEST_NULL. If all requests turn out to
+ * be complete already, the continuation is either executed immediately or,
+ * if the continuation request asks for it, enqueued.
+ *
+ * @return OMPI_SUCCESS, or OMPI_ERR_OUT_OF_RESOURCE if no continuation
+ *         object could be allocated (nothing has been attached then)
+ */
+int ompi_continue_attach(
+  ompi_request_t             *continuation_request,
+  const int                   count,
+  ompi_request_t             *requests[],
+  MPIX_Continue_cb_function  *cont_cb,
+  void                       *cont_data,
+  ompi_status_public_t        statuses[])
+{
+    assert(OMPI_REQUEST_CONT == continuation_request->req_type);
+
+    ompi_cont_request_t *cont_req = (ompi_cont_request_t *)continuation_request;
+    ompi_continuation_t *cont = ompi_continue_cont_create(count, cont_req, cont_cb,
+                                                          cont_data, statuses);
+    if (OPAL_UNLIKELY(NULL == cont)) {
+        return OMPI_ERR_OUT_OF_RESOURCE;
+    }
+
+    /* memory barrier to make sure a thread completing a request see
+     * a correct continuation object */
+    opal_atomic_wmb();
+
+    int32_t num_registered = 0;
+    for (int i = 0; i < count; ++i) {
+        ompi_request_t *request = requests[i];
+        if (MPI_REQUEST_NULL == request) {
+            /* set the status for null-request */
+            if (statuses != MPI_STATUSES_IGNORE) {
+                OMPI_COPY_STATUS(&statuses[i], ompi_status_empty, true);
+            }
+            continue;
+        }
+
+        if (&ompi_request_empty == request) {
+            /* empty request: do not modify, just copy out the status */
+            if (statuses != MPI_STATUSES_IGNORE) {
+                OMPI_COPY_STATUS(&statuses[i], request->req_status, true);
+            }
+            requests[i] = MPI_REQUEST_NULL;
+            continue;
+        }
+
+        ompi_request_cont_data_t *req_cont_data;
+        req_cont_data = (ompi_request_cont_data_t *)request->req_complete_cb_data;
+        if (!req_cont_data) {
+            req_cont_data = (ompi_request_cont_data_t *)opal_free_list_get(&ompi_request_cont_data_freelist);
+            /* NOTE: request->req_complete_cb_data will be set in ompi_request_set_callback */
+        } else {
+            assert(request->req_type == OMPI_REQUEST_CONT);
+        }
+        req_cont_data->cont_status = NULL;
+        if (statuses != MPI_STATUSES_IGNORE) {
+            req_cont_data->cont_status = &statuses[i];
+        }
+
+        req_cont_data->cont_obj = cont;
+
+        /* Setting the callback runs it right away if the request is
+         * already complete, and the callback frees non-persistent
+         * requests. Hence everything we need from the request has to be
+         * read before the callback is set. */
+        const bool persistent = request->req_persistent;
+
+        ompi_request_set_callback(request, &request_completion_cb, req_cont_data);
+        ++num_registered;
+
+        /* take ownership of any non-persistent request */
+        if (!persistent) {
+            requests[i] = MPI_REQUEST_NULL;
+        }
+    }
+
+    assert(count >= num_registered);
+    int num_complete = count - num_registered;
+    int32_t last_num_active = count;
+    if (num_complete > 0) {
+        last_num_active = OPAL_THREAD_ADD_FETCH32(&cont->num_active, -num_complete);
+    }
+    if (0 == last_num_active) {
+        if (cont_req->cont_enqueue_complete) {
+            /* enqueue for later processing */
+            ompi_continue_enqueue_runnable(cont);
+        } else {
+            /**
+             * Execute the continuation immediately
+             */
+            ompi_continue_cont_invoke(cont);
+        }
+    }
+
+    return OMPI_SUCCESS;
+}
+
+/**
+ * Continuation request management
+ */
+
+/**
+ * Allocate a continuation request and configure it from the info keys
+ * "mpi_continue_poll_only", "mpi_continue_enqueue_complete" and
+ * "mpi_continue_max_poll". The latter is only honored if it starts with a
+ * positive number; values beyond UINT32_MAX are clamped.
+ *
+ * @param[out] cont_req_ptr  receives the new request (only on success)
+ * @param[in]  info          info object to read the keys from
+ * @return OMPI_SUCCESS or OMPI_ERR_OUT_OF_RESOURCE
+ */
+int ompi_continue_allocate_request(ompi_request_t **cont_req_ptr, ompi_info_t *info)
+{
+    ompi_cont_request_t *cont_req = OBJ_NEW(ompi_cont_request_t);
+    if (OPAL_UNLIKELY(NULL == cont_req)) {
+        return OMPI_ERR_OUT_OF_RESOURCE;
+    }
+
+    int flag;
+    bool test_poll = false;
+    ompi_info_get_bool(info, "mpi_continue_poll_only", &test_poll, &flag);
+
+    if (flag && test_poll) {
+        cont_req->cont_complete_list = OBJ_NEW(opal_list_t);
+        if (OPAL_UNLIKELY(NULL == cont_req->cont_complete_list)) {
+            OBJ_RELEASE(cont_req);
+            return OMPI_ERR_OUT_OF_RESOURCE;
+        }
+    }
+
+    bool enqueue_complete = false;
+    ompi_info_get_bool(info, "mpi_continue_enqueue_complete", &enqueue_complete, &flag);
+    cont_req->cont_enqueue_complete = (flag && enqueue_complete);
+
+    opal_cstring_t *value_str;
+    ompi_info_get(info, "mpi_continue_max_poll", &value_str, &flag);
+    if (flag) {
+        /* strtol instead of atoi: out-of-range input is well defined
+         * (the result saturates) and "no number at all" can be detected */
+        char *end = NULL;
+        const long max_poll = strtol(value_str->string, &end, 10);
+        const bool has_number = (end != value_str->string);
+        OBJ_RELEASE(value_str);
+        if (has_number && max_poll > 0) {
+            cont_req->continue_max_poll =
+                ((unsigned long)max_poll > UINT32_MAX) ? UINT32_MAX : (uint32_t)max_poll;
+        }
+    }
+    *cont_req_ptr = &cont_req->super;
+
+    return OMPI_SUCCESS;
+}
+
+/**
+ * req_free hook of continuation requests: drop the caller's reference and
+ * reset the handle to the null request.
+ */
+static int ompi_continue_request_free(ompi_request_t** cont_req_ptr)
+{
+    ompi_cont_request_t *cont_req = (ompi_cont_request_t *)*cont_req_ptr;
+    assert(OMPI_REQUEST_CONT == cont_req->super.req_type);
+    OBJ_RELEASE(cont_req);
+    *cont_req_ptr = &ompi_request_null.request;
+    return OMPI_SUCCESS;
+}
diff --git a/ompi/mpiext/continue/c/continuation.h b/ompi/mpiext/continue/c/continuation.h
new file mode 100644
index 00000000000..6de7e73328f
--- /dev/null
+++ b/ompi/mpiext/continue/c/continuation.h
@@ -0,0 +1,83 @@
+/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
+/*
+ * Copyright (c) 2020-2022 High Performance Computing Center Stuttgart,
+ * University of Stuttgart. All rights reserved.
+ * Copyright (c) 2021 The University of Tennessee and The University
+ * of Tennessee Research Foundation. All rights
+ * reserved.
+ * $COPYRIGHT$
+ *
+ * Additional copyrights may follow
+ *
+ * $HEADER$
+ */
+
+#ifndef OMPI_CONTINUATION_H
+#define OMPI_CONTINUATION_H
+
+#include "ompi_config.h"
+#include "ompi/info/info.h"
+
+/* Workaround to prevent issues with static functions and circular dependency in request.h:
+ * request.h is included with OMPI_HAVE_MPI_EXT_CONTINUE undefined.
+ * The preprocessor cannot save the value of a macro in another macro
+ * (macros are expanded lazily, so "#define OLD X / #undef X / #define X OLD"
+ * leaves X referring to itself, which is 0 in an #if). Hence the macro is
+ * tested first and redefined to 1 afterwards if it was enabled. */
+#if defined(OMPI_HAVE_MPI_EXT_CONTINUE) && OMPI_HAVE_MPI_EXT_CONTINUE
+#undef OMPI_HAVE_MPI_EXT_CONTINUE
+#include "ompi/request/request.h"
+#define OMPI_HAVE_MPI_EXT_CONTINUE 1
+#else
+#include "ompi/request/request.h"
+#endif
+#include "mpi.h"
+#include "ompi/mpiext/continue/c/mpiext_continue_c.h"
+
+
+BEGIN_C_DECLS
+
+/**
+ * Initialize the user-callback infrastructure.
+ */
+int ompi_continuation_init(void);
+
+/**
+ * Finalize the user-callback infrastructure.
+ */
+int ompi_continuation_fini(void);
+
+/**
+ * Register a request with local completion list for progressing through
+ * the progress engine.
+ */
+int ompi_continue_register_request_progress(struct ompi_request_t *cont_req, ompi_wait_sync_t *sync);
+
+/**
+ * Deregister a request with local completion list from progressing through
+ * the progress engine.
+ */
+int ompi_continue_deregister_request_progress(struct ompi_request_t *cont_req);
+
+/**
+ * Progress a continuation request that has local completions.
+ */
+int ompi_continue_progress_request(struct ompi_request_t *cont_req);
+
+/**
+ * Attach a continuation to a set of operations represented by \c requests.
+ * The \c statuses will be set before the \c cont_cb callback is invoked and
+ * passed together with \c cont_data to the callback. Passing \c MPI_STATUSES_IGNORE
+ * is valid, in which case statuses are ignored.
+ * The continuation is registered with the continuation request \c cont_req, which
+ * can be used to query for and progress outstanding continuations.
+ */
+int ompi_continue_attach(
+  struct ompi_request_t      *cont_req,
+  int                         count,
+  struct ompi_request_t      *requests[],
+  MPIX_Continue_cb_function  *cont_cb,
+  void                       *cont_data,
+  ompi_status_public_t        statuses[]);
+
+
+/**
+ * Allocate a new continuation request.
+ */
+int ompi_continue_allocate_request(struct ompi_request_t **cont_req, ompi_info_t *info);
+
+END_C_DECLS
+
+#endif // OMPI_CONTINUATION_H
diff --git a/ompi/mpiext/continue/c/continue.c b/ompi/mpiext/continue/c/continue.c
new file mode 100644
index 00000000000..cb01a84250d
--- /dev/null
+++ b/ompi/mpiext/continue/c/continue.c
@@ -0,0 +1,65 @@
+/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
+/*
+ * Copyright (c) 2020 High Performance Computing Center Stuttgart,
+ * University of Stuttgart. All rights reserved.
+ * Copyright (c) 2021 The University of Tennessee and The University
+ * of Tennessee Research Foundation. All rights
+ * reserved.
+ * $COPYRIGHT$
+ *
+ * Additional copyrights may follow
+ *
+ * $HEADER$
+ */
+
+#include "ompi_config.h"
+#include <stdio.h>
+
+#include "ompi/mpi/c/bindings.h"
+#include "ompi/runtime/params.h"
+#include "ompi/communicator/communicator.h"
+#include "ompi/errhandler/errhandler.h"
+#include "ompi/mpiext/continue/c/continuation.h"
+#include "ompi/memchecker.h"
+
+#include "ompi/mpiext/continue/c/mpiext_continue_c.h"
+
+#if OMPI_BUILD_MPI_PROFILING
+#if OPAL_HAVE_WEAK_SYMBOLS
+#pragma weak MPIX_Continue = PMPIX_Continue
+#endif
+#define MPIX_Continue PMPIX_Continue
+#endif
+
+static const char FUNC_NAME[] = "MPIX_Continue";
+
+/**
+ * Attach the continuation \c cont_cb to a single operation request and
+ * register it with the continuation request \c cont_req.
+ *
+ * With parameter checking enabled, a NULL callback yields MPI_ERR_ARG and
+ * a NULL request (pointer or handle) or an invalid continuation request
+ * yields MPI_ERR_REQUEST.
+ */
+int MPIX_Continue(
+    MPI_Request       *request,
+    MPIX_Continue_cb_function *cont_cb,
+    void              *cb_data,
+    MPI_Status        *status,
+    MPI_Request        cont_req)
+{
+    int rc;
+
+    MEMCHECKER(
+        memchecker_request(request);
+    );
+
+    if (MPI_PARAM_CHECK) {
+        rc = MPI_SUCCESS;
+        OMPI_ERR_INIT_FINALIZE(FUNC_NAME);
+        if (NULL == cont_cb) {
+            /* the callback is invoked unconditionally later on */
+            rc = MPI_ERR_ARG;
+        }
+        if (NULL == request || NULL == *request) {
+            rc = MPI_ERR_REQUEST;
+        }
+        if (NULL == cont_req || MPI_REQUEST_NULL == cont_req ||
+            OMPI_REQUEST_CONT != cont_req->req_type) {
+            rc = MPI_ERR_REQUEST;
+        }
+        OMPI_ERRHANDLER_CHECK(rc, MPI_COMM_WORLD, rc, FUNC_NAME);
+    }
+
+    rc = ompi_continue_attach(cont_req, 1, request, cont_cb, cb_data,
+                              MPI_STATUS_IGNORE == status ? MPI_STATUSES_IGNORE : status);
+
+    OMPI_ERRHANDLER_RETURN(rc, MPI_COMM_WORLD, rc, FUNC_NAME);
+}
diff --git a/ompi/mpiext/continue/c/continue_init.c b/ompi/mpiext/continue/c/continue_init.c
new file mode 100644
index 00000000000..e7d1b4d02c7
--- /dev/null
+++ b/ompi/mpiext/continue/c/continue_init.c
@@ -0,0 +1,58 @@
+/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
+/*
+ * Copyright (c) 2020 High Performance Computing Center Stuttgart,
+ * University of Stuttgart. All rights reserved.
+ * Copyright (c) 2021 The University of Tennessee and The University
+ * of Tennessee Research Foundation. All rights
+ * reserved.
+ * $COPYRIGHT$
+ *
+ * Additional copyrights may follow
+ *
+ * $HEADER$
+ */
+
+#include "ompi_config.h"
+#include <stdio.h>
+
+#include "ompi/mpi/c/bindings.h"
+#include "ompi/runtime/params.h"
+#include "ompi/communicator/communicator.h"
+#include "ompi/errhandler/errhandler.h"
+#include "ompi/mpiext/continue/c/continuation.h"
+#include "ompi/memchecker.h"
+
+#include "ompi/mpiext/continue/c/mpiext_continue_c.h"
+
+#if OMPI_BUILD_MPI_PROFILING
+#if OPAL_HAVE_WEAK_SYMBOLS
+#pragma weak MPIX_Continue_init = PMPIX_Continue_init
+#endif
+#define MPIX_Continue_init PMPIX_Continue_init
+#endif
+
+static const char FUNC_NAME[] = "MPIX_Continue_init";
+
+/**
+ * Create a continuation request configured from \c info.
+ *
+ * \c *cont_req is only written on success. With parameter checking
+ * enabled, a NULL \c cont_req yields MPI_ERR_ARG and a NULL \c info
+ * yields MPI_ERR_INFO.
+ */
+int MPIX_Continue_init(MPI_Request *cont_req, MPI_Info info)
+{
+    int rc = MPI_SUCCESS;
+
+    if (MPI_PARAM_CHECK) {
+        OMPI_ERR_INIT_FINALIZE(FUNC_NAME);
+        if (NULL == info) {
+            /* the info object is dereferenced when reading the keys */
+            rc = MPI_ERR_INFO;
+        }
+        if (NULL == cont_req) {
+            rc = MPI_ERR_ARG;
+        }
+        OMPI_ERRHANDLER_CHECK(rc, MPI_COMM_WORLD, rc, FUNC_NAME);
+    }
+
+
+    ompi_request_t *res;
+    rc = ompi_continue_allocate_request(&res, info);
+
+    if (MPI_SUCCESS == rc) {
+        *cont_req = res;
+    }
+
+    OMPI_ERRHANDLER_RETURN(rc, MPI_COMM_WORLD, rc, FUNC_NAME);
+}
diff --git
a/ompi/mpiext/continue/c/continueall.c b/ompi/mpiext/continue/c/continueall.c
new file mode 100644
index 00000000000..b7c15079b67
--- /dev/null
+++ b/ompi/mpiext/continue/c/continueall.c
@@ -0,0 +1,76 @@
+/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
+/*
+ * Copyright (c) 2020 High Performance Computing Center Stuttgart,
+ * University of Stuttgart. All rights reserved.
+ * Copyright (c) 2021 The University of Tennessee and The University
+ * of Tennessee Research Foundation. All rights
+ * reserved.
+ * $COPYRIGHT$
+ *
+ * Additional copyrights may follow
+ *
+ * $HEADER$
+ */
+
+#include "ompi_config.h"
+#include <stdio.h>
+
+#include "ompi/mpi/c/bindings.h"
+#include "ompi/runtime/params.h"
+#include "ompi/communicator/communicator.h"
+#include "ompi/errhandler/errhandler.h"
+#include "ompi/mpiext/continue/c/continuation.h"
+#include "ompi/memchecker.h"
+
+#include "ompi/mpiext/continue/c/mpiext_continue_c.h"
+
+#if OMPI_BUILD_MPI_PROFILING
+#if OPAL_HAVE_WEAK_SYMBOLS
+#pragma weak MPIX_Continueall = PMPIX_Continueall
+#endif
+#define MPIX_Continueall PMPIX_Continueall
+#endif
+
+static const char FUNC_NAME[] = "MPIX_Continueall";
+
+/**
+ * Attach the continuation \c cont_cb to \c count operation requests and
+ * register it with the continuation request \c cont_req.
+ *
+ * With parameter checking enabled, a negative count or a NULL callback
+ * yields MPI_ERR_ARG; a missing request array, a NULL entry in it or an
+ * invalid continuation request yields MPI_ERR_REQUEST.
+ */
+int MPIX_Continueall(
+    int                count,
+    MPI_Request        requests[],
+    MPIX_Continue_cb_function *cont_cb,
+    void              *cont_data,
+    MPI_Status         statuses[],
+    MPI_Request        cont_req)
+{
+    int rc;
+
+    MEMCHECKER(
+        for (int j = 0; j < count; j++){
+            memchecker_request(&requests[j]);
+        }
+    );
+
+
+    if (MPI_PARAM_CHECK) {
+        rc = MPI_SUCCESS;
+        OMPI_ERR_INIT_FINALIZE(FUNC_NAME);
+        if (count < 0 || NULL == cont_cb) {
+            /* a negative count would leave the continuation pending forever,
+             * a NULL callback would be invoked later on */
+            rc = MPI_ERR_ARG;
+        }
+        if (NULL == cont_req || MPI_REQUEST_NULL == cont_req ||
+            OMPI_REQUEST_CONT != cont_req->req_type) {
+            rc = MPI_ERR_REQUEST;
+        }
+        if( (NULL == requests) && (0 != count) ) {
+            rc = MPI_ERR_REQUEST;
+        } else {
+            for (int i = 0; i < count; i++) {
+                if (NULL == requests[i]) {
+                    rc = MPI_ERR_REQUEST;
+                    break;
+                }
+            }
+        }
+        OMPI_ERRHANDLER_CHECK(rc, MPI_COMM_WORLD, rc, FUNC_NAME);
+    }
+
+    rc = ompi_continue_attach(cont_req, count, requests, cont_cb,
+                              cont_data, statuses);
+
+    OMPI_ERRHANDLER_RETURN(rc, MPI_COMM_WORLD, rc, FUNC_NAME);
+}
diff --git a/ompi/mpiext/continue/c/help-mpi-continue.txt b/ompi/mpiext/continue/c/help-mpi-continue.txt
new file mode 100644
index 00000000000..f7cf4598b4b
--- /dev/null
+++ b/ompi/mpiext/continue/c/help-mpi-continue.txt
@@ -0,0 +1,16 @@
+# -*- text -*-
+#
+# Copyright (c) 2021 The University of Tennessee and The University
+# of Tennessee Research Foundation. All rights
+# reserved.
+# $COPYRIGHT$
+#
+# Additional copyrights may follow
+#
+# $HEADER$
+#
+# This is the US/English general help file for the OMPI Continuations extension.
+#
+[continue:incomplete_shutdown]
+WARNING: Found %zu incomplete continuations during shutdown!
+#
diff --git a/ompi/mpiext/continue/c/mpiext_continue_c.h b/ompi/mpiext/continue/c/mpiext_continue_c.h
new file mode 100644
index 00000000000..e3dceba1a15
--- /dev/null
+++ b/ompi/mpiext/continue/c/mpiext_continue_c.h
@@ -0,0 +1,22 @@
+/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
+/*
+ * Copyright (c) 2020 High Performance Computing Center Stuttgart,
+ * University of Stuttgart. All rights reserved.
+ * Copyright (c) 2021 The University of Tennessee and The University
+ * of Tennessee Research Foundation. All rights
+ * reserved.
+ * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#include + +typedef void (MPIX_Continue_cb_function)(MPI_Status *statuses, void *user_data); +OMPI_DECLSPEC int MPIX_Continue_init(MPI_Request *cont_req, MPI_Info info); +OMPI_DECLSPEC int MPIX_Continue(MPI_Request *request, MPIX_Continue_cb_function *cb, void *cb_data, + MPI_Status *status, MPI_Request cont_req); +OMPI_DECLSPEC int MPIX_Continueall(int count, MPI_Request request[], MPIX_Continue_cb_function *cb, void *cb_data, + MPI_Status status[], MPI_Request cont_req); diff --git a/ompi/mpiext/continue/c/mpiext_continue_module.c b/ompi/mpiext/continue/c/mpiext_continue_module.c new file mode 100644 index 00000000000..05db810d6de --- /dev/null +++ b/ompi/mpiext/continue/c/mpiext_continue_module.c @@ -0,0 +1,26 @@ +/** + * Copyright (c) 2021 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#include "ompi_config.h" + +#include "ompi/mpiext/mpiext.h" +#include "ompi/mpiext/continue/c/continuation.h" + +/* + * Similar to Open MPI components, a well-known struct provides + * function pointers to the extension's init/fini hooks. The struct + * must be a global symbol of the form ompi_mpiext_ and be + * of type ompi_mpiext_component_t. + */ +ompi_mpiext_component_t ompi_mpiext_continue = { + ompi_continuation_init, + ompi_continuation_fini +}; diff --git a/ompi/mpiext/continue/c/profile/Makefile.am b/ompi/mpiext/continue/c/profile/Makefile.am new file mode 100644 index 00000000000..1eab74e4be6 --- /dev/null +++ b/ompi/mpiext/continue/c/profile/Makefile.am @@ -0,0 +1,37 @@ +# +# Copyright (c) 2021 The University of Tennessee and The University +# of Tennessee Research Foundation. All rights +# reserved. 
+# $COPYRIGHT$ +# +# Additional copyrights may follow +# +# $HEADER$ +# + +# OMPI_BUILD_MPI_PROFILING is enabled when we want our generated MPI_* symbols +# to be replaced by PMPI_*. +# In this directory, we need it to be 1 + +AM_CPPFLAGS = -DOMPI_BUILD_MPI_PROFILING=1 + +noinst_LTLIBRARIES = libpmpiext_continue_c.la + +nodist_libpmpiext_continue_c_la_SOURCES = \ + pcontinue.c \ + pcontinueall.c \ + pcontinue_init.c + +# +# Sym link in the sources from the real MPI directory +# +$(nodist_libpmpiext_continue_c_la_SOURCES): + $(OMPI_V_LN_S) if test ! -r $@ ; then \ + pname=`echo $@ | cut -b '2-'` ; \ + $(LN_S) $(top_srcdir)/ompi/mpiext/continue/c/$$pname $@ ; \ + fi + + +# These files were created by targets above + +MAINTAINERCLEANFILES = $(nodist_libpmpiext_continue_c_la_SOURCES) diff --git a/ompi/mpiext/continue/configure.m4 b/ompi/mpiext/continue/configure.m4 new file mode 100644 index 00000000000..1907dfa8767 --- /dev/null +++ b/ompi/mpiext/continue/configure.m4 @@ -0,0 +1,35 @@ +# -*- shell-script -*- +# +# Copyright (c) 2021 The University of Tennessee and The University +# of Tennessee Research Foundation. All rights +# reserved.
+# $COPYRIGHT$ +# +# Additional copyrights may follow +# +# $HEADER$ +# + +# OMPI_MPIEXT_continue_CONFIG([action-if-found], [action-if-not-found]) +# ----------------------------------------------------------- +AC_DEFUN([OMPI_MPIEXT_continue_CONFIG],[ + AC_CONFIG_FILES([ompi/mpiext/continue/Makefile]) + AC_CONFIG_FILES([ompi/mpiext/continue/c/Makefile]) + AC_CONFIG_FILES([ompi/mpiext/continue/c/profile/Makefile]) + + # This module is not stable yet so it should only be built + # if explicitly requested + AS_IF([test "$ENABLE_continue" = "1"], + [$1], + [$2]) +])dnl + +# we need init/finalize +AC_DEFUN([OMPI_MPIEXT_continue_NEED_INIT], [1]) + +AC_DEFUN([OMPI_MPIEXT_continue_POST_CONFIG], [ + AS_IF([test "$ENABLE_continue" = "1"], + [AC_DEFINE_UNQUOTED([OMPI_HAVE_MPI_EXT_CONTINUE], [1], + [Whether MPI Continuations are enabled])], + []) +]) diff --git a/ompi/mpiext/continue/owner.txt b/ompi/mpiext/continue/owner.txt new file mode 100644 index 00000000000..bc6c5ecb028 --- /dev/null +++ b/ompi/mpiext/continue/owner.txt @@ -0,0 +1,7 @@ +# +# owner/status file +# owner: institution that is responsible for this package +# status: e.g. 
active, maintenance, unmaintained +# +owner: UTK +status: active diff --git a/ompi/request/req_test.c b/ompi/request/req_test.c index cd04645a0c0..b87e22c661b 100644 --- a/ompi/request/req_test.c +++ b/ompi/request/req_test.c @@ -26,6 +26,10 @@ #include "ompi/request/request_default.h" #include "ompi/request/grequest.h" +#if OMPI_HAVE_MPI_EXT_CONTINUE +#include "ompi/mpiext/continue/c/continuation.h" +#endif /* OMPI_HAVE_MPI_EXT_CONTINUE */ + int ompi_request_default_test(ompi_request_t ** rptr, int *completed, ompi_status_public_t * status ) @@ -57,6 +61,14 @@ int ompi_request_default_test(ompi_request_t ** rptr, if (MPI_STATUS_IGNORE != status) { OMPI_COPY_STATUS(status, request->req_status, false); } + +#if OMPI_HAVE_MPI_EXT_CONTINUE + if (OMPI_REQUEST_CONT == request->req_type) { + /* continuation requests are always active, don't modify the state */ + return request->req_status.MPI_ERROR; + } +#endif // OMPI_HAVE_MPI_EXT_CONTINUE + if( request->req_persistent ) { request->req_state = OMPI_REQUEST_INACTIVE; return request->req_status.MPI_ERROR; @@ -87,7 +99,17 @@ int ompi_request_default_test(ompi_request_t ** rptr, * leaving. We will call the opal_progress only once per call. */ ++do_it_once; - if (0 != opal_progress()) { + int rc = 0; + +#if OMPI_HAVE_MPI_EXT_CONTINUE + if (OMPI_REQUEST_CONT == request->req_type) { + /* continuations may elect to not participate in global progress + * so progress them separately.
*/ + rc = ompi_continue_progress_request(request); + } +#endif // OMPI_HAVE_MPI_EXT_CONTINUE + + if (rc != 0 || 0 != opal_progress()) { goto recheck_request_status; } } @@ -132,6 +154,13 @@ int ompi_request_default_test_any( OMPI_COPY_STATUS(status, request->req_status, false); } +#if OMPI_HAVE_MPI_EXT_CONTINUE + if (OMPI_REQUEST_CONT == request->req_type) { + /* continuation requests are always active, don't modify the state */ + return OMPI_SUCCESS; + } +#endif // OMPI_HAVE_MPI_EXT_CONTINUE + if( request->req_persistent ) { request->req_state = OMPI_REQUEST_INACTIVE; return OMPI_SUCCESS; @@ -155,6 +184,14 @@ int ompi_request_default_test_any( return MPI_ERR_PROC_FAILED_PENDING; } #endif /* OPAL_ENABLE_FT_MPI */ + +#if OMPI_HAVE_MPI_EXT_CONTINUE + if (OMPI_REQUEST_CONT == request->req_type) { + /* continuations may elect to not participate in global progress + * so progress them separately. */ + ompi_continue_progress_request(request); + } +#endif // OMPI_HAVE_MPI_EXT_CONTINUE } /* Only fall through here if we found nothing */ @@ -194,6 +231,15 @@ int ompi_request_default_test_all( num_completed++; continue; } + +#if OMPI_HAVE_MPI_EXT_CONTINUE + if (OMPI_REQUEST_CONT == request->req_type) { + /* continuations may elect to not participate in global progress + * so progress them separately.
*/ + ompi_continue_progress_request(request); + } +#endif // OMPI_HAVE_MPI_EXT_CONTINUE + #if OPAL_ENABLE_FT_MPI /* Check for dead requests due to process failure */ /* Special case for MPI_ANY_SOURCE */ @@ -217,8 +263,11 @@ int ompi_request_default_test_all( } } #endif /* OPAL_ENABLE_PROGRESS_THREADS */ - /* short-circuit */ + +#if !OMPI_HAVE_MPI_EXT_CONTINUE + /* short-circuit, unless there may be continuation requests */ break; +#endif /* !OMPI_HAVE_MPI_EXT_CONTINUE */ } if (num_completed != count) { @@ -245,6 +294,14 @@ int ompi_request_default_test_all( ompi_grequest_invoke_query(request, &request->req_status); } OMPI_COPY_STATUS(&statuses[i], request->req_status, true); + +#if OMPI_HAVE_MPI_EXT_CONTINUE + if (OMPI_REQUEST_CONT == request->req_type) { + /* continuation requests are always active, don't modify the state */ + continue; + } +#endif // OMPI_HAVE_MPI_EXT_CONTINUE + if( request->req_persistent ) { request->req_state = OMPI_REQUEST_INACTIVE; continue; @@ -280,6 +337,14 @@ int ompi_request_default_test_all( if (OMPI_REQUEST_GEN == request->req_type) { ompi_grequest_invoke_query(request, &request->req_status); } + +#if OMPI_HAVE_MPI_EXT_CONTINUE + if (OMPI_REQUEST_CONT == request->req_type) { + /* continuation requests are always active, don't modify the state */ + continue; + } +#endif // OMPI_HAVE_MPI_EXT_CONTINUE + if( request->req_persistent ) { request->req_state = OMPI_REQUEST_INACTIVE; continue; @@ -330,6 +395,7 @@ int ompi_request_default_test_some( indices[num_requests_done++] = i; continue; } + #if OPAL_ENABLE_FT_MPI /* Check for dead requests due to process failure */ /* Special case for MPI_ANY_SOURCE - Error managed below */ @@ -338,6 +404,14 @@ int ompi_request_default_test_some( indices[num_requests_done++] = i; } #endif /* OPAL_ENABLE_FT_MPI */ + +#if OMPI_HAVE_MPI_EXT_CONTINUE + if (OMPI_REQUEST_CONT == request->req_type) { + /* continuations may elect to not participate in global progress + * so progress them separately.
*/ + ompi_continue_progress_request(request); + } +#endif // OMPI_HAVE_MPI_EXT_CONTINUE } /* @@ -393,6 +467,13 @@ int ompi_request_default_test_some( #endif /* OPAL_ENABLE_FT_MPI */ } +#if OMPI_HAVE_MPI_EXT_CONTINUE + if (OMPI_REQUEST_CONT == request->req_type) { + /* continuation requests are always active, don't modify the state */ + continue; + } +#endif // OMPI_HAVE_MPI_EXT_CONTINUE + if( request->req_persistent ) { request->req_state = OMPI_REQUEST_INACTIVE; } else { diff --git a/ompi/request/req_wait.c b/ompi/request/req_wait.c index 14a8dcbf134..2c64af2768c 100644 --- a/ompi/request/req_wait.c +++ b/ompi/request/req_wait.c @@ -31,6 +31,10 @@ #include "ompi/request/request_default.h" #include "ompi/request/grequest.h" +#if OMPI_HAVE_MPI_EXT_CONTINUE +#include "ompi/mpiext/continue/c/continuation.h" +#endif /* OMPI_HAVE_MPI_EXT_CONTINUE */ + int ompi_request_default_wait( ompi_request_t ** req_ptr, ompi_status_public_t * status) @@ -58,6 +62,14 @@ int ompi_request_default_wait( if( MPI_STATUS_IGNORE != status ) { OMPI_COPY_STATUS(status, req->req_status, false); } + +#if OMPI_HAVE_MPI_EXT_CONTINUE + if (OMPI_REQUEST_CONT == req->req_type) { + /* continuation requests are always active, don't modify the state */ + return req->req_status.MPI_ERROR; + } +#endif // OMPI_HAVE_MPI_EXT_CONTINUE + if( req->req_persistent ) { if( req->req_state == OMPI_REQUEST_INACTIVE ) { if (MPI_STATUS_IGNORE != status) { @@ -65,6 +77,14 @@ int ompi_request_default_wait( } return OMPI_SUCCESS; } + +#if OMPI_HAVE_MPI_EXT_CONTINUE + if (OMPI_REQUEST_CONT == req->req_type) { + /* continuation requests are always active, don't modify the state */ + return req->req_status.MPI_ERROR; + } +#endif // OMPI_HAVE_MPI_EXT_CONTINUE + req->req_state = OMPI_REQUEST_INACTIVE; return req->req_status.MPI_ERROR; } @@ -91,6 +111,9 @@ int ompi_request_default_wait_any(size_t count, int rc = OMPI_SUCCESS; ompi_request_t *request=NULL; ompi_wait_sync_t sync; +#if OMPI_HAVE_MPI_EXT_CONTINUE + bool
have_cont_req = false; +#endif /* OMPI_HAVE_MPI_EXT_CONTINUE */ if (OPAL_UNLIKELY(0 == count)) { *index = MPI_UNDEFINED; @@ -122,6 +145,13 @@ int ompi_request_default_wait_any(size_t count, } } +#if OMPI_HAVE_MPI_EXT_CONTINUE + if (OMPI_REQUEST_CONT == request->req_type) { + have_cont_req = true; + ompi_continue_register_request_progress(request, &sync); + } +#endif /* OMPI_HAVE_MPI_EXT_CONTINUE */ + #if OPAL_ENABLE_FT_MPI if(OPAL_UNLIKELY( ompi_request_is_failed(request) )) { completed = i; @@ -144,6 +174,19 @@ int ompi_request_default_wait_any(size_t count, rc = SYNC_WAIT(&sync); after_sync_wait: + +#if OMPI_HAVE_MPI_EXT_CONTINUE + if (have_cont_req) { + have_cont_req = false; + for (i = 0; i < count; i++) { + request = requests[i]; + if (OMPI_REQUEST_CONT == request->req_type) { + ompi_continue_deregister_request_progress(request); + } + } + } +#endif /* OMPI_HAVE_MPI_EXT_CONTINUE */ + /* recheck the complete status and clean up the sync primitives. * Do it backward to return the earliest complete request to the * user. @@ -178,7 +221,7 @@ int ompi_request_default_wait_any(size_t count, if( *index == (int)completed ) { /* Only one request has triggered. There was no in-flight * completions. 
Drop the signalled flag so we won't block - * in WAIT_SYNC_RELEASE + * in WAIT_SYNC_RELEASE */ WAIT_SYNC_SIGNALLED(&sync); } @@ -202,7 +245,14 @@ int ompi_request_default_wait_any(size_t count, } rc = request->req_status.MPI_ERROR; if( request->req_persistent ) { +#if OMPI_HAVE_MPI_EXT_CONTINUE + if (OMPI_REQUEST_CONT != request->req_type) { + request->req_state = OMPI_REQUEST_INACTIVE; + } +#else // OMPI_HAVE_MPI_EXT_CONTINUE request->req_state = OMPI_REQUEST_INACTIVE; +#endif // OMPI_HAVE_MPI_EXT_CONTINUE + } else if (MPI_SUCCESS == rc) { /* Only free the request if there is no error on it */ /* If there's an error while freeing the request, @@ -252,6 +302,12 @@ int ompi_request_default_wait_all( size_t count, } } +#if OMPI_HAVE_MPI_EXT_CONTINUE + if (OMPI_REQUEST_CONT == request->req_type) { + ompi_continue_register_request_progress(request, &sync); + } +#endif /* OMPI_HAVE_MPI_EXT_CONTINUE */ + #if OPAL_ENABLE_FT_MPI if(OPAL_UNLIKELY( ompi_request_is_failed(request) )) { failed++; @@ -308,6 +364,12 @@ int ompi_request_default_wait_all( size_t count, request = *rptr; +#if OMPI_HAVE_MPI_EXT_CONTINUE + if (OMPI_REQUEST_CONT == request->req_type) { + ompi_continue_deregister_request_progress(request); + } +#endif /* OMPI_HAVE_MPI_EXT_CONTINUE */ + if( request->req_state == OMPI_REQUEST_INACTIVE ) { OMPI_COPY_STATUS(&statuses[i], ompi_status_empty, true); continue; @@ -346,6 +408,14 @@ int ompi_request_default_wait_all( size_t count, OMPI_COPY_STATUS(&statuses[i], request->req_status, true); + +#if OMPI_HAVE_MPI_EXT_CONTINUE + if (OMPI_REQUEST_CONT == request->req_type) { + /* continuation requests are always active, don't modify the state */ + continue; + } +#endif // OMPI_HAVE_MPI_EXT_CONTINUE + if( request->req_persistent ) { request->req_state = OMPI_REQUEST_INACTIVE; continue; @@ -372,6 +442,12 @@ int ompi_request_default_wait_all( size_t count, request = *rptr; +#if OMPI_HAVE_MPI_EXT_CONTINUE + if (OMPI_REQUEST_CONT == request->req_type) { +
ompi_continue_deregister_request_progress(request); + } +#endif /* OMPI_HAVE_MPI_EXT_CONTINUE */ + if( request->req_state == OMPI_REQUEST_INACTIVE ) { rc = ompi_status_empty.MPI_ERROR; goto absorb_error_and_continue; @@ -411,6 +487,13 @@ int ompi_request_default_wait_all( size_t count, rc = request->req_status.MPI_ERROR; +#if OMPI_HAVE_MPI_EXT_CONTINUE + if (OMPI_REQUEST_CONT == request->req_type) { + /* continuation requests are always active, don't modify the state */ + continue; + } +#endif // OMPI_HAVE_MPI_EXT_CONTINUE + if( request->req_persistent ) { request->req_state = OMPI_REQUEST_INACTIVE; } else if (MPI_SUCCESS == rc) { @@ -490,6 +573,12 @@ int ompi_request_default_wait_some(size_t count, } } +#if OMPI_HAVE_MPI_EXT_CONTINUE + if (OMPI_REQUEST_CONT == request->req_type) { + ompi_continue_register_request_progress(request, &sync); + } +#endif /* OMPI_HAVE_MPI_EXT_CONTINUE */ + #if OPAL_ENABLE_FT_MPI if(OPAL_UNLIKELY( ompi_request_is_failed(request) )) { num_requests_done++; @@ -523,6 +612,12 @@ int ompi_request_default_wait_some(size_t count, request = *rptr; +#if OMPI_HAVE_MPI_EXT_CONTINUE + if (OMPI_REQUEST_CONT == request->req_type) { + ompi_continue_deregister_request_progress(request); + } +#endif /* OMPI_HAVE_MPI_EXT_CONTINUE */ + if( request->req_state == OMPI_REQUEST_INACTIVE ) { continue; } @@ -530,14 +625,14 @@ int ompi_request_default_wait_some(size_t count, * a) request was found completed in the first loop * => ( indices[i] == 0 ) * b) request was completed between first loop and this check - * => ( indices[i] == 1 ) and we can NOT atomically mark the + * => ( indices[i] == 1 ) and we can NOT atomically mark the * request as pending. * c) request wasn't finished yet - * => ( indices[i] == 1 ) and we CAN atomically mark the + * => ( indices[i] == 1 ) and we CAN atomically mark the * request as pending.
* NOTE that in any case (i >= num_requests_done) as latter grows * either slowly (in case of partial completion) - * OR in parallel with `i` (in case of full set completion) + * OR in parallel with `i` (in case of full set completion) */ if( !indices[num_active_reqs] ) { indices[num_requests_done++] = i; @@ -607,6 +702,13 @@ int ompi_request_default_wait_some(size_t count, rc = MPI_ERR_IN_STATUS; } +#if OMPI_HAVE_MPI_EXT_CONTINUE + if (OMPI_REQUEST_CONT == request->req_type) { + /* continuation requests are always active, don't modify the state */ + continue; + } +#endif // OMPI_HAVE_MPI_EXT_CONTINUE + if( request->req_persistent ) { request->req_state = OMPI_REQUEST_INACTIVE; } else { diff --git a/ompi/request/request.h b/ompi/request/request.h index 0e6fb80cbf7..930578a20e2 100644 --- a/ompi/request/request.h +++ b/ompi/request/request.h @@ -40,6 +40,10 @@ #include "ompi/constants.h" #include "ompi/runtime/params.h" +#if OMPI_HAVE_MPI_EXT_CONTINUE +#include "ompi/mpiext/continue/c/continuation.h" +#endif /* OMPI_HAVE_MPI_EXT_CONTINUE */ + BEGIN_C_DECLS /** @@ -465,7 +469,20 @@ static inline void ompi_request_wait_completion(ompi_request_t *req) WAIT_SYNC_INIT(&sync, 1); if (OPAL_ATOMIC_COMPARE_EXCHANGE_STRONG_PTR(&req->req_complete, &_tmp_ptr, &sync)) { +#if OMPI_HAVE_MPI_EXT_CONTINUE + if (OMPI_REQUEST_CONT == req->req_type) { + /* let the continuations be processed as part of the global progress loop + * while we're waiting for their completion */ + ompi_continue_register_request_progress(req, &sync); + } +#endif /* OMPI_HAVE_MPI_EXT_CONTINUE */ SYNC_WAIT(&sync); + +#if OMPI_HAVE_MPI_EXT_CONTINUE + if (OMPI_REQUEST_CONT == req->req_type) { + ompi_continue_deregister_request_progress(req); + } +#endif /* OMPI_HAVE_MPI_EXT_CONTINUE */ } else { /* completed before we had a chance to swap in the sync object */ WAIT_SYNC_SIGNALLED(&sync); @@ -487,6 +504,13 @@ static inline void ompi_request_wait_completion(ompi_request_t *req) } opal_atomic_rmb(); } else { +#if
OMPI_HAVE_MPI_EXT_CONTINUE + if (OMPI_REQUEST_CONT == req->req_type) { + /* let the continuations be processed as part of the global progress loop + * while we're waiting for their completion */ + ompi_continue_register_request_progress(req, NULL); + } +#endif /* OMPI_HAVE_MPI_EXT_CONTINUE */ while(!REQUEST_COMPLETE(req)) { opal_progress(); #if OPAL_ENABLE_FT_MPI @@ -497,6 +521,12 @@ static inline void ompi_request_wait_completion(ompi_request_t *req) } #endif /* OPAL_ENABLE_FT_MPI */ } + +#if OMPI_HAVE_MPI_EXT_CONTINUE + if (OMPI_REQUEST_CONT == req->req_type) { + ompi_continue_deregister_request_progress(req); + } +#endif /* OMPI_HAVE_MPI_EXT_CONTINUE */ } } diff --git a/ompi/request/request_dbg.h b/ompi/request/request_dbg.h index e6aa3757d06..6c95d224a1b 100644 --- a/ompi/request/request_dbg.h +++ b/ompi/request/request_dbg.h @@ -29,6 +29,7 @@ typedef enum { OMPI_REQUEST_NOOP, /**< A request that does nothing (e.g., to PROC_NULL) */ OMPI_REQUEST_COMM, /**< MPI-3 non-blocking communicator duplication */ OMPI_REQUEST_PART, /**< MPI-4 partitioned communication request */ + OMPI_REQUEST_CONT, /**< MPI-X continuation request */ OMPI_REQUEST_MAX /**< Maximum request type */ } ompi_request_type_t; diff --git a/opal/mca/threads/base/wait_sync.c b/opal/mca/threads/base/wait_sync.c index 2451419620e..53d85004dfe 100644 --- a/opal/mca/threads/base/wait_sync.c +++ b/opal/mca/threads/base/wait_sync.c @@ -98,7 +98,15 @@ int ompi_sync_wait_mt(ompi_wait_sync_t *sync) */ check_status: if (sync != wait_sync_list && num_thread_in_progress >= opal_max_thread_in_progress) { - opal_thread_internal_cond_wait(&sync->condition, &sync->lock); + if (0 < sync->num_req_need_progress) { + /* release the lock so that we can be signaled */ + opal_thread_internal_mutex_unlock(&sync->lock); + sync->progress_cb(); + /* retake the lock */ + opal_thread_internal_mutex_lock(&sync->lock); + } else { + opal_thread_internal_cond_wait(&sync->condition, &sync->lock); + } /** * At this point either the 
sync was completed in which case diff --git a/opal/mca/threads/wait_sync.h b/opal/mca/threads/wait_sync.h index c90e3d52a5c..f7bffc392e1 100644 --- a/opal/mca/threads/wait_sync.h +++ b/opal/mca/threads/wait_sync.h @@ -46,6 +46,8 @@ typedef struct ompi_wait_sync_t { opal_thread_internal_mutex_t lock; struct ompi_wait_sync_t *next; struct ompi_wait_sync_t *prev; + opal_progress_callback_t progress_cb; + opal_atomic_int32_t num_req_need_progress; volatile bool signaling; } ompi_wait_sync_t; @@ -119,6 +121,8 @@ static inline int sync_wait_st(ompi_wait_sync_t *sync) opal_thread_internal_cond_init(&(sync)->condition); \ opal_thread_internal_mutex_init(&(sync)->lock, false); \ } \ + (sync)->progress_cb = NULL; \ + (sync)->num_req_need_progress = 0; \ } while (0) /** diff --git a/test/continuations/Makefile.am b/test/continuations/Makefile.am new file mode 100644 index 00000000000..75a8ed59aa9 --- /dev/null +++ b/test/continuations/Makefile.am @@ -0,0 +1,33 @@ +# Copyright (c) 2018 Los Alamos National Security, LLC. All rights reserved. 
+# +# $COPYRIGHT$ +# +# Additional copyrights may follow +# +# $HEADER$ +# + +if PROJECT_OMPI + noinst_PROGRAMS = continuations continuations-mt continuations-persistent + continuations_SOURCES = continuations.c + continuations_LDFLAGS = $(OMPI_PKG_CONFIG_LDFLAGS) + continuations_LDADD = \ + $(top_builddir)/ompi/lib@OMPI_LIBMPI_NAME@.la \ + $(top_builddir)/opal/lib@OPAL_LIB_NAME@.la + + continuations_mt_SOURCES = continuations-mt.c + continuations_mt_LDFLAGS = $(OMPI_PKG_CONFIG_LDFLAGS) + continuations_mt_LDADD = \ + $(top_builddir)/ompi/lib@OMPI_LIBMPI_NAME@.la \ + $(top_builddir)/opal/lib@OPAL_LIB_NAME@.la + + continuations_persistent_SOURCES = continuations-persistent.c + continuations_persistent_LDFLAGS = $(OMPI_PKG_CONFIG_LDFLAGS) + continuations_persistent_LDADD = \ + $(top_builddir)/ompi/lib@OMPI_LIBMPI_NAME@.la \ + $(top_builddir)/opal/lib@OPAL_LIB_NAME@.la +endif # PROJECT_OMPI + +distclean-local: + rm -rf *.dSYM .deps .libs *.log *.o *.trs $(noinst_PROGRAMS) Makefile + diff --git a/test/continuations/continutions-mt.c b/test/continuations/continutions-mt.c new file mode 100644 index 00000000000..5d7f99707c0 --- /dev/null +++ b/test/continuations/continutions-mt.c @@ -0,0 +1,164 @@ +/* + * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana + * University Research and Technology + * Corporation. All rights reserved. + * Copyright (c) 2004-2005 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2004-2005 The Regents of the University of California. + * All rights reserved. + * Copyright (c) 2016 Research Organization for Information Science + * and Technology (RIST). All rights reserved. + * Copyright (c) 2018 Los Alamos National Security, LLC. All rights reserved.
+ * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#include +#include +#include +#include +#include +#include + +#include "mpi.h" +#include "mpi-ext.h" +//#include "ompi/mpiext/continue/c/mpiext_continue_c.h" + +#define OMPI_HAVE_MPI_EXT_CONTINUE + +#ifdef OMPI_HAVE_MPI_EXT_CONTINUE + +/* Block a thread on a receive until we release it from the main thread */ +static void* thread_recv(void* data) { + MPI_Request req; + int val; + int rank; + MPI_Comm_rank(MPI_COMM_WORLD, &rank); + MPI_Irecv(&val, 1, MPI_INT, rank, 1002, MPI_COMM_WORLD, &req); + MPI_Wait(&req, MPI_STATUS_IGNORE); + return NULL; +} + +static void complete_cnt_cb(MPI_Status *status, void *user_data) { + assert(user_data != NULL); + _Atomic int *cb_cnt = (_Atomic int*)user_data; + ++(*cb_cnt); +} + +int main(int argc, char *argv[]) +{ + MPI_Request cont_req, op_req, reqs[2]; + _Atomic int cb_cnt; + int val; + int rank, size; + int provided; + MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided); + assert(provided == MPI_THREAD_MULTIPLE); + + MPI_Comm_size(MPI_COMM_WORLD, &size); + MPI_Comm_rank(MPI_COMM_WORLD, &rank); + + pthread_t thread; + + pthread_create(&thread, NULL, &thread_recv, NULL); + + /* give enough slack to allow the thread to enter the wait + * from now on the thread is stuck in MPI_Wait, owning progress + */ + sleep(2); + + /* initialize the continuation request */ + MPIX_Continue_init(&cont_req, MPI_INFO_NULL); + + /** + * One send, one recv, one continuation + */ + MPI_Irecv(&val, 1, MPI_INT, rank, 1001, MPI_COMM_WORLD, &reqs[0]); + MPI_Isend(&val, 1, MPI_INT, rank, 1001, MPI_COMM_WORLD, &reqs[1]); + + cb_cnt = 0; + MPIX_Continueall(2, reqs, &complete_cnt_cb, &cb_cnt, MPI_STATUSES_IGNORE, cont_req); + assert(reqs[0] == MPI_REQUEST_NULL && reqs[1] == MPI_REQUEST_NULL); + MPI_Wait(&cont_req, MPI_STATUS_IGNORE); + assert(cb_cnt == 1); + + /** + * One send, one recv, two continuations + */ + cb_cnt = 0; + MPI_Irecv(&val, 1, MPI_INT, rank, 1001, 
MPI_COMM_WORLD, &op_req); + MPIX_Continue(&op_req, &complete_cnt_cb, &cb_cnt, MPI_STATUS_IGNORE, cont_req); + assert(op_req == MPI_REQUEST_NULL); + + MPI_Isend(&val, 1, MPI_INT, rank, 1001, MPI_COMM_WORLD, &op_req); + MPIX_Continue(&op_req, &complete_cnt_cb, &cb_cnt, MPI_STATUS_IGNORE, cont_req); + assert(op_req == MPI_REQUEST_NULL); + + MPI_Wait(&cont_req, MPI_STATUS_IGNORE); + assert(cb_cnt == 2); + + MPI_Request_free(&cont_req); + + /**************************************************************** + * Do the same thing, but with a poll-only continuation request + ****************************************************************/ + + MPI_Info info; + MPI_Info_create(&info); + MPI_Info_set(info, "mpi_continue_poll_only", "true"); + MPI_Info_set(info, "mpi_continue_enqueue_complete", "true"); + + /* initialize the continuation request */ + MPIX_Continue_init(&cont_req, info); + + MPI_Info_free(&info); + + /** + * One send, one recv, one continuation + */ + MPI_Irecv(&val, 1, MPI_INT, rank, 1001, MPI_COMM_WORLD, &reqs[0]); + MPI_Isend(&val, 1, MPI_INT, rank, 1001, MPI_COMM_WORLD, &reqs[1]); + + cb_cnt = 0; + MPIX_Continueall(2, reqs, &complete_cnt_cb, &cb_cnt, MPI_STATUSES_IGNORE, cont_req); + assert(reqs[0] == MPI_REQUEST_NULL && reqs[1] == MPI_REQUEST_NULL); + MPI_Wait(&cont_req, MPI_STATUS_IGNORE); + assert(cb_cnt == 1); + + /** + * One send, one recv, two continuations + */ + cb_cnt = 0; + MPI_Irecv(&val, 1, MPI_INT, rank, 1001, MPI_COMM_WORLD, &op_req); + MPIX_Continue(&op_req, &complete_cnt_cb, &cb_cnt, MPI_STATUS_IGNORE, cont_req); + assert(op_req == MPI_REQUEST_NULL); + + MPI_Isend(&val, 1, MPI_INT, rank, 1001, MPI_COMM_WORLD, &op_req); + MPIX_Continue(&op_req, &complete_cnt_cb, &cb_cnt, MPI_STATUS_IGNORE, cont_req); + assert(op_req == MPI_REQUEST_NULL); + + MPI_Wait(&cont_req, MPI_STATUS_IGNORE); + assert(cb_cnt == 2); + + MPI_Request_free(&cont_req); + + /* release the blocked thread */ + MPI_Send(&val, 1, MPI_INT, rank, 1002, MPI_COMM_WORLD); + 
pthread_join(thread, NULL); + + MPI_Finalize(); + + return 0; +} +#else +int main(int argc, char *argv[]) +{ + return 77; +} +#endif /* OMPI_HAVE_MPI_EXT_CONTINUE */ diff --git a/test/continuations/continutions.c b/test/continuations/continutions.c new file mode 100644 index 00000000000..b1309e5e3cc --- /dev/null +++ b/test/continuations/continutions.c @@ -0,0 +1,122 @@ +/* + * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana + * University Research and Technology + * Corporation. All rights reserved. + * Copyright (c) 2004-2005 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2004-2005 The Regents of the University of California. + * All rights reserved. + * Copyright (c) 2016 Research Organization for Information Science + * and Technology (RIST). All rights reserved. + * Copyright (c) 2018 Los Alamos National Security, LLC. All rights reserved.
+ * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#include +#include +#include +#include + +#include "mpi.h" +#include "mpi-ext.h" + +#define OMPI_HAVE_MPI_EXT_CONTINUE + +#ifdef OMPI_HAVE_MPI_EXT_CONTINUE + +static void complete_cnt_cb(MPI_Status *status, void *user_data) { + assert(user_data != NULL); + printf("complete_cnt_cb \n"); + int *cb_cnt = (int*)user_data; + *cb_cnt = *cb_cnt + 1; +} + +int main(int argc, char *argv[]) +{ + MPI_Request cont_req, cont_req2, op_req, reqs[2]; + int cb_cnt; + int val; + int rank, size; + MPI_Init(&argc, &argv); + + MPI_Comm_size(MPI_COMM_WORLD, &size); + MPI_Comm_rank(MPI_COMM_WORLD, &rank); + + /* initialize the continuation request */ + MPIX_Continue_init(&cont_req, MPI_INFO_NULL); + + /** + * One send, one recv, one continuation + */ + MPI_Irecv(&val, 1, MPI_INT, rank, 1001, MPI_COMM_WORLD, &reqs[0]); + MPI_Isend(&val, 1, MPI_INT, rank, 1001, MPI_COMM_WORLD, &reqs[1]); + + //MPI_Waitall(2, reqs, MPI_STATUSES_IGNORE); + + cb_cnt = 0; + MPIX_Continueall(2, reqs, &complete_cnt_cb, &cb_cnt, MPI_STATUSES_IGNORE, cont_req); + assert(reqs[0] == MPI_REQUEST_NULL && reqs[1] == MPI_REQUEST_NULL); + MPI_Wait(&cont_req, MPI_STATUS_IGNORE); + assert(cb_cnt == 1); + + /** + * One send, one recv, two continuations + */ + cb_cnt = 0; + MPI_Irecv(&val, 1, MPI_INT, rank, 1001, MPI_COMM_WORLD, &op_req); + MPIX_Continue(&op_req, &complete_cnt_cb, &cb_cnt, MPI_STATUS_IGNORE, cont_req); + assert(op_req == MPI_REQUEST_NULL); + + MPI_Isend(&val, 1, MPI_INT, rank, 1001, MPI_COMM_WORLD, &op_req); + MPIX_Continue(&op_req, &complete_cnt_cb, &cb_cnt, MPI_STATUS_IGNORE, cont_req); + assert(op_req == MPI_REQUEST_NULL); + + MPI_Wait(&cont_req, MPI_STATUS_IGNORE); + assert(cb_cnt == 2); + + /** + * One send, one recv, two continuations in two continuation requests + */ + MPI_Info info; + MPI_Info_create(&info); + MPI_Info_set(info, "mpi_continue_poll_only", "true"); + MPI_Info_set(info, "mpi_continue_enqueue_complete", 
"true"); + + /* initialize a poll-only continuation request */ + MPIX_Continue_init(&cont_req2, info); + MPI_Info_free(&info); /* release the info object once the request has been initialized */ + cb_cnt = 0; + MPI_Irecv(&val, 1, MPI_INT, rank, 1001, MPI_COMM_WORLD, &op_req); + MPIX_Continue(&op_req, &complete_cnt_cb, &cb_cnt, MPI_STATUS_IGNORE, cont_req); + assert(op_req == MPI_REQUEST_NULL); + + MPI_Isend(&val, 1, MPI_INT, rank, 1001, MPI_COMM_WORLD, &op_req); + MPIX_Continue(&op_req, &complete_cnt_cb, &cb_cnt, MPI_STATUS_IGNORE, cont_req2); + assert(op_req == MPI_REQUEST_NULL); + + MPI_Wait(&cont_req, MPI_STATUS_IGNORE); + assert(cb_cnt == 1); + + printf("Waiting for poll-only cont request %p to complete\n", (void *)cont_req2); + MPI_Wait(&cont_req2, MPI_STATUS_IGNORE); + assert(cb_cnt == 2); + + MPI_Request_free(&cont_req); + MPI_Request_free(&cont_req2); + MPI_Finalize(); + + return 0; +} +#else +int main(int argc, char *argv[]) +{ + return 77; +} +#endif /* OMPI_HAVE_MPI_EXT_CONTINUE */