diff --git a/.cmake-format b/.cmake-format
index c1a8e85a8..f5f413d51 100644
--- a/.cmake-format
+++ b/.cmake-format
@@ -26,7 +26,8 @@ with section("parse"):
             'kwargs': {
                 'NAME': '*',
                 'SRCS': '*',
-                'LIBS': '*'}},
+                'LIBS': '*',
+                'ENVS': '*'}},
     'add_umf_library': {
         "pargs": 0,
         "flags": [],
diff --git a/.github/workflows/reusable_compatibility.yml b/.github/workflows/reusable_compatibility.yml
index 5bf9bd817..5116a59f4 100644
--- a/.github/workflows/reusable_compatibility.yml
+++ b/.github/workflows/reusable_compatibility.yml
@@ -94,9 +94,11 @@ jobs:
 
       - name: Run "tag" UMF tests with latest UMF libs (warnings enabled)
         working-directory: ${{github.workspace}}/tag_version/build
+        # GTEST_FILTER is used below to skip a test that is not compatible
         run: >
           UMF_LOG="level:warning;flush:debug;output:stderr;pid:no"
           LD_LIBRARY_PATH=${{github.workspace}}/latest_version/build/lib/
+          GTEST_FILTER="-*umfIpcTest.GetPoolByOpenedHandle*"
           ctest --verbose
 
   windows:
@@ -181,6 +183,7 @@ jobs:
         working-directory: ${{github.workspace}}/tag_version/build
         run: |
           $env:UMF_LOG="level:warning;flush:debug;output:stderr;pid:no"
+          $env:GTEST_FILTER="-*umfIpcTest.GetPoolByOpenedHandle*"
           cp ${{github.workspace}}/latest_version/build/bin/Debug/umf.dll ${{github.workspace}}/tag_version/build/bin/Debug/umf.dll
           ctest -C Debug --verbose
 
@@ -230,8 +233,10 @@ jobs:
 
       - name: Run "tag" UMF tests
         working-directory: ${{github.workspace}}/tag_version/build
-        run: |
-          LD_LIBRARY_PATH=${{github.workspace}}/tag_version/build/lib/ ctest --output-on-failure
+        run: >
+          LD_LIBRARY_PATH=${{github.workspace}}/tag_version/build/lib/
+          GTEST_FILTER="-*umfIpcTest.GetPoolByOpenedHandle*"
+          ctest --output-on-failure
 
       - name: Checkout latest UMF version
         uses: actions/checkout@b4ffde65f46336ab88eb53be808477a3936bae11 # v4.1.1
@@ -266,4 +271,5 @@ jobs:
         run: >
           UMF_LOG="level:warning;flush:debug;output:stderr;pid:no"
           LD_LIBRARY_PATH=${{github.workspace}}/latest_version/build/lib/
+          GTEST_FILTER="-*umfIpcTest.GetPoolByOpenedHandle*"
           ctest --verbose -E "not_impl"
diff --git a/docs/config/api.rst b/docs/config/api.rst
index 1c20d709c..97e664d97 100644
--- a/docs/config/api.rst
+++ b/docs/config/api.rst
@@ -168,6 +168,26 @@ IPC API allows retrieving IPC handles for the memory buffers allocated from
 UMF memory pools. The memory provider used by the pool should support IPC
 operations for this API to work. Otherwise IPC APIs return an error.
 
+IPC caching
+------------------------------------------
+
+UMF employs IPC caching to avoid multiple IPC handles being created for the same
+coarse-grain memory region allocated by the memory provider. UMF guarantees that
+for each coarse-grain memory region allocated by the memory provider, only one
+IPC handle is created when the :any:`umfGetIPCHandle` function is called. All
+subsequent calls to the :any:`umfGetIPCHandle` function for a pointer to the
+same memory region will return the entry from the cache.
+
+The same is true for the :any:`umfOpenIPCHandle` function. The actual mapping
+of the IPC handle to the virtual address space is created only once, and all
+subsequent calls to open the same IPC handle will return the entry from the cache.
+The size of the cache for opened IPC handles is controlled by the ``UMF_MAX_OPENED_IPC_HANDLES``
+environment variable. By default, the cache size is unlimited. However, if the environment
+variable is set and the cache size exceeds the limit, old items will be evicted. UMF tracks
+the ref count for each entry in the cache and can evict only items with a ref count equal to 0.
+The ref count is increased when the :any:`umfOpenIPCHandle` function is called and decreased
+when the :any:`umfCloseIPCHandle` function is called for the corresponding IPC handle.
+
 .. _ipc-api:
 
 IPC API
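To make the documented caching and ref-counting contract concrete, here is a minimal consumer-side sketch (illustrative only, not part of this patch; the helper name open_same_handle_twice is hypothetical, and it assumes `pool` and `ipcHandle` were obtained elsewhere, e.g. the handle was received from a producer process):

    // Illustrative sketch, not part of the patch: two umfOpenIPCHandle calls
    // on the same handle share one cached mapping; the entry becomes evictable
    // only after both umfCloseIPCHandle calls drop its ref count back to 0.
    #include <umf/ipc.h>
    #include <umf/memory_pool.h>

    static umf_result_t open_same_handle_twice(umf_memory_pool_handle_t pool,
                                               umf_ipc_handle_t ipcHandle) {
        umf_ipc_handler_handle_t handler;
        umf_result_t ret = umfPoolGetIPCHandler(pool, &handler);
        if (ret != UMF_RESULT_SUCCESS) {
            return ret;
        }

        void *ptr1, *ptr2;
        // First open creates the actual mapping (ref count becomes 1).
        ret = umfOpenIPCHandle(handler, ipcHandle, &ptr1);
        if (ret != UMF_RESULT_SUCCESS) {
            return ret;
        }
        // Second open of the same handle is served from the cache: no new
        // mapping is created (ref count becomes 2).
        ret = umfOpenIPCHandle(handler, ipcHandle, &ptr2);
        if (ret != UMF_RESULT_SUCCESS) {
            (void)umfCloseIPCHandle(ptr1);
            return ret;
        }

        // Each close decreases the ref count; with UMF_MAX_OPENED_IPC_HANDLES
        // set, only entries whose ref count reached 0 may be evicted.
        ret = umfCloseIPCHandle(ptr2);
        if (ret != UMF_RESULT_SUCCESS) {
            return ret;
        }
        return umfCloseIPCHandle(ptr1);
    }

The ipc_max_opened_limit test added below in test/CMakeLists.txt exercises exactly this path with UMF_MAX_OPENED_IPC_HANDLES=10.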
diff --git a/src/ipc.c b/src/ipc.c
index 12c7bb978..d4e5cc806 100644
--- a/src/ipc.c
+++ b/src/ipc.c
@@ -146,19 +146,15 @@ umf_result_t umfOpenIPCHandle(umf_ipc_handler_handle_t hIPCHandler,
 }
 
 umf_result_t umfCloseIPCHandle(void *ptr) {
-    umf_alloc_info_t allocInfo;
-    umf_result_t ret = umfMemoryTrackerGetAllocInfo(ptr, &allocInfo);
+    umf_ipc_info_t ipcInfo;
+    umf_result_t ret = umfMemoryTrackerGetIpcInfo(ptr, &ipcInfo);
     if (ret != UMF_RESULT_SUCCESS) {
-        LOG_ERR("cannot get alloc info for ptr = %p.", ptr);
+        LOG_ERR("cannot get IPC info for ptr = %p.", ptr);
         return ret;
     }
 
-    // We cannot use umfPoolGetMemoryProvider function because it returns
-    // upstream provider but we need tracking one
-    umf_memory_provider_handle_t hProvider = allocInfo.pool->provider;
-
-    return umfMemoryProviderCloseIPCHandle(hProvider, allocInfo.base,
-                                           allocInfo.baseSize);
+    return umfMemoryProviderCloseIPCHandle(ipcInfo.provider, ipcInfo.base,
+                                           ipcInfo.baseSize);
 }
 
 umf_result_t umfPoolGetIPCHandler(umf_memory_pool_handle_t hPool,
diff --git a/src/ipc_cache.c b/src/ipc_cache.c
index 6d5d39e4f..bf17a66a4 100644
--- a/src/ipc_cache.c
+++ b/src/ipc_cache.c
@@ -54,6 +54,22 @@ typedef struct ipc_opened_cache_t {
 
 ipc_opened_cache_global_t *IPC_OPENED_CACHE_GLOBAL = NULL;
 
+// Returns the value of the UMF_MAX_OPENED_IPC_HANDLES environment variable
+// or 0 if it is not set.
+static size_t umfIpcCacheGlobalInitMaxOpenedHandles(void) {
+    const char *max_size_str = getenv("UMF_MAX_OPENED_IPC_HANDLES");
+    if (max_size_str) {
+        char *endptr;
+        size_t max_size = strtoul(max_size_str, &endptr, 10);
+        if (*endptr == '\0') {
+            return max_size;
+        }
+        LOG_ERR("Invalid value of UMF_MAX_OPENED_IPC_HANDLES: %s",
+                max_size_str);
+    }
+    return 0;
+}
+
 umf_result_t umfIpcCacheGlobalInit(void) {
     umf_result_t ret = UMF_RESULT_SUCCESS;
     ipc_opened_cache_global_t *cache_global =
@@ -78,8 +94,7 @@ umf_result_t umfIpcCacheGlobalInit(void) {
         goto err_mutex_destroy;
     }
 
-    // TODO: make max_size configurable via environment variable
-    cache_global->max_size = 0;
+    cache_global->max_size = umfIpcCacheGlobalInitMaxOpenedHandles();
     cache_global->cur_size = 0;
     cache_global->lru_list = NULL;
 
@@ -191,7 +206,19 @@ umf_result_t umfIpcOpenedCacheGet(ipc_opened_cache_handle_t cache,
     if (entry == NULL && cache->global->max_size != 0 &&
         cache->global->cur_size >= cache->global->max_size) {
         // If max_size is set and the cache is full, evict the least recently used entry.
-        entry = cache->global->lru_list->prev;
+        // We need to search for the least recently used entry with ref_count == 0.
+        // The utlist implementation of the doubly-linked list keeps a tail pointer in head->prev.
+        ipc_opened_cache_entry_t *candidate = cache->global->lru_list->prev;
+        do {
+            uint64_t ref_count = 0;
+            utils_atomic_load_acquire_u64(&candidate->ref_count,
+                                          &ref_count);
+            if (ref_count == 0) {
+                entry = candidate;
+                break;
+            }
+            candidate = candidate->prev;
+        } while (candidate != cache->global->lru_list->prev);
     }
 
     if (entry) { // we have eviction candidate
@@ -244,3 +271,20 @@ umf_result_t umfIpcOpenedCacheGet(ipc_opened_cache_handle_t cache,
 
     return ret;
 }
+
+umf_result_t
+umfIpcHandleMappedCacheRelease(ipc_opened_cache_value_t *cacheValue) {
+    if (!cacheValue) {
+        LOG_ERR("cacheValue is NULL");
+        return UMF_RESULT_ERROR_INVALID_ARGUMENT;
+    }
+
+    // get a pointer to the entry
+    ipc_opened_cache_entry_t *entry =
+        (ipc_opened_cache_entry_t *)((char *)cacheValue -
+                                     offsetof(ipc_opened_cache_entry_t, value));
+    // decrement the ref count
+    utils_atomic_decrement_u64(&entry->ref_count);
+
+    return UMF_RESULT_SUCCESS;
+}
diff --git a/src/ipc_cache.h b/src/ipc_cache.h
index 80870d373..545c6e1e7 100644
--- a/src/ipc_cache.h
+++ b/src/ipc_cache.h
@@ -47,4 +47,6 @@ umf_result_t umfIpcOpenedCacheGet(ipc_opened_cache_handle_t cache,
                                   uint64_t handle_id,
                                   ipc_opened_cache_value_t **retEntry);
 
+umf_result_t
+umfIpcHandleMappedCacheRelease(ipc_opened_cache_value_t *cacheValue);
 #endif /* UMF_IPC_CACHE_H */
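umfIpcHandleMappedCacheRelease above recovers the cache entry from the value pointer with the classic container-of idiom. A minimal standalone sketch of that idiom (all names here are hypothetical, not UMF APIs):

    // Illustrative sketch, not part of the patch: given a pointer to a member
    // embedded in a larger struct, offsetof() recovers the enclosing entry, so
    // the cache can hand out `value` pointers and still find the entry (and
    // its ref count) on release.
    #include <stddef.h>
    #include <stdio.h>

    typedef struct example_value_t {
        void *mapped_base_ptr;
        size_t mapped_size;
    } example_value_t;

    typedef struct example_entry_t {
        unsigned long long ref_count;
        example_value_t value; // embedded member whose address is handed out
    } example_entry_t;

    // Recover the enclosing entry from a pointer to its embedded member.
    static example_entry_t *entry_from_value(example_value_t *v) {
        return (example_entry_t *)((char *)v - offsetof(example_entry_t, value));
    }

    int main(void) {
        example_entry_t entry = {.ref_count = 1};
        example_value_t *v = &entry.value; // what the cache hands to callers
        example_entry_t *e = entry_from_value(v);
        printf("recovered entry: %s\n", e == &entry ? "ok" : "bug");
        return 0;
    }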
diff --git a/src/provider/provider_tracking.c b/src/provider/provider_tracking.c
index bc560304c..92d3dd59c 100644
--- a/src/provider/provider_tracking.c
+++ b/src/provider/provider_tracking.c
@@ -21,6 +21,7 @@
 #include "critnib.h"
 #include "ipc_cache.h"
 #include "ipc_internal.h"
+#include "memory_pool_internal.h"
 #include "provider_tracking.h"
 #include "utils_common.h"
 #include "utils_concurrency.h"
@@ -38,6 +39,8 @@ struct umf_memory_tracker_t {
     // for another memory pool (nested memory pooling).
     critnib *alloc_segments_map[MAX_LEVELS_OF_ALLOC_SEGMENT_MAP];
     utils_mutex_t splitMergeMutex;
+    umf_ba_pool_t *ipc_info_allocator;
+    critnib *ipc_segments_map;
 };
 
 typedef struct tracker_alloc_info_t {
@@ -49,6 +52,12 @@ typedef struct tracker_alloc_info_t {
     size_t n_children;
 } tracker_alloc_info_t;
 
+typedef struct tracker_ipc_info_t {
+    size_t size;
+    umf_memory_provider_handle_t provider;
+    ipc_opened_cache_value_t *ipc_cache_value;
+} tracker_ipc_info_t;
+
 // Get the most nested (on the highest level) allocation segment in the map with the `ptr` key.
 // If `no_children` is set to 1, the function will return the entry
 // only if it has no children on the higher level.
@@ -267,6 +276,72 @@ static umf_result_t umfMemoryTrackerRemove(umf_memory_tracker_handle_t hTracker,
     return UMF_RESULT_SUCCESS;
 }
 
+static umf_result_t
+umfMemoryTrackerAddIpcSegment(umf_memory_tracker_handle_t hTracker,
+                              const void *ptr, size_t size,
+                              umf_memory_provider_handle_t provider,
+                              ipc_opened_cache_value_t *cache_entry) {
+    assert(hTracker);
+    assert(provider);
+    assert(cache_entry);
+
+    tracker_ipc_info_t *value = umf_ba_alloc(hTracker->ipc_info_allocator);
+
+    if (value == NULL) {
+        LOG_ERR("failed to allocate tracker_ipc_info_t, ptr=%p, size=%zu", ptr,
+                size);
+        return UMF_RESULT_ERROR_OUT_OF_HOST_MEMORY;
+    }
+
+    value->size = size;
+    value->provider = provider;
+    value->ipc_cache_value = cache_entry;
+
+    int ret =
+        critnib_insert(hTracker->ipc_segments_map, (uintptr_t)ptr, value, 0);
+    if (ret == 0) {
+        LOG_DEBUG("IPC memory region is added, tracker=%p, ptr=%p, size=%zu, "
+                  "provider=%p, cache_entry=%p",
+                  (void *)hTracker, ptr, size, provider, cache_entry);
+        return UMF_RESULT_SUCCESS;
+    }
+
+    LOG_ERR("failed to insert tracker_ipc_info_t, ret=%d, ptr=%p, size=%zu, "
+            "provider=%p, cache_entry=%p",
+            ret, ptr, size, provider, cache_entry);
+
+    umf_ba_free(hTracker->ipc_info_allocator, value);
+
+    if (ret == ENOMEM) {
+        return UMF_RESULT_ERROR_OUT_OF_HOST_MEMORY;
+    }
+
+    return UMF_RESULT_ERROR_UNKNOWN;
+}
+
+static umf_result_t
+umfMemoryTrackerRemoveIpcSegment(umf_memory_tracker_handle_t hTracker,
+                                 const void *ptr) {
+    assert(ptr);
+
+    void *value = critnib_remove(hTracker->ipc_segments_map, (uintptr_t)ptr);
+
+    if (!value) {
+        LOG_ERR("pointer %p not found in the ipc_segments_map", ptr);
+        return UMF_RESULT_ERROR_UNKNOWN;
+    }
+
+    tracker_ipc_info_t *v = value;
+
+    LOG_DEBUG("IPC memory region removed: tracker=%p, ptr=%p, size=%zu, "
+              "provider=%p, cache_entry=%p",
+              (void *)hTracker, ptr, v->size, v->provider, v->ipc_cache_value);
+
+    umf_ba_free(hTracker->ipc_info_allocator, value);
+
+    return UMF_RESULT_SUCCESS;
+}
+
 umf_memory_pool_handle_t umfMemoryTrackerGetPool(const void *ptr) {
     umf_alloc_info_t allocInfo = {NULL, 0, NULL};
     umf_result_t ret = umfMemoryTrackerGetAllocInfo(ptr, &allocInfo);
@@ -331,6 +406,41 @@ umf_result_t umfMemoryTrackerGetAllocInfo(const void *ptr,
     return UMF_RESULT_SUCCESS;
 }
 
+umf_result_t umfMemoryTrackerGetIpcInfo(const void *ptr,
+                                        umf_ipc_info_t *pIpcInfo) {
+    assert(pIpcInfo);
+
+    if (ptr == NULL) {
+        return UMF_RESULT_ERROR_INVALID_ARGUMENT;
+    }
+
+    if (TRACKER == NULL) {
+        LOG_ERR("tracker does not exist");
+        return UMF_RESULT_ERROR_NOT_SUPPORTED;
+    }
+
+    if (TRACKER->ipc_segments_map == NULL) {
+        LOG_ERR("tracker's ipc_segments_map does not exist");
+        return UMF_RESULT_ERROR_NOT_SUPPORTED;
+    }
+
+    uintptr_t rkey;
+    tracker_ipc_info_t *rvalue = NULL;
+    int found = critnib_find(TRACKER->ipc_segments_map, (uintptr_t)ptr, FIND_LE,
+                             (void *)&rkey, (void **)&rvalue);
+    if (!found || (uintptr_t)ptr >= rkey + rvalue->size) {
+        LOG_DEBUG("pointer %p not found in the tracker, TRACKER=%p", ptr,
+                  (void *)TRACKER);
+        return UMF_RESULT_ERROR_INVALID_ARGUMENT;
+    }
+
+    pIpcInfo->base = (void *)rkey;
+    pIpcInfo->baseSize = rvalue->size;
+    pIpcInfo->provider = rvalue->provider;
+
+    return UMF_RESULT_SUCCESS;
+}
+
 // Cache entry structure to store provider-specific IPC data.
 // providerIpcData is a Flexible Array Member because its size varies
 // depending on the provider.
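umfMemoryTrackerGetIpcInfo above combines a FIND_LE lookup (the greatest registered base address that is <= ptr) with an upper-bound check against base + size. A minimal model of that rule, using a sorted array in place of the critnib (all names hypothetical):

    // Illustrative sketch, not part of the patch: a pointer belongs to a
    // registered segment only if the FIND_LE candidate also covers it, i.e.
    // base <= ptr < base + size; otherwise ptr falls into a gap.
    #include <stddef.h>
    #include <stdint.h>
    #include <stdio.h>

    typedef struct segment_t {
        uintptr_t base;
        size_t size;
    } segment_t;

    // Find the segment containing `ptr`, or NULL. Segments are sorted by base.
    static const segment_t *find_segment(const segment_t *segs, size_t n,
                                         uintptr_t ptr) {
        const segment_t *le = NULL;
        for (size_t i = 0; i < n && segs[i].base <= ptr; i++) {
            le = &segs[i]; // greatest base <= ptr so far (the FIND_LE candidate)
        }
        if (le == NULL || ptr >= le->base + le->size) {
            return NULL; // ptr lies in a gap between segments
        }
        return le;
    }

    int main(void) {
        const segment_t segs[] = {{0x1000, 0x100}, {0x3000, 0x200}};
        // 0x1080 lies inside [0x1000, 0x1100) -> found
        printf("0x1080 -> %s\n", find_segment(segs, 2, 0x1080) ? "found" : "not found");
        // 0x2000 lies between the two segments -> not found
        printf("0x2000 -> %s\n", find_segment(segs, 2, 0x2000) ? "found" : "not found");
        return 0;
    }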
@@ -872,17 +982,17 @@ ipcOpenedCacheEvictionCallback(const ipc_opened_cache_key_t *key,
                                const ipc_opened_cache_value_t *value) {
     umf_tracking_memory_provider_t *p =
         (umf_tracking_memory_provider_t *)key->local_provider;
-    // umfMemoryTrackerRemove should be called before umfMemoryProviderCloseIPCHandle
+    // umfMemoryTrackerRemoveIpcSegment should be called before umfMemoryProviderCloseIPCHandle
     // to avoid a race condition. If the order would be different, other thread
-    // could allocate the memory at address `ptr` before a call to umfMemoryTrackerRemove
+    // could allocate the memory at address `ptr` before a call to umfMemoryTrackerRemoveIpcSegment
     // resulting in inconsistent state.
     if (value->mapped_base_ptr) {
-        umf_result_t ret =
-            umfMemoryTrackerRemove(p->hTracker, value->mapped_base_ptr);
+        umf_result_t ret = umfMemoryTrackerRemoveIpcSegment(
+            p->hTracker, value->mapped_base_ptr);
         if (ret != UMF_RESULT_SUCCESS) {
             // DO NOT return an error here, because the tracking provider
             // cannot change behaviour of the upstream provider.
-            LOG_ERR("failed to remove the region from the tracker, ptr=%p, "
+            LOG_ERR("failed to remove the region from the IPC tracker, ptr=%p, "
                     "size=%zu, ret = %d",
                     value->mapped_base_ptr, value->mapped_size, ret);
         }
@@ -895,12 +1005,13 @@ ipcOpenedCacheEvictionCallback(const ipc_opened_cache_key_t *key,
     }
 }
 
-static umf_result_t upstreamOpenIPCHandle(umf_tracking_memory_provider_t *p,
-                                          void *providerIpcData,
-                                          size_t bufferSize, void **ptr) {
+static umf_result_t
+upstreamOpenIPCHandle(umf_tracking_memory_provider_t *p, void *providerIpcData,
+                      size_t bufferSize,
+                      ipc_opened_cache_value_t *cache_entry) {
     void *mapped_ptr = NULL;
     assert(p != NULL);
-    assert(ptr != NULL);
+    assert(cache_entry != NULL);
     umf_result_t ret = umfMemoryProviderOpenIPCHandle(
         p->hUpstream, providerIpcData, &mapped_ptr);
     if (ret != UMF_RESULT_SUCCESS) {
@@ -909,7 +1020,21 @@ static umf_result_t upstreamOpenIPCHandle(umf_tracking_memory_provider_t *p,
     }
     assert(mapped_ptr != NULL);
 
-    ret = umfMemoryTrackerAdd(p->hTracker, p->pool, mapped_ptr, bufferSize);
+    // Today umfMemoryTrackerAddIpcSegment requires the memory provider handle
+    // to know which tracking provider instance opened the IPC handle.
+    // The `p` pointer refers to the tracking provider's private data.
+    // Because of that, we get a handle to the tracking provider instance
+    // via `p->pool->provider`.
+    //
+    // TODO:
+    // Today we always create a pool and get an IPC handler from the pool,
+    // and the tracking provider is always created together with a pool.
+    // In fact, the IPC handler is a tracking memory provider.
+    // However, we are considering adding an API that allows creating an IPC
+    // handler from scratch (without creating a memory pool). In that case, we
+    // would create a tracking provider without a pool, so p->pool might be NULL in the future.
+    ret = umfMemoryTrackerAddIpcSegment(p->hTracker, mapped_ptr, bufferSize,
+                                        p->pool->provider, cache_entry);
     if (ret != UMF_RESULT_SUCCESS) {
         LOG_ERR("failed to add IPC region to the tracker, ptr=%p, "
                 "size=%zu, "
@@ -924,7 +1049,8 @@ static umf_result_t upstreamOpenIPCHandle(umf_tracking_memory_provider_t *p,
         return ret;
     }
 
-    *ptr = mapped_ptr;
+    cache_entry->mapped_size = bufferSize;
+    utils_atomic_store_release_ptr(&(cache_entry->mapped_base_ptr), mapped_ptr);
 
     return UMF_RESULT_SUCCESS;
 }
@@ -959,45 +1085,46 @@ static umf_result_t trackingOpenIpcHandle(void *provider, void *providerIpcData,
     void *mapped_ptr = NULL;
     utils_atomic_load_acquire_ptr(&(cache_entry->mapped_base_ptr),
                                   (void **)&mapped_ptr);
-    if (mapped_ptr == NULL) {
+    if (mapped_ptr == NULL) { // new cache entry
         utils_mutex_lock(&(cache_entry->mmap_lock));
         utils_atomic_load_acquire_ptr(&(cache_entry->mapped_base_ptr),
                                       (void **)&mapped_ptr);
         if (mapped_ptr == NULL) {
             ret = upstreamOpenIPCHandle(p, providerIpcData,
-                                        ipcUmfData->baseSize, &mapped_ptr);
-            if (ret == UMF_RESULT_SUCCESS) {
-                // Put to the cache
-                cache_entry->mapped_size = ipcUmfData->baseSize;
-                utils_atomic_store_release_ptr(&(cache_entry->mapped_base_ptr),
-                                               mapped_ptr);
-            }
+                                        ipcUmfData->baseSize, cache_entry);
         }
+        mapped_ptr = cache_entry->mapped_base_ptr;
         utils_mutex_unlock(&(cache_entry->mmap_lock));
     }
 
     if (ret == UMF_RESULT_SUCCESS) {
+        assert(mapped_ptr != NULL);
         *ptr = mapped_ptr;
     }
 
     return ret;
 }
 
+static tracker_ipc_info_t *getTrackerIpcInfo(const void *ptr) {
+    assert(ptr);
+
+    uintptr_t key = (uintptr_t)ptr;
+    tracker_ipc_info_t *value = critnib_get(TRACKER->ipc_segments_map, key);
+
+    return value;
+}
+
 static umf_result_t trackingCloseIpcHandle(void *provider, void *ptr,
                                            size_t size) {
     (void)provider;
-    (void)ptr;
-    (void)size;
-    // We keep opened IPC handles in the p->hIpcMappedCache.
-    // IPC handle is closed when it is evicted from the cache
-    // or when cache is destroyed.
-    //
-    // TODO: today the size of the IPC cache is infinite.
-    // When the threshold for the cache size is implemented
-    // we need to introduce a reference counting mechanism.
-    // The trackingOpenIpcHandle will increment the refcount for the corresponding entry.
-    // The trackingCloseIpcHandle will decrement the refcount for the corresponding cache entry.
-    return UMF_RESULT_SUCCESS;
+    tracker_ipc_info_t *trackerIpcInfo = getTrackerIpcInfo(ptr);
+
+    if (!trackerIpcInfo) {
+        LOG_ERR("failed to get tracker ipc info, ptr=%p, size=%zu", ptr, size);
+        return UMF_RESULT_ERROR_INVALID_ARGUMENT;
+    }
+
+    return umfIpcHandleMappedCacheRelease(trackerIpcInfo->ipc_cache_value);
 }
 
 umf_memory_provider_ops_t UMF_TRACKING_MEMORY_PROVIDER_OPS = {
@@ -1086,17 +1213,30 @@ umf_memory_tracker_handle_t umfMemoryTrackerCreate(void) {
     for (i = 0; i < MAX_LEVELS_OF_ALLOC_SEGMENT_MAP; i++) {
         handle->alloc_segments_map[i] = critnib_new();
         if (!handle->alloc_segments_map[i]) {
-            goto err_destroy_mutex;
+            goto err_destroy_alloc_segments_map;
         }
     }
 
+    handle->ipc_info_allocator =
+        umf_ba_create(sizeof(struct tracker_ipc_info_t));
+    if (!handle->ipc_info_allocator) {
+        goto err_destroy_alloc_segments_map;
+    }
+
+    handle->ipc_segments_map = critnib_new();
+    if (!handle->ipc_segments_map) {
+        goto err_destroy_ipc_info_allocator;
+    }
+
     LOG_DEBUG("tracker created, handle=%p, alloc_segments_map=%p",
               (void *)handle, (void *)handle->alloc_segments_map)
 
     return handle;
 
-err_destroy_mutex:
+err_destroy_ipc_info_allocator:
+    umf_ba_destroy(handle->ipc_info_allocator);
+err_destroy_alloc_segments_map:
-    for (int j = i; j >= 0; j--) {
+    // start at i - 1: when we get here from after the loop, i equals
+    // MAX_LEVELS_OF_ALLOC_SEGMENT_MAP and alloc_segments_map[i] is out of bounds
+    for (int j = i - 1; j >= 0; j--) {
         if (handle->alloc_segments_map[j]) {
             critnib_delete(handle->alloc_segments_map[j]);
         }
     }
@@ -1137,5 +1278,9 @@ void umfMemoryTrackerDestroy(umf_memory_tracker_handle_t handle) {
     utils_mutex_destroy_not_free(&handle->splitMergeMutex);
     umf_ba_destroy(handle->alloc_info_allocator);
     handle->alloc_info_allocator = NULL;
+    critnib_delete(handle->ipc_segments_map);
+    handle->ipc_segments_map = NULL;
+    umf_ba_destroy(handle->ipc_info_allocator);
+    handle->ipc_info_allocator = NULL;
     umf_ba_global_free(handle);
 }
diff --git a/src/provider/provider_tracking.h b/src/provider/provider_tracking.h
index 9e868cf31..842449be5 100644
--- a/src/provider/provider_tracking.h
+++ b/src/provider/provider_tracking.h
@@ -45,6 +45,15 @@ typedef struct umf_alloc_info_t {
 umf_result_t umfMemoryTrackerGetAllocInfo(const void *ptr,
                                           umf_alloc_info_t *pAllocInfo);
 
+typedef struct umf_ipc_info_t {
+    void *base;
+    size_t baseSize;
+    umf_memory_provider_handle_t provider;
+} umf_ipc_info_t;
+
+umf_result_t umfMemoryTrackerGetIpcInfo(const void *ptr,
+                                        umf_ipc_info_t *pIpcInfo);
+
 // Creates a memory provider that tracks each allocation/deallocation through umf_memory_tracker_handle_t and
 // forwards all requests to hUpstream memory Provider. hUpstream lifetime should be managed by the user of this function.
 umf_result_t umfTrackingMemoryProviderCreate(
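The trackingOpenIpcHandle path above is a double-checked locking pattern: an acquire-load fast path, a re-check under the mutex, and a release-store that publishes the fully initialized mapping. A reduced standalone sketch in C11 atomics (hypothetical names; create_mapping stands in for the upstream open call):

    // Illustrative sketch, not part of the patch: double-checked locking as
    // used by trackingOpenIpcHandle, reduced to C11 atomics and pthreads.
    #include <pthread.h>
    #include <stdatomic.h>
    #include <stdio.h>
    #include <stdlib.h>

    typedef struct cache_entry_t {
        _Atomic(void *) mapped_base_ptr;
        pthread_mutex_t mmap_lock;
    } cache_entry_t;

    // Placeholder for the expensive one-time work (in the patch: the upstream
    // umfMemoryProviderOpenIPCHandle call that creates the mapping).
    static void *create_mapping(void) { return malloc(4096); }

    static void *get_or_create_mapping(cache_entry_t *entry) {
        void *ptr = atomic_load_explicit(&entry->mapped_base_ptr,
                                         memory_order_acquire);
        if (ptr == NULL) {
            pthread_mutex_lock(&entry->mmap_lock);
            // Re-check under the lock: another thread may have created the
            // mapping between our first load and taking the lock.
            ptr = atomic_load_explicit(&entry->mapped_base_ptr,
                                       memory_order_acquire);
            if (ptr == NULL) {
                ptr = create_mapping();
                // Publish the fully initialized mapping to other threads.
                atomic_store_explicit(&entry->mapped_base_ptr, ptr,
                                      memory_order_release);
            }
            pthread_mutex_unlock(&entry->mmap_lock);
        }
        return ptr;
    }

    int main(void) {
        cache_entry_t entry;
        atomic_init(&entry.mapped_base_ptr, NULL);
        pthread_mutex_init(&entry.mmap_lock, NULL);
        void *a = get_or_create_mapping(&entry); // slow path: creates mapping
        void *b = get_or_create_mapping(&entry); // fast path: cache hit
        printf("same mapping: %s\n", a == b ? "yes" : "no");
        pthread_mutex_destroy(&entry.mmap_lock);
        free(a);
        return 0;
    }

In the patch itself, utils_atomic_load_acquire_ptr and utils_atomic_store_release_ptr play the roles of the C11 atomic operations used here.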
diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt
index 5f244b60e..e172115e1 100644
--- a/test/CMakeLists.txt
+++ b/test/CMakeLists.txt
@@ -116,8 +116,9 @@ function(add_umf_test)
     # * NAME - a name of the test
     # * SRCS - source files
     # * LIBS - libraries to be linked with
+    # * ENVS - environment variables
     set(oneValueArgs NAME)
-    set(multiValueArgs SRCS LIBS)
+    set(multiValueArgs SRCS LIBS ENVS)
     cmake_parse_arguments(
         ARG
         ""
@@ -139,6 +140,9 @@ function(add_umf_test)
                  WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR})
 
     set_tests_properties(${TEST_NAME} PROPERTIES LABELS "umf")
+    if(ARG_ENVS)
+        set_tests_properties(${TEST_NAME} PROPERTIES ENVIRONMENT ${ARG_ENVS})
+    endif()
 
     if(WINDOWS)
         # add PATH to DLL on Windows
@@ -524,6 +528,12 @@ add_umf_test(
     SRCS ipcAPI.cpp ${BA_SOURCES_FOR_TEST}
     LIBS ${UMF_UTILS_FOR_TEST})
 
+add_umf_test(
+    NAME ipc_max_opened_limit
+    SRCS ipcAPI.cpp ${BA_SOURCES_FOR_TEST}
+    LIBS ${UMF_UTILS_FOR_TEST}
+    ENVS "UMF_MAX_OPENED_IPC_HANDLES=10")
+
 add_umf_test(NAME ipc_negative SRCS ipc_negative.cpp)
 
 function(add_umf_ipc_test)
diff --git a/test/ipcFixtures.hpp b/test/ipcFixtures.hpp
index 57bd04079..1fc57b900 100644
--- a/test/ipcFixtures.hpp
+++ b/test/ipcFixtures.hpp
@@ -68,6 +68,18 @@ using ipcTestParams =
 struct umfIpcTest : umf_test::test,
                     ::testing::WithParamInterface<ipcTestParams> {
     umfIpcTest() {}
+    size_t getOpenedIpcCacheSize() {
+        const char *max_size_str = getenv("UMF_MAX_OPENED_IPC_HANDLES");
+        if (max_size_str) {
+            char *endptr;
+            size_t max_size = strtoul(max_size_str, &endptr, 10);
+            EXPECT_EQ(*endptr, '\0');
+            if (*endptr == '\0') {
+                return max_size;
+            }
+        }
+        return 0;
+    }
     void SetUp() override {
         test::SetUp();
         auto [pool_ops, pool_params_create, pool_params_destroy, provider_ops,
@@ -80,6 +92,7 @@ struct umfIpcTest : umf_test::test,
         providerParamsCreate = provider_params_create;
         providerParamsDestroy = provider_params_destroy;
         memAccessor = accessor;
+        openedIpcCacheSize = getOpenedIpcCacheSize();
     }
 
     void TearDown() override { test::TearDown(); }
@@ -160,6 +173,7 @@ struct umfIpcTest : umf_test::test,
     umf_memory_provider_ops_t *providerOps = nullptr;
     pfnProviderParamsCreate providerParamsCreate = nullptr;
     pfnProviderParamsDestroy providerParamsDestroy = nullptr;
+    size_t openedIpcCacheSize = 0;
 
     void concurrentGetConcurrentPutHandles(bool shuffle) {
         std::vector<void *> ptrs;
@@ -264,6 +278,156 @@ struct umfIpcTest : umf_test::test,
         pool.reset(nullptr);
         EXPECT_EQ(stat.putCount, stat.getCount);
     }
+
+    void concurrentOpenConcurrentCloseHandles(bool shuffle) {
+        umf_result_t ret;
+        std::vector<void *> ptrs;
+        constexpr size_t ALLOC_SIZE = 100;
+        constexpr size_t NUM_POINTERS = 100;
+        umf::pool_unique_handle_t pool = makePool();
+        ASSERT_NE(pool.get(), nullptr);
+
+        for (size_t i = 0; i < NUM_POINTERS; ++i) {
+            void *ptr = umfPoolMalloc(pool.get(), ALLOC_SIZE);
+            EXPECT_NE(ptr, nullptr);
+            ptrs.push_back(ptr);
+        }
+
+        std::vector<umf_ipc_handle_t> ipcHandles;
+        for (size_t i = 0; i < NUM_POINTERS; ++i) {
+            umf_ipc_handle_t ipcHandle;
+            size_t handleSize;
+            ret = umfGetIPCHandle(ptrs[i], &ipcHandle, &handleSize);
+            ASSERT_EQ(ret, UMF_RESULT_SUCCESS);
+            ipcHandles.push_back(ipcHandle);
+        }
+
+        std::array<std::vector<void *>, NTHREADS> openedIpcHandles;
+        umf_ipc_handler_handle_t ipcHandler = nullptr;
+        ret = umfPoolGetIPCHandler(pool.get(), &ipcHandler);
+        ASSERT_EQ(ret, UMF_RESULT_SUCCESS);
+        ASSERT_NE(ipcHandler, nullptr);
+
+        umf_test::syncthreads_barrier syncthreads(NTHREADS);
+
+        auto openHandlesFn = [shuffle, &ipcHandles, &openedIpcHandles,
+                              &syncthreads, ipcHandler](size_t tid) {
+            // Each thread gets a copy of the pointers to shuffle them
+            std::vector<umf_ipc_handle_t> localIpcHandles = ipcHandles;
+            if (shuffle) {
+                std::random_device rd;
+                std::mt19937 g(rd());
+                std::shuffle(localIpcHandles.begin(), localIpcHandles.end(), g);
+            }
+            syncthreads();
+            for (auto ipcHandle : localIpcHandles) {
+                void *ptr;
+                umf_result_t ret =
+                    umfOpenIPCHandle(ipcHandler, ipcHandle, &ptr);
+                ASSERT_EQ(ret, UMF_RESULT_SUCCESS);
+                openedIpcHandles[tid].push_back(ptr);
+            }
+        };
+
+        umf_test::parallel_exec(NTHREADS, openHandlesFn);
+
+        auto closeHandlesFn = [&openedIpcHandles, &syncthreads](size_t tid) {
+            syncthreads();
+            for (void *ptr : openedIpcHandles[tid]) {
+                umf_result_t ret = umfCloseIPCHandle(ptr);
+                EXPECT_EQ(ret, UMF_RESULT_SUCCESS);
+            }
+        };
+
+        umf_test::parallel_exec(NTHREADS, closeHandlesFn);
+
+        for (auto ipcHandle : ipcHandles) {
+            ret = umfPutIPCHandle(ipcHandle);
+            EXPECT_EQ(ret, UMF_RESULT_SUCCESS);
+        }
+
+        for (void *ptr : ptrs) {
+            ret = umfPoolFree(pool.get(), ptr);
+            EXPECT_EQ(ret, UMF_RESULT_SUCCESS);
+        }
+
+        pool.reset(nullptr);
+        EXPECT_EQ(stat.getCount, stat.allocCount);
+        EXPECT_EQ(stat.putCount, stat.getCount);
+        EXPECT_EQ(stat.openCount, stat.allocCount);
+        EXPECT_EQ(stat.openCount, stat.closeCount);
+    }
+
+    void concurrentOpenCloseHandles(bool shuffle) {
+        umf_result_t ret;
+        std::vector<void *> ptrs;
+        constexpr size_t ALLOC_SIZE = 100;
+        constexpr size_t NUM_POINTERS = 100;
+        umf::pool_unique_handle_t pool = makePool();
+        ASSERT_NE(pool.get(), nullptr);
+
+        for (size_t i = 0; i < NUM_POINTERS; ++i) {
+            void *ptr = umfPoolMalloc(pool.get(), ALLOC_SIZE);
+            EXPECT_NE(ptr, nullptr);
+            ptrs.push_back(ptr);
+        }
+
+        std::vector<umf_ipc_handle_t> ipcHandles;
+        for (size_t i = 0; i < NUM_POINTERS; ++i) {
+            umf_ipc_handle_t ipcHandle;
+            size_t handleSize;
+            ret = umfGetIPCHandle(ptrs[i], &ipcHandle, &handleSize);
+            ASSERT_EQ(ret, UMF_RESULT_SUCCESS);
+            ipcHandles.push_back(ipcHandle);
+        }
+
+        umf_ipc_handler_handle_t ipcHandler = nullptr;
+        ret = umfPoolGetIPCHandler(pool.get(), &ipcHandler);
+        ASSERT_EQ(ret, UMF_RESULT_SUCCESS);
+        ASSERT_NE(ipcHandler, nullptr);
+
+        umf_test::syncthreads_barrier syncthreads(NTHREADS);
+
+        auto openCloseHandlesFn = [shuffle, &ipcHandles, &syncthreads,
+                                   ipcHandler](size_t) {
+            // Each thread gets a copy of the pointers to shuffle them
+            std::vector<umf_ipc_handle_t> localIpcHandles = ipcHandles;
+            if (shuffle) {
+                std::random_device rd;
+                std::mt19937 g(rd());
+                std::shuffle(localIpcHandles.begin(), localIpcHandles.end(), g);
+            }
+            syncthreads();
+            for (auto ipcHandle : localIpcHandles) {
+                void *ptr;
+                umf_result_t ret =
+                    umfOpenIPCHandle(ipcHandler, ipcHandle, &ptr);
+                ASSERT_EQ(ret, UMF_RESULT_SUCCESS);
+                ret = umfCloseIPCHandle(ptr);
+                EXPECT_EQ(ret, UMF_RESULT_SUCCESS);
+            }
+        };
+
+        umf_test::parallel_exec(NTHREADS, openCloseHandlesFn);
+
+        for (auto ipcHandle : ipcHandles) {
+            ret = umfPutIPCHandle(ipcHandle);
+            EXPECT_EQ(ret, UMF_RESULT_SUCCESS);
+        }
+
+        for (void *ptr : ptrs) {
+            ret = umfPoolFree(pool.get(), ptr);
+            EXPECT_EQ(ret, UMF_RESULT_SUCCESS);
+        }
+
+        pool.reset(nullptr);
+        EXPECT_EQ(stat.getCount, stat.allocCount);
+        EXPECT_EQ(stat.putCount, stat.getCount);
+        if (openedIpcCacheSize == 0) {
+            EXPECT_EQ(stat.openCount, stat.allocCount);
+        }
+        EXPECT_EQ(stat.openCount, stat.closeCount);
+    }
 };
 
 TEST_P(umfIpcTest, GetIPCHandleSize) {
@@ -389,70 +553,6 @@ TEST_P(umfIpcTest, BasicFlow) {
     EXPECT_EQ(stat.closeCount, stat.openCount);
 }
 
-TEST_P(umfIpcTest, GetPoolByOpenedHandle) {
-    constexpr size_t SIZE = 100;
-    constexpr size_t NUM_ALLOCS = 100;
-    constexpr size_t NUM_POOLS = 4;
-    void *ptrs[NUM_ALLOCS];
-    void *openedPtrs[NUM_POOLS][NUM_ALLOCS];
-    std::vector<umf::pool_unique_handle_t> pools_to_open;
-    umf::pool_unique_handle_t pool = makePool();
-    ASSERT_NE(pool.get(), nullptr);
-
-    for (size_t i = 0; i < NUM_POOLS; ++i) {
-        pools_to_open.push_back(makePool());
-    }
-
-    for (size_t i = 0; i < NUM_ALLOCS; ++i) {
-        void *ptr = umfPoolMalloc(pool.get(), SIZE);
-        ASSERT_NE(ptr, nullptr);
-        ptrs[i] = ptr;
-    }
-
-    for (size_t i = 0; i < NUM_ALLOCS; ++i) {
-        umf_ipc_handle_t ipcHandle = nullptr;
-        size_t handleSize = 0;
-        umf_result_t ret = umfGetIPCHandle(ptrs[i], &ipcHandle, &handleSize);
-        ASSERT_EQ(ret, UMF_RESULT_SUCCESS);
-
-        for (size_t pool_id = 0; pool_id < NUM_POOLS; pool_id++) {
-            void *ptr = nullptr;
-            umf_ipc_handler_handle_t ipcHandler = nullptr;
-            ret =
-                umfPoolGetIPCHandler(pools_to_open[pool_id].get(), &ipcHandler);
-            ASSERT_EQ(ret, UMF_RESULT_SUCCESS);
-            ASSERT_NE(ipcHandler, nullptr);
-
-            ret = umfOpenIPCHandle(ipcHandler, ipcHandle, &ptr);
-            ASSERT_EQ(ret, UMF_RESULT_SUCCESS);
-            openedPtrs[pool_id][i] = ptr;
-        }
-
-        ret = umfPutIPCHandle(ipcHandle);
-        ASSERT_EQ(ret, UMF_RESULT_SUCCESS);
-    }
-
-    for (size_t pool_id = 0; pool_id < NUM_POOLS; pool_id++) {
-        for (size_t i = 0; i < NUM_ALLOCS; ++i) {
-            umf_memory_pool_handle_t openedPool =
-                umfPoolByPtr(openedPtrs[pool_id][i]);
-            EXPECT_EQ(openedPool, pools_to_open[pool_id].get());
-        }
-    }
-
-    for (size_t pool_id = 0; pool_id < NUM_POOLS; pool_id++) {
-        for (size_t i = 0; i < NUM_ALLOCS; ++i) {
-            umf_result_t ret = umfCloseIPCHandle(openedPtrs[pool_id][i]);
-            EXPECT_EQ(ret, UMF_RESULT_SUCCESS);
-        }
-    }
-
-    for (size_t i = 0; i < NUM_ALLOCS; ++i) {
-        umf_result_t ret = umfFree(ptrs[i]);
-        EXPECT_EQ(ret, UMF_RESULT_SUCCESS);
-    }
-}
-
 TEST_P(umfIpcTest, AllocFreeAllocTest) {
     constexpr size_t SIZE = 64 * 1024;
     umf::pool_unique_handle_t pool = makePool();
@@ -593,75 +693,20 @@ TEST_P(umfIpcTest, ConcurrentGetPutHandlesShuffled) {
     concurrentGetPutHandles(true);
 }
 
-TEST_P(umfIpcTest, ConcurrentOpenCloseHandles) {
-    umf_result_t ret;
-    std::vector<void *> ptrs;
-    constexpr size_t ALLOC_SIZE = 100;
-    constexpr size_t NUM_POINTERS = 100;
-    umf::pool_unique_handle_t pool = makePool();
-    ASSERT_NE(pool.get(), nullptr);
-
-    for (size_t i = 0; i < NUM_POINTERS; ++i) {
-        void *ptr = umfPoolMalloc(pool.get(), ALLOC_SIZE);
-        EXPECT_NE(ptr, nullptr);
-        ptrs.push_back(ptr);
-    }
-
-    std::array<umf_ipc_handle_t, NUM_POINTERS> ipcHandles;
-    for (size_t i = 0; i < NUM_POINTERS; ++i) {
-        umf_ipc_handle_t ipcHandle;
-        size_t handleSize;
-        ret = umfGetIPCHandle(ptrs[i], &ipcHandle, &handleSize);
-        ASSERT_EQ(ret, UMF_RESULT_SUCCESS);
-        ipcHandles[i] = ipcHandle;
-    }
-
-    std::array<std::vector<void *>, NTHREADS> openedIpcHandles;
-    umf_ipc_handler_handle_t ipcHandler = nullptr;
-    ret = umfPoolGetIPCHandler(pool.get(), &ipcHandler);
-    ASSERT_EQ(ret, UMF_RESULT_SUCCESS);
-    ASSERT_NE(ipcHandler, nullptr);
-
-    umf_test::syncthreads_barrier syncthreads(NTHREADS);
-
-    auto openHandlesFn = [&ipcHandles, &openedIpcHandles, &syncthreads,
-                          ipcHandler](size_t tid) {
-        syncthreads();
-        for (auto ipcHandle : ipcHandles) {
-            void *ptr;
-            umf_result_t ret = umfOpenIPCHandle(ipcHandler, ipcHandle, &ptr);
-            ASSERT_EQ(ret, UMF_RESULT_SUCCESS);
-            openedIpcHandles[tid].push_back(ptr);
-        }
-    };
-
-    umf_test::parallel_exec(NTHREADS, openHandlesFn);
-
-    auto closeHandlesFn = [&openedIpcHandles, &syncthreads](size_t tid) {
-        syncthreads();
-        for (void *ptr : openedIpcHandles[tid]) {
-            umf_result_t ret = umfCloseIPCHandle(ptr);
-            EXPECT_EQ(ret, UMF_RESULT_SUCCESS);
-        }
-    };
-
-    umf_test::parallel_exec(NTHREADS, closeHandlesFn);
+TEST_P(umfIpcTest, ConcurrentOpenConcurrentCloseHandles) {
+    concurrentOpenConcurrentCloseHandles(false);
+}
 
-    for (auto ipcHandle : ipcHandles) {
-        ret = umfPutIPCHandle(ipcHandle);
-        EXPECT_EQ(ret, UMF_RESULT_SUCCESS);
-    }
+TEST_P(umfIpcTest, ConcurrentOpenConcurrentCloseHandlesShuffled) {
+    concurrentOpenConcurrentCloseHandles(true);
+}
 
-    for (void *ptr : ptrs) {
-        ret = umfPoolFree(pool.get(), ptr);
-        EXPECT_EQ(ret, UMF_RESULT_SUCCESS);
-    }
+TEST_P(umfIpcTest, ConcurrentOpenCloseHandles) {
+    concurrentOpenCloseHandles(false);
+}
 
-    pool.reset(nullptr);
-    EXPECT_EQ(stat.getCount, stat.allocCount);
-    EXPECT_EQ(stat.putCount, stat.getCount);
-    EXPECT_EQ(stat.openCount, stat.allocCount);
-    EXPECT_EQ(stat.openCount, stat.closeCount);
+TEST_P(umfIpcTest, ConcurrentOpenCloseHandlesShuffled) {
+    concurrentOpenCloseHandles(true);
 }
 
 TEST_P(umfIpcTest, ConcurrentDestroyIpcHandlers) {
diff --git a/test/supp/drd-test_ipc_max_opened_limit.supp b/test/supp/drd-test_ipc_max_opened_limit.supp
new file mode 100644
index 000000000..fbdbd0183
--- /dev/null
+++ b/test/supp/drd-test_ipc_max_opened_limit.supp
@@ -0,0 +1,34 @@
+{
+   Conditional variable destruction false-positive
+   drd:CondErr
+   ...
+   fun:pthread_cond_destroy@*
+   ...
+}
+
+{
+   [false-positive] Double check locking pattern in trackingOpenIpcHandle
+   drd:ConflictingAccess
+   fun:utils_atomic_load_acquire_ptr
+   fun:trackingOpenIpcHandle
+   fun:umfMemoryProviderOpenIPCHandle
+   fun:umfOpenIPCHandle
+   ...
+}
+
+{
+   [false-positive] trackingGetIpcHandle
+   drd:ConflictingAccess
+   fun:trackingGetIpcHandle
+   fun:umfMemoryProviderGetIPCHandle
+   fun:umfGetIPCHandle
+}
+
+{
+   [false-positive] trackingGetIpcHandle
+   drd:ConflictingAccess
+   fun:memmove
+   fun:trackingGetIpcHandle
+   fun:umfMemoryProviderGetIPCHandle
+   fun:umfGetIPCHandle
+}
diff --git a/test/supp/drd-test_provider_devdax_memory_ipc.supp b/test/supp/drd-test_provider_devdax_memory_ipc.supp
index f6f12aa1e..31608d30c 100644
--- a/test/supp/drd-test_provider_devdax_memory_ipc.supp
+++ b/test/supp/drd-test_provider_devdax_memory_ipc.supp
@@ -2,6 +2,17 @@
    [false-positive] Double check locking pattern in trackingOpenIpcHandle
    drd:ConflictingAccess
    fun:utils_atomic_store_release_ptr
+   fun:upstreamOpenIPCHandle
+   fun:trackingOpenIpcHandle
+   fun:umfMemoryProviderOpenIPCHandle
+   fun:umfOpenIPCHandle
+   ...
+}
+
+{
+   [false-positive] Double check locking pattern in trackingOpenIpcHandle
+   drd:ConflictingAccess
+   fun:utils_atomic_load_acquire_ptr
    fun:trackingOpenIpcHandle
    fun:umfMemoryProviderOpenIPCHandle
    fun:umfOpenIPCHandle
diff --git a/test/supp/drd-test_provider_file_memory_ipc.supp b/test/supp/drd-test_provider_file_memory_ipc.supp
index 72fd6d87c..9883001f7 100644
--- a/test/supp/drd-test_provider_file_memory_ipc.supp
+++ b/test/supp/drd-test_provider_file_memory_ipc.supp
@@ -10,6 +10,17 @@
    [false-positive] Double check locking pattern in trackingOpenIpcHandle
    drd:ConflictingAccess
    fun:utils_atomic_store_release_ptr
+   fun:upstreamOpenIPCHandle
+   fun:trackingOpenIpcHandle
+   fun:umfMemoryProviderOpenIPCHandle
+   fun:umfOpenIPCHandle
+   ...
+}
+
+{
+   [false-positive] Double check locking pattern in trackingOpenIpcHandle
+   drd:ConflictingAccess
+   fun:utils_atomic_load_acquire_ptr
    fun:trackingOpenIpcHandle
    fun:umfMemoryProviderOpenIPCHandle
    fun:umfOpenIPCHandle
diff --git a/test/supp/drd-test_provider_os_memory.supp b/test/supp/drd-test_provider_os_memory.supp
index f6f12aa1e..31608d30c 100644
--- a/test/supp/drd-test_provider_os_memory.supp
+++ b/test/supp/drd-test_provider_os_memory.supp
@@ -2,6 +2,17 @@
    [false-positive] Double check locking pattern in trackingOpenIpcHandle
    drd:ConflictingAccess
    fun:utils_atomic_store_release_ptr
+   fun:upstreamOpenIPCHandle
+   fun:trackingOpenIpcHandle
+   fun:umfMemoryProviderOpenIPCHandle
+   fun:umfOpenIPCHandle
+   ...
+}
+
+{
+   [false-positive] Double check locking pattern in trackingOpenIpcHandle
+   drd:ConflictingAccess
+   fun:utils_atomic_load_acquire_ptr
    fun:trackingOpenIpcHandle
    fun:umfMemoryProviderOpenIPCHandle
    fun:umfOpenIPCHandle
diff --git a/test/supp/helgrind-test_ipc_max_opened_limit.supp b/test/supp/helgrind-test_ipc_max_opened_limit.supp
new file mode 100644
index 000000000..04f3a9199
--- /dev/null
+++ b/test/supp/helgrind-test_ipc_max_opened_limit.supp
@@ -0,0 +1,53 @@
+{
+   False-positive race in critnib_insert (lack of instrumentation)
+   Helgrind:Race
+   fun:utils_atomic_store_release_ptr
+   fun:critnib_insert
+   ...
+}
+
+{
+   False-positive race in critnib_find (lack of instrumentation)
+   Helgrind:Race
+   fun:find_predecessor
+   fun:find_le
+   fun:critnib_find
+   ...
+}
+
+{
+   [false-positive] Double check locking pattern in trackingOpenIpcHandle
+   Helgrind:Race
+   fun:utils_atomic_store_release_ptr
+   fun:trackingOpenIpcHandle
+   fun:umfMemoryProviderOpenIPCHandle
+   fun:umfOpenIPCHandle
+   ...
+}
+
+{
+   [false-positive] Double check locking pattern in trackingOpenIpcHandle
+   Helgrind:Race
+   fun:utils_atomic_load_acquire_ptr
+   fun:trackingOpenIpcHandle
+   fun:umfMemoryProviderOpenIPCHandle
+   fun:umfOpenIPCHandle
+   ...
+}
+
+{
+   [false-positive] umfMemoryProviderGetIPCHandle
+   Helgrind:Race
+   fun:trackingGetIpcHandle
+   fun:umfMemoryProviderGetIPCHandle
+   fun:umfGetIPCHandle
+}
+
+{
+   [false-positive] umfMemoryProviderGetIPCHandle
+   Helgrind:Race
+   fun:memmove
+   fun:trackingGetIpcHandle
+   fun:umfMemoryProviderGetIPCHandle
+   fun:umfGetIPCHandle
+}
diff --git a/test/supp/helgrind-test_provider_devdax_memory_ipc.supp b/test/supp/helgrind-test_provider_devdax_memory_ipc.supp
index 4bc776f43..63e7d626c 100644
--- a/test/supp/helgrind-test_provider_devdax_memory_ipc.supp
+++ b/test/supp/helgrind-test_provider_devdax_memory_ipc.supp
@@ -2,6 +2,17 @@
    [false-positive] Double check locking pattern in trackingOpenIpcHandle
    Helgrind:Race
    fun:utils_atomic_store_release_ptr
+   fun:upstreamOpenIPCHandle
+   fun:trackingOpenIpcHandle
+   fun:umfMemoryProviderOpenIPCHandle
+   fun:umfOpenIPCHandle
+   ...
+}
+
+{
+   [false-positive] Double check locking pattern in trackingOpenIpcHandle
+   Helgrind:Race
+   fun:utils_atomic_load_acquire_ptr
    fun:trackingOpenIpcHandle
    fun:umfMemoryProviderOpenIPCHandle
    fun:umfOpenIPCHandle
diff --git a/test/supp/helgrind-test_provider_file_memory_ipc.supp b/test/supp/helgrind-test_provider_file_memory_ipc.supp
index de22665f5..11791e4ed 100644
--- a/test/supp/helgrind-test_provider_file_memory_ipc.supp
+++ b/test/supp/helgrind-test_provider_file_memory_ipc.supp
@@ -2,6 +2,7 @@
    [false-positive] Double check locking pattern in trackingOpenIpcHandle
    Helgrind:Race
    fun:utils_atomic_store_release_ptr
+   fun:upstreamOpenIPCHandle
    fun:trackingOpenIpcHandle
    fun:umfMemoryProviderOpenIPCHandle
    fun:umfOpenIPCHandle
diff --git a/test/supp/helgrind-test_provider_os_memory.supp b/test/supp/helgrind-test_provider_os_memory.supp
index 4bc776f43..63e7d626c 100644
--- a/test/supp/helgrind-test_provider_os_memory.supp
+++ b/test/supp/helgrind-test_provider_os_memory.supp
@@ -2,6 +2,17 @@
    [false-positive] Double check locking pattern in trackingOpenIpcHandle
    Helgrind:Race
    fun:utils_atomic_store_release_ptr
+   fun:upstreamOpenIPCHandle
+   fun:trackingOpenIpcHandle
+   fun:umfMemoryProviderOpenIPCHandle
+   fun:umfOpenIPCHandle
+   ...
+}
+
+{
+   [false-positive] Double check locking pattern in trackingOpenIpcHandle
+   Helgrind:Race
+   fun:utils_atomic_load_acquire_ptr
    fun:trackingOpenIpcHandle
    fun:umfMemoryProviderOpenIPCHandle
    fun:umfOpenIPCHandle