Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

agent: move subscriptions to pending when reconnecting servers #1220

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/Simplex/Messaging/Agent.hs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE TypeApplications #-}
{-# OPTIONS_GHC -fno-warn-ambiguous-fields #-}

Check warning on line 17 in src/Simplex/Messaging/Agent.hs

View workflow job for this annotation

GitHub Actions / build-ubuntu-20.04-8.10.7

unrecognised warning flag: -fno-warn-ambiguous-fields

Check warning on line 17 in src/Simplex/Messaging/Agent.hs

View workflow job for this annotation

GitHub Actions / build-ubuntu-20.04-8.10.7

unrecognised warning flag: -fno-warn-ambiguous-fields

-- |
-- Module : Simplex.Messaging.Agent
Expand Down Expand Up @@ -479,7 +479,7 @@

reconnectAllServers :: AgentClient -> IO ()
reconnectAllServers c = do
reconnectServerClients c smpClients
withAgentEnv' c $ reconnectSMPServerClients c
reconnectServerClients c xftpClients
reconnectServerClients c ntfClients

Expand Down
25 changes: 25 additions & 0 deletions src/Simplex/Messaging/Agent/Client.hs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
closeAgentClient,
closeProtocolServerClients,
reconnectServerClients,
reconnectSMPServerClients,
reconnectSMPServer,
closeXFTPServerClient,
runSMPServerTest,
Expand Down Expand Up @@ -922,6 +923,30 @@
reconnectServerClients c clientsSel =
readTVarIO (clientsSel c) >>= mapM_ (forkIO . closeClient_ c)

reconnectSMPServerClients :: AgentClient -> AM' ()
reconnectSMPServerClients c = do
(clients, qs) <- atomically $ do
clients <- swapTVar (smpClients c) M.empty
qs <- RQ.getDelAllQueues (activeSubs c)
qs' <- RQ.getDelAllQueues (pendingSubs c)
pure (clients, qs <> qs')
atomically $ writeTBQueue (subQ c) ("", "", AEvt SAENone DOWN_ALL)
mapM_ (liftIO . forkIO . closeClient_ c) clients
(qSubRs, _) <- subscribeQueues c qs
let upConns = subscribedConnsByServer qSubRs
forM_ (M.toList upConns) $ \(server, connIds) ->
liftIO $ notifyUP server (S.toList . S.fromList $ connIds)
where
subscribedConnsByServer :: [(RcvQueue, Either AgentErrorType ())] -> Map SMPServer [ConnId]
subscribedConnsByServer = foldl' insertConnId M.empty
where
insertConnId :: Map SMPServer [ConnId] -> (RcvQueue, Either AgentErrorType ()) -> Map SMPServer [ConnId]
insertConnId acc (RcvQueue {server, connId}, qSubResult) = case qSubResult of
Right _ -> M.insertWith (<>) server [connId] acc
Left _ -> acc
notifyUP :: SMPServer -> [ConnId] -> IO ()
notifyUP server connIds = atomically $ writeTBQueue (subQ c) ("", "", AEvt SAENone (UP server connIds))

reconnectSMPServer :: AgentClient -> UserId -> SMPServer -> IO ()
reconnectSMPServer c userId srv = do
cs <- readTVarIO $ smpClients c
Expand Down Expand Up @@ -1904,7 +1929,7 @@
SEDatabaseBusy e -> CRITICAL True $ B.unpack e
e -> INTERNAL $ show e

userServers :: forall p. (ProtocolTypeI p, UserProtocol p) => AgentClient -> TMap UserId (NonEmpty (ProtoServerWithAuth p))

Check warning on line 1932 in src/Simplex/Messaging/Agent/Client.hs

View workflow job for this annotation

GitHub Actions / build-ubuntu-20.04-9.6.3

Redundant constraint: UserProtocol p

Check warning on line 1932 in src/Simplex/Messaging/Agent/Client.hs

View workflow job for this annotation

GitHub Actions / build-ubuntu-22.04-9.6.3

Redundant constraint: UserProtocol p
userServers c = case protocolTypeI @p of
SPSMP -> smpServers c
SPXFTP -> xftpServers c
Expand Down
3 changes: 3 additions & 0 deletions src/Simplex/Messaging/Agent/Protocol.hs
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,7 @@ data AEvent (e :: AEntity) where
CONNECT :: AProtocolType -> TransportHost -> AEvent AENone
DISCONNECT :: AProtocolType -> TransportHost -> AEvent AENone
DOWN :: SMPServer -> [ConnId] -> AEvent AENone
DOWN_ALL :: AEvent AENone
UP :: SMPServer -> [ConnId] -> AEvent AENone
SWITCH :: QueueDirection -> SwitchPhase -> ConnectionStats -> AEvent AEConn
RSYNC :: RatchetSyncState -> Maybe AgentCryptoError -> ConnectionStats -> AEvent AEConn
Expand Down Expand Up @@ -406,6 +407,7 @@ data AEventTag (e :: AEntity) where
CONNECT_ :: AEventTag AENone
DISCONNECT_ :: AEventTag AENone
DOWN_ :: AEventTag AENone
DOWN_ALL_ :: AEventTag AENone
UP_ :: AEventTag AENone
SWITCH_ :: AEventTag AEConn
RSYNC_ :: AEventTag AEConn
Expand Down Expand Up @@ -458,6 +460,7 @@ aEventTag = \case
CONNECT {} -> CONNECT_
DISCONNECT {} -> DISCONNECT_
DOWN {} -> DOWN_
DOWN_ALL {} -> DOWN_ALL_
UP {} -> UP_
SWITCH {} -> SWITCH_
RSYNC {} -> RSYNC_
Expand Down
6 changes: 6 additions & 0 deletions src/Simplex/Messaging/Agent/TRcvQueues.hs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ module Simplex.Messaging.Agent.TRcvQueues
deleteQueue,
getSessQueues,
getDelSessQueues,
getDelAllQueues,
qKey,
)
where
Expand Down Expand Up @@ -96,6 +97,11 @@ getDelSessQueues tSess (TRcvQueues qs cs) = do
Nothing -> (cId : removed, Nothing)
Nothing -> (removed, Nothing) -- "impossible" in invariant holds, because we get keys from the known queues

getDelAllQueues :: TRcvQueues -> STM [RcvQueue]
getDelAllQueues (TRcvQueues qs cs) = do
writeTVar cs M.empty
M.elems <$> swapTVar qs M.empty

isSession :: RcvQueue -> (UserId, SMPServer, Maybe ConnId) -> Bool
isSession rq (uId, srv, connId_) =
userId rq == uId && server rq == srv && maybe True (connId rq ==) connId_
Expand Down
23 changes: 8 additions & 15 deletions tests/AgentTests/FunctionalAPITests.hs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications #-}
{-# OPTIONS_GHC -Wno-orphans #-}
{-# OPTIONS_GHC -fno-warn-ambiguous-fields #-}

Check warning on line 17 in tests/AgentTests/FunctionalAPITests.hs

View workflow job for this annotation

GitHub Actions / build-ubuntu-20.04-8.10.7

unrecognised warning flag: -fno-warn-ambiguous-fields

module AgentTests.FunctionalAPITests
( functionalAPITests,
Expand Down Expand Up @@ -2904,7 +2905,7 @@
_ -> error "timeout"

testTwoUsers :: HasCallStack => IO ()
testTwoUsers = withAgentClients2 $ \a b -> do
testTwoUsers = withAgentClientsCfg2 aCfg aCfg $ \a b -> do
let nc = netCfg initAgentServers
sessionMode nc `shouldBe` TSMUser
runRight_ $ do
Expand All @@ -2916,7 +2917,7 @@
b `hasClients` 1
liftIO $ setNetworkConfig a nc {sessionMode = TSMEntity}
liftIO $ threadDelay 250000
("", "", DOWN _ _) <- nGet a
("", "", DOWN_ALL) <- nGet a
("", "", UP _ _) <- nGet a
a `hasClients` 2

Expand All @@ -2925,9 +2926,7 @@
liftIO $ threadDelay 250000
liftIO $ setNetworkConfig a nc {sessionMode = TSMUser}
liftIO $ threadDelay 250000
("", "", DOWN _ _) <- nGet a
("", "", DOWN _ _) <- nGet a
("", "", UP _ _) <- nGet a
("", "", DOWN_ALL) <- nGet a
("", "", UP _ _) <- nGet a
a `hasClients` 1

Expand All @@ -2940,9 +2939,7 @@
b `hasClients` 1
liftIO $ setNetworkConfig a nc {sessionMode = TSMEntity}
liftIO $ threadDelay 250000
("", "", DOWN _ _) <- nGet a
("", "", DOWN _ _) <- nGet a
("", "", UP _ _) <- nGet a
("", "", DOWN_ALL) <- nGet a
("", "", UP _ _) <- nGet a
a `hasClients` 4
exchangeGreetingsMsgId 6 a bId1 b aId1
Expand All @@ -2952,20 +2949,16 @@
liftIO $ threadDelay 250000
liftIO $ setNetworkConfig a nc {sessionMode = TSMUser}
liftIO $ threadDelay 250000
("", "", DOWN _ _) <- nGet a
("", "", DOWN _ _) <- nGet a
("", "", DOWN _ _) <- nGet a
("", "", DOWN _ _) <- nGet a
("", "", UP _ _) <- nGet a
("", "", UP _ _) <- nGet a
("", "", UP _ _) <- nGet a
("", "", DOWN_ALL) <- nGet a
("", "", UP _ _) <- nGet a
a `hasClients` 2
exchangeGreetingsMsgId 8 a bId1 b aId1
exchangeGreetingsMsgId 8 a bId1' b aId1'
exchangeGreetingsMsgId 6 a bId2 b aId2
exchangeGreetingsMsgId 6 a bId2' b aId2'
where
aCfg :: AgentConfig
aCfg = agentCfg {tbqSize = 16}
hasClients :: HasCallStack => AgentClient -> Int -> ExceptT AgentErrorType IO ()
hasClients c n = liftIO $ M.size <$> readTVarIO (smpClients c) `shouldReturn` n

Expand Down
2 changes: 1 addition & 1 deletion tests/CoreTests/TRcvQueuesTests.hs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ batchIdempotentTest = do
atomically $ RQ.batchAddQueues trq qs
checkDataInvariant trq `shouldReturn` True
readTVarIO (RQ.getRcvQueues trq) `shouldReturn` qs'
fmap L.nub <$> readTVarIO (RQ.getConnections trq) `shouldReturn`cs' -- connections get duplicated, but that doesn't appear to affect anybody
fmap L.nub <$> readTVarIO (RQ.getConnections trq) `shouldReturn` cs' -- connections get duplicated, but that doesn't appear to affect anybody

deleteConnTest :: IO ()
deleteConnTest = do
Expand Down
Loading