
ttrpc client send hang, even ctx is timeout. #174

Open
ningmingxiao opened this issue Sep 9, 2024 · 3 comments

ningmingxiao commented Sep 9, 2024

ttrpc send hangs even though the ctx has timed out.

goroutine 6132118 [IO wait, 1818 minutes]:
internal/poll.runtime_pollWait(0x7f9e8013bbd0, 0x77)
        /usr/local/go/src/runtime/netpoll.go:306 +0x89
internal/poll.(*pollDesc).wait(0xc000f2b900?, 0xc0021e6000?, 0x0)
        /usr/local/go/src/internal/poll/fd_poll_runtime.go:84 +0x32
internal/poll.(*pollDesc).waitWrite(...)
        /usr/local/go/src/internal/poll/fd_poll_runtime.go:93
internal/poll.(*FD).Write(0xc000f2b900, {0xc0021e6000, 0x74, 0x1000})
        /usr/local/go/src/internal/poll/fd_unix.go:391 +0x2f6
net.(*netFD).Write(0xc000f2b900, {0xc0021e6000?, 0xc000abf260?, 0x41c4f1?})
        /usr/local/go/src/net/fd_posix.go:96 +0x29
net.(*conn).Write(0xc00128a238, {0xc0021e6000?, 0x5222f5?, 0x7f9e8008bbd0?})
        /usr/local/go/src/net/net.go:195 +0x45
bufio.(*Writer).Flush(0xc0021de800)
        /usr/local/go/src/bufio/bufio.go:628 +0x62
github.com/containerd/ttrpc.(*channel).send(0xc0021de840, 0x21a7680?, 0xc0?, 0x0?, {0xc002fa2070, 0x6a, 0x6a})
        github.com/containerd/containerd/vendor/github.com/containerd/ttrpc/channel.go:161 +0x98
github.com/containerd/ttrpc.(*Client).createStream(0xc001b59290, 0x0?, {0xc002fa2070, 0x6a, 0x6a})
        github.com/containerd/containerd/vendor/github.com/containerd/ttrpc/client.go:370 +0x245
github.com/containerd/ttrpc.(*Client).dispatch(0xc001b59290, {0x1cba358, 0xc005081d70}, 0xc000462800?, 0xc0019feda0?)
        github.com/containerd/containerd/vendor/github.com/containerd/ttrpc/client.go:480 +0x95
github.com/containerd/ttrpc.defaultClientInterceptor({0x1cba358?, 0xc005081d70?}, 0x2b5d3c0?, 0xc0056ecc40?, 0xc000abf5e0?, 0x45a7b1?)
        github.com/containerd/containerd/vendor/github.com/containerd/ttrpc/interceptor.go:56 +0x26
github.com/containerd/ttrpc.(*Client).Call(0xc001b59290, {0x1cba358, 0xc005081d70}, {0x1a4b86d, 0x17}, {0x1a2ad25, 0x5}, {0x18ddd60?, 0xc003a836d0?}, {0x1993ba0, ...})
        github.com/containerd/containerd/vendor/github.com/containerd/ttrpc/client.go:134 +0x353
github.com/containerd/containerd/api/runtime/task/v2.(*taskClient).State(0xc00128b950, {0x1cba358, 0xc005081d70}, 0x2b5d3c0?)
        github.com/containerd/containerd/api/runtime/task/v2/shim_ttrpc.pb.go:169 +0x98
github.com/containerd/containerd/runtime/v2.(*shimTask).State(0xc000f2f960, {0x1cba358, 0xc005081d70})
        github.com/containerd/containerd/runtime/v2/shim.go:698 +0xd4
github.com/containerd/containerd/services/tasks.getProcessState({0x1cba390?, 0xc005081aa0?}, {0x7f9e800dcf10, 0xc000f2f960})
        github.com/containerd/containerd/services/tasks/local.go:340 +0xef
github.com/containerd/containerd/services/tasks.(*local).Get(0xc003a83540?, {0x1cba390, 0xc005081aa0}, 0xc003a83540, {0x194a040?, 0x19be760?, 0x40f000?})
        github.com/containerd/containerd/services/tasks/local.go:386 +0xe5
github.com/containerd/containerd/services/tasks.(*service).Get(0x18e0160?, {0x1cba390?, 0xc005081aa0?}, 0x0?)
        github.com/containerd/containerd/services/tasks/service.go:86 +0x33
github.com/containerd/containerd/api/services/tasks/v1._Tasks_Get_Handler({0x19be760?, 0xc000014fc0}, {0x1cba390, 0xc005081aa0}, 0xc0036e90a0, 0x0)
        github.com/containerd/containerd/api/services/tasks/v1/tasks_grpc.pb.go:384 +0x170
google.golang.org/grpc.(*Server).processUnaryRPC(0xc0003e4000, {0x1cc2a40, 0xc003e6e000}, 0xc002ae5e60, 0xc00047de90, 0x2abe980, 0x0)
        github.com/containerd/containerd/vendor/google.golang.org/grpc/server.go:1336 +0xd33
google.golang.org/grpc.(*Server).handleStream(0xc0003e4000, {0x1cc2a40, 0xc003e6e000}, 0xc002ae5e60, 0x0)
        github.com/containerd/containerd/vendor/google.golang.org/grpc/server.go:1704 +0xa36
google.golang.org/grpc.(*Server).serveStreams.func1.2()
        github.com/containerd/containerd/vendor/google.golang.org/grpc/server.go:965 +0x98
created by google.golang.org/grpc.(*Server).serveStreams.func1
        github.com/containerd/containerd/vendor/google.golang.org/grpc/server.go:963 +0x28a
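
The stack shows channel.send sitting in a plain blocking net.Conn.Write with no deadline, so the ctx passed to Call has no effect on the write itself. A minimal standalone sketch (not ttrpc code; writeWithContext is a hypothetical helper) of one way a blocking write can be made to honor a context:

package main

import (
	"context"
	"fmt"
	"net"
	"time"
)

// writeWithContext interrupts a blocking conn.Write when ctx is
// cancelled by forcing the write deadline into the past. A bare
// conn.Write, as in the stack above, ignores ctx entirely.
func writeWithContext(ctx context.Context, conn net.Conn, p []byte) (int, error) {
	done := make(chan struct{})
	defer close(done)

	go func() {
		select {
		case <-ctx.Done():
			// Unblock the pending Write; it returns a timeout error.
			conn.SetWriteDeadline(time.Now())
		case <-done:
		}
	}()

	n, err := conn.Write(p)
	// Best-effort reset so later writes are unaffected (a real
	// implementation would synchronize this with the goroutine above).
	conn.SetWriteDeadline(time.Time{})
	return n, err
}

func main() {
	// net.Pipe blocks a Write until the other end reads, standing in
	// for a socket whose buffers have filled up.
	c1, c2 := net.Pipe()
	defer c1.Close()
	defer c2.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()

	// Nothing reads from c2, so the write blocks until ctx expires.
	_, err := writeWithContext(ctx, c1, make([]byte, 1024))
	fmt.Println("write returned:", err)
}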

kevpar commented Oct 8, 2024

What version of ttrpc are you using on the client? From your line numbers, it appears you may be using an old version.

We fixed a deadlock in the client in v1.2.4; it could be what you are hitting: 1b4f6f8
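
For context, a minimal standalone sketch (illustrative only, not the ttrpc code) of the pattern that commit addresses: the sender holds the stream lock across a blocking socket write while the receive loop needs the same lock to keep draining the connection, so neither side can unblock the other:

package main

import (
	"fmt"
	"net"
	"sync"
	"time"
)

func main() {
	// net.Pipe is fully synchronous: a Write blocks until the peer
	// reads, standing in for a socket whose buffers are full.
	c1, c2 := net.Pipe()
	var mu sync.Mutex // stands in for the client's stream lock

	// Receive loop: after reading a frame it needs mu to route it,
	// like getStream taking the stream lock inside receiveLoop.
	go func() {
		buf := make([]byte, 16)
		for {
			if _, err := c2.Read(buf); err != nil {
				return
			}
			mu.Lock() // parks here while the sender below holds mu
			mu.Unlock()
		}
	}()

	done := make(chan struct{})
	go func() {
		// Pre-fix createStream shape: hold mu across a blocking send.
		mu.Lock()
		c1.Write(make([]byte, 16))        // frame 1: consumed; loop then parks on mu
		time.Sleep(10 * time.Millisecond) // let the loop reach mu.Lock
		c1.Write(make([]byte, 16))        // frame 2: blocks forever, nobody reads
		mu.Unlock()
		close(done)
	}()

	select {
	case <-done:
		fmt.Println("sent without deadlock")
	case <-time.After(500 * time.Millisecond):
		fmt.Println("deadlock: sender blocked in Write while holding" +
			" the lock the receive loop needs")
	}
}

The v1.2.4 change avoids this by releasing the stream lock before the send, as visible in the createStream code quoted later in this thread.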


ningmingxiao commented Oct 16, 2024

Here are some stacks:

goroutine 4796 [IO wait]:
internal/poll.runtime_pollWait(0x7f9e8013bea0, 0x72)
        /usr/local/go/src/runtime/netpoll.go:306 +0x89
internal/poll.(*pollDesc).wait(0xc000138e80?, 0xc00075f000?, 0x0)
        /usr/local/go/src/internal/poll/fd_poll_runtime.go:84 +0x32
internal/poll.(*pollDesc).waitRead(...)
        /usr/local/go/src/internal/poll/fd_poll_runtime.go:89
internal/poll.(*FD).Read(0xc000138e80, {0xc00075f000, 0x1000, 0x1000})
        /usr/local/go/src/internal/poll/fd_unix.go:167 +0x299
net.(*netFD).Read(0xc000138e80, {0xc00075f000?, 0xc00336dcf8?, 0x2?})
        /usr/local/go/src/net/fd_posix.go:55 +0x29
net.(*conn).Read(0xc000126858, {0xc00075f000?, 0x418313?, 0xc000905918?})
        /usr/local/go/src/net/net.go:183 +0x45
bufio.(*Reader).Read(0xc000d60060, {0xc000731220, 0xa, 0xc000bb14a0?})
        /usr/local/go/src/bufio/bufio.go:237 +0x1bb
io.ReadAtLeast({0x1ca1b00, 0xc000d60060}, {0xc000731220, 0xa, 0xa}, 0xa)
        /usr/local/go/src/io/io.go:332 +0x9a
io.ReadFull(...)
        /usr/local/go/src/io/io.go:351
github.com/containerd/ttrpc.readMessageHeader({0xc000731220?, 0xa?, 0x30?}, {0x1ca1b00?, 0xc000d60060?})
        /root/rpmbuild/BUILD/containerd.io-1.7.6/_build/src/github.com/containerd/containerd/vendor/github.com/containerd/ttrpc/channel.go:73 +0x55
github.com/containerd/ttrpc.(*channel).recv(0xc000731200)
        /root/rpmbuild/BUILD/containerd.io-1.7.6/_build/src/github.com/containerd/containerd/vendor/github.com/containerd/ttrpc/channel.go:121 +0x4d
github.com/containerd/ttrpc.(*Client).receiveLoop(0xc0008047e0)
        /root/rpmbuild/BUILD/containerd.io-1.7.6/_build/src/github.com/containerd/containerd/vendor/github.com/containerd/ttrpc/client.go:320 +0x85
github.com/containerd/ttrpc.(*Client).run(0xc0008047e0)
        /root/rpmbuild/BUILD/containerd.io-1.7.6/_build/src/github.com/containerd/containerd/vendor/github.com/containerd/ttrpc/client.go:301 +0x1e
created by github.com/containerd/ttrpc.NewClient
        /root/rpmbuild/BUILD/containerd.io-1.7.6/_build/src/github.com/containerd/containerd/vendor/github.com/containerd/ttrpc/client.go:95 +0x1f6


goroutine 6132118 [IO wait, 1818 minutes]:
internal/poll.runtime_pollWait(0x7f9e8013bbd0, 0x77)
        /usr/local/go/src/runtime/netpoll.go:306 +0x89
internal/poll.(*pollDesc).wait(0xc000f2b900?, 0xc0021e6000?, 0x0)
        /usr/local/go/src/internal/poll/fd_poll_runtime.go:84 +0x32
internal/poll.(*pollDesc).waitWrite(...)
        /usr/local/go/src/internal/poll/fd_poll_runtime.go:93
internal/poll.(*FD).Write(0xc000f2b900, {0xc0021e6000, 0x74, 0x1000})
        /usr/local/go/src/internal/poll/fd_unix.go:391 +0x2f6
net.(*netFD).Write(0xc000f2b900, {0xc0021e6000?, 0xc000abf260?, 0x41c4f1?})
        /usr/local/go/src/net/fd_posix.go:96 +0x29
net.(*conn).Write(0xc00128a238, {0xc0021e6000?, 0x5222f5?, 0x7f9e8008bbd0?})
        /usr/local/go/src/net/net.go:195 +0x45
bufio.(*Writer).Flush(0xc0021de800)
        /usr/local/go/src/bufio/bufio.go:628 +0x62
github.com/containerd/ttrpc.(*channel).send(0xc0021de840, 0x21a7680?, 0xc0?, 0x0?, {0xc002fa2070, 0x6a, 0x6a})
        /root/rpmbuild/BUILD/containerd.io-1.7.6/_build/src/github.com/containerd/containerd/vendor/github.com/containerd/ttrpc/channel.go:161 +0x98
github.com/containerd/ttrpc.(*Client).createStream(0xc001b59290, 0x0?, {0xc002fa2070, 0x6a, 0x6a})
        /root/rpmbuild/BUILD/containerd.io-1.7.6/_build/src/github.com/containerd/containerd/vendor/github.com/containerd/ttrpc/client.go:370 +0x245
github.com/containerd/ttrpc.(*Client).dispatch(0xc001b59290, {0x1cba358, 0xc005081d70}, 0xc000462800?, 0xc0019feda0?)
        /root/rpmbuild/BUILD/containerd.io-1.7.6/_build/src/github.com/containerd/containerd/vendor/github.com/containerd/ttrpc/client.go:480 +0x95
github.com/containerd/ttrpc.defaultClientInterceptor({0x1cba358?, 0xc005081d70?}, 0x2b5d3c0?, 0xc0056ecc40?, 0xc000abf5e0?, 0x45a7b1?)
        /root/rpmbuild/BUILD/containerd.io-1.7.6/_build/src/github.com/containerd/containerd/vendor/github.com/containerd/ttrpc/interceptor.go:56 +0x26
github.com/containerd/ttrpc.(*Client).Call(0xc001b59290, {0x1cba358, 0xc005081d70}, {0x1a4b86d, 0x17}, {0x1a2ad25, 0x5}, {0x18ddd60?, 0xc003a836d0?}, {0x1993ba0, ...})
        /root/rpmbuild/BUILD/containerd.io-1.7.6/_build/src/github.com/containerd/containerd/vendor/github.com/containerd/ttrpc/client.go:134 +0x353
github.com/containerd/containerd/api/runtime/task/v2.(*taskClient).State(0xc00128b950, {0x1cba358, 0xc005081d70}, 0x2b5d3c0?)
        /root/rpmbuild/BUILD/containerd.io-1.7.6/_build/src/github.com/containerd/containerd/api/runtime/task/v2/shim_ttrpc.pb.go:169 +0x98

sync.runtime_SemacquireMutex(0xc00124b800?, 0x4b?, 0x0?)
        /usr/local/go/src/runtime/sema.go:77 +0x26
sync.(*Mutex).lockSlow(0xc001b592cc)
        /usr/local/go/src/sync/mutex.go:171 +0x165
sync.(*Mutex).Lock(...)
        /usr/local/go/src/sync/mutex.go:90
github.com/containerd/ttrpc.(*Client).createStream(0xc001b59290, 0x0?, {0xc000f390a0, 0x6a, 0x6a})
        /root/rpmbuild/BUILD/containerd.io-1.7.6/_build/src/github.com/containerd/containerd/vendor/github.com/containerd/ttrpc/client.go:366 +0x1ce
github.com/containerd/ttrpc.(*Client).dispatch(0xc001b59290, {0x1cba358, 0xc0035bfdd0}, 0xc00124b800?, 0xc003836d30?)
        /root/rpmbuild/BUILD/containerd.io-1.7.6/_build/src/github.com/containerd/containerd/vendor/github.com/containerd/ttrpc/client.go:480 +0x95
github.com/containerd/ttrpc.defaultClientInterceptor({0x1cba358?, 0xc0035bfdd0?}, 0x2b5d3c0?, 0xc00190ff80?, 0xc0053675e0?, 0x45a7b1?)
        /root/rpmbuild/BUILD/containerd.io-1.7.6/_build/src/github.com/containerd/containerd/vendor/github.com/containerd/ttrpc/interceptor.go:56 +0x26
github.com/containerd/ttrpc.(*Client).Call(0xc001b59290, {0x1cba358, 0xc0035bfdd0}, {0x1a4b86d, 0x17}, {0x1a2ad25, 0x5}, {0x18ddd60?, 0xc003e172c0?}, {0x1993ba0, ...})
        /root/rpmbuild/BUILD/containerd.io-1.7.6/_build/src/github.com/containerd/containerd/vendor/github.com/containerd/ttrpc/client.go:134 +0x353

This should hit the case fixed by 1b4f6f8, but I don't see the receive loop blocked at "s := c.getStream(sid)" (in the scenario that fix describes, it blocks at c.getStream(sid)):

func (c *Client) receiveLoop() error {
	for {
		select {
		case <-c.ctx.Done():
			return ErrClosed
		default:
			var (
				msg = &streamMessage{}
				err error
			)

			msg.header, msg.payload, err = c.channel.recv()
			if err != nil {
				_, ok := status.FromError(err)
				if !ok {
					// treat all errors that are not an rpc status as terminal.
					// all others poison the connection.
					return filterCloseErr(err)
				}
			}
			sid := streamID(msg.header.StreamID)
			s := c.getStream(sid)

@kevpar I use https://github.com/containerd/ttrpc-rust as the ttrpc server.


ningmingxiao commented Oct 18, 2024

I reviewed this PR: 1b4f6f8

func (c *Client) createStream(flags uint8, b []byte) (*stream, error) {
	c.streamLock.Lock()

	// Check if closed since lock acquired to prevent adding
	// anything after cleanup completes
	select {
	case <-c.ctx.Done():
		c.streamLock.Unlock()
		return nil, ErrClosed
	default:
	}

	// Stream ID should be allocated at same time
	s := newStream(c.nextStreamID, c)
	c.streams[s.id] = s
	c.nextStreamID = c.nextStreamID + 2

	c.sendLock.Lock()
	defer c.sendLock.Unlock()
	c.streamLock.Unlock()

	if err := c.channel.send(uint32(s.id), messageTypeRequest, flags, b); err != nil {
		return s, filterCloseErr(err)
	}

	return s, nil
}

If c.channel.send returns, there will be no deadlock.

If the reader doesn't get a chance to read data (it hangs at c.getStream), will c.channel.send hang? (See the sketch after the receiveLoop code below.)

func (c *Client) receiveLoop() error {
	for {
		select {
		case <-c.ctx.Done():
			return ErrClosed
		default:
			var (
				msg = &streamMessage{}
				err error
			)

			msg.header, msg.payload, err = c.channel.recv()
			if err != nil {
				_, ok := status.FromError(err)
				if !ok {
					// treat all errors that are not an rpc status as terminal.
					// all others poison the connection.
					return filterCloseErr(err)
				}
			}
			sid := streamID(msg.header.StreamID)
			s := c.getStream(sid)
			if s == nil {
				logrus.WithField("stream", sid).Errorf("ttrpc: received message on inactive stream")
				continue
			}

			if err != nil {
				s.closeWithError(err)
			} else {
				if err := s.receive(c.ctx, msg); err != nil {
					logrus.WithError(err).WithField("stream", sid).Errorf("ttrpc: failed to handle message")
				}
			}
		}
	}
}
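
On the question above: yes, a blocking write on a stream socket does hang once the peer stops draining it and the kernel buffers fill, and as the stack traces show, channel.send ends in a plain net.Conn.Write with no deadline. A minimal standalone demo (TCP loopback standing in for the shim's unix socket; not ttrpc code):

package main

import (
	"fmt"
	"net"
	"time"
)

func main() {
	// A TCP loopback socket stands in for the shim's unix socket; the
	// behavior is the same: once the peer stops reading, writes first
	// fill the kernel buffers and then block indefinitely.
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		panic(err)
	}
	defer ln.Close()

	go func() {
		conn, err := ln.Accept()
		if err != nil {
			return
		}
		defer conn.Close()
		select {} // hold the connection open but never read from it
	}()

	conn, err := net.Dial("tcp", ln.Addr().String())
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	// The deadline is only so the demo terminates; a write with no
	// deadline, like the one in the stacks above, would block forever.
	conn.SetWriteDeadline(time.Now().Add(2 * time.Second))

	buf := make([]byte, 64*1024)
	total := 0
	for {
		n, err := conn.Write(buf)
		total += n
		if err != nil {
			fmt.Printf("wrote %d bytes, then blocked: %v\n", total, err)
			return
		}
	}
}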

Can you show me your test case? @kevpar thank you.
