From 6c2dd1b91b459445147b63612863f6f84dacd352 Mon Sep 17 00:00:00 2001 From: Joseph Schuchart Date: Tue, 5 Oct 2021 11:15:24 -0400 Subject: [PATCH 01/20] Implementation of the current state of MPI Continuations proposal Continuations provide a mechanism for attaching callbacks to outstanding operation requests. A call to MPIX_Continue takes a request, a function pointer, a user-provided data pointer, and a status object (or `MPI_STATUS_IGNORE`), along with a continution request: ``` MPI_Request req; // status object has to remain valid until the callback is invoked MPI_Status *status = malloc(sizeof(MPI_Status)); char *buf = ...; MPI_Irecv(buf, ..., MPI_ANY_SOURCE, ... &req); MPIX_Continue(&req, &complete_cb, buf, status, cont_req); assert(req == MPI_REQUEST_NULL); ``` The ownership of non-persistent requests is returned to MPI and the pointer to the request will be set to `MPI_REQUEST_NULL`. The callback is passed the status pointer and the user-provided data pointer: ``` void complete_cb(MPI_Status *status, void *user_data) { printf("Send completed\n"); char *buf = (char*)user_data; process_msg(buf, status->MPI_SOURCE); free(buf); // free the send buffer free(status); // free the status } ``` The status has to remain valid until the invocation of the callback and is set according to the operation before the callback is invoked. The continuation is *registered* with the provided contination request. The continuation request is a request allocated with `MPIX_Continue_init`: ``` MPIX_Continue_init(info, &cont_req); ``` Continuation requests may be used to test/wait for completion of all continuations registered with cont_req using `MPI_Test/Wait`. Supported info keys are: - "mpi_continue_poll_only": only execute continuations when MPI_Test/Wait is called on the continuation request (default: false) - "mpi_continue_enqueue_complete": if true, the continuation is executed immediately if the operations are already complete when MPIX_Continue is called. Execution is deferred otherwise (default: false) - "mpi_continue_max_poll": the maximum number of continuations to execute when calling MPI_Test on the continuation request (default: -1, meaning unlimited) A continuation may in turn be attached to a continuation request, in which case the continuation request will be executed once all continuations registered with the continuation request have completed. In addition to MPIX_Continue, the proposal also includes MPIX_Continueall which attaches a continuation to a set of requests such that the continuation is executed once all operations have completed. The implementation reflects the current state of the proposal and may change at any time, following discussions in the MPI Forum. The goal of this Open MPI extension is to provide a working implementation to the community to experiment with this API. Signed-off-by: Joseph Schuchart Signed-off-by: George Bosilca --- ompi/mpiext/continue/Makefile.am | 22 + ompi/mpiext/continue/c/Makefile.am | 41 ++ ompi/mpiext/continue/c/continuation.c | 596 ++++++++++++++++++ ompi/mpiext/continue/c/continuation.h | 78 +++ ompi/mpiext/continue/c/continue.c | 65 ++ ompi/mpiext/continue/c/continue_init.c | 58 ++ ompi/mpiext/continue/c/continueall.c | 76 +++ ompi/mpiext/continue/c/mpiext_continue_c.h | 22 + .../continue/c/mpiext_continue_module.c | 26 + ompi/mpiext/continue/c/profile/Makefile.am | 37 ++ ompi/mpiext/continue/configure.m4 | 35 + ompi/mpiext/continue/owner.txt | 7 + ompi/request/req_test.c | 50 +- ompi/request/req_wait.c | 72 +++ ompi/request/request_dbg.h | 1 + 15 files changed, 1184 insertions(+), 2 deletions(-) create mode 100644 ompi/mpiext/continue/Makefile.am create mode 100644 ompi/mpiext/continue/c/Makefile.am create mode 100644 ompi/mpiext/continue/c/continuation.c create mode 100644 ompi/mpiext/continue/c/continuation.h create mode 100644 ompi/mpiext/continue/c/continue.c create mode 100644 ompi/mpiext/continue/c/continue_init.c create mode 100644 ompi/mpiext/continue/c/continueall.c create mode 100644 ompi/mpiext/continue/c/mpiext_continue_c.h create mode 100644 ompi/mpiext/continue/c/mpiext_continue_module.c create mode 100644 ompi/mpiext/continue/c/profile/Makefile.am create mode 100644 ompi/mpiext/continue/configure.m4 create mode 100644 ompi/mpiext/continue/owner.txt diff --git a/ompi/mpiext/continue/Makefile.am b/ompi/mpiext/continue/Makefile.am new file mode 100644 index 00000000000..70cd9a7c85a --- /dev/null +++ b/ompi/mpiext/continue/Makefile.am @@ -0,0 +1,22 @@ +# -*- shell-script -*- +# +# Copyright (c) 2021 The University of Tennessee and The University +# of Tennessee Research Foundation. All rights +# reserved. +# $COPYRIGHT$ +# +# Additional copyrights may follow +# +# $HEADER$ +# + +# This Makefile is not traversed during a normal "make all" in an OMPI +# build. It *is* traversed during "make dist", however. So you can +# put EXTRA_DIST targets in here. +# +# You can also use this as a convenience for building this MPI +# extension (i.e., "make all" in this directory to invoke "make all" +# in all the subdirectories). + +SUBDIRS = c + diff --git a/ompi/mpiext/continue/c/Makefile.am b/ompi/mpiext/continue/c/Makefile.am new file mode 100644 index 00000000000..017ba9616a6 --- /dev/null +++ b/ompi/mpiext/continue/c/Makefile.am @@ -0,0 +1,41 @@ +# +# Copyright (c) 2021 The University of Tennessee and The University +# of Tennessee Research Foundation. All rights +# reserved. +# $COPYRIGHT$ +# +# Additional copyrights may follow +# +# $HEADER$ +# + +# OMPI_BUILD_MPI_PROFILING is enabled when we want our generated MPI_* symbols +# to be replaced by PMPI_*. +# In this directory, we need it to be 0 + +AM_CPPFLAGS = -DOMPI_BUILD_MPI_PROFILING=0 -DOMPI_COMPILING_FORTRAN_WRAPPERS=0 + +include $(top_srcdir)/Makefile.ompi-rules + +noinst_LTLIBRARIES = libmpiext_continue_c.la + +# This is where the top-level header file (that is included in +# ) must be installed. +ompidir = $(ompiincludedir)/mpiext + +# This is the header file that is installed. +nodist_ompi_HEADERS = mpiext_continue_c.h + +libmpiext_continue_c_la_SOURCES = \ + continuation.c \ + continue.c \ + continueall.c \ + continue_init.c \ + mpiext_continue_module.c + +libmpiext_continue_c_la_LDFLAGS = -module -avoid-version + +ompi_HEADERS = $(headers) + +MAINTAINERCLEANFILES = $(nodist_libmpiext_continue_c_la_SOURCES) + diff --git a/ompi/mpiext/continue/c/continuation.c b/ompi/mpiext/continue/c/continuation.c new file mode 100644 index 00000000000..77a7760f87d --- /dev/null +++ b/ompi/mpiext/continue/c/continuation.c @@ -0,0 +1,596 @@ +/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */ +/* + * Copyright (c) 2020 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2021 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + + +#include "ompi_config.h" +#include "opal/class/opal_fifo.h" +#include "opal/class/opal_free_list.h" +#include "opal/sys/atomic.h" +#include "ompi/mpiext/continue/c/continuation.h" + + +static opal_free_list_t ompi_continuation_freelist; +static opal_free_list_t ompi_request_cont_data_freelist; + +/** + * Continuation class + */ +OMPI_DECLSPEC OBJ_CLASS_DECLARATION(ompi_continuation_t); + +struct ompi_continuation_t { + opal_free_list_item_t super; /**< Base type */ + struct ompi_request_t *cont_req; /**< The continuation request this continuation is registered with */ + MPIX_Continue_cb_function *cont_cb; /**< The callback function to invoke */ + void *cont_data; /**< Continuation state provided by the user */ + MPI_Status *cont_status; /**< user-provided pointers to status objects */ + opal_atomic_int32_t num_active; /**< The number of active operation requests on this callback */ +}; + +/* Convenience typedef */ +typedef struct ompi_continuation_t ompi_continuation_t; + +static void ompi_continuation_construct(ompi_continuation_t* cont) +{ + cont->cont_req = NULL; + cont->cont_cb = NULL; + cont->cont_data = NULL; + cont->num_active = 0; +} + +static void ompi_continuation_destruct(ompi_continuation_t* cont) +{ + assert(cont->cont_req == NULL); + assert(cont->cont_cb == NULL); + assert(cont->cont_data == NULL); + assert(cont->num_active == 0); +} + +OBJ_CLASS_INSTANCE( + ompi_continuation_t, + opal_free_list_item_t, + ompi_continuation_construct, + ompi_continuation_destruct); + +/** + * Data block associated with requests + * The same structure is used for continuation requests and operation + * requests with attached continuations. + */ +OMPI_DECLSPEC OBJ_CLASS_DECLARATION(ompi_request_cont_data_t); + +struct ompi_request_cont_data_t { + opal_free_list_item_t super; + ompi_continuation_t *cont_obj; /**< User-defined continuation state */ + ompi_status_public_t *cont_status; /**< The status object to set before invoking continuation */ + opal_list_t *cont_complete_list; /**< List of complete continuations to be invoked during test */ + opal_atomic_int32_t cont_num_active; /**< The number of active continuations registered with a continuation request */ + opal_atomic_lock_t cont_lock; /**< Lock used for continuation requests */ + bool cont_global_progress; + bool cont_enqueue_complete; /**< Whether to enqueue immediately complete requests */ + uint32_t continue_max_poll; /**< max number of local continuations to execute at once */ +}; + +/* Convenience typedef */ +typedef struct ompi_request_cont_data_t ompi_request_cont_data_t; + +OBJ_CLASS_INSTANCE( + ompi_request_cont_data_t, + opal_free_list_item_t, + NULL, NULL); + +/** + * List of completed requests that need the user-defined completion callback + * invoked. + */ +static opal_list_t continuation_list; + +static opal_mutex_t request_cont_lock; + +/** + * Flag indicating whether the progress callback has been registered. + */ +static bool progress_callback_registered = false; + + +static int ompi_continue_request_free(ompi_request_t** cont_req); + +static inline +void ompi_continue_cont_destroy(ompi_continuation_t *cont, ompi_request_t *cont_req) +{ + ompi_request_cont_data_t *req_cont_data; + req_cont_data = (ompi_request_cont_data_t *)cont_req->req_complete_cb_data; + assert(NULL != req_cont_data); + assert(OMPI_REQUEST_CONT == cont_req->req_type); + + const bool using_threads = opal_using_threads(); + if (using_threads) { + opal_atomic_lock(&req_cont_data->cont_lock); + } + int num_active = --req_cont_data->cont_num_active; + assert(num_active >= 0); + if (0 == num_active) { + assert(!REQUEST_COMPLETE(cont_req)); + opal_atomic_wmb(); + /* signal that all continuations were found complete */ + ompi_request_complete(cont_req, true); + } + if (using_threads) { + opal_atomic_unlock(&req_cont_data->cont_lock); + } + OBJ_RELEASE(cont_req); + +#ifdef OPAL_ENABLE_DEBUG + cont->cont_cb = NULL; + cont->cont_data = NULL; + cont->cont_req = NULL; +#endif // OPAL_ENABLE_DEBUG + opal_free_list_return(&ompi_continuation_freelist, &cont->super); +} + +/** + * Process a callback. Returns the callback object to the freelist. + */ +static inline +void ompi_continue_cont_invoke(ompi_continuation_t *cont) +{ + ompi_request_t *cont_req = cont->cont_req; + assert(NULL != cont_req); + assert(OMPI_REQUEST_CONT == cont_req->req_type); + + MPIX_Continue_cb_function *fn = cont->cont_cb; + void *cont_data = cont->cont_data; + MPI_Status *statuses = cont->cont_status; + fn(statuses, cont_data); + ompi_continue_cont_destroy(cont, cont_req); +} + +/* + * Allow multiple threads to progress callbacks concurrently + * but protect from recursive progressing + */ +static opal_thread_local int in_progress = 0; + +static +int ompi_continue_progress_some(const uint32_t max) +{ + + if (in_progress || opal_list_is_empty(&continuation_list)) return 0; + + uint32_t completed = 0; + in_progress = 1; + + do { + ompi_continuation_t *cb; + OPAL_THREAD_LOCK(&request_cont_lock); + cb = (ompi_continuation_t*)opal_list_remove_first(&continuation_list); + OPAL_THREAD_UNLOCK(&request_cont_lock); + if (NULL == cb) break; + ompi_continue_cont_invoke(cb); + } while (max > ++completed); + + in_progress = 0; + + return completed; +} + +static int ompi_continue_progress_callback() +{ + return ompi_continue_progress_some(1); +} + +int ompi_continue_progress_request(ompi_request_t *cont_req) +{ + ompi_request_cont_data_t *req_cont_data; + if (in_progress) return 0; + req_cont_data = (ompi_request_cont_data_t *)cont_req->req_complete_cb_data; + if (NULL == req_cont_data->cont_complete_list) { + /* progress as many as possible */ + return ompi_continue_progress_some(req_cont_data->continue_max_poll); + } + if (opal_list_is_empty(req_cont_data->cont_complete_list)) { + return 0; + } + + in_progress = 1; + + uint32_t max_poll = req_cont_data->continue_max_poll; + + uint32_t completed = 0; + const bool using_threads = opal_using_threads(); + while (max_poll > completed && !opal_list_is_empty(req_cont_data->cont_complete_list)) { + ompi_continuation_t *cb; + if (using_threads) { + opal_atomic_lock(&req_cont_data->cont_lock); + cb = (ompi_continuation_t *) opal_list_remove_first(req_cont_data->cont_complete_list); + opal_atomic_unlock(&req_cont_data->cont_lock); + } else { + cb = (ompi_continuation_t *) opal_list_remove_first(req_cont_data->cont_complete_list); + } + if (NULL == cb) break; + + ompi_continue_cont_invoke(cb); + completed++; + } + + in_progress = 0; + + return completed; +} + + +int ompi_continue_register_request_progress(ompi_request_t *cont_req) +{ + ompi_request_cont_data_t *req_cont_data; + req_cont_data = (ompi_request_cont_data_t *)cont_req->req_complete_cb_data; + + if (NULL == req_cont_data->cont_complete_list) return OMPI_SUCCESS; + + const bool using_threads = opal_using_threads(); + if (using_threads) { + OPAL_THREAD_LOCK(&request_cont_lock); + /* lock needed to sync with ompi_request_cont_enqueue_complete */ + opal_atomic_lock(&req_cont_data->cont_lock); + } + + /* signal that from now on all continuations should go into the global queue */ + req_cont_data->cont_global_progress = true; + + /* move all complete local continuations into the global queue */ + opal_list_join(&continuation_list, opal_list_get_begin(&continuation_list), + req_cont_data->cont_complete_list); + + if (using_threads) { + opal_atomic_unlock(&req_cont_data->cont_lock); + OPAL_THREAD_UNLOCK(&request_cont_lock); + } + + return OMPI_SUCCESS; +} + +int ompi_continue_deregister_request_progress(ompi_request_t *cont_req) +{ + ompi_request_cont_data_t *req_cont_data; + req_cont_data = (ompi_request_cont_data_t *)cont_req->req_complete_cb_data; + if (opal_using_threads()) { + /* lock needed to sync with ompi_request_cont_enqueue_complete */ + opal_atomic_lock(&req_cont_data->cont_lock); + req_cont_data->cont_global_progress = false; + opal_atomic_unlock(&req_cont_data->cont_lock); + } else { + req_cont_data->cont_global_progress = false; + } + + return OMPI_SUCCESS; +} + +int ompi_continuation_init(void) +{ + OBJ_CONSTRUCT(&request_cont_lock, opal_mutex_t); + OBJ_CONSTRUCT(&continuation_list, opal_list_t); + + OBJ_CONSTRUCT(&ompi_continuation_freelist, opal_free_list_t); + opal_free_list_init(&ompi_continuation_freelist, + sizeof(ompi_continuation_t), + opal_cache_line_size, + OBJ_CLASS(ompi_continuation_t), + 0, opal_cache_line_size, + 0, -1 , 8, NULL, 0, NULL, NULL, NULL); + + OBJ_CONSTRUCT(&ompi_request_cont_data_freelist, opal_free_list_t); + opal_free_list_init(&ompi_request_cont_data_freelist, + sizeof(ompi_request_cont_data_t), + opal_cache_line_size, + OBJ_CLASS(ompi_request_cont_data_t), + 0, opal_cache_line_size, + 0, -1 , 8, NULL, 0, NULL, NULL, NULL); + return OMPI_SUCCESS; +} + +int ompi_continuation_fini(void) +{ + if (progress_callback_registered) { + opal_progress_unregister(&ompi_continue_progress_callback); + } + + if (!opal_list_is_empty(&continuation_list)) { + fprintf(stderr, "WARN: Incomplete continuations found in during shutdown, go fix your application!\n"); + } + OBJ_DESTRUCT(&continuation_list); + + OBJ_DESTRUCT(&request_cont_lock); + OBJ_DESTRUCT(&ompi_continuation_freelist); + OBJ_DESTRUCT(&ompi_request_cont_data_freelist); + + return OMPI_SUCCESS; +} + +/** + * Enqueue the continuation for later invocation. + */ +static void +ompi_continue_enqueue_runnable(ompi_continuation_t *cont) +{ + ompi_request_cont_data_t *req_cont_data; + req_cont_data = (ompi_request_cont_data_t *)cont->cont_req->req_complete_cb_data; + int retry; + do { + retry = 0; + if (NULL != req_cont_data->cont_complete_list + && !req_cont_data->cont_global_progress) { + opal_atomic_lock(&req_cont_data->cont_lock); + if (OPAL_UNLIKELY(req_cont_data->cont_global_progress)) { + opal_atomic_unlock(&req_cont_data->cont_lock); + /* try again, this time target the global list */ + retry = 1; + continue; + } + opal_list_append(req_cont_data->cont_complete_list, &cont->super.super); + opal_atomic_unlock(&req_cont_data->cont_lock); + } else { + OPAL_THREAD_LOCK(&request_cont_lock); + opal_list_append(&continuation_list, &cont->super.super); + if (OPAL_UNLIKELY(!progress_callback_registered)) { + /* TODO: Ideally, we want to ensure that the callback is called *after* + * all the other progress callbacks are done so that any + * completions have happened before we attempt to execute + * callbacks. There doesn't seem to exist the infrastructure though. + */ + opal_progress_register(&ompi_continue_progress_callback); + progress_callback_registered = true; + } + OPAL_THREAD_UNLOCK(&request_cont_lock); + } + } while (retry); +} + +/** + * Create and initialize a continuation object. + */ +static inline +ompi_continuation_t *ompi_continue_cont_create( + int count, + ompi_request_t *cont_req, + MPIX_Continue_cb_function *cont_cb, + void *cont_data, + MPI_Status *cont_status) +{ + ompi_continuation_t *cont; + cont = (ompi_continuation_t *)opal_free_list_get(&ompi_continuation_freelist); + cont->cont_req = cont_req; + cont->cont_cb = cont_cb; + cont->cont_data = cont_data; + cont->num_active = count; + cont->cont_status = cont_status; + + ompi_request_cont_data_t *req_cont_data; + req_cont_data = (ompi_request_cont_data_t *)cont_req->req_complete_cb_data; + /* signal that the continuation request has a new continuation */ + OBJ_RETAIN(cont_req); + + const bool using_threads = opal_using_threads(); + if (using_threads) { + opal_atomic_lock(&req_cont_data->cont_lock); + } + int32_t num_active = req_cont_data->cont_num_active++; + if (num_active == 0) { + /* (re)activate the continuation request upon first registration */ + assert(REQUEST_COMPLETE(cont_req)); + req_cont_data->cont_obj = NULL; + cont_req->req_complete = REQUEST_PENDING; + cont_req->req_state = OMPI_REQUEST_ACTIVE; + } + if (using_threads) { + opal_atomic_unlock(&req_cont_data->cont_lock); + } + + return cont; +} + +static int request_completion_cb(ompi_request_t *request) +{ + assert(NULL != request->req_complete_cb_data); + int rc = 0; + ompi_request_cont_data_t *req_cont_data; + req_cont_data = (ompi_request_cont_data_t *)request->req_complete_cb_data; + + ompi_continuation_t *cont = req_cont_data->cont_obj; + req_cont_data->cont_obj = NULL; + + /* set the status object */ + if (NULL != req_cont_data->cont_status) { + *req_cont_data->cont_status = request->req_status; + req_cont_data->cont_status = NULL; + } + + int32_t num_active = OPAL_THREAD_ADD_FETCH32(&cont->num_active, -1); + + if (0 == num_active) { + /* the continuation is ready for execution */ + ompi_continue_enqueue_runnable(cont); + } + + /* inactivate / free the request */ + if (request->req_persistent) { + if (OMPI_REQUEST_CONT == request->req_type && opal_using_threads()) { + /* handle with care: another thread may register a new continuation already */ + opal_atomic_lock(&req_cont_data->cont_lock); + if (req_cont_data->cont_num_active == 0) { + request->req_state = OMPI_REQUEST_INACTIVE; + } + opal_atomic_unlock(&req_cont_data->cont_lock); + } else { + request->req_state = OMPI_REQUEST_INACTIVE; + } + } else { + /* release the request object and let the caller know */ + ompi_request_free(&request); + rc = 1; + } + + /* continuation requests keep their request_cont_data */ + if (OMPI_REQUEST_CONT != request->req_type) { + request->req_complete_cb_data = NULL; + opal_free_list_return(&ompi_request_cont_data_freelist, &req_cont_data->super); + } + + return rc; +} + +int ompi_continue_attach( + ompi_request_t *cont_req, + const int count, + ompi_request_t *requests[], + MPIX_Continue_cb_function *cont_cb, + void *cont_data, + ompi_status_public_t statuses[]) +{ + assert(OMPI_REQUEST_CONT == cont_req->req_type); + + ompi_continuation_t *cont = ompi_continue_cont_create(count, cont_req, cont_cb, + cont_data, statuses); + + opal_atomic_wmb(); + + int32_t num_registered = 0; + for (int i = 0; i < count; ++i) { + ompi_request_t *request = requests[i]; + if (MPI_REQUEST_NULL != request) { + if (&ompi_request_empty == request) { + /* empty request: do not modify, just copy out the status */ + if (statuses != MPI_STATUSES_IGNORE) { + statuses[i] = request->req_status; + } + requests[i] = MPI_REQUEST_NULL; + } else { + ompi_request_cont_data_t *req_cont_data; + req_cont_data = (ompi_request_cont_data_t *)request->req_complete_cb_data; + if (!req_cont_data) { + req_cont_data = (ompi_request_cont_data_t *)opal_free_list_get(&ompi_request_cont_data_freelist); + /* NOTE: request->req_complete_cb_data will be set in ompi_request_set_callback */ + } else { + assert(request->req_type == OMPI_REQUEST_CONT); + } + req_cont_data->cont_status = NULL; + if (statuses != MPI_STATUSES_IGNORE) { + req_cont_data->cont_status = &statuses[i]; + } + + req_cont_data->cont_obj = cont; + + assert(request->req_state == OMPI_REQUEST_ACTIVE || request->req_state == OMPI_REQUEST_INACTIVE); + + ompi_request_set_callback(request, &request_completion_cb, req_cont_data); + ++num_registered; + + /* take ownership of any non-persistent request */ + if (!request->req_persistent) + { + requests[i] = MPI_REQUEST_NULL; + } + } + + } + } + + int num_complete = count - num_registered; + int32_t last_num_active = OPAL_THREAD_ADD_FETCH32(&cont->num_active, + -num_complete); + if (0 == last_num_active && 0 < num_complete) { + ompi_request_cont_data_t *req_cont_data; + req_cont_data = (ompi_request_cont_data_t *)cont_req->req_complete_cb_data; + if (req_cont_data->cont_enqueue_complete) { + /* enqueue for later processing */ + ompi_continue_enqueue_runnable(cont); + } else { + /** + * Execute the continuation immediately + */ + ompi_continue_cont_invoke(cont); + } + } + + return OMPI_SUCCESS; +} + +/** + * Continuation request management + */ +int ompi_continue_allocate_request(ompi_request_t **cont_req, ompi_info_t *info) +{ + ompi_request_t *res = OBJ_NEW(ompi_request_t); + + if (OPAL_LIKELY(NULL != cont_req)) { + res->req_type = OMPI_REQUEST_CONT; + res->req_complete = REQUEST_COMPLETED; + res->req_state = OMPI_REQUEST_INACTIVE; + res->req_persistent = true; + res->req_free = &ompi_continue_request_free; + res->req_status = ompi_status_empty; /* always returns MPI_SUCCESS */ + + /* Continuation requests have a request_cont_data object that persists throughout their lifetime */ + ompi_request_cont_data_t *req_cont_data; + req_cont_data = (ompi_request_cont_data_t *)opal_free_list_get(&ompi_request_cont_data_freelist); + res->req_complete_cb_data = req_cont_data; + opal_atomic_lock_init(&req_cont_data->cont_lock, 0); + + int flag; + bool test_poll = false; + ompi_info_get_bool(info, "mpi_continue_poll_only", &test_poll, &flag); + + if (flag && test_poll) { + req_cont_data->cont_complete_list = OBJ_NEW(opal_list_t); + } else { + req_cont_data->cont_complete_list = NULL; + } + + bool enqueue_complete = false; + ompi_info_get_bool(info, "mpi_continue_enqueue_complete", &enqueue_complete, &flag); + req_cont_data->cont_enqueue_complete = (flag && enqueue_complete); + + req_cont_data->continue_max_poll = INT32_MAX; + opal_cstring_t *value_str; + ompi_info_get(info, "mpi_continue_max_poll", &value_str, &flag); + if (flag) { + int max_poll = atoi(value_str->string); + OBJ_RELEASE(value_str); + if (max_poll > 0) { + req_cont_data->continue_max_poll = max_poll; + } + } + *cont_req = res; + + return MPI_SUCCESS; + } + + return OMPI_ERR_OUT_OF_RESOURCE; +} + +static int ompi_continue_request_free(ompi_request_t** cont_req) +{ + assert(OMPI_REQUEST_CONT == (*cont_req)->req_type); + assert(NULL != (*cont_req)->req_complete_cb_data); + ompi_request_cont_data_t *req_cont_data; + req_cont_data = (ompi_request_cont_data_t *)(*cont_req)->req_complete_cb_data; + OMPI_REQUEST_FINI(*cont_req); + (*cont_req)->req_state = OMPI_REQUEST_INVALID; + (*cont_req)->req_complete_cb_data = NULL; + opal_free_list_return(&ompi_request_cont_data_freelist, &req_cont_data->super); + if (NULL != req_cont_data->cont_complete_list) { + OBJ_RELEASE(req_cont_data->cont_complete_list); + req_cont_data->cont_complete_list = NULL; + } + OBJ_RELEASE(*cont_req); + *cont_req = &ompi_request_null.request; + return OMPI_SUCCESS; +} diff --git a/ompi/mpiext/continue/c/continuation.h b/ompi/mpiext/continue/c/continuation.h new file mode 100644 index 00000000000..42648a917d1 --- /dev/null +++ b/ompi/mpiext/continue/c/continuation.h @@ -0,0 +1,78 @@ +/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */ +/* + * Copyright (c) 2020 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2021 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#ifndef OMPI_CONTINUATION_H +#define OMPI_CONTINUATION_H + +#include "ompi_config.h" +#include "ompi/info/info.h" +#include "ompi/request/request.h" +#include "mpi.h" +#include "ompi/mpiext/continue/c/mpiext_continue_c.h" + + +BEGIN_C_DECLS + +/** + * Initialize the user-callback infrastructure. + */ +int ompi_continuation_init(void); + +/** + * Finalize the user-callback infrastructure. + */ +int ompi_continuation_fini(void); + +/** + * Register a request with local completion list for progressing through + * the progress engine. + */ +int ompi_continue_register_request_progress(ompi_request_t *cont_req); + +/** + * Deregister a request with local completion list from progressing through + * the progress engine. + */ +int ompi_continue_deregister_request_progress(ompi_request_t *cont_req); + +/** + * Progress a continuation request that has local completions. + */ +int ompi_continue_progress_request(ompi_request_t *cont_req); + +/** + * Attach a continuation to a set of operations represented by \c requests. + * The \c statuses will be set before the \c cont_cb callback is invoked and + * passed together with \c cont_data to the callback. Passing \c MPI_STATUSES_IGNORE + * is valid, in which case statuses are ignored. + * The continuation is registered with the continuation request \c cont_req, which + * can be used to query for and progress outstanding continuations. + */ +int ompi_continue_attach( + ompi_request_t *cont_req, + int count, + ompi_request_t *requests[], + MPIX_Continue_cb_function *cont_cb, + void *cont_data, + ompi_status_public_t statuses[]); + + +/** + * Allocate a new continuation request. + */ +int ompi_continue_allocate_request(ompi_request_t **cont_req, ompi_info_t *info); + +END_C_DECLS + +#endif // OMPI_CONTINUATION_H diff --git a/ompi/mpiext/continue/c/continue.c b/ompi/mpiext/continue/c/continue.c new file mode 100644 index 00000000000..cb01a84250d --- /dev/null +++ b/ompi/mpiext/continue/c/continue.c @@ -0,0 +1,65 @@ +/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */ +/* + * Copyright (c) 2020 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2021 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#include "ompi_config.h" +#include + +#include "ompi/mpi/c/bindings.h" +#include "ompi/runtime/params.h" +#include "ompi/communicator/communicator.h" +#include "ompi/errhandler/errhandler.h" +#include "ompi/mpiext/continue/c/continuation.h" +#include "ompi/memchecker.h" + +#include "ompi/mpiext/continue/c/mpiext_continue_c.h" + +#if OMPI_BUILD_MPI_PROFILING +#if OPAL_HAVE_WEAK_SYMBOLS +#pragma weak MPIX_Continue = PMPIX_Continue +#endif +#define MPIX_Continue PMPIX_Continue +#endif + +static const char FUNC_NAME[] = "MPIX_Continue"; + +int MPIX_Continue( + MPI_Request *request, + MPIX_Continue_cb_function *cont_cb, + void *cb_data, + MPI_Status *status, + MPI_Request cont_req) +{ + int rc; + + MEMCHECKER( + memchecker_request(request); + ); + + if (MPI_PARAM_CHECK) { + rc = MPI_SUCCESS; + OMPI_ERR_INIT_FINALIZE(FUNC_NAME); + if (NULL == request) { + rc = MPI_ERR_REQUEST; + } + if (MPI_REQUEST_NULL == cont_req || OMPI_REQUEST_CONT != cont_req->req_type) { + rc = MPI_ERR_REQUEST; + } + OMPI_ERRHANDLER_CHECK(rc, MPI_COMM_WORLD, rc, FUNC_NAME); + } + + rc = ompi_continue_attach(cont_req, 1, request, cont_cb, cb_data, + MPI_STATUS_IGNORE == status ? MPI_STATUSES_IGNORE : status); + + OMPI_ERRHANDLER_RETURN(rc, MPI_COMM_WORLD, rc, FUNC_NAME); +} diff --git a/ompi/mpiext/continue/c/continue_init.c b/ompi/mpiext/continue/c/continue_init.c new file mode 100644 index 00000000000..e7d1b4d02c7 --- /dev/null +++ b/ompi/mpiext/continue/c/continue_init.c @@ -0,0 +1,58 @@ +/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */ +/* + * Copyright (c) 2020 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2021 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#include "ompi_config.h" +#include + +#include "ompi/mpi/c/bindings.h" +#include "ompi/runtime/params.h" +#include "ompi/communicator/communicator.h" +#include "ompi/errhandler/errhandler.h" +#include "ompi/mpiext/continue/c/continuation.h" +#include "ompi/memchecker.h" + +#include "ompi/mpiext/continue/c/mpiext_continue_c.h" + +#if OMPI_BUILD_MPI_PROFILING +#if OPAL_HAVE_WEAK_SYMBOLS +#pragma weak MPIX_Continue_init = PMPIX_Continue_init +#endif +#define MPIX_Continue_init PMPIX_Continue_init +#endif + +static const char FUNC_NAME[] = "MPIX_Continue_init"; + +int MPIX_Continue_init(MPI_Request *cont_req, MPI_Info info) +{ + int rc = MPI_SUCCESS; + + if (MPI_PARAM_CHECK) { + rc = MPI_SUCCESS; + OMPI_ERR_INIT_FINALIZE(FUNC_NAME); + if (NULL == cont_req) { + rc = MPI_ERR_ARG; + } + OMPI_ERRHANDLER_CHECK(rc, MPI_COMM_WORLD, rc, FUNC_NAME); + } + + + ompi_request_t *res; + rc = ompi_continue_allocate_request(&res, info); + + if (MPI_SUCCESS == rc) { + *cont_req = res; + } + + OMPI_ERRHANDLER_RETURN(rc, MPI_COMM_WORLD, rc, FUNC_NAME); +} diff --git a/ompi/mpiext/continue/c/continueall.c b/ompi/mpiext/continue/c/continueall.c new file mode 100644 index 00000000000..b7c15079b67 --- /dev/null +++ b/ompi/mpiext/continue/c/continueall.c @@ -0,0 +1,76 @@ +/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */ +/* + * Copyright (c) 2020 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2021 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#include "ompi_config.h" +#include + +#include "ompi/mpi/c/bindings.h" +#include "ompi/runtime/params.h" +#include "ompi/communicator/communicator.h" +#include "ompi/errhandler/errhandler.h" +#include "ompi/mpiext/continue/c/continuation.h" +#include "ompi/memchecker.h" + +#include "ompi/mpiext/continue/c/mpiext_continue_c.h" + +#if OMPI_BUILD_MPI_PROFILING +#if OPAL_HAVE_WEAK_SYMBOLS +#pragma weak MPIX_Continueall = PMPIX_Continueall +#endif +#define MPIX_Continueall PMPIX_Continueall +#endif + +static const char FUNC_NAME[] = "MPIX_Continueall"; + +int MPIX_Continueall( + int count, + MPI_Request requests[], + MPIX_Continue_cb_function *cont_cb, + void *cont_data, + MPI_Status statuses[], + MPI_Request cont_req) +{ + int rc; + + MEMCHECKER( + for (int j = 0; j < count; j++){ + memchecker_request(&requests[j]); + } + ); + + + if (MPI_PARAM_CHECK) { + rc = MPI_SUCCESS; + OMPI_ERR_INIT_FINALIZE(FUNC_NAME); + if (MPI_REQUEST_NULL == cont_req || OMPI_REQUEST_CONT != cont_req->req_type) { + rc = MPI_ERR_REQUEST; + } + if( (NULL == requests) && (0 != count) ) { + rc = MPI_ERR_REQUEST; + } else { + for (int i = 0; i < count; i++) { + if (NULL == requests[i]) { + rc = MPI_ERR_REQUEST; + break; + } + } + } + OMPI_ERRHANDLER_CHECK(rc, MPI_COMM_WORLD, rc, FUNC_NAME); + } + + rc = ompi_continue_attach(cont_req, count, requests, cont_cb, + cont_data, statuses); + + OMPI_ERRHANDLER_RETURN(rc, MPI_COMM_WORLD, rc, FUNC_NAME); +} diff --git a/ompi/mpiext/continue/c/mpiext_continue_c.h b/ompi/mpiext/continue/c/mpiext_continue_c.h new file mode 100644 index 00000000000..e3dceba1a15 --- /dev/null +++ b/ompi/mpiext/continue/c/mpiext_continue_c.h @@ -0,0 +1,22 @@ +/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */ +/* + * Copyright (c) 2020 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2021 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#include + +typedef void (MPIX_Continue_cb_function)(MPI_Status *statuses, void *user_data); +OMPI_DECLSPEC int MPIX_Continue_init(MPI_Request *cont_req, MPI_Info info); +OMPI_DECLSPEC int MPIX_Continue(MPI_Request *request, MPIX_Continue_cb_function *cb, void *cb_data, + MPI_Status *status, MPI_Request cont_req); +OMPI_DECLSPEC int MPIX_Continueall(int count, MPI_Request request[], MPIX_Continue_cb_function *cb, void *cb_data, + MPI_Status status[], MPI_Request cont_req); diff --git a/ompi/mpiext/continue/c/mpiext_continue_module.c b/ompi/mpiext/continue/c/mpiext_continue_module.c new file mode 100644 index 00000000000..05db810d6de --- /dev/null +++ b/ompi/mpiext/continue/c/mpiext_continue_module.c @@ -0,0 +1,26 @@ +/** + * Copyright (c) 2021 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#include "ompi_config.h" + +#include "ompi/mpiext/mpiext.h" +#include "ompi/mpiext/continue/c/continuation.h" + +/* + * Similar to Open MPI components, a well-known struct provides + * function pointers to the extension's init/fini hooks. The struct + * must be a global symbol of the form ompi_mpiext_ and be + * of type ompi_mpiext_component_t. + */ +ompi_mpiext_component_t ompi_mpiext_continue = { + ompi_continuation_init, + ompi_continuation_fini +}; diff --git a/ompi/mpiext/continue/c/profile/Makefile.am b/ompi/mpiext/continue/c/profile/Makefile.am new file mode 100644 index 00000000000..1eab74e4be6 --- /dev/null +++ b/ompi/mpiext/continue/c/profile/Makefile.am @@ -0,0 +1,37 @@ +# +# Copyright (c) 2021 The University of Tennessee and The University +# of Tennessee Research Foundation. All rights +# reserved. +# $COPYRIGHT$ +# +# Additional copyrights may follow +# +# $HEADER$ +# + +# OMPI_BUILD_MPI_PROFILING is enabled when we want our generated MPI_* symbols +# to be replaced by PMPI_*. +# In this directory, we need it to be 0 + +AM_CPPFLAGS = -DOMPI_BUILD_MPI_PROFILING=1 + +noinst_LTLIBRARIES = libpmpiext_continue_c.la + +nodist_libpmpiext_continue_c_la_SOURCES = \ + pcontinue.c \ + pcontinueall.c \ + pcontinue_init.c + +# +# Sym link in the sources from the real MPI directory +# +$(nodist_libpmpiext_continue_c_la_SOURCES): + $(OMPI_V_LN_S) if test ! -r $@ ; then \ + pname=`echo $@ | cut -b '2-'` ; \ + $(LN_S) $(top_srcdir)/ompi/mpiext/continue/c/$$pname $@ ; \ + fi + + +# These files were created by targets above + +MAINTAINERCLEANFILES = $(nodist_libpmpiext_continue_c_la_SOURCES) diff --git a/ompi/mpiext/continue/configure.m4 b/ompi/mpiext/continue/configure.m4 new file mode 100644 index 00000000000..be233124245 --- /dev/null +++ b/ompi/mpiext/continue/configure.m4 @@ -0,0 +1,35 @@ +# -*- shell-script -*- +# +# Copyright (c) 2021 The University of Tennessee and The University +# of Tennessee Research Foundation. All rights +# reserved. +# $COPYRIGHT$ +# +# Additional copyrights may follow +# +# $HEADER$ +# + +# OMPI_MPIEXT_continue_CONFIG([action-if-found], [action-if-not-found]) +# ----------------------------------------------------------- +AC_DEFUN([OMPI_MPIEXT_continue_CONFIG],[ + AC_CONFIG_FILES([ompi/mpiext/continue/Makefile]) + AC_CONFIG_FILES([ompi/mpiext/continue/c/Makefile]) + AC_CONFIG_FILES([ompi/mpiext/continue/c/profile/Makefile]) + + # This example can always build, so we just execute $1 if it was + # requested. + AS_IF([test "$ENABLE_continue" = "1" || \ + test "$ENABLE_EXT_ALL" = "1"], + [$1], + [$2]) + + AS_IF([test "$ENABLE_continue" = "1" || \ + test "$ENABLE_EXT_ALL" = "1"], + [AC_DEFINE_UNQUOTED([OMPI_HAVE_MPI_EXT_CONTINUE], [1], + [Whether MPI Continuations are enabled])], + []) +])dnl + +# we need init/finalize +AC_DEFUN([OMPI_MPIEXT_continue_NEED_INIT], [1]) diff --git a/ompi/mpiext/continue/owner.txt b/ompi/mpiext/continue/owner.txt new file mode 100644 index 00000000000..bc6c5ecb028 --- /dev/null +++ b/ompi/mpiext/continue/owner.txt @@ -0,0 +1,7 @@ +# +# owner/status file +# owner: institution that is responsible for this package +# status: e.g. active, maintenance, unmaintained +# +owner: UTK +status: active diff --git a/ompi/request/req_test.c b/ompi/request/req_test.c index cd04645a0c0..edf44eabdcc 100644 --- a/ompi/request/req_test.c +++ b/ompi/request/req_test.c @@ -26,6 +26,10 @@ #include "ompi/request/request_default.h" #include "ompi/request/grequest.h" +#if OMPI_HAVE_MPI_EXT_CONTINUE +#include "ompi/mpiext/continue/c/continuation.h" +#endif /* OMPI_HAVE_MPI_EXT_CONTINUE */ + int ompi_request_default_test(ompi_request_t ** rptr, int *completed, ompi_status_public_t * status ) @@ -87,7 +91,17 @@ int ompi_request_default_test(ompi_request_t ** rptr, * leaving. We will call the opal_progress only once per call. */ ++do_it_once; - if (0 != opal_progress()) { + int rc = 0; + +#if OMPI_HAVE_MPI_EXT_CONTINUE + if (OMPI_REQUEST_CONT == request->req_type) { + /* continuations may elect to not participate in global progress + * so progress them separately. */ + rc = ompi_continue_progress_request(request); + } +#endif // OMPI_HAVE_MPI_EXT_CONTINUE + + if (rc != 0 || 0 != opal_progress()) { goto recheck_request_status; } } @@ -155,6 +169,17 @@ int ompi_request_default_test_any( return MPI_ERR_PROC_FAILED_PENDING; } #endif /* OPAL_ENABLE_FT_MPI */ + +#if OMPI_HAVE_MPI_EXT_CONTINUE + if (OMPI_REQUEST_CONT == request->req_type) { + /* continuations may elect to not participate in global progress + * so progress them separately. */ + ompi_continue_progress_request(request); + /* requery the request */ + --i; + --rptr; + } +#endif // OMPI_HAVE_MPI_EXT_CONTINUE } /* Only fall through here if we found nothing */ @@ -194,6 +219,15 @@ int ompi_request_default_test_all( num_completed++; continue; } + +#if OMPI_HAVE_MPI_EXT_CONTINUE + if (OMPI_REQUEST_CONT == request->req_type) { + /* continuations may elect to not participate in global progress + * so progress them separately. */ + ompi_continue_progress_request(request); + } +#endif // OMPI_HAVE_MPI_EXT_CONTINUE + #if OPAL_ENABLE_FT_MPI /* Check for dead requests due to process failure */ /* Special case for MPI_ANY_SOURCE */ @@ -217,8 +251,11 @@ int ompi_request_default_test_all( } } #endif /* OPAL_ENABLE_PROGRESS_THREADS */ - /* short-circuit */ + +#if !OMPI_HAVE_MPI_EXT_CONTINUE + /* short-circuit, unless there may be continuation requests */ break; +#endif /* OMPI_HAVE_MPI_EXT_CONTINUE */ } if (num_completed != count) { @@ -330,6 +367,7 @@ int ompi_request_default_test_some( indices[num_requests_done++] = i; continue; } + #if OPAL_ENABLE_FT_MPI /* Check for dead requests due to process failure */ /* Special case for MPI_ANY_SOURCE - Error managed below */ @@ -338,6 +376,14 @@ int ompi_request_default_test_some( indices[num_requests_done++] = i; } #endif /* OPAL_ENABLE_FT_MPI */ + +#if OMPI_HAVE_MPI_EXT_CONTINUE + if (OMPI_REQUEST_CONT == request->req_type) { + /* continuations may elect to not participate in global progress + * so progress them separately. */ + ompi_continue_progress_request(request); + } +#endif // OMPI_HAVE_MPI_EXT_CONTINUE } /* diff --git a/ompi/request/req_wait.c b/ompi/request/req_wait.c index 14a8dcbf134..056d140a2ea 100644 --- a/ompi/request/req_wait.c +++ b/ompi/request/req_wait.c @@ -31,14 +31,33 @@ #include "ompi/request/request_default.h" #include "ompi/request/grequest.h" +#if OMPI_HAVE_MPI_EXT_CONTINUE +#include "ompi/mpiext/continue/c/continuation.h" +#endif /* OMPI_HAVE_MPI_EXT_CONTINUE */ + int ompi_request_default_wait( ompi_request_t ** req_ptr, ompi_status_public_t * status) { ompi_request_t *req = *req_ptr; +#if OMPI_HAVE_MPI_EXT_CONTINUE + if (OMPI_REQUEST_CONT == req->req_type) { + /* let the continuations be processed as part of the global progress loop + * while we're waiting for their completion */ + ompi_continue_register_request_progress(req); + } +#endif /* OMPI_HAVE_MPI_EXT_CONTINUE */ + + ompi_request_wait_completion(req); +#if OMPI_HAVE_MPI_EXT_CONTINUE + if (OMPI_REQUEST_CONT == req->req_type) { + ompi_continue_deregister_request_progress(req); + } +#endif /* OMPI_HAVE_MPI_EXT_CONTINUE */ + #if OPAL_ENABLE_FT_MPI /* Special case for MPI_ANY_SOURCE */ if( MPI_ERR_PROC_FAILED_PENDING == req->req_status.MPI_ERROR ) { @@ -91,6 +110,9 @@ int ompi_request_default_wait_any(size_t count, int rc = OMPI_SUCCESS; ompi_request_t *request=NULL; ompi_wait_sync_t sync; +#if OMPI_HAVE_MPI_EXT_CONTINUE + bool have_cont_req = false; +#endif /* OMPI_HAVE_MPI_EXT_CONTINUE */ if (OPAL_UNLIKELY(0 == count)) { *index = MPI_UNDEFINED; @@ -106,6 +128,13 @@ int ompi_request_default_wait_any(size_t count, request = requests[i]; +#if OMPI_HAVE_MPI_EXT_CONTINUE + if (OMPI_REQUEST_CONT == request->req_type) { + have_cont_req = true; + ompi_continue_register_request_progress(request); + } +#endif /* OMPI_HAVE_MPI_EXT_CONTINUE */ + /* Check for null or completed persistent request. For * MPI_REQUEST_NULL, the req_state is always OMPI_REQUEST_INACTIVE. */ @@ -144,6 +173,19 @@ int ompi_request_default_wait_any(size_t count, rc = SYNC_WAIT(&sync); after_sync_wait: + +#if OMPI_HAVE_MPI_EXT_CONTINUE + if (have_cont_req) { + have_cont_req = false; + for (i = 0; i < count; i++) { + request = requests[i]; + if (OMPI_REQUEST_CONT == request->req_type) { + ompi_continue_deregister_request_progress(request); + } + } + } +#endif /* OMPI_HAVE_MPI_EXT_CONTINUE */ + /* recheck the complete status and clean up the sync primitives. * Do it backward to return the earliest complete request to the * user. @@ -252,6 +294,12 @@ int ompi_request_default_wait_all( size_t count, } } +#if OMPI_HAVE_MPI_EXT_CONTINUE + if (OMPI_REQUEST_CONT == request->req_type) { + ompi_continue_register_request_progress(request); + } +#endif /* OMPI_HAVE_MPI_EXT_CONTINUE */ + #if OPAL_ENABLE_FT_MPI if(OPAL_UNLIKELY( ompi_request_is_failed(request) )) { failed++; @@ -308,6 +356,12 @@ int ompi_request_default_wait_all( size_t count, request = *rptr; +#if OMPI_HAVE_MPI_EXT_CONTINUE + if (OMPI_REQUEST_CONT == request->req_type) { + ompi_continue_deregister_request_progress(request); + } +#endif /* OMPI_HAVE_MPI_EXT_CONTINUE */ + if( request->req_state == OMPI_REQUEST_INACTIVE ) { OMPI_COPY_STATUS(&statuses[i], ompi_status_empty, true); continue; @@ -372,6 +426,12 @@ int ompi_request_default_wait_all( size_t count, request = *rptr; +#if OMPI_HAVE_MPI_EXT_CONTINUE + if (OMPI_REQUEST_CONT == request->req_type) { + ompi_continue_deregister_request_progress(request); + } +#endif /* OMPI_HAVE_MPI_EXT_CONTINUE */ + if( request->req_state == OMPI_REQUEST_INACTIVE ) { rc = ompi_status_empty.MPI_ERROR; goto absorb_error_and_continue; @@ -490,6 +550,12 @@ int ompi_request_default_wait_some(size_t count, } } +#if OMPI_HAVE_MPI_EXT_CONTINUE + if (OMPI_REQUEST_CONT == request->req_type) { + ompi_continue_register_request_progress(request); + } +#endif /* OMPI_HAVE_MPI_EXT_CONTINUE */ + #if OPAL_ENABLE_FT_MPI if(OPAL_UNLIKELY( ompi_request_is_failed(request) )) { num_requests_done++; @@ -523,6 +589,12 @@ int ompi_request_default_wait_some(size_t count, request = *rptr; +#if OMPI_HAVE_MPI_EXT_CONTINUE + if (OMPI_REQUEST_CONT == request->req_type) { + ompi_continue_deregister_request_progress(request); + } +#endif /* OMPI_HAVE_MPI_EXT_CONTINUE */ + if( request->req_state == OMPI_REQUEST_INACTIVE ) { continue; } diff --git a/ompi/request/request_dbg.h b/ompi/request/request_dbg.h index e6aa3757d06..6c95d224a1b 100644 --- a/ompi/request/request_dbg.h +++ b/ompi/request/request_dbg.h @@ -29,6 +29,7 @@ typedef enum { OMPI_REQUEST_NOOP, /**< A request that does nothing (e.g., to PROC_NULL) */ OMPI_REQUEST_COMM, /**< MPI-3 non-blocking communicator duplication */ OMPI_REQUEST_PART, /**< MPI-4 partitioned communication request */ + OMPI_REQUEST_CONT, /**< MPI-X continuation request */ OMPI_REQUEST_MAX /**< Maximum request type */ } ompi_request_type_t; From 2eabc6f98e02013efe4c320ceef11fe885b192de Mon Sep 17 00:00:00 2001 From: Joseph Schuchart Date: Mon, 18 Oct 2021 18:37:22 -0400 Subject: [PATCH 02/20] Move parts of ompi_request_cont_data_t into custom request struct Signed-off-by: Joseph Schuchart --- ompi/mpiext/continue/c/continuation.c | 272 ++++++++++++++------------ 1 file changed, 145 insertions(+), 127 deletions(-) diff --git a/ompi/mpiext/continue/c/continuation.c b/ompi/mpiext/continue/c/continuation.c index 77a7760f87d..ae425ac60bd 100644 --- a/ompi/mpiext/continue/c/continuation.c +++ b/ompi/mpiext/continue/c/continuation.c @@ -23,18 +23,24 @@ static opal_free_list_t ompi_continuation_freelist; static opal_free_list_t ompi_request_cont_data_freelist; +/* Forward-decl */ +typedef struct ompi_cont_request_t ompi_cont_request_t; + +static int ompi_continue_request_free(ompi_request_t** cont_req); + /** - * Continuation class + * Continuation class containing the callback, callback data, status, + * and number of outstanding operation requests. */ OMPI_DECLSPEC OBJ_CLASS_DECLARATION(ompi_continuation_t); struct ompi_continuation_t { - opal_free_list_item_t super; /**< Base type */ - struct ompi_request_t *cont_req; /**< The continuation request this continuation is registered with */ - MPIX_Continue_cb_function *cont_cb; /**< The callback function to invoke */ - void *cont_data; /**< Continuation state provided by the user */ - MPI_Status *cont_status; /**< user-provided pointers to status objects */ - opal_atomic_int32_t num_active; /**< The number of active operation requests on this callback */ + opal_free_list_item_t super; /**< Base type */ + struct ompi_cont_request_t *cont_req; /**< The continuation request this continuation is registered with */ + MPIX_Continue_cb_function *cont_cb; /**< The callback function to invoke */ + void *cont_data; /**< Continuation state provided by the user */ + MPI_Status *cont_status; /**< user-provided pointers to status objects */ + opal_atomic_int32_t num_active; /**< The number of active operation requests on this callback */ }; /* Convenience typedef */ @@ -62,6 +68,56 @@ OBJ_CLASS_INSTANCE( ompi_continuation_construct, ompi_continuation_destruct); + +/** + * Continuation request, derived from an OMPI request. Continuation request + * keep track of registered continuations and complete once no active + * continuations are registered. + */ +OMPI_DECLSPEC OBJ_CLASS_DECLARATION(ompi_cont_request_t); +struct ompi_cont_request_t { + ompi_request_t super; + opal_atomic_lock_t cont_lock; /**< Lock used completing/restarting the cont request */ + bool cont_global_progress; + bool cont_enqueue_complete; /**< Whether to enqueue immediately complete requests */ + opal_atomic_int32_t cont_num_active; /**< The number of active continuations registered with a continuation request */ + uint32_t continue_max_poll; /**< max number of local continuations to execute at once */ + opal_list_t *cont_complete_list; /**< List of complete continuations to be invoked during test */ +}; + +static void ompi_cont_request_construct(ompi_cont_request_t* cont_req) +{ + OMPI_REQUEST_INIT(&cont_req->super, true); + cont_req->super.req_type = OMPI_REQUEST_CONT; + cont_req->super.req_complete = REQUEST_COMPLETED; + cont_req->super.req_state = OMPI_REQUEST_INACTIVE; + cont_req->super.req_persistent = true; + cont_req->super.req_free = &ompi_continue_request_free; + cont_req->super.req_status = ompi_status_empty; /* always returns MPI_SUCCESS */ + opal_atomic_lock_init(&cont_req->cont_lock, false); + cont_req->cont_enqueue_complete = false; + cont_req->cont_global_progress = false; + cont_req->cont_num_active = 0; + cont_req->continue_max_poll = UINT32_MAX; + cont_req->cont_complete_list = NULL; +} + +static void ompi_cont_request_destruct(ompi_cont_request_t* cont_req) +{ + OMPI_REQUEST_FINI(&cont_req->super); + assert(cont_req->cont_num_active == 0); + if (NULL != cont_req->cont_complete_list) { + OPAL_LIST_RELEASE(cont_req->cont_complete_list); + cont_req->cont_complete_list = NULL; + } +} + +OBJ_CLASS_INSTANCE( + ompi_cont_request_t, + ompi_request_t, + ompi_cont_request_construct, + ompi_cont_request_destruct); + /** * Data block associated with requests * The same structure is used for continuation requests and operation @@ -73,12 +129,6 @@ struct ompi_request_cont_data_t { opal_free_list_item_t super; ompi_continuation_t *cont_obj; /**< User-defined continuation state */ ompi_status_public_t *cont_status; /**< The status object to set before invoking continuation */ - opal_list_t *cont_complete_list; /**< List of complete continuations to be invoked during test */ - opal_atomic_int32_t cont_num_active; /**< The number of active continuations registered with a continuation request */ - opal_atomic_lock_t cont_lock; /**< Lock used for continuation requests */ - bool cont_global_progress; - bool cont_enqueue_complete; /**< Whether to enqueue immediately complete requests */ - uint32_t continue_max_poll; /**< max number of local continuations to execute at once */ }; /* Convenience typedef */ @@ -102,31 +152,26 @@ static opal_mutex_t request_cont_lock; */ static bool progress_callback_registered = false; - -static int ompi_continue_request_free(ompi_request_t** cont_req); - static inline void ompi_continue_cont_destroy(ompi_continuation_t *cont, ompi_request_t *cont_req) { - ompi_request_cont_data_t *req_cont_data; - req_cont_data = (ompi_request_cont_data_t *)cont_req->req_complete_cb_data; - assert(NULL != req_cont_data); - assert(OMPI_REQUEST_CONT == cont_req->req_type); + ompi_cont_request_t *cont_req = cont->cont_req; + assert(OMPI_REQUEST_CONT == cont_req->super.req_type); const bool using_threads = opal_using_threads(); if (using_threads) { - opal_atomic_lock(&req_cont_data->cont_lock); + opal_atomic_lock(&cont_req->cont_lock); } - int num_active = --req_cont_data->cont_num_active; + int num_active = --cont_req->cont_num_active; assert(num_active >= 0); if (0 == num_active) { - assert(!REQUEST_COMPLETE(cont_req)); + assert(!REQUEST_COMPLETE(&cont_req->super)); opal_atomic_wmb(); /* signal that all continuations were found complete */ - ompi_request_complete(cont_req, true); + ompi_request_complete(&cont_req->super, true); } if (using_threads) { - opal_atomic_unlock(&req_cont_data->cont_lock); + opal_atomic_unlock(&cont_req->cont_lock); } OBJ_RELEASE(cont_req); @@ -144,9 +189,9 @@ void ompi_continue_cont_destroy(ompi_continuation_t *cont, ompi_request_t *cont_ static inline void ompi_continue_cont_invoke(ompi_continuation_t *cont) { - ompi_request_t *cont_req = cont->cont_req; + ompi_cont_request_t *cont_req = cont->cont_req; assert(NULL != cont_req); - assert(OMPI_REQUEST_CONT == cont_req->req_type); + assert(OMPI_REQUEST_CONT == cont_req->super.req_type); MPIX_Continue_cb_function *fn = cont->cont_cb; void *cont_data = cont->cont_data; @@ -155,10 +200,10 @@ void ompi_continue_cont_invoke(ompi_continuation_t *cont) ompi_continue_cont_destroy(cont, cont_req); } -/* - * Allow multiple threads to progress callbacks concurrently - * but protect from recursive progressing - */ +/** + * Allow multiple threads to progress callbacks concurrently + * but protect from recursive progressing + */ static opal_thread_local int in_progress = 0; static @@ -189,33 +234,32 @@ static int ompi_continue_progress_callback() return ompi_continue_progress_some(1); } -int ompi_continue_progress_request(ompi_request_t *cont_req) +int ompi_continue_progress_request(ompi_request_t *req) { - ompi_request_cont_data_t *req_cont_data; if (in_progress) return 0; - req_cont_data = (ompi_request_cont_data_t *)cont_req->req_complete_cb_data; - if (NULL == req_cont_data->cont_complete_list) { + ompi_cont_request_t *cont_req = (ompi_cont_request_t *)req; + if (NULL == cont_req->cont_complete_list) { /* progress as many as possible */ return ompi_continue_progress_some(req_cont_data->continue_max_poll); } - if (opal_list_is_empty(req_cont_data->cont_complete_list)) { + if (opal_list_is_empty(cont_req->cont_complete_list)) { return 0; } in_progress = 1; - uint32_t max_poll = req_cont_data->continue_max_poll; + const uint32_t max_poll = cont_req->continue_max_poll; uint32_t completed = 0; const bool using_threads = opal_using_threads(); - while (max_poll > completed && !opal_list_is_empty(req_cont_data->cont_complete_list)) { + while (max_poll > completed && !opal_list_is_empty(cont_req->cont_complete_list)) { ompi_continuation_t *cb; if (using_threads) { - opal_atomic_lock(&req_cont_data->cont_lock); - cb = (ompi_continuation_t *) opal_list_remove_first(req_cont_data->cont_complete_list); - opal_atomic_unlock(&req_cont_data->cont_lock); + opal_atomic_lock(&cont_req->cont_lock); + cb = (ompi_continuation_t *) opal_list_remove_first(cont_req->cont_complete_list); + opal_atomic_unlock(&cont_req->cont_lock); } else { - cb = (ompi_continuation_t *) opal_list_remove_first(req_cont_data->cont_complete_list); + cb = (ompi_continuation_t *) opal_list_remove_first(cont_req->cont_complete_list); } if (NULL == cb) break; @@ -229,46 +273,53 @@ int ompi_continue_progress_request(ompi_request_t *cont_req) } -int ompi_continue_register_request_progress(ompi_request_t *cont_req) +/** + * Register the provided continuation request to be included in the + * global progress loop (used while a thread is waiting for the contnuation + * request to complete). + */ +int ompi_continue_register_request_progress(ompi_request_t *req) { - ompi_request_cont_data_t *req_cont_data; - req_cont_data = (ompi_request_cont_data_t *)cont_req->req_complete_cb_data; + ompi_cont_request_t *cont_req = (ompi_cont_request_t *)req; - if (NULL == req_cont_data->cont_complete_list) return OMPI_SUCCESS; + if (NULL == cont_req->cont_complete_list) return OMPI_SUCCESS; const bool using_threads = opal_using_threads(); if (using_threads) { OPAL_THREAD_LOCK(&request_cont_lock); /* lock needed to sync with ompi_request_cont_enqueue_complete */ - opal_atomic_lock(&req_cont_data->cont_lock); + opal_atomic_lock(&cont_req->cont_lock); } /* signal that from now on all continuations should go into the global queue */ - req_cont_data->cont_global_progress = true; + cont_req->cont_global_progress = true; /* move all complete local continuations into the global queue */ opal_list_join(&continuation_list, opal_list_get_begin(&continuation_list), - req_cont_data->cont_complete_list); + cont_req->cont_complete_list); if (using_threads) { - opal_atomic_unlock(&req_cont_data->cont_lock); + opal_atomic_unlock(&cont_req->cont_lock); OPAL_THREAD_UNLOCK(&request_cont_lock); } return OMPI_SUCCESS; } -int ompi_continue_deregister_request_progress(ompi_request_t *cont_req) +/** + * Remove the continuation request from being progressed by the global progress + * loop (after a wait completes). + */ +int ompi_continue_deregister_request_progress(ompi_request_t *req) { - ompi_request_cont_data_t *req_cont_data; - req_cont_data = (ompi_request_cont_data_t *)cont_req->req_complete_cb_data; + ompi_cont_request_t *cont_req = (ompi_cont_request_t *)req; if (opal_using_threads()) { /* lock needed to sync with ompi_request_cont_enqueue_complete */ - opal_atomic_lock(&req_cont_data->cont_lock); - req_cont_data->cont_global_progress = false; - opal_atomic_unlock(&req_cont_data->cont_lock); + opal_atomic_lock(&cont_req->cont_lock); + cont_req->cont_global_progress = false; + opal_atomic_unlock(&cont_req->cont_lock); } else { - req_cont_data->cont_global_progress = false; + cont_req->cont_global_progress = false; } return OMPI_SUCCESS; @@ -321,22 +372,21 @@ int ompi_continuation_fini(void) static void ompi_continue_enqueue_runnable(ompi_continuation_t *cont) { - ompi_request_cont_data_t *req_cont_data; - req_cont_data = (ompi_request_cont_data_t *)cont->cont_req->req_complete_cb_data; + ompi_cont_request_t *cont_req = cont->cont_req; int retry; do { retry = 0; - if (NULL != req_cont_data->cont_complete_list - && !req_cont_data->cont_global_progress) { - opal_atomic_lock(&req_cont_data->cont_lock); - if (OPAL_UNLIKELY(req_cont_data->cont_global_progress)) { - opal_atomic_unlock(&req_cont_data->cont_lock); + if (NULL != cont_req->cont_complete_list + && !cont_req->cont_global_progress) { + opal_atomic_lock(&cont_req->cont_lock); + if (OPAL_UNLIKELY(cont_req->cont_global_progress)) { + opal_atomic_unlock(&cont_req->cont_lock); /* try again, this time target the global list */ retry = 1; continue; } - opal_list_append(req_cont_data->cont_complete_list, &cont->super.super); - opal_atomic_unlock(&req_cont_data->cont_lock); + opal_list_append(cont_req->cont_complete_list, &cont->super.super); + opal_atomic_unlock(&cont_req->cont_lock); } else { OPAL_THREAD_LOCK(&request_cont_lock); opal_list_append(&continuation_list, &cont->super.super); @@ -360,7 +410,7 @@ ompi_continue_enqueue_runnable(ompi_continuation_t *cont) static inline ompi_continuation_t *ompi_continue_cont_create( int count, - ompi_request_t *cont_req, + ompi_cont_request_t *cont_req, MPIX_Continue_cb_function *cont_cb, void *cont_data, MPI_Status *cont_status) @@ -373,25 +423,22 @@ ompi_continuation_t *ompi_continue_cont_create( cont->num_active = count; cont->cont_status = cont_status; - ompi_request_cont_data_t *req_cont_data; - req_cont_data = (ompi_request_cont_data_t *)cont_req->req_complete_cb_data; /* signal that the continuation request has a new continuation */ OBJ_RETAIN(cont_req); const bool using_threads = opal_using_threads(); if (using_threads) { - opal_atomic_lock(&req_cont_data->cont_lock); + opal_atomic_lock(&cont_req->cont_lock); } - int32_t num_active = req_cont_data->cont_num_active++; + int32_t num_active = cont_req->cont_num_active++; if (num_active == 0) { /* (re)activate the continuation request upon first registration */ - assert(REQUEST_COMPLETE(cont_req)); - req_cont_data->cont_obj = NULL; - cont_req->req_complete = REQUEST_PENDING; - cont_req->req_state = OMPI_REQUEST_ACTIVE; + assert(REQUEST_COMPLETE(&cont_req->super)); + cont_req->super.req_complete = REQUEST_PENDING; + cont_req->super.req_state = OMPI_REQUEST_ACTIVE; } if (using_threads) { - opal_atomic_unlock(&req_cont_data->cont_lock); + opal_atomic_unlock(&cont_req->cont_lock); } return cont; @@ -424,11 +471,12 @@ static int request_completion_cb(ompi_request_t *request) if (request->req_persistent) { if (OMPI_REQUEST_CONT == request->req_type && opal_using_threads()) { /* handle with care: another thread may register a new continuation already */ - opal_atomic_lock(&req_cont_data->cont_lock); - if (req_cont_data->cont_num_active == 0) { - request->req_state = OMPI_REQUEST_INACTIVE; + ompi_cont_request_t *cont_req = cont->cont_req; + opal_atomic_lock(&cont_req->cont_lock); + if (cont_req->cont_num_active == 0) { + cont_req->super.req_state = OMPI_REQUEST_INACTIVE; } - opal_atomic_unlock(&req_cont_data->cont_lock); + opal_atomic_unlock(&cont_req->cont_lock); } else { request->req_state = OMPI_REQUEST_INACTIVE; } @@ -438,25 +486,23 @@ static int request_completion_cb(ompi_request_t *request) rc = 1; } - /* continuation requests keep their request_cont_data */ - if (OMPI_REQUEST_CONT != request->req_type) { - request->req_complete_cb_data = NULL; - opal_free_list_return(&ompi_request_cont_data_freelist, &req_cont_data->super); - } + request->req_complete_cb_data = NULL; + opal_free_list_return(&ompi_request_cont_data_freelist, &req_cont_data->super); return rc; } int ompi_continue_attach( - ompi_request_t *cont_req, + ompi_request_t *continuation_request, const int count, ompi_request_t *requests[], MPIX_Continue_cb_function *cont_cb, void *cont_data, ompi_status_public_t statuses[]) { - assert(OMPI_REQUEST_CONT == cont_req->req_type); + assert(OMPI_REQUEST_CONT == continuation_request->req_type); + ompi_cont_request_t *cont_req = (ompi_cont_request_t *)continuation_request; ompi_continuation_t *cont = ompi_continue_cont_create(count, cont_req, cont_cb, cont_data, statuses); @@ -507,9 +553,7 @@ int ompi_continue_attach( int32_t last_num_active = OPAL_THREAD_ADD_FETCH32(&cont->num_active, -num_complete); if (0 == last_num_active && 0 < num_complete) { - ompi_request_cont_data_t *req_cont_data; - req_cont_data = (ompi_request_cont_data_t *)cont_req->req_complete_cb_data; - if (req_cont_data->cont_enqueue_complete) { + if (cont_req->cont_enqueue_complete) { /* enqueue for later processing */ ompi_continue_enqueue_runnable(cont); } else { @@ -526,49 +570,33 @@ int ompi_continue_attach( /** * Continuation request management */ -int ompi_continue_allocate_request(ompi_request_t **cont_req, ompi_info_t *info) +int ompi_continue_allocate_request(ompi_request_t **cont_req_ptr, ompi_info_t *info) { - ompi_request_t *res = OBJ_NEW(ompi_request_t); + ompi_cont_request_t *cont_req = OBJ_NEW(ompi_cont_request_t); if (OPAL_LIKELY(NULL != cont_req)) { - res->req_type = OMPI_REQUEST_CONT; - res->req_complete = REQUEST_COMPLETED; - res->req_state = OMPI_REQUEST_INACTIVE; - res->req_persistent = true; - res->req_free = &ompi_continue_request_free; - res->req_status = ompi_status_empty; /* always returns MPI_SUCCESS */ - - /* Continuation requests have a request_cont_data object that persists throughout their lifetime */ - ompi_request_cont_data_t *req_cont_data; - req_cont_data = (ompi_request_cont_data_t *)opal_free_list_get(&ompi_request_cont_data_freelist); - res->req_complete_cb_data = req_cont_data; - opal_atomic_lock_init(&req_cont_data->cont_lock, 0); - int flag; bool test_poll = false; ompi_info_get_bool(info, "mpi_continue_poll_only", &test_poll, &flag); if (flag && test_poll) { - req_cont_data->cont_complete_list = OBJ_NEW(opal_list_t); - } else { - req_cont_data->cont_complete_list = NULL; + cont_req->cont_complete_list = OBJ_NEW(opal_list_t); } bool enqueue_complete = false; ompi_info_get_bool(info, "mpi_continue_enqueue_complete", &enqueue_complete, &flag); - req_cont_data->cont_enqueue_complete = (flag && enqueue_complete); + cont_req->cont_enqueue_complete = (flag && enqueue_complete); - req_cont_data->continue_max_poll = INT32_MAX; opal_cstring_t *value_str; ompi_info_get(info, "mpi_continue_max_poll", &value_str, &flag); if (flag) { int max_poll = atoi(value_str->string); OBJ_RELEASE(value_str); if (max_poll > 0) { - req_cont_data->continue_max_poll = max_poll; + cont_req->continue_max_poll = max_poll; } } - *cont_req = res; + *cont_req_ptr = &cont_req->super; return MPI_SUCCESS; } @@ -576,21 +604,11 @@ int ompi_continue_allocate_request(ompi_request_t **cont_req, ompi_info_t *info) return OMPI_ERR_OUT_OF_RESOURCE; } -static int ompi_continue_request_free(ompi_request_t** cont_req) +static int ompi_continue_request_free(ompi_request_t** cont_req_ptr) { - assert(OMPI_REQUEST_CONT == (*cont_req)->req_type); - assert(NULL != (*cont_req)->req_complete_cb_data); - ompi_request_cont_data_t *req_cont_data; - req_cont_data = (ompi_request_cont_data_t *)(*cont_req)->req_complete_cb_data; - OMPI_REQUEST_FINI(*cont_req); - (*cont_req)->req_state = OMPI_REQUEST_INVALID; - (*cont_req)->req_complete_cb_data = NULL; - opal_free_list_return(&ompi_request_cont_data_freelist, &req_cont_data->super); - if (NULL != req_cont_data->cont_complete_list) { - OBJ_RELEASE(req_cont_data->cont_complete_list); - req_cont_data->cont_complete_list = NULL; - } - OBJ_RELEASE(*cont_req); - *cont_req = &ompi_request_null.request; + ompi_cont_request_t *cont_req = (ompi_cont_request_t *)*cont_req_ptr; + assert(OMPI_REQUEST_CONT == cont_req->super.req_type); + OBJ_RELEASE(cont_req); + *cont_req_ptr = &ompi_request_null.request; return OMPI_SUCCESS; } From c9404daf9e1fe9ec72b97ea1a46367263024bc83 Mon Sep 17 00:00:00 2001 From: Joseph Schuchart Date: Mon, 18 Oct 2021 18:37:48 -0400 Subject: [PATCH 03/20] Rename functions to avoid confusion Signed-off-by: Joseph Schuchart --- ompi/mpiext/continue/c/continuation.c | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/ompi/mpiext/continue/c/continuation.c b/ompi/mpiext/continue/c/continuation.c index ae425ac60bd..8f759c65262 100644 --- a/ompi/mpiext/continue/c/continuation.c +++ b/ompi/mpiext/continue/c/continuation.c @@ -153,7 +153,7 @@ static opal_mutex_t request_cont_lock; static bool progress_callback_registered = false; static inline -void ompi_continue_cont_destroy(ompi_continuation_t *cont, ompi_request_t *cont_req) +void ompi_continue_cont_release(ompi_continuation_t *cont) { ompi_cont_request_t *cont_req = cont->cont_req; assert(OMPI_REQUEST_CONT == cont_req->super.req_type); @@ -197,7 +197,7 @@ void ompi_continue_cont_invoke(ompi_continuation_t *cont) void *cont_data = cont->cont_data; MPI_Status *statuses = cont->cont_status; fn(statuses, cont_data); - ompi_continue_cont_destroy(cont, cont_req); + ompi_continue_cont_release(cont); } /** @@ -207,7 +207,7 @@ void ompi_continue_cont_invoke(ompi_continuation_t *cont) static opal_thread_local int in_progress = 0; static -int ompi_continue_progress_some(const uint32_t max) +int ompi_continue_progress_n(const uint32_t max) { if (in_progress || opal_list_is_empty(&continuation_list)) return 0; @@ -231,7 +231,7 @@ int ompi_continue_progress_some(const uint32_t max) static int ompi_continue_progress_callback() { - return ompi_continue_progress_some(1); + return ompi_continue_progress_n(1); } int ompi_continue_progress_request(ompi_request_t *req) @@ -240,7 +240,7 @@ int ompi_continue_progress_request(ompi_request_t *req) ompi_cont_request_t *cont_req = (ompi_cont_request_t *)req; if (NULL == cont_req->cont_complete_list) { /* progress as many as possible */ - return ompi_continue_progress_some(req_cont_data->continue_max_poll); + return ompi_continue_progress_n(cont_req->continue_max_poll); } if (opal_list_is_empty(cont_req->cont_complete_list)) { return 0; From 18f24e042ec1d94f12ccb3d4c647023d3a07cfa6 Mon Sep 17 00:00:00 2001 From: Joseph Schuchart Date: Mon, 18 Oct 2021 19:08:50 -0400 Subject: [PATCH 04/20] SQUASHME: Replace opal_show_help instead of fprintf Signed-off-by: Joseph Schuchart --- ompi/mpiext/continue/c/continuation.c | 3 ++- ompi/mpiext/continue/help-mpi-continue.txt | 16 ++++++++++++++++ 2 files changed, 18 insertions(+), 1 deletion(-) create mode 100644 ompi/mpiext/continue/help-mpi-continue.txt diff --git a/ompi/mpiext/continue/c/continuation.c b/ompi/mpiext/continue/c/continuation.c index 8f759c65262..3023530647e 100644 --- a/ompi/mpiext/continue/c/continuation.c +++ b/ompi/mpiext/continue/c/continuation.c @@ -355,7 +355,8 @@ int ompi_continuation_fini(void) } if (!opal_list_is_empty(&continuation_list)) { - fprintf(stderr, "WARN: Incomplete continuations found in during shutdown, go fix your application!\n"); + opal_show_help("help-mpi-continue.txt", "continue:incomplete_shutdown", + (int)opal_list_get_size(&continuation_list)); } OBJ_DESTRUCT(&continuation_list); diff --git a/ompi/mpiext/continue/help-mpi-continue.txt b/ompi/mpiext/continue/help-mpi-continue.txt new file mode 100644 index 00000000000..7c7d075a2da --- /dev/null +++ b/ompi/mpiext/continue/help-mpi-continue.txt @@ -0,0 +1,16 @@ +# -*- text -*- +# +# Copyright (c) 2021 The University of Tennessee and The University +# of Tennessee Research Foundation. All rights +# reserved. +# $COPYRIGHT$ +# +# Additional copyrights may follow +# +# $HEADER$ +# +# This is the US/English general help file for the OMPI Continuations extension. +# +[continue:incomplete_shutdown] +WARNING: Found %d incomplete continuations found in during shutdown! +# From f2b4e0d199318d3dcf3b3ec6f47c2e899d083faa Mon Sep 17 00:00:00 2001 From: Joseph Schuchart Date: Mon, 18 Oct 2021 19:09:29 -0400 Subject: [PATCH 05/20] Use OMPI_COPY_STATUS to set status Signed-off-by: Joseph Schuchart --- ompi/mpiext/continue/c/continuation.c | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/ompi/mpiext/continue/c/continuation.c b/ompi/mpiext/continue/c/continuation.c index 3023530647e..6c86b1c48b7 100644 --- a/ompi/mpiext/continue/c/continuation.c +++ b/ompi/mpiext/continue/c/continuation.c @@ -17,7 +17,9 @@ #include "opal/class/opal_fifo.h" #include "opal/class/opal_free_list.h" #include "opal/sys/atomic.h" +#include "opal/util/show_help.h" #include "ompi/mpiext/continue/c/continuation.h" +#include "ompi/request/request.h" static opal_free_list_t ompi_continuation_freelist; @@ -457,7 +459,7 @@ static int request_completion_cb(ompi_request_t *request) /* set the status object */ if (NULL != req_cont_data->cont_status) { - *req_cont_data->cont_status = request->req_status; + OMPI_COPY_STATUS(req_cont_data->cont_status, request->req_status, true); req_cont_data->cont_status = NULL; } @@ -507,16 +509,23 @@ int ompi_continue_attach( ompi_continuation_t *cont = ompi_continue_cont_create(count, cont_req, cont_cb, cont_data, statuses); + /* memory barrier to make sure a thread completing a request see + * a correct continuation object */ opal_atomic_wmb(); int32_t num_registered = 0; for (int i = 0; i < count; ++i) { ompi_request_t *request = requests[i]; - if (MPI_REQUEST_NULL != request) { + if (MPI_REQUEST_NULL == request) { + /* set the status for null-request */ + if (statuses != MPI_STATUSES_IGNORE) { + OMPI_COPY_STATUS(&statuses[i], ompi_status_empty, true); + } + } else { if (&ompi_request_empty == request) { /* empty request: do not modify, just copy out the status */ if (statuses != MPI_STATUSES_IGNORE) { - statuses[i] = request->req_status; + OMPI_COPY_STATUS(&statuses[i], request->req_status, true); } requests[i] = MPI_REQUEST_NULL; } else { From 5d0c7bbecec25e0941fdb0186fb2c1faa2d177a2 Mon Sep 17 00:00:00 2001 From: Joseph Schuchart Date: Mon, 18 Oct 2021 19:09:56 -0400 Subject: [PATCH 06/20] Add documentation for ompi_continue_register_request_progress Signed-off-by: Joseph Schuchart --- ompi/mpiext/continue/c/continuation.c | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/ompi/mpiext/continue/c/continuation.c b/ompi/mpiext/continue/c/continuation.c index 6c86b1c48b7..0f36cf02532 100644 --- a/ompi/mpiext/continue/c/continuation.c +++ b/ompi/mpiext/continue/c/continuation.c @@ -241,7 +241,7 @@ int ompi_continue_progress_request(ompi_request_t *req) if (in_progress) return 0; ompi_cont_request_t *cont_req = (ompi_cont_request_t *)req; if (NULL == cont_req->cont_complete_list) { - /* progress as many as possible */ + /* progress as many as allowed */ return ompi_continue_progress_n(cont_req->continue_max_poll); } if (opal_list_is_empty(cont_req->cont_complete_list)) { @@ -279,6 +279,11 @@ int ompi_continue_progress_request(ompi_request_t *req) * Register the provided continuation request to be included in the * global progress loop (used while a thread is waiting for the contnuation * request to complete). + * We move all local continuations into the global continuation list + * and mark the continuation request such that future continuations + * are directly put into the global continuations list. + * Once the wait completed (i.e., all continuations registered with the + * continuation request) we unmark it (see ompi_continue_deregister_request_progress). */ int ompi_continue_register_request_progress(ompi_request_t *req) { From d80d3caaea92b08cecc731e75a8d10944abce9e4 Mon Sep 17 00:00:00 2001 From: Joseph Schuchart Date: Wed, 20 Oct 2021 21:49:44 -0400 Subject: [PATCH 07/20] Remove re-iteration of continuation requests in test_any --- ompi/request/req_test.c | 3 --- 1 file changed, 3 deletions(-) diff --git a/ompi/request/req_test.c b/ompi/request/req_test.c index edf44eabdcc..8b29c038c88 100644 --- a/ompi/request/req_test.c +++ b/ompi/request/req_test.c @@ -175,9 +175,6 @@ int ompi_request_default_test_any( /* continuations may elect to not participate in global progress * so progress them separately. */ ompi_continue_progress_request(request); - /* requery the request */ - --i; - --rptr; } #endif // OMPI_HAVE_MPI_EXT_CONTINUE } From 5f89757478541d357ebd0ad4e9992484fc1d7430 Mon Sep 17 00:00:00 2001 From: Joseph Schuchart Date: Wed, 20 Oct 2021 21:51:19 -0400 Subject: [PATCH 08/20] Enable the continuations extension only if explicitly requested Signed-off-by: Joseph Schuchart --- ompi/mpiext/continue/configure.m4 | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/ompi/mpiext/continue/configure.m4 b/ompi/mpiext/continue/configure.m4 index be233124245..72299b5e3ca 100644 --- a/ompi/mpiext/continue/configure.m4 +++ b/ompi/mpiext/continue/configure.m4 @@ -17,15 +17,13 @@ AC_DEFUN([OMPI_MPIEXT_continue_CONFIG],[ AC_CONFIG_FILES([ompi/mpiext/continue/c/Makefile]) AC_CONFIG_FILES([ompi/mpiext/continue/c/profile/Makefile]) - # This example can always build, so we just execute $1 if it was - # requested. - AS_IF([test "$ENABLE_continue" = "1" || \ - test "$ENABLE_EXT_ALL" = "1"], + # This module is not stable yet so it should only be built + # if explicitly requested + AS_IF([test "$ENABLE_continue" = "1"], [$1], [$2]) - AS_IF([test "$ENABLE_continue" = "1" || \ - test "$ENABLE_EXT_ALL" = "1"], + AS_IF([test "$ENABLE_continue" = "1"], [AC_DEFINE_UNQUOTED([OMPI_HAVE_MPI_EXT_CONTINUE], [1], [Whether MPI Continuations are enabled])], []) From 1a68acb66bbabb79429c4745ad9940626b56952f Mon Sep 17 00:00:00 2001 From: Joseph Schuchart Date: Mon, 25 Oct 2021 14:59:50 -0400 Subject: [PATCH 09/20] Remove lock out of critical path of creating/completing continuations Signed-off-by: Joseph Schuchart --- ompi/mpiext/continue/c/continuation.c | 64 ++++++++++++--------------- 1 file changed, 28 insertions(+), 36 deletions(-) diff --git a/ompi/mpiext/continue/c/continuation.c b/ompi/mpiext/continue/c/continuation.c index 0f36cf02532..26041cfc955 100644 --- a/ompi/mpiext/continue/c/continuation.c +++ b/ompi/mpiext/continue/c/continuation.c @@ -92,7 +92,7 @@ static void ompi_cont_request_construct(ompi_cont_request_t* cont_req) OMPI_REQUEST_INIT(&cont_req->super, true); cont_req->super.req_type = OMPI_REQUEST_CONT; cont_req->super.req_complete = REQUEST_COMPLETED; - cont_req->super.req_state = OMPI_REQUEST_INACTIVE; + cont_req->super.req_state = OMPI_REQUEST_ACTIVE; cont_req->super.req_persistent = true; cont_req->super.req_free = &ompi_continue_request_free; cont_req->super.req_status = ompi_status_empty; /* always returns MPI_SUCCESS */ @@ -160,20 +160,22 @@ void ompi_continue_cont_release(ompi_continuation_t *cont) ompi_cont_request_t *cont_req = cont->cont_req; assert(OMPI_REQUEST_CONT == cont_req->super.req_type); - const bool using_threads = opal_using_threads(); - if (using_threads) { - opal_atomic_lock(&cont_req->cont_lock); - } - int num_active = --cont_req->cont_num_active; + int num_active = opal_atomic_add_fetch_32(&cont_req->cont_num_active, -1); assert(num_active >= 0); if (0 == num_active) { - assert(!REQUEST_COMPLETE(&cont_req->super)); - opal_atomic_wmb(); - /* signal that all continuations were found complete */ - ompi_request_complete(&cont_req->super, true); - } - if (using_threads) { - opal_atomic_unlock(&cont_req->cont_lock); + const bool using_threads = opal_using_threads(); + if (using_threads) { + opal_atomic_lock(&cont_req->cont_lock); + } + /* double check that no other thread has completed or restarted the request already */ + if (0 == cont_req->cont_num_active && !REQUEST_COMPLETE(&cont_req->super)) { + opal_atomic_wmb(); + /* signal that all continuations were found complete */ + ompi_request_complete(&cont_req->super, true); + } + if (using_threads) { + opal_atomic_unlock(&cont_req->cont_lock); + } } OBJ_RELEASE(cont_req); @@ -434,19 +436,19 @@ ompi_continuation_t *ompi_continue_cont_create( /* signal that the continuation request has a new continuation */ OBJ_RETAIN(cont_req); - const bool using_threads = opal_using_threads(); - if (using_threads) { - opal_atomic_lock(&cont_req->cont_lock); - } - int32_t num_active = cont_req->cont_num_active++; + int32_t num_active = opal_atomic_add_fetch_32(&cont_req->cont_num_active, 1); if (num_active == 0) { - /* (re)activate the continuation request upon first registration */ - assert(REQUEST_COMPLETE(&cont_req->super)); - cont_req->super.req_complete = REQUEST_PENDING; - cont_req->super.req_state = OMPI_REQUEST_ACTIVE; - } - if (using_threads) { - opal_atomic_unlock(&cont_req->cont_lock); + const bool using_threads = opal_using_threads(); + if (using_threads) { + opal_atomic_lock(&cont_req->cont_lock); + } + if (0 != cont_req->cont_num_active && REQUEST_COMPLETE(&cont_req->super)) { + /* (re)activate the continuation request upon first registration */ + cont_req->super.req_complete = REQUEST_PENDING; + } + if (using_threads) { + opal_atomic_unlock(&cont_req->cont_lock); + } } return cont; @@ -477,15 +479,7 @@ static int request_completion_cb(ompi_request_t *request) /* inactivate / free the request */ if (request->req_persistent) { - if (OMPI_REQUEST_CONT == request->req_type && opal_using_threads()) { - /* handle with care: another thread may register a new continuation already */ - ompi_cont_request_t *cont_req = cont->cont_req; - opal_atomic_lock(&cont_req->cont_lock); - if (cont_req->cont_num_active == 0) { - cont_req->super.req_state = OMPI_REQUEST_INACTIVE; - } - opal_atomic_unlock(&cont_req->cont_lock); - } else { + if (OMPI_REQUEST_CONT != request->req_type) { request->req_state = OMPI_REQUEST_INACTIVE; } } else { @@ -549,8 +543,6 @@ int ompi_continue_attach( req_cont_data->cont_obj = cont; - assert(request->req_state == OMPI_REQUEST_ACTIVE || request->req_state == OMPI_REQUEST_INACTIVE); - ompi_request_set_callback(request, &request_completion_cb, req_cont_data); ++num_registered; From 14a857421d640601ec583b649d3730d33ebaeb76 Mon Sep 17 00:00:00 2001 From: Joseph Schuchart Date: Mon, 8 Nov 2021 18:12:28 -0500 Subject: [PATCH 10/20] Fix printing of tear-down warning Signed-off-by: Joseph Schuchart --- ompi/mpiext/continue/c/Makefile.am | 4 +++- ompi/mpiext/continue/c/continuation.c | 4 ++-- ompi/mpiext/continue/{ => c}/help-mpi-continue.txt | 2 +- 3 files changed, 6 insertions(+), 4 deletions(-) rename ompi/mpiext/continue/{ => c}/help-mpi-continue.txt (84%) diff --git a/ompi/mpiext/continue/c/Makefile.am b/ompi/mpiext/continue/c/Makefile.am index 017ba9616a6..9e5b0cff253 100644 --- a/ompi/mpiext/continue/c/Makefile.am +++ b/ompi/mpiext/continue/c/Makefile.am @@ -33,7 +33,9 @@ libmpiext_continue_c_la_SOURCES = \ continue_init.c \ mpiext_continue_module.c -libmpiext_continue_c_la_LDFLAGS = -module -avoid-version +#libmpiext_continue_c_la_LDFLAGS = -module -avoid-version + +dist_ompidata_DATA = help-mpi-continue.txt ompi_HEADERS = $(headers) diff --git a/ompi/mpiext/continue/c/continuation.c b/ompi/mpiext/continue/c/continuation.c index 26041cfc955..408c909de59 100644 --- a/ompi/mpiext/continue/c/continuation.c +++ b/ompi/mpiext/continue/c/continuation.c @@ -364,8 +364,8 @@ int ompi_continuation_fini(void) } if (!opal_list_is_empty(&continuation_list)) { - opal_show_help("help-mpi-continue.txt", "continue:incomplete_shutdown", - (int)opal_list_get_size(&continuation_list)); + opal_show_help("help-mpi-continue.txt", "continue:incomplete_shutdown", 1, + opal_list_get_size(&continuation_list)); } OBJ_DESTRUCT(&continuation_list); diff --git a/ompi/mpiext/continue/help-mpi-continue.txt b/ompi/mpiext/continue/c/help-mpi-continue.txt similarity index 84% rename from ompi/mpiext/continue/help-mpi-continue.txt rename to ompi/mpiext/continue/c/help-mpi-continue.txt index 7c7d075a2da..f7cf4598b4b 100644 --- a/ompi/mpiext/continue/help-mpi-continue.txt +++ b/ompi/mpiext/continue/c/help-mpi-continue.txt @@ -12,5 +12,5 @@ # This is the US/English general help file for the OMPI Continuations extension. # [continue:incomplete_shutdown] -WARNING: Found %d incomplete continuations found in during shutdown! +WARNING: Found %zu incomplete continuations during shutdown! # From 7ede7ce8250f6349d132bd0320458f353f973f4d Mon Sep 17 00:00:00 2001 From: Joseph Schuchart Date: Mon, 8 Nov 2021 18:13:29 -0500 Subject: [PATCH 11/20] Fix logic error when creating a new continuation Signed-off-by: Joseph Schuchart --- ompi/mpiext/continue/c/continuation.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ompi/mpiext/continue/c/continuation.c b/ompi/mpiext/continue/c/continuation.c index 408c909de59..c5c2cf94c71 100644 --- a/ompi/mpiext/continue/c/continuation.c +++ b/ompi/mpiext/continue/c/continuation.c @@ -436,7 +436,7 @@ ompi_continuation_t *ompi_continue_cont_create( /* signal that the continuation request has a new continuation */ OBJ_RETAIN(cont_req); - int32_t num_active = opal_atomic_add_fetch_32(&cont_req->cont_num_active, 1); + int32_t num_active = opal_atomic_fetch_add_32(&cont_req->cont_num_active, 1); if (num_active == 0) { const bool using_threads = opal_using_threads(); if (using_threads) { From 32fa3cc21b793c244141412b74745bedc2edbe2e Mon Sep 17 00:00:00 2001 From: Joseph Schuchart Date: Mon, 8 Nov 2021 18:14:23 -0500 Subject: [PATCH 12/20] Use new OMPI_MPIEXT_continue_POST_CONFIG hook Signed-off-by: Joseph Schuchart --- ompi/mpiext/continue/configure.m4 | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/ompi/mpiext/continue/configure.m4 b/ompi/mpiext/continue/configure.m4 index 72299b5e3ca..1907dfa8767 100644 --- a/ompi/mpiext/continue/configure.m4 +++ b/ompi/mpiext/continue/configure.m4 @@ -22,12 +22,14 @@ AC_DEFUN([OMPI_MPIEXT_continue_CONFIG],[ AS_IF([test "$ENABLE_continue" = "1"], [$1], [$2]) +])dnl + +# we need init/finalize +AC_DEFUN([OMPI_MPIEXT_continue_NEED_INIT], [1]) +AC_DEFUN([OMPI_MPIEXT_continue_POST_CONFIG], [ AS_IF([test "$ENABLE_continue" = "1"], [AC_DEFINE_UNQUOTED([OMPI_HAVE_MPI_EXT_CONTINUE], [1], [Whether MPI Continuations are enabled])], []) -])dnl - -# we need init/finalize -AC_DEFUN([OMPI_MPIEXT_continue_NEED_INIT], [1]) +]) From d0308eedaf3e48c193f14a55f392f9024846134e Mon Sep 17 00:00:00 2001 From: Joseph Schuchart Date: Mon, 8 Nov 2021 18:16:23 -0500 Subject: [PATCH 13/20] Request test/wait: prevent continuation requests from being set to inactive Continuation requests are persistent requests that are always active. They are implicitly restarted when the first continuation is registered. Signed-off-by: Joseph Schuchart --- ompi/request/req_test.c | 38 +++++++++++++++++++++++++++++ ompi/request/req_wait.c | 53 +++++++++++++++++++++++++++++++++++++---- 2 files changed, 87 insertions(+), 4 deletions(-) diff --git a/ompi/request/req_test.c b/ompi/request/req_test.c index 8b29c038c88..b87e22c661b 100644 --- a/ompi/request/req_test.c +++ b/ompi/request/req_test.c @@ -61,6 +61,14 @@ int ompi_request_default_test(ompi_request_t ** rptr, if (MPI_STATUS_IGNORE != status) { OMPI_COPY_STATUS(status, request->req_status, false); } + +#if OMPI_HAVE_MPI_EXT_CONTINUE + if (OMPI_REQUEST_CONT == request->req_type) { + /* continuation requests are alwys active, don't modify the state */ + return request->req_status.MPI_ERROR; + } +#endif // OMPI_HAVE_MPI_EXT_CONTINUE + if( request->req_persistent ) { request->req_state = OMPI_REQUEST_INACTIVE; return request->req_status.MPI_ERROR; @@ -146,6 +154,13 @@ int ompi_request_default_test_any( OMPI_COPY_STATUS(status, request->req_status, false); } +#if OMPI_HAVE_MPI_EXT_CONTINUE + if (OMPI_REQUEST_CONT == request->req_type) { + /* continuation requests are alwys active, don't modify the state */ + return OMPI_SUCCESS; + } +#endif // OMPI_HAVE_MPI_EXT_CONTINUE + if( request->req_persistent ) { request->req_state = OMPI_REQUEST_INACTIVE; return OMPI_SUCCESS; @@ -279,6 +294,14 @@ int ompi_request_default_test_all( ompi_grequest_invoke_query(request, &request->req_status); } OMPI_COPY_STATUS(&statuses[i], request->req_status, true); + +#if OMPI_HAVE_MPI_EXT_CONTINUE + if (OMPI_REQUEST_CONT == request->req_type) { + /* continuation requests are alwys active, don't modify the state */ + continue; + } +#endif // OMPI_HAVE_MPI_EXT_CONTINUE + if( request->req_persistent ) { request->req_state = OMPI_REQUEST_INACTIVE; continue; @@ -314,6 +337,14 @@ int ompi_request_default_test_all( if (OMPI_REQUEST_GEN == request->req_type) { ompi_grequest_invoke_query(request, &request->req_status); } + +#if OMPI_HAVE_MPI_EXT_CONTINUE + if (OMPI_REQUEST_CONT == request->req_type) { + /* continuation requests are alwys active, don't modify the state */ + continue; + } +#endif // OMPI_HAVE_MPI_EXT_CONTINUE + if( request->req_persistent ) { request->req_state = OMPI_REQUEST_INACTIVE; continue; @@ -436,6 +467,13 @@ int ompi_request_default_test_some( #endif /* OPAL_ENABLE_FT_MPI */ } +#if OMPI_HAVE_MPI_EXT_CONTINUE + if (OMPI_REQUEST_CONT == request->req_type) { + /* continuation requests are alwys active, don't modify the state */ + continue; + } +#endif // OMPI_HAVE_MPI_EXT_CONTINUE + if( request->req_persistent ) { request->req_state = OMPI_REQUEST_INACTIVE; } else { diff --git a/ompi/request/req_wait.c b/ompi/request/req_wait.c index 056d140a2ea..a331c6be012 100644 --- a/ompi/request/req_wait.c +++ b/ompi/request/req_wait.c @@ -77,6 +77,14 @@ int ompi_request_default_wait( if( MPI_STATUS_IGNORE != status ) { OMPI_COPY_STATUS(status, req->req_status, false); } + +#if OMPI_HAVE_MPI_EXT_CONTINUE + if (OMPI_REQUEST_CONT == req->req_type) { + /* continuation requests are alwys active, don't modify the state */ + return req->req_status.MPI_ERROR; + } +#endif // OMPI_HAVE_MPI_EXT_CONTINUE + if( req->req_persistent ) { if( req->req_state == OMPI_REQUEST_INACTIVE ) { if (MPI_STATUS_IGNORE != status) { @@ -84,6 +92,14 @@ int ompi_request_default_wait( } return OMPI_SUCCESS; } + +#if OMPI_HAVE_MPI_EXT_CONTINUE + if (OMPI_REQUEST_CONT == req->req_type) { + /* continuation requests are alwys active, don't modify the state */ + return req->req_status.MPI_ERROR; + } +#endif // OMPI_HAVE_MPI_EXT_CONTINUE + req->req_state = OMPI_REQUEST_INACTIVE; return req->req_status.MPI_ERROR; } @@ -220,7 +236,7 @@ int ompi_request_default_wait_any(size_t count, if( *index == (int)completed ) { /* Only one request has triggered. There was no in-flight * completions. Drop the signalled flag so we won't block - * in WAIT_SYNC_RELEASE + * in WAIT_SYNC_RELEASE */ WAIT_SYNC_SIGNALLED(&sync); } @@ -244,7 +260,14 @@ int ompi_request_default_wait_any(size_t count, } rc = request->req_status.MPI_ERROR; if( request->req_persistent ) { +#if OMPI_HAVE_MPI_EXT_CONTINUE + if (OMPI_REQUEST_CONT != request->req_type) { + request->req_state = OMPI_REQUEST_INACTIVE; + } +#else // OMPI_HAVE_MPI_EXT_CONTINUE request->req_state = OMPI_REQUEST_INACTIVE; +#endif // OMPI_HAVE_MPI_EXT_CONTINUE + } else if (MPI_SUCCESS == rc) { /* Only free the request if there is no error on it */ /* If there's an error while freeing the request, @@ -400,6 +423,14 @@ int ompi_request_default_wait_all( size_t count, OMPI_COPY_STATUS(&statuses[i], request->req_status, true); + +#if OMPI_HAVE_MPI_EXT_CONTINUE + if (OMPI_REQUEST_CONT == request->req_type) { + /* continuation requests are alwys active, don't modify the state */ + continue; + } +#endif // OMPI_HAVE_MPI_EXT_CONTINUE + if( request->req_persistent ) { request->req_state = OMPI_REQUEST_INACTIVE; continue; @@ -471,6 +502,13 @@ int ompi_request_default_wait_all( size_t count, rc = request->req_status.MPI_ERROR; +#if OMPI_HAVE_MPI_EXT_CONTINUE + if (OMPI_REQUEST_CONT == request->req_type) { + /* continuation requests are alwys active, don't modify the state */ + continue; + } +#endif // OMPI_HAVE_MPI_EXT_CONTINUE + if( request->req_persistent ) { request->req_state = OMPI_REQUEST_INACTIVE; } else if (MPI_SUCCESS == rc) { @@ -602,14 +640,14 @@ int ompi_request_default_wait_some(size_t count, * a) request was found completed in the first loop * => ( indices[i] == 0 ) * b) request was completed between first loop and this check - * => ( indices[i] == 1 ) and we can NOT atomically mark the + * => ( indices[i] == 1 ) and we can NOT atomically mark the * request as pending. * c) request wasn't finished yet - * => ( indices[i] == 1 ) and we CAN atomically mark the + * => ( indices[i] == 1 ) and we CAN atomically mark the * request as pending. * NOTE that in any case (i >= num_requests_done) as latter grows * either slowly (in case of partial completion) - * OR in parallel with `i` (in case of full set completion) + * OR in parallel with `i` (in case of full set completion) */ if( !indices[num_active_reqs] ) { indices[num_requests_done++] = i; @@ -679,6 +717,13 @@ int ompi_request_default_wait_some(size_t count, rc = MPI_ERR_IN_STATUS; } +#if OMPI_HAVE_MPI_EXT_CONTINUE + if (OMPI_REQUEST_CONT == request->req_type) { + /* continuation requests are alwys active, don't modify the state */ + continue; + } +#endif // OMPI_HAVE_MPI_EXT_CONTINUE + if( request->req_persistent ) { request->req_state = OMPI_REQUEST_INACTIVE; } else { From aa0a59d5b052727a0a91d02fe71b3809720193c7 Mon Sep 17 00:00:00 2001 From: Joseph Schuchart Date: Mon, 8 Nov 2021 18:57:27 -0500 Subject: [PATCH 14/20] Don't enqueue continuations twice for execution if everything is complete at registration Signed-off-by: Joseph Schuchart --- ompi/mpiext/continue/c/continuation.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/ompi/mpiext/continue/c/continuation.c b/ompi/mpiext/continue/c/continuation.c index c5c2cf94c71..a6ffa876fbd 100644 --- a/ompi/mpiext/continue/c/continuation.c +++ b/ompi/mpiext/continue/c/continuation.c @@ -556,10 +556,11 @@ int ompi_continue_attach( } } + assert(count >= num_registered); int num_complete = count - num_registered; int32_t last_num_active = OPAL_THREAD_ADD_FETCH32(&cont->num_active, -num_complete); - if (0 == last_num_active && 0 < num_complete) { + if (0 == last_num_active && 0 == num_registered) { if (cont_req->cont_enqueue_complete) { /* enqueue for later processing */ ompi_continue_enqueue_runnable(cont); From fa149c2a66ecde974d5d53e643a97d119fa5f90a Mon Sep 17 00:00:00 2001 From: Joseph Schuchart Date: Mon, 8 Nov 2021 23:40:40 -0500 Subject: [PATCH 15/20] Poll-only continuations should only be executed by the thread testing/waiting on them Signed-off-by: Joseph Schuchart --- ompi/mpiext/continue/c/continuation.c | 144 +++++++++++++------------- 1 file changed, 73 insertions(+), 71 deletions(-) diff --git a/ompi/mpiext/continue/c/continuation.c b/ompi/mpiext/continue/c/continuation.c index a6ffa876fbd..6fc9656663a 100644 --- a/ompi/mpiext/continue/c/continuation.c +++ b/ompi/mpiext/continue/c/continuation.c @@ -80,8 +80,8 @@ OMPI_DECLSPEC OBJ_CLASS_DECLARATION(ompi_cont_request_t); struct ompi_cont_request_t { ompi_request_t super; opal_atomic_lock_t cont_lock; /**< Lock used completing/restarting the cont request */ - bool cont_global_progress; bool cont_enqueue_complete; /**< Whether to enqueue immediately complete requests */ + bool cont_in_wait; /**< Whether the continuation request is currently waited on */ opal_atomic_int32_t cont_num_active; /**< The number of active continuations registered with a continuation request */ uint32_t continue_max_poll; /**< max number of local continuations to execute at once */ opal_list_t *cont_complete_list; /**< List of complete continuations to be invoked during test */ @@ -98,7 +98,7 @@ static void ompi_cont_request_construct(ompi_cont_request_t* cont_req) cont_req->super.req_status = ompi_status_empty; /* always returns MPI_SUCCESS */ opal_atomic_lock_init(&cont_req->cont_lock, false); cont_req->cont_enqueue_complete = false; - cont_req->cont_global_progress = false; + cont_req->cont_in_wait = false; cont_req->cont_num_active = 0; cont_req->continue_max_poll = UINT32_MAX; cont_req->cont_complete_list = NULL; @@ -142,11 +142,13 @@ OBJ_CLASS_INSTANCE( NULL, NULL); /** - * List of completed requests that need the user-defined completion callback - * invoked. + * List of continuations eligible for execution */ static opal_list_t continuation_list; +/** + * Mutex to protect the continuation_list + */ static opal_mutex_t request_cont_lock; /** @@ -155,16 +157,15 @@ static opal_mutex_t request_cont_lock; static bool progress_callback_registered = false; static inline -void ompi_continue_cont_release(ompi_continuation_t *cont) +void ompi_continue_cont_req_release(ompi_cont_request_t *cont_req, + int32_t num_release, + bool take_lock) { - ompi_cont_request_t *cont_req = cont->cont_req; - assert(OMPI_REQUEST_CONT == cont_req->super.req_type); - - int num_active = opal_atomic_add_fetch_32(&cont_req->cont_num_active, -1); + int num_active = opal_atomic_add_fetch_32(&cont_req->cont_num_active, -num_release); assert(num_active >= 0); if (0 == num_active) { const bool using_threads = opal_using_threads(); - if (using_threads) { + if (take_lock && using_threads) { opal_atomic_lock(&cont_req->cont_lock); } /* double check that no other thread has completed or restarted the request already */ @@ -173,10 +174,24 @@ void ompi_continue_cont_release(ompi_continuation_t *cont) /* signal that all continuations were found complete */ ompi_request_complete(&cont_req->super, true); } - if (using_threads) { + if (take_lock && using_threads) { opal_atomic_unlock(&cont_req->cont_lock); } } +} + +static inline +void ompi_continue_cont_release(ompi_continuation_t *cont) +{ + ompi_cont_request_t *cont_req = cont->cont_req; + assert(OMPI_REQUEST_CONT == cont_req->super.req_type); + + /* if a thread is waiting on the request, we got here when + * the thread started executing the continuations, so the continuation + * request is complete already */ + if (!cont_req->cont_in_wait) { + ompi_continue_cont_req_release(cont_req, 1, true); + } OBJ_RELEASE(cont_req); #ifdef OPAL_ENABLE_DEBUG @@ -214,19 +229,22 @@ static int ompi_continue_progress_n(const uint32_t max) { - if (in_progress || opal_list_is_empty(&continuation_list)) return 0; + if (in_progress) return 0; uint32_t completed = 0; in_progress = 1; - do { - ompi_continuation_t *cb; - OPAL_THREAD_LOCK(&request_cont_lock); - cb = (ompi_continuation_t*)opal_list_remove_first(&continuation_list); - OPAL_THREAD_UNLOCK(&request_cont_lock); - if (NULL == cb) break; - ompi_continue_cont_invoke(cb); - } while (max > ++completed); + if (!opal_list_is_empty(&continuation_list)) { + /* global progress */ + do { + ompi_continuation_t *cb; + OPAL_THREAD_LOCK(&request_cont_lock); + cb = (ompi_continuation_t*)opal_list_remove_first(&continuation_list); + OPAL_THREAD_UNLOCK(&request_cont_lock); + if (NULL == cb) break; + ompi_continue_cont_invoke(cb); + } while (max > ++completed); + } in_progress = 0; @@ -293,24 +311,13 @@ int ompi_continue_register_request_progress(ompi_request_t *req) if (NULL == cont_req->cont_complete_list) return OMPI_SUCCESS; - const bool using_threads = opal_using_threads(); - if (using_threads) { - OPAL_THREAD_LOCK(&request_cont_lock); - /* lock needed to sync with ompi_request_cont_enqueue_complete */ - opal_atomic_lock(&cont_req->cont_lock); - } + opal_atomic_lock(&cont_req->cont_lock); - /* signal that from now on all continuations should go into the global queue */ - cont_req->cont_global_progress = true; + cont_req->cont_in_wait = true; - /* move all complete local continuations into the global queue */ - opal_list_join(&continuation_list, opal_list_get_begin(&continuation_list), - cont_req->cont_complete_list); + ompi_continue_cont_req_release(cont_req, opal_list_get_size(cont_req->cont_complete_list), false); - if (using_threads) { - opal_atomic_unlock(&cont_req->cont_lock); - OPAL_THREAD_UNLOCK(&request_cont_lock); - } + opal_atomic_unlock(&cont_req->cont_lock); return OMPI_SUCCESS; } @@ -322,15 +329,14 @@ int ompi_continue_register_request_progress(ompi_request_t *req) int ompi_continue_deregister_request_progress(ompi_request_t *req) { ompi_cont_request_t *cont_req = (ompi_cont_request_t *)req; - if (opal_using_threads()) { - /* lock needed to sync with ompi_request_cont_enqueue_complete */ - opal_atomic_lock(&cont_req->cont_lock); - cont_req->cont_global_progress = false; - opal_atomic_unlock(&cont_req->cont_lock); - } else { - cont_req->cont_global_progress = false; - } + /* make sure we execute all outstanding continuations */ + uint32_t tmp_max_poll = cont_req->continue_max_poll; + cont_req->continue_max_poll = UINT32_MAX; + ompi_continue_progress_request(req); + cont_req->continue_max_poll = tmp_max_poll; + + cont_req->cont_in_wait = false; return OMPI_SUCCESS; } @@ -383,35 +389,31 @@ static void ompi_continue_enqueue_runnable(ompi_continuation_t *cont) { ompi_cont_request_t *cont_req = cont->cont_req; - int retry; - do { - retry = 0; - if (NULL != cont_req->cont_complete_list - && !cont_req->cont_global_progress) { - opal_atomic_lock(&cont_req->cont_lock); - if (OPAL_UNLIKELY(cont_req->cont_global_progress)) { - opal_atomic_unlock(&cont_req->cont_lock); - /* try again, this time target the global list */ - retry = 1; - continue; - } - opal_list_append(cont_req->cont_complete_list, &cont->super.super); - opal_atomic_unlock(&cont_req->cont_lock); - } else { - OPAL_THREAD_LOCK(&request_cont_lock); - opal_list_append(&continuation_list, &cont->super.super); - if (OPAL_UNLIKELY(!progress_callback_registered)) { - /* TODO: Ideally, we want to ensure that the callback is called *after* - * all the other progress callbacks are done so that any - * completions have happened before we attempt to execute - * callbacks. There doesn't seem to exist the infrastructure though. - */ - opal_progress_register(&ompi_continue_progress_callback); - progress_callback_registered = true; - } - OPAL_THREAD_UNLOCK(&request_cont_lock); + if (NULL != cont_req->cont_complete_list) { + opal_atomic_lock(&cont_req->cont_lock); + opal_list_append(cont_req->cont_complete_list, &cont->super.super); + if (cont_req->cont_in_wait) { + /* if a thread is waiting for this request to complete, signal completions + * the continuations will be executed at the end of the wait + * but we need to ensure that the request is marked complete first + */ + ompi_continue_cont_req_release(cont_req, 1, false); } - } while (retry); + opal_atomic_unlock(&cont_req->cont_lock); + } else { + OPAL_THREAD_LOCK(&request_cont_lock); + opal_list_append(&continuation_list, &cont->super.super); + if (OPAL_UNLIKELY(!progress_callback_registered)) { + /* TODO: Ideally, we want to ensure that the callback is called *after* + * all the other progress callbacks are done so that any + * completions have happened before we attempt to execute + * callbacks. There doesn't seem to exist the infrastructure though. + */ + opal_progress_register(&ompi_continue_progress_callback); + progress_callback_registered = true; + } + OPAL_THREAD_UNLOCK(&request_cont_lock); + } } /** From aa9677dacc5d5619dc034a8764b7b72684532428 Mon Sep 17 00:00:00 2001 From: Joseph Schuchart Date: Tue, 9 Nov 2021 10:11:28 -0500 Subject: [PATCH 16/20] Allow the waiting thread to execute continuations if not blocked in the sync Signed-off-by: Joseph Schuchart --- ompi/mpiext/continue/c/continuation.c | 57 ++++++++++++++++++++++++--- 1 file changed, 52 insertions(+), 5 deletions(-) diff --git a/ompi/mpiext/continue/c/continuation.c b/ompi/mpiext/continue/c/continuation.c index 6fc9656663a..7597e7e7005 100644 --- a/ompi/mpiext/continue/c/continuation.c +++ b/ompi/mpiext/continue/c/continuation.c @@ -156,6 +156,11 @@ static opal_mutex_t request_cont_lock; */ static bool progress_callback_registered = false; +/** + * Thread-local list of continuation requests that should be progressed. + */ +static opal_thread_local opal_list_t *thread_progress_list = NULL; + static inline void ompi_continue_cont_req_release(ompi_cont_request_t *cont_req, int32_t num_release, @@ -234,16 +239,44 @@ int ompi_continue_progress_n(const uint32_t max) uint32_t completed = 0; in_progress = 1; + const bool using_threads = opal_using_threads(); + if (NULL != thread_progress_list) { + ompi_cont_request_t *cont_req; + OPAL_LIST_FOREACH(cont_req, thread_progress_list, ompi_cont_request_t) { + ompi_continuation_t *cb; + if (opal_list_is_empty(cont_req->cont_complete_list)) continue; + while (max > completed) { + if (using_threads) { + opal_atomic_lock(&cont_req->cont_lock); + cb = (ompi_continuation_t *) opal_list_remove_first(cont_req->cont_complete_list); + opal_atomic_unlock(&cont_req->cont_lock); + } else { + cb = (ompi_continuation_t *) opal_list_remove_first(cont_req->cont_complete_list); + } + if (NULL == cb) break; + + ompi_continue_cont_invoke(cb); + ++completed; + } + if (max <= completed) break; + } + } + if (!opal_list_is_empty(&continuation_list)) { /* global progress */ - do { + while (max > completed) { ompi_continuation_t *cb; - OPAL_THREAD_LOCK(&request_cont_lock); - cb = (ompi_continuation_t*)opal_list_remove_first(&continuation_list); - OPAL_THREAD_UNLOCK(&request_cont_lock); + if (using_threads) { + opal_mutex_lock(&request_cont_lock); + cb = (ompi_continuation_t*)opal_list_remove_first(&continuation_list); + opal_mutex_unlock(&request_cont_lock); + } else { + cb = (ompi_continuation_t*)opal_list_remove_first(&continuation_list); + } if (NULL == cb) break; ompi_continue_cont_invoke(cb); - } while (max > ++completed); + ++completed; + } } in_progress = 0; @@ -319,6 +352,13 @@ int ompi_continue_register_request_progress(ompi_request_t *req) opal_atomic_unlock(&cont_req->cont_lock); + if (NULL == thread_progress_list) { + thread_progress_list = OBJ_NEW(opal_list_t); + } + + /* enqueue the continuation request to allow for progress by this thread */ + opal_list_append(thread_progress_list, &req->super.super); + return OMPI_SUCCESS; } @@ -330,6 +370,8 @@ int ompi_continue_deregister_request_progress(ompi_request_t *req) { ompi_cont_request_t *cont_req = (ompi_cont_request_t *)req; + if (NULL == cont_req->cont_complete_list) return OMPI_SUCCESS; + /* make sure we execute all outstanding continuations */ uint32_t tmp_max_poll = cont_req->continue_max_poll; cont_req->continue_max_poll = UINT32_MAX; @@ -337,6 +379,11 @@ int ompi_continue_deregister_request_progress(ompi_request_t *req) cont_req->continue_max_poll = tmp_max_poll; cont_req->cont_in_wait = false; + + + /* remove the continuation request from the thread-local progress list */ + opal_list_remove_item(thread_progress_list, &req->super.super); + return OMPI_SUCCESS; } From e0147172946f120fed583e8b33b1b4a9a688c1f5 Mon Sep 17 00:00:00 2001 From: Joseph Schuchart Date: Fri, 21 Jan 2022 17:46:29 -0500 Subject: [PATCH 17/20] Make sure threads waiting on continuation request execute continuations while waiting No other threads should execute them if the continuation request is limited to polling only. Signed-off-by: Joseph Schuchart --- ompi/mpiext/continue/c/continuation.c | 103 +++++++++++++------------- ompi/mpiext/continue/c/continuation.h | 14 ++-- ompi/request/req_wait.c | 33 +++------ ompi/request/request.h | 30 ++++++++ opal/mca/threads/base/wait_sync.c | 10 ++- opal/mca/threads/wait_sync.h | 4 + 6 files changed, 110 insertions(+), 84 deletions(-) diff --git a/ompi/mpiext/continue/c/continuation.c b/ompi/mpiext/continue/c/continuation.c index 7597e7e7005..4f96ab03908 100644 --- a/ompi/mpiext/continue/c/continuation.c +++ b/ompi/mpiext/continue/c/continuation.c @@ -81,10 +81,10 @@ struct ompi_cont_request_t { ompi_request_t super; opal_atomic_lock_t cont_lock; /**< Lock used completing/restarting the cont request */ bool cont_enqueue_complete; /**< Whether to enqueue immediately complete requests */ - bool cont_in_wait; /**< Whether the continuation request is currently waited on */ opal_atomic_int32_t cont_num_active; /**< The number of active continuations registered with a continuation request */ uint32_t continue_max_poll; /**< max number of local continuations to execute at once */ opal_list_t *cont_complete_list; /**< List of complete continuations to be invoked during test */ + ompi_wait_sync_t *sync; /**< Sync object this continuation request is attached to */ }; static void ompi_cont_request_construct(ompi_cont_request_t* cont_req) @@ -98,10 +98,10 @@ static void ompi_cont_request_construct(ompi_cont_request_t* cont_req) cont_req->super.req_status = ompi_status_empty; /* always returns MPI_SUCCESS */ opal_atomic_lock_init(&cont_req->cont_lock, false); cont_req->cont_enqueue_complete = false; - cont_req->cont_in_wait = false; cont_req->cont_num_active = 0; cont_req->continue_max_poll = UINT32_MAX; cont_req->cont_complete_list = NULL; + cont_req->sync = NULL; } static void ompi_cont_request_destruct(ompi_cont_request_t* cont_req) @@ -156,10 +156,13 @@ static opal_mutex_t request_cont_lock; */ static bool progress_callback_registered = false; -/** - * Thread-local list of continuation requests that should be progressed. - */ -static opal_thread_local opal_list_t *thread_progress_list = NULL; +struct lazy_list_s { + opal_list_t list; + bool is_initialized; +}; +typedef struct lazy_list_s lazy_list_t; + +static opal_thread_local lazy_list_t thread_progress_list = { .is_initialized = false }; static inline void ompi_continue_cont_req_release(ompi_cont_request_t *cont_req, @@ -179,6 +182,10 @@ void ompi_continue_cont_req_release(ompi_cont_request_t *cont_req, /* signal that all continuations were found complete */ ompi_request_complete(&cont_req->super, true); } + if (NULL != cont_req->sync) { + /* release the sync object */ + OPAL_THREAD_ADD_FETCH32(&cont_req->sync->num_req_need_progress, -1); + } if (take_lock && using_threads) { opal_atomic_unlock(&cont_req->cont_lock); } @@ -191,12 +198,7 @@ void ompi_continue_cont_release(ompi_continuation_t *cont) ompi_cont_request_t *cont_req = cont->cont_req; assert(OMPI_REQUEST_CONT == cont_req->super.req_type); - /* if a thread is waiting on the request, we got here when - * the thread started executing the continuations, so the continuation - * request is complete already */ - if (!cont_req->cont_in_wait) { - ompi_continue_cont_req_release(cont_req, 1, true); - } + ompi_continue_cont_req_release(cont_req, 1, true); OBJ_RELEASE(cont_req); #ifdef OPAL_ENABLE_DEBUG @@ -240,9 +242,13 @@ int ompi_continue_progress_n(const uint32_t max) in_progress = 1; const bool using_threads = opal_using_threads(); - if (NULL != thread_progress_list) { + + /* execute thread-local continuations first + * (e.g., from continuation requests the current thread is waiting on) */ + lazy_list_t *tl_list = &thread_progress_list; + if (tl_list->is_initialized) { ompi_cont_request_t *cont_req; - OPAL_LIST_FOREACH(cont_req, thread_progress_list, ompi_cont_request_t) { + OPAL_LIST_FOREACH(cont_req, &tl_list->list, ompi_cont_request_t) { ompi_continuation_t *cb; if (opal_list_is_empty(cont_req->cont_complete_list)) continue; while (max > completed) { @@ -289,6 +295,12 @@ static int ompi_continue_progress_callback() return ompi_continue_progress_n(1); } +static int ompi_continue_wait_progress_callback() +{ + return ompi_continue_progress_n(UINT32_MAX); +} + + int ompi_continue_progress_request(ompi_request_t *req) { if (in_progress) return 0; @@ -329,42 +341,39 @@ int ompi_continue_progress_request(ompi_request_t *req) /** - * Register the provided continuation request to be included in the - * global progress loop (used while a thread is waiting for the contnuation - * request to complete). - * We move all local continuations into the global continuation list - * and mark the continuation request such that future continuations - * are directly put into the global continuations list. - * Once the wait completed (i.e., all continuations registered with the - * continuation request) we unmark it (see ompi_continue_deregister_request_progress). + * Register the continuation request so that it will be progressed even if + * it is poll-only and the thread is waiting on the provided sync object. */ -int ompi_continue_register_request_progress(ompi_request_t *req) +int ompi_continue_register_request_progress(ompi_request_t *req, ompi_wait_sync_t *sync) { ompi_cont_request_t *cont_req = (ompi_cont_request_t *)req; if (NULL == cont_req->cont_complete_list) return OMPI_SUCCESS; - opal_atomic_lock(&cont_req->cont_lock); + lazy_list_t *cont_req_list = &thread_progress_list; - cont_req->cont_in_wait = true; - - ompi_continue_cont_req_release(cont_req, opal_list_get_size(cont_req->cont_complete_list), false); + /* check that the thread-local list is initialized */ + if (!cont_req_list->is_initialized) { + OBJ_CONSTRUCT(&cont_req_list->list, opal_list_t); + cont_req_list->is_initialized = true; + } - opal_atomic_unlock(&cont_req->cont_lock); + /* add the continuation request to the thread-local list */ + opal_list_append(&cont_req_list->list, &cont_req->super.super.super); - if (NULL == thread_progress_list) { - thread_progress_list = OBJ_NEW(opal_list_t); + /* register with the sync object */ + if (NULL != sync) { + sync->num_req_need_progress++; + sync->progress_cb = &ompi_continue_wait_progress_callback; } - - /* enqueue the continuation request to allow for progress by this thread */ - opal_list_append(thread_progress_list, &req->super.super); + cont_req->sync = sync; return OMPI_SUCCESS; } /** - * Remove the continuation request from being progressed by the global progress - * loop (after a wait completes). + * Remove the poll-only continuation request from the thread's progress list after + * it has completed. */ int ompi_continue_deregister_request_progress(ompi_request_t *req) { @@ -372,17 +381,13 @@ int ompi_continue_deregister_request_progress(ompi_request_t *req) if (NULL == cont_req->cont_complete_list) return OMPI_SUCCESS; - /* make sure we execute all outstanding continuations */ - uint32_t tmp_max_poll = cont_req->continue_max_poll; - cont_req->continue_max_poll = UINT32_MAX; - ompi_continue_progress_request(req); - cont_req->continue_max_poll = tmp_max_poll; - - cont_req->cont_in_wait = false; - + /* let the sync know we're done, it may suspend the thread now */ + if (NULL != cont_req->sync) { + cont_req->sync->num_req_need_progress--; + } /* remove the continuation request from the thread-local progress list */ - opal_list_remove_item(thread_progress_list, &req->super.super); + opal_list_remove_item(&thread_progress_list.list, &req->super.super); return OMPI_SUCCESS; } @@ -439,13 +444,6 @@ ompi_continue_enqueue_runnable(ompi_continuation_t *cont) if (NULL != cont_req->cont_complete_list) { opal_atomic_lock(&cont_req->cont_lock); opal_list_append(cont_req->cont_complete_list, &cont->super.super); - if (cont_req->cont_in_wait) { - /* if a thread is waiting for this request to complete, signal completions - * the continuations will be executed at the end of the wait - * but we need to ensure that the request is marked complete first - */ - ompi_continue_cont_req_release(cont_req, 1, false); - } opal_atomic_unlock(&cont_req->cont_lock); } else { OPAL_THREAD_LOCK(&request_cont_lock); @@ -601,7 +599,6 @@ int ompi_continue_attach( requests[i] = MPI_REQUEST_NULL; } } - } } @@ -609,7 +606,7 @@ int ompi_continue_attach( int num_complete = count - num_registered; int32_t last_num_active = OPAL_THREAD_ADD_FETCH32(&cont->num_active, -num_complete); - if (0 == last_num_active && 0 == num_registered) { + if (0 == last_num_active) { if (cont_req->cont_enqueue_complete) { /* enqueue for later processing */ ompi_continue_enqueue_runnable(cont); diff --git a/ompi/mpiext/continue/c/continuation.h b/ompi/mpiext/continue/c/continuation.h index 42648a917d1..d32a071148a 100644 --- a/ompi/mpiext/continue/c/continuation.h +++ b/ompi/mpiext/continue/c/continuation.h @@ -22,6 +22,8 @@ #include "ompi/mpiext/continue/c/mpiext_continue_c.h" +struct ompi_request_t; + BEGIN_C_DECLS /** @@ -38,18 +40,18 @@ int ompi_continuation_fini(void); * Register a request with local completion list for progressing through * the progress engine. */ -int ompi_continue_register_request_progress(ompi_request_t *cont_req); +int ompi_continue_register_request_progress(struct ompi_request_t *cont_req, ompi_wait_sync_t *sync); /** * Deregister a request with local completion list from progressing through * the progress engine. */ -int ompi_continue_deregister_request_progress(ompi_request_t *cont_req); +int ompi_continue_deregister_request_progress(struct ompi_request_t *cont_req); /** * Progress a continuation request that has local completions. */ -int ompi_continue_progress_request(ompi_request_t *cont_req); +int ompi_continue_progress_request(struct ompi_request_t *cont_req); /** * Attach a continuation to a set of operations represented by \c requests. @@ -60,9 +62,9 @@ int ompi_continue_progress_request(ompi_request_t *cont_req); * can be used to query for and progress outstanding continuations. */ int ompi_continue_attach( - ompi_request_t *cont_req, + struct ompi_request_t *cont_req, int count, - ompi_request_t *requests[], + struct ompi_request_t *requests[], MPIX_Continue_cb_function *cont_cb, void *cont_data, ompi_status_public_t statuses[]); @@ -71,7 +73,7 @@ int ompi_continue_attach( /** * Allocate a new continuation request. */ -int ompi_continue_allocate_request(ompi_request_t **cont_req, ompi_info_t *info); +int ompi_continue_allocate_request(struct ompi_request_t **cont_req, ompi_info_t *info); END_C_DECLS diff --git a/ompi/request/req_wait.c b/ompi/request/req_wait.c index a331c6be012..2c64af2768c 100644 --- a/ompi/request/req_wait.c +++ b/ompi/request/req_wait.c @@ -41,23 +41,8 @@ int ompi_request_default_wait( { ompi_request_t *req = *req_ptr; -#if OMPI_HAVE_MPI_EXT_CONTINUE - if (OMPI_REQUEST_CONT == req->req_type) { - /* let the continuations be processed as part of the global progress loop - * while we're waiting for their completion */ - ompi_continue_register_request_progress(req); - } -#endif /* OMPI_HAVE_MPI_EXT_CONTINUE */ - - ompi_request_wait_completion(req); -#if OMPI_HAVE_MPI_EXT_CONTINUE - if (OMPI_REQUEST_CONT == req->req_type) { - ompi_continue_deregister_request_progress(req); - } -#endif /* OMPI_HAVE_MPI_EXT_CONTINUE */ - #if OPAL_ENABLE_FT_MPI /* Special case for MPI_ANY_SOURCE */ if( MPI_ERR_PROC_FAILED_PENDING == req->req_status.MPI_ERROR ) { @@ -144,13 +129,6 @@ int ompi_request_default_wait_any(size_t count, request = requests[i]; -#if OMPI_HAVE_MPI_EXT_CONTINUE - if (OMPI_REQUEST_CONT == request->req_type) { - have_cont_req = true; - ompi_continue_register_request_progress(request); - } -#endif /* OMPI_HAVE_MPI_EXT_CONTINUE */ - /* Check for null or completed persistent request. For * MPI_REQUEST_NULL, the req_state is always OMPI_REQUEST_INACTIVE. */ @@ -167,6 +145,13 @@ int ompi_request_default_wait_any(size_t count, } } +#if OMPI_HAVE_MPI_EXT_CONTINUE + if (OMPI_REQUEST_CONT == request->req_type) { + have_cont_req = true; + ompi_continue_register_request_progress(request, &sync); + } +#endif /* OMPI_HAVE_MPI_EXT_CONTINUE */ + #if OPAL_ENABLE_FT_MPI if(OPAL_UNLIKELY( ompi_request_is_failed(request) )) { completed = i; @@ -319,7 +304,7 @@ int ompi_request_default_wait_all( size_t count, #if OMPI_HAVE_MPI_EXT_CONTINUE if (OMPI_REQUEST_CONT == request->req_type) { - ompi_continue_register_request_progress(request); + ompi_continue_register_request_progress(request, &sync); } #endif /* OMPI_HAVE_MPI_EXT_CONTINUE */ @@ -590,7 +575,7 @@ int ompi_request_default_wait_some(size_t count, #if OMPI_HAVE_MPI_EXT_CONTINUE if (OMPI_REQUEST_CONT == request->req_type) { - ompi_continue_register_request_progress(request); + ompi_continue_register_request_progress(request, &sync); } #endif /* OMPI_HAVE_MPI_EXT_CONTINUE */ diff --git a/ompi/request/request.h b/ompi/request/request.h index 0e6fb80cbf7..930578a20e2 100644 --- a/ompi/request/request.h +++ b/ompi/request/request.h @@ -40,6 +40,10 @@ #include "ompi/constants.h" #include "ompi/runtime/params.h" +#if OMPI_HAVE_MPI_EXT_CONTINUE +#include "ompi/mpiext/continue/c/continuation.h" +#endif /* OMPI_HAVE_MPI_EXT_CONTINUE */ + BEGIN_C_DECLS /** @@ -465,7 +469,20 @@ static inline void ompi_request_wait_completion(ompi_request_t *req) WAIT_SYNC_INIT(&sync, 1); if (OPAL_ATOMIC_COMPARE_EXCHANGE_STRONG_PTR(&req->req_complete, &_tmp_ptr, &sync)) { +#if OMPI_HAVE_MPI_EXT_CONTINUE + if (OMPI_REQUEST_CONT == req->req_type) { + /* let the continuations be processed as part of the global progress loop + * while we're waiting for their completion */ + ompi_continue_register_request_progress(req, &sync); + } +#endif /* OMPI_HAVE_MPI_EXT_CONTINUE */ SYNC_WAIT(&sync); + +#if OMPI_HAVE_MPI_EXT_CONTINUE + if (OMPI_REQUEST_CONT == req->req_type) { + ompi_continue_deregister_request_progress(req); + } +#endif /* OMPI_HAVE_MPI_EXT_CONTINUE */ } else { /* completed before we had a chance to swap in the sync object */ WAIT_SYNC_SIGNALLED(&sync); @@ -487,6 +504,13 @@ static inline void ompi_request_wait_completion(ompi_request_t *req) } opal_atomic_rmb(); } else { +#if OMPI_HAVE_MPI_EXT_CONTINUE + if (OMPI_REQUEST_CONT == req->req_type) { + /* let the continuations be processed as part of the global progress loop + * while we're waiting for their completion */ + ompi_continue_register_request_progress(req, NULL); + } +#endif /* OMPI_HAVE_MPI_EXT_CONTINUE */ while(!REQUEST_COMPLETE(req)) { opal_progress(); #if OPAL_ENABLE_FT_MPI @@ -497,6 +521,12 @@ static inline void ompi_request_wait_completion(ompi_request_t *req) } #endif /* OPAL_ENABLE_FT_MPI */ } + +#if OMPI_HAVE_MPI_EXT_CONTINUE + if (OMPI_REQUEST_CONT == req->req_type) { + ompi_continue_deregister_request_progress(req); + } +#endif /* OMPI_HAVE_MPI_EXT_CONTINUE */ } } diff --git a/opal/mca/threads/base/wait_sync.c b/opal/mca/threads/base/wait_sync.c index 2451419620e..53d85004dfe 100644 --- a/opal/mca/threads/base/wait_sync.c +++ b/opal/mca/threads/base/wait_sync.c @@ -98,7 +98,15 @@ int ompi_sync_wait_mt(ompi_wait_sync_t *sync) */ check_status: if (sync != wait_sync_list && num_thread_in_progress >= opal_max_thread_in_progress) { - opal_thread_internal_cond_wait(&sync->condition, &sync->lock); + if (0 < sync->num_req_need_progress) { + /* release the lock so that we can be signaled */ + opal_thread_internal_mutex_unlock(&sync->lock); + sync->progress_cb(); + /* retake the lock */ + opal_thread_internal_mutex_lock(&sync->lock); + } else { + opal_thread_internal_cond_wait(&sync->condition, &sync->lock); + } /** * At this point either the sync was completed in which case diff --git a/opal/mca/threads/wait_sync.h b/opal/mca/threads/wait_sync.h index c90e3d52a5c..f7bffc392e1 100644 --- a/opal/mca/threads/wait_sync.h +++ b/opal/mca/threads/wait_sync.h @@ -46,6 +46,8 @@ typedef struct ompi_wait_sync_t { opal_thread_internal_mutex_t lock; struct ompi_wait_sync_t *next; struct ompi_wait_sync_t *prev; + opal_progress_callback_t progress_cb; + opal_atomic_int32_t num_req_need_progress; volatile bool signaling; } ompi_wait_sync_t; @@ -119,6 +121,8 @@ static inline int sync_wait_st(ompi_wait_sync_t *sync) opal_thread_internal_cond_init(&(sync)->condition); \ opal_thread_internal_mutex_init(&(sync)->lock, false); \ } \ + (sync)->progress_cb = NULL; \ + (sync)->num_req_need_progress = 0; \ } while (0) /** From e7192818e75fd76fefd6c5ccc3e54dd743318e2a Mon Sep 17 00:00:00 2001 From: Joseph Schuchart Date: Tue, 19 Apr 2022 09:36:00 -0400 Subject: [PATCH 18/20] Add tests for MPI continuations (single- and multi-threaded) Signed-off-by: Joseph Schuchart --- test/continuations/Makefile.am | 33 ++++++ test/continuations/continutions-mt.c | 164 +++++++++++++++++++++++++++ test/continuations/continutions.c | 122 ++++++++++++++++++++ 3 files changed, 319 insertions(+) create mode 100644 test/continuations/Makefile.am create mode 100644 test/continuations/continutions-mt.c create mode 100644 test/continuations/continutions.c diff --git a/test/continuations/Makefile.am b/test/continuations/Makefile.am new file mode 100644 index 00000000000..75a8ed59aa9 --- /dev/null +++ b/test/continuations/Makefile.am @@ -0,0 +1,33 @@ +# Copyright (c) 2018 Los Alamos National Security, LLC. All rights reserved. +# +# $COPYRIGHT$ +# +# Additional copyrights may follow +# +# $HEADER$ +# + +if PROJECT_OMPI + noinst_PROGRAMS = continuations continuations-mt continuations-persistent + continuations_SOURCES = continuations.c + continuations_LDFLAGS = $(OMPI_PKG_CONFIG_LDFLAGS) + continuations_LDADD = \ + $(top_builddir)/ompi/lib@OMPI_LIBMPI_NAME@.la \ + $(top_builddir)/opal/lib@OPAL_LIB_NAME@.la + + continuations-mt_SOURCES = continuations-mt.c + continuations-mt_LDFLAGS = $(OMPI_PKG_CONFIG_LDFLAGS) + continuations-mt_LDADD = \ + $(top_builddir)/ompi/lib@OMPI_LIBMPI_NAME@.la \ + $(top_builddir)/opal/lib@OPAL_LIB_NAME@.la + + continuations-persistent_SOURCES = continuations-persistent.c + continuations-persistent_LDFLAGS = $(OMPI_PKG_CONFIG_LDFLAGS) + continuations-persistent_LDADD = \ + $(top_builddir)/ompi/lib@OMPI_LIBMPI_NAME@.la \ + $(top_builddir)/opal/lib@OPAL_LIB_NAME@.la +endif # PROJECT_OMPI + +distclean-local: + rm -rf *.dSYM .deps .libs *.log *.o *.trs $(noinst_PROGRAMS) Makefile + diff --git a/test/continuations/continutions-mt.c b/test/continuations/continutions-mt.c new file mode 100644 index 00000000000..5d7f99707c0 --- /dev/null +++ b/test/continuations/continutions-mt.c @@ -0,0 +1,164 @@ +/* + * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana + * University Research and Technology + * Corporation. All rights reserved. + * Copyright (c) 2004-2005 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2004-2005 The Regents of the University of California. + * All rights reserved. + * Copyright (c) 2016 Research Organization for Information Science + * and Technology (RIST). All rights reserved. + * Copyright (c) 2018 Los Alamos National Security, LLC. All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#include +#include +#include +#include +#include +#include + +#include "mpi.h" +#include "mpi-ext.h" +//#include "ompi/mpiext/continue/c/mpiext_continue_c.h" + +#define OMPI_HAVE_MPI_EXT_CONTINUE + +#ifdef OMPI_HAVE_MPI_EXT_CONTINUE + +/* Block a thread on a receive until we release it from the main thread */ +static void* thread_recv(void* data) { + MPI_Request req; + int val; + int rank; + MPI_Comm_rank(MPI_COMM_WORLD, &rank); + MPI_Irecv(&val, 1, MPI_INT, rank, 1002, MPI_COMM_WORLD, &req); + MPI_Wait(&req, MPI_STATUS_IGNORE); + return NULL; +} + +static void complete_cnt_cb(MPI_Status *status, void *user_data) { + assert(user_data != NULL); + _Atomic int *cb_cnt = (_Atomic int*)user_data; + ++(*cb_cnt); +} + +int main(int argc, char *argv[]) +{ + MPI_Request cont_req, op_req, reqs[2]; + _Atomic int cb_cnt; + int val; + int rank, size; + int provided; + MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided); + assert(provided == MPI_THREAD_MULTIPLE); + + MPI_Comm_size(MPI_COMM_WORLD, &size); + MPI_Comm_rank(MPI_COMM_WORLD, &rank); + + pthread_t thread; + + pthread_create(&thread, NULL, &thread_recv, NULL); + + /* give enough slack to allow the thread to enter the wait + * from now on the thread is stuck in MPI_Wait, owning progress + */ + sleep(2); + + /* initialize the continuation request */ + MPIX_Continue_init(&cont_req, MPI_INFO_NULL); + + /** + * One send, one recv, one continuation + */ + MPI_Irecv(&val, 1, MPI_INT, rank, 1001, MPI_COMM_WORLD, &reqs[0]); + MPI_Isend(&val, 1, MPI_INT, rank, 1001, MPI_COMM_WORLD, &reqs[1]); + + cb_cnt = 0; + MPIX_Continueall(2, reqs, &complete_cnt_cb, &cb_cnt, MPI_STATUSES_IGNORE, cont_req); + assert(reqs[0] == MPI_REQUEST_NULL && reqs[1] == MPI_REQUEST_NULL); + MPI_Wait(&cont_req, MPI_STATUS_IGNORE); + assert(cb_cnt == 1); + + /** + * One send, one recv, two continuations + */ + cb_cnt = 0; + MPI_Irecv(&val, 1, MPI_INT, rank, 1001, MPI_COMM_WORLD, &op_req); + MPIX_Continue(&op_req, &complete_cnt_cb, &cb_cnt, MPI_STATUS_IGNORE, cont_req); + assert(op_req == MPI_REQUEST_NULL); + + MPI_Isend(&val, 1, MPI_INT, rank, 1001, MPI_COMM_WORLD, &op_req); + MPIX_Continue(&op_req, &complete_cnt_cb, &cb_cnt, MPI_STATUS_IGNORE, cont_req); + assert(op_req == MPI_REQUEST_NULL); + + MPI_Wait(&cont_req, MPI_STATUS_IGNORE); + assert(cb_cnt == 2); + + MPI_Request_free(&cont_req); + + /**************************************************************** + * Do the same thing, but with a poll-only continuation request + ****************************************************************/ + + MPI_Info info; + MPI_Info_create(&info); + MPI_Info_set(info, "mpi_continue_poll_only", "true"); + MPI_Info_set(info, "mpi_continue_enqueue_complete", "true"); + + /* initialize the continuation request */ + MPIX_Continue_init(&cont_req, info); + + MPI_Info_free(&info); + + /** + * One send, one recv, one continuation + */ + MPI_Irecv(&val, 1, MPI_INT, rank, 1001, MPI_COMM_WORLD, &reqs[0]); + MPI_Isend(&val, 1, MPI_INT, rank, 1001, MPI_COMM_WORLD, &reqs[1]); + + cb_cnt = 0; + MPIX_Continueall(2, reqs, &complete_cnt_cb, &cb_cnt, MPI_STATUSES_IGNORE, cont_req); + assert(reqs[0] == MPI_REQUEST_NULL && reqs[1] == MPI_REQUEST_NULL); + MPI_Wait(&cont_req, MPI_STATUS_IGNORE); + assert(cb_cnt == 1); + + /** + * One send, one recv, two continuations + */ + cb_cnt = 0; + MPI_Irecv(&val, 1, MPI_INT, rank, 1001, MPI_COMM_WORLD, &op_req); + MPIX_Continue(&op_req, &complete_cnt_cb, &cb_cnt, MPI_STATUS_IGNORE, cont_req); + assert(op_req == MPI_REQUEST_NULL); + + MPI_Isend(&val, 1, MPI_INT, rank, 1001, MPI_COMM_WORLD, &op_req); + MPIX_Continue(&op_req, &complete_cnt_cb, &cb_cnt, MPI_STATUS_IGNORE, cont_req); + assert(op_req == MPI_REQUEST_NULL); + + MPI_Wait(&cont_req, MPI_STATUS_IGNORE); + assert(cb_cnt == 2); + + MPI_Request_free(&cont_req); + + /* release the blocked thread */ + MPI_Send(&val, 1, MPI_INT, rank, 1002, MPI_COMM_WORLD); + pthread_join(thread, NULL); + + MPI_Finalize(); + + return 0; +} +#else +int main(int argc, char *argv[]) +{ + return 77; +} +#endif /* HAVE_MEMKIND_H */ diff --git a/test/continuations/continutions.c b/test/continuations/continutions.c new file mode 100644 index 00000000000..b1309e5e3cc --- /dev/null +++ b/test/continuations/continutions.c @@ -0,0 +1,122 @@ +/* + * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana + * University Research and Technology + * Corporation. All rights reserved. + * Copyright (c) 2004-2005 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2004-2005 The Regents of the University of California. + * All rights reserved. + * Copyright (c) 2016 Research Organization for Information Science + * and Technology (RIST). All rights reserved. + * Copyright (c) 2018 Los Alamos National Security, LLC. All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#include +#include +#include +#include + +#include "mpi.h" +#include "mpi-ext.h" + +#define OMPI_HAVE_MPI_EXT_CONTINUE + +#ifdef OMPI_HAVE_MPI_EXT_CONTINUE + +static void complete_cnt_cb(MPI_Status *status, void *user_data) { + assert(user_data != NULL); + printf("complete_cnt_cb \n"); + int *cb_cnt = (int*)user_data; + *cb_cnt = *cb_cnt + 1; +} + +int main(int argc, char *argv[]) +{ + MPI_Request cont_req, cont_req2, op_req, reqs[2]; + int cb_cnt; + int val; + int rank, size; + MPI_Init(&argc, &argv); + + MPI_Comm_size(MPI_COMM_WORLD, &size); + MPI_Comm_rank(MPI_COMM_WORLD, &rank); + + /* initialize the continuation request */ + MPIX_Continue_init(&cont_req, MPI_INFO_NULL); + + /** + * One send, one recv, one continuation + */ + MPI_Irecv(&val, 1, MPI_INT, rank, 1001, MPI_COMM_WORLD, &reqs[0]); + MPI_Isend(&val, 1, MPI_INT, rank, 1001, MPI_COMM_WORLD, &reqs[1]); + + //MPI_Waitall(2, reqs, MPI_STATUSES_IGNORE); + + cb_cnt = 0; + MPIX_Continueall(2, reqs, &complete_cnt_cb, &cb_cnt, MPI_STATUSES_IGNORE, cont_req); + assert(reqs[0] == MPI_REQUEST_NULL && reqs[1] == MPI_REQUEST_NULL); + MPI_Wait(&cont_req, MPI_STATUS_IGNORE); + assert(cb_cnt == 1); + + /** + * One send, one recv, two continuations + */ + cb_cnt = 0; + MPI_Irecv(&val, 1, MPI_INT, rank, 1001, MPI_COMM_WORLD, &op_req); + MPIX_Continue(&op_req, &complete_cnt_cb, &cb_cnt, MPI_STATUS_IGNORE, cont_req); + assert(op_req == MPI_REQUEST_NULL); + + MPI_Isend(&val, 1, MPI_INT, rank, 1001, MPI_COMM_WORLD, &op_req); + MPIX_Continue(&op_req, &complete_cnt_cb, &cb_cnt, MPI_STATUS_IGNORE, cont_req); + assert(op_req == MPI_REQUEST_NULL); + + MPI_Wait(&cont_req, MPI_STATUS_IGNORE); + assert(cb_cnt == 2); + + /** + * One send, one recv, two continuations in two continuation requests + */ + MPI_Info info; + MPI_Info_create(&info); + MPI_Info_set(info, "mpi_continue_poll_only", "true"); + MPI_Info_set(info, "mpi_continue_enqueue_complete", "true"); + + /* initialize a poll-only continuation request */ + MPIX_Continue_init(&cont_req2, info); + + cb_cnt = 0; + MPI_Irecv(&val, 1, MPI_INT, rank, 1001, MPI_COMM_WORLD, &op_req); + MPIX_Continue(&op_req, &complete_cnt_cb, &cb_cnt, MPI_STATUS_IGNORE, cont_req); + assert(op_req == MPI_REQUEST_NULL); + + MPI_Isend(&val, 1, MPI_INT, rank, 1001, MPI_COMM_WORLD, &op_req); + MPIX_Continue(&op_req, &complete_cnt_cb, &cb_cnt, MPI_STATUS_IGNORE, cont_req2); + assert(op_req == MPI_REQUEST_NULL); + + MPI_Wait(&cont_req, MPI_STATUS_IGNORE); + assert(cb_cnt == 1); + + printf("Waiting for poll-only cont request %p to complete\n", cont_req2); + MPI_Wait(&cont_req2, MPI_STATUS_IGNORE); + assert(cb_cnt == 2); + + MPI_Request_free(&cont_req); + MPI_Request_free(&cont_req2); + MPI_Finalize(); + + return 0; +} +#else +int main(int argc, char *argv[]) +{ + return 77; +} +#endif /* HAVE_MEMKIND_H */ From 22374294d6658b9e5d3d01f06b42ae83fec019c5 Mon Sep 17 00:00:00 2001 From: Joseph Schuchart Date: Wed, 29 Jun 2022 14:55:23 -0400 Subject: [PATCH 19/20] Don't execute callbacks immediately if we didn't see any completed requests Signed-off-by: Joseph Schuchart --- ompi/mpiext/continue/c/continuation.c | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/ompi/mpiext/continue/c/continuation.c b/ompi/mpiext/continue/c/continuation.c index 4f96ab03908..d4b1d5d5ff7 100644 --- a/ompi/mpiext/continue/c/continuation.c +++ b/ompi/mpiext/continue/c/continuation.c @@ -604,8 +604,10 @@ int ompi_continue_attach( assert(count >= num_registered); int num_complete = count - num_registered; - int32_t last_num_active = OPAL_THREAD_ADD_FETCH32(&cont->num_active, - -num_complete); + int32_t last_num_active = count; + if (num_complete > 0) { + last_num_active = OPAL_THREAD_ADD_FETCH32(&cont->num_active, -num_complete); + } if (0 == last_num_active) { if (cont_req->cont_enqueue_complete) { /* enqueue for later processing */ From e85407f6607a1160975496c2d7e04d373b1a1c9f Mon Sep 17 00:00:00 2001 From: Christoph Niethammer Date: Wed, 7 Sep 2022 12:09:22 +0200 Subject: [PATCH 20/20] Fix issue with circular dependency in request.h Some static functions in request.h make use of continuation routines if continuations are enabled. This causes problems with the circular include dependency of both. To overcome this problem include request.h here without enabling features of the continuation extension. Note: This solution must be rethought if one wants to use the affected static functions from request.h in continuation.h itself. Signed-off-by: Christoph Niethammer --- ompi/mpiext/continue/c/continuation.h | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/ompi/mpiext/continue/c/continuation.h b/ompi/mpiext/continue/c/continuation.h index d32a071148a..6de7e73328f 100644 --- a/ompi/mpiext/continue/c/continuation.h +++ b/ompi/mpiext/continue/c/continuation.h @@ -1,6 +1,6 @@ /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */ /* - * Copyright (c) 2020 High Performance Computing Center Stuttgart, + * Copyright (c) 2020-2022 High Performance Computing Center Stuttgart, * University of Stuttgart. All rights reserved. * Copyright (c) 2021 The University of Tennessee and The University * of Tennessee Research Foundation. All rights @@ -17,13 +17,16 @@ #include "ompi_config.h" #include "ompi/info/info.h" + +/* Workaround to prevent issues with static functions and circular dependency in request.h */ +#define OLD_OMPI_HAVE_MPI_EXT_CONTINUE OMPI_HAVE_MPI_EXT_CONTINUE +#undef OMPI_HAVE_MPI_EXT_CONTINUE #include "ompi/request/request.h" +#define OMPI_HAVE_MPI_EXT_CONTINUE OLD_OMPI_HAVE_MPI_EXT_CONTINUE #include "mpi.h" #include "ompi/mpiext/continue/c/mpiext_continue_c.h" -struct ompi_request_t; - BEGIN_C_DECLS /**