Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

should not use netpoll with gobwas #18

Closed
lesismal opened this issue Apr 26, 2021 · 4 comments
Closed

should not use netpoll with gobwas #18

lesismal opened this issue Apr 26, 2021 · 4 comments

Comments

@lesismal
Copy link

as I have said here:
gobwas/ws#121 (comment)

Both gobwas's upgrade and wsutil's ReadRequest operate in blocking mode. If many connections send their bytes one by one, send only half of a ws upgrade handshake, or send only part of a ws message, they will block all of your gopool's tasks and make your service unavailable.

try this client example:

package main

import (
	"encoding/json"
	"fmt"
	"log"
	"net"
	"net/url"
	"sync"
	"sync/atomic"
	"time"

	"github.com/gorilla/websocket"
)

var (
	// connectedNum counts successfully established client connections;
	// incremented atomically in client().
	connectedNum int64 = 0

	// tunnelMux guards all access to tunnels.
	tunnelMux = sync.Mutex{}
	// tunnels maps a proxy-side connection address to the per-byte write
	// delay the tunnel applies for that connection (0 = no throttling).
	tunnels   = map[string]time.Duration{}
)

// Object is a generic JSON object used as the request parameter payload.
type Object map[string]interface{}

// Request is the JSON message each client publishes to the chat server.
type Request struct {
	ID     int    `json:"id"`
	Method string `json:"method"`
	Params Object `json:"params"`
}

// client dials the websocket endpoint at addr and publishes a JSON request
// once per second until a write fails. A background goroutine drains
// incoming frames.
//
// id is embedded in every published request; needLog enables verbose
// logging; needDelay registers this connection in the tunnels map so the
// proxy throttles it to one byte per `time.Second`.
func client(id int, addr string, needLog bool, needDelay bool) {
	u := url.URL{Scheme: "ws", Host: addr, Path: "/ws"}
	if needLog {
		log.Printf("connecting to %s", u.String())
	}

	c, _, err := websocket.DefaultDialer.Dial(u.String(), nil)
	if err != nil {
		log.Print("dial:", err)
		return
	}
	defer c.Close()
	if needLog {
		log.Printf("connected to %s, connectedNum: %v", u.String(), atomic.AddInt64(&connectedNum, 1))
	}

	// BUG FIX: needDelay was previously ignored and every client registered
	// a throttle entry unconditionally. Only throttled clients should
	// appear in the tunnels map.
	if needDelay {
		tunnelMux.Lock()
		tunnels[c.RemoteAddr().String()] = time.Second
		tunnelMux.Unlock()
		defer func() {
			tunnelMux.Lock()
			delete(tunnels, c.RemoteAddr().String())
			tunnelMux.Unlock()
		}()
	}

	// Reader goroutine: drain server frames until the connection errors
	// out (c.Close via the outer defer unblocks it on return).
	go func() {
		for {
			_, message, err := c.ReadMessage()
			if err != nil {
				if needLog {
					log.Println("read:", err)
				}
				return
			}
			if needLog {
				log.Println("read :", string(message))
			}
		}
	}()

	// Publish one "publish" request per second until a write fails.
	for {
		data, _ := json.Marshal(&Request{
			ID:     id,
			Method: "publish",
			Params: map[string]interface{}{
				"timestamp": time.Now().Unix(),
			},
		})
		if err := c.WriteMessage(websocket.TextMessage, data); err != nil {
			log.Printf("write: %v", err)
			return
		}
		if needLog {
			log.Println("write:", string(data))
		}

		time.Sleep(time.Second)
	}
}

// main starts the throttling proxy, floods the chat server behind it with
// slow clients, and runs one well-behaved client in the foreground.
func main() {
	// Start the byte-throttling proxy in front of the chat server, then
	// give it a moment to begin listening.
	go runProxy("localhost:3000", "localhost:3333")
	time.Sleep(time.Second / 10)

	// Spawn more slow (throttled) clients than the chat server's goroutine
	// pool size so the pool saturates.
	const num = 128
	for id := 1; id <= num; id++ {
		go client(id, "localhost:3000", false, true)
	}

	// One unthrottled client connects directly to the server and logs its
	// traffic; it blocks main forever.
	client(num+1, "localhost:3333", true, false)
}

// tunnel dials serverAddr and bridges bytes between clientConn and the
// server in both directions. When a positive delay is registered for the
// connection in tunnels, traffic is written one byte at a time with that
// delay between bytes, simulating a pathologically slow peer.
func tunnel(clientConn *net.TCPConn, serverAddr string) {
	serverConn, dialErr := net.Dial("tcp", serverAddr)
	log.Printf("+ tunnel: [%v -> %v]\n", clientConn.LocalAddr().String(), serverAddr)
	if dialErr != nil {
		clientConn.Close()
		return
	}

	// The throttle lookup key is the proxy-side address of the client conn.
	delayKey := clientConn.LocalAddr().String()

	// One forwarder per direction; either one closes both ends on error,
	// which terminates its sibling as well. (The two directions were
	// previously ~40 duplicated lines each.)
	go forward(clientConn, serverConn, delayKey)
	go forward(serverConn, clientConn, delayKey)
}

// forward copies bytes from src to dst until a read or write fails, then
// closes both connections. When a positive delay is registered under
// delayKey in tunnels, each byte is written individually followed by a
// sleep, throttling the stream.
func forward(src, dst net.Conn, delayKey string) {
	// Preserve the original behavior of swallowing any panic in the loop.
	defer func() {
		recover()
	}()
	defer src.Close()
	defer dst.Close()

	buf := make([]byte, 4096)
	for {
		nread, err := src.Read(buf)
		if err != nil {
			return
		}
		chunk := buf[:nread]

		tunnelMux.Lock()
		delay := tunnels[delayKey]
		tunnelMux.Unlock()

		if delay > 0 {
			// Throttled path: one byte per write, sleeping in between.
			for _, b := range chunk {
				if _, err := dst.Write([]byte{b}); err != nil {
					return
				}
				time.Sleep(delay)
			}
			continue
		}
		if _, err := dst.Write(chunk); err != nil {
			return
		}
	}
}

// runProxy listens on proxyAddr and starts a tunnel to serverAddr for
// every accepted TCP connection. It blocks forever unless resolving or
// listening fails.
func runProxy(proxyAddr string, serverAddr string) {
	tcpAddr, err := net.ResolveTCPAddr("tcp4", proxyAddr)
	if err != nil {
		fmt.Println("ResolveTCPAddr Error: ", err)
		return
	}

	listener, err2 := net.ListenTCP("tcp", tcpAddr)
	if err2 != nil {
		fmt.Println("ListenTCP Error: ", err2)
		return
	}

	defer listener.Close()

	fmt.Printf("proxy running on: [%s -> %s]\n", proxyAddr, serverAddr)
	for {
		conn, err := listener.AcceptTCP()
		if err != nil {
			// BUG FIX: this previously printed the stale ListenTCP error
			// (err2, always nil here) instead of the AcceptTCP error.
			fmt.Println("AcceptTCP Error: ", err)
			continue
		}
		go tunnel(conn, serverAddr)
	}
}
@lesismal
Copy link
Author

Try to imagine that a connection comes in and the netpoll event is passed to the pool: a goroutine from the pool handles the upgrade, but this connection sends only part of the bytes of the complete handshake message. Because of the blocking mode, the handshake blocks, and so does the goroutine. The handshake only returns an error after the deadline timeout and releases the goroutine. Your server will be very slow to process new connections.

Try again to imagine that after a successful ws handshake, the client starts to send data to the server but sends only part of the bytes of a complete ws message. The netpoll event is passed to the pool, and a goroutine from the pool starts to read; because of the blocking mode, reading the ws message blocks, and so does the goroutine. If many connections behave like this, all the goroutines in the pool will be blocked, and your service will become unavailable.

@gobwas
Copy link
Owner

gobwas commented Jul 9, 2021

This problem was discussed here:
gobwas/ws#143.

Now I'm going to close this.

@gobwas gobwas closed this as completed Jul 9, 2021
@hellochenwang
Copy link

@lesismal is there an example to use gobwas with nbio?

@lesismal
Copy link
Author

lesismal commented Sep 3, 2021

> is there an example to use gobwas with nbio?

as I have said, gobwas has not implemented async-streaming-parsing, we should not use poller+gobwas but only use gobwas with std conn.

nbio supports websocket itself:
https://github.com/lesismal/nbio/blob/master/examples/websocket/server/server.go

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants