In this blog post we will discuss the small files problem in terms of our experiences with it at Snowplow. And we will argue that dealing with the small files problem - if you have it - is the single most important optimisation you can perform on your MapReduce process.
To give some necessary background on our architecture: Snowplow event trackers send user events to a pixel hosted on CloudFront, which logs those raw events to Amazon S3. Amazon’s CloudFront logging generates many small log files in S3: a relatively low-traffic e-commerce site using Snowplow generated 26,372 CloudFront log files over a six month period, containing 83,110 events - that’s just 3.2 events per log file.
Once the events have been collected in S3, Snowplow’s Hadoop job (written in Scalding) processes them, validating them and then enriching them with referer, geo-location and similar data; these enriched events are then written back to S3.
So you can see how our Enrichment process ran pretty directly into Hadoop’s small files problem. But quantifying the impact of small files on our job’s performance was impossible until we had a solution in place…
This week we implemented a solution to aggregate our tiny CloudFront logs into more sensibly sized input files - this enhancement will be released as part of Snowplow 0.8.6 shortly.
In testing this code and running before-and-after performance tests, we realised just how badly the small files problem was slowing down our Enrichment process. Here is what we found:
That’s right - aggregating the small files first reduced total processing time from 2 hours 57 minutes to just 9 minutes - of which 3 minutes was spent aggregating the files, and 4 minutes running our actual Enrichment process. That’s a speedup of 1,867%.
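That percentage follows directly from the two timings; as a quick sanity check in Ruby:

```ruby
# Sanity-check the quoted 1,867% speedup from the before/after timings.
before_mins = 177 # 2 hours 57 minutes, with small files
after_mins  = 9   # with small files aggregated first
speedup_pct = ((before_mins - after_mins).to_f / after_mins * 100).round
puts speedup_pct # => 1867
```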
To make the comparison as helpful as possible, here is the exact specification of the before and after tests. We have also added a second after run with a more realistic cluster size.
| Metric | Before (with small files) | After (with small files aggregated) | After (with smaller cluster) |
|:-------|:--------------------------|:------------------------------------|:-----------------------------|
| Source log files | 26,372 | 26,372 | 26,372 |
| Files read by job | Source log files | Aggregated log files | Aggregated log files |
| Location of files | Amazon S3 | HDFS on Core instances | HDFS on Core instances |
| `part-` files out | 23,618 | 141 | 141 |
| Cluster | 1 x m1.large, 18 x m1.medium | 1 x m1.large, 18 x m1.medium | 1 x m1.small, 1 x m1.small |
| Execution time | 177 minutes | 9 minutes | 39 minutes |
| Aggregate step time | - | 3 minutes | 11 minutes |
| ETL step time | 166 minutes | 4 minutes | 25 minutes |
| Norm. instance hours | 120 | 40 | 2 |
Health warning: this is one single benchmark, measuring the performance of the Snowplow Hadoop job using a single data set. We encourage you to run your own benchmarks.
This is an astonishing speed-up, which shows how badly the small files problem was impacting our Hadoop job. And aggregating the small files had another beneficial effect: the much smaller number of `part-` output files meant much faster loading of events into Redshift.
So how did we fix the small files problem for Snowplow? In the next section we will discuss possible solutions for you to consider, and in the last section we will go into some more detail on the solution we chose.
As we did our background research into solutions to the small files problem, three main schools of thought emerged:

1. Change the source process so that it writes fewer, larger files in the first place
2. Run a separate batch process to aggregate the small files into larger ones, keeping a manifest of which files have already been aggregated
3. Aggregate the small files as a preliminary step within the Hadoop job itself
For us, option 1 was out of the question, as we have no control over how CloudFront writes its log files.
Option 2 was interesting - and Snowplow users such as 99designs have successfully adopted this approach; if you are interested in exploring it further, Lars Yencken from 99designs has shared a CloudFront log aggregation script in Python as a gist. Overall, however, option 2 seemed to us to introduce more complexity (a new long-running process to run) and potential fragility (a manifest file to maintain). We had super-interesting discussions with the Snowplow community about this in this Google Groups thread and this GitHub issue.
In the end, though, we opted for option 3 - aggregating the small files as a preliminary step in our own Hadoop jobflow.
With that decided, we then looked for options to aggregate and compact small files on Hadoop, identifying three possible solutions:

1. filecrush - a tool for combining small files on Hadoop
2. Consolidator - a file consolidation tool from the dfs-datastores library
3. S3DistCp - Amazon's S3-optimised version of DistCp, which adds a `--groupBy` option for aggregating files (which the original DistCp seems to lack)
After trying to work with filecrush and Consolidator, ultimately we went with S3DistCp for Snowplow. In the next section, we will look at exactly how we set it up.
Once we had chosen S3DistCp, we had to update our ETL process to include it in our jobflow. Luckily, the S3DistCp documentation has an example on aggregating CloudFront files:
```bash
./elastic-mapreduce --jobflow j-3GY8JC4179IOK --jar \
  /home/hadoop/lib/emr-s3distcp-1.0.jar \
  --args '--src,s3://myawsbucket/cf,\
  --dest,hdfs:///local,\
  --groupBy,.*XABCD12345678.([0-9]+-[0-9]+-[0-9]+-[0-9]+).*,\
  --targetSize,128,\
  --outputCodec,lzo,--deleteOnSuccess'
```
Note that as well as aggregating the small files into 128 megabyte files, this step also changes the encoding to LZO. As the Amazon documentation explains it:
“Data compressed using LZO can be split into multiple maps as it is decompressed, so you don’t have to wait until the compression is complete, as you do with Gzip. This provides better performance when you analyze the data using Amazon EMR.”
We only needed to make a few changes to this example code for our own ETL:

- The CloudFront distribution ID in the example (`XABCD12345678`) would vary for each Snowplow user, so it made more sense to drop this from the `--groupBy` regular expression
- `--deleteOnSuccess` is dangerous - we don't want to delete our source data and leave the only copy on a transient Hadoop cluster
Given the above, our updated `--groupBy` regular expression was:
Now, all we needed to do was add the call to S3DistCp into our jobflow before our main ETL step. We use the excellent Elasticity Ruby library by Rob Slifka to execute our jobs, so calling S3DistCp was a matter of adding the extra step to our jobflow in Ruby:
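A minimal sketch of such a step, assuming Elasticity's `CustomJarStep` API - note that the bucket names, paths and the `--groupBy` pattern below are illustrative placeholders, not Snowplow's actual values:

```ruby
# Sketch: build the S3DistCp argument list, then wire it into the jobflow.
# All paths, bucket names and the regex are illustrative placeholders.
S3DISTCP_JAR = "/home/hadoop/lib/emr-s3distcp-1.0.jar"

def s3distcp_args(src, dest)
  ["--src",         src,
   "--dest",        dest,
   "--groupBy",     ".*\\.([0-9]+-[0-9]+-[0-9]+-[0-9]+).*",
   "--targetSize",  "128",
   "--outputCodec", "lzo"] # no --deleteOnSuccess - we keep the source data
end

args = s3distcp_args("s3://my-cloudfront-logs/", "hdfs:///local/aggregated")

# With the Elasticity gem (assumed API):
#   step = Elasticity::CustomJarStep.new(S3DISTCP_JAR)
#   step.arguments = args
#   jobflow.add_step(step)
```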
And then we had to update our ETL job to take the `--dest` of the S3DistCp step as its own input:
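As a sketch (again with illustrative names), the change is simply to point the ETL step's input at the S3DistCp destination on HDFS rather than at the raw CloudFront logs on S3:

```ruby
# Sketch: the ETL step now reads the aggregated files from HDFS (the --dest
# of the S3DistCp step) instead of the raw CloudFront logs on S3.
# Paths and the jar location are illustrative placeholders.
AGGREGATED_LOGS = "hdfs:///local/aggregated" # matches S3DistCp's --dest

def etl_step_args(input, output)
  ["--input", input, "--output", output]
end

args = etl_step_args(AGGREGATED_LOGS, "s3://my-enriched-events/")

# With the Elasticity gem (assumed API):
#   etl = Elasticity::CustomJarStep.new("s3://my-jars/snowplow-etl.jar")
#   etl.arguments = args
#   jobflow.add_step(etl)
```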
And that was it! If you want to see all of the code excerpted above, you can find it in the Snowplow project on GitHub. We did not have to make any changes to our main Hadoop ETL job, because Elastic MapReduce can handle LZO-compressed files invisibly to the job reading them. And no doubt the switch to LZO also contributed to the excellent performance we saw above.
So that’s everything - hopefully this post has helped to illustrate just how badly small files can slow down your Hadoop job, and what you can do about it: if you have a small file problem on Hadoop, there’s now no excuse not to fix it!