Coverage for adhoc-cicd-odoo-odoo / odoo / orm / domains.py: 73%
1044 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-09 18:22 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-09 18:22 +0000
1# Part of Odoo. See LICENSE file for full copyright and licensing details.
3""" Domain expression processing

The domain represents a first-order logical expression.
The main duty of this module is to represent filter conditions on models
and ease rewriting them.
A lot of things should be documented here, but as a first
step in the right direction, some tests in test_expression.py
might give you some additional information.

The `Domain` is represented as an AST which is a predicate using boolean
operators.
- n-ary operators: AND, OR
- unary operator: NOT
- boolean constants: TRUE, FALSE
- (simple) conditions: (expression, operator, value)

Conditions are triplets of `(expression, operator, value)`.
`expression` is usually a field name. It can be an expression that uses the
dot-notation to traverse relationships or accesses properties of the field.
The traversal of relationships is equivalent to using the `any` operator.
`operator` is one of the CONDITION_OPERATORS, the detailed description of what
is possible is documented there.
`value` is a Python value which should be supported by the operator.


For legacy reasons, a domain uses an inconsistent two-level abstract
syntax (domains were regular Python data structures). At the first
level, a domain is an expression made of conditions and domain operators
used in prefix notation. The available operators at this level are
'!', '&', and '|'. '!' is a unary 'not', '&' is a binary 'and',
and '|' is a binary 'or'.
For instance, here is a possible domain. (<cond1>, <cond2> and <cond3> stand
for arbitrary conditions, more on this later.):

    ['&', '!', <cond1>, '|', <cond2>, <cond3>]

It is equivalent to this pseudo code using infix notation::

    (not <cond1>) and (<cond2> or <cond3>)

The second level of syntax deals with the condition representation. A condition
is a triple of the form (left, operator, right). That is, a condition uses
an infix notation, and the available operators, and possible left and
right operands differ from those of the previous level. Here is a
possible condition:

    ('company_id.name', '=', 'Odoo')
50"""
51from __future__ import annotations
53import collections
54import enum
55import functools
56import itertools
57import logging
58import operator
59import pytz
60import types
61import typing
62import warnings
63from datetime import date, datetime, time, timedelta, timezone
65from odoo.exceptions import MissingError, UserError
66from odoo.tools import SQL, OrderedSet, Query, classproperty, partition, str2bool
67from odoo.tools.date_utils import parse_date, parse_iso_date
68from .identifiers import NewId
69from .utils import COLLECTION_TYPES, parse_field_expr
71if typing.TYPE_CHECKING:
72 from collections.abc import Callable, Collection, Iterable
73 from .fields import Field
74 from .models import BaseModel
76 M = typing.TypeVar('M', bound=BaseModel)
_logger = logging.getLogger('odoo.domains')

STANDARD_CONDITION_OPERATORS = frozenset((
    # relational operators (the `!` variants are internal)
    'any', 'not any',
    'any!', 'not any!',
    # membership
    'in', 'not in',
    # inequalities
    '<', '>', '<=', '>=',
    # string patterns
    'like', 'not like',
    'ilike', 'not ilike',
    '=like', 'not =like',
    '=ilike', 'not =ilike',
))
"""Standard operators for conditions.
Every level of the framework should support them.

- `any` applies to relational fields and to `id`; it checks whether a record
  matches the condition
    - when the value is SQL or Query, see `any!`
    - when bypass_search_access is set on the field, see `any!`
    - when the value is a Domain for a many2one (or `id`),
      _search with active_test=False
    - when the value is a Domain for a x2many,
      _search on the comodel of the field (with its context)
- `any!` behaves like `any` but bypasses adding record rules on the comodel
- `in` is the equality check against a collection of values
    - the collection is transformed into OrderedSet
    - a False value indicates that the value is *not set*
    - for relational fields
        - if int, bypass record rules
        - if str, search using display_name of the model
    - the value should have the type of the field
    - SQL type is always accepted
- `<`, `>`, ... are inequality checks, behaving like `in` with a single value
- string pattern comparison
    - `=like` case-sensitive comparison to a string using SQL like semantics
    - `=ilike` case-insensitive comparison (with `unaccent`) to a string
    - `like`, `ilike` behave like the two preceding operators, but add
      wildcards around the value
"""
CONDITION_OPERATORS = set(STANDARD_CONDITION_OPERATORS)  # modifiable (for optimizations only)
"""
Operators available for conditions.
Non-standard operators can be reduced to standard ones by the optimization
functions; see the respective optimization functions for the details.
"""
INTERNAL_CONDITION_OPERATORS = frozenset({'any!', 'not any!'})
NEGATIVE_CONDITION_OPERATORS = {
    'not any': 'any',
    'not any!': 'any!',
    'not in': 'in',
    'not like': 'like',
    'not ilike': 'ilike',
    'not =like': '=like',
    'not =ilike': '=ilike',
    '!=': '=',
    '<>': '=',
}
"""Operators carrying a 'negative' semantic, mapped to their 'positive' counterpart."""

# negations for operators (used in DomainNot)
_INVERSE_OPERATOR = {
    # negative to positive: same entries as NEGATIVE_CONDITION_OPERATORS
    **NEGATIVE_CONDITION_OPERATORS,
    # positive to negative; '<>' is skipped so that '=' inverts to '!='
    **{
        positive: negative
        for negative, positive in NEGATIVE_CONDITION_OPERATORS.items()
        if negative != '<>'
    },
}
"""Mapping from each operator to its inverse."""
_INVERSE_INEQUALITY = {'<': '>=', '>': '<=', '>=': '<', '<=': '>'}
"""Mapping from each inequality operator to its inverse.
Kept apart from the other operators because of null values."""

_TRUE_LEAF = (1, '=', 1)
_FALSE_LEAF = (0, '=', 1)
class OptimizationLevel(enum.IntEnum):
    """Optimization stage reached by a domain, ordered from lowest to highest."""
    NONE = 0
    BASIC = 1
    DYNAMIC_VALUES = 2
    FULL = 3

    @functools.cached_property
    def next_level(self):
        """Level immediately above this one; `FULL` has no successor."""
        assert self is not OptimizationLevel.FULL, "FULL level is the last one"
        return OptimizationLevel(self.value + 1)
# Upper bound on the number of passes of the fixed-point loop in
# `Domain._optimize`; going beyond it raises a RecursionError.
MAX_OPTIMIZE_ITERATIONS = 1000
192# --------------------------------------------------
193# Domain definition and manipulation
194# --------------------------------------------------
class Domain:
    """Node of the abstract syntax tree (AST) that represents a domain.

    Calling ``Domain(...)`` works as a factory: depending on the arguments,
    it returns a condition, a boolean constant or a combination of domains.
    """
    # This class is conceptually abstract, yet it is not declared as an ABC:
    # `__new__` is overridden so that `Domain` can serve as a factory for the
    # different kinds of domains while `isinstance` keeps working, and the
    # type-checking of abstractmethod would be incorrect in that setup.
    __slots__ = ('_opt_level',)
    _opt_level: OptimizationLevel

    def __new__(cls, *args, internal: bool = False):
        """Build a domain AST.

        ```
        Domain([('a', '=', 5), ('b', '=', 8)])
        Domain('a', '=', 5) & Domain('b', '=', 8)
        Domain.AND([Domain('a', '=', 5), *other_domains, Domain.TRUE])
        ```

        A single argument is a `Domain`, a list representation, or a bool.
        Several arguments must be exactly 3: a field (str), an operator (str)
        and a value for the condition.

        By default, the special operators ``'any!'`` and ``'not any!'`` are
        allowed in domain conditions (``Domain('a', 'any!', dom)``) but not in
        domain lists (``Domain([('a', 'any!', dom)])``); passing
        ``internal=True`` accepts them in lists as well.

        :raises TypeError: for arguments that cannot describe a domain
        :raises ValueError: for an invalid item or a malformed domain list
        :raises NotImplementedError: when the argument is ``NotImplemented``
        """
        if len(args) > 1:
            if isinstance(args[0], str):
                return DomainCondition(*args).checked()
            # the constant leaves (1, '=', 1) and (0, '=', 1)
            if args == _TRUE_LEAF:
                return _TRUE_DOMAIN
            if args == _FALSE_LEAF:
                return _FALSE_DOMAIN
            raise TypeError(f"Domain() invalid arguments: {args!r}")

        source = args[0]
        if isinstance(source, Domain):
            return source
        if source is True or source == []:
            return _TRUE_DOMAIN
        if source is False:
            return _FALSE_DOMAIN
        if source is NotImplemented:
            raise NotImplementedError

        # Parse the list representation.
        # perf: the parsing lives inside __new__ in order to avoid calling a
        # function returning a Domain, which would implicitly call __init__
        if not isinstance(source, (list, tuple)):
            raise TypeError(f"Domain() invalid argument type for domain: {source!r}")
        operands: list[Domain] = []
        try:
            # prefix notation: reading backwards, the operands of an operator
            # are already on the stack when the operator is met
            for token in reversed(source):
                if isinstance(token, (tuple, list)) and len(token) == 3:
                    if internal:
                        # process subdomains when processing internal operators
                        if token[1] in ('any', 'any!', 'not any', 'not any!') and isinstance(token[2], (list, tuple)):
                            token = (token[0], token[1], Domain(token[2], internal=True))
                    elif token[1] in INTERNAL_CONDITION_OPERATORS:
                        # internal operators are not accepted
                        raise ValueError(f"Domain() invalid item in domain: {token!r}")
                    operands.append(Domain(*token))
                elif token == DomainAnd.OPERATOR:
                    left = operands.pop()
                    right = operands.pop()
                    operands.append(left & right)
                elif token == DomainOr.OPERATOR:
                    left = operands.pop()
                    right = operands.pop()
                    operands.append(left | right)
                elif token == DomainNot.OPERATOR:
                    operands.append(~operands.pop())
                elif isinstance(token, Domain):
                    operands.append(token)
                else:
                    raise ValueError(f"Domain() invalid item in domain: {token!r}")
            # several remaining operands are implicitly AND-ed, in their
            # original order
            if len(operands) == 1:
                return operands[0]
            return Domain.AND(reversed(operands))
        except IndexError:
            # an operator did not find enough operands on the stack
            raise ValueError(f"Domain() malformed domain {source!r}")

    @classproperty
    def TRUE(cls) -> Domain:
        return _TRUE_DOMAIN

    @classproperty
    def FALSE(cls) -> Domain:
        return _FALSE_DOMAIN

    NEGATIVE_OPERATORS = types.MappingProxyType(NEGATIVE_CONDITION_OPERATORS)

    @staticmethod
    def custom(
        *,
        to_sql: Callable[[BaseModel, str, Query], SQL],
        predicate: Callable[[BaseModel], bool] | None = None,
    ) -> DomainCustom:
        """Create a custom domain.

        :param to_sql: callable(model, alias, query) that returns the SQL
        :param predicate: callable(record) telling whether a record is kept
                          when filtering
        """
        return DomainCustom(to_sql, predicate)

    @staticmethod
    def AND(items: Iterable) -> Domain:
        """Build the conjunction of domains: (item1 AND item2 AND ...)"""
        return DomainAnd.apply(map(Domain, items))

    @staticmethod
    def OR(items: Iterable) -> Domain:
        """Build the disjunction of domains: (item1 OR item2 OR ...)"""
        return DomainOr.apply(map(Domain, items))

    def __setattr__(self, name, value):
        raise TypeError("Domain objects are immutable")

    def __delattr__(self, name):
        raise TypeError("Domain objects are immutable")

    def __and__(self, other):
        """Domain & Domain"""
        if not isinstance(other, Domain):
            return NotImplemented
        return DomainAnd.apply([self, other])

    def __or__(self, other):
        """Domain | Domain"""
        if not isinstance(other, Domain):
            return NotImplemented
        return DomainOr.apply([self, other])

    def __invert__(self):
        """~Domain"""
        return DomainNot(self)

    def _negate(self, model: BaseModel) -> Domain:
        """Apply (propagate) negation onto this domain. """
        return ~self

    def __add__(self, other):
        """Domain + [...]

        Kept for backward-compatibility of domain composition: a list is
        concatenated to the list form of this domain (the result is a list),
        while adding another domain is equivalent to '&'.
        """
        # TODO deprecate this possibility so that users combine domains correctly
        if isinstance(other, Domain):
            return self & other
        if isinstance(other, list):
            return list(self) + other
        raise TypeError('Domain() can concatenate only lists')

    def __radd__(self, other):
        """Commutative definition of *+*"""
        # TODO deprecate this possibility so that users combine domains correctly
        # we are pre-pending: the result is a plain list because it may not
        # be normalized
        as_list = list(self)
        return other + as_list

    def __bool__(self):
        """Indicate that the domain is not true.

        For backward-compatibility: only the domain [] used to be falsy,
        hence the TRUE domain is falsy and every other domain is truthy.
        """
        # TODO deprecate this usage (DeprecationWarning) in favor of
        # is_true() and is_false()
        return not self.is_true()

    def __eq__(self, other):
        raise NotImplementedError

    def __hash__(self):
        raise NotImplementedError

    def __iter__(self):
        """For backward-compatibility, return the polish-notation domain list"""
        # the `yield` turns this method into a generator, so the error is
        # raised when iterating rather than when calling iter()
        yield from ()
        raise NotImplementedError

    def __reversed__(self):
        """For backward-compatibility, reversed iter"""
        return reversed(list(self))

    def __repr__(self) -> str:
        # the representation is the old-style list
        return repr(list(self))

    def is_true(self) -> bool:
        """Return whether self is TRUE"""
        return False

    def is_false(self) -> bool:
        """Return whether self is FALSE"""
        return False

    def iter_conditions(self) -> Iterable[DomainCondition]:
        """Yield simple conditions of the domain"""
        yield from ()

    def map_conditions(self, function: Callable[[DomainCondition], Domain]) -> Domain:
        """Map a function to each condition and return the combined result"""
        return self

    def validate(self, model: BaseModel) -> None:
        """Validate that the current domain is correct, or raise an exception"""
        # running the full optimization goes through all the fields
        self._optimize(model, OptimizationLevel.FULL)

    def _as_predicate(self, records: M) -> Callable[[M], bool]:
        """Return a predicate function from the domain (bound to records).
        The predicate returns whether its argument (a single record)
        satisfies the domain.

        This is used to implement ``Model.filtered_domain``.
        """
        raise NotImplementedError

    def optimize(self, model: BaseModel) -> Domain:
        """Perform the basic optimizations of the node given a model.

        This pre-processing step rewrites the domain into a logically
        equivalent domain that is a more canonical representation of the
        predicate. Multiple conditions can be merged together.

        Only basic optimizations are applied. Those are
        transaction-independent; they only depend on the model's fields
        definitions. No model-specific override is used, and the resulting
        domain may be reused in another transaction without semantic impact.
        The model's fields are used to validate conditions and apply
        type-dependent optimizations. This optimization level may be useful
        to simplify a domain that is sent to the client-side, thereby
        reducing its payload/complexity.
        """
        return self._optimize(model, OptimizationLevel.BASIC)

    def optimize_full(self, model: BaseModel) -> Domain:
        """Perform all the optimizations of the node given a model.

        Basic and advanced optimizations are applied.
        Advanced optimizations may rely on model specific overrides
        (search methods of fields, etc.) and the semantic equivalence is only
        guaranteed at the given point in a transaction. Inherited and
        non-stored fields are resolved (using their search method) to
        transform the conditions.
        """
        return self._optimize(model, OptimizationLevel.FULL)

    @typing.final
    def _optimize(self, model: BaseModel, level: OptimizationLevel) -> Domain:
        """Perform optimizations of the node given a model.

        Reach a fixed-point by applying the optimizations for the next level
        on the node until we reach a stable node at the given level.

        :raises RecursionError: after more than MAX_OPTIMIZE_ITERATIONS steps
        """
        current = self
        iterations = 0
        while current._opt_level < level:
            iterations += 1
            if iterations > MAX_OPTIMIZE_ITERATIONS:
                raise RecursionError("Domain.optimize: too many loops")
            target = current._opt_level.next_level
            before = current
            current = before._optimize_step(model, target)
            # a stable node is promoted to the target level, if it is not
            # there already (DomainBool, for instance, is always optimized)
            if current == before and current._opt_level < target:
                object.__setattr__(current, '_opt_level', target)  # noqa: PLC2801
        return current

    def _optimize_step(self, model: BaseModel, level: OptimizationLevel) -> Domain:
        """Implementation of domain for one level of optimizations."""
        return self

    def _to_sql(self, model: BaseModel, alias: str, query: Query) -> SQL:
        """Build the SQL to inject into the query. The domain should be optimized first."""
        raise NotImplementedError
class DomainBool(Domain):
    """Constant domain: True/False

    It is NOT considered as a condition and these constants are removed
    from nary domains.
    """
    __slots__ = ('value',)
    value: bool

    def __new__(cls, value: bool):
        """Create a constant domain; it is created at the FULL optimization level."""
        constant = object.__new__(cls)
        object.__setattr__(constant, 'value', value)
        object.__setattr__(constant, '_opt_level', OptimizationLevel.FULL)
        return constant

    def __eq__(self, other):
        # identity is enough: this class has two instances only
        return self is other

    def __hash__(self):
        return hash(self.value)

    def is_true(self) -> bool:
        return self.value

    def is_false(self) -> bool:
        return not self.value

    def __invert__(self):
        if self.value:
            return _FALSE_DOMAIN
        return _TRUE_DOMAIN

    def __and__(self, other):
        if not isinstance(other, Domain):
            return NotImplemented
        # TRUE & x is x; FALSE & x is FALSE
        return other if self.value else self

    def __or__(self, other):
        if not isinstance(other, Domain):
            return NotImplemented
        # TRUE | x is TRUE; FALSE | x is x
        return self if self.value else other

    def __iter__(self):
        if self.value:
            yield _TRUE_LEAF
        else:
            yield _FALSE_LEAF

    def _as_predicate(self, records):
        def constant_predicate(_):
            return self.value

        return constant_predicate

    def _to_sql(self, model: BaseModel, alias: str, query: Query) -> SQL:
        if self.value:
            return SQL("TRUE")
        return SQL("FALSE")
# singletons, available through Domain.TRUE and Domain.FALSE
_TRUE_DOMAIN = DomainBool(True)
_FALSE_DOMAIN = DomainBool(False)
class DomainNot(Domain):
    """Negation of a domain; wraps exactly one child domain"""
    OPERATOR = '!'

    __slots__ = ('child',)
    child: Domain

    def __new__(cls, child: Domain):
        """Create the domain that is the inverse of `child` (not optimized yet)."""
        negation = object.__new__(cls)
        object.__setattr__(negation, 'child', child)
        object.__setattr__(negation, '_opt_level', OptimizationLevel.NONE)
        return negation

    def __invert__(self):
        # inverting a negation gives back the wrapped domain
        return self.child

    def __iter__(self):
        yield self.OPERATOR
        yield from self.child

    def iter_conditions(self):
        yield from self.child.iter_conditions()

    def map_conditions(self, function) -> Domain:
        mapped_child = self.child.map_conditions(function)
        return ~mapped_child

    def _optimize_step(self, model: BaseModel, level: OptimizationLevel) -> Domain:
        # optimize the child first, then propagate the negation onto it
        optimized_child = self.child._optimize(model, level)
        return optimized_child._negate(model)

    def __eq__(self, other):
        if self is other:
            return True
        return isinstance(other, DomainNot) and self.child == other.child

    def __hash__(self):
        return ~hash(self.child)

    def _as_predicate(self, records):
        child_predicate = self.child._as_predicate(records)

        def not_predicate(rec):
            return not child_predicate(rec)

        return not_predicate

    def _to_sql(self, model: BaseModel, alias: str, query: Query) -> SQL:
        child_sql = self.child._to_sql(model, alias, query)
        return SQL("(%s) IS NOT TRUE", child_sql)
class DomainNary(Domain):
    """Domain for an n-ary operator: AND or OR over multiple children"""
    OPERATOR: str
    OPERATOR_SQL: SQL = SQL(" ??? ")
    ZERO: DomainBool = _FALSE_DOMAIN  # default for lint checks

    __slots__ = ('children',)
    children: tuple[Domain, ...]

    def __new__(cls, children: tuple[Domain, ...]):
        """Create the n-ary domain; it requires at least 2 children."""
        assert len(children) >= 2
        node = object.__new__(cls)
        object.__setattr__(node, 'children', children)
        object.__setattr__(node, '_opt_level', OptimizationLevel.NONE)
        return node

    @classmethod
    def apply(cls, items: Iterable[Domain]) -> Domain:
        """Combine a collection of domains with the operator of the class (AND/OR)."""
        flat = cls._flatten(items)
        # a single remaining domain needs no n-ary wrapper
        return flat[0] if len(flat) == 1 else cls(tuple(flat))

    @classmethod
    def _flatten(cls, children: Iterable[Domain]) -> list[Domain]:
        """Return an equivalent list of domains with respect to the boolean
        operation of the class (AND/OR). Boolean subdomains are simplified,
        and subdomains of the same class are flattened into the list.
        The returned list is never empty.
        """
        flat: list[Domain] = []
        for child in children:
            if isinstance(child, DomainBool):
                # a constant other than ZERO decides the whole result;
                # ZERO itself is simply dropped
                if child != cls.ZERO:
                    return [child]
                continue
            if isinstance(child, cls):
                flat.extend(child.children)  # same class, flatten
            else:
                flat.append(child)
        if not flat:
            return [cls.ZERO]
        return flat

    def __iter__(self):
        # prefix notation: n children are preceded by n-1 operators
        for _ in range(len(self.children) - 1):
            yield self.OPERATOR
        for child in self.children:
            yield from child

    def __eq__(self, other):
        if self is other:
            return True
        return (
            isinstance(other, DomainNary)
            and self.OPERATOR == other.OPERATOR
            and self.children == other.children
        )

    def __hash__(self):
        return hash(self.OPERATOR) ^ hash(self.children)

    @classproperty
    def INVERSE(cls) -> type[DomainNary]:
        """Return the inverted nary type, AND/OR"""
        raise NotImplementedError

    def __invert__(self):
        # De Morgan: invert every child and swap AND/OR
        inverted_children = tuple(~child for child in self.children)
        return self.INVERSE(inverted_children)

    def _negate(self, model):
        negated_children = tuple(child._negate(model) for child in self.children)
        return self.INVERSE(negated_children)

    def iter_conditions(self):
        for child in self.children:
            yield from child.iter_conditions()

    def map_conditions(self, function) -> Domain:
        return self.apply(child.map_conditions(function) for child in self.children)

    def _optimize_step(self, model: BaseModel, level: OptimizationLevel) -> Domain:
        # optimize the children first
        optimized = self._flatten(child._optimize(model, level) for child in self.children)
        initial_size = len(optimized)
        if initial_size > 1:
            # sorting eases the grouping of children by field and operator
            optimized.sort(key=_optimize_nary_sort_key)
            # apply the merge optimizations in turn, until one of them
            # reduces the number of children
            node_type = type(self)
            for merge in _MERGE_OPTIMIZATIONS:
                optimized = merge(node_type, optimized, model)
                if len(optimized) < initial_size:
                    break
            else:
                # no merge happened: when the children are the very same
                # objects, skip the creation of a new node
                if len(self.children) == len(optimized) and all(map(operator.is_, self.children, optimized)):
                    return self
        return self.apply(optimized)

    def _to_sql(self, model: BaseModel, alias: str, query: Query) -> SQL:
        children_sql = (
            child._to_sql(model, alias, query)
            for child in self.children
        )
        return SQL("(%s)", self.OPERATOR_SQL.join(children_sql))
class DomainAnd(DomainNary):
    """Domain: conjunction (AND) of multiple children"""
    __slots__ = ()
    OPERATOR = '&'
    OPERATOR_SQL = SQL(" AND ")
    ZERO = _TRUE_DOMAIN

    @classproperty
    def INVERSE(cls) -> type[DomainNary]:
        return DomainOr

    def __and__(self, other):
        # shortcut: AND-ing two conjunctions just concatenates their children
        if not isinstance(other, DomainAnd):
            return super().__and__(other)
        return DomainAnd(self.children + other.children)

    def _as_predicate(self, records):
        # For the sake of performance, the predicates of the children are
        # built lazily by a generator, which is memoized with `itertools.tee`
        lazy_predicates = (child._as_predicate(records) for child in self.children)

        def and_predicate(record):
            nonlocal lazy_predicates
            lazy_predicates, current = itertools.tee(lazy_predicates)
            # stop at the first predicate that rejects the record
            for check in current:
                if not check(record):
                    return False
            return True

        return and_predicate
class DomainOr(DomainNary):
    """Domain: disjunction (OR) of multiple children"""
    __slots__ = ()
    OPERATOR = '|'
    OPERATOR_SQL = SQL(" OR ")
    ZERO = _FALSE_DOMAIN

    @classproperty
    def INVERSE(cls) -> type[DomainNary]:
        return DomainAnd

    def __or__(self, other):
        # shortcut: OR-ing two disjunctions just concatenates their children
        if not isinstance(other, DomainOr):
            return super().__or__(other)
        return DomainOr(self.children + other.children)

    def _as_predicate(self, records):
        # For the sake of performance, the predicates of the children are
        # built lazily by a generator, which is memoized with `itertools.tee`
        lazy_predicates = (child._as_predicate(records) for child in self.children)

        def or_predicate(record):
            nonlocal lazy_predicates
            lazy_predicates, current = itertools.tee(lazy_predicates)
            # stop at the first predicate that accepts the record
            for check in current:
                if check(record):
                    return True
            return False

        return or_predicate
class DomainCustom(Domain):
    """Domain condition that generates directly SQL and possibly a ``filtered`` predicate."""
    __slots__ = ('_filtered', '_sql')

    _filtered: Callable[[BaseModel], bool] | None
    _sql: Callable[[BaseModel, str, Query], SQL]

    def __new__(
        cls,
        sql: Callable[[BaseModel, str, Query], SQL],
        filtered: Callable[[BaseModel], bool] | None = None,
    ):
        """Create a new domain.

        The domain is created at the FULL optimization level.

        :param sql: callable(model, alias, query) that implements ``_to_sql``
                    which is used to generate the query for searching
        :param filtered: callable(record) that checks whether a record is kept
                         when filtering (``Model.filtered``); when omitted,
                         filtering falls back on running the SQL query
        """
        self = object.__new__(cls)
        object.__setattr__(self, '_sql', sql)
        object.__setattr__(self, '_filtered', filtered)
        object.__setattr__(self, '_opt_level', OptimizationLevel.FULL)
        return self

    def _as_predicate(self, records):
        """Return the given predicate or, without one, a predicate based on
        a search of the matching records among `records`.
        """
        if self._filtered is not None:
            return self._filtered
        # by default, run the SQL query
        query = records._search(DomainCondition('id', 'in', records.ids) & self, order='id')
        return DomainCondition('id', 'any', query)._as_predicate(records)

    def __eq__(self, other):
        # two custom domains are equal when both of their callables are equal
        return (
            isinstance(other, DomainCustom)
            and self._sql == other._sql
            and self._filtered == other._filtered
        )

    def __hash__(self):
        return hash(self._sql)

    def __iter__(self):
        # there is no list form for a custom domain: the only item is the
        # domain object itself
        yield self

    def _to_sql(self, model: BaseModel, alias: str, query: Query) -> SQL:
        return self._sql(model, alias, query)
786class DomainCondition(Domain):
787 """Domain condition on field: (field, operator, value)
789 A field (or expression) is compared to a value. The list of supported
790 operators are described in CONDITION_OPERATORS.
791 """
792 __slots__ = ('_field_instance', 'field_expr', 'operator', 'value')
793 _field_instance: Field | None # mutable cached property
794 field_expr: str
795 operator: str
796 value: typing.Any
798 def __new__(cls, field_expr: str, operator: str, value):
799 """Init a new simple condition (internal init)
801 :param field_expr: Field name or field path
802 :param operator: A valid operator
803 :param value: A value for the comparison
804 """
805 self = object.__new__(cls)
806 object.__setattr__(self, 'field_expr', field_expr)
807 object.__setattr__(self, 'operator', operator)
808 object.__setattr__(self, 'value', value)
809 object.__setattr__(self, '_field_instance', None)
810 object.__setattr__(self, '_opt_level', OptimizationLevel.NONE)
811 return self
813 def checked(self) -> DomainCondition:
814 """Validate `self` and return it if correct, otherwise raise an exception."""
815 if not isinstance(self.field_expr, str) or not self.field_expr: 815 ↛ 816line 815 didn't jump to line 816 because the condition on line 815 was never true
816 self._raise("Empty field name", error=TypeError)
817 operator = self.operator.lower()
818 if operator != self.operator: 818 ↛ 819line 818 didn't jump to line 819 because the condition on line 818 was never true
819 warnings.warn(f"Deprecated since 19.0, the domain condition {(self.field_expr, self.operator, self.value)!r} should have a lower-case operator", DeprecationWarning)
820 return DomainCondition(self.field_expr, operator, self.value).checked()
821 if operator not in CONDITION_OPERATORS: 821 ↛ 822line 821 didn't jump to line 822 because the condition on line 821 was never true
822 self._raise("Invalid operator")
823 # check already the consistency for domain manipulation
824 # these are common mistakes and optimizations, do them here to avoid recreating the domain
825 # - NewId is not a value
826 # - records are not accepted, use values
827 # - Query and Domain values should be using a relational operator
828 from .models import BaseModel # noqa: PLC0415
829 value = self.value
830 if value is None:
831 value = False
832 elif isinstance(value, NewId): 832 ↛ 833line 832 didn't jump to line 833 because the condition on line 832 was never true
833 _logger.warning("Domains don't support NewId, use .ids instead, for %r", (self.field_expr, self.operator, self.value))
834 operator = 'not in' if operator in NEGATIVE_CONDITION_OPERATORS else 'in'
835 value = []
836 elif isinstance(value, BaseModel): 836 ↛ 837line 836 didn't jump to line 837 because the condition on line 836 was never true
837 _logger.warning("The domain condition %r should not have a value which is a model", (self.field_expr, self.operator, self.value))
838 value = value.ids
839 elif isinstance(value, (Domain, Query, SQL)) and operator not in ('any', 'not any', 'any!', 'not any!', 'in', 'not in'): 839 ↛ 842line 839 didn't jump to line 842 because the condition on line 839 was never true
840 # accept SQL object in the right part for simple operators
841 # use case: compare 2 fields
842 _logger.warning("The domain condition %r should use the 'any' or 'not any' operator.", (self.field_expr, self.operator, self.value))
843 if value is not self.value:
844 return DomainCondition(self.field_expr, operator, value)
845 return self
847 def __invert__(self):
848 # do it only for simple fields (not expressions)
849 # inequalities are handled in _negate()
850 if "." not in self.field_expr and (neg_op := _INVERSE_OPERATOR.get(self.operator)):
851 return DomainCondition(self.field_expr, neg_op, self.value)
852 return super().__invert__()
def _negate(self, model):
    """Negate this condition for ``model``.

    Most operators are inverted by construction, so only the operators
    listed in ``_INVERSE_INEQUALITY`` are handled here, because the field
    must be known. For them the inverse inequality is built and, when the
    field has no falsy value (``falsy_value is None``), it is OR-ed with a
    "field in [False]" condition. Fields that do have a falsy value are
    handled correctly in the SQL generation.
    """
    inverse_inequality = _INVERSE_INEQUALITY.get(self.operator)
    if not inverse_inequality:
        return super()._negate(model)

    negated = DomainCondition(self.field_expr, inverse_inequality, self.value)
    if self._field(model).falsy_value is not None:
        return negated
    # the field can be unset: an unset field also satisfies the negation
    null_check = DomainCondition(self.field_expr, 'in', OrderedSet([False]))
    return null_check | negated
def __iter__(self):
    """Yield the single ``(field_expr, operator, value)`` triple.

    A value that is one of ``COLLECTION_TYPES`` or a ``Domain`` is turned
    into a plain list first.
    """
    raw_value = self.value
    listable_types = (*COLLECTION_TYPES, Domain)
    yield (
        self.field_expr,
        self.operator,
        list(raw_value) if isinstance(raw_value, listable_types) else raw_value,
    )
def __eq__(self, other):
    """Compare two conditions on expression, operator and value.

    The value classes must be the very same class: equality must be
    stricter than ``OrderedSet([x]) == {x}`` so that optimizations always
    return ``OrderedSet`` values.
    """
    if self is other:
        return True
    if not isinstance(other, DomainCondition):
        return False
    if not (self.field_expr == other.field_expr):
        return False
    if not (self.operator == other.operator):
        return False
    if self.value.__class__ is not other.value.__class__:
        return False
    return self.value == other.value
887 def __hash__(self):
888 return hash(self.field_expr) ^ hash(self.operator) ^ hash(self.value)
def iter_conditions(self):
    """Yield this condition itself, and nothing else."""
    yield from (self,)
def map_conditions(self, function) -> Domain:
    """Apply ``function`` to this condition and return what it produced.

    :param function: callable receiving this condition; it must return a
        ``Domain`` (checked with an assertion)
    """
    mapped = function(self)
    assert isinstance(mapped, Domain), "result of map_conditions is not a Domain"
    return mapped
898 def _raise(self, message: str, *args, error=ValueError) -> typing.NoReturn:
899 """Raise an error message for this condition"""
900 message += ' in condition (%r, %r, %r)'
901 raise error(message % (*args, self.field_expr, self.operator, self.value))
903 def _field(self, model: BaseModel) -> Field:
904 """Cached Field instance for the expression."""
905 field = self._field_instance # type: ignore[arg-type]
906 if field is None or field.model_name != model._name: 906 ↛ 907line 906 didn't jump to line 907 because the condition on line 906 was never true
907 field, _ = self.__get_field(model)
908 return field
def __get_field(self, model: BaseModel) -> tuple[Field, str]:
    """Look up the field of the expression on ``model`` and cache it.

    :return: the field and the property part of the expression (an empty
        string when there is none)
    :raises ValueError: via ``_raise`` when the model has no such field
    """
    base_name, property_part = parse_field_expr(self.field_expr)
    try:
        found = model._fields[base_name]
    except KeyError:
        self._raise("Invalid field %s.%s", model._name, base_name)
    # the instance is immutable: go through object.__setattr__ to store the cache
    object.__setattr__(self, '_field_instance', found)
    return found, (property_part if property_part else '')
def _optimize_step(self, model: BaseModel, level: OptimizationLevel) -> Domain:
    """Run one optimization step of the given level.

    Generic rewrites are tried first, then the optimizations registered for
    the operator and for the field type. The first rewrite that differs
    from this condition is returned; the caller repeats the step until a
    fixed point is found.

    - Validate the field.
    - Decompose *paths* into domains using 'any'.
    - At FULL level, resolve inherited fields and run the field's search
      function when it has one.
    - Run the registered optimizations.
    - Check the output.
    """
    assert level is self._opt_level.next_level, f"Trying to skip optimization level after {self._opt_level}"

    if level != OptimizationLevel.BASIC:
        field = self._field(model)
    else:
        # optimize path: "relation.sub_expr" becomes relation ANY (sub_expr ...)
        field, property_name = self.__get_field(model)
        if property_name and field.relational:
            inner = DomainCondition(property_name, self.operator, self.value)
            return DomainCondition(field.name, 'any', inner)

    if level == OptimizationLevel.FULL:
        # resolve inherited fields
        # inherits implies both Field.delegate=True and Field.bypass_search_access=True
        # so no additional permissions will be added by the 'any' operator below
        if field.inherited:
            assert field.related
            parent_fname = field.related.partition('.')[0]
            on_parent = DomainCondition(self.field_expr, self.operator, self.value)
            return DomainCondition(parent_fname, 'any', on_parent)

        # handle searchable fields
        if field.search and field.name == self.field_expr:
            # The domain is optimized so that value data types are comparable.
            # Only simple optimization to avoid endless recursion.
            searched = self._optimize_field_search_method(model).optimize(model)
            if searched != self:
                return searched

    # apply optimizations of the level for operator and type
    level_optimizations = _OPTIMIZATIONS_FOR[level]

    def candidates():
        # lazily produce rewrites: operator-based ones first, then type-based ones
        for optimize in level_optimizations.get(self.operator, ()):
            yield optimize(self, model)
        for optimize in level_optimizations.get(field.type, ()):
            yield optimize(self, model)

    for candidate in candidates():
        if candidate != self:
            return candidate

    # final checks
    if self.operator not in STANDARD_CONDITION_OPERATORS and level == OptimizationLevel.FULL:
        self._raise("Not standard operator left")

    return self
def _optimize_field_search_method(self, model: BaseModel) -> Domain:
    """Rewrite this condition using the field's search function.

    Attempts, in order:

    1. ``field.determine_domain`` with the condition's own operator;
    2. if that did not raise, the inverse operator (when one exists),
       inverting the resulting domain;
    3. for 'any!' / 'not any!', the same search without the '!' on
       ``model.sudo()``;
    4. for 'in' / 'not in', one '=' / '!=' search per value, combined with
       OR / AND.

    The first ``NotImplementedError`` or ``UserError`` caught along the way
    is re-raised when nothing worked; if none was caught, a ``UserError``
    reporting the unsupported operator is raised.
    """
    field = self._field(model)
    operator, value = self.operator, self.value
    first_error = None

    # use the `Field.search` function
    try:
        computed_domain = field.determine_domain(model, operator, value)
    except (NotImplementedError, UserError) as exc:
        computed_domain = NotImplemented
        first_error = exc
    else:
        if computed_domain is not NotImplemented:
            return Domain(computed_domain, internal=True)

    # try with the positive operator
    if first_error is None and (inverse_operator := _INVERSE_OPERATOR.get(operator)):
        computed_domain = field.determine_domain(model, inverse_operator, value)
        if computed_domain is not NotImplemented:
            return ~Domain(computed_domain, internal=True)

    # compatibility for any!
    if operator in ('any!', 'not any!'):
        try:
            # Not strictly equivalent! If a search is executed, it will be done using sudo.
            computed_domain = DomainCondition(self.field_expr, operator.rstrip('!'), value)
            computed_domain = computed_domain._optimize_field_search_method(model.sudo())
            _logger.warning("Field %s should implement any! operator", field)
            return computed_domain
        except (NotImplementedError, UserError) as exc:
            first_error = exc if first_error is None else first_error

    # backward compatibility to implement only '=' or '!='
    if operator in ('in', 'not in'):
        combine, scalar_operator = (Domain.OR, '=') if operator == 'in' else (Domain.AND, '!=')
        try:
            return combine(
                Domain(field.determine_domain(model, scalar_operator, item), internal=True)
                for item in value
            )
        except (NotImplementedError, UserError) as exc:
            first_error = exc if first_error is None else first_error

    # raise the error
    if first_error:
        raise first_error
    raise UserError(model.env._(
        "Unsupported operator on %(field_label)s %(model_label)s in %(domain)s",
        domain=repr(self),
        field_label=self._field(model).get_description(model.env, ['string'])['string'],
        model_label=f"{model.env['ir.model']._get(model._name).name!r} ({model._name})",
    ))
def _as_predicate(self, records):
    """Return a function telling whether one record satisfies the condition.

    The condition is first optimized up to the DYNAMIC_VALUES level (FULL
    for 'child_of' / 'parent_of'). An ``SQL`` value is turned into a
    ``Query`` by searching among ``records``; a ``Query`` value is replaced
    by the set of ids it returns. The remaining cases are delegated to the
    field's ``filter_function``, negating its result for negative
    operators.
    """
    if not records:
        return lambda _: False

    if self._opt_level < OptimizationLevel.DYNAMIC_VALUES:
        optimized = self._optimize(records, OptimizationLevel.DYNAMIC_VALUES)
        return optimized._as_predicate(records)

    operator = self.operator
    if operator in ('child_of', 'parent_of'):
        # TODO have a specific implementation for these
        optimized = self._optimize(records, OptimizationLevel.FULL)
        return optimized._as_predicate(records)

    assert operator in STANDARD_CONDITION_OPERATORS, "Expecting a sub-set of operators"
    field_expr = self.field_expr
    value = self.value
    positive_operator = NEGATIVE_CONDITION_OPERATORS.get(operator, operator)

    if isinstance(value, SQL):
        # transform into a Query value: search the given records matching
        # the positive form of the condition
        is_positive = positive_operator == operator
        condition = self if is_positive else ~self
        operator = 'any!' if is_positive else 'not any!'
        positive_operator = 'any!'
        field_expr = 'id'
        among_records = DomainCondition('id', 'in', OrderedSet(records.ids))
        value = records.with_context(active_test=False)._search(among_records & condition)
        assert isinstance(value, Query)

    if isinstance(value, Query):
        # rebuild a condition using 'in' / 'not in' on the resulting ids
        if positive_operator not in ('in', 'any', 'any!'):
            self._raise("Cannot filter using Query without the 'any' or 'in' operator")
        if positive_operator != 'in':
            operator = 'in' if positive_operator == operator else 'not in'
        matching_ids = set(value.get_result_ids())
        return DomainCondition(field_expr, operator, matching_ids)._as_predicate(records)

    field = self._field(records)
    special_expressions = {
        # when searching by name, ignore AccessError
        'display_name': 'display_name.no_error',
        # for new records, compare to their origin
        'id': 'id.origin',
    }
    field_expr = special_expressions.get(field_expr, field_expr)

    func = field.filter_function(records, field_expr, positive_operator, value)
    if positive_operator == operator:
        return func
    return lambda rec: not func(rec)
def _to_sql(self, model: BaseModel, alias: str, query: Query) -> SQL:
    """Generate the SQL of this condition through the field.

    The condition must use a standard operator and be fully optimized
    (both are asserted). Read access on the field is checked before
    delegating to ``Field.condition_to_sql``.
    """
    field_expr = self.field_expr
    operator = self.operator
    value = self.value
    assert operator in STANDARD_CONDITION_OPERATORS, \
        f"Invalid operator {operator!r} for SQL in domain term {(field_expr, operator, value)!r}"
    assert self._opt_level >= OptimizationLevel.FULL, \
        f"Must fully optimize before generating the query {(field_expr, operator, value)}"

    sql_field = self._field(model)
    model._check_field_access(sql_field, 'read')
    return sql_field.condition_to_sql(field_expr, operator, value, model, alias, query)
1094# --------------------------------------------------
1095# Optimizations: registration
1096# --------------------------------------------------
# value types that 'in' / 'not in' hand over to the 'any' operators
ANY_TYPES = (Domain, Query, SQL)

if typing.TYPE_CHECKING:
    # signature of a condition optimization: (condition, model) -> domain
    ConditionOptimization = Callable[[DomainCondition, BaseModel], Domain]
    # signature of an n-ary optimization: (nary class, children, model) -> children
    MergeOptimization = Callable[[type[DomainNary], list[Domain], BaseModel], list[Domain]]

# per level (all of them but NONE): operator or field type -> optimizations
_OPTIMIZATIONS_FOR: dict[OptimizationLevel, dict[str, list[ConditionOptimization]]] = {
    level: collections.defaultdict(list)
    for level in OptimizationLevel
    if level != OptimizationLevel.NONE
}
# optimizations applied to the children of n-ary domains, in registration order
_MERGE_OPTIMIZATIONS: list[MergeOptimization] = []
def operator_optimization(operators: Collection[str], level: OptimizationLevel = OptimizationLevel.BASIC):
    """Return a decorator registering a ``(condition, model)`` optimization.

    The decorated function is appended to the optimizations of ``level``
    for each of the given operators, which are also added to
    ``CONDITION_OPERATORS``. The function itself is returned unchanged.
    """
    assert operators, "Missing operator to register"
    CONDITION_OPERATORS.update(operators)

    def register(optimization: ConditionOptimization):
        by_operator = _OPTIMIZATIONS_FOR[level]
        for operator_name in operators:
            by_operator[operator_name].append(optimization)
        return optimization

    return register
def field_type_optimization(field_types: Collection[str], level: OptimizationLevel = OptimizationLevel.BASIC):
    """Return a decorator registering a ``(condition, model)`` optimization.

    The decorated function is appended to the optimizations of ``level``
    for each of the given field types and is returned unchanged.
    """
    def register(optimization: ConditionOptimization):
        by_type = _OPTIMIZATIONS_FOR[level]
        for type_name in field_types:
            by_type[type_name].append(optimization)
        return optimization

    return register
def _optimize_nary_sort_key(domain: Domain) -> tuple[str, str, str]:
    """Sorting key for the children of an n-ary domain.

    Conditions are keyed by:

    1. Field expression (non-condition domains use '~' and sort at the end)
    2. Operator group ('in' first, then 'any', then 'any!'; all the
       '...like' operators share one group; others use the positive operator)
    3. Operator

    Sorting gives the same optimized domain for equivalent conditions and
    groups similar operators together. For debugging, it makes conditions
    on a field easier to find. The generated SQL is ordered by field name
    so that database caching can be applied more frequently.
    """
    if isinstance(domain, DomainCondition):
        # group the same field and same operator together
        operator = domain.operator
        positive_op = NEGATIVE_CONDITION_OPERATORS.get(operator, operator)
        order = {'in': "0in", 'any': "1any", 'any!': "2any"}.get(positive_op)
        if order is None:
            order = "like" if positive_op.endswith('like') else positive_op
        return domain.field_expr, order, operator

    nary_operator = getattr(domain, 'OPERATOR', None)
    if isinstance(nary_operator, str):
        # in python; '~' > any letter
        return '~', '', nary_operator
    return '~', '~', domain.__class__.__name__
def nary_optimization(optimization: MergeOptimization):
    """Register an optimization working on the children of an n-ary domain.

    The function receives the already optimized children of an n-ary
    domain and returns *optimized* domains.

    Both AND and OR domains must always be handled. This is always possible:
    being able to optimize `a & b` means being able to optimize `a | b`,
    since the latter is `~(~a & ~b)`. As operators can be negated,
    optimizations are written in a mirrored way:
    `(optimize AND) if some_condition == cls.ZERO.value else (optimize OR)`.

    An n-ary domain is optimized by first optimizing its children, then
    sorting them by (field, operator_type, operator), where the operator
    type groups similar operators together.
    """
    _MERGE_OPTIMIZATIONS.append(optimization)
    return optimization
def nary_condition_optimization(operators: Collection[str], field_types: Collection[str] | None = None):
    """Register an optimization for condition children of an n-ary domain.

    The decorated function receives a list of at least two consecutive
    domain conditions sharing the same field expression, whose operators
    are all in ``operators``, and returns *optimized* domains. When
    ``field_types`` is given, only fields of those types are handled.

    This is an adapter built on top of `nary_optimization`.

    NOTE: to merge different operators, register with
    `operators=CONDITION_OPERATORS` and pick the conditions to merge.
    """
    def register(optimization: Callable[[type[DomainNary], list[DomainCondition], BaseModel], list[Domain]]):
        @nary_optimization
        def optimizer(cls, domains: list[Domain], model):
            # `merged` stays None until an optimization is applied; from then
            # on it holds the optimization of domains[:index]
            merged = None
            # when not None, domains[run_start:index] are all matching
            # conditions with the same field_expr
            run_start = None

            # a trailing sentinel item makes the last iteration flush the
            # pending run
            sentinel = (len(domains), None)
            for index, domain in (*enumerate(domains), sentinel):
                matching = isinstance(domain, DomainCondition) and domain.operator in operators

                continues_run = (
                    run_start is not None
                    and matching
                    and domain.field_expr == domains[run_start].field_expr
                )
                if run_start is not None and not continues_run:
                    # optimize domains[run_start:index] if necessary and
                    # "flush" them into the result
                    run = domains[run_start:index]
                    if run_start < index - 1 and (
                        field_types is None
                        or domains[run_start]._field(model).type in field_types
                    ):
                        if merged is None:
                            merged = domains[:run_start]
                        merged.extend(optimization(cls, run, model))
                    elif merged is not None:
                        merged.extend(run)
                    run_start = None

                # here: no run is open, or `domain` continues the open run
                if domain is None:
                    break
                if not matching:
                    if merged is not None:
                        merged.append(domain)
                elif run_start is None:
                    run_start = index

            # no run is open anymore
            return domains if merged is None else merged

        return optimization

    return register
1245# --------------------------------------------------
1246# Optimizations: conditions
1247# --------------------------------------------------
@operator_optimization(['=?'])
def _operator_equal_if_value(condition, _):
    """a =? b <=> not b or a = b

    A falsy value makes the condition always true; otherwise it is a plain
    '=' comparison.
    """
    value = condition.value
    return DomainCondition(condition.field_expr, '=', value) if value else _TRUE_DOMAIN
@operator_optimization(['<>'])
def _operator_different(condition, _):
    """a <> b => a != b (emits a DeprecationWarning)"""
    # already a rewrite-rule
    deprecation = "Operator '<>' is deprecated since 19.0, use '!=' directly"
    warnings.warn(deprecation, DeprecationWarning)
    rewritten = DomainCondition(condition.field_expr, '!=', condition.value)
    return rewritten
@operator_optimization(['=='])
def _operator_equals(condition, _):
    """a == b => a = b (emits a DeprecationWarning)"""
    # rewrite-rule
    deprecation = "Operator '==' is deprecated since 19.0, use '=' directly"
    warnings.warn(deprecation, DeprecationWarning)
    rewritten = DomainCondition(condition.field_expr, '=', condition.value)
    return rewritten
@operator_optimization(['=', '!='])
def _operator_equal_as_in(condition, _):
    """Rewrite equality operators as 'in' / 'not in'.

    - a non-empty collection becomes an ``OrderedSet`` of its items;
    - an empty collection is compared with ``False``;
    - an ``SQL`` value is wrapped in parentheses;
    - any other value becomes a one-element ``OrderedSet``.
    """
    value = condition.value
    set_operator = {'=': 'in'}.get(condition.operator, 'not in')
    if isinstance(value, COLLECTION_TYPES):
        # TODO make a warning or equality against a collection
        if value:
            _logger.debug("The domain condition %r should use the 'in' or 'not in' operator.", condition)
            value = OrderedSet(value)
        else:
            # views sometimes use ('user_ids', '!=', []) to indicate the user is set
            _logger.debug("The domain condition %r should compare with False.", condition)
            value = OrderedSet([False])
    elif isinstance(value, SQL):
        # transform '=' SQL("x") into 'in' SQL("(x)")
        value = SQL("(%s)", value)
    else:
        value = OrderedSet((value,))
    return DomainCondition(condition.field_expr, set_operator, value)
@operator_optimization(['in', 'not in'])
def _optimize_in_set(condition, _model):
    """Normalize the value of 'in' / 'not in' to a non-empty ``OrderedSet``.

    Values of one of the ``ANY_TYPES`` switch the operator to 'any' /
    'not any'. An empty value yields a constant domain (false for 'in',
    true for 'not in'). A value that is not a collection is wrapped in a
    list first.
    """
    value = condition.value
    is_in = condition.operator == 'in'
    if isinstance(value, OrderedSet) and value:
        # very common case, just skip creation of a new Domain instance
        return condition
    if isinstance(value, ANY_TYPES):
        return DomainCondition(condition.field_expr, 'any' if is_in else 'not any', value)
    if not value:
        return _FALSE_DOMAIN if is_in else _TRUE_DOMAIN
    if not isinstance(value, COLLECTION_TYPES):
        # TODO show warning, note that condition.field_expr in ('group_ids', 'user_ids') gives a lot of them
        _logger.debug("The domain condition %r should have a list value.", condition)
        value = [value]
    return DomainCondition(condition.field_expr, condition.operator, OrderedSet(value))
@operator_optimization(['in', 'not in'])
def _optimize_in_required(condition, model):
    """Remove checks against a null value for required fields.

    ``False`` is dropped from the values only when the field has no falsy
    value, is required (or is 'id'), is among the registry's not-null
    fields, and all the ids of ``model`` are truthy (no NewId).
    """
    values = condition.value
    field = condition._field(model)
    never_null = (
        field.falsy_value is None
        and (field.required or field.name == 'id')
        and field in model.env.registry.not_null_fields
        # only optimize if there are no NewId's
        and all(model._ids)
    )
    if never_null:
        values = OrderedSet(item for item in values if item is not False)
    if len(values) == len(condition.value):
        # nothing was removed
        return condition
    return DomainCondition(condition.field_expr, condition.operator, values)
@operator_optimization(['any', 'not any', 'any!', 'not any!'])
def _optimize_any_domain(condition, model):
    """Make sure the value is a domain (or Query or SQL).

    - ``Query`` / ``SQL`` values are kept, switching 'any' / 'not any' to
      their '!' variant;
    - other values are converted with ``Domain(value)``;
    - on the 'id' field, the condition is replaced by the domain itself
      (or its negation for the 'not any' operators).
    """
    value = condition.value
    operator = condition.operator
    if not isinstance(value, Domain) and isinstance(value, ANY_TYPES):
        if operator not in ('any', 'not any'):
            return condition
        # update operator to 'any!'
        return DomainCondition(condition.field_expr, operator + '!', condition.value)

    domain = Domain(value)
    if condition._field(model).name == 'id':
        # id ANY domain <=> domain
        # id NOT ANY domain <=> ~domain
        if operator in ('any', 'any!'):
            return domain
        return ~domain
    if value is domain:
        # avoid recreating the same condition
        return condition
    return DomainCondition(condition.field_expr, operator, domain)
# register and bind multiple levels later
def _optimize_any_domain_at_level(level: OptimizationLevel, condition, model):
    """Optimize the sub-domain of an 'any' condition at the given level.

    Values that are not a ``Domain`` are left alone. The sub-domain is
    optimized on the comodel of the field; when it turns out to be false,
    the whole condition becomes a constant. Errors are raised through
    ``condition._raise`` for non-relational fields or an unknown comodel.
    """
    sub_domain = condition.value
    if not isinstance(sub_domain, Domain):
        return condition

    field = condition._field(model)
    if not field.relational:
        condition._raise("Cannot use 'any' with non-relational fields")
    try:
        comodel = model.env[field.comodel_name]
    except KeyError:
        condition._raise("Cannot determine the comodel relation")

    optimized = sub_domain._optimize(comodel, level)
    # a false sub-domain makes the result a constant;
    # a true sub-domain is kept as is
    if optimized.is_false():
        is_positive = condition.operator in ('any', 'any!')
        return _FALSE_DOMAIN if is_positive else _TRUE_DOMAIN
    if optimized is condition.value:
        # avoid recreating the same condition
        return condition
    return DomainCondition(condition.field_expr, condition.operator, optimized)
# register the 'any' sub-domain optimization once per level above NONE,
# binding the level as the first argument
for _any_level in OptimizationLevel:
    if _any_level > OptimizationLevel.NONE:
        operator_optimization(('any', 'not any', 'any!', 'not any!'), _any_level)(
            functools.partial(_optimize_any_domain_at_level, _any_level)
        )
@operator_optimization([op for op in CONDITION_OPERATORS if op.endswith('like')])
def _optimize_like_str(condition, model):
    """Validate the value for pattern matching, which must be a str.

    A falsy pattern is rewritten as a comparison with ``False`` or as a
    constant domain. ``str`` and ``SQL`` values (the latter with a
    deprecation warning) are kept. Other values are cast with ``str()``,
    except for the operators containing '=' which raise ``TypeError``.
    """
    value = condition.value
    operator = condition.operator
    has_equal = '=' in operator
    if not value:
        # =like matches only empty string (inverse the condition)
        result = (operator in NEGATIVE_CONDITION_OPERATORS) == has_equal
        # relational and non-relation fields behave differently
        if condition._field(model).relational or has_equal:
            return DomainCondition(condition.field_expr, '!=' if result else '=', False)
        return Domain(result)
    if isinstance(value, str):
        return condition
    if isinstance(value, SQL):
        warnings.warn("Since 19.0, use Domain.custom(to_sql=lambda model, alias, query: SQL(...))", DeprecationWarning)
        return condition
    if has_equal:
        condition._raise("The pattern to match must be a string", error=TypeError)
    return DomainCondition(condition.field_expr, operator, str(value))
@field_type_optimization(['many2one', 'one2many', 'many2many'])
def _optimize_relational_name_search(condition, model):
    """Search relational fields using `display_name`.

    Comparing a relational field to a string really means a condition on
    the `display_name` field of the related records. Negative conditions
    are translated into a "not any" for consistency. Inequalities against
    a string raise ``TypeError``.
    """
    operator, value = condition.operator, condition.value
    positive_operator = NEGATIVE_CONDITION_OPERATORS.get(operator, operator)
    any_operator = 'not any' if positive_operator != operator else 'any'

    def on_display_name(name_value):
        # field ANY (display_name <positive operator> name_value)
        return DomainCondition(
            condition.field_expr,
            any_operator,
            DomainCondition('display_name', positive_operator, name_value),
        )

    # Handle like operator
    if operator.endswith('like'):
        return on_display_name(value)
    # Handle inequality as not supported
    if operator[0] in ('<', '>') and isinstance(value, str):
        condition._raise("Inequality not supported for relational field using a string", error=TypeError)
    # Handle equality with str values
    if positive_operator != 'in' or not isinstance(value, COLLECTION_TYPES):
        return condition
    str_values, other_values = partition(lambda v: isinstance(v, str), value)
    if not str_values:
        return condition

    domain = on_display_name(str_values)
    if other_values:
        # the non-string values keep the original operator on the field
        remaining = DomainCondition(condition.field_expr, operator, other_values)
        if positive_operator == operator:
            domain |= remaining
        else:
            domain &= remaining
    return domain
@field_type_optimization(['boolean'])
def _optimize_boolean_in(condition, model):
    """b in boolean_values

    Values that are not booleans are parsed (strings through ``str2bool``,
    others through ``bool``). A comparison with only false values is
    rewritten as the inverse operator on ``[True]``.
    """
    operator, value = condition.operator, condition.value
    if not (operator in ('in', 'not in') and isinstance(value, COLLECTION_TYPES)):
        condition._raise("Cannot compare %r to %s which is not a collection of length 1", condition.field_expr, type(value))
    if any(not isinstance(item, bool) for item in value):
        # parse the values
        if any(isinstance(item, str) for item in value):
            # TODO make a warning
            _logger.debug("Comparing boolean with a string in %s", condition)
        value = {
            str2bool(item.lower(), False) if isinstance(item, str) else bool(item)
            for item in value
        }
    if len(value) == 1 and not any(value):
        # when comparing boolean values, always compare to [True] if possible
        # it eases the implementation of search methods
        operator = _INVERSE_OPERATOR[operator]
        value = [True]
    return DomainCondition(condition.field_expr, operator, value)
@field_type_optimization(['boolean'], OptimizationLevel.FULL)
def _optimize_boolean_in_all(condition, model):
    """b in [True, False] => True (and 'not in' => False)"""
    value = condition.value
    if not isinstance(value, COLLECTION_TYPES) or set(value) != {False, True}:
        return condition
    # tautology is simplified to a boolean
    # note that this optimization removes fields (like active) from the domain
    # so we do this only on FULL level to avoid removing it from sub-domains
    return Domain(condition.operator == 'in')
1483def _value_to_date(value, env, iso_only=False):
1484 # check datetime first, because it's a subclass of date
1485 if isinstance(value, datetime): 1485 ↛ 1486line 1485 didn't jump to line 1486 because the condition on line 1485 was never true
1486 return value.date()
1487 if isinstance(value, date) or value is False:
1488 return value
1489 if isinstance(value, str):
1490 if iso_only: 1490 ↛ 1498line 1490 didn't jump to line 1498 because the condition on line 1490 was always true
1491 try:
1492 value = parse_iso_date(value)
1493 except ValueError:
1494 # check format
1495 parse_date(value, env)
1496 return value
1497 else:
1498 value = parse_date(value, env)
1499 return _value_to_date(value, env)
1500 if isinstance(value, COLLECTION_TYPES): 1500 ↛ 1502line 1500 didn't jump to line 1502 because the condition on line 1500 was always true
1501 return OrderedSet(_value_to_date(v, env=env, iso_only=iso_only) for v in value)
1502 if isinstance(value, SQL):
1503 warnings.warn("Since 19.0, use Domain.custom(to_sql=lambda model, alias, query: SQL(...))", DeprecationWarning)
1504 return value
1505 raise ValueError(f'Failed to cast {value!r} into a date')
@field_type_optimization(['date'])
def _optimize_type_date(condition, model):
    """Make sure we have a date type in the value.

    Only 'in', 'not in' and the inequality operators on expressions without
    a dot are handled. An inequality against ``False`` gives the false
    domain.
    """
    operator = condition.operator
    field_expr = condition.field_expr
    handled = operator in ('in', 'not in', '>', '<', '<=', '>=') and "." not in field_expr
    if not handled:
        return condition
    value = _value_to_date(condition.value, model.env, iso_only=True)
    if value is False and operator[0] in ('<', '>'):
        # comparison to False results in an empty domain
        return _FALSE_DOMAIN
    return DomainCondition(condition.field_expr, operator, value)
@field_type_optimization(['date'], level=OptimizationLevel.DYNAMIC_VALUES)
def _optimize_type_date_relative(condition, model):
    """Convert the remaining ``str`` / ``OrderedSet`` values to dates.

    Unlike the BASIC level, strings are parsed with ``parse_date`` even
    when they are not ISO dates (``iso_only`` is left to its default).
    """
    operator = condition.operator
    raw_value = condition.value
    handled = (
        operator in ('in', 'not in', '>', '<', '<=', '>=')
        and "." not in condition.field_expr
        and isinstance(raw_value, (str, OrderedSet))
    )
    if not handled:
        return condition
    converted = _value_to_date(condition.value, model.env)
    return DomainCondition(condition.field_expr, operator, converted)
def _value_to_datetime(value, env, iso_only=False):
    """Convert a value(s) to datetime.

    Handled inputs:

    - ``datetime``: returned as is; when it carries a timezone, a warning is
      emitted and the value is converted to a naive UTC datetime
    - ``False``: returned unchanged
    - ``str``: parsed with ``parse_iso_date`` (when `iso_only`) or with
      ``parse_date``, then converted again
    - ``date``: converted to midnight of that day in the timezone ``env.tz``
      and expressed as a naive UTC datetime
    - collections: converted element by element into an ``OrderedSet``;
      an empty collection gives an empty ``OrderedSet``
    - ``SQL``: deprecated, returned unchanged

    :param value: the value, or the collection of values, to convert
    :param env: environment providing the timezone (``env.tz``); it is also
        handed to ``parse_date``
    :param iso_only: when true, a string rejected by ``parse_iso_date`` is
        only checked through ``parse_date`` and returned unchanged
    :returns: A tuple containing the converted value and a boolean indicating
        that all input values were dates.
        These are handled differently during rewrites.
    :raises ValueError: if the value has an unsupported type
    """
    # datetime must be tested before date, because it is a subclass of date
    if isinstance(value, datetime):
        if value.tzinfo:
            # cast to a naive datetime
            warnings.warn("Use naive datetimes in domains")
            value = value.astimezone(timezone.utc).replace(tzinfo=None)
        return value, False
    if value is False:
        return False, True
    if isinstance(value, str):
        if iso_only:
            try:
                value = parse_iso_date(value)
            except ValueError:
                # check formatting, but keep the original string as the value
                _dt, is_date = _value_to_datetime(parse_date(value, env), env)
                return value, is_date
        else:
            value = parse_date(value, env)
        return _value_to_datetime(value, env)
    if isinstance(value, date):
        if value.year in (1, 9999):
            # avoid overflow errors, treat as UTC timezone
            tz = None
        elif (tz := env.tz) != pytz.utc:
            # get the tzinfo (without LMT)
            tz = tz.localize(datetime.combine(value, time.min)).tzinfo
        else:
            tz = None
        value = datetime.combine(value, time.min, tz)
        if tz is not None:
            value = value.astimezone(timezone.utc).replace(tzinfo=None)
        return value, True
    if isinstance(value, COLLECTION_TYPES):
        # Convert first, then split the pairs: unlike unpacking `zip(*pairs)`,
        # this also works when the collection is empty.
        pairs = [_value_to_datetime(v, env=env, iso_only=iso_only) for v in value]
        return (
            OrderedSet(converted for converted, _ in pairs),
            all(is_date for _, is_date in pairs),
        )
    if isinstance(value, SQL):
        warnings.warn("Since 19.0, use Domain.custom(to_sql=lambda model, alias, query: SQL(...))", DeprecationWarning)
        return value, False
    raise ValueError(f'Failed to cast {value!r} into a datetime')
@field_type_optimization(['datetime'])
def _optimize_type_datetime(condition, model):
    """Make sure we have a datetime type in the value.

    Only the operators ``in``, ``not in``, ``>``, ``<``, ``<=`` and ``>=`` on
    a field expression without dot-notation are handled; any other condition
    is returned unchanged.

    Inequalities are rewritten to work on whole seconds (or whole days when
    the value was a date): microseconds are dropped, ``>`` becomes ``>=`` and
    ``<=`` becomes ``<`` on the value shifted by one second (or one day).

    For ``in`` / ``not in``, each datetime of the collection is compared to
    its whole second, i.e. ``field >= second AND field < second + 1s``.

    :param condition: the condition to optimize
    :param model: the model on which the condition applies; its environment
        is used to convert the value
    :returns: the rewritten domain, or `condition` itself when the value
        cannot be handled here
    """
    field_expr = condition.field_expr
    operator = condition.operator
    if (
        operator not in ('in', 'not in', '>', '<', '<=', '>=')
        or "." in field_expr
    ):
        return condition
    value, is_date = _value_to_datetime(condition.value, model.env, iso_only=True)

    # Handle inequality
    if operator[0] in ('<', '>'):
        if value is False:
            return _FALSE_DOMAIN
        if not isinstance(value, datetime):
            return condition
        if value.microsecond:
            assert not is_date, "date don't have microseconds"
            value = value.replace(microsecond=0)
        delta = timedelta(days=1) if is_date else timedelta(seconds=1)
        if operator == '>':
            try:
                value += delta
            except OverflowError:
                # higher than max, not possible
                return _FALSE_DOMAIN
            operator = '>='
        elif operator == '<=':
            try:
                value += delta
            except OverflowError:
                # lower than max, just check if field is set
                return DomainCondition(field_expr, '!=', False)
            operator = '<'

    # Handle equality: compare to the whole second
    if (
        operator in ('in', 'not in')
        and isinstance(value, COLLECTION_TYPES)
        and any(isinstance(v, datetime) for v in value)
    ):
        delta = timedelta(seconds=1)

        def whole_second(moment):
            """Return the domain matching the whole second of `moment`."""
            start = moment.replace(microsecond=0)
            lower_bound = DomainCondition(field_expr, '>=', start)
            try:
                end = start + delta
            except OverflowError:
                # last representable second: nothing can be higher than the
                # max, so the lower bound alone is enough
                return lower_bound
            return lower_bound & DomainCondition(field_expr, '<', end)

        domain = DomainOr.apply(
            whole_second(v) if isinstance(v, datetime) else DomainCondition(field_expr, '=', v)
            for v in value
        )
        if operator == 'not in':
            domain = ~domain
        return domain

    return DomainCondition(field_expr, operator, value)
@field_type_optimization(['datetime'], level=OptimizationLevel.DYNAMIC_VALUES)
def _optimize_type_datetime_relative(condition, model):
    """Resolve string values of a datetime condition into datetimes.

    Only conditions using a comparison or set operator, on a field expression
    without dot-notation, and whose value is a string or an ``OrderedSet``
    are converted; any other condition is returned unchanged.
    """
    operator = condition.operator
    convertible = (
        operator in ('in', 'not in', '>', '<', '<=', '>=')
        and "." not in condition.field_expr
        and isinstance(condition.value, (str, OrderedSet))
    )
    if not convertible:
        return condition
    converted, _is_date = _value_to_datetime(condition.value, model.env)
    return DomainCondition(condition.field_expr, operator, converted)
@field_type_optimization(['binary'])
def _optimize_type_binary_attachment(condition, model):
    """Restrict the conditions that can be used on binary fields.

    For a field stored as attachment, only an existence check (``in`` or
    ``not in`` the set ``{False}``) is kept; any other condition is logged
    as an error and replaced by the true domain. The ``like`` operators are
    refused on binary fields.
    """
    field = condition._field(model)
    operator = condition.operator
    value = condition.value
    if field.attachment:
        is_existence_check = operator in ('in', 'not in') and set(value) == {False}
        if not is_existence_check:
            try:
                condition._raise('Binary field stored in attachment, accepts only existence check; skipping domain')
            except ValueError:
                # log with stacktrace
                _logger.exception("Invalid operator for a binary field")
            return _TRUE_DOMAIN
    if operator.endswith('like'):
        condition._raise('Cannot use like operators with binary fields', error=NotImplementedError)
    return condition
1672@operator_optimization(['parent_of', 'child_of'], OptimizationLevel.FULL)
1673def _operator_hierarchy(condition, model):
1674 """Transform a hierarchy operator into a simpler domain.
1676 ### Semantic of hierarchical operator: `(field, operator, value)`
1678 `field` is either 'id' to indicate to use the default parent relation (`_parent_name`)
1679 or it is a field where the comodel is the same as the model.
1680 The value is used to search a set of `related_records`. We start from the given value,
1681 which can be ids, a name (for searching by name), etc. Then we follow up the relation;
1682 forward in case of `parent_of` and backward in case of `child_of`.
1683 The resulting domain will have 'id' if the field is 'id' or a many2one.
1685 In the case where the comodel is not the same as the model, the result is equivalent to
1686 `('field', 'any', ('id', operator, value))`
1687 """
1688 if condition.operator == 'parent_of':
1689 hierarchy = _operator_parent_of_domain
1690 else:
1691 hierarchy = _operator_child_of_domain
1692 value = condition.value
1693 if value is False: 1693 ↛ 1694line 1693 didn't jump to line 1694 because the condition on line 1693 was never true
1694 return _FALSE_DOMAIN
1695 # Get:
1696 # - field: used in the resulting domain)
1697 # - parent (str | None): field name to find parent in the hierarchy
1698 # - comodel_sudo: used to resolve the hierarchy
1699 # - comodel: used to search for ids based on the value
1700 field = condition._field(model)
1701 if field.type == 'many2one':
1702 comodel = model.env[field.comodel_name].with_context(active_test=False)
1703 elif field.type in ('one2many', 'many2many'):
1704 comodel = model.env[field.comodel_name].with_context(**field.context)
1705 elif field.name == 'id': 1705 ↛ 1708line 1705 didn't jump to line 1708 because the condition on line 1705 was always true
1706 comodel = model
1707 else:
1708 condition._raise(f"Cannot execute {condition.operator} for {field}, works only for relational fields")
1709 comodel_sudo = comodel.sudo().with_context(active_test=False)
1710 parent = comodel._parent_name
1711 if comodel._name == model._name:
1712 if condition.field_expr != 'id':
1713 parent = condition.field_expr
1714 if field.type == 'many2one':
1715 field = model._fields['id']
1716 # Get the initial ids and bind them to comodel_sudo before resolving the hierarchy
1717 if isinstance(value, (int, str)):
1718 value = [value]
1719 elif not isinstance(value, COLLECTION_TYPES): 1719 ↛ 1720line 1719 didn't jump to line 1720 because the condition on line 1719 was never true
1720 condition._raise(f"Value of type {type(value)} is not supported")
1721 coids, other_values = partition(lambda v: isinstance(v, int), value)
1722 search_domain = _FALSE_DOMAIN
1723 if field.type == 'many2many':
1724 # always search for many2many
1725 search_domain |= DomainCondition('id', 'in', coids)
1726 coids = []
1727 if other_values: 1727 ↛ 1729line 1727 didn't jump to line 1729 because the condition on line 1727 was never true
1728 # search for strings
1729 search_domain |= Domain.OR(
1730 Domain('display_name', 'ilike', v)
1731 for v in other_values
1732 )
1733 coids += comodel.search(search_domain, order='id').ids
1734 if not coids: 1734 ↛ 1735line 1734 didn't jump to line 1735 because the condition on line 1734 was never true
1735 return _FALSE_DOMAIN
1736 result = hierarchy(comodel_sudo.browse(coids), parent)
1737 # Format the resulting domain
1738 if isinstance(result, Domain):
1739 if field.name == 'id':
1740 return result
1741 return DomainCondition(field.name, 'any!', result)
1742 return DomainCondition(field.name, 'in', result)
def _operator_child_of_domain(comodel: BaseModel, parent):
    """Return a set of ids or a domain to find all children of given model.

    When the hierarchy is stored (`_parent_store`) and `parent` is the
    model's parent field, a domain on ``parent_path`` is returned. Otherwise
    the children are collected level by level and their ids are returned.
    """
    if comodel._parent_store and parent == comodel._parent_name:
        try:
            paths = comodel.mapped('parent_path')
        except MissingError:
            paths = comodel.exists().mapped('parent_path')
        return Domain.OR(
            DomainCondition('parent_path', '=like', path + '%')  # type: ignore
            for path in paths
        )

    # recursively retrieve all children nodes with sudo(); the
    # filtering of forbidden records is done by the rest of the
    # domain
    found_ids: OrderedSet[int] = OrderedSet()
    level = comodel
    while level:
        found_ids.update(level._ids)
        query = level._search(DomainCondition(parent, 'in', OrderedSet(level.ids)))
        # only continue with the records that were not visited yet
        level = level.browse(OrderedSet(query.get_result_ids()) - found_ids)
    return found_ids
def _operator_parent_of_domain(comodel: BaseModel, parent):
    """Return a set of ids or a domain to find all parents of given model.

    When the hierarchy is stored (`_parent_store`) and `parent` is the
    model's parent field, the ids are read from the ``parent_path`` of the
    records. Otherwise the parents are collected by following `parent`.
    """
    if comodel._parent_store and parent == comodel._parent_name:
        try:
            paths = comodel.mapped('parent_path')
        except MissingError:
            paths = comodel.exists().mapped('parent_path')
        # the part after the last '/' is dropped
        return OrderedSet(
            int(label)
            for path in paths
            for label in path.split('/')[:-1]
        )

    # recursively retrieve all parent nodes with sudo() to avoid
    # access rights errors; the filtering of forbidden records is
    # done by the rest of the domain
    ancestor_ids: OrderedSet[int] = OrderedSet()
    try:
        comodel.mapped(parent)
    except MissingError:
        comodel = comodel.exists()
    level = comodel
    while level:
        ancestor_ids.update(level._ids)
        level = level[parent].filtered(lambda p: p.id not in ancestor_ids)
    return ancestor_ids
@operator_optimization(['any', 'not any'], level=OptimizationLevel.FULL)
def _optimize_any_with_rights(condition, model):
    """Switch to the '!' variant of the operator when access is bypassed.

    This applies in superuser mode, or when the field has
    `bypass_search_access` set.
    """
    bypass = model.env.su or condition._field(model).bypass_search_access
    if not bypass:
        return condition
    return DomainCondition(condition.field_expr, condition.operator + '!', condition.value)
@field_type_optimization(['many2one'], level=OptimizationLevel.FULL)
def _optimize_m2o_bypass_comodel_id_lookup(condition, model):
    """Avoid comodel's subquery, if it can be compared with the field directly"""
    operator = condition.operator
    if operator not in ('any!', 'not any!'):
        return condition
    subdomain = condition.value
    if not isinstance(subdomain, DomainCondition) or subdomain.field_expr != 'id':
        return condition
    suboperator = subdomain.operator
    if suboperator not in ('in', 'not in', 'any!', 'not any!'):
        return condition

    # We are bypassing permissions, we can transform:
    # a ANY (id IN X)          =>  a IN (X - {False})
    # a ANY (id NOT IN X)      =>  a NOT IN (X | {False})
    # a ANY (id ANY X)         =>  a ANY X
    # a ANY (id NOT ANY X)     =>  a != False AND a NOT ANY X
    # a NOT ANY (id IN X)      =>  a NOT IN (X - {False})
    # a NOT ANY (id NOT IN X)  =>  a IN (X | {False})
    # a NOT ANY (id ANY X)     =>  a NOT ANY X
    # a NOT ANY (id NOT ANY X) =>  a = False OR a ANY X
    val = subdomain.value
    if suboperator == 'in':
        domain = DomainCondition(condition.field_expr, 'in', val - {False})
    elif suboperator == 'not in':
        domain = DomainCondition(condition.field_expr, 'not in', val | {False})
    elif suboperator == 'any!':
        domain = DomainCondition(condition.field_expr, 'any!', val)
    else:
        # suboperator is 'not any!'
        not_empty = DomainCondition(condition.field_expr, '!=', False)
        domain = not_empty & DomainCondition(condition.field_expr, 'not any!', val)
    # the NOT ANY rules are the negation of the ANY ones
    return ~domain if operator == 'not any!' else domain
1841# --------------------------------------------------
1842# Optimizations: nary
1843# --------------------------------------------------
def _merge_set_conditions(cls: type[DomainNary], conditions):
    """Base function to merge equality conditions.

    Combine the 'in' and 'not in' conditions to a single set of values.
    The field expression of the first condition is used for the result.

    Examples:

        a in {1} or a in {2}  <=>  a in {1, 2}
        a in {1, 2} and a not in {2, 5}  =>  a in {1}

    :returns: a list containing the single merged condition
    """
    assert all(isinstance(cond.value, OrderedSet) for cond in conditions)

    # build the sets for 'in' and 'not in' conditions
    in_sets = []
    not_in_sets = []
    for cond in conditions:
        if cond.operator == 'in':
            in_sets.append(cond.value)
        elif cond.operator == 'not in':
            not_in_sets.append(cond.value)

    # combine the sets
    if cls.OPERATOR == '&':
        if in_sets:
            merged_operator = 'in'
            merged_values = intersection(in_sets) - union(not_in_sets)
        else:
            merged_operator = 'not in'
            merged_values = union(not_in_sets)
    elif not_in_sets:
        merged_operator = 'not in'
        merged_values = intersection(not_in_sets) - union(in_sets)
    else:
        merged_operator = 'in'
        merged_values = union(in_sets)
    return [DomainCondition(conditions[0].field_expr, merged_operator, merged_values)]
def intersection(sets: list[OrderedSet]) -> OrderedSet:
    """Return the elements common to all the given sets.

    The sets are combined from left to right with the ``&`` operator; the
    list must not be empty.
    """
    return functools.reduce(lambda common, other: common & other, sets)
def union(sets: list[OrderedSet]) -> OrderedSet:
    """Return a new OrderedSet with the elements of all the given sets."""
    merged = OrderedSet()
    for values in sets:
        merged.update(values)
    return merged
@nary_condition_optimization(operators=('in', 'not in'))
def _optimize_merge_set_conditions_mono_value(cls: type[DomainNary], conditions, model):
    """Merge equality conditions.

    Combine the 'in' and 'not in' conditions to a single set of values.
    Do not touch x2many fields which have a different semantic.

    Examples:

        a in {1} or a in {2}  <=>  a in {1, 2}
        a in {1, 2} and a not in {2, 5}  =>  a in {1}
    """
    field_type = conditions[0]._field(model).type
    if field_type not in ('many2many', 'one2many', 'properties'):
        return _merge_set_conditions(cls, conditions)
    return conditions
@nary_condition_optimization(operators=('in',), field_types=['many2many', 'one2many'])
def _optimize_merge_set_conditions_x2many_in(cls: type[DomainNary], conditions, model):
    """Merge domains of 'in' conditions for x2many fields like for 'any' operator.

    The conditions of a `DomainAnd` are left untouched.
    """
    return conditions if cls is DomainAnd else _merge_set_conditions(cls, conditions)
@nary_condition_optimization(operators=('not in',), field_types=['many2many', 'one2many'])
def _optimize_merge_set_conditions_x2many_not_in(cls: type[DomainNary], conditions, model):
    """Merge domains of 'not in' conditions for x2many fields like for 'not any' operator.

    The conditions of a `DomainOr` are left untouched.
    """
    return conditions if cls is DomainOr else _merge_set_conditions(cls, conditions)
@nary_condition_optimization(['any'], ['many2one', 'one2many', 'many2many'])
@nary_condition_optimization(['any!'], ['many2one', 'one2many', 'many2many'])
def _optimize_merge_any(cls, conditions, model):
    """Merge domains of 'any' conditions for relational fields.

    This will lead to a smaller number of sub-queries which are equivalent.
    Only the conditions whose value is a `Domain` are merged; the others are
    kept after the merged condition.
    Example:

        a any (f = 8) or a any (g = 5) <=> a any (f = 8 or g = 5)     (for all fields)
        a any (f = 8) and a any (g = 5) <=> a any (f = 8 and g = 5)   (for many2one fields only)
    """
    is_many2one = conditions[0]._field(model).type == 'many2one'
    if cls is DomainAnd and not is_many2one:
        return conditions
    with_domain, others = partition(lambda c: isinstance(c.value, Domain), conditions)
    if len(with_domain) < 2:
        # nothing to merge
        return conditions
    first = with_domain[0]
    merged = cls(tuple(c.value for c in with_domain))
    return [DomainCondition(first.field_expr, first.operator, merged), *others]
@nary_condition_optimization(['not any'], ['many2one', 'one2many', 'many2many'])
@nary_condition_optimization(['not any!'], ['many2one', 'one2many', 'many2many'])
def _optimize_merge_not_any(cls, conditions, model):
    """Merge domains of 'not any' conditions for relational fields.

    This will lead to a smaller number of sub-queries which are equivalent.
    Only the conditions whose value is a `Domain` are merged, using the
    inverse n-ary operator; the others are kept after the merged condition.
    Example:

        a not any (f = 1) or a not any (g = 5) => a not any (f = 1 and g = 5)   (for many2one fields only)
        a not any (f = 1) and a not any (g = 5) => a not any (f = 1 or g = 5)   (for all fields)
    """
    is_many2one = conditions[0]._field(model).type == 'many2one'
    if cls is DomainOr and not is_many2one:
        return conditions
    with_domain, others = partition(lambda c: isinstance(c.value, Domain), conditions)
    if len(with_domain) < 2:
        # nothing to merge
        return conditions
    first = with_domain[0]
    merged = cls.INVERSE(tuple(c.value for c in with_domain))
    return [DomainCondition(first.field_expr, first.operator, merged), *others]
@nary_optimization
def _optimize_same_conditions(cls, conditions, model):
    """Merge (adjacent) conditions that are the same.

    Quick optimization for some conditions, just compare if we have the same
    condition twice.

    :returns: `conditions` itself when no two adjacent conditions are equal,
        otherwise a new list without the adjacent duplicates
    """
    # check if we need to create a new list (this is usually not the case)
    previous = None
    has_duplicate = False
    for current in conditions:
        if previous == current:
            has_duplicate = True
            break
        previous = current
    if not has_duplicate:
        return conditions

    # keep a condition only when it differs from the one just before it
    deduplicated = []
    previous = None
    for current in conditions:
        if previous != current:
            deduplicated.append(current)
        previous = current
    return deduplicated