Snowplow’s RDB Loader or Snowflake’s Snowpipe?


Snowpipe is a general-purpose tool developed by Snowflake to support continuous loading of files as they appear in S3. Snowplow has developed an alternative called RDB Loader, which we believe is a better choice for users to get the best out of the Snowplow platform. (Apologies in advance for the number of ‘snows’ in this article, it literally cannot be avoided!)

Snowplow’s RDB Loader:

  1. Loads Snowplow events with a structure amenable to efficient analytic queries.
  2. Has automatic table migrations that allow data teams to easily evolve their tracking over time.
  3. Works well with Snowplow’s DBT data models, which are designed to work with this table structure.

In order to explain why Snowplow’s RDB Loader is the superior choice, this article takes a technical deep dive into different methods of loading Snowplow events with Snowpipe. It highlights the problems each method runs into, and then explains how our loader resolves them.

If you’re already convinced, jump to our installation guide.


Snowplow event structure

Much of this article focuses on the custom data structures which you attach to Snowplow events in your tracking. These are the self-describing JSONs that describe entities specific to your business use case, and which are validated against schemas of your choice.

For example, you might design an “add-to-cart” event for your ecommerce site; or a “customer” entity to annotate events with details of the end user; or you might also use some standard schemas from Iglu Central, to be compatible with the Snowplow data models. It’s fair to say most of the interesting data for your analytic queries will be captured in these custom entities.

We can imagine two different ways of loading these multifaceted entities into Snowflake:

  1. All entities are loaded into a single “contexts” column, no matter their schema or what entity they describe.
  2. Each different type of entity is loaded into its own dedicated column. Each single column therefore contains entities that all conform to the same schema.

RDB Loader uses the second approach, and we will demonstrate how it is a much better table structure for enabling analytic queries. But for now let’s see if we can recreate this desired table structure by integrating Snowflake’s Snowpipe into a Snowplow pipeline.

Snowpipe method 1: Naive loading from the TSV enriched archive

Snowplow events in the enriched stream are in tab-separated (TSV) format. You can load these events to S3 using Snowplow’s S3 Loader, and we call the resulting TSV files the “enriched archive”.

So is the enriched archive a good candidate for hooking into Snowpipe?

It is important to note that the TSV files in S3 do not have dedicated columns for each one of your custom entities; there is no dedicated column for your “customer” context or “add-to-cart” event. Instead, all custom data structures are written to three generic self-describing JSON fields (positions counted from zero):

  • contexts: field 52
  • unstruct_event: field 58
  • derived_contexts: field 122

If you look at the contexts column of the TSV file, you would find a mixture of different entities, with several different Iglu schemas.
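The field positions above count from zero, while Snowflake’s `$n` references to staged file columns count from one. That is why the pipe shown below reads `$53`, `$59` and `$123`. The following Python sketch splits one enriched TSV line and parses the three JSON fields. The synthetic line, its 131-field width and the contexts wrapper schema URI are illustrative assumptions; only positions 52, 58 and 122 matter here.

```python
import json

# Zero-based positions of the self-describing JSON fields in an enriched TSV line.
FIELDS = {"contexts": 52, "unstruct_event": 58, "derived_contexts": 122}


def snowflake_position(zero_based: int) -> int:
    """Snowflake's $n references to staged file columns count from one."""
    return zero_based + 1


def parse_json_fields(tsv_line: str) -> dict:
    columns = tsv_line.rstrip("\n").split("\t")
    parsed = {}
    for name, index in FIELDS.items():
        raw = columns[index]
        # An empty field means the event carried no entities of that kind.
        parsed[name] = json.loads(raw) if raw else None
    return parsed


# Synthetic line: 131 fields (an arbitrary width that covers position 122),
# mostly empty, with a single customer entity in the contexts field.
columns = [""] * 131
columns[0], columns[1] = "my-app", "web"
columns[52] = json.dumps({
    "schema": "iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-0",
    "data": [{"schema": "iglu:com.acme/customer/jsonschema/1-0-0", "data": {"id": "c-1"}}],
})
line = "\t".join(columns)

parsed = parse_json_fields(line)
print({name: f"${snowflake_position(i)}" for name, i in FIELDS.items()})
print(parsed["contexts"]["data"][0]["schema"])
```

Note that the contexts field holds a wrapper whose `data` array can mix entities of any schema. That is exactly the heterogeneity discussed next.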

This is how you might create a table in Snowflake to receive events directly from the enriched TSV format, assuming for now that no transformations are applied during the load. Note that unstruct_event, contexts and derived_contexts are stored using the VARIANT data type. Also notice that we have skipped most of the standard Snowplow fields for brevity.

CREATE TABLE atomic.events (
  app_id VARCHAR(255),
  platform VARCHAR(255),
  -- lots of other fields go here!
  unstruct_event VARIANT,
  contexts VARIANT,
  derived_contexts VARIANT
)

This is how you might create a pipe in Snowflake to continually load newly arriving data in the Snowplow enriched archive. Note that we parse the unstructured event, contexts and derived contexts as JSON as we load. Snowflake’s positional references ($1, $2 and so on) count from one, so zero-based field 52 is read as $53. This example assumes you have already created a Snowflake stage for your archive (plowstage).

CREATE PIPE plowpipe AS
COPY INTO atomic.events(app_id, platform, unstruct_event, contexts, derived_contexts)
FROM (
  SELECT $1, $2, PARSE_JSON($59), PARSE_JSON($53), PARSE_JSON($123)
  FROM @plowstage
)
file_format = (TYPE = 'CSV' FIELD_DELIMITER = '\t')

Let’s recap the important facts so far:

  • The most interesting data for your use case is typically in the unstructured event and entities columns, because they contain your custom data structures defined by your custom Iglu schemas.
  • If you use Snowplow’s RDB Loader then your warehouse table has a dedicated column for each custom schema.
  • But if you load from the TSV archive using Snowpipe then your warehouse table squeezes all custom events into a single unstruct_event column, and all contexts into a single contexts column.

This difference really becomes important when you come to query the data. The appendix of this blog post explains this in more detail, but for now we’ll just leave you with an example. If you load your events table using Snowplow’s RDB Loader, then a typical analytical query might look like this. It counts the page views per day and per device family in which a visitor clicked a link to an external site of interest, excluding bot traffic:

select
    date_trunc('day', derived_tstamp) as reporting_date,
    contexts_com_snowplowanalytics_snowplow_ua_parser_context_1[0].deviceFamily::string as device_family,
    count(distinct contexts_com_snowplowanalytics_snowplow_web_page_1[0].id::string) as page_views

from atomic.events e

where e.event_name = 'link_click'
and e.unstruct_event_com_snowplowanalytics_snowplow_link_click_1:targetUrl::string = 'https://someexternalsite.com'
and not e.contexts_com_iab_snowplow_spiders_and_robots_1[0].spiderOrRobot::boolean

group by 1,2

If you load your events table using Snowpipe and the TSV archive, then your query needs to look like this to pull back the same data:

with prep as (

  select 
    e.event_id,
    e.event_name,
    e.derived_tstamp,
    e.unstruct_event,
    --collapse all flattened contexts into a single row again.
    array_agg(case when dc.value:schema::string like 'iglu:com.snowplowanalytics.snowplow/ua_parser_context/jsonschema/1-%' then dc.value:data end) as ua_parser_context_1,
    array_agg(case when dc.value:schema::string like 'iglu:com.iab.snowplow/spiders_and_robots/jsonschema/1-0-0%' then dc.value:data end) as spiders_and_robots_context_1,
    array_agg(case when c.value:schema::string like 'iglu:com.snowplowanalytics.snowplow/web_page/jsonschema/1-%' then c.value:data end) as page_view_context_1

  from atomic.events e,
  lateral flatten(input => e.derived_contexts, path => 'data', outer => true) dc,
  lateral flatten(input => e.contexts, path => 'data', outer => true) c

  where e.event_name = 'link_click'
  and e.unstruct_event:data.data.targetUrl::string = 'https://someexternalsite.com'

  group by 1,2,3,4

)

select
  date_trunc('day', derived_tstamp) as reporting_date,
  p.ua_parser_context_1[0].deviceFamily::string as device_family,
  count(distinct p.page_view_context_1[0].id::string) as page_views

from prep p

where not p.spiders_and_robots_context_1[0].spiderOrRobot::boolean --filter out bots

group by 1,2

The difference in length and complexity of those two examples should convince you that loading directly from the TSV archive leads to a much worse user experience for downstream analytics. Please do dig into the appendix if you want to understand how we derived those two queries and what they mean.

Snowpipe method 2: Load enriched TSV files with a transformation on the fly

We have argued that it is strongly preferable to work with tables where unstructured events and contexts have been extracted into a dedicated column for each schema type. Snowpipe supports loads with a certain amount of transformation, so let’s explore whether we can get our desired table structure by making the pipe do some of the work for us.

Here is a Snowpipe definition that extracts the add_to_cart event into its own dedicated column during the load.

CREATE PIPE plowpipe AS
COPY INTO atomic.events(
      app_id,
      platform,
      unstruct_event_com_acme_add_to_cart_1
)
FROM (
  SELECT
    $1, $2,
    case when PARSE_JSON($59):data.schema::string like 'iglu:com.acme/add_to_cart/jsonschema/1-%'
        then PARSE_JSON($59):data.data
        end
  FROM @plowstage
)
file_format = (TYPE = 'CSV' FIELD_DELIMITER = '\t')

You could add extra case statements for each schema type that you want to extract, e.g. another case statement for your remove_from_cart event, and yet another for your start_checkout event etc. This approach successfully gives the desired table structure with respect to unstructured events.
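Each unstructured event type needs its own hand-written CASE expression, so the pipe’s SELECT list grows with every schema you track. The following Python sketch generates that pipe definition from a list of schemas to show how much ends up hard-coded. The helper names and the `com.acme` schemas are hypothetical. The column-naming rule (vendor dots become underscores, then the schema name, then the major version) is an assumption that mirrors the column names used in this article.

```python
def unstruct_column(vendor: str, name: str, model: int) -> str:
    # ("com.acme", "add_to_cart", 1) -> "unstruct_event_com_acme_add_to_cart_1"
    return f"unstruct_event_{vendor.replace('.', '_')}_{name}_{model}"


def case_expression(vendor: str, name: str, model: int, field: str = "$59") -> str:
    """One CASE per schema: keep the payload only if the event matches that schema."""
    uri_prefix = f"iglu:{vendor}/{name}/jsonschema/{model}-%"
    return (
        f"case when PARSE_JSON({field}):data.schema::string like '{uri_prefix}'\n"
        f"    then PARSE_JSON({field}):data.data end"
    )


def pipe_sql(schemas) -> str:
    columns = ["app_id", "platform"] + [unstruct_column(*s) for s in schemas]
    selects = ["$1", "$2"] + [case_expression(*s) for s in schemas]
    return (
        "CREATE PIPE plowpipe AS\n"
        f"COPY INTO atomic.events({', '.join(columns)})\n"
        "FROM (\n  SELECT\n    "
        + ",\n    ".join(selects)
        + "\n  FROM @plowstage\n)\n"
        "file_format = (TYPE = 'CSV' FIELD_DELIMITER = '\\t')"
    )


schemas = [
    ("com.acme", "add_to_cart", 1),
    ("com.acme", "remove_from_cart", 1),
    ("com.acme", "start_checkout", 1),
]
sql = pipe_sql(schemas)
print(sql)
```

Tracking a fourth event type means editing this list and recreating the pipe. That is the migration problem covered in the section on tracking new entities.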

However, this approach cannot be extended to work for contexts and derived contexts. The problem is that the contexts and derived_contexts columns contain arrays holding multiple contexts of many possible schemas. It is simply not possible to define a Snowpipe that extracts and filters items from such arrays; for example, COPY transformations do not support the FLATTEN function.

Later, we also discuss the problems of hard-coding event and entity types into a pipe definition.

Snowpipe method 3: Load the transformed JSON event files

The RDB Loader suite of applications includes a stream transformer. It reads enriched events (in TSV format) from the enriched Kinesis stream and writes them to S3 in newline-delimited JSON format. Unlike the TSV archive described above, this “transformed archive” does have the custom entities pulled out into their own dedicated fields. For example, the JSON might have a field called contexts_com_acme_customer_1 just for the customer context, and unstruct_event_com_acme_add_to_cart_1 just for the add-to-cart event.
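To make the idea of dedicated fields concrete, here is a simplified Python sketch that turns one enriched event into a single wide JSON object with one field per schema. It is not the transformer’s actual code. The naming rule is an assumption inferred from the column names in this article, and real-world details such as schema-name normalisation are ignored. The example event is hypothetical.

```python
import json
from collections import defaultdict


def column_suffix(schema_uri: str) -> str:
    # "iglu:com.acme/customer/jsonschema/1-0-0" -> "com_acme_customer_1"
    vendor, name, _format, version = schema_uri[len("iglu:"):].split("/")
    model = version.split("-")[0]
    return f"{vendor.replace('.', '_')}_{name}_{model}"


def to_wide_row(event: dict) -> dict:
    """Flatten one enriched event into a JSON object with a field per schema."""
    row = {"app_id": event["app_id"], "platform": event["platform"]}
    unstruct = event.get("unstruct_event")
    if unstruct:
        # The payload inside the unstruct_event wrapper is itself self-describing.
        inner = unstruct["data"]
        row["unstruct_event_" + column_suffix(inner["schema"])] = inner["data"]
    entities = defaultdict(list)
    # Contexts and derived contexts both land in contexts_* fields, matching
    # column names such as contexts_..._ua_parser_context_1 used in this article.
    for wrapper in (event.get("contexts"), event.get("derived_contexts")):
        for entity in (wrapper or {}).get("data", []):
            entities["contexts_" + column_suffix(entity["schema"])].append(entity["data"])
    row.update(entities)
    return row


event = {
    "app_id": "shop",
    "platform": "web",
    "unstruct_event": {
        "schema": "iglu:com.snowplowanalytics.snowplow/unstruct_event/jsonschema/1-0-0",
        "data": {"schema": "iglu:com.acme/add_to_cart/jsonschema/1-0-0",
                 "data": {"sku": "A1", "quantity": 2}},
    },
    "contexts": {"data": [
        {"schema": "iglu:com.acme/customer/jsonschema/1-0-0", "data": {"id": "c-1"}},
    ]},
    "derived_contexts": {"data": [
        {"schema": "iglu:com.snowplowanalytics.snowplow/ua_parser_context/jsonschema/1-0-0",
         "data": {"deviceFamily": "iPhone"}},
    ]},
}

row = to_wide_row(event)
# Roughly the shape of one line in a newline-delimited JSON file.
print(json.dumps(row))
```

Contexts stay lists even in their dedicated fields, because one event can carry several entities of the same type. This is why the queries in this article read element `[0]`.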

So is the transformed archive a good candidate for hooking into Snowpipe?

This is how you might manually create a table in Snowflake to hold the transformed data. In this example we assume the organisation’s data team uses a custom event type called add_to_cart, a custom context called customer, and the standard derived context ua_parser_context. As before, we have skipped most of the standard Snowplow fields for brevity.

CREATE TABLE atomic.events (
  app_id VARCHAR(255),
  platform VARCHAR(255),
  -- lots of other fields go here!
  unstruct_event_com_acme_add_to_cart_1 VARIANT,
  contexts_com_acme_customer_1 VARIANT,
  contexts_com_snowplowanalytics_snowplow_ua_parser_context_1 VARIANT
)

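This is how you might create a pipe in Snowflake to continually load newly arriving transformed events. Note that we extract the custom event types and contexts directly from the JSON. This example assumes you have already created a stage for the transformed archive (plowstage) and a named JSON file format (myjsonformat).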

CREATE PIPE plowpipe AS
COPY INTO atomic.events(
  app_id,
  platform,
  unstruct_event_com_acme_add_to_cart_1,
  contexts_com_acme_customer_1,
  contexts_com_snowplowanalytics_snowplow_ua_parser_context_1
)
FROM (
  SELECT
    $1:app_id,
    $1:platform,
    $1:unstruct_event_com_acme_add_to_cart_1,
    $1:contexts_com_acme_customer_1,
    $1:contexts_com_snowplowanalytics_snowplow_ua_parser_context_1
  FROM @plowstage
)
file_format = (format_name = myjsonformat)

Finally, we have demonstrated a Snowpipe approach that gives the desired table structure for unstructured events, contexts, and derived contexts. However, in the next section we explain why we still prefer working with Snowplow’s RDB Loader rather than using this method.

Snowpipe’s problems with tracking new entities

Methods 2 and 3, outlined above, share a common problem when it comes to tracking new event types or entities for the first time. This particularly affects active data teams who continually evolve their tracking by iterating on custom event and context schemas. When using the Snowpipe methods, the extracted unstructured events and contexts are explicitly listed in the pipe definition. So before a new event or entity can be used in the Snowflake table, it must first be added to the Snowpipe definition.

It is not possible to modify an existing Snowpipe definition to include a new column. Instead, your company’s Snowflake admin will need to perform an awkward manual migration:

  1. Delete the existing pipe, because it no longer covers every possible event type
  2. Alter the atomic events table to add a column for the new event type
  3. Recreate the pipe, listing all events and contexts including the new one.
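The three steps above can be sketched as the SQL an admin would have to run. The following Python sketch only builds the statements as strings; it does not execute anything. `migration_statements` is a hypothetical helper, and the pipe, stage, file format and column names are the illustrative ones from the method 3 example.

```python
def migration_statements(pipe_name, table, new_column, existing_columns, stage, file_format):
    """Hypothetical helper returning the three manual migration steps as SQL strings."""
    columns = existing_columns + [new_column]
    select_list = ",\n    ".join(f"$1:{column}" for column in columns)
    return [
        # Step 1: a pipe's COPY statement cannot be altered, so drop the pipe.
        f"DROP PIPE {pipe_name};",
        # Step 2: add a VARIANT column for the new entity.
        f"ALTER TABLE {table} ADD COLUMN {new_column} VARIANT;",
        # Step 3: recreate the pipe, listing every column, old and new.
        f"CREATE PIPE {pipe_name} AS\n"
        f"COPY INTO {table}({', '.join(columns)})\n"
        f"FROM (\n  SELECT\n    {select_list}\n  FROM @{stage}\n)\n"
        f"file_format = (format_name = {file_format});",
    ]


statements = migration_statements(
    pipe_name="plowpipe",
    table="atomic.events",
    new_column="unstruct_event_com_acme_remove_from_cart_1",
    existing_columns=["app_id", "platform", "unstruct_event_com_acme_add_to_cart_1"],
    stage="plowstage",
    file_format="myjsonformat",
)
for statement in statements:
    print(statement, end="\n\n")
```

Every new event or entity type repeats this cycle, and the pipe is not loading anything between steps 1 and 3.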

You need to be very careful with step 3, because you don’t want to accidentally reload all historic data as soon as you redefine the pipe. You can probably control this with the MODIFIED_AFTER option of ALTER PIPE ... REFRESH, but you must still take care that every recent event gets loaded at least once.
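As a hedged illustration of that trade-off, this Python sketch builds an `ALTER PIPE ... REFRESH` statement whose `MODIFIED_AFTER` timestamp starts slightly before the old pipe was dropped. The 30-minute safety margin and the helper name are arbitrary assumptions. Starting early lowers the risk of missing files but may load some files twice, so duplicates might need handling downstream (for example by event_id).

```python
from datetime import datetime, timedelta, timezone


def refresh_statement(pipe_name: str, pipe_dropped_at: datetime,
                      safety_margin: timedelta = timedelta(minutes=30)) -> str:
    """Hypothetical helper: re-scan staged files modified shortly before the old pipe was dropped."""
    start = (pipe_dropped_at - safety_margin).astimezone(timezone.utc)
    # MODIFIED_AFTER takes an ISO-8601 timestamp string.
    return f"ALTER PIPE {pipe_name} REFRESH MODIFIED_AFTER = '{start.isoformat()}';"


dropped = datetime(2022, 1, 26, 9, 0, tzinfo=timezone.utc)
print(refresh_statement("plowpipe", dropped))
# ALTER PIPE plowpipe REFRESH MODIFIED_AFTER = '2022-01-26T08:30:00+00:00';
```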

This manual migration is difficult and risky, and yet it needs to happen quickly to avoid lengthy delays before critical data reaches the warehouse.

RDB Loader and schema migrations

By contrast, Snowplow’s RDB Loader is designed for a friction-free approach to adding new event types and context types to the table. For every batch of events it loads, RDB Loader takes the following steps:

  1. Inspects the event types and context types in the new batch of data
  2. Inspects the atomic events table to see which event types and contexts have been previously loaded
  3. Alters the table, adding a new column for any missing event type or context.
  4. Copies the batch of data into the altered table.
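The four steps above can be simulated in a few lines of Python. This is a toy model, not RDB Loader’s implementation. The in-memory lists stand in for `atomic.events`, the ALTER statements are only built as strings, and VARIANT columns are assumed, as in the table definitions earlier in this article.

```python
def missing_columns(batch, table_columns):
    """Steps 1 and 2: entity columns present in the batch but absent from the table."""
    batch_columns = {column for row in batch for column in row}
    return sorted(batch_columns - set(table_columns))


def load_batch(batch, table_columns, table_rows):
    """Steps 3 and 4 against an in-memory stand-in for atomic.events.

    Mutates table_columns and table_rows, and returns the ALTER statements a real
    loader would need to run in Snowflake (here they are only built, never executed).
    """
    statements = []
    for column in missing_columns(batch, table_columns):
        statements.append(f"ALTER TABLE atomic.events ADD COLUMN {column} VARIANT;")
        table_columns.append(column)
    for row in batch:
        # An event without a given entity gets NULL (None) in that column.
        table_rows.append({column: row.get(column) for column in table_columns})
    return statements


table_columns = ["app_id", "unstruct_event_com_acme_add_to_cart_1"]
table_rows = []
batch = [
    {"app_id": "shop", "unstruct_event_com_acme_add_to_cart_1": {"sku": "A1"}},
    {"app_id": "shop", "contexts_com_acme_customer_1": [{"id": "c-1"}]},
]
statements = load_batch(batch, table_columns, table_rows)
for statement in statements:
    print(statement)
```

A second batch with no new entity types produces no ALTER statements. Nobody has to touch a pipe definition when tracking evolves.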

Our automatic schema migration approach empowers data teams who want to design schemas for events and contexts and evolve that design over time. It also helps engineers implementing Snowplow tracking, because they can add new event types and context types without worrying about the complexities of how events get loaded to the warehouse. We believe nobody in a mature data team should encounter organisational friction when using Snowplow, such as having to ask a Snowflake admin to manually pause and reconfigure Snowplow loading.

Conclusion

This technical walkthrough has shown that Snowplow’s RDB Loader offers better:

  1. Table structure for analytic queries. Data analysts can more easily write queries that explore the richness in Snowplow’s unstructured events and contexts.
  2. Schema evolution. The events table is automatically modified as needed to receive new types of event or context. This decouples tracking teams and data teams from warehouse admins.
  3. Support for Snowplow’s data models.

Besides the superior data structure, Snowplow’s RDB Loader also comes with some helpful monitoring and observability functionality, as well as retry logic for increased reliability. Check out the release notes for a full overview of all features.

To start using Snowplow’s RDB Loader, head over to our documentation site and see the guide on running the RDB Loader. You will need to run the transformer to transform enriched TSV to wide-row JSON files in S3, and rdb-loader-snowflake to load the wide-row JSON into Snowflake.

If you are currently using Snowpipe then you could switch methods by deleting your pipe from Snowflake, and then launching the RDB Loader applications instead. The loader will automatically detect which entity columns you have already added to your events table, and which table migrations it must perform itself. Be warned, however, that depending on details of your previous Snowpipe setup, you might notice some data flows into different columns, which could impact your downstream analytical queries.

Appendix: Querying different table structures

This appendix shows some examples that demonstrate our core arguments:

  • That it is far preferable to query events once the custom entities have been pulled out into their own dedicated columns.
  • That, by contrast, if Snowpipe is used to load the enriched TSV archive, analytic queries are harder to write because the unstructured event and context columns are heterogeneous.

Example 1: Link click events to a given target URL

Let’s query a field from a standard link_click event.

Snowpipe on TSV

If the table was loaded from TSV files with Snowpipe, then we need to filter on event_name for link clicks, as well as on the target URL inside the unstruct_event column.
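The `data.data` path in the query below comes from the double nesting of the unstruct_event field. It is an outer wrapper whose `data` is itself a self-describing JSON. This Python sketch walks both paths on a hypothetical link_click payload; the schema URIs are illustrative.

```python
import json

# A link_click as it might sit in the TSV unstruct_event field (illustrative URIs).
unstruct_event = json.loads("""
{
  "schema": "iglu:com.snowplowanalytics.snowplow/unstruct_event/jsonschema/1-0-0",
  "data": {
    "schema": "iglu:com.snowplowanalytics.snowplow/link_click/jsonschema/1-0-1",
    "data": {"targetUrl": "https://someexternalsite.com"}
  }
}
""")

# Equivalent of unstruct_event:data.data.targetUrl on the TSV-loaded table.
print(unstruct_event["data"]["data"]["targetUrl"])

# A dedicated link_click column holds only the innermost payload, so the same value
# is one step away: unstruct_event_com_snowplowanalytics_snowplow_link_click_1:targetUrl.
link_click_1 = unstruct_event["data"]["data"]
print(link_click_1["targetUrl"])
```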

select *

from atomic.events e

where e.event_name = 'link_click'
and e.unstruct_event:data.data.targetUrl::string = 'https://someexternalsite.com' --Filter for given URL

RDB Loader

If the table was loaded by RDB Loader, then the link click event has already been separated into its own column.

select *

from atomic.events e

where e.event_name = 'link_click'
and e.unstruct_event_com_snowplowanalytics_snowplow_link_click_1:targetUrl::string = 'https://someexternalsite.com'

At this point both queries are similar and relatively easy to write. Let’s look at a slightly more advanced query.

Example 2: Count of page views clicking through to an external site

Expanding on the previous example, let’s now calculate the number of page views per day that clicked through to the given URL. For this we need the page view ID, which is the id field of the web page context.

Snowpipe on TSV

select
  date_trunc('day', derived_tstamp) as reporting_date,
  count(distinct c.value:data.id::string) as page_views -- distinct count of page view ids

from atomic.events e,
lateral flatten(input => e.contexts:data, outer => true) c --flatten the array of contexts

where e.event_name = 'link_click'
and e.unstruct_event:data.data.targetUrl::string = 'https://someexternalsite.com'
and c.value:schema::string = 'iglu:com.snowplowanalytics.snowplow/web_page/jsonschema/1-0-0' --Filter flattened results for page view context schema

group by 1

RDB Loader

select
  date_trunc('day', derived_tstamp) as reporting_date,
  count(distinct contexts_com_snowplowanalytics_snowplow_web_page_1[0].id::string) as page_views -- page view id straight from page view context column

from atomic.events e

where e.event_name = 'link_click'
and e.unstruct_event_com_snowplowanalytics_snowplow_link_click_1:targetUrl::string = 'https://someexternalsite.com'

group by 1

Results

reporting_date  page_views
2022-01-26      42
2022-01-27      192
2022-01-28      65
2022-01-29      25
2022-01-30      113

At this point you can start to see the divergence. With Snowpipe, all contexts sit together in the contexts column, nested within an array. As such, we have to flatten the context array and then filter for the context we need: in this case, the web page context.

Conversely, RDB Loader has already separated each type of context into its own column, which makes selecting the page view ID much easier.
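The difference between the two queries can be mimicked in Python on a few hypothetical events. The first function loops over each event’s contexts and filters by schema, like LATERAL FLATTEN plus a WHERE clause. The second reads element `[0]` of the dedicated column. `count(distinct ...)` is modelled with a set of page view IDs per day.

```python
from collections import defaultdict

WEB_PAGE = "iglu:com.snowplowanalytics.snowplow/web_page/jsonschema/1-0-0"
PAGE_COL = "contexts_com_snowplowanalytics_snowplow_web_page_1"

# Hypothetical link_click events in the TSV-loaded shape ...
tsv_events = [
    {"day": "2022-01-26", "contexts": {"data": [
        {"schema": "iglu:com.acme/customer/jsonschema/1-0-0", "data": {"id": "c-1"}},
        {"schema": WEB_PAGE, "data": {"id": "pv-1"}}]}},
    {"day": "2022-01-26", "contexts": {"data": [{"schema": WEB_PAGE, "data": {"id": "pv-1"}}]}},
    {"day": "2022-01-27", "contexts": {"data": [{"schema": WEB_PAGE, "data": {"id": "pv-2"}}]}},
]

# ... and the same events in the dedicated-column shape.
wide_events = [
    {"day": "2022-01-26", PAGE_COL: [{"id": "pv-1"}]},
    {"day": "2022-01-26", PAGE_COL: [{"id": "pv-1"}]},
    {"day": "2022-01-27", PAGE_COL: [{"id": "pv-2"}]},
]


def page_views_flatten(events):
    """Mimic LATERAL FLATTEN over contexts:data plus a WHERE filter on the schema."""
    ids = defaultdict(set)
    for event in events:
        for entity in event["contexts"]["data"]:  # one row per flattened entity
            if entity["schema"] == WEB_PAGE:
                ids[event["day"]].add(entity["data"]["id"])
    return {day: len(page_ids) for day, page_ids in ids.items()}


def page_views_direct(events):
    """Mimic contexts_..._web_page_1[0].id: read the entity straight from its column."""
    ids = defaultdict(set)
    for event in events:
        ids[event["day"]].add(event[PAGE_COL][0]["id"])
    return {day: len(page_ids) for day, page_ids in ids.items()}


print(page_views_flatten(tsv_events))
print(page_views_direct(wide_events))
```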

Now let’s go a little further…

Example 3: Cutting the results by device

Now we want to slice and dice our results by device. For this we can use the deviceFamily field from the UA Parser context.

Snowpipe on TSV

As previously mentioned, there are two context columns: contexts and derived_contexts. The UA Parser context is derived during enrichment, so it sits in the derived_contexts column.

select
  date_trunc('day', derived_tstamp) as reporting_date,
  dc.value:data.deviceFamily::string as device_family, -- device family from UA Parser context
  count(distinct c.value:data.id::string) as page_views

from atomic.events e,
lateral flatten(input => e.contexts:data, outer => true) c,
lateral flatten(input => e.derived_contexts:data, outer => true) dc --flatten derived contexts to return UA parser context

where e.event_name = 'link_click'
and e.unstruct_event:data.data.targetUrl::string = 'https://someexternalsite.com'
and c.value:schema::string = 'iglu:com.snowplowanalytics.snowplow/web_page/jsonschema/1-0-0'
and dc.value:schema::string = 'iglu:com.snowplowanalytics.snowplow/ua_parser_context/jsonschema/1-0-0' -- filter for UA Parser's schema

group by 1,2

RDB Loader

select
  date_trunc('day', derived_tstamp) as reporting_date,
  contexts_com_snowplowanalytics_snowplow_ua_parser_context_1[0].deviceFamily::string as device_family, --select straight from ua parser context
  count(distinct contexts_com_snowplowanalytics_snowplow_web_page_1[0].id::string) as page_views

from atomic.events e

where e.event_name = 'link_click'
and e.unstruct_event_com_snowplowanalytics_snowplow_link_click_1:targetUrl::string = 'https://someexternalsite.com'

group by 1,2

Results

reporting_date  device_family  page_views
2022-01-26      iPhone         29
2022-01-26      Nokia 3        13
2022-01-26      Spider         24
2022-01-27      iPhone         25
2022-01-27      Nokia 3        96

With Snowpipe we now have to perform two lateral flattens to fetch the two contexts we need. This increases query complexity and can reduce performance, because the two flattens multiply the number of rows produced for each event.

Furthermore, since we filter on the UA Parser’s schema, we may exclude page views from events that carry no UA Parser context, for example events processed while the UA Parser enrichment was not enabled.

With RDB Loader, whenever a new context first appears, it is separated into its own column. For events that do not have that particular context, the column is simply NULL. This means we do not risk filtering out events by mistake.
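The risk described above can be reproduced in a small Python sketch with two hypothetical events, one of which has no UA Parser context. With the flatten-then-filter pattern that event disappears entirely. With a dedicated column it keeps its row, with a NULL (`None`) device family.

```python
UA = "iglu:com.snowplowanalytics.snowplow/ua_parser_context/jsonschema/1-0-0"

derived = {
    "e1": [{"schema": UA, "data": {"deviceFamily": "iPhone"}}],
    "e2": [],  # hypothetical event from before the UA Parser enrichment was enabled
}


def flatten_and_filter(derived):
    """outer => true emits one NULL row for e2, but the schema filter then drops it."""
    rows = []
    for event_id, entities in derived.items():
        for entity in entities or [None]:
            if entity is not None and entity["schema"] == UA:
                rows.append((event_id, entity["data"]["deviceFamily"]))
    return rows


def dedicated_column(derived):
    """Every event keeps its row; the UA Parser column is simply NULL when absent."""
    column = {eid: [e["data"] for e in ents if e["schema"] == UA] or None
              for eid, ents in derived.items()}
    return [(eid, col[0]["deviceFamily"] if col else None) for eid, col in column.items()]


print(flatten_and_filter(derived))  # [('e1', 'iPhone')]
print(dedicated_column(derived))    # [('e1', 'iPhone'), ('e2', None)]
```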

Example 4: Removing Bot traffic

We have noticed a portion of our traffic is coming from bots and wish to filter this out. Luckily there is an enrichment to identify just such activity, the spiders and robots enrichment.

Snowpipe on TSV

The spiders and robots context also sits within the derived_contexts column. As such, we need to flatten the derived_contexts column and then return the fields from both the UA Parser and the spiders and robots contexts.

Because flattening spreads each event’s contexts across multiple rows, we then need an aggregation to collapse each event back down to a single row.
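The effect of two lateral flattens followed by a GROUP BY can be traced in a short Python sketch. It uses one hypothetical event, with schema names shortened for readability. Flattening two arrays for the same event behaves like a cross product. The collapse step mirrors `array_agg(case when ... end)`, where non-matching rows contribute nothing because ARRAY_AGG omits NULLs.

```python
from itertools import product

# One hypothetical event; schema names are shortened for readability.
event = {
    "event_id": "e1",
    "contexts": [
        {"schema": "web_page", "data": {"id": "pv-1"}},
        {"schema": "customer", "data": {"id": "c-1"}},
    ],
    "derived_contexts": [
        {"schema": "ua_parser_context", "data": {"deviceFamily": "iPhone"}},
        {"schema": "spiders_and_robots", "data": {"spiderOrRobot": False}},
    ],
}

# Two lateral flattens over the same event act like a cross product:
# 2 derived contexts x 2 contexts = 4 rows for this single event.
rows = [
    (event["event_id"], c, dc)
    for dc, c in product(event["derived_contexts"], event["contexts"])
]


def collapse(rows):
    """GROUP BY event_id with one array_agg(case when ... end) per entity type."""
    prep = {}
    for event_id, c, dc in rows:
        agg = prep.setdefault(event_id, {"page_view_context_1": [],
                                         "ua_parser_context_1": [],
                                         "spiders_and_robots_context_1": []})
        if c["schema"] == "web_page":
            agg["page_view_context_1"].append(c["data"])
        if dc["schema"] == "ua_parser_context":
            agg["ua_parser_context_1"].append(dc["data"])
        if dc["schema"] == "spiders_and_robots":
            agg["spiders_and_robots_context_1"].append(dc["data"])
    return prep


prep = collapse(rows)["e1"]
print(len(rows))                    # 4
print(prep["ua_parser_context_1"])  # [{'deviceFamily': 'iPhone'}, {'deviceFamily': 'iPhone'}]
```

Each entity lands in its array once per row of the cross product, so the aggregated arrays contain duplicates. The query below only reads element `[0]`, so the duplicates do not change its result.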

with prep as (

  select 
    e.event_id,
    e.event_name,
    e.derived_tstamp,
    e.unstruct_event,
    --collapse all flattened contexts into a single row again.
    array_agg(case when dc.value:schema::string like 'iglu:com.snowplowanalytics.snowplow/ua_parser_context/jsonschema/1-%' then dc.value:data end) as ua_parser_context_1,
    array_agg(case when dc.value:schema::string like 'iglu:com.iab.snowplow/spiders_and_robots/jsonschema/1-0-0%' then dc.value:data end) as spiders_and_robots_context_1,
    array_agg(case when c.value:schema::string like 'iglu:com.snowplowanalytics.snowplow/web_page/jsonschema/1-%' then c.value:data end) as page_view_context_1

  from atomic.events e,
  lateral flatten(input => e.derived_contexts, path => 'data', outer => true) dc,
  lateral flatten(input => e.contexts, path => 'data', outer => true) c

  where e.event_name = 'link_click'
  and e.unstruct_event:data.data.targetUrl::string = 'https://someexternalsite.com'

  group by 1,2,3,4

)

select
  date_trunc('day', derived_tstamp) as reporting_date,
  p.ua_parser_context_1[0].deviceFamily::string as device_family,
  count(distinct p.page_view_context_1[0].id::string) as page_views

from prep p

where not p.spiders_and_robots_context_1[0].spiderOrRobot::boolean --filter out bots

group by 1,2

RDB Loader

select
    date_trunc('day', derived_tstamp) as reporting_date,
    contexts_com_snowplowanalytics_snowplow_ua_parser_context_1[0].deviceFamily::string as device_family,
    count(distinct contexts_com_snowplowanalytics_snowplow_web_page_1[0].id::string) as page_views

from atomic.events e

where e.event_name = 'link_click'
and e.unstruct_event_com_snowplowanalytics_snowplow_link_click_1:targetUrl::string = 'https://someexternalsite.com'
and not e.contexts_com_iab_snowplow_spiders_and_robots_1[0].spiderOrRobot::boolean

group by 1,2

Results

reporting_date  device_family  page_views
2022-01-26      iPhone         29
2022-01-26      Nokia 3        13
2022-01-27      iPhone         25
2022-01-27      Nokia 3        96

You will notice that, at this point, the prep CTE used in the Snowpipe example produces a table structure very similar to that of RDB Loader, in that every chosen context entity now has its own column.