Coverage for adhoc-cicd-odoo-odoo / odoo / tools / misc.py: 60%

792 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-09 18:05 +0000

1# Part of Odoo. See LICENSE file for full copyright and licensing details. 

2""" 

3Miscellaneous tools used by Odoo. 

4""" 

5from __future__ import annotations 

6 

7import base64 

8import collections 

9import csv 

10import datetime 

11import enum 

12import hashlib 

13import hmac as hmac_lib 

14import itertools 

15import json 

16import logging 

17import os 

18import re 

19import sys 

20import tempfile 

21import threading 

22import time 

23import traceback 

24import typing 

25import unicodedata 

26import warnings 

27import zlib 

28from collections import defaultdict 

29from collections.abc import Iterable, Iterator, Mapping, MutableMapping, MutableSet, Reversible 

30from contextlib import ContextDecorator, contextmanager 

31from difflib import HtmlDiff 

32from functools import lru_cache, reduce, wraps 

33from itertools import islice, groupby as itergroupby 

34from operator import itemgetter 

35 

36import babel 

37import babel.dates 

38import markupsafe 

39import pytz 

40from lxml import etree, objectify 

41 

42# get_encodings, ustr and exception_to_unicode were originally from tools.misc. 

43# They were moved to loglevels until we refactor tools.

44from odoo.loglevels import exception_to_unicode, get_encodings, ustr # noqa: F401 

45 

46from .config import config 

47from .float_utils import float_round 

48from .which import which 

49 

50K = typing.TypeVar('K') 

51T = typing.TypeVar('T') 

52if typing.TYPE_CHECKING: 

53 from collections.abc import Callable, Collection, Sequence 

54 from odoo.api import Environment 

55 from odoo.addons.base.models.res_lang import LangData 

56 

57 P = typing.TypeVar('P') 

58 

59__all__ = [ 

60 'DEFAULT_SERVER_DATETIME_FORMAT', 

61 'DEFAULT_SERVER_DATE_FORMAT', 

62 'DEFAULT_SERVER_TIME_FORMAT', 

63 'NON_BREAKING_SPACE', 

64 'SKIPPED_ELEMENT_TYPES', 

65 'DotDict', 

66 'LastOrderedSet', 

67 'OrderedSet', 

68 'Reverse', 

69 'babel_locale_parse', 

70 'clean_context', 

71 'consteq', 

72 'discardattr', 

73 'exception_to_unicode', 

74 'file_open', 

75 'file_open_temporary_directory', 

76 'file_path', 

77 'find_in_path', 

78 'formatLang', 

79 'format_amount', 

80 'format_date', 

81 'format_datetime', 

82 'format_duration', 

83 'format_time', 

84 'frozendict', 

85 'get_encodings', 

86 'get_iso_codes', 

87 'get_lang', 

88 'groupby', 

89 'hmac', 

90 'hash_sign', 

91 'verify_hash_signed', 

92 'html_escape', 

93 'human_size', 

94 'is_list_of', 

95 'merge_sequences', 

96 'mod10r', 

97 'mute_logger', 

98 'parse_date', 

99 'partition', 

100 'posix_to_ldml', 

101 'remove_accents', 

102 'replace_exceptions', 

103 'reverse_enumerate', 

104 'split_every', 

105 'str2bool', 

106 'street_split', 

107 'topological_sort', 

108 'unique', 

109 'ustr', 

110 'real_time', 

111] 

112 

113_logger = logging.getLogger(__name__) 

114 

115# List of etree._Element subclasses that we choose to ignore when parsing XML. 

116# We include the *Base ones just in case, currently they seem to be subclasses of the _* ones. 

117SKIPPED_ELEMENT_TYPES = (etree._Comment, etree._ProcessingInstruction, etree.CommentBase, etree.PIBase, etree._Entity) 

118 

119# Configure default global parser 

120etree.set_default_parser(etree.XMLParser(resolve_entities=False)) 

121default_parser = etree.XMLParser(resolve_entities=False, remove_blank_text=True) 

122default_parser.set_element_class_lookup(objectify.ObjectifyElementClassLookup()) 

123objectify.set_default_parser(default_parser) 

124 

125NON_BREAKING_SPACE = u'\N{NO-BREAK SPACE}' 

126 

127# ensure we have a non patched time for query times when using freezegun 

128real_time = time.time.__call__ # type: ignore 

129 

130 

class Sentinel(enum.Enum):
    """Single-member enumeration used to annotate sentinel defaults.

    A parameter typed ``T | Sentinel`` and defaulting to
    ``Sentinel.SENTINEL`` can be compared with ``is`` to detect that no
    value was supplied.
    """
    SENTINEL = -1

134 

135 

136SENTINEL = Sentinel.SENTINEL 

137 

138#---------------------------------------------------------- 

139# Subprocesses 

140#---------------------------------------------------------- 

141 

def find_in_path(name):
    """Look up ``name`` with :func:`which` in ``PATH`` plus the configured ``bin_path``.

    :param name: name of the executable to look for
    :return: whatever :func:`which` returns for the combined search path
    """
    search_dirs = os.environ.get('PATH', os.defpath).split(os.pathsep)
    # the option may hold the literal string 'None' when it is unset
    if config.get('bin_path') and config['bin_path'] != 'None':
        search_dirs.append(config['bin_path'])
    combined = os.pathsep.join(search_dirs)
    return which(name, path=combined)

147 

148# ---------------------------------------------------------- 

149# Postgres subprocesses 

150# ---------------------------------------------------------- 

151 

152 

def find_pg_tool(name):
    """Look up the PostgreSQL tool ``name`` with :func:`which`.

    The configured ``pg_path`` is used as the search path when it is set
    (and is not the literal string ``'None'``); otherwise ``None`` is passed
    and :func:`which` applies its own default.

    :param name: name of the command to look for
    :raise Exception: if :func:`which` raises :class:`OSError`
    """
    pg_path = config['pg_path']
    search_path = pg_path if pg_path and pg_path != 'None' else None
    try:
        return which(name, path=search_path)
    except OSError:
        raise Exception('Command `%s` not found.' % name)

161 

162 

def exec_pg_environ():
    """
    Build an environment for PostgreSQL subprocesses from the Odoo database
    configuration.

    A copy of ``os.environ`` is returned in which each libpq variable is
    overridden only when the matching configuration option is truthy.

    Note: On systems where pg_restore/pg_dump require an explicit password
    (i.e. on Windows where TCP sockets are used), it is necessary to pass the
    postgres user password in the PGPASSWORD environment variable or in a
    special .pgpass file.

    See also https://www.postgresql.org/docs/current/libpq-envars.html

    :return: the environment mapping to hand to the subprocess
    """
    env = os.environ.copy()
    if db_host := config['db_host']:
        env['PGHOST'] = db_host
    if db_port := config['db_port']:
        env['PGPORT'] = str(db_port)
    if db_user := config['db_user']:
        env['PGUSER'] = db_user
    if db_password := config['db_password']:
        env['PGPASSWORD'] = db_password
    if app_name := config['db_app_name']:
        # the '{pid}' placeholder is expanded, then the name is cut to 63 characters
        env['PGAPPNAME'] = app_name.replace('{pid}', f'env{os.getpid()}')[:63]
    if sslmode := config['db_sslmode']:
        env['PGSSLMODE'] = sslmode
    return env

189 

190 

191# ---------------------------------------------------------- 

192# File paths 

193# ---------------------------------------------------------- 

194 

195 

def file_path(file_path: str, filter_ext: tuple[str, ...] = ('',), env: Environment | None = None, *, check_exists: bool = True) -> str:
    """Resolve a file located under a known `addons_path` directory to its full path.

    Examples::

        >>> file_path('hr')
        >>> file_path('hr/static/description/icon.png')
        >>> file_path('hr/static/description/icon.png', filter_ext=('.png', '.jpg'))

    :param str file_path: absolute file path, or relative path within any `addons_path` directory
    :param tuple[str] filter_ext: optional supported extensions (lowercase, with leading dot);
        it is handed to ``str.endswith`` as is
    :param env: optional environment, required for a file path within a temporary directory
        created using `file_open_temporary_directory()`
    :param check_exists: check that the file exists (default: True)
    :return: the absolute path to the file
    :raise FileNotFoundError: if the file is not found under the known `addons_path` directories
    :raise ValueError: if the file doesn't have one of the supported extensions (`filter_ext`)
    """
    import odoo.addons  # noqa: PLC0415

    absolute = os.path.isabs(file_path)
    normalized_path = os.path.normpath(os.path.normcase(file_path))

    if filter_ext and not normalized_path.lower().endswith(filter_ext):
        raise ValueError("Unsupported file: " + file_path)

    # a leading 'addons/' is the last component of root_path; relative paths
    # sometimes include it, so drop it before joining
    normalized_path = normalized_path.removeprefix('addons' + os.sep)

    # a relative path whose first component is a loaded module is only looked
    # up in that module's own __path__; anything else is searched everywhere
    first_component = normalized_path.split(os.path.sep)[0]
    module = None if absolute else sys.modules.get(f'odoo.addons.{first_component}')
    if module:
        addons_paths = [os.path.dirname(module_dir) for module_dir in module.__path__]
    else:
        root_path = os.path.abspath(config.root_path)
        temporary_paths = env.transaction._Transaction__file_open_tmp_paths if env else []
        addons_paths = [*odoo.addons.__path__, root_path, *temporary_paths]

    # the existence check may only be skipped when there is a single possible
    # location (absolute paths have exactly one)
    accept_unchecked = not check_exists and (absolute or len(addons_paths) == 1)

    for addons_dir in addons_paths:
        # trailing separator prevents matching a directory that merely shares a prefix
        parent_path = os.path.normpath(os.path.normcase(addons_dir)) + os.sep
        if absolute:
            fpath = normalized_path
        else:
            fpath = os.path.normpath(os.path.join(parent_path, normalized_path))
        if not fpath.startswith(parent_path):
            continue
        if accept_unchecked or os.path.exists(fpath):
            return fpath

    raise FileNotFoundError("File not found: " + file_path)

251 

252 

def file_open(name: str, mode: str = "r", filter_ext: tuple[str, ...] = (), env: Environment | None = None):
    """Open an existing file located within the addons_path directories.

    Examples::

        >>> file_open('hr/static/description/icon.png')
        >>> file_open('hr/static/description/icon.png', filter_ext=('.png', '.jpg'))
        >>> with file_open('/opt/odoo/addons/hr/static/description/icon.png', 'rb') as f:
        ...     contents = f.read()

    :param name: absolute or relative path to a file located inside an addon
    :param mode: file open mode, as for `open()`
    :param tuple[str] filter_ext: optional supported extensions (lowercase, with leading dot)
    :param env: optional environment, required to open a file within a temporary directory
        created using `file_open_temporary_directory()`
    :return: file object, as returned by `open()`
    :raise FileNotFoundError: if the file is not found under the known `addons_path` directories,
        or if a writing mode targets something that is not an existing file
    :raise ValueError: if the file doesn't have one of the supported extensions (`filter_ext`)
    """
    path = file_path(name, filter_ext=filter_ext, env=env, check_exists=False)

    # Text mode always uses UTF-8: the system locale could otherwise change the
    # default encoding, even with recent Python 3 versions.
    # Note: not covered by a unit test because it is platform dependent. To
    # experiment, force a non-UTF8 locale with:
    # `sudo locale-gen fr_FR; LC_ALL=fr_FR.iso8859-1 python3 ...'
    # See also PEP-540, although we can't rely on that at the moment.
    encoding = None if 'b' in mode else "utf-8"

    # writing modes must never create a new file
    writes = any(flag in mode for flag in ('w', 'x', 'a'))
    if writes and not os.path.isfile(path):
        raise FileNotFoundError(f"Not a file: {path}")
    return open(path, mode, encoding=encoding)

286 

287 

@contextmanager
def file_open_temporary_directory(env: Environment):
    """Context manager yielding a temporary directory that `file_open` may read from.

    The directory is registered on the transaction of ``env`` for the duration
    of the ``with`` block, so only environments sharing that transaction can
    open files inside it. It is unregistered, then deleted, on exit.

    Examples::

        >>> with odoo.tools.file_open_temporary_directory(self.env) as module_dir:
        ...    with zipfile.ZipFile('foo.zip', 'r') as z:
        ...        z.extract('foo/__manifest__.py', module_dir)
        ...    with odoo.tools.file_open('foo/__manifest__.py', env=self.env) as f:
        ...        manifest = f.read()

    :param env: environment for which the temporary directory is created.
    :return: the absolute path to the created temporary directory
    """
    with tempfile.TemporaryDirectory() as module_dir:
        try:
            registered = env.transaction._Transaction__file_open_tmp_paths
            registered.append(module_dir)
            yield module_dir
        finally:
            # unregister before TemporaryDirectory removes the directory
            env.transaction._Transaction__file_open_tmp_paths.remove(module_dir)

314 

315 

316#---------------------------------------------------------- 

317# iterables 

318#---------------------------------------------------------- 

def flatten(list):
    """Flatten arbitrarily nested iterables into a single new list.

    Strings and bytes are treated as atoms, never iterated into.
    Deprecated since 18.0: a single :class:`DeprecationWarning` is emitted per
    call, attributed to the caller.

    Author: Christophe Simonis (christophe@tinyerp.com)

    Examples::
    >>> flatten(['a'])
    ['a']
    >>> flatten('b')
    ['b']
    >>> flatten( [] )
    []
    >>> flatten( [[], [[]]] )
    []
    >>> flatten( [[['a','b'], 'c'], 'd', ['e', [], 'f']] )
    ['a', 'b', 'c', 'd', 'e', 'f']
    >>> t = (1,2,(3,), [4, 5, [6, [7], (8, 9), ([10, 11, (12, 13)]), [14, [], (15,)], []]])
    >>> flatten(t)
    [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]

    :param list: the (possibly nested) iterable to flatten; the name shadows
        the builtin but is kept for keyword-argument compatibility
    :return: a new flat list of the leaf elements, in traversal order
    """
    warnings.warn(
        "deprecated since 18.0",
        category=DeprecationWarning,
        stacklevel=2,
    )

    # The recursion lives in a helper so nested levels do not warn again
    # (with a stacklevel that would point back into this function).
    def _flatten(items):
        flat = []
        for item in items:
            if isinstance(item, (bytes, str)) or not isinstance(item, collections.abc.Iterable):
                flat.append(item)
            else:
                flat.extend(_flatten(item))
        return flat

    return _flatten(list)

350 

351 

def reverse_enumerate(lst: Sequence[T]) -> Iterator[tuple[int, T]]:
    """Iterate over ``(index, item)`` pairs of a sequence from last to first.

    Usage::

        >>> a = ['a', 'b', 'c']
        >>> it = reverse_enumerate(a)
        >>> next(it)
        (2, 'c')
        >>> next(it)
        (1, 'b')
        >>> next(it)
        (0, 'a')
        >>> next(it)
        Traceback (most recent call last):
          File "<stdin>", line 1, in <module>
        StopIteration

    :param lst: sequence to walk backwards (needs ``len()`` and ``reversed()``)
    :return: an iterator of ``(index, item)`` tuples
    """
    descending_indexes = reversed(range(len(lst)))
    return zip(descending_indexes, reversed(lst))

371 

372 

def partition(pred: Callable[[T], bool], elems: Iterable[T]) -> tuple[list[T], list[T]]:
    """ Split ``elems`` in two lists according to ``pred``, in a single pass.

    The result is equivalent to:
    ``filter(pred, elems), filter(lambda x: not pred(x), elems)``

    :param pred: predicate called once per element
    :param elems: elements to dispatch
    :return: ``(accepted, rejected)``, each preserving the input order
    """
    accepted: list[T] = []
    rejected: list[T] = []
    for elem in elems:
        bucket = accepted if pred(elem) else rejected
        bucket.append(elem)
    return accepted, rejected

381 

382 

def topological_sort(elems: Mapping[T, Collection[T]]) -> list[T]:
    """ Return a list of elements sorted so that their dependencies are listed
    before them in the result.

    The traversal is an iterative depth-first search, so arbitrarily long
    dependency chains do not hit the interpreter recursion limit. A dependency
    cycle does not raise: an element already being visited is skipped.

    :param elems: specifies the elements to sort with their dependencies; it is
        a dictionary like `{element: dependencies}` where `dependencies` is a
        collection of elements that must appear before `element`. The elements
        of `dependencies` are not required to appear in `elems`; they will
        simply not appear in the result.

    :returns: a list with the keys of `elems` sorted according to their
        specification.
    """
    # the algorithm is inspired by [Tarjan 1976],
    # http://en.wikipedia.org/wiki/Topological_sorting#Algorithms
    result = []
    visited = set()

    for root in elems:
        if root in visited:
            continue
        visited.add(root)
        # each stack entry is (element, iterator over its remaining dependencies)
        stack = [(root, iter(elems[root]))]
        while stack:
            node, pending = stack[-1]
            for dep in pending:
                if dep in visited:
                    continue
                visited.add(dep)
                if dep in elems:
                    # descend into dep first; `pending` resumes where it stopped
                    stack.append((dep, iter(elems[dep])))
                    break
            else:
                # every dependency of node is handled: node can be emitted
                stack.pop()
                result.append(node)

    return result

414 

415 

def merge_sequences(*iterables: Iterable[T]) -> list[T]:
    """ Merge several iterables into one list holding the union of their items.

    The items follow the partial order given by each iterable, with a bias
    towards the end for the last iterable::

        seq = merge_sequences(['A', 'B', 'C'])
        assert seq == ['A', 'B', 'C']

        seq = merge_sequences(
            ['A', 'B', 'C'],
            ['Z'],                  # 'Z' can be anywhere
            ['Y', 'C'],             # 'Y' must precede 'C';
            ['A', 'X', 'Y'],        # 'X' must follow 'A' and precede 'Y'
        )
        assert seq == ['A', 'B', 'X', 'Y', 'C', 'Z']

    :param iterables: the sequences to merge
    :return: the merged list, as ordered by :func:`topological_sort`
    """
    # {item: items that must come before it}; insertion order matters
    deps: defaultdict[T, list[T]] = defaultdict(list)
    for iterable in iterables:
        previous: T | Sentinel = SENTINEL
        for current in iterable:
            predecessors = deps[current]  # also registers items without predecessor
            if previous is not SENTINEL:
                predecessors.append(previous)
            previous = current
    return topological_sort(deps)

443 

444 

def get_iso_codes(lang: str) -> str:
    """Shorten a ``xx_XX`` code to ``xx`` when both parts are the same language code.

    Only the first two ``_``-separated components are compared
    (case-insensitively on the second one); any other code is returned as is.

    :param lang: language code such as ``fr_FR`` or ``en_US``
    :return: ``fr`` for ``fr_FR``, the unchanged input for ``en_US``
    """
    parts = lang.split('_')
    if len(parts) > 1 and parts[0] == parts[1].lower():
        return parts[0]
    return lang

451 

452 

def scan_languages() -> list[tuple[str, str]]:
    """ Return the languages listed in ``base/data/res.lang.csv``.

    If the file cannot be read or parsed, an error is logged and the result
    falls back to English only.

    :returns: a list of (lang_code, lang_name) pairs, sorted by name
    :rtype: [(str, str)]
    """
    try:
        # the first CSV row is the header naming the "code" and "name" columns
        with file_open('base/data/res.lang.csv') as csvfile:
            rows = csv.reader(csvfile, delimiter=',', quotechar='"')
            header = next(rows)
            code_col = header.index("code")
            name_col = header.index("name")
            result = [(row[code_col], row[name_col]) for row in rows]
    except Exception:
        _logger.error("Could not read res.lang.csv")
        result = []

    languages = result or [('en_US', 'English')]
    return sorted(languages, key=itemgetter(1))

475 

476 

def mod10r(number: str) -> str:
    """
    Append the recursive modulo-10 check digit to ``number``.

    Characters that are not decimal digits (spaces, dashes, letters, ...) are
    kept in the output but ignored for the computation. ``str.isdecimal`` is
    used for that test because ``str.isdigit`` also accepts characters such as
    superscript digits, which ``int()`` cannot convert.

    :param number: account or invoice number
    :return: the same number completed with the recursive mod10 key
    """
    codec = (0, 9, 4, 6, 8, 2, 7, 1, 3, 5)
    report = 0
    chars = []
    for char in number:
        chars.append(char)
        if char.isdecimal():
            report = codec[(int(char) + report) % 10]
    return ''.join(chars) + str((10 - report) % 10)

491 

492 

def str2bool(s: str, default: bool | None = None) -> bool:
    """Interpret a textual flag as a boolean.

    Recognised values (case-insensitive) are ``y/yes/1/true/t/on`` and
    ``n/no/0/false/f/off``. A ``bool`` is returned unchanged. Any other
    non-string triggers a :class:`DeprecationWarning` and is then treated like
    an unrecognised string.

    :param s: the value to interpret
    :param default: value returned (coerced to ``bool``) when ``s`` is not recognised
    :raise ValueError: when ``s`` is not recognised and ``default`` is ``None``
    """
    # allow this (for now?) because it's used for get_param
    if type(s) is bool:
        return s  # type: ignore

    if isinstance(s, str):
        lowered = s.lower()
        if lowered in ('y', 'yes', '1', 'true', 't', 'on'):
            return True
        if lowered in ('n', 'no', '0', 'false', 'f', 'off'):
            return False
    else:
        warnings.warn(
            f"Passed a non-str to `str2bool`: {s}",
            DeprecationWarning,
            stacklevel=2,
        )

    # unrecognised input: use the default, or fail when none was given
    if default is None:
        raise ValueError('Use 0/1/yes/no/true/false/on/off')
    return bool(default)

517 

518 

def human_size(sz: float | str) -> str | typing.Literal[False]:
    """
    Format a size with two decimals and a unit from ``bytes`` up to ``Tb``.

    :param sz: a number of bytes, or a string whose length is used as the size
    :return: the formatted size, or ``False`` when ``sz`` is falsy
    """
    if not sz:
        return False
    units = ('bytes', 'Kb', 'Mb', 'Gb', 'Tb')
    size = float(len(sz) if isinstance(sz, str) else sz)
    unit_index = 0
    largest = len(units) - 1
    # divide by 1024 until the value fits the unit, or the largest unit is reached
    while size >= 1024 and unit_index < largest:
        size /= 1024
        unit_index += 1
    return "%0.2f %s" % (size, units[unit_index])

533 

534 

535DEFAULT_SERVER_DATE_FORMAT = "%Y-%m-%d" 

536DEFAULT_SERVER_TIME_FORMAT = "%H:%M:%S" 

537DEFAULT_SERVER_DATETIME_FORMAT = "%s %s" % ( 

538 DEFAULT_SERVER_DATE_FORMAT, 

539 DEFAULT_SERVER_TIME_FORMAT) 

540 

541DATE_LENGTH = len(datetime.date.today().strftime(DEFAULT_SERVER_DATE_FORMAT)) 

542 

543# Python's strftime supports only the format directives 

544# that are available on the platform's libc, so in order to 

545# be cross-platform we map to the directives required by 

546# the C standard (1989 version), always available on platforms 

547# with a C standard implementation. 

548DATETIME_FORMATS_MAP = { 

549 '%C': '', # century 

550 '%D': '%m/%d/%Y', # modified %y->%Y 

551 '%e': '%d', 

552 '%E': '', # special modifier 

553 '%F': '%Y-%m-%d', 

554 '%g': '%Y', # modified %y->%Y 

555 '%G': '%Y', 

556 '%h': '%b', 

557 '%k': '%H', 

558 '%l': '%I', 

559 '%n': '\n', 

560 '%O': '', # special modifier 

561 '%P': '%p', 

562 '%R': '%H:%M', 

563 '%r': '%I:%M:%S %p', 

564 '%s': '', #num of seconds since epoch 

565 '%T': '%H:%M:%S', 

566 '%t': ' ', # tab 

567 '%u': ' %w', 

568 '%V': '%W', 

569 '%y': '%Y', # Even if %y works, it's ambiguous, so we should use %Y 

570 '%+': '%Y-%m-%d %H:%M:%S', 

571 

572 # %Z is a special case that causes 2 problems at least: 

573 # - the timezone names we use (in res_user.context_tz) come 

574 # from pytz, but not all these names are recognized by 

575 # strptime(), so we cannot convert in both directions 

576 # when such a timezone is selected and %Z is in the format 

577 # - %Z is replaced by an empty string in strftime() when 

578 # there is not tzinfo in a datetime value (e.g when the user 

579 # did not pick a context_tz). The resulting string does not 

580 # parse back if the format requires %Z. 

581 # As a consequence, we strip it completely from format strings. 

582 # The user can always have a look at the context_tz in 

583 # preferences to check the timezone. 

584 '%z': '', 

585 '%Z': '', 

586} 

587 

588POSIX_TO_LDML = { 

589 'a': 'E', 

590 'A': 'EEEE', 

591 'b': 'MMM', 

592 'B': 'MMMM', 

593 #'c': '', 

594 'd': 'dd', 

595 '-d': 'd', 

596 'H': 'HH', 

597 'I': 'hh', 

598 'j': 'DDD', 

599 'm': 'MM', 

600 '-m': 'M', 

601 'M': 'mm', 

602 'p': 'a', 

603 'S': 'ss', 

604 'U': 'w', 

605 'w': 'e', 

606 'W': 'w', 

607 'y': 'yy', 

608 'Y': 'yyyy', 

609 # see comments above, and babel's format_datetime assumes an UTC timezone 

610 # for naive datetime objects 

611 #'z': 'Z', 

612 #'Z': 'z', 

613} 

614 

615 

def posix_to_ldml(fmt: str, locale: babel.Locale) -> str:
    """ Converts a posix/strftime pattern into an LDML date format pattern.

    Letters outside of ``%`` directives are literal text and are wrapped in
    single quotes, since LDML gives a meaning to bare letters. A literal
    apostrophe is emitted doubled (``''``), which is how LDML spells an
    apostrophe both inside and outside a quoted run.

    :param fmt: non-extended C89/C90 strftime pattern
    :param locale: babel locale used for locale-specific conversions (e.g. %x and %X)
    :return: the LDML pattern
    :raise KeyError: if a directive has no entry in ``POSIX_TO_LDML``
    """
    buf = []
    pc = False      # True right after a '%': the next char is a directive
    minus = False   # True when the directive carries the '-' flag (e.g. %-d)
    quoted = []     # pending run of literal text, to be emitted between quotes

    def flush_quoted():
        if quoted:
            buf.append("'")
            buf.append(''.join(quoted))
            buf.append("'")
            quoted.clear()

    for c in fmt:
        if not pc:
            # LDML date format patterns uses letters, so letters must be quoted
            if c.isalpha():
                quoted.append(c)
                continue
            # an apostrophe would open/close a quoted run: escape it, keeping
            # it inside the current run when there is one
            if c == "'":
                (quoted if quoted else buf).append("''")
                continue
        flush_quoted()

        if pc:
            if c == '%':  # escaped percent
                buf.append('%')
            elif c == 'x':  # date format, short seems to match
                buf.append(locale.date_formats['short'].pattern)
            elif c == 'X':  # time format, seems to include seconds. short does not
                buf.append(locale.time_formats['medium'].pattern)
            elif c == '-':
                # flag only: stay in directive mode for the next character
                minus = True
                continue
            else:  # look up format char in static mapping
                if minus:
                    c = '-' + c
                    minus = False
                buf.append(POSIX_TO_LDML[c])
            pc = False
        elif c == '%':
            pc = True
        else:
            buf.append(c)

    # flush anything remaining in quoted buffer
    flush_quoted()

    return ''.join(buf)

667 

668 

@typing.overload
def split_every(n: int, iterable: Iterable[T]) -> Iterator[tuple[T, ...]]:
    ...


@typing.overload
def split_every(n: int, iterable: Iterable[T], piece_maker: type[Collection[T]]) -> Iterator[Collection[T]]:
    ...


@typing.overload
def split_every(n: int, iterable: Iterable[T], piece_maker: Callable[[Iterable[T]], P]) -> Iterator[P]:
    ...


def split_every(n: int, iterable: Iterable[T], piece_maker=tuple):
    """Yield successive pieces of at most ``n`` items taken from ``iterable``.

    The last piece is shorter when ``n`` does not evenly divide the number of
    items. Iteration stops at the first falsy piece.

    :param int n: maximum size of each generated chunk
    :param Iterable iterable: iterable to chunk into pieces
    :param piece_maker: callable taking an iterable and collecting each
        chunk from its slice, *must consume the entire slice*.
    """
    source = iter(iterable)
    while chunk := piece_maker(islice(source, n)):
        yield chunk

698 

699 

def discardattr(obj: object, key: str) -> None:
    """ Delete attribute ``key`` from ``obj``, ignoring the :class:`AttributeError`
    that ``delattr(obj, key)`` raises (for instance when ``key`` is not present).
    """
    try:
        delattr(obj, key)
    except AttributeError:
        return None

706 

707# --------------------------------------------- 

708# String management 

709# --------------------------------------------- 

710 

711 

712# Inspired by http://stackoverflow.com/questions/517923 

def remove_accents(input_str: str) -> str:
    """Best-effort replacement of accented latin letters by an ASCII equivalent.

    The text is decomposed (NFKD) and the combining marks are dropped. This
    obviously changes the meaning of ``input_str`` and only works for some
    cases. A falsy input is returned unchanged.
    """
    if not input_str:
        return input_str
    decomposed = unicodedata.normalize('NFKD', input_str)
    # combining() is 0 (falsy) for characters that are not combining marks
    return ''.join(itertools.filterfalse(unicodedata.combining, decomposed))

721 

722 

class unquote(str):
    """A ``str`` whose ``repr()`` is the raw text: no surrounding quotation
    marks, no escaping. The name comes from Lisp's unquote.

    One use is to preserve or insert bare variable names within dicts during
    eval() of a dict's repr(). Use with care.

    Some examples (notice that there are never quotes surrounding
    the ``active_id`` name):

    >>> unquote('active_id')
    active_id
    >>> d = {'test': unquote('active_id')}
    >>> d
    {'test': active_id}
    >>> print(d)
    {'test': active_id}
    """
    __slots__ = ()

    def __repr__(self):
        # the instance is itself a str, so it can serve as its own repr
        return self

744 

745 

class mute_logger(logging.Handler):
    """Temporary suppress the logging.

    Can be used as context manager or decorator::

        @mute_logger('odoo.plic.ploc')
        def do_stuff():
            blahblah()

        with mute_logger('odoo.foo.bar'):
            do_suff()

    The same instance may be entered again while already active (nested
    ``with`` blocks, or a decorated function calling itself): only the
    outermost entry saves the loggers' state and only the matching outermost
    exit restores it.
    """
    def __init__(self, *loggers):
        """
        :param loggers: names of the loggers to silence
        """
        super().__init__()
        self.loggers = loggers
        # {logger name: (handlers, propagate)} as found on the outermost entry
        self.old_params = {}
        # number of currently active entries of this instance
        self._depth = 0

    def __enter__(self):
        self._depth += 1
        if self._depth > 1:
            # already muting: saving again would record the muted state as
            # the one to restore
            return
        for logger_name in self.loggers:
            logger = logging.getLogger(logger_name)
            self.old_params[logger_name] = (logger.handlers, logger.propagate)
            logger.propagate = False
            logger.handlers = [self]

    def __exit__(self, exc_type=None, exc_val=None, exc_tb=None):
        self._depth = max(self._depth - 1, 0)
        if self._depth:
            return
        for logger_name in self.loggers:
            logger = logging.getLogger(logger_name)
            logger.handlers, logger.propagate = self.old_params[logger_name]

    def __call__(self, func):
        """Decorate ``func`` so that it runs with the loggers muted."""
        @wraps(func)
        def deco(*args, **kwargs):
            with self:
                return func(*args, **kwargs)
        return deco

    def emit(self, record):
        # drop every record
        pass

784 

785 

class lower_logging(logging.Handler):
    """Temporary lower the max logging level.

    While active, this handler replaces the root logger's handlers. Records
    above ``max_level`` are downgraded to ``to_level`` (their level name gets
    a ``_`` prefix) before being passed on to the original handlers, and
    ``had_error_log`` tells whether that happened.
    """
    def __init__(self, max_level, to_level=None):
        """
        :param max_level: highest level allowed through unchanged
        :param to_level: level given to downgraded records (defaults to ``max_level``)
        """
        super().__init__()
        self.old_handlers = None
        self.old_propagate = None
        self.had_error_log = False
        self.max_level = max_level
        self.to_level = to_level or max_level

    def __enter__(self):
        root = logging.getLogger()
        # keep a copy of the handlers and the propagate flag to restore on exit
        self.old_handlers = root.handlers[:]
        self.old_propagate = root.propagate
        root.propagate = False
        root.handlers = [self]
        self.had_error_log = False
        return self

    def __exit__(self, exc_type=None, exc_val=None, exc_tb=None):
        root = logging.getLogger()
        root.handlers = self.old_handlers
        root.propagate = self.old_propagate

    def emit(self, record):
        if record.levelno > self.max_level:
            record.levelname = f'_{record.levelname}'
            record.levelno = self.to_level
            self.had_error_log = True
            # graft the munging class on top of the record's actual class the
            # first time, then switch the record over to it
            if MungedTracebackLogRecord.__base__ is logging.LogRecord:
                MungedTracebackLogRecord.__bases__ = (record.__class__,)
            record.__class__ = MungedTracebackLogRecord

        if not logging.getLogger(record.name).isEnabledFor(record.levelno):
            return
        for original_handler in self.old_handlers:
            if original_handler.level <= record.levelno:
                original_handler.emit(record)

824 

825 

class MungedTracebackLogRecord(logging.LogRecord):
    """Log record whose message has its traceback header rewritten to
    ``_Traceback_ (most recent call last):``."""

    def getMessage(self):
        message = super().getMessage()
        return message.replace(
            'Traceback (most recent call last):',
            '_Traceback_ (most recent call last):',
        )

832 

833 

def stripped_sys_argv(*strip_args):
    """Return sys.argv with some arguments stripped, suitable for reexecution or subprocesses

    The save/update/init/i18n-overwrite options are always stripped, together
    with the value following an option that takes one.

    :param strip_args: extra option strings to strip; each must be known to
        ``config.parser``
    :return: a new list of arguments
    """
    always_stripped = {'-s', '--save', '-u', '--update', '-i', '--init', '--i18n-overwrite'}
    strip_args = sorted(always_stripped.union(strip_args))
    assert all(config.parser.has_option(s) for s in strip_args)
    takes_value = {s: config.parser.get_option(s).takes_value() for s in strip_args}

    # once sorted, every '--long' option comes before the '-s' short ones, so
    # grouping yields exactly two runs: long options, then short options
    longs, shorts = [
        tuple(group)
        for _, group in itergroupby(strip_args, lambda opt: opt.startswith('--'))
    ]
    longs_eq = tuple(opt + '=' for opt in longs if takes_value[opt])

    args = sys.argv[:]

    def strip(args, i):
        current = args[i]
        if current.startswith(shorts) or current.startswith(longs_eq) or current in longs:
            return True
        # value of a preceding stripped option that expects one
        return i >= 1 and (args[i - 1] in strip_args) and takes_value[args[i - 1]]

    return [arg for i, arg in enumerate(args) if not strip(args, i)]

851 

852 

class ConstantMapping(Mapping[typing.Any, T], typing.Generic[T]):
    """
    An immutable mapping that answers every key lookup with the same value.

    Useful as a default value for methods expecting a mapping.
    """
    __slots__ = ['_value']

    def __init__(self, val: T):
        """
        :param val: the value returned for every key
        """
        self._value = val

    def __len__(self):
        """
        Always ``0``: no key is stored. (defaultdict grows with each
        requested key, is that really useful?)
        """
        return 0

    def __iter__(self):
        """
        Always empty, for the same reason as ``__len__``.
        """
        return iter(())

    def __getitem__(self, item) -> T:
        return self._value

880 

881 

def dumpstacks(sig=None, frame=None, thread_idents=None, log_level=logging.INFO):
    """ Signal handler: dump a stack trace for each existing thread or given
    thread(s) specified through the ``thread_idents`` sequence.

    When ``odoo.evented`` is set, the stack of every live greenlet is dumped
    as well. Everything is written as one message to the module logger.

    :param sig: unused here (signal handler signature)
    :param frame: unused here (signal handler signature)
    :param thread_idents: thread identifiers to restrict the dump to; all
        threads when falsy
    :param log_level: level used for the log message
    """
    code = []

    def extract_stack(stack):
        for filename, lineno, name, line in traceback.extract_stack(stack):
            yield 'File: "%s", line %d, in %s' % (filename, lineno, name)
            if line:
                yield " %s" % (line.strip(),)

    # code from http://stackoverflow.com/questions/132058/getting-stack-trace-from-a-running-python-application#answer-2569696
    # modified for python 2.5 compatibility
    threads_info = {}
    for th in threading.enumerate():
        threads_info[th.ident] = {
            'repr': repr(th),
            'uid': getattr(th, 'uid', 'n/a'),
            'dbname': getattr(th, 'dbname', 'n/a'),
            'url': getattr(th, 'url', 'n/a'),
            'query_count': getattr(th, 'query_count', 'n/a'),
            'query_time': getattr(th, 'query_time', None),
            'perf_t0': getattr(th, 'perf_t0', None),
        }

    for thread_id, stack in sys._current_frames().items():
        if thread_idents and thread_id not in thread_idents:
            continue
        thread_info = threads_info.get(thread_id, {})
        query_time = thread_info.get('query_time')
        perf_t0 = thread_info.get('perf_t0')
        remaining_time = None
        if query_time is not None and perf_t0:
            # time since perf_t0 that was not spent in queries
            remaining_time = '%.3f' % (real_time() - perf_t0 - query_time)
            query_time = '%.3f' % query_time
        # qc:query_count qt:query_time pt:python_time (aka remaining time)
        header = "\n# Thread: %s (db:%s) (uid:%s) (url:%s) (qc:%s qt:%s pt:%s)" % (
            thread_info.get('repr', thread_id),
            thread_info.get('dbname', 'n/a'),
            thread_info.get('uid', 'n/a'),
            thread_info.get('url', 'n/a'),
            thread_info.get('query_count', 'n/a'),
            query_time or 'n/a',
            remaining_time or 'n/a',
        )
        code.append(header)
        code.extend(extract_stack(stack))

    import odoo  # eventd
    if odoo.evented:
        # code from http://stackoverflow.com/questions/12510648/in-gevent-how-can-i-dump-stack-traces-of-all-running-greenlets
        import gc
        from greenlet import greenlet
        for ob in gc.get_objects():
            if not isinstance(ob, greenlet) or not ob:
                continue
            code.append("\n# Greenlet: %r" % (ob,))
            code.extend(extract_stack(ob.gr_frame))

    _logger.log(log_level, "\n".join(code))

938 

939 

def freehash(arg: typing.Any) -> int:
    """ Return a hash for ``arg``, even when ``arg`` itself is not hashable.

    Hashable values simply return ``hash(arg)``. Otherwise a mapping is
    hashed through a ``frozendict`` copy, any other iterable through the
    frozenset of the ``freehash`` of its items (so order and duplicates are
    ignored), and anything else falls back on ``id(arg)``.
    """
    try:
        return hash(arg)
    except Exception:
        if isinstance(arg, Mapping):
            return hash(frozendict(arg))
        if isinstance(arg, Iterable):
            return hash(frozenset(map(freehash, arg)))
        return id(arg)

950 

951 

def clean_context(context: dict[str, typing.Any]) -> dict[str, typing.Any]:
    """ Return a copy of ``context`` without the entries whose key starts
    with ``default_``. The given dictionary is left untouched.
    """
    cleaned = {}
    for key, value in context.items():
        if key.startswith('default_'):
            continue
        cleaned[key] = value
    return cleaned

957 

958 

class frozendict(dict[K, T], typing.Generic[K, T]):
    """ An implementation of an immutable dictionary.

    Every mutating method and operator of ``dict`` is overridden to raise
    ``NotImplementedError``, and instances are hashable (see ``__hash__``).

    Being a ``dict`` subclass, the protection only covers calls made through
    the instance: unbound base methods such as ``dict.update(d, ...)`` still
    modify the underlying data.
    """
    __slots__ = ()

    def __delitem__(self, key):
        raise NotImplementedError("'__delitem__' not supported on frozendict")

    def __setitem__(self, key, val):
        raise NotImplementedError("'__setitem__' not supported on frozendict")

    def __ior__(self, other):
        # without this override, ``d |= other`` goes through ``dict.__ior__``
        # and silently updates the dictionary in place
        raise NotImplementedError("'__ior__' not supported on frozendict")

    def clear(self):
        raise NotImplementedError("'clear' not supported on frozendict")

    def pop(self, key, default=None):
        raise NotImplementedError("'pop' not supported on frozendict")

    def popitem(self):
        raise NotImplementedError("'popitem' not supported on frozendict")

    def setdefault(self, key, default=None):
        raise NotImplementedError("'setdefault' not supported on frozendict")

    def update(self, *args, **kwargs):
        raise NotImplementedError("'update' not supported on frozendict")

    def __hash__(self) -> int:  # type: ignore
        # values go through ``freehash`` so that unhashable values (lists,
        # dicts, ...) do not prevent hashing the dictionary
        return hash(frozenset((key, freehash(val)) for key, val in self.items()))

986 

987 

class Collector(dict[K, tuple[T, ...]], typing.Generic[K, T]):
    """ A mapping from keys to tuples of values, implementing a relation.

    It behaves like a space-optimized ``defaultdict(tuple)``: reading a
    missing key gives ``()`` without storing it, and assigning an empty
    collection to a key removes that key.
    """
    __slots__ = ()

    def __getitem__(self, key: K) -> tuple[T, ...]:
        # missing keys read as the empty tuple, and are not stored
        return self.get(key, ())

    def __setitem__(self, key: K, val: Iterable[T]):
        items = tuple(val)
        if not items:
            # an empty relation is represented by the absence of the key
            super().pop(key, None)
            return
        super().__setitem__(key, items)

    def add(self, key: K, val: T):
        """ Append ``val`` to the values of ``key``, unless already present. """
        current = self[key]
        if val in current:
            return
        self[key] = (*current, val)

    def discard_keys_and_values(self, excludes: Collection[K | T]) -> None:
        """ Drop the elements of ``excludes`` both as keys and as values. """
        for excluded in excludes:
            self.pop(excluded, None)  # type: ignore
        # iterate on a snapshot of the keys: emptied entries get removed
        for key in list(self):
            self[key] = tuple(item for item in self[key] if item not in excludes)  # type: ignore

1014 

1015 

1016class StackMap(MutableMapping[K, T], typing.Generic[K, T]): 

1017 """ A stack of mappings behaving as a single mapping, and used to implement 

1018 nested scopes. The lookups search the stack from top to bottom, and 

1019 returns the first value found. Mutable operations modify the topmost 

1020 mapping only. 

1021 """ 

1022 __slots__ = ['_maps'] 

1023 

1024 def __init__(self, m: MutableMapping[K, T] | None = None): 

1025 self._maps = [] if m is None else [m] 

1026 

1027 def __getitem__(self, key: K) -> T: 

1028 for mapping in reversed(self._maps): 

1029 try: 

1030 return mapping[key] 

1031 except KeyError: 

1032 pass 

1033 raise KeyError(key) 

1034 

1035 def __setitem__(self, key: K, val: T): 

1036 self._maps[-1][key] = val 

1037 

1038 def __delitem__(self, key: K): 

1039 del self._maps[-1][key] 

1040 

1041 def __iter__(self) -> Iterator[K]: 

1042 return iter({key for mapping in self._maps for key in mapping}) 

1043 

1044 def __len__(self) -> int: 

1045 return sum(1 for key in self) 

1046 

1047 def __str__(self) -> str: 

1048 return f"<StackMap {self._maps}>" 

1049 

1050 def pushmap(self, m: MutableMapping[K, T] | None = None): 

1051 self._maps.append({} if m is None else m) 

1052 

1053 def popmap(self) -> MutableMapping[K, T]: 

1054 return self._maps.pop() 

1055 

1056 

class OrderedSet(MutableSet[T], typing.Generic[T]):
    """ A set that iterates over its elements in the order in which they
    were first inserted.
    """
    __slots__ = ['_map']

    def __init__(self, elems: Iterable[T] = ()):
        # the keys of a dict are unique and kept in insertion order
        self._map: dict[T, None] = dict.fromkeys(elems)

    def __contains__(self, elem):
        return elem in self._map

    def __iter__(self):
        return iter(self._map)

    def __len__(self):
        return len(self._map)

    def add(self, elem):
        # assigning an existing key does not change its position
        self._map[elem] = None

    def discard(self, elem):
        self._map.pop(elem, None)

    def update(self, elems):
        """ Add all the given elements, in their iteration order. """
        for elem in elems:
            self._map[elem] = None

    def difference_update(self, elems):
        """ Remove all the given elements that are in the set. """
        for unwanted in elems:
            self.discard(unwanted)

    def __repr__(self):
        return f'{type(self).__name__}({list(self)!r})'

    def intersection(self, *others):
        """ Return the successive ``&`` of this set with each of ``others``
        (the set itself when called without argument).
        """
        result = self
        for other in others:
            result = OrderedSet.__and__(result, other)
        return result

1091 

1092 

class LastOrderedSet(OrderedSet[T], typing.Generic[T]):
    """ A set that iterates over its elements in the order in which they
    were last inserted.
    """
    def add(self, elem):
        # dropping the element first makes the insertion below put it back
        # at the end, even when it was already in the set
        self.discard(elem)
        super().add(elem)

1098 

1099 

class Callbacks:
    """ A simple queue of callback functions. Running the queue calls every
    function in the order in which they were added, then empties the queue.

    ::

        callbacks = Callbacks()

        # add foo
        def foo():
            print("foo")

        callbacks.add(foo)

        # add bar
        @callbacks.add
        def bar():
            print("bar")

        # add foo again
        callbacks.add(foo)

        # call foo(), bar(), foo(), then clear the callback queue
        callbacks.run()

    The queue also holds a ``data`` dictionary that may be used freely, and
    is mostly meant for aggregating data for the callbacks. ``run()`` clears
    that dictionary once all the callback functions have been called.

    ::

        # register foo to process aggregated data
        @callbacks.add
        def foo():
            print(sum(callbacks.data['foo']))

        callbacks.data.setdefault('foo', []).append(1)
        ...
        callbacks.data.setdefault('foo', []).append(2)
        ...
        callbacks.data.setdefault('foo', []).append(3)

        # call foo(), which prints 6
        callbacks.run()

    Since ``data`` is shared by all callbacks, its keys should identify the
    stored data in a unique way. Strings structured like
    ``"{module}.{feature}"`` are recommended.
    """
    __slots__ = ['_funcs', 'data']

    def __init__(self):
        self._funcs: collections.deque[Callable] = collections.deque()
        self.data = {}

    def add(self, func: Callable) -> None:
        """ Add the given function at the end of the queue. """
        self._funcs.append(func)

    def run(self) -> None:
        """ Call all the functions (in addition order), then clear associated data.
        """
        pending = self._funcs
        # re-check the queue at each step: a callback may add callbacks
        while pending:
            pending.popleft()()
        self.clear()

    def clear(self) -> None:
        """ Remove all callbacks and data from self. """
        self._funcs.clear()
        self.data.clear()

    def __len__(self) -> int:
        # number of callbacks waiting in the queue
        return len(self._funcs)

1175 

1176 

class ReversedIterable(Reversible[T], typing.Generic[T]):
    """ A view on a reversible iterable that iterates over it backwards.

    Reversing the view gives back the elements in the original order.
    """
    __slots__ = ['iterable']

    def __init__(self, iterable: Reversible[T]):
        self.iterable = iterable

    def __iter__(self):
        # forward iteration on the view is backward iteration on the source
        source = self.iterable
        return reversed(source)

    def __reversed__(self):
        source = self.iterable
        return iter(source)

1189 

1190 

def groupby(iterable: Iterable[T], key: Callable[[T], K] = lambda arg: arg) -> Iterable[tuple[K, list[T]]]:
    """ Group the elements of ``iterable`` by the value of ``key(element)``,
    and return the collection of pairs ``(key, elements)``.

    Contrary to ``itertools.groupby``, all the elements sharing a key end up
    in the same group, whether they are consecutive or not.
    """
    buckets = {}
    for item in iterable:
        buckets.setdefault(key(item), []).append(item)
    return buckets.items()

1201 

1202 

def unique(it: Iterable[T]) -> Iterator[T]:
    """ "Uniquifier" for the provided iterable: yield each distinct element
    once, at the position of its first occurrence.

    The iterable's elements must be hashable.

    :param Iterable it:
    :rtype: Iterator
    """
    seen = set()
    remember = seen.add
    for elem in it:
        if elem in seen:
            continue
        remember(elem)
        yield elem

1217 

1218 

def submap(mapping: Mapping[K, T], keys: Iterable[K]) -> Mapping[K, T]:
    """
    Return a new dict with the entries of ``mapping`` whose key is in ``keys``.

    The result follows the iteration order of ``mapping``. Keys that are not
    in ``mapping`` are ignored.

    :param Mapping mapping: the original dict-like structure to filter
    :param Iterable keys: the keys to keep
    :return dict: a filtered dict copy of the original mapping
    """
    wanted = frozenset(keys)
    result = {}
    for key in mapping:
        if key in wanted:
            result[key] = mapping[key]
    return result

1229 

1230 

class Reverse(object):
    """ Wraps a value and reverses its ordering, useful in key functions when
    mixing ascending and descending sort on non-numeric data as the
    ``reverse`` parameter can not do piecemeal reordering.

    Comparisons are only defined between ``Reverse`` instances: comparing
    with anything else returns ``NotImplemented``, so equality is ``False``
    and ordering raises ``TypeError``.
    """
    __slots__ = ['val']

    def __init__(self, val):
        self.val = val

    def __eq__(self, other):
        if not isinstance(other, Reverse):
            return NotImplemented
        return self.val == other.val

    def __ne__(self, other):
        if not isinstance(other, Reverse):
            return NotImplemented
        return self.val != other.val

    # ordering operators are swapped on purpose: the wrapper is "greater"
    # when the wrapped value is smaller, and conversely

    def __ge__(self, other):
        if not isinstance(other, Reverse):
            return NotImplemented
        return self.val <= other.val

    def __gt__(self, other):
        if not isinstance(other, Reverse):
            return NotImplemented
        return self.val < other.val

    def __le__(self, other):
        if not isinstance(other, Reverse):
            return NotImplemented
        return self.val >= other.val

    def __lt__(self, other):
        if not isinstance(other, Reverse):
            return NotImplemented
        return self.val > other.val

1248 

class replace_exceptions(ContextDecorator):
    """
    Hide some exceptions behind another error. Can be used as a function
    decorator or as a context manager.

    .. code-block:: python

        @route('/super/secret/route', auth='public')
        @replace_exceptions(AccessError, by=NotFound())
        def super_secret_route(self):
            if not request.session.uid:
                raise AccessError("Route hidden to non logged-in users")
            ...

        def some_util():
            ...
            with replace_exceptions(ValueError, by=UserError("Invalid argument")):
                ...
            ...

    When ``by`` is an exception class rather than an instance, it is
    instantiated with the first argument of the caught exception, if any.
    The caught exception is set as the cause of the raised one.

    :param exceptions: the exception classes to catch and replace.
    :param by: the exception to raise instead.
    :raise ValueError: when no exception class is given.
    :raise TypeError: when one of ``exceptions`` is not an exception class.
    """
    def __init__(self, *exceptions, by):
        if not exceptions:
            raise ValueError("Missing exceptions")

        for exc in exceptions:
            if not issubclass(exc, Exception):
                raise TypeError(f"{exc} is not an exception class.")

        self.exceptions = exceptions
        self.by = by

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        if exc_type is None or not issubclass(exc_type, self.exceptions):
            # nothing raised, or an exception we must let through
            return None

        replacement = self.by
        if isinstance(replacement, type) and exc_value.args:
            # copy the message
            replacement = replacement(exc_value.args[0])
        raise replacement from exc_value

1293 

1294 

# alias of ``markupsafe.escape``, exposed under the name ``html_escape``
html_escape = markupsafe.escape

1296 

1297 

def get_lang(env: Environment, lang_code: str | None = None) -> LangData:
    """
    Retrieve the first installed language among, in this order: the given
    ``lang_code``, the language of the context, and the language of the
    partner of the user's company. When none of them is installed, fall back
    on English (``en_US``) if installed, or else on the first installed
    language.

    :param env:
    :param str lang_code: the locale (i.e. en_US)
    :return LangData: the first lang found that is installed on the system.
    """
    installed = [code for code, _name in env['res.lang'].get_installed()]
    chosen = 'en_US' if 'en_US' in installed else installed[0]
    if lang_code and lang_code in installed:
        chosen = lang_code
    else:
        context_lang = env.context.get('lang')
        if context_lang in installed:
            chosen = context_lang
        else:
            # only look at the company when the context gives nothing usable
            company_lang = env.user.with_context(lang='en_US').company_id.partner_id.lang
            if company_lang in installed:
                chosen = company_lang
    return env['res.lang']._get_data(code=chosen)

1317 

1318 

@lru_cache
def babel_locale_parse(lang_code: str | None) -> babel.Locale:
    """ Return the babel locale for ``lang_code``.

    When the code is empty or cannot be parsed, return babel's default
    locale, or the ``en_US`` locale if the default cannot be determined.
    Results are memoized by ``lru_cache``.
    """
    try:
        if lang_code:
            return babel.Locale.parse(lang_code)
    except Exception:  # noqa: BLE001
        # the code could not be parsed: use the fallbacks below
        pass
    try:
        locale = babel.Locale.default()
    except Exception:  # noqa: BLE001
        locale = babel.Locale.parse("en_US")
    return locale

1330 

1331 

def formatLang(
    env: Environment,
    value: float | typing.Literal[''],
    digits: int = 2,
    grouping: bool = True,
    dp: str | None = None,
    currency_obj: typing.Any | None = None,
    rounding_method: typing.Literal['HALF-UP', 'HALF-DOWN', 'HALF-EVEN', "UP", "DOWN"] = 'HALF-EVEN',
    rounding_unit: typing.Literal['decimals', 'units', 'thousands', 'lakhs', 'millions'] = 'decimals',
) -> str:
    """
    Format the number ``value`` according to the conventions of the language
    in use, optionally with a currency symbol.

    :param env: The environment.
    :param value: The value to be formatted; an empty string is returned as is.
    :param digits: The number of decimals digits.
    :param grouping: Usage of language grouping or not.
    :param dp: Name of the decimals precision to be used. This will override ``digits``
                   and ``currency_obj`` precision.
    :param currency_obj: Currency to be used. This will override ``digits`` precision.
    :param rounding_method: The rounding method to be used:
        **'HALF-UP'** will round to the closest number with ties going away from zero,
        **'HALF-DOWN'** will round to the closest number with ties going towards zero,
        **'HALF-EVEN'** will round to the closest number with ties going to the closest
        even number,
        **'UP'** will always round away from 0,
        **'DOWN'** will always round towards 0.
    :param rounding_unit: The rounding unit to be used:
        **decimals** will round to decimals with ``digits`` or ``dp`` precision,
        **units** will round to units without any decimals,
        **thousands** will round to thousands without any decimals,
        **lakhs** will round to lakhs without any decimals,
        **millions** will round to millions without any decimals.

    :returns: The value formatted.
    """
    # We don't want to return 0
    if value == '':
        return ''

    if rounding_unit != 'decimals':
        # every other unit is displayed without any decimals
        digits = 0
    elif dp:
        digits = env['decimal.precision'].precision_get(dp)
    elif currency_obj:
        digits = currency_obj.decimal_places

    divisors = {
        'decimals': 1,
        'thousands': 10**3,
        'lakhs': 10**5,
        'millions': 10**6,
        'units': 1,
    }
    value /= divisors[rounding_unit]

    rounded_value = float_round(value, precision_digits=digits, rounding_method=rounding_method)
    lang = env['res.lang'].browse(get_lang(env).id)
    formatted_value = lang.format(f'%.{digits}f', rounded_value, grouping=grouping)

    if not (currency_obj and currency_obj.symbol):
        return formatted_value

    # amount first when the symbol goes after it, symbol first otherwise
    parts = (formatted_value, NON_BREAKING_SPACE, currency_obj.symbol)
    if currency_obj.position != 'after':
        parts = parts[::-1]
    return '%s%s%s' % parts

1400 

1401 

def format_date(
    env: Environment,
    value: datetime.datetime | datetime.date | str,
    lang_code: str | None = None,
    date_format: str | typing.Literal[False] = False,
) -> str:
    """
    Format a date according to a language and an optional pattern.

    A falsy ``value``, or a string shorter than ``DATE_LENGTH``, gives an
    empty string. Strings longer than ``DATE_LENGTH`` and datetimes without
    ``tzinfo`` are treated as datetimes and go through
    ``Datetime.context_timestamp`` before being formatted.

    :param env: an environment.
    :param date, datetime or string value: the date to format.
    :param string lang_code: the lang code, if not specified it is extracted from the
        environment context.
    :param string date_format: the format of the date (LDML format), if not specified the
        default format of the lang.
    :return: date formatted in the specified format.
    :rtype: string
    """
    if not value:
        return ''
    from odoo.fields import Datetime  # noqa: PLC0415
    if isinstance(value, str):
        if len(value) < DATE_LENGTH:
            return ''
        has_time_part = len(value) > DATE_LENGTH
        value = Datetime.from_string(value)
        if has_time_part:
            # a datetime, convert to correct timezone
            value = Datetime.context_timestamp(env['res.lang'], value)
    elif isinstance(value, datetime.datetime) and not value.tzinfo:
        # a datetime, convert to correct timezone
        value = Datetime.context_timestamp(env['res.lang'], value)

    lang = get_lang(env, lang_code)
    locale = babel_locale_parse(lang.code)
    pattern = date_format or posix_to_ldml(lang.date_format, locale=locale)

    assert isinstance(value, datetime.date)  # datetime is a subclass of date
    return babel.dates.format_date(value, format=pattern, locale=locale)

1443 

1444 

def parse_date(env: Environment, value: str, lang_code: str | None = None) -> datetime.date | str:
    """
    Parse the date from a given format. If it is not a valid format for the
    localization, return the original string.

    :param env: an environment.
    :param string value: the date to parse.
    :param string lang_code: the lang code, if not specified it is extracted from the
        environment context.
    :return: date object from the localized string, or ``value`` itself when
        it cannot be parsed
    :rtype: datetime.date | str
    """
    lang = get_lang(env, lang_code)
    locale = babel_locale_parse(lang.code)
    try:
        return babel.dates.parse_date(value, locale=locale)
    except Exception:  # noqa: BLE001
        # best effort: any parsing error gives back the input, but
        # KeyboardInterrupt and SystemExit must not be swallowed
        return value

1463 

1464 

def format_datetime(
    env: Environment,
    value: datetime.datetime | str,
    tz: str | typing.Literal[False] = False,
    dt_format: str = 'medium',
    lang_code: str | None = None,
) -> str:
    """ Format a datetime, given in UTC, in a timezone and a language.

    The timezone is ``tz`` if given, otherwise the timezone of the user,
    otherwise UTC. If the conversion to that timezone fails, the datetime is
    formatted in UTC.

    :param env:
    :param str|datetime value: naive datetime to format either in string or in datetime
    :param str tz: name of the timezone in which the given datetime should be localized
    :param str dt_format: one of “full”, “long”, “medium”, or “short”, or a custom date/time pattern compatible with `babel` lib
    :param str lang_code: ISO code of the language to use to render the given datetime
    :rtype: str
    """
    if not value:
        return ''
    timestamp = value
    if isinstance(value, str):
        from odoo.fields import Datetime  # noqa: PLC0415
        timestamp = Datetime.from_string(value)

    tz_name = tz or env.user.tz or 'UTC'
    utc_datetime = pytz.utc.localize(timestamp, is_dst=False)
    try:
        localized_datetime = utc_datetime.astimezone(pytz.timezone(tz_name))
    except Exception:
        # the timezone could not be resolved or applied: stay in UTC
        localized_datetime = utc_datetime

    lang = get_lang(env, lang_code)

    locale = babel_locale_parse(lang.code or lang_code)  # lang can be inactive, so `lang`is empty
    if dt_format == 'medium' or not dt_format:
        # build the pattern from the date and time formats of the language
        date_format = posix_to_ldml(lang.date_format, locale=locale)
        time_format = posix_to_ldml(lang.time_format, locale=locale)
        dt_format = f'{date_format} {time_format}'

    # Babel allows to format datetime in a specific language without change locale
    # So month 1 = January in English, and janvier in French
    # Be aware that the default value for format is 'medium', instead of 'short'
    #     medium:  Jan 5, 2016, 10:20:31 PM |   5 janv. 2016 22:20:31
    #     short:   1/5/16, 10:20 PM         |   5/01/16 22:20
    # Formatting available here : http://babel.pocoo.org/en/latest/dates.html#date-fields
    return babel.dates.format_datetime(localized_datetime, dt_format, locale=locale)

1512 

1513 

def format_time(
    env: Environment,
    value: datetime.time | datetime.datetime | str,
    tz: str | typing.Literal[False] = False,
    time_format: str = 'medium',
    lang_code: str | None = None,
) -> str:
    """ Format the given time (hour, minute and second) with the current user preference (language, format, ...)

    A ``datetime.time`` value is formatted as is. A datetime (or a string
    converted with ``Datetime.from_string``) is localized as UTC and
    converted to the timezone ``tz`` (by default the timezone of the user,
    or UTC) before its time part is formatted.

    :param env:
    :param value: the time to format
    :type value: `datetime.time` instance. Could be timezoned to display tzinfo according to format (e.i.: 'full' format)
    :param tz: name of the timezone in which the given datetime should be localized
    :param time_format: one of “full”, “long”, “medium”, or “short”, or a custom time pattern
    :param lang_code: ISO

    :rtype: str
    """
    if not value:
        return ''

    if not isinstance(value, datetime.time):
        if isinstance(value, str):
            from odoo.fields import Datetime  # noqa: PLC0415
            value = Datetime.from_string(value)
        assert isinstance(value, datetime.datetime)
        tz_name = tz or env.user.tz or 'UTC'
        utc_datetime = pytz.utc.localize(value, is_dst=False)
        try:
            localized_time = utc_datetime.astimezone(pytz.timezone(tz_name)).timetz()
        except Exception:
            # the timezone could not be resolved or applied: stay in UTC
            localized_time = utc_datetime.timetz()
    else:
        localized_time = value

    lang = get_lang(env, lang_code)
    locale = babel_locale_parse(lang.code)
    if time_format == 'medium' or not time_format:
        # use the time format configured on the language
        time_format = posix_to_ldml(lang.time_format, locale=locale)

    return babel.dates.format_time(localized_time, format=time_format, locale=locale)

1556 

1557 

def _format_time_ago(
    env: Environment,
    time_delta: datetime.timedelta,
    lang_code: str | None = None,
    add_direction: bool = True,
) -> str:
    """ Format the opposite of ``time_delta`` with babel's ``format_timedelta``.

    Without ``lang_code``, the language is the one of the context if it is
    installed, otherwise the language of the partner of the user's company,
    otherwise the first installed language.
    """
    if not lang_code:
        langs: list[str] = [code for code, _name in env['res.lang'].get_installed()]
        ctx_lang = env.context.get('lang')
        lang_code = (
            ctx_lang if ctx_lang in langs
            else env.user.company_id.partner_id.lang or langs[0]
        )
    assert isinstance(lang_code, str)
    return babel.dates.format_timedelta(
        -time_delta,
        add_direction=add_direction,
        locale=babel_locale_parse(lang_code),
    )

1573 

1574 

def format_decimalized_number(number: float, decimal: int = 1) -> str:
    """Format a number to display to nearest metrics unit next to it.

    Do not display digits if all visible digits are null.
    Do not display units higher than "Tera" because most people don't know what
    a "Yotta" is.

    The unit is chosen on the rounded value, so that a number rounding up to
    1000 of a unit is displayed with the next unit (``1M``, not ``1000k``).

    ::

        >>> format_decimalized_number(123_456.789)
        '123.5k'
        >>> format_decimalized_number(123_000.789)
        '123k'
        >>> format_decimalized_number(-123_456.789)
        '-123.5k'
        >>> format_decimalized_number(0.789)
        '0.8'
        >>> format_decimalized_number(999_999)
        '1M'

    :param number: the number to format
    :param decimal: the number of decimals kept when rounding
    :return: the rounded number followed by its unit ('', k, M, G or T)
    """
    for unit in ['', 'k', 'M', 'G']:
        rounded = round(number, decimal)
        if abs(rounded) < 1000.0:
            return "%g%s" % (rounded, unit)
        number /= 1000.0
    return "%g%s" % (round(number, decimal), 'T')

1598 

1599 

def format_decimalized_amount(amount: float, currency=None) -> str:
    """Format an amount with its metric unit and, when a currency is given,
    with the symbol of that currency.

    The symbol is glued before the amount when the currency position is
    ``'before'``; otherwise it comes after the amount, separated by a space.

    ::

        >>> format_decimalized_amount(123_456.789, env.ref("base.USD"))
        $123.5k
    """
    amount_text = format_decimalized_number(amount)
    if not currency:
        return amount_text

    symbol_first = currency.position == 'before'
    symbol = currency.symbol or ''
    if symbol_first:
        return "%s%s" % (symbol, amount_text)
    return "%s %s" % (amount_text, symbol)

1618 

1619 

def format_amount(env: Environment, amount: float, currency, lang_code: str | None = None, trailing_zeroes: bool = True) -> str:
    """ Format ``amount`` in the given currency and language.

    The amount is rounded with ``currency.round`` and formatted with the
    number of decimals of the currency and the grouping of the language.
    The currency symbol is put before or after the amount depending on the
    currency position, separated by a no-break space.

    :param env: an environment.
    :param amount: the amount to format.
    :param currency: the currency of the amount; its ``decimal_places``,
        ``round``, ``position`` and ``symbol`` are used.
    :param lang_code: the lang code, passed to ``get_lang``.
    :param trailing_zeroes: when false, remove the zeroes at the end of the
        decimal part, and the decimal point itself if no decimal remains.
    :return: the formatted amount with the currency symbol.
    """
    fmt = "%.{0}f".format(currency.decimal_places)
    lang = env['res.lang'].browse(get_lang(env, lang_code).id)

    formatted_amount = lang.format(fmt, currency.round(amount), grouping=True)\
        .replace(r' ', u'\N{NO-BREAK SPACE}').replace(r'-', u'-\N{ZERO WIDTH NO-BREAK SPACE}')

    if not trailing_zeroes and lang.decimal_point:
        # Only the decimal part may lose zeroes: without a decimal point
        # (e.g. currencies with no decimals) "100" must not become "1".
        integer_part, separator, decimals = formatted_amount.rpartition(lang.decimal_point)
        if separator:
            decimals = decimals.rstrip('0')
            formatted_amount = (integer_part + separator + decimals) if decimals else integer_part

    pre = post = u''
    if currency.position == 'before':
        pre = u'{symbol}\N{NO-BREAK SPACE}'.format(symbol=currency.symbol or '')
    else:
        post = u'\N{NO-BREAK SPACE}{symbol}'.format(symbol=currency.symbol or '')

    return u'{pre}{0}{post}'.format(formatted_amount, pre=pre, post=post)

1637 

1638 

def format_duration(value: float) -> str:
    """ Format a number of hours, integral or fractional, as a human-readable
    time span ``HH:MM`` (e.g. 1.5 as "01:30"). Negative values are prefixed
    with a minus sign.
    """
    total_minutes = abs(value) * 60
    hours, minutes = divmod(total_minutes, 60)
    minutes = round(minutes)
    if minutes == 60:
        # rounding filled the hour: carry it over
        hours += 1
        minutes = 0
    sign = '-' if value < 0 else ''
    return '%s%02d:%02d' % (sign, hours, minutes)

1651 

1652 

# alias of ``hmac.compare_digest`` (the ``hmac`` module is imported as ``hmac_lib``)
consteq = hmac_lib.compare_digest

1654 

1655 

class ReadonlyDict(Mapping[K, T], typing.Generic[K, T]):
    """Helper for an unmodifiable dictionary, not even updatable using `dict.update`.

    This is similar to a `frozendict`, with one drawback and one advantage:

    - `dict.update` works for a `frozendict` but not for a `ReadonlyDict`.
    - `json.dumps` works for a `frozendict` by default but not for a `ReadonlyDict`.

    This comes from the fact `frozendict` inherits from `dict`
    while `ReadonlyDict` inherits from `collections.abc.Mapping`.

    So, depending on your needs,
    whether you absolutely must prevent the dictionary from being updated (e.g., for security reasons)
    or you require it to be supported by `json.dumps`, you can choose either option.

        E.g.
          data = ReadonlyDict({'foo': 'bar'})
          data['baz'] = 'xyz' # raises exception
          data.update({'baz': 'xyz'}) # raises exception
          dict.update(data, {'baz': 'xyz'}) # raises exception
    """
    __slots__ = ('_data__',)

    def __init__(self, data):
        # keep a private copy: later changes to ``data`` are not reflected
        self._data__ = dict(data)

    def __contains__(self, key: K):
        content = self._data__
        return key in content

    def __getitem__(self, key: K) -> T:
        content = self._data__
        return content[key]

    def __len__(self):
        content = self._data__
        return len(content)

    def __iter__(self):
        content = self._data__
        return iter(content)

1693 

1694 

class DotDict(dict):
    """Helper for dot.notation access to dictionary attributes

    Attributes that are not found the regular way are read from the keys of
    the dictionary: a missing key gives ``None``, and a ``dict`` value is
    returned wrapped in a ``DotDict``.

        E.g.
          foo = DotDict({'bar': False})
          return foo.bar
    """
    def __getattr__(self, attrib):
        val = self.get(attrib)
        if isinstance(val, dict):
            # wrap nested dictionaries so that dotted access can be chained
            return DotDict(val)
        return val

1705 

1706 

def get_diff(data_from, data_to, custom_style=False, dark_color_scheme=False):
    """
    Return, in an HTML table, the diff between two texts.

    :param tuple data_from: tuple(text, name), name will be used as table header
    :param tuple data_to: tuple(text, name), name will be used as table header
    :param str custom_style: style css including <style> tag, used instead of
        the default style when given.
    :param bool dark_color_scheme: true if dark color scheme is used
    :return: a string containing the diff in an HTML table format.
    """
    def handle_style(html_diff, custom_style, dark_color_scheme):
        """ The HtmlDiff lib will add some useful classes on the DOM to
        identify elements. Simply append to those classes some Bootstrap ones.
        For the table to fit the modal width, some custom style is needed.
        """
        to_append = {
            'diff_header': 'bg-600 text-light text-center align-top px-2',
            'diff_next': 'd-none',
        }

        def restyle_cell(match):
            tag = match[0]
            # drop the `nowrap` attribute entirely instead of leaving `=""` behind
            tag = re.sub(r'\s+nowrap(?:="nowrap")?', '', tag)
            for old, new in to_append.items():
                tag = tag.replace(old, "%s %s" % (old, new))
            return tag

        # Only rewrite the opening <td>/<th> tags generated by HtmlDiff: the
        # diffed text is HTML-escaped (no raw '<'), so its content can never be
        # matched here and words such as "nowrap" are kept intact in the diff.
        html_diff = re.sub(r'<t[dh]\b[^<>]*>', restyle_cell, html_diff)

        colors = ('#7f2d2f', '#406a2d', '#51232f', '#3f483b') if dark_color_scheme else (
            '#ffc1c0', '#abf2bc', '#ffebe9', '#e6ffec')
        html_diff += custom_style or '''
            <style>
                .modal-dialog.modal-lg:has(table.diff) {
                    max-width: 1600px;
                    padding-left: 1.75rem;
                    padding-right: 1.75rem;
                }
                table.diff { width: 100%%; }
                table.diff th.diff_header { width: 50%%; }
                table.diff td.diff_header { white-space: nowrap; }
                table.diff td.diff_header + td { width: 50%%; }
                table.diff td { word-break: break-all; vertical-align: top; }
                table.diff .diff_chg, table.diff .diff_sub, table.diff .diff_add {
                    display: inline-block;
                    color: inherit;
                }
                table.diff .diff_sub, table.diff td:nth-child(3) > .diff_chg { background-color: %s }
                table.diff .diff_add, table.diff td:nth-child(6) > .diff_chg { background-color: %s }
                table.diff td:nth-child(3):has(>.diff_chg, .diff_sub) { background-color: %s }
                table.diff td:nth-child(6):has(>.diff_chg, .diff_add) { background-color: %s }
            </style>
        ''' % colors
        return html_diff

    diff = HtmlDiff(tabsize=2).make_table(
        data_from[0].splitlines(),
        data_to[0].splitlines(),
        data_from[1],
        data_to[1],
        context=True,  # Show only diff lines, not all the code
        numlines=3,
    )
    return handle_style(diff, custom_style, dark_color_scheme)

1764 

1765 

def hmac(env, scope, message, hash_function=hashlib.sha256):
    """Compute HMAC with `database.secret` config parameter as key.

    The signed data is the ``repr`` of the ``(scope, message)`` pair, so the
    same message yields a different signature in each scope.

    :param env: sudo environment to use for retrieving config parameter
    :param scope: scope of the authentication, to have different signature for the same
        message in different usage
    :param message: message to authenticate
    :param hash_function: hash function to use for HMAC (default: SHA-256)
    :return: the hexadecimal digest of the HMAC
    :raise ValueError: if ``scope`` is empty
    """
    if not scope:
        raise ValueError('Non-empty scope required')

    secret = env['ir.config_parameter'].get_param('database.secret')
    signed_text = repr((scope, message))
    mac = hmac_lib.new(secret.encode(), signed_text.encode(), hash_function)
    return mac.hexdigest()

1785 

1786 

def hash_sign(env, scope, message_values, expiration=None, expiration_hours=None):
    """ Generate an urlsafe payload signed with the HMAC signature for an iterable set of data.
    This feature is very similar to JWT, but in a more generic implementation that is in line
    with our previous hmac implementation.

    The token is the unpadded urlsafe base64 encoding of: one version byte
    (``0x01``), the expiration timestamp on 8 little-endian bytes (0 meaning
    no expiration), the SHA-256 HMAC, and the JSON-encoded values.

    :param env: sudo environment to use for retrieving config parameter
    :param scope: scope of the authentication, to have different signature for the same
        message in different usage
    :param message_values: values to be encoded inside the payload
    :param expiration: optional, a datetime or timedelta
    :param expiration_hours: optional, an int representing a number of hours before expiration.
        Cannot be set at the same time as expiration
    :return: the payload that can be used as a token
    """
    assert not (expiration and expiration_hours)
    assert message_values is not None

    # normalize both relative forms of the expiration into an absolute datetime
    if expiration_hours:
        expiration = datetime.datetime.now() + datetime.timedelta(hours=expiration_hours)
    elif isinstance(expiration, datetime.timedelta):
        expiration = datetime.datetime.now() + expiration
    expiration_timestamp = int(expiration.timestamp()) if expiration else 0

    serialized = json.dumps(message_values)
    signature = hmac(env, scope, f'1:{serialized}:{expiration_timestamp}', hash_function=hashlib.sha256)
    token = b"".join((
        b"\x01",
        expiration_timestamp.to_bytes(8, 'little'),
        bytes.fromhex(signature),
        serialized.encode(),
    ))
    return base64.urlsafe_b64encode(token).decode().rstrip('=')

1812 

1813 

def verify_hash_signed(env, scope, payload):
    """ Verify and extract data from a given urlsafe payload generated with hash_sign()

    :param env: sudo environment to use for retrieving config parameter
    :param scope: scope of the authentication, to have different signature for the same
        message in different usage
    :param payload: the token to verify
    :return: The payload_values if the check was successful, None otherwise.
    :raise ValueError: if the token does not start with the known version byte
    """
    # the padding was stripped by hash_sign(); extra '=' are harmless when decoding
    token = base64.urlsafe_b64decode(payload.encode() + b'===')
    if token[:1] != b'\x01':
        raise ValueError('Unknown token version')

    # layout: version (1 byte) | expiration (8 bytes) | signature (32 bytes) | message
    raw_expiration, raw_signature, raw_message = token[1:9], token[9:41], token[41:]
    hash_value = raw_signature.hex()
    message = raw_message.decode()
    expiration_value = int.from_bytes(raw_expiration, byteorder='little')
    hash_value_expected = hmac(env, scope, f'1:{message}:{expiration_value}', hash_function=hashlib.sha256)

    if not consteq(hash_value, hash_value_expected):
        return None
    if expiration_value != 0 and datetime.datetime.now().timestamp() >= expiration_value:
        # the signature is valid but the token has expired
        return None
    return json.loads(message)

1837 

1838 

def limited_field_access_token(record, field_name, timestamp=None, *, scope):
    """Generate a token granting access to the given record and field_name in
    the given scope.

    The validity of the token is determined by the timestamp parameter.
    When it is not specified, a timestamp is automatically generated with a
    validity of at least 14 days. For a given record and field_name, the
    generated timestamp is deterministic within a 14-day period (even across
    different days/months/years) to allow browser caching, and expires after
    maximum 42 days to prevent infinite access. Different record/field
    combinations expire at different times to prevent thundering herd problems.

    :param record: the record to generate the token for
    :type record: class:`odoo.models.Model`
    :param field_name: the field name of record to generate the token for
    :type field_name: str
    :param scope: scope of the authentication, to have different signature for the same
        record/field in different usage
    :type scope: str
    :param timestamp: expiration timestamp of the token, or None to generate one
    :type timestamp: str, optional
    :return: the token, which includes the timestamp in hex format
    :rtype: string
    """
    record.ensure_one()
    if not timestamp:
        period = 1209600  # 2 weeks in seconds: 2 * 7 * 24 * 60 * 60
        identity = repr((record._name, record.id, field_name))
        period_start = int(time.time()) // period * period
        # spread the expirations over one extra period, based on a checksum
        # of the record/field identity (4294967295 is the adler32 maximum)
        checksum = zlib.adler32(identity.encode())
        jitter = period * checksum // 4294967295
        timestamp = hex(period_start + 2 * period + jitter)
    signature = hmac(record.env(su=True), scope, (record._name, record.id, field_name, timestamp))
    return f"{signature}o{timestamp}"

1873 

1874 

def verify_limited_field_access_token(record, field_name, access_token, *, scope):
    """Verify the given access_token grants access to field_name of record.
    In particular, the token must have the right format, must be valid for the
    given record, and must not have expired.

    :param record: the record to verify the token for
    :type record: class:`odoo.models.Model`
    :param field_name: the field name of record to verify the token for
    :type field_name: str
    :param access_token: the access token to verify
    :type access_token: str
    :param scope: scope of the authentication, to have different signature for the same
        record/field in different usage
    :return: whether the token is valid for the record/field_name combination at
        the current date and time
    :rtype: bool
    """
    # the expiration timestamp (in hex) is what follows the last "o" of the token
    *_, timestamp = access_token.rsplit("o", 1)
    expected_token = limited_field_access_token(record, field_name, timestamp, scope=scope)
    # Compare as bytes: `hmac.compare_digest` raises a TypeError when given a
    # `str` with non-ASCII characters, which an arbitrary token may contain.
    if not consteq(access_token.encode(), expected_token.encode()):
        return False
    # the timestamp is only parsed once the signature covering it is verified
    return datetime.datetime.now() < datetime.datetime.fromtimestamp(int(timestamp, 16))

1896 

1897 

# Splits a street into: the name, an optional number (a whitespace followed by
# a token starting with a digit) and an optional second number after " - ".
ADDRESS_REGEX = re.compile(r'^(.*?)(\s[0-9][0-9\S]*)?(?: - (.+))?$', flags=re.DOTALL)


def street_split(street):
    """Split a street into its name, number and secondary number.

    :param street: the street to split; a falsy value is treated as ``''``
    :return: a dict with the keys ``street_name``, ``street_number`` and
        ``street_number2``, where missing parts are empty strings
    """
    match = ADDRESS_REGEX.match(street or '')
    if match:
        name, number, number2 = match.groups('')
    else:
        name, number, number2 = '', '', ''
    return {
        'street_name': name.strip(),
        'street_number': number.strip(),
        'street_number2': number2,
    }

1907 

1908 

def is_list_of(values, type_: type) -> bool:
    """Tell whether ``values`` is a list / tuple containing only instances
    of the given type.

    :param values: The values to check
    :param type_: The type of the elements in the list / tuple
    """
    if not isinstance(values, (list, tuple)):
        return False
    for item in values:
        if not isinstance(item, type_):
            return False
    return True

1916 

1917 

def has_list_types(values, types: tuple[type, ...]) -> bool:
    """Tell whether ``values`` is a list / tuple whose elements are
    instances of the given types, position by position.

    :param values: The values to check
    :param types: The types of the elements in the list / tuple
    """
    if not isinstance(values, (list, tuple)) or len(values) != len(types):
        return False
    return all(isinstance(value, expected) for value, expected in zip(values, types))

1929 

1930 

def get_flag(country_code: str) -> str:
    """Get the emoji representing the flag linked to the country code.

    This emoji is composed of the two regional indicator emoji of the country code.

    :param country_code: the country code made of ASCII letters (e.g. ``'BE'``);
        lowercase letters are accepted and treated as their uppercase version
    :return: the regional indicator symbols corresponding to the code
    """
    # The regional indicator symbols U+1F1E6..U+1F1FF map to 'A'..'Z': adding
    # 165 (0xA5) to the ordinal of an uppercase letter gives the last two hex
    # digits of its symbol. The code is upper-cased first since lowercase
    # letters would give a value beyond the Unicode range.
    return "".join(
        chr(int(f"1f1{ord(c)+165:02x}", base=16))
        for c in country_code.upper()
    )

1937 

1938 

def format_frame(frame) -> str:
    """Describe a frame as ``<function name> <file name>:<line number>``.

    :param frame: a frame object, of which the code name, the code file name
        and the current line number are read
    """
    code, lineno = frame.f_code, frame.f_lineno
    return f'{code.co_name} {code.co_filename}:{lineno}'

1942 

1943 

def named_to_positional_printf(string: str, args: Mapping) -> tuple[str, tuple]:
    """ Convert a named printf-style format string with its arguments to an
    equivalent positional format string with its arguments.

    :param string: format string using ``%(name)s`` placeholders
    :param args: mapping providing the value of each placeholder
    :return: the format string using ``%s`` placeholders, and the values in
        the order of their placeholders
    """
    collector = _PrintfArgs(args)
    # double the escaped percent signs so that they are still escaped after
    # the formatting below, which consumes one level of escaping
    escaped = string.replace('%%', '%%%%')
    positional = escaped % collector
    return positional, tuple(collector.values)


class _PrintfArgs:
    """ Helper object to turn a named printf-style format string into a positional one.

    It is meant to be used as the right operand of the ``%`` operator: each
    named lookup records the corresponding value and returns ``"%s"``.
    """
    __slots__ = ('mapping', 'values')

    def __init__(self, mapping):
        self.mapping: Mapping = mapping
        self.values: list = []

    def __getitem__(self, key):
        value = self.mapping[key]
        self.values.append(value)
        return "%s"

1962 return "%s"