Real-Time Financial Exchange Order Book#
In this example, we walk through how to maintain a limit order book in real time with Bytewax and very little extra infrastructure.
Skill Level | Time to Complete | Level |
---|---|---|
Intermediate Python programming, asynchronous programming | Approx. 25 Min | Intermediate |
Your Takeaway#
At the end of this tutorial you will understand how to use Bytewax to analyze financial exchange data. You will learn to establish connections to a WebSocket for real-time data, use Bytewax’s operators to efficiently manage an order book, and apply analytical techniques to assess trading opportunities based on the dynamics of buy and sell orders.
Resources#
bytewax/bytewax/docs/tutorials/orderbook-guide/orderbook_dataflow.py
Objectives#
We are going to:
Connect to Coinbase via WebSockets for live order book updates.
Initialize order books with current snapshots for major cryptocurrencies.
Update order books in real-time with market changes.
Utilize advanced data structures for efficient order book management.
Process live data with Bytewax to maintain and summarize order books.
Filter updates for significant market movements based on spread.
Concepts#
To start off, we are going to digress into some concepts around markets, exchanges, and orders.
Order Book#
A Limit Order Book, or just Order Book, is a record of all open limit orders. A limit order is an order to buy (bid) or sell (ask) an asset at a given price. This could facilitate the exchange of dollars for shares or, as in our case, the exchange of cryptocurrencies. On exchanges, the limit order book changes constantly as orders are placed every fraction of a second. The order book gives a trader insight into the market, whether they are looking to determine liquidity, create liquidity, design a trading algorithm, or detect when bad actors are trying to manipulate the market.
Bid and Ask#
In the order book, the ask price is the lowest price at which a seller is willing to sell, and the bid price is the highest price at which a buyer is willing to buy. A limit order differs from a market order in that a limit order generally has four dimensions: the direction (buy/sell), the size, the price, and the duration (time to expire). A market order, in comparison, has two dimensions: the direction and the size. It is up to the exchange to fill a market order, and it is filled from what is available in the order book.
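To make these definitions concrete, here is a minimal sketch using a hypothetical book stored as plain dictionaries (price mapped to the total size resting at that price). The prices, sizes, and the `fill_market_buy` helper are illustrative assumptions, not part of the tutorial's dataflow, and real exchanges apply additional matching rules.

```python
# Hypothetical order book: price -> total size resting at that price.
bids = {99.5: 2.0, 99.0: 5.0, 98.0: 1.0}
asks = {100.5: 1.0, 101.0: 3.0, 102.0: 4.0}

best_bid = max(bids)  # highest price a buyer is willing to pay
best_ask = min(asks)  # lowest price a seller is willing to accept
spread = best_ask - best_bid


def fill_market_buy(asks, size):
    """Fill a market buy by walking the asks from the lowest price upward.

    Returns a list of (price, filled_size) pairs and does not modify asks.
    """
    fills = []
    remaining = size
    for price in sorted(asks):
        if remaining <= 0:
            break
        take = min(remaining, asks[price])
        fills.append((price, take))
        remaining -= take
    return fills


# A market order only specifies direction and size; the book decides the price.
fills = fill_market_buy(asks, 2.5)
print(best_bid, best_ask, spread, fills)
```

The market buy consumes the whole best ask level first and then part of the next level, which is why large market orders can move the price.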
Level 2 Data#
An exchange will generally offer a few different tiers of information that traders can subscribe to. These can be quite expensive for some exchanges, but luckily for us, most crypto exchanges provide access to the various levels for free! For maintaining our order book we need at least level2 order information. This gives us incremental updates for each price level so that we can adjust the book in real time. Without these updates, any snapshot we downloaded would be out of date almost instantly. We would have to download the order book every millisecond, or faster, to stay up to date, and since the order book can be quite large, this isn’t really feasible.
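The idea of keeping a local book current from incremental updates can be sketched as follows. This is a simplified illustration that assumes each change arrives as a `[side, price, size]` triple of strings, where `size` is the new total at that price and a size of zero removes the level, which is how the dataflow code in this tutorial treats the feed. The snapshot values and the `apply_changes` helper are hypothetical.

```python
def apply_changes(bids, asks, changes):
    """Apply level 2 style changes to the bid and ask dictionaries in place."""
    for side, price_str, size_str in changes:
        price, size = float(price_str), float(size_str)
        levels = bids if side == "buy" else asks
        if size == 0.0:
            # A size of zero means the price level no longer exists.
            levels.pop(price, None)
        else:
            # Otherwise the size replaces whatever was at that price.
            levels[price] = size


# Hypothetical snapshot received once, right after subscribing.
snapshot = {
    "bids": [["99.5", "2.0"], ["99.0", "5.0"]],
    "asks": [["100.5", "1.0"], ["101.0", "3.0"]],
}
bids = {float(p): float(s) for p, s in snapshot["bids"]}
asks = {float(p): float(s) for p, s in snapshot["asks"]}

# Hypothetical incremental updates that follow the snapshot.
apply_changes(
    bids,
    asks,
    [["buy", "99.5", "0"], ["sell", "100.0", "0.75"], ["buy", "99.0", "4.0"]],
)
print(max(bids), min(asks))
```

Each update touches only the affected price levels, so the local book stays current without downloading the full book again.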
Alright, let’s get started!
Set up and imports#
Before we begin, let’s install the dependencies. We recommend using a virtual environment to manage your Python dependencies. You can install Bytewax and the websockets library using pip:
$ python -m venv venv
$ source venv/bin/activate
(venv) $ pip install bytewax==0.21.0 websockets
Now, let’s import the required modules and set up the environment for building the dataflow.
import json
from dataclasses import dataclass, field
from datetime import timedelta
from typing import Dict, List, Optional

import websockets
from bytewax import operators as op
from bytewax.connectors.stdio import StdOutSink
from bytewax.dataflow import Dataflow
from bytewax.inputs import FixedPartitionedSource, StatefulSourcePartition, batch_async
Websocket Input#
Our goal is to build a scalable system that can monitor multiple cryptocurrency pairs across different workers in real time. By writing an asynchronous function that connects to the Coinbase Exchange WebSocket feed, we can stream cryptocurrency data into our dataflow. This uses the websockets Python library for the WebSocket connection and bytewax for dataflow integration.
The function _ws_agen connects to the Coinbase Exchange WebSocket and yields each message paired with its product identifier (e.g., "BTC-USD" or "ETH-USD"). It subscribes to the level2_batch feed for live order book updates by first sending a JSON subscription message and then awaiting the confirmation response with websocket.recv().
async def _ws_agen(product_id):
    """Connect to websocket and yield messages as they arrive.

    This function is a generator that connects to the Coinbase websocket and
    yields messages as they arrive. It subscribes to the level2_batch channel for
    the given product_id.

    Args:
        product_id (str): The product_id to subscribe to.

    Yields:
        Tuple[str, dict]: A tuple of the product_id and the message as a dictionary.
    """
    url = "wss://ws-feed.exchange.coinbase.com"
    async with websockets.connect(url) as websocket:
        msg = json.dumps(
            {
                "type": "subscribe",
                "product_ids": [product_id],
                "channels": ["level2_batch"],
            }
        )
        await websocket.send(msg)
        # The first msg is just a confirmation that we have subscribed.
        await websocket.recv()

        while True:
            msg = await websocket.recv()
            yield (product_id, json.loads(msg))
To process and manage this data, we implement the CoinbasePartition class, which extends Bytewax’s StatefulSourcePartition. Each partition opens its own subscription, so it receives the current order book at the beginning of the stream when we subscribe.
Within CoinbasePartition, the _ws_agen function fetches the data and is wrapped with batch_async, which collects incoming messages into a batch until either 0.5 seconds have passed or 100 messages have been received, whichever comes first. The snapshot method returns None because the partition keeps no resume state of its own.
In this section we defined the key building blocks to enable asynchronous WebSocket connections and efficient data processing. Before we can establish a dataflow to maintain the order book, we need to define the data classes - this will enable a structured approach to data processing and management. Let’s take a look at this in the next section.
class CoinbasePartition(StatefulSourcePartition):
    """Process messages from the Coinbase websocket as they arrive.

    This class is a partition that connects to the Coinbase websocket and
    yields messages as they arrive. It subscribes to the level2_batch channel for
    the given product_id.

    For more information on StatefulSourcePartition, see the documentation:
    https://docs.bytewax.io/stable/api/bytewax/bytewax.inputs.html#bytewax.inputs.StatefulSourcePartition

    Args:
        StatefulSourcePartition : The base class for a partition.
    """

    def __init__(self, product_id):
        """Initializes the partition with the given product_id.

        Args:
            product_id (str): The product_id to subscribe to.
        """
        agen = _ws_agen(product_id)
        # Batch messages for up to 0.5 seconds or 100 messages.
        self._batcher = batch_async(agen, timedelta(seconds=0.5), 100)

    def next_batch(self):
        """This function returns the next batch of messages from the websocket.

        Returns:
            List[Tuple[str, dict]]: A batch of (product_id, message) tuples.
        """
        return next(self._batcher)

    def snapshot(self):
        """This function returns a snapshot of the partition.

        Returns:
            None: This partition keeps no resume state.
        """
        return None
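The batching rule described for batch_async (flush after a time limit or once a message count is reached) can be illustrated with a standalone asyncio sketch. This is not Bytewax's implementation: `SketchBatcher` and `fake_feed` are hypothetical names, and the fake feed stands in for the WebSocket generator so the example runs without a network connection.

```python
import asyncio
from datetime import timedelta


async def fake_feed(n):
    """Hypothetical stand-in for the websocket generator."""
    for seq in range(n):
        await asyncio.sleep(0)
        yield ("BTC-USD", {"seq": seq})


class SketchBatcher:
    """Collect items until batch_size is reached or timeout elapses."""

    def __init__(self, agen, timeout, batch_size):
        self._agen = agen
        self._timeout = timeout.total_seconds()
        self._batch_size = batch_size
        self._pending = None  # in-flight request for the next item

    async def _next_item(self):
        return await self._agen.__anext__()

    async def next_batch(self):
        loop = asyncio.get_running_loop()
        deadline = loop.time() + self._timeout
        batch = []
        while len(batch) < self._batch_size:
            if self._pending is None:
                self._pending = asyncio.ensure_future(self._next_item())
            remaining = deadline - loop.time()
            if remaining <= 0:
                break
            done, _ = await asyncio.wait({self._pending}, timeout=remaining)
            if not done:
                # Timed out: keep the pending request for the next call
                # instead of cancelling it, so no message is lost.
                break
            task, self._pending = self._pending, None
            try:
                batch.append(task.result())
            except StopAsyncIteration:
                break  # the feed is exhausted
        return batch


async def demo():
    batcher = SketchBatcher(fake_feed(5), timedelta(seconds=0.5), 3)
    return [await batcher.next_batch(), await batcher.next_batch()]


batches = asyncio.run(demo())
print(batches)
```

With five messages and a batch size of three, the first batch is cut off by the count limit and the second ends when the feed is exhausted. On a slow live feed, the time limit would end the batch instead.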
Defining data classes#
Through the Python dataclasses library we can define the structure of the data we are working with. As part of this approach we define three dataclasses:
CoinbaseSource: Serves as a source for partitioning data based on cryptocurrency product IDs. It is crucial for organizing and distributing the data flow across multiple workers, facilitating parallel processing of cryptocurrency pairs.
@dataclass
class CoinbaseSource(FixedPartitionedSource):
    """Connect to the Coinbase websocket and yield messages as they arrive.

    This class is a source that connects to the Coinbase websocket and
    yields messages as they arrive. It subscribes to the level2_batch channel for
    the given product_ids.

    Note that the methods associated with this class are required parts
    of the FixedPartitionedSource class.

    For more information on FixedPartitionedSource, see the documentation:
    https://docs.bytewax.io/stable/api/bytewax/bytewax.inputs.html#bytewax.inputs.FixedPartitionedSource

    Args:
        FixedPartitionedSource: The base class for a source.
    """

    product_ids: List[str]

    def list_parts(self):
        """This function returns the partitions for the source.

        Returns:
            List[str]: The list of product_ids.
        """
        return self.product_ids

    def build_part(self, step_id, for_key, _resume_state):
        """This function builds a partition for the given product_id.

        Args:
            step_id (str): The step_id of the input operator.
            for_key (str): Which partition to build.
                Will always be one of the keys returned
                by list_parts on this worker.
            _resume_state: State data containing where in
                the input stream this partition should
                begin reading during this execution.
                Unused here.

        Returns:
            CoinbasePartition: The partition for the given product_id.
        """
        return CoinbasePartition(for_key)
OrderBookSummary: Summarizes the state of an order book at a point in time, encapsulating the bid and ask prices, sizes, and the spread. This class is immutable (frozen=True), ensuring that each instance is a snapshot that cannot be altered, which is essential for accurate historical analysis and decision-making.
@dataclass(frozen=True)
class OrderBookSummary:
    """Represents a summary of the order book state."""

    bid_price: float
    bid_size: float
    ask_price: float
    ask_size: float
    spread: float
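As a quick illustration of what frozen=True provides, the sketch below redefines the same summary class so it runs on its own and shows that attribute assignment is rejected. The field values are made up for the example. To derive a changed copy, the standard library's dataclasses.replace can be used.

```python
from dataclasses import FrozenInstanceError, dataclass, replace


@dataclass(frozen=True)
class OrderBookSummary:
    """Same fields as the tutorial's summary class, repeated to be runnable."""

    bid_price: float
    bid_size: float
    ask_price: float
    ask_size: float
    spread: float


# Hypothetical values for illustration only.
summary = OrderBookSummary(
    bid_price=99.5, bid_size=2.0, ask_price=100.5, ask_size=1.0, spread=1.0
)

try:
    summary.bid_size = 3.0  # assignment on a frozen dataclass raises
    mutated = True
except FrozenInstanceError:
    mutated = False

# replace() builds a new instance and leaves the original untouched.
updated = replace(summary, bid_size=3.0)
print(mutated, summary.bid_size, updated.bid_size)
```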
OrderBookState: Maintains the current state of the order book, including all bids and asks. It allows for dynamic updates as new market data arrives, keeping track of the best bid and ask prices and their respective sizes.
@dataclass
class OrderBookState:
    """Maintains the state of the order book."""

    bids: Dict[float, float] = field(default_factory=dict)
    asks: Dict[float, float] = field(default_factory=dict)
    bid_price: Optional[float] = None
    ask_price: Optional[float] = None

    def update(self, data):
        """Update the order book state with the given data.

        Args:
            data: A snapshot message (with "bids" and "asks") or an
                update message (with "changes").
        """
        # Initialize bids and asks from the snapshot if they're empty.
        # The key check avoids a KeyError on messages without a snapshot.
        if not self.bids and "bids" in data:
            self.bids = {float(price): float(size) for price, size in data["bids"]}
            self.bid_price = max(self.bids.keys(), default=None)
        if not self.asks and "asks" in data:
            self.asks = {float(price): float(size) for price, size in data["asks"]}
            self.ask_price = min(self.asks.keys(), default=None)

        # Process updates from the "changes" field in the data
        for change in data.get("changes", []):
            side, price_str, size_str = change
            price, size = float(price_str), float(size_str)

            target_dict = self.asks if side == "sell" else self.bids

            # If size is zero, remove the price level; otherwise,
            # update/add the price level
            if size == 0.0:
                target_dict.pop(price, None)
            else:
                target_dict[price] = size

            # After update, recalculate the best bid and ask prices
            if side == "sell":
                self.ask_price = min(self.asks.keys(), default=None)
            else:
                self.bid_price = max(self.bids.keys(), default=None)

    def spread(self) -> float:
        """Calculate the spread between the best bid and ask prices.

        Returns:
            float: The spread between the best bid and ask prices.
        """
        return self.ask_price - self.bid_price  # type: ignore

    def summarize(self):
        """Summarize the order book state.

        Returns:
            OrderBookSummary: A summary of the order book state.
        """
        return OrderBookSummary(
            bid_price=self.bid_price,
            bid_size=self.bids[self.bid_price],
            ask_price=self.ask_price,
            ask_size=self.asks[self.ask_price],
            spread=self.spread(),
        )
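The update method above recomputes the best price with min or max after every change, which scans every price level. If that scan ever became a bottleneck, one alternative is a heap with lazy deletion. The sketch below is not what this tutorial's code uses. `BestPriceTracker` is a hypothetical helper that tracks the lowest live price (the best ask); the same idea works for bids by storing negated prices.

```python
import heapq


class BestPriceTracker:
    """Track the lowest live price using a min-heap with lazy deletion."""

    def __init__(self):
        self.sizes = {}  # live price levels: price -> size
        self._heap = []  # may contain stale prices; cleaned up in best()

    def set_level(self, price, size):
        """Set the size at a price; a size of zero removes the level."""
        if size == 0.0:
            self.sizes.pop(price, None)
        else:
            if price not in self.sizes:
                heapq.heappush(self._heap, price)
            self.sizes[price] = size

    def best(self):
        """Return the lowest live price, or None if the side is empty."""
        # Discard prices at the top of the heap that were removed earlier.
        while self._heap and self._heap[0] not in self.sizes:
            heapq.heappop(self._heap)
        return self._heap[0] if self._heap else None


asks = BestPriceTracker()
for price, size in [(101.0, 3.0), (100.5, 1.0), (102.0, 4.0)]:
    asks.set_level(price, size)
first_best = asks.best()

asks.set_level(100.5, 0.0)  # the best level is removed
second_best = asks.best()

asks.set_level(100.5, 2.0)  # and later reappears
third_best = asks.best()
print(first_best, second_best, third_best)
```

Insertions and removals cost logarithmic time in the number of levels, amortized over the lazy cleanup, at the price of keeping some stale entries in the heap.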
In this section, we have defined the data classes that will enable us to maintain the order book in real time. These classes will be used to structure the data flow and manage the state of the order book. Now that we have defined the data classes, we can proceed to construct the dataflow to maintain the order book.
Constructing The Dataflow#
Before we get to the exciting part of our order book dataflow, we need to create our Dataflow object and prepare the data. We start by creating a Dataflow named "orderbook". Once it is initialized, we can add an input source to the dataflow using the bytewax.operators module, which we imported under the shorter name op. We use the input operator, set its step_id to "input", and pass the flow object along with the data source. In this case the data source is the CoinbaseSource class we defined earlier, initialized with the IDs ["BTC-USD", "ETH-USD", "BTC-EUR", "ETH-EUR"].
flow = Dataflow("orderbook")
inp = op.input(
    "input", flow, CoinbaseSource(["BTC-USD", "ETH-USD", "BTC-EUR", "ETH-EUR"])
)
# ('BTC-USD', {
#     'type': 'l2update',
#     'product_id': 'BTC-USD',
#     'changes': [['buy', '36905.39', '0.00334873']],
#     'time': '2022-05-05T17:25:09.072519Z',
# })
Now that we have input for our Dataflow, we will establish a dataflow pipeline for processing live cryptocurrency order book updates. We will focus on analysis and data filtration based on order book spreads. Our goal is for the pipeline to extract and highlight trading opportunities through the analysis of spreads. Let’s take a look at key components of the dataflow pipeline:
The mapper function updates and summarizes the order book state, initializing a new OrderBookState object if one does not exist and applying each new update. It returns a tuple of the updated state and a summary with key metrics: the best bid and ask prices, their sizes, and the spread. We then use the stateful_map operator to apply the mapper function to each update received from our input, keeping a separate state for each product ID.
def mapper(state, value):
    """Update the state with the given value and return the state and a summary."""
    if state is None:
        state = OrderBookState()

    state.update(value)
    return (state, state.summarize())


stats = op.stateful_map("orderbook", inp, mapper)
# ('BTC-USD', OrderBookSummary(bid_price=36905.39, bid_size=0.00334873,
#     ask_price=36905.4, ask_size=1.6e-05, spread=0.010000000002037268))
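To see the per-key state handling in isolation, here is a plain-Python sketch of the contract the mapper follows: it receives the previous state for a key (or None the first time) plus a value, and returns the new state and the value to emit. `run_stateful_map` and `count_mapper` are hypothetical helpers that only mimic this behavior for a list of items. This is not how Bytewax executes the operator.

```python
def run_stateful_map(items, mapper):
    """Apply mapper to (key, value) items, keeping one state per key."""
    states = {}
    outputs = []
    for key, value in items:
        # The mapper sees None as the state the first time a key appears.
        new_state, emitted = mapper(states.get(key), value)
        states[key] = new_state
        outputs.append((key, emitted))
    return outputs


def count_mapper(state, value):
    """Toy mapper: count how many values each key has seen."""
    count = (state or 0) + 1
    return (count, count)


events = [("BTC-USD", "a"), ("ETH-USD", "b"), ("BTC-USD", "c")]
result = run_stateful_map(events, count_mapper)
print(result)
```

The BTC-USD count advances independently of ETH-USD, just as each product ID keeps its own OrderBookState in the dataflow.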
The last step is to filter the summaries by spread percentage, keeping only spreads greater than 0.1% of the ask price, which we use as a proxy for trading opportunities. We define the predicate function and use filter to apply it to the summaries produced by the 'orderbook' step.
# Filter on a spread greater than 0.1% of the ask price
def just_large_spread(prod_summary):
    """Keep only products with a spread greater than 0.1% of the ask price."""
    product, summary = prod_summary
    # 0.1% expressed as a fraction is 0.001
    return summary.spread / summary.ask_price > 0.001


large_spreads = op.filter("big_spread", stats, just_large_spread)
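When working with percentage thresholds it helps to check the arithmetic explicitly: 0.1% as a fraction is 0.001, while 0.0001 corresponds to 0.01%. The sketch below applies both thresholds to the spread and ask price from the BTC-EUR sample summary shown in this tutorial's output. `spread_fraction` is a hypothetical helper used only for this illustration.

```python
def spread_fraction(spread, ask_price):
    """Return the spread as a fraction of the ask price."""
    return spread / ask_price


# Values taken from the BTC-EUR sample summary in this tutorial.
ratio = spread_fraction(20.56999999999971, 60173.35)
percent = ratio * 100

above_0_1_percent = ratio > 0.001    # threshold of 0.1%
above_0_01_percent = ratio > 0.0001  # threshold of 0.01%
print(f"{percent:.4f}%", above_0_1_percent, above_0_01_percent)
```

This spread is roughly 0.034% of the ask price, so it passes a 0.01% threshold but not a 0.1% one.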
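To return the result of the dataflow, we can use output. Here we write the stats stream to standard output, so every summary is printed. To print only the large spreads, pass the stream returned by filter instead.
op.output("out", stats, StdOutSink())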
Executing the Dataflow#
Now we can run our completed dataflow. The command below assumes the code is saved in a file named dataflow.py:
> python -m bytewax.run dataflow:flow
This will process real-time order book data for four cryptocurrency pairs: BTC-USD, ETH-USD, BTC-EUR, and ETH-EUR. Each summary shows the best bid and ask prices, their sizes, and the spread:
('BTC-EUR', OrderBookSummary(bid_price=60152.78, bid_size=0.0104, ask_price=60173.35, ask_size=0.02238611, spread=20.56999999999971))
('BTC-USD', OrderBookSummary(bid_price=65677.38, bid_size=0.05, ask_price=65368.71, ask_size=0.001663, spread=-308.67000000000553))
('ETH-EUR', OrderBookSummary(bid_price=3095.16, bid_size=0.20712451, ask_price=3079.9, ask_size=0.14696149, spread=-15.259999999999764))
Summary#
That’s it! You’ve learned how to use websockets with Bytewax and how to leverage stateful map to maintain the state in a streaming application.