In addition to enabling real-time loading of data into Elasticsearch, we have made a number of other improvements to the real-time flow:
This release also makes some improvements to Scala Common Enrich and Scala Hadoop Enrich which should be invaluable for users of our batch-based event pipeline. Sections below the fold are as follows:
We are hugely excited to be able to load Snowplow data, in real-time, into Elasticsearch. Elasticsearch is a very powerful database for enabling real-time reporting and dashboarding - we’ve started to explore and visualize Snowplow data in it using Kibana and have been impressed with the results.
We plan to blog in more detail about performing analyses and building visualizations in Elasticsearch and Kibana in the near future.
Under the hood, the new Snowplow Elasticsearch Sink reads events from a Kinesis stream, transforms them into JSON, and writes them to an Elasticsearch cluster in real-time. It can be configured to read from either a stream of successfully enriched Snowplow events or the new bad rows stream. The sink uses the Amazon Kinesis Connector Library.
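Conceptually, the transformation is a mapping from the enriched event's tab-separated fields onto JSON properties. Here is a minimal sketch of the idea in Python (the field names are illustrative, and the real sink does rather more, such as type handling):

```python
def tsv_to_json(line, field_names):
    """Map a tab-separated enriched event onto named JSON properties.

    Empty fields are dropped, so the resulting document only contains
    the properties that were actually populated.
    """
    values = line.split("\t")
    return {name: value for name, value in zip(field_names, values) if value != ""}
```

The resulting dictionary can then be serialized and sent to Elasticsearch's bulk API.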
The overall architecture now looks like this:
If the sink cannot convert an event to JSON or the JSON is rejected by Elasticsearch, the failed event will be written to a Kinesis bad rows stream along with a message explaining what went wrong.
The jar is available from Snowplow Hosted Assets as snowplow-elasticsearch-sink-0.1.0.
A sample configuration file can be found in our GitHub as application.conf.example.
For more information about the Snowplow Elasticsearch Sink, see these wiki pages:
The Kinesis-based enrichment process now outputs bad rows to a separate stream. If you are running in "local mode", bad rows are written to stderr; otherwise they are written to a Kinesis stream specified in the configuration file.
Bad rows are converted to JSONs with a “line” field and an “errors” field. The “line” field contains the incoming serialized Thrift byte array which failed enrichment, Base64-encoded. The “errors” field is an array of error messages explaining what was wrong with the input.
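For illustration, a bad row of this shape could be assembled like so (this is a sketch of the format, not the pipeline's actual code):

```python
import base64
import json

def make_bad_row(raw_payload, errors):
    """Build a bad-row JSON: the failed raw payload goes Base64-encoded
    under "line", and the error messages go under "errors"."""
    return json.dumps({
        "line": base64.b64encode(raw_payload).decode("ascii"),
        "errors": errors,
    })
```

Because the original payload is preserved verbatim, a failed event can be decoded, inspected, and replayed once the underlying problem is fixed.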
Scala Kinesis Enrich now uses the latest version of Scala Common Enrich, the library shared by Scala Hadoop Enrich and Scala Kinesis Enrich. This means that it supports configurable enrichments. You can use the --enrichments command line option to pass a directory of enrichment configuration JSONs like this:
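An invocation might look something like the following (the jar name and paths here are illustrative; check the wiki for the exact syntax for your version):

```bash
java -jar snowplow-kinesis-enrich-0.x.x \
  --config ./enrich.conf \
  --enrichments ./enrichments
```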
The enrichments directory replaces the “anon_ip” and “geo_ip” fields in the config file. Instead, create anon_ip.json and ip_lookups.json configuration JSONs in the enrichments directory.
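For example, an anon_ip.json along these lines (the schema URI and parameter names follow the self-describing JSON pattern; check the sample enrichments directory for the exact current format):

```json
{
  "schema": "iglu:com.snowplowanalytics.snowplow/anon_ip/jsonschema/1-0-0",
  "data": {
    "name": "anon_ip",
    "enabled": true,
    "parameters": {
      "anonOctets": 2
    }
  }
}
```

Here anonOctets controls how many octets of each IP address are masked before the event is stored.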
Sensible defaults for all available enrichments can be found here.
The Scala Kinesis Enrich HOCON configuration file now requires a “resolver” field which is used to configure the Iglu Resolver used to validate the enrichment configuration JSONs.
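The resolver configuration is itself a self-describing JSON; a minimal example pointing at Iglu Central might look like this (the cacheSize and repository details shown are illustrative):

```json
{
  "schema": "iglu:com.snowplowanalytics.iglu/resolver-config/jsonschema/1-0-0",
  "data": {
    "cacheSize": 500,
    "repositories": [
      {
        "name": "Iglu Central",
        "priority": 0,
        "vendorPrefixes": ["com.snowplowanalytics"],
        "connection": {
          "http": {
            "uri": "http://iglucentral.com"
          }
        }
      }
    ]
  }
}
```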
As part of this update, we have also fixed the bug whereby Kinesis Enrich opened the MaxMind database file every time an event was enriched (!).
We are hugely indebted to community member Phil Kallos (@pkallos) who contributed several key improvements to the Kinesis flow:
You can now authenticate with AWS using your EC2 instance's IAM role (via an InstanceProfile credentials provider) by setting the access-key and secret-key configuration fields to "iam"
And there are more pull requests from Phil to be merged into future Kinesis releases too. Big thanks Phil!
In addition to Phil’s contribution on AWS credentials, this release adds another way to configure the AWS credentials required to use Amazon Kinesis. If you replace the access-key and secret-key values in the HOCON configuration with “env”, they will be set from the environment variables “AWS_ACCESS_KEY_ID” and “AWS_SECRET_ACCESS_KEY”.
This is useful if you want to keep your credentials out of GitHub.
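In the HOCON configuration, that looks something like this (the enclosing section name is illustrative; consult the sample configuration files for the exact structure):

```
aws {
  # "env" reads AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY from the environment;
  # alternatively, "iam" uses the EC2 instance's IAM role
  access-key: "env"
  secret-key: "env"
}
```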
Huge thanks to Sam Mason (@sambo1972) who contributed the ability to configure the Kinesis endpoint. In the “streams” section of the configuration HOCON, add the intended endpoint like so:
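Something along these lines, assuming an endpoint field inside the streams section (the URL shown is an illustrative regional endpoint):

```
streams {
  # use a regional Kinesis endpoint instead of the default
  endpoint: "https://kinesis.eu-west-1.amazonaws.com"
}
```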
The same goes for the “stream” section of the Scala Stream Collector’s HOCON configuration file.
Community member Yuval Herziger (@yuvalherziger) noticed that version 0.1.0 of the Scala Stream Collector only accepted requests of up to 2048 characters. He discovered how to override this restriction by configuring spray-can (the HTTP server which the collector uses) and his fix has been incorporated into the default configuration files. Thanks Yuval!
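The override raises spray-can's request parsing limits; in the configuration file it looks something like this (the exact value to use is a choice; spray-can's default max-uri-length is 2k):

```
spray.can.server {
  parsing {
    # raise the default 2k limit on request URI length
    max-uri-length = 32k
  }
}
```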
Version 0.9.0 of Scala Common Enrich allows you to extract the network_userid field from the querystring of a GET request or the body of a POST request, rather than from a header.
If you have access to the network user ID cookie set by the Clojure Collector, you can now add it to events tracked server-side to match up server-side and client-side events generated by the same user.
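As a sketch of the idea in Python (the "nuid" parameter name is an assumption for illustration; check the tracker protocol documentation for the actual parameter name):

```python
from urllib.parse import parse_qs, urlparse

def extract_network_userid(request_uri, default=None):
    """Pull a network user ID out of a request's querystring,
    falling back to a default (e.g. a cookie-derived value)."""
    params = parse_qs(urlparse(request_uri).query)
    return params.get("nuid", [default])[0]
```

A server-side tracker that knows the user's cookie value can append it to the request, so that client-side and server-side events share the same network user ID.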
Big thanks to Rupesh Mane (@rupeshmane), who used the Net-a-Porter URI parsing library to make the enrichment process more forgiving of non-compliant URIs. Many URIs which would previously fail with an error, for example because they contain illegal characters such as |, can now be parsed.
This should significantly reduce the number of events which end up in the bad rows bucket due to malformed page or referer URIs.
There are several changes you need to make to move to the new versions of the Scala Stream Collector and Scala Kinesis Enrich:
New templates for the two configuration files can be found on GitHub (you will need to edit the AWS credentials and the stream names):
And a sample enrichment directory containing sensible configuration JSONs can be found here.
This release bumps the Hadoop Enrichment process to version 0.10.0.
In your EmrEtlRunner's config.yml file, update your Hadoop Enrich job's version to 0.10.0, like so:
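The relevant snippet of config.yml would look something like this (the exact key name depends on your EmrEtlRunner version and is shown here as an assumption):

```yaml
:snowplow:
  :hadoop_enrich_job_version: 0.10.0   # bumped from 0.9.x
```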
For a complete example, see our sample config.yml.
We have plenty more planned for the Kinesis event pipeline! You can find the next milestones here:
Beyond these releases, our further plans for the Kinesis flow include:
We have been delighted by the quality and breadth of the community contributions to the Kinesis pipeline so far - so if any of our roadmap for real-time captures your imagination, get involved!
Documentation for the Kinesis flow is available on the wiki. If you want help getting set up, please talk to us. This is still only our second release of the Kinesis flow, so if you do find a bug, raise an issue!