Real-Time Financial Exchange Order Book#

In this example we are going to walk through how you can maintain a limit order book in real-time with very little extra infrastructure with Bytewax.

Skill Level: Intermediate Python programming, asynchronous programming

Time to Complete: Approx. 25 Min

Level: Intermediate

Your Takeaway#

At the end of this tutorial you will understand how to use Bytewax to analyze financial exchange data. You will learn to establish connections to a WebSocket for real-time data, use Bytewax’s operators to efficiently manage an order book, and apply analytical techniques to assess trading opportunities based on the dynamics of buy and sell orders.

Resources#

bytewax/bytewax/docs/tutorials/orderbook-guide/orderbook_dataflow.py

Objectives#

We are going to:

  • Connect to Coinbase via WebSockets for live order book updates.

  • Initialize order books with current snapshots for major cryptocurrencies.

  • Update order books in real-time with market changes.

  • Utilize advanced data structures for efficient order book management.

  • Process live data with Bytewax to maintain and summarize order books.

  • Filter updates for significant market movements based on spread.

Concepts#

To start off, we are going to digress into some concepts around markets, exchanges and orders.

Order Book#

A limit order book, or just order book, is a record of all outstanding limit orders. A limit order is an order to buy (bid) or sell (ask) an asset at a given price. This could facilitate the exchange of dollars for shares or, as in our case, the exchange of cryptocurrencies. On exchanges, the limit order book is constantly changing as orders are placed every fraction of a second. The order book can give a trader insight into the market, whether they are looking to determine liquidity, create liquidity, design a trading algorithm, or detect when bad actors are trying to manipulate the market.

Bid and Ask#

In the order book, the ask price is the lowest price that a seller is willing to sell at and the bid price is the highest price that a buyer is willing to buy at. A limit order differs from a market order in that a limit order generally has four dimensions: the direction (buy/sell), the size, the price, and the duration (time to expire). A market order, in comparison, has two dimensions: the direction and the size. It is up to the exchange to fill a market order, and it is filled from what is available in the order book.
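
To make the terms above concrete, here is a minimal sketch of a toy book held in two dictionaries (price mapped to size). The prices, sizes, and the helper name `fill_market_buy` are made up for illustration; a real exchange's matching engine is far more involved.

```python
from typing import Dict, List, Tuple

# Toy order book: price -> total size resting at that price (illustrative values).
bids: Dict[float, float] = {100.0: 2.0, 99.5: 1.0}
asks: Dict[float, float] = {101.0: 1.0, 100.5: 0.5, 102.0: 2.0}

best_bid = max(bids)  # highest price a buyer is willing to pay
best_ask = min(asks)  # lowest price a seller is willing to accept
spread = best_ask - best_bid


def fill_market_buy(
    asks: Dict[float, float], size: float
) -> Tuple[List[Tuple[float, float]], float]:
    """Walk the ask side from the lowest price up until `size` is filled.

    Returns the list of (price, filled_size) pairs and any unfilled remainder.
    The book itself is not modified.
    """
    fills: List[Tuple[float, float]] = []
    remaining = size
    for price in sorted(asks):
        if remaining <= 0:
            break
        take = min(remaining, asks[price])
        fills.append((price, take))
        remaining -= take
    return fills, remaining


fills, unfilled = fill_market_buy(asks, 1.0)
print(best_bid, best_ask, spread)
print(fills, unfilled)
```

A market buy has only a direction and a size, so it consumes the cheapest asks first and may be filled at several prices.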

Level 2 Data#

An exchange will generally offer a few different tiers of information that traders can subscribe to. These can be quite expensive for some exchanges, but luckily for us, most crypto exchanges provide access to the various levels for free! For maintaining our order book we are going to need at least level 2 order information. This gives us the aggregate size at each price level, along with updates as those levels change, so that we can adjust the book in real time. Without the ability to maintain our own order book, a snapshot would get out of date almost instantly and we would not be able to act on current information. We would have to download the order book every millisecond, or faster, to stay up to date, and since the order book can be quite large, this isn’t really feasible.
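
As a rough sketch of the idea, the snippet below builds a book from a snapshot once and then applies incremental changes to it. The field names ("bids", "asks", "changes") follow the ones used by the code later in this tutorial; the values and the helper names are hypothetical.

```python
from typing import Dict, List


def book_from_snapshot(levels: List[List[str]]) -> Dict[float, float]:
    """Turn [price, size] string pairs into a price -> size dictionary."""
    return {float(price): float(size) for price, size in levels}


def apply_changes(
    bids: Dict[float, float],
    asks: Dict[float, float],
    changes: List[List[str]],
) -> None:
    """Apply [side, price, size] changes in place; size 0 removes the level."""
    for side, price_str, size_str in changes:
        book = bids if side == "buy" else asks
        price, size = float(price_str), float(size_str)
        if size == 0.0:
            book.pop(price, None)
        else:
            book[price] = size


# Illustrative messages, not real market data.
snapshot = {
    "bids": [["100.00", "1.5"], ["99.50", "2.0"]],
    "asks": [["100.50", "0.7"]],
}
update = {"changes": [["buy", "100.00", "0"], ["sell", "100.25", "0.3"]]}

bids = book_from_snapshot(snapshot["bids"])
asks = book_from_snapshot(snapshot["asks"])
apply_changes(bids, asks, update["changes"])
print(bids, asks)
```

Each update is small compared to the full book, which is why applying changes locally is cheaper than downloading a fresh snapshot every time.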

Alright, let’s get started!

Set up and imports#

Before we begin, let’s set up the environment for building the dataflow.

We recommend using a virtual environment to manage your Python dependencies. You can install Bytewax using pip:

$ python -m venv venv
$ source ./venv/bin/activate
(venv) $ pip install bytewax==NOT_RELEASED.latest

Now, let’s import the required modules and set up the environment for building the dataflow.

dataflow.py#
import json
from dataclasses import dataclass, field
from datetime import timedelta
from typing import Dict, List, Optional

import websockets
from bytewax import operators as op
from bytewax.connectors.stdio import StdOutSink
from bytewax.dataflow import Dataflow
from bytewax.inputs import FixedPartitionedSource, StatefulSourcePartition, batch_async

Websocket Input#

Our goal is to build a scalable system that can monitor multiple cryptocurrency pairs across different workers in real time. By crafting an asynchronous function to connect to the Coinbase Exchange WebSocket, we facilitate streaming of cryptocurrency data into our dataflow. This process involves the websockets Python library for WebSocket connections and bytewax for dataflow integration.

The function _ws_agen connects to the Coinbase Exchange WebSocket and yields each message paired with its product ID (e.g., "BTC-USD"). It subscribes to the level2_batch feed for live order book updates, first sending a JSON subscription message and then awaiting the confirmation response with websocket.recv().

dataflow.py#
async def _ws_agen(product_id):
    """Connect to websocket and yield messages as they arrive.

    This function is an async generator that connects to the Coinbase
    websocket and yields messages as they arrive. It subscribes to the
    level2_batch channel for the given product_id.

    Args:
        product_id (str): The product_id to subscribe to.

    Yields:
        tuple: The product_id and the message as a dictionary.
    """
    url = "wss://ws-feed.exchange.coinbase.com"
    async with websockets.connect(url) as websocket:
        msg = json.dumps(
            {
                "type": "subscribe",
                "product_ids": [product_id],
                "channels": ["level2_batch"],
            }
        )
        await websocket.send(msg)
        # The first msg is just a confirmation that we have subscribed.
        await websocket.recv()

        while True:
            msg = await websocket.recv()
            yield (product_id, json.loads(msg))

To efficiently process and manage this data, we implement the CoinbasePartition class, extending Bytewax’s StatefulSourcePartition. This enables us to obtain the current order book at the beginning of the stream when we subscribe.

Within CoinbasePartition, the _ws_agen generator is wrapped in batch_async, which collects incoming messages into batches: a batch is emitted after 0.5 seconds or once 100 messages have arrived, whichever comes first. Batching lets the source hand messages to the dataflow in groups rather than one at a time. Because snapshot returns None, the partition keeps no resume state; when it is rebuilt, it subscribes again and starts from a fresh order book.
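
The batching behaviour described above can be sketched with the standard library alone. This is not how batch_async is implemented in Bytewax; `AsyncBatcher` is a hypothetical helper that only illustrates the "up to N items or until the timeout" rule, and the number generator stands in for the websocket.

```python
import asyncio
from typing import AsyncIterator, List, Optional

_DONE = object()  # sentinel marking an exhausted source


class AsyncBatcher:
    """Hypothetical helper: group items from an async iterator into batches."""

    def __init__(self, source: AsyncIterator, timeout_s: float, max_size: int):
        self._source = source
        self._timeout_s = timeout_s
        self._max_size = max_size
        # The in-flight fetch is kept across batches so that a timeout
        # never cancels it and no item is lost.
        self._pending: Optional[asyncio.Future] = None

    async def _fetch(self):
        try:
            return await self._source.__anext__()
        except StopAsyncIteration:
            return _DONE

    async def next_batch(self) -> List:
        loop = asyncio.get_running_loop()
        deadline = loop.time() + self._timeout_s
        batch: List = []
        while len(batch) < self._max_size:
            remaining = deadline - loop.time()
            if remaining <= 0:
                break  # time is up: emit what we have
            if self._pending is None:
                self._pending = asyncio.ensure_future(self._fetch())
            done, _ = await asyncio.wait({self._pending}, timeout=remaining)
            if not done:
                break  # timed out; keep the pending fetch for the next batch
            task, self._pending = self._pending, None
            item = task.result()
            if item is _DONE:
                break  # source is exhausted
            batch.append(item)
        return batch


async def numbers():
    """Stand-in for the websocket generator: yields 0..4 immediately."""
    for i in range(5):
        yield i


async def demo():
    batcher = AsyncBatcher(numbers(), timeout_s=1.0, max_size=3)
    return [await batcher.next_batch(), await batcher.next_batch()]


result = asyncio.run(demo())
print(result)
```

With a fast source the size limit closes each batch; with a slow source the timeout would close it instead, possibly with fewer items or none.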

In this section we defined the key building blocks to enable asynchronous WebSocket connections and efficient data processing. Before we can establish a dataflow to maintain the order book, we need to define the data classes - this will enable a structured approach to data processing and management. Let’s take a look at this in the next section.

dataflow.py#
class CoinbasePartition(StatefulSourcePartition):
    """Process messages from the Coinbase websocket as they arrive.

    This class is a partition that connects to the Coinbase websocket and
    yields messages as they arrive. It subscribes to the level2_batch channel for
    the given product_id.

    For more information on StatefulSourcePartition, see the documentation:
    https://docs.bytewax.io/stable/api/bytewax/bytewax.inputs.html#bytewax.inputs.StatefulSourcePartition
    """

    def __init__(self, product_id):
        """Initializes the partition with the given product_id.

        Args:
            product_id (str): The product_id to subscribe to.
        """
        agen = _ws_agen(product_id)
        # Emit a batch every 0.5 seconds or every 100 messages.
        self._batcher = batch_async(agen, timedelta(seconds=0.5), 100)

    def next_batch(self):
        """This function returns the next batch of messages from the websocket.

        Returns:
            list: A batch of (product_id, message) tuples.
        """
        return next(self._batcher)

    def snapshot(self):
        """This function returns a snapshot of the partition.

        Returns:
            None: No state is saved for this partition.
        """
        return None

Defining data classes#

Through the Python dataclasses library we can define the structure of the data we are working with. As part of this approach we define three dataclasses:

  • CoinbaseSource: Serves as a source for partitioning data based on cryptocurrency product IDs. It is crucial for organizing and distributing the data flow across multiple workers, facilitating parallel processing of cryptocurrency pairs.

dataflow.py#
@dataclass
class CoinbaseSource(FixedPartitionedSource):
    """Connect to the Coinbase websocket and yield messages as they arrive.

    This class is a source that connects to the Coinbase websocket and
    yields messages as they arrive. It subscribes to the level2_batch channel for
    the given product_ids.

    Note that the methods associated with this class are required parts
    of the FixedPartitionedSource class.

    For more information on FixedPartitionedSource, see the documentation:
    https://docs.bytewax.io/stable/api/bytewax/bytewax.inputs.html#bytewax.inputs.FixedPartitionedSource
    """

    product_ids: List[str]

    def list_parts(self):
        """This function returns the partitions for the source.

        Returns:
            List[str]: The list of product_ids.
        """
        return self.product_ids

    def build_part(self, step_id, for_key, _resume_state):
        """This function builds a partition for the given product_id.

        Args:
            step_id (str): The step_id of the input operator.
            for_key (str): Which partition to build.
                           Will always be one of the keys returned
                           by list_parts on this worker.
            _resume_state: State data containing where in
                           the input stream this partition should
                           begin reading during this execution.
                           Unused here.

        Returns:
            CoinbasePartition: The partition for the given product_id.
        """
        return CoinbasePartition(for_key)

  • OrderBookSummary: Summarizes the state of an order book at a point in time, encapsulating the bid and ask prices, sizes, and the spread. This class is immutable (frozen=True), ensuring that each instance is a snapshot that cannot be altered, which is essential for accurate historical analysis and decision-making.

dataflow.py#
@dataclass(frozen=True)
class OrderBookSummary:
    """Represents a summary of the order book state."""

    bid_price: float
    bid_size: float
    ask_price: float
    ask_size: float
    spread: float

  • OrderBookState: Maintains the current state of the order book, including all bids and asks. It allows for dynamic updates as new market data arrives, keeping track of the best bid and ask prices and their respective sizes.

dataflow.py#
@dataclass
class OrderBookState:
    """Maintains the state of the order book."""

    bids: Dict[float, float] = field(default_factory=dict)
    asks: Dict[float, float] = field(default_factory=dict)
    bid_price: Optional[float] = None
    ask_price: Optional[float] = None

    def update(self, data):
        """Update the order book state with the given data.

        Args:
            data: The data to update the order book state with.
        """
        # Initialize bids and asks if they're empty. Only the first
        # message carries "bids" and "asks", so fall back to an empty list.
        if not self.bids:
            self.bids = {
                float(price): float(size) for price, size in data.get("bids", [])
            }
            self.bid_price = max(self.bids.keys(), default=None)
        if not self.asks:
            self.asks = {
                float(price): float(size) for price, size in data.get("asks", [])
            }
            self.ask_price = min(self.asks.keys(), default=None)

        # Process updates from the "changes" field in the data
        for change in data.get("changes", []):
            side, price_str, size_str = change
            price, size = float(price_str), float(size_str)

            target_dict = self.asks if side == "sell" else self.bids

            # If size is zero, remove the price level; otherwise,
            # update/add the price level
            if size == 0.0:
                target_dict.pop(price, None)
            else:
                target_dict[price] = size

            # After update, recalculate the best bid and ask prices
            if side == "sell":
                self.ask_price = min(self.asks.keys(), default=None)
            else:
                self.bid_price = max(self.bids.keys(), default=None)

    def spread(self) -> float:
        """Calculate the spread between the best bid and ask prices.

        Returns:
            float: The spread between the best bid and ask prices.
        """
        return self.ask_price - self.bid_price  # type: ignore

    def summarize(self):
        """Summarize the order book state.

        Returns:
            OrderBookSummary: A summary of the order book state.
        """
        return OrderBookSummary(
            bid_price=self.bid_price,
            bid_size=self.bids[self.bid_price],
            ask_price=self.ask_price,
            ask_size=self.asks[self.ask_price],
            spread=self.spread(),
        )
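
OrderBookState recomputes the best price with min or max over every price level after each change. That is simple and works, but it scans the whole side of the book each time. As one possible alternative (not what the tutorial code does), a heap with lazy deletion keeps the best price cheap to read. `BestBidTracker` is a hypothetical name used only for this sketch.

```python
import heapq
from typing import Dict, List, Optional


class BestBidTracker:
    """Hypothetical sketch: track the highest bid with a heap and lazy deletion."""

    def __init__(self) -> None:
        self.sizes: Dict[float, float] = {}  # price -> size for live levels
        self._heap: List[float] = []  # negated prices, so the min-heap acts as a max-heap

    def set_level(self, price: float, size: float) -> None:
        """Add or update a price level; a size of zero removes it."""
        if size == 0.0:
            # Only drop it from the dictionary; the stale heap entry is
            # discarded later, when it reaches the top of the heap.
            self.sizes.pop(price, None)
        else:
            if price not in self.sizes:
                heapq.heappush(self._heap, -price)
            self.sizes[price] = size

    def best(self) -> Optional[float]:
        """Return the highest live bid price, or None if the side is empty."""
        while self._heap and -self._heap[0] not in self.sizes:
            heapq.heappop(self._heap)  # discard entries for removed levels
        return -self._heap[0] if self._heap else None


tracker = BestBidTracker()
tracker.set_level(100.0, 1.0)
tracker.set_level(101.0, 2.0)
tracker.set_level(99.0, 3.0)
first_best = tracker.best()
tracker.set_level(101.0, 0.0)  # the top level is removed
second_best = tracker.best()
print(first_best, second_best)
```

The ask side would work the same way without negating prices. Whether this is worth the extra code depends on how deep the book is and how often it changes.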

In this section, we have defined the data classes that will enable us to maintain the order book in real time. These classes will be used to structure the data flow and manage the state of the order book. Now that we have defined the data classes, we can proceed to construct the dataflow to maintain the order book.
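
To see what frozen=True means in practice for a class like OrderBookSummary, here is a small standalone sketch. The `Summary` class is a cut-down stand-in with made-up values, not the tutorial's class.

```python
from dataclasses import FrozenInstanceError, dataclass, replace


@dataclass(frozen=True)
class Summary:
    """Cut-down stand-in for an immutable order book summary."""

    bid_price: float
    ask_price: float


summary = Summary(bid_price=100.0, ask_price=100.5)

try:
    summary.bid_price = 101.0  # assignment on a frozen dataclass is rejected
    mutated = True
except FrozenInstanceError:
    mutated = False

# To "change" a value, build a new instance instead of editing the old one.
updated = replace(summary, bid_price=100.25)
print(mutated, summary, updated)
```

Because each summary cannot change after it is created, a summary emitted for one update stays valid even as the underlying OrderBookState keeps moving.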

Constructing The Dataflow#

Before we get to the exciting part of our order book dataflow, we need to create our Dataflow object and prep the data. We’ll start by creating a Dataflow named 'orderbook'. Once it is initialized, we can add an input source using the input operator from the bytewax.operators module, which we imported under the shorter name op. We give the step the step_id "input" and pass it the orderbook dataflow object along with the data source: the CoinbaseSource class we defined earlier, initialized with the product IDs ["BTC-USD", "ETH-USD", "BTC-EUR", "ETH-EUR"].

dataflow.py#
flow = Dataflow("orderbook")
inp = op.input(
    "input", flow, CoinbaseSource(["BTC-USD", "ETH-USD", "BTC-EUR", "ETH-EUR"])
)
# ('BTC-USD', {
#     'type': 'l2update',
#     'product_id': 'BTC-USD',
#     'changes': [['buy', '36905.39', '0.00334873']],
#     'time': '2022-05-05T17:25:09.072519Z',
# })

Now that we have input for our Dataflow, we will establish a dataflow pipeline for processing live cryptocurrency order book updates. We will focus on analysis and data filtration based on order book spreads. Our goal is for the pipeline to extract and highlight trading opportunities through the analysis of spreads. Let’s take a look at key components of the dataflow pipeline:

The mapper function updates and summarizes the order book state, initializing a new OrderBookState object if one does not exist, and applying new data updates. The result is a state-summary tuple with key metrics like the best bid and ask prices, sizes, and the spread. We can then use the stateful_map operator to apply the mapper function to each update received in our input batch.

dataflow.py#
def mapper(state, value):
    """Update the state with the given value and return the state and a summary."""
    if state is None:
        state = OrderBookState()

    state.update(value)
    return (state, state.summarize())


stats = op.stateful_map("orderbook", inp, mapper)
# ('BTC-USD', OrderBookSummary(bid_price=36905.39, bid_size=0.00334873, ask_price=36905.4, ask_size=1.6e-05, spread=0.010000000002037268))
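
To build intuition for how a mapper of this shape behaves, the sketch below simulates the per-key state handling in plain Python. It only mimics the contract used above (the mapper receives the previous state, or None the first time a key is seen, and returns the new state plus a value to emit); `run_stateful_map` is a hypothetical helper, not Bytewax code.

```python
from typing import Any, Callable, Dict, Iterable, List, Tuple


def run_stateful_map(
    items: Iterable[Tuple[str, Any]],
    mapper: Callable[[Any, Any], Tuple[Any, Any]],
) -> List[Tuple[str, Any]]:
    """Simulate a keyed stateful map over (key, value) pairs."""
    states: Dict[str, Any] = {}
    out: List[Tuple[str, Any]] = []
    for key, value in items:
        # Each key sees only its own state; None means "no state yet".
        state, emitted = mapper(states.get(key), value)
        states[key] = state
        out.append((key, emitted))
    return out


def running_total(state, value):
    """Toy mapper: keep a running sum per key and emit it."""
    total = (state or 0) + value
    return total, total


stream = [("BTC-USD", 1), ("ETH-USD", 10), ("BTC-USD", 2)]
totals = run_stateful_map(stream, running_total)
print(totals)
```

In the order book dataflow the state is an OrderBookState per product ID and the emitted value is its summary, so updates for BTC-USD never touch the ETH-USD book.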

The last step is to filter the summaries by relative spread, keeping only those whose spread is greater than 0.1% of the ask price. We will use this as a proxy for trading opportunities. We define a predicate function and apply it to the summaries from the 'orderbook' step with the filter operator.

dataflow.py#
# Filter on a spread greater than 0.1% of the ask price
def just_large_spread(prod_summary):
    """Filter out products with a spread less than 0.1%."""
    product, summary = prod_summary
    return summary.spread / summary.ask_price > 0.001


state = op.filter("big_spread", stats, just_large_spread)
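
As a quick check of the arithmetic, a 0.1% threshold corresponds to a ratio of 0.001. The sketch below applies that test to a few bid and ask pairs; the helper names are hypothetical, and the first and third pairs reuse prices from the sample output shown later on this page.

```python
def relative_spread(bid_price: float, ask_price: float) -> float:
    """Spread expressed as a fraction of the ask price."""
    return (ask_price - bid_price) / ask_price


def is_large_spread(bid_price: float, ask_price: float, threshold: float = 0.001) -> bool:
    """True when the spread exceeds `threshold` (0.001 means 0.1%)."""
    return relative_spread(bid_price, ask_price) > threshold


# A spread of about 20.57 on an ask of 60173.35 is roughly 0.034%: below 0.1%.
small = is_large_spread(60152.78, 60173.35)
# A spread of 0.2 on an ask of 100.2 is roughly 0.2%: above 0.1%.
large = is_large_spread(100.0, 100.2)
# A bid above the ask gives a negative spread, which never passes the filter.
crossed = is_large_spread(65677.38, 65368.71)
print(small, large, crossed)
```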

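To return the result of the dataflow, we can use output. Note that the call below passes stats, so every summary is printed; pass state instead to print only the summaries that pass the spread filter.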

dataflow.py#
op.output("out", stats, StdOutSink())

Executing the Dataflow#

Now we can run our completed dataflow:

> python -m bytewax.run dataflow:flow

This will process real-time order book data for four cryptocurrency pairs: BTC-USD, ETH-USD, BTC-EUR, and ETH-EUR. Each summary provides the best bid and ask prices, their sizes, and the spread.

('BTC-EUR', OrderBookSummary(bid_price=60152.78, bid_size=0.0104, ask_price=60173.35, ask_size=0.02238611, spread=20.56999999999971))
('BTC-USD', OrderBookSummary(bid_price=65677.38, bid_size=0.05, ask_price=65368.71, ask_size=0.001663, spread=-308.67000000000553))
('ETH-EUR', OrderBookSummary(bid_price=3095.16, bid_size=0.20712451, ask_price=3079.9, ask_size=0.14696149, spread=-15.259999999999764))

Summary#

That’s it! You’ve learned how to use websockets with Bytewax and how to use the stateful_map operator to maintain state in a streaming application.

If you have any trouble with the process or have ideas about how to improve this document, come talk to us in the #questions-answered Slack channel!
