diff --git a/README.md b/README.md index cd5edff49..d0f39d267 100644 --- a/README.md +++ b/README.md @@ -36,6 +36,7 @@ Further PR's (with tests) are welcome, but please maintain backwards compatibili * Gracefully recover from a temporarily unreachable server ([details](https://github.com/globalsign/mgo/pull/69)) * Use JSON tags when no explicit BSON are tags set ([details](https://github.com/globalsign/mgo/pull/91)) * Support [$changeStream](https://docs.mongodb.com/manual/changeStreams/) tailing on 3.6+ ([details](https://github.com/globalsign/mgo/pull/97)) +* Fix deadlock in cluster synchronisation ([details](https://github.com/globalsign/mgo/issues/120)) --- @@ -46,11 +47,13 @@ Further PR's (with tests) are welcome, but please maintain backwards compatibili * @carter2000 * @cezarsa * @drichelson +* @dvic * @eaglerayp * @feliixx * @fmpwizard * @idy * @jameinel +* @KJTsanaktsidis * @gazoon * @mapete94 * @peterdeka diff --git a/cluster.go b/cluster.go index ac461d5b9..d639a4193 100644 --- a/cluster.go +++ b/cluster.go @@ -179,7 +179,7 @@ func (cluster *mongoCluster) isMaster(socket *mongoSocket, result *isMasterResul }) }) - err := session.Run(cmd, result) + err := session.runOnSocket(socket, cmd, result) session.Close() return err } diff --git a/cluster_test.go b/cluster_test.go index 8945e0962..a0a197048 100644 --- a/cluster_test.go +++ b/cluster_test.go @@ -1964,6 +1964,41 @@ func (s *S) TestConnectCloseConcurrency(c *C) { wg.Wait() } +func (s *S) TestNoDeadlockOnClose(c *C) { + if *fast { + // Unfortunately I seem to need quite a high dial timeout to get this to work + // on my machine. + c.Skip("-fast") + } + + var shouldStop int32 + atomic.StoreInt32(&shouldStop, 0) + + listener, err := net.Listen("tcp4", "127.0.0.1:") + c.Check(err, Equals, nil) + + go func() { + for atomic.LoadInt32(&shouldStop) == 0 { + sock, err := listener.Accept() + if err != nil { + // Probs just closed + continue + } + sock.Close() + } + }() + defer func() { + atomic.StoreInt32(&shouldStop, 1) + listener.Close() + }() + + session, err := mgo.DialWithTimeout(listener.Addr().String(), 10*time.Second) + // If execution reaches here, the deadlock did not happen and all is OK + if session != nil { + session.Close() + } +} + func (s *S) TestSelectServers(c *C) { if !s.versionAtLeast(2, 2) { c.Skip("read preferences introduced in 2.2") diff --git a/session.go b/session.go index 561f79487..0fae61498 100644 --- a/session.go +++ b/session.go @@ -806,6 +806,15 @@ func (db *Database) Run(cmd interface{}, result interface{}) error { return db.run(socket, cmd, result) } +// runOnSocket does the same as Run, but guarantees that your command will be run +// on the provided socket instance; if it's unhealthy, you will receive the error +// from it. +func (db *Database) runOnSocket(socket *mongoSocket, cmd interface{}, result interface{}) error { + socket.Acquire() + defer socket.Release() + return db.run(socket, cmd, result) +} + // Credential holds details to authenticate with a MongoDB server. type Credential struct { // Username and Password hold the basic details for authentication. @@ -2270,6 +2279,13 @@ func (s *Session) Run(cmd interface{}, result interface{}) error { return s.DB("admin").Run(cmd, result) } +// runOnSocket does the same as Run, but guarantees that your command will be run +// on the provided socket instance; if it's unhealthy, you will receive the error +// from it. +func (s *Session) runOnSocket(socket *mongoSocket, cmd interface{}, result interface{}) error { + return s.DB("admin").runOnSocket(socket, cmd, result) +} + // SelectServers restricts communication to servers configured with the // given tags. For example, the following statement restricts servers // used for reading operations to those with both tag "disk" set to