20 August 2015  •  Releases  •  Vincent Ohprecio

AWS Lambda Scala example project released

We are pleased to announce the release of our new AWS Lambda Scala Example Project!

This is a simple time series analysis stream processing job written in Scala for AWS Lambda, processing JSON events from Amazon Kinesis and writing aggregates to Amazon DynamoDB.

data flow png

AWS Lambda can help you jumpstart your own real-time event processing pipeline, without having to set up and manage clusters of server infrastructure. We will take you through the steps to get this simple analytics-on-write job set up and processing your Kinesis event stream.

Read on after the fold for:

  1. What is AWS Lambda and Kinesis?
  2. Introducing analytics-on-write
  3. Detailed setup
  4. Troubleshooting
  5. Further reading

1. What is AWS Lambda and Kinesis?

AWS Lambda is a compute service that runs your code in response to events and automatically manages the compute resources for you, making it easy to build applications that respond quickly to new information. AWS Lambda starts running your code within milliseconds of an event such as an image upload, in-app activity, website click, or output from a connected device. You can also use AWS Lambda to create new back-end services where compute resources are automatically triggered based on custom requests.

Amazon Kinesis is a fully managed service for real-time processing of streaming data at massive scale. In this project we leverage the integration between the Kinesis and Lambda services.

This is an example of the “pull” model where AWS Lambda polls the Amazon Kinesis stream and invokes your Lambda function when it detects new data on the stream.
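
To make the pull model a little more concrete, here is a minimal Python sketch of what a function receives when Lambda invokes it with a batch from Kinesis. The example project's own handler is written in Scala, so this is an illustration only: the names `decode_records` and `handler` are hypothetical. The sketch relies on the documented shape of a Kinesis event, where each record's payload arrives base64-encoded under `kinesis.data`.

```python
import base64
import json


def decode_records(event):
    """Return the JSON payloads carried by a Lambda Kinesis event.

    Lambda hands the function a batch of records; each payload is
    base64-encoded under record["kinesis"]["data"].
    """
    payloads = []
    for record in event.get("Records", []):
        raw = base64.b64decode(record["kinesis"]["data"])
        payloads.append(json.loads(raw.decode("utf-8")))
    return payloads


def handler(event, context):
    # Hypothetical entry point: report how many events arrived in this batch.
    events = decode_records(event)
    return {"received": len(events)}
```

A real handler would pass the decoded events on to the aggregation logic rather than just counting them.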

2. Introducing analytics-on-write

Our AWS Lambda function reads a Kinesis stream containing events in JSON format:

{
  "timestamp": "2015-06-30T12:54:43.064528",
  "type": "Green",
  "id": "4ec80fb1-0963-4e35-8f54-ce760499d974"
}

Our Scala Lambda function counts the events by event type and aggregates these counts into 1-minute buckets. The job then saves these aggregates into a table in DynamoDB:

data table png

The most complete open-source example of an analytics-on-write implementation is Ian Meyers’ amazon-kinesis-aggregators project; our example project is in turn heavily influenced by the concepts in Ian’s work. Three important concepts to understand in analytics-on-write are:

  1. Downsampling, where we reduce the event’s ISO 8601 timestamp down to minute precision, so for instance “2015-06-05T12:54:43.064528” becomes “2015-06-05T12:54:00.000000”. This gives us a fast way of bucketing or aggregating events via the downsampled key.
  2. Bucketing, an aggregation technique that builds buckets, where each bucket is associated with a downsampled timestamp key and an event type criterion. By the end of the aggregation process, we’ll end up with a list of buckets - each one with a countable set of events that “belong” to it.
  3. Atomic increment, which is useful for updating values as they change because multiple requests from your application won’t collide. If your application needs to increase a count by 100, you can just tell Amazon DynamoDB to atomically increment the count by 100, as opposed to having to get the record, increment the count, and put it back into Amazon DynamoDB.
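
The first two concepts can be sketched in a few lines of Python. This is not the project's Scala implementation, only a minimal illustration: the function names `downsample` and `bucket` are hypothetical, and the `type` key is taken from the generator output shown in Step 7.

```python
from collections import Counter
from datetime import datetime

TIMESTAMP_FORMAT = "%Y-%m-%dT%H:%M:%S.%f"


def downsample(timestamp):
    """Truncate an ISO 8601 timestamp to minute precision."""
    parsed = datetime.strptime(timestamp, TIMESTAMP_FORMAT)
    return parsed.replace(second=0, microsecond=0).strftime(TIMESTAMP_FORMAT)


def bucket(events):
    """Count events per (downsampled timestamp, event type) bucket."""
    counts = Counter()
    for event in events:
        counts[(downsample(event["timestamp"]), event["type"])] += 1
    return counts
```

Calling `downsample("2015-06-05T12:54:43.064528")` returns `"2015-06-05T12:54:00.000000"`, and `bucket` returns one counter entry per minute and event type, which is the shape of data the job writes to DynamoDB.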

3. Detailed setup

In this tutorial, we’ll walk through the process of getting up and running with Amazon Kinesis and AWS Lambda. You will need git, Vagrant and VirtualBox installed locally. This project is specifically configured to run in AWS region “us-east-1” to ensure all AWS services are available.

Step 1: Build the project

First clone the repo and bring up Vagrant:

host$ git clone https://github.com/snowplow/aws-lambda-scala-example-project.git
host$ cd aws-lambda-scala-example-project/
host$ vagrant up && vagrant ssh
guest$ cd /vagrant
guest$ sbt assembly

Step 2: Add AWS credentials to the Vagrant box

You’re going to need IAM-based credentials for AWS. Get your keys and type in “aws configure” in the Vagrant box (the guest). Below, I’m also setting the region to “us-east-1” and the output format to “json”:

$ aws configure
AWS Access Key ID [None]: ADD_YOUR_ACCESS_KEY_HERE
AWS Secret Access Key [None]: ADD_YOUR_SECRET_KEY_HERE
Default region name [None]: us-east-1
Default output format [None]: json

Step 3: Create your DynamoDB table, IAM role, and Kinesis stream

We’re going to set up a DynamoDB table, an IAM role (via CloudFormation), and a Kinesis stream. We will be using the inv command from Python’s Invoke task runner to run all of our tasks. I’m using “my-table” as the table name. The CloudFormation stack name is “LambdaStack” and the Kinesis stream name is “my-stream”. We start by creating our Kinesis event stream:

$ inv create_kinesis_stream my-stream
Kinesis Stream [my-stream] not active yet
Kinesis Stream [my-stream] not active yet
Kinesis Stream [my-stream] not active yet
Kinesis successfully created.
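
The repeated “not active yet” lines appear because a new Kinesis stream takes a little while to become usable. As a rough sketch of that waiting loop (not the code behind the inv task, which we have not reproduced here), the following Python polls the stream status through a boto3 Kinesis client. The function name `wait_until_active` and its polling parameters are hypothetical.

```python
import time


def wait_until_active(kinesis, stream_name, poll_seconds=5, max_attempts=60):
    """Poll describe_stream until the stream reports ACTIVE.

    `kinesis` is expected to be a boto3 Kinesis client, for example
    boto3.client("kinesis", region_name="us-east-1").
    Returns True once the stream is active, False if we give up.
    """
    for _ in range(max_attempts):
        description = kinesis.describe_stream(StreamName=stream_name)
        status = description["StreamDescription"]["StreamStatus"]
        if status == "ACTIVE":
            return True
        print("Kinesis Stream [%s] not active yet" % stream_name)
        time.sleep(poll_seconds)
    return False
```

With boto3 installed, you would typically call `kinesis.create_stream(StreamName="my-stream", ShardCount=1)` first (the shard count of 1 is an assumption) and then `wait_until_active(kinesis, "my-stream")`.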

Now create our DynamoDB table:

$ inv create_dynamodb_table default us-east-1 my-table

Now we can create our IAM role, which is defined in a CloudFormation stack. Run inv create_role like so:

$ inv create_role
Creating roles
Still creating
Giving Lambda proper permissions
Created role

Step 4: Upload project jar to Amazon S3

In the very first step, we compiled and “assembled” our Scala project files into a self-contained jar. SBT built our jar file and put it into the target folder here: ./target/scala-2.11/aws-lambda-scala-example-project-0.1.0. With the next inv command we will create a new bucket on S3 called aws_scala_lambda_bucket. The jar file will then be uploaded under the S3 key aws-lambda-scala-example-project-0.1.0.

Be patient while the uploader copies your multi-megabyte jar file to S3 with the following task:

$ inv upload_s3
Jar uploaded to S3 bucket aws_scala_lambda_bucket

Step 5: Configure AWS Lambda service

Now that we have built the project and uploaded the jar file to S3, we need to create the Lambda function that will process event traffic from our Amazon Kinesis stream named my-stream. The following command connects to the Lambda service and creates our Lambda function, called ProcessingKinesisLambdaDynamoDB. Don’t worry, we are getting close to sending events to Kinesis!

$ inv create_lambda
Creating AWS Lambda function.
{
    "FunctionName": "ProcessingKinesisLambdaDynamoDB",
    "CodeSize": 38042279,
    "MemorySize": 1024,
    "FunctionArn": "arn:aws:lambda:us-east-1:842349429716:function:ProcessingKinesisLambdaDynamoDB",
    "Handler": "com.snowplowanalytics.awslambda.LambdaFunction::recordHandler",
    "Role": "arn:aws:iam::842340234716:role/LambdaStack-LambdaExecRole-7G57P4M2VV5P",
    "Timeout": 60,
    "LastModified": "2015-08-13T19:39:46.730+0000",
    "Runtime": "java8",
    "Description": ""
}

Step 6: Associate our Kinesis stream with our Lambda function

Our Lambda function processes incoming event data from our Kinesis stream. AWS Lambda polls the Amazon Kinesis stream and invokes your Lambda function when it detects new data on the stream.

If you go to the AWS Lambda console webpage and select the Monitor tab, you can see the output log information in the Amazon CloudWatch service.

We need to “connect” or “associate” our Lambda function with our Kinesis stream by running:

$ inv configure_lambda my-stream
Configured AWS Lambda Service
Added Kinesis as event source for Lambda function
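
If you are curious what such an association looks like at the API level, the Lambda service exposes it as an event source mapping. The Python below is a hedged sketch using the boto3 Lambda client, not the implementation of the inv task: the function name `connect_stream_to_function`, the batch size of 100, and the choice of TRIM_HORIZON as starting position are all assumptions made for illustration.

```python
def connect_stream_to_function(lambda_client, stream_arn, function_name, batch_size=100):
    """Register a Kinesis stream as an event source for a Lambda function.

    `lambda_client` is expected to be a boto3 Lambda client, for example
    boto3.client("lambda", region_name="us-east-1").
    Returns the UUID that identifies the new event source mapping.
    """
    response = lambda_client.create_event_source_mapping(
        EventSourceArn=stream_arn,
        FunctionName=function_name,
        StartingPosition="TRIM_HORIZON",  # begin with the oldest record still in the stream
        BatchSize=batch_size,
        Enabled=True,
    )
    return response["UUID"]
```

The stream ARN can be read from the `StreamARN` field of a `describe_stream` response. Keeping the returned UUID is handy because it is what you need later to delete the mapping.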

Step 7: Generate events in your Kinesis stream

The final step for testing this project is to start sending some events to our new Kinesis stream. We have created a helper task to do this - run the command below and leave it running in a separate terminal:

$ inv generate_events default us-east-1 my-stream
Event sent to Kinesis: {"timestamp": "2015-06-30T12:54:43.064528", "type": "Green", "id": "4ec80fb1-0963-4e35-8f54-ce760499d974"}
Event sent to Kinesis: {"timestamp": "2015-06-30T12:54:43.757797", "type": "Red", "id": "eb84b0d1-f793-4213-8a65-2fb09eab8c5c"}
Event sent to Kinesis: {"timestamp": "2015-06-30T12:54:44.295972", "type": "Yellow", "id": "4654bdc8-86d4-44a3-9920-fee7939e2582"}
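
For readers who want to produce similar events from their own code, here is a small Python sketch of a generator that emits the same three fields. It is an approximation rather than the helper's actual source: the function names are hypothetical, only Green, Red and Yellow are confirmed by the output above (the other two entries in `EVENT_TYPES` are assumed), and using the event id as the partition key is also an assumption.

```python
import datetime
import json
import random
import uuid

# Green, Red and Yellow appear in the sample output; Orange and Blue are assumed.
EVENT_TYPES = ["Red", "Orange", "Yellow", "Green", "Blue"]


def make_event(now=None):
    """Build one event with a timestamp, a random type and a unique id."""
    now = now or datetime.datetime.now()
    return {
        "timestamp": now.strftime("%Y-%m-%dT%H:%M:%S.%f"),
        "type": random.choice(EVENT_TYPES),
        "id": str(uuid.uuid4()),
    }


def send_event(kinesis, stream_name, event):
    """Write one event to the stream using a boto3 Kinesis client."""
    kinesis.put_record(
        StreamName=stream_name,
        Data=json.dumps(event).encode("utf-8"),
        PartitionKey=event["id"],
    )
    print("Event sent to Kinesis: %s" % json.dumps(event))
```

Calling `send_event(kinesis, "my-stream", make_event())` in a loop, with `kinesis` being a boto3 Kinesis client for us-east-1, would feed the stream much as the helper task does.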

Step 8: Inspect the "my-table" aggregates in DynamoDB

Success! You can now see data being written to the table in DynamoDB. Make sure you are in the correct AWS region, then click on my-table and hit the Explore Table button:

data table png

For each Timestamp and EventType pair, we see a Count, plus some CreatedAt and UpdatedAt metadata for debugging purposes. Our bucket size is 1 minute, and we have 5 discrete event types, hence the matrix of rows that we see.
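
Each Count value is maintained with the atomic increment described earlier. As a hedged illustration of how such an update can be expressed with boto3 (the project itself does this from Scala), the sketch below builds the arguments for a DynamoDB `update_item` call. The helper name `increment_kwargs` is hypothetical, and the key layout of Timestamp plus EventType is an assumption inferred from the table shown above.

```python
def increment_kwargs(bucket_timestamp, event_type, count):
    """Build update_item arguments that atomically add `count` to a bucket."""
    return {
        # Assumed composite key: one row per minute bucket and event type.
        "Key": {"Timestamp": bucket_timestamp, "EventType": event_type},
        # ADD creates the attribute if it is missing, otherwise increments it.
        "UpdateExpression": "ADD #count :inc",
        # Count is referenced through a placeholder because COUNT is a
        # DynamoDB reserved word.
        "ExpressionAttributeNames": {"#count": "Count"},
        "ExpressionAttributeValues": {":inc": count},
    }
```

With a boto3 table resource, for example `table = boto3.resource("dynamodb", region_name="us-east-1").Table("my-table")`, the update would be issued as `table.update_item(**increment_kwargs("2015-06-30T12:54:00.000000", "Green", 3))`. Because the addition happens inside DynamoDB, concurrent Lambda invocations do not overwrite each other's counts.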

Step 9: Shut everything down

Remember to shut everything down:

  • Kill the Python invoke event-loading script
  • Delete your LambdaStack CloudFormation stack
  • Delete your my-stream Kinesis stream
  • Delete your my-table DynamoDB table
  • Delete your ProcessingKinesisLambdaDynamoDB function in AWS Lambda
  • Delete the CloudWatch logs associated with the Lambda function
  • Exit your Vagrant guest
  • vagrant halt
  • vagrant destroy

4. Troubleshooting

This is a short list of our most frequently asked questions.

I got a credentials error

This project requires configuration of AWS credentials. Read more about AWS credentials; configure your AWS credentials using the AWS CLI like so:

$ aws configure

I found an issue with the project

Feel free to get in touch or raise an issue on GitHub!

5. Further reading

This example project is a direct port of our AWS Lambda Node.js Example Project, which in turn was based on our Spark Streaming Example Project. If you want to see this approach implemented in different languages or processing frameworks, definitely check those out!

All three of these example projects are based on an event processing technique called analytics-on-write. We are planning on exploring these techniques further in a new project, called Icebucket. Stay tuned for more on this!

Thoughts or questions? Come join us in our Discourse forum!
Vincent Ohprecio
Vincent was a data engineering intern at Snowplow. You can find him on GitHub and Twitter.