Snowplow 0.9.12 released with real-time loading of data into Elasticsearch beta

26 November 2014  •  Fred Blundun

Back in February, we introduced initial support for real-time event analytics using Amazon Kinesis. We are excited to announce the release of Snowplow 0.9.12, which significantly improves and extends our Kinesis support. The major new feature is our all-new Kinesis Elasticsearch Sink, which streams event data from Kinesis into Elasticsearch in real time. The data is then available to power real-time dashboards and analysis (e.g. using Kibana).


In addition to enabling real-time loading of data into Elasticsearch, we have made a number of other improvements to the real-time flow:

  1. Bad rows of data are now loaded into a dedicated bad rows stream in Kinesis
  2. The real-time flow now runs the latest version of Scala Common Enrich, making it possible to employ the same configurable enrichments in the real-time flow that are currently available in the batch flow.

This release also makes some improvements to Snowplow Common Enrich and Hadoop Enrich which should be invaluable for users of our batch-based event pipeline. Sections below the fold are as follows:

  1. Snowplow Elasticsearch Sink
  2. Bad rows stream
  3. Support for the latest version of Scala Common Enrich
  4. Phil Kallos’ contributions
  5. Configuring AWS credentials
  6. Configurable Kinesis endpoint
  7. HTTP request character limit override
  8. For Hadoop Enrich users: support for tnuid
  9. For Hadoop Enrich users: more relaxed URI parsing
  10. Upgrading
  11. Roadmap and contributing
  12. Getting help

1. Snowplow Elasticsearch Sink

We are hugely excited to be able to load Snowplow data, in real-time, into Elasticsearch. Elasticsearch is a very powerful database for enabling real-time reporting and dashboarding - we’ve started to explore and visualize Snowplow data in it using Kibana and have been impressed with the results.


We plan to blog in more detail on performing analyses and building visualizations in Elasticsearch and Kibana in the near future.

Under the hood, the new Snowplow Elasticsearch Sink reads events from a Kinesis stream, transforms them into JSON, and writes them to an Elasticsearch cluster in real-time. It can be configured to read from either a stream of successfully enriched Snowplow events or the new bad rows stream. The sink uses the Amazon Kinesis Connector Library.

The overall architecture now looks like this:


If the sink cannot convert an event to JSON or the JSON is rejected by Elasticsearch, the failed event will be written to a Kinesis bad rows stream along with a message explaining what went wrong.
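The routing described above can be sketched in a few lines of Python. This is only an illustration under stated assumptions, not the sink's actual code: the tab-separated input, the field list, the shape of the failure record, and the names `to_json`, `route_events`, `index` and `emit_bad` are all hypothetical.

```python
import json


def to_json(fields, line):
    """Turn one tab-separated event line into a JSON document (assumed input format)."""
    values = line.split("\t")
    if len(values) != len(fields):
        raise ValueError(f"expected {len(fields)} fields, got {len(values)}")
    return json.dumps(dict(zip(fields, values)))


def route_events(lines, fields, index, emit_bad):
    """Send each event to `index`; on any failure, send the original line plus a message to `emit_bad`."""
    for line in lines:
        try:
            index(to_json(fields, line))
        except Exception as exc:
            # Covers both conversion failures and rejections raised by the indexing call
            emit_bad({"line": line, "errors": [str(exc)]})
```

In a real deployment `index` would wrap a call to Elasticsearch and `emit_bad` a write to the bad rows stream; here they are plain callables so the routing logic can be read on its own.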

The jar is available from Snowplow Hosted Assets as snowplow-elasticsearch-sink-0.1.0.

A sample configuration file can be found in our GitHub as application.conf.example.

For more information about the Snowplow Elasticsearch Sink, see these wiki pages:

2. Bad rows stream

The Kinesis-based enrichment process now outputs bad rows to a separate stream. If you are using “local mode”, the separate stream will be stderr; otherwise it will be a Kinesis stream specified in the configuration file.

Bad rows are converted to JSONs with a “line” field and an “errors” field. The “line” field contains the incoming serialized Thrift byte array which failed enrichment, Base64-encoded. The “errors” field is an array of error messages explaining what was wrong with the input.
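As a rough sketch of how you might inspect a bad row, the Python below parses the JSON and Base64-decodes the "line" field back into the raw bytes. The function name `parse_bad_row`, the sample payload bytes and the sample error message are made up for illustration; only the "line" and "errors" field names come from the description above.

```python
import base64
import json


def parse_bad_row(raw_json):
    """Return (payload_bytes, error_messages) for one bad row JSON string."""
    row = json.loads(raw_json)
    payload = base64.b64decode(row["line"])  # the serialized Thrift bytes that failed enrichment
    return payload, row["errors"]


# Hypothetical bad row, built here only so the example is self-contained
sample = json.dumps({
    "line": base64.b64encode(b"\x0b\x00\x01").decode("ascii"),
    "errors": ["example error message"],
})
payload, errors = parse_bad_row(sample)
```

The decoded payload is still a Thrift-serialized record, so reading its contents would need the collector's Thrift schema.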

3. Support for the latest version of Scala Common Enrich

Scala Kinesis Enrich now uses the latest version of Scala Common Enrich, the library shared by Scala Hadoop Enrich and Scala Kinesis Enrich. This means that it supports configurable enrichments. You can use the --enrichments command line option to pass a directory of enrichment configuration JSONs like this:

$ ./scala-kinesis-enrich-0.2.0 --config my.conf --enrichments path/to/enrichment-directory

The enrichments directory replaces the “anon_ip” and “geo_ip” fields in the config file. Instead, create anon_ip.json and ip_lookups.json configuration JSONs in the enrichments directory.

Sensible defaults for all available enrichments can be found here.

The Scala Kinesis Enrich HOCON configuration file now requires a “resolver” field which is used to configure the Iglu Resolver used to validate the enrichment configuration JSONs.

As part of this update, we have also fixed the bug whereby Kinesis Enrich opened a MaxMind database file every time an event was enriched (!).

4. Phil Kallos' contributions

We are hugely indebted to community member Phil Kallos (@pkallos) who contributed several key improvements to the Kinesis flow:

  • Improved performance for the Scala Stream Collector through concurrency
  • The ability to run the enrichment process without needing permission for the kinesis:ListStreams action
  • The ability to take the AWS access key and secret key from the instance profile (via Scalazon’s CredentialsProvider.InstanceProfile object) by setting the access-key and secret-key configuration fields to “iam”

And there are more pull requests from Phil to be merged into future Kinesis releases too. Big thanks Phil!

5. Configuring AWS credentials

In addition to Phil’s contribution on AWS credentials, this release adds another way to configure the AWS credentials required to use Amazon Kinesis. If you replace the access-key and secret-key values in the HOCON configuration with “env”, they will be set from the environment variables “AWS_ACCESS_KEY_ID” and “AWS_SECRET_ACCESS_KEY”.
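To make the lookup rule concrete, here is a minimal Python sketch of it. It is not the code used by the Kinesis applications: `resolve_credentials` is a hypothetical helper, it resolves each field independently (an assumption), and it does not handle the “iam” setting.

```python
import os


def resolve_credentials(access_key, secret_key, environ=os.environ):
    """Replace the placeholder "env" with the matching environment variable."""
    if access_key == "env":
        access_key = environ["AWS_ACCESS_KEY_ID"]
    if secret_key == "env":
        secret_key = environ["AWS_SECRET_ACCESS_KEY"]
    return access_key, secret_key
```

Passing `environ` explicitly keeps the helper easy to try out without touching your real environment.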

This is useful if you want to keep your credentials out of GitHub.

6. Configurable Kinesis endpoint

Huge thanks to Sam Mason (@sambo1972) who contributed the ability to configure the Kinesis endpoint. In the “streams” section of the configuration HOCON, set the region whose Kinesis endpoint you want to use, like so:

streams {
	# Other stream settings omitted
	region: "ap-southeast-2"
}

The same goes for the “stream” section of the Scala Stream Collector’s HOCON configuration file.

7. HTTP request character limit override

Community member Yuval Herziger (@yuvalherziger) noticed that version 0.1.0 of the Scala Stream Collector only accepted requests of up to 2048 characters. He discovered how to override this restriction by configuring spray-can (the HTTP server which the collector uses) and his fix has been incorporated into the default configuration files. Thanks Yuval!

8. For Hadoop Enrich users: support for tnuid

Version 0.9.0 of Scala Common Enrich allows you to extract the network_userid field from the tnuid parameter in the querystring of a GET request or the body of a POST request, rather than using a header.

If you have access to the network user ID cookie set by the Clojure Collector, you can now add it to events tracked server-side to match up server-side and client-side events generated by the same user.
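As an illustration, a server-side page view carrying the network user ID could be built like this in Python. The collector host, the `/i` path, the other querystring parameters (`e`, `url`, `p`) and the helper name `build_pageview_url` are assumptions for the sake of the example; only tnuid is taken from this release.

```python
from urllib.parse import urlencode


def build_pageview_url(collector_host, network_userid, page_url):
    """Build a GET request URL that passes the network user ID as tnuid."""
    params = {
        "e": "pv",                # event type: page view (assumed)
        "url": page_url,          # page URL (assumed parameter name)
        "p": "srv",               # platform: server-side (assumed)
        "tnuid": network_userid,  # value read from the collector's cookie
    }
    return f"http://{collector_host}/i?{urlencode(params)}"


url = build_pageview_url(
    "collector.example.com",
    "ecdff4d0-9175-40ac-a8bb-325c49733607",
    "http://www.example.com/",
)
```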

9. For Hadoop Enrich users: more relaxed URI parsing

Big thanks to Rupesh Mane (@rupeshmane), who used the Net-a-Porter URI parsing library to make the enrichment process more forgiving of non-compliant URIs. Many URIs which would previously fail with an error, for example due to containing illegal characters such as |, can now be parsed.

This should significantly reduce the number of events which end up in the bad rows bucket due to malformed page or referer URIs.
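To see why a character such as | trips up a strict parser, the Python sketch below checks a URI against the set of characters RFC 3986 permits and shows one possible way of making it acceptable by percent-encoding the rest. This is not how the Net-a-Porter library works internally; it is a character-level illustration only, and `is_strictly_valid` and `relax` are hypothetical names.

```python
import re
from urllib.parse import quote

# Characters RFC 3986 allows in a URI: unreserved, reserved, and "%" for escapes
_ALLOWED = re.compile(r"^[A-Za-z0-9\-._~:/?#\[\]@!$&'()*+,;=%]*$")


def is_strictly_valid(uri):
    """Character-level check only; does not validate the full URI grammar."""
    return bool(_ALLOWED.match(uri))


def relax(uri):
    """Percent-encode any character outside the RFC 3986 set, leaving the rest untouched."""
    return quote(uri, safe=":/?#[]@!$&'()*+,;=%")


raw = "http://example.com/a|b?x=1"
fixed = relax(raw)
```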

10. Upgrading

There are several changes you need to make to move to the new versions of the Scala Stream Collector and Scala Kinesis Enrich:

  • You must provide a “region” field (with a value like “us-east-1”) in the configuration files
  • You must provide a “resolver” field in the Scala Kinesis Enrich containing the data used to configure the Iglu resolver
  • If you run Scala Kinesis Enrich without the --enrichments option, the IP anonymization enrichment and the IP address lookup enrichment will not run automatically

New templates for the two configuration files can be found on GitHub (you will need to edit the AWS credentials and the stream names):

And a sample enrichment directory containing sensible configuration JSONs can be found here.

This release bumps the Hadoop Enrichment process to version 0.10.0.

In your EmrEtlRunner’s config.yml file, update your Hadoop enrich job’s version to 0.10.0, like so:

    :hadoop_enrich: 0.10.0 # WAS 0.9.0

For a complete example, see our sample config.yml template.

11. Roadmap and contributing

We have plenty more planned for the Kinesis event pipeline! You can find the next milestones here:

Beyond these releases, our further plans for the Kinesis flow include:

  • Analytics-on-write leveraging the new Amazon Kinesis Aggregators framework
  • Real-time shredding of events into Redshift and other columnar databases
  • Support for other storage types including timeseries and in-memory grids
  • In-stream decisioning, alerting and response loops

We have been delighted by the quality and breadth of the community contributions to the Kinesis pipeline so far - so if any of our roadmap for real-time captures your imagination, get involved!

12. Getting help

Documentation for the Kinesis flow is available on the wiki. If you want help getting set up, please talk to us. This is still only our second release on the Kinesis flow, so if you do find a bug, raise an issue!