Computing a Rolling Median¶
This describes how to code up a rolling median using a SkipList in both C++ and Python.
A rolling median operation means, for a SkipList, an insert(new_value) then at(middle) then remove(old_value).
The number of operations to calculate a rolling median is the data length minus the window length.
The same approach can be used for a rolling percentile.
Rolling Median in C++¶
Here is a reasonable C++ attempt at doing that with the arguments:
data- A vector of data of typeTof lengthL.win_length- a ‘window’ size. The median is computed over this number of values.result- a destination vector for the result. This will either end up withL - win_lengthvalues.
1#include "SkipList.h"
2
3template<typename T>
4RollingMedianResult rolling_median(const std::vector<T> data,
5 size_t win_length,
6 std::vector<T> &result) {
7 if (win_length == 0) {
8 return ROLLING_MEDIAN_WIN_LENGTH;
9 }
10 OrderedStructs::SkipList::HeadNode<T> sl;
11
12 result.clear();
13 std::vector<T> buffer;
14 for (size_t i = 0; i < data.size(); ++i) {
15 sl.insert(data[i]);
16 if (i >= win_length) {
17 if (win_length % 2 == 1) {
18 result.push_back(sl.at(win_length / 2));
19 } else {
20 /* Even length so average */
21 sl.at((win_length - 1) / 2, 2, buffer);
22 assert(buffer.size() == 2);
23 result.push_back(buffer[0] / 2 + buffer[1] / 2);
24 }
25 sl.remove(data[i - win_length]);
26 }
27 }
28 return ROLLING_MEDIAN_SUCCESS;
29}
A full example is the RollingMedian::rolling_median_lower_bound function in RollingMedian.h.
If you are working with C arrays (such as Numpy arrays) then this C’ish approach might be better, again error checking omitted:
1#include "SkipList.h"
2
3template <typename T>
4void rolling_median(const T *src, size_t count, size_t win_length, T *dest) {
5
6 OrderedStructs::SkipList::HeadNode<T> sl;
7 const T *tail = src;
8
9 for (size_t i = 0; i < count; ++i) {
10 sl.insert(*src);
11 if (i + 1 >= win_length) {
12 *dest = sl.at(win_length / 2);
13 ++dest;
14 sl.remove(*tail);
15 ++tail;
16 }
17 ++src;
18 }
19}
Multidimensional Numpy arrays have a stride value which is omitted in the above code but is simple to add. See RollingMedian.h and test/test_rolling_median.cpp for further examples.
Rolling percentiles require a argument that says what fraction of the window the required value lies. Again, this is easy to add.
Even Window Length¶
The above code assumes that if the window length is even that the median is at (window length - 1) / 2.
A more plausible median for even sized window lengths is the mean of (window length - 1) / 2 and
window length / 2.
This requires that the mean of two types is meaningful which it will not be for strings. In that case you will get this compilation error:
RollingMedian.h:91:52: error: invalid operands to binary expression ('value_type' (aka 'std::string') and 'int')
result.push_back(buffer[0] / 2 + buffer[1] / 2);
~~~~~~~~~ ^ ~
One remedy for this is to use the RollingMedian::rolling_median_lower_bound function.
This always uses the lower bound so works correctly for odd sized window lengths.
For even sized window lengths this chooses the lower value rather than averaging two values.
This is useful for, say, strings that can not be averaged.
C++ Performance¶
Here is a plot of the time taken to compute a rolling median on one million values using different window sizes. The time here is in ns/result and the number of results is 1e6 - window size. Given a data length of 1m then a window length of 1000 this would mean 999,000 operations. A window length of 500,000 this would mean 500,000 operations.
A window size of 1000 and 1m values (the size of the SkipList) takes around 750 ns/value or 0.75 second in total.
The test function is perf_roll_med_by_win_size() in src/cpp/test/test_performance.cpp.
Rolling Median in Python¶
Here is an example of computing a rolling median of a numpy 1D array.
This creates an array with the same length as the input starting with window_length NaN s:
1import numpy as np
2
3import orderedstructs
4
5def simple_python_rolling_median(vector: np.ndarray,
6 window_length: int) -> np.ndarray:
7 """Computes a rolling median of a numpy vector returning a new numpy
8 vector of the same length.
9 NaNs in the input are not handled but a ValueError will be raised."""
10 if vector.ndim != 1:
11 raise ValueError(
12 f'vector must be one dimensional not shape {vector.shape}'
13 )
14 skip_list = orderedstructs.SkipList(float)
15 ret = np.empty_like(vector)
16 for i in range(len(vector)):
17 value = vector[i]
18 skip_list.insert(value)
19 if i >= window_length - 1:
20 # // 4 for lower quartile
21 # * 3 // 4 for upper quartile etc.
22 median = skip_list.at(window_length // 2)
23 skip_list.remove(vector[i - window_length + 1])
24 else:
25 median = np.nan
26 ret[i] = median
27 return ret
This can be called thus:
np_array = np.arange(10.0)
print('Original:', np_array)
result = simple_python_rolling_median(np_array, 3)
print(' Result:', result)
And the result will be:
Original: [0. 1. 2. 3. 4. 5. 6. 7. 8. 9.]
Result: [nan nan 1. 2. 3. 4. 5. 6. 7. 8.]
Of course this Python code could be made much faster by using a Python C Extension.
Python Performance¶
Here is a plot of the time taken to compute a rolling median on one million values using different window sizes. The time here is in ns/result and the number of results is 1e6 - window size. Given a data length of 1m then a window length of 1000 this would mean 999,000 operations. A window length of 500,000 this would mean 500,000 operations.
A window size of 1000 and 1m values (the size of the SkipList) takes around 750 ns/value or 0.75 second in total.
Performance Comparison of C++ and Python¶
Here is the C++ performance plotted along with the Python performance.
As expected C++ is around 2x faster.
But Python has another trick up its sleeve that can make it outperform C++ decisively; multiprocessing with shared memory.
Handling NaNs¶
Not-a-number (NaN) values can not be inserted into a Skip List as they are not
comparable to anything (including themselves).
An attempt to call insert(), index(), has(), remove() with a NaN will raise an error.
In C++ this will throw a OrderedStructs::SkipList::FailedComparison.
In Python it will raise a ValueError.
This section looks at handling NaNs in Python.
Here are several ways of handling NaNs:
Propogate the Exception.
Make the Median NaN.
Forward Filling.
Propogate the Exception¶
Here is a rolling median that will raise a ValueError if there is a NaN in the input.
1def rolling_median_no_nan(vector: typing.List[float],
2 window_length: int) -> typing.List[float]:
3 """Computes a rolling median of a vector of floats and returns the results.
4 NaNs will throw an exception."""
5 skip_list = orderedstructs.SkipList(float)
6 ret: typing.List[float] = []
7 for i in range(len(vector)):
8 value = vector[i]
9 skip_list.insert(float(value)) # This will raise a ValueError on NaN
10 if i >= window_length:
11 median = skip_list.at(window_length // 2)
12 skip_list.remove(vector[i - window_length])
13 else:
14 median = math.nan
15 ret.append(median)
16 return ret
Make the Median NaN¶
Here is a rolling median that will make the median NaN if there is a NaN in the input. Incidentally this is the approach that numpy takes.
1def rolling_median_with_nan(vector: typing.List[float],
2 window_length: int) -> typing.List[float]:
3 """Computes a rolling median of a vector of floats and returns the results.
4 NaNs will be consumed."""
5 skip_list = orderedstructs.SkipList(float)
6 ret: typing.List[float] = []
7 for i in range(len(vector)):
8 value = vector[i]
9 if math.isnan(value):
10 median = math.nan
11 else:
12 skip_list.insert(float(value))
13 if i >= window_length:
14 median = skip_list.at(window_length // 2)
15 remove_value = vector[i - window_length]
16 if not math.isnan(remove_value):
17 skip_list.remove(remove_value)
18 else:
19 median = math.nan
20 ret.append(median)
21 return ret
The first row is the input, the second the output. Window length is 5:
[0.0, math.nan, 2.0, 3.0, 4.0, 5.0, 6.0, math.nan, 8.0, 9.0],
[math.nan, math.nan, math.nan, math.nan, math.nan, 3.0, 4.0, math.nan, 4.0, 5.0],
Forward Filling¶
Another approach is to replace the NaN with the previous value. This is very popular in FinTech and is commonly know as Forward Filling. Here is an implementation:
1def forward_fill(vector: typing.List[float]) -> None:
2 """Forward fills NaNs in-place."""
3 previous_value = math.nan
4 for i in range(len(vector)):
5 value = vector[i]
6 if math.isnan(value):
7 vector[i] = previous_value
8 if not math.isnan(value):
9 previous_value = value
10
11def rolling_median_with_nan_forward_fill(vector: typing.List[float],
12 window_length: int) -> typing.List[float]:
13 """Computes a rolling median of a vector of floats and returns the results.
14 NaNs will be forward filled."""
15 forward_fill(vector)
16 return rolling_median_no_nan(vector, window_length)
The first row is the input, the second the output. Window length is 5:
[0.0, math.nan, 2.0, 3.0, 4.0, 5.0, 6.0, math.nan, 8.0, 9.0],
[math.nan, math.nan, math.nan, math.nan, math.nan, 2.0, 3.0, 4.0, 5.0, 6.0],
Another example where [3] is now a NaN, the first row is the input, the second the output. Window length is 5:
[0.0, math.nan, 2.0, math.nan, 4.0, 5.0, 6.0, math.nan, 8.0, 9.0],
[math.nan, math.nan, math.nan, math.nan, math.nan, 2.0, 2.0, 4.0, 5.0, 6.0],
There is no ‘right way’ to handle NaNs. They are always problematic. For example what is the ‘right way’ to sort a sequence of values that may include NaNs?