We are pleased to announce the immediate availability of Snowplow 78 Great Hornbill! This release brings our Kinesis pipeline functionally up-to-date with our Hadoop pipeline, and makes various further improvements to the Kinesis pipeline.
- Access to the latest Common Enrich version
- Click redirect mode
- Configurable cookie name
- Randomized partition keys
- Kinesis Elasticsearch Sink: increased flexibility
- New format for bad rows
- Kinesis Client Library upgrade
- Renaming Scala Kinesis Enrich to Stream Enrich
- Other improvements
- Getting help
1. Access to the latest Common Enrich version
Both Stream Enrich (the new name for Kinesis Enrich) and Scala Hadoop Enrich (for the batch pipeline) use our shared Common Enrich library for the core event enrichment logic. In this release, we have upgraded Stream Enrich from version 0.15.0 of Common Enrich to version 0.22.0.
This is a huge leap forward for the Kinesis pipeline, making available numerous powerful new features from the last 10 batch pipeline releases, including:
- Validation of unstructured events and custom contexts
- The Cookie Extractor Enrichment
- The Weather Enrichment
- SendGrid webhooks
2. Click redirect mode
www.example.com. To track clicks on this link, change it to point to your collector, and give put the original link target in the querystring (having first URL-encoded it.) The new URL looks like this:
When a user clicks the link, the collector will redirect them to
www.example.com and send a URI redirect event to Kinesis. You can add additional parameters to the event simply by adding them to the querystring in accordance with the Snowplow Tracker Protocol. For instance, to add information that the browser language is American English, change the link URL to:
3. Configurable cookie name
Thanks to the work of Kacper Bielecki, you can now configure the name of the cookie set by the Scala Stream Collector. You should add the field
name = mycookiename to the
collector.cookie section of the configuration.
For compatibility with cookies set by previous versions of the collector, set the name to “sp”. Thanks Kacper!
4. Randomized partition keys
The Scala Stream Collector and Stream Enrich have historically used the IP address of incoming events as the Kinesis partition key. This meant that any two events originating from the same IP address would end up in the same shard and would probably be processed in the same order.
For some applications the link between user and shard and the approximate preservation of order would be important, but the Snowplow real-time pipeline never uses it itself. It also has a disadvantage: if the collector is flooded by requests from a single IP address, the events will all end up in the same shard. This would slow down consumers processing the stream no matter how many shards the stream has.
For these reasons we have started generating the partition keys for events randomly. It is possible to retain the old behavior: just add a boolean field
useIpAddressAsPartitionKey set to
true to the
collector.sink.kinesis section of your Scala Stream Collector configuration, and add the same field to the
enriched.streams.out section of your Stream Enrich configuration.
5. Kinesis Elasticsearch Sink: increased flexibility
The Kinesis Elasticsearch Sink formerly supported three modes:
- Read from
stdin, write good events to
stdoutand bad events to
- Read from Kinesis, write good events to
stdoutand bad events to
- Read from Kinesis, write good events to Elasticsearch and bad events to Kinesis
We have made this much more flexible: it is now possible to read from
stdin or Kinesis, write good events to
stdout or Elasticsearch, and write bad events to Kinesis or
stderr, in any combination. You can also silently drop bad events if you prefer.
To preserve existing behavior, you will have to change the “sink” setting in your configuration file from this:
or from this:
6. New format for bad rows
We have updated the format of the bad rows emitted by Scala Stream Collector, Stream Enrich, and Kinesis Elasticsearch Sink: the “errors” field is no longer an array of strings, but instead an array of objects where each object contains both the error message and the level of the error.
An example of a bad row in the new format, generated by feeding Stream Enrich with a malformed Thrift object:
If you are loading Snowplow bad rows into for example Elasticsearch, please make sure to update all applications which can emit bad rows.
7. Kinesis Client Library upgrade
Stream Enrich and Kinesis Elasticsearch Sink use the Kinesis Client Library to consume data from Kinesis. We have upgraded to the latest version (1.6.1) of the library, which has important improvements:
- It doesn’t silently swallow exceptions
- It uploads the useful “MillisBehindLatest” metric to Amazon CloudWatch. This is helpful when determining whether an application consuming a stream is falling behind
We have also configured the Kinesis Client Library to upload monitoring information about Stream Enrich to CloudWatch – this feature was previously disabled.
8. Renaming Scala Kinesis Enrich to Stream Enrich
Scala Kinesis Enrich isn’t actually limited to using Kinesis: it can also read from
stdin and write to
stdout. In the future we plan on integrating Apache Kafka into this component. Since Scala Kinesis Enrich will actually support multiple different types of stream, we have renamed it to Stream Enrich.
9. Other improvements
We have also:
- Specified the UTF-8 character encoding for all string serialization and deserialization operations (#1963)
- Combined the two thread pools which the Scala Stream Collector used to write to the good stream and the bad stream into a single thread pool (#2369)
- Reduced the complexity of the algorithm used by Kinesis Elasticsearch Sink to convert enriched event TSVs to JSON (#1847)
- Fixed an error causing Kinesis Elasticsearch Sink’s internal monitoring to use a timestamp for the
- Made the regular expression for schemas used by Kinesis Elasticsearch Sink more permissive, allowing vendors with hyphens (#1998)
The Kinesis apps for R78 Great Hornbill are now all available in a single zip file here:
Upgrading will require the following configuration changes to the applications’ individual HOCON configuration files:
Scala Stream Collector
collector.cookie.name field to the HOCON and set its value to
Also note that the configuration file no longer supports loading AWS credentials from the classpath using ClasspathPropertiesFileCredentialsProvider. If your configuration looks like this:
then you should change “cpf” to “default” to use the DefaultAWSCredentialsProviderChain. You will need to ensure that your credentials are available in one of the places the AWS Java SDK looks. For more information about this, see the Javadoc.
Kinesis Elasticsearch Sink
sink.kinesis.out string with an object with two fields:
Additionally, move the
stream-type setting from the
sink.kinesis.in section to the
If you are loading Snowplow bad rows into for example Elasticsearch, please make sure to update all applications.
You can see an example of the correct configuration file layout here.
11. Getting help
For more details on this release, please check out the release notes on GitHub.