Snowplow release 73 Cuban Macaw is now generally available! This release adds the ability to automatically load bad rows from the Snowplow Elastic MapReduce jobflow into Elasticsearch for analysis, and formally separates the Snowplow enriched event format from the TSV format used to load Redshift.
The rest of this post will cover the following topics:
- Loading bad rows into Elasticsearch
- Changes to the event format loaded into Redshift and Postgres
- Improved Hadoop job performance
- Better NAT traversal for the StorageLoader
- Upgrading
- Getting help
1. Loading bad rows into Elasticsearch
This release brings to our batch pipeline a feature previously available only in our Kinesis pipeline: the ability to load your Snowplow bad rows from Amazon S3 into Elasticsearch for analysis.
This functionality is hugely helpful for diagnosing the causes of incoming events failing JSON Schema validation and investigating enrichment processing errors. We have tested this feature with Elasticsearch running natively on EC2, as well as with the great new Amazon Elasticsearch Service.
Here we are exploring bad rows for an internal Snowplow pipeline in Kibana:
If you need help setting up an Elasticsearch cluster for Snowplow, check out our Amazon Elasticsearch Service setup guide on our wiki.
To enable this in Snowplow, add an Elasticsearch target to your EmrEtlRunner configuration file:
Note that the “database” and “table” fields actually contain the index and type respectively where bad rows will be stored.
The “sources” field is an array of buckets from which to load bad rows. If you leave this field blank, then the bad rows buckets created by the current run of the EmrEtlRunner will be loaded. Alternatively you can explicitly specify an array of bad row buckets to load.
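To make the field semantics concrete, here is a minimal Python sketch of how a target could be interpreted. It is not the EmrEtlRunner code; the function names, the dictionary layout and the example bucket paths are all hypothetical, and only the "database", "table" and "sources" field names come from the description above.

```python
from typing import Dict, List, Optional, Tuple


def resolve_sources(target: Dict, current_run_bad_buckets: List[str]) -> List[str]:
    """Pick the buckets to load for one Elasticsearch target.

    A blank "sources" field falls back to the bad rows buckets
    created by the current run; otherwise the explicit list wins.
    """
    explicit: Optional[List[str]] = target.get("sources")
    if explicit:
        return list(explicit)
    return list(current_run_bad_buckets)


def index_and_type(target: Dict) -> Tuple[str, str]:
    """"database" holds the Elasticsearch index, "table" holds the type."""
    return target["database"], target["table"]


# Hypothetical values, for illustration only
target = {"database": "snowplow", "table": "bad_rows", "sources": None}
this_run = ["s3://example-bucket/enriched/bad/current-run"]

print(resolve_sources(target, this_run))
print(index_and_type(target))
```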
Note these updates to EmrEtlRunner’s command-line arguments:
- You can skip loading data into Elasticsearch by running EmrEtlRunner with the
- To run just the Elasticsearch load without any other EmrEtlRunner steps, explicitly skip all other steps using
- Note that running EmrEtlRunner with --skip enrich,shred will no longer skip the EMR job, since there is still the Elasticsearch step to run
Under the hood, our Hadoop Elasticsearch Sink uses the scalding-taps library, which itself uses elasticsearch-hadoop. For each Elasticsearch target, and for each S3 bucket source configured for that target, a separate step will be added to the jobflow to copy that source to that target. This means that if the job fails, you can tell by inspecting the job in the UI which of those copies has succeeded and which still needs to happen.
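The "one step per target and source" rule can be sketched as a simple nested loop. This is an illustration in Python, not the actual jobflow-building code; the target names and bucket paths are made up.

```python
from typing import Dict, List, Tuple


def plan_copy_steps(targets: List[Dict]) -> List[Tuple[str, str]]:
    """Return one (source bucket, target name) pair per jobflow step."""
    steps = []
    for target in targets:
        for source in target["sources"]:
            # Each pair becomes its own step, so a failure is
            # attributable to exactly one source/target copy.
            steps.append((source, target["name"]))
    return steps


# Hypothetical targets, for illustration only
targets = [
    {"name": "es-primary",
     "sources": ["s3://example-bucket/enriched/bad",
                 "s3://example-bucket/shredded/bad"]},
    {"name": "es-secondary",
     "sources": ["s3://example-bucket/enriched/bad"]},
]

for source, name in plan_copy_steps(targets):
    print(f"copy {source} -> {name}")
```

With two sources on the first target and one on the second, this plans three separate steps.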
2. Changes to the event format loaded into Redshift and Postgres
In this release we have removed the direct dependency of the StorageLoader on the Snowplow enriched event format. Instead:
- Scala Hadoop Shred now copies the enriched events from the enriched/good bucket to the
- As part of the copy, Scala Hadoop Shred removes the derived_contexts columns – i.e. the three columns containing the self-describing JSONs which have just been shredded
- The StorageLoader then populates atomic.events using the JSON-less version of the TSV in
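The column removal in the second step can be pictured with a small Python sketch. This is not the Scala Hadoop Shred implementation: the three-column header below is a made-up stand-in for the real enriched event layout, and the columns to drop are passed in as a parameter rather than hard-coded.

```python
from typing import List, Set


def drop_columns(tsv_line: str, header: List[str], to_drop: Set[str]) -> str:
    """Remove the named columns from one TSV line."""
    fields = tsv_line.rstrip("\n").split("\t")
    if len(fields) != len(header):
        raise ValueError("line does not match the expected column count")
    kept = [value for name, value in zip(header, fields) if name not in to_drop]
    return "\t".join(kept)


# Hypothetical, heavily shortened layout for illustration only
header = ["event_id", "app_id", "derived_contexts"]
line = 'e1\tweb\t{"schema":"iglu:example"}'

print(drop_columns(line, header, {"derived_contexts"}))
```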
The short-term reason for this change was to remove the JSON columns from atomic.events because they are very difficult to query, while also taking up significant disk space. Looking to the longer term, this separation is a key first step in our eventual migration of the Snowplow enriched event format from a TSV/JSON hybrid to Apache Avro.
As part of this change, the truncation logic used to ensure that each field of the TSV is small enough to fit into the corresponding column in Postgres has been moved from Scala Common Enrich to Scala Hadoop Shred. As a direct result, the JSONs stored in the derived_contexts columns can now be arbitrarily long.
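As a rough picture of what per-field truncation means, here is a simplified Python sketch. It assumes character-based limits and invented column widths; the real logic lives in Scala Hadoop Shred and may well differ in detail (for instance in how multi-byte characters are handled).

```python
from typing import List, Optional


def truncate_fields(fields: List[str], max_lengths: List[Optional[int]]) -> List[str]:
    """Cut each field down to its column's limit; None means no limit."""
    if len(fields) != len(max_lengths):
        raise ValueError("one limit is required per field")
    return [
        value if limit is None else value[:limit]
        for value, limit in zip(fields, max_lengths)
    ]


# Hypothetical limits, for illustration only
print(truncate_fields(["abcdef", "xyz"], [3, None]))
```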
3. Improved Hadoop job performance
We have sped up the Enrich and Shred jobs by caching intermediate results within HDFS using forceToDisk. This prevents events from being processed twice (once for the enriched events path and once for the validation failures path).
As well as reducing job time, this change should also significantly reduce the number of requests made to external APIs.
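The effect of materializing the intermediate results can be shown with a toy Python analogy. This is not Scalding code and does not call forceToDisk; it only illustrates why computing the enriched results once and reusing them for both output paths halves the number of enrichment calls. All names here are hypothetical.

```python
from typing import Callable, List, Tuple

Result = Tuple[bool, str]  # (is_valid, payload)


class CountingEnricher:
    """Toy enrichment that counts how often it is invoked."""

    def __init__(self) -> None:
        self.calls = 0

    def __call__(self, event: str) -> Result:
        self.calls += 1
        return (event.isdigit(), event)  # toy validity rule


def split_outputs(events: List[str],
                  enrich: Callable[[str], Result],
                  materialize: bool) -> Tuple[List[str], List[str]]:
    """Produce the good and bad outputs from the same enrichment results."""
    if materialize:
        cached = [enrich(e) for e in events]  # computed once, reused twice

        def results() -> List[Result]:
            return cached
    else:
        def results() -> List[Result]:
            return [enrich(e) for e in events]  # recomputed per output path

    good = [payload for ok, payload in results() if ok]
    bad = [payload for ok, payload in results() if not ok]
    return good, bad


cached_run, uncached_run = CountingEnricher(), CountingEnricher()
split_outputs(["1", "x", "2"], cached_run, materialize=True)
split_outputs(["1", "x", "2"], uncached_run, materialize=False)
print(cached_run.calls, uncached_run.calls)
```

If the enrichment step calls an external API, the uncached variant would issue every request twice, which is the behavior the caching avoids.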
4. Better NAT traversal for the StorageLoader
If you have attempted to load Postgres or Redshift from an instance of StorageLoader running behind a NAT (e.g. in a private subnet), you may well have seen the COPY transaction time out.
We have now updated the StorageLoader’s JDBC connection to use tcpKeepAlive=true for long-running COPYs via NAT.
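For readers unfamiliar with TCP keepalive, the Python sketch below shows the underlying socket option and one way the setting can appear in a PostgreSQL JDBC URL. It is an illustration only: the StorageLoader itself is not written in Python, how it passes the parameter internally is not shown here, and the host, port and database name are placeholders.

```python
import socket


def make_keepalive_socket() -> socket.socket:
    """Create a TCP socket with keepalive probes enabled.

    With SO_KEEPALIVE set, the OS periodically sends probes on an idle
    connection, which helps stop a NAT from silently dropping it.
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
    return sock


def jdbc_url(host: str, port: int, database: str) -> str:
    """Build a PostgreSQL JDBC URL with tcpKeepAlive enabled."""
    return f"jdbc:postgresql://{host}:{port}/{database}?tcpKeepAlive=true"


sock = make_keepalive_socket()
print(sock.getsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE) != 0)
sock.close()

# Placeholder connection details
print(jdbc_url("db.example.com", 5439, "snowplow"))
```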
5. Upgrading
Upgrading EmrEtlRunner and StorageLoader
The latest versions of the EmrEtlRunner and StorageLoader are available from our Bintray here.
You will need to update the jar versions in the “emr” section of your configuration YAML:
In order to start loading bad rows from the EMR jobflow into Elasticsearch, you will need to add an Elasticsearch target to the “targets” section of your configuration YAML as described above.
If you are using Postgres rather than Redshift, you should no longer pass the --skip shred option to EmrEtlRunner. This is because the shred step now removes JSON fields from the enriched event TSV.
Updating your database
Use the appropriate migration script to update your version of the atomic.events table to the latest schema:
If you are upgrading to this release from an older version of Snowplow, we also provide migration scripts to atomic.events version 0.8.0 from versions 0.4.0, 0.5.0 and 0.6.0.
Warning: these migration scripts will alter your atomic.events table in-place, deleting the derived_contexts columns. We recommend that you make a full backup before running these scripts.
6. Getting help
For more details on this release, please check out the R73 Cuban Macaw release notes on GitHub.