Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add http echo example #23

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
162 changes: 162 additions & 0 deletions example/g_lockOsThread/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
package main

import (
"errors"
"fmt"
"github.com/mailru/easygo/netpoll"
"io"
"net"
"os"
"os/signal"
"runtime"
"syscall"
"time"
)

// N_READY_CONNECTIONS n worker thread
var N_READY_CONNECTIONS = (runtime.NumCPU() - 1) * 8

// ready ok connection channel
var readyConn = make(chan net.Conn,N_READY_CONNECTIONS)


func main() {
poller, err := netpoll.New(nil)
if err != nil {
panic(err)
}
listener, err := net.Listen("tcp", "0.0.0.0:8080")
if err != nil {
panic(err)
}
acceptDesc, err := netpoll.HandleListener(listener, netpoll.EPOLLIN)
if err != nil {
panic(err)
}
cancel := initHandleGoroutine()
// add accept goroutine cancel
acceptCancel := make(chan struct{})
cancel = append(cancel,acceptCancel)
// signal handle
signalChan := make(chan os.Signal,1)
signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP)
go func() {
<-signalChan
for _,v := range cancel{
v <- struct{}{}
}
// close readyConn chan write pipe
close(readyConn)
// close acceptConn chan write pipe
close(acceptCancel)
// check old fd is handle ok?
for {
time.Sleep(time.Millisecond * 10)
if len(readyConn) == 0 && len(acceptCancel) == 0 {
os.Exit(0)
}
}
}()

acceptDone := make(chan struct{},255)
err = poller.Start(acceptDesc, func(event netpoll.Event) {
acceptDone <- struct{}{}
})
if err != nil {
panic(err)
}

go func() {
for {
select {
case <-acceptDone:
conn,err := listener.Accept()
if err != nil {
fmt.Println("accept error : ",err)
continue
}
readDesc, err := netpoll.HandleRead(conn)
if err != nil {
fmt.Println("handle read event error : ",err)
}
// on read
err = poller.Start(readDesc, func(event netpoll.Event) {
readyConn <- conn
})
if err != nil {
fmt.Println("poller read start error : ",err)
}
// on close
closeDesc, err := netpoll.Handle(conn,netpoll.EPOLLRDHUP)
err = poller.Start(closeDesc, func(event netpoll.Event) {
err := poller.Stop(readDesc)
if err != nil {
fmt.Println("poller read stop err : ", err)
}
err = poller.Stop(closeDesc)
if err != nil {
fmt.Println("poller close stop err : ", err)
}
err = readDesc.Close()
if err != nil {
fmt.Println("read desc close conn err : ", err)
}
err = closeDesc.Close()
if err != nil {
fmt.Println("close desc close conn err : ", err)
}
})
if err != nil {
fmt.Println("poller close start error : ",err)
}
case <-acceptCancel:
return
}
}
}()

// hang
select {}
}

func initHandleGoroutine() []chan struct{} {
cancel := make([]chan struct{},0,N_READY_CONNECTIONS/8)
for i := 0; i < N_READY_CONNECTIONS / 8; i++ {
nCancel := make(chan struct{})
cancel = append(cancel,nCancel)
go func() {
runtime.LockOSThread()
for {
select {
case conn := <- readyConn:
buffer := make([]byte,256)
_, err := conn.Read(buffer)
if err == io.EOF || errors.Is(err,net.ErrClosed) {
continue
}
if err != nil {
fmt.Println("read err : ",err)
continue
}
// reset buffer len
buffer = buffer[:0]
buffer = append(buffer, "HTTP/1.1 200 OK\r\nServer: easygo\r\nContent-Type: text/plain\r\nDate: "...)
buffer = time.Now().AppendFormat(buffer, "Mon, 02 Jan 2006 15:04:05 GMT")
buffer = append(buffer, "\r\nContent-Length: 12\r\n\r\nHello World!"...)
// write
_, err = conn.Write(buffer)
if err == io.EOF || errors.Is(err,net.ErrClosed) {
continue
}
if err != nil {
fmt.Println("write err : ",err)
continue
}
case <- nCancel:
return
}
}
}()
}
return cancel
}