

Optimizations in SOON (Spark cOntinuOus iNgestion) - Part 2

Tl;dr: Building a streaming ingestion framework at scale can be complex and expensive. In this part of the SOON blog series, we highlight some of the performance optimizations and improvements we have incorporated in SOON.

By The Coinbase Data Platform & Services Team

Engineering, February 8, 2023


In a recent blog post, we introduced Spark cOntinuOus iNgestion (SOON) and its ecosystem — a unified, easy-to-use, and reliable framework for near real-time incremental streaming data ingestion. With an increasing number of large datasets being onboarded to SOON, it becomes important to improve ingestion performance at the system level. In this blog post, we dive deeper into some well-known and some out-of-the-box performance improvements we have incorporated in SOON.

In optimizing SOON, we are trying to achieve the following:

  • Reduce ingestion cost by limiting compute resources. Streaming ingestion is usually more expensive than batch processing. Even though we multiplex ingestion pipelines to Spark clusters and enable auto-scaling, the nature of streaming requires us to keep the clusters always on. So, a constant theme of our optimization strategy is to use as few resources as possible to reduce the cost of streaming data into the data warehouse.

  • Strike the right balance between latency and cost. We have tiers of clusters serving similar ingestion jobs with similar latency SLOs. This gives us independently configurable clusters based on use cases and latency requirements; i.e., machine types or Spark configs for CDC (Change Data Capture) and append-only events can be different.

  • Integrate with existing data platform solutions at Coinbase. Snowflake is still the primary data warehouse solution at Coinbase, which results in duplicate data in both Snowflake and Delta Lake. We want a comparable user experience and performance when querying data in Delta Lake.

Merge optimizations

SOON supports both inserting (append-only) and merging (upsert + delete) into Delta Lake for two types of Kafka events, CDC and non-CDC. It serves different use cases, such as merge updates and deduplication, at different scales, from petabytes of append-only backend events to terabytes of database replication.

Ingesting append-only events is relatively straightforward: we only need to take care of proper partitioning schemes and avoid the small-file problem. In merge scenarios, the slowest part is the merge operation itself, which takes close to 80% of the overall micro-batch processing time. So, to optimize non-append-only ingestions, we want to speed up the merge operation.

A pseudo query for the merge operation in SOON is sketched below.

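This is a minimal Spark SQL sketch issued from Scala rather than SOON's exact query; the table names `target_table` and `source_batch` and the change-type column `_op` are illustrative placeholders, not SOON's actual identifiers.

```scala
// Sketch only: `spark` is an active SparkSession, and the current Kafka
// micro-batch has been registered as the temp view `source_batch`.
// `target_table` and `_op` (the CDC change type) are hypothetical names.
spark.sql("""
  MERGE INTO target_table AS t
  USING source_batch AS s
    ON t.id = s.id
  WHEN MATCHED AND s._op = 'delete' THEN DELETE
  WHEN MATCHED THEN UPDATE SET *
  WHEN NOT MATCHED AND s._op != 'delete' THEN INSERT *
""")
```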

The merge operation updates, inserts, and deletes data in the target Delta Lake table by comparing it with the incremental micro-batch built from the source Kafka topic. The `id` column in this merge query is a merge key that the user needs to configure for the SOON job.

The performance impact of the merge query is not surprising, since it involves two joins with massive I/O:

  • An inner join to find the set of Parquet files in the Delta table that match the merge condition on the merge key.

  • An outer join to write back newly created Parquet files after applying the inserts, updates, and deletes.

Common tuning for efficient merge operations

So far, we have identified the bottleneck and researched some solutions around it. We applied a few commonly known optimization practices for merge operations in SOON:

  • Partitioning: We expose configs to allow traditional time-based partitioning and extensively use partitioning schemes for non-CDC events. We found that partitioning improves performance drastically. For CDC events, however, partitioning does not look like a good idea in our setup, for the following reasons:

    ◦ Kafka Connect sends delete events with only the merge keys, which don’t contain any information about the partition timestamp columns. (It would be a bad idea to use merge keys as the partition columns due to their high cardinality.)

    ◦ For most database tables, partitioning by hour/date would create many small files across partitions for each micro-batch merge operation.

    ◦ There might simply not be an immutable event timestamp field to partition on.

  • Caching the DataFrame: We cache intermediate Spark DataFrames in memory wherever possible.

  • Z-Order on the Merge Keys: We enable Z-order by default on merge keys for better data skipping. However, the Delta Lake (DBR 11.2 without Photon) merge query doesn’t leverage the column statistics or the Z-ordered columns.

  • Adjusting the shuffle partitions: We tune the shuffle partitions based on the Kafka topic traffic volume (see the sketch after this list).

  • Photon Engine: We found that Photon, the next-generation lakehouse engine written in C++, is more performant for merge operations and SOON jobs, but we need to weigh its additional cost and performance gains against latency/SLA requirements on a case-by-case basis. (We are actively working with Databricks to further improve the cost vs. performance trade-off with Photon.)
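To make the non-Photon knobs above concrete, here is a minimal, illustrative sketch. It assumes an active SparkSession named `spark` and hypothetical table/column names; the actual values SOON uses are derived per job from topic traffic and latency requirements.

```scala
// Tune shuffle partitions to the topic's traffic volume (illustrative value).
spark.conf.set("spark.sql.shuffle.partitions", "64")

// Cache an intermediate micro-batch DataFrame that is reused across steps.
val microBatch = spark.table("source_batch").cache()

// Compact small files and Z-order the target table on the merge key for
// better data skipping (typically run out of band, not on every micro-batch).
spark.sql("OPTIMIZE target_table ZORDER BY (id)")
```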

All of these techniques are standard ways to improve Spark job performance, but we wanted to come up with a few more creative and effective optimizations to amplify SOON's performance further. The main idea is to decrease the number of data files that need to be scanned or rewritten for the target Delta Lake table, either by adding filters to the merge query based on the stream's data distribution or by reducing unnecessary updates of the same data.

Min-max range merge optimization

The merge operation scans the entire table, even for the deletion of a single row. With min-max range-based optimization, we extensively prune Parquet files in the destination Delta Lake table based on the minimum and maximum values of configurable immutable columns in the source Kafka micro-batch DataFrame. This limits the number of files that need to be scanned for the merge's matching condition. The operations to find the min and max are handled by the SOON application code.
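A rough sketch of what that application-side step might look like, assuming the job is configured with a hypothetical immutable column named `updated_at` (the actual SOON implementation may differ):

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{max, min}

// Compute the min/max of the configured immutable column over the micro-batch
// and build an extra predicate for the merge's match condition.
// Values are embedded naively here; production code would handle types and escaping.
def minMaxPredicate(microBatch: DataFrame, column: String = "updated_at"): String = {
  val bounds = microBatch.agg(min(column).as("lo"), max(column).as("hi")).head()
  s"t.$column >= '${bounds.get(0)}' AND t.$column <= '${bounds.get(1)}'"
}
```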

A pseudo merge query with min-max range optimization is sketched below.

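Continuing the earlier sketch, the match condition gains the min-max bounds computed from the micro-batch; names and literal values are illustrative only.

```scala
// The min/max of `updated_at` in the micro-batch bound the target files that
// need to be scanned; files outside the range are pruned via data skipping.
spark.sql("""
  MERGE INTO target_table AS t
  USING source_batch AS s
    ON t.id = s.id
   AND t.updated_at >= '2023-02-01 00:00:00'  -- min(updated_at) in the batch
   AND t.updated_at <= '2023-02-01 00:15:00'  -- max(updated_at) in the batch
  WHEN MATCHED AND s._op = 'delete' THEN DELETE
  WHEN MATCHED THEN UPDATE SET *
  WHEN NOT MATCHED AND s._op != 'delete' THEN INSERT *
""")
```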

The filter condition works well when the source CDC DataFrame contains a narrow range of updates.

Comparing the end-to-end processing time of two identical streaming sources in identical environments with and without a min-max filter looks like the following:

[Chart: per-micro-batch processing times for merge without min-max optimization]

Average micro-batch processing time for merge without min-max optimization: ~14 minutes

[Chart: per-micro-batch processing times for merge with min-max optimization]

Average micro-batch processing time for merge with min-max optimization: ~11 minutes. This is roughly a 20% performance gain after enabling the optimization.

KMeans merge optimization 

When analyzing the distribution of the CDC traffic, we noticed that for a few large tables most of the changes happen to recent records. Only a few updates touch very old records, but they make the min-max range very wide, which makes the min-max range optimization less than ideal.

Given this characteristic, KMeans clustering is a good solution to further narrow down the ranges used for filtering. With the help of the KMeans unsupervised learning algorithm, we generate buckets of ranges that address the wide-scan issue of the min-max range optimization. We cache DataFrames heavily to make KMeans more efficient and have implemented KMeans support for three column types: Integer/Long/Double/BigDecimal, Mongo ObjectId String, and Timestamp.

High-level procedure for the KMeans optimization (a sketch follows this list):

  • Conduct basic data sanitization like normalizations and transformations to generate feature vectors for a configured immutable column.

  • Use Spark-ML to train a KMeans unsupervised model on the feature vector.

  • Bucket the feature vectors into K clusters.

  • Use aggregation functions to find min and max ranges for each cluster.

  • Denormalize the min-max ranges to values in the original format.
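A minimal Spark ML sketch of this procedure, assuming the configured immutable column has already been normalized to a numeric value; the column name, K, and the seed are illustrative, and the production code also handles Mongo ObjectId and Timestamp columns, which are not shown here.

```scala
import org.apache.spark.ml.clustering.KMeans
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{max, min}

// Bucket a numeric immutable column of the micro-batch into k ranges via KMeans.
// Returns a (lo, hi) bound per cluster, later OR-ed into the merge condition.
def kmeansRanges(microBatch: DataFrame, column: String, k: Int = 3): Array[(Any, Any)] = {
  // 1. Build feature vectors from the (sanitized) immutable column.
  val featured = new VectorAssembler()
    .setInputCols(Array(column))
    .setOutputCol("features")
    .transform(microBatch.select(column).na.drop().cache())

  // 2-3. Train a KMeans model and bucket the feature vectors into k clusters.
  val model = new KMeans().setK(k).setSeed(42L).setFeaturesCol("features").fit(featured)

  // 4-5. Take min/max per cluster as the (still normalized) ranges.
  model.transform(featured)
    .groupBy("prediction")
    .agg(min(column).as("lo"), max(column).as("hi"))
    .collect()
    .map(row => (row.get(1), row.get(2)))
}
```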

For K = 3, the merge query becomes the following.

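The sketch below is a hedged reconstruction with the same illustrative names as before; the three ranges stand in for the per-cluster min/max values computed above.

```scala
// Three narrow ranges (one per KMeans cluster) replace the single wide
// min-max range in the match condition. Literal values are illustrative.
spark.sql("""
  MERGE INTO target_table AS t
  USING source_batch AS s
    ON t.id = s.id
   AND ((t.updated_at BETWEEN '2020-03-01' AND '2020-03-02')
     OR (t.updated_at BETWEEN '2022-11-10' AND '2022-11-12')
     OR (t.updated_at BETWEEN '2023-02-01' AND '2023-02-02'))
  WHEN MATCHED AND s._op = 'delete' THEN DELETE
  WHEN MATCHED THEN UPDATE SET *
  WHEN NOT MATCHED AND s._op != 'delete' THEN INSERT *
""")
```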

Comparing the end-to-end processing time of two identical streaming sources in identical environments with min-max filter and KMeans filter:

[Chart: per-micro-batch processing times for merge with the min-max filter]

Average micro-batch processing time for merge with min-max optimization: ~5.3 minutes.

[Chart: per-micro-batch processing times for merge with the KMeans filter]

Average micro-batch processing time for merge with KMeans optimization: ~4.35 minutes. This is another ~20% end-to-end improvement on top of the min-max range optimization, for tables whose CDC change pattern/distribution fits the KMeans scenario.

  • The overhead of running the KMeans algorithm itself is 30+ seconds.

  • The table needs to be Z-ordered based on the KMeans columns to achieve the best performance gain.

No-update merge for deduplication

SOON supports event deduplication on merge keys while ingesting into Delta Lake. Disabling updates in merge operations for deduplication makes the merge faster, as it effectively becomes an append-only operation.

A sample query for a deduplication-enabled merge on the `id` column is sketched below.

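A minimal sketch, using the same illustrative `target_table` and `source_batch` names as the earlier examples:

```scala
// Without a WHEN MATCHED ... UPDATE branch, duplicate events (matched on `id`)
// rewrite no data, and the merge effectively appends previously unseen ids only.
spark.sql("""
  MERGE INTO target_table AS t
  USING source_batch AS s
    ON t.id = s.id
  WHEN NOT MATCHED THEN INSERT *
""")
```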

The UPDATE clause is not generated for duplicate events (matched by `id`), which avoids unnecessary data rewrites.

Read performance optimizations

Apart from the aforementioned optimization techniques in the ingestion pipeline, we use several other features to ensure efficient query performance in Delta Lake. A few of these include Vacuum, Optimize and Z-Order indexing, and partitioning on generated columns. SOON is just a year old and currently in its adoption phase at Coinbase. We will continue to investigate performance blind spots and adopt new techniques to meet our low-latency streaming ingestion requirements.
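For reference, Vacuum and generated-column partitioning look roughly like the following; this is an illustrative sketch only, with hypothetical table names, retention, and partition expression (Optimize/Z-Order was sketched earlier).

```scala
// Remove data files that are no longer referenced by the Delta table and are
// older than the retention window, to keep storage and file listings lean.
spark.sql("VACUUM target_table RETAIN 168 HOURS")

// Partition on a generated date column derived from the event timestamp, so
// reader queries filtering on `event_ts` still benefit from partition pruning.
spark.sql("""
  CREATE TABLE IF NOT EXISTS events_table (
    id STRING,
    payload STRING,
    event_ts TIMESTAMP,
    event_date DATE GENERATED ALWAYS AS (CAST(event_ts AS DATE))
  )
  USING DELTA
  PARTITIONED BY (event_date)
""")
```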

Acknowledgements

We'd like to thank Chen Guo, Arujit Pradhan, Xinyu Liu, Xuan He, Yisheng Liang, Eric Sun, Leo Liang, and Michael Li for their contribution and support to this project.
