Factotum 0.3.0 released with webhooks

We’re pleased to announce the 0.3.0 release of Snowplow’s DAG running tool Factotum! This release centers around making DAGs easier to create, monitor and reason about, including adding outbound webhooks to Factotum.

In the rest of this post we will cover:

  1. Improving the workflow when creating DAGs
  2. Improving job monitoring using webhooks
  3. Behaviors on task failure
  4. Extras
  5. Downloading and running Factotum
  6. Roadmap
  7. Contributing

1. Improving the workflow when creating DAGs

We needed a way to separate commands cleanly, so we’ve moved to a “subcommand” style of arguments. This means that what was originally factotum <your factfile> is now factotum run <your factfile>. All new features will follow this scheme.

The other workflow improvements fall into three categories: factfile validation, dry runs and Graphviz dotfile exports. Each is discussed below!

Validating factfiles

Factfiles have always been validated against the Factfile JSON schema. However, it’s not always convenient to locate this schema and check that the factfile you’re working on is valid, so as of version 0.3.0 we’ve introduced a built-in validation command. It checks that your factfile is valid JSON, that it adheres to the JSON schema, and that each task can be executed.

You can use it like this:

factotum validate <your factfile>

If the factfile is valid, Factotum will respond:

<your Factfile> is a valid Factotum Factfile!

and if it’s not, you’ll get a message explaining the problem:

<your Factfile> is not a valid Factotum Factfile: Invalid JSON at line 1 column 1
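If you want a similar pre-flight check in your own tooling (say, a CI step), a few of these checks are easy to approximate. The following Python sketch is not Factotum’s validator and does not apply the full Factfile JSON schema. The hypothetical validate_factfile function only checks three things: that the text parses as JSON; that it has the shape of the sample echo Factfile (a schema field plus data.name and data.tasks, each task with name, executor, command, arguments, dependsOn and onResult); and that every dependsOn entry names a task that exists.

```python
import json

FACTFILE_SCHEMA_PREFIX = "iglu:com.snowplowanalytics.factotum/factfile/jsonschema/"
# Keys every task has in the sample echo Factfile; the real schema may allow more.
TASK_KEYS = ("name", "executor", "command", "arguments", "dependsOn", "onResult")


def validate_factfile(text):
    """Return a list of problems; an empty list means these (partial) checks passed."""
    try:
        factfile = json.loads(text)
    except json.JSONDecodeError as err:
        return [f"Invalid JSON at line {err.lineno} column {err.colno}"]

    if not isinstance(factfile, dict) or not isinstance(factfile.get("data"), dict):
        return ["expected a self-describing object with a 'data' object"]

    problems = []
    schema = factfile.get("schema")
    if not isinstance(schema, str) or not schema.startswith(FACTFILE_SCHEMA_PREFIX):
        problems.append(f"unexpected schema {schema!r}")

    tasks = factfile["data"].get("tasks", [])
    names = set()
    for task in tasks:
        if not isinstance(task, dict):
            problems.append(f"task {task!r} is not an object")
            continue
        missing = [key for key in TASK_KEYS if key not in task]
        if missing:
            problems.append(f"task {task.get('name')!r} is missing {', '.join(missing)}")
        if task.get("name") in names:
            problems.append(f"duplicate task name {task['name']!r}")
        names.add(task.get("name"))

    # Every dependency must refer to a task defined in the same Factfile.
    for task in tasks:
        if isinstance(task, dict):
            for dep in task.get("dependsOn", []):
                if dep not in names:
                    problems.append(f"task {task.get('name')!r} depends on unknown task {dep!r}")
    return problems
```

Pass it the contents of a Factfile (for example, read with pathlib). Anything it reports is worth fixing, but factotum validate remains the authoritative check.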

Dry runs

Validating a factfile ensures Factotum can process your job. Dry runs are a way to show how your job will be executed, including a full output simulation.

Dry runs can be executed in the following way:

factotum run <your factfile> --dry-run

which, depending on the factfile, will return something like this:

sample Factotum dry run output

The COMMAND here is the real command Factotum will use to execute your task, which can be copy-pasted and run in a shell if desired.
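The following Python sketch illustrates the dependency ordering that a dry run makes visible. It groups a Factfile’s tasks into “waves”. Each wave contains the tasks whose dependsOn entries all appeared in earlier waves, so tasks in the same wave are independent of one another. It also builds each task’s shell command from its command and arguments fields, quoted with shlex. The sketch uses the standard library’s graphlib (Python 3.9+) and is not Factotum’s scheduler. The hypothetical dry_run_plan function also assumes the Factfile has already been validated.

```python
import shlex
from graphlib import TopologicalSorter


def dry_run_plan(tasks):
    """Group tasks into waves of mutually independent tasks, in dependency order.

    Each entry is (task name, shell command). Assumes every dependsOn entry names
    a task in `tasks`; graphlib raises CycleError if the dependencies form a cycle.
    """
    by_name = {task["name"]: task for task in tasks}
    sorter = TopologicalSorter({name: task["dependsOn"] for name, task in by_name.items()})
    sorter.prepare()

    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # sort for a stable, readable order
        waves.append([
            (name, shlex.join([by_name[name]["command"], *by_name[name]["arguments"]]))
            for name in ready
        ])
        sorter.done(*ready)
    return waves


def print_plan(tasks):
    """Print what each wave would run, without executing anything."""
    for step, wave in enumerate(dry_run_plan(tasks), start=1):
        for name, command in wave:
            print(f"step {step}: {name!r} would run: {command}")
```

For the three chained echo tasks in the sample job, this gives one task per wave. The last command comes out as echo 'and omega!', because shlex quotes arguments containing spaces.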

Graphviz dot output

For complicated DAGs, it’s not always easy to tell the dependency tree from the text output of a program. That’s why, as of 0.3.0, Factotum supports exporting your DAG as a Graphviz dotfile. You can use this export to visualise your Factfile in any of a number of programs or web-based renderers.

factotum dot <your factfile> --output dag.dot

This will build a dotfile representation of <your Factfile> and put the result in dag.dot. Here’s what you can expect to see after you’ve rendered this dotfile:

sample "echo" dotfile graph

(I used the command dot -Tsvg echo.dot -o echo-dot-output.svg to generate this image, using the graphviz package in Ubuntu.)
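The export is conceptually a simple transformation: one node per task and one edge per dependsOn entry. The Python sketch below shows the idea. The exact layout and attributes of the file that factotum dot writes may differ, and to_dot is a hypothetical function. It draws each edge from a dependency to the task that depends on it, so the arrows follow the order of execution.

```python
def dot_id(text):
    """Quote a string as a DOT identifier, escaping embedded double quotes."""
    return '"' + text.replace('"', '\\"') + '"'


def to_dot(factfile):
    """Render a parsed Factfile as a Graphviz digraph (one edge per dependency)."""
    data = factfile["data"]
    lines = [f"digraph {dot_id(data['name'])} {{"]
    for task in data["tasks"]:
        lines.append(f"    {dot_id(task['name'])};")  # declare every task, even isolated ones
        for dependency in task["dependsOn"]:
            lines.append(f"    {dot_id(dependency)} -> {dot_id(task['name'])};")
    lines.append("}")
    return "\n".join(lines) + "\n"
```

Writing the result of to_dot(json.loads(Path("echo.factotum").read_text())) to a file gives you something you can render with dot -Tsvg, as above.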

2. Improving job monitoring using webhooks

Data pipelines typically run on clusters, with a job or part of a job being assigned to one or more machines (which may or may not be virtual). It isn’t necessarily known in advance which box will run a specific job, and it isn’t always straightforward to work out where a previous run was executed (or even whether that box is still running).

This creates a problem unique to cluster-based software: how do I keep an auditable log of the jobs that have run, and how do I know which are currently running (and what they’re doing)?

There are a number of ways to “bridge” applications that use traditional log files for cluster use, for example using NFS and a central “log store”. However, this solution isn’t perfect, and to make a log file truly auditable it needs to be structured: a stream of unstructured messages is difficult to reason about (and to query).

Many tools such as Airflow or Chronos would at this point bundle in MySQL or Postgres or Cassandra and use that to store state over time. This approach makes technical sense, but it does create a new and opaque data silo within your organisation; all this information is hidden away somewhere, and liable to change format between releases.

We’ve chosen a different path, based on the Zen of Factotum and the idea that you should be able to depend on an abstraction rather than on a specific implementation or tool. As of release 0.3.0, Factotum can emit self-describing events containing the current state of the running job to an HTTP (or HTTPS) endpoint of your choice. These events are also suitable for ingesting into your existing Snowplow pipeline (though this is by no means required!).

Running with webhooks

To enable webhooks, add the --webhook <url> option. For example:

factotum run <your factfile> --webhook "http://my-endpoint.com"

You can ingest these events into Snowplow using the Iglu webhook adapter POST support (requires R83+):

factotum run <your factfile> --webhook "https://my-snowplow-collector.com/com.snowplowanalytics.iglu/v1"
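Any HTTP server that accepts a POST with a JSON body can act as the endpoint. Here is a minimal sketch of a receiver in Python, using only the standard library’s http.server. It assumes each event arrives as a single POST whose body is the JSON event and that the request carries a Content-Length header. For each event, it prints a one-line summary. The port, the summarize and serve function names, and the summary format are all hypothetical. You would point Factotum at it with --webhook "http://localhost:8000".

```python
import json
from http.server import BaseHTTPRequestHandler, HTTPServer


def summarize(event):
    """One-line summary of a Factotum job_update or task_update event."""
    data = event["data"]
    # e.g. "iglu:com.snowplowanalytics.factotum/job_update/jsonschema/1-0-0" -> "job_update"
    kind = event["schema"].split("/")[1]
    return f"{kind}: {data['jobName']} run {data['runReference'][:8]} is {data['runState']}"


class FactotumWebhookHandler(BaseHTTPRequestHandler):
    def do_POST(self):
        try:
            length = int(self.headers.get("Content-Length", 0))
            event = json.loads(self.rfile.read(length))
            print(summarize(event), flush=True)
            status = 200
        except (ValueError, KeyError, IndexError, TypeError):
            status = 400  # not a JSON event in the shape we expect
        self.send_response(status)
        self.end_headers()


def serve(port=8000):
    """Block forever, handling one webhook request at a time."""
    HTTPServer(("", port), FactotumWebhookHandler).serve_forever()
```

If you already run Snowplow, you don’t need anything like this: the collector URL shown above accepts the events directly.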

When updates are sent

Updates are split into two event types. The first is emitted when the state of the job changes, for example when the job starts or finishes. The second is emitted when the state of a specific task changes, for example when a task starts or fails.

Update format

Job updates are described by com.snowplowanalytics.factotum/job_update/jsonschema/1-0-0 events, available in Iglu Central.

Here’s an example of a job update:

{
    "schema": "iglu:com.snowplowanalytics.factotum/job_update/jsonschema/1-0-0",
    "data": {
        "applicationContext": {
            "name": "factotum",
            "version": "0.3.0"
        },
        "factfile": "ewogICAgInNjaGVtYSI6ICJpZ2x1OmNvbS5zbm93cGxvd2FuYWx5dGljcy5mYWN0b3R1bS9mYWN0ZmlsZS9qc29uc2NoZW1hLzEtMC0wIiwKICAgICJkYXRhIjogewogICAgICAgICJuYW1lIjogImVjaG8gb3JkZXIgZGVtbyIsCiAgICAgICAgInRhc2tzIjogWwogICAgICAgICAgICB7CiAgICAgICAgICAgICAgICAibmFtZSI6ICJlY2hvIGFscGhhIiwKICAgICAgICAgICAgICAgICJleGVjdXRvciI6ICJzaGVsbCIsCiAgICAgICAgICAgICAgICAiY29tbWFuZCI6ICJlY2hvIiwKICAgICAgICAgICAgICAgICJhcmd1bWVudHMiOiBbICJhbHBoYSIgXSwKICAgICAgICAgICAgICAgICJkZXBlbmRzT24iOiBbXSwKICAgICAgICAgICAgICAgICJvblJlc3VsdCI6IHsKICAgICAgICAgICAgICAgICAgICAidGVybWluYXRlSm9iV2l0aFN1Y2Nlc3MiOiBbIDMgXSwKICAgICAgICAgICAgICAgICAgICAiY29udGludWVKb2IiOiBbIDAgXQogICAgICAgICAgICAgICAgfQogICAgICAgICAgICB9LAogICAgICAgICAgICB7CiAgICAgICAgICAgICAgICAibmFtZSI6ICJlY2hvIGJldGEiLAogICAgICAgICAgICAgICAgImV4ZWN1dG9yIjogInNoZWxsIiwKICAgICAgICAgICAgICAgICJjb21tYW5kIjogImVjaG8iLAogICAgICAgICAgICAgICAgImFyZ3VtZW50cyI6IFsgImJldGEiIF0sCiAgICAgICAgICAgICAgICAiZGVwZW5kc09uIjogWyAiZWNobyBhbHBoYSIgXSwKICAgICAgICAgICAgICAgICJvblJlc3VsdCI6IHsKICAgICAgICAgICAgICAgICAgICAidGVybWluYXRlSm9iV2l0aFN1Y2Nlc3MiOiBbIDMgXSwKICAgICAgICAgICAgICAgICAgICAiY29udGludWVKb2IiOiBbIDAgXQogICAgICAgICAgICAgICAgfQogICAgICAgICAgICB9LAogICAgICAgICAgICB7CiAgICAgICAgICAgICAgICAibmFtZSI6ICJlY2hvIG9tZWdhIiwKICAgICAgICAgICAgICAgICJleGVjdXRvciI6ICJzaGVsbCIsCiAgICAgICAgICAgICAgICAiY29tbWFuZCI6ICJlY2hvIiwKICAgICAgICAgICAgICAgICJhcmd1bWVudHMiOiBbICJhbmQgb21lZ2EhIiBdLAogICAgICAgICAgICAgICAgImRlcGVuZHNPbiI6IFsgImVjaG8gYmV0YSIgXSwKICAgICAgICAgICAgICAgICJvblJlc3VsdCI6IHsKICAgICAgICAgICAgICAgICAgICAidGVybWluYXRlSm9iV2l0aFN1Y2Nlc3MiOiBbIDMgXSwKICAgICAgICAgICAgICAgICAgICAiY29udGludWVKb2IiOiBbIDAgXQogICAgICAgICAgICAgICAgfQogICAgICAgICAgICB9CiAgICAgICAgXQogICAgfQp9Cg==",
        "jobName": "echo order demo",
        "jobReference": "69d3b6cea7c85404060d7974466cf269fd719ba97cf2781b8c90cb2ea5594908",
        "jobTransition": {
            "currentState": "SUCCEEDED",
            "previousState": "RUNNING"
        },
        "runDuration": "PT2.144857905S",
        "runReference": "bd575102a6dd3669e79bfa92a64c9d750c4d0f59ef4f7281e715f109d623d0b5",
        "runState": "SUCCEEDED",
        "startTime": "2016-10-20T19:59:50.256Z",
        "tags": {
            "foo": "bar",
            "foo2": "bar2"
        },
        "taskStates": [
            {
                "duration": "PT0.002363397S",
                "returnCode": 0,
                "started": "2016-10-20T19:59:50.256Z",
                "state": "SUCCEEDED",
                "stdout": "alpha",
                "taskName": "echo alpha"
            },
            {
                "duration": "PT0.001529858S",
                "returnCode": 0,
                "started": "2016-10-20T19:59:50.260Z",
                "state": "SUCCEEDED",
                "stdout": "beta",
                "taskName": "echo beta"
            },
            {
                "duration": "PT0.002031825S",
                "returnCode": 0,
                "started": "2016-10-20T19:59:50.262Z",
                "state": "SUCCEEDED",
                "stdout": "and omega!",
                "taskName": "echo omega"
            }
        ]
    }
}

Task updates are described by com.snowplowanalytics.factotum/task_update/jsonschema/1-0-0 events, also available in Iglu Central.

Here’s an example of a task update:

{
    "schema": "iglu:com.snowplowanalytics.factotum/task_update/jsonschema/1-0-0",
    "data": {
        "applicationContext": {
            "name": "factotum",
            "version": "0.3.0"
        },
        "factfile": "ewogICAgInNjaGVtYSI6ICJpZ2x1OmNvbS5zbm93cGxvd2FuYWx5dGljcy5mYWN0b3R1bS9mYWN0ZmlsZS9qc29uc2NoZW1hLzEtMC0wIiwKICAgICJkYXRhIjogewogICAgICAgICJuYW1lIjogImVjaG8gb3JkZXIgZGVtbyIsCiAgICAgICAgInRhc2tzIjogWwogICAgICAgICAgICB7CiAgICAgICAgICAgICAgICAibmFtZSI6ICJlY2hvIGFscGhhIiwKICAgICAgICAgICAgICAgICJleGVjdXRvciI6ICJzaGVsbCIsCiAgICAgICAgICAgICAgICAiY29tbWFuZCI6ICJlY2hvIiwKICAgICAgICAgICAgICAgICJhcmd1bWVudHMiOiBbICJhbHBoYSIgXSwKICAgICAgICAgICAgICAgICJkZXBlbmRzT24iOiBbXSwKICAgICAgICAgICAgICAgICJvblJlc3VsdCI6IHsKICAgICAgICAgICAgICAgICAgICAidGVybWluYXRlSm9iV2l0aFN1Y2Nlc3MiOiBbIDMgXSwKICAgICAgICAgICAgICAgICAgICAiY29udGludWVKb2IiOiBbIDAgXQogICAgICAgICAgICAgICAgfQogICAgICAgICAgICB9LAogICAgICAgICAgICB7CiAgICAgICAgICAgICAgICAibmFtZSI6ICJlY2hvIGJldGEiLAogICAgICAgICAgICAgICAgImV4ZWN1dG9yIjogInNoZWxsIiwKICAgICAgICAgICAgICAgICJjb21tYW5kIjogImVjaG8iLAogICAgICAgICAgICAgICAgImFyZ3VtZW50cyI6IFsgImJldGEiIF0sCiAgICAgICAgICAgICAgICAiZGVwZW5kc09uIjogWyAiZWNobyBhbHBoYSIgXSwKICAgICAgICAgICAgICAgICJvblJlc3VsdCI6IHsKICAgICAgICAgICAgICAgICAgICAidGVybWluYXRlSm9iV2l0aFN1Y2Nlc3MiOiBbIDMgXSwKICAgICAgICAgICAgICAgICAgICAiY29udGludWVKb2IiOiBbIDAgXQogICAgICAgICAgICAgICAgfQogICAgICAgICAgICB9LAogICAgICAgICAgICB7CiAgICAgICAgICAgICAgICAibmFtZSI6ICJlY2hvIG9tZWdhIiwKICAgICAgICAgICAgICAgICJleGVjdXRvciI6ICJzaGVsbCIsCiAgICAgICAgICAgICAgICAiY29tbWFuZCI6ICJlY2hvIiwKICAgICAgICAgICAgICAgICJhcmd1bWVudHMiOiBbICJhbmQgb21lZ2EhIiBdLAogICAgICAgICAgICAgICAgImRlcGVuZHNPbiI6IFsgImVjaG8gYmV0YSIgXSwKICAgICAgICAgICAgICAgICJvblJlc3VsdCI6IHsKICAgICAgICAgICAgICAgICAgICAidGVybWluYXRlSm9iV2l0aFN1Y2Nlc3MiOiBbIDMgXSwKICAgICAgICAgICAgICAgICAgICAiY29udGludWVKb2IiOiBbIDAgXQogICAgICAgICAgICAgICAgfQogICAgICAgICAgICB9CiAgICAgICAgXQogICAgfQp9Cg==",
        "jobName": "echo order demo",
        "jobReference": "69d3b6cea7c85404060d7974466cf269fd719ba97cf2781b8c90cb2ea5594908",
        "runDuration": "PT1.898637649S",
        "runReference": "bd575102a6dd3669e79bfa92a64c9d750c4d0f59ef4f7281e715f109d623d0b5",
        "runState": "RUNNING",
        "startTime": "2016-10-20T19:59:50.256Z",
        "tags": {
            "foo": "bar",
            "foo2": "bar2"
        },
        "taskStates": [
            {
                "duration": "PT0.002363397S",
                "returnCode": 0,
                "started": "2016-10-20T19:59:50.256Z",
                "state": "SUCCEEDED",
                "stdout": "alpha",
                "taskName": "echo alpha"
            },
            {
                "duration": "PT0.001529858S",
                "returnCode": 0,
                "started": "2016-10-20T19:59:50.260Z",
                "state": "SUCCEEDED",
                "stdout": "beta",
                "taskName": "echo beta"
            },
            {
                "duration": "PT0.002031825S",
                "returnCode": 0,
                "started": "2016-10-20T19:59:50.262Z",
                "state": "SUCCEEDED",
                "stdout": "and omega!",
                "taskName": "echo omega"
            }
        ],
        "taskTransitions": [
            {
                "currentState": "SUCCEEDED",
                "previousState": "RUNNING",
                "taskName": "echo omega"
            }
        ]
    }
}

The two events share many fields. The tables below describe the fields common to both events, followed by those specific to job updates and to task updates.

Common Fields

Field | Required | Description
schema | Yes | Self-describing event wrapper
data | Yes | Self-describing event wrapper
data.jobName | Yes | The name of the job, as it appears in the Factfile
data.jobReference | Yes | An ID unique to the Factfile for this job. Tags are included when calculating job IDs, so if you’re using user-defined tags, jobs with the same Factfile but different tags will have different job IDs.
data.runReference | Yes | A globally unique ID for this run
data.tags | Yes | An object representing any user-defined tags for the running job
data.factfile | Yes | A base64-encoded copy of the Factfile that’s running
data.applicationContext.version | Yes | The version of Factotum that’s executing the job
data.runState | Yes | The current state of the job: WAITING, RUNNING, SUCCEEDED or FAILED
data.startTime | Yes | The time the job started, in ISO 8601 format
data.runDuration | Yes | The running time of the job so far, in ISO 8601 duration format
data.taskStates | Yes | An array of information on the state of each task
data.taskStates[_].taskName | Yes | The name of the task, as it appears in the Factfile
data.taskStates[_].state | Yes | The current state of the task: WAITING, RUNNING, SUCCEEDED, SUCCEEDED_NO_OP, FAILED or SKIPPED
data.taskStates[_].started | No | The start time of the task, in ISO 8601 format
data.taskStates[_].duration | No | The duration of the task, in ISO 8601 duration format
data.taskStates[_].stdout | No | The output of the task to stdout
data.taskStates[_].stderr | No | The output of the task to stderr
data.taskStates[_].returnCode | No | The return code of the task
data.taskStates[_].errorMessage | No | The reason the task failed or was skipped
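Two of these fields need a little decoding before they’re useful. data.factfile is base64-encoded JSON, and data.runDuration (like the task durations) is an ISO 8601 duration such as PT2.144857905S. The Python sketch below decodes both and builds a per-task summary from data.taskStates. The function names are hypothetical. Also, duration_seconds only handles the hours/minutes/seconds form seen in the examples above, not days, weeks or other ISO 8601 variants.

```python
import base64
import json
import re

# Only the time part of an ISO 8601 duration, e.g. "PT2.144857905S" or "PT1M30S".
_DURATION = re.compile(r"PT(?:(\d+)H)?(?:(\d+)M)?(?:(\d+(?:\.\d+)?)S)?")


def decode_factfile(event):
    """Return the Factfile (as a dict) carried base64-encoded in data.factfile."""
    return json.loads(base64.b64decode(event["data"]["factfile"]))


def duration_seconds(value):
    """Convert a PT..H..M..S duration string to seconds as a float."""
    match = _DURATION.fullmatch(value)
    if match is None:
        raise ValueError(f"unsupported duration: {value!r}")
    hours, minutes, seconds = match.groups()
    return int(hours or 0) * 3600 + int(minutes or 0) * 60 + float(seconds or 0)


def task_summary(event):
    """Map task name -> (state, returnCode); returnCode is optional, so it may be None."""
    return {
        task["taskName"]: (task["state"], task.get("returnCode"))
        for task in event["data"]["taskStates"]
    }
```

Applied to the job update above, decode_factfile recovers the three-task “echo order demo” Factfile. This is handy for auditing exactly what ran, even if the file on disk has since changed.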

Job transitions

job_update events have the following extra fields that provide information about the current state of the job.

Field | Required | Description
data.jobTransition | Yes | An object explaining why this event was emitted
data.jobTransition.previousState | Yes | The state of the job before the change: WAITING, RUNNING, SUCCEEDED, FAILED, or null (if the job has just started)
data.jobTransition.currentState | Yes | The state of the job after the change: WAITING, RUNNING, SUCCEEDED or FAILED
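Because previousState is null when a job has just started, code consuming job_update events needs to handle that case separately. The following Python sketch turns a job_update event into a log line and reports whether the job has reached a final state. The describe_job_transition and is_finished names are hypothetical. The sketch also assumes that SUCCEEDED and FAILED are the only final states in the list above.

```python
FINAL_STATES = {"SUCCEEDED", "FAILED"}  # assumed: no transitions out of these


def describe_job_transition(event):
    """Render a job_update event's jobTransition as a one-line log message."""
    data = event["data"]
    previous = data["jobTransition"]["previousState"]  # None (JSON null) on job start
    current = data["jobTransition"]["currentState"]
    if previous is None:
        return f"{data['jobName']}: started (now {current})"
    message = f"{data['jobName']}: {previous} -> {current}"
    if current in FINAL_STATES:
        message += f" after {data['runDuration']}"
    return message


def is_finished(event):
    """True once the job has succeeded or failed."""
    return event["data"]["jobTransition"]["currentState"] in FINAL_STATES
```

For the job update example above, this yields “echo order demo: RUNNING -> SUCCEEDED after PT2.144857905S”.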

Task transitions

task_update events have the following extra fields that provide information about the changes in state of tasks.

Field | Required | Description
data.taskTransitions | Yes | An array of task-level changes in execution; each element represents the change in state of a single task
data.taskTransitions[_].taskName | Yes | The name of the task that changed state, as it appears in the Factfile
data.taskTransitions[_].previousState | Yes | The state the task was previously in: RUNNING, WAITING, SUCCEEDED, SUCCEEDED_NO_OP, FAILED or SKIPPED
data.taskTransitions[_].currentState | Yes | The state the task is now in: RUNNING, WAITING, SUCCEEDED, SUCCEEDED_NO_OP, FAILED or SKIPPED. More information on the task’s current state may be available in the taskStates field.
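taskTransitions says which tasks changed, while taskStates carries the detail, so a consumer will typically join the two. This Python sketch picks out the tasks that have just moved to FAILED and attaches whatever optional returnCode, errorMessage and stderr the event provides, for example to feed an alert. The failed_task_reports name is hypothetical.

```python
def failed_task_reports(event):
    """List the tasks that transitioned to FAILED in a task_update event, with details."""
    data = event["data"]
    states = {state["taskName"]: state for state in data["taskStates"]}
    reports = []
    for transition in data["taskTransitions"]:
        if transition["currentState"] != "FAILED":
            continue
        # returnCode, errorMessage and stderr are all optional, so they may be None.
        state = states.get(transition["taskName"], {})
        reports.append({
            "taskName": transition["taskName"],
            "returnCode": state.get("returnCode"),
            "errorMessage": state.get("errorMessage"),
            "stderr": state.get("stderr"),
        })
    return reports
```

For the task update example above, where echo omega moved from RUNNING to SUCCEEDED, this returns an empty list.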

Tags

Tags are a way to add custom metadata to your job runs. You can add any set of key-value pairs to your jobs; when using webhooks, tags will:

  1. Appear in all webhook events under the tags field
  2. Be used in addition to the Factfile itself to calculate the job reference
    • This means that the same Factfile can generate two or more different job references if required

In both events, custom tags look like this:

"tags": {
    "foo": "bar",
    "foo2": "bar2"
}

You can add tags to your job with the --tag argument. To add the foo tag with the value bar:

factotum run samples/echo.factotum --webhook http://localhost --tag "foo,bar"

Multiple tags can be added by repeating the argument:

factotum run samples/echo.factotum --webhook http://localhost --tag "foo,bar" --tag "foo2,bar2"
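A short Python sketch can illustrate how tags affect the job reference. This is not Factotum’s algorithm, which isn’t described here. Instead, the hypothetical illustrative_job_reference function simply hashes the Factfile text together with the sorted tags using SHA-256. Likewise, parse_tag assumes that a "key,value" argument is split on its first comma. The sketch only demonstrates the property described above: the same Factfile with different tags yields a different reference.

```python
import hashlib


def parse_tag(argument):
    """Split a --tag "key,value" argument on its first comma (assumed behavior)."""
    key, separator, value = argument.partition(",")
    if not separator or not key:
        raise ValueError(f"expected key,value but got {argument!r}")
    return key, value


def illustrative_job_reference(factfile_text, tags):
    """Hypothetical scheme (NOT Factotum's): SHA-256 over the Factfile plus sorted tags.

    Sorting makes this sketch independent of the order the tags were given in;
    the NUL separators keep ("ab", "c") and ("a", "bc") from hashing the same.
    """
    digest = hashlib.sha256(factfile_text.encode("utf-8"))
    for key, value in sorted(tags.items()):
        digest.update(f"\x00{key}\x00{value}".encode("utf-8"))
    return digest.hexdigest()
```

With the two --tag arguments above, parse_tag produces {"foo": "bar", "foo2": "bar2"}, which matches the tags object in the webhook examples.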

3. Behaviors on task failure

Fail fast vs continue as far as possible

In previous releases of Factotum, when a task failed, Factotum would stop processing your job as soon as possible. We call this behavior “failing fast”; it’s also the default behavior of Make (without the --keep-going flag). Failing fast is simple, but it often means that many tasks which could have run never do. It’s also difficult to reason about, because the final state of the DAG depends not just on which tasks failed, but on how long different tasks ran for.

That’s why, as of this release, we’re switching to a different model. Factotum will now “keep going” and complete as many tasks as possible, skipping only the tasks that depend on a failed task.

Here are a few diagrams illustrating the difference in behavior. In each pair, the previous version(s) of Factotum are on the left and version 0.3.0+ is on the right:


Figure: a simple DAG under Factotum 0.2.0 (left) and Factotum 0.3.0+ (right); no change in behavior

In trivial DAGs (as shown above) the behavior between this version of Factotum and prior versions is the same.


Figure: a DAG with multiple dependency trees under Factotum 0.2.0 (left) and Factotum 0.3.0+ (right)

In DAGs with multiple dependency trees, prior versions of Factotum would stop as soon as possible (left), whereas this version completes as much as possible (right).


Figure: a complex DAG with parallel streams of execution under Factotum 0.2.0 (left) and Factotum 0.3.0+ (right)

When DAGs split into parallel streams of execution, any sub-task that eventually depends on a failed task will now be skipped (right), compared to terminating at the first failure (prior versions, left).
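The new rule can be stated precisely: a task is skipped exactly when it depends, directly or transitively, on a failed task, and every other task still runs. The Python sketch below computes that set using a breadth-first walk over the reversed dependency edges. It models the behavior described above rather than reproducing Factotum’s code, and tasks_to_skip is a hypothetical name.

```python
from collections import deque


def tasks_to_skip(depends_on, failed):
    """Return every task that directly or transitively depends on a failed task.

    `depends_on` maps each task name to the names it depends on (its dependsOn list).
    """
    dependents = {name: [] for name in depends_on}
    for name, deps in depends_on.items():
        for dep in deps:
            dependents.setdefault(dep, []).append(name)

    skipped = set()
    queue = deque(failed)
    while queue:
        for child in dependents.get(queue.popleft(), []):
            if child not in skipped:
                skipped.add(child)
                queue.append(child)
    return skipped
```

For two independent chains a -> b -> c and x -> y, a failure in b skips only c. Under the old fail-fast model, x and y might not have run at all.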


Terminating early

A task that requests early termination of the DAG has always been handled in the same way as a failure (except that it isn’t a failure!).

To keep things straightforward, in version 0.3.0 we’ve also changed how these “no operations” work, to match the new way of handling task failures described above. A no-op still “skips” the tasks that depend on it, without generating an error.
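Whether a task continues the job, terminates it early or fails is determined by its return code and the onResult block of its Factfile entry. The Factfile embedded in the webhook examples above maps 0 to continueJob and 3 to terminateJobWithSuccess. The Python sketch below shows one reading of those rules, under stated assumptions:

- a code listed under continueJob gives SUCCEEDED;
- a code listed under terminateJobWithSuccess gives SUCCEEDED_NO_OP;
- any other code gives FAILED;
- the job as a whole fails only if some task failed.

Both FAILED and SUCCEEDED_NO_OP cause dependent tasks to be skipped, but only FAILED counts as an error. The function names are hypothetical.

```python
# Task states after which dependent tasks are skipped (per the 0.3.0 behavior above).
STOPS_DEPENDENTS = {"FAILED", "SUCCEEDED_NO_OP"}


def classify_return_code(on_result, return_code):
    """Map a task's return code to a task state using its onResult block (assumed rules)."""
    if return_code in on_result.get("continueJob", []):
        return "SUCCEEDED"
    if return_code in on_result.get("terminateJobWithSuccess", []):
        return "SUCCEEDED_NO_OP"
    return "FAILED"


def job_outcome(task_states):
    """A no-op ends its branch without an error, so only FAILED tasks fail the job."""
    return "FAILED" if "FAILED" in task_states.values() else "SUCCEEDED"
```

So if echo alpha exited with 3, echo beta and echo omega would be skipped, and the job would still finish as SUCCEEDED.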

4. Extras

macOS support

Factotum 0.3.0 now ships a macOS build. See section 5, “Downloading and running Factotum”, for how to get a copy!

Turning off terminal colours with --no-colour

Colours aren’t for everyone, and they can be distracting if you’re piping output to a file (or to another destination that doesn’t understand colour codes). In version 0.2.0 we introduced support for the CLICOLOR environment variable (as described here). In this release we’re complementing that with a --no-colour command-line argument, which forces ANSI terminal colours off.

Eating our own dog food

Factotum is now released using Factotum! This means you can see a real example of using a Factfile here, including an example of “terminating early” and how it applies to builds.

5. Downloading and running Factotum

Factotum is now available for macOS and Linux (x86_64).

If you’re running Linux:

wget https://bintray.com/artifact/download/snowplow/snowplow-generic/factotum_0.3.0_linux_x86_64.zip
unzip factotum_0.3.0_linux_x86_64.zip
wget https://raw.githubusercontent.com/snowplow/factotum/master/samples/echo.factotum

If you’re running macOS:

wget https://bintray.com/artifact/download/snowplow/snowplow-generic/factotum_0.3.0_darwin_x86_64.zip
unzip factotum_0.3.0_darwin_x86_64.zip
wget https://raw.githubusercontent.com/snowplow/factotum/master/samples/echo.factotum

This series of commands will download the 0.3.0 release, unzip it in your current working directory and download a sample job for you to run. You can then run Factotum in the following way:

./factotum run ./echo.factotum

6. Roadmap for Factotum

We’re taking an iterative approach with Factotum - today Factotum won’t give you an entire stack for monitoring, scheduling and running data pipelines, but we plan on growing it into a set of tools that will.

Factotum will continue to be our “job executor”, but a more complete ecosystem will be developed around it - ideas include an optional scheduler, audit logging, user authentication, Mesos support and more. If you have specific features to suggest, please add a ticket to the GitHub repo.

7. Contributing

Factotum is completely open source - and has been from the start! If you’d like to get involved, or just try your hand at Rust, please check out the repository.

Thoughts or questions? Come join us in our Discourse forum!

Ed Lewis

Ed is a data engineer at Snowplow. You can find him on GitHub.