TL;DR: This blog post builds and maintains a real-time order book using the Coinbase Prime API.
Coinbase Prime’s deep liquidity is made possible by connecting Prime clients via Smart Order Router to several venues – not just Coinbase Exchange. As such, the Coinbase Prime order book aggregates liquidity from each venue to create a single book for trading, which is made available for streaming via the L2 data WebSocket channel. This blog post steps through how to ingest this data and display it as a real-time order book on a simple web UI.
The example application detailed below uses Pandas to construct and maintain a local order book, SQLite to store a copy in a database, and Dash to create a simple frontend that displays periodic snapshots of that database. In all, your real-time order book application will be less than 300 lines of code.
Optionally, at the bottom of this blog post, you’ll find a GitHub link that you may clone to get started immediately.
Create a Python file called backend.py. For this application, we will need to import several modules in order to properly connect to Coinbase Prime and process the WebSocket response. You’ll also import a class that will be written in the next file.
import asyncio
import base64
import hashlib
import hmac
import json
import sqlite3
import sys
import time

import websockets
from dotenv import load_dotenv

from orderbook import OrderBookProcessor
Any module you don’t already have installed can be added with pip, e.g.:
pip install websockets
In order to receive a response from Coinbase Prime APIs, you’ll need to grab your API key credentials from the Prime UI. Once you have them, you can plug them into the below variables as strings. If you are working out of the repository linked at the bottom of this page, you will instead plug these values into the example.env without quotes and rename that file to “.env”.
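If you prefer not to hardcode credentials, they can be read from the environment instead. This is a minimal standard-library sketch; if you use the linked repository, python-dotenv’s load_dotenv() populates these same environment variables from the .env file first. The variable names mirror the placeholders used in this post.

```python
import os

# Read credentials from environment variables instead of hardcoding them.
# Empty-string defaults keep the script importable when a value is unset.
ACCESS_KEY = os.getenv('ACCESS_KEY', '')
SECRET_KEY = os.getenv('SECRET_KEY', '')
PASSPHRASE = os.getenv('PASSPHRASE', '')
SVC_ACCOUNTID = os.getenv('SVC_ACCOUNTID', '')
```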
ACCESS_KEY = 'ACCESS_KEY'
SECRET_KEY = 'SECRET_KEY'
PASSPHRASE = 'PASSPHRASE'
SVC_ACCOUNTID = 'SVC_ACCOUNTID'
You will need to set the Prime URI and establish a timestamp which will be used for your API signature. You will also need to specify the channel that you wish to connect to. More details around supported Prime channels can be found here. Lastly, you need to define a name for the database that you will create.
URI = 'wss://ws-feed.prime.coinbase.com'
timestamp = str(int(time.time()))
channel = 'l2_data'
conn = sqlite3.connect('prime_orderbook.db')
There are three main variables you can customize in this application. The first is which product you wish to build an order book for. Coinbase Prime follows the format of 'product1-product2'. Next, you may define the specific aggregation level that you wish to set for your order book, and finally how many rows you want your order book to be. Aggregation level specifies what size bin will be created for each price level in the denominating currency (e.g. USD). For now, use the following:
product_id = 'ETH-USD'
agg_level = '0.1'
row_count = '50'
The main_loop function written below is what submits your authentication signature and performs initial processing and delegation for the WebSocket response from Prime.
The l2_data WebSocket outputs an array of dictionaries containing the order's side, event time, price, and quantity. The first message provides a snapshot of the order book at the current moment. This will include all price levels and quantities. For ETH-USD, this is currently around 17,000 rows. Every subsequent message is referred to as an update. This is not a complete snapshot; rather, it is everything that has changed since the previous message. This means that in order to properly maintain the order book, you will need to first store the snapshot and then apply updates to price levels as they change.
In order to properly maintain this order book given its message design, you will need several functions to correctly process the response, which will be written later on.
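The snapshot-then-update bookkeeping described above can be sketched with plain dictionaries keyed by price string. The sample levels here are hypothetical; the OrderBookProcessor class written later keeps richer state (sorting, trimming, aggregation), but the core logic is the same.

```python
# Simplified sketch of snapshot/update handling: one dict per side,
# keyed by price string. An update with qty '0' removes that level.
book = {'bid': {}, 'offer': {}}

def apply_events(events):
    for event in events:
        for level in event['updates']:
            side, px, qty = level['side'], level['px'], level['qty']
            if float(qty) == 0:
                book[side].pop(px, None)   # level closed
            else:
                book[side][px] = qty       # new or changed level

# First message: a full snapshot of the book (hypothetical levels)
apply_events([{'type': 'snapshot', 'updates': [
    {'side': 'bid', 'px': '1792.5', 'qty': '3.2'},
    {'side': 'offer', 'px': '1792.7', 'qty': '1.1'},
]}])

# Later message: an incremental update that closes the bid
apply_events([{'type': 'update', 'updates': [
    {'side': 'bid', 'px': '1792.5', 'qty': '0'},
]}])

print(book)  # {'bid': {}, 'offer': {'1792.7': '1.1'}}
```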
async def main_loop():
    processor = None  # set once the first snapshot arrives
    while True:
        try:
            async with websockets.connect(URI, ping_interval=None, max_size=None) as websocket:
                auth_message = await create_auth_message(
                    channel, product_id, timestamp
                )
                await websocket.send(auth_message)
                while True:
                    response = await websocket.recv()
                    parsed = json.loads(response)

                    if parsed['channel'] == 'l2_data' and parsed['events'][0]['type'] == 'snapshot':
                        processor = OrderBookProcessor(response)
                    elif processor is not None:
                        processor.apply_update(response)
                    if processor is not None:
                        table = processor.create_df(agg_level=agg_level)
                        print('updated')
                        table.to_sql('book', conn, if_exists='replace', index=False)
                        sys.stdout.flush()
        except websockets.ConnectionClosed:
            continue
You need to authenticate every time you connect to Prime via API, so the following code provides everything to accomplish that. These functions will be called automatically within main_loop.
async def create_auth_message(channel, product_id, timestamp):
    signature = sign(
        channel, ACCESS_KEY, SECRET_KEY, SVC_ACCOUNTID, product_id
    )
    auth_message = json.dumps({
        'type': 'subscribe',
        'channel': channel,
        'access_key': ACCESS_KEY,
        'api_key_id': SVC_ACCOUNTID,
        'timestamp': timestamp,
        'passphrase': PASSPHRASE,
        'signature': signature,
        'product_ids': [product_id],
    })
    return auth_message
def sign(channel, key, secret, account_id, product_ids):
    message = channel + key + account_id + timestamp + product_ids
    signature = hmac.new(
        secret.encode('utf-8'),
        message.encode('utf-8'),
        digestmod=hashlib.sha256).digest()
    signature_b64 = base64.b64encode(signature).decode()
    return signature_b64
Finally, write a standard Python initialization function to let your script know to run main_loop:
if __name__ == '__main__':
    asyncio.run(main_loop())
For your next file, create orderbook.py and import the following:
import json
import math
from decimal import Decimal

import pandas as pd
You will now create a class that initializes an order book on receipt of your first message. On subsequent messages, you will run a function from this class to update the order book and remove rows far from the mid price. After each message is complete, you will create a Pandas dataframe with the current order book data, apply aggregation logic to it, and export this to a SQL database. The SQL database will be overwritten whenever a new order book is available.
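The write side of that hand-off can be sketched in isolation. This is a minimal sketch: the in-memory database and sample rows are stand-ins for prime_orderbook.db and the real aggregated book, but the overwrite-on-each-message pattern is the same one used in main_loop.

```python
import sqlite3

import pandas as pd

# In-memory stand-in for prime_orderbook.db
conn = sqlite3.connect(':memory:')

# Each processed message replaces the whole 'book' table with a fresh copy
snapshot = pd.DataFrame({'px': [1792.5, 1792.7], 'qty': [3.2, 1.1]})
snapshot.to_sql('book', conn, if_exists='replace', index=False)

# The frontend can then read the latest copy back at its own cadence
latest = pd.read_sql('SELECT * FROM book', conn)
print(len(latest))  # 2
```

Because `if_exists='replace'` drops and recreates the table each time, the reader always sees a consistent, complete snapshot rather than a partially applied update.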
Create a class called OrderBookProcessor. Within this class, you will build seven functions.
__init__: creates arrays of bids and asks from the initial snapshot
apply_update: iterates through update messages and applies the following function
_apply: overwrites existing price levels, or appends new ones, in the bids and asks arrays
_filter_closed: removes empty bids and asks from the bids and asks arrays (indicated in WebSocket responses by an update with qty = 0)
_sort: ensures that all bids and asks are properly ordered before bids and asks far from the mid price are dropped
create_df: creates and manipulates Pandas DataFrames from a smaller version of the bids and asks arrays
aggregate_levels: uses agg_level to sum bids and asks into bins
class OrderBookProcessor():
    def __init__(self, snapshot):
        self.bids = []
        self.offers = []
        snapshot_data = json.loads(snapshot)
        px_levels = snapshot_data['events'][0]['updates']
        for level in px_levels:
            if level['side'] == 'bid':
                self.bids.append(level)
            elif level['side'] == 'offer':
                self.offers.append(level)
            else:
                raise IOError()
        self._sort()

    def apply_update(self, data):
        event = json.loads(data)
        if event['channel'] != 'l2_data':
            return
        for e in event['events']:
            for update in e['updates']:
                self._apply(update)
        self._filter_closed()
        self._sort()

    def _apply(self, level):
        if level['side'] == 'bid':
            found = False
            for i in range(len(self.bids)):
                if self.bids[i]['px'] == level['px']:
                    self.bids[i] = level
                    found = True
                    break
            if not found:
                self.bids.append(level)
        else:
            found = False
            for i in range(len(self.offers)):
                if self.offers[i]['px'] == level['px']:
                    self.offers[i] = level
                    found = True
                    break
            if not found:
                self.offers.append(level)

    def _filter_closed(self):
        self.bids = [x for x in self.bids if abs(float(x['qty'])) > 0]
        self.offers = [x for x in self.offers if abs(float(x['qty'])) > 0]

    def _sort(self):
        self.bids = sorted(self.bids, key=lambda x: float(x['px']) * -1)
        self.offers = sorted(self.offers, key=lambda x: float(x['px']))

    def create_df(self, agg_level):
        # Keep only the fraction of each side closest to the mid price
        bids_subset = int(len(self.bids) / 16)
        asks_subset = int(len(self.offers) / 16)

        bids = self.bids[:bids_subset]
        asks = self.offers[:asks_subset]

        bid_df = pd.DataFrame(bids, columns=['px', 'qty'], dtype=float)
        ask_df = pd.DataFrame(asks, columns=['px', 'qty'], dtype=float)

        bid_df = self.aggregate_levels(
            bid_df, agg_level=Decimal(agg_level), side='bid')
        ask_df = self.aggregate_levels(
            ask_df, agg_level=Decimal(agg_level), side='offer')

        bid_df = bid_df.sort_values('px', ascending=False)
        ask_df = ask_df.sort_values('px', ascending=False)

        bid_df.reset_index(inplace=True)
        bid_df['id'] = bid_df.index.astype(str) + '_bid'

        ask_df = ask_df.iloc[::-1]
        ask_df.reset_index(inplace=True)
        ask_df['id'] = ask_df.index.astype(str) + '_ask'
        ask_df = ask_df.iloc[::-1]

        order_book = pd.concat([ask_df, bid_df])
        return order_book

    def aggregate_levels(self, levels_df, agg_level, side):
        if side == 'bid':
            right = False
            def label_func(x):
                return x.left
        elif side == 'offer':
            right = True
            def label_func(x):
                return x.right

        min_level = math.floor(Decimal(min(levels_df.px)) / agg_level - 1) * agg_level
        max_level = math.ceil(Decimal(max(levels_df.px)) / agg_level + 1) * agg_level

        level_bounds = [float(min_level + agg_level * x)
                        for x in range(int((max_level - min_level) / agg_level) + 1)]

        levels_df['bin'] = pd.cut(levels_df.px, bins=level_bounds,
                                  precision=10, right=right)

        levels_df = levels_df.groupby('bin').agg(qty=('qty', 'sum')).reset_index()

        levels_df['px'] = levels_df.bin.apply(label_func)
        levels_df = levels_df[levels_df.qty > 0]
        levels_df = levels_df[['px', 'qty']]

        return levels_df
Let’s dive deeper into a couple of nuances from the above scripts.
The purpose of aggregate_levels is to create equal sized price levels to make an order book easier to interpret. Without this function, the order book would list every distinct price currently available, making it very difficult to efficiently read or place orders. By aggregating liquidity into logical increments, e.g. $1, $0.10, $0.01, etc., users can select price levels without worrying about that price level rapidly disappearing or changing.
A further nuance of this function is a slight formulaic difference between creating bids and asks. Bids and asks are binned oppositely: the displayed price for a bid bucket is the minimum price within that bin, while for asks the maximum price is used. This is to ensure that proper liquidity can be accessed when users place limit taker orders.
Consider the following example. Given ETH-USD at an aggregation level of 1, at the time of writing you will see bid levels of 1792, 1793, 1794, etc. in your order book. The row labeled 1792 displays the sum of liquidity for the bucket in which 1792 is the minimum price: 1792 ≤ bids < 1793.
The inverse is true for asks. The displayed price for a given bin on the ask side is the maximum price. Using 1792, the bucket is structured as: 1791 < asks ≤ 1792.
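This boundary behavior comes directly from the `right` argument to pandas’ `pd.cut`, which aggregate_levels relies on. A quick check with illustrative prices:

```python
import pandas as pd

prices = pd.Series([1792.0, 1792.4, 1793.0])

# Bid side: right=False gives [left, right) intervals, labeled by the left
# edge, so 1792.0 and 1792.4 land in the 1792 bucket and 1793.0 in 1793.
bid_bins = pd.cut(prices, bins=[1792, 1793, 1794], right=False)
print([iv.left for iv in bid_bins])    # [1792, 1792, 1793]

# Ask side: right=True gives (left, right] intervals, labeled by the right
# edge, so 1792.0 stays in the 1792 bucket while 1792.4 and 1793.0 roll
# up into 1793.
ask_bins = pd.cut(prices, bins=[1791, 1792, 1793], right=True)
print([iv.right for iv in ask_bins])   # [1792, 1793, 1793]
```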
Another important note is the inclusion of an additional column in your DataFrame. Within create_df, we create a column called id that combines each row’s distance from the mid price with its order side (e.g. 0_bid). This is to assist with your frontend.
For your web app UI, you will use Dash, a framework for building and displaying data applications, which is powered by Flask for its backend. Your Dash app will read the SQL database that you created in your first script, convert that into a Pandas DataFrame, and display it as a Dash DataTable which will automatically update every n milliseconds.
Create a new file called frontend.py and add the following imports to get started:
import sqlite3

import pandas as pd
from dash import html, dcc, Output, Input, Dash, dash_table

from backend import product_id, agg_level, row_count
Declare your application and set up a layout. The layout code provided in this section is for example purposes only.
The key element to making your application real-time is through the use of the Interval component. This will re-query the SQL database every n milliseconds. This methodology is recommended for several reasons. It is important to not drop any messages from the Prime WebSocket, so processing and displaying too much data may cause you to fall behind. Prime will automatically disconnect if that happens. Once you have successfully stored the order book data, you can display snapshots of it at a cadence that feels most comfortable for you. By default this value is set to 1000 ms.
The Dash DataTable allows you to display just the columns that are important to your UI, which for this example will be just price and quantity. Conditional formatting is applied to mimic the appearance of a standard UI order book. At the top of the application, you will also display the product name and its current maximum bid.
app = Dash(__name__)

app.layout = html.Div([
    html.H1(id='mid-price'),
    dcc.Interval(id='update', interval=1000),
    dash_table.DataTable(
        id='my-table',
        columns=[
            {'name': 'Price', 'id': 'px', 'type': 'text'},
            {'name': 'Qty', 'id': 'qty', 'type': 'text'},
        ],
        style_data_conditional=[
            {
                'if': {
                    'column_id': 'px',
                    'filter_query': '{id} contains "bid"'
                },
                'backgroundColor': '#50C878'
            },
            {
                'if': {
                    'column_id': 'px',
                    'filter_query': '{id} contains "ask"'
                },
                'backgroundColor': '#DC143C'
            },
            {
                'if': {
                    'column_id': 'qty',
                    'filter_query': '{id} contains "ask"'
                },
                'backgroundColor': '#FAA0A0'
            },
            {
                'if': {
                    'column_id': 'qty',
                    'filter_query': '{id} contains "bid"'
                },
                'backgroundColor': '#C1E1C1'
            },
        ],
        style_cell={'font-family': 'Courier New'},
    )
], style={'width': '30%', 'font-family': 'Arial'})
The last remaining piece of this application is the code that queries the SQL database and structures the result for display in the DataTable. To accomplish this, create a function called load_data. You may set the number of records you wish to copy from the order book. Because the stored DataFrame includes an id column, you can order by this id to ensure that you are pulling values closest to the mid price.
DataTables work best with DataFrames, so you will load this data into a DataFrame, then sort the rows descending by price.
To utilize automatic Interval refreshing, create a callback function with Interval as the input and the DataTable as the output. This will refresh the snapshot of the SQL database accordingly.
Another callback function can be added to display the current product and price.
@app.callback(
    Output('mid-price', 'children'),
    Input('update', 'n_intervals'),
)
def update_mid(intervals):
    df = load_data()

    max_bid = df.loc[df['id'].str.contains('bid'), 'px'].max()

    one_usd = '1'
    if agg_level == one_usd:
        max_bid = int(max_bid)

    return f'{product_id}: {max_bid}'


@app.callback(
    Output('my-table', 'data'),
    Input('update', 'n_intervals'),
)
def update_table(intervals):
    df = load_data()
    return df.to_dict(orient='records')


def load_data():
    conn = sqlite3.connect('prime_orderbook.db')
    cursor = conn.cursor()

    query = f'SELECT * FROM book ORDER BY id + 0 ASC LIMIT {row_count}'
    data = cursor.execute(query)

    df = pd.DataFrame(data)
    df.columns = ['index', 'px', 'qty', 'id']
    df = df.sort_values(by=['px'], ascending=False)

    return df


if __name__ == '__main__':
    app.run_server(debug=True)
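One detail worth calling out in load_data is the ORDER BY id + 0 clause. SQLite coerces a text value to a number using its numeric prefix, so ids like 2_bid sort by their distance from the mid price rather than lexicographically (where '10_bid' would sort before '2_bid'). A quick standalone check with hypothetical ids:

```python
import sqlite3

conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE book (id TEXT)')
conn.executemany('INSERT INTO book VALUES (?)',
                 [('10_bid',), ('2_ask',), ('1_bid',)])

# '+ 0' casts each text id to its numeric prefix before sorting
rows = [r[0] for r in conn.execute('SELECT id FROM book ORDER BY id + 0 ASC')]
print(rows)  # ['1_bid', '2_ask', '10_bid']
```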
With your code complete, you can begin the application by running backend.py and then in a separate terminal window, running frontend.py. Your final product should look similar to this and update at whatever frequency was set in your frontend:
This code is available on the Coinbase Samples GitHub, or by running the following command:
git clone https://github.com/coinbase-samples/prime-order-book-py
Disclaimer: Information is provided for informational purposes only and is not investment advice. This is not a recommendation to buy or sell digital assets or to employ a particular investment strategy, codes, or APIs. Coinbase makes no representation on the accuracy, suitability, or validity of any information provided herein. Additional information can be found here.
Dec 18, 2024