Coverage for adhoc-cicd-odoo-odoo / odoo / tools / intervals.py: 75%
90 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-09 18:05 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-09 18:05 +0000
1# Part of Odoo. See LICENSE file for full copyright and licensing details.
2from __future__ import annotations
4import itertools
5import typing
6import warnings
8if typing.TYPE_CHECKING:
9 from collections.abc import Iterable, Iterator
10 from collections.abc import Set as AbstractSet
12T = typing.TypeVar('T')
15def _boundaries(intervals: Intervals[T] | Iterable[tuple[T, T, AbstractSet]], opening: str, closing: str) -> Iterator[tuple[T, str, AbstractSet]]:
16 """ Iterate on the boundaries of intervals. """
17 for start, stop, recs in intervals:
18 if start < stop:
19 yield (start, opening, recs)
20 yield (stop, closing, recs)
23class Intervals(typing.Generic[T]):
24 """ Collection of ordered disjoint intervals with some associated records.
25 Each interval is a triple ``(start, stop, records)``, where ``records``
26 is a recordset.
28 By default, adjacent intervals are merged (1, 3, a) and (3, 5, b) become
29 (1, 5, a | b). This behaviour can be prevented by setting
30 `keep_distinct=True`.
32 """
33 def __init__(self, intervals: Iterable[tuple[T, T, AbstractSet]] | None = None, *, keep_distinct: bool = False):
34 self._items: list[tuple[T, T, AbstractSet]] = []
35 self._keep_distinct = keep_distinct
36 if intervals:
37 # normalize the representation of intervals
38 append = self._items.append
39 starts: list[T] = []
40 items: AbstractSet | None = None
41 if self._keep_distinct:
42 boundaries = sorted(_boundaries(sorted(intervals), 'start', 'stop'), key=lambda i: i[0])
43 else:
44 boundaries = sorted(_boundaries(intervals, 'start', 'stop'))
45 for value, flag, value_items in boundaries:
46 if flag == 'start':
47 starts.append(value)
48 if items is None: 48 ↛ 51line 48 didn't jump to line 51 because the condition on line 48 was always true
49 items = value_items
50 else:
51 items = items.union(value_items)
52 else:
53 start = starts.pop()
54 if not starts: 54 ↛ 45line 54 didn't jump to line 45 because the condition on line 54 was always true
55 append((start, value, items))
56 items = None
58 def __bool__(self):
59 return bool(self._items)
61 def __len__(self):
62 return len(self._items)
64 def __iter__(self):
65 return iter(self._items)
67 def __reversed__(self):
68 return reversed(self._items)
70 def __or__(self, other):
71 """ Return the union of two sets of intervals. """
72 return Intervals(itertools.chain(self._items, other._items), keep_distinct=self._keep_distinct)
74 def __and__(self, other):
75 """ Return the intersection of two sets of intervals. """
76 return self._merge(other, False)
78 def __sub__(self, other):
79 """ Return the difference of two sets of intervals. """
80 return self._merge(other, True)
82 def _merge(self, other: Intervals | Iterable[tuple[T, T, AbstractSet]], difference: bool) -> Intervals:
83 """ Return the difference or intersection of two sets of intervals. """
84 result = Intervals(keep_distinct=self._keep_distinct)
85 append = result._items.append
87 # using 'self' and 'other' below forces normalization
88 bounds1 = _boundaries(self, 'start', 'stop')
89 bounds2 = _boundaries(Intervals(other, keep_distinct=self._keep_distinct), 'switch', 'switch')
91 start = None # set by start/stop
92 recs1 = None # set by start
93 enabled = difference # changed by switch
94 if self._keep_distinct: 94 ↛ 97line 94 didn't jump to line 97 because the condition on line 94 was always true
95 bounds = sorted(itertools.chain(bounds1, bounds2), key=lambda i: i[0])
96 else:
97 bounds = sorted(itertools.chain(bounds1, bounds2))
98 for value, flag, recs in bounds:
99 if flag == 'start':
100 start = value
101 recs1 = recs
102 elif flag == 'stop':
103 if enabled and start < value: 103 ↛ 105line 103 didn't jump to line 105 because the condition on line 103 was always true
104 append((start, value, recs1))
105 start = None
106 else:
107 if not enabled and start is not None:
108 start = value
109 if enabled and start is not None and start < value: 109 ↛ 110line 109 didn't jump to line 110 because the condition on line 109 was never true
110 append((start, value, recs1))
111 enabled = not enabled
113 return result
115 def remove(self, interval):
116 """ Remove an interval from the set. """
117 warnings.warn("Deprecated since 19.0, do not mutate intervals", DeprecationWarning)
118 self._items.remove(interval)
120 def items(self):
121 """ Return the intervals. """
122 warnings.warn("Deprecated since 19.0, just iterate over Intervals", DeprecationWarning)
123 return self._items
126def intervals_overlap(interval_a: tuple[T, T], interval_b: tuple[T, T]) -> bool:
127 """Check whether intervals intersect.
129 :param interval_a:
130 :param interval_b:
131 :return: True if two non-zero intervals overlap
132 """
133 start_a, stop_a = interval_a
134 start_b, stop_b = interval_b
135 return start_a < stop_b and stop_a > start_b
138def invert_intervals(intervals: Iterable[tuple[T, T]], first_start: T, last_stop: T) -> list[tuple[T, T]]:
139 """Return the intervals between the intervals that were passed in.
141 The expected use case is to turn "available intervals" into "unavailable intervals".
142 :examples:
143 ([(1, 2), (4, 5)], 0, 10) -> [(0, 1), (2, 4), (5, 10)]
145 :param intervals:
146 :param first_start: start of whole interval
147 :param last_stop: stop of whole interval
148 """
149 items = []
150 prev_stop = first_start
151 for start, stop in sorted(intervals):
152 if prev_stop and prev_stop < start and start <= last_stop:
153 items.append((prev_stop, start))
154 prev_stop = max(prev_stop, stop)
155 if last_stop and prev_stop < last_stop:
156 items.append((prev_stop, last_stop))
157 # abuse Intervals to merge contiguous intervals
158 return [(start, stop) for start, stop, _ in Intervals([(start, stop, set()) for start, stop in items])]