# General Purpose Data Pipelines: Transforms

In the previous section we went over how to build data pipelines that process onchain data using ClickHouse as the compute engine. In this section we cover data pipelines for a wider variety of use cases.

The "transforms" system was developed before we built the blockbatch load system, and so we also use it for onchain data (e.g. the `interop` transform group). That said, it would be a good idea to migrate some of the onchain data tables from the transforms system to the blockbatch system, since the latter has better data integrity guarantees and better monitoring.

## Transform Groups

A transform group is a collection of tables and views that are processed together. You can think of a group as a mini-pipeline where a series of tables are populated one after the other.

Let's take the `interop` transform group as an example. This group has the following ClickHouse tables:

- `dim_erc20_with_ntt_first_seen_v1`
- `fact_erc20_oft_transfers_v1`
- `fact_erc20_ntt_transfers_v1`
- `dim_erc20_ntt_first_seen_v1`
- `dim_erc20_oft_first_seen_v1`
- `fact_erc20_create_traces_v2`
- `export_fact_erc20_create_traces_v2`

## Transform Specification

Each table in a transform group is defined by two SQL statements: a `CREATE` statement that defines the table schema and ClickHouse engine (including the very important `ORDER BY` clause), and an `INSERT` statement that is used to populate the table.

### Directory Structure and Naming Convention

The directory structure for a transform group consists of two folders:

- `create/`: Has the `CREATE` statements for all the tables in the group.
- `update/`: Has the `INSERT` statements that populate the tables in the group.

The naming convention for files in each of the folders is the following:

- `[INDEX]_[tablename].sql`

where `[INDEX]` is a number that indicates the order of execution and `[tablename]` is the name of the table.

Let's take the `fees` transform group as an example:

```
src/op_analytics/transforms/fees
├── create
│   ├── 01_agg_daily_transactions_grouping_sets.sql
│   ├── 02_agg_daily_transactions.sql
│   ├── 10_view_agg_daily_transactions_tx_from_tx_to_method.sql
│   ├── 11_view_agg_daily_transactions_tx_to_method.sql
│   └── 12_view_agg_daily_transactions_tx_to.sql
└── update
    ├── 01_agg_daily_transactions_grouping_sets.sql
    └── 02_agg_daily_transactions.sql
```

This group has 2 tables and 3 views. For each of the two tables we have a corresponding `update` SQL statement that is used to populate the table.

Note that it is perfectly fine to have any kind of SQL file in the `create` folder, although it's better not to get too creative. If you inspect the SQL files for the views you will see that they are straight-up `CREATE VIEW` ClickHouse statements.

## Transform Execution Model

The execution model for a transform group is very simple:

- When the system runs a transform group it first runs all the `CREATE` statements in order according to their index.
- It then runs all the `INSERT` statements in order according to their index.
- The system provides the execution date as a query parameter (`dt`) that may or may not be utilized by the `INSERT` statement.

## Building Data Pipelines

To build a `transforms` pipeline all you need to do is create a new directory in the `src/op_analytics/transforms` directory and add the appropriate SQL files, following the conventions described above.
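To make the conventions concrete, here is a minimal sketch of the two SQL files for a single table in a new group. The group, table name, columns, and source table (`raw_transactions`) are all hypothetical, invented for illustration; only the `CREATE`/`INSERT` pairing, the file naming, and the `dt` query parameter (ClickHouse `{dt:Date}` syntax) follow the conventions this section describes.

```sql
-- create/01_fact_daily_example_v1.sql
-- Hypothetical table. The engine and ORDER BY clause are the
-- important parts of any transform CREATE statement.
CREATE TABLE IF NOT EXISTS fact_daily_example_v1
(
    dt       Date,
    chain    String,
    tx_count UInt64
)
ENGINE = ReplacingMergeTree
ORDER BY (dt, chain);

-- update/01_fact_daily_example_v1.sql
-- Populates a single dt partition. The runner supplies the
-- execution date as the `dt` query parameter.
INSERT INTO fact_daily_example_v1
SELECT
    dt,
    chain,
    count() AS tx_count
FROM raw_transactions  -- hypothetical source table
WHERE dt = {dt:Date}
GROUP BY dt, chain;
```

Because the `INSERT` filters on `{dt:Date}`, re-running the transform for a date reprocesses only that date, which is what makes notebook-driven backfills (described below) straightforward.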
### Execution

We provide a function to execute a transform group for a given date range:

- `src.op_analytics.transforms.main.execute_dt_transforms`

For a detailed description of the parameters see the function's docstring.

### Prototyping and Backfilling

The `execute_dt_transforms` function is used from an IPython notebook to prototype and also to manually backfill data. Notebooks for transforms are located in the `notebooks/adhoc/clickhouse_transforms` directory. Browse that directory for examples.

### Scheduling

All transforms-related Dagster assets are defined in the `src/op_analytics/dagster/assets/transforms.py` file. There is no hard and fast rule for how to define the assets. Generally we schedule one asset per group, but there are cases where we want more control over what is executed on each day, and so each asset can decide how it calls the `execute_dt_transforms` function.

Similarly for Dagster jobs, there is no hard and fast rule, but we generally define one scheduled job per transform group. The jobs are defined in the `src/op_analytics/dagster/defs.py` file.

### Markers

We use markers to track the execution of the transforms. The markers are stored in the `etl_monitor.transform_dt_markers` table in ClickHouse.

### Monitoring

Unfortunately we have not yet built monitoring tools for the transforms system. One idea we have discussed is to include `quality` tables as part of a transform group: a `quality` table contains information about anything that might be wrong with the data in the group. A sketch of what such a table could look like is shown below.
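Since quality tables are only an idea at this point, the following is purely a hypothetical sketch, not an existing convention. The table name, check name, and the fact table it inspects are invented; the point is that a quality table could be just another `CREATE`/`INSERT` pair in the group, executed last by virtue of its high index.

```sql
-- create/90_quality_my_group_v1.sql
-- Hypothetical quality table: one row per (dt, check).
CREATE TABLE IF NOT EXISTS quality_my_group_v1
(
    dt         Date,
    check_name String,
    observed   UInt64,
    is_failure UInt8
)
ENGINE = ReplacingMergeTree
ORDER BY (dt, check_name);

-- update/90_quality_my_group_v1.sql
-- Flags an empty dt partition in the (hypothetical) fact table.
INSERT INTO quality_my_group_v1
SELECT
    {dt:Date}             AS dt,
    'empty_partition'     AS check_name,
    count()               AS observed,
    if(count() = 0, 1, 0) AS is_failure
FROM fact_daily_example_v1
WHERE dt = {dt:Date};
```

A scheduled job or ad hoc query could then alert whenever rows with `is_failure = 1` show up for a recent `dt`.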