Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

agent: ntf errs #1316

Merged
merged 7 commits into from
Sep 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions src/Simplex/Messaging/Agent.hs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE TypeApplications #-}
{-# OPTIONS_GHC -fno-warn-ambiguous-fields #-}

Check warning on line 17 in src/Simplex/Messaging/Agent.hs

View workflow job for this annotation

GitHub Actions / build-ubuntu-20.04-8.10.7

unrecognised warning flag: -fno-warn-ambiguous-fields

Check warning on line 17 in src/Simplex/Messaging/Agent.hs

View workflow job for this annotation

GitHub Actions / build-ubuntu-20.04-8.10.7

unrecognised warning flag: -fno-warn-ambiguous-fields

-- |
-- Module : Simplex.Messaging.Agent
Expand Down Expand Up @@ -2067,12 +2067,10 @@
ns <- asks ntfSupervisor
connIds <- liftIO $ S.toList <$> getSubscriptions c
rs <- lift $ withStoreBatch' c (\db -> map (getConnData db) connIds)
let (connIds', errs) = enabledNtfConns (zip connIds rs)
let (connIds', cErrs) = enabledNtfConns (zip connIds rs)
forM_ (L.nonEmpty connIds') $ \connIds'' ->
atomically $ writeTBQueue (ntfSubQ ns) (cmd, connIds'')
-- TODO [batch ntf] notify ERRS
forM_ errs $ \(connId, e) ->
atomically $ writeTBQueue (subQ c) ("", connId, AEvt SAEConn $ ERR e)
unless (null cErrs) $ atomically $ writeTBQueue (subQ c) ("", "", AEvt SAENone $ ERRS cErrs)
where
enabledNtfConns :: [(ConnId, Either AgentErrorType (Maybe (ConnData, ConnectionMode)))] -> ([ConnId], [(ConnId, AgentErrorType)])
enabledNtfConns = foldr addEnabledConn ([], [])
Expand Down
37 changes: 29 additions & 8 deletions src/Simplex/Messaging/Agent/NtfSubSupervisor.hs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
{-# LANGUAGE OverloadedLists #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TupleSections #-}
{-# OPTIONS_GHC -fno-warn-ambiguous-fields #-}

Check warning on line 9 in src/Simplex/Messaging/Agent/NtfSubSupervisor.hs

View workflow job for this annotation

GitHub Actions / build-ubuntu-20.04-8.10.7

unrecognised warning flag: -fno-warn-ambiguous-fields

Check warning on line 9 in src/Simplex/Messaging/Agent/NtfSubSupervisor.hs

View workflow job for this annotation

GitHub Actions / build-ubuntu-20.04-8.10.7

unrecognised warning flag: -fno-warn-ambiguous-fields

module Simplex.Messaging.Agent.NtfSubSupervisor
( runNtfSupervisor,
Expand All @@ -23,7 +24,7 @@
import Control.Monad.Reader
import Control.Monad.Trans.Except
import Data.Bifunctor (first)
import Data.Either (rights)
import Data.Either (partitionEithers)
import Data.Foldable (foldr')
import Data.List.NonEmpty (NonEmpty (..))
import qualified Data.List.NonEmpty as L
Expand Down Expand Up @@ -66,13 +67,18 @@
notifyErr e
notifyErr e = notifyInternalError' c $ "runNtfSupervisor error " <> show e

-- TODO [batch ntf] notify ERRS
partitionErrs :: (a -> ConnId) -> [a] -> [Either AgentErrorType b] -> ([(ConnId, AgentErrorType)], [b])
partitionErrs f xs = partitionEithers . zipWith (\x -> first (f x,)) xs
{-# INLINE partitionErrs #-}

processNtfCmd :: AgentClient -> (NtfSupervisorCommand, NonEmpty ConnId) -> AM ()
processNtfCmd c (cmd, connIds) = do
logInfo $ "processNtfCmd - cmd = " <> tshow cmd
let connIds' = L.toList connIds
case cmd of
NSCCreate -> do
rqSubActions <- lift $ rights <$> withStoreBatch c (\db -> map (getQueueSub db) (L.toList connIds))
(cErrs, rqSubActions) <- lift $ partitionErrs id connIds' <$> (withStoreBatch c $ \db -> map (getQueueSub db) connIds')
notifyErrs c cErrs
logInfo $ "processNtfCmd, NSCCreate - length rqSubs = " <> tshow (length rqSubActions)
let (ns, rs, css, cns) = partitionQueueSubActions rqSubActions
createNewSubs ns
Expand All @@ -93,7 +99,8 @@
createNewSubs rqs = do
withTokenServer $ \ntfServer -> do
let newSubs = map (rqToNewSub ntfServer) rqs
void $ lift $ withStoreBatch c (\db -> map (storeNewSub db) newSubs)
(cErrs, _) <- lift $ partitionErrs ntfSubConnId newSubs <$> (withStoreBatch c $ \db -> map (storeNewSub db) newSubs)
notifyErrs c cErrs
kickSMPWorkers rqs
where
rqToNewSub :: NtfServer -> RcvQueue -> NtfSubscription
Expand All @@ -104,14 +111,18 @@
resetSubs rqSubs = do
withTokenServer $ \ntfServer -> do
let subsToReset = map (toResetSub ntfServer) rqSubs
lift $ void $ withStoreBatch' c (\db -> map (\sub -> supervisorUpdateNtfSub db sub (NSASMP NSASmpKey)) subsToReset)
(cErrs, _) <- lift $ partitionErrs ntfSubConnId subsToReset <$> (withStoreBatch' c $ \db -> map (storeResetSub db) subsToReset)
notifyErrs c cErrs
let rqs = map fst rqSubs
kickSMPWorkers rqs
where
toResetSub :: NtfServer -> (RcvQueue, NtfSubscription) -> NtfSubscription
toResetSub ntfServer (rq, sub) =
let RcvQueue {server = smpServer} = rq
in sub {smpServer, ntfQueueId = Nothing, ntfServer, ntfSubId = Nothing, ntfSubStatus = NASNew}
storeResetSub :: DB.Connection -> NtfSubscription -> IO ()
storeResetSub db sub = supervisorUpdateNtfSub db sub (NSASMP NSASmpKey)
ntfSubConnId NtfSubscription {connId} = connId
partitionQueueSubActions ::
[(RcvQueue, Maybe NtfSupervisorSub)] ->
( [RcvQueue], -- new subs
Expand Down Expand Up @@ -146,13 +157,19 @@
NSANtf _ -> (ns, rs, css, subNtfServer : cns)
reset = (ns, (rq, sub) : rs, css, cns)
NSCSmpDelete -> do
rqs <- lift $ rights <$> withStoreBatch c (\db -> map (fmap (first storeError) . getPrimaryRcvQueue db) (L.toList connIds))
(cErrs, rqs) <- lift $ partitionErrs id connIds' <$> (withStoreBatch c $ \db -> map (getQueue db) connIds')
logInfo $ "processNtfCmd, NSCSmpDelete - length rqs = " <> tshow (length rqs)
lift $ void $ withStoreBatch' c (\db -> map (\rq -> supervisorUpdateNtfAction db (qConnId rq) (NSASMP NSASmpDelete)) rqs)
(cErrs', _) <- lift $ partitionErrs qConnId rqs <$> (withStoreBatch' c $ \db -> map (updateAction db) rqs)
notifyErrs c (cErrs <> cErrs')
kickSMPWorkers rqs
where
getQueue :: DB.Connection -> ConnId -> IO (Either AgentErrorType RcvQueue)
getQueue db connId = first storeError <$> getPrimaryRcvQueue db connId
updateAction :: DB.Connection -> RcvQueue -> IO ()
updateAction db rq = supervisorUpdateNtfAction db (qConnId rq) (NSASMP NSASmpDelete)
NSCNtfWorker ntfServer -> lift . void $ getNtfNTFWorker True c ntfServer
NSCNtfSMPWorker smpServer -> lift . void $ getNtfSMPWorker True c smpServer
NSCDeleteSub -> void $ lift $ withStoreBatch' c $ \db -> map (deleteNtfSubscription' db) (L.toList connIds)
NSCDeleteSub -> void $ lift $ withStoreBatch' c $ \db -> map (deleteNtfSubscription' db) connIds'
where
kickSMPWorkers :: [RcvQueue] -> AM ()
kickSMPWorkers rqs = do
Expand Down Expand Up @@ -343,6 +360,10 @@
notifyInternalError' AgentClient {subQ} internalErrStr = atomically $ writeTBQueue subQ ("", "", AEvt SAEConn $ ERR $ INTERNAL internalErrStr)
{-# INLINE notifyInternalError' #-}

notifyErrs :: MonadIO m => AgentClient -> [(ConnId, AgentErrorType)] -> m ()
notifyErrs AgentClient {subQ} connErrs = unless (null connErrs) $ atomically $ writeTBQueue subQ ("", "", AEvt SAENone $ ERRS connErrs)
{-# INLINE notifyErrs #-}

getNtfToken :: AM' (Maybe NtfToken)
getNtfToken = do
tkn <- asks $ ntfTkn . ntfSupervisor
Expand Down
3 changes: 3 additions & 0 deletions src/Simplex/Messaging/Agent/Protocol.hs
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,7 @@ data AEvent (e :: AEntity) where
OK :: AEvent AEConn
JOINED :: SndQueueSecured -> AEvent AEConn
ERR :: AgentErrorType -> AEvent AEConn
ERRS :: [(ConnId, AgentErrorType)] -> AEvent AENone
SUSPENDED :: AEvent AENone
RFPROG :: Int64 -> Int64 -> AEvent AERcvFile
RFDONE :: FilePath -> AEvent AERcvFile
Expand Down Expand Up @@ -436,6 +437,7 @@ data AEventTag (e :: AEntity) where
OK_ :: AEventTag AEConn
JOINED_ :: AEventTag AEConn
ERR_ :: AEventTag AEConn
ERRS_ :: AEventTag AENone
SUSPENDED_ :: AEventTag AENone
-- XFTP commands and responses
RFDONE_ :: AEventTag AERcvFile
Expand Down Expand Up @@ -490,6 +492,7 @@ aEventTag = \case
OK -> OK_
JOINED _ -> JOINED_
ERR _ -> ERR_
ERRS _ -> ERRS_
SUSPENDED -> SUSPENDED_
RFPROG {} -> RFPROG_
RFDONE {} -> RFDONE_
Expand Down
Loading