market data replay (part1; agent based quant infrastructure)

HangukQuant
Sep 27, 2024

For those who missed it, we just created a quantpylib repo access pass as a cheaper alternative to the yearly substack:

quantpylib access pass


We are running special discounts on launch, and there are about 4 days remaining if you are interested!

Also, our funding arbitrage lectures (round 2) signups will be left open for one more week!

Elementary Mid-Frequency Funding Arbitrage (round 2)


In the last post, we demonstrated how to do market making using quantpylib:

Market making. Code.

You can refer here too: https://hangukquant.github.io/scripts/market_making/

Now, one of the key criteria of good dev infrastructure is portability: across exchanges, strategies, environments and what not. Exchange portability was demonstrated in that post, and strategy portability with our genetic alphas, but we have not yet demonstrated such portability in our hft/tick data backtests.

That is, we currently have to write a completely different set of code for production and for backtests - and this is not ideal, most critically because of error-proneness and duplicated effort.

If we are able to replicate these events (tick arrivals, order creates, order acks, order fills, latencies etc.), then we would be one step closer to having a unified model. In particular, if we can create realistic simulator (mock) classes for the OMS and Feed that match their interfaces (function signatures, etc.), then we would have a single codebase for tests and production - all while preserving the flexibility of the trade driver logic.

Today, we shall look at how to create a mock class for the data feed and do a market data replay.

Here is how different components/agents of the quant system might behave:

Different exchanges are connected through a proprietary gateway, and state managers such as order management systems and data feeds make use of the gateway to provide services to trading strategies (drivers).

To decide what we want to emulate, we need to first know what the real behavior looks like. Let’s write some code to get data feeds:

Streaming BTC trade and l2 on binance:

import os
import asyncio 
import numpy as np 

from pprint import pprint 
from dotenv import load_dotenv
load_dotenv()
from decimal import Decimal 

from quantpylib.utilities.general import save_pickle, load_pickle

simulated = True
exc,ticker = 'binance','BTCUSDT'
config_keys = {
    "binance" : {
        "key":os.getenv('BIN_KEY'),
        "secret":os.getenv('BIN_SECRET'),    
    },
}

from quantpylib.hft.feed import Feed
from quantpylib.utilities.general import _time
from quantpylib.gateway.master import Gateway 
gateway = Gateway(config_keys=config_keys)
feed = Feed(gateway=gateway)
time = _time 

async def lob_handler(lob):
    #called on every l2 book update: local receipt time, exchange timestamp, mid price
    print(time(), lob.timestamp, lob.get_mid())

async def trade_handler(trade):
    #called on every trade tick: local receipt time, (ts, price, size, direction)
    print(time(), trade)

async def main():

    await gateway.init_clients()

    #subscribe to the l2 book and trade streams; each call returns a feed ID
    l2_feed = await feed.add_l2_book_feed(
        exc=exc,ticker=ticker,handler=lob_handler
    )
    trades_feed = await feed.add_trades_feed(
        exc=exc,ticker=ticker,handler=trade_handler
    )
    await asyncio.sleep(60) #let ticks stream in for a minute
    lob = feed.get_feed(l2_feed)
    trades = feed.get_feed(trades_feed)
    
    #persist the recorded buffers for offline replay
    save_pickle('data.pickle',(lob,trades))
        
if __name__ == "__main__":
    asyncio.run(main())    

If we let this run, we will see that we get outputs such as:

1727456689856 (1727456689749, 66306.2, 0.082, 1) #trade_handler
1727456689989 (1727456689904, 66306.8, 0.463, -1) #trade_handler
1727456690139 (1727456690025, 66306.9, 0.009, 1) #trade_handler

1727456725394 1727456725325 66304.65 #lob_handler

After adding the data feeds, we sleep for sixty seconds. Since this is an asynchronous sleep, the tick data arrivals are passed into the handlers. Afterwards, the data buffers are obtained using their feed ID and saved.
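
These saved buffers are what we will feed into the replayer later; they can be loaded back with the same utilities (a minimal sketch, assuming the data.pickle written above):

from quantpylib.utilities.general import load_pickle

#reload the recorded l2 book and trade buffers for offline replay
lob, trades = load_pickle('data.pickle')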

Of the many pains that plague hft backtesting (market impact, queue position, traffic congestion etc.), one of the most important is latency. There are many different types of latency. Consider:

In most cases, public and private feeds (l2 delta updates vs order fills) have different latencies. Requests (req) and acknowledgments (ack) have different latencies.

In the general case, the OMS and Feed are used for multi-exchange portfolios. Geographical differences between exchange servers (binance/tokyo vs bybit/singapore), as well as different exchange protocols (varying TPS etc.), can affect latencies. We need to account for all of these for more realistic modeling, as in the hypothetical per-exchange configuration sketched below.
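
For instance, one might key the latency profile by venue (a hypothetical configuration for illustration only; the Latencies flags are the same ones used in the replayer pseudocode below):

#hypothetical per-exchange latency profiles, in milliseconds - a venue whose matching
#engine is geographically closer gets smaller delays than a distant one
exchange_latencies = {
    'binance': {
        Latencies.REQ_PUBLIC: 40,  Latencies.ACK_PUBLIC: 40,  Latencies.FEED_PUBLIC: 40,
        Latencies.REQ_PRIVATE: 20, Latencies.ACK_PRIVATE: 20, Latencies.FEED_PRIVATE: 20,
    },
    'bybit': {
        Latencies.REQ_PUBLIC: 120,  Latencies.ACK_PUBLIC: 120,  Latencies.FEED_PUBLIC: 120,
        Latencies.REQ_PRIVATE: 60,  Latencies.ACK_PRIVATE: 60,  Latencies.FEED_PRIVATE: 60,
    },
}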

To do this, we need to create a data replay class. Let's concern ourselves with the public data feed for now, and with unilateral message passing only (from exchange server to local agent).

The actual code is a bit long to discuss in full, so here is data replayer pseudocode (I will push the actual code to quantpylib/main later):

from collections import defaultdict, deque

import numpy as np

#Latencies, FeedCls, FeedType, LOB, Trades are quantpylib.hft types (import paths omitted here)

class Replayer():

    def __init__(
        self,
        l2_data,
        trade_data,
        exchange_latencies={
            Latencies.NULL:0,
            Latencies.REQ_PRIVATE:50, #milliseconds
            Latencies.REQ_PUBLIC:100,
            Latencies.ACK_PRIVATE:50,
            Latencies.ACK_PUBLIC:100,
            Latencies.FEED_PRIVATE:50,
            Latencies.FEED_PUBLIC:100,   
        }
    ):
        #default latency profile; may also be keyed per exchange, e.g. {exc:{Latencies...}}
        self.exchange_latencies = exchange_latencies
        self.l2_data = l2_data #{exc:{ticker:[{ts,b,a}]}}
        self.trade_data = trade_data #{exc:{ticker:[(ts,price,sz,dir)]}}
        self.exchanges = list(set(l2_data) | set(trade_data))
        self.replay_ts = None #simulation clock, advanced as events are replayed
        self.feed = MockFeed(replayer=self,exchanges=self.exchanges)
        self.callbacks = defaultdict(list)
        self.event_queue = defaultdict(deque) 

        for exc in self.l2_data:
            for ticker in self.l2_data[exc]:
                feed_id = self.feed.get_feed_id(
                    exc=exc,
                    feed_cls=FeedCls.PERPETUAL,
                    feed_type=FeedType.L2BOOK,
                    ticker=ticker
                )
                self.add_events(
                    event_id=feed_id,
                    timestamps=[data['ts'] for data in self.l2_data[exc][ticker]],
                    events=self.l2_data[exc][ticker],
                    exc=exc,
                    latency=Latencies.FEED_PUBLIC
                )

        for exc in self.trade_data:
            for ticker in self.trade_data[exc]:
                feed_id = self.feed.get_feed_id(
                    exc=exc,
                    feed_cls=FeedCls.PERPETUAL,
                    feed_type=FeedType.TRADES,
                    ticker=ticker
                )
                self.add_events(
                    event_id=feed_id,
                    timestamps=[data[0] for data in self.trade_data[exc][ticker]],
                    events=self.trade_data[exc][ticker],
                    exc=exc,
                    latency=Latencies.FEED_PUBLIC
                )
    
    def time(self):
        return self.replay_ts

    def get_feed(self):
        return self.feed

    def add_callback(self,event_id,callback):
        self.callbacks[event_id].append(callback)

    def add_events(self,event_id,timestamps,events,exc,latency=Latencies.NULL):
        #use a per-exchange latency profile if one was provided, else the default profile
        latencies = self.exchange_latencies.get(exc,self.exchange_latencies)
        delay = latencies[latency]
        for ts,msg in zip(timestamps,events):
            self.event_queue[event_id].append((ts+delay,msg))

    async def play(self):        
        queue = self.event_queue
        while True:
            events = list(queue.keys())
            if all(len(queue[event_id]) == 0 for event_id in events):
                break
            #pick the queue whose head event has the earliest (latency-adjusted) timestamp
            selector = np.argmin(
                [queue[event_id][0][0] 
                if len(queue[event_id]) > 0 else 1e99 for event_id in events]
            )
            event_id = events[selector]
            ts,msg = queue[event_id].popleft()
            self.replay_ts = ts #advance the simulation clock
        
            for callback in self.callbacks[event_id]:
                await callback(msg)

Notably, get_feed returns the MockFeed instance, which we need to implement.

The data replay is done using event queues and event IDs - each event queue is a serial order of events, and the earliest pending event across all queues is picked and replayed first. The simulation clock (replay_ts) is used as the notion of time during replay, rather than time.time() as we would use in the live trading system.
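
This means the driver should ask a clock provider for the current time rather than calling time() directly. A minimal sketch (the clock-injection helper here is hypothetical, not part of quantpylib): pass _time in production and replayer.time in simulation, and the same handler works in both.

#hypothetical helper: build a lob handler around an injected clock
def make_lob_handler(clock):
    async def lob_handler(lob):
        feed_latency = clock() - lob.timestamp #ms between exchange timestamp and local receipt
        print(clock(), lob.timestamp, lob.get_mid(), feed_latency)
    return lob_handler

#live: make_lob_handler(_time) ; replay: make_lob_handler(replayer.time)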

Let us implement the MockFeed:


class MockFeed(Feed):

    def __init__(self,replayer=None,gateway=None,exchanges=None):
        self.gateway = gateway
        self.exchanges = exchanges
        self.feeds = {}
        self.handlers = defaultdict(list)
        self.replayer = replayer
    
    async def add_l2_book_feed(self,exc,ticker,handler=None,depth=20,buffer=100,**kwargs):
        feed_id = self.add_feed(
            exc=exc,
            feed_cls=FeedCls.PERPETUAL,
            feed_type=FeedType.L2BOOK,
            handler=handler,
            ticker=ticker
        )
        lob = LOB(depth=depth,buffer_size=buffer)
        on_update = self.handler(feed_id=feed_id)
        #instead of a websocket subscription, replayed l2 messages are mirrored into the book
        async def mock_mirror(msg):
            lob.update(msg['ts'],msg['b'],msg['a'],True)
            await on_update(lob)
            return 
        
        self.feeds[feed_id] = lob
        self.replayer.add_callback(feed_id,mock_mirror)
        return feed_id
    
    async def add_trades_feed(self,exc,ticker,handler=None,buffer=100,**kwargs):
        feed_id = self.add_feed(
            exc=exc,
            feed_cls=FeedCls.PERPETUAL,
            feed_type=FeedType.TRADES,
            handler=handler,
            ticker=ticker
        )
        trades = Trades(buffer_size=buffer)
        on_tick = self.handler(feed_id=feed_id,lambda_add=lambda trade: trades.append(trade))
        #replayed trade ticks are routed through the same handler path as live ticks
        async def mock_subscribe(msg):
            await on_tick(msg)
            return 

        self.feeds[feed_id] = trades
        self.replayer.add_callback(feed_id,mock_subscribe)
        return feed_id

MockFeed directly inherits from the live data feed class. Instead of subscribing to the exchange for tick data, the subscription handler is added as a callback on the appropriate replayer event queue.

We can now run something like this:
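
Here is a minimal sketch of how the pieces might be wired together, assuming the Replayer and MockFeed above and the data.pickle recorded earlier (the .buffer accessors on the saved LOB/Trades objects are hypothetical stand-ins for however the raw messages are stored):

import asyncio

from quantpylib.utilities.general import load_pickle

async def main():
    lob, trades = load_pickle('data.pickle')
    replayer = Replayer(
        l2_data={'binance':{'BTCUSDT':lob.buffer}},       #hypothetical accessor for the recorded l2 messages
        trade_data={'binance':{'BTCUSDT':trades.buffer}}, #hypothetical accessor for the recorded trades
    )
    feed = replayer.get_feed() #MockFeed - same interface as the live Feed

    async def lob_handler(lob):
        print(replayer.time(), lob.timestamp, lob.get_mid())

    async def trade_handler(trade):
        print(replayer.time(), trade)

    await feed.add_l2_book_feed(exc='binance',ticker='BTCUSDT',handler=lob_handler)
    await feed.add_trades_feed(exc='binance',ticker='BTCUSDT',handler=trade_handler)
    await replayer.play() #drives the handlers in chronological, latency-adjusted order

if __name__ == "__main__":
    asyncio.run(main())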

When simulated, we will see similar data replayed, such as:

1727456732127 1727456732027 66284.85 #lob
1727456739811.0 [ 1.72745674e+12  6.62781000e+04  7.00000000e-03 -1.00] #trade

We see that the data is pushed to the lob and trade handlers in chronological order, with the appropriate public feed latency applied (for instance, 1727456732127 - 1727456732027 = 100 ms, the FEED_PUBLIC delay).

This is a rather complex piece of infrastructure, and I am not sure if I did a good job in explaining, so do let me know in the comments if you have any questions.

Next, we are going to build the MockOMS, which involves bilateral message passing - this makes for much more complicated logic.

HangukQuant Community Github Repo

