Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions azure-pipelines.yml
Original file line number Diff line number Diff line change
Expand Up @@ -94,28 +94,28 @@ jobs:
cc: gcc-14
analyzer: on
# 32-bit Windows: without SSL/security because Chocolateley only provides 64-bit OpenSSL
'Windows 2022 with Visual Studio 2022 (Debug, x86, no security)':
'Windows 2025 with Visual Studio 2022 (Debug, x86, no security)':
arch: x86
image: windows-2022
image: windows-2025
ssl: off
security: off
idlc_xtests: off
generator: 'Visual Studio 17 2022'
'Windows 2022 with Visual Studio 2022 (Debug, x86_64)':
image: windows-2022
'Windows 2025 with Visual Studio 2022 (Debug, x86_64)':
image: windows-2025
idlc_xtests: off
generator: 'Visual Studio 17 2022'
'Windows 2022 with Visual Studio 2022 (Release, x86_64, no tests)':
image: windows-2022
'Windows 2025 with Visual Studio 2022 (Release, x86_64, no tests)':
image: windows-2025
build_type: Release
testing: off
idlc_xtests: off
generator: 'Visual Studio 17 2022'
'Windows 2019 with Visual Studio 2019 (RelWithDebInfo, x86_64)':
image: windows-2019
'Windows 2022 with Visual Studio 2022 (RelWithDebInfo, x86_64)':
image: windows-2022
build_type: RelWithDebInfo
idlc_xtests: off
generator: 'Visual Studio 16 2019'
generator: 'Visual Studio 17 2022'
#'Windows 2019 with GCC 10 (Debug, x86_64)':
# image: windows-2019
# build_type: Debug
Expand Down
4 changes: 3 additions & 1 deletion src/core/ddsc/src/dds__types.h
Original file line number Diff line number Diff line change
Expand Up @@ -474,7 +474,9 @@ typedef struct dds_waitset {
acquired while holding an ancestor's lock, but a waitset must be capable of triggering on
events on its parent */
ddsrt_mutex_t wait_lock;
ddsrt_cond_t wait_cond;

/* etime: dds_waitset_wait timeout */
ddsrt_cond_etime_t wait_cond;
size_t nentities; /* [wait_lock] */
size_t ntriggered; /* [wait_lock] */
dds_attachment *entities; /* [wait_lock] 0 .. ntriggered are triggred, ntriggred .. nentities are not */
Expand Down
2 changes: 1 addition & 1 deletion src/core/ddsc/src/dds__writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ void dds_writer_status_cb (void *entity, const struct ddsi_status_cb_data * data
void dds_writer_invoke_cbs_for_pending_events(struct dds_entity *e, uint32_t status);

/** @component writer */
dds_return_t dds__ddsi_writer_wait_for_acks (struct dds_writer *wr, ddsi_guid_t *rdguid, dds_time_t abstimeout);
dds_return_t dds__ddsi_writer_wait_for_acks (struct dds_writer *wr, ddsi_guid_t *rdguid, ddsrt_mtime_t abstimeout);

/** @component writer */
dds_return_t dds_request_writer_loan (dds_writer *wr, enum dds_writer_loan_type loan_type, uint32_t sz, void **sample)
Expand Down
3 changes: 2 additions & 1 deletion src/core/ddsc/src/dds_psmx.c
Original file line number Diff line number Diff line change
Expand Up @@ -883,7 +883,8 @@ void dds_pubsub_message_exchange_fini (struct dds_domain *domain)
static dds_return_t dds_psmx_endpoint_write_wrapper (const struct dds_psmx_endpoint_int *psmx_endpoint, dds_loaned_sample_t *data, size_t keysz, const void *key)
{
(void) keysz; (void) key;
return psmx_endpoint->ext->ops.write (psmx_endpoint->ext, data);
// FreeRTOS #defines "write" ...
return (psmx_endpoint->ext->ops.write) (psmx_endpoint->ext, data);
}

static dds_return_t dds_psmx_endpoint_write_with_key_wrapper (const struct dds_psmx_endpoint_int *psmx_endpoint, dds_loaned_sample_t *data, size_t keysz, const void *key)
Expand Down
3 changes: 1 addition & 2 deletions src/core/ddsc/src/dds_publisher.c
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,7 @@ dds_return_t dds_wait_for_acks (dds_entity_t publisher_or_writer, dds_duration_t
if ((ret = dds_entity_pin (publisher_or_writer, &p_or_w_ent)) < 0)
return ret;

const dds_time_t tnow = dds_time ();
const dds_time_t abstimeout = (DDS_INFINITY - timeout <= tnow) ? DDS_NEVER : (tnow + timeout);
const ddsrt_mtime_t abstimeout = ddsrt_mtime_add_duration (ddsrt_time_monotonic (), timeout);
switch (dds_entity_kind (p_or_w_ent))
{
case DDS_KIND_PUBLISHER:
Expand Down
9 changes: 5 additions & 4 deletions src/core/ddsc/src/dds_topic.c
Original file line number Diff line number Diff line change
Expand Up @@ -621,7 +621,7 @@ dds_entity_t dds_create_topic_impl (
{
ddsrt_mutex_lock (&gv->new_topic_lock);
gv->new_topic_version++;
ddsrt_cond_broadcast (&gv->new_topic_cond);
ddsrt_cond_etime_broadcast (&gv->new_topic_cond);
ddsrt_mutex_unlock (&gv->new_topic_lock);
}

Expand Down Expand Up @@ -910,6 +910,8 @@ static dds_entity_t dds_find_topic_impl (dds_find_scope_t scope, dds_entity_t pa

if (name == NULL || !is_valid_name (name))
return DDS_RETCODE_BAD_PARAMETER;
if (timeout < 0)
return DDS_RETCODE_BAD_PARAMETER;
if ((ret = dds_entity_pin (participant, &e)) < 0)
return ret;
if (e->m_kind != DDS_KIND_PARTICIPANT)
Expand All @@ -919,8 +921,7 @@ static dds_entity_t dds_find_topic_impl (dds_find_scope_t scope, dds_entity_t pa
}
dds_participant *pp_topic = (dds_participant *) e;
struct ddsi_domaingv * gv = &e->m_domain->gv;
const dds_time_t tnow = dds_time ();
const dds_time_t abstimeout = (DDS_INFINITY - timeout <= tnow) ? DDS_NEVER : (tnow + timeout);
const ddsrt_etime_t abstimeout = ddsrt_etime_add_duration (ddsrt_time_elapsed (), timeout);
do
{
ddsrt_mutex_lock (&gv->new_topic_lock);
Expand All @@ -937,7 +938,7 @@ static dds_entity_t dds_find_topic_impl (dds_find_scope_t scope, dds_entity_t pa
ddsrt_mutex_lock (&gv->new_topic_lock);
while (hdl != DDS_RETCODE_TIMEOUT && gv->new_topic_version == tv)
{
if (!ddsrt_cond_waituntil (&gv->new_topic_cond, &gv->new_topic_lock, abstimeout))
if (!ddsrt_cond_etime_waituntil (&gv->new_topic_cond, &gv->new_topic_lock, abstimeout))
hdl = DDS_RETCODE_TIMEOUT;
}
ddsrt_mutex_unlock (&gv->new_topic_lock);
Expand Down
27 changes: 15 additions & 12 deletions src/core/ddsc/src/dds_waitset.c
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ static bool is_triggered (struct dds_entity *e)
return t;
}

static dds_return_t dds_waitset_wait_impl (dds_entity_t waitset, dds_attach_t *xs, size_t nxs, dds_time_t abstimeout)
static dds_return_t dds_waitset_wait_impl (dds_entity_t waitset, dds_attach_t *xs, size_t nxs, ddsrt_etime_t abstimeout)
{
dds_waitset *ws;
dds_return_t ret;
Expand Down Expand Up @@ -78,7 +78,7 @@ static dds_return_t dds_waitset_wait_impl (dds_entity_t waitset, dds_attach_t *x

/* Only wait/keep waiting when we have something to observe and there aren't any triggers yet. */
while (ws->nentities > 0 && ws->ntriggered == 0 && !dds_handle_is_closed (&ws->m_entity.m_hdllink))
if (!ddsrt_cond_waituntil (&ws->wait_cond, &ws->wait_lock, abstimeout))
if (!ddsrt_cond_etime_waituntil (&ws->wait_cond, &ws->wait_lock, abstimeout))
break;

ret = (int32_t) ws->ntriggered;
Expand All @@ -94,7 +94,7 @@ static void dds_waitset_interrupt (struct dds_entity *e)
dds_waitset *ws = (dds_waitset *) e;
ddsrt_mutex_lock (&ws->wait_lock);
assert (dds_handle_is_closed (&ws->m_entity.m_hdllink));
ddsrt_cond_broadcast (&ws->wait_cond);
ddsrt_cond_etime_broadcast (&ws->wait_cond);
ddsrt_mutex_unlock (&ws->wait_lock);
}

Expand All @@ -109,7 +109,7 @@ static void dds_waitset_close (struct dds_entity *e)
{
/* can't be pinned => being deleted => will be removed from wait set soon enough
and go through delete_observer (which will trigger the condition variable) */
ddsrt_cond_wait (&ws->wait_cond, &ws->wait_lock);
ddsrt_cond_etime_wait (&ws->wait_cond, &ws->wait_lock);
}
else
{
Expand All @@ -128,7 +128,7 @@ static dds_return_t dds_waitset_delete (struct dds_entity *e)
{
dds_waitset *ws = (dds_waitset *) e;
ddsrt_mutex_destroy (&ws->wait_lock);
ddsrt_cond_destroy (&ws->wait_cond);
ddsrt_cond_etime_destroy (&ws->wait_cond);
ddsrt_free (ws->entities);
return DDS_RETCODE_OK;
}
Expand Down Expand Up @@ -173,7 +173,7 @@ dds_entity_t dds_create_waitset (dds_entity_t owner)
dds_waitset *waitset = dds_alloc (sizeof (*waitset));
dds_entity_t hdl = dds_entity_init (&waitset->m_entity, e, DDS_KIND_WAITSET, false, true, NULL, NULL, 0);
ddsrt_mutex_init (&waitset->wait_lock);
ddsrt_cond_init (&waitset->wait_cond);
ddsrt_cond_etime_init (&waitset->wait_cond);
waitset->m_entity.m_iid = ddsi_iid_gen ();
dds_entity_register_child (e, &waitset->m_entity);
waitset->nentities = 0;
Expand Down Expand Up @@ -234,7 +234,7 @@ static void dds_waitset_observer (struct dds_waitset *ws, dds_entity_t observed)
ws->entities[ws->ntriggered++] = tmp;
}
/* Trigger waitset to wake up. */
ddsrt_cond_broadcast (&ws->wait_cond);
ddsrt_cond_etime_broadcast (&ws->wait_cond);
ddsrt_mutex_unlock (&ws->wait_lock);
}

Expand All @@ -258,7 +258,7 @@ static bool dds_waitset_attach_observer (struct dds_waitset *ws, struct dds_enti
ws->entities[i] = ws->entities[ws->ntriggered];
ws->entities[ws->ntriggered++] = tmp;
}
ddsrt_cond_broadcast (&ws->wait_cond);
ddsrt_cond_etime_broadcast (&ws->wait_cond);
ddsrt_mutex_unlock (&ws->wait_lock);
return true;
}
Expand All @@ -282,7 +282,7 @@ static void dds_waitset_delete_observer (struct dds_waitset *ws, dds_entity_t ob
ws->entities[i] = ws->entities[--ws->nentities];
}
}
ddsrt_cond_broadcast (&ws->wait_cond);
ddsrt_cond_etime_broadcast (&ws->wait_cond);
ddsrt_mutex_unlock (&ws->wait_lock);
}

Expand Down Expand Up @@ -380,15 +380,18 @@ dds_return_t dds_waitset_detach (dds_entity_t waitset, dds_entity_t entity)

dds_return_t dds_waitset_wait_until (dds_entity_t waitset, dds_attach_t *xs, size_t nxs, dds_time_t abstimeout)
{
return dds_waitset_wait_impl (waitset, xs, nxs, abstimeout);
// FIXME: perhaps there's a better way?
const ddsrt_wctime_t tnow_wc = ddsrt_time_wallclock ();
const ddsrt_etime_t tnow_e = ddsrt_time_elapsed ();
const ddsrt_etime_t abstimeout_e = (abstimeout <= tnow_wc.v) ? (ddsrt_etime_t){0} : ddsrt_etime_add_duration (tnow_e, abstimeout - tnow_wc.v);
return dds_waitset_wait_impl (waitset, xs, nxs, abstimeout_e);
}

dds_return_t dds_waitset_wait (dds_entity_t waitset, dds_attach_t *xs, size_t nxs, dds_duration_t reltimeout)
{
if (reltimeout < 0)
return DDS_RETCODE_BAD_PARAMETER;
const dds_time_t tnow = dds_time ();
const dds_time_t abstimeout = (DDS_INFINITY - reltimeout <= tnow) ? DDS_NEVER : (tnow + reltimeout);
const ddsrt_etime_t abstimeout = ddsrt_etime_add_duration (ddsrt_time_elapsed (), reltimeout);
return dds_waitset_wait_impl (waitset, xs, nxs, abstimeout);
}

Expand Down
2 changes: 1 addition & 1 deletion src/core/ddsc/src/dds_writer.c
Original file line number Diff line number Diff line change
Expand Up @@ -517,7 +517,7 @@ dds_entity_t dds_get_publisher (dds_entity_t writer)
}
}

dds_return_t dds__ddsi_writer_wait_for_acks (struct dds_writer *wr, ddsi_guid_t *rdguid, dds_time_t abstimeout)
dds_return_t dds__ddsi_writer_wait_for_acks (struct dds_writer *wr, ddsi_guid_t *rdguid, ddsrt_mtime_t abstimeout)
{
/* during lifetime of the writer m_wr is constant, it is only during deletion that it
gets erased at some point */
Expand Down
2 changes: 1 addition & 1 deletion src/core/ddsc/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ idlc_generate(TARGET DynamicData FILES DynamicData.idl WARNINGS no-implicit-exte
if(ENABLE_TYPELIB)
idlc_generate(TARGET XSpace FILES XSpace.idl XSpaceEnum.idl XSpaceMustUnderstand.idl XSpaceTypeConsistencyEnforcement.idl WARNINGS no-implicit-extensibility no-inherit-appendable)
idlc_generate(TARGET XSpaceNoTypeInfo FILES XSpaceNoTypeInfo.idl NO_TYPE_INFO WARNINGS no-implicit-extensibility)
idlc_generate(TARGET TypeBuilderTypes FILES TypeBuilderTypes.idl WARNINGS no-implicit-extensibility)
idlc_generate(TARGET TypeBuilderTypes FILES TypeBuilderTypes.idl WARNINGS no-implicit-extensibility no-inherit-appendable)
idlc_generate(TARGET DynamicTypeTypes FILES DynamicTypeTypes.idl)
endif()

Expand Down
16 changes: 8 additions & 8 deletions src/core/ddsc/tests/redundantnw.c
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ enum logger_state {

struct logger_arg {
ddsrt_mutex_t lock;
ddsrt_cond_t cond;
ddsrt_cond_mtime_t cond;
bool enabled;
bool data_seen;
bool acknack_seen;
Expand Down Expand Up @@ -105,7 +105,7 @@ static void logger (void *ptr, const dds_log_data_t *data)
check_destination_addresses (msg, arg->mc_for_data);
arg->state[data->domid][thridx] = LST_INACTIVE;
arg->data_seen = true;
ddsrt_cond_broadcast (&arg->cond);
ddsrt_cond_mtime_broadcast (&arg->cond);
}
break;
case LST_ACKNACK:
Expand All @@ -114,7 +114,7 @@ static void logger (void *ptr, const dds_log_data_t *data)
check_destination_addresses (msg, false);
arg->state[data->domid][thridx] = LST_INACTIVE;
arg->acknack_seen = true;
ddsrt_cond_broadcast (&arg->cond);
ddsrt_cond_mtime_broadcast (&arg->cond);
}
break;
}
Expand All @@ -135,7 +135,7 @@ CU_Test (ddsc_redundant_networking, uc_data_on_all_intfs)
.state = { { LST_INACTIVE, LST_INACTIVE }, {LST_INACTIVE, LST_INACTIVE } }
};
ddsrt_mutex_init (&larg.lock);
ddsrt_cond_init (&larg.cond);
ddsrt_cond_mtime_init (&larg.cond);
dds_set_log_mask (DDS_LC_TRACE);
dds_set_log_sink (&logger, &larg);
dds_set_trace_sink (&logger, &larg);
Expand All @@ -158,7 +158,7 @@ CU_Test (ddsc_redundant_networking, uc_data_on_all_intfs)
CU_ASSERT_FATAL (rc == 0);
dds_set_log_sink (NULL, NULL);
dds_set_trace_sink (NULL, NULL);
ddsrt_cond_destroy (&larg.cond);
ddsrt_cond_mtime_destroy (&larg.cond);
ddsrt_mutex_destroy (&larg.lock);
return;
}
Expand Down Expand Up @@ -249,15 +249,15 @@ CU_Test (ddsc_redundant_networking, uc_data_on_all_intfs)

// The ACK can be processed before the "xpack_send" line is output by the sending tev thread
// this gives a bit of extra time
dds_time_t waituntil = dds_time () + DDS_SECS (1);
ddsrt_mtime_t waituntil = ddsrt_mtime_add_duration (ddsrt_time_monotonic (), DDS_SECS (1));
ddsrt_mutex_lock (&larg.lock);
while (!larg.acknack_seen)
ddsrt_cond_waituntil (&larg.cond, &larg.lock, waituntil);
ddsrt_cond_mtime_waituntil (&larg.cond, &larg.lock, waituntil);
ddsrt_mutex_unlock (&larg.lock);

dds_set_log_sink (NULL, NULL);
dds_set_trace_sink (NULL, NULL);
ddsrt_cond_destroy (&larg.cond);
ddsrt_cond_mtime_destroy (&larg.cond);
ddsrt_mutex_destroy (&larg.lock);

CU_ASSERT_FATAL (larg.data_seen && larg.acknack_seen);
Expand Down
20 changes: 10 additions & 10 deletions src/core/ddsc/tests/test_oneliner.c
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ typedef struct { char n[MAXDOMS + 1]; } entname_t;
cb->cb_##kind = kind; \
cb->cb_##name##_status = status; \
cb->cb_called[DDS_##NAME##_STATUS_ID]++; \
ddsrt_cond_broadcast (&cb->ctx->g_cond); \
ddsrt_cond_mtime_broadcast (&cb->ctx->g_cond); \
ddsrt_mutex_unlock (&cb->ctx->g_mutex); \
}

Expand All @@ -77,7 +77,7 @@ static void data_on_readers_cb (dds_entity_t subscriber, void *arg)
ddsrt_mutex_lock (&cb->ctx->g_mutex);
cb->cb_subscriber = subscriber;
cb->cb_called[DDS_DATA_ON_READERS_STATUS_ID]++;
ddsrt_cond_broadcast (&cb->ctx->g_cond);
ddsrt_cond_mtime_broadcast (&cb->ctx->g_cond);
ddsrt_mutex_unlock (&cb->ctx->g_mutex);
}

Expand All @@ -87,7 +87,7 @@ static void data_available_cb (dds_entity_t reader, void *arg)
ddsrt_mutex_lock (&cb->ctx->g_mutex);
cb->cb_reader = reader;
cb->cb_called[DDS_DATA_AVAILABLE_STATUS_ID]++;
ddsrt_cond_broadcast (&cb->ctx->g_cond);
ddsrt_cond_mtime_broadcast (&cb->ctx->g_cond);
ddsrt_mutex_unlock (&cb->ctx->g_mutex);
}

Expand Down Expand Up @@ -1702,15 +1702,15 @@ static void checklistener (struct oneliner_ctx *ctx, int ll, int ent, struct one
min_cnt = max_cnt = (uint32_t) cnt;
}
ddsrt_mutex_lock (&ctx->g_mutex);
const dds_time_t twait_begin = dds_time ();
const ddsrt_mtime_t twait_begin = ddsrt_time_monotonic ();
bool cnt_ok = (ctx->cb[dom].cb_called[lldesc[ll].id] >= min_cnt && ctx->cb[dom].cb_called[lldesc[ll].id] <= max_cnt);
while (ctx->cb[dom].cb_called[lldesc[ll].id] < min_cnt && signalled)
{
signalled = ddsrt_cond_waitfor (&ctx->g_cond, &ctx->g_mutex, DDS_SECS (5));
signalled = ddsrt_cond_mtime_waituntil (&ctx->g_cond, &ctx->g_mutex, ddsrt_mtime_add_duration (twait_begin, DDS_SECS (5)));
cnt_ok = (ctx->cb[dom].cb_called[lldesc[ll].id] >= min_cnt && ctx->cb[dom].cb_called[lldesc[ll].id] <= max_cnt);
}
const dds_time_t twait_end = dds_time ();
const dds_duration_t dt = (twait_end - twait_begin);
const ddsrt_mtime_t twait_end = ddsrt_time_monotonic ();
const dds_duration_t dt = (twait_end.v - twait_begin.v);
mprintf (ctx, " cb_called %"PRIu32" (%s) after %"PRId64".%06us", ctx->cb[dom].cb_called[lldesc[ll].id], cnt_ok ? "ok" : "fail", dt / DDS_NSECS_IN_SEC, (unsigned) (dt % DDS_NSECS_IN_SEC) / 1000);
if (!cnt_ok)
{
Expand Down Expand Up @@ -1808,7 +1808,7 @@ static void dowaitforack (struct oneliner_ctx *ctx)
if (dds_entity_kind (x) != DDS_KIND_WRITER)
error_dds (ctx, ret, "wait for ack: %s is not a writer", entname.n);
else
ret = dds__ddsi_writer_wait_for_acks ((struct dds_writer *) x, (ent1 < 0) ? NULL : &rdguid.i, dds_time () + DDS_SECS (5));
ret = dds__ddsi_writer_wait_for_acks ((struct dds_writer *) x, (ent1 < 0) ? NULL : &rdguid.i, ddsrt_mtime_add_duration (ddsrt_time_monotonic (), DDS_SECS (5)));
dds_entity_unpin (x);
if (ret != 0)
{
Expand Down Expand Up @@ -2273,7 +2273,7 @@ void test_oneliner_init (struct oneliner_ctx *ctx, const char *config_override)

ctx->mprintf_needs_timestamp = 1;
ddsrt_mutex_init (&ctx->g_mutex);
ddsrt_cond_init (&ctx->g_cond);
ddsrt_cond_mtime_init (&ctx->g_cond);

create_unique_topic_name ("test_oneliner", ctx->topicname, sizeof (ctx->topicname));
}
Expand Down Expand Up @@ -2326,7 +2326,7 @@ int test_oneliner_fini (struct oneliner_ctx *ctx)
}
}
ddsrt_mutex_destroy (&ctx->g_mutex);
ddsrt_cond_destroy (&ctx->g_cond);
ddsrt_cond_mtime_destroy (&ctx->g_cond);
for (size_t i = 0; i < sizeof (ctx->doms) / sizeof (ctx->doms[0]); i++)
if (ctx->doms[i] && (ret = dds_delete (ctx->doms[i])) != 0)
setresult (ctx, ret, "terminate: delete domain on %"PRId32, ctx->doms[i]);
Expand Down
2 changes: 1 addition & 1 deletion src/core/ddsc/tests/test_oneliner.h
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ struct oneliner_ctx {
int mprintf_needs_timestamp;

ddsrt_mutex_t g_mutex;
ddsrt_cond_t g_cond;
ddsrt_cond_mtime_t g_cond;
struct oneliner_cb cb[3];

const char *config_override; // optional
Expand Down
Loading
Loading