Coverage for adhoc-cicd-odoo-odoo / odoo / tests / common.py: 27%

1587 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-09 18:05 +0000

1# -*- coding: utf-8 -*- 

2""" 

3The module :mod:`odoo.tests.common` provides unittest test cases and a few 

4helpers and classes to write tests. 

5 

6""" 

7from __future__ import annotations 

8 

9import base64 

10import binascii 

11import concurrent.futures 

12import contextlib 

13import difflib 

14import importlib 

15import inspect 

16import itertools 

17import json 

18import logging 

19import os 

20import pathlib 

21import platform 

22import pprint 

23import psutil 

24import re 

25import shutil 

26import signal 

27import subprocess 

28import sys 

29import tempfile 

30import threading 

31import time 

32import traceback 

33import unittest 

34import warnings 

35from collections import defaultdict, deque 

36from concurrent.futures import CancelledError, Future, InvalidStateError, wait 

37from contextlib import contextmanager, ExitStack 

38from copy import deepcopy 

39from datetime import datetime 

40from functools import lru_cache, partial, wraps 

41from itertools import islice, zip_longest 

42from textwrap import shorten 

43from typing import Optional, Iterable, cast 

44from unittest import TestResult 

45from unittest.mock import patch, _patch, Mock 

46from urllib.parse import parse_qsl, urlencode, urljoin, urlsplit, urlunsplit 

47from xmlrpc import client as xmlrpclib 

48from uuid import uuid4 

49from werkzeug.exceptions import BadRequest 

50 

51import freezegun 

52import requests 

53from lxml import etree, html 

54from passlib.context import CryptContext 

55from requests import PreparedRequest, Session 

56 

57import odoo.addons.base 

58import odoo.cli 

59import odoo.http 

60import odoo.models 

61import odoo.orm.registry 

62from odoo import api 

63from odoo.exceptions import AccessError 

64from odoo.fields import Command 

65from odoo.modules.registry import Registry, DummyRLock 

66from odoo.service import security 

67from odoo.sql_db import Cursor, Savepoint 

68from odoo.tools import config, float_compare, mute_logger, profiler, SQL, DotDict 

69from odoo.tools.mail import single_email_re 

70from odoo.tools.misc import find_in_path, lower_logging 

71from odoo.tools.xml_utils import _validate_xml 

72from odoo.addons.base.models import ir_actions_report 

73 

74from . import case, test_cursor 

75from .result import OdooTestResult 

76 

# ``websocket`` is an optional dependency: when it cannot be imported the name
# is bound to None so that callers can test for its availability.
try:
    import websocket
except ImportError:
    # chrome headless tests will be skipped
    websocket = None

82 

_logger = logging.getLogger(__name__)
# Importing this module from a regular server run ('server' / 'start' command
# without ``test_enable``) is reported as an error, with the stack of the
# import; otherwise the import is only logged at info level (the stack is
# attached when debug logging is enabled).
if odoo.cli.COMMAND in ('server', 'start') and not config['test_enable']:
    _logger.error(
        "Importing test framework"
        ", avoid importing from business modules and when not running in test mode",
        stack_info=True,
    )
else:
    _logger.info("Importing test framework", stack_info=_logger.isEnabledFor(logging.DEBUG))

92 

93 

94# backward compatibility: Form was defined in this file 

def __getattr__(name):
    """Module-level attribute hook keeping ``Form`` reachable from this module.

    ``Form`` is imported lazily from ``.form`` and returned after emitting a
    ``DeprecationWarning``; any other name raises ``AttributeError``.
    """
    # pylint: disable=import-outside-toplevel
    if name == 'Form':
        from .form import Form

        warnings.warn(
            "Since 18.0: odoo.tests.common.Form is deprecated, use odoo.tests.Form",
            category=DeprecationWarning,
            stacklevel=2,
        )
        return Form

    raise AttributeError(name)

108 

109 

# The odoo library is supposed already configured.
HOST = '127.0.0.1'
# Useless constant, tests are aware of the content of demo data
ADMIN_USER_ID = api.SUPERUSER_ID

CHECK_BROWSER_SLEEP = 0.1  # seconds
CHECK_BROWSER_ITERATIONS = 100
# Product of the two values above, i.e. 10 seconds.
BROWSER_WAIT = CHECK_BROWSER_SLEEP * CHECK_BROWSER_ITERATIONS  # seconds
DEFAULT_SUCCESS_SIGNAL = 'test successful'
TEST_CURSOR_COOKIE_NAME = 'test_request_key'

# Bound ``search`` of a case-insensitive verbose pattern: calling
# ``IGNORED_MSGS(text)`` returns a match object when ``text`` contains one of
# the alternatives below, None otherwise.
IGNORED_MSGS = re.compile(r"""
    failed\ to\ fetch                   # base error
  | connectionlosterror:                # conversion by offlineFailToFetchErrorHandler
  | assetsloadingerror:                 # lazy loaded bundle
""", flags=re.VERBOSE | re.IGNORECASE).search

126 

def get_db_name():
    """Return the name of the database to run against.

    The ``db_name`` configuration entry is used when set; it must then hold
    exactly one name, otherwise the process exits. When it is not set, the
    ``dbname`` attribute of the current thread is returned if present.
    """
    configured = odoo.tools.config['db_name']
    if not configured:
        # If the database name is not provided on the command-line,
        # use the one on the thread (which means if it is provided on
        # the command-line, this will break when installing another
        # database from XML-RPC).
        thread = threading.current_thread()
        if hasattr(thread, 'dbname'):
            return thread.dbname
    if len(configured) > 1:
        sys.exit("-d/--database/db_name has multiple database, please provide a single one")
    return configured[0]

138 

139 

# Standalone test functions grouped by key (odoo module name, custom tag or
# 'all'); filled by the ``standalone`` decorator.
standalone_tests = defaultdict(list)

141 

142 

class RegistryRLock(threading._RLock):
    """Pure-python re-entrant lock giving read access to its private counter."""

    # Expose private attribute
    count = property(
        lambda self: self._count,
        doc="Value of the lock's private ``_count`` attribute.",
    )

147 

148 

# The lock should only be released when new test cursors are meant to be opened.
# Further filtering on cursors can be done by extending `assertCanOpenTestCursor`.
# It is acquired right away, at import time, by the importing thread.
_registry_test_lock = RegistryRLock()
_registry_test_lock.acquire()

153 

154 

@contextmanager
def release_test_lock():
    """Release the registry test lock for the duration of the ``with`` block.

    The lock is re-acquired when the block ends, whether it succeeded or not.
    If it cannot be re-acquired within 60 seconds, the process is terminated
    with a message naming the current test.

    :raise SystemExit: when the lock could not be re-acquired in time
    """
    try:
        _registry_test_lock.release()
        yield
    finally:
        if not _registry_test_lock.acquire(timeout=60):
            tag = odoo.modules.module.current_test.canonical_tag
            # sys.exit rather than the ``exit`` helper: the latter is injected
            # by the ``site`` module for interactive use and may be missing.
            sys.exit(f'Could not re-acquire the registry lock during {tag}, exiting...')

165 

166 

def standalone(*tags):
    """ Decorator for standalone test functions.  This is somewhat dedicated to
    tests that install, upgrade or uninstall some modules, which is currently
    forbidden in regular test cases.  The function is registered under the given
    ``tags``, under ``'all'`` and, when it is defined in ``odoo.addons.<module>``,
    under that Odoo module name. The function itself is returned unchanged.
    """
    def register(func):
        origin = func.__module__
        # register func by odoo module name
        if origin.startswith('odoo.addons.'):
            standalone_tests[origin.split('.')[2]].append(func)
        # register func with arbitrary name, if any, then under 'all'
        for key in (*tags, 'all'):
            standalone_tests[key].append(func)
        return func

    return register

185 

186 

def test_xsd(url=None, path=None, skip=False):
    """Decorator validating the XML returned by a test method.

    Unless ``skip`` is truthy, the decorated method is called and its result is
    passed to ``_validate_xml`` along with ``self.env``, ``url`` and ``path``.
    When ``skip`` is truthy the decorated method is not called at all.
    The wrapper always returns ``None``.

    :param url: forwarded as-is to ``_validate_xml``
    :param path: forwarded as-is to ``_validate_xml``
    :param skip: when truthy, turn the decorated method into a no-op
    """
    def decorator(func):
        # wraps() keeps the name and docstring of the test on the wrapper
        @wraps(func)
        def wrapped_f(self, *args, **kwargs):
            if not skip:
                xmls = func(self, *args, **kwargs)
                _validate_xml(self.env, url, path, xmls)
        return wrapped_f
    return decorator

195 

196 

def new_test_user(env, login='', groups='base.group_user', context=None, **kwargs):
    """ Create and return a ``res.users`` record for tests.

    The user is described by its ``login`` and by ``groups``, a comma separated
    list of xml ids. Remaining keyword arguments are passed to ``create`` to
    customize the user further. ``context``, when given, is applied with
    ``with_context`` on the model before creating the record.

    Some values are filled automatically when not provided:

    * group_ids: the ``group_ids`` keyword if truthy, else the ids of ``groups``;
    * name: "login (groups)";
    * password: the login, right-padded with 'x' up to 8 characters;
    * email: the login if it matches ``single_email_re``, else
      'x.x@example.com' (x being the first letter of the login);
    * company_ids: a link to ``company_id`` when only the latter is given.

    :raise ValueError: if ``login`` or ``groups`` is empty
    """
    if not login:
        raise ValueError('New users require at least a login')
    if not groups:
        raise ValueError('New users require at least user groups')
    if context is None:
        context = {}

    group_id_list = kwargs.pop('group_ids', False)
    if not group_id_list:
        group_id_list = [env.ref(xmlid.strip()).id for xmlid in groups.split(',')]
    create_values = dict(kwargs, login=login, group_ids=[Command.set(group_id_list)])

    # automatically generate a name as "Login (groups)" to ease user comprehension
    if not create_values.get('name'):
        create_values['name'] = '%s (%s)' % (login, groups)
    # automatically give a password derived from the login
    if not create_values.get('password'):
        create_values['password'] = login.ljust(8, 'x')
    # generate email if not given as most test require an email
    if 'email' not in create_values:
        if single_email_re.match(login):
            generated_email = login
        else:
            generated_email = '%s.%s@example.com' % (login[0], login[0])
        create_values['email'] = generated_email
    # ensure company_id + allowed company constraint works if not given at create
    if 'company_id' in create_values and 'company_ids' not in create_values:
        create_values['company_ids'] = [(4, create_values['company_id'])]

    users = env['res.users'].with_context(**context)
    return users.create(create_values)

242 

def loaded_demo_data(env):
    """Return True when ``base.user_demo`` resolves to a truthy record in ``env``."""
    demo_user = env.ref('base.user_demo', raise_if_not_found=False)
    return bool(demo_user)

245 

class RecordCapturer:
    """Context manager collecting the records that appear during its block.

    The ``model`` is searched with ``domain`` (ordered by id) on entry and,
    unless the block raised, on exit; ``records`` is the difference.
    """

    def __init__(self, model, domain=None):
        self._model = model
        self._domain = domain or []

    def _snapshot(self):
        # current matches of the domain, ordered by id
        return self._model.search(self._domain, order='id')

    def __enter__(self):
        self._before = self._snapshot()
        self._after = None
        return self

    def __exit__(self, exc_type, exc_value, exc_traceback):
        if exc_type is not None:
            # the block failed: nothing is frozen
            return
        self._after = self._snapshot() - self._before

    @property
    def records(self):
        """Records captured so far.

        The frozen result once the block exited without error; otherwise it is
        recomputed from a fresh search on every access.
        """
        if self._after is not None:
            return self._after
        return self._snapshot() - self._before

265 

266 

267def _enter_context(cm, addcleanup): 

268 # We look up the special methods on the type to match the with 

269 # statement. 

270 cls = type(cm) 

271 try: 

272 enter = cls.__enter__ 

273 exit = cls.__exit__ 

274 except AttributeError: 

275 raise TypeError(f"'{cls.__module__}.{cls.__qualname__}' object does " 

276 f"not support the context manager protocol") from None 

277 result = enter(cm) 

278 addcleanup(exit, cm, None, None, None) 

279 return result 

280 

281 

282def _normalize_arch_for_assert(arch_string, parser_method="xml"): 

283 """Takes some xml and normalize it to make it comparable to other xml 

284 in particular, blank text is removed, and the output is pretty-printed 

285 

286 :param str arch_string: the string representing an XML arch 

287 :param str parser_method: an string representing which lxml.Parser class to use 

288 when normalizing both archs. Takes either "xml" or "html" 

289 :return: the normalized arch 

290 :rtype str: 

291 """ 

292 Parser = None 

293 if parser_method == 'xml': 

294 Parser = etree.XMLParser 

295 elif parser_method == 'html': 

296 Parser = etree.HTMLParser 

297 parser = Parser(remove_blank_text=True) 

298 arch_string = etree.fromstring(arch_string, parser=parser) 

299 return etree.tostring(arch_string, pretty_print=True, encoding='unicode') 

300 

class BlockedRequest(requests.exceptions.ConnectionError):
    """Connection error raised for HTTP requests blocked by the test framework."""

# Reference to ``requests.Session.send`` taken at import time; used by
# ``BaseCase._request_handler`` to actually send the requests it lets through.
_super_send = requests.Session.send

304class BaseCase(case.TestCase): 

305 """ Subclass of TestCase for Odoo-specific code. This class is abstract and 

306 expects self.registry, self.cr and self.uid to be initialized by subclasses. 

307 """ 

# Placeholders: per the class docstring, subclasses are expected to
# initialize these before they are used.
registry: Registry = None
env: api.Environment = None
cr: Cursor = None

def __init_subclass__(cls):
    """Give default tags to test classes defined in addons.

    For subclasses whose module starts with ``odoo.addons.``: ``test_tags``
    defaults to ``{'standard', 'at_install'}`` when it is missing or None, and
    ``test_module`` is set to the addon name (third component of the module
    path). Other subclasses are left untouched.
    """
    super().__init_subclass__()
    module_path = cls.__module__
    if not module_path.startswith('odoo.addons.'):
        return
    if getattr(cls, 'test_tags', None) is None:
        cls.test_tags = {'standard', 'at_install'}
    cls.test_module = module_path.split('.')[2]

321 

longMessage = True  # more verbose error message by default: https://www.odoo.com/r/Vmh
warm = True  # False during warm-up phase (see :func:`warmup`)
_python_version = sys.version_info

# Maximum number of runs of a test by ``run``: one, plus the number of
# retries given by the ODOO_TEST_FAILURE_RETRIES environment variable.
_tests_run_count = int(os.environ.get('ODOO_TEST_FAILURE_RETRIES', 0)) + 1

_registry_patched = False
# Combined with the ``readonly`` flag of the patched registry cursor in
# ``_registry_test_mode_patches``: False forces non-readonly test cursors.
_registry_readonly_enabled = True
test_cursor_lock_timeout: int = 20  # presumably seconds -- TODO confirm

331 

def __init__(self, methodName='runTest'):
    """Register tree comparison for lxml elements and collect per-method tags.

    ``assertTreesEqual`` becomes the equality function for ``etree._Element``
    and ``html.HtmlElement``. Unless ``methodName`` is ``'runTest'``, the tags
    returned by ``get_method_additional_tags`` for the test method are added to
    a per-instance copy of ``test_tags``.
    """
    super().__init__(methodName)
    for element_type in (etree._Element, html.HtmlElement):
        self.addTypeEqualityFunc(element_type, self.assertTreesEqual)
    if methodName == 'runTest':
        return
    inherited_tags = self.test_tags
    method_tags = self.get_method_additional_tags(getattr(self, methodName))
    self.test_tags = inherited_tags | set(method_tags)

338 

339 

@classmethod
def _request_handler(cls, s: Session, r: PreparedRequest, /, **kw):
    """Replacement for ``Session.send`` that only lets local requests through.

    Requests to ``HOST`` or ``localhost`` and ``file`` URLs are sent with the
    original ``Session.send``; anything else is logged and blocked. Truthy
    timeouts below 10 are raised to 10 first; this applies to a single number
    as well as to each item of a ``(connect, read)`` tuple.

    :param s: the session sending the request
    :param r: the prepared request
    :param kw: keyword arguments of ``Session.send``
    :raise BlockedRequest: for requests that are not allowed
    """
    # allow localhost requests
    # TODO: also check port?
    url = urlsplit(r.url)
    timeout = kw.get('timeout')
    if timeout:
        if isinstance(timeout, tuple):
            # requests also accepts a (connect, read) tuple whose items may
            # be None: comparing the tuple itself with 10 would raise
            increased = tuple(10 if item and item < 10 else item for item in timeout)
        else:
            increased = 10 if timeout < 10 else timeout
        if increased != timeout:
            _logger.getChild('requests').info('request %s with timeout %s increased to 10s during tests', url, timeout)
            kw['timeout'] = increased
    if url.hostname in (HOST, 'localhost'):
        return _super_send(s, r, **kw)
    if url.scheme == 'file':
        return _super_send(s, r, **kw)

    _logger.getChild('requests').info(
        "Blocking un-mocked external HTTP request %s %s", r.method, r.url)
    raise BlockedRequest(f"External requests verboten (was {r.method} {r.url})")

357 

def run(self, result: OdooTestResult) -> None:
    """Run the test, retrying it when auto-retry is enabled.

    The test is run at most ``_tests_run_count`` times, or once when ``_retry``
    is falsy on the test method or on the instance. Every attempt but the last
    runs under ``result.soft_fail()`` with lowered logging, and the loop stops
    after the first attempt without failure nor error log. When the last
    attempt leaves ``result`` unsuccessful, auto-retry is disabled on
    ``BaseCase`` for the following tests.
    """
    testMethod = getattr(self, self._testMethodName)

    retry_enabled = getattr(testMethod, '_retry', True) and getattr(self, '_retry', True)
    if retry_enabled:
        tests_run_count = self._tests_run_count
    else:
        tests_run_count = 1
        _logger.info('Auto retry disabled for %s', self)

    last_attempt = tests_run_count - 1
    for attempt in range(tests_run_count):
        result.had_failure = False  # reset in case of retry without soft_fail
        if attempt:
            _logger.runbot(f'Retrying a failed test: {self}')
        if attempt < last_attempt:
            with warnings.catch_warnings(), \
                    result.soft_fail(), \
                    lower_logging(25, logging.INFO) as quiet_log:
                super().run(cast(TestResult, result))
            if not (result.had_failure or quiet_log.had_error_log):
                break
            continue
        # last try
        super().run(cast(TestResult, result))
        if not result.wasSuccessful() and BaseCase._tests_run_count != 1:
            _logger.runbot('Disabling auto-retry after a failed test')
            BaseCase._tests_run_count = 1

383 

@classmethod
def setUpClass(cls):
    """Register class-level safety cleanups and block external HTTP requests.

    Two class cleanups are registered: one terminating the direct child
    processes still alive, one stopping the mock patchers still active. For
    classes tagged ``standard`` or ``click_all``, ``Session.send`` is patched
    to go through ``_request_handler`` until the class is torn down.
    """
    def check_remaining_processes():
        current_process = psutil.Process()
        children = current_process.children(recursive=False)
        for child in children:
            _logger.warning('A child process was found, terminating it: %s', child)
            child.terminate()
        psutil.wait_procs(children, timeout=10)  # mainly to avoid a zombie process that would be logged again at the end.
    cls.addClassCleanup(check_remaining_processes)

    def check_remaining_patchers():
        # stop() removes the patcher from ``_active_patches``: work on a
        # snapshot so that no patcher is skipped, and stop the most recent
        # first, as ``patch.stopall`` does.
        for patcher in reversed(list(_patch._active_patches)):
            _logger.warning("A patcher (targeting %s.%s) was remaining active at the end of %s, disabling it...", patcher.target, patcher.attribute, cls.__name__)
            patcher.stop()
    cls.addClassCleanup(check_remaining_patchers)
    super().setUpClass()
    if 'standard' in cls.test_tags or 'click_all' in cls.test_tags:
        # if the method is passed directly `patch` discards the session
        # object which we need
        # pylint: disable=unnecessary-lambda
        patcher = patch.object(
            requests.sessions.Session,
            'send',
            lambda s, r, **kwargs: cls._request_handler(s, r, **kwargs),
        )
        patcher.start()
        cls.addClassCleanup(patcher.stop)

412 

def setUp(self):
    """Reset the per-test HTTP request settings after the parent set-up."""
    super().setUp()
    # both attributes start from their neutral value for every test
    self.http_request_key: str = ''
    self.http_request_allow_all: bool = False

417 

def cursor(self):
    """Return the result of ``self.registry.cursor()``."""
    registry = self.registry
    return registry.cursor()

420 

@property
def uid(self):
    """ Current user id, read from the test's environment. """
    return self.env.uid

@uid.setter
def uid(self, user):
    """ Switch the test's environment to ``user``. """
    switched_env = self.env(user=user)
    self.env = switched_env
    # set the updated environment as the default one
    self.env.transaction.default_env = self.env

432 

def ref(self, xid):
    """ Return the database id of the record designated by an
    :term:`external identifier`; this is the ``id`` of ``browse_ref(xid)``.

    :param xid: fully-qualified :term:`external identifier`, in the form
                :samp:`{module}.{identifier}`
    :raise: ValueError if not found
    :returns: registered id
    """
    record = self.browse_ref(xid)
    return record.id

443 

def browse_ref(self, xid):
    """ Return the record designated by an :term:`external identifier`,
    as given by ``self.env.ref``.

    :param xid: fully-qualified :term:`external identifier`, in the form
                :samp:`{module}.{identifier}`
    :raise: ValueError if not found
    :returns: :class:`~odoo.models.BaseModel`
    """
    # a qualified identifier holds at least one dot
    assert "." in xid, "this method requires a fully qualified parameter, in the following form: 'module.identifier'"
    environment = self.env
    return environment.ref(xid)

455 

def patch(self, obj, key, val):
    """ Replace attribute ``key`` of ``obj`` by ``val`` until the end of the
    test: the patch is started right away and stopped by a test cleanup.
    """
    attribute_patcher = patch.object(obj, key, val)  # this is unittest.mock.patch
    attribute_patcher.start()
    self.addCleanup(attribute_patcher.stop)

461 

@classmethod
def classPatch(cls, obj, key, val):
    """ Replace attribute ``key`` of ``obj`` by ``val`` for the whole class:
    the patch is started right away and stopped by a class cleanup.
    """
    attribute_patcher = patch.object(obj, key, val)  # this is unittest.mock.patch
    attribute_patcher.start()
    cls.addClassCleanup(attribute_patcher.stop)

468 

def startPatcher(self, patcher):
    """Start ``patcher``, schedule its ``stop`` as a test cleanup and return
    what ``start()`` returned.
    """
    started = patcher.start()
    self.addCleanup(patcher.stop)
    return started

473 

@classmethod
def startClassPatcher(cls, patcher):
    """Start ``patcher``, schedule its ``stop`` as a class cleanup and return
    what ``start()`` returned.
    """
    started = patcher.start()
    cls.addClassCleanup(patcher.stop)
    return started

479 

def enterContext(self, cm):
    """Enter the context manager ``cm`` for the duration of the test.

    Its ``__exit__`` is registered as a test cleanup once ``__enter__``
    succeeded, and the result of ``__enter__`` is returned.
    """
    register_cleanup = self.addCleanup
    return _enter_context(cm, register_cleanup)

487 

@classmethod
def enterClassContext(cls, cm):
    """Enter ``cm`` for the whole class: like enterContext, but its
    ``__exit__`` is registered as a class cleanup.
    """
    register_cleanup = cls.addClassCleanup
    return _enter_context(cm, register_cleanup)

492 

@contextmanager
def with_user(self, login):
    """ Run the ``with`` block as the user having the given ``login``.

    ``self.uid`` and ``self.env`` are switched to that user and restored at
    the end of the block, even if it raised. An ``AssertionError`` is raised
    when no user matches ``login``.
    """
    previous_uid, previous_env = self.uid, self.env
    try:
        matching_user = self.env['res.users'].sudo().search([('login', '=', login)])
        assert matching_user, "Login %s not found" % login
        # switch user
        self.uid = matching_user.id
        self.env = self.env(user=self.uid)
        yield
    finally:
        # back
        self.uid = previous_uid
        self.env = previous_env

509 

@contextmanager
def debug_mode(self):
    """ Enable the effects of debug mode (in particular for group ``base.group_no_one``).

    A mocked request whose session has ``debug='1'`` is pushed on the request
    stack for the duration of the block. The environment is flushed and
    invalidated before the block, and again after it when it succeeded.

    :raise Exception: if the request popped at the end is not the mocked one
    """
    request = Mock(
        httprequest=Mock(host='localhost'),
        db=self.env.cr.dbname,
        env=self.env,
        session=DotDict(odoo.http.get_default_session(), debug='1'),
    )
    self.env.flush_all()
    self.env.invalidate_all()
    # Push right before the ``try``: the ``finally`` clause must only pop a
    # request that was actually pushed, otherwise a failure of the flush above
    # would pop someone else's request and hide the original error.
    odoo.http._request_stack.push(request)
    try:
        yield
        self.env.flush_all()
        self.env.invalidate_all()
    finally:
        popped_request = odoo.http._request_stack.pop()
        if popped_request is not request:
            raise Exception('Wrong request stack cleanup.')

530 

@contextmanager
def _assertRaises(self, exception, *, msg=None):
    """ Context manager that clears the environment upon failure.

    When ``self.env`` is set, the block runs inside a savepoint of the
    environment's cursor, handed over to the same exit stack as the standard
    ``assertRaises`` context (see the comment below).

    :param exception: an exception class, or a tuple of exception classes
    :param msg: forwarded to the standard ``assertRaises``
    :return: yields the context of the standard ``assertRaises``
    """
    with ExitStack() as init:
        if self.env:
            init.enter_context(self.env.cr.savepoint())
            # ``exception`` may be a tuple of classes, which issubclass()
            # rejects as first argument: check each class on its own
            expected = exception if isinstance(exception, tuple) else (exception,)
            if any(issubclass(exc_class, AccessError) for exc_class in expected):
                # The savepoint() above calls flush(), which leaves the
                # record cache with lots of data.  This can prevent
                # access errors to be detected. In order to avoid this
                # issue, we clear the cache before proceeding.
                self.env.cr.clear()

        with ExitStack() as inner:
            cm = inner.enter_context(super().assertRaises(exception, msg=msg))
            # *moves* the cleanups from init to inner, this ensures the
            # savepoint gets rolled back when `yield` raises `exception`,
            # but still allows the initialisation to be protected *and* not
            # interfered with by `assertRaises`.
            inner.push(init.pop_all())

            yield cm

553 

def assertRaises(self, exception, func=None, *args, **kwargs):
    """Check that ``exception`` is raised, through ``_assertRaises``.

    Without ``func``, return the ``_assertRaises`` context manager, built with
    the keyword arguments. With ``func``, call ``func(*args, **kwargs)`` inside
    that context and return ``None``.
    """
    if not func:
        return self._assertRaises(exception, **kwargs)
    with self._assertRaises(exception):
        func(*args, **kwargs)

560 

def _patchExecute(self, actual_queries, flush=True):
    """Generator recording the queries executed while it is suspended.

    ``Cursor.execute`` is replaced by a wrapper appending each query (the
    ``code`` of ``SQL`` objects, the query itself otherwise) to
    ``actual_queries`` before running it, and the registry's ``unaccent`` is
    replaced by the identity. ``actual_queries`` is yielded once. When
    ``flush`` is set, the environment and cursor are flushed before patching,
    and again after the yield while the patches are still active.
    """
    original_execute = Cursor.execute

    def execute(self, query, params=None, log_exceptions=None):
        recorded = query.code if isinstance(query, SQL) else query
        actual_queries.append(recorded)
        return original_execute(self, query, params, log_exceptions)

    def flush_if_requested():
        if flush:
            self.env.flush_all()
            self.env.cr.flush()

    flush_if_requested()

    with (
        patch('odoo.sql_db.Cursor.execute', execute),
        patch.object(self.env.registry, 'unaccent', lambda x: x),
    ):
        yield actual_queries
        flush_if_requested()

580 

@contextmanager
def assertQueries(self, expected, flush=True):
    """ Check the queries made by the current cursor. ``expected`` is a list
    of strings representing the expected queries being made. Query strings
    are matched against each other, ignoring case and whitespaces.

    Nothing is checked when ``self.warm`` is false.
    """
    recorded = []

    yield from self._patchExecute(recorded, flush)

    if not self.warm:
        return

    overview = "\n---- actual queries:\n%s\n---- expected queries:\n%s" % (
        "\n".join(recorded), "\n".join(expected),
    )
    self.assertEqual(len(recorded), len(expected), overview)

    for actual_query, expect_query in zip(recorded, expected):
        # compare lowercased queries stripped of all whitespace
        actual_key = "".join(actual_query.lower().split())
        expect_key = "".join(expect_query.lower().split())
        self.assertEqual(
            actual_key,
            expect_key,
            "\n---- actual query:\n%s\n---- not like:\n%s" % (actual_query, expect_query),
        )

606 

@contextmanager
def assertQueriesContain(self, expected, flush=True):
    """ Check the queries made by the current cursor. ``expected`` is a list
    of strings, one per query made: each of them must be contained in the
    corresponding actual query, ignoring case and whitespaces.

    Nothing is checked when ``self.warm`` is false.
    """
    recorded = []

    yield from self._patchExecute(recorded, flush)

    if not self.warm:
        return

    overview = "\n---- actual queries:\n%s\n---- expected queries:\n%s" % (
        "\n".join(recorded), "\n".join(expected),
    )
    self.assertEqual(len(recorded), len(expected), overview)

    for actual_query, expect_query in zip(recorded, expected):
        # compare lowercased queries stripped of all whitespace
        expect_key = "".join(expect_query.lower().split())
        actual_key = "".join(actual_query.lower().split())
        self.assertIn(
            expect_key,
            actual_key,
            "\n---- actual query:\n%s\n---- doesn't contain:\n%s" % (actual_query, expect_query),
        )

632 

@contextmanager
def assertQueryCount(self, default=0, flush=True, **counters):
    """ Context manager that counts queries. It may be invoked either with
        one value, or with a set of named arguments like ``login=value``::

            with self.assertQueryCount(42):
                ...

            with self.assertQueryCount(admin=3, demo=5):
                ...

        The second form is convenient when used with :func:`users`.

        The count is the increase of ``self.cr.sql_log_count`` over the block.
        More queries than expected fail a subtest; fewer are only logged.
        Nothing is counted when ``self.warm`` is false.
    """
    def flush_if_requested():
        if flush:
            self.env.flush_all()
            self.env.cr.flush()

    if not self.warm:
        # flush before and after during warmup, in order to reproduce the
        # same operations, otherwise the caches might not be ready!
        flush_if_requested()
        yield
        flush_if_requested()
        return

    # mock random in order to avoid random bus gc
    with patch('random.random', lambda: 1):
        login = self.env.user.login
        expected = counters.get(login, default)
        flush_if_requested()
        count_before = self.cr.sql_log_count
        yield
        flush_if_requested()
        count = self.cr.sql_log_count - count_before
        if count == expected:
            return

        # add some info on caller to allow semi-automatic update of query count
        # (the stack must be inspected from this very frame: index 2 is
        # relative to it)
        caller = inspect.stack()[2]
        filename, linenum, funcname = caller.filename, caller.lineno, caller.function
        filename = filename.replace('\\', '/')
        if "/odoo/addons/" in filename:
            filename = filename.rsplit("/odoo/addons/", 1)[1]
        if count > expected:
            msg = "Query count more than expected for user %s: %d > %d in %s at %s:%s"
            # add a subtest in order to continue the test_method in case of failures
            with self.subTest():
                self.fail(msg % (login, count, expected, funcname, filename, linenum))
        else:
            logger = logging.getLogger(type(self).__module__)
            msg = "Query count less than expected for user %s: %d < %d in %s at %s:%s"
            logger.info(msg, login, count, expected, funcname, filename, linenum)

685 

686 def assertRecordValues( 

687 self, 

688 records: odoo.models.BaseModel, 

689 expected_values: list[dict], 

690 *, 

691 field_names: Optional[Iterable[str]] = None, 

692 ) -> None: 

693 ''' Compare a recordset with a list of dictionaries representing the expected results. 

694 This method performs a comparison element by element based on their index. 

695 Then, the order of the expected values is extremely important. 

696 

697 .. note:: 

698 

699 - ``None`` expected values can be used for empty fields. 

700 - x2many fields are expected by ids (so the expected value should be 

701 a ``list[int]`` 

702 - many2one fields are expected by id (so the expected value should 

703 be an ``int`` 

704 

705 :param records: The records to compare. 

706 :param expected_values: Items to check the ``records`` against. 

707 :param field_names: list of fields to check during comparison, if 

708 unspecified all expected_values must have the same 

709 keys and all are checked 

710 ''' 

711 if not field_names: 

712 field_names = expected_values[0].keys() 

713 for i, v in enumerate(expected_values): 

714 self.assertEqual( 

715 v.keys(), field_names, 

716 f"All expected values must have the same keys, found differences between records 0 and {i}", 

717 ) 

718 

719 expected_reformatted = [] 

720 for vs in expected_values: 

721 r = {} 

722 for f in field_names: 

723 t = records._fields[f].type 

724 if t in ('one2many', 'many2many'): 

725 r[f] = sorted(vs[f]) 

726 elif t == 'float': 

727 r[f] = float(vs[f]) 

728 elif t == 'integer': 

729 r[f] = int(vs[f]) 

730 elif vs[f] is None: 

731 r[f] = False 

732 else: 

733 r[f] = vs[f] 

734 expected_reformatted.append(r) 

735 

736 record_reformatted = [] 

737 for record in records: 

738 r = {} 

739 for field_name in field_names: 

740 record_value = record[field_name] 

741 match record._fields[field_name]: 

742 case odoo.fields.Many2one(): 

743 record_value = record_value.id 

744 case odoo.fields.One2many() | odoo.fields.Many2many(): 

745 record_value = sorted(record_value.ids) 

746 case odoo.fields.Float() as field if digits := field.get_digits(record.env): 

747 record_value = Approx(record_value, digits[1], decorate=False) 

748 case odoo.fields.Monetary() as field if currency_field_name := field.get_currency_field(record): 

749 # don't round if there's no currency set 

750 if c := record[currency_field_name]: 

751 record_value = Approx(record_value, c, decorate=False) 

752 

753 r[field_name] = record_value 

754 record_reformatted.append(r) 

755 

756 try: 

757 self.assertSequenceEqual(expected_reformatted, record_reformatted, seq_type=list) 

758 return 

759 except AssertionError as e: 

760 standardMsg, _, diffMsg = str(e).rpartition('\n') 

761 if 'self.maxDiff' not in diffMsg: 

762 raise 

763 # move out of handler to avoid exception chaining 

764 

765 diffMsg = "".join(difflib.unified_diff( 

766 pprint.pformat(expected_reformatted).splitlines(keepends=True), 

767 pprint.pformat(record_reformatted).splitlines(keepends=True), 

768 fromfile="expected", tofile="records", 

769 )) 

770 self.fail(self._formatMessage(None, standardMsg + '\n' + diffMsg)) 

771 

772 # turns out this thing may not be quite as useful as we thought... 

def assertItemsEqual(self, a, b, msg=None):
    """Check that ``a`` and ``b`` hold the same items, regardless of order.

    Thin alias of ``assertCountEqual``.

    :param msg: optional message reported on failure
    """
    # forward the caller's message instead of discarding it
    self.assertCountEqual(a, b, msg=msg)

775 

def assertTreesEqual(self, n1, n2, msg=None):
    """Check recursively that two element trees are equivalent.

    Nodes must have the same tag, the same attributes (in any order), the same
    text and tail once stripped, and pairwise equivalent children. A missing
    node on either side is a failure.
    """
    for node in (n1, n2):
        self.assertIsNotNone(node, msg)
    self.assertEqual(n1.tag, n2.tag, msg)
    # Because lxml.attrib is an ordereddict for which order is important
    # to equality, even though *we* don't care
    self.assertEqual(dict(n1.attrib), dict(n2.attrib), msg)
    self.assertEqual((n1.text or '').strip(), (n2.text or '').strip(), msg)
    self.assertEqual((n1.tail or '').strip(), (n2.tail or '').strip(), msg)

    # the shorter list of children is padded with None, which then fails
    for child1, child2 in zip_longest(n1, n2):
        self.assertTreesEqual(child1, child2, msg)

788 

def _assertXMLEqual(self, original, expected, parser="xml"):
    """Assert that two XML archs are equal once normalized.

    Each non-empty arch goes through ``_normalize_arch_for_assert`` before
    the comparison; falsy values are compared as they are. ``maxDiff`` is
    raised to 10000 on the test instance.

    :param original: the xml arch to test
    :type original: str
    :param expected: the xml arch of reference
    :type expected: str
    :param parser: which lxml parser to use when normalizing both archs,
                   either "xml" or "html"
    :type parser: str
    """
    self.maxDiff = 10000
    normalized = [
        _normalize_arch_for_assert(arch, parser) if arch else arch
        for arch in (original, expected)
    ]
    self.assertEqual(*normalized)

806 

def assertXMLEqual(self, original, expected):
    """Assert that two archs are equal, using ``_assertXMLEqual`` with its
    default parser.

    :param original: the arch to test
    :param expected: the arch of reference
    """
    return self._assertXMLEqual(original, expected)

809 

def assertHTMLEqual(self, original, expected):
    """Assert that two archs are equal, using ``_assertXMLEqual`` with the
    ``'html'`` parser.

    :param original: the arch to test
    :param expected: the arch of reference
    """
    return self._assertXMLEqual(original, expected, 'html')

812 

def profile(self, description='', **kwargs):
    """Build a profiler described after the running test.

    The profiling session is created on first use and kept on the test
    instance as ``profile_session``. Unless the caller provides ``db``, the
    database name of the test cursor is used.

    :param description: free text appended to the profile description
    :param kwargs: extra keyword arguments forwarded to ``profiler.Profiler``
    :return: the ``profiler.Profiler`` instance
    """
    test_method = getattr(self, '_testMethodName', 'Unknown test method')
    if not hasattr(self, 'profile_session'):
        self.profile_session = profiler.make_session(test_method)
    if 'db' not in kwargs:
        kwargs['db'] = self.env.cr.dbname

    uid = self.env.user.id
    temperature = 'warm' if self.warm else 'cold'
    label = '%s uid:%s %s %s' % (test_method, uid, temperature, description)
    return profiler.Profiler(
        description=label,
        profile_session=self.profile_session,
        **kwargs)

823 

@classmethod
def _registry_test_mode_patches(cls, *, cr: Cursor, registry: Registry):
    """
    Build the patches required for entering registry test mode.

    The patches are returned unstarted; the caller starts and stops them.

    :param cr: the cursor wrapped by every cursor opened through ``registry``
    :param registry: the registry to patch
    :return: list of unstarted patchers
    """
    def _patched_cursor(readonly: bool = False):
        # read-only cursors are only honoured when the class enabled them
        effective_readonly = readonly and cls._registry_readonly_enabled
        return test_cursor.TestCursor(cr, _registry_test_lock, effective_readonly)

    # New cursor should point to the test's cursor
    cursor_patch = patch.object(registry, 'cursor', _patched_cursor)
    # Disable locking and signaling
    lock_patch = patch.object(Registry, '_lock', DummyRLock())
    setup_signaling_patch = patch.object(registry, 'setup_signaling', return_value=None)  # noop
    check_signaling_patch = patch.object(registry, 'check_signaling', return_value=registry)
    return [cursor_patch, lock_patch, setup_signaling_patch, check_signaling_patch]

842 

@classmethod
def registry_enter_test_mode_cls(cls):
    """
    Put the registry in test mode for the whole test class.

    New cursors returned by the registry will be instances of `TestCursor`
    which will wrap the current cursor. Leaving test mode is registered as
    a class cleanup.
    """
    assert not cls._registry_patched, 'Can only patch registry once'
    assert cls.cr, 'No cursor'
    assert cls.registry, 'No registry'

    patchers = cls._registry_test_mode_patches(cr=cls.cr, registry=cls.registry)
    cls.registry_patches = patchers
    for patcher in patchers:
        patcher.start()
    cls._registry_patched = True
    cls.addClassCleanup(cls.registry_leave_test_mode)

862 

def registry_enter_test_mode(self, *, cr: Cursor | None = None, register_cleanup: bool = True) -> None:
    """
    Put the registry in test mode.

    New cursors returned by the registry will be instances of `TestCursor`
    which will wrap the current cursor. The patchers and the "patched"
    flag are stored on the test class.

    :param cr: the cursor to wrap (defaults to the current cursor if none)
    :param register_cleanup: whether to register leaving test mode as a
        cleanup of the test
    """
    test_class = type(self)
    assert not test_class._registry_patched, 'Can only patch registry once'
    assert cr or self.cr, 'No cursor'
    assert self.registry, 'No registry'

    patchers = self._registry_test_mode_patches(cr=cr or self.cr, registry=self.registry)
    test_class.registry_patches = patchers
    for patcher in patchers:
        patcher.start()
    test_class._registry_patched = True
    if register_cleanup:
        self.addCleanup(self.registry_leave_test_mode)

885 

@classmethod
def registry_leave_test_mode(cls):
    """Leave registry test mode: stop the patchers started when entering
    it, empty the list holding them and reset the "patched" flag.
    """
    assert cls._registry_patched, 'Registry is not patched'

    active_patchers = cls.registry_patches
    for patcher in active_patchers:
        patcher.stop()
    active_patchers.clear()
    cls._registry_patched = False

894 

@classmethod
def set_registry_readonly_mode(cls, enabled: bool):
    """Set the class-level ``_registry_readonly_enabled`` flag.

    The registry must already be in test mode.

    :param enabled: new value of the flag
    """
    assert cls._registry_patched, 'Registry is not patched'

    cls._registry_readonly_enabled = enabled

900 

def assertCanOpenTestCursor(self):
    """ Asserts that we can currently open a test cursor.

    A test cursor may only be opened for the test registered as
    ``odoo.modules.module.current_test``. When an HTTP request is being
    handled, it must also carry the test cursor cookie matching
    ``http_request_key``, unless ``http_request_allow_all`` is set.

    :raises BadRequest: if another test (or no test) is current, or if the
        request does not carry the expected cookie
    """
    current_test = odoo.modules.module.current_test
    if current_test != self:
        # ``current_test`` may not be a test case when no test is running:
        # read the tag defensively so that the intended BadRequest is raised
        # rather than an AttributeError
        current_tag = getattr(current_test, 'canonical_tag', None)
        message = f"Trying to open a test cursor for {self.canonical_tag} while already in a test {current_tag}"
        _logger.runbot(message)
        raise BadRequest(message)
    request = odoo.http.request
    if not request or self.http_request_allow_all:
        return
    http_request_required_key = self.http_request_key
    http_request_key = request.cookies.get(TEST_CURSOR_COOKIE_NAME)
    if http_request_key != http_request_required_key:
        expected = http_request_required_key
        if not expected:
            expected = 'None (request are not enabled)'
        _logger.runbot(
            'Request with path %s has been ignored during test as it '
            'it does not contain the test_cursor cookie or it is expired.'
            ' (required "%s", got "%s")',
            request.httprequest.path, expected, http_request_key
        )
        raise BadRequest(
            'Request ignored during test as it does not contain the required cookie.'
        )

925 

def get_method_additional_tags(self, test_method):
    """Guess whether the test method is a query count test.

    The method source is only inspected when the configured ``test_tags``
    mention ``is_query_count``.

    :param test_method: the test method to inspect (may be falsy)
    :return: ``['is_query_count']`` when the method source contains
        ``self.assertQueryCount``, an empty list otherwise
    """
    test_tags = odoo.tools.config['test_tags']
    if not (test_tags and 'is_query_count' in test_tags):
        return []
    method_source = inspect.getsource(test_method) if test_method else ''
    return ['is_query_count'] if 'self.assertQueryCount' in method_source else []

935 

class Like:
    """
    A string-like object comparable to other strings but where the substring
    '...' can match anything in the other string.

    Surrounding whitespace is ignored, both around each part of the pattern
    and around the compared string. Comparing with something that is not a
    string is never equal.

    Example of usage:

        self.assertEqual("SELECT field1, field2, field3 FROM model", Like('SELECT ... FROM model'))
        self.assertIn(Like('Company ... (SF)'), ['TestPartner', 'Company 8 (SF)', 'SomeAdress'])
        self.assertEqual([
            'TestPartner',
            'Company 8 (SF)',
            'Anything else'
        ], [
            'TestPartner',
            Like('Company ... (SF)'),
            Like('...'),
        ])

    In case of mismatch, the pattern is displayed as a regular string; here
    is an example of error message

        AssertionError: Lists differ: ['TestPartner', 'Company 8 (LA)', 'Anything else'] != ['TestPartner', 'Company ... (SF)', '...']

        First differing element 1:
        'Company 8 (LA)'
        'Company ... (SF)'

        - ['TestPartner', 'Company 8 (LA)', 'Anything else']
        + ['TestPartner', 'Company ... (SF)', '...']

    """
    def __init__(self, pattern):
        self.pattern = pattern
        # every literal part is escaped, '...' becomes a wildcard
        self.regex = '.*'.join([re.escape(part.strip()) for part in self.pattern.split('...')])

    def __eq__(self, other):
        if not isinstance(other, str):
            # let Python fall back to its default comparison instead of
            # failing on ``other.strip()``
            return NotImplemented
        # return a real boolean rather than a match object
        return re.fullmatch(self.regex, other.strip(), re.DOTALL) is not None

    def __repr__(self):
        return repr(self.pattern)

977 

978 

class WhitespaceInsensitive(str):
    """String whose hash and (in)equality ignore the amount of whitespace:
    every run of whitespace is treated as a single space.
    """
    __slots__ = ()

    def __hash__(self):
        return hash(re.sub(r'\s+', ' ', self))

    def __eq__(self, other):
        if not isinstance(other, str):
            return NotImplemented
        return re.sub(r'\s+', ' ', self) == re.sub(r'\s+', ' ', other)

    def __ne__(self, other):
        # ``str`` provides its own ``__ne__``: without this override, ``!=``
        # would keep comparing the raw strings and disagree with ``==``
        equal = self.__eq__(other)
        if equal is NotImplemented:
            return NotImplemented
        return not equal

989 

990 

class Approx:  # noqa: PLW1641
    """Wrap a number so that ``==`` compares it approximately.

    The comparison goes through ``float_compare`` for numeric roundings, or
    through the ``compare_amounts`` method of the given rounding object.

    Most of the time, :meth:`TestCase.assertAlmostEqual` is more useful, but it
    doesn't work for all helpers.
    """
    def __init__(self, value: float, rounding: int | float | odoo.addons.base.models.res_currency.ResCurrency, /, decorate: bool) -> None:  # noqa: PYI041
        """
        :param value: the reference value
        :param rounding: an ``int`` is used as ``precision_digits``, a
            ``float`` as ``precision_rounding``; anything else must provide
            ``compare_amounts``
        :param decorate: whether ``repr`` prefixes the value with ``~``
        """
        self.value = value
        self.decorate = decorate
        if isinstance(rounding, int):
            compare = partial(float_compare, precision_digits=rounding)
        elif isinstance(rounding, float):
            compare = partial(float_compare, precision_rounding=rounding)
        else:
            compare = rounding.compare_amounts
        self.cmp = compare

    def __repr__(self) -> str:
        return f"~{self.value!r}" if self.decorate else repr(self.value)

    def __eq__(self, other: object) -> bool | NotImplemented:
        if isinstance(other, (float, int)):
            return self.cmp(self.value, other) == 0
        return NotImplemented

1017 

1018 

1019 

1020class TransactionCase(BaseCase): 

1021 """ Test class in which all test methods are run in a single transaction, 

1022 but each test method is run in a sub-transaction managed by a savepoint. 

1023 The transaction's cursor is always closed without committing. 

1024 

1025 The data setup common to all methods should be done in the class method 

1026 `setUpClass`, so that it is done once for all test methods. This is useful 

1027 for test cases containing fast tests but with significant database setup 

1028 common to all cases (complex in-db test data). 

1029 

1030 After being run, each test method cleans up the record cache and the 

1031 registry cache. However, there is no cleanup of the registry models and 

1032 fields. If a test modifies the registry (custom models and/or fields), it 

1033 should prepare the necessary cleanup (`self.registry.reset_changes()`). 

1034 """ 

1035 muted_registry_logger = mute_logger(odoo.orm.registry._logger.name) 

1036 freeze_time = None 

1037 

@classmethod
def _gc_filestore(cls):
    """Garbage-collect the attachment filestore.

    Attachments can be created or unlinked during the tests and take up
    some disk space; since crons are not running during tests, the
    collection is triggered manually. The state of the file system must be
    checked outside of the test cursor, hence the dedicated cursor.
    """
    registry = Registry(get_db_name())
    with registry.cursor() as cr:
        attachments = api.Environment(cr, api.SUPERUSER_ID, {})['ir.attachment']
        attachments._gc_file_store_unsafe()

1047 

@classmethod
def setUpClass(cls):
    """Prepare the registry, the cursor and the environment shared by all
    the test methods of the class, and register the matching class
    cleanups and patchers.
    """
    super().setUpClass()
    cls.addClassCleanup(cls._gc_filestore)
    cls.registry = Registry(get_db_name())
    # snapshot of the registry state, restored by ``reset_changes``
    cls.registry_start_invalidated = cls.registry.registry_invalidated
    cls.registry_start_sequence = cls.registry.registry_sequence
    cls.registry_cache_sequences = dict(cls.registry.cache_sequences)

    def reset_changes():
        # set the models up again only if the registry changed during the tests
        if (cls.registry_start_sequence != cls.registry.registry_sequence) or cls.registry.registry_invalidated:
            with cls.registry.cursor() as cr:
                cls.registry._setup_models__(cr)
        cls.registry.registry_invalidated = cls.registry_start_invalidated
        cls.registry.registry_sequence = cls.registry_start_sequence
        with cls.muted_registry_logger:
            cls.registry.clear_all_caches()
        cls.registry.cache_invalidated.clear()
        cls.registry.cache_sequences = cls.registry_cache_sequences
    cls.addClassCleanup(reset_changes)

    def signal_changes():
        # replacement for ``registry.signal_changes``: only the in-memory
        # sequences and invalidation flags are updated
        if not cls.registry.ready:
            _logger.info('Skipping signal changes during tests')
            return
        if cls.registry.registry_invalidated or cls.registry.cache_invalidated:
            _logger.info('Simulating signal changes during tests')
            if cls.registry.registry_invalidated:
                cls.registry.registry_sequence += 1
            for cache_name in cls.registry.cache_invalidated or ():
                cls.registry.cache_sequences[cache_name] += 1
            cls.registry.registry_invalidated = False
            cls.registry.cache_invalidated.clear()

    cls._signal_changes_patcher = patch.object(cls.registry, 'signal_changes', signal_changes)
    cls.startClassPatcher(cls._signal_changes_patcher)

    cls.cr = cls.registry.cursor()
    cls.addClassCleanup(cast(Cursor, cls.cr).close)

    def check_cursor_stack():
        # mark as closed any cursor left in the TestCursor stack, then
        # empty the stack
        for cursor in test_cursor.TestCursor._cursors_stack:
            _logger.info('One curor was remaining in the TestCursor stack at the end of the test')
            cursor._closed = True
        test_cursor.TestCursor._cursors_stack = []

    cls.addClassCleanup(check_cursor_stack)

    if cls.freeze_time:
        cls.startClassPatcher(cls.freeze_time)

    def forbidden(*args, **kwars):
        # print where the forbidden call comes from before failing
        traceback.print_stack()
        raise AssertionError('Cannot commit or rollback a cursor from inside a test, this will lead to a broken cursor when trying to rollback the test. Please rollback to a specific savepoint instead or open another cursor if really necessary')

    # committing, rolling back or closing the class cursor is forbidden
    # while the patchers are active
    cls.commit_patcher = patch.object(cls.cr, 'commit', forbidden)
    cls.startClassPatcher(cls.commit_patcher)
    cls.rollback_patcher = patch.object(cls.cr, 'rollback', forbidden)
    cls.startClassPatcher(cls.rollback_patcher)
    cls.close_patcher = patch.object(cls.cr, 'close', forbidden)
    cls.startClassPatcher(cls.close_patcher)

    cls.env = api.Environment(cls.cr, api.SUPERUSER_ID, {})

    # speed up CryptContext. Many users and passwords are created during
    # tests, avoid spending time hashing passwords with many rounds
    def _crypt_context(self):  # noqa: ARG001
        return CryptContext(
            ['pbkdf2_sha512', 'plaintext'],
            pbkdf2_sha512__rounds=1,
        )
    cls._crypt_context_patcher = patch('odoo.addons.base.models.res_users.ResUsersPatchedInTest._crypt_context', _crypt_context)
    cls.startClassPatcher(cls._crypt_context_patcher)

1120 

def setUp(self):
    """Register the per-test cleanups (environments, registry caches,
    cursor callbacks) and open the savepoint in which the test method runs.

    unittest runs cleanups in reverse order of registration, so the
    savepoint registered last is closed first.
    """
    super().setUp()

    def _check_registry_lock():
        # warn when the lock count is not exactly 1 at the end of the test
        if _registry_test_lock.count == 0:
            _logger.warning('The registry test lock is still released at the end of %s', self.canonical_tag)
        elif _registry_test_lock.count > 1:
            _logger.warning(
                'The registry test lock was acquired more than once (%s) at the end of %s',
                _registry_test_lock.count, self.canonical_tag,
            )

    self.addCleanup(_check_registry_lock)
    # restore environments after the test to avoid invoking flush() with an
    # invalid environment (inexistent user id) from another test
    envs = self.env.transaction.envs
    for env in list(envs):
        self.addCleanup(env.clear)
    # restore the set of known environments as it was at setUp
    # (``envs.clear`` is registered last and therefore runs before ``update``)
    self.addCleanup(envs.update, list(envs))
    self.addCleanup(envs.clear)

    self.addCleanup(self.muted_registry_logger(self.registry.clear_all_caches))

    # This prevents precommit functions and data from piling up
    # until cr.flush is called in 'assertRaises' clauses
    # (these are not cleared in self.env.clear or envs.clear)
    cr = self.env.cr

    def _reset(cb, funcs, data):
        cb._funcs = funcs
        cb.data = data
    for callback in [cr.precommit, cr.postcommit, cr.prerollback, cr.postrollback]:
        # copies of the current functions and data are restored on cleanup
        self.addCleanup(_reset, callback, deque(callback._funcs), deepcopy(callback.data))

    # flush everything in setUpClass before introducing a savepoint
    self.env.flush_all()

    savepoint = Savepoint(self.cr)
    self.addCleanup(savepoint.close)

1161 

@contextmanager
def enter_registry_test_mode(self):
    """
    Context manager making every new cursor opened on this database
    registry reuse the one currently used by the tests. See
    ``registry_enter_test_mode``.
    """
    # changes happen inside other cursors while in test mode: pending
    # changes are flushed before entering, and the environment captured
    # here is invalidated after leaving
    current_env = self.env
    current_env.flush_all()
    self.registry_enter_test_mode(register_cleanup=False)
    try:
        yield
    finally:
        self.registry_leave_test_mode()
        current_env.invalidate_all()

1178 

@contextmanager
def allow_pdf_render(self):
    """
    Allows wkhtmltopdf to send requests to the backend.
    Enters registry test mode if necessary.
    """
    with ExitStack() as stack:
        if not type(self)._registry_patched:
            stack.enter_context(self.enter_registry_test_mode())
        old_run_wkhtmltopdf = ir_actions_report._run_wkhtmltopdf

        def _patched_run_wkhtmltopdf(args):
            # for the duration of the call, the expected request key is
            # 'wkhtmltopdf' and the test lock is released
            with patch.object(self, 'http_request_key', 'wkhtmltopdf'), release_test_lock():
                # pass the matching test cursor cookie to wkhtmltopdf
                args = ['--cookie', TEST_CURSOR_COOKIE_NAME, 'wkhtmltopdf', *args]
                return old_run_wkhtmltopdf(args)

        stack.enter_context(
            patch.object(ir_actions_report, '_run_wkhtmltopdf', _patched_run_wkhtmltopdf)
        )
        yield

1199 

1200 

class SingleTransactionCase(BaseCase):
    """ TestCase in which all test methods are run in the same transaction,
    the transaction is started with the first test method and rolled back at
    the end of the last.
    """
    @classmethod
    def __init_subclass__(cls):
        """Warn when a subclass also inherits from ``TransactionCase``."""
        super().__init_subclass__()
        if issubclass(cls, TransactionCase):
            # the message has a placeholder: provide the offending class name
            _logger.warning("%s inherits from both TransactionCase and SingleTransactionCase", cls.__name__)

    @classmethod
    def setUpClass(cls):
        """Open the registry, the cursor and the environment shared by all
        the test methods, and register their class cleanups.
        """
        super().setUpClass()
        cls.registry = Registry(get_db_name())
        cls.addClassCleanup(cls.registry.reset_changes)
        cls.addClassCleanup(cls.registry.clear_all_caches)

        cls.cr = cls.registry.cursor()
        cls.addClassCleanup(cast(Cursor, cls.cr).close)

        cls.env = api.Environment(cls.cr, api.SUPERUSER_ID, {})

    def setUp(self):
        """Flush the pending changes of the environment before each test."""
        super().setUp()
        self.env.flush_all()

1227 

1228 

class ChromeBrowserException(Exception):
    """Error raised by the Chrome browser helper."""

1231 

def run(gen_func):
    """Drive a generator-based coroutine yielding futures.

    The generator returned by ``gen_func()`` is advanced each time the
    future it last yielded completes: the result of the future is sent into
    the generator, or its exception is thrown into it. Nothing is returned;
    the function returns as soon as the first future has been scheduled.

    :param gen_func: callable without argument returning a generator which
        yields ``Future`` objects
    """
    coro = gen_func()

    def _resume(completed):
        try:
            try:
                outcome = completed.result()
            except Exception as error:
                pending = coro.throw(error)
            else:
                pending = coro.send(outcome)
        except StopIteration:
            # the coroutine is over
            return

        assert isinstance(pending, Future), f"coroutine must yield futures, got {pending}"
        pending.add_done_callback(_resume)

    try:
        first = next(coro)
        first.add_done_callback(_resume)
    except StopIteration:
        return

1252 

def save_test_file(test_name, content, prefix, extension='png', logger=_logger, document_type='Screenshot', date_format="%Y%m%d_%H%M%S_%f"):
    """Write ``content`` to a timestamped file under the configured
    screenshots directory and log where it was saved.

    The file is created in ``<screenshots>/<db name>/screenshots`` and named
    ``<prefix><timestamp>_<test_name>.<extension>``.

    :param test_name: name of the test, word characters only
    :param content: bytes to write
    :param prefix: file name prefix, word characters followed by ``_``
    :param extension: file extension, lowercase letters only
    :param logger: logger used to report the path of the file
    :param document_type: label used in the log message
    :param date_format: ``strftime`` format of the timestamp
    """
    validations = (
        (r'\w*_', prefix),
        (r'[a-z]+', extension),
        (r'\w+', test_name),
    )
    for pattern, candidate in validations:
        assert re.fullmatch(pattern, candidate)
    timestamp = datetime.now().strftime(date_format)
    target_dir = pathlib.Path(odoo.tools.config['screenshots']) / get_db_name() / 'screenshots'
    target_dir.mkdir(parents=True, exist_ok=True)
    full_path = target_dir / f'{prefix}{timestamp}_{test_name}.{extension}'
    full_path.write_bytes(content)
    logger.runbot(f'{document_type} in: {full_path}')

1263 

1264 

if os.name == 'posix' and platform.system() != 'Darwin':
    # since the introduction of pointer compression in Chrome 80 (v8 v8.0),
    # the memory reservation algorithm requires more than 8GiB of
    # virtual mem for alignment; this exceeds our default memory limits.
    def _preexec():
        """Remove the address space limit (``RLIMIT_AS``) of the current
        process. Meant to be used as ``preexec_fn`` when spawning Chrome.
        """
        import resource  # noqa: PLC0415
        resource.setrlimit(resource.RLIMIT_AS, (resource.RLIM_INFINITY, resource.RLIM_INFINITY))
else:
    # no pre-exec hook on the other platforms
    _preexec = None

1274 

1275 

1276class ChromeBrowser: 

1277 """ Helper object to control a Chrome headless process. """ 

1278 remote_debugging_port = 0 # 9222, change it in a non-git-tracked file 

1279 

def __init__(self, test_case: HttpCase, success_signal: str = DEFAULT_SUCCESS_SIGNAL, headless: bool = True, debug: bool = False):
    """Start a Chrome process, connect to it through its devtools websocket
    and enable the protocol domains used by the tests.

    :param test_case: the test case driving the browser; its logger,
        ``browser_size`` and ``touch_enabled`` are used
    :param success_signal: stored as ``self.success_signal``
    :param headless: forwarded to ``_chrome_start``
    :param debug: forwarded to ``_chrome_start``
    :raises unittest.SkipTest: if the websocket-client module is missing
    """
    self.throttling_factor = 1
    self._logger = test_case._logger
    self.test_case = test_case
    self.success_signal = success_signal
    if websocket is None:
        self._logger.warning("websocket-client module is not installed")
        raise unittest.SkipTest("websocket-client module is not installed")
    # temporary profile directory of the Chrome process
    self.user_data_dir = tempfile.mkdtemp(suffix='_chrome_odoo')

    if scs := odoo.tools.config['screencasts']:
        self.screencaster = Screencaster(self, scs)
    else:
        self.screencaster = NoScreencast()

    if os.name == 'posix':
        # remember the current SIGXCPU handler (restored by ``stop``) and
        # install ours
        self.sigxcpu_handler = signal.getsignal(signal.SIGXCPU)
        signal.signal(signal.SIGXCPU, self.signal_handler)
    else:
        self.sigxcpu_handler = None

    # NOTE: this rewrites the attribute on the test case itself
    # ("<width>x<height>" becomes "<width>,<height>")
    test_case.browser_size = test_case.browser_size.replace('x', ',')

    self.chrome, self.devtools_port = self._chrome_start(
        user_data_dir=self.user_data_dir,
        touch_enabled=test_case.touch_enabled,
        headless=headless,
        debug=debug,
    )
    self.ws = self._open_websocket()
    self._request_id = itertools.count()
    self._result = Future()
    self.error_checker = None
    self.had_failure = False
    # maps request_id to Futures
    self._responses = {}
    # maps frame ids to callbacks
    self._frames = {}
    # maps devtools event names to the callables handling them
    self._handlers = {
        'Fetch.requestPaused': self._handle_request_paused,
        'Runtime.consoleAPICalled': self._handle_console,
        'Runtime.exceptionThrown': self._handle_exception,
        'Page.frameStoppedLoading': self._handle_frame_stopped_loading,
        'Page.screencastFrame': self.screencaster,
    }
    # the websocket messages are consumed in a dedicated thread
    self._receiver = threading.Thread(
        target=self._receive,
        name="WebSocket events consumer",
        args=(get_db_name(),)
    )
    self._receiver.start()
    self._logger.info('Enable chrome headless console log notification')
    self._websocket_send('Runtime.enable')
    self._websocket_request('Fetch.enable')
    self._logger.info('Chrome headless enable page notifications')
    self._websocket_send('Page.enable')
    self._websocket_send('Page.setDownloadBehavior', params={
        'behavior': 'deny',
        'eventsEnabled': False,
    })
    self._websocket_send('Emulation.setFocusEmulationEnabled', params={'enabled': True})
    emulated_device = {
        'mobile': False,
        'width': None,
        'height': None,
        'deviceScaleFactor': 1,
    }
    # browser_size is "<width>,<height>" at this point
    emulated_device['width'], emulated_device['height'] = [int(size) for size in test_case.browser_size.split(",")]
    self._websocket_request('Emulation.setDeviceMetricsOverride', params=emulated_device)

1349 

def signal_handler(self, sig, frame):
    """Handle ``SIGXCPU``: stop the browser, then exit the process.

    Other signals are ignored.

    :param sig: number of the received signal
    :param frame: current stack frame (unused)
    :raises SystemExit: when ``sig`` is ``SIGXCPU``
    """
    if sig == signal.SIGXCPU:
        _logger.info('CPU time limit reached, stopping Chrome and shutting down')
        self.stop()
        # the ``exit`` builtin is only injected by the ``site`` module and
        # is meant for the interactive shell; ``sys.exit`` is always there
        sys.exit()

1355 

def throttle(self, factor: int | None) -> None:
    """Set the CPU throttling rate of the browser.

    Does nothing when ``factor`` is falsy. Otherwise the factor is kept as
    ``throttling_factor`` and sent to the browser.

    :param factor: slowdown factor, between 1 and 50
    """
    if factor:
        assert 1 <= factor <= 50  # arbitrary upper limit
        self.throttling_factor = factor
        self._websocket_request('Emulation.setCPUThrottlingRate', params={'rate': factor})

1363 

def stop(self):
    """Shut the browser down and release everything attached to it.

    When the websocket is open, the browser is asked to stop loading,
    unregister its service workers and close, then the websocket is closed.
    In all cases the Chrome process is terminated (killed after 5 seconds),
    its profile directory is removed and the ``SIGXCPU`` handler saved at
    initialization is restored.
    """
    # method may be called during `_open_websocket`
    if hasattr(self, 'ws'):
        try:
            self.screencaster.stop()

            self._websocket_request('Page.stopLoading')
            self._websocket_request('Runtime.evaluate', params={'expression': """
            ('serviceWorker' in navigator) &&
                navigator.serviceWorker.getRegistrations().then(
                    registrations => Promise.all(registrations.map(r => r.unregister()))
                )
            """, 'awaitPromise': True})
            # wait for the screenshot or whatever
            wait(self._responses.values(), 10)
            self._result.cancel()

            self._logger.info("Closing chrome headless with pid %s", self.chrome.pid)
            self._websocket_request('Browser.close')
        except ChromeBrowserException as e:
            _logger.runbot("WS error during browser shutdown: %s", e)
        except Exception:  # noqa: BLE001
            _logger.warning("Error during browser shutdown", exc_info=True)
        self._logger.info("Closing websocket connection")
        self.ws.close()

    self._logger.info("Terminating chrome headless with pid %s", self.chrome.pid)
    self.chrome.terminate()
    try:
        self.chrome.wait(5)
    except subprocess.TimeoutExpired:
        self._logger.warning("Killing chrome headless with pid %s: still alive", self.chrome.pid)
        self.chrome.kill()

    self._logger.info('Removing chrome user profile "%s"', self.user_data_dir)
    shutil.rmtree(self.user_data_dir, ignore_errors=True)

    # Restore previous signal handler. The comparison with None is
    # deliberate: ``signal.SIG_DFL`` is falsy (its value is 0), so a plain
    # truth test would leave our handler installed whenever the previous
    # handler was the default one.
    if self.sigxcpu_handler is not None:
        signal.signal(signal.SIGXCPU, self.sigxcpu_handler)

1404 

@property
def executable(self):
    """The Chrome executable, as returned by ``_find_executable()``.

    Any exception raised by the lookup is logged as a warning, then
    re-raised unchanged.
    """
    try:
        return _find_executable()
    except Exception:
        self._logger.warning('Chrome executable not found')
        raise

1412 

def _spawn_chrome(self, cmd):
    """Spawn Chrome and wait for it to publish its devtools port.

    :param cmd: the command line, as a list
    :return: the ``(process, devtools port)`` pair
    :raises unittest.SkipTest: if the port file did not show up in time;
        the process is then stopped and the profile directory removed
    """
    # stderr of the process is collected in a file of the profile directory
    log_path = pathlib.Path(self.user_data_dir, 'err.log')
    with log_path.open('wb') as log_file:
        # pylint: disable=subprocess-popen-preexec-fn
        proc = subprocess.Popen(cmd, stderr=log_file, preexec_fn=_preexec)  # noqa: PLW1509

    # poll for the file in which Chrome writes its devtools port
    port_file = pathlib.Path(self.user_data_dir, 'DevToolsActivePort')
    for _ in range(CHECK_BROWSER_ITERATIONS):
        time.sleep(CHECK_BROWSER_SLEEP)
        if port_file.is_file() and port_file.stat().st_size > 5:
            with port_file.open('r', encoding='utf-8') as f:
                # the port is on the first line of the file
                return proc, int(f.readline())

    # the port never showed up: stop the process if it is still running
    if proc.poll() is None:
        proc.terminate()
        try:
            proc.wait(5)
        except subprocess.TimeoutExpired:
            proc.kill()
            proc.wait()
    self._logger.warning('Chrome headless failed to start:\n%s', log_path.read_text(encoding="utf-8"))
    # since the chrome never started, it's not going to be `stop`-ed so we
    # need to cleanup the directory here
    shutil.rmtree(self.user_data_dir, ignore_errors=True)

    raise unittest.SkipTest(f'Failed to detect chrome devtools port after {BROWSER_WAIT :.1f}s.')

1439 

def _chrome_start(
    self,
    user_data_dir: str,
    touch_enabled: bool,
    headless=True,
    debug=False,
):
    """Build the Chrome command line and spawn the process.

    :param user_data_dir: profile directory given to Chrome
    :param touch_enabled: whether to enable Chrome's touch mode
    :param headless: whether to add the headless switches
    :param debug: anything but ``False`` opens the devtools and starts
        Chrome in full screen
    :return: the ``(process, devtools port)`` pair
    :raises unittest.SkipTest: if the executable cannot be spawned
    """
    switches = {
        # required for tours that use Youtube autoplay conditions (namely website_slides' "course_tour")
        '--autoplay-policy': 'no-user-gesture-required',
        '--disable-default-apps': '',
        '--disable-device-discovery-notifications': '',
        '--no-default-browser-check': '',
        '--remote-debugging-address': HOST,
        '--remote-debugging-port': str(self.remote_debugging_port),
        '--user-data-dir': user_data_dir,
        '--no-first-run': '',
        # FIXME: these next 2 flags are temporarily uncommented to allow client
        # code to manually run garbage collection. This is done as currently
        # the Chrome unit test process doesn't have access to its available
        # memory, so it cannot run the GC efficiently and may run out of memory
        # and crash. These should be re-commented when the process is correctly
        # configured.
        '--enable-precise-memory-info': '',
        '--js-flags': '--expose-gc',
    }
    if headless:
        # value-less switches, appended in this order
        switches.update(dict.fromkeys((
            '--headless',
            '--disable-extensions',
            '--disable-background-networking',
            '--disable-background-timer-throttling',
            '--disable-backgrounding-occluded-windows',
            '--disable-renderer-backgrounding',
            '--disable-breakpad',
            '--disable-client-side-phishing-detection',
            '--disable-crash-reporter',
            '--disable-dev-shm-usage',
            '--disable-namespace-sandbox',
            '--disable-translate',
            '--no-sandbox',
            '--disable-gpu',
            '--enable-unsafe-swiftshader',
            '--mute-audio',
        ), ''))
    if touch_enabled:
        # enable Chrome's Touch mode, useful to detect touch capabilities using
        # "'ontouchstart' in window"
        switches['--touch-events'] = ''
    if debug is not False:
        switches['--auto-open-devtools-for-tabs'] = ''
        switches['--start-fullscreen'] = ''

    # executable, then "switch" or "switch=value", then the page to open
    cmd = [
        self.executable,
        *(f'{switch}={value}' if value else switch for switch, value in switches.items()),
        'about:blank',
    ]
    try:
        proc, devtools_port = self._spawn_chrome(cmd)
    except OSError:
        raise unittest.SkipTest("%s not found" % cmd[0])
    self._logger.info('Chrome pid: %s', proc.pid)
    self._logger.info('Chrome headless temporary user profile dir: %s', self.user_data_dir)

    return proc, devtools_port

1506 

def _json_command(self, command, timeout=3):
    """Queries browser state using JSON

    The request is retried with an increasing delay until it succeeds or
    the ``timeout`` budget (in seconds of waiting between tries) is used
    up. On failure the browser is stopped and the test is skipped.

    Available commands:

    ``''``
        return list of tabs with their id
    ``list`` (or ``json/``)
        list tabs
    ``new``
        open a new tab
    :samp:`activate/{id}`
        activate a tab
    :samp:`close/{id}`
        close a tab
    ``version``
        get chrome and dev tools version
    ``protocol``
        get the full protocol

    :param command: the command to issue
    :param timeout: retry budget, in seconds
    :return: the decoded JSON response
    :raises unittest.SkipTest: if no successful response was received
    """
    url = f'http://{HOST}:{self.devtools_port}/json/{command}'.rstrip('/')
    self._logger.info("Issuing json command %s", url)
    delay = 0.1
    tries = 0
    failure_info = None
    message = None
    while timeout > 0:
        if self.chrome.poll() is not None:
            message = 'Chrome crashed at startup'
            break
        try:
            r = requests.get(url, timeout=3)
            if r.ok:
                return r.json()
            # remember why this try failed, so the final error log does
            # not read "None after N tries"
            message = 'Unexpected HTTP status while querying Chrome debugger'
            failure_info = f'{r.status_code} {r.reason}'
        except requests.ConnectionError as e:
            failure_info = str(e)
            message = 'Connection Error while trying to connect to Chrome debugger'
        except requests.exceptions.ReadTimeout as e:
            failure_info = str(e)
            message = 'Connection Timeout while trying to connect to Chrome debugger'
            break

        time.sleep(delay)
        timeout -= delay
        delay = delay * 1.5
        tries += 1
    # let the logging machinery format the message
    self._logger.error("%s after %s tries", message, tries)
    if failure_info:
        self._logger.info(failure_info)
    self.stop()
    raise unittest.SkipTest("Error during Chrome headless connection")

1558 

def _open_websocket(self):
    """Connect to the devtools websocket of the blank page opened at startup.

    The list of targets is polled for up to 5 seconds until a target of
    type ``page`` on ``about:blank`` shows up.

    :return: the connected websocket, with a 0.01s timeout
    :raises unittest.SkipTest: if no page target is found or if the
        connection fails; the browser is stopped first
    """
    version = self._json_command('version')
    self._logger.info('Browser version: %s', version['Browser'])

    start = time.time()
    while (time.time() - start) < 5.0:
        ws_url = next((
            target['webSocketDebuggerUrl']
            for target in self._json_command('')
            if target['type'] == 'page'
            if target['url'] == 'about:blank'
        ), None)
        if ws_url:
            break

        time.sleep(0.1)
    else:
        self.stop()
        raise unittest.SkipTest("Error during Chrome connection: never found 'page' target")

    self._logger.info('Websocket url found: %s', ws_url)
    ws = websocket.create_connection(ws_url, enable_multithread=True, suppress_origin=True)
    if ws.getstatus() != 101:
        # like the other failure paths, release the connection, the Chrome
        # process and its profile directory before skipping the test
        ws.close()
        self.stop()
        raise unittest.SkipTest("Cannot connect to chrome dev tools")
    ws.settimeout(0.01)
    return ws

1585 

def _receive(self, dbname):
    """Consume the websocket messages; target of the receiver thread.

    Responses (messages with an ``id``) complete the matching future of
    ``self._responses``; events (messages without ``id``) are dispatched to
    the handler registered for their method in ``self._handlers``.

    :param dbname: database name, set on the current thread
    """
    threading.current_thread().dbname = dbname
    # So CDT uses a streamed JSON-RPC structure, meaning a request is
    # {id, method, params} and eventually a {id, result | error} should
    # arrive the other way, however for events it uses "notifications"
    # meaning request objects without an ``id``, but *coming from the server*
    while True:  # or maybe until `self._result` is `done()`?
        try:
            msg = self.ws.recv()
            if not msg:
                continue
            self._logger.debug('\n<- %s', msg)
        except websocket.WebSocketTimeoutException:
            # nothing received within the socket timeout: poll again
            continue
        except websocket.WebSocketConnectionClosedException as e:
            if not self._result.done():
                # the connection was closed before a result was set: drop
                # the socket, fail the result and cancel pending requests
                del self.ws
                self._result.set_exception(e)
                for f in self._responses.values():
                    f.cancel()
            return
        except Exception as e:
            if isinstance(e, ConnectionResetError) and self._result.done():
                return
            # if the socket is still connected something bad happened,
            # otherwise the client was just shut down
            if self.ws.connected:
                self._result.set_exception(e)
                raise
            self._result.cancel()
            return

        res = json.loads(msg)
        request_id = res.get('id')
        try:
            if request_id is None:
                # event: events without a registered handler are ignored
                if handler := self._handlers.get(res['method']):
                    handler(**res['params'])
            elif f := self._responses.pop(request_id, None):
                # response to one of our requests
                if 'result' in res:
                    f.set_result(res['result'])
                else:
                    f.set_exception(ChromeBrowserException(res['error']['message']))
        except Exception:
            # a failure while processing a message is logged and does not
            # stop the loop
            _logger.exception(
                "While processing message %s",
                shorten(str(msg), 500, placeholder='...'),
            )

1634 

1635 def _websocket_request(self, method, *, params=None, timeout=10.0): 

1636 assert threading.get_ident() != self._receiver.ident,\ 

1637 "_websocket_request must not be called from the consumer thread" 

1638 if not hasattr(self, 'ws'): 

1639 return None 

1640 

1641 f = self._websocket_send(method, params=params, with_future=True) 

1642 try: 

1643 return f.result(timeout=timeout * self.throttling_factor) 

1644 except concurrent.futures.TimeoutError: 

1645 raise TimeoutError(f'{method}({params or ""})') 

1646 

1647 def _websocket_send(self, method, *, params=None, with_future=False): 

1648 """send chrome devtools protocol commands through websocket 

1649 

1650 If ``with_future`` is set, returns a ``Future`` for the operation. 

1651 """ 

1652 if not hasattr(self, 'ws'): 

1653 return None 

1654 

1655 result = None 

1656 request_id = next(self._request_id) 

1657 if with_future: 

1658 result = self._responses[request_id] = Future() 

1659 payload = {'method': method, 'id': request_id} 

1660 if params: 

1661 payload['params'] = params 

1662 self._logger.debug('\n-> %s', payload) 

1663 self.ws.send(json.dumps(payload)) 

1664 return result 

1665 

def _handle_request_paused(self, **params):
    """Answer a paused request of the browser.

    Requests whose url starts with ``http://{HOST}`` are allowed to
    continue untouched, any other request is fulfilled with the response
    built by the test case's ``fetch_proxy``.
    """
    url = params['request']['url']
    goes_to_host = url.startswith(f'http://{HOST}')
    cmd = 'Fetch.continueRequest' if goes_to_host else 'Fetch.fulfillRequest'
    response = {} if goes_to_host else self.test_case.fetch_proxy(url)
    command_params = {'requestId': params['requestId'], **response}
    try:
        self._websocket_send(cmd, params=command_params)
    except websocket.WebSocketConnectionClosedException:
        pass
    except OSError:
        # this can happen if the browser is closed. Just ignore it.
        # (BrokenPipeError and ConnectionResetError are OSError subclasses)
        _logger.info("Websocket error while handling request %s", url)

1681 

def _handle_console(self, type, args=None, stackTrace=None, **kw): # pylint: disable=redefined-builtin
    """Handle a console API call made in the browser.

    Formats the console arguments into a single message, logs it through
    the ``browser`` child logger, then:

    * for an ``error`` message, flags the run as failed and (unless
      ``self.error_checker`` rejects the message) takes a screenshot and
      fails ``self._result``;
    * for a message equal to ``self.success_signal``, logs the heap usage,
      checks that no dirty form view is left open and settles
      ``self._result`` as successful.

    :param str type: console message type, e.g. ``log`` or ``error``
    :param list args: remote objects passed to the console call
    :param dict stackTrace: stack trace of the call, if any
    """
    # console formatting differs somewhat from Python's, if args[0] has
    # format modifiers that many of args[1:] get formatted in, missing
    # args are replaced by empty strings and extra args are concatenated
    # (space-separated)
    #
    # current version modifies the args in place which could and should
    # probably be improved
    if args:
        arg0, args = str(self._from_remoteobject(args[0])), args[1:]
    else:
        arg0, args = '', []
    formatted = [re.sub(r'%[%sdfoOc]', self.console_formatter(args), arg0)]
    # formatter consumes args it uses, leaves unformatted args untouched
    formatted.extend(str(self._from_remoteobject(arg)) for arg in args)
    message = ' '.join(formatted)
    # _format_stack only yields lines for the 'trace' type
    stack = ''.join(self._format_stack({'type': type, 'stackTrace': stackTrace}))
    if stack:
        message += '\n' + stack

    log_type = type
    _logger = self._logger.getChild('browser')
    # once the run is settled, ignorable messages are logged with the 'dir'
    # level instead of their own, which also keeps them out of the error
    # handling below
    if self._result.done() and IGNORED_MSGS(message):
        log_type = 'dir'
    _logger.log(
        self._TO_LEVEL.get(log_type, logging.INFO),
        "%s%s",
        "Error received after termination: " if self._result.done() else "",
        message # might still have %<x> characters
    )

    if log_type == 'error':
        self.had_failure = True
        if self._result.done():
            return
        # without an error checker every error fails the run, otherwise
        # only the errors the checker accepts do
        if not self.error_checker or self.error_checker(message):
            self.take_screenshot()
            try:
                self._result.set_exception(ChromeBrowserException(message))
            except CancelledError:
                ...
            except InvalidStateError:
                self._logger.warning(
                    "Trying to set result to failed (%s) but found the future settled (%s)",
                    message, self._result
                )
    elif message == self.success_signal:
        # NOTE(review): the two generators below are handed to `run`,
        # which is defined outside this block; presumably it drives them
        # by resuming each one with the result of the future it yielded
        @run
        def _get_heap():
            yield self._websocket_send("HeapProfiler.collectGarbage", with_future=True)
            r = yield self._websocket_send("Runtime.getHeapUsage", with_future=True)
            _logger.info("heap %d (allocated %d)", r['usedSize'], r['totalSize'])

        @run
        def _check_form():
            node_id = 0

            # any failure while querying the DOM leaves node_id at 0, which
            # is handled as "no dirty form found"
            with contextlib.suppress(Exception):
                d = yield self._websocket_send('DOM.getDocument', params={'depth': 0}, with_future=True)
                form = yield self._websocket_send("DOM.querySelector", params={
                    'nodeId': d['root']['nodeId'],
                    'selector': '.o_form_dirty',
                }, with_future=True)
                node_id = form['nodeId']

            if node_id:
                self.take_screenshot("unsaved_form_")
                msg = """\
Tour finished with a dirty form view being open.

Dirty form views are automatically saved when the page is closed, \
which leads to stray network requests and inconsistencies."""
                if self._result.done():
                    _logger.error("%s", msg)
                else:
                    self._result.set_exception(ChromeBrowserException(msg))
                return

            if not self._result.done():
                self._result.set_result(True)
            elif self._result.exception() is None:
                # the result is settled and it is not a failure
                _logger.error("Tried to make the tour successful twice.")

1764 

1765 

def _handle_exception(self, exceptionDetails, timestamp):
    """Handle an exception reported by the browser.

    Builds a message from the exception text, the exception object (if
    any) and the stack trace. If the run is already settled the message is
    only logged (unless ignorable), otherwise a screenshot is taken and
    ``self._result`` is failed with the message.

    :param dict exceptionDetails: details of the exception, its ``type``
        key is overwritten
    :param timestamp: unused
    """
    message = exceptionDetails['text']
    if exception := exceptionDetails.get('exception'):
        message += str(self._from_remoteobject(exception))
    # fake this so _format_stack works
    exceptionDetails['type'] = 'trace'
    if stack := ''.join(self._format_stack(exceptionDetails)):
        message += '\n' + stack

    if self._result.done():
        if not IGNORED_MSGS(message):
            browser_logger = self._logger.getChild('browser')
            browser_logger.error("Exception received after termination: %s", message)
        return

    self.take_screenshot()
    failure = ChromeBrowserException(message)
    try:
        self._result.set_exception(failure)
    except CancelledError:
        ...
    except InvalidStateError:
        self._logger.warning(
            "Trying to set result to failed (%s) but found the future settled (%s)",
            message, self._result
        )

1792 

1793 def _handle_frame_stopped_loading(self, frameId): 

1794 wait = self._frames.pop(frameId, None) 

1795 if wait: 

1796 wait() 

1797 

# Maps a browser console message type to the level it is logged with;
# `_handle_console` falls back on `logging.INFO` for types not listed here.
# NOTE(review): `logging.RUNBOT` is not a standard `logging` level,
# presumably it is registered elsewhere in the project.
_TO_LEVEL = {
    'debug': logging.DEBUG,
    'log': logging.INFO,
    'info': logging.INFO,
    'warning': logging.WARNING,
    'error': logging.ERROR,
    'dir': logging.RUNBOT,
    # TODO: decide what to do with
    # dir, dirxml, table, trace, clear, startGroup, startGroupCollapsed,
    # endGroup, assert, profile, profileEnd, count, timeEnd
}

1809 

def take_screenshot(self, prefix='sc_') -> Future[dict]:
    """Ask the browser for a screenshot, saved once the capture arrives.

    The capture is asynchronous: the returned future is settled with the
    response of the browser, and a callback decodes and saves the image.
    A failed or empty capture is only logged.

    :param str prefix: prefix passed along when saving the image
    :return: the future of the capture, or ``None`` if the command could
        not be sent
    """
    def _save_capture(capture):
        try:
            encoded_png = capture.result(timeout=0)['data']
        except Exception as exc:
            self._logger.runbot("Couldn't capture screenshot: %s", exc)
            return
        if encoded_png:
            image = binascii.a2b_base64(encoded_png)
            save_test_file(type(self.test_case).__name__, image, prefix, logger=self._logger)
            return
        self._logger.runbot("Couldn't capture screenshot: expected image data, got %r", encoded_png)

    self._logger.info('Asking for screenshot')
    pending = self._websocket_send('Page.captureScreenshot', with_future=True)
    if pending:
        pending.add_done_callback(_save_capture)
    return pending

1828 

def set_cookie(self, name, value, path, domain):
    """Set a cookie in the browser and wait for the command to complete."""
    self._websocket_request(
        'Network.setCookie',
        params=dict(name=name, value=value, path=path, domain=domain),
    )

1832 

def delete_cookie(self, name, **kwargs):
    """Delete the browser cookies called ``name``.

    Only the ``url``, ``domain`` and ``path`` keyword arguments are passed
    along to narrow down the cookies to delete, any other is ignored.
    """
    filters = ('url', 'domain', 'path')
    params = {key: kwargs[key] for key in kwargs if key in filters}
    params['name'] = name
    self._websocket_request('Network.deleteCookies', params=params)

1837 

1838 def _wait_ready(self, ready_code=None, timeout=60): 

1839 timeout *= self.throttling_factor 

1840 ready_code = ready_code or "document.readyState === 'complete'" 

1841 self._logger.info('Evaluate ready code "%s"', ready_code) 

1842 start_time = time.time() 

1843 result = None 

1844 while True: 

1845 taken = time.time() - start_time 

1846 if taken > timeout: 

1847 break 

1848 

1849 result = self._websocket_request('Runtime.evaluate', params={ 

1850 'expression': "try { %s } catch {}" % ready_code, 

1851 'awaitPromise': True, 

1852 }, timeout=timeout-taken)['result'] 

1853 

1854 if result == {'type': 'boolean', 'value': True}: 

1855 time_to_ready = time.time() - start_time 

1856 if taken > 2: 

1857 self._logger.info('The ready code tooks too much time : %s', time_to_ready) 

1858 return True 

1859 

1860 self.take_screenshot(prefix='sc_failed_ready_') 

1861 self._logger.info('Ready code last try result: %s', result) 

1862 return False 

1863 

def _wait_code_ok(self, code, timeout, error_checker=None):
    """Run ``code`` in the browser and wait for the run to be settled.

    :param str code: javascript code to evaluate
    :param timeout: seconds allowed for the whole run, multiplied by
        ``self.throttling_factor``
    :param error_checker: stored on the browser as ``error_checker`` for
        the console handler to filter error messages with
    :return: ``None`` if the run succeeded without failure, or if it was
        cancelled
    :raises ChromeBrowserException: if the code evaluates to an error, if
        the run failed, or if it timed out
    """
    timeout *= self.throttling_factor
    self.error_checker = error_checker
    self._logger.info('Evaluate test code "%s"', code)
    start = time.time()
    # note: `_websocket_request` applies the throttling factor to the
    # timeout it is given as well
    res = self._websocket_request('Runtime.evaluate', params={
        'expression': code,
        'awaitPromise': True,
    }, timeout=timeout)['result']
    if res.get('subtype') == 'error':
        raise ChromeBrowserException("Running code returned an error: %s" % res)

    err = ChromeBrowserException("failed")
    try:
        # if the runcode was a promise which took some time to execute,
        # discount that from the timeout (the elapsed time used to be
        # *added* to it), without ever going below zero
        remaining = max(0.0, timeout - (time.time() - start))
        if self._result.result(remaining) and not self.had_failure:
            return
    except CancelledError:
        # regular-ish shutdown
        return
    except ChromeBrowserException:
        self.screencaster.save()
        raise
    except Exception as e:
        err = e

    # reached on timeout, on any other error, or when the run "succeeded"
    # but a failure was recorded in the meantime
    self.take_screenshot()
    self.screencaster.save()

    if isinstance(err, concurrent.futures.TimeoutError):
        raise ChromeBrowserException('Script timeout exceeded') from err
    raise ChromeBrowserException("Unknown error") from err

1897 

def navigate_to(self, url, wait_stop=False):
    """Navigate the browser to ``url``.

    :param str url: url to load
    :param bool wait_stop: whether to wait (at most 10 seconds) for the
        navigated frame to stop loading
    """
    self._logger.info('Navigating to: "%s"', url)
    nav_result = self._websocket_request('Page.navigate', params={'url': url}, timeout=20.0)
    self._logger.info("Navigation result: %s", nav_result)
    if not wait_stop:
        return

    frame_id = nav_result['frameId']
    stopped_loading = threading.Event()
    # callback looked up by frame id in `self._frames`
    self._frames[frame_id] = stopped_loading.set
    self._logger.info('Waiting for frame %r to stop loading', frame_id)
    stopped_loading.wait(10)

1908 

1909 def _from_remoteobject(self, arg): 

1910 """ attempts to make a CDT RemoteObject comprehensible 

1911 """ 

1912 objtype = arg['type'] 

1913 subtype = arg.get('subtype') 

1914 if objtype == 'undefined': 

1915 # the undefined remoteobject is literally just {type: undefined}... 

1916 return 'undefined' 

1917 elif objtype != 'object' or subtype not in (None, 'array'): 

1918 # value is the json representation for json object 

1919 # otherwise fallback on the description which is "a string 

1920 # representation of the object" e.g. the traceback for errors, the 

1921 # source for functions, ... finally fallback on the entire arg mess 

1922 return arg.get('value', arg.get('description', arg)) 

1923 elif subtype == 'array': 

1924 # apparently value is *not* the JSON representation for arrays 

1925 # instead it's just Array(3) which is useless, however the preview 

1926 # properties are the same as object which is useful (just ignore the 

1927 # name which is the index) 

1928 return '[%s]' % ', '.join( 

1929 repr(p['value']) if p['type'] == 'string' else str(p['value']) 

1930 for p in arg.get('preview', {}).get('properties', []) 

1931 if re.match(r'\d+', p['name']) 

1932 ) 

1933 # all that's left is type=object, subtype=None aka custom or 

1934 # non-standard objects, print as TypeName(param=val, ...), sadly because 

1935 # of the way Odoo widgets are created they all appear as Class(...) 

1936 # nb: preview properties are *not* recursive, the value is *all* we get 

1937 return '%s(%s)' % ( 

1938 arg.get('className') or 'object', 

1939 ', '.join( 

1940 '%s=%s' % (p['name'], repr(p['value']) if p['type'] == 'string' else p['value']) 

1941 for p in arg.get('preview', {}).get('properties', []) 

1942 if p.get('value') is not None 

1943 ) 

1944 ) 

1945 

# %-format applied by `_format_stack` to each call frame of a stack trace
LINE_PATTERN = '\tat %(functionName)s (%(url)s:%(lineNumber)d:%(columnNumber)d)\n'

1947 def _format_stack(self, logrecord): 

1948 if logrecord['type'] not in ['trace']: 

1949 return 

1950 

1951 trace = logrecord.get('stackTrace') 

1952 while trace: 

1953 for f in trace['callFrames']: 

1954 yield self.LINE_PATTERN % f 

1955 trace = trace.get('parent') 

1956 

def console_formatter(self, args):
    """ Formats similarly to the console API:

    * if there are no args, don't format (return string as-is)
    * %% -> %
    * %c -> replace by styling directives (ignore for us)
    * other known formatters -> replace by corresponding argument
    * leftover known formatters (args exhausted) -> replace by empty string
    * unknown formatters -> return as-is

    :param list args: remote objects to format in, the ones used are
        removed from the list
    :return: a replacement function for ``re.sub``
    """
    if not args:
        return lambda m: m[0]

    def replacer(match):
        token = match.group(0)
        directive = token[1]
        if directive == '%':
            return '%'
        if directive not in 'sdfoOc':
            return token
        if not args:
            return ''
        # styling directives consume their argument too
        value = args.pop(0)
        return '' if directive == 'c' else str(self._from_remoteobject(value))
    return replacer

1983 

class NoScreencast:
    """Screencaster stand-in with the same methods, all doing nothing."""

    def start(self):
        """Do nothing."""

    def stop(self):
        """Do nothing."""

    def save(self):
        """Do nothing."""

    def __call__(self, sessionId, data, metadata):
        """Ignore the screencast frame."""

1996 

1997 

class Screencaster:
    """Collects the screencast frames of a browser and encodes them.

    Frames are written as png files in a dedicated directory as the
    browser sends them, :meth:`save` turns them into a video using ffmpeg
    when available, :meth:`stop` discards them.
    """

    def __init__(self, browser: ChromeBrowser, directory: str):
        self.stopped = False
        self.browser: ChromeBrowser = browser
        self._logger: logging.Logger = browser._logger
        self.directory = pathlib.Path(directory, get_db_name(), 'screencasts')
        created_at = datetime.now()
        self.frames_dir = self.directory / f'frames-{created_at:%Y%m%dT%H%M%S.%f}'
        self.frames_dir.mkdir(parents=True, exist_ok=True)
        # one dict per frame written, with `file_path` and `timestamp` keys
        self.frames = []

    def start(self):
        """Ask the browser to start sending screencast frames."""
        self._logger.info('Starting screencast')
        self.browser._websocket_send('Page.startScreencast')

    def __call__(self, sessionId, data, metadata):
        """Acknowledge a screencast frame and store it on disk."""
        self.browser._websocket_send('Page.screencastFrameAck', params={'sessionId': sessionId})
        if self.stopped:
            # if already stopped, drop the frames as we might have removed the directory already
            return

        frame_path = self.frames_dir / f'frame_{len(self.frames):05d}.png'
        try:
            frame_path.write_bytes(binascii.a2b_base64(data.encode()))
        except FileNotFoundError:
            return
        self.frames.append({
            'file_path': frame_path,
            'timestamp': metadata.get('timestamp'),
        })

    def stop(self):
        """Stop the screencast and discard the frames directory."""
        self.browser._websocket_send('Page.stopScreencast')
        self.stopped = True
        if self.frames_dir.is_dir():
            shutil.rmtree(self.frames_dir, ignore_errors=True)

    def save(self):
        """Stop the screencast and encode the frames received into a video.

        Does nothing if the screencast is already stopped. Without ffmpeg,
        or if the encoding fails, the frames are left on disk and their
        location is logged.
        """
        if self.stopped:
            return
        self.browser._websocket_send('Page.stopScreencast')
        # Wait for frames just in case, ideally we'd wait for the Browse.close
        # event or something but that doesn't exist.
        time.sleep(5)
        self.stopped = True
        if not self.frames:
            self._logger.debug('No screencast frames to encode')
            return

        recorded, self.frames = self.frames, []
        saved_at = time.time()
        # used until a frame with a timestamp is met, afterwards frames
        # without a timestamp reuse the last duration computed
        duration = 1/24
        concat_script_path = self.frames_dir.with_suffix('.txt')
        with concat_script_path.open("w") as concat_file:
            for index, frame in enumerate(recorded):
                following = recorded[index + 1] if index + 1 < len(recorded) else None
                if frame['timestamp'] is not None:
                    # a frame lasts until the next one, the last until now
                    end_time = following['timestamp'] if following else saved_at
                    duration = end_time - frame['timestamp']
                concat_file.write(f"file '{frame['file_path']}'\nduration {duration}\n")
            # needed by the concat plugin
            concat_file.write(f"file '{recorded[-1]['file_path']}'")

        try:
            ffmpeg_path = find_in_path('ffmpeg')
        except IOError:
            self._logger.runbot('Screencast frames in: %s', self.frames_dir)
            return

        outfile = self.frames_dir.with_suffix('.mp4')
        command = [
            ffmpeg_path,
            '-y', '-loglevel', 'warning',
            '-f', 'concat', '-safe', '0', '-i', concat_script_path,
            '-vf', 'pad=ceil(iw/2)*2:ceil(ih/2)*2',
            '-c:v', 'libx265', '-x265-params', 'lossless=1',
            outfile,
        ]
        try:
            subprocess.run(command, preexec_fn=_preexec, check=True)
        except subprocess.CalledProcessError:
            self._logger.error('Failed to encode screencast, screencast frames in %s', self.frames_dir)
            return

        # the video was encoded, the frames and script are not needed anymore
        concat_script_path.unlink()
        shutil.rmtree(self.frames_dir, ignore_errors=True)
        self._logger.runbot('Screencast in: %s', outfile)

2082 

2083 

@lru_cache(1)
def _find_executable():
    """Return the path of a Chrome / Chromium executable.

    Known binary names (Linux) or install locations (macOS, Windows) are
    tried in order, the first one found is returned.

    :raises unittest.SkipTest: if no executable is found, or on any other
        platform
    """
    system = platform.system()
    if system == 'Linux':
        for name in ('google-chrome', 'chromium', 'chromium-browser', 'google-chrome-stable'):
            with contextlib.suppress(IOError):
                return find_in_path(name)

    elif system == 'Darwin':
        for candidate in (
            '/Applications/Google Chrome.app/Contents/MacOS/Google Chrome',
            '/Applications/Chromium.app/Contents/MacOS/Chromium',
        ):
            if os.path.exists(candidate):
                return candidate

    elif system == 'Windows':
        for template in (
            '%ProgramFiles%\\Google\\Chrome\\Application\\chrome.exe',
            '%ProgramFiles(x86)%\\Google\\Chrome\\Application\\chrome.exe',
            '%LocalAppData%\\Google\\Chrome\\Application\\chrome.exe',
        ):
            candidate = os.path.expandvars(template)
            if os.path.exists(candidate):
                return candidate

    raise unittest.SkipTest("Chrome executable not found")

2115 

class Opener(requests.Session):
    """
    Flushes and clears the current transaction when starting a request.

    This is likely necessary when we make a request to the server, as the
    request is made with a test cursor, which uses a different cache than this
    transaction.
    """
    def __init__(self, http_case: HttpCase):
        super().__init__()
        self.test_case = http_case
        self.cr = http_case.cr

    def request(self, *args, **kwargs):
        """Perform the request with requests allowed on the test case.

        Only the current opener of the test case may be used.
        """
        assert self.test_case.opener == self
        cursor = self.cr
        cursor.flush()
        cursor.clear()
        with self.test_case.allow_requests():
            return super().request(*args, **kwargs)

2135 

2136 

class Transport(xmlrpclib.Transport):
    """ see :class:`Opener` """
    def __init__(self, http_case: HttpCase):
        self.test_case = http_case
        self.cr = http_case.cr
        super().__init__()

    def request(self, *args, **kwargs):
        """Perform the request with all requests allowed on the test case."""
        cursor = self.cr
        cursor.flush()
        cursor.clear()
        with self.test_case.allow_requests(all_requests=True):
            return super().request(*args, **kwargs)

2149 

2150 

class JsonRpcException(Exception):
    """Exception carrying a ``code`` next to its message."""

    def __init__(self, code, message):
        Exception.__init__(self, message)
        self.code = code

2155 

2156 

2157class HttpCase(TransactionCase): 

2158 """ Transactional HTTP TestCase with url_open and Chrome headless helpers. """ 

2159 registry_test_mode = True 

2160 browser = None 

2161 browser_size = '1366x768' 

2162 touch_enabled = False 

2163 session: odoo.http.Session = None 

2164 

2165 _logger: logging.Logger = None 

2166 

@classmethod
def setUpClass(cls):
    """Set up the registry test mode, base url, xmlrpc url and logger."""
    super().setUpClass()
    if cls.registry_test_mode:
        cls.registry_enter_test_mode_cls()

    config_parameters = cls.env['ir.config_parameter']
    config_parameters.set_param('web.base.url', cls.base_url())
    config_parameters.env.flush_all()
    # v8 api with correct xmlrpc exception handling.
    cls.xmlrpc_url = f'{cls.base_url()}/xmlrpc/2/'
    cls._logger = logging.getLogger(f'{cls.__module__}.{cls.__name__}')

2179 

@classmethod
def base_url(cls):
    """Return the http url of the server, built from ``HOST`` and the http port."""
    port = cls.http_port()
    return f"http://{HOST}:{port:d}"

2183 

@classmethod
def http_port(cls):
    """Return the port of the running http server, ``None`` without a server."""
    server = odoo.service.server.server
    return None if server is None else server.httpd.server_port

2189 

def setUp(self):
    """Set up the per-test logger, xmlrpc proxies, opener and pdf patch."""
    super().setUp()

    self._logger = self._logger.getChild(self._testMethodName)

    def xmlrpc_proxy(service, **options):
        # every proxy gets its own transport
        return xmlrpclib.ServerProxy(self.xmlrpc_url + service, transport=Transport(self), **options)

    self.xmlrpc_common = xmlrpc_proxy('common')
    self.xmlrpc_db = xmlrpc_proxy('db')
    self.xmlrpc_object = xmlrpc_proxy('object', use_datetime=True)
    # setup an url opener helper
    self.opener = Opener(self)
    self.http_key_sequence = itertools.count()
    # we need to allow requests during pdf rendering.
    original_run_wkhtmltopdf = ir_actions_report._run_wkhtmltopdf

    def _patched_run_wkhtmltopdf(args):
        with patch.object(self, 'http_request_key', 'wkhtmltopdf'), release_test_lock():
            # the test cursor cookie goes before the original arguments
            return original_run_wkhtmltopdf(
                ['--cookie', TEST_CURSOR_COOKIE_NAME, 'wkhtmltopdf', *args],
            )

    self.startPatcher(
        patch.object(ir_actions_report, '_run_wkhtmltopdf', _patched_run_wkhtmltopdf),
    )

2212 

@contextmanager
def enter_registry_test_mode(self):
    """Context manager which only warns that test mode is already on."""
    _logger.warning("HTTPCase is already in test mode")
    yield None

2217 

@contextmanager
def allow_pdf_render(self):
    """Context manager which only warns that calling it is not required."""
    _logger.warning("HTTPCase does not require calling allow_pdf_render")
    yield None

2222 

@contextmanager
def allow_requests(self, browser: ChromeBrowser | None = None, all_requests=False):
    """
    Allows HTTP requests for the scope of the context.

    A new request key is generated and set, for the duration of the
    context, as ``http_request_key`` on the test case and as the test
    cursor cookie of the opener. On exit the previous key and cookie are
    restored.

    Params:
        browser (ChromeBrowser | None): if given, add the cookie to the browser.
        all_requests (bool): if True, allows all requests regardless of cookie.
    """
    with ExitStack() as defer:
        defer.enter_context(release_test_lock())
        if all_requests:
            # NOTE(review): nothing in this method resets the flag when the
            # context exits - confirm whether it is restored elsewhere
            self.http_request_allow_all = True
        # the sequence makes every key unique
        new_key = f'{self.canonical_tag}__{next(self.http_key_sequence)}'
        defer.enter_context(patch.object(self, 'http_request_key', new_key))
        old_cookie = self.opener.cookies.get(TEST_CURSOR_COOKIE_NAME)
        # on exit, put back the previous cookie or remove the one set below
        if old_cookie:
            defer.callback(self.opener.cookies.set, TEST_CURSOR_COOKIE_NAME, old_cookie)
        else:
            defer.callback(self.opener.cookies.pop, TEST_CURSOR_COOKIE_NAME, None)
        self.opener.cookies[TEST_CURSOR_COOKIE_NAME] = new_key
        if browser:
            # http_request_key is new_key here, as patched above
            browser.set_cookie(
                TEST_CURSOR_COOKIE_NAME, self.http_request_key, '/', HOST,
            )
        yield

2249 

def parse_http_location(self, location):
    """ Parse a Location http header typically found in 201/3xx
    responses, return the corresponding parsed url object. The scheme/host
    are taken from ``base_url()`` in case they are missing from the
    header.

    The query string of the result is normalised. An empty location gives
    an empty parsed url.
    """
    if not location:
        return urlsplit('')

    absolute_location = urljoin(self.base_url(), location)
    parts = urlsplit(absolute_location)
    # normalise query parameters
    normalised_query = urlencode(parse_qsl(parts.query))
    return parts._replace(query=normalised_query)

2261 

def assertURLEqual(self, test_url, truth_url, message=None):
    """ Assert that two URLs are equivalent. If any URL is missing
    a scheme and/or host, assume the same scheme/host as base_url()
    """
    actual = self.parse_http_location(test_url)
    expected = self.parse_http_location(truth_url)
    self.assertEqual(actual, expected, message)

2271 

def build_rpc_payload(self, params=None):
    """
    Helper to properly build jsonrpc payload

    :param params: parameters of the call, an empty dict when falsy
    :return: a jsonrpc 2.0 ``call`` payload with a fresh random id
    """
    request_id = str(uuid4())
    return dict(
        jsonrpc="2.0",
        method="call",
        id=request_id,
        params=params or {},
    )

2282 

2283 def url_open(self, url, data=None, files=None, timeout=12, headers=None, json=None, params=None, allow_redirects=True, cookies=None, method: str | None = None): 

2284 if not method and (data or files or json): 

2285 method = 'POST' 

2286 method = method or 'GET' 

2287 if url.startswith('/'): 

2288 url = self.base_url() + url 

2289 return self.opener.request(method, url, params=params, data=data, json=json, files=files, timeout=timeout, headers=headers, cookies=cookies, allow_redirects=allow_redirects) 

2290 

2291 def _wait_remaining_requests(self, timeout=10): 

2292 

2293 def get_http_request_threads(): 

2294 return [t for t in threading.enumerate() if t.name.startswith('odoo.service.http.request.')] 

2295 

2296 start_time = time.time() 

2297 request_threads = get_http_request_threads() 

2298 if not request_threads: 

2299 return 

2300 

2301 self._logger.info('waiting for threads: %s', request_threads) 

2302 

2303 for thread in request_threads: 

2304 thread.join(timeout - (time.time() - start_time)) 

2305 

2306 request_threads = get_http_request_threads() 

2307 for thread in request_threads: 

2308 self._logger.info("Stop waiting for thread %s handling request for url %s", 

2309 thread.name, getattr(thread, 'url', '<UNKNOWN>')) 

2310 

2311 if request_threads: 

2312 self._logger.info('remaining requests') 

2313 odoo.tools.misc.dumpstacks() 

2314 

def logout(self, keep_db=True):
    """Log the session of the test case out and save it in the session store."""
    session = self.session
    session.logout(keep_db=keep_db)
    odoo.http.root.session_store.save(session)

2318 

def authenticate(self, user, password, *,
                 browser: ChromeBrowser = None, session_extra: dict | None = None):
    """Create a new session for the test case, optionally logged in.

    Any previous session of the test case is deleted from the session
    store. The new session is saved in the store and its id is set as the
    ``session_id`` cookie of a brand new opener (and of ``browser``).

    :param user: login to authenticate as, a falsy value gives an
        unauthenticated session
    :param password: password given in the credentials (the credentials
        check itself is patched out)
    :param browser: if given, the session cookie is set in it as well
    :param session_extra: extra values to update the session with, its
        ``context`` entry (if any) updates the session's context instead.
        The dict is left untouched.
    :return: the new session
    """
    if getattr(self, 'session', None):
        odoo.http.root.session_store.delete(self.session)

    self.session = session = odoo.http.root.session_store.new()
    session.update(
        odoo.http.get_default_session(),
        db=get_db_name(),
        # In order to avoid perform a query to each first `url_open`
        # in a test (insert `res.device.log`).
        _trace_disable=True,
    )
    session.context['lang'] = odoo.http.DEFAULT_LANG

    if session_extra:
        # work on a copy: popping the context out of the dict received
        # would alter the caller's data (e.g. a dict shared between tests)
        session_extra = dict(session_extra)
        if extra_ctx := session_extra.pop('context', None):
            session.context.update(extra_ctx)
        session.update(session_extra)

    if user: # if authenticated
        # Flush and clear the current transaction.  This is useful, because
        # the call below opens a test cursor, which uses a different cache
        # than this transaction.
        self.cr.flush()
        self.cr.clear()

        def patched_check_credentials(self, credential, env):
            return {'uid': self.id, 'auth_method': 'password', 'mfa': 'default'}

        # patching to speedup the check in case the password is hashed with many hashround + avoid to update the password
        with patch('odoo.addons.base.models.res_users.ResUsersPatchedInTest._check_credentials', new=patched_check_credentials):
            credential = {'login': user, 'password': password, 'type': 'password'}
            auth_info = self.env['res.users'].authenticate(credential, {'interactive': False})
        uid = auth_info['uid']
        env = api.Environment(self.cr, uid, {})
        session.uid = uid
        session.login = user
        session.session_token = uid and security.compute_session_token(session, env)
        # replaces the context set up above by the one of the user
        session.context = dict(env['res.users'].context_get())

    odoo.http.root.session_store.save(session)
    # Reset the opener: turns out when we set cookies['foo'] we're really
    # setting a cookie on domain='' path='/'.
    #
    # But then our friendly neighborhood server might set a cookie for
    # domain='localhost' path='/' (with the same value) which is considered
    # a *different* cookie following ours rather than the same.
    #
    # When we update our cookie, it's done in-place, so the server-set
    # cookie is still present and (as it follows ours and is more precise)
    # very likely to still be used, therefore our session change is ignored.
    #
    # An alternative would be to set the cookie to None (unsetting it
    # completely) or clear-ing session.cookies.
    self.opener = Opener(self)
    self.opener.cookies.set("session_id", session.sid, domain=HOST)
    if browser:
        self._logger.info('Setting session cookie in browser')
        browser.set_cookie('session_id', session.sid, '/', HOST)

    return session

2381 

def fetch_proxy(self, url):
    """
    This method is called every time a request is made from the chrome browser outside the local network
    Returns a response that will be sent to the browser to simulate the external request.

    Google fonts stylesheets get an empty response, anything else a 404.
    """
    if 'https://fonts.googleapis.com/css' not in url:
        _logger.info('External chrome request during tests: returning 404 for %s', url)
        return {
            'body': '',
            'responseCode': 404,
            'responseHeaders': [],
        }

    _logger.info('External chrome request during tests: Return empty file for %s', url)
    # return empty css file, we don't care
    return self.make_fetch_proxy_response('')

2398 

def make_fetch_proxy_response(self, content, code=200):
    """Build the response to fulfill a browser request with.

    :param content: body of the response, as ``str`` (encoded first) or
        bytes
    :param int code: http status code of the response
    :return: a dict holding the base64-encoded body, the status code and
        the response headers (open CORS and public caching)
    """
    raw = content.encode() if isinstance(content, str) else content
    headers = [
        {'name': 'access-control-allow-origin', 'value': '*'},
        {'name': 'cache-control', 'value': 'public, max-age=10000'},
    ]
    return {
        'body': base64.b64encode(raw).decode(),
        'responseCode': code,
        'responseHeaders': headers,
    }

2410 

def browser_js(self, url_path, code, ready='', login=None, timeout=60, cookies=None, error_checker=None, watch=False, success_signal=DEFAULT_SUCCESS_SIGNAL, debug=False, cpu_throttling=None, **kw):
    """ Test JavaScript code running in the browser.

    To signal test success, call `console.log()` with the expected `success_signal`. Default is "test successful".
    To signal test failure, raise an exception or call `console.error` with a message.
    The test stops when a failure occurs if `error_checker` is not defined or returns `True` for this message.

    :param string url_path: URL path to load the browser page on
    :param string code: JavaScript code to be executed
    :param string ready: JavaScript object to wait for before proceeding with the test
    :param string login: logged in user which will execute the test. e.g. 'admin', 'demo'
    :param int timeout: maximum time to wait for the test to complete (in seconds). Default is 60 seconds.
                        It is multiplied by 1.5 when running under coverage and replaced by 1e6 in debug mode.
    :param dict cookies: dictionary of cookies to set before loading the page
    :param error_checker: function to filter failures out.
                          If provided, the function is called with the error log message, and if it returns `False` the log is ignored and the test continues.
                          If not provided, every error log triggers a failure.
    :param bool watch: open a new browser window to watch the test execution
    :param string success_signal: string signal to wait for to consider the test successful
    :param bool debug: automatically open a fullscreen Chrome window with opened devtools and a debugger breakpoint set at the start of the tour.
                       The tour is run with the `debug=assets` query parameter. When an error is thrown, the debugger stops on the exception.
                       Any value other than `False` also forces `watch` mode.
    :param int cpu_throttling: CPU throttling rate as a slowdown factor (1 is no throttle, 2 is 2x slowdown, etc).
                               Overridden by the `ODOO_BROWSER_CPU_THROTTLING` environment variable when that one is set.
    :param kw: extra keyword arguments are accepted but not used by this method
    """
    if not self.env.registry.loaded:
        self._logger.warning('HttpCase test should be in post_install only')

    # increase timeout if coverage is running (detected through a
    # coverage `execfile.py` frame in the current call stack)
    if any(f.filename.endswith('/coverage/execfile.py') for f in inspect.stack() if f.filename):
        timeout = timeout * 1.5

    # debug mode implies watch mode and a practically unlimited timeout
    if debug is not False:
        watch = True
        timeout = 1e6
    if watch:
        self._logger.warning('watch mode is only suitable for local testing')

    # the browser only runs headless when nobody is watching
    browser = ChromeBrowser(self, headless=not watch, success_signal=success_signal, debug=debug)
    with self.allow_requests(browser=browser), contextlib.ExitStack() as atexit:
        # ExitStack unwinds in LIFO order: this callback is registered
        # first, hence it runs last when the block is left
        atexit.callback(self._wait_remaining_requests)
        if "bus.bus" in self.env.registry:
            # imported lazily: only reached when the registry holds a `bus.bus` model
            from odoo.addons.bus.websocket import CloseCode, _kick_all, WebsocketConnectionHandler  # noqa: PLC0415
            from odoo.addons.bus.models.bus import BusBus  # noqa: PLC0415

            atexit.callback(_kick_all, CloseCode.KILL_NOW)
            original_send_one = BusBus._sendone

            # wraps the original `_sendone` to run the cursor's precommit and
            # postcommit hooks right after each notification is sent
            def sendone_wrapper(self, target, notification_type, message):
                original_send_one(self, target, notification_type, message)
                self.env.cr.precommit.run()  # Trigger the creation of bus.bus records
                self.env.cr.postcommit.run()  # Trigger notification dispatching

            # both patches are undone when the exit stack unwinds
            atexit.enter_context(patch.object(BusBus, "_sendone", sendone_wrapper))
            atexit.enter_context(patch.object(
                WebsocketConnectionHandler, "websocket_allowed", return_value=True
            ))

        # the login is used both as user name and as password
        self.authenticate(login, login, browser=browser)
        # Flush and clear the current transaction. This is useful in case
        # we make requests to the server, as these requests are made with
        # test cursors, which uses different caches than this transaction.
        self.cr.flush()
        self.cr.clear()
        url = urljoin(self.base_url(), url_path)
        if watch:
            # rebuild the URL with `watch=1` (and `debug=assets` in debug
            # mode) merged into the existing query string
            parsed = urlsplit(url)
            qs = dict(parse_qsl(parsed.query))
            qs['watch'] = '1'
            if debug is not False:
                qs['debug'] = "assets"
            url = urlunsplit(parsed._replace(query=urlencode(qs)))
        self._logger.info('Open "%s" in browser', url)

        browser.screencaster.start()
        if cookies:
            for name, value in cookies.items():
                browser.set_cookie(name, value, '/', HOST)

        # the environment variable takes precedence over the `cpu_throttling` argument
        cpu_throttling_os = os.environ.get('ODOO_BROWSER_CPU_THROTTLING')  # used by dedicated runbot builds
        cpu_throttling = int(cpu_throttling_os) if cpu_throttling_os else cpu_throttling

        if cpu_throttling:
            # logged at INFO when requested through the environment, at
            # WARNING when requested through the argument
            # NOTE(review): the message mentions an extended timeout, but
            # `timeout` is not modified here - confirm where the extension happens
            _logger.log(
                logging.INFO if cpu_throttling_os else logging.WARNING,
                'CPU throttling mode is only suitable for local testing - '
                'Throttling browser CPU to %sx slowdown and extending timeout to %s sec', cpu_throttling, timeout)
            browser.throttle(cpu_throttling)

        # only wait for the page load to stop when there is no `ready` code to poll
        browser.navigate_to(url, wait_stop=not bool(ready))
        # registered last, hence the first callback run when the stack unwinds
        atexit.callback(browser.stop)

        # Needed because tests like test01.js (qunit tests) are passing a ready
        # code = ""
        self.assertTrue(browser._wait_ready(ready), 'The ready "%s" code was always falsy' % ready)

        error = False
        try:
            browser._wait_code_ok(code, timeout, error_checker=error_checker)
        except ChromeBrowserException as chrome_browser_exception:
            error = chrome_browser_exception
        if error:  # don't keep the initial traceback: fail outside of the except block
            if code:
                message = 'The test code "%s" failed' % code
            else:
                message = "Some js test failed"
            self.fail('%s\n\n%s' % (message, error))

2515 

def start_tour(self, url_path, tour_name, step_delay=None, **kwargs):
    """Run the tour `tour_name` in the browser through `browser_js`.

    :param string url_path: URL path the tour starts on
    :param string tour_name: name of the tour to start
    :param step_delay: optional delay between the tour steps; passing a
        value logs a warning as it is only suitable for local testing
    :param kwargs: other `browser_js` arguments. `code`, `ready` and
        `timeout` (default 60) override the values computed here, and
        `delay_to_check_undeterminisms` overrides the value read from the
        `ODOO_TOUR_DELAY_TO_CHECK_UNDETERMINISMS` environment variable.
    """
    env_delay = int(os.getenv("ODOO_TOUR_DELAY_TO_CHECK_UNDETERMINISMS", "0")) or 0
    undeterminism_delay = kwargs.pop('delay_to_check_undeterminisms', env_delay)
    # `watch` and `debug` are only read here: they stay in `kwargs` and are
    # forwarded to `browser_js` as well.
    options = {
        'stepDelay': step_delay or 0,
        'keepWatchBrowser': kwargs.get('watch', False),
        'debug': kwargs.get('debug', False),
        'startUrl': url_path,
        'delayToCheckUndeterminisms': undeterminism_delay,
    }
    default_code = f"odoo.startTour({tour_name!r}, {json.dumps(options)})"
    default_ready = f"odoo.isTourReady({tour_name!r})"
    code = kwargs.pop('code', default_code)
    ready = kwargs.pop('ready', default_ready)
    timeout = kwargs.pop('timeout', 60)

    if step_delay is not None:
        self._logger.warning('step_delay is only suitable for local testing')
    if undeterminism_delay > 0:
        timeout = timeout + 1000 * undeterminism_delay
        _logger.runbot("Tour %s is launched with mode: check for undeterminisms.", tour_name)

    Users = self.registry['res.users']

    def keep_tours_disabled(_):
        Users.tour_enabled = False

    # The three patches are entered in this order and undone in reverse
    # order once `browser_js` returns or raises.
    with ExitStack() as patches:
        patches.enter_context(patch.object(Users, 'tour_enabled', False))
        patches.enter_context(patch.object(Users, '_post_model_setup__', keep_tours_disabled))
        patches.enter_context(patch.object(Users, '_compute_tour_enabled', lambda _: None))
        self.browser_js(
            url_path=url_path,
            code=code,
            ready=ready,
            timeout=timeout,
            success_signal="tour succeeded",
            **kwargs,
        )

2545 

def profile(self, **kwargs):
    """
    Same as the parent `profile`, but for http cases it also patches
    `odoo.http.Request._get_profiler_context_manager` in order to profile
    all requests: each request gets its own profiler, registered as a
    sub-profiler of the main one.
    """
    # zero-argument `super()` is resolved here, in the method scope, so that
    # the nested function below can reuse the resulting proxy
    parent = super()
    main_profiler = parent.profile(**kwargs)

    def profile_route(request):
        sub_profiler = parent.profile(
            description=request.httprequest.full_path,
            db=main_profiler.db,
        )
        main_profiler.sub_profilers.append(sub_profiler)
        return sub_profiler

    request_patch = patch('odoo.http.Request._get_profiler_context_manager', profile_route)
    return profiler.Nested(main_profiler, request_patch)

2557 

def get_method_additional_tags(self, test_method):
    """
    Extend the parent's additional tags with `is_tour` when the test method
    looks like a tour, i.e. when its source contains `self.start_tour`.

    The source is only inspected when the configured `test_tags` mention
    `is_tour`.
    """
    tags = super().get_method_additional_tags(test_method)
    configured_tags = odoo.tools.config['test_tags']
    if not configured_tags or 'is_tour' not in configured_tags:
        return tags
    if 'self.start_tour' in inspect.getsource(test_method):
        tags.append('is_tour')
    return tags

2568 

def make_jsonrpc_request(self, route, params=None, headers=None, cookies=None, timeout=12):
    """Post a JSON-RPC 2.0 ``call`` to ``route`` and return its result.

    :param str route: route to call, joined to the base url
    :param dict params: parameters of the call (an empty dict when falsy)
    :param headers: headers forwarded to the opener
    :param cookies: cookies forwarded to the opener
    :param timeout: timeout forwarded to the opener (default 12)
    :return: the ``result`` entry of the response, ``None`` when missing
    :raises requests.HTTPError: if one occurred
    :raises JsonRpcException: if the response contains an error
    """
    endpoint = urljoin(self.base_url(), route)
    payload = {
        'id': 0,
        'jsonrpc': '2.0',
        'method': 'call',
        'params': params or {},
    }
    response = self.opener.post(
        endpoint,
        json=payload,
        headers=headers,
        cookies=cookies,
        timeout=timeout,
    )
    response.raise_for_status()
    answer = response.json()
    if 'error' not in answer:
        # workaround: JsonRPCDispatcher is broken and may send neither result nor error
        return answer.get('result')
    raise JsonRpcException(
        code=answer['error']['code'],
        message=answer['error']['data']['name']
    )

2590 

2591 

def no_retry(arg):
    """Disable auto retry on the decorated test method or test class.

    Sets ``_retry`` to ``False`` on ``arg`` and returns that same object.
    """
    setattr(arg, '_retry', False)
    return arg

2596 

2597 

def users(*logins):
    """ Decorate a method to execute it once for each given user.

    Each execution runs in its own ``subTest`` with ``self.uid`` switched to
    the user matching the login; the original ``self.uid`` is restored at
    the end, whatever happens.

    :param logins: logins of the users to run the method as (at least one)
    """
    assert logins, "Expecting at least one login to execute"

    def users_decorator(func, /):
        @wraps(func)
        def with_users(self, *args, **kwargs):
            previous_uid = self.uid
            try:
                # map every requested login to its user id, archived users included
                all_users = self.env['res.users'].with_context(active_test=False)
                matching = all_users.search([('login', 'in', list(logins))])
                uid_by_login = {record.login: record.id for record in matching}
                for login in logins:
                    with self.subTest(login=login):
                        # run the method as that user
                        self.uid = uid_by_login[login]
                        func(self, *args, **kwargs)
                        self.env.flush_all()
                    # Drop the cache before the next subtest so that it does
                    # not reuse the former user's cache (`test_read_mail`,
                    # `test_write_mail`)
                    self.env.invalidate_all()
            finally:
                self.uid = previous_uid

        return with_users
    return users_decorator

2627 

2628 

def warmup(func, /):
    """
    Stabilize assertQueries and assertQueryCount assertions.

    The cache is first put in a stable state: pending changes are flushed
    and the cache is invalidated.

    The decorated function then runs twice. The first run, with
    ``self.warm`` set to ``False``, only warms up the ormcaches: it ignores
    assertQueries and assertQueryCount assertions and happens inside a
    savepoint that is closed afterwards, discarding all changes but the
    ormcaches ones. The second run, with ``self.warm`` set to ``True``, is
    the actual test.
    """
    @wraps(func)
    def run_warm_then_real(self, *args, **kwargs):
        self.env.flush_all()
        self.env.invalidate_all()

        # first pass: warm up the caches inside a savepoint
        self.warm = False
        savepoint = self.cr.savepoint(flush=False)
        try:
            func(self, *args, **kwargs)
            self.env.flush_all()
        finally:
            savepoint.close()

        # second pass: the real one
        self.env.invalidate_all()
        self.warm = True
        func(self, *args, **kwargs)
    return run_warm_then_real

2655 

2656 

def can_import(module):
    """ Tell whether <module> can be imported: ``True`` when importing it
    succeeds, ``False`` when it raises an ``ImportError``.

    To use with ``unittest.skipUnless`` for tests conditional on *optional*
    dependencies, which may or may not be present but must still be tested
    if possible.
    """
    try:
        importlib.import_module(module)
        return True
    except ImportError:
        return False

2671 

2672 

def tagged(*tags):
    """A decorator to tag BaseCase objects.

    Tags are stored in a set that can be accessed from a 'test_tags'
    attribute.

    A tag prefixed by '-' removes that tag, e.g. '-standard' removes the
    'standard' tag.

    By default, all Test classes from odoo.tests.common have a test_tags
    attribute that defaults to 'standard' and 'at_install'.

    When using class inheritance, the tags ARE inherited.

    A warning is logged when the resulting tags hold both or neither of
    'at_install' and 'post_install'.
    """
    include, exclude = set(), set()
    for tag in tags:
        if tag.startswith('-'):
            exclude.add(tag[1:])
        else:
            include.add(tag)

    def tags_decorator(obj):
        current_tags = getattr(obj, 'test_tags', set())
        obj.test_tags = (current_tags | include) - exclude
        at_install = 'at_install' in obj.test_tags
        post_install = 'post_install' in obj.test_tags
        # exactly one of the two install tags is expected
        if at_install == post_install:
            _logger.warning('A tests should be either at_install or post_install, which is not the case of %r', obj)
        return obj
    return tags_decorator

2696 

2697 

class freeze_time:
    """ Object to replace the freezegun in Odoo test suites.

    It properly handles the decoration of test classes: such a class is not
    wrapped, the instance is only stored on its ``freeze_time`` attribute.
    It can also be used like the usual method decorator or context manager,
    and through explicit ``start()`` / ``stop()`` calls.
    """
    # reference to the freezegun function, taken when the class is defined
    _freeze_time = staticmethod(freezegun.freeze_time)

    def __init__(self, time_to_freeze=None, tz_offset=0, tick=False, as_kwarg='', auto_tick_seconds=0):
        options = {
            'time_to_freeze': time_to_freeze,
            'tz_offset': tz_offset,
            'tick': tick,
            'as_kwarg': as_kwarg,
            'auto_tick_seconds': auto_tick_seconds,
        }
        self.freezer = self._freeze_time(**options)

    def __call__(self, arg):
        decorates_test_class = isinstance(arg, type) and issubclass(arg, case.TestCase)
        if not decorates_test_class:
            # anything else is decorated by the freezegun freezer itself
            return self.freezer(arg)

        arg.freeze_time = self
        return arg

    def __enter__(self):
        frozen = self.freezer.start()
        return frozen

    def __exit__(self, *args):
        self.freezer.stop()

    # explicit start/stop API, same functions as the context manager protocol
    start = __enter__
    stop = __exit__

2729 

2730 

2731freezegun.freeze_time = freeze_time