Coverage for adhoc-cicd-odoo-odoo / odoo / orm / fields_relational.py: 73%
1050 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-09 18:22 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-09 18:22 +0000
1from __future__ import annotations
3import itertools
4import logging
5import typing
6from collections import defaultdict
7from collections.abc import Reversible
8from operator import attrgetter
10from odoo.exceptions import AccessError, MissingError, UserError
11from odoo.tools import SQL, OrderedSet, Query, sql, unique
12from odoo.tools.constants import PREFETCH_MAX
13from odoo.tools.misc import SENTINEL, Sentinel, unquote
15from .commands import Command
16from .domains import Domain
17from .fields import IR_MODELS, Field, _logger
18from .fields_reference import Many2oneReference
19from .identifiers import NewId
20from .models import BaseModel
21from .utils import COLLECTION_TYPES, SQL_OPERATORS, check_pg_name
23if typing.TYPE_CHECKING:
24 from collections.abc import Sequence
25 from odoo.tools.misc import Collector
26 from .types import CommandValue, ContextType, DomainType, Environment, Registry
28 OnDelete = typing.Literal['cascade', 'set null', 'restrict']
30_schema = logging.getLogger('odoo.schema')
33class _Relational(Field[BaseModel]):
34 """ Abstract class for relational fields. """
35 relational: typing.Literal[True] = True
36 comodel_name: str
37 domain: DomainType = [] # domain for searching values
38 context: ContextType = {} # context for searching values
39 bypass_search_access: bool = False # whether access rights are bypassed on the comodel
40 check_company: bool = False
42 def __get__(self, records: BaseModel, owner=None):
43 # base case: do the regular access
44 if records is None or len(records._ids) <= 1:
45 return super().__get__(records, owner)
47 records._check_field_access(self, 'read')
49 # multi-record case
50 if self.compute and self.store:
51 self.recompute(records)
53 # get the cache
54 env = records.env
55 field_cache = self._get_cache(env)
57 # retrieve values in cache, and fetch missing ones
58 vals = []
59 for record_id in records._ids:
60 try:
61 vals.append(field_cache[record_id])
62 except KeyError:
63 if self.store and record_id and len(vals) < len(records) - PREFETCH_MAX: 63 ↛ 65line 63 didn't jump to line 65 because the condition on line 63 was never true
64 # a lot of missing records, just fetch that field
65 remaining = records[len(vals):]
66 remaining.fetch([self.name])
67 # fetch does not raise MissingError, check value
68 if record_id not in field_cache:
69 raise MissingError("\n".join([
70 env._("Record does not exist or has been deleted."),
71 env._("(Record: %(record)s, User: %(user)s)", record=record_id, user=env.uid),
72 ])) from None
73 else:
74 remaining = records.__class__(env, (record_id,), records._prefetch_ids)
75 super().__get__(remaining, owner)
76 # we have the record now
77 vals.append(field_cache[record_id])
79 return self.convert_to_record_multi(vals, records)
81 def _update_inverse(self, records: BaseModel, value: BaseModel):
82 """ Update the cached value of ``self`` for ``records`` with ``value``. """
83 raise NotImplementedError
85 def convert_to_record_multi(self, values, records):
86 """ Convert a list of (relational field) values from the cache format to
87 the record format, for the sake of optimization.
88 """
89 raise NotImplementedError
91 def setup_nonrelated(self, model):
92 super().setup_nonrelated(model)
93 assert self.comodel_name in model.pool, \
94 f"Field {self} with unknown comodel_name {self.comodel_name or '???'!r}"
96 def setup_inverses(self, registry: Registry, inverses: Collector[Field, Field]):
97 """ Populate ``inverses`` with ``self`` and its inverse fields. """
99 def get_comodel_domain(self, model: BaseModel) -> Domain:
100 """ Return a domain from the domain attribute. """
101 domain = self.domain
102 if callable(domain):
103 # the callable can return either a list, Domain or a string
104 domain = domain(model)
105 if not domain or isinstance(domain, str):
106 # if we don't have a domain or
107 # domain=str is used only for the client-side
108 return Domain.TRUE
109 return Domain(domain)
111 @property
112 def _related_domain(self) -> DomainType | None:
113 def validated(domain):
114 if isinstance(domain, str) and not self.inherited:
115 # string domains are expressions that are not valid for self's model
116 return None
117 return domain
119 if callable(self.domain):
120 # will be called with another model than self's
121 return lambda recs: validated(self.domain(recs.env[self.model_name])) # pylint: disable=not-callable
122 else:
123 return validated(self.domain)
125 _related_context = property(attrgetter('context'))
127 _description_relation = property(attrgetter('comodel_name'))
128 _description_context = property(attrgetter('context'))
130 def _description_domain(self, env: Environment) -> str | list:
131 domain = self._internal_description_domain_raw(env)
132 if self.check_company:
133 field_to_check = None
134 if self.company_dependent:
135 cids = '[allowed_company_ids[0]]'
136 elif self.model_name == 'res.company':
137 # when using check_company=True on a field on 'res.company', the
138 # company_id comes from the id of the current record
139 cids = '[id]'
140 elif 'company_id' in env[self.model_name]:
141 cids = '[company_id]'
142 field_to_check = 'company_id'
143 elif 'company_ids' in env[self.model_name]: 143 ↛ 147line 143 didn't jump to line 147 because the condition on line 143 was always true
144 cids = 'company_ids'
145 field_to_check = 'company_ids'
146 else:
147 _logger.warning(env._(
148 "Couldn't generate a company-dependent domain for field %s. "
149 "The model doesn't have a 'company_id' or 'company_ids' field, and isn't company-dependent either.",
150 self.model_name + '.' + self.name,
151 ))
152 return domain
153 company_domain = env[self.comodel_name]._check_company_domain(companies=unquote(cids))
154 if not field_to_check:
155 return f"{company_domain} + {domain or []}"
156 else:
157 no_company_domain = env[self.comodel_name]._check_company_domain(companies='')
158 return f"({field_to_check} and {company_domain} or {no_company_domain}) + ({domain or []})"
159 return domain
161 def _description_allow_hierachy_operators(self, env):
162 """ Return if the child_of/parent_of makes sense on this field """
163 comodel = env[self.comodel_name]
164 return comodel._parent_name in comodel._fields
166 def _internal_description_domain_raw(self, env) -> str | list:
167 domain = self.domain
168 if callable(domain):
169 domain = domain(env[self.model_name])
170 if isinstance(domain, Domain):
171 domain = list(domain)
172 return domain or []
174 def filter_function(self, records, field_expr, operator, value):
175 getter = self.expression_getter(field_expr)
177 if (self.bypass_search_access or operator == 'any!') and not records.env.su: 177 ↛ 181line 177 didn't jump to line 181 because the condition on line 177 was never true
178 # When filtering with bypass access, search the corecords with sudo
179 # and a special key in the context. To evaluate sub-domains, the
180 # special key makes the environment un-sudoed before evaluation.
181 expr_getter = getter
182 sudo_env = records.sudo().with_context(filter_function_reset_sudo=True).env
183 getter = lambda rec: expr_getter(rec.with_env(sudo_env)) # noqa: E731
185 corecords = getter(records)
186 if operator in ('any', 'any!'):
187 assert isinstance(value, Domain)
188 if operator == 'any' and records.env.context.get('filter_function_reset_sudo'): 188 ↛ 189line 188 didn't jump to line 189 because the condition on line 188 was never true
189 corecords = corecords.sudo(False)._filtered_access('read')
190 corecords = corecords.filtered_domain(value)
191 elif operator == 'in' and isinstance(value, COLLECTION_TYPES): 191 ↛ 204line 191 didn't jump to line 204 because the condition on line 191 was always true
192 value = set(value)
193 if False in value:
194 if not corecords:
195 # shortcut, we know none of records has a corecord
196 return lambda _: True
197 if len(value) > 1:
198 value.discard(False)
199 filter_values = self.filter_function(records, field_expr, 'in', value)
200 return lambda rec: not getter(rec) or filter_values(rec)
201 return lambda rec: not getter(rec)
202 corecords = corecords.filtered_domain(Domain('id', 'in', value))
203 else:
204 corecords = corecords.filtered_domain(Domain('id', operator, value))
206 if not corecords:
207 return lambda _: False
209 ids = set(corecords._ids)
210 return lambda rec: any(id_ in ids for val in getter(rec) for id_ in val._ids)
213class Many2one(_Relational):
214 """ The value of such a field is a recordset of size 0 (no
215 record) or 1 (a single record).
217 :param str comodel_name: name of the target model
218 ``Mandatory`` except for related or extended fields.
220 :param domain: an optional domain to set on candidate values on the
221 client side (domain or a python expression that will be evaluated
222 to provide domain)
224 :param dict context: an optional context to use on the client side when
225 handling that field
227 :param str ondelete: what to do when the referred record is deleted;
228 possible values are: ``'set null'``, ``'restrict'``, ``'cascade'``
230 :param bool bypass_search_access: whether access rights are bypassed on the
231 comodel (default: ``False``)
233 :param bool delegate: set it to ``True`` to make fields of the target model
234 accessible from the current model (corresponds to ``_inherits``)
236 :param bool check_company: Mark the field to be verified in
237 :meth:`~odoo.models.Model._check_company`. Has a different behaviour
238 depending on whether the field is company_dependent or not.
239 Constrains non-company-dependent fields to target records whose
240 company_id(s) are compatible with the record's company_id(s).
241 Constrains company_dependent fields to target records whose
242 company_id(s) are compatible with the currently active company.
243 """
244 type = 'many2one'
245 _column_type = ('int4', 'int4')
247 ondelete: OnDelete | None = None # what to do when value is deleted
248 delegate: bool = False # whether self implements delegation
250 def __init__(self, comodel_name: str | Sentinel = SENTINEL, string: str | Sentinel = SENTINEL, **kwargs):
251 super().__init__(comodel_name=comodel_name, string=string, **kwargs)
253 def _setup_attrs__(self, model_class, name):
254 super()._setup_attrs__(model_class, name)
255 # determine self.delegate
256 if name in model_class._inherits.values():
257 self.delegate = True
258 # self.delegate implies self.bypass_search_access
259 self.bypass_search_access = True
260 elif self.delegate: 260 ↛ 261line 260 didn't jump to line 261 because the condition on line 260 was never true
261 comodel_name = self.comodel_name or 'comodel_name'
262 raise TypeError((
263 f"The delegate field {self} must be declared in the model class e.g.\n"
264 f"_inherits = {{{comodel_name!r}: {name!r}}}"
265 ))
267 def setup_nonrelated(self, model):
268 super().setup_nonrelated(model)
269 # 3 cases:
270 # 1) The ondelete attribute is not defined, we assign it a sensible default
271 # 2) The ondelete attribute is defined and its definition makes sense
272 # 3) The ondelete attribute is explicitly defined as 'set null' for a required m2o,
273 # this is considered a programming error.
274 if not self.ondelete:
275 comodel = model.env[self.comodel_name]
276 if model.is_transient() and not comodel.is_transient():
277 # Many2one relations from TransientModel Model are annoying because
278 # they can block deletion due to foreign keys. So unless stated
279 # otherwise, we default them to ondelete='cascade'.
280 self.ondelete = 'cascade' if self.required else 'set null'
281 else:
282 self.ondelete = 'restrict' if self.required else 'set null'
283 if self.ondelete == 'set null' and self.required: 283 ↛ 284line 283 didn't jump to line 284 because the condition on line 283 was never true
284 raise ValueError(
285 "The m2o field %s of model %s is required but declares its ondelete policy "
286 "as being 'set null'. Only 'restrict' and 'cascade' make sense."
287 % (self.name, model._name)
288 )
289 if self.ondelete == 'restrict' and self.comodel_name in IR_MODELS: 289 ↛ 290line 289 didn't jump to line 290 because the condition on line 289 was never true
290 raise ValueError(
291 f"Field {self.name} of model {model._name} is defined as ondelete='restrict' "
292 f"while having {self.comodel_name} as comodel, the 'restrict' mode is not "
293 f"supported for this type of field as comodel."
294 )
296 def update_db(self, model, columns):
297 comodel = model.env[self.comodel_name]
298 if not model.is_transient() and comodel.is_transient(): 298 ↛ 299line 298 didn't jump to line 299 because the condition on line 298 was never true
299 raise ValueError('Many2one %s from Model to TransientModel is forbidden' % self)
300 return super().update_db(model, columns)
302 def update_db_column(self, model, column):
303 super().update_db_column(model, column)
304 model.pool.post_init(self.update_db_foreign_key, model, column)
306 def update_db_foreign_key(self, model, column):
307 if self.company_dependent:
308 return
309 comodel = model.env[self.comodel_name]
310 # foreign keys do not work on views, and users can define custom models on sql views.
311 if not model._is_an_ordinary_table() or not comodel._is_an_ordinary_table(): 311 ↛ 312line 311 didn't jump to line 312 because the condition on line 311 was never true
312 return
313 # ir_actions is inherited, so foreign key doesn't work on it
314 if not comodel._auto or comodel._table == 'ir_actions':
315 return
316 # create/update the foreign key, and reflect it in 'ir.model.constraint'
317 model.pool.add_foreign_key(
318 model._table, self.name, comodel._table, 'id', self.ondelete or 'set null',
319 model, self._module
320 )
322 def _update_inverse(self, records, value):
323 for record in records:
324 self._update_cache(record, self.convert_to_cache(value, record, validate=False))
326 def convert_to_column(self, value, record, values=None, validate=True):
327 return value or None
329 def convert_to_cache(self, value, record, validate=True):
330 # cache format: id or None
331 if type(value) is int or type(value) is NewId:
332 id_ = value
333 elif isinstance(value, BaseModel):
334 if validate and (value._name != self.comodel_name or len(value) > 1): 334 ↛ 335line 334 didn't jump to line 335 because the condition on line 334 was never true
335 raise ValueError("Wrong value for %s: %r" % (self, value))
336 id_ = value._ids[0] if value._ids else None
337 elif isinstance(value, tuple): 337 ↛ 339line 337 didn't jump to line 339 because the condition on line 337 was never true
338 # value is either a pair (id, name), or a tuple of ids
339 id_ = value[0] if value else None
340 elif isinstance(value, dict): 340 ↛ 342line 340 didn't jump to line 342 because the condition on line 340 was never true
341 # return a new record (with the given field 'id' as origin)
342 comodel = record.env[self.comodel_name]
343 origin = comodel.browse(value.get('id'))
344 id_ = comodel.new(value, origin=origin).id
345 else:
346 id_ = None
348 if self.delegate and record and not any(record._ids):
349 # if all records are new, then so is the parent
350 id_ = id_ and NewId(id_)
352 return id_
354 def convert_to_record(self, value, record):
355 # use registry to avoid creating a recordset for the model
356 ids = () if value is None else (value,)
357 prefetch_ids = PrefetchMany2one(record, self)
358 return record.pool[self.comodel_name](record.env, ids, prefetch_ids)
360 def convert_to_record_multi(self, values, records):
361 # return the ids as a recordset without duplicates
362 prefetch_ids = PrefetchMany2one(records, self)
363 ids = tuple(unique(id_ for id_ in values if id_ is not None))
364 return records.pool[self.comodel_name](records.env, ids, prefetch_ids)
366 def convert_to_read(self, value, record, use_display_name=True):
367 if use_display_name and value: 367 ↛ 371line 367 didn't jump to line 371 because the condition on line 367 was never true
368 # evaluate display_name as superuser, because the visibility of a
369 # many2one field value (id and name) depends on the current record's
370 # access rights, and not the value's access rights.
371 try:
372 # performance: value.sudo() prefetches the same records as value
373 return (value.id, value.sudo().display_name)
374 except MissingError:
375 # Should not happen, unless the foreign key is missing.
376 return False
377 else:
378 return value.id
380 def convert_to_write(self, value, record):
381 if type(value) is int or type(value) is NewId:
382 return value
383 if not value:
384 return False
385 if isinstance(value, BaseModel) and value._name == self.comodel_name: 385 ↛ 387line 385 didn't jump to line 387 because the condition on line 385 was always true
386 return value.id
387 if isinstance(value, tuple):
388 # value is either a pair (id, name), or a tuple of ids
389 return value[0] if value else False
390 if isinstance(value, dict):
391 return record.env[self.comodel_name].new(value).id
392 raise ValueError("Wrong value for %s: %r" % (self, value))
394 def convert_to_export(self, value, record):
395 return value.display_name if value else ''
397 def convert_to_display_name(self, value, record):
398 return value.display_name
400 def write(self, records, value):
401 # discard recomputation of self on records
402 records.env.remove_to_compute(self, records)
404 # discard the records that are not modified
405 cache_value = self.convert_to_cache(value, records)
406 records = self._filter_not_equal(records, cache_value)
407 if not records:
408 return
410 # remove records from the cache of one2many fields of old corecords
411 self._remove_inverses(records, cache_value)
413 # update the cache of self
414 self._update_cache(records, cache_value, dirty=True)
416 # update the cache of one2many fields of new corecord
417 self._update_inverses(records, cache_value)
419 def _remove_inverses(self, records: BaseModel, value):
420 """ Remove `records` from the cached values of the inverse fields (o2m) of `self`. """
421 inverse_fields = records.pool.field_inverses[self]
422 if not inverse_fields:
423 return
425 record_ids = set(records._ids)
426 # align(id) returns a NewId if records are new, a real id otherwise
427 align = (lambda id_: id_) if all(record_ids) else (lambda id_: id_ and NewId(id_))
428 field_cache = self._get_cache(records.env)
429 corecords = records.env[self.comodel_name].browse(
430 align(coid) for record_id in records._ids
431 if (coid := field_cache.get(record_id)) is not None
432 )
434 for invf in inverse_fields:
435 inv_cache = invf._get_cache(corecords.env)
436 for corecord in corecords:
437 ids0 = inv_cache.get(corecord.id)
438 if ids0 is not None:
439 ids1 = tuple(id_ for id_ in ids0 if id_ not in record_ids)
440 invf._update_cache(corecord, ids1)
442 def _update_inverses(self, records: BaseModel, value):
443 """ Add `records` to the cached values of the inverse fields (o2m) of `self`. """
444 if value is None:
445 return
446 corecord = self.convert_to_record(value, records)
447 for invf in records.pool.field_inverses[self]:
448 valid_records = records.filtered_domain(invf.get_comodel_domain(corecord))
449 if not valid_records:
450 continue
451 ids0 = invf._get_cache(corecord.env).get(corecord.id)
452 # if the value for the corecord is not in cache, but this is a new
453 # record, assign it anyway, as you won't be able to fetch it from
454 # database (see `test_sale_order`)
455 if ids0 is not None or not corecord.id:
456 ids1 = tuple(unique((ids0 or ()) + valid_records._ids))
457 invf._update_cache(corecord, ids1)
459 def to_sql(self, model: BaseModel, alias: str) -> SQL:
460 sql_field = super().to_sql(model, alias)
461 if self.company_dependent:
462 comodel = model.env[self.comodel_name]
463 sql_field = SQL(
464 '''(SELECT %(cotable_alias)s.id
465 FROM %(cotable)s AS %(cotable_alias)s
466 WHERE %(cotable_alias)s.id = %(ref)s)''',
467 cotable=SQL.identifier(comodel._table),
468 cotable_alias=SQL.identifier(Query.make_alias(comodel._table, 'exists')),
469 ref=sql_field,
470 )
471 return sql_field
473 def condition_to_sql(self, field_expr: str, operator: str, value, model: BaseModel, alias: str, query: Query) -> SQL:
474 if operator not in ('any', 'not any', 'any!', 'not any!') or field_expr != self.name:
475 # for other operators than 'any', just generate condition based on column type
476 return super().condition_to_sql(field_expr, operator, value, model, alias, query)
478 comodel = model.env[self.comodel_name]
479 sql_field = model._field_to_sql(alias, field_expr, query)
480 can_be_null = self not in model.env.registry.not_null_fields
481 bypass_access = operator in ('any!', 'not any!') or self.bypass_search_access
482 positive = operator in ('any', 'any!')
484 # Decide whether to use a LEFT JOIN
485 left_join = bypass_access and isinstance(value, Domain)
486 if left_join and not positive:
487 # For 'not any!', we get a better query with a NOT IN when we have a
488 # lot of positive conditions which have a better chance to use
489 # indexes.
490 # `field NOT IN (SELECT ... WHERE z = y)` better than
491 # `LEFT JOIN ... ON field = id WHERE z <> y`
492 # There are some exceptions: we filter on 'id'.
493 left_join = sum(
494 (-1 if cond.operator in Domain.NEGATIVE_OPERATORS else 1)
495 for cond in value.iter_conditions()
496 ) < 0 or any(
497 cond.field_expr == 'id' and cond.operator not in Domain.NEGATIVE_OPERATORS
498 for cond in value.iter_conditions()
499 )
501 if left_join:
502 comodel, coalias = self.join(model, alias, query)
503 if not positive: 503 ↛ 504line 503 didn't jump to line 504 because the condition on line 503 was never true
504 value = (~value).optimize_full(comodel)
505 sql = value._to_sql(comodel, coalias, query)
506 if self.company_dependent: 506 ↛ 507line 506 didn't jump to line 507 because the condition on line 506 was never true
507 sql = self._condition_to_sql_company(sql, field_expr, operator, value, model, alias, query)
508 if can_be_null: 508 ↛ 513line 508 didn't jump to line 513 because the condition on line 508 was always true
509 if positive: 509 ↛ 512line 509 didn't jump to line 512 because the condition on line 509 was always true
510 sql = SQL("(%s IS NOT NULL AND %s)", sql_field, sql)
511 else:
512 sql = SQL("(%s IS NULL OR %s)", sql_field, sql)
513 return sql
515 if isinstance(value, Domain):
516 value = comodel._search(value, active_test=False, bypass_access=bypass_access)
517 if isinstance(value, Query): 517 ↛ 519line 517 didn't jump to line 519 because the condition on line 517 was always true
518 subselect = value.subselect()
519 elif isinstance(value, SQL):
520 subselect = SQL("(%s)", value)
521 else:
522 raise TypeError(f"condition_to_sql() 'any' operator accepts Domain, SQL or Query, got {value}")
523 sql = SQL(
524 "%s%s%s",
525 sql_field,
526 SQL(" IN ") if positive else SQL(" NOT IN "),
527 subselect,
528 )
529 if can_be_null and not positive:
530 sql = SQL("(%s IS NULL OR %s)", sql_field, sql)
531 if self.company_dependent: 531 ↛ 532line 531 didn't jump to line 532 because the condition on line 531 was never true
532 sql = self._condition_to_sql_company(sql, field_expr, operator, value, model, alias, query)
533 return sql
535 def join(self, model: BaseModel, alias: str, query: Query) -> tuple[BaseModel, str]:
536 """ Add a LEFT JOIN to ``query`` by following field ``self``,
537 and return the joined table's corresponding model and alias.
538 """
539 comodel = model.env[self.comodel_name]
540 coalias = query.make_alias(alias, self.name)
541 query.add_join('LEFT JOIN', coalias, comodel._table, SQL(
542 "%s = %s",
543 model._field_to_sql(alias, self.name, query),
544 SQL.identifier(coalias, 'id'),
545 ))
546 return (comodel, coalias)
549class _RelationalMulti(_Relational):
550 r"Abstract class for relational fields \*2many."
551 write_sequence = 20
553 # Important: the cache contains the ids of all the records in the relation,
554 # including inactive records. Inactive records are filtered out by
555 # convert_to_record(), depending on the context.
557 def _update_inverse(self, records, value):
558 new_id = value.id
559 assert not new_id, "Field._update_inverse can only be called with a new id"
560 field_cache = self._get_cache(records.env)
561 for record_id in records._ids:
562 assert not record_id, "Field._update_inverse can only be called with new records"
563 cache_value = field_cache.get(record_id, SENTINEL)
564 if cache_value is SENTINEL: 564 ↛ 567line 564 didn't jump to line 567 because the condition on line 564 was always true
565 records.env.transaction.field_data_patches[self][record_id].append(new_id)
566 else:
567 field_cache[record_id] = tuple(unique(cache_value + (new_id,)))
569 def _update_cache(self, records, cache_value, dirty=False):
570 field_patches = records.env.transaction.field_data_patches.get(self)
571 if field_patches and records:
572 for record in records:
573 ids = field_patches.pop(record.id, ())
574 if ids: 574 ↛ 575line 574 didn't jump to line 575 because the condition on line 574 was never true
575 value = tuple(unique(itertools.chain(cache_value, ids)))
576 else:
577 value = cache_value
578 super()._update_cache(record, value, dirty)
579 return
580 super()._update_cache(records, cache_value, dirty)
582 def convert_to_cache(self, value, record, validate=True):
583 # cache format: tuple(ids)
584 if isinstance(value, BaseModel):
585 if validate and value._name != self.comodel_name: 585 ↛ 586line 585 didn't jump to line 586 because the condition on line 585 was never true
586 raise ValueError("Wrong value for %s: %s" % (self, value))
587 ids = value._ids
588 if record and not record.id:
589 # x2many field value of new record is new records
590 ids = tuple(it and NewId(it) for it in ids)
591 return ids
593 elif isinstance(value, (list, tuple)):
594 # value is a list/tuple of commands, dicts or record ids
595 comodel = record.env[self.comodel_name]
596 # if record is new, the field's value is new records
597 if record and not record.id:
598 browse = lambda it: comodel.browse((it and NewId(it),))
599 else:
600 browse = comodel.browse
601 # determine the value ids: in case of a real record or a new record
602 # with origin, take its current value
603 ids = OrderedSet(record[self.name]._ids if record._origin else ())
604 # modify ids with the commands
605 for command in value:
606 if isinstance(command, (tuple, list)):
607 if command[0] == Command.CREATE:
608 ids.add(comodel.new(command[2], ref=command[1]).id)
609 elif command[0] == Command.UPDATE: 609 ↛ 610line 609 didn't jump to line 610 because the condition on line 609 was never true
610 line = browse(command[1])
611 if validate:
612 line.update(command[2])
613 else:
614 line._update_cache(command[2], validate=False)
615 ids.add(line.id)
616 elif command[0] in (Command.DELETE, Command.UNLINK): 616 ↛ 617line 616 didn't jump to line 617 because the condition on line 616 was never true
617 ids.discard(browse(command[1]).id)
618 elif command[0] == Command.LINK:
619 ids.add(browse(command[1]).id)
620 elif command[0] == Command.CLEAR:
621 ids.clear()
622 elif command[0] == Command.SET: 622 ↛ 605line 622 didn't jump to line 605 because the condition on line 622 was always true
623 ids = OrderedSet(browse(it).id for it in command[2])
624 elif isinstance(command, dict): 624 ↛ 625line 624 didn't jump to line 625 because the condition on line 624 was never true
625 ids.add(comodel.new(command).id)
626 else:
627 ids.add(browse(command).id)
628 # return result as a tuple
629 return tuple(ids)
631 elif not value: 631 ↛ 634line 631 didn't jump to line 634 because the condition on line 631 was always true
632 return ()
634 raise ValueError("Wrong value for %s: %s" % (self, value))
636 def convert_to_record(self, value, record):
637 # use registry to avoid creating a recordset for the model
638 prefetch_ids = PrefetchX2many(record, self)
639 Comodel = record.pool[self.comodel_name]
640 corecords = Comodel(record.env, value, prefetch_ids)
641 if (
642 Comodel._active_name
643 and self.context.get('active_test', record.env.context.get('active_test', True))
644 ):
645 corecords = corecords.filtered(Comodel._active_name).with_prefetch(prefetch_ids)
646 return corecords
648 def convert_to_record_multi(self, values, records):
649 # return the list of ids as a recordset without duplicates
650 prefetch_ids = PrefetchX2many(records, self)
651 Comodel = records.pool[self.comodel_name]
652 ids = tuple(unique(id_ for ids in values for id_ in ids))
653 corecords = Comodel(records.env, ids, prefetch_ids)
654 if (
655 Comodel._active_name
656 and self.context.get('active_test', records.env.context.get('active_test', True))
657 ):
658 corecords = corecords.filtered(Comodel._active_name).with_prefetch(prefetch_ids)
659 return corecords
661 def convert_to_read(self, value, record, use_display_name=True):
662 return value.ids
664 def convert_to_write(self, value, record):
665 if isinstance(value, tuple):
666 # a tuple of ids, this is the cache format
667 value = record.env[self.comodel_name].browse(value)
669 if isinstance(value, BaseModel) and value._name == self.comodel_name: 669 ↛ 697line 669 didn't jump to line 697 because the condition on line 669 was always true
670 def get_origin(val):
671 return val._origin if isinstance(val, BaseModel) else val
673 # make result with new and existing records
674 inv_names = {field.name for field in record.pool.field_inverses[self]}
675 result = [Command.set([])]
676 for record in value:
677 origin = record._origin
678 if not origin: 678 ↛ 679line 678 didn't jump to line 679 because the condition on line 678 was never true
679 values = record._convert_to_write({
680 name: record[name]
681 for name in record._cache
682 if name not in inv_names
683 })
684 result.append(Command.create(values))
685 else:
686 result[0][2].append(origin.id)
687 if record != origin:
688 values = record._convert_to_write({
689 name: record[name]
690 for name in record._cache
691 if name not in inv_names and get_origin(record[name]) != origin[name]
692 })
693 if values: 693 ↛ 694line 693 didn't jump to line 694 because the condition on line 693 was never true
694 result.append(Command.update(origin.id, values))
695 return result
697 if value is False or value is None:
698 return [Command.clear()]
700 if isinstance(value, list):
701 return value
703 raise ValueError("Wrong value for %s: %s" % (self, value))
705 def convert_to_export(self, value, record):
706 return ','.join(value.mapped('display_name')) if value else ''
708 def convert_to_display_name(self, value, record):
709 raise NotImplementedError()
711 def get_depends(self, model):
712 depends, depends_context = super().get_depends(model)
713 if not self.compute and isinstance(domain := self.domain, (list, Domain)):
714 domain = Domain(domain)
715 depends = unique(itertools.chain(depends, (
716 self.name + '.' + condition.field_expr
717 for condition in domain.iter_conditions()
718 )))
719 return depends, depends_context
721 def create(self, record_values):
722 """ Write the value of ``self`` on the given records, which have just
723 been created.
725 :param record_values: a list of pairs ``(record, value)``, where
726 ``value`` is in the format of method :meth:`BaseModel.write`
727 """
728 self.write_batch(record_values, True)
730 def write(self, records, value):
731 # discard recomputation of self on records
732 records.env.remove_to_compute(self, records)
733 self.write_batch([(records, value)])
735 def write_batch(self, records_commands_list: Sequence[tuple[BaseModel, typing.Any]], create: bool = False) -> None:
736 if not records_commands_list: 736 ↛ 737line 736 didn't jump to line 737 because the condition on line 736 was never true
737 return
739 for idx, (recs, value) in enumerate(records_commands_list):
740 if isinstance(value, tuple): 740 ↛ 741line 740 didn't jump to line 741 because the condition on line 740 was never true
741 value = [Command.set(value)]
742 elif isinstance(value, BaseModel) and value._name == self.comodel_name:
743 value = [Command.set(value._ids)]
744 elif value is False or value is None:
745 value = [Command.clear()]
746 elif isinstance(value, list) and value and not isinstance(value[0], (tuple, list)):
747 value = [Command.set(tuple(value))]
748 if not isinstance(value, list): 748 ↛ 749line 748 didn't jump to line 749 because the condition on line 748 was never true
749 raise ValueError("Wrong value for %s: %s" % (self, value))
750 records_commands_list[idx] = (recs, value)
752 record_ids = {rid for recs, cs in records_commands_list for rid in recs._ids}
753 if all(record_ids):
754 self.write_real(records_commands_list, create)
755 else:
756 assert not any(record_ids), f"{records_commands_list} contains a mix of real and new records. It is not supported."
757 self.write_new(records_commands_list)
759 def write_real(self, records_commands_list: Sequence[tuple[BaseModel, list[CommandValue]]], create: bool = False) -> None:
760 raise NotImplementedError
762 def write_new(self, records_commands_list: Sequence[tuple[BaseModel, list[CommandValue]]]) -> None:
763 raise NotImplementedError
765 def _check_sudo_commands(self, comodel):
766 # if the model doesn't accept sudo commands
767 if not comodel._allow_sudo_commands:
768 # Then, disable sudo and reset the transaction origin user
769 return comodel.sudo(False).with_user(comodel.env.transaction.default_env.uid)
770 return comodel
772 def condition_to_sql(self, field_expr: str, operator: str, value, model: BaseModel, alias: str, query: Query) -> SQL:
773 assert field_expr == self.name, "Supporting condition only to field"
774 comodel = model.env[self.comodel_name]
775 if not self.store: 775 ↛ 776line 775 didn't jump to line 776 because the condition on line 775 was never true
776 raise ValueError(f"Cannot convert {self} to SQL because it is not stored")
778 # update the operator to 'any'
779 if operator in ('in', 'not in'):
780 operator = 'any' if operator == 'in' else 'not any'
781 assert operator in ('any', 'not any', 'any!', 'not any!'), \
782 f"Relational field {self} expects 'any' operator"
783 exists = operator in ('any', 'any!')
785 # check the value and execute the query
786 if isinstance(value, COLLECTION_TYPES):
787 value = OrderedSet(value)
788 comodel = comodel.sudo().with_context(active_test=False)
789 if False in value:
790 # [not]in (False, 1) => split conditions
791 # We want records that have a record such as condition or
792 # that don't have any records.
793 if len(value) > 1: 793 ↛ 794line 793 didn't jump to line 794 because the condition on line 793 was never true
794 in_operator = 'in' if exists else 'not in'
795 return SQL(
796 "(%s OR %s)" if exists else "(%s AND %s)",
797 self.condition_to_sql(field_expr, in_operator, (False,), model, alias, query),
798 self.condition_to_sql(field_expr, in_operator, value - {False}, model, alias, query),
799 )
800 # in (False) => not any (Domain.TRUE)
801 # not in (False) => any (Domain.TRUE)
802 value = comodel._search(Domain.TRUE)
803 exists = not exists
804 else:
805 value = comodel.browse(value)._as_query(ordered=False)
806 elif isinstance(value, SQL): 806 ↛ 808line 806 didn't jump to line 808 because the condition on line 806 was never true
807 # wrap SQL into a simple query
808 comodel = comodel.sudo()
809 value = Domain('id', 'any', value)
810 coquery = self._get_query_for_condition_value(model, comodel, operator, value)
811 return self._condition_to_sql_relational(model, alias, exists, coquery, query)
813 def _get_query_for_condition_value(self, model: BaseModel, comodel: BaseModel, operator: str, value: Domain | Query) -> Query:
814 """ Return Query run on the comodel with the field.domain injected."""
815 field_domain = self.get_comodel_domain(model)
816 if isinstance(value, Domain):
817 domain = value & field_domain
818 comodel = comodel.with_context(**self.context)
819 bypass_access = self.bypass_search_access or operator in ('any!', 'not any!')
820 query = comodel._search(domain, bypass_access=bypass_access)
821 assert isinstance(query, Query)
822 return query
823 if isinstance(value, Query): 823 ↛ 830line 823 didn't jump to line 830 because the condition on line 823 was always true
824 # add the field_domain to the query
825 domain = field_domain.optimize_full(comodel)
826 if not domain.is_true():
827 # TODO should clone/copy Query value
828 value.add_where(domain._to_sql(comodel, value.table, value))
829 return value
830 raise NotImplementedError(f"Cannot build query for {value}")
832 def _condition_to_sql_relational(self, model: BaseModel, alias: str, exists: bool, coquery: Query, query: Query) -> SQL:
833 raise NotImplementedError
836class One2many(_RelationalMulti):
837 """One2many field; the value of such a field is the recordset of all the
838 records in ``comodel_name`` such that the field ``inverse_name`` is equal to
839 the current record.
841 :param str comodel_name: name of the target model
843 :param str inverse_name: name of the inverse ``Many2one`` field in
844 ``comodel_name``
846 :param domain: an optional domain to set on candidate values on the
847 client side (domain or a python expression that will be evaluated
848 to provide domain)
850 :param dict context: an optional context to use on the client side when
851 handling that field
853 :param bool bypass_search_access: whether access rights are bypassed on the
854 comodel (default: ``False``)
856 The attributes ``comodel_name`` and ``inverse_name`` are mandatory except in
857 the case of related fields or field extensions.
858 """
859 type = 'one2many'
861 inverse_name: str | None = None # name of the inverse field
862 copy: bool = False # o2m are not copied by default
864 def __init__(self, comodel_name: str | Sentinel = SENTINEL, inverse_name: str | Sentinel = SENTINEL,
865 string: str | Sentinel = SENTINEL, **kwargs):
866 super().__init__(
867 comodel_name=comodel_name,
868 inverse_name=inverse_name,
869 string=string,
870 **kwargs
871 )
873 def setup_nonrelated(self, model):
874 super().setup_nonrelated(model)
875 if self.inverse_name:
876 # link self to its inverse field and vice-versa
877 comodel = model.env[self.comodel_name]
878 try:
879 field = comodel._fields[self.inverse_name]
880 field.setup(comodel)
881 except KeyError:
882 raise ValueError(f"{self.inverse_name!r} declared in {self!r} does not exist on {comodel._name!r}.")
884 def setup_inverses(self, registry, inverses):
885 if self.inverse_name:
886 # link self to its inverse field and vice-versa
887 invf = registry[self.comodel_name]._fields[self.inverse_name]
888 if isinstance(invf, (Many2one, Many2oneReference)): 888 ↛ 892line 888 didn't jump to line 892 because the condition on line 888 was always true
889 # setting one2many fields only invalidates many2one inverses;
890 # integer inverses (res_model/res_id pairs) are not supported
891 inverses.add(self, invf)
892 inverses.add(invf, self)
894 _description_relation_field = property(attrgetter('inverse_name'))
896 def update_db(self, model, columns):
897 if self.comodel_name in model.env: 897 ↛ exitline 897 didn't return from function 'update_db' because the condition on line 897 was always true
898 comodel = model.env[self.comodel_name]
899 if self.inverse_name not in comodel._fields: 899 ↛ 900line 899 didn't jump to line 900 because the condition on line 899 was never true
900 raise UserError(model.env._(
901 'No inverse field "%(inverse_field)s" found for "%(comodel)s"',
902 inverse_field=self.inverse_name,
903 comodel=self.comodel_name
904 ))
906 def _additional_domain(self, env) -> Domain:
907 if self.comodel_name and self.inverse_name:
908 comodel = env.registry[self.comodel_name]
909 inverse_field = comodel._fields[self.inverse_name]
910 if inverse_field.type == 'many2one_reference':
911 return Domain(inverse_field.model_field, '=', self.model_name)
912 return Domain.TRUE
914 def get_comodel_domain(self, model: BaseModel) -> Domain:
915 return super().get_comodel_domain(model) & self._additional_domain(model.env)
917 def _internal_description_domain_raw(self, env) -> str | list:
918 domain = super()._internal_description_domain_raw(env)
919 additional_domain = self._additional_domain(env)
920 if additional_domain.is_true():
921 return domain
922 return f"({domain}) + ({additional_domain})"
924 def __get__(self, records, owner=None):
925 if records is not None and self.inverse_name is not None:
926 # force the computation of the inverse field to ensure that the
927 # cache value of self is consistent
928 inverse_field = records.pool[self.comodel_name]._fields[self.inverse_name]
929 if inverse_field.compute:
930 records.env[self.comodel_name]._recompute_model([self.inverse_name])
931 return super().__get__(records, owner)
933 def read(self, records):
934 # retrieve the lines in the comodel
935 context = {'active_test': False}
936 context.update(self.context)
937 comodel = records.env[self.comodel_name].with_context(**context)
938 inverse = self.inverse_name
939 inverse_field = comodel._fields[inverse]
941 # optimization: fetch the inverse and active fields with search()
942 domain = self.get_comodel_domain(records) & Domain(inverse, 'in', records.ids)
943 field_names = [inverse]
944 if comodel._active_name:
945 field_names.append(comodel._active_name)
946 try:
947 lines = comodel.search_fetch(domain, field_names)
948 except AccessError as e:
949 raise AccessError(records.env._("Failed to read field %s", self) + '\n' + str(e)) from e
951 # group lines by inverse field (without prefetching other fields)
952 get_id = (lambda rec: rec.id) if inverse_field.type == 'many2one' else int
953 group = defaultdict(list)
954 for line in lines:
955 # line[inverse] may be a record or an integer
956 group[get_id(line[inverse])].append(line.id)
958 # store result in cache
959 values = [tuple(group[id_]) for id_ in records._ids]
960 self._insert_cache(records, values)
962 def write_real(self, records_commands_list, create=False):
963 """ Update real records. """
964 # records_commands_list = [(records, commands), ...]
965 if not records_commands_list: 965 ↛ 966line 965 didn't jump to line 966 because the condition on line 965 was never true
966 return
968 model = records_commands_list[0][0].browse()
969 comodel = model.env[self.comodel_name].with_context(**self.context)
970 comodel = self._check_sudo_commands(comodel)
972 if self.store:
973 inverse = self.inverse_name
974 to_create = [] # line vals to create
975 to_delete = [] # line ids to delete
976 to_link = defaultdict(OrderedSet) # {record: line_ids}
977 allow_full_delete = not create
979 def unlink(lines):
980 if getattr(comodel._fields[inverse], 'ondelete', False) == 'cascade':
981 to_delete.extend(lines._ids)
982 else:
983 lines[inverse] = False
985 def flush():
986 if to_link:
987 before = {record: record[self.name] for record in to_link}
988 if to_delete: 988 ↛ 990line 988 didn't jump to line 990 because the condition on line 988 was never true
989 # unlink() will remove the lines from the cache
990 comodel.browse(to_delete).unlink()
991 to_delete.clear()
992 if to_create:
993 # create() will add the new lines to the cache of records
994 comodel.create(to_create)
995 to_create.clear()
996 if to_link:
997 for record, line_ids in to_link.items():
998 lines = comodel.browse(line_ids) - before[record]
999 # linking missing lines should fail
1000 lines.mapped(inverse)
1001 lines[inverse] = record
1002 to_link.clear()
1004 for recs, commands in records_commands_list:
1005 for command in (commands or ()):
1006 if command[0] == Command.CREATE:
1007 for record in recs:
1008 to_create.append(dict(command[2], **{inverse: record.id}))
1009 allow_full_delete = False
1010 elif command[0] == Command.UPDATE: 1010 ↛ 1011line 1010 didn't jump to line 1011 because the condition on line 1010 was never true
1011 prefetch_ids = recs[self.name]._prefetch_ids
1012 comodel.browse(command[1]).with_prefetch(prefetch_ids).write(command[2])
1013 elif command[0] == Command.DELETE: 1013 ↛ 1014line 1013 didn't jump to line 1014 because the condition on line 1013 was never true
1014 to_delete.append(command[1])
1015 elif command[0] == Command.UNLINK: 1015 ↛ 1016line 1015 didn't jump to line 1016 because the condition on line 1015 was never true
1016 unlink(comodel.browse(command[1]))
1017 elif command[0] == Command.LINK:
1018 to_link[recs[-1]].add(command[1])
1019 allow_full_delete = False
1020 elif command[0] in (Command.CLEAR, Command.SET): 1020 ↛ 1005line 1020 didn't jump to line 1005 because the condition on line 1020 was always true
1021 line_ids = command[2] if command[0] == Command.SET else []
1022 if not allow_full_delete:
1023 # do not try to delete anything in creation mode if nothing has been created before
1024 if line_ids:
1025 # equivalent to Command.LINK
1026 if line_ids.__class__ is int: 1026 ↛ 1027line 1026 didn't jump to line 1027 because the condition on line 1026 was never true
1027 line_ids = [line_ids]
1028 to_link[recs[-1]].update(line_ids)
1029 allow_full_delete = False
1030 continue
1031 flush()
1032 # assign the given lines to the last record only
1033 lines = comodel.browse(line_ids)
1034 domain = self.get_comodel_domain(model) & Domain(inverse, 'in', recs.ids) & Domain('id', 'not in', lines.ids)
1035 unlink(comodel.search(domain))
1036 lines[inverse] = recs[-1]
1038 flush()
1040 else:
1041 ids = OrderedSet(rid for recs, cs in records_commands_list for rid in recs._ids)
1042 records = records_commands_list[0][0].browse(ids)
1044 def link(record, lines):
1045 ids = record[self.name]._ids
1046 self._update_cache(record, tuple(unique(ids + lines._ids)))
1048 def unlink(lines):
1049 for record in records:
1050 self._update_cache(record, (record[self.name] - lines)._ids)
1052 for recs, commands in records_commands_list:
1053 for command in (commands or ()):
1054 if command[0] == Command.CREATE: 1054 ↛ 1055line 1054 didn't jump to line 1055 because the condition on line 1054 was never true
1055 for record in recs:
1056 link(record, comodel.new(command[2], ref=command[1]))
1057 elif command[0] == Command.UPDATE: 1057 ↛ 1058line 1057 didn't jump to line 1058 because the condition on line 1057 was never true
1058 comodel.browse(command[1]).write(command[2])
1059 elif command[0] == Command.DELETE: 1059 ↛ 1060line 1059 didn't jump to line 1060 because the condition on line 1059 was never true
1060 unlink(comodel.browse(command[1]))
1061 elif command[0] == Command.UNLINK: 1061 ↛ 1062line 1061 didn't jump to line 1062 because the condition on line 1061 was never true
1062 unlink(comodel.browse(command[1]))
1063 elif command[0] == Command.LINK: 1063 ↛ 1064line 1063 didn't jump to line 1064 because the condition on line 1063 was never true
1064 link(recs[-1], comodel.browse(command[1]))
1065 elif command[0] in (Command.CLEAR, Command.SET): 1065 ↛ 1053line 1065 didn't jump to line 1053 because the condition on line 1065 was always true
1066 # assign the given lines to the last record only
1067 self._update_cache(recs, ())
1068 lines = comodel.browse(command[2] if command[0] == Command.SET else [])
1069 self._update_cache(recs[-1], lines._ids)
1071 def write_new(self, records_commands_list):
1072 if not records_commands_list:
1073 return
1075 model = records_commands_list[0][0].browse()
1076 comodel = model.env[self.comodel_name].with_context(**self.context)
1077 comodel = self._check_sudo_commands(comodel)
1079 ids = {record.id for records, _ in records_commands_list for record in records}
1080 records = model.browse(ids)
1082 def browse(ids):
1083 return comodel.browse([id_ and NewId(id_) for id_ in ids])
1085 # make sure self is in cache
1086 records[self.name]
1088 if self.store:
1089 inverse = self.inverse_name
1091 # make sure self's inverse is in cache
1092 inverse_field = comodel._fields[inverse]
1093 for record in records:
1094 inverse_field._update_cache(record[self.name], record.id)
1096 for recs, commands in records_commands_list:
1097 for command in commands:
1098 if command[0] == Command.CREATE:
1099 for record in recs:
1100 line = comodel.new(command[2], ref=command[1])
1101 line[inverse] = record
1102 elif command[0] == Command.UPDATE:
1103 browse([command[1]]).update(command[2])
1104 elif command[0] == Command.DELETE:
1105 browse([command[1]])[inverse] = False
1106 elif command[0] == Command.UNLINK:
1107 browse([command[1]])[inverse] = False
1108 elif command[0] == Command.LINK:
1109 browse([command[1]])[inverse] = recs[-1]
1110 elif command[0] == Command.CLEAR:
1111 self._update_cache(recs, ())
1112 elif command[0] == Command.SET:
1113 # assign the given lines to the last record only
1114 self._update_cache(recs, ())
1115 last, lines = recs[-1], browse(command[2])
1116 self._update_cache(last, lines._ids)
1117 inverse_field._update_cache(lines, last.id)
1119 else:
1120 def link(record, lines):
1121 ids = record[self.name]._ids
1122 self._update_cache(record, tuple(unique(ids + lines._ids)))
1124 def unlink(lines):
1125 for record in records:
1126 self._update_cache(record, (record[self.name] - lines)._ids)
1128 for recs, commands in records_commands_list:
1129 for command in commands:
1130 if command[0] == Command.CREATE:
1131 for record in recs:
1132 link(record, comodel.new(command[2], ref=command[1]))
1133 elif command[0] == Command.UPDATE:
1134 browse([command[1]]).update(command[2])
1135 elif command[0] == Command.DELETE:
1136 unlink(browse([command[1]]))
1137 elif command[0] == Command.UNLINK:
1138 unlink(browse([command[1]]))
1139 elif command[0] == Command.LINK:
1140 link(recs[-1], browse([command[1]]))
1141 elif command[0] in (Command.CLEAR, Command.SET):
1142 # assign the given lines to the last record only
1143 self._update_cache(recs, ())
1144 lines = browse(command[2] if command[0] == Command.SET else [])
1145 self._update_cache(recs[-1], lines._ids)
1147 def _get_query_for_condition_value(self, model: BaseModel, comodel: BaseModel, operator, value) -> Query:
1148 inverse_field = comodel._fields[self.inverse_name]
1149 if inverse_field not in comodel.env.registry.not_null_fields: 1149 ↛ 1163line 1149 didn't jump to line 1163 because the condition on line 1149 was always true
1150 # In the condition, one must avoid subqueries to return
1151 # NULL values, since it makes the IN test NULL instead
1152 # of FALSE. This may discard expected results, as for
1153 # instance "id NOT IN (42, NULL)" is never TRUE.
1154 if isinstance(value, Domain):
1155 value &= Domain(inverse_field.name, 'not in', {False})
1156 else:
1157 coquery = super()._get_query_for_condition_value(model, comodel, operator, value)
1158 coquery.add_where(SQL(
1159 "%s IS NOT NULL",
1160 comodel._field_to_sql(coquery.table, inverse_field.name, coquery),
1161 ))
1162 return coquery
1163 return super()._get_query_for_condition_value(model, comodel, operator, value)
1165 def _condition_to_sql_relational(self, model: BaseModel, alias: str, exists: bool, coquery: Query, query: Query) -> SQL:
1166 if coquery.is_empty(): 1166 ↛ 1167line 1166 didn't jump to line 1167 because the condition on line 1166 was never true
1167 return Domain(not exists)._to_sql(model, alias, query)
1169 comodel = model.env[self.comodel_name].sudo()
1170 inverse_field = comodel._fields[self.inverse_name]
1171 if not inverse_field.store: 1171 ↛ 1174line 1171 didn't jump to line 1174 because the condition on line 1171 was never true
1172 # determine ids1 in model related to ids2
1173 # TODO should we support this in the future?
1174 recs = comodel.browse(coquery).with_context(prefetch_fields=False)
1175 if inverse_field.relational:
1176 inverses = inverse_field.__get__(recs)
1177 else:
1178 # int values, map them
1179 inverses = model.browse(inverse_field.__get__(rec) for rec in recs)
1180 subselect = inverses._as_query(ordered=False).subselect()
1181 return SQL(
1182 "%s%s%s",
1183 SQL.identifier(alias, 'id'),
1184 SQL_OPERATORS['in' if exists else 'not in'],
1185 subselect,
1186 )
1188 subselect = coquery.subselect(
1189 SQL("%s AS __inverse", comodel._field_to_sql(coquery.table, inverse_field.name, coquery))
1190 )
1191 return SQL(
1192 "%sEXISTS(SELECT FROM %s AS __sub WHERE __inverse = %s)",
1193 SQL() if exists else SQL("NOT "),
1194 subselect,
1195 SQL.identifier(alias, 'id'),
1196 )
1199class Many2many(_RelationalMulti):
1200 """ Many2many field; the value of such a field is the recordset.
1202 :param str comodel_name: name of the target model (string)
1203 mandatory except in the case of related or extended fields
1205 :param str relation: optional name of the table that stores the relation in
1206 the database
1208 :param str column1: optional name of the column referring to "these" records
1209 in the table ``relation``
1211 :param str column2: optional name of the column referring to "those" records
1212 in the table ``relation``
1214 The attributes ``relation``, ``column1`` and ``column2`` are optional.
1215 If not given, names are automatically generated from model names,
1216 provided ``model_name`` and ``comodel_name`` are different!
1218 Note that having several fields with implicit relation parameters on a
1219 given model with the same comodel is not accepted by the ORM, since
1220 those field would use the same table. The ORM prevents two many2many
1221 fields to use the same relation parameters, except if
1223 - both fields use the same model, comodel, and relation parameters are
1224 explicit; or
1226 - at least one field belongs to a model with ``_auto = False``.
1228 :param domain: an optional domain to set on candidate values on the
1229 client side (domain or a python expression that will be evaluated
1230 to provide domain)
1232 :param dict context: an optional context to use on the client side when
1233 handling that field
1235 :param bool check_company: Mark the field to be verified in
1236 :meth:`~odoo.models.Model._check_company`. Add a default company
1237 domain depending on the field attributes.
1239 """
1240 type = 'many2many'
1242 _explicit: bool = True # whether schema is explicitly given
1243 relation: str | None = None # name of table
1244 column1: str | None = None # column of table referring to model
1245 column2: str | None = None # column of table referring to comodel
1246 ondelete: OnDelete | None = 'cascade' # optional ondelete for the column2 fkey
1248 def __init__(self, comodel_name: str | Sentinel = SENTINEL, relation: str | Sentinel = SENTINEL,
1249 column1: str | Sentinel = SENTINEL, column2: str | Sentinel = SENTINEL,
1250 string: str | Sentinel = SENTINEL, **kwargs):
1251 super().__init__(
1252 comodel_name=comodel_name,
1253 relation=relation,
1254 column1=column1,
1255 column2=column2,
1256 string=string,
1257 **kwargs
1258 )
1260 def setup_nonrelated(self, model: BaseModel) -> None:
1261 super().setup_nonrelated(model)
1262 # 2 cases:
1263 # 1) The ondelete attribute is defined and its definition makes sense
1264 # 2) The ondelete attribute is explicitly defined as 'set null' for a m2m,
1265 # this is considered a programming error.
1266 if self.ondelete not in ('cascade', 'restrict'): 1266 ↛ 1267line 1266 didn't jump to line 1267 because the condition on line 1266 was never true
1267 raise ValueError(
1268 "The m2m field %s of model %s declares its ondelete policy "
1269 "as being %r. Only 'restrict' and 'cascade' make sense."
1270 % (self.name, model._name, self.ondelete)
1271 )
1272 if self.store:
1273 if not (self.relation and self.column1 and self.column2):
1274 if not self.relation:
1275 self._explicit = False
1276 # table name is based on the stable alphabetical order of tables
1277 comodel = model.env[self.comodel_name]
1278 if not self.relation:
1279 tables = sorted([model._table, comodel._table])
1280 assert tables[0] != tables[1], \
1281 "%s: Implicit/canonical naming of many2many relationship " \
1282 "table is not possible when source and destination models " \
1283 "are the same" % self
1284 self.relation = '%s_%s_rel' % tuple(tables)
1285 if not self.column1:
1286 self.column1 = '%s_id' % model._table
1287 if not self.column2:
1288 self.column2 = '%s_id' % comodel._table
1289 # check validity of table name
1290 check_pg_name(self.relation)
1291 else:
1292 self.relation = self.column1 = self.column2 = None
1294 if self.relation:
1295 # check whether other fields use the same schema
1296 fields = model.pool.many2many_relations[self.relation, self.column1, self.column2]
1297 for mname, fname in fields:
1298 field = model.pool[mname]._fields[fname]
1299 if ( 1299 ↛ 1310line 1299 didn't jump to line 1310 because the condition on line 1299 was always true
1300 field is self
1301 ) or ( # same model: relation parameters must be explicit
1302 self.model_name == field.model_name and
1303 self.comodel_name == field.comodel_name and
1304 self._explicit and field._explicit
1305 ) or ( # different models: one model must be _auto=False
1306 self.model_name != field.model_name and
1307 not (model._auto and model.env[field.model_name]._auto)
1308 ):
1309 continue
1310 msg = "Many2many fields %s and %s use the same table and columns"
1311 raise TypeError(msg % (self, field))
1312 fields.add((self.model_name, self.name))
1314 def setup_inverses(self, registry, inverses):
1315 if self.relation:
1316 # retrieve inverse fields, and link them in field_inverses
1317 for mname, fname in registry.many2many_relations[self.relation, self.column2, self.column1]:
1318 field = registry[mname]._fields[fname]
1319 inverses.add(self, field)
1320 inverses.add(field, self)
1322 def update_db(self, model, columns):
1323 cr = model.env.cr
1324 # Do not reflect relations for custom fields, as they do not belong to a
1325 # module. They are automatically removed when dropping the corresponding
1326 # 'ir.model.field'.
1327 if not self.manual: 1327 ↛ 1330line 1327 didn't jump to line 1330 because the condition on line 1327 was always true
1328 model.pool.post_init(model.env['ir.model.relation']._reflect_relation,
1329 model, self.relation, self._module)
1330 comodel = model.env[self.comodel_name]
1331 if not sql.table_exists(cr, self.relation):
1332 cr.execute(SQL(
1333 """ CREATE TABLE %(rel)s (%(id1)s INTEGER NOT NULL,
1334 %(id2)s INTEGER NOT NULL,
1335 PRIMARY KEY(%(id1)s, %(id2)s));
1336 COMMENT ON TABLE %(rel)s IS %(comment)s;
1337 CREATE INDEX ON %(rel)s (%(id2)s, %(id1)s); """,
1338 rel=SQL.identifier(self.relation),
1339 id1=SQL.identifier(self.column1),
1340 id2=SQL.identifier(self.column2),
1341 comment=f"RELATION BETWEEN {model._table} AND {comodel._table}",
1342 ))
1343 _schema.debug("Create table %r: m2m relation between %r and %r", self.relation, model._table, comodel._table)
1344 model.pool.post_init(self.update_db_foreign_keys, model)
1345 return True
1347 model.pool.post_init(self.update_db_foreign_keys, model)
1349 def update_db_foreign_keys(self, model):
1350 """ Add the foreign keys corresponding to the field's relation table. """
1351 comodel = model.env[self.comodel_name]
1352 if model._is_an_ordinary_table(): 1352 ↛ 1357line 1352 didn't jump to line 1357 because the condition on line 1352 was always true
1353 model.pool.add_foreign_key(
1354 self.relation, self.column1, model._table, 'id', 'cascade',
1355 model, self._module, force=False,
1356 )
1357 if comodel._is_an_ordinary_table(): 1357 ↛ exitline 1357 didn't return from function 'update_db_foreign_keys' because the condition on line 1357 was always true
1358 model.pool.add_foreign_key(
1359 self.relation, self.column2, comodel._table, 'id', self.ondelete,
1360 model, self._module,
1361 )
1363 def read(self, records):
1364 context = {'active_test': False}
1365 context.update(self.context)
1366 comodel = records.env[self.comodel_name].with_context(**context)
1368 # bypass the access during search if method is overwriten to avoid
1369 # possibly filtering all records of the comodel before joining
1370 filter_access = self.bypass_search_access and type(comodel)._search is not BaseModel._search
1372 # make the query for the lines
1373 domain = self.get_comodel_domain(records)
1374 try:
1375 query = comodel._search(domain, order=comodel._order, bypass_access=filter_access)
1376 except AccessError as e:
1377 raise AccessError(records.env._("Failed to read field %s", self) + '\n' + str(e)) from e
1379 # join with many2many relation table
1380 sql_id1 = SQL.identifier(self.relation, self.column1)
1381 sql_id2 = SQL.identifier(self.relation, self.column2)
1382 query.add_join('JOIN', self.relation, None, SQL(
1383 "%s = %s", sql_id2, SQL.identifier(comodel._table, 'id'),
1384 ))
1385 query.add_where(SQL("%s IN %s", sql_id1, tuple(records.ids)))
1387 # retrieve pairs (record, line) and group by record
1388 group = defaultdict(list)
1389 for id1, id2 in records.env.execute_query(query.select(sql_id1, sql_id2)):
1390 group[id1].append(id2)
1392 # filter using record rules
1393 if filter_access and group: 1393 ↛ 1394line 1393 didn't jump to line 1394 because the condition on line 1393 was never true
1394 corecord_ids = OrderedSet(id_ for ids in group.values() for id_ in ids)
1395 accessible_corecords = comodel.browse(corecord_ids)._filtered_access('read')
1396 if len(accessible_corecords) < len(corecord_ids):
1397 # some records are inaccessible, remove them from groups
1398 corecord_ids = set(accessible_corecords._ids)
1399 for id1, ids in group.items():
1400 group[id1] = [id_ for id_ in ids if id_ in corecord_ids]
1402 # store result in cache
1403 values = [tuple(group[id_]) for id_ in records._ids]
1404 self._insert_cache(records, values)
1406 def write_real(self, records_commands_list, create=False):
1407 # records_commands_list = [(records, commands), ...]
1408 if not records_commands_list: 1408 ↛ 1409line 1408 didn't jump to line 1409 because the condition on line 1408 was never true
1409 return
1411 model = records_commands_list[0][0].browse()
1412 comodel = model.env[self.comodel_name].with_context(**self.context)
1413 comodel = self._check_sudo_commands(comodel)
1414 cr = model.env.cr
1416 # determine old and new relation {x: ys}
1417 set = OrderedSet
1418 ids = set(rid for recs, cs in records_commands_list for rid in recs.ids)
1419 records = model.browse(ids)
1421 if self.store:
1422 # Using `record[self.name]` generates 2 SQL queries when the value
1423 # is not in cache: one that actually checks access rules for
1424 # records, and the other one fetching the actual data. We use
1425 # `self.read` instead to shortcut the first query.
1426 missing_ids = tuple(self._cache_missing_ids(records))
1427 if missing_ids:
1428 self.read(records.browse(missing_ids))
1430 # determine new relation {x: ys}
1431 old_relation = {record.id: set(record[self.name]._ids) for record in records}
1432 new_relation = {x: set(ys) for x, ys in old_relation.items()}
1434 # operations on new relation
1435 def relation_add(xs, y):
1436 for x in xs:
1437 new_relation[x].add(y)
1439 def relation_remove(xs, y):
1440 for x in xs:
1441 new_relation[x].discard(y)
1443 def relation_set(xs, ys):
1444 for x in xs:
1445 new_relation[x] = set(ys)
1447 def relation_delete(ys):
1448 # the pairs (x, y) have been cascade-deleted from relation
1449 for ys1 in old_relation.values():
1450 ys1 -= ys
1451 for ys1 in new_relation.values():
1452 ys1 -= ys
1454 for recs, commands in records_commands_list:
1455 to_create = [] # line vals to create
1456 to_delete = [] # line ids to delete
1457 for command in (commands or ()):
1458 if not isinstance(command, (list, tuple)) or not command: 1458 ↛ 1459line 1458 didn't jump to line 1459 because the condition on line 1458 was never true
1459 continue
1460 if command[0] == Command.CREATE: 1460 ↛ 1461line 1460 didn't jump to line 1461 because the condition on line 1460 was never true
1461 to_create.append((recs._ids, command[2]))
1462 elif command[0] == Command.UPDATE: 1462 ↛ 1463line 1462 didn't jump to line 1463 because the condition on line 1462 was never true
1463 prefetch_ids = recs[self.name]._prefetch_ids
1464 comodel.browse(command[1]).with_prefetch(prefetch_ids).write(command[2])
1465 elif command[0] == Command.DELETE: 1465 ↛ 1466line 1465 didn't jump to line 1466 because the condition on line 1465 was never true
1466 to_delete.append(command[1])
1467 elif command[0] == Command.UNLINK:
1468 relation_remove(recs._ids, command[1])
1469 elif command[0] == Command.LINK:
1470 relation_add(recs._ids, command[1])
1471 elif command[0] in (Command.CLEAR, Command.SET): 1471 ↛ 1457line 1471 didn't jump to line 1457 because the condition on line 1471 was always true
1472 # new lines must no longer be linked to records
1473 to_create = [(set(ids) - set(recs._ids), vals) for (ids, vals) in to_create]
1474 relation_set(recs._ids, command[2] if command[0] == Command.SET else ())
1476 if to_create: 1476 ↛ 1478line 1476 didn't jump to line 1478 because the condition on line 1476 was never true
1477 # create lines in batch, and link them
1478 lines = comodel.create([vals for ids, vals in to_create])
1479 for line, (ids, _vals) in zip(lines, to_create):
1480 relation_add(ids, line.id)
1482 if to_delete: 1482 ↛ 1484line 1482 didn't jump to line 1484 because the condition on line 1482 was never true
1483 # delete lines in batch
1484 comodel.browse(to_delete).unlink()
1485 relation_delete(to_delete)
1487 # check comodel access of added records
1488 # we check the su flag of the environment of records, because su may be
1489 # disabled on the comodel
1490 if not model.env.su: 1490 ↛ 1491line 1490 didn't jump to line 1491 because the condition on line 1490 was never true
1491 try:
1492 comodel.browse(
1493 co_id
1494 for rec_id, new_co_ids in new_relation.items()
1495 for co_id in new_co_ids - old_relation[rec_id]
1496 ).check_access('read')
1497 except AccessError as e:
1498 raise AccessError(model.env._("Failed to write field %s", self) + "\n" + str(e))
1500 # update the cache of self
1501 for record in records:
1502 self._update_cache(record, tuple(new_relation[record.id]))
1504 # determine the corecords for which the relation has changed
1505 modified_corecord_ids = set()
1507 # process pairs to add (beware of duplicates)
1508 pairs = [(x, y) for x, ys in new_relation.items() for y in ys - old_relation[x]]
1509 if pairs:
1510 if self.store:
1511 cr.execute(SQL(
1512 "INSERT INTO %s (%s, %s) VALUES %s ON CONFLICT DO NOTHING",
1513 SQL.identifier(self.relation),
1514 SQL.identifier(self.column1),
1515 SQL.identifier(self.column2),
1516 SQL(", ").join(pairs),
1517 ))
1519 # update the cache of inverse fields
1520 y_to_xs = defaultdict(set)
1521 for x, y in pairs:
1522 y_to_xs[y].add(x)
1523 modified_corecord_ids.add(y)
1524 for invf in records.pool.field_inverses[self]:
1525 domain = invf.get_comodel_domain(comodel)
1526 valid_ids = set(records.filtered_domain(domain)._ids)
1527 if not valid_ids: 1527 ↛ 1528line 1527 didn't jump to line 1528 because the condition on line 1527 was never true
1528 continue
1529 inv_cache = invf._get_cache(comodel.env)
1530 for y, xs in y_to_xs.items():
1531 corecord = comodel.browse(y)
1532 try:
1533 ids0 = inv_cache[corecord.id]
1534 ids1 = tuple(set(ids0) | (xs & valid_ids))
1535 invf._update_cache(corecord, ids1)
1536 except KeyError:
1537 pass
1539 # process pairs to remove
1540 pairs = [(x, y) for x, ys in old_relation.items() for y in ys - new_relation[x]]
1541 if pairs:
1542 y_to_xs = defaultdict(set)
1543 for x, y in pairs:
1544 y_to_xs[y].add(x)
1545 modified_corecord_ids.add(y)
1547 if self.store:
1548 # express pairs as the union of cartesian products:
1549 # pairs = [(1, 11), (1, 12), (1, 13), (2, 11), (2, 12), (2, 14)]
1550 # -> y_to_xs = {11: {1, 2}, 12: {1, 2}, 13: {1}, 14: {2}}
1551 # -> xs_to_ys = {{1, 2}: {11, 12}, {2}: {14}, {1}: {13}}
1552 xs_to_ys = defaultdict(set)
1553 for y, xs in y_to_xs.items():
1554 xs_to_ys[frozenset(xs)].add(y)
1555 # delete the rows where (id1 IN xs AND id2 IN ys) OR ...
1556 cr.execute(SQL(
1557 "DELETE FROM %s WHERE %s",
1558 SQL.identifier(self.relation),
1559 SQL(" OR ").join(
1560 SQL("%s IN %s AND %s IN %s",
1561 SQL.identifier(self.column1), tuple(xs),
1562 SQL.identifier(self.column2), tuple(ys))
1563 for xs, ys in xs_to_ys.items()
1564 ),
1565 ))
1567 # update the cache of inverse fields
1568 for invf in records.pool.field_inverses[self]:
1569 inv_cache = invf._get_cache(comodel.env)
1570 for y, xs in y_to_xs.items():
1571 corecord = comodel.browse(y)
1572 try:
1573 ids0 = inv_cache[corecord.id]
1574 ids1 = tuple(id_ for id_ in ids0 if id_ not in xs)
1575 invf._update_cache(corecord, ids1)
1576 except KeyError:
1577 pass
1579 if modified_corecord_ids:
1580 # trigger the recomputation of fields that depend on the inverse
1581 # fields of self on the modified corecords
1582 corecords = comodel.browse(modified_corecord_ids)
1583 corecords.modified([
1584 invf.name
1585 for invf in model.pool.field_inverses[self]
1586 if invf.model_name == self.comodel_name
1587 ])
1589 def write_new(self, records_commands_list):
1590 """ Update self on new records. """
1591 if not records_commands_list: 1591 ↛ 1592line 1591 didn't jump to line 1592 because the condition on line 1591 was never true
1592 return
1594 model = records_commands_list[0][0].browse()
1595 comodel = model.env[self.comodel_name].with_context(**self.context)
1596 comodel = self._check_sudo_commands(comodel)
1597 new = lambda id_: id_ and NewId(id_)
1599 # determine old and new relation {x: ys}
1600 set = OrderedSet
1601 old_relation = {record.id: set(record[self.name]._ids) for records, _ in records_commands_list for record in records}
1602 new_relation = {x: set(ys) for x, ys in old_relation.items()}
1604 for recs, commands in records_commands_list:
1605 for command in commands:
1606 if not isinstance(command, (list, tuple)) or not command: 1606 ↛ 1607line 1606 didn't jump to line 1607 because the condition on line 1606 was never true
1607 continue
1608 if command[0] == Command.CREATE: 1608 ↛ 1609line 1608 didn't jump to line 1609 because the condition on line 1608 was never true
1609 line_id = comodel.new(command[2], ref=command[1]).id
1610 for line_ids in new_relation.values():
1611 line_ids.add(line_id)
1612 elif command[0] == Command.UPDATE: 1612 ↛ 1613line 1612 didn't jump to line 1613 because the condition on line 1612 was never true
1613 line_id = new(command[1])
1614 comodel.browse([line_id]).update(command[2])
1615 elif command[0] == Command.DELETE: 1615 ↛ 1616line 1615 didn't jump to line 1616 because the condition on line 1615 was never true
1616 line_id = new(command[1])
1617 for line_ids in new_relation.values():
1618 line_ids.discard(line_id)
1619 elif command[0] == Command.UNLINK: 1619 ↛ 1620line 1619 didn't jump to line 1620 because the condition on line 1619 was never true
1620 line_id = new(command[1])
1621 for line_ids in new_relation.values():
1622 line_ids.discard(line_id)
1623 elif command[0] == Command.LINK: 1623 ↛ 1624line 1623 didn't jump to line 1624 because the condition on line 1623 was never true
1624 line_id = new(command[1])
1625 for line_ids in new_relation.values():
1626 line_ids.add(line_id)
1627 elif command[0] in (Command.CLEAR, Command.SET): 1627 ↛ 1605line 1627 didn't jump to line 1605 because the condition on line 1627 was always true
1628 # new lines must no longer be linked to records
1629 line_ids = command[2] if command[0] == Command.SET else ()
1630 line_ids = set(new(line_id) for line_id in line_ids)
1631 for id_ in recs._ids:
1632 new_relation[id_] = set(line_ids)
1634 if new_relation == old_relation:
1635 return
1637 records = model.browse(old_relation)
1639 # update the cache of self
1640 for record in records:
1641 self._update_cache(record, tuple(new_relation[record.id]))
1643 # determine the corecords for which the relation has changed
1644 modified_corecord_ids = set()
1646 # process pairs to add (beware of duplicates)
1647 pairs = [(x, y) for x, ys in new_relation.items() for y in ys - old_relation[x]]
1648 if pairs: 1648 ↛ 1670line 1648 didn't jump to line 1670 because the condition on line 1648 was always true
1649 # update the cache of inverse fields
1650 y_to_xs = defaultdict(set)
1651 for x, y in pairs:
1652 y_to_xs[y].add(x)
1653 modified_corecord_ids.add(y)
1654 for invf in records.pool.field_inverses[self]: 1654 ↛ 1655line 1654 didn't jump to line 1655 because the loop on line 1654 never started
1655 domain = invf.get_comodel_domain(comodel)
1656 valid_ids = set(records.filtered_domain(domain)._ids)
1657 if not valid_ids:
1658 continue
1659 inv_cache = invf._get_cache(comodel.env)
1660 for y, xs in y_to_xs.items():
1661 corecord = comodel.browse((y,))
1662 try:
1663 ids0 = inv_cache[corecord.id]
1664 ids1 = tuple(set(ids0) | (xs & valid_ids))
1665 invf._update_cache(corecord, ids1)
1666 except KeyError:
1667 pass
1669 # process pairs to remove
1670 pairs = [(x, y) for x, ys in old_relation.items() for y in ys - new_relation[x]]
1671 if pairs: 1671 ↛ 1673line 1671 didn't jump to line 1673 because the condition on line 1671 was never true
1672 # update the cache of inverse fields
1673 y_to_xs = defaultdict(set)
1674 for x, y in pairs:
1675 y_to_xs[y].add(x)
1676 modified_corecord_ids.add(y)
1677 for invf in records.pool.field_inverses[self]:
1678 inv_cache = invf._get_cache(comodel.env)
1679 for y, xs in y_to_xs.items():
1680 corecord = comodel.browse((y,))
1681 try:
1682 ids0 = inv_cache[corecord.id]
1683 ids1 = tuple(id_ for id_ in ids0 if id_ not in xs)
1684 invf._update_cache(corecord, ids1)
1685 except KeyError:
1686 pass
1688 if modified_corecord_ids: 1688 ↛ exitline 1688 didn't return from function 'write_new' because the condition on line 1688 was always true
1689 # trigger the recomputation of fields that depend on the inverse
1690 # fields of self on the modified corecords
1691 corecords = comodel.browse(modified_corecord_ids)
1692 corecords.modified([
1693 invf.name
1694 for invf in model.pool.field_inverses[self]
1695 if invf.model_name == self.comodel_name
1696 ])
1698 def _condition_to_sql_relational(self, model: BaseModel, alias: str, exists: bool, coquery: Query, query: Query) -> SQL:
1699 if coquery.is_empty(): 1699 ↛ 1700line 1699 didn't jump to line 1700 because the condition on line 1699 was never true
1700 return SQL("FALSE") if exists else SQL("TRUE")
1701 rel_table, rel_id1, rel_id2 = self.relation, self.column1, self.column2
1702 rel_alias = query.make_alias(alias, self.name)
1703 if not coquery.where_clause:
1704 # case: no constraints on table and we have foreign keys
1705 # so we can inverse the operator and check existence
1706 exists = not exists
1707 return SQL(
1708 "%sEXISTS (SELECT 1 FROM %s AS %s WHERE %s = %s)",
1709 SQL("NOT ") if exists else SQL(),
1710 SQL.identifier(rel_table),
1711 SQL.identifier(rel_alias),
1712 SQL.identifier(rel_alias, rel_id1),
1713 SQL.identifier(alias, 'id'),
1714 )
1715 return SQL(
1716 "%sEXISTS (SELECT 1 FROM %s AS %s WHERE %s = %s AND %s IN %s)",
1717 SQL("NOT ") if not exists else SQL(),
1718 SQL.identifier(rel_table),
1719 SQL.identifier(rel_alias),
1720 SQL.identifier(rel_alias, rel_id1),
1721 SQL.identifier(alias, 'id'),
1722 SQL.identifier(rel_alias, rel_id2),
1723 coquery.subselect(),
1724 )
1727class PrefetchMany2one(Reversible):
1728 """ Iterable for the values of a many2one field on the prefetch set of a given record. """
1729 __slots__ = ('field', 'record')
1731 def __init__(self, record: BaseModel, field: Many2one):
1732 self.record = record
1733 self.field = field
1735 def __iter__(self):
1736 field_cache = self.field._get_cache(self.record.env)
1737 return unique(
1738 coid for id_ in self.record._prefetch_ids
1739 if (coid := field_cache.get(id_)) is not None
1740 )
1742 def __reversed__(self):
1743 field_cache = self.field._get_cache(self.record.env)
1744 return unique(
1745 coid for id_ in reversed(self.record._prefetch_ids)
1746 if (coid := field_cache.get(id_)) is not None
1747 )
1750class PrefetchX2many(Reversible):
1751 """ Iterable for the values of an x2many field on the prefetch set of a given record. """
1752 __slots__ = ('field', 'record')
1754 def __init__(self, record: BaseModel, field: _RelationalMulti):
1755 self.record = record
1756 self.field = field
1758 def __iter__(self):
1759 field_cache = self.field._get_cache(self.record.env)
1760 return unique(
1761 coid
1762 for id_ in self.record._prefetch_ids
1763 for coid in field_cache.get(id_, ())
1764 )
1766 def __reversed__(self):
1767 field_cache = self.field._get_cache(self.record.env)
1768 return unique(
1769 coid
1770 for id_ in reversed(self.record._prefetch_ids)
1771 for coid in field_cache.get(id_, ())
1772 )