From 9e61ef6c1adfc1b4d9baab64b93dac9e7e5c67d7 Mon Sep 17 00:00:00 2001 From: Alessio Perugini Date: Tue, 19 Sep 2023 11:06:33 +0200 Subject: [PATCH] remove client stream --- client_example/main.go | 10 +---- commands/board/list.go | 10 ++--- commands/daemon/daemon.go | 38 ++----------------- internal/cli/board/list.go | 4 +- internal/integrationtest/arduino-cli.go | 6 +-- .../integrationtest/daemon/daemon_test.go | 24 ++++++++---- 6 files changed, 32 insertions(+), 60 deletions(-) diff --git a/client_example/main.go b/client_example/main.go index 4dca271bfaa..925073ab8e2 100644 --- a/client_example/main.go +++ b/client_example/main.go @@ -657,16 +657,13 @@ func callBoardList(client rpc.ArduinoCoreServiceClient, instance *rpc.Instance) } func callBoardListWatch(client rpc.ArduinoCoreServiceClient, instance *rpc.Instance) { - watchClient, err := client.BoardListWatch(context.Background()) + req := &rpc.BoardListWatchRequest{Instance: instance} + watchClient, err := client.BoardListWatch(context.Background(), req) if err != nil { log.Fatalf("Board list watch error: %s\n", err) } // Start the watcher - watchClient.Send(&rpc.BoardListWatchRequest{ - Instance: instance, - }) - go func() { for { res, err := watchClient.Recv() @@ -693,9 +690,6 @@ func callBoardListWatch(client rpc.ArduinoCoreServiceClient, instance *rpc.Insta // Watch for 10 seconds and then interrupts timer := time.NewTicker(time.Duration(10 * time.Second)) <-timer.C - watchClient.Send(&rpc.BoardListWatchRequest{ - Interrupt: true, - }) } func callPlatformUnInstall(client rpc.ArduinoCoreServiceClient, instance *rpc.Instance) { diff --git a/commands/board/list.go b/commands/board/list.go index 1ac055253b9..464e6410e1c 100644 --- a/commands/board/list.go +++ b/commands/board/list.go @@ -258,22 +258,22 @@ func hasMatchingBoard(b *rpc.DetectedPort, fqbnFilter *cores.FQBN) bool { // Watch returns a channel that receives boards connection and disconnection events. // It also returns a callback function that must be used to stop and dispose the watch. -func Watch(req *rpc.BoardListWatchRequest) (<-chan *rpc.BoardListWatchResponse, func(), error) { +func Watch(ctx context.Context, req *rpc.BoardListWatchRequest) (<-chan *rpc.BoardListWatchResponse, error) { pme, release := commands.GetPackageManagerExplorer(req) if pme == nil { - return nil, nil, &arduino.InvalidInstanceError{} + return nil, &arduino.InvalidInstanceError{} } defer release() dm := pme.DiscoveryManager() watcher, err := dm.Watch() if err != nil { - return nil, nil, err + return nil, err } - ctx, cancel := context.WithCancel(context.Background()) go func() { <-ctx.Done() + logrus.Trace("closed watch") watcher.Close() }() @@ -301,5 +301,5 @@ func Watch(req *rpc.BoardListWatchRequest) (<-chan *rpc.BoardListWatchResponse, } }() - return outChan, cancel, nil + return outChan, nil } diff --git a/commands/daemon/daemon.go b/commands/daemon/daemon.go index 1cb5a667b60..ec7fd10e9d9 100644 --- a/commands/daemon/daemon.go +++ b/commands/daemon/daemon.go @@ -86,18 +86,10 @@ func (s *ArduinoCoreServerImpl) BoardSearch(ctx context.Context, req *rpc.BoardS } // BoardListWatch FIXMEDOC -func (s *ArduinoCoreServerImpl) BoardListWatch(stream rpc.ArduinoCoreService_BoardListWatchServer) error { +func (s *ArduinoCoreServerImpl) BoardListWatch(req *rpc.BoardListWatchRequest, stream rpc.ArduinoCoreService_BoardListWatchServer) error { syncSend := NewSynchronizedSend(stream.Send) - msg, err := stream.Recv() - if err == io.EOF { - return nil - } - if err != nil { - return err - } - - if msg.Instance == nil { - err = fmt.Errorf(tr("no instance specified")) + if req.Instance == nil { + err := fmt.Errorf(tr("no instance specified")) syncSend.Send(&rpc.BoardListWatchResponse{ EventType: "error", Error: err.Error(), @@ -105,33 +97,11 @@ func (s *ArduinoCoreServerImpl) BoardListWatch(stream rpc.ArduinoCoreService_Boa return err } - eventsChan, closeWatcher, err := board.Watch(msg) + eventsChan, err := board.Watch(stream.Context(), req) if err != nil { return convertErrorToRPCStatus(err) } - go func() { - defer closeWatcher() - for { - msg, err := stream.Recv() - // Handle client closing the stream and eventual errors - if err == io.EOF { - logrus.Info("boards watcher stream closed") - return - } - if err != nil { - logrus.Infof("interrupting boards watcher: %v", err) - return - } - - // Message received, does the client want to interrupt? - if msg != nil && msg.Interrupt { - logrus.Info("boards watcher interrupted by client") - return - } - } - }() - for event := range eventsChan { if err := syncSend.Send(event); err != nil { logrus.Infof("sending board watch message: %v", err) diff --git a/internal/cli/board/list.go b/internal/cli/board/list.go index c6a9ffb26e9..65b0bd5e04d 100644 --- a/internal/cli/board/list.go +++ b/internal/cli/board/list.go @@ -16,6 +16,7 @@ package board import ( + "context" "errors" "fmt" "os" @@ -84,11 +85,10 @@ func runListCommand(watch bool, timeout int64, fqbn string) { } func watchList(inst *rpc.Instance) { - eventsChan, closeCB, err := board.Watch(&rpc.BoardListWatchRequest{Instance: inst}) + eventsChan, err := board.Watch(context.Background(), &rpc.BoardListWatchRequest{Instance: inst}) if err != nil { feedback.Fatal(tr("Error detecting boards: %v", err), feedback.ErrNetwork) } - defer closeCB() // This is done to avoid printing the header each time a new event is received if feedback.GetFormat() == feedback.Text { diff --git a/internal/integrationtest/arduino-cli.go b/internal/integrationtest/arduino-cli.go index 2af89abd807..0b45646f602 100644 --- a/internal/integrationtest/arduino-cli.go +++ b/internal/integrationtest/arduino-cli.go @@ -362,16 +362,16 @@ func (inst *ArduinoCLIInstance) BoardList(timeout time.Duration) (*commands.Boar } // BoardListWatch calls the "BoardListWatch" gRPC method. -func (inst *ArduinoCLIInstance) BoardListWatch() (commands.ArduinoCoreService_BoardListWatchClient, error) { +func (inst *ArduinoCLIInstance) BoardListWatch(ctx context.Context) (commands.ArduinoCoreService_BoardListWatchClient, error) { boardListWatchReq := &commands.BoardListWatchRequest{ Instance: inst.instance, } logCallf(">>> BoardListWatch(%v)\n", boardListWatchReq) - watcher, err := inst.cli.daemonClient.BoardListWatch(context.Background()) + watcher, err := inst.cli.daemonClient.BoardListWatch(ctx, boardListWatchReq) if err != nil { return watcher, err } - return watcher, watcher.Send(boardListWatchReq) + return watcher, nil } // PlatformInstall calls the "PlatformInstall" gRPC method. diff --git a/internal/integrationtest/daemon/daemon_test.go b/internal/integrationtest/daemon/daemon_test.go index d550ea2ed09..aaadfff4588 100644 --- a/internal/integrationtest/daemon/daemon_test.go +++ b/internal/integrationtest/daemon/daemon_test.go @@ -27,6 +27,8 @@ import ( "github.com/arduino/arduino-cli/internal/integrationtest" "github.com/arduino/arduino-cli/rpc/cc/arduino/cli/commands/v1" "github.com/arduino/go-paths-helper" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" "github.com/stretchr/testify/require" ) @@ -54,28 +56,34 @@ func TestArduinoCliDaemon(t *testing.T) { testWatcher := func() { // Run watcher - watcher, err := grpcInst.BoardListWatch() + ctx, cancel := context.WithCancel(context.TODO()) + watcher, err := grpcInst.BoardListWatch(ctx) require.NoError(t, err) - ctx, cancel := context.WithCancel(context.Background()) + watcherCanceldCh := make(chan struct{}) go func() { - defer cancel() for { msg, err := watcher.Recv() if err == io.EOF { fmt.Println("Watcher EOF") return } - require.Empty(t, msg.Error, "Board list watcher returned an error") + if s, ok := status.FromError(err); ok && s.Code() == codes.Canceled { + fmt.Println("Watcher canceled") + watcherCanceldCh <- struct{}{} + return + } require.NoError(t, err, "BoardListWatch grpc call returned an error") - fmt.Printf("WATCH> %v\n", msg) + require.Empty(t, msg.Error, "Board list watcher returned an error") + fmt.Printf("WATCH> %v %v\n", msg, err) } }() time.Sleep(time.Second) - require.NoError(t, watcher.CloseSend()) + cancel() + time.Sleep(time.Second) select { - case <-ctx.Done(): + case <-watcherCanceldCh: // all right! - case <-time.After(time.Second): + case <-time.After(2 * time.Second): require.Fail(t, "BoardListWatch didn't close") } }