Coverage for adhoc-cicd-odoo-odoo / odoo / tools / intervals.py: 75%

90 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-09 18:05 +0000

1# Part of Odoo. See LICENSE file for full copyright and licensing details. 

2from __future__ import annotations 

3 

4import itertools 

5import typing 

6import warnings 

7 

8if typing.TYPE_CHECKING: 

9 from collections.abc import Iterable, Iterator 

10 from collections.abc import Set as AbstractSet 

11 

12T = typing.TypeVar('T') 

13 

14 

15def _boundaries(intervals: Intervals[T] | Iterable[tuple[T, T, AbstractSet]], opening: str, closing: str) -> Iterator[tuple[T, str, AbstractSet]]: 

16 """ Iterate on the boundaries of intervals. """ 

17 for start, stop, recs in intervals: 

18 if start < stop: 

19 yield (start, opening, recs) 

20 yield (stop, closing, recs) 

21 

22 

23class Intervals(typing.Generic[T]): 

24 """ Collection of ordered disjoint intervals with some associated records. 

25 Each interval is a triple ``(start, stop, records)``, where ``records`` 

26 is a recordset. 

27 

28 By default, adjacent intervals are merged (1, 3, a) and (3, 5, b) become 

29 (1, 5, a | b). This behaviour can be prevented by setting 

30 `keep_distinct=True`. 

31 

32 """ 

33 def __init__(self, intervals: Iterable[tuple[T, T, AbstractSet]] | None = None, *, keep_distinct: bool = False): 

34 self._items: list[tuple[T, T, AbstractSet]] = [] 

35 self._keep_distinct = keep_distinct 

36 if intervals: 

37 # normalize the representation of intervals 

38 append = self._items.append 

39 starts: list[T] = [] 

40 items: AbstractSet | None = None 

41 if self._keep_distinct: 

42 boundaries = sorted(_boundaries(sorted(intervals), 'start', 'stop'), key=lambda i: i[0]) 

43 else: 

44 boundaries = sorted(_boundaries(intervals, 'start', 'stop')) 

45 for value, flag, value_items in boundaries: 

46 if flag == 'start': 

47 starts.append(value) 

48 if items is None: 48 ↛ 51line 48 didn't jump to line 51 because the condition on line 48 was always true

49 items = value_items 

50 else: 

51 items = items.union(value_items) 

52 else: 

53 start = starts.pop() 

54 if not starts: 54 ↛ 45line 54 didn't jump to line 45 because the condition on line 54 was always true

55 append((start, value, items)) 

56 items = None 

57 

58 def __bool__(self): 

59 return bool(self._items) 

60 

61 def __len__(self): 

62 return len(self._items) 

63 

64 def __iter__(self): 

65 return iter(self._items) 

66 

67 def __reversed__(self): 

68 return reversed(self._items) 

69 

70 def __or__(self, other): 

71 """ Return the union of two sets of intervals. """ 

72 return Intervals(itertools.chain(self._items, other._items), keep_distinct=self._keep_distinct) 

73 

74 def __and__(self, other): 

75 """ Return the intersection of two sets of intervals. """ 

76 return self._merge(other, False) 

77 

78 def __sub__(self, other): 

79 """ Return the difference of two sets of intervals. """ 

80 return self._merge(other, True) 

81 

82 def _merge(self, other: Intervals | Iterable[tuple[T, T, AbstractSet]], difference: bool) -> Intervals: 

83 """ Return the difference or intersection of two sets of intervals. """ 

84 result = Intervals(keep_distinct=self._keep_distinct) 

85 append = result._items.append 

86 

87 # using 'self' and 'other' below forces normalization 

88 bounds1 = _boundaries(self, 'start', 'stop') 

89 bounds2 = _boundaries(Intervals(other, keep_distinct=self._keep_distinct), 'switch', 'switch') 

90 

91 start = None # set by start/stop 

92 recs1 = None # set by start 

93 enabled = difference # changed by switch 

94 if self._keep_distinct: 94 ↛ 97line 94 didn't jump to line 97 because the condition on line 94 was always true

95 bounds = sorted(itertools.chain(bounds1, bounds2), key=lambda i: i[0]) 

96 else: 

97 bounds = sorted(itertools.chain(bounds1, bounds2)) 

98 for value, flag, recs in bounds: 

99 if flag == 'start': 

100 start = value 

101 recs1 = recs 

102 elif flag == 'stop': 

103 if enabled and start < value: 103 ↛ 105line 103 didn't jump to line 105 because the condition on line 103 was always true

104 append((start, value, recs1)) 

105 start = None 

106 else: 

107 if not enabled and start is not None: 

108 start = value 

109 if enabled and start is not None and start < value: 109 ↛ 110line 109 didn't jump to line 110 because the condition on line 109 was never true

110 append((start, value, recs1)) 

111 enabled = not enabled 

112 

113 return result 

114 

115 def remove(self, interval): 

116 """ Remove an interval from the set. """ 

117 warnings.warn("Deprecated since 19.0, do not mutate intervals", DeprecationWarning) 

118 self._items.remove(interval) 

119 

120 def items(self): 

121 """ Return the intervals. """ 

122 warnings.warn("Deprecated since 19.0, just iterate over Intervals", DeprecationWarning) 

123 return self._items 

124 

125 

126def intervals_overlap(interval_a: tuple[T, T], interval_b: tuple[T, T]) -> bool: 

127 """Check whether intervals intersect. 

128 

129 :param interval_a: 

130 :param interval_b: 

131 :return: True if two non-zero intervals overlap 

132 """ 

133 start_a, stop_a = interval_a 

134 start_b, stop_b = interval_b 

135 return start_a < stop_b and stop_a > start_b 

136 

137 

138def invert_intervals(intervals: Iterable[tuple[T, T]], first_start: T, last_stop: T) -> list[tuple[T, T]]: 

139 """Return the intervals between the intervals that were passed in. 

140 

141 The expected use case is to turn "available intervals" into "unavailable intervals". 

142 :examples: 

143 ([(1, 2), (4, 5)], 0, 10) -> [(0, 1), (2, 4), (5, 10)] 

144 

145 :param intervals: 

146 :param first_start: start of whole interval 

147 :param last_stop: stop of whole interval 

148 """ 

149 items = [] 

150 prev_stop = first_start 

151 for start, stop in sorted(intervals): 

152 if prev_stop and prev_stop < start and start <= last_stop: 

153 items.append((prev_stop, start)) 

154 prev_stop = max(prev_stop, stop) 

155 if last_stop and prev_stop < last_stop: 

156 items.append((prev_stop, last_stop)) 

157 # abuse Intervals to merge contiguous intervals 

158 return [(start, stop) for start, stop, _ in Intervals([(start, stop, set()) for start, stop in items])]