Coverage for adhoc-cicd-odoo-odoo / odoo / orm / fields_textual.py: 70%
455 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-09 18:15 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-09 18:15 +0000
1from __future__ import annotations
3import collections.abc
4import logging
5import typing
6from collections import defaultdict
7from difflib import get_close_matches, unified_diff
8from hashlib import sha256
9from operator import attrgetter
11from markupsafe import Markup
12from markupsafe import escape as markup_escape
13from psycopg2.extras import Json as PsycopgJson
15from odoo.exceptions import AccessError, UserError
16from odoo.netsvc import COLOR_PATTERN, DEFAULT, GREEN, RED, ColoredFormatter
17from odoo.tools import SQL, html_normalize, html_sanitize, html2plaintext, is_html_empty, plaintext2html, sql
18from odoo.tools.misc import OrderedSet, SENTINEL, Sentinel
19from odoo.tools.sql import pattern_to_translated_trigram_pattern, pg_varchar, value_to_translated_trigram_pattern
20from odoo.tools.translate import html_translate
22from .fields import Field, _logger
23from .utils import COLLECTION_TYPES, SQL_OPERATORS
25if typing.TYPE_CHECKING:
26 from .models import BaseModel
27 from odoo.tools import Query
29if typing.TYPE_CHECKING:
30 from collections.abc import Callable
33class BaseString(Field[str | typing.Literal[False]]):
34 """ Abstract class for string fields. """
35 translate: bool | Callable[[Callable[[str], str], str], str] = False # whether the field is translated
36 size = None # maximum size of values (deprecated)
37 is_text = True
38 falsy_value = ''
40 def __init__(self, string: str | Sentinel = SENTINEL, **kwargs):
41 # translate is either True, False, or a callable
42 if 'translate' in kwargs and not callable(kwargs['translate']):
43 kwargs['translate'] = bool(kwargs['translate'])
44 super().__init__(string=string, **kwargs)
46 _related_translate = property(attrgetter('translate'))
48 def _description_translate(self, env):
49 return bool(self.translate)
51 def setup_related(self, model):
52 super().setup_related(model)
53 if self.store and self.translate: 53 ↛ 54line 53 didn't jump to line 54 because the condition on line 53 was never true
54 _logger.warning("Translated stored related field (%s) will not be computed correctly in all languages", self)
56 def get_depends(self, model):
57 if self.translate and self.store:
58 dep, dep_ctx = super().get_depends(model)
59 if dep_ctx: 59 ↛ 60line 59 didn't jump to line 60 because the condition on line 59 was never true
60 _logger.warning("Translated stored fields (%s) cannot depend on context", self)
61 return dep, ()
62 return super().get_depends(model)
64 def _convert_db_column(self, model, column):
65 # specialized implementation for converting from/to translated fields
66 if self.translate or column['udt_name'] == 'jsonb':
67 sql.convert_column_translatable(model.env.cr, model._table, self.name, self.column_type[1])
68 else:
69 sql.convert_column(model.env.cr, model._table, self.name, self.column_type[1])
71 def get_trans_terms(self, value):
72 """ Return the sequence of terms to translate found in `value`. """
73 if not callable(self.translate): 73 ↛ 74line 73 didn't jump to line 74 because the condition on line 73 was never true
74 return [value] if value else []
75 terms = []
76 self.translate(terms.append, value)
77 return terms
79 def get_text_content(self, term):
80 """ Return the textual content for the given term. """
81 func = getattr(self.translate, 'get_text_content', lambda term: term)
82 return func(term)
84 def convert_to_column(self, value, record, values=None, validate=True):
85 return self.convert_to_cache(value, record, validate)
87 def convert_to_column_insert(self, value, record, values=None, validate=True):
88 if self.translate:
89 value = self.convert_to_column(value, record, values, validate)
90 if value is None:
91 return None
92 return PsycopgJson({'en_US': value, record.env.lang or 'en_US': value})
93 return super().convert_to_column_insert(value, record, values, validate)
95 def get_column_update(self, record):
96 if self.translate:
97 assert self not in record.env._field_depends_context, f"translated field {self} cannot depend on context"
98 value = record.env.transaction.field_data[self][record.id]
99 return PsycopgJson(value) if value else None
100 return super().get_column_update(record)
102 def convert_to_cache(self, value, record, validate=True):
103 if value is None or value is False:
104 return None
106 if isinstance(value, bytes):
107 s = value.decode()
108 else:
109 s = str(value)
110 value = s[:self.size]
111 if validate and callable(self.translate):
112 # pylint: disable=not-callable
113 value = self.translate(lambda t: None, value)
114 return value
116 def convert_to_record(self, value, record):
117 if value is None:
118 return False
119 if not self.translate:
120 return value
121 if isinstance(value, dict): 121 ↛ 122line 121 didn't jump to line 122 because the condition on line 121 was never true
122 lang = self.translation_lang(record.env)
123 # raise a KeyError for the __get__ function
124 value = value[lang]
125 if ( 125 ↛ 130line 125 didn't jump to line 130 because the condition on line 125 was never true
126 callable(self.translate)
127 and record.env.context.get('edit_translations')
128 and self.get_trans_terms(value)
129 ):
130 base_lang = record._get_base_lang()
131 lang = record.env.lang or 'en_US'
132 delay_translation = value != record.with_context(edit_translations=None, check_translations=None, lang=lang)[self.name]
134 if lang != base_lang:
135 base_value = record.with_context(edit_translations=None, check_translations=True, lang=base_lang)[self.name]
136 base_terms = self.get_trans_terms(base_value)
137 translated_terms = self.get_trans_terms(value) if value != base_value else base_terms
138 if len(base_terms) != len(translated_terms):
139 # term number mismatch, ignore all translations
140 value = base_value
141 translated_terms = base_terms
142 get_base = dict(zip(translated_terms, base_terms)).__getitem__
143 else:
144 get_base = lambda term: term
146 # use a wrapper to let the frontend js code identify each term and
147 # its metadata in the 'edit_translations' context
148 def translate_func(term):
149 source_term = get_base(term)
150 translation_state = 'translated' if lang == base_lang or source_term != term else 'to_translate'
151 translation_source_sha = sha256(source_term.encode()).hexdigest()
152 return (
153 '<span '
154 f'''{'class="o_delay_translation" ' if delay_translation else ''}'''
155 f'data-oe-model="{markup_escape(record._name)}" '
156 f'data-oe-id="{markup_escape(record.id)}" '
157 f'data-oe-field="{markup_escape(self.name)}" '
158 f'data-oe-translation-state="{translation_state}" '
159 f'data-oe-translation-source-sha="{translation_source_sha}"'
160 '>'
161 f'{term}'
162 '</span>'
163 )
164 # pylint: disable=not-callable
165 value = self.translate(translate_func, value)
166 return value
168 def convert_to_write(self, value, record):
169 return value
171 def get_translation_dictionary(self, from_lang_value, to_lang_values):
172 """ Build a dictionary from terms in from_lang_value to terms in to_lang_values
174 :param str from_lang_value: from xml/html
175 :param dict to_lang_values: {lang: lang_value}
177 :return: {from_lang_term: {lang: lang_term}}
178 :rtype: dict
179 """
181 from_lang_terms = self.get_trans_terms(from_lang_value)
182 dictionary = defaultdict(lambda: defaultdict(dict))
183 if not from_lang_terms:
184 return dictionary
185 dictionary.update({from_lang_term: defaultdict(dict) for from_lang_term in from_lang_terms})
187 for lang, to_lang_value in to_lang_values.items():
188 to_lang_terms = self.get_trans_terms(to_lang_value)
189 if len(from_lang_terms) != len(to_lang_terms): 189 ↛ 190line 189 didn't jump to line 190 because the condition on line 189 was never true
190 for from_lang_term in from_lang_terms:
191 dictionary[from_lang_term][lang] = from_lang_term
192 else:
193 for from_lang_term, to_lang_term in zip(from_lang_terms, to_lang_terms):
194 dictionary[from_lang_term][lang] = to_lang_term
195 return dictionary
197 def _get_stored_translations(self, record):
198 """
199 : return: {'en_US': 'value_en_US', 'fr_FR': 'French'}
200 """
201 # assert (self.translate and self.store and record)
202 record.flush_recordset([self.name])
203 cr = record.env.cr
204 cr.execute(SQL(
205 "SELECT %s FROM %s WHERE id = %s",
206 SQL.identifier(self.name),
207 SQL.identifier(record._table),
208 record.id,
209 ))
210 res = cr.fetchone()
211 return res[0] if res else None
213 def translation_lang(self, env):
214 return (env.lang or 'en_US') if self.translate is True else env._lang
216 def get_translation_fallback_langs(self, env):
217 lang = self.translation_lang(env)
218 if lang == '_en_US': 218 ↛ 219line 218 didn't jump to line 219 because the condition on line 218 was never true
219 return '_en_US', 'en_US'
220 if lang == 'en_US': 220 ↛ 222line 220 didn't jump to line 222 because the condition on line 220 was always true
221 return ('en_US',)
222 if lang.startswith('_'):
223 return lang, lang[1:], '_en_US', 'en_US'
224 return lang, 'en_US'
226 def _get_cache_impl(self, env):
227 cache = super()._get_cache_impl(env)
228 if not self.translate or env.context.get('prefetch_langs'):
229 return cache
230 lang = self.translation_lang(env)
231 return LangProxyDict(self, cache, lang)
233 def _cache_missing_ids(self, records):
234 if self.translate and records.env.context.get('prefetch_langs'): 234 ↛ 236line 234 didn't jump to line 236 because the condition on line 234 was never true
235 # we always need to fetch the current language in the cache
236 records = records.with_context(prefetch_langs=False)
237 return super()._cache_missing_ids(records)
239 def _to_prefetch(self, record):
240 if self.translate and record.env.context.get('prefetch_langs'): 240 ↛ 242line 240 didn't jump to line 242 because the condition on line 240 was never true
241 # we always need to fetch the current language in the cache
242 return super()._to_prefetch(record.with_context(prefetch_langs=False)).with_env(record.env)
243 return super()._to_prefetch(record)
245 def _insert_cache(self, records, values):
246 if not self.translate:
247 super()._insert_cache(records, values)
248 return
250 assert self not in records.env._field_depends_context, f"translated field {self} cannot depend on context"
251 env = records.env
252 field_cache = env.transaction.field_data[self]
253 if env.context.get('prefetch_langs'): 253 ↛ 254line 253 didn't jump to line 254 because the condition on line 253 was never true
254 installed = [lang for lang, _ in env['res.lang'].get_installed()]
255 langs = OrderedSet[str](installed + ['en_US'])
256 u_langs: list[str] = [f'_{lang}' for lang in langs] if self.translate is not True and env._lang.startswith('_') else []
257 for id_, val in zip(records._ids, values):
258 if val is None:
259 field_cache.setdefault(id_, None)
260 else:
261 if u_langs: # fallback missing _lang to lang if exists
262 val.update({f'_{k}': v for k, v in val.items() if k in langs and f'_{k}' not in val})
263 field_cache[id_] = {
264 **dict.fromkeys(langs, val['en_US']), # fallback missing lang to en_US
265 **dict.fromkeys(u_langs, val.get('_en_US')), # fallback missing _lang to _en_US
266 **val
267 }
268 else:
269 lang = self.translation_lang(env)
270 for id_, val in zip(records._ids, values):
271 if val is None:
272 field_cache.setdefault(id_, None)
273 else:
274 cache_value = field_cache.setdefault(id_, {})
275 if cache_value is not None: 275 ↛ 270line 275 didn't jump to line 270 because the condition on line 275 was always true
276 cache_value.setdefault(lang, val)
278 def _update_cache(self, records, cache_value, dirty=False):
279 if self.translate and cache_value is not None and records.env.context.get('prefetch_langs'):
280 assert isinstance(cache_value, dict), f"invalid cache value for {self}"
281 if len(records) > 1: 281 ↛ 283line 281 didn't jump to line 283 because the condition on line 281 was never true
282 # new dict for each record
283 for record in records:
284 super()._update_cache(record, dict(cache_value), dirty)
285 return
286 super()._update_cache(records, cache_value, dirty)
288 def write(self, records, value):
289 if not self.translate or value is False or value is None:
290 super().write(records, value)
291 return
292 cache_value = self.convert_to_cache(value, records)
293 records = self._filter_not_equal(records, cache_value)
294 if not records:
295 return
296 field_cache = self._get_cache(records.env)
297 dirty_ids = records.env._field_dirty.get(self, ())
299 # flush dirty None values
300 dirty_records = records.filtered(lambda rec: rec.id in dirty_ids)
301 if any(field_cache.get(record_id, SENTINEL) is None for record_id in dirty_records._ids): 301 ↛ 302line 301 didn't jump to line 302 because the condition on line 301 was never true
302 dirty_records.flush_recordset([self.name])
304 dirty = self.store and any(records._ids)
305 lang = self.translation_lang(records.env)
307 # not dirty fields
308 if not dirty:
309 if self.compute and self.inverse:
310 # invalidate the values in other languages to force their recomputation
311 self._update_cache(records.with_context(prefetch_langs=True), {lang: cache_value}, dirty=False)
312 else:
313 self._update_cache(records, cache_value, dirty=False)
314 return
316 # model translation
317 if not callable(self.translate):
318 # invalidate clean fields because them may contain fallback value
319 clean_records = records.filtered(lambda rec: rec.id not in dirty_ids)
320 clean_records.invalidate_recordset([self.name])
321 self._update_cache(records, cache_value, dirty=True)
322 if lang != 'en_US' and not records.env['res.lang']._get_data(code='en_US'): 322 ↛ 324line 322 didn't jump to line 324 because the condition on line 322 was never true
323 # if 'en_US' is not active, we always write en_US to make sure value_en is meaningful
324 self._update_cache(records.with_context(lang='en_US'), cache_value, dirty=True)
325 return
327 # model term translation
328 new_translations_list = []
329 new_terms = set(self.get_trans_terms(cache_value))
330 delay_translations = records.env.context.get('delay_translations')
331 for record in records:
332 # shortcut when no term needs to be translated
333 if not new_terms:
334 new_translations_list.append({'en_US': cache_value, lang: cache_value})
335 continue
336 # _get_stored_translations can be refactored and prefetches translations for multi records,
337 # but it is really rare to write the same non-False/None/no-term value to multi records
338 stored_translations = self._get_stored_translations(record)
339 if not stored_translations:
340 new_translations_list.append({'en_US': cache_value, lang: cache_value})
341 continue
342 old_translations = {
343 k: stored_translations.get(f'_{k}', v)
344 for k, v in stored_translations.items()
345 if not k.startswith('_')
346 }
347 from_lang_value = old_translations.pop(lang, old_translations['en_US'])
348 translation_dictionary = self.get_translation_dictionary(from_lang_value, old_translations)
349 text2terms = defaultdict(list)
350 for term in new_terms:
351 if term_text := self.get_text_content(term): 351 ↛ 350line 351 didn't jump to line 350 because the condition on line 351 was always true
352 text2terms[term_text].append(term)
354 is_text = self.translate.is_text if hasattr(self.translate, 'is_text') else lambda term: True
355 term_adapter = self.translate.term_adapter if hasattr(self.translate, 'term_adapter') else None
356 for old_term in list(translation_dictionary.keys()):
357 if old_term not in new_terms:
358 old_term_text = self.get_text_content(old_term)
359 matches = get_close_matches(old_term_text, text2terms, 1, 0.9)
360 if matches: 360 ↛ 361line 360 didn't jump to line 361 because the condition on line 360 was never true
361 closest_term = get_close_matches(old_term, text2terms[matches[0]], 1, 0)[0]
362 if closest_term in translation_dictionary:
363 continue
364 old_is_text = is_text(old_term)
365 closest_is_text = is_text(closest_term)
366 if old_is_text or not closest_is_text:
367 if not closest_is_text and records.env.context.get("install_mode") and lang == 'en_US' and term_adapter:
368 adapter = term_adapter(closest_term)
369 if adapter(old_term) is None: # old term and closest_term have different structures
370 continue
371 translation_dictionary[closest_term] = {k: adapter(v) for k, v in translation_dictionary.pop(old_term).items()}
372 else:
373 translation_dictionary[closest_term] = translation_dictionary.pop(old_term)
374 # pylint: disable=not-callable
375 new_translations = {
376 l: self.translate(lambda term: translation_dictionary.get(term, {l: None})[l], cache_value)
377 for l in old_translations.keys()
378 }
379 if delay_translations: 379 ↛ 380line 379 didn't jump to line 380 because the condition on line 379 was never true
380 new_store_translations = stored_translations
381 new_store_translations.update({f'_{k}': v for k, v in new_translations.items()})
382 new_store_translations.pop(f'_{lang}', None)
383 else:
384 new_store_translations = new_translations
385 new_store_translations[lang] = cache_value
387 if not records.env['res.lang']._get_data(code='en_US'): 387 ↛ 388line 387 didn't jump to line 388 because the condition on line 387 was never true
388 new_store_translations['en_US'] = cache_value
389 new_store_translations.pop('_en_US', None)
390 new_translations_list.append(new_store_translations)
391 for record, new_translation in zip(records.with_context(prefetch_langs=True), new_translations_list, strict=True):
392 self._update_cache(record, new_translation, dirty=True)
394 def to_sql(self, model: BaseModel, alias: str) -> SQL:
395 sql_field = super().to_sql(model, alias)
396 if self.translate and not model.env.context.get('prefetch_langs'):
397 langs = self.get_translation_fallback_langs(model.env)
398 sql_field_langs = [SQL("%s->>%s", sql_field, lang) for lang in langs]
399 if len(sql_field_langs) == 1: 399 ↛ 401line 399 didn't jump to line 401 because the condition on line 399 was always true
400 return sql_field_langs[0]
401 return SQL("COALESCE(%s)", SQL(", ").join(sql_field_langs))
402 return sql_field
404 def expression_getter(self, field_expr):
405 if field_expr != 'display_name.no_error': 405 ↛ 410line 405 didn't jump to line 410 because the condition on line 405 was always true
406 return super().expression_getter(field_expr)
408 # when searching by display_name, don't raise AccessError but return an
409 # empty value instead
410 get_display_name = super().expression_getter('display_name')
412 def getter(record):
413 try:
414 return get_display_name(record)
415 except AccessError:
416 return ''
418 return getter
420 def condition_to_sql(self, field_expr: str, operator: str, value, model: BaseModel, alias: str, query: Query) -> SQL:
421 # build the condition
422 if self.translate and model.env.context.get('prefetch_langs'): 422 ↛ 423line 422 didn't jump to line 423 because the condition on line 422 was never true
423 model = model.with_context(prefetch_langs=False)
424 base_condition = super().condition_to_sql(field_expr, operator, value, model, alias, query)
426 # faster SQL for index trigrams
427 if ( 427 ↛ 440line 427 didn't jump to line 440 because the condition on line 427 was never true
428 self.translate
429 and value
430 and operator in ('in', 'like', 'ilike', '=like', '=ilike')
431 and self.index == 'trigram'
432 and model.pool.has_trigram
433 and (
434 isinstance(value, str)
435 or (isinstance(value, COLLECTION_TYPES) and all(isinstance(v, str) for v in value))
436 )
437 ):
438 # a prefilter using trigram index to speed up '=', 'like', 'ilike'
439 # '!=', '<=', '<', '>', '>=', 'in', 'not in', 'not like', 'not ilike' cannot use this trick
440 if operator == 'in' and len(value) == 1:
441 value = value_to_translated_trigram_pattern(next(iter(value)))
442 elif operator != 'in':
443 value = pattern_to_translated_trigram_pattern(value)
444 else:
445 value = '%'
447 if value == '%':
448 return base_condition
450 raw_sql_field = self.to_sql(model.with_context(prefetch_langs=True), alias)
451 sql_left = SQL("jsonb_path_query_array(%s, '$.*')::text", raw_sql_field)
452 sql_operator = SQL_OPERATORS['like' if operator == 'in' else operator]
453 sql_right = SQL("%s", self.convert_to_column(value, model, validate=False))
454 unaccent = model.env.registry.unaccent
455 return SQL(
456 "(%s%s%s AND %s)",
457 unaccent(sql_left),
458 sql_operator,
459 unaccent(sql_right),
460 base_condition,
461 )
462 return base_condition
465class Char(BaseString):
466 """ Basic string field, can be length-limited, usually displayed as a
467 single-line string in clients.
469 :param int size: the maximum size of values stored for that field
471 :param bool trim: states whether the value is trimmed or not (by default,
472 ``True``). Note that the trim operation is applied by both the server code and the web client
473 This ensures consistent behavior between imported data and UI-entered data.
475 - The web client trims user input during in write/create flows in UI.
476 - The server trims values during import (in `base_import`) to avoid discrepancies between
477 trimmed form inputs and stored DB values.
479 :param translate: enable the translation of the field's values; use
480 ``translate=True`` to translate field values as a whole; ``translate``
481 may also be a callable such that ``translate(callback, value)``
482 translates ``value`` by using ``callback(term)`` to retrieve the
483 translation of terms.
484 :type translate: bool or callable
485 """
486 type = 'char'
487 trim: bool = True # whether value is trimmed (only by web client and base_import)
489 def _setup_attrs__(self, model_class, name):
490 super()._setup_attrs__(model_class, name)
491 assert self.size is None or isinstance(self.size, int), \
492 "Char field %s with non-integer size %r" % (self, self.size)
494 @property
495 def _column_type(self):
496 return ('varchar', pg_varchar(self.size))
498 def update_db_column(self, model, column):
499 if (
500 column and self.column_type[0] == 'varchar' and
501 column['udt_name'] == 'varchar' and column['character_maximum_length'] and
502 (self.size is None or column['character_maximum_length'] < self.size)
503 ):
504 # the column's varchar size does not match self.size; convert it
505 sql.convert_column(model.env.cr, model._table, self.name, self.column_type[1])
506 super().update_db_column(model, column)
508 _related_size = property(attrgetter('size'))
509 _related_trim = property(attrgetter('trim'))
510 _description_size = property(attrgetter('size'))
511 _description_trim = property(attrgetter('trim'))
513 def get_depends(self, model):
514 depends, depends_context = super().get_depends(model)
516 # display_name may depend on context['lang'] (`test_lp1071710`)
517 if (
518 self.name == 'display_name'
519 and self.compute
520 and not self.store
521 and model._rec_name
522 and model._fields[model._rec_name].base_field.translate
523 and 'lang' not in depends_context
524 ):
525 depends_context = [*depends_context, 'lang']
527 return depends, depends_context
530class Text(BaseString):
531 """ Very similar to :class:`Char` but used for longer contents, does not
532 have a size and usually displayed as a multiline text box.
534 :param translate: enable the translation of the field's values; use
535 ``translate=True`` to translate field values as a whole; ``translate``
536 may also be a callable such that ``translate(callback, value)``
537 translates ``value`` by using ``callback(term)`` to retrieve the
538 translation of terms.
539 :type translate: bool or callable
540 """
541 type = 'text'
542 _column_type = ('text', 'text')
545class Html(BaseString):
546 """ Encapsulates an html code content.
548 :param bool sanitize: whether value must be sanitized (default: ``True``)
549 :param bool sanitize_overridable: whether the sanitation can be bypassed by
550 the users part of the `base.group_sanitize_override` group (default: ``False``)
551 :param bool sanitize_tags: whether to sanitize tags
552 (only a white list of attributes is accepted, default: ``True``)
553 :param bool sanitize_attributes: whether to sanitize attributes
554 (only a white list of attributes is accepted, default: ``True``)
555 :param bool sanitize_style: whether to sanitize style attributes (default: ``False``)
556 :param bool sanitize_conditional_comments: whether to kill conditional comments. (default: ``True``)
557 :param bool sanitize_output_method: whether to sanitize using html or xhtml (default: ``html``)
558 :param bool strip_style: whether to strip style attributes
559 (removed and therefore not sanitized, default: ``False``)
560 :param bool strip_classes: whether to strip classes attributes (default: ``False``)
561 """
562 type = 'html'
563 _column_type = ('text', 'text')
565 sanitize: bool = True # whether value must be sanitized
566 sanitize_overridable: bool = False # whether the sanitation can be bypassed by the users part of the `base.group_sanitize_override` group
567 sanitize_tags: bool = True # whether to sanitize tags (only a white list of attributes is accepted)
568 sanitize_attributes: bool = True # whether to sanitize attributes (only a white list of attributes is accepted)
569 sanitize_style: bool = False # whether to sanitize style attributes
570 sanitize_form: bool = True # whether to sanitize forms
571 sanitize_conditional_comments: bool = True # whether to kill conditional comments. Otherwise keep them but with their content sanitized.
572 sanitize_output_method: str = 'html' # whether to sanitize using html or xhtml
573 strip_style: bool = False # whether to strip style attributes (removed and therefore not sanitized)
574 strip_classes: bool = False # whether to strip classes attributes
576 def _get_attrs(self, model_class, name):
577 # called by _setup_attrs__(), working together with BaseString._setup_attrs__()
578 attrs = super()._get_attrs(model_class, name)
579 # Shortcut for common sanitize options
580 # Outgoing and incoming emails should not be sanitized with the same options.
581 # e.g. conditional comments: no need to keep conditional comments for incoming emails,
582 # we do not need this Microsoft Outlook client feature for emails displayed Odoo's web client.
583 # While we need to keep them in mail templates and mass mailings, because they could be rendered in Outlook.
584 if attrs.get('sanitize') == 'email_outgoing':
585 attrs['sanitize'] = True
586 attrs.update({key: value for key, value in {
587 'sanitize_tags': False,
588 'sanitize_attributes': False,
589 'sanitize_conditional_comments': False,
590 'sanitize_output_method': 'xml',
591 }.items() if key not in attrs})
592 # Translated sanitized html fields must use html_translate or a callable.
593 # `elif` intended, because HTML fields with translate=True and sanitize=False
594 # where not using `html_translate` before and they must remain without `html_translate`.
595 # Otherwise, breaks `--test-tags .test_render_field`, for instance.
596 elif attrs.get('translate') is True and attrs.get('sanitize', True):
597 attrs['translate'] = html_translate
598 return attrs
600 _related_sanitize = property(attrgetter('sanitize'))
601 _related_sanitize_tags = property(attrgetter('sanitize_tags'))
602 _related_sanitize_attributes = property(attrgetter('sanitize_attributes'))
603 _related_sanitize_style = property(attrgetter('sanitize_style'))
604 _related_strip_style = property(attrgetter('strip_style'))
605 _related_strip_classes = property(attrgetter('strip_classes'))
607 _description_sanitize = property(attrgetter('sanitize'))
608 _description_sanitize_tags = property(attrgetter('sanitize_tags'))
609 _description_sanitize_attributes = property(attrgetter('sanitize_attributes'))
610 _description_sanitize_style = property(attrgetter('sanitize_style'))
611 _description_strip_style = property(attrgetter('strip_style'))
612 _description_strip_classes = property(attrgetter('strip_classes'))
614 def convert_to_column(self, value, record, values=None, validate=True):
615 value = self._convert(value, record, validate=validate)
616 return super().convert_to_column(value, record, values, validate=False)
618 def convert_to_cache(self, value, record, validate=True):
619 return self._convert(value, record, validate)
621 def _convert(self, value, record, validate):
622 if value is None or value is False:
623 return None
625 if not validate or not self.sanitize:
626 return value
628 sanitize_vals = {
629 'silent': True,
630 'sanitize_tags': self.sanitize_tags,
631 'sanitize_attributes': self.sanitize_attributes,
632 'sanitize_style': self.sanitize_style,
633 'sanitize_form': self.sanitize_form,
634 'sanitize_conditional_comments': self.sanitize_conditional_comments,
635 'output_method': self.sanitize_output_method,
636 'strip_style': self.strip_style,
637 'strip_classes': self.strip_classes
638 }
640 if self.sanitize_overridable:
641 if record.env.user.has_group('base.group_sanitize_override'): 641 ↛ 644line 641 didn't jump to line 644 because the condition on line 641 was always true
642 return value
644 original_value = record[self.name]
645 if original_value:
646 # Note that sanitize also normalize
647 original_value_sanitized = html_sanitize(original_value, **sanitize_vals)
648 original_value_normalized = html_normalize(original_value)
650 if (
651 not original_value_sanitized # sanitizer could empty it
652 or original_value_normalized != original_value_sanitized
653 ):
654 # The field contains element(s) that would be removed if
655 # sanitized. It means that someone who was part of a group
656 # allowing to bypass the sanitation saved that field
657 # previously.
659 diff = unified_diff(
660 original_value_sanitized.splitlines(),
661 original_value_normalized.splitlines(),
662 )
664 with_colors = isinstance(logging.getLogger().handlers[0].formatter, ColoredFormatter)
665 diff_str = f'The field ({record._description}, {self.string}) will not be editable:\n'
666 for line in list(diff)[2:]:
667 if with_colors:
668 color = {'-': RED, '+': GREEN}.get(line[:1], DEFAULT)
669 diff_str += COLOR_PATTERN % (30 + color, 40 + DEFAULT, line.rstrip() + "\n")
670 else:
671 diff_str += line.rstrip() + '\n'
672 _logger.info(diff_str)
674 raise UserError(record.env._(
675 "The field value you're saving (%(model)s %(field)s) includes content that is "
676 "restricted for security reasons. It is possible that someone "
677 "with higher privileges previously modified it, and you are therefore "
678 "not able to modify it yourself while preserving the content.",
679 model=record._description, field=self.string,
680 ))
682 return html_sanitize(value, **sanitize_vals)
684 def convert_to_record(self, value, record):
685 r = super().convert_to_record(value, record)
686 if isinstance(r, bytes): 686 ↛ 687line 686 didn't jump to line 687 because the condition on line 686 was never true
687 r = r.decode()
688 return r and Markup(r)
690 def convert_to_read(self, value, record, use_display_name=True):
691 r = super().convert_to_read(value, record, use_display_name)
692 if isinstance(r, bytes): 692 ↛ 693line 692 didn't jump to line 693 because the condition on line 692 was never true
693 r = r.decode()
694 return r and Markup(r)
696 def get_trans_terms(self, value):
697 # ensure the translation terms are stringified, otherwise we can break the PO file
698 return list(map(str, super().get_trans_terms(value)))
700 escape = staticmethod(markup_escape)
701 is_empty = staticmethod(is_html_empty)
702 to_plaintext = staticmethod(html2plaintext)
703 from_plaintext = staticmethod(plaintext2html)
706class LangProxyDict(collections.abc.MutableMapping):
707 """A view on a dict[id, dict[lang, value]] that maps id to value given a
708 fixed language."""
709 __slots__ = ('_cache', '_field', '_lang')
711 def __init__(self, field: BaseString, cache: dict, lang: str):
712 super().__init__()
713 self._field = field
714 self._cache = cache
715 self._lang = lang
717 def get(self, key, default=None):
718 # just for performance
719 vals = self._cache.get(key, SENTINEL)
720 if vals is SENTINEL:
721 return default
722 if vals is None:
723 return None
724 if not (self._field.compute or (self._field.store and (key or key.origin))): 724 ↛ 728line 724 didn't jump to line 728 because the condition on line 724 was never true
725 # the field's value is neither computed, nor in database
726 # (non-stored field or new record without origin), so fallback on
727 # its 'en_US' value in cache
728 return vals.get(self._lang, vals.get('en_US', default))
729 return vals.get(self._lang, default)
731 def __getitem__(self, key):
732 vals = self._cache[key]
733 if vals is None:
734 return None
735 if not (self._field.compute or (self._field.store and (key or key.origin))): 735 ↛ 739line 735 didn't jump to line 739 because the condition on line 735 was never true
736 # the field's value is neither computed, nor in database
737 # (non-stored field or new record without origin), so fallback on
738 # its 'en_US' value in cache
739 return vals.get(self._lang, vals.get('en_US'))
740 return vals[self._lang]
742 def __setitem__(self, key, value):
743 if value is None:
744 self._cache[key] = None
745 return
746 vals = self._cache.get(key)
747 if vals is None:
748 # key is not in cache, or {key: None} is in cache
749 self._cache[key] = vals = {self._lang: value}
750 else:
751 vals[self._lang] = value
752 if not (self._field.compute or (self._field.store and (key or key.origin))):
753 # the field's value is neither computed, nor in database
754 # (non-stored field or new record without origin), so the cache
755 # must contain the fallback 'en_US' value for other languages
756 vals.setdefault('en_US', value)
758 def __delitem__(self, key):
759 vals = self._cache.get(key)
760 if vals:
761 vals.pop(self._lang, None)
763 def __iter__(self):
764 for key, vals in self._cache.items():
765 if vals is None or self._lang in vals:
766 yield key
768 def __len__(self):
769 return sum(1 for _ in self)
771 def clear(self):
772 for vals in self._cache.values():
773 if vals:
774 vals.pop(self._lang, None)
776 def __repr__(self):
777 return f"<LangProxyDict lang={self._lang!r} size={len(self._cache)} at {hex(id(self))}>"