
Dynamic Interval Management


1. Design Summary

The theoretical lower bound for merging \(N\) static intervals is \(\Omega(N \log N)\) due to the sorting requirement. By working on interval edges rather than on the source intervals themselves, the batch algorithms stay both efficient and compact. In online streaming environments, however, the task shifts from batch data loading (static intervals) to streaming data ingestion (dynamic intervals).

[Figure: Graph for Performance Report]

📊 Complexity Analysis

| Operation | Batch Variants (v1/v2) | Streaming Variant (v3) | Comparison |
| --- | --- | --- | --- |
| Initial Setup | \(O(N \log N)\) | \(O(N \log N)\) | Equivalent |
| Batch Updates (N) | \(O(N \log N)\) | \(O(N \log N)\) | Equivalent |
| Streaming Updates (1) | \(O(N \log N)\) | \(O(\log N)\) | Streaming Variant is superior |
| Space Complexity | \(O(N)\) | \(O(N)\) | Equivalent |

🚀 The Pruning Logic (Visual)

The advantage of an Augmented AVL Interval Tree is its ability to prune search branches that cannot contain an overlap, ensuring \(O(\log N)\) search time.

graph TD
    Start(["Search for Overlap with Query  [7, 9]"]) --> Root

    Root{"Node:  [5, 8]<br/>Subtree Max: 20"} -- "Overlap Check" --> Match
    Root -- "Check Left Subtree" --> LeftCheck{"Left Subtree Max >= 7?"}

    Match{"Overlap Found!<br/>5 <= 9 AND 7 <= 8"}

    LeftCheck -- "No (e.g., Max=6)" --> PruneLeft["<b>PRUNE:</b> Skip Left Branch"]
    LeftCheck -- "Yes" --> SearchLeft[Recurse Left]

    Root -- "Check Right Subtree" --> RightCheck{"Node Lo <= 9?"}
    RightCheck -- "No (e.g., Lo=15)" --> PruneRight["<b>PRUNE:</b> Skip Right Branch"]
    RightCheck -- "Yes" --> SearchRight[Recurse Right]

    style PruneLeft fill:#ffcccc,stroke:#333
    style PruneRight fill:#ffcccc,stroke:#333
    style Match fill:#ccffcc,stroke:#333
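
The pruning rule in the diagram can be sketched in a few lines of Python. This is a minimal illustration rather than the library's implementation: `Node`, `build`, and `find_overlap` are hypothetical names, the tree is built once from input already sorted by `lo` instead of being self-balanced, and the right-branch test follows the diagram (`node.lo <= query_hi`).

```python
from dataclasses import dataclass
from typing import List, Optional, Tuple


@dataclass
class Node:
    lo: int
    hi: int
    left: Optional["Node"] = None
    right: Optional["Node"] = None
    subtree_max: int = 0  # largest `hi` stored anywhere in this subtree


def build(sorted_intervals: List[Tuple[int, int]]) -> Optional[Node]:
    """Build a balanced tree keyed on `lo` from intervals sorted by `lo`."""
    if not sorted_intervals:
        return None
    mid = len(sorted_intervals) // 2
    lo, hi = sorted_intervals[mid]
    node = Node(lo, hi, build(sorted_intervals[:mid]), build(sorted_intervals[mid + 1:]))
    node.subtree_max = max(
        hi,
        node.left.subtree_max if node.left else hi,
        node.right.subtree_max if node.right else hi,
    )
    return node


def find_overlap(node: Optional[Node], query_lo: int, query_hi: int) -> Optional[Node]:
    """Return one node overlapping the closed interval [query_lo, query_hi], or None."""
    while node is not None:
        # Overlap check: the two closed intervals intersect.
        if node.lo <= query_hi and query_lo <= node.hi:
            return node
        if node.left is not None and node.left.subtree_max >= query_lo:
            # Some interval on the left ends at or after query_lo. If the left
            # subtree still holds no overlap, the right subtree cannot either.
            node = node.left
        elif node.lo <= query_hi:
            # Left branch pruned; the right branch may still start early enough.
            node = node.right
        else:
            # Every node on the right starts at or after node.lo > query_hi.
            return None
    return None


root = build([(1, 2), (5, 8), (15, 20)])
match = find_overlap(root, 7, 9)
print((match.lo, match.hi) if match else None)
```

Each loop iteration moves one level down, so the search visits at most one node per level of the tree.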

2. Design Approaches

Optimal Approaches for Batch Data (Static Intervals)

  • Sweep-line (Variant 1): Uses a sorted list of edge objects and a counter.
    • Complexity (Batch): \(O(N \log N)\) setup, \(O(N)\) calculation.
    • Complexity (Streaming): \(O(N \log N)\) per insert/put operation.
    • Verdict: Suitable for the batch case, with the second-best benchmark (~16 ms) on static intervals; the calculation is slowed by object overhead. Not suitable for the streaming case because of the high per-insert complexity.
  • Two-Pointer Search (Variant 2): Uses two sorted edge arrays and two pointers.
    • Complexity (Batch): \(O(N \log N)\) setup, \(O(N)\) calculation.
    • Complexity (Streaming): \(O(N \log N)\) per insert/put operation.
    • Verdict: Suitable for the batch case, with the best-in-class benchmark (~7 ms) on static intervals. Not suitable for the streaming case because of the high per-insert complexity.
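
To see where the \(O(N \log N)\) per-insert cost of the batch variants comes from, consider a minimal sketch of a stream served by a batch merge. The names `merge_batch` and `RemergingStream` are hypothetical, and a plain sort-and-scan merge stands in for Variants 1 and 2; the point is only that every `put` sorts and rescans everything seen so far.

```python
from typing import List, Tuple

Interval = Tuple[int, int]


def merge_batch(intervals: List[Interval]) -> List[Interval]:
    """Sort-and-scan merge of closed intervals: O(N log N) for N items."""
    merged: List[Interval] = []
    for lo, hi in sorted(intervals):
        if merged and lo <= merged[-1][1]:
            # Overlaps (or touches) the last merged interval: extend it.
            merged[-1] = (merged[-1][0], max(merged[-1][1], hi))
        else:
            merged.append((lo, hi))
    return merged


class RemergingStream:
    """Keeps every raw interval and re-runs the batch merge on each put."""

    def __init__(self) -> None:
        self._seen: List[Interval] = []

    def put(self, lo: int, hi: int) -> List[Interval]:
        self._seen.append((lo, hi))
        return merge_batch(self._seen)  # full re-sort on every update


stream = RemergingStream()
for lo, hi in [(1, 3), (8, 10), (2, 9)]:
    print(stream.put(lo, hi))
```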

Optimal Approaches for Streaming Data (Dynamic Intervals)

  • Augmented AVL Interval Tree (Variant 3): A stateful, self-balancing BST.
    • Complexity (Batch): \(O(N \log N)\) setup, \(O(N)\) calculation.
    • Complexity (Streaming): \(O(\log N)\) per insert/put operation.
    • Verdict: Not suitable for the batch case, with a poor benchmark (~4130 ms) on static intervals caused by object overhead during calculation. Suitable for the streaming case due to best-in-class complexity per insert/put operation. Note that the batch approaches are also easier to maintain, with 90% fewer LOC (~25 vs ~250).
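
The idea of a stateful structure that keeps the merged state live can be sketched without the full tree. The example below is a stand-in, not the AVL implementation: it keeps the disjoint merged intervals in two sorted Python lists and locates the overlapping run with `bisect`. The class name `MergedIntervals` and its methods are hypothetical, and because list slice assignment shifts elements, a `put` here costs \(O(N)\) in the worst case rather than \(O(\log N)\).

```python
import bisect
from typing import List, Tuple


class MergedIntervals:
    """Maintains disjoint, sorted, closed intervals across successive puts."""

    def __init__(self) -> None:
        self._los: List[int] = []  # start of each merged interval, ascending
        self._his: List[int] = []  # end of each merged interval, ascending

    def put(self, lo: int, hi: int) -> None:
        # First stored interval that ends at or after `lo`.
        i = bisect.bisect_left(self._his, lo)
        # One past the last stored interval that starts at or before `hi`.
        j = bisect.bisect_right(self._los, hi)
        if i < j:
            # Intervals i..j-1 overlap the new one: absorb their extent.
            lo = min(lo, self._los[i])
            hi = max(hi, self._his[j - 1])
        # Replace the overlapping run (possibly empty) with the merged interval.
        self._los[i:j] = [lo]
        self._his[i:j] = [hi]

    def snapshot(self) -> List[Tuple[int, int]]:
        return list(zip(self._los, self._his))


state = MergedIntervals()
for lo, hi in [(1, 3), (8, 10), (2, 9)]:
    state.put(lo, hi)
    print(state.snapshot())
```

The find-overlaps, absorb, reinsert sequence mirrors the steps of `interval_insert` in the API reference, with the binary searches playing the role of the pruned tree search.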

3. Design Trade-offs

The Efficiency Paradox

In benchmarks with static intervals, the Two-Pointer Search (Variant 2) was several hundred times faster than the Interval Tree (Variant 3): ~7 ms versus ~4130 ms. In an online streaming environment, however, the Two-Pointer Search must re-merge after every update, so its total cost over a series of \(N\) updates degrades to \(O(N^2 \log N)\), while the Interval Tree's total cost stays at \(O(N \log N)\).
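
Both totals follow from summing the per-update costs in the complexity table. As a worked sketch, assume the \(k\)-th update costs \(c\,k \log k\) for a batch variant that re-merges all \(k\) intervals and \(c' \log k\) for the tree; the constants \(c\) and \(c'\) are placeholders, not measured values.

```latex
\[
T_{\text{batch}}(N) = \sum_{k=1}^{N} c\,k \log k \;\le\; c\,N^{2} \log N,
\qquad
T_{\text{batch}}(N) \;\ge\; \sum_{k=\lceil N/2 \rceil}^{N} c\,k \log k
\;\ge\; c\,\frac{N}{2} \cdot \frac{N}{2} \log \frac{N}{2},
\]
\[
T_{\text{tree}}(N) = \sum_{k=1}^{N} c' \log k = c' \log (N!) = \Theta(N \log N).
\]
```

The upper and lower bounds on the batch total agree up to constants, so it is \(\Theta(N^2 \log N)\), a factor of about \(N\) worse than the tree.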

Streaming Benchmark Results

In benchmarks with dynamic intervals up to \(N=3000\), the Interval Tree (Variant 3) was as much as 50x faster than the Two-Pointer Search (Variant 2). The Two-Pointer Search is faster for a single batch run, but it fails to scale in an online streaming environment where the merged state must be updated for every new interval. As \(N\) increases, the Two-Pointer Search runtime grows roughly quadratically, while the Interval Tree runtime grows close to linearly.

| Input Size (N) | Batch-Style (s) | Tree-Style (s) | Speedup |
| --- | --- | --- | --- |
| 100 | 0.0015 | 0.0021 | 0.7x |
| 500 | 0.0359 | 0.0140 | 2.6x |
| 1000 | 0.1501 | 0.0230 | 6.5x |
| 2000 | 0.6743 | 0.0383 | 17.6x |
| 3000 | 1.6093 | 0.0318 | 50.6x |

4. Design Optimizations

Augmented Metadata

By augmenting the BST with subtree_max metadata, I reduced the search-and-merge task from a linear scan to a logarithmic search. This allows the streaming platform to maintain a "Live" merged state with \(O(\log N)\) latency per request.
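
As a minimal sketch of how such metadata can be kept current, the example below refreshes `subtree_max` on the way back up from an insertion. It assumes a plain, unbalanced BST keyed on `lo` with no merging; `Node`, `refresh`, and `insert` are hypothetical names used only for illustration.

```python
from typing import Optional


class Node:
    __slots__ = ("lo", "hi", "left", "right", "subtree_max")

    def __init__(self, lo: int, hi: int) -> None:
        self.lo, self.hi = lo, hi
        self.left: Optional["Node"] = None
        self.right: Optional["Node"] = None
        self.subtree_max = hi  # a leaf's subtree maximum is its own `hi`


def refresh(node: Node) -> None:
    """Recompute subtree_max from the node and its children in O(1)."""
    best = node.hi
    if node.left is not None:
        best = max(best, node.left.subtree_max)
    if node.right is not None:
        best = max(best, node.right.subtree_max)
    node.subtree_max = best


def insert(root: Optional[Node], lo: int, hi: int) -> Node:
    """Insert by `lo`, refreshing metadata on every node along the path."""
    if root is None:
        return Node(lo, hi)
    if lo < root.lo:
        root.left = insert(root.left, lo, hi)
    else:
        root.right = insert(root.right, lo, hi)
    refresh(root)  # children are already up to date at this point
    return root


root = None
for lo, hi in [(5, 8), (1, 20), (9, 12)]:
    root = insert(root, lo, hi)
print(root.subtree_max, root.left.subtree_max, root.right.subtree_max)
```

Because only nodes on the insertion path can change, the refresh work per update is proportional to the tree height.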

Invariant-Driven Correctness

To ensure system reliability during high-frequency updates, I prioritized Correctness by Design by explicitly maintaining the AVL balance and metadata invariants during every mutation (insert, delete).
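
One way to make these invariants testable is a recursive checker that can run after each mutation in a test suite. The sketch below is an assumption-laden illustration: `SimpleNode` is a hypothetical stand-in that only reuses the attribute names from the API reference, `check_invariants` is not part of the library, and `-1` is used as the maximum of an empty subtree, which presumes non-negative endpoints.

```python
from typing import Optional, Tuple


class SimpleNode:
    """Hypothetical stand-in exposing the attribute names used by the tree nodes."""

    def __init__(self, lo: int, hi: int,
                 left: Optional["SimpleNode"] = None,
                 right: Optional["SimpleNode"] = None) -> None:
        self.node_interval_lo = lo
        self.node_interval_hi = hi
        self.to_children_left = left
        self.to_children_right = right
        children = [c for c in (left, right) if c is not None]
        self.subtree_height = 1 + max((c.subtree_height for c in children), default=0)
        self.subtree_max = max([hi] + [c.subtree_max for c in children])


def check_invariants(node: Optional[SimpleNode]) -> Tuple[int, int]:
    """Return (height, max_hi) of the subtree; raise AssertionError on a violation."""
    if node is None:
        return 0, -1  # empty subtree: height 0, no endpoint (assumes hi >= 0)
    height_l, max_l = check_invariants(node.to_children_left)
    height_r, max_r = check_invariants(node.to_children_right)
    assert abs(height_l - height_r) <= 1, "AVL balance violated"
    assert node.subtree_height == 1 + max(height_l, height_r), "stale subtree_height"
    assert node.subtree_max == max(node.node_interval_hi, max_l, max_r), "stale subtree_max"
    return node.subtree_height, node.subtree_max


tree = SimpleNode(5, 8, SimpleNode(1, 2), SimpleNode(15, 20))
print(check_invariants(tree))
```

With a sentinel root, a checker like this would be applied to the sentinel's left child, since the sentinel itself has only one subtree.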

Architectural Hardening

I implemented a Sentinel Node Architecture with a Two-Down AVL rotation strategy, in which each node rebalances the subtrees rooted at its children. This removed the need for sentinel node checks throughout the codebase, simplified the balancing feature, and made the implementation cleaner and more maintainable.

Python Object-Model Tuning

To reduce the overhead of the Python object model, I implemented Attribute Flattening: replacing packed collections (self.interval[0]) with discrete integers (self.lo, self.hi). This reduced constant factors in the hot execution path and cut runtime by 15% in benchmarks with static intervals.
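
The difference between the two layouts can be illustrated with a small, hypothetical pair of classes. `PackedNode` and `FlatNode` are names invented for this sketch; the timing loop only shows how one might measure the effect and makes no claim about the numbers it prints on a given machine.

```python
import timeit


class PackedNode:
    """Stores the interval as one tuple: each read is an attribute lookup plus an index."""

    def __init__(self, lo: int, hi: int) -> None:
        self.interval = (lo, hi)

    def overlaps(self, query_lo: int, query_hi: int) -> bool:
        return self.interval[0] <= query_hi and query_lo <= self.interval[1]


class FlatNode:
    """Stores the endpoints as discrete attributes: each read is a single lookup."""

    def __init__(self, lo: int, hi: int) -> None:
        self.lo = lo
        self.hi = hi

    def overlaps(self, query_lo: int, query_hi: int) -> bool:
        return self.lo <= query_hi and query_lo <= self.hi


if __name__ == "__main__":
    for name, node in (("packed", PackedNode(5, 8)), ("flat", FlatNode(5, 8))):
        seconds = timeit.timeit(lambda: node.overlaps(7, 9), number=200_000)
        print(f"{name}: {seconds:.4f} s")
```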

5. API Reference

data_streaming_accelerators.core.dynamic_interval_management

Classes

DynamicIntervalManagementV1

Bases: DynamicIntervalManagementBase

Source code in src/data_streaming_accelerators/core/dynamic_interval_management.py
class DynamicIntervalManagementV1(DynamicIntervalManagementBase):

    @validators.validate_args([None, check_intervals])
    def merge(self, intervals: List[List[int]]) -> List[List[int]]:
        """Entry point for `merge` api.
        Takes runtime O(N Log N) and memory O(N) for N items."""

        n = len(intervals)

        sorted_edge_weights = [None] * (2*n)
        for i, (lo, hi) in enumerate(intervals):
            sorted_edge_weights[2*i] = (lo, -1)
            sorted_edge_weights[2*i+1] = (hi, 1)
        sorted_edge_weights.sort()  # sweep-line approach

        merged_intervals = [None] * n; count_intervals = 0
        interval_lo, interval_weight = 0, 0
        for edge, weight in sorted_edge_weights:
            if weight == -1 and interval_weight == 0:
                interval_lo = edge
            interval_weight += weight
            if weight == 1 and interval_weight == 0:
                merged_intervals[count_intervals] = (interval_lo, edge)
                count_intervals += 1
        return merged_intervals[:count_intervals]
Functions
merge(intervals)

Entry point for merge api. Takes runtime O(N Log N) and memory O(N) for N items.

Source code in src/data_streaming_accelerators/core/dynamic_interval_management.py
@validators.validate_args([None, check_intervals])
def merge(self, intervals: List[List[int]]) -> List[List[int]]:
    """Entry point for `merge` api.
    Takes runtime O(N Log N) and memory O(N) for N items."""

    n = len(intervals)

    sorted_edge_weights = [None] * (2*n)
    for i, (lo, hi) in enumerate(intervals):
        sorted_edge_weights[2*i] = (lo, -1)
        sorted_edge_weights[2*i+1] = (hi, 1)
    sorted_edge_weights.sort()  # sweep-line approach

    merged_intervals = [None] * n; count_intervals = 0
    interval_lo, interval_weight = 0, 0
    for edge, weight in sorted_edge_weights:
        if weight == -1 and interval_weight == 0:
            interval_lo = edge
        interval_weight += weight
        if weight == 1 and interval_weight == 0:
            merged_intervals[count_intervals] = (interval_lo, edge)
            count_intervals += 1
    return merged_intervals[:count_intervals]

DynamicIntervalManagementV2

Bases: DynamicIntervalManagementBase

Source code in src/data_streaming_accelerators/core/dynamic_interval_management.py
class DynamicIntervalManagementV2(DynamicIntervalManagementBase):

    @validators.validate_args([None, check_intervals])
    def merge(self, intervals: List[List[int]]) -> List[List[int]]:
        """Entry point for `merge` api.
        Takes runtime O(N Log N) and memory O(N) for N items."""

        n = len(intervals)

        sorted_lo_edges = [None] * n
        for i, (lo, _) in enumerate(intervals):
            sorted_lo_edges[i] = lo
        sorted_lo_edges.sort()

        sorted_hi_edges = [None] * n
        for i, (_, hi) in enumerate(intervals):
            sorted_hi_edges[i] = hi
        sorted_hi_edges.sort()

        merged_intervals = [None] * n; count_intervals = 0
        interval_lo, interval_weight = 0, 0
        p1, p2 = 0, 0  # two-pointer search
        while p1 < n and p2 < n:
            lo, hi = sorted_lo_edges[p1], sorted_hi_edges[p2]
            if lo <= hi:
                if interval_weight == 0:
                    interval_lo = lo
                interval_weight -= 1; p1 += 1
            else:
                interval_weight += 1; p2 += 1
                if interval_weight == 0:
                    merged_intervals[count_intervals] = (interval_lo, hi)
                    count_intervals += 1
        while p2 < n:
            lo, hi = sorted_lo_edges[-1], sorted_hi_edges[p2]
            interval_weight += 1; p2 += 1
            if interval_weight == 0:
                merged_intervals[count_intervals] = (interval_lo, hi)
                count_intervals += 1
        return merged_intervals[:count_intervals]
Functions
merge(intervals)

Entry point for merge api. Takes runtime O(N Log N) and memory O(N) for N items.

Source code in src/data_streaming_accelerators/core/dynamic_interval_management.py
@validators.validate_args([None, check_intervals])
def merge(self, intervals: List[List[int]]) -> List[List[int]]:
    """Entry point for `merge` api.
    Takes runtime O(N Log N) and memory O(N) for N items."""

    n = len(intervals)

    sorted_lo_edges = [None] * n
    for i, (lo, _) in enumerate(intervals):
        sorted_lo_edges[i] = lo
    sorted_lo_edges.sort()

    sorted_hi_edges = [None] * n
    for i, (_, hi) in enumerate(intervals):
        sorted_hi_edges[i] = hi
    sorted_hi_edges.sort()

    merged_intervals = [None] * n; count_intervals = 0
    interval_lo, interval_weight = 0, 0
    p1, p2 = 0, 0  # two-pointer search
    while p1 < n and p2 < n:
        lo, hi = sorted_lo_edges[p1], sorted_hi_edges[p2]
        if lo <= hi:
            if interval_weight == 0:
                interval_lo = lo
            interval_weight -= 1; p1 += 1
        else:
            interval_weight += 1; p2 += 1
            if interval_weight == 0:
                merged_intervals[count_intervals] = (interval_lo, hi)
                count_intervals += 1
    while p2 < n:
        lo, hi = sorted_lo_edges[-1], sorted_hi_edges[p2]
        interval_weight += 1; p2 += 1
        if interval_weight == 0:
            merged_intervals[count_intervals] = (interval_lo, hi)
            count_intervals += 1
    return merged_intervals[:count_intervals]

DynamicIntervalManagementV3

Bases: DynamicIntervalManagementBase

Source code in src/data_streaming_accelerators/core/dynamic_interval_management.py
class DynamicIntervalManagementV3(DynamicIntervalManagementBase):

    class IntervalMergeTreeAVLBSTNode:
        SENTINEL_INTERVAL = (10**4+1, -1)

        def __init__(self, **kw) -> None:
            """Make new node for interval merge tree.
            Takes runtime O(1) and memory O(1)."""
            self.from_parent = kw.get("from_parent", None)
            self.from_parent_as_right = kw.get("from_parent_as_right", 0)
            self.to_children_left = kw.get("to_children_left", None)
            self.to_children_right = kw.get("to_children_right", None)
            self.subtree_height = kw.get("subtree_height", 1)
            self.subtree_max = kw.get("subtree_max", 0)
            self.node_interval_lo = kw.get("node_interval_lo", -1)
            self.node_interval_hi = kw.get("node_interval_hi", -1)

        def interval_traversal(self) -> List[List[int]]:
            """Make traversal list in interval merge tree.
            Takes runtime O(N) and memory O(N)."""
            traversal_list = []
            stack = []; p = self  # iterative approach.
            while stack or p is not None:  # in-order (pointer-based).
                # 1. Discover left child
                while p is not None:
                    stack.append(p)
                    p = p.to_children_left
                # 2. Explore current node
                p = stack.pop()
                if ((interval := (p.node_interval_lo, p.node_interval_hi))
                != self.__class__.SENTINEL_INTERVAL):
                    traversal_list.append(interval)
                # 3. Discover right child
                p = p.to_children_right
            return traversal_list

        def interval_find(self, query_lo: int, query_hi: int) -> Optional[Self]:
            """Find a matching node (overlap) in interval merge tree.
            Takes runtime O(Log N) and memory O(1) on average."""
            stack = []; p = self  # iterative approach.
            while stack or p is not None:  # in-order (pointer-based).
                # 1. Discover left child.
                while p is not None:
                    stack.append(p)
                    if (p.to_children_left is not None
                    and p.to_children_left.subtree_max >= query_lo):
                        p = p.to_children_left
                    else:
                        p = None
                # 2. Explore current node.
                p = stack.pop()
                if ((node_interval := (p.node_interval_lo, p.node_interval_hi))
                != self.__class__.SENTINEL_INTERVAL
                and p.node_interval_lo <= query_hi
                and query_lo <= p.node_interval_hi):
                    return p
                # 3. Discover right child.
                if (p.to_children_right is not None
                and p.to_children_right.subtree_max >= query_lo):
                    p = p.to_children_right
                else:
                    p = None
            return None

        def interval_insert(self, other_node: Self) -> None:
            """Insert a new node in interval merge tree.
            Takes runtime O(Log N) and memory O(1) on average."""
            # 1. Remove any existing overlaps.
            merged_lo = other_node.node_interval_lo
            merged_hi = other_node.node_interval_hi
            while (p := self.interval_find(merged_lo, merged_hi)) is not None:
                merged_lo = min(merged_lo, p.node_interval_lo)
                merged_hi = max(merged_hi, p.node_interval_hi)
                p.interval_delete()
            other_node.node_interval_lo = merged_lo
            other_node.node_interval_hi = merged_hi
            other_node.balancing_node_metadata()
            # 2. Insert the expanded merge interval.
            p = self  # iterative approach.
            while p is not None:  # directional search (pointer-based).
                if merged_lo < p.node_interval_lo:
                    if p.to_children_left is None:
                        p.to_children_left = other_node
                        other_node.from_parent = p
                        other_node.from_parent_as_right = 0
                        break
                    p = p.to_children_left
                else:
                    if p.to_children_right is None:
                        p.to_children_right = other_node
                        other_node.from_parent = p
                        other_node.from_parent_as_right = 1
                        break
                    p = p.to_children_right
            # 3. Update metadata from bottom to top.
            p = other_node
            while p is not None:
                p.balancing_node_metadata()
                p.balancing_subtree_rotation()
                p = p.from_parent

        def interval_delete(self) -> None:
            """Delete an existing node in interval merge tree.
            Takes runtime O(Log N) and memory O(1) on average."""

            def _inorder_next(node) -> Optional[Self]:
                # 1A. Either search down from right child to find left leaf
                if node.to_children_right is not None:
                    p = node.to_children_right
                    while p.to_children_left is not None:
                        p = p.to_children_left
                    return p
                # 1B. Or search up from parent to find lowest ancestor
                # that parents the subtree as left child.
                else:
                    p = node
                    while p.from_parent is not None:
                        if p.from_parent_as_right == 0:
                            return p.from_parent
                        p = p.from_parent
                # 1C. Or return None
                return None

            parent, parent_as_right = self.from_parent, self.from_parent_as_right
            child_l, child_r = self.to_children_left, self.to_children_right
            update_from = None
            # 1A. Either remove the node if it has no child nodes.
            if child_l is None and child_r is None:
                if parent is not None:
                    if not parent_as_right:
                        parent.to_children_left = None
                    else:
                        parent.to_children_right = None
                    update_from = parent
                self.from_parent, self.from_parent_as_right = None, -1
            # 1B. Or replace it with its child if it has one child node.
            elif child_l is None or child_r is None:
                child = child_l if child_l is not None else child_r
                if parent is not None:
                    if not parent_as_right:
                        parent.to_children_left = child
                    else:
                        parent.to_children_right = child
                    child.from_parent = parent
                    child.from_parent_as_right = parent_as_right
                    update_from = parent
                self.from_parent, self.from_parent_as_right = None, -1
            # 1C. Or swap it with its successor if it has two child nodes.
            else:
                successor = _inorder_next(self)
                self.node_interval_lo = successor.node_interval_lo
                self.node_interval_hi = successor.node_interval_hi
                successor.interval_delete()  # updates metadata for self
                return
            # 2. Update metadata from bottom to top.
            p = update_from
            while p is not None:
                p.balancing_node_metadata()
                p.balancing_subtree_rotation()
                p = p.from_parent

        def balancing_node_metadata(self) -> None:
            """Update metadata in node for self-balancing BST.
            Takes runtime O(1) and memory O(1)."""
            child_l, child_r = self.to_children_left, self.to_children_right
            # 1. Update subtree height from children.
            height_l = 0 if child_l is None else child_l.subtree_height
            height_r = 0 if child_r is None else child_r.subtree_height
            self.subtree_height = 1 + max(height_l, height_r)
            # 2. Update subtree max from children.
            max_l = -1 if child_l is None else child_l.subtree_max
            max_r = -1 if child_r is None else child_r.subtree_max
            self.subtree_max = max(self.node_interval_hi, max_l, max_r)

        def balancing_subtree_rotation(self) -> None:
            """Update rotation in subtrees for self-balancing BST.
            Takes runtime O(1) and memory O(1)."""

            def _get_balance(node):
                child_l = node.to_children_left
                child_r = node.to_children_right
                # 1. Calculate balance from children.
                height_l = 0 if child_l is None else child_l.subtree_height
                height_r = 0 if child_r is None else child_r.subtree_height
                return height_l - height_r

            def _rotate_left(rotated_down):
                rotated_up = rotated_down.to_children_right
                parent = rotated_down.from_parent
                parent_as_right = rotated_down.from_parent_as_right
                # 1. Move rotated_up left to rotated_down right
                middle_subtree = rotated_up.to_children_left
                rotated_down.to_children_right = middle_subtree
                if middle_subtree is not None:
                    middle_subtree.from_parent = rotated_down
                    middle_subtree.from_parent_as_right = 1
                # 2. Promote rotated_up
                rotated_up.from_parent = parent
                rotated_up.from_parent_as_right = parent_as_right
                if parent is not None:
                    if not parent_as_right:
                        parent.to_children_left = rotated_up
                    else:
                        parent.to_children_right = rotated_up
                # 3. Demote rotated_down
                rotated_up.to_children_left = rotated_down
                rotated_down.from_parent = rotated_up
                rotated_down.from_parent_as_right = 0
                # 4. Update metadata from bottom to top
                rotated_down.balancing_node_metadata()
                rotated_up.balancing_node_metadata()

            def _rotate_right(rotated_down):
                rotated_up = rotated_down.to_children_left
                parent = rotated_down.from_parent
                parent_as_right = rotated_down.from_parent_as_right
                # 1. Move rotated_up right to rotated_down left
                middle_subtree = rotated_up.to_children_right
                rotated_down.to_children_left = middle_subtree
                if middle_subtree is not None:
                    middle_subtree.from_parent = rotated_down
                    middle_subtree.from_parent_as_right = 0
                # 2. Promote rotated_up
                rotated_up.from_parent = parent
                rotated_up.from_parent_as_right = parent_as_right
                if parent is not None:
                    if not parent_as_right:
                        parent.to_children_left = rotated_up
                    else:
                        parent.to_children_right = rotated_up
                # 3. Demote rotated_down
                rotated_up.to_children_right = rotated_down
                rotated_down.from_parent = rotated_up
                rotated_down.from_parent_as_right = 1
                # 4. Update metadata from bottom to top
                rotated_down.balancing_node_metadata()
                rotated_up.balancing_node_metadata()

            def _rotate_subtree(node):
                balance = _get_balance(node)
                child_l = node.to_children_left
                child_r = node.to_children_right
                balance_l = 0 if child_l is None else _get_balance(child_l)
                balance_r = 0 if child_r is None else _get_balance(child_r)
                # 1A. Either handle node insertion to left subtree of left child.
                # This is the "Left-Left Case".
                if balance_l == 1 and balance > 1:
                    _rotate_right(node)
                # 1B. Or handle node insertion to right subtree of right child.
                # This is the "Right-Right Case".
                elif balance_r == -1 and balance < -1:
                    _rotate_left(node)
                # 1C. Or handle node insertion to right subtree of left child.
                # This is the "Left-Right Case".
                elif balance_l == -1 and balance > 1:
                    _rotate_left(node.to_children_left)
                    _rotate_right(node)
                # 1D. Or handle node insertion to left subtree of right child.
                # This is the "Right-Left Case".
                elif balance_r == 1 and balance < -1:
                    _rotate_right(node.to_children_right)
                    _rotate_left(node)

            child_l = self.to_children_left
            child_r = self.to_children_right
            # 1. Update rotation in left subtree.
            if child_l is not None:
                _rotate_subtree(child_l)
            # 2. Update rotation in right subtree.
            if child_r is not None:
                _rotate_subtree(child_r)

    @validators.validate_args([None, check_intervals])
    def merge(self, intervals: List[List[int]]) -> List[List[int]]:
        """Entry point for `merge` api.
        Takes runtime O(N Log N) and memory O(N) for N items."""

        Node = self.__class__.IntervalMergeTreeAVLBSTNode  # type alias
        sentinel_lo, sentinel_hi = Node.SENTINEL_INTERVAL
        sent = Node(node_interval_lo=sentinel_lo,
            node_interval_hi=sentinel_hi, subtree_max=sentinel_hi)
        for lo, hi in intervals:
            sent.interval_insert(Node(node_interval_lo=lo, 
                node_interval_hi=hi, subtree_max=hi))
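        # Guard against an empty tree: with no inserts the sentinel has no child.
        root = sent.to_children_left
        traversal = [] if root is None else root.interval_traversal()
        return None if not traversal else sorted(traversal)  # sorting optional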
Classes
IntervalMergeTreeAVLBSTNode
Source code in src/data_streaming_accelerators/core/dynamic_interval_management.py
class IntervalMergeTreeAVLBSTNode:
    SENTINEL_INTERVAL = (10**4+1, -1)

    def __init__(self, **kw) -> None:
        """Make new node for interval merge tree.
        Takes runtime O(1) and memory O(1)."""
        self.from_parent = kw.get("from_parent", None)
        self.from_parent_as_right = kw.get("from_parent_as_right", 0)
        self.to_children_left = kw.get("to_children_left", None)
        self.to_children_right = kw.get("to_children_right", None)
        self.subtree_height = kw.get("subtree_height", 1)
        self.subtree_max = kw.get("subtree_max", 0)
        self.node_interval_lo = kw.get("node_interval_lo", -1)
        self.node_interval_hi = kw.get("node_interval_hi", -1)

    def interval_traversal(self) -> List[List[int]]:
        """Make traversal list in interval merge tree.
        Takes runtime O(N) and memory O(N)."""
        traversal_list = []
        stack = []; p = self  # iterative approach.
        while stack or p is not None:  # in-order (pointer-based).
            # 1. Discover left child
            while p is not None:
                stack.append(p)
                p = p.to_children_left
            # 2. Explore current node
            p = stack.pop()
            if ((interval := (p.node_interval_lo, p.node_interval_hi))
            != self.__class__.SENTINEL_INTERVAL):
                traversal_list.append(interval)
            # 3. Discover right child
            p = p.to_children_right
        return traversal_list

    def interval_find(self, query_lo: int, query_hi: int) -> Optional[Self]:
        """Find a matching node (overlap) in interval merge tree.
        Takes runtime O(Log N) and memory O(1) on average."""
        stack = []; p = self  # iterative approach.
        while stack or p is not None:  # in-order (pointer-based).
            # 1. Discover left child.
            while p is not None:
                stack.append(p)
                if (p.to_children_left is not None
                and p.to_children_left.subtree_max >= query_lo):
                    p = p.to_children_left
                else:
                    p = None
            # 2. Explore current node.
            p = stack.pop()
            if ((node_interval := (p.node_interval_lo, p.node_interval_hi))
            != self.__class__.SENTINEL_INTERVAL
            and p.node_interval_lo <= query_hi
            and query_lo <= p.node_interval_hi):
                return p
            # 3. Discover right child.
            if (p.to_children_right is not None
            and p.to_children_right.subtree_max >= query_lo):
                p = p.to_children_right
            else:
                p = None
        return None

    def interval_insert(self, other_node: Self) -> None:
        """Insert a new node in interval merge tree.
        Takes runtime O(Log N) and memory O(1) on average."""
        # 1. Remove any existing overlaps.
        merged_lo = other_node.node_interval_lo
        merged_hi = other_node.node_interval_hi
        while (p := self.interval_find(merged_lo, merged_hi)) is not None:
            merged_lo = min(merged_lo, p.node_interval_lo)
            merged_hi = max(merged_hi, p.node_interval_hi)
            p.interval_delete()
        other_node.node_interval_lo = merged_lo
        other_node.node_interval_hi = merged_hi
        other_node.balancing_node_metadata()
        # 2. Insert the expanded merge interval.
        p = self  # iterative approach.
        while p is not None:  # directional search (pointer-based).
            if merged_lo < p.node_interval_lo:
                if p.to_children_left is None:
                    p.to_children_left = other_node
                    other_node.from_parent = p
                    other_node.from_parent_as_right = 0
                    break
                p = p.to_children_left
            else:
                if p.to_children_right is None:
                    p.to_children_right = other_node
                    other_node.from_parent = p
                    other_node.from_parent_as_right = 1
                    break
                p = p.to_children_right
        # 3. Update metadata from bottom to top.
        p = other_node
        while p is not None:
            p.balancing_node_metadata()
            p.balancing_subtree_rotation()
            p = p.from_parent

    def interval_delete(self) -> None:
        """Delete an existing node in interval merge tree.
        Takes runtime O(Log N) and memory O(1) on average."""

        def _inorder_next(node) -> Optional[Self]:
            # 1A. Either search down from right child to find left leaf
            if node.to_children_right is not None:
                p = node.to_children_right
                while p.to_children_left is not None:
                    p = p.to_children_left
                return p
            # 1B. Or search up from parent to find lowest ancestor
            # that parents the subtree as left child.
            else:
                p = node
                while p.from_parent is not None:
                    if p.from_parent_as_right == 0:
                        return p.from_parent
                    p = p.from_parent
            # 1C. Or return None
            return None

        parent, parent_as_right = self.from_parent, self.from_parent_as_right
        child_l, child_r = self.to_children_left, self.to_children_right
        update_from = None
        # 1A. Either remove the node if it has no child nodes.
        if child_l is None and child_r is None:
            if parent is not None:
                if not parent_as_right:
                    parent.to_children_left = None
                else:
                    parent.to_children_right = None
                update_from = parent
            self.from_parent, self.from_parent_as_right = None, -1
        # 1B. Or replace it with its child if it has one child node.
        elif child_l is None or child_r is None:
            child = child_l if child_l is not None else child_r
            if parent is not None:
                if not parent_as_right:
                    parent.to_children_left = child
                else:
                    parent.to_children_right = child
                child.from_parent = parent
                child.from_parent_as_right = parent_as_right
                update_from = parent
            self.from_parent, self.from_parent_as_right = None, -1
        # 1C. Or swap it with its successor if it has two child nodes.
        else:
            successor = _inorder_next(self)
            self.node_interval_lo = successor.node_interval_lo
            self.node_interval_hi = successor.node_interval_hi
            successor.interval_delete()  # updates metadata for self
            return
        # 2. Update metadata from bottom to top.
        p = update_from
        while p is not None:
            p.balancing_node_metadata()
            p.balancing_subtree_rotation()
            p = p.from_parent

    def balancing_node_metadata(self) -> None:
        """Update metadata in node for self-balancing BST.
        Takes runtime O(1) and memory O(1)."""
        child_l, child_r = self.to_children_left, self.to_children_right
        # 1. Update subtree height from children.
        height_l = 0 if child_l is None else child_l.subtree_height
        height_r = 0 if child_r is None else child_r.subtree_height
        self.subtree_height = 1 + max(height_l, height_r)
        # 2. Update subtree max from children.
        max_l = -1 if child_l is None else child_l.subtree_max
        max_r = -1 if child_r is None else child_r.subtree_max
        self.subtree_max = max(self.node_interval_hi, max_l, max_r)

    def balancing_subtree_rotation(self) -> None:
        """Update rotation in subtrees for self-balancing BST.
        Takes runtime O(1) and memory O(1)."""

        def _get_balance(node):
            child_l = node.to_children_left
            child_r = node.to_children_right
            # 1. Calculate balance from children.
            height_l = 0 if child_l is None else child_l.subtree_height
            height_r = 0 if child_r is None else child_r.subtree_height
            return height_l - height_r

        def _rotate_left(rotated_down):
            rotated_up = rotated_down.to_children_right
            parent = rotated_down.from_parent
            parent_as_right = rotated_down.from_parent_as_right
            # 1. Move rotated_up left to rotated_down right
            middle_subtree = rotated_up.to_children_left
            rotated_down.to_children_right = middle_subtree
            if middle_subtree is not None:
                middle_subtree.from_parent = rotated_down
                middle_subtree.from_parent_as_right = 1
            # 2. Promote rotated_up
            rotated_up.from_parent = parent
            rotated_up.from_parent_as_right = parent_as_right
            if parent is not None:
                if not parent_as_right:
                    parent.to_children_left = rotated_up
                else:
                    parent.to_children_right = rotated_up
            # 3. Demote rotated_down
            rotated_up.to_children_left = rotated_down
            rotated_down.from_parent = rotated_up
            rotated_down.from_parent_as_right = 0
            # 4. Update metadata from bottom to top
            rotated_down.balancing_node_metadata()
            rotated_up.balancing_node_metadata()

        def _rotate_right(rotated_down):
            rotated_up = rotated_down.to_children_left
            parent = rotated_down.from_parent
            parent_as_right = rotated_down.from_parent_as_right
            # 1. Move rotated_up right to rotated_down left
            middle_subtree = rotated_up.to_children_right
            rotated_down.to_children_left = middle_subtree
            if middle_subtree is not None:
                middle_subtree.from_parent = rotated_down
                middle_subtree.from_parent_as_right = 0
            # 2. Promote rotated_up
            rotated_up.from_parent = parent
            rotated_up.from_parent_as_right = parent_as_right
            if parent is not None:
                if not parent_as_right:
                    parent.to_children_left = rotated_up
                else:
                    parent.to_children_right = rotated_up
            # 3. Demote rotated_down
            rotated_up.to_children_right = rotated_down
            rotated_down.from_parent = rotated_up
            rotated_down.from_parent_as_right = 1
            # 4. Update metadata from bottom to top
            rotated_down.balancing_node_metadata()
            rotated_up.balancing_node_metadata()

        def _rotate_subtree(node):
            balance = _get_balance(node)
            child_l = node.to_children_left
            child_r = node.to_children_right
            balance_l = 0 if child_l is None else _get_balance(child_l)
            balance_r = 0 if child_r is None else _get_balance(child_r)
            # 1A. Either handle a left-heavy node whose left child is not
            # right-heavy. This is the "Left-Left Case"; a left-child
            # balance of 0 can arise after a deletion.
            if balance > 1 and balance_l >= 0:
                _rotate_right(node)
            # 1B. Or handle a right-heavy node whose right child is not
            # left-heavy. This is the "Right-Right Case".
            elif balance < -1 and balance_r <= 0:
                _rotate_left(node)
            # 1C. Or handle a left-heavy node whose left child is right-heavy.
            # This is the "Left-Right Case".
            elif balance > 1 and balance_l < 0:
                _rotate_left(node.to_children_left)
                _rotate_right(node)
            # 1D. Or handle a right-heavy node whose right child is left-heavy.
            # This is the "Right-Left Case".
            elif balance < -1 and balance_r > 0:
                _rotate_right(node.to_children_right)
                _rotate_left(node)

        child_l = self.to_children_left
        child_r = self.to_children_right
        # 1. Update rotation in left subtree.
        if child_l is not None:
            _rotate_subtree(child_l)
        # 2. Update rotation in right subtree.
        if child_r is not None:
            _rotate_subtree(child_r)
        # 3. Refresh own metadata, since a rotation below may have changed
        # the height of a child subtree.
        self.balancing_node_metadata()
Functions
__init__(**kw)

Make new node for interval merge tree. Takes runtime O(1) and memory O(1).

Source code in src/data_streaming_accelerators/core/dynamic_interval_management.py
def __init__(self, **kw) -> None:
    """Make new node for interval merge tree.
    Takes runtime O(1) and memory O(1)."""
    self.from_parent = kw.get("from_parent", None)
    self.from_parent_as_right = kw.get("from_parent_as_right", 0)
    self.to_children_left = kw.get("to_children_left", None)
    self.to_children_right = kw.get("to_children_right", None)
    self.subtree_height = kw.get("subtree_height", 1)
    self.subtree_max = kw.get("subtree_max", 0)
    self.node_interval_lo = kw.get("node_interval_lo", -1)
    self.node_interval_hi = kw.get("node_interval_hi", -1)
balancing_node_metadata()

Update metadata in node for self-balancing BST. Takes runtime O(1) and memory O(1).

Source code in src/data_streaming_accelerators/core/dynamic_interval_management.py
def balancing_node_metadata(self) -> None:
    """Update metadata in node for self-balancing BST.
    Takes runtime O(1) and memory O(1)."""
    child_l, child_r = self.to_children_left, self.to_children_right
    # 1. Update subtree height from children.
    height_l = 0 if child_l is None else child_l.subtree_height
    height_r = 0 if child_r is None else child_r.subtree_height
    self.subtree_height = 1 + max(height_l, height_r)
    # 2. Update subtree max from children.
    max_l = -1 if child_l is None else child_l.subtree_max
    max_r = -1 if child_r is None else child_r.subtree_max
    self.subtree_max = max(self.node_interval_hi, max_l, max_r)
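The augmentation above can be illustrated in isolation. The following is a minimal sketch, assuming a hypothetical `TinyNode` dataclass and `refresh` helper that are not part of the library; it recomputes the height and the subtree maximum from the direct children only, which is what keeps the update at O(1).

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class TinyNode:
    """Hypothetical stand-in for the tree node; not the library class."""
    lo: int
    hi: int
    left: Optional["TinyNode"] = None
    right: Optional["TinyNode"] = None
    height: int = 1
    subtree_max: int = 0


def refresh(node: TinyNode) -> None:
    """Recompute height and subtree max from the node's direct children."""
    children = [c for c in (node.left, node.right) if c is not None]
    node.height = 1 + max((c.height for c in children), default=0)
    node.subtree_max = max([node.hi, *(c.subtree_max for c in children)])


leaf_l = TinyNode(1, 3)
leaf_r = TinyNode(15, 20)
root = TinyNode(5, 8, left=leaf_l, right=leaf_r)
for n in (leaf_l, leaf_r, root):  # children first, then the parent
    refresh(n)
```

Unlike the listing above, this sketch does not use `-1` as the default maximum for a missing child, so it would also work for negative endpoints; whether the library needs that is not stated in the source.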
balancing_subtree_rotation()

Update rotation in subtrees for self-balancing BST. Takes runtime O(1) and memory O(1).

Source code in src/data_streaming_accelerators/core/dynamic_interval_management.py
def balancing_subtree_rotation(self) -> None:
    """Update rotation in subtrees for self-balancing BST.
    Takes runtime O(1) and memory O(1)."""

    def _get_balance(node):
        child_l = node.to_children_left
        child_r = node.to_children_right
        # 1. Calculate balance from children.
        height_l = 0 if child_l is None else child_l.subtree_height
        height_r = 0 if child_r is None else child_r.subtree_height
        return height_l - height_r

    def _rotate_left(rotated_down):
        rotated_up = rotated_down.to_children_right
        parent = rotated_down.from_parent
        parent_as_right = rotated_down.from_parent_as_right
        # 1. Move rotated_up left to rotated_down right
        middle_subtree = rotated_up.to_children_left
        rotated_down.to_children_right = middle_subtree
        if middle_subtree is not None:
            middle_subtree.from_parent = rotated_down
            middle_subtree.from_parent_as_right = 1
        # 2. Promote rotated_up
        rotated_up.from_parent = parent
        rotated_up.from_parent_as_right = parent_as_right
        if parent is not None:
            if not parent_as_right:
                parent.to_children_left = rotated_up
            else:
                parent.to_children_right = rotated_up
        # 3. Demote rotated_down
        rotated_up.to_children_left = rotated_down
        rotated_down.from_parent = rotated_up
        rotated_down.from_parent_as_right = 0
        # 4. Update metadata from bottom to top
        rotated_down.balancing_node_metadata()
        rotated_up.balancing_node_metadata()

    def _rotate_right(rotated_down):
        rotated_up = rotated_down.to_children_left
        parent = rotated_down.from_parent
        parent_as_right = rotated_down.from_parent_as_right
        # 1. Move rotated_up right to rotated_down left
        middle_subtree = rotated_up.to_children_right
        rotated_down.to_children_left = middle_subtree
        if middle_subtree is not None:
            middle_subtree.from_parent = rotated_down
            middle_subtree.from_parent_as_right = 0
        # 2. Promote rotated_up
        rotated_up.from_parent = parent
        rotated_up.from_parent_as_right = parent_as_right
        if parent is not None:
            if not parent_as_right:
                parent.to_children_left = rotated_up
            else:
                parent.to_children_right = rotated_up
        # 3. Demote rotated_down
        rotated_up.to_children_right = rotated_down
        rotated_down.from_parent = rotated_up
        rotated_down.from_parent_as_right = 1
        # 4. Update metadata from bottom to top
        rotated_down.balancing_node_metadata()
        rotated_up.balancing_node_metadata()

    def _rotate_subtree(node):
        balance = _get_balance(node)
        child_l = node.to_children_left
        child_r = node.to_children_right
        balance_l = 0 if child_l is None else _get_balance(child_l)
        balance_r = 0 if child_r is None else _get_balance(child_r)
        # 1A. Either handle a left-heavy node whose left child is not
        # right-heavy. This is the "Left-Left Case"; a left-child
        # balance of 0 can arise after a deletion.
        if balance > 1 and balance_l >= 0:
            _rotate_right(node)
        # 1B. Or handle a right-heavy node whose right child is not
        # left-heavy. This is the "Right-Right Case".
        elif balance < -1 and balance_r <= 0:
            _rotate_left(node)
        # 1C. Or handle a left-heavy node whose left child is right-heavy.
        # This is the "Left-Right Case".
        elif balance > 1 and balance_l < 0:
            _rotate_left(node.to_children_left)
            _rotate_right(node)
        # 1D. Or handle a right-heavy node whose right child is left-heavy.
        # This is the "Right-Left Case".
        elif balance < -1 and balance_r > 0:
            _rotate_right(node.to_children_right)
            _rotate_left(node)

    child_l = self.to_children_left
    child_r = self.to_children_right
    # 1. Update rotation in left subtree.
    if child_l is not None:
        _rotate_subtree(child_l)
    # 2. Update rotation in right subtree.
    if child_r is not None:
        _rotate_subtree(child_r)
    # 3. Refresh own metadata, since a rotation below may have changed
    # the height of a child subtree.
    self.balancing_node_metadata()
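To see why a rotation is safe for a search tree, the sketch below performs a single left rotation on a right-leaning chain and checks that the in-order sequence is unchanged while the height shrinks. The `Tiny` class and the helpers are hypothetical and omit the parent pointers and metadata that the listing above maintains.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Tiny:
    """Hypothetical key-only node, used for illustration."""
    key: int
    left: Optional["Tiny"] = None
    right: Optional["Tiny"] = None


def rotate_left(down: Tiny) -> Tiny:
    """Promote down.right and return it as the new subtree root."""
    up = down.right
    down.right = up.left  # the middle subtree changes parent
    up.left = down
    return up


def inorder(node: Optional[Tiny]) -> List[int]:
    if node is None:
        return []
    return inorder(node.left) + [node.key] + inorder(node.right)


def height(node: Optional[Tiny]) -> int:
    if node is None:
        return 0
    return 1 + max(height(node.left), height(node.right))


chain = Tiny(1, right=Tiny(2, right=Tiny(3)))  # right-leaning chain
before = inorder(chain)
height_before = height(chain)
root = rotate_left(chain)
```

After the rotation, `root` holds key 2 with keys 1 and 3 as its children, so the ordering is preserved and the height drops from 3 to 2.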
interval_delete()

Delete an existing node in interval merge tree. Takes runtime O(Log N) and memory O(1) on average.

Source code in src/data_streaming_accelerators/core/dynamic_interval_management.py
def interval_delete(self) -> None:
    """Delete an existing node in interval merge tree.
    Takes runtime O(Log N) and memory O(1) on average."""

    def _inorder_next(node) -> Optional[Self]:
        # 1A. Either search down from right child to find left leaf
        if node.to_children_right is not None:
            p = node.to_children_right
            while p.to_children_left is not None:
                p = p.to_children_left
            return p
        # 1B. Or search up from parent to find lowest ancestor
        # that parents the subtree as left child.
        else:
            p = node
            while p.from_parent is not None:
                if p.from_parent_as_right == 0:
                    return p.from_parent
                p = p.from_parent
        # 1C. Or return None
        return None

    parent, parent_as_right = self.from_parent, self.from_parent_as_right
    child_l, child_r = self.to_children_left, self.to_children_right
    update_from = None
    # 1A. Either remove the node if it has no child nodes.
    if child_l is None and child_r is None:
        if parent is not None:
            if not parent_as_right:
                parent.to_children_left = None
            else:
                parent.to_children_right = None
            update_from = parent
        self.from_parent, self.from_parent_as_right = None, -1
    # 1B. Or replace it with its child if it has one child node.
    elif child_l is None or child_r is None:
        child = child_l if child_l is not None else child_r
        if parent is not None:
            if not parent_as_right:
                parent.to_children_left = child
            else:
                parent.to_children_right = child
            child.from_parent = parent
            child.from_parent_as_right = parent_as_right
            update_from = parent
        self.from_parent, self.from_parent_as_right = None, -1
    # 1C. Or swap it with its successor if it has two child nodes.
    else:
        successor = _inorder_next(self)
        self.node_interval_lo = successor.node_interval_lo
        self.node_interval_hi = successor.node_interval_hi
        successor.interval_delete()  # updates metadata for self
        return
    # 2. Update metadata from bottom to top.
    p = update_from
    while p is not None:
        p.balancing_node_metadata()
        p.balancing_subtree_rotation()
        p = p.from_parent
interval_find(query_lo, query_hi)

Find a matching node (overlap) in interval merge tree. Takes runtime O(Log N) and memory O(1) on average.

Source code in src/data_streaming_accelerators/core/dynamic_interval_management.py
def interval_find(self, query_lo: int, query_hi: int) -> Optional[Self]:
    """Find a matching node (overlap) in interval merge tree.
    Takes runtime O(Log N) and memory O(1) on average."""
    stack = []; p = self  # iterative approach.
    while stack or p is not None:  # in-order (pointer-based).
        # 1. Discover left child.
        while p is not None:
            stack.append(p)
            if (p.to_children_left is not None
            and p.to_children_left.subtree_max >= query_lo):
                p = p.to_children_left
            else:
                p = None
        # 2. Explore current node.
        p = stack.pop()
        # Skip the sentinel, then test for closed-interval overlap.
        if ((p.node_interval_lo, p.node_interval_hi)
                != self.__class__.SENTINEL_INTERVAL
                and p.node_interval_lo <= query_hi
                and query_lo <= p.node_interval_hi):
            return p
        # 3. Discover right child.
        if (p.to_children_right is not None
        and p.to_children_right.subtree_max >= query_lo):
            p = p.to_children_right
        else:
            p = None
    return None
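The pruning idea can be sketched recursively on a small hand-built tree. This is an illustrative sketch, not the library's implementation: the tuple encoding `(interval, subtree_max, left, right)` and the names `overlaps` and `find_overlap` are assumptions, and the tree is assumed to be ordered by the low endpoint. It also adds the right-side check from the pruning diagram (skip the right branch when the node's low endpoint already exceeds the query's high endpoint), which the listing above does not perform.

```python
from typing import Optional, Tuple

Interval = Tuple[int, int]


def overlaps(a: Interval, b: Interval) -> bool:
    """Closed intervals overlap when each starts before the other ends."""
    return a[0] <= b[1] and b[0] <= a[1]


def find_overlap(tree, query: Interval) -> Optional[Interval]:
    """Return one stored interval overlapping the query, or None."""
    if tree is None:
        return None
    interval, subtree_max, left, right = tree
    if subtree_max < query[0]:
        return None  # nothing in this subtree reaches the query
    hit = find_overlap(left, query)
    if hit is not None:
        return hit
    if overlaps(interval, query):
        return interval
    if interval[0] > query[1]:
        return None  # every interval to the right starts even later
    return find_overlap(right, query)


# Hypothetical tree: root [5, 8] with children [1, 3] and [15, 20].
tree = ((5, 8), 20,
        ((1, 3), 3, None, None),
        ((15, 20), 20, None, None))
```

For the query `(7, 9)` the left branch is pruned because its maximum (3) is below 7, and the root itself is returned as the match.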
interval_insert(other_node)

Insert a new node in interval merge tree. Takes runtime O(Log N) and memory O(1) on average.

Source code in src/data_streaming_accelerators/core/dynamic_interval_management.py
def interval_insert(self, other_node: Self) -> None:
    """Insert a new node in interval merge tree.
    Takes runtime O(Log N) and memory O(1) on average."""
    # 1. Remove any existing overlaps.
    merged_lo = other_node.node_interval_lo
    merged_hi = other_node.node_interval_hi
    while (p := self.interval_find(merged_lo, merged_hi)) is not None:
        merged_lo = min(merged_lo, p.node_interval_lo)
        merged_hi = max(merged_hi, p.node_interval_hi)
        p.interval_delete()
    other_node.node_interval_lo = merged_lo
    other_node.node_interval_hi = merged_hi
    other_node.balancing_node_metadata()
    # 2. Insert the expanded merge interval.
    p = self  # iterative approach.
    while p is not None:  # directional search (pointer-based).
        if merged_lo < p.node_interval_lo:
            if p.to_children_left is None:
                p.to_children_left = other_node
                other_node.from_parent = p
                other_node.from_parent_as_right = 0
                break
            p = p.to_children_left
        else:
            if p.to_children_right is None:
                p.to_children_right = other_node
                other_node.from_parent = p
                other_node.from_parent_as_right = 1
                break
            p = p.to_children_right
    # 3. Update metadata from bottom to top.
    p = other_node
    while p is not None:
        p.balancing_node_metadata()
        p.balancing_subtree_rotation()
        p = p.from_parent
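The merge-on-insert behaviour (absorb every overlapping interval, then store the expanded one) can be checked against a much simpler structure. The sketch below swaps the AVL tree for a sorted Python list and the standard `bisect` module; the function name `insert_and_merge` is hypothetical, and each call costs O(N) here rather than O(log N), so it is meant as a reference for expected results, not as a replacement.

```python
import bisect
from typing import List, Tuple


def insert_and_merge(merged: List[Tuple[int, int]], lo: int, hi: int) -> None:
    """Insert [lo, hi] into a sorted list of disjoint closed intervals."""
    los = [l for l, _ in merged]
    his = [h for _, h in merged]
    i = bisect.bisect_left(his, lo)   # first interval ending at or after lo
    j = bisect.bisect_right(los, hi)  # first interval starting after hi
    if i < j:
        # Intervals i..j-1 overlap the new one; widen it to cover them.
        lo = min(lo, merged[i][0])
        hi = max(hi, merged[j - 1][1])
    merged[i:j] = [(lo, hi)]


merged: List[Tuple[int, int]] = []
for lo, hi in [(1, 3), (8, 10), (2, 9), (20, 21)]:
    insert_and_merge(merged, lo, hi)
```

As in the tree version, intervals that merely touch (for example `[1, 4]` and `[4, 5]`) are treated as overlapping, because the overlap test uses closed endpoints.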
interval_traversal()

Make traversal list in interval merge tree. Takes runtime O(N) and memory O(N).

Source code in src/data_streaming_accelerators/core/dynamic_interval_management.py
def interval_traversal(self) -> List[List[int]]:
    """Make traversal list in interval merge tree.
    Takes runtime O(N) and memory O(N)."""
    traversal_list = []
    stack = []; p = self  # iterative approach.
    while stack or p is not None:  # in-order (pointer-based).
        # 1. Discover left child
        while p is not None:
            stack.append(p)
            p = p.to_children_left
        # 2. Explore current node
        p = stack.pop()
        if ((interval := (p.node_interval_lo, p.node_interval_hi))
        != self.__class__.SENTINEL_INTERVAL):
            traversal_list.append(interval)
        # 3. Discover right child
        p = p.to_children_right
    return traversal_list
Functions
merge(intervals)

Entry point for merge api. Takes runtime O(N Log N) and memory O(N) for N items.

Source code in src/data_streaming_accelerators/core/dynamic_interval_management.py
@validators.validate_args([None, check_intervals])
def merge(self, intervals: List[List[int]]) -> List[List[int]]:
    """Entry point for `merge` api.
    Takes runtime O(N Log N) and memory O(N) for N items."""

    Node = self.__class__.IntervalMergeTreeAVLBSTNode  # type alias
    sentinel_lo, sentinel_hi = Node.SENTINEL_INTERVAL
    sent = Node(node_interval_lo=sentinel_lo,
        node_interval_hi=sentinel_hi, subtree_max=sentinel_hi)
    for lo, hi in intervals:
        sent.interval_insert(Node(node_interval_lo=lo,
            node_interval_hi=hi, subtree_max=hi))
    # Guard against an empty tree: the sentinel then has no left child.
    root = sent.to_children_left
    traversal = [] if root is None else root.interval_traversal()
    return None if not traversal else sorted(traversal)  # sorting optional
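For cross-checking the streaming result on small inputs, a sort-based batch merge is a convenient reference. The sketch below is the classic sort-then-sweep approach, not the tree-based method above; the name `merge_sorted` is hypothetical.

```python
from typing import List


def merge_sorted(intervals: List[List[int]]) -> List[List[int]]:
    """Merge closed intervals by sorting on the low endpoint and sweeping."""
    merged: List[List[int]] = []
    for lo, hi in sorted(intervals):
        if merged and lo <= merged[-1][1]:
            # Overlaps (or touches) the last merged interval: extend it.
            merged[-1][1] = max(merged[-1][1], hi)
        else:
            merged.append([lo, hi])
    return merged


result = merge_sorted([[1, 3], [8, 10], [2, 6], [15, 18]])
```

Note two differences from the listing above: this sketch returns an empty list for empty input rather than `None`, and it yields lists rather than tuples.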