Snowplow BigQuery Loader released

03 December 2018  •  Anton Parkhomenko

We are tremendously excited to announce the public release of the Snowplow BigQuery Loader. Google BigQuery is a highly scalable, fully managed data warehouse with real-time ingestion and rich support for semi-structured data. Since its launch, many Snowplow users and prospective users have asked us to support BigQuery as a storage target for their Snowplow data. This release does just that.

The BigQuery Loader was the key “missing piece” in the Google Cloud Platform version of the Snowplow pipeline, following Google Cloud Pub/Sub support in the Stream Collector and Beam Enrich in Snowplow core. This release brings us very close to an initial version of Snowplow that runs end-to-end on GCP, making Snowplow a truly multi-cloud platform.

Read on below the fold for:

  1. Google Cloud Platform (GCP) support
  2. Google BigQuery
  3. Snowplow BigQuery Loader
  4. Setup
  5. Roadmap
  6. Getting help

1. Google Cloud Platform (GCP) support

One year ago we published our “Porting Snowplow to Google Cloud Platform” RFC, which laid the groundwork for native support of Google Cloud Platform across the Snowplow core components: collector, enrichment, warehouse loading and event archival.

Since that announcement, we have been busy exploring the Google Cloud Platform and working on prototype components. R101 Neapolis introduced Google Cloud Pub/Sub support to our Stream Collector and added provisional Cloud Pub/Sub support to Stream Enrich (our existing real-time enrichment process for Kinesis).

R110 Valle dei Templi then added a new Beam Enrich application, which reads raw events from a Pub/Sub subscription and performs the standard enrichment process using Google Cloud Dataflow, a service for distributed data processing; this replaced the Cloud Pub/Sub support in Stream Enrich.

And with today’s release of the BigQuery Loader, you can now deploy your entire pipeline on Google Cloud and analyze Snowplow enriched data using best-in-class tools such as BigQuery and Cloud Dataflow.

2. Google BigQuery

Google BigQuery is gaining increasing adoption and mind-share as a cloud-native, elastically scalable data warehouse. It provides seamless integration with many Google services, including other GCP products and Google Analytics, supports near real-time analytics and offers a familiar SQL interface for analysts.

BigQuery has a unique combination of features and characteristics that make it a perfect addition to a growing list of storage targets supported by Snowplow:

  • Elastic compute - BigQuery scales elastically, so even very computationally intensive queries run quickly
  • Real-time ingestion - with Snowplow loading into BigQuery, we can sink tremendous volumes of data and query it with sub-second delays. This makes BigQuery appealing for use cases like fraud detection or real-time recommendation engines
  • Fully managed - unlike with Amazon Redshift, for example, you don’t need to worry about warehouse maintenance, such as VACUUMing or cluster scaling - our test pipelines have been handling tens of millions of records per day without any disruption
  • Support for semi-structured data - Google BigQuery has support for STRUCTs (JSON objects) and REPEATED elements (arrays), and enforces statically known structures for these entities so that all queries are type-checked
  • On-demand pricing model - with BigQuery’s on-demand pricing you pay only for the data your queries scan. This makes BigQuery better suited than Redshift for users who want to warehouse very large volumes of data
  • Extensive ecosystem - as one of the most popular analytics warehouses, BigQuery already has a wide range of integrations, access to various public datasets and a growing list of partners. In particular, many Google services, including Campaign Manager (formerly DoubleClick), Ad Manager and YouTube, support exporting data directly into BigQuery, where it can now be joined with Snowplow data

3. Snowplow BigQuery Loader

3.1. Overview

We have been able to take the lessons learnt building loaders for Redshift and SnowflakeDB and apply them to the BigQuery Loader.

For example, the BigQuery Loader automatically updates table definitions in BigQuery when events and entities (i.e. contexts) are received with new schema versions. Simply ensure that any new schema versions have been uploaded to your Iglu registry, then start sending events with the new schema: the BigQuery Loader will automatically create the corresponding additional column in your BigQuery events table.
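To illustrate how a new schema version turns into a new column, here is a minimal Python sketch. It assumes column names follow the contexts_com_acme_product_context_1_0_0 pattern shown in the Roadmap section, with dots in the vendor and hyphens in the name simply becoming underscores. The helpers column_name and missing_columns are hypothetical, and the real Loader's naming rules (for example, for camelCase names or for self-describing events rather than contexts) may differ.

```python
import re

# Hypothetical parser for Iglu URIs such as
# "iglu:com.acme/product_context/jsonschema/1-0-0" (vendor/name/format/version)
IGLU_URI = re.compile(
    r"^iglu:(?P<vendor>[^/]+)/(?P<name>[^/]+)/(?P<format>[^/]+)/"
    r"(?P<model>\d+)-(?P<revision>\d+)-(?P<addition>\d+)$"
)


def column_name(uri: str, prefix: str = "contexts") -> str:
    """Build a column name such as contexts_com_acme_product_context_1_0_0 (assumed scheme)."""
    m = IGLU_URI.match(uri)
    if m is None:
        raise ValueError(f"not an Iglu URI: {uri}")
    vendor = m["vendor"].replace(".", "_").replace("-", "_")
    name = m["name"].replace("-", "_")
    version = f'{m["model"]}_{m["revision"]}_{m["addition"]}'
    return f"{prefix}_{vendor}_{name}_{version}".lower()


def missing_columns(existing: set, incoming_uris: list) -> list:
    """Columns the events table would need before rows with these schemas can load."""
    needed = []
    for uri in incoming_uris:
        col = column_name(uri)
        if col not in existing and col not in needed:
            needed.append(col)
    return needed
```

For example, if the table already has contexts_com_acme_product_context_1_0_0, an event using version 1-0-1 of that schema makes missing_columns return ["contexts_com_acme_product_context_1_0_1"].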

It was also great to integrate with BigQuery’s expressive SQL type system. To keep our event data as structured as possible, each self-describing schema is mapped to a BigQuery STRUCT column that reflects all of its properties. This also works with any kind of nested structure, including deeply nested arrays: any shredded type can be represented as a separate column, rather than as a separate table as Redshift requires.
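As a rough illustration of the STRUCT mapping, the following simplified Python sketch converts a JSON Schema into a BigQuery field definition shaped like the REST API's table field schema (name, type, mode, fields). It is not Iglu's actual DDL generator: it assumes each property has a single type (optionally nullable), maps objects to RECORD and arrays to REPEATED, and falls back to STRING for anything it cannot type precisely.

```python
# Simplified JSON Schema -> BigQuery type mapping (a sketch, not Iglu's implementation)
SCALARS = {"string": "STRING", "integer": "INTEGER",
           "number": "FLOAT", "boolean": "BOOLEAN"}


def to_bq_field(name, schema, required=False):
    """Return a BigQuery field dict with name, type, mode (and fields for RECORDs)."""
    declared = schema.get("type")
    types = declared if isinstance(declared, list) else [declared]
    nullable = "null" in types
    types = [t for t in types if t != "null"]
    kind = types[0] if len(types) == 1 else None  # unions are not typed precisely
    mode = "REQUIRED" if required and not nullable else "NULLABLE"

    if kind == "object":
        # Each property becomes a sub-field of a STRUCT (RECORD) column
        needed = set(schema.get("required", []))
        fields = [to_bq_field(key, sub, key in needed)
                  for key, sub in sorted(schema.get("properties", {}).items())]
        return {"name": name, "type": "RECORD", "mode": mode, "fields": fields}

    if kind == "array":
        item = to_bq_field(name, schema.get("items", {}), required=True)
        if item["mode"] == "REPEATED":
            # BigQuery cannot nest REPEATED directly, so declare inner arrays as
            # STRING elements (e.g. to hold them JSON-encoded)
            item = {"name": name, "type": "STRING"}
        item["mode"] = "REPEATED"
        return item

    # Scalars map directly; anything else falls back to STRING
    return {"name": name, "type": SCALARS.get(kind, "STRING"), "mode": mode}
```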

These two features are powered by our Iglu schema technology, which as of R10 Tiflis includes a full-featured BigQuery DDL abstract syntax tree and support for JSON Schema to BigQuery DDL generation.

3.2. Architecture

Unlike our existing loaders, the BigQuery Loader’s architecture is entirely real-time and designed for unbounded data streams. It does not use cloud/blob storage to stage the data, and it makes no assumptions about data volumes. As with the other components of the Snowplow GCP pipeline, the Loader uses Cloud Pub/Sub topics exclusively to read enriched events, sink bad rows and handle all related tasks.

The Snowplow BigQuery Loader consists of two applications:

  1. The Loader itself, a Cloud Dataflow job that transforms a stream of enriched events into BigQuery format and ingests them
  2. The Mutator, a stand-alone JVM application that performs the necessary ALTER TABLE statements

Figure: Snowplow BigQuery Loader architecture

The two applications communicate through the typesTopic. The Loader writes every type it encounters to that topic; the Mutator reads from that topic and mutates the events table as necessary. The Mutator should be running constantly, consuming Pub/Sub messages.
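The typesTopic handshake can be simulated in memory to show the division of labour. In this hypothetical sketch, a queue.Queue stands in for the Pub/Sub topic, a Python set stands in for the events table's columns, and adding to that set stands in for the Mutator's ALTER TABLE. The assumption that the Loader publishes each type only the first time it sees it is ours, not necessarily the Loader's behaviour.

```python
import queue


def loader_publish_types(events, types_topic, seen):
    """Loader side: publish each schema URI to the types topic the first time it appears."""
    for event in events:
        for uri in event["schemas"]:
            if uri not in seen:
                seen.add(uri)
                types_topic.put(uri)


def mutator_drain(types_topic, table_columns, column_for):
    """Mutator side: consume pending types and add any column the table lacks."""
    added = []
    while True:
        try:
            uri = types_topic.get_nowait()
        except queue.Empty:
            return added  # nothing left to consume for now
        column = column_for(uri)
        if column not in table_columns:
            table_columns.add(column)  # stand-in for the Mutator's ALTER TABLE
            added.append(column)
```

A real Mutator would keep consuming indefinitely rather than returning when the topic is empty, which is why it should be left running.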

Alongside the typesTopic, the Loader makes use of two other Pub/Sub topics:

  1. badRows - rows that for some reason couldn’t be transformed into BigQuery format. These could be caused by an Iglu registry outage, or by an unexpected schema patch or overwrite. This closely resembles the “shredded bad” data generated by our RDB Shredder for Redshift, and contains the reason for failure and the raw enriched JSON
  2. failedInserts - rows that passed transformation but failed during the actual insertion stage. Unlike badRows data, these records unfortunately do not contain the reason for failure; they are stored in ready-to-insert BigQuery row format. The main source of failed inserts is the short window between the Loader processing the first event with a new schema and the Mutator performing the necessary mutation
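The split between the two topics can be sketched as a routing function. In this hypothetical Python example, transform and insert are caller-supplied stand-ins for the BigQuery transformation and the streaming insert, and the bad-row field names (line, errors) are illustrative only, not the Loader's actual bad-row format.

```python
def route(enriched_rows, transform, insert):
    """Split rows into loaded, bad rows and failed inserts (illustrative only)."""
    loaded, bad_rows, failed_inserts = [], [], []
    for raw in enriched_rows:
        try:
            row = transform(raw)
        except ValueError as err:
            # Transformation failed: keep the raw enriched data and the reason
            bad_rows.append({"line": raw, "errors": [str(err)]})
            continue
        if insert(row):
            loaded.append(row)
        else:
            # Insert rejected: only the ready-to-insert row is kept, with no reason
            failed_inserts.append(row)
    return loaded, bad_rows, failed_inserts
```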

Bad rows and failed inserts have different formats, causes and recovery strategies.

Bad rows should be extremely rare. To recover them, sink the data to Cloud Storage (we recommend using our snowplow-google-cloud-storage-loader), then apply an appropriate recovery strategy depending on the root cause. Stay tuned for the release of snowplow-event-recovery, which will do just this.

Failed inserts, in turn, can usually just be forwarded to BigQuery again using the auxiliary BigQuery Forwarder job. If they were caused by the Mutator’s delay, BigQuery will accept these rows the second time.
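Here is a minimal sketch of why a simple retry usually works, assuming an insert is rejected only because the row references a column the table does not have yet. try_insert and forward are hypothetical stand-ins for BigQuery and the Forwarder job, not their real implementations.

```python
def try_insert(table_columns, row):
    """Stand-in for a streaming insert: accepted only if every field is a known column."""
    return set(row) <= table_columns


def forward(failed_inserts, table_columns):
    """Re-send failed inserts once; return the rows that are still rejected."""
    return [row for row in failed_inserts if not try_insert(table_columns, row)]
```

Once the Mutator has added the missing column, forwarding the same rows returns an empty list; anything still rejected points to a cause other than the Mutator's delay.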

Note that Pub/Sub retains unacknowledged messages for 7 days. After this time, they are silently dropped. We therefore recommend sinking these topics to Cloud Storage to prevent data loss.

4. Setup

Setup of the Snowplow BigQuery Loader is relatively straightforward, involving the following steps:

  1. Set up the rest of the Snowplow GCP stack
  2. Create the necessary Pub/Sub topics and subscriptions
  3. Initialize the empty events table (optionally with partitioning on the derived_tstamp column)
  4. Write the configuration file
  5. Launch the Mutator
  6. Submit the Loader job to Dataflow

Both the Mutator and the Loader use the same self-describing JSON configuration file, with this schema:

iglu:com.snowplowanalytics.snowplow.storage/bigquery_config/jsonschema/1-0-0

Here is a configuration example:

{
    "schema": "iglu:com.snowplowanalytics.snowplow.storage/bigquery_config/jsonschema/1-0-0",
    "data": {
        "name": "Acme BigQuery",
        "id": "12b1159f-d110-4ab3-a7ae-c7698238d808",
        "input": "enriched-good-sub",
        "projectId": "acme-snowplow",
        "datasetId": "atomic",
        "tableId": "events",
        "typesTopic": "acme-bigquery-types-topic",
        "typesSubscription": "acme-test-types-sub",

        "badRows": "acme-bigquery-bad-rows-topic",
        "failedInserts": "acme-bigquery-failed-inserts-topic",

        "load": {
            "mode": "STREAMING_INSERTS",
            "retry": false
        },

        "purpose": "ENRICHED_EVENTS"
    }
}
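Before launching either application, it can help to sanity-check the configuration file. This hypothetical Python sketch treats every key in the example above as required; the authoritative rules live in the bigquery_config JSON Schema, which this code does not consult.

```python
import json

EXPECTED_SCHEMA = "iglu:com.snowplowanalytics.snowplow.storage/bigquery_config/jsonschema/1-0-0"
# Assumption: every key from the example configuration is treated as required
REQUIRED_KEYS = ["name", "id", "input", "projectId", "datasetId", "tableId",
                 "typesTopic", "typesSubscription", "badRows", "failedInserts",
                 "load", "purpose"]


def check_config(text):
    """Return a list of problems found in a self-describing config (empty if none)."""
    doc = json.loads(text)
    problems = []
    if doc.get("schema") != EXPECTED_SCHEMA:
        problems.append(f"unexpected schema: {doc.get('schema')}")
    data = doc.get("data", {})
    for key in REQUIRED_KEYS:
        if key not in data:
            problems.append(f"missing data.{key}")
    return problems
```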

For more information on these configuration properties, check out the Loader’s wiki.

You can launch the Mutator like this (use init instead of listen to create an empty events table):

$ ./snowplow-bigquery-mutator \
    listen \
    --config $CONFIG \
    --resolver $RESOLVER

Then you can submit the Loader itself to Cloud Dataflow like so:

$ snowplow-bigquery-loader \
    --config=$CONFIG \
    --resolver=$RESOLVER \
    --runner=DataflowRunner \
    --saveHeapDumpsToGcsPath=gs://dataflow-staging-us-central1-102462720186/heap/

5. Roadmap

This is the first public release of BigQuery Loader, and it can be considered stable and reliable enough for most production use cases.

It has performed well in our internal testing program, but many things are still subject to change. Upcoming changes will most likely be focused on the following aspects:

  • Table structure - currently every column is created with the full schema version, e.g. contexts_com_acme_product_context_1_0_0. We think this model is sufficient for many use cases, but not optimal for data models that make heavy use of self-describing data with regularly evolving schemas. We’re therefore considering MODEL-based versioning (e.g. contexts_com_acme_product_context_1) using table sharding for the next major version of the Loader, but are still open to suggestions
  • Deduplication - Google Cloud Pub/Sub guarantees only at-least-once delivery, and a Snowplow pipeline has to contend with various other sources of duplicates, so we will need a deduplication mechanism in due course
  • State management - currently, the Loader tracks its own state via the typesTopic introduced above. This makes it very hard to reason about how BigQuery loading is proceeding, so we are looking for more sophisticated solutions going forward
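For a sense of what in-stream deduplication involves, here is a hedged sketch of one common approach, not a design we have settled on: remember recently seen (event_id, event_fingerprint) pairs and drop repeats. It assumes events carry Snowplow's event_id and, optionally, event_fingerprint fields. It only catches duplicates within one process's bounded memory, so on its own it would not catch duplicates spread across Dataflow workers.

```python
from collections import OrderedDict


class Deduplicator:
    """Drops events whose (event_id, event_fingerprint) pair was seen recently.

    Memory is bounded by `capacity`: the least recently seen key is evicted,
    so duplicates arriving after their key has been evicted are not caught.
    """

    def __init__(self, capacity=100_000):
        self.capacity = capacity
        self.seen = OrderedDict()

    def is_new(self, event):
        key = (event["event_id"], event.get("event_fingerprint"))
        if key in self.seen:
            self.seen.move_to_end(key)  # refresh recency of a repeated key
            return False
        self.seen[key] = None
        if len(self.seen) > self.capacity:
            self.seen.popitem(last=False)  # evict the least recently seen key
        return True
```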

6. Getting help

You’ll find documentation for the BigQuery Loader on the project’s wiki.

For more details on this release, as always do check out the release notes on GitHub.

And if you have any questions or run into any problems, please visit our Discourse forum.