Skip to content

Commit 3e0df56

Browse files
committed
feat: epoll managered by runtime netpoller
1 parent bb9c3f7 commit 3e0df56

File tree

5 files changed

+130
-10
lines changed

5 files changed

+130
-10
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
module github.com/cloudwego/netpoll
22

3-
go 1.15
3+
go 1.18
44

55
require (
66
github.com/bytedance/gopkg v0.0.0-20220413063733-65bf48ffb3a7

poll_default_bsd.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,17 @@ import (
2525
"unsafe"
2626
)
2727

28+
func openPollFile() (int, error) {
29+
return syscall.Kqueue()
30+
}
31+
2832
func openPoll() (Poll, error) {
2933
return openDefaultPoll()
3034
}
3135

3236
func openDefaultPoll() (*defaultPoll, error) {
3337
l := new(defaultPoll)
34-
p, err := syscall.Kqueue()
38+
p, err := openPollFile()
3539
if err != nil {
3640
return nil, err
3741
}

poll_default_linux.go

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,17 @@ package netpoll
1616

1717
import (
1818
"errors"
19-
"runtime"
19+
"fmt"
2020
"sync"
2121
"sync/atomic"
2222
"syscall"
2323
"unsafe"
2424
)
2525

26+
func openPollFile() (int, error) {
27+
return EpollCreate(0)
28+
}
29+
2630
func openPoll() (Poll, error) {
2731
return openDefaultPoll()
2832
}
@@ -31,11 +35,16 @@ func openDefaultPoll() (*defaultPoll, error) {
3135
var poll = new(defaultPoll)
3236

3337
poll.buf = make([]byte, 8)
34-
var p, err = EpollCreate(0)
38+
var p, err = openPollFile()
3539
if err != nil {
3640
return nil, err
3741
}
3842
poll.fd = p
43+
pd, errno := runtime_pollOpen(uintptr(poll.fd))
44+
if errno != 0 {
45+
return nil, Exception(ErrUnsupported, fmt.Sprintf("when poll open: errno=%d", errno))
46+
}
47+
poll.pd = pd
3948

4049
var r0, _, e0 = syscall.Syscall(syscall.SYS_EVENTFD2, 0, 0, 0)
4150
if e0 != 0 {
@@ -60,6 +69,7 @@ func openDefaultPoll() (*defaultPoll, error) {
6069
type defaultPoll struct {
6170
pollArgs
6271
fd int // epoll fd
72+
pd uintptr // runtime pd for epoll fd
6373
wop *FDOperator // eventfd, wake epoll_wait
6474
buf []byte // read wfd trigger msg
6575
trigger uint32 // trigger flag
@@ -90,23 +100,28 @@ func (a *pollArgs) reset(size, caps int) {
90100
// Wait implements Poll.
91101
func (p *defaultPoll) Wait() (err error) {
92102
// init
93-
var caps, msec, n = barriercap, -1, 0
103+
var caps, n = barriercap, 0
94104
p.Reset(128, caps)
95105
// wait
96106
for {
97107
if n == p.size && p.size < 128*1024 {
98108
p.Reset(p.size<<1, caps)
99109
}
100-
n, err = EpollWait(p.fd, p.events, msec)
110+
n, err = EpollWait(p.fd, p.events, 0)
101111
if err != nil && err != syscall.EINTR {
102112
return err
103113
}
104-
if n <= 0 {
105-
msec = -1
106-
runtime.Gosched()
114+
if n == 0 {
115+
errno := runtime_pollReset(p.pd, 'r')
116+
if errno != 0 {
117+
return Exception(ErrUnsupported, fmt.Sprintf("when poll reset: errno=%d", errno))
118+
}
119+
errno = runtime_pollWait(p.pd, 'r')
120+
if errno != 0 {
121+
return Exception(ErrUnsupported, fmt.Sprintf("when poll wait: errno=%d", errno))
122+
}
107123
continue
108124
}
109-
msec = 0
110125
if p.Handler(p.events[:n]) {
111126
return nil
112127
}

runtime.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
// Copyright 2024 CloudWeGo Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package netpoll
16+
17+
import (
18+
_ "unsafe"
19+
)
20+
21+
//go:linkname runtime_pollOpen internal/poll.runtime_pollOpen
22+
func runtime_pollOpen(fd uintptr) (pd uintptr, errno int)
23+
24+
//go:linkname runtime_pollWait internal/poll.runtime_pollWait
25+
func runtime_pollWait(pd uintptr, mode int) (errno int)
26+
27+
//go:linkname runtime_pollReset internal/poll.runtime_pollReset
28+
func runtime_pollReset(pd uintptr, mode int) (errno int)

runtime_linux_test.go

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
// Copyright 2024 CloudWeGo Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
//go:build linux || loong64
16+
// +build linux loong64
17+
18+
package netpoll
19+
20+
import (
21+
"syscall"
22+
"testing"
23+
"time"
24+
)
25+
26+
func TestRuntimeNetpoller(t *testing.T) {
27+
pfd, err := openPollFile()
28+
MustNil(t, err)
29+
30+
pd, errno := runtime_pollOpen(uintptr(pfd))
31+
Assert(t, errno == 0, errno)
32+
t.Logf("poll open success: pd=%d", pd)
33+
34+
var rfd, wfd = GetSysFdPairs()
35+
36+
eventin := &epollevent{
37+
events: syscall.EPOLLIN | syscall.EPOLLRDHUP | syscall.EPOLLERR,
38+
data: [8]byte{0, 0, 0, 0, 0, 0, 0, 1},
39+
}
40+
err = EpollCtl(pfd, syscall.EPOLL_CTL_ADD, rfd, eventin)
41+
MustNil(t, err)
42+
43+
go func() {
44+
time.Sleep(time.Millisecond * 100)
45+
46+
iovec := [1]syscall.Iovec{}
47+
buf := []byte("hello")
48+
n, err := writev(wfd, [][]byte{buf}, iovec[:])
49+
MustNil(t, err)
50+
Equal(t, n, 5)
51+
t.Logf("poll read success: %s", string(buf[:n]))
52+
}()
53+
54+
begin := time.Now()
55+
errno = runtime_pollWait(pd, 'r'+'w')
56+
Assert(t, errno == 0, errno)
57+
cost := time.Since(begin)
58+
Assert(t, cost.Milliseconds() >= 100)
59+
60+
events := make([]epollevent, 1)
61+
n, err := EpollWait(pfd, events, 0)
62+
MustNil(t, err)
63+
Equal(t, n, 1)
64+
t.Logf("poll wait success")
65+
66+
iovec := [1]syscall.Iovec{}
67+
buf := make([]byte, 1024)
68+
bs := [1][]byte{buf}
69+
n, err = readv(rfd, bs[:], iovec[:])
70+
MustNil(t, err)
71+
Equal(t, n, 5)
72+
t.Logf("poll read success: %s", string(buf[:n]))
73+
}

0 commit comments

Comments
 (0)