Skip to content
This repository was archived by the owner on May 29, 2020. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
language: go

go:
- 1.4
- 1.4
- tip

script:
- go install ./...
- go test -v
- ./svc-test.bash
46 changes: 39 additions & 7 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"crypto/tls"
"errors"
"fmt"
"log"
"net"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -41,6 +42,27 @@ var (
ErrTooLarge = errors.New("synapse: message body too large")
)

// ErrorLogger is the logger used
// by the package to log protocol
// errors. (In general, protocol-level
// errors are not returned directly
// to the client.) It can be set during
// initialization. If it is left
// as nil, nothing will be logged.
var ErrorLogger *log.Logger

func errorln(s string) {
if ErrorLogger != nil {
ErrorLogger.Println(s)
}
}

func errorf(s string, args ...interface{}) {
if ErrorLogger != nil {
ErrorLogger.Printf(s, args...)
}
}

// Dial creates a new client by dialing
// the provided network and remote address.
// The provided timeout is used as the timeout
Expand Down Expand Up @@ -87,13 +109,16 @@ func NewClient(c net.Conn, timeout time.Duration) (*Client, error) {
cl.Close()
return nil, fmt.Errorf("synapse: ping failed: %s", err)
}
// sync links asap
go cl.syncLinks()

return cl, nil
}

// Client is a client to
// a single synapse server.
type Client struct {
svc string
conn net.Conn // connection
wlock sync.Mutex // write lock
csn uint64 // sequence number; atomic
Expand All @@ -104,6 +129,10 @@ type Client struct {
pending wMap // map seq number to waiting handler
}

func (c *Client) Service() string {
return c.svc
}

// used to transfer control
// flow to blocking goroutines
type waiter struct {
Expand Down Expand Up @@ -193,6 +222,7 @@ func (c *Client) readLoop() {
// they are routed to waiters
// precisely the same way
if frame != fCMD && frame != fRES {
errorf("server at addr %s sent a bad frame", c.conn.RemoteAddr())
// ignore
if !c.do(bwr.Skip(sz)) {
return
Expand Down Expand Up @@ -434,17 +464,11 @@ func (c *Client) sendCommand(cmd command, msg []byte) error {
ret := command(w.in[0])

if ret == cmdInvalid || ret >= _maxcommand {
waiters.push(w)
return errInvalidCmd
}

act := cmdDirectory[ret]
if act == nil {
waiters.push(w)
return errUnknownCmd
}

act.Client(c, w.in[1:])
cmdDirectory[ret].done(c, w.in[1:])
waiters.push(w)
return nil
}
Expand All @@ -455,3 +479,11 @@ func (c *Client) sendCommand(cmd command, msg []byte) error {
func (c *Client) ping() error {
return c.sendCommand(cmdPing, nil)
}

// sync known service addresses
func (c *Client) syncLinks() {
err := c.sendCommand(cmdListLinks, svclistbytes())
if err != nil {
errorf("error synchronizing links: %s", err)
}
}
47 changes: 43 additions & 4 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,45 @@ func isCode(err error, c Status) bool {
return false
}

func TestClientServiceName(t *testing.T) {
setTestLog(t)
if tcpClient.Service() != "test-endpoint" {
t.Errorf("expected service endpoint to be %q, but got %q", "test-endpoint", tcpClient.Service())
}
if unxClient.Service() != "test-endpoint" {
t.Errorf("expected service endpoint to be %q, but got %q", "test-endpoint", unxClient.Service())
}
}

func TestNearest(t *testing.T) {
setTestLog(t)
svc := Nearest("test-endpoint")
if svc == nil {
t.Error("expected Nearest(test-endpoint) to return something")
}
if nwk, _ := svc.Addr(); nwk != "unix" {
t.Error("expected nearest endpoint to be a unix socket")
}

all := Services("test-endpoint")
for _, s := range all {
t.Logf("found service: %v", s)
if s.Name() != "test-endpoint" {
t.Errorf("expected name %q -- got %q", "test-endpoint", s.Name())
}
if s.HostID() != hostid {
t.Errorf("expected host id %d; got %d", hostid, s.HostID())
}
}
if len(all) < 2 {
t.Errorf("expected at least 2 services; found %d", len(all))
}
}

// open up a client and server; make
// some concurrent requests
func TestClient(t *testing.T) {
setTestLog(t)

const concurrent = 5
wg := new(sync.WaitGroup)
Expand Down Expand Up @@ -50,6 +86,7 @@ func TestClient(t *testing.T) {
// the output of the debug handler
// is only visible if '-v' is set
func TestDebugClient(t *testing.T) {
setTestLog(t)
instr := String("here's a message body!")
var outstr String
err := tcpClient.Call(DebugEcho, &instr, &outstr)
Expand All @@ -64,6 +101,7 @@ func TestDebugClient(t *testing.T) {
// test that 'nil' is a safe
// argument to requests and responses
func TestNop(t *testing.T) {
setTestLog(t)
err := tcpClient.Call(Nop, nil, nil)
if err != nil {
t.Fatal(err)
Expand All @@ -81,7 +119,7 @@ func BenchmarkTCPEcho(b *testing.B) {
time.Sleep(1 * time.Millisecond)
}()

go Serve(l, EchoHandler{})
go Serve(l, "bench-endpoint", EchoHandler{})
cl, err := Dial("tcp", "localhost:7000", 50*time.Millisecond)
if err != nil {
b.Fatal(err)
Expand Down Expand Up @@ -116,8 +154,9 @@ func BenchmarkUnixNoop(b *testing.B) {
l.Close()
time.Sleep(1 * time.Millisecond)
}()
go Serve(l, NopHandler{})
cl, err := Dial("unix", "bench", 50*time.Millisecond)

go Serve(l, "bench-endpoint", NopHandler{})
cl, err := Dial("unix", "bench", 1*time.Millisecond)
if err != nil {
b.Fatal(err)
}
Expand All @@ -140,7 +179,7 @@ func BenchmarkUnixNoop(b *testing.B) {
func BenchmarkPipeNoop(b *testing.B) {
srv, cln := net.Pipe()

go ServeConn(srv, NopHandler{})
go ServeConn(srv, "pipe", NopHandler{})

defer srv.Close()

Expand Down
114 changes: 93 additions & 21 deletions command.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,27 +38,32 @@ const (
// to the other
type command uint8

// cmdDirectory is a map of all the commands
// to their respective actions
var cmdDirectory = [_maxcommand]action{
cmdPing: ping{},
// Global directory of command handlers.
//
// If you need to implement a new command,
// add it here.
var cmdDirectory = [_maxcommand]cmdact{
cmdInvalid: {badhandle, recvbad},
cmdPing: {pinghandle, recvping},
cmdListLinks: {sendlinks, recvlinks},
}

type cmdact struct {
// handle is called by the server when
// receiving a particular command
handle func(ch *connHandler, msg []byte) ([]byte, error)

// done is called as the client-side finalizer
done func(c *Client, msg []byte)
}

// an action is the consequence
// of a command - commands are
// mapped to actions
type action interface {
// Client is the action carried out on the client side
// when it receives a command response from a server
Client(c *Client, msg []byte)

// Sever is the action carried out on the server side. It
// should return the reponse message (if any), and any
// error encountered. Errors will result in cmdInvalid
// sent to the client.
Server(ch *connHandler, msg []byte) (res []byte, err error)
// no-op handlers
func badhandle(ch *connHandler, msg []byte) ([]byte, error) {
return []byte{byte(cmdInvalid)}, nil
}

func recvbad(c *Client, msg []byte) {}

// list of commands
const (
// cmdInvalid is used
Expand All @@ -72,14 +77,81 @@ const (
// command
cmdPing

// sync service addresses
// between client and server
cmdListLinks

// a command >= _maxcommand
// is invalid
_maxcommand
)

// ping is a no-op on both sides
type ping struct{}
// client-side ping finalizer
func recvping(cl *Client, res []byte) {
r := cl.conn.RemoteAddr()
var s Service
_, err := s.UnmarshalMsg(res)
if err != nil {
errorf("server at addr %s sent a malformed ping response", r)
return
}
s.net = r.Network()
s.addr = r.String()
cache(&s)
cl.svc = s.name
}

func (p ping) Client(cl *Client, res []byte) {}
// server-side ping handler
func pinghandle(ch *connHandler, body []byte) ([]byte, error) {
s := Service{
name: string(ch.svcname),
host: hostid,
}
return s.MarshalMsg(nil)
}

func (p ping) Server(ch *connHandler, body []byte) ([]byte, error) { return nil, nil }
func recvlinks(cl *Client, res []byte) {
sl := serviceTable{}
_, err := sl.UnmarshalMsg(res)
if err != nil {
errorf("server at addr %s sent malformed links: %s", cl.conn.RemoteAddr(), err)
return
}
svcCache.Lock()
for name, sv := range sl {
list := svcCache.tab[name]
for _, s := range sv {
if s.host == hostid || !isRoutable(s) {
continue
}
list = addSvc(list, s)
}
svcCache.tab[name] = list
}
svcCache.Unlock()
}

func sendlinks(ch *connHandler, body []byte) ([]byte, error) {
sl := serviceTable{}
_, err := sl.UnmarshalMsg(body)
if err != nil {
return nil, err
}
svcCache.Lock()
body, _ = svcCache.tab.MarshalMsg(body[:0])
for name, sv := range sl {
list := svcCache.tab[name]
for _, s := range sv {
if s.host == hostid {
continue
}
if ch.route != routeOSLocal {
s.dist++
}
list = addSvc(list, s)
}
svcCache.tab[name] = list
}
svcCache.Unlock()
return body, nil
}
Loading