Skip to content

Commit

Permalink
Schedule an arbitrary nonempty list of automata
Browse files Browse the repository at this point in the history
  • Loading branch information
turion committed Aug 13, 2024
1 parent 1e46630 commit 0676ba3
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 30 deletions.
2 changes: 2 additions & 0 deletions rhine/rhine.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ library
other-modules:
FRP.Rhine.ClSF.Except.Util
FRP.Rhine.ClSF.Random.Util
FRP.Rhine.Schedule.Internal

-- LANGUAGE extensions used by modules in this package.
-- other-extensions:
Expand All @@ -156,6 +157,7 @@ library
time >=1.8,
time-domain ^>=0.1.0.2,
transformers >=0.5,
foldable1-classes-compat ^>= 0.1,

-- Directories containing source files.
hs-source-dirs: src
Expand Down
34 changes: 4 additions & 30 deletions rhine/src/FRP/Rhine/Schedule.hs
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,16 @@ module FRP.Rhine.Schedule where
import Control.Arrow
import Data.List.NonEmpty as N

-- transformers
import Control.Monad.Trans.Reader

-- monad-schedule
import Control.Monad.Schedule.Class

-- automaton
import Data.Automaton
import Data.Automaton.Recursive (getRecursive, toRecursive)
import Data.Stream
import Data.Stream.Optimized (OptimizedStreamT (..), toStreamT)
import Data.Stream.Recursive qualified as StreamRecursive
import Data.Stream.Result

-- rhine
import FRP.Rhine.Clock
import FRP.Rhine.Schedule.Internal

-- * Scheduling

Expand All @@ -48,35 +42,15 @@ scheduleList :: (Monad m, MonadSchedule m) => NonEmpty (Automaton m a b) -> Auto
scheduleList automatons0 =
Automaton $
Stateful $
StreamT
{ state = (getRecursive . toRecursive <$> automatons0, [])
, step = \(automatons, running) -> ReaderT $ \a -> do
let bsAndConts = flip (runReaderT . StreamRecursive.getRecursive) a <$> automatons
(done, running') <- schedule (N.head bsAndConts :| N.tail bsAndConts ++ running)
return $ Result (resultState <$> done, running') $ output <$> done
}
scheduleStreams' $
toStreamT . getAutomaton <$> automatons0

{- | Run two automata concurrently.
Whenever one automaton returns a value, it is returned.
This is similar to 'scheduleList', but more efficient.
-}
schedulePair :: (Monad m, MonadSchedule m) => Automaton m a b -> Automaton m a b -> Automaton m a b
schedulePair (Automaton automatonL) (Automaton automatonR) = Automaton $! Stateful $! scheduleStreams (toStreamT automatonL) (toStreamT automatonR)
where
scheduleStreams :: (Monad m, MonadSchedule m) => StreamT m b -> StreamT m b -> StreamT m b
scheduleStreams (StreamT stateL0 stepL) (StreamT stateR0 stepR) =
StreamT
{ state = (stepL stateL0, stepR stateR0)
, step
}
where
step (runningL, runningR) = do
result <- race runningL runningR
case result of
Left (Result stateL' b, runningR') -> return $ Result (stepL stateL', runningR') b
Right (runningL', Result stateR' b) -> return $ Result (runningL', stepR stateR') b
schedulePair automatonL automatonR = concatS $ fmap toList $ scheduleList $ automatonL :| [automatonR]

-- | Run two running clocks concurrently.
runningSchedule ::
Expand Down
73 changes: 73 additions & 0 deletions rhine/src/FRP/Rhine/Schedule/Internal.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
{-# LANGUAGE ExistentialQuantification #-}

module FRP.Rhine.Schedule.Internal where

-- base
import Control.Arrow
import Data.Function ((&))
import Data.Functor ((<&>))
import Data.Functor.Compose (Compose (..))
import Data.Kind (Type)
import Data.List.NonEmpty as N

-- foldable1-classes-compat
import Data.Foldable1 (Foldable1 (foldrMap1))

-- sop-core
import Data.SOP (HCollapse (hcollapse), HSequence (htraverse'), I (..), K (K), NP (..), NS (..), SListI, apInjs_NP, hliftA, hzipWith, unI)

-- monad-schedule
import Control.Monad.Schedule.Class

-- automaton
import Data.Stream hiding (concatS)
import Data.Stream.Result

newtype Step m b state = Step {getStep :: ResultStateT state m b}

newtype RunningResult b state = RunningResult {getRunningResult :: Result state b}

apInjs_NPNonEmpty :: (SListI xs) => NP f (x ': xs) -> NonEmpty (NS f (x ': xs))

Check warning on line 30 in rhine/src/FRP/Rhine/Schedule/Internal.hs

View workflow job for this annotation

GitHub Actions / Run hlint

Suggestion in apInjs_NPNonEmpty in module FRP.Rhine.Schedule.Internal: Use camelCase ▫︎ Found: "apInjs_NPNonEmpty ::\n (SListI xs) => NP f (x ': xs) -> NonEmpty (NS f (x ': xs))" ▫︎ Perhaps: "apInjsNPNonEmpty ::\n (SListI xs) => NP f (x ': xs) -> NonEmpty (NS f (x ': xs))"
apInjs_NPNonEmpty (fx :* fxs) = Z fx :| (S <$> apInjs_NP fxs)

Check warning on line 31 in rhine/src/FRP/Rhine/Schedule/Internal.hs

View workflow job for this annotation

GitHub Actions / Run hlint

Suggestion in apInjs_NPNonEmpty in module FRP.Rhine.Schedule.Internal: Use camelCase ▫︎ Found: "apInjs_NPNonEmpty (fx :* fxs) = ..." ▫︎ Perhaps: "apInjsNPNonEmpty (fx :* fxs) = ..."

data Streams m b = forall state (states :: [Type]).
(SListI states) =>
Streams
{ states :: NP I (state ': states)
, steps :: NP (Step m b) (state ': states)
}

buildStreams :: StreamT m b -> Streams m b
buildStreams StreamT {state, step} =
Streams
{ states = I state :* Nil
, steps = Step (ResultStateT step) :* Nil
}

consStreams :: StreamT m b -> Streams m b -> Streams m b
consStreams StreamT {state, step} Streams {states, steps} =
Streams
{ states = I state :* states
, steps = Step (ResultStateT step) :* steps
}

scheduleStreams :: (MonadSchedule m, Functor m, Applicative m) => Streams m b -> StreamT m (NonEmpty b)
scheduleStreams Streams {states, steps} =
StreamT
{ state = (apInjs_NPNonEmpty states, [])
, step = \(restingStates, runningStates) ->
fmap (htraverse' getCompose . hzipWith (\Step {getStep} -> Compose . fmap RunningResult . getResultStateT getStep . unI) steps) restingStates
& flip appendList runningStates
& schedule
& fmap
( \(finished, running) ->
let finishedStates = fmap (hliftA (I . resultState . getRunningResult)) finished
outputs =
finished
<&> (hliftA (getRunningResult >>> output >>> K) >>> hcollapse)
in Result (finishedStates, running) outputs
)
}

scheduleStreams' :: (MonadSchedule m, Applicative m) => NonEmpty (StreamT m b) -> StreamT m (NonEmpty b)
scheduleStreams' = scheduleStreams . foldrMap1 buildStreams consStreams

0 comments on commit 0676ba3

Please sign in to comment.