TL;DR: Growing data streaming demands at Coinbase have pushed us to renovate our Kafka infrastructure and services. This piece walks through our evolving Kafka ecosystem, our efforts to modernize the data streaming platform, and the lessons we've learned since our previous blog post from August 2021.
In 2020 and 2021, the data team at Coinbase built a universal Kafka infrastructure on top of AWS MSK, open-source Kafka Connect, and Airflow ETL to empower engineers across event streaming, data analytics, and pub/sub use cases. As Kafka adoption accelerated, we found that users were still running into a set of technical issues and limitations in our solutions. To tackle these issues and anticipate future requirements, our team undertook a technology upgrade and expansion in late 2021. The most impactful efforts were the following:
Building a full-featured Kafka control plane to enable multiple clusters
Hardening our home-crafted Kafka data plane, the Streaming SDK
Improving Kafka onboarding experience
Embracing Kafka REST Proxy and Schema Registry
Developing stream processing solutions in Databricks
MSK Multi-Cluster Federation
Like many multi-tenant systems, Kafka has well-known noisy neighbor problems. For instance, lagging consumers may affect other clients with strict latency requirements due to page cache contamination. Additionally, a Kafka cluster does not scale indefinitely; at some point, users have to offload some topics to new clusters. Although MSK (Amazon Managed Streaming for Apache Kafka) tolerates some network splits (an outage of a single availability zone), cross-region failover could in theory boost a Kafka cluster's availability from 99.9% to 99.9999% (assuming two independently failing regional clusters: 1 − 0.001² = 0.999999).
These facts motivated us to find a resilient multi-cluster topology. With our accumulated knowledge of AWS MSK, we managed to overcome the networking challenges of cross-VPC Kafka connectivity, and in 2022 a few new MSK clusters were launched in a designated streaming AWS account.
Today, besides the large multi-tenant MSK cluster named <data-platform-primary>, there is a service-dedicated MSK cluster serving an extremely low-latency use case. We are actively migrating CDC (Change Data Capture) pipelines and observability metrics to two new MSK clusters, leaving only events and pub/sub messages in the primary cluster. Migrating the metrics pipeline from Kinesis to Kafka yielded significant savings on the relevant AWS costs as well as improved latency and reliability. Eventually, topics of a similar category will be co-located in the same group of MSK clusters.
Each category has some distinct traits — for instance, CDC events are strongly ordered, while other event types and pub/sub messages are typically weakly ordered. Pub/sub messages have tighter latency and availability requirements.
It takes a few steps to provision a new MSK cluster and get it ready to serve traffic.
In MSK, only the broker instance type can be scaled down (through a rolling replacement); scaling down the cluster size or disk space is prohibited by AWS. To find a cost-efficient MSK configuration, we were able to test different cluster configurations on the same day by repeatedly creating and tearing down clusters.
Full-featured Kafka Control Plane
We’ve developed a Kafka control plane for the following purposes:
Provisioning topics to the designated MSK clusters
Managing topic ACLs on MSK clusters
Authenticating and authorizing topic access to Kafka clients as well as SSO users
Exposing topic and cluster metadata in gRPC and REST endpoints
In essence, the control plane serves as the single source of truth for Kafka configurations, security policies, message routing, and failover schemes. As an example, the following YAML snippet declares the Kafka topic <test-event-proto>.
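The exact schema of these declarations is internal to our control plane; the sketch below is only illustrative, with assumed field names, but it captures the pieces described next:

```yaml
# Illustrative only; field names are assumptions, not the control plane's real schema.
topic: test-event-proto
partitions: 32                   # assumed value
clusters:
  - data-platform-primary        # primary cluster; receives all traffic by default
  - observability-metrics        # secondary cluster; used only during failover
routing: failover                # { na | failover | roundrobin | replicate }
```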
The topic is created in two MSK clusters, <data-platform-primary> and <observability-metrics>, while messages are always sent to the first cluster unless <failover> takes place, i.e., the first cluster experiences a cluster-wide outage.
Since not every service requires its own staging Kafka topic, to avoid the cost of a dedicated staging Kafka environment we introduced a config called <dev-variant-suffix>. Once it is specified, two topics are created in the development clusters: the base topic and a variant with the suffix appended (for example, with a suffix of -qa02, <test-event-proto> and <test-event-proto-qa02>).
The second topic can be used by the staging instance of a service. The suffix can be named -test03, -qa02, or -v2 according to the needs.
Changes to the above YAML file turn into incremental topic updates in the Kafka clusters. For data safety, the control plane is programmed to never delete a topic that still holds messages.
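The emptiness check can be expressed with standard watermark offsets; here is a minimal sketch with the open-source confluent-kafka Python client (our control plane is implemented differently):

```python
# Sketch of the safety check: a topic is considered empty only when every
# partition's low and high watermarks coincide (no retained messages).
from confluent_kafka import Consumer, TopicPartition

def topic_is_empty(bootstrap: str, topic: str) -> bool:
    consumer = Consumer({"bootstrap.servers": bootstrap, "group.id": "control-plane-check"})
    try:
        metadata = consumer.list_topics(topic, timeout=10)
        for pid in metadata.topics[topic].partitions:
            low, high = consumer.get_watermark_offsets(TopicPartition(topic, pid), timeout=10)
            if high > low:       # at least one retained message in this partition
                return False
        return True
    finally:
        consumer.close()
```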
The above topic metadata is exposed via gRPC and REST endpoints of the control plane. The vast majority of our Kafka client services are agnostic to the underlying MSK clusters until a request is sent to the Kafka control plane via our home-crafted Streaming SDK, which retrieves topic metadata such as the MSK clusters, bootstrap URIs, supported auth modes, and the load-balancing algorithm. This design also makes cross-cluster topic migration easy and low-risk.
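As a rough illustration of this flow (the endpoint path, response fields, and the use of the open-source Python client below are assumptions, not our actual SDK):

```python
# Illustrative only: resolving a topic through a hypothetical control-plane
# REST endpoint, then building a producer from the returned metadata.
import requests
from confluent_kafka import Producer

CONTROL_PLANE = "https://kafka-control-plane.internal"  # placeholder URL

def resolve_topic(topic: str) -> dict:
    """Fetch topic metadata: clusters, bootstrap URIs, auth modes, routing."""
    resp = requests.get(f"{CONTROL_PLANE}/v1/topics/{topic}", timeout=5)
    resp.raise_for_status()
    return resp.json()

meta = resolve_topic("test-event-proto")
primary = meta["clusters"][0]  # assumed shape: {"name": ..., "bootstrap_uri": ..., "auth": "mTLS"}
producer = Producer({
    "bootstrap.servers": primary["bootstrap_uri"],
    "security.protocol": "SSL",  # mTLS; client certificates would be configured here too
})
```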
Kafka Data Plane - Streaming SDK
Services at Coinbase use the Streaming SDK as the data plane to interact with different messaging systems, namely Kafka, Kinesis, SQS, and SNS. For Kafka communication, the SDK periodically reaches out to the Kafka control plane to refresh topic and cluster metadata, as mentioned earlier, so service owners are freed from the hassle of Kafka client configuration. Equipped with circuit breakers, the SDK can redirect messages according to the load-balancing algorithm { na | failover | roundrobin | replicate } in case of a regional MSK outage. It is essentially a light form of Kafka cluster federation, which benefits asynchronous communication between mission-critical services with high availability and low latency requirements.
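Conceptually, the cluster-selection part of that logic looks like the following sketch (the real SDK also handles retries, circuit-breaker state transitions, and metrics):

```python
import itertools

_rr = itertools.count()  # process-wide round-robin counter

def pick_clusters(clusters, algorithm, healthy):
    """Select the target cluster(s) for one message.

    clusters:  ordered cluster names from the control plane (primary first)
    algorithm: "na" | "failover" | "roundrobin" | "replicate"
    healthy:   clusters whose circuit breaker is currently closed
    """
    live = [c for c in clusters if c in healthy]
    if algorithm == "replicate":
        return live                                   # publish to every healthy cluster
    if algorithm == "roundrobin":
        return [live[next(_rr) % len(live)]] if live else []
    if algorithm == "failover":
        return live[:1]                               # first healthy cluster in priority order
    return clusters[:1]                               # "na": always the primary, no redirection
```

For a topic provisioned on data-platform-primary and observability-metrics with the failover algorithm, the secondary cluster would be chosen only while the primary's circuit breaker is open.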
Diving slightly deeper: typical pub/sub topics are weakly ordered, so a message producer is free to shift between the MSK clusters where the topic is provisioned, as long as the consumer side subscribes to all of these clusters. Multiple Spark streams can simultaneously write to the same Delta table in Databricks without issues, which makes our dynamic routing scheme work end-to-end for data streaming pipelines.
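For example, the consumer side of such a topic can simply be two Structured Streaming queries, one per MSK cluster, appending to the same Delta table (bootstrap addresses, topic, checkpoints, and table path below are placeholders):

```python
# Sketch: one streaming query per MSK cluster, both appending to the same
# Delta table.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

def start_stream(bootstrap_servers: str, checkpoint: str):
    return (
        spark.readStream.format("kafka")
        .option("kafka.bootstrap.servers", bootstrap_servers)
        .option("subscribe", "test-event-proto")
        .option("startingOffsets", "latest")
        .load()
        .selectExpr("CAST(key AS STRING) AS key", "value", "timestamp")
        .writeStream.format("delta")
        .option("checkpointLocation", checkpoint)   # each query needs its own checkpoint
        .start("/delta/events/test_event_proto")    # same target table for both queries
    )

start_stream("b-1.primary.example:9094", "/checkpoints/test_event_proto/primary")
start_stream("b-1.secondary.example:9094", "/checkpoints/test_event_proto/secondary")
```

Concurrent appends from independent queries are safe because each stream commits through Delta's optimistic transaction log.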
The Streaming SDK comes with canned, pre-tuned settings for Kafka producers, which further simplifies the onboarding experience. The built-in Protobuf serializer automatically registers Protobuf schemas with the Confluent Schema Registry for schema validation and enforcement. In terms of observability, the SDK automatically emits metrics covering latency, success/failure rate, message size, and other health indicators for easy monitoring and alerting.
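Our serializer is built into the SDK, but the open-source confluent-kafka Python client illustrates the same behavior: the Protobuf serializer registers the schema with the Schema Registry on first use and then encodes messages (the generated module test_event_pb2, the registry URL, and the broker address are placeholders):

```python
# Illustration with the open-source confluent-kafka client (not our SDK).
# `test_event_pb2` stands in for a protoc-generated module.
from confluent_kafka import Producer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.protobuf import ProtobufSerializer
from confluent_kafka.serialization import SerializationContext, MessageField

import test_event_pb2  # hypothetical generated code for TestEvent

registry = SchemaRegistryClient({"url": "https://schema-registry.internal"})
serialize = ProtobufSerializer(
    test_event_pb2.TestEvent, registry, {"use.deprecated.format": False}
)
producer = Producer({"bootstrap.servers": "b-1.primary.example:9094"})

event = test_event_pb2.TestEvent(id="abc-123")
# The first call registers the schema under the "test-event-proto-value" subject
# (auto-registration is on by default), then serializes the message.
producer.produce(
    "test-event-proto",
    value=serialize(event, SerializationContext("test-event-proto", MessageField.VALUE)),
)
producer.flush()
```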
Enriched Security Model
Kafka topic ACLs are codified as a YAML file and managed by the Kafka control plane.
The control plane is responsible for propagating the read and write ACL policies across all the MSK clusters, while the user-read policy is fetched by Kafdrop and AKHQ to determine whether an SSO user has read access to the messages on a topic.
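A sketch of such an ACL entry, with assumed key names:

```yaml
# Illustrative ACL entry; key names are assumptions, not our actual schema.
topic: test-event-proto
acls:
  write:
    - svc-payments            # services allowed to produce
  read:
    - svc-risk                # services allowed to consume
  user-read:
    - group:data-eng          # SSO groups allowed to browse messages in Kafdrop/AKHQ
```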
Support All 3 MSK Auth Modes
Among the three authentication approaches supported by MSK, mTLS was the first one adopted at Coinbase; SASL/SCRAM and IAM access control were recently enabled on the new MSK clusters. These authentication modes are returned as metadata to Kafka clients and handled automatically by the Streaming SDK.
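For reference, the client-side settings for mTLS and SASL/SCRAM map onto standard librdkafka options like the following (broker addresses, file paths, and credentials are placeholders); IAM access control additionally requires an AWS-provided client auth library, so it is not shown here:

```python
# Standard librdkafka (confluent-kafka) options for two of the MSK auth modes.
mtls_config = {
    "bootstrap.servers": "b-1.primary.example:9094",   # MSK's TLS listener port
    "security.protocol": "SSL",
    "ssl.ca.location": "/etc/pki/ca.pem",
    "ssl.certificate.location": "/etc/pki/client.pem",
    "ssl.key.location": "/etc/pki/client.key",
}

scram_config = {
    "bootstrap.servers": "b-1.primary.example:9096",   # MSK's SASL/SCRAM listener port
    "security.protocol": "SASL_SSL",
    "sasl.mechanisms": "SCRAM-SHA-512",                # the mechanism MSK supports for SCRAM
    "sasl.username": "svc-payments",
    "sasl.password": "<fetched from a secrets manager>",
}
```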
Making the Kafka Experience Delightful
Kafdrop is a handy UI tool for displaying cluster and topic configurations, consumer groups and lags, and messages on different topics. When integrated with Confluent Schema Registry, Kafdrop displays protobuf messages in JSON format without needing to import the protoc-generated libs.
AKHQ was introduced at Coinbase in early 2022 to support multiple MSK clusters, and we have been impressed by its administration capabilities and integration features. Forking AKHQ on GitHub allows us to customize its security by hooking it up to our Kafka control plane, which determines the level of access a user has. The following operations are performed through AKHQ day in and day out:
Empty topics upon users’ requests
Delete protobuf schemas in the schema registry to unblock message publishing when breaking schema changes are introduced
Delete consumer groups upon users’ requests
Reset committed offsets for a consumer group
Update topic dynamic configurations not overseen by the control plane
Users prefer both Kafdrop and AKHQ over command-line tools, as testing and debugging become much easier.
Confluent REST Proxy and Schema Registry
For services developed in programming languages lacking a production-grade Kafka client library, Confluent REST Proxy is the recommended way to interact with Kafka.
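For instance, producing a JSON message through the REST Proxy's v2 API is a single HTTP call (the proxy URL and topic below are placeholders):

```python
# Producing one JSON record through the REST Proxy's v2 API.
import requests

resp = requests.post(
    "https://kafka-rest.internal/topics/test-event-json",
    headers={"Content-Type": "application/vnd.kafka.json.v2+json"},
    json={"records": [{"key": "abc-123", "value": {"asset": "ETH", "amount": "0.5"}}]},
    timeout=5,
)
resp.raise_for_status()
print(resp.json())  # partition/offset assignments for the produced records
```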
To encourage the use of structured data, and as a requirement of Kafka Connect, we shipped tools for easy Protobuf schema registration. Non-backward-compatible schema changes block message publishing on the producer side to prevent engineers from breaking downstream data pipelines.
Kafka Streaming Pipelines
A home-grown streaming ingestion and DB replication framework, SOON (Spark cOntinuOus iNgestion), was developed at Coinbase to ingest Kafka messages from various data sources into Delta Lake. Aiming to address the scalability and latency challenges of the old Airflow-based, Kafka-to-Snowflake ETL pipelines, SOON offers near-real-time ingestion through Spark Structured Streaming and supports quick onboarding of the following use cases (a sketch of the merge path follows the list):
Append-only scenario (supports only inserts)
Merge CDC (Change Data Capture) scenario (supports insert, update, and delete)
Merge non-CDC scenario (supports insert and update)
Data backfill for the Append-only and Merge non-CDC scenarios
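SOON itself is internal, but the merge scenarios map naturally onto Delta Lake's MERGE executed from a Structured Streaming foreachBatch sink. Here is a minimal sketch, with assumed table, key, and operation-flag names:

```python
# Sketch of the merge-CDC path: upsert a micro-batch of CDC events into a
# Delta table via foreachBatch.
from delta.tables import DeltaTable
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

def upsert_batch(batch_df, batch_id):
    target = DeltaTable.forName(spark, "warehouse.accounts")
    (
        target.alias("t")
        .merge(batch_df.alias("s"), "t.id = s.id")
        .whenMatchedDelete(condition="s.op = 'd'")  # CDC delete events
        .whenMatchedUpdateAll()                     # CDC updates
        .whenNotMatchedInsertAll()                  # CDC inserts
        .execute()
    )

# cdc_stream_df would be a streaming DataFrame parsed from the CDC topic:
# cdc_stream_df.writeStream.foreachBatch(upsert_batch).start()
```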
Apart from Databricks, Kafka Connect also plays a critical role in these pipelines. Custom SMT (Single Message Transform) plugins have been developed to transform the upstream databases' CDC events into a standard format that SOON can handle.
Kafka Incidents and Mitigations
In order to correctly route or receive messages on a specific topic, a Kafka client must understand the broker topology, i.e., which brokers host the individual partitions, and it must refresh that information at a configurable cadence through metadata API calls. When a broker's request queue is saturated by metadata requests, produce requests are blocked, hurting producer throughput. In addition, a high rate of new TLS connections often leads to elevated broker CPU usage and degraded Kafka performance. These issues are usually caused by misbehaving client services; for example, we found an AWS Lambda that established new Kafka connections for every request it processed.
There is a limit to the number of TLS connections a Kafka broker can sustain. Below are the maximum TLS connection counts recommended by another cloud provider:
These numbers align well with our observations on MSK: a single broker node of type kafka.m5.12xlarge can handle roughly 30,000 TLS connections. Choosing the right Kafka message key has also helped us reduce broker connection counts. For instance, observability metrics emitted by all Coinbase services are ingested into a single 512-partition topic in a dedicated Kafka cluster. Without a message key specified, each Telegraf sidecar would connect to all the brokers to distribute messages round robin, resulting in a formidable TLS connection count and an over-provisioned MSK cluster. Choosing the EC2 instance ID as the message key effectively reduced the broker connections to one per EC2 instance.
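The same idea, shown with the open-source Python client rather than the Telegraf configuration (broker address, topic, and instance ID are placeholders): a fixed per-host message key hashes every message to one partition, so the producer keeps a single broker connection for produce traffic.

```python
# Keyed producing: one fixed key per host pins its messages to a single
# partition, so only one broker connection is needed for produce traffic.
from confluent_kafka import Producer

INSTANCE_ID = "i-0abc1234def567890"  # placeholder; normally read from instance metadata

producer = Producer({
    "bootstrap.servers": "b-1.metrics.example:9094",
    "security.protocol": "SSL",
})

def emit_metric(payload: bytes) -> None:
    producer.produce("observability-metrics", key=INSTANCE_ID, value=payload)

emit_metric(b"cpu.usage 42.0")
producer.flush()
```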
Takeaways
Continuously modernizing our Kafka infrastructure helps us get the most out of our Kafka investment and empowers Coinbase engineers across a wide range of evolving use cases.
---
Acknowledgements
We'd like to extend our thanks to Xinyu Liu, Chen Guo, Junyuan Zhang, Sheng Zhao, LV Lu, Mona Xia, Eric Sun, Leo Liang, Arujit Pradhan, and Michael Li for their contributions to the success of Kafka at Coinbase.