diff --git a/CMakeLists.txt b/CMakeLists.txt index 9922e9c24..6d3a4c26b 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -77,10 +77,7 @@ endif() lcm_option( LCM_ENABLE_GO "Build Go utilities, bindings is source distributed" - - # Disable until #294 is resolved - FALSE Go) - # GO_FOUND Go) + GO_FOUND Go) option(LCM_ENABLE_TESTS "Build unit tests" ON) if(LCM_ENABLE_TESTS) diff --git a/lcm-go/lcm/lcm.go b/lcm-go/lcm/lcm.go index f1ad180a3..82c560d0c 100644 --- a/lcm-go/lcm/lcm.go +++ b/lcm-go/lcm/lcm.go @@ -4,14 +4,15 @@ package lcm // // #include // #include +// #include // #include // // extern void goLCMCallbackHandler(void *, int, char *); // // static void lcm_msg_handler(const lcm_recv_buf_t *buffer, // const char *channel, void *userdata) { -// (void)userdata; -// goLCMCallbackHandler((void *)buffer->data, (int)buffer->data_size, +// (void)userdata; +// goLCMCallbackHandler((void *)buffer->data, (int)buffer->data_size, // (char *)channel); // } // @@ -19,6 +20,27 @@ package lcm // const char *channel) { // return lcm_subscribe(lcm, channel, &lcm_msg_handler, NULL); // } +// +// // Wrap lcm_handle in a select() as this is called in a separate go-routine +// // which waits indefinitely in lcm_handle on messages and is not notified of +// // calls to Destroy(). +// // This has the unfortunate effect of increasing the latency a bit. +// // +// // return 0 normally, or -1 when an error has occurred. +// static int lcm_go_handle(lcm_t *lcm) { +// int lcm_fd = lcm_get_fileno(lcm); +// fd_set fds; +// FD_ZERO(&fds); +// FD_SET(lcm_fd, &fds); +// +// int status = select(lcm_fd + 1, &fds, 0, 0, 0); +// +// if (FD_ISSET(lcm_fd, &fds)) { +// // LCM has events ready to be processed. +// return lcm_handle(lcm); +// } +// return status; +// } import "C" import ( @@ -99,6 +121,11 @@ func (lcm LCM) Subscribe(channel string, size int) (Subscription, error) { channel) } + // Set lcm backend queue size to "unlimited", as we are handling buffering + if C.lcm_subscription_set_queue_capacity(cPtr, 0) != 0 { + return Subscription{}, errors.New("could not set queue capacity") + } + subscription := Subscription{ ReceiveChan: make(chan []byte, size), channel: channel, @@ -157,6 +184,7 @@ func (lcm LCM) Publisher(channel string) (chan<- []byte, <-chan error) { buffer := C.malloc(dataSize) if buffer == nil { errs <- errors.New("could not malloc memory for lcm message") + break } defer C.free(buffer) C.memcpy(buffer, unsafe.Pointer(&data[0]), dataSize) @@ -198,7 +226,7 @@ func (lcm LCM) handle() { defer close(lcm.Errors) for !lcm.closed { - if status := C.lcm_handle(lcm.cPtr); status != 0 { + if status := C.lcm_go_handle(lcm.cPtr); status != 0 { lcm.Errors <- errors.New("could not call lcm_handle") } } diff --git a/lcm-go/lcm/lcm_test.go b/lcm-go/lcm/lcm_test.go index 5178cc89d..bf295caa9 100644 --- a/lcm-go/lcm/lcm_test.go +++ b/lcm-go/lcm/lcm_test.go @@ -1,6 +1,8 @@ package lcm import ( + "strconv" + "strings" "testing" "time" ) @@ -9,17 +11,20 @@ const ( // The amount of messages we want to exchange here. messageGoal = 10000 // Max time we want to wait for executing to be done. - timeout = 1 * time.Second + timeout = 4 * time.Second ) var ( - // Some sample data. + // Some sample data, prefixed with ':' when sent. someBytes = []byte("abcdefghijklmnopqrstuvwxyz") + someBytesLen = len(someBytes) ) func runTest(t *testing.T, provider string) { // The amount of messages that we received. - messages := 1 + messages := 0 + // The amount of dropped messages. + drops := 0 var lcm LCM var err error @@ -55,35 +60,68 @@ func runTest(t *testing.T, provider string) { // Send messageGoal messages go func() { t.Log("sending", "amount", messageGoal) - publishChan, _ := lcm.Publisher("TEST") - for i := 1; i < messageGoal; i++ { - publishChan <- someBytes + publishChan, errChan := lcm.Publisher("TEST") + for i := 1; i <= messageGoal; i++ { + // Prefix with message # + data := make([]byte, 0, 8+someBytesLen) + data = append(data, strconv.Itoa(i)...) + data = append(data, ": "...) + data = append(data, someBytes...) + + publishChan <- data + + // Give the backend some slack so we don't have drops + if i%99 == 0 { + time.Sleep(1 * time.Millisecond) + } } close(publishChan) + + // Check for and fail if errors + select { + case err := <-errChan: + t.Error(err) + default: + t.Log("sending successful") + } }() + timer := time.After(timeout) FOR_SELECT: for { select { - case <-time.After(timeout): + case <-timer: t.Log("timed out after", timeout) break FOR_SELECT - case _, ok := <-subscription.ReceiveChan: + case data, ok := <-subscription.ReceiveChan: if !ok { break FOR_SELECT } messages++ + + // Verify message sequence order + id, err := strconv.Atoi(strings.Split(string(data), ":")[0]) + if err != nil { + t.Fatal(err) + } + expected := messages+drops + if expected != id { + t.Errorf("%d messages dropped, expected %d, got %d", + id-expected, expected, id) + drops += id-expected + } } // Stop, if we are already there... - if messages == messageGoal { + if messages+drops == messageGoal { break } } // Did we receive messageGoal messages? if messages != messageGoal { - t.Fatalf("Expected %d but received %d messages.", messageGoal, messages) + t.Fatalf("Expected %d but received %d messages, %d drops, go-backend drops %d.", + messageGoal, messages, drops, subscription.Drops) } t.Log("received", "amount", messages) diff --git a/test/go/CMakeLists.txt b/test/go/CMakeLists.txt index 3422ed72c..c2b289532 100644 --- a/test/go/CMakeLists.txt +++ b/test/go/CMakeLists.txt @@ -3,7 +3,9 @@ find_program(GO_EXECUTABLE go) if(PYTHON_EXECUTABLE AND GO_EXECUTABLE) add_test(NAME Go::client_server COMMAND ${CMAKE_COMMAND} -E env - "GOPATH=${lcm_BINARY_DIR}/test/types/go:${GOPATH}" + "GOPATH=${lcm_BINARY_DIR}/test/types/go" + "CGO_CFLAGS=-I${CMAKE_SOURCE_DIR} -I${CMAKE_BINARY_DIR}/lcm" + "CGO_LDFLAGS=-L${CMAKE_BINARY_DIR}/lcm -Wl,-rpath,${CMAKE_BINARY_DIR}/lcm" ${PYTHON_EXECUTABLE} ${CMAKE_CURRENT_SOURCE_DIR}/../run_client_server_test.py $ @@ -11,5 +13,17 @@ if(PYTHON_EXECUTABLE AND GO_EXECUTABLE) endif() add_test(NAME Go::unit_test COMMAND - ${GO_EXECUTABLE} test -v ./... - WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/../../lcm-go/) + ${CMAKE_COMMAND} -E env + "CGO_CFLAGS=-I${CMAKE_SOURCE_DIR} -I${CMAKE_BINARY_DIR}/lcm" + "CGO_LDFLAGS=-L${CMAKE_BINARY_DIR}/lcm -Wl,-rpath,${CMAKE_BINARY_DIR}/lcm" + ${GO_EXECUTABLE} test -v ./... + WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/../../lcm-go/) + +if(GO_ARCH MATCHES "^amd64$" AND GO_PLATFORM MATCHES "^(linux|freebsd|darwin|windows)$") + add_test(NAME Go::unit_test::race_enabled COMMAND + ${CMAKE_COMMAND} -E env + "CGO_CFLAGS=-I${CMAKE_SOURCE_DIR} -I${CMAKE_BINARY_DIR}/lcm" + "CGO_LDFLAGS=-L${CMAKE_BINARY_DIR}/lcm -Wl,-rpath,${CMAKE_BINARY_DIR}/lcm" + ${GO_EXECUTABLE} test -v -race ./... + WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/../../lcm-go/) +endif()