diff --git a/node_discovery/private/src/node_discovery.c b/node_discovery/private/src/node_discovery.c index 9f96547..f7e81a5 100644 --- a/node_discovery/private/src/node_discovery.c +++ b/node_discovery/private/src/node_discovery.c @@ -268,6 +268,7 @@ celix_status_t node_discovery_removeNode(node_discovery_pt node_discovery, node_ char* wireId = properties_get(wep->properties, WIRING_ENDPOINT_DESCRIPTION_WIRE_ID_KEY); if (strcmp(wireId, rmWireId) == 0) { + printf("NODE_DISCOVERY: Removing Wiring Endpoint %s - %s\n", node_desc->nodeId, rmWireId); node_discovery_informWiringEndpointListeners(node_discovery, wep, false); arrayListIterator_remove(wep_it); } @@ -298,6 +299,7 @@ celix_status_t node_discovery_informWiringEndpointListeners(node_discovery_pt no if (status == CELIX_SUCCESS) { if (node_discovery->listenerReferences != NULL) { hash_map_iterator_pt iter = hashMapIterator_create(node_discovery->listenerReferences); + while (hashMapIterator_hasNext(iter)) { hash_map_entry_pt entry = hashMapIterator_nextEntry(iter); @@ -305,16 +307,14 @@ celix_status_t node_discovery_informWiringEndpointListeners(node_discovery_pt no wiring_endpoint_listener_pt listener = NULL; char* scope = NULL; - char* rsa = NULL; serviceReference_getProperty(reference, (char *) INAETICS_WIRING_ENDPOINT_LISTENER_SCOPE, &scope); - serviceReference_getProperty(reference, (char *) "RSA", &rsa); filter_pt filter = filter_create(scope); bool matchResult = false; filter_match(filter, wEndpoint->properties, &matchResult); - if (matchResult && rsa == NULL) { + if (matchResult) { bundleContext_getService(node_discovery->context, reference, (void**) &listener); if (wEndpointAdded) { listener->wiringEndpointAdded(listener->handle, wEndpoint, scope); @@ -422,8 +422,10 @@ celix_status_t node_discovery_wiringEndpointListenerAdded(void * handle, service node_discovery_pt nodeDiscovery = handle; - char *nodeDiscoveryListener = NULL; + char* nodeDiscoveryListener = NULL; + serviceReference_getProperty(reference, "NODE_DISCOVERY", &nodeDiscoveryListener); + char *scope = NULL; serviceReference_getProperty(reference, (char *) INAETICS_WIRING_ENDPOINT_LISTENER_SCOPE, &scope); @@ -447,6 +449,7 @@ celix_status_t node_discovery_wiringEndpointListenerAdded(void * handle, service bool matchResult = false; filter_match(filter, ep_desc->properties, &matchResult); + if (matchResult) { wiring_endpoint_listener_pt listener = service; listener->wiringEndpointAdded(listener->handle, ep_desc, NULL); diff --git a/node_discovery/public/include/node_discovery.h b/node_discovery/public/include/node_discovery.h index ba0dc86..b345f27 100644 --- a/node_discovery/public/include/node_discovery.h +++ b/node_discovery/public/include/node_discovery.h @@ -10,7 +10,7 @@ #define NODE_DISCOVERY_ZONE_IDENTIFIER "NODE_DISCOVERY_ZONE_IDENTIFIER" #define NODE_DISCOVERY_NODE_IDENTIFIER "NODE_DISCOVERY_NODE_IDENTIFIER" -typedef struct node_discovery *node_discovery_pt; +typedef struct node_discovery* node_discovery_pt; #endif /* NODE_DISCOVERY_H_ */ diff --git a/remote_service_admin/private/src/import_registration_impl.c b/remote_service_admin/private/src/import_registration_impl.c index 805bcc8..2c4d0d8 100644 --- a/remote_service_admin/private/src/import_registration_impl.c +++ b/remote_service_admin/private/src/import_registration_impl.c @@ -5,6 +5,7 @@ #include #include +#include #include "celix_errno.h" diff --git a/remote_service_admin_inaetics/private/src/remote_service_admin_activator.c b/remote_service_admin_inaetics/private/src/remote_service_admin_activator.c index 18b49bb..813f48c 100644 --- a/remote_service_admin_inaetics/private/src/remote_service_admin_activator.c +++ b/remote_service_admin_inaetics/private/src/remote_service_admin_activator.c @@ -3,6 +3,7 @@ */ #include +#include #include "bundle_activator.h" #include "constants.h" @@ -70,6 +71,11 @@ celix_status_t bundleActivator_start(void * userData, bundle_context_pt context) if (status != CELIX_SUCCESS) { printf("RSA: Creation of WTMTracker failed\n"); } else { + + /* the rsa also has a wiringEndpointListener because it needs to be informed, when a + * wiring endpoint has been sucessfully exported/imported and therefore it + * the according services can be exported/imported. + */ wEndpointListener->handle = (void*) activator->admin; wEndpointListener->wiringEndpointAdded = remoteServiceAdmin_addWiringEndpoint; wEndpointListener->wiringEndpointRemoved = remoteServiceAdmin_removeWiringEndpoint; diff --git a/remote_service_admin_inaetics/private/src/remote_service_admin_impl.c b/remote_service_admin_inaetics/private/src/remote_service_admin_impl.c index 91ceb3a..a6a3399 100644 --- a/remote_service_admin_inaetics/private/src/remote_service_admin_impl.c +++ b/remote_service_admin_inaetics/private/src/remote_service_admin_impl.c @@ -71,7 +71,7 @@ celix_status_t remoteServiceAdmin_create(bundle_context_pt context, remote_servi arrayList_create(&(*admin)->wtmList); (*admin)->exportedServices = hashMap_create(NULL, NULL, NULL, NULL); - (*admin)->importedServices = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL); + (*admin)->importedServices = hashMap_create(NULL, NULL, NULL, NULL); (*admin)->wiringReceiveServices = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL); (*admin)->wiringReceiveServiceRegistrations = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL); (*admin)->sendServicesTracker = NULL; @@ -190,6 +190,7 @@ static celix_status_t remoteServiceAdmin_wireIdEquals(void *a, void *b, bool *eq return CELIX_SUCCESS; } + static celix_status_t remoteServiceAdmin_receive(void* handle, char* data, char**response) { celix_status_t status = CELIX_ILLEGAL_ARGUMENT; remote_service_admin_pt admin = (remote_service_admin_pt) handle; @@ -256,12 +257,12 @@ celix_status_t remoteServiceAdmin_addWiringEndpoint(void *handle, wiring_endpoin if (status == CELIX_SUCCESS) { wireId = properties_get(wEndpoint->properties, (char*) WIRING_ENDPOINT_DESCRIPTION_WIRE_ID_KEY); wireUuid = properties_get(wEndpoint->properties, (char*) OSGI_RSA_ENDPOINT_FRAMEWORK_UUID); - exportServiceId = properties_get(wEndpoint->properties, (char*) "requested.service.id"); - printf("RSA: callback received - wire available for serviceId %s\n", exportServiceId); + /* added wiring enpoint is used for export */ + if (wireUuid != NULL && strcmp(ownUuid, wireUuid) == 0) { + exportServiceId = properties_get(wEndpoint->properties, (char*) "requested.service.id"); + printf("RSA: exported wire available %s for serviceId %s\n", wireUuid, exportServiceId); - //if uuid = own fw uuid -> exported, otherwise imported - if (strcmp(ownUuid, wireUuid) == 0) { if (!arrayList_contains(admin->exportedWires, wireId)) { arrayList_add(admin->exportedWires, wireId); } @@ -307,6 +308,36 @@ celix_status_t remoteServiceAdmin_addWiringEndpoint(void *handle, wiring_endpoin status = celixThreadMutex_unlock(&admin->exportedServicesLock); } + /* added wiring enpoint is used for import */ + else { + char* id = properties_get(wEndpoint->properties, "requested.service"); + printf("RSA: imported wiring endpoint available - wire %s service %s\n", wireUuid, id); + + celixThreadMutex_lock(&admin->importedServicesLock); + + if (hashMap_size(admin->importedServices) == 0) { + printf("RSA: No imported services found!\n"); + } + + hash_map_iterator_pt importedServicesIterator = hashMapIterator_create(admin->importedServices); + while (hashMapIterator_hasNext(importedServicesIterator)) { + hash_map_entry_pt entry = hashMapIterator_nextEntry(importedServicesIterator); + endpoint_description_pt endpointDescription = hashMapEntry_getKey(entry); + + if (strcmp(id, endpointDescription->id) == 0) { + import_registration_factory_pt registration_factory = hashMapEntry_getValue(entry); + + // we create an importRegistration per imported service + registration_factory->trackedFactory->registerProxyService(registration_factory->trackedFactory->factory, endpointDescription, admin, (sendToHandle) &remoteServiceAdmin_send); + + printf("RSA: proxyService registered for %s w/ wireId %s.", endpointDescription->service, wireId); + } + } + + hashMapIterator_destroy(importedServicesIterator); + + celixThreadMutex_unlock(&admin->importedServicesLock); + } } return status; @@ -318,6 +349,9 @@ celix_status_t remoteServiceAdmin_removeWiringEndpoint(void *handle, wiring_endp remote_service_admin_pt admin = (remote_service_admin_pt) handle; char* wireId = properties_get(wEndpoint->properties, WIRING_ENDPOINT_DESCRIPTION_WIRE_ID_KEY); + printf("RSA: Removing wiring endpoint w/ wireId %s\n", wireId); + + /* handle exported Wires */ array_list_pt exportRegistrationList = NULL; arrayList_create(&exportRegistrationList); @@ -363,6 +397,68 @@ celix_status_t remoteServiceAdmin_removeWiringEndpoint(void *handle, wiring_endp status = remoteServiceAdmin_unregisterReceive(admin, wireId); + + /* handle imported Wires */ + celixThreadMutex_lock(&admin->importedServicesLock); + + if (hashMap_size(admin->importedServices) == 0) { + printf("RSA: No imported services found!\n"); + } + + hash_map_iterator_pt importedServicesIterator = hashMapIterator_create(admin->importedServices); + while (hashMapIterator_hasNext(importedServicesIterator)) { + hash_map_entry_pt entry = hashMapIterator_nextEntry(importedServicesIterator); + endpoint_description_pt endpointDescription = hashMapEntry_getKey(entry); + + char* regWireId = properties_get(endpointDescription->properties, WIRING_ENDPOINT_DESCRIPTION_WIRE_ID_KEY); + + if (regWireId == NULL) { + printf("RSA: No wireId found in endpointDescription for %s\n", endpointDescription->service); + } + else if (strcmp(wireId, regWireId) == 0) { + + /* TODO: registration handling missing - combine w/ removeImportedService */ + import_registration_factory_pt registration_factory = hashMapEntry_getValue(entry); + + // unregister proxy service per wireId + registration_factory->trackedFactory->unregisterProxyService(registration_factory->trackedFactory->factory, endpointDescription); + + // search for registrations + import_registration_pt registration = NULL; + unsigned int size = arrayList_size(registration_factory->registrations); + unsigned int i = 0; + + for(; i < size; ++i) { + registration = arrayList_get(registration_factory->registrations, i); + + if (registration->endpointDescription == endpointDescription) { + arrayList_removeElement(registration_factory->registrations, registration); + importRegistration_destroy(registration); + break; + } + } + + + if (arrayList_isEmpty(registration_factory->registrations)) { + logHelper_log(admin->loghelper, OSGI_LOGSERVICE_INFO, "RSA: closing proxy of service %s", endpointDescription->service); + + serviceTracker_close(registration_factory->proxyFactoryTracker); + importRegistrationFactory_close(registration_factory); + + importRegistrationFactory_destroy(®istration_factory); + } + + hashMap_remove(admin->importedServices, endpointDescription); + + printf("RSA: proxyService unregistered for %s w/ wireId %s.", endpointDescription->service, wireId); + } + } + + hashMapIterator_destroy(importedServicesIterator); + + celixThreadMutex_unlock(&admin->importedServicesLock); + + return status; } @@ -404,7 +500,6 @@ static celix_status_t remoteServiceAdmin_registerReceive(remote_service_admin_pt static celix_status_t remoteServiceAdmin_unregisterReceive(remote_service_admin_pt admin, char* wireId) { celix_status_t status = CELIX_SUCCESS; - printf("RSA: unregisterReceive w/ wireId %s\n", wireId); service_registration_pt wiringReceiveServiceRegistration = hashMap_remove(admin->wiringReceiveServiceRegistrations, wireId); wiring_receive_service_pt wiringReceiveService = hashMap_remove(admin->wiringReceiveServices, wireId); @@ -421,6 +516,7 @@ static celix_status_t remoteServiceAdmin_unregisterReceive(remote_service_admin_ serviceReference_getProperty(reference, (char*) INAETICS_WIRING_WIRE_ID, &serviceWireId); if (strcmp(wireId, serviceWireId) == 0) { + printf("RSA: unregisterReceive w/ wireId %s\n", wireId); status = serviceRegistration_unregister(wiringReceiveServiceRegistration); } } @@ -636,8 +732,6 @@ celix_status_t remoteServiceAdmin_installEndpoint(remote_service_admin_pt admin, properties_set(endpointProperties, (char*) OSGI_RSA_SERVICE_IMPORTED, "true"); properties_set(endpointProperties, (char*) OSGI_RSA_SERVICE_IMPORTED_CONFIGS, (char*) CONFIGURATION_TYPE); - printf("RSA: INSTALL ENDPOINT w/ UUID %s\n", endpoint_uuid); - endpoint_description_pt endpointDescription = NULL; remoteServiceAdmin_createEndpointDescription(admin, reference, endpointProperties, interface, &endpointDescription); @@ -703,55 +797,73 @@ celix_status_t remoteServiceAdmin_importService(remote_service_admin_pt admin, e printf("RSA: Missing WireId for service %s\n", endpointDescription->service); status = CELIX_SERVICE_EXCEPTION; } else { + printf("RSA: Import service %s w/ wireId %s\n", endpointDescription->service, wireId); + + import_registration_factory_pt registration_factory = NULL; + + hash_map_iterator_pt iter = hashMapIterator_create(admin->importedServices); + while (hashMapIterator_hasNext(iter) && registration_factory == NULL) { + hash_map_entry_pt entry = hashMapIterator_nextEntry(iter); + endpoint_description_pt importedEndpointDescription = (endpoint_description_pt) hashMapEntry_getKey(entry); + + if (strcmp(importedEndpointDescription->service, endpointDescription->service) == 0) { + registration_factory = (import_registration_factory_pt) hashMapEntry_getValue(entry); + } + } - import_registration_factory_pt registration_factory = (import_registration_factory_pt) hashMap_get(admin->importedServices, endpointDescription->service); + hashMapIterator_destroy(iter); // check whether we already have a registration_factory registered in the hashmap if (registration_factory == NULL) { status = importRegistrationFactory_install(admin->loghelper, endpointDescription->service, admin->context, ®istration_factory); - if (status == CELIX_SUCCESS) { - hashMap_put(admin->importedServices, endpointDescription->service, registration_factory); + if (status != CELIX_SUCCESS) { + printf("RSA: Could not install importRegistrationFactory for %s\n", endpointDescription->service); } } - array_list_pt localWTMs = NULL; - remoteServiceAdmin_getWTMs(admin, &localWTMs); - int size = arrayList_size(localWTMs); - if (size == 0) { - printf("RSA: No WTM available yet.\n"); - } - int iter; - for (iter = 0; iter < size; iter++) { - wiring_topology_manager_service_pt wtmService = arrayList_get(localWTMs, iter); - properties_pt rsaProperties = properties_create(); + if (registration_factory != NULL) { + array_list_pt localWTMs = NULL; + remoteServiceAdmin_getWTMs(admin, &localWTMs); - properties_set(rsaProperties, WIRING_ENDPOINT_DESCRIPTION_WIRE_ID_KEY, wireId); - if (wtmService->importWiringEndpoint(wtmService->manager, rsaProperties) == CELIX_SUCCESS) { + importRegistration_create(endpointDescription, admin, (sendToHandle) &remoteServiceAdmin_send, admin->context, registration); + arrayList_add(registration_factory->registrations, *registration); - // factory available - if (status != CELIX_SUCCESS || (registration_factory->trackedFactory == NULL)) { - logHelper_log(admin->loghelper, OSGI_LOGSERVICE_WARNING, "RSA: no proxyFactory available."); - if (status == CELIX_SUCCESS) { - status = CELIX_SERVICE_EXCEPTION; - } - } else { - // we create an importRegistration per imported service - importRegistration_create(endpointDescription, admin, (sendToHandle) &remoteServiceAdmin_send, admin->context, registration); - registration_factory->trackedFactory->registerProxyService(registration_factory->trackedFactory->factory, endpointDescription, admin, (sendToHandle) &remoteServiceAdmin_send); + hashMap_put(admin->importedServices, endpointDescription, registration_factory); - arrayList_add(registration_factory->registrations, *registration); - } + int size = arrayList_size(localWTMs); + if (size == 0) { + printf("RSA: No WTM available yet.\n"); + } + + + int iter; + for (iter = 0; iter < size; iter++) { + wiring_topology_manager_service_pt wtmService = arrayList_get(localWTMs, iter); + properties_pt rsaProperties = properties_create(); + + properties_set(rsaProperties, WIRING_ENDPOINT_DESCRIPTION_WIRE_ID_KEY, wireId); + properties_set(rsaProperties, "requested.service", endpointDescription->id); + + // TODO: move locking + celixThreadMutex_unlock(&admin->importedServicesLock); + wtmService->importWiringEndpoint(wtmService->manager, rsaProperties) ; + celixThreadMutex_lock(&admin->importedServicesLock); } } + } celixThreadMutex_unlock(&admin->importedServicesLock); + /* we should return SUCCESS here, otherwise the TPM will not add it to + * its list of importedRegistrations and cannot remove it later on + */ return status; } + celix_status_t remoteServiceAdmin_removeImportedService(remote_service_admin_pt admin, import_registration_pt registration) { celix_status_t status = CELIX_SUCCESS; endpoint_description_pt endpointDescription = (endpoint_description_pt) registration->endpointDescription; @@ -759,7 +871,18 @@ celix_status_t remoteServiceAdmin_removeImportedService(remote_service_admin_pt celixThreadMutex_lock(&admin->importedServicesLock); - registration_factory = (import_registration_factory_pt) hashMap_get(admin->importedServices, endpointDescription->service); + /* get according registration factory */ + hash_map_iterator_pt importedServicesIterator = hashMapIterator_create(admin->importedServices); + while (hashMapIterator_hasNext(importedServicesIterator) && registration_factory == NULL) { + hash_map_entry_pt entry = hashMapIterator_nextEntry(importedServicesIterator); + endpoint_description_pt importedEndpointDescription = hashMapEntry_getKey(entry); + + if (strcmp(importedEndpointDescription->id, endpointDescription->id) == 0) { + registration_factory = (import_registration_factory_pt) hashMapEntry_getValue(entry); + } + } + + hashMapIterator_destroy(importedServicesIterator); // factory available if ((registration_factory == NULL) || (registration_factory->trackedFactory == NULL)) { @@ -776,14 +899,17 @@ celix_status_t remoteServiceAdmin_removeImportedService(remote_service_admin_pt serviceTracker_close(registration_factory->proxyFactoryTracker); importRegistrationFactory_close(registration_factory); - hashMap_remove(admin->importedServices, endpointDescription->service); - importRegistrationFactory_destroy(®istration_factory); } + + hashMap_remove(admin->importedServices, endpointDescription); + } celixThreadMutex_unlock(&admin->importedServicesLock); + + return status; } @@ -811,11 +937,17 @@ celix_status_t remoteServiceAdmin_send(remote_service_admin_pt admin, endpoint_d json_error_t jsonError; json_request = json_loads(request, 0, &jsonError); - root = json_pack("{s:i, s:o}", "service.id", endpointDescription->serviceId, "request", json_request); + long serviceId = endpointDescription->serviceId; + root = json_pack("{s:i, s:o}", "service.id", serviceId, "request", json_request); char *json_data = json_dumps(root, 0); status = wiringSendService->send(wiringSendService, json_data, reply, replyStatus); + if (status != CELIX_SUCCESS || *reply == NULL) { + printf("RSA: wireSendService->send of wireId %s return no success\n", wireId); + } + + free(json_data); json_decref(root); } @@ -904,7 +1036,7 @@ static celix_status_t remoteServiceAdmin_sendServiceRemoved(void * handle, servi wiring_send_service_pt wiringSendService = (wiring_send_service_pt) service; char* wireId = properties_get(wiringSendService->wiringEndpointDescription->properties, WIRING_ENDPOINT_DESCRIPTION_WIRE_ID_KEY); - printf("RSA: remove Wiring Endpoint w/ wireId %s\n", wireId); + printf("RSA: Send Service Removed for wireId %s\n", wireId); status = celixThreadMutex_lock(&admin->sendServicesLock); @@ -913,6 +1045,7 @@ static celix_status_t remoteServiceAdmin_sendServiceRemoved(void * handle, servi status = celixThreadMutex_unlock(&admin->sendServicesLock); } + return status; } diff --git a/wiring_admin/private/src/wiring_admin_impl.c b/wiring_admin/private/src/wiring_admin_impl.c index 13fa144..6b22fec 100644 --- a/wiring_admin/private/src/wiring_admin_impl.c +++ b/wiring_admin/private/src/wiring_admin_impl.c @@ -56,6 +56,7 @@ static celix_status_t wiringAdmin_wiringReceiveRemoved(void * handle, service_re static celix_status_t wiringAdmin_send(wiring_send_service_pt sendService, char *request, char **reply, int* replyStatus); + celix_status_t wiringAdmin_create(bundle_context_pt context, wiring_admin_pt *admin) { celix_status_t status = CELIX_SUCCESS; @@ -547,8 +548,8 @@ static celix_status_t wiringAdmin_send(wiring_send_service_pt sendService, char curl_easy_setopt(curl, CURLOPT_READDATA, &post); curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, wiringAdmin_HTTPReqWrite); curl_easy_setopt(curl, CURLOPT_WRITEDATA, (void *)&get); - curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT, 5L); - curl_easy_setopt(curl, CURLOPT_TIMEOUT, 10L); + curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT, 2L); + curl_easy_setopt(curl, CURLOPT_TIMEOUT, 2L); curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, (curl_off_t)post.size); res = curl_easy_perform(curl); @@ -562,7 +563,6 @@ static celix_status_t wiringAdmin_send(wiring_send_service_pt sendService, char } curl_easy_cleanup(curl); - } return status; diff --git a/wiring_topology_manager/private/include/wiring_topology_manager_impl.h b/wiring_topology_manager/private/include/wiring_topology_manager_impl.h index e49875b..a2a3961 100644 --- a/wiring_topology_manager/private/include/wiring_topology_manager_impl.h +++ b/wiring_topology_manager/private/include/wiring_topology_manager_impl.h @@ -26,6 +26,7 @@ struct wiring_topology_manager { hash_map_pt exportedWiringEndpoints; array_list_pt waitingForExport; + array_list_pt waitingForImport; celix_thread_mutex_t importedWiringEndpointsLock; hash_map_pt importedWiringEndpoints; @@ -65,6 +66,8 @@ celix_status_t wiringTopologyManager_importWiringEndpoint(wiring_topology_manage celix_status_t wiringTopologyManager_removeImportedWiringEndpoint(wiring_topology_manager_pt manager, properties_pt properties); celix_status_t wiringTopologyManager_WiringAdminServiceExportWiringEndpoint(wiring_topology_manager_pt manager, wiring_admin_service_pt wiringAdminService, properties_pt srvcProperties, wiring_endpoint_description_pt* wEndpoint); -celix_status_t wiringTopologyManager_WiringAdminServiceImportWiringEndpoint(wiring_topology_manager_pt manager, wiring_admin_service_pt wiringAdminService, wiring_endpoint_description_pt wEndpoint); +celix_status_t wiringTopologyManager_checkWiringAdminForImportWiringEndpoint(wiring_topology_manager_pt manager, wiring_admin_service_pt wiringAdminService, wiring_endpoint_description_pt wEndpoint); +celix_status_t wiringTopologyManager_checkWiringEndpointForImportService(wiring_topology_manager_pt manager, wiring_endpoint_description_pt wiringEndpointDesc, properties_pt requiredProperties); +celix_status_t wiringTopologyManager_checkWaitingForImportServices(wiring_topology_manager_pt manager); #endif /* WIRING_TOPOLOGY_MANAGER_IMPL_H_ */ diff --git a/wiring_topology_manager/private/src/wtm.c b/wiring_topology_manager/private/src/wtm.c index e4aefe2..8e940e2 100644 --- a/wiring_topology_manager/private/src/wtm.c +++ b/wiring_topology_manager/private/src/wtm.c @@ -6,7 +6,6 @@ #include #include -#include "bundle_context.h" #include "constants.h" #include "module.h" #include "bundle.h" @@ -30,7 +29,6 @@ bool properties_match(properties_pt properties, properties_pt reference); celix_status_t wiringTopologyManager_WiringAdminServiceExportWiringEndpoint(wiring_topology_manager_pt manager, wiring_admin_service_pt wiringAdminService, properties_pt srvcProperties, wiring_endpoint_description_pt* wEndpoint); -celix_status_t wiringTopologyManager_WiringAdminServiceImportWiringEndpoint(wiring_topology_manager_pt manager, wiring_admin_service_pt wiringAdminService, wiring_endpoint_description_pt wEndpoint); celix_status_t wiringTopologyManager_create(bundle_context_pt context, wiring_topology_manager_pt *manager) { celix_status_t status = CELIX_SUCCESS; @@ -44,6 +42,7 @@ celix_status_t wiringTopologyManager_create(bundle_context_pt context, wiring_to arrayList_create(&((*manager)->waList)); arrayList_create(&((*manager)->waitingForExport)); + arrayList_create(&((*manager)->waitingForImport)); celixThreadMutex_create(&((*manager)->waListLock), NULL); celixThreadMutex_create(&((*manager)->importedWiringEndpointsLock), NULL); @@ -105,35 +104,42 @@ celix_status_t wiringTopologyManager_destroy(wiring_topology_manager_pt manager) return status; } -celix_status_t wiringTopologyManager_WiringAdminServiceImportWiringEndpoint(wiring_topology_manager_pt manager, wiring_admin_service_pt wiringAdminService, wiring_endpoint_description_pt wEndpoint) { - celix_status_t status = CELIX_BUNDLE_EXCEPTION; - properties_pt adminProperties = NULL; - wiringAdminService->getWiringAdminProperties(wiringAdminService->admin, &adminProperties); +/* check wether waiting service can be exported */ +celix_status_t wiringTopologyManager_checkWaitingForImportServices(wiring_topology_manager_pt manager) { + celix_status_t status = CELIX_SUCCESS; + int size = arrayList_size(manager->waitingForImport); - if (adminProperties != NULL) { + for (--size; size >= 0; --size) { + properties_pt reqProperties = (properties_pt) arrayList_get(manager->waitingForImport, size); - /* only a wiringAdmin which provides the same config can import the endpoint */ - char* wiringConfigEndpoint = properties_get(wEndpoint->properties, WIRING_ADMIN_PROPERTIES_CONFIG_KEY); - char* wiringConfigAdmin = properties_get(adminProperties, WIRING_ADMIN_PROPERTIES_CONFIG_KEY); + hash_map_iterator_pt iter = hashMapIterator_create(manager->importedWiringEndpoints); + while (hashMapIterator_hasNext(iter)) { + hash_map_entry_pt entry = hashMapIterator_nextEntry(iter); + wiring_endpoint_description_pt wEndpoint = hashMapEntry_getKey(entry); - if ((wiringConfigEndpoint != NULL) && (wiringConfigAdmin != NULL) && (strcmp(wiringConfigEndpoint, wiringConfigAdmin) == 0)) { - status = wiringAdminService->importWiringEndpoint(wiringAdminService->admin, wEndpoint); + if (wiringTopologyManager_checkWiringEndpointForImportService(manager, wEndpoint, reqProperties) == CELIX_SUCCESS) { + char* wireId = properties_get(wEndpoint->properties, WIRING_ENDPOINT_DESCRIPTION_WIRE_ID_KEY); - if (status != CELIX_SUCCESS) { - printf("WTM: importWiringEndpoint via %s failed.\n", wiringConfigAdmin); - } - else { - printf("WTM: importWiringEndpoint via %s suceeded.\n", wiringConfigAdmin); + printf("WTM: WAITING service sucessfully imported via wire %s\n", wireId); + + arrayList_remove(manager->waitingForImport, size); + + /* async notifiy of RSA */ + char* requestedService = properties_get(reqProperties, "requested.service"); + properties_set(wEndpoint->properties, "requested.service", requestedService); + wiringTopologyManager_notifyListenersWiringEndpointAdded(manager, wEndpoint); } - } else { - printf("WTM: Wiring Admin does not match requirements (%s=%s)\n", wiringConfigEndpoint, wiringConfigAdmin); } + + hashMapIterator_destroy(iter); } return status; } + + /* Functions for wiring endpoint listener */ celix_status_t wiringTopologyManager_WiringEndpointAdded(void *handle, wiring_endpoint_description_pt wEndpoint, char *matchedFilter) { celix_status_t status = CELIX_SUCCESS; @@ -146,8 +152,14 @@ celix_status_t wiringTopologyManager_WiringEndpointAdded(void *handle, wiring_en if (wiringAdminList == NULL) { arrayList_create(&wiringAdminList); hashMap_put(manager->importedWiringEndpoints, wEndpoint, wiringAdminList); + + char* wireId = properties_get(wEndpoint->properties, WIRING_ENDPOINT_DESCRIPTION_WIRE_ID_KEY); + + printf("WTM: WTM gots informed about wire %s\n", wireId); } + wiringTopologyManager_checkWaitingForImportServices(manager); + celixThreadMutex_unlock(&manager->importedWiringEndpointsLock); return status; @@ -180,6 +192,8 @@ celix_status_t wiringTopologyManager_WiringEndpointRemoved(void *handle, wiring_ printf("WTM: Removing of imported wiring endpoint (%s) failed.\n", wireId); } + wiringTopologyManager_notifyListenersWiringEndpointRemoved(manager, wEndpoint); + celixThreadMutex_unlock(&manager->importedWiringEndpointsLock); return status; @@ -247,6 +261,7 @@ bool properties_match(properties_pt properties, properties_pt reference) { strcmp(prop_key, OSGI_FRAMEWORK_SERVICE_ID) != 0 && strcmp(prop_key, OSGI_FRAMEWORK_OBJECTCLASS) != 0 && strcmp(prop_key, "service.exported.interfaces") != 0 && + strcmp(prop_key, "requested.service") != 0 && strcmp(prop_key, "type") != 0) { char* ref_value = (char*) hashMap_get(reference, prop_key); if (ref_value == NULL || (strcmp(ref_value, prop_value) != 0)) { @@ -416,53 +431,141 @@ celix_status_t wiringTopologyManager_removeExportedWiringEndpoint(wiring_topolog return status; } +/* check whether wiring endpoint can be imported by available wiring admins */ +celix_status_t wiringTopologyManager_checkWiringAdminForImportWiringEndpoint(wiring_topology_manager_pt manager, wiring_admin_service_pt wiringAdminService, wiring_endpoint_description_pt wEndpoint) { + celix_status_t status = CELIX_BUNDLE_EXCEPTION; + properties_pt adminProperties = NULL; + + wiringAdminService->getWiringAdminProperties(wiringAdminService->admin, &adminProperties); + + if (adminProperties != NULL) { + + /* only a wiringAdmin which provides the same config can import the endpoint */ + char* wiringConfigEndpoint = properties_get(wEndpoint->properties, WIRING_ADMIN_PROPERTIES_CONFIG_KEY); + char* wiringConfigAdmin = properties_get(adminProperties, WIRING_ADMIN_PROPERTIES_CONFIG_KEY); + + if ((wiringConfigEndpoint != NULL) && (wiringConfigAdmin != NULL) && (strcmp(wiringConfigEndpoint, wiringConfigAdmin) == 0)) { + status = wiringAdminService->importWiringEndpoint(wiringAdminService->admin, wEndpoint); + + if (status != CELIX_SUCCESS) { + printf("WTM: importWiringEndpoint via %s failed.\n", wiringConfigAdmin); + } + else { + printf("WTM: importWiringEndpoint via %s suceeded.\n", wiringConfigAdmin); + } + } else { + printf("WTM: Wiring Admin does not match requirements (%s=%s)\n", wiringConfigEndpoint, wiringConfigAdmin); + } + } + + return status; +} + + +/* check whether wiring ednpoints can be used to import service */ +celix_status_t wiringTopologyManager_checkWiringEndpointForImportService(wiring_topology_manager_pt manager, wiring_endpoint_description_pt wiringEndpointDesc, properties_pt requiredProperties) { + + celix_status_t status = CELIX_BUNDLE_EXCEPTION; + + /* check whether the given wiring endpoint matches the required properties */ + if (properties_match(requiredProperties, wiringEndpointDesc->properties)) { + array_list_pt localWAs = NULL; + wiringTopologyManager_getWAs(manager, &localWAs); + + int listCnt = 0; + int listSize = arrayList_size(localWAs); + + array_list_pt wiringAdminList = (array_list_pt) hashMap_get(manager->importedWiringEndpoints, wiringEndpointDesc); + char* wireId = properties_get(wiringEndpointDesc->properties, WIRING_ENDPOINT_DESCRIPTION_WIRE_ID_KEY); + + if (listSize == 0) { + printf("WTM: There are no WiringAdmins available for wireId %s\n", wireId); + } + + for (; listCnt < listSize; ++listCnt) { + wiring_admin_service_pt wiringAdminService = (wiring_admin_service_pt) arrayList_get(localWAs, listCnt); + + if (arrayList_contains(wiringAdminList, wiringAdminService)) { + printf("WTM: WiringEndpoint %s is already imported by WiringAdminService %p\n", wireId, wiringAdminService); + status = CELIX_SUCCESS; + + } else { + status = wiringTopologyManager_checkWiringAdminForImportWiringEndpoint(manager, wiringAdminService, wiringEndpointDesc); + + if (status == CELIX_SUCCESS) { + printf("WTM: WiringEndpoint %s sucessfully imported by WiringAdminService %p\n", wireId, wiringAdminService); + arrayList_add(wiringAdminList, wiringAdminService); + + + status = CELIX_SUCCESS; + } + else { + printf("WTM: WiringEndpoint %s imported by WiringAdminService %p FAILED\n", wireId, wiringAdminService); + } + } + } + + arrayList_destroy(localWAs); + + } else { + printf("WTM: rsaProperties do not match imported Endpoint\n"); + } + + return status; + +} + + + celix_status_t wiringTopologyManager_importWiringEndpoint(wiring_topology_manager_pt manager, properties_pt rsaProperties) { celix_status_t status = CELIX_SUCCESS; - celix_status_t status2 = CELIX_BUNDLE_EXCEPTION; hash_map_iterator_pt iter = NULL; + bool endpointAvailable = false; + char* requestedService = properties_get(rsaProperties, "requested.service"); + celixThreadMutex_lock(&manager->importedWiringEndpointsLock); iter = hashMapIterator_create(manager->importedWiringEndpoints); - while (hashMapIterator_hasNext(iter)) { - hash_map_entry_pt importedWiringEndpointEntry = (hash_map_entry_pt) hashMapIterator_nextEntry(iter); - wiring_endpoint_description_pt wiringEndpointDesc = (wiring_endpoint_description_pt) hashMapEntry_getKey(importedWiringEndpointEntry); - array_list_pt wiringAdminList = (array_list_pt) hashMapEntry_getValue(importedWiringEndpointEntry); + if (hashMap_size(manager->importedWiringEndpoints) == 0) { + printf("WTM: No imported WiringEndpoints available yet .\n"); + } - // do we have a matching wiring endpoint - if (properties_match(rsaProperties, wiringEndpointDesc->properties)) { - array_list_pt localWAs = NULL; - wiringTopologyManager_getWAs(manager, &localWAs); + while (hashMapIterator_hasNext(iter)) { + wiring_endpoint_description_pt wiringEndpointDesc = (wiring_endpoint_description_pt) hashMapIterator_nextKey(iter); - int listCnt = 0; - int listSize = arrayList_size(localWAs); + if (wiringTopologyManager_checkWiringEndpointForImportService(manager, wiringEndpointDesc, rsaProperties) == CELIX_SUCCESS) { + endpointAvailable = true; - for (; listCnt < listSize; ++listCnt) { - wiring_admin_service_pt wiringAdminService = (wiring_admin_service_pt) arrayList_get(localWAs, listCnt); + if (requestedService == NULL ) { + printf("WTM: no requestedService property found\n"); + } + else { + printf("WTM: perform async notify about sucessfully informed WiringEndpoint\n"); - if (arrayList_contains(wiringAdminList, wiringAdminService)) { - char* wireId = properties_get(wiringEndpointDesc->properties, WIRING_ENDPOINT_DESCRIPTION_WIRE_ID_KEY); - printf("WTM: WiringEndpoint %s is already imported by WiringAdminService %p\n", wireId, wiringAdminService); - } else { - status = wiringTopologyManager_WiringAdminServiceImportWiringEndpoint(manager, wiringAdminService, wiringEndpointDesc); + /* async notifiy of RSA */ + properties_set(wiringEndpointDesc->properties, "requested.service", requestedService); - if (status == CELIX_SUCCESS) { - arrayList_add(wiringAdminList, wiringAdminService); - status2 = status; - } - } + status = wiringTopologyManager_notifyListenersWiringEndpointAdded(manager, wiringEndpointDesc); } - - arrayList_destroy(localWAs); - } else { - printf("WTM: rsaProperties do not match imported Endpoint\n"); } } hashMapIterator_destroy(iter); + + // according endpoint not found + if (endpointAvailable == false) { + printf("WTM: according endpoint not found for service %s. Putting on the wait list.. \n", requestedService); + + arrayList_add(manager->waitingForImport, rsaProperties); + } + celixThreadMutex_unlock(&manager->importedWiringEndpointsLock); - return status2; + // should be SUCESS, so the RSA can return SUCCESS to the TPM, so + // it can be removed later (even if it has never been imported) + + return status; } celix_status_t wiringTopologyManager_removeImportedWiringEndpoint(wiring_topology_manager_pt manager, properties_pt properties) { @@ -504,6 +607,7 @@ celix_status_t wiringTopologyManager_removeImportedWiringEndpoint(wiring_topolog return status; } +/* informs about a sucessful exported wire */ celix_status_t wiringTopologyManager_notifyListenersWiringEndpointAdded(wiring_topology_manager_pt manager, wiring_endpoint_description_pt wEndpoint) { celix_status_t status = CELIX_SUCCESS; @@ -512,11 +616,13 @@ celix_status_t wiringTopologyManager_notifyListenersWiringEndpointAdded(wiring_t if (status == CELIX_SUCCESS) { hash_map_iterator_pt iter = hashMapIterator_create(manager->listenerList); while (hashMapIterator_hasNext(iter)) { - char *scope = NULL; + char* rsa = NULL; + char* scope = NULL; wiring_endpoint_listener_pt listener = NULL; service_reference_pt reference = hashMapIterator_nextKey(iter); serviceReference_getProperty(reference, (char *) INAETICS_WIRING_ENDPOINT_LISTENER_SCOPE, &scope); + serviceReference_getProperty(reference, "RSA", &rsa); status = bundleContext_getService(manager->context, reference, (void **) &listener); if (status == CELIX_SUCCESS) { @@ -525,7 +631,7 @@ celix_status_t wiringTopologyManager_notifyListenersWiringEndpointAdded(wiring_t bool matchResult = false; filter_match(filter, wEndpoint->properties, &matchResult); - if (matchResult) { + if (matchResult || (rsa != NULL)) { status = listener->wiringEndpointAdded(listener->handle, wEndpoint, scope); } @@ -548,11 +654,13 @@ celix_status_t wiringTopologyManager_notifyListenersWiringEndpointRemoved(wiring if (status == CELIX_SUCCESS) { hash_map_iterator_pt iter = hashMapIterator_create(manager->listenerList); while (hashMapIterator_hasNext(iter)) { + char* rsa = NULL; char *scope = NULL; wiring_endpoint_listener_pt listener = NULL; service_reference_pt reference = hashMapIterator_nextKey(iter); serviceReference_getProperty(reference, (char *) INAETICS_WIRING_ENDPOINT_LISTENER_SCOPE, &scope); + serviceReference_getProperty(reference, "RSA", &rsa); status = bundleContext_getService(manager->context, reference, (void **) &listener); if (status == CELIX_SUCCESS) { @@ -562,7 +670,7 @@ celix_status_t wiringTopologyManager_notifyListenersWiringEndpointRemoved(wiring bool matchResult = false; filter_match(filter, wEndpoint->properties, &matchResult); - if (matchResult) { + if (matchResult || (rsa != NULL)) { status = listener->wiringEndpointRemoved(listener->handle, wEndpoint, scope); } diff --git a/wiring_topology_manager/private/src/wtm_activator.c b/wiring_topology_manager/private/src/wtm_activator.c index 4659adc..8e5d758 100644 --- a/wiring_topology_manager/private/src/wtm_activator.c +++ b/wiring_topology_manager/private/src/wtm_activator.c @@ -4,6 +4,7 @@ #include #include +#include #include "constants.h" #include "bundle_activator.h" @@ -95,6 +96,7 @@ celix_status_t bundleActivator_start(void * userData, bundle_context_pt context) properties_pt props = properties_create(); properties_set(props, (char *) INAETICS_WIRING_ENDPOINT_LISTENER_SCOPE, scope); + properties_set(props, (char *) "WTM", "true"); bundleContext_registerService(context, (char *) INAETICS_WIRING_ENDPOINT_LISTENER_SERVICE, wEndpointListener, props, &activator->wiringEndpointListenerService); diff --git a/wiring_topology_manager/private/src/wtm_wadmin_tracker.c b/wiring_topology_manager/private/src/wtm_wadmin_tracker.c index f62ea1e..f1733ba 100644 --- a/wiring_topology_manager/private/src/wtm_wadmin_tracker.c +++ b/wiring_topology_manager/private/src/wtm_wadmin_tracker.c @@ -102,7 +102,7 @@ celix_status_t wiringTopologyManager_waAdded(void * handle, service_reference_pt wiring_endpoint_description_pt wEndpoint = hashMapEntry_getKey(entry); array_list_pt wiringAdminList = hashMapEntry_getValue(entry); - status = wiringTopologyManager_WiringAdminServiceImportWiringEndpoint(manager, wiringAdminService, wEndpoint); + status = wiringTopologyManager_checkWiringAdminForImportWiringEndpoint(manager, wiringAdminService, wEndpoint); if (status == CELIX_SUCCESS) { arrayList_add(wiringAdminList, wiringAdminService); @@ -111,6 +111,10 @@ celix_status_t wiringTopologyManager_waAdded(void * handle, service_reference_pt } hashMapIterator_destroy(iter); + + /* check wether waiting service can be exported */ + wiringTopologyManager_checkWaitingForImportServices(manager); + celixThreadMutex_unlock(&manager->importedWiringEndpointsLock); printf("WTM: Added WA\n"); diff --git a/wiring_topology_manager/private/src/wtm_wendpointlistener_tracker.c b/wiring_topology_manager/private/src/wtm_wendpointlistener_tracker.c index be7c16b..7e5c056 100644 --- a/wiring_topology_manager/private/src/wtm_wendpointlistener_tracker.c +++ b/wiring_topology_manager/private/src/wtm_wendpointlistener_tracker.c @@ -2,6 +2,8 @@ * Licensed under Apache License v2. See LICENSE for more information. */ +#include + #include "service_tracker.h" #include "wiring_topology_manager_impl.h" @@ -33,19 +35,22 @@ celix_status_t wiringTopologyManager_wiringEndpointListenerAdded(void* handle, s celix_status_t status = CELIX_SUCCESS; wiring_topology_manager_pt manager = handle; char *scope = NULL; + char* wtm = NULL; - status = celixThreadMutex_lock(&manager->listenerListLock); + serviceReference_getProperty(reference, (char *) INAETICS_WIRING_ENDPOINT_LISTENER_SCOPE, &scope); + serviceReference_getProperty(reference, "WTM", &wtm); - if (status == CELIX_SUCCESS) { - hashMap_put(manager->listenerList, reference, NULL); - celixThreadMutex_unlock(&manager->listenerListLock); + if (wtm != NULL && strcmp(wtm, "true") == 0) { + printf("WTM: Ignoring own ENDPOINT_LISTENER\n"); } + else { + status = celixThreadMutex_lock(&manager->listenerListLock); - serviceReference_getProperty(reference, (char *) INAETICS_WIRING_ENDPOINT_LISTENER_SCOPE, &scope); + if (status == CELIX_SUCCESS) { + hashMap_put(manager->listenerList, reference, NULL); + celixThreadMutex_unlock(&manager->listenerListLock); + } - char *nodeDiscoveryListener = NULL; - serviceReference_getProperty(reference, "NODE_DISCOVERY", &nodeDiscoveryListener); - if (nodeDiscoveryListener != NULL && strcmp(nodeDiscoveryListener, "true") == 0) { filter_pt filter = filter_create(scope); status = celixThreadMutex_lock(&manager->exportedWiringEndpointsLock); @@ -74,10 +79,10 @@ celix_status_t wiringTopologyManager_wiringEndpointListenerAdded(void* handle, s celixThreadMutex_unlock(&manager->exportedWiringEndpointsLock); } filter_destroy(filter); - } else { - printf("WTM: Ignoring Non-Discovery ENDPOINT_LISTENER\n"); + } + return status; }