Kinesis Tee 0.1.0 released for Kinesis stream filtering and transformation


We are pleased to announce the release of version 0.1.0 of Kinesis Tee.

Kinesis Tee is like Unix tee, but for Kinesis streams. You can use it to:

  • Write a Kinesis stream to another Kinesis stream (in the same region, or a different AWS account/region)
  • Transform the format of a Kinesis stream
  • Filter records from a Kinesis stream based on JavaScript rules

In the rest of this post we will cover:

  1. Introducing Kinesis Tee
  2. Example: mirroring a Kinesis stream to another account
  3. Example: converting Snowplow enriched events to nested JSON
  4. Example: filtering records in real time
  5. Documentation
  6. Roadmap
  7. Getting help

1. Introducing Kinesis Tee

The core purpose of Kinesis Tee is to connect two or more Kinesis streams together. These streams can be in different regions, or even in different AWS accounts.

Kinesis Tee is an AWS Lambda function that is triggered when events are received in the Kinesis source stream.

When traffic in your Kinesis source stream triggers Kinesis Tee, the Lambda function looks up a configuration file in DynamoDB (See: Configuration) and uses it to determine the action to take. This configuration is a self-describing Avro containing:

  1. A single sink stream to write records to
  2. An optional stream transformer to convert the records to another supported format
  3. An optional stream filter to determine whether to write the records to the sink stream

The stream transformer section of the configuration gives you the ability to modify the records as they are passed from the source stream to the sink stream. Currently, only converting Snowplow Enriched Events (TSV format) to (nested) JSON is supported. This is done using the Snowplow Scala Analytics SDK.

The stream filter section of the configuration lets you discard records from the source stream, ensuring they are not published to the sink stream. You control which records are discarded by supplying a JavaScript function. See more in Filtering.
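The flow described above can be sketched in a few lines of JavaScript. This is an illustration only, not Kinesis Tee's actual implementation: the names teeRecord, transform, filter and sink are hypothetical stand-ins for the three parts of the configuration, and applying the transformer before the filter is an assumption.

```javascript
// Illustrative only: Kinesis Tee itself is not implemented this way.
// `transform`, `filter` and `sink` are hypothetical stand-ins for the three
// parts of the configuration (transformer, filter, sink stream).
function teeRecord(record, { transform = null, filter = null, sink }) {
  // Optional transformer: convert the record, or pass it through unchanged.
  // Running it before the filter is an assumption of this sketch.
  const out = transform ? transform(record) : record;
  // Optional filter: a falsy result discards the record.
  if (filter && !filter(out)) {
    return false;
  }
  sink(out);
  return true;
}

// Usage: no transformer, and a filter that keeps only non-empty records.
const written = [];
const config = { filter: (r) => r.length > 0, sink: (r) => written.push(r) };
["a", "", "b"].forEach((r) => teeRecord(r, config));
console.log(written); // [ 'a', 'b' ]
```

With both transform and filter left as null, every record reaches the sink unchanged, which corresponds to the pass-through case.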

Let’s go through some brief examples showing the power of Kinesis Tee.

2. Example: mirroring a Kinesis stream to another account

A common use for Kinesis Tee is mirroring a Kinesis stream to another AWS account, perhaps in another region.

To use Kinesis Tee as a pass-through (no filtering or record changes) to another account, use the following configuration:

{
  "schema": "iglu:com.snowplowanalytics.kinesistee.config/Configuration/avro/1-0-0",
  "data": {
    "name": "My Kinesis Tee example",
    "targetStream": {
      "name": "my-target-stream",
      "targetAccount": {
        "com.snowplowanalytics.kinesistee.config.TargetAccount": {
          "awsAccessKey": "<<ADD HERE>>",
          "awsSecretAccessKey": "<<ADD HERE>>",
          "region": "eu-west-1"
        }
      }
    },
    "transformer": null,
    "filter": null
  }
}

Replace <<ADD HERE>> in the above example with your AWS credentials.
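If you would rather not paste credentials into the file by hand, a small script can generate the same pass-through configuration. The following is a minimal sketch: the helper mirrorConfig and the environment variable names TARGET_AWS_ACCESS_KEY and TARGET_AWS_SECRET_KEY are hypothetical and not part of Kinesis Tee; only the JSON structure comes from the example above.

```javascript
// Hypothetical helper: builds the pass-through configuration shown above.
function mirrorConfig({ name, streamName, region, accessKey, secretKey }) {
  return {
    schema: "iglu:com.snowplowanalytics.kinesistee.config/Configuration/avro/1-0-0",
    data: {
      name,
      targetStream: {
        name: streamName,
        targetAccount: {
          "com.snowplowanalytics.kinesistee.config.TargetAccount": {
            awsAccessKey: accessKey,
            awsSecretAccessKey: secretKey,
            region,
          },
        },
      },
      transformer: null, // no transformation
      filter: null,      // no filtering
    },
  };
}

const config = mirrorConfig({
  name: "My Kinesis Tee example",
  streamName: "my-target-stream",
  region: "eu-west-1",
  // Assumed variable names; falls back to the placeholder when unset.
  accessKey: process.env.TARGET_AWS_ACCESS_KEY || "<<ADD HERE>>",
  secretKey: process.env.TARGET_AWS_SECRET_KEY || "<<ADD HERE>>",
});

console.log(JSON.stringify(config, null, 2));
```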

3. Example: converting Snowplow enriched events to nested JSON

If you are a Snowplow real-time pipeline user, you may well want to plug your Kinesis stream of Snowplow enriched events into other tools such as Kinesis Analytics.

Built in to Kinesis Tee is a “Snowplow to nested JSON” transformer. This converts Snowplow enriched events (TSV) into (nested) JSON using the Snowplow Scala Analytics SDK. Here’s an example:

{
  "schema": "iglu:com.snowplowanalytics.kinesistee.config/Configuration/avro/1-0-0",
  "data": {
    "name": "My Kinesis Tee example",
    "targetStream": {
      "name": "my-target-stream",
      "targetAccount": null
    },
    "transformer": {
      "com.snowplowanalytics.kinesistee.config.Transformer": {
        "builtIn": "SNOWPLOW_TO_NESTED_JSON"
      }
    },
    "filter": null
  }
}

Currently only the SNOWPLOW_TO_NESTED_JSON transformation, or null for no transformation, is supported; however, we plan to add support for arbitrary JavaScript-powered transformations in the future.

4. Example: filtering records in real time

Imagine we have a Kinesis stream consisting of JSON monitoring events emitted by all the machines on a factory floor. The events look like this:

{ "machineId": "4fd3249e", "temp": 56.7, "lastTemp": 35.8 }

We want to pass on events indicating that a machine is overheating rapidly to a dedicated Kinesis stream:

{
  "schema": "iglu:com.snowplowanalytics.kinesistee.config/Configuration/avro/1-0-0",
  "data": {
    "name": "My Kinesis Tee example",
    "targetStream": {
      "name": "my-target-stream",
      "targetAccount": null
    },
    "transformer": null,
    "filter": {
      "com.snowplowanalytics.kinesistee.config.Filter": {
        "javascript": "ZnVuY3Rpb24gZmlsdGVyKHJlY29yZCkgew0KICAgIHJldHVybiAocmVjb3JkLnRlbXAgLSByZWNvcmQubGFzdFRlbXAgPiAxMCk7DQp9"
      }
    }
  }
}

The javascript field in the above configuration contains a Base64 encoded JavaScript function. This function must be called filter, and in this case decodes to the following:

function filter(record) {
    return (record.temp - record.lastTemp > 10);
}
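To produce the value of the javascript field for your own filter, Base64-encode the function source. Here is a minimal sketch using Node.js's built-in Buffer; the variable names are illustrative. The Windows-style line endings (\r\n) are only there to reproduce the exact string used in the configuration above, and we assume other line endings would work equally well.

```javascript
// Source of the filter, written with CRLF line endings and a four-space
// indent so that the encoded output matches the string in the configuration.
const filterSource =
  "function filter(record) {\r\n" +
  "    return (record.temp - record.lastTemp > 10);\r\n" +
  "}";

// Encode for the "javascript" field of the configuration.
const encoded = Buffer.from(filterSource, "utf8").toString("base64");

// Decode again to check what an existing configuration contains.
const decoded = Buffer.from(encoded, "base64").toString("utf8");

console.log(encoded);
console.log(decoded === filterSource);
```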

This filter will only send a record from the Kinesis source stream into the sink stream if:

  • The record is parseable as JSON
  • The record contains numeric temp and lastTemp fields
  • The difference between temp and lastTemp is greater than 10 degrees

All other records will be silently discarded by Kinesis Tee.
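You can try a filter locally before deploying it. The sketch below runs the filter against a few sample records. The wrapper isForwarded is hypothetical: it assumes that a record which cannot be parsed as JSON, or which makes the filter throw, counts as discarded, in line with the rules listed above. It is not how Kinesis Tee itself invokes the function.

```javascript
// The filter from the configuration above.
function filter(record) {
  return (record.temp - record.lastTemp > 10);
}

// Hypothetical wrapper (an assumption, not Kinesis Tee's code): a record that
// fails to parse, or makes the filter throw, is treated as discarded.
function isForwarded(rawRecord) {
  try {
    return filter(JSON.parse(rawRecord)) === true;
  } catch (err) {
    return false;
  }
}

const samples = [
  '{ "machineId": "4fd3249e", "temp": 56.7, "lastTemp": 35.8 }', // rose by about 20.9
  '{ "machineId": "4fd3249e", "temp": 40.1, "lastTemp": 35.8 }', // rose by about 4.3
  '{ "machineId": "4fd3249e" }', // temp and lastTemp missing
  "not json", // unparseable
];

const results = samples.map(isForwarded);
console.log(results); // [ true, false, false, false ]
```

Note that JavaScript's subtraction coerces numeric strings to numbers, so a stricter filter may want to check typeof record.temp === "number" explicitly.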

5. Documentation

Kinesis Tee uses Snowplow community member Jorge Bastida’s excellent Gordon project for deployment. Find out more on setting up Kinesis Tee, plus more configuration examples, in:

If you want to understand the architecture of Kinesis Tee, perhaps with a view to contributing to the codebase, check out this guide:

6. Roadmap

We see Kinesis Tee as a fundamental building block for assembling asynchronous micro-service architectures on top of Kinesis. We have big plans for Kinesis Tee, including but not limited to:

  • Allowing users to write arbitrary JavaScript transformation functions (similar to the existing filter functions) (issue #9)
  • Adding daisychaining of transformation and filtering steps – allowing for multiple transformations, and filters based on each transformation (issue #18)
  • Adding routing, so that a given event can end up in one or more target streams (issue #8)
  • Adding better error handling, so that processing failures can be properly captured in a “bad stream” (issue #11)

7. Getting help

We hope that you find Kinesis Tee useful – of course, this is only its first release, so don’t be afraid to get in touch or raise an issue on GitHub!