We are pleased to announce the release of Snowplow 88 Angkor Wat. This release introduces event de-duplication across different pipeline runs, powered by DynamoDB, along with an important refactoring of the batch pipeline configuration.
Read on for more information on R88 Angkor Wat, named after the largest religious monument in the world:
1. New storage targets configuration
Historically, storage targets for the Snowplow batch pipeline have been configured from a shared set of properties in the EmrEtlRunner and StorageLoader config.yml YAML file.
Using the same YAML properties to configure very different databases, such as Redshift and Elasticsearch, has been difficult and error-prone, especially for new Snowplow users. Continuing to overload these YAML properties as we add additional databases such as Google BigQuery, Snowflake and Azure SQL Data Warehouse is unsustainable.
As of this release, storage targets for the Snowplow batch pipeline are configured through database-specific self-describing JSONs - the same way that our enrichments are configured. This should reduce the scope for errors - not least because Snowplow will validate that these configuration JSONs are correct and complete.
This change means that the older config.yml format is no longer valid: both EmrEtlRunner and StorageLoader now need to be given a --targets option specifying the directory that holds the storage configuration JSONs, plus the --resolver option used to validate these JSONs.
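To make the shape of a targets directory concrete, here is a minimal Python sketch of how a tool might read one. The function names are hypothetical and the sketch only checks the outer self-describing envelope (an Iglu "schema" URI plus a "data" object); it is not Snowplow's own loader, which validates each JSON against its full schema through the Iglu resolver.

```python
import json
from pathlib import Path


def envelope_problems(doc):
    """Return a list of problems with a self-describing JSON envelope (empty if OK).

    Only the outer {"schema": ..., "data": ...} shape is checked here; checking
    the contents against the JSON Schema is what the --resolver option is for.
    """
    if not isinstance(doc, dict):
        return ["top level is not a JSON object"]
    problems = []
    if set(doc) != {"schema", "data"}:
        problems.append("expected exactly the keys 'schema' and 'data'")
    if not str(doc.get("schema", "")).startswith("iglu:"):
        problems.append("'schema' is not an iglu: URI")
    if not isinstance(doc.get("data"), dict):
        problems.append("'data' is not a JSON object")
    return problems


def load_target_configs(targets_dir):
    """Read every *.json file in targets_dir, failing fast on a malformed one."""
    configs = {}
    for path in sorted(Path(targets_dir).glob("*.json")):
        doc = json.loads(path.read_text())
        problems = envelope_problems(doc)
        if problems:
            raise ValueError(f"{path.name}: {'; '.join(problems)}")
        configs[path.name] = doc
    return configs
```

Failing on the first malformed file mirrors the intent described above: a broken target should stop the run before any data is loaded, rather than surface halfway through.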
2. Cross-batch natural deduplication
2.1 The deduplication story so far
As a first step towards solving the duplicate events problem, in R76 Changeable Hawk Eagle we implemented in-batch natural deduplication, which removed duplicates originating from the at-least-once delivery semantics of the Snowplow pipeline. Next, in R86 Petra, we introduced synthetic in-batch deduplication, which once again drastically reduced the number of duplicates in our users’ clusters and removed them completely from each individual load, but still left duplicates across separate loads.
2.2 Cross-batch deduplication using DynamoDB
Today we’re going further, introducing a new cross-batch deduplication process that removes natural duplicates across many loads, eliminating the duplicates problem for many users.
To solve this problem across pipeline runs, we’re using Amazon DynamoDB, which lets us keep track of the events we have processed across multiple runs. Essentially, we maintain an “event manifest” in DynamoDB containing just a few key pieces of information about each event:
- Event id - used to identify the event
- Event fingerprint - used in conjunction with the event id to identify natural duplicates
- ETL timestamp - used to check whether a previous Hadoop Shred run was aborted and the event is being reprocessed
- Time to live - timestamp allowing DynamoDB to automatically clean up stale objects (set to etl_tstamp plus 180 days)
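As a hedged illustration, the manifest record for a single event could be assembled like this. The attribute names (eventId, fingerprint, etlTime, ttl) and the helper name are assumptions chosen for readability, not necessarily what Hadoop Shred writes; the 180-day expiry follows the list above, and DynamoDB's TTL feature does expect the expiry as a Number holding Unix epoch seconds.

```python
from datetime import datetime, timedelta, timezone

# Retention period stated above: etl_tstamp plus 180 days.
MANIFEST_RETENTION = timedelta(days=180)


def manifest_item(event_id, fingerprint, etl_tstamp):
    """Build one event-manifest record (hypothetical helper, illustrative names)."""
    expires = etl_tstamp + MANIFEST_RETENTION
    return {
        "eventId": event_id,
        "fingerprint": fingerprint,
        "etlTime": etl_tstamp.isoformat(),
        # DynamoDB TTL reads a Number attribute holding Unix epoch seconds.
        "ttl": int(expires.timestamp()),
    }


if __name__ == "__main__":
    etl = datetime(2017, 4, 1, tzinfo=timezone.utc)
    print(manifest_item("example-event-id", "example-fingerprint", etl))
```

Keeping the record this small matters for cost: each write of an item up to 1 KB consumes a single write capacity unit.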
The mechanics of the manifest set-and-check are relatively simple - you can find more technical information on the dedicated wiki page.
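The set-and-check rule can be sketched in a few lines of Python. This is a toy, in-memory model for illustration only: the class and method names are hypothetical, and the logic is our reading of the fields described above (a key of event id plus fingerprint, with the ETL timestamp used to recognise a reprocessed run) rather than a copy of Hadoop Shred's code. Against DynamoDB, the same effect would come from a single conditional write, which keeps the check and the set atomic when many workers write in parallel.

```python
class InMemoryManifest:
    """Toy stand-in for the DynamoDB event manifest (hypothetical class).

    Keys are (event_id, fingerprint) pairs; values are the etl_tstamp of the
    run that first stored the event.
    """

    def __init__(self):
        self._items = {}

    def check_and_set(self, event_id, fingerprint, etl_tstamp):
        """Return True if the event should be kept, False if it is a duplicate.

        Mirrors a conditional write: succeed if the key is new, or if it was
        stored by this same ETL run (an aborted run being reprocessed).
        """
        key = (event_id, fingerprint)
        stored = self._items.get(key)
        if stored is None or stored == etl_tstamp:
            self._items[key] = etl_tstamp
            return True
        return False
```

For example, the first call for a given event id and fingerprint returns True; repeating it with the same ETL timestamp (a rerun after an abort) also returns True; the same pair arriving with a later ETL timestamp returns False, marking a natural duplicate of an event already loaded by an earlier batch.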
It’s important to note that, unlike the previous in-batch deduplication logic, this new functionality needs to be explicitly enabled.
2.3 How to enable the new deduplication process
To start deduplicating events across batches, you need to provide EmrEtlRunner with a duplicate storage configuration via the new --targets option.
Here is an example configuration:
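A sketch of such a configuration, built as a Python dict and serialized to the JSON file you would place in your targets directory, might look like this. The schema URI and field names are assumptions based on the amazon_dynamodb_config storage schema and should be checked against Iglu Central before use; the values in capitals, the region and the table name are placeholders.

```python
import json

# Hypothetical DynamoDB duplicate-storage target. Schema URI and field names
# are assumptions; verify them against the schema in Iglu Central.
duplicate_storage_config = {
    "schema": "iglu:com.snowplowanalytics.snowplow.storage/amazon_dynamodb_config/jsonschema/1-0-0",
    "data": {
        "name": "AWS DynamoDB duplicates storage",
        "accessKeyId": "YOUR_AWS_ACCESS_KEY_ID",
        "secretAccessKey": "YOUR_AWS_SECRET_ACCESS_KEY",
        "awsRegion": "us-east-1",
        "dynamodbTable": "snowplow-event-manifest",
        "purpose": "DUPLICATE_TRACKING",
    },
}

if __name__ == "__main__":
    # Print the JSON so it can be saved into the directory passed via --targets.
    print(json.dumps(duplicate_storage_config, indent=2))
```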
If you don’t add a duplicate storage configuration, your Hadoop Shred job will continue to work as before. Cross-batch deduplication is completely optional - you may decide that the cost-benefit calculus for enabling cross-batch deduplication is not right for you.
2.4 Cost impact of running this process
This process introduces two additional costs to running your Snowplow batch pipeline:
- Increased EMR jobflow times, which has associated financial costs in terms of Normalized Instance Hours
- Additional AWS costs associated with the DynamoDB table used for the event manifest
The EMR jobflow time increases because the Hadoop Shred job processes many events in parallel, while DynamoDB relies on a mechanism called provisioned throughput to cap reads and writes. Provisioned throughput throttles the jobflow when DynamoDB writes reach the specified capacity units. Throttling means that, no matter how powerful your EMR cluster, the job will proceed only as fast as the throttled writes can make it through to the DynamoDB table.
The default write capacity we use for storing the event manifest is 100 units, which costs roughly 50 USD per month. If this slows down your job considerably, you can increase the DynamoDB write capacity to, for example, 500 units, but this raises DynamoDB costs to roughly 250 USD per month.
There’s no golden rule for calculating write capacity and cluster configuration - you should experiment with different options to find the best cost-performance profile for your event volumes. The DynamoDB monitoring UI in AWS is helpful here because it shows you the level of throttling on your DynamoDB writes.
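To get a first feel for the trade-off before experimenting, here is a rough Python sketch. It assumes one manifest write per event, uses DynamoDB's rule that one write capacity unit covers one write per second for an item of up to 1 KB, and takes the price per unit from the figures above (100 units for roughly 50 USD per month). Burst capacity, retries and read units are ignored, so treat the results as lower bounds on write time rather than predictions; the function name is hypothetical.

```python
import math

# Rate implied by the figures above (100 WCU ~ 50 USD/month); an assumption
# derived from this post, not taken from the AWS price list.
USD_PER_WCU_MONTH = 50 / 100


def manifest_tradeoff(events_per_run, capacities, item_kb=1.0):
    """For each write capacity, return (wcu, min write hours per run, USD/month).

    Assumes one manifest write per event and that one write capacity unit
    covers one write per second for an item of up to 1 KB (larger items use
    one unit per started KB). Burst capacity, retries and reads are ignored.
    """
    units_per_write = max(1, math.ceil(item_kb))
    rows = []
    for wcu in capacities:
        seconds = events_per_run * units_per_write / wcu
        rows.append((wcu, seconds / 3600, wcu * USD_PER_WCU_MONTH))
    return rows


if __name__ == "__main__":
    for wcu, hours, usd in manifest_tradeoff(1_000_000, [100, 500]):
        print(f"{wcu:>4} WCU: >= {hours:.1f} h of manifest writes, ~{usd:.0f} USD/month")
```

For 1,000,000 events per run this prints a floor of about 2.8 hours of throttled manifest writes at 100 units (roughly 50 USD per month) versus about 0.6 hours at 500 units (roughly 250 USD per month). The DynamoDB monitoring UI will show how far real runs deviate from these estimates.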
2.5 Solving the "cold start" problem
The new cross-batch deduplication is powerful, but how do you handle the “cold start” problem where the event manifest table in DynamoDB starts off empty?
To help, we have developed a new Event Manifest Populator Spark job, which lets you pre-load the DynamoDB table from your enriched event archive.
Event Manifest Populator can be started on EMR with a PyInvoke script that we provide. To run it, you’ll need to download the script itself and install
The last step is to run the actual job:
The run_emr task passed to PyInvoke takes three positional arguments:
- $ENRICHED_ARCHIVE_S3_PATH is the path to the enriched events archive in S3, as found at
- $STORAGE_CONFIG_PATH is the duplicate storage configuration JSON
- $IGLU_RESOLVER_PATH is your Iglu resolver JSON configuration
You can also add one extra argument:
- --since, which specifies the timespan of events you want to load into duplicate storage. The date is specified with
Note that Event Manifest Populator works only with events produced by Snowplow R73 Cuban Macaw or later, because the enriched event TSV format changed in that release.
You can find more about the usage of Event Manifest Populator and its interface on its dedicated wiki page.
2.6 What's coming next for deduplication
Firstly, in an upcoming release we will publish a Python script that generates SQL to clean out all historic event duplicates loaded into your Redshift cluster by earlier, pre-deduplication versions of Hadoop Shred.
Looking further ahead, we are interested in extracting our DynamoDB-powered deduplication logic into a standalone library, so that it can be used with the loaders for other storage targets, such as S3/Parquet/Avro or Snowflake, and with our real-time pipeline, where at present all natural duplicates are simply erased.
3. Upgrading
3.1 Upgrading EmrEtlRunner and StorageLoader
The latest versions of EmrEtlRunner and StorageLoader are available from our Bintray here.
3.2 Creating new targets configuration
Storage target configuration JSONs can be generated from your existing config.yml using the 3-enrich/emr-etl-runner/config/convert_targets.rb script. These files should be stored in a folder, for example called targets, alongside your existing
When complete, your folder layout will look something like this:
3.3 Updating config.yml
- Remove the whole storage.targets section (leaving storage.download.folder) from your config.yml
- Update the hadoop_shred job version in your configuration YAML like so:
3.4 Update EmrEtlRunner and StorageLoader scripts
- Append the option --targets $TARGETS_DIR to both the snowplow-emr-etl-runner and snowplow-storage-loader applications
- Append the --resolver option, pointing to your Iglu resolver JSON, to the snowplow-storage-loader application. This is required to validate the storage target configurations
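Putting these steps together, the invocations end up shaped roughly like the ones this Python sketch prints. The file paths are hypothetical placeholders, --config and --enrichments stand in for whatever options you already pass, and the sketch only builds the argument lists; it does not run either application (you could hand the lists to subprocess.run once they match your setup).

```python
import shlex

# Hypothetical paths; substitute your own files and directories.
CONFIG = "config/config.yml"
RESOLVER = "config/iglu_resolver.json"
TARGETS_DIR = "config/targets"


def emr_etl_runner_args(extra=()):
    """Command line for EmrEtlRunner with the new --targets option appended."""
    return ["./snowplow-emr-etl-runner", "--config", CONFIG,
            "--resolver", RESOLVER, *extra, "--targets", TARGETS_DIR]


def storage_loader_args(extra=()):
    """Command line for StorageLoader with --resolver and --targets appended."""
    return ["./snowplow-storage-loader", "--config", CONFIG, *extra,
            "--resolver", RESOLVER, "--targets", TARGETS_DIR]


if __name__ == "__main__":
    print(shlex.join(emr_etl_runner_args(["--enrichments", "config/enrichments"])))
    print(shlex.join(storage_loader_args()))
```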
3.5 Enabling cross-batch deduplication
Please be aware that enabling this can have a significant cost and performance impact on your Snowplow batch pipeline.
If you want to start deduplicating events across batches, you need to add a new dynamodb_config target to your newly created targets folder.
Optionally, before the first run of the Hadoop Shred job with cross-batch deduplication enabled, you may want to run Event Manifest Populator to back-fill the DynamoDB table.
When Hadoop Shred runs, if the table doesn’t exist it will be created automatically, with the schema required to store and deduplicate events and with provisioned throughput set by default to 100 write capacity units and 100 read capacity units.
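For readers who want to see, or pre-create, what such a table looks like, here is a hedged Python sketch of equivalent parameters for boto3's DynamoDB create_table call. The default capacities match the values above; the helper name, the table name and the key attribute names (eventId as hash key, fingerprint as range key) are our assumptions, so inspect the table Hadoop Shred actually creates before relying on them.

```python
def manifest_table_params(table_name, read_units=100, write_units=100):
    """Keyword arguments for boto3's DynamoDB client create_table call.

    The key layout (eventId hash key, fingerprint range key) is an assumption
    for illustration; Hadoop Shred creates the real table automatically.
    """
    return {
        "TableName": table_name,
        "KeySchema": [
            {"AttributeName": "eventId", "KeyType": "HASH"},
            {"AttributeName": "fingerprint", "KeyType": "RANGE"},
        ],
        # Only key attributes are declared up front; DynamoDB is schemaless
        # for all other attributes such as the ETL timestamp and TTL.
        "AttributeDefinitions": [
            {"AttributeName": "eventId", "AttributeType": "S"},
            {"AttributeName": "fingerprint", "AttributeType": "S"},
        ],
        "ProvisionedThroughput": {
            "ReadCapacityUnits": read_units,
            "WriteCapacityUnits": write_units,
        },
    }
```

With boto3 installed and AWS credentials configured, boto3.client("dynamodb").create_table(**manifest_table_params("my-manifest")) would create a comparable table. For an existing table, the client's update_table call with a new ProvisionedThroughput is the usual way to adjust write capacity.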
For relatively low volumes (around 1 million events per run), the default settings will likely “just work”. However, we strongly recommend closely monitoring the EMR job and its AWS billing impact, and tweaking the DynamoDB provisioned throughput and your EMR cluster specification accordingly.
4. Roadmap
Upcoming Snowplow releases include:
- R89 Plain of Jars, which will port our Hadoop Enrich and Hadoop Shred jobs from Scalding to Apache Spark
- R9x [HAD] 4 webhooks, which will add support for 4 new webhooks (Mailgun, Olark, Unbounce, StatusGator)
- R9x [HAD] GCP support pt. 1, the first phase of our support for running Snowplow real-time on Google Cloud Platform
- R9x [HAD] EmrEtlRunner robustness, continuing our work making EmrEtlRunner more reliable and modular
- R9x [HAD] StorageLoader reboot, which will port our StorageLoader app to Scala
5. Getting help
For more details on this release, as always please check out the release notes on GitHub.