Dynamic Interval Management

1. Design Summary
The theoretical lower bound for merging \(N\) static intervals is \(\Omega(N \log N)\) due to the sorting requirement. By focusing on interval edges rather than the source intervals themselves, we can focus on efficient algorithms with compact implementation. However, in Online Streaming Environments, the task shifts from batch data loading (static intervals) to streaming data ingestion (dynamic intervals).

📊 Complexity Analysis
| Operation |
Batch Variants (v1/v2) |
Streaming Variant (v3) |
Comparison |
| Initial Setup |
\(O(N \log N)\) |
\(O(N \log N)\) |
Equivalent |
| Batch Updates (N) |
\(O(N \log N)\) |
\(O(N \log N)\) |
Equivalent |
| Streaming Updates (1) |
\(O(N \log N)\) |
\(O(\log N)\) |
Streaming Variant is superior |
| Space Complexity |
\(O(N)\) |
\(O(N)\) |
Equivalent |
🚀 The Pruning Logic (Visual)
The advantage of an Augmented AVL Interval Tree is its ability to prune search branches that cannot contain an overlap, ensuring \(O(\log N)\) search time.
graph TD
Start(["Search for Overlap with Query [7, 9]"]) --> Root
Root{"Node: [5, 8]<br/>Subtree Max: 20"} -- "Overlap Check" --> Match
Root -- "Check Left Subtree" --> LeftCheck{"Left Subtree Max >= 7?"}
Match{"Overlap Found!<br/>5 <= 9 AND 7 <= 8"}
LeftCheck -- "No (e.g., Max=6)" --> PruneLeft["<b>PRUNE:</b> Skip Left Branch"]
LeftCheck -- "Yes" --> SearchLeft[Recurse Left]
Root -- "Check Right Subtree" --> RightCheck{"Node Lo <= 9?"}
RightCheck -- "No (e.g., Lo=15)" --> PruneRight["<b>PRUNE:</b> Skip Right Branch"]
RightCheck -- "Yes" --> SearchRight[Recurse Right]
style PruneLeft fill:#ffcccc,stroke:#333
style PruneRight fill:#ffcccc,stroke:#333
style Match fill:#ccffcc,stroke:#333
2. Design Approaches
Optimal Approaches for Batch Data (Static Intervals)
- Sweep-line (Variant 1): Uses a sorted list of edge objects and a counter.
- Complexity (Batch): \(O(N \log N)\) setup, \(O(N)\) calculation.
- Complexity (Streaming): \(O(N \log N)\) per
insert/put operation.
- Verdict: Suitable for batch case due to second-best ~16ms benchmark with static intervals impacted by object-overhead during calculation. Not suitable for streaming case due to high complexity.
- Two-Pointer Search (Variant 2): Uses two sorted edge arrays and two pointers.
- Complexity (Batch): \(O(N \log N)\) setup, \(O(N)\) calculation.
- Complexity (Streaming): \(O(N \log N)\) per
insert/put operation.
- Verdict: Suitable for batch case due to best-in-class ~7ms benchmark with static intervals. Not suitable for streaming case due to high complexity.
Optimal Approaches for Streaming Data (Dynamic Intervals)
- Augmented AVL Interval Tree (Variant 3): A stateful, self-balancing BST.
- Complexity (Batch): \(O(N \log N)\) setup, \(O(N)\) calculation.
- Complexity (Streaming): \(O(\log N)\) per
insert/put operation.
- Verdict: Not suitable for batch case due to poor ~4130ms benchmark with static intervals impacted by object-overhead during calculation. Suitable for streaming case due to best-in-class complexity per
insert/put operation. Notice the optimal approaches for batch data merging are easier to maintain, with 90% fewer LOC (~25 vs ~250)!
3. Design Trade-offs
The Efficiency Paradox
In benchmarks with static intervals, the Two-Pointer Search (Variant 2) was 400x faster than the Interval Tree (Variant 3). However, in Online Streaming Environments, the Two-Pointer Search overall complexity degrades to \(O(N^2 \log N)\) over a series of \(N\) updates while the Interval Tree overall complexity maintains \(O(N \log N)\).
Streaming Benchmark Results
In benchmarks with dynamic intervals up to \(N=3000\), the Interval Tree (Variant 3) was as much as 50x faster than the Two-Pointer Search (Variant 2). While the Two-Pointer Search (Variant 2) is faster for a single batch run, it fails to scale in an Online Streaming Environment where the merged state must be updated for every new interval. As \(N\) increases, the Two-Pointer Search runtime explodes quadratically while the Interval Tree runtime continues linearly.
| Input Size (N) |
Batch-Style (s) |
Tree-Style (s) |
Speedup |
| 100 |
0.0015 |
0.0021 |
0.7x |
| 500 |
0.0359 |
0.0140 |
2.6x |
| 1000 |
0.1501 |
0.0230 |
6.5x |
| 2000 |
0.6743 |
0.0383 |
17.6x |
| 3000 |
1.6093 |
0.0318 |
50.6x |
4. Design Optimizations
By augmenting the BST with subtree_max metadata, I reduced the search-and-merge task from a linear scan to a logarithmic search. This allows the streaming platform to maintain a "Live" merged state with \(O(\log N)\) latency per request.
Invariant-Driven Correctness
To ensure system reliability during high-frequency updates, I prioritized Correctness by Design by explicitly maintaining the AVL balance and metadata invariants during every mutation (insert, delete).
Architectural Hardening
By implementing a Sentinel Node Architecture with a Two-Down AVL balancing subtree rotation strategy and removing the need for sentinel node checks throughout the codebase, I simplified the balancing feature and provided a cleaner and more maintainable implementation.
Python Object-Model Tuning
To reduce object-overhead due to the Python object-model, I implemented Attribute Flattening. After replacing packed collections (self.interval[0]) with discrete integers (self.lo, self.hi), I reduced constant factors in the hot execution path and achieved a 15% reduction in benchmarks with static intervals.
5. API Reference
data_streaming_accelerators.core.dynamic_interval_management
Classes
DynamicIntervalManagementV1
Bases: DynamicIntervalManagementBase
Source code in src/data_streaming_accelerators/core/dynamic_interval_management.py
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366 | class DynamicIntervalManagementV1(DynamicIntervalManagementBase):
@validators.validate_args([None, check_intervals])
def merge(self, intervals: List[List[int]]) -> List[List[int]]:
"""Entry point for `merge` api.
Takes runtime O(N Log N) and memory O(N) for N items."""
n = len(intervals)
sorted_edge_weights = [None] * (2*n)
for i, (lo, hi) in enumerate(intervals):
sorted_edge_weights[2*i] = (lo, -1)
sorted_edge_weights[2*i+1] = (hi, 1)
sorted_edge_weights.sort() # sweep-line approach
merged_intervals = [None] * n; count_intervals = 0
interval_lo, interval_weight = 0, 0
for edge, weight in sorted_edge_weights:
if weight == -1 and interval_weight == 0:
interval_lo = edge
interval_weight += weight
if weight == 1 and interval_weight == 0:
merged_intervals[count_intervals] = (interval_lo, edge)
count_intervals += 1
return merged_intervals[:count_intervals]
|
Functions
merge(intervals)
Entry point for merge api.
Takes runtime O(N Log N) and memory O(N) for N items.
Source code in src/data_streaming_accelerators/core/dynamic_interval_management.py
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366 | @validators.validate_args([None, check_intervals])
def merge(self, intervals: List[List[int]]) -> List[List[int]]:
"""Entry point for `merge` api.
Takes runtime O(N Log N) and memory O(N) for N items."""
n = len(intervals)
sorted_edge_weights = [None] * (2*n)
for i, (lo, hi) in enumerate(intervals):
sorted_edge_weights[2*i] = (lo, -1)
sorted_edge_weights[2*i+1] = (hi, 1)
sorted_edge_weights.sort() # sweep-line approach
merged_intervals = [None] * n; count_intervals = 0
interval_lo, interval_weight = 0, 0
for edge, weight in sorted_edge_weights:
if weight == -1 and interval_weight == 0:
interval_lo = edge
interval_weight += weight
if weight == 1 and interval_weight == 0:
merged_intervals[count_intervals] = (interval_lo, edge)
count_intervals += 1
return merged_intervals[:count_intervals]
|
DynamicIntervalManagementV2
Bases: DynamicIntervalManagementBase
Source code in src/data_streaming_accelerators/core/dynamic_interval_management.py
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340 | class DynamicIntervalManagementV2(DynamicIntervalManagementBase):
@validators.validate_args([None, check_intervals])
def merge(self, intervals: List[List[int]]) -> List[List[int]]:
"""Entry point for `merge` api.
Takes runtime O(N Log N) and memory O(N) for N items."""
n = len(intervals)
sorted_lo_edges = [None] * n
for i, (lo, _) in enumerate(intervals):
sorted_lo_edges[i] = lo
sorted_lo_edges.sort()
sorted_hi_edges = [None] * n
for i, (_, hi) in enumerate(intervals):
sorted_hi_edges[i] = hi
sorted_hi_edges.sort()
merged_intervals = [None] * n; count_intervals = 0
interval_lo, interval_weight = 0, 0
p1, p2 = 0, 0 # two-pointer search
while p1 < n and p2 < n:
lo, hi = sorted_lo_edges[p1], sorted_hi_edges[p2]
if lo <= hi:
if interval_weight == 0:
interval_lo = lo
interval_weight -= 1; p1 += 1
else:
interval_weight += 1; p2 += 1
if interval_weight == 0:
merged_intervals[count_intervals] = (interval_lo, hi)
count_intervals += 1
while p2 < n:
lo, hi = sorted_lo_edges[-1], sorted_hi_edges[p2]
interval_weight += 1; p2 += 1
if interval_weight == 0:
merged_intervals[count_intervals] = (interval_lo, hi)
count_intervals += 1
return merged_intervals[:count_intervals]
|
Functions
merge(intervals)
Entry point for merge api.
Takes runtime O(N Log N) and memory O(N) for N items.
Source code in src/data_streaming_accelerators/core/dynamic_interval_management.py
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340 | @validators.validate_args([None, check_intervals])
def merge(self, intervals: List[List[int]]) -> List[List[int]]:
"""Entry point for `merge` api.
Takes runtime O(N Log N) and memory O(N) for N items."""
n = len(intervals)
sorted_lo_edges = [None] * n
for i, (lo, _) in enumerate(intervals):
sorted_lo_edges[i] = lo
sorted_lo_edges.sort()
sorted_hi_edges = [None] * n
for i, (_, hi) in enumerate(intervals):
sorted_hi_edges[i] = hi
sorted_hi_edges.sort()
merged_intervals = [None] * n; count_intervals = 0
interval_lo, interval_weight = 0, 0
p1, p2 = 0, 0 # two-pointer search
while p1 < n and p2 < n:
lo, hi = sorted_lo_edges[p1], sorted_hi_edges[p2]
if lo <= hi:
if interval_weight == 0:
interval_lo = lo
interval_weight -= 1; p1 += 1
else:
interval_weight += 1; p2 += 1
if interval_weight == 0:
merged_intervals[count_intervals] = (interval_lo, hi)
count_intervals += 1
while p2 < n:
lo, hi = sorted_lo_edges[-1], sorted_hi_edges[p2]
interval_weight += 1; p2 += 1
if interval_weight == 0:
merged_intervals[count_intervals] = (interval_lo, hi)
count_intervals += 1
return merged_intervals[:count_intervals]
|
DynamicIntervalManagementV3
Bases: DynamicIntervalManagementBase
Source code in src/data_streaming_accelerators/core/dynamic_interval_management.py
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299 | class DynamicIntervalManagementV3(DynamicIntervalManagementBase):
class IntervalMergeTreeAVLBSTNode:
SENTINEL_INTERVAL = (10**4+1, -1)
def __init__(self, **kw) -> None:
"""Make new node for interval merge tree.
Takes runtime O(1) and memory O(1)."""
self.from_parent = kw.get("from_parent", None)
self.from_parent_as_right = kw.get("from_parent_as_right", 0)
self.to_children_left = kw.get("to_children_left", None)
self.to_children_right = kw.get("to_children_right", None)
self.subtree_height = kw.get("subtree_height", 1)
self.subtree_max = kw.get("subtree_max", 0)
self.node_interval_lo = kw.get("node_interval_lo", -1)
self.node_interval_hi = kw.get("node_interval_hi", -1)
def interval_traversal(self) -> List[List[int]]:
"""Make traversal list in interval merge tree.
Takes runtime O(N) and memory O(N)."""
traversal_list = []
stack = []; p = self # iterative approach.
while stack or p is not None: # in-order (pointer-based).
# 1. Discover left child
while p is not None:
stack.append(p)
p = p.to_children_left
# 2. Explore current node
p = stack.pop()
if ((interval := (p.node_interval_lo, p.node_interval_hi))
!= self.__class__.SENTINEL_INTERVAL):
traversal_list.append(interval)
# 3. Discover right child
p = p.to_children_right
return traversal_list
def interval_find(self, query_lo: int, query_hi: int) -> Optional[Self]:
"""Find a matching node (overlap) in interval merge tree.
Takes runtime O(Log N) and memory O(1) on average."""
stack = []; p = self # iterative approach.
while stack or p is not None: # in-order (pointer-based).
# 1. Discover left child.
while p is not None:
stack.append(p)
if (p.to_children_left is not None
and p.to_children_left.subtree_max >= query_lo):
p = p.to_children_left
else:
p = None
# 2. Explore current node.
p = stack.pop()
if ((node_interval := (p.node_interval_lo, p.node_interval_hi))
!= self.__class__.SENTINEL_INTERVAL
and p.node_interval_lo <= query_hi
and query_lo <= p.node_interval_hi):
return p
# 3. Discover right child.
if (p.to_children_right is not None
and p.to_children_right.subtree_max >= query_lo):
p = p.to_children_right
else:
p = None
return None
def interval_insert(self, other_node: Self) -> None:
"""Insert a new node in interval merge tree.
Takes runtime O(Log N) and memory O(1) on average."""
# 1. Remove any existing overlaps.
merged_lo = other_node.node_interval_lo
merged_hi = other_node.node_interval_hi
while (p := self.interval_find(merged_lo, merged_hi)) is not None:
merged_lo = min(merged_lo, p.node_interval_lo)
merged_hi = max(merged_hi, p.node_interval_hi)
p.interval_delete()
other_node.node_interval_lo = merged_lo
other_node.node_interval_hi = merged_hi
other_node.balancing_node_metadata()
# 2. Insert the expanded merge interval.
p = self # iterative approach.
while p is not None: # directional search (pointer-based).
if merged_lo < p.node_interval_lo:
if p.to_children_left is None:
p.to_children_left = other_node
other_node.from_parent = p
other_node.from_parent_as_right = 0
break
p = p.to_children_left
else:
if p.to_children_right is None:
p.to_children_right = other_node
other_node.from_parent = p
other_node.from_parent_as_right = 1
break
p = p.to_children_right
# 3. Update metadata from bottom to top.
p = other_node
while p is not None:
p.balancing_node_metadata()
p.balancing_subtree_rotation()
p = p.from_parent
def interval_delete(self) -> None:
"""Delete an existing node in interval merge tree.
Takes runtime O(Log N) and memory O(1) on average."""
def _inorder_next(node) -> Optional[Self]:
# 1A. Either search down from right child to find left leaf
if node.to_children_right is not None:
p = node.to_children_right
while p.to_children_left is not None:
p = p.to_children_left
return p
# 1B. Or search up from parent to find lowest ancestor
# that parents the subtree as left child.
else:
p = node
while p.from_parent is not None:
if p.from_parent_as_right == 0:
return p.from_parent
p = p.from_parent
# 1C. Or return None
return None
parent, parent_as_right = self.from_parent, self.from_parent_as_right
child_l, child_r = self.to_children_left, self.to_children_right
update_from = None
# 1A. Either remove the node if it has no child nodes.
if child_l is None and child_r is None:
if parent is not None:
if not parent_as_right:
parent.to_children_left = None
else:
parent.to_children_right = None
update_from = parent
self.from_parent, self.from_parent_as_right = None, -1
# 1B. Or replace it with its child if it has one child node.
elif child_l is None or child_r is None:
child = child_l if child_l is not None else child_r
if parent is not None:
if not parent_as_right:
parent.to_children_left = child
else:
parent.to_children_right = child
child.from_parent = parent
child.from_parent_as_right = parent_as_right
update_from = parent
self.from_parent, self.from_parent_as_right = None, -1
# 1C. Or swap it with its successor if it has two child nodes.
else:
successor = _inorder_next(self)
self.node_interval_lo = successor.node_interval_lo
self.node_interval_hi = successor.node_interval_hi
successor.interval_delete() # updates metadata for self
return
# 2. Update metadata from bottom to top.
p = update_from
while p is not None:
p.balancing_node_metadata()
p.balancing_subtree_rotation()
p = p.from_parent
def balancing_node_metadata(self) -> None:
"""Update metadata in node for self-balancing BST.
Takes runtime O(1) and memory O(1)."""
child_l, child_r = self.to_children_left, self.to_children_right
# 1. Update subtree height from children.
height_l = 0 if child_l is None else child_l.subtree_height
height_r = 0 if child_r is None else child_r.subtree_height
self.subtree_height = 1 + max(height_l, height_r)
# 2. Update subtree max from children.
max_l = -1 if child_l is None else child_l.subtree_max
max_r = -1 if child_r is None else child_r.subtree_max
self.subtree_max = max(self.node_interval_hi, max_l, max_r)
def balancing_subtree_rotation(self) -> None:
"""Update rotation in subtrees for self-balancing BST.
Takes runtime O(1) and memory O(1)."""
def _get_balance(node):
child_l = node.to_children_left
child_r = node.to_children_right
# 1. Calculate balance from children.
height_l = 0 if child_l is None else child_l.subtree_height
height_r = 0 if child_r is None else child_r.subtree_height
return height_l - height_r
def _rotate_left(rotated_down):
rotated_up = rotated_down.to_children_right
parent = rotated_down.from_parent
parent_as_right = rotated_down.from_parent_as_right
# 1. Move rotated_up left to rotated_down right
middle_subtree = rotated_up.to_children_left
rotated_down.to_children_right = middle_subtree
if middle_subtree is not None:
middle_subtree.from_parent = rotated_down
middle_subtree.from_parent_as_right = 1
# 2. Promote rotated_up
rotated_up.from_parent = parent
rotated_up.from_parent_as_right = parent_as_right
if parent is not None:
if not parent_as_right:
parent.to_children_left = rotated_up
else:
parent.to_children_right = rotated_up
# 3. Demote rotated_down
rotated_up.to_children_left = rotated_down
rotated_down.from_parent = rotated_up
rotated_down.from_parent_as_right = 0
# 4. Update metadata from bottom to top
rotated_down.balancing_node_metadata()
rotated_up.balancing_node_metadata()
def _rotate_right(rotated_down):
rotated_up = rotated_down.to_children_left
parent = rotated_down.from_parent
parent_as_right = rotated_down.from_parent_as_right
# 1. Move rotated_up right to rotated_down left
middle_subtree = rotated_up.to_children_right
rotated_down.to_children_left = middle_subtree
if middle_subtree is not None:
middle_subtree.from_parent = rotated_down
middle_subtree.from_parent_as_right = 0
# 2. Promote rotated_up
rotated_up.from_parent = parent
rotated_up.from_parent_as_right = parent_as_right
if parent is not None:
if not parent_as_right:
parent.to_children_left = rotated_up
else:
parent.to_children_right = rotated_up
# 3. Demote rotated_down
rotated_up.to_children_right = rotated_down
rotated_down.from_parent = rotated_up
rotated_down.from_parent_as_right = 1
# 4. Update metadata from bottom to top
rotated_down.balancing_node_metadata()
rotated_up.balancing_node_metadata()
def _rotate_subtree(node):
balance = _get_balance(node)
child_l = node.to_children_left
child_r = node.to_children_right
balance_l = 0 if child_l is None else _get_balance(child_l)
balance_r = 0 if child_r is None else _get_balance(child_r)
# 1A. Either handle node insertion to left subtree of left child.
# This is the "Left-Left Case".
if balance_l == 1 and balance > 1:
_rotate_right(node)
# 1B. Or handle node insertion to right subtree of right child.
# This is the "Right-Right Case".
elif balance_r == -1 and balance < -1:
_rotate_left(node)
# 1C. Or handle node insertion to right subtree of left child.
# This is the "Left-Right Case".
elif balance_l == -1 and balance > 1:
_rotate_left(node.to_children_left)
_rotate_right(node)
# 1D. Or handle node insertion to left subtree of right child.
# This is the "Right-Left Case".
elif balance_r == 1 and balance < -1:
_rotate_right(node.to_children_right)
_rotate_left(node)
child_l = self.to_children_left
child_r = self.to_children_right
# 1. Update rotation in left subtree.
if child_l is not None:
_rotate_subtree(child_l)
# 2. Update rotation in right subtree.
if child_r is not None:
_rotate_subtree(child_r)
@validators.validate_args([None, check_intervals])
def merge(self, intervals: List[List[int]]) -> List[List[int]]:
"""Entry point for `merge` api.
Takes runtime O(N Log N) and memory O(N) for N items."""
Node = self.__class__.IntervalMergeTreeAVLBSTNode # type alias
sentinel_lo, sentinel_hi = Node.SENTINEL_INTERVAL
sent = Node(node_interval_lo=sentinel_lo,
node_interval_hi=sentinel_hi, subtree_max=sentinel_hi)
for lo, hi in intervals:
sent.interval_insert(Node(node_interval_lo=lo,
node_interval_hi=hi, subtree_max=hi))
traversal = sent.to_children_left.interval_traversal()
return None if not traversal else sorted(traversal) # sorting optional
|
Classes
IntervalMergeTreeAVLBSTNode
Source code in src/data_streaming_accelerators/core/dynamic_interval_management.py
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284 | class IntervalMergeTreeAVLBSTNode:
SENTINEL_INTERVAL = (10**4+1, -1)
def __init__(self, **kw) -> None:
"""Make new node for interval merge tree.
Takes runtime O(1) and memory O(1)."""
self.from_parent = kw.get("from_parent", None)
self.from_parent_as_right = kw.get("from_parent_as_right", 0)
self.to_children_left = kw.get("to_children_left", None)
self.to_children_right = kw.get("to_children_right", None)
self.subtree_height = kw.get("subtree_height", 1)
self.subtree_max = kw.get("subtree_max", 0)
self.node_interval_lo = kw.get("node_interval_lo", -1)
self.node_interval_hi = kw.get("node_interval_hi", -1)
def interval_traversal(self) -> List[List[int]]:
"""Make traversal list in interval merge tree.
Takes runtime O(N) and memory O(N)."""
traversal_list = []
stack = []; p = self # iterative approach.
while stack or p is not None: # in-order (pointer-based).
# 1. Discover left child
while p is not None:
stack.append(p)
p = p.to_children_left
# 2. Explore current node
p = stack.pop()
if ((interval := (p.node_interval_lo, p.node_interval_hi))
!= self.__class__.SENTINEL_INTERVAL):
traversal_list.append(interval)
# 3. Discover right child
p = p.to_children_right
return traversal_list
def interval_find(self, query_lo: int, query_hi: int) -> Optional[Self]:
"""Find a matching node (overlap) in interval merge tree.
Takes runtime O(Log N) and memory O(1) on average."""
stack = []; p = self # iterative approach.
while stack or p is not None: # in-order (pointer-based).
# 1. Discover left child.
while p is not None:
stack.append(p)
if (p.to_children_left is not None
and p.to_children_left.subtree_max >= query_lo):
p = p.to_children_left
else:
p = None
# 2. Explore current node.
p = stack.pop()
if ((node_interval := (p.node_interval_lo, p.node_interval_hi))
!= self.__class__.SENTINEL_INTERVAL
and p.node_interval_lo <= query_hi
and query_lo <= p.node_interval_hi):
return p
# 3. Discover right child.
if (p.to_children_right is not None
and p.to_children_right.subtree_max >= query_lo):
p = p.to_children_right
else:
p = None
return None
def interval_insert(self, other_node: Self) -> None:
"""Insert a new node in interval merge tree.
Takes runtime O(Log N) and memory O(1) on average."""
# 1. Remove any existing overlaps.
merged_lo = other_node.node_interval_lo
merged_hi = other_node.node_interval_hi
while (p := self.interval_find(merged_lo, merged_hi)) is not None:
merged_lo = min(merged_lo, p.node_interval_lo)
merged_hi = max(merged_hi, p.node_interval_hi)
p.interval_delete()
other_node.node_interval_lo = merged_lo
other_node.node_interval_hi = merged_hi
other_node.balancing_node_metadata()
# 2. Insert the expanded merge interval.
p = self # iterative approach.
while p is not None: # directional search (pointer-based).
if merged_lo < p.node_interval_lo:
if p.to_children_left is None:
p.to_children_left = other_node
other_node.from_parent = p
other_node.from_parent_as_right = 0
break
p = p.to_children_left
else:
if p.to_children_right is None:
p.to_children_right = other_node
other_node.from_parent = p
other_node.from_parent_as_right = 1
break
p = p.to_children_right
# 3. Update metadata from bottom to top.
p = other_node
while p is not None:
p.balancing_node_metadata()
p.balancing_subtree_rotation()
p = p.from_parent
def interval_delete(self) -> None:
"""Delete an existing node in interval merge tree.
Takes runtime O(Log N) and memory O(1) on average."""
def _inorder_next(node) -> Optional[Self]:
# 1A. Either search down from right child to find left leaf
if node.to_children_right is not None:
p = node.to_children_right
while p.to_children_left is not None:
p = p.to_children_left
return p
# 1B. Or search up from parent to find lowest ancestor
# that parents the subtree as left child.
else:
p = node
while p.from_parent is not None:
if p.from_parent_as_right == 0:
return p.from_parent
p = p.from_parent
# 1C. Or return None
return None
parent, parent_as_right = self.from_parent, self.from_parent_as_right
child_l, child_r = self.to_children_left, self.to_children_right
update_from = None
# 1A. Either remove the node if it has no child nodes.
if child_l is None and child_r is None:
if parent is not None:
if not parent_as_right:
parent.to_children_left = None
else:
parent.to_children_right = None
update_from = parent
self.from_parent, self.from_parent_as_right = None, -1
# 1B. Or replace it with its child if it has one child node.
elif child_l is None or child_r is None:
child = child_l if child_l is not None else child_r
if parent is not None:
if not parent_as_right:
parent.to_children_left = child
else:
parent.to_children_right = child
child.from_parent = parent
child.from_parent_as_right = parent_as_right
update_from = parent
self.from_parent, self.from_parent_as_right = None, -1
# 1C. Or swap it with its successor if it has two child nodes.
else:
successor = _inorder_next(self)
self.node_interval_lo = successor.node_interval_lo
self.node_interval_hi = successor.node_interval_hi
successor.interval_delete() # updates metadata for self
return
# 2. Update metadata from bottom to top.
p = update_from
while p is not None:
p.balancing_node_metadata()
p.balancing_subtree_rotation()
p = p.from_parent
def balancing_node_metadata(self) -> None:
"""Update metadata in node for self-balancing BST.
Takes runtime O(1) and memory O(1)."""
child_l, child_r = self.to_children_left, self.to_children_right
# 1. Update subtree height from children.
height_l = 0 if child_l is None else child_l.subtree_height
height_r = 0 if child_r is None else child_r.subtree_height
self.subtree_height = 1 + max(height_l, height_r)
# 2. Update subtree max from children.
max_l = -1 if child_l is None else child_l.subtree_max
max_r = -1 if child_r is None else child_r.subtree_max
self.subtree_max = max(self.node_interval_hi, max_l, max_r)
def balancing_subtree_rotation(self) -> None:
"""Update rotation in subtrees for self-balancing BST.
Takes runtime O(1) and memory O(1)."""
def _get_balance(node):
child_l = node.to_children_left
child_r = node.to_children_right
# 1. Calculate balance from children.
height_l = 0 if child_l is None else child_l.subtree_height
height_r = 0 if child_r is None else child_r.subtree_height
return height_l - height_r
def _rotate_left(rotated_down):
rotated_up = rotated_down.to_children_right
parent = rotated_down.from_parent
parent_as_right = rotated_down.from_parent_as_right
# 1. Move rotated_up left to rotated_down right
middle_subtree = rotated_up.to_children_left
rotated_down.to_children_right = middle_subtree
if middle_subtree is not None:
middle_subtree.from_parent = rotated_down
middle_subtree.from_parent_as_right = 1
# 2. Promote rotated_up
rotated_up.from_parent = parent
rotated_up.from_parent_as_right = parent_as_right
if parent is not None:
if not parent_as_right:
parent.to_children_left = rotated_up
else:
parent.to_children_right = rotated_up
# 3. Demote rotated_down
rotated_up.to_children_left = rotated_down
rotated_down.from_parent = rotated_up
rotated_down.from_parent_as_right = 0
# 4. Update metadata from bottom to top
rotated_down.balancing_node_metadata()
rotated_up.balancing_node_metadata()
def _rotate_right(rotated_down):
rotated_up = rotated_down.to_children_left
parent = rotated_down.from_parent
parent_as_right = rotated_down.from_parent_as_right
# 1. Move rotated_up right to rotated_down left
middle_subtree = rotated_up.to_children_right
rotated_down.to_children_left = middle_subtree
if middle_subtree is not None:
middle_subtree.from_parent = rotated_down
middle_subtree.from_parent_as_right = 0
# 2. Promote rotated_up
rotated_up.from_parent = parent
rotated_up.from_parent_as_right = parent_as_right
if parent is not None:
if not parent_as_right:
parent.to_children_left = rotated_up
else:
parent.to_children_right = rotated_up
# 3. Demote rotated_down
rotated_up.to_children_right = rotated_down
rotated_down.from_parent = rotated_up
rotated_down.from_parent_as_right = 1
# 4. Update metadata from bottom to top
rotated_down.balancing_node_metadata()
rotated_up.balancing_node_metadata()
def _rotate_subtree(node):
balance = _get_balance(node)
child_l = node.to_children_left
child_r = node.to_children_right
balance_l = 0 if child_l is None else _get_balance(child_l)
balance_r = 0 if child_r is None else _get_balance(child_r)
# 1A. Either handle node insertion to left subtree of left child.
# This is the "Left-Left Case".
if balance_l == 1 and balance > 1:
_rotate_right(node)
# 1B. Or handle node insertion to right subtree of right child.
# This is the "Right-Right Case".
elif balance_r == -1 and balance < -1:
_rotate_left(node)
# 1C. Or handle node insertion to right subtree of left child.
# This is the "Left-Right Case".
elif balance_l == -1 and balance > 1:
_rotate_left(node.to_children_left)
_rotate_right(node)
# 1D. Or handle node insertion to left subtree of right child.
# This is the "Right-Left Case".
elif balance_r == 1 and balance < -1:
_rotate_right(node.to_children_right)
_rotate_left(node)
child_l = self.to_children_left
child_r = self.to_children_right
# 1. Update rotation in left subtree.
if child_l is not None:
_rotate_subtree(child_l)
# 2. Update rotation in right subtree.
if child_r is not None:
_rotate_subtree(child_r)
|
Functions
__init__(**kw)
Make new node for interval merge tree.
Takes runtime O(1) and memory O(1).
Source code in src/data_streaming_accelerators/core/dynamic_interval_management.py
19
20
21
22
23
24
25
26
27
28
29 | def __init__(self, **kw) -> None:
"""Make new node for interval merge tree.
Takes runtime O(1) and memory O(1)."""
self.from_parent = kw.get("from_parent", None)
self.from_parent_as_right = kw.get("from_parent_as_right", 0)
self.to_children_left = kw.get("to_children_left", None)
self.to_children_right = kw.get("to_children_right", None)
self.subtree_height = kw.get("subtree_height", 1)
self.subtree_max = kw.get("subtree_max", 0)
self.node_interval_lo = kw.get("node_interval_lo", -1)
self.node_interval_hi = kw.get("node_interval_hi", -1)
|
balancing_node_metadata()
Update metadata in node for self-balancing BST.
Takes runtime O(1) and memory O(1).
Source code in src/data_streaming_accelerators/core/dynamic_interval_management.py
175
176
177
178
179
180
181
182
183
184
185
186 | def balancing_node_metadata(self) -> None:
"""Update metadata in node for self-balancing BST.
Takes runtime O(1) and memory O(1)."""
child_l, child_r = self.to_children_left, self.to_children_right
# 1. Update subtree height from children.
height_l = 0 if child_l is None else child_l.subtree_height
height_r = 0 if child_r is None else child_r.subtree_height
self.subtree_height = 1 + max(height_l, height_r)
# 2. Update subtree max from children.
max_l = -1 if child_l is None else child_l.subtree_max
max_r = -1 if child_r is None else child_r.subtree_max
self.subtree_max = max(self.node_interval_hi, max_l, max_r)
|
balancing_subtree_rotation()
Update rotation in subtrees for self-balancing BST.
Takes runtime O(1) and memory O(1).
Source code in src/data_streaming_accelerators/core/dynamic_interval_management.py
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284 | def balancing_subtree_rotation(self) -> None:
"""Update rotation in subtrees for self-balancing BST.
Takes runtime O(1) and memory O(1)."""
def _get_balance(node):
child_l = node.to_children_left
child_r = node.to_children_right
# 1. Calculate balance from children.
height_l = 0 if child_l is None else child_l.subtree_height
height_r = 0 if child_r is None else child_r.subtree_height
return height_l - height_r
def _rotate_left(rotated_down):
rotated_up = rotated_down.to_children_right
parent = rotated_down.from_parent
parent_as_right = rotated_down.from_parent_as_right
# 1. Move rotated_up left to rotated_down right
middle_subtree = rotated_up.to_children_left
rotated_down.to_children_right = middle_subtree
if middle_subtree is not None:
middle_subtree.from_parent = rotated_down
middle_subtree.from_parent_as_right = 1
# 2. Promote rotated_up
rotated_up.from_parent = parent
rotated_up.from_parent_as_right = parent_as_right
if parent is not None:
if not parent_as_right:
parent.to_children_left = rotated_up
else:
parent.to_children_right = rotated_up
# 3. Demote rotated_down
rotated_up.to_children_left = rotated_down
rotated_down.from_parent = rotated_up
rotated_down.from_parent_as_right = 0
# 4. Update metadata from bottom to top
rotated_down.balancing_node_metadata()
rotated_up.balancing_node_metadata()
def _rotate_right(rotated_down):
rotated_up = rotated_down.to_children_left
parent = rotated_down.from_parent
parent_as_right = rotated_down.from_parent_as_right
# 1. Move rotated_up right to rotated_down left
middle_subtree = rotated_up.to_children_right
rotated_down.to_children_left = middle_subtree
if middle_subtree is not None:
middle_subtree.from_parent = rotated_down
middle_subtree.from_parent_as_right = 0
# 2. Promote rotated_up
rotated_up.from_parent = parent
rotated_up.from_parent_as_right = parent_as_right
if parent is not None:
if not parent_as_right:
parent.to_children_left = rotated_up
else:
parent.to_children_right = rotated_up
# 3. Demote rotated_down
rotated_up.to_children_right = rotated_down
rotated_down.from_parent = rotated_up
rotated_down.from_parent_as_right = 1
# 4. Update metadata from bottom to top
rotated_down.balancing_node_metadata()
rotated_up.balancing_node_metadata()
def _rotate_subtree(node):
balance = _get_balance(node)
child_l = node.to_children_left
child_r = node.to_children_right
balance_l = 0 if child_l is None else _get_balance(child_l)
balance_r = 0 if child_r is None else _get_balance(child_r)
# 1A. Either handle node insertion to left subtree of left child.
# This is the "Left-Left Case".
if balance_l == 1 and balance > 1:
_rotate_right(node)
# 1B. Or handle node insertion to right subtree of right child.
# This is the "Right-Right Case".
elif balance_r == -1 and balance < -1:
_rotate_left(node)
# 1C. Or handle node insertion to right subtree of left child.
# This is the "Left-Right Case".
elif balance_l == -1 and balance > 1:
_rotate_left(node.to_children_left)
_rotate_right(node)
# 1D. Or handle node insertion to left subtree of right child.
# This is the "Right-Left Case".
elif balance_r == 1 and balance < -1:
_rotate_right(node.to_children_right)
_rotate_left(node)
child_l = self.to_children_left
child_r = self.to_children_right
# 1. Update rotation in left subtree.
if child_l is not None:
_rotate_subtree(child_l)
# 2. Update rotation in right subtree.
if child_r is not None:
_rotate_subtree(child_r)
|
interval_delete()
Delete an existing node in interval merge tree.
Takes runtime O(Log N) and memory O(1) on average.
Source code in src/data_streaming_accelerators/core/dynamic_interval_management.py
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173 | def interval_delete(self) -> None:
"""Delete an existing node in interval merge tree.
Takes runtime O(Log N) and memory O(1) on average."""
def _inorder_next(node) -> Optional[Self]:
# 1A. Either search down from right child to find left leaf
if node.to_children_right is not None:
p = node.to_children_right
while p.to_children_left is not None:
p = p.to_children_left
return p
# 1B. Or search up from parent to find lowest ancestor
# that parents the subtree as left child.
else:
p = node
while p.from_parent is not None:
if p.from_parent_as_right == 0:
return p.from_parent
p = p.from_parent
# 1C. Or return None
return None
parent, parent_as_right = self.from_parent, self.from_parent_as_right
child_l, child_r = self.to_children_left, self.to_children_right
update_from = None
# 1A. Either remove the node if it has no child nodes.
if child_l is None and child_r is None:
if parent is not None:
if not parent_as_right:
parent.to_children_left = None
else:
parent.to_children_right = None
update_from = parent
self.from_parent, self.from_parent_as_right = None, -1
# 1B. Or replace it with its child if it has one child node.
elif child_l is None or child_r is None:
child = child_l if child_l is not None else child_r
if parent is not None:
if not parent_as_right:
parent.to_children_left = child
else:
parent.to_children_right = child
child.from_parent = parent
child.from_parent_as_right = parent_as_right
update_from = parent
self.from_parent, self.from_parent_as_right = None, -1
# 1C. Or swap it with its successor if it has two child nodes.
else:
successor = _inorder_next(self)
self.node_interval_lo = successor.node_interval_lo
self.node_interval_hi = successor.node_interval_hi
successor.interval_delete() # updates metadata for self
return
# 2. Update metadata from bottom to top.
p = update_from
while p is not None:
p.balancing_node_metadata()
p.balancing_subtree_rotation()
p = p.from_parent
|
interval_find(query_lo, query_hi)
Find a matching node (overlap) in interval merge tree.
Takes runtime O(Log N) and memory O(1) on average.
Source code in src/data_streaming_accelerators/core/dynamic_interval_management.py
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76 | def interval_find(self, query_lo: int, query_hi: int) -> Optional[Self]:
"""Find a matching node (overlap) in interval merge tree.
Takes runtime O(Log N) and memory O(1) on average."""
stack = []; p = self # iterative approach.
while stack or p is not None: # in-order (pointer-based).
# 1. Discover left child.
while p is not None:
stack.append(p)
if (p.to_children_left is not None
and p.to_children_left.subtree_max >= query_lo):
p = p.to_children_left
else:
p = None
# 2. Explore current node.
p = stack.pop()
if ((node_interval := (p.node_interval_lo, p.node_interval_hi))
!= self.__class__.SENTINEL_INTERVAL
and p.node_interval_lo <= query_hi
and query_lo <= p.node_interval_hi):
return p
# 3. Discover right child.
if (p.to_children_right is not None
and p.to_children_right.subtree_max >= query_lo):
p = p.to_children_right
else:
p = None
return None
|
interval_insert(other_node)
Insert a new node in interval merge tree.
Takes runtime O(Log N) and memory O(1) on average.
Source code in src/data_streaming_accelerators/core/dynamic_interval_management.py
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113 | def interval_insert(self, other_node: Self) -> None:
"""Insert a new node in interval merge tree.
Takes runtime O(Log N) and memory O(1) on average."""
# 1. Remove any existing overlaps.
merged_lo = other_node.node_interval_lo
merged_hi = other_node.node_interval_hi
while (p := self.interval_find(merged_lo, merged_hi)) is not None:
merged_lo = min(merged_lo, p.node_interval_lo)
merged_hi = max(merged_hi, p.node_interval_hi)
p.interval_delete()
other_node.node_interval_lo = merged_lo
other_node.node_interval_hi = merged_hi
other_node.balancing_node_metadata()
# 2. Insert the expanded merge interval.
p = self # iterative approach.
while p is not None: # directional search (pointer-based).
if merged_lo < p.node_interval_lo:
if p.to_children_left is None:
p.to_children_left = other_node
other_node.from_parent = p
other_node.from_parent_as_right = 0
break
p = p.to_children_left
else:
if p.to_children_right is None:
p.to_children_right = other_node
other_node.from_parent = p
other_node.from_parent_as_right = 1
break
p = p.to_children_right
# 3. Update metadata from bottom to top.
p = other_node
while p is not None:
p.balancing_node_metadata()
p.balancing_subtree_rotation()
p = p.from_parent
|
interval_traversal()
Make traversal list in interval merge tree.
Takes runtime O(N) and memory O(N).
Source code in src/data_streaming_accelerators/core/dynamic_interval_management.py
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48 | def interval_traversal(self) -> List[List[int]]:
"""Make traversal list in interval merge tree.
Takes runtime O(N) and memory O(N)."""
traversal_list = []
stack = []; p = self # iterative approach.
while stack or p is not None: # in-order (pointer-based).
# 1. Discover left child
while p is not None:
stack.append(p)
p = p.to_children_left
# 2. Explore current node
p = stack.pop()
if ((interval := (p.node_interval_lo, p.node_interval_hi))
!= self.__class__.SENTINEL_INTERVAL):
traversal_list.append(interval)
# 3. Discover right child
p = p.to_children_right
return traversal_list
|
Functions
merge(intervals)
Entry point for merge api.
Takes runtime O(N Log N) and memory O(N) for N items.
Source code in src/data_streaming_accelerators/core/dynamic_interval_management.py
286
287
288
289
290
291
292
293
294
295
296
297
298
299 | @validators.validate_args([None, check_intervals])
def merge(self, intervals: List[List[int]]) -> List[List[int]]:
"""Entry point for `merge` api.
Takes runtime O(N Log N) and memory O(N) for N items."""
Node = self.__class__.IntervalMergeTreeAVLBSTNode # type alias
sentinel_lo, sentinel_hi = Node.SENTINEL_INTERVAL
sent = Node(node_interval_lo=sentinel_lo,
node_interval_hi=sentinel_hi, subtree_max=sentinel_hi)
for lo, hi in intervals:
sent.interval_insert(Node(node_interval_lo=lo,
node_interval_hi=hi, subtree_max=hi))
traversal = sent.to_children_left.interval_traversal()
return None if not traversal else sorted(traversal) # sorting optional
|