Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Websocket ping pong timeout #866

Open
BigBoulard opened this issue Nov 14, 2023 · 3 comments
Open

Websocket ping pong timeout #866

BigBoulard opened this issue Nov 14, 2023 · 3 comments

Comments

@BigBoulard
Copy link

Hi guys,

I'm trying to build a chat app that looks like the below simplified version.

What the chat app is doing:

  • upgrades the connection to a WS connection
  • sets a read limit
  • sets a read deadline
  • sets a pong handler that will print PONG and then reset the read deadline
  • launches 2 goroutines for reading and writing
    • readMessages: reads any incoming message and close the program on any unexpected error
    • writeMessages:
      • reads the egress channel and writes the received payload back to the web socket
      • start a ticker that send a PING control frame to the WS
const (
	pingInterval = 2 * time.Second // interval at which we send a PING test
	pongWait     = 4 * time.Second // PONG timeout: we must receive PONG
)

type WSClient struct {
	wsconn *websocket.Conn
	egress chan []byte // writes comes to this egress chan cause WS conn are not thread-safe
}

type pongHandler func(string) error

func NewWSClient(c *gin.Context) error {
	// create a WSClient instance
	wsClient := &WSClient{
		egress: make(chan []byte),
	}

	// create Web Socket connection
	conn, err := createWS(c.Writer, c.Request, wsClient.pongHandler)
	if err != nil {
		return fmt.Errorf("app.NewWSClient: createWS error: %w", err)
	}
	wsClient.wsconn = conn

	// launch the read/write goroutines
	go wsClient.readMessages()
	go wsClient.writeMessages()

	return nil
}

// create the WS conn and configure
func createWS(w http.ResponseWriter, r *http.Request, pongHandler pongHandler) (*websocket.Conn, error) {
	conn, err := Upgrade(w, r)
	if err != nil {
		return nil, fmt.Errorf("createWS: upgrader.Upgrade error: %w", err)
	}

	conn.SetReadLimit(conf.MessageMaxSize)

	if err := conn.SetReadDeadline(time.Time{}); err != nil {
		return nil, fmt.Errorf("createWS: conn.SetReadDeadline error: %w", err)
	}

	if err := conn.SetReadDeadline(time.Now().Add(pongWait)); err != nil {
		return nil, fmt.Errorf("createWS: conn.SetReadDeadline error: %w", err)
	}

	conn.SetPongHandler(pongHandler)

	return conn, nil
}

// readMessages is run as a goroutine
func (c *WSClient) readMessages() {
	for {
		messageType, payload, err := c.wsconn.ReadMessage()
		if err != nil {
			log.Fatalf("wsconn.ReadMessage() error: %v", err)
			if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway) { // , websocket.CloseAbnormalClosure) {
				log.Fatalf("websocket.IsUnexpectedCloseError: %v", err)
			}
			c.wsconn.Close()
			return
		}
		log.Printf("received %s:\n%s", c.wsMsgTypeToString(messageType), string(payload))
		// writes the message back to the WS
		c.egress <- payload
	}
}

// write messages to the WS
func (c *WSClient) writeMessages() {
	ticker := time.NewTicker(pingInterval)
	defer func() {
		ticker.Stop()
	}()

	for {
		select {
		case data, ok := <-c.egress:
			if !ok {
				// if channel is closed, send a CLOSE signal to the WS
				if err := c.wsconn.WriteMessage(websocket.CloseMessage, nil); err != nil {
					log.Fatalf("egress channel and WS are closed %v", err)
				}
				c.wsconn.Close()
				return
			}
			// write the message to the connection
			if err := c.wsconn.WriteMessage(websocket.TextMessage, data); err != nil {
				log.Fatalf("WS closed: %v", err)
				c.wsconn.Close()
				return
			}

		case <-ticker.C:
			// Send a PING msg to the WS.
			if err := c.wsconn.WriteMessage(websocket.PingMessage, []byte{}); err != nil {
				log.Fatalf("Ping attempt but WS closed: %v", err)
				return
			}
		}
	}
}

// Respond to ping tick and reset the timer
func (c *WSClient) pongHandler(pongMsg string) error {
       log.Print("PONG") // NOT REACHED
	return c.wsconn.SetReadDeadline(time.Now().UTC().Add(pongWait))
}

func (c *WSClient) wsMsgTypeToString(msgType int) string {
	switch msgType {
	case 1:
		return "TEXT"
	case 2:
		return "BINARY"
	case 8:
		return "CLOSE"
	case 9:
		return "PING"
	case 10:
		return "PONG"
	default:
		return "unknown"
	}
}

What the stress-test script is doing

  • creates X rooms of 2 users
  • a WS connection is created for each user
  • a conversation is a sequence like this: user 1 writes, user 1 reads, user 2 writes, user 2 reads ... in an infinite loop
  • a PING handler that is supposed to send a PONG frame to each connection is set
const (
	nbRooms         = 3300 // = 6600 users
	minIdleTimeInMs = 800
	maxIdleTimeInMs = 2000
	throttle        = time.Millisecond * 50
)

func main() {
	configure()
	createChatRooms()
	handleShutdown()
}

func createChatRooms() {
	for i := 0; i < nbRooms; i++ {
		roomID := uuid.New().String()
		user1 := GenUser()
		user2 := GenUser()
		go func(roomID string, u1 User, u2 User) {
			startChat(roomID, u1, u2)
		}(roomID, *user1, *user2)
		time.Sleep(throttle)
	}
}

func startChat(roomID string, u1 User, u2 User) {
	// create users in DB etc.

	// create ws conns
	conn1, err := getWSConn(&u1.User, room.RoomID)
	if err != nil {
		log.Error(err, "getWSConn error")
		return
	}
	conn2, err := getWSConn(&u2.User, room.RoomID)
	if err != nil {
		log.Error(err, "getWSConn error")
		return
	}

	// Start Goroutines for handling messages from user 1 and user 2
	go startConversation(room, u1, conn1, u2, conn2)

	// WARN: connections are never closed intentionally here
}

func startConversation(room *roomdom.Room, u1 User, conn1 *websocket.Conn, u2 User, conn2 *websocket.Conn) {
	defer func() {
		conn1.Close()
		conn2.Close()
	}()
	log.Info(fmt.Sprintf("%s and %s are conversing on %s", u1.User.FirstName, u2.User.FirstName, room.RoomID))
	for {
		// write msg to conn1
		msg1, err := genMsg(u1.User.UID, room.RoomID)
		if err != nil {
			log.Fatal(err, "msg1 genMsg")
			return
		}
		if err := conn1.WriteMessage(websocket.TextMessage, msg1); err != nil {
			log.Fatal(err, "writeMsg1 error")
			return
		}
		time.Sleep(time.Millisecond * time.Duration(genInt(minIdleTimeInMs, maxIdleTimeInMs)))

		// read msg from conn1
		messageType, _, err := conn1.ReadMessage()
		if messageType == websocket.PingMessage || messageType == websocket.PongMessage {
			log.Info("startConversation", "received %s", wsMsgTypeToString(messageType)) // I NEVER receive any PONG message here
		}
		// messageType, payload, err := conn.ReadMessage()
		if err != nil {
			log.Fatal(err, "conn.ReadMessages error")
			if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway) {
				log.Fatal(err, "UnexpectedCloseError")
			}
		}

		// write msg to conn2
		msg2, err := genMsg(u2.User.UID, room.RoomID)
		if err != nil {
			log.Error(err, "msg1 genMsg")
			return
		}
		if err := conn2.WriteMessage(websocket.TextMessage, msg2); err != nil {
			log.Error(err, "writeMsg2 error")
			return
		}

		// read msg from conn2
		_, _, err = conn2.ReadMessage()
		// messageType, payload, err := conn.ReadMessage()
		if err != nil {
			log.Fatal(err, "conn.ReadMessages error")
			if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway) {
				log.Fatal(err, "UnexpectedCloseError")
			}
		}

		time.Sleep(time.Millisecond * time.Duration(genInt(minIdleTimeInMs, maxIdleTimeInMs)))
	}
}

func getWSConn(u *userdom.User, roomID string) (*websocket.Conn, error) {
	conn, _, err := websocket.DefaultDialer.Dial("wss://localhost:8090/ws...", nil)
        conn.SetPingHandler(func(data string) error {
		println("RECEIVED PING") // OK
		return conn.WriteMessage(websocket.PongMessage, []byte{})
	})
	return conn, err
}

func wsMsgTypeToString(msgType int) string {
	switch msgType {
	case 1:
		return "TEXT"
	case 2:
		return "BINARY"
	case 8:
		return "CLOSE"
	case 9:
		return "PING"
	case 10:
		return "PONG"
	default:
		return "unknown"
	}
}

func handleShutdown() {
	quit := make(chan os.Signal, 1)
	signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)

	<-quit
	log.Info("Shutting down server gracefully...")
}

Problem

The stress-test script receives a PING frame and tries to send a PONG one but I see no incoming PONG frame in the chat app and the connection is then timed out.

wsconn.ReadMessage() error: read tcp 127.0.0.1:8090->127.0.0.1:55896: i/o timeout

Subsidiary questions

  • What's the difference/benefit of using ReadMessage/WriteMessage vs NextReader/NextWriter ?
  • I'm using WriteMessage to send control frames. I saw that there's also WriteControl that takes a deadline and then calls SetWriteDeadline. I know that unlike WriteMessage, WriteControl is safe to be used concurrently. The thing is my chat app, I only set a read deadline cause the "idleness" of a connection is detected when the user leaves, so from a server-side perspective, when there's no more data to be read. I don't know how am I supposed to use WriteControl in this context.
  • if I remove the ping-pong part of the algorithm, the chat app can handle around 16350 web sockets, then after I get this error:
    error:websocket: close 1006 (abnormal closure): unexpected EOF.
    What can cause this issue cause I don't have much log?

Originally posted by @BigBoulard in gorilla/.github#26

@ghost
Copy link

ghost commented Nov 15, 2023

Control messages like PING are processed when reading a message. Ensure that the client reads all messages.

What's the difference/benefit of using ReadMessage/WriteMessage vs NextReader/NextWriter ?

NextReader and NextWriter are the core functionality ReadMessage is a helper method for getting a reader using NextReader and reading from that reader to a buffer. WriteMessage is a helper method for getting a writer using NextWriter, writing the message and closing the writer.

I don't know how am I supposed to use WriteControl in this context.

Because your application does not write control messages concurrently with calls to WriteMessage / NextWriter, there's no need to use WriteControl in your application.

Applications should write with a deadline to protect against peers that do not read from the socket.

@BigBoulard
Copy link
Author

BigBoulard commented Nov 16, 2023

Control messages like PING are processed when reading a message. Ensure that the client reads all messages.

What's the difference/benefit of using ReadMessage/WriteMessage vs NextReader/NextWriter ?

NextReader and NextWriter are the core functionality ReadMessage is a helper method for getting a reader using NextReader and reading from that reader to a buffer. WriteMessage is a helper method for getting a writer using NextWriter, writing the message and closing the writer.

I don't know how am I supposed to use WriteControl in this context.

Because your application does not write control messages concurrently with calls to WriteMessage / NextWriter, there's no need to use WriteControl in your application.

Applications should write with a deadline to protect against peers that do not read from the socket.

Hi @pennystevens,

Thanks for getting back to me to provide clarity on this.

  • The main problem I have is if I remove the ping-pong algorithm, the chat app handle around 16 350 web sockets, then I get this error: error:websocket: close 1006 (abnormal closure): unexpected EOF.
    Is there a means so I can get more information on what's causing the issue?

I'm running the app in a docker desktop container as well as the stress-test script that is replicated cause I can see that each instance breaks after connecting around 7500 web sockets (same on localhost), so I launch several instances... I see no resource issue both CPU or Memory. I think I'm far from the number of available sockets (about 64k I think), so I'm searching on some limits maybe to be setup through the Go Compiler: max number of Goroutines etc..I don't know.

  • the second issue is that the server doesn't see any PONG message that should be sent by the stress test script despite the call to conn.WriteMessage(websocket.PongMessage, []byte{}) when the stress test script tries to create a connection ...
func getWSConn(u *userdom.User, roomID string) (*websocket.Conn, error) {
	conn, _, err := websocket.DefaultDialer.Dial("wss://localhost:8090/ws...", nil)
        conn.SetPingHandler(func(data string) error {
		println("RECEIVED PING") // OK
		return conn.WriteMessage(websocket.PongMessage, []byte{})
	})
	return conn, err
}

... and the creation of the PONG handler in the app:

func createWS(w http.ResponseWriter, r *http.Request, pongHandler pongHandler) (*websocket.Conn, error) {
//  ...
   conn.SetPongHandler(pongHandler)
   return conn, nil
}
// ...
// Respond to ping tick and reset the timer
func (c *WSClient) pongHandler(pongMsg string) error {
       log.Print("PONG") // NOT REACHED
	return c.wsconn.SetReadDeadline(time.Now().UTC().Add(pongWait))
}

Is there something I'm missing?

@hulkingshtick
Copy link

I noticed some problems with the code in issue.

This code:

	messageType, _, err := conn1.ReadMessage()
	if messageType == websocket.PingMessage || messageType == websocket.PongMessage {
		log.Info("startConversation", "received %s", wsMsgTypeToString(messageType)) // I NEVER receive any PONG message here
	}

will never report a ping or pong message because ReadMessage only returns data messages (text, binary). Control messages are handled by callbacks or, in the case of close, the error return value.

The ping handler:

    conn.SetPingHandler(func(data string) error {
	println("RECEIVED PING") // OK
	return conn.WriteMessage(websocket.PongMessage, []byte{})
})

has three problems.

  • The handler does not echo the data.
  • The handler calls WriteMessage concurrently with other writes.
  • The handler can terminate read on the connection early because it returns all errors from WriteMessage.

Fix the ping handler by starting from the default ping handler code:

    conn.SetPingHandler(func(data string) error {
	println("RECEIVED PING") // <-- your code added here
	// Make a best effort to send the pong message.
	_ = c.WriteControl(PongMessage, []byte(message), time.Now().Add(writeWait))
	return nil
})

The previous comment sets the pong handler to pongHandler. Is pongHandler set to *WSClient.PongHandler or is there more than one pong handling function in the code?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
Status: No status
Development

No branches or pull requests

2 participants