# General Purpose Data Pipelines: Transforms
In the previous section we went over how to build data pipelines that process onchain data using ClickHouse as the compute engine. In this section we talk about data pipelines for a wider variety of use cases.
The “transforms” system was developed before we built the blockbatch load system, so we also use it for onchain data (e.g. the interop transforms group). That said, it would be a good idea to migrate some of the onchain data tables from the transforms system to the blockbatch system, since the latter has better data integrity guarantees and better monitoring.
## Transform Groups
A transform group is a collection of tables and views that are processed together. You can think of a group as a mini-pipeline where a series of tables are populated one after the other.
Let’s take as an example the `interop` transform group. This group has the following ClickHouse tables:
- `dim_erc20_with_ntt_first_seen_v1`
- `fact_erc20_oft_transfers_v1`
- `fact_erc20_ntt_transfers_v1`
- `dim_erc20_ntt_first_seen_v1`
- `dim_erc20_oft_first_seen_v1`
- `fact_erc20_create_traces_v2`
- `export_fact_erc20_create_traces_v2`
## Transform Specification
Each table in a transform group is defined by a `CREATE` statement, which defines the table schema and ClickHouse engine (including the very important `ORDER BY` clause), and an `INSERT` statement that is used to populate the table.
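As a rough illustration (the table name, columns, and engine settings below are made up, not an actual transform in the repo), a `CREATE` file might look like this:

```sql
-- Hypothetical create/01_fact_example_events_v1.sql
CREATE TABLE IF NOT EXISTS fact_example_events_v1
(
    dt         Date,
    chain      String,
    address    FixedString(42),
    num_events UInt64
)
ENGINE = ReplacingMergeTree
ORDER BY (dt, chain, address);
```

A matching `INSERT` statement populates the table (a sketch of one is shown further down, in the execution model section).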
## Directory Structure and Naming Convention
The directory structure for a transform group consists of two folders:
- `create/`: Has the `CREATE` statements for all the tables in the group.
- `insert/`: Has the `INSERT` statements for all the tables in the group.
The naming convention for files in each of the folders is the following:
`[INDEX]_[tablename].sql`

Where `[INDEX]` is a number that indicates the order of execution and `[tablename]` is the name of the table.
Let’s take the `fees` transform group as an example:
```
src/op_analytics/transforms/fees
├── create
│   ├── 01_agg_daily_transactions_grouping_sets.sql
│   ├── 02_agg_daily_transactions.sql
│   ├── 10_view_agg_daily_transactions_tx_from_tx_to_method.sql
│   ├── 11_view_agg_daily_transactions_tx_to_method.sql
│   └── 12_view_agg_daily_transactions_tx_to.sql
└── update
    ├── 01_agg_daily_transactions_grouping_sets.sql
    └── 02_agg_daily_transactions.sql
```
This group has two tables and three views. For each of the two tables we have a corresponding `update` SQL statement that is used to populate the table.
Note that it is perfectly fine to have any kind of SQL file in the `create` folder, although it’s better not to get too creative. If you inspect the SQL files for the views you will see that they are straight-up `CREATE VIEW` ClickHouse statements.
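For reference, such a view file is just a plain statement along these lines (the view, table, and column names here are made up for illustration, not the actual fees views):

```sql
-- Hypothetical create/10_view_example_daily_totals.sql: a plain CREATE VIEW statement.
CREATE OR REPLACE VIEW view_example_daily_totals AS
SELECT
    dt,
    chain,
    sum(tx_count) AS tx_count
FROM agg_example_daily
GROUP BY dt, chain;
```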
## Transform Execution Model
The execution model for a transform is very simple:
1. When the system runs a transform group it first runs all the `CREATE` statements in order according to their index.
2. It then runs all the `INSERT` statements in order according to their index.
3. The system provides the execution date as a query parameter (`dt`) that may or may not be utilized by the `INSERT` statement (see the sketch after this list).
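For instance, an `INSERT` statement that only processes a single day can filter on the parameter using ClickHouse's query-parameter syntax. The sketch below populates the hypothetical table from the earlier example; the source table and columns are assumptions:

```sql
-- Hypothetical insert/01_fact_example_events_v1.sql
INSERT INTO fact_example_events_v1
SELECT
    dt,
    chain,
    address,
    count() AS num_events
FROM some_source_table       -- placeholder source table
WHERE dt = {dt:Date}         -- execution date supplied by the transform runner
GROUP BY dt, chain, address;
```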
## Building Data Pipelines
To build a `transforms` pipeline all you need to do is create a new directory in the `src/op_analytics/transforms` directory and add the appropriate SQL files.
## Execution
We provide a function to execute a transform group for a given date range:

`src.op_analytics.transforms.main.execute_dt_transforms`

For a detailed description of the parameters see the function’s docstring.
## Prototyping and Backfilling
The `execute_dt_transforms` function is used from an IPython notebook to prototype and also to manually backfill data. Notebooks for transforms are located in the `notebooks/adhoc/clickhouse_transforms` directory. Browse that directory for examples.
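A backfill cell might look roughly like the following. The keyword arguments shown are assumptions made for the sake of the sketch; the actual parameter names and accepted date-range format are documented in the function's docstring:

```python
# Hypothetical notebook cell: backfill the "fees" transform group.
# Parameter names below are assumptions; check the execute_dt_transforms docstring.
from op_analytics.transforms.main import execute_dt_transforms

execute_dt_transforms(
    group_name="fees",            # assumed: which transform group to run
    range_spec="@20240101:+30",   # assumed: the date range to backfill
)
```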
## Scheduling
All transforms-related Dagster assets are defined in the `src/op_analytics/dagster/assets/transforms.py` file.
There is no hard and fast rule for how to define the assets. Generally we schedule one asset per group, but there are cases where we may want to have more control over what is executed on each day, and so each asset can decide how it calls the `execute_dt_transforms` function.
Similarly for Dagster jobs, there is no hard and fast rule, but we generally define one scheduled job per transform group. The jobs are defined in the `src/op_analytics/dagster/defs.py` file.
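As a rough sketch of what this can look like (not the actual repo code; the asset name, cron schedule, and the arguments passed to `execute_dt_transforms` are all assumptions):

```python
# Hypothetical Dagster asset and scheduled job for the "fees" transform group.
from dagster import AssetSelection, ScheduleDefinition, asset, define_asset_job

from op_analytics.transforms.main import execute_dt_transforms


@asset
def fees_transforms() -> None:
    """Run the 'fees' transform group for the current date."""
    # Argument names are assumptions; see the execute_dt_transforms docstring.
    execute_dt_transforms(group_name="fees")


# One scheduled job per transform group (the cron time is illustrative).
fees_transforms_job = define_asset_job(
    name="fees_transforms_job",
    selection=AssetSelection.assets(fees_transforms),
)
fees_transforms_schedule = ScheduleDefinition(
    job=fees_transforms_job,
    cron_schedule="0 6 * * *",
)
```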
## Markers
We use markers to track the execution of the transforms. The markers are stored in the `etl_monitor.transform_dt_markers` table in ClickHouse.
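You can query the marker table directly to see which dates have been processed. Something along these lines should work, assuming the table has a `dt` column (the rest of the schema is not described here):

```sql
-- Inspect recent transform markers (assumes a dt column on the marker table).
SELECT *
FROM etl_monitor.transform_dt_markers
WHERE dt >= today() - 7
ORDER BY dt DESC
LIMIT 100;
```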
## Monitoring
Unfortunately we have not yet built monitoring tools for the transforms system. I have talked about the idea of having `quality` tables as part of a transforms group. A `quality` table is a table that contains information about anything that might be wrong with the data in the group.
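To make the idea concrete, a quality table could simply be another table in the group whose `INSERT` statement records rows that violate an expectation. The sketch below is entirely hypothetical (the table names, check name, and the duplicate-key check itself are made up):

```sql
-- Hypothetical quality check: count duplicate keys for the execution date.
INSERT INTO quality_fact_example_events_v1
SELECT
    {dt:Date}                AS dt,
    'duplicate_primary_keys' AS check_name,
    count()                  AS num_bad_rows
FROM (
    SELECT chain, address
    FROM fact_example_events_v1
    WHERE dt = {dt:Date}
    GROUP BY chain, address
    HAVING count() > 1
);
```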