Skip to content

Commit d2b328a

Browse files
committed
Initial commit
0 parents  commit d2b328a

File tree

112 files changed

+27556
-0
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

112 files changed

+27556
-0
lines changed

.DS_Store

6 KB
Binary file not shown.

.gitattributes

+2
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
# Auto detect text files and perform LF normalization
2+
* text=auto

Dockerfile

+31
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
FROM golang:1.18 AS builder
2+
3+
RUN apk update
4+
RUN apk add git
5+
6+
WORKDIR /app
7+
8+
COPY go.mod ./
9+
COPY go.sum ./
10+
RUN go mod download
11+
12+
COPY . ./
13+
14+
RUN go build -o /app/comqtt ./cmd/single
15+
16+
17+
FROM alpine
18+
19+
WORKDIR /
20+
COPY --from=builder /app/comqtt .
21+
22+
# tcp
23+
EXPOSE 1883
24+
25+
# websockets
26+
EXPOSE 1882
27+
28+
# dashboard
29+
EXPOSE 8080
30+
31+
ENTRYPOINT [ "/comqtt" ]

LICENSE.md

+22
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
2+
The MIT License (MIT)
3+
4+
Copyright (c) 2021 Wind (comqtt)
5+
6+
Permission is hereby granted, free of charge, to any person obtaining a copy
7+
of this software and associated documentation files (the "Software"), to deal
8+
in the Software without restriction, including without limitation the rights
9+
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10+
copies of the Software, and to permit persons to whom the Software is
11+
furnished to do so, subject to the following conditions:
12+
13+
The above copyright notice and this permission notice shall be included in all
14+
copies or substantial portions of the Software.
15+
16+
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17+
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18+
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19+
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20+
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21+
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22+
SOFTWARE.

README.md

+324
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,324 @@
1+
2+
<p align="center">
3+
4+
[![Build Status](https://travis-ci.com/wind-c/comqtt.svg?token=59nqixhtefy2iQRwsPcu&branch=master)](https://travis-ci.com/wind-c/comqtt)
5+
[![contributions welcome](https://img.shields.io/badge/contributions-welcome-brightgreen.svg?style=flat)](https://github.com/wind-c/comqtt/issues)
6+
[![codecov](https://codecov.io/gh/wind-c/comqtt/branch/master/graph/badge.svg?token=6vBUgYVaVB)](https://codecov.io/gh/wind-c/comqtt)
7+
[![GoDoc](https://godoc.org/github.com/wind-c/comqtt?status.svg)](https://pkg.go.dev/github.com/wind-c/comqtt)
8+
9+
</p>
10+
11+
# CoMQTT
12+
### A High-performance MQTT server in Go (v3.0 | v3.1.1 | v5.0) and support the cluster
13+
14+
CoMQTT is an embeddable high-performance MQTT broker server written in Go, and compliant with the MQTT v3.0 and v3.1.1 and v5.0 specification for the development of IoT and smarthome projects. The server can be used either as a standalone binary or embedded as a library in your own projects. CoMQTT message throughput is comparable with everyone's favourites such as Mosquitto, Mosca, and VerneMQ.
15+
16+
CoMQTT is based on mochi-co/mqtt, its a very nice code base, thank you so much Mochi!
17+
18+
> #### 📦 💬 See Github Discussions for discussions about releases
19+
> Ongoing discussion about current and future releases can be found at https://github.com/wind-c/comqtt/discussions
20+
21+
#### What is MQTT?
22+
MQTT stands for MQ Telemetry Transport. It is a publish/subscribe, extremely simple and lightweight messaging protocol, designed for constrained devices and low-bandwidth, high-latency or unreliable networks. [Learn more](https://mqtt.org/faq)
23+
24+
#### CoMQTT Features
25+
- Paho MQTT 3.0 / 3.1.1 / 5.0 compatible.
26+
- Full MQTT Feature-set (QoS, Retained, $SYS).
27+
- Trie-based Subscription model.
28+
- Ring Buffer packet codec.
29+
- TCP, Websocket, (including SSL/TLS) and Dashboard listeners.
30+
- Interfaces for Client Authentication and Topic access control.
31+
- Bolt persistence and storage interfaces (see examples folder).
32+
- Directly Publishing from embedding service (`s.Publish(topic, message, retain)`).
33+
- Basic Event Hooks (`OnMessage`, `onSubscribe`, `onUnsubscribe`, `OnConnect`, `OnDisconnect`, `onProcessMessage`, `OnError`, `OnStorage`).
34+
- ARM32 Compatible.
35+
- [Cluster Supported](cmd/cluster/README.md).
36+
37+
#### Roadmap
38+
- Auth plugin
39+
- Rule engine
40+
- Bridge
41+
- CoAP
42+
43+
#### Using the Broker
44+
CoMQTT can be used as a standalone broker. Simply checkout this repository and run the `main.go` entrypoint in the `cmd` folder which will expose tcp (:1883), websocket (:1882), and dashboard (:8080) listeners. A docker image is coming soon.
45+
46+
```
47+
cd cmd
48+
go build -o commqtt && ./comqtt
49+
```
50+
51+
#### Using Docker
52+
53+
A simple Dockerfile is provided for running the `cmd/single/main.go` Websocket, TCP, and Stats server:
54+
55+
```sh
56+
docker build -t comqtt:latest .
57+
docker run -p 1883:1883 -p 1882:1882 -p 8080:8080 comqtt:latest
58+
```
59+
60+
#### Quick Start
61+
62+
``` go
63+
import (
64+
mqtt "github.com/wind-c/comqtt/server"
65+
)
66+
67+
func main() {
68+
// Create the new MQTT Server.
69+
server := mqtt.NewServer(nil)
70+
71+
// Create a TCP listener on a standard port.
72+
tcp := listeners.NewTCP("t1", ":1883")
73+
74+
// Add the listener to the server with default options (nil).
75+
err := server.AddListener(tcp, nil)
76+
if err != nil {
77+
log.Fatal(err)
78+
}
79+
80+
// Start the broker. Serve() is blocking - see examples folder
81+
// for usage ideas.
82+
err = server.Serve()
83+
if err != nil {
84+
log.Fatal(err)
85+
}
86+
}
87+
```
88+
89+
Examples of running the broker with various configurations can be found in the `examples` folder.
90+
91+
#### Network Listeners
92+
The server comes with a variety of pre-packaged network listeners which allow the broker to accept connections on different protocols. The current listeners are:
93+
- `listeners.NewTCP(id, address string)` - A TCP Listener, taking a unique ID and a network address to bind.
94+
- `listeners.NewWebsocket(id, address string)` A Websocket Listener
95+
- `listeners.NewHTTPStats()` An HTTP $SYS info dashboard
96+
97+
##### Configuring Network Listeners
98+
When a listener is added to the server using `server.AddListener`, a `*listeners.Config` may be passed as the second argument.
99+
100+
##### Authentication and ACL
101+
Authentication and ACL may be configured on a per-listener basis by providing an Auth Controller to the listener configuration. Custom Auth Controllers should satisfy the `auth.Controller` interface found in `listeners/auth`. Two default controllers are provided, `auth.Allow`, which allows all traffic, and `auth.Disallow`, which denies all traffic.
102+
103+
```go
104+
err := server.AddListener(tcp, &listeners.Config{
105+
Auth: new(auth.Allow),
106+
})
107+
```
108+
109+
> If no auth controller is provided in the listener configuration, the server will default to _Disallowing_ all traffic to prevent unintentional security issues.
110+
111+
##### SSL
112+
SSL may be configured on both the TCP and Websocket listeners by providing a public-private PEM key pair to the listener configuration as `[]byte` slices.
113+
```go
114+
err := server.AddListener(tcp, &listeners.Config{
115+
Auth: new(auth.Allow),
116+
TLS: &listeners.TLS{
117+
Certificate: publicCertificate,
118+
PrivateKey: privateKey,
119+
},
120+
})
121+
```
122+
> Note the mandatory inclusion of the Auth Controller!
123+
124+
#### Event Hooks
125+
Some basic Event Hooks have been added, allowing you to call your own functions when certain events occur. The execution of the functions are blocking - if necessary, please handle goroutines within the embedding service.
126+
127+
Working examples can be found in the `examples/events` folder. Please open an issue if there is a particular event hook you are interested in!
128+
129+
##### OnConnect
130+
`server.Events.OnConnect` is called when a client successfully connects to the broker. The method receives the connect packet and the id and connection type for the client who connected.
131+
132+
```go
133+
import "github.com/wind-c/comqtt/server/events"
134+
135+
server.Events.OnMessage = func(cl events.Client, pk events.Packet) (pkx events.Packet, err error) {
136+
fmt.Printf("<< OnConnect client connected %s: %+v\n", cl.ID, pk)
137+
}
138+
```
139+
140+
##### OnDisconnect
141+
`server.Events.OnDisconnect` is called when a client disconnects to the broker. If the client disconnected abnormally, the reason is indicated in the `err` error parameter.
142+
143+
```go
144+
server.Events.OnDisconnect = func(cl events.Client, err error) {
145+
fmt.Printf("<< OnDisconnect client disconnected %s: %v\n", cl.ID, err)
146+
}
147+
```
148+
149+
##### OnSubscribe
150+
`server.Events.OnSubscribe` is called when a client subscribes to a new topic filter.
151+
152+
```go
153+
server.Events.OnSubscribe = func(filter string, cl events.Client, qos byte) {
154+
fmt.Printf("<< OnSubscribe client subscribed %s: %s %v\n", cl.ID, filter, qos)
155+
}
156+
```
157+
158+
##### OnUnsubscribe
159+
`server.Events.OnUnsubscribe` is called when a client unsubscribes from a topic filter.
160+
161+
```go
162+
server.Events.OnUnsubscribe = func(filter string, cl events.Client) {
163+
fmt.Printf("<< OnUnsubscribe client unsubscribed %s: %s\n", cl.ID, filter)
164+
}
165+
```
166+
167+
##### OnMessage
168+
`server.Events.OnMessage` is called when a Publish packet (message) is received. The method receives the published message and information about the client who published it.
169+
170+
> This hook is only triggered when a message is received by clients. It is not triggered when using the direct `server.Publish` method.
171+
172+
173+
##### OnProcessMessage
174+
`server.Events.OnProcessMessage` is called before a publish packet (message) is processed. Specifically, the method callback is triggered after topic and ACL validation has occurred, but before the headers and payload are processed. You can use this if you want to programmatically change the data of the packet, such as setting it to retain, or altering the QoS flag.
175+
176+
If an error is returned, the packet will not be modified. and the existing packet will be used. If this is an unwanted outcome, the `mqtt.ErrRejectPacket` error can be returned from the callback, and the packet will be dropped/ignored, any further processing is abandoned.
177+
178+
> This hook is only triggered when a message is received by clients. It is not triggered when using the direct `server.Publish` method.
179+
180+
```go
181+
import "github.com/wind-c/comqtt/server/events"
182+
183+
server.Events.OnMessage = func(cl events.Client, pk events.Packet) (pkx events.Packet, err error) {
184+
if string(pk.Payload) == "hello" {
185+
pkx = pk
186+
pkx.Payload = []byte("hello world")
187+
return pkx, nil
188+
}
189+
190+
return pk, nil
191+
}
192+
```
193+
194+
The OnMessage hook can also be used to selectively only deliver messages to one or more clients based on their id, using the `AllowClients []string` field on the packet structure.
195+
196+
197+
198+
##### OnError
199+
`server.Events.OnError` is called when an error is encountered on the server, particularly within the use of a client connection status.
200+
201+
##### OnStorage
202+
`server.Events.OnStorage` is like `onError`, but receives the output of persistent storage methods.
203+
204+
205+
#### Server Options
206+
A few options can be passed to the `mqtt.NewServer(opts *Options)` function in order to override the default broker configuration. Currently these options are:
207+
208+
209+
- BufferSize (default 1024 * 256 bytes) - The default value is sufficient for most messaging sizes, but if you are sending many kilobytes of data (such as images), you should increase this to a value of (n*s) where is the typical size of your message and n is the number of messages you may have backlogged for a client at any given time.
210+
- BufferBlockSize (default 1024 * 8) - The minimum size in which R/W data will be allocated. If you are expecting only tiny or large payloads, you can alter this accordingly.
211+
212+
Any options which is not set or is `0` will use default values.
213+
214+
```go
215+
opts := &mqtt.Options{
216+
BufferSize: 512 * 1024,
217+
BufferBlockSize: 16 * 1024,
218+
}
219+
220+
s := mqtt.NewServer(opts)
221+
```
222+
223+
> See `examples/tcp/main.go` for an example implementation.
224+
225+
#### Direct Publishing
226+
When the broker is being embedded in a larger codebase, it can be useful to be able to publish messages directly to clients without having to implement a loopback TCP connection with an MQTT client. The `Publish` method allows you to inject publish messages directly into a queue to be delivered to any clients with matching topic filters. The `Retain` flag is supported.
227+
228+
```go
229+
// func (s *Server) Publish(topic string, payload []byte, retain bool) error
230+
err := s.Publish("a/b/c", []byte("hello"), false)
231+
if err != nil {
232+
log.Fatal(err)
233+
}
234+
```
235+
236+
A working example can be found in the `examples/events` folder.
237+
238+
#### Data Persistence
239+
CoMQTT provides a `persistence.Store` interface for developing and attaching persistent stores to the broker. The default persistence mechanism packaged with the broker is backed by [Bolt](https://github.com/etcd-io/bbolt) and can be enabled by assigning a `*bolt.Store` to the server.
240+
```go
241+
// import "github.com/wind-c/comqtt/server/persistence/bolt"
242+
err = server.AddStore(bolt.New("comqtt.db", nil))
243+
if err != nil {
244+
log.Fatal(err)
245+
}
246+
```
247+
> Persistence is on-demand (not flushed) and will potentially reduce throughput when compared to the standard in-memory store. Only use it if you need to maintain state through restarts.
248+
249+
#### Paho Interoperability Test
250+
You can check the broker against the [Paho Interoperability Test](https://github.com/eclipse/paho.mqtt.testing/tree/master/interoperability) by starting the broker using `examples/paho/main.go`, and then running the test with `python3 client_test.py` from the _interoperability_ folder.
251+
252+
253+
#### Performance (messages/second)
254+
Performance benchmarks were tested using [MQTT-Stresser](https://github.com/inovex/mqtt-stresser) on a 13-inch, Early 2015 Macbook Pro (2.7 GHz Intel Core i5). Taking into account bursts of high and low throughput, the median scores are the most useful. Higher is better. SEND = Publish throughput, RECV = Subscribe throughput.
255+
256+
> As usual, any performance benchmarks should be taken with a pinch of salt, but are shown to demonstrate typical throughput compared to the other leading MQTT brokers.
257+
258+
**Single Client, 10,000 messages**
259+
_With only 1 client, there is no variation in throughput so the benchmark is reports the same number for high, low, and median._
260+
261+
![1 Client, 10,000 Messages](assets/benchmarkchart_1_10000.png "1 Client, 10,000 Messages")
262+
263+
`mqtt-stresser -broker tcp://localhost:1883 -num-clients=1 -num-messages=10000`
264+
265+
| | Comqtt | Mosquitto | EMQX | VerneMQ | Mosca |
266+
| :----------- |-------:| ----------: | -------: | --------: | --------:
267+
| SEND Max | 36505 | 30597 | 27202 | 32782 | 30125 |
268+
| SEND Min | 36505 | 30597 | 27202 | 32782 | 30125 |
269+
| SEND Median | 36505 | 30597 | 27202 |32782 | 30125 |
270+
| RECV Max | 152221 | 59130 | 7879 | 17551 | 9145 |
271+
| RECV Min | 152221 | 59130 | 7879 | 17551 | 9145 |
272+
| RECV Median | 152221 | 59130 | 7879 | 17551 | 9145 |
273+
274+
**10 Clients, 1,000 Messages**
275+
276+
![10 Clients, 1,000 Messages](assets/benchmarkchart_10_1000.png "10 Clients, 1,000 Messages")
277+
278+
`mqtt-stresser -broker tcp://localhost:1883 -num-clients=10 -num-messages=1000`
279+
280+
| | Comqtt | Mosquitto | EMQX | VerneMQ | Mosca |
281+
| :----------- | --------: | ----------: | -------: | --------: | --------:
282+
| SEND Max | 37193 | 15775 | 17455 | 34138 | 36575 |
283+
| SEND Min | 6529 | 6446 | 7714 | 8583 | 7383 |
284+
| SEND Median | 15127 | 7813 | 10305 | 9887 | 8169 |
285+
| RECV Max | 33535 | 3710 | 3022 | 4534 | 9411 |
286+
| RECV Min | 7484 | 2661 | 1689 | 2021 | 2275 |
287+
| RECV Median | 11427 | 3142 | 1831 | 2468 | 4692 |
288+
289+
**10 Clients, 10,000 Messages**
290+
291+
![10 Clients, 10000 Messages](assets/benchmarkchart_10_10000.png "10 Clients, 10000 Messages")
292+
293+
`mqtt-stresser -broker tcp://localhost:1883 -num-clients=10 -num-messages=10000`
294+
295+
| | Comqtt | Mosquitto | EMQX | VerneMQ | Mosca |
296+
| :----------- | --------: | ----------: | -------: | --------: | --------:
297+
| SEND Max | 13153 | 13270 | 12229 | 13025 | 38446 |
298+
| SEND Min | 8728 | 8513 | 8193 | 6483 | 3889 |
299+
| SEND Median | 9045 | 9532 | 9252 | 8031 | 9210 |
300+
| RECV Max | 20774 | 5052 | 2093 | 2071 | 43008 |
301+
| RECV Min | 10718 |3995 | 1531 | 1673 | 18764 |
302+
| RECV Median | 16339 | 4607 | 1620 | 1907 | 33524 |
303+
304+
**500 Clients, 100 Messages**
305+
306+
![500 Clients, 100 Messages](assets/benchmarkchart_500_100.png "500 Clients, 100 Messages")
307+
308+
`mqtt-stresser -broker tcp://localhost:1883 -num-clients=500 -num-messages=100`
309+
310+
| | Comqtt | Mosquitto | EMQX | VerneMQ | Mosca |
311+
| :----------- | --------: | ----------: | -------: | --------: | --------:
312+
| SEND Max | 70688 | 72686 | 71392 | 75336 | 73192 |
313+
| SEND Min | 1021 | 2577 | 1603 | 8417 | 2344 |
314+
| SEND Median | 49871 | 33076 | 33637 | 35200 | 31312 |
315+
| RECV Max | 116163 | 4215 | 3427 | 5484 | 10100 |
316+
| RECV Min | 1044 | 156 | 56 | 83 | 169 |
317+
| RECV Median | 24398 | 208 | 94 | 413 | 474 |
318+
319+
320+
## Contributions
321+
Contributions and feedback are both welcomed and encouraged! Open an [issue](https://github.com/wind-c/comqtt/issues) to report a bug, ask a question, or make a feature request.
322+
323+
324+

assets/.DS_Store

6 KB
Binary file not shown.

cmd/.DS_Store

6 KB
Binary file not shown.

0 commit comments

Comments
 (0)