Conduits vs. Pipes

Published: 2012-03-24T14:31Z
Tags: haskell, pipes
License: CC-BY

Michael Snoyman released conduit-0.3 this week. The conduit package provides three datatypes that can be chained together: Source, Conduit and Sink. If you look at the source code, you will notice a lot of overlap between these datatypes. In this post I'll show how these types can be combined into a single one, which is the idea used by the pipes package.

Compare:

data Sink i m o =
    Processing (i -> Sink i m o) (SinkClose m o)
  | Done (Maybe i) o
  | SinkM (m (Sink i m o))
type SinkClose m o = m o
data Conduit i m o =
    NeedInput (i -> Conduit i m o) (ConduitClose m o)
  | HaveOutput (Conduit i m o) (m ()) o
  | Finished (Maybe i)
  | ConduitM (m (Conduit i m o)) (m ())
type ConduitClose m o = Source m o

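The differences between the two types are that: Sink has no counterpart to HaveOutput; Done carries a value of type o next to the leftover input, while Finished carries only the leftover input; ConduitM has an extra close action of type m (), while SinkM has none; and the close handler is an m o for Sink but a Source m o for Conduit.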

The term output is in fact used differently by the two types. This becomes clearer when we say that Sink has a result of type r; the result of Conduit is then r = (). On the other hand, a sink doesn't produce output to downstream conduits, so its output type would be Void.

Now let's also bring in Source:

data Source m a =
    Open (Source m a) (m ()) a
  | Closed
  | SourceM (m (Source m a)) (m ())

The SourceM constructor is exactly analogous to ConduitM, and Open is analogous to HaveOutput. A Source doesn't have input, so there is no analogue to NeedInput or Processing. The Closed constructor doesn't provide remaining input or result, since a source doesn't have either. However, we could say that its input is i = (), and its result is r = ().

It then becomes possible to unify the three datatypes into:

data Pipe m i o r =
    NeedInput (i -> Pipe m i o r) (Pipe m () o r)
  | HaveOutput (Pipe m i o r) (m ()) o
  | Finished (Maybe i) r
  | PipeM (m (Pipe m i o r)) (m r)
type Source m o = Pipe m () o ()
type Conduit i m o = Pipe m i o ()
type Sink i m r = Pipe m i Void r

This is almost exactly the type provided by the various incarnations of the pipes package!
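To make the unification concrete, here is a minimal sketch of one direction of the mapping: embedding conduit's Source into the unified type. The names OldSource, toPipe and sourceList are made up for this illustration (OldSource is just the Source type from above, renamed so it doesn't clash with the new synonym), and the sketch assumes a recent GHC.

```haskell
-- The unified type from the post.
data Pipe m i o r
  = NeedInput (i -> Pipe m i o r) (Pipe m () o r)
  | HaveOutput (Pipe m i o r) (m ()) o
  | Finished (Maybe i) r
  | PipeM (m (Pipe m i o r)) (m r)

type Source m o = Pipe m () o ()

-- conduit's Source as shown earlier, renamed (hypothetical name).
data OldSource m a
  = Open (OldSource m a) (m ()) a
  | Closed
  | SourceM (m (OldSource m a)) (m ())

-- Each constructor maps to its analogue; Closed gets "no leftover input"
-- and the trivial result ().
toPipe :: Functor m => OldSource m a -> Source m a
toPipe (Open next close x)  = HaveOutput (toPipe next) close x
toPipe Closed               = Finished Nothing ()
toPipe (SourceM step close) = PipeM (fmap toPipe step) close

-- Hypothetical helper: a source that emits the elements of a list.
sourceList :: Monad m => [a] -> OldSource m a
sourceList = foldr (\x rest -> Open rest (return ()) x) Closed
```

Note that NeedInput is never produced by toPipe, which matches the observation that a Source has no analogue of NeedInput.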

The three composition operators of conduits become a single operator on pipes. The top level "run" operation takes a Pipe m () Void r, that is, a (composition of) pipes that takes no input and has no output.
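As a rough sketch of what "a single operator" looks like, the code below uses a simplified pipes-style type without close handlers. The constructor names and the stages yieldAll, mapP and sumN are assumptions for illustration; this is not the source of either library.

```haskell
import Data.Void (Void, absurd)

-- Simplified pipe: a = input, b = output, m = base monad, r = result.
data Pipe a b m r
  = Pure r
  | M (m (Pipe a b m r))
  | Await (a -> Pipe a b m r)
  | Yield b (Pipe a b m r)

infixr 9 <+<

-- One operator composes any two stages; the downstream pipe (left) drives.
(<+<) :: Functor m => Pipe b c m r -> Pipe a b m r -> Pipe a c m r
Yield c p1 <+< p2         = Yield c (p1 <+< p2)
M m        <+< p2         = M (fmap (<+< p2) m)
Pure r     <+< _          = Pure r
Await f    <+< Yield b p2 = f b <+< p2
p1         <+< Await f    = Await (\a -> p1 <+< f a)
p1         <+< M m        = M (fmap (p1 <+<) m)
_          <+< Pure r     = Pure r

-- Running needs a pipeline with no real input (()) and no output (Void).
runPipe :: Monad m => Pipe () Void m r -> m r
runPipe (Pure r)    = return r
runPipe (M m)       = m >>= runPipe
runPipe (Await f)   = runPipe (f ())
runPipe (Yield v _) = absurd v

-- Hypothetical stages: a source, a transformer and a sink.
yieldAll :: r -> [b] -> Pipe a b m r   -- finishes with r when the list runs out
yieldAll r = foldr Yield (Pure r)

mapP :: (a -> b) -> Pipe a b m r
mapP f = Await (\a -> Yield (f a) (mapP f))

sumN :: Int -> Pipe Int Void m Int     -- sums the first n inputs
sumN n = go n 0
  where
    go 0 acc = Pure acc
    go k acc = Await (\x -> go (k - 1) (acc + x))
```

With these definitions, runPipe (sumN 3 <+< mapP (* 2) <+< yieldAll (-1) [1 .. 10]) composes a source, a conduit and a sink using the same operator twice.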

What about the instances for Source, Conduit and Sink? In the conduit package Sink is an instance of Monad and its superclasses. That is also the case for Pipe. Source and Conduit are instances of Functor, which allows you to map a function over the output. The output is no longer the last type variable of Pipe. Instead we should provide an instance of Functor2 or Bifunctor, which has a method fmap2 :: (a -> b) -> f a r -> f b r.
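A possible shape for such an instance, as a sketch only: the Functor2 class below is defined locally for illustration (it is not taken from an existing package), and the instance simply applies the function wherever an output value is stored.

```haskell
-- The unified type from the post.
data Pipe m i o r
  = NeedInput (i -> Pipe m i o r) (Pipe m () o r)
  | HaveOutput (Pipe m i o r) (m ()) o
  | Finished (Maybe i) r
  | PipeM (m (Pipe m i o r)) (m r)

-- Hypothetical class: map over the second-to-last type argument.
class Functor2 f where
  fmap2 :: (a -> b) -> f a r -> f b r

instance Functor m => Functor2 (Pipe m i) where
  -- Both the continuation and the "no more input" handler may still output.
  fmap2 f (NeedInput k close)       = NeedInput (fmap2 f . k) (fmap2 f close)
  -- The only place an output value is actually stored.
  fmap2 f (HaveOutput next close o) = HaveOutput (fmap2 f next) close (f o)
  -- Leftover input and result are untouched.
  fmap2 _ (Finished left r)         = Finished left r
  fmap2 f (PipeM step close)        = PipeM (fmap (fmap2 f) step) close
```

The input and result types are left alone, so the same fmap2 works for sources and conduits alike.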

Overall, reducing the number of datatypes from 3 to 1 sounds like a pretty good deal to me. I therefore think it would be great if conduit adopted the ideas from pipes.

Comments

Heinrich Apfelmus, 2012-03-24T20:45Z

The actual bijections between the types can be found here: https://gist.github.com/2187593

Jonathan, 2012-03-24T23:23Z

Wouldn't the Source type really be Pipe m Void o (), not Pipe m () o (), as this makes it impossible to close with anything but Nothing for the remaining input and prevents you from sensibly using NeedInput?

Twan van Laarhoven, 2012-03-24T23:27Z

Consider the point of view of someone writing a pipe. For output, the use of Void is clear. It means that you cannot use output, because to do output you would have to come up with a value of type Void. However, input of type Void would mean that you can receive values of type Void, and once you get one you can use it to blow up the program. Of course you are never going to get one, but that is not clear from the types.

It's like the difference between false and false -> x.

Gabriel Gonzalez, 2012-03-24T23:35Z

No, it's not correct to make the Source have an input type of Void. This was a mistake I made in the initial release of pipes, but I just fixed it very recently. Here's the problem with it: There is absolutely no way to prevent a pipe from awaiting. Using pipes as an example, even with a pipe type of Pipe Void b m r, you can still satisfy that with await. In fact, if you examine the type of forever $ await, it is Pipe a b m r, which means that it's essentially a pipe that inhabits all pipes, including ones that have an input type of Void.
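A small sketch of this point, using a simplified pipes-style type whose constructor names are assumptions, and written with the constructors directly rather than with forever $ await: a pipe that only awaits is polymorphic in its input, so it also type-checks at input type Void.

```haskell
import Data.Void (Void)

-- Simplified pipe type (an assumption, not the library's actual source).
data Pipe a b m r
  = Pure r
  | M (m (Pipe a b m r))
  | Await (a -> Pipe a b m r)
  | Yield b (Pipe a b m r)

-- Awaits forever and ignores whatever it receives (hypothetical name).
idle :: Pipe a b m r
idle = Await (\_ -> idle)

-- Specialising the input to Void is accepted: the type does not forbid Await.
stillAwaits :: Pipe Void b m r
stillAwaits = idle
```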

You can also show this algebraically, by looking at the argument of the Await constructor, which is a -> Pipe a b m r. The only way to forbid this constructor would be to zero it, but algebraically it translates to (Pipe a b m r)^a. Unfortunately, there is no natural number you can choose for a that zeros that expression.

Fortunately, though, there is no need to forbid the await constructor at all. Instead, you set the input type to () to ensure that it is trivially satisfiable. Then in any context where you need a pipe that does not await, you request that the pipe has an input type of (), and then supply it upstream with the following pipe:

p :: Producer b m r
produce = forever $ yield ()
p <+< produce :: Pipe a b m r

This is the correct way to guarantee that the result does not await. Conceptually, this is what runPipe now does when you run a pipeline. Instead of forbidding awaits, it just supplies the trivial () to them to keep the pipe going.
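The same idea can be sketched without composition, by answering every await with () directly. The type and the name closeInput below are assumptions for illustration, not the actual pipes API.

```haskell
-- Simplified pipe type (an assumption, not the library's actual source).
data Pipe a b m r
  = Pure r
  | M (m (Pipe a b m r))
  | Await (a -> Pipe a b m r)
  | Yield b (Pipe a b m r)

-- Hypothetical helper: feed () to every await, so the result never awaits.
-- The input type of the result is free, because it is never consulted.
-- Caveat: a pipe that awaits forever without yielding makes this loop.
closeInput :: Functor m => Pipe () b m r -> Pipe a b m r
closeInput (Pure r)    = Pure r
closeInput (M m)       = M (fmap closeInput m)
closeInput (Await f)   = closeInput (f ())
closeInput (Yield b p) = Yield b (closeInput p)
```

Since no Await constructor appears on the right-hand sides, the resulting pipe cannot request anything from upstream, whatever its nominal input type is.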

Jonathan, 2012-03-25T00:08Z

That makes sense now. I just wish there were a way to prevent someone from piping information into a Source. For example, they could pump in various bottom values. The other thing that bothers me is the idea that it could close with remaining input, when there is no input.

I guess it doesn't matter, as you can't extract information from bottom values without being unsafe and the "remaining input" will almost always be ignored.

Gabriel Gonzalez, 2012-03-25T00:29Z

But that's the thing. You don't actually hook the producer up to anything upstream. The forever $ yield () example was just a conceptual analogy. In principle, you can feed it () manually (and that's what runPipe does to ensure correctness). And there is absolutely no information you can extract from an input type of (). An await with an input type of () extracts no more information than a return () would and returns immediately without querying any other pipe if you are the one supplying it with ()s.

applicative, 2012-03-25T22:37Z

I was having trouble finding a Monad instance; one can't mechanically follow the existing pipes libraries. In particular it's hard to figure out what to do with the cases PipeM mpipe mr >>= f and NeedInput topipe pipevoid, but maybe I'm missing something.

Twan van Laarhoven, 2012-03-25T23:47Z

The problem with the Monad instance for PipeM comes from the close action, which I have given type m r in the post. That might not actually work. These close actions will be invoked when the downstream pipe has no need for more inputs, and in that case the result will be ignored anyway. So I think the type for these close actions will have to be m ().

The second problem is with the 'no more input' handler in NeedInput. It might be easier to also give it the type Pipe m i o r. An alternative is to define the function noMoreInput :: Monad m => Pipe m i1 o r -> Pipe m i2 o r, but that will have a bad performance impact.
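One way noMoreInput might look, as a sketch under the assumption that it should behave as if upstream had already finished: every request for input is replaced by its 'no more input' handler. This is a guess at the intended semantics, written against the type from the post.

```haskell
-- The unified type from the post.
data Pipe m i o r
  = NeedInput (i -> Pipe m i o r) (Pipe m () o r)
  | HaveOutput (Pipe m i o r) (m ()) o
  | Finished (Maybe i) r
  | PipeM (m (Pipe m i o r)) (m r)

-- Assumed semantics: act as though upstream is exhausted.
noMoreInput :: Functor m => Pipe m i1 o r -> Pipe m i2 o r
-- A request for input is answered by the "no more input" handler.
noMoreInput (NeedInput _ close)       = noMoreInput close
noMoreInput (HaveOutput next close o) = HaveOutput (noMoreInput next) close o
-- Leftover input of type i1 cannot be kept at type i2, so it is dropped.
noMoreInput (Finished _ r)            = Finished Nothing r
noMoreInput (PipeM step close)        = PipeM (fmap noMoreInput step) close
```

The function has to traverse the whole pipe, which is presumably where the performance cost comes from.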

Heinrich Apfelmus, 2012-03-28T07:40Z

Twan: Wait a moment, I'm not convinced by your argument about Void, because it equally applies to the library internals that read the HaveOutput constructor when o = Void. Why is it not clear from the types that the library internals can never receive a value of type Void?
