If Haskell were a god, often would he be depicted with the ravens Modularity and Abstraction flying above him, hovering over the world and reporting to him every detail of our whereabouts. Haskell would sit on the Throne of Purity and look upon the world with an eye full of wisdom. And in his hand, the mighty Haskell would wield the Spear of Lazy Lists, which is said to have the power to tackle each and every problem the world might have to face. And to honour him, we would code and abstract everything with lazy lists. For millennia would lists be used to map, filter, separate, merge, group, and so forth.
But, one day, the Real-World Serpent, son of the wicked Foldr, would come. And the Real-World Serpent carries an eternal hatred towards lazy lists. Oh, that dreaded Serpent, which will throw everything it can muster to prevent us from staying within the warm comfort of abstraction and laziness. The Serpent will assemble its minions, Early-close and Strictness of effects, and unleash its wrath upon our world. Foldl, son of Haskell and brother of Foldr, would lead humanity to its last bastion, Streamgard, and organize the final fight…
So, long story short, streaming is a library that allows you to leverage the insights you have gained while manipulating lazy lists in Haskell to handle effectful streams of data. We have already talked about streaming on this blog, with this post discussing the IO part and this one comparing it to pipes and conduit. Here, we will be using streaming for highly efficient data processing and filtering. To this end, we will use it conjointly with another library, foldl, which gives us an Applicative interface to the usual list functions. In this blog post we will apply them to the task of computing some statistics about a distribution of data. We want to be able to:
- process the input data stream only once (aka in one pass),
- never repeat the effects that were used to produce that data stream,
- retain the ability to use the input stream as if it were a list, for instance by splitting it into two subparts and sending each subpart to be processed by a specific function.
So let’s imagine that the statistics I want to compute on my input data distribution take the shape of a simple summary. This is what I want to obtain in the end:
data Summary v a = Summary
  { summaryLength :: Int
  , summaryMins :: [a]
  , summaryMaxes :: [a]
  , summaryMean :: v
  , summaryStdDev :: v
  }
  deriving (Show)
Nothing too fancy here, I just want to be able to compute the length, the n smallest elements, the n' biggest elements, the mean and the standard deviation of my distribution. We distinguish the types a and v here because our input distribution does not have to be numerical, as long as we have a projection a -> v available. This way, we can compute a summary of a stream of (Double, String) tuples, for instance, if the projection is just fst.
So let’s have a little reminder of our conditions. We want to be able to read the input data only once. But we still want modularity and reusability: we do not want to have to rewrite our Summary-computing function every time we want to add a new field, and we would like to reuse existing functions that compute these statistics. This is where the foldl package comes in. This package defines a type Fold as follows:
data Fold a b = forall acc. Fold (acc -> a -> acc) acc (acc -> b)
You might recognize here the typical arguments of the classical foldl function of the Prelude: a is the type of each element of the input stream we consume, the first field (acc -> a -> acc) is an accumulation function, and the second field acc is the initial value of the accumulator. The new components are the b type parameter and the last field (acc -> b); the latter is called extract and is used to extract the final value out of the accumulator. This is necessary so that Fold a can be a Functor and therefore an Applicative. See the original blog post and this talk by Gabriel Gonzalez for more detail, though be aware that Fold had a different shape back then.
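To make this concrete, here is how we could define a length fold by hand (a small sketch for illustration; the foldl package already provides this as L.length):

-- Accumulate a count starting from 0, and extract it unchanged at the end.
lengthFold :: Fold a Int
lengthFold = Fold (\n _ -> n + 1) 0 id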
One of the central ideas of the foldl library is that Fold implements the Applicative type class:
instance Applicative (Fold a)
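The classic example from the foldl documentation combines the sum and length folds into an average that is still computed in a single pass:

-- L.sum and L.genericLength share one traversal of the input.
average :: L.Fold Double Double
average = (/) <$> L.sum <*> L.genericLength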
Crucially, this Applicative instance combines two Folds into a guaranteed one-pass traversal of the data. Therefore we can safely decompose the computation of a Summary as follows:
import qualified Control.Foldl as L
import Data.Function (on)
summarizeBy :: (Floating v, Ord v)
            => (a -> v) -> Int -> Int -> L.Fold a (Summary v a)
summarizeBy f nMins nMaxes = Summary
  <$> L.length
  <*> collect ((>=) `on` f) nMins
  <*> collect ((<=) `on` f) nMaxes
  <*> L.premap f L.mean
  <*> L.premap f L.std
What’s happening here? We are using a few of the functions already present in the foldl package and a new one, so let’s delve into it a bit. The function summarizeBy takes a projection f, which we talked about earlier, the number of smallest elements we want to collect, and the number of biggest elements. Then our five statistics are computed:
- L.length :: L.Fold a Int gives us the number of elements in the input.
- collect, which we will define a bit later, accumulates either the mins or the maxes given a comparison function.
- L.mean gives us the average. We use L.premap f to turn it into a fold that will work on our projection f (see the small sketch after this list).
- L.std gives us the standard deviation.
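To see what L.premap does in isolation, here is a tiny sketch (sumFirsts is a made-up name for illustration):

-- premap adapts the input of a fold with a plain function;
-- here we sum the first components of a stream of pairs.
sumFirsts :: Num n => L.Fold (n, s) n
sumFirsts = L.premap fst L.sum
-- L.fold sumFirsts [(1, "a"), (2, "b")] == 3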
The combination of the above gives us a Fold a (Summary v a), something that will consume a stream of a’s and output a summary. At this point, nothing is consumed: we have only composed folds together, and a Fold is agnostic of the exact nature of the input. Running it on any Foldable datatype, for instance, is just a matter of calling:
L.fold (summarizeBy id 3 3) [1..100]
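And, as announced earlier, a projection lets us summarize non-numerical elements too; a quick sketch with made-up pairs (pairSummary is only for illustration):

-- The statistics are computed on the first components only,
-- but summaryMins and summaryMaxes still contain whole tuples.
pairSummary :: Summary Double (Double, String)
pairSummary = L.fold (summarizeBy fst 3 3) [(1.5, "a"), (0.2, "b"), (3.7, "c")]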
The only function not provided by the foldl package is the collect function. Defining it as a brand new Fold is simple:
import Data.Sequence as Seq
collect :: (a -> a -> Bool) -> Int -> L.Fold a [a]
collect skipPred n = L.Fold insertPop Seq.empty (L.fold L.list)
  where
    insertPop acc x
      | Seq.length acc < n = insert x acc
      | otherwise = pop (insert x acc)
    insert x s = let (before, after) = Seq.spanl (skipPred x) s
                 in before <> Seq.singleton x <> after
    pop s = case viewr s of
              s' :> _ -> s'
              _ -> s
Here we manually defined a new Fold from the three elements we mentioned earlier: an accumulation function (insertPop), an initial accumulator value (Seq.empty) and an extract function (L.fold L.list, which also uses a Fold to turn the final sequence into a plain list).
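As a quick sanity check, we can run collect on its own; these results follow from the definition above:

L.fold (collect (>=) 3) [5, 1, 4, 2, 3]  -- == [1, 2, 3], the three smallest, ascending
L.fold (collect (<=) 3) [5, 1, 4, 2, 3]  -- == [5, 4, 3], the three biggest, descending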
Now, the astute reader will notice we have left streaming aside. Let’s get back to it. Let’s use as input the classic Titanic dataset:
PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
1,0,3,"Braund, Mr. Owen Harris",male,22,1,0,A/5 21171,7.25,,S
2,1,1,"Cumings, Mrs. John Bradley (Florence Briggs Thayer)",female,38,1,0,PC 17599,71.2833,C85,C
3,1,3,"Heikkinen, Miss. Laina",female,26,0,0,STON/O2. 3101282,7.925,,S
4,1,1,"Futrelle, Mrs. Jacques Heath (Lily May Peel)",female,35,1,0,113803,53.1,C123,S
...
We want to get two different summaries for the fares: one for the passengers that survived and one for those who did not. First, let’s load the CSV into a stream by using the streaming-cassava and streaming-bytestring packages:
{-# LANGUAGE OverloadedStrings #-}
import Control.Monad (mzero)
import qualified Data.ByteString.Streaming as BS
import Streaming
import Streaming.Cassava
data Passenger = Passenger
  { name :: !String, fare :: !Double, survived :: !Bool }
  deriving (Show)

instance FromNamedRecord Passenger where
  parseNamedRecord m =
    Passenger <$> m .: "Name" <*> m .: "Fare" <*> (toBool =<< (m .: "Survived"))
    where toBool 0 = return False
          toBool 1 = return True
          toBool _ = mzero
streamCsv :: (MonadResource m) => Stream (Of Passenger) m ()
streamCsv = decodeByName (BS.readFile ".../titanic.csv")
Nothing too fancy here, just a bit of required boilerplate to be able to read Passengers from the CSV file. MonadResource is necessary to track the files opened by our program. The type Stream (Of Passenger) m () means that we will be manipulating a stream whose elements are Passengers, that will run some effects in a monad m, and that will return no result in the end.
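If you want to check that the decoding works, here is a quick sketch (peek is a made-up helper; it relies on the qualified import of Streaming.Prelude shown just below):

-- Print the first three decoded passengers to stdout.
peek :: IO ()
peek = runResourceT $ S.print (S.take 3 streamCsv)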
Now, let’s split that input into two different substreams:
import qualified Streaming.Prelude as S
aliveDead :: (MonadResource m) => Stream (Of Passenger) (Stream (Of Passenger) m) ()
aliveDead = S.partition survived streamCsv
Let’s look at the type of aliveDead: it is a Stream over another Stream. Stream (Of a) is actually a monad transformer, and the way the partitioning happens is by creating two layers: one for the live passengers and one for the dead ones. It’s not exactly a tuple of two streams (as it would be with Data.List.partition), but it has the same advantages: each layer can be processed by different functions, which don’t have to know where the stream they process lies in the monad stack. Therefore, each one of these functions can be expressed as:
summarizePassengers
  :: (Monad m) => Stream (Of Passenger) m a -> m (Of (Summary Double Passenger) a)
summarizePassengers = L.purely S.fold (summarizeBy fare 3 3)
where m can be any monad. It can be the bottom MonadResource monad or another Stream; summarizePassengers does not mind and does not have to! Of behaves like a tuple, so it simply means that we return both the newly computed Summary and an a (a may just be (), but here we have to be a little more general). S.fold is the basic folding function for streams. L.purely fn f “unpacks” a Fold f and calls a folding function fn. So now, getting our summaries is just a matter of:
runAll = runResourceT $ do
  (summaryAlive :> summaryDead :> ()) <-
    summarizePassengers $ summarizePassengers aliveDead
  ...
So, in the end, we split the input file into two substreams and computed various statistics twice, and despite all this, streaming and foldl guarantee that the input will be read only once, in bounded memory.
These techniques are currently being applied by Tweag I/O in the context of a project with Novadiscovery. Novadiscovery is a consulting company for in silico clinical trials, namely the simulation of virtual patients through biomodeling. Parts of this blog post are actual code from the tools we develop with them.