Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extend automaton #381

Open
wants to merge 32 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
2cc253e
Fix naming in mapOutput and toRecursive
turion Jun 10, 2024
b784bd9
Add stepInstant
turion Jun 26, 2024
d698740
Fix naming in runStreamExcept
turion Dec 30, 2024
343264c
Add Foldable and Traversable instances
turion Dec 30, 2024
7d32c7d
Add mmap
turion Jun 26, 2024
c153f86
Move toRecursive and fromRecursive
turion Jun 25, 2024
2549fba
Simplify type signature of handleAutomaton
turion Dec 30, 2024
b0fb98d
parallely and applying
turion Jun 25, 2024
23b4c06
Avoid mtl dependency where possible
turion Jun 26, 2024
8f5ec75
Add arr'
turion Dec 30, 2024
88346f0
Relax constraints for Strong and Choice
turion Dec 30, 2024
85ada3a
Add Filterable, Witherable instances and catMaybeS
turion Dec 30, 2024
93345c7
WIP FilterAutomaton
turion Dec 30, 2024
857b381
FIXUP (should drop itself) fix some warnings
turion Jun 26, 2024
96fc2c6
Add unzipResult
turion Jun 26, 2024
48ea06c
Fix import
turion Jun 26, 2024
9167495
Add hoist' variants without monad constraint
turion Dec 28, 2024
0820f0c
Fix import
Sep 26, 2024
a51f71a
Fix naming & hlint
turion Dec 30, 2024
b0a7f18
Fix haddock
turion Sep 28, 2024
014bd22
WIP Handling composed functors, runListS
turion Dec 30, 2024
275c41e
WIP foldStream functions
turion Dec 29, 2024
2db7198
WIP Apply some hints
turion Dec 29, 2024
8e3f607
Fix haddock
turion Dec 30, 2024
5d649de
Add mapException
turion Dec 30, 2024
345aee1
Implement mtl instances for StreamExcept
turion Dec 30, 2024
a0d9afe
Implement mtl instances for AutomatonExcept
turion Dec 30, 2024
c0e62db
Haddock toRecursive and runStreamExcept
turion Dec 30, 2024
3d4eb18
Derive Monoid for automata
turion Dec 30, 2024
e5d0fba
Document Semialign instance
turion Dec 30, 2024
71111a0
Document concatS
turion Dec 30, 2024
d350ac8
WIP split
turion Jan 6, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions automaton/automaton.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ common opts
simple-affine-space ^>=0.2,
these >=1.1 && <=1.3,
transformers >=0.5,
witherable ^>=0.4,
mtl ^>= 2.3,

if flag(dev)
ghc-options: -Werror
Expand All @@ -47,12 +49,18 @@ common opts
default-extensions:
Arrows
DataKinds
DeriveFunctor
DerivingVia
FlexibleContexts
FlexibleInstances
GADTs
ImportQualifiedPost
LambdaCase
MultiParamTypeClasses
NamedFieldPuns
NoStarIsType
RankNTypes
StandaloneDeriving
TupleSections
TypeApplications
TypeFamilies
Expand All @@ -64,6 +72,7 @@ library
import: opts
exposed-modules:
Data.Automaton
Data.Automaton.Filter
Data.Automaton.Recursive
Data.Automaton.Trans.Accum
Data.Automaton.Trans.Except
Expand All @@ -73,6 +82,7 @@ library
Data.Automaton.Trans.Reader
Data.Automaton.Trans.State
Data.Automaton.Trans.Writer
Data.Automaton.Traversing
Data.Stream
Data.Stream.Except
Data.Stream.Internal
Expand All @@ -95,6 +105,7 @@ test-suite automaton-test
Automaton
Automaton.Except
Automaton.Trans.Accum
Automaton.Traversing
Stream

build-depends:
Expand All @@ -103,6 +114,7 @@ test-suite automaton-test
tasty >=1.4 && <1.6,
tasty-hunit ^>=0.10,
tasty-quickcheck >=0.10 && <0.12,
containers >=0.5,

executable UserSawtooth
import: opts
Expand Down
214 changes: 159 additions & 55 deletions automaton/src/Data/Automaton.hs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE ImportQualifiedPost #-}
{-# LANGUAGE InstanceSigs #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE UndecidableInstances #-}

Expand All @@ -20,7 +19,7 @@ import Data.Function ((&))
import Data.Functor ((<&>))
import Data.Functor.Compose (Compose (..))
import Data.Maybe (fromMaybe)
import Data.Monoid (Last (..), Sum (..))
import Data.Monoid (Ap (..), Last (..), Sum (..))
import Prelude hiding (id, (.))

-- mmorph
Expand All @@ -44,11 +43,18 @@ import Data.VectorSpace (VectorSpace (..))
-- align
import Data.Semialign (Align (..), Semialign (..))

-- these
import Data.These (these)

-- witherable
import Witherable (Filterable (..))

-- automaton
import Data.Stream (StreamT (..), fixStream)
import Data.Stream (StreamT (..), hoist', runTraversableS, snapshotCompose)
import Data.Stream.Internal (JointState (..))
import Data.Stream.Optimized (
OptimizedStreamT (..),
catMaybeS,
concatS,
stepOptimizedStream,
)
Expand Down Expand Up @@ -80,8 +86,8 @@ automaton2 :: Automaton m b c
sequentially :: Automaton m a c
sequentially = automaton1 >>> automaton2

parallely :: Automaton m (a, b) (b, c)
parallely = automaton1 *** automaton2
inParallel :: Automaton m (a, b) (b, c)
inParallel = automaton1 *** automaton2
@
In sequential composition, the output of the first automaton is passed as input to the second one.
In parallel composition, both automata receive input simulataneously and process it independently.
Expand Down Expand Up @@ -179,19 +185,7 @@ instance (Monad m) => Arrow (Automaton m) where
arr f = Automaton $! Stateless $! asks f
{-# INLINE arr #-}

first (Automaton (Stateful StreamT {state, step})) =
Automaton $!
Stateful $!
StreamT
{ state
, step = \s ->
ReaderT
( \(b, d) ->
fmap (,d)
<$> runReaderT (step s) b
)
}
first (Automaton (Stateless m)) = Automaton $ Stateless $ ReaderT $ \(b, d) -> (,d) <$> runReaderT m b
first = first'
{-# INLINE first #-}

instance (Monad m) => ArrowChoice (Automaton m) where
Expand Down Expand Up @@ -237,24 +231,10 @@ instance (Monad m) => ArrowChoice (Automaton m) where
(runReaderT . fmap Right $ mR)
{-# INLINE (+++) #-}

left (Automaton (Stateful (StreamT {state, step}))) =
Automaton $!
Stateful $!
StreamT
{ state
, step = \s -> ReaderT $ either (fmap (fmap Left) . runReaderT (step s)) (pure . Result s . Right)
}
left (Automaton (Stateless ma)) = Automaton $! Stateless $! ReaderT $! either (fmap Left . runReaderT ma) (pure . Right)
left = left'
{-# INLINE left #-}

right (Automaton (Stateful (StreamT {state, step}))) =
Automaton $!
Stateful $!
StreamT
{ state
, step = \s -> ReaderT $ either (pure . Result s . Left) (fmap (fmap Right) . runReaderT (step s))
}
right (Automaton (Stateless ma)) = Automaton $! Stateless $! ReaderT $! either (pure . Left) (fmap Right . runReaderT ma)
right = right'
{-# INLINE right #-}

f ||| g = f +++ g >>> arr untag
Expand All @@ -263,6 +243,10 @@ instance (Monad m) => ArrowChoice (Automaton m) where
untag (Right y) = y
{-# INLINE (|||) #-}

-- | Like 'arr', but requires only 'Applicative'
arr' :: (Applicative m) => (a -> b) -> Automaton m a b
arr' f = Automaton $! Stateless $! ReaderT $ pure . f

-- | Caution, this can make your program hang. Try to use 'feedback' or 'unfold' where possible, or combine 'loop' with 'delay'.
instance (MonadFix m) => ArrowLoop (Automaton m) where
loop (Automaton (Stateless ma)) = Automaton $! Stateless $! ReaderT (\b -> fst <$> mfix ((. snd) $ ($ b) $ curry $ runReaderT ma))
Expand All @@ -281,6 +265,12 @@ instance (Monad m, Alternative m) => ArrowZero (Automaton m) where
instance (Monad m, Alternative m) => ArrowPlus (Automaton m) where
(<+>) = (<|>)

-- instance Semigroup w => Semigroup (Automaton m a w) where
-- instance Monoid w => Monoid (Automaton m a w) where

deriving via Ap (Automaton m a) w instance (Applicative m, Semigroup w) => Semigroup (Automaton m a w)
deriving via Ap (Automaton m a) w instance (Applicative m, Monoid w) => Monoid (Automaton m a w)

-- | Consume an input and produce output effectfully, without keeping internal state
arrM :: (Functor m) => (a -> m b) -> Automaton m a b
arrM f = Automaton $! StreamOptimized.constM $! ReaderT f
Expand Down Expand Up @@ -385,18 +375,47 @@ withAutomaton :: (Functor m1, Functor m2) => (forall s. (a1 -> m1 (Result s b1))
withAutomaton f = Automaton . StreamOptimized.mapOptimizedStreamT (ReaderT . f . runReaderT) . getAutomaton
{-# INLINE withAutomaton #-}

instance (Monad m) => Profunctor (Automaton m) where
dimap f g Automaton {getAutomaton} = Automaton $ g <$> hoist (withReaderT f) getAutomaton
lmap f Automaton {getAutomaton} = Automaton $ hoist (withReaderT f) getAutomaton
instance (Functor m) => Profunctor (Automaton m) where
dimap f g Automaton {getAutomaton} = Automaton $ g <$> StreamOptimized.hoist' (withReaderT f) getAutomaton
lmap f Automaton {getAutomaton} = Automaton $ StreamOptimized.hoist' (withReaderT f) getAutomaton
rmap = fmap

instance (Monad m) => Choice (Automaton m) where
right' = right
left' = left
instance (Applicative m) => Choice (Automaton m) where
right' (Automaton (Stateful (StreamT {state, step}))) =
Automaton $!
Stateful $!
StreamT
{ state
, step = \s -> ReaderT $ either (pure . Result s . Left) (fmap (fmap Right) . runReaderT (step s))
}
right' (Automaton (Stateless ma)) = Automaton $! Stateless $! ReaderT $! either (pure . Left) (fmap Right . runReaderT ma)
{-# INLINE right' #-}

instance (Monad m) => Strong (Automaton m) where
second' = second
first' = first
left' (Automaton (Stateful (StreamT {state, step}))) =
Automaton $!
Stateful $!
StreamT
{ state
, step = \s -> ReaderT $ either (fmap (fmap Left) . runReaderT (step s)) (pure . Result s . Right)
}
left' (Automaton (Stateless ma)) = Automaton $! Stateless $! ReaderT $! either (fmap Left . runReaderT ma) (pure . Right)
{-# INLINE left' #-}

instance (Applicative m) => Strong (Automaton m) where
first' (Automaton (Stateful StreamT {state, step})) =
Automaton $!
Stateful $!
StreamT
{ state
, step = \s ->
ReaderT
( \(b, d) ->
fmap (,d)
<$> runReaderT (step s) b
)
}
first' (Automaton (Stateless m)) = Automaton $ Stateless $ ReaderT $ \(b, d) -> (,d) <$> runReaderT m b
{-# INLINE first' #-}

-- | Step an automaton several steps at once, depending on how long the input is.
instance (Monad m) => Traversing (Automaton m) where
Expand Down Expand Up @@ -432,31 +451,110 @@ traverseS = traverse'
traverseS_ :: (Monad m, Traversable f) => Automaton m a b -> Automaton m (f a) ()
traverseS_ automaton = traverse' automaton >>> arr (const ())

{- | Launch arbitrarily many copies of the automaton in parallel.
-- FIXME It's also conceivable to have Automaton (Compose m t) a b -> Automaton m a (t b)
-- TODO But should we use parallelism?
-- https://hackage.haskell.org/package/parallel-3.1.0.1/docs/Control-Parallel-Strategies.html#v:parTraversable

{- | Launch arbitrarily many copies of the automaton in parallel, according to the shape of the input data.

* The copies of the automaton are launched on demand as the shape of the input grows.
* The automaton copy at a certain position will always receive the input at that position (if it is supplied).
* If the input data is smaller than the automaton copies, the uncovered automata will not be stepped.

* The copies of the automaton are launched on demand as the input lists grow.
* The n-th copy will always receive the n-th input.
* If the input list has length n, the n+1-th automaton copy will not be stepped.
The behaviour for some typical example types:

Caution: Uses memory of the order of the largest list that was ever input during runtime.
* Lists: The copies of the automaton are launched on demand as the input lists grow
The n-th copy will always receive the n-th input.
If the input list has length n, the n+1-th automaton copy will not be stepped.
* 'Maybe': As soon as a 'Just' is received, an automaton is started. It is stepped only when more 'Just' values arrive.
* 'Map': Whenever an input for a new key arrives, a new automaton is started.

Caution: Uses memory of the order of the largest shape that was ever input during runtime.

Note: "in parallel" refers purely the data model, it does not mean that multiple cores are used for the computations.
-}
parallely :: (Applicative m) => Automaton m a b -> Automaton m [a] [b]
parallely :: (Applicative m, Traversable t, Align t, Filterable t) => Automaton m a b -> Automaton m (t a) (t b)
parallely Automaton {getAutomaton = Stateful stream} = Automaton $ Stateful $ parallely' stream
where
parallely' :: (Applicative m) => StreamT (ReaderT a m) b -> StreamT (ReaderT [a] m) [b]
parallely' StreamT {state, step} = fixStream (JointState state) $ \fixstep jointState@(JointState s fixstate) -> ReaderT $ \case
[] -> pure $! Result jointState []
(a : as) -> apResult . fmap (:) <$> runReaderT (step s) a <*> runReaderT (fixstep fixstate) as
parallely' :: (Applicative m, Traversable t, Align t, Filterable t) => StreamT (ReaderT a m) b -> StreamT (ReaderT (t a) m) (t b)
parallely' StreamT {state, step} =
StreamT
{ state = nil
, step = \s -> ReaderT $ \as ->
-- Analyse at which positions there is state or input
align s as
& traverse
( these
-- There is state at this position, but no input, don't do anything
(\s -> pure $ Result s Nothing)
-- There is no state yet at this position, but input. Perform the step, initialising with the original initial state
(fmap (fmap Just) . runReaderT (step state))
-- There is already state, and there is input. Perform the step normally
(\s a -> fmap Just <$> runReaderT (step s) a)
)
<&> ( \sas ->
Result
-- Keep all the resulting states
(resultState <$> sas)
-- Wither the output shape by removing all positions where no step has been performed
(Witherable.mapMaybe output sas)
)
}
parallely Automaton {getAutomaton = Stateless f} = Automaton $ Stateless $ ReaderT $ traverse $ runReaderT f
{-# INLINE parallely #-}

{- | Run multiple copies of the same 'Automaton', applying new input shapes to an accumulated one.

* The state is initialized as 'pure'
* As more input in an @f@ shape arrives, it is applied as an effect in the state using the 'Applicative' instance of @f@

Caution: The state grows depending on how @'Applicative' f@ is implemented.
For example, for lists the size of the state is proportional to the /product/ of all inputs that have arrived.
I.e. it grows exponentially for constantly bigger-than-1 sized lists, and drops to 0 once an empty list is added.

The behaviour for some typical example types:

* Lists: The input lists are interpreted as nondeterministic choices, and for every possible combination of choices, one automaton is run, and all output lists concatenated.
* 'Maybe': The automaton is stepped normally on 'Just' values, and stopped on 'Nothing', never outputting any other value than 'Nothing'.
* 'Either': Like 'Maybe', but with an exception value.
* 'ZipList': The output is the size of the /smallest/ list ever input, and the state is shrunk every time the input is smaller than before.
-}

-- FIXME unit test all of these
applying :: (Applicative m, Traversable f, Applicative f) => Automaton m a b -> Automaton m (f a) (f b)
applying = handleAutomaton applying'
where
applying' :: (Applicative m, Traversable f, Applicative f) => StreamT (ReaderT a m) b -> StreamT (ReaderT (f a) m) (f b)
applying' StreamT {state, step} =
StreamT
{ state = pure state
, step = \s -> ReaderT $ \as ->
(runReaderT . step <$> s <*> as)
& sequenceA
& fmap unzipResult
}
{-# INLINE applying #-}

-- | Given a transformation of streams, apply it to an automaton, without changing the input.
handleAutomaton_ :: (Monad m) => (forall m. (Monad m) => StreamT m a -> StreamT m b) -> Automaton m i a -> Automaton m i b
handleAutomaton_ f = Automaton . StreamOptimized.withOptimized f . getAutomaton

-- | Given a transformation of streams, apply it to an automaton. The input can be accessed through the 'ReaderT' effect.
handleAutomaton :: (Monad m) => (StreamT (ReaderT a m) b -> StreamT (ReaderT c n) d) -> Automaton m a b -> Automaton n c d
{- | Given a transformation of streams, apply it to an automaton. The input can be accessed through the 'ReaderT' effect.

In contrast to 'handleAutomaton_', the functor type can change.
-}
handleAutomaton :: (Functor m) => (StreamT (ReaderT a m) b -> StreamT (ReaderT c n) d) -> Automaton m a b -> Automaton n c d
handleAutomaton f = Automaton . StreamOptimized.handleOptimized f . getAutomaton

{- | Drop 'Nothing' values from the output, retrying an input value until the automaton outputs a 'Just'.

See 'Data.Stream.catMaybeS'.

Caution: If @automaton@ outputs 'Nothing' forever, then @'catMaybeS' automaton@ will loop and never produce output.
-}
catMaybeS :: (Monad m) => Automaton m a (Maybe b) -> Automaton m a b
catMaybeS = Automaton . Data.Stream.Optimized.catMaybeS . getAutomaton

{- | Buffer the output of an automaton. See 'Data.Stream.concatS'.

The input for the automaton is not buffered.
Expand All @@ -466,6 +564,12 @@ then the next 9 inputs will be ignored.
concatS :: (Monad m) => Automaton m a [b] -> Automaton m a b
concatS (Automaton automaton) = Automaton $ Data.Stream.Optimized.concatS automaton

runTraversableS :: (Monad m, Traversable t, Monad t) => Automaton (Compose m t) a b -> Automaton m a (t b)
runTraversableS = handleAutomaton $ Data.Stream.runTraversableS . Data.Stream.hoist' (Compose . ReaderT . fmap getCompose . runReaderT)

snapshot :: Functor m => Automaton m a b -> Automaton m a (m b)
snapshot = handleAutomaton $ hoist' (ReaderT . getCompose) . Data.Stream.snapshotCompose . hoist' (Compose . runReaderT)

-- * Examples

-- | Pass through a value unchanged, and perform a side effect depending on it
Expand Down
Loading
Loading