Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BUG] Broadcast deadlocks when concurrency #59

Open
AlanTianx opened this issue Dec 6, 2021 · 2 comments
Open

[BUG] Broadcast deadlocks when concurrency #59

AlanTianx opened this issue Dec 6, 2021 · 2 comments
Assignees
Labels
bug Something isn't working

Comments

@AlanTianx
Copy link

AlanTianx commented Dec 6, 2021

Describe the bug
In a real application I found that calling the Broadcast() method can cause a deadlock.

To Reproduce
Steps to reproduce the behavior:

  1. When an nsConn disconnects, calling Server.Broadcast() from the websocket.OnNamespaceDisconnect or websocket.OnRoomLeft event handler blocks internally and never returns.
  2. code
package main

import (
	"fmt"
	gorilla "github.com/gorilla/websocket"
	"github.com/kataras/iris/v12"
	"github.com/kataras/iris/v12/websocket"
	"github.com/kataras/neffos"
	"net/http"
	"time"
)

// main is the server side of the deadlock reproduction. It builds a websocket
// server with a Gorilla upgrader, registers event handlers for the "test"
// namespace, mounts the server on GET /e and listens on 0.0.0.0:8090.
//
// The OnNamespaceDisconnect and OnRoomLeft handlers each keep two variants of
// the same broadcast: a direct call (commented out, reported to block) and a
// call deferred by 50ms through time.AfterFunc (active, reported to work).
func main() {
	namespace, room, event := "test", "ttt", "ch"
	chatServer := websocket.New(
		websocket.GorillaUpgrader(gorilla.Upgrader{
			ReadBufferSize:  1024,
			WriteBufferSize: 1024,
			// TODO: allows all CORS cross-origin requests; can be disabled in production.
			CheckOrigin: func(r *http.Request) bool {
				return true
			},
		}),
		websocket.WithTimeout{
			WriteTimeout: time.Second * 60,
			ReadTimeout:  time.Second * 60,
			Namespaces: websocket.Namespaces{
				namespace: websocket.Events{
					// Logs the connection ID and joins the room; the value
					// returned by JoinRoom is discarded.
					websocket.OnNamespaceConnected: func(nsConn *websocket.NSConn, msg websocket.Message) error {
						fmt.Println("OnNamespaceConnected", nsConn.Conn.ID())
						nsConn.JoinRoom(nil, room)
						return nil
					},
					// Logs the connection ID and schedules a "left the room"
					// broadcast to the room, passing nsConn as the first
					// Broadcast argument.
					websocket.OnNamespaceDisconnect: func(nsConn *websocket.NSConn, msg websocket.Message) error {
						fmt.Println("OnNamespaceDisconnect", nsConn.Conn.ID())
						// TODO: broadcasting directly here blocks in `server.start()`.
						//nsConn.Conn.Server().Broadcast(nsConn, neffos.Message{
						//	Body:      []byte("我离开了room" + nsConn.Conn.ID()),
						//	Namespace: namespace,
						//	Room:      room,
						//	To:        "",
						//	Event:     event,
						//})

						// TODO: with a small delay before broadcasting, everything works normally.
						time.AfterFunc(time.Millisecond*50, func() {
							nsConn.Conn.Server().Broadcast(nsConn, neffos.Message{
								Body:      []byte("我离开了room" + nsConn.Conn.ID()),
								Namespace: namespace,
								Room:      room,
								To:        "",
								Event:     event,
							})
						})

						return nil
					},
					// Logs the connection ID, emits one message to this
					// connection only, then broadcasts a "joined the room"
					// message with a nil first Broadcast argument.
					websocket.OnRoomJoined: func(nsConn *websocket.NSConn, msg websocket.Message) error {
						fmt.Println("OnRoomJoined", nsConn.Conn.ID())
						nsConn.Emit(event, []byte("我是单独消息"))
						nsConn.Conn.Server().Broadcast(nil, neffos.Message{
							Body:      []byte("我加入了room" + nsConn.Conn.ID()),
							Namespace: namespace,
							Room:      room,
							To:        "",
							Event:     event,
						})

						return nil
					},
					// Logs the connection ID and its closed state, then
					// schedules the same "left the room" broadcast as
					// OnNamespaceDisconnect.
					websocket.OnRoomLeft: func(nsConn *websocket.NSConn, msg websocket.Message) error {
						fmt.Println("OnRoomLeft", nsConn.Conn.ID(), nsConn.Conn.IsClosed())
						// TODO: broadcasting directly here blocks in `server.start()`.
						//nsConn.Conn.Server().Broadcast(nsConn, neffos.Message{
						//	Body:      []byte("我离开了room" + nsConn.Conn.ID()),
						//	Namespace: namespace,
						//	Room:      room,
						//	To:        "",
						//	Event:     event,
						//})

						// TODO: with a small delay before broadcasting, everything works normally.
						time.AfterFunc(time.Millisecond*50, func() {
							nsConn.Conn.Server().Broadcast(nsConn, neffos.Message{
								Body:      []byte("我离开了room" + nsConn.Conn.ID()),
								Namespace: namespace,
								Room:      room,
								To:        "",
								Event:     event,
							})
						})
						return nil
					},
				},
			},
		},
	)

	app := iris.New()
	// The callback returns the "id" URL parameter.
	// NOTE(review): presumably this value becomes the connection ID — confirm
	// against websocket.Handler.
	app.Get("/e", websocket.Handler(chatServer, func(ctx iris.Context) string {
		return ctx.URLParam("id")
	}))

	// TODO: enable this if messages must not be lost.
	chatServer.SyncBroadcaster = true

	// The value returned by Listen is discarded.
	app.Listen("0.0.0.0:8090")
}
@AlanTianx AlanTianx added the bug Something isn't working label Dec 6, 2021
@AlanTianx
Copy link
Author

  • test
package main

import (
	"context"
	"fmt"
	"github.com/kataras/neffos"
	"github.com/kataras/neffos/gorilla"
	"strconv"
	"testing"
	"time"
)

// handler holds the client-side event callbacks for the "test" namespace.
// Every callback logs the event it received and returns nil; the
// OnNamespaceConnected callback additionally emits an empty "ch" event.
var handler = neffos.WithTimeout{
	Namespaces: neffos.Namespaces{
		"test": neffos.Events{
			// Connected to the namespace: log the ID and emit an empty "ch" event.
			neffos.OnNamespaceConnected: func(ns *neffos.NSConn, _ neffos.Message) error {
				fmt.Println("成功链接:", ns.Conn.ID())
				ns.Emit("ch", []byte(""))
				return nil
			},
			// Disconnected from the namespace: log the ID.
			neffos.OnNamespaceDisconnect: func(ns *neffos.NSConn, _ neffos.Message) error {
				fmt.Println("断开链接:", ns.Conn.ID())
				return nil
			},
			// Room joined: log the incoming message.
			neffos.OnRoomJoined: func(_ *neffos.NSConn, in neffos.Message) error {
				fmt.Println("OnRoomJoined:---", in)
				return nil
			},
			// Room left: log the incoming message.
			neffos.OnRoomLeft: func(_ *neffos.NSConn, in neffos.Message) error {
				fmt.Println("OnRoomLeft:---", in)
				return nil
			},
			// Custom "ch" event: log the message body as text.
			"ch": func(_ *neffos.NSConn, in neffos.Message) error {
				fmt.Println("ch--------", string(in.Body))
				return nil
			},
		},
	},
}

// 1
func TestWs(t *testing.T) {
	// 直接使用大猩猩的dialer
	dialer := gorilla.DefaultDialer

	fmt.Println("-- Running...")

	go connect(dialer, "ws://127.0.0.1:8090/e?id=a")
	select {}
}

// 30
func TestA(t *testing.T) {
	dialer := gorilla.DefaultDialer

	fmt.Println("-- Running...")

	for i := 0; i < 30; i++ {
		go connect(dialer, "ws://127.0.0.1:8090/e?id="+strconv.Itoa(i))
	}
	select {}
}

// connect dials url with the given dialer (10s limit) using the package-level
// handler, then connects to the "test" namespace (25s limit).
//
// Dial and namespace-connect errors are printed and end the call. An empty
// client ID or an empty connection ID after a successful step causes a panic.
func connect(dialer neffos.Dialer, url string) {
	dialCtx, stopDial := context.WithTimeout(context.Background(), 10*time.Second)
	defer stopDial()

	fmt.Println(url)

	client, err := neffos.Dial(dialCtx, dialer, url, handler)
	if err != nil {
		fmt.Printf("connection failure: %v\n", err)
		return
	}
	if client.ID == "" {
		panic("CLIENT'S ID IS EMPTY.\n DIAL NOW SHOULD BLOCK UNTIL ID IS FILLED(ACK) AND UNTIL SERVER'S CONFIRMATION")
	}

	nsCtx, stopNS := context.WithTimeout(context.Background(), 25*time.Second)
	defer stopNS()

	nsConn, err := client.Connect(nsCtx, "test")
	if err != nil {
		fmt.Println("链接到namespace err:", err)
		return
	}
	if nsConn.Conn.ID() == "" {
		panic("CLIENT'S CONNECTION ID IS EMPTY.\nCONNECT SHOULD BLOCK UNTIL ID IS FILLED(ACK) AND UNTIL SERVER'S CONFIRMATION TO NAMESPACE CONNECTION")
	}
}

@AlanTianx
Copy link
Author

After a more in-depth investigation, I found that when a message is broadcast to a Conn that has just disconnected, the internal check *Conn.canWrite() waits on a lock, which causes the broadcast to block indefinitely.

image

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working
Projects
None yet
Development

No branches or pull requests

2 participants