Funflow is a system for building and running workflows. It’s a system we’ve been working on for the last few months with a small number of clients. In this blog post we’ll talk about what workflows are, why we built Funflow, and what we’d like to do with it in the future.
What is a workflow?
At its core, a workflow takes some inputs and produces some outputs, possibly producing some side effects along the way. Of course, this basically describes any program. Workflow systems distinguish themselves in a few ways:
- Workflows are composed of a number of steps, which may themselves be independent programs (or workflows). Composition at this higher level might be done by domain specialists rather than programmers.
- Workflows are often long running, with some likelihood of failure partway through. In such cases, we want to be able to resume them from the point of failure without rerunning the earlier steps.
- Workflows are often expensive, to the point where executing them on clusters, rather than a single node, can save a lot of time.
Workflow systems have perhaps seen the most use in the business process space, where they allow non-programmers to automate business processes (such as authorising a purchase order) which may involve multiple steps, some of them automatic and others requiring human intervention.
Another area that relies heavily on workflows, and one in which we are particularly interested, is scientific data processing. For example, in bioinformatics one might develop a workflow (also often called a “pipeline” in this space) to perform some in-silico analysis of sequenced gene data. Such a workflow will be customised for a particular task, and may then be run for each sequenced sample.
Wherefore Funflow?
There are innumerable workflow systems already out there, with a myriad of different features. Tools like Apache Taverna provide a very mature solution for building enterprise workflows, with integrations for working with systems such as Hadoop and powerful GUI editors for composing workflows. At the other end of the scale, Luigi and Airflow are libraries for composing workflows in Python code.
So why build a new one? It came down to three things:
- Integration
- Caching
- Control over control flow
Integration
Most of the simple workflow systems we looked at were designed to be run from a command line, by a person or at best by a tool like cron or at. We find, however, that we often want to run a workflow as part of a larger application. For example, when doing data analysis one might wish to run classifiers on different subsets of data, and then visualise the results. We still want to be able to use classifiers written in any language or running on a different machine, but we should be able to track the progress in a surrounding program, and easily get the results back. When we tried to do this with Luigi, we found ourselves needing to parse the log output just to find out the ID of the job we’d triggered! So integration, for us, means:
- First class support for pulling values into, and out of, the host program.
- Ability to trigger a pipeline from within your program and deal with its output as any other value.
Funflow’s workflows are just Haskell programs, built using arrow syntax. With Funflow, we can easily intermix steps done inside our Haskell process and steps done outside, for example by another program. Arrows give us:
- Generality to model multiple types of computation: a function a -> b is an arrow, as is a monadic function Monad m => a -> m b, or a stream transformer Stream a -> Stream b.
- Introspectability: because arrows are less powerful than monads, we also get the ability to introspect them and do things like drawing the dependency graph.
- Explicit dependencies: unlike traditional do notation, arrows must have all their arguments passed in explicitly; they cannot close over arbitrary variables in their environment. This means that we cannot accidentally pull unneeded dependencies into our results.
- Static type checking: being a Haskell DSL means we get all the benefits of typing to prevent composing invalid workflows.
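To make this concrete, here is a minimal sketch of a flow written with arrow syntax. The step names and data types below are placeholders rather than Funflow API (and the Funflow imports are omitted), but the shape is exactly how flows are written; the fuller example at the end of this post follows the same pattern.
{-# LANGUAGE Arrows #-}

-- Placeholder result types and steps, only to make the sketch self-contained.
data Dataset
data Classifier
data Plot

loadData :: SimpleFlow FilePath Dataset
loadData = undefined        -- stands in for a real step

trainClassifier :: SimpleFlow Dataset Classifier
trainClassifier = undefined -- stands in for a real step

plotResults :: SimpleFlow (Dataset, Classifier) Plot
plotResults = undefined     -- stands in for a real step

-- The flow itself is just arrow syntax wiring the steps together; each step
-- receives only the values passed to it on the right of -<.
analysisFlow :: SimpleFlow FilePath Plot
analysisFlow = proc path -> do
  dataset    <- loadData        -< path
  classifier <- trainClassifier -< dataset
  plotResults -< (dataset, classifier)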
Caching
There are two hard problems in programming: naming things, cache invalidation, and off by one errors.
Consider the following two workflows, expressed as Haskell functions:
flow1 :: Foo -> Bar
flow1 = extractBar . someExpensiveComputation . someTransformation
flow2 :: Foo -> Baz
flow2 = extractBaz . someExpensiveComputation . someTransformation
What we notice about these is that the first part of the computation is shared between flow1 and flow2. If we feed the same input Foo to both, we would particularly like not to repeat someExpensiveComputation.
This is not an idle example: we often see pipelines in which an early stage preprocesses a reference data set, and that preprocessing may be repeated needlessly, either by different users or across multiple runs of the pipeline. Perhaps more importantly, it is often desirable to tweak the parameters of some late stage of processing and rerun the pipeline - again, without rerunning the unchanged earlier parts.
In order to address this issue, Funflow borrows a couple of ideas from the Nix package manager. The first of these is to remove the notion that the user has any control over where and how the outputs of intermediate steps in a workflow are stored. Instead of the user choosing where files are output, Funflow manages a section of the file system known as the store. Entries in the store are addressed by a unique hash (the second idea borrowed from Nix), computed from both the inputs to a step and the definition of that step itself. When Funflow executes a step in a workflow, it first hashes the inputs and the step definition to derive the output path. If this path already exists (since store items are immutable once written), we can skip the computation and use the result from the cache.
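To make the scheme concrete, here is a toy sketch of input-addressed caching. It is not Funflow's implementation: the store location, the use of Data.Hashable in place of a cryptographic hash, and the function names are all assumptions made purely for illustration.
import Data.Hashable (hash)
import System.Directory (createDirectoryIfMissing, doesDirectoryExist)

-- Toy stand-ins: the real store keys hash a full description of the step and
-- its inputs; here we just hash two strings.
inputAddressedPath :: String -> String -> FilePath
inputAddressedPath stepDef input =
  "/tmp/funflow-sketch-store/" ++ show (abs (hash (stepDef, input)))

-- Run a step only if its output path is not already populated.
runStepCached :: String -> String -> (String -> IO String) -> IO FilePath
runStepCached stepDef input runStep = do
  let path = inputAddressedPath stepDef input
  hit <- doesDirectoryExist path
  if hit
    then return path                  -- cache hit: skip the computation
    else do
      out <- runStep input            -- cache miss: run the step
      createDirectoryIfMissing True path
      writeFile (path ++ "/out") out  -- store the result immutably
      return path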
Crucially, Funflow goes further by trying to cache more aggressively than Nix. Whilst the hash of the inputs and step definition determines the path to which the step writes its output, upon completion of the step these outputs are moved into another path determined by their own hash: in other words, the store also works as content addressable storage. What’s the benefit of this? Well, firstly, it ensures that when multiple steps produce the same output, that output is cached only once on disk. However, it also solves the problem suggested by the following flows:
flow1 :: Int -> Bar
flow1 = extractBar . someExpensiveComputation . (* 2)
flow2 :: String -> Bar
flow2 = extractBar . someExpensiveComputation . length
In this example, the tails of the flows are the same. If I provide 4 to the first flow and "workflow" to the second, however, then these computations will converge after their first steps (both produce the value 8), and before someExpensiveComputation. If a Nix derivation were used in this case, two outputs would be produced and someExpensiveComputation would run twice, because ultimately the inputs differ. Funflow, on the other hand, allows the computations to converge.
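The reason the two flows can converge is that intermediate results are keyed by a hash of their content. The snippet below illustrates the idea with a toy hash (again, not Funflow's actual machinery): both first steps yield 8, so both flows look up someExpensiveComputation's result under the same key.
import Data.Hashable (hash)

-- The two flows' first steps produce the same intermediate value...
intermediateFlow1, intermediateFlow2 :: Int
intermediateFlow1 = (* 2) 4            -- 8
intermediateFlow2 = length "workflow"  -- 8

-- ...so their content-derived cache keys coincide, and whichever flow runs
-- second finds someExpensiveComputation's output already in the store.
sameCacheKey :: Bool
sameCacheKey = hash intermediateFlow1 == hash intermediateFlow2  -- True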
Doing sensible caching might seem like a poor reason to build our own workflow system. In reality, though, it is a large part of what a workflow system is. Going back to the concepts above about what distinguishes a workflow from any other program, Funflow’s system for managing inputs and outputs addresses them quite directly:
- Since long running pipelines are likely to fail, it’s important to make sure we can resume them from the point of failure, even if we’ve modified the pipeline in order to address that failure. At the same time, we need to be sure that we don’t accidentally find ourselves reusing cached data if it’s inappropriate.
- In order to ship computations between machines, we need to be sure that the full environment to run that computation is available on the target machine. By making this environment explicit, and constraining what can be in it, Funflow can make this safe and easy to do.
What’s more, the same approach we take to doing caching correctly provides solutions to a number of other important problems that arise in workflow management. For example, one important requirement in scientific research is that other teams be able to reproduce the results underlying a paper. Funflow makes it possible to capture the closure for a given output, which can then be distributed easily to other locations. By using Docker containers for the processing of individual steps, and by strictly controlling their inputs, we can also ensure that computation is isolated from other parts of the system, reducing the “noise” which can sometimes change results.
Control over control flow
As mentioned in the introduction, workflows can often be expected to fail. When they do, we’d like to control exactly what happens as a result. For example, sometimes those failures are expected, and we want the workflow to take a different path when they occur. On other occasions, we want to abort completely.
Funflow supports exception handling within the workflow, so we can handle these situations correctly. It goes somewhat further: Funflow supports an effects system to allow you to define your own universe of actions which can happen during flows, and even allows you to swap out the interpreter entirely if needed.
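As a sketch of “taking a different path”, here is one way a fallback could be expressed using arrow choice. The step names and their types are hypothetical, their definitions and the Funflow imports are omitted, and Funflow's own error-handling combinators are richer than this; the sketch only shows the shape, assuming flows support branching: a step reports failure as a value, and the flow branches on it.
{-# LANGUAGE Arrows #-}

-- Hypothetical steps (not Funflow API):
--   tryPrimary   :: SimpleFlow Input (Either Failure Good)
--   finishStep   :: SimpleFlow Good  Output
--   fallbackStep :: SimpleFlow Input Output
flowWithFallback :: SimpleFlow Input Output
flowWithFallback = proc x -> do
  result <- tryPrimary -< x
  case result of
    Right ok -> finishStep   -< ok   -- the expected path
    Left _e  -> fallbackStep -< x    -- recover and carry on with the workflow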
Why not use Nix/Bazel/another build system?
We’ve written in the past about how we use Nix at Tweag, and mentioned above how various features of Funflow were inspired by it. Likewise, Tweag has worked on adapting Bazel to build polyglot Haskell projects. So one might be tempted to ask - why not just use one of these?
One certainly could build such a workflow using Nix or Bazel. But Nix doesn’t support the same notion of content addressable storage as Funflow does (although it might be getting it soon). Both systems are untyped, meaning a week-long workflow could fail halfway through because one step’s outputs don’t match the next step’s inputs, a kind of bug that a type checker could have caught. Furthermore, in neither system do we have precise control over the flow graph evaluator. We can’t easily program custom error recovery strategies, for example.
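To illustrate what typing buys us, here is a sketch with hypothetical step and data types (Funflow imports omitted): the composition compiles only because each step's output type matches the next step's input type, so a mismatched wiring is rejected before the workflow ever runs.
import Control.Category ((>>>))

-- Hypothetical types and steps, for illustration only.
data RawData
data Features
data Report

preprocess :: SimpleFlow RawData Features
preprocess = undefined   -- stands in for a real preprocessing step

classify :: SimpleFlow Features Report
classify = undefined     -- stands in for a real classification step

-- Wiring the steps together is checked at compile time: swapping in a step
-- that expects something other than Features would not compile.
pipeline :: SimpleFlow RawData Report
pipeline = preprocess >>> classify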
Beyond this, there’s a wider point about the difference in composition between Funflow (and most workflow systems) and build systems. In build systems one builds a tower of targets, each one explicitly depending on the targets below it. In a workflow system, one builds a series of steps with inputs and outputs, and then wires them together to create a pipeline. Each step has no explicit knowledge of the steps before or after it. In software development folklore, this is called dependency inversion.
These approaches are dual to each other, and it’s certainly possible to translate between them. But taking the workflow approach makes it easier to do things like swapping steps in and out in the middle of a pipeline - something we often wish to do in a workflow.
An example flow
To conclude this post, here’s an example of using Funflow to compile and run a small C program. This demonstrates a number of Funflow’s features:
- Interleaving of steps defined in Haskell code and steps running outside of the Haskell process.
- The use of Docker containers for task isolation.
- And, of course, running this flow multiple times will re-use cached results from the Funflow content store.
The example is shortened for readability. The full code is available in the Funflow repository.
-- | This flow takes a number, builds a C program to process that number,
-- and returns the program's output.
mainFlow :: SimpleFlow Int String
mainFlow = proc n -> do
    moduleDouble <- compileModule -< "int times2(int n) { return 2*n; }\n"
    moduleSquare <- compileModule -< "int square(int n) { return n*n; }\n"
    moduleMain <- compileModule -<
      "#include <stdio.h>\n\
      \#include <stdlib.h>\n\
      \int times2(int n);\
      \int square(int n);\
      \int main(int argc, char **argv) {\
      \ int n = atoi(argv[1]);\
      \ int r = times2(n) + square(n);\
      \ printf(\"%d\\n\", r);\
      \}"
    exec <- compileExec -< [moduleDouble, moduleSquare, moduleMain]
    out <- runExec -< (exec, [show n])
    readString_ -< out
-- | This flow takes a string which is assumed to be the source code
-- for a 'C' module. It writes this to a file, then uses an external
-- step to compile the module.
compileModule :: SimpleFlow String (Content File)
compileModule = proc csrc -> do
    cInput <- writeString -< (csrc, [relfile|out.c|])
    scriptInput <- writeExecutableString -< (compileScript, [relfile|compile.sh|])
    compiled <- compileDocker -< (cInput, scriptInput)
    returnA -< compiled :</> [relfile|out.o|]
  where
    compileScript =
      "#!/usr/bin/env bash\n\
      \gcc -c -o $2 $1\n"
    compileDocker = docker $ \(cInput, scriptInput) -> Docker.Config
      { Docker.image = "gcc"
      , Docker.optImageID = Just "7.3.0"
      , Docker.input = Docker.MultiInput
          $ Map.fromList [ ("script", IPItem $ CS.contentItem scriptInput)
                         , ("data", IPItem $ CS.contentItem cInput)
                         ]
      , Docker.command = "/input/script/compile.sh"
      , Docker.args = ["/input/data/out.c", "/output/out.o"]
      }
-- | This flow takes a list of files which are assumed to be 'C' modules.
-- It uses an external step to compile those modules into an executable.
compileExec :: SimpleFlow [Content File] (Content File)
-- | This flow takes a file which is assumed to be an executable,
-- and a list of strings that are arguments for the executable.
-- It uses an external step to run the executable with the given arguments.
-- The output is stored in the file @out@ in the returned item.
runExec :: SimpleFlow (Content File, [String]) CS.Item
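Finally, to close the loop on the integration point from earlier, here is a hedged sketch of driving mainFlow from an ordinary Haskell main. The runner function, its store-path argument and its Either-shaped result are assumptions for illustration rather than Funflow's exact API; the point is simply that the flow is triggered from the host program and its output comes back as a plain Haskell value.
-- Hypothetical driver: 'runFlowLocally' and its signature are assumptions,
-- not necessarily the real Funflow runner.
main :: IO ()
main = do
  result <- runFlowLocally "/tmp/funflow-example-store" mainFlow 21
  case result of
    Left err     -> putStrLn ("flow failed: " ++ show err)
    Right output -> putStrLn ("the compiled program printed: " ++ output)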
Conclusion
We think Funflow is a useful addition to the space of workflow management tools, and we’re looking forward to applying it to more use cases. We have so far used it for build and deployment, data visualisation and for running a bioinformatics pipeline. We’re increasingly focusing on making it easier to use, including working towards graphical composition of workflows so one can simply drag and drop them into place.
If you have a problem you’d like to apply Funflow to, then please check out the repo or get in touch.