Snowplow Event Recovery 0.1.0 released


We are excited to announce the release of Snowplow Event Recovery.

All Snowplow pipelines are non-lossy: if something goes wrong during schema validation or enrichment, the payload, along with the errors that occurred, is stored in bad rows storage (a data stream or object storage) instead of being discarded.

The goal of recovery is to fix the payloads contained in these bad rows so that they are ready to be processed successfully by a Snowplow enrichment platform.

Snowplow Event Recovery lets you run data recoveries on data emitted by real-time Snowplow pipelines on AWS and GCP.

Please read on after the fold for:

  1. Overview
  2. Recovery scenarios
  3. Snowplow Event Recovery on AWS
  4. Snowplow Event Recovery on GCP
  5. Roadmap
  6. Getting help

1. Overview

Our current approach to data recovery, Hadoop Event Recovery, suffers from a few issues:

Snowplow Event Recovery aims to tackle most of these issues and make the data recovery process:

2. Recovery scenarios

Keeping these goals in mind, we started by thinking about what a recovery is, in essence. For us, it is a collection of what we’ve come to call recovery scenarios.

So, what are recovery scenarios? They are modular and composable processing units, each of which deals with a specific case you want to recover from.

As such, recovery scenarios are, at their essence, made up of two things:

For example, if we wanted to recover a set of bad rows consisting of:

We would use a different recovery scenario for each of them, so three in total:

2.1 Out of the box recovery scenarios

For the most common recovery scenarios, it makes sense to support them out of the box and not require any coding. From the recoveries we’ve run in the past, we’ve compiled a list of recovery scenarios that are supported out of the box by Snowplow Event Recovery.

The table below shows what this list is made of. It contains:

| Name | Mutation | Example use case | Parameters |
|------|----------|------------------|------------|
| Pass through | Does not mutate the payload in any way | A missing schema that was added after the fact | error |
| Replace in query string | Replaces part of the query string according to a regex | Misspecified a schema when using the Iglu webhook | error, toReplace, replacement |
| Remove from query string | Removes part of the query string according to a regex | Property was wrongfully tracked and is not part of the schema | error, toRemove |
| Replace in base64 field in query string | Replaces part of a base64 field in the query string according to a regex | Property was sent as a string but should be numeric | error, base64Field (cx or ue_px), toReplace, replacement |
| Replace in body | Replaces part of the body according to a regex | Misspecified a schema when using the Iglu webhook | error, toReplace, replacement |
| Remove from body | Removes part of the body according to a regex | Property was wrongfully tracked and is not part of the schema | error, toRemove |
| Replace in base64 field in body | Replaces part of a base64 field in the body according to a regex | Property was sent as a string but should be numeric | error, base64Field (cx or ue_px), toReplace, replacement |

Note that every recovery scenario that relies on a regex supports capture groups. For example, to remove curly brackets but keep their content, we would set the toReplace argument to \{(.*)\} and the replacement argument to $1 (capture groups are numbered starting from one).
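As a rough illustration of the capture-group example above, the following shell sketch performs the same substitution with sed. It is only an analogy and not how Snowplow Event Recovery applies its regexes: sed refers to the first capture group as \1 rather than $1, and the strip_braces function name and the sample query string are made up for this example.

```shell
# Hypothetical helper: removes one pair of curly brackets but keeps their content.
# -E enables extended regexes, so \{ and \} match literal brackets and ( ) capture.
strip_braces() {
  sed -E 's/\{(.*)\}/\1/'
}

# Sample query string (made up for illustration).
printf '%s\n' 'page={home}&id=1' | strip_braces
# prints: page=home&id=1
```

Because .* is greedy, a line containing several bracket pairs would only lose the first opening and the last closing bracket, so a real toReplace pattern may need to be more specific.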

2.2 Custom recovery scenarios

In addition to the outlined scenarios, we still wanted to make the idea of recovery scenarios extensible. As such, if the recovery you want to perform is not covered by the ones listed above, you can define your own by following the guide in the repository.

If you think your recovery scenario will be useful to others, please consider opening a pull request!

2.3 Configuration

Once you have identified the different recovery scenarios you will want to run, you can combine them in the configuration that we will feed to the recovery job. Here, we make use of each and every one of them as a showcase.

{
  "schema": "iglu:com.snowplowanalytics.snowplow/recoveries/jsonschema/1-0-0",
  "data": [
    # Schema com.acme/my_schema/jsonschema/1-0-0 was added after the fact
    {
      "name": "PassThrough",
      "error": "Could not find schema with key iglu:com.acme/my_schema/jsonschema/1-0-0 in any repository"
    },
    # Typo in the schema name when using the Iglu webhook
    {
      "name": "ReplaceInQueryString",
      "error": "Could not find schema with key iglu:com.snowplowanalytics.snowplow/screen_vie/jsonschema/1-0-0 in any repository",
      "toReplace": "schema=iglu%3Acom.snowplowanalytics.snowplow%2Fscreen_vie%2Fjsonschema%2F1-0-0",
      "replacement": "schema=iglu%3Acom.snowplowanalytics.snowplow%2Fscreen_view%2Fjsonschema%2F1-0-0"
    },
    # Removes illegal curlies in query strings (e.g. templates that haven't been filled)
    {
      "name": "RemoveFromQueryString",
      "error": "Exception extracting name-value pairs from querystring",
      "toRemove": "\\{.*\\}"
    },
    # Replaces a string by an integer in ue_px, it can be reused for ReplaceInBase64FieldInBody
    {
      "name": "ReplaceInBase64FieldInQueryString",
      "error": "instance type (string) does not match any allowed primitive type (allowed: [\"integer\"])\n level: \"error\"\n schema: {\"loadingURI\":\"#\",\"pointer\":\"/properties/sessionIndex\"",
      "base64Field": "ue_px",
      "toReplace": "\"sessionIndex\":\"(\\d+)\"",
      # $1 refers to the first capture group
      "replacement": "\"sessionIndex\":$1"
    },
    # Replaces the device created timestamp by a string
    {
      "name": "ReplaceInBody",
      "error": "instance type (integer) does not match any allowed primitive type (allowed: [\"string\"])\n level: \"error\"\n schema: {\"loadingURI\":\"#\",\"pointer\":\"/items/properties/dtm\"",
      "toReplace": "\"dtm\":(\\d+)",
      "replacement": "\"dtm\":\"$1\""
    },
    # Removes a field which shouldn't be there
    {
      "name": "RemoveFromBody",
      "error": "object instance has properties which are not allowed by the schema: [\"test\"]",
      "toRemove": "\"test\":\".*\",?"
    },
    # Same as ReplaceInBase64FieldInQueryString
    {
      "name": "ReplaceInBase64FieldInBody",
      "error": "instance type (string) does not match any allowed primitive type (allowed: [\"integer\"])\n level: \"error\"\n schema: {\"loadingURI\":\"#\",\"pointer\":\"/properties/sessionIndex\"",
      "base64Field": "ue_px",
      "toReplace": "\"sessionIndex\":\"(\\d+)\"",
      # $1 refers to the first capture group
      "replacement": "\"sessionIndex\":$1"
    },
    # Our custom recovery scenario, replaces a wrong Iglu webhook path
    {
      "name": "ReplaceInPath",
      "error": "Payload with vendor com.iglu and version v1 not supported",
      "toReplace": "com.iglu/v1",
      "replacement": "com.snowplowanalytics.iglu/v1"
    }
  ]
}
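To make the ReplaceInBase64FieldInQueryString entry in the configuration above more concrete, here is a small shell sketch of the idea behind a base64-field replacement: decode the field, apply the regex, and encode the result again. It is an approximation under stated assumptions, not the job's actual implementation: it uses the standard base64 alphabet (the real ue_px field may be URL-safe encoded), it writes [0-9]+ and \1 because sed does not support \d or $1, and the fix_session_index function and the sample payload are hypothetical.

```shell
# Hypothetical helper: takes a base64-encoded JSON string as its first argument,
# turns a quoted numeric sessionIndex into an integer, and re-encodes the result.
fix_session_index() {
  printf '%s' "$1" \
    | base64 --decode \
    | sed -E 's/"sessionIndex":"([0-9]+)"/"sessionIndex":\1/' \
    | base64 \
    | tr -d '\n'
}

# Made-up payload standing in for the content of a ue_px field.
original=$(printf '%s' '{"sessionIndex":"3"}' | base64 | tr -d '\n')
fixed=$(fix_session_index "$original")

# Decode the result to check the mutation.
printf '%s' "$fixed" | base64 --decode
# prints: {"sessionIndex":3}
```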

2.4 Testing

You can test an entire recovery without running it, or test a custom recovery scenario, by following the dedicated guide in our repository.

3. Snowplow Event Recovery on AWS

For AWS users, the recovery will take the form of a Spark job which you can run through EMR, for example. It will read bad rows from an S3 location, run the recovery on this data, and store the recovered payloads in another S3 location.

You can run the job using the JAR directly (which is hosted at s3://snowplow-hosted-assets/3-enrich/snowplow-event-recovery/):

spark-submit \
  --class com.snowplowanalytics.snowplow.event.recovery.Main \
  --master master-url \
  --deploy-mode deploy-mode \
  snowplow-event-recovery-spark-0.1.0.jar \
  --input s3://bad-rows-location/ \
  --output s3://recovered-collector-payloads-location/ \
  --config base64-encoded-configuration

Or through an EMR step:

aws emr add-steps --cluster-id j-XXXXXXXX --steps \
  Name=snowplow-event-recovery,Type=CUSTOM_JAR,Jar=s3://snowplow-hosted-assets/3-enrich/snowplow-event-recovery/snowplow-event-recovery-spark-0.1.0.jar,MainClass=com.snowplowanalytics.snowplow.event.recovery.Main,Args=[--input,s3://bad-rows-location/,--output,s3://recovered-collector-payloads-location/,--config,base64-encoded-configuration],ActionOnFailure=CONTINUE

Note that the configuration discussed above will need to be base64-encoded.
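As a minimal sketch of that encoding step, the shell snippet below writes a one-scenario configuration to a temporary file and base64-encodes it onto a single line. The temporary file and the encoded_config variable are illustrative choices, and the snippet assumes the job expects standard (not URL-safe) base64 without line breaks, which this post does not spell out.

```shell
# Write a minimal configuration (the PassThrough scenario from section 2.3)
# to a temporary file; in practice this would be your own configuration file.
config_file=$(mktemp)
cat > "$config_file" <<'EOF'
{
  "schema": "iglu:com.snowplowanalytics.snowplow/recoveries/jsonschema/1-0-0",
  "data": [
    {
      "name": "PassThrough",
      "error": "Could not find schema with key iglu:com.acme/my_schema/jsonschema/1-0-0 in any repository"
    }
  ]
}
EOF

# Encode the file and strip the line wrapping that some base64 tools add.
encoded_config=$(base64 < "$config_file" | tr -d '\n')
echo "$encoded_config"
```

The resulting value is what would be passed in place of base64-encoded-configuration in the commands above.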

4. Snowplow Event Recovery on GCP

For GCP users, recovery builds on the data output by the Snowplow Google Cloud Storage Loader and takes the shape of a Beam job runnable on Dataflow. It will read bad rows from a GCS location specified through a pattern, run the recovery on this data, and publish the recovered payloads to a PubSub topic (ideally the PubSub topic containing your raw payloads, so that fixed payloads can be picked up directly by the enrichment process).

You can run the job using the zip archive, which can be downloaded from Bintray:

./bin/snowplow-event-recovery-beam \
  --runner=DataFlowRunner \
  --project=project-id \
  --zone=europe-west2-a \
  --gcpTempLocation=gs://location/ \
  --inputDirectory=gs://bad-rows-location/* \
  --outputTopic=projects/project/topics/topic \
  --config=base64-encoded-configuration

Or using a Docker container:

# The -v and -e options are only needed if running outside GCP
docker run \
  -v $PWD/config:/snowplow/config \
  -e GOOGLE_APPLICATION_CREDENTIALS=/snowplow/config/credentials.json \
  snowplow-docker-registry.bintray.io/snowplow/snowplow-event-recovery:0.1.0 \
  --runner=DataFlowRunner \
  --project=project-id \
  --zone=europe-west2-a \
  --gcpTempLocation=gs://location/ \
  --inputDirectory=gs://bad-rows-location/* \
  --outputTopic=projects/project/topics/topic \
  --config=base64-encoded-configuration

Note that, here too, the configuration discussed above will need to be base64-encoded.

5. Roadmap

Continuing our data quality journey, we will next work towards a new bad row format. You can read more about this initiative in our RFC.

On the Snowplow front, the next releases will include:

After these two releases, the pipeline team will focus its effort on the new bad row format.

6. Getting help

For more details on this release, please check out the release notes on GitHub.

If you have any questions or run into any problem, please visit our Discourse forum.
