Skip to content

Commit

Permalink
ntf: batch supervisor commands (#1313)
Browse files Browse the repository at this point in the history
  • Loading branch information
spaced4ndy authored Sep 13, 2024
1 parent e247f69 commit ea67b34
Show file tree
Hide file tree
Showing 4 changed files with 153 additions and 87 deletions.
62 changes: 37 additions & 25 deletions src/Simplex/Messaging/Agent.hs
Original file line number Diff line number Diff line change
Expand Up @@ -778,7 +778,7 @@ newRcvConnSrv c userId connId enableNtfs cMode clientData pqInitKeys subMode srv
lift . when (subMode == SMSubscribe) $ addNewQueueSubscription c rq' tSess sessId
when enableNtfs $ do
ns <- asks ntfSupervisor
atomically $ sendNtfSubCommand ns (connId, NSCCreate)
atomically $ sendNtfSubCommand ns (NSCCreate, [connId])
let crData = ConnReqUriData SSSimplex smpAgentVRange [qUri] clientData
case cMode of
SCMContact -> pure (connId, CRContactUri crData)
Expand Down Expand Up @@ -923,7 +923,7 @@ createReplyQueue c ConnData {userId, connId, enableNtfs} SndQueue {smpClientVers
lift . when (subMode == SMSubscribe) $ addNewQueueSubscription c rq' tSess sessId
when enableNtfs $ do
ns <- asks ntfSupervisor
atomically $ sendNtfSubCommand ns (connId, NSCCreate)
atomically $ sendNtfSubCommand ns (NSCCreate, [connId])
pure qInfo

-- | Approve confirmation (LET command) in Reader monad
Expand Down Expand Up @@ -1013,14 +1013,13 @@ subscribeConnections' c connIds = do
order _ = 4
sendNtfCreate :: NtfSupervisor -> Map ConnId (Either AgentErrorType ()) -> Map ConnId SomeConn -> AM' ()
sendNtfCreate ns rcvRs cs = do
-- TODO this needs to be batched end to end.
-- Currently, the only change is to ignore failed subscriptions.
let oks = M.keysSet $ M.filter (either temporaryAgentError $ const True) rcvRs
forM_ (M.restrictKeys cs oks) $ \case
SomeConn _ conn -> do
let cmd = if enableNtfs $ toConnData conn then NSCCreate else NSCSmpDelete
ConnData {connId} = toConnData conn
atomically $ writeTBQueue (ntfSubQ ns) (connId, cmd)
cs' = M.restrictKeys cs oks
(csCreate, csDelete) = M.partition (\(SomeConn _ conn) -> enableNtfs $ toConnData conn) cs'
sendNtfCmd NSCCreate csCreate
sendNtfCmd NSCSmpDelete csDelete
where
sendNtfCmd cmd cs' = forM_ (L.nonEmpty $ M.keys cs') $ \cids -> atomically $ writeTBQueue (ntfSubQ ns) (cmd, cids)
resumeDelivery :: Map ConnId SomeConn -> AM ()
resumeDelivery conns = do
conns' <- M.restrictKeys conns . S.fromList <$> withStore' c getConnectionsForDelivery
Expand Down Expand Up @@ -1259,7 +1258,7 @@ runCommandProcessing c@AgentClient {subQ} connId server_ Worker {doWork} = do
withStore' c $ \db -> deleteConnRcvQueue db rq'
when (enableNtfs cData) $ do
ns <- asks ntfSupervisor
atomically $ sendNtfSubCommand ns (connId, NSCCreate)
atomically $ sendNtfSubCommand ns (NSCCreate, [connId])
let conn' = DuplexConnection cData (rq'' :| rqs') sqs
notify $ SWITCH QDRcv SPCompleted $ connectionStats conn'
_ -> internalErr "ICQDelete: cannot delete the only queue in connection"
Expand Down Expand Up @@ -1716,12 +1715,6 @@ connRcvQueues = \case
SndConnection _ _ -> []
NewConnection _ -> []

disableConn :: AgentClient -> ConnId -> AM' ()
disableConn c connId = do
atomically $ removeSubscription c connId
ns <- asks ntfSupervisor
atomically $ writeTBQueue (ntfSubQ ns) (connId, NSCDeleteSub)

-- Unlike deleteConnectionsAsync, this function does not mark connections as deleted in case of deletion failure.
deleteConnections' :: AgentClient -> [ConnId] -> AM (Map ConnId (Either AgentErrorType ()))
deleteConnections' = deleteConnections_ getConns False False
Expand All @@ -1748,7 +1741,7 @@ prepareDeleteConnections_ getConnections c waitDelivery connIds = do
(delRs, rcvQs) = M.mapEither rcvQueues cs
rqs = concat $ M.elems rcvQs
connIds' = M.keys rcvQs
lift . forM_ connIds' $ disableConn c
lift $ forM_ (L.nonEmpty connIds') unsubConnIds
-- ! delRs is not used to notify about the result in any of the calling functions,
-- ! it is only used to check results count in deleteConnections_;
-- ! if it was used to notify about the result, it might be necessary to differentiate
Expand All @@ -1762,6 +1755,12 @@ prepareDeleteConnections_ getConnections c waitDelivery connIds = do
rcvQueues (SomeConn _ conn) = case connRcvQueues conn of
[] -> Left $ Right ()
rqs -> Right rqs
unsubConnIds :: NonEmpty ConnId -> AM' ()
unsubConnIds connIds' = do
forM_ connIds' $ \connId ->
atomically $ removeSubscription c connId
ns <- asks ntfSupervisor
atomically $ writeTBQueue (ntfSubQ ns) (NSCDeleteSub, connIds')
notify = atomically . writeTBQueue (subQ c)

deleteConnQueues :: AgentClient -> Bool -> Bool -> [RcvQueue] -> AM' (Map ConnId (Either AgentErrorType ()))
Expand Down Expand Up @@ -2018,7 +2017,7 @@ toggleConnectionNtfs' c connId enable = do
withStore' c $ \db -> setConnectionNtfs db connId enable
ns <- asks ntfSupervisor
let cmd = if enable then NSCCreate else NSCSmpDelete
atomically $ sendNtfSubCommand ns (connId, cmd)
atomically $ sendNtfSubCommand ns (cmd, [connId])

deleteToken_ :: AgentClient -> NtfToken -> AM ()
deleteToken_ c@AgentClient {subQ} tkn@NtfToken {ntfTokenId, ntfTknStatus} = do
Expand Down Expand Up @@ -2066,13 +2065,26 @@ deleteNtfSubs c deleteCmd = do
sendNtfConnCommands :: AgentClient -> NtfSupervisorCommand -> AM ()
sendNtfConnCommands c cmd = do
ns <- asks ntfSupervisor
connIds <- liftIO $ getSubscriptions c
forM_ connIds $ \connId -> do
withStore' c (`getConnData` connId) >>= \case
Just (ConnData {enableNtfs}, _) ->
when enableNtfs . atomically $ writeTBQueue (ntfSubQ ns) (connId, cmd)
_ ->
atomically $ writeTBQueue (subQ c) ("", connId, AEvt SAEConn $ ERR $ INTERNAL "no connection data")
connIds <- liftIO $ S.toList <$> getSubscriptions c
rs <- lift $ withStoreBatch' c (\db -> map (getConnData db) connIds)
let (connIds', errs) = enabledNtfConns (zip connIds rs)
forM_ (L.nonEmpty connIds') $ \connIds'' ->
atomically $ writeTBQueue (ntfSubQ ns) (cmd, connIds'')
-- TODO [batch ntf] notify ERRS
forM_ errs $ \(connId, e) ->
atomically $ writeTBQueue (subQ c) ("", connId, AEvt SAEConn $ ERR e)
where
enabledNtfConns :: [(ConnId, Either AgentErrorType (Maybe (ConnData, ConnectionMode)))] -> ([ConnId], [(ConnId, AgentErrorType)])
enabledNtfConns = foldr addEnabledConn ([], [])
where
addEnabledConn ::
(ConnId, Either AgentErrorType (Maybe (ConnData, ConnectionMode))) ->
([ConnId], [(ConnId, AgentErrorType)]) ->
([ConnId], [(ConnId, AgentErrorType)])
addEnabledConn cData_ (cIds, errs) = case cData_ of
(_, Right (Just (ConnData {connId, enableNtfs}, _))) -> if enableNtfs then (connId : cIds, errs) else (cIds, errs)
(connId, Right Nothing) -> (cIds, (connId, INTERNAL "no connection data") : errs)
(connId, Left e) -> (cIds, (connId, e) : errs)

setNtfServers :: AgentClient -> [NtfServer] -> IO ()
setNtfServers c = atomically . writeTVar (ntfServers c)
Expand Down
2 changes: 1 addition & 1 deletion src/Simplex/Messaging/Agent/Env/SQLite.hs
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ createAgentStore dbFilePath dbKey keepKey = createSQLiteStore dbFilePath dbKey k

data NtfSupervisor = NtfSupervisor
{ ntfTkn :: TVar (Maybe NtfToken),
ntfSubQ :: TBQueue (ConnId, NtfSupervisorCommand),
ntfSubQ :: TBQueue (NtfSupervisorCommand, NonEmpty ConnId),
ntfWorkers :: TMap NtfServer Worker,
ntfSMPWorkers :: TMap SMPServer Worker
}
Expand Down
Loading

0 comments on commit ea67b34

Please sign in to comment.