DailyDataset (GCS)#

Historical Note#

When we started building the OP Labs data platform we were heavy BigQuery users. For our most important dashboards we still are. To make it easier to migrate from BigQuery to ClickHouse we used GCS as the landing spot for ingested data. Data in GCS can be read from BigQuery by using external tables and from ClickHouse by using the s3 Table Function.

The DailyDataset data ingestion pattern was created with GCS in mind, and so all of the data ingested using this pattern lands in GCS. If you are thinking of integrating a new data source I strongly recommend you use the ClickHouseDataset pattern instead.

Subclassing the DailyDataset class#

Let’s take the defillama source as an example. For this datasource the data access class is defined in the src/op_analytics/datasources/defillama/dataaccess.py module. At the time of writing, here is what this file looks like:

from op_analytics.coreutils.partitioned.dailydata import DailyDataset


class DefiLlama(DailyDataset):
    """Supported defillama datasets."""

    # Chain TVL
    CHAINS_METADATA = "chains_metadata_v1"
    HISTORICAL_CHAIN_TVL = "historical_chain_tvl_v1"

    # Protocol TVL
    PROTOCOLS_METADATA = "protocols_metadata_v1"
    PROTOCOLS_TVL = "protocols_tvl_v1"
    PROTOCOLS_TOKEN_TVL = "protocols_token_tvl_v1"
    # Enrichment:
    PROTOCOL_TVL_FLOWS_FILTERED = "tvl_flows_breakdown_filtered_v1"

    # Stablecoins TVL
    STABLECOINS_METADATA = "stablecoins_metadata_v1"
    STABLECOINS_BALANCE = "stablecoins_balances_v1"

    # DEX Volumes, Fees, and Revenue at chain and chain/name levels of granularity
    VOLUME_FEES_REVENUE = "volume_fees_revenue_v1"
    VOLUME_FEES_REVENUE_BREAKDOWN = "volume_fees_revenue_breakdown_v1"

    # Yield
    YIELD_POOLS_METADATA = "yield_pools_metadata_v1"
    YIELD_POOLS_HISTORICAL = "yield_pools_historical_v1"

    # Lend/Borrow
    LEND_BORROW_POOLS_METADATA = "lend_borrow_pools_metadata_v1"
    LEND_BORROW_POOLS_HISTORICAL = "lend_borrow_pools_historical_v1"

    # Protocols metadata obtained from "dexs/dailyVolume", and "fees/dailyFees"
    # and "fees/dailyRevenue" endpoints.
    VOLUME_PROTOCOLS_METADATA = "volume_protocols_metadata_v1"
    FEES_PROTOCOLS_METADATA = "fees_protocols_metadata_v1"
    REVENUE_PROTOCOLS_METADATA = "revenue_protocols_metadata_v1"

    # Token Mappings
    TOKEN_MAPPINGS = "dim_token_mappings_v1"
    PROTOCOL_CATEGORY_MAPPINGS = "dim_protocol_category_mappings_v1"

The DailyDataset class is an Enum, and the body of the subclass consists of the enumeration members, which are strings. Each of these strings is the name of the table that the ingested data will be written to.
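
Because DailyDataset subclasses Enum, each table is a regular Python enum member whose value is the table name string. For example (standard Enum behavior):

from op_analytics.datasources.defillama.dataaccess import DefiLlama

# The member's value is the table name that ingested data will be written to.
assert DefiLlama.STABLECOINS_BALANCE.value == "stablecoins_balances_v1"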

Discoverability#

The reason we require a datasource/<name>/dataaccess.py module for each datasource, with an enumeration of the associated tables, is discoverability. If someone wants to know what tables are ingested for a given datasource, they can find them listed in that file.

Writing Data#

The DailyDataset class is a simple enum but it offers a number of built-in methods that help implement data ingestion. The most important one is the write() method.

When ingesting data, the mechanism for obtaining and transforming the data is specific to each data source. The custom code needs to produce a polars dataframe with the data and then call the write() method of the destination table, for example:

# Write stablecoin balances.
DefiLlama.STABLECOINS_BALANCE.write(
    dataframe=most_recent_dates(result.balances_df, n_dates=BALANCES_TABLE_LAST_N_DAYS),
    sort_by=["symbol", "chain"],
)

The only requirement is that the dataframe you pass to the write() method has a dt column, since all data in GCS is written out partitioned by dt.
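
As a hedged illustration, here is what building such a dataframe and writing it could look like. The rows and column names other than dt are made up for the example:

import polars as pl

from op_analytics.datasources.defillama.dataaccess import DefiLlama

# Hypothetical rows. The only hard requirement is the "dt" partition column.
df = pl.DataFrame(
    {
        "dt": ["2025-04-01", "2025-04-01"],
        "chain": ["ethereum", "optimism"],
        "symbol": ["USDT", "USDC"],
        "balance": [1_000_000.0, 500_000.0],
    }
)

DefiLlama.STABLECOINS_BALANCE.write(dataframe=df, sort_by=["symbol", "chain"])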

Root Paths#

The name of the DailyDataset subclass and the table enum member string value will be combined by the system to determine the root_path and in turn the exact location in GCS where the data will be written.

For example, the class DefiLlama(DailyDataset) above has a member called STABLECOINS_BALANCE = "stablecoins_balances_v1". The root path for this will be defillama/stablecoins_balances_v1 and the full URI for a parquet file corresponding to dt=2025-04-01 will be:

gs://oplabs-tools-data-sink/defillama/stablecoins_balances_v1/dt=2025-04-01/out.parquet
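
As a rough sketch of the convention (illustrative only, not the actual implementation), the pieces combine like this:

# Illustrative only: how the subclass name and member value form the GCS location.
class_name = "DefiLlama".lower()          # -> "defillama"
table_name = "stablecoins_balances_v1"    # enum member value
root_path = f"{class_name}/{table_name}"  # -> "defillama/stablecoins_balances_v1"

dt = "2025-04-01"
uri = f"gs://oplabs-tools-data-sink/{root_path}/dt={dt}/out.parquet"
print(uri)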

Other built-in functionality#

There are a number of other methods that come as part of the DailyDataset class. Refer to the code in dailydata.py for more details.

Execution and Scheduling#

To execute the ingestion, we adopted the convention of having one or more execute.py files inside the datasource directory. Depending on the complexity of the datasource, you can have a single execute.py at the top level, or even several sub-directories, each with its own execute.py file handling ingestion of a subset of the tables in the data source.

Using the defillama stablecoins data as an example, we have a file called src/op_analytics/datasources/defillama/stablecoins/execute.py.
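
The internals vary per source, but the overall shape of an execute_pull function is usually: fetch raw data, shape it into polars dataframes that include a dt column, and call write() on each destination table. Here is a hedged, self-contained sketch (the hard-coded record stands in for the real API-fetching code):

import polars as pl

from op_analytics.datasources.defillama.dataaccess import DefiLlama


def execute_pull():
    """Hypothetical sketch of a DailyDataset pull."""
    # 1. Fetch raw data from the third-party API (the real code is source-specific).
    raw = [{"dt": "2025-04-01", "symbol": "USDT", "chain": "ethereum", "balance": 1.0}]

    # 2. Shape the data into a polars dataframe with the required "dt" column.
    balances_df = pl.DataFrame(raw)

    # 3. Write to the destination table.
    DefiLlama.STABLECOINS_BALANCE.write(dataframe=balances_df, sort_by=["symbol", "chain"])

    return {"balances_rows": len(balances_df)}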

For scheduling we use Dagster assets, which are then scheduled to run daily in our Dagster defs.py file. Below we show the Dagster asset definition for the stablecoins data pull. Note how we include the creation of BigQuery external tables there, which is easy thanks to built-in functionality in the DailyDataset class. We don’t need to recreate the external table every time the job runs, but it helps to have that code there so that people can see where the external table definition is coming from.

from dagster import AssetExecutionContext, asset


@asset
def stablecoins(context: AssetExecutionContext):
    """Pull stablecoin data."""
    from op_analytics.datasources.defillama.stablecoins import execute
    from op_analytics.datasources.defillama.dataaccess import DefiLlama

    result = execute.execute_pull()
    context.log.info(result)

    DefiLlama.STABLECOINS_METADATA.create_bigquery_external_table()
    DefiLlama.STABLECOINS_METADATA.create_bigquery_external_table_at_latest_dt()
    DefiLlama.STABLECOINS_BALANCE.create_bigquery_external_table()
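
For reference, here is a hedged sketch of how such an asset might then be wired up to run daily in defs.py using standard Dagster constructs. The job name and cron expression are made up:

from dagster import Definitions, ScheduleDefinition, define_asset_job

# Hypothetical job and schedule; the real defs.py wires up many more assets.
stablecoins_job = define_asset_job(name="defillama_stablecoins_job", selection=[stablecoins])

defs = Definitions(
    assets=[stablecoins],
    jobs=[stablecoins_job],
    schedules=[
        ScheduleDefinition(job=stablecoins_job, cron_schedule="0 6 * * *"),  # daily; time of day is illustrative
    ],
)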

Prototyping and Debugging#

To prototype we use IPython notebooks. In the notebooks we exercise the execute_pull functions or other helper functions defined for a specific datasource. For examples, refer to the notebooks/adhoc/defillama/ directory, which contains notebooks that were used to prototype the original implementation of the defillama pulls and that are still used regularly to debug issues observed in production.
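
For example, a typical prototyping session boils down to importing the module and running the pull interactively:

# In an IPython notebook: run the pull and inspect the returned summary.
from op_analytics.datasources.defillama.stablecoins import execute

result = execute.execute_pull()
result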

Monitoring#

As with everything else in our data platform, we use markers to monitor DailyDataset ingestion. There is one marker per root_path and dt combination. This allows us to keep an eye on new data arriving on time for each table and to raise alerts on our go/pipeline Hex dashboard when things are delayed. The marker metadata includes the ingested row count, so we can include that in the monitoring dashboard as well, right alongside the number of hours since the last ingestion.

Here is a snapshot of part of the dashboard from April 2025:

[Snapshot of the go/pipeline monitoring dashboard, April 2025]

Advanced use cases#

Some data sources are not easy to implement because of the way in which data is exposed by third-party systems. API calls may be error-prone and need to be retried. Or APIs may be rate-limited, requiring our systems to back off when they get throttled. Some cases may require a lot of transformation to get the data into the tabular form that we want to ingest it as. All of these cases require creative solutions.
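
As one illustration of the kind of helper this usually calls for, here is a hedged sketch of a retry wrapper with exponential backoff for a flaky or rate-limited HTTP endpoint. It is generic example code, not something the platform provides:

import time

import requests


def get_with_backoff(url: str, max_attempts: int = 5, base_delay: float = 1.0) -> dict:
    """Hypothetical helper: GET with retries and exponential backoff."""
    for attempt in range(max_attempts):
        try:
            response = requests.get(url, timeout=30)
            if response.status_code == 429:
                # Rate-limited: treat as a retryable failure and back off.
                raise requests.HTTPError("429 Too Many Requests", response=response)
            response.raise_for_status()
            return response.json()
        except requests.RequestException:
            if attempt == max_attempts - 1:
                raise
            time.sleep(base_delay * 2**attempt)
    raise RuntimeError("unreachable")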

If you encounter a challenge, check the existing code to see whether another data source has dealt with a similar problem that you can borrow a solution from.

When to use ClickHouseData instead#

There is also the issue of recovering from transient failures. The DailyDataset pattern requires you to have all of the data for a single dt at once. If that is a lot of data, there may be cases where you do a lot of work (call many endpoints) and a failure in the middle crashes the job. All of the progress made up to that point is lost.

Ingesting data from defillama and ingesting token metadata (totalSupply, etc.) for ERC-20 tokens are two use cases that have run into this all-or-nothing limitation of DailyDataset.

For both of these cases the solution we ended up implementing leverages ClickHouse. We write the data for a given root_path and dt to a buffer ClickHouse table. If the data pull fails halfway through, then the next time it kicks off we can look at the buffer table to see what we were able to complete on the last run and resume from there. Once we are done, we write the entire dt worth of data to GCS.
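
In rough, self-contained Python (an in-memory dict stands in for the ClickHouse buffer table, and the names below are made up), the pattern looks like this:

# Hypothetical sketch of the buffer-and-resume pattern described above.
buffer: dict[str, dict] = {}  # completed work units for the current dt


def fetch_one(key: str) -> dict:
    """Stand-in for an expensive, failure-prone API call."""
    return {"key": key, "value": 1}


def pull_with_buffer(dt: str, keys: list[str]) -> None:
    # 1. Find which work units already completed on a previous, partially-failed run.
    done = set(buffer)

    # 2. Fetch only the missing units, recording each one as soon as it completes,
    #    so a crash in the middle does not lose the progress made so far.
    for key in keys:
        if key in done:
            continue
        buffer[key] = fetch_one(key)

    # 3. Once the dt is complete, write the whole partition to GCS
    #    (in the real code this is a DailyDataset write of the assembled dataframe).
    print(f"would write {len(buffer)} rows for dt={dt} to GCS")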

This solution works well, but it raises the question: if ClickHouse offers better facilities for partial progress and incremental data ingestion, why don’t we just use ClickHouse as the final destination (instead of as a buffer on the way to GCS)?

The answer is that, for some processes (looking at you, defillama), we still need the data in GCS because our reporting logic lives in BigQuery, so we are not yet ready to move entirely to ClickHouse.

That said, after realizing how convenient ClickHouse ReplacingMergeTree tables are for ingesting data, we decided to provide better support for that pattern, which led to the creation of the ClickHouseData ingestion pattern that we go over in the next section.