market data replay (part1; agent based quant infrastructure)
For those who missed it, we just created a quantpylib repo access pass as a cheaper alternative to the yearly substack:
We are running special discounts on launch, and there are about 4 days remaining if you are interested!
Also, our funding arbitrage lectures (round 2) signups will be left open for one more week!
In the last post, we demonstrated how to do market making using quantpylib:
You can refer here too: https://hangukquant.github.io/scripts/market_making/
Now, one of the key criteria of good dev infrastructure is portability. This refers to portability across exchanges, strategies, environments and whatnot. Exchange portability was demonstrated in that post. Strategy portability is demonstrated with our genetic alphas, but we have not yet demonstrated such behavior with our hft/tick data backtests.
That is, as things stand we would have to write completely different code for production and for backtests, and this is not ideal (for obvious reasons, most critically error proneness and effort).
If we are able to replicate these events (tick arrivals, order creates, order acks, order fills, latencies etc), then we would be one step closer to having a unified model. In particular, if we are able to create realistic simulator (mock) classes for the OMS and Feed, and match their interfaces (function signatures, etc), then we would have a single codebase for tests and production, all while preserving flexibility of the trade driver logic.
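To make the interface-matching idea concrete, here is a minimal sketch. All names in it (`LiveFeed`, `ReplayFeed`, `run_strategy`) are hypothetical stand-ins and not quantpylib classes; the only point is that the driver depends on a shared method signature, so it does not care which feed it is given.

```python
import asyncio

class LiveFeed:
    """Hypothetical stand-in for a feed that would subscribe to an exchange."""
    async def add_trades_feed(self, exc, ticker, handler):
        raise NotImplementedError("a real feed would open a subscription here")

class ReplayFeed:
    """Hypothetical mock with the same signature, driven by recorded ticks."""
    def __init__(self, recorded):
        self.recorded = recorded          # {(exc, ticker): [trade, ...]}
        self.subscriptions = []

    async def add_trades_feed(self, exc, ticker, handler):
        self.subscriptions.append((exc, ticker, handler))
        return f"{exc}/{ticker}/trades"

    async def play(self):
        # push every recorded trade to the handler that subscribed to it
        for exc, ticker, handler in self.subscriptions:
            for trade in self.recorded[(exc, ticker)]:
                await handler(trade)

async def run_strategy(feed, seen):
    """Driver logic: identical for live and simulated runs."""
    async def on_trade(trade):
        seen.append(trade)
    return await feed.add_trades_feed(
        exc="binance", ticker="BTCUSDT", handler=on_trade
    )

async def demo():
    seen = []
    recorded = {("binance", "BTCUSDT"): [(1, 66306.2, 0.082, 1), (2, 66306.8, 0.463, -1)]}
    feed = ReplayFeed(recorded)
    feed_id = await run_strategy(feed, seen)   # same call we would make live
    await feed.play()
    return feed_id, seen
```

Swapping `ReplayFeed` for `LiveFeed` changes nothing inside `run_strategy`, which is the property we want from the real Feed and its mock.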
Today, we shall look at how to create a mock class for the data feed and do a market data replay.
Here is how different components/agents of the quant system might behave:
Different exchanges are connected through a proprietary gateway, and state managers such as order management systems and data feeds make use of the gateway to provide services to trading strategies (drivers).
To decide what we want to emulate, we need to first know what the real behavior looks like. Let’s write some code to get data feeds:
Streaming BTC trade and l2 on binance:
import os
import asyncio
import numpy as np
from pprint import pprint
from dotenv import load_dotenv
load_dotenv()

from decimal import Decimal
from quantpylib.utilities.general import save_pickle, load_pickle

simulated = True
exc,ticker = 'binance','BTCUSDT'
config_keys = {
    "binance" : {
        "key":os.getenv('BIN_KEY'),
        "secret":os.getenv('BIN_SECRET'),
    },
}

from quantpylib.hft.feed import Feed
from quantpylib.utilities.general import _time
from quantpylib.gateway.master import Gateway

gateway = Gateway(config_keys=config_keys)
feed = Feed(gateway=gateway)
time = _time

async def lob_handler(lob):
    print(time(), lob.timestamp, lob.get_mid())

async def trade_handler(trade):
    print(time(), trade)

async def main():
    await gateway.init_clients()
    l2_feed = await feed.add_l2_book_feed(
        exc=exc,ticker=ticker,handler=lob_handler
    )
    trades_feed = await feed.add_trades_feed(
        exc=exc,ticker=ticker,handler=trade_handler
    )
    await asyncio.sleep(60)
    lob = feed.get_feed(l2_feed)
    trades = feed.get_feed(trades_feed)
    save_pickle('data.pickle',(lob,trades))

if __name__ == "__main__":
    asyncio.run(main())
If we let this run, we will see that we get outputs such as:
1727456689856 (1727456689749, 66306.2, 0.082, 1) #trade_handler
1727456689989 (1727456689904, 66306.8, 0.463, -1) #trade_handler
1727456690139 (1727456690025, 66306.9, 0.009, 1) #trade_handler
1727456725394 1727456725325 66304.65 #lob_handler
After adding the data feeds, we sleep for sixty seconds. Because the sleep is asynchronous, the event loop keeps running and tick arrivals are passed to the handlers in the meantime. Afterwards, the data buffers are retrieved by feed ID and saved.
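The asynchronous-sleep behavior can be seen in isolation with plain asyncio. This is a toy sketch: `producer` is a hypothetical stand-in for an exchange stream and has nothing to do with quantpylib internals.

```python
import asyncio

async def producer(handler, n_ticks, interval):
    """Stand-in for a stream that pushes ticks to a handler."""
    for i in range(n_ticks):
        await asyncio.sleep(interval)
        await handler(i)

async def main(sleep_for=0.05):
    received = []

    async def handler(tick):
        received.append(tick)

    # the stream runs as a background task, much like a feed subscription
    task = asyncio.create_task(producer(handler, n_ticks=3, interval=0.001))
    # awaiting sleep yields control to the event loop, so the task can progress
    await asyncio.sleep(sleep_for)
    # wait for the stream to finish so the result does not depend on timing
    await task
    return received
```

A blocking `time.sleep` in place of `asyncio.sleep` would freeze the event loop, and no handler would fire until it returned.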
Of the many pains that plague hft backtesting (market impact, queue position, traffic congestion etc), one of the most important is latency. There are many different types of latencies…consider:
In most cases, public and private feeds (l2 delta updates vs order fills) have different latencies. Request (req) and acknowledgments (ack) have different latencies.
In the general case, the OMS and Feed are used for multi-exchange portfolios. Geographical differences between exchange servers (binance/tokyo vs bybit/singapore), as well as different exchange protocols (varying TPS etc), can affect latencies. We need to account for all of these for more realistic modeling.
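One simple way to represent this is a latency table keyed by exchange and by message type. The sketch below is only illustrative: the `Latency` enum, the table layout and every number in it are assumptions made for the example, not measurements and not quantpylib definitions.

```python
from enum import Enum

class Latency(Enum):
    """Hypothetical latency kinds, mirroring the distinctions above."""
    FEED_PUBLIC = "feed_public"     # e.g. l2 delta updates
    FEED_PRIVATE = "feed_private"   # e.g. order fills
    REQ = "req"                     # local agent -> exchange
    ACK = "ack"                     # exchange -> local agent

# illustrative one-way latencies in milliseconds
LATENCY_MS = {
    "binance": {Latency.FEED_PUBLIC: 100, Latency.FEED_PRIVATE: 50,
                Latency.REQ: 50, Latency.ACK: 50},
    "bybit":   {Latency.FEED_PUBLIC: 20, Latency.FEED_PRIVATE: 10,
                Latency.REQ: 10, Latency.ACK: 10},
}

def local_arrival_ts(exc, kind, exchange_ts):
    """Timestamp at which the local agent would observe the message."""
    return exchange_ts + LATENCY_MS[exc][kind]
```

With this table, a public binance tick stamped 1727456689749 by the exchange is seen locally at 1727456689849.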
To do this, we need to create a data replay class. Let’s concern ourselves with only the public data feed for now, and only with unilateral message passing (from exchange server to local agent).
The actual code is a bit long for me to discuss, so here is data replayer pseudocode (will push actual code to quantpylib/main later):
import numpy as np
from collections import defaultdict, deque

# Latencies, FeedCls and FeedType are assumed to be defined elsewhere
# (imports omitted in this pseudocode). MockFeed is implemented below.

# default one-way latencies in milliseconds, applied to every exchange
DEFAULT_LATENCIES = {
    Latencies.NULL:0,
    Latencies.REQ_PRIVATE:50,
    Latencies.REQ_PUBLIC:100,
    Latencies.ACK_PRIVATE:50,
    Latencies.ACK_PUBLIC:100,
    Latencies.FEED_PRIVATE:50,
    Latencies.FEED_PUBLIC:100,
}

class Replayer():

    def __init__(self,l2_data,trade_data,exchange_latencies=None):
        self.l2_data = l2_data # {exc:{ticker:[{ts,b,a}]}}
        self.trade_data = trade_data #{exc:{ticker:[(ts,price,sz,dir)]}}
        self.exchanges = sorted(set(l2_data) | set(trade_data))
        # {exc:{latency_type:ms}}; assumption: defaults are used per exchange
        # when no table is passed (avoids a mutable default argument)
        self.exchange_latencies = exchange_latencies or {
            exc:dict(DEFAULT_LATENCIES) for exc in self.exchanges
        }
        self.replay_ts = None #simulated clock, set as events are replayed
        self.feed = MockFeed(replayer=self,exchanges=self.exchanges)
        self.callbacks = defaultdict(list)
        self.event_queue = defaultdict(deque)

        for exc in self.l2_data:
            for ticker in self.l2_data[exc]:
                feed_id = self.feed.get_feed_id(
                    exc=exc,
                    feed_cls=FeedCls.PERPETUAL,
                    feed_type=FeedType.L2BOOK,
                    ticker=ticker
                )
                self.add_events(
                    event_id=feed_id,
                    timestamps=[data['ts'] for data in self.l2_data[exc][ticker]],
                    events=self.l2_data[exc][ticker],
                    exc=exc,
                    latency=Latencies.FEED_PUBLIC
                )

        for exc in self.trade_data:
            for ticker in self.trade_data[exc]:
                feed_id = self.feed.get_feed_id(
                    exc=exc,
                    feed_cls=FeedCls.PERPETUAL,
                    feed_type=FeedType.TRADES,
                    ticker=ticker
                )
                self.add_events(
                    event_id=feed_id,
                    timestamps=[data[0] for data in self.trade_data[exc][ticker]],
                    events=self.trade_data[exc][ticker],
                    exc=exc,
                    latency=Latencies.FEED_PUBLIC
                )

    def time(self):
        return self.replay_ts

    def get_feed(self):
        return self.feed

    def add_callback(self,event_id,callback):
        self.callbacks[event_id].append(callback)

    def add_events(self,event_id,timestamps,events,exc,latency=Latencies.NULL):
        # shift each exchange timestamp by the latency to get the local arrival time
        delay = self.exchange_latencies[exc][latency]
        for ts,msg in zip(timestamps,events):
            self.event_queue[event_id].append((ts+delay,msg))

    async def play(self):
        queue = self.event_queue
        while True:
            # only consider queues that still hold events; stop when none do
            events = [event_id for event_id in queue if len(queue[event_id]) > 0]
            if not events:
                break
            # pick the queue whose head has the earliest arrival timestamp
            selector = np.argmin([queue[event_id][0][0] for event_id in events])
            event_id = events[selector]
            ts,msg = queue[event_id].popleft()
            self.replay_ts = ts
            for callback in self.callbacks[event_id]:
                await callback(msg)
Notably, get_feed returns the replayer's MockFeed instance, which we need to implement.
The data replay is done using event queues and event IDs: each event queue is a serial (time-ordered) sequence of events. On each step, the earliest event across all queues is picked and replayed. The clock is the replayer's simulated time (Replayer.time()) rather than wall-clock time (time.time()), which is what the live trading system would use.
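The "earliest event first" selection described above is a k-way merge of sorted queues. As an alternative to scanning every queue head on each step, the same ordering can be sketched with a heap. This is a standalone illustration using only the standard library, not the quantpylib implementation; it assumes each queue is already sorted by timestamp and that event IDs are comparable (used to break timestamp ties).

```python
import heapq
from collections import deque

def replay_order(event_queue):
    """Yield (ts, event_id, msg) across all queues in timestamp order.

    event_queue: {event_id: deque([(ts, msg), ...])}, each deque sorted by ts.
    The deques are consumed as events are yielded.
    """
    # the heap holds one entry per non-empty queue: (head timestamp, event_id)
    heap = [(q[0][0], event_id) for event_id, q in event_queue.items() if q]
    heapq.heapify(heap)
    while heap:
        _, event_id = heapq.heappop(heap)
        q = event_queue[event_id]
        ts, msg = q.popleft()
        yield ts, event_id, msg
        if q:
            # re-insert the queue keyed by its new head timestamp
            heapq.heappush(heap, (q[0][0], event_id))

queues = {
    "l2": deque([(105, "book@105"), (130, "book@130")]),
    "trades": deque([(101, "t1"), (110, "t2"), (140, "t3")]),
}
ordered = list(replay_order(queues))
```

With only a handful of feeds the linear scan is perfectly adequate; the heap starts to matter when there are many event queues.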
Let us implement the MockFeed:
class MockFeed(Feed):

    def __init__(self,replayer=None,gateway=None,exchanges=None):
        self.gateway = gateway
        self.feeds = {}
        self.handlers = defaultdict(list)
        self.replayer = replayer

    async def add_l2_book_feed(self,exc,ticker,handler=None,depth=20,buffer=100,**kwargs):
        feed_id = self.add_feed(
            exc=exc,
            feed_cls=FeedCls.PERPETUAL,
            feed_type=FeedType.L2BOOK,
            handler=handler,
            ticker=ticker
        )
        lob = LOB(depth=depth,buffer_size=buffer)
        on_update = self.handler(feed_id=feed_id)
        async def mock_mirror(msg):
            lob.update(msg['ts'],msg['b'],msg['a'],True)
            await on_update(lob)
            return
        self.feeds[feed_id] = lob
        self.replayer.add_callback(feed_id,mock_mirror)
        return feed_id

    async def add_trades_feed(self,exc,ticker,handler=None,buffer=100,**kwargs):
        feed_id = self.add_feed(
            exc=exc,
            feed_cls=FeedCls.PERPETUAL,
            feed_type=FeedType.TRADES,
            handler=handler,
            ticker=ticker
        )
        trades = Trades(buffer_size=buffer)
        on_tick = self.handler(feed_id=feed_id,lambda_add=lambda trade: trades.append(trade))
        async def mock_subscribe(msg):
            await on_tick(msg)
            return
        self.feeds[feed_id] = trades
        self.replayer.add_callback(feed_id,mock_subscribe)
        return feed_id
This directly inherits from the actual live data feed class. Instead of subscribing to the exchange for tick data, the subscription handler is instead added as a callback to messages from the appropriate replayer event queue.
We can now run something like this:
and when simulated, we will see similar data replayed such as:
1727456732127 1727456732027 66284.85 #lob
1727456739811.0 [ 1.72745674e+12 6.62781000e+04 7.00000000e-03 -1.00] #trade
We see that the data is pushed to the lob and trade handlers in chronological order, with the appropriate public feed latency.
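The switch between live and simulated runs comes down to which clock and which feed the driver is handed. Here is a toy sketch of the clock part, under loud assumptions: `ToyReplayer`, `wall_time_ms` and `run` are hypothetical names for illustration, and the real Replayer is asynchronous and more involved.

```python
import time as _wall

class ToyReplayer:
    """Hypothetical minimal replayer exposing a simulated clock."""
    def __init__(self, events):
        self.events = sorted(events)   # [(arrival_ts, msg), ...]
        self.replay_ts = None

    def time(self):
        return self.replay_ts

    def play(self, handler):
        for ts, msg in self.events:
            self.replay_ts = ts        # advance the simulated clock first
            handler(msg)

def wall_time_ms():
    """Wall-clock time in milliseconds, as a live run would use."""
    return int(_wall.time() * 1000)

def run(simulated, events):
    replayer = ToyReplayer(events) if simulated else None
    # the handler only ever calls clock(); it does not know which one it got
    clock = replayer.time if simulated else wall_time_ms
    log = []

    def handler(msg):
        log.append((clock(), msg))

    if simulated:
        replayer.play(handler)
    return log
```

In simulated mode the handler's timestamps are the replayed arrival times, which is why the first column of the replayed output above looks like a plausible local receive time rather than the time the backtest was run.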
This is a rather complex piece of infrastructure, and I am not sure if I did a good job in explaining, so do let me know in the comments if you have any questions.
Next, we are going to do MockOMS, which involves bilateral message passing, and this makes for much more complicated logic.