-
Notifications
You must be signed in to change notification settings - Fork 44
/
Copy pathpoller.go
117 lines (93 loc) · 1.94 KB
/
poller.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
// +build linux
package iouring
import (
"os"
"sync"
"golang.org/x/sys/unix"
)
// initEpollEvents is the initial length of the epoll event buffer allocated
// when the poller is created.
const initEpollEvents = 1
// iourPoller multiplexes the eventfds of registered rings over a single
// epoll instance and notifies each ring when its eventfd becomes readable.
type iourPoller struct {
	// Mutex guards iours.
	sync.Mutex

	// fd is the epoll file descriptor.
	fd int

	// iours maps a ring's eventfd to the ring itself.
	iours map[int]*IOURing

	// events is the buffer handed to epoll_wait; it is only touched by the
	// run loop (and adjust, which run calls).
	events []unix.EpollEvent
}
var (
	// poller is the package-level poller; it is nil until initpoller
	// succeeds.
	poller *iourPoller

	// initpollerLock serializes creation of poller.
	initpollerLock sync.Mutex
)
func initpoller() error {
// fast path
if poller != nil {
return nil
}
initpollerLock.Lock()
defer initpollerLock.Unlock()
if poller != nil {
return nil
}
epfd, err := unix.EpollCreate1(unix.EPOLL_CLOEXEC)
if err != nil {
return os.NewSyscallError("epoll_create1", err)
}
poller = &iourPoller{
fd: epfd,
iours: make(map[int]*IOURing),
events: make([]unix.EpollEvent, initEpollEvents),
}
go poller.run()
return nil
}
// registerIOURing makes sure the package-level poller exists and registers
// iour's eventfd with it (EPOLLIN, edge-triggered), so the poller's run loop
// can signal iour.cqeSign when the eventfd becomes readable.
//
// It returns the error from initpoller, or an *os.SyscallError for
// epoll_ctl(EPOLL_CTL_ADD). On failure the poller's ring table is left as it
// was before the call.
func registerIOURing(iour *IOURing) error {
	if err := initpoller(); err != nil {
		return err
	}

	// Publish the ring in the table BEFORE arming epoll. With an
	// edge-triggered registration the run loop may see an event as soon as
	// EPOLL_CTL_ADD returns; if the ring were not in the table yet, that
	// event would be looked up, not found, and silently dropped.
	poller.Lock()
	prev, hadPrev := poller.iours[iour.eventfd]
	poller.iours[iour.eventfd] = iour
	poller.Unlock()

	if err := unix.EpollCtl(poller.fd, unix.EPOLL_CTL_ADD, iour.eventfd,
		&unix.EpollEvent{Fd: int32(iour.eventfd), Events: unix.EPOLLIN | unix.EPOLLET},
	); err != nil {
		// Roll back: restore whatever entry (if any) was there before so a
		// failed registration does not disturb an existing one.
		poller.Lock()
		if hadPrev {
			poller.iours[iour.eventfd] = prev
		} else {
			delete(poller.iours, iour.eventfd)
		}
		poller.Unlock()
		return os.NewSyscallError("epoll_ctl_add", err)
	}

	return nil
}
// removeIOURing drops iour from the poller's ring table and removes its
// eventfd from the epoll set.
//
// It returns nil when no poller has been created (there is nothing to
// remove), nil when epoll_ctl succeeds, and an *os.SyscallError for
// epoll_ctl(EPOLL_CTL_DEL) otherwise.
func removeIOURing(iour *IOURing) error {
	// The package-level poller is nil until initpoller succeeds. Guard
	// against that instead of panicking on a nil dereference, and read the
	// pointer under the same lock that protects its assignment.
	initpollerLock.Lock()
	p := poller
	initpollerLock.Unlock()
	if p == nil {
		return nil
	}

	p.Lock()
	delete(p.iours, iour.eventfd)
	p.Unlock()

	// os.NewSyscallError returns nil when the wrapped error is nil.
	return os.NewSyscallError("epoll_ctl_del",
		unix.EpollCtl(p.fd, unix.EPOLL_CTL_DEL, iour.eventfd, nil))
}
// run is the poller's event loop. It blocks in epoll_wait, and for every
// ready eventfd that maps to a registered ring it performs a non-blocking
// send on that ring's cqeSign channel. After each batch it calls adjust to
// grow the event buffer if needed. It never returns.
func (poller *iourPoller) run() {
	for {
		ready, err := unix.EpollWait(poller.fd, poller.events, -1)
		if err != nil {
			// Errors are not reported anywhere; the wait is just retried.
			// NOTE(review): a persistent error makes this loop spin.
			continue
		}

		for _, ev := range poller.events[:ready] {
			poller.Lock()
			ring, found := poller.iours[int(ev.Fd)]
			poller.Unlock()

			if found {
				// Non-blocking notify: if the channel cannot accept the
				// signal right now, it is dropped.
				select {
				case ring.cqeSign <- struct{}{}:
				default:
				}
			}
		}

		poller.adjust()
	}
}
// adjust grows the epoll event buffer when more rings are registered than
// the buffer has slots. The buffer is extended by twice the shortfall and is
// never shrunk.
func (poller *iourPoller) adjust() {
	poller.Lock()
	shortfall := len(poller.iours) - len(poller.events)
	poller.Unlock()

	if shortfall > 0 {
		extra := make([]unix.EpollEvent, shortfall*2)
		poller.events = append(poller.events, extra...)
	}
}