Coverage for adhoc-cicd-odoo-odoo / odoo / tools / misc.py: 60%
792 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-09 18:15 +0000
1# Part of Odoo. See LICENSE file for full copyright and licensing details.
2"""
3Miscellaneous tools used by Odoo.
4"""
5from __future__ import annotations
7import base64
8import collections
9import csv
10import datetime
11import enum
12import hashlib
13import hmac as hmac_lib
14import itertools
15import json
16import logging
17import os
18import re
19import sys
20import tempfile
21import threading
22import time
23import traceback
24import typing
25import unicodedata
26import warnings
27import zlib
28from collections import defaultdict
29from collections.abc import Iterable, Iterator, Mapping, MutableMapping, MutableSet, Reversible
30from contextlib import ContextDecorator, contextmanager
31from difflib import HtmlDiff
32from functools import lru_cache, reduce, wraps
33from itertools import islice, groupby as itergroupby
34from operator import itemgetter
36import babel
37import babel.dates
38import markupsafe
39import pytz
40from lxml import etree, objectify
42# get_encodings, ustr and exception_to_unicode were originally from tools.misc.
43# There are moved to loglevels until we refactor tools.
44from odoo.loglevels import exception_to_unicode, get_encodings, ustr # noqa: F401
46from .config import config
47from .float_utils import float_round
48from .which import which
50K = typing.TypeVar('K')
51T = typing.TypeVar('T')
52if typing.TYPE_CHECKING:
53 from collections.abc import Callable, Collection, Sequence
54 from odoo.api import Environment
55 from odoo.addons.base.models.res_lang import LangData
57 P = typing.TypeVar('P')
# Public names exported by ``from odoo.tools.misc import *``.
# Mostly kept alphabetical; a few later additions appear out of order
# (e.g. 'hash_sign'/'verify_hash_signed' and 'real_time' at the end).
__all__ = [
    'DEFAULT_SERVER_DATETIME_FORMAT',
    'DEFAULT_SERVER_DATE_FORMAT',
    'DEFAULT_SERVER_TIME_FORMAT',
    'NON_BREAKING_SPACE',
    'SKIPPED_ELEMENT_TYPES',
    'DotDict',
    'LastOrderedSet',
    'OrderedSet',
    'Reverse',
    'babel_locale_parse',
    'clean_context',
    'consteq',
    'discardattr',
    'exception_to_unicode',
    'file_open',
    'file_open_temporary_directory',
    'file_path',
    'find_in_path',
    'formatLang',
    'format_amount',
    'format_date',
    'format_datetime',
    'format_duration',
    'format_time',
    'frozendict',
    'get_encodings',
    'get_iso_codes',
    'get_lang',
    'groupby',
    'hmac',
    'hash_sign',
    'verify_hash_signed',
    'html_escape',
    'human_size',
    'is_list_of',
    'merge_sequences',
    'mod10r',
    'mute_logger',
    'parse_date',
    'partition',
    'posix_to_ldml',
    'remove_accents',
    'replace_exceptions',
    'reverse_enumerate',
    'split_every',
    'str2bool',
    'street_split',
    'topological_sort',
    'unique',
    'ustr',
    'real_time',
]
_logger = logging.getLogger(__name__)

# List of etree._Element subclasses that we choose to ignore when parsing XML.
# We include the *Base ones just in case, currently they seem to be subclasses of the _* ones.
SKIPPED_ELEMENT_TYPES = (etree._Comment, etree._ProcessingInstruction, etree.CommentBase, etree.PIBase, etree._Entity)

# Configure default global parser
# resolve_entities=False avoids expanding XML entities for every parse done
# through the lxml default parsers (entity-expansion hardening).
etree.set_default_parser(etree.XMLParser(resolve_entities=False))
default_parser = etree.XMLParser(resolve_entities=False, remove_blank_text=True)
default_parser.set_element_class_lookup(objectify.ObjectifyElementClassLookup())
objectify.set_default_parser(default_parser)

NON_BREAKING_SPACE = u'\N{NO-BREAK SPACE}'

# ensure we have a non patched time for query times when using freezegun
real_time = time.time.__call__ # type: ignore


class Sentinel(enum.Enum):
    """Class for typing parameters with a sentinel as a default"""
    SENTINEL = -1


# module-level alias: the usual way to reference the sentinel value
SENTINEL = Sentinel.SENTINEL
138#----------------------------------------------------------
139# Subprocesses
140#----------------------------------------------------------
def find_in_path(name):
    """Locate executable *name* on the system PATH, extended with the
    configured ``bin_path`` when it is set and not the literal string 'None'.
    """
    search_dirs = os.environ.get('PATH', os.defpath).split(os.pathsep)
    bin_path = config.get('bin_path')
    if bin_path and bin_path != 'None':
        search_dirs.append(bin_path)
    return which(name, path=os.pathsep.join(search_dirs))
148# ----------------------------------------------------------
149# Postgres subprocesses
150# ----------------------------------------------------------
def find_pg_tool(name):
    """Locate a PostgreSQL client tool (``psql``, ``pg_dump``, ...).

    Searches the directory configured as ``pg_path`` when set (and not the
    literal string 'None'), otherwise the default executable search path.

    :param name: name of the executable to locate
    :return: path to the executable, as returned by :func:`which`
    :raise Exception: if the executable cannot be found
    """
    path = None
    if config['pg_path'] and config['pg_path'] != 'None':
        path = config['pg_path']
    try:
        return which(name, path=path)
    except OSError as exc:
        # chain the original lookup failure so the real cause stays visible
        raise Exception(f'Command `{name}` not found.') from exc
def exec_pg_environ():
    """
    Force the database PostgreSQL environment variables to the database
    configuration of Odoo.

    Note: On systems where pg_restore/pg_dump require an explicit password
    (i.e. on Windows where TCP sockets are used), it is necessary to pass the
    postgres user password in the PGPASSWORD environment variable or in a
    special .pgpass file.

    See also https://www.postgresql.org/docs/current/libpq-envars.html
    """
    environ = os.environ.copy()
    # straight copies: config key -> libpq environment variable
    for config_key, pg_var in (
        ('db_host', 'PGHOST'),
        ('db_user', 'PGUSER'),
        ('db_password', 'PGPASSWORD'),
        ('db_sslmode', 'PGSSLMODE'),
    ):
        if config[config_key]:
            environ[pg_var] = config[config_key]
    if config['db_port']:
        environ['PGPORT'] = str(config['db_port'])
    if config['db_app_name']:
        # substitute the {pid} placeholder and truncate to 63 characters
        environ['PGAPPNAME'] = config['db_app_name'].replace('{pid}', f'env{os.getpid()}')[:63]
    return environ
191# ----------------------------------------------------------
192# File paths
193# ----------------------------------------------------------
def file_path(file_path: str, filter_ext: tuple[str, ...] = ('',), env: Environment | None = None, *, check_exists: bool = True) -> str:
    """Verify that a file exists under a known `addons_path` directory and return its full path.

    Examples::

        >>> file_path('hr')
        >>> file_path('hr/static/description/icon.png')
        >>> file_path('hr/static/description/icon.png', filter_ext=('.png', '.jpg'))

    :param str file_path: absolute file path, or relative path within any `addons_path` directory
    :param list[str] filter_ext: optional list of supported extensions (lowercase, with leading dot)
    :param env: optional environment, required for a file path within a temporary directory
        created using `file_open_temporary_directory()`
    :param check_exists: check that the file exists (default: True)
    :return: the absolute path to the file
    :raise FileNotFoundError: if the file is not found under the known `addons_path` directories
    :raise ValueError: if the file doesn't have one of the supported extensions (`filter_ext`)
    """
    import odoo.addons  # noqa: PLC0415
    is_abs = os.path.isabs(file_path)
    normalized_path = os.path.normpath(os.path.normcase(file_path))

    # the default filter_ext ('',) accepts every extension (endswith('') is True)
    if filter_ext and not normalized_path.lower().endswith(filter_ext):
        raise ValueError("Unsupported file: " + file_path)

    # ignore leading 'addons/' if present, it's the final component of root_path, but
    # may sometimes be included in relative paths
    normalized_path = normalized_path.removeprefix('addons' + os.sep)

    # if path is relative and represents a loaded module, accept only the
    # __path__ for that module; otherwise, search in all accepted paths
    file_path_split = normalized_path.split(os.path.sep)
    if not is_abs and (module := sys.modules.get(f'odoo.addons.{file_path_split[0]}')):
        addons_paths = list(map(os.path.dirname, module.__path__))
    else:
        root_path = os.path.abspath(config.root_path)
        # temporary directories registered by file_open_temporary_directory()
        # for this environment's transaction only
        temporary_paths = env.transaction._Transaction__file_open_tmp_paths if env else []
        addons_paths = [*odoo.addons.__path__, root_path, *temporary_paths]

    for addons_dir in addons_paths:
        # final path sep required to avoid partial match
        parent_path = os.path.normpath(os.path.normcase(addons_dir)) + os.sep
        if is_abs:
            fpath = normalized_path
        else:
            fpath = os.path.normpath(os.path.join(parent_path, normalized_path))
        # startswith(parent_path) rejects '..' escapes out of the addons dir
        if fpath.startswith(parent_path) and (
            # we check existence when asked or we have multiple paths to check
            # (there is one possibility for absolute paths)
            (not check_exists and (is_abs or len(addons_paths) == 1))
            or os.path.exists(fpath)
        ):
            return fpath

    raise FileNotFoundError("File not found: " + file_path)
def file_open(name: str, mode: str = "r", filter_ext: tuple[str, ...] = (), env: Environment | None = None):
    """Open a file from within the addons_path directories, as an absolute or relative path.

    Examples::

        >>> file_open('hr/static/description/icon.png')
        >>> file_open('hr/static/description/icon.png', filter_ext=('.png', '.jpg'))
        >>> with file_open('/opt/odoo/addons/hr/static/description/icon.png', 'rb') as f:
        ...     contents = f.read()

    :param name: absolute or relative path to a file located inside an addon
    :param mode: file open mode, as for `open()`
    :param list[str] filter_ext: optional list of supported extensions (lowercase, with leading dot)
    :param env: optional environment, required to open a file within a temporary directory
        created using `file_open_temporary_directory()`
    :return: file object, as returned by `open()`
    :raise FileNotFoundError: if the file is not found under the known `addons_path` directories
    :raise ValueError: if the file doesn't have one of the supported extensions (`filter_ext`)
    """
    # check_exists=False: existence is verified below only for writing modes
    path = file_path(name, filter_ext=filter_ext, env=env, check_exists=False)
    encoding = None
    if 'b' not in mode:
        # Force encoding for text mode, as system locale could affect default encoding,
        # even with the latest Python 3 versions.
        # Note: This is not covered by a unit test, due to the platform dependency.
        # For testing purposes you should be able to force a non-UTF8 encoding with:
        #   `sudo locale-gen fr_FR; LC_ALL=fr_FR.iso8859-1 python3 ...'
        # See also PEP-540, although we can't rely on that at the moment.
        encoding = "utf-8"
    if any(m in mode for m in ('w', 'x', 'a')) and not os.path.isfile(path):
        # Don't let create new files
        raise FileNotFoundError(f"Not a file: {path}")
    return open(path, mode, encoding=encoding)
@contextmanager
def file_open_temporary_directory(env: Environment):
    """Create and return a temporary directory added to the directories `file_open` is allowed to read from.

    `file_open` will be allowed to open files within the temporary directory
    only for environments of the same transaction than `env`.
    Meaning, other transactions/requests from other users or even other databases
    won't be allowed to open files from this directory.

    Examples::

        >>> with odoo.tools.file_open_temporary_directory(self.env) as module_dir:
        ...    with zipfile.ZipFile('foo.zip', 'r') as z:
        ...        z.extract('foo/__manifest__.py', module_dir)
        ...    with odoo.tools.file_open('foo/__manifest__.py', env=self.env) as f:
        ...        manifest = f.read()

    :param env: environment for which the temporary directory is created.
    :return: the absolute path to the created temporary directory
    """
    with tempfile.TemporaryDirectory() as tmp_dir:
        # the transaction-private whitelist of extra directories file_open accepts
        allowed_paths = env.transaction._Transaction__file_open_tmp_paths
        try:
            allowed_paths.append(tmp_dir)
            yield tmp_dir
        finally:
            allowed_paths.remove(tmp_dir)
316#----------------------------------------------------------
317# iterables
318#----------------------------------------------------------
def flatten(list):
    """Flatten a list of elements into a unique list
    Author: Christophe Simonis (christophe@tinyerp.com)

    Examples::
    >>> flatten(['a'])
    ['a']
    >>> flatten('b')
    ['b']
    >>> flatten( [] )
    []
    >>> flatten( [[], [[]]] )
    []
    >>> flatten( [[['a','b'], 'c'], 'd', ['e', [], 'f']] )
    ['a', 'b', 'c', 'd', 'e', 'f']
    >>> t = (1,2,(3,), [4, 5, [6, [7], (8, 9), ([10, 11, (12, 13)]), [14, [], (15,)], []]])
    >>> flatten(t)
    [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]
    """
    warnings.warn(
        "deprecated since 18.0",
        category=DeprecationWarning,
        stacklevel=2,
    )
    flat = []
    for item in list:
        # strings/bytes are iterable but treated as atoms
        if isinstance(item, Iterable) and not isinstance(item, (bytes, str)):
            flat.extend(flatten(item))
        else:
            flat.append(item)
    return flat
def reverse_enumerate(lst: Sequence[T]) -> Iterator[tuple[int, T]]:
    """Enumerate *lst* from the last element down to the first.

    Yields ``(index, value)`` pairs with indices counting down from
    ``len(lst) - 1`` to ``0``::

        >>> list(reverse_enumerate(['a', 'b', 'c']))
        [(2, 'c'), (1, 'b'), (0, 'a')]
    """
    last_index = len(lst) - 1
    return (
        (last_index - offset, value)
        for offset, value in enumerate(reversed(lst))
    )
def partition(pred: Callable[[T], bool], elems: Iterable[T]) -> tuple[list[T], list[T]]:
    """Split *elems* into ``(matching, non_matching)`` lists in a single pass,
    equivalent to ``filter(pred, elems), filterfalse(pred, elems)``."""
    matching: list[T] = []
    rest: list[T] = []
    for candidate in elems:
        bucket = matching if pred(candidate) else rest
        bucket.append(candidate)
    return matching, rest
def topological_sort(elems: Mapping[T, Collection[T]]) -> list[T]:
    """ Return a list of elements sorted so that their dependencies are listed
    before them in the result.

    :param elems: specifies the elements to sort with their dependencies; it is
        a dictionary like `{element: dependencies}` where `dependencies` is a
        collection of elements that must appear before `element`. The elements
        of `dependencies` are not required to appear in `elems`; they will
        simply not appear in the result.

    :returns: a list with the keys of `elems` sorted according to their
        specification.
    """
    # depth-first traversal, inspired by [Tarjan 1976],
    # http://en.wikipedia.org/wiki/Topological_sorting#Algorithms
    ordered = []
    seen = set()

    def visit(node):
        if node in seen:
            return
        seen.add(node)
        # emit known dependencies first, then the node itself; unknown
        # nodes are marked as seen but never emitted
        for dep in elems.get(node, ()):
            visit(dep)
        if node in elems:
            ordered.append(node)

    for node in elems:
        visit(node)

    return ordered
def merge_sequences(*iterables: Iterable[T]) -> list[T]:
    """ Merge several iterables into a list. The result is the union of the
    iterables, ordered following the partial order given by the iterables,
    with a bias towards the end for the last iterable::

        seq = merge_sequences(['A', 'B', 'C'])
        assert seq == ['A', 'B', 'C']

        seq = merge_sequences(
            ['A', 'B', 'C'],
            ['Z'],                  # 'Z' can be anywhere
            ['Y', 'C'],             # 'Y' must precede 'C';
            ['A', 'X', 'Y'],        # 'X' must follow 'A' and precede 'Y'
        )
        assert seq == ['A', 'B', 'X', 'Y', 'C', 'Z']
    """
    # {item: items that must come before it}; dict/defaultdict preserves
    # insertion order, which topological_sort relies on for stability
    predecessors: defaultdict[T, list[T]] = defaultdict(list)
    for iterable in iterables:
        previous: T | Sentinel = SENTINEL
        for element in iterable:
            if previous is not SENTINEL:
                predecessors[element].append(previous)
            else:
                predecessors[element]  # materialize the key with no constraint
            previous = element
    return topological_sort(predecessors)
def get_iso_codes(lang: str) -> str:
    """Collapse a locale code such as ``fr_FR`` to ``fr`` when the territory
    part merely repeats the language part; return *lang* unchanged otherwise."""
    parts = lang.split('_')
    if len(parts) > 1 and parts[0] == parts[1].lower():
        return parts[0]
    return lang
def scan_languages() -> list[tuple[str, str]]:
    """ Returns all languages supported by OpenERP for translation

    :returns: a list of (lang_code, lang_name) pairs
    :rtype: [(str, unicode)]
    """
    result = []
    try:
        # read (code, name) from languages in base/data/res.lang.csv
        with file_open('base/data/res.lang.csv') as csvfile:
            reader = csv.reader(csvfile, delimiter=',', quotechar='"')
            header = next(reader)
            code_column = header.index("code")
            name_column = header.index("name")
            result = [(row[code_column], row[name_column]) for row in reader]
    except Exception:
        _logger.error("Could not read res.lang.csv")
        result = []

    # fall back on English only if nothing could be read; sort by display name
    return sorted(result or [('en_US', 'English')], key=itemgetter(1))
def mod10r(number: str) -> str:
    """
    Input number : account or invoice number
    Output return: the same number completed with the recursive mod10
    key
    """
    # recursive modulo-10 table: next carry for (digit + carry) % 10
    codec = [0, 9, 4, 6, 8, 2, 7, 1, 3, 5]
    carry = 0
    for char in number:
        # non-digit characters are kept in the output but ignored by the key
        if char.isdigit():
            carry = codec[(int(char) + carry) % 10]
    return number + str((10 - carry) % 10)
def str2bool(s: str, default: bool | None = None) -> bool:
    """Interpret *s* as a boolean.

    Case-insensitively accepts 'y'/'yes'/'1'/'true'/'t'/'on' as True and
    'n'/'no'/'0'/'false'/'f'/'off' as False. Unrecognized input yields
    ``bool(default)`` when a default is given.

    :raise ValueError: on unrecognized input when *default* is None
    """
    # allow this (for now?) because it's used for get_param
    if type(s) is bool:
        return s  # type: ignore

    if not isinstance(s, str):
        warnings.warn(
            f"Passed a non-str to `str2bool`: {s}",
            DeprecationWarning,
            stacklevel=2,
        )
    else:
        lowered = s.lower()
        if lowered in ('y', 'yes', '1', 'true', 't', 'on'):
            return True
        if lowered in ('n', 'no', '0', 'false', 'f', 'off'):
            return False

    if default is None:
        raise ValueError('Use 0/1/yes/no/true/false/on/off')
    return bool(default)
def human_size(sz: float | str) -> str | typing.Literal[False]:
    """
    Return the size in a human readable format
    """
    if not sz:
        return False
    units = ('bytes', 'Kb', 'Mb', 'Gb', 'Tb')
    # strings are measured by their length
    amount = float(len(sz) if isinstance(sz, str) else sz)
    unit_index = 0
    while amount >= 1024 and unit_index < len(units) - 1:
        amount /= 1024
        unit_index += 1
    return "%0.2f %s" % (amount, units[unit_index])
# Canonical server-side serialization formats for dates and times.
DEFAULT_SERVER_DATE_FORMAT = "%Y-%m-%d"
DEFAULT_SERVER_TIME_FORMAT = "%H:%M:%S"
DEFAULT_SERVER_DATETIME_FORMAT = "%s %s" % (
    DEFAULT_SERVER_DATE_FORMAT,
    DEFAULT_SERVER_TIME_FORMAT)

# Length of a date serialized with DEFAULT_SERVER_DATE_FORMAT (always 10)
DATE_LENGTH = len(datetime.date.today().strftime(DEFAULT_SERVER_DATE_FORMAT))

# Python's strftime supports only the format directives
# that are available on the platform's libc, so in order to
# be cross-platform we map to the directives required by
# the C standard (1989 version), always available on platforms
# with a C standard implementation.
DATETIME_FORMATS_MAP = {
    '%C': '', # century
    '%D': '%m/%d/%Y', # modified %y->%Y
    '%e': '%d',
    '%E': '', # special modifier
    '%F': '%Y-%m-%d',
    '%g': '%Y', # modified %y->%Y
    '%G': '%Y',
    '%h': '%b',
    '%k': '%H',
    '%l': '%I',
    '%n': '\n',
    '%O': '', # special modifier
    '%P': '%p',
    '%R': '%H:%M',
    '%r': '%I:%M:%S %p',
    '%s': '', #num of seconds since epoch
    '%T': '%H:%M:%S',
    '%t': ' ', # tab
    '%u': ' %w',
    '%V': '%W',
    '%y': '%Y', # Even if %y works, it's ambiguous, so we should use %Y
    '%+': '%Y-%m-%d %H:%M:%S',

    # %Z is a special case that causes 2 problems at least:
    #  - the timezone names we use (in res_user.context_tz) come
    #    from pytz, but not all these names are recognized by
    #    strptime(), so we cannot convert in both directions
    #    when such a timezone is selected and %Z is in the format
    #  - %Z is replaced by an empty string in strftime() when
    #    there is not tzinfo in a datetime value (e.g when the user
    #    did not pick a context_tz). The resulting string does not
    #    parse back if the format requires %Z.
    # As a consequence, we strip it completely from format strings.
    # The user can always have a look at the context_tz in
    # preferences to check the timezone.
    '%z': '',
    '%Z': '',
}

# Mapping from strftime directives to LDML (CLDR) date pattern fields,
# consumed by posix_to_ldml() below. A '-N' key corresponds to the
# glibc '%-N' modifier (no zero padding).
POSIX_TO_LDML = {
    'a': 'E',
    'A': 'EEEE',
    'b': 'MMM',
    'B': 'MMMM',
    #'c': '',
    'd': 'dd',
    '-d': 'd',
    'H': 'HH',
    'I': 'hh',
    'j': 'DDD',
    'm': 'MM',
    '-m': 'M',
    'M': 'mm',
    'p': 'a',
    'S': 'ss',
    'U': 'w',
    'w': 'e',
    'W': 'w',
    'y': 'yy',
    'Y': 'yyyy',
    # see comments above, and babel's format_datetime assumes an UTC timezone
    # for naive datetime objects
    #'z': 'Z',
    #'Z': 'z',
}
def posix_to_ldml(fmt: str, locale: babel.Locale) -> str:
    """ Converts a posix/strftime pattern into an LDML date format pattern.

    :param fmt: non-extended C89/C90 strftime pattern
    :param locale: babel locale used for locale-specific conversions (e.g. %x and %X)
    :return: unicode
    """
    buf = []
    # pc: previous char was '%', i.e. we are inside a strftime directive
    pc = False
    # minus: a '%-' (strip-padding) modifier was seen for the next directive
    minus = False
    # quoted: pending run of literal letters, to be wrapped in LDML quotes
    quoted = []

    for c in fmt:
        # LDML date format patterns uses letters, so letters must be quoted
        if not pc and c.isalpha():
            quoted.append(c if c != "'" else "''")
            continue
        # any non-letter ends the pending literal run: flush it quoted
        if quoted:
            buf.append("'")
            buf.append(''.join(quoted))
            buf.append("'")
            quoted = []

        if pc:
            if c == '%': # escaped percent
                buf.append('%')
            elif c == 'x': # date format, short seems to match
                buf.append(locale.date_formats['short'].pattern)
            elif c == 'X': # time format, seems to include seconds. short does not
                buf.append(locale.time_formats['medium'].pattern)
            elif c == '-':
                # remember the modifier; stay inside the directive (pc stays True)
                minus = True
                continue
            else: # look up format char in static mapping
                if minus:
                    c = '-' + c
                    minus = False
                buf.append(POSIX_TO_LDML[c])
            pc = False
        elif c == '%':
            pc = True
        else:
            buf.append(c)

    # flush anything remaining in quoted buffer
    if quoted:
        buf.append("'")
        buf.append(''.join(quoted))
        buf.append("'")

    return ''.join(buf)
@typing.overload
def split_every(n: int, iterable: Iterable[T]) -> Iterator[tuple[T, ...]]:
    ...


@typing.overload
def split_every(n: int, iterable: Iterable[T], piece_maker: type[Collection[T]]) -> Iterator[Collection[T]]:
    ...


@typing.overload
def split_every(n: int, iterable: Iterable[T], piece_maker: Callable[[Iterable[T]], P]) -> Iterator[P]:
    ...


def split_every(n: int, iterable: Iterable[T], piece_maker=tuple):
    """Splits an iterable into length-n pieces. The last piece will be shorter
    if ``n`` does not evenly divide the iterable length.

    :param int n: maximum size of each generated chunk
    :param Iterable iterable: iterable to chunk into pieces
    :param piece_maker: callable taking an iterable and collecting each
        chunk from its slice, *must consume the entire slice*.
    """
    iterator = iter(iterable)
    # stop as soon as a chunk comes back empty (falsy)
    while chunk := piece_maker(islice(iterator, n)):
        yield chunk
def discardattr(obj: object, key: str) -> None:
    """Remove attribute *key* from *obj* if present, do nothing otherwise.

    The set-like name mirrors ``set.discard``.
    """
    try:
        delattr(obj, key)
    except AttributeError:
        # attribute absent: exactly the "discard" contract, nothing to do
        pass
707# ---------------------------------------------
708# String management
709# ---------------------------------------------
712# Inspired by http://stackoverflow.com/questions/517923
# Inspired by http://stackoverflow.com/questions/517923
def remove_accents(input_str: str) -> str:
    """Suboptimal-but-better-than-nothing way to replace accented
    latin letters by an ASCII equivalent. Will obviously change the
    meaning of input_str and work only for some cases"""
    if not input_str:
        return input_str
    # decompose, then drop the combining marks
    decomposed = unicodedata.normalize('NFKD', input_str)
    return ''.join(char for char in decomposed if not unicodedata.combining(char))
class unquote(str):
    """A str subclass whose repr() is the bare string itself — no enclosing
    quotes, no escaping. The name comes from Lisp's unquote.

    One use is to preserve or insert bare variable names within dicts during
    eval() of a dict's repr(). Use with care.

    Some examples (notice that there are never quotes surrounding
    the ``active_id`` name:

    >>> unquote('active_id')
    active_id
    >>> d = {'test': unquote('active_id')}
    >>> d
    {'test': active_id}
    >>> print d
    {'test': active_id}
    """
    __slots__ = ()

    def __repr__(self):
        # the string itself *is* its repr
        return self
class mute_logger(logging.Handler):
    """Temporarily suppress logging for the given logger names.

    Usable as a context manager or as a decorator::

        @mute_logger('odoo.plic.ploc')
        def do_stuff():
            blahblah()

        with mute_logger('odoo.foo.bar'):
            do_suff()
    """
    def __init__(self, *loggers):
        super().__init__()
        self.loggers = loggers
        self.old_params = {}

    def __enter__(self):
        # swap each target logger's handlers for this no-op handler and stop
        # propagation, remembering the previous state for __exit__
        for name in self.loggers:
            target = logging.getLogger(name)
            self.old_params[name] = (target.handlers, target.propagate)
            target.propagate = False
            target.handlers = [self]

    def __exit__(self, exc_type=None, exc_val=None, exc_tb=None):
        for name in self.loggers:
            target = logging.getLogger(name)
            target.handlers, target.propagate = self.old_params[name]

    def __call__(self, func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            with self:
                return func(*args, **kwargs)
        return wrapper

    def emit(self, record):
        # swallow every record while active
        pass
class lower_logging(logging.Handler):
    """Temporary lower the max logging level.

    While used as a context manager, installs itself as the only root-logger
    handler. Records above ``max_level`` are demoted to ``to_level``, get an
    underscore-prefixed levelname and a munged traceback header, then are
    forwarded to the original root handlers (respecting their levels).
    """
    def __init__(self, max_level, to_level=None):
        super().__init__()
        self.old_handlers = None    # root handlers saved by __enter__
        self.old_propagate = None   # root propagate flag saved by __enter__
        self.had_error_log = False  # becomes True once a record was demoted
        self.max_level = max_level
        self.to_level = to_level or max_level

    def __enter__(self):
        logger = logging.getLogger()
        self.old_handlers = logger.handlers[:]
        self.old_propagate = logger.propagate
        logger.propagate = False
        logger.handlers = [self]
        self.had_error_log = False
        return self

    def __exit__(self, exc_type=None, exc_val=None, exc_tb=None):
        logger = logging.getLogger()
        logger.handlers = self.old_handlers
        logger.propagate = self.old_propagate

    def emit(self, record):
        if record.levelno > self.max_level:
            # demote the record and mark its levelname as munged
            record.levelname = f'_{record.levelname}'
            record.levelno = self.to_level
            self.had_error_log = True
            # re-parent MungedTracebackLogRecord onto the record's actual
            # class (only while it still has the default base), then swap the
            # record's class so getMessage() disguises the traceback header
            if MungedTracebackLogRecord.__base__ is logging.LogRecord:
                MungedTracebackLogRecord.__bases__ = (record.__class__,)
            record.__class__ = MungedTracebackLogRecord

        # forward to the saved handlers, honouring logger and handler levels
        if logging.getLogger(record.name).isEnabledFor(record.levelno):
            for handler in self.old_handlers:
                if handler.level <= record.levelno:
                    handler.emit(record)
class MungedTracebackLogRecord(logging.LogRecord):
    """Log record whose rendered message disguises the standard traceback
    header, so an expected/demoted traceback is not mistaken for a real one.
    """
    def getMessage(self):
        message = super().getMessage()
        return message.replace(
            'Traceback (most recent call last):',
            '_Traceback_ (most recent call last):',
        )
def stripped_sys_argv(*strip_args):
    """Return sys.argv with some arguments stripped, suitable for reexecution or subprocesses"""
    # always strip the install/update/save options on top of the requested ones
    strip_args = sorted(set(strip_args) | set(['-s', '--save', '-u', '--update', '-i', '--init', '--i18n-overwrite']))
    assert all(config.parser.has_option(s) for s in strip_args)
    takes_value = dict((s, config.parser.get_option(s).takes_value()) for s in strip_args)

    # sorted() places '--long' options before '-short' ones ('--' < '-x'),
    # so groupby yields exactly two runs: longs first, then shorts
    longs, shorts = list(tuple(y) for _, y in itergroupby(strip_args, lambda x: x.startswith('--')))
    longs_eq = tuple(l + '=' for l in longs if takes_value[l])

    args = sys.argv[:]

    def strip(args, i):
        # strip an argument when it is a stripped option itself
        # ('-u', '--update', '--update=foo') or the value of the previous one
        return args[i].startswith(shorts) \
            or args[i].startswith(longs_eq) or (args[i] in longs) \
            or (i >= 1 and (args[i - 1] in strip_args) and takes_value[args[i - 1]])

    return [x for i, x in enumerate(args) if not strip(args, i)]
class ConstantMapping(Mapping[typing.Any, T], typing.Generic[T]):
    """
    An immutable mapping returning the provided value for every single key.

    Useful for default value to methods
    """
    __slots__ = ['_value']

    def __init__(self, val: T):
        self._value = val

    def __len__(self):
        """Always 0: no key is ever recorded (unlike defaultdict, which
        grows with each requested key)."""
        return 0

    def __iter__(self):
        """Always empty, consistent with __len__."""
        return iter(())

    def __getitem__(self, item) -> T:
        # every key maps to the single stored value
        return self._value
def dumpstacks(sig=None, frame=None, thread_idents=None, log_level=logging.INFO):
    """ Signal handler: dump a stack trace for each existing thread or given
    thread(s) specified through the ``thread_idents`` sequence.

    :param sig: signal number (unused; signature imposed by signal handlers)
    :param frame: current stack frame (unused; idem)
    :param thread_idents: optional collection of thread idents to restrict the dump to
    :param log_level: level of the single log record emitted at the end
    """
    code = []

    def extract_stack(stack):
        # format one "File ..."/source-line pair per frame
        for filename, lineno, name, line in traceback.extract_stack(stack):
            yield 'File: "%s", line %d, in %s' % (filename, lineno, name)
            if line:
                yield " %s" % (line.strip(),)

    # code from http://stackoverflow.com/questions/132058/getting-stack-trace-from-a-running-python-application#answer-2569696
    # modified for python 2.5 compatibility
    # Odoo worker threads carry extra attributes (uid, dbname, url, ...);
    # getattr with 'n/a' keeps the dump working for plain threads too.
    threads_info = {th.ident: {'repr': repr(th),
                               'uid': getattr(th, 'uid', 'n/a'),
                               'dbname': getattr(th, 'dbname', 'n/a'),
                               'url': getattr(th, 'url', 'n/a'),
                               'query_count': getattr(th, 'query_count', 'n/a'),
                               'query_time': getattr(th, 'query_time', None),
                               'perf_t0': getattr(th, 'perf_t0', None)}
                    for th in threading.enumerate()}
    for threadId, stack in sys._current_frames().items():
        if not thread_idents or threadId in thread_idents:
            thread_info = threads_info.get(threadId, {})
            query_time = thread_info.get('query_time')
            perf_t0 = thread_info.get('perf_t0')
            remaining_time = None
            if query_time is not None and perf_t0:
                # python time = total elapsed since perf_t0 minus SQL time
                remaining_time = '%.3f' % (real_time() - perf_t0 - query_time)
                query_time = '%.3f' % query_time
            # qc:query_count qt:query_time pt:python_time (aka remaining time)
            code.append("\n# Thread: %s (db:%s) (uid:%s) (url:%s) (qc:%s qt:%s pt:%s)" %
                        (thread_info.get('repr', threadId),
                         thread_info.get('dbname', 'n/a'),
                         thread_info.get('uid', 'n/a'),
                         thread_info.get('url', 'n/a'),
                         thread_info.get('query_count', 'n/a'),
                         query_time or 'n/a',
                         remaining_time or 'n/a'))
            for line in extract_stack(stack):
                code.append(line)

    import odoo  # to check whether the server runs in evented (gevent) mode
    if odoo.evented:
        # code from http://stackoverflow.com/questions/12510648/in-gevent-how-can-i-dump-stack-traces-of-all-running-greenlets
        import gc
        from greenlet import greenlet
        for ob in gc.get_objects():
            if not isinstance(ob, greenlet) or not ob:
                continue
            code.append("\n# Greenlet: %r" % (ob,))
            for line in extract_stack(ob.gr_frame):
                code.append(line)

    _logger.log(log_level, "\n".join(code))
def freehash(arg: typing.Any) -> int:
    """Return a hash for *arg*, even when it is unhashable: mappings hash as
    frozendicts, other iterables as frozensets of recursive hashes, and
    anything else falls back to ``id()``."""
    try:
        return hash(arg)
    except Exception:
        pass
    if isinstance(arg, Mapping):
        return hash(frozendict(arg))
    if isinstance(arg, Iterable):
        return hash(frozenset(freehash(item) for item in arg))
    return id(arg)
def clean_context(context: dict[str, typing.Any]) -> dict[str, typing.Any]:
    """Return a copy of *context* without any ``default_``-prefixed keys."""
    return {
        key: value
        for key, value in context.items()
        if not key.startswith('default_')
    }
class frozendict(dict[K, T], typing.Generic[K, T]):
    """ An implementation of an immutable dictionary: every mutating method of
    ``dict`` is overridden to raise ``NotImplementedError``. It is hashable
    (values are hashed through :func:`freehash`, which never fails), so it can
    be used as a dict key or set element.
    """
    __slots__ = ()

    def __delitem__(self, key):
        raise NotImplementedError("'__delitem__' not supported on frozendict")

    def __setitem__(self, key, val):
        raise NotImplementedError("'__setitem__' not supported on frozendict")

    def clear(self):
        raise NotImplementedError("'clear' not supported on frozendict")

    def pop(self, key, default=None):
        raise NotImplementedError("'pop' not supported on frozendict")

    def popitem(self):
        raise NotImplementedError("'popitem' not supported on frozendict")

    def setdefault(self, key, default=None):
        raise NotImplementedError("'setdefault' not supported on frozendict")

    def update(self, *args, **kwargs):
        raise NotImplementedError("'update' not supported on frozendict")

    def __hash__(self) -> int:  # type: ignore
        # hash values through freehash() so unhashable values don't break hashing;
        # frozenset makes the hash independent of insertion order
        return hash(frozenset((key, freehash(val)) for key, val in self.items()))
class Collector(dict[K, tuple[T, ...]], typing.Generic[K, T]):
    """ A mapping from keys to tuples. This implements a relation, and can be
        seen as a space optimization for ``defaultdict(tuple)``.
    """
    __slots__ = ()

    def __getitem__(self, key: K) -> tuple[T, ...]:
        # a missing key behaves as an empty tuple
        return self.get(key, ())

    def __setitem__(self, key: K, val: Iterable[T]):
        # never store empty tuples: drop the key instead
        values = tuple(val)
        if values:
            super().__setitem__(key, values)
        else:
            super().pop(key, None)

    def add(self, key: K, val: T):
        """ Append ``val`` to the tuple of ``key`` if not already present. """
        current = self[key]
        if val not in current:
            self[key] = current + (val,)

    def discard_keys_and_values(self, excludes: Collection[K | T]) -> None:
        """ Remove the excluded items both as keys and as values. """
        for excluded in excludes:
            self.pop(excluded, None)  # type: ignore
        for key, values in list(self.items()):
            self[key] = tuple(v for v in values if v not in excludes)  # type: ignore
class StackMap(MutableMapping[K, T], typing.Generic[K, T]):
    """ A stack of mappings behaving as a single mapping, and used to implement
        nested scopes. The lookups search the stack from top to bottom, and
        returns the first value found. Mutable operations modify the topmost
        mapping only.
    """
    __slots__ = ['_maps']

    def __init__(self, m: MutableMapping[K, T] | None = None):
        self._maps = [m] if m is not None else []

    def __getitem__(self, key: K) -> T:
        # walk the stack from top to bottom; the subscript (not a containment
        # check) is deliberate so defaultdict-like maps keep their semantics
        for mapping in reversed(self._maps):
            try:
                return mapping[key]
            except KeyError:
                continue
        raise KeyError(key)

    def __setitem__(self, key: K, val: T):
        # writes only affect the topmost mapping
        self._maps[-1][key] = val

    def __delitem__(self, key: K):
        # deletions only affect the topmost mapping
        del self._maps[-1][key]

    def __iter__(self) -> Iterator[K]:
        unique_keys = {key for mapping in self._maps for key in mapping}
        return iter(unique_keys)

    def __len__(self) -> int:
        return sum(1 for _ in self)

    def __str__(self) -> str:
        return f"<StackMap {self._maps}>"

    def pushmap(self, m: MutableMapping[K, T] | None = None):
        """ Push ``m`` (or a fresh dict) as the new topmost scope. """
        self._maps.append(m if m is not None else {})

    def popmap(self) -> MutableMapping[K, T]:
        """ Pop and return the topmost scope. """
        return self._maps.pop()
class OrderedSet(MutableSet[T], typing.Generic[T]):
    """ A set collection that remembers the elements first insertion order. """
    __slots__ = ['_map']

    def __init__(self, elems: Iterable[T] = ()):
        # a dict preserves insertion order; the values are irrelevant placeholders
        self._map: dict[T, None] = dict.fromkeys(elems)

    def __contains__(self, elem):
        return elem in self._map

    def __iter__(self):
        return iter(self._map)

    def __len__(self):
        return len(self._map)

    def add(self, elem):
        # re-adding an existing element keeps its original position
        self._map[elem] = None

    def discard(self, elem):
        self._map.pop(elem, None)

    def update(self, elems):
        self._map.update(dict.fromkeys(elems))

    def difference_update(self, elems):
        for elem in elems:
            self.discard(elem)

    def __repr__(self):
        return f'{type(self).__name__}({list(self)!r})'

    def intersection(self, *others):
        """ Return the intersection of self with all ``others``, keeping order. """
        return reduce(OrderedSet.__and__, others, self)
class LastOrderedSet(OrderedSet[T], typing.Generic[T]):
    """ A set collection that remembers the elements last insertion order. """

    def add(self, elem):
        # discard first so that re-adding an existing element moves it to the
        # end of the iteration order (unlike OrderedSet, which keeps it in place)
        self.discard(elem)
        super().add(elem)
class Callbacks:
    """ A simple queue of callback functions. Upon run, every function is
    called (in addition order), and the queue is emptied.

    ::

        callbacks = Callbacks()

        # add foo
        def foo():
            print("foo")
        callbacks.add(foo)

        # add bar
        @callbacks.add
        def bar():
            print("bar")

        # add foo again
        callbacks.add(foo)

        # call foo(), bar(), foo(), then clear the callback queue
        callbacks.run()

    The queue also provides a ``data`` dictionary, that may be freely used to
    store anything, but is mostly aimed at aggregating data for callbacks. The
    dictionary is automatically cleared by ``run()`` once all callback functions
    have been called.

    ::

        # register foo to process aggregated data
        @callbacks.add
        def foo():
            print(sum(callbacks.data['foo']))

        callbacks.data.setdefault('foo', []).append(1)
        ...
        callbacks.data.setdefault('foo', []).append(2)
        ...
        callbacks.data.setdefault('foo', []).append(3)

        # call foo(), which prints 6
        callbacks.run()

    Given the global nature of ``data``, the keys should identify in a unique
    way the data being stored. It is recommended to use strings with a
    structure like ``"{module}.{feature}"``.
    """
    __slots__ = ['_funcs', 'data']

    def __init__(self):
        self._funcs: collections.deque[Callable] = collections.deque()
        self.data = {}

    def add(self, func: Callable) -> None:
        """ Enqueue ``func`` to be invoked by :meth:`run`. """
        self._funcs.append(func)

    def run(self) -> None:
        """ Invoke all queued functions in addition order, then clear both the
        queue and the ``data`` dictionary. Callbacks added while running are
        invoked as well.
        """
        funcs = self._funcs
        while funcs:
            funcs.popleft()()
        self.clear()

    def clear(self) -> None:
        """ Drop all pending callbacks and associated data. """
        self._funcs.clear()
        self.data.clear()

    def __len__(self) -> int:
        return len(self._funcs)
class ReversedIterable(Reversible[T], typing.Generic[T]):
    """ An iterable implementing the reversal of another iterable. """
    __slots__ = ['iterable']

    def __init__(self, iterable: Reversible[T]):
        self.iterable = iterable

    def __iter__(self):
        # forward iteration on self walks the wrapped iterable backwards
        return reversed(self.iterable)

    def __reversed__(self):
        # reversing the reversal yields the wrapped iterable's own order
        return iter(self.iterable)
def groupby(iterable: Iterable[T], key: Callable[[T], K] = lambda arg: arg) -> Iterable[tuple[K, list[T]]]:
    """ Return a collection of pairs ``(key, elements)`` from ``iterable``. The
        ``key`` is a function computing a key value for each element. Unlike
        ``itertools.groupby``, all elements sharing a key end up in the same
        group, regardless of their position in the input.
    """
    grouped: dict[K, list[T]] = {}
    for element in iterable:
        grouped.setdefault(key(element), []).append(element)
    return grouped.items()
def unique(it: Iterable[T]) -> Iterator[T]:
    """ "Uniquifier" for the provided iterable: yield each element once, in
    first-occurrence order. The iterable's elements must be hashable.

    :param Iterable it:
    :rtype: Iterator
    """
    seen = set()
    mark_seen = seen.add
    for elem in it:
        if elem not in seen:
            mark_seen(elem)
            yield elem
def submap(mapping: Mapping[K, T], keys: Iterable[K]) -> Mapping[K, T]:
    """
    Get a filtered copy of the mapping where only some keys are present.

    :param Mapping mapping: the original dict-like structure to filter
    :param Iterable keys: the list of keys to keep
    :return dict: a filtered dict copy of the original mapping
    """
    # frozenset gives O(1) membership tests even when `keys` is a long list
    keep = frozenset(keys)
    return {key: value for key, value in mapping.items() if key in keep}
class Reverse(object):
    """ Wraps a value and reverses its ordering, useful in key functions when
        mixing ascending and descending sort on non-numeric data as the
        ``reverse`` parameter can not do piecemeal reordering.
    """
    __slots__ = ['val']

    def __init__(self, val):
        self.val = val

    # equality delegates to the wrapped values; each ordering operator is
    # mapped to its mirror image so that sorting order is inverted
    def __eq__(self, other):
        return self.val == other.val

    def __ne__(self, other):
        return self.val != other.val

    def __ge__(self, other):
        return self.val <= other.val

    def __gt__(self, other):
        return self.val < other.val

    def __le__(self, other):
        return self.val >= other.val

    def __lt__(self, other):
        return self.val > other.val
class replace_exceptions(ContextDecorator):
    """
    Hide some exceptions behind another error. Can be used as a function
    decorator or as a context manager.

    .. code-block:

        @route('/super/secret/route', auth='public')
        @replace_exceptions(AccessError, by=NotFound())
        def super_secret_route(self):
            if not request.session.uid:
                raise AccessError("Route hidden to non logged-in users")
            ...

        def some_util():
            ...
            with replace_exceptions(ValueError, by=UserError("Invalid argument")):
                ...
            ...

    :param exceptions: the exception classes to catch and replace.
    :param by: the exception to raise instead.
    """
    def __init__(self, *exceptions, by):
        if not exceptions:
            raise ValueError("Missing exceptions")
        for exc in exceptions:
            if not issubclass(exc, Exception):
                raise TypeError(f"{exc} is not an exception class.")
        self.exceptions = exceptions
        self.by = by

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # let unrelated exceptions (and clean exits) propagate untouched
        if exc_type is None or not issubclass(exc_type, self.exceptions):
            return
        if isinstance(self.by, type) and exc_value.args:
            # the replacement is an exception class: carry the original message over
            raise self.by(exc_value.args[0]) from exc_value
        raise self.by from exc_value
# HTML-escape a string, returning a markupsafe.Markup; alias of markupsafe.escape
html_escape = markupsafe.escape
def get_lang(env: Environment, lang_code: str | None = None) -> LangData:
    """
    Retrieve the first lang object installed, by checking the parameter lang_code,
    the context and then the company. If no lang is installed from those variables,
    fallback on english or on the first lang installed in the system.

    :param env: environment used to look up installed languages and the company
    :param str lang_code: the locale (i.e. en_US)
    :return LangData: the first lang found that is installed on the system.
    """
    langs = [code for code, _ in env['res.lang'].get_installed()]
    # default fallback: English if installed, otherwise the first installed lang
    lang = 'en_US' if 'en_US' in langs else langs[0]
    # precedence: explicit lang_code > context lang > company partner lang
    if lang_code and lang_code in langs:
        lang = lang_code
    elif (context_lang := env.context.get('lang')) in langs:
        lang = context_lang
    elif (company_lang := env.user.with_context(lang='en_US').company_id.partner_id.lang) in langs:
        lang = company_lang
    return env['res.lang']._get_data(code=lang)
@lru_cache
def babel_locale_parse(lang_code: str | None) -> babel.Locale:
    """ Return the babel locale for ``lang_code``, falling back on the system
    default locale, then on ``en_US``. Results are memoized (``lru_cache``).
    """
    if lang_code:
        try:
            return babel.Locale.parse(lang_code)
        except Exception:  # noqa: BLE001
            # unknown or malformed code: fall through to the defaults below
            pass
    try:
        return babel.Locale.default()
    except Exception:  # noqa: BLE001
        # no usable system default locale: hard-code en_US as last resort
        return babel.Locale.parse("en_US")
def formatLang(
    env: Environment,
    value: float | typing.Literal[''],
    digits: int = 2,
    grouping: bool = True,
    dp: str | None = None,
    currency_obj: typing.Any | None = None,
    rounding_method: typing.Literal['HALF-UP', 'HALF-DOWN', 'HALF-EVEN', "UP", "DOWN"] = 'HALF-EVEN',
    rounding_unit: typing.Literal['decimals', 'units', 'thousands', 'lakhs', 'millions'] = 'decimals',
) -> str:
    """
    This function will format a number `value` to the appropriate format of the language used.

    :param env: The environment.
    :param value: The value to be formatted.
    :param digits: The number of decimals digits.
    :param grouping: Usage of language grouping or not.
    :param dp: Name of the decimals precision to be used. This will override ``digits``
               and ``currency_obj`` precision.
    :param currency_obj: Currency to be used. This will override ``digits`` precision.
    :param rounding_method: The rounding method to be used:
        **'HALF-UP'** will round to the closest number with ties going away from zero,
        **'HALF-DOWN'** will round to the closest number with ties going towards zero,
        **'HALF-EVEN'** will round to the closest number with ties going to the closest
        even number,
        **'UP'** will always round away from 0,
        **'DOWN'** will always round towards 0.
    :param rounding_unit: The rounding unit to be used:
        **decimals** will round to decimals with ``digits`` or ``dp`` precision,
        **units** will round to units without any decimals,
        **thousands** will round to thousands without any decimals,
        **lakhs** will round to lakhs without any decimals,
        **millions** will round to millions without any decimals.

    :returns: The value formatted.
    """
    # We don't want to return 0
    if value == '':
        return ''

    if rounding_unit == 'decimals':
        # decimal precision: dp overrides currency, which overrides `digits`
        if dp:
            digits = env['decimal.precision'].precision_get(dp)
        elif currency_obj:
            digits = currency_obj.decimal_places
    else:
        # rounding to whole units/thousands/...: no decimals displayed
        digits = 0

    rounding_unit_mapping = {
        'decimals': 1,
        'thousands': 10**3,
        'lakhs': 10**5,
        'millions': 10**6,
        'units': 1,
    }

    # scale the value down to the requested unit before rounding
    value /= rounding_unit_mapping[rounding_unit]

    rounded_value = float_round(value, precision_digits=digits, rounding_method=rounding_method)
    lang = env['res.lang'].browse(get_lang(env).id)
    formatted_value = lang.format(f'%.{digits}f', rounded_value, grouping=grouping)

    if currency_obj and currency_obj.symbol:
        # attach the currency symbol before/after, separated by a non-breaking space
        arguments = (formatted_value, NON_BREAKING_SPACE, currency_obj.symbol)

        return '%s%s%s' % (arguments if currency_obj.position == 'after' else arguments[::-1])

    return formatted_value
def format_date(
    env: Environment,
    value: datetime.datetime | datetime.date | str,
    lang_code: str | None = None,
    date_format: str | typing.Literal[False] = False,
) -> str:
    """
    Formats the date in a given format.

    :param env: an environment.
    :param date, datetime or string value: the date to format.
    :param string lang_code: the lang code, if not specified it is extracted from the
        environment context.
    :param string date_format: the format or the date (LDML format), if not specified the
        default format of the lang.
    :return: date formatted in the specified format.
    :rtype: string
    """
    if not value:
        return ''
    from odoo.fields import Datetime  # noqa: PLC0415
    if isinstance(value, str):
        if len(value) < DATE_LENGTH:
            # too short to be a server-formatted date: nothing to format
            return ''
        if len(value) > DATE_LENGTH:
            # a datetime, convert to correct timezone
            value = Datetime.from_string(value)
            value = Datetime.context_timestamp(env['res.lang'], value)
        else:
            value = Datetime.from_string(value)
    elif isinstance(value, datetime.datetime) and not value.tzinfo:
        # a naive datetime: convert to the context timezone
        value = Datetime.context_timestamp(env['res.lang'], value)

    lang = get_lang(env, lang_code)
    locale = babel_locale_parse(lang.code)
    if not date_format:
        # fall back on the language's own date format, converted to LDML
        date_format = posix_to_ldml(lang.date_format, locale=locale)

    assert isinstance(value, datetime.date)  # datetime is a subclass of date
    return babel.dates.format_date(value, format=date_format, locale=locale)
def parse_date(env: Environment, value: str, lang_code: str | None = None) -> datetime.date | str:
    """
    Parse the date from a given format. If it is not a valid format for the
    localization, return the original string.

    :param env: an environment.
    :param string value: the date to parse.
    :param string lang_code: the lang code, if not specified it is extracted from the
        environment context.
    :return: date object from the localized string, or the original string when
        it cannot be parsed
    :rtype: datetime.date | str
    """
    lang = get_lang(env, lang_code)
    locale = babel_locale_parse(lang.code)
    try:
        return babel.dates.parse_date(value, locale=locale)
    except Exception:
        # babel raises various exceptions (ValueError, IndexError, ...) on bad
        # input; the previous bare `except:` also swallowed KeyboardInterrupt
        # and SystemExit, so only catch proper Exceptions and fall back to the
        # raw string.
        return value
def format_datetime(
    env: Environment,
    value: datetime.datetime | str,
    tz: str | typing.Literal[False] = False,
    dt_format: str = 'medium',
    lang_code: str | None = None,
) -> str:
    """ Formats the datetime in a given format.

    :param env:
    :param str|datetime value: naive datetime to format either in string or in datetime
    :param str tz: name of the timezone in which the given datetime should be localized
    :param str dt_format: one of "full", "long", "medium", or "short", or a custom date/time pattern compatible with `babel` lib
    :param str lang_code: ISO code of the language to use to render the given datetime
    :rtype: str
    """
    if not value:
        return ''
    if isinstance(value, str):
        from odoo.fields import Datetime  # noqa: PLC0415
        timestamp = Datetime.from_string(value)
    else:
        timestamp = value

    # the (naive) value is treated as UTC, then shifted to the requested timezone
    tz_name = tz or env.user.tz or 'UTC'
    utc_datetime = pytz.utc.localize(timestamp, is_dst=False)
    try:
        context_tz = pytz.timezone(tz_name)
        localized_datetime = utc_datetime.astimezone(context_tz)
    except Exception:
        # unknown timezone name: keep the UTC value rather than failing
        localized_datetime = utc_datetime

    lang = get_lang(env, lang_code)

    locale = babel_locale_parse(lang.code or lang_code)  # lang can be inactive, so `lang` is empty
    if not dt_format or dt_format == 'medium':
        # build a "medium" pattern from the language's own date and time formats
        date_format = posix_to_ldml(lang.date_format, locale=locale)
        time_format = posix_to_ldml(lang.time_format, locale=locale)
        dt_format = '%s %s' % (date_format, time_format)

    # Babel allows to format datetime in a specific language without change locale
    # So month 1 = January in English, and janvier in French
    # Be aware that the default value for format is 'medium', instead of 'short'
    #     medium:  Jan 5, 2016, 10:20:31 PM |   5 janv. 2016 22:20:31
    #     short:   1/5/16, 10:20 PM         |   5/01/16 22:20
    # Formatting available here : http://babel.pocoo.org/en/latest/dates.html#date-fields
    return babel.dates.format_datetime(localized_datetime, dt_format, locale=locale)
def format_time(
    env: Environment,
    value: datetime.time | datetime.datetime | str,
    tz: str | typing.Literal[False] = False,
    time_format: str = 'medium',
    lang_code: str | None = None,
) -> str:
    """ Format the given time (hour, minute and second) with the current user preference (language, format, ...)

    :param env:
    :param value: the time to format
    :type value: `datetime.time` instance. Could be timezoned to display tzinfo according to format (e.i.: 'full' format)
    :param tz: name of the timezone in which the given datetime should be localized
    :param time_format: one of "full", "long", "medium", or "short", or a custom time pattern
    :param lang_code: ISO code of the language to use

    :rtype str
    """
    if not value:
        return ''

    if isinstance(value, datetime.time):
        # already a bare time: used as-is, no timezone conversion applied
        localized_time = value
    else:
        if isinstance(value, str):
            from odoo.fields import Datetime  # noqa: PLC0415
            value = Datetime.from_string(value)
        assert isinstance(value, datetime.datetime)
        # the (naive) datetime is treated as UTC, then shifted to the requested timezone
        tz_name = tz or env.user.tz or 'UTC'
        utc_datetime = pytz.utc.localize(value, is_dst=False)
        try:
            context_tz = pytz.timezone(tz_name)
            localized_time = utc_datetime.astimezone(context_tz).timetz()
        except Exception:
            # unknown timezone name: keep the UTC time rather than failing
            localized_time = utc_datetime.timetz()

    lang = get_lang(env, lang_code)
    locale = babel_locale_parse(lang.code)
    if not time_format or time_format == 'medium':
        # fall back on the language's own time format, converted to LDML
        time_format = posix_to_ldml(lang.time_format, locale=locale)

    return babel.dates.format_time(localized_time, format=time_format, locale=locale)
def _format_time_ago(
    env: Environment,
    time_delta: datetime.timedelta,
    lang_code: str | None = None,
    add_direction: bool = True,
) -> str:
    """ Render ``time_delta`` as a localized relative time (e.g. "3 hours ago").

    :param env: environment used to resolve the language when ``lang_code`` is not given
    :param time_delta: the elapsed time span to render
    :param lang_code: optional ISO lang code; defaults to the context lang, then
        the company partner's lang, then the first installed lang
    :param add_direction: whether babel adds the direction wording (past/future)
    :rtype: str
    """
    if not lang_code:
        langs: list[str] = [code for code, _ in env['res.lang'].get_installed()]
        if (ctx_lang := env.context.get('lang')) in langs:
            lang_code = ctx_lang
        else:
            lang_code = env.user.company_id.partner_id.lang or langs[0]
    assert isinstance(lang_code, str)
    locale = babel_locale_parse(lang_code)
    # the delta is negated: babel renders negative timedeltas as past times
    return babel.dates.format_timedelta(-time_delta, add_direction=add_direction, locale=locale)
def format_decimalized_number(number: float, decimal: int = 1) -> str:
    """Format a number to display to nearest metrics unit next to it.

    Do not display digits if all visible digits are null.
    Do not display units higher then "Tera" because most people don't know what
    a "Yotta" is.

    ::

        >>> format_decimalized_number(123_456.789)
        123.5k
        >>> format_decimalized_number(123_000.789)
        123k
        >>> format_decimalized_number(-123_456.789)
        -123.5k
        >>> format_decimalized_number(0.789)
        0.8
    """
    for suffix in ('', 'k', 'M', 'G'):
        if abs(number) < 1000.0:
            # %g drops trailing zeroes (123.0 -> "123")
            return "%g%s" % (round(number, decimal), suffix)
        number /= 1000.0
    # anything that survives all divisions is expressed in Tera
    return "%g%s" % (round(number, decimal), 'T')
def format_decimalized_amount(amount: float, currency=None) -> str:
    """Format an amount to display the currency and also display the metric unit
    of the amount.

    ::

        >>> format_decimalized_amount(123_456.789, env.ref("base.USD"))
        $123.5k
    """
    formated_amount = format_decimalized_number(amount)

    if not currency:
        return formated_amount

    # position the symbol according to the currency configuration;
    # note: no space before, one space after
    symbol = currency.symbol or ''
    if currency.position == 'before':
        return f"{symbol}{formated_amount}"
    return f"{formated_amount} {symbol}"
def format_amount(env: Environment, amount: float, currency, lang_code: str | None = None, trailing_zeroes: bool = True) -> str:
    """ Format a monetary ``amount`` with the rounding, grouping, symbol and
    symbol position of ``currency``.

    :param env: an environment
    :param amount: the monetary value to format
    :param currency: a currency record providing ``decimal_places``, ``round()``,
        ``symbol`` and ``position``
    :param lang_code: optional lang code overriding the environment language
    :param trailing_zeroes: when False, trailing decimal zeroes are stripped
        (e.g. "1.00" becomes "1")
    :rtype: str
    """
    fmt = "%.{0}f".format(currency.decimal_places)
    lang = env['res.lang'].browse(get_lang(env, lang_code).id)

    # use non-breaking spaces, and protect the minus sign from line-wrapping
    # with a zero-width no-break space
    formatted_amount = lang.format(fmt, currency.round(amount), grouping=True)\
        .replace(r' ', u'\N{NO-BREAK SPACE}').replace(r'-', u'-\N{ZERO WIDTH NO-BREAK SPACE}')

    if not trailing_zeroes:
        formatted_amount = re.sub(fr'{re.escape(lang.decimal_point)}?0+$', '', formatted_amount)

    pre = post = u''
    if currency.position == 'before':
        pre = u'{symbol}\N{NO-BREAK SPACE}'.format(symbol=currency.symbol or '')
    else:
        post = u'\N{NO-BREAK SPACE}{symbol}'.format(symbol=currency.symbol or '')

    return u'{pre}{0}{post}'.format(formatted_amount, pre=pre, post=post)
def format_duration(value: float) -> str:
    """ Format a float: used to display integral or fractional values as
        human-readable time spans (e.g. 1.5 as "01:30").
    """
    sign = '-' if value < 0 else ''
    # round the total minutes first, then split; since a whole hour is always
    # an even number of minutes, this matches rounding the remainder alone
    total_minutes = round(abs(value) * 60)
    hours, minutes = divmod(total_minutes, 60)
    return '%s%02d:%02d' % (sign, hours, minutes)
# Constant-time comparison (alias of hmac.compare_digest); use instead of `==`
# when comparing secrets/digests to avoid timing attacks.
consteq = hmac_lib.compare_digest
class ReadonlyDict(Mapping[K, T], typing.Generic[K, T]):
    """Helper for an unmodifiable dictionary, not even updatable using `dict.update`.

    This is similar to a `frozendict`, with one drawback and one advantage:

    - `dict.update` works for a `frozendict` but not for a `ReadonlyDict`.
    - `json.dumps` works for a `frozendict` by default but not for a `ReadonlyDict`.

    This comes from the fact `frozendict` inherits from `dict`
    while `ReadonlyDict` inherits from `collections.abc.Mapping`.

    So, depending on your needs,
    whether you absolutely must prevent the dictionary from being updated (e.g., for security reasons)
    or you require it to be supported by `json.dumps`, you can choose either option.

    E.g.
      data = ReadonlyDict({'foo': 'bar'})
      data['baz'] = 'xyz' # raises exception
      data.update({'baz', 'xyz'}) # raises exception
      dict.update(data, {'baz': 'xyz'}) # raises exception
    """
    __slots__ = ('_data__',)

    def __init__(self, data):
        # copy into a private dict so later mutations of `data` don't leak in
        self._data__ = dict(data)

    def __contains__(self, key: K):
        return key in self._data__

    def __getitem__(self, key: K) -> T:
        return self._data__[key]

    def __len__(self):
        return len(self._data__)

    def __iter__(self):
        return iter(self._data__)
class DotDict(dict):
    """Helper for dot.notation access to dictionary attributes

    E.g.
      foo = DotDict({'bar': False})
      return foo.bar
    """
    def __getattr__(self, attrib):
        # missing keys resolve to None; nested dicts are wrapped on the fly
        # so chained access (foo.a.b) keeps working
        value = self.get(attrib)
        if isinstance(value, dict):
            return DotDict(value)
        return value
def get_diff(data_from, data_to, custom_style=False, dark_color_scheme=False):
    """
    Return, in an HTML table, the diff between two texts.

    :param tuple data_from: tuple(text, name), name will be used as table header
    :param tuple data_to: tuple(text, name), name will be used as table header
    :param tuple custom_style: string, style css including <style> tag.
    :param bool dark_color_scheme: true if dark color scheme is used
    :return: a string containing the diff in an HTML table format.
    """
    def handle_style(html_diff, custom_style, dark_color_scheme):
        """ The HtmlDiff lib will add some useful classes on the DOM to
        identify elements. Simply append to those classes some BS4 ones.
        For the table to fit the modal width, some custom style is needed.
        """
        to_append = {
            'diff_header': 'bg-600 text-light text-center align-top px-2',
            'diff_next': 'd-none',
        }
        for old, new in to_append.items():
            html_diff = html_diff.replace(old, "%s %s" % (old, new))
        html_diff = html_diff.replace('nowrap', '')
        # 4-tuple filling the %s placeholders below: removed-span, added-span,
        # removed-cell and added-cell backgrounds, for dark or light scheme
        colors = ('#7f2d2f', '#406a2d', '#51232f', '#3f483b') if dark_color_scheme else (
            '#ffc1c0', '#abf2bc', '#ffebe9', '#e6ffec')
        # %% escapes are needed because this CSS goes through %-formatting
        html_diff += custom_style or '''
            <style>
                .modal-dialog.modal-lg:has(table.diff) {
                    max-width: 1600px;
                    padding-left: 1.75rem;
                    padding-right: 1.75rem;
                }
                table.diff { width: 100%%; }
                table.diff th.diff_header { width: 50%%; }
                table.diff td.diff_header { white-space: nowrap; }
                table.diff td.diff_header + td { width: 50%%; }
                table.diff td { word-break: break-all; vertical-align: top; }
                table.diff .diff_chg, table.diff .diff_sub, table.diff .diff_add {
                    display: inline-block;
                    color: inherit;
                }
                table.diff .diff_sub, table.diff td:nth-child(3) > .diff_chg { background-color: %s }
                table.diff .diff_add, table.diff td:nth-child(6) > .diff_chg { background-color: %s }
                table.diff td:nth-child(3):has(>.diff_chg, .diff_sub) { background-color: %s }
                table.diff td:nth-child(6):has(>.diff_chg, .diff_add) { background-color: %s }
            </style>
        ''' % colors
        return html_diff

    diff = HtmlDiff(tabsize=2).make_table(
        data_from[0].splitlines(),
        data_to[0].splitlines(),
        data_from[1],
        data_to[1],
        context=True,  # Show only diff lines, not all the code
        numlines=3,
    )
    return handle_style(diff, custom_style, dark_color_scheme)
def hmac(env, scope, message, hash_function=hashlib.sha256):
    """Compute HMAC with `database.secret` config parameter as key.

    :param env: sudo environment to use for retrieving config parameter
    :param message: message to authenticate
    :param scope: scope of the authentication, to have different signature for the same
        message in different usage
    :param hash_function: hash function to use for HMAC (default: SHA-256)
    :return: hex-encoded digest
    :rtype: str
    :raises ValueError: if ``scope`` is empty
    """
    if not scope:
        raise ValueError('Non-empty scope required')

    secret = env['ir.config_parameter'].get_param('database.secret')
    # bind the scope into the signed message so the same message signed for
    # different purposes yields different digests
    message = repr((scope, message))
    return hmac_lib.new(
        secret.encode(),
        message.encode(),
        hash_function,
    ).hexdigest()
def hash_sign(env, scope, message_values, expiration=None, expiration_hours=None):
    """ Generate an urlsafe payload signed with the HMAC signature for an iterable set of data.
    This feature is very similar to JWT, but in a more generic implementation that is in line
    with our previous hmac implementation.

    :param env: sudo environment to use for retrieving config parameter
    :param scope: scope of the authentication, to have different signature for the same
        message in different usage
    :param message_values: values to be encoded inside the payload
    :param expiration: optional, a datetime or timedelta
    :param expiration_hours: optional, a int representing a number of hours before expiration. Cannot be set at the same time as expiration
    :return: the payload that can be used as a token
    """
    assert not (expiration and expiration_hours)
    assert message_values is not None

    if expiration_hours:
        expiration = datetime.datetime.now() + datetime.timedelta(hours=expiration_hours)
    else:
        if isinstance(expiration, datetime.timedelta):
            expiration = datetime.datetime.now() + expiration
    # a timestamp of 0 means "no expiration"
    expiration_timestamp = 0 if not expiration else int(expiration.timestamp())
    message_strings = json.dumps(message_values)
    # the "1:" prefix is the token format version, matched in verify_hash_signed()
    hash_value = hmac(env, scope, f'1:{message_strings}:{expiration_timestamp}', hash_function=hashlib.sha256)
    # token layout: 1 version byte + 8 little-endian expiration bytes
    # + 32 digest bytes + JSON payload; '=' padding is stripped from the base64
    token = b"\x01" + expiration_timestamp.to_bytes(8, 'little') + bytes.fromhex(hash_value) + message_strings.encode()
    return base64.urlsafe_b64encode(token).decode().rstrip('=')
def verify_hash_signed(env, scope, payload):
    """ Verify and extract data from a given urlsafe payload generated with hash_sign()

    :param env: sudo environment to use for retrieving config parameter
    :param scope: scope of the authentication, to have different signature for the same
        message in different usage
    :param payload: the token to verify
    :return: The payload_values if the check was successful, None otherwise.
    """
    # Padding with '===' makes the base64 length valid regardless of how
    # many '=' were stripped at generation time.
    raw = base64.urlsafe_b64decode(payload.encode() + b'===')
    if raw[:1] != b'\x01':
        raise ValueError('Unknown token version')

    # Token layout: 1-byte version | 8-byte LE expiration | 32-byte HMAC | JSON payload.
    expiration_value = int.from_bytes(raw[1:9], byteorder='little')
    signature = raw[9:41].hex()
    message = raw[41:].decode()

    expected_signature = hmac(env, scope, f'1:{message}:{expiration_value}', hash_function=hashlib.sha256)
    if not consteq(signature, expected_signature):
        return None
    # An expiration of 0 means the token never expires.
    if expiration_value and datetime.datetime.now().timestamp() >= expiration_value:
        return None
    return json.loads(message)
def limited_field_access_token(record, field_name, timestamp=None, *, scope):
    """Generate a token granting access to the given record and field_name in
    the given scope.

    The validity of the token is determined by the timestamp parameter.
    When it is not specified, a timestamp is automatically generated with a
    validity of at least 14 days. For a given record and field_name, the
    generated timestamp is deterministic within a 14-day period (even across
    different days/months/years) to allow browser caching, and expires after
    maximum 42 days to prevent infinite access. Different record/field
    combinations expire at different times to prevent thundering herd problems.

    :param record: the record to generate the token for
    :type record: class:`odoo.models.Model`
    :param field_name: the field name of record to generate the token for
    :type field_name: str
    :param scope: scope of the authentication, to have different signature for the same
        record/field in different usage
    :type scope: str
    :param timestamp: expiration timestamp of the token, or None to generate one
    :type timestamp: int, optional
    :return: the token, which includes the timestamp in hex format
    :rtype: string
    """
    record.ensure_one()
    if not timestamp:
        two_weeks = 1209600  # 2 * 7 * 24 * 60 * 60
        adler32_max = 4294967295
        # Quantize "now" to the start of the current 14-day window so the
        # token stays stable (cacheable) within that window.
        window_start = int(time.time()) // two_weeks * two_weeks
        # Per-record/field jitter in [0, two_weeks] spreads expirations
        # out to avoid a thundering herd when a window rolls over.
        unique_str = repr((record._name, record.id, field_name))
        jitter = two_weeks * zlib.adler32(unique_str.encode()) // adler32_max
        timestamp = hex(window_start + 2 * two_weeks + jitter)
    signature = hmac(record.env(su=True), scope, (record._name, record.id, field_name, timestamp))
    return f"{signature}o{timestamp}"
def verify_limited_field_access_token(record, field_name, access_token, *, scope):
    """Verify the given access_token grants access to field_name of record.

    In particular, the token must have the right format, must be valid for the
    given record, and must not have expired.

    :param record: the record to verify the token for
    :type record: class:`odoo.models.Model`
    :param field_name: the field name of record to verify the token for
    :type field_name: str
    :param access_token: the access token to verify
    :type access_token: str
    :param scope: scope of the authentication, to have different signature for the same
        record/field in different usage
    :return: whether the token is valid for the record/field_name combination at
        the current date and time
    :rtype: bool
    """
    # The hex timestamp is everything after the last "o" separator.
    *_, timestamp = access_token.rsplit("o", 1)
    expected_token = limited_field_access_token(record, field_name, timestamp, scope=scope)
    if not consteq(access_token, expected_token):
        return False
    return datetime.datetime.now() < datetime.datetime.fromtimestamp(int(timestamp, 16))
ADDRESS_REGEX = re.compile(r'^(.*?)(\s[0-9][0-9\S]*)?(?: - (.+))?$', flags=re.DOTALL)


def street_split(street):
    """Split a street line into its name, number, and secondary number.

    The number is the first whitespace-separated chunk starting with a
    digit; anything after a literal " - " is the secondary number.
    """
    match = ADDRESS_REGEX.match(street or '')
    if match:
        name, number, number2 = match.groups('')
    else:
        name = number = number2 = ''
    return {
        'street_name': name.strip(),
        'street_number': number.strip(),
        'street_number2': number2,
    }
def is_list_of(values, type_: type) -> bool:
    """Return True if the given values is a list / tuple of the given type.

    :param values: The values to check
    :param type_: The type of the elements in the list / tuple
    """
    if not isinstance(values, (list, tuple)):
        return False
    return all(isinstance(element, type_) for element in values)
def has_list_types(values, types: tuple[type, ...]) -> bool:
    """Return True if the given values have the same types as
    the one given in argument, in the same order.

    :param values: The values to check
    :param types: The types of the elements in the list / tuple
    """
    if not isinstance(values, (list, tuple)):
        return False
    if len(values) != len(types):
        return False
    return all(isinstance(value, type_) for value, type_ in zip(values, types))
def get_flag(country_code: str) -> str:
    """Get the emoji representing the flag linked to the country code.

    This emoji is composed of the two regional indicator emoji of the country
    code: each uppercase ASCII letter maps to U+1F1E6..U+1F1FF (ord + 165
    gives the low byte under the 0x1F1xx plane prefix).
    """
    indicators = []
    for letter in country_code:
        codepoint = int("1f1" + format(ord(letter) + 165, "02x"), 16)
        indicators.append(chr(codepoint))
    return "".join(indicators)
def format_frame(frame) -> str:
    """Render a stack frame as ``<function name> <filename>:<line number>``."""
    code_obj = frame.f_code
    return "{} {}:{}".format(code_obj.co_name, code_obj.co_filename, frame.f_lineno)
def named_to_positional_printf(string: str, args: Mapping) -> tuple[str, tuple]:
    """ Convert a named printf-style format string with its arguments to an
    equivalent positional format string with its arguments.

    Literal ``%%`` sequences are preserved; each named placeholder becomes
    ``%s`` and its value is collected in the order it is formatted.
    """
    collector = _PrintfArgs(args)
    # Doubling '%%' keeps literal percents intact through the extra
    # formatting pass applied just below.
    positional_string = string.replace('%%', '%%%%') % collector
    return positional_string, tuple(collector.values)


class _PrintfArgs:
    """ Helper object to turn a named printf-style format string into a positional one. """
    __slots__ = ('mapping', 'values')

    def __init__(self, mapping):
        # source mapping of named format arguments
        self.mapping: Mapping = mapping
        # values collected in the order their placeholders are formatted
        self.values: list = []

    def __getitem__(self, key):
        # Record the looked-up value and substitute a positional '%s' spec.
        self.values.append(self.mapping[key])
        return "%s"