Coverage for adhoc-cicd-odoo-odoo / odoo / orm / fields_textual.py: 70%

455 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-09 18:15 +0000

1from __future__ import annotations 

2 

3import collections.abc 

4import logging 

5import typing 

6from collections import defaultdict 

7from difflib import get_close_matches, unified_diff 

8from hashlib import sha256 

9from operator import attrgetter 

10 

11from markupsafe import Markup 

12from markupsafe import escape as markup_escape 

13from psycopg2.extras import Json as PsycopgJson 

14 

15from odoo.exceptions import AccessError, UserError 

16from odoo.netsvc import COLOR_PATTERN, DEFAULT, GREEN, RED, ColoredFormatter 

17from odoo.tools import SQL, html_normalize, html_sanitize, html2plaintext, is_html_empty, plaintext2html, sql 

18from odoo.tools.misc import OrderedSet, SENTINEL, Sentinel 

19from odoo.tools.sql import pattern_to_translated_trigram_pattern, pg_varchar, value_to_translated_trigram_pattern 

20from odoo.tools.translate import html_translate 

21 

22from .fields import Field, _logger 

23from .utils import COLLECTION_TYPES, SQL_OPERATORS 

24 

25if typing.TYPE_CHECKING: 

26 from .models import BaseModel 

27 from odoo.tools import Query 

28 

29if typing.TYPE_CHECKING: 

30 from collections.abc import Callable 

31 

32 

class BaseString(Field[str | typing.Literal[False]]):
    """ Abstract class for string fields.

    Gathers what the textual fields of this module share: conversion of
    values between the cache, record and database column formats, and the
    handling of translated values, which are kept as a mapping from language
    code to value when ``translate`` is set.
    """
    translate: bool | Callable[[Callable[[str], str], str], str] = False  # whether the field is translated
    size = None  # maximum size of values (deprecated)
    is_text = True
    falsy_value = ''

    def __init__(self, string: str | Sentinel = SENTINEL, **kwargs):
        """ Normalize the ``translate`` keyword, then delegate to the parent
        constructor: any non-callable ``translate`` is coerced to ``bool``.
        """
        # translate is either True, False, or a callable
        if 'translate' in kwargs and not callable(kwargs['translate']):
            kwargs['translate'] = bool(kwargs['translate'])
        super().__init__(string=string, **kwargs)

    _related_translate = property(attrgetter('translate'))

    def _description_translate(self, env):
        """ Return whether the field is translated as a plain boolean (a
        callable ``translate`` gives ``True``).
        """
        return bool(self.translate)

    def setup_related(self, model):
        """ Run the parent setup, then warn when the related field is both
        stored and translated.
        """
        super().setup_related(model)
        if self.store and self.translate:
            _logger.warning("Translated stored related field (%s) will not be computed correctly in all languages", self)

    def get_depends(self, model):
        """ Return the pair ``(dependencies, context dependencies)``.

        For a stored translated field the context dependencies are always
        returned empty; a warning is logged if the parent reported some.
        """
        if self.translate and self.store:
            dep, dep_ctx = super().get_depends(model)
            if dep_ctx:
                _logger.warning("Translated stored fields (%s) cannot depend on context", self)
            return dep, ()
        return super().get_depends(model)

    def _convert_db_column(self, model, column):
        """ Convert the existing database column to this field's column type.

        The translation-aware conversion is used when the field is translated
        or when the existing column is of type ``jsonb``.
        """
        # specialized implementation for converting from/to translated fields
        if self.translate or column['udt_name'] == 'jsonb':
            sql.convert_column_translatable(model.env.cr, model._table, self.name, self.column_type[1])
        else:
            sql.convert_column(model.env.cr, model._table, self.name, self.column_type[1])

    def get_trans_terms(self, value):
        """ Return the sequence of terms to translate found in `value`. """
        if not callable(self.translate):
            # no term extraction function: the whole value is the only term
            return [value] if value else []
        terms = []
        # the translate callable invokes the callback on each term it finds
        self.translate(terms.append, value)
        return terms

    def get_text_content(self, term):
        """ Return the textual content for the given term. """
        # falls back to the identity when `translate` has no `get_text_content`
        func = getattr(self.translate, 'get_text_content', lambda term: term)
        return func(term)

    def convert_to_column(self, value, record, values=None, validate=True):
        """ Return the column value, which is the cache value; ``values`` is
        not used here.
        """
        return self.convert_to_cache(value, record, validate)

    def convert_to_column_insert(self, value, record, values=None, validate=True):
        """ Return the value to insert in the database column.

        For a translated field, the converted value is wrapped in a JSON
        mapping holding it under ``'en_US'`` and under the environment
        language (``'en_US'`` when there is none); ``None`` stays ``None``.
        """
        if self.translate:
            value = self.convert_to_column(value, record, values, validate)
            if value is None:
                return None
            return PsycopgJson({'en_US': value, record.env.lang or 'en_US': value})
        return super().convert_to_column_insert(value, record, values, validate)

    def get_column_update(self, record):
        """ Return the value to write in the database column for `record`.

        For a translated field, the raw entry is read from the transaction's
        field data and wrapped as JSON, or ``None`` is returned when that
        entry is falsy.
        """
        if self.translate:
            assert self not in record.env._field_depends_context, f"translated field {self} cannot depend on context"
            value = record.env.transaction.field_data[self][record.id]
            return PsycopgJson(value) if value else None
        return super().get_column_update(record)

    def convert_to_cache(self, value, record, validate=True):
        """ Convert `value` to the cache format.

        ``None`` and ``False`` give ``None``. Bytes are decoded, anything
        else goes through ``str()``, and the result is truncated to
        ``self.size``. When `validate` is set and ``translate`` is callable,
        the value is passed through it with a callback returning ``None``.
        """
        if value is None or value is False:
            return None

        if isinstance(value, bytes):
            s = value.decode()
        else:
            s = str(value)
        # with `size` being None, the slice keeps the whole string
        value = s[:self.size]
        if validate and callable(self.translate):
            # NOTE(review): presumably normalizes the value without replacing
            # any term -- depends on the translate callable, confirm
            # pylint: disable=not-callable
            value = self.translate(lambda t: None, value)
        return value

    def convert_to_record(self, value, record):
        """ Convert a cache value to the record format.

        ``None`` gives ``False``, and the value of a non-translated field is
        returned unchanged. A dict value is indexed by the translation
        language. When ``translate`` is callable, the context has
        ``edit_translations`` and the value contains terms, each term is
        wrapped in a ``<span>`` carrying its translation metadata.

        :raise KeyError: if a dict value has no entry for the language
        """
        if value is None:
            return False
        if not self.translate:
            return value
        if isinstance(value, dict):
            lang = self.translation_lang(record.env)
            # raise a KeyError for the __get__ function
            value = value[lang]
        if (
            callable(self.translate)
            and record.env.context.get('edit_translations')
            and self.get_trans_terms(value)
        ):
            base_lang = record._get_base_lang()
            lang = record.env.lang or 'en_US'
            # the value differs from the one read without the translation-related context keys
            delay_translation = value != record.with_context(edit_translations=None, check_translations=None, lang=lang)[self.name]

            if lang != base_lang:
                base_value = record.with_context(edit_translations=None, check_translations=True, lang=base_lang)[self.name]
                base_terms = self.get_trans_terms(base_value)
                translated_terms = self.get_trans_terms(value) if value != base_value else base_terms
                if len(base_terms) != len(translated_terms):
                    # term number mismatch, ignore all translations
                    value = base_value
                    translated_terms = base_terms
                # terms are paired by position: translated term -> base term
                get_base = dict(zip(translated_terms, base_terms)).__getitem__
            else:
                get_base = lambda term: term

            # use a wrapper to let the frontend js code identify each term and
            # its metadata in the 'edit_translations' context
            def translate_func(term):
                source_term = get_base(term)
                translation_state = 'translated' if lang == base_lang or source_term != term else 'to_translate'
                translation_source_sha = sha256(source_term.encode()).hexdigest()
                return (
                    '<span '
                    f'''{'class="o_delay_translation" ' if delay_translation else ''}'''
                    f'data-oe-model="{markup_escape(record._name)}" '
                    f'data-oe-id="{markup_escape(record.id)}" '
                    f'data-oe-field="{markup_escape(self.name)}" '
                    f'data-oe-translation-state="{translation_state}" '
                    f'data-oe-translation-source-sha="{translation_source_sha}"'
                    '>'
                    f'{term}'
                    '</span>'
                )
            # pylint: disable=not-callable
            value = self.translate(translate_func, value)
        return value

    def convert_to_write(self, value, record):
        """ Return `value` unchanged. """
        return value

    def get_translation_dictionary(self, from_lang_value, to_lang_values):
        """ Build a dictionary from terms in from_lang_value to terms in to_lang_values

        Terms are paired by position. For a language whose value does not
        have the same number of terms as `from_lang_value`, every source term
        maps to itself.

        :param str from_lang_value: from xml/html
        :param dict to_lang_values: {lang: lang_value}

        :return: {from_lang_term: {lang: lang_term}}
        :rtype: dict
        """

        from_lang_terms = self.get_trans_terms(from_lang_value)
        dictionary = defaultdict(lambda: defaultdict(dict))
        if not from_lang_terms:
            return dictionary
        dictionary.update({from_lang_term: defaultdict(dict) for from_lang_term in from_lang_terms})

        for lang, to_lang_value in to_lang_values.items():
            to_lang_terms = self.get_trans_terms(to_lang_value)
            if len(from_lang_terms) != len(to_lang_terms):
                # term number mismatch: keep the source terms for that language
                for from_lang_term in from_lang_terms:
                    dictionary[from_lang_term][lang] = from_lang_term
            else:
                for from_lang_term, to_lang_term in zip(from_lang_terms, to_lang_terms):
                    dictionary[from_lang_term][lang] = to_lang_term
        return dictionary

    def _get_stored_translations(self, record):
        """ Flush the field on `record`, then read its raw column value
        straight from the database.

        :return: the column value, e.g. {'en_US': 'value_en_US', 'fr_FR': 'French'},
            or None when no row is found
        """
        # assert (self.translate and self.store and record)
        record.flush_recordset([self.name])
        cr = record.env.cr
        cr.execute(SQL(
            "SELECT %s FROM %s WHERE id = %s",
            SQL.identifier(self.name),
            SQL.identifier(record._table),
            record.id,
        ))
        res = cr.fetchone()
        return res[0] if res else None

    def translation_lang(self, env):
        """ Return the language key under which the value is accessed:
        ``env.lang`` (or ``'en_US'``) when ``translate`` is ``True``,
        ``env._lang`` otherwise.
        """
        return (env.lang or 'en_US') if self.translate is True else env._lang

    def get_translation_fallback_langs(self, env):
        """ Return the tuple of language keys to try, in order, when reading
        the value for the translation language of `env`; it always ends with
        ``'en_US'``.
        """
        lang = self.translation_lang(env)
        if lang == '_en_US':
            return '_en_US', 'en_US'
        if lang == 'en_US':
            return ('en_US',)
        if lang.startswith('_'):
            # lang[1:] is the same key without its leading underscore
            return lang, lang[1:], '_en_US', 'en_US'
        return lang, 'en_US'

    def _get_cache_impl(self, env):
        """ Return the cache mapping of the field in `env`.

        For a translated field outside of the ``prefetch_langs`` context, the
        parent cache is wrapped in a ``LangProxyDict`` bound to the current
        translation language.
        """
        cache = super()._get_cache_impl(env)
        if not self.translate or env.context.get('prefetch_langs'):
            return cache
        lang = self.translation_lang(env)
        return LangProxyDict(self, cache, lang)

    def _cache_missing_ids(self, records):
        """ Delegate to the parent, after disabling ``prefetch_langs`` on
        `records` for a translated field.
        """
        if self.translate and records.env.context.get('prefetch_langs'):
            # we always need to fetch the current language in the cache
            records = records.with_context(prefetch_langs=False)
        return super()._cache_missing_ids(records)

    def _to_prefetch(self, record):
        """ Delegate to the parent; for a translated field in the
        ``prefetch_langs`` context, the parent is called with that key
        disabled and its result is put back in the environment of `record`.
        """
        if self.translate and record.env.context.get('prefetch_langs'):
            # we always need to fetch the current language in the cache
            return super()._to_prefetch(record.with_context(prefetch_langs=False)).with_env(record.env)
        return super()._to_prefetch(record)

    def _insert_cache(self, records, values):
        """ Insert `values` in the cache of `records`, pairing them by
        position with ``records._ids``.

        For a translated field, the transaction's field data is filled
        directly. In the ``prefetch_langs`` context each value is expected to
        be a dict of translations, which is completed with fallbacks for the
        installed languages; that dict is updated in place when ``_lang``
        keys are handled. Otherwise the value is stored under the current
        translation language, without overwriting an existing entry.
        """
        if not self.translate:
            super()._insert_cache(records, values)
            return

        assert self not in records.env._field_depends_context, f"translated field {self} cannot depend on context"
        env = records.env
        field_cache = env.transaction.field_data[self]
        if env.context.get('prefetch_langs'):
            installed = [lang for lang, _ in env['res.lang'].get_installed()]
            langs = OrderedSet[str](installed + ['en_US'])
            # underscore-prefixed keys are only handled for term translation in a '_lang' environment
            u_langs: list[str] = [f'_{lang}' for lang in langs] if self.translate is not True and env._lang.startswith('_') else []
            for id_, val in zip(records._ids, values):
                if val is None:
                    field_cache.setdefault(id_, None)
                else:
                    if u_langs:  # fallback missing _lang to lang if exists
                        val.update({f'_{k}': v for k, v in val.items() if k in langs and f'_{k}' not in val})
                    # later entries win: actual values override both fallbacks
                    field_cache[id_] = {
                        **dict.fromkeys(langs, val['en_US']),  # fallback missing lang to en_US
                        **dict.fromkeys(u_langs, val.get('_en_US')),  # fallback missing _lang to _en_US
                        **val
                    }
        else:
            lang = self.translation_lang(env)
            for id_, val in zip(records._ids, values):
                if val is None:
                    field_cache.setdefault(id_, None)
                else:
                    cache_value = field_cache.setdefault(id_, {})
                    # an existing None entry is left untouched
                    if cache_value is not None:
                        cache_value.setdefault(lang, val)

    def _update_cache(self, records, cache_value, dirty=False):
        """ Update the cache of `records` with `cache_value`.

        For a translated field in the ``prefetch_langs`` context,
        `cache_value` must be a dict; with several records, each one gets its
        own copy of that dict.
        """
        if self.translate and cache_value is not None and records.env.context.get('prefetch_langs'):
            assert isinstance(cache_value, dict), f"invalid cache value for {self}"
            if len(records) > 1:
                # new dict for each record
                for record in records:
                    super()._update_cache(record, dict(cache_value), dirty)
                return
        super()._update_cache(records, cache_value, dirty)

    def write(self, records, value):
        """ Write `value` on `records`.

        Non-translated fields and ``False``/``None`` values are handled by
        the parent. Otherwise there are three cases:

        - the update is not dirty (field not stored, or no truthy id among
          the records): only the cache is updated;
        - ``translate`` is not callable: the value is written for the current
          language, and also for ``en_US`` when that language is not found
          by ``res.lang``;
        - ``translate`` is callable: the stored translations of each record
          are remapped onto the terms of the new value, an old term being
          matched to its closest new term, and the result is written for all
          languages at once.
        """
        if not self.translate or value is False or value is None:
            super().write(records, value)
            return
        cache_value = self.convert_to_cache(value, records)
        records = self._filter_not_equal(records, cache_value)
        if not records:
            return
        field_cache = self._get_cache(records.env)
        dirty_ids = records.env._field_dirty.get(self, ())

        # flush dirty None values
        dirty_records = records.filtered(lambda rec: rec.id in dirty_ids)
        if any(field_cache.get(record_id, SENTINEL) is None for record_id in dirty_records._ids):
            dirty_records.flush_recordset([self.name])

        dirty = self.store and any(records._ids)
        lang = self.translation_lang(records.env)

        # not dirty fields
        if not dirty:
            if self.compute and self.inverse:
                # invalidate the values in other languages to force their recomputation
                self._update_cache(records.with_context(prefetch_langs=True), {lang: cache_value}, dirty=False)
            else:
                self._update_cache(records, cache_value, dirty=False)
            return

        # model translation
        if not callable(self.translate):
            # invalidate clean fields because they may contain fallback value
            clean_records = records.filtered(lambda rec: rec.id not in dirty_ids)
            clean_records.invalidate_recordset([self.name])
            self._update_cache(records, cache_value, dirty=True)
            if lang != 'en_US' and not records.env['res.lang']._get_data(code='en_US'):
                # if 'en_US' is not active, we always write en_US to make sure value_en is meaningful
                self._update_cache(records.with_context(lang='en_US'), cache_value, dirty=True)
            return

        # model term translation
        new_translations_list = []
        new_terms = set(self.get_trans_terms(cache_value))
        delay_translations = records.env.context.get('delay_translations')
        for record in records:
            # shortcut when no term needs to be translated
            if not new_terms:
                new_translations_list.append({'en_US': cache_value, lang: cache_value})
                continue
            # _get_stored_translations can be refactored and prefetches translations for multi records,
            # but it is really rare to write the same non-False/None/no-term value to multi records
            stored_translations = self._get_stored_translations(record)
            if not stored_translations:
                new_translations_list.append({'en_US': cache_value, lang: cache_value})
                continue
            # for each language, the value stored under '_lang' takes precedence over the one under 'lang'
            old_translations = {
                k: stored_translations.get(f'_{k}', v)
                for k, v in stored_translations.items()
                if not k.startswith('_')
            }
            # the current language is the source of the mapping and is removed from its targets
            from_lang_value = old_translations.pop(lang, old_translations['en_US'])
            translation_dictionary = self.get_translation_dictionary(from_lang_value, old_translations)
            # index the new terms by their text content
            text2terms = defaultdict(list)
            for term in new_terms:
                if term_text := self.get_text_content(term):
                    text2terms[term_text].append(term)

            is_text = self.translate.is_text if hasattr(self.translate, 'is_text') else lambda term: True
            term_adapter = self.translate.term_adapter if hasattr(self.translate, 'term_adapter') else None
            # iterate on a copy of the keys, as the dictionary is modified in the loop
            for old_term in list(translation_dictionary.keys()):
                if old_term not in new_terms:
                    old_term_text = self.get_text_content(old_term)
                    # at most one new text content with a similarity ratio of at least 0.9
                    matches = get_close_matches(old_term_text, text2terms, 1, 0.9)
                    if matches:
                        # among the new terms sharing that text content, take the one closest to the old term
                        closest_term = get_close_matches(old_term, text2terms[matches[0]], 1, 0)[0]
                        if closest_term in translation_dictionary:
                            continue
                        old_is_text = is_text(old_term)
                        closest_is_text = is_text(closest_term)
                        if old_is_text or not closest_is_text:
                            if not closest_is_text and records.env.context.get("install_mode") and lang == 'en_US' and term_adapter:
                                adapter = term_adapter(closest_term)
                                if adapter(old_term) is None:  # old term and closest_term have different structures
                                    continue
                                translation_dictionary[closest_term] = {k: adapter(v) for k, v in translation_dictionary.pop(old_term).items()}
                            else:
                                # move the translations of the old term to the closest new term
                                translation_dictionary[closest_term] = translation_dictionary.pop(old_term)
            # rebuild the value of every other language from the new value;
            # the callback returns None for a term without known translation
            # pylint: disable=not-callable
            new_translations = {
                l: self.translate(lambda term: translation_dictionary.get(term, {l: None})[l], cache_value)
                for l in old_translations.keys()
            }
            if delay_translations:
                # keep the stored translations and put the new ones under '_lang' keys
                new_store_translations = stored_translations
                new_store_translations.update({f'_{k}': v for k, v in new_translations.items()})
                new_store_translations.pop(f'_{lang}', None)
            else:
                new_store_translations = new_translations
            new_store_translations[lang] = cache_value

            if not records.env['res.lang']._get_data(code='en_US'):
                new_store_translations['en_US'] = cache_value
                new_store_translations.pop('_en_US', None)
            new_translations_list.append(new_store_translations)
        for record, new_translation in zip(records.with_context(prefetch_langs=True), new_translations_list, strict=True):
            self._update_cache(record, new_translation, dirty=True)

    def to_sql(self, model: BaseModel, alias: str) -> SQL:
        """ Return the SQL expression of the field on table `alias`.

        For a translated field outside of the ``prefetch_langs`` context, the
        expression extracts the text stored under the fallback languages,
        combined with ``COALESCE`` when there are several of them.
        """
        sql_field = super().to_sql(model, alias)
        if self.translate and not model.env.context.get('prefetch_langs'):
            langs = self.get_translation_fallback_langs(model.env)
            sql_field_langs = [SQL("%s->>%s", sql_field, lang) for lang in langs]
            if len(sql_field_langs) == 1:
                return sql_field_langs[0]
            return SQL("COALESCE(%s)", SQL(", ").join(sql_field_langs))
        return sql_field

    def expression_getter(self, field_expr):
        """ Return a function mapping a record to the value of `field_expr`.

        The special expression ``'display_name.no_error'`` gives a getter for
        ``display_name`` that returns ``''`` instead of raising
        ``AccessError``.
        """
        if field_expr != 'display_name.no_error':
            return super().expression_getter(field_expr)

        # when searching by display_name, don't raise AccessError but return an
        # empty value instead
        get_display_name = super().expression_getter('display_name')

        def getter(record):
            try:
                return get_display_name(record)
            except AccessError:
                return ''

        return getter

    def condition_to_sql(self, field_expr: str, operator: str, value, model: BaseModel, alias: str, query: Query) -> SQL:
        """ Return the SQL condition for ``(field_expr, operator, value)``.

        The condition comes from the parent. For a translated field with a
        trigram index, a supported operator and a string value (or a
        collection of strings), it is combined with a prefilter on the text
        of all the stored translations, unless the prefilter pattern is
        ``'%'``.
        """
        # build the condition
        if self.translate and model.env.context.get('prefetch_langs'):
            model = model.with_context(prefetch_langs=False)
        base_condition = super().condition_to_sql(field_expr, operator, value, model, alias, query)

        # faster SQL for index trigrams
        if (
            self.translate
            and value
            and operator in ('in', 'like', 'ilike', '=like', '=ilike')
            and self.index == 'trigram'
            and model.pool.has_trigram
            and (
                isinstance(value, str)
                or (isinstance(value, COLLECTION_TYPES) and all(isinstance(v, str) for v in value))
            )
        ):
            # a prefilter using trigram index to speed up '=', 'like', 'ilike'
            # '!=', '<=', '<', '>', '>=', 'in', 'not in', 'not like', 'not ilike' cannot use this trick
            if operator == 'in' and len(value) == 1:
                value = value_to_translated_trigram_pattern(next(iter(value)))
            elif operator != 'in':
                value = pattern_to_translated_trigram_pattern(value)
            else:
                # 'in' with several values: no prefilter
                value = '%'

            if value == '%':
                return base_condition

            # with 'prefetch_langs', `to_sql` returns the parent expression, not a per-language extraction
            raw_sql_field = self.to_sql(model.with_context(prefetch_langs=True), alias)
            sql_left = SQL("jsonb_path_query_array(%s, '$.*')::text", raw_sql_field)
            sql_operator = SQL_OPERATORS['like' if operator == 'in' else operator]
            sql_right = SQL("%s", self.convert_to_column(value, model, validate=False))
            unaccent = model.env.registry.unaccent
            return SQL(
                "(%s%s%s AND %s)",
                unaccent(sql_left),
                sql_operator,
                unaccent(sql_right),
                base_condition,
            )
        return base_condition

463 

464 

class Char(BaseString):
    """ Plain string field, optionally limited in length, that clients
    usually show as a single-line input.

    :param int size: maximum length of the values stored in the field

    :param bool trim: whether the value gets trimmed (``True`` by default).
        Both sides take part in trimming, so that imported data and data
        typed in the UI end up alike:

        - the web client trims what the user types in create/write flows;
        - the server trims values on import (in `base_import`), which avoids
          stored values differing from trimmed form inputs.

    :param translate: turns on the translation of the field's values.
        ``translate=True`` translates each value as a whole. A callable is
        also accepted: ``translate(callback, value)`` must translate
        ``value``, getting the translation of each term from
        ``callback(term)``.
    :type translate: bool or callable
    """
    type = 'char'
    trim: bool = True  # whether value is trimmed (only by web client and base_import)

    # attributes exposed through the `_related_` and `_description_` prefixes
    _related_size = property(attrgetter('size'))
    _related_trim = property(attrgetter('trim'))
    _description_size = property(attrgetter('size'))
    _description_trim = property(attrgetter('trim'))

    def _setup_attrs__(self, model_class, name):
        """ Set up the attributes, then check that ``size`` is an integer
        when it is given.
        """
        super()._setup_attrs__(model_class, name)
        size = self.size
        assert size is None or isinstance(size, int), \
            "Char field %s with non-integer size %r" % (self, size)

    @property
    def _column_type(self):
        """ Column type pair: ``'varchar'`` and the SQL type for ``size``. """
        varchar_type = pg_varchar(self.size)
        return ('varchar', varchar_type)

    def update_db_column(self, model, column):
        """ Convert an existing length-limited ``varchar`` column when the
        field has no size or a larger one, then run the parent update.
        """
        if column and self.column_type[0] == 'varchar' and column['udt_name'] == 'varchar':
            max_length = column['character_maximum_length']
            if max_length and (self.size is None or max_length < self.size):
                # the column's varchar size does not match self.size; convert it
                sql.convert_column(model.env.cr, model._table, self.name, self.column_type[1])
        super().update_db_column(model, column)

    def get_depends(self, model):
        """ Return the pair ``(dependencies, context dependencies)``.

        ``'lang'`` is added to the context dependencies of a computed,
        non-stored ``display_name`` when the base field of the model's
        ``_rec_name`` is translated.
        """
        depends, depends_context = super().get_depends(model)

        # only a computed, non-stored display_name is concerned
        if self.name != 'display_name' or not self.compute or self.store:
            return depends, depends_context

        # display_name may depend on context['lang'] (`test_lp1071710`)
        rec_name = model._rec_name
        if (
            rec_name
            and model._fields[rec_name].base_field.translate
            and 'lang' not in depends_context
        ):
            depends_context = [*depends_context, 'lang']

        return depends, depends_context

528 

529 

class Text(BaseString):
    """ String field meant for longer contents than :class:`Char`: it has no
    size, and clients usually show it as a multiline text box.

    :param translate: turns on the translation of the field's values.
        ``translate=True`` translates each value as a whole. A callable is
        also accepted: ``translate(callback, value)`` must translate
        ``value``, getting the translation of each term from
        ``callback(term)``.
    :type translate: bool or callable
    """
    type = 'text'
    _column_type = ('text', 'text')

543 

544 

545class Html(BaseString): 

546 """ Encapsulates an html code content. 

547 

548 :param bool sanitize: whether value must be sanitized (default: ``True``) 

549 :param bool sanitize_overridable: whether the sanitation can be bypassed by 

550 the users part of the `base.group_sanitize_override` group (default: ``False``) 

551 :param bool sanitize_tags: whether to sanitize tags 

552 (only a white list of attributes is accepted, default: ``True``) 

553 :param bool sanitize_attributes: whether to sanitize attributes 

554 (only a white list of attributes is accepted, default: ``True``) 

555 :param bool sanitize_style: whether to sanitize style attributes (default: ``False``) 

556 :param bool sanitize_conditional_comments: whether to kill conditional comments. (default: ``True``) 

557 :param bool sanitize_output_method: whether to sanitize using html or xhtml (default: ``html``) 

558 :param bool strip_style: whether to strip style attributes 

559 (removed and therefore not sanitized, default: ``False``) 

560 :param bool strip_classes: whether to strip classes attributes (default: ``False``) 

561 """ 

562 type = 'html' 

563 _column_type = ('text', 'text') 

564 

565 sanitize: bool = True # whether value must be sanitized 

566 sanitize_overridable: bool = False # whether the sanitation can be bypassed by the users part of the `base.group_sanitize_override` group 

567 sanitize_tags: bool = True # whether to sanitize tags (only a white list of attributes is accepted) 

568 sanitize_attributes: bool = True # whether to sanitize attributes (only a white list of attributes is accepted) 

569 sanitize_style: bool = False # whether to sanitize style attributes 

570 sanitize_form: bool = True # whether to sanitize forms 

571 sanitize_conditional_comments: bool = True # whether to kill conditional comments. Otherwise keep them but with their content sanitized. 

572 sanitize_output_method: str = 'html' # whether to sanitize using html or xhtml 

573 strip_style: bool = False # whether to strip style attributes (removed and therefore not sanitized) 

574 strip_classes: bool = False # whether to strip classes attributes 

575 

576 def _get_attrs(self, model_class, name): 

577 # called by _setup_attrs__(), working together with BaseString._setup_attrs__() 

578 attrs = super()._get_attrs(model_class, name) 

579 # Shortcut for common sanitize options 

580 # Outgoing and incoming emails should not be sanitized with the same options. 

581 # e.g. conditional comments: no need to keep conditional comments for incoming emails, 

582 # we do not need this Microsoft Outlook client feature for emails displayed Odoo's web client. 

583 # While we need to keep them in mail templates and mass mailings, because they could be rendered in Outlook. 

584 if attrs.get('sanitize') == 'email_outgoing': 

585 attrs['sanitize'] = True 

586 attrs.update({key: value for key, value in { 

587 'sanitize_tags': False, 

588 'sanitize_attributes': False, 

589 'sanitize_conditional_comments': False, 

590 'sanitize_output_method': 'xml', 

591 }.items() if key not in attrs}) 

592 # Translated sanitized html fields must use html_translate or a callable. 

593 # `elif` intended, because HTML fields with translate=True and sanitize=False 

594 # where not using `html_translate` before and they must remain without `html_translate`. 

595 # Otherwise, breaks `--test-tags .test_render_field`, for instance. 

596 elif attrs.get('translate') is True and attrs.get('sanitize', True): 

597 attrs['translate'] = html_translate 

598 return attrs 

599 

600 _related_sanitize = property(attrgetter('sanitize')) 

601 _related_sanitize_tags = property(attrgetter('sanitize_tags')) 

602 _related_sanitize_attributes = property(attrgetter('sanitize_attributes')) 

603 _related_sanitize_style = property(attrgetter('sanitize_style')) 

604 _related_strip_style = property(attrgetter('strip_style')) 

605 _related_strip_classes = property(attrgetter('strip_classes')) 

606 

607 _description_sanitize = property(attrgetter('sanitize')) 

608 _description_sanitize_tags = property(attrgetter('sanitize_tags')) 

609 _description_sanitize_attributes = property(attrgetter('sanitize_attributes')) 

610 _description_sanitize_style = property(attrgetter('sanitize_style')) 

611 _description_strip_style = property(attrgetter('strip_style')) 

612 _description_strip_classes = property(attrgetter('strip_classes')) 

613 

614 def convert_to_column(self, value, record, values=None, validate=True): 

615 value = self._convert(value, record, validate=validate) 

616 return super().convert_to_column(value, record, values, validate=False) 

617 

618 def convert_to_cache(self, value, record, validate=True): 

619 return self._convert(value, record, validate) 

620 

621 def _convert(self, value, record, validate): 

622 if value is None or value is False: 

623 return None 

624 

625 if not validate or not self.sanitize: 

626 return value 

627 

628 sanitize_vals = { 

629 'silent': True, 

630 'sanitize_tags': self.sanitize_tags, 

631 'sanitize_attributes': self.sanitize_attributes, 

632 'sanitize_style': self.sanitize_style, 

633 'sanitize_form': self.sanitize_form, 

634 'sanitize_conditional_comments': self.sanitize_conditional_comments, 

635 'output_method': self.sanitize_output_method, 

636 'strip_style': self.strip_style, 

637 'strip_classes': self.strip_classes 

638 } 

639 

640 if self.sanitize_overridable: 

641 if record.env.user.has_group('base.group_sanitize_override'): 641 ↛ 644line 641 didn't jump to line 644 because the condition on line 641 was always true

642 return value 

643 

644 original_value = record[self.name] 

645 if original_value: 

646 # Note that sanitize also normalize 

647 original_value_sanitized = html_sanitize(original_value, **sanitize_vals) 

648 original_value_normalized = html_normalize(original_value) 

649 

650 if ( 

651 not original_value_sanitized # sanitizer could empty it 

652 or original_value_normalized != original_value_sanitized 

653 ): 

654 # The field contains element(s) that would be removed if 

655 # sanitized. It means that someone who was part of a group 

656 # allowing to bypass the sanitation saved that field 

657 # previously. 

658 

659 diff = unified_diff( 

660 original_value_sanitized.splitlines(), 

661 original_value_normalized.splitlines(), 

662 ) 

663 

664 with_colors = isinstance(logging.getLogger().handlers[0].formatter, ColoredFormatter) 

665 diff_str = f'The field ({record._description}, {self.string}) will not be editable:\n' 

666 for line in list(diff)[2:]: 

667 if with_colors: 

668 color = {'-': RED, '+': GREEN}.get(line[:1], DEFAULT) 

669 diff_str += COLOR_PATTERN % (30 + color, 40 + DEFAULT, line.rstrip() + "\n") 

670 else: 

671 diff_str += line.rstrip() + '\n' 

672 _logger.info(diff_str) 

673 

674 raise UserError(record.env._( 

675 "The field value you're saving (%(model)s %(field)s) includes content that is " 

676 "restricted for security reasons. It is possible that someone " 

677 "with higher privileges previously modified it, and you are therefore " 

678 "not able to modify it yourself while preserving the content.", 

679 model=record._description, field=self.string, 

680 )) 

681 

682 return html_sanitize(value, **sanitize_vals) 

683 

684 def convert_to_record(self, value, record): 

685 r = super().convert_to_record(value, record) 

686 if isinstance(r, bytes): 686 ↛ 687line 686 didn't jump to line 687 because the condition on line 686 was never true

687 r = r.decode() 

688 return r and Markup(r) 

689 

690 def convert_to_read(self, value, record, use_display_name=True): 

691 r = super().convert_to_read(value, record, use_display_name) 

692 if isinstance(r, bytes): 692 ↛ 693line 692 didn't jump to line 693 because the condition on line 692 was never true

693 r = r.decode() 

694 return r and Markup(r) 

695 

696 def get_trans_terms(self, value): 

697 # ensure the translation terms are stringified, otherwise we can break the PO file 

698 return list(map(str, super().get_trans_terms(value))) 

699 

    # HTML helper functions exposed as attributes of the field class. They are
    # wrapped in ``staticmethod`` so that accessing them through an instance
    # does not bind the field as first argument.
    escape = staticmethod(markup_escape)  # ``markupsafe.escape``
    is_empty = staticmethod(is_html_empty)
    to_plaintext = staticmethod(html2plaintext)
    from_plaintext = staticmethod(plaintext2html)

704 

705 

class LangProxyDict(collections.abc.MutableMapping):
    """Single-language view over a ``dict[id, dict[lang, value]]``.

    The proxy maps each id to the value stored for one fixed language. An
    underlying entry equal to ``None`` is exposed as the value ``None``.
    """
    __slots__ = ('_cache', '_field', '_lang')

    def __init__(self, field: BaseString, cache: dict, lang: str):
        """Wrap ``cache`` for ``field``, exposing only the values of ``lang``."""
        super().__init__()
        self._field = field
        self._cache = cache
        self._lang = lang

    def get(self, key, default=None):
        """Return the value of ``key`` for the proxy language, or ``default``.

        Overridden just for performance: avoids the exception handling done by
        the generic mapping implementation.
        """
        entry = self._cache.get(key, SENTINEL)
        if entry is SENTINEL:
            return default
        if entry is None:
            return None
        field = self._field
        if field.compute or (field.store and (key or key.origin)):
            return entry.get(self._lang, default)
        # the field's value is neither computed, nor in database (non-stored
        # field or new record without origin), so fallback on its 'en_US'
        # value in cache
        return entry.get(self._lang, entry.get('en_US', default))

    def __getitem__(self, key):
        """Return the value of ``key`` for the proxy language.

        Raises ``KeyError`` when ``key`` is not cached, or when no value is
        available for the language (and no fallback applies).
        """
        entry = self._cache[key]
        if entry is None:
            return None
        field = self._field
        if field.compute or (field.store and (key or key.origin)):
            return entry[self._lang]
        # the field's value is neither computed, nor in database (non-stored
        # field or new record without origin), so fallback on its 'en_US'
        # value in cache
        return entry.get(self._lang, entry.get('en_US'))

    def __setitem__(self, key, value):
        """Store ``value`` for ``key`` in the proxy language.

        Assigning ``None`` replaces the whole underlying entry by ``None``.
        """
        cache = self._cache
        if value is None:
            cache[key] = None
            return
        entry = cache.get(key)
        if entry is None:
            # key is not in cache, or {key: None} is in cache
            cache[key] = entry = {self._lang: value}
        else:
            entry[self._lang] = value
        field = self._field
        if field.compute or (field.store and (key or key.origin)):
            return
        # the field's value is neither computed, nor in database (non-stored
        # field or new record without origin), so the cache must contain the
        # fallback 'en_US' value for other languages
        entry.setdefault('en_US', value)

    def __delitem__(self, key):
        """Drop the proxy language value of ``key``; silently ignore misses."""
        entry = self._cache.get(key)
        if not entry:
            return
        entry.pop(self._lang, None)

    def __iter__(self):
        """Iterate over the keys that are ``None`` or have the proxy language."""
        for key, entry in self._cache.items():
            if entry is not None and self._lang not in entry:
                continue
            yield key

    def __len__(self):
        """Return the number of keys visible through the proxy."""
        count = 0
        for _ in self:
            count += 1
        return count

    def clear(self):
        """Remove the proxy language value from every non-empty entry."""
        for entry in self._cache.values():
            if not entry:
                continue
            entry.pop(self._lang, None)

    def __repr__(self):
        """Return a debug representation with the language and cache size."""
        cache_size = len(self._cache)
        address = hex(id(self))
        return f"<LangProxyDict lang={self._lang!r} size={cache_size} at {address}>"

777 return f"<LangProxyDict lang={self._lang!r} size={len(self._cache)} at {hex(id(self))}>"