
SOON (Spark cOntinuOus iNgestion) for near real-time data at Coinbase - Part 1

Tl;dr: The increasing demand for low-latency data at Coinbase has required us to renovate our data ingestion and replication framework. This is the first in a series of blog posts introducing SOON and its ecosystem, our effort to resolve data latency, accuracy, and reliability issues in a unified manner.

By The Coinbase Data Platform & Services Team

Engineering, January 25, 2023

Data with low latency is critically important to the data warehouse, especially to support real-time incident analysis and real-time metrics for dashboards. Though we have up-to-date data in databases like PostgreSQL, MongoDB, and DynamoDB, those stores are designed for OLTP workloads and cannot adequately support the scenarios described above. The data needs to be replicated to a data warehouse to serve queries that perform group-bys and joins across tables from different systems or services. Therefore, we need a solution that can replicate tables of any size from OLTP stores to data warehouses in a timely manner.

The simplest way to achieve this is incremental table replication based on row filtering. Typically, we pick a timestamp column like updated_at to track the replication progress, or lag, between the source and the destination. Ideally, this column should have an additional index on the source OLTP store to improve filtering performance. However, this comes at the cost of reduced write performance, and in practice the performance of our Postgres incremental pipeline based on this method turned out to be unstable in production, especially for large tables. This method also cannot handle tables with hard deletes. For the MongoDB incremental replication pipeline, we explored another mechanism based on the MongoDB oplog and Kinesis. However, we experienced data loss when traffic spiked in our production pipeline, so we still had to rely on full snapshots to resync and correct the incrementally built tables every day.
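For illustration, a watermark-based incremental pull over JDBC might look like the following minimal sketch; the table, column, and connection names here are hypothetical, not our production setup:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Watermark persisted by the pipeline from its previous run (hypothetical value).
last_watermark = "2023-01-24 00:00:00"

# Pull only the rows that changed since the last run, relying on an indexed
# updated_at column on the source Postgres table.
incremental_df = (
    spark.read.format("jdbc")
    .option("url", "jdbc:postgresql://source-db:5432/app")
    .option("user", "replicator")
    .option("password", "***")
    .option(
        "dbtable",
        f"(SELECT * FROM orders WHERE updated_at > '{last_watermark}') AS changed",
    )
    .load()
)

# Append the changed rows to the warehouse copy. Hard deletes on the source are
# invisible to this query, which is one of the limitations described above.
incremental_df.write.format("delta").mode("append").save("s3://bucket/warehouse/orders")
```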

To resolve the pain points described above, the Coinbase Data Platform & Services team designed and implemented SOON (Spark cOntinuOus iNgestion), an entirely new incremental table replication solution built on Kafka, Kafka Connect, and Spark, which also naturally supports ingestion of Kafka events. As mentioned in our previous blog post on the Kafka infrastructure renovation at Coinbase, SOON is an important streaming ingestion use case in the overall Kafka ecosystem.

Architecture

At Coinbase, we group Kafka events into two categories: CDC (Change Data Capture) and non-CDC events. CDC events are the change-data-capture events generated by Kafka Connect source connectors; non-CDC events are the normal events sent out by frontend or backend services through Kafka client libraries. Both types of events can be inserted (appended) or merged (allowing updates and deletes) into the Delta Lake on S3, which was open sourced by Databricks in June 2022.
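As a rough sketch of this ingestion path (the topic, schema, and S3 paths are hypothetical), a Spark Structured Streaming job can read a Kafka topic and append the decoded events to a Delta table; a merge scenario instead applies a Delta MERGE inside a foreachBatch handler:

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StringType, StructField, StructType, TimestampType

spark = SparkSession.builder.getOrCreate()

# Hypothetical schema of a non-CDC event topic.
event_schema = StructType([
    StructField("id", StringType()),
    StructField("amount", StringType()),
    StructField("created_at", TimestampType()),
])

# Stream the topic from Kafka.
raw = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("subscribe", "payments-events")
    .load()
)

# Decode the value bytes (JSON here; protobuf or custom decoders follow the same pattern).
events = (
    raw.select(from_json(col("value").cast("string"), event_schema).alias("e"))
    .select("e.*")
)

# Append-only ingestion into Delta Lake on S3. A merge scenario would instead
# apply a Delta MERGE per micro-batch via foreachBatch.
(
    events.writeStream.format("delta")
    .option("checkpointLocation", "s3://bucket/checkpoints/payments-events")
    .trigger(processingTime="15 minutes")
    .start("s3://bucket/warehouse/payments_events")
)
```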

The following diagram shows the overall architecture for SOON: 

[Diagram: SOON overall architecture]

SOON is designed to ingest incremental changes (appends, updates, and deletes) into an existing table on Delta Lake. However, we still need a different way to construct the starting version of a newly onboarded table. For non-CDC events, we can build a new table by consuming all existing Kafka events and partitioning the table based on some timestamp column. For CDC events, it is a bit more complicated. Currently we rely on the existing full-snapshot dump pipeline previously built by the team using Airflow and AWS EMR, where data is exported to S3 and then fully loaded into Snowflake on a daily basis.

As the new-table bootstrapping step for SOON, we use the Spark Snowflake Connector to do a one-time copy of the table from Snowflake to Delta Lake, then trigger the SOON job by setting the Kafka offsets to a timestamp before the snapshot was taken. If the table to be onboarded is small, we have an alternative option to snapshot the table directly into Kafka. This is supported by most Kafka Connect source connectors, including the MongoDB source connector’s copy-existing-data feature and the Postgres source connector’s initial snapshot mode. In the future, we want to remove the dependency on Snowflake so that SOON can load full snapshot data directly from S3.
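A minimal sketch of this bootstrapping flow, assuming hypothetical connection options, topic, and table names (and the short "snowflake" data source alias registered by the connector):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# One-time copy of the table from Snowflake to Delta Lake via the Spark Snowflake
# Connector (all names and credentials below are placeholders).
snapshot_df = (
    spark.read.format("snowflake")
    .option("sfURL", "example.snowflakecomputing.com")
    .option("sfUser", "soon_bootstrap")
    .option("sfPassword", "***")
    .option("sfDatabase", "ANALYTICS")
    .option("sfSchema", "PUBLIC")
    .option("sfWarehouse", "LOAD_WH")
    .option("dbtable", "USERS")
    .load()
)
snapshot_df.write.format("delta").mode("overwrite").save("s3://bucket/warehouse/users")

# Then start the SOON streaming job from offsets resolved from a timestamp taken
# safely before the snapshot, so no change events are missed.
cdc_raw = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("subscribe", "cdc.app_db.users")
    # Per-partition epoch-millis timestamps; partition 0 shown for brevity.
    .option("startingOffsetsByTimestamp", '{"cdc.app_db.users": {"0": 1674432000000}}')
    .load()
)
```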

Snowflake is still the main data warehouse solution at Coinbase, so we need a way to copy SOON tables from Delta Lake to Snowflake until the downstream pipelines are migrated to Delta Lake. We leverage Delta Lake’s change data feed (CDF) feature to support this incremental copy, which also makes it possible to propagate hard deletes.
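A minimal sketch of reading the change data feed for such a copy, with a hypothetical table path and version bookmark:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Requires the SOON table to be created with delta.enableChangeDataFeed = true.
last_copied_version = 412  # hypothetical bookmark persisted by the copy job

changes = (
    spark.read.format("delta")
    .option("readChangeFeed", "true")
    .option("startingVersion", last_copied_version + 1)
    .load("s3://bucket/warehouse/users")
)

# Each row carries _change_type ('insert', 'update_preimage', 'update_postimage',
# 'delete'), plus _commit_version and _commit_timestamp, so the copy job can apply
# inserts, updates, and hard deletes on the Snowflake side instead of reloading a
# full snapshot.
to_apply = changes.where("_change_type != 'update_preimage'")
```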

Performance

We have implemented many optimizations in SOON to improve its performance. The biggest append-only non-CDC topic we have onboarded to SOON is the observability-metric topic, which has ingress traffic of hundreds of thousands of events per second in an in-house binary format. The biggest CDC-merge table onboarded has a snapshot size of a few TBs in snappy-compressed Parquet format and inbound CDC update traffic of hundreds of events per second in JSON format. We strike a balance between cost and data latency: we configured the latency to be 15 minutes for the biggest append-only table and 30 minutes for the biggest CDC-merge table. We do have a few small tables that require lower latency, and they are refreshed every minute. For large CDC-merge tables, the latency can be configured lower than 30 minutes if users can justify the business requirement for the extra computing expense.

Unified Ingestion Framework

A positive side effect of SOON is that this new solution unifies the onboarding experience for all scenarios, as long as users can convert what they want to ingest into a Kafka event. Previously, the team designed different architectures to support different ingestion or replication pipelines for Postgres, Mongo, Dynamo, MySQL, and Kafka, which made the onboarding experience inconsistent for users and kept our team’s maintenance and operational overhead high. With SOON, users only need to learn a single onboarding flow, and our team only needs to develop and maintain one incremental ingestion framework.

SOON supports ingesting non-CDC events of any format for both append-only and merge scenarios. Currently, we have event decoders implemented for JSON, protobuf with a schema registry, protobuf with proto descriptor files, and binary formats with a customized UDF for deserialization. For CDC events, we currently only support the JSON format, and the CDC events must be in the standard SOON CDC schema. Below is a dummy example of a CDC event.

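A minimal sketch of such an event for a hypothetical users table keyed by id (the field values are illustrative only):

```json
{
  "oc": "U",
  "ns": "app_db.users",
  "ot": "2023-01-24T18:21:07.532Z",
  "pk": {
    "id": 42
  },
  "va": {
    "email": "satoshi@example.com",
    "status": "verified",
    "updated_at": "2023-01-24T18:21:07.501Z"
  }
}
```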

In this example:

  • “oc” denotes the “operation code” field, where only three values are allowed: “I” for an insert event, “U” for an update event, and “D” for a delete event.

  • “ns” denotes the “namespace” field, which is composed of two parts: the database or schema name and the table name. This currently serves as a placeholder field for future support of a shared Kafka CDC topic for multiple tables.

  • “ot” denotes the “operation time” field, which is the timestamp of the operation that triggered the current CDC event.

  • “pk” denotes the “primary key” field, which is a struct of all columns that make up the composite primary key or unique index for a given row in the table. These columns are used in the “merge” query to construct the match condition, as illustrated in the sketch below. The struct values come from a post-image after the change; it does not need to be the immediate post-image of the change.

  • “va” denotes the “value” field, which holds a struct of the remaining columns for a given row, excluding the “pk” fields. As with the “pk” field, its struct values come from the same post-image after the change.

This is the standard CDC format that all databases’ source connectors should follow. We implemented our own in-house Kafka Connect Single Message Transformations (SMTs) to transform raw CDC events into this standardized format. Currently, this has been done for the MongoDB and Postgres source connectors.
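To illustrate how these fields drive the merge, here is a minimal PySpark sketch, assuming a hypothetical users Delta table keyed by id that matches the dummy event above; this is not the actual SOON implementation:

```python
from delta.tables import DeltaTable
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import col, row_number

spark = SparkSession.builder.getOrCreate()

def apply_cdc_batch(batch_df, batch_id):
    """Apply one micro-batch of SOON-format CDC events to the target Delta table."""
    target = DeltaTable.forPath(spark, "s3://bucket/warehouse/users")

    # Keep only the latest event per primary key within the batch, ordered by the
    # operation time ("ot"), then merge on the "pk" field as the match condition.
    latest = (
        batch_df.withColumn(
            "rn",
            row_number().over(Window.partitionBy(col("pk.id")).orderBy(col("ot").desc())),
        )
        .where("rn = 1")
    )

    (
        target.alias("t")
        .merge(latest.alias("s"), "t.id = s.pk.id")
        .whenMatchedDelete(condition="s.oc = 'D'")
        .whenMatchedUpdate(
            condition="s.oc IN ('I', 'U')",
            set={"email": "s.va.email", "status": "s.va.status", "updated_at": "s.va.updated_at"},
        )
        .whenNotMatchedInsert(
            condition="s.oc != 'D'",
            values={
                "id": "s.pk.id",
                "email": "s.va.email",
                "status": "s.va.status",
                "updated_at": "s.va.updated_at",
            },
        )
        .execute()
    )
```

In the streaming job, a function like this would typically be wired in via foreachBatch so that each micro-batch is applied as a single MERGE against the Delta table.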

SOON Table Onboarding

To onboard a table to SOON, users need to author a configuration file like the one below:

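A minimal sketch of such a configuration; the key names and values below are hypothetical, not SOON’s actual option names:

```hocon
# Hypothetical SOON job configuration in HOCON; key names are illustrative.
soon {
  kafka {
    cluster = "analytics-prod"
    topic   = "cdc.app_db.users"
  }
  output {
    table          = "app_db.users"
    backfill-path  = "s3://bucket/backfill/app_db/users"
    partition-cols = ["ds"]
  }
  schema {
    columns = [
      { name = "id",         type = "long" }
      { name = "email",      type = "string" }
      { name = "status",     type = "string" }
      { name = "updated_at", type = "timestamp" }
    ]
  }
  # Simple transformations expressed as derived columns.
  derived-columns = [
    { name = "ds", expr = "to_date(updated_at)" }
  ]
}
```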

This configuration uses the HOCON format and provides the basic information for a SOON job, such as the source Kafka topic name, the Kafka cluster name, the output table name, the backfill archive path, the table schema, simple transformations via derived columns, partition columns, and more.

We plan to reduce users’ onboarding effort by building an external schema service that provides schemas for replicated CDC tables and for the append-only tables that use the protobuf format.

Future Work

We have been using the SOON framework for only about one year at Coinbase. To ensure the long-term success of this system, we will continue to invest in expanding SOON in areas like dynamic schema generation, better cluster sharing, a simplified bootstrapping process, and easier onboarding.

With all the exciting features still to be implemented for SOON, we will continue to update our blog with our progress on this new solution to meet the need for low-latency data, a need that exists for all data-driven products and decision-making.

Acknowledgements

We'd like to thank Chen Guo, Arujit Pradhan, Xinyu Liu, Xuan He, Yisheng Liang, Eric Sun, Leo Liang, and Michael Li for their contribution and support to this project.
