Dynamic Point Aggregation

1. Design Summary

Aggregating \(N\) point values into \(O(N)\) disjoint intervals has a runtime lower bound of \(\Omega(N \log N)\) because the output intervals must be ordered. The request handler accepts put requests, which add new values, and get requests, which build the current interval sequence.
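As a concrete illustration of what the aggregation produces, the sketch below collapses a handful of integer points into disjoint intervals of consecutive values. The function name `aggregate_points` is hypothetical and not part of the library; it only mirrors the sort-then-sweep idea described in this document.

```python
def aggregate_points(points):
    """Collapse integer points into sorted, disjoint [low, high] intervals."""
    intervals = []
    for p in sorted(set(points)):  # sorting is the O(N log N) step
        if intervals and p == intervals[-1][1] + 1:
            intervals[-1][1] = p      # extend the current run of consecutive values
        else:
            intervals.append([p, p])  # start a new interval
    return intervals


print(aggregate_points([1, 3, 7, 2, 6]))  # [[1, 3], [6, 7]]
```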

Because offline scenarios (batch data) have known usage patterns, the batch variant (V1) can optimize the hot path by keeping the frequent put requests at \(O(1)\) and moving the work into the infrequent get requests at \(O(N \log N)\).

However, because online scenarios (streaming data) do not have known usage patterns, the streaming variant (V2) must instead balance complexity between put requests (\(O(\log N)\)) and get requests (\(O(N)\)).

📊 Complexity Analysis

| Operation | Batch Variant (V1) | Streaming Variant (V2) | Comparison |
|---|---|---|---|
| Initial Setup | \(O(1)\) | \(O(1)\) | Equivalent |
| Batch Updates (N Put, 1 Get) | \(O(N \log N)\) | \(O(N \log N)\) | Equivalent |
| Streaming Updates (N Put, K Get) | \(O(K*N \log N)\) | \(O(N \log N + K * N)\) | V2 is Superior |
| Space Complexity | \(O(N)\) | \(O(N)\) | Equivalent |
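
To see why the streaming row favors V2, the two bounds can be divided directly. This is a back-of-the-envelope comparison that ignores constant factors:

\[
\frac{K N \log N}{N \log N + K N} = \frac{K \log N}{\log N + K}
\]

For \(K = 1\) the ratio is \(\log N / (\log N + 1)\), which is below 1 and approaches 1 as \(N\) grows, matching the equivalent batch row. For \(K = \log N\) the ratio is \((\log N)/2\), and for \(K \gg \log N\) it approaches \(\log N\), so the advantage of V2 grows with both the number of get requests and the data size.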

2. Design Details

The batch variant (V1) uses an unordered set to add new values as data points in \(O(1)\) per put request. On each get request it sorts the points into a new list and makes one pass over that list with a sweep-line method to generate disjoint intervals in \(O(N \log N)\).

The streaming variant (V2) uses two sorted sets, one for lower bounds and one for upper bounds, and applies binary search to add new values as interval bounds in \(O(\log N)\) per put request (or \(O(\sqrt N)\) with Python's sortedcontainers). On each get request it makes one pass over the existing sorted sets with a sweep-line method to generate disjoint intervals in \(O(N)\).
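
The two-sorted-sets idea can be sketched with the standard library alone. The class below is a hypothetical stand-in, not the library's implementation: it replaces the sorted sets with plain Python lists and `bisect`, so the search is \(O(\log N)\) but list insertion and deletion are \(O(N)\). The names `BoundsSketch`, `put`, and `get` are assumptions made for illustration.

```python
from bisect import bisect_right


class BoundsSketch:
    """Hypothetical sketch: parallel sorted lists of lower and upper bounds."""

    def __init__(self):
        self.lows = []   # sorted lower bounds
        self.highs = []  # sorted upper bounds, same length as lows

    def put(self, value):
        i = bisect_right(self.lows, value) - 1  # last interval with low <= value
        if i >= 0 and self.highs[i] >= value:
            return  # value is already covered by interval i
        joins_prev = i >= 0 and self.highs[i] == value - 1
        joins_next = i + 1 < len(self.lows) and self.lows[i + 1] == value + 1
        if joins_prev and joins_next:
            # value bridges two intervals: drop the two inner bounds
            del self.highs[i]
            del self.lows[i + 1]
        elif joins_prev:
            self.highs[i] = value      # extend previous interval upward
        elif joins_next:
            self.lows[i + 1] = value   # extend next interval downward
        else:
            self.lows.insert(i + 1, value)   # new single-point interval
            self.highs.insert(i + 1, value)

    def get(self):
        # Bounds are already sorted, so one linear pass is enough.
        return [list(pair) for pair in zip(self.lows, self.highs)]


sketch = BoundsSketch()
for v in [1, 3, 7, 2, 6]:
    sketch.put(v)
print(sketch.get())  # [[1, 3], [6, 7]]
```

Because the bounds stay sorted after every put, a get request never needs to sort, which is the trade-off this variant makes.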

3. Design Trade-offs

The Efficiency Paradox

Although the batch variant (V1) was 2.7x faster in small-scale offline scenarios (N=100, R=0.1), the streaming variant (V2) was 6.4x faster in large-scale online scenarios (N=50000, R=0.5). This performance gain in online scenarios was due to the runtime complexity of the streaming variant at \(O(N \log N + K*N)\) increasing more slowly with data size compared to the batch variant at \(O(K*N \log N)\).

With Python's sortedcontainers, these runtime complexities were instead \(O(N \sqrt N + K*N)\) for the streaming variant and \(O(K*N \log N)\) for the batch variant. Although the streaming variant was still faster, the performance gain was less dramatic because \(\sqrt N\) grows faster than \(\log N\).
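
To get a feel for the gap between the two per-put growth rates, the snippet below prints \(\log_2 N\) and \(\sqrt N\) for a few of the benchmarked data sizes. It is only an illustration of the asymptotic terms, with the helper name `growth_factors` chosen here for the example; real timings also depend on constant factors that this ignores.

```python
import math


def growth_factors(n):
    """Return (log2(n), sqrt(n)) rounded to one decimal place."""
    return round(math.log2(n), 1), round(math.sqrt(n), 1)


for n in (100, 1_000, 10_000, 50_000):
    log_term, sqrt_term = growth_factors(n)
    print(f"N={n}: log2(N)={log_term}, sqrt(N)={sqrt_term}")
```

At N=50000 the logarithmic term is roughly 15.6 while the square-root term is roughly 223.6.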

Streaming Benchmark Results

When the request traffic was mostly put requests (R=0.1) to simulate offline scenarios, the batch variant (V1) was 2.7x faster when the data size was smaller (N=100) and the streaming variant (V2) was 5.0x faster when the data size was larger (N=50000).

Figure: Speedup by Data Size (R=0.1)

When the request traffic was balanced between put and get requests (R=0.5) to simulate online scenarios, the batch variant (V1) was 1.3x faster when the data size was smaller (N=100) and the streaming variant (V2) was 6.4x faster when the data size was larger (N=50000).

Figure: Speedup by Data Size (R=0.5)

These results highlight that the batch variant (V1) is suitable for smaller data streams and offline scenarios, whereas the streaming variant (V2) is suitable for larger data streams and online scenarios.

| TestCaseID | DataSize | RequestDistribution | V1TimeElapsed | V2TimeElapsed | SpeedupV1V2 | SpeedupV2V1 |
|---|---|---|---|---|---|---|
| 0 | 100 | 0.100 | 0.000 | 0.001 | 2.706 | 0.370 |
| 1 | 100 | 0.200 | 0.000 | 0.001 | 1.998 | 0.500 |
| 2 | 100 | 0.500 | 0.001 | 0.001 | 1.289 | 0.776 |
| 3 | 100 | 0.800 | 0.002 | 0.003 | 1.042 | 0.960 |
| 4 | 200 | 0.100 | 0.001 | 0.001 | 2.115 | 0.473 |
| 5 | 200 | 0.200 | 0.001 | 0.001 | 1.642 | 0.609 |
| 6 | 200 | 0.500 | 0.003 | 0.003 | 1.010 | 0.990 |
| 7 | 200 | 0.800 | 0.013 | 0.010 | 0.821 | 1.218 |
| 8 | 500 | 0.100 | 0.002 | 0.003 | 1.606 | 0.623 |
| 9 | 500 | 0.200 | 0.004 | 0.005 | 1.116 | 0.896 |
| 10 | 500 | 0.500 | 0.020 | 0.016 | 0.816 | 1.226 |
| 11 | 500 | 0.800 | 0.071 | 0.051 | 0.726 | 1.378 |
| 12 | 1000 | 0.100 | 0.008 | 0.009 | 1.059 | 0.944 |
| 13 | 1000 | 0.200 | 0.016 | 0.014 | 0.861 | 1.162 |
| 14 | 1000 | 0.500 | 0.070 | 0.050 | 0.715 | 1.400 |
| 15 | 1000 | 0.800 | 0.273 | 0.186 | 0.680 | 1.471 |
| 16 | 2000 | 0.100 | 0.028 | 0.025 | 0.920 | 1.087 |
| 17 | 2000 | 0.200 | 0.059 | 0.046 | 0.776 | 1.289 |
| 18 | 2000 | 0.500 | 0.236 | 0.159 | 0.671 | 1.489 |
| 19 | 2000 | 0.800 | 1.002 | 0.651 | 0.650 | 1.538 |
| 20 | 5000 | 0.100 | 0.140 | 0.105 | 0.751 | 1.331 |
| 21 | 5000 | 0.200 | 0.326 | 0.216 | 0.663 | 1.509 |
| 22 | 5000 | 0.500 | 1.251 | 0.735 | 0.587 | 1.703 |
| 23 | 5000 | 0.800 | 5.099 | 3.020 | 0.592 | 1.688 |
| 24 | 10000 | 0.100 | 0.466 | 0.266 | 0.572 | 1.748 |
| 25 | 10000 | 0.200 | 1.031 | 0.541 | 0.525 | 1.906 |
| 26 | 10000 | 0.500 | 4.038 | 1.950 | 0.483 | 2.071 |
| 27 | 10000 | 0.800 | 15.824 | 7.414 | 0.469 | 2.134 |
| 28 | 20000 | 0.100 | 1.198 | 0.504 | 0.421 | 2.376 |
| 29 | 20000 | 0.200 | 2.633 | 0.990 | 0.376 | 2.660 |
| 30 | 20000 | 0.500 | 10.557 | 3.617 | 0.343 | 2.919 |
| 31 | 20000 | 0.800 | 42.168 | 14.237 | 0.338 | 2.962 |
| 32 | 50000 | 0.100 | 3.473 | 0.695 | 0.200 | 4.996 |
| 33 | 50000 | 0.200 | 7.765 | 1.349 | 0.174 | 5.757 |
| 34 | 50000 | 0.500 | 30.483 | 4.756 | 0.156 | 6.410 |
| 35 | 50000 | 0.800 | 122.369 | 19.013 | 0.155 | 6.436 |
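
The two speedup columns appear to be simple ratios of the elapsed times. That relationship is inferred from the rows above rather than stated by the benchmark code, and the helper name `speedups` is hypothetical. A minimal sketch under that assumption:

```python
def speedups(v1_time, v2_time):
    """Return (SpeedupV1V2, SpeedupV2V1) under the inferred ratio definition.

    Assumption: SpeedupV1V2 = V2TimeElapsed / V1TimeElapsed (above 1 when V1 wins)
    and SpeedupV2V1 = V1TimeElapsed / V2TimeElapsed (above 1 when V2 wins).
    """
    return v2_time / v1_time, v1_time / v2_time


# Test case 34: DataSize 50000, RequestDistribution 0.500
v1v2, v2v1 = speedups(30.483, 4.756)
print(f"SpeedupV1V2={v1v2:.3f}, SpeedupV2V1={v2v1:.3f}")
```

Because the table shows times rounded to three decimals, recomputed ratios can differ from the listed speedups in the last digit, and rows with very small times (such as N=100) cannot be reproduced from the rounded values at all.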

4. Design Optimizations

State Transitions

To ensure that each new point value is merged correctly with the existing disjoint intervals, I applied correctness by design and handled the three overlap cases separately: the enclosed-by case (early exit if the lower and upper bounds at the insertion point already cover the value), the adjacent-to case (join with one or two neighbors if the lower or upper bounds at the insertion point are adjacent to the value), and the distinct-from case (insert the value as a new lower bound and a new upper bound).
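
The three cases can be told apart with a single binary search over the lower bounds. The function below is an illustrative sketch using plain sorted lists; `classify`, `lows`, and `highs` are hypothetical names, and the function only labels the case without modifying any state.

```python
from bisect import bisect_right


def classify(value, lows, highs):
    """Label how `value` relates to sorted, disjoint intervals zip(lows, highs)."""
    i = bisect_right(lows, value) - 1  # last interval whose lower bound <= value
    if i >= 0 and highs[i] >= value:
        return "enclosed-by"           # already inside interval i
    joins_prev = i >= 0 and highs[i] == value - 1
    joins_next = i + 1 < len(lows) and lows[i + 1] == value + 1
    if joins_prev or joins_next:
        return "adjacent-to"           # extends or bridges a neighbor
    return "distinct-from"             # becomes its own interval


lows, highs = [1, 6], [3, 7]           # intervals [1, 3] and [6, 7]
for v in (2, 4, 5, 10):
    print(v, classify(v, lows, highs))
# 2 enclosed-by
# 4 adjacent-to
# 5 adjacent-to
# 10 distinct-from
```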

5. API Reference

data_streaming_accelerators.core.dynamic_point_aggregation

Classes

DynamicPointAggregationV1

Bases: DynamicPointAggregationBase

Source code in src/data_streaming_accelerators/core/dynamic_point_aggregation.py
class DynamicPointAggregationV1(DynamicPointAggregationBase):

    def __init__(self):
        """Make new object for dynamic point aggregation api.
        Takes runtime O(1) and memory O(1)."""
        self._points = set()

    @validators.validate_args([None, check_point])
    def addNum(self, value: int) -> None:
        """Entry point for `put` api.
        Add new value to stream history if not already added.
        Takes runtime O(1) and memory O(1)."""
        self._points.add(value)  # ignores duplicates

    @validators.validate_args([None])
    def getIntervals(self) -> list[list[int]]:
        """Make interval sequence from stream history.
        Takes runtime O(N Log N) and memory O(N)."""
        if not self._points: return []  # skip if empty
        intervals = []; sorted_points = sorted(self._points)
        a = b = 0; point_a = point_b = sorted_points[0]
        for c in range(1, len(sorted_points)):
            point_c = sorted_points[c]
            if point_c == point_b+1:
                b = c; point_b = point_c; continue
            intervals.append([point_a, point_b])
            a = b = c; point_a = point_b = point_c
        intervals.append([point_a, point_b])
        return intervals
Functions
__init__()

Make new object for dynamic point aggregation api. Takes runtime O(1) and memory O(1).

Source code in src/data_streaming_accelerators/core/dynamic_point_aggregation.py
def __init__(self):
    """Make new object for dynamic point aggregation api.
    Takes runtime O(1) and memory O(1)."""
    self._points = set()
addNum(value)

Entry point for put api. Add new value to stream history if not already added. Takes runtime O(1) and memory O(1).

Source code in src/data_streaming_accelerators/core/dynamic_point_aggregation.py
@validators.validate_args([None, check_point])
def addNum(self, value: int) -> None:
    """Entry point for `put` api.
    Add new value to stream history if not already added.
    Takes runtime O(1) and memory O(1)."""
    self._points.add(value)  # ignores duplicates
getIntervals()

Make interval sequence from stream history. Takes runtime O(N Log N) and memory O(N).

Source code in src/data_streaming_accelerators/core/dynamic_point_aggregation.py
@validators.validate_args([None])
def getIntervals(self) -> list[list[int]]:
    """Make interval sequence from stream history.
    Takes runtime O(N Log N) and memory O(N)."""
    if not self._points: return []  # skip if empty
    intervals = []; sorted_points = sorted(self._points)
    a = b = 0; point_a = point_b = sorted_points[0]
    for c in range(1, len(sorted_points)):
        point_c = sorted_points[c]
        if point_c == point_b+1:
            b = c; point_b = point_c; continue
        intervals.append([point_a, point_b])
        a = b = c; point_a = point_b = point_c
    intervals.append([point_a, point_b])
    return intervals

DynamicPointAggregationV2

Bases: DynamicPointAggregationBase

Source code in src/data_streaming_accelerators/core/dynamic_point_aggregation.py
class DynamicPointAggregationV2(DynamicPointAggregationBase):

    def __init__(self):
        """Make new object for dynamic point aggregation api.
        Takes runtime O(1) and memory O(1)."""
        self._lower_bounds = SortedSet()
        self._upper_bounds = SortedSet()
        self._bounds_total = 0

    @validators.validate_args([None, check_point])
    def addNum(self, value: int) -> None:
        """Add new value to stream history if not already added.
        Takes runtime O(Sqrt N) and memory O(1)."""
        _lower_bounds, _upper_bounds = self._lower_bounds, self._upper_bounds
        _bounds_total = self._bounds_total

        # 1. Find the lower bounds in sorted set using binary search.
        # Takes runtime O(Log N) and memory O(1).
        ip_lower_bounds = -1
        if (_ip := _lower_bounds.bisect_right(value)) != 0:
            ip_lower_bounds = _ip-1
            if _lower_bounds[ip_lower_bounds] == value: return

        # 2. Find the upper bounds in sorted set at nearby indices.
        # In theory, takes runtime O(1) and memory O(1).
        # With sortedcontainers indexing, takes runtime O(Log N) and memory O(1).
        ip_upper_bounds = _bounds_total
        if (_ip := ip_lower_bounds) != -1 \
        and _upper_bounds[_ip] >= value:
            ip_upper_bounds = _ip
            if _upper_bounds[ip_upper_bounds] == value: return
        elif (_ip := ip_lower_bounds+1) != _bounds_total \
        and _upper_bounds[_ip] >= value:
            ip_upper_bounds = _ip
            if _upper_bounds[ip_upper_bounds] == value: return
        if ip_lower_bounds == ip_upper_bounds: return

        # 3. Update sorted set to merge adjacent intervals.
        # In theory, takes runtime O(Log N) and memory O(1).
        # With sortedcontainers add/remove, takes runtime O(Sqrt N) and memory O(1).
        is_adjacent_to_next = (
            (0 <= ip_lower_bounds+1 <= _bounds_total-1
             and _lower_bounds[ip_lower_bounds+1] == value+1)
        )
        is_adjacent_to_prev = (
            (0 <= ip_upper_bounds-1 <= _bounds_total-1
             and _upper_bounds[ip_upper_bounds-1] == value-1)
        )
        if is_adjacent_to_next and is_adjacent_to_prev:
            self._lower_bounds.remove(value+1)
            self._upper_bounds.remove(value-1)
            self._bounds_total -= 1
        elif is_adjacent_to_next:
            self._lower_bounds.remove(value+1)
            self._lower_bounds.add(value)
        elif is_adjacent_to_prev:
            self._upper_bounds.remove(value-1)
            self._upper_bounds.add(value)
        else:
            self._lower_bounds.add(value)
            self._upper_bounds.add(value)
            self._bounds_total += 1

    @validators.validate_args([None])
    def getIntervals(self) -> list[list[int]]:
        """Make interval sequence from stream history.
        Takes runtime O(N) and memory O(N)."""
        return [
            list(pair) for pair in zip(self._lower_bounds, self._upper_bounds)
        ]
Functions
__init__()

Make new object for dynamic point aggregation api. Takes runtime O(1) and memory O(1).

Source code in src/data_streaming_accelerators/core/dynamic_point_aggregation.py
def __init__(self):
    """Make new object for dynamic point aggregation api.
    Takes runtime O(1) and memory O(1)."""
    self._lower_bounds = SortedSet()
    self._upper_bounds = SortedSet()
    self._bounds_total = 0
addNum(value)

Add new value to stream history if not already added. Takes runtime O(Sqrt N) and memory O(1).

Source code in src/data_streaming_accelerators/core/dynamic_point_aggregation.py
@validators.validate_args([None, check_point])
def addNum(self, value: int) -> None:
    """Add new value to stream history if not already added.
    Takes runtime O(Sqrt N) and memory O(1)."""
    _lower_bounds, _upper_bounds = self._lower_bounds, self._upper_bounds
    _bounds_total = self._bounds_total

    # 1. Find the lower bounds in sorted set using binary search.
    # Takes runtime O(Log N) and memory O(1).
    ip_lower_bounds = -1
    if (_ip := _lower_bounds.bisect_right(value)) != 0:
        ip_lower_bounds = _ip-1
        if _lower_bounds[ip_lower_bounds] == value: return

    # 2. Find the upper bounds in sorted set at nearby indices.
    # In theory, takes runtime O(1) and memory O(1).
    # With sortedcontainers indexing, takes runtime O(Log N) and memory O(1).
    ip_upper_bounds = _bounds_total
    if (_ip := ip_lower_bounds) != -1 \
    and _upper_bounds[_ip] >= value:
        ip_upper_bounds = _ip
        if _upper_bounds[ip_upper_bounds] == value: return
    elif (_ip := ip_lower_bounds+1) != _bounds_total \
    and _upper_bounds[_ip] >= value:
        ip_upper_bounds = _ip
        if _upper_bounds[ip_upper_bounds] == value: return
    if ip_lower_bounds == ip_upper_bounds: return

    # 3. Update sorted set to merge adjacent intervals.
    # In theory, takes runtime O(Log N) and memory O(1).
    # With sortedcontainers add/remove, takes runtime O(Sqrt N) and memory O(1).
    is_adjacent_to_next = (
        (0 <= ip_lower_bounds+1 <= _bounds_total-1
         and _lower_bounds[ip_lower_bounds+1] == value+1)
    )
    is_adjacent_to_prev = (
        (0 <= ip_upper_bounds-1 <= _bounds_total-1
         and _upper_bounds[ip_upper_bounds-1] == value-1)
    )
    if is_adjacent_to_next and is_adjacent_to_prev:
        self._lower_bounds.remove(value+1)
        self._upper_bounds.remove(value-1)
        self._bounds_total -= 1
    elif is_adjacent_to_next:
        self._lower_bounds.remove(value+1)
        self._lower_bounds.add(value)
    elif is_adjacent_to_prev:
        self._upper_bounds.remove(value-1)
        self._upper_bounds.add(value)
    else:
        self._lower_bounds.add(value)
        self._upper_bounds.add(value)
        self._bounds_total += 1
getIntervals()

Make interval sequence from stream history. Takes runtime O(N) and memory O(N).

Source code in src/data_streaming_accelerators/core/dynamic_point_aggregation.py
@validators.validate_args([None])
def getIntervals(self) -> list[list[int]]:
    """Make interval sequence from stream history.
    Takes runtime O(N) and memory O(N)."""
    return [
        list(pair) for pair in zip(self._lower_bounds, self._upper_bounds)
    ]