-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathstream_messenger.go
155 lines (139 loc) · 3.85 KB
/
stream_messenger.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
// Copyright 2017 Grigory Zubankov. All rights reserved.
// Use of this source code is governed by a MIT license
// that can be found in the LICENSE file.
//
// +build linux darwin
package zerodt
import (
"encoding/binary"
"encoding/json"
"errors"
"io"
"net"
"os"
"time"
)
// StreamMessenger is a simple messenger that exchanges framed messages
// over a net.Conn.
//
// The simplest way to create a connected pair of messengers is to use
// syscall.Socketpair():
//
//	fds, err := syscall.Socketpair(syscall.AF_UNIX, syscall.SOCK_STREAM, 0)
//	f0 := os.NewFile(uintptr(fds[0]), "s|0")
//	f1 := os.NewFile(uintptr(fds[1]), "s|1")
//
//	m0, err := ListenSocket(f0)
//	m1, err := ListenSocket(f1)
//
// Packet format:
//
//	+-----------------------+---------+
//	| Header (8 bytes)      | Payload |
//	+-----------------------+---------+
//	| MagicN | Payload Size | Payload |
//	+-----------------------+---------+
//
// MagicN and Payload Size are 32-bit unsigned integers written in
// little-endian byte order. The payload is the JSON encoding of the
// value passed to Send.
type StreamMessenger struct {
// c is the connection that every read, write, deadline and close
// operation of the messenger is forwarded to.
c net.Conn
}
// ListenSocket wraps the socket behind the file s in a StreamMessenger.
//
// Despite its name it neither listens nor accepts: it only converts s
// into a net.Conn with net.FileConn. s is always closed before the
// function returns, on success and on failure alike. net.FileConn
// returns a copy of the connection, so the messenger stays usable after
// s has been closed.
//
// The error from net.FileConn is returned unchanged when s cannot be
// turned into a connection; the error from closing s is discarded.
func ListenSocket(s *os.File) (*StreamMessenger, error) {
	defer s.Close()

	conn, err := net.FileConn(s)
	if err != nil {
		return nil, err
	}

	m := &StreamMessenger{c: conn}
	return m, nil
}
// SetDeadline sets both the read and the write deadline of the
// underlying connection. It forwards t to the connection's SetDeadline
// method and returns that call's result.
//
// The semantics are those of net.Conn: t is an absolute point in time,
// it applies to pending as well as future I/O, and once it has passed
// I/O operations fail with a timeout instead of blocking until a later
// deadline is set. Repeatedly pushing the deadline forward after each
// successful operation yields an idle timeout.
//
// A zero value for t means I/O operations will not time out.
func (m *StreamMessenger) SetDeadline(t time.Time) error {
	conn := m.c
	return conn.SetDeadline(t)
}
// SetReadDeadline sets the read deadline of the underlying connection
// by forwarding t to the connection's SetReadDeadline method. Following
// net.Conn, it affects future reads and any read that is currently
// blocked.
//
// A zero value for t means reads will not time out.
func (m *StreamMessenger) SetReadDeadline(t time.Time) error {
	conn := m.c
	return conn.SetReadDeadline(t)
}
// SetWriteDeadline sets the write deadline of the underlying connection
// by forwarding t to the connection's SetWriteDeadline method. Following
// net.Conn, it affects future writes and any write that is currently
// blocked; a write that times out may still have transferred part of
// its data.
//
// A zero value for t means writes will not time out.
func (m *StreamMessenger) SetWriteDeadline(t time.Time) error {
	conn := m.c
	return conn.SetWriteDeadline(t)
}
// Recv reads the next framed message from the connection and decodes
// its JSON payload into v, which is handed to json.Unmarshal as is.
//
// It returns the error from reading the frame, or otherwise the result
// of json.Unmarshal.
func (m *StreamMessenger) Recv(v interface{}) (err error) {
	var payload []byte
	if payload, err = m.recv(); err != nil {
		return err
	}
	return json.Unmarshal(payload, v)
}
// Send encodes v as JSON and writes the result to the connection as a
// single framed message.
//
// It returns the json.Marshal error when v cannot be encoded, in which
// case nothing is written, and otherwise the result of writing the
// frame.
func (m *StreamMessenger) Send(v interface{}) error {
	payload, err := json.Marshal(v)
	if err == nil {
		err = m.send(payload)
	}
	return err
}
// recv reads one frame from the connection and returns its payload.
//
// The 8-byte header is decoded in little-endian order and rejected when
// isValidHeader reports false. After a valid header, exactly Size bytes
// are read with io.ReadFull, so a short read surfaces as an error
// instead of leaving the stream positioned in the middle of a frame.
//
// NOTE(review): Size comes straight from the peer and is not bounded
// before the buffer is allocated — confirm that every peer is trusted.
func (m *StreamMessenger) recv() ([]byte, error) {
	var hdr header
	if err := binary.Read(m.c, binary.LittleEndian, &hdr); err != nil {
		return nil, err
	}
	if !isValidHeader(hdr) {
		return nil, errors.New("StreamMessenger: the header is invalid")
	}

	// Consume the entire payload in one go to keep the stream aligned
	// on frame boundaries.
	payload := make([]byte, hdr.Size)
	if _, err := io.ReadFull(m.c, payload); err != nil {
		return nil, err
	}
	return payload, nil
}
// send writes data to the connection as one frame: the 8-byte header
// (magic number and payload size, both little-endian uint32) followed
// immediately by the payload.
//
// The payload size travels in a 32-bit field, so a payload that does
// not fit is rejected with an error before anything is written;
// converting the length blindly would wrap around and desynchronise the
// receiver.
//
// Header and payload are assembled into a single buffer and handed to
// the connection in one Write call rather than two. This costs one copy
// of the payload.
//
// It returns the error of the Write call, if any.
func (m *StreamMessenger) send(data []byte) error {
	const headerSize = 8

	if uint64(len(data)) > uint64(^uint32(0)) {
		return errors.New("StreamMessenger: the payload is too large")
	}

	h := newHeader(len(data))
	frame := make([]byte, headerSize+len(data))
	binary.LittleEndian.PutUint32(frame[0:4], h.Prefix)
	binary.LittleEndian.PutUint32(frame[4:8], h.Size)
	copy(frame[headerSize:], data)

	_, err := m.c.Write(frame)
	return err
}
// Close closes the underlying connection and returns the result of
// that call. As specified by net.Conn, Read or Write operations that
// are blocked at that moment are unblocked and return errors.
func (m *StreamMessenger) Close() error {
	err := m.c.Close()
	return err
}
// headerPrefix is the magic number stored in the Prefix field of every
// packet header; isValidHeader compares against it. Read from the most
// significant byte down, its bytes are the ASCII codes of "ZERO".
const headerPrefix uint32 = 0x5a45524f
// header is the fixed-size frame header that precedes every payload on
// the wire. It consists of two uint32 fields, 8 bytes in total, and is
// encoded with encoding/binary in little-endian byte order.
type header struct {
// Prefix is the magic number; a valid header carries headerPrefix.
Prefix uint32
// Size is the length in bytes of the payload that follows the header.
Size uint32
}
// newHeader returns the header announcing a payload of size bytes, with
// Prefix set to headerPrefix.
//
// size is converted to uint32 without a range check, so a value that
// does not fit into 32 bits wraps around.
func newHeader(size int) header {
	return header{
		Prefix: headerPrefix,
		Size:   uint32(size),
	}
}
// isValidHeader reports whether h starts with the expected magic
// number, i.e. whether h.Prefix equals headerPrefix. The Size field is
// not inspected.
func isValidHeader(h header) bool {
	magic := h.Prefix
	return magic == headerPrefix
}