Coverage for adhoc-cicd-odoo-odoo / odoo / orm / fields_relational.py: 75%

1050 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-09 18:05 +0000

1from __future__ import annotations 

2 

3import itertools 

4import logging 

5import typing 

6from collections import defaultdict 

7from collections.abc import Reversible 

8from operator import attrgetter 

9 

10from odoo.exceptions import AccessError, MissingError, UserError 

11from odoo.tools import SQL, OrderedSet, Query, sql, unique 

12from odoo.tools.constants import PREFETCH_MAX 

13from odoo.tools.misc import SENTINEL, Sentinel, unquote 

14 

15from .commands import Command 

16from .domains import Domain 

17from .fields import IR_MODELS, Field, _logger 

18from .fields_reference import Many2oneReference 

19from .identifiers import NewId 

20from .models import BaseModel 

21from .utils import COLLECTION_TYPES, SQL_OPERATORS, check_pg_name 

22 

23if typing.TYPE_CHECKING: 

24 from collections.abc import Sequence 

25 from odoo.tools.misc import Collector 

26 from .types import CommandValue, ContextType, DomainType, Environment, Registry 

27 

28 OnDelete = typing.Literal['cascade', 'set null', 'restrict'] 

29 

30_schema = logging.getLogger('odoo.schema') 

31 

32 

33class _Relational(Field[BaseModel]): 

34 """ Abstract class for relational fields. """ 

35 relational: typing.Literal[True] = True 

36 comodel_name: str 

37 domain: DomainType = [] # domain for searching values 

38 context: ContextType = {} # context for searching values 

39 bypass_search_access: bool = False # whether access rights are bypassed on the comodel 

40 check_company: bool = False 

41 

def __get__(self, records: BaseModel, owner=None):
    """ Return the value of ``self`` on ``records``.

    When ``records`` is ``None`` or holds at most one id, the parent
    implementation is used as is.  Otherwise the cache values of all the
    records are collected (fetching the missing ones) and combined with
    :meth:`convert_to_record_multi`.

    :raise MissingError: if a record is still absent from the cache after
        the field has been fetched in batch
    """
    if records is None or len(records._ids) <= 1:
        return super().__get__(records, owner)

    records._check_field_access(self, 'read')

    # several records: recompute stored computed fields before reading
    if self.compute and self.store:
        self.recompute(records)

    env = records.env
    field_cache = self._get_cache(env)

    # cache values collected so far, in the order of records._ids
    collected = []
    for record_id in records._ids:
        try:
            cached = field_cache[record_id]
        except KeyError:
            batch_fetch = (
                self.store
                and record_id
                and len(collected) < len(records) - PREFETCH_MAX
            )
            if batch_fetch:
                # more than PREFETCH_MAX records are left (this one included):
                # fetch this field alone for all of them
                pending = records[len(collected):]
                pending.fetch([self.name])
                # fetch() does not raise MissingError, so check the cache
                if record_id not in field_cache:
                    raise MissingError("\n".join([
                        env._("Record does not exist or has been deleted."),
                        env._("(Record: %(record)s, User: %(user)s)", record=record_id, user=env.uid),
                    ])) from None
            else:
                # regular access on that single record, same prefetch ids
                pending = records.__class__(env, (record_id,), records._prefetch_ids)
                super().__get__(pending, owner)
            # the value is expected to be in cache now
            cached = field_cache[record_id]
        collected.append(cached)

    return self.convert_to_record_multi(collected, records)

80 

def _update_inverse(self, records: BaseModel, value: BaseModel):
    """ Update the cache of ``self`` on ``records`` with ``value``.

    Not implemented on this abstract class.

    :raise NotImplementedError: always
    """
    raise NotImplementedError

84 

def convert_to_record_multi(self, values, records):
    """ Optimized conversion of a list of (relational field) values from the
    cache format to the record format.

    Not implemented on this abstract class.

    :raise NotImplementedError: always
    """
    raise NotImplementedError

90 

def setup_nonrelated(self, model):
    """ Run the parent setup, then check that the comodel of the field is
    present in the registry ``model.pool``.
    """
    super().setup_nonrelated(model)
    comodel_is_known = self.comodel_name in model.pool
    assert comodel_is_known, (
        f"Field {self} with unknown comodel_name {self.comodel_name or '???'!r}"
    )

95 

def setup_inverses(self, registry: Registry, inverses: Collector[Field, Field]):
    """ Populate ``inverses`` with ``self`` and its inverse fields.

    This base implementation does nothing.
    """
    return None

98 

def get_comodel_domain(self, model: BaseModel) -> Domain:
    """ Return the ``domain`` attribute of the field as a :class:`Domain`.

    A callable domain is first called with ``model``.  An empty domain, or a
    string domain (only used on the client side), gives ``Domain.TRUE``.
    """
    raw_domain = self.domain
    if callable(raw_domain):
        # the callable may return a list, a Domain or a string
        raw_domain = raw_domain(model)
    if raw_domain and not isinstance(raw_domain, str):
        return Domain(raw_domain)
    return Domain.TRUE

110 

@property
def _related_domain(self) -> DomainType | None:
    """ Return the ``domain`` attribute in a form usable from another model.

    String domains are replaced by ``None`` unless the field is inherited;
    a callable domain is wrapped so that it is called with the model of
    ``self`` instead of the model of the records it receives.
    """
    def drop_foreign_expression(domain):
        # a string domain is an expression that is not valid for another
        # model than self's; it is kept for inherited fields only
        if not isinstance(domain, str) or self.inherited:
            return domain
        return None

    if not callable(self.domain):
        return drop_foreign_expression(self.domain)

    def related_domain(recs):
        # recs come from another model than self's
        return drop_foreign_expression(self.domain(recs.env[self.model_name]))  # pylint: disable=not-callable

    return related_domain

124 

125 _related_context = property(attrgetter('context')) 

126 

127 _description_relation = property(attrgetter('comodel_name')) 

128 _description_context = property(attrgetter('context')) 

129 

def _description_domain(self, env: Environment) -> str | list:
    """ Return the domain of the field for its description.

    Without ``check_company`` this is the raw domain.  With it, the result is
    a string expression combining the company domain of the comodel with the
    raw domain; when no company reference can be determined, a warning is
    logged and the raw domain is returned.
    """
    domain = self._internal_description_domain_raw(env)
    if not self.check_company:
        return domain

    # name of the field of the record holding the company(ies), if any
    checked_field = None
    if self.company_dependent:
        company_ref = '[allowed_company_ids[0]]'
    elif self.model_name == 'res.company':
        # when using check_company=True on a field on 'res.company', the
        # company_id comes from the id of the current record
        company_ref = '[id]'
    elif 'company_id' in env[self.model_name]:
        company_ref = '[company_id]'
        checked_field = 'company_id'
    elif 'company_ids' in env[self.model_name]:
        company_ref = 'company_ids'
        checked_field = 'company_ids'
    else:
        _logger.warning(env._(
            "Couldn't generate a company-dependent domain for field %s. "
            "The model doesn't have a 'company_id' or 'company_ids' field, and isn't company-dependent either.",
            self.model_name + '.' + self.name,
        ))
        return domain

    company_domain = env[self.comodel_name]._check_company_domain(companies=unquote(company_ref))
    if checked_field:
        # the company domain only applies when the company field is set
        no_company_domain = env[self.comodel_name]._check_company_domain(companies='')
        return f"({checked_field} and {company_domain} or {no_company_domain}) + ({domain or []})"
    return f"{company_domain} + {domain or []}"

160 

def _description_allow_hierachy_operators(self, env):
    """ Tell whether child_of/parent_of make sense on this field, i.e.
    whether the parent field (``_parent_name``) of the comodel is one of
    its fields.
    """
    comodel_fields = env[self.comodel_name]._fields
    parent_name = env[self.comodel_name]._parent_name
    return parent_name in comodel_fields

165 

def _internal_description_domain_raw(self, env) -> str | list:
    """ Return the ``domain`` attribute as a string or a list.

    A callable domain is called with the model of the field, a
    :class:`Domain` is converted to a list, and a falsy domain gives ``[]``.
    """
    raw_domain = self.domain
    if callable(raw_domain):
        raw_domain = raw_domain(env[self.model_name])
    if isinstance(raw_domain, Domain):
        raw_domain = list(raw_domain)
    if not raw_domain:
        return []
    return raw_domain

173 

def filter_function(self, records, field_expr, operator, value):
    """ Return a function that takes a record and tells whether it satisfies
    the condition ``(field_expr, operator, value)``.

    The corecords of ``records`` are filtered once, and the returned
    function checks the corecords of the given record against that result.
    """
    plain_getter = self.expression_getter(field_expr)
    getter = plain_getter

    if (self.bypass_search_access or operator == 'any!') and not records.env.su:
        # When filtering with bypass access, read the corecords with sudo
        # and a special key in the context. To evaluate sub-domains, the
        # special key makes the environment un-sudoed before evaluation.
        sudo_env = records.sudo().with_context(filter_function_reset_sudo=True).env

        def getter(rec):
            return plain_getter(rec.with_env(sudo_env))

    corecords = getter(records)
    if operator in ('any', 'any!'):
        assert isinstance(value, Domain)
        if operator == 'any' and records.env.context.get('filter_function_reset_sudo'):
            # back to the regular access rules for the sub-domain
            corecords = corecords.sudo(False)._filtered_access('read')
        corecords = corecords.filtered_domain(value)
    elif operator == 'in' and isinstance(value, COLLECTION_TYPES):
        wanted = set(value)
        if False in wanted:
            if not corecords:
                # shortcut, we know none of records has a corecord
                return lambda _: True
            if len(wanted) == 1:
                # only False: keep the records without any corecord
                return lambda rec: not getter(rec)
            # no corecord, or one of the other values
            wanted.discard(False)
            matches_ids = self.filter_function(records, field_expr, 'in', wanted)
            return lambda rec: not getter(rec) or matches_ids(rec)
        corecords = corecords.filtered_domain(Domain('id', 'in', wanted))
    else:
        corecords = corecords.filtered_domain(Domain('id', operator, value))

    if not corecords:
        return lambda _: False

    matching_ids = set(corecords._ids)

    def has_matching_corecord(rec):
        return any(id_ in matching_ids for val in getter(rec) for id_ in val._ids)

    return has_matching_corecord

211 

212 

213class Many2one(_Relational): 

214 """ The value of such a field is a recordset of size 0 (no 

215 record) or 1 (a single record). 

216 

217 :param str comodel_name: name of the target model 

218 ``Mandatory`` except for related or extended fields. 

219 

220 :param domain: an optional domain to set on candidate values on the 

221 client side (domain or a python expression that will be evaluated 

222 to provide domain) 

223 

224 :param dict context: an optional context to use on the client side when 

225 handling that field 

226 

227 :param str ondelete: what to do when the referred record is deleted; 

228 possible values are: ``'set null'``, ``'restrict'``, ``'cascade'`` 

229 

230 :param bool bypass_search_access: whether access rights are bypassed on the 

231 comodel (default: ``False``) 

232 

233 :param bool delegate: set it to ``True`` to make fields of the target model 

234 accessible from the current model (corresponds to ``_inherits``) 

235 

236 :param bool check_company: Mark the field to be verified in 

237 :meth:`~odoo.models.Model._check_company`. Has a different behaviour 

238 depending on whether the field is company_dependent or not. 

239 Constrains non-company-dependent fields to target records whose 

240 company_id(s) are compatible with the record's company_id(s). 

241 Constrains company_dependent fields to target records whose 

242 company_id(s) are compatible with the currently active company. 

243 """ 

244 type = 'many2one' 

245 _column_type = ('int4', 'int4') 

246 

247 ondelete: OnDelete | None = None # what to do when value is deleted 

248 delegate: bool = False # whether self implements delegation 

249 

def __init__(self, comodel_name: str | Sentinel = SENTINEL, string: str | Sentinel = SENTINEL, **kwargs):
    """ Forward ``comodel_name``, ``string`` and the other field attributes
    to the parent constructor as keyword arguments.
    """
    super().__init__(
        comodel_name=comodel_name,
        string=string,
        **kwargs,
    )

252 

def _setup_attrs__(self, model_class, name):
    """ Set up the attributes of the field, and determine ``delegate``.

    A field listed in the values of ``model_class._inherits`` becomes a
    delegate field that bypasses search access.

    :raise TypeError: if the field is declared with ``delegate`` but is not
        listed in ``_inherits``
    """
    super()._setup_attrs__(model_class, name)

    if name in model_class._inherits.values():
        # self.delegate implies self.bypass_search_access
        self.delegate = self.bypass_search_access = True
        return

    if not self.delegate:
        return

    comodel_name = self.comodel_name or 'comodel_name'
    message = (
        f"The delegate field {self} must be declared in the model class e.g.\n"
        f"_inherits = {{{comodel_name!r}: {name!r}}}"
    )
    raise TypeError(message)

266 

def setup_nonrelated(self, model):
    """ Run the parent setup, then determine and validate ``ondelete``.

    3 cases:
    1) ``ondelete`` is not defined: it gets a default value;
    2) ``ondelete`` is defined and its definition makes sense;
    3) ``ondelete`` is explicitly 'set null' on a required field, which is
       considered a programming error.

    :raise ValueError: for a required field with ``ondelete='set null'``, or
        for ``ondelete='restrict'`` with a comodel in ``IR_MODELS``
    """
    super().setup_nonrelated(model)

    if not self.ondelete:
        comodel = model.env[self.comodel_name]
        # Many2one relations from TransientModel to Model are annoying
        # because they can block deletion due to foreign keys. So unless
        # stated otherwise, required ones default to ondelete='cascade'.
        transient_to_regular = model.is_transient() and not comodel.is_transient()
        required_policy = 'cascade' if transient_to_regular else 'restrict'
        self.ondelete = required_policy if self.required else 'set null'

    if self.required and self.ondelete == 'set null':
        raise ValueError(
            "The m2o field %s of model %s is required but declares its ondelete policy "
            "as being 'set null'. Only 'restrict' and 'cascade' make sense."
            % (self.name, model._name)
        )

    if self.ondelete == 'restrict' and self.comodel_name in IR_MODELS:
        raise ValueError(
            f"Field {self.name} of model {model._name} is defined as ondelete='restrict' "
            f"while having {self.comodel_name} as comodel, the 'restrict' mode is not "
            f"supported for this type of field as comodel."
        )

295 

def update_db(self, model, columns):
    """ Refuse a many2one from a regular model to a transient model, then
    delegate to the parent implementation.

    :raise ValueError: if ``model`` is not transient while the comodel is
    """
    comodel = model.env[self.comodel_name]
    regular_to_transient = not model.is_transient() and comodel.is_transient()
    if regular_to_transient:
        raise ValueError('Many2one %s from Model to TransientModel is forbidden' % self)
    return super().update_db(model, columns)

301 

def update_db_column(self, model, column):
    """ Update the column with the parent implementation, and register
    :meth:`update_db_foreign_key` with ``model.pool.post_init``.
    """
    super().update_db_column(model, column)
    registry = model.pool
    registry.post_init(self.update_db_foreign_key, model, column)

305 

def update_db_foreign_key(self, model, column):
    """ Register the foreign key of the field with ``model.pool``.

    Nothing is done for company-dependent fields, when either table is not
    an ordinary table, when the comodel is not ``_auto`` or when its table
    is ``ir_actions``.
    """
    if self.company_dependent:
        return

    comodel = model.env[self.comodel_name]

    # foreign keys do not work on views, and users can define custom models on sql views.
    if not (model._is_an_ordinary_table() and comodel._is_an_ordinary_table()):
        return

    # ir_actions is inherited, so foreign key doesn't work on it
    if not comodel._auto or comodel._table == 'ir_actions':
        return

    # create/update the foreign key, and reflect it in 'ir.model.constraint'
    ondelete = self.ondelete or 'set null'
    model.pool.add_foreign_key(
        model._table, self.name, comodel._table, 'id', ondelete,
        model, self._module,
    )

321 

def _update_inverse(self, records, value):
    """ Put ``value``, converted to the cache format without validation,
    in the cache of ``self`` for each record of ``records``.
    """
    for record in records:
        cache_value = self.convert_to_cache(value, record, validate=False)
        self._update_cache(record, cache_value)

325 

def convert_to_column(self, value, record, values=None, validate=True):
    """ Return ``value`` itself when it is truthy, and ``None`` otherwise. """
    if value:
        return value
    return None

328 

def convert_to_cache(self, value, record, validate=True):
    """ Convert ``value`` to the cache format: an id or ``None``.

    Accepted values are an ``int`` or ``NewId``, a recordset, a tuple
    (a pair (id, name) or a tuple of ids) and a dict of values for a new
    record; anything else gives ``None``.

    :raise ValueError: when validating a recordset of another model than
        the comodel, or with more than one record
    """
    value_type = type(value)
    if value_type is int or value_type is NewId:
        # exact types only: a boolean, for instance, is not an id
        cache_id = value
    elif isinstance(value, BaseModel):
        if validate and (value._name != self.comodel_name or len(value) > 1):
            raise ValueError("Wrong value for %s: %r" % (self, value))
        cache_id = value._ids[0] if value._ids else None
    elif isinstance(value, tuple):
        # value is either a pair (id, name), or a tuple of ids
        cache_id = value[0] if value else None
    elif isinstance(value, dict):
        # make a new record (with the given field 'id' as origin)
        comodel = record.env[self.comodel_name]
        origin = comodel.browse(value.get('id'))
        cache_id = comodel.new(value, origin=origin).id
    else:
        cache_id = None

    if self.delegate and record and not any(record._ids):
        # if all records are new, then so is the parent
        cache_id = cache_id and NewId(cache_id)

    return cache_id

353 

def convert_to_record(self, value, record):
    """ Convert the cache value ``value`` (an id or ``None``) to a recordset
    of the comodel, in the environment of ``record``.
    """
    ids = (value,) if value is not None else ()
    prefetch_ids = PrefetchMany2one(record, self)
    # use the registry to avoid creating a recordset for the model
    comodel_class = record.pool[self.comodel_name]
    return comodel_class(record.env, ids, prefetch_ids)

359 

def convert_to_record_multi(self, values, records):
    """ Convert a list of cache values to one recordset of the comodel:
    ``None`` values are skipped and duplicate ids are removed.
    """
    prefetch_ids = PrefetchMany2one(records, self)
    distinct_ids = tuple(unique(
        corecord_id for corecord_id in values if corecord_id is not None
    ))
    comodel_class = records.pool[self.comodel_name]
    return comodel_class(records.env, distinct_ids, prefetch_ids)

365 

def convert_to_read(self, value, record, use_display_name=True):
    """ Return the pair ``(id, display_name)`` of ``value``, or only its id
    when ``use_display_name`` is false or ``value`` is empty.

    ``False`` is returned when reading the display name raises
    ``MissingError``.
    """
    if not (use_display_name and value):
        return value.id

    # evaluate display_name as superuser, because the visibility of a
    # many2one field value (id and name) depends on the current record's
    # access rights, and not the value's access rights.
    try:
        # performance: value.sudo() prefetches the same records as value
        return (value.id, value.sudo().display_name)
    except MissingError:
        # Should not happen, unless the foreign key is missing.
        return False

379 

def convert_to_write(self, value, record):
    """ Return the id corresponding to ``value``, or ``False`` when it is
    empty.

    Accepted values are an ``int`` or ``NewId`` (returned as is), a
    recordset of the comodel, a non-empty tuple (a pair (id, name) or a
    tuple of ids) and a dict of values for a new record.

    :raise ValueError: for any other truthy value
    """
    value_type = type(value)
    if value_type is int or value_type is NewId:
        return value
    if not value:
        return False
    if isinstance(value, BaseModel) and value._name == self.comodel_name:
        return value.id
    if isinstance(value, tuple):
        # value is either a pair (id, name), or a tuple of ids; it cannot
        # be empty here, empty values have been handled above
        return value[0]
    if isinstance(value, dict):
        return record.env[self.comodel_name].new(value).id
    raise ValueError("Wrong value for %s: %r" % (self, value))

393 

def convert_to_export(self, value, record):
    """ Return the display name of ``value``, or ``''`` when it is empty. """
    if not value:
        return ''
    return value.display_name

396 

def convert_to_display_name(self, value, record):
    """ Return the ``display_name`` of ``value``. """
    display_name = value.display_name
    return display_name

399 

def write(self, records, value):
    """ Assign ``value`` to ``self`` on ``records`` in the cache.

    Only the records whose cache value differs are modified; the cached
    values of the inverse fields are updated on both the old and the new
    corecords.
    """
    # the field is being assigned: discard its recomputation on records
    records.env.remove_to_compute(self, records)

    # keep the records that are actually modified
    new_cache_value = self.convert_to_cache(value, records)
    modified = self._filter_not_equal(records, new_cache_value)
    if not modified:
        return

    # 1. remove records from the cache of one2many fields of old corecords
    self._remove_inverses(modified, new_cache_value)

    # 2. update the cache of self, and mark it dirty
    self._update_cache(modified, new_cache_value, dirty=True)

    # 3. update the cache of one2many fields of new corecord
    self._update_inverses(modified, new_cache_value)

418 

def _remove_inverses(self, records: BaseModel, value):
    """ Remove `records` from the cached values of the inverse fields (o2m)
    of `self`, on the corecords currently in cache for `records`.
    """
    inverse_fields = records.pool.field_inverses[self]
    if not inverse_fields:
        return

    removed_ids = set(records._ids)

    if all(removed_ids):
        # real records only: the cached ids are used as is
        def align(id_):
            return id_
    else:
        # some record is new: use new ids for the corecords as well
        def align(id_):
            return id_ and NewId(id_)

    field_cache = self._get_cache(records.env)
    corecords = records.env[self.comodel_name].browse(
        align(corecord_id)
        for record_id in records._ids
        if (corecord_id := field_cache.get(record_id)) is not None
    )

    for inverse_field in inverse_fields:
        inverse_cache = inverse_field._get_cache(corecords.env)
        for corecord in corecords:
            cached_ids = inverse_cache.get(corecord.id)
            if cached_ids is None:
                # nothing in cache for that corecord
                continue
            kept_ids = tuple(id_ for id_ in cached_ids if id_ not in removed_ids)
            inverse_field._update_cache(corecord, kept_ids)

441 

def _update_inverses(self, records: BaseModel, value):
    """ Add `records` to the cached values of the inverse fields (o2m) of
    `self` on the corecord given by the cache value `value`.

    Only the records matching the domain of the inverse field are added.
    """
    if value is None:
        return

    corecord = self.convert_to_record(value, records)
    for inverse_field in records.pool.field_inverses[self]:
        matching = records.filtered_domain(inverse_field.get_comodel_domain(corecord))
        if not matching:
            continue
        cached_ids = inverse_field._get_cache(corecord.env).get(corecord.id)
        # if the value for the corecord is not in cache, but this is a new
        # record, assign it anyway, as you won't be able to fetch it from
        # database (see `test_sale_order`)
        if cached_ids is None and corecord.id:
            continue
        merged_ids = tuple(unique((cached_ids or ()) + matching._ids))
        inverse_field._update_cache(corecord, merged_ids)

458 

def to_sql(self, model: BaseModel, alias: str) -> SQL:
    """ Return the SQL expression of the field for the table ``alias``.

    For a company-dependent field, the parent expression is wrapped in a
    subselect on the comodel's table returning the row whose id is that
    value.
    """
    field_sql = super().to_sql(model, alias)
    if not self.company_dependent:
        return field_sql

    comodel = model.env[self.comodel_name]
    return SQL(
        '''(SELECT %(cotable_alias)s.id
            FROM %(cotable)s AS %(cotable_alias)s
            WHERE %(cotable_alias)s.id = %(ref)s)''',
        cotable=SQL.identifier(comodel._table),
        cotable_alias=SQL.identifier(Query.make_alias(comodel._table, 'exists')),
        ref=field_sql,
    )

472 

def condition_to_sql(self, field_expr: str, operator: str, value, model: BaseModel, alias: str, query: Query) -> SQL:
    """ Return the SQL condition for ``(field_expr, operator, value)``.

    The operators 'any', 'not any', 'any!' and 'not any!' on the field
    itself are handled here, either with a LEFT JOIN on the comodel's table
    or with a ``[NOT] IN`` subselect; everything else is delegated to the
    parent implementation.

    :raise TypeError: if ``value`` is not a Domain, a Query or an SQL object
        when a subselect is generated
    """
    if operator not in ('any', 'not any', 'any!', 'not any!') or field_expr != self.name:
        # for other operators than 'any', just generate condition based on column type
        return super().condition_to_sql(field_expr, operator, value, model, alias, query)

    comodel = model.env[self.comodel_name]
    sql_field = model._field_to_sql(alias, field_expr, query)
    can_be_null = self not in model.env.registry.not_null_fields
    bypass_access = operator in ('any!', 'not any!') or self.bypass_search_access
    positive = operator in ('any', 'any!')

    # Decide whether to use a LEFT JOIN
    use_join = bypass_access and isinstance(value, Domain)
    if use_join and not positive:
        # For 'not any!', we get a better query with a NOT IN when we have a
        # lot of positive conditions which have a better chance to use
        # indexes.
        #   `field NOT IN (SELECT ... WHERE z = y)` better than
        #   `LEFT JOIN ... ON field = id WHERE z <> y`
        # There are some exceptions: we filter on 'id'.
        use_join = sum(
            (-1 if cond.operator in Domain.NEGATIVE_OPERATORS else 1)
            for cond in value.iter_conditions()
        ) < 0 or any(
            cond.field_expr == 'id' and cond.operator not in Domain.NEGATIVE_OPERATORS
            for cond in value.iter_conditions()
        )

    if use_join:
        comodel, coalias = self.join(model, alias, query)
        if not positive:
            # the negated domain is the condition on the joined table
            value = (~value).optimize_full(comodel)
        condition = value._to_sql(comodel, coalias, query)
        if self.company_dependent:
            condition = self._condition_to_sql_company(condition, field_expr, operator, value, model, alias, query)
        if can_be_null:
            condition = (
                SQL("(%s IS NOT NULL AND %s)", sql_field, condition)
                if positive else
                SQL("(%s IS NULL OR %s)", sql_field, condition)
            )
        return condition

    # no join: compare the column with a subselect
    if isinstance(value, Domain):
        value = comodel._search(value, active_test=False, bypass_access=bypass_access)
    if isinstance(value, Query):
        subselect = value.subselect()
    elif isinstance(value, SQL):
        subselect = SQL("(%s)", value)
    else:
        raise TypeError(f"condition_to_sql() 'any' operator accepts Domain, SQL or Query, got {value}")

    condition = SQL(
        "%s%s%s",
        sql_field,
        SQL(" IN ") if positive else SQL(" NOT IN "),
        subselect,
    )
    if can_be_null and not positive:
        condition = SQL("(%s IS NULL OR %s)", sql_field, condition)
    if self.company_dependent:
        condition = self._condition_to_sql_company(condition, field_expr, operator, value, model, alias, query)
    return condition

534 

def join(self, model: BaseModel, alias: str, query: Query) -> tuple[BaseModel, str]:
    """ Add a LEFT JOIN to ``query`` by following field ``self``, and return
    the model and the alias corresponding to the joined table.
    """
    comodel = model.env[self.comodel_name]
    coalias = query.make_alias(alias, self.name)
    # join condition: <field of self in alias> = <coalias>.id
    join_condition = SQL(
        "%s = %s",
        model._field_to_sql(alias, self.name, query),
        SQL.identifier(coalias, 'id'),
    )
    query.add_join('LEFT JOIN', coalias, comodel._table, join_condition)
    return (comodel, coalias)

547 

548 

549class _RelationalMulti(_Relational): 

550 r"Abstract class for relational fields \*2many." 

551 write_sequence = 20 

552 

553 # Important: the cache contains the ids of all the records in the relation, 

554 # including inactive records. Inactive records are filtered out by 

555 # convert_to_record(), depending on the context. 

556 

def _update_inverse(self, records, value):
    """ Add the id of ``value`` to the value of ``self`` on ``records``.

    Both ``value`` and ``records`` must have falsy (new) ids.  When a record
    has a cache value, the id is appended to it; otherwise the id is
    appended to the ``field_data_patches`` of the transaction.
    """
    new_id = value.id
    assert not new_id, "Field._update_inverse can only be called with a new id"
    field_cache = self._get_cache(records.env)
    for record_id in records._ids:
        assert not record_id, "Field._update_inverse can only be called with new records"
        cached_ids = field_cache.get(record_id, SENTINEL)
        if cached_ids is not SENTINEL:
            field_cache[record_id] = tuple(unique(cached_ids + (new_id,)))
            continue
        # no cache value yet: record the id as a patch for later
        patches = records.env.transaction.field_data_patches[self]
        patches[record_id].append(new_id)

568 

def _update_cache(self, records, cache_value, dirty=False):
    """ Update the cache of ``self`` on ``records`` with ``cache_value``.

    When the transaction holds patches (``field_data_patches``) for this
    field, the records are updated one by one, and the ids patched for a
    record are removed from the patches and added to its value.
    """
    field_patches = records.env.transaction.field_data_patches.get(self)
    if not (field_patches and records):
        super()._update_cache(records, cache_value, dirty)
        return

    for record in records:
        patched_ids = field_patches.pop(record.id, ())
        record_value = (
            tuple(unique(itertools.chain(cache_value, patched_ids)))
            if patched_ids else cache_value
        )
        super()._update_cache(record, record_value, dirty)

581 

def convert_to_cache(self, value, record, validate=True):
    """ Convert ``value`` to the cache format: a tuple of ids.

    ``value`` is either a recordset, a list/tuple of commands, dicts or
    record ids, or a falsy value (giving an empty tuple).

    :raise ValueError: for any other value, or when validating a recordset
        of another model than the comodel
    """
    if isinstance(value, BaseModel):
        if validate and value._name != self.comodel_name:
            raise ValueError("Wrong value for %s: %s" % (self, value))
        ids = value._ids
        if record and not record.id:
            # x2many field value of new record is new records
            ids = tuple(it and NewId(it) for it in ids)
        return ids

    if isinstance(value, (list, tuple)):
        # value is a list/tuple of commands, dicts or record ids
        comodel = record.env[self.comodel_name]
        # if record is new, the field's value is new records
        if record and not record.id:
            def browse(it):
                return comodel.browse((it and NewId(it),))
        else:
            browse = comodel.browse
        # determine the value ids: in case of a real record or a new record
        # with origin, take its current value
        ids = OrderedSet(record[self.name]._ids if record._origin else ())
        # modify ids with the commands
        for command in value:
            if isinstance(command, dict):
                ids.add(comodel.new(command).id)
                continue
            if not isinstance(command, (tuple, list)):
                # a record id
                ids.add(browse(command).id)
                continue
            code = command[0]
            if code == Command.CREATE:
                ids.add(comodel.new(command[2], ref=command[1]).id)
            elif code == Command.UPDATE:
                line = browse(command[1])
                if validate:
                    line.update(command[2])
                else:
                    line._update_cache(command[2], validate=False)
                ids.add(line.id)
            elif code in (Command.DELETE, Command.UNLINK):
                ids.discard(browse(command[1]).id)
            elif code == Command.LINK:
                ids.add(browse(command[1]).id)
            elif code == Command.CLEAR:
                ids.clear()
            elif code == Command.SET:
                ids = OrderedSet(browse(it).id for it in command[2])
        # return result as a tuple
        return tuple(ids)

    if not value:
        return ()

    raise ValueError("Wrong value for %s: %s" % (self, value))

635 

def convert_to_record(self, value, record):
    """ Convert the cache value ``value`` (ids) to a recordset of the
    comodel in the environment of ``record``.

    When the comodel has an ``_active_name`` and ``active_test`` (taken from
    the context of the field, else from the environment, else ``True``) is
    set, the corecords are filtered on that field.
    """
    prefetch_ids = PrefetchX2many(record, self)
    # use the registry to avoid creating a recordset for the model
    Comodel = record.pool[self.comodel_name]
    corecords = Comodel(record.env, value, prefetch_ids)

    active_name = Comodel._active_name
    if not active_name:
        return corecords

    env_active_test = record.env.context.get('active_test', True)
    if self.context.get('active_test', env_active_test):
        corecords = corecords.filtered(active_name).with_prefetch(prefetch_ids)
    return corecords

647 

def convert_to_record_multi(self, values, records):
    """ Convert a list of cache values (tuples of ids) to one recordset of
    the comodel without duplicates.

    The corecords are filtered like in :meth:`convert_to_record` when
    ``active_test`` applies.
    """
    prefetch_ids = PrefetchX2many(records, self)
    Comodel = records.pool[self.comodel_name]
    distinct_ids = tuple(unique(
        corecord_id for cache_value in values for corecord_id in cache_value
    ))
    corecords = Comodel(records.env, distinct_ids, prefetch_ids)

    active_name = Comodel._active_name
    if not active_name:
        return corecords

    env_active_test = records.env.context.get('active_test', True)
    if self.context.get('active_test', env_active_test):
        corecords = corecords.filtered(active_name).with_prefetch(prefetch_ids)
    return corecords

660 

def convert_to_read(self, value, record, use_display_name=True):
    """ Return the ``ids`` of ``value``. """
    corecord_ids = value.ids
    return corecord_ids

663 

def convert_to_write(self, value, record):
    """ Convert ``value`` to a list of commands.

    A tuple of ids (the cache format) is first browsed as a recordset.  A
    recordset of the comodel gives a 'set' command with the ids of the
    origin records, 'create' commands for the records without origin and
    'update' commands for the records that differ from their origin.
    ``False`` and ``None`` give a 'clear' command, and a list is returned
    as is.

    :raise ValueError: for any other value
    """
    if isinstance(value, tuple):
        # a tuple of ids, this is the cache format
        value = record.env[self.comodel_name].browse(value)

    if isinstance(value, BaseModel) and value._name == self.comodel_name:
        def origin_of(val):
            return val._origin if isinstance(val, BaseModel) else val

        # the inverse fields of self are left out of the written values
        inverse_names = {field.name for field in record.pool.field_inverses[self]}
        # make result with new and existing records
        set_command = Command.set([])
        commands = [set_command]
        for line in value:
            origin = line._origin
            if not origin:
                # no origin: create the line with its cached values
                values = line._convert_to_write({
                    name: line[name]
                    for name in line._cache
                    if name not in inverse_names
                })
                commands.append(Command.create(values))
                continue
            set_command[2].append(origin.id)
            if line == origin:
                continue
            # update the origin with the values that differ from it
            values = line._convert_to_write({
                name: line[name]
                for name in line._cache
                if name not in inverse_names and origin_of(line[name]) != origin[name]
            })
            if values:
                commands.append(Command.update(origin.id, values))
        return commands

    if value is False or value is None:
        return [Command.clear()]

    if isinstance(value, list):
        return value

    raise ValueError("Wrong value for %s: %s" % (self, value))

704 

def convert_to_export(self, value, record):
    """ Return the ``display_name`` values of ``value`` joined with commas,
    or the empty string when ``value`` is falsy.
    """
    if not value:
        return ''
    names = value.mapped('display_name')
    return ','.join(names)

707 

def convert_to_display_name(self, value, record):
    """ Not available on this kind of field: always raises
    :class:`NotImplementedError`.
    """
    raise NotImplementedError()

710 

def get_depends(self, model):
    """ Return ``(depends, depends_context)`` as computed by the parent
    class. For a non-computed field whose ``domain`` is a list or a
    :class:`Domain`, ``depends`` is extended with ``<field name>.<field_expr>``
    for every condition of that domain (duplicates removed).
    """
    depends, depends_context = super().get_depends(model)
    if self.compute:
        return depends, depends_context

    field_domain = self.domain
    if not isinstance(field_domain, (list, Domain)):
        return depends, depends_context

    prefix = self.name + '.'
    domain_depends = (
        prefix + condition.field_expr
        for condition in Domain(field_domain).iter_conditions()
    )
    return unique(itertools.chain(depends, domain_depends)), depends_context

720 

def create(self, record_values):
    """ Write the value of ``self`` on records that have just been created.

    The work is delegated to :meth:`write_batch`, called with its ``create``
    flag set.

    :param record_values: a list of pairs ``(record, value)``, where
        ``value`` is in the format of method :meth:`BaseModel.write`
    """
    creation_mode = True
    self.write_batch(record_values, creation_mode)

729 

def write(self, records, value):
    """ Write ``value`` on ``records`` through :meth:`write_batch`. """
    # the pending recomputation of self on records is discarded first
    env = records.env
    env.remove_to_compute(self, records)
    batch = [(records, value)]
    self.write_batch(batch)

734 

def write_batch(self, records_commands_list: Sequence[tuple[BaseModel, typing.Any]], create: bool = False) -> None:
    """ Normalize the values of ``records_commands_list`` into lists of
    commands (the list is updated in place), then dispatch to
    :meth:`write_real` when all record ids are truthy, or to
    :meth:`write_new` when none is.

    :param records_commands_list: pairs ``(records, value)``
    :param create: forwarded to :meth:`write_real`
    :raise ValueError: if a value cannot be turned into a list of commands
    """
    if not records_commands_list:
        return

    def as_commands(value):
        # ids in cache format
        if isinstance(value, tuple):
            return [Command.set(value)]
        # recordset of the comodel
        if isinstance(value, BaseModel) and value._name == self.comodel_name:
            return [Command.set(value._ids)]
        if value is False or value is None:
            return [Command.clear()]
        if not isinstance(value, list):
            raise ValueError("Wrong value for %s: %s" % (self, value))
        # non-empty list whose first item is not a command: taken as ids
        if value and not isinstance(value[0], (tuple, list)):
            return [Command.set(tuple(value))]
        return value

    for idx, (recs, value) in enumerate(records_commands_list):
        records_commands_list[idx] = (recs, as_commands(value))

    record_ids = set()
    for recs, _commands in records_commands_list:
        record_ids.update(recs._ids)

    if not all(record_ids):
        assert not any(record_ids), f"{records_commands_list} contains a mix of real and new records. It is not supported."
        self.write_new(records_commands_list)
    else:
        self.write_real(records_commands_list, create)

758 

def write_real(self, records_commands_list: Sequence[tuple[BaseModel, list[CommandValue]]], create: bool = False) -> None:
    """ Hook called by :meth:`write_batch` when all record ids are truthy;
    not implemented at this level.
    """
    raise NotImplementedError

761 

def write_new(self, records_commands_list: Sequence[tuple[BaseModel, list[CommandValue]]]) -> None:
    """ Hook called by :meth:`write_batch` when no record id is truthy;
    not implemented at this level.
    """
    raise NotImplementedError

764 

765 def _check_sudo_commands(self, comodel): 

766 # if the model doesn't accept sudo commands 

767 if not comodel._allow_sudo_commands: 

768 # Then, disable sudo and reset the transaction origin user 

769 return comodel.sudo(False).with_user(comodel.env.transaction.default_env.uid) 

770 return comodel 

771 

def condition_to_sql(self, field_expr: str, operator: str, value, model: BaseModel, alias: str, query: Query) -> SQL:
    """ Build the SQL of the condition ``(field_expr, operator, value)``.

    Operators ``'in'``/``'not in'`` are treated as ``'any'``/``'not any'``.
    A collection of ids is turned into a query on the comodel (in sudo,
    without active test); a collection containing ``False`` together with
    other ids is split into two conditions combined with OR/AND. An
    :class:`SQL` value is wrapped into a domain on ``id``. The resulting
    query is handed over to :meth:`_condition_to_sql_relational`.

    :raise ValueError: if the field is not stored
    """
    assert field_expr == self.name, "Supporting condition only to field"
    comodel = model.env[self.comodel_name]
    if not self.store:
        raise ValueError(f"Cannot convert {self} to SQL because it is not stored")

    # switch the operator to its 'any' form
    if operator == 'in':
        operator = 'any'
    elif operator == 'not in':
        operator = 'not any'
    assert operator in ('any', 'not any', 'any!', 'not any!'), \
        f"Relational field {self} expects 'any' operator"
    exists = operator in ('any', 'any!')

    # check the value and build the query on the comodel
    if isinstance(value, COLLECTION_TYPES):
        candidates = OrderedSet(value)
        comodel = comodel.sudo().with_context(active_test=False)
        if False not in candidates:
            value = comodel.browse(candidates)._as_query(ordered=False)
        elif len(candidates) > 1:
            # [not]in (False, 1) => split conditions: records having a
            # matching line, or records having no line at all
            sub_operator = 'in' if exists else 'not in'
            template = "(%s OR %s)" if exists else "(%s AND %s)"
            sql_no_line = self.condition_to_sql(field_expr, sub_operator, (False,), model, alias, query)
            sql_with_line = self.condition_to_sql(field_expr, sub_operator, candidates - {False}, model, alias, query)
            return SQL(template, sql_no_line, sql_with_line)
        else:
            # in (False) => not any (Domain.TRUE)
            # not in (False) => any (Domain.TRUE)
            value = comodel._search(Domain.TRUE)
            exists = not exists
    elif isinstance(value, SQL):
        # wrap SQL into a simple domain on the comodel ids
        comodel = comodel.sudo()
        value = Domain('id', 'any', value)

    coquery = self._get_query_for_condition_value(model, comodel, operator, value)
    return self._condition_to_sql_relational(model, alias, exists, coquery, query)

812 

def _get_query_for_condition_value(self, model: BaseModel, comodel: BaseModel, operator: str, value: Domain | Query) -> Query:
    """ Return a :class:`Query` on the comodel that also takes the comodel
    domain of the field into account.

    A :class:`Domain` value is combined with the field domain and searched
    with the field's context; access is bypassed when the field says so or
    when the operator is ``'any!'``/``'not any!'``. A :class:`Query` value
    gets the field domain added to its WHERE clause (the query itself is
    modified) and is returned.

    :raise NotImplementedError: for any other kind of ``value``
    """
    field_domain = self.get_comodel_domain(model)

    if isinstance(value, Domain):
        search_domain = value & field_domain
        comodel = comodel.with_context(**self.context)
        bypass_access = self.bypass_search_access or operator in ('any!', 'not any!')
        result = comodel._search(search_domain, bypass_access=bypass_access)
        assert isinstance(result, Query)
        return result

    if not isinstance(value, Query):
        raise NotImplementedError(f"Cannot build query for {value}")

    # inject the field domain into the given query
    extra_domain = field_domain.optimize_full(comodel)
    if not extra_domain.is_true():
        # TODO should clone/copy Query value
        extra_where = extra_domain._to_sql(comodel, value.table, value)
        value.add_where(extra_where)
    return value

831 

832 def _condition_to_sql_relational(self, model: BaseModel, alias: str, exists: bool, coquery: Query, query: Query) -> SQL: 

833 raise NotImplementedError 

834 

835 

class One2many(_RelationalMulti):
    """One2many field; the value of such a field is the recordset of all the
    records in ``comodel_name`` such that the field ``inverse_name`` is equal to
    the current record.

    :param str comodel_name: name of the target model

    :param str inverse_name: name of the inverse ``Many2one`` field in
        ``comodel_name``

    :param domain: an optional domain to set on candidate values on the
        client side (domain or a python expression that will be evaluated
        to provide domain)

    :param dict context: an optional context to use on the client side when
        handling that field

    :param bool bypass_search_access: whether access rights are bypassed on the
        comodel (default: ``False``)

    The attributes ``comodel_name`` and ``inverse_name`` are mandatory except in
    the case of related fields or field extensions.
    """
    type = 'one2many'

    inverse_name: str | None = None     # name of the inverse field
    copy: bool = False                  # o2m are not copied by default

    def __init__(self, comodel_name: str | Sentinel = SENTINEL, inverse_name: str | Sentinel = SENTINEL,
                 string: str | Sentinel = SENTINEL, **kwargs):
        super().__init__(
            comodel_name=comodel_name,
            inverse_name=inverse_name,
            string=string,
            **kwargs
        )

    def setup_nonrelated(self, model):
        """ Check that the inverse field exists on the comodel, and set it up.

        :raise ValueError: if ``inverse_name`` is not a field of the comodel
        """
        super().setup_nonrelated(model)
        if self.inverse_name:
            comodel = model.env[self.comodel_name]
            # Only the lookup is guarded: a KeyError raised while setting up
            # the inverse field itself must not be reported as a missing
            # inverse field.
            try:
                field = comodel._fields[self.inverse_name]
            except KeyError:
                raise ValueError(
                    f"{self.inverse_name!r} declared in {self!r} does not exist on {comodel._name!r}."
                ) from None
            field.setup(comodel)

    def setup_inverses(self, registry, inverses):
        """ Register in ``inverses`` the link between ``self`` and its
        inverse field.
        """
        if self.inverse_name:
            # link self to its inverse field and vice-versa
            invf = registry[self.comodel_name]._fields[self.inverse_name]
            if isinstance(invf, (Many2one, Many2oneReference)):
                # setting one2many fields only invalidates many2one inverses;
                # integer inverses (res_model/res_id pairs) are not supported
                inverses.add(self, invf)
            inverses.add(invf, self)

    _description_relation_field = property(attrgetter('inverse_name'))

    def update_db(self, model, columns):
        """ Check that the inverse field exists on the comodel, when the
        comodel is available in the environment.

        :raise UserError: if the comodel has no field ``inverse_name``
        """
        if self.comodel_name in model.env:
            comodel = model.env[self.comodel_name]
            if self.inverse_name not in comodel._fields:
                raise UserError(model.env._(
                    'No inverse field "%(inverse_field)s" found for "%(comodel)s"',
                    inverse_field=self.inverse_name,
                    comodel=self.comodel_name
                ))

    def _additional_domain(self, env) -> Domain:
        """ Return a domain on the inverse field's ``model_field`` when the
        inverse field is a ``many2one_reference``, and ``Domain.TRUE``
        otherwise.
        """
        if self.comodel_name and self.inverse_name:
            comodel = env.registry[self.comodel_name]
            inverse_field = comodel._fields[self.inverse_name]
            if inverse_field.type == 'many2one_reference':
                return Domain(inverse_field.model_field, '=', self.model_name)
        return Domain.TRUE

    def get_comodel_domain(self, model: BaseModel) -> Domain:
        """ Return the parent's comodel domain combined with
        :meth:`_additional_domain`.
        """
        return super().get_comodel_domain(model) & self._additional_domain(model.env)

    def _internal_description_domain_raw(self, env) -> str | list:
        """ Return the parent's raw domain description, extended with the
        additional domain when the latter is not trivially true.
        """
        domain = super()._internal_description_domain_raw(env)
        additional_domain = self._additional_domain(env)
        if additional_domain.is_true():
            return domain
        return f"({domain}) + ({additional_domain})"

    def __get__(self, records, owner=None):
        if records is not None and self.inverse_name is not None:
            # force the computation of the inverse field to ensure that the
            # cache value of self is consistent
            inverse_field = records.pool[self.comodel_name]._fields[self.inverse_name]
            if inverse_field.compute:
                records.env[self.comodel_name]._recompute_model([self.inverse_name])
        return super().__get__(records, owner)

    def read(self, records):
        """ Search the lines of ``records`` in the comodel, group them by
        inverse value, and insert the result in cache.

        :raise AccessError: if the search on the comodel is denied
        """
        # retrieve the lines in the comodel
        context = {'active_test': False}
        context.update(self.context)
        comodel = records.env[self.comodel_name].with_context(**context)
        inverse = self.inverse_name
        inverse_field = comodel._fields[inverse]

        # optimization: fetch the inverse and active fields with search()
        domain = self.get_comodel_domain(records) & Domain(inverse, 'in', records.ids)
        field_names = [inverse]
        if comodel._active_name:
            field_names.append(comodel._active_name)
        try:
            lines = comodel.search_fetch(domain, field_names)
        except AccessError as e:
            raise AccessError(records.env._("Failed to read field %s", self) + '\n' + str(e)) from e

        # group lines by inverse field (without prefetching other fields)
        get_id = (lambda rec: rec.id) if inverse_field.type == 'many2one' else int
        group = defaultdict(list)
        for line in lines:
            # line[inverse] may be a record or an integer
            group[get_id(line[inverse])].append(line.id)

        # store result in cache
        values = [tuple(group[id_]) for id_ in records._ids]
        self._insert_cache(records, values)

    def write_real(self, records_commands_list, create=False):
        """ Update real records. """
        # records_commands_list = [(records, commands), ...]
        if not records_commands_list:
            return

        model = records_commands_list[0][0].browse()
        comodel = model.env[self.comodel_name].with_context(**self.context)
        comodel = self._check_sudo_commands(comodel)

        if self.store:
            inverse = self.inverse_name
            to_create = []                      # line vals to create
            to_delete = []                      # line ids to delete
            to_link = defaultdict(OrderedSet)   # {record: line_ids}
            allow_full_delete = not create

            def unlink(lines):
                # lines are deleted when the inverse field cascades,
                # otherwise they are only detached
                if getattr(comodel._fields[inverse], 'ondelete', False) == 'cascade':
                    to_delete.extend(lines._ids)
                else:
                    lines[inverse] = False

            def flush():
                # apply the pending deletions, creations and links, in that
                # order; the lines already on each record are read first
                if to_link:
                    before = {record: record[self.name] for record in to_link}
                if to_delete:
                    # unlink() will remove the lines from the cache
                    comodel.browse(to_delete).unlink()
                    to_delete.clear()
                if to_create:
                    # create() will add the new lines to the cache of records
                    comodel.create(to_create)
                    to_create.clear()
                if to_link:
                    for record, line_ids in to_link.items():
                        lines = comodel.browse(line_ids) - before[record]
                        # linking missing lines should fail
                        lines.mapped(inverse)
                        lines[inverse] = record
                    to_link.clear()

            for recs, commands in records_commands_list:
                for command in (commands or ()):
                    if command[0] == Command.CREATE:
                        for record in recs:
                            to_create.append(dict(command[2], **{inverse: record.id}))
                        allow_full_delete = False
                    elif command[0] == Command.UPDATE:
                        prefetch_ids = recs[self.name]._prefetch_ids
                        comodel.browse(command[1]).with_prefetch(prefetch_ids).write(command[2])
                    elif command[0] == Command.DELETE:
                        to_delete.append(command[1])
                    elif command[0] == Command.UNLINK:
                        unlink(comodel.browse(command[1]))
                    elif command[0] == Command.LINK:
                        to_link[recs[-1]].add(command[1])
                        allow_full_delete = False
                    elif command[0] in (Command.CLEAR, Command.SET):
                        line_ids = command[2] if command[0] == Command.SET else []
                        if not allow_full_delete:
                            # do not try to delete anything in creation mode if nothing has been created before
                            if line_ids:
                                # equivalent to Command.LINK
                                if line_ids.__class__ is int:
                                    line_ids = [line_ids]
                                to_link[recs[-1]].update(line_ids)
                                allow_full_delete = False
                            continue
                        flush()
                        # assign the given lines to the last record only
                        lines = comodel.browse(line_ids)
                        domain = self.get_comodel_domain(model) & Domain(inverse, 'in', recs.ids) & Domain('id', 'not in', lines.ids)
                        unlink(comodel.search(domain))
                        lines[inverse] = recs[-1]

            flush()

        else:
            # non-stored field: only the cache of the records is updated
            ids = OrderedSet(rid for recs, cs in records_commands_list for rid in recs._ids)
            records = records_commands_list[0][0].browse(ids)

            def link(record, lines):
                ids = record[self.name]._ids
                self._update_cache(record, tuple(unique(ids + lines._ids)))

            def unlink(lines):
                for record in records:
                    self._update_cache(record, (record[self.name] - lines)._ids)

            for recs, commands in records_commands_list:
                for command in (commands or ()):
                    if command[0] == Command.CREATE:
                        for record in recs:
                            link(record, comodel.new(command[2], ref=command[1]))
                    elif command[0] == Command.UPDATE:
                        comodel.browse(command[1]).write(command[2])
                    elif command[0] == Command.DELETE:
                        unlink(comodel.browse(command[1]))
                    elif command[0] == Command.UNLINK:
                        unlink(comodel.browse(command[1]))
                    elif command[0] == Command.LINK:
                        link(recs[-1], comodel.browse(command[1]))
                    elif command[0] in (Command.CLEAR, Command.SET):
                        # assign the given lines to the last record only
                        self._update_cache(recs, ())
                        lines = comodel.browse(command[2] if command[0] == Command.SET else [])
                        self._update_cache(recs[-1], lines._ids)

    def write_new(self, records_commands_list):
        """ Apply the commands on records handled by ``write_new`` (see
        ``write_batch``); lines are browsed with :class:`NewId` identifiers
        and changes are made on the cache or by assignment on the lines.
        """
        if not records_commands_list:
            return

        model = records_commands_list[0][0].browse()
        comodel = model.env[self.comodel_name].with_context(**self.context)
        comodel = self._check_sudo_commands(comodel)

        ids = {record.id for records, _ in records_commands_list for record in records}
        records = model.browse(ids)

        def browse(ids):
            # truthy ids are wrapped into NewId, falsy ones are kept as is
            return comodel.browse([id_ and NewId(id_) for id_ in ids])

        # make sure self is in cache
        records[self.name]

        if self.store:
            inverse = self.inverse_name

            # make sure self's inverse is in cache
            inverse_field = comodel._fields[inverse]
            for record in records:
                inverse_field._update_cache(record[self.name], record.id)

            for recs, commands in records_commands_list:
                for command in commands:
                    if command[0] == Command.CREATE:
                        for record in recs:
                            line = comodel.new(command[2], ref=command[1])
                            line[inverse] = record
                    elif command[0] == Command.UPDATE:
                        browse([command[1]]).update(command[2])
                    elif command[0] == Command.DELETE:
                        browse([command[1]])[inverse] = False
                    elif command[0] == Command.UNLINK:
                        browse([command[1]])[inverse] = False
                    elif command[0] == Command.LINK:
                        browse([command[1]])[inverse] = recs[-1]
                    elif command[0] == Command.CLEAR:
                        self._update_cache(recs, ())
                    elif command[0] == Command.SET:
                        # assign the given lines to the last record only
                        self._update_cache(recs, ())
                        last, lines = recs[-1], browse(command[2])
                        self._update_cache(last, lines._ids)
                        inverse_field._update_cache(lines, last.id)

        else:
            def link(record, lines):
                ids = record[self.name]._ids
                self._update_cache(record, tuple(unique(ids + lines._ids)))

            def unlink(lines):
                for record in records:
                    self._update_cache(record, (record[self.name] - lines)._ids)

            for recs, commands in records_commands_list:
                for command in commands:
                    if command[0] == Command.CREATE:
                        for record in recs:
                            link(record, comodel.new(command[2], ref=command[1]))
                    elif command[0] == Command.UPDATE:
                        browse([command[1]]).update(command[2])
                    elif command[0] == Command.DELETE:
                        unlink(browse([command[1]]))
                    elif command[0] == Command.UNLINK:
                        unlink(browse([command[1]]))
                    elif command[0] == Command.LINK:
                        link(recs[-1], browse([command[1]]))
                    elif command[0] in (Command.CLEAR, Command.SET):
                        # assign the given lines to the last record only
                        self._update_cache(recs, ())
                        lines = browse(command[2] if command[0] == Command.SET else [])
                        self._update_cache(recs[-1], lines._ids)

    def _get_query_for_condition_value(self, model: BaseModel, comodel: BaseModel, operator, value) -> Query:
        """ Same as the parent method, except that lines with an empty
        inverse field are excluded when that field is not registered as
        not-null.
        """
        inverse_field = comodel._fields[self.inverse_name]
        if inverse_field not in comodel.env.registry.not_null_fields:
            # In the condition, one must avoid subqueries to return
            # NULL values, since it makes the IN test NULL instead
            # of FALSE.  This may discard expected results, as for
            # instance "id NOT IN (42, NULL)" is never TRUE.
            if isinstance(value, Domain):
                value &= Domain(inverse_field.name, 'not in', {False})
            else:
                coquery = super()._get_query_for_condition_value(model, comodel, operator, value)
                coquery.add_where(SQL(
                    "%s IS NOT NULL",
                    comodel._field_to_sql(coquery.table, inverse_field.name, coquery),
                ))
                return coquery
        return super()._get_query_for_condition_value(model, comodel, operator, value)

    def _condition_to_sql_relational(self, model: BaseModel, alias: str, exists: bool, coquery: Query, query: Query) -> SQL:
        """ Return the SQL condition selecting the records of ``model`` that
        have (``exists``) or do not have a line in ``coquery``.
        """
        if coquery.is_empty():
            return Domain(not exists)._to_sql(model, alias, query)

        comodel = model.env[self.comodel_name].sudo()
        inverse_field = comodel._fields[self.inverse_name]
        if not inverse_field.store:
            # determine ids1 in model related to ids2
            # TODO should we support this in the future?
            recs = comodel.browse(coquery).with_context(prefetch_fields=False)
            if inverse_field.relational:
                inverses = inverse_field.__get__(recs)
            else:
                # int values, map them
                inverses = model.browse(inverse_field.__get__(rec) for rec in recs)
            subselect = inverses._as_query(ordered=False).subselect()
            return SQL(
                "%s%s%s",
                SQL.identifier(alias, 'id'),
                SQL_OPERATORS['in' if exists else 'not in'],
                subselect,
            )

        # stored inverse: correlate the lines with the record id
        subselect = coquery.subselect(
            SQL("%s AS __inverse", comodel._field_to_sql(coquery.table, inverse_field.name, coquery))
        )
        return SQL(
            "%sEXISTS(SELECT FROM %s AS __sub WHERE __inverse = %s)",
            SQL() if exists else SQL("NOT "),
            subselect,
            SQL.identifier(alias, 'id'),
        )

1197 

1198 

1199class Many2many(_RelationalMulti): 

1200 """ Many2many field; the value of such a field is the recordset. 

1201 

1202 :param str comodel_name: name of the target model (string) 

1203 mandatory except in the case of related or extended fields 

1204 

1205 :param str relation: optional name of the table that stores the relation in 

1206 the database 

1207 

1208 :param str column1: optional name of the column referring to "these" records 

1209 in the table ``relation`` 

1210 

1211 :param str column2: optional name of the column referring to "those" records 

1212 in the table ``relation`` 

1213 

1214 The attributes ``relation``, ``column1`` and ``column2`` are optional. 

1215 If not given, names are automatically generated from model names, 

1216 provided ``model_name`` and ``comodel_name`` are different! 

1217 

1218 Note that having several fields with implicit relation parameters on a 

1219 given model with the same comodel is not accepted by the ORM, since 

1220 those field would use the same table. The ORM prevents two many2many 

1221 fields to use the same relation parameters, except if 

1222 

1223 - both fields use the same model, comodel, and relation parameters are 

1224 explicit; or 

1225 

1226 - at least one field belongs to a model with ``_auto = False``. 

1227 

1228 :param domain: an optional domain to set on candidate values on the 

1229 client side (domain or a python expression that will be evaluated 

1230 to provide domain) 

1231 

1232 :param dict context: an optional context to use on the client side when 

1233 handling that field 

1234 

1235 :param bool check_company: Mark the field to be verified in 

1236 :meth:`~odoo.models.Model._check_company`. Add a default company 

1237 domain depending on the field attributes. 

1238 

1239 """ 

1240 type = 'many2many' 

1241 

1242 _explicit: bool = True # whether schema is explicitly given 

1243 relation: str | None = None # name of table 

1244 column1: str | None = None # column of table referring to model 

1245 column2: str | None = None # column of table referring to comodel 

1246 ondelete: OnDelete | None = 'cascade' # optional ondelete for the column2 fkey 

1247 

def __init__(self, comodel_name: str | Sentinel = SENTINEL, relation: str | Sentinel = SENTINEL,
             column1: str | Sentinel = SENTINEL, column2: str | Sentinel = SENTINEL,
             string: str | Sentinel = SENTINEL, **kwargs):
    """ Forward the positional parameters of the field, together with the
    extra keyword arguments, to the parent constructor as keyword
    arguments.
    """
    super().__init__(
        comodel_name=comodel_name,
        relation=relation,
        column1=column1,
        column2=column2,
        string=string,
        **kwargs,
    )

1259 

def setup_nonrelated(self, model: BaseModel) -> None:
    """ Validate ``ondelete``, determine the relation table and columns of
    a stored field (names are generated when not given), and register the
    field in the registry's ``many2many_relations``.

    :raise ValueError: if ``ondelete`` is neither 'cascade' nor 'restrict'
    :raise TypeError: if another, incompatible field uses the same table
        and columns
    """
    super().setup_nonrelated(model)
    # Any ondelete other than 'cascade' or 'restrict' (for instance an
    # explicit 'set null') is considered a programming error.
    if self.ondelete not in ('cascade', 'restrict'):
        raise ValueError(
            "The m2m field %s of model %s declares its ondelete policy "
            "as being %r. Only 'restrict' and 'cascade' make sense."
            % (self.name, model._name, self.ondelete)
        )

    if not self.store:
        self.relation = None
        self.column1 = None
        self.column2 = None
    else:
        if not self.relation or not self.column1 or not self.column2:
            implicit_table = not self.relation
            if implicit_table:
                self._explicit = False
            comodel = model.env[self.comodel_name]
            if implicit_table:
                # table name is based on the stable alphabetical order of tables
                tables = sorted([model._table, comodel._table])
                assert tables[0] != tables[1], \
                    "%s: Implicit/canonical naming of many2many relationship " \
                    "table is not possible when source and destination models " \
                    "are the same" % self
                self.relation = '%s_%s_rel' % tuple(tables)
            if not self.column1:
                self.column1 = '%s_id' % model._table
            if not self.column2:
                self.column2 = '%s_id' % comodel._table
        # check validity of table name
        check_pg_name(self.relation)

    if not self.relation:
        return

    # check whether other fields use the same schema
    fields = model.pool.many2many_relations[self.relation, self.column1, self.column2]
    for mname, fname in fields:
        field = model.pool[mname]._fields[fname]
        if field is self:
            continue
        same_model = self.model_name == field.model_name
        # same model: relation parameters must be explicit
        if (
            same_model
            and self.comodel_name == field.comodel_name
            and self._explicit and field._explicit
        ):
            continue
        # different models: one model must be _auto=False
        if not same_model and not (model._auto and model.env[field.model_name]._auto):
            continue
        msg = "Many2many fields %s and %s use the same table and columns"
        raise TypeError(msg % (self, field))
    fields.add((self.model_name, self.name))

1313 

def setup_inverses(self, registry, inverses):
    """ Register in ``inverses``, in both directions, the fields that use
    the same relation table with the two columns swapped.
    """
    if not self.relation:
        return
    mirrored_schema = (self.relation, self.column2, self.column1)
    for mname, fname in registry.many2many_relations[mirrored_schema]:
        other = registry[mname]._fields[fname]
        inverses.add(self, other)
        inverses.add(other, self)

1321 

def update_db(self, model, columns):
    """ Create the relation table if it does not exist, and schedule the
    creation of its foreign keys after registry initialization.

    :return: ``True`` when the table has just been created, ``None``
        otherwise
    """
    cr = model.env.cr
    pool = model.pool
    # Relations of custom fields are not reflected, as they do not belong
    # to a module. They are automatically removed when dropping the
    # corresponding 'ir.model.field'.
    if not self.manual:
        pool.post_init(model.env['ir.model.relation']._reflect_relation,
                       model, self.relation, self._module)
    comodel = model.env[self.comodel_name]

    table_created = not sql.table_exists(cr, self.relation)
    if table_created:
        cr.execute(SQL(
            """ CREATE TABLE %(rel)s (%(id1)s INTEGER NOT NULL,
                                      %(id2)s INTEGER NOT NULL,
                                      PRIMARY KEY(%(id1)s, %(id2)s));
                COMMENT ON TABLE %(rel)s IS %(comment)s;
                CREATE INDEX ON %(rel)s (%(id2)s, %(id1)s); """,
            rel=SQL.identifier(self.relation),
            id1=SQL.identifier(self.column1),
            id2=SQL.identifier(self.column2),
            comment=f"RELATION BETWEEN {model._table} AND {comodel._table}",
        ))
        _schema.debug("Create table %r: m2m relation between %r and %r", self.relation, model._table, comodel._table)

    pool.post_init(self.update_db_foreign_keys, model)
    if table_created:
        return True

1348 

def update_db_foreign_keys(self, model):
    """ Add the foreign keys corresponding to the field's relation table:
    ``column1`` refers to the model's table with 'cascade', and ``column2``
    refers to the comodel's table with the field's ``ondelete``. A key is
    only added when the target model is an ordinary table.
    """
    pool = model.pool
    comodel = model.env[self.comodel_name]
    relation = self.relation
    module = self._module
    if model._is_an_ordinary_table():
        pool.add_foreign_key(
            relation, self.column1, model._table, 'id', 'cascade',
            model, module, force=False,
        )
    if comodel._is_an_ordinary_table():
        pool.add_foreign_key(
            relation, self.column2, comodel._table, 'id', self.ondelete,
            model, module,
        )

1362 

def read(self, records):
    """ Fetch the pairs of the relation table for ``records``, restricted
    to the comodel records found by a search with the field's comodel
    domain, and insert the resulting tuples of ids in cache.

    :raise AccessError: if the search on the comodel is denied
    """
    comodel = records.env[self.comodel_name].with_context(
        **{'active_test': False, **self.context}
    )

    # bypass the access during search if the method is overridden, to avoid
    # possibly filtering all records of the comodel before joining
    filter_access = self.bypass_search_access and type(comodel)._search is not BaseModel._search

    # make the query for the lines
    domain = self.get_comodel_domain(records)
    try:
        query = comodel._search(domain, order=comodel._order, bypass_access=filter_access)
    except AccessError as e:
        raise AccessError(records.env._("Failed to read field %s", self) + '\n' + str(e)) from e

    # join with many2many relation table
    sql_id1 = SQL.identifier(self.relation, self.column1)
    sql_id2 = SQL.identifier(self.relation, self.column2)
    join_condition = SQL("%s = %s", sql_id2, SQL.identifier(comodel._table, 'id'))
    query.add_join('JOIN', self.relation, None, join_condition)
    query.add_where(SQL("%s IN %s", sql_id1, tuple(records.ids)))

    # retrieve pairs (record, line) and group by record
    group = defaultdict(list)
    rows = records.env.execute_query(query.select(sql_id1, sql_id2))
    for record_id, line_id in rows:
        group[record_id].append(line_id)

    # filter using record rules
    if filter_access and group:
        corecord_ids = OrderedSet(id_ for ids in group.values() for id_ in ids)
        accessible_corecords = comodel.browse(corecord_ids)._filtered_access('read')
        if len(accessible_corecords) < len(corecord_ids):
            # some records are inaccessible, remove them from groups
            allowed_ids = set(accessible_corecords._ids)
            for record_id, line_ids in group.items():
                group[record_id] = [id_ for id_ in line_ids if id_ in allowed_ids]

    # store result in cache
    values = [tuple(group[id_]) for id_ in records._ids]
    self._insert_cache(records, values)

1405 

def write_real(self, records_commands_list, create=False):
    """ Apply x2many commands on ``self`` for records with real ids.

    The commands are first folded into an in-memory relation ``{x: ys}``.
    The difference with the previous relation is then applied to the
    relation table (only when the field is stored), to the cache of
    ``self`` and to the cache of the inverse fields of ``self``.

    :param records_commands_list: list of pairs ``(records, commands)``
    :param create: not used by this implementation
    :raise AccessError: when the environment of the records is not in
        superuser mode and ``check_access('read')`` fails on the newly
        linked corecords
    """
    # records_commands_list = [(records, commands), ...]
    if not records_commands_list:
        return

    model = records_commands_list[0][0].browse()
    comodel = model.env[self.comodel_name].with_context(**self.context)
    comodel = self._check_sudo_commands(comodel)
    cr = model.env.cr

    # determine old and new relation {x: ys}
    # NOTE: from here on, ``set`` is OrderedSet (not the builtin), including
    # inside the nested helpers and the ``defaultdict(set)`` factories below
    set = OrderedSet
    ids = set(rid for recs, cs in records_commands_list for rid in recs.ids)
    records = model.browse(ids)

    if self.store:
        # Using `record[self.name]` generates 2 SQL queries when the value
        # is not in cache: one that actually checks access rules for
        # records, and the other one fetching the actual data. We use
        # `self.read` instead to shortcut the first query.
        missing_ids = tuple(self._cache_missing_ids(records))
        if missing_ids:
            self.read(records.browse(missing_ids))

    # determine new relation {x: ys}; it starts as a copy of the old one
    old_relation = {record.id: set(record[self.name]._ids) for record in records}
    new_relation = {x: set(ys) for x, ys in old_relation.items()}

    # operations on new relation
    def relation_add(xs, y):
        for x in xs:
            new_relation[x].add(y)

    def relation_remove(xs, y):
        for x in xs:
            new_relation[x].discard(y)

    def relation_set(xs, ys):
        for x in xs:
            new_relation[x] = set(ys)

    def relation_delete(ys):
        # the pairs (x, y) have been cascade-deleted from relation
        for ys1 in old_relation.values():
            ys1 -= ys
        for ys1 in new_relation.values():
            ys1 -= ys

    for recs, commands in records_commands_list:
        to_create = []  # line vals to create
        to_delete = []  # line ids to delete
        for command in (commands or ()):
            if not isinstance(command, (list, tuple)) or not command:
                continue
            if command[0] == Command.CREATE:
                to_create.append((recs._ids, command[2]))
            elif command[0] == Command.UPDATE:
                prefetch_ids = recs[self.name]._prefetch_ids
                comodel.browse(command[1]).with_prefetch(prefetch_ids).write(command[2])
            elif command[0] == Command.DELETE:
                to_delete.append(command[1])
            elif command[0] == Command.UNLINK:
                relation_remove(recs._ids, command[1])
            elif command[0] == Command.LINK:
                relation_add(recs._ids, command[1])
            elif command[0] in (Command.CLEAR, Command.SET):
                # new lines must no longer be linked to records
                to_create = [(set(ids) - set(recs._ids), vals) for (ids, vals) in to_create]
                relation_set(recs._ids, command[2] if command[0] == Command.SET else ())

        if to_create:
            # create lines in batch, and link them
            lines = comodel.create([vals for ids, vals in to_create])
            for line, (ids, _vals) in zip(lines, to_create):
                relation_add(ids, line.id)

        if to_delete:
            # delete lines in batch
            comodel.browse(to_delete).unlink()
            relation_delete(to_delete)

    # check comodel access of added records
    # we check the su flag of the environment of records, because su may be
    # disabled on the comodel
    if not model.env.su:
        try:
            comodel.browse(
                co_id
                for rec_id, new_co_ids in new_relation.items()
                for co_id in new_co_ids - old_relation[rec_id]
            ).check_access('read')
        except AccessError as e:
            # chain explicitly, like the equivalent re-raise in `read`
            raise AccessError(model.env._("Failed to write field %s", self) + "\n" + str(e)) from e

    # update the cache of self
    for record in records:
        self._update_cache(record, tuple(new_relation[record.id]))

    # determine the corecords for which the relation has changed
    modified_corecord_ids = set()

    # process pairs to add (beware of duplicates)
    pairs = [(x, y) for x, ys in new_relation.items() for y in ys - old_relation[x]]
    if pairs:
        if self.store:
            cr.execute(SQL(
                "INSERT INTO %s (%s, %s) VALUES %s ON CONFLICT DO NOTHING",
                SQL.identifier(self.relation),
                SQL.identifier(self.column1),
                SQL.identifier(self.column2),
                SQL(", ").join(pairs),
            ))

        # update the cache of inverse fields
        y_to_xs = defaultdict(set)
        for x, y in pairs:
            y_to_xs[y].add(x)
            modified_corecord_ids.add(y)
        for invf in records.pool.field_inverses[self]:
            # only the records matching the domain of the inverse field are
            # added to its cached value
            domain = invf.get_comodel_domain(comodel)
            valid_ids = set(records.filtered_domain(domain)._ids)
            if not valid_ids:
                continue
            inv_cache = invf._get_cache(comodel.env)
            for y, xs in y_to_xs.items():
                corecord = comodel.browse(y)
                try:
                    ids0 = inv_cache[corecord.id]
                    ids1 = tuple(set(ids0) | (xs & valid_ids))
                    invf._update_cache(corecord, ids1)
                except KeyError:
                    # KeyError (e.g. no cached value for corecord) is ignored
                    pass

    # process pairs to remove
    pairs = [(x, y) for x, ys in old_relation.items() for y in ys - new_relation[x]]
    if pairs:
        y_to_xs = defaultdict(set)
        for x, y in pairs:
            y_to_xs[y].add(x)
            modified_corecord_ids.add(y)

        if self.store:
            # express pairs as the union of cartesian products:
            #    pairs = [(1, 11), (1, 12), (1, 13), (2, 11), (2, 12), (2, 14)]
            # -> y_to_xs = {11: {1, 2}, 12: {1, 2}, 13: {1}, 14: {2}}
            # -> xs_to_ys = {{1, 2}: {11, 12}, {2}: {14}, {1}: {13}}
            xs_to_ys = defaultdict(set)
            for y, xs in y_to_xs.items():
                xs_to_ys[frozenset(xs)].add(y)
            # delete the rows where (id1 IN xs AND id2 IN ys) OR ...
            cr.execute(SQL(
                "DELETE FROM %s WHERE %s",
                SQL.identifier(self.relation),
                SQL(" OR ").join(
                    SQL("%s IN %s AND %s IN %s",
                        SQL.identifier(self.column1), tuple(xs),
                        SQL.identifier(self.column2), tuple(ys))
                    for xs, ys in xs_to_ys.items()
                ),
            ))

        # update the cache of inverse fields
        for invf in records.pool.field_inverses[self]:
            inv_cache = invf._get_cache(comodel.env)
            for y, xs in y_to_xs.items():
                corecord = comodel.browse(y)
                try:
                    ids0 = inv_cache[corecord.id]
                    ids1 = tuple(id_ for id_ in ids0 if id_ not in xs)
                    invf._update_cache(corecord, ids1)
                except KeyError:
                    # KeyError (e.g. no cached value for corecord) is ignored
                    pass

    if modified_corecord_ids:
        # trigger the recomputation of fields that depend on the inverse
        # fields of self on the modified corecords
        corecords = comodel.browse(modified_corecord_ids)
        corecords.modified([
            invf.name
            for invf in model.pool.field_inverses[self]
            if invf.model_name == self.comodel_name
        ])

1588 

def write_new(self, records_commands_list):
    """ Update self on new records.

    Nothing is written to the database: only the cache of ``self`` and the
    cache of its inverse fields are updated, and the fields depending on
    the inverse fields are marked as modified.

    :param records_commands_list: list of pairs ``(records, commands)``;
        each list of commands is applied to the records of its own pair
    """
    if not records_commands_list:
        return

    model = records_commands_list[0][0].browse()
    comodel = model.env[self.comodel_name].with_context(**self.context)
    comodel = self._check_sudo_commands(comodel)

    def new(id_):
        # wrap a truthy id into a NewId; a falsy id is returned as is
        return id_ and NewId(id_)

    # determine old and new relation {x: ys}
    # NOTE: from here on, ``set`` is OrderedSet (not the builtin), including
    # the ``defaultdict(set)`` factories below
    set = OrderedSet
    old_relation = {
        record.id: set(record[self.name]._ids)
        for records, _ in records_commands_list
        for record in records
    }
    new_relation = {x: set(ys) for x, ys in old_relation.items()}

    for recs, commands in records_commands_list:
        for command in (commands or ()):
            if not isinstance(command, (list, tuple)) or not command:
                continue
            if command[0] == Command.CREATE:
                # one new line, linked to the records of this pair only
                line_id = comodel.new(command[2], ref=command[1]).id
                for id_ in recs._ids:
                    new_relation[id_].add(line_id)
            elif command[0] == Command.UPDATE:
                line_id = new(command[1])
                comodel.browse([line_id]).update(command[2])
            elif command[0] in (Command.DELETE, Command.UNLINK):
                # on new records, both commands simply drop the link
                line_id = new(command[1])
                for id_ in recs._ids:
                    new_relation[id_].discard(line_id)
            elif command[0] == Command.LINK:
                line_id = new(command[1])
                for id_ in recs._ids:
                    new_relation[id_].add(line_id)
            elif command[0] in (Command.CLEAR, Command.SET):
                # new lines must no longer be linked to records
                line_ids = command[2] if command[0] == Command.SET else ()
                line_ids = set(new(line_id) for line_id in line_ids)
                for id_ in recs._ids:
                    new_relation[id_] = set(line_ids)

    if new_relation == old_relation:
        return

    records = model.browse(old_relation)

    # update the cache of self
    for record in records:
        self._update_cache(record, tuple(new_relation[record.id]))

    # determine the corecords for which the relation has changed
    modified_corecord_ids = set()

    # process pairs to add (beware of duplicates)
    pairs = [(x, y) for x, ys in new_relation.items() for y in ys - old_relation[x]]
    if pairs:
        # update the cache of inverse fields
        y_to_xs = defaultdict(set)
        for x, y in pairs:
            y_to_xs[y].add(x)
            modified_corecord_ids.add(y)
        for invf in records.pool.field_inverses[self]:
            # only the records matching the domain of the inverse field are
            # added to its cached value
            domain = invf.get_comodel_domain(comodel)
            valid_ids = set(records.filtered_domain(domain)._ids)
            if not valid_ids:
                continue
            inv_cache = invf._get_cache(comodel.env)
            for y, xs in y_to_xs.items():
                corecord = comodel.browse((y,))
                try:
                    ids0 = inv_cache[corecord.id]
                    ids1 = tuple(set(ids0) | (xs & valid_ids))
                    invf._update_cache(corecord, ids1)
                except KeyError:
                    # KeyError (e.g. no cached value for corecord) is ignored
                    pass

    # process pairs to remove
    pairs = [(x, y) for x, ys in old_relation.items() for y in ys - new_relation[x]]
    if pairs:
        # update the cache of inverse fields
        y_to_xs = defaultdict(set)
        for x, y in pairs:
            y_to_xs[y].add(x)
            modified_corecord_ids.add(y)
        for invf in records.pool.field_inverses[self]:
            inv_cache = invf._get_cache(comodel.env)
            for y, xs in y_to_xs.items():
                corecord = comodel.browse((y,))
                try:
                    ids0 = inv_cache[corecord.id]
                    ids1 = tuple(id_ for id_ in ids0 if id_ not in xs)
                    invf._update_cache(corecord, ids1)
                except KeyError:
                    # KeyError (e.g. no cached value for corecord) is ignored
                    pass

    if modified_corecord_ids:
        # trigger the recomputation of fields that depend on the inverse
        # fields of self on the modified corecords
        corecords = comodel.browse(modified_corecord_ids)
        corecords.modified([
            invf.name
            for invf in model.pool.field_inverses[self]
            if invf.model_name == self.comodel_name
        ])

1697 

def _condition_to_sql_relational(self, model: BaseModel, alias: str, exists: bool, coquery: Query, query: Query) -> SQL:
    """ Return the SQL condition testing, for the row ``alias``, whether the
    relation table has a row linking it to a corecord selected by ``coquery``.

    :param alias: table alias of the model in ``query``
    :param exists: whether the condition tests existence (``EXISTS``) or
        absence (``NOT EXISTS``) of such a row
    :param coquery: query on the comodel selecting the corecords
    :param query: query being built, used to make the relation table alias
    """
    if coquery.is_empty():
        # no corecord can match: the answer does not depend on the row
        if exists:
            return SQL("FALSE")
        return SQL("TRUE")

    rel_alias = query.make_alias(alias, self.name)
    negation = SQL() if exists else SQL("NOT ")
    sql_relation = SQL.identifier(self.relation)
    sql_rel_alias = SQL.identifier(rel_alias)
    sql_rel_id1 = SQL.identifier(rel_alias, self.column1)
    sql_record_id = SQL.identifier(alias, 'id')

    if coquery.where_clause:
        # general case: the linked corecord must be selected by coquery
        return SQL(
            "%sEXISTS (SELECT 1 FROM %s AS %s WHERE %s = %s AND %s IN %s)",
            negation,
            sql_relation,
            sql_rel_alias,
            sql_rel_id1,
            sql_record_id,
            SQL.identifier(rel_alias, self.column2),
            coquery.subselect(),
        )

    # case: no constraints on the comodel table and we have foreign keys,
    # so checking the existence of a row in the relation table is enough
    return SQL(
        "%sEXISTS (SELECT 1 FROM %s AS %s WHERE %s = %s)",
        negation,
        sql_relation,
        sql_rel_alias,
        sql_rel_id1,
        sql_record_id,
    )

1725 

1726 

class PrefetchMany2one(Reversible):
    """ Reversible iterable over the values of a many2one field found in
    cache for the prefetch set of a given record, without duplicates.
    """
    __slots__ = ('field', 'record')

    def __init__(self, record: BaseModel, field: Many2one):
        self.field = field
        self.record = record

    def __iter__(self):
        lookup = self.field._get_cache(self.record.env).get
        cached_values = map(lookup, self.record._prefetch_ids)
        # ids without a cached value (or with a None value) are skipped
        return unique(coid for coid in cached_values if coid is not None)

    def __reversed__(self):
        lookup = self.field._get_cache(self.record.env).get
        cached_values = map(lookup, reversed(self.record._prefetch_ids))
        # ids without a cached value (or with a None value) are skipped
        return unique(coid for coid in cached_values if coid is not None)

1748 

1749 

class PrefetchX2many(Reversible):
    """ Reversible iterable over the values of an x2many field found in
    cache for the prefetch set of a given record, without duplicates.
    """
    __slots__ = ('field', 'record')

    def __init__(self, record: BaseModel, field: _RelationalMulti):
        self.field = field
        self.record = record

    def __iter__(self):
        field_cache = self.field._get_cache(self.record.env)
        # ids without a cached value contribute no corecord id
        cached_values = (field_cache.get(id_, ()) for id_ in self.record._prefetch_ids)
        return unique(itertools.chain.from_iterable(cached_values))

    def __reversed__(self):
        field_cache = self.field._get_cache(self.record.env)
        # ids without a cached value contribute no corecord id
        cached_values = (field_cache.get(id_, ()) for id_ in reversed(self.record._prefetch_ids))
        return unique(itertools.chain.from_iterable(cached_values))