Skip to content

Commit

Permalink
Fixed race condition in RSA/WA
Browse files Browse the repository at this point in the history
  • Loading branch information
bpetri committed Sep 14, 2015
1 parent eedbf3b commit 2f63862
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ struct remote_service_admin {
array_list_pt exportedWires;

service_tracker_pt sendServicesTracker;
celix_thread_mutex_t sendServicesLock;
hash_map_pt sendServices;

celix_thread_mutex_t listenerListLock;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ celix_status_t remoteServiceAdmin_create(bundle_context_pt context, remote_servi

status = celixThreadMutex_create(&(*admin)->listenerListLock, NULL);
celixThreadMutex_create(&(*admin)->wtmListLock, NULL);
celixThreadMutex_create(&(*admin)->sendServicesLock, NULL);
celixThreadMutex_create(&(*admin)->exportedServicesLock, NULL);
celixThreadMutex_create(&(*admin)->importedServicesLock, NULL);

Expand All @@ -104,6 +105,7 @@ celix_status_t remoteServiceAdmin_destroy(remote_service_admin_pt *admin) {

arrayList_destroy((*admin)->exportedWires);

celixThreadMutex_destroy(&(*admin)->sendServicesLock);
celixThreadMutex_destroy(&(*admin)->exportedServicesLock);
celixThreadMutex_destroy(&(*admin)->importedServicesLock);

Expand Down Expand Up @@ -769,7 +771,7 @@ celix_status_t remoteServiceAdmin_removeImportedService(remote_service_admin_pt
importRegistration_destroy(registration);

if (arrayList_isEmpty(registration_factory->registrations)) {
logHelper_log(admin->loghelper, OSGI_LOGSERVICE_INFO, "RSA: closing proxy of service %s.", endpointDescription->service);
logHelper_log(admin->loghelper, OSGI_LOGSERVICE_INFO, "RSA: closing proxy of service %s", endpointDescription->service);

serviceTracker_close(registration_factory->proxyFactoryTracker);
importRegistrationFactory_close(registration_factory);
Expand All @@ -795,24 +797,29 @@ celix_status_t remoteServiceAdmin_send(remote_service_admin_pt admin, endpoint_d
} else {
wiring_send_service_pt wiringSendService = NULL;

wiringSendService = hashMap_get(admin->sendServices, wireId);
status = celixThreadMutex_lock(&admin->sendServicesLock);

if (wiringSendService == NULL) {
printf("RSA: No SendService w/ wireId %s found.\n", wireId);
status = CELIX_ILLEGAL_ARGUMENT;
} else {
json_t *root;
json_t *json_request;
json_error_t jsonError;
if (status == CELIX_SUCCESS) {
wiringSendService = hashMap_get(admin->sendServices, wireId);

if (wiringSendService == NULL) {
printf("RSA: No SendService w/ wireId %s found.\n", wireId);
status = CELIX_ILLEGAL_ARGUMENT;
} else {
json_t *root;
json_t *json_request;
json_error_t jsonError;

json_request = json_loads(request, 0, &jsonError);
root = json_pack("{s:i, s:o}", "service.id", endpointDescription->serviceId, "request", json_request);
char *json_data = json_dumps(root, 0);
json_request = json_loads(request, 0, &jsonError);
root = json_pack("{s:i, s:o}", "service.id", endpointDescription->serviceId, "request", json_request);
char *json_data = json_dumps(root, 0);

status = wiringSendService->send(wiringSendService, json_data, reply, replyStatus);
status = wiringSendService->send(wiringSendService, json_data, reply, replyStatus);

free(json_data);
json_decref(root);
free(json_data);
json_decref(root);
}
celixThreadMutex_unlock(&admin->sendServicesLock);
}
}

Expand Down Expand Up @@ -875,7 +882,12 @@ static celix_status_t remoteServiceAdmin_sendServiceAdded(void * handle, service
wiring_send_service_pt wiringSendService = (wiring_send_service_pt) service;
char* wireId = properties_get(wiringSendService->wiringEndpointDescription->properties, WIRING_ENDPOINT_DESCRIPTION_WIRE_ID_KEY);

hashMap_put(admin->sendServices, wireId, wiringSendService);
status = celixThreadMutex_lock(&admin->sendServicesLock);

if (status == CELIX_SUCCESS) {
hashMap_put(admin->sendServices, wireId, wiringSendService);
status = celixThreadMutex_unlock(&admin->sendServicesLock);
}

return status;
}
Expand All @@ -894,7 +906,12 @@ static celix_status_t remoteServiceAdmin_sendServiceRemoved(void * handle, servi
char* wireId = properties_get(wiringSendService->wiringEndpointDescription->properties, WIRING_ENDPOINT_DESCRIPTION_WIRE_ID_KEY);
printf("RSA: remove Wiring Endpoint w/ wireId %s\n", wireId);

hashMap_remove(admin->sendServices, wireId);
status = celixThreadMutex_lock(&admin->sendServicesLock);

if (status == CELIX_SUCCESS) {
hashMap_remove(admin->sendServices, wireId);
status = celixThreadMutex_unlock(&admin->sendServicesLock);
}

return status;
}
Expand Down
13 changes: 6 additions & 7 deletions wiring_admin/private/src/wiring_admin_impl.c
Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,7 @@ celix_status_t wiringAdmin_removeExportedWiringEndpoint(wiring_admin_pt admin, w
wiringAdmin_stopWebserver(admin);
}
}

wiringEndpointDescription_destroy(&wEndpointDescription);

celixThreadMutex_unlock(&admin->exportedWiringEndpointLock);
Expand Down Expand Up @@ -506,8 +507,11 @@ celix_status_t wiringAdmin_removeImportedWiringEndpoint(wiring_admin_pt admin, w
wiring_send_service_pt wiringSendService = hashMap_remove(admin->wiringSendServices, wEndpointDescription);
service_registration_pt wiringSendRegistration = hashMap_remove(admin->wiringSendRegistrations, wEndpointDescription);

serviceRegistration_unregister(wiringSendRegistration);
free(wiringSendService);
status = serviceRegistration_unregister(wiringSendRegistration);

if (status == CELIX_SUCCESS) {
free(wiringSendService);
}

celixThreadMutex_unlock(&admin->importedWiringEndpointLock);

Expand All @@ -517,9 +521,6 @@ celix_status_t wiringAdmin_removeImportedWiringEndpoint(wiring_admin_pt admin, w
static celix_status_t wiringAdmin_send(wiring_send_service_pt sendService, char *request, char **reply, int* replyStatus) {

celix_status_t status = CELIX_SUCCESS;
wiring_admin_pt admin = sendService->admin;

celixThreadMutex_lock(&admin->importedWiringEndpointLock);

struct post post;
post.readptr = request;
Expand Down Expand Up @@ -564,8 +565,6 @@ static celix_status_t wiringAdmin_send(wiring_send_service_pt sendService, char

}

celixThreadMutex_unlock(&admin->importedWiringEndpointLock);

return status;
}

Expand Down

0 comments on commit 2f63862

Please sign in to comment.