Client will hang there if the server side crashed after the connection has already been established. #38

Open
breezelxp opened this issue Mar 26, 2018 · 2 comments

Comments

@breezelxp

Let's see the example below:

Server side:

package main

import (
	"fmt"
	"net"
	"net/http"
	"time"

	"github.com/donovanhide/eventsource"
)

type TimeEvent time.Time

func (t TimeEvent) Id() string    { return fmt.Sprint(time.Time(t).UnixNano()) }
func (t TimeEvent) Event() string { return "Tick" }
func (t TimeEvent) Data() string  { return time.Time(t).String() }

const (
	TICK_COUNT = 5
)

func TimePublisher(srv *eventsource.Server) {
	start := time.Date(2013, time.January, 1, 0, 0, 0, 0, time.UTC)
	ticker := time.NewTicker(time.Second)
	// Publish one event per tick; ranging over ticker.C replaces the single-case select.
	for range ticker.C {
		srv.Publish([]string{"time"}, TimeEvent(start))
		start = start.Add(time.Second)
	}
}

func main() {
	srv := eventsource.NewServer()
	srv.Gzip = true
	defer srv.Close()
	l, err := net.Listen("tcp", "127.0.0.1:8099")
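	if err != nil {
		// Report why the listener could not be created instead of exiting silently.
		fmt.Println("listen failed:", err)
		return
	}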
	defer l.Close()
	http.HandleFunc("/time", srv.Handler("time"))
	go http.Serve(l, nil)
	go TimePublisher(srv)
	fmt.Println("event source started.")
	select {}
}

Client side:

package main

import (
	"fmt"
	"github.com/donovanhide/eventsource"
)

func main() {
	stream, err := eventsource.Subscribe("http://127.0.0.1:8099/time", "")
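	if err != nil {
		// Report why the subscription failed instead of exiting silently.
		fmt.Println("subscribe failed:", err)
		return
	}
	// Note: only stream.Events is read here; stream.Errors is never received from.
	for ev := range stream.Events {
		fmt.Println(ev.Id(), ev.Event(), ev.Data())
	}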
}

After the connection has been established, the client prints logs like this:

1356998406000000000 Tick 2013-01-01 00:00:06 +0000 UTC
1356998407000000000 Tick 2013-01-01 00:00:07 +0000 UTC
1356998408000000000 Tick 2013-01-01 00:00:08 +0000 UTC
1356998409000000000 Tick 2013-01-01 00:00:09 +0000 UTC
1356998410000000000 Tick 2013-01-01 00:00:10 +0000 UTC
1356998411000000000 Tick 2013-01-01 00:00:11 +0000 UTC

But if you kill the server process at this point, the client just hangs. No error is reported, and no retry is attempted.

How can this be solved?

@breezelxp breezelxp changed the title the client will hang there if the server side crashed after the connection has already been established. Client will hang there if the server side crashed after the connection has already been established. Mar 26, 2018
@waylandc

The client hangs on https://github.com/donovanhide/eventsource/blob/3ed64d21fb0b6bd8b49bcfec08f3004daee8723d/stream.go#L193

I've been experimenting to find a fix, but I'm not sure my approach is correct yet. There seems to be no error detection, so I mark the stream as closed (stream.isClosed) when Decode() returns an error.

Here is what I've done so far. If someone can finish or correct what I've tried, please do so.

My comments are prefixed with WC.

func (stream *Stream) receiveEvents(r io.ReadCloser) {
	dec := NewDecoder(r)

	for {
		ev, err := dec.Decode()
		if stream.isStreamClosed() {
			return
		}
		if err != nil {
			// WC Not sure if this is correct but let's close the stream so we know to reconnect later
			stream.markStreamClosed()
			// WC putting this err on the channel seems to be cause of Issue #38
			//stream.Errors <- err
			return
		}

		pub := ev.(*publication)
		if pub.Retry() > 0 {
			stream.retry = time.Duration(pub.Retry()) * time.Millisecond
		}
		if len(pub.Id()) > 0 {
			stream.lastEventId = pub.Id()
		}
		stream.Events <- ev
	}
}

func (stream *Stream) retryRestartStream() {
	backoff := stream.retry
	for {
		time.Sleep(backoff)
		if stream.isStreamClosed() {
			// Logger may be nil (it is nil-checked below), so guard every use.
			if stream.Logger != nil {
				stream.Logger.Println("while retry stream is closed")
			}
			// WC not sure why we'd return if stream is closed, probably my misunderstanding of what should invoke a restart
			// WC but I'm going to comment this out for now as I want the for loop to continue until it can restart/reconnect
			//return
		}
		// NOTE: because of the defer we're opening the new connection
		// before closing the old one. Shouldn't be a problem in practice,
		// but something to be aware of.
		// WC added this to only reconnect to stream if it's closed
		if stream.isStreamClosed() {
			if stream.Logger != nil {
				stream.Logger.Printf("Reconnecting in %0.4f secs\n", backoff.Seconds())
			}
			r, err := stream.connect()
			if err == nil {
				if stream.Logger != nil {
					stream.Logger.Println("reconnected")
				}
				// WC set isStreamClosed to false (I copied markStreamClosed up top)
				stream.markStreamOpen()
				go stream.stream(r)
				return
			}
			if stream.Logger != nil {
				stream.Logger.Printf("retryRestart couldn't connect: %v\n", err)
			}
			// WC I don't see where we're using this stream.Errors channel so commenting out for now
			//stream.Errors <- err
			backoff *= 2
		}
	}
}

If I shut down the server and then restart it, the client reconnects and receives events, but I also get errors like the one below, which I suspect come from lingering http.Requests left over from the failed connection attempts.

net/http: request canceled (Client.Timeout exceeded while reading body)

@garbelini

For future reference:
It seems the client blocks on the stream.Errors channel because nothing is receiving from it. All we need to do is drain that channel so the client can continue:

stream, err := eventsource.SubscribeWith("", client, request)
if err != nil {
    log.Fatal(err)
}

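// Drain stream.Errors in the background so a pending error send cannot block the stream.
go func() {
    for streamError := range stream.Errors {
        // log.Debug assumes a leveled logger (for example logrus) imported as log;
        // the standard library log package has no Debug function.
        log.Debug(streamError)
    }
}()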
