Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/core/cdr/include/dds/cdr/dds_cdrstream.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ typedef struct dds_cdrstream_allocator {
} dds_cdrstream_allocator_t;

typedef struct dds_cdrstream_desc_key {
char *name; /* Name of key field */
uint32_t ops_offs; /* Offset for key ops */
uint32_t idx; /* Key index in containing type (definition order) */
} dds_cdrstream_desc_key_t;
Expand Down
7 changes: 7 additions & 0 deletions src/core/cdr/src/dds_cdrstream.c
Original file line number Diff line number Diff line change
Expand Up @@ -6919,6 +6919,7 @@ static void copy_desc_keys (dds_cdrstream_desc_key_t **dst, const struct dds_cdr
*dst = allocator->malloc (nkeys * sizeof (**dst));
for (uint32_t i = 0; i < nkeys; i++)
{
(*dst)[i].name = ddsrt_strdup(keys[i].m_name);
(*dst)[i].ops_offs = keys[i].m_offset;
(*dst)[i].idx = keys[i].m_idx;
}
Expand Down Expand Up @@ -7036,6 +7037,12 @@ void dds_cdrstream_desc_fini (struct dds_cdrstream_desc *desc, const struct dds_
{
if (desc->keys.nkeys > 0)
{
for (size_t i = 0; i < desc->keys.nkeys; i++) {
if (desc->keys.keys != NULL)
allocator->free (desc->keys.keys[i].name);
if (desc->keys.keys_definition_order != NULL)
allocator->free (desc->keys.keys_definition_order[i].name);
}
if (desc->keys.keys != NULL)
allocator->free (desc->keys.keys);
if (desc->keys.keys_definition_order != NULL)
Expand Down
6 changes: 6 additions & 0 deletions src/core/ddsc/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ prepend(srcs_ddsc "${CMAKE_CURRENT_LIST_DIR}/src/"
dds_heap_loan.c
dds_psmx.c
dds_guid.c
dds_sql_expr.c
dds_content_filter.c
dds_filter.c
)

if(ENABLE_TYPELIB)
Expand Down Expand Up @@ -93,6 +96,9 @@ prepend(hdrs_private_ddsc "${CMAKE_CURRENT_LIST_DIR}/src/"
dds__sysdef_validation.h
dds__qos_provider.h
dds__guid.h
dds__sql_expr.h
dds__content_filter.h
dds__filter.h
)

prepend(hdrs_public_ddsc "$<BUILD_INTERFACE:${CMAKE_CURRENT_LIST_DIR}/include>$<INSTALL_INTERFACE:${CMAKE_INSTALL_INCLUDEDIR}>/dds/"
Expand Down
34 changes: 34 additions & 0 deletions src/core/ddsc/include/dds/dds.h
Original file line number Diff line number Diff line change
Expand Up @@ -1624,6 +1624,10 @@ enum dds_topic_filter_mode {
* @warning Unstable API
*/
union dds_topic_filter_function_union {
/* FIXME:
* instead of deserialize whole sample, let user to deser. only specific
* field. (`key` only for the begining?)
* */
dds_topic_filter_sample_fn sample; /**< Use with mode dds_topic_filter_mode::DDS_TOPIC_FILTER_SAMPLE */
dds_topic_filter_sample_arg_fn sample_arg; /**< Use with mode dds_topic_filter_mode::DDS_TOPIC_FILTER_SAMPLE_ARG */
dds_topic_filter_sampleinfo_arg_fn sampleinfo_arg; /**< Use with mode dds_topic_filter_mode::DDS_TOPIC_FILTER_SAMPLEINFO_ARG */
Expand All @@ -1641,6 +1645,36 @@ struct dds_topic_filter {
void *arg; /**< Provide an argument, can be NULL */
};

typedef enum dds_topic_filter_mode dds_function_content_filter_mode_t;
typedef union dds_topic_filter_function_union dds_function_content_filter_fn_t;

typedef struct dds_expression_content_filter dds_expression_content_filter_t;
typedef struct dds_topic_filter dds_function_content_filter_t;

/**
* @brief Content filter container;
* @ingroup content_filter
*/
struct dds_content_filter {
dds_content_filter_kind_t kind; /**< Provide a content filter kind */
union {
dds_expression_content_filter_t *expr;
dds_function_content_filter_t *func;
} filter; /**< Provide a filter implementation container */
};

DDS_EXPORT dds_return_t dds_expression_filter_create (const char *expression, dds_expression_content_filter_t **filter);
DDS_EXPORT void dds_expression_filter_free (dds_expression_content_filter_t *filter);
DDS_EXPORT dds_return_t dds_expression_filter_bind_unsigned (dds_expression_content_filter_t *filter, size_t id, uint64_t param);
DDS_EXPORT dds_return_t dds_expression_filter_bind_integer (dds_expression_content_filter_t *filter, size_t id, int64_t param);
DDS_EXPORT dds_return_t dds_expression_filter_bind_real (dds_expression_content_filter_t *filter, size_t id, double param);
DDS_EXPORT dds_return_t dds_expression_filter_bind_string (dds_expression_content_filter_t *filter, size_t id, char *param);
DDS_EXPORT dds_return_t dds_expression_filter_bind_blob (dds_expression_content_filter_t *filter, size_t id, unsigned char *param, size_t param_sz);

DDS_EXPORT dds_return_t dds_function_filter_create (const dds_function_content_filter_mode_t mode, const dds_function_content_filter_fn_t fn, dds_function_content_filter_t **filter);
DDS_EXPORT void dds_function_filter_free (dds_function_content_filter_t *filter);
DDS_EXPORT dds_return_t dds_function_filter_bind_arg (dds_function_content_filter_t *filter, void *arg);

/**
* @anchor dds_set_topic_filter_and_arg
* @brief Sets a filter and filter argument on a topic.
Expand Down
32 changes: 29 additions & 3 deletions src/core/ddsc/include/dds/ddsc/dds_public_qos.h
Original file line number Diff line number Diff line change
Expand Up @@ -606,6 +606,18 @@ dds_qset_psmx_instances (
uint32_t n,
const char **values);

/**
* @ingroup qos_setters
* @component qos_obj
* @brief Set conent filter in a qos structure
*
* @param[in,out] qos - Pointer to a dds_qos_t structure that will store the policy
* @param[in] filter - Pointer to preiously created content filter struct
*/
DDS_EXPORT void
dds_qset_content_filter (
dds_qos_t *qos,
const dds_content_filter_t filter);

/**
* @defgroup qos_getters (QoS Getters)
Expand Down Expand Up @@ -1116,9 +1128,9 @@ DDS_EXPORT bool dds_qget_entity_name (const dds_qos_t *qos, char **name);
* @component qos_obj
* @brief Gets the names of the PSMX Instances set in a qos structure
*
* @param qos Pointer to a dds_qos_t structure
* @param n_out Number of PSMX Instance Names returned
* @param values Array of pointers to PSMX instance Names
* @param[in] qos - Pointer to a dds_qos_t structure
* @param[in,out] n_out - Number of PSMX Instance Names returned
* @param[in,out] values - Array of pointers to PSMX instance Names
* @return bool indicating success or failure
*/
DDS_EXPORT bool
Expand All @@ -1127,6 +1139,20 @@ dds_qget_psmx_instances (
uint32_t *n_out,
char ***values);

/**
* @ingroup qos_getters
* @component qos_obj
* @brief Get content filter set in qos structure
*
* @param[in] qos - Pointer to a dds_qos_t structure
* @param[in,out] filter - Pointer to content filter struct.
* @return bool indicating success or failure
*/
DDS_EXPORT bool
dds_qget_content_filter(
const dds_qos_t *qos,
dds_content_filter_t **filter);

#if defined (__cplusplus)
}
#endif
Expand Down
16 changes: 16 additions & 0 deletions src/core/ddsc/include/dds/ddsc/dds_public_qosdefs.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,12 @@ typedef enum dds_qos_policy_id {
*/
typedef struct dds_qos dds_qos_t;

/**
* @brief QoS content filter datatype
* @ingroup qos
*/
typedef struct dds_content_filter dds_content_filter_t;

/**
* @brief Durability QoS: Applies to Topic, DataReader, DataWriter
* @ingroup qos
Expand Down Expand Up @@ -176,6 +182,16 @@ dds_type_consistency_kind_t;
*/
typedef int16_t dds_data_representation_id_t;

/**
* @brief Topic Filter QoS: Applies to Topic, DataReader, DataWriter
* @ingroup qos
* */
typedef enum dds_content_filter_kind
{
DDS_CONTENT_FILTER_EXPRESSION, /**< Filter set using expression */
DDS_CONTENT_FILTER_FUNCTION, /**< Filter set with filter function */
} dds_content_filter_kind_t;

#if defined (__cplusplus)
}
#endif
Expand Down
54 changes: 54 additions & 0 deletions src/core/ddsc/src/dds__content_filter.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Copyright(c) 2025 ZettaScale Technology and others
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License
// v. 1.0 which is available at
// http://www.eclipse.org/org/documents/edl-v10.php.
//
// SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause

#ifndef DDS__CONTENT_FILTER_H
#define DDS__CONTENT_FILTER_H

#include "dds__sql_expr.h"

#if defined (__cplusplus)
extern "C" {
#endif

#define DDS_EXPR_FILTER_PARAM_INTEGER DDS_SQL_TK_INTEGER
#define DDS_EXPR_FILTER_PARAM_UNSIGNED DDS_SQL_TK_UNSIGNED
#define DDS_EXPR_FILTER_PARAM_REAL DDS_SQL_TK_FLOAT
#define DDS_EXPR_FILTER_PARAM_STRING DDS_SQL_TK_STRING
#define DDS_EXPR_FILTER_PARAM_BLOB DDS_SQL_TK_BLOB

struct dds_expression_filter_param
{
int t;
union {
int64_t i; double d; uint64_t u;
} n;
union {
char *s;
unsigned char *u;
} s;
size_t sz;
};

struct dds_expression_content_filter
{
char *expression;
size_t nparam;
struct dds_expression_filter_param *param;
};

struct dds_content_filter *dds_content_filter_dup (const struct dds_content_filter *filter);
void dds_content_filter_free (struct dds_content_filter *filter);
bool dds_content_filter_valid (const struct dds_content_filter *filter);

#if defined (__cplusplus)
}
#endif

#endif // DDS__CONTENT_FILTER_H
30 changes: 30 additions & 0 deletions src/core/ddsc/src/dds__filter.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Copyright(c) 2025 ZettaScale Technology and others
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License
// v. 1.0 which is available at
// http://www.eclipse.org/org/documents/edl-v10.php.
//
// SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause

#ifndef DDS__FILTER_H
#define DDS__FILTER_H

#include "dds__types.h"

#if defined (__cplusplus)
extern "C" {
#endif

dds_return_t dds_filter_create (dds_domainid_t domain_id, const struct dds_content_filter *filter, const struct ddsi_sertype *st, struct dds_filter **out);
void dds_filter_free (struct dds_filter *filter);
dds_return_t dds_filter_update (const struct dds_content_filter *filter, const struct ddsi_sertype *st, struct dds_filter *out);
bool dds_filter_reader_accept (const struct dds_filter *filter, const struct dds_reader *rd, const struct ddsi_serdata *sd, const struct dds_sample_info *si);
bool dds_filter_writer_accept (const struct dds_filter *filter, const struct dds_writer *wr, const void *sample);

#if defined (__cplusplus)
}
#endif

#endif // DDS__FILTER_H
7 changes: 4 additions & 3 deletions src/core/ddsc/src/dds__qos.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ extern "C" {
DDSI_QP_DEADLINE | DDSI_QP_LATENCY_BUDGET | DDSI_QP_OWNERSHIP | DDSI_QP_LIVELINESS | \
DDSI_QP_RELIABILITY | DDSI_QP_TRANSPORT_PRIORITY | DDSI_QP_LIFESPAN | \
DDSI_QP_DESTINATION_ORDER | DDSI_QP_HISTORY | DDSI_QP_RESOURCE_LIMITS | \
DDSI_QP_DATA_REPRESENTATION | DDSI_QP_ENTITY_NAME)
DDSI_QP_DATA_REPRESENTATION | DDSI_QP_ENTITY_NAME | DDSI_QP_CONTENT_FILTER)

#define DDS_PARTICIPANT_QOS_MASK \
(DDSI_QP_USER_DATA | DDSI_QP_ADLINK_ENTITY_FACTORY | DDSI_QP_CYCLONE_IGNORELOCAL | \
Expand All @@ -41,7 +41,7 @@ extern "C" {
DDSI_QP_RESOURCE_LIMITS | DDSI_QP_ADLINK_READER_DATA_LIFECYCLE | \
DDSI_QP_CYCLONE_IGNORELOCAL | DDSI_QP_PROPERTY_LIST | \
DDSI_QP_TYPE_CONSISTENCY_ENFORCEMENT | DDSI_QP_DATA_REPRESENTATION | \
DDSI_QP_ENTITY_NAME | DDSI_QP_PSMX)
DDSI_QP_ENTITY_NAME | DDSI_QP_PSMX | DDSI_QP_CONTENT_FILTER)

#define DDS_SUBSCRIBER_QOS_MASK \
(DDSI_QP_PARTITION | DDSI_QP_PRESENTATION | DDSI_QP_GROUP_DATA | \
Expand All @@ -54,7 +54,8 @@ extern "C" {
DDSI_QP_LIFESPAN | DDSI_QP_DESTINATION_ORDER | DDSI_QP_HISTORY | \
DDSI_QP_RESOURCE_LIMITS | DDSI_QP_ADLINK_WRITER_DATA_LIFECYCLE | \
DDSI_QP_CYCLONE_IGNORELOCAL | DDSI_QP_PROPERTY_LIST | DDSI_QP_DATA_REPRESENTATION | \
DDSI_QP_ENTITY_NAME | DDSI_QP_PSMX | DDSI_QP_CYCLONE_WRITER_BATCHING)
DDSI_QP_ENTITY_NAME | DDSI_QP_PSMX | DDSI_QP_CYCLONE_WRITER_BATCHING | \
DDSI_QP_CONTENT_FILTER)

/** @component qos_obj */
dds_return_t dds_ensure_valid_data_representation (dds_qos_t *qos, uint32_t allowed_data_representations, dds_data_type_properties_t data_type_props, dds_entity_kind_t entitykind);
Expand Down
Loading