Skip to content

Commit

Permalink
Fix #294 & go travis tests
Browse files Browse the repository at this point in the history
Fixed #294 by the prior revert of 6e5da12. Enable build of golang
backend and fixed the broken golang tests by: added explicit path to
header files and lib, set the LCM backend queue size to prevent message
drops (as we are handling buffering, potential drops & drop count),
enabled verification of ordering of messages in the golang unit tests,
wrapped lcm_handle() with a select as it was reading on invalid handle
after destroy() is called.
  • Loading branch information
gustafj committed Feb 11, 2020
1 parent 657d3fc commit fd6277e
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 20 deletions.
5 changes: 1 addition & 4 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,7 @@ endif()
lcm_option(
LCM_ENABLE_GO
"Build Go utilities, bindings is source distributed"

# Disable until #294 is resolved
FALSE Go)
# GO_FOUND Go)
GO_FOUND Go)

option(LCM_ENABLE_TESTS "Build unit tests" ON)
if(LCM_ENABLE_TESTS)
Expand Down
34 changes: 31 additions & 3 deletions lcm-go/lcm/lcm.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,43 @@ package lcm
//
// #include <stdlib.h>
// #include <string.h>
// #include <sys/select.h>
// #include <lcm/lcm.h>
//
// extern void goLCMCallbackHandler(void *, int, char *);
//
// static void lcm_msg_handler(const lcm_recv_buf_t *buffer,
// const char *channel, void *userdata) {
// (void)userdata;
// goLCMCallbackHandler((void *)buffer->data, (int)buffer->data_size,
// (void)userdata;
// goLCMCallbackHandler((void *)buffer->data, (int)buffer->data_size,
// (char *)channel);
// }
//
// static lcm_subscription_t * lcm_go_subscribe(lcm_t *lcm,
// const char *channel) {
// return lcm_subscribe(lcm, channel, &lcm_msg_handler, NULL);
// }
//
// // Wrap lcm_handle in a select() as this is called in a separate go-routine
// // which waits indefinitely in lcm_handle on messages and is not notified of
// // calls to Destroy().
// // This has the unfortunate effect of increasing the latency a bit.
// //
// // return 0 normally, or -1 when an error has occurred.
// static int lcm_go_handle(lcm_t *lcm) {
// int lcm_fd = lcm_get_fileno(lcm);
// fd_set fds;
// FD_ZERO(&fds);
// FD_SET(lcm_fd, &fds);
//
// int status = select(lcm_fd + 1, &fds, 0, 0, 0);
//
// if (FD_ISSET(lcm_fd, &fds)) {
// // LCM has events ready to be processed.
// return lcm_handle(lcm);
// }
// return status;
// }
import "C"

import (
Expand Down Expand Up @@ -99,6 +121,11 @@ func (lcm LCM) Subscribe(channel string, size int) (Subscription, error) {
channel)
}

// Set lcm backend queue size to "unlimited", as we are handling buffering
if C.lcm_subscription_set_queue_capacity(cPtr, 0) != 0 {
return Subscription{}, errors.New("could not set queue capacity")
}

subscription := Subscription{
ReceiveChan: make(chan []byte, size),
channel: channel,
Expand Down Expand Up @@ -157,6 +184,7 @@ func (lcm LCM) Publisher(channel string) (chan<- []byte, <-chan error) {
buffer := C.malloc(dataSize)
if buffer == nil {
errs <- errors.New("could not malloc memory for lcm message")
break
}
defer C.free(buffer)
C.memcpy(buffer, unsafe.Pointer(&data[0]), dataSize)
Expand Down Expand Up @@ -198,7 +226,7 @@ func (lcm LCM) handle() {
defer close(lcm.Errors)

for !lcm.closed {
if status := C.lcm_handle(lcm.cPtr); status != 0 {
if status := C.lcm_go_handle(lcm.cPtr); status != 0 {
lcm.Errors <- errors.New("could not call lcm_handle")
}
}
Expand Down
58 changes: 48 additions & 10 deletions lcm-go/lcm/lcm_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package lcm

import (
"strconv"
"strings"
"testing"
"time"
)
Expand All @@ -9,17 +11,20 @@ const (
// The amount of messages we want to exchange here.
messageGoal = 10000
// Max time we want to wait for executing to be done.
timeout = 1 * time.Second
timeout = 4 * time.Second
)

var (
// Some sample data.
// Some sample data, prefixed with '<message index>:' when sent.
someBytes = []byte("abcdefghijklmnopqrstuvwxyz")
someBytesLen = len(someBytes)
)

func runTest(t *testing.T, provider string) {
// The amount of messages that we received.
messages := 1
messages := 0
// The amount of dropped messages.
drops := 0
var lcm LCM
var err error

Expand Down Expand Up @@ -55,35 +60,68 @@ func runTest(t *testing.T, provider string) {
// Send messageGoal messages
go func() {
t.Log("sending", "amount", messageGoal)
publishChan, _ := lcm.Publisher("TEST")
for i := 1; i < messageGoal; i++ {
publishChan <- someBytes
publishChan, errChan := lcm.Publisher("TEST")
for i := 1; i <= messageGoal; i++ {
// Prefix with message #
data := make([]byte, 0, 8+someBytesLen)
data = append(data, strconv.Itoa(i)...)
data = append(data, ": "...)
data = append(data, someBytes...)

publishChan <- data

// Give the backend some slack so we don't have drops
if i%99 == 0 {
time.Sleep(1 * time.Millisecond)
}
}
close(publishChan)

// Check for and fail if errors
select {
case err := <-errChan:
t.Error(err)
default:
t.Log("sending successful")
}
}()

timer := time.After(timeout)
FOR_SELECT:
for {
select {
case <-time.After(timeout):
case <-timer:
t.Log("timed out after", timeout)
break FOR_SELECT
case _, ok := <-subscription.ReceiveChan:
case data, ok := <-subscription.ReceiveChan:
if !ok {
break FOR_SELECT
}
messages++

// Verify message sequence order
id, err := strconv.Atoi(strings.Split(string(data), ":")[0])
if err != nil {
t.Fatal(err)
}
expected := messages+drops
if expected != id {
t.Errorf("%d messages dropped, expected %d, got %d",
id-expected, expected, id)
drops += id-expected
}
}

// Stop, if we are already there...
if messages == messageGoal {
if messages+drops == messageGoal {
break
}
}

// Did we receive messageGoal messages?
if messages != messageGoal {
t.Fatalf("Expected %d but received %d messages.", messageGoal, messages)
t.Fatalf("Expected %d but received %d messages, %d drops, go-backend drops %d.",
messageGoal, messages, drops, subscription.Drops)
}

t.Log("received", "amount", messages)
Expand Down
20 changes: 17 additions & 3 deletions test/go/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,27 @@ find_program(GO_EXECUTABLE go)
if(PYTHON_EXECUTABLE AND GO_EXECUTABLE)
add_test(NAME Go::client_server COMMAND
${CMAKE_COMMAND} -E env
"GOPATH=${lcm_BINARY_DIR}/test/types/go:${GOPATH}"
"GOPATH=${lcm_BINARY_DIR}/test/types/go"
"CGO_CFLAGS=-I${CMAKE_SOURCE_DIR} -I${CMAKE_BINARY_DIR}/lcm"
"CGO_LDFLAGS=-L${CMAKE_BINARY_DIR}/lcm -Wl,-rpath,${CMAKE_BINARY_DIR}/lcm"
${PYTHON_EXECUTABLE}
${CMAKE_CURRENT_SOURCE_DIR}/../run_client_server_test.py
$<TARGET_FILE:test-c-server>
${GO_EXECUTABLE} test ${CMAKE_CURRENT_SOURCE_DIR}/client_test.go)
endif()

add_test(NAME Go::unit_test COMMAND
${GO_EXECUTABLE} test -v ./...
WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/../../lcm-go/)
${CMAKE_COMMAND} -E env
"CGO_CFLAGS=-I${CMAKE_SOURCE_DIR} -I${CMAKE_BINARY_DIR}/lcm"
"CGO_LDFLAGS=-L${CMAKE_BINARY_DIR}/lcm -Wl,-rpath,${CMAKE_BINARY_DIR}/lcm"
${GO_EXECUTABLE} test -v ./...
WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/../../lcm-go/)

if(GO_ARCH MATCHES "^amd64$" AND GO_PLATFORM MATCHES "^(linux|freebsd|darwin|windows)$")
add_test(NAME Go::unit_test::race_enabled COMMAND
${CMAKE_COMMAND} -E env
"CGO_CFLAGS=-I${CMAKE_SOURCE_DIR} -I${CMAKE_BINARY_DIR}/lcm"
"CGO_LDFLAGS=-L${CMAKE_BINARY_DIR}/lcm -Wl,-rpath,${CMAKE_BINARY_DIR}/lcm"
${GO_EXECUTABLE} test -v -race ./...
WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/../../lcm-go/)
endif()

0 comments on commit fd6277e

Please sign in to comment.