# ClickHouseDataset

`ClickHouseDataset` is the successor of the `DailyDataset` from the previous section.
This data ingestion pattern was created after realizing that writing data to GCS is good for portability (it can be read from both BigQuery and ClickHouse) but does not match the functionality we get by writing directly to ClickHouse. In particular:

- The state of the ingestion table can be used to do incremental ingestion (only ingesting what we don't already have), as sketched below.
- The `ReplacingMergeTree` engine helps deduplicate input data.
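For example, an incremental pull can start by asking the ingestion table what it already contains. The snippet below is a minimal sketch that talks to ClickHouse directly via `clickhouse_connect`; the connection details, the `dt` column, and the helper name are assumptions for illustration, not part of the repo.

```python
# Minimal sketch (not the repo's actual helper): read the ingestion table's
# current state so the next pull only fetches what is missing.
import clickhouse_connect

# Connection details are illustrative.
client = clickhouse_connect.get_client(host="localhost")


def last_ingested_dt(db: str, table: str):
    """Return the most recent 'dt' value already present in the table, if any."""
    result = client.query(f"SELECT max(dt) FROM {db}.{table}")
    return result.result_rows[0][0]


# Hypothetical usage: only fetch data for dates after what we already have.
# start_after = last_ingested_dt("governance", "ingest_proposals_v1")
```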
## Subclassing the ClickHouseDataset class

This is the same as for the `DailyDataset` class. We also follow the same directory structure convention, with one directory and one `dataaccess.py` per datasource. For example:
```python
# FILE: src/op_analytics/datasources/governance/dataaccess.py
from op_analytics.coreutils.clickhousedata import ClickhouseDataset


class Governance(ClickhouseDataset):
    # Delegation events
    DELEGATE_CHANGED_EVENTS = "ingest_delegate_changed_events_v1"

    # Delegates
    DELEGATES = "ingest_delegates_v1"

    # Proposals
    PROPOSALS = "ingest_proposals_v1"
```
Each enum member corresponds to a table where data will be ingested.
## CREATE TABLE

For each ingestion table defined in `dataaccess.py` you are expected to provide a corresponding `CREATE TABLE` SQL file, which goes in the `ddl/` folder right beside `dataaccess.py`. The file name should match the enum member value used in the `ClickHouseDataset` subclass:
```
src/op_analytics/datasources/governance/ddl
├── ingest_delegate_changed_events_v1.sql
├── ingest_delegates_v1.sql
├── ingest_proposals_v1.sql
├── ingest_votes_v1.sql
└── ingest_voting_power_snaps_v1.sql
```
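For illustration, a DDL file for one of these tables might look roughly like the sketch below. The column list and database name are assumptions; only the table name comes from the listing above, and the `ReplacingMergeTree` engine is what provides the deduplication mentioned earlier.

```sql
-- Illustrative sketch only, not the repo's actual DDL.
-- Columns and database name are assumptions.
CREATE TABLE IF NOT EXISTS governance.ingest_proposals_v1
(
    `dt`          Date,
    `proposal_id` String,
    `payload`     String,
    `insert_time` DateTime DEFAULT now()
)
ENGINE = ReplacingMergeTree(insert_time)
ORDER BY (dt, proposal_id)
```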
The `ClickHouseDataset` class provides a `create_table()` method which locates the DDL and runs it against the database. Most of the time users don't need to call `create_table()` directly, because it is called every time `write()` is called.
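If you do need to create a table by hand, for example before any data has been written, the call is just the method on the enum member (module path taken from the example above):

```python
from op_analytics.datasources.governance.dataaccess import Governance

# Locates ddl/ingest_proposals_v1.sql and runs it against the database.
Governance.PROPOSALS.create_table()
```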
## Writing Data

The `ClickHouseDataset` class has a `write()` method, very similar to the one in the `DailyDataset` class. The method accepts a polars dataframe and inserts it into the destination table. For example:

```python
DaoPowerIndex.CPI_SNAPSHOTS.write(powerindex.snapshots_df)
```
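To make this more self-contained, here is a hedged end-to-end sketch using the `Governance` subclass from earlier. The dataframe columns are made up and must match whatever the table's DDL file actually declares.

```python
import polars as pl

from op_analytics.datasources.governance.dataaccess import Governance

# Hypothetical columns; they must line up with ddl/ingest_proposals_v1.sql.
df = pl.DataFrame(
    {
        "dt": ["2024-06-01"],
        "proposal_id": ["prop-123"],
        "payload": ["{}"],
    }
)

# Creates the table if needed (via create_table()) and inserts the rows.
Governance.PROPOSALS.write(df)
```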
Keep in mind that going through a polars dataframe first will not always be the best approach, so you can also insert data directly into the ClickHouse table. The enum members of a `ClickHouseDataset` subclass give you access to the database and table name strings, so you can use them as needed. If you want, you can even set up your own `INSERT INTO` statements in SQL files and coordinate their execution from the `execute.py` file, as in the sketch below.
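As a sketch of the direct route, the snippet below reads the database and table strings off an enum member and runs a raw `INSERT INTO` with `clickhouse_connect`. The `.db` and `.table` attribute names and the staging source are placeholders; check the `ClickHouseDataset` class for the real property names.

```python
import clickhouse_connect

from op_analytics.datasources.governance.dataaccess import Governance

# Connection details are illustrative.
client = clickhouse_connect.get_client(host="localhost")

target = Governance.DELEGATES
# NOTE: ".db" and ".table" are assumed attribute names for the db/table strings.
insert_sql = f"""
INSERT INTO {target.db}.{target.table}
SELECT *
FROM staging.delegates_raw  -- hypothetical staging source
"""
client.command(insert_sql)
```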
## Execution

Similar to what we do for `DailyDataset` data sources, we have an `execute.py` file that contains the entrypoint function for the code that ingests data. Here is an example:
```python
from .historical import HistoricalCPI
from .powerindex import ConcentrationOfPowerIndex
from .dataaccess import DaoPowerIndex


def execute_pull():
    powerindex = ConcentrationOfPowerIndex.fetch()
    historical = HistoricalCPI.fetch()

    DaoPowerIndex.CPI_SNAPSHOTS.write(powerindex.snapshots_df)
    DaoPowerIndex.CPI_COUNCIL_PERCENTAGES.write(powerindex.council_percentages_df)
    DaoPowerIndex.CPI_HISTORICAL.write(historical.df)
```
The business logic is isolated in separate dedicated modules (`historical.py`, `powerindex.py`) and the prepared dataframes are written out using the `write()` method.
## Scheduling

This is the same as for `DailyDataset`. We have custom-built Dagster assets that get assigned to a specific scheduled job in our Dagster `defs.py`.
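As a rough idea of the shape, a plain Dagster asset wrapping the entrypoint could look like the sketch below. The repo uses its own custom asset helpers and its own `defs.py`, so treat the decorator usage and the module path here as illustrative assumptions.

```python
from dagster import asset

# Assumed module path for the execute.py shown above.
from op_analytics.datasources.daopowerindex import execute


@asset
def dao_power_index_ingest():
    """Run the ClickHouse ingestion entrypoint on the scheduled job."""
    execute.execute_pull()
```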
## Monitoring

Monitoring is an area where we can improve `ClickHouseDataset`. We don't use markers here at the moment, so we don't have easy visibility into when data was written to a given table. This will need to be implemented in the future and will become important as new data sources start adopting `ClickHouseDataset` over `DailyDataset`.