diff --git a/.travis.yml b/.travis.yml index aa1b91f..df6c8b5 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,4 +1,10 @@ language: go go: - - 1.4 \ No newline at end of file + - 1.4 + - tip + +script: + - go install ./... + - go test -v + - ./svc-test.bash diff --git a/client.go b/client.go index a7c9c48..6f585f6 100644 --- a/client.go +++ b/client.go @@ -4,6 +4,7 @@ import ( "crypto/tls" "errors" "fmt" + "log" "net" "sync" "sync/atomic" @@ -41,6 +42,27 @@ var ( ErrTooLarge = errors.New("synapse: message body too large") ) +// ErrorLogger is the logger used +// by the package to log protocol +// errors. (In general, protocol-level +// errors are not returned directly +// to the client.) It can be set during +// initialization. If it is left +// as nil, nothing will be logged. +var ErrorLogger *log.Logger + +func errorln(s string) { + if ErrorLogger != nil { + ErrorLogger.Println(s) + } +} + +func errorf(s string, args ...interface{}) { + if ErrorLogger != nil { + ErrorLogger.Printf(s, args...) + } +} + // Dial creates a new client by dialing // the provided network and remote address. // The provided timeout is used as the timeout @@ -87,6 +109,8 @@ func NewClient(c net.Conn, timeout time.Duration) (*Client, error) { cl.Close() return nil, fmt.Errorf("synapse: ping failed: %s", err) } + // sync links asap + go cl.syncLinks() return cl, nil } @@ -94,6 +118,7 @@ func NewClient(c net.Conn, timeout time.Duration) (*Client, error) { // Client is a client to // a single synapse server. type Client struct { + svc string conn net.Conn // connection wlock sync.Mutex // write lock csn uint64 // sequence number; atomic @@ -104,6 +129,10 @@ type Client struct { pending wMap // map seq number to waiting handler } +func (c *Client) Service() string { + return c.svc +} + // used to transfer control // flow to blocking goroutines type waiter struct { @@ -193,6 +222,7 @@ func (c *Client) readLoop() { // they are routed to waiters // precisely the same way if frame != fCMD && frame != fRES { + errorf("server at addr %s sent a bad frame", c.conn.RemoteAddr()) // ignore if !c.do(bwr.Skip(sz)) { return @@ -434,17 +464,11 @@ func (c *Client) sendCommand(cmd command, msg []byte) error { ret := command(w.in[0]) if ret == cmdInvalid || ret >= _maxcommand { - waiters.push(w) - return errInvalidCmd - } - - act := cmdDirectory[ret] - if act == nil { waiters.push(w) return errUnknownCmd } - act.Client(c, w.in[1:]) + cmdDirectory[ret].done(c, w.in[1:]) waiters.push(w) return nil } @@ -455,3 +479,11 @@ func (c *Client) sendCommand(cmd command, msg []byte) error { func (c *Client) ping() error { return c.sendCommand(cmdPing, nil) } + +// sync known service addresses +func (c *Client) syncLinks() { + err := c.sendCommand(cmdListLinks, svclistbytes()) + if err != nil { + errorf("error synchronizing links: %s", err) + } +} diff --git a/client_test.go b/client_test.go index 2bceec8..1a6519d 100644 --- a/client_test.go +++ b/client_test.go @@ -15,9 +15,45 @@ func isCode(err error, c Status) bool { return false } +func TestClientServiceName(t *testing.T) { + setTestLog(t) + if tcpClient.Service() != "test-endpoint" { + t.Errorf("expected service endpoint to be %q, but got %q", "test-endpoint", tcpClient.Service()) + } + if unxClient.Service() != "test-endpoint" { + t.Errorf("expected service endpoint to be %q, but got %q", "test-endpoint", unxClient.Service()) + } +} + +func TestNearest(t *testing.T) { + setTestLog(t) + svc := Nearest("test-endpoint") + if svc == nil { + t.Error("expected Nearest(test-endpoint) to return something") + } + if nwk, _ := svc.Addr(); nwk != "unix" { + t.Error("expected nearest endpoint to be a unix socket") + } + + all := Services("test-endpoint") + for _, s := range all { + t.Logf("found service: %v", s) + if s.Name() != "test-endpoint" { + t.Errorf("expected name %q -- got %q", "test-endpoint", s.Name()) + } + if s.HostID() != hostid { + t.Errorf("expected host id %d; got %d", hostid, s.HostID()) + } + } + if len(all) < 2 { + t.Errorf("expected at least 2 services; found %d", len(all)) + } +} + // open up a client and server; make // some concurrent requests func TestClient(t *testing.T) { + setTestLog(t) const concurrent = 5 wg := new(sync.WaitGroup) @@ -50,6 +86,7 @@ func TestClient(t *testing.T) { // the output of the debug handler // is only visible if '-v' is set func TestDebugClient(t *testing.T) { + setTestLog(t) instr := String("here's a message body!") var outstr String err := tcpClient.Call(DebugEcho, &instr, &outstr) @@ -64,6 +101,7 @@ func TestDebugClient(t *testing.T) { // test that 'nil' is a safe // argument to requests and responses func TestNop(t *testing.T) { + setTestLog(t) err := tcpClient.Call(Nop, nil, nil) if err != nil { t.Fatal(err) @@ -81,7 +119,7 @@ func BenchmarkTCPEcho(b *testing.B) { time.Sleep(1 * time.Millisecond) }() - go Serve(l, EchoHandler{}) + go Serve(l, "bench-endpoint", EchoHandler{}) cl, err := Dial("tcp", "localhost:7000", 50*time.Millisecond) if err != nil { b.Fatal(err) @@ -116,8 +154,9 @@ func BenchmarkUnixNoop(b *testing.B) { l.Close() time.Sleep(1 * time.Millisecond) }() - go Serve(l, NopHandler{}) - cl, err := Dial("unix", "bench", 50*time.Millisecond) + + go Serve(l, "bench-endpoint", NopHandler{}) + cl, err := Dial("unix", "bench", 1*time.Millisecond) if err != nil { b.Fatal(err) } @@ -140,7 +179,7 @@ func BenchmarkUnixNoop(b *testing.B) { func BenchmarkPipeNoop(b *testing.B) { srv, cln := net.Pipe() - go ServeConn(srv, NopHandler{}) + go ServeConn(srv, "pipe", NopHandler{}) defer srv.Close() diff --git a/command.go b/command.go index aa108b7..c0b3fd1 100644 --- a/command.go +++ b/command.go @@ -38,27 +38,32 @@ const ( // to the other type command uint8 -// cmdDirectory is a map of all the commands -// to their respective actions -var cmdDirectory = [_maxcommand]action{ - cmdPing: ping{}, +// Global directory of command handlers. +// +// If you need to implement a new command, +// add it here. +var cmdDirectory = [_maxcommand]cmdact{ + cmdInvalid: {badhandle, recvbad}, + cmdPing: {pinghandle, recvping}, + cmdListLinks: {sendlinks, recvlinks}, +} + +type cmdact struct { + // handle is called by the server when + // receiving a particular command + handle func(ch *connHandler, msg []byte) ([]byte, error) + + // done is called as the client-side finalizer + done func(c *Client, msg []byte) } -// an action is the consequence -// of a command - commands are -// mapped to actions -type action interface { - // Client is the action carried out on the client side - // when it receives a command response from a server - Client(c *Client, msg []byte) - - // Sever is the action carried out on the server side. It - // should return the reponse message (if any), and any - // error encountered. Errors will result in cmdInvalid - // sent to the client. - Server(ch *connHandler, msg []byte) (res []byte, err error) +// no-op handlers +func badhandle(ch *connHandler, msg []byte) ([]byte, error) { + return []byte{byte(cmdInvalid)}, nil } +func recvbad(c *Client, msg []byte) {} + // list of commands const ( // cmdInvalid is used @@ -72,14 +77,81 @@ const ( // command cmdPing + // sync service addresses + // between client and server + cmdListLinks + // a command >= _maxcommand // is invalid _maxcommand ) -// ping is a no-op on both sides -type ping struct{} +// client-side ping finalizer +func recvping(cl *Client, res []byte) { + r := cl.conn.RemoteAddr() + var s Service + _, err := s.UnmarshalMsg(res) + if err != nil { + errorf("server at addr %s sent a malformed ping response", r) + return + } + s.net = r.Network() + s.addr = r.String() + cache(&s) + cl.svc = s.name +} -func (p ping) Client(cl *Client, res []byte) {} +// server-side ping handler +func pinghandle(ch *connHandler, body []byte) ([]byte, error) { + s := Service{ + name: string(ch.svcname), + host: hostid, + } + return s.MarshalMsg(nil) +} -func (p ping) Server(ch *connHandler, body []byte) ([]byte, error) { return nil, nil } +func recvlinks(cl *Client, res []byte) { + sl := serviceTable{} + _, err := sl.UnmarshalMsg(res) + if err != nil { + errorf("server at addr %s sent malformed links: %s", cl.conn.RemoteAddr(), err) + return + } + svcCache.Lock() + for name, sv := range sl { + list := svcCache.tab[name] + for _, s := range sv { + if s.host == hostid || !isRoutable(s) { + continue + } + list = addSvc(list, s) + } + svcCache.tab[name] = list + } + svcCache.Unlock() +} + +func sendlinks(ch *connHandler, body []byte) ([]byte, error) { + sl := serviceTable{} + _, err := sl.UnmarshalMsg(body) + if err != nil { + return nil, err + } + svcCache.Lock() + body, _ = svcCache.tab.MarshalMsg(body[:0]) + for name, sv := range sl { + list := svcCache.tab[name] + for _, s := range sv { + if s.host == hostid { + continue + } + if ch.route != routeOSLocal { + s.dist++ + } + list = addSvc(list, s) + } + svcCache.tab[name] = list + } + svcCache.Unlock() + return body, nil +} diff --git a/server.go b/server.go index 5e8b80f..12ceaab 100644 --- a/server.go +++ b/server.go @@ -31,13 +31,22 @@ const ( // Serve starts a Server on 'l' that serves // the supplied handler. It blocks until the // listener closes. -func Serve(l net.Listener, h Handler) error { +func Serve(l net.Listener, service string, h Handler) error { + a := l.Addr() + s := Service{ + name: service, + net: a.Network(), + addr: a.String(), + host: hostid, + } + cache(&s) for { c, err := l.Accept() if err != nil { + uncache(&s) return err } - go ServeConn(c, h) + go ServeConn(c, service, h) } } @@ -47,7 +56,7 @@ func Serve(l net.Listener, h Handler) error { // Server must be provided. If the certificate is signed by a // certificate authority, the certFile should be the concatenation of // the server's certificate followed by the CA's certificate. -func ListenAndServeTLS(network, laddr string, certFile, keyFile string, h Handler) error { +func ListenAndServeTLS(network, laddr, service, certFile, keyFile string, h Handler) error { cert, err := tls.LoadX509KeyPair(certFile, keyFile) if err != nil { return err @@ -59,7 +68,7 @@ func ListenAndServeTLS(network, laddr string, certFile, keyFile string, h Handle if err != nil { return err } - return Serve(l, h) + return Serve(l, service, h) } // ListenAndServe opens up a network listener @@ -67,23 +76,25 @@ func ListenAndServeTLS(network, laddr string, certFile, keyFile string, h Handle // and begins serving with the provided handler. // ListenAndServe blocks until there is a fatal // listener error. -func ListenAndServe(network string, laddr string, h Handler) error { +func ListenAndServe(network, laddr, service string, h Handler) error { l, err := net.Listen(network, laddr) if err != nil { return err } - return Serve(l, h) + return Serve(l, service, h) } // ServeConn serves an individual network // connection. It blocks until the connection // is closed or it encounters a fatal error. -func ServeConn(c net.Conn, h Handler) { +func ServeConn(c net.Conn, service string, h Handler) { ch := connHandler{ + svcname: service, conn: c, h: h, remote: c.RemoteAddr(), writing: make(chan *connWrapper, 32), + route: getRoute(c.RemoteAddr()), } go ch.writeLoop() ch.connLoop() // returns on connection close @@ -116,15 +127,53 @@ func putFrame(bts []byte, seq uint64, ft fType, sz int) { bts[10] = byte(usz) } +// route is the network pathway +// assocaited with a particular +// conneciton (global, link-local, +// loopback, etc.) +type route uint16 + +const ( + routeUnknown route = iota // no idea + routeGlobal // globally routable + routeLinkLocal // link-local + routeOSLocal // same machine (unix socket / loopback) +) + +func getRoute(raddr net.Addr) route { + nwk := raddr.Network() + switch nwk { + case "unix": + return routeOSLocal + case "tcp", "tcp4", "tcp6": + a, err := net.ResolveTCPAddr(nwk, raddr.String()) + if err != nil { + return routeUnknown + } + if a.IP.IsLoopback() { + return routeOSLocal + } else if a.IP.IsLinkLocalUnicast() { + return routeLinkLocal + } else if a.IP.IsGlobalUnicast() { + return routeGlobal + } + fallthrough + default: + return routeUnknown + } +} + // connHandler handles network // connections and multiplexes requests // to connWrappers type connHandler struct { + svcname string h Handler conn net.Conn remote net.Addr wg sync.WaitGroup // outstanding handlers writing chan *connWrapper // write queue + route route // from whence? } func (c *connHandler) writeLoop() error { @@ -310,18 +359,13 @@ func (c *connHandler) handleReq(cw *connWrapper) { } func handleCmd(c *connHandler, seq uint64, cmd command, body []byte) { - if cmd == cmdInvalid || cmd >= _maxcommand { - return - } - - act := cmdDirectory[cmd] resbyte := byte(cmd) var res []byte var err error - if act == nil { + if cmd == cmdInvalid || cmd >= _maxcommand { resbyte = byte(cmdInvalid) } else { - res, err = act.Server(c, body) + res, err = cmdDirectory[cmd].handle(c, body) if err != nil { resbyte = byte(cmdInvalid) } diff --git a/service.go b/service.go new file mode 100644 index 0000000..49d9ee5 --- /dev/null +++ b/service.go @@ -0,0 +1,181 @@ +package synapse + +import ( + "crypto/rand" + "encoding/binary" + "fmt" + "net" + "sort" + "sync" +) + +//go:generate msgp -unexported -io=false + +// Nearest finds the "nearest" service +// with the given name. +func Nearest(service string) (s *Service) { + svcCache.Lock() + l := svcCache.tab[service] + if len(l) > 0 { + s = l[0] + } + svcCache.Unlock() + return +} + +// Services lists all of the known +// services endpoints for a given +// service name. +func Services(name string) []*Service { + svcCache.Lock() + l := svcCache.tab[name] + if len(l) == 0 { + svcCache.Unlock() + return nil + } + dup := make([]*Service, len(l)) + copy(dup, l) + svcCache.Unlock() + return dup +} + +// Service represents a unique address +// associated with a service. +type Service struct { + host uint64 // host id + name, net, addr string // address + dist int32 // distance +} + +func (s *Service) HostID() uint64 { return s.host } + +func (s *Service) Name() string { return s.name } + +func (s *Service) eqaddr(g *Service) bool { + return s.net == g.net && s.addr == g.addr +} + +// String returns {Name}#{HostID}@{net}:{addr} +// e.g. echo#9081234973@tcp:localhost:7000 +func (s *Service) String() string { + return fmt.Sprintf("%s#%d@%s:%s", s.name, s.host, s.net, s.addr) +} + +func (s *Service) Addr() (net, addr string) { + net, addr = s.net, s.addr + return +} + +type serviceList []*Service + +func (s serviceList) Len() int { return len(s) } +func (s serviceList) Less(i, j int) bool { + + // for os-local addresses, prefer unix sockets + // over loopback tcp + if s[i].dist == 0 && s[i].dist == s[j].dist { + if s[i].net == "unix" && s[j].net != "unix" { + return true + } + } + + return s[i].dist < s[j].dist +} + +func (s serviceList) Swap(i, j int) { s[i], s[j] = s[j], s[i] } + +func addSvc(list []*Service, s *Service) []*Service { + if len(list) == 0 { + return []*Service{s} + } + for _, sv := range list { + if sv.eqaddr(s) { + if s.dist < sv.dist { + sv.dist = s.dist + } + return list + } + } + out := append(list, s) + sort.Sort(serviceList(out)) + return out +} + +func removeSvc(list []*Service, s *Service) []*Service { + ll := len(list) + if ll == 0 { + return nil + } + for i, sv := range list { + if sv.eqaddr(s) { + list[i], list[ll-1], list = list[ll-1], nil, list[:ll-1] + sort.Sort(serviceList(list)) + return list + } + } + return list +} + +type serviceTable map[string]serviceList + +// svcCache stores all of the +// known services to this hostid. +// +var svcCache struct { + sync.Mutex + tab serviceTable +} + +// host id +var hostid uint64 + +func HostID() uint64 { return hostid } + +func init() { + svcCache.tab = make(serviceTable) + + var buf [8]byte + rand.Read(buf[:]) + hostid = binary.LittleEndian.Uint64(buf[:]) +} + +func isRoutable(s *Service) bool { + // unix sockets, etc. can + // be connected to on the + // same machine + if s.dist == 0 { + return true + } + + switch s.net { + case "tcp", "tcp6", "tcp4": + a, err := net.ResolveTCPAddr(s.net, s.addr) + if err != nil { + errorf("couldn't resolve tcp addr %s: %s", s.addr, err) + return false + } + // TODO: link-local addresses + return a.IP.IsGlobalUnicast() + default: + return false + } +} + +func cache(s *Service) { + svcCache.Lock() + svcCache.tab[s.name] = addSvc(svcCache.tab[s.name], s) + svcCache.Unlock() +} + +func uncache(s *Service) { + svcCache.Lock() + svcCache.tab[s.name] = removeSvc(svcCache.tab[s.name], s) + svcCache.Unlock() +} + +func svclistbytes() []byte { + svcCache.Lock() + data, _ := svcCache.tab.MarshalMsg(nil) + svcCache.Unlock() + return data +} diff --git a/service_gen.go b/service_gen.go new file mode 100644 index 0000000..497909f --- /dev/null +++ b/service_gen.go @@ -0,0 +1,244 @@ +package synapse + +// NOTE: THIS FILE WAS PRODUCED BY THE +// MSGP CODE GENERATION TOOL (github.com/tinylib/msgp) +// DO NOT EDIT + +import ( + "github.com/tinylib/msgp/msgp" +) + +// MarshalMsg implements msgp.Marshaler +func (z *Service) MarshalMsg(b []byte) (o []byte, err error) { + o = msgp.Require(b, z.Msgsize()) + // map header, size 5 + // string "host" + o = append(o, 0x85, 0xa4, 0x68, 0x6f, 0x73, 0x74) + o = msgp.AppendUint64(o, z.host) + // string "name" + o = append(o, 0xa4, 0x6e, 0x61, 0x6d, 0x65) + o = msgp.AppendString(o, z.name) + // string "net" + o = append(o, 0xa3, 0x6e, 0x65, 0x74) + o = msgp.AppendString(o, z.net) + // string "addr" + o = append(o, 0xa4, 0x61, 0x64, 0x64, 0x72) + o = msgp.AppendString(o, z.addr) + // string "dist" + o = append(o, 0xa4, 0x64, 0x69, 0x73, 0x74) + o = msgp.AppendInt32(o, z.dist) + return +} + +// UnmarshalMsg implements msgp.Unmarshaler +func (z *Service) UnmarshalMsg(bts []byte) (o []byte, err error) { + var field []byte + _ = field + var isz uint32 + isz, bts, err = msgp.ReadMapHeaderBytes(bts) + if err != nil { + return + } + for isz > 0 { + isz-- + field, bts, err = msgp.ReadMapKeyZC(bts) + if err != nil { + return + } + switch msgp.UnsafeString(field) { + case "host": + z.host, bts, err = msgp.ReadUint64Bytes(bts) + if err != nil { + return + } + case "name": + z.name, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + return + } + case "net": + z.net, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + return + } + case "addr": + z.addr, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + return + } + case "dist": + z.dist, bts, err = msgp.ReadInt32Bytes(bts) + if err != nil { + return + } + default: + bts, err = msgp.Skip(bts) + if err != nil { + return + } + } + } + o = bts + return +} + +func (z *Service) Msgsize() (s int) { + s = 1 + 5 + msgp.Uint64Size + 5 + msgp.StringPrefixSize + len(z.name) + 4 + msgp.StringPrefixSize + len(z.net) + 5 + msgp.StringPrefixSize + len(z.addr) + 5 + msgp.Int32Size + return +} + +// MarshalMsg implements msgp.Marshaler +func (z serviceList) MarshalMsg(b []byte) (o []byte, err error) { + o = msgp.Require(b, z.Msgsize()) + o = msgp.AppendArrayHeader(o, uint32(len(z))) + for xvk := range z { + if z[xvk] == nil { + o = msgp.AppendNil(o) + } else { + o, err = z[xvk].MarshalMsg(o) + if err != nil { + return + } + } + } + return +} + +// UnmarshalMsg implements msgp.Unmarshaler +func (z *serviceList) UnmarshalMsg(bts []byte) (o []byte, err error) { + var xsz uint32 + xsz, bts, err = msgp.ReadArrayHeaderBytes(bts) + if err != nil { + return + } + if cap((*z)) >= int(xsz) { + (*z) = (*z)[:xsz] + } else { + (*z) = make(serviceList, xsz) + } + for bzg := range *z { + if msgp.IsNil(bts) { + bts, err = msgp.ReadNilBytes(bts) + if err != nil { + return + } + (*z)[bzg] = nil + } else { + if (*z)[bzg] == nil { + (*z)[bzg] = new(Service) + } + bts, err = (*z)[bzg].UnmarshalMsg(bts) + if err != nil { + return + } + } + } + o = bts + return +} + +func (z serviceList) Msgsize() (s int) { + s = msgp.ArrayHeaderSize + for bai := range z { + if z[bai] == nil { + s += msgp.NilSize + } else { + s += z[bai].Msgsize() + } + } + return +} + +// MarshalMsg implements msgp.Marshaler +func (z serviceTable) MarshalMsg(b []byte) (o []byte, err error) { + o = msgp.Require(b, z.Msgsize()) + o = msgp.AppendMapHeader(o, uint32(len(z))) + for cmr, ajw := range z { + o = msgp.AppendString(o, cmr) + o = msgp.AppendArrayHeader(o, uint32(len(ajw))) + for wht := range ajw { + if ajw[wht] == nil { + o = msgp.AppendNil(o) + } else { + o, err = ajw[wht].MarshalMsg(o) + if err != nil { + return + } + } + } + } + return +} + +// UnmarshalMsg implements msgp.Unmarshaler +func (z *serviceTable) UnmarshalMsg(bts []byte) (o []byte, err error) { + var msz uint32 + msz, bts, err = msgp.ReadMapHeaderBytes(bts) + if err != nil { + return + } + if (*z) == nil && msz > 0 { + (*z) = make(serviceTable, msz) + } else if len((*z)) > 0 { + for key, _ := range *z { + delete((*z), key) + } + } + for msz > 0 { + var hct string + var cua serviceList + msz-- + hct, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + return + } + var xsz uint32 + xsz, bts, err = msgp.ReadArrayHeaderBytes(bts) + if err != nil { + return + } + if cap(cua) >= int(xsz) { + cua = cua[:xsz] + } else { + cua = make(serviceList, xsz) + } + for xhx := range cua { + if msgp.IsNil(bts) { + bts, err = msgp.ReadNilBytes(bts) + if err != nil { + return + } + cua[xhx] = nil + } else { + if cua[xhx] == nil { + cua[xhx] = new(Service) + } + bts, err = cua[xhx].UnmarshalMsg(bts) + if err != nil { + return + } + } + } + (*z)[hct] = cua + } + o = bts + return +} + +func (z serviceTable) Msgsize() (s int) { + s = msgp.MapHeaderSize + if z != nil { + for lqf, daf := range z { + _ = daf + s += msgp.StringPrefixSize + len(lqf) + msgp.ArrayHeaderSize + for pks := range daf { + if daf[pks] == nil { + s += msgp.NilSize + } else { + s += daf[pks].Msgsize() + } + } + } + } + return +} diff --git a/service_gen_test.go b/service_gen_test.go new file mode 100644 index 0000000..ea5eb54 --- /dev/null +++ b/service_gen_test.go @@ -0,0 +1,185 @@ +package synapse + +// NOTE: THIS FILE WAS PRODUCED BY THE +// MSGP CODE GENERATION TOOL (github.com/tinylib/msgp) +// DO NOT EDIT + +import ( + "testing" + + "github.com/tinylib/msgp/msgp" +) + +func TestMarshalUnmarshalService(t *testing.T) { + v := Service{} + bts, err := v.MarshalMsg(nil) + if err != nil { + t.Fatal(err) + } + left, err := v.UnmarshalMsg(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left) + } + + left, err = msgp.Skip(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after Skip(): %q", len(left), left) + } +} + +func BenchmarkMarshalMsgService(b *testing.B) { + v := Service{} + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.MarshalMsg(nil) + } +} + +func BenchmarkAppendMsgService(b *testing.B) { + v := Service{} + bts := make([]byte, 0, v.Msgsize()) + bts, _ = v.MarshalMsg(bts[0:0]) + b.SetBytes(int64(len(bts))) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + bts, _ = v.MarshalMsg(bts[0:0]) + } +} + +func BenchmarkUnmarshalService(b *testing.B) { + v := Service{} + bts, _ := v.MarshalMsg(nil) + b.ReportAllocs() + b.SetBytes(int64(len(bts))) + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := v.UnmarshalMsg(bts) + if err != nil { + b.Fatal(err) + } + } +} + +func TestMarshalUnmarshalserviceList(t *testing.T) { + v := serviceList{} + bts, err := v.MarshalMsg(nil) + if err != nil { + t.Fatal(err) + } + left, err := v.UnmarshalMsg(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left) + } + + left, err = msgp.Skip(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after Skip(): %q", len(left), left) + } +} + +func BenchmarkMarshalMsgserviceList(b *testing.B) { + v := serviceList{} + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.MarshalMsg(nil) + } +} + +func BenchmarkAppendMsgserviceList(b *testing.B) { + v := serviceList{} + bts := make([]byte, 0, v.Msgsize()) + bts, _ = v.MarshalMsg(bts[0:0]) + b.SetBytes(int64(len(bts))) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + bts, _ = v.MarshalMsg(bts[0:0]) + } +} + +func BenchmarkUnmarshalserviceList(b *testing.B) { + v := serviceList{} + bts, _ := v.MarshalMsg(nil) + b.ReportAllocs() + b.SetBytes(int64(len(bts))) + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := v.UnmarshalMsg(bts) + if err != nil { + b.Fatal(err) + } + } +} + +func TestMarshalUnmarshalserviceTable(t *testing.T) { + v := serviceTable{} + bts, err := v.MarshalMsg(nil) + if err != nil { + t.Fatal(err) + } + left, err := v.UnmarshalMsg(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left) + } + + left, err = msgp.Skip(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after Skip(): %q", len(left), left) + } +} + +func BenchmarkMarshalMsgserviceTable(b *testing.B) { + v := serviceTable{} + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.MarshalMsg(nil) + } +} + +func BenchmarkAppendMsgserviceTable(b *testing.B) { + v := serviceTable{} + bts := make([]byte, 0, v.Msgsize()) + bts, _ = v.MarshalMsg(bts[0:0]) + b.SetBytes(int64(len(bts))) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + bts, _ = v.MarshalMsg(bts[0:0]) + } +} + +func BenchmarkUnmarshalserviceTable(b *testing.B) { + v := serviceTable{} + bts, _ := v.MarshalMsg(nil) + b.ReportAllocs() + b.SetBytes(int64(len(bts))) + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := v.UnmarshalMsg(bts) + if err != nil { + b.Fatal(err) + } + } +} diff --git a/setup_test.go b/setup_test.go index ef72d21..c3efde9 100644 --- a/setup_test.go +++ b/setup_test.go @@ -6,6 +6,7 @@ import ( "log" "net" "os" + "sync" "testing" "time" @@ -30,8 +31,32 @@ var ( rt *RouteTable ct testing.T + + loglock sync.Mutex ) +type faillog struct { + t *testing.T +} + +func (f *faillog) Write(b []byte) (int, error) { + loglock.Lock() + f.t.Error("error logged") + i, err := os.Stderr.Write(b) + loglock.Unlock() + return i, err +} + +func fail(t *testing.T) io.Writer { + return &faillog{t} +} + +func setTestLog(t *testing.T) { + loglock.Lock() + ErrorLogger = log.New(fail(t), "synapse-error-log: ", log.LstdFlags) + loglock.Unlock() +} + type testData []byte func (s *testData) MarshalMsg(b []byte) ([]byte, error) { @@ -97,14 +122,14 @@ func TestMain(m *testing.M) { panic(err) } - go Serve(l, rt) + go Serve(l, "test-endpoint", rt) ul, err := net.Listen("unix", "synapse") if err != nil { panic(err) } - go Serve(ul, rt) + go Serve(ul, "test-endpoint", rt) tcpClient, err = Dial("tcp", ":7070", 5*time.Millisecond) if err != nil { diff --git a/svc-test.bash b/svc-test.bash new file mode 100755 index 0000000..9aaf08d --- /dev/null +++ b/svc-test.bash @@ -0,0 +1,10 @@ +#! /bin/bash +set -e +ssts & +SSTS_PID=$! +sleep 0.1 +set +e +sstc +RET=$? +eval kill -s SIGINT $SSTS_PID || echo "Couldn't kill server." +exit $RET diff --git a/svc_test/sstc/main.go b/svc_test/sstc/main.go new file mode 100644 index 0000000..f90af9a --- /dev/null +++ b/svc_test/sstc/main.go @@ -0,0 +1,92 @@ +package main + +import ( + "flag" + "fmt" + "log" + "os" + "time" + + "github.com/tinylib/synapse" +) + +var ( + port = flag.String("port", ":7005", "tcp port to dial") +) + +func fatalln(str string) { + fmt.Println(str) + os.Exit(1) +} + +func perror(str string, err error) { + fmt.Println(str, err) + os.Exit(1) +} + +func dumpservices(srv string) { + fmt.Print("Known addresses for service ", srv) + sv := synapse.Services(srv) + if len(sv) == 0 { + fmt.Print(": None.\n") + return + } + fmt.Print("\n") + for _, s := range sv { + fmt.Println("\t", s) + } +} + +func init() { + synapse.ErrorLogger = log.New(os.Stderr, "syn-client-log: ", log.LstdFlags) +} + +func main() { + cl, err := synapse.Dial("tcp", *port, 25*time.Millisecond) + if err != nil { + perror("client: dial failure:", err) + } + fmt.Println("client: connected to service", cl.Service()) + + err = cl.Call(0, synapse.String("hello!"), nil) + if err != nil { + perror("client: call error:", err) + } + + // wait for service lists to synchronize + time.Sleep(50 * time.Millisecond) + + dumpservices(cl.Service()) + + ss := synapse.Nearest(cl.Service()) + if ss == nil { + fatalln("client: Nearest() returned nil") + } + + nwk, sock := ss.Addr() + if nwk != "unix" { + fatalln("client: Nearest(service).Addr() didn't return a unix socket") + } + + fmt.Println("client: found socket", sock) + var cl2 *synapse.Client + + cl2, err = synapse.Dial(nwk, sock, 25*time.Millisecond) + if err != nil { + perror("client: couldn't dial socket:", err) + } + // give the second client time + // to complete another round of + // list synchronization. + time.Sleep(50 * time.Millisecond) + err = cl2.Close() + if err != nil { + perror("client: close error:", err) + } + + err = cl.Close() + if err != nil { + perror("client: close error:", err) + } + fmt.Println("client: exiting successfully") +} diff --git a/svc_test/ssts/main.go b/svc_test/ssts/main.go new file mode 100644 index 0000000..a5b35d7 --- /dev/null +++ b/svc_test/ssts/main.go @@ -0,0 +1,80 @@ +package main + +import ( + "flag" + "fmt" + "log" + "net" + "os" + "os/signal" + "strings" + + "github.com/tinylib/msgp/msgp" + "github.com/tinylib/synapse" +) + +var ( + sock = flag.String("sock", "ssvc_sock", "unix socket to listen on") + port = flag.String("port", ":7005", "tcp port to dial") +) + +func fatalln(str string) { + fmt.Println(str) + os.Exit(1) +} + +func perror(str string, err error) { + fmt.Println(str, err) + os.Exit(1) +} + +type echoHandler struct{} + +func (e echoHandler) ServeCall(req synapse.Request, res synapse.ResponseWriter) { + var r msgp.Raw + if err := req.Decode(&r); err != nil { + perror("server: error on Request.Decode():", err) + } + if err := res.Send(r); err != nil { + perror("server: error on ResponseWriter.Send():", err) + } + + sv := synapse.Services("service-test") + for _, s := range sv { + fmt.Println("serrver: known service:", s) + } +} + +func init() { + synapse.ErrorLogger = log.New(os.Stderr, "syn-server-log: ", log.LstdFlags) +} + +func main() { + go func() { + err := synapse.ListenAndServe("tcp", *port, "service-test", echoHandler{}) + if err != nil { + perror("server: listen tcp error:", err) + } + }() + ul, err := net.Listen("unix", *sock) + if err != nil { + perror("server: listen unix error:", err) + } + + // cleanup unix socket + // on kill so we don't + // leave it lying around + go func() { + in := make(chan os.Signal, 1) + signal.Notify(in, os.Interrupt) + <-in + ul.Close() + fmt.Println("server: exiting successfully") + os.Exit(0) + }() + + err = synapse.Serve(ul, "service-test", echoHandler{}) + if err != nil && !strings.Contains(err.Error(), "closed") { + perror("server: serve unix error:", err) + } +}