title: Building pipes with monad transformers
subtitle: where Pipe = ProducerT . ConsumerT
tags: haskell, pipes
sourcelink: This post is literate Haskell, click here to download the source code.
date: 2012-06-03 01:32 CEST
In this post I show another way to implement pipes, by combining a producer and a consumer monad transformer. This implementation is for educational and entertainment purposes only: you probably shouldn't try to use it in production software. To quote Donald Knuth: "I have only proved it correct, not tried it."
One obvious thing that is missing is finalization, but that could be added by passing along a finalizer with each call to @yield@, as described by Gabriel Gonzalez.
> -- IGNORE: boring stuff
> -- For once, this post is light on extensions
>
> -- IGNORE: boring stuff like imports
> {-# LANGUAGE PolymorphicComponents #-} -- like RankNTypes, but only inside data/newtype
> {-# LANGUAGE NoMonomorphismRestriction #-} -- Why isn't this standard yet?
>
> import Control.Monad
> import Control.Monad.Trans
> import Control.Monad.Trans.State
> import Control.Monad.Trans.Identity
> import Control.Monad.Trans.Either -- from EitherT package, only needed for test
-- Producers --
Let's start with producers. A producer can produce a stream of values of type @o@, and then ends with a value of type @a@. At each step, it performs an action in the base monad @m@.
> data ProducerT' o m a = Done a | More o (ProducerT o m a)
> newtype ProducerT o m a = ProducerT { runProducerT :: m (ProducerT' o m a) }
This monad transformer is similar to the ListT-done-right monad transformer. The difference is that the producer has a value at the end, while @ListT@ ends with an empty list. More importantly, @ListT@ is a monad over the list items, while @ProducerT@ is a monad over the end value. It can't be a monad over the stream values, because then @return@ would have to conjure a value of type @a@ out of nowhere.
The @Monad@ and @MonadTrans@ instances are straightforward:
> instance Monad m => Monad (ProducerT o m) where
>   return = ProducerT . return . Done
>   a >>= b = ProducerT $ runProducerT a >>= bind'
>     where
>       bind' (Done x)   = runProducerT (b x)
>       bind' (More o k) = return (More o (k >>= b))
>
> instance MonadTrans (ProducerT o) where
>   lift = ProducerT . liftM Done
The point of producers is that they can produce values. So, let's make a function for that:
> yield :: Monad m => o -> ProducerT o m ()
> yield x = ProducerT $ return $ More x (return ())
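To see @yield@ in action, here is a small standalone sketch that runs a producer and collects everything it yields. The helper @drain@ and the example values @countdown@ and @example@ are not part of this post's code; they are names made up for illustration. The @Functor@ and @Applicative@ instances are only there because current GHC versions require them for any @Monad@ instance.

```haskell
import Control.Monad (ap, liftM)

-- Producer types and instances, repeated from above so this block stands alone.
data ProducerT' o m a = Done a | More o (ProducerT o m a)
newtype ProducerT o m a = ProducerT { runProducerT :: m (ProducerT' o m a) }

instance Monad m => Functor (ProducerT o m) where
  fmap = liftM

instance Monad m => Applicative (ProducerT o m) where
  pure = ProducerT . return . Done
  (<*>) = ap

instance Monad m => Monad (ProducerT o m) where
  a >>= b = ProducerT $ runProducerT a >>= bind'
    where
      bind' (Done x)   = runProducerT (b x)
      bind' (More o k) = return (More o (k >>= b))

yield :: Monad m => o -> ProducerT o m ()
yield x = ProducerT $ return $ More x (return ())

-- Hypothetical helper: run a producer to the end, collecting all
-- yielded values together with the final result.
drain :: Monad m => ProducerT o m a -> m ([o], a)
drain p = runProducerT p >>= step
  where
    step (Done a)   = return ([], a)
    step (More o k) = liftM (\(os, a) -> (o : os, a)) (drain k)

-- Hypothetical example producer: three outputs, then an end value.
countdown :: Monad m => ProducerT Int m String
countdown = do
  yield 3
  yield 2
  yield 1
  return "liftoff"

-- Run it in the Maybe monad; evaluates to Just ([3,2,1],"liftoff").
example :: Maybe ([Int], String)
example = drain countdown
```

Note that @drain@ only terminates for finite producers, so it is useful for experiments but not as a general way of running a stream.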
Given a producer, we can try to extract the first value. This succeeds if the stream is not empty, otherwise it returns the end value. In both cases we also return the remaining producer:
> headProducerT :: Monad m => ProducerT o m a -> m (Either a o, ProducerT o m a)
> headProducerT = liftM step . runProducerT
>   where
>     step (Done x)   = (Left x, return x)
>     step (More o k) = (Right o, k)
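One detail of @headProducerT@ is easy to miss: once the stream has ended, the 'remaining producer' is just @return x@, so asking for the head again gives the same end value again. The following standalone sketch shows this. The helper @takeHeads@ and the value @heads@ are hypothetical names used only for this illustration.

```haskell
import Control.Monad (ap, liftM)

-- Producer types and instances, repeated so this block stands alone.
data ProducerT' o m a = Done a | More o (ProducerT o m a)
newtype ProducerT o m a = ProducerT { runProducerT :: m (ProducerT' o m a) }

instance Monad m => Functor (ProducerT o m) where
  fmap = liftM

instance Monad m => Applicative (ProducerT o m) where
  pure = ProducerT . return . Done
  (<*>) = ap

instance Monad m => Monad (ProducerT o m) where
  a >>= b = ProducerT $ runProducerT a >>= bind'
    where
      bind' (Done x)   = runProducerT (b x)
      bind' (More o k) = return (More o (k >>= b))

yield :: Monad m => o -> ProducerT o m ()
yield x = ProducerT $ return $ More x (return ())

headProducerT :: Monad m => ProducerT o m a -> m (Either a o, ProducerT o m a)
headProducerT = liftM step . runProducerT
  where
    step (Done x)   = (Left x, return x)
    step (More o k) = (Right o, k)

-- Hypothetical helper: take the head n times, threading the
-- remaining producer through by hand.
takeHeads :: Monad m => Int -> ProducerT o m a -> m [Either a o]
takeHeads n p
  | n <= 0    = return []
  | otherwise = do
      (h, rest) <- headProducerT p
      hs <- takeHeads (n - 1) rest
      return (h : hs)

-- Two outputs, then the end value; asking four times repeats the end value.
-- Evaluates to Just [Right 1,Right 2,Left "end",Left "end"].
heads :: Maybe [Either String Int]
heads = takeHeads 4 (yield 1 >> yield 2 >> return "end")
```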
This head function will form the building block for building consumers. If you look at the function's type, you might notice that it is very similar to that of a state monad. We could imagine a consumer as something that keeps track of the input producer, and repeatedly takes the head of it. So, a first idea might be
> type ConsumerT' i t m = StateT (ProducerT i m t) m
> await' :: Monad m => ConsumerT' i t m (Either t i)
> await' = StateT headProducerT
This seems to work. We can compose a producer and a consumer very easily with @evalStateT@:
> compose__pc :: Monad m => ProducerT i m t -> ConsumerT' i t m a -> m a
> compose__pc = flip evalStateT
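As a quick check that this really works, here is a standalone sketch of a consumer that adds up everything it receives. The consumer @sumAll@ and the value @total@ are hypothetical examples, not part of the code of this post. It assumes the @transformers@ package, which ships with GHC.

```haskell
import Control.Monad (ap, liftM)
import Control.Monad.Trans.State (StateT (..), evalStateT)

-- Producer types and instances, repeated so this block stands alone.
data ProducerT' o m a = Done a | More o (ProducerT o m a)
newtype ProducerT o m a = ProducerT { runProducerT :: m (ProducerT' o m a) }

instance Monad m => Functor (ProducerT o m) where
  fmap = liftM

instance Monad m => Applicative (ProducerT o m) where
  pure = ProducerT . return . Done
  (<*>) = ap

instance Monad m => Monad (ProducerT o m) where
  a >>= b = ProducerT $ runProducerT a >>= bind'
    where
      bind' (Done x)   = runProducerT (b x)
      bind' (More o k) = return (More o (k >>= b))

yield :: Monad m => o -> ProducerT o m ()
yield x = ProducerT $ return $ More x (return ())

headProducerT :: Monad m => ProducerT o m a -> m (Either a o, ProducerT o m a)
headProducerT = liftM step . runProducerT
  where
    step (Done x)   = (Left x, return x)
    step (More o k) = (Right o, k)

-- The first consumer type: the upstream producer is the state.
type ConsumerT' i t m = StateT (ProducerT i m t) m

await' :: Monad m => ConsumerT' i t m (Either t i)
await' = StateT headProducerT

compose__pc :: Monad m => ProducerT i m t -> ConsumerT' i t m a -> m a
compose__pc = flip evalStateT

-- Hypothetical consumer: add up all inputs until upstream ends, and
-- return the sum together with the upstream end value.
sumAll :: Monad m => ConsumerT' Int t m (Int, t)
sumAll = go 0
  where
    go acc = do
      r <- await'
      case r of
        Right i -> go (acc + i)
        Left t  -> return (acc, t)

-- Evaluates to Just (6,"done").
total :: Maybe (Int, String)
total = compose__pc (mapM_ yield [1, 2, 3] >> return "done") sumAll
```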
In terms of pipes, we have composed a pipe with no input together with a pipe that produces no output, to give a 'pipe' with neither input nor output.
-- Pipes --
A general pipe is a computation that is both a producer and a consumer. There are two obvious ways of building one: with the consumer on the outside, or with the producer on the outside.
> type Pipe__CP i a o m b = ConsumerT' i a (ProducerT o m) b
> type Pipe__PC i a o m b = ProducerT o (ConsumerT' i a m) b
These types are ''not'' the same. @Pipe__CP@ first consumes a whole bunch of input, and then produces a whole bunch of output. In particular, it is impossible to stop early. In @Pipe__PC@, the operations are interleaved; before each output there can be more consuming. This second formulation is therefore the one that we want.
Before doing fully general composition, let's first compose a producer with a pipe,
> compose__p :: Monad m => ProducerT b m s -> Pipe__PC b s c m t -> ProducerT c m t
Remember that a value @p__1@ of type @Pipe__PC@ can look something like this:
> p__1 = ProducerT $ StateT $ \s__1 -> act >> return (More o__1 p__2, s__2)
> -- HIDDEN: stuff for example
> data A = A
> data B = B
> type M = []
> data I = I
> data O = O
> o__1 = O;
> p__1 :: Pipe__PC I A O M B
> p__2 = p__1
> s__2 = return A
> act = return ()
As before, the upstream producer is the state for the downstream consumer.
So, we can fill in the upstream producer for @s__1@. Once we do so, we get access to @s__2@, which should in turn be passed to @p__2@, and so on.
In this way we turn the pipe from a @ProducerT o (StateT .. m) a@ into a @ProducerT o m a@. So more generally, we change the base monad of a monad transformer.
> class MonadTransRebase t where
>   rebase :: NestTrans m n -> t m a -> t n a
>
> instance MonadTransRebase (ProducerT o) where
>   rebase f = ProducerT . runNestTrans f rebase' . runProducerT
>     where
>       rebase' f' (Done d)   = Done d
>       rebase' f' (More x k) = More x (rebase f' k)
The type @NestTrans@ is a function from @m a@ to @n b@, where the transformation inside the monadic values can use a different @NestTrans@. Hence the 'nested' part of the name.
> newtype NestTrans m n = NestTrans
>   { runNestTrans :: forall a b. (NestTrans m n -> a -> b) -> (m a -> n b) }
As said above, given the initial state, we can pass this state through the transformer. Then the new state is used for nested @StateT@ computations.
> nestTransStateT :: Monad m => s -> NestTrans (StateT s m) m
> nestTransStateT s = NestTrans $ \f m ->
>   liftM (\(a,s') -> f (nestTransStateT s') a) (runStateT m s)
This is all we need to define the composition:
> compose__p u v = rebase (nestTransStateT u) v
Note that it is possible to write all this without the @NestTrans@ newtype, but to do so generically requires rank 3 types (the first time that I have ever needed those). I leave that solution as an exercise to the reader.
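This is not the generic rank-3 solution, but for the specific combination of @ProducerT@ over @StateT@ the recursion can also be written out by hand, which may make it easier to see what @rebase@ and @nestTransStateT@ do together. In the standalone sketch below, @composeDirect@, @doubler@, @drain@ and @doubled@ are hypothetical names introduced for illustration only.

```haskell
import Control.Monad (ap, liftM)
import Control.Monad.Trans.Class (MonadTrans (..))
import Control.Monad.Trans.State (StateT (..))

-- Producer types and instances, repeated so this block stands alone.
data ProducerT' o m a = Done a | More o (ProducerT o m a)
newtype ProducerT o m a = ProducerT { runProducerT :: m (ProducerT' o m a) }

instance Monad m => Functor (ProducerT o m) where
  fmap = liftM

instance Monad m => Applicative (ProducerT o m) where
  pure = ProducerT . return . Done
  (<*>) = ap

instance Monad m => Monad (ProducerT o m) where
  a >>= b = ProducerT $ runProducerT a >>= bind'
    where
      bind' (Done x)   = runProducerT (b x)
      bind' (More o k) = return (More o (k >>= b))

instance MonadTrans (ProducerT o) where
  lift = ProducerT . liftM Done

yield :: Monad m => o -> ProducerT o m ()
yield x = ProducerT $ return $ More x (return ())

headProducerT :: Monad m => ProducerT o m a -> m (Either a o, ProducerT o m a)
headProducerT = liftM step . runProducerT
  where
    step (Done x)   = (Left x, return x)
    step (More o k) = (Right o, k)

type ConsumerT' i t m = StateT (ProducerT i m t) m
type Pipe__PC i a o m b = ProducerT o (ConsumerT' i a m) b

await' :: Monad m => ConsumerT' i t m (Either t i)
await' = StateT headProducerT

-- Hand-written composition: run one step of the pipe with the current
-- upstream producer as state, then recurse with the updated upstream.
composeDirect :: Monad m => ProducerT b m s -> Pipe__PC b s c m t -> ProducerT c m t
composeDirect u v = ProducerT $ do
  (r, u') <- runStateT (runProducerT v) u
  return $ case r of
    Done t   -> Done t
    More o k -> More o (composeDirect u' k)

-- Hypothetical pipe: double every input, pass on the upstream end value.
doubler :: Monad m => Pipe__PC Int a Int m a
doubler = do
  r <- lift await'
  case r of
    Right i -> yield (2 * i) >> doubler
    Left a  -> return a

-- Hypothetical helper: collect all outputs and the end value.
drain :: Monad m => ProducerT o m a -> m ([o], a)
drain p = runProducerT p >>= step
  where
    step (Done a)   = return ([], a)
    step (More o k) = liftM (\(os, a) -> (o : os, a)) (drain k)

-- Evaluates to Just ([2,4],"eof").
doubled :: Maybe ([Int], String)
doubled = drain (composeDirect (yield 1 >> yield 2 >> return "eof") doubler)
```

The price of writing it this way is that @composeDirect@ is tied to one particular pair of transformers, which is exactly what @MonadTransRebase@ and @NestTrans@ avoid.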
-- Consumers, take 2 --
Now let's also try to do this the other way around, and compose a pipe with a consumer,
]> -- BLOCK: haskell
]> compose__c :: Monad m
]>            => Pipe__PC a r b m s -> ConsumerT' b s m t -> ConsumerT' a r m t
But immediately we hit a problem. The downstream consumer expects its state to be of type @ProducerT b m s@, but the upstream has type @ProducerT b (ConsumerT .. m) s@. This is still a producer, but over a different base monad. In fact, the upstream producer's base monad is of the form @(t m)@, where @t@ is another monad transformer.
We can't just get rid of the @ConsumerT@, like we did on the downstream side, because we still need to be able to pass in the state later on.
The solution is to make the state type more general, and allow it to be @ProducerT@ over any transformation of a given base monad. Effectively we replace the state type @s m a@ by @forall t. s (t m) a@. This gives us the ''transformed state monad'':
> data TStateT s a m b = TStateT
>   { runTStateT :: forall t. (MonadTrans t, Monad (t m))
>                => s (t m) a -> t m (b, s (t m) a) }
Note that @s@ is not a state ''type'', but a state ''monad transformer''.
The instances are straightforward, and look identical to the instances for @StateT@, with the exception of an extra @lift@ in the @MonadTrans@ instance.
> instance Monad (TStateT s t m) where
>   return a = TStateT $ \s -> return (a, s)
>   m >>= k = TStateT $ \s -> do
>     (a,s') <- runTStateT m s
>     runTStateT (k a) s'
>
> instance MonadTrans (TStateT s t) where
>   lift mx = TStateT $ \s -> lift $ liftM (\x -> (x,s)) mx
The new consumer type is just a @TStateT@ with @ProducerT@ as the state:
> type ConsumerT i = TStateT (ProducerT i)
> type GPipe i o m = o (i m)
> type Pipe i a o m = GPipe (ConsumerT i a) (ProducerT o) m
Awaiting looks much like before,
> await :: Monad m => Pipe i t o m (Either t i)
> await = lift $ TStateT headProducerT
All we need to do now to define composition is to make a @NestTrans@ for @TStateT@.
The function to do this is essentially the same as @nestTransStateT@ above:
> nestTransTStateT :: (Monad (t m), MonadTrans t)
>                  => s (t m) a -> NestTrans (TStateT s a m) (t m)
> nestTransTStateT s = NestTrans $ \f m ->
>   liftM (\(a,s') -> f (nestTransTStateT s') a) (runTStateT m s)
and by magic, we get composition:
]> -- BLOCK: haskell
]> compose :: Monad m => Pipe a r b m s -> Pipe b s c m t -> Pipe a r c m t
]> compose = rebase . nestTransTStateT
-- General consumers and producers --
There is nothing specific to @ConsumerT@ or @ProducerT@ in the composition function.
All we require is that the 'consumer' on the left is a monad transformer, and that the 'producer' on the right can be rebased. This leads to the more general type of compose:
> compose :: (MonadTransRebase t, MonadTrans r, Monad (r m))
>         => GPipe r s m a -> GPipe (TStateT s a) t m b -> GPipe r t m b
> -- HIDDEN: already shown above, but with a more restricted type signature
> compose = rebase . nestTransTStateT
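To check that the general @compose@ does what it promises, here is a standalone sketch that repeats the definitions from this post and composes a producer with a pipe. The names @source@, @doubler@, @drain@ and @result@ are hypothetical and exist only for this example. I have also added @Functor@ and @Applicative@ instances and written the constructors with parentheses instead of @$@, on the assumption that this is friendlier to current GHC versions.

```haskell
{-# LANGUAGE RankNTypes, FlexibleContexts #-}
import Control.Monad (ap, liftM)
import Control.Monad.Trans.Class (MonadTrans (..))
import Control.Monad.Trans.Identity (IdentityT (..))

data ProducerT' o m a = Done a | More o (ProducerT o m a)
newtype ProducerT o m a = ProducerT { runProducerT :: m (ProducerT' o m a) }

instance Monad m => Functor (ProducerT o m) where fmap = liftM
instance Monad m => Applicative (ProducerT o m) where
  pure = ProducerT . return . Done
  (<*>) = ap
instance Monad m => Monad (ProducerT o m) where
  a >>= b = ProducerT $ runProducerT a >>= bind'
    where
      bind' (Done x)   = runProducerT (b x)
      bind' (More o k) = return (More o (k >>= b))
instance MonadTrans (ProducerT o) where
  lift = ProducerT . liftM Done

yield :: Monad m => o -> ProducerT o m ()
yield x = ProducerT $ return $ More x (return ())

headProducerT :: Monad m => ProducerT o m a -> m (Either a o, ProducerT o m a)
headProducerT = liftM step . runProducerT
  where
    step (Done x)   = (Left x, return x)
    step (More o k) = (Right o, k)

newtype NestTrans m n = NestTrans
  { runNestTrans :: forall a b. (NestTrans m n -> a -> b) -> (m a -> n b) }

class MonadTransRebase t where
  rebase :: NestTrans m n -> t m a -> t n a

instance MonadTransRebase (ProducerT o) where
  rebase f = ProducerT . runNestTrans f rebase' . runProducerT
    where
      rebase' _  (Done d)   = Done d
      rebase' f' (More x k) = More x (rebase f' k)

data TStateT s a m b = TStateT
  { runTStateT :: forall t. (MonadTrans t, Monad (t m))
               => s (t m) a -> t m (b, s (t m) a) }

instance Functor (TStateT s a m) where fmap = liftM
instance Applicative (TStateT s a m) where
  pure a = TStateT (\s -> return (a, s))
  (<*>) = ap
instance Monad (TStateT s a m) where
  m >>= k = TStateT (\s -> do
    (a, s') <- runTStateT m s
    runTStateT (k a) s')
instance MonadTrans (TStateT s a) where
  lift mx = TStateT (\s -> lift (liftM (\x -> (x, s)) mx))

type ConsumerT i = TStateT (ProducerT i)
type GPipe i o m = o (i m)
type Pipe i a o m = GPipe (ConsumerT i a) (ProducerT o) m
type ProducerPipe o m = GPipe IdentityT (ProducerT o) m

await :: Monad m => Pipe i t o m (Either t i)
await = lift (TStateT headProducerT)

nestTransTStateT :: (Monad (t m), MonadTrans t)
                 => s (t m) a -> NestTrans (TStateT s a m) (t m)
nestTransTStateT s = NestTrans (\f m ->
  liftM (\(a, s') -> f (nestTransTStateT s') a) (runTStateT m s))

compose :: (MonadTransRebase t, MonadTrans r, Monad (r m))
        => GPipe r s m a -> GPipe (TStateT s a) t m b -> GPipe r t m b
compose = rebase . nestTransTStateT

-- Hypothetical upstream: a producer pipe with no input.
source :: Monad m => ProducerPipe Int m ()
source = mapM_ yield [1, 2, 3]

-- Hypothetical pipe: double every input, pass on the upstream end value.
doubler :: Monad m => Pipe Int a Int m a
doubler = await >>= either return (\i -> yield (2 * i) >> doubler)

-- Hypothetical helper: collect all outputs and the end value.
drain :: Monad m => ProducerT o m a -> m ([o], a)
drain p = runProducerT p >>= step
  where
    step (Done a)   = return ([], a)
    step (More o k) = liftM (\(os, a) -> (o : os, a)) (drain k)

-- Evaluates to Just ([2,4,6],()).
result :: Maybe ([Int], ())
result = runIdentityT (drain (compose source doubler))
```

Here @r = IdentityT@ on the upstream side, so the composed pipe is itself a @ProducerPipe@ and can be drained directly.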
There are some interesting choices for @r@, @s@ and @t@ here.
By picking @r = IdentityT@, we get an upstream 'pipe' with no input, i.e. a producer.
By picking @t = IdentityT@, we get a downstream 'pipe' with no output, i.e. a consumer.
Finally, the transformer @s@ determines what information is passed between the two pipes.
By using @ProducerT o@ you get a stream of @o@s followed by an @a@ at the end.
If you use @ListT@, there is a stream of @a@s with no value at the end.
If you use @IdentityT@, just a single value is passed, so you get function composition.
If you use @InfiniteListT@ you get a producer that guarantees that it gives an infinite stream of values.
And I believe it should also be possible to define more complex protocols, such as "first give 10 values of type @a@, then an unlimited number of @b@, and end with a @c@".
However, you do need a different @await@ function for all of these.
To close things off, here are the producers and consumers based on @IdentityT@.
> instance MonadTransRebase IdentityT where
> rebase f = IdentityT . runNestTrans f (const id) . runIdentityT
>
> type ProducerPipe o m = GPipe IdentityT (ProducerT o) m
> type ConsumerPipe i a m = GPipe (ConsumerT i a) IdentityT m
> type Pipeline m = GPipe IdentityT IdentityT m
>
> runPipeline :: Pipeline m a -> m a
> runPipeline = runIdentityT . runIdentityT
> -- HIDDEN
> -- Here is a test, stolen from the pipes-2.0 documentation
>
> -- use EitherT to get pipes style abort semantics
> type LPipe i o m a = Pipe i a o m a
> type LConsumer i m a = ConsumerPipe i a m a
> liftL = lift . lift . lift
> awaitL = EitherT gawait
> yieldL = lift . yield
> runL = liftM (either id id) . runEitherT
>
> -- the type signature above is too restrictive, just like compose's it should be generalized
> gawait = lift $ TStateT headProducerT
>
> take' :: Int -> LPipe a a IO ()
> take' n = runL $ do
>   replicateM_ n $ do
>     x <- awaitL
>     yieldL x
>   liftL $ putStrLn "You shall not pass!"
>
> fromList :: (Monad m) => [b] -> ProducerPipe b m ()
> fromList = runL . mapM_ yieldL
>
> printer :: (Show b) => LConsumer b IO r
> printer = runL $ forever $ do
>   x <- awaitL
>   liftL $ print x
>
> (<+<) = flip compose
>
> pipeline :: Pipeline IO ()
> pipeline = printer <+< take' 3 <+< fromList [1..]
> test1 = runPipeline pipeline
> test2 = runPipeline $ printer <+< (fromList [1..3] >> fromList [10..12])
> print' n = printer <+< take' n
> test3 = runPipeline $ (print' 3 >> print' 4) <+< fromList [1..]
> test4 = runPipeline $ printer <+< (take' 3 >> take' 4) <+< fromList [1..]