Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

UCP/PROTO: Disable protocols that need memory type copy #10490

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions src/ucp/am/eager_multi.c
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,9 @@ ucp_am_eager_multi_bcopy_proto_probe(const ucp_proto_init_params_t *init_params)
.opt_align_offs = UCP_PROTO_COMMON_OFFSET_INVALID
};

if (!ucp_am_check_init_params(init_params, UCP_PROTO_AM_OP_ID_MASK,
UCP_PROTO_SELECT_OP_FLAG_AM_RNDV)) {
if (!ucp_am_check_init_params_avoid_copy(init_params,
UCP_PROTO_AM_OP_ID_MASK,
UCP_PROTO_SELECT_OP_FLAG_AM_RNDV)) {
return;
}

Expand Down Expand Up @@ -220,8 +221,9 @@ ucp_am_eager_multi_zcopy_proto_probe(const ucp_proto_init_params_t *init_params)
.middle.tl_cap_flags = UCT_IFACE_FLAG_AM_ZCOPY
};

if (!ucp_am_check_init_params(init_params, UCP_PROTO_AM_OP_ID_MASK,
UCP_PROTO_SELECT_OP_FLAG_AM_RNDV)) {
if (!ucp_am_check_init_params_avoid_copy(init_params,
UCP_PROTO_AM_OP_ID_MASK,
UCP_PROTO_SELECT_OP_FLAG_AM_RNDV)) {
return;
}

Expand Down
12 changes: 6 additions & 6 deletions src/ucp/am/eager_single.c
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,8 @@ ucp_am_eager_short_probe_common(const ucp_proto_init_params_t *init_params,
.tl_cap_flags = UCT_IFACE_FLAG_AM_SHORT
};

if (!ucp_am_check_init_params(init_params, UCS_BIT(op_id),
UCP_PROTO_SELECT_OP_FLAG_AM_RNDV) ||
if (!ucp_am_check_init_params_avoid_copy(init_params, UCS_BIT(op_id),
UCP_PROTO_SELECT_OP_FLAG_AM_RNDV) ||
!ucp_proto_is_short_supported(select_param)) {
return;
}
Expand Down Expand Up @@ -246,8 +246,8 @@ static void ucp_am_eager_single_bcopy_probe_common(
.tl_cap_flags = UCT_IFACE_FLAG_AM_BCOPY
};

if (!ucp_am_check_init_params(init_params, UCS_BIT(op_id),
UCP_PROTO_SELECT_OP_FLAG_AM_RNDV)) {
if (!ucp_am_check_init_params_avoid_copy(init_params, UCS_BIT(op_id),
UCP_PROTO_SELECT_OP_FLAG_AM_RNDV)) {
return;
}

Expand Down Expand Up @@ -338,8 +338,8 @@ static void ucp_am_eager_single_zcopy_probe_common(
.tl_cap_flags = UCT_IFACE_FLAG_AM_ZCOPY
};

if (!ucp_am_check_init_params(init_params, UCS_BIT(op_id),
UCP_PROTO_SELECT_OP_FLAG_AM_RNDV) ||
if (!ucp_am_check_init_params_avoid_copy(init_params, UCS_BIT(op_id),
UCP_PROTO_SELECT_OP_FLAG_AM_RNDV) ||
(init_params->select_param->dt_class != UCP_DATATYPE_CONTIG)) {
return;
}
Expand Down
8 changes: 8 additions & 0 deletions src/ucp/am/ucp_am.inl
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,14 @@ ucp_am_check_init_params(const ucp_proto_init_params_t *init_params,
exclude_flags);
}

static UCS_F_ALWAYS_INLINE int
ucp_am_check_init_params_avoid_copy(const ucp_proto_init_params_t *init_params,
uint64_t op_id_mask, uint16_t exclude_flags)
{
return ucp_am_check_init_params(init_params, op_id_mask, exclude_flags) &&
!init_params->worker->context->config.ext.avoid_copy_mem_types;
}

static UCS_F_ALWAYS_INLINE int
ucp_proto_config_is_am(const ucp_proto_config_t *proto_config)
{
Expand Down
11 changes: 11 additions & 0 deletions src/ucp/core/ucp_context.c
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,10 @@ static ucs_config_field_t ucp_context_config_table[] = {
ucs_offsetof(ucp_context_config_t, rndv_frag_mem_types),
UCS_CONFIG_TYPE_BITMAP(ucs_memory_type_names)},

{"MEMTYPE_AVOID_COPY", "n",
"Avoid memory type copies when activated.\n",
ucs_offsetof(ucp_context_config_t, avoid_copy_mem_types), UCS_CONFIG_TYPE_BOOL},
Comment on lines +362 to +364
Copy link
Contributor

@rakhmets rakhmets Feb 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
{"MEMTYPE_AVOID_COPY", "n",
"Avoid memory type copies when activated.\n",
ucs_offsetof(ucp_context_config_t, avoid_copy_mem_types), UCS_CONFIG_TYPE_BOOL},
{"MEMTYPE_COPY_ENABLE", "y",
"Enable memory type copies",
ucs_offsetof(ucp_context_config_t, mem_type_copy_enable), UCS_CONFIG_TYPE_BOOL},

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok to fix from my side, since we had agreed on naming, @yosefe @brminich shall we go ahead with MEMTYPE_COPY_ENABLE?

Copy link
Contributor

@rakhmets rakhmets Feb 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to explain my comment. We have dozens UCX variables ended by "ENABLE", but with "AVOID". And I think it is inconvenient to set a negative value to a variable with a negative in the name to enable a feature.


{"RNDV_PIPELINE_SEND_THRESH", "inf",
"RNDV size threshold to enable sender side pipeline for mem type",
ucs_offsetof(ucp_context_config_t, rndv_pipeline_send_thresh), UCS_CONFIG_TYPE_MEMUNITS},
Expand Down Expand Up @@ -1158,6 +1162,13 @@ static void ucp_add_tl_resource_if_enabled(

if (ucp_is_resource_enabled(resource, config, aux_tls, &rsc_flags,
dev_cfg_masks, tl_cfg_mask)) {
if (context->config.ext.avoid_copy_mem_types) {
if (!strcmp(resource->tl_name, "cuda_ipc")) {
ucs_debug("disabled cuda_ipc for memtype copy avoidance");
return;
}
}

if ((resource->sys_device != UCS_SYS_DEVICE_ID_UNKNOWN) &&
(resource->sys_device >= UCP_MAX_SYS_DEVICES)) {
ucs_diag(UCT_TL_RESOURCE_DESC_FMT
Expand Down
2 changes: 2 additions & 0 deletions src/ucp/core/ucp_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ typedef struct ucp_context_config {
size_t rndv_num_frags[UCS_MEMORY_TYPE_LAST];
/** Memory types of fragments used for RNDV pipeline protocol */
uint64_t rndv_frag_mem_types;
/** Avoid memtype copies when set to true */
int avoid_copy_mem_types;
/** RNDV pipeline send threshold */
size_t rndv_pipeline_send_thresh;
/** Enabling 2-stage pipeline rndv protocol */
Expand Down
7 changes: 7 additions & 0 deletions src/ucp/rma/amo_offload.c
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ static void ucp_proto_amo_probe(const ucp_proto_init_params_t *init_params,
int is_memtype)
{
ucp_worker_h worker = init_params->worker;
ucs_memory_type_t send_mem_type = init_params->select_param->mem_type;
ucs_memory_type_t reply_mem_type =
init_params->select_param->op.reply.mem_type;
ucp_proto_single_init_params_t params = {
Expand Down Expand Up @@ -181,6 +182,12 @@ static void ucp_proto_amo_probe(const ucp_proto_init_params_t *init_params,
return;
}

if ((!UCP_MEM_IS_ACCESSIBLE_FROM_CPU(send_mem_type) ||
!UCP_MEM_IS_ACCESSIBLE_FROM_CPU(reply_mem_type)) &&
init_params->worker->context->config.ext.avoid_copy_mem_types) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe you can disable it only for non-host memtypes?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should be fixed now

return;
}

if (op_id != UCP_OP_ID_AMO_POST) {
params.super.flags |= UCP_PROTO_COMMON_INIT_FLAG_RESPONSE;
if (!UCP_MEM_IS_ACCESSIBLE_FROM_CPU(reply_mem_type) &&
Expand Down
7 changes: 6 additions & 1 deletion src/ucp/rma/amo_sw.c
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,10 @@ static void ucp_proto_amo_sw_probe(const ucp_proto_init_params_t *init_params,
const ucp_ep_config_key_lane_t *lane_config;
const uct_iface_attr_t *iface_attr;

if (init_params->worker->context->config.ext.avoid_copy_mem_types) {
return;
}

/* If the endpoint has device atomic lanes, it means the target worker
expects only device atomics, so we cannot use SW atomics. */
ucs_carray_for_each(lane_config, ep_config_key->lanes,
Expand Down Expand Up @@ -485,7 +489,8 @@ ucp_proto_amo_sw_fetch_probe(const ucp_proto_init_params_t *init_params)
if (!ucp_proto_init_check_op(init_params,
UCS_BIT(UCP_OP_ID_AMO_FETCH) |
UCS_BIT(UCP_OP_ID_AMO_CSWAP)) ||
(init_params->select_param->dt_class != UCP_DATATYPE_CONTIG)) {
(init_params->select_param->dt_class != UCP_DATATYPE_CONTIG) ||
init_params->worker->context->config.ext.avoid_copy_mem_types) {
return;
}

Expand Down
3 changes: 2 additions & 1 deletion src/ucp/rma/get_am.c
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@ ucp_proto_get_am_bcopy_probe(const ucp_proto_init_params_t *init_params)
.tl_cap_flags = UCT_IFACE_FLAG_AM_BCOPY
};

if (!ucp_proto_init_check_op(init_params, UCS_BIT(UCP_OP_ID_GET))) {
if (!ucp_proto_init_check_op(init_params, UCS_BIT(UCP_OP_ID_GET)) ||
init_params->worker->context->config.ext.avoid_copy_mem_types) {
return;
}

Expand Down
3 changes: 2 additions & 1 deletion src/ucp/rma/get_offload.c
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,8 @@ ucp_proto_get_offload_bcopy_probe(const ucp_proto_init_params_t *init_params)
};

if ((init_params->select_param->dt_class != UCP_DATATYPE_CONTIG) ||
!ucp_proto_init_check_op(init_params, UCS_BIT(UCP_OP_ID_GET))) {
!ucp_proto_init_check_op(init_params, UCS_BIT(UCP_OP_ID_GET)) ||
context->config.ext.avoid_copy_mem_types) {
return;
}

Expand Down
3 changes: 2 additions & 1 deletion src/ucp/rma/put_am.c
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,8 @@ ucp_proto_put_am_bcopy_probe(const ucp_proto_init_params_t *init_params)
.opt_align_offs = UCP_PROTO_COMMON_OFFSET_INVALID
};

if (!ucp_proto_init_check_op(init_params, UCS_BIT(UCP_OP_ID_PUT))) {
if (!ucp_proto_init_check_op(init_params, UCS_BIT(UCP_OP_ID_PUT)) ||
context->config.ext.avoid_copy_mem_types) {
return;
}

Expand Down
6 changes: 4 additions & 2 deletions src/ucp/rma/put_offload.c
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ ucp_proto_put_offload_short_probe(const ucp_proto_init_params_t *init_params)
};

if (!ucp_proto_init_check_op(init_params, UCS_BIT(UCP_OP_ID_PUT)) ||
!ucp_proto_is_short_supported(init_params->select_param)) {
!ucp_proto_is_short_supported(init_params->select_param) ||
init_params->worker->context->config.ext.avoid_copy_mem_types) {
return;
}

Expand Down Expand Up @@ -178,7 +179,8 @@ ucp_proto_put_offload_bcopy_probe(const ucp_proto_init_params_t *init_params)
.opt_align_offs = UCP_PROTO_COMMON_OFFSET_INVALID
};

if (!ucp_proto_init_check_op(init_params, UCS_BIT(UCP_OP_ID_PUT))) {
if (!ucp_proto_init_check_op(init_params, UCS_BIT(UCP_OP_ID_PUT)) ||
context->config.ext.avoid_copy_mem_types) {
return;
}

Expand Down
3 changes: 2 additions & 1 deletion src/ucp/rndv/rndv_am.c
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ static void ucp_rndv_am_probe_common(ucp_proto_multi_init_params_t *params)
ucp_context_h context = params->super.super.worker->context;

if (!ucp_proto_rndv_op_check(&params->super.super, UCP_OP_ID_RNDV_SEND,
0)) {
0) ||
context->config.ext.avoid_copy_mem_types) {
return;
}

Expand Down
3 changes: 2 additions & 1 deletion src/ucp/rndv/rndv_mtype.inl
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ ucp_proto_rndv_mtype_init(const ucp_proto_init_params_t *init_params,

if ((init_params->select_param->dt_class != UCP_DATATYPE_CONTIG) ||
(ucp_proto_rndv_mtype_ep(worker, frag_mem_type, mem_type) == NULL) ||
!ucp_proto_init_check_op(init_params, UCP_PROTO_RNDV_OP_ID_MASK)) {
!ucp_proto_init_check_op(init_params, UCP_PROTO_RNDV_OP_ID_MASK) ||
context->config.ext.avoid_copy_mem_types) {
return UCS_ERR_UNSUPPORTED;
}

Expand Down
6 changes: 4 additions & 2 deletions src/ucp/rndv/rndv_ppln.c
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,8 @@ static size_t ucp_proto_rndv_ppln_pack_ack(void *dest, void *arg)
static void
ucp_proto_rndv_send_ppln_probe(const ucp_proto_init_params_t *init_params)
{
if (!ucp_proto_init_check_op(init_params, UCS_BIT(UCP_OP_ID_RNDV_SEND))) {
if (!ucp_proto_init_check_op(init_params, UCS_BIT(UCP_OP_ID_RNDV_SEND)) ||
init_params->worker->context->config.ext.avoid_copy_mem_types) {
return;
}

Expand Down Expand Up @@ -356,7 +357,8 @@ ucp_proto_t ucp_rndv_send_ppln_proto = {
static void
ucp_proto_rndv_recv_ppln_probe(const ucp_proto_init_params_t *init_params)
{
if (!ucp_proto_init_check_op(init_params, UCS_BIT(UCP_OP_ID_RNDV_RECV))) {
if (!ucp_proto_init_check_op(init_params, UCS_BIT(UCP_OP_ID_RNDV_RECV)) ||
init_params->worker->context->config.ext.avoid_copy_mem_types) {
return;
}

Expand Down
3 changes: 2 additions & 1 deletion src/ucp/stream/stream_multi.c
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ static void
ucp_stream_multi_common_probe(const ucp_proto_multi_init_params_t *params)
{
if (!ucp_proto_init_check_op(&params->super.super,
UCS_BIT(UCP_OP_ID_STREAM_SEND))) {
UCS_BIT(UCP_OP_ID_STREAM_SEND)) ||
params->super.super.worker->context->config.ext.avoid_copy_mem_types) {
return;
}

Expand Down
3 changes: 2 additions & 1 deletion src/ucp/tag/eager_multi.c
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ ucp_proto_eager_multi_probe_common(ucp_proto_multi_init_params_t *params,
{
ucp_context_config_t *context_config;

if (!ucp_tag_eager_check_op_id(&params->super.super, op_id, 0)) {
if (!ucp_tag_eager_check_op_id(&params->super.super, op_id, 0) ||
params->super.super.worker->context->config.ext.avoid_copy_mem_types) {
return;
}

Expand Down
9 changes: 6 additions & 3 deletions src/ucp/tag/eager_single.c
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ ucp_proto_eager_short_probe(const ucp_proto_init_params_t *init_params)

/* AM based proto can not be used if tag offload lane configured */
if (!ucp_tag_eager_check_op_id(init_params, UCP_OP_ID_TAG_SEND, 0) ||
!ucp_proto_is_short_supported(select_param)) {
!ucp_proto_is_short_supported(select_param) ||
init_params->worker->context->config.ext.avoid_copy_mem_types) {
return;
}

Expand Down Expand Up @@ -147,7 +148,8 @@ ucp_proto_eager_bcopy_single_probe(const ucp_proto_init_params_t *init_params)
};

/* AM based proto can not be used if tag offload lane configured */
if (!ucp_tag_eager_check_op_id(init_params, UCP_OP_ID_TAG_SEND, 0)) {
if (!ucp_tag_eager_check_op_id(init_params, UCP_OP_ID_TAG_SEND, 0) ||
init_params->worker->context->config.ext.avoid_copy_mem_types) {
return;
}

Expand Down Expand Up @@ -197,7 +199,8 @@ ucp_proto_eager_zcopy_single_probe(const ucp_proto_init_params_t *init_params)

/* AM based proto can not be used if tag offload lane configured */
if (!ucp_tag_eager_check_op_id(init_params, UCP_OP_ID_TAG_SEND, 0) ||
(init_params->select_param->dt_class != UCP_DATATYPE_CONTIG)) {
(init_params->select_param->dt_class != UCP_DATATYPE_CONTIG) ||
init_params->worker->context->config.ext.avoid_copy_mem_types) {
return;
}

Expand Down
9 changes: 6 additions & 3 deletions src/ucp/tag/offload/eager.c
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ static void ucp_proto_eager_tag_offload_short_probe(
};

if (!ucp_tag_eager_check_op_id(init_params, UCP_OP_ID_TAG_SEND, 1) ||
!ucp_proto_is_short_supported(select_param)) {
!ucp_proto_is_short_supported(select_param) ||
init_params->worker->context->config.ext.avoid_copy_mem_types) {
return;
}

Expand Down Expand Up @@ -146,7 +147,8 @@ static void ucp_proto_eager_tag_offload_bcopy_probe_common(
};

/* offload proto can not be used if no tag offload lane configured */
if (!ucp_tag_eager_check_op_id(init_params, op_id, 1)) {
if (!ucp_tag_eager_check_op_id(init_params, op_id, 1) ||
init_params->worker->context->config.ext.avoid_copy_mem_types) {
return;
}

Expand Down Expand Up @@ -259,7 +261,8 @@ static void ucp_proto_eager_tag_offload_zcopy_probe_common(

/* offload proto can not be used if no tag offload lane configured */
if (!ucp_tag_eager_check_op_id(init_params, op_id, 1) ||
(init_params->select_param->dt_class != UCP_DATATYPE_CONTIG)) {
(init_params->select_param->dt_class != UCP_DATATYPE_CONTIG) ||
init_params->worker->context->config.ext.avoid_copy_mem_types) {
return;
}

Expand Down
3 changes: 2 additions & 1 deletion src/ucp/tag/offload/rndv.c
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,8 @@ ucp_tag_rndv_offload_sw_proto_probe(const ucp_proto_init_params_t *init_params)
ucp_proto_rndv_ctrl_priv_t rpriv;

if (!ucp_tag_rndv_check_op_id(init_params) ||
!ucp_ep_config_key_has_tag_lane(init_params->ep_config_key)) {
!ucp_ep_config_key_has_tag_lane(init_params->ep_config_key) ||
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe add a simple mock test to verify the protocol selection?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

need to discuss because there are many protocols to tests in that case.

init_params->worker->context->config.ext.avoid_copy_mem_types) {
return;
}

Expand Down
Loading