Reprocessing bad rows of Snowplow data using Hive, the JSON Serde and Qubole
This post is outdated. For more documentation on debugging and recovering bad rows, please visit:
- Debugging bad rows in Elasticsearch and Kibana
- Debugging bad rows in Elasticsearch using curl (without Kibana)
- Snowplow 81 release post (for recovering bad rows)
- Hadoop Event Recovery
One of the distinguishing features of the Snowplow data pipeline is the handling of “bad” data. Every row of incoming, raw data is validated. When a row fails validation, it is logged in a “bad rows” bucket on S3 alongside the error message that was generated by the failed validation. That means you can keep track of the number of rows that fail validation, and have the opportunity to update and then reprocess those bad rows. (This makes Snowplow different from traditional web analytics platforms, that simply ignore bad rows of data, and provide no insight into the volume of incoming data that ends up being ignored.)
This functionality was crucial in spotting that, in mid-August, Amazon made an undocumented update the CloudFront collector file format. This resulted in a sudden spike in the number of “bad rows” generated by Snowplow, as the
cs-uri-query field format changed from the format the Enrichment process expected. (For details of the change, see this blog post, and the links in it.) Amazon has since rolled back the update, and we have since updated Snowplow to be able to process rows in both formats. However, Snowplow users will have three weeks of data with lines of data missing, that ideally need to be reprocessed using the updated Snowplow version.
In this blog post, we will walk through:
- How to use Apache Hive, Qubole and Robert Congui’s JSON serde to monitor the number of bad rows generated over time
- How to use the same tools to reprocess the bad rows of data, so that they are added to your Snowplow data in Redshift / PostgreSQL
The steps necessary to reprocess the data will be very similar to those required regardless of the reason that the reprocessing is necessary: as a result, this blog post should be useful for anyone interested in using the bad rows functionality to debug and improve the robustness of their event data collection. It should also be useful for anyone interested in using Hive and the JSON serde to process JSON data in S3. (Bad row data is stored by Snowplow in JSON format.) We will use Qubole, our preferred platform for running Hive jobs on data in S3, which we previously introduced in this blog post.
- Understanding how Snowplow handles bad rows
- Processing the bad rows data using the JSON serde, Hive and Qubole
- Plotting the number of bad rows over time
- Reprocessing bad rows
The Snowplow enrichment process takes input lines of data, in the form of collector logs. It validates the format of the data in each of those lines. If the format is as expected, it performs the relevant enrichments on that data (e.g. referer parsing, geo-IP lookups), and writes the enriched data to the Out Bucket on S3, from where it can be loaded into Redshift / PostgreSQL. If the input line of data fails the validation, it gets written to the Bad Rows Bucket on S3.
:s3: :region: ADD HERE :buckets: :assets: s3://snowplow-hosted-assets # DO NOT CHANGE unless you are hosting the jarfiles etc yourself in your own bucket :log: ADD HERE :in: ADD HERE :processing: ADD HERE :out: ADD HERE WITH SUB-FOLDER # e.g. s3://my-out-bucket/events :out_bad_rows: ADD HERE # e.g. s3://my-out-bucket/bad-rows :out_errors: ADD HERE # Leave blank unless :continue_on_unexpected_error: set to true below :archive: ADD HERE
Each bad row is a JSON containing just two fields:
- A field called
line(of type String), which is the raw line of data from the collector log
- A field called
errors(an Array of Strings), which includes an error message for every validation test the line failed
An example row generated for the Snowplow website, caused by Amazon’s CloudFront log file format update, is shown below (formatted to make it easier to read):
There are a couple of ways to process JSON data in Hive. For this tutorial, we’re going to use Roberto Congiu’s Hive-JSON-Serde. This is our preferred method of working with JSONs in Hive, where your complete data set is stored as a series of JSONs. (When you have a single JSON-formatted field in a regular Hive table, we recommend using the
get_json_object UDF to parse the JSON data.)
Now that we have placed the JSON serde in an S3 location that is accessible to us when we run Hive, we are in a position to fire up Qubole and start analyzing our bad rows data. Log into Qubole via the web UI to get started and open up the Composer window. (If you have not tried Qubole yet, we recommend you read our guide to getting started with Qubole.)
Now enter the following in the Qubole Composer:
ADD JAR s3://snowplow-hosted-assets/third-party/rcongiu/json-serde-1.1.6-jar-with-dependencies.jar;
After a short period Qubole should alert you that the JAR has been successfully uploaded:
Now we need to define a table so that Hive can query our bad row data in S3. Execute the following query in the Qubole Composer, making sure that you update the
LOCATION setting to point to the location in S3 where your bad rows are stored. (This can be worked out from your EmrEtlRunner’s
config.yml file, as explained above).
CREATE EXTERNAL TABLE `bad_rows` ( line string, errors array<string> ) PARTITIONED BY (run string) ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' STORED AS TEXTFILE LOCATION 's3n://snowplow-data/snplow/bad-rows/';
Our table is partitioned by
run - each time the Snowplow enrichment process is run (in our case daily), any bad rows are saved in their own separate subfolder labelled
run=2013-xx-xx.... Let’s recover those partitions, by executing the following:
ALTER TABLE `bad_rows` RECOVER PARTITIONS;
We run the Snowplow ETL once a day. As a result, each “run” represents one days worth of data. By counting the number of bad rows per run, we effectively calculate the number of bad rows of data generated per day. We can do that by executing the following query:
SELECT run, count(*) FROM `bad_rows` GROUP BY run;
Execute that in Qubole, and then download your results. (By clicking the Download link in the UI. If you open them in Excel, you should see something as follows:
|Run ID||Number of bad rows|
(We have added the headers to the table above - these will not be downloaded)
We can plot the data directly in Excel:
- We have no bad rows before August 17th, when Amazon updated their Cloudfront log format
- We then have bad rows every day since. (In our case, this varies between 2-25. This is on the Snowplow site, which attracts c.200 uniques per day.)
Using plots like the one above to spot emerging problems with your Snowplow data pipeline is one thing. When you’ve identified the cause of the problem, and fixed it (as we have), you then need to reprocess those bad lines of data.
Fortunately, this is pretty straightforward. We need to extract the bad lines out of the JSONs, and write them back into a new location in S3 in their raw form. We can then set the
IN bucket on the EmrEtlRunner to point to this new location, and run the updated Enrichment process on the data.
To extract the raw lines of data out of the JSONs, we first create another external table in Hive, this time in the location where we will save the data to be reprocessed:
CREATE EXTERNAL TABLE `data_to_reprocess` ( line string ) ROW FORMAT DELIMITED LINES TERMINATED BY '\n' STORED AS TEXTFILE LOCATION 's3n://qubole-analysis/data-to-reprocess/snplow/2013-09-11/';
- We’ve created our table in the special bucket that we’ve given Qubole unrestricted write access to
- We’ve created a specific folder in that bucket for the new data, so it will be easy to find later
Now that we’ve created our table, we need to insert into it the bad rows to reprocess:
INSERT INTO TABLE `data_to_reprocess` SELECT line FROM `bad_rows`;
Note how we are only writing the actual raw line of data into the new table (and ignoring everything else in the
bad_rows table, including both the
run and the actual error message itself).
Bingo! When the query is complete, the data to reprocess is available in the new bucket we’ve created:
We now need to run the Snowplow Enrichment process on this new data set. We do that using EmrEtlRunner. Navigate to the server you run EmrEtlRunner from, and navigate to the directory it is installed in.
Now, create a copy of your EmrEtlRunner config.yml with a suitable name e.g.
config-process-bad-rows-2013-09-11.yml and update the In Bucket to point to the location of the the data to be reprocessed is (i.e. the location of the Hive
data_to_reprocess table). Don’t forget as well to update (if you haven’t already done so) the ETL to the latest version, which can handle the change in Amazon’s CloudFront log file format:
:snowplow: :hadoop_etl_version: 0.3.4 # Version of the Hadoop ETL
Now execute the following command at the command line:
$ bundle exec bin/snowplow-emr-etl-runner --config config/config-process-bad-rows-2013-09-11.yml
Make sure you update the path to point at the name of the config file you created in the previous step. This should kick off the Enrichment process in EMR. Once it has been completed, you can run the StorageLoader to load the newly processed data into Redshift / PostgreSQL as normal:
$ cd ../../4-storage/storage-loader $ bundle exec bin/snowplow-storage-loader --config config/config.yml
Done! The data that was previously excluded has now been added to your Snowplow database!