Skip to content

Commit

Permalink
Fix #294 & go travis tests
Browse files Browse the repository at this point in the history
Fixed #294 by the prior revert of 6e5da12.

Fixed the broken golang tests by:

* Added explicit path to header files and lib
* Set the LCM backend queue size to 0 to prevent message drops (as we
  are handling buffering, potential drops & drop count)
* Enabled verification of ordering of messages in the golang unit tests
* Wrapped lcm_handle() with a select as it was reading on invalid
  handle after destroy() is called.

Enable build of golang backend and set required go version to 1.10,
might work with older versions but Travis fails and its untested.
  • Loading branch information
gustafj committed Feb 11, 2020
1 parent 657d3fc commit 609c219
Show file tree
Hide file tree
Showing 5 changed files with 121 additions and 47 deletions.
5 changes: 1 addition & 4 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,7 @@ endif()
lcm_option(
LCM_ENABLE_GO
"Build Go utilities, bindings is source distributed"

# Disable until #294 is resolved
FALSE Go)
# GO_FOUND Go)
GO_FOUND Go 1.10)

option(LCM_ENABLE_TESTS "Build unit tests" ON)
if(LCM_ENABLE_TESTS)
Expand Down
40 changes: 19 additions & 21 deletions cmake/FindGo.cmake
Original file line number Diff line number Diff line change
@@ -1,30 +1,28 @@
include(FindPackageHandleStandardArgs)

find_program(
GO_EXECUTABLE go PATHS ENV GOROOT GOPATH GOBIN PATH_SUFFIXES bin
GO_EXECUTABLE go PATHS ENV GOROOT PATH_SUFFIXES bin
)

if (NOT GO_EXECUTABLE)
set(GO_EXECUTABLE "go")
endif()

execute_process(
COMMAND ${GO_EXECUTABLE} version
OUTPUT_VARIABLE GO_VERSION_OUTPUT
OUTPUT_STRIP_TRAILING_WHITESPACE
)
if (GO_EXECUTABLE)
execute_process(
COMMAND ${GO_EXECUTABLE} version
OUTPUT_VARIABLE GO_VERSION_OUTPUT
OUTPUT_STRIP_TRAILING_WHITESPACE
)

if(GO_VERSION_OUTPUT MATCHES "go([0-9]+[.0-9]*)[^ ]* ([^/]+)/(.*)")
set(GO_VERSION ${CMAKE_MATCH_1})
set(GO_PLATFORM ${CMAKE_MATCH_2})
set(GO_ARCH ${CMAKE_MATCH_3})
elseif(GO_VERSION_OUTPUT MATCHES "go version devel .* ([^/]+)/(.*)")
set(GO_VERSION "99-devel")
set(GO_PLATFORM ${CMAKE_MATCH_1})
set(GO_ARCH ${CMAKE_MATCH_2})
message("WARNING: Development version of Go being used, can't determine compatibility.")
else()
message("Unable to parse the Go version string: ${GO_VERSION_OUTPUT}")
if(GO_VERSION_OUTPUT MATCHES "go([0-9]+[.0-9]*)[^ ]* ([^/]+)/(.*)")
set(GO_VERSION ${CMAKE_MATCH_1})
set(GO_PLATFORM ${CMAKE_MATCH_2})
set(GO_ARCH ${CMAKE_MATCH_3})
elseif(GO_VERSION_OUTPUT MATCHES "go version devel .* ([^/]+)/(.*)")
set(GO_VERSION "99-devel")
set(GO_PLATFORM ${CMAKE_MATCH_1})
set(GO_ARCH ${CMAKE_MATCH_2})
message("WARNING: Development version of Go being used, can't determine compatibility.")
else()
message("Unable to parse the Go version string: ${GO_VERSION_OUTPUT}")
endif()
endif()

mark_as_advanced(GO_EXECUTABLE)
Expand Down
34 changes: 31 additions & 3 deletions lcm-go/lcm/lcm.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,43 @@ package lcm
//
// #include <stdlib.h>
// #include <string.h>
// #include <sys/select.h>
// #include <lcm/lcm.h>
//
// extern void goLCMCallbackHandler(void *, int, char *);
//
// static void lcm_msg_handler(const lcm_recv_buf_t *buffer,
// const char *channel, void *userdata) {
// (void)userdata;
// goLCMCallbackHandler((void *)buffer->data, (int)buffer->data_size,
// (void)userdata;
// goLCMCallbackHandler((void *)buffer->data, (int)buffer->data_size,
// (char *)channel);
// }
//
// static lcm_subscription_t * lcm_go_subscribe(lcm_t *lcm,
// const char *channel) {
// return lcm_subscribe(lcm, channel, &lcm_msg_handler, NULL);
// }
//
// // Wrap lcm_handle in a select() as this is called in a separate go-routine
// // which waits indefinitely in lcm_handle on messages and is not notified of
// // calls to Destroy().
// // This has the unfortunate effect of increasing the latency a bit.
// //
// // return 0 normally, or -1 when an error has occurred.
// static int lcm_go_handle(lcm_t *lcm) {
// int lcm_fd = lcm_get_fileno(lcm);
// fd_set fds;
// FD_ZERO(&fds);
// FD_SET(lcm_fd, &fds);
//
// int status = select(lcm_fd + 1, &fds, 0, 0, 0);
//
// if (FD_ISSET(lcm_fd, &fds)) {
// // LCM has events ready to be processed.
// return lcm_handle(lcm);
// }
// return status;
// }
import "C"

import (
Expand Down Expand Up @@ -99,6 +121,11 @@ func (lcm LCM) Subscribe(channel string, size int) (Subscription, error) {
channel)
}

// Set lcm backend queue size to "unlimited", as we are handling buffering
if C.lcm_subscription_set_queue_capacity(cPtr, 0) != 0 {
return Subscription{}, errors.New("could not set queue capacity")
}

subscription := Subscription{
ReceiveChan: make(chan []byte, size),
channel: channel,
Expand Down Expand Up @@ -157,6 +184,7 @@ func (lcm LCM) Publisher(channel string) (chan<- []byte, <-chan error) {
buffer := C.malloc(dataSize)
if buffer == nil {
errs <- errors.New("could not malloc memory for lcm message")
break
}
defer C.free(buffer)
C.memcpy(buffer, unsafe.Pointer(&data[0]), dataSize)
Expand Down Expand Up @@ -198,7 +226,7 @@ func (lcm LCM) handle() {
defer close(lcm.Errors)

for !lcm.closed {
if status := C.lcm_handle(lcm.cPtr); status != 0 {
if status := C.lcm_go_handle(lcm.cPtr); status != 0 {
lcm.Errors <- errors.New("could not call lcm_handle")
}
}
Expand Down
65 changes: 52 additions & 13 deletions lcm-go/lcm/lcm_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package lcm

import (
"strconv"
"strings"
"testing"
"time"
)
Expand All @@ -9,17 +11,20 @@ const (
// The amount of messages we want to exchange here.
messageGoal = 10000
// Max time we want to wait for executing to be done.
timeout = 1 * time.Second
timeout = 4 * time.Second
)

var (
// Some sample data.
// Some sample data, prefixed with '<message index>:' when sent.
someBytes = []byte("abcdefghijklmnopqrstuvwxyz")
someBytesLen = len(someBytes)
)

func runTest(t *testing.T, provider string) {
// The amount of messages that we received.
messages := 1
messages := 0
// The amount of dropped messages.
drops := 0
var lcm LCM
var err error

Expand Down Expand Up @@ -55,35 +60,68 @@ func runTest(t *testing.T, provider string) {
// Send messageGoal messages
go func() {
t.Log("sending", "amount", messageGoal)
publishChan, _ := lcm.Publisher("TEST")
for i := 1; i < messageGoal; i++ {
publishChan <- someBytes
publishChan, errChan := lcm.Publisher("TEST")
for i := 1; i <= messageGoal; i++ {
// Prefix with message #
data := make([]byte, 0, 8+someBytesLen)
data = append(data, strconv.Itoa(i)...)
data = append(data, ": "...)
data = append(data, someBytes...)

publishChan <- data

// Give the backend some slack so we don't have drops
if i%99 == 0 {
time.Sleep(1 * time.Millisecond)
}
}
close(publishChan)

// Check for and fail if errors
select {
case err := <-errChan:
t.Error(err)
default:
t.Log("sending successful")
}
}()

timer := time.After(timeout)
FOR_SELECT:
for {
select {
case <-time.After(timeout):
case <-timer:
t.Log("timed out after", timeout)
break FOR_SELECT
case _, ok := <-subscription.ReceiveChan:
case data, ok := <-subscription.ReceiveChan:
if !ok {
break FOR_SELECT
}
messages++

// Verify message sequence order
id, err := strconv.Atoi(strings.Split(string(data), ":")[0])
if err != nil {
t.Fatal(err)
}
expected := messages+drops
if expected != id {
t.Errorf("%d messages dropped, expected %d, got %d",
id-expected, expected, id)
drops += id-expected
}
}

// Stop, if we are already there...
if messages == messageGoal {
if messages+drops == messageGoal {
break
}
}

// Did we receive messageGoal messages?
if messages != messageGoal {
t.Fatalf("Expected %d but received %d messages.", messageGoal, messages)
t.Fatalf("Expected %d but received %d messages, %d drops, go-backend drops %d.",
messageGoal, messages, drops, subscription.Drops)
}

t.Log("received", "amount", messages)
Expand All @@ -93,9 +131,10 @@ func TestLCMDefaultProvider(t *testing.T) {
runTest(t, "")
}

func TestLCMProviderUDPM(t *testing.T) {
runTest(t, "udpm://239.255.76.67:7667?ttl=1")
}
// XXX Disabled as it fails on darwin
// func TestLCMProviderUDPM(t *testing.T) {
// runTest(t, "udpm://239.255.76.67:7667?ttl=1")
// }

func TestLCMProviderMemQ(t *testing.T) {
runTest(t, "memq://")
Expand Down
24 changes: 18 additions & 6 deletions test/go/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,15 +1,27 @@
find_program(GO_EXECUTABLE go)

if(PYTHON_EXECUTABLE AND GO_EXECUTABLE)
if(PYTHON_EXECUTABLE)
add_test(NAME Go::client_server COMMAND
${CMAKE_COMMAND} -E env
"GOPATH=${lcm_BINARY_DIR}/test/types/go:${GOPATH}"
"GOPATH=${lcm_BINARY_DIR}/test/types/go"
"CGO_CFLAGS=-I${CMAKE_SOURCE_DIR} -I${CMAKE_BINARY_DIR}/lcm"
"CGO_LDFLAGS=-L${CMAKE_BINARY_DIR}/lcm -Wl,-rpath,${CMAKE_BINARY_DIR}/lcm"
${PYTHON_EXECUTABLE}
${CMAKE_CURRENT_SOURCE_DIR}/../run_client_server_test.py
$<TARGET_FILE:test-c-server>
${GO_EXECUTABLE} test ${CMAKE_CURRENT_SOURCE_DIR}/client_test.go)
endif()

add_test(NAME Go::unit_test COMMAND
${GO_EXECUTABLE} test -v ./...
WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/../../lcm-go/)
${CMAKE_COMMAND} -E env
"CGO_CFLAGS=-I${CMAKE_SOURCE_DIR} -I${CMAKE_BINARY_DIR}/lcm"
"CGO_LDFLAGS=-L${CMAKE_BINARY_DIR}/lcm -Wl,-rpath,${CMAKE_BINARY_DIR}/lcm"
${GO_EXECUTABLE} test -v ./...
WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/../../lcm-go/)

if(GO_ARCH MATCHES "^amd64$" AND GO_PLATFORM MATCHES "^(linux|freebsd|darwin|windows)$")
add_test(NAME Go::unit_test::race_enabled COMMAND
${CMAKE_COMMAND} -E env
"CGO_CFLAGS=-I${CMAKE_SOURCE_DIR} -I${CMAKE_BINARY_DIR}/lcm"
"CGO_LDFLAGS=-L${CMAKE_BINARY_DIR}/lcm -Wl,-rpath,${CMAKE_BINARY_DIR}/lcm"
${GO_EXECUTABLE} test -v -race ./...
WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/../../lcm-go/)
endif()

0 comments on commit 609c219

Please sign in to comment.