Implementing a C-level Ringbuffer in Python
We discussed Cython in our last market notes:
By the way - we added more to the bounty pool:
https://hangukquant.github.io/bounties/
The bounty is to thank and encourage people to learn and contribute to the quantpylib repo. Payouts are active and claimable.
Obtain the repo pass here. The cringbuffer implementation, benchmarks and tests have been incorporated into the repo and are available for quants to use. Documentation will follow.
In this post, we are going to implement a ringbuffer extension class in Cython. In the next post, we are going to implement a fast limit order book.
Implementation for a ringbuffer
A ringbuffer, also known as a circular buffer, is a data structure used in computing to efficiently manage a fixed-size, continuous stream of data. It comes with a predetermined capacity and typically uses two pointers—often called the "head" (left) and "tail" (right)—to mark the start and end of the buffer. When either pointer reaches the end of the buffer, it wraps around to the beginning. This allows data to be added or accessed from the head or tail in constant time, making it highly efficient for specific use cases. However, accessing data in the middle of the buffer is less efficient. Once the buffer reaches full capacity, adding new data overwrites the oldest data.
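To make the wraparound and overwrite behavior concrete, here is a minimal pure-Python sketch. It is not the quantpylib implementation: the class name TinyRing is hypothetical, it stores data in a plain list, and it tracks a head index plus a size instead of two pointers, which is an equivalent bookkeeping choice made only to keep the example short.

```python
class TinyRing:
    """Fixed-capacity ring buffer over a plain list (illustrative only)."""

    def __init__(self, capacity):
        if capacity <= 0:
            raise ValueError("capacity must be a positive integer")
        self.capacity = capacity
        self.data = [None] * capacity
        self.head = 0   # physical slot of the oldest element
        self.size = 0   # number of live elements

    def append(self, value):
        # The tail slot is derived from head and size, wrapping at capacity.
        tail = (self.head + self.size) % self.capacity
        self.data[tail] = value
        if self.size == self.capacity:
            # Full: the write just replaced the oldest element, so advance head.
            self.head = (self.head + 1) % self.capacity
        else:
            self.size += 1

    def to_list(self):
        # Oldest to newest, following the wraparound.
        return [self.data[(self.head + i) % self.capacity]
                for i in range(self.size)]


ring = TinyRing(3)
for x in [1, 2, 3, 4, 5]:
    ring.append(x)
snapshot = ring.to_list()   # the two oldest values, 1 and 2, were overwritten
```

After five appends into a buffer of capacity three, only the three most recent values remain, and each append touched exactly one slot.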
In the world of markets, continuous data streams are everywhere—think orderbook updates, market trades, and more. For handling contiguous numerical data in such scenarios, the numpy_ringbuffer library provides a handy implementation of the circular buffer. You can check out the reference implementation on GitHub (or at the bottom of the page).
Since our trading system receives up to hundreds of thousands of socket messages in the market data stream in short periods of time, we are interested in writing a C-level extension class that speeds up critical operations.
We will first offer our solution, and then explain our code in detail. First, we require declarations (if you are unfamiliar with Cython syntax, read the Cython tutorial from our last post first):
Here is the implementation file:
The explanation (please read the reference numpy_ringbuffer implementation first, as comparisons are drawn):
We know that NumPy arrays and array indexing are supported in Cython. Our ringbuffer always allows overwrites, so the instance attribute allow_overwrite is dropped.
The functions _unwrap, append, and appendleft are declared as cdef or cpdef methods. The _fix_indices function is renamed to _wraparound in our implementation. Other methods are implemented with standard Python def semantics.
In the reference implementation, the operations that add data (append and appendleft) first check self.is_full and then, inside that branch, elif not len(self). That inner condition is True only when the buffer is full yet empty, which can happen only when the buffer capacity is zero. We can skip these checks if we validate that the buffer capacity is a positive integer. Otherwise, our initialization logic mirrors the reference.
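The "full, yet empty" case is easy to see with plain integers. The sketch below is only an illustration: is_full, is_empty and validate_capacity are hypothetical helper names, not functions from numpy_ringbuffer or quantpylib.

```python
def is_full(length, capacity):
    # Mirrors the reference property: len(self) == self._capacity
    return length == capacity


def is_empty(length):
    # Mirrors the reference check: not len(self)
    return length == 0


def validate_capacity(capacity):
    """Hypothetical constructor check: accept only positive integers."""
    if isinstance(capacity, bool) or not isinstance(capacity, int) or capacity <= 0:
        raise ValueError(f"capacity must be a positive integer, got {capacity!r}")
    return capacity


# With capacity zero, a buffer holding zero elements is both full and empty.
zero_cap_full_and_empty = is_full(0, 0) and is_empty(0)

# With any positive capacity, the two conditions can never hold together.
positive_cap_conflict = any(
    is_full(n, 4) and is_empty(n) for n in range(0, 5)
)
```

Rejecting non-positive capacities once, at construction time, is what lets the per-append check be removed.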
The _unwrap method is only called within the extension class and interacts solely with C-types, so we can declare it as cdef. Plus, we’ve added a small optimization: if the data can be obtained contiguously, np.concatenate unnecessarily copies data from its first argument. Instead, we can just return the sliced view.
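The view-versus-copy idea can be sketched with the standard library alone. This is a stand-in, not the Cython code: it uses array.array and memoryview in place of a NumPy array and its sliced view, and the function name unwrap and its arguments are assumptions for illustration.

```python
from array import array


def unwrap(buf, left, right, capacity):
    """Return the live elements of buf in logical order.

    Assumes 0 <= left < capacity and left <= right <= left + capacity.
    """
    if right <= capacity:
        # Contiguous: a memoryview slice shares memory with buf, no copy.
        return memoryview(buf)[left:right]
    # Wrapped: the two pieces have to be copied into a fresh array.
    return buf[left:capacity] + buf[:right - capacity]


buf = array("d", [10.0, 11.0, 12.0, 13.0])
contiguous = unwrap(buf, 1, 3, 4)   # slots 1 and 2, returned as a view
wrapped = unwrap(buf, 2, 5, 4)      # slots 2, 3, then slot 0, returned as a copy
buf[1] = 99.0                       # mutate the backing storage afterwards
```

The trade-off is that a view reflects later writes to the buffer while a copy does not, so callers who need a stable snapshot have to copy the contiguous result themselves.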
The cdef-ication of _fix_indices into _wraparound significantly improves runtime performance by cutting function call overhead and letting the index arithmetic run on C integers, avoiding dynamic dispatch.
Extension type properties can be declared with standard Python syntax or, as in our example, with the property is_full: block syntax. We also tweaked the behavior of shape, though this isn’t a performance optimization, so we won’t dive into it here.
Since we guarantee overwrite behavior and ensure the buffer has non-zero capacity, the logic for a full buffer in append and appendleft simplifies considerably. Beyond that, the logic stays largely the same. In fact, most other functions follow the reference implementation closely. Member attributes are now statically typed C-level attributes, but aside from that, no major changes stand out. The most compelling part of the code, though, requires a bit of reflection.
Let’s establish some invariants—properties of the ringbuffer that hold true before and after every operation throughout its life. They are:
0 <= left < capacity
0 <= left <= right
The first condition is straightforward to verify: wherever the left index is modified, _wraparound is called, keeping it in bounds. For the second, notice that in _wraparound, both left and right indices shift by the same offset (plus or minus capacity), preserving their relative order.
In append and appendleft, the right index increments at least as much as the left, and the left decrements at least as much as the right. In pop and popleft, the right index decrements or the left index increments only when len(self)—or equivalently, self._right_index - self._left_index—is at least one. Thus, our invariant holds steady.
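One way to convince ourselves is to model only the two indices and fuzz them. The sketch below is a pure-Python model written for this check, not the extension class: IndexModel and fuzz are hypothetical names, the operations follow the simplified always-overwrite logic described above, and pop deliberately skips the wraparound step.

```python
import random


class IndexModel:
    """Tracks only the two indices of an always-overwriting ring buffer."""

    def __init__(self, capacity):
        assert capacity > 0
        self.capacity = capacity
        self.left = 0
        self.right = 0

    def __len__(self):
        return self.right - self.left

    def _wraparound(self):
        # Shift both indices by the same offset so their difference is kept.
        if self.left >= self.capacity:
            self.left -= self.capacity
            self.right -= self.capacity
        elif self.left < 0:
            self.left += self.capacity
            self.right += self.capacity

    def append(self):
        if len(self) == self.capacity:
            self.left += 1      # full: drop the oldest element
        self.right += 1
        self._wraparound()

    def appendleft(self):
        if len(self) == self.capacity:
            self.right -= 1     # full: drop the newest element
        self.left -= 1
        self._wraparound()

    def pop(self):
        if len(self) == 0:
            raise IndexError("pop from an empty buffer")
        self.right -= 1         # left is untouched, so no wraparound call

    def popleft(self):
        if len(self) == 0:
            raise IndexError("pop from an empty buffer")
        self.left += 1
        self._wraparound()

    def check(self):
        assert 0 <= self.left < self.capacity
        assert self.left <= self.right <= self.left + self.capacity


def fuzz(capacity=5, steps=10_000, seed=0):
    rng = random.Random(seed)
    model = IndexModel(capacity)
    for _ in range(steps):
        op = rng.choice(["append", "appendleft", "pop", "popleft"])
        if op in ("pop", "popleft") and len(model) == 0:
            continue            # skip pops that would raise on an empty buffer
        getattr(model, op)()
        model.check()           # both invariants must hold after every step
    return model


final_model = fuzz()
```

A fuzz run is evidence rather than proof, but it is a cheap guard against a mistake in the argument above.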
The goal is to get Cython to generate efficient C code that skips index bounds and wraparound checks. To pull this off, we need to guarantee our array indexing stays within capacity and never goes negative. Thanks to our invariants, indexing with the left pointer is safe. For the right pointer, we’re good as long as we apply a modulo operation (right % capacity) to the indexer.
On top of that, the % operator follows C-level semantics when we set the compiler directive cdivision=True; otherwise it sticks to Python semantics. Under C semantics a negative first operand yields a negative remainder, so for correct behavior the first operand must be non-negative. Based on our invariants, the only right-indexing we need to worry about is in peek and pop. Since we’ve already confirmed the buffer isn’t empty, the right index is at least one, so the value we take the modulo of is non-negative and we’re in the clear.
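The difference between the two remainder conventions is easy to reproduce in plain Python. Here c_mod is a hypothetical helper that emulates C's truncating remainder; it is not part of Cython or quantpylib.

```python
def c_mod(a, b):
    """Remainder with C semantics: the quotient truncates toward zero."""
    q = abs(a) // abs(b)
    if (a < 0) != (b < 0):
        q = -q
    return a - q * b


capacity = 8

# Python's % takes the sign of the divisor, so a negative index wraps to 7.
python_result = -1 % capacity

# C's % takes the sign of the dividend, so the same expression gives -1,
# which is not a valid slot once wraparound handling is switched off.
c_result = c_mod(-1, capacity)

# For non-negative operands the two conventions agree.
agree_when_nonnegative = all(
    c_mod(r, capacity) == r % capacity for r in range(0, 3 * capacity)
)
```

This is why the non-negativity of the indexed value matters: as long as the operand is non-negative, the cheaper C remainder gives the same slot as Python's.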
The compiler directives
#cython:language_level=3, boundscheck=False, wraparound=False, cdivision=True
are indeed safe, and generate efficient indexing code.
There’s one spot where we don’t control the indexing. Imagine a ringbuffer instance, say cbuffer. A user can freely call cbuffer[:-5], which triggers the __getitem__ method with negative indexing behavior. Here, we need to tell the compiler to fall back on wraparound and safe indexing code to handle it properly.
On a side note, tweaking only the right indices doesn’t mess with the invariant property checked by _wraparound. That means we can ditch the index-fixing step in the pop operation from the reference implementation—nice little cleanup!
Once we’re done, we’ll roll out test cases to confirm our ringbuffer optimizations don’t introduce sneaky side effects and actually deliver the performance boost we’re after.
The benchmarking code can be viewed in quantpylib.benchmarks.ringbuffer of the quantpylib repository.
The operation we’ll lean on most in the next section is the append functionality. We’ll chart the time taken in nanoseconds for 1000 appends (y-axis) across 1000 trials (x-axis).
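For readers who want to reproduce this kind of measurement without the repo, a timing harness along these lines would do. It is a sketch, not the code in quantpylib.benchmarks.ringbuffer: time_appends is a hypothetical name, and collections.deque with a maxlen is used purely as a stand-in for whichever buffer exposes append.

```python
import time
from collections import deque


def time_appends(make_buffer, n_appends=1000, n_trials=1000):
    """Return elapsed nanoseconds for n_appends appends, one entry per trial."""
    timings = []
    for _ in range(n_trials):
        buf = make_buffer()                 # fresh buffer for every trial
        start = time.perf_counter_ns()
        for i in range(n_appends):
            buf.append(float(i))
        timings.append(time.perf_counter_ns() - start)
    return timings


# Stand-in buffer: a bounded deque. Swap in any object with an append method.
timings = time_appends(lambda: deque(maxlen=256), n_appends=1000, n_trials=50)
median_ns = sorted(timings)[len(timings) // 2]
```

Passing a factory instead of a buffer instance means every trial starts from an empty buffer, so trials are comparable with each other.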
Reference Implementation: numpy_ringbuffer
"""
MIT License
Copyright (c) 2016 Eric Wieser
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
"""
import numpy as np
from collections.abc import Sequence  # the old `collections.Sequence` path was removed in Python 3.10

class RingBuffer(Sequence):
    def __init__(self, capacity, dtype=float, allow_overwrite=True):
        """
        Create a new ring buffer with the given capacity and element type

        Parameters
        ----------
        capacity: int
            The maximum capacity of the ring buffer
        dtype: data-type, optional
            Desired type of buffer elements. Use a type like (float, 2) to
            produce a buffer with shape (N, 2)
        allow_overwrite: bool
            If false, throw an IndexError when trying to append to an already
            full buffer
        """
        self._arr = np.empty(capacity, dtype)
        self._left_index = 0
        self._right_index = 0
        self._capacity = capacity
        self._allow_overwrite = allow_overwrite

    def _unwrap(self):
        """ Copy the data from this buffer into unwrapped form """
        return np.concatenate((
            self._arr[self._left_index:min(self._right_index, self._capacity)],
            self._arr[:max(self._right_index - self._capacity, 0)]
        ))

    def _fix_indices(self):
        """
        Enforce our invariant that 0 <= self._left_index < self._capacity
        """
        if self._left_index >= self._capacity:
            self._left_index -= self._capacity
            self._right_index -= self._capacity
        elif self._left_index < 0:
            self._left_index += self._capacity
            self._right_index += self._capacity

    @property
    def is_full(self):
        """ True if there is no more space in the buffer """
        return len(self) == self._capacity

    # numpy compatibility
    def __array__(self):
        return self._unwrap()

    @property
    def dtype(self):
        return self._arr.dtype

    @property
    def shape(self):
        return (len(self),) + self._arr.shape[1:]

    # these mirror methods from deque
    @property
    def maxlen(self):
        return self._capacity

    def append(self, value):
        if self.is_full:
            if not self._allow_overwrite:
                raise IndexError('append to a full RingBuffer with overwrite disabled')
            elif not len(self):
                return
            else:
                self._left_index += 1

        self._arr[self._right_index % self._capacity] = value
        self._right_index += 1
        self._fix_indices()

    def appendleft(self, value):
        if self.is_full:
            if not self._allow_overwrite:
                raise IndexError('append to a full RingBuffer with overwrite disabled')
            elif not len(self):
                return
            else:
                self._right_index -= 1

        self._left_index -= 1
        self._fix_indices()
        self._arr[self._left_index] = value

    def pop(self):
        if len(self) == 0:
            raise IndexError("pop from an empty RingBuffer")
        self._right_index -= 1
        self._fix_indices()
        res = self._arr[self._right_index % self._capacity]
        return res

    def popleft(self):
        if len(self) == 0:
            raise IndexError("pop from an empty RingBuffer")
        res = self._arr[self._left_index]
        self._left_index += 1
        self._fix_indices()
        return res

    def extend(self, values):
        pass

    def extendleft(self, values):
        pass

    # implement Sequence methods
    def __len__(self):
        return self._right_index - self._left_index

    def __getitem__(self, item):
        # handle simple (b[1]) and basic (b[np.array([1, 2, 3])]) fancy indexing specially
        if not isinstance(item, tuple):
            item_arr = np.asarray(item)
            if issubclass(item_arr.dtype.type, np.integer):
                item_arr = (item_arr + self._left_index) % self._capacity
                return self._arr[item_arr]

        # for everything else, get it right at the expense of efficiency
        return self._unwrap()[item]

    def __iter__(self):
        # alarmingly, this is comparable in speed to using itertools.chain
        return iter(self._unwrap())

    # Everything else
    def __repr__(self):
        return '<RingBuffer of {!r}>'.format(np.asarray(self))