Marker Metadata Layer#
Observability is perhaps the most important aspect of any data pipeline. How can we be sure it is working as expected if we can’t see what is happening?
For `blockbatch` processing (and for several other patterns at OP Labs) we introduce the idea of data pipeline markers. The idea is simple: there is one marker for each parquet file written to the data lake, and we have separate markers for each of the different `blockbatch` data processing steps.
Why are markers needed?#
Since locations in the data lake are deterministic (modulo the `dt` value), we can use the data lake itself as the source of truth for what data has been processed or still needs to be processed. For example: which blocks have been ingested for `chain=op` in the last 5 days? We could answer this question by listing the contents of the GCS bucket, but listing is very inefficient. Instead we store metadata for each parquet file as a marker, written as a row in the `blockbatch_markers` table in ClickHouse Cloud. At OP Labs we rely heavily on ClickHouse Cloud, so it was a natural choice to store the markers there.
In summary, markers exist so that we can answer questions about what data we have in the data lake very quickly and cheaply.
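As an illustration of the kind of question markers answer cheaply, here is a minimal sketch. The function and the dict field names are hypothetical (in practice this would be a single SQL query against the `blockbatch_markers` table), but the fields mirror the marker metadata described below:

```python
def ingested_block_ranges(markers, chain, since):
    """Answer "which blocks have been ingested for a chain since a date"
    from marker rows, instead of listing the GCS bucket.

    markers: iterable of dicts with chain, dt, min_block, max_block keys.
    Returns {dt: (lowest_block, highest_block)} for the matching chain.
    """
    out = {}
    for m in markers:
        if m["chain"] == chain and m["dt"] >= since:
            lo, hi = out.get(m["dt"], (m["min_block"], m["max_block"]))
            out[m["dt"]] = (min(lo, m["min_block"]), max(hi, m["max_block"]))
    return out
```

Scanning a small metadata table like this is the whole point: the answer comes from a few rows in ClickHouse rather than an object-store listing.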
Monitoring#
The `blockbatch_markers` table in ClickHouse stores the following metadata for each parquet file in the data lake:

- `chain`: The chain the data belongs to.
- `dt`: The date partition.
- `root_path`: The logical name of the data artifact.
- `data_path`: The physical path of the parquet file in the data lake.
- `min_block`: The minimum block number in the parquet file.
- `max_block`: The maximum block number in the parquet file.
- `row_count`: The number of rows in the parquet file.
- `num_parts`: The number of date partitions associated with the batch that the file belongs to.
- `process_name`: The name of the process that wrote the parquet file.
- `writer_name`: The name of the machine where the process was running.
- `updated_at`: The timestamp of when the process wrote the parquet file.
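The field list above can be sketched as a plain record type. This is an illustrative shape only; the actual ClickHouse column types may differ:

```python
from dataclasses import dataclass
from datetime import date, datetime

@dataclass
class BlockBatchMarker:
    """One row in the blockbatch_markers table (illustrative shape)."""
    chain: str            # chain the data belongs to
    dt: date              # date partition
    root_path: str        # logical name of the data artifact
    data_path: str        # physical parquet path in the data lake
    min_block: int        # minimum block number in the file
    max_block: int        # maximum block number in the file
    row_count: int        # number of rows in the file
    num_parts: int        # date partitions associated with the batch
    process_name: str     # process that wrote the file
    writer_name: str      # machine where the process was running
    updated_at: datetime  # when the process wrote the file
```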
For a given chain we know in advance how many blocks are produced per day (usually 1/s or 2/s). So for a given `dt` we can form an expectation of the number of blocks that should be processed. The markers table lets us very quickly query how many blocks were actually processed for a given `dt`, and this is in fact what we use for our pipeline monitoring Hex dashboards (which can be found at go/pipeline).
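The expectation-versus-actual check can be sketched as follows. This is a simplified sketch: `block_time_s` is assumed to be the seconds between blocks, and marker block ranges are assumed to be inclusive and non-overlapping:

```python
def expected_blocks_per_day(block_time_s: int) -> int:
    # One block every block_time_s seconds over a 24h date partition.
    return 86400 // block_time_s

def blocks_covered(ranges) -> int:
    # ranges: (min_block, max_block) pairs from markers for one chain/dt.
    # Assuming inclusive, non-overlapping ranges, coverage is the summed
    # width of each batch.
    return sum(mx - mn + 1 for mn, mx in ranges)

def completeness(ranges, block_time_s: int) -> float:
    # Fraction of the day's expected blocks that markers account for;
    # this is the kind of number a monitoring dashboard would surface.
    return blocks_covered(ranges) / expected_blocks_per_day(block_time_s)
```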
After data is ingested we can use the ingestion markers to formulate the expectation for data that needs to be processed across the different `blockbatch` models that we support. That expectation and the markers for the output model `root_paths` are used to monitor the data processing parts of the pipeline.
Job Planning#
Job planning works the same way: we can find out what data has been ingested but not yet processed and then schedule work accordingly. In that way the `blockbatch_markers` table acts like a processing queue for pending work.
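A minimal sketch of that queue-like behavior, assuming each batch is identified by a hypothetical `(chain, dt, min_block)` key (in practice this would be a query joining ingestion markers against output-model markers):

```python
def pending_batches(ingested, processed):
    """Batches that have an ingestion marker but no output-model marker yet.

    ingested and processed are collections of batch keys; the set
    difference between them is the work left to schedule.
    """
    return sorted(set(ingested) - set(processed))
```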